Skip to content

openg2g.datacenter

openg2g.datacenter.base

Abstract base class for datacenter backends and base state types.

DatacenterState dataclass

State emitted by a datacenter backend each timestep.

Contains only universally applicable fields. LLM-inference-specific fields (batch sizes, replicas, latency) live on child classes like LLMDatacenterState.

Attributes:

Name Type Description
time_s float

Simulation time in seconds.

power_w ThreePhase

Three-phase power in watts.

Source code in openg2g/datacenter/base.py
@dataclass(frozen=True)
class DatacenterState:
    """Per-timestep state snapshot produced by a datacenter backend.

    Only fields that apply to every backend live here.
    LLM-inference specifics (batch sizes, replicas, latency) belong
    to subclasses such as [`LLMDatacenterState`][..LLMDatacenterState].

    Attributes:
        time_s: Simulation time in seconds.
        power_w: Three-phase power in watts.
    """

    time_s: float
    power_w: ThreePhase

LLMDatacenterState dataclass

Bases: DatacenterState

State from a datacenter serving LLM workloads.

Extends DatacenterState with per-model batch size, replica count, and observed inter-token latency fields used by LLM controllers.

Attributes:

Name Type Description
batch_size_by_model dict[str, int]

Current batch size per model label.

active_replicas_by_model dict[str, int]

Number of active replicas per model.

observed_itl_s_by_model dict[str, float]

Observed average inter-token latency (seconds) per model. NaN if unavailable.

Source code in openg2g/datacenter/base.py
@dataclass(frozen=True)
class LLMDatacenterState(DatacenterState):
    """State from a datacenter serving LLM workloads.

    Adds to [`DatacenterState`][..DatacenterState] the per-model batch
    size, replica count, and observed inter-token latency fields that
    LLM controllers consume.

    Attributes:
        batch_size_by_model: Current batch size per model label.
        active_replicas_by_model: Number of active replicas per model.
        observed_itl_s_by_model: Observed average inter-token latency
            (seconds) per model. `NaN` if unavailable.
    """

    batch_size_by_model: dict[str, int] = field(default_factory=dict)
    active_replicas_by_model: dict[str, int] = field(default_factory=dict)
    observed_itl_s_by_model: dict[str, float] = field(default_factory=dict)

DatacenterBackend

Bases: Generic[DCStateT], ABC

Interface for datacenter power simulation backends.

Source code in openg2g/datacenter/base.py
class DatacenterBackend(Generic[DCStateT], ABC):
    """Interface for datacenter power simulation backends.

    Coordinator-driven lifecycle (per the method docstrings below):
    `do_reset` -> `start` -> loop of `do_step` / `apply_control` -> `stop`.
    """

    # Sentinel stored by __init__; lets _check_base_init detect subclasses
    # whose __init__ forgot to call super().__init__().
    _INIT_SENTINEL = object()

    def __init__(self) -> None:
        self._state: DCStateT | None = None  # latest emitted state; None until first step
        self._history: list[DCStateT] = []  # every state emitted during this run
        self._dc_base_init = DatacenterBackend._INIT_SENTINEL

    def _check_base_init(self) -> None:
        # Fail early with a clear message rather than later with a
        # confusing AttributeError on the missing _state/_history.
        if getattr(self, "_dc_base_init", None) is not DatacenterBackend._INIT_SENTINEL:
            raise TypeError(f"{type(self).__name__}.__init__ must call super().__init__() ")

    @property
    @abstractmethod
    def dt_s(self) -> Fraction:
        """Native timestep as a Fraction (seconds)."""

    @final
    @property
    def state(self) -> DCStateT:
        """Latest emitted state.

        Raises:
            RuntimeError: If accessed before the first `step()` call.
        """
        self._check_base_init()
        if self._state is None:
            raise RuntimeError(f"{type(self).__name__}.state accessed before first step().")
        return self._state

    @final
    def history(self, n: int | None = None) -> list[DCStateT]:
        """Return emitted state history (all, or latest `n`)."""
        self._check_base_init()
        if n is None:
            return list(self._history)  # defensive copy; callers may mutate
        if n <= 0:
            return []
        return list(self._history[-int(n) :])

    @final
    def do_step(self, clock: SimulationClock, events: EventEmitter) -> DCStateT:
        """Call `step`, record the state, and return it.

        Called by the coordinator. Subclasses should not override this.
        """
        self._check_base_init()
        state = self.step(clock, events)
        self._state = state
        self._history.append(state)
        return state

    @abstractmethod
    def step(self, clock: SimulationClock, events: EventEmitter) -> DCStateT:
        """Advance one native timestep. Return state for this step."""

    @abstractmethod
    def apply_control(self, command: DatacenterCommand, events: EventEmitter) -> None:
        """Apply one command. Takes effect on next step() call."""

    @final
    def do_reset(self) -> None:
        """Clear history and call `reset`.

        Called by the coordinator. Subclasses should not override this.
        """
        self._check_base_init()
        self._state = None
        self._history.clear()
        self.reset()

    @abstractmethod
    def reset(self) -> None:
        """Reset simulation state to initial conditions.

        Called by the coordinator (via `do_reset`) before each
        [`start`][..start]. Must clear all simulation state: counters,
        RNG seeds, cached values. Configuration (dt_s, models,
        templates) is not affected. History is cleared automatically
        by `do_reset`.

        Abstract so every implementation explicitly enumerates its state.
        A forgotten field is a bug -- not clearing it silently corrupts
        the second run.
        """

    def start(self) -> None:
        """Acquire per-run resources (threads, solver circuits).

        Called after [`reset`][..reset], before the simulation loop.
        Override for backends that need resource acquisition (e.g.,
        [`OpenDSSGrid`][openg2g.grid.opendss.OpenDSSGrid] compiles its
        DSS circuit here). No-op by default because most offline
        components have no resources to acquire.
        """

    def stop(self) -> None:
        """Release per-run resources. Simulation state is preserved.

        Called after the simulation loop in LIFO order. Override for
        backends that acquired resources in [`start`][..start]. No-op
        by default.
        """

dt_s abstractmethod property

Native timestep as a Fraction (seconds).

state property

Latest emitted state.

Raises:

Type Description
RuntimeError

If accessed before the first step() call.

history(n=None)

Return emitted state history (all, or latest n).

Source code in openg2g/datacenter/base.py
@final
def history(self, n: int | None = None) -> list[DCStateT]:
    """Return a copy of the emitted state history.

    With `n=None` the full history is returned; otherwise only the
    latest `n` entries (`n <= 0` yields an empty list).
    """
    self._check_base_init()
    if n is not None and n <= 0:
        return []
    tail = self._history if n is None else self._history[-int(n) :]
    return list(tail)

do_step(clock, events)

Call step, record the state, and return it.

Called by the coordinator. Subclasses should not override this.

Source code in openg2g/datacenter/base.py
@final
def do_step(self, clock: SimulationClock, events: EventEmitter) -> DCStateT:
    """Run one `step`, record the resulting state, and return it.

    Invoked by the coordinator; subclasses must not override.
    """
    self._check_base_init()
    new_state = self.step(clock, events)
    self._history.append(new_state)
    self._state = new_state
    return new_state

step(clock, events) abstractmethod

Advance one native timestep. Return state for this step.

Source code in openg2g/datacenter/base.py
@abstractmethod
def step(self, clock: SimulationClock, events: EventEmitter) -> DCStateT:
    """Advance the backend by one native timestep and return that step's state."""

apply_control(command, events) abstractmethod

Apply one command. Takes effect on next step() call.

Source code in openg2g/datacenter/base.py
@abstractmethod
def apply_control(self, command: DatacenterCommand, events: EventEmitter) -> None:
    """Apply a single command; it takes effect on the next `step()` call."""

do_reset()

Clear history and call reset.

Called by the coordinator. Subclasses should not override this.

Source code in openg2g/datacenter/base.py
@final
def do_reset(self) -> None:
    """Drop the cached state and history, then delegate to `reset`.

    Invoked by the coordinator; subclasses must not override.
    """
    self._check_base_init()
    # Clear in place so any externally held reference sees the reset too.
    self._history.clear()
    self._state = None
    self.reset()

reset() abstractmethod

Reset simulation state to initial conditions.

Called by the coordinator (via do_reset) before each start. Must clear all simulation state: counters, RNG seeds, cached values. Configuration (dt_s, models, templates) is not affected. History is cleared automatically by do_reset.

Abstract so every implementation explicitly enumerates its state. A forgotten field is a bug -- not clearing it silently corrupts the second run.

Source code in openg2g/datacenter/base.py
@abstractmethod
def reset(self) -> None:
    """Reset simulation state to initial conditions.

    The coordinator invokes this (through `do_reset`) before each
    [`start`][..start]. Implementations must clear every piece of
    simulation state -- counters, RNG seeds, cached values -- while
    leaving configuration (dt_s, models, templates) untouched.
    History is cleared automatically by `do_reset`.

    Kept abstract so each implementation explicitly enumerates its
    state: a field forgotten here is a bug that silently corrupts
    the second run.
    """

start()

Acquire per-run resources (threads, solver circuits).

Called after reset, before the simulation loop. Override for backends that need resource acquisition (e.g., OpenDSSGrid compiles its DSS circuit here). No-op by default because most offline components have no resources to acquire.

Source code in openg2g/datacenter/base.py
def start(self) -> None:
    """Acquire per-run resources (threads, solver circuits).

    Runs after [`reset`][..reset] and before the simulation loop.
    Backends that need resource acquisition override this (e.g.,
    [`OpenDSSGrid`][openg2g.grid.opendss.OpenDSSGrid] compiles its
    DSS circuit here); the default is a no-op because most offline
    components have nothing to acquire.
    """

stop()

Release per-run resources. Simulation state is preserved.

Called after the simulation loop in LIFO order. Override for backends that acquired resources in start. No-op by default.

Source code in openg2g/datacenter/base.py
def stop(self) -> None:
    """Release per-run resources; simulation state is preserved.

    Runs after the simulation loop, in LIFO order. Backends that
    acquired resources in [`start`][..start] override this; no-op
    by default.
    """

LLMBatchSizeControlledDatacenter

Bases: DatacenterBackend[DCStateT]

Datacenter that serves LLM workloads and supports batch-size control.

Marker layer between DatacenterBackend and concrete implementations. Controllers that issue SetBatchSize commands or read active_replicas_by_model / observed_itl_s_by_model from state should bind their generic to this class.

Source code in openg2g/datacenter/base.py
class LLMBatchSizeControlledDatacenter(DatacenterBackend[DCStateT]):
    """Datacenter that serves LLM workloads and supports batch-size control.

    Marker layer between [`DatacenterBackend`][..DatacenterBackend] and
    concrete implementations. Controllers that issue
    [`SetBatchSize`][openg2g.datacenter.command.SetBatchSize] commands or read
    `active_replicas_by_model` / `observed_itl_s_by_model`
    from state should bind their generic to this class.
    """

    @property
    def phase_share_by_model(self) -> dict[str, np.ndarray]:
        """Per-model phase share vectors `[frac_A, frac_B, frac_C]`.

        Returns an empty dict by default. Consumers treat missing keys
        as uniform `[1/3, 1/3, 1/3]`. Override in subclasses that know
        actual server-to-phase placement.
        """
        # The base class has no placement information; an empty mapping
        # tells consumers to fall back to an even split across phases.
        return {}

phase_share_by_model property

Per-model phase share vectors [frac_A, frac_B, frac_C].

Returns an empty dict by default. Consumers treat missing keys as uniform [1/3, 1/3, 1/3]. Override in subclasses that know actual server-to-phase placement.

openg2g.datacenter.command

Command types targeting datacenter backends.

DatacenterCommand

Base for commands targeting the datacenter backend.

Subclass this for each concrete datacenter command kind. The coordinator routes commands to backends based on this type hierarchy.

Source code in openg2g/datacenter/command.py
class DatacenterCommand:
    """Abstract base for commands aimed at the datacenter backend.

    Each concrete datacenter command kind subclasses this; the
    coordinator relies on the type hierarchy to route commands to
    the right backend.
    """

    def __init__(self) -> None:
        # Only subclasses may be constructed; the base carries no payload.
        cls = type(self)
        if cls is DatacenterCommand:
            raise TypeError("DatacenterCommand cannot be instantiated directly; subclass it.")

SetBatchSize dataclass

Bases: DatacenterCommand

Set batch sizes for one or more models.

Attributes:

Name Type Description
batch_size_by_model dict[str, int]

Mapping of model label to target batch size.

ramp_up_rate_by_model dict[str, float]

Per-model requests/second ramp-up rate. Models not present get immediate changes (rate 0).

target_site_id str | None

Site this command targets. The coordinator uses this to route the command to the correct datacenter.

Source code in openg2g/datacenter/command.py
@dataclass(frozen=True)
class SetBatchSize(DatacenterCommand):
    """Set batch sizes for one or more models.

    Attributes:
        batch_size_by_model: Mapping of model label to target batch size.
        ramp_up_rate_by_model: Per-model requests/second ramp-up rate.
            Models absent from this mapping change immediately (rate 0).
        target_site_id: Site this command targets. The coordinator uses
            this to route the command to the correct datacenter.
    """

    batch_size_by_model: dict[str, int]
    ramp_up_rate_by_model: dict[str, float] = field(default_factory=dict)
    target_site_id: str | None = None

ShiftReplicas dataclass

Bases: DatacenterCommand

Shift replicas for a model at this datacenter.

Positive replica_delta adds replicas (receiving site); negative removes them (sending site).

Attributes:

Name Type Description
model_label str

Which model to shift.

replica_delta int

Number of replicas to add (>0) or remove (<0).

target_site_id str | None

Site this command targets. The coordinator uses this to route the command to the correct datacenter.

Source code in openg2g/datacenter/command.py
@dataclass(frozen=True)
class ShiftReplicas(DatacenterCommand):
    """Shift replicas for a model at this datacenter.

    Positive `replica_delta` adds replicas (receiving site);
    negative removes them (sending site).

    Attributes:
        model_label: Which model to shift.
        replica_delta: Number of replicas to add (>0) or remove (<0).
        target_site_id: Site this command targets. The coordinator uses
            this to route the command to the correct datacenter.
    """

    model_label: str
    replica_delta: int
    target_site_id: str | None = None

openg2g.datacenter.config

Datacenter facility and workload configuration.

InferenceModelSpec

Bases: BaseModel

Specification for one LLM model served in the datacenter.

This is a pure model-identity object describing what is served, not how many or at what batch size. Deployment-specific parameters (replica count, initial batch size) are specified via ModelDeployment.

Attributes:

Name Type Description
model_label str

Human-readable model identifier (e.g. "Llama-3.1-70B").

model_id str

HuggingFace model ID (e.g. "meta-llama/Llama-3.1-70B-Instruct"). Used for benchmark data lookups and online API model fields.

gpus_per_replica int

GPUs allocated to each replica (determines model parallelism and per-replica power draw).

itl_deadline_s float

Per-model inter-token latency deadline for the OFO latency dual (seconds).

feasible_batch_sizes tuple[int, ...]

Allowed batch sizes. Used by the OFO controller for discretizing continuous batch-size updates and by the online datacenter for load-generator sizing.

Source code in openg2g/datacenter/config.py
class InferenceModelSpec(BaseModel):
    """Specification for one LLM model served in the datacenter.

    This is a pure model-identity object describing *what* is served, not
    *how many* or *at what batch size*.  Deployment-specific parameters
    (replica count, initial batch size) are specified via
    :class:`ModelDeployment`.

    Attributes:
        model_label: Human-readable model identifier (e.g. `"Llama-3.1-70B"`).
        model_id: HuggingFace model ID (e.g. `"meta-llama/Llama-3.1-70B-Instruct"`).
            Used for benchmark data lookups and online API model fields.
        gpus_per_replica: GPUs allocated to each replica (determines model
            parallelism and per-replica power draw).
        itl_deadline_s: Per-model inter-token latency deadline for the OFO
            latency dual (seconds).
        feasible_batch_sizes: Allowed batch sizes. Used by the OFO
            controller for discretizing continuous batch-size updates
            and by the online datacenter for load-generator sizing.
    """

    # Instances are immutable after validation.
    model_config = ConfigDict(frozen=True)

    model_label: str
    model_id: str = ""
    gpus_per_replica: int
    itl_deadline_s: float
    feasible_batch_sizes: tuple[int, ...]

    @model_validator(mode="after")
    def _validate(self) -> InferenceModelSpec:
        # Cross-field sanity checks; runs after per-field validation.
        if self.gpus_per_replica < 1:
            raise ValueError(f"gpus_per_replica must be >= 1, got {self.gpus_per_replica}.")
        if self.itl_deadline_s <= 0:
            raise ValueError(f"itl_deadline_s must be > 0, got {self.itl_deadline_s}.")
        if not self.feasible_batch_sizes:
            raise ValueError("feasible_batch_sizes must not be empty.")
        return self

ModelDeployment dataclass

One model's deployment at a datacenter site.

Pairs an InferenceModelSpec (model identity) with deployment-specific parameters.

Attributes:

Name Type Description
spec InferenceModelSpec

The model specification.

num_replicas int

Number of replicas deployed at this site.

initial_batch_size int

Starting batch size for this deployment. Must be in spec.feasible_batch_sizes.

Source code in openg2g/datacenter/config.py
@dataclass(frozen=True)
class ModelDeployment:
    """One model's deployment at a datacenter site.

    Combines an :class:`InferenceModelSpec` (model identity) with the
    site-specific deployment parameters.

    Attributes:
        spec: The model specification.
        num_replicas: Number of replicas deployed at this site.
        initial_batch_size: Starting batch size for this deployment.
            Must be in ``spec.feasible_batch_sizes``.
    """

    spec: InferenceModelSpec
    num_replicas: int
    initial_batch_size: int

    def __post_init__(self) -> None:
        # Validate once at construction; the dataclass is frozen, so
        # these invariants hold for the object's lifetime.
        if self.num_replicas < 0:
            raise ValueError(f"num_replicas must be >= 0, got {self.num_replicas}.")
        if self.initial_batch_size <= 0:
            raise ValueError(f"initial_batch_size must be > 0, got {self.initial_batch_size}.")
        if self.initial_batch_size not in self.spec.feasible_batch_sizes:
            raise ValueError(
                f"initial_batch_size ({self.initial_batch_size}) must be in "
                f"feasible_batch_sizes ({self.spec.feasible_batch_sizes})."
            )

TrainingRun

Training workload parameters.

The trace is eagerly rescaled so its peak matches target_peak_W_per_gpu. Use eval_power to evaluate total training power at a given simulation time.

Combine with at and | to build a TrainingSchedule:

schedule = (
    TrainingRun(n_gpus=2400, trace=trace_a).at(t_start=1000, t_end=2000)
    | TrainingRun(n_gpus=1200, trace=trace_b).at(t_start=2500, t_end=3500)
)

Attributes:

Name Type Description
n_gpus

Number of GPUs running the training workload.

trace

Single-GPU TrainingTrace.

target_peak_W_per_gpu

The trace is rescaled so its peak equals this value.

Source code in openg2g/datacenter/config.py
class TrainingRun:
    """Training workload parameters.

    On construction the per-GPU trace is rescaled so its peak power
    equals `target_peak_W_per_gpu`; `eval_power` then evaluates total
    training power at any simulation time.

    Combine with [`at`][.at] and `|` to build a [`TrainingSchedule`][..TrainingSchedule]:

    ```python
    schedule = (
        TrainingRun(n_gpus=2400, trace=trace_a).at(t_start=1000, t_end=2000)
        | TrainingRun(n_gpus=1200, trace=trace_b).at(t_start=2500, t_end=3500)
    )
    ```

    Attributes:
        n_gpus: Number of GPUs running the training workload.
        trace: Single-GPU [`TrainingTrace`][openg2g.datacenter.workloads.training.TrainingTrace].
        target_peak_W_per_gpu: The trace is rescaled so its peak equals this value.
    """

    __slots__ = ("_period", "_rescaled_power", "_trace_time", "n_gpus", "target_peak_W_per_gpu", "trace")

    def __init__(self, *, n_gpus: int, trace: TrainingTrace, target_peak_W_per_gpu: float = 400.0) -> None:
        if n_gpus <= 0:
            raise ValueError(f"TrainingRun n_gpus must be > 0, got {n_gpus}.")
        self.n_gpus = n_gpus
        self.trace = trace
        self.target_peak_W_per_gpu = target_peak_W_per_gpu

        times = np.asarray(trace.t_s, float)
        powers = np.asarray(trace.power_w, float)
        # Shift so the trace starts at t=0; its span becomes the repeat period.
        times = times - times[0]
        span = float(times[-1] - times[0])
        if span <= 0:
            raise ValueError("Training trace time span must be positive.")
        peak_w = float(np.max(powers))
        if peak_w <= 0:
            raise ValueError("Training trace has non-positive peak; cannot scale.")
        # Eager rescale: peak of the stored curve equals target_peak_W_per_gpu.
        self._rescaled_power = powers * (target_peak_W_per_gpu / peak_w)
        self._trace_time = times
        self._period = span

    def eval_power(self, t: float, t_start: float, t_end: float) -> float:
        """Evaluate total training power at simulation time `t`.

        Returns zero if `t` is outside `[t_start, t_end]`.

        Args:
            t: Global simulation time (seconds).
            t_start: Time when training becomes active (seconds).
            t_end: Time when training stops (seconds).

        Returns:
            Total training power (W) across all `n_gpus` GPUs.
        """
        if t < t_start or t > t_end:
            return 0.0
        # Tile the trace periodically over the active window, then interpolate.
        offset = (t - t_start) % self._period
        per_gpu_w = float(np.interp(offset, self._trace_time, self._rescaled_power))
        return self.n_gpus * per_gpu_w

    def at(self, t_start: float, t_end: float) -> TrainingSchedule:
        """Schedule this training run over `[t_start, t_end]`.

        Args:
            t_start: Global simulation time when training becomes active (seconds).
            t_end: Global simulation time when training stops (seconds).

        Returns:
            A single-entry [`TrainingSchedule`][...TrainingSchedule].
        """
        if t_end < t_start:
            raise ValueError(f"t_end ({t_end}) must be >= t_start ({t_start}).")
        window = (self, float(t_start), float(t_end))
        return TrainingSchedule((window,))

eval_power(t, t_start, t_end)

Evaluate total training power at simulation time t.

Returns zero if t is outside [t_start, t_end].

Parameters:

Name Type Description Default
t float

Global simulation time (seconds).

required
t_start float

Time when training becomes active (seconds).

required
t_end float

Time when training stops (seconds).

required

Returns:

Type Description
float

Total training power (W) across all n_gpus GPUs.

Source code in openg2g/datacenter/config.py
def eval_power(self, t: float, t_start: float, t_end: float) -> float:
    """Evaluate total training power at simulation time `t`.

    Power is zero outside the active window `[t_start, t_end]`;
    inside it, the rescaled single-GPU trace is tiled periodically,
    linearly interpolated, and multiplied by the GPU count.

    Args:
        t: Global simulation time (seconds).
        t_start: Time when training becomes active (seconds).
        t_end: Time when training stops (seconds).

    Returns:
        Total training power (W) across all `n_gpus` GPUs.
    """
    if t < t_start or t > t_end:
        return 0.0
    # Wrap the local time into one trace period, then interpolate.
    offset = (t - t_start) % self._period
    per_gpu_w = float(np.interp(offset, self._trace_time, self._rescaled_power))
    return self.n_gpus * per_gpu_w

at(t_start, t_end)

Schedule this training run over [t_start, t_end].

Parameters:

Name Type Description Default
t_start float

Global simulation time when training becomes active (seconds).

required
t_end float

Global simulation time when training stops (seconds).

required

Returns:

Type Description
TrainingSchedule

A single-entry TrainingSchedule.

Source code in openg2g/datacenter/config.py
def at(self, t_start: float, t_end: float) -> TrainingSchedule:
    """Schedule this training run over `[t_start, t_end]`.

    Args:
        t_start: Global simulation time when training becomes active (seconds).
        t_end: Global simulation time when training stops (seconds).

    Raises:
        ValueError: If `t_end` precedes `t_start`.

    Returns:
        A single-entry [`TrainingSchedule`][...TrainingSchedule].
    """
    if t_end < t_start:
        raise ValueError(f"t_end ({t_end}) must be >= t_start ({t_start}).")
    window = (self, float(t_start), float(t_end))
    return TrainingSchedule((window,))

TrainingSchedule

Ordered collection of TrainingRun objects scheduled over time windows.

Each entry is a (TrainingRun, t_start, t_end) tuple. Entries are sorted by t_start.

Built with TrainingRun.at and |.

Example:

schedule = (
    TrainingRun(n_gpus=2400, trace=trace_a).at(t_start=1000, t_end=2000)
    | TrainingRun(n_gpus=1200, trace=trace_b).at(t_start=2500, t_end=3500)
)
Source code in openg2g/datacenter/config.py
class TrainingSchedule:
    """Time-ordered collection of [`TrainingRun`][..TrainingRun] windows.

    Each entry is a `(TrainingRun, t_start, t_end)` tuple; entries are
    kept sorted by `t_start`.

    Built with [`TrainingRun.at`][..TrainingRun.at] and `|`.

    Example:

    ```python
    schedule = (
        TrainingRun(n_gpus=2400, trace=trace_a).at(t_start=1000, t_end=2000)
        | TrainingRun(n_gpus=1200, trace=trace_b).at(t_start=2500, t_end=3500)
    )
    ```
    """

    __slots__ = ("_entries",)

    def __init__(self, entries: tuple[tuple[TrainingRun, float, float], ...] = ()) -> None:
        # Sort once at construction so iteration is always by start time.
        self._entries = tuple(sorted(entries, key=lambda item: item[1]))

    def __or__(self, other: TrainingSchedule) -> TrainingSchedule:
        return TrainingSchedule(self._entries + other._entries)

    def __iter__(self) -> Iterator[tuple[TrainingRun, float, float]]:
        yield from self._entries

    def __len__(self) -> int:
        return len(self._entries)

    def __bool__(self) -> bool:
        return len(self._entries) > 0

    def __repr__(self) -> str:
        rendered = (
            f"TrainingRun(n_gpus={run.n_gpus}).at(t_start={start}, t_end={end})"
            for run, start, end in self._entries
        )
        return " | ".join(rendered)

InferenceRamp dataclass

Inference server ramp parameters.

Transitions the active replica count for a specific model to target. Combine with at and | to build an InferenceRampSchedule:

ramps = (
    InferenceRamp(target=144, model="Llama-3.1-8B").at(t_start=2500, t_end=3000)
    | InferenceRamp(target=864, model="Llama-3.1-8B").at(t_start=3200, t_end=3400)
)

Attributes:

Name Type Description
target int

Target number of active replicas after the ramp completes.

model str

Model label this ramp applies to.

Source code in openg2g/datacenter/config.py
@dataclass(frozen=True)
class InferenceRamp:
    """Inference server ramp parameters.

    Moves the active replica count for one model to `target`. Combine
    with [`at`][.at] and `|` to build an
    [`InferenceRampSchedule`][..InferenceRampSchedule]:

    ```python
    ramps = (
        InferenceRamp(target=144, model="Llama-3.1-8B").at(t_start=2500, t_end=3000)
        | InferenceRamp(target=864, model="Llama-3.1-8B").at(t_start=3200, t_end=3400)
    )
    ```

    Attributes:
        target: Target number of active replicas after the ramp completes.
        model: Model label this ramp applies to.
    """

    target: int
    model: str

    def __post_init__(self) -> None:
        # Zero (scale to nothing) is allowed; negative counts are not.
        if self.target < 0:
            raise ValueError(f"InferenceRamp target must be >= 0, got {self.target}.")

    def at(self, t_start: float, t_end: float) -> InferenceRampSchedule:
        """Schedule this ramp over `[t_start, t_end]`.

        Args:
            t_start: Global simulation time when the ramp begins (seconds).
            t_end: Global simulation time when the ramp ends (seconds).

        Returns:
            A single-entry [`InferenceRampSchedule`][...InferenceRampSchedule].
        """
        if t_end < t_start:
            raise ValueError(f"t_end ({t_end}) must be >= t_start ({t_start}).")
        return InferenceRampSchedule(((self, float(t_start), float(t_end)),))

at(t_start, t_end)

Schedule this ramp over [t_start, t_end].

Parameters:

Name Type Description Default
t_start float

Global simulation time when the ramp begins (seconds).

required
t_end float

Global simulation time when the ramp ends (seconds).

required

Returns:

Type Description
InferenceRampSchedule

A single-entry InferenceRampSchedule.

Source code in openg2g/datacenter/config.py
def at(self, t_start: float, t_end: float) -> InferenceRampSchedule:
    """Schedule this ramp over `[t_start, t_end]`.

    A zero-length window (`t_end == t_start`) passes validation --
    presumably an instantaneous step change; confirm against the
    schedule's interpolation semantics.

    Args:
        t_start: Global simulation time when the ramp begins (seconds).
        t_end: Global simulation time when the ramp ends (seconds).

    Raises:
        ValueError: If `t_end` is less than `t_start`.

    Returns:
        A single-entry [`InferenceRampSchedule`][...InferenceRampSchedule].
    """
    if t_end < t_start:
        raise ValueError(f"t_end ({t_end}) must be >= t_start ({t_start}).")
    return InferenceRampSchedule(((self, float(t_start), float(t_end)),))

InferenceRampSchedule

Ordered collection of InferenceRamp events for a single model.

Each entry is an (InferenceRamp, t_start, t_end) tuple. Entries are sorted by t_start.

Built with InferenceRamp.at and |.

Semantics: before the first ramp, the active count equals initial_count. During each [t_start, t_end] window the count linearly interpolates from the previous level to target. Between ramps, the count holds at the last target.

An empty schedule means initial_count replicas are active at all times.

Example:

ramps = (
    InferenceRamp(target=144, model="Llama-3.1-8B").at(t_start=2500, t_end=3000)
    | InferenceRamp(target=720, model="Llama-3.1-8B").at(t_start=3200, t_end=3400)
)
Source code in openg2g/datacenter/config.py
class InferenceRampSchedule:
    """Sorted sequence of [`InferenceRamp`][..InferenceRamp] events for
    a single model.

    Each entry is an `(InferenceRamp, t_start, t_end)` tuple; entries
    are kept ordered by `t_start`.

    Built with [`InferenceRamp.at`][..InferenceRamp.at] and the `|`
    operator.

    Semantics: before the first ramp the active count equals
    ``initial_count``.  Inside a `[t_start, t_end]` window the count is
    linearly interpolated from the previous level to ``target``.
    Between ramps the count holds at the most recent target.

    An empty schedule means ``initial_count`` replicas are active at
    all times.

    Example:

    ```python
    ramps = (
        InferenceRamp(target=144, model="Llama-3.1-8B").at(t_start=2500, t_end=3000)
        | InferenceRamp(target=720, model="Llama-3.1-8B").at(t_start=3200, t_end=3400)
    )
    ```
    """

    __slots__ = ("_entries", "_initial_count")

    def __init__(
        self,
        entries: tuple[tuple[InferenceRamp, float, float], ...] = (),
        *,
        initial_count: int = 0,
    ) -> None:
        # Keep entries ordered by their window start time.
        ordered = sorted(entries, key=lambda entry: entry[1])
        self._entries = tuple(ordered)
        self._initial_count = initial_count

    @property
    def initial_count(self) -> int:
        """Replica count in effect before the first ramp event."""
        return self._initial_count

    def __or__(self, other: InferenceRampSchedule) -> InferenceRampSchedule:
        # The left operand is the schedule built first, so its
        # initial_count wins for the merged schedule.
        merged = (*self._entries, *other._entries)
        return InferenceRampSchedule(merged, initial_count=self._initial_count)

    def __iter__(self) -> Iterator[tuple[InferenceRamp, float, float]]:
        yield from self._entries

    def __len__(self) -> int:
        return len(self._entries)

    def __bool__(self) -> bool:
        return len(self._entries) > 0

    def for_model(self, model_label: str, *, initial_count: int | None = None) -> InferenceRampSchedule:
        """Return a schedule holding only the entries for *model_label*.

        Args:
            model_label: Model to filter for.
            initial_count: Override the initial replica count for the
                filtered schedule.  ``None`` inherits from ``self``.
        """
        kept = tuple(entry for entry in self._entries if entry[0].model == model_label)
        base = self._initial_count if initial_count is None else initial_count
        return InferenceRampSchedule(kept, initial_count=base)

    def max_count(self) -> int:
        """Return the maximum target across all entries, or ``initial_count`` if empty."""
        targets = [ramp.target for ramp, _, _ in self._entries]
        if not targets:
            return self._initial_count
        return max(self._initial_count, *targets)

    def __repr__(self) -> str:
        rendered = " | ".join(
            f"InferenceRamp(target={ramp.target}, model={ramp.model!r}).at(t_start={start}, t_end={end})"
            for ramp, start, end in self._entries
        )
        head = f"InferenceRampSchedule(initial_count={self._initial_count}): "
        return head + (rendered if rendered else "(empty)")

    def count_at(self, t: float | np.ndarray) -> float | np.ndarray:
        """Evaluate the active replica count at time(s) *t*.

        Piecewise-linear interpolation between ramp events; before the
        first ramp the value equals ``initial_count``.

        Args:
            t: Scalar or array of global simulation times (seconds).

        Returns:
            Active replica count(s), same shape as *t*.
        """
        if isinstance(t, np.ndarray):
            return self._count_array(t)
        return float(self._count_scalar(float(t)))

    def _count_scalar(self, t: float) -> float:
        prev_level = float(self._initial_count)
        for ramp, window_start, window_end in self._entries:
            if t < window_start:
                # Before this (and every later) window: hold previous level.
                break
            if t > window_end:
                # Window fully in the past: the count settled at its target.
                prev_level = float(ramp.target)
                continue
            if window_end == window_start:
                # Instantaneous ramp: jump straight to the target.
                return float(ramp.target)
            frac = (t - window_start) / (window_end - window_start)
            return prev_level + frac * (float(ramp.target) - prev_level)
        return prev_level

    def _count_array(self, t: np.ndarray) -> np.ndarray:
        evaluate = np.vectorize(self._count_scalar, otypes=[float])
        return evaluate(t)

initial_count property

Replica count before any ramp event.

for_model(model_label, *, initial_count=None)

Return a schedule containing only entries for model_label.

Parameters:

Name Type Description Default
model_label str

Model to filter for.

required
initial_count int | None

Override the initial replica count for this per-model schedule. If None, inherits from self.

None
Source code in openg2g/datacenter/config.py
def for_model(self, model_label: str, *, initial_count: int | None = None) -> InferenceRampSchedule:
    """Return a schedule holding only the entries for *model_label*.

    Args:
        model_label: Model to filter for.
        initial_count: Override the initial replica count for the
            filtered schedule.  ``None`` inherits from ``self``.
    """
    kept = tuple(entry for entry in self._entries if entry[0].model == model_label)
    base = self._initial_count if initial_count is None else initial_count
    return InferenceRampSchedule(kept, initial_count=base)

max_count()

Return the maximum target across all entries, or initial_count if empty.

Source code in openg2g/datacenter/config.py
def max_count(self) -> int:
    """Return the maximum target across all entries, or ``initial_count`` if empty."""
    targets = [ramp.target for ramp, _, _ in self._entries]
    if not targets:
        return self._initial_count
    return max(self._initial_count, *targets)

count_at(t)

Evaluate the active replica count at time(s) t.

Piecewise-linear interpolation between ramp events. Before the first ramp, returns initial_count.

Parameters:

Name Type Description Default
t float | ndarray

Scalar or array of global simulation times (seconds).

required

Returns:

Type Description
float | ndarray

Active replica count(s), same shape as t.

Source code in openg2g/datacenter/config.py
def count_at(self, t: float | np.ndarray) -> float | np.ndarray:
    """Evaluate the active replica count at time(s) *t*.

    Piecewise-linear interpolation between ramp events; before the
    first ramp the value equals ``initial_count``.

    Args:
        t: Scalar or array of global simulation times (seconds).

    Returns:
        Active replica count(s), same shape as *t*.
    """
    # Arrays are evaluated element-wise; scalars go through the
    # scalar path and come back as plain floats.
    if isinstance(t, np.ndarray):
        return self._count_array(t)
    scalar_result = self._count_scalar(float(t))
    return float(scalar_result)

DatacenterConfig

Bases: BaseModel

Physical datacenter facility configuration.

Attributes:

Name Type Description
gpus_per_server int

Number of GPUs per physical server rack.

base_kw_per_phase float

Constant base load per phase (kW).

power_factor float

Power factor of the datacenter loads (lagging).

Source code in openg2g/datacenter/config.py
class DatacenterConfig(BaseModel):
    """Physical datacenter facility configuration.

    Frozen (immutable) pydantic model; field ranges are enforced by the
    ``_validate`` model validator after construction.

    Attributes:
        gpus_per_server: Number of GPUs per physical server rack.
        base_kw_per_phase: Constant base load per phase (kW).
        power_factor: Power factor of the datacenter loads (lagging).
    """

    model_config = ConfigDict(frozen=True)

    gpus_per_server: int = 8
    base_kw_per_phase: float = 0.0  # 0.0 means no constant base load
    power_factor: float = 0.95

    @model_validator(mode="after")
    def _validate(self) -> DatacenterConfig:
        # Runs after field parsing; raises so invalid configs never exist.
        if self.gpus_per_server < 1:
            raise ValueError(f"gpus_per_server must be >= 1, got {self.gpus_per_server}.")
        # Chained comparison also rejects NaN (0.0 < nan is False).
        if not (0.0 < self.power_factor <= 1.0):
            raise ValueError(f"power_factor must be in (0, 1], got {self.power_factor}.")
        return self

PowerAugmentationConfig

Bases: BaseModel

Power augmentation settings for virtual server scaling.

Controls per-server amplitude jitter and additive noise applied during power augmentation.

Attributes:

Name Type Description
amplitude_scale_range tuple[float, float]

(low, high) range for per-server amplitude scaling. Each virtual server draws a uniform multiplier from this range.

noise_fraction float

Gaussian noise standard deviation as a fraction of per-server power.

Source code in openg2g/datacenter/config.py
class PowerAugmentationConfig(BaseModel):
    """Power augmentation settings for virtual server scaling.

    Controls per-server amplitude jitter and additive noise applied during
    power augmentation.  The defaults disable both effects.

    Attributes:
        amplitude_scale_range: `(low, high)` range for per-server amplitude
            scaling. Each virtual server draws a uniform multiplier from this range.
        noise_fraction: Gaussian noise standard deviation as a fraction of
            per-server power.
    """

    model_config = ConfigDict(frozen=True)

    amplitude_scale_range: tuple[float, float] = (1.0, 1.0)  # (1.0, 1.0) => no jitter
    noise_fraction: float = 0.0  # 0.0 => no additive noise

openg2g.datacenter.layout

Server layout and activation policy primitives.

Provides the topology and activation-policy building blocks used by datacenter backends. Power augmentation (scaling per-GPU power to three-phase datacenter power) lives in openg2g.datacenter.workloads.inference.

ActivationPolicy

Bases: ABC

Per-model activation policy that answers "which servers are active?"

Subclass to implement custom activation logic. The datacenter creates one policy per model and passes it to InferencePowerAugmenter.

Source code in openg2g/datacenter/layout.py
class ActivationPolicy(ABC):
    """Per-model activation policy answering "which servers are active?"

    Subclasses implement custom activation logic. The datacenter creates
    one policy per model and hands it to
    [`InferencePowerAugmenter`][openg2g.datacenter.workloads.inference.InferencePowerAugmenter].
    """

    @abstractmethod
    def active_mask(self, t: float) -> np.ndarray:
        """Boolean mask of active servers at time *t*.

        Returns:
            Array of shape `(num_servers,)` with `True` for active servers.
        """

    def active_indices(self, t: float) -> np.ndarray:
        """Indices of active servers at time *t*.

        The default implementation yields indices in ascending order
        from [`active_mask`][..active_mask]. Subclasses may override to
        return indices in a specific order (e.g., priority order) to
        control floating-point summation order in the datacenter.

        Returns:
            1-D int array of active server indices.
        """
        mask = self.active_mask(t)
        return np.flatnonzero(mask)

active_mask(t) abstractmethod

Boolean mask of active servers at time t.

Returns:

Type Description
ndarray

Array of shape (num_servers,) with True for active servers.

Source code in openg2g/datacenter/layout.py
@abstractmethod
def active_mask(self, t: float) -> np.ndarray:
    """Boolean mask of active servers at time *t*.

    Args:
        t: Global simulation time (seconds).

    Returns:
        Array of shape `(num_servers,)` with `True` for active servers.
    """

active_indices(t)

Indices of active servers at time t.

The default implementation returns indices in ascending order via np.where(active_mask(t)). Subclasses may override to return indices in a specific order (e.g., priority order) to control floating-point summation order in the datacenter.

Returns:

Type Description
ndarray

1-D int array of active server indices.

Source code in openg2g/datacenter/layout.py
def active_indices(self, t: float) -> np.ndarray:
    """Indices of active servers at time *t*.

    The default implementation yields indices in ascending order from
    [`active_mask`][..active_mask]. Subclasses may override to return
    indices in a specific order (e.g., priority order) to control
    floating-point summation order in the datacenter.

    Returns:
        1-D int array of active server indices.
    """
    mask = self.active_mask(t)
    return np.flatnonzero(mask)

RampActivationPolicy

Bases: ActivationPolicy

Activate servers by fixed random priority, following an InferenceRampSchedule.

At time t, the top-k servers (by random priority) are active, where k is derived from the schedule's absolute replica count and the model's GPU requirements.

This is the default policy used by OfflineDatacenter.

Parameters:

Name Type Description Default
schedule InferenceRampSchedule

Per-model ramp schedule (absolute replica counts).

required
num_servers int

Total allocated servers (may exceed baseline when ramp targets exceed the initial replica count).

required
rng Generator

RNG for randomizing priority ordering. Consumed once at construction time.

required
gpus_per_replica int

GPUs required per model replica.

required
gpus_per_server int

GPUs per physical server.

required
Source code in openg2g/datacenter/layout.py
class RampActivationPolicy(ActivationPolicy):
    """Activate servers by fixed random priority, following an
    [`InferenceRampSchedule`][openg2g.datacenter.config.InferenceRampSchedule].

    At time *t*, the top-*k* servers (by random priority) are active,
    where *k* is derived from the schedule's absolute replica count and
    the model's GPU requirements.

    This is the default policy used by
    [`OfflineDatacenter`][openg2g.datacenter.offline.OfflineDatacenter].

    Args:
        schedule: Per-model ramp schedule (absolute replica counts).
        num_servers: Total allocated servers (may exceed baseline when
            ramp targets exceed the initial replica count).
        rng: RNG for randomizing priority ordering. Consumed once at
            construction time.
        gpus_per_replica: GPUs required per model replica.
        gpus_per_server: GPUs per physical server.
    """

    __slots__ = ("_gpus_per_replica", "_gpus_per_server", "_n", "_priority", "_replica_offset", "_schedule")

    def __init__(
        self,
        schedule: InferenceRampSchedule,
        num_servers: int,
        rng: np.random.Generator,
        *,
        gpus_per_replica: int,
        gpus_per_server: int,
    ) -> None:
        self._schedule = schedule
        self._n = num_servers
        self._gpus_per_replica = gpus_per_replica
        self._gpus_per_server = gpus_per_server
        # Adjustment applied on top of the scheduled replica count;
        # starts neutral (mutated externally via the slot).
        self._replica_offset: int = 0
        # Fixed random priority: servers activate in this shuffled order.
        priority = np.arange(num_servers, dtype=int)
        rng.shuffle(priority)
        self._priority = priority

    def _servers_for_count(self, replica_count: float) -> int:
        """Convert a replica count to a server count, clamped to [0, num_servers]."""
        import math

        gpus_needed = max(0.0, replica_count + self._replica_offset) * self._gpus_per_replica
        return max(0, min(self._n, math.ceil(gpus_needed / self._gpus_per_server)))

    def _active_server_count(self, t: float) -> int:
        """Number of servers that should be active at time *t* per the schedule."""
        # Single source of truth for the schedule -> server-count mapping,
        # shared by active_mask and active_indices.
        return self._servers_for_count(float(self._schedule.count_at(t)))

    def active_mask(self, t: float) -> np.ndarray:
        """Boolean mask with the top-k priority servers active at time *t*."""
        mask = np.zeros(self._n, dtype=bool)
        mask[self._priority[: self._active_server_count(t)]] = True
        return mask

    def active_indices(self, t: float) -> np.ndarray:
        """Return active server indices in priority order."""
        return self._priority[: self._active_server_count(t)].copy()

active_indices(t)

Return active server indices in priority order.

Source code in openg2g/datacenter/layout.py
def active_indices(self, t: float) -> np.ndarray:
    """Return active server indices in priority order."""
    replica_count = float(self._schedule.count_at(t))
    num_active = self._servers_for_count(replica_count)
    return self._priority[:num_active].copy()

ServerLayout dataclass

Per-model server layout describing how GPUs are organized.

This describes the physical topology only. Activation policies (which servers are on/off at a given time) are managed separately by the datacenter and passed to InferencePowerAugmenter alongside layouts.

Attributes:

Name Type Description
num_servers int

Number of physical servers for this model.

total_gpus int

Total GPU count across all servers.

gpus_per_replica int

GPUs per model replica.

gpus_per_server_list ndarray

GPU count per server (last may be partial).

phase_list ndarray

Phase assignment per server (0=A, 1=B, 2=C).

stagger_offsets ndarray

Per-server offsets for desynchronization. In offline mode these are integer indices into a power template; in online mode they can be float time offsets into a rolling buffer.

amplitude_scales ndarray

Per-server power multiplier for inter-server variation.

noise_fraction float

Gaussian noise standard deviation as a fraction of per-server power.

Source code in openg2g/datacenter/layout.py
@dataclass
class ServerLayout:
    """Per-model server layout describing how GPUs are organized.

    This describes the physical topology only. Activation policies (which
    servers are on/off at a given time) are managed separately by the
    datacenter and passed to
    [`InferencePowerAugmenter`][openg2g.datacenter.workloads.inference.InferencePowerAugmenter]
    alongside layouts.

    Attributes:
        num_servers: Number of physical servers for this model.
        total_gpus: Total GPU count across all servers.
        gpus_per_replica: GPUs per model replica.
        gpus_per_server_list: GPU count per server (last may be partial).
        phase_list: Phase assignment per server (0=A, 1=B, 2=C).
        stagger_offsets: Per-server offsets for desynchronization. In offline
            mode these are integer indices into a power template; in online
            mode they can be float time offsets into a rolling buffer.
        amplitude_scales: Per-server power multiplier for inter-server variation.
        noise_fraction: Gaussian noise standard deviation as a fraction of
            per-server power.
    """

    num_servers: int  # physical servers allocated to this model
    total_gpus: int  # GPUs summed across all servers
    gpus_per_replica: int  # GPUs one model replica occupies
    gpus_per_server_list: np.ndarray  # per-server GPU counts (last may be partial)
    phase_list: np.ndarray  # electrical phase per server: 0=A, 1=B, 2=C
    stagger_offsets: np.ndarray  # per-server desync offsets (int indices offline, float times online)
    amplitude_scales: np.ndarray  # per-server power multiplier
    noise_fraction: float  # noise std-dev as fraction of per-server power

openg2g.datacenter.offline

Offline (trace-based) datacenter backend.

OfflineDatacenterState dataclass

Bases: LLMDatacenterState

Extended state from the offline (trace-based) backend.

Adds per-model power breakdown to LLMDatacenterState.

Source code in openg2g/datacenter/offline.py
@dataclass(frozen=True)
class OfflineDatacenterState(LLMDatacenterState):
    """Extended state from the offline (trace-based) backend.

    Adds per-model power breakdown to
    [`LLMDatacenterState`][openg2g.datacenter.base.LLMDatacenterState].

    Attributes:
        power_by_model_w: Inference power draw (watts) per model label;
            models with no active servers appear with a value of 0.0.
    """

    # Per-model inference power in watts, keyed by model label.
    power_by_model_w: dict[str, float] = field(default_factory=dict)

OfflineWorkload dataclass

Complete offline simulation workload.

Bundles inference data with replica counts, optional training overlays, and inference server ramp events.

Attributes:

Name Type Description
inference_data InferenceData

LLM inference workload with offline simulation data (model specs, power templates, ITL fits).

replica_counts dict[str, int]

Mapping of model label to initial replica count at this site.

initial_batch_sizes dict[str, int]

Initial batch size per model label. Models absent from this mapping start at their first feasible batch size.

inference_ramps InferenceRampSchedule

Inference server ramp schedule. An empty schedule keeps all servers active at their initial replica counts.

training TrainingSchedule

Training workload schedule. An empty schedule disables training overlay.

Source code in openg2g/datacenter/offline.py
@dataclass
class OfflineWorkload:
    """Complete offline simulation workload.

    Bundles inference data with replica counts, optional training overlays,
    and inference server ramp events.

    Attributes:
        inference_data: LLM inference workload with offline simulation
            data (model specs, power templates, ITL fits).
        replica_counts: Mapping of model label to initial replica count
            at this site.
        initial_batch_sizes: Initial batch size per model label. Models
            absent from this mapping start at their first feasible
            batch size.
        inference_ramps: Inference server ramp schedule. An empty schedule
            keeps all servers active at their initial replica counts.
        training: Training workload schedule. An empty schedule disables
            training overlay.
    """

    inference_data: InferenceData
    replica_counts: dict[str, int] = field(default_factory=dict)
    initial_batch_sizes: dict[str, int] = field(default_factory=dict)
    inference_ramps: InferenceRampSchedule = field(default_factory=InferenceRampSchedule)
    training: TrainingSchedule = field(default_factory=TrainingSchedule)

OfflineDatacenter

Bases: LLMBatchSizeControlledDatacenter[OfflineDatacenterState]

Trace-based datacenter simulation with step-by-step interface.

Each step call computes one timestep of power output by indexing into pre-built per-GPU templates, applying per-server amplitude scaling and noise, and summing across active servers per phase.

Batch size changes via apply_control take effect on the next step call.

If workload.inference_ramps is set, a RampActivationPolicy is created per model.

Parameters:

Name Type Description Default
datacenter DatacenterConfig

Facility configuration (GPUs per server, base load).

required
workload OfflineWorkload

Offline workload configuration bundling inference data, training overlays, and server ramp events.

required
dt_s Fraction

Simulation timestep (seconds).

required
seed int

Random seed for layout generation, noise, and latency sampling. Sub-seeds are derived deterministically.

0
power_augmentation PowerAugmentationConfig | None

Per-server amplitude scaling and noise settings.

None
Source code in openg2g/datacenter/offline.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
class OfflineDatacenter(LLMBatchSizeControlledDatacenter[OfflineDatacenterState]):
    """Trace-based datacenter simulation with step-by-step interface.

    Each `step` call computes one timestep of power output by indexing
    into pre-built per-GPU templates, applying per-server amplitude
    scaling and noise, and summing across active servers per phase.

    Batch size changes via `apply_control` take effect on the next
    `step` call.

    If `workload.inference_ramps` is set, a
    [`RampActivationPolicy`][openg2g.datacenter.layout.RampActivationPolicy]
    is created per model.

    Args:
        datacenter: Facility configuration (GPUs per server, base load).
        workload: Offline workload configuration bundling inference data,
            training overlays, and server ramp events.
        dt_s: Simulation timestep (seconds).
        seed: Random seed for layout generation, noise, and latency
            sampling. Sub-seeds are derived deterministically.
        power_augmentation: Per-server amplitude scaling and noise
            settings.
    """

    def __init__(
        self,
        datacenter: DatacenterConfig,
        workload: OfflineWorkload,
        *,
        dt_s: Fraction,
        seed: int = 0,
        power_augmentation: PowerAugmentationConfig | None = None,
        load_shift_headroom: float = 0.0,
        total_gpu_capacity: int,
    ) -> None:
        """Build layouts, activation policies, and RNG streams.

        Args:
            datacenter: Facility configuration (GPUs per server, base load).
            workload: Offline workload bundling inference data, training
                overlays, and server ramp events.
            dt_s: Simulation timestep (seconds).
            seed: Random seed; sub-seeds are derived deterministically.
            power_augmentation: Per-server amplitude scaling and noise
                settings; ``None`` means defaults (no jitter, no noise).
            load_shift_headroom: Stored on the instance.
                NOTE(review): not used elsewhere in this chunk —
                presumably consumed by load-shifting control logic; confirm.
            total_gpu_capacity: Maximum GPUs this site can host; initial
                usage and the ramp schedule are validated against it.

        Raises:
            ValueError: If the initial replica counts or the ramp
                schedule exceed ``total_gpu_capacity``.
        """
        super().__init__()
        if power_augmentation is None:
            power_augmentation = PowerAugmentationConfig()

        self._datacenter = datacenter
        self._workload = workload
        self._power_augmentation = power_augmentation
        self._load_shift_headroom = load_shift_headroom
        self._total_gpu_capacity = total_gpu_capacity
        self._replica_counts = dict(workload.replica_counts)
        self._dt_s = dt_s
        self._seed = int(seed)
        self._models = list(workload.inference_data.models)
        # Convert configured kW to watts once at construction.
        self._base_W_per_phase = float(datacenter.base_kw_per_phase) * 1e3

        # Validate: initial GPU usage must not exceed capacity
        initial_usage = sum(self._replica_counts.get(ms.model_label, 0) * ms.gpus_per_replica for ms in self._models)
        if initial_usage > total_gpu_capacity:
            raise ValueError(f"Initial GPU usage ({initial_usage}) exceeds total_gpu_capacity ({total_gpu_capacity}).")

        # Validate ramp schedule against GPU capacity
        self._validate_ramp_capacity()

        self._layout_rng = np.random.default_rng(self._seed)
        # Batch sizes start from the workload overrides, falling back to
        # each model's first feasible batch size.
        self._batch_by_model: dict[str, int] = {
            ms.model_label: self._workload.initial_batch_sizes.get(ms.model_label, ms.feasible_batch_sizes[0])
            for ms in self._models
        }

        self._layouts: dict[str, ServerLayout] = {}
        self._policies: dict[str, ActivationPolicy] = {}
        self._build_all_layouts()
        # Distinct sub-seed keeps augmentation noise independent of layout RNG.
        self._inference_augmenter = InferencePowerAugmenter(
            layouts=self._layouts,
            policies=self._policies,
            seed=self._seed + 12345,
        )

        self._global_step: int = 0
        # Separate sub-seed stream for ITL (latency) sampling.
        self._latency_rng = np.random.default_rng(self._seed + 54321)
        # Per-model replica deltas applied by ShiftReplicas commands.
        self._replica_offset_by_model: dict[str, int] = {ms.model_label: 0 for ms in self._models}

        logger.info(
            "OfflineDatacenter: %d models, dt=%s s, seed=%d, gpu_capacity=%d",
            len(self._models),
            dt_s,
            seed,
            total_gpu_capacity,
        )
        for ms in self._models:
            n_rep = self._replica_counts.get(ms.model_label, 0)
            logger.info(
                "  %s: %d replicas, %d GPUs/replica, batch=%d",
                ms.model_label,
                n_rep,
                ms.gpus_per_replica,
                self._batch_by_model.get(ms.model_label, 0),
            )

    def _validate_ramp_capacity(self) -> None:
        """Check that the ramp schedule never exceeds total GPU capacity.

        Raises:
            ValueError: If total scheduled GPU usage exceeds capacity at
                any ramp boundary time.
        """
        schedule = self._workload.inference_ramps
        if not schedule:
            return
        # Collect all ramp boundary times
        boundary_times: set[float] = set()
        for _ramp, t_start, t_end in schedule:
            boundary_times.add(t_start)
            boundary_times.add(t_end)
        # At each boundary, compute total GPU usage
        # Between consecutive boundary times every per-model count is
        # linear (count_at is piecewise linear with breakpoints only at
        # ramp boundaries), so the summed usage is linear there and its
        # maximum falls on a boundary — checking boundaries suffices.
        # NOTE(review): the int(round(...)) below makes per-model usage a
        # step function; with simultaneous opposing ramps, rounding could
        # transiently exceed capacity by a replica mid-window — confirm
        # this tolerance is acceptable.
        for t in sorted(boundary_times):
            total_gpus = 0
            for ms in self._models:
                label = ms.model_label
                initial = self._replica_counts.get(label, 0)
                model_schedule = schedule.for_model(label, initial_count=initial)
                count = model_schedule.count_at(t)
                total_gpus += int(round(float(count))) * ms.gpus_per_replica
            if total_gpus > self._total_gpu_capacity:
                raise ValueError(
                    f"Ramp schedule exceeds total_gpu_capacity at t={t:.1f}s: "
                    f"needs {total_gpus} GPUs but capacity is {self._total_gpu_capacity}."
                )

    @property
    def total_gpu_capacity(self) -> int:
        """Maximum number of GPUs this datacenter can host.

        Fixed at construction; initial usage and the ramp schedule are
        validated against it in ``__init__``.
        """
        return self._total_gpu_capacity

    def current_gpu_usage(self) -> int:
        """Current total GPU usage across all models (initial + offsets)."""
        # Effective replicas per model = configured count plus any
        # ShiftReplicas offset, floored at zero.
        return sum(
            max(
                0,
                self._replica_counts.get(spec.model_label, 0)
                + self._replica_offset_by_model.get(spec.model_label, 0),
            )
            * spec.gpus_per_replica
            for spec in self._models
        )

    def available_gpu_capacity(self) -> int:
        """Remaining GPU slots available for incoming replicas."""
        free_slots = self._total_gpu_capacity - self.current_gpu_usage()
        return free_slots if free_slots > 0 else 0

    @property
    def dt_s(self) -> Fraction:
        """Simulation timestep in seconds."""
        return self._dt_s

    def step(self, clock: SimulationClock, events: EventEmitter) -> OfflineDatacenterState:
        """Advance one timestep and return the datacenter state.

        Indexes pre-built per-GPU power templates, applies per-server
        augmentation, overlays training power, and samples observed
        inter-token latency. Advances ``_global_step`` as a side effect.

        Args:
            clock: Simulation clock; only ``time_s`` is read.
            events: Event emitter (not used by this method).

        Returns:
            State snapshot with three-phase power, per-model power
            breakdown, batch sizes, replica counts, and sampled ITL.
        """
        t_now = clock.time_s
        template_store = self._workload.inference_data.power_templates

        # Build per-GPU power dict by indexing into templates with layout offsets.
        per_gpu_by_model: dict[str, np.ndarray] = {}
        for ms in self._models:
            label = ms.model_label
            # Models with no configured replicas contribute no power.
            if self._replica_counts.get(label, 0) <= 0:
                continue
            batch = int(self._batch_by_model[label])

            layout = self._layouts[label]
            template = template_store.template(label, batch)
            # Stagger offsets desynchronize servers; index wraps around
            # the template length.
            indices = (self._global_step + layout.stagger_offsets) % len(template)
            per_gpu_by_model[label] = template[indices]

        inference_aug = self._inference_augmenter.augment(per_gpu_by_model, t_now)

        power_by_model = dict(inference_aug.power_by_model_w)
        active_replicas_by_model = dict(inference_aug.active_replicas_by_model)
        # Ensure every known model appears in the output, even if idle.
        for ms in self._models:
            power_by_model.setdefault(ms.model_label, 0.0)
            active_replicas_by_model.setdefault(ms.model_label, 0)

        # This is where we accumulate power across workloads.
        phase_power = np.array(
            [
                self._base_W_per_phase + inference_aug.power_w.a,
                self._base_W_per_phase + inference_aug.power_w.b,
                self._base_W_per_phase + inference_aug.power_w.c,
            ]
        )

        # Training overlay: each run's power is split evenly across the
        # three phases.
        for run, t_start, t_end in self._workload.training:
            training_power_w = run.eval_power(float(t_now), t_start, t_end)
            phase_power += training_power_w / 3.0

        # ITL sampling: NaN when no fits are available or the model is idle.
        itl_fits = self._workload.inference_data.itl_fits
        observed_itl_s_by_model: dict[str, float] = {}
        for ms in self._models:
            label = ms.model_label
            n_rep = active_replicas_by_model.get(label, 0)
            if itl_fits is None or n_rep <= 0:
                observed_itl_s_by_model[label] = float("nan")
                continue
            batch = int(self._batch_by_model[label])
            observed_itl_s_by_model[label] = itl_fits.sample_avg(
                model_label=label,
                batch_size=batch,
                n_replicas=n_rep,
                rng=self._latency_rng,
            )

        state = OfflineDatacenterState(
            time_s=float(t_now),
            power_w=ThreePhase(
                a=float(phase_power[0]),
                b=float(phase_power[1]),
                c=float(phase_power[2]),
            ),
            power_by_model_w=power_by_model,
            active_replicas_by_model=active_replicas_by_model,
            batch_size_by_model=dict(self._batch_by_model),
            observed_itl_s_by_model=observed_itl_s_by_model,
        )
        self._global_step += 1
        return state

    @functools.singledispatchmethod
    def apply_control(self, command: DatacenterCommand, events: EventEmitter) -> None:
        """Apply a control command. Dispatches on command type.

        Raises:
            TypeError: If no handler is registered for the command's type.
        """
        raise TypeError(f"OfflineDatacenter does not support {type(command).__name__}")

    @apply_control.register
    def apply_control_set_batch_size(self, command: SetBatchSize, events: EventEmitter) -> None:
        """Record new batch sizes. Changes take effect on the next step.

        All entries are validated before any state is mutated, so an
        invalid value for one model cannot leave a partially applied
        update behind.

        Args:
            command: New batch sizes per model label. All values must be
                positive; ramp-up rates are not supported offline.
            events: Emitter notified with the full updated batch-size map.

        Raises:
            ValueError: If a ramp-up rate is requested, or if any batch
                size is non-positive.
        """
        if command.ramp_up_rate_by_model:
            raise ValueError(
                f"OfflineDatacenter does not support ramp_up_rate_by_model (got {command.ramp_up_rate_by_model}). "
                f"Batch size changes are always immediate in trace-based simulation."
            )
        # Validate everything first so a failure cannot leave
        # self._batch_by_model partially updated.
        validated: dict[str, int] = {}
        for label, b in command.batch_size_by_model.items():
            b_int = int(b)
            if b_int <= 0:
                raise ValueError(f"Batch size must be positive for model {label!r}, got {b_int}.")
            validated[str(label)] = b_int
        # Apply atomically (relative to validation) and log actual changes.
        for label, b_int in validated.items():
            old = self._batch_by_model.get(label)
            self._batch_by_model[label] = b_int
            if old != b_int:
                logger.info("Batch size %s: %s -> %d", label, old, b_int)
        events.emit(
            "datacenter.batch_size.updated",
            {"batch_size_by_model": dict(self._batch_by_model)},
        )

    @apply_control.register
    def apply_control_shift_replicas(self, command: ShiftReplicas, events: EventEmitter) -> None:
        """Shift replicas for a model by adjusting the activation policy base count.

        Positive deltas are clamped to the remaining GPU capacity; if no
        capacity remains the command is rejected outright (no state change,
        no event). Unknown models warn and return; models with no initial
        replicas, no spec, or no activation policy return silently. The
        `datacenter.replicas.shifted` event is emitted only when the
        command reaches the policy-update stage.

        Args:
            command: Target model label and signed replica delta.
            events: Emitter for the `datacenter.replicas.shifted` event.
        """
        label = command.model_label
        delta = command.replica_delta
        if label not in self._replica_offset_by_model:
            logger.warning("ShiftReplicas: unknown model %s, ignoring", label)
            return

        old_offset = self._replica_offset_by_model[label]
        new_offset = old_offset + delta

        # Find the model spec to compute server delta
        ms = next((m for m in self._models if m.model_label == label), None)
        initial = self._replica_counts.get(label, 0)
        if ms is None or initial <= 0:
            # No spec or no baseline replicas: nothing to shift.
            return
        # Compute new effective replica count (clamped to >= 0)
        effective_replicas = max(0, initial + new_offset)

        # Enforce total GPU capacity when adding replicas
        # NOTE: the check is based on the requested delta, while the clamp
        # below is applied on top of the pre-command offset.
        if delta > 0:
            gpus_needed = delta * ms.gpus_per_replica
            available = self.available_gpu_capacity()
            if gpus_needed > available:
                # Clamp to available capacity
                max_replicas = available // ms.gpus_per_replica
                if max_replicas <= 0:
                    logger.warning(
                        "ShiftReplicas %s: rejected, no GPU capacity (need %d, have %d)",
                        label,
                        gpus_needed,
                        available,
                    )
                    return
                # Grant only as many additional replicas as free GPUs allow.
                effective_replicas = initial + old_offset + max_replicas
                logger.info(
                    "ShiftReplicas %s: clamped from %+d to %+d replicas (GPU cap %d, used %d)",
                    label,
                    delta,
                    max_replicas,
                    self._total_gpu_capacity,
                    self.current_gpu_usage(),
                )

        # Update policy replica offset
        policy = self._policies.get(label)
        if policy is None:
            return

        # Persist the offset implied by the (possibly clamped) effective count.
        new_offset_val = effective_replicas - initial
        self._replica_offset_by_model[label] = new_offset_val
        policy._replica_offset = new_offset_val

        if old_offset != new_offset_val:
            logger.info(
                "ShiftReplicas %s: offset %+d -> %+d, effective_replicas=%d (cap %d)",
                label,
                old_offset,
                new_offset_val,
                effective_replicas,
                self._total_gpu_capacity,
            )

        events.emit(
            "datacenter.replicas.shifted",
            {"model_label": label, "replica_delta": delta, "effective_replicas": effective_replicas},
        )

    def reset(self) -> None:
        """Restore the datacenter to its initial (pre-simulation) state.

        Re-seeds all RNGs from the configured seed, restores initial batch
        sizes and zero replica offsets, and rebuilds server layouts, the
        activation policies, and the inference power augmenter.
        """
        self._global_step = 0

        # Initial batch size: workload override if present, otherwise the
        # model's first feasible batch size.
        batch_by_model: dict[str, int] = {}
        for ms in self._models:
            default_batch = ms.feasible_batch_sizes[0]
            batch_by_model[ms.model_label] = self._workload.initial_batch_sizes.get(
                ms.model_label, default_batch
            )
        self._batch_by_model = batch_by_model
        self._replica_offset_by_model = {ms.model_label: 0 for ms in self._models}

        # Rebuild layouts and policies from a freshly seeded RNG so that
        # repeated resets reproduce the same placement.
        self._layout_rng = np.random.default_rng(self._seed)
        self._layouts = {}
        self._policies = {}
        self._build_all_layouts()

        self._inference_augmenter = InferencePowerAugmenter(
            layouts=self._layouts,
            policies=self._policies,
            seed=self._seed + 12345,
        )
        self._latency_rng = np.random.default_rng(self._seed + 54321)

    def _build_all_layouts(self) -> None:
        """Build layouts and activation policies for all models.

        For each model with a nonzero initial replica count, sizes a server
        pool large enough for the peak of its ramp schedule (plus load-shift
        headroom), spreads servers across the three phases, and creates the
        activation policy and `ServerLayout` consumed by the power
        augmenter. Models with no replicas get neither a layout nor a
        policy.
        """
        schedule = self._workload.inference_ramps
        rng = self._layout_rng
        gpus_per_server = self._datacenter.gpus_per_server
        amp_lo, amp_hi = self._power_augmentation.amplitude_scale_range
        noise_fraction = self._power_augmentation.noise_fraction
        template_store = self._workload.inference_data.power_templates

        for ms in self._models:
            initial_replicas = self._replica_counts.get(ms.model_label, 0)
            if initial_replicas <= 0:
                continue  # no replicas -> no layout or policy for this model

            # Template length bounds the per-server stagger offsets below.
            any_batch = template_store.batch_sizes(ms.model_label)[0]
            tpl_len = len(template_store.template(ms.model_label, any_batch))

            # Per-model ramp schedule with absolute replica counts.
            model_schedule = schedule.for_model(ms.model_label, initial_count=initial_replicas)
            max_replicas = model_schedule.max_count()

            # Account for load-shift headroom so replicas can be added
            # later without exceeding the provisioned server pool.
            headroom_replicas = int(math.ceil(initial_replicas * self._load_shift_headroom))
            peak_replicas = max(max_replicas, initial_replicas + headroom_replicas)
            peak_gpus = peak_replicas * ms.gpus_per_replica
            num_servers = math.ceil(peak_gpus / gpus_per_server)

            # Phase shuffle: split servers as evenly as possible across the
            # three phases, then randomize which server lands on which.
            sA, sB, sC = split_integer_evenly(num_servers, 3)
            phase_list = np.asarray(([0] * sA) + ([1] * sB) + ([2] * sC), dtype=int)
            rng.shuffle(phase_list)

            # Policy dictates which servers are active at a given time.
            self._policies[ms.model_label] = RampActivationPolicy(
                model_schedule,
                num_servers,
                rng,
                gpus_per_replica=ms.gpus_per_replica,
                gpus_per_server=gpus_per_server,
            )

            # This offset determines for each server, how much to stagger
            # its power template indexing.
            stagger_offsets = rng.integers(low=0, high=max(tpl_len, 1), size=num_servers)

            # Per-server amplitude scales for power augmentation.
            amplitude_scales = rng.uniform(amp_lo, amp_hi, size=num_servers)

            gpus_per_server_list = np.full(num_servers, gpus_per_server, dtype=int)
            # Adjust last server for the actual GPU count at peak.
            tail = peak_gpus - (num_servers - 1) * gpus_per_server
            gpus_per_server_list[-1] = max(1, int(tail)) if tail > 0 else gpus_per_server
            total_gpus = int(gpus_per_server_list.sum())

            self._layouts[ms.model_label] = ServerLayout(
                num_servers=num_servers,
                total_gpus=total_gpus,
                gpus_per_replica=ms.gpus_per_replica,
                gpus_per_server_list=gpus_per_server_list,
                phase_list=phase_list,
                stagger_offsets=stagger_offsets,
                amplitude_scales=amplitude_scales,
                noise_fraction=noise_fraction,
            )

    @property
    def phase_share_by_model(self) -> dict[str, np.ndarray]:
        """Per-model phase share vectors derived from server placement.

        Returns:
            Mapping of model label to a 3-element array `[frac_A, frac_B, frac_C]`
                representing the fraction of servers on each phase. Models
                whose layout has no servers get a uniform 1/3 split.
        """
        uniform = np.array([1 / 3, 1 / 3, 1 / 3], dtype=float)
        result: dict[str, np.ndarray] = {}
        for model_label, server_layout in self._layouts.items():
            per_phase = np.bincount(server_layout.phase_list, minlength=3).astype(float)
            n_servers = per_phase.sum()
            if n_servers > 0:
                result[model_label] = per_phase / n_servers
            else:
                result[model_label] = uniform.copy()
        return result

total_gpu_capacity property

Maximum number of GPUs this datacenter can host.

phase_share_by_model property

Per-model phase share vectors derived from server placement.

Returns:

Type Description
dict[str, ndarray]

Mapping of model label to a 3-element array [frac_A, frac_B, frac_C] representing the fraction of servers on each phase.

current_gpu_usage()

Current total GPU usage across all models (initial + offsets).

Source code in openg2g/datacenter/offline.py
def current_gpu_usage(self) -> int:
    """Current total GPU usage across all models (initial + offsets).

    Each model contributes `max(0, initial + offset) * gpus_per_replica`
    GPUs: a negative offset can shrink a model's footprint to zero, but
    never below it.
    """
    return sum(
        max(
            0,
            self._replica_counts.get(ms.model_label, 0)
            + self._replica_offset_by_model.get(ms.model_label, 0),
        )
        * ms.gpus_per_replica
        for ms in self._models
    )

available_gpu_capacity()

Remaining GPU slots available for incoming replicas.

Source code in openg2g/datacenter/offline.py
def available_gpu_capacity(self) -> int:
    """Remaining GPU slots available for incoming replicas.

    Never negative: over-subscription simply reports zero capacity.
    """
    remaining = self._total_gpu_capacity - self.current_gpu_usage()
    return remaining if remaining > 0 else 0

apply_control(command, events)

Apply a control command. Dispatches on command type.

Source code in openg2g/datacenter/offline.py
@functools.singledispatchmethod
def apply_control(self, command: DatacenterCommand, events: EventEmitter) -> None:
    """Apply a control command. Dispatches on command type."""
    raise TypeError(f"OfflineDatacenter does not support {type(command).__name__}")

apply_control_set_batch_size(command, events)

Record new batch sizes. Changes take effect on the next step.

Source code in openg2g/datacenter/offline.py
@apply_control.register
def apply_control_set_batch_size(self, command: SetBatchSize, events: EventEmitter) -> None:
    """Record new batch sizes. Changes take effect on the next step."""
    if command.ramp_up_rate_by_model:
        raise ValueError(
            f"OfflineDatacenter does not support ramp_up_rate_by_model (got {command.ramp_up_rate_by_model}). "
            f"Batch size changes are always immediate in trace-based simulation."
        )
    for label, b in command.batch_size_by_model.items():
        b_int = int(b)
        if b_int <= 0:
            raise ValueError(f"Batch size must be positive for model {label!r}, got {b_int}.")
        old = self._batch_by_model.get(str(label))
        self._batch_by_model[str(label)] = b_int
        if old != b_int:
            logger.info("Batch size %s: %s -> %d", label, old, b_int)
    events.emit(
        "datacenter.batch_size.updated",
        {"batch_size_by_model": dict(self._batch_by_model)},
    )

apply_control_shift_replicas(command, events)

Shift replicas for a model by adjusting the activation policy base count.

Source code in openg2g/datacenter/offline.py
@apply_control.register
def apply_control_shift_replicas(self, command: ShiftReplicas, events: EventEmitter) -> None:
    """Shift replicas for a model by adjusting the activation policy base count."""
    label = command.model_label
    delta = command.replica_delta
    if label not in self._replica_offset_by_model:
        logger.warning("ShiftReplicas: unknown model %s, ignoring", label)
        return

    old_offset = self._replica_offset_by_model[label]
    new_offset = old_offset + delta

    # Find the model spec to compute server delta
    ms = next((m for m in self._models if m.model_label == label), None)
    initial = self._replica_counts.get(label, 0)
    if ms is None or initial <= 0:
        return
    # Compute new effective replica count (clamped to >= 0)
    effective_replicas = max(0, initial + new_offset)

    # Enforce total GPU capacity when adding replicas
    if delta > 0:
        gpus_needed = delta * ms.gpus_per_replica
        available = self.available_gpu_capacity()
        if gpus_needed > available:
            # Clamp to available capacity
            max_replicas = available // ms.gpus_per_replica
            if max_replicas <= 0:
                logger.warning(
                    "ShiftReplicas %s: rejected, no GPU capacity (need %d, have %d)",
                    label,
                    gpus_needed,
                    available,
                )
                return
            effective_replicas = initial + old_offset + max_replicas
            logger.info(
                "ShiftReplicas %s: clamped from %+d to %+d replicas (GPU cap %d, used %d)",
                label,
                delta,
                max_replicas,
                self._total_gpu_capacity,
                self.current_gpu_usage(),
            )

    # Update policy replica offset
    policy = self._policies.get(label)
    if policy is None:
        return

    new_offset_val = effective_replicas - initial
    self._replica_offset_by_model[label] = new_offset_val
    policy._replica_offset = new_offset_val

    if old_offset != new_offset_val:
        logger.info(
            "ShiftReplicas %s: offset %+d -> %+d, effective_replicas=%d (cap %d)",
            label,
            old_offset,
            new_offset_val,
            effective_replicas,
            self._total_gpu_capacity,
        )

    events.emit(
        "datacenter.replicas.shifted",
        {"model_label": label, "replica_delta": delta, "effective_replicas": effective_replicas},
    )

openg2g.datacenter.online

Online (live GPU) datacenter backend with power augmentation.

Connects to real vLLM inference servers for load generation and ITL measurement, and to zeusd instances for live GPU power monitoring. Power readings from a small number of real GPUs are augmented to datacenter scale using the shared InferencePowerAugmenter pipeline.

Requires pip install zeus aiohttp.

STAGGER_BUFFER_S = 10.0 module-attribute

Seconds of power history for temporal staggering.

Also used as the stagger range when building ServerLayout (float offsets drawn from [0, STAGGER_BUFFER_S)).

Not user-configurable. Patchable for testing via openg2g.datacenter.online.STAGGER_BUFFER_S = ....

OnlineDatacenterState dataclass

Bases: LLMDatacenterState

Extended state from the online (live GPU) backend.

The base power_w field carries the augmented three-phase power (what the grid sees). This subclass adds the measured (pre-augmentation) breakdown for post-hoc analysis.

Attributes:

Name Type Description
measured_power_w ThreePhase

Total measured three-phase power from real GPUs (before augmentation), plus base load.

measured_power_w_by_model dict[str, float]

Per-model total measured power from real GPUs (watts).

augmented_power_w_by_model dict[str, float]

Per-model augmented power (watts). This is the power fed to the grid for each model after scaling up.

augmentation_factor_by_model dict[str, float]

Per-model augmentation multiplier (virtual replicas / real replicas).

prometheus_metrics_by_model dict[str, dict[str, float]]

Per-model Prometheus metrics snapshot. Keys are model labels, values are dicts with metric names like num_requests_running, num_requests_waiting, kv_cache_usage_perc, num_preemptions_total.

Source code in openg2g/datacenter/online.py
@dataclass(frozen=True)
class OnlineDatacenterState(LLMDatacenterState):
    """Extended state from the online (live GPU) backend.

    The base `power_w`
    field carries the augmented three-phase power (what the grid sees).
    This subclass adds the measured (pre-augmentation) breakdown for
    post-hoc analysis.

    Attributes:
        measured_power_w: Total measured three-phase power from real GPUs
            (before augmentation), plus base load.
        measured_power_w_by_model: Per-model total measured power from real
            GPUs (watts).
        augmented_power_w_by_model: Per-model augmented power (watts). This
            is the power fed to the grid for each model after scaling up.
        augmentation_factor_by_model: Per-model augmentation multiplier
            (virtual replicas / real replicas).
        prometheus_metrics_by_model: Per-model Prometheus metrics snapshot.
            Keys are model labels, values are dicts with metric names like
            `num_requests_running`, `num_requests_waiting`,
            `kv_cache_usage_perc`, `num_preemptions_total`.
    """

    # Every field is defaulted (required by dataclass inheritance rules,
    # since the parent classes define fields). `default_factory` ensures
    # each instance gets its own dict rather than a shared mutable default.
    measured_power_w: ThreePhase = field(default_factory=lambda: ThreePhase(a=0.0, b=0.0, c=0.0))
    measured_power_w_by_model: dict[str, float] = field(default_factory=dict)
    augmented_power_w_by_model: dict[str, float] = field(default_factory=dict)
    augmentation_factor_by_model: dict[str, float] = field(default_factory=dict)
    prometheus_metrics_by_model: dict[str, dict[str, float]] = field(default_factory=dict)

GPUEndpointMapping

Bases: BaseModel

Maps a zeusd endpoint to specific GPUs.

Attributes:

Name Type Description
host str

Hostname or IP of the zeusd instance.

port int

TCP port of the zeusd instance.

gpu_indices tuple[int, ...]

GPU device indices to monitor on this endpoint.

Source code in openg2g/datacenter/online.py
class GPUEndpointMapping(BaseModel):
    """Maps a zeusd endpoint to specific GPUs.

    Attributes:
        host: Hostname or IP of the zeusd instance.
        port: TCP port of the zeusd instance.
        gpu_indices: GPU device indices to monitor on this endpoint.
    """

    # Frozen model: instances are immutable after construction.
    model_config = ConfigDict(frozen=True)

    host: str
    port: int = 4938  # presumably the zeusd default port -- confirm against zeusd docs
    gpu_indices: tuple[int, ...] = (0,)

    @property
    def endpoint_key(self) -> str:
        """Return the `host:port` key used by `PowerStreamingClient`."""
        return f"{self.host}:{self.port}"

endpoint_key property

Return the host:port key used by PowerStreamingClient.

VLLMDeployment

Bases: BaseModel

Deployment of one LLM model on a vLLM server.

Warning

vLLM must be a patched version with the POST /set_max_num_seqs endpoint implemented.

Pairs a reusable InferenceModelSpec with physical deployment details. simulated_num_replicas is the augmented replica count for grid simulation. The real replica count is derived from gpu_endpoints and spec.gpus_per_replica.

Tracks the current batch size (max_num_seqs) and provides set_batch_size() to update it on the vLLM server.

Attributes:

Name Type Description
spec InferenceModelSpec

Model specification (shared with offline datacenter).

simulated_num_replicas int

Number of replicas to simulate for grid power augmentation. Must be specified explicitly.

vllm_base_url str

Base URL of the vLLM server (e.g. http://node1:8000).

gpu_endpoints tuple[GPUEndpointMapping, ...]

GPU endpoint mappings for power monitoring.

request_extra_body dict[str, Any] | None

Extra fields merged into every request dict for this model (e.g. chat_template_kwargs).

initial_batch_size int

Starting batch size. The batch_size field is initialized from this value.

batch_size int

Current batch size (max_num_seqs). Initialized from initial_batch_size if not set explicitly.

Source code in openg2g/datacenter/online.py
class VLLMDeployment(BaseModel):
    """Deployment of one LLM model on a vLLM server.

    !!! Warning
        vLLM must be a patched version with the `POST /set_max_num_seqs`
        endpoint implemented.

    Pairs a reusable
    [`InferenceModelSpec`][openg2g.datacenter.config.InferenceModelSpec]
    with physical deployment details. ``simulated_num_replicas`` is the
    augmented replica count for grid simulation. The real replica
    count is derived from ``gpu_endpoints`` and ``spec.gpus_per_replica``.

    Tracks the current batch size (`max_num_seqs`) and provides
    `set_batch_size()` to update it on the vLLM server.

    Attributes:
        spec: Model specification (shared with offline datacenter).
        simulated_num_replicas: Number of replicas to simulate for grid
            power augmentation. Must be specified explicitly.
        vllm_base_url: Base URL of the vLLM server (e.g. `http://node1:8000`).
        gpu_endpoints: GPU endpoint mappings for power monitoring.
        request_extra_body: Extra fields merged into every request dict
            for this model (e.g. `chat_template_kwargs`).
        initial_batch_size: Starting batch size. The ``batch_size`` field
            is initialized from this value.
        batch_size: Current batch size (`max_num_seqs`). Initialized from
            ``initial_batch_size`` if not set explicitly.
    """

    spec: InferenceModelSpec
    simulated_num_replicas: int
    initial_batch_size: int
    vllm_base_url: str
    gpu_endpoints: tuple[GPUEndpointMapping, ...] = ()
    request_extra_body: dict[str, Any] | None = None
    batch_size: int = 0  # 0 is a sentinel meaning "not set"; see model_post_init

    def model_post_init(self, __context: Any) -> None:
        # Pydantic post-init hook: resolve the batch_size sentinel to the
        # configured starting value.
        if self.batch_size == 0:
            self.batch_size = self.initial_batch_size

    @property
    def model_label(self) -> str:
        return self.spec.model_label

    @property
    def num_real_gpus(self) -> int:
        """Total number of real GPUs for this model across all endpoints."""
        return sum(len(ep.gpu_indices) for ep in self.gpu_endpoints)

    @property
    def num_real_replicas(self) -> int:
        """Number of real replicas (real GPUs / GPUs per replica)."""
        # Floor division: partial replicas don't count. The max(..., 1)
        # guards against a zero gpus_per_replica spec.
        return self.num_real_gpus // max(self.spec.gpus_per_replica, 1)

    @property
    def augmentation_factor(self) -> float:
        """Ratio of simulated replicas to real replicas."""
        # max(..., 1) avoids division by zero when no real replicas exist.
        return self.simulated_num_replicas / max(self.num_real_replicas, 1)

    def set_batch_size(self, batch_size: int, ramp_up_rate: float = 0.0) -> None:
        """Update batch size on the vLLM server and track it locally.

        Sends `POST /set_max_num_seqs` to the vLLM server. On any failure
        the error is logged and re-raised, and the locally tracked
        `batch_size` is left unchanged.

        Args:
            batch_size: New batch size (max_num_seqs) to set.
            ramp_up_rate: Optional ramp-up rate for gradual increase.
        """
        old = self.batch_size
        url = f"{self.vllm_base_url}/set_max_num_seqs?max_num_seqs={batch_size}"
        if ramp_up_rate > 0:
            url += f"&ramp_up_rate={ramp_up_rate}"
        try:
            req = urllib.request.Request(url, method="POST", data=b"")
            with urllib.request.urlopen(req, timeout=2.0) as resp:
                # NOTE(review): urlopen normally raises HTTPError for >=400
                # responses, so this check is defensive -- confirm intent.
                if resp.status >= 400:
                    raise RuntimeError(
                        f"Failed to set batch size {batch_size} on {self.vllm_base_url}: HTTP {resp.status}"
                    )
        except Exception:
            logger.error(
                "Failed to set batch size %d on %s (keeping old=%d)",
                batch_size,
                self.vllm_base_url,
                old,
                exc_info=True,
            )
            raise
        # Only record the new size after the server accepted it.
        self.batch_size = batch_size
        if old != batch_size:
            logger.info("Batch size %s: %d -> %d", self.model_label, old, batch_size)

num_real_gpus property

Total number of real GPUs for this model across all endpoints.

num_real_replicas property

Number of real replicas (real GPUs / GPUs per replica).

augmentation_factor property

Ratio of simulated replicas to real replicas.

set_batch_size(batch_size, ramp_up_rate=0.0)

Update batch size on the vLLM server and track it locally.

Sends POST /set_max_num_seqs to the vLLM server.

Parameters:

Name Type Description Default
batch_size int

New batch size (max_num_seqs) to set.

required
ramp_up_rate float

Optional ramp-up rate for gradual increase.

0.0
Source code in openg2g/datacenter/online.py
def set_batch_size(self, batch_size: int, ramp_up_rate: float = 0.0) -> None:
    """Update batch size on the vLLM server and track it locally.

    Sends `POST /set_max_num_seqs` to the vLLM server.

    Args:
        batch_size: New batch size (max_num_seqs) to set.
        ramp_up_rate: Optional ramp-up rate for gradual increase.
    """
    old = self.batch_size
    url = f"{self.vllm_base_url}/set_max_num_seqs?max_num_seqs={batch_size}"
    if ramp_up_rate > 0:
        url += f"&ramp_up_rate={ramp_up_rate}"
    try:
        req = urllib.request.Request(url, method="POST", data=b"")
        with urllib.request.urlopen(req, timeout=2.0) as resp:
            if resp.status >= 400:
                raise RuntimeError(
                    f"Failed to set batch size {batch_size} on {self.vllm_base_url}: HTTP {resp.status}"
                )
    except Exception:
        logger.error(
            "Failed to set batch size %d on %s (keeping old=%d)",
            batch_size,
            self.vllm_base_url,
            old,
            exc_info=True,
        )
        raise
    self.batch_size = batch_size
    if old != batch_size:
        logger.info("Batch size %s: %d -> %d", self.model_label, old, batch_size)

LiveServerConfig

Bases: BaseModel

Configuration for interacting with live vLLM servers.

Groups settings related to load generation, ITL measurement, and Prometheus monitoring. The online counterpart of offline's trace/template data.

Attributes:

Name Type Description
requests_dir Path | None

Directory containing per-model JSONL request files (e.g. {model_label}.jsonl). If None, a minimal fallback request is used for each model.

prometheus_poll_interval_s float

How often to poll vLLM /metrics for request counts and saturation monitoring. Set to 0 to disable.

max_output_tokens int

Token limit for generated load requests (used by the fallback request when no JSONL requests are provided).

itl_window_s float

Sliding window for ITL averaging (seconds).

Source code in openg2g/datacenter/online.py
class LiveServerConfig(BaseModel):
    """Configuration for interacting with live vLLM servers.

    Groups settings related to load generation, ITL measurement, and
    Prometheus monitoring. The online counterpart of offline's
    trace/template data.

    Attributes:
        requests_dir: Directory containing per-model JSONL request files
            (e.g. `{model_label}.jsonl`). If `None`, a minimal fallback
            request is used for each model.
        prometheus_poll_interval_s: How often to poll vLLM /metrics for
            request counts and saturation monitoring. Set to 0 to disable.
        max_output_tokens: Token limit for generated load requests (used
            by the fallback request when no JSONL requests are provided).
        itl_window_s: Sliding window for ITL averaging (seconds).
    """

    # Frozen model: configuration is immutable after construction.
    model_config = ConfigDict(frozen=True)

    requests_dir: Path | None = None
    prometheus_poll_interval_s: float = 0.5
    max_output_tokens: int = 512
    itl_window_s: float = 1.0

OnlineDatacenter

Bases: LLMBatchSizeControlledDatacenter[OnlineDatacenterState]

Live GPU datacenter backend with power augmentation.

Dispatches inference load to vLLM servers, streams GPU power from zeusd, measures ITL from streaming responses, and augments power readings to datacenter scale using the shared InferencePowerAugmenter pipeline (same as OfflineDatacenter).

Call start before the first step and stop after the simulation loop finishes.

PowerStreamingClient is constructed internally from the GPU endpoints declared in each deployment. Health checks are always performed during start.

Parameters:

Name Type Description Default
datacenter DatacenterConfig

Facility configuration (GPUs per server, base load).

required
deployments Sequence[VLLMDeployment]

Model deployments with physical hardware mapping.

required
dt_s Fraction

Simulation timestep (seconds).

Fraction(1, 10)
seed int

Random seed for layout generation and noise.

0
power_augmentation PowerAugmentationConfig | None

Per-server amplitude scaling and noise settings.

None
inference_ramps InferenceRampSchedule | None

Inference server ramp event(s). None keeps all servers active.

None
live_server LiveServerConfig | None

Configuration for interacting with live vLLM servers. Request data is loaded from LiveServerConfig.requests_dir.

None
Source code in openg2g/datacenter/online.py
(Source listing spans lines 675–1208 of openg2g/datacenter/online.py.)
class OnlineDatacenter(LLMBatchSizeControlledDatacenter[OnlineDatacenterState]):
    """Live GPU datacenter backend with power augmentation.

    Dispatches inference load to vLLM servers, streams GPU power from
    zeusd, measures ITL from streaming responses, and augments power
    readings to datacenter scale using the shared
    [`InferencePowerAugmenter`][openg2g.datacenter.workloads.inference.InferencePowerAugmenter]
    pipeline (same as
    [`OfflineDatacenter`][openg2g.datacenter.offline.OfflineDatacenter]).

    Call [`start`][.start] before the first [`step`][.step] and
    [`stop`][.stop] after the simulation loop finishes.

    `PowerStreamingClient` is constructed internally from the GPU
    endpoints declared in each deployment. Health checks are always
    performed during [`start`][.start].

    Args:
        datacenter: Facility configuration (GPUs per server, base load).
        deployments: Model deployments with physical hardware mapping.
        dt_s: Simulation timestep (seconds).
        seed: Random seed for layout generation and noise.
        power_augmentation: Per-server amplitude scaling and noise
            settings.
        inference_ramps: Inference server ramp event(s). `None` keeps
            all servers active.
        live_server: Configuration for interacting with live vLLM
            servers. Request data is loaded from
            `LiveServerConfig.requests_dir`.
    """

    def __init__(
        self,
        datacenter: DatacenterConfig,
        deployments: Sequence[VLLMDeployment],
        *,
        dt_s: Fraction = Fraction(1, 10),
        seed: int = 0,
        power_augmentation: PowerAugmentationConfig | None = None,
        inference_ramps: InferenceRampSchedule | None = None,
        live_server: LiveServerConfig | None = None,
    ) -> None:
        super().__init__()
        # Fill in default configs so downstream code never sees None.
        if power_augmentation is None:
            power_augmentation = PowerAugmentationConfig()
        if live_server is None:
            live_server = LiveServerConfig()
        self._dt_s = dt_s
        self._seed = int(seed)
        self._deployments = list(deployments)
        self._deployment_map = {d.model_label: d for d in deployments}
        self._datacenter_config = datacenter
        self._power_augmentation = power_augmentation
        self._live_server_config = live_server

        # Facility base load, converted from kW to W per phase.
        self._base_W_per_phase = float(datacenter.base_kw_per_phase) * 1e3
        self._inference_ramp_schedule = inference_ramps if inference_ramps is not None else InferenceRampSchedule()

        # Deduplicate zeusd endpoints shared across deployments and merge
        # their GPU index lists, so each endpoint is streamed exactly once.
        servers_by_key: dict[str, ZeusdConfig] = {}
        gpu_indices_by_key: dict[str, list[int]] = {}
        for d in self._deployments:
            for ep in d.gpu_endpoints:
                key = ep.endpoint_key
                if key not in gpu_indices_by_key:
                    gpu_indices_by_key[key] = []
                for idx in ep.gpu_indices:
                    if idx not in gpu_indices_by_key[key]:
                        gpu_indices_by_key[key].append(idx)
                # Rebuilt each time so the config carries the merged GPU list.
                servers_by_key[key] = ZeusdConfig.tcp(
                    ep.host,
                    ep.port,
                    gpu_indices=gpu_indices_by_key[key],
                    cpu_indices=[],
                )
        self._power_client = PowerStreamingClient(servers=list(servers_by_key.values()))

        # Prometheus polling is optional; a non-positive interval disables it.
        self._prometheus = (
            _PrometheusPoller(
                deployments,
                poll_interval_s=live_server.prometheus_poll_interval_s,
            )
            if live_server.prometheus_poll_interval_s > 0
            else None
        )

        self._request_store = RequestStore.load(live_server.requests_dir) if live_server.requests_dir else None
        self._load_gen = _LoadGenerator(
            deployments,
            request_store=self._request_store,
            max_output_tokens=live_server.max_output_tokens,
            itl_window_s=live_server.itl_window_s,
            prometheus_poller=self._prometheus,
        )

        self._layout_rng = np.random.default_rng(self._seed)
        self._layouts: dict[str, ServerLayout] = {}
        self._policies: dict[str, ActivationPolicy] = {}
        self._build_all_layouts()
        self._inference_augmenter = InferencePowerAugmenter(
            layouts=self._layouts,
            policies=self._policies,
            # Fixed offset decorrelates the augmenter's RNG stream from the
            # layout RNG while keeping both reproducible from `seed`.
            seed=self._seed + 12345,
        )
        self._rolling_buffer = _RollingPowerBuffer(
            [d.model_label for d in deployments],
            # Sized to hold the stagger window; the factor of 100 presumably
            # assumes ~100 Hz power polling — TODO confirm poll rate.
            max_samples=max(int(STAGGER_BUFFER_S * 100), 1000),
        )

        self._started = False

        logger.info(
            "OnlineDatacenter: %d deployments, dt=%s s",
            len(self._deployments),
            dt_s,
        )
        for d in deployments:
            layout = self._layouts.get(d.model_label)
            n_servers = layout.num_servers if layout else 0
            logger.info(
                "  %s: %d real GPUs, %d simulated replicas (%.0fx augmentation), %d virtual servers, vllm=%s",
                d.model_label,
                d.num_real_gpus,
                d.simulated_num_replicas,
                d.augmentation_factor,
                n_servers,
                d.vllm_base_url,
            )

    def _build_all_layouts(self) -> None:
        """Build ServerLayout and activation policies for each deployed model.

        The RNG invocation order per model must be: phase shuffle,
        priority shuffle, stagger offsets, amplitude scales. We
        interleave policy construction between the phase shuffle
        and stagger/amplitude draws to preserve this ordering.
        """
        schedule = self._inference_ramp_schedule
        gpus_per_server = self._datacenter_config.gpus_per_server
        rng = self._layout_rng
        amp_lo, amp_hi = self._power_augmentation.amplitude_scale_range
        noise_fraction = self._power_augmentation.noise_fraction
        stagger_s = float(STAGGER_BUFFER_S)

        for d in self._deployments:
            spec = d.spec
            n_replicas = d.simulated_num_replicas
            # Models with zero simulated replicas get no layout or policy.
            if n_replicas > 0:
                total_gpus = n_replicas * spec.gpus_per_replica
                num_servers = math.ceil(total_gpus / gpus_per_server)

                # Per-model schedule with absolute counts
                model_schedule = schedule.for_model(spec.model_label, initial_count=n_replicas)

                # Phase shuffle (consumes RNG)
                sA, sB, sC = split_integer_evenly(num_servers, 3)
                phase_list = np.asarray(([0] * sA) + ([1] * sB) + ([2] * sC), dtype=int)
                rng.shuffle(phase_list)

                # Priority shuffle (consumes RNG) — must happen here
                self._policies[d.model_label] = RampActivationPolicy(
                    model_schedule,
                    num_servers,
                    rng,
                    gpus_per_replica=spec.gpus_per_replica,
                    gpus_per_server=gpus_per_server,
                )

                # Stagger offsets (consumes RNG) — float for online
                stagger_offsets = rng.uniform(0.0, max(stagger_s, 1e-9), size=num_servers)

                # Amplitude scales (consumes RNG)
                amplitude_scales = rng.uniform(amp_lo, amp_hi, size=num_servers)

                # All servers are full except the last, which holds the
                # remainder of GPUs (tail is in (0, gpus_per_server] when
                # total_gpus > 0, since num_servers = ceil(total/gps)).
                gpus_per_server_list = np.full(num_servers, gpus_per_server, dtype=int)
                tail = total_gpus - (num_servers - 1) * gpus_per_server
                gpus_per_server_list[-1] = int(tail) if tail > 0 else gpus_per_server

                self._layouts[d.model_label] = ServerLayout(
                    num_servers=num_servers,
                    total_gpus=total_gpus,
                    gpus_per_replica=spec.gpus_per_replica,
                    gpus_per_server_list=gpus_per_server_list,
                    phase_list=phase_list,
                    stagger_offsets=stagger_offsets,
                    amplitude_scales=amplitude_scales,
                    noise_fraction=noise_fraction,
                )

    @property
    def dt_s(self) -> Fraction:
        # Simulation timestep (seconds), as passed at construction.
        return self._dt_s

    @property
    def phase_share_by_model(self) -> dict[str, np.ndarray]:
        """Per-model phase share vectors derived from server layout."""
        shares: dict[str, np.ndarray] = {}
        for label, layout in self._layouts.items():
            counts = np.bincount(layout.phase_list, minlength=3).astype(float)
            total = counts.sum()
            if total > 0:
                shares[label] = counts / total
            else:
                # Degenerate layout with no servers: assume an even split.
                shares[label] = np.array([1 / 3, 1 / 3, 1 / 3], dtype=float)
        return shares

    def reset(self) -> None:
        # Rebuild all mutable state so a subsequent start() behaves like a
        # fresh construction: new load generator, layouts/policies re-drawn
        # from the same seed, buffer cleared, batch sizes restored.
        if self._started:
            self._load_gen.stop()
        self._load_gen = _LoadGenerator(
            self._deployments,
            request_store=self._request_store,
            max_output_tokens=self._live_server_config.max_output_tokens,
            itl_window_s=self._live_server_config.itl_window_s,
            prometheus_poller=self._prometheus,
        )
        self._layout_rng = np.random.default_rng(self._seed)
        self._layouts = {}
        self._policies = {}
        self._build_all_layouts()
        self._inference_augmenter = InferencePowerAugmenter(
            layouts=self._layouts,
            policies=self._policies,
            seed=self._seed + 12345,
        )
        self._rolling_buffer.clear()
        # Restore per-deployment batch size to its configured initial value.
        for d in self._deployments:
            d.batch_size = d.initial_batch_size
        self._started = False

    def start(self) -> None:
        """Start load generation, warm up servers, and fill the power buffer.

        Sequence:
            1. Run health checks on all vLLM servers and zeusd instances.
            2. Wait for at least one power reading per endpoint (10 s timeout).
            3. Set initial batch sizes on all vLLM servers.
            4. Start load generation threads.
            5. Warm up: poll power into the rolling buffer while waiting for
               each model's `num_requests_running` to reach 95% of its
               `initial_batch_size`. Fails after 60 s if any model does not
               saturate.

        Raises:
            RuntimeError: If already started, or if warmup fails (see
                [`_warmup`][._warmup]).
        """
        if self._started:
            raise RuntimeError("OnlineDatacenter already started")

        logger.info("Starting OnlineDatacenter with %d deployments", len(self._deployments))

        # 1. Health checks
        logger.info("Running health checks...")
        for d in self._deployments:
            _check_vllm_health(d.vllm_base_url)
            _check_vllm_model(d.vllm_base_url, d.spec.model_id)
            for ep in d.gpu_endpoints:
                _check_zeusd_health(ep.host, ep.port)
        logger.info("All health checks passed")

        # 2. Wait for power readings from all endpoints
        all_endpoints: set[str] = set()
        for d in self._deployments:
            for ep in d.gpu_endpoints:
                all_endpoints.add(ep.endpoint_key)

        deadline = time.monotonic() + 10.0
        while time.monotonic() < deadline:
            readings = self._power_client.get_power()
            if all_endpoints.issubset(readings.keys()):
                logger.info("Power readings received from all %d endpoints", len(all_endpoints))
                break
            time.sleep(0.5)
        else:
            # while/else: runs only when the deadline expired without a break.
            # A missing endpoint is logged but not fatal.
            connected = set(self._power_client.get_power().keys())
            missing = all_endpoints - connected
            logger.warning("Timed out waiting for power readings from: %s", missing)

        # 3. Set initial batch sizes on vLLM servers
        for d in self._deployments:
            d.set_batch_size(d.initial_batch_size)

        # 4. Start load generation (and Prometheus poller)
        self._load_gen.start()
        logger.info("LoadGenerator started")

        # 5. Warm up: fill power buffer + wait for server saturation
        self._warmup()

        self._started = True
        logger.info("OnlineDatacenter ready")

    def _poll_power_into_buffer(self) -> tuple[float, dict[str, float]]:
        """Read GPU power from all endpoints and feed the rolling buffer.

        Endpoints with no reading yet are skipped; a model whose GPUs all
        lack readings records 0.0 W for this sample.

        Returns:
            Tuple of (monotonic timestamp, per-model average per-GPU watts).
        """
        now = time.monotonic()
        raw_power = self._power_client.get_power()
        per_gpu_by_model: dict[str, float] = {}
        for d in self._deployments:
            total_w = 0.0
            n_gpus = 0
            for ep in d.gpu_endpoints:
                pr = raw_power.get(ep.endpoint_key)
                if pr is None:
                    continue
                for idx in ep.gpu_indices:
                    if idx in pr.gpu_power_w:
                        total_w += pr.gpu_power_w[idx]
                        n_gpus += 1
            # Average over the GPUs that actually reported this cycle.
            per_gpu_w = total_w / n_gpus if n_gpus > 0 else 0.0
            self._rolling_buffer.append(d.model_label, now, per_gpu_w)
            per_gpu_by_model[d.model_label] = per_gpu_w
        return now, per_gpu_by_model

    def _warmup(
        self,
        timeout_s: float = 60.0,
        saturation_threshold: float = 0.95,
        poll_interval_s: float = 0.1,
    ) -> None:
        """Fill the rolling power buffer and wait for vLLM server saturation.

        Actively polls GPU power to fill the rolling buffer while monitoring
        Prometheus `num_requests_running` to verify each model has reached
        `saturation_threshold` of its `initial_batch_size`.

        Completion requires both conditions for every model:
            1. `num_requests_running >= saturation_threshold * initial_batch_size`
            2. At least `stagger_buffer_s` has elapsed since that model first
               reached saturation (so the buffer contains a full stagger
               window of steady-state power data).

        Args:
            timeout_s: Maximum warmup duration in seconds.
            saturation_threshold: Fraction of `initial_batch_size` that
                `num_requests_running` must reach (0.0-1.0).
            poll_interval_s: Seconds between power polls.

        Raises:
            RuntimeError: If any model fails to saturate within `timeout_s`.
                Includes the `num_requests_running` trajectory for failed
                models.
        """
        stagger_s = STAGGER_BUFFER_S
        logger.info(
            "Warming up: waiting for server saturation (%.0f%% of initial_batch_size) "
            "+ %.1f s buffer fill per model...",
            saturation_threshold * 100,
            stagger_s,
        )

        warmup_start = time.monotonic()
        deadline = warmup_start + timeout_s
        last_log = warmup_start

        # Per-model (elapsed_s, num_requests_running) samples for diagnostics.
        trajectory: dict[str, list[tuple[float, float]]] = {d.model_label: [] for d in self._deployments}
        # Monotonic time at which each model first reached saturation.
        saturation_time: dict[str, float | None] = {d.model_label: None for d in self._deployments}

        while time.monotonic() < deadline:
            now = time.monotonic()
            elapsed = now - warmup_start

            self._poll_power_into_buffer()

            all_ready = True
            if self._prometheus is not None:
                prom = self._prometheus.get_latest()
                for d in self._deployments:
                    label = d.model_label
                    running = prom.get(label, {}).get("num_requests_running", 0.0)
                    trajectory[label].append((elapsed, running))
                    target = d.initial_batch_size * saturation_threshold

                    if running >= target and saturation_time[label] is None:
                        saturation_time[label] = now
                        logger.info(
                            "  %s saturated at t=%.1f s (num_requests_running=%.0f)",
                            label,
                            elapsed,
                            running,
                        )

                    # Ready only once saturated AND a full stagger window of
                    # post-saturation power samples has accumulated.
                    sat_t = saturation_time[label]
                    if sat_t is None or (now - sat_t) < stagger_s:
                        all_ready = False
            else:
                # Without Prometheus we can only wait for the buffer to fill.
                logger.warning(
                    "Prometheus polling is disabled; cannot verify server saturation. "
                    "Waiting %.1f s for power buffer only.",
                    stagger_s,
                )
                if elapsed < stagger_s:
                    all_ready = False

            if all_ready:
                logger.info("Warmup complete in %.1f s", elapsed)
                return

            # Progress log at most every 10 s.
            if now - last_log >= 10.0:
                last_log = now
                if self._prometheus is not None:
                    prom = self._prometheus.get_latest()
                    for d in self._deployments:
                        label = d.model_label
                        running = prom.get(label, {}).get("num_requests_running", 0.0)
                        target = d.initial_batch_size
                        sat_t = saturation_time[label]
                        buf_s = (now - sat_t) if sat_t is not None else 0.0
                        logger.info(
                            "  Warmup %s: num_requests_running=%.0f / %d (%.0f%%), buffer=%.1f / %.1f s",
                            label,
                            running,
                            target,
                            running / max(target, 1) * 100,
                            buf_s,
                            stagger_s,
                        )

            time.sleep(poll_interval_s)

        # Deadline expired: build a diagnostic error message.
        if self._prometheus is None:
            raise RuntimeError(
                f"Warmup timed out after {timeout_s:.0f} s waiting for power buffer to fill ({stagger_s:.1f} s)"
            )

        prom = self._prometheus.get_latest()
        failed: list[str] = []
        for d in self._deployments:
            label = d.model_label
            running = prom.get(label, {}).get("num_requests_running", 0.0)
            sat_t = saturation_time[label]
            not_saturated = running < d.initial_batch_size * saturation_threshold
            not_buffered = sat_t is None or (time.monotonic() - sat_t) < stagger_s
            if not_saturated or not_buffered:
                failed.append(label)

        parts = [
            f"Warmup timed out after {timeout_s:.0f} s. "
            f"Models that failed to reach {saturation_threshold:.0%} of initial_batch_size:",
        ]
        for label in failed:
            target = self._deployment_map[label].initial_batch_size
            traj = trajectory[label]
            final = traj[-1][1] if traj else 0.0
            parts.append(f"  {label} (target: {target}, reached: {final:.0f}):")
            # Subsample the trajectory to roughly one entry per 5 s, always
            # including the final sample.
            step = max(1, int(5.0 / poll_interval_s))
            samples = traj[::step]
            if traj and (not samples or samples[-1] is not traj[-1]):
                samples.append(traj[-1])
            entries = [f"t={t:.0f}s: {r:.0f}" for t, r in samples]
            parts.append("    " + ", ".join(entries))
        raise RuntimeError("\n".join(parts))

    def stop(self) -> None:
        """Stop load generation and power streaming."""
        self._load_gen.stop()
        self._power_client.stop()
        self._started = False
        logger.info("OnlineDatacenter stopped")

    def step(self, clock: SimulationClock, events: EventEmitter) -> OnlineDatacenterState:
        """Read live power, augment to datacenter scale, and return state."""
        now, per_gpu_w_by_model = self._poll_power_into_buffer()

        # Measured (real-hardware) power per model = avg per-GPU W x real GPUs.
        measured_power_by_model: dict[str, float] = {}
        augmentation_factor_by_model: dict[str, float] = {}
        for d in self._deployments:
            label = d.model_label
            measured_power_by_model[label] = per_gpu_w_by_model.get(label, 0.0) * d.num_real_gpus
            augmentation_factor_by_model[label] = d.augmentation_factor

        # Sample per-virtual-server power from the rolling buffer, offset by
        # each server's stagger to decorrelate the augmented fleet.
        per_gpu_by_model: dict[str, np.ndarray] = {}
        for d in self._deployments:
            label = d.model_label
            if label not in self._layouts:
                continue
            layout = self._layouts[label]
            per_gpu_by_model[label] = self._rolling_buffer.sample_servers(label, now, layout.stagger_offsets)

        inference_aug = self._inference_augmenter.augment(per_gpu_by_model, clock.time_s)

        # Measured power carries no phase information, so it is split evenly
        # across the three phases; only augmented power is per-phase.
        measured_total = sum(measured_power_by_model.values())
        measured_per_phase = measured_total / 3.0

        observed_itl: dict[str, float] = {
            d.model_label: self._load_gen.get_observed_itl(d.model_label) for d in self._deployments
        }

        prometheus_metrics: dict[str, dict[str, float]] = {}
        if self._prometheus is not None:
            prometheus_metrics = self._prometheus.get_latest()

        state = OnlineDatacenterState(
            time_s=clock.time_s,
            power_w=ThreePhase(
                a=self._base_W_per_phase + inference_aug.power_w.a,
                b=self._base_W_per_phase + inference_aug.power_w.b,
                c=self._base_W_per_phase + inference_aug.power_w.c,
            ),
            batch_size_by_model={d.model_label: d.batch_size for d in self._deployments},
            active_replicas_by_model=inference_aug.active_replicas_by_model,
            observed_itl_s_by_model=observed_itl,
            measured_power_w=ThreePhase(
                a=measured_per_phase + self._base_W_per_phase,
                b=measured_per_phase + self._base_W_per_phase,
                c=measured_per_phase + self._base_W_per_phase,
            ),
            measured_power_w_by_model=measured_power_by_model,
            augmented_power_w_by_model=inference_aug.power_by_model_w,
            augmentation_factor_by_model=augmentation_factor_by_model,
            prometheus_metrics_by_model=prometheus_metrics,
        )
        return state

    @functools.singledispatchmethod
    def apply_control(self, command: DatacenterCommand, events: EventEmitter) -> None:
        """Apply a control command. Dispatches on command type.

        Raises:
            TypeError: For command types with no registered handler.
        """
        raise TypeError(f"OnlineDatacenter does not support {type(command).__name__}")

    @apply_control.register
    def apply_control_set_batch_size(self, command: SetBatchSize, events: EventEmitter) -> None:
        """Apply batch size command by sending HTTP requests to vLLM servers.

        Raises:
            ValueError: If any requested batch size is not positive.
        """
        for label, b in command.batch_size_by_model.items():
            label = str(label)
            b_int = int(b)
            if b_int <= 0:
                raise ValueError(f"Batch size must be positive for model {label!r}, got {b_int}.")
            # Commands for unknown model labels are silently ignored.
            dep = self._deployment_map.get(label)
            if dep is not None:
                dep.set_batch_size(b_int, ramp_up_rate=command.ramp_up_rate_by_model.get(label, 0.0))

        events.emit(
            "datacenter.batch_size.updated",
            {"batch_size_by_model": {d.model_label: d.batch_size for d in self._deployments}},
        )

phase_share_by_model property

Per-model phase share vectors derived from server layout.

start()

Start load generation, warm up servers, and fill the power buffer.

Sequence:
  1. Run health checks on all vLLM servers and zeusd instances.
  2. Wait for at least one power reading per endpoint (10 s timeout).
  3. Set initial batch sizes on all vLLM servers.
  4. Start load generation threads.
  5. Warm up: poll power into the rolling buffer while waiting for each model's num_requests_running to reach 95% of its initial_batch_size. Fails after 60 s if any model does not saturate.
Source code in openg2g/datacenter/online.py
def start(self) -> None:
    """Start load generation, warm up servers, and fill the power buffer.

    Sequence:
        1. Run health checks on all vLLM servers and zeusd instances.
        2. Wait for at least one power reading per endpoint (10 s timeout).
        3. Set initial batch sizes on all vLLM servers.
        4. Start load generation threads.
        5. Warm up: poll power into the rolling buffer while waiting for
           each model's `num_requests_running` to reach 95% of its
           `initial_batch_size`. Fails after 60 s if any model does not
           saturate.
    """
    if self._started:
        raise RuntimeError("OnlineDatacenter already started")

    logger.info("Starting OnlineDatacenter with %d deployments", len(self._deployments))

    # 1. Health checks
    logger.info("Running health checks...")
    for d in self._deployments:
        _check_vllm_health(d.vllm_base_url)
        _check_vllm_model(d.vllm_base_url, d.spec.model_id)
        for ep in d.gpu_endpoints:
            _check_zeusd_health(ep.host, ep.port)
    logger.info("All health checks passed")

    # 2. Wait for power readings from all endpoints
    all_endpoints: set[str] = set()
    for d in self._deployments:
        for ep in d.gpu_endpoints:
            all_endpoints.add(ep.endpoint_key)

    deadline = time.monotonic() + 10.0
    while time.monotonic() < deadline:
        readings = self._power_client.get_power()
        if all_endpoints.issubset(readings.keys()):
            logger.info("Power readings received from all %d endpoints", len(all_endpoints))
            break
        time.sleep(0.5)
    else:
        connected = set(self._power_client.get_power().keys())
        missing = all_endpoints - connected
        logger.warning("Timed out waiting for power readings from: %s", missing)

    # 3. Set initial batch sizes on vLLM servers
    for d in self._deployments:
        d.set_batch_size(d.initial_batch_size)

    # 4. Start load generation (and Prometheus poller)
    self._load_gen.start()
    logger.info("LoadGenerator started")

    # 5. Warm up: fill power buffer + wait for server saturation
    self._warmup()

    self._started = True
    logger.info("OnlineDatacenter ready")

stop()

Stop load generation and power streaming.

Source code in openg2g/datacenter/online.py
def stop(self) -> None:
    """Stop load generation and power streaming."""
    self._load_gen.stop()
    self._power_client.stop()
    self._started = False
    logger.info("OnlineDatacenter stopped")

step(clock, events)

Read live power, augment to datacenter scale, and return state.

Source code in openg2g/datacenter/online.py
def step(self, clock: SimulationClock, events: EventEmitter) -> OnlineDatacenterState:
    """Read live power, augment to datacenter scale, and return state."""
    now, per_gpu_w_by_model = self._poll_power_into_buffer()

    measured_power_by_model: dict[str, float] = {}
    augmentation_factor_by_model: dict[str, float] = {}
    for d in self._deployments:
        label = d.model_label
        measured_power_by_model[label] = per_gpu_w_by_model.get(label, 0.0) * d.num_real_gpus
        augmentation_factor_by_model[label] = d.augmentation_factor

    per_gpu_by_model: dict[str, np.ndarray] = {}
    for d in self._deployments:
        label = d.model_label
        if label not in self._layouts:
            continue
        layout = self._layouts[label]
        per_gpu_by_model[label] = self._rolling_buffer.sample_servers(label, now, layout.stagger_offsets)

    inference_aug = self._inference_augmenter.augment(per_gpu_by_model, clock.time_s)

    measured_total = sum(measured_power_by_model.values())
    measured_per_phase = measured_total / 3.0

    observed_itl: dict[str, float] = {
        d.model_label: self._load_gen.get_observed_itl(d.model_label) for d in self._deployments
    }

    prometheus_metrics: dict[str, dict[str, float]] = {}
    if self._prometheus is not None:
        prometheus_metrics = self._prometheus.get_latest()

    state = OnlineDatacenterState(
        time_s=clock.time_s,
        power_w=ThreePhase(
            a=self._base_W_per_phase + inference_aug.power_w.a,
            b=self._base_W_per_phase + inference_aug.power_w.b,
            c=self._base_W_per_phase + inference_aug.power_w.c,
        ),
        batch_size_by_model={d.model_label: d.batch_size for d in self._deployments},
        active_replicas_by_model=inference_aug.active_replicas_by_model,
        observed_itl_s_by_model=observed_itl,
        measured_power_w=ThreePhase(
            a=measured_per_phase + self._base_W_per_phase,
            b=measured_per_phase + self._base_W_per_phase,
            c=measured_per_phase + self._base_W_per_phase,
        ),
        measured_power_w_by_model=measured_power_by_model,
        augmented_power_w_by_model=inference_aug.power_by_model_w,
        augmentation_factor_by_model=augmentation_factor_by_model,
        prometheus_metrics_by_model=prometheus_metrics,
    )
    return state

apply_control(command, events)

Apply a control command. Dispatches on command type.

Source code in openg2g/datacenter/online.py
@functools.singledispatchmethod
def apply_control(self, command: DatacenterCommand, events: EventEmitter) -> None:
    """Apply a control command. Dispatches on command type."""
    raise TypeError(f"OnlineDatacenter does not support {type(command).__name__}")

apply_control_set_batch_size(command, events)

Apply batch size command by sending HTTP requests to vLLM servers.

Source code in openg2g/datacenter/online.py
@apply_control.register
def apply_control_set_batch_size(self, command: SetBatchSize, events: EventEmitter) -> None:
    """Apply batch size command by sending HTTP requests to vLLM servers."""
    for label, b in command.batch_size_by_model.items():
        label = str(label)
        b_int = int(b)
        if b_int <= 0:
            raise ValueError(f"Batch size must be positive for model {label!r}, got {b_int}.")
        dep = self._deployment_map.get(label)
        if dep is not None:
            dep.set_batch_size(b_int, ramp_up_rate=command.ramp_up_rate_by_model.get(label, 0.0))

    events.emit(
        "datacenter.batch_size.updated",
        {"batch_size_by_model": {d.model_label: d.batch_size for d in self._deployments}},
    )

openg2g.datacenter.workloads.inference

Inference workload: power traces, templates, ITL fits, and augmentation.

MLEnergySource

Bases: BaseModel

Per-model ML.ENERGY benchmark data extraction settings.

Attributes:

Name Type Description
model_label str

Simulation label for the model.

task str

Benchmark task name (e.g. "lm-arena-chat", "gpqa").

gpu str

GPU model name (e.g. "H100").

batch_sizes tuple[int, ...]

Batch sizes to extract from the benchmark data.

fit_exclude_batch_sizes tuple[int, ...]

Batch sizes to exclude from logistic curve fitting (but still included in trace extraction).

Source code in openg2g/datacenter/workloads/inference.py
class MLEnergySource(BaseModel):
    """Per-model ML.ENERGY benchmark data extraction settings.

    Attributes:
        model_label: Simulation label for the model.
        task: Benchmark task name (e.g. `"lm-arena-chat"`, `"gpqa"`).
        gpu: GPU model name (e.g. `"H100"`).
        batch_sizes: Batch sizes to extract from the benchmark data.
        fit_exclude_batch_sizes: Batch sizes to exclude from logistic
            curve fitting (but still included in trace extraction).
    """

    # Instances are immutable once constructed.
    model_config = ConfigDict(frozen=True)

    model_label: str
    task: str
    gpu: str
    batch_sizes: tuple[int, ...]
    # Empty by default: all batch_sizes participate in curve fitting.
    fit_exclude_batch_sizes: tuple[int, ...] = ()

InferenceTrace dataclass

A single power trace measurement.

Attributes:

Name Type Description
t_s ndarray

Time vector (seconds), monotonically increasing.

power_w ndarray

Total power vector (watts) across all measured GPUs, same length as t_s.

measured_gpus int

Number of GPUs used in the measurement.

Source code in openg2g/datacenter/workloads/inference.py
@dataclass(frozen=True)
class InferenceTrace:
    """A single power trace measurement.

    Validated on construction: time and power vectors must match in
    length, the trace must contain at least 5 samples, and at least
    one GPU must have been measured.

    Attributes:
        t_s: Time vector (seconds), monotonically increasing.
        power_w: Total power vector (watts) across all measured GPUs,
            same length as `t_s`.
        measured_gpus: Number of GPUs used in the measurement.
    """

    t_s: np.ndarray
    power_w: np.ndarray
    measured_gpus: int

    def __post_init__(self) -> None:
        # Validate invariants eagerly so a malformed trace never escapes.
        n_time, n_power = len(self.t_s), len(self.power_w)
        if n_time != n_power:
            raise ValueError(f"t_s and power_w must have the same length, got {n_time} and {n_power}")
        if n_time < 5:
            raise ValueError("Trace too short (need at least 5 samples).")
        if self.measured_gpus < 1:
            raise ValueError(f"measured_gpus must be >= 1, got {self.measured_gpus}")

ITLFitStore

Per-model, per-batch-size ITL mixture distributions.

Indexed by (model_label, batch_size). Provides:

  • load: load fits from a CSV produced by the data pipeline
  • distributions: access as a nested dict
  • sample_avg: sample a fleet-average ITL value

Attributes:

Name Type Description
COL_MODEL_LABEL

Column name for model label in the CSV.

COL_BATCH_SIZE

Column name for batch size in the CSV.

Source code in openg2g/datacenter/workloads/inference.py
class ITLFitStore:
    """Per-model, per-batch-size ITL mixture distributions.

    Indexed by `(model_label, batch_size)`. Provides:

    - [`load`][.load]: load fits from a CSV produced by the data pipeline
    - [`distributions`][.distributions]: access as a nested dict
    - [`sample_avg`][.sample_avg]: sample a fleet-average ITL value

    Attributes:
        COL_MODEL_LABEL: Column name for model label in the CSV.
        COL_BATCH_SIZE: Column name for batch size in the CSV.
    """

    COL_MODEL_LABEL = "model_label"
    COL_BATCH_SIZE = "max_num_seqs"

    def __init__(
        self,
        distributions: dict[str, dict[int, ITLMixtureModel]],
        approx_sampling_thresh: int = 30,
    ) -> None:
        # Normalize keys up front (str labels, int batch sizes) so later
        # lookups are insensitive to e.g. numpy integer keys.
        normalized: dict[str, dict[int, ITLMixtureModel]] = {}
        for label, per_batch in distributions.items():
            normalized[str(label)] = {int(batch): mix for batch, mix in per_batch.items()}
        self._distributions = normalized
        self._approx_sampling_thresh = int(approx_sampling_thresh)

    @property
    def distributions(self) -> dict[str, dict[int, ITLMixtureModel]]:
        """Nested dict: `model_label -> batch_size -> ITLMixtureModel`."""
        return self._distributions

    def sample_avg(
        self,
        model_label: str,
        batch_size: int,
        n_replicas: int,
        rng: np.random.Generator,
    ) -> float:
        """Sample a fleet-average ITL for the given model and batch size.

        Uses `ITLMixtureModel.sample_avg` under the hood, with the
        `approx_sampling_thresh` set at construction time.

        Args:
            model_label: Model label string.
            batch_size: Current batch size.
            n_replicas: Number of active replicas.
            rng: NumPy random generator for sampling.

        Returns:
            Fleet-average ITL in seconds.

        Raises:
            KeyError: If model or batch size is not in the store.
        """
        # Two-level lookup: model label first, then int-coerced batch size.
        per_batch = self._distributions.get(model_label)
        if per_batch is None:
            raise KeyError(f"No ITL distributions for model={model_label!r}")
        mixture = per_batch.get(int(batch_size))
        if mixture is None:
            # List the available batch sizes to make the error actionable.
            raise KeyError(
                f"No ITL distributions for model={model_label!r}, batch={batch_size}. "
                f"Available={sorted(per_batch.keys())}"
            )
        return mixture.sample_avg(
            n_replicas=n_replicas,
            rng=rng,
            exact_threshold=self._approx_sampling_thresh,
        )

    @classmethod
    def load(cls, csv_path: Path | str, approx_sampling_thresh: int = 30) -> ITLFitStore:
        """Load ITL mixture fits from a CSV.

        Expected columns: `model_label`, `max_num_seqs`, plus the
        `itl_mix_*` parameter columns produced by
        `ITLMixtureModel.to_dict()`.

        Args:
            csv_path: Path to the latency fits CSV.
            approx_sampling_thresh: Replica count above which sampling
                uses a CLT normal approximation instead of drawing
                individual samples.
        """
        csv_path = Path(csv_path)
        df = pd.read_csv(csv_path)

        # Fail fast if either index column is absent.
        missing = [c for c in (cls.COL_MODEL_LABEL, cls.COL_BATCH_SIZE) if c not in df.columns]
        if missing:
            raise ValueError(f"{csv_path} missing columns: {missing}. Got: {list(df.columns)}")

        # Each CSV row yields one (model_label, batch_size) mixture entry.
        distributions: dict[str, dict[int, ITLMixtureModel]] = {}
        for record in df.to_dict(orient="records"):
            label = str(record[cls.COL_MODEL_LABEL]).strip()
            batch = int(record[cls.COL_BATCH_SIZE])
            distributions.setdefault(label, {})[batch] = ITLMixtureModel.from_dict(record)

        if not distributions:
            raise ValueError(f"No ITL mixture rows loaded from {csv_path}")
        return cls(distributions, approx_sampling_thresh=approx_sampling_thresh)

    def save(self, csv_path: Path) -> None:
        """Save ITL mixture fits to a CSV.

        Args:
            csv_path: Output CSV path.
        """
        out_path = Path(csv_path)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        # Sorted iteration keeps the output CSV row order deterministic.
        rows = [
            {
                self.COL_MODEL_LABEL: label,
                self.COL_BATCH_SIZE: batch,
                "itl_dist": "lognormal_mixture_2",
                # Flatten the mixture parameters into itl_mix_* columns.
                **{f"itl_mix_{k}": v for k, v in self._distributions[label][batch].to_dict().items()},
            }
            for label in sorted(self._distributions)
            for batch in sorted(self._distributions[label])
        ]
        pd.DataFrame(rows).to_csv(out_path, index=False)

distributions property

Nested dict: model_label -> batch_size -> ITLMixtureModel.

sample_avg(model_label, batch_size, n_replicas, rng)

Sample a fleet-average ITL for the given model and batch size.

Uses ITLMixtureModel.sample_avg under the hood, with the approx_sampling_thresh set at construction time.

Parameters:

Name Type Description Default
model_label str

Model label string.

required
batch_size int

Current batch size.

required
n_replicas int

Number of active replicas.

required
rng Generator

NumPy random generator for sampling.

required

Returns:

Type Description
float

Fleet-average ITL in seconds.

Raises:

Type Description
KeyError

If model or batch size is not in the store.

Source code in openg2g/datacenter/workloads/inference.py
def sample_avg(
    self,
    model_label: str,
    batch_size: int,
    n_replicas: int,
    rng: np.random.Generator,
) -> float:
    """Sample a fleet-average ITL for the given model and batch size.

    Uses `ITLMixtureModel.sample_avg` under the hood, with the
    `approx_sampling_thresh` set at construction time.

    Args:
        model_label: Model label string.
        batch_size: Current batch size.
        n_replicas: Number of active replicas.
        rng: NumPy random generator for sampling.

    Returns:
        Fleet-average ITL in seconds.

    Raises:
        KeyError: If model or batch size is not in the store.
    """
    # Two-level lookup: model label first, then int-coerced batch size.
    model_dists = self._distributions.get(model_label)
    if model_dists is None:
        raise KeyError(f"No ITL distributions for model={model_label!r}")
    params = model_dists.get(int(batch_size))
    if params is None:
        # Include the available batch sizes to make the error actionable.
        raise KeyError(
            f"No ITL distributions for model={model_label!r}, batch={batch_size}. "
            f"Available={sorted(model_dists.keys())}"
        )
    return params.sample_avg(
        n_replicas=n_replicas,
        rng=rng,
        exact_threshold=self._approx_sampling_thresh,
    )

load(csv_path, approx_sampling_thresh=30) classmethod

Load ITL mixture fits from a CSV.

Expected columns: model_label, max_num_seqs, plus the itl_mix_* parameter columns produced by ITLMixtureModel.to_dict().

Parameters:

Name Type Description Default
csv_path Path | str

Path to the latency fits CSV.

required
approx_sampling_thresh int

Replica count above which sampling uses a CLT normal approximation instead of drawing individual samples.

30
Source code in openg2g/datacenter/workloads/inference.py
@classmethod
def load(cls, csv_path: Path | str, approx_sampling_thresh: int = 30) -> ITLFitStore:
    """Load ITL mixture fits from a CSV.

    Expected columns: `model_label`, `max_num_seqs`, plus the
    `itl_mix_*` parameter columns produced by
    `ITLMixtureModel.to_dict()`.

    Args:
        csv_path: Path to the latency fits CSV.
        approx_sampling_thresh: Replica count above which sampling
            uses a CLT normal approximation instead of drawing
            individual samples.
    """
    csv_path = Path(csv_path)
    df = pd.read_csv(csv_path)

    # Fail fast if either index column is absent from the CSV.
    required_cols = [cls.COL_MODEL_LABEL, cls.COL_BATCH_SIZE]
    missing = [c for c in required_cols if c not in df.columns]
    if missing:
        raise ValueError(f"{csv_path} missing columns: {missing}. Got: {list(df.columns)}")

    # Each CSV row becomes one (model_label, batch_size) mixture entry.
    distributions: dict[str, dict[int, ITLMixtureModel]] = {}
    for row in df.to_dict(orient="records"):
        label = str(row[cls.COL_MODEL_LABEL]).strip()
        batch = int(row[cls.COL_BATCH_SIZE])
        distributions.setdefault(label, {})[batch] = ITLMixtureModel.from_dict(row)

    if not distributions:
        raise ValueError(f"No ITL mixture rows loaded from {csv_path}")
    return cls(distributions, approx_sampling_thresh=approx_sampling_thresh)

save(csv_path)

Save ITL mixture fits to a CSV.

Parameters:

Name Type Description Default
csv_path Path

Output CSV path.

required
Source code in openg2g/datacenter/workloads/inference.py
def save(self, csv_path: Path) -> None:
    """Save ITL mixture fits to a CSV.

    Args:
        csv_path: Output CSV path.
    """
    csv_path = Path(csv_path)
    csv_path.parent.mkdir(parents=True, exist_ok=True)
    rows: list[dict[str, Any]] = []
    # Sorted iteration keeps the output CSV row order deterministic.
    for label in sorted(self._distributions):
        for batch in sorted(self._distributions[label]):
            model = self._distributions[label][batch]
            rows.append(
                {
                    self.COL_MODEL_LABEL: label,
                    self.COL_BATCH_SIZE: batch,
                    "itl_dist": "lognormal_mixture_2",
                    # Flatten the mixture parameters into itl_mix_* columns.
                    **{f"itl_mix_{k}": v for k, v in model.to_dict().items()},
                }
            )
    pd.DataFrame(rows).to_csv(csv_path, index=False)

InferenceTemplateStore

Pre-built per-GPU power templates for a specific simulation config.

Created by InferenceTraceStore.build_templates. Use template to look up a template by model label and batch size.

Source code in openg2g/datacenter/workloads/inference.py
class InferenceTemplateStore:
    """Pre-built per-GPU power templates for a specific simulation config.

    Created by [`InferenceTraceStore.build_templates`][..InferenceTraceStore.build_templates].
    Use [`template`][.template] to look up a template by model label and batch size.
    """

    def __init__(
        self,
        templates: dict[tuple[str, int], np.ndarray],
        batch_sizes_by_model: dict[str, list[int]],
    ) -> None:
        # Templates are keyed by (model_label, batch_size) tuples.
        self._templates = templates
        self._batch_sizes_by_model = batch_sizes_by_model

    def template(self, model_label: str, batch_size: int) -> np.ndarray:
        """Return a pre-built per-GPU power template."""
        # Normalize to (str, int) so lookups match how templates were keyed.
        lookup_key = (str(model_label), int(batch_size))
        if lookup_key in self._templates:
            return self._templates[lookup_key]
        raise KeyError(f"No template for model={model_label!r}, batch={batch_size}.")

    def batch_sizes(self, model_label: str) -> list[int]:
        """List of batch sizes available for a model."""
        if model_label not in self._batch_sizes_by_model:
            raise KeyError(f"Unknown model: {model_label!r}")
        # Return a copy so callers cannot mutate internal state.
        return list(self._batch_sizes_by_model[model_label])

template(model_label, batch_size)

Return a pre-built per-GPU power template.

Source code in openg2g/datacenter/workloads/inference.py
def template(self, model_label: str, batch_size: int) -> np.ndarray:
    """Return a pre-built per-GPU power template."""
    # Normalize to (str, int) so lookups match how templates were keyed.
    key = (str(model_label), int(batch_size))
    if key not in self._templates:
        raise KeyError(f"No template for model={model_label!r}, batch={batch_size}.")
    return self._templates[key]

batch_sizes(model_label)

List of batch sizes available for a model.

Source code in openg2g/datacenter/workloads/inference.py
def batch_sizes(self, model_label: str) -> list[int]:
    """List of batch sizes available for a model."""
    sizes = self._batch_sizes_by_model.get(model_label)
    if sizes is None:
        raise KeyError(f"Unknown model: {model_label!r}")
    # Return a copy so callers cannot mutate the store's internal list.
    return list(sizes)

InferenceTraceStore

Manages raw power traces loaded from CSV files.

Indexed by (model_label, batch_size). Provides:

  • load: load traces discovered via a manifest CSV
  • build_templates: build per-GPU power templates for a specific simulation config, returning an InferenceTemplateStore

Source code in openg2g/datacenter/workloads/inference.py
class InferenceTraceStore:
    """Manages raw power traces loaded from CSV files.

    Indexed by `(model_label, batch_size)`. Provides:

    - [`load`][.load]: load traces discovered via a manifest CSV
    - [`build_templates`][.build_templates]: build per-GPU power
      templates for a specific simulation config, returning an
      [`InferenceTemplateStore`][..InferenceTemplateStore]
    """

    MANIFEST_COL_MODEL_LABEL = "model_label"
    MANIFEST_COL_NUM_GPUS = "num_gpus"
    MANIFEST_COL_BATCH_SIZE = "max_num_seqs"
    MANIFEST_COL_TRACE_FILE = "trace_file"
    TRACE_COL_TIME = "relative_time_s"
    TRACE_COL_POWER = "power_total_W"

    def __init__(self, traces: dict[str, dict[int, InferenceTrace]]) -> None:
        # Normalize keys up front: plain-str labels, plain-int batch sizes.
        normalized: dict[str, dict[int, InferenceTrace]] = {}
        for label, per_batch in traces.items():
            normalized[str(label)] = {int(batch): trace for batch, trace in per_batch.items()}
        self._traces = normalized

    @classmethod
    def load(cls, manifest: Path) -> InferenceTraceStore:
        """Load traces discovered via a manifest CSV.

        Trace file paths in the manifest are resolved relative to the
        manifest file's parent directory.

        Args:
            manifest: Path to the manifest CSV (e.g. `traces_summary.csv`).
                Expected columns: `model_label`, `num_gpus`, `max_num_seqs`,
                `trace_file`.
        """
        manifest = Path(manifest)
        base_dir = manifest.parent
        summary = pd.read_csv(manifest)

        # Fail fast if the manifest is missing any required column.
        required = (
            cls.MANIFEST_COL_MODEL_LABEL,
            cls.MANIFEST_COL_NUM_GPUS,
            cls.MANIFEST_COL_BATCH_SIZE,
            cls.MANIFEST_COL_TRACE_FILE,
        )
        missing = [c for c in required if c not in summary.columns]
        if missing:
            raise ValueError(f"Manifest {manifest} missing columns: {missing}. Got: {list(summary.columns)}")

        traces: dict[str, dict[int, InferenceTrace]] = {}
        for entry in summary.to_dict(orient="records"):
            label = str(entry[cls.MANIFEST_COL_MODEL_LABEL])
            num_gpus = int(entry[cls.MANIFEST_COL_NUM_GPUS])
            batch = int(entry[cls.MANIFEST_COL_BATCH_SIZE])
            # Trace paths are relative to the manifest's directory.
            trace_path = base_dir / str(entry[cls.MANIFEST_COL_TRACE_FILE])

            if not trace_path.exists():
                raise FileNotFoundError(f"Trace file not found: {trace_path} (model={label}, batch={batch})")

            trace_df = pd.read_csv(trace_path)
            if cls.TRACE_COL_TIME not in trace_df.columns or cls.TRACE_COL_POWER not in trace_df.columns:
                raise ValueError(
                    f"{trace_path} must contain {cls.TRACE_COL_TIME!r} and "
                    f"{cls.TRACE_COL_POWER!r}. Got: {list(trace_df.columns)}"
                )

            t = trace_df[cls.TRACE_COL_TIME].to_numpy(float)
            p = trace_df[cls.TRACE_COL_POWER].to_numpy(float)
            # Re-sort by time only when some timestamps go backwards.
            if np.any(np.diff(t) < 0):
                order = np.argsort(t)
                t, p = t[order], p[order]

            traces.setdefault(label, {})[batch] = InferenceTrace(
                t_s=t,
                power_w=p,
                measured_gpus=num_gpus,
            )

        return cls(traces)

    def build_templates(
        self,
        *,
        duration_s: Fraction | float,
        dt_s: Fraction | float,
        steady_skip_s: float = 0.0,
    ) -> InferenceTemplateStore:
        """Build per-GPU power templates for all traces.

        Args:
            duration_s: Total simulation duration (seconds).
            dt_s: Simulation timestep (seconds).
            steady_skip_s: Skip this many seconds from the start of each
                trace to avoid warm-up transients.

        Returns:
            An [`InferenceTemplateStore`][openg2g.datacenter.workloads.inference.InferenceTemplateStore]
                holding the built templates.
        """
        templates: dict[tuple[str, int], np.ndarray] = {}
        batch_sizes_by_model: dict[str, list[int]] = {}
        # One template per (model, batch size) pair.
        for label, per_batch in self._traces.items():
            batch_sizes_by_model[label] = sorted(per_batch)
            templates.update(
                {
                    (label, batch): _build_per_gpu_power_template(
                        trace,
                        dt_s=dt_s,
                        duration_s=duration_s,
                        steady_skip_s=steady_skip_s,
                    )
                    for batch, trace in per_batch.items()
                }
            )
        return InferenceTemplateStore(templates, batch_sizes_by_model)

    def save(self, out_dir: Path) -> None:
        """Save traces and manifest CSV to a directory.

        Writes individual trace CSVs to `out_dir/traces/` and a manifest
        CSV at `out_dir/traces_summary.csv`.

        Args:
            out_dir: Output directory.
        """
        out_dir = Path(out_dir)
        traces_dir = out_dir / "traces"
        traces_dir.mkdir(parents=True, exist_ok=True)

        manifest_rows: list[dict[str, Any]] = []
        # Sorted iteration keeps filenames and manifest order deterministic.
        for label in sorted(self._traces):
            per_batch = self._traces[label]
            for batch in sorted(per_batch):
                trace = per_batch[batch]
                trace_name = f"{label}_num_gpus_{trace.measured_gpus}_max_num_seqs_{batch}.csv"
                pd.DataFrame(
                    {
                        self.TRACE_COL_TIME: trace.t_s,
                        self.TRACE_COL_POWER: trace.power_w,
                    }
                ).to_csv(traces_dir / trace_name, index=False)
                manifest_rows.append(
                    {
                        self.MANIFEST_COL_MODEL_LABEL: label,
                        self.MANIFEST_COL_NUM_GPUS: trace.measured_gpus,
                        self.MANIFEST_COL_BATCH_SIZE: batch,
                        self.MANIFEST_COL_TRACE_FILE: f"traces/{trace_name}",
                    }
                )
        pd.DataFrame(manifest_rows).to_csv(out_dir / "traces_summary.csv", index=False)

load(manifest) classmethod

Load traces discovered via a manifest CSV.

Trace file paths in the manifest are resolved relative to the manifest file's parent directory.

Parameters:

Name Type Description Default
manifest Path

Path to the manifest CSV (e.g. traces_summary.csv). Expected columns: model_label, num_gpus, max_num_seqs, trace_file.

required
Source code in openg2g/datacenter/workloads/inference.py
@classmethod
def load(cls, manifest: Path) -> InferenceTraceStore:
    """Load traces discovered via a manifest CSV.

    Trace file paths in the manifest are resolved relative to the
    manifest file's parent directory.

    Args:
        manifest: Path to the manifest CSV (e.g. `traces_summary.csv`).
            Expected columns: `model_label`, `num_gpus`, `max_num_seqs`,
            `trace_file`.
    """
    manifest = Path(manifest)
    base_dir = manifest.parent
    df = pd.read_csv(manifest)

    # Fail fast if the manifest is missing any required column.
    required_cols = [
        cls.MANIFEST_COL_MODEL_LABEL,
        cls.MANIFEST_COL_NUM_GPUS,
        cls.MANIFEST_COL_BATCH_SIZE,
        cls.MANIFEST_COL_TRACE_FILE,
    ]
    missing = [c for c in required_cols if c not in df.columns]
    if missing:
        raise ValueError(f"Manifest {manifest} missing columns: {missing}. Got: {list(df.columns)}")

    traces: dict[str, dict[int, InferenceTrace]] = {}
    for row in df.to_dict(orient="records"):
        label = str(row[cls.MANIFEST_COL_MODEL_LABEL])
        num_gpus = int(row[cls.MANIFEST_COL_NUM_GPUS])
        batch = int(row[cls.MANIFEST_COL_BATCH_SIZE])
        # Trace paths are relative to the manifest's directory.
        trace_path = base_dir / str(row[cls.MANIFEST_COL_TRACE_FILE])

        if not trace_path.exists():
            raise FileNotFoundError(f"Trace file not found: {trace_path} (model={label}, batch={batch})")

        tdf = pd.read_csv(trace_path)
        if cls.TRACE_COL_TIME not in tdf.columns or cls.TRACE_COL_POWER not in tdf.columns:
            raise ValueError(
                f"{trace_path} must contain {cls.TRACE_COL_TIME!r} and "
                f"{cls.TRACE_COL_POWER!r}. Got: {list(tdf.columns)}"
            )

        t = tdf[cls.TRACE_COL_TIME].to_numpy(float)
        p = tdf[cls.TRACE_COL_POWER].to_numpy(float)
        # Re-sort by time only when some timestamps go backwards.
        if np.any(np.diff(t) < 0):
            idx = np.argsort(t)
            t, p = t[idx], p[idx]

        traces.setdefault(label, {})[batch] = InferenceTrace(
            t_s=t,
            power_w=p,
            measured_gpus=num_gpus,
        )

    return cls(traces)

build_templates(*, duration_s, dt_s, steady_skip_s=0.0)

Build per-GPU power templates for all traces.

Parameters:

Name Type Description Default
duration_s Fraction | float

Total simulation duration (seconds).

required
dt_s Fraction | float

Simulation timestep (seconds).

required
steady_skip_s float

Skip this many seconds from the start of each trace to avoid warm-up transients.

0.0

Returns:

Type Description
InferenceTemplateStore

An InferenceTemplateStore holding the built templates.

Source code in openg2g/datacenter/workloads/inference.py
def build_templates(
    self,
    *,
    duration_s: Fraction | float,
    dt_s: Fraction | float,
    steady_skip_s: float = 0.0,
) -> InferenceTemplateStore:
    """Build per-GPU power templates for all traces.

    Args:
        duration_s: Total simulation duration (seconds).
        dt_s: Simulation timestep (seconds).
        steady_skip_s: Skip this many seconds from the start of each
            trace to avoid warm-up transients.

    Returns:
        An [`InferenceTemplateStore`][openg2g.datacenter.workloads.inference.InferenceTemplateStore]
            holding the built templates.
    """
    templates: dict[tuple[str, int], np.ndarray] = {}
    batch_sizes_by_model: dict[str, list[int]] = {}
    # One template per (model, batch size) pair.
    for label, per_batch in self._traces.items():
        batch_sizes_by_model[label] = sorted(per_batch.keys())
        for batch, tr in per_batch.items():
            tpl = _build_per_gpu_power_template(
                tr,
                dt_s=dt_s,
                duration_s=duration_s,
                steady_skip_s=steady_skip_s,
            )
            templates[(label, batch)] = tpl
    return InferenceTemplateStore(templates, batch_sizes_by_model)

save(out_dir)

Save traces and manifest CSV to a directory.

Writes individual trace CSVs to out_dir/traces/ and a manifest CSV at out_dir/traces_summary.csv.

Parameters:

Name Type Description Default
out_dir Path

Output directory.

required
Source code in openg2g/datacenter/workloads/inference.py
def save(self, out_dir: Path) -> None:
    """Save traces and manifest CSV to a directory.

    Writes individual trace CSVs to `out_dir/traces/` and a manifest
    CSV at `out_dir/traces_summary.csv`.

    Args:
        out_dir: Output directory.
    """
    out_dir = Path(out_dir)
    traces_dir = out_dir / "traces"
    traces_dir.mkdir(parents=True, exist_ok=True)

    summary_rows: list[dict[str, Any]] = []
    # Sorted iteration keeps trace filenames and manifest order deterministic.
    for label in sorted(self._traces):
        for batch in sorted(self._traces[label]):
            tr = self._traces[label][batch]
            trace_name = f"{label}_num_gpus_{tr.measured_gpus}_max_num_seqs_{batch}.csv"
            pd.DataFrame(
                {
                    self.TRACE_COL_TIME: tr.t_s,
                    self.TRACE_COL_POWER: tr.power_w,
                }
            ).to_csv(traces_dir / trace_name, index=False)
            summary_rows.append(
                {
                    self.MANIFEST_COL_MODEL_LABEL: label,
                    self.MANIFEST_COL_NUM_GPUS: tr.measured_gpus,
                    self.MANIFEST_COL_BATCH_SIZE: batch,
                    self.MANIFEST_COL_TRACE_FILE: f"traces/{trace_name}",
                }
            )
    pd.DataFrame(summary_rows).to_csv(out_dir / "traces_summary.csv", index=False)

InferenceData

LLM inference workload with offline simulation data.

Bundles model specifications with power templates and latency distributions. Validates that all models have matching data entries.

Parameters:

Name Type Description Default
models tuple[InferenceModelSpec, ...]

Model specifications as a tuple of InferenceModelSpec.

required
power_templates InferenceTemplateStore

Pre-built per-GPU power templates for all models and batch sizes, created via InferenceTraceStore.build_templates.

required
itl_fits ITLFitStore | None

Per-model ITL mixture distributions. Required when using controllers that read observed latency (e.g., OFOBatchSizeController). When omitted, NaN is reported for observed latency.

None
Source code in openg2g/datacenter/workloads/inference.py
class InferenceData:
    """LLM inference workload with offline simulation data.

    Bundles model specifications with power templates and latency
    distributions. Validates that all models have matching data entries.

    Args:
        models: Model specifications as a tuple of
            [`InferenceModelSpec`][openg2g.datacenter.config.InferenceModelSpec].
        power_templates: Pre-built per-GPU power templates for all models
            and batch sizes, created via
            [`InferenceTraceStore.build_templates`][..InferenceTraceStore.build_templates].
        itl_fits: Per-model ITL mixture distributions. Required when using
            controllers that read observed latency (e.g.,
            `OFOBatchSizeController`). When omitted, NaN is reported for
            observed latency.
    """

    def __init__(
        self,
        models: tuple[InferenceModelSpec, ...],
        *,
        power_templates: InferenceTemplateStore,
        itl_fits: ITLFitStore | None = None,
    ) -> None:
        # Guard against the common mistake of passing the raw trace store
        # instead of built templates.
        if isinstance(power_templates, InferenceTraceStore):
            raise TypeError(
                "Expected a InferenceTemplateStore, got InferenceTraceStore. "
                "Call InferenceTraceStore.build_templates() first to create a InferenceTemplateStore."
            )
        if not models:
            raise ValueError("models must not be empty.")
        labels = [ms.model_label for ms in models]
        if len(labels) != len(set(labels)):
            raise ValueError(f"Duplicate model labels: {labels}")

        self._models = models
        self._power_templates: InferenceTemplateStore | None = power_templates
        self._trace_store: InferenceTraceStore | None = None
        self._itl_fit_store: ITLFitStore | None = None
        self._itl_fits = itl_fits
        self._itl_samples_df: pd.DataFrame | None = None

        # Every model must have power templates; ITL fits are optional but,
        # when provided, must cover every model.
        for ms in self._models:
            try:
                power_templates.batch_sizes(ms.model_label)
            except KeyError:
                raise ValueError(
                    f"Power templates missing for model {ms.model_label!r}. "
                    f"Ensure InferenceTraceStore contains traces for all models."
                ) from None

            if itl_fits is not None and ms.model_label not in itl_fits.distributions:
                raise ValueError(
                    f"ITL fits missing for model {ms.model_label!r}. "
                    f"Available models in ITLFitStore: {sorted(itl_fits.distributions.keys())}"
                )

    def filter_models(
        self,
        models: tuple[InferenceModelSpec, ...],
    ) -> InferenceData:
        """Return a new InferenceData containing only the specified models.

        Args:
            models: Model specifications to keep.

        Raises:
            RuntimeError: If this is a generate-only instance without
                built power templates (load from disk first).
            ValueError: If a requested model has no matching templates
                (raised by the `InferenceData` constructor).
        """
        # Go through the property so generate-only instances fail with a
        # clear RuntimeError instead of an AttributeError on None.
        templates = self.power_templates
        labels = {ms.model_label for ms in models}

        # Filter power templates
        filtered_templates = {k: v for k, v in templates._templates.items() if k[0] in labels}
        filtered_batch_sizes = {k: v for k, v in templates._batch_sizes_by_model.items() if k in labels}
        new_templates = InferenceTemplateStore(filtered_templates, filtered_batch_sizes)

        # Filter ITL fits
        new_itl_fits = None
        if self._itl_fits is not None:
            filtered_dists = {k: v for k, v in self._itl_fits.distributions.items() if k in labels}
            new_itl_fits = ITLFitStore(filtered_dists)

        return InferenceData(models, power_templates=new_templates, itl_fits=new_itl_fits)

    @classmethod
    def generate(
        cls,
        models: tuple[InferenceModelSpec, ...],
        data_sources: dict[str, MLEnergySource],
        *,
        runs: Any = None,
        mlenergy_data_dir: Path | None = None,
        dt_s: float = 0.1,
        seed: int = 0,
        itl_sample_cap: int = 2048,
    ) -> InferenceData:
        """Generate inference data from ML.ENERGY benchmark data.

        Produces power traces and ITL mixture fits for all models and
        batch sizes specified in `data_sources`.

        Args:
            models: Model specifications.
            data_sources: Per-model benchmark data extraction settings,
                keyed by `model_label`.
            runs: Pre-loaded `LLMRuns` object. If `None`, loads from
                `mlenergy_data_dir` or the HuggingFace Hub.
            mlenergy_data_dir: Path to compiled mlenergy-data directory.
                Ignored if `runs` is provided.
            dt_s: Trace timestep (seconds).
            seed: Random seed for ITL fitting.
            itl_sample_cap: Maximum ITL samples per run for fitting.

        Returns:
            A new `InferenceData` with generated traces and ITL fits (no
            templates — call `InferenceTraceStore.build_templates()` on the
            saved/loaded store to get templates).
        """
        # Resolve benchmark runs: explicit `runs` wins, then a local data
        # directory, then the HuggingFace Hub.
        if runs is None:
            unique_tasks = {src.task for src in data_sources.values()}
            if mlenergy_data_dir:
                logger.info("Loading runs from %s (tasks: %s)", mlenergy_data_dir, sorted(unique_tasks))
                runs = LLMRuns.from_directory(str(mlenergy_data_dir), stable_only=False).task(*unique_tasks)
            else:
                logger.info("Loading runs from Hugging Face Hub (tasks: %s)", sorted(unique_tasks))
                runs = LLMRuns.from_hf(stable_only=False).task(*unique_tasks)
        if not runs:
            raise ValueError("No runs found for the specified tasks")

        subsets_by_label: dict[str, Any] = {}
        tl_frames: list[pd.DataFrame] = []
        itl_frames: list[pd.DataFrame] = []

        # Validate every model's config and resolve its run subset before
        # any downloads, so bad configs fail fast.
        for ms in models:
            src = data_sources.get(ms.model_label)
            if src is None:
                raise ValueError(f"No data source for model {ms.model_label!r}")
            model_id = ms.model_id
            if not model_id:
                raise ValueError(f"model_id is required for data generation (model={ms.model_label!r})")

            subset = (
                runs.model_id(model_id).gpu_model(src.gpu).num_gpus(ms.gpus_per_replica).max_num_seqs(*src.batch_sizes)
            )
            if not subset:
                raise ValueError(
                    f"Config matched zero runs: model_id={model_id!r}, "
                    f"gpu={src.gpu!r}, num_gpus={ms.gpus_per_replica}, "
                    f"batch_sizes={src.batch_sizes}"
                )
            subsets_by_label[ms.model_label] = subset
            logger.info(
                "%s: %d runs (model_id=%s, gpu=%s, num_gpus=%d, batches=%s)",
                ms.model_label,
                len(subset),
                model_id,
                src.gpu,
                ms.gpus_per_replica,
                sorted({r.max_num_seqs for r in subset}),
            )

        logger.info("Downloading raw result files for %d models ...", len(subsets_by_label))
        for subset in subsets_by_label.values():
            subset.download_raw_files(file="results")
        logger.info("Downloads complete. Extracting timelines and ITL samples ...")

        for label, subset in subsets_by_label.items():
            for run in subset:
                tl = run.timelines(metric="power.device_instant")
                tl["model_label"] = label
                tl["num_gpus"] = run.num_gpus
                tl["max_num_seqs"] = run.max_num_seqs
                # Sequential index unique across all runs and models.
                tl["run_index"] = len(tl_frames)
                tl_frames.append(tl)

            itl = subset.inter_token_latencies()
            itl["model_label"] = label
            itl_frames.append(itl)

        all_tl = pd.concat(tl_frames, ignore_index=True)
        itl_samples_df = pd.concat(itl_frames, ignore_index=True)
        logger.info("Building trace store (%d timeline rows) and fitting ITL models ...", len(all_tl))

        trace_store = _build_trace_store_from_timelines(all_tl, dt_s=dt_s)
        itl_fit_store = _build_itl_fit_store(itl_samples_df, max_samples=itl_sample_cap, seed=seed)

        return cls._from_stores(
            models,
            trace_store=trace_store,
            itl_fit_store=itl_fit_store,
            itl_samples_df=itl_samples_df,
        )

    @classmethod
    def _from_stores(
        cls,
        models: tuple[InferenceModelSpec, ...],
        *,
        trace_store: InferenceTraceStore,
        itl_fit_store: ITLFitStore,
        itl_samples_df: pd.DataFrame | None = None,
    ) -> InferenceData:
        """Create from raw stores (internal, used by generate)."""
        # Bypass __init__: generate-only instances hold raw traces and no
        # built templates, which __init__ would reject.
        instance = object.__new__(cls)
        instance._models = models
        instance._trace_store = trace_store
        instance._itl_fit_store = itl_fit_store
        instance._power_templates = None
        instance._itl_fits = itl_fit_store
        instance._itl_samples_df = itl_samples_df
        return instance

    def save(self, out_dir: Path, *, plot: bool = False) -> None:
        """Save traces and ITL fits to a directory.

        Args:
            out_dir: Output directory.
            plot: If `True`, also write characterization plots (power
                trajectories, ITL distributions).
        """
        out_dir = Path(out_dir)
        out_dir.mkdir(parents=True, exist_ok=True)
        if self._trace_store is not None:
            self._trace_store.save(out_dir)
        if self._itl_fits is not None:
            self._itl_fits.save(out_dir / "latency_fits.csv")

        # Version stamp, checked by load() via _check_version_stamp.
        (out_dir / "_manifest.json").write_text(
            json.dumps({"openg2g_version": openg2g.__version__}, indent=2, sort_keys=True)
        )

        if plot and self._trace_store is not None:
            _plot_power_trajectories(self._trace_store, self._models, out_dir)
            itl_samples = self._itl_samples_df
            if self._itl_fit_store is not None and itl_samples is not None:
                for ms in self._models:
                    _plot_itl_distributions(self._itl_fit_store, itl_samples, ms.model_label, out_dir)

    @classmethod
    def load(
        cls,
        data_dir: Path,
        models: tuple[InferenceModelSpec, ...],
        *,
        duration_s: float = 600.0,
        dt_s: float = 0.1,
        steady_skip_s: float = 0.0,
    ) -> InferenceData:
        """Load from a generated data directory.

        Loads traces from `traces_summary.csv`, builds templates, and
        loads ITL fits from `latency_fits.csv`.

        Args:
            data_dir: Directory containing generated data.
            models: Model specifications.
            duration_s: Simulation duration for template building.
            dt_s: Simulation timestep for template building.
            steady_skip_s: Skip seconds for template building.
        """
        data_dir = Path(data_dir)
        _check_version_stamp(data_dir, "InferenceData")
        store = InferenceTraceStore.load(data_dir / "traces_summary.csv")
        templates = store.build_templates(duration_s=duration_s, dt_s=dt_s, steady_skip_s=steady_skip_s)
        itl_fits = ITLFitStore.load(data_dir / "latency_fits.csv")
        return cls(models, power_templates=templates, itl_fits=itl_fits)

    @classmethod
    def ensure(
        cls,
        data_dir: Path,
        models: tuple[InferenceModelSpec, ...],
        data_sources: dict[str, MLEnergySource] | None = None,
        *,
        mlenergy_data_dir: Path | None = None,
        plot: bool = False,
        duration_s: float = 600.0,
        dt_s: float = 0.1,
        steady_skip_s: float = 0.0,
    ) -> InferenceData:
        """Load from `data_dir`, generating first if needed.

        If `data_dir/traces_summary.csv` does not exist, generates
        inference data from ML.ENERGY benchmark data and saves it.
        Then loads and returns.

        Args:
            data_dir: Data directory (generated files go here).
            models: Model specifications.
            data_sources: Per-model benchmark data extraction settings,
                keyed by `model_label`. Required when no cached data exists.
            mlenergy_data_dir: Path to compiled mlenergy-data directory.
            plot: If `True`, generate characterization plots on generation.
            duration_s: Simulation duration for template building.
            dt_s: Simulation timestep for template building.
            steady_skip_s: Skip seconds for template building.
        """
        data_dir = Path(data_dir)
        if not (data_dir / "traces_summary.csv").exists():
            if data_sources is None:
                raise ValueError("data_sources required for InferenceData generation (no cached data)")
            logger.info("Generating inference data to %s ...", data_dir)
            cls.generate(
                models,
                data_sources,
                mlenergy_data_dir=mlenergy_data_dir,
                dt_s=dt_s,
            ).save(data_dir, plot=plot)
        return cls.load(data_dir, models, duration_s=duration_s, dt_s=dt_s, steady_skip_s=steady_skip_s)

    @property
    def models(self) -> tuple[InferenceModelSpec, ...]:
        """The model specifications."""
        return self._models

    @property
    def power_templates(self) -> InferenceTemplateStore:
        """Per-GPU power templates for all models and batch sizes.

        Raises:
            RuntimeError: On generate-only instances, which hold raw
                traces but no built templates.
        """
        if self._power_templates is None:
            raise RuntimeError("power_templates not available (generate-only instance). Load from disk first.")
        return self._power_templates

    @property
    def itl_fits(self) -> ITLFitStore | None:
        """Per-model ITL mixture distributions, or `None` if not provided."""
        return self._itl_fits

models property

The model specifications.

filter_models(models)

Return a new InferenceData containing only the specified models.

Source code in openg2g/datacenter/workloads/inference.py
def filter_models(
    self,
    models: tuple[InferenceModelSpec, ...],
) -> InferenceData:
    """Return a new InferenceData containing only the specified models.

    Args:
        models: Model specifications to keep.

    Raises:
        RuntimeError: If this is a generate-only instance without built
            power templates (load from disk first).
        ValueError: If a requested model has no matching templates
            (raised by the `InferenceData` constructor).
    """
    # Go through the property so generate-only instances fail with a
    # clear RuntimeError instead of an AttributeError on None.
    templates = self.power_templates
    labels = {ms.model_label for ms in models}

    # Filter power templates
    filtered_templates = {k: v for k, v in templates._templates.items() if k[0] in labels}
    filtered_batch_sizes = {k: v for k, v in templates._batch_sizes_by_model.items() if k in labels}
    new_templates = InferenceTemplateStore(filtered_templates, filtered_batch_sizes)

    # Filter ITL fits
    new_itl_fits = None
    if self._itl_fits is not None:
        filtered_dists = {k: v for k, v in self._itl_fits.distributions.items() if k in labels}
        new_itl_fits = ITLFitStore(filtered_dists)

    return InferenceData(models, power_templates=new_templates, itl_fits=new_itl_fits)

generate(models, data_sources, *, runs=None, mlenergy_data_dir=None, dt_s=0.1, seed=0, itl_sample_cap=2048) classmethod

Generate inference data from ML.ENERGY benchmark data.

Produces power traces and ITL mixture fits for all models and batch sizes specified in data_sources.

Parameters:

Name Type Description Default
models tuple[InferenceModelSpec, ...]

Model specifications.

required
data_sources dict[str, MLEnergySource]

Per-model benchmark data extraction settings, keyed by model_label.

required
runs Any

Pre-loaded LLMRuns object. If None, loads from mlenergy_data_dir or the HuggingFace Hub.

None
mlenergy_data_dir Path | None

Path to compiled mlenergy-data directory. Ignored if runs is provided.

None
dt_s float

Trace timestep (seconds).

0.1
seed int

Random seed for ITL fitting.

0
itl_sample_cap int

Maximum ITL samples per run for fitting.

2048

Returns:

Type Description
InferenceData

A new InferenceData with generated traces and ITL fits (no

InferenceData

templates — call InferenceTraceStore.build_templates() on the

InferenceData

saved/loaded store to get templates).

Source code in openg2g/datacenter/workloads/inference.py
@classmethod
def generate(
    cls,
    models: tuple[InferenceModelSpec, ...],
    data_sources: dict[str, MLEnergySource],
    *,
    runs: Any = None,
    mlenergy_data_dir: Path | None = None,
    dt_s: float = 0.1,
    seed: int = 0,
    itl_sample_cap: int = 2048,
) -> InferenceData:
    """Generate inference data from ML.ENERGY benchmark data.

    Produces power traces and ITL mixture fits for all models and
    batch sizes specified in `data_sources`.

    Args:
        models: Model specifications.
        data_sources: Per-model benchmark data extraction settings,
            keyed by `model_label`.
        runs: Pre-loaded `LLMRuns` object. If `None`, loads from
            `mlenergy_data_dir` or the HuggingFace Hub.
        mlenergy_data_dir: Path to compiled mlenergy-data directory.
            Ignored if `runs` is provided.
        dt_s: Trace timestep (seconds).
        seed: Random seed for ITL fitting.
        itl_sample_cap: Maximum ITL samples per run for fitting.

    Returns:
        A new `InferenceData` with generated traces and ITL fits (no
        templates — call `InferenceTraceStore.build_templates()` on the
        saved/loaded store to get templates).
    """
    # Resolve benchmark runs: explicit `runs` wins, then a local compiled
    # data directory, then the HuggingFace Hub.
    if runs is None:
        unique_tasks = {src.task for src in data_sources.values()}
        if mlenergy_data_dir:
            logger.info("Loading runs from %s (tasks: %s)", mlenergy_data_dir, sorted(unique_tasks))
            runs = LLMRuns.from_directory(str(mlenergy_data_dir), stable_only=False).task(*unique_tasks)
        else:
            logger.info("Loading runs from Hugging Face Hub (tasks: %s)", sorted(unique_tasks))
            runs = LLMRuns.from_hf(stable_only=False).task(*unique_tasks)
    if not runs:
        raise ValueError("No runs found for the specified tasks")

    subsets_by_label: dict[str, Any] = {}
    tl_frames: list[pd.DataFrame] = []
    itl_frames: list[pd.DataFrame] = []

    # Validate every model's config and resolve its run subset before any
    # downloads, so bad configs fail fast.
    for ms in models:
        src = data_sources.get(ms.model_label)
        if src is None:
            raise ValueError(f"No data source for model {ms.model_label!r}")
        model_id = ms.model_id
        if not model_id:
            raise ValueError(f"model_id is required for data generation (model={ms.model_label!r})")

        subset = (
            runs.model_id(model_id).gpu_model(src.gpu).num_gpus(ms.gpus_per_replica).max_num_seqs(*src.batch_sizes)
        )
        if not subset:
            raise ValueError(
                f"Config matched zero runs: model_id={model_id!r}, "
                f"gpu={src.gpu!r}, num_gpus={ms.gpus_per_replica}, "
                f"batch_sizes={src.batch_sizes}"
            )
        subsets_by_label[ms.model_label] = subset
        logger.info(
            "%s: %d runs (model_id=%s, gpu=%s, num_gpus=%d, batches=%s)",
            ms.model_label,
            len(subset),
            model_id,
            src.gpu,
            ms.gpus_per_replica,
            sorted({r.max_num_seqs for r in subset}),
        )

    logger.info("Downloading raw result files for %d models ...", len(subsets_by_label))
    for subset in subsets_by_label.values():
        subset.download_raw_files(file="results")
    logger.info("Downloads complete. Extracting timelines and ITL samples ...")

    # Annotate each run's power timeline with identifying columns so all
    # frames can be concatenated into one table.
    for label, subset in subsets_by_label.items():
        for run in subset:
            tl = run.timelines(metric="power.device_instant")
            tl["model_label"] = label
            tl["num_gpus"] = run.num_gpus
            tl["max_num_seqs"] = run.max_num_seqs
            # Sequential index unique across all runs and models.
            tl["run_index"] = len(tl_frames)
            tl_frames.append(tl)

        itl = subset.inter_token_latencies()
        itl["model_label"] = label
        itl_frames.append(itl)

    all_tl = pd.concat(tl_frames, ignore_index=True)
    itl_samples_df = pd.concat(itl_frames, ignore_index=True)
    logger.info("Building trace store (%d timeline rows) and fitting ITL models ...", len(all_tl))

    trace_store = _build_trace_store_from_timelines(all_tl, dt_s=dt_s)
    itl_fit_store = _build_itl_fit_store(itl_samples_df, max_samples=itl_sample_cap, seed=seed)

    return cls._from_stores(
        models,
        trace_store=trace_store,
        itl_fit_store=itl_fit_store,
        itl_samples_df=itl_samples_df,
    )

save(out_dir, *, plot=False)

Save traces and ITL fits to a directory.

Parameters:

Name Type Description Default
out_dir Path

Output directory.

required
plot bool

If True, also write characterization plots (power trajectories, ITL distributions).

False
Source code in openg2g/datacenter/workloads/inference.py
def save(self, out_dir: Path, *, plot: bool = False) -> None:
    """Persist traces and ITL fits under `out_dir`.

    Args:
        out_dir: Output directory.
        plot: If `True`, also write characterization plots (power
            trajectories, ITL distributions).
    """
    target = Path(out_dir)
    target.mkdir(parents=True, exist_ok=True)

    trace_store = self._trace_store
    if trace_store is not None:
        trace_store.save(target)
    fits = self._itl_fits
    if fits is not None:
        fits.save(target / "latency_fits.csv")

    # Version stamp alongside the data.
    manifest = {"openg2g_version": openg2g.__version__}
    (target / "_manifest.json").write_text(json.dumps(manifest, indent=2, sort_keys=True))

    if plot and trace_store is not None:
        _plot_power_trajectories(trace_store, self._models, target)
        samples = self._itl_samples_df
        if self._itl_fit_store is not None and samples is not None:
            for spec in self._models:
                _plot_itl_distributions(self._itl_fit_store, samples, spec.model_label, target)

load(data_dir, models, *, duration_s=600.0, dt_s=0.1, steady_skip_s=0.0) classmethod

Load from a generated data directory.

Loads traces from traces_summary.csv, builds templates, and loads ITL fits from latency_fits.csv.

Parameters:

Name Type Description Default
data_dir Path

Directory containing generated data.

required
models tuple[InferenceModelSpec, ...]

Model specifications.

required
duration_s float

Simulation duration for template building.

600.0
dt_s float

Simulation timestep for template building.

0.1
steady_skip_s float

Skip seconds for template building.

0.0
Source code in openg2g/datacenter/workloads/inference.py
@classmethod
def load(
    cls,
    data_dir: Path,
    models: tuple[InferenceModelSpec, ...],
    *,
    duration_s: float = 600.0,
    dt_s: float = 0.1,
    steady_skip_s: float = 0.0,
) -> InferenceData:
    """Load inference data from a previously generated directory.

    Reads traces via `traces_summary.csv`, builds power templates from
    them, and loads ITL fits from `latency_fits.csv`.

    Args:
        data_dir: Directory containing generated data.
        models: Model specifications.
        duration_s: Simulation duration for template building.
        dt_s: Simulation timestep for template building.
        steady_skip_s: Skip seconds for template building.
    """
    root = Path(data_dir)
    _check_version_stamp(root, "InferenceData")
    trace_store = InferenceTraceStore.load(root / "traces_summary.csv")
    built = trace_store.build_templates(duration_s=duration_s, dt_s=dt_s, steady_skip_s=steady_skip_s)
    return cls(
        models,
        power_templates=built,
        itl_fits=ITLFitStore.load(root / "latency_fits.csv"),
    )

ensure(data_dir, models, data_sources=None, *, mlenergy_data_dir=None, plot=False, duration_s=600.0, dt_s=0.1, steady_skip_s=0.0) classmethod

Load from data_dir, generating first if needed.

If data_dir/traces_summary.csv does not exist, generates inference data from ML.ENERGY benchmark data and saves it. Then loads and returns.

Parameters:

Name Type Description Default
data_dir Path

Data directory (generated files go here).

required
models tuple[InferenceModelSpec, ...]

Model specifications.

required
data_sources dict[str, MLEnergySource] | None

Per-model benchmark data extraction settings, keyed by model_label. Required when no cached data exists.

None
mlenergy_data_dir Path | None

Path to compiled mlenergy-data directory.

None
plot bool

If True, generate characterization plots on generation.

False
duration_s float

Simulation duration for template building.

600.0
dt_s float

Simulation timestep for template building.

0.1
steady_skip_s float

Skip seconds for template building.

0.0
Source code in openg2g/datacenter/workloads/inference.py
@classmethod
def ensure(
    cls,
    data_dir: Path,
    models: tuple[InferenceModelSpec, ...],
    data_sources: dict[str, MLEnergySource] | None = None,
    *,
    mlenergy_data_dir: Path | None = None,
    plot: bool = False,
    duration_s: float = 600.0,
    dt_s: float = 0.1,
    steady_skip_s: float = 0.0,
) -> InferenceData:
    """Load from `data_dir`, generating the data first when absent.

    A missing `data_dir/traces_summary.csv` triggers a generation pass
    from ML.ENERGY benchmark data, which is saved to `data_dir` before
    loading and returning.

    Args:
        data_dir: Data directory (generated files go here).
        models: Model specifications.
        data_sources: Per-model benchmark data extraction settings,
            keyed by `model_label`. Required when no cached data exists.
        mlenergy_data_dir: Path to compiled mlenergy-data directory.
        plot: If `True`, generate characterization plots on generation.
        duration_s: Simulation duration for template building.
        dt_s: Simulation timestep for template building.
        steady_skip_s: Skip seconds for template building.
    """
    root = Path(data_dir)
    cached = (root / "traces_summary.csv").exists()
    if not cached:
        if data_sources is None:
            raise ValueError("data_sources required for InferenceData generation (no cached data)")
        logger.info("Generating inference data to %s ...", root)
        generated = cls.generate(
            models,
            data_sources,
            mlenergy_data_dir=mlenergy_data_dir,
            dt_s=dt_s,
        )
        generated.save(root, plot=plot)
    return cls.load(root, models, duration_s=duration_s, dt_s=dt_s, steady_skip_s=steady_skip_s)

InferenceAugmentedPower dataclass

Result of inference power augmentation for one simulation timestep.

Attributes:

Name Type Description
power_w ThreePhase

Three-phase inference power (watts), excluding base load.

power_by_model_w dict[str, float]

Per-model total active power (watts).

active_replicas_by_model dict[str, int]

Per-model active replica count.

Source code in openg2g/datacenter/workloads/inference.py
@dataclass(frozen=True)
class InferenceAugmentedPower:
    """Result of inference power augmentation for one simulation timestep.

    Attributes:
        power_w: Three-phase inference power (watts), excluding base load.
        power_by_model_w: Per-model total active power (watts).
        active_replicas_by_model: Per-model active replica count.
    """

    power_w: ThreePhase
    # default_factory gives each instance its own dict (no shared mutable
    # default). Note frozen=True blocks rebinding these fields but does not
    # make the dict contents immutable.
    power_by_model_w: dict[str, float] = field(default_factory=dict)
    active_replicas_by_model: dict[str, int] = field(default_factory=dict)

InferencePowerAugmenter

Scales per-GPU inference power through server layouts to three-phase power.

Given per-GPU power values for each server (one value per server per model), applies per-server scaling, noise, activation masking, and phase summation to produce inference-level three-phase power.

This class is backend-agnostic. The offline datacenter feeds it template-indexed values; the online datacenter can feed it live-measured values. The datacenter backend is responsible for adding facility base load on top of the returned inference power.

Parameters:

Name Type Description Default
layouts dict[str, ServerLayout]

Per-model server layouts (physical topology).

required
policies dict[str, ActivationPolicy]

Per-model activation policies determining which servers are active at each timestep.

required
seed int

Random seed for noise RNG.

0
Source code in openg2g/datacenter/workloads/inference.py
class InferencePowerAugmenter:
    """Scales per-GPU inference power through server layouts to three-phase power.

    Takes one per-GPU power value per server per model and produces
    inference-level three-phase power by applying per-server amplitude
    scaling, optional noise, activation masking, and per-phase summation.

    The class is backend-agnostic: an offline datacenter feeds it
    template-indexed values, while an online one can feed it live-measured
    values. Adding facility base load on top of the returned inference
    power remains the datacenter backend's responsibility.

    Args:
        layouts: Per-model server layouts (physical topology).
        policies: Per-model activation policies determining which servers
            are active at each timestep.
        seed: Random seed for noise RNG.
    """

    def __init__(
        self,
        layouts: dict[str, ServerLayout],
        policies: dict[str, ActivationPolicy],
        seed: int = 0,
    ) -> None:
        self._layouts = layouts
        self._policies = policies
        self._seed = int(seed)
        self._rng = np.random.default_rng(self._seed)

    def augment(
        self,
        per_gpu_by_model: dict[str, np.ndarray],
        t: float,
    ) -> InferenceAugmentedPower:
        """Augment per-server per-GPU power to three-phase power.

        Args:
            per_gpu_by_model: Mapping of model label to per-GPU power
                array of shape `(num_servers,)`. Only models with active
                replicas should be included.
            t: Current simulation time (seconds).

        Returns:
            Augmented inference power with three-phase totals, per-model
                power, and per-model active replica counts.
        """
        totals = np.zeros(3, dtype=float)
        model_power_w: dict[str, float] = {}
        replicas: dict[str, int] = {}

        for label, gpu_power in per_gpu_by_model.items():
            layout = self._layouts[label]
            policy = self._policies[label]

            # Per-server power = per-GPU power x GPU count x amplitude scale.
            watts = gpu_power * layout.gpus_per_server_list * layout.amplitude_scales
            if layout.noise_fraction > 0:
                # Noise amplitude is proportional to the power level,
                # floored at 1 W so idle servers still receive some noise.
                floor = np.maximum(watts, 1.0)
                noise = self._rng.normal(0.0, 1.0, size=layout.num_servers)
                watts = watts + noise * layout.noise_fraction * floor
            watts = np.maximum(watts, 0.0)

            # Mask down to the servers this model's policy has active now.
            idx = policy.active_indices(t)
            on_watts = watts[idx]
            on_phases = layout.phase_list[idx]

            # np.add.at handles repeated phase indices correctly.
            per_phase = np.zeros(3, dtype=float)
            np.add.at(per_phase, on_phases, on_watts)
            totals += per_phase

            model_power_w[label] = float(on_watts.sum())
            gpu_count = int(layout.gpus_per_server_list[idx].sum())
            replicas[label] = gpu_count // layout.gpus_per_replica

        return InferenceAugmentedPower(
            power_w=ThreePhase(
                a=float(totals[0]),
                b=float(totals[1]),
                c=float(totals[2]),
            ),
            power_by_model_w=model_power_w,
            active_replicas_by_model=replicas,
        )

    def reset(self) -> None:
        """Re-seed the noise RNG back to its initial state."""
        self._rng = np.random.default_rng(self._seed)

augment(per_gpu_by_model, t)

Augment per-server per-GPU power to three-phase power.

Parameters:

Name Type Description Default
per_gpu_by_model dict[str, ndarray]

Mapping of model label to per-GPU power array of shape (num_servers,). Only models with active replicas should be included.

required
t float

Current simulation time (seconds).

required

Returns:

Type Description
InferenceAugmentedPower

Augmented inference power with three-phase totals, per-model power, and per-model active replica counts.

Source code in openg2g/datacenter/workloads/inference.py
def augment(
    self,
    per_gpu_by_model: dict[str, np.ndarray],
    t: float,
) -> InferenceAugmentedPower:
    """Expand per-server per-GPU power into three-phase datacenter power.

    Args:
        per_gpu_by_model: Mapping of model label to per-GPU power array of
            shape `(num_servers,)`. Only models with active replicas should
            be included.
        t: Current simulation time (seconds).

    Returns:
        Augmented inference power with three-phase totals, per-model power,
            and per-model active replica counts.
    """
    total_by_phase = np.zeros(3, dtype=float)
    model_power_w: dict[str, float] = {}
    replicas: dict[str, int] = {}

    for label, per_gpu_w in per_gpu_by_model.items():
        layout = self._layouts[label]
        on_servers = self._policies[label].active_indices(t)

        # Per-server power: per-GPU draw scaled by GPU count and the
        # server's amplitude factor, plus optional multiplicative noise.
        raw = per_gpu_w * layout.gpus_per_server_list * layout.amplitude_scales
        if layout.noise_fraction > 0:
            # Floor the noise level at 1 W so idle servers still jitter.
            noise_scale = layout.noise_fraction * np.maximum(raw, 1.0)
            raw = raw + self._rng.normal(0.0, 1.0, size=layout.num_servers) * noise_scale
        clipped = np.maximum(raw, 0.0)

        on_powers = clipped[on_servers]
        # Accumulate each active server's power onto its assigned phase.
        contribution = np.zeros(3, dtype=float)
        np.add.at(contribution, layout.phase_list[on_servers], on_powers)
        total_by_phase += contribution

        model_power_w[label] = float(on_powers.sum())
        gpus_on = int(layout.gpus_per_server_list[on_servers].sum())
        replicas[label] = gpus_on // layout.gpus_per_replica

    a, b, c = (float(x) for x in total_by_phase)
    return InferenceAugmentedPower(
        power_w=ThreePhase(a=a, b=b, c=c),
        power_by_model_w=model_power_w,
        active_replicas_by_model=replicas,
    )

reset()

Re-seed the noise RNG to its initial state.

Source code in openg2g/datacenter/workloads/inference.py
def reset(self) -> None:
    """Restore the noise RNG to its freshly seeded initial state."""
    # Recreate the Generator from the stored seed so draws repeat exactly.
    self._rng = np.random.default_rng(seed=self._seed)

RequestsConfig

Bases: BaseModel

Configuration for building per-model JSONL request files.

Attributes:

Name Type Description
dataset str

Dataset to sample prompts from ("gpqa" or "lm-arena-chat").

num_requests int

Number of requests to sample per model.

max_completion_tokens int

Maximum output tokens per request.

seed int

Random seed for dataset shuffling and oversampling.

system_prompt str

System prompt prepended to every request.

Source code in openg2g/datacenter/workloads/inference.py
class RequestsConfig(BaseModel):
    """Configuration for building per-model JSONL request files.

    Attributes:
        dataset: Dataset to sample prompts from (`"gpqa"` or `"lm-arena-chat"`).
        num_requests: Number of requests to sample per model.
        max_completion_tokens: Maximum output tokens per request.
        seed: Random seed for dataset shuffling and oversampling.
        system_prompt: System prompt prepended to every request.
    """

    # Instances are immutable once constructed (pydantic frozen config).
    model_config = ConfigDict(frozen=True)

    # Must match a key of the sampler registry in `RequestStore.generate`.
    dataset: str = "lm-arena-chat"
    num_requests: int = 1000
    max_completion_tokens: int = 512
    seed: int = 0
    system_prompt: str = "You are a helpful AI assistant."

RequestStore

Per-model request dicts for online load generation.

Each model's requests are stored as a list of OpenAI Chat Completion streaming request dicts, suitable for submission to a vLLM server.

Attributes:

Name Type Description
requests_by_model

Mapping from model label to request dicts.

Source code in openg2g/datacenter/workloads/inference.py
class RequestStore:
    """Per-model request dicts for online load generation.

    Each model's requests are stored as a list of OpenAI Chat Completion
    streaming request dicts, suitable for submission to a vLLM server.

    Attributes:
        requests_by_model: Mapping from model label to request dicts.
    """

    def __init__(self, requests_by_model: dict[str, list[dict]]) -> None:
        self.requests_by_model = requests_by_model

    @classmethod
    def generate(
        cls,
        models: Sequence[InferenceModelSpec],
        config: RequestsConfig | None = None,
        *,
        extra_body_by_model: dict[str, dict] | None = None,
    ) -> RequestStore:
        """Sample prompts and build per-model request dicts.

        Requires `pip install datasets openai`.

        Args:
            models: Model specifications. Uses `model_id` for the API
                model field.
            config: Request generation config. Uses defaults if `None`.
            extra_body_by_model: Optional per-model extra fields merged
                into every request dict (e.g. `chat_template_kwargs`).
                Keyed by `model_label`.

        Returns:
            A new store with one request list per model label.

        Raises:
            ValueError: If `config.dataset` names an unknown dataset.
        """
        # Imported lazily so the optional `datasets`/`openai` dependencies
        # are only needed when generation actually runs.
        import random as _random

        from datasets import load_dataset
        from openai.types.chat import (
            ChatCompletionAssistantMessageParam,
            ChatCompletionContentPartTextParam,
            ChatCompletionMessageParam,
            ChatCompletionSystemMessageParam,
            ChatCompletionUserMessageParam,
        )
        from openai.types.chat.completion_create_params import CompletionCreateParamsStreaming

        if config is None:
            config = RequestsConfig()

        def _text_part(text: str) -> ChatCompletionContentPartTextParam:
            return ChatCompletionContentPartTextParam(type="text", text=text)

        def _prompt_to_messages(prompt: str | list[str]) -> list[ChatCompletionMessageParam]:
            # A plain string is a single user turn; a list alternates
            # assistant/user turns after the leading user message.
            if isinstance(prompt, str):
                return [ChatCompletionUserMessageParam(role="user", content=[_text_part(prompt)])]
            msgs: list[ChatCompletionMessageParam] = [
                ChatCompletionUserMessageParam(role="user", content=[_text_part(prompt[0])])
            ]
            for i, turn in enumerate(prompt[1:]):
                if i % 2 == 0:
                    msgs.append(ChatCompletionAssistantMessageParam(role="assistant", content=[_text_part(turn)]))
                else:
                    msgs.append(ChatCompletionUserMessageParam(role="user", content=[_text_part(turn)]))
            return msgs

        def _maybe_oversample(items: list, target: int, seed: int) -> None:
            # Pad `items` in place with seeded random repeats when the
            # dataset yielded fewer prompts than requested.
            if len(items) >= target:
                return
            rng = _random.Random(seed)
            original = list(items)
            while len(items) < target:
                items.append(rng.choice(original))

        def _sample_lm_arena_chat(num_requests: int, seed: int) -> list[str | list[str]]:
            data = load_dataset("lmarena-ai/arena-human-preference-100k", split="train").shuffle(seed=seed)
            prompts: list[str | list[str]] = []
            for item in data:
                num_turns = item["turn"]
                conversation = item["conversation_a"]
                # Each conversation contributes one prompt per user turn,
                # carrying the preceding turns as multi-turn context.
                for turn_idx in range(num_turns):
                    if len(prompts) >= num_requests:
                        break
                    messages: list[str] = []
                    for message in conversation[: 2 * turn_idx + 1]:
                        messages.append(message["content"])
                    prompts.append(messages if len(messages) > 1 else messages[0])
                if len(prompts) >= num_requests:
                    break
            _maybe_oversample(prompts, num_requests, seed)
            return prompts

        def _sample_gpqa(num_requests: int, seed: int) -> list[str | list[str]]:
            data = load_dataset("Idavidrein/gpqa", "gpqa_extended", split="train", streaming=True).shuffle(seed=seed)
            # Use a local RNG instance: the previous `_random.seed(seed)`
            # mutated the process-global `random` state as a hidden side
            # effect visible to unrelated callers.
            rng = _random.Random(seed)
            prompts: list[str | list[str]] = []
            for item in data:
                if len(prompts) >= num_requests:
                    break
                choices = [
                    item["Incorrect Answer 1"].strip(),
                    item["Incorrect Answer 2"].strip(),
                    item["Incorrect Answer 3"].strip(),
                    item["Correct Answer"].strip(),
                ]
                rng.shuffle(choices)
                question = item["Question"]
                prompt = f"What is the correct answer to the following question: {question}\n\nChoices:"
                for letter, choice in zip("ABCD", choices, strict=True):
                    prompt += f"\n({letter}) {choice}"
                prompts.append(prompt)
            _maybe_oversample(prompts, num_requests, seed)
            return prompts

        samplers = {"lm-arena-chat": _sample_lm_arena_chat, "gpqa": _sample_gpqa}
        sampler = samplers.get(config.dataset)
        if sampler is None:
            raise ValueError(f"Unknown dataset: {config.dataset!r}. Available: {sorted(samplers)}")

        extra = extra_body_by_model or {}

        requests_by_model: dict[str, list[dict]] = {}
        for spec in models:
            label = spec.model_label
            model_id = spec.model_id

            logger.info("Sampling %d %s prompts for %s (%s)...", config.num_requests, config.dataset, label, model_id)
            prompts = sampler(num_requests=config.num_requests, seed=config.seed)

            system_msgs: list[ChatCompletionMessageParam] = []
            if config.system_prompt:
                system_msgs.append(ChatCompletionSystemMessageParam(role="system", content=config.system_prompt))

            template = CompletionCreateParamsStreaming(
                model=model_id,
                messages=system_msgs,
                max_completion_tokens=config.max_completion_tokens,
                stream=True,
                stream_options={"include_usage": True, "continuous_usage_stats": True},
            )
            if label in extra:
                template.update(extra[label])

            reqs: list[dict] = []
            for prompt in prompts:
                # NOTE(review): shallow copy — nested dicts such as
                # `stream_options` are shared across requests; fine as long
                # as consumers treat request dicts as read-only.
                request = dict(template)
                request["messages"] = list(template["messages"]) + _prompt_to_messages(prompt)
                reqs.append(request)
            requests_by_model[label] = reqs

        return cls(requests_by_model)

    def save(self, out_dir: Path) -> None:
        """Write per-model JSONL files to `out_dir`.

        Args:
            out_dir: Output directory. Created if it doesn't exist.
        """
        out_dir = Path(out_dir)
        out_dir.mkdir(parents=True, exist_ok=True)
        for label, reqs in self.requests_by_model.items():
            out_path = out_dir / f"{label}.jsonl"
            with open(out_path, "w") as f:
                for req in reqs:
                    f.write(json.dumps(req) + "\n")
            logger.info("Wrote %d requests for %s to %s", len(reqs), label, out_path)

    @classmethod
    def load(cls, out_dir: Path) -> RequestStore:
        """Load per-model JSONL files from `out_dir`.

        Args:
            out_dir: Directory containing `{model_label}.jsonl` files.
        """
        requests_by_model: dict[str, list[dict]] = {}
        out_dir = Path(out_dir)
        # The file stem is the model label (mirrors `save`).
        for path in sorted(out_dir.glob("*.jsonl")):
            label = path.stem
            reqs: list[dict] = []
            with open(path) as f:
                for line in f:
                    line = line.strip()
                    if line:
                        reqs.append(json.loads(line))
            requests_by_model[label] = reqs
            logger.info("Loaded %d requests for %s", len(reqs), label)
        return cls(requests_by_model)

    @classmethod
    def ensure(
        cls,
        out_dir: Path,
        models: Sequence[InferenceModelSpec] | None = None,
        config: RequestsConfig | None = None,
        *,
        extra_body_by_model: dict[str, dict] | None = None,
    ) -> RequestStore:
        """Load request files from `out_dir`, generating first if needed.

        Args:
            out_dir: Directory for JSONL files.
            models: Required if request files don't exist yet.
            config: Request generation config. Uses defaults if `None`.
            extra_body_by_model: Optional per-model extra fields for
                request generation. Keyed by `model_label`.

        Raises:
            ValueError: If generation is needed but `models` is `None`.
        """
        out_dir = Path(out_dir)
        if not out_dir.exists():
            if models is None:
                raise ValueError("models required (no cached request data)")
            logger.info("Generating request files to %s ...", out_dir)
            cls.generate(models, config, extra_body_by_model=extra_body_by_model).save(out_dir)
        return cls.load(out_dir)

generate(models, config=None, *, extra_body_by_model=None) classmethod

Sample prompts and build per-model request dicts.

Requires pip install datasets openai.

Parameters:

Name Type Description Default
models Sequence[InferenceModelSpec]

Model specifications. Uses model_id for the API model field.

required
config RequestsConfig | None

Request generation config. Uses defaults if None.

None
extra_body_by_model dict[str, dict] | None

Optional per-model extra fields merged into every request dict (e.g. chat_template_kwargs). Keyed by model_label.

None
Source code in openg2g/datacenter/workloads/inference.py
@classmethod
def generate(
    cls,
    models: Sequence[InferenceModelSpec],
    config: RequestsConfig | None = None,
    *,
    extra_body_by_model: dict[str, dict] | None = None,
) -> RequestStore:
    """Sample prompts and build per-model request dicts.

    Requires `pip install datasets openai`.

    Args:
        models: Model specifications. Uses `model_id` for the API
            model field.
        config: Request generation config. Uses defaults if `None`.
        extra_body_by_model: Optional per-model extra fields merged
            into every request dict (e.g. `chat_template_kwargs`).
            Keyed by `model_label`.

    Returns:
        A new store with one request list per model label.

    Raises:
        ValueError: If `config.dataset` names an unknown dataset.
    """
    # Imported lazily so the optional `datasets`/`openai` dependencies
    # are only needed when generation actually runs.
    import random as _random

    from datasets import load_dataset
    from openai.types.chat import (
        ChatCompletionAssistantMessageParam,
        ChatCompletionContentPartTextParam,
        ChatCompletionMessageParam,
        ChatCompletionSystemMessageParam,
        ChatCompletionUserMessageParam,
    )
    from openai.types.chat.completion_create_params import CompletionCreateParamsStreaming

    if config is None:
        config = RequestsConfig()

    def _text_part(text: str) -> ChatCompletionContentPartTextParam:
        return ChatCompletionContentPartTextParam(type="text", text=text)

    def _prompt_to_messages(prompt: str | list[str]) -> list[ChatCompletionMessageParam]:
        # A plain string is a single user turn; a list alternates
        # assistant/user turns after the leading user message.
        if isinstance(prompt, str):
            return [ChatCompletionUserMessageParam(role="user", content=[_text_part(prompt)])]
        msgs: list[ChatCompletionMessageParam] = [
            ChatCompletionUserMessageParam(role="user", content=[_text_part(prompt[0])])
        ]
        for i, turn in enumerate(prompt[1:]):
            if i % 2 == 0:
                msgs.append(ChatCompletionAssistantMessageParam(role="assistant", content=[_text_part(turn)]))
            else:
                msgs.append(ChatCompletionUserMessageParam(role="user", content=[_text_part(turn)]))
        return msgs

    def _maybe_oversample(items: list, target: int, seed: int) -> None:
        # Pad `items` in place with seeded random repeats when the
        # dataset yielded fewer prompts than requested.
        if len(items) >= target:
            return
        rng = _random.Random(seed)
        original = list(items)
        while len(items) < target:
            items.append(rng.choice(original))

    def _sample_lm_arena_chat(num_requests: int, seed: int) -> list[str | list[str]]:
        data = load_dataset("lmarena-ai/arena-human-preference-100k", split="train").shuffle(seed=seed)
        prompts: list[str | list[str]] = []
        for item in data:
            num_turns = item["turn"]
            conversation = item["conversation_a"]
            # Each conversation contributes one prompt per user turn,
            # carrying the preceding turns as multi-turn context.
            for turn_idx in range(num_turns):
                if len(prompts) >= num_requests:
                    break
                messages: list[str] = []
                for message in conversation[: 2 * turn_idx + 1]:
                    messages.append(message["content"])
                prompts.append(messages if len(messages) > 1 else messages[0])
            if len(prompts) >= num_requests:
                break
        _maybe_oversample(prompts, num_requests, seed)
        return prompts

    def _sample_gpqa(num_requests: int, seed: int) -> list[str | list[str]]:
        data = load_dataset("Idavidrein/gpqa", "gpqa_extended", split="train", streaming=True).shuffle(seed=seed)
        # Use a local RNG instance: the previous `_random.seed(seed)`
        # mutated the process-global `random` state as a hidden side
        # effect visible to unrelated callers.
        rng = _random.Random(seed)
        prompts: list[str | list[str]] = []
        for item in data:
            if len(prompts) >= num_requests:
                break
            choices = [
                item["Incorrect Answer 1"].strip(),
                item["Incorrect Answer 2"].strip(),
                item["Incorrect Answer 3"].strip(),
                item["Correct Answer"].strip(),
            ]
            rng.shuffle(choices)
            question = item["Question"]
            prompt = f"What is the correct answer to the following question: {question}\n\nChoices:"
            for letter, choice in zip("ABCD", choices, strict=True):
                prompt += f"\n({letter}) {choice}"
            prompts.append(prompt)
        _maybe_oversample(prompts, num_requests, seed)
        return prompts

    samplers = {"lm-arena-chat": _sample_lm_arena_chat, "gpqa": _sample_gpqa}
    sampler = samplers.get(config.dataset)
    if sampler is None:
        raise ValueError(f"Unknown dataset: {config.dataset!r}. Available: {sorted(samplers)}")

    extra = extra_body_by_model or {}

    requests_by_model: dict[str, list[dict]] = {}
    for spec in models:
        label = spec.model_label
        model_id = spec.model_id

        logger.info("Sampling %d %s prompts for %s (%s)...", config.num_requests, config.dataset, label, model_id)
        prompts = sampler(num_requests=config.num_requests, seed=config.seed)

        system_msgs: list[ChatCompletionMessageParam] = []
        if config.system_prompt:
            system_msgs.append(ChatCompletionSystemMessageParam(role="system", content=config.system_prompt))

        template = CompletionCreateParamsStreaming(
            model=model_id,
            messages=system_msgs,
            max_completion_tokens=config.max_completion_tokens,
            stream=True,
            stream_options={"include_usage": True, "continuous_usage_stats": True},
        )
        if label in extra:
            template.update(extra[label])

        reqs: list[dict] = []
        for prompt in prompts:
            # NOTE(review): shallow copy — nested dicts such as
            # `stream_options` are shared across requests; fine as long
            # as consumers treat request dicts as read-only.
            request = dict(template)
            request["messages"] = list(template["messages"]) + _prompt_to_messages(prompt)
            reqs.append(request)
        requests_by_model[label] = reqs

    return cls(requests_by_model)

save(out_dir)

Write per-model JSONL files to out_dir.

Parameters:

Name Type Description Default
out_dir Path

Output directory. Created if it doesn't exist.

required
Source code in openg2g/datacenter/workloads/inference.py
def save(self, out_dir: Path) -> None:
    """Write one JSONL file per model label into `out_dir`.

    Args:
        out_dir: Output directory. Created if it doesn't exist.
    """
    target_dir = Path(out_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    for label, reqs in self.requests_by_model.items():
        dest = target_dir / f"{label}.jsonl"
        with open(dest, "w") as handle:
            handle.writelines(json.dumps(req) + "\n" for req in reqs)
        logger.info("Wrote %d requests for %s to %s", len(reqs), label, dest)

load(out_dir) classmethod

Load per-model JSONL files from out_dir.

Parameters:

Name Type Description Default
out_dir Path

Directory containing {model_label}.jsonl files.

required
Source code in openg2g/datacenter/workloads/inference.py
@classmethod
def load(cls, out_dir: Path) -> RequestStore:
    """Load per-model JSONL files from `out_dir`.

    Args:
        out_dir: Directory containing `{model_label}.jsonl` files.
    """
    source_dir = Path(out_dir)
    loaded: dict[str, list[dict]] = {}
    # The file stem is the model label (mirrors `save`).
    for path in sorted(source_dir.glob("*.jsonl")):
        records = [json.loads(raw) for raw in path.read_text().splitlines() if raw.strip()]
        loaded[path.stem] = records
        logger.info("Loaded %d requests for %s", len(records), path.stem)
    return cls(loaded)

ensure(out_dir, models=None, config=None, *, extra_body_by_model=None) classmethod

Load request files from out_dir, generating first if needed.

Parameters:

Name Type Description Default
out_dir Path

Directory for JSONL files.

required
models Sequence[InferenceModelSpec] | None

Required if request files don't exist yet.

None
config RequestsConfig | None

Request generation config. Uses defaults if None.

None
extra_body_by_model dict[str, dict] | None

Optional per-model extra fields for request generation. Keyed by model_label.

None
Source code in openg2g/datacenter/workloads/inference.py
@classmethod
def ensure(
    cls,
    out_dir: Path,
    models: Sequence[InferenceModelSpec] | None = None,
    config: RequestsConfig | None = None,
    *,
    extra_body_by_model: dict[str, dict] | None = None,
) -> RequestStore:
    """Load request files from `out_dir`, generating first if needed.

    Args:
        out_dir: Directory for JSONL files.
        models: Required if request files don't exist yet.
        config: Request generation config. Uses defaults if `None`.
        extra_body_by_model: Optional per-model extra fields for
            request generation. Keyed by `model_label`.
    """
    target_dir = Path(out_dir)
    if not target_dir.exists():
        # No cached data: generation is mandatory, and needs model specs.
        if models is None:
            raise ValueError("models required (no cached request data)")
        logger.info("Generating request files to %s ...", target_dir)
        store = cls.generate(models, config, extra_body_by_model=extra_body_by_model)
        store.save(target_dir)
    return cls.load(target_dir)

openg2g.datacenter.workloads.training

Training workload: typed trace data and periodic overlay evaluation.

TrainingTraceParams

Bases: BaseModel

Parameters for synthetic training-like power trace generation.

Attributes:

Name Type Description
duration_s float

Total duration (seconds).

dt_s float

Timestep (seconds).

seed int

Random seed.

P_hi float

High plateau power (W).

P_lo float

Low plateau power (W).

sigma_hi float

Noise std in high plateaus (W).

sigma_lo float

Noise std in low plateaus (W).

seg_lo_range tuple[float, float]

Duration range for low segments (seconds).

seg_hi_range tuple[float, float]

Duration range for high segments (seconds).

dip_prob_per_sec float

Expected brief dips per second.

dip_depth_range tuple[float, float]

Depth range for brief dips (W below current level).

dip_dur_range tuple[float, float]

Duration range for brief dips (seconds).

smooth_window_s float

Smoothing window width (seconds).

ramp_s float

Initial warm-up ramp duration (seconds).

ramp_from float

Power at ramp start (W).

Source code in openg2g/datacenter/workloads/training.py
class TrainingTraceParams(BaseModel):
    """Parameters for synthetic training-like power trace generation.

    Attributes:
        duration_s: Total duration (seconds).
        dt_s: Timestep (seconds).
        seed: Random seed.
        P_hi: High plateau power (W).
        P_lo: Low plateau power (W).
        sigma_hi: Noise std in high plateaus (W).
        sigma_lo: Noise std in low plateaus (W).
        seg_lo_range: Duration range for low segments (seconds).
        seg_hi_range: Duration range for high segments (seconds).
        dip_prob_per_sec: Expected brief dips per second.
        dip_depth_range: Depth range for brief dips (W below current level).
        dip_dur_range: Duration range for brief dips (seconds).
        smooth_window_s: Smoothing window width (seconds).
        ramp_s: Initial warm-up ramp duration (seconds).
        ramp_from: Power at ramp start (W).
    """

    # Instances are immutable once constructed (pydantic frozen config).
    model_config = ConfigDict(frozen=True)

    # Trace grid and reproducibility.
    duration_s: float = 1000.0
    dt_s: float = 0.1
    seed: int = 2
    # Two-level plateau structure with per-level noise std.
    P_hi: float = 225.0
    P_lo: float = 175.0
    sigma_hi: float = 50.0
    sigma_lo: float = 50.0
    seg_lo_range: tuple[float, float] = (10.0, 15.0)
    seg_hi_range: tuple[float, float] = (35.0, 40.0)
    # Occasional brief power dips superimposed on the plateaus.
    dip_prob_per_sec: float = 0.010
    dip_depth_range: tuple[float, float] = (120.0, 125.0)
    dip_dur_range: tuple[float, float] = (0.06, 0.14)
    # Post-processing: smoothing window and an initial warm-up ramp.
    smooth_window_s: float = 0.30
    ramp_s: float = 18.0
    ramp_from: float = 50.0

TrainingTrace dataclass

A single-GPU training power trace.

Attributes:

Name Type Description
t_s ndarray

Time vector (seconds), monotonically increasing.

power_w ndarray

Power vector (watts) for one GPU, same length as t_s.

Source code in openg2g/datacenter/workloads/training.py
@dataclass(frozen=True)
class TrainingTrace:
    """A single-GPU training power trace.

    Attributes:
        t_s: Time vector (seconds), monotonically increasing.
        power_w: Power vector (watts) for one GPU, same length as `t_s`.
    """

    # CSV column names used by `save`/`load`.
    COL_TIME = "t_s"
    COL_POWER = "power_W"

    t_s: np.ndarray
    power_w: np.ndarray

    def __post_init__(self) -> None:
        # Validate the invariant that both vectors describe the same samples.
        n_times, n_powers = len(self.t_s), len(self.power_w)
        if n_times != n_powers:
            raise ValueError(f"t_s and power_w must have the same length, got {n_times} and {n_powers}")
        if n_times < 2:
            raise ValueError("Training trace must have >= 2 samples.")

    @classmethod
    def generate(cls, params: TrainingTraceParams | None = None) -> TrainingTrace:
        """Generate a synthetic training-like power trace.

        Args:
            params: Generation parameters. Uses defaults if `None`.

        Returns:
            A new [`TrainingTrace`][.] with generated data.
        """
        effective = TrainingTraceParams() if params is None else params
        times, powers = _generate_training_like_trace(effective)
        return cls(t_s=times, power_w=powers)

    def save(self, csv_path: Path) -> None:
        """Save the trace to a CSV file.

        Args:
            csv_path: Output CSV path.
        """
        target = Path(csv_path)
        target.parent.mkdir(parents=True, exist_ok=True)
        frame = pd.DataFrame({self.COL_TIME: self.t_s, self.COL_POWER: self.power_w})
        frame.to_csv(target, index=False)

    @classmethod
    def load(cls, csv_path: Path) -> TrainingTrace:
        """Load a training trace from CSV.

        Args:
            csv_path: Path to CSV with columns `t_s` and `power_W`.
        """
        csv_path = Path(csv_path)
        frame = pd.read_csv(csv_path)
        if not {cls.COL_TIME, cls.COL_POWER}.issubset(frame.columns):
            raise ValueError(
                f"{csv_path} must have columns {cls.COL_TIME!r} and {cls.COL_POWER!r}. Got {list(frame.columns)}"
            )

        times = frame[cls.COL_TIME].to_numpy(float)
        # Negative power is physically meaningless: clip to zero.
        powers = np.clip(frame[cls.COL_POWER].to_numpy(float), 0.0, None)

        # Repair out-of-order timestamps by sorting both vectors together.
        if np.any(np.diff(times) < 0):
            order = np.argsort(times)
            times, powers = times[order], powers[order]

        return cls(t_s=times, power_w=powers)

    @classmethod
    def ensure(cls, csv_path: Path, params: TrainingTraceParams | None = None) -> TrainingTrace:
        """Load from `csv_path`, generating first if needed.

        Args:
            csv_path: Path to the training trace CSV.
            params: Generation parameters. Required when no cached file exists.
                Uses defaults if `None` and generation is needed.
        """
        csv_path = Path(csv_path)
        if not csv_path.exists():
            logger.info("Generating training trace to %s ...", csv_path)
            cls.generate(params).save(csv_path)
        return cls.load(csv_path)

generate(params=None) classmethod

Generate a synthetic training-like power trace.

Parameters:

Name Type Description Default
params TrainingTraceParams | None

Generation parameters. Uses defaults if None.

None

Returns:

Type Description
TrainingTrace

A new TrainingTrace with generated data.

Source code in openg2g/datacenter/workloads/training.py
@classmethod
def generate(cls, params: TrainingTraceParams | None = None) -> TrainingTrace:
    """Generate a synthetic training-like power trace.

    Args:
        params: Generation parameters. Uses defaults if `None`.

    Returns:
        A new [`TrainingTrace`][.] with generated data.
    """
    effective = TrainingTraceParams() if params is None else params
    times, powers = _generate_training_like_trace(effective)
    return cls(t_s=times, power_w=powers)

save(csv_path)

Save the trace to a CSV file.

Parameters:

Name Type Description Default
csv_path Path

Output CSV path.

required
Source code in openg2g/datacenter/workloads/training.py
def save(self, csv_path: Path) -> None:
    """Write the trace to CSV, creating parent directories as needed.

    Args:
        csv_path: Output CSV path.
    """
    target = Path(csv_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    frame = pd.DataFrame({self.COL_TIME: self.t_s, self.COL_POWER: self.power_w})
    frame.to_csv(target, index=False)

load(csv_path) classmethod

Load a training trace from CSV.

Parameters:

Name Type Description Default
csv_path Path

Path to CSV with columns t_s and power_W.

required
Source code in openg2g/datacenter/workloads/training.py
@classmethod
def load(cls, csv_path: Path) -> TrainingTrace:
    """Load a training trace from CSV.

    Args:
        csv_path: Path to CSV with columns `t_s` and `power_W`.
    """
    csv_path = Path(csv_path)
    frame = pd.read_csv(csv_path)
    if not {cls.COL_TIME, cls.COL_POWER}.issubset(frame.columns):
        raise ValueError(
            f"{csv_path} must have columns {cls.COL_TIME!r} and {cls.COL_POWER!r}. Got {list(frame.columns)}"
        )

    times = frame[cls.COL_TIME].to_numpy(float)
    # Negative power is physically meaningless: clip to zero.
    powers = np.clip(frame[cls.COL_POWER].to_numpy(float), 0.0, None)

    # Repair out-of-order timestamps by sorting both vectors together.
    if np.any(np.diff(times) < 0):
        order = np.argsort(times)
        times, powers = times[order], powers[order]

    return cls(t_s=times, power_w=powers)

ensure(csv_path, params=None) classmethod

Load from csv_path, generating first if needed.

Parameters:

Name Type Description Default
csv_path Path

Path to the training trace CSV.

required
params TrainingTraceParams | None

Generation parameters. Required when no cached file exists. Uses defaults if None and generation is needed.

None
Source code in openg2g/datacenter/workloads/training.py
@classmethod
def ensure(cls, csv_path: Path, params: TrainingTraceParams | None = None) -> TrainingTrace:
    """Load from `csv_path`, generating first if needed.

    Args:
        csv_path: Path to the training trace CSV.
        params: Generation parameters. Required when no cached file exists.
            Uses defaults if `None` and generation is needed.
    """
    path = Path(csv_path)
    if not path.exists():
        # Cache miss: synthesize a trace and persist it before loading.
        logger.info("Generating training trace to %s ...", path)
        cls.generate(params).save(path)
    return cls.load(path)