Integer-tick clock that avoids floating-point drift.
Components run at different rates (DC=0.1s, Grid=1.0s, Controller=1.0s or 60s).
The coordinator computes tick_s as the GCD of all component periods.
All time step parameters use fractions.Fraction for exact arithmetic.
The time_s property returns float for compatibility with numpy/plotting.
In live mode (live=True), the clock synchronizes with wall-clock time.
If computation falls behind wall-clock time by more than one tick, a warning is issued.
Source code in openg2g/clock.py
| @dataclass
class SimulationClock:
"""Integer-tick clock that avoids floating-point drift.
Components run at different rates (DC=0.1s, Grid=1.0s, Controller=1.0s or 60s).
The coordinator computes `tick_s` as the GCD of all component periods.
All time step parameters use `fractions.Fraction` for exact arithmetic.
The `time_s` property returns `float` for compatibility with numpy/plotting.
In live mode (`live=True`), the clock synchronizes with wall-clock time.
If computation falls behind, a warning is issued.
"""
tick_s: Fraction
live: bool = False
_step: int = field(default=0, init=False, repr=False)
_wall_t0: float | None = field(default=None, init=False, repr=False)
def __post_init__(self) -> None:
if not isinstance(self.tick_s, Fraction):
raise TypeError(f"tick_s must be a Fraction, got {type(self.tick_s).__name__}")
if self.tick_s <= 0:
raise ValueError(f"tick_s must be positive, got {self.tick_s}")
@property
def time_s(self) -> float:
return float(self._step * self.tick_s)
@property
def step(self) -> int:
return self._step
def advance(self) -> float:
"""Advance one tick. Returns new simulation time in seconds."""
self._step += 1
if self.live:
if self._wall_t0 is None:
self._wall_t0 = time.monotonic()
expected_wall = self._wall_t0 + self.time_s
now = time.monotonic()
if now < expected_wall:
time.sleep(expected_wall - now)
elif now - expected_wall > float(self.tick_s):
lag = now - expected_wall
warnings.warn(
f"Clock lag: {lag:.3f}s behind wall time at sim t={self.time_s:.1f}s. "
f"Control loop cannot keep up with real-time.",
stacklevel=2,
)
return self.time_s
def reset(self) -> None:
"""Reset clock to initial state (tick 0)."""
self._step = 0
self._wall_t0 = None
def is_due(self, period_s: Fraction) -> bool:
"""Check if an event with the given period should fire on this tick.
Raises:
ValueError: If *period_s* is not an exact multiple of *tick_s*.
"""
if period_s <= 0:
raise ValueError(f"period_s must be positive, got {period_s}")
ratio = period_s / self.tick_s
if ratio.denominator != 1:
raise ValueError(f"period_s={period_s} is not an exact multiple of tick_s={self.tick_s}")
period_ticks = int(ratio)
return self._step % period_ticks == 0
|
advance()
Advance one tick. Returns new simulation time in seconds.
Source code in openg2g/clock.py
| def advance(self) -> float:
"""Advance one tick. Returns new simulation time in seconds."""
self._step += 1
if self.live:
if self._wall_t0 is None:
self._wall_t0 = time.monotonic()
expected_wall = self._wall_t0 + self.time_s
now = time.monotonic()
if now < expected_wall:
time.sleep(expected_wall - now)
elif now - expected_wall > float(self.tick_s):
lag = now - expected_wall
warnings.warn(
f"Clock lag: {lag:.3f}s behind wall time at sim t={self.time_s:.1f}s. "
f"Control loop cannot keep up with real-time.",
stacklevel=2,
)
return self.time_s
|
reset()
Reset clock to initial state (tick 0).
Source code in openg2g/clock.py
| def reset(self) -> None:
"""Reset clock to initial state (tick 0)."""
self._step = 0
self._wall_t0 = None
|
is_due(period_s)
Check if an event with the given period should fire on this tick.
Raises:
| Type | Description |
| --- | --- |
| ValueError | If period_s is not positive, or is not an exact multiple of tick_s. |
Source code in openg2g/clock.py
| def is_due(self, period_s: Fraction) -> bool:
"""Check if an event with the given period should fire on this tick.
Raises:
ValueError: If *period_s* is not an exact multiple of *tick_s*.
"""
if period_s <= 0:
raise ValueError(f"period_s must be positive, got {period_s}")
ratio = period_s / self.tick_s
if ratio.denominator != 1:
raise ValueError(f"period_s={period_s} is not an exact multiple of tick_s={self.tick_s}")
period_ticks = int(ratio)
return self._step % period_ticks == 0
|