diff --git a/.clippy.toml b/.clippy.toml index 8987fce2..6b3b5fee 100644 --- a/.clippy.toml +++ b/.clippy.toml @@ -1,3 +1,3 @@ -excessive-nesting-threshold = 4 +excessive-nesting-threshold = 6 too-many-arguments-threshold = 10 allowed-idents-below-min-chars = ["..", "k", "v", "f", "re", "id", "Ok", "'_"] diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index fc9c207a..5e60118e 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -17,7 +17,7 @@ jobs: - name: Install Rust + components uses: actions-rust-lang/setup-rust-toolchain@v1 with: - toolchain: 1.87 + toolchain: 1.89 components: rustfmt,clippy - name: Install code coverage uses: taiki-e/install-action@cargo-llvm-cov diff --git a/.vscode/settings.json b/.vscode/settings.json index 695f87c2..15fa34bb 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -9,7 +9,6 @@ ], "files.autoSave": "off", "files.insertFinalNewline": true, - "gitlens.showWelcomeOnInstall": false, "gitlens.showWhatsNewAfterUpgrades": false, "lldb.consoleMode": "evaluate", "rust-analyzer.cargo.features": [ diff --git a/Cargo.toml b/Cargo.toml index 2232bd2c..c9991c08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ glob = "0.3.1" heck = "0.5.0" # convert bytes to hex strings hex = "0.4.3" +hostname = "0.4.1" # hashmaps that preserve insertion order indexmap = { version = "2.9.0", features = ["serde"] } # utilities for iterables e.g. cartesian products diff --git a/cspell.json b/cspell.json index 99f8c978..42697206 100644 --- a/cspell.json +++ b/cspell.json @@ -80,7 +80,9 @@ "wasi", "patchelf", "itertools", - "colinianking" + "colinianking", + "itertools", + "pathset", ], "useGitignore": false, "ignorePaths": [ diff --git a/src/core/error.rs b/src/core/error.rs index d264814b..60a87382 100644 --- a/src/core/error.rs +++ b/src/core/error.rs @@ -122,9 +122,16 @@ impl fmt::Debug for OrcaError { match &self.kind { Kind::AgentCommunicationFailure { backtrace, .. } | Kind::FailedToStartPod { backtrace, .. } + | Kind::FailedToExtractRunInfo { backtrace, .. } | Kind::IncompletePacket { backtrace, .. } | Kind::InvalidPath { backtrace, .. } + | Kind::InvalidIndex { backtrace, .. } + | Kind::KeyMissing { backtrace, .. } | Kind::MissingInfo { backtrace, .. } + | Kind::FailedToGetPodJobOutput { backtrace, .. } + | Kind::PodJobProcessingError { backtrace, .. } + | Kind::PodJobSubmissionFailed { backtrace, .. } + | Kind::UnexpectedPathType { backtrace, .. } | Kind::BollardError { backtrace, .. } | Kind::ChronoParseError { backtrace, .. } | Kind::DOTError { backtrace, .. } diff --git a/src/core/graph.rs b/src/core/graph.rs index 8772dcc6..73bb87cb 100644 --- a/src/core/graph.rs +++ b/src/core/graph.rs @@ -1,5 +1,5 @@ use crate::{ - core::{pipeline::PipelineNode, util::get}, + core::{model::pipeline::PipelineNode, util::get}, uniffi::{error::Result, model::pipeline::Kernel}, }; use dot_parser::ast::Graph as DOTGraph; @@ -25,7 +25,7 @@ pub fn make_graph( let graph = DiGraph::::from_dot_graph(DOTGraph::try_from(input_dot)?).map( |_, node| PipelineNode { - name: node.id.clone(), + id: node.id.clone(), kernel: get(&metadata, &node.id) .unwrap_or_else(|error| panic!("{error}")) .clone(), diff --git a/src/core/mod.rs b/src/core/mod.rs index e4bf2a84..d5f14d51 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -9,7 +9,6 @@ macro_rules! inner_attr_to_each { pub(crate) mod error; pub(crate) mod graph; -pub(crate) mod pipeline; pub(crate) mod store; pub(crate) mod util; pub(crate) mod validation; @@ -20,6 +19,7 @@ inner_attr_to_each! { pub(crate) mod model; pub(crate) mod operator; pub(crate) mod orchestrator; + pub(crate) mod pipeline_runner; } #[cfg(feature = "test")] @@ -35,6 +35,7 @@ inner_attr_to_each! { )] pub mod crypto; pub mod model; - pub mod operator; pub mod orchestrator; + pub mod pipeline_runner; + pub mod operator; } diff --git a/src/core/model/mod.rs b/src/core/model/mod.rs index b21a1728..4006415b 100644 --- a/src/core/model/mod.rs +++ b/src/core/model/mod.rs @@ -57,4 +57,5 @@ where sorted.serialize(serializer) } +pub mod pipeline; pub mod pod; diff --git a/src/core/model/pipeline.rs b/src/core/model/pipeline.rs new file mode 100644 index 00000000..ffb0e319 --- /dev/null +++ b/src/core/model/pipeline.rs @@ -0,0 +1,109 @@ +use std::{ + backtrace::Backtrace, + collections::{HashMap, HashSet}, +}; + +use crate::uniffi::{ + error::{Kind, OrcaError, Result}, + model::{ + packet::PathSet, + pipeline::{Kernel, Pipeline, PipelineJob}, + }, +}; +use itertools::Itertools as _; +use petgraph::Direction::Incoming; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] +pub struct PipelineNode { + pub id: String, + pub kernel: Kernel, +} + +impl Pipeline { + /// Function to get the parents of a node + pub(crate) fn get_node_parents( + &self, + node: &PipelineNode, + ) -> impl Iterator { + // Find the NodeIndex for the given node_key + let node_index = self + .graph + .node_indices() + .find(|&idx| self.graph[idx] == *node); + node_index.into_iter().flat_map(move |idx| { + self.graph + .neighbors_directed(idx, Incoming) + .map(move |parent_idx| &self.graph[parent_idx]) + }) + } + + /// Return a vec of `node_names` that takes in inputs based on the `input_spec` + pub(crate) fn get_input_nodes(&self) -> HashSet<&String> { + let mut input_nodes = HashSet::new(); + + self.input_spec.iter().for_each(|(_, node_uris)| { + for node_uri in node_uris { + input_nodes.insert(&node_uri.node_id); + } + }); + + input_nodes + } +} + +impl PipelineJob { + /// Helpful function to get the input packet for input nodes of the pipeline based on the `pipeline_job` an`pipeline_spec`ec + /// # Errors + /// Will return `Err` if there is an issue getting the input packet per node. + /// # Returns + /// A `HashMap` where the key is the node name and the value is a vector of `HashMap` representing the input packets for that node. + pub fn get_input_packet_per_node( + &self, + ) -> Result>>> { + // For each node in the input specification, we will iterate over its mapping + // nodes_input_spec contains > + let mut nodes_input_spec = HashMap::new(); + for (input_key, node_uris) in &self.pipeline.input_spec { + for node_uri in node_uris { + let input_path_sets = self.input_packet.get(input_key).ok_or(OrcaError { + kind: Kind::KeyMissing { + key: input_key.clone(), + backtrace: Some(Backtrace::capture()), + }, + })?; + // There shouldn't be a duplicate key in the input packet as this will be handle by pipeline verify + let input_spec = nodes_input_spec + .entry(&node_uri.node_id) + .or_insert_with(HashMap::new); + input_spec.insert(&node_uri.key, input_path_sets); + } + } + + // For each node, compute the cartesian product of the path_sets for each unique combination of keys + let node_input_packets = nodes_input_spec + .into_iter() + .map(|(node_id, input_node_keys)| { + // We need to pull them out at the same time to ensure the key order is preserve to match the cartesian product + let (keys, values): (Vec<_>, Vec<_>) = input_node_keys.into_iter().unzip(); + + // Covert each combo into a packet + let packets = values + .into_iter() + .multi_cartesian_product() + .map(|combo| { + keys.iter() + .copied() + .zip(combo) + .map(|(key, pathset)| (key.to_owned(), pathset.to_owned())) + .collect::>() + }) + .collect::>>(); + + (node_id.to_owned(), packets) + }) + .collect::>(); + + Ok(node_input_packets) + } +} diff --git a/src/core/operator.rs b/src/core/operator.rs index 64db6206..c10bcbc7 100644 --- a/src/core/operator.rs +++ b/src/core/operator.rs @@ -1,12 +1,13 @@ use crate::uniffi::{error::Result, model::packet::Packet}; use async_trait; use itertools::Itertools as _; +use serde::{Deserialize, Serialize}; use std::{clone::Clone, collections::HashMap, iter::IntoIterator, sync::Arc}; use tokio::sync::Mutex; #[async_trait::async_trait] pub trait Operator { - async fn next(&self, stream_name: String, packet: Packet) -> Result>; + async fn process_packet(&self, stream_name: String, packet: Packet) -> Result>; } pub struct JoinOperator { @@ -25,7 +26,7 @@ impl JoinOperator { #[async_trait::async_trait] impl Operator for JoinOperator { - async fn next(&self, stream_name: String, packet: Packet) -> Result> { + async fn process_packet(&self, stream_name: String, packet: Packet) -> Result> { let mut received_packets = self.received_packets.lock().await; received_packets .entry(stream_name.clone()) @@ -61,8 +62,9 @@ impl Operator for JoinOperator { } } +#[derive(uniffi::Object, Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] pub struct MapOperator { - map: HashMap, + pub map: HashMap, } impl MapOperator { @@ -73,7 +75,7 @@ impl MapOperator { #[async_trait::async_trait] impl Operator for MapOperator { - async fn next(&self, _: String, packet: Packet) -> Result> { + async fn process_packet(&self, _: String, packet: Packet) -> Result> { Ok(vec![ packet .iter() diff --git a/src/core/pipeline.rs b/src/core/pipeline.rs deleted file mode 100644 index 494e4919..00000000 --- a/src/core/pipeline.rs +++ /dev/null @@ -1,8 +0,0 @@ -use crate::uniffi::model::pipeline::Kernel; -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct PipelineNode { - pub name: String, - pub kernel: Kernel, -} diff --git a/src/core/pipeline_runner.rs b/src/core/pipeline_runner.rs new file mode 100644 index 00000000..d8bff399 --- /dev/null +++ b/src/core/pipeline_runner.rs @@ -0,0 +1,900 @@ +use crate::{ + core::{ + crypto::hash_buffer, + model::{pipeline::PipelineNode, serialize_hashmap}, + operator::{JoinOperator, Operator}, + util::{get, make_key_expr}, + }, + uniffi::{ + error::{ + Kind, OrcaError, Result, + selector::{self}, + }, + model::{ + packet::{Packet, PathSet, URI}, + pipeline::{Kernel, PipelineJob, PipelineResult, PipelineStatus}, + pod::{Pod, PodJob, PodResult}, + }, + orchestrator::{ + PodStatus, + agent::{Agent, AgentClient, Response}, + }, + }, +}; +use async_trait::async_trait; +use names::{Generator, Name}; +use serde_yaml::Serializer; +use snafu::{OptionExt as _, ResultExt as _}; +use std::{ + collections::{BTreeMap, HashMap}, + path::PathBuf, + sync::Arc, +}; +use tokio::{ + sync::{Mutex, RwLock}, + task::JoinSet, +}; + +static NODE_OUTPUT_KEY_EXPR: &str = "output"; +static FAILURE_KEY_EXP: &str = "failure"; + +/// Internal representation of a pipeline run, which should not be made public due to the fact that it contains +#[derive(Debug)] +struct PipelineRunInternal { + /// `PipelineJob` that this run is associated with + assigned_name: String, + session: Arc, // Zenoh session for communication + agent_client: Arc, // Zenoh agent client for communication with docker orchestrators + pipeline_job: Arc, // The pipeline job that this run is associated with + node_tasks: Arc>>>, // JoinSet of tasks for each node in the pipeline + outputs: Arc>>>, // String is the node key, while hash + failure_logs: Arc>>, // Logs of processing failures + failure_logging_task: Arc>>>, // JoinSet of tasks for logging failures + namespace: String, + namespace_lookup: HashMap, +} + +impl PipelineRunInternal { + fn make_key_expr(&self, node_id: &str, event: &str) -> String { + make_key_expr( + &self.agent_client.group, + &self.agent_client.host, + "pipeline_run", + &BTreeMap::from([ + ("event".to_owned(), event.to_owned()), + ("node_id".to_owned(), node_id.to_owned()), + ("pipeline_run_id".to_owned(), self.assigned_name.clone()), + ]), + ) + } + + fn make_abort_request_key_exp(&self) -> String { + make_key_expr( + &self.agent_client.group, + &self.agent_client.host, + "pipeline_run", + &BTreeMap::from([ + ("event".to_owned(), "abort".to_owned()), + ("pipeline_run_id".to_owned(), self.assigned_name.clone()), + ]), + ) + } + + // Utils functions + async fn send_packets(&self, node_id: &str, output_packets: &Vec) -> Result<()> { + Ok(self + .session + .put( + self.make_key_expr(node_id, NODE_OUTPUT_KEY_EXPR), + serde_json::to_string(output_packets)?, + ) + .await + .context(selector::AgentCommunicationFailure {})?) + } + + async fn send_err_msg(&self, node_id: &str, err: OrcaError) { + self.session + .put( + &self.make_key_expr(node_id, FAILURE_KEY_EXP), + format!("Node {node_id}: {err}"), + ) + .await + .context(selector::AgentCommunicationFailure {}) + .unwrap_or_else(|send_err| { + eprintln!("Failed to send error message for node {node_id}: {send_err}"); + }); + } + + async fn send_abort_request(&self) -> Result<()> { + Ok(self + .session + .put(&self.make_abort_request_key_exp(), vec![]) + .await + .context(selector::AgentCommunicationFailure {})?) + } + + async fn get_status(&self) -> PipelineStatus { + if !self.node_tasks.lock().await.is_empty() { + PipelineStatus::Running + } else if self.outputs.read().await.is_empty() { + PipelineStatus::Failed + } else if self.failure_logs.read().await.is_empty() { + PipelineStatus::Succeeded + } else { + PipelineStatus::PartiallySucceeded + } + } +} + +/// Runner that uses a docker agent to run pipelines +#[derive(Debug, Clone)] +pub struct DockerPipelineRunner { + agent: Arc, + pipeline_runs: HashMap>, +} + +/// This is an implementation of a pipeline runner that uses Zenoh to communicate between the tasks +/// The runtime is tokio +/// +/// These are the key expressions of the components of the pipeline: +/// Input Node: `pipeline_job_hash/input_node/outputs` (This is where the `pipeline_job` packets get fed to) +/// Nodes: `pipeline_job_hash/node_id/outputs/(success|failure)` (This is where the node outputs are sent to) +/// +impl DockerPipelineRunner { + /// Create a new Docker pipeline runner + /// # Errors + /// Will error out if the environment variable `HOSTNAME` is not set + pub fn new(agent: Arc) -> Self { + Self { + agent, + pipeline_runs: HashMap::new(), + } + } + + /// Will start a new pipeline run with the given `PipelineJob` + /// This will start the async tasks for each node in the pipeline + /// including the one that captures the outputs from the leaf nodes + /// + /// Upon receiving the ready message from all the nodes, it will send the input packets to the input node + /// + /// # Errors + /// Will error out if the pipeline job fails to start + pub async fn start( + &mut self, + pipeline_job: PipelineJob, + namespace: &str, // Name space to save pod_results to + namespace_lookup: &HashMap, + ) -> Result { + // Create a new pipeline run + let pipeline_run = Arc::new(PipelineRunInternal { + pipeline_job: pipeline_job.into(), + outputs: Arc::new(RwLock::new(HashMap::new())), + node_tasks: Arc::new(Mutex::new(JoinSet::new())), + failure_logs: Arc::new(RwLock::new(Vec::new())), + failure_logging_task: Arc::new(Mutex::new(JoinSet::new())), + assigned_name: Generator::with_naming(Name::Plain).next().context( + selector::MissingInfo { + details: "unable to generate a random name", + }, + )?, + session: Arc::clone(&self.agent.client.session), + agent_client: Arc::clone(&self.agent.client), + namespace: namespace.to_owned(), + namespace_lookup: namespace_lookup.clone(), + }); + + // Create failure logging task + pipeline_run + .failure_logging_task + .lock() + .await + .spawn(Self::failure_capture_task(Arc::clone(&pipeline_run))); + + // Create the processor task for each node + // The id for the pipeline_run is the pipeline_job hash + let pipeline_run_id = pipeline_run.pipeline_job.hash.clone(); + + let graph = &pipeline_run.pipeline_job.pipeline.graph; + + // Create the subscriber that listen for ready messages + let subscriber = pipeline_run + .session + .declare_subscriber(pipeline_run.make_key_expr("*", "node_ready")) + .await + .context(selector::AgentCommunicationFailure {})?; + + // Get the set of input_nodes + let input_nodes = pipeline_run.pipeline_job.pipeline.get_input_nodes(); + + // Iterate through each node in the graph and spawn a task for each + for node_idx in graph.node_indices() { + let node = &graph[node_idx]; + + // Spawn the task + pipeline_run + .node_tasks + .lock() + .await + .spawn(Self::spawn_node_processing_task( + graph[node_idx].clone(), + Arc::clone(&pipeline_run), + input_nodes.contains(&node.id), + )); + } + + // Spawn the task that captures the outputs based on the output_spec + let mut node_output_spec = HashMap::new(); + // Group the output spec by node + for (output_key, node_uri) in &pipeline_run.pipeline_job.pipeline.output_spec { + node_output_spec + .entry(node_uri.node_id.clone()) + .or_insert_with(HashMap::new) + .insert(output_key.clone(), node_uri.key.clone()); + } + + for (node_id, key_mapping) in node_output_spec { + // Spawn the task that captures the outputs + pipeline_run + .node_tasks + .lock() + .await + .spawn(Self::create_output_capture_task_for_node( + key_mapping, + Arc::clone(&pipeline_run), + node_id.clone(), + )); + } + + // Wait for all nodes to be ready before sending inputs + + let num_of_nodes = graph.node_count(); + let mut ready_nodes = 0; + + while (subscriber.recv_async().await).is_ok() { + // Message is empty, just increment the counter + ready_nodes += 1; + + if ready_nodes == num_of_nodes { + break; // All nodes are ready, we can start sending inputs + } + } + + // For each node send all the packets associate with it + for (node_id, input_packets) in pipeline_run.pipeline_job.get_input_packet_per_node()? { + // Send the packet to the input node key_exp + pipeline_run + .send_packets(&format!("input_node_{node_id}"), &input_packets) + .await?; + + // Packets are sent, thus we can send the empty vec which signify processing is done + pipeline_run + .send_packets(&format!("input_node_{node_id}"), &Vec::new()) + .await?; + } + + // Insert into the list of pipeline runs + self.pipeline_runs + .insert(pipeline_run_id.clone(), pipeline_run); + + // Return the pipeline run id + Ok(pipeline_run_id) + } + + /// Given a pipeline run, wait for all its tasks to complete and return the `PipelineResult` + /// + /// # Errors + /// Will error out if any of the pipeline tasks failed to join + pub async fn get_result(&mut self, pipeline_run_id: &str) -> Result { + // To get the result, the pipeline execution must be complete, so we need to await on the tasks + + let pipeline_run = + self.pipeline_runs + .get_mut(pipeline_run_id) + .context(selector::KeyMissing { + key: pipeline_run_id.to_owned(), + })?; + + // Wait for all the tasks to complete + while let Some(result) = pipeline_run.node_tasks.lock().await.join_next().await { + result??; + } + + Ok(PipelineResult { + pipeline_job: Arc::clone(&pipeline_run.pipeline_job), + failure_logs: pipeline_run.failure_logs.read().await.clone(), + status: pipeline_run.get_status().await, + output_packets: pipeline_run.outputs.read().await.clone(), + }) + } + + /// Stop the pipeline run and all its tasks + /// This will send a stop message to a channel that all node manager task are subscribed to. + /// Upon receiving the stop message, each node manager will force abort all of its task and exit. + /// + /// # Errors + /// Will error out if the pipeline run is not found or if any of the tasks fail to stop correctly + pub async fn stop(&mut self, pipeline_run_id: &str) -> Result<()> { + // Get the pipeline run first then broadcast the abort request signal + let pipeline_run = + self.pipeline_runs + .get_mut(pipeline_run_id) + .context(selector::KeyMissing { + key: pipeline_run_id.to_owned(), + })?; + + // Send the abort request signal + pipeline_run.send_abort_request().await?; + + while pipeline_run + .node_tasks + .lock() + .await + .join_next() + .await + .is_some() + {} + Ok(()) + } + + /// This will capture the outputs of the given nodes and store it in the `outputs` map + async fn create_output_capture_task_for_node( + // + key_mapping: HashMap, + pipeline_run: Arc, + node_id: String, + ) -> Result<()> { + // Determine which keys we are interested in for the given node_id + + // Create a zenoh session + let subscriber = pipeline_run + .session + .declare_subscriber(pipeline_run.make_key_expr(&node_id, NODE_OUTPUT_KEY_EXPR)) + .await + .context(selector::AgentCommunicationFailure {})?; + + while let Ok(payload) = subscriber.recv_async().await { + // Extract the message from the payload + let packets: Vec = serde_json::from_slice(&payload.payload().to_bytes())?; + + if packets.is_empty() { + // Output node exited, thus we can exit the capture task too + break; + } + let mut outputs_lock = pipeline_run.outputs.write().await; + + for packet in packets { + for (output_key, node_key) in &key_mapping { + outputs_lock + .entry(output_key.to_owned()) + .or_default() + .push(get(&packet, node_key)?.clone()); + } + } + } + Ok(()) + } + + async fn failure_capture_task(pipeline_run: Arc) -> Result<()> { + let sub = pipeline_run + .session + .declare_subscriber(pipeline_run.make_key_expr("*", FAILURE_KEY_EXP)) + .await + .context(selector::AgentCommunicationFailure {})?; + + // Listen to any failure messages and write it the logs + while let Ok(payload) = sub.recv_async().await { + // Extract the message from the payload + let failure_msg: String = serde_json::from_slice(&payload.payload().to_bytes())?; + // Store the failure message in the logs + pipeline_run.failure_logs.write().await.push(failure_msg); + } + + Ok(()) + } + + /// Function to start tasks associated with the node + /// Steps: + /// - Create the node processor based on the kernel type + /// - Create the zenoh session + /// - Create a join set to spawn and handle incoming messages tasks + /// - Create a subscriber for each of the parent nodes (Should only be 1, unless it is a joiner node) + /// - Create an abort listener task that will listen for stop requests + /// - For each subscriber, handle the incoming message appropriately + /// + /// # Errors + /// Will error out if the kernel for the node is not found or if the + async fn spawn_node_processing_task( + node: PipelineNode, + pipeline_run: Arc, + is_input_node: bool, + ) -> Result<()> { + // Get the node parents + let parent_nodes = pipeline_run + .pipeline_job + .pipeline + .get_node_parents(&node) + .collect::>(); + + // Create the correct processor for the node based on the kernel type + let node_processor: Arc>> = + Arc::new(Mutex::new(match &node.kernel { + Kernel::Pod { pod } => Box::new(PodProcessor::new( + Arc::clone(&pipeline_run), + node.id.clone(), + Arc::clone(pod), + )), + Kernel::MapOperator { mapper } => Box::new(OperatorProcessor::new( + Arc::clone(&pipeline_run), + node.id.clone(), + Arc::clone(mapper), + parent_nodes.len(), + )), + Kernel::JoinOperator => Box::new(OperatorProcessor::new( + Arc::clone(&pipeline_run), + node.id.clone(), + JoinOperator::new(parent_nodes.len()).into(), + parent_nodes.len(), + )), + })); + + // Create a join set to spawn and handle incoming messages tasks + let mut listener_tasks = JoinSet::new(); + + // Create a list of node_ids that this node should listen to + let mut nodes_to_sub_to = parent_nodes + .iter() + .map(|parent_node| parent_node.id.clone()) + .collect::>(); + + if is_input_node { + // If the node is an input node, we need to add the input node key expression + nodes_to_sub_to.push(format!("input_node_{}", node.id)); + } + + // For each node in nodes_to_subscribe_to, call the event handler func + for node_to_sub in &nodes_to_sub_to { + listener_tasks.spawn(Self::event_handler( + Arc::clone(&pipeline_run), + node.id.clone(), + node_to_sub.to_owned(), + Arc::clone(&node_processor), + )); + } + + // Create the listener task for the stop request + let abort_request_handler_task = tokio::spawn(Self::abort_request_event_handler( + node_processor, + Arc::clone(&pipeline_run), + )); + + // Wait for all tasks to be spawned and reply with ready message + // This is to ensure that the pipeline run knows when all tasks are ready to receive inputs + let mut num_of_ready_event_handler: usize = 0; + // Build the subscriber + let status_subscriber = pipeline_run + .session + .declare_subscriber(pipeline_run.make_key_expr(&node.id, "event_handler_ready")) + .await + .context(selector::AgentCommunicationFailure {})?; + + while status_subscriber.recv_async().await.is_ok() { + num_of_ready_event_handler += 1; + if num_of_ready_event_handler == nodes_to_sub_to.len() { + // +1 for the stop request task + break; // All tasks are ready, we can start sending inputs + } + } + + // Send a ready message so the pipeline knows when to start sending inputs + pipeline_run + .session + .put(pipeline_run.make_key_expr(&node.id, "node_ready"), vec![]) + .await + .context(selector::AgentCommunicationFailure {})?; + + // Wait for all task to complete + while let Some(result) = listener_tasks.join_next().await { + match result { + Ok(Ok(())) => {} // Task completed successfully + Ok(Err(err)) => { + pipeline_run.send_err_msg(&node.id, err).await; + } + Err(err) => { + pipeline_run + .send_err_msg(&node.id, OrcaError::from(err)) + .await; + } + } + } + + // Abort the stop listener task since we don't need it anymore + abort_request_handler_task.abort(); + + Ok(()) + } + + /// This is the actual handler for incoming messages for the node + async fn event_handler( + pipeline_run: Arc, + node_id: String, + node_to_sub_to: String, + processor: Arc>>, + ) -> Result<()> { + // Create the subscriber + let subscriber = pipeline_run + .session + .declare_subscriber(pipeline_run.make_key_expr(&node_to_sub_to, NODE_OUTPUT_KEY_EXPR)) + .await + .context(selector::AgentCommunicationFailure {})?; + + // Send out ready signal + pipeline_run + .session + .put( + pipeline_run.make_key_expr(&node_id, "event_handler_ready"), + vec![], + ) + .await + .context(selector::AgentCommunicationFailure {})?; + + // Listen to the key + loop { + let sample = subscriber + .recv_async() + .await + .context(selector::AgentCommunicationFailure)?; + + // Extract out the packets + let packets: Vec = serde_json::from_slice(&sample.payload().to_bytes())?; + + // Check if the packets are empty, if so that means the node is finished processing + if packets.is_empty() { + processor + .lock() + .await + .mark_parent_as_complete(&node_to_sub_to) + .await; + break; + } + + // For each packet, we need to process it + for packet in packets { + processor + .lock() + .await + .process_incoming_packet(&node_to_sub_to, &packet) + .await; + } + } + Ok::<(), OrcaError>(()) + } + + /// This task will listen for stop requests on the given key expression + async fn abort_request_event_handler( + node_processor: Arc>>, + pipeline_run: Arc, + ) -> Result<()> { + let subscriber = pipeline_run + .session + .declare_subscriber(pipeline_run.make_abort_request_key_exp()) + .await + .context(selector::AgentCommunicationFailure {})?; + while subscriber.recv_async().await.is_ok() { + // Received a request to stop, therefore we need to tell the node_processor to shutdown + node_processor.lock().await.stop(); + } + Ok::<(), OrcaError>(()) + } +} + +/// Unify the interface for node processors and provide a common way to handle processing of incoming messages +/// This trait defines the methods that all node processors should implement +/// +/// Main purpose was to reduce the amount of code duplication between different node processors +/// As a result, each processor only needs to worry about writing their own function to process the msg +#[async_trait] +trait NodeProcessor: Send + Sync { + async fn process_incoming_packet(&mut self, sender_node_id: &str, incoming_packet: &Packet); + + /// Notifies the processor that the parent node has completed processing + /// If it is the last parent to complete, it will wait for all processing task to finish + /// Then send a completion signal + async fn mark_parent_as_complete(&mut self, parent_node_id: &str); + + fn stop(&mut self); +} + +/// Processor for Pods +/// Currently missing implementation to call agents for actual pod processing +struct PodProcessor { + pipeline_run: Arc, + node_id: String, + pod: Arc, + processing_tasks: JoinSet<()>, +} + +impl PodProcessor { + fn new(pipeline_run: Arc, node_id: String, pod: Arc) -> Self { + Self { + pipeline_run, + node_id, + pod, + processing_tasks: JoinSet::new(), + } + } +} + +impl PodProcessor { + /// Will handle the creation of the pod job, submission to the agent, listening for completion, and extracting the `output_packet` if successful + async fn process_packet( + pipeline_run: Arc, + node_id: String, + pod: Arc, + incoming_packet: HashMap, + ) -> Result { + // Hash the input_packet to create a unique identifier for the pod job + let input_packet_hash = { + let mut buf = Vec::new(); + let mut serializer = Serializer::new(&mut buf); + serialize_hashmap(&incoming_packet, &mut serializer)?; + hash_buffer(buf) + }; + + // Create the pod job + let pod_job = PodJob::new( + None, + Arc::clone(&pod), + incoming_packet, + URI { + namespace: pipeline_run.namespace.clone(), + path: format!( + "pipeline_outputs/{}/{node_id}/{input_packet_hash}", + pipeline_run.assigned_name + ) + .into(), + }, + pod.recommended_cpus, + pod.recommended_memory, + None, + &pipeline_run.namespace_lookup, + )?; + + // Create listener for pod_job + // Create the subscriber + let pod_job_subscriber = pipeline_run + .session + .declare_subscriber(pipeline_run.agent_client.make_key_expr( + true, + "pod_job", + BTreeMap::from([("hash", pod_job.hash.clone()), ("event", "*".to_owned())]), + )) + .await + .context(selector::AgentCommunicationFailure {})?; + + // Create the async task to listen for the pod job completion + let pod_job_listener_task = tokio::spawn(async move { + // Wait for the pod job to complete and extract the result + let sample = pod_job_subscriber + .recv_async() + .await + .context(selector::AgentCommunicationFailure {})?; + // Extract the pod_result from the payload + let pod_result: PodResult = serde_json::from_slice(&sample.payload().to_bytes())?; + Ok::<_, OrcaError>(pod_result) + }); + + // Submit it to the client and get the response to make sure it was successful + let responses = pipeline_run + .agent_client + .start_pod_jobs(vec![pod_job.clone().into()]) + .await; + let response = responses + .first() + .context(selector::InvalidIndex { idx: 0_usize })?; + + match response { + Response::Ok => (), + Response::Err(err) => { + return Err(OrcaError { + kind: Kind::PodJobProcessingError { + hash: pod_job.hash, + reason: err.clone(), + backtrace: Some(snafu::Backtrace::capture()), + }, + }); + } + } + + // Get the pod result from the listener task + let pod_result = pod_job_listener_task.await??; + // Get the output packet for the pod result + Ok(match pod_result.status { + PodStatus::Completed => { + // Get the output packet + pod_result.output_packet + } + PodStatus::Failed(exit_code) => { + // Processing failed, thus return the error + return Err(OrcaError { + kind: Kind::PodJobProcessingError { + hash: pod_result.pod_job.hash.clone(), + reason: format!("Pod processing failed with exit code {exit_code}"), + backtrace: Some(snafu::Backtrace::capture()), + }, + }); + } + PodStatus::Running | PodStatus::Unset | PodStatus::Undefined => { + // This should not happen, but if it does, we will return an error + return Err(OrcaError { + kind: Kind::PodJobProcessingError { + hash: pod_result.pod_job.hash.clone(), + reason: "Pod result status is running or unset".to_owned(), + backtrace: Some(snafu::Backtrace::capture()), + }, + }); + } + }) + } +} + +#[async_trait] +impl NodeProcessor for PodProcessor { + async fn process_incoming_packet( + &mut self, + _sender_node_id: &str, + incoming_packet: &HashMap, + ) { + // Clone all necessary fields from self to move into the async block + let pipeline_run = Arc::clone(&self.pipeline_run); + let node_id = self.node_id.clone(); + let pod = Arc::clone(&self.pod); + + let incoming_packet_inner = incoming_packet.clone(); + + self.processing_tasks.spawn(async move { + let result = match Self::process_packet( + Arc::clone(&pipeline_run), + node_id.clone(), + Arc::clone(&pod), + incoming_packet_inner.clone(), + ) + .await + { + Ok(output_packet) => { + match pipeline_run + .send_packets(&node_id, &vec![output_packet]) + .await + { + Ok(()) => Ok(()), + Err(err) => Err(err), + } + } + Err(err) => Err(err), + }; + + match result { + Ok(()) => { + // Successfully processed the packet, nothing to do + } + Err(err) => { + pipeline_run.send_err_msg(&node_id, err).await; + } + } + }); + } + + async fn mark_parent_as_complete(&mut self, _parent_node_id: &str) { + // For pod we only have one parent, thus execute the exit case + while self.processing_tasks.join_next().await.is_some() {} + // Send out completion signal + match self + .pipeline_run + .send_packets(&self.node_id, &Vec::new()) + .await + { + Ok(()) => {} + Err(err) => { + self.pipeline_run.send_err_msg(&self.node_id, err).await; + } + } + } + + fn stop(&mut self) { + self.processing_tasks.abort_all(); + } +} + +struct OperatorProcessor { + pipeline_run: Arc, + node_id: String, + operator: Arc, + num_of_parents: usize, + num_of_completed_parents: usize, + processing_tasks: JoinSet<()>, +} + +impl OperatorProcessor { + /// Create a new operator processor + pub fn new( + pipeline_run: Arc, + node_id: String, + operator: Arc, + num_of_parents: usize, + ) -> Self { + Self { + pipeline_run, + node_id, + operator, + num_of_parents, + num_of_completed_parents: 0, + processing_tasks: JoinSet::new(), + } + } +} + +#[async_trait] +impl NodeProcessor for OperatorProcessor { + async fn process_incoming_packet( + &mut self, + sender_node_id: &str, + incoming_packet: &HashMap, + ) { + // Clone all necessary fields from self to move into the async block + let operator = Arc::clone(&self.operator); + let pipeline_run = Arc::clone(&self.pipeline_run); + let node_id = self.node_id.clone(); + + let sender_node_id_inner = sender_node_id.to_owned(); + let incoming_packet_inner = incoming_packet.clone(); + + self.processing_tasks.spawn(async move { + let processing_result = operator + .process_packet(sender_node_id_inner, incoming_packet_inner) + .await; + + match processing_result { + Ok(output_packets) => { + if !output_packets.is_empty() { + // Send out all the packets + match pipeline_run.send_packets(&node_id, &output_packets).await { + Ok(()) => {} + Err(err) => { + pipeline_run.send_err_msg(&node_id, err).await; + } + } + } + } + Err(err) => { + pipeline_run.send_err_msg(&node_id, err).await; + } + } + }); + } + + async fn mark_parent_as_complete(&mut self, _parent_node_id: &str) { + // Figure out if this is the last parent or not + self.num_of_completed_parents += 1; + + if self.num_of_completed_parents == self.num_of_parents { + // All parents are complete, thus we need to wait on all processing tasks then exit + while (self.processing_tasks.join_next().await).is_some() { + // Wait for all tasks to complete + } + // Send out completion signal which is same as success but it is an empty vec of packets + match self + .pipeline_run + .send_packets(&self.node_id, &Vec::new()) + .await + { + Ok(()) => {} + Err(err) => { + self.pipeline_run.send_err_msg(&self.node_id, err).await; + } + } + } + } + + fn stop(&mut self) { + self.processing_tasks.abort_all(); + } +} diff --git a/src/core/util.rs b/src/core/util.rs index f41e4356..4f0f72fd 100644 --- a/src/core/util.rs +++ b/src/core/util.rs @@ -1,6 +1,11 @@ use crate::uniffi::error::{Result, selector}; use snafu::OptionExt as _; -use std::{any::type_name, borrow::Borrow, collections::HashMap, fmt, hash}; +use std::{ + any::type_name, + borrow::Borrow, + collections::{BTreeMap, HashMap}, + fmt, hash, +}; #[expect( clippy::unwrap_used, @@ -35,3 +40,19 @@ where details: format!("key = {key:?}"), })?) } + +pub fn make_key_expr( + group: &str, + host: &str, + topic: &str, + content: &BTreeMap, +) -> String { + // For each key-value pair in the content, we format it as "key/value" and join them with "/". + // The final format will be "group/host/topic/key1/value1/key2/value + let content_converted = content + .iter() + .map(|(k, v)| format!("{k}/{v}")) + .collect::>() + .join("/"); + format!("{group}/{host}/{topic}/{content_converted}") +} diff --git a/src/uniffi/error.rs b/src/uniffi/error.rs index 00ed5c2e..b91bde21 100644 --- a/src/uniffi/error.rs +++ b/src/uniffi/error.rs @@ -18,6 +18,7 @@ use std::{ }; use tokio::task; use uniffi; + /// Shorthand for a Result that returns an [`OrcaError`]. pub type Result = result::Result; /// Possible errors you may encounter. @@ -30,6 +31,23 @@ pub(crate) enum Kind { source: Box, backtrace: Option, }, + #[snafu(display( + "Failed to extract run info from the container image file: {container_name}." + ))] + FailedToExtractRunInfo { + container_name: String, + backtrace: Option, + }, + #[snafu(display( + "Missing expected output file or dir with key {packet_key} at path {path:?} for pod job (hash: {pod_job_hash})." + ))] + FailedToGetPodJobOutput { + pod_job_hash: String, + packet_key: String, + path: Box, + io_error: Box, + backtrace: Option, + }, #[snafu(display("Incomplete {kind} packet. Missing `{missing_keys:?}` keys."))] IncompletePacket { kind: String, @@ -50,11 +68,37 @@ pub(crate) enum Kind { source: io::Error, backtrace: Option, }, + #[snafu(display("Failed to get items at idx {idx}."))] + InvalidIndex { + idx: usize, + backtrace: Option, + }, + #[snafu(display("Key '{key}' was not found in map."))] + KeyMissing { + key: String, + backtrace: Option, + }, #[snafu(display("Missing info. Details: {details}."))] MissingInfo { details: String, backtrace: Option, }, + #[snafu(display("Pod job submission failed with reason: {reason}."))] + PodJobSubmissionFailed { + reason: String, + backtrace: Option, + }, + #[snafu(display("Pod job {hash} failed to process with reason: {reason}."))] + PodJobProcessingError { + hash: String, + reason: String, + backtrace: Option, + }, + #[snafu(display("Unexpected path type: {path:?}. Only support files and directories."))] + UnexpectedPathType { + path: PathBuf, + backtrace: Option, + }, #[snafu(transparent)] BollardError { source: Box, diff --git a/src/uniffi/model/packet.rs b/src/uniffi/model/packet.rs index 595a3f92..df9f4a91 100644 --- a/src/uniffi/model/packet.rs +++ b/src/uniffi/model/packet.rs @@ -2,6 +2,9 @@ use serde::{Deserialize, Serialize}; use std::{collections::HashMap, path::PathBuf}; use uniffi; +use crate::core::util::get; +use crate::uniffi::error::Result; + /// Path sets are named and represent an abstraction for the file(s) that represent some particular /// data within a compute environment. #[derive(uniffi::Record, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] @@ -12,6 +15,9 @@ pub struct PathInfo { pub match_pattern: String, } +#[uniffi::export] +impl PathInfo {} + /// File or directory options for BLOBs. #[derive(uniffi::Enum, Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Default)] pub enum BlobKind { @@ -52,5 +58,28 @@ pub enum PathSet { Collection(Vec), } +impl PathSet { + /// Util function to convert ``PathSet`` to ``PathBuf`` given a namespace lookup table + /// + /// # Errors + /// Will error out if namespace is missing in namespace lookup + pub fn to_path_buf(&self, namespace_lookup: &HashMap) -> Result> { + match self { + Self::Unary(blob) => { + let base_path = get(namespace_lookup, &blob.location.namespace)?; + Ok(vec![base_path.join(&blob.location.path)]) + } + Self::Collection(blobs) => { + let mut paths = Vec::with_capacity(blobs.len()); + for blob in blobs { + let base_path = get(namespace_lookup, &blob.location.namespace)?; + paths.push(base_path.join(&blob.location.path)); + } + Ok(paths) + } + } + } +} + /// A complete set of inputs to be provided to a computational unit. pub type Packet = HashMap; diff --git a/src/uniffi/model/pipeline.rs b/src/uniffi/model/pipeline.rs index 223f73cf..250fa742 100644 --- a/src/uniffi/model/pipeline.rs +++ b/src/uniffi/model/pipeline.rs @@ -2,7 +2,8 @@ use crate::{ core::{ crypto::{hash_blob, make_random_hash}, graph::make_graph, - pipeline::PipelineNode, + model::pipeline::PipelineNode, + operator::MapOperator, validation::validate_packet, }, uniffi::{ @@ -30,9 +31,9 @@ pub struct Pipeline { #[getset(skip)] pub graph: DiGraph, /// Exposed, internal input specification. Each input may be fed into more than one node/key if desired. - pub input_spec: HashMap>, + pub input_spec: HashMap>, /// Exposed, internal output specification. Each output is associated with only one node/key. - pub output_spec: HashMap, + pub output_spec: HashMap, } #[uniffi::export] @@ -46,14 +47,15 @@ impl Pipeline { pub fn new( graph_dot: &str, metadata: HashMap, - input_spec: &HashMap>, - output_spec: &HashMap, + input_spec: HashMap>, + output_spec: HashMap, ) -> Result { let graph = make_graph(graph_dot, metadata)?; + Ok(Self { graph, - input_spec: input_spec.clone(), - output_spec: output_spec.clone(), + input_spec, + output_spec, }) } } @@ -79,7 +81,6 @@ pub struct PipelineJob { pub output_dir: URI, } -#[expect(clippy::excessive_nesting, reason = "Nesting manageable.")] #[uniffi::export] impl PipelineJob { /// Construct a new pipeline job instance. @@ -91,7 +92,7 @@ impl PipelineJob { pub fn new( pipeline: Arc, input_packet: &HashMap>, - output_dir: &URI, + output_dir: URI, namespace_lookup: &HashMap, ) -> Result { validate_packet("input".into(), &pipeline.input_spec, input_packet)?; @@ -124,33 +125,75 @@ impl PipelineJob { hash: make_random_hash(), pipeline, input_packet: input_packet_with_checksum, - output_dir: output_dir.clone(), + output_dir, }) } } +/// Struct to hold the result of a pipeline execution. +#[derive(uniffi::Object, Debug, Clone, Deserialize, Serialize, Display, CloneGetters)] +#[getset(get_clone, impl_attrs = "#[uniffi::export]")] +#[display("{self:#?}")] +#[uniffi::export(Display)] +pub struct PipelineResult { + /// The pipeline job that was executed. + pub pipeline_job: Arc, + /// The result of the pipeline execution. + pub output_packets: HashMap>, + /// Logs of any failures that occurred during the pipeline execution. + pub failure_logs: Vec, + /// The status of the pipeline execution. + pub status: PipelineStatus, +} + +/// The status of a pipeline execution. +#[derive(uniffi::Enum, Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] +pub enum PipelineStatus { + /// The pipeline is currently running. + Running, + /// The pipeline has completed successfully. + Succeeded, + /// The pipeline has failed. + Failed, + /// The pipeline has partially succeeded. There should be some failure logs + PartiallySucceeded, +} /// A node in a computational pipeline. -#[derive(uniffi::Enum, Debug, Clone, Deserialize, Serialize)] +#[derive(uniffi::Enum, Debug, Clone, Deserialize, Serialize, PartialEq)] pub enum Kernel { /// Pod reference. Pod { /// See [`Pod`](crate::uniffi::model::pod::Pod). - r#ref: Arc, + pod: Arc, }, /// Cartesian product operation. See [`JoinOperator`](crate::core::operator::JoinOperator). JoinOperator, /// Rename a path set key operation. MapOperator { /// See [`MapOperator`](crate::core::operator::MapOperator). - map: HashMap, + mapper: Arc, }, } +impl From for Kernel { + fn from(mapper: MapOperator) -> Self { + Self::MapOperator { + mapper: Arc::new(mapper), + } + } +} + +impl From for Kernel { + fn from(pod: Pod) -> Self { + Self::Pod { pod: Arc::new(pod) } + } +} + /// Index from pipeline node into pod specification. #[derive(uniffi::Record, Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] -pub struct SpecURI { +pub struct NodeURI { /// Node reference name in pipeline. - pub node: String, + pub node_id: String, /// Specification key. pub key: String, } diff --git a/src/uniffi/model/pod.rs b/src/uniffi/model/pod.rs index 62cd006f..b5e9567a 100644 --- a/src/uniffi/model/pod.rs +++ b/src/uniffi/model/pod.rs @@ -182,7 +182,6 @@ impl PodJob { }) } } - /// Result from a compute job run. #[derive(uniffi::Record, Serialize, Deserialize, Debug, Clone, PartialEq, Default)] pub struct PodResult { @@ -205,6 +204,8 @@ pub struct PodResult { pub created: u64, /// Time in epoch when terminated in seconds. pub terminated: u64, + /// Logs about stdout and stderr, where stderr is append at the end + pub logs: String, } impl PodResult { @@ -221,6 +222,7 @@ impl PodResult { created: u64, terminated: u64, namespace_lookup: &HashMap, + logs: String, ) -> Result { let output_packet = pod_job .pod @@ -263,6 +265,7 @@ impl PodResult { }) .collect::>()?; + // If packet is completed, the output packet must meet the output spec if matches!(status, PodStatus::Completed) { validate_packet("output".into(), &pod_job.pod.output_spec, &output_packet)?; } @@ -276,6 +279,7 @@ impl PodResult { status, created, terminated, + logs, }; Ok(Self { hash: hash_buffer(to_yaml(&pod_result_no_hash)?), diff --git a/src/uniffi/orchestrator/agent.rs b/src/uniffi/orchestrator/agent.rs index 6cab47b5..1d1df2e7 100644 --- a/src/uniffi/orchestrator/agent.rs +++ b/src/uniffi/orchestrator/agent.rs @@ -51,7 +51,7 @@ pub struct AgentClient { /// Connecting agent's assigned name used for reference. pub host: String, #[getset(skip)] - pub(crate) session: zenoh::Session, + pub(crate) session: Arc, } #[uniffi::export] @@ -72,7 +72,8 @@ impl AgentClient { .await .context(selector::AgentCommunicationFailure {})?, ) - })?, + })? + .into(), }) } /// Start many pod jobs to be processed in parallel. @@ -155,15 +156,15 @@ impl Agent { /// # Errors /// /// Will stop and return an error if encounters an error while processing any pod job request. - #[expect(clippy::excessive_nesting, reason = "Nesting manageable.")] pub async fn start( &self, namespace_lookup: &HashMap, available_store: Option>, ) -> Result<()> { let mut services = JoinSet::new(); + let self_ref = Arc::new(self.clone()); services.spawn(start_service( - Arc::new(self.clone()), + Arc::clone(&self_ref), "pod_job", BTreeMap::from([("action", "request".to_owned())]), namespace_lookup.clone(), @@ -185,7 +186,7 @@ impl Agent { "pod_job", BTreeMap::from([ ( - "action", + "event", match &pod_result.status { PodStatus::Completed => "success", PodStatus::Running @@ -204,23 +205,9 @@ impl Agent { )); if let Some(store) = available_store { services.spawn(start_service( - Arc::new(self.clone()), + Arc::clone(&self_ref), "pod_job", - BTreeMap::from([("action", "success".to_owned())]), - namespace_lookup.clone(), - { - let inner_store = Arc::clone(&store); - async move |_, _, _, pod_result| { - inner_store.save_pod_result(&pod_result)?; - Ok(()) - } - }, - async |_, ()| Ok(()), - )); - services.spawn(start_service( - Arc::new(self.clone()), - "pod_job", - BTreeMap::from([("action", "failure".to_owned())]), + BTreeMap::from([("event", "*".to_owned())]), namespace_lookup.clone(), async move |_, _, _, pod_result| { store.save_pod_result(&pod_result)?; diff --git a/src/uniffi/orchestrator/docker.rs b/src/uniffi/orchestrator/docker.rs index a8eb5d22..58114577 100644 --- a/src/uniffi/orchestrator/docker.rs +++ b/src/uniffi/orchestrator/docker.rs @@ -12,7 +12,9 @@ use crate::{ use async_trait; use bollard::{ Docker, - container::{RemoveContainerOptions, StartContainerOptions, WaitContainerOptions}, + container::{ + LogOutput, LogsOptions, RemoveContainerOptions, StartContainerOptions, WaitContainerOptions, + }, errors::Error::DockerContainerWaitError, image::{CreateImageOptions, ImportImageOptions}, }; @@ -70,6 +72,9 @@ impl Orchestrator for LocalDockerOrchestrator { ) -> Result { ASYNC_RUNTIME.block_on(self.get_result(pod_run, namespace_lookup)) } + fn get_logs_blocking(&self, pod_run: &PodRun) -> Result { + ASYNC_RUNTIME.block_on(self.get_logs(pod_run)) + } #[expect( clippy::try_err, reason = r#" @@ -263,8 +268,65 @@ impl Orchestrator for LocalDockerOrchestrator { ), })?, namespace_lookup, + self.get_logs(pod_run).await?, ) } + + async fn get_logs(&self, pod_run: &PodRun) -> Result { + let mut std_out = Vec::new(); + let mut std_err = Vec::new(); + + self.api + .logs::( + &pod_run.assigned_name, + Some(LogsOptions { + stdout: true, + stderr: true, + ..Default::default() + }), + ) + .try_collect::>() + .await? + .iter() + .for_each(|log_output| match log_output { + LogOutput::StdOut { message } => { + std_out.extend(message.to_vec()); + } + LogOutput::StdErr { message } => { + std_err.extend(message.to_vec()); + } + LogOutput::StdIn { .. } | LogOutput::Console { .. } => { + // Ignore stdin logs, as they are not relevant for our use case + } + }); + + let mut logs = String::from_utf8_lossy(&std_out).to_string(); + if !std_err.is_empty() { + logs.push_str("\nSTDERR:\n"); + logs.push_str(&String::from_utf8_lossy(&std_err)); + } + + // Check for errors in the docker state, if exist, attach it to logs + // This is for when the container exits immediately due to a bad command or similar + let error = self + .api + .inspect_container(&pod_run.assigned_name, None) + .await? + .state + .context(selector::FailedToExtractRunInfo { + container_name: &pod_run.assigned_name, + })? + .error + .context(selector::FailedToExtractRunInfo { + container_name: &pod_run.assigned_name, + })?; + + if !error.is_empty() { + logs.push_str(&error); + } + + Ok(logs) + } } #[uniffi::export] diff --git a/src/uniffi/orchestrator/mod.rs b/src/uniffi/orchestrator/mod.rs index 00358ec3..04d2412f 100644 --- a/src/uniffi/orchestrator/mod.rs +++ b/src/uniffi/orchestrator/mod.rs @@ -58,7 +58,7 @@ pub struct PodRunInfo { pub memory_limit: u64, } /// Current computation managed by orchestrator. -#[derive(uniffi::Record, Debug, PartialEq)] +#[derive(uniffi::Record, Debug, PartialEq, Clone)] pub struct PodRun { /// Original compute request. pub pod_job: Arc, @@ -121,6 +121,10 @@ pub trait Orchestrator: Send + Sync + fmt::Debug { pod_run: &PodRun, namespace_lookup: &HashMap, ) -> Result; + /// Get the logs for a specific pod run. + /// # Errors + /// Will return `Err` if there is an issue getting logs. + fn get_logs_blocking(&self, pod_run: &PodRun) -> Result; /// How to asynchronously start containers with an alternate image. /// /// # Errors @@ -170,6 +174,9 @@ pub trait Orchestrator: Send + Sync + fmt::Debug { pod_run: &PodRun, namespace_lookup: &HashMap, ) -> Result; + + /// Get the logs for a specific pod run. + async fn get_logs(&self, pod_run: &PodRun) -> Result; } /// Orchestration execution agent daemon and client. pub mod agent; diff --git a/tests/agent.rs b/tests/agent.rs index 0fa82a0d..6d7ec1e7 100644 --- a/tests/agent.rs +++ b/tests/agent.rs @@ -42,7 +42,6 @@ fn simple() -> Result<()> { Ok(()) } -#[expect(clippy::excessive_nesting, reason = "Nesting is manageable")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn parallel_four_cores() -> Result<()> { let test_dirs = TestDirs::new(&HashMap::from([("default".to_owned(), None::)]))?; @@ -102,7 +101,7 @@ async fn parallel_four_cores() -> Result<()> { .await .expect("All senders have dropped."); let metadata = extract_metadata(sample.key_expr().as_str()); - let topic_kind = metadata["action"].as_str(); + let topic_kind = metadata["event"].as_str(); if ["success", "failure"].contains(&topic_kind) { let pod_result = serde_json::from_slice::(&sample.payload().to_bytes())?; assert!( diff --git a/tests/extra/data/input_txt/Where.txt b/tests/extra/data/input_txt/Where.txt new file mode 100644 index 00000000..2891a132 --- /dev/null +++ b/tests/extra/data/input_txt/Where.txt @@ -0,0 +1 @@ +Where diff --git a/tests/extra/data/input_txt/black.txt b/tests/extra/data/input_txt/black.txt new file mode 100644 index 00000000..7e66a17d --- /dev/null +++ b/tests/extra/data/input_txt/black.txt @@ -0,0 +1 @@ +black diff --git a/tests/extra/data/input_txt/cat.txt b/tests/extra/data/input_txt/cat.txt new file mode 100644 index 00000000..ef07ddcd --- /dev/null +++ b/tests/extra/data/input_txt/cat.txt @@ -0,0 +1 @@ +cat diff --git a/tests/extra/data/input_txt/hiding.txt b/tests/extra/data/input_txt/hiding.txt new file mode 100644 index 00000000..56e64f05 --- /dev/null +++ b/tests/extra/data/input_txt/hiding.txt @@ -0,0 +1 @@ +hiding diff --git a/tests/extra/data/input_txt/is.txt b/tests/extra/data/input_txt/is.txt new file mode 100644 index 00000000..f5cb1322 --- /dev/null +++ b/tests/extra/data/input_txt/is.txt @@ -0,0 +1 @@ +is diff --git a/tests/extra/data/input_txt/playing.txt b/tests/extra/data/input_txt/playing.txt new file mode 100644 index 00000000..0395b790 --- /dev/null +++ b/tests/extra/data/input_txt/playing.txt @@ -0,0 +1 @@ +playing diff --git a/tests/extra/data/input_txt/tabby.txt b/tests/extra/data/input_txt/tabby.txt new file mode 100644 index 00000000..3de6015d --- /dev/null +++ b/tests/extra/data/input_txt/tabby.txt @@ -0,0 +1 @@ +tabby diff --git a/tests/extra/data/input_txt/the.txt b/tests/extra/data/input_txt/the.txt new file mode 100644 index 00000000..41d25f51 --- /dev/null +++ b/tests/extra/data/input_txt/the.txt @@ -0,0 +1 @@ +the diff --git a/tests/extra/python/agent_test.py b/tests/extra/python/agent_test.py index 7e35ea79..305dd2f8 100644 --- a/tests/extra/python/agent_test.py +++ b/tests/extra/python/agent_test.py @@ -29,7 +29,7 @@ def count(sample): with zenoh.open(zenoh.Config()) as session: with session.declare_subscriber( - f"**/action/success/**/group/{group}/**/topic/pod_job/**", count + f"**/event/success/**/group/{group}/**/topic/pod_job/**", count ) as subscriber: await asyncio.sleep(20) # wait for results @@ -45,7 +45,7 @@ async def main(client, agent, test_dir, namespace_lookup, pod_jobs): available_store=LocalFileStore(directory=f"{test_dir}/store"), ), ) - await asyncio.sleep(5) # ensure service ready + await asyncio.sleep(1) # ensure service ready try: await client.start_pod_jobs(pod_jobs=pod_jobs) @@ -90,7 +90,7 @@ async def main(client, agent, test_dir, namespace_lookup, pod_jobs): output_spec={}, source_commit_url="https://github.com/user/simple", recommended_cpus=0.1, - recommended_memory=10 << 20, + recommended_memory=128 << 20, required_gpu=None, ), input_packet={}, @@ -99,7 +99,7 @@ async def main(client, agent, test_dir, namespace_lookup, pod_jobs): path=".", ), cpu_limit=1, - memory_limit=10 << 20, + memory_limit=128 << 20, env_vars=None, namespace_lookup=namespace_lookup, ) diff --git a/tests/fixture/mod.rs b/tests/fixture/mod.rs index a9ae1090..c0779a01 100644 --- a/tests/fixture/mod.rs +++ b/tests/fixture/mod.rs @@ -8,15 +8,19 @@ )] use names::{Generator, Name}; -use orcapod::uniffi::{ - error::Result, - model::{ - Annotation, - packet::{Blob, BlobKind, Packet, PathInfo, PathSet, URI}, - pod::{Pod, PodJob, PodResult}, +use orcapod::{ + core::operator::MapOperator, + uniffi::{ + error::Result, + model::{ + Annotation, + packet::{Blob, BlobKind, Packet, PathInfo, PathSet, URI}, + pipeline::{Kernel, NodeURI, Pipeline, PipelineJob}, + pod::{Pod, PodJob, PodResult}, + }, + orchestrator::PodStatus, + store::{ModelID, ModelInfo, Store}, }, - orchestrator::PodStatus, - store::{ModelID, ModelInfo, Store}, }; use std::{ collections::HashMap, @@ -153,6 +157,7 @@ pub fn pod_result_style( 1_737_922_307, 1_737_925_907, namespace_lookup, + "Example logs".to_owned(), ) } @@ -280,6 +285,275 @@ pub fn pull_image(reference: &str) -> Result<()> { Ok(()) } +// Pipeline Fixture +pub fn combine_txt_pod(pod_name: &str) -> Result { + Pod::new( + Some(Annotation { + name: pod_name.to_owned(), + description: "Takes two input files, remove the final next line and combine them" + .to_owned(), + version: "1.0.0".to_owned(), + }), + "alpine:3.14".to_owned(), + vec![ + "sh".into(), + "-c".into(), + format!( + "printf '%s %s\\n' \"$(cat input/input_1.txt | head -c -1)\" \"$(cat input/input_2.txt | head -c -1)\" > /output/output.txt" + ), + ], + HashMap::from([ + ( + "input_1".to_owned(), + PathInfo { + path: PathBuf::from("/input/input_1.txt"), + match_pattern: r".*\.txt".to_owned(), + }, + ), + ( + "input_2".into(), + PathInfo { + path: PathBuf::from("/input/input_2.txt"), + match_pattern: r".*\.txt".to_owned(), + }, + ), + ]), + PathBuf::from("/output"), + HashMap::from([( + "output".to_owned(), + PathInfo { + path: PathBuf::from("output.txt"), + match_pattern: r".*\.txt".to_owned(), + }, + )]), + "N/A".to_owned(), + 0.25, // 250 millicores as frac cores + 128_u64 << 20, // 128MB in bytes + None, + ) +} + +#[expect(clippy::too_many_lines, reason = "OK in tests.")] +pub fn pipeline() -> Result { + // Create a simple pipeline where the functions job is to add append their name into the input file + // Structure: A -> Mapper -> Joiner -> B -> Mapper -> C, D -> Mapper -> Joiner + + // Create the kernel map + let mut kernel_map = HashMap::new(); + + // Insert the pod into the kernel map + for pod_name in ["A", "B", "C", "D", "E"] { + kernel_map.insert(pod_name.into(), combine_txt_pod(pod_name)?.into()); + } + + let output_to_input_1 = Arc::new(MapOperator { + map: HashMap::from([("output".to_owned(), "input_1".to_owned())]), + }); + + let output_to_input_2 = Arc::new(MapOperator { + map: HashMap::from([("output".to_owned(), "input_2".to_owned())]), + }); + + // Create a mapper for A, B, and C + kernel_map.insert( + "pod_a_mapper".into(), + Kernel::MapOperator { + mapper: Arc::clone(&output_to_input_1), + }, + ); + kernel_map.insert( + "pod_b_mapper".into(), + Kernel::MapOperator { + mapper: Arc::clone(&output_to_input_2), + }, + ); + kernel_map.insert( + "pod_c_mapper".into(), + Kernel::MapOperator { + mapper: Arc::clone(&output_to_input_1), + }, + ); + kernel_map.insert( + "pod_d_mapper".into(), + Kernel::MapOperator { + mapper: Arc::clone(&output_to_input_2), + }, + ); + + for joiner_name in ['c', 'd', 'e'] { + kernel_map.insert(format!("pod_{joiner_name}_joiner"), Kernel::JoinOperator); + } + + // Write all the edges in DOT format + let dot = " + digraph { + A -> pod_a_mapper -> pod_c_joiner; + B -> pod_b_mapper -> pod_c_joiner; + pod_c_joiner -> C -> pod_c_mapper-> pod_e_joiner; + D -> pod_d_mapper -> pod_e_joiner; + pod_e_joiner -> E; + } + "; + + Pipeline::new( + dot, + kernel_map, + HashMap::from([ + ( + "where".into(), + vec![NodeURI { + node_id: "A".into(), + key: "input_1".into(), + }], + ), + ( + "is".into(), + vec![NodeURI { + node_id: "A".into(), + key: "input_2".into(), + }], + ), + ( + "the".into(), + vec![NodeURI { + node_id: "B".into(), + key: "input_1".into(), + }], + ), + ( + "cat_color".into(), + vec![NodeURI { + node_id: "B".into(), + key: "input_2".into(), + }], + ), + ( + "cat".into(), + vec![NodeURI { + node_id: "D".into(), + key: "input_1".into(), + }], + ), + ( + "action".into(), + vec![NodeURI { + node_id: "D".into(), + key: "input_2".into(), + }], + ), + ]), + HashMap::from([( + "output".to_owned(), + NodeURI { + node_id: "E".into(), + key: "output".into(), + }, + )]), + ) +} + +#[expect(clippy::implicit_hasher, reason = "Could be a false positive?")] +pub fn pipeline_job(namespace_lookup: &HashMap) -> Result { + // Create a simple pipeline_job + let namespace: String = "default".into(); + PipelineJob::new( + pipeline()?.into(), + &HashMap::from([ + ( + "where".into(), + vec![PathSet::Unary(Blob { + kind: BlobKind::File, + location: URI { + namespace: namespace.clone(), + path: "input_txt/Where.txt".into(), + }, + checksum: String::new(), + })], + ), + ( + "is".into(), + vec![PathSet::Unary(Blob { + kind: BlobKind::File, + location: URI { + namespace: namespace.clone(), + path: "input_txt/is.txt".into(), + }, + checksum: String::new(), + })], + ), + ( + "the".into(), + vec![PathSet::Unary(Blob { + kind: BlobKind::File, + location: URI { + namespace: namespace.clone(), + path: "input_txt/the.txt".into(), + }, + checksum: String::new(), + })], + ), + ( + "cat_color".into(), + vec![ + PathSet::Unary(Blob { + kind: BlobKind::File, + location: URI { + namespace: namespace.clone(), + path: "input_txt/black.txt".into(), + }, + checksum: String::new(), + }), + PathSet::Unary(Blob { + kind: BlobKind::File, + location: URI { + namespace: namespace.clone(), + path: "input_txt/tabby.txt".into(), + }, + checksum: String::new(), + }), + ], + ), + ( + "cat".into(), + vec![PathSet::Unary(Blob { + kind: BlobKind::File, + location: URI { + namespace: namespace.clone(), + path: "input_txt/cat.txt".into(), + }, + checksum: String::new(), + })], + ), + ( + "action".into(), + vec![ + PathSet::Unary(Blob { + kind: BlobKind::File, + location: URI { + namespace: namespace.clone(), + path: "input_txt/hiding.txt".into(), + }, + checksum: String::new(), + }), + PathSet::Unary(Blob { + kind: BlobKind::File, + location: URI { + namespace, + path: "input_txt/playing.txt".into(), + }, + checksum: String::new(), + }), + ], + ), + ]), + URI { + namespace: "default".to_owned(), + path: PathBuf::from("pipeline_output"), + }, + namespace_lookup, + ) +} + // --- util --- pub fn str_to_vec(v: &str) -> Vec { diff --git a/tests/model.rs b/tests/model.rs index 31d0ce8c..aa571c3c 100644 --- a/tests/model.rs +++ b/tests/model.rs @@ -102,9 +102,9 @@ fn pod_job_to_yaml() -> Result<()> { #[test] fn hash_pod_result() -> Result<()> { - assert_eq!( + pretty_assert_eq!( pod_result_style(&NAMESPACE_LOOKUP_READ_ONLY)?.hash, - "e752d86d4fc5435bfa4564ba951851530f5cf2228586c2f488c2ca9e7bcc7ed1", + "0bc0f17230cf78dbf3054c6887f42abad8b48382efee2ffe1adc79f0ae02fd1a", "Hash didn't match." ); Ok(()) @@ -134,6 +134,7 @@ fn pod_result_to_yaml() -> Result<()> { status: Completed created: 1737922307 terminated: 1737925907 + logs: Example logs "}, "YAML serialization didn't match." ); diff --git a/tests/operator.rs b/tests/operator.rs index 0389f069..546c7a7f 100644 --- a/tests/operator.rs +++ b/tests/operator.rs @@ -29,7 +29,7 @@ async fn next_batch( ) -> Result> { let mut next_packets = vec![]; for (stream_name, packet) in packets { - next_packets.extend(operator.next(stream_name, packet).await?); + next_packets.extend(operator.process_packet(stream_name, packet).await?); } Ok(next_packets) } @@ -105,7 +105,7 @@ async fn join_spotty() -> Result<()> { assert_eq!( operator - .next( + .process_packet( "right".into(), Packet::from([make_packet_key("style".into(), "right/style0.t7".into())]) ) @@ -116,7 +116,7 @@ async fn join_spotty() -> Result<()> { assert_eq!( operator - .next( + .process_packet( "right".into(), Packet::from([make_packet_key("style".into(), "right/style1.t7".into())]) ) @@ -127,7 +127,7 @@ async fn join_spotty() -> Result<()> { assert_eq!( operator - .next( + .process_packet( "left".into(), Packet::from([make_packet_key( "subject".into(), @@ -194,7 +194,7 @@ async fn map_once() -> Result<()> { assert_eq!( operator - .next( + .process_packet( "parent".into(), Packet::from([ make_packet_key("key_old".into(), "some/key.txt".into()), diff --git a/tests/orchestrator.rs b/tests/orchestrator.rs index d129928b..db0c5706 100644 --- a/tests/orchestrator.rs +++ b/tests/orchestrator.rs @@ -4,7 +4,6 @@ clippy::panic, clippy::expect_used, clippy::unwrap_used, - clippy::indexing_slicing, reason = "OK in tests." )] @@ -49,12 +48,12 @@ fn basic_test( assert_eq!( orchestrator .list_blocking()? - .iter() - .filter(|run| **run == *pod_run) - .map(|run| Ok(orchestrator.get_info_blocking(run)?.command)) - .collect::>>()?[0], - expected_command, - "List return a pod_run with a different command." + .into_iter() + .filter(|pod_run_from_list| pod_run_from_list == pod_run) + .map(|run| Ok(orchestrator.get_info_blocking(&run)?.command)) + .collect::>>()?, + vec![expected_command], + "Unexpected list." ); // await result let pod_result_1 = orchestrator.get_result_blocking(pod_run, namespace_lookup)?; @@ -68,11 +67,11 @@ fn basic_test( orchestrator .list_blocking()? .into_iter() - .filter(|run| *run == *pod_run) + .filter(|pod_run_from_list| pod_run_from_list == pod_run) .map(|run| Ok(orchestrator.get_info_blocking(&run)?.command)) - .collect::>>()?[0], - expected_command, - "List return a pod_run with a different command." + .collect::>>()?, + vec![expected_command], + "Unexpected list." ); assert_eq!( pod_result_1.assigned_name, pod_run.assigned_name, @@ -296,3 +295,44 @@ async fn verify_pod_result_not_running() -> Result<()> { ); Ok(()) } + +#[test] +fn logs() -> Result<()> { + execute_wrapper(|orchestrator, namespace_lookup| { + let pod_job = pod_job_custom( + pod_custom( + "alpine:3.14", + vec!["bin/sh".into(), "-c".into(), "echo \"hi\"".into()], + HashMap::new(), + )?, + HashMap::new(), + namespace_lookup, + )?; + + let pod_run = orchestrator.start_blocking(&pod_job, namespace_lookup)?; + let pod_result = orchestrator.get_result_blocking(&pod_run, namespace_lookup)?; + + assert_eq!( + pod_result.status, + PodStatus::Completed, + "Pod status is not completed" + ); + + assert_eq!(orchestrator.get_logs_blocking(&pod_run)?, "hi\n"); + assert_eq!( + orchestrator + .get_result_blocking(&pod_run, namespace_lookup)? + .logs, + "hi\n" + ); + + orchestrator.delete_blocking(&pod_run)?; + + assert!( + !orchestrator.list_blocking()?.contains(&pod_run), + "Unexpected container remains." + ); + + Ok(()) + }) +} diff --git a/tests/pipeline.rs b/tests/pipeline.rs index caa02d4a..b755f656 100644 --- a/tests/pipeline.rs +++ b/tests/pipeline.rs @@ -13,7 +13,7 @@ use orcapod::uniffi::{ error::Result, model::{ packet::{Blob, BlobKind, PathInfo, PathSet, URI}, - pipeline::{Kernel, Pipeline, PipelineJob, SpecURI}, + pipeline::{Kernel, NodeURI, Pipeline, PipelineJob}, }, }; use std::collections::HashMap; @@ -29,7 +29,7 @@ fn input_packet_checksum() -> Result<()> { HashMap::from([( "A".into(), Kernel::Pod { - r#ref: pod_custom( + pod: pod_custom( "alpine:3.14", vec!["echo".into()], HashMap::from([( @@ -43,14 +43,14 @@ fn input_packet_checksum() -> Result<()> { .into(), }, )]), - &HashMap::from([( + HashMap::from([( "pipeline_key_1".into(), - vec![SpecURI { - node: "A".into(), + vec![NodeURI { + node_id: "A".into(), key: "node_key_1".into(), }], )]), - &HashMap::new(), + HashMap::new(), )?; let pipeline_job = PipelineJob::new( @@ -66,7 +66,7 @@ fn input_packet_checksum() -> Result<()> { checksum: String::new(), }])], )]), - &URI { + URI { namespace: "default".into(), path: "output/pipeline".into(), }, diff --git a/tests/pipeline_runner.rs b/tests/pipeline_runner.rs new file mode 100644 index 00000000..76490bc1 --- /dev/null +++ b/tests/pipeline_runner.rs @@ -0,0 +1,96 @@ +#![expect( + missing_docs, + clippy::panic_in_result_fn, + clippy::indexing_slicing, + clippy::unwrap_used, + reason = "OK in tests." +)] + +// If 'fixture' is a local module, ensure there is a 'mod fixture;' statement or a 'fixture.rs' file in the same directory or in 'tests/'. +// If 'fixture' is an external crate, add it to Cargo.toml and import as shown below. +// use fixture::pipeline_job; +pub mod fixture; + +// Example for a local module: +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; + +use orcapod::{ + core::pipeline_runner::DockerPipelineRunner, + uniffi::{ + error::Result, + model::pipeline::PipelineStatus, + orchestrator::{agent::Agent, docker::LocalDockerOrchestrator}, + }, +}; +use tokio::fs::read_to_string; + +use crate::fixture::TestDirs; +use fixture::pipeline_job; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn basic_run() -> Result<()> { + // Create the test_dir and get the namespace lookup + let test_dirs = TestDirs::new(&HashMap::from([( + "default".to_owned(), + Some("./tests/extra/data/"), + )]))?; + let namespace_lookup = test_dirs.namespace_lookup(); + + // Create and agent and start it (temporary for now, will be merge later) + let agent = Arc::new(Agent::new( + "test:basic_run".to_owned(), + "localhost".to_owned(), + Arc::new(LocalDockerOrchestrator::new().unwrap()), + )?); + + let agent_inner = Arc::clone(&agent); + let namespace_lookup_inner = namespace_lookup.clone(); + tokio::spawn(async move { + agent_inner + .start(&namespace_lookup_inner, None) + .await + .unwrap(); + }); + + let pipeline_job = pipeline_job(&namespace_lookup)?; + + // Create the runner + let mut runner = DockerPipelineRunner::new(agent); + + let pipeline_run = runner + .start(pipeline_job, "default", &namespace_lookup) + .await?; + + // Wait for the pipeline run to complete + let pipeline_result = runner.get_result(&pipeline_run).await?; + + // Check the output packet content + assert_eq!(pipeline_result.output_packets["output"].len(), 4); + + // Check the status + assert_eq!(pipeline_result.status, PipelineStatus::Succeeded); + + // Get all the output file content and read them in + let mut output_content = HashSet::new(); + + for output_packet in &pipeline_result.output_packets["output"] { + output_content + .insert(read_to_string(&output_packet.to_path_buf(&namespace_lookup)?[0]).await?); + } + + // Check if the output_content matches + assert_eq!( + output_content, + HashSet::from([ + "Where is the black cat playing\n".to_owned(), + "Where is the black cat hiding\n".to_owned(), + "Where is the tabby cat playing\n".to_owned(), + "Where is the tabby cat hiding\n".to_owned(), + ]) + ); + + Ok(()) +}