Running SimPy Until a Condition: Flexible Termination
Sometimes you don't know when to stop. Run until the queue is empty. Until 1000 customers are served. Until something interesting happens.
Basic Termination Methods
Run Until Time
env.run(until=100) # Stop at time 100
Run Until Process Completes
env.run() # Run until all processes finish
Run Until Event
stop_event = env.event()
env.run(until=stop_event)
Run Until Custom Condition
Create a monitoring process that triggers termination:
def stop_condition_monitor(env, stop_event, condition_func):
"""Monitor until condition is true, then trigger stop."""
while True:
if condition_func():
stop_event.succeed()
return
yield env.timeout(1) # Check interval
# Usage
stop_event = env.event()
env.process(stop_condition_monitor(
env, stop_event,
condition_func=lambda: len(server.queue) == 0 and server.count == 0
))
env.run(until=stop_event)
Run Until N Customers Served
class SimulationController:
def __init__(self, env, target_customers):
self.env = env
self.target = target_customers
self.served = 0
self.done = env.event()
def record_service(self):
self.served += 1
if self.served >= self.target:
self.done.succeed()
controller = SimulationController(env, target_customers=1000)
def customer(env, resource, controller):
with resource.request() as req:
yield req
yield env.timeout(5)
controller.record_service()
env.run(until=controller.done)
Run Until Queue Empty
def queue_empty_monitor(env, resource, stop_event, check_interval=1):
"""Stop when queue and servers are both empty."""
yield env.timeout(10) # Wait for warm-up
while True:
if len(resource.queue) == 0 and resource.count == 0:
stop_event.succeed()
return
yield env.timeout(check_interval)
stop = env.event()
env.process(queue_empty_monitor(env, server, stop))
env.run(until=stop)
Run Until Threshold Exceeded
def threshold_monitor(env, stats, stop_event, threshold):
"""Stop when average wait exceeds threshold."""
while True:
yield env.timeout(10)
if stats.wait_times:
avg_wait = sum(stats.wait_times) / len(stats.wait_times)
if avg_wait > threshold:
print(f"Threshold exceeded at {env.now}: avg_wait = {avg_wait:.2f}")
stop_event.succeed()
return
Combining Conditions
Stop on any of multiple conditions:
def multi_condition_monitor(env, conditions, stop_event):
"""Stop when any condition becomes true."""
while True:
for name, check_func in conditions.items():
if check_func():
print(f"Stopping: {name}")
stop_event.succeed()
return
yield env.timeout(1)
conditions = {
'time_limit': lambda: env.now >= 1000,
'customers_served': lambda: controller.served >= 500,
'queue_exploded': lambda: len(server.queue) > 100
}
env.process(multi_condition_monitor(env, conditions, stop_event))
Graceful Shutdown
Let in-progress work complete:
def graceful_shutdown(env, resource, stop_event, grace_period=50):
"""Signal stop, then wait for work to complete."""
# Wait for stop signal
yield stop_event
print(f"Shutdown initiated at {env.now}")
shutdown_started = env.now
# Wait for queue to drain
while len(resource.queue) > 0 or resource.count > 0:
if env.now - shutdown_started > grace_period:
print("Grace period expired, forcing shutdown")
break
yield env.timeout(1)
print(f"Shutdown complete at {env.now}")
Step-by-Step Execution
Run one event at a time:
env = simpy.Environment()
# ... setup processes
while True:
try:
env.step()
print(f"Time now: {env.now}")
if some_condition():
break
except simpy.EmptySchedule:
break # No more events
Using peek()
Check next event without advancing:
while env.peek() < 100: # Next event before time 100
env.step()
# Or check if anything is scheduled
if env.peek() != float('inf'):
env.run(until=env.peek() + 1)
Timeout Fallback
Always have a maximum run time:
def run_with_limit(env, stop_condition, max_time=10000):
"""Run until condition or max_time, whichever comes first."""
stop = env.event()
timeout = env.timeout(max_time)
env.process(condition_monitor(env, stop_condition, stop))
result = yield stop | timeout
if stop in result:
print("Condition met")
else:
print("Max time reached")
Complete Example
import simpy
import random
class TerminationController:
def __init__(self, env, config):
self.env = env
self.config = config
self.served = 0
self.stop_event = env.event()
self.stop_reason = None
def record_service(self):
self.served += 1
self.check_conditions()
def check_conditions(self):
if self.stop_event.triggered:
return
# Check various conditions
if self.served >= self.config.get('max_customers', float('inf')):
self.stop('max_customers_reached')
elif self.env.now >= self.config.get('max_time', float('inf')):
self.stop('max_time_reached')
def stop(self, reason):
if not self.stop_event.triggered:
self.stop_reason = reason
self.stop_event.succeed()
def time_checker(self):
while not self.stop_event.triggered:
yield self.env.timeout(10)
self.check_conditions()
def run_simulation(config):
env = simpy.Environment()
server = simpy.Resource(env, capacity=config['servers'])
controller = TerminationController(env, config)
def customer(name):
with server.request() as req:
yield req
yield env.timeout(random.expovariate(1/5))
controller.record_service()
def arrivals():
i = 0
while not controller.stop_event.triggered:
yield env.timeout(random.expovariate(1/4))
env.process(customer(f"C{i}"))
i += 1
env.process(arrivals())
env.process(controller.time_checker())
env.run(until=controller.stop_event)
return {
'stop_reason': controller.stop_reason,
'customers_served': controller.served,
'final_time': env.now
}
# Run with different termination conditions
result = run_simulation({
'servers': 2,
'max_customers': 500,
'max_time': 1000
})
print(result)
Summary
Flexible termination: - Use events for custom stop conditions - Monitor processes to check conditions - Combine multiple conditions with AnyOf - Always have a fallback max time - Consider graceful shutdown for in-progress work
Stop when it makes sense, not just when time runs out.
Next Steps
Discover the Power of Simulation
Want to become a go-to expert in simulation with Python? The Complete Simulation Bootcamp will show you how simulation can transform your career and your projects.
Explore the Bootcamp