Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions mcp_server/src/graphiti_mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import logging
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional

Expand Down Expand Up @@ -326,6 +327,7 @@ async def add_memory(
source: str = 'text',
source_description: str = '',
uuid: str | None = None,
reference_time: str | None = None,
) -> SuccessResponse | ErrorResponse:
"""Add an episode to memory. This is the primary way to add information to the graph.

Expand All @@ -345,6 +347,8 @@ async def add_memory(
- 'message': For conversation-style content
source_description (str, optional): Description of the source
uuid (str, optional): Optional UUID for the episode
reference_time (str, optional): ISO 8601 timestamp for the episode (e.g. "2024-01-15T12:00:00Z").
Sets the temporal reference point for extracted facts. Defaults to current UTC time.

Examples:
# Adding plain text content
Expand Down Expand Up @@ -384,6 +388,14 @@ async def add_memory(
logger.warning(f"Unknown source type '{source}', using 'text' as default")
episode_type = EpisodeType.text

# Parse reference_time if provided
parsed_reference_time = None
if reference_time:
try:
parsed_reference_time = datetime.fromisoformat(reference_time.replace("Z", "+00:00"))
except (ValueError, AttributeError) as e:
logger.warning(f"Invalid reference_time '{reference_time}': {e}. Using current time.")

# Submit to queue service for async processing
await queue_service.add_episode(
group_id=effective_group_id,
Expand All @@ -393,6 +405,7 @@ async def add_memory(
episode_type=episode_type,
entity_types=graphiti_service.entity_types,
uuid=uuid or None, # Ensure None is passed if uuid is None
reference_time=parsed_reference_time,
)

return SuccessResponse(
Expand Down
8 changes: 6 additions & 2 deletions mcp_server/src/services/queue_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ async def add_episode(
episode_type: Any,
entity_types: Any,
uuid: str | None,
reference_time: datetime | None = None,
) -> int:
"""Add an episode for processing.

Expand All @@ -118,17 +119,20 @@ async def add_episode(
episode_type: Type of the episode
entity_types: Entity types for extraction
uuid: Episode UUID
reference_time: Optional timestamp for the episode. If None, uses current UTC time.

Returns:
The position in the queue
"""
if self._graphiti_client is None:
raise RuntimeError('Queue service not initialized. Call initialize() first.')

effective_reference_time = reference_time or datetime.now(timezone.utc)

async def process_episode():
"""Process the episode using the graphiti client."""
try:
logger.info(f'Processing episode {uuid} for group {group_id}')
logger.info(f'Processing episode {uuid} for group {group_id} (ref_time={effective_reference_time.isoformat()})')

# Process the episode using the graphiti client
await self._graphiti_client.add_episode(
Expand All @@ -137,7 +141,7 @@ async def process_episode():
source_description=source_description,
source=episode_type,
group_id=group_id,
reference_time=datetime.now(timezone.utc),
reference_time=effective_reference_time,
entity_types=entity_types,
uuid=uuid,
)
Expand Down
Loading