Production Line Simulation with SimPy: Flow, Buffers, and Bottlenecks
Production lines are sequences of stations connected by buffers. Material flows from start to finish. Bottlenecks determine throughput. Simulation finds them.
The Production Line Model
Key elements:
- Stations - Work centers that process items
- Buffers - Storage between stations
- Parts - Items flowing through the line
- Cycle times - Time at each station
- Breakdowns - Unplanned stops
Basic Serial Line
import simpy
import random
class ProductionLine:
    """Serial production line of single-machine stations joined by finite buffers.

    Station ``i`` feeds buffer ``i``, which feeds station ``i + 1``. A part
    keeps occupying its buffer slot until the next station is free, and a
    station stays occupied until the downstream buffer has room for the
    finished part, so a full buffer blocks the upstream station.
    """

    def __init__(self, env, station_times, buffer_sizes):
        """Create the stations and the buffers between them.

        Args:
            env: SimPy environment that drives the simulation.
            station_times: Mean processing time of each station, in order.
            buffer_sizes: Capacity of each inter-station buffer; must hold
                exactly one entry fewer than ``station_times``.

        Raises:
            ValueError: If the buffer count does not match the station count
                or a station time is not positive.
        """
        buffer_sizes = list(buffer_sizes)
        expected_buffers = max(len(station_times) - 1, 0)
        if len(buffer_sizes) != expected_buffers:
            raise ValueError(
                f"expected {expected_buffers} buffer sizes for "
                f"{len(station_times)} stations, got {len(buffer_sizes)}"
            )
        if any(t <= 0 for t in station_times):
            raise ValueError("station_times must all be positive")

        self.env = env
        self.num_stations = len(station_times)
        self.station_times = station_times
        # Stations
        self.stations = [
            simpy.Resource(env, capacity=1)
            for _ in range(self.num_stations)
        ]
        # Buffers between stations
        self.buffers = [
            simpy.Container(env, capacity=size, init=0)
            for size in buffer_sizes
        ]
        self.parts_completed = 0

    def part(self, part_id):
        """Part flows through all stations.

        SimPy process generator. ``part_id`` identifies the part for the
        caller; it is not used by the flow logic itself.
        """
        last_station = self.num_stations - 1
        for i, station in enumerate(self.stations):
            with station.request() as req:
                # While queueing here the part still occupies its slot in
                # the upstream buffer.
                yield req
                # The machine is ours: free the upstream buffer slot.
                if i > 0:
                    yield self.buffers[i - 1].get(1)
                process_time = random.expovariate(1 / self.station_times[i])
                yield self.env.timeout(process_time)
                # Keep holding the station until the downstream buffer has
                # room, so a full buffer blocks this station.
                if i < last_station:
                    yield self.buffers[i].put(1)
        self.parts_completed += 1
def part_source(env, line, arrival_rate):
    """Feed the line with new parts at exponentially distributed intervals.

    SimPy process generator that never terminates. After each inter-arrival
    delay it starts ``line.part`` as a new process, numbering parts from 0.
    """
    next_id = 0
    while True:
        gap = random.expovariate(arrival_rate)
        yield env.timeout(gap)
        new_part = line.part(next_id)
        env.process(new_part)
        next_id += 1
# Build a four-station line (three buffers in between), feed it, and run it
# for 1000 time units.
env = simpy.Environment()
line = ProductionLine(env, station_times=[5, 7, 4, 6], buffer_sizes=[10, 10, 10])
env.process(part_source(env, line, arrival_rate=0.15))
env.run(until=1000)

# Throughput is completed parts divided by the simulated duration.
print(f"Parts completed: {line.parts_completed}")
print(f"Throughput: {line.parts_completed/1000:.3f} parts/time unit")
Blocking and Starving
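A station is blocked when its downstream buffer is full and starved when its upstream buffer is empty. The class below records how long is spent waiting in each state: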
class MonitoredLine:
    """Line whose stations accumulate starved, busy and blocked time."""

    def __init__(self, env, config):
        """Build stations, buffers and zeroed per-station counters.

        ``config`` supplies ``'station_times'`` (mean processing time per
        station) and ``'buffer_sizes'`` (capacity per buffer).
        """
        self.env = env
        station_times = config['station_times']
        station_count = len(station_times)
        self.stations = []
        self.buffers = []
        # One accumulator slot per station for each tracked state.
        self.stats = {
            'blocked_time': [0] * station_count,
            'starved_time': [0] * station_count,
            'busy_time': [0] * station_count
        }
        self.stations.extend(
            {'resource': simpy.Resource(env, capacity=1), 'process_time': mean_time}
            for mean_time in station_times
        )
        self.buffers.extend(
            simpy.Container(env, capacity=size, init=0)
            for size in config['buffer_sizes']
        )

    def station_process(self, station_idx, part_id):
        """Run one part through station ``station_idx`` and update the stats.

        SimPy process generator. Time spent waiting on the upstream buffer is
        added to ``starved_time``, the sampled processing time to
        ``busy_time``, and time spent waiting on the downstream buffer to
        ``blocked_time``.
        """
        env = self.env
        stats = self.stats
        info = self.stations[station_idx]

        # Starving: wait for input from the upstream buffer (none for the
        # first station).
        if station_idx > 0:
            wait_began = env.now
            yield self.buffers[station_idx - 1].get(1)
            stats['starved_time'][station_idx] += env.now - wait_began

        # Processing: seize the machine for an exponential service time.
        with info['resource'].request() as grant:
            yield grant
            duration = random.expovariate(1 / info['process_time'])
            stats['busy_time'][station_idx] += duration
            yield env.timeout(duration)

        # Blocking: wait for space in the downstream buffer, if there is one.
        if station_idx < len(self.buffers):
            wait_began = env.now
            yield self.buffers[station_idx].put(1)
            stats['blocked_time'][station_idx] += env.now - wait_began
Parallel Stations
Multiple machines at a station:
class ParallelStation:
    """Station made of several identical machines sharing one queue."""

    def __init__(self, env, name, num_machines, process_time):
        """Store the station parameters and create the machine pool.

        ``num_machines`` is the pool capacity; ``process_time`` is the mean
        of the exponential service time.
        """
        self.env, self.name = env, name
        self.process_time = process_time
        self.machines = simpy.Resource(env, capacity=num_machines)

    def process(self, part_id):
        """Seize any free machine and hold it for one sampled service time."""
        with self.machines.request() as slot:
            yield slot
            service = random.expovariate(1 / self.process_time)
            yield self.env.timeout(service)
# Create line with parallel stations: (name, machine count, mean process time)
stations = [
    ParallelStation(env, name, num_machines=count, process_time=mean_time)
    for name, count, mean_time in (
        ("Assembly", 3, 15),
        ("Testing", 2, 8),
        ("Packaging", 1, 3),
    )
]
Station with Failures
class UnreliableStation:
    """Single machine that breaks down at random and is then repaired.

    A breakdown interrupts the job in progress. The interrupted job waits
    for the repair to finish and then starts over with a freshly sampled
    processing time.
    """

    def __init__(self, env, name, process_time, mttf, mttr):
        """Create the machine and start its background failure process.

        Args:
            env: SimPy environment that drives the simulation.
            name: Label for the station.
            process_time: Mean of the exponential processing time.
            mttf: Mean time to failure.
            mttr: Mean time to repair.
        """
        self.env = env
        self.name = name
        self.process_time = process_time
        self.mttf = mttf  # Mean time to failure
        self.mttr = mttr  # Mean time to repair
        self.machine = simpy.PreemptiveResource(env, capacity=1)
        self.broken = False
        self.parts_made = 0
        # Event that fires when the current breakdown has been repaired;
        # replaced by a fresh event at every breakdown.
        self._repaired = env.event()
        # Start failure process
        env.process(self.fail())

    def fail(self):
        """Alternate forever between running and being repaired."""
        while True:
            yield self.env.timeout(random.expovariate(1 / self.mttf))
            if self.broken:
                continue
            # Arm the repair event before flagging the breakdown so that any
            # job that sees ``broken`` always has a pending event to wait on.
            self._repaired = self.env.event()
            self.broken = True
            # Interrupt current job. Iterate over a copy because interrupted
            # jobs release the machine.
            for req in list(self.machine.users):
                if req.proc.is_alive:
                    req.proc.interrupt("breakdown")
            # Repair
            yield self.env.timeout(random.expovariate(1 / self.mttr))
            self.broken = False
            self._repaired.succeed()

    def process(self, part_id):
        """Process one part, retrying after any breakdown.

        SimPy process generator. Returns once the part has been processed
        without interruption.
        """
        while True:
            # Advance simulated time until the repair is done. Re-requesting
            # the machine straight away would spin at the current instant
            # and the repair timeout would never be reached.
            if self.broken:
                yield self._repaired
            try:
                with self.machine.request(priority=1) as req:
                    yield req
                    if self.broken:
                        # Broke down while we were queueing: release and wait.
                        continue
                    yield self.env.timeout(random.expovariate(1 / self.process_time))
                    self.parts_made += 1
                    return
            except simpy.Interrupt:
                pass  # Retry after repair
Complete Production Line
import simpy
import random
import numpy as np
class FullProductionLine:
    """Configurable serial line with setup times, part types and reporting.

    Station ``i`` feeds buffer ``i``, which feeds station ``i + 1``. A part
    occupies its buffer slot until the next station is free, and a station
    stays occupied until the downstream buffer accepts the finished part, so
    the logged buffer levels reflect parts actually waiting between stations.
    """

    def __init__(self, env, config):
        """Build stations and buffers from ``config``.

        Args:
            env: SimPy environment that drives the simulation.
            config: Dict with ``'stations'`` (list of dicts holding ``name``,
                ``process_time`` and optional ``capacity`` / ``setup_time``),
                ``'buffer_sizes'`` (one entry fewer than stations),
                ``'arrival_rate'`` and optional ``'part_types'``.

        Raises:
            ValueError: If the number of buffers is not one fewer than the
                number of stations.
        """
        station_configs = config['stations']
        buffer_sizes = list(config['buffer_sizes'])
        expected_buffers = max(len(station_configs) - 1, 0)
        if len(buffer_sizes) != expected_buffers:
            raise ValueError(
                f"expected {expected_buffers} buffer sizes for "
                f"{len(station_configs)} stations, got {len(buffer_sizes)}"
            )

        self.env = env
        self.config = config
        # Create stations
        self.stations = [
            {
                'name': station_config['name'],
                'resource': simpy.Resource(env, capacity=station_config.get('capacity', 1)),
                'process_time': station_config['process_time'],
                'setup_time': station_config.get('setup_time', 0)
            }
            for station_config in station_configs
        ]
        # Create buffers
        self.buffers = [
            simpy.Container(env, capacity=size, init=0)
            for size in buffer_sizes
        ]
        # Stats
        self.completed_parts = []
        self.buffer_log = []

    def part(self, part_id, part_type):
        """Process a part through the line.

        SimPy process generator. Appends a record to ``completed_parts`` with
        the part's start, end, total time and, per station, the time from
        becoming ready for that station until its processing finished
        (queueing plus setup plus processing).
        """
        start_time = self.env.now
        record = {'id': part_id, 'type': part_type, 'start': start_time}
        last_station = len(self.stations) - 1
        for i, station in enumerate(self.stations):
            station_start = self.env.now
            with station['resource'].request() as req:
                # While queueing here the part still occupies its slot in
                # the upstream buffer.
                yield req
                # The machine is ours: free the upstream buffer slot.
                if i > 0:
                    yield self.buffers[i - 1].get(1)
                # Setup if needed
                if station['setup_time'] > 0:
                    yield self.env.timeout(station['setup_time'])
                # Process
                process_time = random.expovariate(1 / station['process_time'])
                yield self.env.timeout(process_time)
                record[f"station_{i}_time"] = self.env.now - station_start
                # Keep holding the station until the downstream buffer has
                # room, so a full buffer blocks this station.
                if i < last_station:
                    yield self.buffers[i].put(1)
        record['end'] = self.env.now
        record['total_time'] = self.env.now - start_time
        self.completed_parts.append(record)

    def part_source(self):
        """Generate parts."""
        part_id = 0
        while True:
            yield self.env.timeout(random.expovariate(self.config['arrival_rate']))
            part_type = random.choice(self.config.get('part_types', ['standard']))
            self.env.process(self.part(part_id, part_type))
            part_id += 1

    def monitor(self, interval=10):
        """Monitor buffer levels, sampling every ``interval`` time units."""
        while True:
            levels = [b.level for b in self.buffers]
            self.buffer_log.append({
                'time': self.env.now,
                'levels': levels
            })
            yield self.env.timeout(interval)

    def run(self, duration):
        """Start the part source and the monitor, then simulate ``duration``."""
        self.env.process(self.part_source())
        self.env.process(self.monitor())
        self.env.run(until=duration)

    @staticmethod
    def _mean(values):
        """Return the mean of ``values``, or NaN (without a warning) if empty."""
        return np.mean(values) if values else float('nan')

    def report(self):
        """Print throughput, cycle-time, station-time and buffer statistics.

        Safe to call before the simulation has run: throughput is reported
        as zero when no time has elapsed, and averages over empty data print
        as ``nan``.
        """
        elapsed = self.env.now
        completed = len(self.completed_parts)
        # Guard against a zero-length run.
        throughput = completed / elapsed if elapsed else 0.0

        print("\n=== Production Line Report ===")
        print(f"Duration: {elapsed}")
        print(f"Parts completed: {completed}")
        print(f"Throughput: {throughput:.4f} parts/time unit")
        if self.completed_parts:
            cycle_times = [p['total_time'] for p in self.completed_parts]
            print("\nCycle time:")
            print(f" Mean: {np.mean(cycle_times):.2f}")
            print(f" Std: {np.std(cycle_times):.2f}")
            print(f" Min: {min(cycle_times):.2f}")
            print(f" Max: {max(cycle_times):.2f}")
        # Station times
        print("\nStation times (mean):")
        for i, station in enumerate(self.stations):
            times = [p.get(f'station_{i}_time', 0) for p in self.completed_parts]
            print(f" {station['name']}: {self._mean(times):.2f}")
        # Buffer utilisation
        print("\nBuffer utilisation:")
        for i, _ in enumerate(self.buffers):
            levels = [log['levels'][i] for log in self.buffer_log]
            capacity = self.config['buffer_sizes'][i]
            print(f" Buffer {i}: avg {self._mean(levels):.1f}/{capacity}")
# Configuration: five stations in series, four buffers between them
config = dict(
    stations=[
        dict(name='Cut', process_time=5, capacity=1),
        dict(name='Bend', process_time=8, capacity=1),
        dict(name='Weld', process_time=12, capacity=2),
        dict(name='Paint', process_time=15, capacity=1),
        dict(name='Inspect', process_time=4, capacity=1),
    ],
    buffer_sizes=[5, 10, 10, 5],
    arrival_rate=0.1,
    part_types=['standard', 'custom'],
)

# Seed the generator so repeated runs give the same results.
random.seed(42)
env = simpy.Environment()
line = FullProductionLine(env, config)
line.run(duration=1000)
line.report()
Summary
Production line simulation reveals:
- Bottleneck stations
- Buffer sizing requirements
- Impact of variability
- Effect of breakdowns
- Throughput capacity
Flow follows the bottleneck. Find it. Fix it.
Next Steps
Build Professional Simulations
Break free from commercial software and learn how to build powerful, industry-standard simulations in Python. The Complete Simulation in Python with SimPy Bootcamp gives you everything you need.
Explore the Bootcamp