2

Dlang 并行化 - jeefy

 1 year ago
source link: https://www.cnblogs.com/jeefy/p/17512683.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Dlang 并行化

好难受,dlang 生态太差,没办法,学了半天才明白。

我尽量以精炼的语言解释。

采用 定义,例子(代码),解释 的步骤讲解。

所以你可能看到很多代码,一点解释……

我会省略一些 import,让代码短一些

parallelism 并行

感觉好废物,这一小部分了解即可。

这部分只需要会 parallelmap & amap 其实就差不多了。

介绍比较实用的几种方法。

parallel 迭代

foreach (i; parallel(range, work_uint_size = 100)) {
    // do something here
}

其中 work_unit_size 表示最多同时运行的数量。

例子

import std.stdio, std.parallelism;
import core.thread;

struct Producer {
    void produce() {
        Thread.sleep(1.seconds);

        writeln("Process +1");
    }
};

void main() {
    auto prods = new Producer[](10);

    foreach (prod; parallel(prods)) {
        prod.produce();
    }
}

创建任务:

auto theTask = task!anOperation(arguments);
// or
auto theTask = task(&someFunction, parameters...)

运行任务:theTask.executeInNewThread()

查看是否完成:if (theTask.done) { ... }

获取结果:auto result = theTask.yeildForce()


asyncBuf

感觉没啥用。

并行保存多个需要长时间制作的元素。还需要保证使用的长时间的……

struct Producer {
    int i, total;

    bool empty() const {
        return total <= i;
    }

    int front() const {
        return i;
    }

    void popFront() {
        writefln("Producing product ID: %d", i);
        Thread.sleep(1.seconds / 2);
        ++i;
    }
};

void main() {
    auto prods = Producer(0, 10);
    foreach (prod; taskPool.asyncBuf(prods, 3)) {
        writef("Got product id: %d\n", prod);
        Thread.sleep(1.seconds);
        writeln("Used product...");
    }
}

map & amap

先看例子:

int increase(int x) {
    Thread.sleep(500.msecs);
    return x + 3;
}

void main() {
    int[] nums;
    foreach (i; 0 .. 10) {
        nums ~= i;
    }

    // auto results = taskPool.map!increase(nums);
    auto results = taskPool.amap!increase(nums);
    foreach (result; results) {
        writeln(result);
    }
}

可以类比 python 中的 map

两者的区别:

  • map 可以指定同时运行的数量,而 amap 是有多少运行多少。

  • map 会一定程度上按顺序执行,而 amap 并不是顺序执行,它依靠 RandomAccessRange,也就是随机顺序执行。


我不知道怎么翻译,反正就是 Message Passing Concurrency

核心方法: spawn (唤起)

我们可以形象的认为,spawn 方法可以唤起一个新的工人(线程)来为我们工作。

并且这个工人与主线程是分开的(先看代码后面解释):

import std.stdio;
import std.concurrency;
import core.thread;
void worker() {
    foreach (i; 0 .. 5) {
        Thread.sleep(500.msecs);
        writeln(i, " (worker) in ", thisTid);

    }

}
void main() {
    Tid myWorkerTid = spawn(&worker);
    foreach (i; 0 .. 5) {
        Thread.sleep(300.msecs);
        writeln(i, " (main) in ", thisTid);

    }

    writeln("main is done!");
}

最终输出:

0 (main) in Tid(7f0eb19bc0b0)
0 (worker) in Tid(7f0eb19bc000)
1 (main) in Tid(7f0eb19bc0b0)
2 (main) in Tid(7f0eb19bc0b0)
1 (worker) in Tid(7f0eb19bc000)
3 (main) in Tid(7f0eb19bc0b0)
2 (worker) in Tid(7f0eb19bc000)
4 (main) in Tid(7f0eb19bc0b0)
main is done!
3 (worker) in Tid(7f0eb19bc000)
4 (worker) in Tid(7f0eb19bc000)

实际输出可能略有差异。

解释

  • spawn(&worker) 唤起了一个新的线程运行 worker 函数,并返回了新的线程的 id 是一个结构体 Tid

  • thisTid 类似于一个宏,用于获取当前所在线程的 id


先看代码后解释:

void worker() {
    int value = 0;
    while (value >= 0) {
        value = receiveOnly!int();
        double result = cast(double)value / 7;
        ownerTid.send(result);
    }
}

void main() {
    Tid myWorker = spawn(&worker);

    foreach (val; 0 .. 10) {
        myWorker.send(val);
        double result = receiveOnly!double();
        writefln("Send %s got %s", val, result);
    }

    myWorker.send(-1); // terminate worker process
}

最终输出:

Send 0 got 0
Send 1 got 0.142857
Send 2 got 0.285714
Send 3 got 0.428571
Send 4 got 0.571429
Send 5 got 0.714286
Send 6 got 0.857143
Send 7 got 1
Send 8 got 1.14286
Send 9 got 1.28571

解释

  • ownerTid 类似于一个宏,用于取得唤醒自己的线程的 Tid,从而发送消息。

  • Tid.send(...) 可以向 Tid 代表的那个线程发送一条消息。

    • 如果同时要发送多个东西,在发送的地方是 Tid.send(a, b, c, ...)

    • 在接受的地方要变化为 receiveOnly!(typeof(a), typeof(b), typeof(c), ...),最终得到的是一个 tuple,可以通过下标访问。

  • receiveOnly!type() 表示只接受类型为 type 的消息。

  • 最后 myWorker.send(-1) 是根据代码逻辑结束的,并不属于通法。

如果我们需要更灵活的接受方法怎么办?

void workerFunc() {
    bool isDone = false;
    while (!isDone) {
        void intHandler(int message) {
            writeln("handling int message: ", message);

            if (message == -1) {
                writeln("exiting");
                isDone = true;
            }
        }

        void stringHandler(string message) {
            writeln("handling string message: ", message);
        }
        
        receive(&intHandler, &stringHandler);
    }    
}

我们可以指定多种 Handler 以处理不同的数据类型。利用 receive 注册 到处理类型消息的函数中。


更优雅的方式

处理更多的类型:

struct Exit {}

void worker() {
    bool done = false;

    while (!done) {
        receive(
            (int message) {
                writeln("int message ", message);
            },

            (string message) {
                writeln("string message", message);
            },

            (Exit message) {
                writeln("Exit message");
                done = true;
            },

            (Variant message) {
                writeln("Unexpected message: ", message);
            }
        );
    }
}

void main() {
    Tid myWorker = spawn(&worker);

    myWorker.send(10);
    myWorker.send("hello");
    myWorker.send(10.1);
    myWorker.send(Exit());
}

主要是使用了匿名函数……

解释

  • 利用 std.variant.Variant 以接收任何类型的数据。但是需要保证,处理所有类型数据的方法应该放在最后面,不然会导致全部被判断成 Variant

我们可以定一个超时时间,超过这个时间就直接返回。

先看代码:

struct Exit {}

void worker() {
    bool done = false;

    while (!done) {
        bool received = receiveTimeout(600.msecs,
            (Exit message) {
                writeln("Exit message");
                done = true;
            },

            (Variant message) {
                writeln("Some message: ", message);
            }
        );
        if (!received) {
            writeln("no message yet...");
        }
    }
}

void main() {
    Tid myWorker = spawn(&worker);

    myWorker.send(10);
    myWorker.send("hello");
    Thread.sleep(1.seconds);
    myWorker.send(10.1);
    myWorker.send(Exit());
}

最终输出

Some message: 10
Some message: hello
no message yet...
Some message: 10.1
Exit message

解释

  • receiveTimeout 只比 recieve 多了一个参数,用于指定超时时间。

  • 返回一个 bool 变量,如果为 false 则没有接收到任何消息。


等待所有线程结束thread_joinAll()

一般来说放在需要放的地方……即可。


终于讲到这里了。

我们先考虑一个程序:

import std.stdio;
import std.concurrency;
import core.thread;

int variable;

void printInfo(string message) {
    writefln("%s: %s (@%s)", message, variable, &variable);
}

void worker() {
    variable = 42;
    printInfo("Before the worker is terminated");
}

void main() {
    spawn(&worker);
    thread_joinAll();
    printInfo("After the worker is terminated");
}

其输出是这样的:

Before the worker is terminated: 42 (@7F308C88C530)
After the worker is terminated: 0 (@7F308C98D730)

可以发现,同样的变量在不同的线程里面地址是不一样的,也就是说数据是独立的,所以要有共享。

此时我们只需要修改:

shared int variable;

实际上写为 shared(int) variable; 会更标准,但是好麻烦……

当然,不得不说,有了消息传递,那么数据共享就是备用的方案了。


Data Race

数据竞争是一个很常见的问题。

例子

void worker(shared int* i) {
    foreach (t; 0 .. 200000) {
        *i = *i + 1;
    }
}

void main() {
    shared int i = 0;

    foreach (id; 0 .. 10) {
        spawn(&worker, &i);
    }

    thread_joinAll();
    writeln("after i to ", i);
}

期望输出 2000000,但是实际输出可能远小于此。

所以我们要考虑同步:

void worker(shared int* i) {
    foreach (t; 0 .. 200000) {
        synchronized {
            *i = *i + 1;
        }
    }
}

解释

  • synchronized 会隐式地创建一个锁,保证只有一个线程会持有这个锁,并且执行这些操作。

  • 有些时候,synchronized 会使得因为等待锁的额外开销使得程序变慢。但有些时候,我们可以通过更好的方法避免等待的开销,例如使用原子操作。

  • synchronized 创建的锁只会对于这一个代码块生效,不会影响到其他的代码块。


void increase(shared int* i) {
    foreach (t; 0 .. 200000) {
        synchronized {
            *i = *i + 1;
        }
    }
}

void decrese(shared int* i) {
    foreach (t; 0 .. 200000) {
        synchronized {
            *i = *i - 1;
        }
    }
}

void main() {
    shared int i = 0;

    foreach (id; 0 .. 10) {
        if (id & 1) spawn(&increase, &i);
        else spawn(&decrese, &i);
    }

    thread_joinAll();
    writeln("after i to ", i);
}

期望输出 0 但是实际输出……不知道。所以我们需要共用锁:

synchronized (lock_object) {
    // ...
}

修改后的代码

class Lock {}
shared Lock lock = new Lock();

void increase(shared int* i) {
    foreach (t; 0 .. 200000) {
        synchronized (lock) {
            *i = *i + 1;
        }
    }
}

void decrese(shared int* i) {
    foreach (t; 0 .. 200000) {
        synchronized (lock) {
            *i = *i - 1;
        }
    }
}

现在就可以得到正确的答案了。


我们可以使用 synchronized 修饰一个类。这相当于在每一个代码块里面嵌套一个 synchronzied

synchronized class Cls {
    void func() {
        // ...
    }
}

上面的等价于:

class Cls {
    void func() {
        synchronized (this) {
            // ...
        }
    }
}

同步初始化

我们考虑这份代码:

static this() {
    writeln("executing static this()");
}

void worker() {
}
void main() {
    spawn(&worker);
    thread_joinAll();
}

最终会输出两次 executing static this()

如果我们修改为 shared static this() { ... },那么最终只会输出一次。


需要用到 core.atomic 库。

atomic!"+="(var, x);
atomic!"-="(var, x);
// ... like *= /= ^= ...

这些都是原子操作。

shared(int) *value;
bool is_mutated = cas(value, currentValue, newValue);

如果返回 true,那么值会改变,否则没有。

原子操作一般来说快于 synchronized

同时,原子操作也可以作用于结构体上,这里不作为讲解。

更多操作可以参考标准库:

  • core.sync.barrier

  • core.sync.condition

  • core.sync.config

  • core.sync.exception

  • core.sync.mutex

  • core.sync.rwmutex

  • core.sync.semaphore


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK