Skip to content

test_telemetry_session

ipw.tests.execution.test_telemetry_session

Tests for telemetry session management.

TestTelemetrySession

Test TelemetrySession context manager.

Source code in intelligence-per-watt/src/ipw/tests/execution/test_telemetry_session.py
class TestTelemetrySession:
    """Test TelemetrySession context manager."""

    def test_initializes_with_collector(self) -> None:
        collector = Mock()
        session = TelemetrySession(collector)
        assert session._collector is collector

    def test_context_manager_starts_collector(self) -> None:
        collector = Mock()
        collector_ctx = Mock()
        collector.start.return_value = collector_ctx
        collector_ctx.__enter__ = Mock(return_value=collector_ctx)
        collector_ctx.__exit__ = Mock(return_value=None)

        # Create an empty iterator for stream_readings
        collector.stream_readings.return_value = iter([])

        with TelemetrySession(collector) as session:
            collector.start.assert_called_once()
            assert session is not None

    def test_context_manager_stops_thread(self) -> None:
        collector = Mock()
        collector_ctx = Mock()
        collector.start.return_value = collector_ctx
        collector_ctx.__enter__ = Mock(return_value=collector_ctx)
        collector_ctx.__exit__ = Mock(return_value=None)
        collector.stream_readings.return_value = iter([])

        session = TelemetrySession(collector)
        with session:
            pass

        # Thread should be stopped
        assert session._stop_event.is_set()

    def test_window_filters_by_time(self) -> None:
        collector = Mock()
        session = TelemetrySession(collector)

        # Manually add samples
        session._samples.append(
            TelemetrySample(timestamp=1.0, reading=TelemetryReading())
        )
        session._samples.append(
            TelemetrySample(timestamp=2.0, reading=TelemetryReading())
        )
        session._samples.append(
            TelemetrySample(timestamp=3.0, reading=TelemetryReading())
        )
        session._samples.append(
            TelemetrySample(timestamp=4.0, reading=TelemetryReading())
        )

        windowed = list(session.window(1.5, 3.5))
        assert len(windowed) == 2
        assert windowed[0].timestamp == 2.0
        assert windowed[1].timestamp == 3.0

    def test_window_includes_boundaries(self) -> None:
        collector = Mock()
        session = TelemetrySession(collector)

        session._samples.append(
            TelemetrySample(timestamp=1.0, reading=TelemetryReading())
        )
        session._samples.append(
            TelemetrySample(timestamp=2.0, reading=TelemetryReading())
        )
        session._samples.append(
            TelemetrySample(timestamp=3.0, reading=TelemetryReading())
        )

        windowed = list(session.window(1.0, 3.0))
        assert len(windowed) == 3

    def test_window_returns_empty_when_no_overlap(self) -> None:
        collector = Mock()
        session = TelemetrySession(collector)

        session._samples.append(
            TelemetrySample(timestamp=1.0, reading=TelemetryReading())
        )
        session._samples.append(
            TelemetrySample(timestamp=2.0, reading=TelemetryReading())
        )

        windowed = list(session.window(5.0, 10.0))
        assert len(windowed) == 0

    def test_readings_returns_all_samples(self) -> None:
        collector = Mock()
        session = TelemetrySession(collector)

        session._samples.append(
            TelemetrySample(timestamp=1.0, reading=TelemetryReading())
        )
        session._samples.append(
            TelemetrySample(timestamp=2.0, reading=TelemetryReading())
        )

        readings = list(session.readings())
        assert len(readings) == 2

    def test_trim_removes_old_samples(self) -> None:
        collector = Mock()
        session = TelemetrySession(collector, buffer_seconds=5.0)

        session._samples.append(
            TelemetrySample(timestamp=1.0, reading=TelemetryReading())
        )
        session._samples.append(
            TelemetrySample(timestamp=2.0, reading=TelemetryReading())
        )
        session._samples.append(
            TelemetrySample(timestamp=10.0, reading=TelemetryReading())
        )

        session._trim(10.0)

        # Only samples within buffer_seconds (5.0) should remain
        # cutoff = 10.0 - 5.0 = 5.0
        # So samples at timestamp >= 5.0 remain
        # That means only the sample at 10.0 should remain
        assert len(session._samples) == 1
        assert session._samples[0].timestamp == 10.0

    def test_respects_max_samples(self) -> None:
        collector = Mock()
        session = TelemetrySession(collector, max_samples=3)

        for i in range(5):
            session._samples.append(
                TelemetrySample(timestamp=float(i), reading=TelemetryReading())
            )

        # deque with maxlen should keep only last 3
        assert len(session._samples) == 3
        assert session._samples[0].timestamp == 2.0
        assert session._samples[-1].timestamp == 4.0

    def test_integration_with_real_collector(self) -> None:
        """Integration test with actual streaming (if available)."""
        collector = Mock()
        collector_ctx = Mock()
        collector.start.return_value = collector_ctx
        collector_ctx.__enter__ = Mock(return_value=collector_ctx)
        collector_ctx.__exit__ = Mock(return_value=None)

        # Simulate a few readings
        readings = [
            TelemetryReading(energy_joules=100.0),
            TelemetryReading(energy_joules=150.0),
            TelemetryReading(energy_joules=200.0),
        ]

        def reading_generator():
            for r in readings:
                yield r
                time.sleep(0.01)  # Small delay

        collector.stream_readings.return_value = reading_generator()

        with TelemetrySession(collector, buffer_seconds=10.0) as session:
            # Give thread time to collect samples
            time.sleep(0.1)

        # Should have collected some samples
        assert len(session._samples) > 0

test_integration_with_real_collector()

Integration test with actual streaming (if available).

Source code in intelligence-per-watt/src/ipw/tests/execution/test_telemetry_session.py
def test_integration_with_real_collector(self) -> None:
    """Integration test with actual streaming (if available)."""
    collector = Mock()
    collector_ctx = Mock()
    collector.start.return_value = collector_ctx
    collector_ctx.__enter__ = Mock(return_value=collector_ctx)
    collector_ctx.__exit__ = Mock(return_value=None)

    # Simulate a few readings
    readings = [
        TelemetryReading(energy_joules=100.0),
        TelemetryReading(energy_joules=150.0),
        TelemetryReading(energy_joules=200.0),
    ]

    def reading_generator():
        for r in readings:
            yield r
            time.sleep(0.01)  # Small delay

    collector.stream_readings.return_value = reading_generator()

    with TelemetrySession(collector, buffer_seconds=10.0) as session:
        # Give thread time to collect samples
        time.sleep(0.1)

    # Should have collected some samples
    assert len(session._samples) > 0