4

Actor并发模式实现

 2 years ago
source link: https://allenwind.github.io/blog/3145/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
Mr.Feng Blog

NLP、深度学习、机器学习、Python、Go

Actor并发模式实现

Pythonic的Actor并发模式实现

Actor模式,参与者模式,是一种解决并发和分布式计算的方法。这种方法适用于共享内存的计算机和分布式场景。可以把它类比成企业内部的组织架构。

  1. Actor类,角色的内部状态和处理消息的方式的抽象。
  2. Actor实例,程序运行时存在的实体,它们能够接收消息。每个Actor自己决定是否把消息分发到其他Actor实例。Actor实例也可以创建自己的Actor实例。
  3. 消息,通信数据的单位,Actor间使用消息进行通信。
  4. 邮箱,缓存消息的内存区域,每个Actor实例都有一块专属的区域。
  5. Actor引用,是一种对象,使用该对象可以向指定的Actor实例发送消息。Actor引用就像企业员工的邮件地址
  6. 分配器,是一种组件,用于决定何时允许Actor实例处理消息,并未Actor分配计算资源。

下面我们来实现简单的Actor模式

class Actor:
_qsize = 100

def send(self, task, *args, **kwargs):
self._mailbox.put((task, args, kwargs))

def recv(self, block=True, timeout=None):
try:
task, args, kwargs = self._mailbox.get(block=True, timeout=timeout)
except queue.Empty as err:
task = ActorExit

if task is ActorExit:
raise ActorExit()
return task, args, kwargs

def start(self):
raise NotImplementedError("must start by MixIn pattern!")

def close(self):
self.send(ActorExit)

def _bootstrap(self):
try:
self.run()
except ActorExit:
pass
finally:
self._terminated.set()

def join(self):
self._terminated.wait()

def run(self):
pass

def _wrapper_task(self, task, *args, **kwargs):
result = task(*args, **kwargs)
self._results.put(result)
return

通过重写run方法可以为每个Actor添加运行时功能。

转载请包括本文地址:https://allenwind.github.io/blog/3145
更多文章请参考:https://allenwind.github.io/blog/archives/


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK