Skip to content

proto

ipw.telemetry.proto

Dynamic protobuf helpers for the energy monitor API.

StubBundle dataclass

Container for lazily constructed gRPC stub callables and message classes.

Source code in intelligence-per-watt/src/ipw/telemetry/proto.py
@dataclass(frozen=True)
class StubBundle:
    """Container for lazily constructed gRPC stub callables and message classes."""

    stub_factory: Type[Any]
    TelemetryReadingCls: Type[Any]
    StreamRequestCls: Type[Any]
    HealthRequestCls: Type[Any]

get_stub_bundle()

Return the cached dynamic gRPC stub bundle, creating it on first use.

Source code in intelligence-per-watt/src/ipw/telemetry/proto.py
def get_stub_bundle() -> StubBundle:
    """Return the cached dynamic gRPC stub bundle, creating it on first use."""

    global _STUB_BUNDLE
    if _STUB_BUNDLE is not None:
        return _STUB_BUNDLE

    pool = descriptor_pool.Default()
    try:
        pool.FindFileByName("energy.proto")
    except KeyError:
        _register_proto_descriptors(pool)

    TelemetryReadingCls = message_factory.GetMessageClass(
        pool.FindMessageTypeByName("energy.TelemetryReading")
    )
    StreamRequestCls = message_factory.GetMessageClass(
        pool.FindMessageTypeByName("energy.StreamRequest")
    )
    HealthRequestCls = message_factory.GetMessageClass(
        pool.FindMessageTypeByName("energy.HealthRequest")
    )
    HealthResponseCls = message_factory.GetMessageClass(
        pool.FindMessageTypeByName("energy.HealthResponse")
    )

    class EnergyMonitorStub:
        def __init__(self, channel: grpc.Channel) -> None:
            self.Health = channel.unary_unary(
                "/energy.EnergyMonitor/Health",
                request_serializer=HealthRequestCls.SerializeToString,
                response_deserializer=HealthResponseCls.FromString,
            )
            self.StreamTelemetry = channel.unary_stream(
                "/energy.EnergyMonitor/StreamTelemetry",
                request_serializer=StreamRequestCls.SerializeToString,
                response_deserializer=TelemetryReadingCls.FromString,
            )

    _STUB_BUNDLE = StubBundle(
        stub_factory=EnergyMonitorStub,
        TelemetryReadingCls=TelemetryReadingCls,
        StreamRequestCls=StreamRequestCls,
        HealthRequestCls=HealthRequestCls,
    )
    return _STUB_BUNDLE