#!/usr/bin/python
from __future__ import annotations
import logging
import time
from collections import defaultdict
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    # Everything in this branch exists only for static type checkers. The
    # module uses ``from __future__ import annotations``, so the annotations
    # that mention these names are never evaluated at runtime.
    from typing import (
        Any,
        Callable,
        DefaultDict,
        Generator,
        Iterable,
        List,
        Sequence,
        Union,
    )

    import dedupe.predicates
    from dedupe._typing import Data, Record, RecordID
    from dedupe.index import Index

    # Values accepted by Fingerprinter.index / Fingerprinter.unindex: an
    # iterable of strings, or an iterable of iterables of strings.
    Docs = Union[Iterable[str], Iterable[Iterable[str]]]
    # Mapping from a string key to a list of index predicates;
    # Fingerprinter.__init__ keys it by ``predicate.type``.
    IndexList = DefaultDict[str, List[dedupe.predicates.IndexPredicate]]

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def index_list() -> IndexList:
    """Return a new, empty mapping whose missing keys default to an empty list.

    Kept as a named module-level function so it can be handed to
    ``defaultdict`` as a default factory.
    """
    empty_index_list: IndexList = defaultdict(list)
    return empty_index_list
class Fingerprinter:
    """Takes in a record and returns all blocks that record belongs to.

    Attributes:
        predicates: the compound predicates used to generate block keys.
        index_fields: index predicates grouped by field name, then by
            predicate type.
        index_predicates: flat list of every predicate that has an
            ``index`` attribute.
    """

    def __init__(self, predicates: Iterable[dedupe.predicates.Predicate]) -> None:
        """
        Args:
            predicates: the compound predicates to block with. Each one is
                iterated to find the simple predicates it is made of.
        """
        # ``predicates`` is walked here and again on every ``__call__``. A
        # one-shot iterator (e.g. a generator) would be exhausted by the loop
        # below and blocking would silently yield nothing, so materialize it.
        # Re-iterable containers are stored exactly as given.
        if iter(predicates) is predicates:
            predicates = list(predicates)
        self.predicates = predicates

        self.index_fields: dict[str, IndexList]
        self.index_fields = defaultdict(index_list)
        """
        A dictionary of all the fingerprinter methods that use an
        index of data field values. The keys are the field names,
        which can be useful to know for indexing the data.
        """

        self.index_predicates: list[dedupe.predicates.IndexPredicate] = []

        for full_predicate in self.predicates:
            for predicate in full_predicate:
                # Only predicates carrying an ``index`` attribute need indexing.
                if hasattr(predicate, "index"):
                    self.index_fields[predicate.field][predicate.type].append(predicate)
                    self.index_predicates.append(predicate)

    def __call__(
        self, records: Iterable[Record], target: bool = False
    ) -> Generator[tuple[str, RecordID], None, None]:
        """
        Generate the predicates for records. Yields tuples of (predicate,
        record_id).

        Args:
            records: A sequence of tuples of (record_id,
                  record_dict). Can often be created by
                  `data_dict.items()`.
            target: Indicates whether the data should be treated as
                    the target data. This affects the behavior of
                    search predicates. If `target` is set to
                    `True`, a search predicate will return the
                    value itself. If `target` is set to `False` the
                    search predicate will return all possible
                    values within the specified search distance.

                    Let's say we have a
                    `LevenshteinSearchPredicate` with an associated
                    distance of `1` on a `"name"` field; and we
                    have a record like `{"name": "thomas"}`. If the
                    `target` is set to `True` then the predicate
                    will return `"thomas"`.  If `target` is set to
                    `False`, then the blocker could return
                    `"thomas"`, `"tomas"`, and `"thoms"`. By using
                    the `target` argument on one of your datasets,
                    you will dramatically reduce the total number
                    of comparisons without a loss of accuracy.

        .. code:: python

           > data = [(1, {'name' : 'bob'}), (2, {'name' : 'suzanne'})]
           > blocked_ids = deduper.fingerprinter(data)
           > print(list(blocked_ids))
           [('foo:1', 1), ..., ('bar:1', 100)]

        """
        start_time = time.perf_counter()

        # Suffix every block key with the position of the predicate that
        # produced it, so equal keys from different predicates stay distinct.
        suffixed_predicates = [
            (":" + str(position), predicate)
            for position, predicate in enumerate(self.predicates)
        ]

        for i, record in enumerate(records):
            record_id, instance = record

            for pred_id, predicate in suffixed_predicates:
                block_keys = predicate(instance, target=target)
                for block_key in block_keys:
                    yield block_key + pred_id, record_id

            # Progress report every 10,000 records (skipping record 0).
            if i and i % 10000 == 0:
                logger.info(
                    "%(iteration)d, %(elapsed).2f seconds",
                    {"iteration": i, "elapsed": time.perf_counter() - start_time},
                )

    def reset_indices(self) -> None:
        """
        Fingerprinter indices can take up a lot of memory. If you are
        done with blocking, this method will reset the indices to free
        up memory. If you need to block again, the data will need to be
        re-indexed.
        """
        for predicate in self.index_predicates:
            predicate.reset()

    def index(self, docs: Docs, field: str) -> None:
        """
        Add docs to the indices used by fingerprinters.

        Some fingerprinter methods depend upon having an index of
        values that a field may have in the data. This method adds
        those values to the index. If you don't have any fingerprinter
        methods that use an index, this method will do nothing.

        Args:
            docs: an iterator of values from your data to index. While
                  not required, it is recommended that docs be a unique
                  set of those values. Indexing can be an expensive
                  operation.
            field: fieldname or key associated with the values you are
                   indexing

        """
        indices = self._indices_for(field)

        for doc in docs:
            if doc:
                for _, index, preprocess in indices:
                    index.index(preprocess(doc))

        self._attach_indices(field, indices)

    def unindex(self, docs: Docs, field: str) -> None:
        """Remove docs from indices used by fingerprinters

        Args:
            docs: an iterator of values from your data to remove. While
                  not required, it is recommended that docs be a unique
                  set of those values. Indexing can be an expensive
                  operation.
            field: fieldname or key associated with the values you are
                   unindexing
        """
        indices = self._indices_for(field)

        for doc in docs:
            if doc:
                for _, index, preprocess in indices:
                    try:
                        index.unindex(preprocess(doc))
                    except KeyError:
                        # Deliberate best effort: a KeyError from the
                        # index's unindex call is ignored.
                        pass

        self._attach_indices(field, indices)

    def index_all(self, data: Data) -> None:
        """Index every field in ``index_fields`` from a whole data set.

        For each such field, the distinct truthy values found across
        ``data.values()`` are passed to :meth:`index`.

        Args:
            data: mapping whose values are records supporting
                  ``record[field]`` lookup.
        """
        for field in self.index_fields:
            unique_fields = {record[field] for record in data.values() if record[field]}
            self.index(unique_fields, field)

    def _indices_for(
        self, field: str
    ) -> Sequence[tuple[str, Index, Callable[[Any], Any]]]:
        """Return the (type, index, preprocess) triples for ``field``."""
        # ``index_fields`` is a defaultdict: subscripting it with an unknown
        # field would silently register that field, and ``index_all`` would
        # later try to read it from every record. ``get`` never inserts.
        field_predicates = self.index_fields.get(field)
        if not field_predicates:
            return []
        return extractIndices(field_predicates)

    def _attach_indices(
        self,
        field: str,
        indices: Sequence[tuple[str, Index, Callable[[Any], Any]]],
    ) -> None:
        """Prepare each index for searching and share it with its predicates."""
        for index_type, index, _ in indices:
            index.initSearch()

            for predicate in self.index_fields[field][index_type]:
                logger.debug("Canopy: %s", predicate)
                predicate.index = index
                predicate.bust_cache()
def extractIndices(
    index_fields: IndexList,
) -> Sequence[tuple[str, Index, Callable[[Any], Any]]]:
    """Collect one (type, index, preprocess) triple per predicate type.

    Only the first predicate of each type is consulted: its ``index`` is
    used when it is not ``None``, otherwise a fresh one is obtained from
    its ``initIndex()``. Its ``preprocess`` callable is returned alongside.

    Args:
        index_fields: mapping from predicate type to a list of predicates
            of that type.

    Returns:
        A list of ``(index_type, index, preprocess)`` tuples in the
        iteration order of ``index_fields``.
    """
    extracted = []

    for kind, members in index_fields.items():
        representative = members[0]
        shared_index = representative.index
        transform = representative.preprocess

        # Create the index only when this type does not have one yet.
        if representative.index is None:
            shared_index = representative.initIndex()

        assert shared_index is not None
        extracted.append((kind, shared_index, transform))

    return extracted