Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 66 additions & 8 deletions ami/exports/format_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,24 @@ class OccurrenceTabularSerializer(serializers.ModelSerializer):
determination_score = serializers.FloatField(allow_null=True)
verification_status = serializers.SerializerMethodField()

# Machine prediction fields
best_machine_prediction_name = serializers.CharField(allow_null=True, default=None)
best_machine_prediction_algorithm = serializers.CharField(allow_null=True, default=None)
best_machine_prediction_score = serializers.FloatField(allow_null=True, default=None)

# Verification fields
verified_by = serializers.SerializerMethodField()
participant_count = serializers.IntegerField(default=0)
agreed_with_algorithm = serializers.SerializerMethodField()
agreed_with_user = serializers.SerializerMethodField()
determination_matches_machine_prediction = serializers.SerializerMethodField()

# Detection fields
best_detection_url = serializers.SerializerMethodField()
best_detection_bbox = serializers.SerializerMethodField()
best_detection_width = serializers.SerializerMethodField()
best_detection_height = serializers.SerializerMethodField()
best_detection_source_image_url = serializers.SerializerMethodField()

class Meta:
model = Occurrence
Expand All @@ -111,29 +126,60 @@ class Meta:
"determination_name",
"determination_score",
"verification_status",
"best_machine_prediction_name",
"best_machine_prediction_algorithm",
"best_machine_prediction_score",
"verified_by",
"participant_count",
"agreed_with_algorithm",
"agreed_with_user",
"determination_matches_machine_prediction",
"detections_count",
"first_appearance_timestamp",
"last_appearance_timestamp",
"duration",
"best_detection_url",
"best_detection_bbox",
"best_detection_width",
"best_detection_height",
"best_detection_source_image_url",
]

def get_verification_status(self, obj):
    """
    Report whether the occurrence has at least one non-withdrawn identification.

    The ``participant_count`` attribute is used when it is present on ``obj``
    (presumably a queryset annotation counting identifications -- confirm
    against the exporter's queryset), which avoids one query per row. When the
    attribute is missing or ``None`` the identifications relation is queried
    directly, excluding withdrawn identifications.

    Args:
        obj: The occurrence being serialized.

    Returns:
        ``"Verified"`` or ``"Not verified"``.
    """
    count = getattr(obj, "participant_count", None)
    if count is not None:
        return "Verified" if count > 0 else "Not verified"

    # Fallback for objects without the annotation. Withdrawn identifications
    # must not count as verification, so filter them out before checking.
    has_active_identification = obj.identifications.filter(withdrawn=False).exists()
    return "Verified" if has_active_identification else "Not verified"
Comment thread
mihow marked this conversation as resolved.
Outdated

def get_verified_by(self, obj):
    """
    Return the ``verified_by_name`` attribute of ``obj``, or ``None`` when absent.

    The attribute is presumably annotated by the export queryset with the
    display name of the user behind the best identification -- confirm there.
    """
    try:
        return obj.verified_by_name
    except AttributeError:
        return None

def get_agreed_with_algorithm(self, obj):
    """
    Return the ``agreed_with_algorithm_name`` attribute of ``obj``, or ``None``.

    Presumably the name of the algorithm whose prediction the identifier
    explicitly agreed with, annotated by the export queryset -- confirm there.
    """
    try:
        return obj.agreed_with_algorithm_name
    except AttributeError:
        return None

def get_agreed_with_user(self, obj):
    """
    Return the ``agreed_with_user_email`` attribute of ``obj``, or ``None``.

    Presumably the email of the earlier identifier that the best
    identification explicitly agreed with, annotated by the export queryset
    -- confirm there.
    """
    try:
        return obj.agreed_with_user_email
    except AttributeError:
        return None
Comment thread
mihow marked this conversation as resolved.

def get_determination_matches_machine_prediction(self, obj):
    """
    Tell whether the determination taxon equals the best machine prediction taxon.

    Returns:
        ``True`` or ``False`` when both ``best_machine_prediction_taxon_id``
        and ``determination_id`` are set on ``obj``; ``None`` when either one
        is missing, since no comparison can be made.
    """
    predicted_taxon_id = getattr(obj, "best_machine_prediction_taxon_id", None)
    # The determination is only read once a prediction is known to exist.
    if predicted_taxon_id is not None and obj.determination_id is not None:
        return obj.determination_id == predicted_taxon_id
    return None

def get_best_detection_url(self, obj):
    """
    Return the media URL of the cropped detection image.

    Reads ``best_detection_path`` from ``obj`` and passes it to
    ``get_media_url``. Returns ``None`` when the path is missing or empty.
    """
    crop_path = getattr(obj, "best_detection_path", None)
    if not crop_path:
        return None
    return get_media_url(crop_path)

def get_best_detection_bbox(self, obj):
    """
    Return the ``best_detection_bbox`` attribute of ``obj`` unchanged, or ``None``.

    The value is presumably the raw ``[x1, y1, x2, y2]`` coordinates of the
    best detection -- confirm against the queryset annotation.
    """
    try:
        return obj.best_detection_bbox
    except AttributeError:
        return None

def get_best_detection_width(self, obj):
"""Returns the width of the detection bounding box."""
bbox = BoundingBox.from_coords(getattr(obj, "best_detection_bbox", None), raise_on_error=False)
Expand All @@ -144,6 +190,16 @@ def get_best_detection_height(self, obj):
bbox = BoundingBox.from_coords(getattr(obj, "best_detection_bbox", None), raise_on_error=False)
return bbox.height if bbox else None

def get_best_detection_source_image_url(self, obj):
    """
    Build the public URL of the original source image for the best detection.

    Joins ``best_detection_source_image_public_base_url`` with
    ``best_detection_source_image_path``, both read from ``obj``.

    Args:
        obj: The occurrence being serialized.

    Returns:
        The joined URL, or ``None`` when either the path or the base URL is
        missing or empty.
    """
    import urllib.parse

    path = getattr(obj, "best_detection_source_image_path", None)
    base_url = getattr(obj, "best_detection_source_image_public_base_url", None)
    if not (path and base_url):
        return None

    # urljoin() resolves relative to the *directory* of the base, so a base
    # such as "https://host/bucket/prefix" would lose its "prefix" segment.
    # Force a trailing slash so the whole base is kept.
    if not base_url.endswith("/"):
        base_url += "/"
    # Strip leading slashes so the path is joined relative to the base
    # instead of replacing the base's path entirely.
    return urllib.parse.urljoin(base_url, path.lstrip("/"))


class CSVExporter(BaseExporter):
"""Handles CSV export of occurrences."""
Expand All @@ -165,6 +221,8 @@ def get_queryset(self):
.with_detections_count()
.with_identifications()
.with_best_detection() # type: ignore[union-attr] Custom queryset method
.with_best_machine_prediction()
.with_verification_info()
)

def export(self):
Expand Down
251 changes: 250 additions & 1 deletion ami/exports/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from rest_framework.test import APIClient

from ami.exports.models import DataExport
from ami.main.models import Occurrence, SourceImageCollection
from ami.main.models import Detection, Identification, Occurrence, SourceImageCollection, Taxon
from ami.ml.models import Algorithm
from ami.tests.fixtures.main import (
create_captures,
create_occurrences,
Expand Down Expand Up @@ -302,3 +303,251 @@ def test_non_member_cannot_create_export(self):
self.non_member.has_perm(Project.Permissions.CREATE_DATA_EXPORT, self.project),
"Non-member should not have create_dataexport permission",
)


class ExportNewFieldsTest(TestCase):
"""Test the new machine prediction, verification, and detection fields in CSV exports."""

def setUp(self):
    """
    Prepare a project, an authenticated API client, captures, taxa and an algorithm.

    Two taxa are stored as ``taxon_a`` and ``taxon_b`` so tests can model a
    human identification that differs from the machine prediction.
    """
    self.project, self.deployment = setup_test_project(reuse=False)

    owner = self.project.owner
    owner.name = "Test Verifier"
    owner.save()
    self.user = owner

    self.client = APIClient()
    self.client.force_authenticate(user=self.user)

    create_captures(deployment=self.deployment, num_nights=1, images_per_night=4, interval_minutes=1)
    group_images_into_events(self.deployment)
    create_taxa(self.project)

    # Algorithm that the test classifications are attributed to.
    self.algorithm = Algorithm.objects.get_or_create(
        name="test-classifier",
        defaults={"key": "test-classifier"},
    )[0]

    # Pick two taxa from the project; create the second one if only one exists.
    self.taxa = list(Taxon.objects.filter(projects=self.project)[:2])
    self.taxon_a = self.taxa[0]
    if len(self.taxa) < 2:
        self.taxon_b = Taxon.objects.create(name="Test Taxon B")
        self.taxon_b.projects.add(self.project)
    else:
        self.taxon_b = self.taxa[1]

def _create_occurrence_with_prediction(self, taxon=None, score=0.85):
    """
    Build an occurrence backed by one detection and one terminal ML classification.

    Args:
        taxon: Taxon to predict; ``self.taxon_a`` is used when falsy.
        score: Score assigned to the classification.

    Returns:
        A ``(occurrence, classification)`` tuple.
    """
    predicted_taxon = taxon or self.taxon_a
    capture = self.project.captures.first()

    detection = Detection.objects.create(
        source_image=capture,
        timestamp=capture.timestamp,
        bbox=[0.1, 0.1, 0.5, 0.5],
        path="detections/test.jpg",
    )
    classification_kwargs = {
        "taxon": predicted_taxon,
        "score": score,
        "timestamp": capture.timestamp,
        "algorithm": self.algorithm,
        "terminal": True,
    }
    classification = detection.classifications.create(**classification_kwargs)

    return detection.associate_new_occurrence(), classification

def _run_csv_export(self):
    """
    Run a CSV export for the test project and return its rows.

    The exported file is always removed from ``default_storage`` afterwards,
    including when opening or parsing it fails, so a failing test does not
    leave export files behind.

    Returns:
        A list with one dict per CSV row, keyed by column header.
    """
    data_export = DataExport.objects.create(
        user=self.user,
        project=self.project,
        format="occurrences_simple_csv",
        job=None,
    )
    file_url = data_export.run_export()
    self.assertIsNotNone(file_url)

    # Turn the returned URL into a path for default_storage.
    file_path = file_url.replace("/media/", "")
    try:
        with default_storage.open(file_path, "r") as f:
            return list(csv.DictReader(f))
    finally:
        default_storage.delete(file_path)

def test_ml_prediction_only(self):
    """
    Occurrence with only an ML prediction and no human identification.

    The machine prediction columns must be populated, ``verified_by`` must be
    empty and ``participant_count`` must be zero.
    """
    # The classification itself is not needed by this test.
    occurrence, _ = self._create_occurrence_with_prediction()
    rows = self._run_csv_export()

    row = next(r for r in rows if int(r["id"]) == occurrence.pk)
    self.assertEqual(row["best_machine_prediction_name"], self.taxon_a.name)
    self.assertEqual(row["best_machine_prediction_algorithm"], "test-classifier")
    self.assertAlmostEqual(float(row["best_machine_prediction_score"]), 0.85, places=2)
    self.assertEqual(row["verified_by"], "")
    self.assertEqual(row["participant_count"], "0")

def test_ml_prediction_with_agreeing_human(self):
    """
    A human explicitly agrees with the ML prediction.

    The machine prediction columns stay populated, the verification columns
    are filled in, and the determination matches the machine prediction.
    """
    occurrence, classification = self._create_occurrence_with_prediction()

    # The identification uses the predicted taxon and points at the prediction.
    agreement = {
        "user": self.user,
        "taxon": self.taxon_a,
        "occurrence": occurrence,
        "agreed_with_prediction": classification,
    }
    Identification.objects.create(**agreement)

    exported = self._run_csv_export()
    row = next(r for r in exported if int(r["id"]) == occurrence.pk)

    # Machine prediction columns
    self.assertEqual(row["best_machine_prediction_name"], self.taxon_a.name)
    self.assertAlmostEqual(float(row["best_machine_prediction_score"]), 0.85, places=2)

    # Verification columns
    self.assertTrue(row["verified_by"], "verified_by should not be empty")
    self.assertEqual(row["participant_count"], "1")
    self.assertEqual(row["agreed_with_algorithm"], "test-classifier")
    self.assertEqual(row["determination_matches_machine_prediction"], "True")

def test_ml_prediction_with_disagreeing_human(self):
    """
    A human identifies the occurrence as a different taxon than the ML prediction.

    The machine prediction columns keep the original prediction, the
    determination becomes the human's taxon, and no algorithm agreement is
    reported.
    """
    # The classification itself is not needed by this test.
    occurrence, _ = self._create_occurrence_with_prediction(taxon=self.taxon_a)

    # Human identifies as a different taxon
    Identification.objects.create(
        user=self.user,
        taxon=self.taxon_b,
        occurrence=occurrence,
    )

    rows = self._run_csv_export()
    row = next(r for r in rows if int(r["id"]) == occurrence.pk)

    # Machine prediction still shows the original taxon
    self.assertEqual(row["best_machine_prediction_name"], self.taxon_a.name)
    # Determination is now the human's choice
    self.assertEqual(row["determination_name"], self.taxon_b.name)
    self.assertEqual(row["determination_matches_machine_prediction"], "False")
    self.assertEqual(row["agreed_with_algorithm"], "")

def test_human_agrees_with_another_human(self):
    """
    User B agrees with user A's identification.

    ``agreed_with_user`` must contain user A's email and
    ``agreed_with_algorithm`` must stay empty.
    """
    from ami.users.models import User

    first_user = User.objects.create_user(email="user-a@test.org")
    second_user = User.objects.create_user(email="user-b@test.org")

    occurrence, _ = self._create_occurrence_with_prediction()

    original_identification = Identification.objects.create(
        user=first_user,
        taxon=self.taxon_b,
        occurrence=occurrence,
    )
    # Second identification: same taxon, explicitly agreeing with the first one.
    Identification.objects.create(
        user=second_user,
        taxon=self.taxon_b,
        occurrence=occurrence,
        agreed_with_identification=original_identification,
    )

    exported = self._run_csv_export()
    row = next(r for r in exported if int(r["id"]) == occurrence.pk)

    self.assertEqual(row["agreed_with_user"], "user-a@test.org")
    # The agreement is with a person, not with an ML prediction.
    self.assertEqual(row["agreed_with_algorithm"], "")

def test_multiple_identifications_count(self):
    """
    Two users identify the same occurrence: ``participant_count`` must be 2.

    Only non-withdrawn identifications are created here, so this test does
    not cover how withdrawn identifications are counted.
    """
    occurrence, _ = self._create_occurrence_with_prediction()

    from ami.users.models import User

    user2 = User.objects.create_user(email="verifier2@test.org")

    Identification.objects.create(user=self.user, taxon=self.taxon_a, occurrence=occurrence)
    Identification.objects.create(user=user2, taxon=self.taxon_a, occurrence=occurrence)

    rows = self._run_csv_export()
    row = next(r for r in rows if int(r["id"]) == occurrence.pk)
    self.assertEqual(row["participant_count"], "2")

def test_detection_bbox_field(self):
    """The export row carries a ``best_detection_bbox`` column with the bbox values."""
    occurrence, _ = self._create_occurrence_with_prediction()

    exported = self._run_csv_export()
    matching_row = next(r for r in exported if int(r["id"]) == occurrence.pk)

    self.assertIn("best_detection_bbox", matching_row)
    # The bbox is exported as text, so look for one of its coordinates.
    bbox_text = matching_row["best_detection_bbox"]
    self.assertIn("0.1", bbox_text)

def test_api_and_csv_pick_same_best_prediction_with_mixed_terminal(self):
    """
    The CSV columns and ``Occurrence.best_prediction`` must pick the same classification.

    The detection gets a non-terminal classification with the higher score
    and a terminal classification with the lower score. The terminal one is
    expected to win both in the exported ``best_machine_prediction_*``
    columns and in ``occurrence.best_prediction``.
    """
    alg_intermediate = Algorithm.objects.get_or_create(
        name="intermediate-classifier", defaults={"key": "intermediate-classifier"}
    )[0]
    alg_terminal = Algorithm.objects.get_or_create(
        name="terminal-classifier", defaults={"key": "terminal-classifier"}
    )[0]

    capture = self.project.captures.first()
    detection = Detection.objects.create(
        source_image=capture,
        timestamp=capture.timestamp,
        bbox=[0.1, 0.1, 0.5, 0.5],
        path="detections/mixed.jpg",
    )

    # Higher score, non-terminal: created first, expected to lose.
    detection.classifications.create(
        taxon=self.taxon_a,
        score=0.95,
        timestamp=capture.timestamp,
        algorithm=alg_intermediate,
        terminal=False,
    )
    # Lower score, terminal: expected to win.
    detection.classifications.create(
        taxon=self.taxon_b,
        score=0.80,
        timestamp=capture.timestamp,
        algorithm=alg_terminal,
        terminal=True,
    )
    occurrence = detection.associate_new_occurrence()

    exported = self._run_csv_export()
    row = next(r for r in exported if int(r["id"]) == occurrence.pk)

    self.assertEqual(row["best_machine_prediction_name"], self.taxon_b.name)
    self.assertEqual(row["best_machine_prediction_algorithm"], "terminal-classifier")
    self.assertAlmostEqual(float(row["best_machine_prediction_score"]), 0.80, places=2)

    occurrence.refresh_from_db()
    best = occurrence.best_prediction
    self.assertIsNotNone(best)
    self.assertEqual(best.taxon_id, self.taxon_b.pk)
    self.assertEqual(best.algorithm.name, "terminal-classifier")

def test_csv_has_all_new_fields(self):
    """Every newly added export field appears as a CSV column header."""
    self._create_occurrence_with_prediction()

    exported = self._run_csv_export()
    self.assertGreater(len(exported), 0)

    # All rows share the same keys, so the first row stands in for the header.
    headers = exported[0].keys()
    for field in (
        "best_machine_prediction_name",
        "best_machine_prediction_algorithm",
        "best_machine_prediction_score",
        "verified_by",
        "participant_count",
        "agreed_with_algorithm",
        "agreed_with_user",
        "determination_matches_machine_prediction",
        "best_detection_bbox",
        "best_detection_source_image_url",
    ):
        self.assertIn(field, headers, f"Missing CSV field: {field}")
Loading
Loading