diff --git a/dedupe/_typing.py b/dedupe/_typing.py index 1ab25018..b2e6d7e7 100644 --- a/dedupe/_typing.py +++ b/dedupe/_typing.py @@ -80,6 +80,12 @@ class TrainingData(TypedDict): distinct: List[RecordDictPair] +# Takes pairs of records and generates a (n_samples X n_features) array +FeaturizerFunction = Callable[ + [Sequence[RecordDictPair]], numpy.typing.NDArray[numpy.float_] +] + + class Classifier(Protocol): """Takes an array of pairwise distances and computes the likelihood they are a pair.""" diff --git a/dedupe/api.py b/dedupe/api.py index fe701fff..ed98e520 100644 --- a/dedupe/api.py +++ b/dedupe/api.py @@ -107,7 +107,7 @@ def score(self, pairs: RecordPairs) -> Scores: """ try: matches = core.scoreDuplicates( - pairs, self.data_model, self.classifier, self.num_cores + pairs, self.data_model.distances, self.classifier, self.num_cores ) except RuntimeError: raise RuntimeError( @@ -824,7 +824,7 @@ def score(self, blocks: Blocks) -> Generator[Scores, None, None]: """ matches = core.scoreGazette( - blocks, self.data_model, self.classifier, self.num_cores + blocks, self.data_model.distances, self.classifier, self.num_cores ) return matches @@ -1325,7 +1325,8 @@ def prepare_training( examples, y = flatten_training(self.training_pairs) self.active_learner = labeler.DedupeDisagreementLearner( - self.data_model, + self.data_model.predicates, + self.data_model.distances, data, index_include=examples, ) @@ -1392,7 +1393,8 @@ def prepare_training( examples, y = flatten_training(self.training_pairs) self.active_learner = labeler.RecordLinkDisagreementLearner( - self.data_model, + self.data_model.predicates, + self.data_model.distances, data_1, data_2, index_include=examples, diff --git a/dedupe/core.py b/dedupe/core.py index 7d739af5..64a74f66 100644 --- a/dedupe/core.py +++ b/dedupe/core.py @@ -34,6 +34,7 @@ Classifier, ClosableJoinable, Data, + FeaturizerFunction, Literal, MapLike, RecordID, @@ -41,7 +42,6 @@ RecordPairs, Scores, ) - from dedupe.datamodel import DataModel _Queue = Union[multiprocessing.dummy.Queue, multiprocessing.Queue] @@ -53,7 +53,7 @@ class BlockingError(Exception): class ScoreDupes(object): def __init__( self, - data_model: DataModel, + featurizer: FeaturizerFunction, classifier: Classifier, records_queue: _Queue, exception_queue: _Queue, @@ -61,7 +61,7 @@ def __init__( dtype: numpy.dtype, offset, ): - self.data_model = data_model + self.featurizer = featurizer self.classifier = classifier self.records_queue = records_queue self.exception_queue = exception_queue @@ -87,8 +87,8 @@ def fieldDistance(self, record_pairs: RecordPairs) -> None: if not records: return - distances = self.data_model.distances(records) - scores = self.classifier.predict_proba(distances)[:, -1] + features = self.featurizer(records) + scores = self.classifier.predict_proba(features)[:, -1] mask = scores > 0 if not mask.any(): @@ -113,7 +113,7 @@ def fieldDistance(self, record_pairs: RecordPairs) -> None: def scoreDuplicates( record_pairs: RecordPairs, - data_model: DataModel, + featurizer: FeaturizerFunction, classifier: Classifier, num_cores: int = 1, ) -> Scores: @@ -145,7 +145,7 @@ def scoreDuplicates( n_map_processes = max(num_cores, 1) score_records = ScoreDupes( - data_model, + featurizer, classifier, record_pairs_queue, exception_queue, @@ -200,15 +200,15 @@ def fillQueue( class ScoreGazette(object): - def __init__(self, data_model: DataModel, classifier: Classifier): - self.data_model = data_model + def __init__(self, featurizer: FeaturizerFunction, classifier: Classifier): + self.featurizer = featurizer self.classifier = classifier def __call__(self, block: Block) -> Scores: record_ids, records = zip(*(zip(*each) for each in block)) - distances = self.data_model.distances(records) - scores = self.classifier.predict_proba(distances)[:, -1] + features = self.featurizer(records) + scores = self.classifier.predict_proba(features)[:, -1] id_type = sniff_id_type(record_ids) ids = numpy.array(record_ids, dtype=id_type) @@ -227,7 +227,7 @@ def __call__(self, block: Block) -> Scores: def scoreGazette( record_pairs: Blocks, - data_model: DataModel, + featurizer: FeaturizerFunction, classifier: Classifier, num_cores: int = 1, ) -> Generator[Scores, None, None]: @@ -238,7 +238,7 @@ def scoreGazette( imap, pool = appropriate_imap(num_cores) - score_records = ScoreGazette(data_model, classifier) + score_records = ScoreGazette(featurizer, classifier) for scored_pairs in imap(score_records, record_pairs): yield scored_pairs diff --git a/dedupe/labeler.py b/dedupe/labeler.py index 5942497a..3882acd7 100644 --- a/dedupe/labeler.py +++ b/dedupe/labeler.py @@ -15,11 +15,10 @@ if TYPE_CHECKING: from typing import Dict, Iterable, Literal, Mapping - from dedupe._typing import Data, Labels, LabelsLike + from dedupe._typing import Data, FeaturizerFunction, Labels, LabelsLike from dedupe._typing import RecordDictPair as TrainingExample from dedupe._typing import RecordDictPairs as TrainingExamples from dedupe._typing import RecordIDPair - from dedupe.datamodel import DataModel from dedupe.predicates import Predicate @@ -70,33 +69,25 @@ def _verify_fit_args(pairs: TrainingExamples, y: LabelsLike) -> list[Literal[0, class MatchLearner(Learner): - def __init__(self, data_model: DataModel, candidates: TrainingExamples): - self.data_model = data_model + def __init__(self, featurizer: FeaturizerFunction, candidates: TrainingExamples): + self._featurizer = featurizer self._candidates = candidates.copy() self._classifier = sklearn.linear_model.LogisticRegression() - self._distances = self._calc_distances(self.candidates) + self._features = self._featurizer(self.candidates) def fit(self, pairs: TrainingExamples, y: LabelsLike) -> None: y = self._verify_fit_args(pairs, y) - self._classifier.fit(self._calc_distances(pairs), numpy.array(y)) + self._classifier.fit(self._featurizer(pairs), numpy.array(y)) self._fitted = True def remove(self, index: int) -> None: self._candidates.pop(index) - self._distances = numpy.delete(self._distances, index, axis=0) + self._features = numpy.delete(self._features, index, axis=0) def candidate_scores(self) -> numpy.typing.NDArray[numpy.float_]: if not self._fitted: raise ValueError("Must call fit() before candidate_scores()") - scores: numpy.typing.NDArray[numpy.float_] = self._classifier.predict_proba( - self._distances - )[:, 1].reshape(-1, 1) - return scores - - def _calc_distances( - self, pairs: TrainingExamples - ) -> numpy.typing.NDArray[numpy.float_]: - return self.data_model.distances(pairs) + return self._classifier.predict_proba(self._features)[:, 1].reshape(-1, 1) class BlockLearner(Learner): @@ -216,7 +207,7 @@ def _filter_canopy_predicates( class DedupeBlockLearner(BlockLearner): def __init__( self, - data_model: DataModel, + candidate_predicates: Iterable[Predicate], data: Data, index_include: TrainingExamples, ): @@ -228,7 +219,7 @@ def __init__( index_data = sample_records(data, 50000) sampled_records = sample_records(index_data, N_SAMPLED_RECORDS) - preds = _filter_canopy_predicates(data_model.predicates, canopies=True) + preds = _filter_canopy_predicates(candidate_predicates, canopies=True) self.block_learner = training.DedupeBlockLearner( preds, sampled_records, index_data ) @@ -268,7 +259,7 @@ def _sample(self, data: Data, sample_size: int) -> TrainingExamples: class RecordLinkBlockLearner(BlockLearner): def __init__( self, - data_model: DataModel, + candidate_predicates: Iterable[Predicate], data_1: Data, data_2: Data, index_include: TrainingExamples, @@ -282,7 +273,7 @@ def __init__( index_data = sample_records(data_2, 50000) sampled_records_2 = sample_records(index_data, N_SAMPLED_RECORDS) - preds = _filter_canopy_predicates(data_model.predicates, canopies=False) + preds = _filter_canopy_predicates(candidate_predicates, canopies=False) self.block_learner = training.RecordLinkBlockLearner( preds, sampled_records_1, sampled_records_2, index_data ) @@ -324,8 +315,7 @@ class DisagreementLearner(HasCandidates): matcher: MatchLearner blocker: BlockLearner - def __init__(self, data_model: DataModel) -> None: - self.data_model = data_model + def __init__(self) -> None: self.y: numpy.typing.NDArray[numpy.int_] = numpy.array([]) self.pairs: TrainingExamples = [] @@ -383,12 +373,12 @@ def learn_predicates( class DedupeDisagreementLearner(DisagreementLearner): def __init__( self, - data_model: DataModel, + candidate_predicates: Iterable[Predicate], + featurizer: FeaturizerFunction, data: Data, index_include: TrainingExamples, ): - super().__init__(data_model) - + super().__init__() data = core.index(data) random_pair = ( @@ -400,11 +390,11 @@ def __init__( index_include = index_include.copy() index_include.append(exact_match) - self.blocker = DedupeBlockLearner(data_model, data, index_include) + self.blocker = DedupeBlockLearner(candidate_predicates, data, index_include) self._candidates = self.blocker.candidates.copy() - self.matcher = MatchLearner(self.data_model, self.candidates) + self.matcher = MatchLearner(featurizer, self.candidates) examples = [exact_match] * 4 + [random_pair] labels: Labels = [1] * 4 + [0] # type: ignore[assignment] @@ -414,13 +404,13 @@ def __init__( class RecordLinkDisagreementLearner(DisagreementLearner): def __init__( self, - data_model: DataModel, + candidate_predicates: Iterable[Predicate], + featurizer: FeaturizerFunction, data_1: Data, data_2: Data, index_include: TrainingExamples, ): - super().__init__(data_model) - + super().__init__() data_1 = core.index(data_1) offset = len(data_1) @@ -435,10 +425,12 @@ def __init__( index_include = index_include.copy() index_include.append(exact_match) - self.blocker = RecordLinkBlockLearner(data_model, data_1, data_2, index_include) + self.blocker = RecordLinkBlockLearner( + candidate_predicates, data_1, data_2, index_include + ) self._candidates = self.blocker.candidates.copy() - self.matcher = MatchLearner(self.data_model, self.candidates) + self.matcher = MatchLearner(featurizer, self.candidates) examples = [exact_match] * 4 + [random_pair] labels: Labels = [1] * 4 + [0] # type: ignore[assignment] diff --git a/tests/test_core.py b/tests/test_core.py index 2b4fc1c5..9754f154 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -70,7 +70,7 @@ def setUp(self): def test_score_duplicates(self): scores = dedupe.core.scoreDuplicates( - self.records, self.data_model, self.classifier, 2 + self.records, self.data_model.distances, self.classifier, 2 ) numpy.testing.assert_equal(scores["pairs"], self.desired_scored_pairs["pairs"]) @@ -94,7 +94,7 @@ def test_score_duplicates_with_zeros(self): expected = numpy.array([(["3", "4"], 1)], dtype=dtype) scores = dedupe.core.scoreDuplicates( - records, self.data_model, self.classifier, 2 + records, self.data_model.distances, self.classifier, 2 ) assert isinstance(scores, numpy.memmap) diff --git a/tests/test_labeler.py b/tests/test_labeler.py index 7f9c8df2..30609ffa 100644 --- a/tests/test_labeler.py +++ b/tests/test_labeler.py @@ -43,7 +43,9 @@ def test_AL(self): ({"name": "William", "age": "35"}, {"name": "Jimbo", "age": "21"}), ] EXPECTED_CANDIDATES = {freeze_record_pair(pair) for pair in EXPECTED_CANDIDATES} - active_learner = labeler.DedupeDisagreementLearner(self.data_model, SAMPLE, []) + active_learner = labeler.DedupeDisagreementLearner( + self.data_model.predicates, self.data_model.distances, SAMPLE, [] + ) actual_candidates = set() for i in range(len(EXPECTED_CANDIDATES), 0, -1): assert len(active_learner) == i