4

python | 进程池的信息同步

 1 year ago
source link: https://benpaodewoniu.github.io/2022/10/12/python177/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

python | 进程池的信息同步

先看下面一段代码。

import os
import time
from concurrent.futures.process import ProcessPoolExecutor
from multiprocessing import Manager


def job(v, k):
for i in range(10):
value = v.setdefault(k, 0)
time.sleep(0.1)
v[k] = value + i
print(f"{os.getpid()} {value} {v}")


if __name__ == '__main__':
v = Manager().dict()
jobs = ["aa", "bb"]
p = ProcessPoolExecutor(max_workers=2)
for i in range(10):
for k in jobs:
p.submit(job, v, k)
34248 441 {'aa': 450, 'bb': 450}

如果,把进程池的数量改为 5

p = ProcessPoolExecutor(max_workers=5)
22632 171 {'aa': 180, 'bb': 180}

很明显, max_workers 等于 5 的时候,在未改变变量的时候,已经读取了,造成了数据的紊乱。

这个时候就要加锁。

import os
import time
from concurrent.futures.process import ProcessPoolExecutor
from multiprocessing import Manager

lock = Manager().Lock()


def job(v, k):
lock.acquire()
for i in range(10):
value = v.setdefault(k, 0)
time.sleep(0.1)
v[k] = value + i
print(f"{os.getpid()} {value} {v}")
lock.release()


if __name__ == '__main__':
v = Manager().dict()
jobs = ["aa", "bb"]
p = ProcessPoolExecutor(max_workers=5)
for i in range(10):
for k in jobs:
p.submit(job, v, k)

但是,根据

这种加锁方式,就变成了串行了,最终的输出也是如此。

34668 0 {'aa': 0}
34668 0 {'aa': 1}
34668 1 {'aa': 3}
34668 3 {'aa': 6}
34668 6 {'aa': 10}
34668 10 {'aa': 15}
34668 15 {'aa': 21}
34668 21 {'aa': 28}
34668 28 {'aa': 36}
34668 36 {'aa': 45}
34669 0 {'aa': 45, 'bb': 0}
34669 0 {'aa': 45, 'bb': 1}
34669 1 {'aa': 45, 'bb': 3}
34669 3 {'aa': 45, 'bb': 6}
34669 6 {'aa': 45, 'bb': 10}
34669 10 {'aa': 45, 'bb': 15}
34669 15 {'aa': 45, 'bb': 21}
34669 21 {'aa': 45, 'bb': 28}
34669 28 {'aa': 45, 'bb': 36}
34669 36 {'aa': 45, 'bb': 45}
...

这里想一个场景,假设,我们做一个股票信息处理系统。股票信息是实时过来的,所以系统有以下的特点

  • 系统是 CPU 密集型
  • 股票可能有几千个
  • 系统需要进程池管理,因为,如果每一个股票都创建一个进程的话,对系统的压力太大了
  • 进程池需要进行数据同步
    • 进程池每一次调用都相当于新进程,所以,进程池需要读取老数据,然后根据新数据来更新老数据
  • 会出现股票数据连续的情况
    • 类似上面第二个执行代码「进程池里面的数量为 5」
  • 一个进程池
  • 一个同步数据区 Manger().dict()
    • 同步数据区里面为每一个股票代码创建一个数据区
  • 为每一个股票代码创建一个锁

简化的代码如下:

import os
import time
from concurrent.futures.process import ProcessPoolExecutor
from multiprocessing import Manager


def job(v, k, lock):
with lock:
for i in range(10):
value = v.setdefault(k, 0)
time.sleep(0.1)
v[k] = value + i
print(f"{k} {os.getpid()} {value} {v}")


if __name__ == '__main__':
v = Manager().dict()
l = {}
jobs = ["aa", "bb"]
p = ProcessPoolExecutor(max_workers=5)
for i in range(10):
for k in jobs:
lock = l.setdefault(k, Manager().Lock())
p.submit(job, v, k, lock)

如果运行的话,就会发现运行的和设计的一样。

但是,我有一个疑问,就是把

l 换成 Manager().dict() 会报错,这个我暂时不知道原因。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK