Source code for denselinkage.core.ports

"""Ports — structural contracts (``typing.Protocol``)."""

from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any, Protocol, TypeVar, runtime_checkable

from denselinkage.core.models import (
    CandidatePair,
    MatchDecision,
    MatchError,
    Record,
    RecordId,
)

# Static-only imports: this branch is evaluated by type checkers, never at
# runtime, so numpy and the results module are not imported when this module
# is loaded.
if TYPE_CHECKING:
    import numpy as np
    import numpy.typing as npt

    from denselinkage.core.results import (
        ClusteringResult,
        LinkageResult,
        TrainingPairs,
    )

    # Dense-vector payload exchanged by the ports (``Embedder.encode`` output,
    # ``VectorIndex.build`` / ``SearchableIndex`` inputs): a float32 NumPy
    # array, as seen by the type checker.
    Vectors = npt.NDArray[np.float32]
else:
    # Runtime stand-in. This module has no ``from __future__ import
    # annotations``, so annotations mentioning ``Vectors`` are evaluated when
    # each ``def`` runs; the name must therefore exist at runtime, and ``Any``
    # provides it without importing numpy.
    Vectors = Any

# Type parameter of ``Trainer``: the kind of component a trainer returns from
# ``train`` (and may continue from via its ``base`` argument).
ComponentT = TypeVar("ComponentT")

# ``@runtime_checkable`` is retained ONLY so the structure-stage contract test
# can assert ``_is_runtime_protocol`` (tests/test_contract.py). No runtime
# ``isinstance`` dispatch against these ports exists or is intended;
# first-party adapters subclass their port explicitly and mypy
# completeness-checks them.


@runtime_checkable
class Serializer(Protocol):
    """Port that renders a single record as text.

    An implementation turns a string-keyed record mapping into one string.
    NOTE(review): that string is presumably what is later handed to
    ``Embedder.encode``; confirm against the linker wiring.
    """

    def serialize(self, record: Mapping[str, Any]) -> str:
        """Return the textual form of ``record``."""
        ...


@runtime_checkable
class Embedder(Protocol):
    """Port that maps text to dense vectors.

    :meth:`encode` does all of the work in v1. :attr:`model_id` and
    :attr:`embedding_dim` form a reserved provenance / validation surface
    (see their docstrings) that the v1 link path never reads. They stay on
    the port anyway: adding a port member after the freeze is breaking,
    whereas removing an unused one later is cheap, and that asymmetry
    favours keeping them (ADR-0003).
    """

    @property
    def model_id(self) -> str:
        """Stable identifier of the embedding model (e.g. name/checkpoint).

        Reserved for **provenance**: the Phase-B Reference Store records it so
        that a persisted index can refuse a query embedded by a different
        model. The v1 link path does not consume it.
        """
        ...

    @property
    def embedding_dim(self) -> int:
        """Output width of :meth:`encode`.

        Reserved for **eager dimension validation**. v1 instead detects width
        mismatches at search time via ``DimensionMismatch``, so nothing
        consumes this yet.
        """
        ...

    def encode(
        self,
        texts: Sequence[str],
        *,
        batch_size: int | None = None,
        show_progress: bool = False,
    ) -> Vectors:
        """Embed ``texts`` and return their dense vectors.

        The output width is :attr:`embedding_dim`. ``batch_size`` and
        ``show_progress`` are keyword-only knobs. NOTE(review): their exact
        semantics (e.g. what ``batch_size=None`` means) are left to the
        implementation; pin them down before the freeze.
        """
        ...


@runtime_checkable
class VectorIndex(Protocol):
    """Stateless, reusable spec for a vector-index backend.

    A spec stores no vectors, only the configuration describing *which*
    index to build. :meth:`build` is a factory (compare :class:`Trainer`,
    "a factory, not a fit"). Every call returns a fresh, populated
    :class:`SearchableIndex` and mutates neither ``self`` nor its arguments,
    so one spec can safely build many artifacts over different datasets.
    """

    def build(
        self, vectors: Vectors, ids: Sequence[RecordId]
    ) -> "SearchableIndex":
        """Index ``vectors`` under ``ids`` and return the fitted artifact.

        NOTE(review): ``ids`` presumably align positionally with the rows of
        ``vectors``, as documented for ``SearchableIndex.extended``. Confirm.
        """
        ...


@runtime_checkable
class SearchableIndex(Protocol):
    """Immutable fitted artifact produced by :meth:`VectorIndex.build`.

    It holds the indexed vectors of one dataset and answers
    nearest-neighbour queries. There is no ``add``: the vectors are fixed at
    build time. :meth:`search` raises ``DimensionMismatch`` if a query's
    width differs from the indexed vectors.

    v1 is batch-oriented and the artifact is immutable. Incremental update is
    out of scope for v1; :meth:`extended` is the designed (not yet
    implemented) escape hatch, and it returns a NEW artifact rather than
    mutating this one.
    """

    def search(
        self, queries: Vectors, *, top_k: int
    ) -> list[list[tuple[RecordId, float]]]:
        """Look up nearest neighbours of ``queries`` in the indexed vectors.

        Each hit is a ``(RecordId, float)`` pair. Raises ``DimensionMismatch``
        when a query's width differs from the indexed vectors.
        NOTE(review): the outer list presumably aligns with the rows of
        ``queries``, and ``top_k`` bounds the hits per query. Confirm both,
        plus the ordering and meaning of the float score, against the
        reference adapters.
        """
        ...

    def extended(
        self, vectors: Vectors, ids: Sequence[RecordId]
    ) -> "SearchableIndex":
        """Return a NEW artifact holding this index's vectors plus ``vectors``.

        This instance is left unchanged (the immutable-artifact guarantee).
        ``ids`` align positionally with ``vectors`` and must be disjoint from
        the ids already indexed. ``vectors`` must match the indexed width, or
        ``DimensionMismatch`` is raised.

        Incremental indexing is out of scope for v1, so the v1 reference
        artifacts raise ``NotImplementedError`` rather than returning
        ``None``. The signature is fixed pre-freeze so the capability can land
        later without a breaking change.
        """
        ...


