Building Simulators¶
This page walks through building a simulator for your own scenario out of OpenG2G components. For how the components fit together conceptually, see Architecture.
Essential Components¶
A simulator needs three things: a datacenter backend, a grid backend, and at least one controller.
You wire them together with the Coordinator:
from openg2g.coordinator import Coordinator
coord = Coordinator(
    datacenter=dc,
    grid=grid,
    controllers=[controller1, controller2, ...],
    total_duration_s=3600,
    dc_bus="671",
)
log = coord.run()
The sections below show how to set up each component.
Datacenter¶
The OfflineDatacenter replays GPU power traces built from the data pipeline.
Load traces from the generated manifest, build templates for the simulation config, then create the datacenter with a DatacenterConfig and OfflineWorkload:
from fractions import Fraction
from pathlib import Path
from openg2g.datacenter.config import (
    DatacenterConfig, InferenceModelSpec, InferenceRamp,
    PowerAugmentationConfig, TrainingRun,
)
from openg2g.datacenter.offline import OfflineDatacenter, OfflineWorkload
from openg2g.datacenter.workloads.inference import InferenceData
from openg2g.datacenter.workloads.training import TrainingTrace
models = (
    InferenceModelSpec(
        model_label="Llama-3.1-8B", num_replicas=720, gpus_per_replica=1,
        initial_batch_size=128, itl_deadline_s=0.08,
    ),
    InferenceModelSpec(
        model_label="Llama-3.1-70B", num_replicas=180, gpus_per_replica=4,
        initial_batch_size=128, itl_deadline_s=0.10,
    ),
)
data_dir = Path("data/offline")
inference_data = InferenceData.ensure(data_dir, models, dt_s=0.1)
training_trace = TrainingTrace.ensure(data_dir / "training_trace.csv")
workload = OfflineWorkload(
    inference_data=inference_data,
    inference_ramps=InferenceRamp(target=0.2).at(t_start=2500.0, t_end=3000.0),
    training=TrainingRun(n_gpus=2400, trace=training_trace).at(t_start=1000.0, t_end=2000.0),
)
dc_config = DatacenterConfig(gpus_per_server=8, base_kw_per_phase=500.0)
dc = OfflineDatacenter(
    dc_config,
    workload,
    dt_s=Fraction(1, 10),
    seed=0,
    power_augmentation=PowerAugmentationConfig(
        amplitude_scale_range=(0.98, 1.02),
        noise_fraction=0.005,
    ),
)
Grid¶
Construct an OpenDSSGrid from a DSS case directory. Tap schedules are built from TapPosition entries combined with the | operator and passed to a TapScheduleController (see Controllers below):
from fractions import Fraction
from openg2g.grid.opendss import OpenDSSGrid
from openg2g.grid.config import TapPosition
TAP_STEP = 0.00625 # standard 5/8% tap step
# Fixed taps (single position at t=0)
tap_schedule = TapPosition(a=1.0 + 14 * TAP_STEP, b=1.0 + 6 * TAP_STEP, c=1.0 + 15 * TAP_STEP).at(t=0)
# Or scheduled changes at multiple times
tap_schedule = (
    TapPosition(a=1.0 + 14 * TAP_STEP, b=1.0 + 6 * TAP_STEP, c=1.0 + 15 * TAP_STEP).at(t=0)
    | TapPosition(a=1.0 + 16 * TAP_STEP, b=1.0 + 6 * TAP_STEP, c=1.0 + 17 * TAP_STEP).at(t=25 * 60)
    | TapPosition(a=1.0 + 10 * TAP_STEP, b=1.0 + 6 * TAP_STEP, c=1.0 + 10 * TAP_STEP).at(t=55 * 60)
)
grid = OpenDSSGrid(
    dss_case_dir="examples/ieee13",
    dss_master_file="IEEE13Nodeckt.dss",
    dc_bus="671",
    dc_bus_kv=4.16,
    power_factor=dc_config.power_factor,
    dt_s=Fraction(1, 10),
    connection_type="wye",
)
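The .at() / | pattern can be read as building a piecewise-constant schedule keyed by time. A minimal sketch of the idea with stand-in classes (hypothetical names; not the library's implementation):

```python
import bisect
from dataclasses import dataclass

@dataclass(frozen=True)
class TimedValue:
    """A value that becomes active at time t (stand-in for TapPosition.at())."""
    t: float
    value: float

class Schedule:
    """Piecewise-constant schedule; | merges entries from two schedules."""

    def __init__(self, entries: list[TimedValue]):
        self.entries = sorted(entries, key=lambda e: e.t)

    def __or__(self, other: "Schedule") -> "Schedule":
        return Schedule(self.entries + other.entries)

    def value_at(self, t: float) -> float:
        # Most recent entry at or before t.
        times = [e.t for e in self.entries]
        i = bisect.bisect_right(times, t) - 1
        if i < 0:
            raise ValueError("no entry active at this time")
        return self.entries[i].value

def at(value: float, t: float) -> Schedule:
    return Schedule([TimedValue(t, value)])

sched = at(1.0, t=0) | at(1.1, t=25 * 60) | at(0.9, t=55 * 60)
print(sched.value_at(10))        # 1.0
print(sched.value_at(30 * 60))   # 1.1
```

Lookup returns the most recent entry at or before t, which is why a single entry at t=0 also works as a fixed setting for the whole run.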
Controllers¶
Controllers compose in order. Pass them as a list to the coordinator:
from openg2g.controller.tap_schedule import TapScheduleController
from openg2g.controller.ofo import OFOBatchSizeController
coord = Coordinator(
    datacenter=dc,
    grid=grid,
    controllers=[
        TapScheduleController(schedule=tap_schedule, dt_s=Fraction(1)),
        OFOBatchSizeController(...),
    ],
    total_duration_s=3600,
    dc_bus="671",
)
Actions from all controllers are applied before the next tick. Put tap controllers before batch controllers if tap changes should be visible to the batch optimizer within the same tick.
Analyzing Results¶
Coordinator.run() returns a SimulationLog containing all state and action history:
log = coord.run()
# Voltage statistics
from openg2g.metrics.voltage import compute_allbus_voltage_stats
stats = compute_allbus_voltage_stats(log.grid_states, v_min=0.95, v_max=1.05)
print(f"Violation time: {stats.violation_time_s:.1f} s")
# Time-series data for plotting
import numpy as np
time_s = np.array(log.time_s)
dc_time_s = np.array([s.time_s for s in log.dc_states])
kW_A = np.array([s.power_w.a / 1e3 for s in log.dc_states])
kW_B = np.array([s.power_w.b / 1e3 for s in log.dc_states])
kW_C = np.array([s.power_w.c / 1e3 for s in log.dc_states])
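For reference, the violation-time statistic above can be sketched self-containedly: count samples where any bus-phase voltage leaves the band and multiply by the timestep (toy array below; compute_allbus_voltage_stats operates on the logged GridState history):

```python
import numpy as np

def violation_time_s(v_pu: np.ndarray, dt_s: float,
                     v_min: float = 0.95, v_max: float = 1.05) -> float:
    """Total time any bus-phase voltage is outside [v_min, v_max].

    v_pu has shape (n_samples, n_bus_phases); a sample counts as a
    violation if at least one channel is out of band.
    """
    out_of_band = (v_pu < v_min) | (v_pu > v_max)
    violating_samples = out_of_band.any(axis=1)
    return float(violating_samples.sum() * dt_s)

v = np.array([
    [1.00, 0.99],
    [0.94, 1.00],   # phase below band
    [1.00, 1.06],   # phase above band
    [1.00, 1.00],
])
print(violation_time_s(v, dt_s=0.1))  # 0.2
```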
See examples/offline/plotting.py for reusable plot functions used by the example scripts.
Writing Custom Components¶
OpenG2G is designed for extensibility. Subclass the abstract base classes to implement your own datacenter backends and controllers, or subclass an existing implementation to create a custom variant. For background on the type system and generics, see Architecture: State Types and Generics.
Custom Datacenter¶
Datacenter backend.
Subclass DatacenterBackend and parameterize it with the state type your backend returns from step(). Your __init__ must call super().__init__() to initialize the base class's state tracking. The coordinator calls do_step() at the rate specified by dt_s, which internally calls your step() and records the returned state. Each step() call should return a DatacenterState (or subclass) with three-phase power in watts. Between steps, the coordinator may call apply_control(command, events) with individual DatacenterCommand instances; changes take effect on the next step(). Current state is available through state and past states through history(n); both are managed automatically by the base class.
reset() must clear all simulation state (counters, RNG state) while preserving configuration (dt_s, models, templates). History is cleared automatically by the coordinator (via do_reset()). Override start() and stop() for backends that acquire resources (connections, solver circuits); the coordinator calls start() before the simulation loop and stop() in LIFO order afterward. See Architecture: Component Lifecycle for the full sequence.
from __future__ import annotations
import functools
from fractions import Fraction
import numpy as np
from openg2g.clock import SimulationClock
from openg2g.datacenter.base import DatacenterBackend, DatacenterState
from openg2g.datacenter.command import DatacenterCommand, SetBatchSize
from openg2g.events import EventEmitter
from openg2g.common import ThreePhase
class SyntheticDatacenter(DatacenterBackend[DatacenterState]):
    """A datacenter that generates sinusoidal power profiles."""

    def __init__(self, dt_s: Fraction = Fraction(1, 10), base_kw: float = 1000.0):
        super().__init__()
        self._dt = dt_s
        self._base_kw = base_kw
        self._batch: dict[str, int] = {}

    @property
    def dt_s(self) -> Fraction:
        return self._dt

    def reset(self) -> None:
        self._batch.clear()

    def step(self, clock: SimulationClock, events: EventEmitter) -> DatacenterState:
        t = clock.time_s
        power_kw = self._base_kw * (1.0 + 0.3 * np.sin(2 * np.pi * t / 600))
        power_w = power_kw * 1e3 / 3  # split equally across phases
        return DatacenterState(
            time_s=t,
            power_w=ThreePhase(a=power_w, b=power_w, c=power_w),
        )

    @functools.singledispatchmethod
    def apply_control(self, command: DatacenterCommand, events: EventEmitter) -> None:
        raise TypeError(f"SyntheticDatacenter does not support {type(command).__name__}")

    @apply_control.register
    def _apply_set_batch_size(self, command: SetBatchSize, events: EventEmitter) -> None:
        self._batch.update({str(k): int(v) for k, v in command.batch_size_by_model.items()})
Datacenter state.
For richer state, define a DatacenterState subclass:
from dataclasses import dataclass, field

@dataclass(frozen=True)
class MyState(DatacenterState):
    per_gpu_power_w: dict[int, float] = field(default_factory=dict)

class MyDatacenter(DatacenterBackend[MyState]):
    def __init__(self):
        super().__init__()

    def step(self, clock: SimulationClock, events: EventEmitter) -> MyState:
        ...
The state type propagates through to SimulationLog, so log.dc_states will be correctly typed as list[MyState].
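The mechanism can be sketched with plain Python generics (stand-in classes, not the real base class): the subclass binds the type parameter, so a type checker infers the element type of the recorded history without casts.

```python
from dataclasses import dataclass, field
from typing import Generic, TypeVar

@dataclass(frozen=True)
class BaseState:
    time_s: float

@dataclass(frozen=True)
class RichState(BaseState):
    per_gpu_power_w: dict[int, float] = field(default_factory=dict)

StateT = TypeVar("StateT", bound=BaseState)

class Backend(Generic[StateT]):
    """Records whatever state subtype the subclass declares."""

    def __init__(self) -> None:
        self._history: list[StateT] = []

    def record(self, state: StateT) -> None:
        self._history.append(state)

    def history(self) -> list[StateT]:
        return list(self._history)

class RichBackend(Backend[RichState]):
    pass

backend = RichBackend()
backend.record(RichState(time_s=0.0, per_gpu_power_w={0: 450.0}))
# A type checker now knows backend.history() is list[RichState],
# so per_gpu_power_w is accessible without casts:
print(backend.history()[0].per_gpu_power_w[0])  # 450.0
```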
Datacenter commands.
For custom commands, subclass DatacenterCommand and register a handler with @apply_control.register:
import functools
from dataclasses import dataclass

from openg2g.datacenter.command import DatacenterCommand, SetBatchSize
from openg2g.events import EventEmitter

@dataclass(frozen=True)
class SetPowerCap(DatacenterCommand):
    cap_kw: float

class MyDatacenter(DatacenterBackend[DatacenterState]):
    @functools.singledispatchmethod
    def apply_control(self, command: DatacenterCommand, events: EventEmitter) -> None:
        raise TypeError(f"MyDatacenter does not support {type(command).__name__}")

    @apply_control.register
    def _apply_set_batch_size(self, command: SetBatchSize, events: EventEmitter) -> None:
        self._batch.update(command.batch_size_by_model)

    @apply_control.register
    def _apply_set_power_cap(self, command: SetPowerCap, events: EventEmitter) -> None:
        self._power_cap = command.cap_kw
The coordinator routes commands to backends based on their type hierarchy: DatacenterCommand subclasses go to the datacenter, GridCommand subclasses go to the grid.
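One way to picture this routing (hypothetical stand-in code, not the coordinator's internals): a single isinstance check against each command's base class decides which backend receives it.

```python
from dataclasses import dataclass

class Command: ...
class DatacenterCommand(Command): ...
class GridCommand(Command): ...

@dataclass(frozen=True)
class SetBatchSize(DatacenterCommand):
    batch: int

@dataclass(frozen=True)
class SetTaps(GridCommand):
    tap_a: float

def route(commands: list[Command]) -> tuple[list[DatacenterCommand], list[GridCommand]]:
    """Split a mixed command list by base class (sketch of the routing rule)."""
    dc_cmds: list[DatacenterCommand] = []
    grid_cmds: list[GridCommand] = []
    for cmd in commands:
        if isinstance(cmd, DatacenterCommand):
            dc_cmds.append(cmd)
        elif isinstance(cmd, GridCommand):
            grid_cmds.append(cmd)
        else:
            raise TypeError(f"unroutable command: {type(cmd).__name__}")
    return dc_cmds, grid_cmds

dc_cmds, grid_cmds = route([SetBatchSize(batch=64), SetTaps(tap_a=1.05)])
print(len(dc_cmds), len(grid_cmds))  # 1 1
```

Because routing keys on the base class, a custom command like SetPowerCap above reaches the datacenter automatically as long as it subclasses DatacenterCommand.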
Testing. DatacenterState and GridState are simple frozen dataclasses, so you can construct them directly in unit tests without running a full simulation.
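A pytest-style sketch of that idea, with stand-in dataclasses mirroring the shape of DatacenterState and ThreePhase (the real classes live in openg2g and carry more fields):

```python
from dataclasses import dataclass, FrozenInstanceError

@dataclass(frozen=True)
class ThreePhase:
    a: float
    b: float
    c: float

@dataclass(frozen=True)
class DatacenterState:
    time_s: float
    power_w: ThreePhase

def test_total_power() -> None:
    state = DatacenterState(time_s=0.0, power_w=ThreePhase(a=1e6, b=1e6, c=1e6))
    total_w = state.power_w.a + state.power_w.b + state.power_w.c
    assert total_w == 3e6

def test_state_is_immutable() -> None:
    state = DatacenterState(time_s=0.0, power_w=ThreePhase(a=0.0, b=0.0, c=0.0))
    try:
        state.time_s = 1.0  # type: ignore[misc]
    except FrozenInstanceError:
        pass
    else:
        raise AssertionError("expected FrozenInstanceError")

test_total_power()
test_state_is_immutable()
```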
Custom Controller¶
Controller.
Subclass Controller and parameterize it with the datacenter and grid backend types it works with. The coordinator calls step(clock, datacenter, grid, events) at the rate specified by dt_s; each call must return a list of DatacenterCommand and/or GridCommand objects (return [] for a no-op). Read current state via datacenter.state and grid.state, and history via datacenter.history(...) and grid.history(...). Use events.emit(topic, data) to log controller-side events. Keep step() fast since it runs synchronously in the simulation loop.
reset() must clear all simulation state (e.g., dual variables, counters, cached matrices) while preserving configuration. Override start() and stop() if the controller acquires external resources. See Architecture: Component Lifecycle for the full sequence.
from __future__ import annotations
from openg2g.clock import SimulationClock
from openg2g.controller.base import Controller
from openg2g.events import EventEmitter
from openg2g.grid.opendss import OpenDSSGrid
from openg2g.datacenter.command import DatacenterCommand, SetBatchSize
from openg2g.grid.command import GridCommand
class MyController(Controller[MyDatacenter, OpenDSSGrid]):
    """A controller that reduces batch size when any voltage is below a threshold."""

    def __init__(self, v_threshold: float = 0.96, dt_s: float = 1.0):
        super().__init__()
        self._v_threshold = v_threshold
        self._dt_s = dt_s

    @property
    def dt_s(self) -> float:
        return self._dt_s

    def reset(self) -> None:
        pass  # no simulation state beyond configuration

    def step(
        self,
        clock: SimulationClock,
        datacenter: MyDatacenter,
        grid: OpenDSSGrid,
        events: EventEmitter,
    ) -> list[DatacenterCommand | GridCommand]:
        if grid.state is None:
            return []
        for bus in grid.state.voltages.buses():
            tp = grid.state.voltages[bus]
            for v in (tp.a, tp.b, tp.c):
                if v < self._v_threshold:
                    events.emit("controller.low_voltage", {"bus": bus, "time_s": clock.time_s})
                    return [SetBatchSize(batch_size_by_model={"MyModel": 64})]
        return [SetBatchSize(batch_size_by_model={"MyModel": 128})]
Backend-specific controllers.
Controllers that need specific backend features (e.g., voltages_vector() from OpenDSSGrid) should bind their generic type parameters to concrete types instead of using the base classes. See the OFO controller below for an example.
Built-in Components¶
The following sections describe how the built-in components implement the interfaces above.
OfflineDatacenter¶
OfflineDatacenter implements DatacenterBackend by replaying real GPU power traces at controlled batch sizes.
Per-model server fleet Power assembly (3-phase)
┌─────────────────────┐ Phase A Phase B Phase C
│ Llama-3.1-8B │ │ │ │
│ 48 servers × 8 GPU │──┐ │ ┌─────┐ │ ┌─────┐ │ ┌─────┐
│ batch = 256 │ │ ├──│srv 1│ ├──│srv 2│ ├──│srv 3│
├─────────────────────┤ │ │ └─────┘ │ └─────┘ │ └─────┘
│ Llama-3.1-70B │ │ │ ┌─────┐ │ │ ┌─────┐
│ 30 servers × 8 GPU │──┤ sum kW ├──│srv 4│ │ ├──│srv 6│
│ batch = 128 │ │──per phase─>│ └─────┘ │ │ └─────┘
├─────────────────────┤ │ │ ... │ ... │ ...
│ Llama-3.1-405B │ │ │ │ │
│ 16 servers × 8 GPU │──┤ │ │ │
│ batch = 64 │ │ + power from training overlays
├─────────────────────┤ │ + per-replica noise and jitter
│ (+ 2 MoE models) │──┘ │ │ │
└─────────────────────┘ v v v
P_A(t) P_B(t) P_C(t)
How it implements the interface:
- step(clock, events) indexes into pre-built per-GPU power templates using (global_step + offset) % template_length per server. Random restart offsets desynchronize servers for a realistic aggregate power profile. Returns an OfflineDatacenterState (extends LLMDatacenterState) with per-model batch sizes, replica counts, and observed ITL.
- apply_control(command, events) dispatches SetBatchSize commands. Batch size changes take effect immediately on the next step() call.
- reset() clears the step counter and RNG state, and rebuilds server layouts from the stored config so the next run starts fresh. History is cleared automatically by do_reset().
- An ActivationPolicy determines which servers are active at each timestep. The default RampActivationPolicy follows an InferenceRampSchedule with random priority ordering.
- Training workload overlays add transient high-power phases.
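The effect of the random restart offsets is easy to see in a toy numpy sketch (made-up template and fleet size): synchronized servers replay the template in lockstep and the aggregate swings hard, while random offsets flatten the fleet-level profile.

```python
import numpy as np

rng = np.random.default_rng(0)
template = np.where(np.arange(100) < 50, 1.2, 0.4)  # toy per-server power template (kW)
n_servers = 64

def aggregate_kw(offsets: np.ndarray, n_steps: int = 500) -> np.ndarray:
    steps = np.arange(n_steps)
    # (global_step + offset) % template_length, per server, summed per step
    idx = (steps[:, None] + offsets[None, :]) % len(template)
    return template[idx].sum(axis=1)

synced = aggregate_kw(np.zeros(n_servers, dtype=int))
desynced = aggregate_kw(rng.integers(0, len(template), size=n_servers))

print(f"synced ripple:   {synced.std():.1f} kW")
print(f"desynced ripple: {desynced.std():.1f} kW")  # much smaller
```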
OpenDSSGrid¶
OpenDSSGrid implements GridBackend using the OpenDSS power flow solver on standard IEEE test feeders.
How it implements the interface:
- step(clock, power_samples_w, events) takes the most recent power sample from the accumulated buffer and runs a single OpenDSS power flow solve. If no samples are provided (the grid runs faster than the datacenter), the last known power is reused. Returns a GridState with per-bus, per-phase voltages and tap positions.
- apply_control(command, events) dispatches SetTaps commands to update regulator tap positions.
- reset() clears cached state and the _started flag. History is cleared automatically by do_reset(). The DSS circuit is recompiled on the next start().
- start() compiles the DSS circuit from the case files, builds the bus-phase voltage index (v_index), and prepares snapshot indexing structures.
- voltages_vector() returns a flat numpy array of all bus-phase voltages in v_index order (used by the OFO controller for gradient computation).
- estimate_sensitivity(perturbation_kw) computes a finite-difference estimate of the voltage sensitivity matrix dV/dP.
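The finite-difference scheme behind estimate_sensitivity can be sketched generically, with a hypothetical solve callback standing in for an OpenDSS power flow solve: perturb one power channel at a time, re-solve, and divide the voltage change by the perturbation.

```python
import numpy as np

def estimate_sensitivity(solve, p0_kw: np.ndarray, perturbation_kw: float = 50.0) -> np.ndarray:
    """Finite-difference dV/dP: column j is (V(p0 + dP * e_j) - V(p0)) / dP."""
    v0 = solve(p0_kw)
    sens = np.empty((v0.size, p0_kw.size))
    for j in range(p0_kw.size):
        p = p0_kw.copy()
        p[j] += perturbation_kw
        sens[:, j] = (solve(p) - v0) / perturbation_kw
    return sens

# Toy linearized grid: V = V0 + A @ P, so the estimate should recover A.
A = np.array([[-2e-5, -1e-5], [-1e-5, -3e-5]])
v_nominal = np.array([1.02, 1.01])
solve = lambda p_kw: v_nominal + A @ p_kw

sens = estimate_sensitivity(solve, p0_kw=np.array([800.0, 600.0]))
print(np.allclose(sens, A))  # True
```

On a real feeder the map from power to voltage is nonlinear, so the estimate is only a local linearization around the current operating point.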
OFOBatchSizeController¶
OFOBatchSizeController implements Controller using Online Feedback Optimization (primal-dual) to regulate batch sizes for voltage safety. For the full mathematical formulation, see the G2G paper.
How it implements the interface:
- step(clock, datacenter, grid, events) reads active_replicas_by_model and observed_itl_s_by_model from datacenter.state, and phase_share_by_model from the datacenter itself. On the grid side, it calls grid.v_index, grid.voltages_vector(), and grid.estimate_sensitivity(). Returns a list of SetBatchSize commands.
- reset() clears the dual variables (voltage and latency multipliers), primal state, step counters, and the cached sensitivity matrix.
- Binds its generic type parameters to LLMBatchSizeControlledDatacenter and OpenDSSGrid, since it requires LLM-specific state fields and OpenDSS-specific methods (voltages_vector, estimate_sensitivity).
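A scalar toy version of the primal-dual loop (illustrative only: the utility, the linear voltage model, and the step sizes below are invented, and the real controller handles multiple models plus latency constraints): the dual variable mu accumulates voltage-constraint violation and pushes the batch size down until the voltage sits on the limit.

```python
# Maximize U(b) = b - 0.005 * b**2 subject to v(b) >= v_min, where
# v(b) = 1.05 + s * b models "larger batch -> more power -> lower voltage".
s = -0.001               # assumed voltage sensitivity (pu per unit of batch size)
v_min = 0.97             # lower voltage limit (pu)
alpha, beta = 5.0, 50.0  # primal / dual step sizes, hand-tuned for this toy

b, mu = 128.0, 0.0
for _ in range(5000):
    v = 1.05 + s * b                        # "measure" the voltage feedback
    mu = max(0.0, mu + beta * (v_min - v))  # dual ascent on the voltage constraint
    dU = 1.0 - 0.01 * b                     # utility gradient
    b = b + alpha * (dU + s * mu)           # primal ascent on the Lagrangian

print(f"b = {b:.1f}, v = {1.05 + s * b:.3f}")  # settles at b = 80.0, v = 0.970
```

At the fixed point the constrained optimum b = (v_min - 1.05) / s = 80 is reached: the voltage rides exactly on v_min and mu balances the utility gradient.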