6

python 包之 blinker 信号库教程

 2 years ago
source link: https://blog.51cto.com/autofelix/5260719
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
  • 一个基于Python的强大的信号库,它既支持简单的对象到对象通信,也支持针对多个对象进行组播
  • 支持注册全局命名信号,支持自定义命名信号
  • 支持匿名信号,线程安全
  • 支持与接收者之间的持久连接与短暂连接
  • 通过弱引用实现与接收者之间的自动断开连接
  • 支持发送任意大小的数据,支持收集信号接收者的返回值
pip install blinker

二、创建发送信号

  • 信号通过 signal() 方法进行创建
  • 信号通过 send() 方法进行发送
from blinker import signal

# 创建一个信号,并给信号命名
s = signal('autofelix')

def say(args):
print('我是飞兔小哥')

# 信号注册一个接收者
s.connect(say)

if "__main__" == __name__:
# 发送信号
s.send()

三、匿名信号

  • blinker也支持匿名信号,就是不需要指定一个具体的信号值
  • 创建的每一个匿名信号都是互相独立的
from blinker import signal

# 创建一个匿名信号
s = signal()

def say(args):
print('我是飞兔小哥')

# 信号注册一个接收者
s.connect(say)

if "__main__" == __name__:
# 发送信号
s.send()

四、组播信号

  • 组播信号是比较能体现出信号优点的特征
  • 多个接收者注册到信号上,发送者只需要发送一次就能传递信息到多个接收者
from blinker import signal

s = signal('autofelix')

def fans_one(args):
print(f'我是粉丝一号,我关注了: {args}')

def fans_two(args):
print(f'我是粉丝二号,我关注了: {args}')

s.connect(fans_one)
s.connect(fans_two)

if "__main__" == __name__:
s.send('飞兔小哥')

五、接受方订阅主题

  • 只有当指定的主题发送消息时才发送给接收方
from blinker import signal

s = signal('autofelix')


def fans(args):
print(f'我是个小粉丝,{args} 是我的老师')

# 信号注册一个接收者, 并指定主题
s.connect(animal, sender='飞兔')

if "__main__" == __name__:
for i in ['飞兔', '飞兔小姐姐', '飞兔小哥哥']:
s.send(i)

六、装饰器用法

  • 除了可以函数注册之外还有更简单的信号注册方法,那就是装饰器
from blinker import signal

s = signal('autofelix')

@s.connect
def fans_one(args):
print(f'我是粉丝一号,我关注了: {args}')

@s.connect
def fans_two(args):
print(f'我是粉丝二号,我关注了: {args}')

if "__main__" == __name__:
s.send('飞兔小哥')

七、订阅主题的装饰器

  • connect 的注册方法用着装饰器时有一个弊端就是不能够订阅主题
  • connect_via 方法支持订阅主题
from blinker import signal

s = signal('autofelix')

@s.connect_via('飞兔')
def fans(args):
print(f'我是个小粉丝,{args} 是我的老师')


if "__main__" == __name__:
for i in ['飞兔', '飞兔小姐姐', '飞兔小哥哥']:
s.send(i)

八、检查信号是否有接收者

  • 如果对于一个发送者发送消息前要准备的耗时很长
  • 为了避免没有接收者导致浪费性能的情况
  • 可以先检查某一个信号是否有接收者
  • 在确定有接收者的情况下才发送
from blinker import signal

a1 = signal('autofelix-1')
a2 = signal('autofelix-2')

def fans(sender):
print('我是个小粉丝')

a1.connect(fans)

if "__main__" == __name__:
res = a1.receivers
print(res)

if res:
a1.send()

res = a2.receivers
print(res)

if res:
a2.send()
else:
print("我是个偶像")

九、是否订阅了某个信号

  • 检查订阅者是否由某一个信号
from blinker import signal

a1 = signal('autofelix-1')
a2 = signal('autofelix-2')

def fans(sender):
print('我是个小粉丝')

a1.connect(fans)

if "__main__" == __name__:
res = a1.has_receivers_for(fans)
print(res)

res = a2.has_receivers_for(fans)
print(res)

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK