From 20bea90d0c868a830afa6209ab34e7688579db0b Mon Sep 17 00:00:00 2001 From: Sitao Wang Date: Thu, 7 May 2026 16:34:59 -0400 Subject: [PATCH 01/10] Add ValidatedJobSubmission --- components/spider-core/src/job.rs | 148 ++++++++++++++++++ components/spider-core/src/task.rs | 3 + components/spider-core/src/types/io.rs | 2 +- components/spider-storage/src/cache/error.rs | 3 - components/spider-storage/src/cache/job.rs | 22 +-- components/spider-storage/src/cache/task.rs | 80 +++++----- components/spider-storage/src/db/mariadb.rs | 10 +- components/spider-storage/src/db/protocol.rs | 17 +- .../spider-storage/src/state/job_cache.rs | 14 +- .../spider-storage/src/task_instance_pool.rs | 13 +- components/spider-storage/tests/jcb_test.rs | 32 ++-- .../spider-storage/tests/mariadb_test.rs | 88 ++++++++--- .../spider-storage/tests/scheduling_infra.rs | 31 ++-- 13 files changed, 323 insertions(+), 140 deletions(-) diff --git a/components/spider-core/src/job.rs b/components/spider-core/src/job.rs index 897809df..3277f0a7 100644 --- a/components/spider-core/src/job.rs +++ b/components/spider-core/src/job.rs @@ -1,5 +1,153 @@ use spider_derive::MySqlEnum; +use crate::{ + task::{Error, TaskGraph}, + types::io::TaskInput, +}; + +/// A validated wrapper around a task graph and its corresponding job inputs. +/// +/// This type guarantees at construction time that: +/// +/// * The task graph contains at least one task. +/// * The number of job inputs matches the number of graph inputs expected by the task graph. +/// +/// By passing this type through the call chain, downstream consumers can trust the consistency +/// invariant without re-validating. +#[derive(Debug)] +pub struct ValidatedJobSubmission { + task_graph: TaskGraph, + inputs: Vec, +} + +impl ValidatedJobSubmission { + /// Creates a new validated job submission. + /// + /// # Errors + /// + /// Returns an error if: + /// + /// * [`Error::InvalidJobSubmission`] if the task graph contains no tasks. + /// * [`Error::InvalidJobSubmission`] if the number of inputs does not match the number of graph + /// inputs. + pub fn new(task_graph: TaskGraph, inputs: Vec) -> Result { + let num_tasks = task_graph.get_num_tasks(); + if num_tasks == 0 { + return Err(Error::InvalidJobSubmission( + "task graph must contain at least one task".to_owned(), + )); + } + let expected_inputs = task_graph.get_task_graph_input_indices().len(); + let actual_inputs = inputs.len(); + if expected_inputs != actual_inputs { + return Err(Error::InvalidJobSubmission(format!( + "expected {expected_inputs} graph inputs, got {actual_inputs}" + ))); + } + Ok(Self { task_graph, inputs }) + } + + /// # Returns + /// + /// A reference to the validated task graph. + #[must_use] + pub const fn task_graph(&self) -> &TaskGraph { + &self.task_graph + } + + /// # Returns + /// + /// A reference to the validated job inputs. + #[must_use] + pub fn inputs(&self) -> &[TaskInput] { + &self.inputs + } + + /// Consumes the wrapper and returns the owned task graph and job inputs. + /// + /// # Returns + /// + /// A tuple of `(task_graph, inputs)`. + #[must_use] + pub fn into_parts(self) -> (TaskGraph, Vec) { + (self.task_graph, self.inputs) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::task::{ + DataTypeDescriptor, + ExecutionPolicy, + TaskDescriptor, + TaskGraph as SubmittedTaskGraph, + TdlContext, + ValueTypeDescriptor, + }; + + fn create_single_input_task_graph() -> SubmittedTaskGraph { + let bytes_type = DataTypeDescriptor::Value(ValueTypeDescriptor::bytes()); + let mut graph = + SubmittedTaskGraph::new(None, None).expect("task graph creation should succeed"); + graph + .insert_task(TaskDescriptor { + tdl_context: TdlContext { + package: "test_pkg".to_owned(), + task_func: "test_fn".to_owned(), + }, + execution_policy: Some(ExecutionPolicy::default()), + inputs: vec![bytes_type], + outputs: vec![], + input_sources: None, + }) + .expect("task insertion should succeed"); + graph + } + + #[test] + fn valid_job_submission_succeeds() { + let graph = create_single_input_task_graph(); + let inputs = vec![TaskInput::ValuePayload(vec![1u8; 4])]; + let result = ValidatedJobSubmission::new(graph, inputs); + assert!(result.is_ok(), "valid submission should succeed"); + } + + #[test] + fn empty_task_graph_fails() { + let graph = + SubmittedTaskGraph::new(None, None).expect("task graph creation should succeed"); + let inputs = vec![]; + let result = ValidatedJobSubmission::new(graph, inputs); + assert!( + matches!(result, Err(Error::InvalidJobSubmission(_))), + "empty task graph should return InvalidJobSubmission" + ); + } + + #[test] + fn mismatched_input_count_fails() { + let graph = create_single_input_task_graph(); + let inputs = vec![]; + let result = ValidatedJobSubmission::new(graph, inputs); + assert!( + matches!(result, Err(Error::InvalidJobSubmission(_))), + "mismatched input count should return InvalidJobSubmission" + ); + } + + #[test] + fn into_parts_returns_owned_components() { + let graph = create_single_input_task_graph(); + let inputs = vec![TaskInput::ValuePayload(vec![1u8; 4])]; + let submission = + ValidatedJobSubmission::new(graph, inputs).expect("submission should be valid"); + let (graph, inputs) = submission.into_parts(); + assert_eq!(graph.get_num_tasks(), 1, "task graph should have 1 task"); + assert_eq!(inputs.len(), 1, "should have 1 input"); + } +} + /// Represents a job in the Spider scheduling framework. pub struct Job {} diff --git a/components/spider-core/src/task.rs b/components/spider-core/src/task.rs index ac6ee329..a1b3c0ce 100644 --- a/components/spider-core/src/task.rs +++ b/components/spider-core/src/task.rs @@ -27,6 +27,9 @@ pub enum Error { #[error("invalid timeout policy: {0}")] InvalidTimeoutPolicy(String), + + #[error("invalid job submission: {0}")] + InvalidJobSubmission(String), } /// Enum for all possible states of a task. diff --git a/components/spider-core/src/types/io.rs b/components/spider-core/src/types/io.rs index 0186fb46..00c06057 100644 --- a/components/spider-core/src/types/io.rs +++ b/components/spider-core/src/types/io.rs @@ -6,7 +6,7 @@ use crate::{ }; /// Represents an input of a task. -#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub enum TaskInput { ValuePayload(Vec), } diff --git a/components/spider-storage/src/cache/error.rs b/components/spider-storage/src/cache/error.rs index aa1a24bd..7226bfd6 100644 --- a/components/spider-storage/src/cache/error.rs +++ b/components/spider-storage/src/cache/error.rs @@ -41,9 +41,6 @@ pub enum InternalError { #[error("task graph corrupted: {0}")] TaskGraphCorrupted(String), - #[error("task graph input size mismatch: expected {0}, got {1}")] - TaskGraphInputsSizeMismatch(usize, usize), - #[error("job not started")] JobNotStarted, diff --git a/components/spider-storage/src/cache/job.rs b/components/spider-storage/src/cache/job.rs index d2b8e20c..cc89bb99 100644 --- a/components/spider-storage/src/cache/job.rs +++ b/components/spider-storage/src/cache/job.rs @@ -7,11 +7,11 @@ use std::{ }; use spider_core::{ - job::JobState, - task::{TaskGraph as SubmittedTaskGraph, TaskIndex, TaskState}, + job::{JobState, ValidatedJobSubmission}, + task::{TaskIndex, TaskState}, types::{ id::{ExecutionManagerId, JobId, ResourceGroupId, TaskInstanceId}, - io::{ExecutionContext, TaskInput, TaskOutput}, + io::{ExecutionContext, TaskOutput}, }, }; use tokio::sync::{RwLockReadGuard, RwLockWriteGuard}; @@ -63,27 +63,17 @@ impl< /// /// Returns an error if: /// - /// * [`InternalError::TaskGraphCorrupted`] if the given task graph doesn't contain any tasks. - /// The current version of JCB requires the job contains at least one task. /// * Forwards [`TaskGraph::create`]'s return values on failure. pub async fn create( id: JobId, owner_id: ResourceGroupId, - submitted_task_graph: &SubmittedTaskGraph, - inputs: Vec, + job_submission: ValidatedJobSubmission, ready_queue_sender: ReadyQueueSenderType, db_connector: DbConnectorType, task_instance_pool_connector: TaskInstancePoolConnectorType, ) -> Result { - let num_tasks = submitted_task_graph.get_num_tasks(); - if 0 == num_tasks { - return Err(InternalError::TaskGraphCorrupted( - "task graph with no task is unsupported".to_owned(), - ) - .into()); - } - - let task_graph = TaskGraph::create(submitted_task_graph, inputs).await?; + let num_tasks = job_submission.task_graph().get_num_tasks(); + let task_graph = TaskGraph::create(&job_submission).await?; let job_execution_state = JobExecutionState { state: JobState::Ready, task_graph, diff --git a/components/spider-storage/src/cache/task.rs b/components/spider-storage/src/cache/task.rs index 63ae1a9b..ffd258f8 100644 --- a/components/spider-storage/src/cache/task.rs +++ b/components/spider-storage/src/cache/task.rs @@ -5,15 +5,8 @@ use std::{ }; use spider_core::{ - task::{ - Task, - TaskGraph as SubmittedTaskGraph, - TaskIndex, - TaskState, - TdlContext, - TerminationTaskDescriptor, - TimeoutPolicy, - }, + job::ValidatedJobSubmission, + task::{Task, TaskIndex, TaskState, TdlContext, TerminationTaskDescriptor, TimeoutPolicy}, types::{ id::TaskInstanceId, io::{ExecutionContext, TaskInput, TaskOutput}, @@ -38,7 +31,7 @@ pub struct TaskGraph { impl TaskGraph { /// Factory function. /// - /// Creates a new task graph from a submitted task graph and the input task inputs. + /// Creates a new task graph from a validated job submission. /// /// # Returns /// @@ -49,31 +42,25 @@ impl TaskGraph { /// Returns an error if: /// /// * [`InternalError::TaskGraphCorrupted`] if: - /// * Any dataflow deps' index is out-of-range. + /// * Any dataflow deps’ index is out-of-range. /// * Any task index is out-of-range. - /// * [`InternalError::TaskGraphInputsSizeMismatch`] if the number of provided inputs does not - /// match the task graph’s expected number of inputs. - /// * Forwards [`SharedTaskControlBlock::create`]'s return values on failure. + /// * Forwards [`SharedTaskControlBlock::create`]’s return values on failure. /// /// # Panics /// /// Panics if the internal TCB buffer is corrupted. - pub async fn create( - submitted_task_graph: &SubmittedTaskGraph, - inputs: Vec, - ) -> Result { + pub async fn create(job_submission: &ValidatedJobSubmission) -> Result { + let submitted_task_graph = job_submission.task_graph(); + let inputs = job_submission.inputs(); let dataflow_dep_buffer: Vec> = (0..submitted_task_graph .get_num_dataflow_deps()) .map(|_| SharedRw::new(RwLock::new(ValuePayload::default()))) .collect(); let task_graph_input_indices = submitted_task_graph.get_task_graph_input_indices(); - if inputs.len() != task_graph_input_indices.len() { - return Err(InternalError::TaskGraphInputsSizeMismatch( - task_graph_input_indices.len(), - inputs.len(), - )); - } - for (deps_index, input) in task_graph_input_indices.into_iter().zip(inputs) { + for (deps_index, input) in task_graph_input_indices + .into_iter() + .zip(inputs.iter().cloned()) + { let dataflow_dep = dataflow_dep_buffer.get(deps_index).ok_or_else(|| { InternalError::TaskGraphCorrupted( "dataflow dependency index out-of-range".to_owned(), @@ -939,13 +926,16 @@ mod tests { }, }; - use spider_core::task::{ - DataTypeDescriptor, - ExecutionPolicy, - TaskDescriptor, - TaskGraph as SubmittedTaskGraph, - TerminationTaskDescriptor, - ValueTypeDescriptor, + use spider_core::{ + job::ValidatedJobSubmission, + task::{ + DataTypeDescriptor, + ExecutionPolicy, + TaskDescriptor, + TaskGraph as SubmittedTaskGraph, + TerminationTaskDescriptor, + ValueTypeDescriptor, + }, }; use super::*; @@ -1092,7 +1082,9 @@ mod tests { let inputs: Vec = (0..num_inputs) .map(|_| TaskInput::ValuePayload(vec![0u8; 4])) .collect(); - TaskGraph::create(&submitted, inputs) + let job_submission = + ValidatedJobSubmission::new(submitted, inputs).expect("job submission should be valid"); + TaskGraph::create(&job_submission) .await .expect("cache task graph creation should succeed") } @@ -1107,7 +1099,7 @@ mod tests { max_num_instances: u32, max_num_retry: u32, ) -> SharedTerminationTaskControlBlock { - let submitted = SubmittedTaskGraph::new( + let mut submitted = SubmittedTaskGraph::new( Some(TerminationTaskDescriptor { tdl_context: TdlContext { package: "test_pkg".to_owned(), @@ -1122,7 +1114,21 @@ mod tests { None, ) .expect("task graph with commit task should be created"); - let task_graph = TaskGraph::create(&submitted, vec![]) + submitted + .insert_task(TaskDescriptor { + tdl_context: TdlContext { + package: "test_pkg".to_owned(), + task_func: "dummy_fn".to_owned(), + }, + execution_policy: Some(ExecutionPolicy::default()), + inputs: vec![], + outputs: vec![], + input_sources: None, + }) + .expect("task insertion should succeed"); + let job_submission = + ValidatedJobSubmission::new(submitted, vec![]).expect("job submission should be valid"); + let task_graph = TaskGraph::create(&job_submission) .await .expect("cache task graph creation should succeed"); task_graph @@ -1237,7 +1243,9 @@ mod tests { TaskInput::ValuePayload(input_a), TaskInput::ValuePayload(input_b), ]; - TaskGraph::create(&submitted, inputs) + let job_submission = + ValidatedJobSubmission::new(submitted, inputs).expect("job submission should be valid"); + TaskGraph::create(&job_submission) .await .expect("cache task graph creation should succeed") } diff --git a/components/spider-storage/src/db/mariadb.rs b/components/spider-storage/src/db/mariadb.rs index 1005e9c1..902849a8 100644 --- a/components/spider-storage/src/db/mariadb.rs +++ b/components/spider-storage/src/db/mariadb.rs @@ -4,11 +4,10 @@ use async_trait::async_trait; use const_format::formatcp; use secrecy::ExposeSecret; use spider_core::{ - job::JobState, - task::TaskGraph, + job::{JobState, ValidatedJobSubmission}, types::{ id::{ExecutionManagerId, JobId, ResourceGroupId, SessionId}, - io::{TaskInput, TaskOutput}, + io::TaskOutput, }, }; use spider_derive::MySqlEnum; @@ -98,8 +97,7 @@ impl ExternalJobOrchestration for MariaDbStorageConnector { async fn register( &self, resource_group_id: ResourceGroupId, - task_graph: &TaskGraph, - job_inputs: &[TaskInput], + job_submission: &ValidatedJobSubmission, ) -> Result { const INSERT_QUERY: &str = formatcp!( "INSERT INTO `{table}` (`resource_group_id`, `serialized_task_graph`, \ @@ -107,6 +105,8 @@ impl ExternalJobOrchestration for MariaDbStorageConnector { table = JOBS_TABLE_NAME, ); + let task_graph = job_submission.task_graph(); + let job_inputs = job_submission.inputs(); let serialized_task_graph = task_graph .to_json() .map_err(|e| DbError::TaskGraphSerializationFailure(Box::new(e)))?; diff --git a/components/spider-storage/src/db/protocol.rs b/components/spider-storage/src/db/protocol.rs index 89ba3d9c..56231c26 100644 --- a/components/spider-storage/src/db/protocol.rs +++ b/components/spider-storage/src/db/protocol.rs @@ -2,11 +2,10 @@ use std::net::IpAddr; use async_trait::async_trait; use spider_core::{ - job::JobState, - task::TaskGraph, + job::{JobState, ValidatedJobSubmission}, types::{ id::{ExecutionManagerId, JobId, ResourceGroupId, SessionId}, - io::{TaskInput, TaskOutput}, + io::TaskOutput, }, }; @@ -36,8 +35,7 @@ pub trait ExternalJobOrchestration { /// # Parameters /// /// * `resource_group_id` - The owner of the created job. - /// * `task_graph` - The task graph representing the job's tasks and their dependencies. - /// * `job_inputs` - A slice of job inputs required for the job. + /// * `job_submission` - The validated job submission containing the task graph and job inputs. /// /// # Returns /// @@ -51,17 +49,10 @@ pub trait ExternalJobOrchestration { /// * [`DbError::TaskGraphSerializationFailure`] if the `task_graph` serialization fails. /// * [`DbError::ValueSerializationFailure`] if the `job_inputs` serialization fails. /// * Forwards [`sqlx::error::Error`] on DB operation failure. - /// - /// # Note - /// - /// This function assumes that the `task_graph` and `job_inputs` are consistent. - /// - /// TODO: Fix this when #284 is addressed. async fn register( &self, resource_group_id: ResourceGroupId, - task_graph: &TaskGraph, - job_inputs: &[TaskInput], + job_submission: &ValidatedJobSubmission, ) -> Result; /// Gets the state of a job. diff --git a/components/spider-storage/src/state/job_cache.rs b/components/spider-storage/src/state/job_cache.rs index 40bb64c8..3a14fe3b 100644 --- a/components/spider-storage/src/state/job_cache.rs +++ b/components/spider-storage/src/state/job_cache.rs @@ -131,7 +131,7 @@ mod tests { use std::sync::Arc; use spider_core::{ - job::JobState, + job::{JobState, ValidatedJobSubmission}, task::{ DataTypeDescriptor, ExecutionPolicy, @@ -276,11 +276,13 @@ mod tests { }) .expect("task insertion should succeed"); + let job_submission = + ValidatedJobSubmission::new(submitted, vec![TaskInput::ValuePayload(vec![0u8; 4])]) + .expect("job submission should be valid"); SharedJobControlBlock::create( job_id, spider_core::types::id::ResourceGroupId::new(), - &submitted, - vec![TaskInput::ValuePayload(vec![0u8; 4])], + job_submission, MockReadyQueueSender, MockDbConnector, MockTaskInstancePoolConnector, @@ -454,11 +456,13 @@ mod tests { .expect("task insertion should succeed"); let job_id = JobId::new(); + let job_submission = + ValidatedJobSubmission::new(submitted, vec![TaskInput::ValuePayload(vec![0u8; 4])]) + .expect("job submission should be valid"); let jcb = SharedJobControlBlock::create( job_id, spider_core::types::id::ResourceGroupId::new(), - &submitted, - vec![TaskInput::ValuePayload(vec![0u8; 4])], + job_submission, sender, MockDbConnector, MockTaskInstancePoolConnector, diff --git a/components/spider-storage/src/task_instance_pool.rs b/components/spider-storage/src/task_instance_pool.rs index b5779ff0..7de68f5a 100644 --- a/components/spider-storage/src/task_instance_pool.rs +++ b/components/spider-storage/src/task_instance_pool.rs @@ -524,6 +524,7 @@ mod tests { use async_trait::async_trait; use spider_core::{ + job::ValidatedJobSubmission, task::{ DataTypeDescriptor, ExecutionPolicy, @@ -661,12 +662,12 @@ mod tests { input_sources: None, }) .expect("task insertion should succeed"); - let task_graph = crate::cache::task::TaskGraph::create( - &submitted, - vec![TaskInput::ValuePayload(vec![0u8; 4])], - ) - .await - .expect("cache task graph creation should succeed"); + let job_submission = + ValidatedJobSubmission::new(submitted, vec![TaskInput::ValuePayload(vec![0u8; 4])]) + .expect("job submission should be valid"); + let task_graph = crate::cache::task::TaskGraph::create(&job_submission) + .await + .expect("cache task graph creation should succeed"); task_graph .get_task_control_block(0) .expect("task control block should exist") diff --git a/components/spider-storage/tests/jcb_test.rs b/components/spider-storage/tests/jcb_test.rs index 314f574a..802e228a 100644 --- a/components/spider-storage/tests/jcb_test.rs +++ b/components/spider-storage/tests/jcb_test.rs @@ -1,4 +1,4 @@ -use spider_core::job::JobState; +use spider_core::job::{JobState, ValidatedJobSubmission}; use spider_storage::db::{ExternalJobOrchestration, InternalJobOrchestration}; use super::{ @@ -47,9 +47,10 @@ async fn test_flat_success( ) -> WorkloadResult { let (graph, inputs) = build_flat_task_graph(10_000, 1024, true, true); let num_tasks = graph.get_num_tasks(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let result = run_workload( - &graph, - inputs, + job_submission, db_connector_factory, CancelPolicy::Never, default_output_handler(1024), @@ -89,9 +90,10 @@ async fn test_flat_cancel( db_connector_factory: impl DbConnectorFactory, ) -> WorkloadResult { let (graph, inputs) = build_flat_task_graph(10_000, 1024, true, true); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let result = run_workload( - &graph, - inputs, + job_submission, db_connector_factory, CancelPolicy::Immediate, default_output_handler(1024), @@ -133,9 +135,10 @@ async fn test_neural_net_success WorkloadResult { let (graph, inputs) = build_neural_net_task_graph(); let num_tasks = graph.get_num_tasks(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let result = run_workload( - &graph, - inputs, + job_submission, db_connector_factory, CancelPolicy::Never, default_output_handler(128), @@ -178,9 +181,10 @@ async fn test_neural_net_cancel, ) -> WorkloadResult { let (graph, inputs) = build_neural_net_task_graph(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let result = run_workload( - &graph, - inputs, + job_submission, db_connector_factory, CancelPolicy::Immediate, default_output_handler(128), @@ -220,9 +224,10 @@ async fn test_always_fail_terminates_job, ) -> WorkloadResult { let (graph, inputs) = build_flat_task_graph(3, 128, false, false); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let result = run_workload( - &graph, - inputs, + job_submission, db_connector_factory, CancelPolicy::Never, default_output_handler(128), @@ -258,9 +263,10 @@ async fn test_concurrent_success_and_cancel, ) -> WorkloadResult { let (graph, inputs) = build_flat_task_graph(100, 128, true, true); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let result = run_workload( - &graph, - inputs, + job_submission, db_connector_factory, CancelPolicy::Concurrent, default_output_handler(128), diff --git a/components/spider-storage/tests/mariadb_test.rs b/components/spider-storage/tests/mariadb_test.rs index 05c9742f..029c2544 100644 --- a/components/spider-storage/tests/mariadb_test.rs +++ b/components/spider-storage/tests/mariadb_test.rs @@ -4,7 +4,7 @@ use std::{ }; use spider_core::{ - job::JobState, + job::{JobState, ValidatedJobSubmission}, types::{ id::{ExecutionManagerId, JobId, ResourceGroupId}, io::TaskInput, @@ -58,9 +58,11 @@ async fn test_register_job() { let storage = create_mariadb_connector().await; let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let job_id = storage - .register(rg_id, &graph, inputs.as_slice()) + .register(rg_id, &job_submission) .await .expect("register should succeed"); @@ -77,10 +79,10 @@ async fn test_register_job_invalid_resource_group() { let storage = create_mariadb_connector().await; let fake_rg_id = ResourceGroupId::new(); let (graph, inputs) = single_task_graph(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); - let result = storage - .register(fake_rg_id, &graph, inputs.as_slice()) - .await; + let result = storage.register(fake_rg_id, &job_submission).await; assert!( matches!(result, Err(DbError::ResourceGroupNotFound(_))), @@ -94,9 +96,11 @@ async fn test_start_job() { let storage = create_mariadb_connector().await; let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let job_id = storage - .register(rg_id, &graph, inputs.as_slice()) + .register(rg_id, &job_submission) .await .expect("register should succeed"); @@ -115,9 +119,11 @@ async fn test_start_job_wrong_state() { let storage = create_mariadb_connector().await; let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let job_id = storage - .register(rg_id, &graph, inputs.as_slice()) + .register(rg_id, &job_submission) .await .expect("register should succeed"); @@ -136,9 +142,11 @@ async fn test_cancel_job_without_cleanup_transitions_to_cancelled() { let storage = create_mariadb_connector().await; let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let job_id = storage - .register(rg_id, &graph, inputs.as_slice()) + .register(rg_id, &job_submission) .await .expect("register should succeed"); @@ -162,9 +170,11 @@ async fn test_get_outputs_succeeded_job() { let storage = create_mariadb_connector().await; let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let job_id = storage - .register(rg_id, &graph, inputs.as_slice()) + .register(rg_id, &job_submission) .await .expect("register should succeed"); @@ -188,9 +198,11 @@ async fn test_get_outputs_wrong_state() { let storage = create_mariadb_connector().await; let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let job_id = storage - .register(rg_id, &graph, inputs.as_slice()) + .register(rg_id, &job_submission) .await .expect("register should succeed"); @@ -207,9 +219,11 @@ async fn test_get_error_failed_job() { let storage = create_mariadb_connector().await; let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let job_id = storage - .register(rg_id, &graph, inputs.as_slice()) + .register(rg_id, &job_submission) .await .expect("register should succeed"); @@ -232,9 +246,11 @@ async fn test_get_error_wrong_state() { let storage = create_mariadb_connector().await; let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let job_id = storage - .register(rg_id, &graph, inputs.as_slice()) + .register(rg_id, &job_submission) .await .expect("register should succeed"); @@ -251,9 +267,11 @@ async fn test_cancel_job_with_cleanup_transitions_to_cleanup_ready() { let storage = create_mariadb_connector().await; let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let job_id = storage - .register(rg_id, &graph, inputs.as_slice()) + .register(rg_id, &job_submission) .await .expect("register should succeed"); @@ -276,9 +294,11 @@ async fn test_cancel_already_terminal() { let storage = create_mariadb_connector().await; let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let job_id = storage - .register(rg_id, &graph, inputs.as_slice()) + .register(rg_id, &job_submission) .await .expect("register should succeed"); @@ -304,9 +324,11 @@ async fn test_set_state_valid_transition() { let storage = create_mariadb_connector().await; let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let job_id = storage - .register(rg_id, &graph, inputs.as_slice()) + .register(rg_id, &job_submission) .await .expect("register should succeed"); @@ -327,9 +349,11 @@ async fn test_set_state_invalid_transition() { let storage = create_mariadb_connector().await; let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let job_id = storage - .register(rg_id, &graph, inputs.as_slice()) + .register(rg_id, &job_submission) .await .expect("register should succeed"); @@ -350,9 +374,11 @@ async fn test_commit_outputs_without_commit_task() { let storage = create_mariadb_connector().await; let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let job_id = storage - .register(rg_id, &graph, inputs.as_slice()) + .register(rg_id, &job_submission) .await .expect("register should succeed"); @@ -375,9 +401,11 @@ async fn test_commit_outputs_with_commit_task() { let storage = create_mariadb_connector().await; let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let job_id = storage - .register(rg_id, &graph, inputs.as_slice()) + .register(rg_id, &job_submission) .await .expect("register should succeed"); @@ -403,9 +431,11 @@ async fn test_commit_outputs_wrong_state() { let storage = create_mariadb_connector().await; let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let job_id = storage - .register(rg_id, &graph, inputs.as_slice()) + .register(rg_id, &job_submission) .await .expect("register should succeed"); @@ -427,9 +457,11 @@ async fn test_fail_job() { let storage = create_mariadb_connector().await; let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let job_id = storage - .register(rg_id, &graph, inputs.as_slice()) + .register(rg_id, &job_submission) .await .expect("register should succeed"); @@ -452,9 +484,11 @@ async fn test_fail_terminal_state() { let storage = create_mariadb_connector().await; let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let job_id = storage - .register(rg_id, &graph, inputs.as_slice()) + .register(rg_id, &job_submission) .await .expect("register should succeed"); @@ -481,9 +515,11 @@ async fn test_delete_expired_terminated_jobs() { let storage = create_mariadb_connector().await; let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let job_id = storage - .register(rg_id, &graph, inputs.as_slice()) + .register(rg_id, &job_submission) .await .expect("register should succeed"); @@ -694,9 +730,11 @@ async fn test_cancel_from_ready_state() { let storage = create_mariadb_connector().await; let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let job_id = storage - .register(rg_id, &graph, inputs.as_slice()) + .register(rg_id, &job_submission) .await .expect("register should succeed"); @@ -718,9 +756,11 @@ async fn test_delete_expired_terminated_jobs_no_match() { let storage = create_mariadb_connector().await; let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); + let job_submission = + ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); let job_id = storage - .register(rg_id, &graph, inputs.as_slice()) + .register(rg_id, &job_submission) .await .expect("register should succeed"); diff --git a/components/spider-storage/tests/scheduling_infra.rs b/components/spider-storage/tests/scheduling_infra.rs index 78fbd7d0..0dbb27b0 100644 --- a/components/spider-storage/tests/scheduling_infra.rs +++ b/components/spider-storage/tests/scheduling_infra.rs @@ -84,11 +84,11 @@ use async_trait::async_trait; use dashmap::DashMap; use rand::{Rng, SeedableRng}; use spider_core::{ - job::JobState, - task::{TaskGraph as SubmittedTaskGraph, TaskIndex}, + job::{JobState, ValidatedJobSubmission}, + task::TaskIndex, types::{ id::{ExecutionManagerId, JobId, ResourceGroupId, TaskInstanceId}, - io::{ExecutionContext, TaskInput, TaskOutput}, + io::{ExecutionContext, TaskOutput}, }, }; use spider_storage::{ @@ -205,25 +205,23 @@ pub type FactoryReturn = (DbConnectorType, JobId, ResourceGroup /// /// * `DbConnectorType` - The DB-layer connector implementation. /// -/// Receives the submitted task graph and job inputs, performs any required DB setup (e.g. job -/// registration), and returns the connector along with the [`JobId`] and [`ResourceGroupId`] to -/// use for the JCB. +/// Receives the validated job submission, performs any required DB setup (e.g. job registration), +/// and returns the connector along with the [`JobId`] and [`ResourceGroupId`] to use for the JCB. pub trait DbConnectorFactory: - AsyncFnOnce(&SubmittedTaskGraph, &[TaskInput]) -> FactoryReturn + Send { + AsyncFnOnce(&ValidatedJobSubmission) -> FactoryReturn + Send { } impl DbConnectorFactory for AsyncFunc where - AsyncFunc: - AsyncFnOnce(&SubmittedTaskGraph, &[TaskInput]) -> FactoryReturn + Send, + AsyncFunc: AsyncFnOnce(&ValidatedJobSubmission) -> FactoryReturn + Send, { } /// Creates a [`NoopDbConnector`] with default [`JobId`] and [`ResourceGroupId`]. #[must_use] pub fn noop_db_connector_factory() -> impl DbConnectorFactory { - async |_, _| { + async |_: &ValidatedJobSubmission| { ( NoopDbConnector {}, JobId::default(), @@ -317,8 +315,7 @@ pub fn write_instrument_results( /// A [`WorkloadResult`] containing the terminal state and commit/cleanup execution counts. #[allow(clippy::too_many_lines)] pub async fn run_workload( - submitted_task_graph: &SubmittedTaskGraph, - inputs: Vec, + job_submission: ValidatedJobSubmission, db_connector_factory: impl DbConnectorFactory, cancel_policy: CancelPolicy, output_handler: TaskOutputHandler, @@ -330,16 +327,14 @@ pub async fn run_workload( let ready_queue_sender = MockReadyQueueSender { sender: ready_sender, }; - let (db_connector, job_id, resource_group_id) = - db_connector_factory(submitted_task_graph, &inputs).await; + let (db_connector, job_id, resource_group_id) = db_connector_factory(&job_submission).await; let task_instance_pool = MockTaskInstancePool::new(); // Create and start the JCB. let inner_jcb = SharedJobControlBlock::create( job_id, resource_group_id, - submitted_task_graph, - inputs, + job_submission, ready_queue_sender, db_connector, task_instance_pool, @@ -457,8 +452,8 @@ pub fn mariadb_db_connector_factory( storage: MariaDbStorageConnector, rg_id: ResourceGroupId, ) -> impl DbConnectorFactory { - async move |graph, inputs| { - let job_id = ExternalJobOrchestration::register(&storage, rg_id, graph, inputs) + async move |job_submission: &ValidatedJobSubmission| { + let job_id = ExternalJobOrchestration::register(&storage, rg_id, job_submission) .await .expect("register should succeed"); (storage, job_id, rg_id) From 327d793f76b52c5bd63ab6a31e667fe3bb8f631f Mon Sep 17 00:00:00 2001 From: Sitao Wang Date: Thu, 7 May 2026 16:57:53 -0400 Subject: [PATCH 02/10] Rename function --- components/spider-core/src/job.rs | 10 ++--- components/spider-storage/src/cache/task.rs | 6 +-- .../spider-storage/src/state/job_cache.rs | 4 +- .../spider-storage/src/task_instance_pool.rs | 2 +- components/spider-storage/tests/jcb_test.rs | 12 +++--- .../spider-storage/tests/mariadb_test.rs | 42 +++++++++---------- 6 files changed, 38 insertions(+), 38 deletions(-) diff --git a/components/spider-core/src/job.rs b/components/spider-core/src/job.rs index 3277f0a7..b106fcc9 100644 --- a/components/spider-core/src/job.rs +++ b/components/spider-core/src/job.rs @@ -30,7 +30,7 @@ impl ValidatedJobSubmission { /// * [`Error::InvalidJobSubmission`] if the task graph contains no tasks. /// * [`Error::InvalidJobSubmission`] if the number of inputs does not match the number of graph /// inputs. - pub fn new(task_graph: TaskGraph, inputs: Vec) -> Result { + pub fn validate(task_graph: TaskGraph, inputs: Vec) -> Result { let num_tasks = task_graph.get_num_tasks(); if num_tasks == 0 { return Err(Error::InvalidJobSubmission( @@ -109,7 +109,7 @@ mod tests { fn valid_job_submission_succeeds() { let graph = create_single_input_task_graph(); let inputs = vec![TaskInput::ValuePayload(vec![1u8; 4])]; - let result = ValidatedJobSubmission::new(graph, inputs); + let result = ValidatedJobSubmission::validate(graph, inputs); assert!(result.is_ok(), "valid submission should succeed"); } @@ -118,7 +118,7 @@ mod tests { let graph = SubmittedTaskGraph::new(None, None).expect("task graph creation should succeed"); let inputs = vec![]; - let result = ValidatedJobSubmission::new(graph, inputs); + let result = ValidatedJobSubmission::validate(graph, inputs); assert!( matches!(result, Err(Error::InvalidJobSubmission(_))), "empty task graph should return InvalidJobSubmission" @@ -129,7 +129,7 @@ mod tests { fn mismatched_input_count_fails() { let graph = create_single_input_task_graph(); let inputs = vec![]; - let result = ValidatedJobSubmission::new(graph, inputs); + let result = ValidatedJobSubmission::validate(graph, inputs); assert!( matches!(result, Err(Error::InvalidJobSubmission(_))), "mismatched input count should return InvalidJobSubmission" @@ -141,7 +141,7 @@ mod tests { let graph = create_single_input_task_graph(); let inputs = vec![TaskInput::ValuePayload(vec![1u8; 4])]; let submission = - ValidatedJobSubmission::new(graph, inputs).expect("submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("submission should be valid"); let (graph, inputs) = submission.into_parts(); assert_eq!(graph.get_num_tasks(), 1, "task graph should have 1 task"); assert_eq!(inputs.len(), 1, "should have 1 input"); diff --git a/components/spider-storage/src/cache/task.rs b/components/spider-storage/src/cache/task.rs index ffd258f8..5d833756 100644 --- a/components/spider-storage/src/cache/task.rs +++ b/components/spider-storage/src/cache/task.rs @@ -1083,7 +1083,7 @@ mod tests { .map(|_| TaskInput::ValuePayload(vec![0u8; 4])) .collect(); let job_submission = - ValidatedJobSubmission::new(submitted, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(submitted, inputs).expect("job submission should be valid"); TaskGraph::create(&job_submission) .await .expect("cache task graph creation should succeed") @@ -1127,7 +1127,7 @@ mod tests { }) .expect("task insertion should succeed"); let job_submission = - ValidatedJobSubmission::new(submitted, vec![]).expect("job submission should be valid"); + ValidatedJobSubmission::validate(submitted, vec![]).expect("job submission should be valid"); let task_graph = TaskGraph::create(&job_submission) .await .expect("cache task graph creation should succeed"); @@ -1244,7 +1244,7 @@ mod tests { TaskInput::ValuePayload(input_b), ]; let job_submission = - ValidatedJobSubmission::new(submitted, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(submitted, inputs).expect("job submission should be valid"); TaskGraph::create(&job_submission) .await .expect("cache task graph creation should succeed") diff --git a/components/spider-storage/src/state/job_cache.rs b/components/spider-storage/src/state/job_cache.rs index 3a14fe3b..5dd21606 100644 --- a/components/spider-storage/src/state/job_cache.rs +++ b/components/spider-storage/src/state/job_cache.rs @@ -277,7 +277,7 @@ mod tests { .expect("task insertion should succeed"); let job_submission = - ValidatedJobSubmission::new(submitted, vec![TaskInput::ValuePayload(vec![0u8; 4])]) + ValidatedJobSubmission::validate(submitted, vec![TaskInput::ValuePayload(vec![0u8; 4])]) .expect("job submission should be valid"); SharedJobControlBlock::create( job_id, @@ -457,7 +457,7 @@ mod tests { let job_id = JobId::new(); let job_submission = - ValidatedJobSubmission::new(submitted, vec![TaskInput::ValuePayload(vec![0u8; 4])]) + ValidatedJobSubmission::validate(submitted, vec![TaskInput::ValuePayload(vec![0u8; 4])]) .expect("job submission should be valid"); let jcb = SharedJobControlBlock::create( job_id, diff --git a/components/spider-storage/src/task_instance_pool.rs b/components/spider-storage/src/task_instance_pool.rs index 7de68f5a..4cad1863 100644 --- a/components/spider-storage/src/task_instance_pool.rs +++ b/components/spider-storage/src/task_instance_pool.rs @@ -663,7 +663,7 @@ mod tests { }) .expect("task insertion should succeed"); let job_submission = - ValidatedJobSubmission::new(submitted, vec![TaskInput::ValuePayload(vec![0u8; 4])]) + ValidatedJobSubmission::validate(submitted, vec![TaskInput::ValuePayload(vec![0u8; 4])]) .expect("job submission should be valid"); let task_graph = crate::cache::task::TaskGraph::create(&job_submission) .await diff --git a/components/spider-storage/tests/jcb_test.rs b/components/spider-storage/tests/jcb_test.rs index 802e228a..9ffd234c 100644 --- a/components/spider-storage/tests/jcb_test.rs +++ b/components/spider-storage/tests/jcb_test.rs @@ -48,7 +48,7 @@ async fn test_flat_success( let (graph, inputs) = build_flat_task_graph(10_000, 1024, true, true); let num_tasks = graph.get_num_tasks(); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let result = run_workload( job_submission, db_connector_factory, @@ -91,7 +91,7 @@ async fn test_flat_cancel( ) -> WorkloadResult { let (graph, inputs) = build_flat_task_graph(10_000, 1024, true, true); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let result = run_workload( job_submission, db_connector_factory, @@ -136,7 +136,7 @@ async fn test_neural_net_success WorkloadResult { let (graph, inputs) = build_neural_net_task_graph(); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let result = run_workload( job_submission, db_connector_factory, @@ -225,7 +225,7 @@ async fn test_always_fail_terminates_job WorkloadResult { let (graph, inputs) = build_flat_task_graph(3, 128, false, false); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let result = run_workload( job_submission, db_connector_factory, @@ -264,7 +264,7 @@ async fn test_concurrent_success_and_cancel WorkloadResult { let (graph, inputs) = build_flat_task_graph(100, 128, true, true); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let result = run_workload( job_submission, db_connector_factory, diff --git a/components/spider-storage/tests/mariadb_test.rs b/components/spider-storage/tests/mariadb_test.rs index 029c2544..5c5d2207 100644 --- a/components/spider-storage/tests/mariadb_test.rs +++ b/components/spider-storage/tests/mariadb_test.rs @@ -59,7 +59,7 @@ async fn test_register_job() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -80,7 +80,7 @@ async fn test_register_job_invalid_resource_group() { let fake_rg_id = ResourceGroupId::new(); let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let result = storage.register(fake_rg_id, &job_submission).await; @@ -97,7 +97,7 @@ async fn test_start_job() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -120,7 +120,7 @@ async fn test_start_job_wrong_state() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -143,7 +143,7 @@ async fn test_cancel_job_without_cleanup_transitions_to_cancelled() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -171,7 +171,7 @@ async fn test_get_outputs_succeeded_job() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -199,7 +199,7 @@ async fn test_get_outputs_wrong_state() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -220,7 +220,7 @@ async fn test_get_error_failed_job() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -247,7 +247,7 @@ async fn test_get_error_wrong_state() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -268,7 +268,7 @@ async fn test_cancel_job_with_cleanup_transitions_to_cleanup_ready() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -295,7 +295,7 @@ async fn test_cancel_already_terminal() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -325,7 +325,7 @@ async fn test_set_state_valid_transition() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -350,7 +350,7 @@ async fn test_set_state_invalid_transition() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -375,7 +375,7 @@ async fn test_commit_outputs_without_commit_task() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -402,7 +402,7 @@ async fn test_commit_outputs_with_commit_task() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -432,7 +432,7 @@ async fn test_commit_outputs_wrong_state() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -458,7 +458,7 @@ async fn test_fail_job() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -485,7 +485,7 @@ async fn test_fail_terminal_state() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -516,7 +516,7 @@ async fn test_delete_expired_terminated_jobs() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -731,7 +731,7 @@ async fn test_cancel_from_ready_state() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -757,7 +757,7 @@ async fn test_delete_expired_terminated_jobs_no_match() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::new(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) From e489f9e623d58ad01fb2744d02eef4112f585df4 Mon Sep 17 00:00:00 2001 From: Sitao Wang Date: Thu, 7 May 2026 18:18:08 -0400 Subject: [PATCH 03/10] Fix error type --- components/spider-core/src/job.rs | 29 +++++++++++-------- components/spider-core/src/task.rs | 7 +++-- components/spider-storage/src/cache/task.rs | 12 ++++---- .../spider-storage/src/state/job_cache.rs | 16 ++++++---- .../spider-storage/src/task_instance_pool.rs | 8 +++-- 5 files changed, 43 insertions(+), 29 deletions(-) diff --git a/components/spider-core/src/job.rs b/components/spider-core/src/job.rs index b106fcc9..32567106 100644 --- a/components/spider-core/src/job.rs +++ b/components/spider-core/src/job.rs @@ -27,22 +27,21 @@ impl ValidatedJobSubmission { /// /// Returns an error if: /// - /// * [`Error::InvalidJobSubmission`] if the task graph contains no tasks. - /// * [`Error::InvalidJobSubmission`] if the number of inputs does not match the number of graph + /// * [`Error::EmptyTaskGraph`] if the task graph contains no tasks. + /// * [`Error::InputCountMismatch`] if the number of inputs does not match the number of graph /// inputs. pub fn validate(task_graph: TaskGraph, inputs: Vec) -> Result { let num_tasks = task_graph.get_num_tasks(); if num_tasks == 0 { - return Err(Error::InvalidJobSubmission( - "task graph must contain at least one task".to_owned(), - )); + return Err(Error::EmptyTaskGraph); } let expected_inputs = task_graph.get_task_graph_input_indices().len(); let actual_inputs = inputs.len(); if expected_inputs != actual_inputs { - return Err(Error::InvalidJobSubmission(format!( - "expected {expected_inputs} graph inputs, got {actual_inputs}" - ))); + return Err(Error::InputCountMismatch { + expected: expected_inputs, + actual: actual_inputs, + }); } Ok(Self { task_graph, inputs }) } @@ -120,8 +119,8 @@ mod tests { let inputs = vec![]; let result = ValidatedJobSubmission::validate(graph, inputs); assert!( - matches!(result, Err(Error::InvalidJobSubmission(_))), - "empty task graph should return InvalidJobSubmission" + matches!(result, Err(Error::EmptyTaskGraph)), + "empty task graph should return EmptyTaskGraph" ); } @@ -131,8 +130,14 @@ mod tests { let inputs = vec![]; let result = ValidatedJobSubmission::validate(graph, inputs); assert!( - matches!(result, Err(Error::InvalidJobSubmission(_))), - "mismatched input count should return InvalidJobSubmission" + matches!( + result, + Err(Error::InputCountMismatch { + expected: 1, + actual: 0 + }) + ), + "mismatched input count should return InputCountMismatch" ); } diff --git a/components/spider-core/src/task.rs b/components/spider-core/src/task.rs index a1b3c0ce..2c485d8a 100644 --- a/components/spider-core/src/task.rs +++ b/components/spider-core/src/task.rs @@ -28,8 +28,11 @@ pub enum Error { #[error("invalid timeout policy: {0}")] InvalidTimeoutPolicy(String), - #[error("invalid job submission: {0}")] - InvalidJobSubmission(String), + #[error("task graph must contain at least one task")] + EmptyTaskGraph, + + #[error("expected {expected} graph inputs, got {actual}")] + InputCountMismatch { expected: usize, actual: usize }, } /// Enum for all possible states of a task. diff --git a/components/spider-storage/src/cache/task.rs b/components/spider-storage/src/cache/task.rs index 5d833756..1843ddcf 100644 --- a/components/spider-storage/src/cache/task.rs +++ b/components/spider-storage/src/cache/task.rs @@ -1082,8 +1082,8 @@ mod tests { let inputs: Vec = (0..num_inputs) .map(|_| TaskInput::ValuePayload(vec![0u8; 4])) .collect(); - let job_submission = - ValidatedJobSubmission::validate(submitted, inputs).expect("job submission should be valid"); + let job_submission = ValidatedJobSubmission::validate(submitted, inputs) + .expect("job submission should be valid"); TaskGraph::create(&job_submission) .await .expect("cache task graph creation should succeed") @@ -1126,8 +1126,8 @@ mod tests { input_sources: None, }) .expect("task insertion should succeed"); - let job_submission = - ValidatedJobSubmission::validate(submitted, vec![]).expect("job submission should be valid"); + let job_submission = ValidatedJobSubmission::validate(submitted, vec![]) + .expect("job submission should be valid"); let task_graph = TaskGraph::create(&job_submission) .await .expect("cache task graph creation should succeed"); @@ -1243,8 +1243,8 @@ mod tests { TaskInput::ValuePayload(input_a), TaskInput::ValuePayload(input_b), ]; - let job_submission = - ValidatedJobSubmission::validate(submitted, inputs).expect("job submission should be valid"); + let job_submission = ValidatedJobSubmission::validate(submitted, inputs) + .expect("job submission should be valid"); TaskGraph::create(&job_submission) .await .expect("cache task graph creation should succeed") diff --git a/components/spider-storage/src/state/job_cache.rs b/components/spider-storage/src/state/job_cache.rs index 5dd21606..eb8e579f 100644 --- a/components/spider-storage/src/state/job_cache.rs +++ b/components/spider-storage/src/state/job_cache.rs @@ -276,9 +276,11 @@ mod tests { }) .expect("task insertion should succeed"); - let job_submission = - ValidatedJobSubmission::validate(submitted, vec![TaskInput::ValuePayload(vec![0u8; 4])]) - .expect("job submission should be valid"); + let job_submission = ValidatedJobSubmission::validate( + submitted, + vec![TaskInput::ValuePayload(vec![0u8; 4])], + ) + .expect("job submission should be valid"); SharedJobControlBlock::create( job_id, spider_core::types::id::ResourceGroupId::new(), @@ -456,9 +458,11 @@ mod tests { .expect("task insertion should succeed"); let job_id = JobId::new(); - let job_submission = - ValidatedJobSubmission::validate(submitted, vec![TaskInput::ValuePayload(vec![0u8; 4])]) - .expect("job submission should be valid"); + let job_submission = ValidatedJobSubmission::validate( + submitted, + vec![TaskInput::ValuePayload(vec![0u8; 4])], + ) + .expect("job submission should be valid"); let jcb = SharedJobControlBlock::create( job_id, spider_core::types::id::ResourceGroupId::new(), diff --git a/components/spider-storage/src/task_instance_pool.rs b/components/spider-storage/src/task_instance_pool.rs index 4cad1863..6f5c903a 100644 --- a/components/spider-storage/src/task_instance_pool.rs +++ b/components/spider-storage/src/task_instance_pool.rs @@ -662,9 +662,11 @@ mod tests { input_sources: None, }) .expect("task insertion should succeed"); - let job_submission = - ValidatedJobSubmission::validate(submitted, vec![TaskInput::ValuePayload(vec![0u8; 4])]) - .expect("job submission should be valid"); + let job_submission = ValidatedJobSubmission::validate( + submitted, + vec![TaskInput::ValuePayload(vec![0u8; 4])], + ) + .expect("job submission should be valid"); let task_graph = crate::cache::task::TaskGraph::create(&job_submission) .await .expect("cache task graph creation should succeed"); From 7e759b3539b6209e888b671568055d65b0180039 Mon Sep 17 00:00:00 2001 From: Sitao Wang Date: Thu, 7 May 2026 20:47:56 -0400 Subject: [PATCH 04/10] Move ValidatedJobSubmission into storage --- components/spider-core/src/job.rs | 153 ----------------- components/spider-core/src/task.rs | 6 - components/spider-storage/src/cache.rs | 1 + components/spider-storage/src/cache/error.rs | 6 + components/spider-storage/src/cache/job.rs | 3 +- .../src/cache/job_submission.rs | 155 ++++++++++++++++++ components/spider-storage/src/cache/task.rs | 20 +-- components/spider-storage/src/db/mariadb.rs | 3 +- components/spider-storage/src/db/protocol.rs | 4 +- .../spider-storage/src/state/job_cache.rs | 3 +- .../spider-storage/src/task_instance_pool.rs | 2 +- components/spider-storage/tests/jcb_test.rs | 7 +- .../spider-storage/tests/mariadb_test.rs | 21 ++- .../spider-storage/tests/scheduling_infra.rs | 3 +- 14 files changed, 199 insertions(+), 188 deletions(-) create mode 100644 components/spider-storage/src/cache/job_submission.rs diff --git a/components/spider-core/src/job.rs b/components/spider-core/src/job.rs index 32567106..897809df 100644 --- a/components/spider-core/src/job.rs +++ b/components/spider-core/src/job.rs @@ -1,158 +1,5 @@ use spider_derive::MySqlEnum; -use crate::{ - task::{Error, TaskGraph}, - types::io::TaskInput, -}; - -/// A validated wrapper around a task graph and its corresponding job inputs. -/// -/// This type guarantees at construction time that: -/// -/// * The task graph contains at least one task. -/// * The number of job inputs matches the number of graph inputs expected by the task graph. -/// -/// By passing this type through the call chain, downstream consumers can trust the consistency -/// invariant without re-validating. -#[derive(Debug)] -pub struct ValidatedJobSubmission { - task_graph: TaskGraph, - inputs: Vec, -} - -impl ValidatedJobSubmission { - /// Creates a new validated job submission. - /// - /// # Errors - /// - /// Returns an error if: - /// - /// * [`Error::EmptyTaskGraph`] if the task graph contains no tasks. - /// * [`Error::InputCountMismatch`] if the number of inputs does not match the number of graph - /// inputs. - pub fn validate(task_graph: TaskGraph, inputs: Vec) -> Result { - let num_tasks = task_graph.get_num_tasks(); - if num_tasks == 0 { - return Err(Error::EmptyTaskGraph); - } - let expected_inputs = task_graph.get_task_graph_input_indices().len(); - let actual_inputs = inputs.len(); - if expected_inputs != actual_inputs { - return Err(Error::InputCountMismatch { - expected: expected_inputs, - actual: actual_inputs, - }); - } - Ok(Self { task_graph, inputs }) - } - - /// # Returns - /// - /// A reference to the validated task graph. - #[must_use] - pub const fn task_graph(&self) -> &TaskGraph { - &self.task_graph - } - - /// # Returns - /// - /// A reference to the validated job inputs. - #[must_use] - pub fn inputs(&self) -> &[TaskInput] { - &self.inputs - } - - /// Consumes the wrapper and returns the owned task graph and job inputs. - /// - /// # Returns - /// - /// A tuple of `(task_graph, inputs)`. - #[must_use] - pub fn into_parts(self) -> (TaskGraph, Vec) { - (self.task_graph, self.inputs) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::task::{ - DataTypeDescriptor, - ExecutionPolicy, - TaskDescriptor, - TaskGraph as SubmittedTaskGraph, - TdlContext, - ValueTypeDescriptor, - }; - - fn create_single_input_task_graph() -> SubmittedTaskGraph { - let bytes_type = DataTypeDescriptor::Value(ValueTypeDescriptor::bytes()); - let mut graph = - SubmittedTaskGraph::new(None, None).expect("task graph creation should succeed"); - graph - .insert_task(TaskDescriptor { - tdl_context: TdlContext { - package: "test_pkg".to_owned(), - task_func: "test_fn".to_owned(), - }, - execution_policy: Some(ExecutionPolicy::default()), - inputs: vec![bytes_type], - outputs: vec![], - input_sources: None, - }) - .expect("task insertion should succeed"); - graph - } - - #[test] - fn valid_job_submission_succeeds() { - let graph = create_single_input_task_graph(); - let inputs = vec![TaskInput::ValuePayload(vec![1u8; 4])]; - let result = ValidatedJobSubmission::validate(graph, inputs); - assert!(result.is_ok(), "valid submission should succeed"); - } - - #[test] - fn empty_task_graph_fails() { - let graph = - SubmittedTaskGraph::new(None, None).expect("task graph creation should succeed"); - let inputs = vec![]; - let result = ValidatedJobSubmission::validate(graph, inputs); - assert!( - matches!(result, Err(Error::EmptyTaskGraph)), - "empty task graph should return EmptyTaskGraph" - ); - } - - #[test] - fn mismatched_input_count_fails() { - let graph = create_single_input_task_graph(); - let inputs = vec![]; - let result = ValidatedJobSubmission::validate(graph, inputs); - assert!( - matches!( - result, - Err(Error::InputCountMismatch { - expected: 1, - actual: 0 - }) - ), - "mismatched input count should return InputCountMismatch" - ); - } - - #[test] - fn into_parts_returns_owned_components() { - let graph = create_single_input_task_graph(); - let inputs = vec![TaskInput::ValuePayload(vec![1u8; 4])]; - let submission = - ValidatedJobSubmission::validate(graph, inputs).expect("submission should be valid"); - let (graph, inputs) = submission.into_parts(); - assert_eq!(graph.get_num_tasks(), 1, "task graph should have 1 task"); - assert_eq!(inputs.len(), 1, "should have 1 input"); - } -} - /// Represents a job in the Spider scheduling framework. pub struct Job {} diff --git a/components/spider-core/src/task.rs b/components/spider-core/src/task.rs index 2c485d8a..ac6ee329 100644 --- a/components/spider-core/src/task.rs +++ b/components/spider-core/src/task.rs @@ -27,12 +27,6 @@ pub enum Error { #[error("invalid timeout policy: {0}")] InvalidTimeoutPolicy(String), - - #[error("task graph must contain at least one task")] - EmptyTaskGraph, - - #[error("expected {expected} graph inputs, got {actual}")] - InputCountMismatch { expected: usize, actual: usize }, } /// Enum for all possible states of a task. diff --git a/components/spider-storage/src/cache.rs b/components/spider-storage/src/cache.rs index 4ace32b7..d520f519 100644 --- a/components/spider-storage/src/cache.rs +++ b/components/spider-storage/src/cache.rs @@ -3,6 +3,7 @@ use spider_core::task::TaskIndex; pub mod error; pub mod io; pub mod job; +pub mod job_submission; mod sync; pub mod task; diff --git a/components/spider-storage/src/cache/error.rs b/components/spider-storage/src/cache/error.rs index 7226bfd6..d7f0f434 100644 --- a/components/spider-storage/src/cache/error.rs +++ b/components/spider-storage/src/cache/error.rs @@ -70,6 +70,12 @@ pub enum InternalError { #[error("ready queue channel is closed")] ReadyQueueChannelClosed, + + #[error("task graph must contain at least one task")] + EmptyTaskGraph, + + #[error("expected {expected} graph inputs, got {actual}")] + InputCountMismatch { expected: usize, actual: usize }, } /// Enums for all errors representing operations that are rejected due to stale cache state. diff --git a/components/spider-storage/src/cache/job.rs b/components/spider-storage/src/cache/job.rs index cc89bb99..c0d42af3 100644 --- a/components/spider-storage/src/cache/job.rs +++ b/components/spider-storage/src/cache/job.rs @@ -7,7 +7,7 @@ use std::{ }; use spider_core::{ - job::{JobState, ValidatedJobSubmission}, + job::JobState, task::{TaskIndex, TaskState}, types::{ id::{ExecutionManagerId, JobId, ResourceGroupId, TaskInstanceId}, @@ -20,6 +20,7 @@ use crate::{ cache::{ TaskId, error::{CacheError, InternalError, InternalError::UnexpectedJobState, StaleStateError}, + job_submission::ValidatedJobSubmission, task::TaskGraph, }, db::InternalJobOrchestration, diff --git a/components/spider-storage/src/cache/job_submission.rs b/components/spider-storage/src/cache/job_submission.rs new file mode 100644 index 00000000..4e9afd8b --- /dev/null +++ b/components/spider-storage/src/cache/job_submission.rs @@ -0,0 +1,155 @@ +use spider_core::{task::TaskGraph, types::io::TaskInput}; + +use super::error::InternalError; + +/// A validated wrapper around a task graph and its corresponding job inputs. +/// +/// This type guarantees at construction time that: +/// +/// * The task graph contains at least one task. +/// * The number of job inputs matches the number of graph inputs expected by the task graph. +/// +/// By passing this type through the call chain, downstream consumers can trust the consistency +/// invariant without re-validating. +#[derive(Debug)] +pub struct ValidatedJobSubmission { + task_graph: TaskGraph, + inputs: Vec, +} + +impl ValidatedJobSubmission { + /// Creates a new validated job submission. + /// + /// # Errors + /// + /// Returns an error if: + /// + /// * [`InternalError::EmptyTaskGraph`] if the task graph contains no tasks. + /// * [`InternalError::InputCountMismatch`] if the number of inputs does not match the number of + /// graph inputs. + pub fn validate(task_graph: TaskGraph, inputs: Vec) -> Result { + let num_tasks = task_graph.get_num_tasks(); + if num_tasks == 0 { + return Err(InternalError::EmptyTaskGraph); + } + let expected_inputs = task_graph.get_task_graph_input_indices().len(); + let actual_inputs = inputs.len(); + if expected_inputs != actual_inputs { + return Err(InternalError::InputCountMismatch { + expected: expected_inputs, + actual: actual_inputs, + }); + } + Ok(Self { task_graph, inputs }) + } + + /// # Returns + /// + /// A reference to the validated task graph. + #[must_use] + pub const fn task_graph(&self) -> &TaskGraph { + &self.task_graph + } + + /// # Returns + /// + /// A reference to the validated job inputs. + #[must_use] + pub fn inputs(&self) -> &[TaskInput] { + &self.inputs + } + + /// Consumes the wrapper and returns the owned task graph and job inputs. + /// + /// # Returns + /// + /// A tuple of `(task_graph, inputs)`. + #[must_use] + pub fn into_parts(self) -> (TaskGraph, Vec) { + (self.task_graph, self.inputs) + } +} + +#[cfg(test)] +mod tests { + use spider_core::{ + task::{ + DataTypeDescriptor, + ExecutionPolicy, + TaskDescriptor, + TaskGraph as SubmittedTaskGraph, + TdlContext, + ValueTypeDescriptor, + }, + types::io::TaskInput, + }; + + use super::{super::error::InternalError, *}; + + fn create_single_input_task_graph() -> SubmittedTaskGraph { + let bytes_type = DataTypeDescriptor::Value(ValueTypeDescriptor::bytes()); + let mut graph = + SubmittedTaskGraph::new(None, None).expect("task graph creation should succeed"); + graph + .insert_task(TaskDescriptor { + tdl_context: TdlContext { + package: "test_pkg".to_owned(), + task_func: "test_fn".to_owned(), + }, + execution_policy: Some(ExecutionPolicy::default()), + inputs: vec![bytes_type], + outputs: vec![], + input_sources: None, + }) + .expect("task insertion should succeed"); + graph + } + + #[test] + fn valid_job_submission_succeeds() { + let graph = create_single_input_task_graph(); + let inputs = vec![TaskInput::ValuePayload(vec![1u8; 4])]; + let result = ValidatedJobSubmission::validate(graph, inputs); + assert!(result.is_ok(), "valid submission should succeed"); + } + + #[test] + fn empty_task_graph_fails() { + let graph = + SubmittedTaskGraph::new(None, None).expect("task graph creation should succeed"); + let inputs = vec![]; + let result = ValidatedJobSubmission::validate(graph, inputs); + assert!( + matches!(result, Err(InternalError::EmptyTaskGraph)), + "empty task graph should return EmptyTaskGraph" + ); + } + + #[test] + fn mismatched_input_count_fails() { + let graph = create_single_input_task_graph(); + let inputs = vec![]; + let result = ValidatedJobSubmission::validate(graph, inputs); + assert!( + matches!( + result, + Err(InternalError::InputCountMismatch { + expected: 1, + actual: 0 + }) + ), + "mismatched input count should return InputCountMismatch" + ); + } + + #[test] + fn into_parts_returns_owned_components() { + let graph = create_single_input_task_graph(); + let inputs = vec![TaskInput::ValuePayload(vec![1u8; 4])]; + let submission = + ValidatedJobSubmission::validate(graph, inputs).expect("submission should be valid"); + let (graph, inputs) = submission.into_parts(); + assert_eq!(graph.get_num_tasks(), 1, "task graph should have 1 task"); + assert_eq!(inputs.len(), 1, "should have 1 input"); + } +} diff --git a/components/spider-storage/src/cache/task.rs b/components/spider-storage/src/cache/task.rs index 1843ddcf..d6e38035 100644 --- a/components/spider-storage/src/cache/task.rs +++ b/components/spider-storage/src/cache/task.rs @@ -5,7 +5,6 @@ use std::{ }; use spider_core::{ - job::ValidatedJobSubmission, task::{Task, TaskIndex, TaskState, TdlContext, TerminationTaskDescriptor, TimeoutPolicy}, types::{ id::TaskInstanceId, @@ -17,6 +16,7 @@ use tokio::sync::RwLock; use crate::cache::{ error::{CacheError, InternalError, StaleStateError}, io::{InputReader, OutputReader, OutputWriter, ValuePayload}, + job_submission::ValidatedJobSubmission, sync::{Reader, SharedRw, Writer}, }; @@ -926,19 +926,17 @@ mod tests { }, }; - use spider_core::{ - job::ValidatedJobSubmission, - task::{ - DataTypeDescriptor, - ExecutionPolicy, - TaskDescriptor, - TaskGraph as SubmittedTaskGraph, - TerminationTaskDescriptor, - ValueTypeDescriptor, - }, + use spider_core::task::{ + DataTypeDescriptor, + ExecutionPolicy, + TaskDescriptor, + TaskGraph as SubmittedTaskGraph, + TerminationTaskDescriptor, + ValueTypeDescriptor, }; use super::*; + use crate::cache::job_submission::ValidatedJobSubmission; /// # Returns /// diff --git a/components/spider-storage/src/db/mariadb.rs b/components/spider-storage/src/db/mariadb.rs index 902849a8..faeda2a6 100644 --- a/components/spider-storage/src/db/mariadb.rs +++ b/components/spider-storage/src/db/mariadb.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use const_format::formatcp; use secrecy::ExposeSecret; use spider_core::{ - job::{JobState, ValidatedJobSubmission}, + job::JobState, types::{ id::{ExecutionManagerId, JobId, ResourceGroupId, SessionId}, io::TaskOutput, @@ -14,6 +14,7 @@ use spider_derive::MySqlEnum; use sqlx::{MySqlPool, mysql::MySqlDatabaseError}; use crate::{ + cache::job_submission::ValidatedJobSubmission, config::DatabaseConfig, db::{ DbError, diff --git a/components/spider-storage/src/db/protocol.rs b/components/spider-storage/src/db/protocol.rs index 56231c26..0f286f9a 100644 --- a/components/spider-storage/src/db/protocol.rs +++ b/components/spider-storage/src/db/protocol.rs @@ -2,14 +2,14 @@ use std::net::IpAddr; use async_trait::async_trait; use spider_core::{ - job::{JobState, ValidatedJobSubmission}, + job::JobState, types::{ id::{ExecutionManagerId, JobId, ResourceGroupId, SessionId}, io::TaskOutput, }, }; -use crate::db::error::DbError; +use crate::{cache::job_submission::ValidatedJobSubmission, db::error::DbError}; /// The database storage interface. A database storage must implement the following traits: /// diff --git a/components/spider-storage/src/state/job_cache.rs b/components/spider-storage/src/state/job_cache.rs index eb8e579f..f7691613 100644 --- a/components/spider-storage/src/state/job_cache.rs +++ b/components/spider-storage/src/state/job_cache.rs @@ -131,7 +131,7 @@ mod tests { use std::sync::Arc; use spider_core::{ - job::{JobState, ValidatedJobSubmission}, + job::JobState, task::{ DataTypeDescriptor, ExecutionPolicy, @@ -151,6 +151,7 @@ mod tests { cache::{ error::InternalError, job::SharedJobControlBlock, + job_submission::ValidatedJobSubmission, task::{SharedTaskControlBlock, SharedTerminationTaskControlBlock}, }, db::DbError, diff --git a/components/spider-storage/src/task_instance_pool.rs b/components/spider-storage/src/task_instance_pool.rs index 6f5c903a..3d4eae00 100644 --- a/components/spider-storage/src/task_instance_pool.rs +++ b/components/spider-storage/src/task_instance_pool.rs @@ -524,7 +524,6 @@ mod tests { use async_trait::async_trait; use spider_core::{ - job::ValidatedJobSubmission, task::{ DataTypeDescriptor, ExecutionPolicy, @@ -541,6 +540,7 @@ mod tests { use tokio::sync::Mutex; use super::*; + use crate::cache::job_submission::ValidatedJobSubmission; const DEFAULT_CHANNEL_SIZE: usize = 128; diff --git a/components/spider-storage/tests/jcb_test.rs b/components/spider-storage/tests/jcb_test.rs index 9ffd234c..110dcfec 100644 --- a/components/spider-storage/tests/jcb_test.rs +++ b/components/spider-storage/tests/jcb_test.rs @@ -1,5 +1,8 @@ -use spider_core::job::{JobState, ValidatedJobSubmission}; -use spider_storage::db::{ExternalJobOrchestration, InternalJobOrchestration}; +use spider_core::job::JobState; +use spider_storage::{ + cache::job_submission::ValidatedJobSubmission, + db::{ExternalJobOrchestration, InternalJobOrchestration}, +}; use super::{ scheduling_infra::{ diff --git a/components/spider-storage/tests/mariadb_test.rs b/components/spider-storage/tests/mariadb_test.rs index 5c5d2207..89433207 100644 --- a/components/spider-storage/tests/mariadb_test.rs +++ b/components/spider-storage/tests/mariadb_test.rs @@ -4,20 +4,23 @@ use std::{ }; use spider_core::{ - job::{JobState, ValidatedJobSubmission}, + job::JobState, types::{ id::{ExecutionManagerId, JobId, ResourceGroupId}, io::TaskInput, }, }; -use spider_storage::db::{ - DbError, - ExecutionManagerLivenessManagement, - ExternalJobOrchestration, - InternalJobOrchestration, - MariaDbStorageConnector, - ResourceGroupManagement, - SessionManagement, +use spider_storage::{ + cache::job_submission::ValidatedJobSubmission, + db::{ + DbError, + ExecutionManagerLivenessManagement, + ExternalJobOrchestration, + InternalJobOrchestration, + MariaDbStorageConnector, + ResourceGroupManagement, + SessionManagement, + }, }; use super::{ diff --git a/components/spider-storage/tests/scheduling_infra.rs b/components/spider-storage/tests/scheduling_infra.rs index 0dbb27b0..d3e5eb98 100644 --- a/components/spider-storage/tests/scheduling_infra.rs +++ b/components/spider-storage/tests/scheduling_infra.rs @@ -84,7 +84,7 @@ use async_trait::async_trait; use dashmap::DashMap; use rand::{Rng, SeedableRng}; use spider_core::{ - job::{JobState, ValidatedJobSubmission}, + job::JobState, task::TaskIndex, types::{ id::{ExecutionManagerId, JobId, ResourceGroupId, TaskInstanceId}, @@ -96,6 +96,7 @@ use spider_storage::{ TaskId, error::{CacheError, InternalError}, job::SharedJobControlBlock, + job_submission::ValidatedJobSubmission, task::{SharedTaskControlBlock, SharedTerminationTaskControlBlock}, }, db::{DbError, ExternalJobOrchestration, InternalJobOrchestration, MariaDbStorageConnector}, From 3b5fcf8b6a9ed7417e27dc7ce3487989248ec208 Mon Sep 17 00:00:00 2001 From: Sitao Wang Date: Thu, 7 May 2026 20:50:53 -0400 Subject: [PATCH 05/10] Fix error --- components/spider-storage/src/cache/error.rs | 12 ++++++------ .../spider-storage/src/cache/job_submission.rs | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/components/spider-storage/src/cache/error.rs b/components/spider-storage/src/cache/error.rs index d7f0f434..29def958 100644 --- a/components/spider-storage/src/cache/error.rs +++ b/components/spider-storage/src/cache/error.rs @@ -41,6 +41,12 @@ pub enum InternalError { #[error("task graph corrupted: {0}")] TaskGraphCorrupted(String), + #[error("task graph must contain at least one task")] + TaskGraphEmpty, + + #[error("task graph input size mismatch: expected {expected}, got {actual}")] + TaskGraphInputSizeMismatch { expected: usize, actual: usize }, + #[error("job not started")] JobNotStarted, @@ -70,12 +76,6 @@ pub enum InternalError { #[error("ready queue channel is closed")] ReadyQueueChannelClosed, - - #[error("task graph must contain at least one task")] - EmptyTaskGraph, - - #[error("expected {expected} graph inputs, got {actual}")] - InputCountMismatch { expected: usize, actual: usize }, } /// Enums for all errors representing operations that are rejected due to stale cache state. diff --git a/components/spider-storage/src/cache/job_submission.rs b/components/spider-storage/src/cache/job_submission.rs index 4e9afd8b..7e01627d 100644 --- a/components/spider-storage/src/cache/job_submission.rs +++ b/components/spider-storage/src/cache/job_submission.rs @@ -24,18 +24,18 @@ impl ValidatedJobSubmission { /// /// Returns an error if: /// - /// * [`InternalError::EmptyTaskGraph`] if the task graph contains no tasks. - /// * [`InternalError::InputCountMismatch`] if the number of inputs does not match the number of + /// * [`InternalError::TaskGraphEmpty`] if the task graph contains no tasks. + /// * [`InternalError::TaskGraphInputSizeMismatch`] if the number of inputs does not match the number of /// graph inputs. pub fn validate(task_graph: TaskGraph, inputs: Vec) -> Result { let num_tasks = task_graph.get_num_tasks(); if num_tasks == 0 { - return Err(InternalError::EmptyTaskGraph); + return Err(InternalError::TaskGraphEmpty); } let expected_inputs = task_graph.get_task_graph_input_indices().len(); let actual_inputs = inputs.len(); if expected_inputs != actual_inputs { - return Err(InternalError::InputCountMismatch { + return Err(InternalError::TaskGraphInputSizeMismatch { expected: expected_inputs, actual: actual_inputs, }); @@ -120,7 +120,7 @@ mod tests { let inputs = vec![]; let result = ValidatedJobSubmission::validate(graph, inputs); assert!( - matches!(result, Err(InternalError::EmptyTaskGraph)), + matches!(result, Err(InternalError::TaskGraphEmpty)), "empty task graph should return EmptyTaskGraph" ); } @@ -133,7 +133,7 @@ mod tests { assert!( matches!( result, - Err(InternalError::InputCountMismatch { + Err(InternalError::TaskGraphInputSizeMismatch { expected: 1, actual: 0 }) From bdd9d3eb3ea8f2ea8f49272a19495cf09dffd4b1 Mon Sep 17 00:00:00 2001 From: Sitao Wang Date: Thu, 7 May 2026 20:54:58 -0400 Subject: [PATCH 06/10] Fix docstring --- components/spider-storage/src/cache/task.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/components/spider-storage/src/cache/task.rs b/components/spider-storage/src/cache/task.rs index d6e38035..3ad8d3af 100644 --- a/components/spider-storage/src/cache/task.rs +++ b/components/spider-storage/src/cache/task.rs @@ -42,9 +42,9 @@ impl TaskGraph { /// Returns an error if: /// /// * [`InternalError::TaskGraphCorrupted`] if: - /// * Any dataflow deps’ index is out-of-range. + /// * Any dataflow deps' index is out-of-range. /// * Any task index is out-of-range. - /// * Forwards [`SharedTaskControlBlock::create`]’s return values on failure. + /// * Forwards [`SharedTaskControlBlock::create`]'s return values on failure. /// /// # Panics /// From 4dc7cd878a5ff08507e4f820c52d29d2e03d5621 Mon Sep 17 00:00:00 2001 From: Sitao Wang Date: Thu, 7 May 2026 21:03:40 -0400 Subject: [PATCH 07/10] Task graph consume validated job submission --- components/spider-storage/src/cache/job.rs | 2 +- .../spider-storage/src/cache/job_submission.rs | 4 ++-- components/spider-storage/src/cache/task.rs | 16 ++++++---------- .../spider-storage/src/task_instance_pool.rs | 2 +- 4 files changed, 10 insertions(+), 14 deletions(-) diff --git a/components/spider-storage/src/cache/job.rs b/components/spider-storage/src/cache/job.rs index c0d42af3..5c575e8e 100644 --- a/components/spider-storage/src/cache/job.rs +++ b/components/spider-storage/src/cache/job.rs @@ -74,7 +74,7 @@ impl< task_instance_pool_connector: TaskInstancePoolConnectorType, ) -> Result { let num_tasks = job_submission.task_graph().get_num_tasks(); - let task_graph = TaskGraph::create(&job_submission).await?; + let task_graph = TaskGraph::create(job_submission).await?; let job_execution_state = JobExecutionState { state: JobState::Ready, task_graph, diff --git a/components/spider-storage/src/cache/job_submission.rs b/components/spider-storage/src/cache/job_submission.rs index 7e01627d..929206b0 100644 --- a/components/spider-storage/src/cache/job_submission.rs +++ b/components/spider-storage/src/cache/job_submission.rs @@ -25,8 +25,8 @@ impl ValidatedJobSubmission { /// Returns an error if: /// /// * [`InternalError::TaskGraphEmpty`] if the task graph contains no tasks. - /// * [`InternalError::TaskGraphInputSizeMismatch`] if the number of inputs does not match the number of - /// graph inputs. + /// * [`InternalError::TaskGraphInputSizeMismatch`] if the number of inputs does not match the + /// number of graph inputs. pub fn validate(task_graph: TaskGraph, inputs: Vec) -> Result { let num_tasks = task_graph.get_num_tasks(); if num_tasks == 0 { diff --git a/components/spider-storage/src/cache/task.rs b/components/spider-storage/src/cache/task.rs index 3ad8d3af..61f5f12a 100644 --- a/components/spider-storage/src/cache/task.rs +++ b/components/spider-storage/src/cache/task.rs @@ -49,18 +49,14 @@ impl TaskGraph { /// # Panics /// /// Panics if the internal TCB buffer is corrupted. - pub async fn create(job_submission: &ValidatedJobSubmission) -> Result { - let submitted_task_graph = job_submission.task_graph(); - let inputs = job_submission.inputs(); + pub async fn create(job_submission: ValidatedJobSubmission) -> Result { + let (submitted_task_graph, inputs) = job_submission.into_parts(); let dataflow_dep_buffer: Vec> = (0..submitted_task_graph .get_num_dataflow_deps()) .map(|_| SharedRw::new(RwLock::new(ValuePayload::default()))) .collect(); let task_graph_input_indices = submitted_task_graph.get_task_graph_input_indices(); - for (deps_index, input) in task_graph_input_indices - .into_iter() - .zip(inputs.iter().cloned()) - { + for (deps_index, input) in task_graph_input_indices.into_iter().zip(inputs) { let dataflow_dep = dataflow_dep_buffer.get(deps_index).ok_or_else(|| { InternalError::TaskGraphCorrupted( "dataflow dependency index out-of-range".to_owned(), @@ -1082,7 +1078,7 @@ mod tests { .collect(); let job_submission = ValidatedJobSubmission::validate(submitted, inputs) .expect("job submission should be valid"); - TaskGraph::create(&job_submission) + TaskGraph::create(job_submission) .await .expect("cache task graph creation should succeed") } @@ -1126,7 +1122,7 @@ mod tests { .expect("task insertion should succeed"); let job_submission = ValidatedJobSubmission::validate(submitted, vec![]) .expect("job submission should be valid"); - let task_graph = TaskGraph::create(&job_submission) + let task_graph = TaskGraph::create(job_submission) .await .expect("cache task graph creation should succeed"); task_graph @@ -1243,7 +1239,7 @@ mod tests { ]; let job_submission = ValidatedJobSubmission::validate(submitted, inputs) .expect("job submission should be valid"); - TaskGraph::create(&job_submission) + TaskGraph::create(job_submission) .await .expect("cache task graph creation should succeed") } diff --git a/components/spider-storage/src/task_instance_pool.rs b/components/spider-storage/src/task_instance_pool.rs index 3d4eae00..ec6fdbf7 100644 --- a/components/spider-storage/src/task_instance_pool.rs +++ b/components/spider-storage/src/task_instance_pool.rs @@ -667,7 +667,7 @@ mod tests { vec![TaskInput::ValuePayload(vec![0u8; 4])], ) .expect("job submission should be valid"); - let task_graph = crate::cache::task::TaskGraph::create(&job_submission) + let task_graph = crate::cache::task::TaskGraph::create(job_submission) .await .expect("cache task graph creation should succeed"); task_graph From b53714d2af20f6bf0baaa46bf7ea1ce7ebbab5b8 Mon Sep 17 00:00:00 2001 From: Sitao Wang Date: Thu, 7 May 2026 21:27:12 -0400 Subject: [PATCH 08/10] Fix docstring --- components/spider-storage/src/cache/job_submission.rs | 6 +++++- components/spider-storage/src/db/protocol.rs | 4 ++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/components/spider-storage/src/cache/job_submission.rs b/components/spider-storage/src/cache/job_submission.rs index 929206b0..ce291152 100644 --- a/components/spider-storage/src/cache/job_submission.rs +++ b/components/spider-storage/src/cache/job_submission.rs @@ -20,6 +20,10 @@ pub struct ValidatedJobSubmission { impl ValidatedJobSubmission { /// Creates a new validated job submission. /// + /// # Returns + /// + /// The validated job submission on success. + /// /// # Errors /// /// Returns an error if: @@ -138,7 +142,7 @@ mod tests { actual: 0 }) ), - "mismatched input count should return InputCountMismatch" + "mismatched input count should return TaskGraphInputSizeMismatch" ); } diff --git a/components/spider-storage/src/db/protocol.rs b/components/spider-storage/src/db/protocol.rs index 0f286f9a..0b9e297f 100644 --- a/components/spider-storage/src/db/protocol.rs +++ b/components/spider-storage/src/db/protocol.rs @@ -46,8 +46,8 @@ pub trait ExternalJobOrchestration { /// Returns an error if: /// /// * [`DbError::ResourceGroupNotFound`] if the `resource_group_id` does not exist. - /// * [`DbError::TaskGraphSerializationFailure`] if the `task_graph` serialization fails. - /// * [`DbError::ValueSerializationFailure`] if the `job_inputs` serialization fails. + /// * [`DbError::TaskGraphSerializationFailure`] if the task graph serialization fails. + /// * [`DbError::ValueSerializationFailure`] if the job inputs serialization fails. /// * Forwards [`sqlx::error::Error`] on DB operation failure. async fn register( &self, From f9e116381c458081e9aa617615addf30d7f55d7d Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Fri, 8 May 2026 11:01:58 -0400 Subject: [PATCH 09/10] Apply suggestions from code review Co-authored-by: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com> --- components/spider-storage/src/cache/job_submission.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/components/spider-storage/src/cache/job_submission.rs b/components/spider-storage/src/cache/job_submission.rs index ce291152..f50164ee 100644 --- a/components/spider-storage/src/cache/job_submission.rs +++ b/components/spider-storage/src/cache/job_submission.rs @@ -36,12 +36,12 @@ impl ValidatedJobSubmission { if num_tasks == 0 { return Err(InternalError::TaskGraphEmpty); } - let expected_inputs = task_graph.get_task_graph_input_indices().len(); - let actual_inputs = inputs.len(); - if expected_inputs != actual_inputs { + let expected_num_inputs = task_graph.get_task_graph_input_indices().len(); + let actual_num_inputs = inputs.len(); + if expected_num_inputs != actual_num_inputs { return Err(InternalError::TaskGraphInputSizeMismatch { - expected: expected_inputs, - actual: actual_inputs, + expected: expected_num_inputs, + actual: actual_num_inputs, }); } Ok(Self { task_graph, inputs }) From 1e09f8d08db057b1d9994189693600d87b5ef21e Mon Sep 17 00:00:00 2001 From: Sitao Wang Date: Fri, 8 May 2026 17:27:51 -0400 Subject: [PATCH 10/10] Rename validate to create --- .../src/cache/job_submission.rs | 10 ++--- components/spider-storage/src/cache/task.rs | 6 +-- .../spider-storage/src/state/job_cache.rs | 16 +++---- .../spider-storage/src/task_instance_pool.rs | 8 ++-- components/spider-storage/tests/jcb_test.rs | 12 +++--- .../spider-storage/tests/mariadb_test.rs | 42 +++++++++---------- 6 files changed, 44 insertions(+), 50 deletions(-) diff --git a/components/spider-storage/src/cache/job_submission.rs b/components/spider-storage/src/cache/job_submission.rs index ce291152..aa4864ec 100644 --- a/components/spider-storage/src/cache/job_submission.rs +++ b/components/spider-storage/src/cache/job_submission.rs @@ -31,7 +31,7 @@ impl ValidatedJobSubmission { /// * [`InternalError::TaskGraphEmpty`] if the task graph contains no tasks. /// * [`InternalError::TaskGraphInputSizeMismatch`] if the number of inputs does not match the /// number of graph inputs. - pub fn validate(task_graph: TaskGraph, inputs: Vec) -> Result { + pub fn create(task_graph: TaskGraph, inputs: Vec) -> Result { let num_tasks = task_graph.get_num_tasks(); if num_tasks == 0 { return Err(InternalError::TaskGraphEmpty); @@ -113,7 +113,7 @@ mod tests { fn valid_job_submission_succeeds() { let graph = create_single_input_task_graph(); let inputs = vec![TaskInput::ValuePayload(vec![1u8; 4])]; - let result = ValidatedJobSubmission::validate(graph, inputs); + let result = ValidatedJobSubmission::create(graph, inputs); assert!(result.is_ok(), "valid submission should succeed"); } @@ -122,7 +122,7 @@ mod tests { let graph = SubmittedTaskGraph::new(None, None).expect("task graph creation should succeed"); let inputs = vec![]; - let result = ValidatedJobSubmission::validate(graph, inputs); + let result = ValidatedJobSubmission::create(graph, inputs); assert!( matches!(result, Err(InternalError::TaskGraphEmpty)), "empty task graph should return EmptyTaskGraph" @@ -133,7 +133,7 @@ mod tests { fn mismatched_input_count_fails() { let graph = create_single_input_task_graph(); let inputs = vec![]; - let result = ValidatedJobSubmission::validate(graph, inputs); + let result = ValidatedJobSubmission::create(graph, inputs); assert!( matches!( result, @@ -151,7 +151,7 @@ mod tests { let graph = create_single_input_task_graph(); let inputs = vec![TaskInput::ValuePayload(vec![1u8; 4])]; let submission = - ValidatedJobSubmission::validate(graph, inputs).expect("submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("submission should be valid"); let (graph, inputs) = submission.into_parts(); assert_eq!(graph.get_num_tasks(), 1, "task graph should have 1 task"); assert_eq!(inputs.len(), 1, "should have 1 input"); diff --git a/components/spider-storage/src/cache/task.rs b/components/spider-storage/src/cache/task.rs index 61f5f12a..416a3264 100644 --- a/components/spider-storage/src/cache/task.rs +++ b/components/spider-storage/src/cache/task.rs @@ -1076,7 +1076,7 @@ mod tests { let inputs: Vec = (0..num_inputs) .map(|_| TaskInput::ValuePayload(vec![0u8; 4])) .collect(); - let job_submission = ValidatedJobSubmission::validate(submitted, inputs) + let job_submission = ValidatedJobSubmission::create(submitted, inputs) .expect("job submission should be valid"); TaskGraph::create(job_submission) .await @@ -1120,7 +1120,7 @@ mod tests { input_sources: None, }) .expect("task insertion should succeed"); - let job_submission = ValidatedJobSubmission::validate(submitted, vec![]) + let job_submission = ValidatedJobSubmission::create(submitted, vec![]) .expect("job submission should be valid"); let task_graph = TaskGraph::create(job_submission) .await @@ -1237,7 +1237,7 @@ mod tests { TaskInput::ValuePayload(input_a), TaskInput::ValuePayload(input_b), ]; - let job_submission = ValidatedJobSubmission::validate(submitted, inputs) + let job_submission = ValidatedJobSubmission::create(submitted, inputs) .expect("job submission should be valid"); TaskGraph::create(job_submission) .await diff --git a/components/spider-storage/src/state/job_cache.rs b/components/spider-storage/src/state/job_cache.rs index f7691613..6ad3c7ce 100644 --- a/components/spider-storage/src/state/job_cache.rs +++ b/components/spider-storage/src/state/job_cache.rs @@ -277,11 +277,9 @@ mod tests { }) .expect("task insertion should succeed"); - let job_submission = ValidatedJobSubmission::validate( - submitted, - vec![TaskInput::ValuePayload(vec![0u8; 4])], - ) - .expect("job submission should be valid"); + let job_submission = + ValidatedJobSubmission::create(submitted, vec![TaskInput::ValuePayload(vec![0u8; 4])]) + .expect("job submission should be valid"); SharedJobControlBlock::create( job_id, spider_core::types::id::ResourceGroupId::new(), @@ -459,11 +457,9 @@ mod tests { .expect("task insertion should succeed"); let job_id = JobId::new(); - let job_submission = ValidatedJobSubmission::validate( - submitted, - vec![TaskInput::ValuePayload(vec![0u8; 4])], - ) - .expect("job submission should be valid"); + let job_submission = + ValidatedJobSubmission::create(submitted, vec![TaskInput::ValuePayload(vec![0u8; 4])]) + .expect("job submission should be valid"); let jcb = SharedJobControlBlock::create( job_id, spider_core::types::id::ResourceGroupId::new(), diff --git a/components/spider-storage/src/task_instance_pool.rs b/components/spider-storage/src/task_instance_pool.rs index ec6fdbf7..ace45ce6 100644 --- a/components/spider-storage/src/task_instance_pool.rs +++ b/components/spider-storage/src/task_instance_pool.rs @@ -662,11 +662,9 @@ mod tests { input_sources: None, }) .expect("task insertion should succeed"); - let job_submission = ValidatedJobSubmission::validate( - submitted, - vec![TaskInput::ValuePayload(vec![0u8; 4])], - ) - .expect("job submission should be valid"); + let job_submission = + ValidatedJobSubmission::create(submitted, vec![TaskInput::ValuePayload(vec![0u8; 4])]) + .expect("job submission should be valid"); let task_graph = crate::cache::task::TaskGraph::create(job_submission) .await .expect("cache task graph creation should succeed"); diff --git a/components/spider-storage/tests/jcb_test.rs b/components/spider-storage/tests/jcb_test.rs index 110dcfec..6f444343 100644 --- a/components/spider-storage/tests/jcb_test.rs +++ b/components/spider-storage/tests/jcb_test.rs @@ -51,7 +51,7 @@ async fn test_flat_success( let (graph, inputs) = build_flat_task_graph(10_000, 1024, true, true); let num_tasks = graph.get_num_tasks(); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let result = run_workload( job_submission, db_connector_factory, @@ -94,7 +94,7 @@ async fn test_flat_cancel( ) -> WorkloadResult { let (graph, inputs) = build_flat_task_graph(10_000, 1024, true, true); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let result = run_workload( job_submission, db_connector_factory, @@ -139,7 +139,7 @@ async fn test_neural_net_success WorkloadResult { let (graph, inputs) = build_neural_net_task_graph(); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let result = run_workload( job_submission, db_connector_factory, @@ -228,7 +228,7 @@ async fn test_always_fail_terminates_job WorkloadResult { let (graph, inputs) = build_flat_task_graph(3, 128, false, false); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let result = run_workload( job_submission, db_connector_factory, @@ -267,7 +267,7 @@ async fn test_concurrent_success_and_cancel WorkloadResult { let (graph, inputs) = build_flat_task_graph(100, 128, true, true); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let result = run_workload( job_submission, db_connector_factory, diff --git a/components/spider-storage/tests/mariadb_test.rs b/components/spider-storage/tests/mariadb_test.rs index 89433207..3b90ab07 100644 --- a/components/spider-storage/tests/mariadb_test.rs +++ b/components/spider-storage/tests/mariadb_test.rs @@ -62,7 +62,7 @@ async fn test_register_job() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -83,7 +83,7 @@ async fn test_register_job_invalid_resource_group() { let fake_rg_id = ResourceGroupId::new(); let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let result = storage.register(fake_rg_id, &job_submission).await; @@ -100,7 +100,7 @@ async fn test_start_job() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -123,7 +123,7 @@ async fn test_start_job_wrong_state() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -146,7 +146,7 @@ async fn test_cancel_job_without_cleanup_transitions_to_cancelled() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -174,7 +174,7 @@ async fn test_get_outputs_succeeded_job() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -202,7 +202,7 @@ async fn test_get_outputs_wrong_state() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -223,7 +223,7 @@ async fn test_get_error_failed_job() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -250,7 +250,7 @@ async fn test_get_error_wrong_state() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -271,7 +271,7 @@ async fn test_cancel_job_with_cleanup_transitions_to_cleanup_ready() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -298,7 +298,7 @@ async fn test_cancel_already_terminal() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -328,7 +328,7 @@ async fn test_set_state_valid_transition() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -353,7 +353,7 @@ async fn test_set_state_invalid_transition() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -378,7 +378,7 @@ async fn test_commit_outputs_without_commit_task() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -405,7 +405,7 @@ async fn test_commit_outputs_with_commit_task() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -435,7 +435,7 @@ async fn test_commit_outputs_wrong_state() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -461,7 +461,7 @@ async fn test_fail_job() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -488,7 +488,7 @@ async fn test_fail_terminal_state() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -519,7 +519,7 @@ async fn test_delete_expired_terminated_jobs() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -734,7 +734,7 @@ async fn test_cancel_from_ready_state() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission) @@ -760,7 +760,7 @@ async fn test_delete_expired_terminated_jobs_no_match() { let rg_id = create_test_resource_group(&storage).await; let (graph, inputs) = single_task_graph(); let job_submission = - ValidatedJobSubmission::validate(graph, inputs).expect("job submission should be valid"); + ValidatedJobSubmission::create(graph, inputs).expect("job submission should be valid"); let job_id = storage .register(rg_id, &job_submission)