@runtime_checkable
class Blocker(Protocol):
    """Stateless, reusable spec for candidate generation.

    :meth:`build` acts as a factory: it indexes ``records`` into a brand-new
    :class:`BlockingIndex` and mutates nothing. One ``Blocker`` (and
    therefore the immutable ``DenseLinker`` that holds it) can be reused
    across datasets.
    """

    def build(self, records: Sequence[Record]) -> "BlockingIndex":
        """Index ``records`` and return a fresh :class:`BlockingIndex`."""
        ...


@runtime_checkable
class BlockingIndex(Protocol):
    """Immutable fitted artifact produced by :meth:`Blocker.build`.

    It holds the indexed (left/reference) records and generates
    ``CandidatePair`` objects for a query record set.

    ``top_k`` and ``similarity_threshold`` are query-time parameters. They
    default to the values fixed on the originating ``Blocker`` spec but may
    be overridden per call, so a ``top_k`` / threshold sweep reuses one built
    index instead of rebuilding it. An override ``top_k <= 0`` raises
    ``InvalidTopK`` (parity with the index-time validation documented on
    :meth:`~denselinkage.linkage.DenseLinker.index`).
    """

    def query(
        self,
        records: Sequence[Record],
        *,
        top_k: int | None = None,
        similarity_threshold: float | None = None,
    ) -> list[CandidatePair]:
        """Generate candidate pairs between ``records`` and the indexed records.

        Passing ``None`` for ``top_k`` or ``similarity_threshold`` keeps the
        value fixed on the originating ``Blocker``. A non-``None`` value
        overrides it for this call only. An overriding ``top_k <= 0`` raises
        ``InvalidTopK``.
        """
        ...


@runtime_checkable
class Filter(Protocol):
    """Second comparison-space reduction, distinct from blocking.

    A filter prunes an already-generated candidate set before matching. It
    is pure over those pairs and carries no indexing state.
    ``SimilarityThresholdFilter`` is the dependency-free reference adapter;
    multi-pass and rule-based filters conform to this port too.
    """

    def filter(self, pairs: Sequence[CandidatePair]) -> list[CandidatePair]:
        """Return the candidate pairs from ``pairs`` that survive pruning."""
        ...


@runtime_checkable
class Matcher(Protocol):
    """Port that turns candidate pairs into per-pair match outcomes.

    :meth:`match` maps a batch of ``CandidatePair`` objects to
    ``MatchDecision`` results. Pairs it cannot decide are reported as
    ``MatchError`` values instead of being raised.
    """

    def match(
        self, pairs: Sequence[CandidatePair]
    ) -> list[MatchDecision | MatchError]:
        """Return one outcome per input pair, aligned by position.

        A pair the matcher cannot decide yields a ``MatchError``. Nothing
        raises into the batch, so one bad call does not abort the rest.
        """
        ...


@runtime_checkable
class Clusterer(Protocol):
    """Swappable strategy that groups linkage matches into entity clusters.

    Clustering is a pluggable strategy, not a fixed pipeline step.
    ``ConnectedComponentsClusterer`` (wrapping the ``connected_components``
    reference function) declares this port; alternative algorithms (e.g.
    agglomerative, incremental) conform here. Implementations are pure over
    an already-computed ``LinkageResult`` and carry no blocking/matching
    state.
    """

    def cluster(self, result: "LinkageResult") -> "ClusteringResult":
        """Group the matches held in ``result`` into entity clusters."""
        ...


@runtime_checkable
class Trainer(Protocol[ComponentT]):
    """Produces a frozen component from supervised data (v2, ``[train]``).

    :meth:`train` is a factory, not a fit: it returns a NEW component and
    mutates neither ``self`` nor ``base``. ``base`` is the optional
    checkpoint to continue from, which enables the recursive
    blocker -> mine -> retrain loop.

    The protocol is locked here in Phase A. Concrete trainers
    (``EmbedderTrainer -> Embedder``, ``CrossEncoderTrainer -> Matcher``)
    ship in v2 behind the ``[train]`` extra.
    """

    def train(
        self, pairs: "TrainingPairs", *, base: ComponentT | None = None
    ) -> ComponentT:
        """Train on ``pairs`` and return a NEW component.

        If ``base`` is given, training continues from that checkpoint, and
        ``base`` itself is left unmodified.
        """
        ...