Running Multiple Processes in SimPy: Concurrency Made Simple

Real systems have many things happening at once. Customers shopping. Machines running. Trucks delivering. SimPy handles them all.

Adding Multiple Processes

import simpy

def customer(env, name):
    """Simulate one customer whose visit lasts 5 time units.

    Prints one line when the visit starts and one when it finishes, each
    stamped with the environment's current time.
    """
    print(f"{name} starts at {env.now}")
    visit = env.timeout(5)
    yield visit
    print(f"{name} finishes at {env.now}")

# Create the environment and register one customer process per name.
env = simpy.Environment()
env.process(customer(env, "Alice"))
env.process(customer(env, "Bob"))
env.process(customer(env, "Charlie"))
# Start the simulation; no `until` limit is passed here.
env.run()

All three run "concurrently" in simulation time. They all start at time 0 and finish at time 5.

Dynamic Process Creation

Create processes during simulation:

def customer_generator(env, server):
    """Spawn a new customer process after each random inter-arrival delay.

    Delays are drawn with ``random.expovariate(1/5)``. Every arrival is
    started as ``customer(env, "Customer<n>", server)``, with ``n`` counting
    up from 0. The loop never ends on its own.
    """
    arrivals = 0
    while True:
        gap = random.expovariate(1/5)
        yield env.timeout(gap)
        label = f"Customer{arrivals}"
        env.process(customer(env, label, server))
        arrivals += 1

# Register the arrival generator as a process.
# NOTE(review): `server` is not defined in this snippet — presumably created earlier by the caller.
env.process(customer_generator(env, server))

Processes spawn processes. That's how arrivals work.

Process Classes

For complex entities with state:

class Machine:
    """A machine that keeps producing parts for as long as the simulation runs.

    Attributes:
        env: Environment the machine is scheduled on.
        name: Label used in the progress messages.
        capacity: The value passed to the constructor; run() does not use it.
        parts_made: Number of parts completed so far.
        process: The object returned by ``env.process`` for run().
    """

    def __init__(self, env, name, capacity):
        """Store the machine's settings and schedule its run() loop on env."""
        self.env = env
        self.name = name
        # Keep the constructor argument on the instance so callers can read
        # it back instead of it being dropped.
        self.capacity = capacity
        self.parts_made = 0
        self.process = env.process(self.run())

    def run(self):
        """Make parts forever; each part takes a uniform 3-7 time units."""
        while True:
            yield self.env.timeout(random.uniform(3, 7))
            self.parts_made += 1
            print(f"{self.name} made part #{self.parts_made} at {self.env.now}")

# Five machines share one environment; each one schedules its own run()
# process when it is constructed.
env = simpy.Environment()
machines = [Machine(env, f"Machine{i}", 1) for i in range(5)]
env.run(until=50)

# Report how many parts each machine has counted.
for m in machines:
    print(f"{m.name}: {m.parts_made} parts")

Waiting for Multiple Processes

Wait for several processes to complete, either all of them (`&`) or whichever finishes first (`|`):

def task(env, duration):
    """Wait for ``duration`` time units, then return ``duration``."""
    pause = env.timeout(duration)
    yield pause
    return duration

def coordinator(env):
    """Start three tasks, then report when the first and when the last finish.

    The any-of wait comes before the all-of wait. In the opposite order
    every task has already finished by the time the any-of wait starts, so
    "First task done" would be stamped with the finish time of the last task.
    """
    # Start tasks
    task1 = env.process(task(env, 5))
    task2 = env.process(task(env, 10))
    task3 = env.process(task(env, 3))

    # Wait for any: resumes as soon as one of the three has finished
    yield task1 | task2 | task3
    print(f"First task done at {env.now}")

    # Wait for all: resumes once every task has finished
    yield task1 & task2 & task3
    print(f"All tasks done at {env.now}")

Parallel Workers

Multiple workers processing from a shared queue:

import random

import simpy

def worker(env, name, job_store):
    """Take jobs from ``job_store`` one at a time and work on each, forever.

    A job is a mapping with an ``'id'`` and a ``'duration'``; the worker
    prints a line when it starts and when it finishes each job.
    """
    while True:
        request = job_store.get()
        job = yield request
        print(f"{name} starts {job['id']} at {env.now}")
        service = env.timeout(job['duration'])
        yield service
        print(f"{name} finishes {job['id']} at {env.now}")

def job_generator(env, job_store):
    """Put 20 jobs into ``job_store``, pausing a random time after each one.

    Job ids run from 0 to 19. Each duration is drawn from uniform(2, 8) and
    each pause between submissions from uniform(0.5, 2).
    """
    for job_id in range(20):
        duration = random.uniform(2, 8)
        yield job_store.put({'id': job_id, 'duration': duration})
        gap = random.uniform(0.5, 2)
        yield env.timeout(gap)

# One store is shared by the job generator and every worker.
env = simpy.Environment()
jobs = simpy.Store(env)

# Create 3 parallel workers
for i in range(3):
    env.process(worker(env, f"Worker{i}", jobs))

# Start the job source, then run with an upper limit of 100 on simulation time.
env.process(job_generator(env, jobs))
env.run(until=100)

Process Communication

Via Shared Resources

# Store shared by producer and consumer, created with capacity=10.
buffer = simpy.Store(env, capacity=10)

def producer(env, buffer):
    """Put the string "item" into ``buffer`` after every 2-unit delay, forever."""
    while True:
        delay = env.timeout(2)
        yield delay
        put_request = buffer.put("item")
        yield put_request

def consumer(env, buffer):
    """Take one item from ``buffer``, spend 3 time units on it, and repeat."""
    while True:
        get_request = buffer.get()
        _item = yield get_request  # the received item is not used further here
        handling = env.timeout(3)
        yield handling

Via Events

def waiter(env, signal):
    """Yield ``signal`` and print the time before and after the wait."""
    started = f"Waiting at {env.now}"
    print(started)
    yield signal
    received = f"Signal received at {env.now}"
    print(received)

def signaller(env, signal):
    """Wait 5 time units, then call ``signal.succeed()``."""
    delay = env.timeout(5)
    yield delay
    signal.succeed()

# One event object is handed to both processes: waiter yields it and
# signaller calls succeed() on it.
signal = env.event()
env.process(waiter(env, signal))
env.process(signaller(env, signal))

Via Shared State

class SharedState:
    """Plain mutable object holding a single counter that starts at 0."""

    def __init__(self):
        """Create the state with its counter set to zero."""
        self.counter: int = 0


# Single instance handed to the processes that share it.
state = SharedState()

def incrementer(env, state):
    """Add 1 to ``state.counter`` after every 1-unit delay, forever."""
    while True:
        tick = env.timeout(1)
        yield tick
        state.counter += 1

def reader(env, state):
    """Print ``state.counter`` after every 5-unit delay, forever."""
    while True:
        interval = env.timeout(5)
        yield interval
        print(f"Counter: {state.counter}")

Process Pipelines

Chain of processing stages:

def stage(env, name, input_store, output_store, process_time):
    """Run one pipeline stage: take an item, process it, pass it on.

    Args:
        env: Environment providing ``timeout`` and ``now``.
        name: Label used in the progress message.
        input_store: Store the stage takes items from.
        output_store: Store the processed item is put into, or ``None`` for
            a final stage that has no downstream store.
        process_time: Time units spent on each item.
    """
    while True:
        item = yield input_store.get()
        yield env.timeout(process_time)
        # Compare against None explicitly: a store-like object that is falsy
        # while empty must still receive the item.
        if output_store is not None:
            yield output_store.put(item)
        print(f"{name} processed {item} at {env.now}")

# Create pipeline
# Each intermediate store is the output of one stage and the input of the next.
stage1_out = simpy.Store(env)
stage2_out = simpy.Store(env)

# NOTE(review): `raw_materials` is not defined in this snippet — presumably a Store filled elsewhere.
env.process(stage(env, "Stage1", raw_materials, stage1_out, 3))
env.process(stage(env, "Stage2", stage1_out, stage2_out, 5))
# The last stage is given None as its output store.
env.process(stage(env, "Stage3", stage2_out, None, 2))

Managing Many Processes

class ProcessManager:
    """Registry of named processes started on a single environment."""

    def __init__(self, env):
        """Remember the environment and start with an empty registry."""
        self.env = env
        self.processes = {}

    def start(self, name, process_func, *args):
        """Start ``process_func(env, *args)`` as a process and file it under ``name``.

        Returns the object produced by ``env.process``. An existing entry
        with the same name is replaced in the registry.
        """
        generator = process_func(self.env, *args)
        handle = self.env.process(generator)
        self.processes[name] = handle
        return handle

    def stop(self, name):
        """Interrupt the named process if it is still alive, then forget it.

        Unknown names are ignored.
        """
        if name not in self.processes:
            return
        handle = self.processes[name]
        if handle.is_alive:
            handle.interrupt()
        del self.processes[name]

    def status(self):
        """Return a dict mapping each name to ``'alive'`` or ``'finished'``."""
        report = {}
        for name, handle in self.processes.items():
            report[name] = 'alive' if handle.is_alive else 'finished'
        return report

# Usage
# NOTE(review): `worker_func` and `server` are not defined in this snippet — presumably supplied by the caller.
manager = ProcessManager(env)
manager.start('worker1', worker_func, server)
manager.start('worker2', worker_func, server)
# status() returns a dict of name -> 'alive' / 'finished'.
print(manager.status())

Scaling Up

SimPy handles thousands of processes efficiently:

import simpy
import random
import time

def simple_process(env, name):
    """Loop forever, waiting a random ``expovariate(1)`` delay each time.

    ``name`` is accepted but not used by the body.
    """
    while True:
        delay = random.expovariate(1)
        yield env.timeout(delay)

# Benchmark: create 10,000 processes and run them for 1,000 time units.
env = simpy.Environment()

# perf_counter() is a monotonic, high-resolution clock intended for measuring
# elapsed intervals; time.time() can jump if the system clock is adjusted.
start = time.perf_counter()
for i in range(10000):
    env.process(simple_process(env, f"P{i}"))

env.run(until=1000)
# The measured interval covers both process creation and the run itself.
elapsed = time.perf_counter() - start
print(f"10000 processes, 1000 time units: {elapsed:.2f} seconds")

Processes are lightweight—they're just generators.

Common Patterns

Worker Pool

def worker_pool(env, num_workers, job_queue, results):
    """Start ``num_workers`` worker processes that all pull from ``job_queue``.

    Each worker repeatedly takes a job, waits for the job's ``'duration'``,
    and appends ``{'job': <job id>, 'worker': <worker index>}`` to
    ``results``. Returns nothing; the workers are registered on ``env``.
    """
    def _serve(ident):
        # One worker's endless take-job / work / record loop.
        while True:
            request = job_queue.get()
            job = yield request
            yield env.timeout(job['duration'])
            record = {'job': job['id'], 'worker': ident}
            results.append(record)

    for index in range(num_workers):
        env.process(_serve(index))

Supervisor Process

def supervisor(env, workers):
    """Every 10 time units, replace any worker process that is no longer alive.

    ``workers`` maps a name to a process; dead entries are overwritten in
    place with a fresh ``env.process(worker_func(env))``.
    """
    while True:
        yield env.timeout(10)
        # Find the dead entries first, then restart them one by one.
        stopped = [name for name, proc in workers.items() if not proc.is_alive]
        for name in stopped:
            print(f"Restarting {name}")
            workers[name] = env.process(worker_func(env))

Batch Processing

def batch_processor(env, input_queue, batch_size=10):
    """Collect items from ``input_queue`` and process them one batch at a time.

    Once ``batch_size`` items have been collected, the processor waits for
    ``process_batch_time`` (a name taken from the enclosing scope) and then
    starts a new, empty batch.
    """
    pending = []
    while True:
        entry = yield input_queue.get()
        pending.append(entry)
        if len(pending) < batch_size:
            continue
        yield env.timeout(process_batch_time)
        pending = []

Gotchas

Process Reference

Keep a reference if you need to wait or interrupt:

# Lost reference - can't wait for it
# NOTE(review): the `task` defined earlier in this article takes (env, duration) — confirm which `task` is meant here.
env.process(task(env))

# Keep reference
proc = env.process(task(env))
# `yield` is only valid inside a generator function, so this line belongs in a process body.
yield proc  # Wait for completion

Too Many Processes

While SimPy is efficient, every process still costs memory and scheduling time, so avoid creating them without bound:

# Be careful with unbounded creation
# This loop uses `yield`, so it is meant to sit inside a process generator.
while True:
    # A 0.001 delay per iteration means 1000 iterations per time unit.
    yield env.timeout(0.001)  # Creates 1000 processes per time unit!
    # NOTE(review): `short_task` is not defined in this snippet.
    env.process(short_task(env))

Summary

Multiple processes:

- Add with env.process()
- Create dynamically during simulation
- Communicate via stores, events, or shared state
- Wait for multiple with & (all) or | (any)
- Scale to thousands efficiently

Concurrency in simulation. No threads required.

Next Steps


Discover the Power of Simulation

Want to become a go-to expert in simulation with Python? The Complete Simulation Bootcamp will show you how simulation can transform your career and your projects.

Explore the Bootcamp