diff --git a/.clippy.toml b/.clippy.toml index 8987fce2..6b3b5fee 100644 --- a/.clippy.toml +++ b/.clippy.toml @@ -1,3 +1,3 @@ -excessive-nesting-threshold = 4 +excessive-nesting-threshold = 6 too-many-arguments-threshold = 10 allowed-idents-below-min-chars = ["..", "k", "v", "f", "re", "id", "Ok", "'_"] diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index fc9c207a..33299131 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -17,12 +17,12 @@ jobs: - name: Install Rust + components uses: actions-rust-lang/setup-rust-toolchain@v1 with: - toolchain: 1.87 + toolchain: 1.89 components: rustfmt,clippy - name: Install code coverage uses: taiki-e/install-action@cargo-llvm-cov - name: Run syntax and style tests - run: cargo clippy --no-default-features --features=test --all-targets -- -D warnings + run: cargo clippy --all-targets -- -D warnings - name: Run format test run: cargo fmt --check - name: Run integration tests w/ coverage report @@ -57,15 +57,11 @@ jobs: uv pip install eclipse-zenoh -p ~/.local/share/base . ~/.local/share/base/bin/activate maturin develop --uv - - name: Run smoke test + - name: Run Python tests env: RUST_BACKTRACE: full run: | . ~/.local/share/base/bin/activate - python tests/extra/python/smoke_test.py -- tests/.tmp - - name: Run agent test - env: - RUST_BACKTRACE: full - run: | - . 
~/.local/share/base/bin/activate - python tests/extra/python/agent_test.py -- tests/.tmp + for py_file in tests/extra/python/*.py; do + python "$py_file" -- tests/.tmp + done diff --git a/.vscode/settings.json b/.vscode/settings.json index 695f87c2..a752f26f 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -9,13 +9,8 @@ ], "files.autoSave": "off", "files.insertFinalNewline": true, - "gitlens.showWelcomeOnInstall": false, "gitlens.showWhatsNewAfterUpgrades": false, "lldb.consoleMode": "evaluate", - "rust-analyzer.cargo.features": [ - "test" - ], - "rust-analyzer.cargo.noDefaultFeatures": true, "rust-analyzer.check.command": "clippy", "rust-analyzer.runnables.extraTestBinaryArgs": [ "--nocapture" diff --git a/Cargo.toml b/Cargo.toml index 2232bd2c..c9991c08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ glob = "0.3.1" heck = "0.5.0" # convert bytes to hex strings hex = "0.4.3" +hostname = "0.4.1" # hashmaps that preserve insertion order indexmap = { version = "2.9.0", features = ["serde"] } # utilities for iterables e.g. 
cartesian products diff --git a/cspell.json b/cspell.json index 99f8c978..42697206 100644 --- a/cspell.json +++ b/cspell.json @@ -80,7 +80,9 @@ "wasi", "patchelf", "itertools", - "colinianking" + "colinianking", + "itertools", + "pathset", ], "useGitignore": false, "ignorePaths": [ diff --git a/src/core/crypto.rs b/src/core/crypto.rs index 0c606e19..4a109500 100644 --- a/src/core/crypto.rs +++ b/src/core/crypto.rs @@ -108,3 +108,43 @@ pub fn make_random_hash() -> String { rand::rng().fill_bytes(&mut bytes); hex::encode(bytes) } + +#[cfg(test)] +mod tests { + #![expect(clippy::panic_in_result_fn, reason = "OK in tests.")] + use crate::{ + core::crypto::{hash_buffer, hash_dir, hash_file}, + uniffi::error::Result, + }; + use std::fs::read; + + #[test] + fn consistent_hash() -> Result<()> { + let filepath = "./tests/extra/data/images/subject.jpeg"; + assert_eq!( + hash_file(filepath)?, + hash_buffer(&read(filepath)?), + "Checksum not consistent." + ); + Ok(()) + } + + #[test] + fn complex_hash() -> Result<()> { + let dirpath = "./tests/extra/data/images"; + assert_eq!( + hash_dir(dirpath)?, + "6c96a478ea25e34fab045bc82858a2980b2cfb22db32e83c01349a8e7ed3b42c".to_owned(), + "Directory checksum didn't match." + ); + Ok(()) + } + + #[test] + fn internal_invalid_filepath() { + assert!( + hash_file("nonexistent_file.txt").is_err(), + "Did not raise an invalid filepath error." + ); + } +} diff --git a/src/core/error.rs b/src/core/error.rs index d264814b..64e6d9f3 100644 --- a/src/core/error.rs +++ b/src/core/error.rs @@ -121,10 +121,18 @@ impl fmt::Debug for OrcaError { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match &self.kind { Kind::AgentCommunicationFailure { backtrace, .. } + | Kind::EmptyDir { backtrace, .. } | Kind::FailedToStartPod { backtrace, .. } + | Kind::FailedToExtractRunInfo { backtrace, .. } | Kind::IncompletePacket { backtrace, .. } | Kind::InvalidPath { backtrace, .. } + | Kind::InvalidIndex { backtrace, .. } + | Kind::KeyMissing { backtrace, .. 
} | Kind::MissingInfo { backtrace, .. } + | Kind::FailedToGetPodJobOutput { backtrace, .. } + | Kind::PodJobProcessingError { backtrace, .. } + | Kind::PodJobSubmissionFailed { backtrace, .. } + | Kind::UnexpectedPathType { backtrace, .. } | Kind::BollardError { backtrace, .. } | Kind::ChronoParseError { backtrace, .. } | Kind::DOTError { backtrace, .. } diff --git a/src/core/graph.rs b/src/core/graph.rs index 8772dcc6..73bb87cb 100644 --- a/src/core/graph.rs +++ b/src/core/graph.rs @@ -1,5 +1,5 @@ use crate::{ - core::{pipeline::PipelineNode, util::get}, + core::{model::pipeline::PipelineNode, util::get}, uniffi::{error::Result, model::pipeline::Kernel}, }; use dot_parser::ast::Graph as DOTGraph; @@ -25,7 +25,7 @@ pub fn make_graph( let graph = DiGraph::::from_dot_graph(DOTGraph::try_from(input_dot)?).map( |_, node| PipelineNode { - name: node.id.clone(), + id: node.id.clone(), kernel: get(&metadata, &node.id) .unwrap_or_else(|error| panic!("{error}")) .clone(), diff --git a/src/core/mod.rs b/src/core/mod.rs index e4bf2a84..fe107cf0 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -1,40 +1,14 @@ -macro_rules! inner_attr_to_each { - { #!$attr:tt $($it:item)* } => { - $( - #$attr - $it - )* - } -} - pub(crate) mod error; pub(crate) mod graph; -pub(crate) mod pipeline; pub(crate) mod store; pub(crate) mod util; pub(crate) mod validation; -inner_attr_to_each! { - #![cfg(feature = "default")] - pub(crate) mod crypto; - pub(crate) mod model; - pub(crate) mod operator; - pub(crate) mod orchestrator; -} +pub(crate) mod crypto; +/// Model definition for orcapod +pub mod model; +pub(crate) mod operator; +pub(crate) mod orchestrator; -#[cfg(feature = "test")] -inner_attr_to_each! 
{ - #![cfg_attr( - feature = "test", - allow( - missing_docs, - clippy::missing_errors_doc, - clippy::missing_panics_doc, - reason = "Documentation not necessary since private API.", - ), - )] - pub mod crypto; - pub mod model; - pub mod operator; - pub mod orchestrator; -} +/// Pipeline runner module +pub mod pipeline_runner; diff --git a/src/core/model/mod.rs b/src/core/model/mod.rs index b21a1728..242748ef 100644 --- a/src/core/model/mod.rs +++ b/src/core/model/mod.rs @@ -8,31 +8,36 @@ use std::{ hash::BuildHasher, result, }; -/// Converts a model instance into a consistent yaml. -/// -/// # Errors -/// -/// Will return `Err` if there is an issue converting an `instance` into YAML (w/o annotation). -pub fn to_yaml(instance: &T) -> Result { - let mapping: IndexMap = serde_yaml::from_str(&serde_yaml::to_string(instance)?)?; // cast to map - let mut yaml = serde_yaml::to_string( - &mapping - .iter() - .filter_map(|(k, v)| match &**k { - "annotation" | "hash" => None, - "pod" | "pod_job" => Some((k, v["hash"].clone())), - _ => Some((k, v.clone())), - }) - .collect::>(), - )?; // skip fields and convert refs to hash pointers - yaml.insert_str( - 0, - &format!("class: {}\n", get_type_name::().to_snake_case()), - ); // replace class at top - Ok(yaml) + +/// Trait to handle serialization to yaml for `OrcaPod` models +pub trait ToYaml: Serialize + Sized { + /// Serializes the instance to a YAML string. 
+ /// # Errors + /// Will return `Err` if it fail to serialize instance to string + fn to_yaml(&self) -> Result { + let mapping: IndexMap = serde_yaml::from_str(&serde_yaml::to_string(self)?)?; // cast to map + let mut yaml = serde_yaml::to_string( + &mapping + .iter() + .filter_map(|(k, v)| Self::process_field(k, v)) + .collect::>(), + )?; // skip fields and convert refs to hash pointers + yaml.insert_str( + 0, + &format!("class: {}\n", get_type_name::().to_snake_case()), + ); // replace class at top + Ok(yaml) + } + + /// Filter out which field to serialize and which to omit + /// + /// # Returns + /// (`field_name`, `field_value`): to be pass to `to_yaml` for serialization + /// None: to skip + fn process_field(field_name: &str, field_value: &Value) -> Option<(String, Value)>; } -pub fn serialize_hashmap( +pub(crate) fn serialize_hashmap( map: &HashMap, serializer: S, ) -> result::Result @@ -44,7 +49,7 @@ where } #[allow(clippy::ref_option, reason = "Serde requires this signature.")] -pub fn serialize_hashmap_option( +pub(crate) fn serialize_hashmap_option( map_option: &Option>, serializer: S, ) -> result::Result @@ -57,4 +62,5 @@ where sorted.serialize(serializer) } -pub mod pod; +pub(crate) mod pipeline; +pub(crate) mod pod; diff --git a/src/core/model/pipeline.rs b/src/core/model/pipeline.rs new file mode 100644 index 00000000..ffb0e319 --- /dev/null +++ b/src/core/model/pipeline.rs @@ -0,0 +1,109 @@ +use std::{ + backtrace::Backtrace, + collections::{HashMap, HashSet}, +}; + +use crate::uniffi::{ + error::{Kind, OrcaError, Result}, + model::{ + packet::PathSet, + pipeline::{Kernel, Pipeline, PipelineJob}, + }, +}; +use itertools::Itertools as _; +use petgraph::Direction::Incoming; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] +pub struct PipelineNode { + pub id: String, + pub kernel: Kernel, +} + +impl Pipeline { + /// Function to get the parents of a node + pub(crate) fn get_node_parents( + &self, + 
node: &PipelineNode, + ) -> impl Iterator { + // Find the NodeIndex for the given node_key + let node_index = self + .graph + .node_indices() + .find(|&idx| self.graph[idx] == *node); + node_index.into_iter().flat_map(move |idx| { + self.graph + .neighbors_directed(idx, Incoming) + .map(move |parent_idx| &self.graph[parent_idx]) + }) + } + + /// Return a vec of `node_names` that takes in inputs based on the `input_spec` + pub(crate) fn get_input_nodes(&self) -> HashSet<&String> { + let mut input_nodes = HashSet::new(); + + self.input_spec.iter().for_each(|(_, node_uris)| { + for node_uri in node_uris { + input_nodes.insert(&node_uri.node_id); + } + }); + + input_nodes + } +} + +impl PipelineJob { + /// Helpful function to get the input packet for input nodes of the pipeline based on the `pipeline_job` an`pipeline_spec`ec + /// # Errors + /// Will return `Err` if there is an issue getting the input packet per node. + /// # Returns + /// A `HashMap` where the key is the node name and the value is a vector of `HashMap` representing the input packets for that node. 
+ pub fn get_input_packet_per_node( + &self, + ) -> Result>>> { + // For each node in the input specification, we will iterate over its mapping + // nodes_input_spec contains > + let mut nodes_input_spec = HashMap::new(); + for (input_key, node_uris) in &self.pipeline.input_spec { + for node_uri in node_uris { + let input_path_sets = self.input_packet.get(input_key).ok_or(OrcaError { + kind: Kind::KeyMissing { + key: input_key.clone(), + backtrace: Some(Backtrace::capture()), + }, + })?; + // There shouldn't be a duplicate key in the input packet as this will be handle by pipeline verify + let input_spec = nodes_input_spec + .entry(&node_uri.node_id) + .or_insert_with(HashMap::new); + input_spec.insert(&node_uri.key, input_path_sets); + } + } + + // For each node, compute the cartesian product of the path_sets for each unique combination of keys + let node_input_packets = nodes_input_spec + .into_iter() + .map(|(node_id, input_node_keys)| { + // We need to pull them out at the same time to ensure the key order is preserve to match the cartesian product + let (keys, values): (Vec<_>, Vec<_>) = input_node_keys.into_iter().unzip(); + + // Covert each combo into a packet + let packets = values + .into_iter() + .multi_cartesian_product() + .map(|combo| { + keys.iter() + .copied() + .zip(combo) + .map(|(key, pathset)| (key.to_owned(), pathset.to_owned())) + .collect::>() + }) + .collect::>>(); + + (node_id.to_owned(), packets) + }) + .collect::>(); + + Ok(node_input_packets) + } +} diff --git a/src/core/operator.rs b/src/core/operator.rs index 64db6206..b5487d3f 100644 --- a/src/core/operator.rs +++ b/src/core/operator.rs @@ -1,4 +1,4 @@ -use crate::uniffi::{error::Result, model::packet::Packet}; +use crate::uniffi::{error::Result, model::packet::Packet, operator::MapOperator}; use async_trait; use itertools::Itertools as _; use std::{clone::Clone, collections::HashMap, iter::IntoIterator, sync::Arc}; @@ -6,7 +6,7 @@ use tokio::sync::Mutex; #[async_trait::async_trait] 
pub trait Operator { - async fn next(&self, stream_name: String, packet: Packet) -> Result>; + async fn process_packet(&self, stream_name: String, packet: Packet) -> Result>; } pub struct JoinOperator { @@ -25,7 +25,7 @@ impl JoinOperator { #[async_trait::async_trait] impl Operator for JoinOperator { - async fn next(&self, stream_name: String, packet: Packet) -> Result> { + async fn process_packet(&self, stream_name: String, packet: Packet) -> Result> { let mut received_packets = self.received_packets.lock().await; received_packets .entry(stream_name.clone()) @@ -61,19 +61,9 @@ impl Operator for JoinOperator { } } -pub struct MapOperator { - map: HashMap, -} - -impl MapOperator { - pub fn new(map: &HashMap) -> Self { - Self { map: map.clone() } - } -} - #[async_trait::async_trait] impl Operator for MapOperator { - async fn next(&self, _: String, packet: Packet) -> Result> { + async fn process_packet(&self, _: String, packet: Packet) -> Result> { Ok(vec![ packet .iter() @@ -89,3 +79,222 @@ impl Operator for MapOperator { ]) } } + +#[cfg(test)] +mod tests { + #![expect(clippy::panic_in_result_fn, reason = "OK in tests.")] + + use crate::{ + core::operator::{JoinOperator, MapOperator, Operator}, + uniffi::{ + error::Result, + model::packet::{Blob, BlobKind, Packet, PathSet, URI}, + }, + }; + use std::{collections::HashMap, path::PathBuf}; + + fn make_packet_key(key_name: String, filepath: String) -> (String, PathSet) { + ( + key_name, + PathSet::Unary(Blob { + kind: BlobKind::File, + location: URI { + namespace: "default".into(), + path: PathBuf::from(filepath), + }, + checksum: String::new(), + }), + ) + } + + async fn next_batch( + operator: impl Operator, + packets: Vec<(String, Packet)>, + ) -> Result> { + let mut next_packets = vec![]; + for (stream_name, packet) in packets { + next_packets.extend(operator.process_packet(stream_name, packet).await?); + } + Ok(next_packets) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn join_once() 
-> Result<()> { + let operator = JoinOperator::new(2); + + let left_stream = (0..3) + .map(|i| { + ( + "left".into(), + Packet::from([make_packet_key( + "subject".into(), + format!("left/subject{i}.png"), + )]), + ) + }) + .collect::>(); + + let right_stream = (0..2) + .map(|i| { + ( + "right".into(), + Packet::from([make_packet_key( + "style".into(), + format!("right/style{i}.t7"), + )]), + ) + }) + .collect::>(); + + let mut input_streams = left_stream; + input_streams.extend(right_stream); + + assert_eq!( + next_batch(operator, input_streams).await?, + vec![ + Packet::from([ + make_packet_key("subject".into(), "left/subject0.png".into()), + make_packet_key("style".into(), "right/style0.t7".into()), + ]), + Packet::from([ + make_packet_key("subject".into(), "left/subject1.png".into()), + make_packet_key("style".into(), "right/style0.t7".into()), + ]), + Packet::from([ + make_packet_key("subject".into(), "left/subject2.png".into()), + make_packet_key("style".into(), "right/style0.t7".into()), + ]), + Packet::from([ + make_packet_key("subject".into(), "left/subject0.png".into()), + make_packet_key("style".into(), "right/style1.t7".into()), + ]), + Packet::from([ + make_packet_key("subject".into(), "left/subject1.png".into()), + make_packet_key("style".into(), "right/style1.t7".into()), + ]), + Packet::from([ + make_packet_key("subject".into(), "left/subject2.png".into()), + make_packet_key("style".into(), "right/style1.t7".into()), + ]), + ], + "Unexpected streams." + ); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn join_spotty() -> Result<()> { + let operator = JoinOperator::new(2); + + assert_eq!( + operator + .process_packet( + "right".into(), + Packet::from([make_packet_key("style".into(), "right/style0.t7".into())]) + ) + .await?, + vec![], + "Unexpected streams." 
+ ); + + assert_eq!( + operator + .process_packet( + "right".into(), + Packet::from([make_packet_key("style".into(), "right/style1.t7".into())]) + ) + .await?, + vec![], + "Unexpected streams." + ); + + assert_eq!( + operator + .process_packet( + "left".into(), + Packet::from([make_packet_key( + "subject".into(), + "left/subject0.png".into() + )]) + ) + .await?, + vec![ + Packet::from([ + make_packet_key("subject".into(), "left/subject0.png".into()), + make_packet_key("style".into(), "right/style0.t7".into()), + ]), + Packet::from([ + make_packet_key("subject".into(), "left/subject0.png".into()), + make_packet_key("style".into(), "right/style1.t7".into()), + ]), + ], + "Unexpected streams." + ); + + assert_eq!( + next_batch( + operator, + (1..3) + .map(|i| { + ( + "left".into(), + Packet::from([make_packet_key( + "subject".into(), + format!("left/subject{i}.png"), + )]), + ) + }) + .collect::>() + ) + .await?, + vec![ + Packet::from([ + make_packet_key("subject".into(), "left/subject1.png".into()), + make_packet_key("style".into(), "right/style0.t7".into()), + ]), + Packet::from([ + make_packet_key("subject".into(), "left/subject1.png".into()), + make_packet_key("style".into(), "right/style1.t7".into()), + ]), + Packet::from([ + make_packet_key("subject".into(), "left/subject2.png".into()), + make_packet_key("style".into(), "right/style0.t7".into()), + ]), + Packet::from([ + make_packet_key("subject".into(), "left/subject2.png".into()), + make_packet_key("style".into(), "right/style1.t7".into()), + ]), + ], + "Unexpected streams." 
+ ); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn map_once() -> Result<()> { + let operator = MapOperator { + map: HashMap::from([("key_old".into(), "key_new".into())]), + }; + + assert_eq!( + operator + .process_packet( + "parent".into(), + Packet::from([ + make_packet_key("key_old".into(), "some/key.txt".into()), + make_packet_key("subject".into(), "some/subject.txt".into()), + ]), + ) + .await?, + vec![Packet::from([ + make_packet_key("key_new".into(), "some/key.txt".into()), + make_packet_key("subject".into(), "some/subject.txt".into()), + ]),], + "Unexpected packet." + ); + + Ok(()) + } +} diff --git a/src/core/pipeline.rs b/src/core/pipeline.rs deleted file mode 100644 index 494e4919..00000000 --- a/src/core/pipeline.rs +++ /dev/null @@ -1,8 +0,0 @@ -use crate::uniffi::model::pipeline::Kernel; -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct PipelineNode { - pub name: String, - pub kernel: Kernel, -} diff --git a/src/core/pipeline_runner.rs b/src/core/pipeline_runner.rs new file mode 100644 index 00000000..cccbac98 --- /dev/null +++ b/src/core/pipeline_runner.rs @@ -0,0 +1,900 @@ +use crate::{ + core::{ + crypto::hash_buffer, + model::{pipeline::PipelineNode, serialize_hashmap}, + operator::{JoinOperator, Operator}, + util::{get, make_key_expr}, + }, + uniffi::{ + error::{ + Kind, OrcaError, Result, + selector::{self}, + }, + model::{ + packet::{Packet, PathSet, URI}, + pipeline::{Kernel, PipelineJob, PipelineResult, PipelineStatus}, + pod::{Pod, PodJob, PodResult}, + }, + orchestrator::{ + PodStatus, + agent::{Agent, AgentClient, Response}, + }, + }, +}; +use async_trait::async_trait; +use names::{Generator, Name}; +use serde_yaml::Serializer; +use snafu::{OptionExt as _, ResultExt as _}; +use std::{ + collections::{BTreeMap, HashMap}, + path::PathBuf, + sync::Arc, +}; +use tokio::{ + sync::{Mutex, RwLock}, + task::JoinSet, +}; + +static NODE_OUTPUT_KEY_EXPR: 
&str = "output"; +static FAILURE_KEY_EXP: &str = "failure"; + +/// Internal representation of a pipeline run, which should not be made public due to the fact that it contains +#[derive(Debug)] +struct PipelineRunInternal { + /// `PipelineJob` that this run is associated with + assigned_name: String, + session: Arc, // Zenoh session for communication + agent_client: Arc, // Zenoh agent client for communication with docker orchestrators + pipeline_job: Arc, // The pipeline job that this run is associated with + node_tasks: Arc>>>, // JoinSet of tasks for each node in the pipeline + outputs: Arc>>>, // String is the node key, while hash + failure_logs: Arc>>, // Logs of processing failures + failure_logging_task: Arc>>>, // JoinSet of tasks for logging failures + namespace: String, + namespace_lookup: HashMap, +} + +impl PipelineRunInternal { + fn make_key_expr(&self, node_id: &str, event: &str) -> String { + make_key_expr( + &self.agent_client.group, + &self.agent_client.host, + "pipeline_run", + &BTreeMap::from([ + ("event".to_owned(), event.to_owned()), + ("node_id".to_owned(), node_id.to_owned()), + ("pipeline_run_id".to_owned(), self.assigned_name.clone()), + ]), + ) + } + + fn make_abort_request_key_exp(&self) -> String { + make_key_expr( + &self.agent_client.group, + &self.agent_client.host, + "pipeline_run", + &BTreeMap::from([ + ("event".to_owned(), "abort".to_owned()), + ("pipeline_run_id".to_owned(), self.assigned_name.clone()), + ]), + ) + } + + // Utils functions + async fn send_packets(&self, node_id: &str, output_packets: &Vec) -> Result<()> { + Ok(self + .session + .put( + self.make_key_expr(node_id, NODE_OUTPUT_KEY_EXPR), + serde_json::to_string(output_packets)?, + ) + .await + .context(selector::AgentCommunicationFailure {})?) 
+ } + + async fn send_err_msg(&self, node_id: &str, err: OrcaError) { + self.session + .put( + &self.make_key_expr(node_id, FAILURE_KEY_EXP), + format!("Node {node_id}: {err}"), + ) + .await + .context(selector::AgentCommunicationFailure {}) + .unwrap_or_else(|send_err| { + eprintln!("Failed to send error message for node {node_id}: {send_err}"); + }); + } + + async fn send_abort_request(&self) -> Result<()> { + Ok(self + .session + .put(&self.make_abort_request_key_exp(), vec![]) + .await + .context(selector::AgentCommunicationFailure {})?) + } + + async fn get_status(&self) -> PipelineStatus { + if !self.node_tasks.lock().await.is_empty() { + PipelineStatus::Running + } else if self.outputs.read().await.is_empty() { + PipelineStatus::Failed + } else if self.failure_logs.read().await.is_empty() { + PipelineStatus::Succeeded + } else { + PipelineStatus::PartiallySucceeded + } + } +} + +/// Runner that uses a docker agent to run pipelines +#[derive(Debug, Clone)] +pub struct DockerPipelineRunner { + agent: Arc, + pipeline_runs: HashMap>, +} + +/// This is an implementation of a pipeline runner that uses Zenoh to communicate between the tasks +/// The runtime is tokio +/// +/// These are the key expressions of the components of the pipeline: +/// Input Node: `pipeline_job_hash/input_node/outputs` (This is where the `pipeline_job` packets get fed to) +/// Nodes: `pipeline_job_hash/node_id/outputs/(success|failure)` (This is where the node outputs are sent to) +/// +impl DockerPipelineRunner { + /// Create a new Docker pipeline runner + /// # Errors + /// Will error out if the environment variable `HOSTNAME` is not set + pub fn new(agent: Arc) -> Self { + Self { + agent, + pipeline_runs: HashMap::new(), + } + } + + /// Will start a new pipeline run with the given `PipelineJob` + /// This will start the async tasks for each node in the pipeline + /// including the one that captures the outputs from the leaf nodes + /// + /// Upon receiving the ready message from all the 
nodes, it will send the input packets to the input node + /// + /// # Errors + /// Will error out if the pipeline job fails to start + pub async fn start( + &mut self, + pipeline_job: PipelineJob, + namespace: &str, // Name space to save pod_results to + namespace_lookup: &HashMap, + ) -> Result { + // Create a new pipeline run + let pipeline_run = Arc::new(PipelineRunInternal { + pipeline_job: pipeline_job.into(), + outputs: Arc::new(RwLock::new(HashMap::new())), + node_tasks: Arc::new(Mutex::new(JoinSet::new())), + failure_logs: Arc::new(RwLock::new(Vec::new())), + failure_logging_task: Arc::new(Mutex::new(JoinSet::new())), + assigned_name: Generator::with_naming(Name::Plain).next().context( + selector::MissingInfo { + details: "unable to generate a random name", + }, + )?, + session: Arc::clone(&self.agent.client.session), + agent_client: Arc::clone(&self.agent.client), + namespace: namespace.to_owned(), + namespace_lookup: namespace_lookup.clone(), + }); + + // Create failure logging task + pipeline_run + .failure_logging_task + .lock() + .await + .spawn(Self::failure_capture_task(Arc::clone(&pipeline_run))); + + // Create the processor task for each node + // The id for the pipeline_run is the pipeline_job hash + let pipeline_run_id = pipeline_run.pipeline_job.hash.clone(); + + let graph = &pipeline_run.pipeline_job.pipeline.graph; + + // Create the subscriber that listen for ready messages + let subscriber = pipeline_run + .session + .declare_subscriber(pipeline_run.make_key_expr("*", "node_ready")) + .await + .context(selector::AgentCommunicationFailure {})?; + + // Get the set of input_nodes + let input_nodes = pipeline_run.pipeline_job.pipeline.get_input_nodes(); + + // Iterate through each node in the graph and spawn a task for each + for node_idx in graph.node_indices() { + let node = &graph[node_idx]; + + // Spawn the task + pipeline_run + .node_tasks + .lock() + .await + .spawn(Self::spawn_node_processing_task( + graph[node_idx].clone(), + 
Arc::clone(&pipeline_run), + input_nodes.contains(&node.id), + )); + } + + // Spawn the task that captures the outputs based on the output_spec + let mut node_output_spec = HashMap::new(); + // Group the output spec by node + for (output_key, node_uri) in &pipeline_run.pipeline_job.pipeline.output_spec { + node_output_spec + .entry(node_uri.node_id.clone()) + .or_insert_with(HashMap::new) + .insert(output_key.clone(), node_uri.key.clone()); + } + + for (node_id, key_mapping) in node_output_spec { + // Spawn the task that captures the outputs + pipeline_run + .node_tasks + .lock() + .await + .spawn(Self::create_output_capture_task_for_node( + key_mapping, + Arc::clone(&pipeline_run), + node_id.clone(), + )); + } + + // Wait for all nodes to be ready before sending inputs + + let num_of_nodes = graph.node_count(); + let mut ready_nodes = 0; + + while (subscriber.recv_async().await).is_ok() { + // Message is empty, just increment the counter + ready_nodes += 1; + + if ready_nodes == num_of_nodes { + break; // All nodes are ready, we can start sending inputs + } + } + + // For each node send all the packets associate with it + for (node_id, input_packets) in pipeline_run.pipeline_job.get_input_packet_per_node()? 
{ + // Send the packet to the input node key_exp + pipeline_run + .send_packets(&format!("input_node_{node_id}"), &input_packets) + .await?; + + // Packets are sent, thus we can send the empty vec which signify processing is done + pipeline_run + .send_packets(&format!("input_node_{node_id}"), &Vec::new()) + .await?; + } + + // Insert into the list of pipeline runs + self.pipeline_runs + .insert(pipeline_run_id.clone(), pipeline_run); + + // Return the pipeline run id + Ok(pipeline_run_id) + } + + /// Given a pipeline run, wait for all its tasks to complete and return the `PipelineResult` + /// + /// # Errors + /// Will error out if any of the pipeline tasks failed to join + pub async fn get_result(&mut self, pipeline_run_id: &str) -> Result { + // To get the result, the pipeline execution must be complete, so we need to await on the tasks + + let pipeline_run = + self.pipeline_runs + .get_mut(pipeline_run_id) + .context(selector::KeyMissing { + key: pipeline_run_id.to_owned(), + })?; + + // Wait for all the tasks to complete + while let Some(result) = pipeline_run.node_tasks.lock().await.join_next().await { + result??; + } + + Ok(PipelineResult { + pipeline_job: Arc::clone(&pipeline_run.pipeline_job), + failure_logs: pipeline_run.failure_logs.read().await.clone(), + status: pipeline_run.get_status().await, + output_packets: pipeline_run.outputs.read().await.clone(), + }) + } + + /// Stop the pipeline run and all its tasks + /// This will send a stop message to a channel that all node manager task are subscribed to. + /// Upon receiving the stop message, each node manager will force abort all of its task and exit. 
+ /// + /// # Errors + /// Will error out if the pipeline run is not found or if any of the tasks fail to stop correctly + pub async fn stop(&mut self, pipeline_run_id: &str) -> Result<()> { + // Get the pipeline run first then broadcast the abort request signal + let pipeline_run = + self.pipeline_runs + .get_mut(pipeline_run_id) + .context(selector::KeyMissing { + key: pipeline_run_id.to_owned(), + })?; + + // Send the abort request signal + pipeline_run.send_abort_request().await?; + + while pipeline_run + .node_tasks + .lock() + .await + .join_next() + .await + .is_some() + {} + Ok(()) + } + + /// This will capture the outputs of the given nodes and store it in the `outputs` map + async fn create_output_capture_task_for_node( + // + key_mapping: HashMap, + pipeline_run: Arc, + node_id: String, + ) -> Result<()> { + // Determine which keys we are interested in for the given node_id + + // Create a zenoh session + let subscriber = pipeline_run + .session + .declare_subscriber(pipeline_run.make_key_expr(&node_id, NODE_OUTPUT_KEY_EXPR)) + .await + .context(selector::AgentCommunicationFailure {})?; + + while let Ok(payload) = subscriber.recv_async().await { + // Extract the message from the payload + let packets: Vec = serde_json::from_slice(&payload.payload().to_bytes())?; + + if packets.is_empty() { + // Output node exited, thus we can exit the capture task too + break; + } + let mut outputs_lock = pipeline_run.outputs.write().await; + + for packet in packets { + for (output_key, node_key) in &key_mapping { + outputs_lock + .entry(output_key.to_owned()) + .or_default() + .push(get(&packet, node_key)?.clone()); + } + } + } + Ok(()) + } + + async fn failure_capture_task(pipeline_run: Arc) -> Result<()> { + let sub = pipeline_run + .session + .declare_subscriber(pipeline_run.make_key_expr("*", FAILURE_KEY_EXP)) + .await + .context(selector::AgentCommunicationFailure {})?; + + // Listen to any failure messages and write it the logs + while let Ok(payload) = 
sub.recv_async().await { + // Extract the message from the payload + let failure_msg: String = serde_json::from_slice(&payload.payload().to_bytes())?; + // Store the failure message in the logs + pipeline_run.failure_logs.write().await.push(failure_msg); + } + + Ok(()) + } + + /// Function to start tasks associated with the node + /// Steps: + /// - Create the node processor based on the kernel type + /// - Create the zenoh session + /// - Create a join set to spawn and handle incoming messages tasks + /// - Create a subscriber for each of the parent nodes (Should only be 1, unless it is a joiner node) + /// - Create an abort listener task that will listen for stop requests + /// - For each subscriber, handle the incoming message appropriately + /// + /// # Errors + /// Will error out if the kernel for the node is not found or if the + async fn spawn_node_processing_task( + node: PipelineNode, + pipeline_run: Arc, + is_input_node: bool, + ) -> Result<()> { + // Get the node parents + let parent_nodes = pipeline_run + .pipeline_job + .pipeline + .get_node_parents(&node) + .collect::>(); + + // Create the correct processor for the node based on the kernel type + let node_processor: Arc>> = + Arc::new(Mutex::new(match &node.kernel { + Kernel::Pod { pod } => Box::new(PodProcessor::new( + Arc::clone(&pipeline_run), + node.id.clone(), + Arc::clone(pod), + )), + Kernel::MapOperator { mapper } => Box::new(OperatorProcessor::new( + Arc::clone(&pipeline_run), + node.id.clone(), + Arc::clone(mapper), + parent_nodes.len(), + )), + Kernel::JoinOperator => Box::new(OperatorProcessor::new( + Arc::clone(&pipeline_run), + node.id.clone(), + JoinOperator::new(parent_nodes.len()).into(), + parent_nodes.len(), + )), + })); + + // Create a join set to spawn and handle incoming messages tasks + let mut listener_tasks = JoinSet::new(); + + // Create a list of node_ids that this node should listen to + let mut nodes_to_sub_to = parent_nodes + .iter() + .map(|parent_node| 
parent_node.id.clone()) + .collect::>(); + + if is_input_node { + // If the node is an input node, we need to add the input node key expression + nodes_to_sub_to.push(format!("input_node_{}", node.id)); + } + + // For each node in nodes_to_subscribe_to, call the event handler func + for node_to_sub in &nodes_to_sub_to { + listener_tasks.spawn(Self::event_handler( + Arc::clone(&pipeline_run), + node.id.clone(), + node_to_sub.to_owned(), + Arc::clone(&node_processor), + )); + } + + // Create the listener task for the stop request + let abort_request_handler_task = tokio::spawn(Self::abort_request_event_handler( + node_processor, + Arc::clone(&pipeline_run), + )); + + // Wait for all tasks to be spawned and reply with ready message + // This is to ensure that the pipeline run knows when all tasks are ready to receive inputs + let mut num_of_ready_event_handler: usize = 0; + // Build the subscriber + let status_subscriber = pipeline_run + .session + .declare_subscriber(pipeline_run.make_key_expr(&node.id, "event_handler_ready")) + .await + .context(selector::AgentCommunicationFailure {})?; + + while status_subscriber.recv_async().await.is_ok() { + num_of_ready_event_handler += 1; + if num_of_ready_event_handler == nodes_to_sub_to.len() { + // Note: the abort-request task sends no ready signal, so only event handlers are counted + break; // All tasks are ready, we can start sending inputs + } + } + + // Send a ready message so the pipeline knows when to start sending inputs + pipeline_run + .session + .put(pipeline_run.make_key_expr(&node.id, "node_ready"), vec![]) + .await + .context(selector::AgentCommunicationFailure {})?; + + // Wait for all tasks to complete + while let Some(result) = listener_tasks.join_next().await { + match result { + Ok(Ok(())) => {} // Task completed successfully + Ok(Err(err)) => { + pipeline_run.send_err_msg(&node.id, err).await; + } + Err(err) => { + pipeline_run + .send_err_msg(&node.id, OrcaError::from(err)) + .await; + } + } + } + + // Abort the stop listener task since we don't need it anymore
+ abort_request_handler_task.abort(); + + Ok(()) + } + + /// This is the actual handler for incoming messages for the node + async fn event_handler( + pipeline_run: Arc, + node_id: String, + node_to_sub_to: String, + processor: Arc>>, + ) -> Result<()> { + // Create the subscriber + let subscriber = pipeline_run + .session + .declare_subscriber(pipeline_run.make_key_expr(&node_to_sub_to, NODE_OUTPUT_KEY_EXPR)) + .await + .context(selector::AgentCommunicationFailure {})?; + + // Send out ready signal + pipeline_run + .session + .put( + pipeline_run.make_key_expr(&node_id, "event_handler_ready"), + vec![], + ) + .await + .context(selector::AgentCommunicationFailure {})?; + + // Listen to the key + loop { + let sample = subscriber + .recv_async() + .await + .context(selector::AgentCommunicationFailure)?; + + // Extract out the packets + let packets: Vec = serde_json::from_slice(&sample.payload().to_bytes())?; + + // Check if the packets are empty, if so that means the node is finished processing + if packets.is_empty() { + processor + .lock() + .await + .mark_parent_as_complete(&node_to_sub_to) + .await; + break; + } + + // For each packet, we need to process it + for packet in packets { + processor + .lock() + .await + .process_incoming_packet(&node_to_sub_to, &packet) + .await; + } + } + Ok::<(), OrcaError>(()) + } + + /// This task will listen for stop requests on the given key expression + async fn abort_request_event_handler( + node_processor: Arc>>, + pipeline_run: Arc, + ) -> Result<()> { + let subscriber = pipeline_run + .session + .declare_subscriber(pipeline_run.make_abort_request_key_exp()) + .await + .context(selector::AgentCommunicationFailure {})?; + while subscriber.recv_async().await.is_ok() { + // Received a request to stop, therefore we need to tell the node_processor to shutdown + node_processor.lock().await.stop(); + } + Ok::<(), OrcaError>(()) + } +} + +/// Unify the interface for node processors and provide a common way to handle processing of 
incoming messages +/// This trait defines the methods that all node processors should implement +/// +/// Main purpose was to reduce the amount of code duplication between different node processors +/// As a result, each processor only needs to worry about writing their own function to process the msg +#[async_trait] +trait NodeProcessor: Send + Sync { + async fn process_incoming_packet(&mut self, sender_node_id: &str, incoming_packet: &Packet); + + /// Notifies the processor that the parent node has completed processing + /// If it is the last parent to complete, it will wait for all processing task to finish + /// Then send a completion signal + async fn mark_parent_as_complete(&mut self, parent_node_id: &str); + + fn stop(&mut self); +} + +/// Processor for Pods +/// Currently missing implementation to call agents for actual pod processing +struct PodProcessor { + pipeline_run: Arc, + node_id: String, + pod: Arc, + processing_tasks: JoinSet<()>, +} + +impl PodProcessor { + fn new(pipeline_run: Arc, node_id: String, pod: Arc) -> Self { + Self { + pipeline_run, + node_id, + pod, + processing_tasks: JoinSet::new(), + } + } +} + +impl PodProcessor { + /// Will handle the creation of the pod job, submission to the agent, listening for completion, and extracting the `output_packet` if successful + async fn process_packet( + pipeline_run: Arc, + node_id: String, + pod: Arc, + incoming_packet: HashMap, + ) -> Result { + // Hash the input_packet to create a unique identifier for the pod job + let input_packet_hash = { + let mut buf = Vec::new(); + let mut serializer = Serializer::new(&mut buf); + serialize_hashmap(&incoming_packet, &mut serializer)?; + hash_buffer(buf) + }; + + // Create the pod job + let pod_job = PodJob::new( + None, + Arc::clone(&pod), + incoming_packet, + URI { + namespace: pipeline_run.namespace.clone(), + path: format!( + "pipeline_outputs/{}/{node_id}/{input_packet_hash}", + pipeline_run.assigned_name + ) + .into(), + }, + 
pod.recommend_specs.cpus, + pod.recommend_specs.memory, + None, + &pipeline_run.namespace_lookup, + )?; + + // Create listener for pod_job + // Create the subscriber + let pod_job_subscriber = pipeline_run + .session + .declare_subscriber(pipeline_run.agent_client.make_key_expr( + true, + "pod_job", + BTreeMap::from([("hash", pod_job.hash.clone()), ("event", "*".to_owned())]), + )) + .await + .context(selector::AgentCommunicationFailure {})?; + + // Create the async task to listen for the pod job completion + let pod_job_listener_task = tokio::spawn(async move { + // Wait for the pod job to complete and extract the result + let sample = pod_job_subscriber + .recv_async() + .await + .context(selector::AgentCommunicationFailure {})?; + // Extract the pod_result from the payload + let pod_result: PodResult = serde_json::from_slice(&sample.payload().to_bytes())?; + Ok::<_, OrcaError>(pod_result) + }); + + // Submit it to the client and get the response to make sure it was successful + let responses = pipeline_run + .agent_client + .start_pod_jobs(vec![pod_job.clone().into()]) + .await; + let response = responses + .first() + .context(selector::InvalidIndex { idx: 0_usize })?; + + match response { + Response::Ok => (), + Response::Err(err) => { + return Err(OrcaError { + kind: Kind::PodJobProcessingError { + hash: pod_job.hash, + reason: err.clone(), + backtrace: Some(snafu::Backtrace::capture()), + }, + }); + } + } + + // Get the pod result from the listener task + let pod_result = pod_job_listener_task.await??; + // Get the output packet for the pod result + Ok(match pod_result.status { + PodStatus::Completed => { + // Get the output packet + pod_result.output_packet + } + PodStatus::Failed(exit_code) => { + // Processing failed, thus return the error + return Err(OrcaError { + kind: Kind::PodJobProcessingError { + hash: pod_result.pod_job.hash.clone(), + reason: format!("Pod processing failed with exit code {exit_code}"), + backtrace: 
Some(snafu::Backtrace::capture()), + }, + }); + } + PodStatus::Running | PodStatus::Unset | PodStatus::Undefined => { + // This should not happen, but if it does, we will return an error + return Err(OrcaError { + kind: Kind::PodJobProcessingError { + hash: pod_result.pod_job.hash.clone(), + reason: "Pod result status is running or unset".to_owned(), + backtrace: Some(snafu::Backtrace::capture()), + }, + }); + } + }) + } +} + +#[async_trait] +impl NodeProcessor for PodProcessor { + async fn process_incoming_packet( + &mut self, + _sender_node_id: &str, + incoming_packet: &HashMap, + ) { + // Clone all necessary fields from self to move into the async block + let pipeline_run = Arc::clone(&self.pipeline_run); + let node_id = self.node_id.clone(); + let pod = Arc::clone(&self.pod); + + let incoming_packet_inner = incoming_packet.clone(); + + self.processing_tasks.spawn(async move { + let result = match Self::process_packet( + Arc::clone(&pipeline_run), + node_id.clone(), + Arc::clone(&pod), + incoming_packet_inner.clone(), + ) + .await + { + Ok(output_packet) => { + match pipeline_run + .send_packets(&node_id, &vec![output_packet]) + .await + { + Ok(()) => Ok(()), + Err(err) => Err(err), + } + } + Err(err) => Err(err), + }; + + match result { + Ok(()) => { + // Successfully processed the packet, nothing to do + } + Err(err) => { + pipeline_run.send_err_msg(&node_id, err).await; + } + } + }); + } + + async fn mark_parent_as_complete(&mut self, _parent_node_id: &str) { + // For pod we only have one parent, thus execute the exit case + while self.processing_tasks.join_next().await.is_some() {} + // Send out completion signal + match self + .pipeline_run + .send_packets(&self.node_id, &Vec::new()) + .await + { + Ok(()) => {} + Err(err) => { + self.pipeline_run.send_err_msg(&self.node_id, err).await; + } + } + } + + fn stop(&mut self) { + self.processing_tasks.abort_all(); + } +} + +struct OperatorProcessor { + pipeline_run: Arc, + node_id: String, + operator: Arc, + 
num_of_parents: usize, + num_of_completed_parents: usize, + processing_tasks: JoinSet<()>, +} + +impl OperatorProcessor { + /// Create a new operator processor + pub fn new( + pipeline_run: Arc, + node_id: String, + operator: Arc, + num_of_parents: usize, + ) -> Self { + Self { + pipeline_run, + node_id, + operator, + num_of_parents, + num_of_completed_parents: 0, + processing_tasks: JoinSet::new(), + } + } +} + +#[async_trait] +impl NodeProcessor for OperatorProcessor { + async fn process_incoming_packet( + &mut self, + sender_node_id: &str, + incoming_packet: &HashMap, + ) { + // Clone all necessary fields from self to move into the async block + let operator = Arc::clone(&self.operator); + let pipeline_run = Arc::clone(&self.pipeline_run); + let node_id = self.node_id.clone(); + + let sender_node_id_inner = sender_node_id.to_owned(); + let incoming_packet_inner = incoming_packet.clone(); + + self.processing_tasks.spawn(async move { + let processing_result = operator + .process_packet(sender_node_id_inner, incoming_packet_inner) + .await; + + match processing_result { + Ok(output_packets) => { + if !output_packets.is_empty() { + // Send out all the packets + match pipeline_run.send_packets(&node_id, &output_packets).await { + Ok(()) => {} + Err(err) => { + pipeline_run.send_err_msg(&node_id, err).await; + } + } + } + } + Err(err) => { + pipeline_run.send_err_msg(&node_id, err).await; + } + } + }); + } + + async fn mark_parent_as_complete(&mut self, _parent_node_id: &str) { + // Figure out if this is the last parent or not + self.num_of_completed_parents += 1; + + if self.num_of_completed_parents == self.num_of_parents { + // All parents are complete, thus we need to wait on all processing tasks then exit + while (self.processing_tasks.join_next().await).is_some() { + // Wait for all tasks to complete + } + // Send out completion signal which is same as success but it is an empty vec of packets + match self + .pipeline_run + .send_packets(&self.node_id, 
&Vec::new()) + .await + { + Ok(()) => {} + Err(err) => { + self.pipeline_run.send_err_msg(&self.node_id, err).await; + } + } + } + } + + fn stop(&mut self) { + self.processing_tasks.abort_all(); + } +} diff --git a/src/core/store/filestore.rs b/src/core/store/filestore.rs index 622a3a82..684ab2db 100644 --- a/src/core/store/filestore.rs +++ b/src/core/store/filestore.rs @@ -1,6 +1,6 @@ use crate::{ core::{ - model::to_yaml, + model::ToYaml, store::MODEL_NAMESPACE, util::{get_type_name, parse_debug_name}, }, @@ -111,7 +111,7 @@ impl LocalFileStore { Ok(model_info.hash) } - fn save_file(file: impl AsRef, content: impl AsRef<[u8]>) -> Result<()> { + pub(crate) fn save_file(file: impl AsRef, content: impl AsRef<[u8]>) -> Result<()> { if let Some(parent) = file.as_ref().parent() { fs::create_dir_all(parent)?; } @@ -123,7 +123,7 @@ impl LocalFileStore { /// # Errors /// /// Will return `Err` if there is an issue storing the model. - pub(crate) fn save_model( + pub(crate) fn save_model( &self, model: &T, hash: &str, @@ -174,7 +174,7 @@ impl LocalFileStore { .yellow(), ); } else { - Self::save_file(spec_file, to_yaml(model)?)?; + Self::save_file(spec_file, model.to_yaml()?)?; } Ok(()) } diff --git a/src/core/util.rs b/src/core/util.rs index f41e4356..4f0f72fd 100644 --- a/src/core/util.rs +++ b/src/core/util.rs @@ -1,6 +1,11 @@ use crate::uniffi::error::{Result, selector}; use snafu::OptionExt as _; -use std::{any::type_name, borrow::Borrow, collections::HashMap, fmt, hash}; +use std::{ + any::type_name, + borrow::Borrow, + collections::{BTreeMap, HashMap}, + fmt, hash, +}; #[expect( clippy::unwrap_used, @@ -35,3 +40,19 @@ where details: format!("key = {key:?}"), })?) } + +pub fn make_key_expr( + group: &str, + host: &str, + topic: &str, + content: &BTreeMap, +) -> String { + // For each key-value pair in the content, we format it as "key/value" and join them with "/". 
+ // The final format will be "group/host/topic/key1/value1/key2/value2" + let content_converted = content + .iter() + .map(|(k, v)| format!("{k}/{v}")) + .collect::>() + .join("/"); + format!("{group}/{host}/{topic}/{content_converted}") +} diff --git a/src/uniffi/error.rs b/src/uniffi/error.rs index 00ed5c2e..bda85d62 100644 --- a/src/uniffi/error.rs +++ b/src/uniffi/error.rs @@ -18,6 +18,7 @@ use std::{ }; use tokio::task; use uniffi; + /// Shorthand for a Result that returns an [`OrcaError`]. pub type Result = result::Result; /// Possible errors you may encounter. @@ -30,6 +31,28 @@ pub(crate) enum Kind { source: Box, backtrace: Option, }, + #[snafu(display("Empty directory: {dir:?}, where they should be files"))] + EmptyDir { + dir: PathBuf, + backtrace: Option, + }, + #[snafu(display( + "Failed to extract run info from the container image file: {container_name}." + ))] + FailedToExtractRunInfo { + container_name: String, + backtrace: Option, + }, + #[snafu(display( + "Missing expected output file or dir with key {packet_key} at path {path:?} for pod job (hash: {pod_job_hash})." + ))] + FailedToGetPodJobOutput { + pod_job_hash: String, + packet_key: String, + path: Box, + io_error: Box, + backtrace: Option, + }, #[snafu(display("Incomplete {kind} packet. Missing `{missing_keys:?}` keys."))] IncompletePacket { kind: String, @@ -50,11 +73,37 @@ source: io::Error, backtrace: Option, }, + #[snafu(display("Failed to get items at idx {idx}."))] + InvalidIndex { + idx: usize, + backtrace: Option, + }, + #[snafu(display("Key '{key}' was not found in map."))] + KeyMissing { + key: String, + backtrace: Option, + }, #[snafu(display("Missing info.
Details: {details}."))] MissingInfo { details: String, backtrace: Option, }, + #[snafu(display("Pod job submission failed with reason: {reason}."))] + PodJobSubmissionFailed { + reason: String, + backtrace: Option, + }, + #[snafu(display("Pod job {hash} failed to process with reason: {reason}."))] + PodJobProcessingError { + hash: String, + reason: String, + backtrace: Option, + }, + #[snafu(display("Unexpected path type: {path:?}. Only support files and directories."))] + UnexpectedPathType { + path: PathBuf, + backtrace: Option, + }, #[snafu(transparent)] BollardError { source: Box, diff --git a/src/uniffi/mod.rs b/src/uniffi/mod.rs index e02fd6c9..2443ce00 100644 --- a/src/uniffi/mod.rs +++ b/src/uniffi/mod.rs @@ -2,6 +2,8 @@ pub mod error; /// Components of the data model. pub mod model; +/// Operators for pipeline +pub mod operator; /// Interface into container orchestration engine. pub mod orchestrator; /// Data persistence provided by a store backend. diff --git a/src/uniffi/model/packet.rs b/src/uniffi/model/packet.rs index 595a3f92..df9f4a91 100644 --- a/src/uniffi/model/packet.rs +++ b/src/uniffi/model/packet.rs @@ -2,6 +2,9 @@ use serde::{Deserialize, Serialize}; use std::{collections::HashMap, path::PathBuf}; use uniffi; +use crate::core::util::get; +use crate::uniffi::error::Result; + /// Path sets are named and represent an abstraction for the file(s) that represent some particular /// data within a compute environment. #[derive(uniffi::Record, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] @@ -12,6 +15,9 @@ pub struct PathInfo { pub match_pattern: String, } +#[uniffi::export] +impl PathInfo {} + /// File or directory options for BLOBs. 
#[derive(uniffi::Enum, Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Default)] pub enum BlobKind { @@ -52,5 +58,28 @@ pub enum PathSet { Collection(Vec), } +impl PathSet { + /// Util function to convert ``PathSet`` to ``PathBuf`` given a namespace lookup table + /// + /// # Errors + /// Will error out if namespace is missing in namespace lookup + pub fn to_path_buf(&self, namespace_lookup: &HashMap) -> Result> { + match self { + Self::Unary(blob) => { + let base_path = get(namespace_lookup, &blob.location.namespace)?; + Ok(vec![base_path.join(&blob.location.path)]) + } + Self::Collection(blobs) => { + let mut paths = Vec::with_capacity(blobs.len()); + for blob in blobs { + let base_path = get(namespace_lookup, &blob.location.namespace)?; + paths.push(base_path.join(&blob.location.path)); + } + Ok(paths) + } + } + } +} + /// A complete set of inputs to be provided to a computational unit. pub type Packet = HashMap; diff --git a/src/uniffi/model/pipeline.rs b/src/uniffi/model/pipeline.rs index 223f73cf..af22d5e9 100644 --- a/src/uniffi/model/pipeline.rs +++ b/src/uniffi/model/pipeline.rs @@ -2,7 +2,7 @@ use crate::{ core::{ crypto::{hash_blob, make_random_hash}, graph::make_graph, - pipeline::PipelineNode, + model::pipeline::PipelineNode, validation::validate_packet, }, uniffi::{ @@ -11,6 +11,7 @@ use crate::{ packet::{PathSet, URI}, pod::Pod, }, + operator::MapOperator, }, }; use derive_more::Display; @@ -30,9 +31,9 @@ pub struct Pipeline { #[getset(skip)] pub graph: DiGraph, /// Exposed, internal input specification. Each input may be fed into more than one node/key if desired. - pub input_spec: HashMap>, + pub input_spec: HashMap>, /// Exposed, internal output specification. Each output is associated with only one node/key. 
- pub output_spec: HashMap, + pub output_spec: HashMap, } #[uniffi::export] @@ -46,14 +47,15 @@ impl Pipeline { pub fn new( graph_dot: &str, metadata: HashMap, - input_spec: &HashMap>, - output_spec: &HashMap, + input_spec: HashMap>, + output_spec: HashMap, ) -> Result { let graph = make_graph(graph_dot, metadata)?; + Ok(Self { graph, - input_spec: input_spec.clone(), - output_spec: output_spec.clone(), + input_spec, + output_spec, }) } } @@ -79,7 +81,6 @@ pub struct PipelineJob { pub output_dir: URI, } -#[expect(clippy::excessive_nesting, reason = "Nesting manageable.")] #[uniffi::export] impl PipelineJob { /// Construct a new pipeline job instance. @@ -91,7 +92,7 @@ impl PipelineJob { pub fn new( pipeline: Arc, input_packet: &HashMap>, - output_dir: &URI, + output_dir: URI, namespace_lookup: &HashMap, ) -> Result { validate_packet("input".into(), &pipeline.input_spec, input_packet)?; @@ -124,33 +125,75 @@ impl PipelineJob { hash: make_random_hash(), pipeline, input_packet: input_packet_with_checksum, - output_dir: output_dir.clone(), + output_dir, }) } } +/// Struct to hold the result of a pipeline execution. +#[derive(uniffi::Object, Debug, Clone, Deserialize, Serialize, Display, CloneGetters)] +#[getset(get_clone, impl_attrs = "#[uniffi::export]")] +#[display("{self:#?}")] +#[uniffi::export(Display)] +pub struct PipelineResult { + /// The pipeline job that was executed. + pub pipeline_job: Arc, + /// The result of the pipeline execution. + pub output_packets: HashMap>, + /// Logs of any failures that occurred during the pipeline execution. + pub failure_logs: Vec, + /// The status of the pipeline execution. + pub status: PipelineStatus, +} + +/// The status of a pipeline execution. +#[derive(uniffi::Enum, Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] +pub enum PipelineStatus { + /// The pipeline is currently running. + Running, + /// The pipeline has completed successfully. + Succeeded, + /// The pipeline has failed. 
+ Failed, + /// The pipeline has partially succeeded. There should be some failure logs + PartiallySucceeded, +} /// A node in a computational pipeline. -#[derive(uniffi::Enum, Debug, Clone, Deserialize, Serialize)] +#[derive(uniffi::Enum, Debug, Clone, Deserialize, Serialize, PartialEq)] pub enum Kernel { /// Pod reference. Pod { /// See [`Pod`](crate::uniffi::model::pod::Pod). - r#ref: Arc, + pod: Arc, }, /// Cartesian product operation. See [`JoinOperator`](crate::core::operator::JoinOperator). JoinOperator, /// Rename a path set key operation. MapOperator { /// See [`MapOperator`](crate::core::operator::MapOperator). - map: HashMap, + mapper: Arc, }, } +impl From for Kernel { + fn from(mapper: MapOperator) -> Self { + Self::MapOperator { + mapper: Arc::new(mapper), + } + } +} + +impl From for Kernel { + fn from(pod: Pod) -> Self { + Self::Pod { pod: Arc::new(pod) } + } +} + /// Index from pipeline node into pod specification. #[derive(uniffi::Record, Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] -pub struct SpecURI { +pub struct NodeURI { /// Node reference name in pipeline. - pub node: String, + pub node_id: String, /// Specification key. pub key: String, } diff --git a/src/uniffi/model/pod.rs b/src/uniffi/model/pod.rs index 62cd006f..9df7bb9d 100644 --- a/src/uniffi/model/pod.rs +++ b/src/uniffi/model/pod.rs @@ -2,8 +2,9 @@ use crate::{ core::{ crypto::{hash_blob, hash_buffer}, model::{ + ToYaml, pod::{deserialize_pod, deserialize_pod_job}, - serialize_hashmap, serialize_hashmap_option, to_yaml, + serialize_hashmap, serialize_hashmap_option, }, util::get, validation::validate_packet, @@ -49,14 +50,11 @@ pub struct Pod { /// Exposed, internal output specification. #[serde(serialize_with = "serialize_hashmap")] pub output_spec: HashMap, - /// Link to source associated with image binary. - pub source_commit_url: String, - /// Recommendation for CPU in fractional cores. - pub recommended_cpus: f32, - /// Recommendation for memory in bytes. 
- pub recommended_memory: u64, - /// If applicable, recommendation for GPU configuration. - pub required_gpu: Option, + /// Execution requirements for the pod. + #[serde(default)] + pub recommend_specs: RecommendSpecs, + /// Optional GPU requirements for the pod. If set, then the running system needs a GPU that meets the requirements. + pub gpu_requirements: Option, } #[uniffi::export] @@ -74,10 +72,8 @@ impl Pod { input_spec: HashMap, output_dir: PathBuf, output_spec: HashMap, - source_commit_url: String, - recommended_cpus: f32, - recommended_memory: u64, - required_gpu: Option, + recommend_specs: RecommendSpecs, + gpu_requirements: Option, ) -> Result { let pod_no_hash = Self { annotation, @@ -87,19 +83,72 @@ impl Pod { input_spec, output_dir, output_spec, - source_commit_url, - recommended_cpus, - recommended_memory, - required_gpu, + recommend_specs, + gpu_requirements, }; Ok(Self { - hash: hash_buffer(to_yaml(&pod_no_hash)?), + hash: hash_buffer(pod_no_hash.to_yaml()?), ..pod_no_hash }) } } +impl ToYaml for Pod { + fn process_field( + field_name: &str, + field_value: &serde_yaml::Value, + ) -> Option<(String, serde_yaml::Value)> { + match field_name { + "annotation" | "hash" | "recommend_specs" => None, + _ => Some((field_name.to_owned(), field_value.clone())), + } + } +} + +/// Execution recommendations for a pod, since it doesn't impact the actual reproducibility +/// it shouldn't be hashed along with the pod +#[derive(uniffi::Record, Serialize, Deserialize, Debug, PartialEq, Default, Clone)] +pub struct RecommendSpecs { + /// Optimal number of CPU cores needed to run the pod provided by the user + pub cpus: f32, + /// Optimal amount of memory needed to run the pod provided by the user, code can probably run with less but may hit OOM + pub memory: u64, +} + +impl ToYaml for RecommendSpecs { + fn process_field( + field_name: &str, + field_value: &serde_yaml::Value, + ) -> Option<(String, serde_yaml::Value)> { + Some((field_name.to_owned(), 
field_value.clone())) + } +} + +/// Specification for GPU requirements in computation. +#[derive(uniffi::Record, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub struct GPURequirement { + /// GPU model specification. + pub model: GPUModel, + /// Manufacturer recommended memory. + pub recommended_memory: u64, + /// Number of GPU cards required. + pub count: u16, +} + +/// GPU model specification. +#[derive(uniffi::Enum, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub enum GPUModel { + /// NVIDIA-manufactured card where `String` is the specific minimum CUDA version in X.XX + NVIDIA(String), + /// Any GPU architecture, code is generic enough + Any, +} + /// A compute job that specifies resource requests and input/output targets. +/// +/// `PodJob` represents a specific execution instance of a [`Pod`] with concrete +/// input data, resource limits, and output specifications. It includes all the +/// information needed to run a containerized computation job. #[derive( uniffi::Object, Serialize, Deserialize, Debug, PartialEq, Clone, Default, Display, CloneGetters, )] @@ -177,12 +226,25 @@ impl PodJob { env_vars, }; Ok(Self { - hash: hash_buffer(to_yaml(&pod_job_no_hash)?), + hash: hash_buffer(pod_job_no_hash.to_yaml()?), ..pod_job_no_hash }) } } +impl ToYaml for PodJob { + fn process_field( + field_name: &str, + field_value: &serde_yaml::Value, + ) -> Option<(String, serde_yaml::Value)> { + match field_name { + "annotation" | "hash" => None, + "pod" => Some((field_name.to_owned(), field_value["hash"].clone())), + _ => Some((field_name.to_owned(), field_value.clone())), + } + } +} + /// Result from a compute job run. #[derive(uniffi::Record, Serialize, Deserialize, Debug, Clone, PartialEq, Default)] pub struct PodResult { @@ -205,6 +267,8 @@ pub struct PodResult { pub created: u64, /// Time in epoch when terminated in seconds. 
pub terminated: u64, + /// Logs about stdout and stderr, where stderr is append at the end + pub logs: String, } impl PodResult { @@ -221,6 +285,7 @@ impl PodResult { created: u64, terminated: u64, namespace_lookup: &HashMap, + logs: String, ) -> Result { let output_packet = pod_job .pod @@ -263,6 +328,7 @@ impl PodResult { }) .collect::>()?; + // If packet is completed, the output packet must meet the output spec if matches!(status, PodStatus::Completed) { validate_packet("output".into(), &pod_job.pod.output_spec, &output_packet)?; } @@ -276,30 +342,24 @@ impl PodResult { status, created, terminated, + logs, }; Ok(Self { - hash: hash_buffer(to_yaml(&pod_result_no_hash)?), + hash: hash_buffer(pod_result_no_hash.to_yaml()?), ..pod_result_no_hash }) } } -/// Specification for GPU requirements in computation. -#[derive(uniffi::Record, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub struct GPURequirement { - /// GPU model specification. - pub model: GPUModel, - /// Manufacturer recommended memory. - pub recommended_memory: u64, - /// Number of GPU cards required. - pub count: u16, -} - -/// GPU model specification. -#[derive(uniffi::Enum, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub enum GPUModel { - /// NVIDIA-manufactured card where `String` is the specific model e.g. ??? - NVIDIA(String), - /// AMD-manufactured card where `String` is the specific model e.g. ??? 
- AMD(String), +impl ToYaml for PodResult { + fn process_field( + field_name: &str, + field_value: &serde_yaml::Value, + ) -> Option<(String, serde_yaml::Value)> { + match field_name { + "annotation" | "hash" => None, + "pod_job" => Some((field_name.to_owned(), field_value["hash"].clone())), + _ => Some((field_name.to_owned(), field_value.clone())), + } + } } diff --git a/src/uniffi/operator.rs b/src/uniffi/operator.rs new file mode 100644 index 00000000..a7f666be --- /dev/null +++ b/src/uniffi/operator.rs @@ -0,0 +1,11 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +/// Operator class that map `input_keys` to `output_key`, effectively renaming it +/// For use in pipelines +#[derive(uniffi::Object, Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] +pub struct MapOperator { + /// Mapping of input keys to output keys + pub map: HashMap, +} diff --git a/src/uniffi/orchestrator/agent.rs b/src/uniffi/orchestrator/agent.rs index 6cab47b5..1d1df2e7 100644 --- a/src/uniffi/orchestrator/agent.rs +++ b/src/uniffi/orchestrator/agent.rs @@ -51,7 +51,7 @@ pub struct AgentClient { /// Connecting agent's assigned name used for reference. pub host: String, #[getset(skip)] - pub(crate) session: zenoh::Session, + pub(crate) session: Arc, } #[uniffi::export] @@ -72,7 +72,8 @@ impl AgentClient { .await .context(selector::AgentCommunicationFailure {})?, ) - })?, + })? + .into(), }) } /// Start many pod jobs to be processed in parallel. @@ -155,15 +156,15 @@ impl Agent { /// # Errors /// /// Will stop and return an error if encounters an error while processing any pod job request. 
- #[expect(clippy::excessive_nesting, reason = "Nesting manageable.")] pub async fn start( &self, namespace_lookup: &HashMap, available_store: Option>, ) -> Result<()> { let mut services = JoinSet::new(); + let self_ref = Arc::new(self.clone()); services.spawn(start_service( - Arc::new(self.clone()), + Arc::clone(&self_ref), "pod_job", BTreeMap::from([("action", "request".to_owned())]), namespace_lookup.clone(), @@ -185,7 +186,7 @@ impl Agent { "pod_job", BTreeMap::from([ ( - "action", + "event", match &pod_result.status { PodStatus::Completed => "success", PodStatus::Running @@ -204,23 +205,9 @@ impl Agent { )); if let Some(store) = available_store { services.spawn(start_service( - Arc::new(self.clone()), + Arc::clone(&self_ref), "pod_job", - BTreeMap::from([("action", "success".to_owned())]), - namespace_lookup.clone(), - { - let inner_store = Arc::clone(&store); - async move |_, _, _, pod_result| { - inner_store.save_pod_result(&pod_result)?; - Ok(()) - } - }, - async |_, ()| Ok(()), - )); - services.spawn(start_service( - Arc::new(self.clone()), - "pod_job", - BTreeMap::from([("action", "failure".to_owned())]), + BTreeMap::from([("event", "*".to_owned())]), namespace_lookup.clone(), async move |_, _, _, pod_result| { store.save_pod_result(&pod_result)?; diff --git a/src/uniffi/orchestrator/docker.rs b/src/uniffi/orchestrator/docker.rs index a8eb5d22..58114577 100644 --- a/src/uniffi/orchestrator/docker.rs +++ b/src/uniffi/orchestrator/docker.rs @@ -12,7 +12,9 @@ use crate::{ use async_trait; use bollard::{ Docker, - container::{RemoveContainerOptions, StartContainerOptions, WaitContainerOptions}, + container::{ + LogOutput, LogsOptions, RemoveContainerOptions, StartContainerOptions, WaitContainerOptions, + }, errors::Error::DockerContainerWaitError, image::{CreateImageOptions, ImportImageOptions}, }; @@ -70,6 +72,9 @@ impl Orchestrator for LocalDockerOrchestrator { ) -> Result { ASYNC_RUNTIME.block_on(self.get_result(pod_run, namespace_lookup)) } + fn 
get_logs_blocking(&self, pod_run: &PodRun) -> Result { + ASYNC_RUNTIME.block_on(self.get_logs(pod_run)) + } #[expect( clippy::try_err, reason = r#" @@ -263,8 +268,65 @@ impl Orchestrator for LocalDockerOrchestrator { ), })?, namespace_lookup, + self.get_logs(pod_run).await?, ) } + + async fn get_logs(&self, pod_run: &PodRun) -> Result { + let mut std_out = Vec::new(); + let mut std_err = Vec::new(); + + self.api + .logs::( + &pod_run.assigned_name, + Some(LogsOptions { + stdout: true, + stderr: true, + ..Default::default() + }), + ) + .try_collect::>() + .await? + .iter() + .for_each(|log_output| match log_output { + LogOutput::StdOut { message } => { + std_out.extend(message.to_vec()); + } + LogOutput::StdErr { message } => { + std_err.extend(message.to_vec()); + } + LogOutput::StdIn { .. } | LogOutput::Console { .. } => { + // Ignore stdin logs, as they are not relevant for our use case + } + }); + + let mut logs = String::from_utf8_lossy(&std_out).to_string(); + if !std_err.is_empty() { + logs.push_str("\nSTDERR:\n"); + logs.push_str(&String::from_utf8_lossy(&std_err)); + } + + // Check for errors in the docker state, if exist, attach it to logs + // This is for when the container exits immediately due to a bad command or similar + let error = self + .api + .inspect_container(&pod_run.assigned_name, None) + .await? + .state + .context(selector::FailedToExtractRunInfo { + container_name: &pod_run.assigned_name, + })? + .error + .context(selector::FailedToExtractRunInfo { + container_name: &pod_run.assigned_name, + })?; + + if !error.is_empty() { + logs.push_str(&error); + } + + Ok(logs) + } } #[uniffi::export] diff --git a/src/uniffi/orchestrator/mod.rs b/src/uniffi/orchestrator/mod.rs index 00358ec3..04d2412f 100644 --- a/src/uniffi/orchestrator/mod.rs +++ b/src/uniffi/orchestrator/mod.rs @@ -58,7 +58,7 @@ pub struct PodRunInfo { pub memory_limit: u64, } /// Current computation managed by orchestrator. 
-#[derive(uniffi::Record, Debug, PartialEq)] +#[derive(uniffi::Record, Debug, PartialEq, Clone)] pub struct PodRun { /// Original compute request. pub pod_job: Arc, @@ -121,6 +121,10 @@ pub trait Orchestrator: Send + Sync + fmt::Debug { pod_run: &PodRun, namespace_lookup: &HashMap, ) -> Result; + /// Get the logs for a specific pod run. + /// # Errors + /// Will return `Err` if there is an issue getting logs. + fn get_logs_blocking(&self, pod_run: &PodRun) -> Result; /// How to asynchronously start containers with an alternate image. /// /// # Errors @@ -170,6 +174,9 @@ pub trait Orchestrator: Send + Sync + fmt::Debug { pod_run: &PodRun, namespace_lookup: &HashMap, ) -> Result; + + /// Get the logs for a specific pod run. + async fn get_logs(&self, pod_run: &PodRun) -> Result; } /// Orchestration execution agent daemon and client. pub mod agent; diff --git a/src/uniffi/store/filestore.rs b/src/uniffi/store/filestore.rs index db089dfa..afbe7a43 100644 --- a/src/uniffi/store/filestore.rs +++ b/src/uniffi/store/filestore.rs @@ -1,14 +1,18 @@ -use crate::uniffi::{ - error::Result, - model::{ - ModelType, - pod::{Pod, PodJob, PodResult}, +use crate::{ + core::model::ToYaml as _, + uniffi::{ + error::{Kind, OrcaError, Result}, + model::{ + ModelType, + pod::{Pod, PodJob, PodResult}, + }, + store::{ModelID, ModelInfo, Store}, }, - store::{ModelID, ModelInfo, Store}, }; +use chrono::Utc; use derive_more::Display; use getset::CloneGetters; -use std::{fs, path::PathBuf}; +use std::{backtrace::Backtrace, fs, path::PathBuf}; use uniffi; /// Support for a storage backend on a local filesystem directory. 
#[derive(uniffi::Object, Debug, Display, CloneGetters, Clone)] @@ -23,12 +27,52 @@ pub struct LocalFileStore { #[uniffi::export] impl Store for LocalFileStore { fn save_pod(&self, pod: &Pod) -> Result<()> { - self.save_model(pod, &pod.hash, pod.annotation.as_ref()) + self.save_model(pod, &pod.hash, pod.annotation.as_ref())?; + // Deal with saving the recommended_specs + // Since we are going with a no modify scheme for saving, we will save the latest version as year-month-day-hour-min-second UTC + Self::save_file( + self.make_path( + pod, + &pod.hash, + format!( + "recommended_specs/{}", + Utc::now().format("%Y-%m-%d-%H-%M-%S-%f") + ), + ), + &pod.recommend_specs.to_yaml()?, + ) } fn load_pod(&self, model_id: &ModelID) -> Result { let (mut pod, annotation, hash) = self.load_model::(model_id)?; pod.annotation = annotation; pod.hash = hash; + // Deal with the recommended_specs by selecting the last saved spec + // List all files in the dir + let folder_path = self.make_path(&pod, &pod.hash, "recommended_specs"); + let mut recommended_specs = fs::read_dir(&folder_path)?; + + let mut latest_spec_file_name = recommended_specs + .next() + .ok_or(OrcaError { + kind: Kind::EmptyDir { + dir: folder_path.clone(), + backtrace: Some(Backtrace::capture()), + }, + })?? + .file_name(); + + for entry in recommended_specs { + let file_name = entry?.file_name(); + if file_name > latest_spec_file_name { + latest_spec_file_name = file_name; + } + } + + // Read the latest_spec and loaded back in + pod.recommend_specs = serde_yaml::from_str(&fs::read_to_string( + folder_path.join(latest_spec_file_name), + )?)?; + Ok(pod) } fn list_pod(&self) -> Result> { diff --git a/src/uniffi/store/mod.rs b/src/uniffi/store/mod.rs index 449da225..c07ac84a 100644 --- a/src/uniffi/store/mod.rs +++ b/src/uniffi/store/mod.rs @@ -16,7 +16,7 @@ pub enum ModelID { } /// Metadata for a model. 
-#[derive(uniffi::Record, Debug, PartialEq, Eq)] +#[derive(uniffi::Record, Debug, PartialEq, Eq, Hash)] pub struct ModelInfo { /// A model's name. pub name: Option, diff --git a/tests/agent.rs b/tests/agent.rs index 0fa82a0d..59469796 100644 --- a/tests/agent.rs +++ b/tests/agent.rs @@ -9,17 +9,15 @@ pub mod fixture; use fixture::{NAMESPACE_LOOKUP_READ_ONLY, TestDirs, pod_jobs_stresser, pull_image}; -use orcapod::{ - core::orchestrator::agent::extract_metadata, - uniffi::{ - error::Result, - model::pod::PodResult, - orchestrator::{ - agent::{Agent, AgentClient}, - docker::LocalDockerOrchestrator, - }, - store::{ModelID, Store as _, filestore::LocalFileStore}, +use itertools::Itertools as _; +use orcapod::uniffi::{ + error::Result, + model::pod::PodResult, + orchestrator::{ + agent::{Agent, AgentClient}, + docker::LocalDockerOrchestrator, }, + store::{ModelID, Store as _, filestore::LocalFileStore}, }; use std::{ collections::HashMap, @@ -42,7 +40,6 @@ fn simple() -> Result<()> { Ok(()) } -#[expect(clippy::excessive_nesting, reason = "Nesting is manageable")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn parallel_four_cores() -> Result<()> { let test_dirs = TestDirs::new(&HashMap::from([("default".to_owned(), None::)]))?; @@ -50,7 +47,7 @@ async fn parallel_four_cores() -> Result<()> { // config let image_reference = "ghcr.io/colinianking/stress-ng:e2f96874f951a72c1c83ff49098661f0e013ac40"; pull_image(image_reference)?; - let margin_millis = 2000; + let margin_millis = 5000; let run_duration_secs = 5; let (group, host) = ("agent_parallel-four-cores", "host"); // api @@ -101,8 +98,14 @@ async fn parallel_four_cores() -> Result<()> { .recv_async() .await .expect("All senders have dropped."); - let metadata = extract_metadata(sample.key_expr().as_str()); - let topic_kind = metadata["action"].as_str(); + let metadata: HashMap = sample + .key_expr() + .as_str() + .split('/') + .map(ToOwned::to_owned) + .tuples() + .collect(); + let topic_kind = 
metadata["event"].as_str(); if ["success", "failure"].contains(&topic_kind) { let pod_result = serde_json::from_slice::(&sample.payload().to_bytes())?; assert!( diff --git a/tests/crypto.rs b/tests/crypto.rs deleted file mode 100644 index a63dc18e..00000000 --- a/tests/crypto.rs +++ /dev/null @@ -1,30 +0,0 @@ -#![expect(missing_docs, clippy::panic_in_result_fn, reason = "OK in tests.")] -pub mod fixture; - -use orcapod::{ - core::crypto::{hash_buffer, hash_dir, hash_file}, - uniffi::error::Result, -}; -use std::fs::read; - -#[test] -fn consistent_hash() -> Result<()> { - let filepath = "./tests/extra/data/images/subject.jpeg"; - assert_eq!( - hash_file(filepath)?, - hash_buffer(&read(filepath)?), - "Checksum not consistent." - ); - Ok(()) -} - -#[test] -fn complex_hash() -> Result<()> { - let dirpath = "./tests/extra/data/images"; - assert_eq!( - hash_dir(dirpath)?, - "6c96a478ea25e34fab045bc82858a2980b2cfb22db32e83c01349a8e7ed3b42c".to_owned(), - "Directory checksum didn't match." - ); - Ok(()) -} diff --git a/tests/error.rs b/tests/error.rs index 7eb89524..4564e162 100644 --- a/tests/error.rs +++ b/tests/error.rs @@ -10,16 +10,13 @@ use chrono::DateTime; use dot_parser::ast::Graph as DOTGraph; use fixture::{NAMESPACE_LOOKUP_READ_ONLY, pod_custom, pod_job_custom, pod_job_style, str_to_vec}; use glob::glob; -use orcapod::{ - core::crypto::hash_file, - uniffi::{ - error::{OrcaError, Result}, - model::packet::PathInfo, - orchestrator::{ - Orchestrator as _, - agent::{AgentClient, Response}, - docker::LocalDockerOrchestrator, - }, +use orcapod::uniffi::{ + error::{OrcaError, Result}, + model::packet::PathInfo, + orchestrator::{ + Orchestrator as _, + agent::{AgentClient, Response}, + docker::LocalDockerOrchestrator, }, }; use serde_json; @@ -154,14 +151,6 @@ fn internal_incomplete_packet() -> Result<()> { Ok(()) } -#[test] -fn internal_invalid_filepath() { - assert!( - hash_file("nonexistent_file.txt").is_err_and(contains_debug), - "Did not raise an invalid filepath 
error." - ); -} - #[test] fn internal_key_missing() { assert!( diff --git a/tests/extra/data/input_txt/Where.txt b/tests/extra/data/input_txt/Where.txt new file mode 100644 index 00000000..2891a132 --- /dev/null +++ b/tests/extra/data/input_txt/Where.txt @@ -0,0 +1 @@ +Where diff --git a/tests/extra/data/input_txt/black.txt b/tests/extra/data/input_txt/black.txt new file mode 100644 index 00000000..7e66a17d --- /dev/null +++ b/tests/extra/data/input_txt/black.txt @@ -0,0 +1 @@ +black diff --git a/tests/extra/data/input_txt/cat.txt b/tests/extra/data/input_txt/cat.txt new file mode 100644 index 00000000..ef07ddcd --- /dev/null +++ b/tests/extra/data/input_txt/cat.txt @@ -0,0 +1 @@ +cat diff --git a/tests/extra/data/input_txt/hiding.txt b/tests/extra/data/input_txt/hiding.txt new file mode 100644 index 00000000..56e64f05 --- /dev/null +++ b/tests/extra/data/input_txt/hiding.txt @@ -0,0 +1 @@ +hiding diff --git a/tests/extra/data/input_txt/is.txt b/tests/extra/data/input_txt/is.txt new file mode 100644 index 00000000..f5cb1322 --- /dev/null +++ b/tests/extra/data/input_txt/is.txt @@ -0,0 +1 @@ +is diff --git a/tests/extra/data/input_txt/playing.txt b/tests/extra/data/input_txt/playing.txt new file mode 100644 index 00000000..0395b790 --- /dev/null +++ b/tests/extra/data/input_txt/playing.txt @@ -0,0 +1 @@ +playing diff --git a/tests/extra/data/input_txt/tabby.txt b/tests/extra/data/input_txt/tabby.txt new file mode 100644 index 00000000..3de6015d --- /dev/null +++ b/tests/extra/data/input_txt/tabby.txt @@ -0,0 +1 @@ +tabby diff --git a/tests/extra/data/input_txt/the.txt b/tests/extra/data/input_txt/the.txt new file mode 100644 index 00000000..41d25f51 --- /dev/null +++ b/tests/extra/data/input_txt/the.txt @@ -0,0 +1 @@ +the diff --git a/tests/extra/python/agent_test.py b/tests/extra/python/agent.py similarity index 88% rename from tests/extra/python/agent_test.py rename to tests/extra/python/agent.py index 7e35ea79..8405d8d7 100644 --- 
a/tests/extra/python/agent_test.py +++ b/tests/extra/python/agent.py @@ -17,6 +17,7 @@ Uri, Pod, Annotation, + RecommendSpecs, ) @@ -29,7 +30,7 @@ def count(sample): with zenoh.open(zenoh.Config()) as session: with session.declare_subscriber( - f"**/action/success/**/group/{group}/**/topic/pod_job/**", count + f"**/event/success/**/group/{group}/**/topic/pod_job/**", count ) as subscriber: await asyncio.sleep(20) # wait for results @@ -45,7 +46,7 @@ async def main(client, agent, test_dir, namespace_lookup, pod_jobs): available_store=LocalFileStore(directory=f"{test_dir}/store"), ), ) - await asyncio.sleep(5) # ensure service ready + await asyncio.sleep(1) # ensure service ready try: await client.start_pod_jobs(pod_jobs=pod_jobs) @@ -88,10 +89,11 @@ async def main(client, agent, test_dir, namespace_lookup, pod_jobs): input_spec={}, output_dir="/tmp/output", output_spec={}, - source_commit_url="https://github.com/user/simple", - recommended_cpus=0.1, - recommended_memory=10 << 20, - required_gpu=None, + recommend_specs=RecommendSpecs( + cpus=0.1, + memory=128 << 20, + ), + gpu_requirements=None, ), input_packet={}, output_dir=Uri( @@ -99,7 +101,7 @@ async def main(client, agent, test_dir, namespace_lookup, pod_jobs): path=".", ), cpu_limit=1, - memory_limit=10 << 20, + memory_limit=128 << 20, env_vars=None, namespace_lookup=namespace_lookup, ) diff --git a/tests/extra/python/smoke_test.py b/tests/extra/python/model.py similarity index 96% rename from tests/extra/python/smoke_test.py rename to tests/extra/python/model.py index 4e111023..03c76d8c 100755 --- a/tests/extra/python/smoke_test.py +++ b/tests/extra/python/model.py @@ -16,6 +16,7 @@ LocalFileStore, ModelId, ModelType, + RecommendSpecs, OrcaError, ) @@ -32,10 +33,11 @@ def create_pod(data, _): input_spec={}, output_dir="/tmp/output", output_spec={}, - source_commit_url="https://github.com/user/simple", - recommended_cpus=0.1, - recommended_memory=10 << 20, - required_gpu=None, + recommend_specs=RecommendSpecs( 
+ cpus=0.1, + memory=10 << 20, + ), + gpu_requirements=None, ) return data["pod"], data diff --git a/tests/fixture/mod.rs b/tests/fixture/mod.rs index a9ae1090..9917caf3 100644 --- a/tests/fixture/mod.rs +++ b/tests/fixture/mod.rs @@ -13,8 +13,10 @@ use orcapod::uniffi::{ model::{ Annotation, packet::{Blob, BlobKind, Packet, PathInfo, PathSet, URI}, - pod::{Pod, PodJob, PodResult}, + pipeline::{Kernel, NodeURI, Pipeline, PipelineJob}, + pod::{Pod, PodJob, PodResult, RecommendSpecs}, }, + operator::MapOperator, orchestrator::PodStatus, store::{ModelID, ModelInfo, Store}, }; @@ -75,9 +77,10 @@ pub fn pod_style() -> Result { }, ), ]), - "https://github.com/user/style-transfer/tree/1.0.0".to_owned(), - 0.25, // 250 millicores as frac cores - 1_u64 << 30, // 1GiB in bytes + RecommendSpecs { + cpus: 0.25, + memory: 1_u64 << 30, + }, None, ) } @@ -153,6 +156,7 @@ pub fn pod_result_style( 1_737_922_307, 1_737_925_907, namespace_lookup, + "Example logs".to_owned(), ) } @@ -168,9 +172,10 @@ pub fn pod_custom( input_spec, PathBuf::from("/tmp/output"), HashMap::new(), - "https://github.com/place/holder".to_owned(), - 0.1, // 100 millicores as frac cores - 50_u64 << 20, // 10 MiB in bytes + RecommendSpecs { + cpus: 0.1, + memory: 50_u64 << 20, + }, None, ) } @@ -280,6 +285,276 @@ pub fn pull_image(reference: &str) -> Result<()> { Ok(()) } +// Pipeline Fixture +pub fn combine_txt_pod(pod_name: &str) -> Result { + Pod::new( + Some(Annotation { + name: pod_name.to_owned(), + description: "Takes two input files, remove the final next line and combine them" + .to_owned(), + version: "1.0.0".to_owned(), + }), + "alpine:3.14".to_owned(), + vec![ + "sh".into(), + "-c".into(), + format!( + "printf '%s %s\\n' \"$(cat input/input_1.txt | head -c -1)\" \"$(cat input/input_2.txt | head -c -1)\" > /output/output.txt" + ), + ], + HashMap::from([ + ( + "input_1".to_owned(), + PathInfo { + path: PathBuf::from("/input/input_1.txt"), + match_pattern: r".*\.txt".to_owned(), + }, + ), + ( + 
"input_2".into(), + PathInfo { + path: PathBuf::from("/input/input_2.txt"), + match_pattern: r".*\.txt".to_owned(), + }, + ), + ]), + PathBuf::from("/output"), + HashMap::from([( + "output".to_owned(), + PathInfo { + path: PathBuf::from("output.txt"), + match_pattern: r".*\.txt".to_owned(), + }, + )]), + RecommendSpecs { + cpus: 0.25, + memory: 128_u64 << 20, + }, + None, + ) +} + +#[expect(clippy::too_many_lines, reason = "OK in tests.")] +pub fn pipeline() -> Result { + // Create a simple pipeline where the functions job is to add append their name into the input file + // Structure: A -> Mapper -> Joiner -> B -> Mapper -> C, D -> Mapper -> Joiner + + // Create the kernel map + let mut kernel_map = HashMap::new(); + + // Insert the pod into the kernel map + for pod_name in ["A", "B", "C", "D", "E"] { + kernel_map.insert(pod_name.into(), combine_txt_pod(pod_name)?.into()); + } + + let output_to_input_1 = Arc::new(MapOperator { + map: HashMap::from([("output".to_owned(), "input_1".to_owned())]), + }); + + let output_to_input_2 = Arc::new(MapOperator { + map: HashMap::from([("output".to_owned(), "input_2".to_owned())]), + }); + + // Create a mapper for A, B, and C + kernel_map.insert( + "pod_a_mapper".into(), + Kernel::MapOperator { + mapper: Arc::clone(&output_to_input_1), + }, + ); + kernel_map.insert( + "pod_b_mapper".into(), + Kernel::MapOperator { + mapper: Arc::clone(&output_to_input_2), + }, + ); + kernel_map.insert( + "pod_c_mapper".into(), + Kernel::MapOperator { + mapper: Arc::clone(&output_to_input_1), + }, + ); + kernel_map.insert( + "pod_d_mapper".into(), + Kernel::MapOperator { + mapper: Arc::clone(&output_to_input_2), + }, + ); + + for joiner_name in ['c', 'd', 'e'] { + kernel_map.insert(format!("pod_{joiner_name}_joiner"), Kernel::JoinOperator); + } + + // Write all the edges in DOT format + let dot = " + digraph { + A -> pod_a_mapper -> pod_c_joiner; + B -> pod_b_mapper -> pod_c_joiner; + pod_c_joiner -> C -> pod_c_mapper-> pod_e_joiner; + D -> 
pod_d_mapper -> pod_e_joiner; + pod_e_joiner -> E; + } + "; + + Pipeline::new( + dot, + kernel_map, + HashMap::from([ + ( + "where".into(), + vec![NodeURI { + node_id: "A".into(), + key: "input_1".into(), + }], + ), + ( + "is".into(), + vec![NodeURI { + node_id: "A".into(), + key: "input_2".into(), + }], + ), + ( + "the".into(), + vec![NodeURI { + node_id: "B".into(), + key: "input_1".into(), + }], + ), + ( + "cat_color".into(), + vec![NodeURI { + node_id: "B".into(), + key: "input_2".into(), + }], + ), + ( + "cat".into(), + vec![NodeURI { + node_id: "D".into(), + key: "input_1".into(), + }], + ), + ( + "action".into(), + vec![NodeURI { + node_id: "D".into(), + key: "input_2".into(), + }], + ), + ]), + HashMap::from([( + "output".to_owned(), + NodeURI { + node_id: "E".into(), + key: "output".into(), + }, + )]), + ) +} + +#[expect(clippy::implicit_hasher, reason = "Could be a false positive?")] +pub fn pipeline_job(namespace_lookup: &HashMap) -> Result { + // Create a simple pipeline_job + let namespace: String = "default".into(); + PipelineJob::new( + pipeline()?.into(), + &HashMap::from([ + ( + "where".into(), + vec![PathSet::Unary(Blob { + kind: BlobKind::File, + location: URI { + namespace: namespace.clone(), + path: "input_txt/Where.txt".into(), + }, + checksum: String::new(), + })], + ), + ( + "is".into(), + vec![PathSet::Unary(Blob { + kind: BlobKind::File, + location: URI { + namespace: namespace.clone(), + path: "input_txt/is.txt".into(), + }, + checksum: String::new(), + })], + ), + ( + "the".into(), + vec![PathSet::Unary(Blob { + kind: BlobKind::File, + location: URI { + namespace: namespace.clone(), + path: "input_txt/the.txt".into(), + }, + checksum: String::new(), + })], + ), + ( + "cat_color".into(), + vec![ + PathSet::Unary(Blob { + kind: BlobKind::File, + location: URI { + namespace: namespace.clone(), + path: "input_txt/black.txt".into(), + }, + checksum: String::new(), + }), + PathSet::Unary(Blob { + kind: BlobKind::File, + location: URI { + 
namespace: namespace.clone(), + path: "input_txt/tabby.txt".into(), + }, + checksum: String::new(), + }), + ], + ), + ( + "cat".into(), + vec![PathSet::Unary(Blob { + kind: BlobKind::File, + location: URI { + namespace: namespace.clone(), + path: "input_txt/cat.txt".into(), + }, + checksum: String::new(), + })], + ), + ( + "action".into(), + vec![ + PathSet::Unary(Blob { + kind: BlobKind::File, + location: URI { + namespace: namespace.clone(), + path: "input_txt/hiding.txt".into(), + }, + checksum: String::new(), + }), + PathSet::Unary(Blob { + kind: BlobKind::File, + location: URI { + namespace, + path: "input_txt/playing.txt".into(), + }, + checksum: String::new(), + }), + ], + ), + ]), + URI { + namespace: "default".to_owned(), + path: PathBuf::from("pipeline_output"), + }, + namespace_lookup, + ) +} + // --- util --- pub fn str_to_vec(v: &str) -> Vec { diff --git a/tests/model.rs b/tests/model.rs index 31d0ce8c..660b78e0 100644 --- a/tests/model.rs +++ b/tests/model.rs @@ -1,16 +1,16 @@ -#![expect(missing_docs, clippy::panic_in_result_fn, reason = "OK in tests.")] +#![expect(missing_docs, reason = "OK in tests.")] pub mod fixture; use fixture::{NAMESPACE_LOOKUP_READ_ONLY, pod_job_style, pod_result_style, pod_style}; use indoc::indoc; -use orcapod::{core::model::to_yaml, uniffi::error::Result}; +use orcapod::{core::model::ToYaml as _, uniffi::error::Result}; use pretty_assertions::assert_eq as pretty_assert_eq; #[test] fn hash_pod() -> Result<()> { - assert_eq!( + pretty_assert_eq!( pod_style()?.hash, - "0e993f645fbb36f0635e2c9140975997cf4ca723d0b49cf4ee4963b76e6424d7", + "2104a9b471bec3a54f1f5437d887e16626bed3e818241410ce1ec7a63f0361fb", "Hash didn't match." ); Ok(()) @@ -19,7 +19,7 @@ fn hash_pod() -> Result<()> { #[test] fn pod_to_yaml() -> Result<()> { pretty_assert_eq!( - to_yaml(&pod_style()?)?, + pod_style()?.to_yaml()?, indoc! 
{r" class: pod image: example.server.com/user/style-transfer:1.0.0 @@ -41,10 +41,7 @@ fn pod_to_yaml() -> Result<()> { result2: path: result2.jpeg match_pattern: .*\.jpeg - source_commit_url: https://github.com/user/style-transfer/tree/1.0.0 - recommended_cpus: 0.25 - recommended_memory: 1073741824 - required_gpu: null + gpu_requirements: null "}, "YAML serialization didn't match." ); @@ -53,9 +50,9 @@ fn pod_to_yaml() -> Result<()> { #[test] fn hash_pod_job() -> Result<()> { - assert_eq!( + pretty_assert_eq!( pod_job_style(&NAMESPACE_LOOKUP_READ_ONLY)?.hash, - "ba1c4693f9186ccb1b6e63625085d8fd95552b28b7a60fe9b1b47f68a9ba8880", + "009588059284ab7e62c6af040eca44dbd8b32964ebb1406a34b174dafcf4520d", "Hash didn't match." ); Ok(()) @@ -64,10 +61,10 @@ fn hash_pod_job() -> Result<()> { #[test] fn pod_job_to_yaml() -> Result<()> { pretty_assert_eq!( - to_yaml(&pod_job_style(&NAMESPACE_LOOKUP_READ_ONLY)?)?, + pod_job_style(&NAMESPACE_LOOKUP_READ_ONLY)?.to_yaml()?, indoc! {" class: pod_job - pod: 0e993f645fbb36f0635e2c9140975997cf4ca723d0b49cf4ee4963b76e6424d7 + pod: 2104a9b471bec3a54f1f5437d887e16626bed3e818241410ce1ec7a63f0361fb input_packet: base-input: - kind: File @@ -102,9 +99,9 @@ fn pod_job_to_yaml() -> Result<()> { #[test] fn hash_pod_result() -> Result<()> { - assert_eq!( + pretty_assert_eq!( pod_result_style(&NAMESPACE_LOOKUP_READ_ONLY)?.hash, - "e752d86d4fc5435bfa4564ba951851530f5cf2228586c2f488c2ca9e7bcc7ed1", + "9c4ae0ab0659823d104a30ef81c19c9cb1584a07e5cb702dc189f9bd7604fda3", "Hash didn't match." ); Ok(()) @@ -113,10 +110,10 @@ fn hash_pod_result() -> Result<()> { #[test] fn pod_result_to_yaml() -> Result<()> { pretty_assert_eq!( - to_yaml(&pod_result_style(&NAMESPACE_LOOKUP_READ_ONLY)?)?, + pod_result_style(&NAMESPACE_LOOKUP_READ_ONLY)?.to_yaml()?, indoc! 
{" class: pod_result - pod_job: ba1c4693f9186ccb1b6e63625085d8fd95552b28b7a60fe9b1b47f68a9ba8880 + pod_job: 009588059284ab7e62c6af040eca44dbd8b32964ebb1406a34b174dafcf4520d output_packet: result1: kind: File @@ -134,6 +131,7 @@ fn pod_result_to_yaml() -> Result<()> { status: Completed created: 1737922307 terminated: 1737925907 + logs: Example logs "}, "YAML serialization didn't match." ); diff --git a/tests/operator.rs b/tests/operator.rs deleted file mode 100644 index 0389f069..00000000 --- a/tests/operator.rs +++ /dev/null @@ -1,213 +0,0 @@ -#![expect(missing_docs, clippy::panic_in_result_fn, reason = "OK in tests.")] - -use orcapod::{ - core::operator::{JoinOperator, MapOperator, Operator}, - uniffi::{ - error::Result, - model::packet::{Blob, BlobKind, Packet, PathSet, URI}, - }, -}; -use std::{collections::HashMap, path::PathBuf}; - -fn make_packet_key(key_name: String, filepath: String) -> (String, PathSet) { - ( - key_name, - PathSet::Unary(Blob { - kind: BlobKind::File, - location: URI { - namespace: "default".into(), - path: PathBuf::from(filepath), - }, - checksum: String::new(), - }), - ) -} - -async fn next_batch( - operator: impl Operator, - packets: Vec<(String, Packet)>, -) -> Result> { - let mut next_packets = vec![]; - for (stream_name, packet) in packets { - next_packets.extend(operator.next(stream_name, packet).await?); - } - Ok(next_packets) -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 1)] -async fn join_once() -> Result<()> { - let operator = JoinOperator::new(2); - - let left_stream = (0..3) - .map(|i| { - ( - "left".into(), - Packet::from([make_packet_key( - "subject".into(), - format!("left/subject{i}.png"), - )]), - ) - }) - .collect::>(); - - let right_stream = (0..2) - .map(|i| { - ( - "right".into(), - Packet::from([make_packet_key( - "style".into(), - format!("right/style{i}.t7"), - )]), - ) - }) - .collect::>(); - - let mut input_streams = left_stream; - input_streams.extend(right_stream); - - assert_eq!( - 
next_batch(operator, input_streams).await?, - vec![ - Packet::from([ - make_packet_key("subject".into(), "left/subject0.png".into()), - make_packet_key("style".into(), "right/style0.t7".into()), - ]), - Packet::from([ - make_packet_key("subject".into(), "left/subject1.png".into()), - make_packet_key("style".into(), "right/style0.t7".into()), - ]), - Packet::from([ - make_packet_key("subject".into(), "left/subject2.png".into()), - make_packet_key("style".into(), "right/style0.t7".into()), - ]), - Packet::from([ - make_packet_key("subject".into(), "left/subject0.png".into()), - make_packet_key("style".into(), "right/style1.t7".into()), - ]), - Packet::from([ - make_packet_key("subject".into(), "left/subject1.png".into()), - make_packet_key("style".into(), "right/style1.t7".into()), - ]), - Packet::from([ - make_packet_key("subject".into(), "left/subject2.png".into()), - make_packet_key("style".into(), "right/style1.t7".into()), - ]), - ], - "Unexpected streams." - ); - - Ok(()) -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 1)] -async fn join_spotty() -> Result<()> { - let operator = JoinOperator::new(2); - - assert_eq!( - operator - .next( - "right".into(), - Packet::from([make_packet_key("style".into(), "right/style0.t7".into())]) - ) - .await?, - vec![], - "Unexpected streams." - ); - - assert_eq!( - operator - .next( - "right".into(), - Packet::from([make_packet_key("style".into(), "right/style1.t7".into())]) - ) - .await?, - vec![], - "Unexpected streams." - ); - - assert_eq!( - operator - .next( - "left".into(), - Packet::from([make_packet_key( - "subject".into(), - "left/subject0.png".into() - )]) - ) - .await?, - vec![ - Packet::from([ - make_packet_key("subject".into(), "left/subject0.png".into()), - make_packet_key("style".into(), "right/style0.t7".into()), - ]), - Packet::from([ - make_packet_key("subject".into(), "left/subject0.png".into()), - make_packet_key("style".into(), "right/style1.t7".into()), - ]), - ], - "Unexpected streams." 
- ); - - assert_eq!( - next_batch( - operator, - (1..3) - .map(|i| { - ( - "left".into(), - Packet::from([make_packet_key( - "subject".into(), - format!("left/subject{i}.png"), - )]), - ) - }) - .collect::>() - ) - .await?, - vec![ - Packet::from([ - make_packet_key("subject".into(), "left/subject1.png".into()), - make_packet_key("style".into(), "right/style0.t7".into()), - ]), - Packet::from([ - make_packet_key("subject".into(), "left/subject1.png".into()), - make_packet_key("style".into(), "right/style1.t7".into()), - ]), - Packet::from([ - make_packet_key("subject".into(), "left/subject2.png".into()), - make_packet_key("style".into(), "right/style0.t7".into()), - ]), - Packet::from([ - make_packet_key("subject".into(), "left/subject2.png".into()), - make_packet_key("style".into(), "right/style1.t7".into()), - ]), - ], - "Unexpected streams." - ); - - Ok(()) -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 1)] -async fn map_once() -> Result<()> { - let operator = MapOperator::new(&HashMap::from([("key_old".into(), "key_new".into())])); - - assert_eq!( - operator - .next( - "parent".into(), - Packet::from([ - make_packet_key("key_old".into(), "some/key.txt".into()), - make_packet_key("subject".into(), "some/subject.txt".into()), - ]), - ) - .await?, - vec![Packet::from([ - make_packet_key("key_new".into(), "some/key.txt".into()), - make_packet_key("subject".into(), "some/subject.txt".into()), - ]),], - "Unexpected packet." - ); - - Ok(()) -} diff --git a/tests/orchestrator.rs b/tests/orchestrator.rs index d129928b..db0c5706 100644 --- a/tests/orchestrator.rs +++ b/tests/orchestrator.rs @@ -4,7 +4,6 @@ clippy::panic, clippy::expect_used, clippy::unwrap_used, - clippy::indexing_slicing, reason = "OK in tests." )] @@ -49,12 +48,12 @@ fn basic_test( assert_eq!( orchestrator .list_blocking()? 
- .iter() - .filter(|run| **run == *pod_run) - .map(|run| Ok(orchestrator.get_info_blocking(run)?.command)) - .collect::>>()?[0], - expected_command, - "List return a pod_run with a different command." + .into_iter() + .filter(|pod_run_from_list| pod_run_from_list == pod_run) + .map(|run| Ok(orchestrator.get_info_blocking(&run)?.command)) + .collect::>>()?, + vec![expected_command], + "Unexpected list." ); // await result let pod_result_1 = orchestrator.get_result_blocking(pod_run, namespace_lookup)?; @@ -68,11 +67,11 @@ fn basic_test( orchestrator .list_blocking()? .into_iter() - .filter(|run| *run == *pod_run) + .filter(|pod_run_from_list| pod_run_from_list == pod_run) .map(|run| Ok(orchestrator.get_info_blocking(&run)?.command)) - .collect::>>()?[0], - expected_command, - "List return a pod_run with a different command." + .collect::>>()?, + vec![expected_command], + "Unexpected list." ); assert_eq!( pod_result_1.assigned_name, pod_run.assigned_name, @@ -296,3 +295,44 @@ async fn verify_pod_result_not_running() -> Result<()> { ); Ok(()) } + +#[test] +fn logs() -> Result<()> { + execute_wrapper(|orchestrator, namespace_lookup| { + let pod_job = pod_job_custom( + pod_custom( + "alpine:3.14", + vec!["bin/sh".into(), "-c".into(), "echo \"hi\"".into()], + HashMap::new(), + )?, + HashMap::new(), + namespace_lookup, + )?; + + let pod_run = orchestrator.start_blocking(&pod_job, namespace_lookup)?; + let pod_result = orchestrator.get_result_blocking(&pod_run, namespace_lookup)?; + + assert_eq!( + pod_result.status, + PodStatus::Completed, + "Pod status is not completed" + ); + + assert_eq!(orchestrator.get_logs_blocking(&pod_run)?, "hi\n"); + assert_eq!( + orchestrator + .get_result_blocking(&pod_run, namespace_lookup)? + .logs, + "hi\n" + ); + + orchestrator.delete_blocking(&pod_run)?; + + assert!( + !orchestrator.list_blocking()?.contains(&pod_run), + "Unexpected container remains." 
+ ); + + Ok(()) + }) +} diff --git a/tests/pipeline.rs b/tests/pipeline.rs index caa02d4a..b755f656 100644 --- a/tests/pipeline.rs +++ b/tests/pipeline.rs @@ -13,7 +13,7 @@ use orcapod::uniffi::{ error::Result, model::{ packet::{Blob, BlobKind, PathInfo, PathSet, URI}, - pipeline::{Kernel, Pipeline, PipelineJob, SpecURI}, + pipeline::{Kernel, NodeURI, Pipeline, PipelineJob}, }, }; use std::collections::HashMap; @@ -29,7 +29,7 @@ fn input_packet_checksum() -> Result<()> { HashMap::from([( "A".into(), Kernel::Pod { - r#ref: pod_custom( + pod: pod_custom( "alpine:3.14", vec!["echo".into()], HashMap::from([( @@ -43,14 +43,14 @@ fn input_packet_checksum() -> Result<()> { .into(), }, )]), - &HashMap::from([( + HashMap::from([( "pipeline_key_1".into(), - vec![SpecURI { - node: "A".into(), + vec![NodeURI { + node_id: "A".into(), key: "node_key_1".into(), }], )]), - &HashMap::new(), + HashMap::new(), )?; let pipeline_job = PipelineJob::new( @@ -66,7 +66,7 @@ fn input_packet_checksum() -> Result<()> { checksum: String::new(), }])], )]), - &URI { + URI { namespace: "default".into(), path: "output/pipeline".into(), }, diff --git a/tests/pipeline_runner.rs b/tests/pipeline_runner.rs new file mode 100644 index 00000000..fa3c3ac2 --- /dev/null +++ b/tests/pipeline_runner.rs @@ -0,0 +1,92 @@ +#![expect( + missing_docs, + clippy::panic_in_result_fn, + clippy::indexing_slicing, + clippy::unwrap_used, + reason = "OK in tests." 
+)] +pub mod fixture; + +// Example for a local module: +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; + +use orcapod::{ + core::pipeline_runner::DockerPipelineRunner, + uniffi::{ + error::Result, + model::pipeline::PipelineStatus, + orchestrator::{agent::Agent, docker::LocalDockerOrchestrator}, + }, +}; +use tokio::fs::read_to_string; + +use crate::fixture::TestDirs; +use fixture::pipeline_job; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn basic_run() -> Result<()> { + // Create the test_dir and get the namespace lookup + let test_dirs = TestDirs::new(&HashMap::from([( + "default".to_owned(), + Some("./tests/extra/data/"), + )]))?; + let namespace_lookup = test_dirs.namespace_lookup(); + + // Create and agent and start it (temporary for now, will be merge later) + let agent = Arc::new(Agent::new( + "test:basic_run".to_owned(), + "localhost".to_owned(), + Arc::new(LocalDockerOrchestrator::new().unwrap()), + )?); + + let agent_inner = Arc::clone(&agent); + let namespace_lookup_inner = namespace_lookup.clone(); + tokio::spawn(async move { + agent_inner + .start(&namespace_lookup_inner, None) + .await + .unwrap(); + }); + + let pipeline_job = pipeline_job(&namespace_lookup)?; + + // Create the runner + let mut runner = DockerPipelineRunner::new(agent); + + let pipeline_run = runner + .start(pipeline_job, "default", &namespace_lookup) + .await?; + + // Wait for the pipeline run to complete + let pipeline_result = runner.get_result(&pipeline_run).await?; + + // Check the output packet content + assert_eq!(pipeline_result.output_packets["output"].len(), 4); + + // Check the status + assert_eq!(pipeline_result.status, PipelineStatus::Succeeded); + + // Get all the output file content and read them in + let mut output_content = HashSet::new(); + + for output_packet in &pipeline_result.output_packets["output"] { + output_content + .insert(read_to_string(&output_packet.to_path_buf(&namespace_lookup)?[0]).await?); + } + + // Check 
if the output_content matches + assert_eq!( + output_content, + HashSet::from([ + "Where is the black cat playing\n".to_owned(), + "Where is the black cat hiding\n".to_owned(), + "Where is the tabby cat playing\n".to_owned(), + "Where is the tabby cat hiding\n".to_owned(), + ]) + ); + + Ok(()) +} diff --git a/tests/store.rs b/tests/store.rs index 3a6c66f8..64abfd7b 100644 --- a/tests/store.rs +++ b/tests/store.rs @@ -3,6 +3,7 @@ missing_docs, clippy::panic_in_result_fn, clippy::indexing_slicing, + clippy::unwrap_used, reason = "OK in tests." )] @@ -10,16 +11,26 @@ pub mod fixture; use fixture::{ NAMESPACE_LOOKUP_READ_ONLY, TestDirs, TestSetup, pod_job_style, pod_result_style, pod_style, }; -use orcapod::{ - core::{crypto::hash_buffer, model::to_yaml}, - uniffi::{ - error::Result, - model::{Annotation, ModelType}, - store::{ModelID, ModelInfo, Store as _, filestore::LocalFileStore}, +use orcapod::uniffi::{ + error::Result, + model::{ + Annotation, ModelType, + packet::PathInfo, + pod::{Pod, RecommendSpecs}, }, + store::{ModelID, ModelInfo, Store as _, filestore::LocalFileStore}, }; use pretty_assertions::assert_eq as pretty_assert_eq; -use std::{collections::HashMap, fmt::Debug, ops::Deref as _, path::Path, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + fmt::Debug, + ops::Deref as _, + path::{Path, PathBuf}, + sync::Arc, + vec, +}; + +use crate::fixture::str_to_vec; fn is_dir_empty(file: &Path, levels_up: usize) -> Option { Some( @@ -74,6 +85,7 @@ fn pod_basic() -> Result<()> { fn pod_job_basic() -> Result<()> { let mut expected_model = pod_job_style(&NAMESPACE_LOOKUP_READ_ONLY)?; let mut pod = expected_model.pod.deref().clone(); + pod.annotation = None; expected_model.pod = Arc::new(pod); basic_test( @@ -241,85 +253,138 @@ fn pod_annotation_delete() -> Result<()> { Ok(()) } +#[expect(clippy::too_many_lines, reason = "Okay because of creating pods")] #[test] fn pod_annotation_unique() -> Result<()> { let test_dirs = 
TestDirs::new(&HashMap::from([("default".to_owned(), None::)]))?; let store = LocalFileStore::new(test_dirs.0["default"].path().to_path_buf()); - let original_annotation = Annotation { - name: "example".to_owned(), + + // Pod values + let annotation = Annotation { + name: "style-transfer".to_owned(), + description: "This is an example pod.".to_owned(), version: "1.0.0".to_owned(), - description: "original".to_owned(), }; - let mut pod = pod_style()?; - pod.annotation = Some(original_annotation.clone()); - store.save_pod(&pod)?; - let original_hash = pod.hash.clone(); - // case 1: Only change description, should skip saving model and annotation - pod.annotation = Some(Annotation { - description: "new".to_owned(), - ..original_annotation.clone() - }); + let image = "example.server.com/user/style-transfer:1.0.0".to_owned(); + let command = str_to_vec("python /run.py"); + let input_spec = HashMap::from([( + "input_key_1".to_owned(), + PathInfo { + path: PathBuf::from("/input"), + match_pattern: "input/.*".to_owned(), + }, + )]); + let output_spec = HashMap::from([( + "output_key_1".to_owned(), + PathInfo { + path: PathBuf::from("/output"), + match_pattern: "output/.*".to_owned(), + }, + )]); + let output_dir: PathBuf = "/output".into(); + let exec_requirements = RecommendSpecs { + cpus: 0.25, + memory: 1_u64 << 30, + }; + + let gpu_requirements = None; + + let pod = Pod::new( + Some(annotation.clone()), + image.clone(), + command.clone(), + input_spec.clone(), + output_dir.clone(), + output_spec.clone(), + exec_requirements.clone(), + gpu_requirements.clone(), + )?; + + // Save pod above store.save_pod(&pod)?; + // case 1: Only change description, should skip saving model and annotation since overriding annotation is not allowed + let pod_with_new_annotation = Pod::new( + Some(Annotation { + description: "new description".into(), + ..pod.annotation.as_ref().unwrap().clone() + }), + "example.server.com/user/style-transfer:1.0.0".to_owned(), + command, + 
input_spec.clone(), + "/output".into(), + output_spec.clone(), + exec_requirements.clone(), + gpu_requirements.clone(), + )?; + + store.save_pod(&pod_with_new_annotation)?; pretty_assert_eq!( - store.list_pod()?, - vec![ + HashSet::from_iter(store.list_pod()?), + HashSet::from([ ModelInfo { - name: Some(original_annotation.name.clone()), - version: Some(original_annotation.version.clone()), - hash: original_hash.clone(), + name: Some(annotation.name.clone()), + version: Some(annotation.version.clone()), + hash: pod_with_new_annotation.hash, }, ModelInfo { name: None, version: None, - hash: original_hash.clone(), + hash: pod.hash.clone(), }, - ], + ]), "Pod list didn't return 2 expected entries." ); pretty_assert_eq!( store .load_pod(&ModelID::Annotation( - original_annotation.name.clone(), - original_annotation.version.clone() + annotation.name.clone(), + annotation.version.clone() ))? .annotation, - Some(original_annotation.clone()), + Some(annotation.clone()), "Pod annotation unexpected." 
); // case 2: Change description + model, should save model but skip annotation - pod.output_dir = "/output_2".into(); - pod.hash = hash_buffer(to_yaml(&pod)?); - let new_hash = pod.hash.clone(); - store.save_pod(&pod)?; + let pod_with_updated_command = Pod::new( + Some(annotation.clone()), + image, + str_to_vec("python new_run.py"), + input_spec, + output_dir, + output_spec, + exec_requirements, + gpu_requirements, + )?; + store.save_pod(&pod_with_updated_command)?; pretty_assert_eq!( - store.list_pod()?, - vec![ + HashSet::from_iter(store.list_pod()?), + HashSet::from([ ModelInfo { - name: Some(original_annotation.name.clone()), - version: Some(original_annotation.version.clone()), - hash: original_hash.clone(), + name: Some(annotation.name.clone()), + version: Some(annotation.version.clone()), + hash: pod.hash.clone(), }, ModelInfo { name: None, version: None, - hash: original_hash, + hash: pod.hash, }, ModelInfo { name: None, version: None, - hash: new_hash, + hash: pod_with_updated_command.hash, }, - ], + ]), "Pod list didn't return 3 expected entries." ); pretty_assert_eq!( store .load_pod(&ModelID::Annotation( - original_annotation.name.clone(), - original_annotation.version.clone() + annotation.name.clone(), + annotation.version.clone() ))? .annotation, - Some(original_annotation), + Some(annotation), "Pod annotation unexpected." ); Ok(())