6

python | 多线程、协程、主线程结合

 8 months ago
source link: https://benpaodewoniu.github.io/2023/12/25/python197/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

python | 多线程、协程、主线程结合

关于 多线程、协程、主线程结合 的升级用法。

如果你这样使用。

import asyncio

import aiohttp

import time

async def ws_binance():
session = aiohttp.ClientSession()
async with session.ws_connect('wss://fstream.binance.com/ws', proxy="http://127.0.0.1:1087") as ws:
await ws.send_str('{"method": "SUBSCRIBE","params":["btcusdt@aggTrade","btcusdt@depth"],"id": 1}')
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print(msg)
time.sleep(5)


async def ws_mexc():
session = aiohttp.ClientSession()
async with session.ws_connect('wss://wbs.mexc.com/ws', proxy="http://127.0.0.1:1087") as ws:
await ws.send_str('{ "method":"SUBSCRIPTION", "params":["[email protected]@BTCUSDT"] }')
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print(msg)


if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.create_task(ws_binance())
loop.create_task(ws_mexc())
loop.run_forever()

就会发现两个 ws 共享一个 event loop,会造成数据卡顿。两个人的数据受制于 time.sleep(5),当然如果把 time.sleep(5) 换成 await asyncio.sleep(5),两者不受影响。

真正的使用方法如下

import asyncio
import threading
import time

import aiohttp


async def ws_binance():
session = aiohttp.ClientSession()
async with session.ws_connect('wss://fstream.binance.com/ws', proxy="http://127.0.0.1:8001") as ws:
await ws.send_str('{"method": "SUBSCRIBE","params":["btcusdt@aggTrade","btcusdt@depth"],"id": 1}')
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print(msg)
await asyncio.sleep(0.0001)


async def ws_mexc():
session = aiohttp.ClientSession()
async with session.ws_connect('wss://wbs.mexc.com/ws', proxy="http://127.0.0.1:8001") as ws:
await ws.send_str('{ "method":"SUBSCRIPTION", "params":["[email protected]@BTCUSDT"] }')
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print(msg)
await asyncio.sleep(0.0001)


async def ws_tt():
while 1:
print("=====================================")
await asyncio.sleep(5)


async def main():
task1 = asyncio.create_task(ws_binance())
task2 = asyncio.create_task(ws_mexc())
task3 = asyncio.create_task(ws_tt())
await asyncio.gather(task1, task2, task3)


def start_asyncio_loop():
asyncio.run(main())


if __name__ == '__main__':
t = threading.Thread(target=start_asyncio_loop)
t.start()

while 1:
print("++++++++")
time.sleep(1)

首先,协程中 ws 必须用一个独立的线程运行,否则,主线程执行不下去。

在独立线程中,每一个 ws 都要单独开一个 event loop,否则他们都会共享一个 event loop,会造成卡顿。

另外,每一个 ws 都要弄一个 sleep ,否则会出现 cpu 过高问题。

如果把 await asyncio.sleep(0.0001) 换成 time.sleep() 也会有问题,相当于该独立线程睡眠了。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK