2

王霸雄图荣华敝屣,谈笑间尽归尘土|基于Python3双队列数据结构搭建股票/外汇交易匹配...

 3 years ago
source link: https://v3u.cn/a_id_192
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

王霸雄图荣华敝屣,谈笑间尽归尘土|基于Python3双队列数据结构搭建股票/外汇交易匹配撮合系统

首页 - Python /2021-04-22
王霸雄图荣华敝屣,谈笑间尽归尘土|基于Python3双队列数据结构搭建股票/外汇交易匹配撮合系统

如果你爱他,那么送他去股市,因为那里是天堂;如果你恨他,送他去股市,因为那里是地狱。

    在过去的一年里,新冠疫情持续冲击世界经济,全球主要股票市场的波动都相对频繁,尤其是A股,正所谓:曾经跌停难为鬼,除非解套才做人;抄底时难抛亦难,反弹无力百花残。对于波谲云诡的股票市场,新投资人还是需要谨慎入场,本次我们来利用双队列的数据结构实现实时在线交易匹配引擎,探索股票交易的奥秘。

    首先需要明确一点,证券交易和传统的B2C电商系统交易完全不同,证券交易系统提供的买卖标的物是标准的数字化资产,如美元、股票、比特币等等,它们的特点是数字计价,可分割买卖,也就是说,当我们发起买盘申请的时候,需要有价格对应的卖盘响应,才能真正完成交易,反之亦然。

    具体逻辑是:所有买盘或者卖盘的订单队列都传递给匹配引擎,匹配引擎尝试将它们的价格进行匹配。该匹配队列分为买单(按价格升序排列,出价最高的优先交易)和卖单(按降序排列,卖价最低的优先交易)。如果股票订单找不到与匹配的价格,那么该订单就继续保存在订单队列中的原适当位置。

    这里我们以实际的案例来看一下相关匹配算法的实现,假设我有两个订单队列,一个买盘,一个卖盘:

#买盘
价格 数量
100 50
100 10
90 5
88 3

#卖盘
价格 数量
170 50
180 40
199 10
200 5

    最常见的匹配算法就是“价格/时间优先”队列。订单主要根据价格进行匹配,如果以相同的价格水平存在多个订单,则最早的订单将首先被匹配,这也和队列原理相同:先入先出。

    如上所示,假设有两个订单紧挨着。第一个是以100块钱的价格买入50股的买入订单,第二个也是以相同价格买入10股的买入订单。鉴于订单与任何卖价都不匹配(由于其价格低于最低的卖价),所以它们都被放置在订单队列中。第一订单和第二订单以相同的价格水平存储,但是由于时间优先,前者比后者具有优先权。这基本上意味着,第一个订单将被放置在买入队列中的第二个订单的前面。

    而卖盘同理,首先卖价最低的优先交易,如果卖价相同,则时间优先,先进队列的先交易,可是很多散户都遇见过一种情况,就是如果手里的一支股票连续跌停,就算拼命挂低价单也很难卖出去,甚至可能直接跌到退市血本无归,这是为什么呢?

    因为当一只股票跌停时,也意味着有一大堆筹码堆积在跌停板上,想卖出去是不容易的,得排队,理论上按照“时间优先、价格优先”的交易原则排队成交,但跌停的情况下,只存在“时间优先”的考虑,也就是说,如果想在封死跌停板时把股票卖出去,就得尽早对该股票挂跌停板价格卖出。

    可实际上,一只股票跌停,不光是小部分散户卖不出去,而是大多数散户都卖不出去,都在恐慌性出货,大家都在排队卖。更何况,股票买卖是通过券商进行的,而券商有VIP快速通道也不是什么秘密,一些大资金的大户、游资、机构享有券商优待,或通过租用通道实现对盘面的快速优先买卖,这也导致了在股票涨停板抢筹、跌停板出货时存在一定的“不公平”性,也就说,交易队列并非完全遵照“价格/时间”定序,还有可能出现优先级(加权)队列,所以,跌停时跑不了,涨停时买不进就不是什么新鲜事了。

    另外,还需要注意匹配算法中的价格一直而数量匹配填充的问题,假设买单10块挂单50手,卖单10块挂单30手,则匹配的价格为10块钱,在买一卖一各显示30手,买单队列首位置就会有20手在排队,如下所示:

#买盘
价格 数量
10 50


#卖盘
价格 数量
10 30
11 50

    经过匹配算法之后:

#买盘
价格 数量
10 20


