@click.command(help="Run an agentic benchmark against a dataset.")
@click.option("--agent", "agent_id", required=True, help="Agent identifier")
@click.option("--model", required=True, help="Model name for the agent")
@click.option("--dataset", "dataset_id", required=True, help="Dataset identifier")
@click.option(
    "--client-base-url",
    default="http://localhost:8000",
    show_default=True,
    help="Inference server base URL",
)
@click.option(
    "--api-key",
    default="EMPTY",
    show_default=True,
    help="API key for the inference server",
)
@click.option(
    "--max-queries",
    type=int,
    default=None,
    help="Max queries to run (default: all)",
)
@click.option(
    "--output-dir",
    type=click.Path(),
    default="./runs/",
    show_default=True,
    help="Output directory",
)
@click.option(
    "--export-format",
    default="jsonl,hf",
    show_default=True,
    help="Comma-separated export formats (jsonl, hf)",
)
@click.option(
    "--estimate-flops",
    is_flag=True,
    default=False,
    help="Enable FLOPs estimation",
)
@click.option(
    "--agent-kwargs",
    default=None,
    help="JSON string of extra agent keyword arguments",
)
@click.option(
    "--eval-client",
    default="openai",
    show_default=True,
    help="Client for evaluation",
)
@click.option(
    "--eval-model",
    default="gpt-5-nano-2025-08-07",
    show_default=True,
    help="Model for evaluation",
)
def run_cmd(
    agent_id: str,
    model: str,
    dataset_id: str,
    client_base_url: str,
    api_key: str,
    max_queries: int | None,
    output_dir: str,
    export_format: str,
    estimate_flops: bool,
    agent_kwargs: str | None,
    eval_client: str,
    eval_model: str,
) -> None:
    """Execute an agentic benchmark run.

    Resolves the agent and dataset from their registries, verifies dataset
    requirements, runs the benchmark via ``AgenticRunner``, optionally
    reports per-trace FLOPs estimates, and exports traces plus a summary
    to the output directory. Exits with status 1 if the run itself fails.
    """
    # Heavy project imports are deferred into the command body so that
    # `--help` stays fast and registries are populated only when needed.
    import ipw.datasets

    ipw.datasets.ensure_registered()
    # Ensure agent modules are imported for registry population
    from ipw.agents import react as _react, openhands as _openhands, terminus as _terminus  # noqa: F401
    from ipw.core.registry import AgentRegistry, DatasetRegistry
    from ipw.execution.agentic_runner import AgenticRunner
    from ipw.execution.exporters import export_hf_dataset, export_jsonl, export_summary_json
    from ipw.telemetry.events import EventRecorder

    extra_kwargs = _parse_agent_kwargs(agent_kwargs)

    agent_cls = _resolve_registered(AgentRegistry, "agent", agent_id)
    dataset_cls = _resolve_registered(DatasetRegistry, "dataset", dataset_id)

    # Preflight: instantiate the dataset and check its requirements before
    # spending time on agent/model setup.
    try:
        dataset_instance = dataset_cls()
        issues = dataset_instance.verify_requirements()
        if issues:
            raise click.ClickException(
                "Dataset requirements not satisfied:\n- " + "\n- ".join(issues)
            )
    except click.ClickException:
        # Re-raise our own preflight failure untouched rather than rewrapping it.
        raise
    except Exception as exc:
        raise click.ClickException(f"Failed to initialize dataset: {exc}") from exc

    # Create event recorder and agent
    event_recorder = EventRecorder()
    resolved_model = _create_model_for_agent(agent_id, model, client_base_url, api_key)
    try:
        agent_instance = agent_cls(
            model=resolved_model,
            event_recorder=event_recorder,
            **extra_kwargs,
        )
    except TypeError as exc:
        # Most likely an unexpected/missing keyword from --agent-kwargs.
        raise click.ClickException(
            f"Failed to initialize agent '{agent_id}': {exc}"
        ) from exc

    # Prepare a filesystem-safe run directory derived from the CLI arguments.
    model_slug = "".join(c if c.isalnum() else "_" for c in model).strip("_") or "model"
    run_dir = Path(output_dir) / f"run_{agent_id}_{model_slug}_{dataset_id}"
    run_dir.mkdir(parents=True, exist_ok=True)

    # Config snapshot recorded alongside the results in summary.json.
    run_config = {
        "agent": agent_id,
        "model": model,
        "dataset": dataset_id,
        "client_base_url": client_base_url,
        "max_queries": max_queries,
        "export_format": export_format,
        "estimate_flops": estimate_flops,
        "eval_client": eval_client,
        "eval_model": eval_model,
    }
    info(f"Agent: {agent_id}")
    info(f"Model: {model}")
    info(f"Dataset: {dataset_id}")
    info(f"Output: {run_dir}")
    info("")

    # Run the agentic benchmark
    runner = AgenticRunner(
        agent=agent_instance,
        dataset=dataset_instance,
        telemetry_session=None,
        config=run_config,
        event_recorder=event_recorder,
    )
    try:
        traces = asyncio.run(runner.run(max_queries=max_queries))
    except Exception as exc:
        error(f"Run failed: {exc}")
        sys.exit(1)
    if not traces:
        warning("No traces collected.")
        return

    if estimate_flops:
        _report_flops(model, traces)

    # Export results in the requested formats; the summary is always written.
    formats = [f.strip().lower() for f in export_format.split(",") if f.strip()]
    if "jsonl" in formats:
        jsonl_path = export_jsonl(traces, run_dir / "traces.jsonl")
        info(f"Exported JSONL: {jsonl_path}")
    if "hf" in formats:
        hf_path = export_hf_dataset(traces, run_dir / "hf_dataset")
        info(f"Exported HF dataset: {hf_path}")
    summary_path = export_summary_json(traces, run_config, run_dir / "summary.json")
    info(f"Exported summary: {summary_path}")

    _print_run_summary(traces)


def _parse_agent_kwargs(agent_kwargs: str | None) -> dict:
    """Parse the --agent-kwargs JSON string into a dict; return {} when omitted.

    Raises:
        click.ClickException: if the string is not valid JSON, or parses to a
            JSON value other than an object (json.loads accepts any JSON value,
            but only an object is usable as ``**kwargs``).
    """
    if not agent_kwargs:
        return {}
    try:
        parsed = json.loads(agent_kwargs)
    except json.JSONDecodeError as exc:
        raise click.ClickException(
            f"Invalid JSON for --agent-kwargs: {exc}"
        ) from exc
    if not isinstance(parsed, dict):
        raise click.ClickException("--agent-kwargs must be a JSON object")
    return parsed


def _resolve_registered(registry, kind: str, key: str):
    """Return ``registry.get(key)``, or raise a ClickException listing known keys.

    ``kind`` is the human-readable noun ("agent" / "dataset") used in the
    error message; the KeyError is chained for debuggability.
    """
    try:
        return registry.get(key)
    except KeyError as exc:
        available = [k for k, _ in registry.items()]
        raise click.ClickException(
            f"Unknown {kind} '{key}'. Available: {', '.join(available) or 'none'}"
        ) from exc


def _report_flops(model: str, traces) -> None:
    """Print a best-effort per-trace FLOPs estimate; failures only warn."""
    try:
        from ipw.compute.flops import estimate_flops as do_estimate_flops

        for trace in traces:
            total_flops, flops_per_token = do_estimate_flops(
                model,
                trace.total_input_tokens,
                trace.total_output_tokens,
            )
            if total_flops > 0:
                info(
                    f" {trace.query_id}: {total_flops:.2e} FLOPs "
                    f"({flops_per_token:.2e} per token)"
                )
    except Exception as exc:
        # FLOPs estimation is informational; never fail the run over it.
        warning(f"FLOPs estimation failed: {exc}")


def _print_run_summary(traces) -> None:
    """Print aggregate completion, turn, token, timing, energy, and cost totals."""
    info("")
    total_completed = sum(1 for t in traces if t.completed)
    total_turns = sum(t.num_turns for t in traces)
    total_tokens = sum(t.total_input_tokens + t.total_output_tokens for t in traces)
    total_wall = sum(t.total_wall_clock_s for t in traces)
    success(f"Run complete: {total_completed}/{len(traces)} queries completed")
    info(f" Turns: {total_turns}")
    info(f" Tokens: {total_tokens}")
    info(f" Wall clock: {total_wall:.1f}s")
    # Energy and cost fields are optional per trace; only report when present.
    gpu_energy = sum(
        (t.total_gpu_energy_joules for t in traces if t.total_gpu_energy_joules is not None),
        0.0,
    )
    if gpu_energy > 0:
        info(f" GPU energy: {gpu_energy:.2f} J")
    cost = sum(
        (t.total_cost_usd for t in traces if t.total_cost_usd is not None),
        0.0,
    )
    if cost > 0:
        info(f" Cost: ${cost:.4f}")