ipw.agents.mcp.retrieval

Retrieval MCP servers for benchmark integration.

This module provides retrieval capabilities (BM25, dense, grep, hybrid) for benchmarks that need document retrieval, with full energy/power/latency profiling.

Example

from ipw.agents.mcp.retrieval import HybridRetrievalServer, Document

server = HybridRetrievalServer(telemetry_collector=collector)
docs = [Document(id="1", content="..."), Document(id="2", content="...")]
server.index_documents(docs)

result = server.execute("search query", top_k=5)

BaseRetrievalServer

Bases: BaseMCPServer

Base class for retrieval servers with automatic telemetry.

Subclasses must implement:

- _execute_impl(): Perform the actual retrieval
- index_documents(): Index a list of documents
- clear_index(): Clear all indexed documents
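A minimal subclass might look like the sketch below, assuming the module's own imports (Document, RetrievalResult, MCPToolResult). The retrieval:echo server and its substring matching are illustrative only, not part of the library:

class EchoRetrievalServer(BaseRetrievalServer):
    """Illustrative subclass: naive substring matching over stored documents."""

    def __init__(self, telemetry_collector=None, event_recorder=None):
        super().__init__(
            name="retrieval:echo",  # hypothetical server name
            telemetry_collector=telemetry_collector,
            event_recorder=event_recorder,
        )
        self._docs: List[Document] = []

    def index_documents(self, documents: List[Document]) -> int:
        self._docs = list(documents)
        self._document_count = len(self._docs)
        return self._document_count

    def clear_index(self) -> None:
        self._docs.clear()
        self._document_count = 0

    def _execute_impl(self, prompt: str, **params: Any) -> MCPToolResult:
        # Naive substring match; real subclasses do BM25/dense/grep retrieval.
        results = [
            RetrievalResult(document=d, score=1.0)
            for d in self._docs
            if prompt.lower() in d.content.lower()
        ]
        return MCPToolResult(
            content=self._format_results(results),
            cost_usd=0.0,
            metadata={"tool": "retrieval:echo", "num_results": len(results)},
        )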

Source code in intelligence-per-watt/src/ipw/agents/mcp/retrieval/base.py
class BaseRetrievalServer(BaseMCPServer):
    """Base class for retrieval servers with automatic telemetry.

    Subclasses must implement:
    - _execute_impl(): Perform the actual retrieval
    - index_documents(): Index a list of documents
    - clear_index(): Clear all indexed documents
    """

    def __init__(
        self,
        name: str,
        telemetry_collector: Optional[Any] = None,
        event_recorder: Optional[Any] = None,
    ):
        """Initialize retrieval server.

        Args:
            name: Server name (e.g., "retrieval:bm25")
            telemetry_collector: Energy monitor collector for telemetry
            event_recorder: EventRecorder for per-action tracking
        """
        super().__init__(
            name=name,
            telemetry_collector=telemetry_collector,
            event_recorder=event_recorder,
        )
        self._document_count = 0

    def _get_backend(self) -> str:
        """Get the backend type for this server."""
        return "retrieval"

    @abstractmethod
    def index_documents(self, documents: List[Document]) -> int:
        """Index a list of documents.

        Args:
            documents: List of documents to index

        Returns:
            Number of documents successfully indexed
        """
        raise NotImplementedError

    @abstractmethod
    def clear_index(self) -> None:
        """Clear all indexed documents."""
        raise NotImplementedError

    @property
    def document_count(self) -> int:
        """Get the number of indexed documents."""
        return self._document_count

    def _format_results(
        self,
        results: List[RetrievalResult],
        include_scores: bool = True,
        include_metadata: bool = False,
    ) -> str:
        """Format retrieval results as text."""
        if not results:
            return "No results found."

        lines = []
        for i, result in enumerate(results, 1):
            header = f"[{i}] {result.document.id}"
            if include_scores:
                header += f" (score: {result.score:.4f})"
            lines.append(header)

            if include_metadata and result.document.metadata:
                meta_str = ", ".join(
                    f"{k}={v}" for k, v in result.document.metadata.items()
                )
                lines.append(f"  Metadata: {meta_str}")

            if result.highlights:
                for highlight in result.highlights:
                    lines.append(f"  ...{highlight}...")
            else:
                content = result.document.content
                if len(content) > 200:
                    content = content[:200] + "..."
                lines.append(f"  {content}")

            lines.append("")

        return "\n".join(lines)

    def health_check(self) -> bool:
        """Check if retrieval server is operational."""
        try:
            return self._document_count >= 0
        except Exception:
            return False

document_count property

Get the number of indexed documents.

__init__(name, telemetry_collector=None, event_recorder=None)

Initialize retrieval server.

Parameters:

    Name                 Type           Description                              Default
    name                 str            Server name (e.g., "retrieval:bm25")    required
    telemetry_collector  Optional[Any]  Energy monitor collector for telemetry  None
    event_recorder       Optional[Any]  EventRecorder for per-action tracking   None
Source code in intelligence-per-watt/src/ipw/agents/mcp/retrieval/base.py
def __init__(
    self,
    name: str,
    telemetry_collector: Optional[Any] = None,
    event_recorder: Optional[Any] = None,
):
    """Initialize retrieval server.

    Args:
        name: Server name (e.g., "retrieval:bm25")
        telemetry_collector: Energy monitor collector for telemetry
        event_recorder: EventRecorder for per-action tracking
    """
    super().__init__(
        name=name,
        telemetry_collector=telemetry_collector,
        event_recorder=event_recorder,
    )
    self._document_count = 0

index_documents(documents) abstractmethod

Index a list of documents.

Parameters:

    Name       Type            Description                 Default
    documents  List[Document]  List of documents to index  required

Returns:

    Type  Description
    int   Number of documents successfully indexed

Source code in intelligence-per-watt/src/ipw/agents/mcp/retrieval/base.py
@abstractmethod
def index_documents(self, documents: List[Document]) -> int:
    """Index a list of documents.

    Args:
        documents: List of documents to index

    Returns:
        Number of documents successfully indexed
    """
    raise NotImplementedError

clear_index() abstractmethod

Clear all indexed documents.

Source code in intelligence-per-watt/src/ipw/agents/mcp/retrieval/base.py
@abstractmethod
def clear_index(self) -> None:
    """Clear all indexed documents."""
    raise NotImplementedError

health_check()

Check if retrieval server is operational.

Source code in intelligence-per-watt/src/ipw/agents/mcp/retrieval/base.py
def health_check(self) -> bool:
    """Check if retrieval server is operational."""
    try:
        return self._document_count >= 0
    except Exception:
        return False

Document dataclass

A document for retrieval indexing.

Attributes:

    Name      Type            Description
    id        str             Unique document identifier
    content   str             Document text content
    metadata  Dict[str, Any]  Optional metadata (e.g., title, source, date)
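For example, a document with optional metadata (the field values are illustrative):

doc = Document(
    id="paper-42",
    content="Reciprocal rank fusion combines rankings from multiple retrievers.",
    metadata={"title": "RRF overview", "source": "example"},
)

# __post_init__ validates both required fields; an empty id or content
# raises ValueError, e.g. Document(id="", content="text").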

Source code in intelligence-per-watt/src/ipw/agents/mcp/retrieval/base.py
@dataclass
class Document:
    """A document for retrieval indexing.

    Attributes:
        id: Unique document identifier
        content: Document text content
        metadata: Optional metadata (e.g., title, source, date)
    """

    id: str
    content: str
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        """Validate document."""
        if not self.id:
            raise ValueError("Document id cannot be empty")
        if not self.content:
            raise ValueError("Document content cannot be empty")

__post_init__()

Validate document.

Source code in intelligence-per-watt/src/ipw/agents/mcp/retrieval/base.py
def __post_init__(self):
    """Validate document."""
    if not self.id:
        raise ValueError("Document id cannot be empty")
    if not self.content:
        raise ValueError("Document content cannot be empty")

RetrievalResult dataclass

A single retrieval result with score.

Attributes:

    Name        Type       Description
    document    Document   The retrieved document
    score       float      Relevance score (higher is better)
    highlights  List[str]  Optional highlighted text snippets
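A result serializes to a flat dictionary via to_dict(); the values below are illustrative:

doc = Document(id="paper-42", content="Reciprocal rank fusion combines rankings.")
result = RetrievalResult(document=doc, score=7.31, highlights=["fusion combines rankings"])
print(result.to_dict())
# {'id': 'paper-42', 'content': 'Reciprocal rank fusion combines rankings.',
#  'metadata': {}, 'score': 7.31, 'highlights': ['fusion combines rankings']}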

Source code in intelligence-per-watt/src/ipw/agents/mcp/retrieval/base.py
@dataclass
class RetrievalResult:
    """A single retrieval result with score.

    Attributes:
        document: The retrieved document
        score: Relevance score (higher is better)
        highlights: Optional highlighted text snippets
    """

    document: Document
    score: float
    highlights: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for serialization."""
        return {
            "id": self.document.id,
            "content": self.document.content,
            "metadata": self.document.metadata,
            "score": self.score,
            "highlights": self.highlights,
        }

to_dict()

Convert to dictionary for serialization.

Source code in intelligence-per-watt/src/ipw/agents/mcp/retrieval/base.py
def to_dict(self) -> Dict[str, Any]:
    """Convert to dictionary for serialization."""
    return {
        "id": self.document.id,
        "content": self.document.content,
        "metadata": self.document.metadata,
        "score": self.score,
        "highlights": self.highlights,
    }

GrepRetrievalServer

Bases: BaseRetrievalServer

Fast regex/keyword retrieval without indexing.

Example

server = GrepRetrievalServer()
server.index_documents([
    Document(id="1", content="Python is great for ML.\nIt has many libraries."),
    Document(id="2", content="JavaScript is for web development."),
])

result = server.execute("Python", pattern="Python.*ML")

Source code in intelligence-per-watt/src/ipw/agents/mcp/retrieval/grep_server.py
class GrepRetrievalServer(BaseRetrievalServer):
    """Fast regex/keyword retrieval without indexing.

    Example:
        server = GrepRetrievalServer()
        server.index_documents([
            Document(id="1", content="Python is great for ML.\\nIt has many libraries."),
            Document(id="2", content="JavaScript is for web development."),
        ])

        result = server.execute("Python", pattern="Python.*ML")
    """

    def __init__(
        self,
        telemetry_collector: Optional[Any] = None,
        event_recorder: Optional[Any] = None,
        default_context_lines: int = 2,
        max_matches: int = 50,
    ):
        super().__init__(
            name="retrieval:grep",
            telemetry_collector=telemetry_collector,
            event_recorder=event_recorder,
        )
        self.default_context_lines = default_context_lines
        self.max_matches = max_matches
        self._documents: Dict[str, Document] = {}

    def index_documents(self, documents: List[Document]) -> int:
        count = 0
        for doc in documents:
            self._documents[doc.id] = doc
            count += 1
        self._document_count = len(self._documents)
        return count

    def clear_index(self) -> None:
        self._documents.clear()
        self._document_count = 0

    def _grep_documents(
        self,
        pattern: str,
        case_sensitive: bool = False,
        context_lines: int = 2,
    ) -> List[GrepMatch]:
        matches = []
        flags = 0 if case_sensitive else re.IGNORECASE

        try:
            compiled_pattern = re.compile(pattern, flags)
        except re.error:
            compiled_pattern = re.compile(re.escape(pattern), flags)

        for doc_id, doc in self._documents.items():
            lines = doc.content.split("\n")

            for line_num, line in enumerate(lines):
                match = compiled_pattern.search(line)
                if match:
                    start_ctx = max(0, line_num - context_lines)
                    end_ctx = min(len(lines), line_num + context_lines + 1)

                    grep_match = GrepMatch(
                        document_id=doc_id,
                        line_number=line_num + 1,
                        line_content=line,
                        context_before=lines[start_ctx:line_num],
                        context_after=lines[line_num + 1 : end_ctx],
                        match_start=match.start(),
                        match_end=match.end(),
                    )
                    matches.append(grep_match)

        return matches

    def _format_grep_matches(self, matches: List[GrepMatch]) -> str:
        if not matches:
            return "No matches found."

        lines = []
        current_doc = None

        for match in matches:
            if match.document_id != current_doc:
                if current_doc is not None:
                    lines.append("")
                lines.append(f"=== {match.document_id} ===")
                current_doc = match.document_id

            for ctx_line in match.context_before:
                lines.append(f"  {ctx_line}")

            highlighted = (
                match.line_content[: match.match_start]
                + ">>>"
                + match.line_content[match.match_start : match.match_end]
                + "<<<"
                + match.line_content[match.match_end :]
            )
            lines.append(f"{match.line_number}: {highlighted}")

            for ctx_line in match.context_after:
                lines.append(f"  {ctx_line}")

            lines.append("---")

        return "\n".join(lines)

    def _matches_to_results(self, matches: List[GrepMatch]) -> List[RetrievalResult]:
        doc_matches: Dict[str, List[GrepMatch]] = {}
        for match in matches:
            if match.document_id not in doc_matches:
                doc_matches[match.document_id] = []
            doc_matches[match.document_id].append(match)

        results = []
        for doc_id, doc_match_list in doc_matches.items():
            doc = self._documents.get(doc_id)
            if not doc:
                continue

            score = len(doc_match_list)

            highlights = []
            for m in doc_match_list[:3]:
                highlight = m.line_content.strip()
                if len(highlight) > 100:
                    highlight = highlight[:100] + "..."
                highlights.append(highlight)

            results.append(
                RetrievalResult(
                    document=doc,
                    score=float(score),
                    highlights=highlights,
                )
            )

        results.sort(key=lambda r: r.score, reverse=True)
        return results

    def _execute_impl(self, prompt: str, **params: Any) -> MCPToolResult:
        pattern = params.get("pattern", prompt)
        case_sensitive = params.get("case_sensitive", False)
        context_lines = params.get("context_lines", self.default_context_lines)
        max_matches = params.get("max_matches", self.max_matches)
        return_documents = params.get("return_documents", False)

        matches = self._grep_documents(pattern, case_sensitive, context_lines)
        limited_matches = matches[:max_matches]
        total_matches = len(matches)

        if return_documents:
            results = self._matches_to_results(limited_matches)
            content = self._format_results(results)
        else:
            content = self._format_grep_matches(limited_matches)

        if total_matches > max_matches:
            content += f"\n\n[Showing {max_matches} of {total_matches} matches]"

        return MCPToolResult(
            content=content,
            cost_usd=0.0,
            metadata={
                "tool": "retrieval:grep",
                "pattern": pattern,
                "num_matches": total_matches,
                "num_returned": len(limited_matches),
                "case_sensitive": case_sensitive,
            },
        )

BM25RetrievalServer

Bases: BaseRetrievalServer

BM25 sparse retrieval server.

Uses the BM25 algorithm for keyword-based document retrieval. Fast, CPU-only, and requires the rank-bm25 package.

Example

server = BM25RetrievalServer()
server.index_documents([
    Document(id="1", content="Machine learning is a subset of AI."),
    Document(id="2", content="Deep learning uses neural networks."),
])

result = server.execute("machine learning neural networks", top_k=5)

Source code in intelligence-per-watt/src/ipw/agents/mcp/retrieval/bm25_server.py
class BM25RetrievalServer(BaseRetrievalServer):
    """BM25 sparse retrieval server.

    Uses the BM25 algorithm for keyword-based document retrieval.
    Fast, CPU-only, and requires rank-bm25 package.

    Example:
        server = BM25RetrievalServer()
        server.index_documents([
            Document(id="1", content="Machine learning is a subset of AI."),
            Document(id="2", content="Deep learning uses neural networks."),
        ])

        result = server.execute("machine learning neural networks", top_k=5)
    """

    def __init__(
        self,
        telemetry_collector: Optional[Any] = None,
        event_recorder: Optional[Any] = None,
        k1: float = 1.5,
        b: float = 0.75,
        tokenizer: Optional[str] = None,
    ):
        super().__init__(
            name="retrieval:bm25",
            telemetry_collector=telemetry_collector,
            event_recorder=event_recorder,
        )
        self.k1 = k1
        self.b = b
        self.tokenizer = tokenizer

        self._documents: List[Document] = []
        self._tokenized_corpus: List[List[str]] = []
        self._bm25: Optional[Any] = None

    def _tokenize(self, text: str) -> List[str]:
        tokens = re.findall(r"\b\w+\b", text.lower())
        return tokens

    def index_documents(self, documents: List[Document]) -> int:
        try:
            from rank_bm25 import BM25Okapi
        except ImportError:
            raise ImportError(
                "rank-bm25 is required for BM25RetrievalServer. "
                "Install with: pip install rank-bm25"
            )

        self._documents = list(documents)
        self._tokenized_corpus = [self._tokenize(doc.content) for doc in documents]

        self._bm25 = BM25Okapi(
            self._tokenized_corpus,
            k1=self.k1,
            b=self.b,
        )

        self._document_count = len(self._documents)
        return self._document_count

    def clear_index(self) -> None:
        self._documents.clear()
        self._tokenized_corpus.clear()
        self._bm25 = None
        self._document_count = 0

    def _search(self, query: str, top_k: int = 5) -> List[RetrievalResult]:
        if self._bm25 is None or not self._documents:
            return []

        query_tokens = self._tokenize(query)
        if not query_tokens:
            return []

        scores = self._bm25.get_scores(query_tokens)

        top_indices = sorted(
            range(len(scores)), key=lambda i: scores[i], reverse=True
        )[:top_k]

        results = []
        for idx in top_indices:
            if scores[idx] > 0:
                doc = self._documents[idx]
                highlights = self._generate_highlights(doc.content, query_tokens)
                results.append(
                    RetrievalResult(
                        document=doc,
                        score=float(scores[idx]),
                        highlights=highlights,
                    )
                )

        return results

    def _generate_highlights(
        self, content: str, query_tokens: List[str], max_highlights: int = 3
    ) -> List[str]:
        highlights = []
        sentences = re.split(r"[.!?]\s+", content)

        for sentence in sentences:
            sentence_lower = sentence.lower()
            if any(token in sentence_lower for token in query_tokens):
                if len(sentence) > 150:
                    sentence = sentence[:150] + "..."
                highlights.append(sentence.strip())
                if len(highlights) >= max_highlights:
                    break

        return highlights

    def _execute_impl(self, prompt: str, **params: Any) -> MCPToolResult:
        top_k = params.get("top_k", 5)
        include_scores = params.get("include_scores", True)
        include_metadata = params.get("include_metadata", False)

        if self._bm25 is None:
            return MCPToolResult(
                content="No documents indexed. Call index_documents() first.",
                cost_usd=0.0,
                metadata={"tool": "retrieval:bm25", "error": "no_index"},
            )

        results = self._search(prompt, top_k=top_k)

        content = self._format_results(
            results,
            include_scores=include_scores,
            include_metadata=include_metadata,
        )

        return MCPToolResult(
            content=content,
            cost_usd=0.0,
            metadata={
                "tool": "retrieval:bm25",
                "query": prompt,
                "num_results": len(results),
                "top_k": top_k,
                "indexed_documents": self._document_count,
            },
        )

DenseRetrievalServer

Bases: BaseRetrievalServer

Dense neural retrieval server using FAISS + sentence-transformers.

Latency: ~50ms per query
Cost: Zero (local inference)

Example

server = DenseRetrievalServer(model_name="all-MiniLM-L6-v2")
server.index_documents([
    Document(id="1", content="Machine learning automates data analysis."),
])
result = server.execute("AI learns patterns from data", top_k=5)
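Since encoding the corpus dominates indexing cost, the FAISS index and documents can be persisted with save_index() and restored with load_index() (both shown in the source below). Continuing the example above, with an illustrative path:

server.save_index("./indices/my_corpus")

# Later, or in another process: reload without re-encoding the corpus.
server2 = DenseRetrievalServer(model_name="all-MiniLM-L6-v2")
server2.load_index("./indices/my_corpus")
result = server2.execute("AI learns patterns from data", top_k=5)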

Source code in intelligence-per-watt/src/ipw/agents/mcp/retrieval/dense_server.py
class DenseRetrievalServer(BaseRetrievalServer):
    """Dense neural retrieval server using FAISS + sentence-transformers.

    Latency: ~50ms per query
    Cost: Zero (local inference)

    Example:
        server = DenseRetrievalServer(model_name="all-MiniLM-L6-v2")
        server.index_documents([
            Document(id="1", content="Machine learning automates data analysis."),
        ])
        result = server.execute("AI learns patterns from data", top_k=5)
    """

    def __init__(
        self,
        model_name: str = "all-MiniLM-L6-v2",
        telemetry_collector: Optional[Any] = None,
        event_recorder: Optional[Any] = None,
        use_gpu: bool = False,
        gpu_device: int = 0,
        normalize_embeddings: bool = True,
        batch_size: int = 32,
    ):
        super().__init__(
            name="retrieval:dense",
            telemetry_collector=telemetry_collector,
            event_recorder=event_recorder,
        )
        self.model_name = model_name
        self.use_gpu = use_gpu
        self.gpu_device = gpu_device
        self.normalize_embeddings = normalize_embeddings
        self.batch_size = batch_size

        self._encoder: Optional[Any] = None
        self._index: Optional[Any] = None
        self._documents: List[Document] = []
        self._embedding_dim: Optional[int] = None

    def _get_encoder(self) -> Any:
        if self._encoder is None:
            try:
                from sentence_transformers import SentenceTransformer
            except ImportError:
                raise ImportError(
                    "sentence-transformers is required for DenseRetrievalServer. "
                    "Install with: pip install sentence-transformers"
                )

            if self.use_gpu:
                device = f"cuda:{self.gpu_device}"
            else:
                device = "cpu"

            self._encoder = SentenceTransformer(self.model_name, device=device)
            self._embedding_dim = self._encoder.get_sentence_embedding_dimension()

        return self._encoder

    def _create_faiss_index(self, dimension: int) -> Any:
        try:
            import faiss
        except ImportError:
            raise ImportError(
                "faiss-cpu is required for DenseRetrievalServer. "
                "Install with: pip install faiss-cpu"
            )

        if self.normalize_embeddings:
            index = faiss.IndexFlatIP(dimension)
        else:
            index = faiss.IndexFlatL2(dimension)

        if self.use_gpu:
            try:
                res = faiss.StandardGpuResources()
                index = faiss.index_cpu_to_gpu(res, self.gpu_device, index)
            except Exception:
                pass

        return index

    def index_documents(self, documents: List[Document]) -> int:
        import numpy as np

        if not documents:
            return 0

        encoder = self._get_encoder()
        self._documents = list(documents)

        texts = [doc.content for doc in documents]
        embeddings = encoder.encode(
            texts,
            batch_size=self.batch_size,
            show_progress_bar=False,
            normalize_embeddings=self.normalize_embeddings,
        )

        embeddings = np.array(embeddings, dtype=np.float32)

        self._index = self._create_faiss_index(embeddings.shape[1])
        self._index.add(embeddings)

        self._document_count = len(self._documents)
        return self._document_count

    def clear_index(self) -> None:
        self._documents.clear()
        self._index = None
        self._document_count = 0

    def _search(self, query: str, top_k: int = 5) -> List[RetrievalResult]:
        import numpy as np

        if self._index is None or not self._documents:
            return []

        encoder = self._get_encoder()

        query_embedding = encoder.encode(
            [query],
            normalize_embeddings=self.normalize_embeddings,
        )
        query_embedding = np.array(query_embedding, dtype=np.float32)

        k = min(top_k, len(self._documents))
        scores, indices = self._index.search(query_embedding, k)

        results = []
        for i, (score, idx) in enumerate(zip(scores[0], indices[0])):
            if idx >= 0 and idx < len(self._documents):
                doc = self._documents[idx]
                highlights = self._generate_highlights(doc.content, query)
                results.append(
                    RetrievalResult(
                        document=doc,
                        score=float(score),
                        highlights=highlights,
                    )
                )

        return results

    def _generate_highlights(
        self, content: str, query: str, max_highlights: int = 3
    ) -> List[str]:
        sentences = re.split(r"[.!?]\s+", content)

        highlights = []
        for sentence in sentences[:max_highlights]:
            sentence = sentence.strip()
            if sentence:
                if len(sentence) > 150:
                    sentence = sentence[:150] + "..."
                highlights.append(sentence)

        return highlights

    def save_index(self, path: Union[str, Path]) -> None:
        import faiss
        import json

        path = Path(path)
        path.mkdir(parents=True, exist_ok=True)

        if self._index is not None:
            faiss.write_index(self._index, str(path / "index.faiss"))

        docs_data = [
            {"id": doc.id, "content": doc.content, "metadata": doc.metadata}
            for doc in self._documents
        ]
        with open(path / "documents.json", "w") as f:
            json.dump(docs_data, f)

        meta = {
            "model_name": self.model_name,
            "document_count": self._document_count,
            "embedding_dim": self._embedding_dim,
        }
        with open(path / "metadata.json", "w") as f:
            json.dump(meta, f)

    def load_index(self, path: Union[str, Path]) -> None:
        import faiss
        import json

        path = Path(path)

        self._index = faiss.read_index(str(path / "index.faiss"))

        with open(path / "documents.json") as f:
            docs_data = json.load(f)
        self._documents = [
            Document(id=d["id"], content=d["content"], metadata=d.get("metadata", {}))
            for d in docs_data
        ]

        with open(path / "metadata.json") as f:
            meta = json.load(f)
        self._document_count = meta["document_count"]
        self._embedding_dim = meta.get("embedding_dim")

    def _execute_impl(self, prompt: str, **params: Any) -> MCPToolResult:
        top_k = params.get("top_k", 5)
        include_scores = params.get("include_scores", True)
        include_metadata = params.get("include_metadata", False)

        if self._index is None:
            return MCPToolResult(
                content="No documents indexed. Call index_documents() first.",
                cost_usd=0.0,
                metadata={"tool": "retrieval:dense", "error": "no_index"},
            )

        results = self._search(prompt, top_k=top_k)

        content = self._format_results(
            results,
            include_scores=include_scores,
            include_metadata=include_metadata,
        )

        return MCPToolResult(
            content=content,
            cost_usd=0.0,
            metadata={
                "tool": "retrieval:dense",
                "query": prompt,
                "num_results": len(results),
                "top_k": top_k,
                "model": self.model_name,
                "indexed_documents": self._document_count,
            },
        )

HybridRetrievalServer

Bases: BaseRetrievalServer

Hybrid BM25 + dense retrieval with RRF fusion.

RRF formula: score(d) = sum over retrievers r of weight_r / (rrf_k + rank_r(d)), where rank_r(d) is d's 1-based rank in retriever r's results.

Latency: ~100ms per query
Cost: Zero (local inference)
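As a worked example with the defaults (bm25_weight=0.5, dense_weight=0.5, rrf_k=60): a document ranked 1st by BM25 and 3rd by dense scores 0.5/(60+1) + 0.5/(60+3) ≈ 0.0161, while a document returned only by BM25 at rank 2 scores 0.5/(60+2) ≈ 0.0081.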

Example

server = HybridRetrievalServer()
server.index_documents([
    Document(id="1", content="Machine learning automates data analysis."),
])
result = server.execute("ML data patterns", top_k=5)

Source code in intelligence-per-watt/src/ipw/agents/mcp/retrieval/hybrid_server.py
class HybridRetrievalServer(BaseRetrievalServer):
    """Hybrid BM25 + dense retrieval with RRF fusion.

    RRF Formula: score(d) = sum(1 / (k + rank(d))) for each retriever

    Latency: ~100ms per query
    Cost: Zero (local inference)

    Example:
        server = HybridRetrievalServer()
        server.index_documents([
            Document(id="1", content="Machine learning automates data analysis."),
        ])
        result = server.execute("ML data patterns", top_k=5)
    """

    def __init__(
        self,
        model_name: str = "all-MiniLM-L6-v2",
        telemetry_collector: Optional[Any] = None,
        event_recorder: Optional[Any] = None,
        bm25_weight: float = 0.5,
        dense_weight: float = 0.5,
        rrf_k: int = 60,
        use_gpu: bool = False,
        gpu_device: int = 0,
    ):
        super().__init__(
            name="retrieval:hybrid",
            telemetry_collector=telemetry_collector,
            event_recorder=event_recorder,
        )
        self.bm25_weight = bm25_weight
        self.dense_weight = dense_weight
        self.rrf_k = rrf_k

        self._bm25 = BM25RetrievalServer(
            telemetry_collector=telemetry_collector,
            event_recorder=event_recorder,
        )
        self._dense = DenseRetrievalServer(
            model_name=model_name,
            telemetry_collector=telemetry_collector,
            event_recorder=event_recorder,
            use_gpu=use_gpu,
            gpu_device=gpu_device,
        )
        self._documents: Dict[str, Document] = {}

    def index_documents(self, documents: List[Document]) -> int:
        self._documents = {doc.id: doc for doc in documents}

        bm25_count = self._bm25.index_documents(documents)
        dense_count = self._dense.index_documents(documents)

        self._document_count = min(bm25_count, dense_count)
        return self._document_count

    def clear_index(self) -> None:
        self._documents.clear()
        self._bm25.clear_index()
        self._dense.clear_index()
        self._document_count = 0

    def _rrf_fusion(
        self,
        bm25_results: List[RetrievalResult],
        dense_results: List[RetrievalResult],
        top_k: int,
    ) -> List[RetrievalResult]:
        bm25_ranks: Dict[str, int] = {
            r.document.id: i + 1 for i, r in enumerate(bm25_results)
        }
        dense_ranks: Dict[str, int] = {
            r.document.id: i + 1 for i, r in enumerate(dense_results)
        }

        all_doc_ids = set(bm25_ranks.keys()) | set(dense_ranks.keys())

        rrf_scores: Dict[str, float] = {}
        for doc_id in all_doc_ids:
            score = 0.0
            if doc_id in bm25_ranks:
                score += self.bm25_weight / (self.rrf_k + bm25_ranks[doc_id])
            if doc_id in dense_ranks:
                score += self.dense_weight / (self.rrf_k + dense_ranks[doc_id])
            rrf_scores[doc_id] = score

        sorted_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True)

        results = []
        for doc_id in sorted_ids[:top_k]:
            doc = self._documents.get(doc_id)
            if doc is None:
                continue

            highlights = []
            for r in bm25_results:
                if r.document.id == doc_id:
                    highlights.extend(r.highlights)
                    break
            for r in dense_results:
                if r.document.id == doc_id:
                    for h in r.highlights:
                        if h not in highlights:
                            highlights.append(h)
                    break

            results.append(
                RetrievalResult(
                    document=doc,
                    score=rrf_scores[doc_id],
                    highlights=highlights[:3],
                )
            )

        return results

    def _search(
        self,
        query: str,
        top_k: int = 5,
        bm25_candidates: int = 20,
        dense_candidates: int = 20,
    ) -> List[RetrievalResult]:
        bm25_results = self._bm25._search(query, top_k=bm25_candidates)
        dense_results = self._dense._search(query, top_k=dense_candidates)

        return self._rrf_fusion(bm25_results, dense_results, top_k)

    def save_index(self, path: Union[str, Path]) -> None:
        import json

        path = Path(path)
        path.mkdir(parents=True, exist_ok=True)

        self._dense.save_index(path / "dense")

        meta = {
            "bm25_weight": self.bm25_weight,
            "dense_weight": self.dense_weight,
            "rrf_k": self.rrf_k,
            "document_count": self._document_count,
        }
        with open(path / "hybrid_metadata.json", "w") as f:
            json.dump(meta, f)

    def load_index(self, path: Union[str, Path]) -> None:
        import json

        path = Path(path)

        self._dense.load_index(path / "dense")
        self._documents = {doc.id: doc for doc in self._dense._documents}
        self._bm25.index_documents(list(self._documents.values()))

        with open(path / "hybrid_metadata.json") as f:
            meta = json.load(f)
        self._document_count = meta["document_count"]

    def _execute_impl(self, prompt: str, **params: Any) -> MCPToolResult:
        top_k = params.get("top_k", 5)
        bm25_candidates = params.get("bm25_candidates", 20)
        dense_candidates = params.get("dense_candidates", 20)
        include_scores = params.get("include_scores", True)
        include_metadata = params.get("include_metadata", False)

        if self._document_count == 0:
            return MCPToolResult(
                content="No documents indexed. Call index_documents() first.",
                cost_usd=0.0,
                metadata={"tool": "retrieval:hybrid", "error": "no_index"},
            )

        results = self._search(
            prompt,
            top_k=top_k,
            bm25_candidates=bm25_candidates,
            dense_candidates=dense_candidates,
        )

        content = self._format_results(
            results,
            include_scores=include_scores,
            include_metadata=include_metadata,
        )

        return MCPToolResult(
            content=content,
            cost_usd=0.0,
            metadata={
                "tool": "retrieval:hybrid",
                "query": prompt,
                "num_results": len(results),
                "top_k": top_k,
                "bm25_weight": self.bm25_weight,
                "dense_weight": self.dense_weight,
                "indexed_documents": self._document_count,
            },
        )

IndexManager

Manages index persistence and caching.

Example

manager = IndexManager(cache_dir="./index_cache")

if not manager.is_valid("my_corpus", corpus_hash): server = BM25RetrievalServer() server.index_documents(documents) manager.save(server, "my_corpus", corpus_hash) else: server = manager.load("my_corpus", BM25RetrievalServer)
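Here corpus_hash is any stable fingerprint of the document set. One option is the manager's internal helper (shown in the source below), which hashes id/content pairs sorted by document id, so the hash is independent of document order:

corpus_hash = manager._compute_corpus_hash(documents)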

Source code in intelligence-per-watt/src/ipw/agents/mcp/retrieval/index_manager.py
class IndexManager:
    """Manages index persistence and caching.

    Example:
        manager = IndexManager(cache_dir="./index_cache")

        if not manager.is_valid("my_corpus", corpus_hash):
            server = BM25RetrievalServer()
            server.index_documents(documents)
            manager.save(server, "my_corpus", corpus_hash)
        else:
            server = manager.load("my_corpus", BM25RetrievalServer)
    """

    def __init__(
        self,
        cache_dir: Union[str, Path] = "./.retrieval_cache",
        auto_clean: bool = True,
        max_cache_entries: int = 50,
    ):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.auto_clean = auto_clean
        self.max_cache_entries = max_cache_entries

    def _get_index_path(self, name: str) -> Path:
        safe_name = "".join(c if c.isalnum() or c in "-_" else "_" for c in name)
        return self.cache_dir / safe_name

    def _compute_corpus_hash(self, documents: List[Document]) -> str:
        hasher = hashlib.sha256()
        for doc in sorted(documents, key=lambda d: d.id):
            hasher.update(doc.id.encode())
            hasher.update(doc.content.encode())
        return hasher.hexdigest()[:16]

    def is_valid(self, name: str, corpus_hash: Optional[str] = None) -> bool:
        index_path = self._get_index_path(name)
        metadata_path = index_path / "index_metadata.json"

        if not metadata_path.exists():
            return False

        if corpus_hash is None:
            return True

        try:
            with open(metadata_path) as f:
                metadata = IndexMetadata.from_dict(json.load(f))
            return metadata.corpus_hash == corpus_hash
        except Exception:
            return False

    def get_metadata(self, name: str) -> Optional[IndexMetadata]:
        index_path = self._get_index_path(name)
        metadata_path = index_path / "index_metadata.json"

        if not metadata_path.exists():
            return None

        try:
            with open(metadata_path) as f:
                return IndexMetadata.from_dict(json.load(f))
        except Exception:
            return None

    def save(
        self,
        server: BaseRetrievalServer,
        name: str,
        corpus_hash: str,
        extra_metadata: Optional[Dict[str, Any]] = None,
    ) -> Path:
        index_path = self._get_index_path(name)

        if index_path.exists():
            shutil.rmtree(index_path)
        index_path.mkdir(parents=True)

        if hasattr(server, "save_index"):
            server.save_index(index_path / "server_data")
        else:
            docs_data = []
            if hasattr(server, "_documents"):
                docs = server._documents
                if isinstance(docs, dict):
                    docs = list(docs.values())
                docs_data = [
                    {"id": d.id, "content": d.content, "metadata": d.metadata}
                    for d in docs
                ]
            with open(index_path / "documents.json", "w") as f:
                json.dump(docs_data, f)

        metadata = IndexMetadata(
            name=name,
            server_type=server.__class__.__name__,
            document_count=server.document_count,
            corpus_hash=corpus_hash,
            extra=extra_metadata or {},
        )
        with open(index_path / "index_metadata.json", "w") as f:
            json.dump(metadata.to_dict(), f)

        if self.auto_clean:
            self._cleanup_old_entries()

        return index_path

    def load(
        self,
        name: str,
        server_class: Type[BaseRetrievalServer],
        **server_kwargs: Any,
    ) -> Optional[BaseRetrievalServer]:
        index_path = self._get_index_path(name)
        metadata_path = index_path / "index_metadata.json"

        if not metadata_path.exists():
            return None

        try:
            server = server_class(**server_kwargs)

            if hasattr(server, "load_index") and (index_path / "server_data").exists():
                server.load_index(index_path / "server_data")
            elif (index_path / "documents.json").exists():
                with open(index_path / "documents.json") as f:
                    docs_data = json.load(f)
                documents = [
                    Document(
                        id=d["id"],
                        content=d["content"],
                        metadata=d.get("metadata", {}),
                    )
                    for d in docs_data
                ]
                server.index_documents(documents)

            return server
        except Exception as e:
            print(f"Warning: Failed to load index '{name}': {e}")
            return None

    def delete(self, name: str) -> bool:
        index_path = self._get_index_path(name)
        if index_path.exists():
            shutil.rmtree(index_path)
            return True
        return False

    def list_indices(self) -> List[IndexMetadata]:
        indices = []
        for path in self.cache_dir.iterdir():
            if path.is_dir():
                metadata_path = path / "index_metadata.json"
                if metadata_path.exists():
                    try:
                        with open(metadata_path) as f:
                            indices.append(IndexMetadata.from_dict(json.load(f)))
                    except Exception:
                        pass
        return indices

    def _cleanup_old_entries(self) -> int:
        indices = self.list_indices()
        if len(indices) <= self.max_cache_entries:
            return 0

        paths = []
        for idx in indices:
            path = self._get_index_path(idx.name)
            if path.exists():
                mtime = path.stat().st_mtime
                paths.append((mtime, idx.name))

        paths.sort()

        to_remove = len(paths) - self.max_cache_entries
        removed = 0
        for _, name in paths[:to_remove]:
            if self.delete(name):
                removed += 1

        return removed