
Rhino | Rhino_Collect Design Principles

 1 year ago
source link: https://benpaodewoniu.github.io/2022/11/18/tomoon42/

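Rhino_Collect is the core module of Rhino: it is the module that collects data.

The whole module is built as a library that collects data according to the configuration passed to it. It has 3 core classes.

  • Restful class
  • Websocket class
  • HeartBeat class

Restful class

Some exchanges, and some on-chain data, can only be reached through a RESTful interface, so a RESTful request is needed. A simple example follows.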

import asyncio

import aiohttp


async def get_binance():
    # Poll the Binance futures order book over REST every 0.5 s.
    session = aiohttp.ClientSession()
    while 1:
        res = await session.get("https://fapi.binance.com/fapi/v1/depth?symbol=btcusdt", proxy="http://127.0.0.1:1087")
        response = await res.text()
        print("restful binance")
        await asyncio.sleep(0.5)


async def ws_binance():
    # Subscribe to Binance futures trades and depth over websocket.
    session = aiohttp.ClientSession()
    async with session.ws_connect('wss://fstream.binance.com/ws', proxy="http://127.0.0.1:1087") as ws:
        await ws.send_str('{"method": "SUBSCRIBE","params":["btcusdt@aggTrade","btcusdt@depth"],"id": 1}')
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                print("websocket binance")


async def ws_mexc():
    # Subscribe to a MEXC channel over websocket.
    session = aiohttp.ClientSession()
    async with session.ws_connect('wss://wbs.mexc.com/ws', proxy="http://127.0.0.1:1087") as ws:
        await ws.send_str('{ "method":"SUBSCRIPTION", "params":["[email protected]@BTCUSDT"] }')
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                print("websocket mexc")


if __name__ == '__main__':
    # Run all three collectors concurrently on one event loop.
    loop = asyncio.get_event_loop()
    loop.create_task(get_binance())
    loop.create_task(ws_binance())
    loop.create_task(ws_mexc())
    loop.run_forever()

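The get_binance function above is the RESTful request. After further study, however, get_binance was rewritten in the following form: instead of looping forever inside one coroutine, each call performs a single request and then schedules the next call as a new task.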

import asyncio

import aiohttp


async def restful(loop):
    # Create one shared session and schedule the first request.
    session = aiohttp.ClientSession()
    loop.create_task(get_binance(session, loop))


async def get_binance(session, loop):
    # Perform a single request, wait, then schedule the next one as a new task.
    res = await session.get("https://fapi.binance.com/fapi/v1/depth?symbol=btcusdt", proxy="http://127.0.0.1:1087")
    response = await res.text()
    print("restful binance")
    await asyncio.sleep(0.5)
    loop.create_task(get_binance(session, loop))


async def ws_binance():
    session = aiohttp.ClientSession()
    async with session.ws_connect('wss://fstream.binance.com/ws', proxy="http://127.0.0.1:1087") as ws:
        await ws.send_str('{"method": "SUBSCRIBE","params":["btcusdt@aggTrade","btcusdt@depth"],"id": 1}')
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                print("websocket binance")


async def ws_mexc():
    session = aiohttp.ClientSession()
    async with session.ws_connect('wss://wbs.mexc.com/ws', proxy="http://127.0.0.1:1087") as ws:
        await ws.send_str('{ "method":"SUBSCRIPTION", "params":["[email protected]@BTCUSDT"] }')
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                print("websocket mexc")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.create_task(restful(loop))
    loop.create_task(ws_binance())
    loop.create_task(ws_mexc())
    loop.run_forever()

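Websocket class

The websocket part is the code already shown above (ws_binance and ws_mexc).

HeartBeat class

This is the heartbeat module. Its main job is to monitor whether each exchange's service is still alive.

At present, Rhino_Collect has no service that sends a notification (DingTalk, WeChat, etc.) after a connection has been down for a long time. Such notifications are handled by a dedicated monitoring module.

HeartBeat is mainly responsible for keeping the websocket connections alive.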

import asyncio

import aiohttp
import time

# Last time (ms since epoch) a message was received from each exchange.
websock_data = {}


async def restful(loop):
    session = aiohttp.ClientSession()
    loop.create_task(get_binance(session, loop))


async def get_binance(session, loop):
    res = await session.get("https://fapi.binance.com/fapi/v1/depth?symbol=btcusdt", proxy="http://127.0.0.1:1087")
    response = await res.text()
    print("restful binance")
    await asyncio.sleep(0.5)
    loop.create_task(get_binance(session, loop))


async def ws_binance():
    session = aiohttp.ClientSession()
    async with session.ws_connect('wss://fstream.binance.com/ws', proxy="http://127.0.0.1:1087") as ws:
        await ws.send_str('{"method": "SUBSCRIBE","params":["btcusdt@aggTrade","btcusdt@depth"],"id": 1}')
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                print("websocket binance")
                # Record the arrival time of the latest message.
                websock_data["binance"] = int(time.time() * 1000)


async def ws_mexc():
    session = aiohttp.ClientSession()
    async with session.ws_connect('wss://wbs.mexc.com/ws', proxy="http://127.0.0.1:1087") as ws:
        await ws.send_str('{ "method":"SUBSCRIPTION", "params":["[email protected]@BTCUSDT"] }')
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                print("websocket mexc")
                websock_data["mexc"] = int(time.time() * 1000)


async def listen():
    # Once per second, flag every exchange that has been silent for more than 10 s.
    while 1:
        now = int(time.time() * 1000)
        for key, store_time in websock_data.items():
            if now - store_time > 10_000:
                print(f"{key} disconnected, will reconnect")
        await asyncio.sleep(1)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.create_task(restful(loop))
    loop.create_task(ws_binance())
    loop.create_task(ws_mexc())
    loop.create_task(listen())
    loop.run_forever()
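
The listen coroutine above only prints a message when a feed goes quiet. One way the detection could be turned into an actual reconnect is sketched below. This is an illustration, not Rhino_Collect's implementation: find_stale, supervise, the factories and tasks mappings, and the rounds parameter are all hypothetical names introduced here, and the feed is any coroutine rather than a real websocket.

```python
import asyncio
import time


def find_stale(last_seen, now_ms, timeout_ms=10_000):
    """Return the feed names whose last message is older than timeout_ms."""
    return [name for name, ts in last_seen.items() if now_ms - ts > timeout_ms]


async def supervise(last_seen, factories, tasks,
                    timeout_ms=10_000, interval=1.0, rounds=None):
    """Cancel and restart the task of every stale feed.

    last_seen: feed name -> time of last message in ms (like websock_data).
    factories: feed name -> zero-argument callable returning a coroutine
               (hypothetical; e.g. {"binance": ws_binance}).
    tasks:     feed name -> the asyncio.Task currently running that feed.
    rounds:    number of checks to run; None means run forever.
    """
    done = 0
    while rounds is None or done < rounds:
        now_ms = int(time.time() * 1000)
        for name in find_stale(last_seen, now_ms, timeout_ms):
            tasks[name].cancel()                       # drop the dead connection
            tasks[name] = asyncio.create_task(factories[name]())
            last_seen[name] = now_ms                   # fresh grace period
        done += 1
        await asyncio.sleep(interval)
```

Resetting last_seen after a restart keeps the supervisor from cancelling the new connection again before it has had a chance to receive its first message.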

Data flow

Once data has been fetched, it is stored in redis, and a producer-consumer pattern is used to push it on to the next module.
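
The hand-off described above might look roughly like the sketch below. The post does not say which redis structure Rhino uses, so this assumes a redis list used as a queue: the producer calls lpush and the consumer calls rpop. The key name rhino:collect, the message layout, and the InMemoryRedis stand-in (which lets the sketch run without a server) are all hypothetical.

```python
import json
from collections import deque


class InMemoryRedis:
    """Stand-in that mimics only lpush/rpop on a redis list (demo assumption)."""

    def __init__(self):
        self._lists = {}

    def lpush(self, key, *values):
        q = self._lists.setdefault(key, deque())
        for v in values:
            q.appendleft(v)      # newest element sits at the head
        return len(q)

    def rpop(self, key):
        q = self._lists.get(key)
        return q.pop() if q else None   # oldest element leaves from the tail


QUEUE_KEY = "rhino:collect"  # hypothetical key name


def produce(client, exchange, payload):
    """Serialize one collected message and push it onto the queue."""
    message = json.dumps({"exchange": exchange, "data": payload})
    return client.lpush(QUEUE_KEY, message)


def consume(client):
    """Pop the oldest message, or return None when the queue is empty."""
    raw = client.rpop(QUEUE_KEY)
    return json.loads(raw) if raw is not None else None
```

Because produce and consume only call lpush and rpop, a redis-py client could be passed in place of the stand-in; a blocking pop would avoid polling on the consumer side.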

Configuration-driven startup

Rhino_Collect uses configuration to decide exactly which data to collect. The details are not covered here.
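
Since the post leaves the details out, the following is only a guess at the general shape of configuration-driven startup. The config schema (type, name, enabled), the COLLECTORS registry, and the stub collectors are hypothetical; real collectors would open REST or websocket connections instead of appending to a list.

```python
import asyncio


async def run_restful(entry, out):
    # Stub collector: a real one would poll entry's REST endpoint.
    out.append(("restful", entry["name"]))


async def run_websocket(entry, out):
    # Stub collector: a real one would hold a websocket subscription.
    out.append(("websocket", entry["name"]))


# Hypothetical registry mapping a config "type" to its collector coroutine.
COLLECTORS = {"restful": run_restful, "websocket": run_websocket}


def build_coroutines(config, out):
    """Turn enabled config entries into collector coroutines."""
    enabled = [e for e in config if e.get("enabled", True)]
    # Validate first so no coroutine is created and then left un-awaited.
    for entry in enabled:
        if entry["type"] not in COLLECTORS:
            raise ValueError(f"unknown collector type: {entry['type']}")
    return [COLLECTORS[e["type"]](e, out) for e in enabled]


async def start(config):
    """Run every configured collector concurrently and return what they recorded."""
    out = []
    await asyncio.gather(*build_coroutines(config, out))
    return out
```

With this layout, adding a data source means adding a config entry rather than editing the startup code.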

