Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions src/atframe/modules/worker_pool_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@ class UTIL_SYMBOL_LOCAL worker_pool_module::worker : public std::enable_shared_f
worker_tick_action_container_type tick_handles_;
std::recursive_mutex tick_handle_lock_;
std::atomic<std::chrono::microseconds::rep> current_tick_interval_us_;
std::atomic<std::chrono::microseconds::rep> current_tick_second_busy_us_;
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MEDIUM
·
correctness

新增的原子成员变量 current_tick_second_busy_us_current_tick_second_waited_us_ 未在 diff 中显示初始化。std::atomic 的默认构造函数对于整型类型会保留未初始化的值。如果 worker 类的构造函数没有显式初始化这些变量,它们将持有不确定的值,导致首次使用时的行为未定义。

建议在构造函数中添加初始化,例如:

current_tick_second_busy_us_.store(0, std::memory_order_release);
current_tick_second_waited_us_.store(0, std::memory_order_release);

或使用成员初始化列表/默认成员初始化器。

Location: src/atframe/modules/worker_pool_module.cpp:282-283

Referenced code (src/atframe/modules/worker_pool_module.cpp:282-283):

std::recursive_mutex tick_handle_lock_;
std::atomic<std::chrono::microseconds::rep> current_tick_interval_us_;
std::atomic<std::chrono::microseconds::rep> current_tick_second_busy_us_;
std::atomic<std::chrono::microseconds::rep> current_tick_second_waited_us_;

std::atomic<std::chrono::microseconds::rep> cpu_time_busy_us_;

std::atomic<std::chrono::microseconds::rep> current_tick_second_waited_us_;
Comment thread
yousongyang marked this conversation as resolved.
Comment on lines +282 to +283

std::atomic<std::chrono::microseconds::rep> cpu_time_busy_us_;
std::atomic<std::chrono::microseconds::rep> cpu_time_sleep_us_;
Expand All @@ -299,6 +301,7 @@ struct UTIL_SYMBOL_LOCAL worker_pool_module::worker_set {

std::atomic<int64_t> configure_tick_min_interval_microseconds;
std::atomic<int64_t> configure_tick_max_interval_microseconds;
std::atomic<int64_t> configure_tick_preserve_microseconds_in_second;

std::recursive_mutex worker_lock;
std::vector<std::shared_ptr<worker>> workers;
Expand Down Expand Up @@ -353,6 +356,9 @@ worker_pool_module::worker::worker(worker_pool_module::worker_set& owner, uint32
current_tick_interval_us_.store(4000, std::memory_order_release);
}

current_tick_second_busy_us_.store(0, std::memory_order_release);
current_tick_second_waited_us_.store(0, std::memory_order_release);

cpu_time_busy_us_.store(0, std::memory_order_release);
cpu_time_sleep_us_.store(0, std::memory_order_release);
cpu_time_last_second_busy_us_.store(0, std::memory_order_release);
Expand Down Expand Up @@ -453,6 +459,9 @@ void worker_pool_module::worker::start(const std::shared_ptr<worker>& self, cons
}

// Maybe sleep until timeout or next event
self->current_tick_second_busy_us_.fetch_add(
std::chrono::duration_cast<std::chrono::microseconds>(busy_end_time - start_time).count(),
std::memory_order_release);
if (busy_end_time - start_time < tick_interval) {
std::unique_lock<std::mutex> lk_cv(self->waker_lock_);

Expand All @@ -465,6 +474,33 @@ void worker_pool_module::worker::start(const std::shared_ptr<worker>& self, cons
auto sleep_rep =
std::chrono::duration_cast<std::chrono::microseconds>(sleep_end_time - busy_end_time).count();
self->cpu_time_sleep_us_.fetch_add(sleep_rep, std::memory_order_release);
self->current_tick_second_waited_us_.fetch_add(sleep_rep, std::memory_order_release);
}
}

Comment on lines +478 to +480
if (self->current_tick_second_busy_us_.load(std::memory_order_acquire) +
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MEDIUM
·
concurrency

current_tick_second_busy_us_current_tick_second_waited_us_ 的检查与重置不是原子操作。这两个变量都是 std::atomic,表明可能被其他线程读取(如监控统计)。在多线程读取场景下,检查两个变量之和与重置之间存在竞态:其他线程可能在两个 store(0) 之间读取到不一致的状态(如 busy=0 但 waited 仍为旧值)。建议使用单一的原子变量或互斥锁保护这对计数器的检查-重置操作。

Location: src/atframe/modules/worker_pool_module.cpp:478-482

Referenced code (src/atframe/modules/worker_pool_module.cpp:478-482):

    }

    if (self->current_tick_second_busy_us_.load(std::memory_order_acquire) +
            self->current_tick_second_waited_us_.load(std::memory_order_acquire) >=
        1000000) {
      self->current_tick_second_busy_us_.store(0, std::memory_order_release);
      self->current_tick_second_waited_us_.store(0, std::memory_order_release);

      auto wait_preserve_us =

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOW
·
concurrency

非原子的计数器检查与重置操作可能导致监控数据瞬时不一致。current_tick_second_busy_us_current_tick_second_waited_us_ 的读取和重置分为四个独立的原子操作,其他线程在读取这两个值用于监控时可能观察到:一个已重置而另一个未重置的中间状态,导致该瞬间的 busy + waited 总和被低估。对于监控用途这种情况通常可接受,但如果需要精确一致性,应考虑使用互斥锁或将两个计数器合并为单个 128 位原子值。

Location: src/atframe/modules/worker_pool_module.cpp:481

Referenced code (src/atframe/modules/worker_pool_module.cpp:481):

    }

    if (self->current_tick_second_busy_us_.load(std::memory_order_acquire) +
            self->current_tick_second_waited_us_.load(std::memory_order_acquire) >=
        1000000) {

Suggested fix:
如果监控需要精确一致性,考虑使用 mutex 保护这两个计数器的读写,或使用单个 64 位原子值存储总时间(busy + waited)。

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MEDIUM
·
concurrency

对两个独立的原子变量分别执行 load 后求和判断、再分别 store 重置的操作整体上不是原子的。当前所有读写操作均在同一个工作线程内顺序执行,暂时不存在实际的竞态条件。但这些原子变量使用了 memory_order_acquire/release 语义,暗示设计上可能预期被其他线程读取(如监控场景)。如果未来添加监控线程读取这些统计值,检查与重置之间的时间窗口可能导致读取到不一致状态(一个已重置、另一个未重置)。建议考虑使用单一原子变量累计时间,或在需要跨线程读取时使用互斥锁保护复合操作。

Location: src/atframe/modules/worker_pool_module.cpp:481

Referenced code (src/atframe/modules/worker_pool_module.cpp:481):

    }

    if (self->current_tick_second_busy_us_.load(std::memory_order_acquire) +
            self->current_tick_second_waited_us_.load(std::memory_order_acquire) >=
        1000000) {

Suggested fix:
考虑将 busy 和 waited 时间合并到单一原子变量中累积,或使用互斥锁保护检查和重置的复合操作。

self->current_tick_second_waited_us_.load(std::memory_order_acquire) >=
1000000) {
self->current_tick_second_busy_us_.store(0, std::memory_order_release);
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOW
·
correctness

当累计时间超过 1 秒时,超出部分会被丢弃。例如:busy_us 从 950,000 增加到 1,150,000 后触发重置,额外的 150,000 微秒将丢失,导致统计不准确。建议在重置时保留超出部分的值,或使用取模操作计算实际保留的时间。

Location: src/atframe/modules/worker_pool_module.cpp:481-482

Referenced code (src/atframe/modules/worker_pool_module.cpp:481-482):

            self->current_tick_second_waited_us_.load(std::memory_order_acquire) >=
        1000000) {
      self->current_tick_second_busy_us_.store(0, std::memory_order_release);
      self->current_tick_second_waited_us_.store(0, std::memory_order_release);

      auto wait_preserve_us =

Suggested fix:
在重置时保留超出 1 秒的部分,例如:auto overflow = busy_val + waited_val - 1000000; current_tick_second_busy_us_.store(overflow, ...);

self->current_tick_second_waited_us_.store(0, std::memory_order_release);

auto wait_preserve_us =
self->get_owner().configure_tick_preserve_microseconds_in_second.load(std::memory_order_acquire);
if (wait_preserve_us <= 0) {
wait_preserve_us = 8000;
}

std::chrono::system_clock::time_point preserve_sleep_start = std::chrono::system_clock::now();
std::unique_lock<std::mutex> lk_cv(self->waker_lock_);
self->status_.store(static_cast<uint8_t>(worker_status::kSleeping), std::memory_order_release);
self->waker_cv_.wait_for(lk_cv, std::chrono::microseconds(wait_preserve_us));
self->status_.store(static_cast<uint8_t>(worker_status::kRunning), std::memory_order_release);

std::chrono::system_clock::time_point sleep_end_time = std::chrono::system_clock::now();
if (sleep_end_time > preserve_sleep_start) {
auto sleep_rep =
std::chrono::duration_cast<std::chrono::microseconds>(sleep_end_time - preserve_sleep_start).count();
self->cpu_time_sleep_us_.fetch_add(sleep_rep, std::memory_order_release);
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MEDIUM
·
correctness

preserve sleep 时间被添加到 cpu_time_sleep_us_ 但未添加到 current_tick_second_waited_us_。这与普通 sleep 路径(第 473-474 行)的行为不一致,后者同时更新两个计数器。缺失的更新会导致 per-second 时间追踪不准确,并可能影响 preserve 触发周期的规律性。

触发场景:当 worker 的 busy + waited 时间累计超过 1 秒后触发 preserve sleep 时,该 sleep 时长不会被计入下一周期的 waited 累计值。

建议修复:在第 500 行的 fetch_add 之后添加 self->current_tick_second_waited_us_.fetch_add(sleep_rep, std::memory_order_release);

Location: src/atframe/modules/worker_pool_module.cpp:500

Referenced code (src/atframe/modules/worker_pool_module.cpp:500):

        auto sleep_rep =
            std::chrono::duration_cast<std::chrono::microseconds>(sleep_end_time - preserve_sleep_start).count();
        self->cpu_time_sleep_us_.fetch_add(sleep_rep, std::memory_order_release);
      }
    }

}
}
}
Expand Down Expand Up @@ -578,6 +614,7 @@ worker_pool_module::worker_set::worker_set()
current_expect_workers.store(2, std::memory_order_release);
configure_tick_min_interval_microseconds.store(4, std::memory_order_release);
configure_tick_max_interval_microseconds.store(128, std::memory_order_release);
configure_tick_preserve_microseconds_in_second.store(8000, std::memory_order_release);
}

LIBATAPP_MACRO_API worker_pool_module::worker_pool_module()
Expand Down
Loading