openg2g.datacenter

openg2g.datacenter.base

Abstract base class for datacenter backends and base state types.

DatacenterState dataclass

State emitted by a datacenter backend each timestep.

Contains only universally applicable fields. LLM-inference-specific fields (batch sizes, replicas, latency) live on LLMDatacenterState.

Source code in openg2g/datacenter/base.py
@dataclass(frozen=True)
class DatacenterState:
    """State emitted by a datacenter backend each timestep.

    Contains only universally applicable fields. LLM-inference-specific
    fields (batch sizes, replicas, latency) live on `LLMDatacenterState`.
    """

    time_s: float
    power_w: ThreePhase

LLMDatacenterState dataclass

Bases: DatacenterState

State from a datacenter serving LLM workloads.

Extends DatacenterState with per-model batch size, replica count, and observed inter-token latency fields used by LLM controllers.

Source code in openg2g/datacenter/base.py
@dataclass(frozen=True)
class LLMDatacenterState(DatacenterState):
    """State from a datacenter serving LLM workloads.

    Extends `DatacenterState` with per-model batch size, replica count,
    and observed inter-token latency fields used by LLM controllers.
    """

    batch_size_by_model: dict[str, int] = field(default_factory=dict)
    active_replicas_by_model: dict[str, int] = field(default_factory=dict)
    observed_itl_s_by_model: dict[str, float] = field(default_factory=dict)
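The frozen-dataclass pattern used by these state types can be exercised in a self-contained sketch. The real `ThreePhase` type is not shown on this page, so a plain per-phase tuple stands in for it here:

```python
from dataclasses import FrozenInstanceError, dataclass, field

# Stand-in for openg2g's ThreePhase: per-phase power in watts (A, B, C).
ThreePhase = tuple[float, float, float]

@dataclass(frozen=True)
class DatacenterState:
    time_s: float
    power_w: ThreePhase

@dataclass(frozen=True)
class LLMDatacenterState(DatacenterState):
    # Mutable defaults on a dataclass must use default_factory,
    # so each instance gets its own dict.
    batch_size_by_model: dict[str, int] = field(default_factory=dict)

state = LLMDatacenterState(
    time_s=1.0,
    power_w=(100.0, 110.0, 95.0),
    batch_size_by_model={"model-a": 32},
)

# frozen=True makes each emitted state an immutable snapshot:
try:
    state.time_s = 2.0
except FrozenInstanceError:
    pass  # mutation is rejected
```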

DatacenterBackend

Bases: Generic[DCStateT], ABC

Interface for datacenter power simulation backends.

Source code in openg2g/datacenter/base.py
class DatacenterBackend(Generic[DCStateT], ABC):
    """Interface for datacenter power simulation backends."""

    @property
    @abstractmethod
    def dt_s(self) -> Fraction:
        """Native timestep as a Fraction (seconds)."""

    @property
    @abstractmethod
    def state(self) -> DCStateT:
        """Latest emitted state.

        Raises:
            RuntimeError: If accessed before the first `step()` call.
        """

    @abstractmethod
    def history(self, n: int | None = None) -> Sequence[DCStateT]:
        """Return emitted state history (all, or latest `n`)."""

    @abstractmethod
    def step(self, clock: SimulationClock) -> DCStateT:
        """Advance one native timestep. Return state for this step."""

    @abstractmethod
    def apply_control(self, command: DatacenterCommand) -> None:
        """Apply one command. Takes effect on next step() call."""

    @abstractmethod
    def reset(self) -> None:
        """Reset simulation state to initial conditions.

        Called by the coordinator before each `start()`. Must clear all
        simulation state: history, counters, RNG seeds, cached values.
        Configuration (dt_s, models, templates) is not affected.

        Abstract so every implementation explicitly enumerates its state.
        A forgotten field is a bug -- not clearing it silently corrupts
        the second run.
        """

    def start(self) -> None:
        """Acquire per-run resources (threads, solver circuits).

        Called after `reset()`, before the simulation loop. Override for
        backends that need resource acquisition (e.g., `OpenDSSGrid`
        compiles its DSS circuit here). No-op by default because most
        offline components have no resources to acquire.
        """

    def stop(self) -> None:
        """Release per-run resources. Simulation state is preserved.

        Called after the simulation loop in LIFO order. Override for
        backends that acquired resources in `start()`. No-op by default.
        """

    def bind_event_emitter(self, emitter: EventEmitter) -> None:
        """Attach a clock-bound emitter for backend-originated events."""

dt_s abstractmethod property

Native timestep as a Fraction (seconds).

state abstractmethod property

Latest emitted state.

Raises:

| Type | Description |
| --- | --- |
| `RuntimeError` | If accessed before the first `step()` call. |

history(n=None) abstractmethod

Return emitted state history (all, or latest n).

Source code in openg2g/datacenter/base.py
@abstractmethod
def history(self, n: int | None = None) -> Sequence[DCStateT]:
    """Return emitted state history (all, or latest `n`)."""

step(clock) abstractmethod

Advance one native timestep. Return state for this step.

Source code in openg2g/datacenter/base.py
@abstractmethod
def step(self, clock: SimulationClock) -> DCStateT:
    """Advance one native timestep. Return state for this step."""

apply_control(command) abstractmethod

Apply one command. Takes effect on next step() call.

Source code in openg2g/datacenter/base.py
@abstractmethod
def apply_control(self, command: DatacenterCommand) -> None:
    """Apply one command. Takes effect on next step() call."""

reset() abstractmethod

Reset simulation state to initial conditions.

Called by the coordinator before each start(). Must clear all simulation state: history, counters, RNG seeds, cached values. Configuration (dt_s, models, templates) is not affected.

Abstract so every implementation explicitly enumerates its state. A forgotten field is a bug -- not clearing it silently corrupts the second run.

Source code in openg2g/datacenter/base.py
@abstractmethod
def reset(self) -> None:
    """Reset simulation state to initial conditions.

    Called by the coordinator before each `start()`. Must clear all
    simulation state: history, counters, RNG seeds, cached values.
    Configuration (dt_s, models, templates) is not affected.

    Abstract so every implementation explicitly enumerates its state.
    A forgotten field is a bug -- not clearing it silently corrupts
    the second run.
    """

start()

Acquire per-run resources (threads, solver circuits).

Called after reset(), before the simulation loop. Override for backends that need resource acquisition (e.g., OpenDSSGrid compiles its DSS circuit here). No-op by default because most offline components have no resources to acquire.

Source code in openg2g/datacenter/base.py
def start(self) -> None:
    """Acquire per-run resources (threads, solver circuits).

    Called after `reset()`, before the simulation loop. Override for
    backends that need resource acquisition (e.g., `OpenDSSGrid`
    compiles its DSS circuit here). No-op by default because most
    offline components have no resources to acquire.
    """

stop()

Release per-run resources. Simulation state is preserved.

Called after the simulation loop in LIFO order. Override for backends that acquired resources in start(). No-op by default.

Source code in openg2g/datacenter/base.py
def stop(self) -> None:
    """Release per-run resources. Simulation state is preserved.

    Called after the simulation loop in LIFO order. Override for
    backends that acquired resources in `start()`. No-op by default.
    """

bind_event_emitter(emitter)

Attach a clock-bound emitter for backend-originated events.

Source code in openg2g/datacenter/base.py
def bind_event_emitter(self, emitter: EventEmitter) -> None:
    """Attach a clock-bound emitter for backend-originated events."""

LLMBatchSizeControlledDatacenter

Bases: DatacenterBackend[DCStateT]

Datacenter that serves LLM workloads and supports batch-size control.

Marker layer between DatacenterBackend and concrete implementations. Controllers that issue set_batch_size commands or read active_replicas_by_model / observed_itl_s_by_model from state should bind their generic to this class.

Source code in openg2g/datacenter/base.py
class LLMBatchSizeControlledDatacenter(DatacenterBackend[DCStateT]):
    """Datacenter that serves LLM workloads and supports batch-size control.

    Marker layer between `DatacenterBackend` and concrete implementations.
    Controllers that issue `set_batch_size` commands or read
    `active_replicas_by_model` / `observed_itl_s_by_model` from state
    should bind their generic to this class.
    """

    @property
    def phase_share_by_model(self) -> dict[str, np.ndarray]:
        """Per-model phase share vectors `[frac_A, frac_B, frac_C]`.

        Returns an empty dict by default. Consumers treat missing keys
        as uniform `[1/3, 1/3, 1/3]`. Override in subclasses that know
        actual server-to-phase placement.
        """
        return {}

phase_share_by_model property

Per-model phase share vectors [frac_A, frac_B, frac_C].

Returns an empty dict by default. Consumers treat missing keys as uniform [1/3, 1/3, 1/3]. Override in subclasses that know actual server-to-phase placement.
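The fallback behavior described here can be sketched as a consumer-side helper (hypothetical; the real consumers live elsewhere in openg2g):

```python
import numpy as np

UNIFORM_SHARE = np.full(3, 1.0 / 3.0)  # even split across phases A, B, C

def phase_share(shares: dict[str, np.ndarray], model: str) -> np.ndarray:
    """Look up a model's [frac_A, frac_B, frac_C]; default to uniform."""
    return shares.get(model, UNIFORM_SHARE)

shares = {"model-a": np.array([0.5, 0.3, 0.2])}
print(phase_share(shares, "model-a"))  # explicit placement
print(phase_share(shares, "model-b"))  # missing key: uniform fallback
```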

dt_s abstractmethod property

Native timestep as a Fraction (seconds).

state abstractmethod property

Latest emitted state.

Raises:

| Type | Description |
| --- | --- |
| `RuntimeError` | If accessed before the first `step()` call. |

history(n=None) abstractmethod

Return emitted state history (all, or latest n).

Source code in openg2g/datacenter/base.py
@abstractmethod
def history(self, n: int | None = None) -> Sequence[DCStateT]:
    """Return emitted state history (all, or latest `n`)."""

step(clock) abstractmethod

Advance one native timestep. Return state for this step.

Source code in openg2g/datacenter/base.py
@abstractmethod
def step(self, clock: SimulationClock) -> DCStateT:
    """Advance one native timestep. Return state for this step."""

apply_control(command) abstractmethod

Apply one command. Takes effect on next step() call.

Source code in openg2g/datacenter/base.py
@abstractmethod
def apply_control(self, command: DatacenterCommand) -> None:
    """Apply one command. Takes effect on next step() call."""

reset() abstractmethod

Reset simulation state to initial conditions.

Called by the coordinator before each start(). Must clear all simulation state: history, counters, RNG seeds, cached values. Configuration (dt_s, models, templates) is not affected.

Abstract so every implementation explicitly enumerates its state. A forgotten field is a bug -- not clearing it silently corrupts the second run.

Source code in openg2g/datacenter/base.py
@abstractmethod
def reset(self) -> None:
    """Reset simulation state to initial conditions.

    Called by the coordinator before each `start()`. Must clear all
    simulation state: history, counters, RNG seeds, cached values.
    Configuration (dt_s, models, templates) is not affected.

    Abstract so every implementation explicitly enumerates its state.
    A forgotten field is a bug -- not clearing it silently corrupts
    the second run.
    """

start()

Acquire per-run resources (threads, solver circuits).

Called after reset(), before the simulation loop. Override for backends that need resource acquisition (e.g., OpenDSSGrid compiles its DSS circuit here). No-op by default because most offline components have no resources to acquire.

Source code in openg2g/datacenter/base.py
def start(self) -> None:
    """Acquire per-run resources (threads, solver circuits).

    Called after `reset()`, before the simulation loop. Override for
    backends that need resource acquisition (e.g., `OpenDSSGrid`
    compiles its DSS circuit here). No-op by default because most
    offline components have no resources to acquire.
    """

stop()

Release per-run resources. Simulation state is preserved.

Called after the simulation loop in LIFO order. Override for backends that acquired resources in start(). No-op by default.

Source code in openg2g/datacenter/base.py
def stop(self) -> None:
    """Release per-run resources. Simulation state is preserved.

    Called after the simulation loop in LIFO order. Override for
    backends that acquired resources in `start()`. No-op by default.
    """

bind_event_emitter(emitter)

Attach a clock-bound emitter for backend-originated events.

Source code in openg2g/datacenter/base.py
def bind_event_emitter(self, emitter: EventEmitter) -> None:
    """Attach a clock-bound emitter for backend-originated events."""

openg2g.datacenter.config

Datacenter facility and workload configuration.

TrainingRun dataclass

A single training workload window.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `t_start` | `float` | Global simulation time when training becomes active (seconds). |
| `t_end` | `float` | Global simulation time when training stops (seconds). |
| `n_gpus` | `int` | Number of GPUs running the training workload. |
| `trace` | `TrainingTrace` | Single-GPU training power trace. |
| `target_peak_W_per_gpu` | `float` | The trace is rescaled so its peak equals this value. |

Source code in openg2g/datacenter/config.py
@dataclass(frozen=True)
class TrainingRun:
    """A single training workload window.

    Attributes:
        t_start: Global simulation time when training becomes active (seconds).
        t_end: Global simulation time when training stops (seconds).
        n_gpus: Number of GPUs running the training workload.
        trace: Single-GPU training power trace.
        target_peak_W_per_gpu: The trace is rescaled so its peak equals this value.
    """

    t_start: float
    t_end: float
    n_gpus: int
    trace: TrainingTrace
    target_peak_W_per_gpu: float = 400.0

    def __post_init__(self) -> None:
        if self.t_end < self.t_start:
            raise ValueError(f"TrainingRun t_end ({self.t_end}) must be >= t_start ({self.t_start}).")
        if self.n_gpus < 0:
            raise ValueError(f"TrainingRun n_gpus must be >= 0, got {self.n_gpus}.")

    def __or__(self, other: TrainingRun | TrainingSchedule) -> TrainingSchedule:
        if isinstance(other, TrainingRun):
            return TrainingSchedule(entries=(self, other))
        return TrainingSchedule(entries=(self, *other))

TrainingSchedule

Ordered collection of training windows, built with |.

Example:

schedule = (
    TrainingRun(t_start=500, t_end=1500, n_gpus=2400, trace=trace_a)
    | TrainingRun(t_start=2000, t_end=3000, n_gpus=1200, trace=trace_b)
)
Source code in openg2g/datacenter/config.py
class TrainingSchedule:
    """Ordered collection of training windows, built with `|`.

    Example:

        schedule = (
            TrainingRun(t_start=500, t_end=1500, n_gpus=2400, trace=trace_a)
            | TrainingRun(t_start=2000, t_end=3000, n_gpus=1200, trace=trace_b)
        )
    """

    __slots__ = ("_entries",)

    def __init__(self, entries: tuple[TrainingRun, ...]) -> None:
        self._entries = tuple(sorted(entries, key=lambda e: e.t_start))

    def __or__(self, other: TrainingRun | TrainingSchedule) -> TrainingSchedule:
        if isinstance(other, TrainingRun):
            return TrainingSchedule(entries=(*self._entries, other))
        return TrainingSchedule(entries=(*self._entries, *other._entries))

    def __iter__(self) -> Iterator[TrainingRun]:
        return iter(self._entries)

    def __len__(self) -> int:
        return len(self._entries)

    def __bool__(self) -> bool:
        return bool(self._entries)

    def __repr__(self) -> str:
        parts = [f"TrainingRun(t_start={r.t_start}, t_end={r.t_end}, n_gpus={r.n_gpus})" for r in self._entries]
        return " | ".join(parts)

ServerRamp dataclass

A single server ramp event.

Transitions the active-server fraction to target linearly over [t_start, t_end].

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `t_start` | `float` | Global simulation time when the ramp begins (seconds). |
| `t_end` | `float` | Global simulation time when the ramp ends (seconds). |
| `target` | `float` | Target active-server fraction after the ramp (0.0--1.0). |

Source code in openg2g/datacenter/config.py
@dataclass(frozen=True)
class ServerRamp:
    """A single server ramp event.

    Transitions the active-server fraction to `target` linearly over
    `[t_start, t_end]`.

    Attributes:
        t_start: Global simulation time when the ramp begins (seconds).
        t_end: Global simulation time when the ramp ends (seconds).
        target: Target active-server fraction after the ramp (0.0--1.0).
    """

    t_start: float
    t_end: float
    target: float

    def __post_init__(self) -> None:
        if self.t_end < self.t_start:
            raise ValueError(f"ServerRamp t_end ({self.t_end}) must be >= t_start ({self.t_start}).")
        if not (0.0 <= self.target <= 1.0):
            raise ValueError(f"ServerRamp target must be in [0.0, 1.0], got {self.target}.")

    def __or__(self, other: ServerRamp | ServerRampSchedule) -> ServerRampSchedule:
        if isinstance(other, ServerRamp):
            return ServerRampSchedule(entries=(self, other))
        return ServerRampSchedule(entries=(self, *other))

ServerRampSchedule

Ordered collection of server ramp events, built with |.

Semantics: before the first ramp, fraction = 1.0. During each [t_start, t_end] window, the fraction linearly interpolates from the previous level to target. Between ramps, the fraction holds at the last target.

An empty schedule means all servers are active (fraction = 1.0) at all times.

Example:

ramps = (
    ServerRamp(t_start=2500, t_end=3000, target=0.2)
    | ServerRamp(t_start=3200, t_end=3400, target=1.0)
)
Source code in openg2g/datacenter/config.py
class ServerRampSchedule:
    """Ordered collection of server ramp events, built with `|`.

    Semantics: before the first ramp, fraction = 1.0.  During each
    `[t_start, t_end]` window, the fraction linearly interpolates from
    the previous level to `target`.  Between ramps, the fraction holds
    at the last target.

    An empty schedule means all servers are active (fraction = 1.0) at all times.

    Example:

        ramps = (
            ServerRamp(t_start=2500, t_end=3000, target=0.2)
            | ServerRamp(t_start=3200, t_end=3400, target=1.0)
        )
    """

    __slots__ = ("_entries",)

    def __init__(self, entries: tuple[ServerRamp, ...]) -> None:
        self._entries = tuple(sorted(entries, key=lambda e: e.t_start))

    def __or__(self, other: ServerRamp | ServerRampSchedule) -> ServerRampSchedule:
        if isinstance(other, ServerRamp):
            return ServerRampSchedule(entries=(*self._entries, other))
        return ServerRampSchedule(entries=(*self._entries, *other._entries))

    def __iter__(self) -> Iterator[ServerRamp]:
        return iter(self._entries)

    def __len__(self) -> int:
        return len(self._entries)

    def __bool__(self) -> bool:
        return bool(self._entries)

    def __repr__(self) -> str:
        parts = [f"ServerRamp(t_start={r.t_start}, t_end={r.t_end}, target={r.target})" for r in self._entries]
        return " | ".join(parts)

    def fraction_at(self, t: float | np.ndarray) -> float | np.ndarray:
        """Evaluate the active-server fraction at time(s) *t*.

        Piecewise-linear interpolation between ramp events.
        Before the first ramp, fraction = 1.0.

        Args:
            t: Scalar or array of global simulation times (seconds).

        Returns:
            Active-server fraction(s), same shape as *t*.
        """
        if isinstance(t, np.ndarray):
            return self._fraction_array(t)
        return float(self._fraction_scalar(float(t)))

    def _fraction_scalar(self, t: float) -> float:
        level = 1.0
        for ramp in self._entries:
            if t < ramp.t_start:
                return level
            if t <= ramp.t_end:
                if ramp.t_end == ramp.t_start:
                    return ramp.target
                alpha = (t - ramp.t_start) / (ramp.t_end - ramp.t_start)
                return level + (ramp.target - level) * alpha
            level = ramp.target
        return level

    def _fraction_array(self, t: np.ndarray) -> np.ndarray:
        vfunc = np.vectorize(self._fraction_scalar, otypes=[float])
        return vfunc(t)

fraction_at(t)

Evaluate the active-server fraction at time(s) t.

Piecewise-linear interpolation between ramp events. Before the first ramp, fraction = 1.0.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `t` | `float \| ndarray` | Scalar or array of global simulation times (seconds). | required |

Returns:

| Type | Description |
| --- | --- |
| `float \| ndarray` | Active-server fraction(s), same shape as `t`. |

Source code in openg2g/datacenter/config.py
def fraction_at(self, t: float | np.ndarray) -> float | np.ndarray:
    """Evaluate the active-server fraction at time(s) *t*.

    Piecewise-linear interpolation between ramp events.
    Before the first ramp, fraction = 1.0.

    Args:
        t: Scalar or array of global simulation times (seconds).

    Returns:
        Active-server fraction(s), same shape as *t*.
    """
    if isinstance(t, np.ndarray):
        return self._fraction_array(t)
    return float(self._fraction_scalar(float(t)))
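To make the semantics concrete, here is a stand-alone re-implementation of the scalar evaluation above, applied to the schedule from the class docstring (fractional values are approximate due to floating point):

```python
def fraction_at(ramps, t, level=1.0):
    """Piecewise-linear active-server fraction.

    Mirrors ServerRampSchedule._fraction_scalar; `ramps` is a list of
    (t_start, t_end, target) tuples sorted by t_start.
    """
    for t_start, t_end, target in ramps:
        if t < t_start:
            return level                      # ramp not started yet
        if t <= t_end:
            if t_end == t_start:              # instantaneous ramp
                return target
            alpha = (t - t_start) / (t_end - t_start)
            return level + (target - level) * alpha
        level = target                        # hold last target between ramps
    return level

ramps = [(2500, 3000, 0.2), (3200, 3400, 1.0)]
print(fraction_at(ramps, 0))     # 1.0 (before the first ramp)
print(fraction_at(ramps, 2750))  # ~0.6 (halfway down the first ramp)
print(fraction_at(ramps, 3100))  # 0.2 (held between ramps)
print(fraction_at(ramps, 3300))  # ~0.6 (halfway back up)
```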

DatacenterConfig dataclass

Physical datacenter facility configuration.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `gpus_per_server` | `int` | Number of GPUs per physical server rack. |
| `base_kw_per_phase` | `float` | Constant base load per phase (kW). |

Source code in openg2g/datacenter/config.py
@dataclass(frozen=True)
class DatacenterConfig:
    """Physical datacenter facility configuration.

    Attributes:
        gpus_per_server: Number of GPUs per physical server rack.
        base_kw_per_phase: Constant base load per phase (kW).
    """

    gpus_per_server: int = 8
    base_kw_per_phase: float = 0.0

    def __post_init__(self) -> None:
        if self.gpus_per_server < 1:
            raise ValueError(f"gpus_per_server must be >= 1, got {self.gpus_per_server}.")

WorkloadConfig

What runs in the datacenter: inference, training, and ramp events.

Accepts flexible input types and normalizes them internally:

- A single TrainingRun is wrapped in a TrainingSchedule.
- A single ServerRamp is wrapped in a ServerRampSchedule.
- None yields an empty schedule.

Properties always return schedule types, eliminating isinstance checks at consumption sites.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `inference` | `LLMInferenceWorkload` | LLM inference workload specification. | required |
| `training` | `TrainingRun \| TrainingSchedule \| None` | Training workload window(s). `None` disables training overlay. | `None` |
| `server_ramps` | `ServerRamp \| ServerRampSchedule \| None` | Server ramp event(s). `None` keeps all servers active. | `None` |
Source code in openg2g/datacenter/config.py
class WorkloadConfig:
    """What runs in the datacenter: inference, training, and ramp events.

    Accepts flexible input types and normalizes them internally:
    - A single `TrainingRun` is wrapped in a `TrainingSchedule`.
    - A single `ServerRamp` is wrapped in a `ServerRampSchedule`.
    - `None` yields an empty schedule.

    Properties always return schedule types, eliminating `isinstance`
    checks at consumption sites.

    Args:
        inference: LLM inference workload specification.
        training: Training workload window(s). `None` disables training overlay.
        server_ramps: Server ramp event(s). `None` keeps all servers active.
    """

    def __init__(
        self,
        inference: LLMInferenceWorkload,
        training: TrainingRun | TrainingSchedule | None = None,
        server_ramps: ServerRamp | ServerRampSchedule | None = None,
    ) -> None:
        self._inference = inference

        if training is None:
            self._training = TrainingSchedule(entries=())
        elif isinstance(training, TrainingRun):
            self._training = TrainingSchedule(entries=(training,))
        else:
            self._training = training

        if server_ramps is None:
            self._server_ramps = ServerRampSchedule(entries=())
        elif isinstance(server_ramps, ServerRamp):
            self._server_ramps = ServerRampSchedule(entries=(server_ramps,))
        else:
            self._server_ramps = server_ramps

    @property
    def inference(self) -> LLMInferenceWorkload:
        return self._inference

    @property
    def training(self) -> TrainingSchedule:
        return self._training

    @property
    def server_ramps(self) -> ServerRampSchedule:
        return self._server_ramps

openg2g.datacenter.layout

Server layout and power augmentation primitives.

Provides the shared components for scaling per-GPU power measurements to datacenter-level three-phase power output. These primitives are backend-agnostic and can be used by both offline (trace-based) and online (live GPU) datacenters.

ActivationPolicy

Bases: ABC

Per-model activation policy that answers "which servers are active?"

Created by ActivationStrategy.for_model and bound to a specific model's server pool.

Source code in openg2g/datacenter/layout.py
class ActivationPolicy(ABC):
    """Per-model activation policy that answers "which servers are active?"

    Created by `ActivationStrategy.for_model` and bound to a specific
    model's server pool.
    """

    @abstractmethod
    def active_mask(self, t: float) -> np.ndarray:
        """Boolean mask of active servers at time *t*.

        Returns:
            Array of shape `(num_servers,)` with `True` for active servers.
        """

    def active_indices(self, t: float) -> np.ndarray:
        """Indices of active servers at time *t*.

        The default implementation returns indices in ascending order via
        `np.where(active_mask(t))`. Subclasses may override to return
        indices in a specific order (e.g., priority order) to control
        floating-point summation order in the datacenter.

        Returns:
            1-D int array of active server indices.
        """
        return np.where(self.active_mask(t))[0]

active_mask(t) abstractmethod

Boolean mask of active servers at time t.

Returns:

| Type | Description |
| --- | --- |
| `ndarray` | Array of shape `(num_servers,)` with `True` for active servers. |

Source code in openg2g/datacenter/layout.py
@abstractmethod
def active_mask(self, t: float) -> np.ndarray:
    """Boolean mask of active servers at time *t*.

    Returns:
        Array of shape `(num_servers,)` with `True` for active servers.
    """

active_indices(t)

Indices of active servers at time t.

The default implementation returns indices in ascending order via np.where(active_mask(t)). Subclasses may override to return indices in a specific order (e.g., priority order) to control floating-point summation order in the datacenter.

Returns:

| Type | Description |
| --- | --- |
| `ndarray` | 1-D int array of active server indices. |

Source code in openg2g/datacenter/layout.py
def active_indices(self, t: float) -> np.ndarray:
    """Indices of active servers at time *t*.

    The default implementation returns indices in ascending order via
    `np.where(active_mask(t))`. Subclasses may override to return
    indices in a specific order (e.g., priority order) to control
    floating-point summation order in the datacenter.

    Returns:
        1-D int array of active server indices.
    """
    return np.where(self.active_mask(t))[0]

ActivationStrategy

Bases: ABC

Factory that creates per-model ActivationPolicy instances.

A strategy is instantiated once and passed to the datacenter. When the datacenter builds each model's server layout, it calls for_model to create a model-specific ActivationPolicy.

Subclass to implement custom activation strategies. The phase_list argument in for_model enables phase-aware load balancing.

Source code in openg2g/datacenter/layout.py
class ActivationStrategy(ABC):
    """Factory that creates per-model `ActivationPolicy` instances.

    A strategy is instantiated once and passed to the datacenter. When
    the datacenter builds each model's server layout, it calls
    `for_model` to create a model-specific `ActivationPolicy`.

    Subclass to implement custom activation strategies. The `phase_list`
    argument in `for_model` enables phase-aware load balancing.
    """

    @abstractmethod
    def for_model(
        self,
        *,
        num_servers: int,
        phase_list: np.ndarray,
        rng: np.random.Generator,
    ) -> ActivationPolicy:
        """Create a policy for one model's server pool.

        Args:
            num_servers: Number of physical servers for this model.
            phase_list: Phase assignment per server (0=A, 1=B, 2=C), shape
                `(num_servers,)`.
            rng: RNG for randomized decisions (priority ordering, etc.).
                Implementations must consume RNG calls deterministically
                so that downstream layout generation is reproducible.

        Returns:
            Policy that answers `active_mask(t)` queries.
        """

for_model(*, num_servers, phase_list, rng) abstractmethod

Create a policy for one model's server pool.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `num_servers` | `int` | Number of physical servers for this model. | required |
| `phase_list` | `ndarray` | Phase assignment per server (0=A, 1=B, 2=C), shape `(num_servers,)`. | required |
| `rng` | `Generator` | RNG for randomized decisions (priority ordering, etc.). Implementations must consume RNG calls deterministically so that downstream layout generation is reproducible. | required |

Returns:

| Type | Description |
| --- | --- |
| `ActivationPolicy` | Policy that answers `active_mask(t)` queries. |

Source code in openg2g/datacenter/layout.py
@abstractmethod
def for_model(
    self,
    *,
    num_servers: int,
    phase_list: np.ndarray,
    rng: np.random.Generator,
) -> ActivationPolicy:
    """Create a policy for one model's server pool.

    Args:
        num_servers: Number of physical servers for this model.
        phase_list: Phase assignment per server (0=A, 1=B, 2=C), shape
            `(num_servers,)`.
        rng: RNG for randomized decisions (priority ordering, etc.).
            Implementations must consume RNG calls deterministically
            so that downstream layout generation is reproducible.

    Returns:
        Policy that answers `active_mask(t)` queries.
    """

RampActivationStrategy

Bases: ActivationStrategy

Activate servers by fixed random priority, following a ServerRampSchedule.

At time t, the top-k servers (by random priority) are active, where k = round(schedule.fraction_at(t) * num_servers).

This is the default strategy used by OfflineDatacenter.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `schedule` | `ServerRampSchedule` | Temporal ramp schedule mapping time to active-server fraction. | required |
Source code in openg2g/datacenter/layout.py
class RampActivationStrategy(ActivationStrategy):
    """Activate servers by fixed random priority, following a `ServerRampSchedule`.

    At time *t*, the top-*k* servers (by random priority) are active, where
    `k = round(schedule.fraction_at(t) * num_servers)`.

    This is the default strategy used by `OfflineDatacenter`.

    Args:
        schedule: Temporal ramp schedule mapping time to active-server fraction.
    """

    def __init__(self, schedule: ServerRampSchedule) -> None:
        self._schedule = schedule

    def for_model(
        self,
        *,
        num_servers: int,
        phase_list: np.ndarray,
        rng: np.random.Generator,
    ) -> ActivationPolicy:
        priority = np.arange(num_servers, dtype=int)
        rng.shuffle(priority)
        return _RampActivationPolicy(self._schedule, num_servers, priority)
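The top-k-by-priority behavior can be sketched without the internal `_RampActivationPolicy` (whose source is not shown here). A minimal stand-in, assuming the active servers are simply the first *k* entries of the shuffled priority order:

```python
import numpy as np

def make_active_mask_fn(num_servers: int, seed: int = 0):
    """Draw a fixed random priority once, then answer top-k queries."""
    rng = np.random.default_rng(seed)
    priority = np.arange(num_servers)
    rng.shuffle(priority)  # fixed random priority, drawn once per model

    def active_mask(fraction: float) -> np.ndarray:
        k = int(round(fraction * num_servers))
        mask = np.zeros(num_servers, dtype=bool)
        mask[priority[:k]] = True  # highest-priority k servers are active
        return mask

    return active_mask

mask_fn = make_active_mask_fn(num_servers=10)
print(mask_fn(1.0).sum())  # 10 servers active at full fraction
print(mask_fn(0.2).sum())  # 2 servers active at fraction 0.2
# A fixed priority makes active sets nested: ramping down and back up
# re-activates the same servers rather than a fresh random subset.
print(np.all(mask_fn(0.2) <= mask_fn(0.5)))  # True
```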

ServerLayout dataclass

Per-model server layout describing how GPUs are organized.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `num_servers` | `int` | Number of physical servers for this model. |
| `total_gpus` | `int` | Total GPU count across all servers. |
| `gpus_per_replica` | `int` | GPUs per model replica. |
| `gpus_per_server_list` | `ndarray` | GPU count per server (last may be partial). |
| `phase_list` | `ndarray` | Phase assignment per server (0=A, 1=B, 2=C). |
| `activation_policy` | `ActivationPolicy` | Determines which servers are active at time *t*. |
| `stagger_offsets` | `ndarray` | Per-server offsets for desynchronization. In offline mode these are integer indices into a power template; in online mode they can be float time offsets into a rolling buffer. |
| `amplitude_scales` | `ndarray` | Per-server power multiplier for inter-server variation. |
| `noise_fraction` | `float` | Gaussian noise standard deviation as a fraction of per-server power. |

Source code in openg2g/datacenter/layout.py
@dataclass
class ServerLayout:
    """Per-model server layout describing how GPUs are organized.

    Attributes:
        num_servers: Number of physical servers for this model.
        total_gpus: Total GPU count across all servers.
        gpus_per_replica: GPUs per model replica.
        gpus_per_server_list: GPU count per server (last may be partial).
        phase_list: Phase assignment per server (0=A, 1=B, 2=C).
        activation_policy: Determines which servers are active at time *t*.
        stagger_offsets: Per-server offsets for desynchronization. In offline
            mode these are integer indices into a power template; in online
            mode they can be float time offsets into a rolling buffer.
        amplitude_scales: Per-server power multiplier for inter-server variation.
        noise_fraction: Gaussian noise standard deviation as a fraction of
            per-server power.
    """

    num_servers: int
    total_gpus: int
    gpus_per_replica: int
    gpus_per_server_list: np.ndarray
    phase_list: np.ndarray
    activation_policy: ActivationPolicy
    stagger_offsets: np.ndarray
    amplitude_scales: np.ndarray
    noise_fraction: float

    @classmethod
    def build(
        cls,
        model_spec: LLMInferenceModelSpec,
        *,
        gpus_per_server: int,
        stagger_range: int | float,
        activation_strategy: ActivationStrategy,
        amplitude_scale_range: tuple[float, float],
        noise_fraction: float,
        rng: np.random.Generator,
    ) -> ServerLayout:
        """Build a server layout for one model.

        This is a pure function of its inputs (plus RNG state). The caller
        is responsible for providing a consistently-seeded RNG so that
        layout generation is reproducible.

        Args:
            model_spec: Model specification (replicas, GPUs per replica, etc.).
            gpus_per_server: Number of GPUs per physical server rack.
            stagger_range: Upper bound for per-server stagger offsets. If `int`,
                offsets are drawn from `rng.integers(0, stagger_range)` (for
                offline template indexing). If `float`, offsets are drawn from
                `rng.uniform(0, stagger_range)` (for online time-based staggering).
            activation_strategy: Strategy for determining active servers.
            amplitude_scale_range: `(low, high)` range for per-server amplitude
                scaling. Each server draws a uniform multiplier from this range.
            noise_fraction: Gaussian noise standard deviation as a fraction
                of per-server power.
            rng: Random number generator (consumed for phase assignment,
                activation policy, stagger offsets, and amplitude scales).

        Returns:
            Frozen `ServerLayout` for the model.
        """
        num_replicas = int(model_spec.num_replicas)
        gpus_per_replica = int(model_spec.gpus_per_replica)
        total_gpus = num_replicas * gpus_per_replica
        num_servers = int(math.ceil(total_gpus / gpus_per_server))

        gpus_per_server_list = np.full(num_servers, gpus_per_server, dtype=int)
        tail = total_gpus - (num_servers - 1) * gpus_per_server
        gpus_per_server_list[-1] = int(tail) if tail > 0 else gpus_per_server

        sA, sB, sC = split_integer_evenly(num_servers, 3)
        phase_list = np.asarray(([0] * sA) + ([1] * sB) + ([2] * sC), dtype=int)
        rng.shuffle(phase_list)

        bound_policy = activation_strategy.for_model(
            num_servers=num_servers,
            phase_list=phase_list,
            rng=rng,
        )

        if isinstance(stagger_range, int):
            stagger_offsets = rng.integers(low=0, high=max(stagger_range, 1), size=num_servers)
        else:
            stagger_offsets = rng.uniform(0.0, max(float(stagger_range), 1e-9), size=num_servers)

        amplitude_scales = rng.uniform(
            float(amplitude_scale_range[0]),
            float(amplitude_scale_range[1]),
            size=num_servers,
        )

        return cls(
            num_servers=num_servers,
            total_gpus=total_gpus,
            gpus_per_replica=gpus_per_replica,
            gpus_per_server_list=gpus_per_server_list,
            phase_list=phase_list,
            activation_policy=bound_policy,
            stagger_offsets=stagger_offsets,
            amplitude_scales=amplitude_scales,
            noise_fraction=float(noise_fraction),
        )

build(model_spec, *, gpus_per_server, stagger_range, activation_strategy, amplitude_scale_range, noise_fraction, rng) classmethod

Build a server layout for one model.

This is a pure function of its inputs (plus RNG state). The caller is responsible for providing a consistently-seeded RNG so that layout generation is reproducible.

Parameters:

- model_spec (LLMInferenceModelSpec, required): Model specification (replicas, GPUs per replica, etc.).
- gpus_per_server (int, required): Number of GPUs per physical server rack.
- stagger_range (int | float, required): Upper bound for per-server stagger offsets. If int, offsets are drawn from rng.integers(0, stagger_range) (for offline template indexing). If float, offsets are drawn from rng.uniform(0, stagger_range) (for online time-based staggering).
- activation_strategy (ActivationStrategy, required): Strategy for determining active servers.
- amplitude_scale_range (tuple[float, float], required): (low, high) range for per-server amplitude scaling. Each server draws a uniform multiplier from this range.
- noise_fraction (float, required): Gaussian noise standard deviation as a fraction of per-server power.
- rng (Generator, required): Random number generator (consumed for phase assignment, activation policy, stagger offsets, and amplitude scales).

Returns:

- ServerLayout: Frozen ServerLayout for the model.

Source code in openg2g/datacenter/layout.py
@classmethod
def build(
    cls,
    model_spec: LLMInferenceModelSpec,
    *,
    gpus_per_server: int,
    stagger_range: int | float,
    activation_strategy: ActivationStrategy,
    amplitude_scale_range: tuple[float, float],
    noise_fraction: float,
    rng: np.random.Generator,
) -> ServerLayout:
    """Build a server layout for one model.

    This is a pure function of its inputs (plus RNG state). The caller
    is responsible for providing a consistently-seeded RNG so that
    layout generation is reproducible.

    Args:
        model_spec: Model specification (replicas, GPUs per replica, etc.).
        gpus_per_server: Number of GPUs per physical server rack.
        stagger_range: Upper bound for per-server stagger offsets. If `int`,
            offsets are drawn from `rng.integers(0, stagger_range)` (for
            offline template indexing). If `float`, offsets are drawn from
            `rng.uniform(0, stagger_range)` (for online time-based staggering).
        activation_strategy: Strategy for determining active servers.
        amplitude_scale_range: `(low, high)` range for per-server amplitude
            scaling. Each server draws a uniform multiplier from this range.
        noise_fraction: Gaussian noise standard deviation as a fraction
            of per-server power.
        rng: Random number generator (consumed for phase assignment,
            activation policy, stagger offsets, and amplitude scales).

    Returns:
        Frozen `ServerLayout` for the model.
    """
    num_replicas = int(model_spec.num_replicas)
    gpus_per_replica = int(model_spec.gpus_per_replica)
    total_gpus = num_replicas * gpus_per_replica
    num_servers = int(math.ceil(total_gpus / gpus_per_server))

    gpus_per_server_list = np.full(num_servers, gpus_per_server, dtype=int)
    tail = total_gpus - (num_servers - 1) * gpus_per_server
    gpus_per_server_list[-1] = int(tail) if tail > 0 else gpus_per_server

    sA, sB, sC = split_integer_evenly(num_servers, 3)
    phase_list = np.asarray(([0] * sA) + ([1] * sB) + ([2] * sC), dtype=int)
    rng.shuffle(phase_list)

    bound_policy = activation_strategy.for_model(
        num_servers=num_servers,
        phase_list=phase_list,
        rng=rng,
    )

    if isinstance(stagger_range, int):
        stagger_offsets = rng.integers(low=0, high=max(stagger_range, 1), size=num_servers)
    else:
        stagger_offsets = rng.uniform(0.0, max(float(stagger_range), 1e-9), size=num_servers)

    amplitude_scales = rng.uniform(
        float(amplitude_scale_range[0]),
        float(amplitude_scale_range[1]),
        size=num_servers,
    )

    return cls(
        num_servers=num_servers,
        total_gpus=total_gpus,
        gpus_per_replica=gpus_per_replica,
        gpus_per_server_list=gpus_per_server_list,
        phase_list=phase_list,
        activation_policy=bound_policy,
        stagger_offsets=stagger_offsets,
        amplitude_scales=amplitude_scales,
        noise_fraction=float(noise_fraction),
    )
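The server-count and tail-GPU arithmetic in `build` can be worked through with concrete (illustrative) numbers: when the total GPU count is not a multiple of `gpus_per_server`, the last server holds the remainder.

```python
import math
import numpy as np

# Illustrative values: 3 replicas x 7 GPUs per replica on 8-GPU servers.
total_gpus = 3 * 7            # 21 GPUs
gpus_per_server = 8

num_servers = math.ceil(total_gpus / gpus_per_server)          # ceil(21/8) = 3
gpus_per_server_list = np.full(num_servers, gpus_per_server, dtype=int)
tail = total_gpus - (num_servers - 1) * gpus_per_server        # 21 - 16 = 5
gpus_per_server_list[-1] = tail                                # last server is partial

print(num_servers, gpus_per_server_list.tolist())  # 3 [8, 8, 5]
```

When `total_gpus` divides evenly, `tail` equals `gpus_per_server` and no server is partial.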

AugmentedPower dataclass

Result of power augmentation for one simulation timestep.

Attributes:

- power_w (ThreePhase): Three-phase total power (watts), including base load.
- power_by_model_w (dict[str, float]): Per-model total active power (watts).
- active_replicas_by_model (dict[str, int]): Per-model active replica count.

Source code in openg2g/datacenter/layout.py
@dataclass(frozen=True)
class AugmentedPower:
    """Result of power augmentation for one simulation timestep.

    Attributes:
        power_w: Three-phase total power (watts), including base load.
        power_by_model_w: Per-model total active power (watts).
        active_replicas_by_model: Per-model active replica count.
    """

    power_w: ThreePhase
    power_by_model_w: dict[str, float] = field(default_factory=dict)
    active_replicas_by_model: dict[str, int] = field(default_factory=dict)

PowerAugmenter

Scales per-GPU power through server layouts to three-phase power.

Given per-GPU power values for each server (one value per server per model), applies per-server scaling, noise, activation masking, and phase summation to produce datacenter-level three-phase power.

This class is backend-agnostic. The offline datacenter feeds it template-indexed values; the online datacenter can feed it live-measured values.

Parameters:

- layouts (dict[str, ServerLayout], required): Per-model server layouts.
- base_w_per_phase (float, default 0.0): Constant base load per phase (watts).
- seed (int, default 0): Random seed for noise RNG.
Source code in openg2g/datacenter/layout.py
class PowerAugmenter:
    """Scales per-GPU power through server layouts to three-phase power.

    Given per-GPU power values for each server (one value per server per
    model), applies per-server scaling, noise, activation masking, and
    phase summation to produce datacenter-level three-phase power.

    This class is backend-agnostic. The offline datacenter feeds it
    template-indexed values; the online datacenter can feed it
    live-measured values.

    Args:
        layouts: Per-model server layouts.
        base_w_per_phase: Constant base load per phase (watts).
        seed: Random seed for noise RNG.
    """

    def __init__(
        self,
        layouts: dict[str, ServerLayout],
        base_w_per_phase: float = 0.0,
        seed: int = 0,
    ) -> None:
        self._layouts = layouts
        self._base_w_per_phase = float(base_w_per_phase)
        self._seed = int(seed)
        self._rng = np.random.default_rng(self._seed)

    def step(
        self,
        per_gpu_by_model: dict[str, np.ndarray],
        t: float,
    ) -> AugmentedPower:
        """Augment per-server per-GPU power to three-phase power.

        Args:
            per_gpu_by_model: Mapping of model label to per-GPU power
                array of shape `(num_servers,)`. Only models with active
                replicas should be included.
            t: Current simulation time (seconds), passed to activation
                policies.

        Returns:
            `AugmentedPower` with three-phase power, per-model power,
            and per-model active replica counts.
        """
        phase_power = np.full(3, self._base_w_per_phase, dtype=float)
        power_by_model: dict[str, float] = {}
        active_replicas_by_model: dict[str, int] = {}

        for label, per_gpu in per_gpu_by_model.items():
            layout = self._layouts[label]

            server_powers = per_gpu * layout.gpus_per_server_list * layout.amplitude_scales
            if layout.noise_fraction > 0:
                levels = np.maximum(server_powers, 1.0)
                server_powers = (
                    server_powers + self._rng.normal(0.0, 1.0, size=layout.num_servers) * layout.noise_fraction * levels
                )
            server_powers = np.maximum(server_powers, 0.0)

            active_indices = layout.activation_policy.active_indices(t)
            active_powers = server_powers[active_indices]
            active_phases = layout.phase_list[active_indices]

            model_phase_power = np.zeros(3, dtype=float)
            np.add.at(model_phase_power, active_phases, active_powers)
            phase_power += model_phase_power

            power_by_model[label] = float(np.sum(active_powers))
            active_gpus = int(np.sum(layout.gpus_per_server_list[active_indices]))
            active_replicas_by_model[label] = active_gpus // layout.gpus_per_replica

        return AugmentedPower(
            power_w=ThreePhase(
                a=float(phase_power[0]),
                b=float(phase_power[1]),
                c=float(phase_power[2]),
            ),
            power_by_model_w=power_by_model,
            active_replicas_by_model=active_replicas_by_model,
        )

    def reset(self) -> None:
        """Re-seed the noise RNG to its initial state."""
        self._rng = np.random.default_rng(self._seed)

step(per_gpu_by_model, t)

Augment per-server per-GPU power to three-phase power.

Parameters:

- per_gpu_by_model (dict[str, ndarray], required): Mapping of model label to per-GPU power array of shape (num_servers,). Only models with active replicas should be included.
- t (float, required): Current simulation time (seconds), passed to activation policies.

Returns:

- AugmentedPower: AugmentedPower with three-phase power, per-model power, and per-model active replica counts.

Source code in openg2g/datacenter/layout.py
def step(
    self,
    per_gpu_by_model: dict[str, np.ndarray],
    t: float,
) -> AugmentedPower:
    """Augment per-server per-GPU power to three-phase power.

    Args:
        per_gpu_by_model: Mapping of model label to per-GPU power
            array of shape `(num_servers,)`. Only models with active
            replicas should be included.
        t: Current simulation time (seconds), passed to activation
            policies.

    Returns:
        `AugmentedPower` with three-phase power, per-model power,
        and per-model active replica counts.
    """
    phase_power = np.full(3, self._base_w_per_phase, dtype=float)
    power_by_model: dict[str, float] = {}
    active_replicas_by_model: dict[str, int] = {}

    for label, per_gpu in per_gpu_by_model.items():
        layout = self._layouts[label]

        server_powers = per_gpu * layout.gpus_per_server_list * layout.amplitude_scales
        if layout.noise_fraction > 0:
            levels = np.maximum(server_powers, 1.0)
            server_powers = (
                server_powers + self._rng.normal(0.0, 1.0, size=layout.num_servers) * layout.noise_fraction * levels
            )
        server_powers = np.maximum(server_powers, 0.0)

        active_indices = layout.activation_policy.active_indices(t)
        active_powers = server_powers[active_indices]
        active_phases = layout.phase_list[active_indices]

        model_phase_power = np.zeros(3, dtype=float)
        np.add.at(model_phase_power, active_phases, active_powers)
        phase_power += model_phase_power

        power_by_model[label] = float(np.sum(active_powers))
        active_gpus = int(np.sum(layout.gpus_per_server_list[active_indices]))
        active_replicas_by_model[label] = active_gpus // layout.gpus_per_replica

    return AugmentedPower(
        power_w=ThreePhase(
            a=float(phase_power[0]),
            b=float(phase_power[1]),
            c=float(phase_power[2]),
        ),
        power_by_model_w=power_by_model,
        active_replicas_by_model=active_replicas_by_model,
    )

reset()

Re-seed the noise RNG to its initial state.

Source code in openg2g/datacenter/layout.py
def reset(self) -> None:
    """Re-seed the noise RNG to its initial state."""
    self._rng = np.random.default_rng(self._seed)

openg2g.datacenter.offline

Offline (trace-based) datacenter backend.

Loads power trace CSVs and serves per-timestep OfflineDatacenterState objects via step().

OfflineDatacenterState dataclass

Bases: LLMDatacenterState

Extended state from the offline (trace-based) backend.

Source code in openg2g/datacenter/offline.py
@dataclass(frozen=True)
class OfflineDatacenterState(LLMDatacenterState):
    """Extended state from the offline (trace-based) backend."""

    power_by_model_w: dict[str, float] = field(default_factory=dict)

PowerTrace dataclass

A single power trace measurement.

Attributes:

- t_s (ndarray): Time vector (seconds), monotonically increasing.
- power_w (ndarray): Total power vector (watts) across all measured GPUs, same length as t_s.
- measured_gpus (int): Number of GPUs used in the measurement.

Source code in openg2g/datacenter/offline.py
@dataclass(frozen=True)
class PowerTrace:
    """A single power trace measurement.

    Attributes:
        t_s: Time vector (seconds), monotonically increasing.
        power_w: Total power vector (watts) across all measured GPUs,
            same length as `t_s`.
        measured_gpus: Number of GPUs used in the measurement.
    """

    t_s: np.ndarray
    power_w: np.ndarray
    measured_gpus: int

    def __post_init__(self) -> None:
        if len(self.t_s) != len(self.power_w):
            raise ValueError(f"t_s and power_w must have the same length, got {len(self.t_s)} and {len(self.power_w)}")
        if len(self.t_s) < 5:
            raise ValueError("Trace too short (need at least 5 samples).")
        if self.measured_gpus < 1:
            raise ValueError(f"measured_gpus must be >= 1, got {self.measured_gpus}")
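Note that `power_w` holds the *total* across all measured GPUs. A per-GPU view, which is presumably what the per-GPU template construction derives, divides by `measured_gpus` (the exact template pipeline is not shown here; values below are illustrative):

```python
import numpy as np

# Illustrative trace: 10 monotone samples, 2800 W total across 4 GPUs.
t_s = np.arange(0.0, 5.0, 0.5)
power_w = np.full(10, 2800.0)
measured_gpus = 4

# Assumed per-GPU normalization used when scaling to other server sizes.
per_gpu_w = power_w / measured_gpus
print(float(per_gpu_w[0]))  # 700.0
```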

PowerTraceStore

Manages power traces and pre-built per-GPU templates.

Indexed by (model_label, batch_size). Provides:

- load(manifest): load traces discovered via a manifest CSV
- from_traces(traces): construct from pre-built PowerTrace objects
- build_templates(...): pre-build per-GPU power templates
- template(model_label, batch_size): look up a pre-built template
- trace(model_label, batch_size): access the raw trace

Attributes:

- MANIFEST_COL_MODEL_LABEL: Column name for model label in the manifest.
- MANIFEST_COL_NUM_GPUS: Column name for measured GPU count in the manifest.
- MANIFEST_COL_BATCH_SIZE: Column name for batch size in the manifest.
- MANIFEST_COL_TRACE_FILE: Column name for trace file path in the manifest.
- TRACE_COL_TIME: Column name for time in trace CSVs.
- TRACE_COL_POWER: Column name for power in trace CSVs.

Source code in openg2g/datacenter/offline.py
class PowerTraceStore:
    """Manages power traces and pre-built per-GPU templates.

    Indexed by `(model_label, batch_size)`. Provides:
    - `load(manifest)`: load traces discovered via a manifest CSV
    - `from_traces(traces)`: construct from pre-built `PowerTrace` objects
    - `build_templates(...)`: pre-build per-GPU power templates
    - `template(model_label, batch_size)`: look up a pre-built template
    - `trace(model_label, batch_size)`: access the raw trace

    Attributes:
        MANIFEST_COL_MODEL_LABEL: Column name for model label in the manifest.
        MANIFEST_COL_NUM_GPUS: Column name for measured GPU count in the manifest.
        MANIFEST_COL_BATCH_SIZE: Column name for batch size in the manifest.
        MANIFEST_COL_TRACE_FILE: Column name for trace file path in the manifest.
        TRACE_COL_TIME: Column name for time in trace CSVs.
        TRACE_COL_POWER: Column name for power in trace CSVs.
    """

    MANIFEST_COL_MODEL_LABEL = "model_label"
    MANIFEST_COL_NUM_GPUS = "num_gpus"
    MANIFEST_COL_BATCH_SIZE = "max_num_seqs"
    MANIFEST_COL_TRACE_FILE = "trace_file"
    TRACE_COL_TIME = "relative_time_s"
    TRACE_COL_POWER = "power_total_W"

    def __init__(self, traces: dict[str, dict[int, PowerTrace]]) -> None:
        self._traces = {str(label): {int(b): tr for b, tr in per_batch.items()} for label, per_batch in traces.items()}
        self._templates: dict[tuple[str, int], np.ndarray] = {}
        self._built = False

    @classmethod
    def load(cls, manifest: Path) -> PowerTraceStore:
        """Load traces discovered via a manifest CSV.

        Trace file paths in the manifest are resolved relative to the
        manifest file's parent directory.

        Args:
            manifest: Path to the manifest CSV (e.g. `traces_summary.csv`).
                Expected columns: `model_label`, `num_gpus`, `max_num_seqs`,
                `trace_file`.
        """
        import pandas as pd

        manifest = Path(manifest)
        base_dir = manifest.parent
        df = pd.read_csv(manifest)

        required_cols = [
            cls.MANIFEST_COL_MODEL_LABEL,
            cls.MANIFEST_COL_NUM_GPUS,
            cls.MANIFEST_COL_BATCH_SIZE,
            cls.MANIFEST_COL_TRACE_FILE,
        ]
        missing = [c for c in required_cols if c not in df.columns]
        if missing:
            raise ValueError(f"Manifest {manifest} missing columns: {missing}. Got: {list(df.columns)}")

        traces: dict[str, dict[int, PowerTrace]] = {}
        for row in df.to_dict(orient="records"):
            label = str(row[cls.MANIFEST_COL_MODEL_LABEL])
            num_gpus = int(row[cls.MANIFEST_COL_NUM_GPUS])
            batch = int(row[cls.MANIFEST_COL_BATCH_SIZE])
            trace_path = base_dir / str(row[cls.MANIFEST_COL_TRACE_FILE])

            if not trace_path.exists():
                raise FileNotFoundError(f"Trace file not found: {trace_path} (model={label}, batch={batch})")

            tdf = pd.read_csv(trace_path)
            if cls.TRACE_COL_TIME not in tdf.columns or cls.TRACE_COL_POWER not in tdf.columns:
                raise ValueError(
                    f"{trace_path} must contain {cls.TRACE_COL_TIME!r} and "
                    f"{cls.TRACE_COL_POWER!r}. Got: {list(tdf.columns)}"
                )

            t = tdf[cls.TRACE_COL_TIME].to_numpy(float)
            p = tdf[cls.TRACE_COL_POWER].to_numpy(float)
            if np.any(np.diff(t) < 0):
                idx = np.argsort(t)
                t, p = t[idx], p[idx]

            traces.setdefault(label, {})[batch] = PowerTrace(
                t_s=t,
                power_w=p,
                measured_gpus=num_gpus,
            )

        return cls(traces)

    @classmethod
    def from_traces(cls, traces: dict[str, dict[int, PowerTrace]]) -> PowerTraceStore:
        """Construct from pre-built `PowerTrace` objects.

        Args:
            traces: Mapping of `model_label -> batch_size -> PowerTrace`.
        """
        return cls(traces)

    def trace(self, model_label: str, batch_size: int) -> PowerTrace:
        """Return the raw power trace for a model and batch size."""
        per_batch = self._traces.get(model_label)
        if per_batch is None:
            raise KeyError(f"Unknown model: {model_label!r}")
        tr = per_batch.get(int(batch_size))
        if tr is None:
            raise KeyError(
                f"No trace for model={model_label!r}, batch={batch_size}. "
                f"Available batch sizes: {sorted(per_batch.keys())}"
            )
        return tr

    def build_templates(
        self,
        *,
        duration_s: Fraction | float,
        timestep_s: Fraction | float,
        steady_skip_s: float = 0.0,
    ) -> None:
        """Pre-build per-GPU power templates for all traces.

        Args:
            duration_s: Total simulation duration (seconds).
            timestep_s: Simulation timestep (seconds).
            steady_skip_s: Skip this many seconds from the start of each
                trace to avoid warm-up transients.
        """
        self._templates.clear()
        for label, per_batch in self._traces.items():
            for batch, tr in per_batch.items():
                tpl = _build_per_gpu_power_template(
                    tr,
                    timestep_s=timestep_s,
                    duration_s=duration_s,
                    steady_skip_s=steady_skip_s,
                )
                self._templates[(label, batch)] = tpl
        self._built = True

    def template(self, model_label: str, batch_size: int) -> np.ndarray:
        """Return a pre-built per-GPU power template.

        Requires a prior call to `build_templates`.
        """
        if not self._built:
            raise RuntimeError("Call build_templates() first.")
        key = (str(model_label), int(batch_size))
        if key not in self._templates:
            raise KeyError(f"No template for model={model_label!r}, batch={batch_size}.")
        return self._templates[key]

    @property
    def model_labels(self) -> list[str]:
        """List of model labels in the store."""
        return list(self._traces.keys())

    def batch_sizes(self, model_label: str) -> list[int]:
        """List of batch sizes available for a model."""
        per_batch = self._traces.get(model_label)
        if per_batch is None:
            raise KeyError(f"Unknown model: {model_label!r}")
        return sorted(per_batch.keys())

model_labels property

List of model labels in the store.

load(manifest) classmethod

Load traces discovered via a manifest CSV.

Trace file paths in the manifest are resolved relative to the manifest file's parent directory.

Parameters:

- manifest (Path, required): Path to the manifest CSV (e.g. traces_summary.csv). Expected columns: model_label, num_gpus, max_num_seqs, trace_file.
Source code in openg2g/datacenter/offline.py
@classmethod
def load(cls, manifest: Path) -> PowerTraceStore:
    """Load traces discovered via a manifest CSV.

    Trace file paths in the manifest are resolved relative to the
    manifest file's parent directory.

    Args:
        manifest: Path to the manifest CSV (e.g. `traces_summary.csv`).
            Expected columns: `model_label`, `num_gpus`, `max_num_seqs`,
            `trace_file`.
    """
    import pandas as pd

    manifest = Path(manifest)
    base_dir = manifest.parent
    df = pd.read_csv(manifest)

    required_cols = [
        cls.MANIFEST_COL_MODEL_LABEL,
        cls.MANIFEST_COL_NUM_GPUS,
        cls.MANIFEST_COL_BATCH_SIZE,
        cls.MANIFEST_COL_TRACE_FILE,
    ]
    missing = [c for c in required_cols if c not in df.columns]
    if missing:
        raise ValueError(f"Manifest {manifest} missing columns: {missing}. Got: {list(df.columns)}")

    traces: dict[str, dict[int, PowerTrace]] = {}
    for row in df.to_dict(orient="records"):
        label = str(row[cls.MANIFEST_COL_MODEL_LABEL])
        num_gpus = int(row[cls.MANIFEST_COL_NUM_GPUS])
        batch = int(row[cls.MANIFEST_COL_BATCH_SIZE])
        trace_path = base_dir / str(row[cls.MANIFEST_COL_TRACE_FILE])

        if not trace_path.exists():
            raise FileNotFoundError(f"Trace file not found: {trace_path} (model={label}, batch={batch})")

        tdf = pd.read_csv(trace_path)
        if cls.TRACE_COL_TIME not in tdf.columns or cls.TRACE_COL_POWER not in tdf.columns:
            raise ValueError(
                f"{trace_path} must contain {cls.TRACE_COL_TIME!r} and "
                f"{cls.TRACE_COL_POWER!r}. Got: {list(tdf.columns)}"
            )

        t = tdf[cls.TRACE_COL_TIME].to_numpy(float)
        p = tdf[cls.TRACE_COL_POWER].to_numpy(float)
        if np.any(np.diff(t) < 0):
            idx = np.argsort(t)
            t, p = t[idx], p[idx]

        traces.setdefault(label, {})[batch] = PowerTrace(
            t_s=t,
            power_w=p,
            measured_gpus=num_gpus,
        )

    return cls(traces)
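The manifest layout expected by `load` can be sketched with the documented column names (model labels and file paths below are made up for illustration):

```python
import csv
import io

# Two manifest rows for one model at two batch sizes; column names match
# the PowerTraceStore class constants. Paths are illustrative.
manifest_csv = """model_label,num_gpus,max_num_seqs,trace_file
llama-8b,1,64,traces/llama-8b_b64.csv
llama-8b,1,128,traces/llama-8b_b128.csv
"""

rows = list(csv.DictReader(io.StringIO(manifest_csv)))
print(len(rows), rows[0]["model_label"])  # 2 llama-8b
```

Each referenced trace file is itself a CSV with `relative_time_s` and `power_total_W` columns, resolved relative to the manifest's directory.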

from_traces(traces) classmethod

Construct from pre-built PowerTrace objects.

Parameters:

- traces (dict[str, dict[int, PowerTrace]], required): Mapping of model_label -> batch_size -> PowerTrace.
Source code in openg2g/datacenter/offline.py
@classmethod
def from_traces(cls, traces: dict[str, dict[int, PowerTrace]]) -> PowerTraceStore:
    """Construct from pre-built `PowerTrace` objects.

    Args:
        traces: Mapping of `model_label -> batch_size -> PowerTrace`.
    """
    return cls(traces)

trace(model_label, batch_size)

Return the raw power trace for a model and batch size.

Source code in openg2g/datacenter/offline.py
def trace(self, model_label: str, batch_size: int) -> PowerTrace:
    """Return the raw power trace for a model and batch size."""
    per_batch = self._traces.get(model_label)
    if per_batch is None:
        raise KeyError(f"Unknown model: {model_label!r}")
    tr = per_batch.get(int(batch_size))
    if tr is None:
        raise KeyError(
            f"No trace for model={model_label!r}, batch={batch_size}. "
            f"Available batch sizes: {sorted(per_batch.keys())}"
        )
    return tr

build_templates(*, duration_s, timestep_s, steady_skip_s=0.0)

Pre-build per-GPU power templates for all traces.

Parameters:

- duration_s (Fraction | float, required): Total simulation duration (seconds).
- timestep_s (Fraction | float, required): Simulation timestep (seconds).
- steady_skip_s (float, default 0.0): Skip this many seconds from the start of each trace to avoid warm-up transients.
Source code in openg2g/datacenter/offline.py
def build_templates(
    self,
    *,
    duration_s: Fraction | float,
    timestep_s: Fraction | float,
    steady_skip_s: float = 0.0,
) -> None:
    """Pre-build per-GPU power templates for all traces.

    Args:
        duration_s: Total simulation duration (seconds).
        timestep_s: Simulation timestep (seconds).
        steady_skip_s: Skip this many seconds from the start of each
            trace to avoid warm-up transients.
    """
    self._templates.clear()
    for label, per_batch in self._traces.items():
        for batch, tr in per_batch.items():
            tpl = _build_per_gpu_power_template(
                tr,
                timestep_s=timestep_s,
                duration_s=duration_s,
                steady_skip_s=steady_skip_s,
            )
            self._templates[(label, batch)] = tpl
    self._built = True

template(model_label, batch_size)

Return a pre-built per-GPU power template.

Requires a prior call to build_templates.

Source code in openg2g/datacenter/offline.py
def template(self, model_label: str, batch_size: int) -> np.ndarray:
    """Return a pre-built per-GPU power template.

    Requires a prior call to `build_templates`.
    """
    if not self._built:
        raise RuntimeError("Call build_templates() first.")
    key = (str(model_label), int(batch_size))
    if key not in self._templates:
        raise KeyError(f"No template for model={model_label!r}, batch={batch_size}.")
    return self._templates[key]

batch_sizes(model_label)

List of batch sizes available for a model.

Source code in openg2g/datacenter/offline.py
def batch_sizes(self, model_label: str) -> list[int]:
    """List of batch sizes available for a model."""
    per_batch = self._traces.get(model_label)
    if per_batch is None:
        raise KeyError(f"Unknown model: {model_label!r}")
    return sorted(per_batch.keys())

OfflineDatacenter

Bases: LLMBatchSizeControlledDatacenter[OfflineDatacenterState]

Trace-based datacenter simulation with step-by-step interface.

Each step() call computes one timestep of power output by indexing into pre-built per-GPU templates, applying per-server amplitude scaling and noise, and summing across active servers per phase.

Batch size changes via apply_control() take effect on the next step() call.

Parameters:

- trace_store (PowerTraceStore, required): PowerTraceStore with templates for all (model, batch) pairs.
- models (list[LLMInferenceModelSpec], required): List of model specs describing the served models.
- timestep_s (Fraction, required): Simulation timestep (seconds).
- gpus_per_server (int, default 8): Number of GPUs per physical server rack.
- seed (int, default 0): Random seed for layout generation and noise.
- amplitude_scale_range (tuple[float, float], default (1.0, 1.0)): (low, high) range for per-server amplitude scaling. Each server draws a uniform multiplier from this range.
- noise_fraction (float, default 0.0): Gaussian noise standard deviation as a fraction of per-server power.
- activation_strategy (ActivationStrategy | None, default None): Controls which servers are active at each timestep. Subclass ActivationStrategy for custom strategies (e.g., phase-aware load balancing). If None, all servers are always active.
- base_kw_per_phase (float, default 0.0): Constant base load per phase (kW).
- training_overlays (list[tuple[TrainingOverlayCache, TrainingRun]] | None, default None): List of (TrainingOverlayCache, TrainingRun) pairs for training workloads.
- itl_distributions (dict[str, dict[int, ITLMixtureModel]] | None, default None): Optional per-model ITL mixture distributions: model_label -> batch_size -> ITLMixtureModel.
- latency_exact_threshold (int, default 30): Exact-sampling threshold for latency averaging.
- latency_seed (int | None, default None): Optional seed for latency RNG. Defaults to seed + 54321.
Source code in openg2g/datacenter/offline.py
class OfflineDatacenter(LLMBatchSizeControlledDatacenter[OfflineDatacenterState]):
    """Trace-based datacenter simulation with step-by-step interface.

    Each `step()` call computes one timestep of power output by indexing
    into pre-built per-GPU templates, applying per-server amplitude scaling
    and noise, and summing across active servers per phase.

    Batch size changes via `apply_control()` take effect on the next
    `step()` call.

    Args:
        trace_store: `PowerTraceStore` with templates for all
            (model, batch) pairs.
        models: List of model specs describing the served models.
        timestep_s: Simulation timestep (seconds).
        gpus_per_server: Number of GPUs per physical server rack.
        seed: Random seed for layout generation and noise.
        amplitude_scale_range: `(low, high)` range for per-server amplitude
            scaling. Each server draws a uniform multiplier from this range.
        noise_fraction: Gaussian noise standard deviation as a fraction of
            per-server power.
        activation_strategy: Controls which servers are active at each
            timestep. Subclass `ActivationStrategy` for custom strategies
            (e.g., phase-aware load balancing). If `None`, all servers are
            always active.
        base_kw_per_phase: Constant base load per phase (kW).
        training_overlays: List of `(TrainingOverlayCache, TrainingRun)` pairs
            for training workloads.
        itl_distributions: Optional per-model ITL mixture distributions:
            `model_label -> batch_size -> ITLMixtureModel`.
        latency_exact_threshold: Exact-sampling threshold for latency averaging.
        latency_seed: Optional seed for latency RNG. Defaults to `seed + 54321`.
    """

    def __init__(
        self,
        *,
        trace_store: PowerTraceStore,
        models: list[LLMInferenceModelSpec],
        timestep_s: Fraction,
        gpus_per_server: int = 8,
        seed: int = 0,
        amplitude_scale_range: tuple[float, float] = (1.0, 1.0),
        noise_fraction: float = 0.0,
        activation_strategy: ActivationStrategy | None = None,
        base_kw_per_phase: float = 0.0,
        training_overlays: list[tuple[TrainingOverlayCache, TrainingRun]] | None = None,
        itl_distributions: dict[str, dict[int, ITLMixtureModel]] | None = None,
        latency_exact_threshold: int = 30,
        latency_seed: int | None = None,
    ) -> None:
        self._timestep_s = timestep_s
        self._trace_store = trace_store
        self._models = list(models)
        self._gpus_per_server = int(gpus_per_server)
        self._seed = int(seed)
        self._amplitude_scale_range = (
            float(amplitude_scale_range[0]),
            float(amplitude_scale_range[1]),
        )
        self._noise_fraction = float(noise_fraction)

        self._layout_rng = np.random.default_rng(self._seed)

        self._activation_strategy = activation_strategy
        self._base_W_per_phase = float(base_kw_per_phase) * 1e3

        self._training_overlays: list[tuple[TrainingOverlayCache, TrainingRun]] = list(training_overlays or [])
        self._itl_distributions = itl_distributions
        self._latency_exact_threshold = int(latency_exact_threshold)

        self._batch_by_model: dict[str, int] = {ms.model_label: ms.initial_batch_size for ms in models}

        self._layouts: dict[str, ServerLayout] = {}
        self._build_all_layouts()
        self._augmenter = PowerAugmenter(
            layouts=self._layouts,
            base_w_per_phase=self._base_W_per_phase,
            seed=self._seed + 12345,
        )

        self._global_step: int = 0
        self._latency_seed = int(seed) + 54321 if latency_seed is None else int(latency_seed)
        self._latency_rng = np.random.default_rng(self._latency_seed)
        self._events: EventEmitter | None = None
        self._state: OfflineDatacenterState | None = None
        self._history: list[OfflineDatacenterState] = []

        logger.info(
            "OfflineDatacenter: %d models, dt=%s s, seed=%d",
            len(models),
            timestep_s,
            seed,
        )
        for ms in models:
            logger.info(
                "  %s: %d replicas, %d GPUs/replica, batch=%d",
                ms.model_label,
                ms.num_replicas,
                ms.gpus_per_replica,
                ms.initial_batch_size,
            )

    @property
    def dt_s(self) -> Fraction:
        return self._timestep_s

    @property
    def models(self) -> list[LLMInferenceModelSpec]:
        return list(self._models)

    @property
    def batch_by_model(self) -> dict[str, int]:
        return dict(self._batch_by_model)

    @property
    def state(self) -> OfflineDatacenterState:
        if self._state is None:
            raise RuntimeError("OfflineDatacenter.state accessed before first step().")
        return self._state

    def history(self, n: int | None = None) -> list[OfflineDatacenterState]:
        if n is None:
            return list(self._history)
        if n <= 0:
            return []
        return list(self._history[-int(n) :])

    def step(self, clock: SimulationClock) -> OfflineDatacenterState:
        t_now = clock.time_s

        per_gpu_by_model: dict[str, np.ndarray] = {}
        for ms in self._models:
            label = ms.model_label
            if ms.num_replicas <= 0:
                continue
            if label not in self._batch_by_model:
                raise KeyError(f"Missing required batch size for model {label!r}")
            batch = int(self._batch_by_model[label])

            layout = self._layouts[label]
            template = self._trace_store.template(label, batch)
            L = len(template)
            indices = (self._global_step + layout.stagger_offsets) % L
            per_gpu_by_model[label] = template[indices]

        aug = self._augmenter.step(per_gpu_by_model, t_now)

        power_by_model = dict(aug.power_by_model_w)
        active_replicas_by_model = dict(aug.active_replicas_by_model)
        for ms in self._models:
            power_by_model.setdefault(ms.model_label, 0.0)
            active_replicas_by_model.setdefault(ms.model_label, 0)

        phase_power = np.array([aug.power_w.a, aug.power_w.b, aug.power_w.c])

        # Training overlay
        t_arr = np.asarray(t_now, dtype=float)
        for overlay, tr in self._training_overlays:
            training_power_w = float(
                overlay.eval_total_on_grid(
                    t_arr,
                    t_add_start=tr.t_start,
                    t_add_end=tr.t_end,
                    n_train_gpus=tr.n_gpus,
                )
            )
            phase_power += training_power_w / 3.0

        # ITL sampling
        observed_itl_s_by_model: dict[str, float] = {}
        for ms in self._models:
            label = ms.model_label
            n_rep = active_replicas_by_model.get(label, 0)
            if self._itl_distributions is None or n_rep <= 0:
                observed_itl_s_by_model[label] = float("nan")
                continue
            batch = int(self._batch_by_model[label])
            model_dists = self._itl_distributions.get(label)
            if model_dists is None:
                raise KeyError(f"No ITL distributions for model={label!r}")
            params = model_dists.get(batch)
            if params is None:
                raise KeyError(
                    f"No ITL distributions for model={label!r}, batch={batch}. Available={sorted(model_dists.keys())}"
                )
            observed_itl_s_by_model[label] = params.sample_avg(
                n_replicas=n_rep,
                rng=self._latency_rng,
                exact_threshold=self._latency_exact_threshold,
            )

        state = OfflineDatacenterState(
            time_s=float(t_now),
            power_w=ThreePhase(
                a=float(phase_power[0]),
                b=float(phase_power[1]),
                c=float(phase_power[2]),
            ),
            power_by_model_w=power_by_model,
            active_replicas_by_model=active_replicas_by_model,
            batch_size_by_model=dict(self._batch_by_model),
            observed_itl_s_by_model=observed_itl_s_by_model,
        )
        self._global_step += 1
        self._state = state
        self._history.append(state)
        return state

    @functools.singledispatchmethod
    def apply_control(self, command: DatacenterCommand) -> None:
        """Apply a control command. Dispatches on command type."""
        raise TypeError(f"OfflineDatacenter does not support {type(command).__name__}")

    @apply_control.register
    def _(self, command: SetBatchSize) -> None:
        """Record new batch sizes. Changes take effect on the next step."""
        if command.ramp_up_rate_by_model:
            raise ValueError(
                f"OfflineDatacenter does not support ramp_up_rate_by_model (got {command.ramp_up_rate_by_model}). "
                f"Batch size changes are always immediate in trace-based simulation."
            )
        for label, b in command.batch_size_by_model.items():
            b_int = int(b)
            if b_int <= 0:
                raise ValueError(f"Batch size must be positive for model {label!r}, got {b_int}.")
            old = self._batch_by_model.get(str(label))
            self._batch_by_model[str(label)] = b_int
            if old != b_int:
                logger.info("Batch size %s: %s -> %d", label, old, b_int)
        if self._events is not None:
            self._events.emit(
                "datacenter.batch_size.updated",
                {"batch_size_by_model": dict(self._batch_by_model)},
            )

    def reset(self) -> None:
        self._state = None
        self._history = []
        self._global_step = 0
        self._batch_by_model = {ms.model_label: ms.initial_batch_size for ms in self._models}
        self._layout_rng = np.random.default_rng(self._seed)
        self._layouts = {}
        self._build_all_layouts()
        self._augmenter = PowerAugmenter(
            layouts=self._layouts,
            base_w_per_phase=self._base_W_per_phase,
            seed=self._seed + 12345,
        )
        self._latency_rng = np.random.default_rng(self._latency_seed)

    def bind_event_emitter(self, emitter: EventEmitter) -> None:
        self._events = emitter

    @classmethod
    def from_config(
        cls,
        datacenter: DatacenterConfig,
        workload: WorkloadConfig,
        *,
        trace_store: PowerTraceStore,
        timestep_s: Fraction,
        seed: int = 0,
        amplitude_scale_range: tuple[float, float] = (1.0, 1.0),
        noise_fraction: float = 0.0,
        itl_distributions: dict[str, dict[int, ITLMixtureModel]] | None = None,
        latency_seed: int | None = None,
        latency_exact_threshold: int = 30,
    ) -> OfflineDatacenter:
        """Create an OfflineDatacenter from config objects.

        If `workload.server_ramps` is set, it is wrapped in a
        `RampActivationStrategy`. For custom activation strategies,
        use the direct constructor with `activation_strategy=`.

        Args:
            datacenter: Facility configuration (GPUs per server, base load).
            workload: Workload configuration (inference models, training, ramps).
            trace_store: Power trace store with templates for all (model, batch).
            timestep_s: Simulation timestep (seconds).
            seed: Random seed for layout generation and noise.
            amplitude_scale_range: `(low, high)` for per-server amplitude scaling.
            noise_fraction: Noise std as fraction of per-server power.
            itl_distributions: Optional per-model ITL mixture distributions.
            latency_seed: Optional seed for latency RNG.
            latency_exact_threshold: Exact-sampling threshold for latency averaging.
        """
        inference = workload.inference
        models = list(inference.models)

        training_runs = list(workload.training)

        activation_strategy: ActivationStrategy | None = None
        if workload.server_ramps:
            activation_strategy = RampActivationStrategy(workload.server_ramps)

        training_overlays: list[tuple[TrainingOverlayCache, TrainingRun]] = []
        for tr in training_runs:
            overlay = TrainingOverlayCache(
                tr.trace,
                target_peak_W_per_gpu=tr.target_peak_W_per_gpu,
            )
            training_overlays.append((overlay, tr))

        return cls(
            trace_store=trace_store,
            models=models,
            timestep_s=timestep_s,
            gpus_per_server=datacenter.gpus_per_server,
            seed=seed,
            amplitude_scale_range=amplitude_scale_range,
            noise_fraction=noise_fraction,
            activation_strategy=activation_strategy,
            base_kw_per_phase=datacenter.base_kw_per_phase,
            training_overlays=training_overlays,
            itl_distributions=itl_distributions,
            latency_exact_threshold=latency_exact_threshold,
            latency_seed=latency_seed,
        )

    def _build_all_layouts(self) -> None:
        """Eagerly build layouts for all models with replicas > 0."""
        default_strategy = RampActivationStrategy(ServerRampSchedule(entries=()))
        strategy = self._activation_strategy or default_strategy
        for ms in self._models:
            if ms.num_replicas > 0:
                any_batch = self.batch_sizes(ms.model_label)[0]
                tpl_len = len(self._trace_store.template(ms.model_label, any_batch))
                self._layouts[ms.model_label] = ServerLayout.build(
                    ms,
                    gpus_per_server=self._gpus_per_server,
                    stagger_range=tpl_len,
                    activation_strategy=strategy,
                    amplitude_scale_range=self._amplitude_scale_range,
                    noise_fraction=self._noise_fraction,
                    rng=self._layout_rng,
                )

    def batch_sizes(self, model_label: str) -> list[int]:
        """Delegate to the trace store for available batch sizes."""
        return self._trace_store.batch_sizes(model_label)

    @property
    def phase_share_by_model(self) -> dict[str, np.ndarray]:
        """Per-model phase share vectors derived from server placement.

        Returns:
            Mapping of model label to a 3-element array `[frac_A, frac_B, frac_C]`
            representing the fraction of servers on each phase.
        """
        shares: dict[str, np.ndarray] = {}
        for label, layout in self._layouts.items():
            counts = np.bincount(layout.phase_list, minlength=3).astype(float)
            total = counts.sum()
            if total > 0:
                shares[label] = counts / total
            else:
                shares[label] = np.array([1 / 3, 1 / 3, 1 / 3], dtype=float)
        return shares

phase_share_by_model property

Per-model phase share vectors derived from server placement.

Returns:

Type Description
dict[str, ndarray]

Mapping of model label to a 3-element array [frac_A, frac_B, frac_C]

dict[str, ndarray]

representing the fraction of servers on each phase.
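The share computation reduces to a normalized bincount over per-server phase indices, with a uniform fallback when no servers are placed. A minimal standalone sketch of the same logic:

```python
import numpy as np

def phase_share(phase_list: np.ndarray) -> np.ndarray:
    """Fraction of servers on phases A/B/C (indices 0/1/2)."""
    counts = np.bincount(phase_list, minlength=3).astype(float)
    total = counts.sum()
    if total == 0:
        return np.full(3, 1 / 3)  # no servers: assume a balanced split
    return counts / total

# 50% of servers on phase A, 25% each on B and C.
print(phase_share(np.array([0, 0, 1, 2])))
```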

apply_control(command)

Apply a control command. Dispatches on command type.

Source code in openg2g/datacenter/offline.py
@functools.singledispatchmethod
def apply_control(self, command: DatacenterCommand) -> None:
    """Apply a control command. Dispatches on command type."""
    raise TypeError(f"OfflineDatacenter does not support {type(command).__name__}")

from_config(datacenter, workload, *, trace_store, timestep_s, seed=0, amplitude_scale_range=(1.0, 1.0), noise_fraction=0.0, itl_distributions=None, latency_seed=None, latency_exact_threshold=30) classmethod

Create an OfflineDatacenter from config objects.

If workload.server_ramps is set, it is wrapped in a RampActivationStrategy. For custom activation strategies, use the direct constructor with activation_strategy=.

Parameters:

Name Type Description Default
datacenter DatacenterConfig

Facility configuration (GPUs per server, base load).

required
workload WorkloadConfig

Workload configuration (inference models, training, ramps).

required
trace_store PowerTraceStore

Power trace store with templates for all (model, batch).

required
timestep_s Fraction

Simulation timestep (seconds).

required
seed int

Random seed for layout generation and noise.

0
amplitude_scale_range tuple[float, float]

(low, high) for per-server amplitude scaling.

(1.0, 1.0)
noise_fraction float

Noise std as fraction of per-server power.

0.0
itl_distributions dict[str, dict[int, ITLMixtureModel]] | None

Optional per-model ITL mixture distributions.

None
latency_seed int | None

Optional seed for latency RNG.

None
latency_exact_threshold int

Exact-sampling threshold for latency averaging.

30
Source code in openg2g/datacenter/offline.py
@classmethod
def from_config(
    cls,
    datacenter: DatacenterConfig,
    workload: WorkloadConfig,
    *,
    trace_store: PowerTraceStore,
    timestep_s: Fraction,
    seed: int = 0,
    amplitude_scale_range: tuple[float, float] = (1.0, 1.0),
    noise_fraction: float = 0.0,
    itl_distributions: dict[str, dict[int, ITLMixtureModel]] | None = None,
    latency_seed: int | None = None,
    latency_exact_threshold: int = 30,
) -> OfflineDatacenter:
    """Create an OfflineDatacenter from config objects.

    If `workload.server_ramps` is set, it is wrapped in a
    `RampActivationStrategy`. For custom activation strategies,
    use the direct constructor with `activation_strategy=`.

    Args:
        datacenter: Facility configuration (GPUs per server, base load).
        workload: Workload configuration (inference models, training, ramps).
        trace_store: Power trace store with templates for all (model, batch).
        timestep_s: Simulation timestep (seconds).
        seed: Random seed for layout generation and noise.
        amplitude_scale_range: `(low, high)` for per-server amplitude scaling.
        noise_fraction: Noise std as fraction of per-server power.
        itl_distributions: Optional per-model ITL mixture distributions.
        latency_seed: Optional seed for latency RNG.
        latency_exact_threshold: Exact-sampling threshold for latency averaging.
    """
    inference = workload.inference
    models = list(inference.models)

    training_runs = list(workload.training)

    activation_strategy: ActivationStrategy | None = None
    if workload.server_ramps:
        activation_strategy = RampActivationStrategy(workload.server_ramps)

    training_overlays: list[tuple[TrainingOverlayCache, TrainingRun]] = []
    for tr in training_runs:
        overlay = TrainingOverlayCache(
            tr.trace,
            target_peak_W_per_gpu=tr.target_peak_W_per_gpu,
        )
        training_overlays.append((overlay, tr))

    return cls(
        trace_store=trace_store,
        models=models,
        timestep_s=timestep_s,
        gpus_per_server=datacenter.gpus_per_server,
        seed=seed,
        amplitude_scale_range=amplitude_scale_range,
        noise_fraction=noise_fraction,
        activation_strategy=activation_strategy,
        base_kw_per_phase=datacenter.base_kw_per_phase,
        training_overlays=training_overlays,
        itl_distributions=itl_distributions,
        latency_exact_threshold=latency_exact_threshold,
        latency_seed=latency_seed,
    )

batch_sizes(model_label)

Delegate to the trace store for available batch sizes.

Source code in openg2g/datacenter/offline.py
def batch_sizes(self, model_label: str) -> list[int]:
    """Delegate to the trace store for available batch sizes."""
    return self._trace_store.batch_sizes(model_label)

start()

Acquire per-run resources (threads, solver circuits).

Called after reset(), before the simulation loop. Override for backends that need resource acquisition (e.g., OpenDSSGrid compiles its DSS circuit here). No-op by default because most offline components have no resources to acquire.

Source code in openg2g/datacenter/base.py
def start(self) -> None:
    """Acquire per-run resources (threads, solver circuits).

    Called after `reset()`, before the simulation loop. Override for
    backends that need resource acquisition (e.g., `OpenDSSGrid`
    compiles its DSS circuit here). No-op by default because most
    offline components have no resources to acquire.
    """

stop()

Release per-run resources. Simulation state is preserved.

Called after the simulation loop in LIFO order. Override for backends that acquired resources in start(). No-op by default.

Source code in openg2g/datacenter/base.py
def stop(self) -> None:
    """Release per-run resources. Simulation state is preserved.

    Called after the simulation loop in LIFO order. Override for
    backends that acquired resources in `start()`. No-op by default.
    """

openg2g.datacenter.online

Online (live GPU) datacenter backend with power augmentation.

Connects to real vLLM inference servers for load generation and ITL measurement, and to zeusd instances for live GPU power monitoring. Power readings from a small number of real GPUs are augmented to datacenter scale using the shared PowerAugmenter pipeline from openg2g.datacenter.layout.

Requires pip install zeus aiohttp.

OnlineDatacenterState dataclass

Bases: LLMDatacenterState

Extended state from the online (live GPU) backend.

The base power_w field carries the augmented three-phase power (what the grid sees). This subclass adds the measured (pre-augmentation) breakdown for post-hoc analysis.

Attributes:

Name Type Description
measured_power_w ThreePhase

Total measured three-phase power from real GPUs (before augmentation), plus base load.

measured_power_w_by_model dict[str, float]

Per-model total measured power from real GPUs (watts).

augmented_power_w_by_model dict[str, float]

Per-model augmented power (watts). This is the power fed to the grid for each model after scaling up.

augmentation_factor_by_model dict[str, float]

Per-model augmentation multiplier (virtual replicas / real replicas).

prometheus_metrics_by_model dict[str, dict[str, float]]

Per-model Prometheus metrics snapshot. Keys are model labels, values are dicts with metric names like num_requests_running, num_requests_waiting, kv_cache_usage_perc, num_preemptions_total.

Source code in openg2g/datacenter/online.py
@dataclass(frozen=True)
class OnlineDatacenterState(LLMDatacenterState):
    """Extended state from the online (live GPU) backend.

    The base `power_w` field carries the augmented three-phase power
    (what the grid sees). This subclass adds the measured (pre-augmentation)
    breakdown for post-hoc analysis.

    Attributes:
        measured_power_w: Total measured three-phase power from real GPUs
            (before augmentation), plus base load.
        measured_power_w_by_model: Per-model total measured power from real
            GPUs (watts).
        augmented_power_w_by_model: Per-model augmented power (watts). This
            is the power fed to the grid for each model after scaling up.
        augmentation_factor_by_model: Per-model augmentation multiplier
            (virtual replicas / real replicas).
        prometheus_metrics_by_model: Per-model Prometheus metrics snapshot.
            Keys are model labels, values are dicts with metric names like
            `num_requests_running`, `num_requests_waiting`,
            `kv_cache_usage_perc`, `num_preemptions_total`.
    """

    measured_power_w: ThreePhase = field(default_factory=lambda: ThreePhase(a=0.0, b=0.0, c=0.0))
    measured_power_w_by_model: dict[str, float] = field(default_factory=dict)
    augmented_power_w_by_model: dict[str, float] = field(default_factory=dict)
    augmentation_factor_by_model: dict[str, float] = field(default_factory=dict)
    prometheus_metrics_by_model: dict[str, dict[str, float]] = field(default_factory=dict)

GPUEndpointMapping dataclass

Maps a zeusd endpoint to specific GPUs on a specific electrical phase.

Parameters:

Name Type Description Default
host str

Hostname or IP of the zeusd instance.

required
port int

TCP port of the zeusd instance.

4938
gpu_indices tuple[int, ...]

GPU device indices to monitor on this endpoint.

(0,)
phase Phase

Electrical phase this endpoint's GPUs are connected to.

A
Source code in openg2g/datacenter/online.py
@dataclass(frozen=True)
class GPUEndpointMapping:
    """Maps a zeusd endpoint to specific GPUs on a specific electrical phase.

    Args:
        host: Hostname or IP of the zeusd instance.
        port: TCP port of the zeusd instance.
        gpu_indices: GPU device indices to monitor on this endpoint.
        phase: Electrical phase this endpoint's GPUs are connected to.
    """

    host: str
    port: int = 4938
    gpu_indices: tuple[int, ...] = (0,)
    phase: Phase = Phase.A

    @property
    def endpoint_key(self) -> str:
        """Return the `host:port` key used by `PowerStreamingClient`."""
        return f"{self.host}:{self.port}"

endpoint_key property

Return the host:port key used by PowerStreamingClient.

OnlineModelDeployment dataclass

Deployment of one model on physical hardware.

Pairs a reusable LLMInferenceModelSpec with physical deployment details. spec.num_replicas is the simulated (augmented) count for grid simulation. The real replica count is derived from gpu_endpoints and spec.gpus_per_replica.

Parameters:

Name Type Description Default
spec LLMInferenceModelSpec

Model specification (shared with offline datacenter).

required
vllm_base_url str

Base URL of the vLLM server (e.g. http://node1:8000).

required
model_name str

OpenAI API model name served by vLLM.

required
gpu_endpoints tuple[GPUEndpointMapping, ...]

GPU endpoint mappings for power monitoring.

()
Source code in openg2g/datacenter/online.py
@dataclass(frozen=True)
class OnlineModelDeployment:
    """Deployment of one model on physical hardware.

    Pairs a reusable `LLMInferenceModelSpec` with physical deployment
    details.  `spec.num_replicas` is the simulated (augmented) count
    for grid simulation.  The real replica count is derived from
    `gpu_endpoints` and `spec.gpus_per_replica`.

    Args:
        spec: Model specification (shared with offline datacenter).
        vllm_base_url: Base URL of the vLLM server (e.g. `http://node1:8000`).
        model_name: OpenAI API model name served by vLLM.
        gpu_endpoints: GPU endpoint mappings for power monitoring.
    """

    spec: LLMInferenceModelSpec
    vllm_base_url: str
    model_name: str
    gpu_endpoints: tuple[GPUEndpointMapping, ...] = ()

    @property
    def model_label(self) -> str:
        return self.spec.model_label

    @property
    def num_real_gpus(self) -> int:
        """Total number of real GPUs for this model across all endpoints."""
        return sum(len(ep.gpu_indices) for ep in self.gpu_endpoints)

    @property
    def num_real_replicas(self) -> int:
        """Number of real replicas (real GPUs / GPUs per replica)."""
        return self.num_real_gpus // max(self.spec.gpus_per_replica, 1)

    @property
    def augmentation_factor(self) -> float:
        """Ratio of simulated replicas to real replicas."""
        return self.spec.num_replicas / max(self.num_real_replicas, 1)

num_real_gpus property

Total number of real GPUs for this model across all endpoints.

num_real_replicas property

Number of real replicas (real GPUs / GPUs per replica).

augmentation_factor property

Ratio of simulated replicas to real replicas.
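The three properties compose arithmetically: real replicas are real GPUs floor-divided by GPUs per replica, and the augmentation factor is simulated over real replicas (clamped to avoid division by zero). A worked example with hypothetical numbers, mirroring the property definitions above:

```python
def augmentation_factor(num_real_gpus: int, gpus_per_replica: int, num_replicas: int) -> float:
    """Ratio of simulated to real replicas, as in OnlineModelDeployment."""
    num_real_replicas = num_real_gpus // max(gpus_per_replica, 1)
    return num_replicas / max(num_real_replicas, 1)

# 16 real GPUs / 4 GPUs per replica = 4 real replicas;
# 100 simulated replicas / 4 real replicas = 25x augmentation.
print(augmentation_factor(16, 4, 100))  # 25.0
```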

LoadGenerationConfig dataclass

Configuration for the request load generator.

Parameters:

Name Type Description Default
max_output_tokens int

Maximum output tokens per request.

512
concurrency_multiplier float

Number of concurrent requests per unit of batch size (N_concurrent = batch_size * this).

3.0
itl_window_s float

Seconds of ITL history to average over.

1.0
Source code in openg2g/datacenter/online.py
@dataclass(frozen=True)
class LoadGenerationConfig:
    """Configuration for the request load generator.

    Args:
        max_output_tokens: Maximum output tokens per request.
        concurrency_multiplier: Number of concurrent requests per unit
            of batch size (`N_concurrent = batch_size * this`).
        itl_window_s: Seconds of ITL history to average over.
    """

    max_output_tokens: int = 512
    concurrency_multiplier: float = 3.0
    itl_window_s: float = 1.0
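The `concurrency_multiplier` relationship (`N_concurrent = batch_size * concurrency_multiplier`) determines how many in-flight requests the load generator keeps per unit of batch size. A sketch of that sizing rule; the truncation to `int` here is an assumption for illustration, not taken from the source:

```python
def target_concurrency(batch_size: int, concurrency_multiplier: float = 3.0) -> int:
    # Keep enough concurrent requests in flight to saturate the target batch size.
    return int(batch_size * concurrency_multiplier)  # assumed rounding behavior

print(target_concurrency(32))  # 96
```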

PowerAugmentationConfig dataclass

Configuration for scaling real GPU power to datacenter level.

Parameters:

Name Type Description Default
base_kw_per_phase float

Constant base load per electrical phase (kW).

0.0
noise_fraction float

Gaussian noise standard deviation as a fraction of per-server power. Applied per-server by the shared PowerAugmenter.

0.02
stagger_buffer_s float

Seconds of power history for temporal staggering. Also used as the stagger range when building ServerLayout (float offsets drawn from [0, stagger_buffer_s)).

10.0
gpus_per_server int

Number of GPUs per virtual server for layout computation.

8
amplitude_scale_range tuple[float, float]

(low, high) range for per-server amplitude scaling. Each virtual server draws a uniform multiplier from this range.

(0.9, 1.1)
seed int

Random seed for layout generation and noise.

0
Source code in openg2g/datacenter/online.py
@dataclass(frozen=True)
class PowerAugmentationConfig:
    """Configuration for scaling real GPU power to datacenter level.

    Args:
        base_kw_per_phase: Constant base load per electrical phase (kW).
        noise_fraction: Gaussian noise standard deviation as a fraction
            of per-server power. Applied per-server by the shared
            `PowerAugmenter`.
        stagger_buffer_s: Seconds of power history for temporal staggering.
            Also used as the stagger range when building `ServerLayout`
            (float offsets drawn from `[0, stagger_buffer_s)`).
        gpus_per_server: Number of GPUs per virtual server for layout
            computation.
        amplitude_scale_range: `(low, high)` range for per-server amplitude
            scaling. Each virtual server draws a uniform multiplier from
            this range.
        seed: Random seed for layout generation and noise.
    """

    base_kw_per_phase: float = 0.0
    noise_fraction: float = 0.02
    stagger_buffer_s: float = 10.0
    gpus_per_server: int = 8
    amplitude_scale_range: tuple[float, float] = (0.9, 1.1)
    seed: int = 0
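
As a rough illustration of how these fields drive layout randomness, the sketch below (AugmentationSketch is a hypothetical stand-in, not the library class) draws stagger offsets from [0, stagger_buffer_s) and per-server amplitude multipliers from amplitude_scale_range with a seeded RNG, so the same seed reproduces the same layout:

```python
import random
from dataclasses import dataclass


@dataclass(frozen=True)
class AugmentationSketch:
    """Hypothetical stand-in mirroring PowerAugmentationConfig's layout fields."""

    stagger_buffer_s: float = 10.0
    amplitude_scale_range: tuple[float, float] = (0.9, 1.1)
    seed: int = 0

    def draw_layout(self, num_servers: int) -> tuple[list[float], list[float]]:
        rng = random.Random(self.seed)  # seeded: same layout on every run
        # Half-open [0, stagger_buffer_s) offsets, as documented above.
        offsets = [rng.random() * self.stagger_buffer_s for _ in range(num_servers)]
        lo, hi = self.amplitude_scale_range
        scales = [rng.uniform(lo, hi) for _ in range(num_servers)]
        return offsets, scales


cfg = AugmentationSketch(seed=42)
offsets, scales = cfg.draw_layout(4)
assert all(0.0 <= o < cfg.stagger_buffer_s for o in offsets)
assert all(cfg.amplitude_scale_range[0] <= s <= cfg.amplitude_scale_range[1] for s in scales)
assert cfg.draw_layout(4) == (offsets, scales)  # deterministic for a fixed seed
```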

OnlineDatacenter

Bases: LLMBatchSizeControlledDatacenter[OnlineDatacenterState]

Live GPU datacenter backend with power augmentation.

Dispatches inference load to vLLM servers, streams GPU power from zeusd, measures ITL from streaming responses, and augments power readings to datacenter scale using the shared PowerAugmenter pipeline (same as OfflineDatacenter).

Call start() before the first step() and stop() after the simulation loop finishes.

Parameters:

- deployments (Sequence[OnlineModelDeployment], required): List of model deployments with physical hardware mapping.
- power_client (PowerStreamingClient, required): Zeus PowerStreamingClient connected to zeusd instances.
- augmentation (PowerAugmentationConfig | None, default None): Power augmentation configuration.
- load_gen (LoadGenerationConfig | None, default None): Load generation configuration.
- requests_by_model (dict[str, list[dict]], required): Mapping of model_label -> list of pre-built OpenAI Chat Completion request dicts (from data/online/build_requests.py). Each dict should contain at least model, messages, and max_completion_tokens. Streaming fields are added automatically.
- dt_s (Fraction, default Fraction(1, 10)): Simulation timestep (seconds).
- activation_strategy (ActivationStrategy | None, default None): Controls which virtual servers are active at each timestep. If None, all servers are always active.
- prometheus_poll_interval_s (float, default 0.5): How often to poll vLLM /metrics (seconds). Set to 0 to disable Prometheus polling.
- health_check (bool, default True): If True, run health checks on start().

Source code in openg2g/datacenter/online.py
class OnlineDatacenter(LLMBatchSizeControlledDatacenter[OnlineDatacenterState]):
    """Live GPU datacenter backend with power augmentation.

    Dispatches inference load to vLLM servers, streams GPU power from
    zeusd, measures ITL from streaming responses, and augments power
    readings to datacenter scale using the shared `PowerAugmenter`
    pipeline (same as `OfflineDatacenter`).

    Call `start()` before the first `step()` and `stop()` after the
    simulation loop finishes.

    Args:
        deployments: List of model deployments with physical hardware mapping.
        power_client: Zeus `PowerStreamingClient` connected to zeusd instances.
        augmentation: Power augmentation configuration.
        load_gen: Load generation configuration.
        requests_by_model: Mapping of model_label -> list of pre-built
            OpenAI Chat Completion request dicts (from `data/online/build_requests.py`).
            Each dict should contain at least `model`, `messages`, and
            `max_completion_tokens`. Streaming fields are added automatically.
        dt_s: Simulation timestep (seconds).
        activation_strategy: Controls which virtual servers are active at
            each timestep. If `None`, all servers are always active.
        prometheus_poll_interval_s: How often to poll vLLM /metrics (seconds).
            Set to 0 to disable Prometheus polling.
        health_check: If True, run health checks on start().
    """

    def __init__(
        self,
        *,
        deployments: Sequence[OnlineModelDeployment],
        power_client: PowerStreamingClient,
        augmentation: PowerAugmentationConfig | None = None,
        load_gen: LoadGenerationConfig | None = None,
        requests_by_model: dict[str, list[dict]],
        dt_s: Fraction = Fraction(1, 10),
        activation_strategy: ActivationStrategy | None = None,
        prometheus_poll_interval_s: float = 0.5,
        health_check: bool = True,
    ) -> None:
        if augmentation is None:
            augmentation = PowerAugmentationConfig()
        if load_gen is None:
            load_gen = LoadGenerationConfig()
        self._dt = dt_s
        self._deployments = list(deployments)
        self._deployment_map = {d.model_label: d for d in deployments}
        self._power_client = power_client
        self._augmentation_config = augmentation
        self._load_gen_config = load_gen
        self._requests_by_model = dict(requests_by_model)
        self._health_check = health_check
        self._activation_strategy = activation_strategy

        self._prometheus = (
            _PrometheusPoller(
                deployments,
                poll_interval_s=prometheus_poll_interval_s,
            )
            if prometheus_poll_interval_s > 0
            else None
        )

        self._load_gen = _LoadGenerator(
            deployments,
            requests_by_model,
            load_gen,
            prometheus_poller=self._prometheus,
        )

        self._layout_rng = np.random.default_rng(augmentation.seed)
        self._layouts: dict[str, ServerLayout] = {}
        self._build_all_layouts()
        self._augmenter = PowerAugmenter(
            layouts=self._layouts,
            base_w_per_phase=augmentation.base_kw_per_phase * 1e3,
            seed=augmentation.seed + 12345,
        )
        self._rolling_buffer = _RollingPowerBuffer(
            [d.model_label for d in deployments],
            max_samples=max(int(augmentation.stagger_buffer_s * 100), 1000),
        )

        self._batch_by_model: dict[str, int] = {d.model_label: d.spec.initial_batch_size for d in deployments}

        self._state: OnlineDatacenterState | None = None
        self._history: list[OnlineDatacenterState] = []
        self._events: EventEmitter | None = None
        self._started = False

        logger.info(
            "OnlineDatacenter: %d deployments, dt=%s s",
            len(self._deployments),
            dt_s,
        )
        for d in deployments:
            layout = self._layouts.get(d.model_label)
            n_servers = layout.num_servers if layout else 0
            logger.info(
                "  %s: %d real GPUs, %d simulated replicas (%.0fx augmentation), %d virtual servers, vllm=%s",
                d.model_label,
                d.num_real_gpus,
                d.spec.num_replicas,
                d.augmentation_factor,
                n_servers,
                d.vllm_base_url,
            )

    def _build_all_layouts(self) -> None:
        """Build ServerLayout for each deployed model."""
        default_strategy = RampActivationStrategy(ServerRampSchedule(entries=()))
        strategy = self._activation_strategy or default_strategy
        for d in self._deployments:
            if d.spec.num_replicas > 0:
                self._layouts[d.model_label] = ServerLayout.build(
                    d.spec,
                    gpus_per_server=self._augmentation_config.gpus_per_server,
                    stagger_range=float(self._augmentation_config.stagger_buffer_s),
                    activation_strategy=strategy,
                    amplitude_scale_range=self._augmentation_config.amplitude_scale_range,
                    noise_fraction=self._augmentation_config.noise_fraction,
                    rng=self._layout_rng,
                )

    @property
    def dt_s(self) -> Fraction:
        return self._dt

    @property
    def state(self) -> OnlineDatacenterState:
        if self._state is None:
            raise RuntimeError("OnlineDatacenter.state accessed before first step().")
        return self._state

    @property
    def phase_share_by_model(self) -> dict[str, np.ndarray]:
        """Per-model phase share vectors derived from server layout."""
        shares: dict[str, np.ndarray] = {}
        for label, layout in self._layouts.items():
            counts = np.bincount(layout.phase_list, minlength=3).astype(float)
            total = counts.sum()
            if total > 0:
                shares[label] = counts / total
            else:
                shares[label] = np.array([1 / 3, 1 / 3, 1 / 3], dtype=float)
        return shares

    def history(self, n: int | None = None) -> list[OnlineDatacenterState]:
        if n is None:
            return list(self._history)
        if n <= 0:
            return []
        return list(self._history[-int(n) :])

    def reset(self) -> None:
        if self._started:
            self._load_gen.stop()
        self._load_gen = _LoadGenerator(
            self._deployments,
            self._requests_by_model,
            self._load_gen_config,
            prometheus_poller=self._prometheus,
        )
        self._layout_rng = np.random.default_rng(self._augmentation_config.seed)
        self._layouts = {}
        self._build_all_layouts()
        self._augmenter = PowerAugmenter(
            layouts=self._layouts,
            base_w_per_phase=self._augmentation_config.base_kw_per_phase * 1e3,
            seed=self._augmentation_config.seed + 12345,
        )
        self._rolling_buffer.clear()
        self._batch_by_model = {d.model_label: d.spec.initial_batch_size for d in self._deployments}
        self._state = None
        self._history = []
        self._started = False

    def start(self) -> None:
        """Start load generation, warm up servers, and fill the power buffer.

        Sequence:
            1. Run health checks on all vLLM servers and zeusd instances.
            2. Wait for at least one power reading per endpoint (10 s timeout).
            3. Set initial batch sizes on all vLLM servers.
            4. Start load generation threads.
            5. Warm up: poll power into the rolling buffer while waiting for
               each model's `num_requests_running` to reach 95% of its
               `initial_batch_size`. Fails after 60 s if any model does not
               saturate.
        """
        if self._started:
            raise RuntimeError("OnlineDatacenter already started")

        logger.info("Starting OnlineDatacenter with %d deployments", len(self._deployments))

        # 1. Health checks
        if self._health_check:
            logger.info("Running health checks...")
            for d in self._deployments:
                _check_vllm_health(d.vllm_base_url)
                _check_vllm_model(d.vllm_base_url, d.model_name)
                for ep in d.gpu_endpoints:
                    _check_zeusd_health(ep.host, ep.port)
            logger.info("All health checks passed")

        # 2. Wait for power readings from all endpoints
        all_endpoints: set[str] = set()
        for d in self._deployments:
            for ep in d.gpu_endpoints:
                all_endpoints.add(ep.endpoint_key)

        deadline = time.monotonic() + 10.0
        while time.monotonic() < deadline:
            readings = self._power_client.get_power()
            if all_endpoints.issubset(readings.keys()):
                logger.info("Power readings received from all %d endpoints", len(all_endpoints))
                break
            time.sleep(0.5)
        else:
            connected = set(self._power_client.get_power().keys())
            missing = all_endpoints - connected
            logger.warning("Timed out waiting for power readings from: %s", missing)

        # 3. Set initial batch sizes on vLLM servers
        for d in self._deployments:
            batch = d.spec.initial_batch_size
            _set_vllm_batch_size(d.vllm_base_url, batch)
            logger.info("Set initial batch size for %s: %d", d.model_label, batch)

        # 4. Start load generation (and Prometheus poller)
        self._load_gen.start()
        logger.info("LoadGenerator started")

        # 5. Warm up: fill power buffer + wait for server saturation
        self._warmup()

        self._started = True
        logger.info("OnlineDatacenter ready")

    def _poll_power_into_buffer(self) -> tuple[float, dict[str, float]]:
        """Read GPU power from all endpoints and feed the rolling buffer.

        Returns:
            Tuple of (monotonic timestamp, per-model average per-GPU watts).
        """
        now = time.monotonic()
        raw_power = self._power_client.get_power()
        per_gpu_by_model: dict[str, float] = {}
        for d in self._deployments:
            total_w = 0.0
            n_gpus = 0
            for ep in d.gpu_endpoints:
                pr = raw_power.get(ep.endpoint_key)
                if pr is None:
                    continue
                for idx in ep.gpu_indices:
                    if idx in pr.gpu_power_w:
                        total_w += pr.gpu_power_w[idx]
                        n_gpus += 1
            per_gpu_w = total_w / n_gpus if n_gpus > 0 else 0.0
            self._rolling_buffer.append(d.model_label, now, per_gpu_w)
            per_gpu_by_model[d.model_label] = per_gpu_w
        return now, per_gpu_by_model

    def _warmup(
        self,
        timeout_s: float = 60.0,
        saturation_threshold: float = 0.95,
        poll_interval_s: float = 0.1,
    ) -> None:
        """Fill the rolling power buffer and wait for vLLM server saturation.

        Actively polls GPU power to fill the rolling buffer while monitoring
        Prometheus `num_requests_running` to verify each model has reached
        `saturation_threshold` of its `initial_batch_size`.

        Completion requires both conditions for every model:
            1. `num_requests_running >= saturation_threshold * initial_batch_size`
            2. At least `stagger_buffer_s` has elapsed since that model first
               reached saturation (so the buffer contains a full stagger
               window of steady-state power data).

        Args:
            timeout_s: Maximum warmup duration in seconds.
            saturation_threshold: Fraction of `initial_batch_size` that
                `num_requests_running` must reach (0.0-1.0).
            poll_interval_s: Seconds between power polls.

        Raises:
            RuntimeError: If any model fails to saturate within `timeout_s`.
                Includes the `num_requests_running` trajectory for failed
                models.
        """
        stagger_s = self._augmentation_config.stagger_buffer_s
        logger.info(
            "Warming up: waiting for server saturation (%.0f%% of initial_batch_size) "
            "+ %.1f s buffer fill per model...",
            saturation_threshold * 100,
            stagger_s,
        )

        warmup_start = time.monotonic()
        deadline = warmup_start + timeout_s
        last_log = warmup_start

        trajectory: dict[str, list[tuple[float, float]]] = {d.model_label: [] for d in self._deployments}
        saturation_time: dict[str, float | None] = {d.model_label: None for d in self._deployments}

        while time.monotonic() < deadline:
            now = time.monotonic()
            elapsed = now - warmup_start

            self._poll_power_into_buffer()

            all_ready = True
            if self._prometheus is not None:
                prom = self._prometheus.get_latest()
                for d in self._deployments:
                    label = d.model_label
                    running = prom.get(label, {}).get("num_requests_running", 0.0)
                    trajectory[label].append((elapsed, running))
                    target = d.spec.initial_batch_size * saturation_threshold

                    if running >= target and saturation_time[label] is None:
                        saturation_time[label] = now
                        logger.info(
                            "  %s saturated at t=%.1f s (num_requests_running=%.0f)",
                            label,
                            elapsed,
                            running,
                        )

                    sat_t = saturation_time[label]
                    if sat_t is None or (now - sat_t) < stagger_s:
                        all_ready = False
            else:
                logger.warning(
                    "Prometheus polling is disabled; cannot verify server saturation. "
                    "Waiting %.1f s for power buffer only.",
                    stagger_s,
                )
                if elapsed < stagger_s:
                    all_ready = False

            if all_ready:
                logger.info("Warmup complete in %.1f s", elapsed)
                return

            if now - last_log >= 10.0:
                last_log = now
                if self._prometheus is not None:
                    prom = self._prometheus.get_latest()
                    for d in self._deployments:
                        label = d.model_label
                        running = prom.get(label, {}).get("num_requests_running", 0.0)
                        target = d.spec.initial_batch_size
                        sat_t = saturation_time[label]
                        buf_s = (now - sat_t) if sat_t is not None else 0.0
                        logger.info(
                            "  Warmup %s: num_requests_running=%.0f / %d (%.0f%%), buffer=%.1f / %.1f s",
                            label,
                            running,
                            target,
                            running / max(target, 1) * 100,
                            buf_s,
                            stagger_s,
                        )

            time.sleep(poll_interval_s)

        if self._prometheus is None:
            raise RuntimeError(
                f"Warmup timed out after {timeout_s:.0f} s waiting for power buffer to fill ({stagger_s:.1f} s)"
            )

        prom = self._prometheus.get_latest()
        failed: list[str] = []
        for d in self._deployments:
            label = d.model_label
            running = prom.get(label, {}).get("num_requests_running", 0.0)
            sat_t = saturation_time[label]
            not_saturated = running < d.spec.initial_batch_size * saturation_threshold
            not_buffered = sat_t is None or (time.monotonic() - sat_t) < stagger_s
            if not_saturated or not_buffered:
                failed.append(label)

        parts = [
            f"Warmup timed out after {timeout_s:.0f} s. "
            f"Models that failed to reach {saturation_threshold:.0%} of initial_batch_size:",
        ]
        for label in failed:
            target = self._deployment_map[label].spec.initial_batch_size
            traj = trajectory[label]
            final = traj[-1][1] if traj else 0.0
            parts.append(f"  {label} (target: {target}, reached: {final:.0f}):")
            step = max(1, int(5.0 / poll_interval_s))
            samples = traj[::step]
            if traj and (not samples or samples[-1] is not traj[-1]):
                samples.append(traj[-1])
            entries = [f"t={t:.0f}s: {r:.0f}" for t, r in samples]
            parts.append("    " + ", ".join(entries))
        raise RuntimeError("\n".join(parts))

    def stop(self) -> None:
        """Stop load generation and power streaming."""
        self._load_gen.stop()
        self._power_client.stop()
        self._started = False
        logger.info("OnlineDatacenter stopped")

    def step(self, clock: SimulationClock) -> OnlineDatacenterState:
        """Read live power, augment to datacenter scale, and return state."""
        now, per_gpu_w_by_model = self._poll_power_into_buffer()

        measured_power_by_model: dict[str, float] = {}
        augmentation_factor_by_model: dict[str, float] = {}
        for d in self._deployments:
            label = d.model_label
            measured_power_by_model[label] = per_gpu_w_by_model.get(label, 0.0) * d.num_real_gpus
            augmentation_factor_by_model[label] = d.augmentation_factor

        per_gpu_by_model: dict[str, np.ndarray] = {}
        for d in self._deployments:
            label = d.model_label
            if label not in self._layouts:
                continue
            layout = self._layouts[label]
            per_gpu_by_model[label] = self._rolling_buffer.sample_servers(label, now, layout.stagger_offsets)

        aug = self._augmenter.step(per_gpu_by_model, clock.time_s)

        measured_total = sum(measured_power_by_model.values())
        measured_per_phase = measured_total / 3.0
        base_w = self._augmentation_config.base_kw_per_phase * 1e3

        observed_itl: dict[str, float] = {
            d.model_label: self._load_gen.get_observed_itl(d.model_label) for d in self._deployments
        }

        prometheus_metrics: dict[str, dict[str, float]] = {}
        if self._prometheus is not None:
            prometheus_metrics = self._prometheus.get_latest()

        state = OnlineDatacenterState(
            time_s=clock.time_s,
            power_w=aug.power_w,
            batch_size_by_model=dict(self._batch_by_model),
            active_replicas_by_model=aug.active_replicas_by_model,
            observed_itl_s_by_model=observed_itl,
            measured_power_w=ThreePhase(
                a=measured_per_phase + base_w,
                b=measured_per_phase + base_w,
                c=measured_per_phase + base_w,
            ),
            measured_power_w_by_model=measured_power_by_model,
            augmented_power_w_by_model=aug.power_by_model_w,
            augmentation_factor_by_model=augmentation_factor_by_model,
            prometheus_metrics_by_model=prometheus_metrics,
        )
        self._state = state
        self._history.append(state)
        return state

    @functools.singledispatchmethod
    def apply_control(self, command: DatacenterCommand) -> None:
        """Apply a control command. Dispatches on command type."""
        raise TypeError(f"OnlineDatacenter does not support {type(command).__name__}")

    @apply_control.register
    def _(self, command: SetBatchSize) -> None:
        """Apply batch size command by sending HTTP requests to vLLM servers."""
        for label, b in command.batch_size_by_model.items():
            label = str(label)
            b_int = int(b)
            if b_int <= 0:
                raise ValueError(f"Batch size must be positive for model {label!r}, got {b_int}.")
            old = self._batch_by_model.get(label)
            self._batch_by_model[label] = b_int
            if old != b_int:
                dep = self._deployment_map.get(label)
                if dep is not None:
                    _set_vllm_batch_size(
                        dep.vllm_base_url, b_int, ramp_up_rate=command.ramp_up_rate_by_model.get(label, 0.0)
                    )
                    self._load_gen.set_batch_size(label, b_int)
                logger.info("Batch size %s: %s -> %d", label, old, b_int)

        if self._events is not None:
            self._events.emit(
                "datacenter.batch_size.updated",
                {"batch_size_by_model": dict(self._batch_by_model)},
            )

    def bind_event_emitter(self, emitter: EventEmitter) -> None:
        self._events = emitter

phase_share_by_model property

Per-model phase share vectors derived from server layout.
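
The property's bincount-over-phases logic amounts to the following stdlib sketch (phase_share is a hypothetical helper name; the real property returns numpy arrays per model label):

```python
from collections import Counter


def phase_share(phase_list: list[int]) -> list[float]:
    """Fraction of virtual servers assigned to each of the 3 electrical phases."""
    counts = Counter(phase_list)
    total = sum(counts.values())
    if total == 0:
        # No servers for this model: fall back to a balanced split.
        return [1 / 3, 1 / 3, 1 / 3]
    return [counts.get(phase, 0) / total for phase in range(3)]


assert phase_share([0, 0, 1, 2]) == [0.5, 0.25, 0.25]
assert abs(sum(phase_share([])) - 1.0) < 1e-12  # balanced fallback sums to 1
```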

start()

Start load generation, warm up servers, and fill the power buffer.

Sequence:
  1. Run health checks on all vLLM servers and zeusd instances.
  2. Wait for at least one power reading per endpoint (10 s timeout).
  3. Set initial batch sizes on all vLLM servers.
  4. Start load generation threads.
  5. Warm up: poll power into the rolling buffer while waiting for each model's num_requests_running to reach 95% of its initial_batch_size. Fails after 60 s if any model does not saturate.
Source code in openg2g/datacenter/online.py
def start(self) -> None:
    """Start load generation, warm up servers, and fill the power buffer.

    Sequence:
        1. Run health checks on all vLLM servers and zeusd instances.
        2. Wait for at least one power reading per endpoint (10 s timeout).
        3. Set initial batch sizes on all vLLM servers.
        4. Start load generation threads.
        5. Warm up: poll power into the rolling buffer while waiting for
           each model's `num_requests_running` to reach 95% of its
           `initial_batch_size`. Fails after 60 s if any model does not
           saturate.
    """
    if self._started:
        raise RuntimeError("OnlineDatacenter already started")

    logger.info("Starting OnlineDatacenter with %d deployments", len(self._deployments))

    # 1. Health checks
    if self._health_check:
        logger.info("Running health checks...")
        for d in self._deployments:
            _check_vllm_health(d.vllm_base_url)
            _check_vllm_model(d.vllm_base_url, d.model_name)
            for ep in d.gpu_endpoints:
                _check_zeusd_health(ep.host, ep.port)
        logger.info("All health checks passed")

    # 2. Wait for power readings from all endpoints
    all_endpoints: set[str] = set()
    for d in self._deployments:
        for ep in d.gpu_endpoints:
            all_endpoints.add(ep.endpoint_key)

    deadline = time.monotonic() + 10.0
    while time.monotonic() < deadline:
        readings = self._power_client.get_power()
        if all_endpoints.issubset(readings.keys()):
            logger.info("Power readings received from all %d endpoints", len(all_endpoints))
            break
        time.sleep(0.5)
    else:
        connected = set(self._power_client.get_power().keys())
        missing = all_endpoints - connected
        logger.warning("Timed out waiting for power readings from: %s", missing)

    # 3. Set initial batch sizes on vLLM servers
    for d in self._deployments:
        batch = d.spec.initial_batch_size
        _set_vllm_batch_size(d.vllm_base_url, batch)
        logger.info("Set initial batch size for %s: %d", d.model_label, batch)

    # 4. Start load generation (and Prometheus poller)
    self._load_gen.start()
    logger.info("LoadGenerator started")

    # 5. Warm up: fill power buffer + wait for server saturation
    self._warmup()

    self._started = True
    logger.info("OnlineDatacenter ready")

stop()

Stop load generation and power streaming.

Source code in openg2g/datacenter/online.py
def stop(self) -> None:
    """Stop load generation and power streaming."""
    self._load_gen.stop()
    self._power_client.stop()
    self._started = False
    logger.info("OnlineDatacenter stopped")
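
Taken together with start() and step(), the intended lifecycle is: start, run the step loop, then stop in a finally block so load-generation and streaming threads are torn down even if a step raises. A minimal sketch against a hypothetical stand-in backend (DummyBackend is illustrative, not the library class):

```python
from fractions import Fraction


class DummyBackend:
    """Hypothetical stand-in honoring the start()/step()/stop() contract."""

    dt_s = Fraction(1, 10)

    def __init__(self) -> None:
        self._started = False
        self.steps = 0

    def start(self) -> None:
        if self._started:
            raise RuntimeError("already started")
        self._started = True

    def step(self, time_s: float) -> float:
        if not self._started:
            raise RuntimeError("step() called before start()")
        self.steps += 1
        return 0.0  # a real backend returns the datacenter state here

    def stop(self) -> None:
        self._started = False


dc = DummyBackend()
dc.start()
try:
    t = Fraction(0)
    for _ in range(5):
        dc.step(float(t))
        t += dc.dt_s
finally:
    dc.stop()  # always runs, even if a step raises
assert dc.steps == 5 and not dc._started
```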

step(clock)

Read live power, augment to datacenter scale, and return state.

Source code in openg2g/datacenter/online.py
def step(self, clock: SimulationClock) -> OnlineDatacenterState:
    """Read live power, augment to datacenter scale, and return state."""
    now, per_gpu_w_by_model = self._poll_power_into_buffer()

    measured_power_by_model: dict[str, float] = {}
    augmentation_factor_by_model: dict[str, float] = {}
    for d in self._deployments:
        label = d.model_label
        measured_power_by_model[label] = per_gpu_w_by_model.get(label, 0.0) * d.num_real_gpus
        augmentation_factor_by_model[label] = d.augmentation_factor

    per_gpu_by_model: dict[str, np.ndarray] = {}
    for d in self._deployments:
        label = d.model_label
        if label not in self._layouts:
            continue
        layout = self._layouts[label]
        per_gpu_by_model[label] = self._rolling_buffer.sample_servers(label, now, layout.stagger_offsets)

    aug = self._augmenter.step(per_gpu_by_model, clock.time_s)

    measured_total = sum(measured_power_by_model.values())
    measured_per_phase = measured_total / 3.0
    base_w = self._augmentation_config.base_kw_per_phase * 1e3

    observed_itl: dict[str, float] = {
        d.model_label: self._load_gen.get_observed_itl(d.model_label) for d in self._deployments
    }

    prometheus_metrics: dict[str, dict[str, float]] = {}
    if self._prometheus is not None:
        prometheus_metrics = self._prometheus.get_latest()

    state = OnlineDatacenterState(
        time_s=clock.time_s,
        power_w=aug.power_w,
        batch_size_by_model=dict(self._batch_by_model),
        active_replicas_by_model=aug.active_replicas_by_model,
        observed_itl_s_by_model=observed_itl,
        measured_power_w=ThreePhase(
            a=measured_per_phase + base_w,
            b=measured_per_phase + base_w,
            c=measured_per_phase + base_w,
        ),
        measured_power_w_by_model=measured_power_by_model,
        augmented_power_w_by_model=aug.power_by_model_w,
        augmentation_factor_by_model=augmentation_factor_by_model,
        prometheus_metrics_by_model=prometheus_metrics,
    )
    self._state = state
    self._history.append(state)
    return state
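
Note how measured (non-augmented) power is reported: the real GPU total is split evenly across the three phases and the configured base load (given in kW per phase) is added to each. In isolation, with a hypothetical helper name and illustrative model labels:

```python
def measured_three_phase(
    measured_w_by_model: dict[str, float], base_kw_per_phase: float
) -> tuple[float, float, float]:
    """Split total measured GPU watts evenly across phases a/b/c, plus base load."""
    per_phase = sum(measured_w_by_model.values()) / 3.0
    base_w = base_kw_per_phase * 1e3  # config is in kW per phase
    return (per_phase + base_w, per_phase + base_w, per_phase + base_w)


# 900 W of GPU power -> 300 W per phase, plus 1 kW base load per phase.
assert measured_three_phase({"model-a": 600.0, "model-b": 300.0}, 1.0) == (1300.0, 1300.0, 1300.0)
```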

apply_control(command)

Apply a control command. Dispatches on command type.

Source code in openg2g/datacenter/online.py
@functools.singledispatchmethod
def apply_control(self, command: DatacenterCommand) -> None:
    """Apply a control command. Dispatches on command type."""
    raise TypeError(f"OnlineDatacenter does not support {type(command).__name__}")
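
Dispatch is implemented with functools.singledispatchmethod: one handler is registered per command dataclass, and the base overload rejects anything unregistered. A self-contained sketch of the pattern (Command, SetBatch, and Backend are illustrative stand-ins, not the library types):

```python
import functools
from dataclasses import dataclass, field


@dataclass
class Command:
    pass


@dataclass
class SetBatch(Command):
    batch_size_by_model: dict[str, int] = field(default_factory=dict)


class Backend:
    def __init__(self) -> None:
        self.batch: dict[str, int] = {}

    @functools.singledispatchmethod
    def apply_control(self, command: Command) -> None:
        # Base overload: any unregistered command type is rejected.
        raise TypeError(f"unsupported command: {type(command).__name__}")

    @apply_control.register
    def _(self, command: SetBatch) -> None:
        # Dispatch selected this handler from the annotation on `command`.
        for label, b in command.batch_size_by_model.items():
            if b <= 0:
                raise ValueError(f"batch size must be positive, got {b}")
            self.batch[label] = int(b)


backend = Backend()
backend.apply_control(SetBatch({"model-a": 16}))
assert backend.batch == {"model-a": 16}
try:
    backend.apply_control(Command())
except TypeError:
    pass  # base overload raised as expected
```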

openg2g.datacenter.training_overlay

Training workload overlay: typed trace data and periodic overlay evaluation.

TrainingTrace dataclass

A single-GPU training power trace.

Attributes:

- t_s (ndarray): Time vector (seconds), monotonically increasing.
- power_w (ndarray): Power vector (watts) for one GPU, same length as t_s.

Source code in openg2g/datacenter/training_overlay.py
@dataclass(frozen=True)
class TrainingTrace:
    """A single-GPU training power trace.

    Attributes:
        t_s: Time vector (seconds), monotonically increasing.
        power_w: Power vector (watts) for one GPU, same length as `t_s`.
    """

    COL_TIME = "t_s"
    COL_POWER = "power_W"

    t_s: np.ndarray
    power_w: np.ndarray

    def __post_init__(self) -> None:
        if len(self.t_s) != len(self.power_w):
            raise ValueError(f"t_s and power_w must have the same length, got {len(self.t_s)} and {len(self.power_w)}")
        if len(self.t_s) < 2:
            raise ValueError("Training trace must have >= 2 samples.")

    @classmethod
    def load(cls, csv_path: Path) -> TrainingTrace:
        """Load a training trace from CSV.

        Args:
            csv_path: Path to CSV with columns `t_s` and `power_W`.
        """
        csv_path = Path(csv_path)
        df = pd.read_csv(csv_path)
        if cls.COL_TIME not in df.columns or cls.COL_POWER not in df.columns:
            raise ValueError(
                f"{csv_path} must have columns {cls.COL_TIME!r} and {cls.COL_POWER!r}. Got {list(df.columns)}"
            )

        t = df[cls.COL_TIME].to_numpy(float)
        p = np.clip(df[cls.COL_POWER].to_numpy(float), 0.0, None)

        if np.any(np.diff(t) < 0):
            idx = np.argsort(t)
            t, p = t[idx], p[idx]

        return cls(t_s=t, power_w=p)

load(csv_path) classmethod

Load a training trace from CSV.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `csv_path` | `Path` | Path to CSV with columns `t_s` and `power_W`. | required |
Source code in openg2g/datacenter/training_overlay.py
@classmethod
def load(cls, csv_path: Path) -> TrainingTrace:
    """Load a training trace from CSV.

    Args:
        csv_path: Path to CSV with columns `t_s` and `power_W`.
    """
    csv_path = Path(csv_path)
    df = pd.read_csv(csv_path)
    if cls.COL_TIME not in df.columns or cls.COL_POWER not in df.columns:
        raise ValueError(
            f"{csv_path} must have columns {cls.COL_TIME!r} and {cls.COL_POWER!r}. Got {list(df.columns)}"
        )

    t = df[cls.COL_TIME].to_numpy(float)
    p = np.clip(df[cls.COL_POWER].to_numpy(float), 0.0, None)

    if np.any(np.diff(t) < 0):
        idx = np.argsort(t)
        t, p = t[idx], p[idx]

    return cls(t_s=t, power_w=p)
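The CSV contract enforced by `load` can be exercised without the library: negative power readings are clipped to zero and out-of-order timestamps are sorted. A round-trip sketch of that logic on an in-memory CSV:

```python
import io

import numpy as np
import pandas as pd

# Deliberately unsorted rows with one negative power reading.
csv_text = "t_s,power_W\n2.0,300.0\n0.0,-5.0\n1.0,250.0\n"
df = pd.read_csv(io.StringIO(csv_text))

t = df["t_s"].to_numpy(float)
# Clip negative readings to zero, as load() does.
p = np.clip(df["power_W"].to_numpy(float), 0.0, None)

# Sort by time if the trace is not monotonically increasing.
if np.any(np.diff(t) < 0):
    idx = np.argsort(t)
    t, p = t[idx], p[idx]

print(t.tolist())  # [0.0, 1.0, 2.0]
print(p.tolist())  # [0.0, 250.0, 300.0]
```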

TrainingOverlayCache

Rescales a training trace and provides periodic overlay evaluation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `trace` | `TrainingTrace` | Single-GPU training power trace. | required |
| `target_peak_W_per_gpu` | `float` | The trace is rescaled so its peak equals this value. | required |
Source code in openg2g/datacenter/training_overlay.py
class TrainingOverlayCache:
    """Rescales a training trace and provides periodic overlay evaluation.

    Args:
        trace: Single-GPU training power trace.
        target_peak_W_per_gpu: The trace is rescaled so its peak equals this
            value.
    """

    def __init__(self, trace: TrainingTrace, *, target_peak_W_per_gpu: float) -> None:
        training_time = np.asarray(trace.t_s, float)
        raw_power = np.asarray(trace.power_w, float)

        training_time = training_time - training_time[0]
        period = float(training_time[-1] - training_time[0])
        if period <= 0:
            raise ValueError("Training trace time span must be positive.")

        peak = float(np.max(raw_power))
        if peak <= 0:
            raise ValueError("Training trace has non-positive peak; cannot scale.")

        per_gpu_power = raw_power * (float(target_peak_W_per_gpu) / peak)

        self.training_time = training_time
        self.per_gpu_power = per_gpu_power
        self.period = period
        self.target_peak_W_per_gpu = float(target_peak_W_per_gpu)
        self.peak_1gpu_after_scaling = float(np.max(per_gpu_power))

    def eval_total_on_grid(
        self,
        t_global_s: np.ndarray,
        *,
        t_add_start: float,
        t_add_end: float,
        n_train_gpus: int,
    ) -> np.ndarray:
        """Evaluate total training power overlay on a time grid.

        Args:
            t_global_s: Global simulation time stamps (seconds).
            t_add_start: Global time when training becomes active.
            t_add_end: Global time when training stops.
            n_train_gpus: Number of GPUs running the training workload.

        Returns:
            Total training power (W) at each time step.
        """
        t = np.asarray(t_global_s, float)
        overlay_total = np.zeros_like(t, dtype=float)
        mask = (t >= float(t_add_start)) & (t <= float(t_add_end))
        if not np.any(mask):
            return overlay_total

        t_local = t[mask] - float(t_add_start)
        t_mod = np.mod(t_local, self.period)
        p_local_1gpu = np.interp(
            t_mod,
            self.training_time,
            self.per_gpu_power,
            left=float(self.per_gpu_power[0]),
            right=float(self.per_gpu_power[-1]),
        )
        overlay_total[mask] = p_local_1gpu * int(n_train_gpus)
        return overlay_total

eval_total_on_grid(t_global_s, *, t_add_start, t_add_end, n_train_gpus)

Evaluate total training power overlay on a time grid.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `t_global_s` | `ndarray` | Global simulation time stamps (seconds). | required |
| `t_add_start` | `float` | Global time when training becomes active. | required |
| `t_add_end` | `float` | Global time when training stops. | required |
| `n_train_gpus` | `int` | Number of GPUs running the training workload. | required |

Returns:

| Type | Description |
| --- | --- |
| `ndarray` | Total training power (W) at each time step. |

Source code in openg2g/datacenter/training_overlay.py
def eval_total_on_grid(
    self,
    t_global_s: np.ndarray,
    *,
    t_add_start: float,
    t_add_end: float,
    n_train_gpus: int,
) -> np.ndarray:
    """Evaluate total training power overlay on a time grid.

    Args:
        t_global_s: Global simulation time stamps (seconds).
        t_add_start: Global time when training becomes active.
        t_add_end: Global time when training stops.
        n_train_gpus: Number of GPUs running the training workload.

    Returns:
        Total training power (W) at each time step.
    """
    t = np.asarray(t_global_s, float)
    overlay_total = np.zeros_like(t, dtype=float)
    mask = (t >= float(t_add_start)) & (t <= float(t_add_end))
    if not np.any(mask):
        return overlay_total

    t_local = t[mask] - float(t_add_start)
    t_mod = np.mod(t_local, self.period)
    p_local_1gpu = np.interp(
        t_mod,
        self.training_time,
        self.per_gpu_power,
        left=float(self.per_gpu_power[0]),
        right=float(self.per_gpu_power[-1]),
    )
    overlay_total[mask] = p_local_1gpu * int(n_train_gpus)
    return overlay_total
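Putting the two pieces together: the constructor rescales the trace so its peak equals `target_peak_W_per_gpu`, and `eval_total_on_grid` tiles it periodically over the active window `[t_add_start, t_add_end]` and multiplies by the GPU count. A standalone NumPy sketch of that pipeline with a made-up two-sample trace:

```python
import numpy as np

# Single-GPU trace: ramps 100 W -> 400 W over 10 s (made-up numbers).
t_trace = np.array([0.0, 10.0])
p_trace = np.array([100.0, 400.0])

# Rescale so the peak equals the target per-GPU peak (here 700 W).
target_peak = 700.0
p_scaled = p_trace * (target_peak / p_trace.max())  # now peaks at 700 W
period = t_trace[-1] - t_trace[0]

# Evaluate on a global grid; training active on [10 s, 30 s], 8 GPUs.
t_grid = np.array([0.0, 5.0, 15.0, 25.0, 40.0])
overlay = np.zeros_like(t_grid)
mask = (t_grid >= 10.0) & (t_grid <= 30.0)

# Wrap local time into the trace period, interpolate, scale by GPU count.
t_mod = np.mod(t_grid[mask] - 10.0, period)
overlay[mask] = np.interp(t_mod, t_trace, p_scaled) * 8
print(overlay.tolist())  # [0.0, 0.0, 3500.0, 3500.0, 0.0]
```

At t = 15 s and t = 25 s the local trace time wraps to 5 s, where the rescaled trace interpolates to 437.5 W per GPU, giving 3500 W for 8 GPUs; outside the window the overlay is zero.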