Skip to content
Open
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
684491f
Add job cache
sitaowang1998 May 4, 2026
00c425c
Don't insert when key already exists
sitaowang1998 May 4, 2026
7d63dd4
Fix docstring
sitaowang1998 May 4, 2026
8752d5e
Apply suggestions from code review
sitaowang1998 May 5, 2026
09754bb
Add service state
sitaowang1998 May 5, 2026
1711ee7
Use String for error
sitaowang1998 May 5, 2026
0327d45
Use TaskTracker
sitaowang1998 May 5, 2026
2fa04ae
Merge branch 'main' into jobentry
sitaowang1998 May 5, 2026
a9d6cfc
Add resent tasks to ready queue
sitaowang1998 May 5, 2026
19cda5a
Fix error and add resend
sitaowang1998 May 5, 2026
a5d32c0
Address comment
sitaowang1998 May 6, 2026
24afc05
Fix docstring
sitaowang1998 May 6, 2026
deda9b6
Merge remote-tracking branch 'origin/jobentry' into job-lifecycle
sitaowang1998 May 6, 2026
d1b5fbc
Fix merge conflicts: add Db variant to StorageServerError, update ins…
sitaowang1998 May 6, 2026
140cfe7
Remove unnecessary comment
sitaowang1998 May 6, 2026
c9a5798
Submit job fetch from db
sitaowang1998 May 6, 2026
f10f820
Bug fix
sitaowang1998 May 6, 2026
cd3ce08
Use counter for mock test
sitaowang1998 May 6, 2026
a20f074
Merge branch 'jobentry' into job-lifecycle
sitaowang1998 May 6, 2026
6cd403c
Improve codestyle
sitaowang1998 May 6, 2026
505fc54
Fix error
sitaowang1998 May 6, 2026
ce25c8d
Merge branch 'main' into job-lifecycle
sitaowang1998 May 7, 2026
92e9cf6
Extract mock test module
sitaowang1998 May 7, 2026
5757c77
Use DbStorage instead of specific trait
sitaowang1998 May 7, 2026
bd61189
Fix stale docstring and add SessionId to import in test_mocks
sitaowang1998 May 7, 2026
ec36221
Address pr review comment
sitaowang1998 May 8, 2026
c1dee1c
Add session id
sitaowang1998 May 8, 2026
8eeb96f
Add tracing
sitaowang1998 May 8, 2026
c0283ee
Fix toml
sitaowang1998 May 8, 2026
85ceaf0
Merge origin/main into job-lifecycle
sitaowang1998 May 9, 2026
31e3eee
Address comment
sitaowang1998 May 9, 2026
5a18c14
Fix docstring and add tests
sitaowang1998 May 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions components/spider-storage/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
pub mod error;
pub mod job_cache;
pub mod service;

pub use error::StorageServerError;
pub use job_cache::JobCache;
pub use service::ServiceState;

#[cfg(test)]
mod test_mocks;
12 changes: 9 additions & 3 deletions components/spider-storage/src/state/error.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
use spider_core::types::id::JobId;

use crate::cache::error::CacheError;
use crate::{cache::error::CacheError, db::DbError};

/// Errors that can occur during storage server operations.
#[derive(thiserror::Error, Debug)]
pub enum StorageServerError {
#[error(transparent)]
Cache(#[from] CacheError),

#[error(transparent)]
Db(#[from] DbError),

#[error("stale session")]
StaleSession,

#[error("server is shutting down: {0}")]
Stopping(String),

#[error("bad request: {0}")]
BadRequest(String),
#[error("job not found in cache: {0:?}")]
JobNotFound(JobId),

#[error("job already exists: {0:?}")]
JobAlreadyExists(JobId),

#[error("bad request: {0}")]
BadRequest(String),
}
119 changes: 6 additions & 113 deletions components/spider-storage/src/state/job_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
cache::job::SharedJobControlBlock,
db::InternalJobOrchestration,
ready_queue::ReadyQueueSender,
state::error::StorageServerError,
state::StorageServerError,
task_instance_pool::TaskInstancePoolConnector,
};

Expand Down Expand Up @@ -131,7 +131,6 @@ mod tests {
use std::sync::Arc;

use spider_core::{
job::JobState,
task::{
DataTypeDescriptor,
ExecutionPolicy,
Expand All @@ -140,122 +139,16 @@ mod tests {
TdlContext,
ValueTypeDescriptor,
},
types::{
id::JobId,
io::{TaskInput, TaskOutput},
},
types::{id::JobId, io::TaskInput},
};

use super::*;
use crate::{
cache::{
error::InternalError,
job::SharedJobControlBlock,
task::{SharedTaskControlBlock, SharedTerminationTaskControlBlock},
},
db::DbError,
cache::{error::InternalError, job::SharedJobControlBlock},
ready_queue::ReadyQueueSender,
task_instance_pool::{TaskInstanceMetadata, TaskInstancePoolConnector},
state::test_mocks::{MockDbConnector, MockReadyQueueSender, MockTaskInstancePoolConnector},
};

/// A mock ready queue sender for testing.
#[derive(Clone, Default)]
struct MockReadyQueueSender;

#[async_trait::async_trait]
impl ReadyQueueSender for MockReadyQueueSender {
async fn send_task_ready(
&self,
_rg_id: spider_core::types::id::ResourceGroupId,
_job_id: JobId,
_task_indices: Vec<usize>,
) -> Result<(), InternalError> {
Ok(())
}

async fn send_commit_ready(
&self,
_rg_id: spider_core::types::id::ResourceGroupId,
_job_id: JobId,
) -> Result<(), InternalError> {
Ok(())
}

async fn send_cleanup_ready(
&self,
_rg_id: spider_core::types::id::ResourceGroupId,
_job_id: JobId,
) -> Result<(), InternalError> {
Ok(())
}
}

/// A mock DB connector for testing.
#[derive(Clone, Default)]
struct MockDbConnector;

#[async_trait::async_trait]
impl InternalJobOrchestration for MockDbConnector {
async fn start(&self, _job_id: JobId) -> Result<(), DbError> {
Ok(())
}

async fn set_state(&self, _job_id: JobId, _state: JobState) -> Result<(), DbError> {
Ok(())
}

async fn commit_outputs(
&self,
_job_id: JobId,
_outputs: Vec<TaskOutput>,
_has_commit_task: bool,
) -> Result<(), DbError> {
Ok(())
}

async fn cancel(&self, _job_id: JobId, _has_cleanup_task: bool) -> Result<(), DbError> {
Ok(())
}

async fn fail(&self, _job_id: JobId, _error_message: String) -> Result<(), DbError> {
Ok(())
}

async fn delete_expired_terminated_jobs(
&self,
_expire_after_sec: u64,
) -> Result<Vec<JobId>, DbError> {
Ok(Vec::new())
}
}

/// A mock task instance pool connector for testing.
#[derive(Clone, Default)]
struct MockTaskInstancePoolConnector;

#[async_trait::async_trait]
impl TaskInstancePoolConnector for MockTaskInstancePoolConnector {
fn get_next_available_task_instance_id(&self) -> spider_core::types::id::TaskInstanceId {
1
}

async fn register_task_instance(
&self,
_tcb: SharedTaskControlBlock,
_registration: TaskInstanceMetadata,
) -> Result<(), InternalError> {
Ok(())
}

async fn register_termination_task_instance(
&self,
_termination_tcb: SharedTerminationTaskControlBlock,
_registration: TaskInstanceMetadata,
) -> Result<(), InternalError> {
Ok(())
}
}

async fn create_test_jcb(
job_id: JobId,
) -> SharedJobControlBlock<MockReadyQueueSender, MockDbConnector, MockTaskInstancePoolConnector>
Expand All @@ -282,7 +175,7 @@ mod tests {
&submitted,
vec![TaskInput::ValuePayload(vec![0u8; 4])],
MockReadyQueueSender,
MockDbConnector,
MockDbConnector::default(),
MockTaskInstancePoolConnector,
)
.await
Expand Down Expand Up @@ -460,7 +353,7 @@ mod tests {
&submitted,
vec![TaskInput::ValuePayload(vec![0u8; 4])],
sender,
MockDbConnector,
MockDbConnector::default(),
MockTaskInstancePoolConnector,
)
.await
Expand Down
Loading
Loading