diff --git a/Cargo.toml b/Cargo.toml
index fccdf985..0636a066 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -127,6 +127,7 @@ print_stdout = { level = "allow", priority = 127 } # stdout
 pub_use = { level = "allow", priority = 127 } # ok to structure source into many files but clean up import
 pub_with_shorthand = { level = "allow", priority = 127 } # allow use of pub(super)
 pub_without_shorthand = { level = "allow", priority = 127 } # allow use of pub(in super)
 question_mark_used = { level = "allow", priority = 127 } # allow question operator
+result_large_err = { level = "allow", priority = 127 } # allow large error types in Result
 self_named_module_files = { level = "allow", priority = 127 } # mod files ok
 semicolon_inside_block = { level = "allow", priority = 127 } # ok to keep inside block
diff --git a/src/core/crypto.rs b/src/core/crypto.rs
index b18e3435..58b58323 100644
--- a/src/core/crypto.rs
+++ b/src/core/crypto.rs
@@ -49,7 +49,7 @@ pub fn hash_buffer(buffer: impl AsRef<[u8]>) -> String {
 /// Will return error if unable to access file.
 pub fn hash_file(filepath: impl AsRef<Path>) -> Result<String> {
     hash_stream(
-        &mut File::open(&filepath).context(selector::InvalidFilepath {
+        &mut File::open(&filepath).context(selector::InvalidFileOrDirPath {
             path: filepath.as_ref(),
         })?,
     )
@@ -62,7 +62,10 @@ pub fn hash_file(filepath: impl AsRef<Path>) -> Result<String> {
 pub fn hash_dir(dirpath: impl AsRef<Path>) -> Result<String> {
     let summary: BTreeMap<String, String> = dirpath
         .as_ref()
-        .read_dir()?
+        .read_dir()
+        .context(selector::InvalidFileOrDirPath {
+            path: dirpath.as_ref(),
+        })?
         .map(|path| {
             let access_path = path?.path();
             Ok((
diff --git a/src/core/error.rs b/src/core/error.rs
index 0dd158e1..6bc98984 100644
--- a/src/core/error.rs
+++ b/src/core/error.rs
@@ -90,8 +90,10 @@ impl fmt::Debug for OrcaError {
     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
         match &self.kind {
             Kind::EmptyResponseWhenLoadingContainerAltImage { backtrace, .. }
+            | Kind::FailedToExtractRunInfo { backtrace, .. }
+            | Kind::FailedToStartPod { backtrace, .. }
             | Kind::GeneratedNamesOverflow { backtrace, .. }
-            | Kind::InvalidFilepath { backtrace, .. }
+            | Kind::InvalidFileOrDirPath { backtrace, .. }
             | Kind::InvalidPodResultTerminatedDatetime { backtrace, .. }
             | Kind::KeyMissing { backtrace, .. }
             | Kind::NoAnnotationFound { backtrace, .. }
diff --git a/src/core/orchestrator/docker.rs b/src/core/orchestrator/docker.rs
index 999f01a5..bd6eadde 100644
--- a/src/core/orchestrator/docker.rs
+++ b/src/core/orchestrator/docker.rs
@@ -7,8 +7,9 @@ use crate::{
     },
 };
 use bollard::{
-    container::{Config, CreateContainerOptions, ListContainersOptions},
+    container::{Config, CreateContainerOptions, ListContainersOptions, RemoveContainerOptions},
     models::{ContainerStateStatusEnum, HostConfig},
+    secret::{ContainerInspectResponse, ContainerSummary},
 };
 use chrono::DateTime;
 use futures_util::future::join_all;
@@ -34,6 +35,11 @@ pub static RE_IMAGE_TAG: LazyLock<Regex> = LazyLock::new(|| {
         .expect("Invalid image tag regex.")
 });
 
+#[expect(clippy::expect_used, reason = "Valid static regex")]
+static RE_FOR_CMD: LazyLock<Regex> = LazyLock::new(|| {
+    Regex::new(r#"[^\s"']+|"[^"]*"|'[^']*'"#).expect("Invalid command regex.")
+});
+
 impl LocalDockerOrchestrator {
     fn prepare_mount_binds(
         namespace_lookup: &HashMap<String, PathBuf>,
@@ -104,7 +110,11 @@ impl LocalDockerOrchestrator {
         - Pod commands will always have at least 1 element
     "#
     )]
-    pub(crate) fn prepare_container_start_inputs(
+    /// Prepare the inputs for starting a container.
+    ///
+    /// # Errors
+    /// Will fail if the pod job is invalid.
+    pub fn prepare_container_start_inputs(
         namespace_lookup: &HashMap<String, PathBuf>,
         pod_job: &PodJob,
         image: String,
@@ -130,11 +140,15 @@ impl LocalDockerOrchestrator {
             ),
             ("org.orcapod.pod_job.hash".to_owned(), pod_job.hash.clone()),
         ]);
-        let command = pod_job
-            .pod
-            .command
-            .split_whitespace()
-            .map(String::from)
+        let command = RE_FOR_CMD
+            .captures_iter(&pod_job.pod.command)
+            .map(|capture| {
+                capture
+                    .extract::<0>()
+                    .0
+                    .to_owned()
+                    .replace(['\'', '\"'], "")
+            })
             .collect::<Vec<String>>();
 
         Ok((
@@ -165,10 +179,7 @@ impl LocalDockerOrchestrator {
         ))
     }
     #[expect(
-        clippy::cast_sign_loss,
         clippy::string_slice,
-        clippy::cast_precision_loss,
-        clippy::cast_possible_truncation,
         clippy::indexing_slicing,
         reason = r#"
         - Timestamp and memory should always have a value > 0
@@ -181,7 +192,7 @@ impl LocalDockerOrchestrator {
     pub(crate) async fn list_containers(
         &self,
         filters: HashMap<String, Vec<String>>, // https://docs.rs/bollard/latest/bollard/container/struct.ListContainersOptions.html#structfield.filters
-    ) -> Result<impl Iterator<Item = (String, RunInfo)>> {
+    ) -> Result<impl Iterator<Item = Result<(String, RunInfo)>>> {
         Ok(join_all(
             self.api
                 .list_containers(Some(ListContainersOptions {
@@ -205,70 +216,151 @@ impl LocalDockerOrchestrator {
         )
         .await
         .into_iter()
-        .filter_map(|result: Result<_>| {
-            let (container_name, container_summary, container_spec) = result.ok()?;
-            let terminated_timestamp =
-                DateTime::parse_from_rfc3339(container_spec.state.as_ref()?.finished_at.as_ref()?)
-                    .ok()?
-                    .timestamp() as u64;
-            Some((
+        .map(|result: Result<_>| {
+            let (container_name, container_summary, container_inspect_response) = result?;
+
+            Ok(
+                Self::extract_run_info(&container_summary, &container_inspect_response)
+                    .map(|run_info| (container_name.clone(), run_info))
+                    .context(selector::FailedToExtractRunInfo { container_name })?,
+            )
+        }))
+    }
+    pub(crate) async fn delete_container(&self, container_name: &str) -> Result<()> {
+        self.api
+            .remove_container(
                 container_name,
-                RunInfo {
-                    image: container_spec.config.as_ref()?.image.as_ref()?.clone(),
-                    created: container_summary.created? as u64,
-                    terminated: (terminated_timestamp > 0).then_some(terminated_timestamp),
-                    env_vars: container_spec
-                        .config
-                        .as_ref()?
-                        .env
+                Some(RemoveContainerOptions {
+                    force: true,
+                    ..Default::default()
+                }),
+            )
+            .await?;
+        Ok(())
+    }
+
+    #[expect(
+        clippy::cast_sign_loss,
+        clippy::cast_precision_loss,
+        clippy::cast_possible_truncation,
+        reason = r#"
        - Timestamp and memory should always have a value > 0
+        - Container will always have a name with more than 1 character
+        - No issue in cpu core casting if between 0 - 3.40e38 (f32::MAX)
+        - No issue in exit code casting if between -3.27e4 (i16::MIN) - 3.27e4 (i16::MAX)
+        - Containers will always have at least 1 name with at least 2 characters
+        - This function requires a lot of boilerplate code to extract the run info
+        "#
+    )]
+    fn extract_run_info(
+        container_summary: &ContainerSummary,
+        container_inspect_response: &ContainerInspectResponse,
+    ) -> Option<RunInfo> {
+        let terminated_timestamp = DateTime::parse_from_rfc3339(
+            container_inspect_response
+                .state
+                .as_ref()?
+                .finished_at
+                .as_ref()?,
+        )
+        .ok()?
+        .timestamp() as u64;
+        Some(RunInfo {
+            image: container_inspect_response
+                .config
+                .as_ref()?
+                .image
+                .as_ref()?
+                .clone(),
+            created: container_summary.created? as u64,
+            terminated: (terminated_timestamp > 0).then_some(terminated_timestamp),
+            env_vars: container_inspect_response
+                .config
+                .as_ref()?
+                .env
+                .as_ref()?
+                .iter()
+                .filter_map(|x| {
+                    x.split_once('=')
+                        .map(|(key, value)| (key.to_owned(), value.to_owned()))
+                })
+                .collect(),
+            command: format!(
+                "{} {}",
+                container_inspect_response
+                    .config
+                    .as_ref()?
+                    .entrypoint
+                    .as_ref()?
+                    .join(" "),
+                container_inspect_response
+                    .config
+                    .as_ref()?
+                    .cmd
+                    .as_ref()?
+                    .join(" ")
+            ),
+            status: match (
+                container_inspect_response.state.as_ref()?.status?,
+                container_inspect_response.state.as_ref()?.exit_code? as i16,
+            ) {
+                (ContainerStateStatusEnum::RUNNING, _) => Status::Running,
+                (ContainerStateStatusEnum::EXITED, 0) => Status::Completed,
+                (ContainerStateStatusEnum::EXITED | ContainerStateStatusEnum::DEAD, code) => {
+                    Status::Failed(code)
+                }
+                (
+                    ContainerStateStatusEnum::CREATED | ContainerStateStatusEnum::RESTARTING,
+                    code,
+                ) => {
+                    if container_inspect_response
+                        .state
                         .as_ref()?
-                        .iter()
-                        .filter_map(|x| {
-                            x.split_once('=')
-                                .map(|(key, value)| (key.to_owned(), value.to_owned()))
-                        })
-                        .collect(),
-                    command: format!(
-                        "{} {}",
-                        container_spec
-                            .config
-                            .as_ref()?
-                            .entrypoint
-                            .as_ref()?
-                            .join(" "),
-                        container_spec.config.as_ref()?.cmd.as_ref()?.join(" ")
-                    ),
-                    status: match (
-                        container_spec.state.as_ref()?.status.as_ref()?,
-                        container_spec.state.as_ref()?.exit_code? as i16,
-                    ) {
-                        (ContainerStateStatusEnum::RUNNING, _) => Status::Running,
-                        (ContainerStateStatusEnum::EXITED, 0) => Status::Completed,
-                        (ContainerStateStatusEnum::EXITED, code) => Status::Failed(code),
-                        _ => todo!(),
-                    },
-                    mounts: container_spec
-                        .mounts
+                        .error
                         .as_ref()?
-                        .iter()
-                        .map(|mount_point| {
-                            Some(format!(
-                                "{}:{}{}",
-                                mount_point.source.as_ref()?,
-                                mount_point.destination.as_ref()?,
-                                mount_point
-                                    .mode
-                                    .as_ref()
-                                    .map_or_else(String::new, |mode| format!(":{mode}"))
-                            ))
-                        })
-                        .collect::<Option<Vec<_>>>()?,
-                    labels: container_spec.config.as_ref()?.labels.as_ref()?.clone(),
-                    cpu_limit: container_spec.host_config.as_ref()?.nano_cpus? as f32
-                        / 10_f32.powi(9), // ncpu, ucores=3, mcores=6, cores=9
-                    memory_limit: container_spec.host_config.as_ref()?.memory? as u64,
-                },
-            ))
-        }))
+                        .is_empty()
+                    {
+                        Status::Queued
+                    } else {
+                        Status::Failed(code)
+                    }
+                }
+                _ => Status::Unknown,
+            },
+            mounts: container_inspect_response
+                .mounts
+                .as_ref()?
+                .iter()
+                .map(|mount_point| {
+                    Some(format!(
+                        "{}:{}{}",
+                        mount_point.source.as_ref()?,
+                        mount_point.destination.as_ref()?,
+                        mount_point
+                            .mode
+                            .as_ref()
+                            .map_or_else(String::new, |mode| format!(":{mode}"))
+                    ))
+                })
+                .collect::<Option<Vec<_>>>()?,
+            labels: container_inspect_response
+                .config
+                .as_ref()?
+                .labels
+                .as_ref()?
+                .clone(),
+            cpu_limit: container_inspect_response.host_config.as_ref()?.nano_cpus? as f32
+                / 10_f32.powi(9), // ncpu, ucores=3, mcores=6, cores=9
+            memory_limit: container_inspect_response.host_config.as_ref()?.memory? as u64,
+        })
     }
 }
diff --git a/src/uniffi/error.rs b/src/uniffi/error.rs
index 3ef9daff..7462f0bc 100644
--- a/src/uniffi/error.rs
+++ b/src/uniffi/error.rs
@@ -29,10 +29,25 @@ pub(crate) enum Kind {
         path: PathBuf,
         backtrace: Option<Backtrace>,
     },
+    #[snafu(display(
+        "Failed to extract run info from container: {container_name}."
+ ))] + FailedToExtractRunInfo { + container_name: String, + backtrace: Option, + }, + #[snafu(display( + "Fail to start pod with container_name: {container_name} with error: {source}" + ))] + FailedToStartPod { + container_name: String, + source: BollardError, + backtrace: Option, + }, #[snafu(display("Out of generated random names."))] GeneratedNamesOverflow { backtrace: Option }, #[snafu(display("{source} ({path:?})."))] - InvalidFilepath { + InvalidFileOrDirPath { path: PathBuf, source: io::Error, backtrace: Option, @@ -122,4 +137,8 @@ impl OrcaError { pub const fn is_purged_pod_run(&self) -> bool { matches!(self.kind, Kind::NoMatchingPodRun { .. }) } + /// Returns `true` if the error was caused by an invalid file or directory path. + pub const fn is_failed_to_start_pod(&self) -> bool { + matches!(self.kind, Kind::FailedToStartPod { .. }) + } } diff --git a/src/uniffi/orchestrator/docker.rs b/src/uniffi/orchestrator/docker.rs index 62f2465e..9852c818 100644 --- a/src/uniffi/orchestrator/docker.rs +++ b/src/uniffi/orchestrator/docker.rs @@ -12,12 +12,13 @@ use crate::{ use async_trait; use bollard::{ Docker, - container::{RemoveContainerOptions, StartContainerOptions, WaitContainerOptions}, + container::{StartContainerOptions, WaitContainerOptions}, image::{CreateImageOptions, ImportImageOptions}, }; +use colored::Colorize as _; use derive_more::Display; use futures_util::stream::{StreamExt as _, TryStreamExt as _}; -use snafu::{OptionExt as _, futures::TryFutureExt as _}; +use snafu::{OptionExt as _, ResultExt as _, futures::TryFutureExt as _}; use std::{collections::HashMap, path::PathBuf, sync::Arc}; use tokio::fs::File; use tokio_util::{ @@ -89,7 +90,7 @@ impl Orchestrator for LocalDockerOrchestrator { let location = namespace_lookup[&image_info.namespace].join(&image_info.path); let byte_stream = FramedRead::new( File::open(&location) - .context(selector::InvalidFilepath { path: &location }) + .context(selector::InvalidFileOrDirPath { path: &location }) .await?, BytesCodec::new(), ) @@ -126,7 +127,11 @@ impl Orchestrator for LocalDockerOrchestrator { .await?; self.api .start_container(&assigned_name, None::>) - .await?; + .await + .context(selector::FailedToStartPod { + container_name: assigned_name.clone(), + })?; + Ok(PodRun::new::(pod_job, assigned_name)) } async fn start( @@ -155,7 +160,8 @@ impl Orchestrator for LocalDockerOrchestrator { vec!["org.orcapod=true".to_owned()], )])) .await? 
-        .map(|(assigned_name, run_info)| {
+        .map(|result| {
+            let (assigned_name, run_info) = result?;
             let pod_job: PodJob =
                 serde_json::from_str(get(&run_info.labels, "org.orcapod.pod_job")?)?;
             Ok(PodRun::new::(&pod_job, assigned_name))
         })
         .collect()
     }
     async fn delete(&self, pod_run: &PodRun) -> Result<()> {
-        self.api
-            .remove_container(
-                &pod_run.assigned_name,
-                Some(RemoveContainerOptions {
-                    force: true,
-                    ..Default::default()
-                }),
-            )
-            .await?;
-        Ok(())
+        self.delete_container(&pod_run.assigned_name).await
     }
     async fn get_info(&self, pod_run: &PodRun) -> Result<RunInfo> {
         let labels = vec![
             format!(
             ),
             format!("org.orcapod.pod_job.hash={}", pod_run.pod_job.hash),
         ];
+
+        // Add the container name to the filters
+        let container_filters = HashMap::from([
+            ("label".to_owned(), labels),
+            (
+                "name".to_owned(),
+                Vec::from([pod_run.assigned_name.clone()]),
+            ),
+        ]);
+
         let (_, run_info) = self
-            .list_containers(HashMap::from([("label".to_owned(), labels)]))
+            .list_containers(container_filters)
             .await?
             .next()
             .context(selector::NoMatchingPodRun {
                 pod_job_hash: pod_run.pod_job.hash.clone(),
-            })?;
+            })??;
         Ok(run_info)
     }
     async fn get_result(&self, pod_run: &PodRun) -> Result<PodResult> {
-        self.api
+        match self
+            .api
             .wait_container(&pod_run.assigned_name, None::<WaitContainerOptions<String>>)
             .try_collect::<Vec<_>>()
-            .await?;
+            .await
+        {
+            Ok(_) => {}
+            Err(error) => println!(
+                "{}{}",
+                "Warning: ".bright_yellow(),
+                error.to_string().bright_cyan()
+            ),
+        }
         let result_info = self.get_info(pod_run).await?;
+
         PodResult::new(
             None,
             Arc::clone(&pod_run.pod_job),
diff --git a/src/uniffi/orchestrator/mod.rs b/src/uniffi/orchestrator/mod.rs
index 12ffad88..7461d9cb 100644
--- a/src/uniffi/orchestrator/mod.rs
+++ b/src/uniffi/orchestrator/mod.rs
@@ -17,12 +17,16 @@ pub enum ImageKind {
 /// Status of a particular compute run.
 #[derive(uniffi::Enum, Serialize, Deserialize, Debug, PartialEq, Eq, Clone, Default)]
 pub enum Status {
+    /// Container has been created and is pending execution.
+    Queued,
     /// Run is ongoing.
     Running,
     /// Run has completed successfully.
     Completed,
     /// Run failed with the provided error code.
     Failed(i16),
+    /// Catch-all for undefined behavior.
+    Unknown,
     /// No status set.
     #[default]
     Unset,
@@ -52,7 +56,7 @@ pub struct RunInfo {
     pub memory_limit: u64,
 }
 /// Current computation managed by orchestrator.
-#[derive(uniffi::Record, Debug)]
+#[derive(uniffi::Record, Debug, PartialEq)]
 pub struct PodRun {
     /// Original compute request.
     pub pod_job: Arc<PodJob>,
diff --git a/tests/crypto.rs b/tests/crypto.rs
index cbb1f54f..3103a875 100644
--- a/tests/crypto.rs
+++ b/tests/crypto.rs
@@ -1,10 +1,21 @@
-#![expect(missing_docs, clippy::panic_in_result_fn, reason = "OK in tests.")]
+#![expect(
+    missing_docs,
+    clippy::panic_in_result_fn,
+    clippy::indexing_slicing,
+    clippy::panic,
+    reason = "OK in tests."
+)]
+pub mod fixture;
+use fixture::pod_style;
 use orcapod::{
     core::crypto::{hash_buffer, hash_dir, hash_file},
-    uniffi::error::Result,
+    uniffi::{
+        error::Result,
+        model::{Annotation, Blob, BlobKind, PathSet, PodJob, URI},
+    },
 };
-use std::fs::read;
+use std::{collections::HashMap, fs::read, path::PathBuf};
 
 #[test]
 fn consistent_hash() -> Result<()> {
@@ -27,3 +38,52 @@ fn complex_hash() -> Result<()> {
     );
     Ok(())
 }
+
+#[test]
+fn nested_dir_hash() -> Result<()> {
+    let namespace_lookup = HashMap::from([("default".to_owned(), PathBuf::from("./tests"))]);
+
+    let pod_job = PodJob::new(
+        Some(Annotation {
+            name: "style-transfer".to_owned(),
+            description: "This is an example pod job.".to_owned(),
+            version: "0.1.0".to_owned(),
+        }),
+        pod_style()?.into(),
+        HashMap::from([(
+            "nested_dir".to_owned(),
+            PathSet::Unary(Blob {
+                kind: BlobKind::Directory,
+                location: URI {
+                    namespace: "default".to_owned(),
+                    path: "extra".into(),
+                },
+                checksum: String::new(),
+            }),
+        )]),
+        URI {
+            namespace: "default".to_owned(),
+            path: PathBuf::from("output"),
+        },
+        0.5,         // 500 millicores as frac cores
+        2_u64 << 30, // 2GiB in bytes, KiB=<<10, MiB=<<20, GiB=<<30
+        Some(HashMap::from([
+            ("ZZZ".to_owned(), "PLEASE".to_owned()),
+            ("AAA".to_owned(), "SORT".to_owned()),
+        ])),
+        &namespace_lookup,
+    )?;
+
+    match &pod_job.input_packet["nested_dir"] {
+        PathSet::Unary(blob) => {
+            assert_eq!(
+                blob.checksum,
+                hash_dir("./tests/extra")?,
+                "Checksum didn't match."
+            );
+        }
+        PathSet::Collection(_) => panic!("Expected a Unary input."),
+    }
+
+    Ok(())
+}
diff --git a/tests/orchestrator.rs b/tests/orchestrator.rs
index be55a902..ca0c1ba2 100644
--- a/tests/orchestrator.rs
+++ b/tests/orchestrator.rs
@@ -1,13 +1,25 @@
-#![expect(missing_docs, clippy::panic_in_result_fn, reason = "OK in tests.")]
+#![expect(
+    missing_docs,
+    clippy::panic_in_result_fn,
+    clippy::panic,
+    clippy::unwrap_used,
+    clippy::indexing_slicing,
+    reason = "OK in tests."
+)]
 pub mod fixture;
+use bollard::image::CreateImageOptions;
 use fixture::{TestContainerImage, TestDirs, container_image_style, pod_job_style};
+use futures_util::StreamExt as _;
 use orcapod::uniffi::{
     error::Result,
     model::URI,
     orchestrator::{ImageKind, Orchestrator as _, PodRun, Status, docker::LocalDockerOrchestrator},
 };
-use std::{collections::HashMap, ops::Deref as _, path::PathBuf, sync::Arc};
+use std::{
+    collections::HashMap, ops::Deref as _, path::PathBuf, sync::Arc, thread::sleep, time::Duration,
+};
+use tokio::runtime::Runtime;
 
 fn basic_test<T>(start: T) -> Result<()>
 where
@@ -32,6 +44,7 @@ where
         orchestrator
             .list_blocking()?
             .iter()
+            .filter(|run| **run == pod_run)
             .map(|run| Ok(orchestrator.get_info_blocking(run)?.command))
             .collect::<Result<Vec<_>>>()?,
         vec![expected_command.clone()],
@@ -48,6 +61,7 @@ where
         orchestrator
             .list_blocking()?
             .iter()
+            .filter(|run| **run == pod_run)
             .map(|run| Ok(orchestrator.get_info_blocking(run)?.command))
             .collect::<Result<Vec<_>>>()?,
         vec![expected_command],
@@ -63,7 +77,7 @@ where
     // test delete
     orchestrator.delete_blocking(&pod_run)?;
     assert!(
-        orchestrator.list_blocking()?.is_empty(),
+        !orchestrator.list_blocking()?.contains(&pod_run),
         "Unexpected container remains."
     );
     // try getting info of a purged pod run
@@ -88,7 +102,7 @@ fn offline_container_image_basic() -> Result<()> {
             namespace_lookup["default"].join(container_image_relative_location),
         )?;
         let mut pod_job = pod_job_style(namespace_lookup)?;
-        pod_job.env_vars = Some(HashMap::from([("DELAY".to_owned(), "5".to_owned())]));
+        pod_job.env_vars = Some(HashMap::from([("DELAY".to_owned(), "1".to_owned())]));
         Ok((
             orchestrator.start_with_altimage_blocking(
                 namespace_lookup,
@@ -118,3 +132,208 @@ fn remote_container_image_basic() -> Result<()> {
         ))
     })
 }
+
+fn execute_wrapper<T>(test_fn: T) -> Result<()>
+where
+    T: Fn(&HashMap<String, PathBuf>, &LocalDockerOrchestrator) -> Result<()>,
+{
+    let test_dirs = TestDirs::new(&HashMap::from([(
+        "default".to_owned(),
+        Some("./tests/extra/data/"),
+    )]))?;
+    let namespace_lookup = test_dirs.namespace_lookup();
+    let orchestrator = LocalDockerOrchestrator::new()?;
+
+    test_fn(&namespace_lookup, &orchestrator)
+}
+
+#[test]
+fn command_parse() -> Result<()> {
+    execute_wrapper(|namespace_lookup, orchestrator| {
+        let mut pod_job = pod_job_style(namespace_lookup)?;
+
+        let mut pod = pod_job.pod.deref().clone();
+        pod.image = "alpine:3.14".to_owned();
+        pod.command = r#"sh -c "echo hi1 && echo 'hi2'"""#.to_owned();
+        pod.input_spec = HashMap::new();
+        pod_job.pod = pod.into();
+
+        pod_job.input_packet = HashMap::new();
+
+        let pod_run = orchestrator.start_blocking(namespace_lookup, &pod_job)?;
+        sleep(Duration::from_secs(1));
+        let pod_result = orchestrator.get_result_blocking(&pod_run)?;
+
+        assert_eq!(
+            pod_result.status,
+            Status::Completed,
+            "Pod status is not completed"
+        );
+
+        orchestrator.delete_blocking(&pod_run)?;
+
+        assert!(
+            !orchestrator.list_blocking()?.contains(&pod_run),
+            "Unexpected container remains."
+        );
+
+        Ok(())
+    })
+}
+
+#[test]
+/// Expect the pod to fail due to a bad command: start should return an error while the container
+/// remains behind in a failed state for inspection and cleanup.
+fn fail_at_start() -> Result<()> {
+    execute_wrapper(|namespace_lookup, orchestrator| {
+        let mut pod_job = pod_job_style(namespace_lookup)?;
+        let mut pod = pod_job.pod.deref().clone();
+        pod.image = "alpine:3.14".to_owned();
+        pod.command = "python file_does_not_exist.py".to_owned();
+        pod_job.pod = pod.into();
+
+        let container_name = match orchestrator.start_blocking(namespace_lookup, &pod_job) {
+            Ok(_) => panic!("Pod was launched successfully when it should have failed."),
+            Err(err) => {
+                assert!(err.is_failed_to_start_pod());
+                // The 7th whitespace-delimited token of the FailedToStartPod message is the
+                // container name.
+                err.to_string().split(' ').collect::<Vec<_>>()[6].to_owned()
+            }
+        };
+
+        // The failed container should still exist after the failed start
+        let list_result = orchestrator
+            .list_blocking()?
+            .into_iter()
+            .filter(|pod_run| pod_run.assigned_name == *container_name)
+            .collect::<Vec<_>>();
+
+        assert_eq!(
+            list_result.len(),
+            1,
+            "List didn't return just the failed pod."
+        );
+
+        let pod_run = list_result.first().unwrap();
+
+        // Get the pod result and make sure it is in failed state
+        let pod_result = orchestrator.get_result_blocking(pod_run)?;
+
+        assert_eq!(
+            pod_result.status,
+            Status::Failed(127),
+            "Pod status is not failed"
+        );
+
+        // Clean up the pod
+        orchestrator.delete_blocking(pod_run)?;
+
+        assert!(
+            !orchestrator.list_blocking()?.contains(pod_run),
+            "Unexpected container remains."
+ ); + + Ok(()) + }) +} + +#[test] +fn fail_during_execution() -> Result<()> { + execute_wrapper(|namespace_lookup, orchestrator| { + let mut pod_job = pod_job_style(namespace_lookup)?; + let mut pod = pod_job.pod.deref().clone(); + + pod.image = "alpine:3.14".to_owned(); + pod.command = "python file_does_not_exist.py".to_owned(); + + pod.image = "alpine:3.14".to_owned(); + pod.command = r#"bin/sh -c 'echo "hi" && bad_command'"#.to_owned(); + pod.input_spec = HashMap::new(); + pod_job.pod = pod.into(); + pod_job.input_packet = HashMap::new(); + + // Start job and wait for completion + let pod_run = orchestrator.start_blocking(namespace_lookup, &pod_job)?; + sleep(Duration::from_secs(1)); + let pod_result = orchestrator.get_result_blocking(&pod_run)?; + + assert_eq!( + pod_result.status, + Status::Failed(127), + "Should be in failed state" + ); + + // Clean up the pod + orchestrator.delete_blocking(&pod_run)?; + + assert!( + !orchestrator.list_blocking()?.contains(&pod_run), + "Unexpected container remains." + ); + + Ok(()) + }) +} + +#[test] +fn test_queued_status_container() -> Result<()> { + execute_wrapper(|namespace_lookup, orchestrator| { + let mut pod_job = pod_job_style(namespace_lookup)?; + let mut pod = pod_job.pod.deref().clone(); + + pod.image = "alpine:3.14".to_owned(); + pod.command = "python file_does_not_exist.py".to_owned(); + pod.input_spec = HashMap::new(); + pod_job.pod = pod.into(); + pod_job.input_packet = HashMap::new(); + + let runtime = Runtime::new()?; + + let image_options = Some(CreateImageOptions { + from_image: pod_job.pod.image.clone(), + ..Default::default() + }); + runtime.block_on( + orchestrator + .api + .create_image(image_options, None, None) + .collect::>(), + ); + + // Start job and wait for completion + let (container_name, options, config) = + LocalDockerOrchestrator::prepare_container_start_inputs( + namespace_lookup, + &pod_job, + pod_job.pod.image.clone(), + )?; + + runtime.block_on(orchestrator.api.create_container(options, config))?; + + // List all containers and check if the queued_container is in the list + let pod_runs = orchestrator + .list_blocking()? + .into_iter() + .filter(|pod_run| pod_run.assigned_name == container_name) + .collect::>(); + + assert!( + pod_runs.len() == 1, + "List didn't return just the queued pod." + ); + + let pod_run = pod_runs.first().unwrap(); + + // Check that the status is queued + assert!( + orchestrator.get_info_blocking(pod_run)?.status == Status::Queued, + "Status is not queued" + ); + + // Clean up container + orchestrator.delete_blocking(pod_run)?; + assert!( + !orchestrator.list_blocking()?.contains(pod_run), + "Unexpected container remains." + ); + + Ok(()) + }) +}