class TestProfilerRunner:
"""Test ProfilerRunner orchestration."""
def test_initializes_with_config(self) -> None:
config = ProfilerConfig(
model="test-model",
client_id="ollama",
dataset_id="ipw",
)
runner = ProfilerRunner(config)
        assert runner._config == config

@patch("ipw.execution.runner.DatasetRegistry")
@patch("ipw.execution.runner.ClientRegistry")
@patch("ipw.execution.runner.EnergyMonitorCollector")
@patch("ipw.execution.runner.TelemetrySession")
@patch("ipw.execution.runner.Dataset")
def test_run_creates_output_directory(
self,
mock_dataset_class: Mock,
mock_session: Mock,
mock_collector: Mock,
mock_client_registry: Mock,
mock_dataset_registry: Mock,
tmp_path: Path,
) -> None:
# Setup mocks
mock_dataset = MagicMock()
mock_dataset.size.return_value = 1
mock_dataset.__iter__.return_value = iter(
[DatasetRecord(problem="test", answer="answer", subject="math")]
)
mock_dataset.dataset_id = "test"
mock_dataset.dataset_name = "Test Dataset"
mock_dataset_registry.get.return_value = Mock(return_value=mock_dataset)
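
        # Healthy client that returns a canned response with token usage.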
mock_client = Mock()
mock_client.health.return_value = True
mock_client.stream_chat_completion.return_value = Response(
content="response",
usage=ChatUsage(prompt_tokens=10, completion_tokens=5, total_tokens=15),
time_to_first_token_ms=100.0,
)
mock_client_registry.get.return_value = Mock(return_value=mock_client)
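
        # Energy collector and telemetry session are stubbed so no real
        # sensors are read during the run.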
mock_collector_instance = Mock()
mock_collector.return_value = mock_collector_instance
mock_telemetry = Mock()
mock_telemetry.window.return_value = []
mock_telemetry.readings.return_value = []
mock_session.return_value.__enter__.return_value = mock_telemetry

        # Stand in for Dataset.from_list with a mock whose save_to_disk
        # actually creates the output directory.
        mock_hf_dataset = Mock()

        def mock_save_to_disk(path: str) -> None:
            Path(path).mkdir(parents=True, exist_ok=True)

        mock_hf_dataset.save_to_disk = mock_save_to_disk
        mock_dataset_class.from_list.return_value = mock_hf_dataset

config = ProfilerConfig(
model="test-model",
client_id="test-client",
dataset_id="test-dataset",
output_dir=tmp_path,
)
runner = ProfilerRunner(config)
runner.run()

        # Check that output directory was created
expected_dir = tmp_path / "profile_UNKNOWN_HW_test_model_Test Dataset"
assert expected_dir.exists()

        # Check that summary.json was written
summary_path = expected_dir / "summary.json"
assert summary_path.exists()
summary = json.loads(summary_path.read_text())
assert summary["profiler_config"]["model"] == "test-model"
assert "versions" in summary
@patch("ipw.execution.runner.DatasetRegistry")
@patch("ipw.execution.runner.ClientRegistry")
def test_raises_on_unknown_dataset(
self,
mock_client_registry: Mock,
mock_dataset_registry: Mock,
) -> None:
mock_dataset_registry.get.side_effect = KeyError("unknown")
config = ProfilerConfig(
model="test-model",
client_id="test-client",
dataset_id="unknown",
)
runner = ProfilerRunner(config)
with pytest.raises(RuntimeError, match="Unknown dataset"):
            runner.run()

@patch("ipw.execution.runner.DatasetRegistry")
@patch("ipw.execution.runner.ClientRegistry")
def test_raises_on_unknown_client(
self,
mock_client_registry: Mock,
mock_dataset_registry: Mock,
) -> None:
mock_dataset_registry.get.return_value = Mock(return_value=Mock())
mock_client_registry.get.side_effect = KeyError("unknown")
config = ProfilerConfig(
model="test-model",
client_id="unknown",
dataset_id="test-dataset",
)
runner = ProfilerRunner(config)
with pytest.raises(RuntimeError, match="Unknown client"):
            runner.run()

@patch("ipw.execution.runner.DatasetRegistry")
@patch("ipw.execution.runner.ClientRegistry")
@patch("ipw.execution.runner.EnergyMonitorCollector")
def test_raises_when_client_unhealthy(
self,
mock_collector: Mock,
mock_client_registry: Mock,
mock_dataset_registry: Mock,
) -> None:
mock_dataset = Mock()
mock_dataset_registry.get.return_value = Mock(return_value=mock_dataset)
mock_client = Mock()
mock_client.health.return_value = False
mock_client.client_name = "test-client"
mock_client_registry.get.return_value = Mock(return_value=mock_client)
mock_collector.return_value = Mock()
config = ProfilerConfig(
model="test-model",
client_id="test-client",
dataset_id="test-dataset",
)
runner = ProfilerRunner(config)
with pytest.raises(RuntimeError, match="unavailable"):
            runner.run()

def test_compute_energy_metrics_handles_empty_readings(self) -> None:
config = ProfilerConfig(
model="test",
client_id="test",
dataset_id="test",
)
runner = ProfilerRunner(config)
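
        # With no readings there is nothing to attribute to the query.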
metrics = runner._compute_energy_metrics([])
assert metrics.per_query_joules is None
        assert metrics.total_joules is None

def test_compute_energy_metrics_handles_first_query(self) -> None:
config = ProfilerConfig(
model="test",
client_id="test",
dataset_id="test",
)
runner = ProfilerRunner(config)
readings = [
TelemetryReading(energy_joules=100.0),
TelemetryReading(energy_joules=150.0),
]
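        # The first window's delta (150 J - 100 J) is both the per-query and total energy.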
metrics = runner._compute_energy_metrics(readings)
assert metrics.per_query_joules == 50.0
        assert metrics.total_joules == 50.0

def test_compute_energy_metrics_handles_subsequent_queries(self) -> None:
config = ProfilerConfig(
model="test",
client_id="test",
dataset_id="test",
)
runner = ProfilerRunner(config)
# First query
readings1 = [
TelemetryReading(energy_joules=100.0),
TelemetryReading(energy_joules=150.0),
]
runner._compute_energy_metrics(readings1)
# Second query
readings2 = [
TelemetryReading(energy_joules=150.0),
TelemetryReading(energy_joules=200.0),
]
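        # Only the second window's delta (200 J - 150 J) is attributed to this query.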
metrics = runner._compute_energy_metrics(readings2)
        assert metrics.per_query_joules == 50.0

def test_compute_energy_metrics_handles_counter_reset_between_queries(self) -> None:
config = ProfilerConfig(
model="test",
client_id="test",
dataset_id="test",
)
runner = ProfilerRunner(config)
# First query
readings1 = [
TelemetryReading(energy_joules=100.0),
TelemetryReading(energy_joules=150.0),
]
runner._compute_energy_metrics(readings1)
# Counter reset (goes backward)
readings2 = [
TelemetryReading(energy_joules=50.0),
TelemetryReading(energy_joules=100.0),
]
metrics = runner._compute_energy_metrics(readings2)
# Should measure per-query window delta despite reset while idle
        assert metrics.per_query_joules == 50.0

def test_compute_energy_metrics_ignores_idle_energy_between_queries(self) -> None:
config = ProfilerConfig(
model="test",
client_id="test",
dataset_id="test",
)
runner = ProfilerRunner(config)
# First query establishes baseline
readings1 = [
TelemetryReading(energy_joules=100.0),
TelemetryReading(energy_joules=150.0),
]
runner._compute_energy_metrics(readings1)
# Second query starts after unrelated energy consumption
readings2 = [
TelemetryReading(energy_joules=250.0),
TelemetryReading(energy_joules=260.0),
]
metrics = runner._compute_energy_metrics(readings2)
        assert metrics.per_query_joules == 10.0

def test_compute_energy_metrics_handles_counter_reset_within_query(self) -> None:
config = ProfilerConfig(
model="test",
client_id="test",
dataset_id="test",
)
runner = ProfilerRunner(config)
# First query establishes baseline
readings1 = [
TelemetryReading(energy_joules=100.0),
TelemetryReading(energy_joules=150.0),
]
runner._compute_energy_metrics(readings1)
# Counter resets while the query is running
readings2 = [
TelemetryReading(energy_joules=200.0),
TelemetryReading(energy_joules=10.0),
]
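        # The counter jumps backward inside the window, so no meaningful delta exists.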
metrics = runner._compute_energy_metrics(readings2)
        assert metrics.per_query_joules is None

def test_compute_energy_metrics_filters_infinite(self) -> None:
config = ProfilerConfig(
model="test",
client_id="test",
dataset_id="test",
)
runner = ProfilerRunner(config)
readings = [
TelemetryReading(energy_joules=float("inf")),
]
metrics = runner._compute_energy_metrics(readings)
        assert metrics.per_query_joules is None

def test_compute_energy_metrics_filters_negative(self) -> None:
config = ProfilerConfig(
model="test",
client_id="test",
dataset_id="test",
)
runner = ProfilerRunner(config)
readings = [
TelemetryReading(energy_joules=-100.0),
]
metrics = runner._compute_energy_metrics(readings)
        assert metrics.per_query_joules is None

def test_build_record_creates_model_metrics(self) -> None:
config = ProfilerConfig(
model="test-model",
client_id="test",
dataset_id="test",
)
runner = ProfilerRunner(config)
record = DatasetRecord(problem="test", answer="answer", subject="math")
response = Response(
content="response",
usage=ChatUsage(prompt_tokens=10, completion_tokens=5, total_tokens=15),
time_to_first_token_ms=100.0,
)
samples = [
TelemetrySample(
timestamp=1.0,
reading=TelemetryReading(
energy_joules=100.0,
power_watts=50.0,
),
),
TelemetrySample(
timestamp=2.0,
reading=TelemetryReading(
energy_joules=150.0,
power_watts=50.0,
),
),
]
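        # The query window spans 0.0 s to 2.0 s; token counts come from the mocked usage.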
result = runner._build_record(0, record, response, samples, 0.0, 2.0)
assert result is not None
assert "test-model" in result.model_metrics
metrics = result.model_metrics["test-model"]
assert metrics.token_metrics.input == 10
assert metrics.token_metrics.output == 5
        assert metrics.token_metrics.total == 15

def test_build_record_handles_zero_completion_tokens(self) -> None:
config = ProfilerConfig(
model="test-model",
client_id="test",
dataset_id="test",
)
runner = ProfilerRunner(config)
record = DatasetRecord(problem="test", answer="answer", subject="math")
response = Response(
content="",
usage=ChatUsage(prompt_tokens=10, completion_tokens=0, total_tokens=10),
time_to_first_token_ms=0.0,
)
        samples: list[TelemetrySample] = []
result = runner._build_record(0, record, response, samples, 0.0, 1.0)
assert result is not None
metrics = result.model_metrics["test-model"]
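        # With zero completion tokens, per-token latency and throughput are undefined.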
assert metrics.latency_metrics.per_token_ms is None
        assert metrics.latency_metrics.throughput_tokens_per_sec is None

def test_build_record_computes_throughput(self) -> None:
config = ProfilerConfig(
model="test-model",
client_id="test",
dataset_id="test",
)
runner = ProfilerRunner(config)
record = DatasetRecord(problem="test", answer="answer", subject="math")
response = Response(
content="response",
usage=ChatUsage(prompt_tokens=10, completion_tokens=10, total_tokens=20),
time_to_first_token_ms=100.0,
)
        samples: list[TelemetrySample] = []
# 10 tokens in 1 second = 10 tokens/sec
result = runner._build_record(0, record, response, samples, 0.0, 1.0)
assert result is not None
metrics = result.model_metrics["test-model"]
assert metrics.latency_metrics.throughput_tokens_per_sec == 10.0
        assert metrics.latency_metrics.per_token_ms == 100.0

    @patch.object(
        ProfilerRunner,
        "_process_records",
        autospec=True,
        side_effect=RuntimeError("boom"),
    )
@patch.object(ProfilerRunner, "_ensure_client_ready")
@patch("ipw.execution.runner.TelemetrySession")
@patch("ipw.execution.runner.EnergyMonitorCollector")
def test_run_closes_client_on_error(
self,
mock_collector: Mock,
mock_session: Mock,
_mock_ensure_ready: Mock,
_mock_process_records: Mock,
) -> None:
config = ProfilerConfig(
model="test-model",
client_id="test",
dataset_id="test",
)
runner = ProfilerRunner(config)
client = Mock()
client.close = Mock()
dataset = Mock()
mock_session.return_value.__enter__.return_value = Mock()
mock_collector.return_value = Mock()
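
        # _process_records raises mid-run; the client must still be closed afterwards.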
with patch.object(runner, "_resolve_dataset", return_value=dataset), patch.object(
runner, "_resolve_client", return_value=client
):
with pytest.raises(RuntimeError):
runner.run()
        client.close.assert_called_once()

def test_get_output_path_includes_hardware_and_model(self, tmp_path: Path) -> None:
config = ProfilerConfig(
model="llama-3.2:1b",
client_id="test",
dataset_id="test",
output_dir=tmp_path,
)
runner = ProfilerRunner(config)
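        # Inject the hardware label directly rather than relying on detection.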
runner._hardware_label = "RTX3090"
path = runner._get_output_path()
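        # Hyphens, dots, and colons in the model name are sanitized to underscores.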
assert path == tmp_path / "profile_RTX3090_llama_3_2_1b_test"