Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
5ff8d22
ref: don't pass DataModel into BlockLearners
NickCrews Sep 16, 2022
3f75472
ref: don't store DataModel in DisagreementLearner
NickCrews Sep 16, 2022
7aa69ea
ref: Don't require DataModel in MatchLearner
NickCrews Sep 16, 2022
7e5633a
bugfix:? Copy candidate preds in RecordLinkDisagreementLearner
NickCrews Sep 16, 2022
1d1b871
ref: Remove use of DataModel from active learners
NickCrews Sep 16, 2022
7350aed
ref: Don't use DataModel in core
NickCrews Sep 16, 2022
3c20417
ref: remove __len__ from DataModel
NickCrews Sep 16, 2022
25f91a5
ref: make typifying variables more clear
NickCrews Sep 16, 2022
b2234e8
ref: tweak error raising in typify_variables()
NickCrews Sep 16, 2022
13b9f50
ref: Clarify only_custom logic in typify_variables
NickCrews Sep 16, 2022
78ef7ff
typ: Fix typing of typify_variables()
NickCrews Sep 16, 2022
77b4192
ref: Further rename field to variable in datamodel
NickCrews Sep 16, 2022
497badc
test: Add more tests for interaction variables
NickCrews Sep 16, 2022
c04981d
Ensure unique variable names
NickCrews Sep 16, 2022
3e55496
ref: Remove "type" from non-exposed variables
NickCrews Sep 16, 2022
f32f511
ref: Create InteractionVariables the same as the rest
NickCrews Sep 16, 2022
e169cc2
ref: Move check for empty var def into typify_variables
NickCrews Sep 16, 2022
c181763
Move _load_settings() into separate function
NickCrews Sep 15, 2022
241c0a6
ref: Add versioning to _load_settings()
NickCrews Sep 16, 2022
83a2e89
ref: improve settings loading exception reporting
NickCrews Sep 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions dedupe/_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ class TrainingData(TypedDict):
distinct: List[RecordDictPair]


# Takes pairs of records and generates a (n_samples X n_features) array.
# NOTE: numpy.float64 is used instead of the numpy.float_ alias — the alias
# is identical (it has always pointed at float64) but was removed in NumPy 2.0,
# so the explicit name keeps this alias importable on modern NumPy.
FeaturizerFunction = Callable[
    [Sequence[RecordDictPair]], numpy.typing.NDArray[numpy.float64]
]


class Classifier(Protocol):
"""Takes an array of pairwise distances and computes the likelihood they are a pair."""

Expand Down
69 changes: 48 additions & 21 deletions dedupe/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import sqlite3
import tempfile
import warnings
from io import BytesIO
from typing import TYPE_CHECKING, cast

import numpy
Expand Down Expand Up @@ -107,7 +108,7 @@ def score(self, pairs: RecordPairs) -> Scores:
"""
try:
matches = core.scoreDuplicates(
pairs, self.data_model, self.classifier, self.num_cores
pairs, self.data_model.distances, self.classifier, self.num_cores
)
except RuntimeError:
raise RuntimeError(
Expand Down Expand Up @@ -824,7 +825,7 @@ def score(self, blocks: Blocks) -> Generator[Scores, None, None]:
"""

matches = core.scoreGazette(
blocks, self.data_model, self.classifier, self.num_cores
blocks, self.data_model.distances, self.classifier, self.num_cores
)

return matches
Expand Down Expand Up @@ -978,10 +979,37 @@ def __init__(
"""
super().__init__(num_cores, in_memory, **kwargs)

self.data_model, self.classifier, self.predicates = self._load_settings(
settings_file
)

logger.info("Predicate set:")
for predicate in self.predicates:
logger.info(predicate)

self._fingerprinter = blocking.Fingerprinter(self.predicates)

@classmethod
def _load_settings(
cls, settings_file: BinaryIO
) -> tuple[datamodel.DataModel, Classifier, list[dedupe.predicates.Predicate]]:
# Make a copy so we can read it multiple times
settings_file = BytesIO(settings_file.read())
settings_file.seek(0)
catchall_exception = SettingsFileLoadingException(
"Something has gone wrong with loading the settings file. "
"Try deleting the file"
)
try:
self.data_model = pickle.load(settings_file)
self.classifier = pickle.load(settings_file)
self.predicates = pickle.load(settings_file)
version = pickle.load(settings_file)
if not isinstance(version, int):
settings_file.seek(0)
return cls._load_settings_v0(settings_file)
else:
raise SettingsFileLoadingException(
"Settings file version {} not understood".format(version)
)

except (KeyError, AttributeError):
raise SettingsFileLoadingException(
"This settings file is not compatible with "
Expand All @@ -997,21 +1025,18 @@ def __init__(
"install that library: `pip install rlr`"
)
else:
raise SettingsFileLoadingException(
"Something has gone wrong with loading the settings file. "
"Try deleting the file"
) from exc
except: # noqa: E722
raise SettingsFileLoadingException(
"Something has gone wrong with loading the settings file. "
"Try deleting the file"
)
raise
except Exception as exc:
raise catchall_exception from exc

logger.info("Predicate set:")
for predicate in self.predicates:
logger.info(predicate)

self._fingerprinter = blocking.Fingerprinter(self.predicates)
@staticmethod
def _load_settings_v0(
    settings_file: BinaryIO,
) -> tuple[datamodel.DataModel, Classifier, list[dedupe.predicates.Predicate]]:
    """Read the legacy (unversioned) settings layout.

    The v0 format is three back-to-back pickles — the data model, the
    trained classifier, and the blocking predicates — written in that
    order, so they must be read back in the same order.
    """
    loaded = [pickle.load(settings_file) for _ in range(3)]
    data_model, classifier, predicates = loaded
    return data_model, classifier, predicates


class ActiveMatching(Matching):
Expand Down Expand Up @@ -1325,7 +1350,8 @@ def prepare_training(
examples, y = flatten_training(self.training_pairs)

self.active_learner = labeler.DedupeDisagreementLearner(
self.data_model,
self.data_model.predicates,
self.data_model.distances,
data,
index_include=examples,
)
Expand Down Expand Up @@ -1392,7 +1418,8 @@ def prepare_training(
examples, y = flatten_training(self.training_pairs)

self.active_learner = labeler.RecordLinkDisagreementLearner(
self.data_model,
self.data_model.predicates,
self.data_model.distances,
data_1,
data_2,
index_include=examples,
Expand Down
26 changes: 13 additions & 13 deletions dedupe/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@
Classifier,
ClosableJoinable,
Data,
FeaturizerFunction,
Literal,
MapLike,
RecordID,
RecordIDDType,
RecordPairs,
Scores,
)
from dedupe.datamodel import DataModel

_Queue = Union[multiprocessing.dummy.Queue, multiprocessing.Queue]

Expand All @@ -53,15 +53,15 @@ class BlockingError(Exception):
class ScoreDupes(object):
def __init__(
self,
data_model: DataModel,
featurizer: FeaturizerFunction,
classifier: Classifier,
records_queue: _Queue,
exception_queue: _Queue,
score_file_path: str,
dtype: numpy.dtype,
offset,
):
self.data_model = data_model
self.featurizer = featurizer
self.classifier = classifier
self.records_queue = records_queue
self.exception_queue = exception_queue
Expand All @@ -87,8 +87,8 @@ def fieldDistance(self, record_pairs: RecordPairs) -> None:
if not records:
return

distances = self.data_model.distances(records)
scores = self.classifier.predict_proba(distances)[:, -1]
features = self.featurizer(records)
scores = self.classifier.predict_proba(features)[:, -1]

mask = scores > 0
if not mask.any():
Expand All @@ -113,7 +113,7 @@ def fieldDistance(self, record_pairs: RecordPairs) -> None:

def scoreDuplicates(
record_pairs: RecordPairs,
data_model: DataModel,
featurizer: FeaturizerFunction,
classifier: Classifier,
num_cores: int = 1,
) -> Scores:
Expand Down Expand Up @@ -145,7 +145,7 @@ def scoreDuplicates(

n_map_processes = max(num_cores, 1)
score_records = ScoreDupes(
data_model,
featurizer,
classifier,
record_pairs_queue,
exception_queue,
Expand Down Expand Up @@ -200,15 +200,15 @@ def fillQueue(


class ScoreGazette(object):
def __init__(self, data_model: DataModel, classifier: Classifier):
self.data_model = data_model
def __init__(self, featurizer: FeaturizerFunction, classifier: Classifier):
self.featurizer = featurizer
self.classifier = classifier

def __call__(self, block: Block) -> Scores:
record_ids, records = zip(*(zip(*each) for each in block))

distances = self.data_model.distances(records)
scores = self.classifier.predict_proba(distances)[:, -1]
features = self.featurizer(records)
scores = self.classifier.predict_proba(features)[:, -1]

id_type = sniff_id_type(record_ids)
ids = numpy.array(record_ids, dtype=id_type)
Expand All @@ -227,7 +227,7 @@ def __call__(self, block: Block) -> Scores:

def scoreGazette(
record_pairs: Blocks,
data_model: DataModel,
featurizer: FeaturizerFunction,
classifier: Classifier,
num_cores: int = 1,
) -> Generator[Scores, None, None]:
Expand All @@ -238,7 +238,7 @@ def scoreGazette(

imap, pool = appropriate_imap(num_cores)

score_records = ScoreGazette(data_model, classifier)
score_records = ScoreGazette(featurizer, classifier)

for scored_pairs in imap(score_records, record_pairs):
yield scored_pairs
Expand Down
Loading