diff --git a/.gitignore b/.gitignore index 7b9c200..a3d0718 100644 --- a/.gitignore +++ b/.gitignore @@ -32,6 +32,7 @@ node_modules/ # Experiment results (promote notable runs to tests/benchmarks/results/) /tests/benchmarks/experiments/results_*/ +data/ # Recall Quality Lab data (snapshots, test results) /lab/snapshots/ diff --git a/automem/api/memory.py b/automem/api/memory.py index bc696bc..8d6031a 100644 --- a/automem/api/memory.py +++ b/automem/api/memory.py @@ -13,8 +13,12 @@ MEMORY_AUTO_SUMMARIZE, MEMORY_CONTENT_HARD_LIMIT, MEMORY_CONTENT_SOFT_LIMIT, + MEMORY_DEDUP_ENABLED, + MEMORY_DEDUP_MODEL, + MEMORY_DEDUP_SIMILARITY_THRESHOLD, MEMORY_SUMMARY_TARGET_LENGTH, ) +from automem.dedup import check_dedup from automem.utils.text import should_summarize_content, summarize_content @@ -337,6 +341,159 @@ def store() -> Any: else: last_accessed = updated_at + # ── Write-time dedup gate ────────────────────────────────── + # Check for semantically similar existing memories and let an LLM + # decide: ADD (store new), UPDATE (merge into existing), + # SUPERSEDE (replace existing), or NOOP (skip). + skip_dedup = payload.get("skip_dedup", False) + dedup_result = None + if MEMORY_DEDUP_ENABLED and not skip_dedup: + qdrant_client = get_qdrant_client() + openai_client = get_openai_client() if get_openai_client else None + if qdrant_client and openai_client: + dedup_result = check_dedup( + new_content=content, + generate_embedding=generate_real_embedding, + qdrant_client=qdrant_client, + collection_name=collection_name, + openai_client=openai_client, + model=MEMORY_DEDUP_MODEL, + similarity_threshold=MEMORY_DEDUP_SIMILARITY_THRESHOLD, + ) + + if dedup_result["action"] == "NOOP": + query_ms = (time.perf_counter() - query_start) * 1000 + return ( + jsonify( + { + "status": "skipped", + "reason": "dedup_noop", + "detail": dedup_result.get("reason", "duplicate"), + "candidates": dedup_result.get("candidates", []), + "query_time_ms": round(query_ms, 2), + } + ), + 200, + ) + + if dedup_result["action"] == "UPDATE" and dedup_result.get("target_id"): + # Merge into the existing memory instead of creating a new one. + # Rewrite memory_id to target the existing one, and use merged content. + target_id = dedup_result["target_id"] + merged = dedup_result.get("merged_content", content) + # Update the existing memory in FalkorDB + try: + graph.query( + """ + MATCH (m:Memory {id: $id}) + SET m.content = $content, + m.updated_at = $updated_at, + m.last_accessed = $last_accessed, + m.importance = CASE WHEN $importance > m.importance + THEN $importance ELSE m.importance END + RETURN m + """, + { + "id": target_id, + "content": merged, + "updated_at": utc_now(), + "last_accessed": utc_now(), + "importance": importance, + }, + ) + except Exception: + logger.exception("Failed to UPDATE existing memory %s, falling back to ADD", target_id) + dedup_result["action"] = "ADD" + else: + # Update Qdrant with new embedding + merged payload + qdrant_cl = get_qdrant_client() + if qdrant_cl: + try: + # Fetch existing payload to preserve tags/metadata not in request + existing_payload = {} + try: + existing_points = qdrant_cl.retrieve( + collection_name=collection_name, + ids=[target_id], + with_payload=True, + ) + if existing_points: + existing_payload = existing_points[0].payload or {} + except Exception: + logger.warning("Could not fetch existing payload for %s", target_id) + + new_emb = generate_real_embedding(merged) + if new_emb: + # Preserve existing fields, override only what's explicitly provided + merged_payload = { + **existing_payload, + "content": merged, + "importance": importance, + "updated_at": utc_now(), + "last_accessed": utc_now(), + } + # Only override tags/metadata if explicitly provided in request + if tags: + merged_payload["tags"] = tags + merged_payload["tag_prefixes"] = tag_prefixes + if metadata: + merged_payload["metadata"] = metadata + if memory_type: + merged_payload["type"] = memory_type + merged_payload["confidence"] = type_confidence + + qdrant_cl.upsert( + collection_name=collection_name, + points=[ + point_struct( + id=target_id, + vector=new_emb, + payload=merged_payload, + ) + ], + ) + except Exception: + logger.warning("Failed to update Qdrant for merged memory %s", target_id) + + query_ms = (time.perf_counter() - query_start) * 1000 + return ( + jsonify( + { + "status": "updated", + "memory_id": target_id, + "dedup_action": "UPDATE", + "merged_content": merged, + "query_time_ms": round(query_ms, 2), + } + ), + 200, + ) + + if dedup_result["action"] == "SUPERSEDE" and dedup_result.get("target_id"): + # Delete the old memory, then store the new one normally below + old_id = dedup_result["target_id"] + try: + graph.query("MATCH (m:Memory {id: $id}) DETACH DELETE m", {"id": old_id}) + except Exception: + logger.warning("Failed to delete superseded memory %s", old_id) + qdrant_cl = get_qdrant_client() + if qdrant_cl: + try: + selector = {"points": [old_id]} + try: + from qdrant_client.http import models as http_models + selector = http_models.PointIdsList(points=[old_id]) + except Exception: + pass + qdrant_cl.delete( + collection_name=collection_name, + points_selector=selector, + ) + except Exception: + logger.warning("Failed to delete superseded memory from Qdrant %s", old_id) + + # For ADD and SUPERSEDE, fall through to normal store below + try: graph.query( """ @@ -459,6 +616,12 @@ def store() -> Any: response["original_length"] = len(original_content) response["summarized_length"] = len(content) + # Include dedup info if gate ran + if dedup_result: + response["dedup_action"] = dedup_result["action"] + if dedup_result["action"] == "SUPERSEDE": + response["superseded_id"] = dedup_result.get("target_id") + logger.info( "memory_stored", extra={ diff --git a/automem/config.py b/automem/config.py index cbeb508..3f7ee9c 100644 --- a/automem/config.py +++ b/automem/config.py @@ -131,6 +131,17 @@ # Target length for summarized content MEMORY_SUMMARY_TARGET_LENGTH = int(os.getenv("MEMORY_SUMMARY_TARGET_LENGTH", "300")) +# Write-time deduplication gate (LLM-based ADD/UPDATE/SUPERSEDE/NOOP) +MEMORY_DEDUP_ENABLED = os.getenv("MEMORY_DEDUP_ENABLED", "false").lower() not in { + "0", + "false", + "no", +} +MEMORY_DEDUP_MODEL = os.getenv("MEMORY_DEDUP_MODEL", "gpt-4o-mini") +MEMORY_DEDUP_SIMILARITY_THRESHOLD = float( + os.getenv("MEMORY_DEDUP_SIMILARITY_THRESHOLD", "0.70") +) + # Memory types for classification MEMORY_TYPES = {"Decision", "Pattern", "Preference", "Style", "Habit", "Insight", "Context"} diff --git a/automem/dedup.py b/automem/dedup.py new file mode 100644 index 0000000..0ea66d7 --- /dev/null +++ b/automem/dedup.py @@ -0,0 +1,197 @@ +"""Write-time deduplication gate for AutoMem. + +Before storing a new memory, checks for semantically similar existing memories +and uses an LLM to classify the appropriate action: + +- ADD: Genuinely new information, store normally +- UPDATE: Refines/adds detail to an existing memory, merge into it +- SUPERSEDE: Replaces an outdated memory (delete old, store new) +- NOOP: Already known, skip entirely + +Inspired by Helixir's decision engine approach. +""" + +from __future__ import annotations + +import json +import logging +from typing import Any, Callable, Dict, List, Optional, Tuple + +logger = logging.getLogger(__name__) + +# Minimum vector similarity to even consider dedup (below this, always ADD) +SIMILARITY_THRESHOLD = 0.70 + +# Maximum candidates to evaluate +MAX_CANDIDATES = 3 + +DEDUP_PROMPT = """You are a memory deduplication system. Given a NEW memory and EXISTING memories, decide what to do. + +NEW MEMORY: +{new_content} + +EXISTING MEMORIES: +{existing_memories} + +For each existing memory, decide the relationship to the new memory. Then output ONE action for the new memory: + +- ADD: The new memory contains genuinely new information not covered by any existing memory. Store it. +- UPDATE : The new memory refines, corrects, or adds meaningful detail to an existing memory. The existing memory should be updated to incorporate the new information. Output the merged content. +- SUPERSEDE : The new memory replaces an outdated existing memory (e.g., a decision changed, a status updated). The old one should be deleted and the new one stored. +- NOOP: The new memory is already fully covered by existing memories. Skip it. + +Rules: +- If the new memory has ANY meaningful new information beyond what exists, prefer ADD or UPDATE over NOOP. +- UPDATE means the existing memory's content should be expanded/corrected. Provide the merged text. +- SUPERSEDE means the old memory is wrong/outdated and should be replaced entirely. +- NOOP only if the new memory is truly redundant — same facts, same level of detail. +- When in doubt, ADD. False negatives (storing a near-dupe) are less harmful than false positives (losing information). + +Respond with ONLY valid JSON: +{{"action": "ADD"}} +or +{{"action": "UPDATE", "target_id": "", "merged_content": ""}} +or +{{"action": "SUPERSEDE", "target_id": ""}} +or +{{"action": "NOOP", "reason": ""}}""" + + +def check_dedup( + new_content: str, + generate_embedding: Callable[[str], List[float]], + qdrant_client: Any, + collection_name: str, + openai_client: Any, + model: str = "gpt-4o-mini", + similarity_threshold: float = SIMILARITY_THRESHOLD, +) -> Dict[str, Any]: + """Check if a new memory should be added, merged, or skipped. + + Returns: + Dict with keys: + - action: "ADD" | "UPDATE" | "SUPERSEDE" | "NOOP" + - target_id: (for UPDATE/SUPERSEDE) the existing memory ID to modify + - merged_content: (for UPDATE) the merged text + - reason: (for NOOP) why it was skipped + - candidates: list of similar memories found (for debugging) + """ + result: Dict[str, Any] = {"action": "ADD", "candidates": []} + + if not qdrant_client or not openai_client: + return result + + # Step 1: Generate embedding for the new content + try: + embedding = generate_embedding(new_content) + except Exception: + logger.warning("Failed to generate embedding for dedup check, defaulting to ADD") + return result + + if not embedding: + return result + + # Step 2: Search for similar existing memories + try: + search_results = qdrant_client.search( + collection_name=collection_name, + query_vector=embedding, + limit=MAX_CANDIDATES, + score_threshold=similarity_threshold, + ) + except Exception: + logger.warning("Qdrant search failed during dedup check, defaulting to ADD") + return result + + if not search_results: + return result + + # Step 3: Format candidates for LLM + candidates = [] + for hit in search_results: + payload = hit.payload or {} + candidates.append( + { + "id": str(hit.id), + "content": payload.get("content", ""), + "score": round(hit.score, 3), + "type": payload.get("type", ""), + "importance": payload.get("importance", 0), + } + ) + + result["candidates"] = candidates + + existing_text = "\n\n".join( + f"[ID: {c['id']}] (similarity: {c['score']})\n{c['content']}" + for c in candidates + ) + + # Step 4: Ask LLM to classify + prompt = DEDUP_PROMPT.format( + new_content=new_content, + existing_memories=existing_text, + ) + + try: + response = openai_client.chat.completions.create( + model=model, + messages=[{"role": "user", "content": prompt}], + temperature=0, + max_tokens=1000, + response_format={"type": "json_object"}, + ) + + raw = response.choices[0].message.content + if not raw: + logger.warning("LLM returned empty content for dedup, defaulting to ADD") + return result + raw = raw.strip() + decision = json.loads(raw) + + action = decision.get("action", "ADD").upper() + if action not in ("ADD", "UPDATE", "SUPERSEDE", "NOOP"): + action = "ADD" + + result["action"] = action + + # Validate target_id against actual candidates to prevent LLM hallucination + candidate_ids = {c["id"] for c in candidates} + target_id = decision.get("target_id", "") + if target_id and target_id not in candidate_ids: + logger.warning( + "Dedup target_id %s not in candidates %s, falling back to ADD", + target_id, candidate_ids, + ) + result["action"] = "ADD" + return result + + if action == "UPDATE": + result["target_id"] = decision.get("target_id", "") + result["merged_content"] = decision.get("merged_content", "") + if not result["target_id"] or not result["merged_content"]: + # Invalid UPDATE response, fall back to ADD + result["action"] = "ADD" + logger.warning("LLM returned UPDATE without target_id or merged_content, falling back to ADD") + + elif action == "SUPERSEDE": + result["target_id"] = decision.get("target_id", "") + if not result["target_id"]: + result["action"] = "ADD" + logger.warning("LLM returned SUPERSEDE without target_id, falling back to ADD") + + elif action == "NOOP": + result["reason"] = decision.get("reason", "duplicate") + + logger.info( + "Dedup decision: %s (candidates: %d, top_score: %.3f)", + result["action"], + len(candidates), + candidates[0]["score"] if candidates else 0, + ) + + except Exception: + logger.warning("LLM dedup classification failed, defaulting to ADD", exc_info=True) + result["action"] = "ADD" + + return result