3

Python定时事件调度器

 2 years ago
source link: https://allenwind.github.io/blog/5447/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

先前写过实现简单的协程调度器一文,指出调度器的核心是调度算法。即当要执行多个任务时,需要一种调度方法,决定某个任务什么时候执行,什么时候暂停。对于一个定时事件调度器,它需要的算法很简单,以时间先后进行调度分配任务的执行。

如何让任务达到时间先后执行?很简单,对时间排序。但是,任务个数通常是一个态的过程,如果每次有新任务添加进来或删除都进行排序(sorting)性能很差。我们可以使用堆的构建过程,即添加任务—往堆中添加元素。

因此,定时事件调度器的核心就是堆这个数据结构和相关的操作。我们可以在和思想上自己实现,但Python标准库自带了sched模块,那么接下来使用它的模块接口。

  1. sched模块的基本使用
  2. 简单的剖析

基本使用方法

这个模块简单到直接通过源码就能学会使用它。scheduler类的enter方法的参数签名如下:

  • enter(delay, priority, action, args=(), kwargs=None)

支持delay、优先级别,action函数的参数。

  • enterabs(time, priority, action, args=(), kwargs=None)

enterabs则是指定绝对时间。

一个简单的延时运行例子

下面的两个延时事件分别打印相关的信息。

import sched
import time

scheduler = sched.scheduler(time.time, time.sleep)

def event_1(delay, name):
now = time.time()
elapsed = now - delay
print("event[%s]: elapsed=%s, now is %s" % (name, elapsed, time.ctime(now)))

def event_2(delay, name):
print("event 2", delay, name)

def main():
scheduler.enter(2, 1, event_1, (time.time(), 'first'))
scheduler.enter(3, 1, event_2, (time.time(), 'second'))
print("scheduler start at %s" % time.time())
scheduler.run()

if __name__ == '__main__':
main()

默认调用run方法是阻塞,知道任务都运行退出。

运行和输出:

$python3 sched_delay.py
scheduler start at 1516230099.0156646
event[first]: elapsed=2.015648365020752, now is Thu Jan 18 07:01:41 2018
event 2 1516230099.0156646 second

我们知道run方法阻塞,事件调用也是单线程,因此如果某个事件运行过长,就超出了事件之间的延时,会出现重叠问题。这样并不会导致事件丢失。但可以通过延时后面的事件来解决问题。

接下来通过一个简单例子我们试试

import time
import sched

def cpu_bound_event(name):
print("begin: ", time.ctime())
time.sleep(3)
print("end: ", time.ctime())

def main():
scheduler = sched.scheduler()
scheduler.enter(2, 1, cpu_bound_event, ('first',))
scheduler.enter(3, 1, cpu_bound_event, ('second',))
scheduler.run()

if __name__ == '__main__':
main()

运行输出:

begin:  Thu Jan 18 07:14:09 2018
end: Thu Jan 18 07:14:12 2018
begin: Thu Jan 18 07:14:12 2018
end: Thu Jan 18 07:14:15 2018

可以看到,第一个事件运行完后第二个事件马上调用,因为它等待得足够长事件了。可以理解,延时最后都是通过绝对事件来比较的。

sched模块还有的功能就是优先级设置和事件的取消。

sched模块简单剖析

简单说说sched模块。

sched模块是一个通用定时调度器,可以在指定的恰当时刻(延时delay)运行。

从源码可以sched对事件action做了简单的封装:

class Event(namedtuple('Event', 'time, priority, action, argument, kwargs')):
__slots__ = []
def __eq__(s, o): return (s.time, s.priority) == (o.time, o.priority)
def __lt__(s, o): return (s.time, s.priority) < (o.time, o.priority)
def __le__(s, o): return (s.time, s.priority) <= (o.time, o.priority)
def __gt__(s, o): return (s.time, s.priority) > (o.time, o.priority)
def __ge__(s, o): return (s.time, s.priority) >= (o.time, o.priority)

注意到,事件除了delay事件,还有优先级。对比标准库中sched模块中Event类的这种写法其实,还有一种更简洁的写法,直接使用functools.total_ordering装饰器只要实现一个不等比较即可。

scheduler类

scheduler类包括如下实例方法:

  • enter(delay, priority, action, args=(), kwargs=None)

enter方法可以指定action的参数和delay事件和优先级。它们都是Event类的参数。delay事件最后通过time = self.timefunc() + delay转化为绝对事件。

  • enterabs(time, priority, action, args=(), kwargs=None)

enterabs可以指定action执行的绝对事件。

  • def cancel(self, event)

取消事件。它本质上就是把事件从调度堆中移除。

  • run(self, blocking=True)

run方法让调度器运行,默认是非阻塞情况,即等待最近的事件来临,否则会返回阻塞事件,让开发者自己运用这端本来阻塞的事件。

class scheduler:

def run(self, blocking=True):
lock = self._lock
q = self._queue
delayfunc = self.delayfunc
timefunc = self.timefunc
pop = heapq.heappop
while True:
with lock:
if not q:
break
time, priority, action, argument, kwargs = q[0]
now = timefunc()
if time > now:
delay = True
else:
delay = False
pop(q)
if delay:
if not blocking:
return time - now
delayfunc(time - now)
else:
action(*argument, **kwargs)
delayfunc(0) # Let other threads run

整个事件调度的核心就是run方法。通过源码可以理解它处理事件重叠的方法。

当然我们也可以重写run方法,让scheduler以线程的方式运行。

重写scheduler类

  1. 让任务以多线程的方式运行
  2. 让scheduler以多线程方式运行
  3. 让任务周期性运行

  4. 让任务以多线程的方式运行

用于enter方法最终调用enterabs方法,因此只需要重写enterabs方法即可。

import sched
import threading
import functools
import heapq

_sentinel = object()

def threadize(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
target = functools.partial(func, *args, **kwargs)
task = threading.Thread(target=target, daemon=True)
task.start()
return task.ident
return wrapper

class ThreadingEventScheduler(sched.scheduler):

def enterabs(self, time, priority, action, argument=(), kwargs=_sentinel):
if kwargs is _sentinel:
kwargs = {}
action = threadize(action)
event = sched.Event(time, priority, action, argument, kwargs)
with self._lock:
heapq.heappush(self._queue, event)
return event
  1. 让scheduler以多线程方式运行
import threading
import sched
import time

class ThreadingScheduler(sched.scheduler):

def run(self):
threading.Thread(target=super().run).start()

s = ThreadingScheduler()
s.enter(2, 1, print, ('first',))
s.run()
print('before')
time.sleep(3)

转载请包括本文地址:https://allenwind.github.io/blog/5447
更多文章请参考:https://allenwind.github.io/blog/archives/


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK