Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions dedupe/_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ class TrainingData(TypedDict):
distinct: List[RecordDictPair]


# Takes pairs of records and generates a (n_samples X n_features) array
FeaturizerFunction = Callable[
[Sequence[RecordDictPair]], numpy.typing.NDArray[numpy.float_]
]


class Classifier(Protocol):
"""Takes an array of pairwise distances and computes the likelihood they are a pair."""

Expand Down
10 changes: 6 additions & 4 deletions dedupe/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def score(self, pairs: RecordPairs) -> Scores:
"""
try:
matches = core.scoreDuplicates(
pairs, self.data_model, self.classifier, self.num_cores
pairs, self.data_model.distances, self.classifier, self.num_cores
)
except RuntimeError:
raise RuntimeError(
Expand Down Expand Up @@ -824,7 +824,7 @@ def score(self, blocks: Blocks) -> Generator[Scores, None, None]:
"""

matches = core.scoreGazette(
blocks, self.data_model, self.classifier, self.num_cores
blocks, self.data_model.distances, self.classifier, self.num_cores
)

return matches
Expand Down Expand Up @@ -1325,7 +1325,8 @@ def prepare_training(
examples, y = flatten_training(self.training_pairs)

self.active_learner = labeler.DedupeDisagreementLearner(
self.data_model,
self.data_model.predicates,
self.data_model.distances,
data,
index_include=examples,
)
Expand Down Expand Up @@ -1392,7 +1393,8 @@ def prepare_training(
examples, y = flatten_training(self.training_pairs)

self.active_learner = labeler.RecordLinkDisagreementLearner(
self.data_model,
self.data_model.predicates,
self.data_model.distances,
data_1,
data_2,
index_include=examples,
Expand Down
26 changes: 13 additions & 13 deletions dedupe/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@
Classifier,
ClosableJoinable,
Data,
FeaturizerFunction,
Literal,
MapLike,
RecordID,
RecordIDDType,
RecordPairs,
Scores,
)
from dedupe.datamodel import DataModel

_Queue = Union[multiprocessing.dummy.Queue, multiprocessing.Queue]

Expand All @@ -53,15 +53,15 @@ class BlockingError(Exception):
class ScoreDupes(object):
def __init__(
self,
data_model: DataModel,
featurizer: FeaturizerFunction,
classifier: Classifier,
records_queue: _Queue,
exception_queue: _Queue,
score_file_path: str,
dtype: numpy.dtype,
offset,
):
self.data_model = data_model
self.featurizer = featurizer
self.classifier = classifier
self.records_queue = records_queue
self.exception_queue = exception_queue
Expand All @@ -87,8 +87,8 @@ def fieldDistance(self, record_pairs: RecordPairs) -> None:
if not records:
return

distances = self.data_model.distances(records)
scores = self.classifier.predict_proba(distances)[:, -1]
features = self.featurizer(records)
scores = self.classifier.predict_proba(features)[:, -1]

mask = scores > 0
if not mask.any():
Expand All @@ -113,7 +113,7 @@ def fieldDistance(self, record_pairs: RecordPairs) -> None:

def scoreDuplicates(
record_pairs: RecordPairs,
data_model: DataModel,
featurizer: FeaturizerFunction,
classifier: Classifier,
num_cores: int = 1,
) -> Scores:
Expand Down Expand Up @@ -145,7 +145,7 @@ def scoreDuplicates(

n_map_processes = max(num_cores, 1)
score_records = ScoreDupes(
data_model,
featurizer,
classifier,
record_pairs_queue,
exception_queue,
Expand Down Expand Up @@ -200,15 +200,15 @@ def fillQueue(


class ScoreGazette(object):
def __init__(self, data_model: DataModel, classifier: Classifier):
self.data_model = data_model
def __init__(self, featurizer: FeaturizerFunction, classifier: Classifier):
self.featurizer = featurizer
self.classifier = classifier

def __call__(self, block: Block) -> Scores:
record_ids, records = zip(*(zip(*each) for each in block))

distances = self.data_model.distances(records)
scores = self.classifier.predict_proba(distances)[:, -1]
features = self.featurizer(records)
scores = self.classifier.predict_proba(features)[:, -1]

id_type = sniff_id_type(record_ids)
ids = numpy.array(record_ids, dtype=id_type)
Expand All @@ -227,7 +227,7 @@ def __call__(self, block: Block) -> Scores:

def scoreGazette(
record_pairs: Blocks,
data_model: DataModel,
featurizer: FeaturizerFunction,
classifier: Classifier,
num_cores: int = 1,
) -> Generator[Scores, None, None]:
Expand All @@ -238,7 +238,7 @@ def scoreGazette(

imap, pool = appropriate_imap(num_cores)

score_records = ScoreGazette(data_model, classifier)
score_records = ScoreGazette(featurizer, classifier)

for scored_pairs in imap(score_records, record_pairs):
yield scored_pairs
Expand Down
56 changes: 24 additions & 32 deletions dedupe/labeler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
if TYPE_CHECKING:
from typing import Dict, Iterable, Literal, Mapping

from dedupe._typing import Data, Labels, LabelsLike
from dedupe._typing import Data, FeaturizerFunction, Labels, LabelsLike
from dedupe._typing import RecordDictPair as TrainingExample
from dedupe._typing import RecordDictPairs as TrainingExamples
from dedupe._typing import RecordIDPair
from dedupe.datamodel import DataModel
from dedupe.predicates import Predicate


Expand Down Expand Up @@ -70,33 +69,25 @@ def _verify_fit_args(pairs: TrainingExamples, y: LabelsLike) -> list[Literal[0,


class MatchLearner(Learner):
def __init__(self, data_model: DataModel, candidates: TrainingExamples):
self.data_model = data_model
def __init__(self, featurizer: FeaturizerFunction, candidates: TrainingExamples):
self._featurizer = featurizer
self._candidates = candidates.copy()
self._classifier = sklearn.linear_model.LogisticRegression()
self._distances = self._calc_distances(self.candidates)
self._features = self._featurizer(self.candidates)

def fit(self, pairs: TrainingExamples, y: LabelsLike) -> None:
y = self._verify_fit_args(pairs, y)
self._classifier.fit(self._calc_distances(pairs), numpy.array(y))
self._classifier.fit(self._featurizer(pairs), numpy.array(y))
self._fitted = True

def remove(self, index: int) -> None:
self._candidates.pop(index)
self._distances = numpy.delete(self._distances, index, axis=0)
self._features = numpy.delete(self._features, index, axis=0)

def candidate_scores(self) -> numpy.typing.NDArray[numpy.float_]:
if not self._fitted:
raise ValueError("Must call fit() before candidate_scores()")
scores: numpy.typing.NDArray[numpy.float_] = self._classifier.predict_proba(
self._distances
)[:, 1].reshape(-1, 1)
return scores

def _calc_distances(
self, pairs: TrainingExamples
) -> numpy.typing.NDArray[numpy.float_]:
return self.data_model.distances(pairs)
return self._classifier.predict_proba(self._features)[:, 1].reshape(-1, 1)


class BlockLearner(Learner):
Expand Down Expand Up @@ -216,7 +207,7 @@ def _filter_canopy_predicates(
class DedupeBlockLearner(BlockLearner):
def __init__(
self,
data_model: DataModel,
candidate_predicates: Iterable[Predicate],
data: Data,
index_include: TrainingExamples,
):
Expand All @@ -228,7 +219,7 @@ def __init__(
index_data = sample_records(data, 50000)
sampled_records = sample_records(index_data, N_SAMPLED_RECORDS)

preds = _filter_canopy_predicates(data_model.predicates, canopies=True)
preds = _filter_canopy_predicates(candidate_predicates, canopies=True)
self.block_learner = training.DedupeBlockLearner(
preds, sampled_records, index_data
)
Expand Down Expand Up @@ -268,7 +259,7 @@ def _sample(self, data: Data, sample_size: int) -> TrainingExamples:
class RecordLinkBlockLearner(BlockLearner):
def __init__(
self,
data_model: DataModel,
candidate_predicates: Iterable[Predicate],
data_1: Data,
data_2: Data,
index_include: TrainingExamples,
Expand All @@ -282,7 +273,7 @@ def __init__(
index_data = sample_records(data_2, 50000)
sampled_records_2 = sample_records(index_data, N_SAMPLED_RECORDS)

preds = _filter_canopy_predicates(data_model.predicates, canopies=False)
preds = _filter_canopy_predicates(candidate_predicates, canopies=False)
self.block_learner = training.RecordLinkBlockLearner(
preds, sampled_records_1, sampled_records_2, index_data
)
Expand Down Expand Up @@ -324,8 +315,7 @@ class DisagreementLearner(HasCandidates):
matcher: MatchLearner
blocker: BlockLearner

def __init__(self, data_model: DataModel) -> None:
self.data_model = data_model
def __init__(self) -> None:
self.y: numpy.typing.NDArray[numpy.int_] = numpy.array([])
self.pairs: TrainingExamples = []

Expand Down Expand Up @@ -383,12 +373,12 @@ def learn_predicates(
class DedupeDisagreementLearner(DisagreementLearner):
def __init__(
self,
data_model: DataModel,
candidate_predicates: Iterable[Predicate],
featurizer: FeaturizerFunction,
data: Data,
index_include: TrainingExamples,
):
super().__init__(data_model)

super().__init__()
data = core.index(data)

random_pair = (
Expand All @@ -400,11 +390,11 @@ def __init__(
index_include = index_include.copy()
index_include.append(exact_match)

self.blocker = DedupeBlockLearner(data_model, data, index_include)
self.blocker = DedupeBlockLearner(candidate_predicates, data, index_include)

self._candidates = self.blocker.candidates.copy()

self.matcher = MatchLearner(self.data_model, self.candidates)
self.matcher = MatchLearner(featurizer, self.candidates)

examples = [exact_match] * 4 + [random_pair]
labels: Labels = [1] * 4 + [0] # type: ignore[assignment]
Expand All @@ -414,13 +404,13 @@ def __init__(
class RecordLinkDisagreementLearner(DisagreementLearner):
def __init__(
self,
data_model: DataModel,
candidate_predicates: Iterable[Predicate],
featurizer: FeaturizerFunction,
data_1: Data,
data_2: Data,
index_include: TrainingExamples,
):
super().__init__(data_model)

super().__init__()
data_1 = core.index(data_1)

offset = len(data_1)
Expand All @@ -435,10 +425,12 @@ def __init__(
index_include = index_include.copy()
index_include.append(exact_match)

self.blocker = RecordLinkBlockLearner(data_model, data_1, data_2, index_include)
self.blocker = RecordLinkBlockLearner(
candidate_predicates, data_1, data_2, index_include
)
self._candidates = self.blocker.candidates.copy()

self.matcher = MatchLearner(self.data_model, self.candidates)
self.matcher = MatchLearner(featurizer, self.candidates)

examples = [exact_match] * 4 + [random_pair]
labels: Labels = [1] * 4 + [0] # type: ignore[assignment]
Expand Down
4 changes: 2 additions & 2 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def setUp(self):

def test_score_duplicates(self):
scores = dedupe.core.scoreDuplicates(
self.records, self.data_model, self.classifier, 2
self.records, self.data_model.distances, self.classifier, 2
)

numpy.testing.assert_equal(scores["pairs"], self.desired_scored_pairs["pairs"])
Expand All @@ -94,7 +94,7 @@ def test_score_duplicates_with_zeros(self):
expected = numpy.array([(["3", "4"], 1)], dtype=dtype)

scores = dedupe.core.scoreDuplicates(
records, self.data_model, self.classifier, 2
records, self.data_model.distances, self.classifier, 2
)

assert isinstance(scores, numpy.memmap)
Expand Down
4 changes: 3 additions & 1 deletion tests/test_labeler.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ def test_AL(self):
({"name": "William", "age": "35"}, {"name": "Jimbo", "age": "21"}),
]
EXPECTED_CANDIDATES = {freeze_record_pair(pair) for pair in EXPECTED_CANDIDATES}
active_learner = labeler.DedupeDisagreementLearner(self.data_model, SAMPLE, [])
active_learner = labeler.DedupeDisagreementLearner(
self.data_model.predicates, self.data_model.distances, SAMPLE, []
)
actual_candidates = set()
for i in range(len(EXPECTED_CANDIDATES), 0, -1):
assert len(active_learner) == i
Expand Down