Dlang 并行化 - jeefy
source link: https://www.cnblogs.com/jeefy/p/17512683.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
Dlang 并行化
好难受,dlang 生态太差,没办法,学了半天才明白。
我尽量以精炼的语言解释。
采用 定义,例子(代码),解释 的步骤讲解。
所以你可能看到很多代码,一点解释……
我会省略一些
import
,让代码短一些
parallelism
并行
感觉好废物,这一小部分了解即可。
这部分只需要会
parallel
和map & amap
其实就差不多了。
介绍比较实用的几种方法。
parallel
迭代
foreach (i; parallel(range, work_uint_size = 100)) {
// do something here
}
其中 work_unit_size
表示最多同时运行的数量。
例子:
import std.stdio, std.parallelism;
import core.thread;
struct Producer {
void produce() {
Thread.sleep(1.seconds);
writeln("Process +1");
}
};
void main() {
auto prods = new Producer[](10);
foreach (prod; parallel(prods)) {
prod.produce();
}
}
创建任务:
auto theTask = task!anOperation(arguments);
// or
auto theTask = task(&someFunction, parameters...)
运行任务:theTask.executeInNewThread()
查看是否完成:if (theTask.done) { ... }
获取结果:auto result = theTask.yeildForce()
asyncBuf
感觉没啥用。
并行保存多个需要长时间制作的元素。还需要保证使用的长时间的……
struct Producer {
int i, total;
bool empty() const {
return total <= i;
}
int front() const {
return i;
}
void popFront() {
writefln("Producing product ID: %d", i);
Thread.sleep(1.seconds / 2);
++i;
}
};
void main() {
auto prods = Producer(0, 10);
foreach (prod; taskPool.asyncBuf(prods, 3)) {
writef("Got product id: %d\n", prod);
Thread.sleep(1.seconds);
writeln("Used product...");
}
}
map & amap
先看例子:
int increase(int x) {
Thread.sleep(500.msecs);
return x + 3;
}
void main() {
int[] nums;
foreach (i; 0 .. 10) {
nums ~= i;
}
// auto results = taskPool.map!increase(nums);
auto results = taskPool.amap!increase(nums);
foreach (result; results) {
writeln(result);
}
}
可以类比 python
中的 map
。
两者的区别:
-
map
可以指定同时运行的数量,而amap
是有多少运行多少。 -
map
会一定程度上按顺序执行,而amap
并不是顺序执行,它依靠RandomAccessRange
,也就是随机顺序执行。
我不知道怎么翻译,反正就是
Message Passing Concurrency
。
核心方法: spawn
(唤起)
我们可以形象的认为,spawn
方法可以唤起一个新的工人(线程)来为我们工作。
并且这个工人与主线程是分开的(先看代码后面解释):
import std.stdio;
import std.concurrency;
import core.thread;
void worker() {
foreach (i; 0 .. 5) {
Thread.sleep(500.msecs);
writeln(i, " (worker) in ", thisTid);
}
}
void main() {
Tid myWorkerTid = spawn(&worker);
foreach (i; 0 .. 5) {
Thread.sleep(300.msecs);
writeln(i, " (main) in ", thisTid);
}
writeln("main is done!");
}
最终输出:
0 (main) in Tid(7f0eb19bc0b0)
0 (worker) in Tid(7f0eb19bc000)
1 (main) in Tid(7f0eb19bc0b0)
2 (main) in Tid(7f0eb19bc0b0)
1 (worker) in Tid(7f0eb19bc000)
3 (main) in Tid(7f0eb19bc0b0)
2 (worker) in Tid(7f0eb19bc000)
4 (main) in Tid(7f0eb19bc0b0)
main is done!
3 (worker) in Tid(7f0eb19bc000)
4 (worker) in Tid(7f0eb19bc000)
实际输出可能略有差异。
解释:
-
spawn(&worker)
唤起了一个新的线程运行worker
函数,并返回了新的线程的id
是一个结构体Tid
。 -
thisTid
类似于一个宏,用于获取当前所在线程的id
。
先看代码后解释:
void worker() {
int value = 0;
while (value >= 0) {
value = receiveOnly!int();
double result = cast(double)value / 7;
ownerTid.send(result);
}
}
void main() {
Tid myWorker = spawn(&worker);
foreach (val; 0 .. 10) {
myWorker.send(val);
double result = receiveOnly!double();
writefln("Send %s got %s", val, result);
}
myWorker.send(-1); // terminate worker process
}
最终输出:
Send 0 got 0
Send 1 got 0.142857
Send 2 got 0.285714
Send 3 got 0.428571
Send 4 got 0.571429
Send 5 got 0.714286
Send 6 got 0.857143
Send 7 got 1
Send 8 got 1.14286
Send 9 got 1.28571
解释:
-
ownerTid
类似于一个宏,用于取得唤醒自己的线程的Tid
,从而发送消息。 -
Tid.send(...)
可以向Tid
代表的那个线程发送一条消息。-
如果同时要发送多个东西,在发送的地方是
Tid.send(a, b, c, ...)
。 -
在接受的地方要变化为
receiveOnly!(typeof(a), typeof(b), typeof(c), ...)
,最终得到的是一个tuple
,可以通过下标访问。
-
-
receiveOnly!type()
表示只接受类型为type
的消息。 -
最后
myWorker.send(-1)
是根据代码逻辑结束的,并不属于通法。
如果我们需要更灵活的接受方法怎么办?
void workerFunc() {
bool isDone = false;
while (!isDone) {
void intHandler(int message) {
writeln("handling int message: ", message);
if (message == -1) {
writeln("exiting");
isDone = true;
}
}
void stringHandler(string message) {
writeln("handling string message: ", message);
}
receive(&intHandler, &stringHandler);
}
}
我们可以指定多种 Handler
以处理不同的数据类型。利用 receive
注册 到处理类型消息的函数中。
更优雅的方式
处理更多的类型:
struct Exit {}
void worker() {
bool done = false;
while (!done) {
receive(
(int message) {
writeln("int message ", message);
},
(string message) {
writeln("string message", message);
},
(Exit message) {
writeln("Exit message");
done = true;
},
(Variant message) {
writeln("Unexpected message: ", message);
}
);
}
}
void main() {
Tid myWorker = spawn(&worker);
myWorker.send(10);
myWorker.send("hello");
myWorker.send(10.1);
myWorker.send(Exit());
}
主要是使用了匿名函数……
解释:
- 利用
std.variant.Variant
以接收任何类型的数据。但是需要保证,处理所有类型数据的方法应该放在最后面,不然会导致全部被判断成Variant
。
我们可以定一个超时时间,超过这个时间就直接返回。
先看代码:
struct Exit {}
void worker() {
bool done = false;
while (!done) {
bool received = receiveTimeout(600.msecs,
(Exit message) {
writeln("Exit message");
done = true;
},
(Variant message) {
writeln("Some message: ", message);
}
);
if (!received) {
writeln("no message yet...");
}
}
}
void main() {
Tid myWorker = spawn(&worker);
myWorker.send(10);
myWorker.send("hello");
Thread.sleep(1.seconds);
myWorker.send(10.1);
myWorker.send(Exit());
}
最终输出:
Some message: 10
Some message: hello
no message yet...
Some message: 10.1
Exit message
解释:
-
receiveTimeout
只比recieve
多了一个参数,用于指定超时时间。 -
返回一个
bool
变量,如果为false
则没有接收到任何消息。
等待所有线程结束:thread_joinAll()
。
一般来说放在需要放的地方……即可。
终于讲到这里了。
我们先考虑一个程序:
import std.stdio;
import std.concurrency;
import core.thread;
int variable;
void printInfo(string message) {
writefln("%s: %s (@%s)", message, variable, &variable);
}
void worker() {
variable = 42;
printInfo("Before the worker is terminated");
}
void main() {
spawn(&worker);
thread_joinAll();
printInfo("After the worker is terminated");
}
其输出是这样的:
Before the worker is terminated: 42 (@7F308C88C530)
After the worker is terminated: 0 (@7F308C98D730)
可以发现,同样的变量在不同的线程里面地址是不一样的,也就是说数据是独立的,所以要有共享。
此时我们只需要修改:
shared int variable;
实际上写为
shared(int) variable;
会更标准,但是好麻烦……
当然,不得不说,有了消息传递,那么数据共享就是备用的方案了。
Data Race
数据竞争是一个很常见的问题。
例子:
void worker(shared int* i) {
foreach (t; 0 .. 200000) {
*i = *i + 1;
}
}
void main() {
shared int i = 0;
foreach (id; 0 .. 10) {
spawn(&worker, &i);
}
thread_joinAll();
writeln("after i to ", i);
}
期望输出 2000000
,但是实际输出可能远小于此。
所以我们要考虑同步:
void worker(shared int* i) {
foreach (t; 0 .. 200000) {
synchronized {
*i = *i + 1;
}
}
}
解释:
-
synchronized
会隐式地创建一个锁,保证只有一个线程会持有这个锁,并且执行这些操作。 -
有些时候,
synchronized
会使得因为等待锁的额外开销使得程序变慢。但有些时候,我们可以通过更好的方法避免等待的开销,例如使用原子操作。 -
synchronized
创建的锁只会对于这一个代码块生效,不会影响到其他的代码块。
void increase(shared int* i) {
foreach (t; 0 .. 200000) {
synchronized {
*i = *i + 1;
}
}
}
void decrese(shared int* i) {
foreach (t; 0 .. 200000) {
synchronized {
*i = *i - 1;
}
}
}
void main() {
shared int i = 0;
foreach (id; 0 .. 10) {
if (id & 1) spawn(&increase, &i);
else spawn(&decrese, &i);
}
thread_joinAll();
writeln("after i to ", i);
}
期望输出 0
但是实际输出……不知道。所以我们需要共用锁:
synchronized (lock_object) {
// ...
}
修改后的代码:
class Lock {}
shared Lock lock = new Lock();
void increase(shared int* i) {
foreach (t; 0 .. 200000) {
synchronized (lock) {
*i = *i + 1;
}
}
}
void decrese(shared int* i) {
foreach (t; 0 .. 200000) {
synchronized (lock) {
*i = *i - 1;
}
}
}
现在就可以得到正确的答案了。
我们可以使用 synchronized
修饰一个类。这相当于在每一个代码块里面嵌套一个 synchronzied
:
synchronized class Cls {
void func() {
// ...
}
}
上面的等价于:
class Cls {
void func() {
synchronized (this) {
// ...
}
}
}
同步初始化
我们考虑这份代码:
static this() {
writeln("executing static this()");
}
void worker() {
}
void main() {
spawn(&worker);
thread_joinAll();
}
最终会输出两次 executing static this()
。
如果我们修改为 shared static this() { ... }
,那么最终只会输出一次。
需要用到
core.atomic
库。
atomic!"+="(var, x);
atomic!"-="(var, x);
// ... like *= /= ^= ...
这些都是原子操作。
shared(int) *value;
bool is_mutated = cas(value, currentValue, newValue);
如果返回 true
,那么值会改变,否则没有。
原子操作一般来说快于
synchronized
。同时,原子操作也可以作用于结构体上,这里不作为讲解。
更多操作可以参考标准库:
core.sync.barrier
core.sync.condition
core.sync.config
core.sync.exception
core.sync.mutex
core.sync.rwmutex
core.sync.semaphore
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK