Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 74 additions & 8 deletions ami/exports/format_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,24 @@ class OccurrenceTabularSerializer(serializers.ModelSerializer):
determination_score = serializers.FloatField(allow_null=True)
verification_status = serializers.SerializerMethodField()

# Machine prediction fields
best_machine_prediction_name = serializers.CharField(allow_null=True, default=None)
best_machine_prediction_algorithm = serializers.CharField(allow_null=True, default=None)
best_machine_prediction_score = serializers.FloatField(allow_null=True, default=None)

# Verification fields
verified_by = serializers.SerializerMethodField()
verified_by_count = serializers.IntegerField(default=0)
agreed_with_algorithm = serializers.SerializerMethodField()
determination_matches_machine_prediction = serializers.SerializerMethodField()

# Detection fields
best_detection_url = serializers.SerializerMethodField()
best_detection_bbox = serializers.SerializerMethodField()
best_detection_width = serializers.SerializerMethodField()
best_detection_height = serializers.SerializerMethodField()
best_detection_source_image_url = serializers.SerializerMethodField()
best_detection_occurrence_url = serializers.SerializerMethodField()

class Meta:
model = Occurrence
Expand All @@ -111,29 +126,56 @@ class Meta:
"determination_name",
"determination_score",
"verification_status",
"best_machine_prediction_name",
"best_machine_prediction_algorithm",
"best_machine_prediction_score",
"verified_by",
"verified_by_count",
"agreed_with_algorithm",
"determination_matches_machine_prediction",
"detections_count",
"first_appearance_timestamp",
"last_appearance_timestamp",
"duration",
"best_detection_url",
"best_detection_bbox",
"best_detection_width",
"best_detection_height",
"best_detection_source_image_url",
"best_detection_occurrence_url",
]

def get_verification_status(self, obj):
    """Return "Verified" when the occurrence has at least one non-withdrawn identification.

    Prefers the ``verified_by_count`` annotation (added by the export queryset's
    ``with_verification_info()``) to avoid a per-row query; falls back to a direct
    query when the annotation is absent.
    """
    count = getattr(obj, "verified_by_count", None)
    if count is not None:
        return "Verified" if count > 0 else "Not verified"
    # Fallback for unannotated instances; withdrawn identifications do not count.
    return "Verified" if obj.identifications.filter(withdrawn=False).exists() else "Not verified"
Comment thread
mihow marked this conversation as resolved.
Outdated

def get_verified_by(self, obj):
    """Return the display name of the user behind the best identification, if annotated."""
    verifier_name = getattr(obj, "verified_by_name", None)
    return verifier_name

def get_agreed_with_algorithm(self, obj):
    """Return the algorithm name when the identifier explicitly agreed with an ML prediction."""
    algorithm_name = getattr(obj, "agreed_with_algorithm_name", None)
    return algorithm_name

def get_determination_matches_machine_prediction(self, obj):
    """Return whether the determination taxon matches the best machine-predicted taxon.

    Returns ``None`` (unknown) when either the prediction's taxon or the
    determination is missing, otherwise a bool.
    """
    prediction_taxon_id = getattr(obj, "best_machine_prediction_taxon_id", None)
    if prediction_taxon_id is None or obj.determination_id is None:
        return None
    return obj.determination_id == prediction_taxon_id

def get_best_detection_url(self, obj):
    """Return the full URL to the cropped detection image, or None.

    Uses the ``best_detection_path`` annotation supplied by the export queryset's
    ``with_best_detection()``.
    """
    path = getattr(obj, "best_detection_path", None)
    return get_media_url(path) if path else None

def get_best_detection_bbox(self, obj):
    """Return the raw bounding-box coordinates ``[x1, y1, x2, y2]``, if annotated."""
    bbox = getattr(obj, "best_detection_bbox", None)
    return bbox

def get_best_detection_width(self, obj):
"""Returns the width of the detection bounding box."""
bbox = BoundingBox.from_coords(getattr(obj, "best_detection_bbox", None), raise_on_error=False)
Expand All @@ -144,6 +186,28 @@ def get_best_detection_height(self, obj):
bbox = BoundingBox.from_coords(getattr(obj, "best_detection_bbox", None), raise_on_error=False)
return bbox.height if bbox else None

def get_best_detection_source_image_url(self, obj):
    """Return the public URL to the original source image, or None if unavailable.

    Joins the project's public base URL with the image's storage path. The base
    URL is normalized to end with "/" before calling ``urljoin`` — otherwise
    ``urljoin`` would replace the base URL's final path segment instead of
    appending the image path to it.
    """
    import urllib.parse

    path = getattr(obj, "best_detection_source_image_path", None)
    base_url = getattr(obj, "best_detection_source_image_public_base_url", None)
    if path and base_url:
        if not base_url.endswith("/"):
            base_url += "/"
        return urllib.parse.urljoin(base_url, path.lstrip("/"))
    return None

def get_best_detection_occurrence_url(self, obj):
    """Return the platform UI link to this occurrence in its session context, or None."""
    session_id = getattr(obj, "best_detection_event_id", None)
    capture_id = getattr(obj, "best_detection_source_image_id", None)
    if not (session_id and capture_id):
        return None
    # @TODO use settings for base URL instead of hardcoding
    session_url = f"https://app.preview.insectai.org/sessions/{session_id}"
    return f"{session_url}?capture={capture_id}&occurrence={obj.pk}"


class CSVExporter(BaseExporter):
"""Handles CSV export of occurrences."""
Expand All @@ -165,6 +229,8 @@ def get_queryset(self):
.with_detections_count()
.with_identifications()
.with_best_detection() # type: ignore[union-attr] Custom queryset method
.with_best_machine_prediction()
.with_verification_info()
)

def export(self):
Expand Down
186 changes: 185 additions & 1 deletion ami/exports/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from rest_framework.test import APIClient

from ami.exports.models import DataExport
from ami.main.models import Occurrence, SourceImageCollection
from ami.main.models import Detection, Identification, Occurrence, SourceImageCollection, Taxon
from ami.ml.models import Algorithm
from ami.tests.fixtures.main import (
create_captures,
create_occurrences,
Expand Down Expand Up @@ -302,3 +303,186 @@ def test_non_member_cannot_create_export(self):
self.non_member.has_perm(Project.Permissions.CREATE_DATA_EXPORT, self.project),
"Non-member should not have create_dataexport permission",
)


class ExportNewFieldsTest(TestCase):
    """Test the new machine prediction, verification, and detection fields in CSV exports."""

    def setUp(self):
        # Fresh project/deployment per test; the project owner acts as the verifying user.
        self.project, self.deployment = setup_test_project(reuse=False)
        self.user = self.project.owner
        self.user.name = "Test Verifier"
        self.user.save()
        self.client = APIClient()
        self.client.force_authenticate(user=self.user)

        create_captures(deployment=self.deployment, num_nights=1, images_per_night=4, interval_minutes=1)
        group_images_into_events(self.deployment)
        create_taxa(self.project)

        # Create an algorithm for classifications
        self.algorithm, _ = Algorithm.objects.get_or_create(
            name="test-classifier",
            defaults={"key": "test-classifier"},
        )

        # Create a second taxon for disagreement tests
        self.taxa = list(Taxon.objects.filter(projects=self.project)[:2])
        self.taxon_a = self.taxa[0]
        if len(self.taxa) > 1:
            self.taxon_b = self.taxa[1]
        else:
            self.taxon_b = Taxon.objects.create(name="Test Taxon B")
            self.taxon_b.projects.add(self.project)

    def _create_occurrence_with_prediction(self, taxon=None, score=0.85):
        """Create an occurrence with a single detection and ML classification."""
        taxon = taxon or self.taxon_a
        source_image = self.project.captures.first()
        detection = Detection.objects.create(
            source_image=source_image,
            timestamp=source_image.timestamp,
            bbox=[0.1, 0.1, 0.5, 0.5],
            path="detections/test.jpg",
        )
        classification = detection.classifications.create(
            taxon=taxon,
            score=score,
            timestamp=source_image.timestamp,
            algorithm=self.algorithm,
            terminal=True,
        )
        occurrence = detection.associate_new_occurrence()
        return occurrence, classification

    def _run_csv_export(self):
        """Run a CSV export and return the rows as a list of dicts."""
        data_export = DataExport.objects.create(
            user=self.user,
            project=self.project,
            format="occurrences_simple_csv",
            job=None,
        )
        file_url = data_export.run_export()
        self.assertIsNotNone(file_url)
        # Convert the media URL back into a storage-relative path before opening.
        file_path = file_url.replace("/media/", "")
        with default_storage.open(file_path, "r") as f:
            rows = list(csv.DictReader(f))
        default_storage.delete(file_path)
        return rows

    def test_ml_prediction_only(self):
        """Occurrence with only ML prediction: machine prediction fields populated, verified_by null."""
        occurrence, classification = self._create_occurrence_with_prediction()
        rows = self._run_csv_export()

        row = next(r for r in rows if int(r["id"]) == occurrence.pk)
        self.assertEqual(row["best_machine_prediction_name"], self.taxon_a.name)
        self.assertEqual(row["best_machine_prediction_algorithm"], "test-classifier")
        self.assertAlmostEqual(float(row["best_machine_prediction_score"]), 0.85, places=2)
        self.assertEqual(row["verified_by"], "")
        self.assertEqual(row["verified_by_count"], "0")

    def test_ml_prediction_with_agreeing_human(self):
        """Human agrees with ML: verified_by set, determination_matches = True, determination_score = None."""
        occurrence, classification = self._create_occurrence_with_prediction()

        # Human agrees with the same taxon
        Identification.objects.create(
            user=self.user,
            taxon=self.taxon_a,
            occurrence=occurrence,
            agreed_with_prediction=classification,
        )

        rows = self._run_csv_export()
        row = next(r for r in rows if int(r["id"]) == occurrence.pk)

        # Machine prediction fields still populated
        self.assertEqual(row["best_machine_prediction_name"], self.taxon_a.name)
        self.assertAlmostEqual(float(row["best_machine_prediction_score"]), 0.85, places=2)

        # Verification fields
        verified_by = row["verified_by"]
        self.assertTrue(verified_by, "verified_by should not be empty")
        self.assertEqual(row["verified_by_count"], "1")
        self.assertEqual(row["agreed_with_algorithm"], "test-classifier")
        self.assertEqual(row["determination_matches_machine_prediction"], "True")

        # determination_score should be empty/None for human-determined occurrences
        self.assertIn(row["determination_score"], ["", "None", None])

    def test_ml_prediction_with_disagreeing_human(self):
        """Human disagrees with ML: different determination, determination_matches = False."""
        occurrence, classification = self._create_occurrence_with_prediction(taxon=self.taxon_a)

        # Human identifies as a different taxon
        Identification.objects.create(
            user=self.user,
            taxon=self.taxon_b,
            occurrence=occurrence,
        )

        rows = self._run_csv_export()
        row = next(r for r in rows if int(r["id"]) == occurrence.pk)

        # Machine prediction still shows original
        self.assertEqual(row["best_machine_prediction_name"], self.taxon_a.name)
        # Determination is now the human's choice
        self.assertEqual(row["determination_name"], self.taxon_b.name)
        self.assertEqual(row["determination_matches_machine_prediction"], "False")
        self.assertEqual(row["agreed_with_algorithm"], "")

    def test_multiple_identifications_count(self):
        """Multiple identifications: verified_by_count reflects all non-withdrawn IDs."""
        occurrence, _ = self._create_occurrence_with_prediction()

        from ami.users.models import User

        user2 = User.objects.create_user(email="verifier2@test.org")

        Identification.objects.create(user=self.user, taxon=self.taxon_a, occurrence=occurrence)
        Identification.objects.create(user=user2, taxon=self.taxon_a, occurrence=occurrence)

        rows = self._run_csv_export()
        row = next(r for r in rows if int(r["id"]) == occurrence.pk)
        self.assertEqual(row["verified_by_count"], "2")

    def test_detection_bbox_field(self):
        """Best detection bbox is included in export."""
        occurrence, _ = self._create_occurrence_with_prediction()
        rows = self._run_csv_export()
        row = next(r for r in rows if int(r["id"]) == occurrence.pk)
        self.assertIn("best_detection_bbox", row)
        # bbox should be a string representation of the list
        self.assertIn("0.1", row["best_detection_bbox"])

    def test_csv_has_all_new_fields(self):
        """All new fields are present as CSV column headers."""
        self._create_occurrence_with_prediction()
        rows = self._run_csv_export()
        self.assertGreater(len(rows), 0)
        headers = rows[0].keys()
        expected_fields = [
            "best_machine_prediction_name",
            "best_machine_prediction_algorithm",
            "best_machine_prediction_score",
            "verified_by",
            "verified_by_count",
            "agreed_with_algorithm",
            "determination_matches_machine_prediction",
            "best_detection_bbox",
            "best_detection_source_image_url",
            "best_detection_occurrence_url",
        ]
        for field in expected_fields:
            self.assertIn(field, headers, f"Missing CSV field: {field}")

    def test_occurrence_url_field(self):
        """best_detection_occurrence_url contains a valid platform link."""
        occurrence, _ = self._create_occurrence_with_prediction()
        rows = self._run_csv_export()
        row = next(r for r in rows if int(r["id"]) == occurrence.pk)
        url = row.get("best_detection_occurrence_url", "")
        if url:
            self.assertIn(str(occurrence.pk), url)
29 changes: 29 additions & 0 deletions ami/main/api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1319,6 +1319,7 @@ class OccurrenceListSerializer(DefaultSerializer):
event = EventNestedSerializer(read_only=True)
# first_appearance = TaxonSourceImageNestedSerializer(read_only=True)
determination_details = serializers.SerializerMethodField()
best_machine_prediction = serializers.SerializerMethodField()
identifications = OccurrenceIdentificationSerializer(many=True, read_only=True)

def get_permissions(self, instance, instance_data):
Expand Down Expand Up @@ -1357,6 +1358,7 @@ class Meta:
"detection_images",
"determination_score",
"determination_details",
"best_machine_prediction",
"identifications",
"created_at",
"updated_at",
Expand Down Expand Up @@ -1391,6 +1393,33 @@ def get_determination_details(self, obj: Occurrence):
score=obj.determination_score,
)

def get_best_machine_prediction(self, obj: Occurrence):
    """Always return the best machine prediction, regardless of human verification status.

    Returns None when the occurrence has no prediction; otherwise a dict with the
    predicted taxon (nested serializer data), the algorithm that produced it, the
    prediction score, and whether the current determination matches the predicted
    taxon (None when either side is missing).
    """
    # NOTE(review): this mutates the shared serializer context in place so nested
    # serializers can see the current occurrence — confirm no cross-row leakage
    # when serializing a list of occurrences.
    context = self.context
    context["occurrence"] = obj

    prediction = obj.best_prediction
    if not prediction:
        return None

    taxon_data = TaxonNestedSerializer(prediction.taxon, context=context).data if prediction.taxon else None
    algorithm_data = None
    if prediction.algorithm:
        # Imported locally, presumably to avoid a circular import — TODO confirm.
        from ami.ml.serializers import AlgorithmNestedSerializer

        algorithm_data = AlgorithmNestedSerializer(prediction.algorithm, context=context).data

    determination_matches = None
    if obj.determination_id and prediction.taxon_id:
        determination_matches = obj.determination_id == prediction.taxon_id

    return dict(
        taxon=taxon_data,
        algorithm=algorithm_data,
        score=prediction.score,
        determination_matches_machine_prediction=determination_matches,
    )
Comment thread
mihow marked this conversation as resolved.
Outdated
Comment thread
coderabbitai[bot] marked this conversation as resolved.


class OccurrenceSerializer(OccurrenceListSerializer):
determination = CaptureTaxonSerializer(read_only=True)
Expand Down
Loading
Loading