Collecting Statistics in SimPy: Measuring What Matters

A simulation without statistics is just an animation. You need numbers. Trends. Distributions. Evidence.

What to Collect

Every simulation should track:

  1. Wait times - How long entities wait
  2. Queue lengths - How many waiting at each moment
  3. Utilisation - How busy resources are
  4. Throughput - How many completed per time unit
  5. Cycle time - Total time in system

Basic Collection Pattern

class Statistics:
    """Accumulates per-entity wait, service, and total (wait + service) times."""

    def __init__(self):
        # Parallel lists: index i holds the i-th recorded entity's figures.
        self.wait_times = []
        self.service_times = []
        self.total_times = []

    def record(self, wait, service):
        """Store one entity's wait and service durations (total is derived)."""
        for series, value in ((self.wait_times, wait),
                              (self.service_times, service),
                              (self.total_times, wait + service)):
            series.append(value)

# Module-level collector shared by every customer process.
stats = Statistics()

def customer(env, resource):
    """SimPy process for one customer: queue for *resource*, hold it for an
    exponentially distributed service time, then record the outcome in the
    module-level ``stats`` collector."""
    arrival = env.now

    with resource.request() as req:
        yield req
        waited = env.now - arrival

        # Exponential service time with mean 5 time units (rate = 1/5).
        duration = random.expovariate(1/5)
        yield env.timeout(duration)

        stats.record(waited, duration)

Time-Series Collection

Track how things change over time:

class TimeSeriesStats:
    """Samples queue length and resource utilisation at fixed intervals."""

    def __init__(self):
        self.queue_log = []        # one snapshot dict per sample: queue + in-service counts
        self.utilisation_log = []  # one snapshot dict per sample: fraction of servers busy

    def monitor(self, env, resource, interval=1):
        """SimPy process: sample *resource* every *interval* time units, forever."""
        while True:
            now = env.now  # both snapshots share the same timestamp
            self.queue_log.append({
                'time': now,
                'queue_length': len(resource.queue),
                'in_service': resource.count,
            })
            self.utilisation_log.append({
                'time': now,
                'utilisation': resource.count / resource.capacity,
            })
            yield env.timeout(interval)

# Launch the monitor as a background SimPy process so samples are taken
# for the whole run (env and server are created elsewhere).
stats = TimeSeriesStats()
env.process(stats.monitor(env, server))

Summary Statistics

