10

从零开始手敲次世代游戏引擎(八十)

 3 years ago
source link: https://zhuanlan.zhihu.com/p/139997744
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

从零开始手敲次世代游戏引擎(八十)

前两天我在想法里转了一个MIT八十多的老教授仍然在录制网课的事情,并说希望自己也能像他一样。

well,这个目标现在正好过半了 还要努力很久。不过,很高兴,这个系列文章终于进入第八十篇(其实算上各种特别篇已经快九十了吧)。这个意义上,很接近了。

到目前为止,我们的引擎还只是个单线程的执行模型。单线程的最大好处是不会发生data-racing,也就是不会出现两个以上线程同时需要修改同一块buffer,或者,一个线程读取到另外一个线程正在修改当中(半新半旧)的数据。

简单来说,就是确定性强,好控制。

如果CPU的发展是按照当年X86、286、386、486、586、686的轨迹一直继续到今天的话,我们也许并不需要去改变它。然而,CPU单核的处理能力并不是可以无限提高的。事实上,在过去的10年当中,这种提升已经变得非常缓慢。

于是乎,CPU厂商开始动起了多核心的脑筋。这就好比车没法开得更快了,那么可以把车做得更大,多坐点儿人,同样可以提升(运送旅客)的效率。

但是这也就要求我们的程序能够在多个核心上跑,也就是要支持多线程。否则,我们将无法充分发挥现代CPU的处理能力。

有些事情天然是具有并行性质的。比如互联网。一个网页1个人在看和100万个人在看,在处理的流程上并没有什么不同,只不过是需要按人数进行重复。因此,即便CPU是单核的,我们依然可以通过增加CPU个数,或者服务器的台数来实现性能的提升,而且这种提升在相当大的一个区间当中是接近线性的。

但是很可惜,游戏并没有那么好运。游戏的实时交互性决定了它与用户操作的环环相扣的特点:你只有在得到用户输入之后,才能知道接下来该干些什么,显示什么,而且这个过程必须在ms级别很快完成。

我们来看一个典型的游戏基本循环:

接受用户输入--更新游戏状态--渲染出画面

这3个环节是有非常严格的次序的,不能交换,也不能并行。

而且,用户只有在看到更新之后的画面之后,才能知道下一个操作是啥。因此这3个部分所形成的循环节之间,也是无法并行的。

接下来考虑两个用户(A、B)对战的情况,他们之间能并行吗?well,A和B通过联网都在玩一个游戏,看起来他们是并行的。然而,他们两个的输入共同构成了对游戏的最终输入,同等地决定了游戏下一帧的状态。

也就是说,虽然两个人,两台设备,分别操作,但是在概念上实际上依然只有一个上面的游戏循环。

所以,当参与游戏的玩家越来越多时,游戏的技术难度实际上是在不断升级,性能实际上是在不断下降的。因为越来越多的时间需要消耗在同步分散的用户操作和他们的设备上,而游戏本体的循环依然只有一个,并享受不到横向扩展所带来的好处。

好了,扯得有点儿远了,让我们回到单机的情况。因为游戏的这种特点,我们并不能以让单机上多核每个跑一个游戏循环的方式来提升游戏的性能。我们能够做到的是,尽量缩短(加速)这个循环的时间,或者说,拓宽在一个循环当中所能做得事情的量。

比如,游戏当中有6个小兵。如果这6个小兵是独立行动的,那么他们的AI、位置更新、碰撞计算这些,就可以并行进行。即便它们之间有协作关系,在大多数时候,我们也可以将其解耦到2帧或者更多帧:每个小兵根据上一帧其它小兵的状态计算出自己接下来的状态。

这就很类似神经网络当中前一层节点和后一层节点的关系。每一层当中的节点相互之间是没有联系的。但是如果前一层代表它们在上一个时间点的状态,那么它们共同决定了后一层(现在)各个节点的状态。

前面说了游戏的短循环以及循环之间的顺序锁定决定了它不能很好并行化。但是换个角度,这种短循环也使得每次迭代的边界条件十分清晰:每一帧只需要考虑前一帧的状态即可,而且因为循环很短,帧间的状态结构变化是不大的。不像普通的应用程序,不同的用户操作流程直接导致整个界面呀模块呀都不一样,也就是基础结构(节点的拓扑结构)发生了变化。

我想这就是为啥游戏作为非常大规模的软件,看起来有着那么丰富的内容,但是总体上比一般类似规模的应用程序品质更加稳定,或者说更容易稳定的原因。

好了,扯了1600多字,其实都是白扯,因为我们的引擎AI部分还是一篇未开挖的泥土。但是我们还是有一个很好的可以利用并行的地方:资源的加载。

我们的引擎,因为单线程,而且图片解压缩自己写的,还没有优化,现在加载一个非常简单点场景(两个mesh、2个材质、1个天空盒)都需要5s+的时间。

对程序进行profile之后(比如使用visual studio的性能监视器),发现果不其然大部分时间都是花在了图片的加载上。那么一个很容易想到的方法就是将这部分进行并行化。

并行化文件读取一般有两种方式:

  1. 使用系统提供的异步IO接口。好处是调用者依然可以延续单线程模式,多线程部分由异步IO库实现。但是API接口以及使用方法发生变化,需要改动代码逻辑;
  2. 自己实现多线程管理。好处是可以重用之前的绝大部分代码。但是需要自己处理线程的创建销毁和管理,并且要防止data racing,就是一开篇提到的数据冲突。

考虑到我们的游戏资源的加载并不仅仅是读取文件,还有文件格式的读取和解压转换等操作,使用异步IO加速效果有限;况且今后别的环节(如AI/物理/渲染等)也不可避免地要多线程化,所以我决定采用方式2。

线程是一种操作系统提供的资源,它的创建需要系统调用的帮助。这就像我们无法在用户态下直接命令CPU将程序转移到另外一个核心一样,即便使用汇编都不行。这是因为这部分的特殊调用被限制在内核态,用户态是没有直接使用这些调用的权利的。

在早期,C/C++里面创建线程依靠pthread(或者类似的)库。这种方法今天依旧有效,只不过C++标准将其包装了一下,放入到标准库当中,称为std::thread,使其看起来更加面向对象。

#include <iostream>
#include <utility>
#include <thread>
#include <chrono>
 
void f1(int n)
{
    for (int i = 0; i < 5; ++i) {
        std::cout << "Thread 1 executing\n";
        ++n;
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}
 
void f2(int& n)
{
    for (int i = 0; i < 5; ++i) {
        std::cout << "Thread 2 executing\n";
        ++n;
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}
 
class foo
{
public:
    void bar()
    {
        for (int i = 0; i < 5; ++i) {
            std::cout << "Thread 3 executing\n";
            ++n;
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    }
    int n = 0;
};
 
class baz
{
public:
    void operator()()
    {
        for (int i = 0; i < 5; ++i) {
            std::cout << "Thread 4 executing\n";
            ++n;
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    }
    int n = 0;
};
 
int main()
{
    int n = 0;
    foo f;
    baz b;
    std::thread t1; // t1 is not a thread
    std::thread t2(f1, n + 1); // pass by value
    std::thread t3(f2, std::ref(n)); // pass by reference
    std::thread t4(std::move(t3)); // t4 is now running f2(). t3 is no longer a thread
    std::thread t5(&foo::bar, &f); // t5 runs foo::bar() on object f
    std::thread t6(b); // t6 runs baz::operator() on object b
    t2.join();
    t4.join();
    t5.join();
    t6.join();
    std::cout << "Final value of n is " << n << '\n';
    std::cout << "Final value of foo::n is " << f.n << '\n';
}

[1]

不过其实如果仅仅是这样除了看起来更加C++一点儿之外,并没有方便太多。上面提到了,thread的创建依赖系统调用,而且需要系统为其分配一些资源,比如线程描述子、专属堆栈等,还要将其登记到系统的线程调度器所维护的一张大表当中,从而得以分配到所需的硬件资源。所以这是一个相对开销较大的过程。

所以线程并非是越多越好的。一个是创建及销毁的开销,一个是加大系统调度器当中的表格导致的查询更新开销。

使用std::thread进行实现的时候,最容易想到的就是逐资源(逐文件)开线程。这当然可以,能跑起来。但是一个游戏所需的资源往往成千上万,开上千个线程并不是什么好主意。

如果线程的创建和销毁耗费资源,那么我们是不是尽量不要去创建新线程,或者说不要去销毁使用完毕的线程,而是重用它们就好了呢?

答案是肯定的。这就是线程池的概念。线程池就是事先创建几个“工作线程”,然后不断重用这些线程。在实现时,由于通常线程在执行完毕之后就进入zombie状态等待join而无法重用,我们需要用一个近似无限的循环来保持其永远不结束。但是这样一来我们就不能简单地通过创建线程时指定要执行的函数来指定任务,而是需要实现一种机制来给线程派发任务。而且对象线程很可能还没有完成之前的任务,这就是说我们还需要实现一种任务的排队机制。

这在以往(C++11之前)是比较麻烦的。因为不同的任务的返回值类型不同,而且它要跨越线程边界,我们需要保证对其访问不出现data-race。但是C++11为我们提供了std::future对象,它能够很好地包装跨越线程的返回值。同时,C++11还为我们提供了std::function以及lambda对象,它们能够将函数(任务)以类似普通参数的方式进行传递(虽然C语言的函数指针也能实现类似效果,但是从语法上来说显然后者更加清晰)

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

class ThreadPool {
public:
    ThreadPool(size_t);
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type>;
    ~ThreadPool();
private:
    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;
    // the task queue
    std::queue< std::function<void()> > tasks;
    
    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};
 
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
    :   stop(false)
{
    for(size_t i = 0;i<threads;++i)
        workers.emplace_back(
            [this]
            {
                for(;;)
                {
                    std::function<void()> task;

                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock,
                            [this]{ return this->stop || !this->tasks.empty(); });
                        if(this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }

                    task();
                }
            }
        );
}

// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);

        // don't allow enqueueing after stopping the pool
        if(stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace([task](){ (*task)(); });
    }
    condition.notify_one();
    return res;
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for(std::thread &worker: workers)
        worker.join();
}

#endif

[2]

事实上,C++11还为我们提供了std::async这个模版,使得我们都不用自己去实现Thread Pool。std::async模版提供至少两种执行模式:异步(async)和延迟(deffered)。这两种模式都会将任务交给工作线程,区别在于前者是在创建时立即开始执行,而后者在创建时不会执行,而是等待对其发生任何进一步操作(比如查询执行结果、或者是wait、或者是销毁)的时候开始执行。

std::async同样使用std::future来包装返回值。我们在创建std::async任务的时候,会得到这个std::future对象。之后我们就可以通过它查询异步执行的结果。

#include <iostream>
#include <vector>
#include <algorithm>
#include <numeric>
#include <future>
#include <string>
#include <mutex>
 
std::mutex m;
struct X {
    void foo(int i, const std::string& str) {
        std::lock_guard<std::mutex> lk(m);
        std::cout << str << ' ' << i << '\n';
    }
    void bar(const std::string& str) {
        std::lock_guard<std::mutex> lk(m);
        std::cout << str << '\n';
    }
    int operator()(int i) {
        std::lock_guard<std::mutex> lk(m);
        std::cout << i << '\n';
        return i + 10;
    }
};
 
template <typename RandomIt>
int parallel_sum(RandomIt beg, RandomIt end)
{
    auto len = end - beg;
    if (len < 1000)
        return std::accumulate(beg, end, 0);
 
    RandomIt mid = beg + len/2;
    auto handle = std::async(std::launch::async,
                             parallel_sum<RandomIt>, mid, end);
    int sum = parallel_sum(beg, mid);
    return sum + handle.get();
}
 
int main()
{
    std::vector<int> v(10000, 1);
    std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n';
 
    X x;
    // Calls (&x)->foo(42, "Hello") with default policy:
    // may print "Hello 42" concurrently or defer execution
    auto a1 = std::async(&X::foo, &x, 42, "Hello");
    // Calls x.bar("world!") with deferred policy
    // prints "world!" when a2.get() or a2.wait() is called
    auto a2 = std::async(std::launch::deferred, &X::bar, x, "world!");
    // Calls X()(43); with async policy
    // prints "43" concurrently
    auto a3 = std::async(std::launch::async, X(), 43);
    a2.wait();                     // prints "world!"
    std::cout << a3.get() << '\n'; // prints "53"
} // if a1 is not done at this point, destructor of a1 prints "Hello 42" here

[3]

所以,简单来说我们只需要使用这个std::async,对我们代码当中想要异步执行(并行执行)的部分进行一下简单的包装就可以了。

下面是改动之前的,同步加载贴图,以及对贴图进行色深变换(以符合GPU要求)的代码:

void LoadTexture() {
    if (!m_pImage)
    {
        // we should lookup if the texture has been loaded already to prevent
        // duplicated load. This could be done in Asset Loader Manager.
        Buffer buf = g_pAssetLoader->SyncOpenAndReadBinary(m_Name.c_str());
        std::string ext = m_Name.substr(m_Name.find_last_of('.'));
        if (ext == ".jpg" || ext == ".jpeg")
        {
            JfifParser jfif_parser;
            m_pImage = std::make_shared<Image>(jfif_parser.Parse(buf));
        }
        else if (ext == ".png")
        {
            PngParser png_parser;
            m_pImage = std::make_shared<Image>(png_parser.Parse(buf));
        }
        else if (ext == ".bmp")
        {
            BmpParser bmp_parser;
            m_pImage = std::make_shared<Image>(bmp_parser.Parse(buf));
        }
        else if (ext == ".tga")
        {
            TgaParser tga_parser;
            m_pImage = std::make_shared<Image>(tga_parser.Parse(buf));
        }
        else if (ext == ".dds")
        {
            DdsParser dds_parser;
            m_pImage = std::make_shared<Image>(dds_parser.Parse(buf));
        }
        else if (ext == ".hdr")
        {
            HdrParser hdr_parser;
            m_pImage = std::make_shared<Image>(hdr_parser.Parse(buf));
        }
    }
}

void AdjustTextureBitcount()
{
    // GPU does not support 24bit and 48bit textures, so adjust it
    if (m_pImage->bitcount == 24)
    {
        // DXGI does not have 24bit formats so we have to extend it to 32bit
        uint32_t new_pitch = m_pImage->pitch / 3 * 4;
        size_t data_size = new_pitch * m_pImage->Height;
        auto* data = new uint8_t[data_size];
        uint8_t* buf;
        uint8_t* src;
        for (uint32_t row = 0; row < m_pImage->Height; row++) {
            buf = data + row * new_pitch;
            src = m_pImage->data + row * m_pImage->pitch;
            for (uint32_t col = 0; col < m_pImage->Width; col++) {
                memcpy(buf, src, 3);
                memset(buf+3, 0x00, 1);  // set alpha to 0
                buf += 4;
                src += 3;
            }
        }

        delete m_pImage->data;
        m_pImage->data = data;
        m_pImage->data_size = data_size;
        m_pImage->pitch = new_pitch;
        m_pImage->bitcount = 32;
        
        // adjust mipmaps
        for (uint32_t mip = 0; mip < m_pImage->mipmap_count; mip++)
        {
            m_pImage->mipmaps[mip].pitch = m_pImage->mipmaps[mip].pitch / 3 * 4;
            m_pImage->mipmaps[mip].offset = m_pImage->mipmaps[mip].offset / 3 * 4;
            m_pImage->mipmaps[mip].data_size = m_pImage->mipmaps[mip].data_size / 3 * 4;
        }
    }
    else if (m_pImage->bitcount == 48)
    {
        // DXGI does not have 48bit formats so we have to extend it to 64bit
        uint32_t new_pitch = m_pImage->pitch / 3 * 4;
        size_t data_size = new_pitch * m_pImage->Height;
        auto* data = new uint8_t[data_size];
        uint8_t* buf;
        uint8_t* src;
        for (uint32_t row = 0; row < m_pImage->Height; row++) {
            buf = data + row * new_pitch;
            src = m_pImage->data + row * m_pImage->pitch;
            for (uint32_t col = 0; col < m_pImage->Width; col++) {
                memcpy(buf, src, 6);
                memset(buf+6, 0x00, 2); // set alpha to 0
                buf += 8;
                src += 6;
            }
        }

        delete m_pImage->data;
        m_pImage->data = data;
        m_pImage->data_size = data_size;
        m_pImage->pitch = new_pitch;
        m_pImage->bitcount = 64;
        
        // adjust mipmaps
        for (uint32_t mip = 0; mip < m_pImage->mipmap_count; mip++)
        {
            m_pImage->mipmaps[mip].pitch = m_pImage->mipmaps[mip].pitch / 3 * 4;
            m_pImage->mipmaps[mip].offset = m_pImage->mipmaps[mip].offset / 3 * 4;
            m_pImage->mipmaps[mip].data_size = m_pImage->mipmaps[mip].data_size / 3 * 4;
        }
    }
}

我们首先将这两部分合并成一个函数(当然也可以通过写一个函数按顺序调用这两个函数),并且将其中对于成员变量m_pImage的直接访问改为对临时自动变量image的访问,在函数的最末尾,通过原子操作以及memory order,将image存入m_pImage

bool SceneObjectTexture::LoadTexture() {
    if(!g_pAssetLoader->FileExists(m_Name.c_str())) return false;

    cerr << "Start async loading of " << m_Name << endl;

    Image image;
    Buffer buf = g_pAssetLoader->SyncOpenAndReadBinary(m_Name.c_str());
    string ext = m_Name.substr(m_Name.find_last_of('.'));
    if (ext == ".jpg" || ext == ".jpeg")
    {
        JfifParser jfif_parser;
        image = jfif_parser.Parse(buf);
    }
    else if (ext == ".png")
    {
        PngParser png_parser;
        image = png_parser.Parse(buf);
    }
    else if (ext == ".bmp")
    {
        BmpParser bmp_parser;
        image = bmp_parser.Parse(buf);
    }
    else if (ext == ".tga")
    {
        TgaParser tga_parser;
        image = tga_parser.Parse(buf);
    }
    else if (ext == ".dds")
    {
        DdsParser dds_parser;
        image = dds_parser.Parse(buf);
    }
    else if (ext == ".hdr")
    {
        HdrParser hdr_parser;
        image = hdr_parser.Parse(buf);
    }

    // GPU does not support 24bit and 48bit textures, so adjust it
    if (image.bitcount == 24)
    {
        // DXGI does not have 24bit formats so we have to extend it to 32bit
        uint32_t new_pitch = image.pitch / 3 * 4;
        size_t data_size = (size_t)new_pitch * image.Height;
        auto* data = new uint8_t[data_size];
        uint8_t* buf;
        uint8_t* src;
        for (uint32_t row = 0; row < image.Height; row++) {
            buf = data + (ptrdiff_t)row * new_pitch;
            src = image.data + (ptrdiff_t)row * image.pitch;
            for (uint32_t col = 0; col < image.Width; col++) {
                memcpy(buf, src, 3);
                memset(buf+3, 0x00, 1);  // set alpha to 0
                buf += 4;
                src += 3;
            }
        }

        delete[] image.data;
        image.data = data;
        image.data_size = data_size;
        image.pitch = new_pitch;
        image.bitcount = 32;
        
        // adjust mipmaps
        for (uint32_t mip = 0; mip < image.mipmap_count; mip++)
        {
            image.mipmaps[mip].pitch = image.mipmaps[mip].pitch / 3 * 4;
            image.mipmaps[mip].offset = image.mipmaps[mip].offset / 3 * 4;
            image.mipmaps[mip].data_size = image.mipmaps[mip].data_size / 3 * 4;
        }
    }
    else if (image.bitcount == 48)
    {
        // DXGI does not have 48bit formats so we have to extend it to 64bit
        uint32_t new_pitch = image.pitch / 3 * 4;
        size_t data_size = (size_t)new_pitch * image.Height;
        auto* data = new uint8_t[data_size];
        uint8_t* buf;
        uint8_t* src;
        for (uint32_t row = 0; row < image.Height; row++) {
            buf = data + (ptrdiff_t)row * new_pitch;
            src = image.data + (ptrdiff_t)row * image.pitch;
            for (uint32_t col = 0; col < image.Width; col++) {
                memcpy(buf, src, 6);
                memset(buf+6, 0x00, 2); // set alpha to 0
                buf += 8;
                src += 6;
            }
        }

        delete[] image.data;
        image.data = data;
        image.data_size = data_size;
        image.pitch = new_pitch;
        image.bitcount = 64;
        
        // adjust mipmaps
        for (uint32_t mip = 0; mip < image.mipmap_count; mip++)
        {
            image.mipmaps[mip].pitch = image.mipmaps[mip].pitch / 3 * 4;
            image.mipmaps[mip].offset = image.mipmaps[mip].offset / 3 * 4;
            image.mipmaps[mip].data_size = image.mipmaps[mip].data_size / 3 * 4;
        }
    }

    cerr << "End async loading of " << m_Name << endl;

    atomic_store_explicit(&m_pImage, make_shared<Image>(std::move(image)), std::memory_order::memory_order_release);

    return true;
}

然后,让我们新建一个方法,来创建异步加载贴图的任务:

void SceneObjectTexture::LoadTextureAsync() {
    if(!m_asyncLoadFuture.valid())
    {
        m_asyncLoadFuture = async(launch::async, &SceneObjectTexture::LoadTexture, this);
    }
}

其中,对于m_asyncLoadFuture.valid()的检查,是为了避免重复开启异步任务。当一个future是valid的时候,说明它处于share状态,也就是有异步线程正在使用它。对于这种情况,显然我们无需再次创建异步任务。

最后需要修改的是获取贴图的地方。这里需要考虑异步任务的完成状态,进行不同的操作:

std::shared_ptr<Image> SceneObjectTexture::GetTextureImage()
{ 
    if(m_asyncLoadFuture.valid())
    {
        m_asyncLoadFuture.wait();
        assert(m_asyncLoadFuture.get());
        return atomic_load_explicit(&m_pImage, std::memory_order::memory_order_acquire);
    }
    else
    {
        return m_pImage;
    }
}

这里目前对于未完成的贴图加载,是采取等待的方法。由于读取贴图的是渲染线程,这会造成线程的卡顿,也就是丢帧。更好的做法其实是返回一张临时的全局静态贴图。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK