3

聊聊多线程程序的load balance

 2 years ago
source link: https://blogread.cn/it/article/7248?f=hot1
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

聊聊多线程程序的load balance

浏览:1754次  出处信息

   说起load balance,一般比较容易想到的是大型服务在多个replica之间的load balance、和kernal的load balance。前者一般只是在流量入口做一下流量分配,逻辑相对简单;而后者则比较复杂,需要不断发现正在运行的各个进程之间的imbalance,然后通过将进程在CPU之间进行迁移,使得各个CPU都被充分利用起来。

   而本文想要讨论的load balance有别于以上两种,它是多线程(多进程)server程序内部,各个worker线程(进程)之间的load balance。

   考虑一种常用的server模型:一个receiver线程负责接收请求,后面有一个线程池装了一堆worker线程,收到的请求被分派给这些worker进行处理。receiver与worker之间通过pthread_cond+request_queue来进行通信。一般的做法是:receiver将收到的请求放入queue,然后signal一下cond,就OK了。具体哪个worker会被唤醒,那是kernel的事情(实际上kernel会遵循先来后到原则,唤醒先进入等待的进程,参阅《linux futex浅析》)。通常情况下这样做就足够了,receiver唤醒worker不需要涉及load balance的逻辑。但是有时候我们还是可以做一些load balance的工作,来提高server的性能。

kernel load balance概述

   由于这里的load balance跟kernel的load balance息息相关,所以我们有必要先看看kernel的load balance都做了些什么。详细的内容请参阅《linux内核SMP负载均衡浅析》,这里只做一些简要的概括。

   说白了,kernel的load balance就做一件事情: 让系统中RUNNING状态的进程尽可能的被分摊,在每一个调度域上看都是balance的 。怎么理解呢?现在CPU的结构一般有:物理CPU、core、超线程、这么几个层次。”在每一个调度域上看都balance”可以理解为在每一个层次上都balance:每个物理CPU上的总load相当、每个core上的总load相当、每个超线程上的load也相当。

   我们在系统中看到的”CPU”都是最底层的超线程这个层次,我们可能会直观的认为把RUNNING状态的进程分摊到每一个”CPU”上就行了,但是实际上kernel的load balance还有更高的要求。假设我们的机器有2个物理CPU、每个物理CPU有2个core、每个core有2个超线程,共8个”CPU”。如果现在有8个RUNNING状态的进程(假设优先级都相同),每个”CPU”各分摊一个进程,那么自然就是balance的。但是如果现在只有4个RUNNING状态的进程(假设优先级都相同),真正的balance并不仅仅是每个进程各自落到一个”CPU”上就行了,而是进一步要求每个物理CPU上跑两个进程、每个core上跑一个进程。

   为什么要有这样的强约束呢?因为尽管各个”CPU”逻辑上是独立的(不存在主从关系之类),但它们并非孤立存在。相同物理CPU下的”CPU”会共享cache、相同core下的”CPU”会共享计算资源(所谓的超线程也就是一套流水线跑两个线程)。而共享也就意味着争抢。所以,在RUNNING状态的进程并非正好均摊给每一个”CPU”的情况下,需要考虑更高层次的CPU是否被均摊,以避免cache和CPU流水线的争抢(当然,除了性能,这也体现了kernel的公平性)。

   最后再多提一点,kernel的load balance是异步的。为避免占用过多资源,kernel肯定不可能实时监控各个”CPU”的情况,然后面对变化实时的做出反应(当然,实时进程除外,但这不在我们讨论范围内)。

server的load balance考虑

   有了kernel的load balance作为铺垫,看看我们server上的receiver线程能做些什么吧。

   首先是worker线程的数量问题。如果worker数量过多会发生什么情况?还是假设我们的机器有上述的8个”CPU”,假设我们开了80个worker,再假设这80个线程被平均分派到每一个”CPU”上,等待处理任务。当一堆请求陆续到来的时候,由于我们的receiver没有任何load balance的策略,被唤醒的worker出现在哪个”CPU”上可以说是随机的。你想想,”同时”到来的8个请求正好落到8个不同”CPU”上的概率是多少?是:(70*60*50*40*30*20*10)/(79*78*77*76*75*74*73)=0.34%。也就是说几乎肯定会出现某些”CPU”要处理多个请求、某些”CPU”却闲着没事干的情况,系统的性能可想而知。而等到后知后觉的kernel load balance将这些请求balance到每一个”CPU”上时,可能请求已经处理得差不多了,等到下一批请求到来时,load又还是凌乱的。因为刚刚已经balance好的那些worker线程又被放回到了cond等待队列的尾部,而优先响应新请求的则是那些位于队列头部的未曾被balance过的worker。

   那么会不会经历几轮请求之后就能达到balance了呢?如果请求真的是一轮一轮的过来,并且每个请求的处理时间完全相同,那么有可能会达到balance,但是实际情况肯定相差甚远。

   解决办法是什么呢?将cond先进先出的队列式等待逻辑改为后进先出的栈式逻辑,或许可以解决问题,但是更好的办法应该是限制worker的数目等于或者略小于”CPU”数目,这样很自然的就balance了。

   第二个问题,既然我们承认kernel在各个调度域上的load balance的有意义的,我们server中的receiver线程是不是也可以通过类似的办法来获得收益呢?现在我们吸取了之前的教训,只开了8个worker线程。依靠kernel load balance的作用,这8个线程基本会固定在每一个”CPU”上。假设现在一下子来了4个请求,它们会落到4个不同的”CPU”上,如果运气好,这4个”CPU”分别属于不同的core,那么处理请求的过程就不会涉及CPU资源的争抢;反之,可能形成2个core非常忙、2个core闲着的局面。

   要解决这个问题需要做到两点,继续以我们之前的server程序为例。首先,receiver线程要知道各个worker线程都落在哪一个”CPU”上;然后在分派任务时还需要有balance的眼光。要做到第一点,最好是借助sched_affinity功能将线程固定在某个”CPU”上,避免kernel load balance把问题搞复杂了。既然前面我们已经得出了工作线程数等于或略小于CPU数的结论,现在每个线程固定在一个CPU上就是可行的。第二点,我们需要在现有pthread_cond的基础上做一些改进,给进入等待状态的worker线程赋一个优先级,比如每个core的第一个超线程作为第一优先级,第二个超线程为第二优先级。那么在cond唤醒工作线程的时候,我们就可以尽量让worker线程不落到同一个core上。实现上可以利用futex的bitset系列功能,通过bitset来标识优先级,以便在唤醒指定的worker线程。(参阅《linux futex浅析》。)

   好了,纸上谈兵讲了这么多,得来点实际的例子验证一下。为了简单,就不写什么server程序了,只需要一个生产者线程和若干消费者线程。生产者线程生成一些任务,通过cond+queue将其传递给消费者线程。为了观察在不同任务负载下的程序表现,我们需要控制任务负载。消费者线程在完成任务后通过另一组cond+queue把任务应答给生产者线程,于是生产者就知道当前有多少个任务正在处理中,以便控制生产新任务的节奏。最后,我们通过观察在不同条件下完成一批任务的时间来体会程序的性能。

   这里面比较关键的是任务本身的处理逻辑,既然我们讨论的是CPU的负载,任务肯定应该是CPU密集型的任务。然后,单个任务的处理时间不宜太短,否则可能调度过程会成为程序的瓶颈,体现不出CPU的负载问题;另一方面,单个任务的处理时间也不宜太长,否则后知后觉的kernel load balance也能解决问题,体现不出我们主动做load balance的好处(比如任务处理时间是10秒,kernel load balance花费几十毫秒来解决balance问题其实也无伤大雅)。

   代码贴在文章最后,编译出来的bin文件是这样的:

$g++ cond.cpp -pthread -O2
$./a.out
usage: ./a.out -j job_kind=shm|calc [-t thread_count=1] 
[-o job_load=1] [-c job_count=10] [-a affinity=0] [-l] 
[-f filename="./TEST" -n filelength=128M]
  • 代码里面准备了两种任务逻辑,”-j shm”是mmap一个文件,然后读取上面的数据做一些运算(文件及其长度由-f和-n参数来限定);”-j calc”是做一些算术运算;

  • “-t”参数指定工作线程的线程数;

  • “-o”指定任务负载;

  • “-c”指定单个线程处理任务的个数;

  • “-a”指定是否设置sched_affinity,并且指明跳几个”CPU”放一个worker线程。比如”-a 1″表示把worker线程顺序固定在1、2、3、……号”CPU”上,而”-a 2″表示固定在2、4、6、……号”CPU”上,以此类推。需要注意的是,邻近的”CPU”号并不表示”CPU”在物理上是邻近的,比如在我测试用的机器上,共24个”CPU”,0~11号是每个core的第一个超线程、12~23是第二个超线程。这个细节需要读/proc/cpuinfo来确定。

  • “-l”参数指定启用我们增强版的分级cond,启用的话会将0~11号worker作为第一优先级,12~23作为第二优先级(当然,需要配合”-a”参数才有实际意义,否则也不确定这些worker都落在哪些”CPU”上);

   首先来看worker线程过多所带来的问题(以下case各运行5次取时间最小值)。

case-1,启240个worker线程,24个任务负载:
$./a.out -j calc -t 240 -o 24
total cost: 23790
$./a.out -j shm -t 240 -o 24
total cost: 16827

case-2,启24个worker线程,24个任务负载:
$./a.out -j calc -t 24 -o 24
total cost: 23210
$./a.out -j shm -t 24 -o 24
total cost: 16121

   case-2效果明显要好略一些。并且在运行过程中如果用top观察的话,你会发现case-1只能压到2200%左右的CPU,而case-2几乎能达到2400%。

   在case-1的基础上,如果禁止kernel load balance会怎样?加affinity试试看:

case-3,启240个worker线程,24个任务负载,加affinity:
$./a.out -j calc -t 240 -o 24 -a 1
total cost: 27170
$./a.out -j shm -t 240 -o 24 -a 1
total cost: 15351

   calc任务比较符合预期,没有kernel load balance的情况下,性能继续下降。

   而shm任务则让人大跌眼镜,性能居然提升了!其实这个任务除了CPU之外还很依赖于内存,因为所有任务都工作在同一个文件的mmap上,”CPU”挨得近反而更能发挥内存cache。(可见在这种情况下,kernel load balance其实是帮了倒忙。)

   那么,我们将工作线程再调回24,是不是应该更理想?

case-3'
$./a.out -j shm -t 24 -o 24 -a 1
total cost: 15133

   再来看第二个问题,worker线程站位不均所带来的影响。

case-4,启24个worker线程,12个任务负载:
$./a.out -j calc -t 24 -o 12
total cost: 14686
$./a.out -j shm -t 24 -o 12
total cost: 13265

case-5,启24个worker线程,12个任务负载,加affinity,启用分级cond:
$./a.out -j calc -t 24 -o 12 -a 1 -l
total cost: 12206
$./a.out -j shm -t 24 -o 12 -a 1 -l
total cost: 12376

   效果还是不错的。改一下”-a”参数,让同一个core的两个超线程都分在同一优先级呢?

case-5'
$./a.out -j calc -t 24 -o 12 -a 2 -l
total cost: 23510
$./a.out -j shm -t 24 -o 12 -a 2 -l
total cost: 15063

   由于争抢CPU资源,calc任务性能变得很差,几乎减半。而shm任务由于cache复用所带来的好处,情况还好(比case-3还略好一些)。

   这里的任务只是举了calc和shm两个例子,实际情况可能是很复杂的。尽管load balance的问题肯定存在,但是任务会因共享cache而得利、还是因争抢cache而失利?争抢CPU流水线又会造成多大的损失?这些都只能具体问题具体分析。kernel的load balance将负载尽量均摊到离得远的”CPU”上,大多数情况下没有问题。不过我们也看到shm任务中cache共享的收益还是很大的,如果例子更极端一点,肯定会出现承受负载的CPU离得越近,反而效果越好的情况。

   另一方面,争抢CPU流水线会有多大损失,也可以简单的分析一下。超线程相当于两个线程共用一套CPU流水线,如果单个线程的代码上下文依赖很严重,指令基本上只能串行工作,无法充分利用流水线,那么流水线的空余能力就可以留给第二个线程使用。反之如果一个线程就能把流水线填满,硬塞两个线程进来肯定就只能有50%的性能(上述calc的例子就差不多是这样)。

   为了说明这个问题,我们给calc任务加了一个SERIAL_CALC的宏开关,让它的运算逻辑变成上下文强依赖。然后重跑case-5中的两个命令,我们会看到其实在这种情况下承受负载的CPU离得近一些似乎也问题不大:

case-6,采用SERIAL_CALC运算逻辑,重跑case-5中的calc任务
$g++ cond.cpp -pthread -O2 -DSERIAL_CALC
$./a.out -j calc -t 24 -o 12 -a 1 -l
total cost: 51269
$./a.out -j calc -t 24 -o 12 -a 2 -l
total cost: 56753

   最后是代码,有兴趣你还可以尝试更多的case,have fun!

#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sched.h>
#include <sys/types.h>
#include <errno.h>
#include <string.h>
#include <linux/futex.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <math.h>
#include <sys/syscall.h>

#define CPUS    24
#define FUTEX_WAIT_BITSET   9
#define FUTEX_WAKE_BITSET   10

struct Job
{
    long _input;
    long _output;
};

class JobRunner
{
public:
    virtual void run(Job* job) = 0;
};

class ShmJobRunner : public JobRunner
{
public:
    ShmJobRunner(const char* filepath, size_t length)
            : _length(length) {
        int fd = open(filepath, O_RDONLY);
        _base = (long*)mmap(NULL, _length*sizeof(long),
                PROT_READ, MAP_SHARED|MAP_POPULATE, fd, 0);
        if (_base == MAP_FAILED) {
            printf("FATAL: mmap %s(%lu) failed!\n",
                    filepath, _length*sizeof(long));
            abort();
        }
        close(fd);
    }
    virtual void run(Job* job) {
        long i = job->_input % _length;
        long j = i + _length - 1;
        const int step = 4;
        while (i + step < j) {
            if (_base[i%_length] * _base[j%_length] > 0) {
                j -= step;
            }
            else {
                i += step;
            }
        }
        job->_output = _base[i%_length];
    }
private:
    const long* _base;
    size_t _length;
};

class CalcJobRunner : public JobRunner
{
public:
    virtual void run(Job* job) {
        long v1 = 1;
        long v2 = 1;
        long v3 = 1;
        for (int i = 0; i < job->_input; i++) {
#ifndef SERIAL_CALC
            v1 += v2 + v3;
            v2 *= 3;
            v3 *= 5;
#else
            v1 += v2 + v3;
            v2 = v1 * 5 + v2 * v3;
            v3 = v1 * 3 + v1 * v2;
#endif
        }
        job->_output = v1;
    }
};

class JobRunnerCreator
{
public:
    static JobRunner* create(const char* name,
            const char* filepath, size_t filelength) {
        if (strcmp(name, "shm") == 0) {
            printf("share memory job\n");
            return new ShmJobRunner(filepath, filelength);
        }
        else if (strcmp(name, "calc") == 0) {
            printf("caculation job\n");
            return new CalcJobRunner();
        }
        printf("unknown job '%s'\n", name);
        return NULL;
    }
};

class Cond
{
public:
    virtual void lock() = 0;
    virtual void unlock() = 0;
    virtual void wait(size_t) = 0;
    virtual void wake() = 0;
};

class NormalCond : public Cond
{
public:
    NormalCond() {
        pthread_mutex_init(&_mutex, NULL);
        pthread_cond_init(&_cond, NULL);
    }
    ~NormalCond() {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }
    void lock() { pthread_mutex_lock(&_mutex); }
    void unlock() { pthread_mutex_unlock(&_mutex); }
    void wait(size_t) { pthread_cond_wait(&_cond, &_mutex); }
    void wake() { pthread_cond_signal(&_cond); }
private:
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
};

class LayeredCond : public Cond
{
public:
    LayeredCond(size_t layers = 1) : _value(0), _layers(layers) {
        pthread_mutex_init(&_mutex, NULL);
        if (_layers > sizeof(int)*8) {
            printf("FATAL: cannot support such layer %u (max %u)\n",
                    _layers, sizeof(int)*8);
            abort();
        }
        _waiters = new size_t[_layers];
        memset(_waiters, 0, sizeof(size_t)*_layers);
    }
    ~LayeredCond() {
        pthread_mutex_destroy(&_mutex);
        delete _waiters;
        _waiters = NULL;
    }
    void lock() {
        pthread_mutex_lock(&_mutex);
    }
    void unlock() {
        pthread_mutex_unlock(&_mutex);
    }
    void wait(size_t layer) {
        if (layer >= _layers) {
            printf("FATAL: layer overflow (%u/%u)\n", layer, _layers);
            abort();
        }
        _waiters[layer]++;
        while (_value == 0) {
            int value = _value;
            unlock();
            syscall(__NR_futex, &_value, FUTEX_WAIT_BITSET, value,
                    NULL, NULL, layer2mask(layer));
            lock();
        }
        _waiters[layer]--;
        _value--;
    }
    void wake() {
        int mask = ~0;
        lock();
        for (size_t i = 0; i < _layers; i++) {
            if (_waiters[i] > 0) {
                mask = layer2mask(i);
                break;
            }
        }
        _value++;
        unlock();
        syscall(__NR_futex, &_value, FUTEX_WAKE_BITSET, 1,
                NULL, NULL, mask);
    }
private:
    int layer2mask(size_t layer) {
        return 1 << layer;
    }
private:
    pthread_mutex_t _mutex;
    int _value;
    size_t* _waiters;
    size_t _layers;
};

template<class T>
class Stack
{
public:
    Stack(size_t size, size_t cond_layers = 0) : _size(size), _sp(0) {
        _buf = new T*[_size];
        _cond = (cond_layers > 0) ?
            (Cond*)new LayeredCond(cond_layers) : (Cond*)new NormalCond();
    }
    ~Stack() {
        delete []_buf;
        delete _cond;
    }
    T* pop(size_t layer = 0) {
        T* ret = NULL;
        _cond->lock();
        do {
            if (_sp > 0) {
                ret = _buf[--_sp];
            }
            else {
                _cond->wait(layer);
            }
        } while (ret == NULL);
        _cond->unlock();
        return ret;
    }
    void push(T* obj) {
        _cond->lock();
        if (_sp >= _size) {
            printf("FATAL: stack overflow\n");
            abort();
        }
        _buf[_sp++] = obj;
        _cond->unlock();
        _cond->wake();
    }
private:
    const size_t _size;
    size_t _sp;
    T** _buf;
    Cond* _cond;
};

inline struct timeval cost_begin()
{
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return tv;
}

inline long cost_end(struct timeval &tv)
{
    struct timeval tv2;
    gettimeofday(&tv2, NULL);
    tv2.tv_sec -= tv.tv_sec;
    tv2.tv_usec -= tv.tv_usec;
    return tv2.tv_sec*1000+tv2.tv_usec/1000;
}

struct ThreadParam
{
    size_t layer;
    Stack<Job>* inputQ;
    Stack<Job>* outputQ;
    JobRunner* runner;
};

void* thread_func(void *data)
{
    size_t layer = ((ThreadParam*)data)->layer;
    Stack<Job>* inputQ = ((ThreadParam*)data)->inputQ;
    Stack<Job>* outputQ = ((ThreadParam*)data)->outputQ;
    JobRunner* runner = ((ThreadParam*)data)->runner;

    while (1) {
        Job* job = inputQ->pop(layer);
        runner->run(job);
        outputQ->push(job);
    }
    return NULL;
}

