5

持续发烧,聊聊Dart语言的并发处理,能挑战Go不?

 3 years ago
source link: https://my.oschina.net/u/5235478/blog/5118327
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

貌似关于Dart的文章没流量啊,就算在小编关怀上了首页,看得人还是很少的。

算了,今天持续发烧,再来写写如何使用 Dart 语言的并发操作。说起并发操作,玩 Go 的同学该笑了,这就是我们的看家本领啊。玩 PHP 的同学继续看看,表示我们光看不说话。

代码演示之前,我们先假设一个场景。假设我有一些漂亮妹妹,他们要出门旅行了,旅行的时候会发照片给我。在这里个过程中,代码需要做的事情:

  1. 安排出行计划,同时出行哦,不能有先后之分
  2. 他们各自出行,可以发照片给我

这个过程中,我关心的是他们能不能处理好自己的事情,因为我妹妹太多了,如果都让我帮他们,累死我也搞不定啊,我就干两件事,安排好出行计划,最后收照片。

代码演示一下吧

import 'dart:io';
import 'dart:isolate';

main() async {
  print(DateTime.now().toString() + ' 开始');

  //这是一个接收端口,可以理解成我的手机
  ReceivePort receivePort = new ReceivePort();

  //安排三个妹妹出去旅行,让她们牢记我的手机号
  await Isolate.spawn(travelToBeijing, receivePort.sendPort);
  await Isolate.spawn(travelToShanghai, receivePort.sendPort);
  await Isolate.spawn(travelToGuangzhou, receivePort.sendPort);

  //我就在手机上,等待他们的消息
  receivePort.listen((message) {
    print(message);
  });

  print(DateTime.now().toString() + ' 结束');
}

void travelToBeijing(SendPort sendPort) {
  sleep(Duration(seconds: 3));
  sendPort.send(DateTime.now().toString() + ' 我是妹妹1,我在北京了');
}

void travelToShanghai(SendPort sendPort) {
  sleep(Duration(seconds: 3));
  sendPort.send(DateTime.now().toString() + ' 我是妹妹2,我在上海了');
}

void travelToGuangzhou(SendPort sendPort) {
  sleep(Duration(seconds: 3));
  sendPort.send(DateTime.now().toString() + ' 我是妹妹3,我在广州了');
}

上面写了那么多,都是啥意思呢,我自己稍微解释下上面的代码。

Dart 里的并发,用到的是 Isolate 类。

Isolate 翻译过来即是 隔离区, 是 Dart 实现并发的重要类。它并发的原理,既不是多进程,也不是多线程,你说类似于 Go 的协程吧,也不像。

真的很难定义它,期待对操作系统研究更深的同学来布道,当然这不影响我们使用。

Isolate.spawn 来定义一个并发任务,接收两个参数,第一个是该任务的处理函数,第二个是该处理函数的所需要的参数

ReceivePort 翻译一下 接收端口, 也可以翻译成 接收器,用来接收各个任务回传的消息。

receivePort.listen 用来监听各任务的回传信息,代码里只是简单打印

执行它,得到打印的结果

2021-07-01 15:40:38.132122 开始
2021-07-01 15:40:38.295683 结束
2021-07-01 15:40:41.200316 我是妹妹1,我在北京了
2021-07-01 15:40:41.248851 我是妹妹2,我在上海了
2021-07-01 15:40:41.296737 我是妹妹3,我在广州了

注意看打印结果,妹妹们很乖,基本在同一时间给我回复了消息。总共时间差不多3秒钟,你可以添加更多任务,都会是3秒钟。

再次封装一下

实际使用的时候,我们可以再次封装,使用的同学不用去想 Isolate, ReceivePort 都是什么鬼

import 'dart:io';
import 'dart:isolate';

class MultiTask {
  static void run(
      {List<TaskItem> taskList,
      Function taskFunc,
      Function onCompletedItem,
      Function onCompletedAll}) async {
    ReceivePort receivePort = new ReceivePort();

    Map<String, Isolate> mapParam = {};
    for (int i = 0; i < taskList.length; i++) {
      TaskItem taskItem = taskList[i];
      mapParam[taskItem.key] = await Isolate.spawn(
          taskFunc, TaskMessage(receivePort.sendPort, taskItem));
    }

    List<TaskRes> taskResList = [];
    receivePort.listen((message) async {
      TaskRes taskRes = message as TaskRes;
      if (null != onCompletedItem) await onCompletedItem(taskRes);
      taskResList.add(taskRes);
      mapParam[taskRes.key].kill();
      if (taskResList.length == taskList.length) {
        receivePort.close();
        if (null != onCompletedAll) await onCompletedAll(taskResList);
      }
    });
  }
}

class TaskMessage {
  SendPort sendPort;
  TaskItem taskItem;
  TaskMessage(this.sendPort, this.taskItem);
}

class TaskItem {
  String key;
  String param;

  TaskItem(this.key, this.param);
}

class TaskRes {
  String key;
  String res;

  TaskRes(this.key, this.res);
}

使用的时候,这样来:

import 'dart:io';
import 'dart:isolate';
import 'MultiTask.dart';

main() async {
  List<TaskItem> taskList = [
    TaskItem('1', 'param1'),
    TaskItem('2', 'param2'),
    TaskItem('3', 'param1'),
    TaskItem('4', 'param2'),
    TaskItem('5', 'param1'),
    TaskItem('6', 'param2'),
    TaskItem('7', 'param1'),
    TaskItem('8', 'param2'),
    TaskItem('9', 'param1'),
    TaskItem('0', 'param2'),
  ];

  print(DateTime.now());

  MultiTask.run(
    taskList: taskList,
    taskFunc: taskFunc,
    onCompletedItem: (TaskRes taskRes) =>
        print(DateTime.now().toString() + '_' + taskRes.res),
    onCompletedAll: (List<TaskRes> taskResList) =>
        print(DateTime.now().toString() + '_onCompletedAll'),
  );
}

void taskFunc(TaskMessage taskMessage) async {
  sleep(Duration(seconds: 3));

  String res = 'onCompletedItem is ok';

  taskMessage.sendPort.send(TaskRes(taskMessage.taskItem.key, res));
}
2021-07-01 15:50:54.862973
2021-07-01 15:50:57.924675_onCompletedItem is ok
2021-07-01 15:50:57.954982_onCompletedItem is ok
2021-07-01 15:50:57.986042_onCompletedItem is ok
2021-07-01 15:50:58.021282_onCompletedItem is ok
2021-07-01 15:50:58.053387_onCompletedItem is ok
2021-07-01 15:50:58.088492_onCompletedItem is ok
2021-07-01 15:50:58.121968_onCompletedItem is ok
2021-07-01 15:50:58.157117_onCompletedItem is ok
2021-07-01 15:50:58.190835_onCompletedItem is ok
2021-07-01 15:50:58.229044_onCompletedItem is ok
2021-07-01 15:50:58.241011_onCompletedAll

可以看到,中间每一个任务完成的时间,都很接近,并发处理很完美

当需要处理很多任务时,可以开辟多个隔离区,并发执行,提高效率。

Dart语言对并发的处理,还算人性化,理解起来没有难度,用起来也容易。

同学们,骚起来吧。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK