Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions src/atframe/modules/worker_pool_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@ class UTIL_SYMBOL_LOCAL worker_pool_module::worker : public std::enable_shared_f
worker_tick_action_container_type tick_handles_;
std::recursive_mutex tick_handle_lock_;
std::atomic<std::chrono::microseconds::rep> current_tick_interval_us_;
std::atomic<std::chrono::microseconds::rep> current_tick_second_busy_us_;
std::atomic<std::chrono::microseconds::rep> current_tick_second_waited_us_;
Comment thread
yousongyang marked this conversation as resolved.

std::atomic<std::chrono::microseconds::rep> cpu_time_busy_us_;
std::atomic<std::chrono::microseconds::rep> cpu_time_sleep_us_;
Expand All @@ -299,6 +301,7 @@ struct UTIL_SYMBOL_LOCAL worker_pool_module::worker_set {

std::atomic<int64_t> configure_tick_min_interval_microseconds;
std::atomic<int64_t> configure_tick_max_interval_microseconds;
std::atomic<int64_t> configure_tick_preserve_microseconds_in_second;

std::recursive_mutex worker_lock;
std::vector<std::shared_ptr<worker>> workers;
Expand Down Expand Up @@ -453,6 +456,9 @@ void worker_pool_module::worker::start(const std::shared_ptr<worker>& self, cons
}

// Maybe sleep until timeout or next event
self->current_tick_second_busy_us_.fetch_add(
std::chrono::duration_cast<std::chrono::microseconds>(busy_end_time - start_time).count(),
std::memory_order_release);
if (busy_end_time - start_time < tick_interval) {
std::unique_lock<std::mutex> lk_cv(self->waker_lock_);

Expand All @@ -465,6 +471,33 @@ void worker_pool_module::worker::start(const std::shared_ptr<worker>& self, cons
auto sleep_rep =
std::chrono::duration_cast<std::chrono::microseconds>(sleep_end_time - busy_end_time).count();
self->cpu_time_sleep_us_.fetch_add(sleep_rep, std::memory_order_release);
self->current_tick_second_waited_us_.fetch_add(sleep_rep, std::memory_order_release);
}
}

if (self->current_tick_second_busy_us_.load(std::memory_order_acquire) +
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MEDIUM
·
concurrency

current_tick_second_busy_us_current_tick_second_waited_us_ 的检查与重置不是原子操作。这两个变量都是 std::atomic,表明可能被其他线程读取(如监控统计)。在多线程读取场景下,检查两个变量之和与重置之间存在竞态:其他线程可能在两个 store(0) 之间读取到不一致的状态(如 busy=0 但 waited 仍为旧值)。建议使用单一的原子变量或互斥锁保护这对计数器的检查-重置操作。

Location: src/atframe/modules/worker_pool_module.cpp:478-482

Referenced code (src/atframe/modules/worker_pool_module.cpp:478-482):

    }

    if (self->current_tick_second_busy_us_.load(std::memory_order_acquire) +
            self->current_tick_second_waited_us_.load(std::memory_order_acquire) >=
        1000000) {
      self->current_tick_second_busy_us_.store(0, std::memory_order_release);
      self->current_tick_second_waited_us_.store(0, std::memory_order_release);

      auto wait_preserve_us =

self->current_tick_second_waited_us_.load(std::memory_order_acquire) >=
1000000) {
self->current_tick_second_busy_us_.store(0, std::memory_order_release);
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOW
·
correctness

当累计时间超过 1 秒时,超出部分会被丢弃。例如:busy_us 从 950,000 增加到 1,150,000 后触发重置,额外的 150,000 微秒将丢失,导致统计不准确。建议在重置时保留超出部分的值,或使用取模操作计算实际保留的时间。

Location: src/atframe/modules/worker_pool_module.cpp:481-482

Referenced code (src/atframe/modules/worker_pool_module.cpp:481-482):

            self->current_tick_second_waited_us_.load(std::memory_order_acquire) >=
        1000000) {
      self->current_tick_second_busy_us_.store(0, std::memory_order_release);
      self->current_tick_second_waited_us_.store(0, std::memory_order_release);

      auto wait_preserve_us =

Suggested fix:
在重置时保留超出 1 秒的部分,例如:auto overflow = busy_val + waited_val - 1000000; current_tick_second_busy_us_.store(overflow, ...);

self->current_tick_second_waited_us_.store(0, std::memory_order_release);

auto wait_preserve_us =
self->get_owner().configure_tick_preserve_microseconds_in_second.load(std::memory_order_acquire);
if (wait_preserve_us <= 0) {
wait_preserve_us = 8000;
}

std::chrono::system_clock::time_point preserve_sleep_start = std::chrono::system_clock::now();
std::unique_lock<std::mutex> lk_cv(self->waker_lock_);
self->status_.store(static_cast<uint8_t>(worker_status::kSleeping), std::memory_order_release);
self->waker_cv_.wait_for(lk_cv, std::chrono::microseconds(wait_preserve_us));
self->status_.store(static_cast<uint8_t>(worker_status::kRunning), std::memory_order_release);

std::chrono::system_clock::time_point sleep_end_time = std::chrono::system_clock::now();
if (sleep_end_time > preserve_sleep_start) {
auto sleep_rep =
std::chrono::duration_cast<std::chrono::microseconds>(sleep_end_time - preserve_sleep_start).count();
self->cpu_time_sleep_us_.fetch_add(sleep_rep, std::memory_order_release);
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MEDIUM
·
correctness

preserve sleep 时间被添加到 cpu_time_sleep_us_ 但未添加到 current_tick_second_waited_us_。这与普通 sleep 路径(第 473-474 行)的行为不一致,后者同时更新两个计数器。缺失的更新会导致 per-second 时间追踪不准确,并可能影响 preserve 触发周期的规律性。

触发场景:当 worker 的 busy + waited 时间累计超过 1 秒后触发 preserve sleep 时,该 sleep 时长不会被计入下一周期的 waited 累计值。

建议修复:在第 500 行的 fetch_add 之后添加 self->current_tick_second_waited_us_.fetch_add(sleep_rep, std::memory_order_release);

Location: src/atframe/modules/worker_pool_module.cpp:500

Referenced code (src/atframe/modules/worker_pool_module.cpp:500):

        auto sleep_rep =
            std::chrono::duration_cast<std::chrono::microseconds>(sleep_end_time - preserve_sleep_start).count();
        self->cpu_time_sleep_us_.fetch_add(sleep_rep, std::memory_order_release);
      }
    }

}
}
}
Expand Down Expand Up @@ -578,6 +611,7 @@ worker_pool_module::worker_set::worker_set()
current_expect_workers.store(2, std::memory_order_release);
configure_tick_min_interval_microseconds.store(4, std::memory_order_release);
configure_tick_max_interval_microseconds.store(128, std::memory_order_release);
configure_tick_preserve_microseconds_in_second.store(8000, std::memory_order_release);
}

LIBATAPP_MACRO_API worker_pool_module::worker_pool_module()
Expand Down
Loading