#卖盘
价格 数量
11 50

    OK,了解了基本概念,让我们用Python3具体实现,首先需要定义两个类,订单和交易,订单对象作为匹配算法之前的元素,而交易对象则是匹配之后的成交对象:

class Order:

def __init__(self, order_type, side, price, quantity):
self.type = order_type
self.side = side.lower()
self.price = price
self.quantity = quantity

class Trade:

def __init__(self, price, quantity):
self.price = price
self.quantity = quantity

    这里type是订单类型,side代表买单或者卖单,price为价格,quantity为数量。

    紧接着我们来实现订单队列:

class OrderBook:

def __init__(self, bids=[], asks=[]):

self.bids = sorted(bids, key = lambda order: -order.price)
self.asks = sorted(asks, key = lambda order: order.price)

def __len__(self):
return len(self.bids) + len(self.asks)

def add(self, order):
if order.type == 'buy':
self.bids.append(order)
elif order.type == 'sell':
self.asks.append(order)

def remove(self, order):
if order.type == 'buy':
self.bids.remove(order)
elif order.type == 'sell':
self.asks.remove(order)

    这里的订单队列很容易地实现为具有两个排序列表的数据结构,其中两个列表包含两个按价格排序的订单实例。一种按升序排序(买单),另一种按降序排序(卖单)。

    下面来实现系统的核心功能,匹配引擎:

from collections import deque

class MatchingEngine:

def __init__(self):

self.queue = deque()
self.orderbook = OrderBook()
self.trades = deque()

    首先,我们需要两个FIFO队列;一个用于存储所有传入的订单,另一个用于存储经过匹配后所有产生的交易。我们还需要存储所有没有匹配的订单。

    之后,通过调用.process(order)函数将订单传递给匹配引擎。然后将匹配生成的交易存储在队列中,然后可以依次检索(通过匹配引擎交易队列),也可以通过调用.get_trades()函数将其存储在列表中。

def process(self, order):
self.match(order)

def get_trades(self):
trades = list(self.trades)
return trades

    随后就是匹配方法:

def match(self, order):
if order.side == 'buy':


filled = 0
consumed_asks = []


for i in range(len(self.orderbook.asks)):
ask = self.orderbook.asks[i]

if ask.price > order.price:
break # 卖价过高
elif filled == order.quantity:
break # 已经匹配

if filled + ask.quantity <= order.quantity:
filled += ask.quantity
trade = Trade(ask.price, ask.quantity)
self.trades.append(trade)
consumed_asks.append(ask)
elif filled + ask.quantity > order.quantity:
volume = order.quantity-filled
filled += volume
trade = Trade(ask.price, volume)
self.trades.append(trade)
ask.quantity -= volume

# 没匹配成功的
if filled < order.quantity:
self.orderbook.add(Order("limit", "buy", order.price, order.quantity-filled))

# 成功匹配的移出订单队列
for ask in consumed_asks:
self.orderbook.remove(ask)


elif order.side == 'sell':

filled = 0
consumed_bids = []
for i in range(len(self.orderbook.bids)):
bid = self.orderbook.bids[i]

if bid.price < order.price:
break
if filled == order.quantity:
break

if filled + bid.quantity <= order.quantity:
filled += bid.quantity
trade = Trade(bid.price, bid.quantity)
self.trades.append(trade)
consumed_bids.append(bid)
elif filled + bid.quantity > order.quantity:
volume = order.quantity-filled
filled += volume
trade = Trade(bid.price, volume)
self.trades.append(trade)
bid.quantity -= volume


if filled < order.quantity:
self.orderbook.add(Order("limit", "sell", order.price, order.quantity-filled))


for bid in consumed_bids:
self.orderbook.remove(bid)
else:

self.orderbook.add(order)

    逻辑上并不复杂,基本上就是在订单队列中遍历,直到收到的订单被完全匹配为止。对于每个匹配成功的订单,都会创建一个交易对象并将其添加到交易队列中。如果匹配引擎无法完全完成匹配,则它将剩余量作为单独的订单再添加会订单队列中。

    当然了,为了应对高并发场景,实现每秒成千上万的交易量,我们可以对匹配引擎进行改造,让它具备多任务异步执行的功能:

from threading import Thread
from collections import deque

class MatchingEngine:

def __init__(self, threaded=False):

self.queue = deque()

self.orderbook = OrderBook()

        self.trades = deque()

self.threaded = threaded
if self.threaded:
self.thread = Thread(target=self.run)
self.thread.start()

    改造线程方法:

