Production Line Simulation with SimPy: Flow, Buffers, and Bottlenecks

Production lines are sequences of stations connected by buffers. Material flows from start to finish. Bottlenecks determine throughput. Simulation finds them.

The Production Line Model

Key elements:

- Stations: work centers that process items
- Buffers: storage between stations
- Parts: items flowing through the line
- Cycle times: time spent at each station
- Breakdowns: unplanned stops

Basic Serial Line

import simpy
import random

class ProductionLine:
    """Serial production line: single-machine stations separated by finite buffers.

    Each part is a SimPy process that visits the stations in order. After a
    station finishes a part, the part is put into the downstream buffer while
    the station is still held, so a full buffer blocks that station. The part
    leaves the buffer only once the next station has been seized, so the
    buffer level equals the number of parts waiting between the two stations.

    Args:
        env: SimPy environment that drives the simulation.
        station_times: Mean processing time of each station; the actual time
            is drawn from an exponential distribution with that mean.
        buffer_sizes: Capacity of the buffer after each station except the
            last, i.e. ``len(station_times) - 1`` entries.

    Raises:
        ValueError: If ``buffer_sizes`` does not contain exactly one entry
            per pair of adjacent stations.
    """

    def __init__(self, env, station_times, buffer_sizes):
        # N stations have N - 1 gaps; a missing buffer would fail with an
        # IndexError mid-run and an extra one would fill up and stall the
        # last station, so reject the mismatch up front.
        expected_buffers = max(len(station_times) - 1, 0)
        if len(buffer_sizes) != expected_buffers:
            raise ValueError(
                f"expected {expected_buffers} buffer sizes for "
                f"{len(station_times)} stations, got {len(buffer_sizes)}"
            )

        self.env = env
        self.num_stations = len(station_times)
        self.station_times = station_times

        # Stations
        self.stations = [
            simpy.Resource(env, capacity=1)
            for _ in range(self.num_stations)
        ]

        # Buffers between stations
        self.buffers = [
            simpy.Container(env, capacity=size, init=0)
            for size in buffer_sizes
        ]

        self.parts_completed = 0

    def part(self, part_id):
        """Part flows through all stations.

        Generator meant to be wrapped in ``env.process``. ``part_id`` is
        accepted for identification by callers and is not used internally.
        """
        for i, station in enumerate(self.stations):
            with station.request() as req:
                yield req

                # Free the upstream buffer slot only now that the station is
                # ours; until then the part counts towards the buffer level.
                if i > 0:
                    yield self.buffers[i - 1].get(1)

                # Process at station
                process_time = random.expovariate(1 / self.station_times[i])
                yield self.env.timeout(process_time)

                # Put in downstream buffer (except last station) while still
                # holding the station, so a full buffer blocks it.
                if i < len(self.buffers):
                    yield self.buffers[i].put(1)

        self.parts_completed += 1

def part_source(env, line, arrival_rate):
    """Feed the line with an endless stream of parts.

    Waits an exponentially distributed gap (rate ``arrival_rate``) and then
    starts ``line.part`` as a new process, numbering parts from 0 upwards.
    Never terminates on its own; the simulation horizon stops it.
    """
    next_id = 0
    while True:
        gap = random.expovariate(arrival_rate)
        yield env.timeout(gap)

        new_part = line.part(next_id)
        env.process(new_part)
        next_id += 1

# Simulate a four-station line for 1000 time units and print its throughput
env = simpy.Environment()
line = ProductionLine(
    env,
    buffer_sizes=[10, 10, 10],   # one buffer between each pair of stations
    station_times=[5, 7, 4, 6],  # mean processing time of each station
)
env.process(part_source(env, line, arrival_rate=0.15))
env.run(until=1000)

print(f"Parts completed: {line.parts_completed}")
print(f"Throughput: {line.parts_completed/1000:.3f} parts/time unit")

Blocking and Starving

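A station is blocked when its downstream buffer is full and starved when its upstream buffer is empty. The class below records how long each station spends waiting in each state: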

class MonitoredLine:
    """Serial line that accumulates starved, busy and blocked time per station.

    ``config`` must provide ``'station_times'`` (one mean processing time per
    station) and ``'buffer_sizes'`` (one capacity per buffer). The totals are
    kept in ``self.stats`` under ``'blocked_time'``, ``'starved_time'`` and
    ``'busy_time'``, each a list indexed by station.
    """

    def __init__(self, env, config):
        self.env = env
        mean_times = config['station_times']

        # One independent zero-filled counter list per metric
        self.stats = {
            metric: [0] * len(mean_times)
            for metric in ('blocked_time', 'starved_time', 'busy_time')
        }

        self.stations = [
            {
                'resource': simpy.Resource(env, capacity=1),
                'process_time': mean,
            }
            for mean in mean_times
        ]

        self.buffers = [
            simpy.Container(env, capacity=slots, init=0)
            for slots in config['buffer_sizes']
        ]

    def station_process(self, station_idx, part_id):
        """Run one part through station ``station_idx`` and update the stats.

        Generator to be driven by a SimPy process. ``part_id`` is not used
        inside the method.
        """
        env = self.env
        info = self.stations[station_idx]

        # Starving: time spent waiting for a unit in the upstream buffer
        if station_idx > 0:
            wait_began = env.now
            yield self.buffers[station_idx - 1].get(1)
            self.stats['starved_time'][station_idx] += env.now - wait_began

        # Processing: the sampled duration is booked before the delay elapses
        with info['resource'].request() as claim:
            yield claim
            duration = random.expovariate(1/info['process_time'])
            self.stats['busy_time'][station_idx] += duration
            yield env.timeout(duration)

        # The last station has no downstream buffer to wait on
        if station_idx >= len(self.buffers):
            return

        # Blocking: time spent waiting for space in the downstream buffer.
        # NOTE(review): the station resource is already released here, so the
        # station itself can start another part while this one is blocked.
        wait_began = env.now
        yield self.buffers[station_idx].put(1)
        self.stats['blocked_time'][station_idx] += env.now - wait_began

Parallel Stations

Multiple machines at a station:

class ParallelStation:
    """Station made of ``num_machines`` identical machines sharing one queue."""

    def __init__(self, env, name, num_machines, process_time):
        self.env, self.name = env, name
        self.process_time = process_time
        # A single resource with capacity > 1 models the parallel machines
        self.machines = simpy.Resource(env, capacity=num_machines)

    def process(self, part_id):
        """Seize any free machine and hold it for an exponential service time.

        ``process_time`` is the mean of that distribution; ``part_id`` is not
        used inside the method.
        """
        claim = self.machines.request()
        with claim:
            yield claim
            service = random.expovariate(1/self.process_time)
            yield self.env.timeout(service)

# Create line with parallel stations: (name, machines, mean process time)
stations = [
    ParallelStation(env, label, num_machines=count, process_time=mean)
    for label, count, mean in [
        ("Assembly", 3, 15),
        ("Testing", 2, 8),
        ("Packaging", 1, 3),
    ]
]

Station with Failures

class UnreliableStation:
    """Single machine that breaks down at random and must be repaired.

    A background process alternates between an exponentially distributed
    time to failure (mean ``mttf``) and an exponentially distributed repair
    time (mean ``mttr``). A breakdown interrupts the job currently on the
    machine; that job restarts from scratch with a freshly sampled
    processing time once the repair is finished.

    Args:
        env: SimPy environment that drives the simulation.
        name: Label for the station.
        process_time: Mean of the exponential processing time.
        mttf: Mean time to failure.
        mttr: Mean time to repair.
    """

    def __init__(self, env, name, process_time, mttf, mttr):
        self.env = env
        self.name = name
        self.process_time = process_time
        self.mttf = mttf  # Mean time to failure
        self.mttr = mttr  # Mean time to repair
        self.machine = simpy.PreemptiveResource(env, capacity=1)
        self.broken = False
        self.parts_made = 0
        # Triggered at the end of every repair and then replaced by a fresh
        # event, so jobs can wait for the machine without polling.
        self.repaired = env.event()

        # Start failure process
        env.process(self.fail())

    def fail(self):
        """Break the machine at random intervals and repair it again."""
        while True:
            yield self.env.timeout(random.expovariate(1/self.mttf))
            self.broken = True

            # Interrupt current job; iterate over a copy because the
            # interrupted process releases the machine when it resumes.
            for req in list(self.machine.users):
                if req.proc is not None and req.proc.is_alive:
                    req.proc.interrupt("breakdown")

            # Repair
            yield self.env.timeout(random.expovariate(1/self.mttr))
            self.broken = False

            # Swap in a fresh event before waking the waiters so the next
            # breakdown gets its own, still untriggered event.
            done, self.repaired = self.repaired, self.env.event()
            done.succeed()

    def process(self, part_id):
        """Process one part, retrying after every breakdown.

        Generator to be driven by a SimPy process. ``part_id`` is not used
        inside the method. ``parts_made`` is incremented once the part has
        been processed without interruption.
        """
        while True:
            try:
                with self.machine.request(priority=1) as req:
                    yield req
                    # Wait out a repair instead of retrying immediately: a
                    # retry costs no simulated time, so without this wait the
                    # loop would spin forever at the current timestamp and
                    # the repair would never complete.
                    if self.broken:
                        yield self.repaired
                    yield self.env.timeout(random.expovariate(1/self.process_time))
                    self.parts_made += 1
                    return
            except simpy.Interrupt:
                # Breakdown mid-job: the machine was released on leaving the
                # ``with`` block; queue up again and wait for the repair.
                continue

Complete Production Line

import simpy
import random
import numpy as np

class FullProductionLine:
    """Configurable serial line with setup times, buffer monitoring and a report.

    ``config`` is a dict with the keys:

    * ``'stations'``: list of dicts with ``'name'`` and ``'process_time'``
      (mean of an exponential distribution), plus optional ``'capacity'``
      (default 1) and ``'setup_time'`` (default 0).
    * ``'buffer_sizes'``: capacity of the buffer after every station except
      the last.
    * ``'arrival_rate'``: rate of the exponential inter-arrival times.
    * ``'part_types'`` (optional): types drawn at random for new parts;
      defaults to ``['standard']``.

    A finished part is put into the downstream buffer while it still holds
    its station, so a full buffer blocks that station. The part leaves the
    buffer only once the next station has been seized, so the monitored
    buffer level is the number of parts waiting between two stations.

    Raises:
        ValueError: If ``'buffer_sizes'`` does not contain exactly one entry
            per pair of adjacent stations.
    """

    def __init__(self, env, config):
        self.env = env
        self.config = config

        # N stations have N - 1 gaps; any other buffer count either fails
        # with an IndexError mid-run or silently stalls the last station.
        expected_buffers = max(len(config['stations']) - 1, 0)
        if len(config['buffer_sizes']) != expected_buffers:
            raise ValueError(
                f"expected {expected_buffers} buffer sizes for "
                f"{len(config['stations'])} stations, "
                f"got {len(config['buffer_sizes'])}"
            )

        # Create stations
        self.stations = [
            {
                'name': station_config['name'],
                'resource': simpy.Resource(
                    env, capacity=station_config.get('capacity', 1)
                ),
                'process_time': station_config['process_time'],
                'setup_time': station_config.get('setup_time', 0),
            }
            for station_config in config['stations']
        ]

        # Create buffers
        self.buffers = [
            simpy.Container(env, capacity=size, init=0)
            for size in config['buffer_sizes']
        ]

        # Stats
        self.completed_parts = []
        self.buffer_log = []

    def part(self, part_id, part_type):
        """Process a part through the line.

        Appends a record to ``completed_parts`` holding the part's id, type,
        start and end time, total time, and ``station_<i>_time`` for every
        station: the time from arriving at the station until processing
        ends, which includes queueing and setup but not blocking.
        """
        start_time = self.env.now
        record = {'id': part_id, 'type': part_type, 'start': start_time}

        for i, station in enumerate(self.stations):
            station_start = self.env.now

            with station['resource'].request() as req:
                yield req

                # Leave the upstream buffer only now that the station is
                # ours; until then the part counts towards the buffer level.
                if i > 0:
                    yield self.buffers[i - 1].get(1)

                # Setup if needed
                if station['setup_time'] > 0:
                    yield self.env.timeout(station['setup_time'])

                # Process
                process_time = random.expovariate(1 / station['process_time'])
                yield self.env.timeout(process_time)

                record[f"station_{i}_time"] = self.env.now - station_start

                # Put in downstream buffer while still holding the station,
                # so a full buffer blocks it.
                if i < len(self.buffers):
                    yield self.buffers[i].put(1)

        record['end'] = self.env.now
        record['total_time'] = self.env.now - start_time
        self.completed_parts.append(record)

    def part_source(self):
        """Generate parts."""
        part_id = 0
        while True:
            yield self.env.timeout(random.expovariate(self.config['arrival_rate']))
            part_type = random.choice(self.config.get('part_types', ['standard']))
            self.env.process(self.part(part_id, part_type))
            part_id += 1

    def monitor(self, interval=10):
        """Monitor buffer levels, sampling every ``interval`` time units."""
        while True:
            self.buffer_log.append({
                'time': self.env.now,
                'levels': [b.level for b in self.buffers],
            })
            yield self.env.timeout(interval)

    def run(self, duration, monitor_interval=10):
        """Start the part source and the buffer monitor, then simulate.

        Args:
            duration: Simulation time at which to stop.
            monitor_interval: Time between two buffer-level samples.
        """
        self.env.process(self.part_source())
        self.env.process(self.monitor(monitor_interval))
        self.env.run(until=duration)

    def report(self):
        """Print throughput, cycle-time, station-time and buffer statistics."""
        elapsed = self.env.now
        finished = len(self.completed_parts)
        # Guard against a report requested before any time has passed
        throughput = finished / elapsed if elapsed > 0 else 0.0

        print("\n=== Production Line Report ===")
        print(f"Duration: {elapsed}")
        print(f"Parts completed: {finished}")
        print(f"Throughput: {throughput:.4f} parts/time unit")

        if not self.completed_parts:
            return

        cycle_times = [p['total_time'] for p in self.completed_parts]
        print("\nCycle time:")
        print(f"  Mean: {np.mean(cycle_times):.2f}")
        print(f"  Std: {np.std(cycle_times):.2f}")
        print(f"  Min: {min(cycle_times):.2f}")
        print(f"  Max: {max(cycle_times):.2f}")

        # Station times
        print("\nStation times (mean):")
        for i, station in enumerate(self.stations):
            times = [p.get(f'station_{i}_time', 0) for p in self.completed_parts]
            print(f"  {station['name']}: {np.mean(times):.2f}")

        # Buffer utilisation
        print("\nBuffer utilisation:")
        for i, _ in enumerate(self.buffers):
            levels = [log['levels'][i] for log in self.buffer_log]
            capacity = self.config['buffer_sizes'][i]
            if levels:
                print(f"  Buffer {i}: avg {np.mean(levels):.1f}/{capacity}")
            else:
                # No monitor samples: averaging an empty list would give nan
                print(f"  Buffer {i}: no samples (capacity {capacity})")

# Configuration: five stations in series with four buffers between them
config = {
    'stations': [
        dict(name='Cut', process_time=5, capacity=1),
        dict(name='Bend', process_time=8, capacity=1),
        dict(name='Weld', process_time=12, capacity=2),
        dict(name='Paint', process_time=15, capacity=1),
        dict(name='Inspect', process_time=4, capacity=1),
    ],
    'buffer_sizes': [5, 10, 10, 5],
    'arrival_rate': 0.1,
    'part_types': ['standard', 'custom'],
}

# Seed the generator so repeated runs draw the same random numbers
random.seed(42)
env = simpy.Environment()
line = FullProductionLine(env, config)
line.run(duration=1000)
line.report()

Summary

Production line simulation reveals:

- Bottleneck stations
- Buffer sizing requirements
- Impact of variability
- Effect of breakdowns
- Throughput capacity

Flow follows the bottleneck. Find it. Fix it.

Next Steps


Build Professional Simulations

Break free from commercial software and learn how to build powerful, industry-standard simulations in Python. The Complete Simulation in Python with SimPy Bootcamp gives you everything you need.

Explore the Bootcamp