Skip to content
Open
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
f959ef3
Add job cancel storage call
sitaowang1998 May 8, 2025
8a3ef43
Add tests for job cancel storage function
sitaowang1998 May 8, 2025
a79dff1
Add get_task_state
sitaowang1998 May 8, 2025
80e27cb
Add cancel messages in storage
sitaowang1998 May 8, 2025
f7fa1d7
Add get message function in storage
sitaowang1998 May 8, 2025
9aced17
Fix job error message
sitaowang1998 May 8, 2025
2198185
Add error message into tests
sitaowang1998 May 8, 2025
25e9b3f
Add cancel_job_by_task
sitaowang1998 May 8, 2025
4292d46
Add tests for cancel_job_by_task
sitaowang1998 May 8, 2025
6a1f9e0
Add job cancel in client
sitaowang1998 May 9, 2025
e4c8f99
Do not fail the task if it is not running
sitaowang1998 May 12, 2025
127cd74
Fix clang-tidy
sitaowang1998 May 12, 2025
d964d67
Add task executor cancel state check
sitaowang1998 May 12, 2025
2e2ddf8
Add ExecutorHandle class
sitaowang1998 May 12, 2025
1a724ce
Add job cancel check in worker
sitaowang1998 May 12, 2025
4b59d39
Fix missing argument for worker threads
sitaowang1998 May 12, 2025
c79eefc
Fixed lock guard const function
sitaowang1998 May 12, 2025
f3cb222
Fix clang-tidy
sitaowang1998 May 12, 2025
7342925
Add function name in job_errors table
sitaowang1998 May 12, 2025
5e841f8
Add client and tasks for cancel tests
sitaowang1998 May 12, 2025
a988751
Add get func_name column in storage
sitaowang1998 May 12, 2025
a58f58e
Add get_error in job
sitaowang1998 May 12, 2025
23d94ff
Fix clang tidy
sitaowang1998 May 12, 2025
d9e837f
Update unit test with new get_error
sitaowang1998 May 12, 2025
24563ce
Check for running status before parsing result
sitaowang1998 May 12, 2025
f1afa62
Improve storage task_fail state check
sitaowang1998 May 12, 2025
afbf6b9
Add integration tests for cancel and abort
sitaowang1998 May 12, 2025
e2cad8f
Revert test storage url change
sitaowang1998 May 12, 2025
a55ffac
Fix test cmake
sitaowang1998 May 12, 2025
709422d
Fix clang-tidy
sitaowang1998 May 12, 2025
228d32a
Fix storage docstring
sitaowang1998 May 12, 2025
b351b11
Reuse connection in worker heartbeat thread
sitaowang1998 May 12, 2025
6711918
Fix typo
sitaowang1998 May 12, 2025
134c811
Fix clang-tidy
sitaowang1998 May 12, 2025
9a33bbd
Do not add error message when cancel job from user
sitaowang1998 May 12, 2025
17dd4df
Add cancel inside ExecutorHandle
sitaowang1998 May 12, 2025
1f4160b
Merge branch 'main' into job_cancel
sitaowang1998 May 26, 2025
31a1579
Merge branch 'main' into job_cancel
sitaowang1998 May 29, 2025
670864c
Rename get_job_message to get_error_message
sitaowang1998 May 29, 2025
f500c1a
Rename TaskExecutor is_* functions
sitaowang1998 May 29, 2025
bef31da
Fix comment grammer
sitaowang1998 May 29, 2025
216913e
Remove unnecessary comment
sitaowang1998 May 29, 2025
594f32e
Improve ExeuctorHandle docstring
sitaowang1998 May 29, 2025
623ee0e
Fix relative include header
sitaowang1998 May 29, 2025
5b8ab01
Fix library include
sitaowang1998 May 29, 2025
cd55a1e
Improve integration test docstring
sitaowang1998 May 29, 2025
62976a2
Remove unnecessary comment
sitaowang1998 May 29, 2025
89b2813
Remove library header nolint
sitaowang1998 May 29, 2025
c0f533f
Remove unncecessary comments
sitaowang1998 May 29, 2025
e2a2ea6
Refactor job cancel unit test setup into a separate function
sitaowang1998 May 29, 2025
f7b36bb
Fix clang tidy
sitaowang1998 May 29, 2025
1d20f50
Fix clang-tidy
sitaowang1998 May 29, 2025
7368900
Fix clang tidy
sitaowang1998 Jun 2, 2025
2f9d378
Apply suggestions from code review
sitaowang1998 Jun 25, 2025
1bee529
Merge branch 'main' into job_cancel
sitaowang1998 Jun 25, 2025
8ebdfd1
Remove unused function
sitaowang1998 Jun 25, 2025
10bd8ff
Add error message for cancel test by user
sitaowang1998 Jun 25, 2025
a344f88
Refactor cancel integration test
sitaowang1998 Jun 25, 2025
8672fd8
Use maybe_unused
sitaowang1998 Jun 26, 2025
9fa0884
Rename job_errors' func_name column to offender; Improvce docstring
sitaowang1998 Jun 26, 2025
573a74b
Improve docstring of job_cancel_setup in test
sitaowang1998 Jun 26, 2025
2afd0e2
Bug fix
sitaowang1998 Jun 26, 2025
40e5fb5
Rename function
sitaowang1998 Jun 26, 2025
b65d187
Store task id inside TaskExecutor
sitaowang1998 Jun 26, 2025
e4da187
Make ExecutorHandle singleton
sitaowang1998 Jun 26, 2025
18ce4e9
Improve docstring for cancel test job setup
sitaowang1998 Jun 26, 2025
8015388
Fix clang tidy
sitaowang1998 Jun 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/spider/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ set(SPIDER_WORKER_SOURCES
worker/ChildPid.cpp
worker/DllLoader.hpp
worker/DllLoader.cpp
worker/ExecutorHandle.hpp
worker/ExecutorHandle.cpp
worker/Process.hpp
worker/Process.cpp
worker/TaskExecutor.hpp
Expand Down
39 changes: 37 additions & 2 deletions src/spider/client/Job.hpp
Comment thread
sitaowang1998 marked this conversation as resolved.
Comment thread
sitaowang1998 marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,19 @@ class Job {
*
* @throw spider::ConnectionException
*/
auto cancel() -> void {
    // Ask the storage factory for a connection dedicated to this request. The factory
    // yields either a connection or a StorageErr explaining why none could be provided.
    std::variant<std::unique_ptr<core::StorageConnection>, core::StorageErr> conn_result
            = m_storage_factory->provide_storage_connection();
    if (std::holds_alternative<core::StorageErr>(conn_result)) {
        throw ConnectionException(std::get<core::StorageErr>(conn_result).description);
    }
    auto conn = std::move(std::get<std::unique_ptr<core::StorageConnection>>(conn_result));

    // Cancel this job (identified by m_id) in the metadata storage. Any storage failure is
    // reported to the caller as a ConnectionException carrying the storage's description.
    core::StorageErr const err = m_metadata_storage->cancel_job(*conn, m_id);
    if (!err.success()) {
        throw ConnectionException(err.description);
    }
}

/**
* @return Status of the job.
Expand Down Expand Up @@ -153,7 +165,30 @@ class Job {
* @throw spider::ConnectionException
*/
auto get_error() -> std::pair<std::string, std::string> {
Comment thread
sitaowang1998 marked this conversation as resolved.
throw ConnectionException{"Not implemented"};
if (nullptr == m_conn) {
std::variant<std::unique_ptr<core::StorageConnection>, core::StorageErr> conn_result
= m_storage_factory->provide_storage_connection();
if (std::holds_alternative<core::StorageErr>(conn_result)) {
throw ConnectionException(std::get<core::StorageErr>(conn_result).description);
}
auto conn = std::move(std::get<std::unique_ptr<core::StorageConnection>>(conn_result));
Comment thread
sitaowang1998 marked this conversation as resolved.

std::pair<std::string, std::string> res;
core::StorageErr const err
= m_metadata_storage->get_job_message(*conn, m_id, &res.first, &res.second);
if (false == err.success()) {
throw ConnectionException{err.description};
}
return res;
}

std::pair<std::string, std::string> res;
core::StorageErr const err
= m_metadata_storage->get_job_message(*m_conn, m_id, &res.first, &res.second);
if (false == err.success()) {
throw ConnectionException{err.description};
}
return res;
}

private:
Expand Down
16 changes: 16 additions & 0 deletions src/spider/client/TaskContext.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "TaskContext.hpp"

#include <cstdlib>
#include <memory>
#include <optional>
#include <string>
Expand Down Expand Up @@ -69,4 +70,19 @@ auto TaskContext::get_jobs() -> std::vector<boost::uuids::uuid> {
}
return job_ids;
}

// Cancels the job that owns the current task, recording `message` as the reason, and then
// terminates the process. Throws ConnectionException if no storage connection can be obtained
// or if the storage fails to cancel the job; in that case the process is NOT terminated here.
auto TaskContext::abort(std::string const& message) -> void {
    // The factory yields either a connection or a StorageErr explaining why none is available.
    std::variant<std::unique_ptr<core::StorageConnection>, core::StorageErr> conn_result
            = m_storage_factory->provide_storage_connection();
    if (std::holds_alternative<core::StorageErr>(conn_result)) {
        throw ConnectionException(std::get<core::StorageErr>(conn_result).description);
    }
    auto conn = std::move(std::get<std::unique_ptr<core::StorageConnection>>(conn_result));

    // The job is looked up from this task's id, so the caller does not need to know the job id.
    core::StorageErr const err = m_metadata_store->cancel_job_by_task(*conn, m_task_id, message);
    if (!err.success()) {
        throw ConnectionException(err.description);
    }
    // std::quick_exit ends the process without running destructors of automatic objects, so
    // control never returns to the caller once the cancellation has been stored.
    std::quick_exit(1);
}
Comment thread
sitaowang1998 marked this conversation as resolved.
} // namespace spider
38 changes: 38 additions & 0 deletions src/spider/storage/MetadataStorage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,41 @@ class MetadataStorage {
std::vector<boost::uuids::uuid>* job_ids
) -> StorageErr
= 0;
/**
 * Cancels a job identified by its job id. This sets the job state to CANCEL and sets every
 * task of the job that has not finished yet to CANCEL. This overload takes no error message;
 * use `cancel_job_by_task` to cancel a job from one of its tasks with a message.
 * @param conn
 * @param id The job id.
 * @return The error code.
 */
virtual auto cancel_job(StorageConnection& conn, boost::uuids::uuid id) -> StorageErr = 0;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also have an error message parameter here right?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. `cancel_job` is used by the client to cancel a job without any error message.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I don't fully understand. Are you saying a client cannot set a message (maybe because it is a CLI command or something)? If that is the case, it makes sense not to have it as a parameter, but we should still set the error message to something fixed (e.g. "Job cancellation requested by client.") inside cancel_job.

Also, please improve the docstrings so the difference between cancel_job and cancel_job_by_task is more clear (e.g. the different use cases).

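/**
* Cancel the job that owns the given task. This will set the job state to CANCEL, set all
* tasks of that job that have not finished to CANCEL, and record `message` as the job's error.
* Unlike `cancel_job`, the job is identified through a task id and an error message is stored.
* @param conn
* @param id The task id.
* @param message The error message of the cancellation.
* @return The error code.
*/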
virtual auto
cancel_job_by_task(StorageConnection& conn, boost::uuids::uuid id, std::string const& message)
-> StorageErr
Comment thread
sitaowang1998 marked this conversation as resolved.
Outdated
= 0;
/**
* Get the error message recorded for a cancelled job.
* @param conn
* @param id The job id.
* @param[out] function_name Set to the function name of the task that cancelled the job.
* @param[out] message Set to the error message of the cancellation.
* @return The error code.
*/
virtual auto get_job_message(
Comment thread
sitaowang1998 marked this conversation as resolved.
Outdated
StorageConnection& conn,
boost::uuids::uuid id,
std::string* function_name,
std::string* message
) -> StorageErr
= 0;
virtual auto remove_job(StorageConnection& conn, boost::uuids::uuid id) -> StorageErr = 0;
virtual auto reset_job(StorageConnection& conn, boost::uuids::uuid id) -> StorageErr = 0;
virtual auto add_child(StorageConnection& conn, boost::uuids::uuid parent_id, Task const& child)
Expand All @@ -92,6 +127,9 @@ class MetadataStorage {
virtual auto set_task_state(StorageConnection& conn, boost::uuids::uuid id, TaskState state)
-> StorageErr
= 0;
/**
 * Gets the current state of a task.
 * @param conn
 * @param id The task id.
 * @param state Output parameter that receives the state of the task.
 * @return The error code.
 */
virtual auto get_task_state(StorageConnection& conn, boost::uuids::uuid id, TaskState* state)
-> StorageErr
= 0;
virtual auto set_task_running(StorageConnection& conn, boost::uuids::uuid id) -> StorageErr = 0;
virtual auto add_task_instance(StorageConnection& conn, TaskInstance const& instance)
-> StorageErr
Expand Down
160 changes: 158 additions & 2 deletions src/spider/storage/mysql/MySqlStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1053,6 +1053,124 @@ auto MySqlMetadataStorage::get_jobs_by_client_id(
return StorageErr{};
}

// Cancels job `id`: marks its pending/ready/running tasks and the job row itself as 'cancel'.
// No row is written to `job_errors`. Both updates are committed together; on an SQL error the
// transaction is rolled back and an OtherErr carrying the exception text is returned.
auto MySqlMetadataStorage::cancel_job(StorageConnection& conn, boost::uuids::uuid const id)
        -> StorageErr {
    try {
        // Set all pending/ready/running tasks from the job to cancelled. Tasks in any other
        // state are left untouched.
        std::unique_ptr<sql::PreparedStatement> task_statement(
                static_cast<MySqlConnection&>(conn)->prepareStatement(
                        "UPDATE `tasks` SET `state` = 'cancel' WHERE `job_id` = ? AND "
                        "`state` IN ('pending', 'ready', 'running')"
                )
        );
        sql::bytes id_bytes = uuid_get_bytes(id);
        task_statement->setBytes(1, &id_bytes);
        task_statement->executeUpdate();
        // Set job state to cancelled.
        // NOTE(review): the job row is updated regardless of its current state -- confirm that
        // cancelling an already finished job is intended.
        std::unique_ptr<sql::PreparedStatement> job_statement(
                static_cast<MySqlConnection&>(conn)->prepareStatement(
                        "UPDATE `jobs` SET `state` = 'cancel' WHERE `id` = ?"
                )
        );
        job_statement->setBytes(1, &id_bytes);
        job_statement->executeUpdate();
    } catch (sql::SQLException& e) {
        static_cast<MySqlConnection&>(conn)->rollback();
        return StorageErr{StorageErrType::OtherErr, e.what()};
    }
    static_cast<MySqlConnection&>(conn)->commit();
    return StorageErr{};
}

// Cancels the job that owns task `id` and records `message`, together with the task's function
// name, in `job_errors`.
//
// Returns KeyNotFoundErr (after a rollback) when no task with this id exists, and OtherErr
// (after a rollback) on any SQL error. On success all changes are committed together.
//
// `job_errors` holds at most one row per job (`job_id` is its primary key). The insert is
// therefore written as a no-op on a duplicate key: when a job already has a recorded error,
// e.g. because another task of the same job aborted first, the first error is kept and this
// call still succeeds instead of failing with a duplicate-key SQL error.
auto MySqlMetadataStorage::cancel_job_by_task(
        StorageConnection& conn,
        boost::uuids::uuid id,
        std::string const& message
) -> StorageErr {
    auto& mysql_conn = static_cast<MySqlConnection&>(conn);
    try {
        // Look up the owning job and the function name of the task.
        sql::bytes task_id_bytes = uuid_get_bytes(id);
        std::unique_ptr<sql::PreparedStatement> statement(mysql_conn->prepareStatement(
                "SELECT `job_id`, `func_name` FROM `tasks` WHERE `id` = ?"
        ));
        statement->setBytes(1, &task_id_bytes);
        std::unique_ptr<sql::ResultSet> const res{statement->executeQuery()};
        if (res->rowsCount() == 0) {
            mysql_conn->rollback();
            return StorageErr{StorageErrType::KeyNotFoundErr, "No task with id"};
        }
        res->next();
        boost::uuids::uuid const job_id = read_id(res->getBinaryStream("job_id"));
        sql::bytes job_id_bytes = uuid_get_bytes(job_id);
        std::string const function_name = get_sql_string(res->getString("func_name"));

        // Set all pending/ready/running tasks from the job to cancelled.
        std::unique_ptr<sql::PreparedStatement> task_statement(mysql_conn->prepareStatement(
                "UPDATE `tasks` SET `state` = 'cancel' WHERE `job_id` = ? AND "
                "`state` IN ('pending', 'ready', 'running')"
        ));
        task_statement->setBytes(1, &job_id_bytes);
        task_statement->executeUpdate();

        // Set job state to cancelled.
        std::unique_ptr<sql::PreparedStatement> job_statement(mysql_conn->prepareStatement(
                "UPDATE `jobs` SET `state` = 'cancel' WHERE `id` = ?"
        ));
        job_statement->setBytes(1, &job_id_bytes);
        job_statement->executeUpdate();

        // Record the cancel message. The ON DUPLICATE KEY clause assigns the key to itself,
        // i.e. it leaves an already existing row for this job unchanged.
        std::unique_ptr<sql::PreparedStatement> message_statement(mysql_conn->prepareStatement(
                "INSERT INTO `job_errors` (`job_id`, `func_name`, `message`) VALUES (?, ?, "
                "?) ON DUPLICATE KEY UPDATE `job_id` = `job_id`"
        ));
        message_statement->setBytes(1, &job_id_bytes);
        message_statement->setString(2, function_name);
        message_statement->setString(3, message);
        message_statement->executeUpdate();
    } catch (sql::SQLException& e) {
        mysql_conn->rollback();
        return StorageErr{StorageErrType::OtherErr, e.what()};
    }
    mysql_conn->commit();
    return StorageErr{};
}

// Reads the function name and message stored in `job_errors` for job `id` into
// `*function_name` and `*message`. Returns KeyNotFoundErr when the job has no stored error and
// OtherErr (after a rollback) when the query raises an SQL exception.
auto MySqlMetadataStorage::get_job_message(
        StorageConnection& conn,
        boost::uuids::uuid const id,
        std::string* function_name,
        std::string* message
) -> StorageErr {
    auto& mysql_conn = static_cast<MySqlConnection&>(conn);
    try {
        std::unique_ptr<sql::PreparedStatement> query{mysql_conn->prepareStatement(
                "SELECT `func_name`, `message` FROM `job_errors` WHERE `job_id` = ?"
        )};
        sql::bytes job_id_bytes = uuid_get_bytes(id);
        query->setBytes(1, &job_id_bytes);

        std::unique_ptr<sql::ResultSet> const rows{query->executeQuery()};
        if (0 == rows->rowsCount()) {
            // Nothing was modified; commit to close the transaction before reporting.
            mysql_conn->commit();
            return StorageErr{StorageErrType::KeyNotFoundErr, "No messages found"};
        }

        // Only the first row is read.
        rows->next();
        *function_name = get_sql_string(rows->getString("func_name"));
        *message = get_sql_string(rows->getString("message"));
    } catch (sql::SQLException& e) {
        mysql_conn->rollback();
        return StorageErr{StorageErrType::OtherErr, e.what()};
    }
    mysql_conn->commit();
    return StorageErr{};
}

auto MySqlMetadataStorage::remove_job(StorageConnection& conn, boost::uuids::uuid id)
-> StorageErr {
try {
Expand Down Expand Up @@ -1399,6 +1517,39 @@ auto MySqlMetadataStorage::set_task_state(
return StorageErr{};
}

// Reads the `state` column of task `id` and stores the parsed value in `*state`. Returns
// KeyNotFoundErr when no such task exists and OtherErr (after a rollback) when the query
// raises an SQL exception.
auto MySqlMetadataStorage::get_task_state(
        StorageConnection& conn,
        boost::uuids::uuid const id,
        TaskState* state
) -> StorageErr {
    auto& mysql_conn = static_cast<MySqlConnection&>(conn);
    try {
        std::unique_ptr<sql::PreparedStatement> query(
                mysql_conn->prepareStatement("SELECT `state` FROM `tasks` WHERE `id` = ?")
        );
        sql::bytes task_id_bytes = uuid_get_bytes(id);
        query->setBytes(1, &task_id_bytes);

        std::unique_ptr<sql::ResultSet> const rows(query->executeQuery());
        if (0 == rows->rowsCount()) {
            // Nothing was modified; commit to close the transaction before reporting.
            mysql_conn->commit();
            return StorageErr{
                    StorageErrType::KeyNotFoundErr,
                    fmt::format("No task with id {} ", boost::uuids::to_string(id))
            };
        }

        rows->next();
        // The state is stored as text and converted back to the enum here.
        std::string const state_text = get_sql_string(rows->getString("state"));
        *state = string_to_task_state(state_text);
    } catch (sql::SQLException& e) {
        mysql_conn->rollback();
        return StorageErr{StorageErrType::OtherErr, e.what()};
    }
    mysql_conn->commit();
    return StorageErr{};
}

auto MySqlMetadataStorage::set_task_running(StorageConnection& conn, boost::uuids::uuid id)
-> StorageErr {
try {
Expand Down Expand Up @@ -1682,11 +1833,16 @@ auto MySqlMetadataStorage::task_fail(
// Set the task fail if the last task instance fails
std::unique_ptr<sql::PreparedStatement> const task_statement(
static_cast<MySqlConnection&>(conn)->prepareStatement(
"UPDATE `tasks` SET `state` = 'fail' WHERE `id` = ?"
"UPDATE `tasks` SET `state` = 'fail' WHERE `id` = ? AND `state` = "
"'running'"
)
);
task_statement->setBytes(1, &task_id_bytes);
task_statement->executeUpdate();
int32_t const task_count = task_statement->executeUpdate();
if (task_count == 0) {
static_cast<MySqlConnection&>(conn)->commit();
return StorageErr{};
}
// Set the job fails
std::unique_ptr<sql::PreparedStatement> const job_statement(
static_cast<MySqlConnection&>(conn)->prepareStatement(
Expand Down
12 changes: 12 additions & 0 deletions src/spider/storage/mysql/MySqlStorage.hpp
Comment thread
sitaowang1998 marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ class MySqlMetadataStorage : public MetadataStorage {
boost::uuids::uuid client_id,
std::vector<boost::uuids::uuid>* job_ids
) -> StorageErr override;
// Cancels job `id`: marks its pending/ready/running tasks and the job itself as 'cancel'.
auto cancel_job(StorageConnection& conn, boost::uuids::uuid id) -> StorageErr override;
// Cancels the job that owns task `id` and stores `message`, together with the task's function
// name, in the `job_errors` table.
auto
cancel_job_by_task(StorageConnection& conn, boost::uuids::uuid id, std::string const& message)
-> StorageErr override;
// Reads the function name and message stored in `job_errors` for job `id` into the two output
// strings; yields KeyNotFoundErr when the job has no stored error.
auto get_job_message(
StorageConnection& conn,
boost::uuids::uuid id,
std::string* function_name,
std::string* message
) -> StorageErr override;
auto remove_job(StorageConnection& conn, boost::uuids::uuid id) -> StorageErr override;
auto reset_job(StorageConnection& conn, boost::uuids::uuid id) -> StorageErr override;
auto add_child(StorageConnection& conn, boost::uuids::uuid parent_id, Task const& child)
Expand All @@ -86,6 +96,8 @@ class MySqlMetadataStorage : public MetadataStorage {
) -> StorageErr override;
auto set_task_state(StorageConnection& conn, boost::uuids::uuid id, TaskState state)
-> StorageErr override;
// Reads the state of task `id` into `*state`; yields KeyNotFoundErr when the task does not exist.
auto get_task_state(StorageConnection& conn, boost::uuids::uuid id, TaskState* state)
-> StorageErr override;
auto set_task_running(StorageConnection& conn, boost::uuids::uuid id) -> StorageErr override;
auto add_task_instance(StorageConnection& conn, TaskInstance const& instance)
-> StorageErr override;
Expand Down
13 changes: 11 additions & 2 deletions src/spider/storage/mysql/mysql_stmt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ std::string const cCreateJobTable = R"(CREATE TABLE IF NOT EXISTS jobs (
PRIMARY KEY (`id`)
))";

// Creates the `job_errors` table, which stores the error recorded for a job: a function name
// and a message. `job_id` is the primary key, so a job has at most one row. Rows reference
// `jobs`.`id` and are deleted together with the owning job (ON DELETE CASCADE).
std::string const cCreateJobErrorTable = R"(CREATE TABLE IF NOT EXISTS `job_errors` (
`job_id` BINARY(16) NOT NULL,
`func_name` VARCHAR(64) NOT NULL,
`message` VARCHAR(999) NOT NULL,
CONSTRAINT `job_error_job_id` FOREIGN KEY (`job_id`) REFERENCES `jobs` (`id`) ON UPDATE NO ACTION ON DELETE CASCADE,
PRIMARY KEY (`job_id`)
))";

std::string const cCreateTaskTable = R"(CREATE TABLE IF NOT EXISTS tasks (
`id` BINARY(16) NOT NULL,
`job_id` BINARY(16) NOT NULL,
Expand Down Expand Up @@ -167,10 +175,11 @@ std::string const cCreateTaskKVDataTable = R"(CREATE TABLE IF NOT EXISTS `task_k
CONSTRAINT `kv_data_task_id` FOREIGN KEY (`task_id`) REFERENCES `tasks` (`id`) ON UPDATE NO ACTION ON DELETE CASCADE
))";

std::array<std::string const, 17> const cCreateStorage = {
std::array<std::string const, 18> const cCreateStorage = {
cCreateDriverTable, // drivers table must be created before data_ref_driver
cCreateSchedulerTable,
cCreateJobTable, // jobs table must be created before task
cCreateJobTable, // jobs table must be created before task and job_error
cCreateJobErrorTable,
cCreateTaskTable, // tasks table must be created before data_ref_task
cCreateDataTable, // data table must be created before task_outputs
cCreateDataLocalityTable,
Expand Down
Loading