diff --git a/dedupe/_typing.py b/dedupe/_typing.py
index 1ab250183..b2e6d7e7a 100644
--- a/dedupe/_typing.py
+++ b/dedupe/_typing.py
@@ -80,6 +80,12 @@ class TrainingData(TypedDict):
     distinct: List[RecordDictPair]


+# Takes pairs of records and generates an (n_samples, n_features) array
+FeaturizerFunction = Callable[
+    [Sequence[RecordDictPair]], numpy.typing.NDArray[numpy.float_]
+]
+
+
 class Classifier(Protocol):
     """Takes an array of pairwise distances and computes the likelihood they are a pair."""

diff --git a/dedupe/api.py b/dedupe/api.py
index dc54f7b33..831683676 100644
--- a/dedupe/api.py
+++ b/dedupe/api.py
@@ -14,6 +14,7 @@ import sqlite3
 import tempfile
 import warnings

+from io import BytesIO
 from typing import TYPE_CHECKING, cast

 import numpy
@@ -107,7 +108,7 @@ def score(self, pairs: RecordPairs) -> Scores:
         """
         try:
             matches = core.scoreDuplicates(
-                pairs, self.data_model, self.classifier, self.num_cores
+                pairs, self.data_model.distances, self.classifier, self.num_cores
             )
         except RuntimeError:
             raise RuntimeError(
@@ -824,7 +825,7 @@ def score(self, blocks: Blocks) -> Generator[Scores, None, None]:

         """
         matches = core.scoreGazette(
-            blocks, self.data_model, self.classifier, self.num_cores
+            blocks, self.data_model.distances, self.classifier, self.num_cores
         )

         return matches
@@ -978,10 +979,37 @@ def __init__(
         """
         super().__init__(num_cores, in_memory, **kwargs)

+        self.data_model, self.classifier, self.predicates = self._load_settings(
+            settings_file
+        )
+
+        logger.info("Predicate set:")
+        for predicate in self.predicates:
+            logger.info(predicate)
+
+        self._fingerprinter = blocking.Fingerprinter(self.predicates)
+
+    @classmethod
+    def _load_settings(
+        cls, settings_file: BinaryIO
+    ) -> tuple[datamodel.DataModel, Classifier, list[dedupe.predicates.Predicate]]:
+        # Make a copy so we can read it multiple times
+        settings_file = BytesIO(settings_file.read())
+        settings_file.seek(0)
+        catchall_exception = SettingsFileLoadingException(
+            "Something has gone wrong with loading the settings file. "
+            "Try deleting the file"
+        )
         try:
-            self.data_model = pickle.load(settings_file)
-            self.classifier = pickle.load(settings_file)
-            self.predicates = pickle.load(settings_file)
+            version = pickle.load(settings_file)
+            if not isinstance(version, int):
+                settings_file.seek(0)
+                return cls._load_settings_v0(settings_file)
+            else:
+                raise SettingsFileLoadingException(
+                    "Settings file version {} not understood".format(version)
+                )
+
         except (KeyError, AttributeError):
             raise SettingsFileLoadingException(
                 "This settings file is not compatible with "
@@ -997,21 +1025,18 @@ def __init__(
                 "install that library: `pip install rlr`"
             )
         else:
-            raise SettingsFileLoadingException(
-                "Something has gone wrong with loading the settings file. "
-                "Try deleting the file"
-            ) from exc
-        except:  # noqa: E722
-            raise SettingsFileLoadingException(
-                "Something has gone wrong with loading the settings file. "
-                "Try deleting the file"
-            )
+            raise
+        except Exception as exc:
+            raise catchall_exception from exc

-        logger.info("Predicate set:")
-        for predicate in self.predicates:
-            logger.info(predicate)
-
-        self._fingerprinter = blocking.Fingerprinter(self.predicates)
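+    # v0 settings files predate the version marker: they are simply three
+    # pickles (data model, classifier, predicates) written back to back.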
" - "Try deleting the file" - ) + raise + except Exception as exc: + raise catchall_exception from exc - logger.info("Predicate set:") - for predicate in self.predicates: - logger.info(predicate) - - self._fingerprinter = blocking.Fingerprinter(self.predicates) + @staticmethod + def _load_settings_v0( + settings_file: BinaryIO, + ) -> tuple[datamodel.DataModel, Classifier, list[dedupe.predicates.Predicate]]: + data_model = pickle.load(settings_file) + classifier = pickle.load(settings_file) + predicates = pickle.load(settings_file) + return data_model, classifier, predicates class ActiveMatching(Matching): @@ -1325,7 +1350,8 @@ def prepare_training( examples, y = flatten_training(self.training_pairs) self.active_learner = labeler.DedupeDisagreementLearner( - self.data_model, + self.data_model.predicates, + self.data_model.distances, data, index_include=examples, ) @@ -1392,7 +1418,8 @@ def prepare_training( examples, y = flatten_training(self.training_pairs) self.active_learner = labeler.RecordLinkDisagreementLearner( - self.data_model, + self.data_model.predicates, + self.data_model.distances, data_1, data_2, index_include=examples, diff --git a/dedupe/core.py b/dedupe/core.py index 7d739af55..64a74f66c 100644 --- a/dedupe/core.py +++ b/dedupe/core.py @@ -34,6 +34,7 @@ Classifier, ClosableJoinable, Data, + FeaturizerFunction, Literal, MapLike, RecordID, @@ -41,7 +42,6 @@ RecordPairs, Scores, ) - from dedupe.datamodel import DataModel _Queue = Union[multiprocessing.dummy.Queue, multiprocessing.Queue] @@ -53,7 +53,7 @@ class BlockingError(Exception): class ScoreDupes(object): def __init__( self, - data_model: DataModel, + featurizer: FeaturizerFunction, classifier: Classifier, records_queue: _Queue, exception_queue: _Queue, @@ -61,7 +61,7 @@ def __init__( dtype: numpy.dtype, offset, ): - self.data_model = data_model + self.featurizer = featurizer self.classifier = classifier self.records_queue = records_queue self.exception_queue = exception_queue @@ -87,8 +87,8 @@ def fieldDistance(self, record_pairs: RecordPairs) -> None: if not records: return - distances = self.data_model.distances(records) - scores = self.classifier.predict_proba(distances)[:, -1] + features = self.featurizer(records) + scores = self.classifier.predict_proba(features)[:, -1] mask = scores > 0 if not mask.any(): @@ -113,7 +113,7 @@ def fieldDistance(self, record_pairs: RecordPairs) -> None: def scoreDuplicates( record_pairs: RecordPairs, - data_model: DataModel, + featurizer: FeaturizerFunction, classifier: Classifier, num_cores: int = 1, ) -> Scores: @@ -145,7 +145,7 @@ def scoreDuplicates( n_map_processes = max(num_cores, 1) score_records = ScoreDupes( - data_model, + featurizer, classifier, record_pairs_queue, exception_queue, @@ -200,15 +200,15 @@ def fillQueue( class ScoreGazette(object): - def __init__(self, data_model: DataModel, classifier: Classifier): - self.data_model = data_model + def __init__(self, featurizer: FeaturizerFunction, classifier: Classifier): + self.featurizer = featurizer self.classifier = classifier def __call__(self, block: Block) -> Scores: record_ids, records = zip(*(zip(*each) for each in block)) - distances = self.data_model.distances(records) - scores = self.classifier.predict_proba(distances)[:, -1] + features = self.featurizer(records) + scores = self.classifier.predict_proba(features)[:, -1] id_type = sniff_id_type(record_ids) ids = numpy.array(record_ids, dtype=id_type) @@ -227,7 +227,7 @@ def __call__(self, block: Block) -> Scores: def scoreGazette( record_pairs: Blocks, - 
+        features = self.featurizer(records)
+        scores = self.classifier.predict_proba(features)[:, -1]

         id_type = sniff_id_type(record_ids)
         ids = numpy.array(record_ids, dtype=id_type)
@@ -227,7 +227,7 @@ def __call__(self, block: Block) -> Scores:

 def scoreGazette(
     record_pairs: Blocks,
-    data_model: DataModel,
+    featurizer: FeaturizerFunction,
     classifier: Classifier,
     num_cores: int = 1,
 ) -> Generator[Scores, None, None]:
@@ -238,7 +238,7 @@ def scoreGazette(

     imap, pool = appropriate_imap(num_cores)

-    score_records = ScoreGazette(data_model, classifier)
+    score_records = ScoreGazette(featurizer, classifier)

     for scored_pairs in imap(score_records, record_pairs):
         yield scored_pairs
diff --git a/dedupe/datamodel.py b/dedupe/datamodel.py
index bb4335658..053b8ee1c 100644
--- a/dedupe/datamodel.py
+++ b/dedupe/datamodel.py
@@ -8,6 +8,7 @@
 import numpy

 import dedupe.variables
+from dedupe.variables.base import CustomType
 from dedupe.variables.base import FieldType as FieldVariable
 from dedupe.variables.base import MissingDataType, Variable
 from dedupe.variables.interaction import InteractionType
@@ -28,21 +29,23 @@
     )
     from dedupe.predicates import Predicate

-VARIABLE_CLASSES = {k: v for k, v in FieldVariable.all_subclasses() if k}
+VARIABLE_CLASSES = {k: v for k, v in Variable.all_subclasses() if k}


 class DataModel(object):
     version = 1

     def __init__(self, variable_definitions: Iterable[VariableDefinition]):
-        variable_definitions = list(variable_definitions)
-        if not variable_definitions:
-            raise ValueError("The variable definitions cannot be empty")
-        all_variables: list[Variable]
-        self.primary_variables, all_variables = typify_variables(variable_definitions)
-        self._derived_start = len(all_variables)
-
-        all_variables += interactions(variable_definitions, self.primary_variables)
+        variables = typify_variables(variable_definitions)
+        non_interactions: list[FieldVariable] = [
+            v for v in variables if not isinstance(v, InteractionType)  # type: ignore[misc]
+        ]
+        self.primary_variables = non_interactions
+        expanded_primary = _expand_higher_variables(self.primary_variables)
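+        # Feature-column layout: the expanded primary variables come first;
+        # derived columns (interactions, then missing-data indicators) are
+        # appended after _derived_start.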
+        self._derived_start = len(expanded_primary)
+
+        all_variables = expanded_primary.copy()
+        all_variables += _expanded_interactions(variables)
         all_variables += missing(all_variables)

         self._missing_field_indices = missing_field_indices(all_variables)
@@ -50,9 +53,6 @@ def __init__(self, variable_definitions: Iterable[VariableDefinition]):

         self._len = len(all_variables)

-    def __len__(self) -> int:
-        return self._len
-
     # Changing this from a property to just a normal attribute causes
     # pickling problems, because we are removing static methods from
     # their class context. This could be fixed by defining comparators
@@ -82,7 +82,7 @@ def distances(
     ) -> numpy.typing.NDArray[numpy.float_]:
         num_records = len(record_pairs)

-        distances = numpy.empty((num_records, len(self)), "f4")
+        distances = numpy.empty((num_records, self._len), "f4")

         for i, (record_1, record_2) in enumerate(record_pairs):
@@ -144,11 +144,12 @@ def __setstate__(self, d):

 def typify_variables(
     variable_definitions: Iterable[VariableDefinition],
-) -> tuple[list[FieldVariable], list[Variable]]:
-    primary_variables: list[FieldVariable] = []
-    all_variables: list[Variable] = []
-    only_custom = True
+) -> list[Variable]:
+    variable_definitions = list(variable_definitions)
+    if not variable_definitions:
+        raise ValueError("The variable definitions cannot be empty")

+    variables: list[Variable] = []
     for definition in variable_definitions:
         try:
             variable_type = definition["type"]
@@ -167,12 +168,6 @@ def typify_variables(
                 "{'field' : 'Phone', type: 'String'}"
             )

-        if variable_type != "Custom":
-            only_custom = False
-
-        if variable_type == "Interaction":
-            continue
-
         if variable_type == "FuzzyCategorical" and "other fields" not in definition:
             definition["other fields"] = [  # type: ignore
                 d["field"]
@@ -183,22 +178,15 @@ def typify_variables(
         try:
             variable_class = VARIABLE_CLASSES[variable_type]
         except KeyError:
+            valid = ", ".join(VARIABLE_CLASSES)
             raise KeyError(
-                "Field type %s not valid. Valid types include %s"
-                % (definition["type"], ", ".join(VARIABLE_CLASSES))
+                f"Variable type {variable_type} not valid. Valid types include {valid}"
             )

         variable_object = variable_class(definition)
-        assert isinstance(variable_object, FieldVariable)
-
-        primary_variables.append(variable_object)
-
-        if hasattr(variable_object, "higher_vars"):
-            all_variables.extend(variable_object.higher_vars)
-        else:
-            variable_object = cast(Variable, variable_object)
-            all_variables.append(variable_object)
+        assert isinstance(variable_object, Variable)
+        variables.append(variable_object)

+    only_custom = all(isinstance(v, (CustomType, InteractionType)) for v in variables)
     if only_custom:
         raise ValueError(
             "At least one of the variable types needs to be a type"
@@ -206,7 +194,17 @@ def typify_variables(
             "blocking rules"
         )

-    return primary_variables, all_variables
+    return variables
+
+
+def _expand_higher_variables(variables: Iterable[Variable]) -> list[Variable]:
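+    """Replace each variable that defines higher_vars (e.g. a categorical
+    variable, which expands into a set of dummy variables) with those
+    expanded variables; all other variables pass through unchanged."""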
+    result: list[Variable] = []
+    for variable in variables:
+        if hasattr(variable, "higher_vars"):
+            result.extend(variable.higher_vars)
+        else:
+            result.append(variable)
+    return result


 def missing(variables: list[Variable]) -> list[MissingDataType]:
@@ -217,16 +215,12 @@ def missing(variables: list[Variable]) -> list[MissingDataType]:
     return missing_variables


-def interactions(
-    definitions: Iterable[VariableDefinition], primary_variables: list[FieldVariable]
-) -> list[InteractionType]:
-    field_d = {field.name: field for field in primary_variables}
-
+def _expanded_interactions(variables: list[Variable]) -> list[InteractionType]:
+    field_vars = {var.name: var for var in variables if isinstance(var, FieldVariable)}
     interactions = []
-    for definition in definitions:
-        if definition["type"] == "Interaction":
-            var = InteractionType(definition)
-            var.expandInteractions(field_d)
+    for var in variables:
+        if isinstance(var, InteractionType):
+            var.expandInteractions(field_vars)
             interactions.extend(var.higher_vars)
     return interactions
@@ -236,15 +230,27 @@ def missing_field_indices(variables: list[Variable]) -> list[int]:


 def interaction_indices(variables: list[Variable]) -> list[list[int]]:
-    var_names = [var.name for var in variables]
+    _ensure_unique_names(variables)
+    name_to_index = {var.name: i for i, var in enumerate(variables)}
     indices = []
     for var in variables:
         if hasattr(var, "interaction_fields"):
-            interaction_indices = [var_names.index(f) for f in var.interaction_fields]  # type: ignore
+            interaction_indices = [name_to_index[f] for f in var.interaction_fields]  # type: ignore
             indices.append(interaction_indices)
     return indices


+def _ensure_unique_names(variables: Iterable[Variable]) -> None:
+    seen = set()
+    for var in variables:
+        if var.name in seen:
+            raise ValueError(
+                "Variable name used more than once! "
+                f"Choose a unique name for each variable: '{var.name}'"
+            )
+        seen.add(var.name)
+
+
 def reduce_method(m):  # type: ignore[no-untyped-def]
     return (getattr, (m.__self__, m.__func__.__name__))
diff --git a/dedupe/labeler.py b/dedupe/labeler.py
index 9598292c4..8c51dd697 100644
--- a/dedupe/labeler.py
+++ b/dedupe/labeler.py
@@ -15,11 +15,10 @@
 if TYPE_CHECKING:
     from typing import Dict, Iterable, Literal, Mapping

-    from dedupe._typing import Data, Labels, LabelsLike
+    from dedupe._typing import Data, FeaturizerFunction, Labels, LabelsLike
     from dedupe._typing import RecordDictPair as TrainingExample
     from dedupe._typing import RecordDictPairs as TrainingExamples
     from dedupe._typing import RecordIDPair
-    from dedupe.datamodel import DataModel
     from dedupe.predicates import Predicate
@@ -70,33 +69,25 @@ def _verify_fit_args(pairs: TrainingExamples, y: LabelsLike) -> list[Literal[0,


 class MatchLearner(Learner):
-    def __init__(self, data_model: DataModel, candidates: TrainingExamples):
-        self.data_model = data_model
+    def __init__(self, featurizer: FeaturizerFunction, candidates: TrainingExamples):
+        self._featurizer = featurizer
         self._candidates = candidates.copy()
         self._classifier = sklearn.linear_model.LogisticRegression()
-        self._distances = self._calc_distances(self.candidates)
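+        # Featurize every candidate once up front; remove() below keeps
+        # _features row-aligned with _candidates.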
+        self._features = self._featurizer(self.candidates)

     def fit(self, pairs: TrainingExamples, y: LabelsLike) -> None:
         y = self._verify_fit_args(pairs, y)
-        self._classifier.fit(self._calc_distances(pairs), numpy.array(y))
+        self._classifier.fit(self._featurizer(pairs), numpy.array(y))
         self._fitted = True

     def remove(self, index: int) -> None:
         self._candidates.pop(index)
-        self._distances = numpy.delete(self._distances, index, axis=0)
+        self._features = numpy.delete(self._features, index, axis=0)

     def candidate_scores(self) -> numpy.typing.NDArray[numpy.float_]:
         if not self._fitted:
             raise ValueError("Must call fit() before candidate_scores()")
-        scores: numpy.typing.NDArray[numpy.float_] = self._classifier.predict_proba(
-            self._distances
-        )[:, 1].reshape(-1, 1)
-        return scores
-
-    def _calc_distances(
-        self, pairs: TrainingExamples
-    ) -> numpy.typing.NDArray[numpy.float_]:
-        return self.data_model.distances(pairs)
+        return self._classifier.predict_proba(self._features)[:, 1].reshape(-1, 1)


 class BlockLearner(Learner):
@@ -212,7 +203,7 @@ def _filter_canopy_predicates(
 class DedupeBlockLearner(BlockLearner):
     def __init__(
         self,
-        data_model: DataModel,
+        candidate_predicates: Iterable[Predicate],
         data: Data,
         index_include: TrainingExamples,
     ):
@@ -224,7 +215,7 @@ def __init__(
         index_data = sample_records(data, 50000)
         sampled_records = sample_records(index_data, N_SAMPLED_RECORDS)

-        preds = _filter_canopy_predicates(data_model.predicates, canopies=True)
+        preds = _filter_canopy_predicates(candidate_predicates, canopies=True)
         self.block_learner = training.DedupeBlockLearner(
             preds, sampled_records, index_data
         )
@@ -262,7 +253,7 @@ def _sample(self, data: Data, sample_size: int) -> TrainingExamples:
 class RecordLinkBlockLearner(BlockLearner):
     def __init__(
         self,
-        data_model: DataModel,
+        candidate_predicates: Iterable[Predicate],
         data_1: Data,
         data_2: Data,
         index_include: TrainingExamples,
@@ -276,7 +267,7 @@ def __init__(
         index_data = sample_records(data_2, 50000)
         sampled_records_2 = sample_records(index_data, N_SAMPLED_RECORDS)

-        preds = _filter_canopy_predicates(data_model.predicates, canopies=False)
+        preds = _filter_canopy_predicates(candidate_predicates, canopies=False)
         self.block_learner = training.RecordLinkBlockLearner(
             preds, sampled_records_1, sampled_records_2, index_data
         )
@@ -318,8 +309,7 @@ class DisagreementLearner(HasCandidates):
     matcher: MatchLearner
     blocker: BlockLearner

-    def __init__(self, data_model: DataModel) -> None:
-        self.data_model = data_model
+    def __init__(self) -> None:
         self.y: numpy.typing.NDArray[numpy.int_] = numpy.array([])
         self.pairs: TrainingExamples = []
@@ -377,12 +367,12 @@ def learn_predicates(
 class DedupeDisagreementLearner(DisagreementLearner):
     def __init__(
         self,
-        data_model: DataModel,
+        candidate_predicates: Iterable[Predicate],
+        featurizer: FeaturizerFunction,
         data: Data,
         index_include: TrainingExamples,
     ):
-        super().__init__(data_model)
-
+        super().__init__()
         data = core.index(data)

         random_pair = (
@@ -394,11 +384,11 @@ def __init__(
         index_include = index_include.copy()
         index_include.append(exact_match)

-        self.blocker = DedupeBlockLearner(data_model, data, index_include)
+        self.blocker = DedupeBlockLearner(candidate_predicates, data, index_include)

         self._candidates = self.blocker.candidates.copy()

-        self.matcher = MatchLearner(self.data_model, self.candidates)
+        self.matcher = MatchLearner(featurizer, self.candidates)

         examples = [exact_match] * 4 + [random_pair]
         labels: Labels = [1] * 4 + [0]  # type: ignore[assignment]
@@ -408,13 +398,13 @@ def __init__(
 class RecordLinkDisagreementLearner(DisagreementLearner):
     def __init__(
         self,
-        data_model: DataModel,
+        candidate_predicates: Iterable[Predicate],
+        featurizer: FeaturizerFunction,
         data_1: Data,
         data_2: Data,
         index_include: TrainingExamples,
     ):
-        super().__init__(data_model)
-
+        super().__init__()
         data_1 = core.index(data_1)

         offset = len(data_1)
@@ -429,10 +419,12 @@ def __init__(
         index_include = index_include.copy()
         index_include.append(exact_match)

-        self.blocker = RecordLinkBlockLearner(data_model, data_1, data_2, index_include)
-        self._candidates = self.blocker.candidates
+        self.blocker = RecordLinkBlockLearner(
+            candidate_predicates, data_1, data_2, index_include
+        )
+        self._candidates = self.blocker.candidates.copy()

-        self.matcher = MatchLearner(self.data_model, self.candidates)
+        self.matcher = MatchLearner(featurizer, self.candidates)

         examples = [exact_match] * 4 + [random_pair]
         labels: Labels = [1] * 4 + [0]  # type: ignore[assignment]
diff --git a/dedupe/variables/base.py b/dedupe/variables/base.py
index f80b28faa..c958db5ea 100644
--- a/dedupe/variables/base.py
+++ b/dedupe/variables/base.py
@@ -58,21 +58,16 @@ def all_subclasses(


 class DerivedType(Variable):
-    type = "Derived"
-
     def __init__(self, definition: VariableDefinition):
         self.name = "(%s: %s)" % (str(definition["name"]), str(definition["type"]))
         super(DerivedType, self).__init__(definition)


 class MissingDataType(Variable):
-    type = "MissingData"
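+    # An indicator variable is never itself missing, so no missing-data
+    # indicator needs to be derived for it.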
f"({name}: Not Missing)" class FieldType(Variable): diff --git a/tests/test_api.py b/tests/test_api.py index 84ac9169a..6f9f40dd5 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -58,11 +58,13 @@ def test_initialize_fields(self): [], ) + # only customs with self.assertRaises(ValueError): dedupe.api.ActiveMatching( [{"field": "name", "type": "Custom", "comparator": lambda x, y: 1}], ) + # Only customs with self.assertRaises(ValueError): dedupe.api.ActiveMatching( [ @@ -71,6 +73,44 @@ def test_initialize_fields(self): ], ) + # Only custom and interactions + with self.assertRaises(ValueError): + dedupe.api.ActiveMatching( + [ + {"field": "name", "type": "Custom", "comparator": lambda x, y: 1}, + {"field": "age", "type": "Custom", "comparator": lambda x, y: 1}, + {"type": "Interaction", "interaction variables": ["name", "age"]}, + ], + ) + + # Only interactions + with self.assertRaises(ValueError): + dedupe.api.ActiveMatching( + [ + {"type": "Interaction", "interaction variables": []}, + ], + ) + + # Duplicate variable names (explicitly) + with self.assertRaises(ValueError) as e: + dedupe.api.ActiveMatching( + [ + {"field": "age", "type": "String", "variable name": "my_age"}, + {"field": "age", "type": "ShortString", "variable name": "my_age"}, + ], + ) + assert "Variable name used more than once!" in str(e.exception) + + # Duplicate variable names (implicitly) + with self.assertRaises(ValueError) as e: + dedupe.api.ActiveMatching( + [ + {"field": "age", "type": "String"}, + {"field": "age", "type": "String"}, + ], + ) + assert "Variable name used more than once!" in str(e.exception) + dedupe.api.ActiveMatching( [ {"field": "name", "type": "Custom", "comparator": lambda x, y: 1}, @@ -78,6 +118,19 @@ def test_initialize_fields(self): ], ) + dedupe.api.ActiveMatching( + [ + {"field": "name", "variable name": "name", "type": "String"}, + { + "field": "age", + "variable name": "age", + "type": "Custom", + "comparator": lambda x, y: 1, + }, + {"type": "Interaction", "interaction variables": ["name", "age"]}, + ], + ) + def test_check_record(self): matcher = dedupe.api.ActiveMatching(self.field_definition) diff --git a/tests/test_core.py b/tests/test_core.py index 2b4fc1c51..9754f1547 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -70,7 +70,7 @@ def setUp(self): def test_score_duplicates(self): scores = dedupe.core.scoreDuplicates( - self.records, self.data_model, self.classifier, 2 + self.records, self.data_model.distances, self.classifier, 2 ) numpy.testing.assert_equal(scores["pairs"], self.desired_scored_pairs["pairs"]) @@ -94,7 +94,7 @@ def test_score_duplicates_with_zeros(self): expected = numpy.array([(["3", "4"], 1)], dtype=dtype) scores = dedupe.core.scoreDuplicates( - records, self.data_model, self.classifier, 2 + records, self.data_model.distances, self.classifier, 2 ) assert isinstance(scores, numpy.memmap) diff --git a/tests/test_labeler.py b/tests/test_labeler.py index 7f9c8df20..30609ffae 100644 --- a/tests/test_labeler.py +++ b/tests/test_labeler.py @@ -43,7 +43,9 @@ def test_AL(self): ({"name": "William", "age": "35"}, {"name": "Jimbo", "age": "21"}), ] EXPECTED_CANDIDATES = {freeze_record_pair(pair) for pair in EXPECTED_CANDIDATES} - active_learner = labeler.DedupeDisagreementLearner(self.data_model, SAMPLE, []) + active_learner = labeler.DedupeDisagreementLearner( + self.data_model.predicates, self.data_model.distances, SAMPLE, [] + ) actual_candidates = set() for i in range(len(EXPECTED_CANDIDATES), 0, -1): assert len(active_learner) == i