def summarise(stats):
    """Return summary statistics of the recorded wait times.

    *stats* is any object with a ``wait_times`` list. Returns a dict with
    count, mean, median, min/max, and tail percentiles (p90/p95/p99).
    An empty wait list yields a dict of zeros instead of raising.
    """
    # Sort once and reuse — the original sorted the list four times over.
    waits = sorted(stats.wait_times)
    n = len(waits)

    if n == 0:
        # No observations yet: report zeros rather than ZeroDivisionError.
        return {
            'count': 0,
            'mean_wait': 0.0,
            'median_wait': 0.0,
            'max_wait': 0.0,
            'min_wait': 0.0,
            'p90_wait': 0.0,
            'p95_wait': 0.0,
            'p99_wait': 0.0,
        }

    return {
        'count': n,
        'mean_wait': sum(waits) / n,
        'median_wait': waits[n // 2],
        'max_wait': waits[-1],
        'min_wait': waits[0],
        # int() truncation keeps every index strictly below n.
        'p90_wait': waits[int(0.9 * n)],
        'p95_wait': waits[int(0.95 * n)],
        'p99_wait': waits[int(0.99 * n)],
    }

Using pandas

For serious analysis:

import pandas as pd

class PandasStats:
    """Collects one timestamped record per customer for later pandas analysis."""

    def __init__(self):
        self.records = []  # list of dicts, one per completed customer

    def record(self, customer_id, arrival, service_start, service_end):
        """Log one customer's timeline; derived durations are computed here."""
        entry = {
            'customer_id': customer_id,
            'arrival': arrival,
            'service_start': service_start,
            'service_end': service_end,
        }
        entry['wait'] = service_start - arrival
        entry['service'] = service_end - service_start
        entry['total'] = service_end - arrival
        self.records.append(entry)

    def to_dataframe(self):
        """Return all records as a DataFrame (one row per customer)."""
        return pd.DataFrame(self.records)

# After simulation
df = stats.to_dataframe()
print(df.describe())  # count/mean/std/quartiles for every numeric column
print(df['wait'].quantile([0.5, 0.9, 0.95, 0.99]))  # median and tail percentiles of waiting time

Queue Length Distribution

def queue_length_distribution(queue_log):
    """Calculate time spent at each queue length.

    *queue_log* is a list of dicts with 'time' and 'queue_length' keys.
    Returns a Series mapping queue length -> proportion of observed time.
    """
    frame = pd.DataFrame(queue_log)
    # Each sample "lasts" until the next sample; the final sample gets duration 0.
    frame['duration'] = frame['time'].diff().shift(-1).fillna(0)

    time_at_length = frame.groupby('queue_length')['duration'].sum()

    # Normalise to the proportion of observed time spent at each length.
    return time_at_length / time_at_length.sum()

Utilisation Calculation

Proper utilisation accounting:

class UtilisationTracker:
    """Exact time-weighted utilisation of a resource.

    Integrates busy-server-count over time instead of sampling, so the
    result does not depend on the monitoring interval.
    """

    def __init__(self, env, resource):
        self.env = env
        self.resource = resource
        self.busy_time = 0    # integral of busy-count over elapsed time
        self.last_update = 0  # sim time of the previous update
        self.last_count = 0   # busy count in effect since last_update

    def update(self):
        """Accrue busy time for the interval since the previous update."""
        now = self.env.now
        self.busy_time += self.last_count * (now - self.last_update)
        self.last_update = now
        self.last_count = self.resource.count

    def monitor(self, interval=1):
        """SimPy process: refresh the accounting every *interval* time units."""
        while True:
            self.update()
            yield self.env.timeout(interval)

    def utilisation(self):
        """Return busy time as a fraction of total capacity-time so far."""
        self.update()
        available = self.env.now * self.resource.capacity
        return self.busy_time / available if available > 0 else 0

Throughput

class ThroughputTracker:
    """Records completion times and reports recent throughput."""

    def __init__(self):
        self.completions = []  # completion timestamps, in recording order

    def record_completion(self, time):
        """Note that one entity finished at simulation time *time*."""
        self.completions.append(time)

    def throughput(self, window_size=100):
        """Throughput in last window_size time units."""
        if not self.completions:
            return 0
        # Window is anchored at the most recent completion, not "now".
        cutoff = self.completions[-1] - window_size
        in_window = sum(1 for t in self.completions if t > cutoff)
        return in_window / window_size

By Entity Type

Track different entity types separately:

class SegmentedStats:
    """Wait/service statistics segmented by entity type."""

    def __init__(self):
        self.by_type = {}  # entity_type -> {'waits': [...], 'services': [...]}

    def record(self, entity_type, wait, service):
        """Append one observation, creating the type's bucket on first use."""
        bucket = self.by_type.setdefault(entity_type, {'waits': [], 'services': []})
        bucket['waits'].append(wait)
        bucket['services'].append(service)

    def summary_by_type(self):
        """Return {entity_type: {'count', 'avg_wait'}} for every recorded type."""
        return {
            entity_type: {
                'count': len(data['waits']),
                'avg_wait': sum(data['waits']) / len(data['waits']) if data['waits'] else 0
            }
            for entity_type, data in self.by_type.items()
        }

# Usage
def customer(env, resource, customer_type, stats):
    """SimPy process for one customer of *customer_type*.

    Queues for *resource*, holds it for an exponential service time,
    then records (type, wait, service) into the supplied *stats* object.
    """
    arrival = env.now
    with resource.request() as req:
        yield req
        waited = env.now - arrival
        # Exponential service time with mean 5 time units (rate = 1/5).
        duration = random.expovariate(1/5)
        yield env.timeout(duration)
        stats.record(customer_type, waited, duration)

Exporting Results

# To CSV
df = stats.to_dataframe()
df.to_csv('simulation_results.csv', index=False)  # index=False: row numbers carry no information

# To JSON
import json
with open('summary.json', 'w') as f:
    json.dump(summarise(stats), f, indent=2)  # indent=2 keeps the file human-readable

# To Excel
df.to_excel('results.xlsx', index=False)  # NOTE: needs an Excel writer engine installed (e.g. openpyxl) — verify in your environment

Complete Statistics Class

class SimulationStatistics:
    """All-in-one collector: per-entity records plus resource time series."""

    def __init__(self, env, resources):
        # resources: mapping of name -> resource, each sampled by monitor().
        self.env = env
        self.resources = resources
        self.entity_records = []
        self.time_series = []

    def record_entity(self, entity_id, entity_type, arrival, service_start, departure):
        """Log one finished entity's timeline plus derived durations."""
        entry = {
            'entity_id': entity_id,
            'type': entity_type,
            'arrival': arrival,
            'service_start': service_start,
            'departure': departure,
        }
        entry['wait'] = service_start - arrival
        entry['service'] = departure - service_start
        entry['total'] = departure - arrival
        self.entity_records.append(entry)

    def monitor(self, interval=1):
        """SimPy process: snapshot every resource each *interval* time units."""
        while True:
            snapshot = {'time': self.env.now}
            for name, resource in self.resources.items():
                snapshot[f'{name}_queue'] = len(resource.queue)
                snapshot[f'{name}_busy'] = resource.count
                snapshot[f'{name}_utilisation'] = resource.count / resource.capacity
            self.time_series.append(snapshot)
            yield self.env.timeout(interval)

    def entity_dataframe(self):
        """Entity records as a DataFrame, one row per entity."""
        return pd.DataFrame(self.entity_records)

    def time_series_dataframe(self):
        """Monitor snapshots as a DataFrame, one row per sample."""
        return pd.DataFrame(self.time_series)

    def summary(self):
        """Headline wait/total/throughput statistics over all entities."""
        df = self.entity_dataframe()
        return {
            'total_entities': len(df),
            'mean_wait': df['wait'].mean(),
            'median_wait': df['wait'].median(),
            'p95_wait': df['wait'].quantile(0.95),
            'max_wait': df['wait'].max(),
            'mean_total': df['total'].mean(),
            'throughput': len(df) / self.env.now if self.env.now > 0 else 0
        }

Summary

Collect everything. Filter later.

Key statistics:

- Wait times (mean, median, percentiles, max)
- Queue lengths (over time, and distribution of time spent at each length)
- Utilisation (by resource)
- Throughput (completions per time unit)
- Cycle time (arrival to departure)

Without measurement, you're just guessing.

Next Steps


Discover the Power of Simulation

Want to become a go-to expert in simulation with Python? The Complete Simulation Bootcamp will show you how simulation can transform your career and your projects.

Explore the Bootcamp