Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ tikv-jemallocator = { version = "0.6", optional = true, features = [
"override_allocator_on_supported_platforms",
] }
toml = "0.8.13"
toml_edit.workspace = true
tracing.workspace = true
ui_components.workspace = true
unicase = "2.7.0"
Expand Down
2 changes: 2 additions & 0 deletions app/src/ai/agent/api/convert_conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ pub fn convert_conversation_data_to_ai_conversation(
parent_agent_id: None,
agent_name: None,
parent_conversation_id: None,
is_remote_child: false,
run_id: None,
autoexecute_override: None,
last_event_sequence: None,
Expand All @@ -97,6 +98,7 @@ pub fn convert_conversation_data_to_ai_conversation(
parent_agent_id: None,
agent_name: None,
parent_conversation_id: None,
is_remote_child: false,
// TODO: Populate run_id from server metadata once it is exposed
// in ServerAIConversationMetadata. For cloud conversations that
// were spawned via the server API, the run_id is created at task
Expand Down
10 changes: 8 additions & 2 deletions app/src/ai/agent/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ impl AIConversation {
parent_agent_id,
agent_name,
parent_conversation_id,
is_remote_child,
run_id,
autoexecute_override,
last_event_sequence,
Expand All @@ -380,6 +381,7 @@ impl AIConversation {
let parent_conversation_id = data
.parent_conversation_id
.and_then(|id| AIConversationId::try_from(id).ok());
let is_remote_child = data.is_remote_child;
let run_id = data.run_id;
let autoexecute_override = if FeatureFlag::RememberFastForwardState.is_enabled() {
data.autoexecute_override
Expand All @@ -399,6 +401,7 @@ impl AIConversation {
parent_agent_id,
agent_name,
parent_conversation_id,
is_remote_child,
run_id,
autoexecute_override,
last_event_sequence,
Expand All @@ -413,6 +416,7 @@ impl AIConversation {
None,
None,
None,
false,
None,
AIConversationAutoexecuteMode::default(),
None,
Expand Down Expand Up @@ -457,7 +461,7 @@ impl AIConversation {
parent_agent_id,
agent_name,
parent_conversation_id,
is_remote_child: false,
is_remote_child,
last_event_sequence,
})
}
Expand Down Expand Up @@ -809,7 +813,7 @@ impl AIConversation {

/// Returns true if this conversation was spawned by a parent orchestrator agent.
pub fn is_child_agent_conversation(&self) -> bool {
self.parent_conversation_id.is_some()
self.parent_conversation_id.is_some() || self.parent_agent_id.is_some()
}

/// Returns true if this is a placeholder for a child agent executing on a
Expand Down Expand Up @@ -2830,6 +2834,7 @@ impl AIConversation {
parent_agent_id: self.parent_agent_id.clone(),
agent_name: self.agent_name.clone(),
parent_conversation_id: self.parent_conversation_id.map(|id| id.to_string()),
is_remote_child: self.is_remote_child,
run_id: self.task_id.map(|id| id.to_string()),
autoexecute_override: Some(self.autoexecute_override.into()),
last_event_sequence: self.last_event_sequence,
Expand Down Expand Up @@ -3516,6 +3521,7 @@ pub enum AIAgentHarness {
Oz,
ClaudeCode,
Gemini,
Codex,
Unknown,
}

Expand Down
35 changes: 35 additions & 0 deletions app/src/ai/agent/conversation_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,41 @@ fn restored_conversation_defaults_autoexecute_override_when_not_persisted() {
);
}

#[test]
fn restored_conversation_uses_persisted_last_event_sequence() {
let conversation_data: AgentConversationData =
serde_json::from_str(r#"{"server_conversation_token":null,"last_event_sequence":42}"#)
.unwrap();

let conversation = restored_conversation(Some(conversation_data));

assert_eq!(conversation.last_event_sequence(), Some(42));
}

#[test]
fn restored_conversation_uses_persisted_remote_child_marker() {
let conversation_data: AgentConversationData =
serde_json::from_str(r#"{"server_conversation_token":null,"is_remote_child":true}"#)
.unwrap();

let conversation = restored_conversation(Some(conversation_data));

assert!(conversation.is_remote_child());
}

#[test]
fn child_conversation_detection_uses_parent_agent_id() {
let conversation_data: AgentConversationData = serde_json::from_str(
r#"{"server_conversation_token":null,"parent_agent_id":"parent-run-id"}"#,
)
.unwrap();

let conversation = restored_conversation(Some(conversation_data));

assert!(conversation.is_child_agent_conversation());
assert_eq!(conversation.parent_conversation_id(), None);
}

#[test]
fn restored_conversation_defaults_unknown_persisted_autoexecute_override() {
let _flag = FeatureFlag::RememberFastForwardState.override_enabled(true);
Expand Down
7 changes: 7 additions & 0 deletions app/src/ai/agent_conversations_model_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ fn test_display_status_uses_matching_conversation_for_in_progress_task() {
parent_agent_id: None,
agent_name: None,
parent_conversation_id: None,
is_remote_child: false,
run_id: Some(task_id.clone()),
autoexecute_override: None,
last_event_sequence: None,
Expand Down Expand Up @@ -204,6 +205,7 @@ fn test_display_status_updates_when_blocked_conversation_resumes() {
parent_agent_id: None,
agent_name: None,
parent_conversation_id: None,
is_remote_child: false,
run_id: Some(task_id.clone()),
autoexecute_override: None,
last_event_sequence: None,
Expand Down Expand Up @@ -280,6 +282,7 @@ fn test_display_status_terminal_task_state_overrides_matching_conversation() {
parent_agent_id: None,
agent_name: None,
parent_conversation_id: None,
is_remote_child: false,
run_id: Some(task_id.clone()),
autoexecute_override: None,
last_event_sequence: None,
Expand Down Expand Up @@ -332,6 +335,7 @@ fn test_status_filter_uses_display_status_for_task_backed_conversations() {
parent_agent_id: None,
agent_name: None,
parent_conversation_id: None,
is_remote_child: false,
run_id: Some(task_id.clone()),
autoexecute_override: None,
last_event_sequence: None,
Expand Down Expand Up @@ -767,6 +771,7 @@ fn test_get_tasks_and_conversations_prefers_task_when_task_id_matches_conversati
parent_agent_id: None,
agent_name: None,
parent_conversation_id: None,
is_remote_child: false,
run_id: Some(task_id.clone()),
autoexecute_override: None,
last_event_sequence: None,
Expand Down Expand Up @@ -824,6 +829,7 @@ fn test_get_tasks_and_conversations_prefers_task_when_server_token_matches() {
parent_agent_id: None,
agent_name: None,
parent_conversation_id: None,
is_remote_child: false,
run_id: None,
autoexecute_override: None,
last_event_sequence: None,
Expand Down Expand Up @@ -880,6 +886,7 @@ fn test_get_tasks_and_conversations_keeps_unrelated_tasks_and_conversations() {
parent_agent_id: None,
agent_name: None,
parent_conversation_id: None,
is_remote_child: false,
run_id: None,
autoexecute_override: None,
last_event_sequence: None,
Expand Down
63 changes: 51 additions & 12 deletions app/src/ai/agent_events/message_hydrator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;
use std::time::Duration;

use crate::ai::ambient_agents::AmbientAgentTaskId;
use anyhow::{anyhow, Context, Result};
#[cfg(not(target_family = "wasm"))]
use futures::future::Either;
Expand All @@ -9,6 +10,7 @@ use warpui::r#async::Timer;

use crate::ai::agent::ReceivedMessageInput;
use crate::server::server_api::ai::{AIClient, AgentRunEvent, ReadAgentMessageResponse};
use crate::server::server_api::ServerApi;

pub(crate) const DEFAULT_AGENT_MESSAGE_FETCH_TIMEOUT: Duration = Duration::from_secs(5);

Expand All @@ -17,6 +19,8 @@ pub(crate) const DEFAULT_AGENT_MESSAGE_FETCH_TIMEOUT: Duration = Duration::from_
#[derive(Clone)]
pub(crate) struct MessageHydrator {
ai_client: Arc<dyn AIClient>,
task_scoped_server_api: Option<Arc<ServerApi>>,
task_id: Option<AmbientAgentTaskId>,
#[cfg_attr(target_family = "wasm", allow(dead_code))]
fetch_timeout: Duration,
}
Expand All @@ -26,16 +30,40 @@ impl MessageHydrator {
Self::with_fetch_timeout(ai_client, DEFAULT_AGENT_MESSAGE_FETCH_TIMEOUT)
}

pub(crate) fn for_task(server_api: Arc<ServerApi>, task_id: AmbientAgentTaskId) -> Self {
let ai_client: Arc<dyn AIClient> = server_api.clone();
Self {
ai_client,
task_scoped_server_api: Some(server_api),
task_id: Some(task_id),
fetch_timeout: DEFAULT_AGENT_MESSAGE_FETCH_TIMEOUT,
}
}

pub(crate) fn with_fetch_timeout(
ai_client: Arc<dyn AIClient>,
fetch_timeout: Duration,
) -> Self {
Self {
ai_client,
task_scoped_server_api: None,
task_id: None,
fetch_timeout,
}
}

async fn read_message(&self, message_id: &str) -> Result<ReadAgentMessageResponse> {
match (self.task_scoped_server_api.as_ref(), self.task_id) {
(Some(server_api), Some(task_id)) => {
server_api
.read_agent_message_for_task(&task_id, message_id)
.await
}
_ => self.ai_client.read_agent_message(message_id).await,
}
.with_context(|| format!("Failed to read agent message {message_id}"))
}

pub(crate) async fn hydrate_event_for_recipient(
&self,
event: &AgentRunEvent,
Expand All @@ -55,6 +83,17 @@ impl MessageHydrator {
return None;
}
};
if message.body.is_empty() {
log::warn!(
"Hydrated empty-body agent message: message_id={} event_sequence={} recipient_run_id={} sender_run_id={} subject={:?} task_id={:?}",
message.message_id,
event.sequence,
recipient_run_id,
message.sender_run_id,
message.subject,
self.task_id.map(|task_id| task_id.to_string())
);
}

Some(ReceivedMessageInput {
message_id: message.message_id,
Expand All @@ -70,15 +109,13 @@ impl MessageHydrator {
&self,
message_id: &str,
) -> Result<ReadAgentMessageResponse> {
let read_message = self.ai_client.read_agent_message(message_id);
let read_message = self.read_message(message_id);
let timeout = Timer::after(self.fetch_timeout);
futures::pin_mut!(read_message);
futures::pin_mut!(timeout);

match futures::future::select(read_message, timeout).await {
Either::Left((result, _)) => {
result.with_context(|| format!("Failed to read agent message {message_id}"))
}
Either::Left((result, _)) => result,
Either::Right(_) => Err(anyhow!("Timed out reading agent message {message_id}")),
}
}
Expand All @@ -88,10 +125,7 @@ impl MessageHydrator {
&self,
message_id: &str,
) -> Result<ReadAgentMessageResponse> {
self.ai_client
.read_agent_message(message_id)
.await
.with_context(|| format!("Failed to read agent message {message_id}"))
self.read_message(message_id).await
}

pub(crate) async fn read_message_from_event_with_timeout(
Expand All @@ -105,10 +139,15 @@ impl MessageHydrator {
}

pub(crate) async fn mark_message_delivered(&self, message_id: &str) -> Result<()> {
self.ai_client
.mark_message_delivered(message_id)
.await
.with_context(|| format!("Failed to mark agent message {message_id} as delivered"))
match (self.task_scoped_server_api.as_ref(), self.task_id) {
(Some(server_api), Some(task_id)) => {
server_api
.mark_message_delivered_for_task(&task_id, message_id)
.await
}
_ => self.ai_client.mark_message_delivered(message_id).await,
}
.with_context(|| format!("Failed to mark agent message {message_id} as delivered"))
}

pub(crate) async fn mark_messages_delivered_best_effort<'a, I>(
Expand Down
Loading
Loading