Skip to content

openg2g.clock

openg2g.clock

Simulation clock with multi-rate support and optional live-mode wall-clock sync.

SimulationClock dataclass

Integer-tick clock that avoids floating-point drift.

Components run at different rates (DC=0.1s, Grid=1.0s, Controller=1.0s or 60s). The coordinator computes tick_s as the GCD of all component periods.

All time step parameters use fractions.Fraction for exact arithmetic. The time_s property returns float for compatibility with numpy/plotting.

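In live mode (live=True), the clock synchronizes with wall-clock time: each advance() sleeps until the wall-clock instant that corresponds to the new simulation time. If computation falls behind by more than one tick, a warning is issued.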

Source code in openg2g/clock.py
@dataclass
class SimulationClock:
    """Integer-tick clock that avoids floating-point drift.

    Components run at different rates (DC=0.1s, Grid=1.0s, Controller=1.0s or 60s).
    The coordinator computes `tick_s` as the GCD of all component periods.

    All time step parameters use `fractions.Fraction` for exact arithmetic.
    The `time_s` property returns `float` for compatibility with numpy/plotting.

    In live mode (`live=True`), the clock synchronizes with wall-clock time.
    If computation falls behind, a warning is issued.
    """

    tick_s: Fraction
    live: bool = False
    _step: int = field(default=0, init=False, repr=False)
    _wall_t0: float | None = field(default=None, init=False, repr=False)

    def __post_init__(self) -> None:
        if not isinstance(self.tick_s, Fraction):
            raise TypeError(f"tick_s must be a Fraction, got {type(self.tick_s).__name__}")
        if self.tick_s <= 0:
            raise ValueError(f"tick_s must be positive, got {self.tick_s}")

    @property
    def time_s(self) -> float:
        return float(self._step * self.tick_s)

    @property
    def step(self) -> int:
        return self._step

    def advance(self) -> float:
        """Advance one tick. Returns new simulation time in seconds."""
        self._step += 1
        if self.live:
            if self._wall_t0 is None:
                self._wall_t0 = time.monotonic()
            expected_wall = self._wall_t0 + self.time_s
            now = time.monotonic()
            if now < expected_wall:
                time.sleep(expected_wall - now)
            elif now - expected_wall > float(self.tick_s):
                lag = now - expected_wall
                warnings.warn(
                    f"Clock lag: {lag:.3f}s behind wall time at sim t={self.time_s:.1f}s. "
                    f"Control loop cannot keep up with real-time.",
                    stacklevel=2,
                )
        return self.time_s

    def reset(self) -> None:
        """Reset clock to initial state (tick 0)."""
        self._step = 0
        self._wall_t0 = None

    def is_due(self, period_s: Fraction) -> bool:
        """Check if an event with the given period should fire on this tick.

        Raises:
            ValueError: If *period_s* is not an exact multiple of *tick_s*.
        """
        if period_s <= 0:
            raise ValueError(f"period_s must be positive, got {period_s}")
        ratio = period_s / self.tick_s
        if ratio.denominator != 1:
            raise ValueError(f"period_s={period_s} is not an exact multiple of tick_s={self.tick_s}")
        period_ticks = int(ratio)
        return self._step % period_ticks == 0

advance()

Advance one tick. Returns new simulation time in seconds.

Source code in openg2g/clock.py
def advance(self) -> float:
    """Move the clock forward by one tick and return the new simulation time.

    Outside live mode this only increments the step counter. In live mode it
    also paces against `time.monotonic()`: it sleeps when the wall clock has
    not yet reached the new simulation time, and warns when the wall clock is
    more than one tick past it.
    """
    self._step += 1
    if not self.live:
        return self.time_s

    # The wall-clock origin is captured lazily on the first live tick.
    if self._wall_t0 is None:
        self._wall_t0 = time.monotonic()

    target = self._wall_t0 + self.time_s
    current = time.monotonic()
    if current < target:
        # Ahead of real time: wait for the wall clock to catch up.
        time.sleep(target - current)
    elif current - target > float(self.tick_s):
        # More than a whole tick late: report it, but do not stop.
        lag = current - target
        warnings.warn(
            f"Clock lag: {lag:.3f}s behind wall time at sim t={self.time_s:.1f}s. "
            f"Control loop cannot keep up with real-time.",
            stacklevel=2,  # attribute the warning to the caller of advance()
        )
    return self.time_s

reset()

Reset clock to initial state (tick 0).

Source code in openg2g/clock.py
def reset(self) -> None:
    """Return the clock to tick 0 and forget the wall-clock origin."""
    # Clearing _wall_t0 makes the next live advance() capture a new origin.
    self._step, self._wall_t0 = 0, None

is_due(period_s)

Check if an event with the given period should fire on this tick.

Raises:

| Type | Description |
| --- | --- |
| ValueError | If period_s is not positive, or is not an exact multiple of tick_s. |

Source code in openg2g/clock.py
def is_due(self, period_s: Fraction) -> bool:
    """Check if an event with the given period should fire on this tick.

    An event is due whenever the current step is a whole multiple of
    `period_s / tick_s`; this includes step 0.

    Args:
        period_s: Event period in seconds, as a `Fraction` (an `int` is
            also accepted).

    Returns:
        True if the event fires on the current tick.

    Raises:
        TypeError: If *period_s* is not a rational value (e.g. a float).
        ValueError: If *period_s* is not positive, or is not an exact
            multiple of *tick_s*.
    """
    if period_s <= 0:
        raise ValueError(f"period_s must be positive, got {period_s}")
    ratio = period_s / self.tick_s
    # Dividing a float by a Fraction silently yields a float, which has no
    # exact denominator to test; reject it with a clear error instead of
    # letting an AttributeError escape below.
    if not isinstance(ratio, Fraction):
        raise TypeError(f"period_s must be a Fraction or int, got {type(period_s).__name__}")
    if ratio.denominator != 1:
        raise ValueError(f"period_s={period_s} is not an exact multiple of tick_s={self.tick_s}")
    # With denominator 1 the numerator is the whole number of ticks per period.
    period_ticks = ratio.numerator
    return self._step % period_ticks == 0