Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions app/src/ai/agent/api/convert_conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ pub fn convert_conversation_data_to_ai_conversation(
parent_agent_id: None,
agent_name: None,
parent_conversation_id: None,
is_remote_child: false,
run_id: None,
autoexecute_override: None,
last_event_sequence: None,
Expand All @@ -97,6 +98,7 @@ pub fn convert_conversation_data_to_ai_conversation(
parent_agent_id: None,
agent_name: None,
parent_conversation_id: None,
is_remote_child: false,
// TODO: Populate run_id from server metadata once it is exposed
// in ServerAIConversationMetadata. For cloud conversations that
// were spawned via the server API, the run_id is created at task
Expand Down
9 changes: 7 additions & 2 deletions app/src/ai/agent/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ impl AIConversation {
parent_agent_id,
agent_name,
parent_conversation_id,
is_remote_child,
run_id,
autoexecute_override,
last_event_sequence,
Expand All @@ -380,6 +381,7 @@ impl AIConversation {
let parent_conversation_id = data
.parent_conversation_id
.and_then(|id| AIConversationId::try_from(id).ok());
let is_remote_child = data.is_remote_child;
let run_id = data.run_id;
let autoexecute_override = if FeatureFlag::RememberFastForwardState.is_enabled() {
data.autoexecute_override
Expand All @@ -399,6 +401,7 @@ impl AIConversation {
parent_agent_id,
agent_name,
parent_conversation_id,
is_remote_child,
run_id,
autoexecute_override,
last_event_sequence,
Expand All @@ -413,6 +416,7 @@ impl AIConversation {
None,
None,
None,
false,
None,
AIConversationAutoexecuteMode::default(),
None,
Expand Down Expand Up @@ -457,7 +461,7 @@ impl AIConversation {
parent_agent_id,
agent_name,
parent_conversation_id,
is_remote_child: false,
is_remote_child,
last_event_sequence,
})
}
Expand Down Expand Up @@ -809,7 +813,7 @@ impl AIConversation {

/// Returns true if this conversation was spawned by a parent orchestrator agent.
pub fn is_child_agent_conversation(&self) -> bool {
self.parent_conversation_id.is_some()
self.parent_conversation_id.is_some() || self.parent_agent_id.is_some()
}

/// Returns true if this is a placeholder for a child agent executing on a
Expand Down Expand Up @@ -2830,6 +2834,7 @@ impl AIConversation {
parent_agent_id: self.parent_agent_id.clone(),
agent_name: self.agent_name.clone(),
parent_conversation_id: self.parent_conversation_id.map(|id| id.to_string()),
is_remote_child: self.is_remote_child,
run_id: self.task_id.map(|id| id.to_string()),
autoexecute_override: Some(self.autoexecute_override.into()),
last_event_sequence: self.last_event_sequence,
Expand Down
35 changes: 35 additions & 0 deletions app/src/ai/agent/conversation_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,41 @@ fn restored_conversation_defaults_autoexecute_override_when_not_persisted() {
);
}

#[test]
fn restored_conversation_uses_persisted_last_event_sequence() {
let conversation_data: AgentConversationData =
serde_json::from_str(r#"{"server_conversation_token":null,"last_event_sequence":42}"#)
.unwrap();

let conversation = restored_conversation(Some(conversation_data));

assert_eq!(conversation.last_event_sequence(), Some(42));
}

#[test]
fn restored_conversation_uses_persisted_remote_child_marker() {
let conversation_data: AgentConversationData =
serde_json::from_str(r#"{"server_conversation_token":null,"is_remote_child":true}"#)
.unwrap();

let conversation = restored_conversation(Some(conversation_data));

assert!(conversation.is_remote_child());
}

#[test]
fn child_conversation_detection_uses_parent_agent_id() {
let conversation_data: AgentConversationData = serde_json::from_str(
r#"{"server_conversation_token":null,"parent_agent_id":"parent-run-id"}"#,
)
.unwrap();

let conversation = restored_conversation(Some(conversation_data));

assert!(conversation.is_child_agent_conversation());
assert_eq!(conversation.parent_conversation_id(), None);
}

#[test]
fn restored_conversation_defaults_unknown_persisted_autoexecute_override() {
let _flag = FeatureFlag::RememberFastForwardState.override_enabled(true);
Expand Down
7 changes: 7 additions & 0 deletions app/src/ai/agent_conversations_model_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ fn test_display_status_uses_matching_conversation_for_in_progress_task() {
parent_agent_id: None,
agent_name: None,
parent_conversation_id: None,
is_remote_child: false,
run_id: Some(task_id.clone()),
autoexecute_override: None,
last_event_sequence: None,
Expand Down Expand Up @@ -204,6 +205,7 @@ fn test_display_status_updates_when_blocked_conversation_resumes() {
parent_agent_id: None,
agent_name: None,
parent_conversation_id: None,
is_remote_child: false,
run_id: Some(task_id.clone()),
autoexecute_override: None,
last_event_sequence: None,
Expand Down Expand Up @@ -280,6 +282,7 @@ fn test_display_status_terminal_task_state_overrides_matching_conversation() {
parent_agent_id: None,
agent_name: None,
parent_conversation_id: None,
is_remote_child: false,
run_id: Some(task_id.clone()),
autoexecute_override: None,
last_event_sequence: None,
Expand Down Expand Up @@ -332,6 +335,7 @@ fn test_status_filter_uses_display_status_for_task_backed_conversations() {
parent_agent_id: None,
agent_name: None,
parent_conversation_id: None,
is_remote_child: false,
run_id: Some(task_id.clone()),
autoexecute_override: None,
last_event_sequence: None,
Expand Down Expand Up @@ -767,6 +771,7 @@ fn test_get_tasks_and_conversations_prefers_task_when_task_id_matches_conversati
parent_agent_id: None,
agent_name: None,
parent_conversation_id: None,
is_remote_child: false,
run_id: Some(task_id.clone()),
autoexecute_override: None,
last_event_sequence: None,
Expand Down Expand Up @@ -824,6 +829,7 @@ fn test_get_tasks_and_conversations_prefers_task_when_server_token_matches() {
parent_agent_id: None,
agent_name: None,
parent_conversation_id: None,
is_remote_child: false,
run_id: None,
autoexecute_override: None,
last_event_sequence: None,
Expand Down Expand Up @@ -880,6 +886,7 @@ fn test_get_tasks_and_conversations_keeps_unrelated_tasks_and_conversations() {
parent_agent_id: None,
agent_name: None,
parent_conversation_id: None,
is_remote_child: false,
run_id: None,
autoexecute_override: None,
last_event_sequence: None,
Expand Down
63 changes: 51 additions & 12 deletions app/src/ai/agent_events/message_hydrator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;
use std::time::Duration;

use crate::ai::ambient_agents::AmbientAgentTaskId;
use anyhow::{anyhow, Context, Result};
#[cfg(not(target_family = "wasm"))]
use futures::future::Either;
Expand All @@ -9,6 +10,7 @@ use warpui::r#async::Timer;

use crate::ai::agent::ReceivedMessageInput;
use crate::server::server_api::ai::{AIClient, AgentRunEvent, ReadAgentMessageResponse};
use crate::server::server_api::ServerApi;

pub(crate) const DEFAULT_AGENT_MESSAGE_FETCH_TIMEOUT: Duration = Duration::from_secs(5);

Expand All @@ -17,6 +19,8 @@ pub(crate) const DEFAULT_AGENT_MESSAGE_FETCH_TIMEOUT: Duration = Duration::from_
#[derive(Clone)]
pub(crate) struct MessageHydrator {
ai_client: Arc<dyn AIClient>,
task_scoped_server_api: Option<Arc<ServerApi>>,
task_id: Option<AmbientAgentTaskId>,
#[cfg_attr(target_family = "wasm", allow(dead_code))]
fetch_timeout: Duration,
}
Expand All @@ -26,16 +30,40 @@ impl MessageHydrator {
Self::with_fetch_timeout(ai_client, DEFAULT_AGENT_MESSAGE_FETCH_TIMEOUT)
}

pub(crate) fn for_task(server_api: Arc<ServerApi>, task_id: AmbientAgentTaskId) -> Self {
let ai_client: Arc<dyn AIClient> = server_api.clone();
Self {
ai_client,
task_scoped_server_api: Some(server_api),
task_id: Some(task_id),
fetch_timeout: DEFAULT_AGENT_MESSAGE_FETCH_TIMEOUT,
}
}

pub(crate) fn with_fetch_timeout(
ai_client: Arc<dyn AIClient>,
fetch_timeout: Duration,
) -> Self {
Self {
ai_client,
task_scoped_server_api: None,
task_id: None,
fetch_timeout,
}
}

async fn read_message(&self, message_id: &str) -> Result<ReadAgentMessageResponse> {
match (self.task_scoped_server_api.as_ref(), self.task_id) {
(Some(server_api), Some(task_id)) => {
server_api
.read_agent_message_for_task(&task_id, message_id)
.await
}
_ => self.ai_client.read_agent_message(message_id).await,
}
.with_context(|| format!("Failed to read agent message {message_id}"))
}

pub(crate) async fn hydrate_event_for_recipient(
&self,
event: &AgentRunEvent,
Expand All @@ -55,6 +83,17 @@ impl MessageHydrator {
return None;
}
};
if message.body.is_empty() {
log::warn!(
"Hydrated empty-body agent message: message_id={} event_sequence={} recipient_run_id={} sender_run_id={} subject={:?} task_id={:?}",
message.message_id,
event.sequence,
recipient_run_id,
message.sender_run_id,
message.subject,
self.task_id.map(|task_id| task_id.to_string())
);
}

Some(ReceivedMessageInput {
message_id: message.message_id,
Expand All @@ -70,15 +109,13 @@ impl MessageHydrator {
&self,
message_id: &str,
) -> Result<ReadAgentMessageResponse> {
let read_message = self.ai_client.read_agent_message(message_id);
let read_message = self.read_message(message_id);
let timeout = Timer::after(self.fetch_timeout);
futures::pin_mut!(read_message);
futures::pin_mut!(timeout);

match futures::future::select(read_message, timeout).await {
Either::Left((result, _)) => {
result.with_context(|| format!("Failed to read agent message {message_id}"))
}
Either::Left((result, _)) => result,
Either::Right(_) => Err(anyhow!("Timed out reading agent message {message_id}")),
}
}
Expand All @@ -88,10 +125,7 @@ impl MessageHydrator {
&self,
message_id: &str,
) -> Result<ReadAgentMessageResponse> {
self.ai_client
.read_agent_message(message_id)
.await
.with_context(|| format!("Failed to read agent message {message_id}"))
self.read_message(message_id).await
}

pub(crate) async fn read_message_from_event_with_timeout(
Expand All @@ -105,10 +139,15 @@ impl MessageHydrator {
}

pub(crate) async fn mark_message_delivered(&self, message_id: &str) -> Result<()> {
self.ai_client
.mark_message_delivered(message_id)
.await
.with_context(|| format!("Failed to mark agent message {message_id} as delivered"))
match (self.task_scoped_server_api.as_ref(), self.task_id) {
(Some(server_api), Some(task_id)) => {
server_api
.mark_message_delivered_for_task(&task_id, message_id)
.await
}
_ => self.ai_client.mark_message_delivered(message_id).await,
}
.with_context(|| format!("Failed to mark agent message {message_id} as delivered"))
}

pub(crate) async fn mark_messages_delivered_best_effort<'a, I>(
Expand Down
Loading
Loading