python离散事件仿真库SimPy官方教程
参考simpy官网。SimPy有三个关键的组成部分:- ==**environment**==(环境):存储events在事件列表里,不断追踪当前仿真时间- ==**event**==(事件):仿真过程中的各种异步事件- ==**process**== function(进程函数):用来实现你的仿真模型,也就是定义你的仿真行为,它们是普通的Python生成器函数,可以生成events实例.....
参考:SimPy Discrete event simulation for Python
建议先简单了解仿真原理:离散事件仿真原理DES
简单介绍
simpy的实现关键在于生成器的使用,通过例子说明一下
# 生成器function:use yield
def generator(n):
while True:
for j in range(n):
yield j
# 普通function:use return
def function(n):
for j in range(n):
return j
使用function定义ourfunc,输出值为0
ourfunc = function(3)
print(ourfunc)
0
再次输出ourfunc,值仍然为1
print(ourfunc)
0
使用generator定义,第一次输出为0
ourgen = generator(3)
print(next(ourgen))
0
再次输出为1
print(next(ourgen))
1
生成器函数和普通函数区别在于yield可以暂时停止进程,再次调用时会从中断位置开始,而函数每次调用都是从头开始,使用generator就能模仿离散事件,实现事件调度与进程交互。
接下来看一个模拟红绿灯的例子
import simpy
def main():
env = simpy.Environment()
env.process(traffic_light(env))
env.run(until=120)
print('simulation done')
# traffic_light是一个process,传入env是告诉traffic_light它是在env里执行的,而不是其它环境
def traffic_light(env):
while True:
print(f'light green at :{env.now}')
yield env.timeout(30)
print(f'light yellow at :{env.now}')
yield env.timeout(5)
print(f'light red at :{env.now}')
yield env.timeout(20)
if __name__ == '__main__':
main()
light green at :0
light yellow at :30
light red at :35
light green at :55
light yellow at :85
light red at :90
light green at :110
simulation done
运行逻辑如下:执行main后首先生成环境env = simpy.Environment()
,在环境中加入一个process也就是traffic_light
,traffic_light
使用了三次env.timeout()
,process本身就是一个event,在traffic_light
中先输出print(f'light green at :{env.now}')
,然后遇到yield env.timeout(30)
,等待30s,转到env.run(until=120)
,表明环境要运行120,run()
表明一直执行step()
直到until条件满足,step()
是处理下一个event,因此环境会接着上次执行中断的位置到print(f'light yellow at :{env.now}')
,如此循环直到结束。
1.SimPy basics
1.1 how SimPy work
将SimPy 拆解开,可以看到它就是一个异步事件调度程序,你先生成某个事件(events)并在指定的仿真时间点调度该事件,事件按照优先级,仿真时间,递增的事件id进行排序。事件有一个回调列表(callback),这些回调会在事件被循环触发和处理时开始执行。事件也可能有返回值。
SimPy有三个关键的组成部分:
- environment(环境):存储events在事件列表里,不断追踪当前仿真时间
- event(事件):仿真过程中的各种异步事件
- process function(进程函数):用来实现你的仿真模型,也就是定义你的仿真行为,它们是普通的Python生成器函数,可以生成events实例
如果一个 process function生成一个event,SimPy将这个process添加到event的回调中并且暂停这个process,等待event被触发和处理。当等待event的进程恢复时,会收到event的返回值。下面是一个简单的例子
>>> import simpy
>>> def example(env):
value = yield env.timeout(env,delay=1,value=42)
print('now=%d,value=%d'%(env.now,value))
>>> env = simpy.Environment()
>>> p = env.process(example(env))
>>> env.run()
now=1,value=42
(1)example() 作为process首先创建一个 Timeout event ,参数包括env,delay,value,Timeout会在now+delay的时候开始执行,因此需要传入env获取now。其它的event通常会在当前仿真时间点被调用。
(2)process因为调用了event而中断,当SimPy处理完 Timeout event ,process就会恢复,并且接受来自event的值42,但是没有返回值也可以
(3)最后这个process输出当前仿真时间点(通过env.now获取)以及Timeout的值
如果已经定义了所有需要的process函数,就可以开始实例化仿真需要的对象,一般先要创建Environmet实例,因为后续的其它实例都是以仿真环境为基础。
1.2 yield
SimPy中yield的使用非常关键,其理解过程为 iterables->generators->yield
- iterables:迭代器
当你创建一个列表时,你可以一个一个读取元素,这个过程称为迭代。
>>> mylist = [1,2,3]
>>> for i in mylist:
print(i)
1
2
3
mylist是一个迭代器,使用循环创建列表
>>> mylist = [x*x for x in range(3)]
>>> for i in mylist:
print(i)
0
1
4
任何可以使用for…in… 操作的都是一个迭代器,如列表,字符串,文件。
这些迭代器是简单的,因为你可以按照自己的想法读取所有的值,所有的值都已经存储在记忆里,但是如果你某一时刻不需要这么多值,内存就被浪费了。
- Generators:生成器
生成器也是迭代器,是一种你每次只能迭代一次的迭代器。生成器不会存储所有的值在记忆里,它们动态生成值。
>>> mygenerator = (x*x for x in range(3))
>>> for i in mygenerator:
print(i)
0
1
4
相同的代码只是用 () 代替 [],但是你不能执行 for i in mygenerator 两次,因为生成器只能使用一次,生成器计算出0然后忘记这个值,再计算出1,最后是4,一个接着一个
- yield
yield 是一个用法与return类似的关键词,只是这个函数返回一个生成器
>>> def create_generator():
mylist = range(3)
for i in mylist:
yield i*i
>>> mygenerator = create_generator()
>>> print(mygenerator)
<generator object create_generator at 0x7f7e702c86d0>
>>> for i in mygenerator:
print(i)
0
1
4
总结:调用包含yield的函数会返回一个生成器,函数中的代码并没有运行,调用生成器后,每次遇到 yield 时函数会暂停并保存当前所有的运行信息(保留局部变量),返回 yield 的值, 并在下一次迭代时从当前位置继续运行,直到生成器被全部遍历完。
def gen(n):
for i in range(n):
print('before')
yield i
print('after')
a = gen(4) # 没有运行代码
print(a.__next__()) # 运行到yield停止,返回yield值=1
before
0
print(a.__next__()) # 从上次停止的位置继续运行,直到再遇见yield
after
before
1
for i in a: # 直接用循环遍历生成器
print(i)
after
before
2
after
before
3
after
2.Environment
普通的仿真使用 Environment,实时的仿真使用RealtimeEnvironment
2.1 Simulation control
仿真控制中最重要的方法是 Environment.run()
- 直接调用env.run(),仿真会一直进行直到没有事件要处理了
- 多数情况下需要设置仿真时间,可以传入until参数,如env.run(until=10),仿真将会在时钟到达10时结束。随着不断增加的until值重复调用此函数可以绘制出进程图
for i in range(100):
env.run(until=i)
progressbar.update(i)
- until不一定是数值,可以传入任何其它event,当event执行完run()后就会返回。如果当前时间为0,env.run(until=env.timeout(5))等价于env.run(until=5)。一个process也是一个event
>>> import simpy
>>> def myproc(env):
yield env.timeout(1)
return 'fly'
>>> env = simpy.Environment()
>>> proc = env.process(myproc(env))
>>> env.run(until = proc)
'fly'
为了能逐步按event执行仿真,环境会提供peek()和step()
- peek():返回下一个调度event的时间,如果没有,返回infinity
- step():执行下一个调度的event,如果没有,响应EmptySchedule
step() 只执行下一个进程,run() 一直执行直到条件终止
典型用法如下循环
until = 10
while env.peek() < until:
env.step()
2.2 State access
- Environment.now:通过Environment.now可以获取当前仿真时间,默认可以now在0时刻开始,可以传入initial_time设置开始时间
- Environment.active_process:获取当前执行的进程,只有在进程函数里能获取到该信息
import simpy
def subfunc(env):
print(env.active_process)
def proc(env):
while True:
print(env.active_process)
subfunc(env)
yield env.timeout(1)
env = simpy.Environment()
p1 = env.process(proc(env))
>>>env.active_process
None
>>>env.step()
<Process(proc) object at 0x7fb34007ee20>
<Process(proc) object at 0x7fb34007ee20>
>>>env.active_process
None
2.3 Event creation
创建event需要导入simpy.events再实例化类并且传入环境中,为了方便,Environment提供了一些event创建的简写,如 Environment.event() 等同于simpy.events.Event(env)
- Environment.event()
- Environment.process()
- Environment.timeout()
- Environment.all_of()
- Environment.any_of()
3.Events
simpy包含很多用于各种目的的event,均继承自simpy.events.Event
events.Event
|
+— events.Timeout
|
+— events.Initialize
|
+— events.Process
|
+— events.Condition
| |
| +— events.AllOf
| |
| +— events.AnyOf
.
.
.
3.1 Event basics
一个event有以下三种状态:
- 可能发生(triggered = False)
- 将要发生(triggered = True)
- 已经发生(processed = True))
event随着时间的推移,按照如上顺序遍历更新状态。初始时刻,events没有被触发,只是记忆中的对象。
- Event.triggered:True,event被触发也就是在某个时间点被调用,进入simpy的事件队列里
- Event.callbacks:回调列表,只要event没有被处理,就可以添加回调。回调就是event以参数的形式存储在Event.callbacks列表中
- Event.processed:True,simpy把event从事件队列中弹出,开始处理并调用所有回调信息,之后将不再添加回调
- Event.value:events返回的值,也可以在process中使用value = yield event
event被触发可能成功或失败,例如一个event在计算快结束且一切正常时被触发将会succeed,但是如果在计算过程中被触发,将会fail。
- Event.succeed(value=None):触发一件event并将其标记为成功
- Event.fail(exception):触发event失败,并闯入原因
- Event.trigger(event):触发event通用方法,返回成功或者失败的值
3.2 Example usages for Event
import simpy
class school:
def __init__(self, env):
self.env = env
self.class_end = env.event()
# 创建三个pupil进程和一个bell进程
self.pupil_procs = [env.process(self.pupil()) for i in range(3)]
self.bell_procs = env.process(self.bell())
def bell(self):
for i in range(2): # (1)bell循环
yield self.env.timeout(45) # (2)等待45
self.class_end.succeed() # (3)触发class_end标记成功
self.class_end = self.env.event() # (4)生成新的class_end事件
def pupil(self):
for i in range(2): # (5)pupil循环
print(r'\0/',end=' ') # (6)输出\0/
yield self.class_end #(7)中断处理class_end事件
school = School(env)
env = simpy.Environment()
env.run()
\o/ \o/ \o/
\o/ \o/ \o/
首先创建三个pupil进程和一个bell进程,依次处理这些进程
- event1:语句5,6,7,输出\0/,等待class_end
- event2:语句5,6,7,输出\0/,等待class_end
- event3:语句5,6,7,输出\0/,等待class_end
- event4:语句1,2,处理timeout(45),无输出
- event5:语句3,4,class_end触发成功,执行5,6,7,输出 \o/ \o/ \o/
3.3 Let time pass by: the Timeout
为了在仿真中能真正让时间流动,需要使用Timeout event
- Timeout(delay, value=None):Timeout事件在创建时被触发,在now+delay时被调用。
3.4 Processes are events, too
process也是进程,如下代码,pa可以yield sub,处理pa时遇到yield会暂停,去处理sub,sub遇到yield先执行timeout(1),然后返回23给pa的yield,pa继续运行到return 23
def sub(env):
yield env.timeout(1)
return 23
def pa(env):
ret = yield env.process(sub(env))
return ret
import simpy
env = simpy.Environment()
env.run(env.process(pa(env)))
Out[6]: 23
如果需要process延迟开始,可以使用simpy.util.start_delayed()
def pa1(env):
sub_proc = yield simpy.util.start_delayed(env, sub(env), delay=3)
ret = yield sub_proc
return ret
3.5 Waiting for multiple events at once
如果想要同一时刻等待多件event,simpy提供AnyOf和AllOf,属于Condition 条件事件。
- AnyOf(env, events):events中至少有个event被触发
- AllOf(env, events):events中所有event被触发
from simpy.events import AnyOf, AllOf, Event
events = [Event(env) for i in range(3)]
a = AllOf(env, events)
b = AnyOf(env, events)
一个condition event的值是以字典形式按序存储每个event的值,AnyOf和AllOf也可以用&和|代替
def test_condition(env):
t1, t2 = env.timeout(1, value='spam'), env.timeout(2, value='egg')
ret = yield t1 | t2
assert ret == {t1: 'spam'}
t1, t2 = env.timeout(1, value='spam'), env.timeout(2, value='egg')
ret = yield t1 & t2
assert ret == {t1: 'spam',t2:'egg'}
e1, e2, e3 = [env.timeout(1) for i in range(3)]
yield (e1 | e2) & e3
assert all(e.processed for e in [e1, e2, e3])
proc = env.process(test_condition(env))
env.run()
也可以直接取值
def fetch_value(env):
t1,t2 = env.timeout(1,value='spam'), env.timeout(2, value='egg')
r1,r2 = (yield t1 & t2).values()
assert r1=='spam' and r2=='egg'
proc = env.process(fetch_value(env))
env.run()
4.Process Interaction
离散事件的模拟主要通过进程交互实现
- Sleep until woken up (passivate/reactivate)
- Waiting for another process to terminate
- Interrupting another process
4.1 Sleep until woken up
假设要模拟带有智能电池充电控制器的电动汽车,车辆行驶时,控制器睡眠,只要车子连接上充电桩开始充电,控制器就会被激活。
import simpy
import random
env = simpy.Environment()
class Charge:
def __init__(self, env):
self.env = env
self.drive_proc = env.process(self.drive(env))
self.bat_ctrl_proc = env.process(self.bat_ctrl(env))
self.bat_ctrl_reactivate = env.event()
def drive(self, env):
while True:
yield env.timeout(random.randint(20, 40)) # (1)骑行20-40min
print('start parking at:{}'.format(env.now)) # (2)停车时间点
self.bat_ctrl_reactivate.succeed() # (3)触发充电事件
self.bat_ctrl_reactivate = env.event() # (4)生成下一次充电事件
yield env.timeout(random.randint(60, 360)) # (5)停车1-6h
print('stop parking at:{}'.format(env.now)) # (6)停车结束事件点
def bat_ctrl(self, env):
while True:
print('passivating at:', env.now) # (7)充电未被激活
yield self.bat_ctrl_reactivate # (8)等待充电事件被触发
print('reactivated at:', env.now) # (9)充电激活
yield env.timeout(random.randint(30, 90)) # (10)充电30-90min
ev = Charge(env)
env.run(until=200)
passivating at: 0
start parking at:25
reactivated at: 25
passivating at: 74
stop parking at:106
start parking at:126
reactivated at: 126
passivating at: 160
物理过程:骑行,停车,充电,其中停车期间内完成充电,属于并行
逻辑过程:bat_ctrl_reactivate事件触发成功,电动车才能开始充电,而bat_ctrl_reactivate只有在停车后才能被触发,如此便能满足骑行,停车,开始充电,停止充电,结束停车,开始骑行的循环过程。
- event1:语句1,无输出
- event2:语句7,8,输出passivating at: 0
- event3:语句2,3,输出start parking at:25
- event4:语句8,9,输出reactivated at: 25,
- event5:语句7,8,输出passivating at: 74
- event6:语句5,6,输出stop parking at:106
重复上述过程
4.2 Waiting for another process to terminate
前文的电动车充电模型有个问题:如果车子停车时间比充电时间要短,模型将不成立。因此需要调整,充电事件在开始停车的时候被触发,同时满足充电时间和停车时间,才会停车
import simpy
import random
env = simpy.Environment()
class Charge:
def __init__(self, env):
self.env = env
self.drive_proc = env.process(self.drive(env))
def drive(self, env):
while True:
yield env.timeout(random.randint(20, 40)) # 1.骑行20-40min
print('start parking at:{}'.format(env.now)) # 2.停车
charging = env.process(self.bat_ctrl(env)) # 3.创建充电进程(事件)
parking = env.timeout(random.randint(60, 360)) # 4.创建停车事件
yield charging & parking # 5.两个事件同时结束
print('stop parking at:{}'.format(env.now)) # 6.停车结束
def bat_ctrl(self, env):
while True:
print('bat start at:', env.now) # 7.开始充电点
yield env.timeout(random.randint(30, 90)) # 充电30-90min
print('bat done at:', env.now) # 8.充电结束
ev = Charge(env)
env.run(until=150)
start parking at:37
bat start at: 37
bat done at: 110
stop parking at:128
使用yield charging & parking控制充电和停车都结束才输出stop parking
- event1:语句1,无输出
- event2:语句2,3,4,5,输出start parking at:37
- event3:语句7,8,输出bat start at: 37
- event4:语句9,输出bat done at: 110
- event5:语句5,6,输出stop parking at:128
4.4 Interrupting another process
假设你的行程很急,需要打断充电立刻骑行,可以使用进程的interrupt(),设置的parking时间结束而充电未结束时,打断充电事件
import simpy
import random
env = simpy.Environment()
class Charge:
def __init__(self, env):
self.env = env
self.drive_proc = env.process(self.drive(env))
def drive(self, env):
while True:
yield env.timeout(random.randint(20, 40)) # 1.骑行20-40min
print('start parking at:{}'.format(env.now)) # 2.停车
charging = env.process(self.bat_ctrl(env)) # 3.创建充电进程(事件)
parking = env.timeout(60) # 4.创建停车事件
yield charging | parking # 5.停车结束就可离开
if not charging.triggered: # 6.charging还没有结束执行干扰
charging.interrupt('need to go')
print('stop parking at:{}'.format(env.now)) # 7.停车结束
def bat_ctrl(self, env):
print('bat start at:', env.now) # 8.开始充电点
try:
yield env.timeout(random.randint(60, 90)) # 9.充电60-90min
print('bat done at:', env.now) # 10.充电结束
except simpy.Interrupt as i:
print('bat interrupt at:', env.now, 'mes', i.cause) # 11.充电被干扰
ev = Charge(env)
env.run(until=100)
start parking at:24
bat start at: 24
stop parking at:84
bat interrupt at: 84 mes need to go
parking时间:now+60
charging时间:now+60-90
yield charging | parking语句中满足parking
5.Shared Resources
共享资源是另一种进程交互建模的方式,通过形成拥塞点使进程排队使用资源。simpy资源系统中有三个组成部分:
- Resources:一次可由有限数量的流程使用的资源(例如,具有有限数量的燃油泵的加油站)
- Containers:对同质、无差别散装的生产和消费进行建模的资源。它可以是连续的(如水)或离散的(如苹果)
- Stores:允许生产和使用 Python 对象的资源
5.1 The basic concept of resources
所有资源都有相同的基础定义:资源本身就是一种容器,通常容量有限。进程可以从资源里执行放取操作,如果资源满了或者空了,必需排队等待
BaseResource(capacity):
put_queue
get_queue
put(): event
get(): event
每个资源都有最大容量和两个队列
- put():一个用于想要将某些内容放入其中的进程
- get():一个用于想要从中取出某些内容的进程
put() 和 get() 方法都返回一个事件,该事件在相应的操作成功时触发。
Resources and interrupts
当一个进程在等待put或者get事件成功时,它可能会被其它进程打扰,中断后有两种情况:
- 继续申请资源(再次yield这个事件)
- 停止申请资源,调用事件的**cancel()**方法
资源系统是模块化,可扩展的,资源使用专用的队列和事件类型,因此提供了排序队列,优先级,抢占等功能
5.2 Resources
Resources can be used by a limited number of processes at a time (e.g., a gas station with a limited number of fuel pumps).
process使用Resources,一旦结束就释放Resources,例如汽车到达加油站,使用一个燃油泵(如果有),加完油后就释放这个燃油泵。
- request():请求Resources被建模成将进程令牌放入Resources中,等效于put()
- release():释放Resources被建模成从Resources中获取进程,等效于get()
Resources有三种:
- Resource
- PriorityResource:按优先级给进程排序
- PreemptiveResource:高优先级进程可以抢占较低优先级进程的资源
(1)Resource
- Resource(env, capacity=1):信号量
不只是记录当前用户,而是将request事件存储作为每个用户的访问令牌,这样也方便添加抢占操作。使用Resource的基本用法如下:
import simpy
def resource_user(env, resource):
request = resource.request() # 生成request事件申请资源
yield request # 等待访问
yield env.timeout(1) # 用资源做某事
resource.release(request) # 释放资源
env = simpy.Environment()
res = simpy.Resource(env, capacity=1)
user = env.process(resource_user(env, res))
env.run()
资源必须被释放,如果在等待或者使用资源的过程中,出现干扰,需要使用try: … finally: … 结构,为了避免大量使用,request事件可以作为context manager(上下文资源管理器)被使用,如此资源会自动释放
import simpy
def resource_user(env, resource):
with resource.request() as req:
yield req
yield env.timeout(1) # 资源会自动释放
env = simpy.Environment()
res = simpy.Resource(env, capacity=1)
user = env.process(resource_user(env, res))
env.run()
Resource能检索当前用户或排队用户的列表、当前用户数和资源的容量
import simpy
env = simpy.Environment()
res = simpy.Resource(env, capacity=2)
def print_state(res):
print(f'{res.count} of {res.capacity} slots are allocated')
print(f'users:{res.users}')
print(f'queued events:{res.queue}')
def user(res):
print_state(res)
with res.request() as req:
yield req
print_state(res)
print_state(res)
procs = env.process(user(res))
env.run()
0 of 2 slots are allocated
users:[]
queued events:[]
1 of 2 slots are allocated
users:[<Request() object at 0x7ff4d85bf580>]
queued events:[]
0 of 2 slots are allocated
users:[]
queued events:[]
资源自动释放,因此第三句print_state(res)输出和初始状态一样
(3)PriorityResource
事实上在现实世界里,不是所有事物都同等重要,PriorityResource会给资源的request添上优先级,通过整数体现,值越小优先级越高
import simpy
env = simpy.Environment()
res = simpy.PriorityResource(env, capacity=1)
def resource_user(name, env, resource, wait, prio):
yield env.timeout(wait)
with resource.request(priority=prio) as req:
print(f'{name} requesting at {env.now} with priority={prio}')
yield req
print(f'{name} got resource at {env.now}')
yield env.timeout(3)
p1 = env.process(resource_user(1, env, res, wait=0, prio=0))
p2 = env.process(resource_user(2, env, res, wait=1, prio=0))
p3 = env.process(resource_user(3, env, res, wait=2, prio=-1))
env.run()
1 requesting at 0 with priority=0
1 got resource at 0
2 requesting at 1 with priority=0
3 requesting at 2 with priority=-1
3 got resource at 3
2 got resource at 6
yield env.timeout(wait) 语句使得p1最快request资源并且在now=3释放,在此期间,p2,p3都经过wait时间开始request资源,即资源被释放可用时,p2,p3在申请同一个资源,p3优先级更高,因此p3在now=3时刻优先得到资源
(3)PreemptiveResource
有时新的request非常重要,插队已经不满足要求,它们需要踢掉现在资源的使用者,这就是可抢占资源PreemptiveResource
import simpy
env = simpy.Environment()
res = simpy.PreemptiveResource(env, capacity=1)
def resource_user(name, env, resource, wait, prio):
yield env.timeout(wait)
with resource.request(priority=prio) as req:
print(f'{name} requesting at {env.now} with priority={prio}')
yield req
print(f'{name} got resource at {env.now}')
try:
yield env.timeout(3)
except simpy.Interrupt as interrupt:
by = interrupt.cause.by
usage = env.now - interrupt.cause.usage_since
print(f'{name} got preempted by {by} at {env.now}'
f'after {usage}')
p1 = env.process(resource_user(1, env, res, wait=0, prio=0))
p2 = env.process(resource_user(2, env, res, wait=1, prio=0))
p3 = env.process(resource_user(3, env, res, wait=2, prio=-1))
env.run()
1 requesting at 0 with priority=0
1 got resource at 0
2 requesting at 1 with priority=0
3 requesting at 2 with priority=-1
1 got preempted by <Process(resource_user) object at 0x7fe0a01332b0> at 2after 2
3 got resource at 2
2 got resource at 5
在now=2的时候,p3申请资源,由于其优先级比p1高,因此可以抢占,3 got resource at 2。
PreemptiveResource继承自PriorityResource,给request增加了一个抢占标志(默认True),如果设置resource.request(priority=x, preempt=False),进程就无法抢占资源,只能按照队列优先级。PreemptiveResource只能抢占比自己优先级更低的进程的资源,如下例子表明是在优先级的基础上抢占。
import simpy
env = simpy.Environment()
res = simpy.PreemptiveResource(env, capacity=1)
def user(name, env, res, prio, preempt):
with res.request(priority=prio, preempt=preempt) as req:
try:
print(f'{name} request at {env.now}')
assert isinstance(env.now, int), type(env.now)
yield req
assert isinstance(env.now, int), type(env.now)
print(f'{name} got resourec at {env.now}')
yield env.timeout(3)
except simpy.Interrupt:
print(f'{name} got preempted at {env.now}')
>>>A = env.process(user('A', env, res, prio=0, preempt=True))
>>>env.run(until=1)
A request at 0
A got resourec at 0
>>>B = env.process(user('B', env, res, prio=-2, preempt=False))
>>>C = env.process(user('C', env, res, prio=-1, preempt=True))
>>>env.run()
B request at 1
C request at 1
B got resourec at 3
C got resourec at 6
- now=0:进程A首先申请资源,因此在now=0就能成为用户
- now=1:进程B和C申请资源,进程B的优先级比A高但是preempt=False,因此B只能排队等待,进程C优先级比A高并且preempt=True,可以抢占A,但是由于B排在C的前面阻止了C的抢占,C优先级比B低因此C不能抢占B和A
- now=3:进程B获取资源
- now=6:进程C获取资源
对于混合资源抢占的情况,一定要分清楚优先级和是否可以抢占。
5.3 Containers
Containers help you modelling the production and consumption of a homogeneous, undifferentiated bulk. It may either be continuous (like water) or discrete (like apples).
可以使用Containers建模加油站的油箱,邮轮会增加油箱里的汽油量,而汽车会减少汽油量。如下是一个加油站的模型,有限数量的加油机(Resource)和一个油箱(Container)
class GasStation:
... def __init__(self, env):
... self.fuel_dispensers = simpy.Resource(env, capacity=2)
... self.gas_tank = simpy.Container(env, init=100, capacity=1000)
... self.mon_proc = env.process(self.monitor_tank(env))
...
... def monitor_tank(self, env):
... while True:
... if self.gas_tank.level < 100:
... print(f'Calling tanker at {env.now}')
... env.process(tanker(env, self))
... yield env.timeout(15)
>>>
>>>
>>> def tanker(env, gas_station):
... yield env.timeout(10) # Need 10 Minutes to arrive
... print(f'Tanker arriving at {env.now}')
... amount = gas_station.gas_tank.capacity - gas_station.gas_tank.level
... yield gas_station.gas_tank.put(amount)
>>>
>>>
>>> def car(name, env, gas_station):
... print(f'Car {name} arriving at {env.now}')
... with gas_station.fuel_dispensers.request() as req:
... yield req
... print(f'Car {name} starts refueling at {env.now}')
... yield gas_station.gas_tank.get(40)
... yield env.timeout(5)
... print(f'Car {name} done refueling at {env.now}')
>>>
>>>
>>> def car_generator(env, gas_station):
... for i in range(4):
... env.process(car(i, env, gas_station))
... yield env.timeout(5)
>>>
>>>
>>> env = simpy.Environment()
>>> gas_station = GasStation(env)
>>> car_gen = env.process(car_generator(env, gas_station))
>>> env.run(35)
Car 0 arriving at 0
Car 0 starts refueling at 0
Car 1 arriving at 5
Car 0 done refueling at 5
Car 1 starts refueling at 5
Car 2 arriving at 10
Car 1 done refueling at 10
Car 2 starts refueling at 10
Calling tanker at 15
Car 3 arriving at 15
Car 3 starts refueling at 15
Tanker arriving at 25
Car 2 done refueling at 30
Car 3 done refueling at 30
5.3 Stores
Using Stores you can model the production and consumption of concrete objects (in contrast to the rather abstract “amount” stored in containers). A single Store can even contain multiple types of objects.
Store
FilterStore
:使用自定义函数过滤从store中取出的对象PriorityStore
:按照优先级从store中取出对象
以下是生产者-消费者模型
import simpy
def producer(env, store):
for i in range(100):
yield env.timeout(2) # 2分钟生产一次
yield store.put(f'spam:{i}') # 存放到store中
print(f'produced spam at', env.now)
def consumer(name, env, store):
while True:
yield env.timeout(1) # 间隔1分钟申请一次
print(name, 'request spam at', env.now)
item = yield store.get() # 从store中取出对象
print(name, 'get', item, 'at', env.now)
env = simpy.Environment()
store = simpy.Store(env, capacity=1)
prod = env.process(producer(env, store)) # 一个生产者
cons = [env.process(consumer(i, env, store)) for i in range(2)] # 两个消费者
env.run(until=5)
store有以下属性:
capacity
:容量items
:当前在store中的物品put_queue
和get_queue
:取出和存放队列
FilterStore
可以用来建立机器有不同属性的机械车间
from collections import namedtuple
import simpy
Machine = namedtuple('Machine', 'size, duration')
m1 = Machine(1, 2)
m2 = Machine(2, 1)
def user(name, env, ms, size):
machine = yield ms.get(lambda machine: machine.size == size)
print(f'{name} got {machine} at {env.now}')
yield env.timeout(machine.duration)
yield ms.put(machine)
print(f'{name} release {machine} at {env.now}')
env = simpy.Environment()
machine_shop = simpy.FilterStore(env, capacity=2)
machine_shop.items = [m1, m2]
user = [env.process(user(i, env, machine_shop, (i % 2) + 1)) for i in range(3)]
env.run()
0 got Machine(size=1, duration=2) at 0
1 got Machine(size=2, duration=1) at 0
1 release Machine(size=2, duration=1) at 1
0 release Machine(size=1, duration=2) at 2
2 got Machine(size=1, duration=2) at 2
2 release Machine(size=1, duration=2) at 4
PriorityStore
建模不同优先级的队列,下面例子中,inspector发现并记录事件,maintainer按优先级修复事件
import simpy
env = simpy.Environment()
issues = simpy.PriorityStore(env)
def inspector(env, issues):
for issue in [simpy.PriorityItem('P2', '#0001'),
simpy.PriorityItem('P0', '#0001'),
simpy.PriorityItem('P3', '#0002'),
simpy.PriorityItem('P1', '#0003')]:
yield env.timeout(1)
print(f'{env.now} log {issue}')
yield issues.put(issue)
def maintainer(env, issues):
while True:
yield env.timeout(3)
issue = yield issues.get()
print(f'{env.now} repair {issue}')
_ = env.process(inspector(env, issues))
_ = env.process(maintainer(env, issues))
env.run()
/Users/bujibujibiu/PycharmProjects/w1/venv/bin/python /Users/bujibujibiu/PycharmProjects/w1/1111111.py
1 log PriorityItem(priority='P2', item='#0001')
2 log PriorityItem(priority='P0', item='#0001')
3 log PriorityItem(priority='P3', item='#0002')
3 repair PriorityItem(priority='P0', item='#0001')
4 log PriorityItem(priority='P1', item='#0003')
6 repair PriorityItem(priority='P1', item='#0003')
9 repair PriorityItem(priority='P2', item='#0001')
12 repair PriorityItem(priority='P3', item='#0002')
6.Real-time simulations
前文中的simpy.Environment()
时间推进是仿真环境内部在推进,实际上的时间并没有经过这么长,但是如果想要将仿真中的时间和现实的时间结合起来,比如仿真过程中存在人机交互,想要分析一个算法的实时性能等,就需要用到实时模拟,只需要将simpy的Environment 变成simpy.rt.RealtimeEnvironment.
除了初始时间,还有两个附加的参数,factor和strict。RealtimeEnvironment(initial_time=0, factor=1.0, strict=True)
- factor
factor定义了仿真时间的每个步骤所经过的实际时间,默认是一秒,如果factor=0.1
,单位仿真时间等于1/10秒,如果factor=60
,会花费1分钟。
下面是一个简单的例子,将正常的模拟变成实时模拟,每个仿真时间单位的持续时间为十分之一秒。
首先使用simpy.Environment()
,可以看到仿真里的环境是1,但是实际上现实的时间并没有持续1
import time
import simpy
def example(env):
start = time.perf_counter()
yield env.timeout(1)
end = time.perf_counter()
print('duration of one simulation time unit:%.2f' %(end - start))
env = simpy.Environment()
proc = env.process(example(env))
env.run(until=proc)
>>>duration of one simulation time unit:0.00
### env.now = 1
接下来换成simpy.rt.RealtimeEnvironment()
,设置factor=60,再运行代码就会发现程序真的等了1分钟才出结果,然而在仿真环境中时间仍然为1
env = simpy.rt.RealtimeEnvironment(factor=60)
proc = env.process(example(env))
env.run(until=proc)
>>>duration of one simulation time unit:60.01
### env.now = 1
- strict
如果把strict设置为True
,当仿真时间所需时间超过真实时间时,step()
和run()
会提示RuntimeError
。下面例子中env时间1=现实时间0.01,但是time.sleep(0.02)
让现实时间已经到了0.02,这时仿真时间就落后于现实时间,提示时间错误。
def example(env):
time.sleep(0.02)
yield env.timeout(1)
env = simpy.rt.RealtimeEnvironment(factor=0.01)
proc = env.process(example(env))
try:
env.run(until=proc)
except RuntimeError:
print('simulation is slow')
>>>simulation is slow
如果即便出现这种情况也不用报错,就可以将strict设置为False
env = simpy.rt.RealtimeEnvironment(factor=0.01, strict=False)
proc = env.process(example(env))
try:
env.run(until=proc)
print('right')
except RuntimeError:
print('simulation is slow')
>>>right
7.Monitoring
在开始监测你的仿真模型前,需要先回答以下问题
(1)你想要监测哪些内容:进程?资源使用情况?追踪所有的仿真事件?
(2)你什么时候去监测:在预先定义的时间间隔?当某事发生的时候?
(3)你如何存储监测的数据:存储在列表里?log到文件中?写进数据库?
7.1 Monitoring your processes
监测进程比较简单,因为进程是自己代码写出来的,比较普遍的要求是记录某个变量变化的值,简单方法是用列表存储,下面代码中使用列表data
存储了每次
data = []
def test(env, data):
val = 0
for i in range(5):
val += env.now
data.append(val)
yield env.timeout(1)
env = simpy.Environment()
proc = env.process(test(env, data))
env.run(proc)
print(data)
>>>[0, 1, 3, 6, 10]
7.2 Resource usage
7.3 Event tracing
上面两节内容暂未理解清楚,待更新
8.Time and Scheduling
这部分内容是为了能深入理解时间在simpy里面如何流动,以及它如何调度和处理事件
8.1 What is time?
维基百科定义:时间是存在的无限持续进步和事件,这些事件显然是不可逆转的,从过去到现在再到未来。时间是各种测量的组成部分,用于对事件进行排序,比较事件的持续时间或它们之间的间隔,并量化物质现实或意识体验中数量的变化率。时间通常被称为第四维,以及三个空间维度
8.2 What’s the problem with it?
通常来说,真实世界里的时间好像会碰巧的同一时刻发生,但是实际上发生时间是有轻微差别的,这里有一个明显的例子,Alice 和 Bob 在同一天出生,如果你的时间以天为单位,纳闷生日时间是同时发生的,如果你以分钟为单位,那么Alice 实际上出生在0:42 而Bob是在11:14。
在电脑上模拟有相似的问题,在整数(浮点数)都是离散的数字,在它们之间有很多其它的小数,尽管现实世界里时间先后发生(0.1和0.2),但是扩大规模到整数,也许就是相同的时间。
simpy是一个单线程,确定性的库,它按顺序处理时间,一个接一个,如果两个事件在同一时刻调度,先调度的那个时间会先加工(FIFO)。这一点对于理解非常重要,你建立的仿真世界也许需要平行运行,但是在CPU上仿真,事件是按照顺序加工的,如果你多次运行模拟,就会得到一样的结果
- 真实世界,通常不会有完全同时刻
- 时间标量的离散化会让事件看起来是在同一时刻发生
- simpy一个接一个处理事件,即使它们有相同的时间
8.3 SimPy Events and time
回顾之前的内容,一个event有几下几种状态:
- 未触发:没有在event队列里
- 触发:t时刻被调度,进入event队列中
- 处理中:从event队列中移除
simpy的队列使用堆队列head queue
,堆是二叉树,每个父节点的值都小于等于其任何子节点。因此如果我们将event以元组(t,event)
的方式插入队列中,队列中的第一个元素将是t最小的那个,也就是下一个要处理的事件。
但是,如果某一时刻两个事件都被调度了,为了解决这个问题,可以使用(t,eid,event)
来存储事件,eid是事件编号。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)