void force_cpu(pthread_t t, int n)
{
    cpu_set_t cpus;
    CPU_ZERO(&cpus);
    CPU_SET(n, &cpus);
    if (pthread_setaffinity_np(t, sizeof(cpus), &cpus) != 0) {
        printf("FATAL: force cpu %d failed: %s\n", n, strerror(errno));
        abort();
    }
}

void usage(const char* bin)
{
    printf("usage: %s -j job_kind=shm|calc "
        "[-t thread_count=1] [-o job_load=1] [-c job_count=10] "
        "[-a affinity=0] [-l] "
        "[-f filename="./TEST" -n filelength=128M]\n", bin);
    abort();
}

int main(int argc, char* const* argv)
{
    int THREAD_COUNT = 1;
    int JOB_LOAD = 1;
    int JOB_COUNT = 10;
    int AFFINITY = 0;
    int LAYER = 0;
    char JOB_KIND[16] = "";
    char FILEPATH[1024] = "./TEST";
    size_t LENGTH = 128*1024*1024;
    for (int i = EOF;
        (i = getopt(argc, argv, "t:o:c:a:j:lf:n:")) != EOF;) {
        switch (i) {
        case 't': THREAD_COUNT = atoi(optarg); break;
        case 'o': JOB_LOAD = atoi(optarg); break;
        case 'c': JOB_COUNT = atoi(optarg); break;
        case 'a': AFFINITY = atoi(optarg); break;
        case 'l': LAYER = 2; break;
        case 'j': strncpy(JOB_KIND, optarg, sizeof(JOB_KIND)-1); break;
        case 'f': strncpy(FILEPATH, optarg, sizeof(FILEPATH)-1); break;
        case 'n': LENGTH = atoi(optarg); break;
        default: usage(argv[0]); break;
        }
    }
    JobRunner* runner = JobRunnerCreator::create(
            JOB_KIND, FILEPATH, LENGTH);
    if (!runner) {
        usage(argv[0]);
    }

    srand(0);
    Job jobs[JOB_LOAD];

#ifdef TEST_LOAD
    for (int i = 0; i < JOB_LOAD; i++) {
        jobs[i]._input = rand();
        struct timeval tv = cost_begin();
        runner->run(&jobs[i]);
        long cost = cost_end(tv);
        printf("job[%d](%ld)=(%ld) costs: %ld\n",
                i, jobs[i]._input, jobs[i]._output, cost);
    }
    delete runner;
    return 0;
#endif

    printf("use layer %d\n", LAYER);
    Stack<Job> inputQ(JOB_LOAD, LAYER);
    Stack<Job> outputQ(JOB_LOAD, LAYER);

    pthread_t t;
    ThreadParam param[THREAD_COUNT];

    printf("thread init: ");
    for (int i = 0; i < THREAD_COUNT; i++) {
        int cpu = AFFINITY ? (i/AFFINITY+i%AFFINITY*CPUS/2)%CPUS : -1;
        size_t layer = !!(LAYER && i % CPUS >= CPUS/2);
        param[i].inputQ = &inputQ;
        param[i].outputQ = &outputQ;
        param[i].runner = runner;
        param[i].layer = layer;
        pthread_create(&t, NULL, thread_func, (void*)&param[i]);
        if (cpu >= 0) {
            printf("%d(%d|%d),", i, cpu, layer);
            force_cpu(t, cpu);
        }
        else {
            printf("%d(*|%d),", i, layer);
        }
        usleep(1000);
    }
    printf("\n");

    struct timeval tv = cost_begin();
    for (int i = 0; i < JOB_LOAD; i++) {
        jobs[i]._input = rand();
        inputQ.push(&jobs[i]);
    }
    for (int i = 0; i < JOB_LOAD*JOB_COUNT; i++) {
        Job* job = outputQ.pop();
        job->_input = rand();
        inputQ.push(job);
    }
    for (int i = 0; i < JOB_LOAD; i++) {
        outputQ.pop();
    }
    long cost = cost_end(tv);
    printf("total cost: %ld\n", cost);

    delete runner;
    return 0;
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK