Skip to content

openg2g.datacenter

openg2g.datacenter.base

Abstract base class for datacenter backends and base state types.

DatacenterState dataclass

State emitted by a datacenter backend each timestep.

Contains only universally applicable fields. LLM-inference-specific fields (batch sizes, replicas, latency) live on child classes like LLMDatacenterState.

Attributes:

Name Type Description
time_s float

Simulation time in seconds.

power_w ThreePhase

Three-phase power in watts.

Source code in openg2g/datacenter/base.py
@dataclass(frozen=True)
class DatacenterState:
    """Immutable per-timestep snapshot produced by a datacenter backend.

    Only fields that apply to every backend live here. Workload-specific
    fields for LLM inference (batch sizes, replicas, latency) belong on
    child classes such as [`LLMDatacenterState`][..LLMDatacenterState].

    Attributes:
        time_s: Simulation time in seconds.
        power_w: Three-phase power in watts.
    """

    time_s: float
    power_w: ThreePhase

LLMDatacenterState dataclass

Bases: DatacenterState

State from a datacenter serving LLM workloads.

Extends DatacenterState with per-model batch size, replica count, and observed inter-token latency fields used by LLM controllers.

Attributes:

Name Type Description
batch_size_by_model dict[str, int]

Current batch size per model label.

active_replicas_by_model dict[str, int]

Number of active replicas per model.

observed_itl_s_by_model dict[str, float]

Observed average inter-token latency (seconds) per model. NaN if unavailable.

Source code in openg2g/datacenter/base.py
@dataclass(frozen=True)
class LLMDatacenterState(DatacenterState):
    """Snapshot from a datacenter serving LLM inference workloads.

    Adds the per-model serving signals that LLM controllers consume on
    top of [`DatacenterState`][..DatacenterState]: batch size, replica
    count, and observed inter-token latency.

    Attributes:
        batch_size_by_model: Current batch size per model label.
        active_replicas_by_model: Number of active replicas per model.
        observed_itl_s_by_model: Observed average inter-token latency
            (seconds) per model. `NaN` if unavailable.
    """

    batch_size_by_model: dict[str, int] = field(default_factory=dict)
    active_replicas_by_model: dict[str, int] = field(default_factory=dict)
    observed_itl_s_by_model: dict[str, float] = field(default_factory=dict)

DatacenterBackend

Bases: Generic[DCStateT], ABC

Interface for datacenter power simulation backends.

Source code in openg2g/datacenter/base.py
class DatacenterBackend(Generic[DCStateT], ABC):
    """Interface for datacenter power simulation backends."""

    # Sentinel recorded by __init__; _check_base_init verifies it so
    # subclasses that skip super().__init__() fail fast with a clear error.
    _INIT_SENTINEL = object()

    def __init__(self, *, name: str) -> None:
        self._name = name
        self._state: DCStateT | None = None
        self._history: list[DCStateT] = []
        self._dc_base_init = DatacenterBackend._INIT_SENTINEL

    @property
    def name(self) -> str:
        """Human-readable name for logging and display."""
        return self._name

    def _check_base_init(self) -> None:
        # getattr with a default: the marker attribute is missing entirely
        # when super().__init__() was never called.
        if getattr(self, "_dc_base_init", None) is not DatacenterBackend._INIT_SENTINEL:
            raise TypeError(f"{type(self).__name__}.__init__ must call super().__init__()")

    @property
    @abstractmethod
    def dt_s(self) -> Fraction:
        """Native timestep as a Fraction (seconds)."""

    @final
    @property
    def state(self) -> DCStateT:
        """Latest emitted state.

        Raises:
            RuntimeError: If accessed before the first `step()` call.
        """
        self._check_base_init()
        if self._state is None:
            raise RuntimeError(f"{type(self).__name__}.state accessed before first step().")
        return self._state

    @final
    def history(self, n: int | None = None) -> list[DCStateT]:
        """Return emitted state history (all, or latest `n`).

        Args:
            n: Number of most recent states to return. `None` returns
                the full history; `n <= 0` returns an empty list.
        """
        self._check_base_init()
        if n is None:
            return list(self._history)
        if n <= 0:
            return []
        # Slicing already yields a fresh list; no extra copy needed.
        return self._history[-n:]

    @final
    def do_step(self, clock: SimulationClock, events: EventEmitter) -> DCStateT:
        """Call `step`, record the state, and return it.

        Called by the coordinator. Subclasses should not override this.
        """
        self._check_base_init()
        state = self.step(clock, events)
        self._state = state
        self._history.append(state)
        return state

    @abstractmethod
    def step(self, clock: SimulationClock, events: EventEmitter) -> DCStateT:
        """Advance one native timestep. Return state for this step."""

    @abstractmethod
    def apply_control(self, command: DatacenterCommand, events: EventEmitter) -> None:
        """Apply one command. Takes effect on next step() call."""

    @final
    def do_reset(self) -> None:
        """Clear history and call `reset`.

        Called by the coordinator. Subclasses should not override this.
        """
        self._check_base_init()
        self._state = None
        self._history.clear()
        self.reset()

    @abstractmethod
    def reset(self) -> None:
        """Reset simulation state to initial conditions.

        Called by the coordinator (via `do_reset`) before each
        [`start`][..start]. Must clear all simulation state: counters,
        RNG seeds, cached values. Configuration (dt_s, models,
        templates) is not affected. History is cleared automatically
        by `do_reset`.

        Abstract so every implementation explicitly enumerates its state.
        A forgotten field is a bug -- not clearing it silently corrupts
        the second run.
        """

    def start(self) -> None:
        """Acquire per-run resources (threads, solver circuits).

        Called after [`reset`][..reset], before the simulation loop.
        Override for backends that need resource acquisition (e.g.,
        [`OpenDSSGrid`][openg2g.grid.opendss.OpenDSSGrid] compiles its
        DSS circuit here). No-op by default because most offline
        components have no resources to acquire.
        """

    def stop(self) -> None:
        """Release per-run resources. Simulation state is preserved.

        Called after the simulation loop in LIFO order. Override for
        backends that acquired resources in [`start`][..start]. No-op
        by default.
        """

name property

Human-readable name for logging and display.

dt_s abstractmethod property

Native timestep as a Fraction (seconds).

state property

Latest emitted state.

Raises:

Type Description
RuntimeError

If accessed before the first step() call.

history(n=None)

Return emitted state history (all, or latest n).

Source code in openg2g/datacenter/base.py
@final
def history(self, n: int | None = None) -> list[DCStateT]:
    """Return emitted state history (all, or latest `n`)."""
    self._check_base_init()
    if n is None:
        # Defensive copy so callers cannot mutate internal history.
        return list(self._history)
    if n <= 0:
        return []
    return list(self._history[-int(n) :])

do_step(clock, events)

Call step, record the state, and return it.

Called by the coordinator. Subclasses should not override this.

Source code in openg2g/datacenter/base.py
@final
def do_step(self, clock: SimulationClock, events: EventEmitter) -> DCStateT:
    """Call `step`, record the state, and return it.

    Called by the coordinator. Subclasses should not override this.
    """
    self._check_base_init()
    state = self.step(clock, events)
    # Record as the latest state and append to history before returning.
    self._state = state
    self._history.append(state)
    return state

step(clock, events) abstractmethod

Advance one native timestep. Return state for this step.

Source code in openg2g/datacenter/base.py
@abstractmethod
def step(self, clock: SimulationClock, events: EventEmitter) -> DCStateT:
    """Advance one native timestep. Return state for this step.

    Args:
        clock: Simulation clock for the current step.
        events: Emitter for events raised during the step.
    """

apply_control(command, events) abstractmethod

Apply one command. Takes effect on next step() call.

Source code in openg2g/datacenter/base.py
@abstractmethod
def apply_control(self, command: DatacenterCommand, events: EventEmitter) -> None:
    """Apply one command. Takes effect on next step() call.

    Args:
        command: Datacenter command to apply.
        events: Emitter for events raised while applying the command.
    """

do_reset()

Clear history and call reset.

Called by the coordinator. Subclasses should not override this.

Source code in openg2g/datacenter/base.py
@final
def do_reset(self) -> None:
    """Clear history and call `reset`.

    Called by the coordinator. Subclasses should not override this.
    """
    self._check_base_init()
    # Drop cached state and history before delegating to reset().
    self._state = None
    self._history.clear()
    self.reset()

reset() abstractmethod

Reset simulation state to initial conditions.

Called by the coordinator (via do_reset) before each start. Must clear all simulation state: counters, RNG seeds, cached values. Configuration (dt_s, models, templates) is not affected. History is cleared automatically by do_reset.

Abstract so every implementation explicitly enumerates its state. A forgotten field is a bug -- not clearing it silently corrupts the second run.

Source code in openg2g/datacenter/base.py
@abstractmethod
def reset(self) -> None:
    """Reset simulation state to initial conditions.

    Called by the coordinator (via `do_reset`) before each
    [`start`][..start]. Must clear all simulation state: counters,
    RNG seeds, cached values. Configuration (dt_s, models,
    templates) is not affected; history is cleared automatically
    by `do_reset`, so implementations need not touch it.

    Abstract so every implementation explicitly enumerates its state.
    A forgotten field is a bug -- not clearing it silently corrupts
    the second run.
    """

start()

Acquire per-run resources (threads, solver circuits).

Called after reset, before the simulation loop. Override for backends that need resource acquisition (e.g., OpenDSSGrid compiles its DSS circuit here). No-op by default because most offline components have no resources to acquire.

Source code in openg2g/datacenter/base.py
def start(self) -> None:
    """Acquire per-run resources (threads, solver circuits).

    Called after [`reset`][..reset] and before the simulation loop.
    Override for backends that need resource acquisition (e.g.,
    [`OpenDSSGrid`][openg2g.grid.opendss.OpenDSSGrid] compiles its
    DSS circuit here). No-op by default because most offline
    components have no resources to acquire.
    """

stop()

Release per-run resources. Simulation state is preserved.

Called after the simulation loop in LIFO order. Override for backends that acquired resources in start. No-op by default.

Source code in openg2g/datacenter/base.py
def stop(self) -> None:
    """Release per-run resources. Simulation state is preserved.

    Called after the simulation loop, in LIFO order. Override for
    backends that acquired resources in [`start`][..start]. No-op
    by default.
    """

LLMBatchSizeControlledDatacenter

Bases: DatacenterBackend[DCStateT]

Datacenter that serves LLM workloads and supports batch-size control.

Marker layer between DatacenterBackend and concrete implementations. Controllers that issue SetBatchSize commands or read active_replicas_by_model / observed_itl_s_by_model from state should bind their generic to this class.

Source code in openg2g/datacenter/base.py
class LLMBatchSizeControlledDatacenter(DatacenterBackend[DCStateT]):
    """LLM-serving datacenter that accepts batch-size control.

    Marker layer sitting between [`DatacenterBackend`][..DatacenterBackend]
    and concrete implementations. Bind a controller's generic to this class
    when it issues [`SetBatchSize`][openg2g.datacenter.command.SetBatchSize]
    commands or reads `active_replicas_by_model` /
    `observed_itl_s_by_model` from state.
    """

    @property
    def phase_share_by_model(self) -> dict[str, np.ndarray]:
        """Per-model phase share vectors `[frac_A, frac_B, frac_C]`.

        Empty by default; consumers interpret a missing key as the
        uniform split `[1/3, 1/3, 1/3]`. Subclasses that know the actual
        server-to-phase placement should override this.
        """
        return {}

phase_share_by_model property

Per-model phase share vectors [frac_A, frac_B, frac_C].

Returns an empty dict by default. Consumers treat missing keys as uniform [1/3, 1/3, 1/3]. Override in subclasses that know actual server-to-phase placement.

openg2g.datacenter.command

Command types targeting datacenter backends.

DatacenterCommand

Base for commands targeting the datacenter backend.

Subclass this for each concrete datacenter command kind. The coordinator applies commands via command.target.apply_control().

Attributes:

Name Type Description
target DatacenterBackend

Datacenter backend this command targets.

Source code in openg2g/datacenter/command.py
class DatacenterCommand:
    """Base for commands aimed at a datacenter backend.

    Each concrete command kind subclasses this; the coordinator delivers
    a command through `command.target.apply_control()`.

    Attributes:
        target: Datacenter backend this command targets.
    """

    target: DatacenterBackend

    def __init__(self) -> None:
        # The base class is abstract by convention: only subclasses
        # may be instantiated.
        if type(self) is not DatacenterCommand:
            return
        raise TypeError("DatacenterCommand cannot be instantiated directly; subclass it.")

SetBatchSize dataclass

Bases: DatacenterCommand

Set batch sizes for one or more models.

Attributes:

Name Type Description
batch_size_by_model dict[str, int]

Mapping of model label to target batch size.

ramp_up_rate_by_model dict[str, float]

Per-model requests/second ramp-up rate. Models not present get immediate changes (rate 0).

target DatacenterBackend

Datacenter backend this command targets.

Source code in openg2g/datacenter/command.py
@dataclass(frozen=True)
class SetBatchSize(DatacenterCommand):
    """Set batch sizes for one or more models.

    Attributes:
        batch_size_by_model: Mapping of model label to target batch size.
        ramp_up_rate_by_model: Per-model requests/second ramp-up rate.
            Models absent from this mapping change immediately (rate 0).
        target: Datacenter backend this command targets.
    """

    batch_size_by_model: dict[str, int]
    ramp_up_rate_by_model: dict[str, float] = field(default_factory=dict)
    target: DatacenterBackend = field(kw_only=True)

ShiftReplicas dataclass

Bases: DatacenterCommand

Shift replicas for a model at this datacenter.

Positive replica_delta adds replicas (receiving site); negative removes them (sending site).

Attributes:

Name Type Description
model_label str

Which model to shift.

replica_delta int

Number of replicas to add (>0) or remove (<0).

target DatacenterBackend

Datacenter backend this command targets.

Source code in openg2g/datacenter/command.py
@dataclass(frozen=True)
class ShiftReplicas(DatacenterCommand):
    """Shift replicas for a model at this datacenter.

    A positive `replica_delta` adds replicas (the receiving site); a
    negative one removes them (the sending site).

    Attributes:
        model_label: Which model to shift.
        replica_delta: Number of replicas to add (>0) or remove (<0).
        target: Datacenter backend this command targets.
    """

    model_label: str
    replica_delta: int
    target: DatacenterBackend = field(kw_only=True)

openg2g.datacenter.config

Datacenter facility and workload configuration.

InferenceModelSpec

Bases: BaseModel

Specification for one LLM model served in the datacenter.

This is a pure model-identity object describing what is served, not how many or at what batch size. Deployment-specific parameters (replica count, initial batch size) are specified via ModelDeployment.

Attributes:

Name Type Description
model_label str

Human-readable model identifier (e.g. "Llama-3.1-70B").

model_id str

HuggingFace model ID (e.g. "meta-llama/Llama-3.1-70B-Instruct"). Used for benchmark data lookups and online API model fields.

gpus_per_replica int

GPUs allocated to each replica (determines model parallelism and per-replica power draw).

itl_deadline_s float

Per-model inter-token latency deadline for the OFO latency dual (seconds).

feasible_batch_sizes tuple[int, ...]

Allowed batch sizes. Used by the OFO controller for discretizing continuous batch-size updates and by the online datacenter for load-generator sizing.

Source code in openg2g/datacenter/config.py
class InferenceModelSpec(BaseModel):
    """Specification of one LLM model served in the datacenter.

    A pure model-identity object: it describes *what* is served, never
    *how many* replicas or *at what* batch size. Deployment-specific
    parameters (replica count, initial batch size) come in via
    [`ModelDeployment`][openg2g.datacenter.config.ModelDeployment].

    Attributes:
        model_label: Human-readable model identifier (e.g. `"Llama-3.1-70B"`).
        model_id: HuggingFace model ID (e.g. `"meta-llama/Llama-3.1-70B-Instruct"`).
            Used for benchmark data lookups and online API model fields.
        gpus_per_replica: GPUs allocated to each replica (determines model
            parallelism and per-replica power draw).
        itl_deadline_s: Per-model inter-token latency deadline for the OFO
            latency dual (seconds).
        feasible_batch_sizes: Allowed batch sizes. Used by the OFO
            controller for discretizing continuous batch-size updates
            and by the online datacenter for load-generator sizing.
    """

    model_config = ConfigDict(frozen=True)

    model_label: str
    model_id: str
    gpus_per_replica: int
    itl_deadline_s: float
    feasible_batch_sizes: tuple[int, ...]

    @model_validator(mode="after")
    def _validate(self) -> InferenceModelSpec:
        # Checks run in declaration order so the first failing field is
        # the one reported.
        if self.gpus_per_replica < 1:
            raise ValueError(f"gpus_per_replica must be >= 1, got {self.gpus_per_replica}.")
        if self.itl_deadline_s <= 0:
            raise ValueError(f"itl_deadline_s must be > 0, got {self.itl_deadline_s}.")
        if not self.feasible_batch_sizes:
            raise ValueError("feasible_batch_sizes must not be empty.")
        return self

ModelDeployment dataclass

One model's deployment at a datacenter site.

Pairs an InferenceModelSpec (model identity) with the initial batch size. Replica counts (including runtime ramps) live on ReplicaSchedule and are passed separately via OfflineWorkload.replica_schedules.

Attributes:

Name Type Description
spec InferenceModelSpec

The model specification.

initial_batch_size int

Starting batch size for this deployment. Must be in spec.feasible_batch_sizes.

Source code in openg2g/datacenter/config.py
@dataclass(frozen=True)
class ModelDeployment:
    """One model's deployment at a datacenter site.

    Pairs an [`InferenceModelSpec`][openg2g.datacenter.config.InferenceModelSpec]
    (model identity) with the initial batch size. Replica counts, including
    runtime ramps, live on [`ReplicaSchedule`][openg2g.datacenter.config.ReplicaSchedule]
    and are supplied separately via `OfflineWorkload.replica_schedules`.

    Attributes:
        spec: The model specification.
        initial_batch_size: Starting batch size for this deployment.
            Must be in `spec.feasible_batch_sizes`.
    """

    spec: InferenceModelSpec
    initial_batch_size: int

    def __post_init__(self) -> None:
        bs = self.initial_batch_size
        if bs <= 0:
            raise ValueError(f"initial_batch_size must be > 0, got {bs}.")
        if bs not in self.spec.feasible_batch_sizes:
            raise ValueError(
                f"initial_batch_size ({bs}) must be in "
                f"feasible_batch_sizes ({self.spec.feasible_batch_sizes})."
            )

TrainingRun

Training workload parameters.

The trace is eagerly rescaled so its peak matches target_peak_W_per_gpu. Use eval_power to evaluate total training power at a given simulation time.

Combine with at and | to build a TrainingSchedule:

schedule = (
    TrainingRun(n_gpus=2400, trace=trace_a).at(t_start=1000, t_end=2000)
    | TrainingRun(n_gpus=1200, trace=trace_b).at(t_start=2500, t_end=3500)
)

Attributes:

Name Type Description
n_gpus

Number of GPUs running the training workload.

trace

Single-GPU TrainingTrace.

target_peak_W_per_gpu

The trace is rescaled so its peak equals this value.

Source code in openg2g/datacenter/config.py
class TrainingRun:
    """Training workload parameters.

    On construction the trace is eagerly rescaled so its peak equals
    `target_peak_W_per_gpu`. Use `eval_power` to evaluate total training
    power at a given simulation time.

    Combine with [`at`][.at] and `|` to build a [`TrainingSchedule`][..TrainingSchedule]:

    ```python
    schedule = (
        TrainingRun(n_gpus=2400, trace=trace_a).at(t_start=1000, t_end=2000)
        | TrainingRun(n_gpus=1200, trace=trace_b).at(t_start=2500, t_end=3500)
    )
    ```

    Attributes:
        n_gpus: Number of GPUs running the training workload.
        trace: Single-GPU [`TrainingTrace`][openg2g.datacenter.workloads.training.TrainingTrace].
        target_peak_W_per_gpu: The trace is rescaled so its peak equals this value.
    """

    __slots__ = ("_period", "_rescaled_power", "_trace_time", "n_gpus", "target_peak_W_per_gpu", "trace")

    def __init__(self, *, n_gpus: int, trace: TrainingTrace, target_peak_W_per_gpu: float = 400.0) -> None:
        if n_gpus <= 0:
            raise ValueError(f"TrainingRun n_gpus must be > 0, got {n_gpus}.")
        self.n_gpus = n_gpus
        self.trace = trace
        self.target_peak_W_per_gpu = target_peak_W_per_gpu

        times = np.asarray(trace.t_s, float)
        powers = np.asarray(trace.power_w, float)
        # Normalize so the trace's own clock starts at zero.
        times = times - times[0]
        span = float(times[-1] - times[0])
        if span <= 0:
            raise ValueError("Training trace time span must be positive.")
        peak = float(np.max(powers))
        if peak <= 0:
            raise ValueError("Training trace has non-positive peak; cannot scale.")
        self._trace_time = times
        self._rescaled_power = powers * (target_peak_W_per_gpu / peak)
        self._period = span

    def eval_power(self, t: float, t_start: float, t_end: float) -> float:
        """Evaluate total training power at simulation time `t`.

        Returns zero if `t` is outside `[t_start, t_end]`.

        Args:
            t: Global simulation time (seconds).
            t_start: Time when training becomes active (seconds).
            t_end: Time when training stops (seconds).

        Returns:
            Total training power (W) across all `n_gpus` GPUs.
        """
        if t < t_start or t > t_end:
            return 0.0
        # The trace repeats periodically within the active window.
        elapsed = (t - t_start) % self._period
        per_gpu = float(np.interp(elapsed, self._trace_time, self._rescaled_power))
        return per_gpu * self.n_gpus

    def at(self, t_start: float, t_end: float) -> TrainingSchedule:
        """Schedule this training run over `[t_start, t_end]`.

        Args:
            t_start: Global simulation time when training becomes active (seconds).
            t_end: Global simulation time when training stops (seconds).

        Returns:
            A single-entry [`TrainingSchedule`][...TrainingSchedule].
        """
        if t_end < t_start:
            raise ValueError(f"t_end ({t_end}) must be >= t_start ({t_start}).")
        entry = (self, float(t_start), float(t_end))
        return TrainingSchedule((entry,))

eval_power(t, t_start, t_end)

Evaluate total training power at simulation time t.

Returns zero if t is outside [t_start, t_end].

Parameters:

Name Type Description Default
t float

Global simulation time (seconds).

required
t_start float

Time when training becomes active (seconds).

required
t_end float

Time when training stops (seconds).

required

Returns:

Type Description
float

Total training power (W) across all n_gpus GPUs.

Source code in openg2g/datacenter/config.py
def eval_power(self, t: float, t_start: float, t_end: float) -> float:
    """Evaluate total training power at simulation time `t`.

    Returns zero if `t` is outside `[t_start, t_end]`.

    Args:
        t: Global simulation time (seconds).
        t_start: Time when training becomes active (seconds).
        t_end: Time when training stops (seconds).

    Returns:
        Total training power (W) across all `n_gpus` GPUs.
    """
    if t < t_start or t > t_end:
        return 0.0
    t_local = t - t_start
    # The trace repeats periodically within the active window.
    t_mod = t_local % self._period
    p_1gpu = float(np.interp(t_mod, self._trace_time, self._rescaled_power))
    return p_1gpu * self.n_gpus

at(t_start, t_end)

Schedule this training run over [t_start, t_end].

Parameters:

Name Type Description Default
t_start float

Global simulation time when training becomes active (seconds).

required
t_end float

Global simulation time when training stops (seconds).

required

Returns:

Type Description
TrainingSchedule

A single-entry TrainingSchedule.

Source code in openg2g/datacenter/config.py
def at(self, t_start: float, t_end: float) -> TrainingSchedule:
    """Schedule this training run over `[t_start, t_end]`.

    Args:
        t_start: Global simulation time when training becomes active (seconds).
        t_end: Global simulation time when training stops (seconds).

    Returns:
        A single-entry [`TrainingSchedule`][...TrainingSchedule].
    """
    if t_end < t_start:
        raise ValueError(f"t_end ({t_end}) must be >= t_start ({t_start}).")
    # Times are normalized to float for the schedule entry.
    return TrainingSchedule(((self, float(t_start), float(t_end)),))

TrainingSchedule

Ordered collection of TrainingRun objects scheduled over time windows.

Each entry is a (TrainingRun, t_start, t_end) tuple. Entries are sorted by t_start.

Built with TrainingRun.at and |.

Example:

schedule = (
    TrainingRun(n_gpus=2400, trace=trace_a).at(t_start=1000, t_end=2000)
    | TrainingRun(n_gpus=1200, trace=trace_b).at(t_start=2500, t_end=3500)
)
Source code in openg2g/datacenter/config.py
class TrainingSchedule:
    """Time-windowed collection of [`TrainingRun`][..TrainingRun] objects.

    Each entry is a `(TrainingRun, t_start, t_end)` tuple; entries are
    kept sorted by `t_start`.

    Built with [`TrainingRun.at`][..TrainingRun.at] and `|`.

    Example:

    ```python
    schedule = (
        TrainingRun(n_gpus=2400, trace=trace_a).at(t_start=1000, t_end=2000)
        | TrainingRun(n_gpus=1200, trace=trace_b).at(t_start=2500, t_end=3500)
    )
    ```
    """

    __slots__ = ("_entries",)

    def __init__(self, entries: tuple[tuple[TrainingRun, float, float], ...] = ()) -> None:
        # Keep entries ordered by window start time.
        self._entries = tuple(sorted(entries, key=lambda entry: entry[1]))

    def __or__(self, other: TrainingSchedule) -> TrainingSchedule:
        combined = (*self._entries, *other._entries)
        return TrainingSchedule(combined)

    def __iter__(self) -> Iterator[tuple[TrainingRun, float, float]]:
        yield from self._entries

    def __len__(self) -> int:
        return len(self._entries)

    def __bool__(self) -> bool:
        return len(self._entries) > 0

    def __repr__(self) -> str:
        return " | ".join(
            f"TrainingRun(n_gpus={run.n_gpus}).at(t_start={start}, t_end={end})"
            for run, start, end in self._entries
        )

ReplicaSchedule

Per-model replica count over time.

Specifies the initial replica count and optional linear ramps to new target counts over time windows. Method-chaining API:

ReplicaSchedule(initial=720).ramp_to(144, t_start=2500, t_end=3000)

ReplicaSchedule(initial=720)
    .ramp_to(144, t_start=2500, t_end=3000)
    .ramp_to(200, t_start=3200, t_end=3400)

Semantics: before the first ramp, the active count equals initial. During each [t_start, t_end] window the count linearly interpolates from the previous level to target. Between ramps, the count holds at the last target.

Attributes:

Name Type Description
initial int

Replica count before any ramp.

Source code in openg2g/datacenter/config.py
class ReplicaSchedule:
    """Per-model replica count over time.

    Holds an initial replica count plus optional linear ramps toward new
    target counts over time windows. Method-chaining API:

    ```python
    ReplicaSchedule(initial=720).ramp_to(144, t_start=2500, t_end=3000)

    ReplicaSchedule(initial=720)
        .ramp_to(144, t_start=2500, t_end=3000)
        .ramp_to(200, t_start=3200, t_end=3400)
    ```

    Semantics: before the first ramp, the active count equals `initial`.
    During each `[t_start, t_end]` window the count linearly interpolates
    from the previous level to `target`. Between ramps, the count holds
    at the last target.

    Attributes:
        initial: Replica count before any ramp.
    """

    __slots__ = ("_initial", "_ramps")

    def __init__(self, *, initial: int) -> None:
        if initial < 0:
            raise ValueError(f"ReplicaSchedule initial must be >= 0, got {initial}.")
        self._initial = initial
        self._ramps: tuple[tuple[int, float, float], ...] = ()

    @property
    def initial(self) -> int:
        """Replica count before any ramp."""
        return self._initial

    def ramp_to(self, target: int, *, t_start: float, t_end: float) -> ReplicaSchedule:
        """Add a linear ramp to `target` over `[t_start, t_end]`.

        Ramps may be specified in any order; they are sorted by `t_start`
        in the returned schedule. The new ramp must not overlap any
        existing ramp's window (touching at the boundary is allowed:
        `prev.t_end == new.t_start`).

        Args:
            target: Target replica count after the ramp completes.
            t_start: Global simulation time when the ramp begins (seconds).
            t_end: Global simulation time when the ramp ends (seconds).

        Returns:
            A new `ReplicaSchedule` with the ramp added and ramps sorted
            by `t_start`.

        Raises:
            ValueError: If `target` is negative, `t_end < t_start`, or
                the new ramp overlaps any existing ramp window.
        """
        if target < 0:
            raise ValueError(f"ramp_to target must be >= 0, got {target}.")
        if t_end < t_start:
            raise ValueError(f"t_end ({t_end}) must be >= t_start ({t_start}).")
        for _existing, s, e in self._ramps:
            # Strict overlap only; windows may touch at a boundary
            # (t_end == s or t_start == e).
            if t_start < e and t_end > s:
                raise ValueError(f"ramp_to window [{t_start}, {t_end}] overlaps existing ramp window [{s}, {e}].")
        # Build a fresh schedule so the receiver stays unchanged.
        result = ReplicaSchedule(initial=self._initial)
        all_ramps = (*self._ramps, (int(target), float(t_start), float(t_end)))
        result._ramps = tuple(sorted(all_ramps, key=lambda ramp: ramp[1]))
        return result

    def max_count(self) -> int:
        """Return the maximum replica count (initial or any ramp target)."""
        targets = [target for target, _, _ in self._ramps]
        return max([self._initial, *targets])

    def count_at(self, t: float | np.ndarray) -> float | np.ndarray:
        """Evaluate the active replica count at time(s) *t*.

        Piecewise-linear interpolation between ramp events.
        Before the first ramp, returns `initial`.

        Args:
            t: Scalar or array of global simulation times (seconds).

        Returns:
            Active replica count(s), same shape as *t*.
        """
        if isinstance(t, np.ndarray):
            return np.vectorize(self._count_scalar, otypes=[float])(t)
        return float(self._count_scalar(float(t)))

    def _count_scalar(self, t: float) -> float:
        # Walk the sorted ramps, carrying the level reached so far.
        level = float(self._initial)
        for target, start, end in self._ramps:
            if t < start:
                break
            if t <= end:
                if end == start:
                    return float(target)
                alpha = (t - start) / (end - start)
                return level + (float(target) - level) * alpha
            level = float(target)
        return level

    def __repr__(self) -> str:
        pieces = [f"ReplicaSchedule(initial={self._initial})"]
        pieces.extend(f".ramp_to({target}, t_start={start}, t_end={end})" for target, start, end in self._ramps)
        return "".join(pieces)

    def __len__(self) -> int:
        return len(self._ramps)

    def __bool__(self) -> bool:
        # Always truthy: even with zero ramps the schedule still
        # specifies an initial count.
        return True

initial property

Replica count before any ramp.

ramp_to(target, *, t_start, t_end)

Add a linear ramp to target over [t_start, t_end].

Ramps may be specified in any order; they are sorted by t_start in the returned schedule. The new ramp must not overlap any existing ramp's window (touching at the boundary is allowed: prev.t_end == new.t_start).

Parameters:

Name Type Description Default
target int

Target replica count after the ramp completes.

required
t_start float

Global simulation time when the ramp begins (seconds).

required
t_end float

Global simulation time when the ramp ends (seconds).

required

Returns:

Type Description
ReplicaSchedule

A new ReplicaSchedule with the ramp added and ramps sorted

ReplicaSchedule

by t_start.

Raises:

Type Description
ValueError

If target is negative, t_end < t_start, or the new ramp overlaps any existing ramp window.

Source code in openg2g/datacenter/config.py
def ramp_to(self, target: int, *, t_start: float, t_end: float) -> ReplicaSchedule:
    """Add a linear ramp to `target` over `[t_start, t_end]`.

    Ramps may be specified in any order; they are sorted by `t_start`
    in the returned schedule. The new ramp must not overlap any
    existing ramp's window (touching at the boundary is allowed:
    `prev.t_end == new.t_start`).

    Args:
        target: Target replica count after the ramp completes.
        t_start: Global simulation time when the ramp begins (seconds).
        t_end: Global simulation time when the ramp ends (seconds).

    Returns:
        A new `ReplicaSchedule` with the ramp added and ramps sorted
        by `t_start`.

    Raises:
        ValueError: If `target` is negative, `t_end < t_start`, or
            the new ramp overlaps any existing ramp window.
    """
    if target < 0:
        raise ValueError(f"ramp_to target must be >= 0, got {target}.")
    if t_end < t_start:
        raise ValueError(f"t_end ({t_end}) must be >= t_start ({t_start}).")
    # Interval-intersection test against every existing window [s, e].
    # Strict inequalities let the candidate share a boundary point.
    for _existing_target, s, e in self._ramps:
        if t_start < e and t_end > s:
            raise ValueError(f"ramp_to window [{t_start}, {t_end}] overlaps existing ramp window [{s}, {e}].")
    combined = [*self._ramps, (int(target), float(t_start), float(t_end))]
    combined.sort(key=lambda ramp: ramp[1])
    # Build the successor schedule rather than mutating self (immutable builder style).
    updated = ReplicaSchedule(initial=self._initial)
    updated._ramps = tuple(combined)
    return updated

max_count()

Return the maximum replica count (initial or any ramp target).

Source code in openg2g/datacenter/config.py
def max_count(self) -> int:
    """Return the maximum replica count (initial or any ramp target)."""
    # The pre-ramp count participates alongside every ramp target, so an
    # empty ramp tuple naturally falls back to the initial count.
    candidates = [self._initial]
    candidates.extend(target for target, _start, _end in self._ramps)
    return max(candidates)

count_at(t)

Evaluate the active replica count at time(s) t.

Piecewise-linear interpolation between ramp events. Before the first ramp, returns initial.

Parameters:

Name Type Description Default
t float | ndarray

Scalar or array of global simulation times (seconds).

required

Returns:

Type Description
float | ndarray

Active replica count(s), same shape as t.

Source code in openg2g/datacenter/config.py
def count_at(self, t: float | np.ndarray) -> float | np.ndarray:
    """Evaluate the active replica count at time(s) *t*.

    Piecewise-linear interpolation between ramp events.
    Before the first ramp, returns `initial`.

    Args:
        t: Scalar or array of global simulation times (seconds).

    Returns:
        Active replica count(s), same shape as *t*.
    """
    # Scalar fast path: single evaluation, returned as a plain float.
    if not isinstance(t, np.ndarray):
        return float(self._count_scalar(float(t)))
    # Array path: element-wise evaluation preserving the input shape.
    evaluate = np.vectorize(self._count_scalar, otypes=[float])
    return evaluate(t)

DatacenterConfig

Bases: BaseModel

Physical datacenter facility configuration.

Attributes:

Name Type Description
gpus_per_server int

Number of GPUs per physical server rack.

base_kw_per_phase float

Constant base load per phase (kW).

power_factor float

Power factor of the datacenter loads (lagging).

Source code in openg2g/datacenter/config.py
class DatacenterConfig(BaseModel):
    """Physical datacenter facility configuration.

    Attributes:
        gpus_per_server: Number of GPUs per physical server rack.
        base_kw_per_phase: Constant base load per phase (kW).
        power_factor: Power factor of the datacenter loads (lagging).
    """

    # Instances are immutable after construction.
    model_config = ConfigDict(frozen=True)

    gpus_per_server: int = 8
    base_kw_per_phase: float = 0.0
    power_factor: float = 0.95

    @model_validator(mode="after")
    def _validate(self) -> DatacenterConfig:
        # Runs after field parsing; rejects non-physical configurations.
        if self.gpus_per_server < 1:
            raise ValueError(f"gpus_per_server must be >= 1, got {self.gpus_per_server}.")
        # The chained comparison also rejects NaN (every comparison with NaN is False).
        if not (0.0 < self.power_factor <= 1.0):
            raise ValueError(f"power_factor must be in (0, 1], got {self.power_factor}.")
        return self

PowerAugmentationConfig

Bases: BaseModel

Power augmentation settings for virtual server scaling.

Controls per-server amplitude jitter and additive noise applied during power augmentation.

Attributes:

Name Type Description
amplitude_scale_range tuple[float, float]

(low, high) range for per-server amplitude scaling. Each virtual server draws a uniform multiplier from this range.

noise_fraction float

Gaussian noise standard deviation as a fraction of per-server power.

Source code in openg2g/datacenter/config.py
class PowerAugmentationConfig(BaseModel):
    """Power augmentation settings for virtual server scaling.

    Controls per-server amplitude jitter and additive noise applied during
    power augmentation.

    Attributes:
        amplitude_scale_range: `(low, high)` range for per-server amplitude
            scaling. Each virtual server draws a uniform multiplier from this range.
        noise_fraction: Gaussian noise standard deviation as a fraction of
            per-server power.
    """

    # Instances are immutable after construction.
    model_config = ConfigDict(frozen=True)

    # Defaults of (1.0, 1.0) and 0.0 mean no jitter and no noise.
    amplitude_scale_range: tuple[float, float] = (1.0, 1.0)
    noise_fraction: float = 0.0

    @model_validator(mode="after")
    def _validate(self) -> PowerAugmentationConfig:
        # Runs after field parsing; the range must be well-ordered and strictly positive.
        lo, hi = self.amplitude_scale_range
        if lo > hi:
            raise ValueError(f"amplitude_scale_range low ({lo}) must be <= high ({hi}).")
        if lo <= 0:
            raise ValueError(f"amplitude_scale_range low ({lo}) must be > 0.")
        if self.noise_fraction < 0:
            raise ValueError(f"noise_fraction must be >= 0, got {self.noise_fraction}.")
        return self

openg2g.datacenter.layout

Server pool primitives.

Provides the shared server pool used by datacenter backends. Power augmentation (scaling per-GPU power to three-phase datacenter power) lives in openg2g.datacenter.workloads.inference.

ServerPool dataclass

Shared pool of physical servers for a datacenter.

All models draw servers from this single pool based on their current GPU demand. Each server has model-independent properties (phase assignment, stagger offset, amplitude scale). The pool handles server-to-model allocation at each timestep via per-model priority orderings.

Attributes:

Name Type Description
num_servers int

Total number of physical servers in the pool.

gpus_per_server int

GPUs per physical server.

phase_list ndarray

Phase assignment per server (0=A, 1=B, 2=C).

stagger_offsets ndarray

Per-server offsets for desynchronization.

amplitude_scales ndarray

Per-server power multiplier for inter-server variation.

noise_fraction float

Gaussian noise standard deviation as a fraction of per-server power.

model_priorities dict[str, ndarray]

Per-model priority ordering over all servers. Each model gets a deterministic permutation of [0..num_servers).

Source code in openg2g/datacenter/layout.py
@dataclass
class ServerPool:
    """Shared pool of physical servers for a datacenter.

    All models draw servers from this single pool based on their current
    GPU demand. Each server has model-independent properties (phase
    assignment, stagger offset, amplitude scale). The pool handles
    server-to-model allocation at each timestep via per-model priority
    orderings.

    Attributes:
        num_servers: Total number of physical servers in the pool.
        gpus_per_server: GPUs per physical server.
        phase_list: Phase assignment per server (0=A, 1=B, 2=C).
        stagger_offsets: Per-server offsets for desynchronization.
        amplitude_scales: Per-server power multiplier for inter-server variation.
        noise_fraction: Gaussian noise standard deviation as a fraction of
            per-server power.
        model_priorities: Per-model priority ordering over all servers.
            Each model gets a deterministic permutation of `[0..num_servers)`.
    """

    num_servers: int
    gpus_per_server: int
    phase_list: np.ndarray
    stagger_offsets: np.ndarray
    amplitude_scales: np.ndarray
    noise_fraction: float
    model_priorities: dict[str, np.ndarray]

    def allocate(self, gpu_demands: dict[str, int]) -> dict[str, np.ndarray]:
        """Allocate servers to models with phase-balanced round-robin.

        Each model gets k = ceil(gpu_demand / gpus_per_server) servers.
        Servers are picked by cycling through phases (A, B, C, A, B, C, ...)
        and within each phase, picking the highest-priority unclaimed server.
        This ensures each model's allocation is as phase-balanced as possible.

        Args:
            gpu_demands: Mapping of model label to total GPUs needed.

        Returns:
            Mapping of model label to array of allocated server indices.

        Raises:
            RuntimeError: If the pool runs out of unclaimed servers before
                every model's demand is met.
        """
        # claimed[i] is True once server i has been handed to some model
        # in this allocation round.
        claimed = np.zeros(self.num_servers, dtype=bool)
        result: dict[str, np.ndarray] = {}

        # Pre-group servers by phase, sorted by priority per model
        # (labels iterated in sorted order for determinism).
        phase_groups: dict[str, list[list[int]]] = {}
        for label in sorted(gpu_demands):
            priority = self.model_priorities[label]
            groups: list[list[int]] = [[], [], []]
            for idx in priority:
                groups[self.phase_list[idx]].append(idx)
            phase_groups[label] = groups

        for label in sorted(gpu_demands):
            demand = gpu_demands[label]
            if demand <= 0:
                # No demand: empty allocation, no servers consumed.
                result[label] = np.array([], dtype=int)
                continue
            k = math.ceil(demand / self.gpus_per_server)
            groups = phase_groups[label]
            # cursors[p]: next unexamined position within phase p's group.
            cursors = [0, 0, 0]
            allocated: list[int] = []
            phase = 0

            while len(allocated) < k:
                # Try each phase starting from current, wrapping around
                found = False
                for attempt in range(3):
                    p = (phase + attempt) % 3
                    while cursors[p] < len(groups[p]):
                        idx = groups[p][cursors[p]]
                        cursors[p] += 1
                        if not claimed[idx]:
                            allocated.append(idx)
                            claimed[idx] = True
                            # Advance so the next pick interleaves A, B, C.
                            phase = (p + 1) % 3
                            found = True
                            break
                    if found:
                        break
                if not found:
                    raise RuntimeError(
                        f"ServerPool over-subscribed: model {label!r} requested "
                        f"{k} servers but pool has no more unclaimed servers."
                    )

            result[label] = np.array(allocated, dtype=int)

        return result

allocate(gpu_demands)

Allocate servers to models with phase-balanced round-robin.

Each model gets k = ceil(gpu_demand / gpus_per_server) servers. Servers are picked by cycling through phases (A, B, C, A, B, C, ...) and within each phase, picking the highest-priority unclaimed server. This ensures each model's allocation is as phase-balanced as possible.

Parameters:

Name Type Description Default
gpu_demands dict[str, int]

Mapping of model label to total GPUs needed.

required

Returns:

Type Description
dict[str, ndarray]

Mapping of model label to array of allocated server indices.

Source code in openg2g/datacenter/layout.py
def allocate(self, gpu_demands: dict[str, int]) -> dict[str, np.ndarray]:
    """Allocate servers to models with phase-balanced round-robin.

    Each model gets k = ceil(gpu_demand / gpus_per_server) servers.
    Servers are picked by cycling through phases (A, B, C, A, B, C, ...)
    and within each phase, picking the highest-priority unclaimed server.
    This ensures each model's allocation is as phase-balanced as possible.

    Args:
        gpu_demands: Mapping of model label to total GPUs needed.

    Returns:
        Mapping of model label to array of allocated server indices.

    Raises:
        RuntimeError: If the pool runs out of unclaimed servers.
    """
    claimed = np.zeros(self.num_servers, dtype=bool)
    allocations: dict[str, np.ndarray] = {}

    # For each model, bucket its priority-ordered servers by phase so the
    # round-robin below can pop the best unclaimed server per phase.
    per_phase_by_model: dict[str, list[list[int]]] = {}
    for label in sorted(gpu_demands):
        buckets: list[list[int]] = [[], [], []]
        for server in self.model_priorities[label]:
            buckets[self.phase_list[server]].append(server)
        per_phase_by_model[label] = buckets

    for label in sorted(gpu_demands):
        demand = gpu_demands[label]
        if demand <= 0:
            allocations[label] = np.array([], dtype=int)
            continue
        needed = math.ceil(demand / self.gpus_per_server)
        buckets = per_phase_by_model[label]
        next_pos = [0, 0, 0]
        picked: list[int] = []
        phase = 0

        while len(picked) < needed:
            chosen = None
            # Scan phases starting at the current one, wrapping around.
            for offset in range(3):
                p = (phase + offset) % 3
                bucket = buckets[p]
                while next_pos[p] < len(bucket):
                    candidate = bucket[next_pos[p]]
                    next_pos[p] += 1
                    if not claimed[candidate]:
                        chosen = candidate
                        break
                if chosen is not None:
                    claimed[chosen] = True
                    picked.append(chosen)
                    # Advance so the next pick interleaves A, B, C.
                    phase = (p + 1) % 3
                    break
            if chosen is None:
                raise RuntimeError(
                    f"ServerPool over-subscribed: model {label!r} requested "
                    f"{needed} servers but pool has no more unclaimed servers."
                )

        allocations[label] = np.array(picked, dtype=int)

    return allocations

openg2g.datacenter.offline

Offline (trace-based) datacenter backend.

OfflineDatacenterState dataclass

Bases: LLMDatacenterState

Extended state from the offline (trace-based) backend.

Adds per-model power breakdown to LLMDatacenterState.

Source code in openg2g/datacenter/offline.py
@dataclass(frozen=True)
class OfflineDatacenterState(LLMDatacenterState):
    """Extended state from the offline (trace-based) backend.

    Adds per-model power breakdown to
    [`LLMDatacenterState`][openg2g.datacenter.base.LLMDatacenterState].
    """

    # Real power in watts attributable to each model label this timestep.
    power_by_model_w: dict[str, float] = field(default_factory=dict)

OfflineWorkload dataclass

Complete offline simulation workload.

Bundles inference data with per-model replica schedules, batch sizes, and optional training overlays.

Attributes:

Name Type Description
inference_data InferenceData

LLM inference workload with offline simulation data (model specs, power templates, ITL fits).

replica_schedules dict[str, ReplicaSchedule]

Per-model replica count schedules. Each key is a model label, each value is a ReplicaSchedule specifying initial count and optional ramps.

initial_batch_sizes dict[str, int]

Mapping of model label to initial batch size.

training TrainingSchedule

Training workload schedule. An empty schedule disables training overlay.

Source code in openg2g/datacenter/offline.py
@dataclass
class OfflineWorkload:
    """Complete offline simulation workload.

    Bundles inference data with per-model replica schedules, batch sizes,
    and optional training overlays.

    Attributes:
        inference_data: LLM inference workload with offline simulation
            data (model specs, power templates, ITL fits).
        replica_schedules: Per-model replica count schedules. Each key is
            a model label, each value is a `ReplicaSchedule` specifying
            initial count and optional ramps.
        initial_batch_sizes: Mapping of model label to initial batch size.
        training: Training workload schedule. An empty schedule disables
            training overlay.
    """

    inference_data: InferenceData
    replica_schedules: dict[str, ReplicaSchedule] = field(default_factory=dict)
    # Models absent here fall back to the backend's default (first feasible batch size).
    initial_batch_sizes: dict[str, int] = field(default_factory=dict)
    training: TrainingSchedule = field(default_factory=TrainingSchedule)

OfflineDatacenter

Bases: LLMBatchSizeControlledDatacenter[OfflineDatacenterState]

Trace-based datacenter simulation with step-by-step interface.

Each step call computes one timestep of power output by indexing into pre-built per-GPU templates, applying per-server amplitude scaling and noise, and summing across active servers per phase.

Batch size changes via apply_control take effect on the next step call.

Each model with active replicas gets a ServerPool with a random priority ordering that determines server activation.

Parameters:

Name Type Description Default
datacenter DatacenterConfig

Facility configuration (GPUs per server, base load).

required
workload OfflineWorkload

Offline workload configuration bundling inference data, training overlays, and server ramp events.

required
dt_s Fraction

Simulation timestep (seconds).

required
seed int

Random seed for layout generation, noise, and latency sampling. Sub-seeds are derived deterministically.

0
power_augmentation PowerAugmentationConfig | None

Per-server amplitude scaling and noise settings.

None
Source code in openg2g/datacenter/offline.py
class OfflineDatacenter(LLMBatchSizeControlledDatacenter[OfflineDatacenterState]):
    """Trace-based datacenter simulation with step-by-step interface.

    Each `step` call computes one timestep of power output by indexing
    into pre-built per-GPU templates, applying per-server amplitude
    scaling and noise, and summing across active servers per phase.

    Batch size changes via `apply_control` take effect on the next
    `step` call.

    Each model with active replicas gets a
    [`ServerPool`][openg2g.datacenter.layout.ServerPool] with a random
    priority ordering that determines server activation.

    Args:
        datacenter: Facility configuration (GPUs per server, base load).
        workload: Offline workload configuration bundling inference data,
            training overlays, and server ramp events.
        dt_s: Simulation timestep (seconds).
        seed: Random seed for layout generation, noise, and latency
            sampling. Sub-seeds are derived deterministically.
        power_augmentation: Per-server amplitude scaling and noise
            settings.

    Raises:
        ValueError: If initial GPU usage or the ramp schedule exceeds
            `total_gpu_capacity`.
    """

    def __init__(
        self,
        datacenter: DatacenterConfig,
        workload: OfflineWorkload,
        *,
        name: str,
        dt_s: Fraction,
        seed: int = 0,
        power_augmentation: PowerAugmentationConfig | None = None,
        total_gpu_capacity: int,
    ) -> None:
        super().__init__(name=name)
        if power_augmentation is None:
            power_augmentation = PowerAugmentationConfig()

        self._datacenter = datacenter
        self._workload = workload
        self._power_augmentation = power_augmentation
        self._total_gpu_capacity = total_gpu_capacity
        # Pre-ramp replica count per model label (schedule's initial value).
        self._replica_counts = {label: sched.initial for label, sched in workload.replica_schedules.items()}
        self._dt_s = dt_s
        self._seed = int(seed)
        self._models = list(workload.inference_data.models)
        # Facility base load, converted from kW to W.
        self._base_W_per_phase = float(datacenter.base_kw_per_phase) * 1e3

        # Validate: initial GPU usage must not exceed capacity
        initial_usage = sum(self._replica_counts.get(ms.model_label, 0) * ms.gpus_per_replica for ms in self._models)
        if initial_usage > total_gpu_capacity:
            raise ValueError(f"Initial GPU usage ({initial_usage}) exceeds total_gpu_capacity ({total_gpu_capacity}).")

        # Validate ramp schedule against GPU capacity
        self._validate_ramp_capacity()

        self._layout_rng = np.random.default_rng(self._seed)
        # Current batch size per model; falls back to the first feasible batch size.
        self._batch_by_model: dict[str, int] = {
            ms.model_label: self._workload.initial_batch_sizes.get(ms.model_label, ms.feasible_batch_sizes[0])
            for ms in self._models
        }

        # Only models present in both inference_data and replica_schedules get a schedule.
        self._model_schedules: dict[str, ReplicaSchedule] = {
            ms.model_label: self._workload.replica_schedules[ms.model_label]
            for ms in self._models
            if ms.model_label in self._workload.replica_schedules
        }
        self._pool: ServerPool = self._build_server_pool()
        # Fixed sub-seed offsets give independent but reproducible RNG streams.
        self._inference_augmenter = InferencePowerAugmenter(
            pool=self._pool,
            gpus_per_replica_by_model={ms.model_label: ms.gpus_per_replica for ms in self._models},
            seed=self._seed + 12345,
        )

        self._global_step: int = 0
        self._latency_rng = np.random.default_rng(self._seed + 54321)
        # Runtime replica deltas applied on top of each model's schedule (set via ShiftReplicas).
        self._replica_offset_by_model: dict[str, int] = {ms.model_label: 0 for ms in self._models}
        # Server indices allocated per model on the most recent step.
        self._last_allocation: dict[str, np.ndarray] = {}

        logger.info(
            "OfflineDatacenter: %d models, dt=%s s, seed=%d, gpu_capacity=%d",
            len(self._models),
            dt_s,
            seed,
            total_gpu_capacity,
        )
        for ms in self._models:
            n_rep = self._replica_counts.get(ms.model_label, 0)
            logger.info(
                "  %s: %d replicas, %d GPUs/replica, batch=%d",
                ms.model_label,
                n_rep,
                ms.gpus_per_replica,
                self._batch_by_model.get(ms.model_label, 0),
            )

    def _validate_ramp_capacity(self) -> None:
        """Check that the ramp schedule never exceeds total GPU capacity."""
        schedules = self._workload.replica_schedules
        # Collect all ramp boundary times
        # NOTE: reads the schedules' private `_ramps`. Counts are piecewise
        # linear between boundaries, so per-model extremes occur at boundary
        # times; the rounding below could in principle behave differently at
        # intermediate times — TODO confirm boundary sampling is sufficient.
        boundary_times: set[float] = set()
        for sched in schedules.values():
            for _target, t_start, t_end in sched._ramps:
                boundary_times.add(t_start)
                boundary_times.add(t_end)
        if not boundary_times:
            return
        # At each boundary, compute total GPU usage
        for t in sorted(boundary_times):
            total_gpus = 0
            for ms in self._models:
                label = ms.model_label
                sched = schedules.get(label)
                if sched is None:
                    continue
                count = sched.count_at(t)
                total_gpus += int(round(float(count))) * ms.gpus_per_replica
            if total_gpus > self._total_gpu_capacity:
                raise ValueError(
                    f"Ramp schedule exceeds total_gpu_capacity at t={t:.1f}s: "
                    f"needs {total_gpus} GPUs but capacity is {self._total_gpu_capacity}."
                )

    @property
    def total_gpu_capacity(self) -> int:
        """Maximum number of GPUs this datacenter can host."""
        return self._total_gpu_capacity

    def current_gpu_usage(self) -> int:
        """Current total GPU usage across all models (initial + offsets)."""
        total = 0
        for ms in self._models:
            initial = self._replica_counts.get(ms.model_label, 0)
            # Offsets can be negative; effective usage is clamped at zero.
            effective = max(0, initial + self._replica_offset_by_model.get(ms.model_label, 0))
            total += effective * ms.gpus_per_replica
        return total

    def available_gpu_capacity(self) -> int:
        """Remaining GPU slots available for incoming replicas."""
        return max(0, self._total_gpu_capacity - self.current_gpu_usage())

    @property
    def dt_s(self) -> Fraction:
        # Simulation timestep (seconds), fixed at construction.
        return self._dt_s

    def step(self, clock: SimulationClock, events: EventEmitter) -> OfflineDatacenterState:
        """Advance one timestep and return the resulting datacenter state."""
        t_now = clock.time_s
        template_store = self._workload.inference_data.power_templates

        # Build per-GPU power (indexed into shared pool) and effective replica counts.
        pool = self._pool
        per_gpu_by_model: dict[str, np.ndarray] = {}
        replica_counts: dict[str, int] = {}
        for ms in self._models:
            label = ms.model_label
            # NOTE(review): this skips models whose scheduled *initial* count is
            # <= 0, even if their schedule ramps up later or a positive
            # ShiftReplicas offset exists — confirm this is intended.
            if self._replica_counts.get(label, 0) <= 0:
                continue
            batch = int(self._batch_by_model[label])

            template = template_store.template(label, batch)
            # Per-server stagger offsets desynchronize template playback;
            # indices wrap around the template length.
            indices = (self._global_step + pool.stagger_offsets) % len(template)
            per_gpu_by_model[label] = template[indices]

            schedule = self._model_schedules[label]
            offset = self._replica_offset_by_model.get(label, 0)
            replica_counts[label] = max(0, int(round(schedule.count_at(t_now))) + offset)

        inference_aug = self._inference_augmenter.augment(per_gpu_by_model, replica_counts)
        self._last_allocation = inference_aug.allocation

        # Ensure every known model appears in the output maps, even at zero.
        power_by_model = dict(inference_aug.power_by_model_w)
        active_replicas_by_model = dict(inference_aug.active_replicas_by_model)
        for ms in self._models:
            power_by_model.setdefault(ms.model_label, 0.0)
            active_replicas_by_model.setdefault(ms.model_label, 0)

        # This is where we accumulate power across workloads.
        phase_power = np.array(
            [
                self._base_W_per_phase + inference_aug.power_w.a,
                self._base_W_per_phase + inference_aug.power_w.b,
                self._base_W_per_phase + inference_aug.power_w.c,
            ]
        )

        # Training overlay: training power is split evenly across the three phases.
        for run, t_start, t_end in self._workload.training:
            training_power_w = run.eval_power(float(t_now), t_start, t_end)
            phase_power += training_power_w / 3.0

        # ITL sampling (NaN when fits are unavailable or a model has no replicas).
        itl_fits = self._workload.inference_data.itl_fits
        observed_itl_s_by_model: dict[str, float] = {}
        for ms in self._models:
            label = ms.model_label
            n_rep = active_replicas_by_model.get(label, 0)
            if itl_fits is None or n_rep <= 0:
                observed_itl_s_by_model[label] = float("nan")
                continue
            batch = int(self._batch_by_model[label])
            observed_itl_s_by_model[label] = itl_fits.sample_avg(
                model_label=label,
                batch_size=batch,
                n_replicas=n_rep,
                rng=self._latency_rng,
            )

        state = OfflineDatacenterState(
            time_s=float(t_now),
            power_w=ThreePhase(
                a=float(phase_power[0]),
                b=float(phase_power[1]),
                c=float(phase_power[2]),
            ),
            power_by_model_w=power_by_model,
            active_replicas_by_model=active_replicas_by_model,
            batch_size_by_model=dict(self._batch_by_model),
            observed_itl_s_by_model=observed_itl_s_by_model,
        )
        self._global_step += 1
        return state

    @functools.singledispatchmethod
    def apply_control(self, command: DatacenterCommand, events: EventEmitter) -> None:
        """Apply a control command. Dispatches on command type.

        This base overload is the fallback for command types without a
        registered handler below.
        """
        raise TypeError(f"OfflineDatacenter does not support {type(command).__name__}")

    @apply_control.register
    def apply_control_set_batch_size(self, command: SetBatchSize, events: EventEmitter) -> None:
        """Record new batch sizes. Changes take effect on the next step.

        Raises:
            ValueError: If a ramp-up rate is requested or any batch size
                is not a positive integer.
        """
        if command.ramp_up_rate_by_model:
            raise ValueError(
                f"OfflineDatacenter does not support ramp_up_rate_by_model (got {command.ramp_up_rate_by_model}). "
                f"Batch size changes are always immediate in trace-based simulation."
            )
        for label, b in command.batch_size_by_model.items():
            b_int = int(b)
            if b_int <= 0:
                raise ValueError(f"Batch size must be positive for model {label!r}, got {b_int}.")
            old = self._batch_by_model.get(str(label))
            self._batch_by_model[str(label)] = b_int
            # Log only actual changes, not no-op updates.
            if old != b_int:
                logger.info("Batch size %s: %s -> %d", label, old, b_int)
        events.emit(
            "datacenter.batch_size.updated",
            {"batch_size_by_model": dict(self._batch_by_model)},
        )

    @apply_control.register
    def apply_control_shift_replicas(self, command: ShiftReplicas, events: EventEmitter) -> None:
        """Shift replicas for a model by adjusting the replica offset.

        Raises:
            ValueError: If the model label is unknown.
        """
        label = command.model_label
        delta = command.replica_delta
        if label not in self._replica_offset_by_model:
            raise ValueError(f"ShiftReplicas: unknown model {label!r}")

        # Offsets accumulate across commands; they apply on top of the schedule.
        self._replica_offset_by_model[label] += delta

        initial = self._replica_counts.get(label, 0)
        effective = max(0, initial + self._replica_offset_by_model[label])
        events.emit(
            "datacenter.replicas.shifted",
            {"model_label": label, "replica_delta": delta, "effective_replicas": effective},
        )

    def reset(self) -> None:
        """Restore construction-time state so a rerun with the same seed is reproducible."""
        self._global_step = 0
        self._batch_by_model = {
            ms.model_label: self._workload.initial_batch_sizes.get(ms.model_label, ms.feasible_batch_sizes[0])
            for ms in self._models
        }
        self._replica_offset_by_model = {ms.model_label: 0 for ms in self._models}
        self._last_allocation = {}
        # Re-seed RNGs and rebuild the pool/augmenter exactly as in __init__.
        self._layout_rng = np.random.default_rng(self._seed)
        self._model_schedules = {
            ms.model_label: self._workload.replica_schedules[ms.model_label]
            for ms in self._models
            if ms.model_label in self._workload.replica_schedules
        }
        self._pool = self._build_server_pool()
        self._inference_augmenter = InferencePowerAugmenter(
            pool=self._pool,
            gpus_per_replica_by_model={ms.model_label: ms.gpus_per_replica for ms in self._models},
            seed=self._seed + 12345,
        )
        self._latency_rng = np.random.default_rng(self._seed + 54321)

    def _build_server_pool(self) -> ServerPool:
        """Build shared server pool for the datacenter."""
        rng = self._layout_rng
        gpus_per_server = self._datacenter.gpus_per_server
        amp_lo, amp_hi = self._power_augmentation.amplitude_scale_range
        noise_fraction = self._power_augmentation.noise_fraction
        template_store = self._workload.inference_data.power_templates

        num_servers = math.ceil(self._total_gpu_capacity / gpus_per_server)

        # Server properties (model-independent)
        sA, sB, sC = split_integer_evenly(num_servers, 3)
        phase_list = np.asarray(([0] * sA) + ([1] * sB) + ([2] * sC), dtype=int)
        rng.shuffle(phase_list)

        # Stagger offsets: use max template length across all models
        # NOTE(review): only models with a positive initial replica count
        # contribute to the template-length scan — confirm models that ramp
        # up from zero are intentionally excluded here.
        max_tpl_len = 1
        for ms in self._models:
            if ms.model_label in self._replica_counts and self._replica_counts[ms.model_label] > 0:
                any_batch = template_store.batch_sizes(ms.model_label)[0]
                tpl_len = len(template_store.template(ms.model_label, any_batch))
                max_tpl_len = max(max_tpl_len, tpl_len)
        stagger_offsets = rng.integers(low=0, high=max_tpl_len, size=num_servers)

        amplitude_scales = rng.uniform(amp_lo, amp_hi, size=num_servers)

        # Per-model priority orderings
        model_priorities: dict[str, np.ndarray] = {}
        for ms in self._models:
            priority = np.arange(num_servers, dtype=int)
            rng.shuffle(priority)
            model_priorities[ms.model_label] = priority

        return ServerPool(
            num_servers=num_servers,
            gpus_per_server=gpus_per_server,
            phase_list=phase_list,
            stagger_offsets=stagger_offsets,
            amplitude_scales=amplitude_scales,
            noise_fraction=noise_fraction,
            model_priorities=model_priorities,
        )

    @property
    def phase_share_by_model(self) -> dict[str, np.ndarray]:
        """Per-model phase share vectors from the last pool allocation.

        Returns:
            Mapping of model label to a 3-element array `[frac_A, frac_B, frac_C]`
                representing the fraction of allocated servers on each phase.
        """
        shares: dict[str, np.ndarray] = {}
        for label, server_indices in self._last_allocation.items():
            if len(server_indices) == 0:
                continue
            counts = np.bincount(self._pool.phase_list[server_indices], minlength=3).astype(float)
            total = counts.sum()
            # Uniform fallback guards against a zero total (degenerate allocation).
            shares[label] = counts / total if total > 0 else np.array([1 / 3, 1 / 3, 1 / 3], dtype=float)
        return shares

total_gpu_capacity property

Maximum number of GPUs this datacenter can host.

phase_share_by_model property

Per-model phase share vectors from the last pool allocation.

Returns:

Type Description
dict[str, ndarray]

Mapping of model label to a 3-element array [frac_A, frac_B, frac_C] representing the fraction of allocated servers on each phase.

current_gpu_usage()

Current total GPU usage across all models (initial + offsets).

Source code in openg2g/datacenter/offline.py
def current_gpu_usage(self) -> int:
    """Current total GPU usage across all models (initial + offsets)."""
    usage = 0
    for spec in self._models:
        label = spec.model_label
        # Effective replica count never goes below zero, even for a large
        # negative offset.
        replicas = self._replica_counts.get(label, 0) + self._replica_offset_by_model.get(label, 0)
        if replicas < 0:
            replicas = 0
        usage += replicas * spec.gpus_per_replica
    return usage

available_gpu_capacity()

Remaining GPU slots available for incoming replicas.

Source code in openg2g/datacenter/offline.py
def available_gpu_capacity(self) -> int:
    """Remaining GPU slots available for incoming replicas."""
    # Clamp at zero: usage may exceed nominal capacity transiently.
    remaining = self._total_gpu_capacity - self.current_gpu_usage()
    return remaining if remaining > 0 else 0

apply_control(command, events)

Apply a control command. Dispatches on command type.

Source code in openg2g/datacenter/offline.py
@functools.singledispatchmethod
def apply_control(self, command: DatacenterCommand, events: EventEmitter) -> None:
    """Apply a control command. Dispatches on command type.

    This base implementation is the fallback for unregistered command
    types; supported commands are handled by the `@apply_control.register`
    overloads (e.g. `SetBatchSize`, `ShiftReplicas`).

    Raises:
        TypeError: If no overload is registered for the command's type.
    """
    raise TypeError(f"OfflineDatacenter does not support {type(command).__name__}")

apply_control_set_batch_size(command, events)

Record new batch sizes. Changes take effect on the next step.

Source code in openg2g/datacenter/offline.py
@apply_control.register
def apply_control_set_batch_size(self, command: SetBatchSize, events: EventEmitter) -> None:
    """Record new batch sizes. Changes take effect on the next step."""
    # Ramp-up is an online-only feature; reject it explicitly instead of
    # silently ignoring it.
    if command.ramp_up_rate_by_model:
        raise ValueError(
            f"OfflineDatacenter does not support ramp_up_rate_by_model (got {command.ramp_up_rate_by_model}). "
            f"Batch size changes are always immediate in trace-based simulation."
        )
    for label, requested in command.batch_size_by_model.items():
        b_int = int(requested)
        if b_int <= 0:
            raise ValueError(f"Batch size must be positive for model {label!r}, got {b_int}.")
        key = str(label)
        previous = self._batch_by_model.get(key)
        self._batch_by_model[key] = b_int
        if previous != b_int:
            logger.info("Batch size %s: %s -> %d", label, previous, b_int)
    # Emit a snapshot copy so listeners cannot mutate internal state.
    events.emit(
        "datacenter.batch_size.updated",
        {"batch_size_by_model": dict(self._batch_by_model)},
    )

apply_control_shift_replicas(command, events)

Shift replicas for a model by adjusting the replica offset.

Source code in openg2g/datacenter/offline.py
@apply_control.register
def apply_control_shift_replicas(self, command: ShiftReplicas, events: EventEmitter) -> None:
    """Shift replicas for a model by adjusting the replica offset."""
    label = command.model_label
    delta = command.replica_delta
    if label not in self._replica_offset_by_model:
        raise ValueError(f"ShiftReplicas: unknown model {label!r}")

    # Offsets accumulate across commands; the effective count is clamped
    # at zero when computed below.
    self._replica_offset_by_model[label] += delta

    effective = max(0, self._replica_counts.get(label, 0) + self._replica_offset_by_model[label])
    events.emit(
        "datacenter.replicas.shifted",
        {"model_label": label, "replica_delta": delta, "effective_replicas": effective},
    )

openg2g.datacenter.online

Online (live GPU) datacenter backend with power augmentation.

Connects to real vLLM inference servers for load generation and ITL measurement, and to zeusd instances for live GPU power monitoring. Power readings from a small number of real GPUs are augmented to datacenter scale using the shared InferencePowerAugmenter pipeline.

Requires pip install zeus aiohttp.

STAGGER_BUFFER_S = 10.0 module-attribute

Seconds of power history for temporal staggering.

Also used as the stagger range when building ServerPool (float offsets drawn from [0, STAGGER_BUFFER_S)).

Not user-configurable. Patchable for testing via openg2g.datacenter.online.STAGGER_BUFFER_S = ....

OnlineDatacenterState dataclass

Bases: LLMDatacenterState

Extended state from the online (live GPU) backend.

The base power_w field carries the augmented three-phase power (what the grid sees). This subclass adds the measured (pre-augmentation) breakdown for post-hoc analysis.

Attributes:

Name Type Description
measured_power_w ThreePhase

Total measured three-phase power from real GPUs (before augmentation), plus base load.

measured_power_w_by_model dict[str, float]

Per-model total measured power from real GPUs (watts).

augmented_power_w_by_model dict[str, float]

Per-model augmented power (watts). This is the power fed to the grid for each model after scaling up.

augmentation_factor_by_model dict[str, float]

Per-model augmentation multiplier (virtual replicas / real replicas).

prometheus_metrics_by_model dict[str, dict[str, float]]

Per-model Prometheus metrics snapshot. Keys are model labels, values are dicts with metric names like num_requests_running, num_requests_waiting, kv_cache_usage_perc, num_preemptions_total.

Source code in openg2g/datacenter/online.py
@dataclass(frozen=True)
class OnlineDatacenterState(LLMDatacenterState):
    """Extended state from the online (live GPU) backend.

    The base `power_w`
    field carries the augmented three-phase power (what the grid sees).
    This subclass adds the measured (pre-augmentation) breakdown for
    post-hoc analysis.

    Attributes:
        measured_power_w: Total measured three-phase power from real GPUs
            (before augmentation), plus base load.
        measured_power_w_by_model: Per-model total measured power from real
            GPUs (watts).
        augmented_power_w_by_model: Per-model augmented power (watts). This
            is the power fed to the grid for each model after scaling up.
        augmentation_factor_by_model: Per-model augmentation multiplier
            (virtual replicas / real replicas).
        prometheus_metrics_by_model: Per-model Prometheus metrics snapshot.
            Keys are model labels, values are dicts with metric names like
            `num_requests_running`, `num_requests_waiting`,
            `kv_cache_usage_perc`, `num_preemptions_total`.
    """

    # All extra fields default to "empty"/zero; mutable containers use
    # default_factory (dataclass rule) so instances never share state.
    measured_power_w: ThreePhase = field(default_factory=lambda: ThreePhase(a=0.0, b=0.0, c=0.0))
    measured_power_w_by_model: dict[str, float] = field(default_factory=dict)
    augmented_power_w_by_model: dict[str, float] = field(default_factory=dict)
    augmentation_factor_by_model: dict[str, float] = field(default_factory=dict)
    prometheus_metrics_by_model: dict[str, dict[str, float]] = field(default_factory=dict)

GPUEndpointMapping

Bases: BaseModel

Maps a zeusd endpoint to specific GPUs.

Attributes:

Name Type Description
host str

Hostname or IP of the zeusd instance.

port int

TCP port of the zeusd instance.

gpu_indices tuple[int, ...]

GPU device indices to monitor on this endpoint.

Source code in openg2g/datacenter/online.py
class GPUEndpointMapping(BaseModel):
    """Maps a zeusd endpoint to specific GPUs.

    Attributes:
        host: Hostname or IP of the zeusd instance.
        port: TCP port of the zeusd instance.
        gpu_indices: GPU device indices to monitor on this endpoint.
    """

    # frozen=True: instances are immutable after validation.
    model_config = ConfigDict(frozen=True)

    host: str
    port: int = 4938  # NOTE(review): presumably zeusd's default port — confirm
    gpu_indices: tuple[int, ...] = (0,)

    @property
    def endpoint_key(self) -> str:
        """Return the `host:port` key used by `PowerStreamingClient`."""
        return f"{self.host}:{self.port}"

endpoint_key property

Return the host:port key used by PowerStreamingClient.

VLLMDeployment

Bases: BaseModel

Deployment of one LLM model on a vLLM server.

Warning

vLLM must be a patched version with the POST /set_max_num_seqs endpoint implemented.

Pairs a reusable InferenceModelSpec with physical deployment details. simulated_num_replicas is the augmented replica count for grid simulation. The real replica count is derived from gpu_endpoints and spec.gpus_per_replica.

Tracks the current batch size (max_num_seqs) and provides set_batch_size() to update it on the vLLM server.

Attributes:

Name Type Description
spec InferenceModelSpec

Model specification (shared with offline datacenter).

simulated_num_replicas int

Number of replicas to simulate for grid power augmentation. Must be specified explicitly.

vllm_base_url str

Base URL of the vLLM server (e.g. http://node1:8000).

gpu_endpoints tuple[GPUEndpointMapping, ...]

GPU endpoint mappings for power monitoring.

request_extra_body dict[str, Any] | None

Extra fields merged into every request dict for this model (e.g. chat_template_kwargs).

initial_batch_size int

Starting batch size. The batch_size field is initialized from this value.

batch_size int

Current batch size (max_num_seqs). Initialized from initial_batch_size if not set explicitly.

Source code in openg2g/datacenter/online.py
class VLLMDeployment(BaseModel):
    """Deployment of one LLM model on a vLLM server.

    !!! Warning
        vLLM must be a patched version with the `POST /set_max_num_seqs`
        endpoint implemented.

    Pairs a reusable
    [`InferenceModelSpec`][openg2g.datacenter.config.InferenceModelSpec]
    with physical deployment details. `simulated_num_replicas` is the
    augmented replica count for grid simulation. The real replica
    count is derived from `gpu_endpoints` and `spec.gpus_per_replica`.

    Tracks the current batch size (`max_num_seqs`) and provides
    `set_batch_size()` to update it on the vLLM server.

    Attributes:
        spec: Model specification (shared with offline datacenter).
        simulated_num_replicas: Number of replicas to simulate for grid
            power augmentation. Must be specified explicitly.
        vllm_base_url: Base URL of the vLLM server (e.g. `http://node1:8000`).
        gpu_endpoints: GPU endpoint mappings for power monitoring.
        request_extra_body: Extra fields merged into every request dict
            for this model (e.g. `chat_template_kwargs`).
        initial_batch_size: Starting batch size. The `batch_size` field
            is initialized from this value.
        batch_size: Current batch size (`max_num_seqs`). Initialized from
            `initial_batch_size` if not set explicitly.
    """

    spec: InferenceModelSpec
    simulated_num_replicas: int
    initial_batch_size: int
    vllm_base_url: str
    gpu_endpoints: tuple[GPUEndpointMapping, ...] = ()
    request_extra_body: dict[str, Any] | None = None
    # 0 acts as an "unset" sentinel; model_post_init replaces it with
    # initial_batch_size.
    batch_size: int = 0

    def model_post_init(self, __context: Any) -> None:
        # Pydantic hook, runs after validation: resolve the sentinel.
        if self.batch_size == 0:
            self.batch_size = self.initial_batch_size

    @property
    def model_label(self) -> str:
        """Model label from the underlying spec."""
        return self.spec.model_label

    @property
    def num_real_gpus(self) -> int:
        """Total number of real GPUs for this model across all endpoints."""
        return sum(len(ep.gpu_indices) for ep in self.gpu_endpoints)

    @property
    def num_real_replicas(self) -> int:
        """Number of real replicas (real GPUs / GPUs per replica)."""
        # max(..., 1) guards against division by zero for a degenerate spec.
        return self.num_real_gpus // max(self.spec.gpus_per_replica, 1)

    @property
    def augmentation_factor(self) -> float:
        """Ratio of simulated replicas to real replicas."""
        # max(..., 1) keeps the factor finite when there are no real replicas.
        return self.simulated_num_replicas / max(self.num_real_replicas, 1)

    def set_batch_size(self, batch_size: int, ramp_up_rate: float = 0.0) -> None:
        """Update batch size on the vLLM server and track it locally.

        Sends `POST /set_max_num_seqs` to the vLLM server.

        Args:
            batch_size: New batch size (max_num_seqs) to set.
            ramp_up_rate: Optional ramp-up rate for gradual increase.

        Raises:
            Exception: Re-raised from the HTTP request on failure; the
                locally tracked `batch_size` is left unchanged in that case.
        """
        old = self.batch_size
        url = f"{self.vllm_base_url}/set_max_num_seqs?max_num_seqs={batch_size}"
        if ramp_up_rate > 0:
            url += f"&ramp_up_rate={ramp_up_rate}"
        try:
            req = urllib.request.Request(url, method="POST", data=b"")
            with urllib.request.urlopen(req, timeout=2.0) as resp:
                # NOTE(review): the default urllib opener already raises
                # HTTPError for status >= 400, so this is a defensive check.
                if resp.status >= 400:
                    raise RuntimeError(
                        f"Failed to set batch size {batch_size} on {self.vllm_base_url}: HTTP {resp.status}"
                    )
        except Exception:
            # Log with full context, then propagate so callers see the failure.
            logger.error(
                "Failed to set batch size %d on %s (keeping old=%d)",
                batch_size,
                self.vllm_base_url,
                old,
                exc_info=True,
            )
            raise
        # Only reached on success: commit the new value locally.
        self.batch_size = batch_size
        if old != batch_size:
            logger.info("Batch size %s: %d -> %d", self.model_label, old, batch_size)

num_real_gpus property

Total number of real GPUs for this model across all endpoints.

num_real_replicas property

Number of real replicas (real GPUs / GPUs per replica).

augmentation_factor property

Ratio of simulated replicas to real replicas.

set_batch_size(batch_size, ramp_up_rate=0.0)

Update batch size on the vLLM server and track it locally.

Sends POST /set_max_num_seqs to the vLLM server.

Parameters:

Name Type Description Default
batch_size int

New batch size (max_num_seqs) to set.

required
ramp_up_rate float

Optional ramp-up rate for gradual increase.

0.0
Source code in openg2g/datacenter/online.py
def set_batch_size(self, batch_size: int, ramp_up_rate: float = 0.0) -> None:
    """Update batch size on the vLLM server and track it locally.

    Sends `POST /set_max_num_seqs` to the vLLM server.

    Args:
        batch_size: New batch size (max_num_seqs) to set.
        ramp_up_rate: Optional ramp-up rate for gradual increase.

    Raises:
        Exception: Re-raised from the HTTP request on failure; the
            locally tracked `batch_size` is left unchanged in that case.
    """
    old = self.batch_size
    url = f"{self.vllm_base_url}/set_max_num_seqs?max_num_seqs={batch_size}"
    if ramp_up_rate > 0:
        url += f"&ramp_up_rate={ramp_up_rate}"
    try:
        req = urllib.request.Request(url, method="POST", data=b"")
        with urllib.request.urlopen(req, timeout=2.0) as resp:
            # NOTE(review): the default urllib opener already raises
            # HTTPError for status >= 400, so this is a defensive check.
            if resp.status >= 400:
                raise RuntimeError(
                    f"Failed to set batch size {batch_size} on {self.vllm_base_url}: HTTP {resp.status}"
                )
    except Exception:
        # Log with full context, then propagate so callers see the failure.
        logger.error(
            "Failed to set batch size %d on %s (keeping old=%d)",
            batch_size,
            self.vllm_base_url,
            old,
            exc_info=True,
        )
        raise
    # Only reached on success: commit the new value locally.
    self.batch_size = batch_size
    if old != batch_size:
        logger.info("Batch size %s: %d -> %d", self.model_label, old, batch_size)

LiveServerConfig

Bases: BaseModel

Configuration for interacting with live vLLM servers.

Groups settings related to load generation, ITL measurement, and Prometheus monitoring. The online counterpart of offline's trace/template data.

Attributes:

Name Type Description
requests_dir Path | None

Directory containing per-model JSONL request files (e.g. {model_label}.jsonl). If None, a minimal fallback request is used for each model.

prometheus_poll_interval_s float

How often to poll vLLM /metrics for request counts and saturation monitoring. Set to 0 to disable.

max_output_tokens int

Token limit for generated load requests (used by the fallback request when no JSONL requests are provided).

itl_window_s float

Sliding window for ITL averaging (seconds).

Source code in openg2g/datacenter/online.py
class LiveServerConfig(BaseModel):
    """Configuration for interacting with live vLLM servers.

    Groups settings related to load generation, ITL measurement, and
    Prometheus monitoring. The online counterpart of offline's
    trace/template data.

    Attributes:
        requests_dir: Directory containing per-model JSONL request files
            (e.g. `{model_label}.jsonl`). If `None`, a minimal fallback
            request is used for each model.
        prometheus_poll_interval_s: How often to poll vLLM /metrics for
            request counts and saturation monitoring. Set to 0 to disable.
        max_output_tokens: Token limit for generated load requests (used
            by the fallback request when no JSONL requests are provided).
        itl_window_s: Sliding window for ITL averaging (seconds).
    """

    # frozen=True: the config is immutable after construction.
    model_config = ConfigDict(frozen=True)

    requests_dir: Path | None = None
    prometheus_poll_interval_s: float = 0.5
    max_output_tokens: int = 512
    itl_window_s: float = 1.0

OnlineDatacenter

Bases: LLMBatchSizeControlledDatacenter[OnlineDatacenterState]

Live GPU datacenter backend with power augmentation.

Dispatches inference load to vLLM servers, streams GPU power from zeusd, measures ITL from streaming responses, and augments power readings to datacenter scale using the shared InferencePowerAugmenter pipeline (same as OfflineDatacenter).

Call start before the first step and stop after the simulation loop finishes.

PowerStreamingClient is constructed internally from the GPU endpoints declared in each deployment. Health checks are always performed during start.

Parameters:

Name Type Description Default
datacenter DatacenterConfig

Facility configuration (GPUs per server, base load).

required
deployments Sequence[VLLMDeployment]

Model deployments with physical hardware mapping.

required
dt_s Fraction

Simulation timestep (seconds).

Fraction(1, 10)
seed int

Random seed for layout generation and noise.

0
power_augmentation PowerAugmentationConfig | None

Per-server amplitude scaling and noise settings.

None
replica_schedules dict[str, ReplicaSchedule] | None

Per-model replica schedules. If None, all servers are active at their initial replica counts.

None
live_server LiveServerConfig | None

Configuration for interacting with live vLLM servers. Request data is loaded from LiveServerConfig.requests_dir.

None
Source code in openg2g/datacenter/online.py
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
class OnlineDatacenter(LLMBatchSizeControlledDatacenter[OnlineDatacenterState]):
    """Live GPU datacenter backend with power augmentation.

    Dispatches inference load to vLLM servers, streams GPU power from
    zeusd, measures ITL from streaming responses, and augments power
    readings to datacenter scale using the shared
    [`InferencePowerAugmenter`][openg2g.datacenter.workloads.inference.InferencePowerAugmenter]
    pipeline (same as
    [`OfflineDatacenter`][openg2g.datacenter.offline.OfflineDatacenter]).

    Call [`start`][.start] before the first [`step`][.step] and
    [`stop`][.stop] after the simulation loop finishes.

    `PowerStreamingClient` is constructed internally from the GPU
    endpoints declared in each deployment. Health checks are always
    performed during [`start`][.start].

    Args:
        datacenter: Facility configuration (GPUs per server, base load).
        deployments: Model deployments with physical hardware mapping.
        dt_s: Simulation timestep (seconds).
        seed: Random seed for layout generation and noise.
        power_augmentation: Per-server amplitude scaling and noise
            settings.
        replica_schedules: Per-model replica schedules. If `None`,
            all servers are active at their initial replica counts.
        live_server: Configuration for interacting with live vLLM
            servers. Request data is loaded from
            `LiveServerConfig.requests_dir`.
    """

    def __init__(
        self,
        datacenter: DatacenterConfig,
        deployments: Sequence[VLLMDeployment],
        *,
        name: str,
        dt_s: Fraction = Fraction(1, 10),
        seed: int = 0,
        power_augmentation: PowerAugmentationConfig | None = None,
        replica_schedules: dict[str, ReplicaSchedule] | None = None,
        live_server: LiveServerConfig | None = None,
    ) -> None:
        super().__init__(name=name)
        # Fill in config defaults so the rest of __init__ can assume non-None.
        if power_augmentation is None:
            power_augmentation = PowerAugmentationConfig()
        if live_server is None:
            live_server = LiveServerConfig()
        self._dt_s = dt_s
        self._seed = int(seed)
        self._deployments = list(deployments)
        self._deployment_map = {d.model_label: d for d in deployments}
        self._datacenter_config = datacenter
        self._power_augmentation = power_augmentation
        self._live_server_config = live_server

        # Config is expressed in kW per phase; internal bookkeeping is watts.
        self._base_W_per_phase = float(datacenter.base_kw_per_phase) * 1e3
        self._replica_schedules = replica_schedules or {}

        # Build one ZeusdConfig per distinct host:port endpoint, merging the
        # GPU indices of every deployment that shares that endpoint. The
        # config is re-created on each append so the last write carries the
        # full accumulated (de-duplicated, order-preserving) index list.
        servers_by_key: dict[str, ZeusdConfig] = {}
        gpu_indices_by_key: dict[str, list[int]] = {}
        for d in self._deployments:
            for ep in d.gpu_endpoints:
                key = ep.endpoint_key
                if key not in gpu_indices_by_key:
                    gpu_indices_by_key[key] = []
                for idx in ep.gpu_indices:
                    if idx not in gpu_indices_by_key[key]:
                        gpu_indices_by_key[key].append(idx)
                servers_by_key[key] = ZeusdConfig.tcp(
                    ep.host,
                    ep.port,
                    gpu_indices=gpu_indices_by_key[key],
                    cpu_indices=[],
                )
        self._power_client = PowerStreamingClient(servers=list(servers_by_key.values()))

        # Prometheus polling is optional; a non-positive interval disables it.
        self._prometheus = (
            _PrometheusPoller(
                deployments,
                poll_interval_s=live_server.prometheus_poll_interval_s,
            )
            if live_server.prometheus_poll_interval_s > 0
            else None
        )

        # Without requests_dir the load generator uses its built-in fallback
        # request per model (see LiveServerConfig.requests_dir).
        self._request_store = RequestStore.load(live_server.requests_dir) if live_server.requests_dir else None
        self._load_gen = _LoadGenerator(
            deployments,
            request_store=self._request_store,
            max_output_tokens=live_server.max_output_tokens,
            itl_window_s=live_server.itl_window_s,
            prometheus_poller=self._prometheus,
        )

        self._layout_rng = np.random.default_rng(self._seed)
        self._model_schedules: dict[str, ReplicaSchedule] = {}
        self._pool: ServerPool = self._build_server_pool()
        # Fixed seed offset gives the augmenter its own deterministic stream,
        # independent of the layout RNG.
        self._inference_augmenter = InferencePowerAugmenter(
            pool=self._pool,
            gpus_per_replica_by_model={d.spec.model_label: d.spec.gpus_per_replica for d in deployments},
            seed=self._seed + 12345,
        )
        # Capacity: STAGGER_BUFFER_S seconds of history assuming up to ~100
        # samples/s (NOTE(review): confirm actual poll rate), floored at 1000.
        self._rolling_buffer = _RollingPowerBuffer(
            [d.model_label for d in deployments],
            max_samples=max(int(STAGGER_BUFFER_S * 100), 1000),
        )
        self._last_allocation: dict[str, np.ndarray] = {}

        self._started = False

        logger.info(
            "OnlineDatacenter: %d deployments, dt=%s s",
            len(self._deployments),
            dt_s,
        )
        for d in deployments:
            logger.info(
                "  %s: %d real GPUs, %d simulated replicas (%.0fx augmentation), vllm=%s",
                d.model_label,
                d.num_real_gpus,
                d.simulated_num_replicas,
                d.augmentation_factor,
                d.vllm_base_url,
            )

    def _build_server_pool(self) -> ServerPool:
        """Build shared server pool for the datacenter.

        Also populates `self._model_schedules` as a side effect. All
        randomness comes from `self._layout_rng`, so with a fixed seed the
        layout is reproducible — the order of draws below must not change.

        Returns:
            The constructed `ServerPool` covering all simulated GPUs.
        """
        schedules = self._replica_schedules
        gpus_per_server = self._datacenter_config.gpus_per_server
        rng = self._layout_rng
        amp_lo, amp_hi = self._power_augmentation.amplitude_scale_range
        noise_fraction = self._power_augmentation.noise_fraction
        stagger_s = float(STAGGER_BUFFER_S)

        # Total servers from total simulated GPU count (at least one server).
        total_gpus = sum(d.simulated_num_replicas * d.spec.gpus_per_replica for d in self._deployments)
        num_servers = math.ceil(total_gpus / gpus_per_server) if total_gpus > 0 else 1

        # Store per-model schedules; models without an explicit schedule get
        # one initialized at their simulated replica count.
        for d in self._deployments:
            if d.simulated_num_replicas > 0:
                model_schedule = schedules.get(d.spec.model_label, ReplicaSchedule(initial=d.simulated_num_replicas))
                self._model_schedules[d.model_label] = model_schedule

        # Server properties (model-independent): split servers across the
        # three phases as evenly as possible, then shuffle so phase
        # assignment is not positional.
        sA, sB, sC = split_integer_evenly(num_servers, 3)
        phase_list = np.asarray(([0] * sA) + ([1] * sB) + ([2] * sC), dtype=int)
        rng.shuffle(phase_list)

        # Float stagger offsets in [0, STAGGER_BUFFER_S); max(..., 1e-9)
        # guards against a zero-width uniform range.
        stagger_offsets = rng.uniform(0.0, max(stagger_s, 1e-9), size=num_servers)
        amplitude_scales = rng.uniform(amp_lo, amp_hi, size=num_servers)

        # Per-model priority orderings (independent shuffle per model).
        model_priorities: dict[str, np.ndarray] = {}
        for d in self._deployments:
            priority = np.arange(num_servers, dtype=int)
            rng.shuffle(priority)
            model_priorities[d.model_label] = priority

        return ServerPool(
            num_servers=num_servers,
            gpus_per_server=gpus_per_server,
            phase_list=phase_list,
            stagger_offsets=stagger_offsets,
            amplitude_scales=amplitude_scales,
            noise_fraction=noise_fraction,
            model_priorities=model_priorities,
        )

    @property
    def dt_s(self) -> Fraction:
        """Simulation timestep in seconds, as an exact rational."""
        return self._dt_s

    @property
    def phase_share_by_model(self) -> dict[str, np.ndarray]:
        """Per-model phase share vectors from the last pool allocation."""
        result: dict[str, np.ndarray] = {}
        for model_label, indices in self._last_allocation.items():
            if len(indices) == 0:
                continue
            # Count allocated servers per phase (A=0, B=1, C=2).
            per_phase = np.bincount(self._pool.phase_list[indices], minlength=3).astype(float)
            n_allocated = per_phase.sum()
            if n_allocated > 0:
                result[model_label] = per_phase / n_allocated
            else:
                # Degenerate fallback: assume an even split across phases.
                result[model_label] = np.array([1 / 3, 1 / 3, 1 / 3], dtype=float)
        return result

    def reset(self) -> None:
        """Return the backend to its pre-`start` state so it can be restarted.

        Stops load generation if running, rebuilds the load generator,
        layout RNG, server pool, and power augmenter from the original
        seed and config, clears the rolling power buffer and last
        allocation, and restores each deployment's tracked batch size.
        """
        if self._started:
            self._load_gen.stop()
        self._load_gen = _LoadGenerator(
            self._deployments,
            request_store=self._request_store,
            max_output_tokens=self._live_server_config.max_output_tokens,
            itl_window_s=self._live_server_config.itl_window_s,
            prometheus_poller=self._prometheus,
        )
        # Re-seed so a reset run reproduces the original layout exactly.
        self._layout_rng = np.random.default_rng(self._seed)
        self._model_schedules = {}
        self._pool = self._build_server_pool()
        self._inference_augmenter = InferencePowerAugmenter(
            pool=self._pool,
            gpus_per_replica_by_model={d.spec.model_label: d.spec.gpus_per_replica for d in self._deployments},
            seed=self._seed + 12345,
        )
        self._rolling_buffer.clear()
        self._last_allocation = {}
        # Local bookkeeping only: the server-side batch size is re-applied
        # during the next `start()` (step 3 there).
        for d in self._deployments:
            d.batch_size = d.initial_batch_size
        self._started = False

    def start(self) -> None:
        """Start load generation, warm up servers, and fill the power buffer.

        Sequence:
            1. Run health checks on all vLLM servers and zeusd instances.
            2. Wait for at least one power reading per endpoint (10 s timeout).
            3. Set initial batch sizes on all vLLM servers.
            4. Start load generation threads.
            5. Warm up: poll power into the rolling buffer while waiting for
               each model's `num_requests_running` to reach 95% of its
               `initial_batch_size`. Fails after 60 s if any model does not
               saturate.

        Raises:
            RuntimeError: If already started, or (from `_warmup`) if a
                model fails to saturate in time.
        """
        if self._started:
            raise RuntimeError("OnlineDatacenter already started")

        logger.info("Starting OnlineDatacenter with %d deployments", len(self._deployments))

        # 1. Health checks
        logger.info("Running health checks...")
        for d in self._deployments:
            _check_vllm_health(d.vllm_base_url)
            _check_vllm_model(d.vllm_base_url, d.spec.model_id)
            for ep in d.gpu_endpoints:
                _check_zeusd_health(ep.host, ep.port)
        logger.info("All health checks passed")

        # 2. Wait for power readings from all endpoints
        all_endpoints: set[str] = set()
        for d in self._deployments:
            for ep in d.gpu_endpoints:
                all_endpoints.add(ep.endpoint_key)

        deadline = time.monotonic() + 10.0
        while time.monotonic() < deadline:
            readings = self._power_client.get_power()
            if all_endpoints.issubset(readings.keys()):
                logger.info("Power readings received from all %d endpoints", len(all_endpoints))
                break
            time.sleep(0.5)
        else:
            # while-else: reached only when the deadline expires without a
            # break. Missing endpoints are logged but do not abort startup.
            connected = set(self._power_client.get_power().keys())
            missing = all_endpoints - connected
            logger.warning("Timed out waiting for power readings from: %s", missing)

        # 3. Set initial batch sizes on vLLM servers
        for d in self._deployments:
            d.set_batch_size(d.initial_batch_size)

        # 4. Start load generation (and Prometheus poller)
        self._load_gen.start()
        logger.info("LoadGenerator started")

        # 5. Warm up: fill power buffer + wait for server saturation
        self._warmup()

        self._started = True
        logger.info("OnlineDatacenter ready")

    def _poll_power_into_buffer(self) -> tuple[float, dict[str, float]]:
        """Read GPU power from all endpoints and feed the rolling buffer.

        Returns:
            Tuple of (monotonic timestamp, per-model average per-GPU watts).
        """
        timestamp = time.monotonic()
        readings = self._power_client.get_power()
        avg_by_model: dict[str, float] = {}
        for dep in self._deployments:
            watts_total = 0.0
            gpu_count = 0
            for ep in dep.gpu_endpoints:
                reading = readings.get(ep.endpoint_key)
                if reading is None:
                    # Endpoint has not reported yet; skip its GPUs.
                    continue
                for gpu_idx in ep.gpu_indices:
                    if gpu_idx in reading.gpu_power_w:
                        watts_total += reading.gpu_power_w[gpu_idx]
                        gpu_count += 1
            # Average over GPUs that actually reported; 0.0 when none did.
            mean_w = watts_total / gpu_count if gpu_count > 0 else 0.0
            self._rolling_buffer.append(dep.model_label, timestamp, mean_w)
            avg_by_model[dep.model_label] = mean_w
        return timestamp, avg_by_model

    def _warmup(
        self,
        timeout_s: float = 60.0,
        saturation_threshold: float = 0.95,
        poll_interval_s: float = 0.1,
    ) -> None:
        """Fill the rolling power buffer and wait for vLLM server saturation.

        Actively polls GPU power to fill the rolling buffer while monitoring
        Prometheus `num_requests_running` to verify each model has reached
        `saturation_threshold` of its `initial_batch_size`.

        Completion requires both conditions for every model:
            1. `num_requests_running >= saturation_threshold * initial_batch_size`
            2. At least `stagger_buffer_s` has elapsed since that model first
               reached saturation (so the buffer contains a full stagger
               window of steady-state power data).

        If Prometheus polling is disabled, only condition 2 (buffer fill) is
        waited for, and a warning is emitted once.

        Args:
            timeout_s: Maximum warmup duration in seconds.
            saturation_threshold: Fraction of `initial_batch_size` that
                `num_requests_running` must reach (0.0-1.0).
            poll_interval_s: Seconds between power polls.

        Raises:
            RuntimeError: If any model fails to saturate within `timeout_s`.
                Includes the `num_requests_running` trajectory for failed
                models.
        """
        stagger_s = STAGGER_BUFFER_S
        logger.info(
            "Warming up: waiting for server saturation (%.0f%% of initial_batch_size) "
            "+ %.1f s buffer fill per model...",
            saturation_threshold * 100,
            stagger_s,
        )

        if self._prometheus is None:
            # FIX: previously this warning lived inside the poll loop and was
            # emitted every `poll_interval_s` (10x per second), flooding the
            # log. Emit it exactly once up front instead.
            logger.warning(
                "Prometheus polling is disabled; cannot verify server saturation. "
                "Waiting %.1f s for power buffer only.",
                stagger_s,
            )

        warmup_start = time.monotonic()
        deadline = warmup_start + timeout_s
        last_log = warmup_start

        # Per-model (elapsed_s, num_requests_running) samples for diagnostics.
        trajectory: dict[str, list[tuple[float, float]]] = {d.model_label: [] for d in self._deployments}
        # Monotonic time at which each model first reached saturation.
        saturation_time: dict[str, float | None] = {d.model_label: None for d in self._deployments}

        while time.monotonic() < deadline:
            now = time.monotonic()
            elapsed = now - warmup_start

            self._poll_power_into_buffer()

            all_ready = True
            if self._prometheus is not None:
                prom = self._prometheus.get_latest()
                for d in self._deployments:
                    label = d.model_label
                    running = prom.get(label, {}).get("num_requests_running", 0.0)
                    trajectory[label].append((elapsed, running))
                    target = d.initial_batch_size * saturation_threshold

                    if running >= target and saturation_time[label] is None:
                        saturation_time[label] = now
                        logger.info(
                            "  %s saturated at t=%.1f s (num_requests_running=%.0f)",
                            label,
                            elapsed,
                            running,
                        )

                    # Not ready until saturated AND a full stagger window of
                    # steady-state power has accumulated since saturation.
                    sat_t = saturation_time[label]
                    if sat_t is None or (now - sat_t) < stagger_s:
                        all_ready = False
            elif elapsed < stagger_s:
                # No Prometheus: only the power-buffer fill gates readiness.
                all_ready = False

            if all_ready:
                logger.info("Warmup complete in %.1f s", elapsed)
                return

            # Progress log, throttled to at most once every 10 s.
            if now - last_log >= 10.0:
                last_log = now
                if self._prometheus is not None:
                    prom = self._prometheus.get_latest()
                    for d in self._deployments:
                        label = d.model_label
                        running = prom.get(label, {}).get("num_requests_running", 0.0)
                        target = d.initial_batch_size
                        sat_t = saturation_time[label]
                        buf_s = (now - sat_t) if sat_t is not None else 0.0
                        logger.info(
                            "  Warmup %s: num_requests_running=%.0f / %d (%.0f%%), buffer=%.1f / %.1f s",
                            label,
                            running,
                            target,
                            running / max(target, 1) * 100,
                            buf_s,
                            stagger_s,
                        )

            time.sleep(poll_interval_s)

        # Timed out. Without Prometheus the only condition was buffer fill.
        if self._prometheus is None:
            raise RuntimeError(
                f"Warmup timed out after {timeout_s:.0f} s waiting for power buffer to fill ({stagger_s:.1f} s)"
            )

        # With Prometheus: report which models failed either condition,
        # including their num_requests_running trajectory (subsampled to
        # roughly one entry per 5 s, always ending with the final sample).
        prom = self._prometheus.get_latest()
        failed: list[str] = []
        for d in self._deployments:
            label = d.model_label
            running = prom.get(label, {}).get("num_requests_running", 0.0)
            sat_t = saturation_time[label]
            not_saturated = running < d.initial_batch_size * saturation_threshold
            not_buffered = sat_t is None or (time.monotonic() - sat_t) < stagger_s
            if not_saturated or not_buffered:
                failed.append(label)

        parts = [
            f"Warmup timed out after {timeout_s:.0f} s. "
            f"Models that failed to reach {saturation_threshold:.0%} of initial_batch_size:",
        ]
        for label in failed:
            target = self._deployment_map[label].initial_batch_size
            traj = trajectory[label]
            final = traj[-1][1] if traj else 0.0
            parts.append(f"  {label} (target: {target}, reached: {final:.0f}):")
            step = max(1, int(5.0 / poll_interval_s))
            samples = traj[::step]
            if traj and (not samples or samples[-1] is not traj[-1]):
                samples.append(traj[-1])
            entries = [f"t={t:.0f}s: {r:.0f}" for t, r in samples]
            parts.append("    " + ", ".join(entries))
        raise RuntimeError("\n".join(parts))

    def stop(self) -> None:
        """Stop load generation and power streaming."""
        # Shut down load generation first, then the power stream.
        for component in (self._load_gen, self._power_client):
            component.stop()
        self._started = False
        logger.info("OnlineDatacenter stopped")

    def step(self, clock: SimulationClock, events: EventEmitter) -> OnlineDatacenterState:
        """Read live power, augment to datacenter scale, and return state."""
        now, per_gpu_w_by_model = self._poll_power_into_buffer()

        # Scale each model's average per-GPU draw up to its real GPU count.
        measured_power_by_model = {
            d.model_label: per_gpu_w_by_model.get(d.model_label, 0.0) * d.num_real_gpus
            for d in self._deployments
        }
        augmentation_factor_by_model = {d.model_label: d.augmentation_factor for d in self._deployments}

        # Sample staggered per-server power and the scheduled replica count
        # for every model that has a schedule.
        per_gpu_by_model: dict[str, np.ndarray] = {}
        replica_counts: dict[str, int] = {}
        for d in self._deployments:
            label = d.model_label
            if label not in self._model_schedules:
                continue
            per_gpu_by_model[label] = self._rolling_buffer.sample_servers(
                label, now, self._pool.stagger_offsets
            )
            replica_counts[label] = int(round(self._model_schedules[label].count_at(clock.time_s)))

        inference_aug = self._inference_augmenter.augment(per_gpu_by_model, replica_counts)
        self._last_allocation = inference_aug.allocation

        # Measured (non-augmented) power is split evenly across the 3 phases.
        measured_per_phase = sum(measured_power_by_model.values()) / 3.0

        observed_itl = {
            d.model_label: self._load_gen.get_observed_itl(d.model_label) for d in self._deployments
        }

        prometheus_metrics = self._prometheus.get_latest() if self._prometheus is not None else {}

        base = self._base_W_per_phase
        return OnlineDatacenterState(
            time_s=clock.time_s,
            power_w=ThreePhase(
                a=base + inference_aug.power_w.a,
                b=base + inference_aug.power_w.b,
                c=base + inference_aug.power_w.c,
            ),
            batch_size_by_model={d.model_label: d.batch_size for d in self._deployments},
            active_replicas_by_model=inference_aug.active_replicas_by_model,
            observed_itl_s_by_model=observed_itl,
            measured_power_w=ThreePhase(
                a=measured_per_phase + base,
                b=measured_per_phase + base,
                c=measured_per_phase + base,
            ),
            measured_power_w_by_model=measured_power_by_model,
            augmented_power_w_by_model=inference_aug.power_by_model_w,
            augmentation_factor_by_model=augmentation_factor_by_model,
            prometheus_metrics_by_model=prometheus_metrics,
        )

    @functools.singledispatchmethod
    def apply_control(self, command: DatacenterCommand, events: EventEmitter) -> None:
        """Apply a control command. Dispatches on command type."""
        # Fallback for unregistered command types.
        command_name = type(command).__name__
        raise TypeError(f"OnlineDatacenter does not support {command_name}")

    @apply_control.register
    def apply_control_set_batch_size(self, command: SetBatchSize, events: EventEmitter) -> None:
        """Apply batch size command by sending HTTP requests to vLLM servers."""
        for raw_label, raw_batch in command.batch_size_by_model.items():
            model = str(raw_label)
            batch = int(raw_batch)
            if batch <= 0:
                raise ValueError(f"Batch size must be positive for model {model!r}, got {batch}.")
            if model not in self._deployment_map:
                raise ValueError(f"Unknown model label {model!r}. Known: {sorted(self._deployment_map)}")
            # Ramp rate defaults to 0.0 (no ramp) when not specified.
            ramp = command.ramp_up_rate_by_model.get(model, 0.0)
            self._deployment_map[model].set_batch_size(batch, ramp_up_rate=ramp)

        events.emit(
            "datacenter.batch_size.updated",
            {"batch_size_by_model": {d.model_label: d.batch_size for d in self._deployments}},
        )

phase_share_by_model property

Per-model phase share vectors from the last pool allocation.

start()

Start load generation, warm up servers, and fill the power buffer.

Sequence
  1. Run health checks on all vLLM servers and zeusd instances.
  2. Wait for at least one power reading per endpoint (10 s timeout).
  3. Set initial batch sizes on all vLLM servers.
  4. Start load generation threads.
  5. Warm up: poll power into the rolling buffer while waiting for each model's num_requests_running to reach 95% of its initial_batch_size. Fails after 60 s if any model does not saturate.
Source code in openg2g/datacenter/online.py
def start(self) -> None:
    """Start load generation, warm up servers, and fill the power buffer.

    Sequence:
        1. Run health checks on all vLLM servers and zeusd instances.
        2. Wait for at least one power reading per endpoint (10 s timeout).
        3. Set initial batch sizes on all vLLM servers.
        4. Start load generation threads.
        5. Warm up: poll power into the rolling buffer while waiting for
           each model's `num_requests_running` to reach 95% of its
           `initial_batch_size`. Fails after 60 s if any model does not
           saturate.
    """
    if self._started:
        raise RuntimeError("OnlineDatacenter already started")

    logger.info("Starting OnlineDatacenter with %d deployments", len(self._deployments))

    # 1. Health checks
    logger.info("Running health checks...")
    for d in self._deployments:
        _check_vllm_health(d.vllm_base_url)
        _check_vllm_model(d.vllm_base_url, d.spec.model_id)
        for ep in d.gpu_endpoints:
            _check_zeusd_health(ep.host, ep.port)
    logger.info("All health checks passed")

    # 2. Wait for power readings from all endpoints
    all_endpoints: set[str] = set()
    for d in self._deployments:
        for ep in d.gpu_endpoints:
            all_endpoints.add(ep.endpoint_key)

    deadline = time.monotonic() + 10.0
    while time.monotonic() < deadline:
        readings = self._power_client.get_power()
        if all_endpoints.issubset(readings.keys()):
            logger.info("Power readings received from all %d endpoints", len(all_endpoints))
            break
        time.sleep(0.5)
    else:
        connected = set(self._power_client.get_power().keys())
        missing = all_endpoints - connected
        logger.warning("Timed out waiting for power readings from: %s", missing)

    # 3. Set initial batch sizes on vLLM servers
    for d in self._deployments:
        d.set_batch_size(d.initial_batch_size)

    # 4. Start load generation (and Prometheus poller)
    self._load_gen.start()
    logger.info("LoadGenerator started")

    # 5. Warm up: fill power buffer + wait for server saturation
    self._warmup()

    self._started = True
    logger.info("OnlineDatacenter ready")

stop()

Stop load generation and power streaming.

Source code in openg2g/datacenter/online.py
def stop(self) -> None:
    """Stop load generation and power streaming."""
    self._load_gen.stop()
    self._power_client.stop()
    self._started = False
    logger.info("OnlineDatacenter stopped")

step(clock, events)

Read live power, augment to datacenter scale, and return state.

Source code in openg2g/datacenter/online.py
def step(self, clock: SimulationClock, events: EventEmitter) -> OnlineDatacenterState:
    """Read live power, augment to datacenter scale, and return state."""
    now, per_gpu_w_by_model = self._poll_power_into_buffer()

    measured_power_by_model: dict[str, float] = {}
    augmentation_factor_by_model: dict[str, float] = {}
    for d in self._deployments:
        label = d.model_label
        measured_power_by_model[label] = per_gpu_w_by_model.get(label, 0.0) * d.num_real_gpus
        augmentation_factor_by_model[label] = d.augmentation_factor

    per_gpu_by_model: dict[str, np.ndarray] = {}
    replica_counts: dict[str, int] = {}
    pool = self._pool
    for d in self._deployments:
        label = d.model_label
        if label not in self._model_schedules:
            continue
        per_gpu_by_model[label] = self._rolling_buffer.sample_servers(label, now, pool.stagger_offsets)
        schedule = self._model_schedules[label]
        replica_counts[label] = int(round(schedule.count_at(clock.time_s)))

    inference_aug = self._inference_augmenter.augment(per_gpu_by_model, replica_counts)
    self._last_allocation = inference_aug.allocation

    measured_total = sum(measured_power_by_model.values())
    measured_per_phase = measured_total / 3.0

    observed_itl: dict[str, float] = {
        d.model_label: self._load_gen.get_observed_itl(d.model_label) for d in self._deployments
    }

    prometheus_metrics: dict[str, dict[str, float]] = {}
    if self._prometheus is not None:
        prometheus_metrics = self._prometheus.get_latest()

    state = OnlineDatacenterState(
        time_s=clock.time_s,
        power_w=ThreePhase(
            a=self._base_W_per_phase + inference_aug.power_w.a,
            b=self._base_W_per_phase + inference_aug.power_w.b,
            c=self._base_W_per_phase + inference_aug.power_w.c,
        ),
        batch_size_by_model={d.model_label: d.batch_size for d in self._deployments},
        active_replicas_by_model=inference_aug.active_replicas_by_model,
        observed_itl_s_by_model=observed_itl,
        measured_power_w=ThreePhase(
            a=measured_per_phase + self._base_W_per_phase,
            b=measured_per_phase + self._base_W_per_phase,
            c=measured_per_phase + self._base_W_per_phase,
        ),
        measured_power_w_by_model=measured_power_by_model,
        augmented_power_w_by_model=inference_aug.power_by_model_w,
        augmentation_factor_by_model=augmentation_factor_by_model,
        prometheus_metrics_by_model=prometheus_metrics,
    )
    return state

apply_control(command, events)

Apply a control command. Dispatches on command type.

Source code in openg2g/datacenter/online.py
@functools.singledispatchmethod
def apply_control(self, command: DatacenterCommand, events: EventEmitter) -> None:
    """Apply a control command. Dispatches on command type."""
    raise TypeError(f"OnlineDatacenter does not support {type(command).__name__}")

apply_control_set_batch_size(command, events)

Apply batch size command by sending HTTP requests to vLLM servers.

Source code in openg2g/datacenter/online.py
@apply_control.register
def apply_control_set_batch_size(self, command: SetBatchSize, events: EventEmitter) -> None:
    """Apply batch size command by sending HTTP requests to vLLM servers."""
    for label, b in command.batch_size_by_model.items():
        label = str(label)
        b_int = int(b)
        if b_int <= 0:
            raise ValueError(f"Batch size must be positive for model {label!r}, got {b_int}.")
        if label not in self._deployment_map:
            raise ValueError(f"Unknown model label {label!r}. Known: {sorted(self._deployment_map)}")
        self._deployment_map[label].set_batch_size(
            b_int, ramp_up_rate=command.ramp_up_rate_by_model.get(label, 0.0)
        )

    events.emit(
        "datacenter.batch_size.updated",
        {"batch_size_by_model": {d.model_label: d.batch_size for d in self._deployments}},
    )

openg2g.datacenter.workloads.inference

Inference workload: power traces, templates, ITL fits, and augmentation.

MLEnergySource

Bases: BaseModel

Per-model ML.ENERGY benchmark data extraction settings.

Attributes:

Name Type Description
model_label str

Simulation label for the model.

task str

Benchmark task name (e.g. "lm-arena-chat", "gpqa").

gpu str

GPU model name (e.g. "H100").

batch_sizes tuple[int, ...]

Batch sizes to extract from the benchmark data.

fit_exclude_batch_sizes tuple[int, ...]

Batch sizes to exclude from logistic curve fitting (but still included in trace extraction).

Source code in openg2g/datacenter/workloads/inference.py
class MLEnergySource(BaseModel):
    """Per-model ML.ENERGY benchmark data extraction settings.

    Attributes:
        model_label: Simulation label for the model.
        task: Benchmark task name (e.g. `"lm-arena-chat"`, `"gpqa"`).
        gpu: GPU model name (e.g. `"H100"`).
        batch_sizes: Batch sizes to extract from the benchmark data.
        fit_exclude_batch_sizes: Batch sizes to exclude from logistic
            curve fitting (but still included in trace extraction).
    """

    # Instances are immutable once constructed.
    model_config = ConfigDict(frozen=True)

    model_label: str
    task: str
    gpu: str
    batch_sizes: tuple[int, ...]
    fit_exclude_batch_sizes: tuple[int, ...] = ()

InferenceTrace dataclass

A single power trace measurement.

Attributes:

Name Type Description
t_s ndarray

Time vector (seconds), monotonically increasing.

power_w ndarray

Total power vector (watts) across all measured GPUs, same length as t_s.

measured_gpus int

Number of GPUs used in the measurement.

Source code in openg2g/datacenter/workloads/inference.py
@dataclass(frozen=True)
class InferenceTrace:
    """A single power trace measurement.

    Attributes:
        t_s: Time vector (seconds), monotonically increasing.
        power_w: Total power vector (watts) across all measured GPUs,
            same length as `t_s`.
        measured_gpus: Number of GPUs used in the measurement.
    """

    t_s: np.ndarray
    power_w: np.ndarray
    measured_gpus: int

    def __post_init__(self) -> None:
        # Validate invariants once at construction so downstream code
        # can rely on aligned, sufficiently long vectors.
        n_time, n_power = len(self.t_s), len(self.power_w)
        if n_time != n_power:
            raise ValueError(f"t_s and power_w must have the same length, got {n_time} and {n_power}")
        if n_time < 5:
            raise ValueError("Trace too short (need at least 5 samples).")
        if self.measured_gpus < 1:
            raise ValueError(f"measured_gpus must be >= 1, got {self.measured_gpus}")

ITLFitStore

Per-model, per-batch-size ITL mixture distributions.

Indexed by (model_label, batch_size). Provides:

  • load: load fits from a CSV produced by the data pipeline
  • distributions: access as a nested dict
  • sample_avg: sample a fleet-average ITL value

Attributes:

Name Type Description
COL_MODEL_LABEL

Column name for model label in the CSV.

COL_BATCH_SIZE

Column name for batch size in the CSV.

Source code in openg2g/datacenter/workloads/inference.py
class ITLFitStore:
    """Per-model, per-batch-size ITL mixture distributions.

    Indexed by `(model_label, batch_size)`. Provides:

    - [`load`][.load]: load fits from a CSV produced by the data pipeline
    - [`distributions`][.distributions]: access as a nested dict
    - [`sample_avg`][.sample_avg]: sample a fleet-average ITL value

    Attributes:
        COL_MODEL_LABEL: Column name for model label in the CSV.
        COL_BATCH_SIZE: Column name for batch size in the CSV.
    """

    COL_MODEL_LABEL = "model_label"
    COL_BATCH_SIZE = "max_num_seqs"

    def __init__(
        self,
        distributions: dict[str, dict[int, ITLMixtureModel]],
        approx_sampling_thresh: int = 30,
    ) -> None:
        # Normalize keys defensively: labels to str, batch sizes to int.
        self._distributions = {
            str(label): {int(b): m for b, m in per_batch.items()} for label, per_batch in distributions.items()
        }
        self._approx_sampling_thresh = int(approx_sampling_thresh)

    @property
    def distributions(self) -> dict[str, dict[int, ITLMixtureModel]]:
        """Nested dict: `model_label -> batch_size -> ITLMixtureModel`."""
        return self._distributions

    def sample_avg(
        self,
        model_label: str,
        batch_size: int,
        n_replicas: int,
        rng: np.random.Generator,
    ) -> float:
        """Sample a fleet-average ITL for the given model and batch size.

        Uses `ITLMixtureModel.sample_avg` under the hood, with the
        `approx_sampling_thresh` set at construction time.

        Args:
            model_label: Model label string.
            batch_size: Current batch size.
            n_replicas: Number of active replicas.
            rng: NumPy random generator for sampling.

        Returns:
            Fleet-average ITL in seconds.

        Raises:
            KeyError: If model or batch size is not in the store.
        """
        model_dists = self._distributions.get(model_label)
        if model_dists is None:
            raise KeyError(f"No ITL distributions for model={model_label!r}")
        params = model_dists.get(int(batch_size))
        if params is None:
            raise KeyError(
                f"No ITL distributions for model={model_label!r}, batch={batch_size}. "
                f"Available={sorted(model_dists.keys())}"
            )
        return params.sample_avg(
            n_replicas=n_replicas,
            rng=rng,
            exact_threshold=self._approx_sampling_thresh,
        )

    @classmethod
    def load(cls, csv_path: Path | str, approx_sampling_thresh: int = 30) -> ITLFitStore:
        """Load ITL mixture fits from a CSV.

        Expected columns: `model_label`, `max_num_seqs`, plus the
        `itl_mix_*` parameter columns produced by
        `ITLMixtureModel.to_dict()`.

        Args:
            csv_path: Path to the latency fits CSV.
            approx_sampling_thresh: Replica count above which sampling
                uses a CLT normal approximation instead of drawing
                individual samples.
        """
        csv_path = Path(csv_path)
        df = pd.read_csv(csv_path)

        required_cols = [cls.COL_MODEL_LABEL, cls.COL_BATCH_SIZE]
        missing = [c for c in required_cols if c not in df.columns]
        if missing:
            raise ValueError(f"{csv_path} missing columns: {missing}. Got: {list(df.columns)}")

        distributions: dict[str, dict[int, ITLMixtureModel]] = {}
        for row in df.to_dict(orient="records"):
            label = str(row[cls.COL_MODEL_LABEL]).strip()
            batch = int(row[cls.COL_BATCH_SIZE])
            distributions.setdefault(label, {})[batch] = ITLMixtureModel.from_dict(row)

        if not distributions:
            raise ValueError(f"No ITL mixture rows loaded from {csv_path}")
        return cls(distributions, approx_sampling_thresh=approx_sampling_thresh)

    def save(self, csv_path: Path | str) -> None:
        """Save ITL mixture fits to a CSV.

        Args:
            csv_path: Output CSV path. Accepts `str` for consistency
                with [`load`][.load] (the body already coerced via
                `Path(csv_path)`; the annotation was needlessly narrow).
        """
        csv_path = Path(csv_path)
        csv_path.parent.mkdir(parents=True, exist_ok=True)
        rows: list[dict[str, Any]] = []
        # Deterministic row order: sorted by label, then batch size.
        for label in sorted(self._distributions):
            for batch in sorted(self._distributions[label]):
                model = self._distributions[label][batch]
                rows.append(
                    {
                        self.COL_MODEL_LABEL: label,
                        self.COL_BATCH_SIZE: batch,
                        "itl_dist": "lognormal_mixture_2",
                        **{f"itl_mix_{k}": v for k, v in model.to_dict().items()},
                    }
                )
        pd.DataFrame(rows).to_csv(csv_path, index=False)

distributions property

Nested dict: model_label -> batch_size -> ITLMixtureModel.

sample_avg(model_label, batch_size, n_replicas, rng)

Sample a fleet-average ITL for the given model and batch size.

Uses ITLMixtureModel.sample_avg under the hood, with the approx_sampling_thresh set at construction time.

Parameters:

Name Type Description Default
model_label str

Model label string.

required
batch_size int

Current batch size.

required
n_replicas int

Number of active replicas.

required
rng Generator

NumPy random generator for sampling.

required

Returns:

Type Description
float

Fleet-average ITL in seconds.

Raises:

Type Description
KeyError

If model or batch size is not in the store.

Source code in openg2g/datacenter/workloads/inference.py
def sample_avg(
    self,
    model_label: str,
    batch_size: int,
    n_replicas: int,
    rng: np.random.Generator,
) -> float:
    """Sample a fleet-average ITL for the given model and batch size.

    Uses `ITLMixtureModel.sample_avg` under the hood, with the
    `approx_sampling_thresh` set at construction time.

    Args:
        model_label: Model label string.
        batch_size: Current batch size.
        n_replicas: Number of active replicas.
        rng: NumPy random generator for sampling.

    Returns:
        Fleet-average ITL in seconds.

    Raises:
        KeyError: If model or batch size is not in the store.
    """
    model_dists = self._distributions.get(model_label)
    if model_dists is None:
        raise KeyError(f"No ITL distributions for model={model_label!r}")
    params = model_dists.get(int(batch_size))
    if params is None:
        raise KeyError(
            f"No ITL distributions for model={model_label!r}, batch={batch_size}. "
            f"Available={sorted(model_dists.keys())}"
        )
    return params.sample_avg(
        n_replicas=n_replicas,
        rng=rng,
        exact_threshold=self._approx_sampling_thresh,
    )

load(csv_path, approx_sampling_thresh=30) classmethod

Load ITL mixture fits from a CSV.

Expected columns: model_label, max_num_seqs, plus the itl_mix_* parameter columns produced by ITLMixtureModel.to_dict().

Parameters:

Name Type Description Default
csv_path Path | str

Path to the latency fits CSV.

required
approx_sampling_thresh int

Replica count above which sampling uses a CLT normal approximation instead of drawing individual samples.

30
Source code in openg2g/datacenter/workloads/inference.py
@classmethod
def load(cls, csv_path: Path | str, approx_sampling_thresh: int = 30) -> ITLFitStore:
    """Load ITL mixture fits from a CSV.

    Expected columns: `model_label`, `max_num_seqs`, plus the
    `itl_mix_*` parameter columns produced by
    `ITLMixtureModel.to_dict()`.

    Args:
        csv_path: Path to the latency fits CSV.
        approx_sampling_thresh: Replica count above which sampling
            uses a CLT normal approximation instead of drawing
            individual samples.
    """
    csv_path = Path(csv_path)
    df = pd.read_csv(csv_path)

    required_cols = [cls.COL_MODEL_LABEL, cls.COL_BATCH_SIZE]
    missing = [c for c in required_cols if c not in df.columns]
    if missing:
        raise ValueError(f"{csv_path} missing columns: {missing}. Got: {list(df.columns)}")

    distributions: dict[str, dict[int, ITLMixtureModel]] = {}
    for row in df.to_dict(orient="records"):
        label = str(row[cls.COL_MODEL_LABEL]).strip()
        batch = int(row[cls.COL_BATCH_SIZE])
        distributions.setdefault(label, {})[batch] = ITLMixtureModel.from_dict(row)

    if not distributions:
        raise ValueError(f"No ITL mixture rows loaded from {csv_path}")
    return cls(distributions, approx_sampling_thresh=approx_sampling_thresh)

save(csv_path)

Save ITL mixture fits to a CSV.

Parameters:

Name Type Description Default
csv_path Path

Output CSV path.

required
Source code in openg2g/datacenter/workloads/inference.py
def save(self, csv_path: Path) -> None:
    """Save ITL mixture fits to a CSV.

    Args:
        csv_path: Output CSV path.
    """
    csv_path = Path(csv_path)
    csv_path.parent.mkdir(parents=True, exist_ok=True)
    rows: list[dict[str, Any]] = []
    for label in sorted(self._distributions):
        for batch in sorted(self._distributions[label]):
            model = self._distributions[label][batch]
            rows.append(
                {
                    self.COL_MODEL_LABEL: label,
                    self.COL_BATCH_SIZE: batch,
                    "itl_dist": "lognormal_mixture_2",
                    **{f"itl_mix_{k}": v for k, v in model.to_dict().items()},
                }
            )
    pd.DataFrame(rows).to_csv(csv_path, index=False)

InferenceTemplateStore

Pre-built per-GPU power templates for a specific simulation config.

Created by InferenceTraceStore.build_templates. Use template to look up a template by model label and batch size.

Source code in openg2g/datacenter/workloads/inference.py
class InferenceTemplateStore:
    """Pre-built per-GPU power templates for a specific simulation config.

    Created by [`InferenceTraceStore.build_templates`][..InferenceTraceStore.build_templates].
    Use [`template`][.template] to look up a template by model label and batch size.
    """

    def __init__(
        self,
        templates: dict[tuple[str, int], np.ndarray],
        batch_sizes_by_model: dict[str, list[int]],
    ) -> None:
        self._templates = templates
        self._batch_sizes_by_model = batch_sizes_by_model

    def template(self, model_label: str, batch_size: int) -> np.ndarray:
        """Return a pre-built per-GPU power template."""
        lookup_key = (str(model_label), int(batch_size))
        try:
            return self._templates[lookup_key]
        except KeyError:
            raise KeyError(f"No template for model={model_label!r}, batch={batch_size}.") from None

    def batch_sizes(self, model_label: str) -> list[int]:
        """List of batch sizes available for a model."""
        sizes = self._batch_sizes_by_model.get(model_label)
        if sizes is None:
            raise KeyError(f"Unknown model: {model_label!r}")
        # Return a copy so callers cannot mutate internal state.
        return list(sizes)

template(model_label, batch_size)

Return a pre-built per-GPU power template.

Source code in openg2g/datacenter/workloads/inference.py
def template(self, model_label: str, batch_size: int) -> np.ndarray:
    """Look up the pre-built per-GPU power template for a (model, batch) pair."""
    wanted = (str(model_label), int(batch_size))
    try:
        return self._templates[wanted]
    except KeyError:
        raise KeyError(f"No template for model={model_label!r}, batch={batch_size}.") from None

batch_sizes(model_label)

List of batch sizes available for a model.

Source code in openg2g/datacenter/workloads/inference.py
def batch_sizes(self, model_label: str) -> list[int]:
    """Return a copy of the batch sizes available for *model_label*."""
    sizes = self._batch_sizes_by_model.get(model_label)
    if sizes is None:
        raise KeyError(f"Unknown model: {model_label!r}")
    return [*sizes]

InferenceTraceStore

Manages raw power traces loaded from CSV files.

Indexed by (model_label, batch_size). Provides: load (load traces discovered via a manifest CSV) and build_templates (build per-GPU power templates for a specific simulation config, returning an InferenceTemplateStore).

Source code in openg2g/datacenter/workloads/inference.py
class InferenceTraceStore:
    """Manages raw power traces loaded from CSV files.

    Indexed by `(model_label, batch_size)`. Provides:

    - [`load`][.load]: load traces discovered via a manifest CSV
    - [`build_templates`][.build_templates]: build per-GPU power
      templates for a specific simulation config, returning an
      [`InferenceTemplateStore`][..InferenceTemplateStore]
    """

    # Manifest CSV column names (one manifest row per trace file).
    MANIFEST_COL_MODEL_LABEL = "model_label"
    MANIFEST_COL_NUM_GPUS = "num_gpus"
    MANIFEST_COL_BATCH_SIZE = "max_num_seqs"
    MANIFEST_COL_TRACE_FILE = "trace_file"
    # Per-trace CSV column names.
    TRACE_COL_TIME = "relative_time_s"
    TRACE_COL_POWER = "power_total_W"

    def __init__(self, traces: dict[str, dict[int, InferenceTrace]]) -> None:
        """Store traces keyed by model label, then batch size.

        Keys are normalized (labels to `str`, batch sizes to `int`) so
        later lookups behave consistently regardless of the caller's types.
        """
        self._traces = {str(label): {int(b): tr for b, tr in per_batch.items()} for label, per_batch in traces.items()}

    @classmethod
    def load(cls, manifest: Path) -> InferenceTraceStore:
        """Load traces discovered via a manifest CSV.

        Trace file paths in the manifest are resolved relative to the
        manifest file's parent directory.

        Args:
            manifest: Path to the manifest CSV (e.g. `traces_summary.csv`).
                Expected columns: `model_label`, `num_gpus`, `max_num_seqs`,
                `trace_file`.

        Raises:
            ValueError: If the manifest or a trace CSV lacks required columns.
            FileNotFoundError: If a referenced trace file does not exist.
        """
        manifest = Path(manifest)
        base_dir = manifest.parent
        df = pd.read_csv(manifest)

        # Fail fast with a clear message if the manifest schema is wrong.
        required_cols = [
            cls.MANIFEST_COL_MODEL_LABEL,
            cls.MANIFEST_COL_NUM_GPUS,
            cls.MANIFEST_COL_BATCH_SIZE,
            cls.MANIFEST_COL_TRACE_FILE,
        ]
        missing = [c for c in required_cols if c not in df.columns]
        if missing:
            raise ValueError(f"Manifest {manifest} missing columns: {missing}. Got: {list(df.columns)}")

        traces: dict[str, dict[int, InferenceTrace]] = {}
        for row in df.to_dict(orient="records"):
            label = str(row[cls.MANIFEST_COL_MODEL_LABEL])
            num_gpus = int(row[cls.MANIFEST_COL_NUM_GPUS])
            batch = int(row[cls.MANIFEST_COL_BATCH_SIZE])
            trace_path = base_dir / str(row[cls.MANIFEST_COL_TRACE_FILE])

            if not trace_path.exists():
                raise FileNotFoundError(f"Trace file not found: {trace_path} (model={label}, batch={batch})")

            tdf = pd.read_csv(trace_path)
            if cls.TRACE_COL_TIME not in tdf.columns or cls.TRACE_COL_POWER not in tdf.columns:
                raise ValueError(
                    f"{trace_path} must contain {cls.TRACE_COL_TIME!r} and "
                    f"{cls.TRACE_COL_POWER!r}. Got: {list(tdf.columns)}"
                )

            t = tdf[cls.TRACE_COL_TIME].to_numpy(float)
            p = tdf[cls.TRACE_COL_POWER].to_numpy(float)
            # Sort samples by time if any timestamps are out of order.
            if np.any(np.diff(t) < 0):
                idx = np.argsort(t)
                t, p = t[idx], p[idx]

            traces.setdefault(label, {})[batch] = InferenceTrace(
                t_s=t,
                power_w=p,
                measured_gpus=num_gpus,
            )

        return cls(traces)

    def build_templates(
        self,
        *,
        duration_s: Fraction | float,
        dt_s: Fraction | float,
        steady_skip_s: float = 0.0,
    ) -> InferenceTemplateStore:
        """Build per-GPU power templates for all traces.

        Args:
            duration_s: Total simulation duration (seconds).
            dt_s: Simulation timestep (seconds).
            steady_skip_s: Skip this many seconds from the start of each
                trace to avoid warm-up transients.

        Returns:
            An [`InferenceTemplateStore`][openg2g.datacenter.workloads.inference.InferenceTemplateStore]
                holding the built templates.
        """
        templates: dict[tuple[str, int], np.ndarray] = {}
        batch_sizes_by_model: dict[str, list[int]] = {}
        for label, per_batch in self._traces.items():
            batch_sizes_by_model[label] = sorted(per_batch.keys())
            for batch, tr in per_batch.items():
                tpl = _build_per_gpu_power_template(
                    tr,
                    dt_s=dt_s,
                    duration_s=duration_s,
                    steady_skip_s=steady_skip_s,
                )
                templates[(label, batch)] = tpl
        return InferenceTemplateStore(templates, batch_sizes_by_model)

    def save(self, out_dir: Path) -> None:
        """Save traces and manifest CSV to a directory.

        Writes individual trace CSVs to `out_dir/traces/` and a manifest
        CSV at `out_dir/traces_summary.csv`.

        Args:
            out_dir: Output directory.
        """
        out_dir = Path(out_dir)
        traces_dir = out_dir / "traces"
        traces_dir.mkdir(parents=True, exist_ok=True)

        # One CSV per (model, batch) trace; manifest rows reference them
        # with paths relative to out_dir so the directory is relocatable.
        summary_rows: list[dict[str, Any]] = []
        for label in sorted(self._traces):
            for batch in sorted(self._traces[label]):
                tr = self._traces[label][batch]
                trace_name = f"{label}_num_gpus_{tr.measured_gpus}_max_num_seqs_{batch}.csv"
                pd.DataFrame(
                    {
                        self.TRACE_COL_TIME: tr.t_s,
                        self.TRACE_COL_POWER: tr.power_w,
                    }
                ).to_csv(traces_dir / trace_name, index=False)
                summary_rows.append(
                    {
                        self.MANIFEST_COL_MODEL_LABEL: label,
                        self.MANIFEST_COL_NUM_GPUS: tr.measured_gpus,
                        self.MANIFEST_COL_BATCH_SIZE: batch,
                        self.MANIFEST_COL_TRACE_FILE: f"traces/{trace_name}",
                    }
                )
        pd.DataFrame(summary_rows).to_csv(out_dir / "traces_summary.csv", index=False)

load(manifest) classmethod

Load traces discovered via a manifest CSV.

Trace file paths in the manifest are resolved relative to the manifest file's parent directory.

Parameters:

Name Type Description Default
manifest Path

Path to the manifest CSV (e.g. traces_summary.csv). Expected columns: model_label, num_gpus, max_num_seqs, trace_file.

required
Source code in openg2g/datacenter/workloads/inference.py
@classmethod
def load(cls, manifest: Path) -> InferenceTraceStore:
    """Load traces discovered via a manifest CSV.

    Trace file paths in the manifest are resolved relative to the
    manifest file's parent directory.

    Args:
        manifest: Path to the manifest CSV (e.g. `traces_summary.csv`).
            Expected columns: `model_label`, `num_gpus`, `max_num_seqs`,
            `trace_file`.

    Raises:
        ValueError: If the manifest or a trace CSV lacks required columns.
        FileNotFoundError: If a referenced trace file does not exist.
    """
    manifest = Path(manifest)
    base_dir = manifest.parent
    df = pd.read_csv(manifest)

    # Fail fast with a clear message if the manifest schema is wrong.
    required_cols = [
        cls.MANIFEST_COL_MODEL_LABEL,
        cls.MANIFEST_COL_NUM_GPUS,
        cls.MANIFEST_COL_BATCH_SIZE,
        cls.MANIFEST_COL_TRACE_FILE,
    ]
    missing = [c for c in required_cols if c not in df.columns]
    if missing:
        raise ValueError(f"Manifest {manifest} missing columns: {missing}. Got: {list(df.columns)}")

    traces: dict[str, dict[int, InferenceTrace]] = {}
    for row in df.to_dict(orient="records"):
        label = str(row[cls.MANIFEST_COL_MODEL_LABEL])
        num_gpus = int(row[cls.MANIFEST_COL_NUM_GPUS])
        batch = int(row[cls.MANIFEST_COL_BATCH_SIZE])
        trace_path = base_dir / str(row[cls.MANIFEST_COL_TRACE_FILE])

        if not trace_path.exists():
            raise FileNotFoundError(f"Trace file not found: {trace_path} (model={label}, batch={batch})")

        tdf = pd.read_csv(trace_path)
        if cls.TRACE_COL_TIME not in tdf.columns or cls.TRACE_COL_POWER not in tdf.columns:
            raise ValueError(
                f"{trace_path} must contain {cls.TRACE_COL_TIME!r} and "
                f"{cls.TRACE_COL_POWER!r}. Got: {list(tdf.columns)}"
            )

        t = tdf[cls.TRACE_COL_TIME].to_numpy(float)
        p = tdf[cls.TRACE_COL_POWER].to_numpy(float)
        # Sort samples by time if any timestamps are out of order.
        if np.any(np.diff(t) < 0):
            idx = np.argsort(t)
            t, p = t[idx], p[idx]

        traces.setdefault(label, {})[batch] = InferenceTrace(
            t_s=t,
            power_w=p,
            measured_gpus=num_gpus,
        )

    return cls(traces)

build_templates(*, duration_s, dt_s, steady_skip_s=0.0)

Build per-GPU power templates for all traces.

Parameters:

Name Type Description Default
duration_s Fraction | float

Total simulation duration (seconds).

required
dt_s Fraction | float

Simulation timestep (seconds).

required
steady_skip_s float

Skip this many seconds from the start of each trace to avoid warm-up transients.

0.0

Returns:

Type Description
InferenceTemplateStore

An InferenceTemplateStore holding the built templates.

Source code in openg2g/datacenter/workloads/inference.py
def build_templates(
    self,
    *,
    duration_s: Fraction | float,
    dt_s: Fraction | float,
    steady_skip_s: float = 0.0,
) -> InferenceTemplateStore:
    """Build per-GPU power templates for every stored trace.

    Args:
        duration_s: Total simulation duration (seconds).
        dt_s: Simulation timestep (seconds).
        steady_skip_s: Seconds to drop from the start of each trace so
            warm-up transients do not leak into the templates.

    Returns:
        An [`InferenceTemplateStore`][openg2g.datacenter.workloads.inference.InferenceTemplateStore]
            holding one template per (model label, batch size) pair.
    """
    batch_sizes_by_model = {label: sorted(per_batch) for label, per_batch in self._traces.items()}
    templates = {
        (label, batch): _build_per_gpu_power_template(
            trace,
            dt_s=dt_s,
            duration_s=duration_s,
            steady_skip_s=steady_skip_s,
        )
        for label, per_batch in self._traces.items()
        for batch, trace in per_batch.items()
    }
    return InferenceTemplateStore(templates, batch_sizes_by_model)

save(out_dir)

Save traces and manifest CSV to a directory.

Writes individual trace CSVs to out_dir/traces/ and a manifest CSV at out_dir/traces_summary.csv.

Parameters:

Name Type Description Default
out_dir Path

Output directory.

required
Source code in openg2g/datacenter/workloads/inference.py
def save(self, out_dir: Path) -> None:
    """Save traces and a manifest CSV to a directory.

    Each trace goes to `out_dir/traces/<name>.csv`; a manifest describing
    all of them is written to `out_dir/traces_summary.csv` with trace
    paths stored relative to `out_dir`.

    Args:
        out_dir: Output directory (created if missing).
    """
    root = Path(out_dir)
    trace_dir = root / "traces"
    trace_dir.mkdir(parents=True, exist_ok=True)

    manifest_rows: list[dict[str, Any]] = []
    for label in sorted(self._traces):
        per_batch = self._traces[label]
        for batch in sorted(per_batch):
            tr = per_batch[batch]
            fname = f"{label}_num_gpus_{tr.measured_gpus}_max_num_seqs_{batch}.csv"
            frame = pd.DataFrame(
                {
                    self.TRACE_COL_TIME: tr.t_s,
                    self.TRACE_COL_POWER: tr.power_w,
                }
            )
            frame.to_csv(trace_dir / fname, index=False)
            manifest_rows.append(
                {
                    self.MANIFEST_COL_MODEL_LABEL: label,
                    self.MANIFEST_COL_NUM_GPUS: tr.measured_gpus,
                    self.MANIFEST_COL_BATCH_SIZE: batch,
                    self.MANIFEST_COL_TRACE_FILE: f"traces/{fname}",
                }
            )
    pd.DataFrame(manifest_rows).to_csv(root / "traces_summary.csv", index=False)

InferenceData

LLM inference workload with offline simulation data.

Bundles model specifications with power templates and latency distributions. Validates that all models have matching data entries.

Parameters:

Name Type Description Default
models tuple[InferenceModelSpec, ...]

Model specifications as a tuple of InferenceModelSpec.

required
power_templates InferenceTemplateStore

Pre-built per-GPU power templates for all models and batch sizes, created via InferenceTraceStore.build_templates.

required
itl_fits ITLFitStore | None

Per-model ITL mixture distributions. Required when using controllers that read observed latency (e.g., OFOBatchSizeController). When omitted, NaN is reported for observed latency.

None
Source code in openg2g/datacenter/workloads/inference.py
class InferenceData:
    """LLM inference workload with offline simulation data.

    Bundles model specifications with power templates and latency
    distributions. Validates that all models have matching data entries.

    Args:
        models: Model specifications as a tuple of
            [`InferenceModelSpec`][openg2g.datacenter.config.InferenceModelSpec].
        power_templates: Pre-built per-GPU power templates for all models
            and batch sizes, created via
            [`InferenceTraceStore.build_templates`][..InferenceTraceStore.build_templates].
        itl_fits: Per-model ITL mixture distributions. Required when using
            controllers that read observed latency (e.g.,
            `OFOBatchSizeController`). When omitted, NaN is reported for
            observed latency.

    Raises:
        TypeError: If `power_templates` is an `InferenceTraceStore` instead
            of a built `InferenceTemplateStore`.
        ValueError: If `models` is empty, has duplicate labels, or lacks a
            matching template (or ITL fit, when `itl_fits` is given) entry.
    """

    def __init__(
        self,
        models: tuple[InferenceModelSpec, ...],
        *,
        power_templates: InferenceTemplateStore,
        itl_fits: ITLFitStore | None = None,
    ) -> None:
        # Guard against the common mistake of passing the raw trace store.
        if isinstance(power_templates, InferenceTraceStore):
            raise TypeError(
                "Expected a InferenceTemplateStore, got InferenceTraceStore. "
                "Call InferenceTraceStore.build_templates() first to create a InferenceTemplateStore."
            )
        if not models:
            raise ValueError("models must not be empty.")
        labels = [ms.model_label for ms in models]
        if len(labels) != len(set(labels)):
            raise ValueError(f"Duplicate model labels: {labels}")

        self._models = models
        self._power_templates: InferenceTemplateStore | None = power_templates
        self._trace_store: InferenceTraceStore | None = None
        self._itl_fit_store: ITLFitStore | None = None
        self._itl_fits = itl_fits
        self._itl_samples_df: pd.DataFrame | None = None

        # Probe each model's entry; a KeyError means its traces are missing.
        for ms in self._models:
            try:
                power_templates.batch_sizes(ms.model_label)
            except KeyError:
                raise ValueError(
                    f"Power templates missing for model {ms.model_label!r}. "
                    f"Ensure InferenceTraceStore contains traces for all models."
                ) from None

            if itl_fits is not None and ms.model_label not in itl_fits.distributions:
                raise ValueError(
                    f"ITL fits missing for model {ms.model_label!r}. "
                    f"Available models in ITLFitStore: {sorted(itl_fits.distributions.keys())}"
                )

    def filter_models(
        self,
        models: tuple[InferenceModelSpec, ...],
    ) -> InferenceData:
        """Return a new InferenceData containing only the specified models.

        Args:
            models: Model specifications to keep; their labels select which
                power templates and ITL fits are retained.

        Raises:
            RuntimeError: If this is a generate-only instance without built
                power templates (load from disk first).
        """
        labels = {ms.model_label for ms in models}

        # Go through the property (not the private attribute) so a
        # generate-only instance fails with a clear RuntimeError rather
        # than an AttributeError on None.
        template_store = self.power_templates

        # Filter power templates
        filtered_templates = {k: v for k, v in template_store._templates.items() if k[0] in labels}
        filtered_batch_sizes = {k: v for k, v in template_store._batch_sizes_by_model.items() if k in labels}
        new_templates = InferenceTemplateStore(filtered_templates, filtered_batch_sizes)

        # Filter ITL fits
        new_itl_fits = None
        if self._itl_fits is not None:
            filtered_dists = {k: v for k, v in self._itl_fits.distributions.items() if k in labels}
            new_itl_fits = ITLFitStore(filtered_dists)

        return InferenceData(models, power_templates=new_templates, itl_fits=new_itl_fits)

    @classmethod
    def generate(
        cls,
        models: tuple[InferenceModelSpec, ...],
        data_sources: dict[str, MLEnergySource],
        *,
        runs: Any = None,
        mlenergy_data_dir: Path | None = None,
        dt_s: float = 0.1,
        seed: int = 0,
        itl_sample_cap: int = 2048,
    ) -> InferenceData:
        """Generate inference data from ML.ENERGY benchmark data.

        Produces power traces and ITL mixture fits for all models and
        batch sizes specified in `data_sources`.

        Args:
            models: Model specifications.
            data_sources: Per-model benchmark data extraction settings,
                keyed by `model_label`.
            runs: Pre-loaded `LLMRuns` object. If `None`, loads from
                `mlenergy_data_dir` or the HuggingFace Hub.
            mlenergy_data_dir: Path to compiled mlenergy-data directory.
                Ignored if `runs` is provided.
            dt_s: Trace timestep (seconds).
            seed: Random seed for ITL fitting.
            itl_sample_cap: Maximum ITL samples per run for fitting.

        Returns:
            A new `InferenceData` with generated traces and ITL fits (no
            templates -- call `InferenceTraceStore.build_templates()` on the
            saved/loaded store to get templates).
        """
        # Resolve benchmark runs: explicit arg > local directory > HF Hub.
        if runs is None:
            unique_tasks = {src.task for src in data_sources.values()}
            if mlenergy_data_dir:
                logger.info("Loading runs from %s (tasks: %s)", mlenergy_data_dir, sorted(unique_tasks))
                runs = LLMRuns.from_directory(str(mlenergy_data_dir), stable_only=False).task(*unique_tasks)
            else:
                logger.info("Loading runs from Hugging Face Hub (tasks: %s)", sorted(unique_tasks))
                runs = LLMRuns.from_hf(stable_only=False).task(*unique_tasks)
        if not runs:
            raise ValueError("No runs found for the specified tasks")

        subsets_by_label: dict[str, Any] = {}
        tl_frames: list[pd.DataFrame] = []
        itl_frames: list[pd.DataFrame] = []

        # Select the matching benchmark subset for each model up front so a
        # bad config fails before any downloads start.
        for ms in models:
            src = data_sources.get(ms.model_label)
            if src is None:
                raise ValueError(f"No data source for model {ms.model_label!r}")
            model_id = ms.model_id
            if not model_id:
                raise ValueError(f"model_id is required for data generation (model={ms.model_label!r})")

            subset = (
                runs.model_id(model_id).gpu_model(src.gpu).num_gpus(ms.gpus_per_replica).max_num_seqs(*src.batch_sizes)
            )
            if not subset:
                raise ValueError(
                    f"Config matched zero runs: model_id={model_id!r}, "
                    f"gpu={src.gpu!r}, num_gpus={ms.gpus_per_replica}, "
                    f"batch_sizes={src.batch_sizes}"
                )
            subsets_by_label[ms.model_label] = subset
            logger.info(
                "%s: %d runs (model_id=%s, gpu=%s, num_gpus=%d, batches=%s)",
                ms.model_label,
                len(subset),
                model_id,
                src.gpu,
                ms.gpus_per_replica,
                sorted({r.max_num_seqs for r in subset}),
            )

        logger.info("Downloading raw result files for %d models ...", len(subsets_by_label))
        for subset in subsets_by_label.values():
            subset.download_raw_files(file="results")
        logger.info("Downloads complete. Extracting timelines and ITL samples ...")

        for label, subset in subsets_by_label.items():
            for run in subset:
                tl = run.timelines(metric="power.device_instant")
                tl["model_label"] = label
                tl["num_gpus"] = run.num_gpus
                tl["max_num_seqs"] = run.max_num_seqs
                # Globally unique run index, assigned before appending.
                tl["run_index"] = len(tl_frames)
                tl_frames.append(tl)

            itl = subset.inter_token_latencies()
            itl["model_label"] = label
            itl_frames.append(itl)

        all_tl = pd.concat(tl_frames, ignore_index=True)
        itl_samples_df = pd.concat(itl_frames, ignore_index=True)
        logger.info("Building trace store (%d timeline rows) and fitting ITL models ...", len(all_tl))

        trace_store = _build_trace_store_from_timelines(all_tl, dt_s=dt_s)
        itl_fit_store = _build_itl_fit_store(itl_samples_df, max_samples=itl_sample_cap, seed=seed)

        return cls._from_stores(
            models,
            trace_store=trace_store,
            itl_fit_store=itl_fit_store,
            itl_samples_df=itl_samples_df,
        )

    @classmethod
    def _from_stores(
        cls,
        models: tuple[InferenceModelSpec, ...],
        *,
        trace_store: InferenceTraceStore,
        itl_fit_store: ITLFitStore,
        itl_samples_df: pd.DataFrame | None = None,
    ) -> InferenceData:
        """Create from raw stores (internal, used by generate).

        Bypasses `__init__` deliberately: a generate-only instance has no
        built templates, so the normal validation would reject it.
        """
        instance = object.__new__(cls)
        instance._models = models
        instance._trace_store = trace_store
        instance._itl_fit_store = itl_fit_store
        instance._power_templates = None
        instance._itl_fits = itl_fit_store
        instance._itl_samples_df = itl_samples_df
        return instance

    def save(self, out_dir: Path, *, plot: bool = False) -> None:
        """Save traces and ITL fits to a directory.

        Args:
            out_dir: Output directory.
            plot: If `True`, also write characterization plots (power
                trajectories, ITL distributions).
        """
        out_dir = Path(out_dir)
        out_dir.mkdir(parents=True, exist_ok=True)
        if self._trace_store is not None:
            self._trace_store.save(out_dir)
        if self._itl_fits is not None:
            self._itl_fits.save(out_dir / "latency_fits.csv")

        # Stamp the directory with the generating version for later checks.
        (out_dir / "_manifest.json").write_text(
            json.dumps({"openg2g_version": openg2g.__version__}, indent=2, sort_keys=True)
        )

        if plot and self._trace_store is not None:
            _plot_power_trajectories(self._trace_store, self._models, out_dir)
            itl_samples = self._itl_samples_df
            if self._itl_fit_store is not None and itl_samples is not None:
                for ms in self._models:
                    _plot_itl_distributions(self._itl_fit_store, itl_samples, ms.model_label, out_dir)

    @classmethod
    def load(
        cls,
        data_dir: Path,
        models: tuple[InferenceModelSpec, ...],
        *,
        duration_s: float = 600.0,
        dt_s: float = 0.1,
        steady_skip_s: float = 0.0,
    ) -> InferenceData:
        """Load from a generated data directory.

        Loads traces from `traces_summary.csv`, builds templates, and
        loads ITL fits from `latency_fits.csv`.

        Args:
            data_dir: Directory containing generated data.
            models: Model specifications.
            duration_s: Simulation duration for template building.
            dt_s: Simulation timestep for template building.
            steady_skip_s: Skip seconds for template building.
        """
        data_dir = Path(data_dir)
        _check_version_stamp(data_dir, "InferenceData")
        store = InferenceTraceStore.load(data_dir / "traces_summary.csv")
        templates = store.build_templates(duration_s=duration_s, dt_s=dt_s, steady_skip_s=steady_skip_s)
        itl_fits = ITLFitStore.load(data_dir / "latency_fits.csv")
        return cls(models, power_templates=templates, itl_fits=itl_fits)

    @classmethod
    def ensure(
        cls,
        data_dir: Path,
        models: tuple[InferenceModelSpec, ...],
        data_sources: dict[str, MLEnergySource] | None = None,
        *,
        mlenergy_data_dir: Path | None = None,
        plot: bool = False,
        duration_s: float = 600.0,
        dt_s: float = 0.1,
        steady_skip_s: float = 0.0,
    ) -> InferenceData:
        """Load from `data_dir`, generating first if needed.

        If `data_dir/traces_summary.csv` does not exist, generates
        inference data from ML.ENERGY benchmark data and saves it.
        Then loads and returns.

        Args:
            data_dir: Data directory (generated files go here).
            models: Model specifications.
            data_sources: Per-model benchmark data extraction settings,
                keyed by `model_label`. Required when no cached data exists.
            mlenergy_data_dir: Path to compiled mlenergy-data directory.
            plot: If `True`, generate characterization plots on generation.
            duration_s: Simulation duration for template building.
            dt_s: Simulation timestep for template building.
            steady_skip_s: Skip seconds for template building.
        """
        data_dir = Path(data_dir)
        # The manifest CSV doubles as the "data exists" marker.
        if not (data_dir / "traces_summary.csv").exists():
            if data_sources is None:
                raise ValueError("data_sources required for InferenceData generation (no cached data)")
            logger.info("Generating inference data to %s ...", data_dir)
            cls.generate(
                models,
                data_sources,
                mlenergy_data_dir=mlenergy_data_dir,
                dt_s=dt_s,
            ).save(data_dir, plot=plot)
        return cls.load(data_dir, models, duration_s=duration_s, dt_s=dt_s, steady_skip_s=steady_skip_s)

    @property
    def models(self) -> tuple[InferenceModelSpec, ...]:
        """The model specifications."""
        return self._models

    @property
    def power_templates(self) -> InferenceTemplateStore:
        """Built per-GPU power templates.

        Raises:
            RuntimeError: If this is a generate-only instance (templates
                are only built on load).
        """
        if self._power_templates is None:
            raise RuntimeError("power_templates not available (generate-only instance). Load from disk first.")
        return self._power_templates

    @property
    def itl_fits(self) -> ITLFitStore | None:
        """Per-model ITL mixture distributions, or `None` if not provided."""
        return self._itl_fits

models property

The model specifications.

filter_models(models)

Return a new InferenceData containing only the specified models.

Source code in openg2g/datacenter/workloads/inference.py
def filter_models(
    self,
    models: tuple[InferenceModelSpec, ...],
) -> InferenceData:
    """Return a new InferenceData containing only the specified models.

    Args:
        models: Model specifications to keep; their labels select which
            power templates and ITL fits are retained.

    Raises:
        RuntimeError: If this is a generate-only instance without built
            power templates (load from disk first).
    """
    labels = {ms.model_label for ms in models}

    # Go through the property (not the private attribute) so a generate-only
    # instance (where _power_templates is None) fails with a clear
    # RuntimeError rather than an AttributeError on None.
    template_store = self.power_templates

    # Filter power templates
    filtered_templates = {k: v for k, v in template_store._templates.items() if k[0] in labels}
    filtered_batch_sizes = {k: v for k, v in template_store._batch_sizes_by_model.items() if k in labels}
    new_templates = InferenceTemplateStore(filtered_templates, filtered_batch_sizes)

    # Filter ITL fits
    new_itl_fits = None
    if self._itl_fits is not None:
        filtered_dists = {k: v for k, v in self._itl_fits.distributions.items() if k in labels}
        new_itl_fits = ITLFitStore(filtered_dists)

    return InferenceData(models, power_templates=new_templates, itl_fits=new_itl_fits)

generate(models, data_sources, *, runs=None, mlenergy_data_dir=None, dt_s=0.1, seed=0, itl_sample_cap=2048) classmethod

Generate inference data from ML.ENERGY benchmark data.

Produces power traces and ITL mixture fits for all models and batch sizes specified in data_sources.

Parameters:

Name Type Description Default
models tuple[InferenceModelSpec, ...]

Model specifications.

required
data_sources dict[str, MLEnergySource]

Per-model benchmark data extraction settings, keyed by model_label.

required
runs Any

Pre-loaded LLMRuns object. If None, loads from mlenergy_data_dir or the HuggingFace Hub.

None
mlenergy_data_dir Path | None

Path to compiled mlenergy-data directory. Ignored if runs is provided.

None
dt_s float

Trace timestep (seconds).

0.1
seed int

Random seed for ITL fitting.

0
itl_sample_cap int

Maximum ITL samples per run for fitting.

2048

Returns:

Type Description
InferenceData

A new InferenceData with generated traces and ITL fits (no templates -- call InferenceTraceStore.build_templates() on the saved/loaded store to get templates).

Source code in openg2g/datacenter/workloads/inference.py
@classmethod
def generate(
    cls,
    models: tuple[InferenceModelSpec, ...],
    data_sources: dict[str, MLEnergySource],
    *,
    runs: Any = None,
    mlenergy_data_dir: Path | None = None,
    dt_s: float = 0.1,
    seed: int = 0,
    itl_sample_cap: int = 2048,
) -> InferenceData:
    """Generate inference data from ML.ENERGY benchmark data.

    Produces power traces and ITL mixture fits for all models and
    batch sizes specified in `data_sources`.

    Args:
        models: Model specifications.
        data_sources: Per-model benchmark data extraction settings,
            keyed by `model_label`.
        runs: Pre-loaded `LLMRuns` object. If `None`, loads from
            `mlenergy_data_dir` or the HuggingFace Hub.
        mlenergy_data_dir: Path to compiled mlenergy-data directory.
            Ignored if `runs` is provided.
        dt_s: Trace timestep (seconds).
        seed: Random seed for ITL fitting.
        itl_sample_cap: Maximum ITL samples per run for fitting.

    Raises:
        ValueError: If no runs match, a model lacks a data source or
            `model_id`, or a config matches zero runs.

    Returns:
        A new `InferenceData` with generated traces and ITL fits (no
        templates -- call `InferenceTraceStore.build_templates()` on the
        saved/loaded store to get templates).
    """
    # Resolve benchmark runs: explicit arg > local directory > HF Hub.
    if runs is None:
        unique_tasks = {src.task for src in data_sources.values()}
        if mlenergy_data_dir:
            logger.info("Loading runs from %s (tasks: %s)", mlenergy_data_dir, sorted(unique_tasks))
            runs = LLMRuns.from_directory(str(mlenergy_data_dir), stable_only=False).task(*unique_tasks)
        else:
            logger.info("Loading runs from Hugging Face Hub (tasks: %s)", sorted(unique_tasks))
            runs = LLMRuns.from_hf(stable_only=False).task(*unique_tasks)
    if not runs:
        raise ValueError("No runs found for the specified tasks")

    subsets_by_label: dict[str, Any] = {}
    tl_frames: list[pd.DataFrame] = []
    itl_frames: list[pd.DataFrame] = []

    # Select the matching benchmark subset for each model up front so a
    # bad config fails before any downloads start.
    for ms in models:
        src = data_sources.get(ms.model_label)
        if src is None:
            raise ValueError(f"No data source for model {ms.model_label!r}")
        model_id = ms.model_id
        if not model_id:
            raise ValueError(f"model_id is required for data generation (model={ms.model_label!r})")

        subset = (
            runs.model_id(model_id).gpu_model(src.gpu).num_gpus(ms.gpus_per_replica).max_num_seqs(*src.batch_sizes)
        )
        if not subset:
            raise ValueError(
                f"Config matched zero runs: model_id={model_id!r}, "
                f"gpu={src.gpu!r}, num_gpus={ms.gpus_per_replica}, "
                f"batch_sizes={src.batch_sizes}"
            )
        subsets_by_label[ms.model_label] = subset
        logger.info(
            "%s: %d runs (model_id=%s, gpu=%s, num_gpus=%d, batches=%s)",
            ms.model_label,
            len(subset),
            model_id,
            src.gpu,
            ms.gpus_per_replica,
            sorted({r.max_num_seqs for r in subset}),
        )

    logger.info("Downloading raw result files for %d models ...", len(subsets_by_label))
    for subset in subsets_by_label.values():
        subset.download_raw_files(file="results")
    logger.info("Downloads complete. Extracting timelines and ITL samples ...")

    for label, subset in subsets_by_label.items():
        for run in subset:
            tl = run.timelines(metric="power.device_instant")
            tl["model_label"] = label
            tl["num_gpus"] = run.num_gpus
            tl["max_num_seqs"] = run.max_num_seqs
            # Globally unique run index, assigned before appending.
            tl["run_index"] = len(tl_frames)
            tl_frames.append(tl)

        itl = subset.inter_token_latencies()
        itl["model_label"] = label
        itl_frames.append(itl)

    all_tl = pd.concat(tl_frames, ignore_index=True)
    itl_samples_df = pd.concat(itl_frames, ignore_index=True)
    logger.info("Building trace store (%d timeline rows) and fitting ITL models ...", len(all_tl))

    trace_store = _build_trace_store_from_timelines(all_tl, dt_s=dt_s)
    itl_fit_store = _build_itl_fit_store(itl_samples_df, max_samples=itl_sample_cap, seed=seed)

    return cls._from_stores(
        models,
        trace_store=trace_store,
        itl_fit_store=itl_fit_store,
        itl_samples_df=itl_samples_df,
    )

save(out_dir, *, plot=False)

Save traces and ITL fits to a directory.

Parameters:

Name Type Description Default
out_dir Path

Output directory.

required
plot bool

If True, also write characterization plots (power trajectories, ITL distributions).

False
Source code in openg2g/datacenter/workloads/inference.py
def save(self, out_dir: Path, *, plot: bool = False) -> None:
    """Persist power traces and ITL fits under `out_dir`.

    Args:
        out_dir: Destination directory; created if missing.
        plot: When `True`, additionally write characterization plots
            (power trajectories, ITL distributions).
    """
    target = Path(out_dir)
    target.mkdir(parents=True, exist_ok=True)

    trace_store = self._trace_store
    if trace_store is not None:
        trace_store.save(target)
    if self._itl_fits is not None:
        self._itl_fits.save(target / "latency_fits.csv")

    # Stamp the directory with the package version that generated it.
    manifest = json.dumps({"openg2g_version": openg2g.__version__}, indent=2, sort_keys=True)
    (target / "_manifest.json").write_text(manifest)

    if not (plot and trace_store is not None):
        return
    _plot_power_trajectories(trace_store, self._models, target)
    samples = self._itl_samples_df
    if self._itl_fit_store is not None and samples is not None:
        for spec in self._models:
            _plot_itl_distributions(self._itl_fit_store, samples, spec.model_label, target)

load(data_dir, models, *, duration_s=600.0, dt_s=0.1, steady_skip_s=0.0) classmethod

Load from a generated data directory.

Loads traces from traces_summary.csv, builds templates, and loads ITL fits from latency_fits.csv.

Parameters:

Name Type Description Default
data_dir Path

Directory containing generated data.

required
models tuple[InferenceModelSpec, ...]

Model specifications.

required
duration_s float

Simulation duration for template building.

600.0
dt_s float

Simulation timestep for template building.

0.1
steady_skip_s float

Skip seconds for template building.

0.0
Source code in openg2g/datacenter/workloads/inference.py
@classmethod
def load(
    cls,
    data_dir: Path,
    models: tuple[InferenceModelSpec, ...],
    *,
    duration_s: float = 600.0,
    dt_s: float = 0.1,
    steady_skip_s: float = 0.0,
) -> InferenceData:
    """Load a previously generated data directory.

    Reads traces from `traces_summary.csv`, builds power templates
    from them, and reads ITL fits from `latency_fits.csv`.

    Args:
        data_dir: Directory containing generated data.
        models: Model specifications.
        duration_s: Simulation duration for template building.
        dt_s: Simulation timestep for template building.
        steady_skip_s: Skip seconds for template building.
    """
    base = Path(data_dir)
    _check_version_stamp(base, "InferenceData")
    trace_store = InferenceTraceStore.load(base / "traces_summary.csv")
    power_templates = trace_store.build_templates(
        duration_s=duration_s, dt_s=dt_s, steady_skip_s=steady_skip_s
    )
    fits = ITLFitStore.load(base / "latency_fits.csv")
    return cls(models, power_templates=power_templates, itl_fits=fits)

ensure(data_dir, models, data_sources=None, *, mlenergy_data_dir=None, plot=False, duration_s=600.0, dt_s=0.1, steady_skip_s=0.0) classmethod

Load from data_dir, generating first if needed.

If data_dir/traces_summary.csv does not exist, generates inference data from ML.ENERGY benchmark data and saves it. Then loads and returns.

Parameters:

Name Type Description Default
data_dir Path

Data directory (generated files go here).

required
models tuple[InferenceModelSpec, ...]

Model specifications.

required
data_sources dict[str, MLEnergySource] | None

Per-model benchmark data extraction settings, keyed by model_label. Required when no cached data exists.

None
mlenergy_data_dir Path | None

Path to compiled mlenergy-data directory.

None
plot bool

If True, generate characterization plots on generation.

False
duration_s float

Simulation duration for template building.

600.0
dt_s float

Simulation timestep for template building.

0.1
steady_skip_s float

Skip seconds for template building.

0.0
Source code in openg2g/datacenter/workloads/inference.py
@classmethod
def ensure(
    cls,
    data_dir: Path,
    models: tuple[InferenceModelSpec, ...],
    data_sources: dict[str, MLEnergySource] | None = None,
    *,
    mlenergy_data_dir: Path | None = None,
    plot: bool = False,
    duration_s: float = 600.0,
    dt_s: float = 0.1,
    steady_skip_s: float = 0.0,
) -> InferenceData:
    """Return data loaded from `data_dir`, generating it on first use.

    When `data_dir/traces_summary.csv` is missing, inference data is
    generated from ML.ENERGY benchmark data and saved before loading.

    Args:
        data_dir: Data directory (generated files go here).
        models: Model specifications.
        data_sources: Per-model benchmark data extraction settings,
            keyed by `model_label`. Required when no cached data exists.
        mlenergy_data_dir: Path to compiled mlenergy-data directory.
        plot: If `True`, generate characterization plots on generation.
        duration_s: Simulation duration for template building.
        dt_s: Simulation timestep for template building.
        steady_skip_s: Skip seconds for template building.

    Raises:
        ValueError: If generation is required but `data_sources` is `None`.
    """
    data_dir = Path(data_dir)
    summary = data_dir / "traces_summary.csv"
    if not summary.exists():
        if data_sources is None:
            raise ValueError("data_sources required for InferenceData generation (no cached data)")
        logger.info("Generating inference data to %s ...", data_dir)
        generated = cls.generate(
            models,
            data_sources,
            mlenergy_data_dir=mlenergy_data_dir,
            dt_s=dt_s,
        )
        generated.save(data_dir, plot=plot)
    return cls.load(data_dir, models, duration_s=duration_s, dt_s=dt_s, steady_skip_s=steady_skip_s)

InferenceAugmentedPower dataclass

Result of inference power augmentation for one simulation timestep.

Attributes:

Name Type Description
power_w ThreePhase

Three-phase inference power (watts), excluding base load.

power_by_model_w dict[str, float]

Per-model total active power (watts).

active_replicas_by_model dict[str, int]

Per-model active replica count.

allocation dict[str, ndarray]

Server indices allocated to each model in the pool.

Source code in openg2g/datacenter/workloads/inference.py
@dataclass(frozen=True)
class InferenceAugmentedPower:
    """Per-timestep output of inference power augmentation.

    Attributes:
        power_w: Three-phase inference power in watts, base load excluded.
        power_by_model_w: Total active power per model label, in watts.
        active_replicas_by_model: Active replica count per model label.
        allocation: Server indices assigned to each model in the pool.
    """

    power_w: ThreePhase
    power_by_model_w: dict[str, float] = field(default_factory=dict)
    active_replicas_by_model: dict[str, int] = field(default_factory=dict)
    allocation: dict[str, np.ndarray] = field(default_factory=dict)

InferencePowerAugmenter

Scales per-GPU inference power through a shared server pool to three-phase power.

Given per-GPU power values for each model's allocated servers, applies per-server scaling, noise, and phase summation to produce inference-level three-phase power.

This class is backend-agnostic. The offline datacenter feeds it template-indexed values; the online datacenter can feed it live-measured values. The datacenter backend is responsible for adding facility base load on top of the returned inference power.

Parameters:

Name Type Description Default
pool ServerPool

Shared server pool for the datacenter.

required
gpus_per_replica_by_model dict[str, int]

Mapping of model label to GPUs per replica.

required
seed int

Random seed for noise RNG.

0
Source code in openg2g/datacenter/workloads/inference.py
class InferencePowerAugmenter:
    """Turns per-GPU inference power into three-phase power via a shared server pool.

    Applies per-server amplitude scaling, optional multiplicative noise,
    and phase summation to the per-GPU power values supplied for each
    model's allocated servers.

    The class is backend-agnostic: the offline datacenter feeds it
    template-indexed values, while the online datacenter can feed it
    live measurements. Adding facility base load on top of the returned
    inference power is the datacenter backend's responsibility.

    Args:
        pool: Shared server pool for the datacenter.
        gpus_per_replica_by_model: Mapping of model label to GPUs per replica.
        seed: Random seed for noise RNG.
    """

    def __init__(
        self,
        pool: ServerPool,
        gpus_per_replica_by_model: dict[str, int],
        seed: int = 0,
    ) -> None:
        self._pool = pool
        self._gpus_per_replica = gpus_per_replica_by_model
        self._seed = int(seed)
        self._rng = np.random.default_rng(self._seed)

    def augment(
        self,
        per_gpu_by_model: dict[str, np.ndarray],
        replica_counts: dict[str, int],
    ) -> InferenceAugmentedPower:
        """Augment per-GPU power to three-phase power using shared pool allocation.

        Args:
            per_gpu_by_model: Mapping of model label to per-GPU power
                array of shape `(pool.num_servers,)`. Indexed by absolute
                server index; only entries at indices returned by
                `pool.allocate()` for this model contribute to its power.
            replica_counts: Mapping of model label to effective replica
                count (schedule + runtime adjustments).

        Returns:
            Augmented inference power with three-phase totals, per-model
                power, per-model active replica counts, and the
                server-index allocation used.
        """
        pool = self._pool
        gpus_per_replica = self._gpus_per_replica

        # One shared-pool allocation covering all models at once.
        demands = {
            label: max(0, count) * gpus_per_replica[label]
            for label, count in replica_counts.items()
        }
        allocation = pool.allocate(demands)

        # Draw noise for every server up front so RNG consumption stays
        # deterministic regardless of which servers end up allocated.
        noise = (
            self._rng.normal(0.0, 1.0, size=pool.num_servers)
            if pool.noise_fraction > 0
            else None
        )

        totals = np.zeros(3, dtype=float)
        per_model_power: dict[str, float] = {}
        per_model_replicas: dict[str, int] = {}

        for label, per_gpu in per_gpu_by_model.items():
            indices = allocation.get(label)
            if indices is None or len(indices) == 0:
                per_model_power[label] = 0.0
                per_model_replicas[label] = 0
                continue

            powers = per_gpu[indices] * pool.gpus_per_server * pool.amplitude_scales[indices]
            if noise is not None:
                # Noise is proportional to server power, floored at 1 W.
                floor = np.maximum(powers, 1.0)
                powers += noise[indices] * pool.noise_fraction * floor
            powers = np.maximum(powers, 0.0)

            contribution = np.zeros(3, dtype=float)
            np.add.at(contribution, pool.phase_list[indices], powers)
            totals += contribution

            per_model_power[label] = float(powers.sum())
            active_gpus = len(indices) * pool.gpus_per_server
            per_model_replicas[label] = active_gpus // gpus_per_replica[label]

        return InferenceAugmentedPower(
            power_w=ThreePhase(
                a=float(totals[0]),
                b=float(totals[1]),
                c=float(totals[2]),
            ),
            power_by_model_w=per_model_power,
            active_replicas_by_model=per_model_replicas,
            allocation=allocation,
        )

    def reset(self) -> None:
        """Restore the noise RNG to its freshly seeded initial state."""
        self._rng = np.random.default_rng(self._seed)

augment(per_gpu_by_model, replica_counts)

Augment per-GPU power to three-phase power using shared pool allocation.

Parameters:

Name Type Description Default
per_gpu_by_model dict[str, ndarray]

Mapping of model label to per-GPU power array of shape (pool.num_servers,). Indexed by absolute server index; only entries at indices returned by pool.allocate() for this model contribute to its power.

required
replica_counts dict[str, int]

Mapping of model label to effective replica count (schedule + runtime adjustments).

required

Returns:

Type Description
InferenceAugmentedPower

Augmented inference power with three-phase totals, per-model power, per-model active replica counts, and the server-index allocation used.

Source code in openg2g/datacenter/workloads/inference.py
def augment(
    self,
    per_gpu_by_model: dict[str, np.ndarray],
    replica_counts: dict[str, int],
) -> InferenceAugmentedPower:
    """Augment per-GPU power to three-phase power using shared pool allocation.

    Args:
        per_gpu_by_model: Mapping of model label to per-GPU power
            array of shape `(pool.num_servers,)`. Indexed by absolute
            server index; only entries at indices returned by
            `pool.allocate()` for this model contribute to its power.
        replica_counts: Mapping of model label to effective replica
            count (schedule + runtime adjustments).

    Returns:
        Augmented inference power with three-phase totals, per-model
            power, per-model active replica counts, and the
            server-index allocation used.
    """
    pool = self._pool
    gpus_per_replica = self._gpus_per_replica

    # One shared-pool allocation covering all models at once.
    demands = {
        label: max(0, count) * gpus_per_replica[label]
        for label, count in replica_counts.items()
    }
    allocation = pool.allocate(demands)

    # Draw noise for every server up front so RNG consumption stays
    # deterministic regardless of which servers end up allocated.
    noise = (
        self._rng.normal(0.0, 1.0, size=pool.num_servers)
        if pool.noise_fraction > 0
        else None
    )

    totals = np.zeros(3, dtype=float)
    per_model_power: dict[str, float] = {}
    per_model_replicas: dict[str, int] = {}

    for label, per_gpu in per_gpu_by_model.items():
        indices = allocation.get(label)
        if indices is None or len(indices) == 0:
            per_model_power[label] = 0.0
            per_model_replicas[label] = 0
            continue

        powers = per_gpu[indices] * pool.gpus_per_server * pool.amplitude_scales[indices]
        if noise is not None:
            # Noise is proportional to server power, floored at 1 W.
            floor = np.maximum(powers, 1.0)
            powers += noise[indices] * pool.noise_fraction * floor
        powers = np.maximum(powers, 0.0)

        contribution = np.zeros(3, dtype=float)
        np.add.at(contribution, pool.phase_list[indices], powers)
        totals += contribution

        per_model_power[label] = float(powers.sum())
        active_gpus = len(indices) * pool.gpus_per_server
        per_model_replicas[label] = active_gpus // gpus_per_replica[label]

    return InferenceAugmentedPower(
        power_w=ThreePhase(
            a=float(totals[0]),
            b=float(totals[1]),
            c=float(totals[2]),
        ),
        power_by_model_w=per_model_power,
        active_replicas_by_model=per_model_replicas,
        allocation=allocation,
    )

reset()

Re-seed the noise RNG to its initial state.

Source code in openg2g/datacenter/workloads/inference.py
def reset(self) -> None:
    """Restore the noise RNG to its freshly seeded initial state."""
    self._rng = np.random.default_rng(self._seed)

RequestsConfig

Bases: BaseModel

Configuration for building per-model JSONL request files.

Attributes:

Name Type Description
dataset str

Dataset to sample prompts from ("gpqa" or "lm-arena-chat").

num_requests int

Number of requests to sample per model.

max_completion_tokens int

Maximum output tokens per request.

seed int

Random seed for dataset shuffling and oversampling.

system_prompt str

System prompt prepended to every request.

Source code in openg2g/datacenter/workloads/inference.py
class RequestsConfig(BaseModel):
    """Settings controlling per-model JSONL request-file generation.

    Attributes:
        dataset: Prompt source dataset (`"gpqa"` or `"lm-arena-chat"`).
        num_requests: How many requests to sample for each model.
        max_completion_tokens: Cap on output tokens per request.
        seed: Random seed used for dataset shuffling and oversampling.
        system_prompt: System prompt prepended to every request.
    """

    # Frozen so a config instance can be shared safely across models.
    model_config = ConfigDict(frozen=True)

    dataset: str = "lm-arena-chat"
    num_requests: int = 1000
    max_completion_tokens: int = 512
    seed: int = 0
    system_prompt: str = "You are a helpful AI assistant."

RequestStore

Per-model request dicts for online load generation.

Each model's requests are stored as a list of OpenAI Chat Completion streaming request dicts, suitable for submission to a vLLM server.

Attributes:

Name Type Description
requests_by_model

Mapping from model label to request dicts.

Source code in openg2g/datacenter/workloads/inference.py
class RequestStore:
    """Per-model request dicts for online load generation.

    Each model's requests are stored as a list of OpenAI Chat Completion
    streaming request dicts, suitable for submission to a vLLM server.

    Attributes:
        requests_by_model: Mapping from model label to request dicts.
    """

    def __init__(self, requests_by_model: dict[str, list[dict]]) -> None:
        """Store the per-model request mapping as-is (no copy is made)."""
        self.requests_by_model = requests_by_model

    @classmethod
    def generate(
        cls,
        models: Sequence[InferenceModelSpec],
        config: RequestsConfig | None = None,
        *,
        extra_body_by_model: dict[str, dict] | None = None,
    ) -> RequestStore:
        """Sample prompts and build per-model request dicts.

        Requires `pip install datasets openai`.

        Args:
            models: Model specifications. Uses `model_id` for the API
                model field.
            config: Request generation config. Uses defaults if `None`.
            extra_body_by_model: Optional per-model extra fields merged
                into every request dict (e.g. `chat_template_kwargs`).
                Keyed by `model_label`.
        """
        # Heavy optional dependencies are imported lazily, at call time,
        # so the rest of the module works without them installed.
        import random as _random

        from datasets import load_dataset
        from openai.types.chat import (
            ChatCompletionAssistantMessageParam,
            ChatCompletionContentPartTextParam,
            ChatCompletionMessageParam,
            ChatCompletionSystemMessageParam,
            ChatCompletionUserMessageParam,
        )
        from openai.types.chat.completion_create_params import CompletionCreateParamsStreaming

        if config is None:
            config = RequestsConfig()

        # Wrap plain text in the typed content-part structure.
        def _text_part(text: str) -> ChatCompletionContentPartTextParam:
            return ChatCompletionContentPartTextParam(type="text", text=text)

        # Convert a single-string or multi-turn prompt into chat messages.
        # For lists, turns after the first alternate assistant/user.
        def _prompt_to_messages(prompt: str | list[str]) -> list[ChatCompletionMessageParam]:
            if isinstance(prompt, str):
                return [ChatCompletionUserMessageParam(role="user", content=[_text_part(prompt)])]
            msgs: list[ChatCompletionMessageParam] = [
                ChatCompletionUserMessageParam(role="user", content=[_text_part(prompt[0])])
            ]
            for i, turn in enumerate(prompt[1:]):
                if i % 2 == 0:
                    msgs.append(ChatCompletionAssistantMessageParam(role="assistant", content=[_text_part(turn)]))
                else:
                    msgs.append(ChatCompletionUserMessageParam(role="user", content=[_text_part(turn)]))
            return msgs

        # Pad `items` in place, sampling with replacement from the
        # originals, until `target` entries exist.
        def _maybe_oversample(items: list, target: int, seed: int) -> None:
            if len(items) >= target:
                return
            rng = _random.Random(seed)
            original = list(items)
            while len(items) < target:
                items.append(rng.choice(original))

        # Each conversation contributes one prompt per user turn: the
        # prefix of messages up to and including that turn.
        def _sample_lm_arena_chat(num_requests: int, seed: int) -> list[str | list[str]]:
            data = load_dataset("lmarena-ai/arena-human-preference-100k", split="train").shuffle(seed=seed)
            prompts: list[str | list[str]] = []
            for item in data:
                num_turns = item["turn"]
                conversation = item["conversation_a"]
                for turns in range(num_turns):
                    if len(prompts) >= num_requests:
                        break
                    messages: list[str] = []
                    for message in conversation[: 2 * turns + 1]:
                        messages.append(message["content"])
                    prompts.append(messages if len(messages) > 1 else messages[0])
                if len(prompts) >= num_requests:
                    break
            _maybe_oversample(prompts, num_requests, seed)
            return prompts

        # Builds multiple-choice prompts; answer order is shuffled via
        # the module-level `random` state, seeded here for determinism.
        def _sample_gpqa(num_requests: int, seed: int) -> list[str | list[str]]:
            data = load_dataset("Idavidrein/gpqa", "gpqa_extended", split="train", streaming=True).shuffle(seed=seed)
            _random.seed(seed)
            prompts: list[str | list[str]] = []
            for item in data:
                if len(prompts) >= num_requests:
                    break
                choices = [
                    item["Incorrect Answer 1"].strip(),
                    item["Incorrect Answer 2"].strip(),
                    item["Incorrect Answer 3"].strip(),
                    item["Correct Answer"].strip(),
                ]
                _random.shuffle(choices)
                question = item["Question"]
                prompt = f"What is the correct answer to the following question: {question}\n\nChoices:"
                for letter, choice in zip("ABCD", choices, strict=True):
                    prompt += f"\n({letter}) {choice}"
                prompts.append(prompt)
            _maybe_oversample(prompts, num_requests, seed)
            return prompts

        # Dispatch table mapping dataset name to its sampler.
        samplers = {"lm-arena-chat": _sample_lm_arena_chat, "gpqa": _sample_gpqa}
        sampler = samplers.get(config.dataset)
        if sampler is None:
            raise ValueError(f"Unknown dataset: {config.dataset!r}. Available: {sorted(samplers)}")

        extra = extra_body_by_model or {}

        requests_by_model: dict[str, list[dict]] = {}
        for spec in models:
            label = spec.model_label
            model_id = spec.model_id

            logger.info("Sampling %d %s prompts for %s (%s)...", config.num_requests, config.dataset, label, model_id)
            prompts = sampler(num_requests=config.num_requests, seed=config.seed)

            system_msgs: list[ChatCompletionMessageParam] = []
            if config.system_prompt:
                system_msgs.append(ChatCompletionSystemMessageParam(role="system", content=config.system_prompt))

            # One shared request template per model; per-prompt requests
            # below are shallow copies of it.
            template = CompletionCreateParamsStreaming(
                model=model_id,
                messages=system_msgs,
                max_completion_tokens=config.max_completion_tokens,
                stream=True,
                stream_options={"include_usage": True, "continuous_usage_stats": True},
            )
            if label in extra:
                template.update(extra[label])

            reqs: list[dict] = []
            for prompt in prompts:
                # NOTE(review): shallow copy — nested values such as
                # stream_options are shared across requests. Safe for
                # JSON serialization, but mutating one request's nested
                # dicts would affect all of them.
                request = dict(template)
                request["messages"] = list(template["messages"]) + _prompt_to_messages(prompt)
                reqs.append(request)
            requests_by_model[label] = reqs

        return cls(requests_by_model)

    def save(self, out_dir: Path) -> None:
        """Write per-model JSONL files to `out_dir`.

        Args:
            out_dir: Output directory. Created if it doesn't exist.
        """
        out_dir = Path(out_dir)
        out_dir.mkdir(parents=True, exist_ok=True)
        for label, reqs in self.requests_by_model.items():
            # One JSONL file per model label, one request per line.
            out_path = out_dir / f"{label}.jsonl"
            with open(out_path, "w") as f:
                for req in reqs:
                    f.write(json.dumps(req) + "\n")
            logger.info("Wrote %d requests for %s to %s", len(reqs), label, out_path)

    @classmethod
    def load(cls, out_dir: Path) -> RequestStore:
        """Load per-model JSONL files from `out_dir`.

        Args:
            out_dir: Directory containing `{model_label}.jsonl` files.
        """
        requests_by_model: dict[str, list[dict]] = {}
        out_dir = Path(out_dir)
        for path in sorted(out_dir.glob("*.jsonl")):
            # Model label is recovered from the file name stem.
            label = path.stem
            reqs: list[dict] = []
            with open(path) as f:
                for line in f:
                    line = line.strip()
                    if line:  # skip blank lines
                        reqs.append(json.loads(line))
            requests_by_model[label] = reqs
            logger.info("Loaded %d requests for %s", len(reqs), label)
        return cls(requests_by_model)

    @classmethod
    def ensure(
        cls,
        out_dir: Path,
        models: Sequence[InferenceModelSpec] | None = None,
        config: RequestsConfig | None = None,
        *,
        extra_body_by_model: dict[str, dict] | None = None,
    ) -> RequestStore:
        """Load request files from `out_dir`, generating first if needed.

        Args:
            out_dir: Directory for JSONL files.
            models: Required if request files don't exist yet.
            config: Request generation config. Uses defaults if `None`.
            extra_body_by_model: Optional per-model extra fields for
                request generation. Keyed by `model_label`.
        """
        out_dir = Path(out_dir)
        # NOTE(review): the cache check is on the directory's existence,
        # not on individual files — a partially written directory would
        # be treated as cached.
        if not out_dir.exists():
            if models is None:
                raise ValueError("models required (no cached request data)")
            logger.info("Generating request files to %s ...", out_dir)
            cls.generate(models, config, extra_body_by_model=extra_body_by_model).save(out_dir)
        return cls.load(out_dir)

generate(models, config=None, *, extra_body_by_model=None) classmethod

Sample prompts and build per-model request dicts.

Requires pip install datasets openai.

Parameters:

Name Type Description Default
models Sequence[InferenceModelSpec]

Model specifications. Uses model_id for the API model field.

required
config RequestsConfig | None

Request generation config. Uses defaults if None.

None
extra_body_by_model dict[str, dict] | None

Optional per-model extra fields merged into every request dict (e.g. chat_template_kwargs). Keyed by model_label.

None
Source code in openg2g/datacenter/workloads/inference.py
@classmethod
def generate(
    cls,
    models: Sequence[InferenceModelSpec],
    config: RequestsConfig | None = None,
    *,
    extra_body_by_model: dict[str, dict] | None = None,
) -> RequestStore:
    """Sample prompts and build per-model request dicts.

    Requires `pip install datasets openai`.

    Args:
        models: Model specifications. Uses `model_id` for the API
            model field.
        config: Request generation config. Uses defaults if `None`.
        extra_body_by_model: Optional per-model extra fields merged
            into every request dict (e.g. `chat_template_kwargs`).
            Keyed by `model_label`.
    """
    # Heavy optional dependencies are imported lazily, at call time,
    # so the rest of the module works without them installed.
    import random as _random

    from datasets import load_dataset
    from openai.types.chat import (
        ChatCompletionAssistantMessageParam,
        ChatCompletionContentPartTextParam,
        ChatCompletionMessageParam,
        ChatCompletionSystemMessageParam,
        ChatCompletionUserMessageParam,
    )
    from openai.types.chat.completion_create_params import CompletionCreateParamsStreaming

    if config is None:
        config = RequestsConfig()

    # Wrap plain text in the typed content-part structure.
    def _text_part(text: str) -> ChatCompletionContentPartTextParam:
        return ChatCompletionContentPartTextParam(type="text", text=text)

    # Convert a single-string or multi-turn prompt into chat messages.
    # For lists, turns after the first alternate assistant/user.
    def _prompt_to_messages(prompt: str | list[str]) -> list[ChatCompletionMessageParam]:
        if isinstance(prompt, str):
            return [ChatCompletionUserMessageParam(role="user", content=[_text_part(prompt)])]
        msgs: list[ChatCompletionMessageParam] = [
            ChatCompletionUserMessageParam(role="user", content=[_text_part(prompt[0])])
        ]
        for i, turn in enumerate(prompt[1:]):
            if i % 2 == 0:
                msgs.append(ChatCompletionAssistantMessageParam(role="assistant", content=[_text_part(turn)]))
            else:
                msgs.append(ChatCompletionUserMessageParam(role="user", content=[_text_part(turn)]))
        return msgs

    # Pad `items` in place, sampling with replacement from the
    # originals, until `target` entries exist.
    def _maybe_oversample(items: list, target: int, seed: int) -> None:
        if len(items) >= target:
            return
        rng = _random.Random(seed)
        original = list(items)
        while len(items) < target:
            items.append(rng.choice(original))

    # Each conversation contributes one prompt per user turn: the
    # prefix of messages up to and including that turn.
    def _sample_lm_arena_chat(num_requests: int, seed: int) -> list[str | list[str]]:
        data = load_dataset("lmarena-ai/arena-human-preference-100k", split="train").shuffle(seed=seed)
        prompts: list[str | list[str]] = []
        for item in data:
            num_turns = item["turn"]
            conversation = item["conversation_a"]
            for turns in range(num_turns):
                if len(prompts) >= num_requests:
                    break
                messages: list[str] = []
                for message in conversation[: 2 * turns + 1]:
                    messages.append(message["content"])
                prompts.append(messages if len(messages) > 1 else messages[0])
            if len(prompts) >= num_requests:
                break
        _maybe_oversample(prompts, num_requests, seed)
        return prompts

    # Builds multiple-choice prompts; answer order is shuffled via
    # the module-level `random` state, seeded here for determinism.
    def _sample_gpqa(num_requests: int, seed: int) -> list[str | list[str]]:
        data = load_dataset("Idavidrein/gpqa", "gpqa_extended", split="train", streaming=True).shuffle(seed=seed)
        _random.seed(seed)
        prompts: list[str | list[str]] = []
        for item in data:
            if len(prompts) >= num_requests:
                break
            choices = [
                item["Incorrect Answer 1"].strip(),
                item["Incorrect Answer 2"].strip(),
                item["Incorrect Answer 3"].strip(),
                item["Correct Answer"].strip(),
            ]
            _random.shuffle(choices)
            question = item["Question"]
            prompt = f"What is the correct answer to the following question: {question}\n\nChoices:"
            for letter, choice in zip("ABCD", choices, strict=True):
                prompt += f"\n({letter}) {choice}"
            prompts.append(prompt)
        _maybe_oversample(prompts, num_requests, seed)
        return prompts

    # Dispatch table mapping dataset name to its sampler.
    samplers = {"lm-arena-chat": _sample_lm_arena_chat, "gpqa": _sample_gpqa}
    sampler = samplers.get(config.dataset)
    if sampler is None:
        raise ValueError(f"Unknown dataset: {config.dataset!r}. Available: {sorted(samplers)}")

    extra = extra_body_by_model or {}

    requests_by_model: dict[str, list[dict]] = {}
    for spec in models:
        label = spec.model_label
        model_id = spec.model_id

        logger.info("Sampling %d %s prompts for %s (%s)...", config.num_requests, config.dataset, label, model_id)
        prompts = sampler(num_requests=config.num_requests, seed=config.seed)

        system_msgs: list[ChatCompletionMessageParam] = []
        if config.system_prompt:
            system_msgs.append(ChatCompletionSystemMessageParam(role="system", content=config.system_prompt))

        # One shared request template per model; per-prompt requests
        # below are shallow copies of it.
        template = CompletionCreateParamsStreaming(
            model=model_id,
            messages=system_msgs,
            max_completion_tokens=config.max_completion_tokens,
            stream=True,
            stream_options={"include_usage": True, "continuous_usage_stats": True},
        )
        if label in extra:
            template.update(extra[label])

        reqs: list[dict] = []
        for prompt in prompts:
            # NOTE(review): shallow copy — nested values such as
            # stream_options are shared across requests. Safe for
            # JSON serialization, but mutating one request's nested
            # dicts would affect all of them.
            request = dict(template)
            request["messages"] = list(template["messages"]) + _prompt_to_messages(prompt)
            reqs.append(request)
        requests_by_model[label] = reqs

    return cls(requests_by_model)

save(out_dir)

Write per-model JSONL files to out_dir.

Parameters:

Name Type Description Default
out_dir Path

Output directory. Created if it doesn't exist.

required
Source code in openg2g/datacenter/workloads/inference.py
def save(self, out_dir: Path) -> None:
    """Write per-model JSONL files to `out_dir`.

    One `{model_label}.jsonl` file is written per model, one JSON
    request object per line.

    Args:
        out_dir: Output directory; created (with parents) if missing.
    """
    target = Path(out_dir)
    target.mkdir(parents=True, exist_ok=True)
    for label, requests in self.requests_by_model.items():
        destination = target / f"{label}.jsonl"
        with open(destination, "w") as fh:
            fh.writelines(json.dumps(request) + "\n" for request in requests)
        logger.info("Wrote %d requests for %s to %s", len(requests), label, destination)

load(out_dir) classmethod

Load per-model JSONL files from out_dir.

Parameters:

Name Type Description Default
out_dir Path

Directory containing {model_label}.jsonl files.

required
Source code in openg2g/datacenter/workloads/inference.py
@classmethod
def load(cls, out_dir: Path) -> RequestStore:
    """Load per-model JSONL files from `out_dir`.

    Args:
        out_dir: Directory containing `{model_label}.jsonl` files.
    """
    directory = Path(out_dir)
    by_model: dict[str, list[dict]] = {}
    # File stem doubles as the model label (mirrors how `save` names files).
    for jsonl_path in sorted(directory.glob("*.jsonl")):
        with open(jsonl_path) as fh:
            records = [json.loads(stripped) for raw in fh if (stripped := raw.strip())]
        by_model[jsonl_path.stem] = records
        logger.info("Loaded %d requests for %s", len(records), jsonl_path.stem)
    return cls(by_model)

ensure(out_dir, models=None, config=None, *, extra_body_by_model=None) classmethod

Load request files from out_dir, generating first if needed.

Parameters:

Name Type Description Default
out_dir Path

Directory for JSONL files.

required
models Sequence[InferenceModelSpec] | None

Required if request files don't exist yet.

None
config RequestsConfig | None

Request generation config. Uses defaults if None.

None
extra_body_by_model dict[str, dict] | None

Optional per-model extra fields for request generation. Keyed by model_label.

None
Source code in openg2g/datacenter/workloads/inference.py
@classmethod
def ensure(
    cls,
    out_dir: Path,
    models: Sequence[InferenceModelSpec] | None = None,
    config: RequestsConfig | None = None,
    *,
    extra_body_by_model: dict[str, dict] | None = None,
) -> RequestStore:
    """Load request files from `out_dir`, generating first if needed.

    Args:
        out_dir: Directory for JSONL files.
        models: Required if request files don't exist yet.
        config: Request generation config. Uses defaults if `None`.
        extra_body_by_model: Optional per-model extra fields for
            request generation. Keyed by `model_label`.

    Raises:
        ValueError: If `out_dir` is missing and `models` was not given.
    """
    target = Path(out_dir)
    if not target.exists():
        # No cache on disk: generation needs model specs to proceed.
        if models is None:
            raise ValueError("models required (no cached request data)")
        logger.info("Generating request files to %s ...", target)
        store = cls.generate(models, config, extra_body_by_model=extra_body_by_model)
        store.save(target)
    return cls.load(target)

openg2g.datacenter.workloads.training

Training workload: typed trace data and periodic overlay evaluation.

TrainingTraceParams

Bases: BaseModel

Parameters for synthetic training-like power trace generation.

Attributes:

Name Type Description
duration_s float

Total duration (seconds).

dt_s float

Timestep (seconds).

seed int

Random seed.

P_hi float

High plateau power (W).

P_lo float

Low plateau power (W).

sigma_hi float

Noise std in high plateaus (W).

sigma_lo float

Noise std in low plateaus (W).

seg_lo_range tuple[float, float]

Duration range for low segments (seconds).

seg_hi_range tuple[float, float]

Duration range for high segments (seconds).

dip_prob_per_sec float

Expected brief dips per second.

dip_depth_range tuple[float, float]

Depth range for brief dips (W below current level).

dip_dur_range tuple[float, float]

Duration range for brief dips (seconds).

smooth_window_s float

Smoothing window width (seconds).

ramp_s float

Initial warm-up ramp duration (seconds).

ramp_from float

Power at ramp start (W).

Source code in openg2g/datacenter/workloads/training.py
class TrainingTraceParams(BaseModel):
    """Configuration for generating a synthetic training-style power trace.

    The generated trace alternates between high and low power plateaus
    with per-plateau Gaussian noise, occasional brief dips, a smoothing
    window, and an initial warm-up ramp.

    Attributes:
        duration_s: Total trace duration in seconds.
        dt_s: Sample timestep in seconds.
        seed: Random seed for reproducibility.
        P_hi: Power level of high plateaus (W).
        P_lo: Power level of low plateaus (W).
        sigma_hi: Noise standard deviation during high plateaus (W).
        sigma_lo: Noise standard deviation during low plateaus (W).
        seg_lo_range: Min/max duration of low segments (seconds).
        seg_hi_range: Min/max duration of high segments (seconds).
        dip_prob_per_sec: Expected number of brief dips per second.
        dip_depth_range: Min/max dip depth below the current level (W).
        dip_dur_range: Min/max dip duration (seconds).
        smooth_window_s: Width of the smoothing window (seconds).
        ramp_s: Duration of the initial warm-up ramp (seconds).
        ramp_from: Power at the start of the ramp (W).
    """

    # Instances are immutable once constructed.
    model_config = ConfigDict(frozen=True)

    duration_s: float = 1000.0
    dt_s: float = 0.1
    seed: int = 2
    P_hi: float = 225.0
    P_lo: float = 175.0
    sigma_hi: float = 50.0
    sigma_lo: float = 50.0
    seg_lo_range: tuple[float, float] = (10.0, 15.0)
    seg_hi_range: tuple[float, float] = (35.0, 40.0)
    dip_prob_per_sec: float = 0.010
    dip_depth_range: tuple[float, float] = (120.0, 125.0)
    dip_dur_range: tuple[float, float] = (0.06, 0.14)
    smooth_window_s: float = 0.30
    ramp_s: float = 18.0
    ramp_from: float = 50.0

TrainingTrace dataclass

A single-GPU training power trace.

Attributes:

Name Type Description
t_s ndarray

Time vector (seconds), monotonically increasing.

power_w ndarray

Power vector (watts) for one GPU, same length as t_s.

Source code in openg2g/datacenter/workloads/training.py
@dataclass(frozen=True)
class TrainingTrace:
    """A single-GPU training power trace.

    Attributes:
        t_s: Time vector (seconds), monotonically increasing.
        power_w: Power vector (watts) for one GPU, same length as `t_s`.
    """

    # CSV column names used by `save`/`load`.
    COL_TIME = "t_s"
    COL_POWER = "power_W"

    t_s: np.ndarray
    power_w: np.ndarray

    def __post_init__(self) -> None:
        # Reject malformed traces early: vectors must align and a trace
        # needs at least two samples to define a timestep.
        n_t, n_p = len(self.t_s), len(self.power_w)
        if n_t != n_p:
            raise ValueError(f"t_s and power_w must have the same length, got {n_t} and {n_p}")
        if n_t < 2:
            raise ValueError("Training trace must have >= 2 samples.")

    @classmethod
    def generate(cls, params: TrainingTraceParams | None = None) -> TrainingTrace:
        """Generate a synthetic training-like power trace.

        Args:
            params: Generation parameters; defaults are used when `None`.

        Returns:
            A new [`TrainingTrace`][.] with generated data.
        """
        effective = TrainingTraceParams() if params is None else params
        times, powers = _generate_training_like_trace(effective)
        return cls(t_s=times, power_w=powers)

    def save(self, csv_path: Path) -> None:
        """Save the trace to a CSV file with columns `t_s` and `power_W`.

        Args:
            csv_path: Output CSV path; parent directories are created.
        """
        destination = Path(csv_path)
        destination.parent.mkdir(parents=True, exist_ok=True)
        frame = pd.DataFrame({self.COL_TIME: self.t_s, self.COL_POWER: self.power_w})
        frame.to_csv(destination, index=False)

    @classmethod
    def load(cls, csv_path: Path) -> TrainingTrace:
        """Load a training trace from CSV.

        Negative powers are clipped to zero, and rows are re-sorted by
        time if the time column is not already non-decreasing.

        Args:
            csv_path: Path to CSV with columns `t_s` and `power_W`.

        Raises:
            ValueError: If a required column is missing.
        """
        source = Path(csv_path)
        frame = pd.read_csv(source)
        if not {cls.COL_TIME, cls.COL_POWER} <= set(frame.columns):
            raise ValueError(
                f"{source} must have columns {cls.COL_TIME!r} and {cls.COL_POWER!r}. Got {list(frame.columns)}"
            )

        times = frame[cls.COL_TIME].to_numpy(float)
        powers = np.clip(frame[cls.COL_POWER].to_numpy(float), 0.0, None)

        # Only re-sort when the file is out of order.
        if np.any(np.diff(times) < 0):
            order = np.argsort(times)
            times, powers = times[order], powers[order]

        return cls(t_s=times, power_w=powers)

    @classmethod
    def ensure(cls, csv_path: Path, params: TrainingTraceParams | None = None) -> TrainingTrace:
        """Load from `csv_path`, generating and caching the file first if absent.

        Args:
            csv_path: Path to the training trace CSV.
            params: Generation parameters, consulted only when no cached
                file exists; defaults are used if `None`.
        """
        target = Path(csv_path)
        if not target.exists():
            logger.info("Generating training trace to %s ...", target)
            cls.generate(params).save(target)
        return cls.load(target)

generate(params=None) classmethod

Generate a synthetic training-like power trace.

Parameters:

Name Type Description Default
params TrainingTraceParams | None

Generation parameters. Uses defaults if None.

None

Returns:

Type Description
TrainingTrace

A new TrainingTrace with generated data.

Source code in openg2g/datacenter/workloads/training.py
@classmethod
def generate(cls, params: TrainingTraceParams | None = None) -> TrainingTrace:
    """Generate a synthetic training-like power trace.

    Args:
        params: Generation parameters; defaults apply when `None`.

    Returns:
        A new [`TrainingTrace`][.] built from the generated vectors.
    """
    effective = params if params is not None else TrainingTraceParams()
    t_vec, p_vec = _generate_training_like_trace(effective)
    return cls(t_s=t_vec, power_w=p_vec)

save(csv_path)

Save the trace to a CSV file.

Parameters:

Name Type Description Default
csv_path Path

Output CSV path.

required
Source code in openg2g/datacenter/workloads/training.py
def save(self, csv_path: Path) -> None:
    """Save the trace to a CSV file.

    Args:
        csv_path: Output CSV path; parent directories are created.
    """
    destination = Path(csv_path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    pd.DataFrame({self.COL_TIME: self.t_s, self.COL_POWER: self.power_w}).to_csv(destination, index=False)

load(csv_path) classmethod

Load a training trace from CSV.

Parameters:

Name Type Description Default
csv_path Path

Path to CSV with columns t_s and power_W.

required
Source code in openg2g/datacenter/workloads/training.py
@classmethod
def load(cls, csv_path: Path) -> TrainingTrace:
    """Load a training trace from CSV.

    Negative powers are clipped to zero; rows are re-sorted by time when
    the time column is not already non-decreasing.

    Args:
        csv_path: Path to CSV with columns `t_s` and `power_W`.

    Raises:
        ValueError: If a required column is missing.
    """
    path = Path(csv_path)
    df = pd.read_csv(path)
    has_cols = cls.COL_TIME in df.columns and cls.COL_POWER in df.columns
    if not has_cols:
        raise ValueError(
            f"{path} must have columns {cls.COL_TIME!r} and {cls.COL_POWER!r}. Got {list(df.columns)}"
        )

    time_vec = df[cls.COL_TIME].to_numpy(float)
    power_vec = np.clip(df[cls.COL_POWER].to_numpy(float), 0.0, None)

    if np.any(np.diff(time_vec) < 0):
        order = np.argsort(time_vec)
        time_vec, power_vec = time_vec[order], power_vec[order]

    return cls(t_s=time_vec, power_w=power_vec)

ensure(csv_path, params=None) classmethod

Load from csv_path, generating first if needed.

Parameters:

Name Type Description Default
csv_path Path

Path to the training trace CSV.

required
params TrainingTraceParams | None

Generation parameters, consulted only when no cached file exists. Uses defaults if None and generation is needed.

None
Source code in openg2g/datacenter/workloads/training.py
@classmethod
def ensure(cls, csv_path: Path, params: TrainingTraceParams | None = None) -> TrainingTrace:
    """Load from `csv_path`, generating the file first when it is missing.

    Args:
        csv_path: Path to the training trace CSV.
        params: Generation parameters, consulted only when no cached file
            exists; defaults are used if `None`.
    """
    target = Path(csv_path)
    if not target.exists():
        logger.info("Generating training trace to %s ...", target)
        cls.generate(params).save(target)
    return cls.load(target)