Running Multiple Processes in SimPy: Concurrency Made Simple
Real systems have many things happening at once. Customers shopping. Machines running. Trucks delivering. SimPy handles them all.
Adding Multiple Processes
import simpy
def customer(env, name):
    """A customer process: arrive, take a fixed 5-unit service, then leave."""
    service_time = 5  # every customer needs the same amount of simulated time
    print(f"{name} starts at {env.now}")
    yield env.timeout(service_time)
    print(f"{name} finishes at {env.now}")
# Create the simulation environment and register three customer processes.
env = simpy.Environment()
env.process(customer(env, "Alice"))
env.process(customer(env, "Bob"))
env.process(customer(env, "Charlie"))
# Run until no scheduled events remain; all three finish at t=5.
env.run()
All three run "concurrently" in simulation time. They all start at time 0 and finish at time 5.
Dynamic Process Creation
Create processes during simulation:
def customer_generator(env, server):
    """Spawn a new customer process after each exponential inter-arrival gap."""
    arrival_no = 0
    while True:
        # Mean inter-arrival time of 5 time units (rate = 1/5).
        gap = random.expovariate(1/5)
        yield env.timeout(gap)
        env.process(customer(env, f"Customer{arrival_no}", server))
        arrival_no += 1
env.process(customer_generator(env, server))
Processes spawn processes. That's how arrivals work.
Process Classes
For complex entities with state:
class Machine:
    """A machine that repeatedly produces parts at random intervals.

    The machine registers its own ``run`` process on construction, so
    merely creating a Machine is enough to start it producing.
    """

    def __init__(self, env, name, capacity):
        self.env = env
        self.name = name
        # Fix: `capacity` was accepted but silently discarded; keep it so
        # callers (e.g. Machine(env, name, 1)) can actually read it back.
        self.capacity = capacity
        self.parts_made = 0  # running total, reported after the simulation
        # Start the machine's behaviour as soon as the object exists.
        self.process = env.process(self.run())

    def run(self):
        """Produce one part every 3-7 time units, forever."""
        while True:
            # Uniformly distributed cycle time between 3 and 7 time units.
            yield self.env.timeout(random.uniform(3, 7))
            self.parts_made += 1
            print(f"{self.name} made part #{self.parts_made} at {self.env.now}")
# Build five machines; each starts producing immediately on construction.
env = simpy.Environment()
machines = [Machine(env, f"Machine{i}", 1) for i in range(5)]
# Advance the clock to t=50, then report how much each machine produced.
env.run(until=50)
for m in machines:
    print(f"{m.name}: {m.parts_made} parts")
Waiting for Multiple Processes
Start several processes, then wait for all of them — or any one of them — to complete:
def task(env, duration):
    """Busy for `duration` time units; the process's value is the duration itself."""
    # The return value becomes the value of the finished process event.
    yield env.timeout(duration)
    return duration
def coordinator(env):
    """Launch three tasks, wait for all of them, then demonstrate waiting for any."""
    # Start tasks
    task1 = env.process(task(env, 5))
    task2 = env.process(task(env, 10))
    task3 = env.process(task(env, 3))
    # Wait for all
    # `&` builds an AllOf condition: this resumes once every process has finished.
    yield task1 & task2 & task3
    print(f"All tasks done at {env.now}")
    # Or wait for any
    # `|` builds an AnyOf condition: resumes as soon as one process has finished.
    # (Here all three already finished above, so this resumes immediately.)
    result = yield task1 | task2 | task3
    print(f"First task done at {env.now}")
Parallel Workers
Multiple workers processing from a shared queue:
import simpy
def worker(env, name, job_store):
    """Pull jobs from the shared store forever, processing one at a time."""
    while True:
        # Blocks (in simulation time) until a job is available.
        current = yield job_store.get()
        print(f"{name} starts {current['id']} at {env.now}")
        yield env.timeout(current['duration'])
        print(f"{name} finishes {current['id']} at {env.now}")
def job_generator(env, job_store):
    """Feed 20 randomly sized jobs into the store, pausing between arrivals."""
    for job_id in range(20):
        new_job = {'id': job_id, 'duration': random.uniform(2, 8)}
        yield job_store.put(new_job)
        # Stagger arrivals by 0.5-2 time units.
        yield env.timeout(random.uniform(0.5, 2))
env = simpy.Environment()
jobs = simpy.Store(env)  # unbounded FIFO queue shared by generator and workers
# Create 3 parallel workers
for i in range(3):
    env.process(worker(env, f"Worker{i}", jobs))
env.process(job_generator(env, jobs))
env.run(until=100)
Process Communication
Via Shared Resources
buffer = simpy.Store(env, capacity=10)
def producer(env, buffer):
    """Add one item to the buffer every 2 time units (blocks when buffer is full)."""
    while True:
        yield env.timeout(2)
        yield buffer.put("item")
def consumer(env, buffer):
    """Take items from the buffer, spending 3 time units handling each one."""
    while True:
        received = yield buffer.get()
        yield env.timeout(3)
Via Events
def waiter(env, signal):
    """Block until the shared `signal` event fires, printing before and after."""
    print(f"Waiting at {env.now}")
    yield signal
    print(f"Signal received at {env.now}")
def signaller(env, signal):
    """Fire the shared event after a 5-unit delay, releasing anyone waiting on it."""
    yield env.timeout(5)
    signal.succeed()
signal = env.event()  # one-shot event shared by both processes
env.process(waiter(env, signal))
env.process(signaller(env, signal))
Via Shared State
class SharedState:
    """Plain object whose attributes are shared by every process holding a reference."""
    def __init__(self):
        self.counter = 0  # incremented by one process, read by another

state = SharedState()
def incrementer(env, state):
    """Bump the shared counter once per simulated time unit."""
    while True:
        yield env.timeout(1)
        state.counter += 1
def reader(env, state):
    """Print the shared counter every 5 time units."""
    while True:
        yield env.timeout(5)
        print(f"Counter: {state.counter}")
Process Pipelines
Chain of processing stages:
def stage(env, name, input_store, output_store, process_time):
    """One pipeline stage: pull an item, work on it, optionally push downstream."""
    while True:
        unit = yield input_store.get()
        yield env.timeout(process_time)
        # The final stage passes None as its output store and just consumes.
        if output_store:
            yield output_store.put(unit)
        print(f"{name} processed {unit} at {env.now}")
# Create pipeline
# Each stage's output store is the next stage's input; the last has no output.
# NOTE(review): `raw_materials` (the Store feeding Stage1) is assumed to be
# defined elsewhere — it does not appear in this snippet.
stage1_out = simpy.Store(env)
stage2_out = simpy.Store(env)
env.process(stage(env, "Stage1", raw_materials, stage1_out, 3))
env.process(stage(env, "Stage2", stage1_out, stage2_out, 5))
env.process(stage(env, "Stage3", stage2_out, None, 2))
Managing Many Processes
class ProcessManager:
    """Registry of named SimPy processes with start/stop/status bookkeeping."""

    def __init__(self, env):
        self.env = env
        self.processes = {}  # name -> process handle

    def start(self, name, process_func, *args):
        """Launch `process_func(env, *args)` and track the handle under `name`."""
        handle = self.env.process(process_func(self.env, *args))
        self.processes[name] = handle
        return handle

    def stop(self, name):
        """Interrupt a tracked process (if still running) and forget it."""
        handle = self.processes.get(name)
        if handle is None:
            return
        if handle.is_alive:
            handle.interrupt()
        del self.processes[name]

    def status(self):
        """Map each tracked name to 'alive' or 'finished'."""
        report = {}
        for name, handle in self.processes.items():
            report[name] = 'alive' if handle.is_alive else 'finished'
        return report
# Usage
# NOTE(review): `env`, `worker_func`, and `server` are assumed defined elsewhere.
manager = ProcessManager(env)
manager.start('worker1', worker_func, server)
manager.start('worker2', worker_func, server)
print(manager.status())
Scaling Up
SimPy handles thousands of processes efficiently:
import simpy
import random
import time
def simple_process(env, name):
    """Minimal process: sleep for exponential random intervals, forever."""
    while True:
        yield env.timeout(random.expovariate(1))
env = simpy.Environment()
start = time.time()  # wall-clock timer for the benchmark
# Register ten thousand processes before running; each is just a generator.
for i in range(10000):
    env.process(simple_process(env, f"P{i}"))
env.run(until=1000)
elapsed = time.time() - start
print(f"10000 processes, 1000 time units: {elapsed:.2f} seconds")
Processes are lightweight—they're just generators.
Common Patterns
Worker Pool
def worker_pool(env, num_workers, job_queue, results):
    """Start `num_workers` identical workers draining a shared job queue."""
    def pool_member(worker_id):
        # Each worker pulls jobs forever and records what it completed.
        while True:
            job = yield job_queue.get()
            yield env.timeout(job['duration'])
            results.append({'job': job['id'], 'worker': worker_id})
    for worker_id in range(num_workers):
        env.process(pool_member(worker_id))
Supervisor Process
def supervisor(env, workers):
    """Monitor workers and restart if they fail."""
    while True:
        # Poll on a fixed 10-unit interval rather than reacting instantly.
        yield env.timeout(10)
        dead = [name for name, proc in workers.items() if not proc.is_alive]
        for name in dead:
            print(f"Restarting {name}")
            workers[name] = env.process(worker_func(env))
Batch Processing
def batch_processor(env, input_queue, batch_size=10, batch_process_time=5):
    """Collect items from `input_queue` and process them in fixed-size batches.

    Args:
        env: SimPy environment providing `timeout`.
        input_queue: Store-like object whose `get()` yields one item.
        batch_size: Number of items accumulated before a batch is processed.
        batch_process_time: Simulated time spent processing one full batch.
            Fix: the original read an undefined global `process_batch_time`,
            which raised NameError the first time a batch filled; it is now
            an explicit keyword parameter (backward-compatible addition).
    """
    batch = []
    while True:
        item = yield input_queue.get()
        batch.append(item)
        if len(batch) >= batch_size:
            # Process the whole batch at once, then start collecting anew.
            yield env.timeout(batch_process_time)
            batch = []
Gotchas
Process Reference
Keep a reference if you need to wait or interrupt:
# Lost reference - can't wait for it
env.process(task(env))
# Keep reference
proc = env.process(task(env))
# NOTE: `yield` is only legal inside a process function, not at module level;
# this fragment is meant to live inside such a function.
yield proc # Wait for completion
Too Many Processes
While SimPy is efficient, be mindful:
# Be careful with unbounded creation
# NOTE: this loop must live inside a process function to be able to yield.
while True:
    yield env.timeout(0.001) # Creates 1000 processes per time unit!
    env.process(short_task(env))
Summary
Multiple processes:
- Add with env.process()
- Create dynamically during simulation
- Communicate via stores, events, or shared state
- Wait for multiple with & (all) or | (any)
- Scale to thousands efficiently
Concurrency in simulation. No threads required.
Next Steps
Discover the Power of Simulation
Want to become a go-to expert in simulation with Python? The Complete Simulation Bootcamp will show you how simulation can transform your career and your projects.
Explore the Bootcamp