def process(self, order):
if self.threaded:
self.queue.append(order)
else:
self.match(order)

    最后,为了让匹配引擎能够以线程的方式进行循环匹配,添加启动入口:

def run(self):

while True:
if len(self.queue) > 0:
order = self.queue.popleft()
self.match(order)
print(self.get_trades())
print(len(self.orderbook))

    大功告成,完整代码如下:

class Order:

def __init__(self, order_type, side, price, quantity):
self.type = order_type
self.side = side.lower()
self.price = price
self.quantity = quantity

class Trade:

def __init__(self, price, quantity):
self.price = price
self.quantity = quantity

class OrderBook:

def __init__(self, bids=[], asks=[]):

self.bids = sorted(bids, key = lambda order: -order.price)
self.asks = sorted(asks, key = lambda order: order.price)

def __len__(self):
return len(self.bids) + len(self.asks)

def add(self, order):
if order.type == 'buy':
self.bids.append(order)
elif order.type == 'sell':
self.asks.append(order)

def remove(self, order):
if order.type == 'buy':
self.bids.remove(order)
elif order.type == 'sell':
self.asks.remove(order)


from threading import Thread
from collections import deque

class MatchingEngine:

def __init__(self, threaded=False):


order1 = Order(order_type="buy",side="buy",price=10,quantity=10)
order2 = Order(order_type="sell",side="sell",price=10,quantity=20)

self.queue = deque()
self.orderbook = OrderBook()

self.orderbook.add(order1)
self.orderbook.add(order2)


self.queue.append(order1)
self.queue.append(order2)

self.trades = deque()
self.threaded = threaded
if self.threaded:
self.thread = Thread(target=self.run)
self.thread.start()


def run(self):

while True:
if len(self.queue) > 0:
order = self.queue.popleft()
self.match(order)
print(self.get_trades())
print(len(self.orderbook))


def process(self, order):
if self.threaded:
self.queue.append(order)
else:
self.match(order)

def get_trades(self):
trades = list(self.trades)
return trades

def match(self, order):
if order.side == 'buy':


filled = 0
consumed_asks = []


for i in range(len(self.orderbook.asks)):
ask = self.orderbook.asks[i]

if ask.price > order.price:
break # 卖价过高
elif filled == order.quantity:
break # 已经匹配

if filled + ask.quantity <= order.quantity:
filled += ask.quantity
trade = Trade(ask.price, ask.quantity)
self.trades.append(trade)
consumed_asks.append(ask)
elif filled + ask.quantity > order.quantity:
volume = order.quantity-filled
filled += volume
trade = Trade(ask.price, volume)
self.trades.append(trade)
ask.quantity -= volume

# 没匹配成功的
if filled < order.quantity:
self.orderbook.add(Order("limit", "buy", order.price, order.quantity-filled))

# 成功匹配的移出订单队列
for ask in consumed_asks:
self.orderbook.remove(ask)


elif order.side == 'sell':

filled = 0
consumed_bids = []
for i in range(len(self.orderbook.bids)):
bid = self.orderbook.bids[i]

if bid.price < order.price:
break
if filled == order.quantity:
break

if filled + bid.quantity <= order.quantity:
filled += bid.quantity
trade = Trade(bid.price, bid.quantity)
self.trades.append(trade)
consumed_bids.append(bid)
elif filled + bid.quantity > order.quantity:
volume = order.quantity-filled
filled += volume
trade = Trade(bid.price, volume)
self.trades.append(trade)
bid.quantity -= volume


if filled < order.quantity:
self.orderbook.add(Order("limit", "sell", order.price, order.quantity-filled))


for bid in consumed_bids:
self.orderbook.remove(bid)
else:

self.orderbook.add(order)

    测试一下:

me = MatchingEngine(threaded=True)

me.run()

    返回结果:

liuyue:mytornado liuyue$ python3 "/Users/liuyue/wodfan/work/mytornado/test_order_match.py"
[<__main__.Trade object at 0x102c71750>]
2
[<__main__.Trade object at 0x102c71750>, <__main__.Trade object at 0x102c71790>]
1

    没有问题。

    结语:所谓天下熙熙,皆为利来;天下攘攘,皆为利往。太史公这句名言揭示了股票市场的本质,人性的本能就是追求利益,追求利益却要在决对原则之下,但是资本市场往往是残酷的,王霸雄图,荣华敝屣,到最后,也不过是尽归尘土。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK