Source code for dedupe.api

#!/usr/bin/python
"""
dedupe provides the main user interface for the library the
Dedupe class
"""
from __future__ import annotations

import itertools
import logging
import multiprocessing
import os
import pickle
import sqlite3
import tempfile
import warnings
from typing import TYPE_CHECKING, Literal, cast, overload

import numpy
import sklearn.linear_model
import sklearn.model_selection

import dedupe.blocking as blocking
import dedupe.clustering as clustering
import dedupe.core as core
import dedupe.datamodel as datamodel
import dedupe.labeler as labeler
import dedupe.predicates
import dedupe.serializer as serializer

if TYPE_CHECKING:
    from typing import BinaryIO, Collection, Generator, Iterable, MutableMapping, TextIO

    import numpy.typing

    from dedupe._typing import (
        ArrayLinks,
        Blocks,
        BlocksInt,
        BlocksStr,
        Classifier,
        Clusters,
        ClustersInt,
        ClustersStr,
        Data,
        DataInt,
        DataStr,
        JoinConstraint,
        LabelsLike,
        Links,
        LookupResultsInt,
        LookupResultsStr,
        PathLike,
        RecordDict,
    )
    from dedupe._typing import RecordDictPair as TrainingExample
    from dedupe._typing import RecordDictPairs as TrainingExamples
    from dedupe._typing import (
        RecordID,
        RecordPairs,
        Scores,
        TrainingData,
        TupleLinks,
        Variable,
    )

logger = logging.getLogger(__name__)


class Matching:
    """
    Common base for the record matching classes.

    Stores the shared configuration (worker count and in-memory flag) and
    declares the attributes that the matching methods read:
    ``data_model``, ``classifier`` and ``predicates``.
    """

    def __init__(
        self, num_cores: int | None, in_memory: bool = False, **kwargs
    ) -> None:
        """
        Args:
            num_cores: Number of cores to use. ``None`` means
                       ``multiprocessing.cpu_count()``.
            in_memory: Stored on the instance as ``in_memory``.
            **kwargs: Accepted but not used by this base class.
        """
        self.num_cores = (
            multiprocessing.cpu_count() if num_cores is None else num_cores
        )
        self.in_memory = in_memory

        # Declared for type checkers only; no value is assigned here.
        self.data_model: datamodel.DataModel
        self.classifier: Classifier
        self.predicates: Collection[dedupe.predicates.Predicate]

        # Remains None until a fingerprinter has been assigned.
        self._fingerprinter: blocking.Fingerprinter | None = None

    @property
    def fingerprinter(self) -> blocking.Fingerprinter:
        """
        The record fingerprinter.

        Raises:
            ValueError: if no fingerprinter has been assigned yet.
        """
        if self._fingerprinter is not None:
            return self._fingerprinter

        raise ValueError(
            "the record fingerprinter is not intialized, "
            "please run the train method"
        )


class IntegralMatching(Matching):
    """
    Base for the matching classes where we need to score all possible
    pairs before deciding on any matches
    """

    def score(self, pairs: RecordPairs) -> Scores:
        """
        Scores pairs of records. Returns pairs of tuples of records id and
        associated probabilities that the pair of records are match

        Args:
            pairs: Iterator of pairs of records

        Raises:
            RuntimeError: if scoring raises a ``RuntimeError``. It is
                          re-raised with advice about multiprocessing
                          start methods, and the original error is
                          attached as ``__cause__``.
        """
        try:
            return core.scoreDuplicates(
                pairs, self.data_model.distances, self.classifier, self.num_cores
            )
        except RuntimeError as err:
            # Chain explicitly so the underlying failure stays visible in
            # the traceback as the direct cause of this advice.
            raise RuntimeError(
                """
                You need to either turn off multiprocessing or protect
                the calls to the Dedupe methods with a
                `if __name__ == '__main__'` in your main module, see
                https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods"""
            ) from err


class DedupeMatching(IntegralMatching):
    """
    Class for Deduplication, extends Matching.

    Use DedupeMatching when you have a dataset that can contain
    multiple references to the same entity.

    """

    @overload
    def partition(
        self, data: DataInt, threshold: float = 0.5
    ) -> ClustersInt:  # pragma: no cover
        ...

    @overload
    def partition(
        self, data: DataStr, threshold: float = 0.5
    ) -> ClustersStr:  # pragma: no cover
        ...

    def partition(self, data, threshold=0.5):  # pragma: no cover
        """
        Identifies records that all refer to the same entity, returns
        tuples containing a sequence of record ids and corresponding
        sequence of confidence score as a float between 0 and 1. The
        record_ids within each set should refer to the same entity and the
        confidence score is a measure of our confidence a particular entity
        belongs in the cluster.

        For details on the confidence score, see :func:`dedupe.Dedupe.cluster`.

        This method should only be used for small to moderately sized
        datasets. For larger data, you may need to generate your
        own pairs of records and feed them to :func:`~score`.

        Records that end up in no cluster are returned as single-member
        clusters with a confidence of 1.0.

        Args:
            data: Dictionary of records, where the keys are record_ids
                  and the values are dictionaries with the keys being
                  field names

            threshold: Number between 0 and 1.  We
                       will only consider put together records into
                       clusters if the `cophenetic similarity
                       <https://en.wikipedia.org/wiki/Cophenetic>`_ of
                       the cluster is greater than the threshold.

                       Lowering the number will increase recall,
                       raising it will increase precision

        Examples:
            >>> duplicates = matcher.partition(data, threshold=0.5)
            >>> duplicates
            [
                ((1, 2, 3), (0.790, 0.860, 0.790)),
                ((4, 5), (0.720, 0.720)),
                ((10, 11), (0.899, 0.899)),
            ]
        """
        pairs = self.pairs(data)
        pair_scores = self.score(pairs)

        try:
            clusters = self.cluster(pair_scores, threshold)
            clusters = self._add_singletons(data.keys(), clusters)
            # The clusters are produced lazily from pair_scores, so they
            # must be fully evaluated before the scores are cleaned up.
            clusters_eval = list(clusters)
        finally:
            # Run the cleanup even when clustering fails.
            # NOTE(review): _cleanup_scores is defined outside this block;
            # presumably it releases whatever backs the score array.
            _cleanup_scores(pair_scores)

        return clusters_eval

    @overload
    @staticmethod
    def _add_singletons(
        all_ids: Iterable[int], clusters: ClustersInt
    ) -> ClustersInt: ...

    @overload
    @staticmethod
    def _add_singletons(
        all_ids: Iterable[str], clusters: ClustersStr
    ) -> ClustersStr: ...

    @staticmethod
    def _add_singletons(all_ids, clusters):
        singletons = set(all_ids)

        for record_ids, score in clusters:
            singletons.difference_update(record_ids)
            yield (record_ids, score)

        for singleton in singletons:
            yield (singleton,), (1.0,)

    def pairs(self, data: Data) -> RecordPairs:
        """
        Yield pairs of records that share common fingerprints.

        Each pair will occur at most once. If you override this
        method, you need to take care to ensure that this remains
        true, as downstream methods, particularly :func:`cluster`, assumes
        that every pair of records is compared no more than once.

        Args:
            data: Dictionary of records, where the keys are record_ids
                  and the values are dictionaries with the keys being
                  field names

        Examples:
            >>> pairs = matcher.pairs(data)
            >>> list(pairs)
            [
                (
                    (1, {"name": "Pat", "address": "123 Main"}),
                    (2, {"name": "Pat", "address": "123 Main"}),
                ),
                (
                    (1, {"name": "Pat", "address": "123 Main"}),
                    (3, {"name": "Sam", "address": "123 Main"}),
                ),
            ]
        """

        self.fingerprinter.index_all(data)

        id_type = core.sqlite_id_type(data)

        # Blocking and pair generation are typically the first memory
        # bottlenecks, so we'll use sqlite3 to avoid doing them in memory
        with tempfile.TemporaryDirectory() as temp_dir:
            if self.in_memory:
                con = sqlite3.connect(":memory:")
            else:
                con = sqlite3.connect(temp_dir + "/blocks.db")

            # This is a generator, so the consumer may stop iterating early
            # or an error may surface part-way through. Close the
            # connection in every case so the database file is not still
            # open when the temporary directory is removed.
            try:
                # The database is a throwaway, so journaling is turned off.
                con.execute("pragma journal_mode=off")
                con.execute(
                    f"CREATE TABLE blocking_map (block_key text, record_id {id_type})"
                )
                con.executemany(
                    "INSERT INTO blocking_map values (?, ?)",
                    self.fingerprinter(data.items()),
                )

                self.fingerprinter.reset_indices()

                con.execute(
                    """CREATE UNIQUE INDEX record_id_block_key_idx
                           ON blocking_map (record_id, block_key)"""
                )
                con.execute(
                    """CREATE INDEX block_key_idx
                           ON blocking_map (block_key)"""
                )
                con.execute("""ANALYZE""")

                # Self-join on block_key. The ordering condition keeps one
                # orientation of each pair and drops self-pairs.
                pairs = con.execute(
                    """SELECT DISTINCT a.record_id, b.record_id
                                   FROM blocking_map a
                                   INNER JOIN blocking_map b
                                   USING (block_key)
                                   WHERE a.record_id < b.record_id"""
                )

                for a_record_id, b_record_id in pairs:
                    yield (
                        (a_record_id, data[a_record_id]),
                        (b_record_id, data[b_record_id]),
                    )

                pairs.close()
            finally:
                con.close()

    def cluster(self, scores: Scores, threshold: float = 0.5) -> Clusters:
        r"""Group records that refer to the same entity, based on the
        similarity scores of pairs of records.

        Yields tuples containing a sequence of record ids and a
        corresponding sequence of confidence scores, each a float between
        0 and 1. The record_ids within each tuple should refer to the same
        entity, and each confidence score measures our confidence that the
        particular record belongs in the cluster.

        A confidence score measures how similar a record is to the other
        records in its cluster. Let :math:`\phi(i,j)` be the pair-wise
        similarity between records :math:`i` and :math:`j`. Let :math:`N` be the number of records in the cluster.

        .. math::

           \text{confidence score}_i = 1 - \sqrt {\frac{\sum_{j}^N (1 - \phi(i,j))^2}{N -1}}

        This measure is similar to the average squared distance
        between the focal record and the other records in the
        cluster. These scores can be `combined to give a total score
        for the cluster
        <https://en.wikipedia.org/wiki/Variance#Discrete_random_variable>`_.

        .. math::

           \text{cluster score} = 1 - \sqrt { \frac{\sum_i^N(1 - \mathrm{score}_i)^2 \cdot (N - 1) } { 2 N^2}}

        Args:
            scores: a numpy `structured array <https://docs.scipy.org/doc/numpy/user/basics.rec.html>`_ with a dtype of `[('pairs', id_type, 2),
                    ('score', 'f4')]` where id_type is either a str
                    or int, and score is a number between 0 and
                    1. The 'pairs' column contains pairs of ids of
                    the records compared and the 'score' column
                    should contain the similarity score for that
                    pair of records.

                    For each pair, the smaller id should be first.

            threshold: Number between 0 and 1. Records are only put
                       together into a cluster if the
                       `cophenetic similarity
                       <https://en.wikipedia.org/wiki/Cophenetic>`_ of
                       the cluster is greater than the threshold.

                       Lowering the number will increase recall,
                       raising it will increase precision

        Examples:
            >>> pairs = matcher.pairs(data)
            >>> scores = matcher.score(pairs)
            >>> clusters = matcher.cluster(scores)
            >>> list(clusters)
            [
                ((1, 2, 3), (0.790, 0.860, 0.790)),
                ((4, 5), (0.720, 0.720)),
                ((10, 11), (0.899, 0.899)),
            ]

        """

        logger.debug("matching done, begin clustering")

        clustered = clustering.cluster(scores, threshold)
        yield from clustered


class RecordLinkMatching(IntegralMatching):
    """
    Class for Record Linkage, extends Matching.

    Use RecordLinkMatching when you have two datasets that you want to merge
    """

    def pairs(self, data_1: Data, data_2: Data) -> RecordPairs:
        """
        Yield pairs of records that share common fingerprints.

        Each pair will occur at most once. If you override this
        method, you need to take care to ensure that this remains
        true, as downstream methods, particularly :func:`one_to_one`,
        and :func:`many_to_one` assumes that every pair of records is
        compared no more than once.

        Args:
            data_1: Dictionary of records from first dataset, where the
                    keys are record_ids and the values are dictionaries
                    with the keys being field names
            data_2: Dictionary of records from second dataset, same
                    form as data_1

        Examples:
            >>> pairs = matcher.pairs(data_1, data_2)
            >>> list(pairs)
            [
                (
                    (1, {"name": "Pat", "address": "123 Main"}),
                    (2, {"name": "Pat", "address": "123 Main"}),
                ),
                (
                    (1, {"name": "Pat", "address": "123 Main"}),
                    (3, {"name": "Sam", "address": "123 Main"}),
                ),
            ]
        """

        self.fingerprinter.index_all(data_2)

        id_type_a = core.sqlite_id_type(data_1)
        id_type_b = core.sqlite_id_type(data_2)

        # Blocking and pair generation are typically the first memory
        # bottlenecks, so we'll use sqlite3 to avoid doing them in memory
        with tempfile.TemporaryDirectory() as temp_dir:
            if self.in_memory:
                con = sqlite3.connect(":memory:")
            else:
                con = sqlite3.connect(temp_dir + "/blocks.db")

            # This is a generator, so the consumer may stop iterating early
            # or an error may surface part-way through. Close the
            # connection in every case so the database file is not still
            # open when the temporary directory is removed.
            try:
                # The database is a throwaway, so journaling is turned off.
                con.execute("pragma journal_mode=off")

                con.executescript(
                    f"""CREATE TABLE blocking_map_a
                                 (block_key text, record_id {id_type_a});

                                 CREATE TABLE blocking_map_b
                                 (block_key text, record_id {id_type_b});"""
                )

                con.executemany(
                    "INSERT INTO blocking_map_a values (?, ?)",
                    self.fingerprinter(data_1.items()),
                )

                con.executemany(
                    "INSERT INTO blocking_map_b values (?, ?)",
                    self.fingerprinter(data_2.items(), target=True),
                )

                self.fingerprinter.reset_indices()

                con.executescript(
                    """CREATE UNIQUE INDEX block_key_a_idx
                                 ON blocking_map_a (record_id, block_key);

                   CREATE UNIQUE INDEX block_key_b_idx
                                 ON blocking_map_b (block_key, record_id);"""
                )
                con.execute("""ANALYZE""")

                # Join the two block maps on block_key; DISTINCT collapses
                # pairs that share more than one block.
                pairs = con.execute(
                    """SELECT DISTINCT a.record_id, b.record_id
                                   FROM blocking_map_a a
                                   INNER JOIN blocking_map_b b
                                   USING (block_key)"""
                )

                for a_record_id, b_record_id in pairs:
                    yield (
                        (a_record_id, data_1[a_record_id]),
                        (b_record_id, data_2[b_record_id]),
                    )

                pairs.close()
            finally:
                con.close()

    def join(
        self,
        data_1: Data,
        data_2: Data,
        threshold: float = 0.5,
        constraint: JoinConstraint = "one-to-one",
    ) -> Links:
        """
        Identifies pairs of records that refer to the same entity.

        Returns pairs of record ids with a confidence score as a float
        between 0 and 1. The record_ids within the pair should refer to the
        same entity and the confidence score is the estimated probability that
        the records refer to the same entity.

        This method should only be used for small to moderately sized
        datasets. For larger data, you may need to generate your
        own pairs of records and feed them to the :func:`~score`.

        Args:
            data_1: Dictionary of records from first dataset, where the
                    keys are record_ids and the values are dictionaries
                    with the keys being field names

            data_2: Dictionary of records from second dataset, same form
                    as data_1

            threshold: Number between 0 and 1. We
                       will consider records as potential
                       duplicates if the predicted probability of
                       being a duplicate is above the threshold.

                       Lowering the number will increase recall, raising it
                       will increase precision

            constraint: What type of constraint to put on a join.

                        'one-to-one'
                              Every record in data_1 can match at most
                              one record from data_2 and every record
                              from data_2 can match at most one record
                              from data_1. This is good for when both
                              data_1 and data_2 are from different
                              sources and you are interested in
                              matching across the sources. If,
                              individually, data_1 or data_2 have many
                              duplicates you will not get good
                              matches.
                        'many-to-one'
                              Every record in data_1 can match at most
                              one record from data_2, but more than
                              one record from data_1 can match to the
                              same record in data_2. This is good for
                              when data_2 is a lookup table and data_1
                              is messy, such as geocoding or matching
                              against golden records.
                        'many-to-many'
                              Every record in data_1 can match
                              multiple records in data_2 and vice
                              versa. This is like a SQL inner join.

        Raises:
            AssertionError: if ``constraint`` is not one of the three
                            options above.

        Examples:
            >>> links = matcher.join(data_1, data_2, threshold=0.5)
            >>> list(links)
            [
                ((1, 2), 0.790),
                ((4, 5), 0.720),
                ((10, 11), 0.899)
            ]
        """

        if constraint not in {"one-to-one", "many-to-one", "many-to-many"}:
            # Raised explicitly rather than via ``assert`` so the check is
            # not stripped under ``python -O``, where an invalid value
            # would otherwise fall through to the many-to-many branch. The
            # exception type is kept for backward compatibility.
            raise AssertionError(
                "%s is an invalid constraint option. Valid options include "
                "one-to-one, many-to-one, or many-to-many" % constraint
            )

        pairs = self.pairs(data_1, data_2)
        pair_scores = self.score(pairs)

        try:
            links: Links
            if constraint == "one-to-one":
                links = self.one_to_one(pair_scores, threshold)
            elif constraint == "many-to-one":
                links = self.many_to_one(pair_scores, threshold)
            else:
                links = pair_scores[pair_scores["score"] > threshold]

            # The one-to-one and many-to-one links are generators over
            # pair_scores, so evaluate them before the scores are cleaned up.
            links_evaluated: Links = list(links)  # type: ignore[assignment]
        finally:
            # Run the cleanup even when link selection fails.
            # NOTE(review): _cleanup_scores is defined outside this block;
            # presumably it releases whatever backs the score array.
            _cleanup_scores(pair_scores)

        return links_evaluated

    def one_to_one(self, scores: Scores, threshold: float = 0.0) -> TupleLinks:
        """Decide which pairs refer to the same entity, allowing each
        record to appear in at most one match.

        Every record in data_1 can match at most one record from
        data_2 and every record from data_2 can match at most one
        record from data_1. See
        https://en.wikipedia.org/wiki/Injective_function.

        This method is good for when both data_1 and data_2 are from
        different sources and you are interested in matching across
        the sources. If, individually, data_1 or data_2 have many duplicates
        you will not get good matches.

        Yields pairs of record ids with a confidence score as a float
        between 0 and 1. The record_ids within the pair should refer to the
        same entity and the confidence score is the estimated probability that
        the records refer to the same entity.

        Args:
            scores: a numpy `structured array <https://docs.scipy.org/doc/numpy/user/basics.rec.html>`_ with a dtype of `[('pairs', id_type, 2),
                    ('score', 'f4')]` where id_type is either a str
                    or int, and score is a number between 0 and
                    1. The 'pairs' column contains pairs of ids of
                    the records compared and the 'score' column
                    should contain the similarity score for that
                    pair of records.

            threshold: Number between 0 and 1. Only pairs whose score is
                       above the threshold are considered. A falsy
                       threshold (the default, 0.0) skips the filtering
                       step entirely.

                       Lowering the number will increase recall, raising it
                       will increase precision


        Examples:
            >>> pairs = matcher.pairs(data_1, data_2)
            >>> scores = matcher.score(pairs)
            >>> links = matcher.one_to_one(scores, threshold=0.5)
            >>> list(links)
            [
                ((1, 2), 0.790),
                ((4, 5), 0.720),
                ((10, 11), 0.899)
            ]
        """
        candidates = (
            scores[scores["score"] > threshold] if threshold else scores
        )

        logger.debug("matching done, begin clustering")

        matched = clustering.greedyMatching(candidates)
        yield from matched

    def many_to_one(self, scores: Scores, threshold: float = 0.0) -> ArrayLinks:
        """
        Decide which pairs refer to the same entity, allowing several
        records from data_1 to match the same record in data_2.

        Every record in data_1 can match at most one record from
        data_2, but more than one record from data_1 can match to the same
        record in data_2. See
        https://en.wikipedia.org/wiki/Surjective_function

        This method is good for when data_2 is a lookup table and data_1
        is messy, such as geocoding or matching against golden records.

        Yields pairs of record ids with a confidence score as a float
        between 0 and 1. The record_ids within the pair should refer to the
        same entity and the confidence score is the estimated probability that
        the records refer to the same entity.

        Args:
            scores: a numpy `structured array <https://docs.scipy.org/doc/numpy/user/basics.rec.html>`_ with a dtype of `[('pairs', id_type, 2),
                    ('score', 'f4')]` where id_type is either a str
                    or int, and score is a number between 0 and
                    1. The 'pairs' column contains pairs of ids of
                    the records compared and the 'score' column
                    should contain the similarity score for that
                    pair of records.

            threshold: Number between 0 and 1. We
                       will consider records as potential
                       duplicates if the predicted probability of
                       being a duplicate is above the threshold.

                       Lowering the number will increase recall, raising it
                       will increase precision

        Examples:
            >>> pairs = matcher.pairs(data_1, data_2)
            >>> scores = matcher.score(pairs)
            >>> links = matcher.many_to_one(scores, threshold=0.5)
            >>> print(list(links))
            [
                ((1, 2), 0.790),
                ((4, 5), 0.720),
                ((7, 2), 0.623),
                ((10, 11), 0.899)
             ]
        """

        logger.debug("matching done, begin clustering")

        # Keep a single best match (n = 1) per record.
        best_links = clustering.pair_gazette_matching(scores, threshold, 1)
        yield from best_links


class GazetteerMatching(Matching):
    def __init__(
        self, num_cores: int | None, in_memory: bool = False, **kwargs
    ) -> None:
        """
        Set up the database location and the store of indexed records.

        With ``in_memory`` set, ``db`` is the sqlite ``":memory:"`` name.
        Otherwise a temporary directory is created and ``db`` points at a
        ``blocks.db`` file inside it.
        """
        super().__init__(num_cores, in_memory, **kwargs)

        # Records added through index(), keyed by record id.
        self.indexed_data: (
            MutableMapping[int, RecordDict] | MutableMapping[str, RecordDict]
        )
        self.indexed_data = {}  # type: ignore[assignment]

        self.db: PathLike
        if not self.in_memory:
            # Kept on the instance so _close() can clean it up later.
            self.temp_dir = tempfile.TemporaryDirectory()
            self.db = f"{self.temp_dir.name}/blocks.db"
        else:
            self.db = ":memory:"

    def _close(self) -> None:
        if not self.in_memory:
            self.temp_dir.cleanup()

    def __del__(self) -> None:
        # Finalizer: hand off to _close(), which cleans up the temporary
        # directory when the instance is not in-memory.
        self._close()

    @overload
    def index(self, data: DataInt) -> None: ...

    @overload
    def index(self, data: DataStr) -> None: ...

    def index(self, data):  # pragma: no cover
        """
        Add records to the index of records to match against. If a record in
        `data` has the same key as a previously indexed record, the
        old record will be replaced.

        Args:
            data: a dictionary of records where the keys
                  are record_ids and the values are
                  dictionaries with the keys being
                  field_names
        """

        self.fingerprinter.index_all(data)

        id_type = core.sqlite_id_type(data)

        # NOTE(review): when self.db is ":memory:", every sqlite3.connect
        # call opens its own private, empty database, so rows written here
        # are presumably not visible to later connections. Confirm that
        # in-memory use is covered elsewhere.
        con = sqlite3.connect(self.db)

        # Close the connection even if one of the statements fails.
        try:
            # Set journal mode to WAL.
            con.execute("pragma journal_mode=wal")

            con.execute(
                f"""CREATE TABLE IF NOT EXISTS indexed_records
                       (block_key text,
                        record_id {id_type},
                        UNIQUE(block_key, record_id))"""
            )

            # REPLACE relies on the UNIQUE constraint above: re-inserting
            # an existing (block_key, record_id) row does not duplicate it.
            con.executemany(
                "REPLACE INTO indexed_records VALUES (?, ?)",
                self.fingerprinter(data.items(), target=True),
            )

            con.execute(
                """CREATE UNIQUE INDEX IF NOT EXISTS
                       indexed_records_block_key_idx
                       ON indexed_records
                       (block_key, record_id)"""
            )
            con.execute("""ANALYZE""")

            con.commit()
        finally:
            con.close()

        self.indexed_data.update(data)

    @overload
    def unindex(self, data: DataInt) -> None:  # pragma: no cover
        ...

    @overload
    def unindex(self, data: DataStr) -> None:  # pragma: no cover
        ...

    def unindex(self, data):  # pragma: no cover
        """
        Remove records from the index of records to match against.

        Args:
            data: a dictionary of records where the keys
                  are record_ids and the values are
                  dictionaries with the keys being
                  field_names

        Raises:
            KeyError: if a record id in ``data`` is not present in
                      ``indexed_data``.
        """

        # Drop the records' field values from each index the
        # fingerprinter maintains.
        for field in self.fingerprinter.index_fields:
            self.fingerprinter.unindex(
                {record[field] for record in data.values()}, field
            )

        con = sqlite3.connect(self.db)

        # Close the connection even if the delete fails.
        try:
            con.executemany(
                """DELETE FROM indexed_records
                           WHERE record_id = ?""",
                ((k,) for k in data),
            )

            con.commit()
        finally:
            con.close()

        for k in data:
            del self.indexed_data[k]

    @overload
    def blocks(self, data: DataInt) -> BlocksInt: ...

    @overload
    def blocks(self, data: DataStr) -> BlocksStr: ...

    def blocks(self, data):
        """
        Yield groups of pairs of records that share fingerprints.

        Each group contains one record from data paired with the records
        from the indexed records that it shares a fingerprint with.

        Each pair within and among blocks will occur at most once. If
        you override this method, you need to take care to ensure that
        this remains true, as downstream methods, particularly
        :func:`many_to_n`, assumes that every pair of records is compared no
        more than once.

        Args:
            data: Dictionary of records, where the keys are record_ids
                  and the values are dictionaries with the keys being
                  field names

        Examples:
            >>> blocks = matcher.blocks(data)
            >>> print(list(blocks))
            [
                [
                    (
                        (1, {"name": "Pat", "address": "123 Main"}),
                        (8, {"name": "Pat", "address": "123 Main"}),
                    ),
                    (
                        (1, {"name": "Pat", "address": "123 Main"}),
                        (9, {"name": "Sam", "address": "123 Main"}),
                    ),
                ],
                [
                    (
                        (2, {"name": "Sam", "address": "2600 State"}),
                        (5, {"name": "Pam", "address": "2600 Stat"}),
                    ),
                    (
                        (2, {"name": "Sam", "address": "123 State"}),
                        (7, {"name": "Sammy", "address": "123 Main"}),
                    ),
                ],
            ]
        """

        id_type = core.sqlite_id_type(data)

        con = sqlite3.connect(self.db, check_same_thread=False)

        # This is a generator, so the consumer may stop iterating early or
        # an error may surface part-way through. Closing the connection in
        # every case releases it together with the open transaction.
        try:
            con.execute("BEGIN")

            con.execute(
                f"CREATE TEMPORARY TABLE blocking_map (block_key text, record_id {id_type})"
            )
            con.executemany(
                "INSERT INTO blocking_map VALUES (?, ?)",
                self.fingerprinter(data.items()),
            )

            # Ordered by a.record_id because groupby below only merges
            # consecutive rows that share a key.
            pairs = con.execute(
                """SELECT DISTINCT a.record_id, b.record_id
                               FROM blocking_map a
                               INNER JOIN indexed_records b
                               USING (block_key)
                               ORDER BY a.record_id"""
            )

            pair_blocks: (
                Iterable[tuple[int, Iterable[tuple[int, int]]]]
                | Iterable[tuple[str, Iterable[tuple[str, str]]]]
            )

            pair_blocks = itertools.groupby(pairs, lambda x: x[0])

            for _, pair_block in pair_blocks:
                yield [
                    (
                        (a_record_id, data[a_record_id]),
                        (b_record_id, self.indexed_data[b_record_id]),
                    )
                    for a_record_id, b_record_id in pair_block
                ]

            pairs.close()
            # Nothing written inside the transaction is meant to persist.
            con.execute("ROLLBACK")
        finally:
            con.close()

    def score(self, blocks: Blocks) -> Generator[Scores]:
        """
        Score groups of record pairs.

        Yields structured numpy arrays representing the pairs of records
        in each group and the associated probability that the pair is a
        match.

        Args:
            blocks: Iterator of blocks of records
        """

        return core.scoreGazette(
            blocks, self.data_model.distances, self.classifier, self.num_cores
        )

    def many_to_n(
        self,
        score_blocks: Iterable[Scores],
        threshold: float = 0.0,
        n_matches: int = 1,
    ) -> ArrayLinks:
        """
        Yield the N highest scoring pairs from each group of scored pairs.

        Args:
            score_blocks: Iterator of numpy `structured arrays <https://docs.scipy.org/doc/numpy/user/basics.rec.html>`_,
                          each with a dtype of `[('pairs', id_type, 2),
                          ('score', 'f4')]` where id_type is either a str
                          or int, and score is a number between 0 and
                          1. The 'pairs' column contains pairs of ids of
                          the records compared and the 'score' column
                          should contain the similarity score for that
                          pair of records.

            threshold: Number between 0 and 1. We
                       will consider records as potential
                       duplicates if the predicted probability of
                       being a duplicate is above the threshold.

                       Lowering the number will increase recall, raising it
                       will increase precision

            n_matches: How many top scoring pairs to select per group
        """

        top_links = clustering.gazetteMatching(score_blocks, threshold, n_matches)
        yield from top_links

    @overload
    def search(
        self,
        data: DataInt,
        threshold: float = 0.0,
        n_matches: int = 1,
        generator: bool = False,
    ) -> LookupResultsInt:  # pragma: no cover
        ...

    @overload
    def search(
        self,
        data: DataStr,
        threshold: float = 0.0,
        n_matches: int = 1,
        generator: bool = False,
    ) -> LookupResultsStr:  # pragma: no cover
        ...

    def search(
        self,
        data,
        threshold=0.0,
        n_matches=1,
        generator=False,
    ):  # pragma: no cover
        """
        Find, for each messy record, the canonical records it may match.

        The records are blocked, the candidate pairs are scored, the top
        pairs per messy record are selected and the result is formatted
        as one entry per messy record. Each entry pairs the messy
        record_id with a tuple of ``(canonical record_id, score)``
        matches, where the score is the estimated probability that the
        two records refer to the same entity.

        Args:
            data: a dictionary of records from a messy
                  dataset, where the keys are record_ids and
                  the values are dictionaries with the keys
                  being field names.

            threshold: a number between 0 and 1. We will consider
                       records as potential duplicates if the predicted
                       probability of being a duplicate is
                       above the threshold.

                       Lowering the number will increase
                       recall, raising it will increase
                       precision
            n_matches: the maximum number of possible matches from
                       canonical_data to return for each record in
                       data. If set to `None` all possible
                       matches above the threshold will be
                       returned.
            generator: when `True`, search returns a lazy sequence of
                       possible matches, instead of a list.

        Examples:
            >>> matches = gazetteer.search(data, threshold=0.5, n_matches=2)
            >>> print(matches)
            [
                (((1, 6), 0.72), ((1, 8), 0.6)),
                (((2, 7), 0.72),),
                (((3, 6), 0.72), ((3, 8), 0.65)),
                (((4, 6), 0.96), ((4, 5), 0.63)),
            ]
        """
        scored_blocks = self.score(self.blocks(data))
        top_matches = self.many_to_n(scored_blocks, threshold, n_matches)
        formatted = self._format_search_results(data, top_matches)

        # Callers that did not ask for a generator get a fully built list.
        return formatted if generator else list(formatted)

    @overload
    def _format_search_results(
        self, search_d: DataInt, results: ArrayLinks
    ) -> LookupResultsInt: ...

    @overload
    def _format_search_results(
        self, search_d: DataStr, results: ArrayLinks
    ) -> LookupResultsStr: ...

    def _format_search_results(self, search_d, results):
        seen: set[RecordID] = set()

        for result in results:
            a: RecordID | None = None
            b: RecordID
            score: float
            prepared_result: list[tuple[RecordID, float]] = []
            for (a, b), score in result:
                prepared_result.append((b, score))

            assert a is not None

            yield a, tuple(prepared_result)
            seen.add(a)

        for k in search_d.keys() - seen:
            yield k, ()


