Skip to content

run

ipw.cli.run

Run agentic benchmarks against a dataset.

run_cmd(agent_id, model, dataset_id, client_base_url, api_key, max_queries, output_dir, export_format, estimate_flops, agent_kwargs, eval_client, eval_model)

Execute an agentic benchmark run.

Source code in intelligence-per-watt/src/ipw/cli/run.py
@click.command(help="Run an agentic benchmark against a dataset.")
@click.option("--agent", "agent_id", required=True, help="Agent identifier")
@click.option("--model", required=True, help="Model name for the agent")
@click.option("--dataset", "dataset_id", required=True, help="Dataset identifier")
@click.option(
    "--client-base-url",
    default="http://localhost:8000",
    show_default=True,
    help="Inference server base URL",
)
@click.option(
    "--api-key",
    default="EMPTY",
    show_default=True,
    help="API key for the inference server",
)
@click.option(
    "--max-queries",
    type=int,
    default=None,
    help="Max queries to run (default: all)",
)
@click.option(
    "--output-dir",
    type=click.Path(),
    default="./runs/",
    show_default=True,
    help="Output directory",
)
@click.option(
    "--export-format",
    default="jsonl,hf",
    show_default=True,
    help="Comma-separated export formats (jsonl, hf)",
)
@click.option(
    "--estimate-flops",
    is_flag=True,
    default=False,
    help="Enable FLOPs estimation",
)
@click.option(
    "--agent-kwargs",
    default=None,
    help="JSON string of extra agent keyword arguments",
)
@click.option(
    "--eval-client",
    default="openai",
    show_default=True,
    help="Client for evaluation",
)
@click.option(
    "--eval-model",
    default="gpt-5-nano-2025-08-07",
    show_default=True,
    help="Model for evaluation",
)
def run_cmd(
    agent_id: str,
    model: str,
    dataset_id: str,
    client_base_url: str,
    api_key: str,
    max_queries: int | None,
    output_dir: str,
    export_format: str,
    estimate_flops: bool,
    agent_kwargs: str | None,
    eval_client: str,
    eval_model: str,
) -> None:
    """Execute an agentic benchmark run.

    Resolves the agent and dataset classes from their registries, runs the
    dataset's requirement preflight, executes the benchmark through
    ``AgenticRunner``, exports the collected traces, and prints aggregate
    totals.

    Args:
        agent_id: Key looked up in ``AgentRegistry``.
        model: Model name; passed to ``_create_model_for_agent`` and, in
            slugified form, used in the run directory name.
        dataset_id: Key looked up in ``DatasetRegistry``.
        client_base_url: Inference server base URL, passed to
            ``_create_model_for_agent``.
        api_key: Inference server API key, passed to
            ``_create_model_for_agent``. Not written to the run config.
        max_queries: Forwarded to ``AgenticRunner.run``; ``None`` is the
            CLI's "all queries" default.
        output_dir: Parent directory under which the run directory is created.
        export_format: Comma-separated export formats; ``jsonl`` and ``hf``
            are recognised, anything else is reported and ignored.
        estimate_flops: When true, log a per-trace FLOPs estimate.
        agent_kwargs: Optional JSON object of extra keyword arguments for the
            agent constructor.
        eval_client: Recorded in the run config under ``"eval_client"``.
        eval_model: Recorded in the run config under ``"eval_model"``.

    Raises:
        click.ClickException: For invalid ``--agent-kwargs``, an unknown agent
            or dataset, unmet dataset requirements, or an agent constructor
            that rejects its arguments.

    The process exits with status 1 (via ``sys.exit``) if the runner raises.
    """
    import ipw.datasets

    ipw.datasets.ensure_registered()

    # Ensure agent modules are imported for registry population
    from ipw.agents import react as _react, openhands as _openhands, terminus as _terminus  # noqa: F401
    from ipw.core.registry import AgentRegistry, DatasetRegistry
    from ipw.execution.agentic_runner import AgenticRunner
    from ipw.execution.exporters import export_hf_dataset, export_jsonl, export_summary_json
    from ipw.telemetry.events import EventRecorder

    # Parse agent kwargs. The value is later splatted with ``**``, so it must
    # be a JSON object; reject anything else here with a message that points
    # at the CLI option rather than at the agent constructor.
    extra_kwargs: dict = {}
    if agent_kwargs:
        try:
            parsed_kwargs = json.loads(agent_kwargs)
        except json.JSONDecodeError as exc:
            raise click.ClickException(
                f"Invalid JSON for --agent-kwargs: {exc}"
            ) from exc
        if not isinstance(parsed_kwargs, dict):
            raise click.ClickException(
                "--agent-kwargs must be a JSON object, "
                f"got {type(parsed_kwargs).__name__}"
            )
        extra_kwargs = parsed_kwargs

    # Parse export formats before the run so a typo is reported up front
    # instead of being silently dropped after the benchmark has finished.
    known_formats = {"jsonl", "hf"}
    formats = [f.strip().lower() for f in export_format.split(",") if f.strip()]
    unknown_formats = [f for f in formats if f not in known_formats]
    if unknown_formats:
        warning(
            "Ignoring unknown export format(s): " + ", ".join(unknown_formats)
        )

    # Resolve agent
    try:
        agent_cls = AgentRegistry.get(agent_id)
    except KeyError as exc:
        available = [k for k, _ in AgentRegistry.items()]
        raise click.ClickException(
            f"Unknown agent '{agent_id}'. Available: {', '.join(available) or 'none'}"
        ) from exc

    # Resolve dataset
    try:
        dataset_cls = DatasetRegistry.get(dataset_id)
    except KeyError as exc:
        available = [k for k, _ in DatasetRegistry.items()]
        raise click.ClickException(
            f"Unknown dataset '{dataset_id}'. Available: {', '.join(available) or 'none'}"
        ) from exc

    # Preflight: dataset requirements
    try:
        dataset_instance = dataset_cls()
        issues = dataset_instance.verify_requirements()
    except click.ClickException:
        # A ClickException raised by the dataset itself is already user-facing.
        raise
    except Exception as exc:
        raise click.ClickException(f"Failed to initialize dataset: {exc}") from exc
    if issues:
        raise click.ClickException(
            "Dataset requirements not satisfied:\n- " + "\n- ".join(issues)
        )

    # Create event recorder and agent
    event_recorder = EventRecorder()
    resolved_model = _create_model_for_agent(agent_id, model, client_base_url, api_key)
    try:
        agent_instance = agent_cls(
            model=resolved_model,
            event_recorder=event_recorder,
            **extra_kwargs,
        )
    except TypeError as exc:
        raise click.ClickException(
            f"Failed to initialize agent '{agent_id}': {exc}"
        ) from exc

    # Prepare output directory. Non-alphanumeric characters in the model name
    # (e.g. "/" or ":") are replaced so the name is a single path component.
    model_slug = "".join(c if c.isalnum() else "_" for c in model).strip("_") or "model"
    run_dir = Path(output_dir) / f"run_{agent_id}_{model_slug}_{dataset_id}"
    run_dir.mkdir(parents=True, exist_ok=True)

    # Build config for summary
    run_config = {
        "agent": agent_id,
        "model": model,
        "dataset": dataset_id,
        "client_base_url": client_base_url,
        "max_queries": max_queries,
        "export_format": export_format,
        "estimate_flops": estimate_flops,
        "eval_client": eval_client,
        "eval_model": eval_model,
    }

    info(f"Agent: {agent_id}")
    info(f"Model: {model}")
    info(f"Dataset: {dataset_id}")
    info(f"Output: {run_dir}")
    info("")

    # Run the agentic benchmark
    runner = AgenticRunner(
        agent=agent_instance,
        dataset=dataset_instance,
        telemetry_session=None,
        config=run_config,
        event_recorder=event_recorder,
    )

    try:
        traces = asyncio.run(runner.run(max_queries=max_queries))
    except Exception as exc:
        error(f"Run failed: {exc}")
        sys.exit(1)

    if not traces:
        warning("No traces collected.")
        return

    # FLOPs estimation is best-effort: a failure is reported as a warning and
    # does not prevent the export below.
    if estimate_flops:
        try:
            from ipw.compute.flops import estimate_flops as do_estimate_flops

            for trace in traces:
                total_flops, flops_per_token = do_estimate_flops(
                    model,
                    trace.total_input_tokens,
                    trace.total_output_tokens,
                )
                if total_flops > 0:
                    info(
                        f"  {trace.query_id}: {total_flops:.2e} FLOPs "
                        f"({flops_per_token:.2e} per token)"
                    )
        except Exception as exc:
            warning(f"FLOPs estimation failed: {exc}")

    # Export results
    if "jsonl" in formats:
        jsonl_path = export_jsonl(traces, run_dir / "traces.jsonl")
        info(f"Exported JSONL: {jsonl_path}")

    if "hf" in formats:
        hf_path = export_hf_dataset(traces, run_dir / "hf_dataset")
        info(f"Exported HF dataset: {hf_path}")

    # The summary is always written, regardless of the selected formats.
    summary_path = export_summary_json(traces, run_config, run_dir / "summary.json")
    info(f"Exported summary: {summary_path}")

    # Print summary
    info("")
    total_completed = sum(1 for t in traces if t.completed)
    total_turns = sum(t.num_turns for t in traces)
    total_tokens = sum(t.total_input_tokens + t.total_output_tokens for t in traces)
    total_wall = sum(t.total_wall_clock_s for t in traces)

    success(f"Run complete: {total_completed}/{len(traces)} queries completed")
    info(f"  Turns: {total_turns}")
    info(f"  Tokens: {total_tokens}")
    info(f"  Wall clock: {total_wall:.1f}s")

    # Energy and cost are optional per trace; skip traces that report None and
    # only print the line when the total is positive.
    gpu_energy = sum(
        (t.total_gpu_energy_joules for t in traces if t.total_gpu_energy_joules is not None),
        0.0,
    )
    if gpu_energy > 0:
        info(f"  GPU energy: {gpu_energy:.2f} J")

    cost = sum(
        (t.total_cost_usd for t in traces if t.total_cost_usd is not None),
        0.0,
    )
    if cost > 0:
        info(f"  Cost: ${cost:.4f}")