class StaticMatching(Matching):
    """
    Class for initializing a dedupe object from a settings file.
    """

    def __init__(
        self,
        settings_file: BinaryIO,
        num_cores: int | None = None,
        in_memory: bool = False,
        **kwargs,
    ) -> None:  # pragma: no cover
        """
        Load the data model, classifier and predicates from `settings_file`
        and build a fingerprinter from the loaded predicates.

        Args:
            settings_file: A file object containing settings
                           info produced from the
                           :func:`~dedupe.api.ActiveMatching.write_settings` method.

            num_cores: The number of cpus to use for parallel
                       processing, defaults to the number of cpus
                       available on the machine. If set to 0, then
                       multiprocessing will be disabled.

            in_memory: If True, :meth:`dedupe.Dedupe.pairs` will generate
                       pairs in RAM with the sqlite3 ':memory:' option
                       rather than writing to disk. May be faster if
                       sufficient memory is available.

        Raises:
            SettingsFileLoadingException: if the three pickled objects
                cannot be read from `settings_file`. The underlying
                error is attached as ``__cause__``.

        .. warning::

            If using multiprocessing on Windows or Mac OS X, then
            you must protect calls to the Dedupe methods with a
            `if __name__ == '__main__'` in your main module, see
            https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods
        """
        super().__init__(num_cores, in_memory, **kwargs)

        # NOTE(security): unpickling can execute arbitrary code, so
        # settings files must only come from a trusted source.
        try:
            # The load order mirrors the dump order in write_settings().
            self.data_model = pickle.load(settings_file)
            self.classifier = pickle.load(settings_file)
            self.predicates = pickle.load(settings_file)
        except (KeyError, AttributeError) as exc:
            raise SettingsFileLoadingException(
                "This settings file is not compatible with "
                "the current version of dedupe. This can happen "
                "if you have recently upgraded dedupe."
            ) from exc
        except ModuleNotFoundError as exc:
            if "No module named 'rlr'" in str(exc):
                raise SettingsFileLoadingException(
                    "This settings file was created with a previous "
                    "version of dedupe that used the 'rlr' library. "
                    "To continue to use this settings file, you need "
                    "install that library: `pip install rlr`"
                ) from exc
            else:
                raise SettingsFileLoadingException(
                    "Something has gone wrong with loading the settings file. "
                    "Try deleting the file"
                ) from exc
        except Exception as exc:
            # Deliberately not a bare ``except``: KeyboardInterrupt and
            # SystemExit must propagate instead of being reported as a
            # broken settings file.
            raise SettingsFileLoadingException(
                "Something has gone wrong with loading the settings file. "
                "Try deleting the file"
            ) from exc

        logger.info("Predicate set:")
        for predicate in self.predicates:
            logger.info(predicate)

        self._fingerprinter = blocking.Fingerprinter(self.predicates)


class ActiveMatching(Matching):
    """
    Class for training a matcher.
    """

    # Learner that proposes pairs to label; ``None`` until a subclass's
    # ``prepare_training`` creates one.
    active_learner: labeler.DisagreementLearner | None
    # Labeled record pairs collected so far, under "match" and "distinct".
    training_pairs: TrainingData

    def __init__(
        self,
        variable_definition: Collection[Variable],
        num_cores: int | None = None,
        in_memory: bool = False,
        **kwargs,
    ) -> None:
        """
        Args:
            variable_definition: A list of Variable objects describing
                                 the variables that will be used for
                                 training a model. See :ref:`variable_definitions`

            num_cores: The number of cpus to use for parallel
                       processing. If set to `None`, uses all cpus
                       available on the machine. If set to 0, then
                       multiprocessing will be disabled.

            in_memory: If True, :meth:`dedupe.Dedupe.pairs` will generate
                       pairs in RAM with the sqlite3 ':memory:' option
                       rather than writing to disk. May be faster if
                       sufficient memory is available.

        .. warning::

            If using multiprocessing on Windows or Mac OS X, then
            you must protect calls to the Dedupe methods with a
            `if __name__ == '__main__'` in your main module, see
            https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods

        """
        super().__init__(num_cores, in_memory, **kwargs)

        self.data_model = datamodel.DataModel(variable_definition)
        self.training_pairs = {"distinct": [], "match": []}
        # Logistic regression whose regularisation strength C is picked
        # by grid search on F1. Note that n_jobs is fixed here and does
        # not follow `num_cores`.
        self.classifier = sklearn.model_selection.GridSearchCV(
            estimator=sklearn.linear_model.LogisticRegression(),
            param_grid={"C": [0.00001, 0.0001, 0.001, 0.01, 0.1, 1, 10]},
            scoring="f1",
            n_jobs=-1,
        )
        self.active_learner = None

    def cleanup_training(self) -> None:  # pragma: no cover
        """
        Clean up data we used for training. Free up memory.

        Both ``training_pairs`` and ``active_learner`` are deleted from
        the instance, so they no longer exist as attributes afterwards.
        """
        del self.training_pairs
        del self.active_learner

    def _read_training(self, training_file: TextIO) -> None:
        """
        Read training from previously built training data file object

        The pairs are read with :func:`dedupe.serializer.read_training`
        and passed on to :meth:`mark_pairs`.

        Args:
            training_file: file object containing the training data
        """

        logger.info("reading training from file")
        training_pairs = serializer.read_training(training_file)
        self.mark_pairs(training_pairs)

    def train(
        self, recall: float = 1.00, index_predicates: bool = True
    ) -> None:  # pragma: no cover
        """
        Learn final pairwise classifier and fingerprinting rules. Requires that
        adequate training data has already been provided.

        Args:
            recall: The proportion of true dupe pairs in our
                    training data that the learned fingerprinting
                    rules must cover. If we lower the recall, there will
                    be pairs of true dupes that we will never
                    directly compare.

                    recall should be a float between 0.0 and 1.0.

            index_predicates: Should dedupe consider predicates
                              that rely upon indexing the
                              data. Index predicates can be slower
                              and take substantial memory. Without
                              index predicates, you may get lower
                              recall when true-dupes are not blocked
                              together.
        """
        assert (
            self.active_learner is not None
        ), "Please initialize with the prepare_training method"

        # Fit the classifier on the distances of all labeled pairs.
        examples, y = flatten_training(self.training_pairs)
        self.classifier.fit(self.data_model.distances(examples), y)

        # Learn the blocking predicates and rebuild the fingerprinter
        # around them.
        self.predicates = self.active_learner.learn_predicates(recall, index_predicates)
        self._fingerprinter = blocking.Fingerprinter(self.predicates)
        self.fingerprinter.reset_indices()

    def write_training(self, file_obj: TextIO) -> None:  # pragma: no cover
        """
        Write a JSON file that contains labeled examples

        Args:
            file_obj: file object to write training data to

        Examples:
            >>> with open('training.json', 'w') as f:
            >>>     matcher.write_training(f)
        """
        serializer.write_training(self.training_pairs, file_obj)

    def write_settings(self, file_obj: BinaryIO) -> None:  # pragma: no cover
        """
        Write a settings file containing the
        data model, classifier and predicates to a file object

        Args:
            file_obj: file object to write settings data into

        Examples:
            >>> with open('learned_settings', 'wb') as f:
            >>>     matcher.write_settings(f)
        """

        # Three consecutive pickles; a reader has to load them back in
        # this same order.
        pickle.dump(self.data_model, file_obj)
        pickle.dump(self.classifier, file_obj)
        pickle.dump(self.predicates, file_obj)

    def uncertain_pairs(self) -> TrainingExamples:
        """
        Returns a list of pairs of records from the sample of record pairs
        tuples that Dedupe is most curious to have labeled.

        The list holds the single pair popped from the active learner.

        This method is mainly useful for building a user interface for training
        a matching model.

        Examples:
            >>> pair = matcher.uncertain_pairs()
            >>> print(pair)
            [({'name' : 'Georgie Porgie'}, {'name' : 'Georgette Porgette'})]
        """
        assert (
            self.active_learner is not None
        ), "Please initialize with the prepare_training method"
        return [self.active_learner.pop()]

    def mark_pairs(self, labeled_pairs: TrainingData) -> None:
        """
        Add user-labeled pairs of records to training data and update the
        matching model

        This method is useful for building a user interface for training a
        matching model or for adding training data from an existing source.

        Args:
            labeled_pairs: A dictionary with two keys, `match` and `distinct`
                           the values are lists that can contain pairs of
                           records

        Raises:
            ValueError: if `labeled_pairs` does not have the expected
                shape (see :meth:`_checkTrainingPairs`).
            UserWarning: raised as an exception when the active learner
                reports a record it has no index for.

        Examples:
            >>> labeled_examples = {
            >>>     "match": [],
            >>>     "distinct": [
            >>>         (
            >>>             {"name": "Georgie Porgie"},
            >>>             {"name": "Georgette Porgette"},
            >>>         )
            >>>     ],
            >>> }
            >>> matcher.mark_pairs(labeled_examples)

        .. note::
           `mark_pairs()` is primarily designed to be used with
           :func:`~uncertain_pairs` to incrementally build a training
           set.

           If you have existing training data, you should likely
           format the data into the right form and supply the training
           data to the :func:`~prepare_training` method with the
           `training_file` argument.

           If that is not possible or desirable, you can use
           `mark_pairs()` to train a linker with existing data.
           However, you must ensure that every record that
           appears in the `labeled_pairs` argument appears in either
           the data or training file supplied to the
           :func:`~prepare_training` method.
        """
        self._checkTrainingPairs(labeled_pairs)

        self.training_pairs["match"].extend(labeled_pairs["match"])
        self.training_pairs["distinct"].extend(labeled_pairs["distinct"])

        # Without a learner the pairs are only recorded.
        if self.active_learner:
            examples, y = flatten_training(labeled_pairs)

            try:
                self.active_learner.mark(examples, y)
            except dedupe.predicates.NoIndexError as e:
                raise UserWarning(
                    "The record\n"
                    f"{e.failing_record}\n"
                    "is not known to to the active learner. "
                    "Make sure all `labeled_pairs` "
                    "are in the data or training file "
                    "of the `prepare_training()` method"
                ) from e

    def _checkTrainingPairs(self, labeled_pairs: TrainingData) -> None:
        """
        Validate the overall shape of `labeled_pairs`.

        Only the first pair under each label is inspected. A warning is
        issued when both lists are empty.

        Raises:
            ValueError: if `labeled_pairs` is not a mapping with the keys
                "match" and "distinct", or if an inspected pair is
                malformed.
        """
        try:
            labeled_pairs.items()
            labeled_pairs["match"]
            labeled_pairs["distinct"]
        except (AttributeError, KeyError) as exc:
            raise ValueError(
                "labeled_pairs must be a dictionary with keys " '"distinct" and "match"'
            ) from exc

        if labeled_pairs["match"]:
            pair = labeled_pairs["match"][0]
            self._checkRecordPair(pair)

        if labeled_pairs["distinct"]:
            pair = labeled_pairs["distinct"][0]
            self._checkRecordPair(pair)

        if not labeled_pairs["distinct"] and not labeled_pairs["match"]:
            warnings.warn("Didn't return any labeled record pairs")

    def _checkRecordPair(self, record_pair: TrainingExample) -> None:
        """
        Validate that `record_pair` consists of exactly two dict-like
        records and check each of them against the data model.

        Raises:
            ValueError: if `record_pair` does not unpack into two
                elements, or if either element has no ``keys`` method.
        """
        try:
            first, second = record_pair
        except ValueError as exc:
            raise ValueError(
                "The elements of data_sample must be pairs " "of record_pairs"
            ) from exc
        try:
            # Check both records independently. Chaining the two calls
            # with ``and`` would skip the second one whenever the first
            # record is empty.
            first.keys()
            second.keys()
        except AttributeError as exc:
            raise ValueError(
                "A pair of record_pairs must be made up of two " "dictionaries "
            ) from exc

        self.data_model.check(first)
        self.data_model.check(second)


class StaticDedupe(StaticMatching, DedupeMatching):
    """
    Class for deduplication using saved settings.

    If you have already trained a :class:`Dedupe` object and saved the
    settings, you can load the saved settings with StaticDedupe.
    """
class Dedupe(ActiveMatching, DedupeMatching):
    """
    Class for active learning deduplication.

    Use deduplication when you have data that can contain multiple
    records that can all refer to the same entity.
    """

    def prepare_training(
        self,
        data: Data,
        training_file: TextIO | None = None,
        sample_size: int = 1500,
        blocked_proportion: float = 0.9,
    ) -> None:
        """
        Initialize the active learner with your data and, optionally,
        existing training data.

        Sets up the learner.

        Args:
            data: Dictionary of records, where the keys are
                  record_ids and the values are dictionaries
                  with the keys being field names
            training_file: file object containing training data
            sample_size: Size of the sample to draw. Accepted as part
                         of the signature; this method's body does not
                         reference it.
            blocked_proportion: The proportion of record pairs to be
                                sampled from similar records, as
                                opposed to randomly selected pairs.
                                Accepted as part of the signature; this
                                method's body does not reference it.

        Examples:
            >>> matcher.prepare_training(data_d, sample_size=150000, blocked_proportion=.5)

            >>> with open('training_file.json') as f:
            >>>     matcher.prepare_training(data_d, training_file=f)
        """
        self._checkData(data)

        # Start without a learner, so that pairs read from the training
        # file are only recorded and are marked once, on the new learner.
        self.active_learner = None

        if training_file:
            self._read_training(training_file)

        # The learner has to know every record that appears in the
        # existing training data, hence index_include.
        labeled_examples, labels = flatten_training(self.training_pairs)

        self.active_learner = labeler.DedupeDisagreementLearner(
            self.data_model.predicates,
            self.data_model.distances,
            data,
            index_include=labeled_examples,
        )

        self.active_learner.mark(labeled_examples, labels)

    def _checkData(self, data: Data) -> None:
        """
        Reject an empty `data` and check one record against the data model.

        Raises:
            ValueError: if `data` contains no records.
        """
        if not len(data):
            raise ValueError("Dictionary of records is empty.")

        # Only the first record is validated.
        first_record = next(iter(data.values()))
        self.data_model.check(first_record)
class Link(ActiveMatching):
    """
    Mixin Class for Active Learning Record Linkage
    """

    def prepare_training(
        self,
        data_1: Data,
        data_2: Data,
        training_file: TextIO | None = None,
        sample_size: int = 1500,
        blocked_proportion: float = 0.9,
    ) -> None:
        """
        Initialize the active learner with your data and, optionally,
        existing training data.

        Args:
            data_1: Dictionary of records from first dataset, where the
                    keys are record_ids and the values are dictionaries
                    with the keys being field names
            data_2: Dictionary of records from second dataset, same
                    form as data_1
            training_file: file object containing training data
            sample_size: The size of the sample to draw. Accepted as
                         part of the signature; this method's body does
                         not reference it.
            blocked_proportion: The proportion of record pairs to be
                                sampled from similar records, as
                                opposed to randomly selected pairs.
                                Accepted as part of the signature; this
                                method's body does not reference it.

        Examples:
            >>> matcher.prepare_training(data_1, data_2, sample_size=150000)

            or

            >>> with open('training_file.json') as f:
            >>>     matcher.prepare_training(data_1, data_2, training_file=f)
        """
        self._checkData(data_1, data_2)

        # Start without a learner, so that pairs read from the training
        # file are only recorded and are marked once, on the new learner.
        self.active_learner = None

        if training_file:
            self._read_training(training_file)

        # The learner has to know every record that appears in the
        # existing training data, hence index_include.
        labeled_examples, labels = flatten_training(self.training_pairs)

        self.active_learner = labeler.RecordLinkDisagreementLearner(
            self.data_model.predicates,
            self.data_model.distances,
            data_1,
            data_2,
            index_include=labeled_examples,
        )

        self.active_learner.mark(labeled_examples, labels)

    def _checkData(self, data_1: Data, data_2: Data) -> None:
        """
        Reject empty datasets and check one record of each against the
        data model.

        Raises:
            ValueError: if either dataset contains no records; the
                first dataset is tested first.
        """
        if len(data_1) == 0:
            raise ValueError("Dictionary of records from first dataset is empty.")
        if len(data_2) == 0:
            raise ValueError("Dictionary of records from second dataset is empty.")

        # Only the first record of each dataset is validated.
        for dataset in (data_1, data_2):
            self.data_model.check(next(iter(dataset.values())))
class Gazetteer(Link, GazetteerMatching):
    """
    Class for active learning gazetteer matching.

    Gazetteer matching is for matching a messy data set against a
    'canonical dataset'. This class is useful for such tasks as matching
    messy addresses against a clean list.
    """
class StaticGazetteer(StaticMatching, GazetteerMatching):
    """
    Class for gazetteer matching using saved settings.

    If you have already trained a :class:`Gazetteer` instance, you can
    load the saved settings with StaticGazetteer.
    """
class EmptyTrainingException(Exception): pass class SettingsFileLoadingException(Exception): pass def flatten_training( training_pairs: TrainingData, ) -> tuple[TrainingExamples, LabelsLike]: examples: TrainingExamples = [] y = [] for label in ("match", "distinct"): label = cast(Literal["match", "distinct"], label) pairs = training_pairs[label] examples.extend(pairs) encoded_y = 1 if label == "match" else 0 y.extend([encoded_y] * len(pairs)) return examples, numpy.array(y) def _cleanup_scores(arr: Scores) -> None: try: mmap_file = arr.filename # type: ignore except AttributeError: pass else: arr._mmap.close() # type: ignore # Unmap file to prevent PermissionError when deleting temp file del arr if mmap_file: os.remove(mmap_file)