From 39a3e6c752c52211260b0967abe4d378f57bf1c7 Mon Sep 17 00:00:00 2001 From: siren186 <765495939@qq.com> Date: Fri, 2 Aug 2024 16:43:47 +0800 Subject: [PATCH 01/24] make Poco::ActiveThreadPool easy to use (#4544) --- Foundation/include/Poco/ActiveThreadPool.h | 88 +--- Foundation/src/ActiveThreadPool.cpp | 547 +++++++++++++-------- 2 files changed, 372 insertions(+), 263 deletions(-) diff --git a/Foundation/include/Poco/ActiveThreadPool.h b/Foundation/include/Poco/ActiveThreadPool.h index 5c04a8ff02..ac28051097 100644 --- a/Foundation/include/Poco/ActiveThreadPool.h +++ b/Foundation/include/Poco/ActiveThreadPool.h @@ -20,38 +20,34 @@ #include "Poco/Foundation.h" #include "Poco/Thread.h" -#include "Poco/Mutex.h" #include "Poco/Environment.h" -#include namespace Poco { class Runnable; -class ActiveThread; +class ActiveThreadPoolPrivate; class Foundation_API ActiveThreadPool - /// A thread pool always keeps a number of threads running, ready - /// to accept work. - /// Threads in an active thread pool are re-used - /// Every thread in the pool has own notification-queue with Runnable - /// Every Runnable executes on next thread (round-robin model) - /// The thread pool always keeps fixed number of threads running. + /// A thread pool manages and recycles individual Poco::Thread objects + /// to help reduce thread creation costs in programs that use threads. + /// + /// The thread pool supports a task queue. + /// When there are no idle threads, tasks are placed in the task queue to wait for execution. /// Use case for this pool is running many (more than os-max-thread-count) short live tasks - /// Round-robin model allow efficiently utilize cpu cores { public: ActiveThreadPool(int capacity = static_cast(Environment::processorCount()) + 1, int stackSize = POCO_THREAD_STACK_SIZE); - /// Creates a thread pool with fixed capacity threads. + /// Creates a thread pool with a maximum thread count of capacity. /// Threads are created with given stack size. - ActiveThreadPool(std::string name, + ActiveThreadPool(const std::string& name, int capacity = static_cast(Environment::processorCount()) + 1, int stackSize = POCO_THREAD_STACK_SIZE); - /// Creates a thread pool with the given name and fixed capacity threads. + /// Creates a thread pool with the given name and a maximum thread count of capacity. /// Threads are created with given stack size. ~ActiveThreadPool(); @@ -64,39 +60,19 @@ class Foundation_API ActiveThreadPool int getStackSize() const; /// Returns the stack size used to create new threads. - void start(Runnable& target); - /// Obtains a thread and starts the target. + int expiryTimeout() const; + /// Returns the thread expiry timeout value in milliseconds. + /// The default expiryTimeout is 30000 milliseconds (30 seconds). + + void setExpiryTimeout(int expiryTimeout); + /// Set the thread expiry timeout value in milliseconds. + /// The default expiryTimeout is 30000 milliseconds (30 seconds). - void start(Runnable& target, const std::string& name); + void start(Runnable& target, int priority = 0); /// Obtains a thread and starts the target. - /// Assigns the given name to the thread. - - void startWithPriority(Thread::Priority priority, Runnable& target); - /// Obtains a thread, adjusts the thread's priority, and starts the target. - - void startWithPriority(Thread::Priority priority, Runnable& target, const std::string& name); - /// Obtains a thread, adjusts the thread's priority, and starts the target. - /// Assigns the given name to the thread. - - void stopAll(); - /// Stops all running threads and waits for their completion. - /// - /// Will also delete all thread objects. - /// If used, this method should be the last action before - /// the thread pool is deleted. - /// - /// Note: If a thread fails to stop within 10 seconds - /// (due to a programming error, for example), the - /// underlying thread object will not be deleted and - /// this method will return anyway. This allows for a - /// more or less graceful shutdown in case of a misbehaving - /// thread. void joinAll(); - /// Waits for all threads to complete. - /// - /// Note that this will join() underlying - /// threads and restart them for next tasks. + /// Waits for all threads to exit and removes all threads from the thread pool. const std::string& name() const; /// Returns the name of the thread pool, @@ -107,38 +83,14 @@ class Foundation_API ActiveThreadPool /// Returns a reference to the default /// thread pool. -protected: - ActiveThread* getThread(); - ActiveThread* createThread(); - private: ActiveThreadPool(const ActiveThreadPool& pool); ActiveThreadPool& operator = (const ActiveThreadPool& pool); - typedef std::vector ThreadVec; - - std::string _name; - int _capacity; - int _serial; - int _stackSize; - ThreadVec _threads; - mutable FastMutex _mutex; - std::atomic _lastThreadIndex{0}; +private: + ActiveThreadPoolPrivate* m_impl; }; - -inline int ActiveThreadPool::getStackSize() const -{ - return _stackSize; -} - - -inline const std::string& ActiveThreadPool::name() const -{ - return _name; -} - - } // namespace Poco diff --git a/Foundation/src/ActiveThreadPool.cpp b/Foundation/src/ActiveThreadPool.cpp index 91a6099d89..233de207a7 100644 --- a/Foundation/src/ActiveThreadPool.cpp +++ b/Foundation/src/ActiveThreadPool.cpp @@ -15,347 +15,504 @@ #include "Poco/ActiveThreadPool.h" #include "Poco/Runnable.h" #include "Poco/Thread.h" -#include "Poco/Event.h" #include "Poco/ThreadLocal.h" #include "Poco/ErrorHandler.h" -#include "Poco/NotificationQueue.h" +#include "Poco/Condition.h" #include -#include +#include +#include namespace Poco { -class NewActionNotification: public Notification +class QueuePage { public: - using Ptr = AutoPtr; + enum + { + MaxPageSize = 256 + }; - NewActionNotification(Thread::Priority priority, Runnable& runnable, const std::string& name) : - _priority(priority), - _runnable(runnable), - _name(name) + QueuePage(Runnable* runnable, int priority): + _priority(priority) { + push(runnable); } - ~NewActionNotification() override = default; - - Runnable& runnable() const + bool isFull() { - return _runnable; + return _lastIndex >= MaxPageSize - 1; } - Thread::Priority priority() const + bool isFinished() { - return _priority; + return _firstIndex > _lastIndex; } - const std::string &threadName() const + void push(Runnable* runnable) { - return _name; + poco_assert(runnable); + poco_assert(!isFull()); + _lastIndex += 1; + _entries[_lastIndex] = runnable; } - std::string threadFullName() const + void skipToNextOrEnd() { - std::string fullName(_name); - if (_name.empty()) + while (!isFinished() && _entries[_firstIndex] == nullptr) { - fullName = _name; + _firstIndex += 1; } - else + } + + Runnable* first() + { + poco_assert(!isFinished()); + Runnable* runnable = _entries[_firstIndex]; + poco_assert(runnable); + return runnable; + } + + Runnable* pop() + { + poco_assert(!isFinished()); + Runnable* runnable = first(); + poco_assert(runnable); + + // clear the entry although this should not be necessary + _entries[_firstIndex] = nullptr; + _firstIndex += 1; + + // make sure the next runnable returned by first() is not a nullptr + skipToNextOrEnd(); + + return runnable; + } + + bool tryTake(Runnable* runnable) + { + poco_assert(!isFinished()); + for (int i = _firstIndex; i <= _lastIndex; i++) { - fullName.append(" ("); - fullName.append(_name); - fullName.append(")"); + if (_entries[i] == runnable) + { + _entries[i] = nullptr; + if (i == _firstIndex) + { + // make sure first() does not return a nullptr + skipToNextOrEnd(); + } + return true; + } } - return fullName; + return false; + } + + int priority() const + { + return _priority; } private: - std::atomic _priority; - Runnable& _runnable; - std::string _name; + int _priority = 0; + int _firstIndex = 0; + int _lastIndex = -1; + Runnable* _entries[MaxPageSize]; }; -class ActiveThread: public Runnable + +class ActivePooledThread: public Runnable { public: - ActiveThread(const std::string& name, int stackSize = POCO_THREAD_STACK_SIZE); - ~ActiveThread() override = default; + ActivePooledThread(ActiveThreadPoolPrivate* pool); void start(); - void start(Thread::Priority priority, Runnable& target); - void start(Thread::Priority priority, Runnable& target, const std::string& name); void join(); - void release(); - void run() override; + bool isRunning() const; + + void setRunnable(Runnable* runnable); + void notifyRunnableReady(); + void registerThreadInactive(); + + virtual void run() override; private: - NotificationQueue _pTargetQueue; - std::string _name; - Thread _thread; - Event _targetCompleted; - FastMutex _mutex; - const long JOIN_TIMEOUT = 10000; - std::atomic _needToStop{false}; + ActiveThreadPoolPrivate* _pool; + Runnable* _runnable; + Condition _runnableReady; + Thread _thread; }; -ActiveThread::ActiveThread(const std::string& name, int stackSize): - _name(name), - _thread(name), - _targetCompleted(Event::EVENT_MANUALRESET) +class ActiveThreadPoolPrivate { - poco_assert_dbg (stackSize >= 0); - _thread.setStackSize(stackSize); +public: + ActiveThreadPoolPrivate(int capacity, int stackSize); + ActiveThreadPoolPrivate(int capacity, int stackSize, const std::string& name); + ~ActiveThreadPoolPrivate(); + + bool tryStart(Runnable* runnable); + void enqueueTask(Runnable* runnable, int priority = 0); + void startThread(Runnable* runnable); + void joinAll(); + + int activeThreadCount() const; + +public: + mutable FastMutex mutex; + std::string name; + std::set allThreads; + std::list waitingThreads; + std::list expiredThreads; + std::list runnables; + Condition noActiveThreads; + + int expiryTimeout = 30000; + int maxThreadCount; + int stackSize; + int activeThreads = 0; + int serial = 0; +}; + + +ActivePooledThread::ActivePooledThread(ActiveThreadPoolPrivate* pool): + _pool(pool), + _runnable(nullptr) +{ + poco_assert(_pool != nullptr); + std::ostringstream name; + name << _pool->name << "[#" << ++_pool->serial << "]"; + _thread.setName(name.str()); + _thread.setStackSize(_pool->stackSize); } -void ActiveThread::start() + +void ActivePooledThread::start() { - _needToStop = false; _thread.start(*this); } -void ActiveThread::start(Thread::Priority priority, Runnable& target) +void ActivePooledThread::setRunnable(Runnable* runnable) { - _pTargetQueue.enqueueNotification(Poco::makeAuto(priority, target, _name)); + _runnable = runnable; } -void ActiveThread::start(Thread::Priority priority, Runnable& target, const std::string& name) +void ActivePooledThread::notifyRunnableReady() { - _pTargetQueue.enqueueNotification(Poco::makeAuto(priority, target, name)); + _runnableReady.signal(); } -void ActiveThread::join() -{ - _pTargetQueue.wakeUpAll(); - if (!_pTargetQueue.empty()) - { - _targetCompleted.wait(); - } +bool ActivePooledThread::isRunning() const +{ + return _thread.isRunning(); } -void ActiveThread::release() +void ActivePooledThread::join() { - // In case of a statically allocated thread pool (such - // as the default thread pool), Windows may have already - // terminated the thread before we got here. - if (_thread.isRunning()) - { - _needToStop = true; - _pTargetQueue.wakeUpAll(); - if (!_pTargetQueue.empty()) - _targetCompleted.wait(JOIN_TIMEOUT); - } - - if (_thread.tryJoin(JOIN_TIMEOUT)) - { - delete this; - } + _thread.join(); } -void ActiveThread::run() +void ActivePooledThread::run() { - do + FastMutex::ScopedLock lock(_pool->mutex); + for (;;) { - AutoPtr pN = _pTargetQueue.waitDequeueNotification(); - while (pN) + Runnable* r = _runnable; + _runnable = nullptr; + + do { - NewActionNotification::Ptr pNAN = pN.cast(); - Runnable& target = pNAN->runnable(); - _thread.setPriority(pNAN->priority()); - _thread.setName(pNAN->name()); - try - { - target.run(); - } - catch (Exception& exc) + if (r) { - ErrorHandler::handle(exc); + _pool->mutex.unlock(); + try + { + r->run(); + } + catch (Exception& exc) + { + ErrorHandler::handle(exc); + } + catch (std::exception& exc) + { + ErrorHandler::handle(exc); + } + catch (...) + { + ErrorHandler::handle(); + } + ThreadLocalStorage::clear(); + _pool->mutex.lock(); } - catch (std::exception& exc) + + if (_pool->runnables.empty()) { - ErrorHandler::handle(exc); + r = nullptr; + break; } - catch (...) - { - ErrorHandler::handle(); + + QueuePage* page = _pool->runnables.front(); + r = page->pop(); + + if (page->isFinished()) { + _pool->runnables.pop_front(); + delete page; } - _thread.setName(_name); - _thread.setPriority(Thread::PRIO_NORMAL); - ThreadLocalStorage::clear(); - pN = _pTargetQueue.waitDequeueNotification(1000); + } while (true); + + _pool->waitingThreads.push_back(this); + registerThreadInactive(); + // wait for work, exiting after the expiry timeout is reached + _runnableReady.tryWait(_pool->mutex, _pool->expiryTimeout); + ++_pool->activeThreads; + + auto it = std::find(_pool->waitingThreads.begin(), _pool->waitingThreads.end(), this); + if (it != _pool->waitingThreads.end()) + { + _pool->waitingThreads.erase(it); + _pool->expiredThreads.push_back(this); + registerThreadInactive(); + break; + } + + if (!_pool->allThreads.count(this)) + { + registerThreadInactive(); + break; } - _targetCompleted.set(); } - while (_needToStop == false); } -ActiveThreadPool::ActiveThreadPool(int capacity, int stackSize): - _capacity(capacity), - _serial(0), - _stackSize(stackSize), - _lastThreadIndex(0) +void ActivePooledThread::registerThreadInactive() { - poco_assert (_capacity >= 1); - - _threads.reserve(_capacity); - - for (int i = 0; i < _capacity; i++) + if (--_pool->activeThreads == 0) { - ActiveThread* pThread = createThread(); - _threads.push_back(pThread); - pThread->start(); + _pool->noActiveThreads.broadcast(); } } -ActiveThreadPool::ActiveThreadPool(std::string name, int capacity, int stackSize): - _name(std::move(name)), - _capacity(capacity), - _serial(0), - _stackSize(stackSize), - _lastThreadIndex(0) +ActiveThreadPoolPrivate::ActiveThreadPoolPrivate(int capacity, int stackSize_): + maxThreadCount(capacity), + stackSize(stackSize_) { - poco_assert (_capacity >= 1); +} - _threads.reserve(_capacity); - for (int i = 0; i < _capacity; i++) - { - ActiveThread* pThread = createThread(); - _threads.push_back(pThread); - pThread->start(); - } +ActiveThreadPoolPrivate::ActiveThreadPoolPrivate(int capacity, int stackSize_, const std::string& name_): + name(name_), + maxThreadCount(capacity), + stackSize(stackSize_) +{ } -ActiveThreadPool::~ActiveThreadPool() +ActiveThreadPoolPrivate::~ActiveThreadPoolPrivate() { - try + joinAll(); +} + + +bool ActiveThreadPoolPrivate::tryStart(Runnable* runnable) +{ + poco_assert(runnable != nullptr); + if (allThreads.empty()) { - stopAll(); + startThread(runnable); + return true; } - catch (...) + + if (activeThreadCount() >= maxThreadCount) { - poco_unexpected(); + return false; } -} + if (!waitingThreads.empty()) { + // recycle an available thread + enqueueTask(runnable); + ActivePooledThread* pThread = waitingThreads.front(); + waitingThreads.pop_front(); + pThread->notifyRunnableReady(); + return true; + } -int ActiveThreadPool::capacity() const -{ - return _capacity; + if (!expiredThreads.empty()) { + // restart an expired thread + ActivePooledThread* pThread = expiredThreads.front(); + expiredThreads.pop_front(); + + ++activeThreads; + + pThread->setRunnable(runnable); + pThread->start(); + return true; + } + + // start a new thread + startThread(runnable); + return true; } -void ActiveThreadPool::start(Runnable& target) +inline bool comparePriority(int priority, const QueuePage* p) { - getThread()->start(Thread::PRIO_NORMAL, target); + return p->priority() < priority; } -void ActiveThreadPool::start(Runnable& target, const std::string& name) +void ActiveThreadPoolPrivate::enqueueTask(Runnable* runnable, int priority) { - getThread()->start(Thread::PRIO_NORMAL, target, name); + poco_assert(runnable != nullptr); + for (QueuePage* page : runnables) { + if (page->priority() == priority && !page->isFull()) { + page->push(runnable); + return; + } + } + auto it = std::upper_bound(runnables.begin(), runnables.end(), priority, comparePriority); + runnables.insert(it, new QueuePage(runnable, priority)); } -void ActiveThreadPool::startWithPriority(Thread::Priority priority, Runnable& target) +int ActiveThreadPoolPrivate::activeThreadCount() const { - getThread()->start(priority, target); + return (int)(allThreads.size() - expiredThreads.size() - waitingThreads.size()); } -void ActiveThreadPool::startWithPriority(Thread::Priority priority, Runnable& target, const std::string& name) +void ActiveThreadPoolPrivate::startThread(Runnable* runnable) { - getThread()->start(priority, target, name); + poco_assert(runnable != nullptr); + auto pThread = new ActivePooledThread(this); + allThreads.insert(pThread); + ++activeThreads; + pThread->setRunnable(runnable); + pThread->start(); } -void ActiveThreadPool::stopAll() +void ActiveThreadPoolPrivate::joinAll() { - FastMutex::ScopedLock lock(_mutex); + FastMutex::ScopedLock lock(mutex); + + do { + while (!runnables.empty() || activeThreads != 0) + { + noActiveThreads.wait(mutex); + } + + // move the contents of the set out so that we can iterate without the lock + std::set allThreadsCopy; + allThreadsCopy.swap(allThreads); + expiredThreads.clear(); + waitingThreads.clear(); + mutex.unlock(); + + for (ActivePooledThread* pThread : allThreadsCopy) { + if (pThread->isRunning()) { + pThread->notifyRunnableReady(); + pThread->join(); + } + delete pThread; + } - for (auto pThread: _threads) + mutex.lock(); + + // More threads can be started during reset(), in that case continue + // waiting if we still have time left. + } while (!runnables.empty() || activeThreads != 0); + + while (!runnables.empty() || activeThreads != 0) { - pThread->release(); + noActiveThreads.wait(mutex); } - _threads.clear(); } -void ActiveThreadPool::joinAll() +ActiveThreadPool::ActiveThreadPool(int capacity, int stackSize): + m_impl(new ActiveThreadPoolPrivate(capacity, stackSize)) { - FastMutex::ScopedLock lock(_mutex); - - for (auto pThread: _threads) - { - pThread->join(); - } +} - _threads.clear(); - _threads.reserve(_capacity); - for (int i = 0; i < _capacity; i++) - { - ActiveThread* pThread = createThread(); - _threads.push_back(pThread); - pThread->start(); - } +ActiveThreadPool::ActiveThreadPool(const std::string& name, int capacity, int stackSize): + m_impl(new ActiveThreadPoolPrivate(capacity, stackSize, name)) +{ } -ActiveThread* ActiveThreadPool::getThread() + +ActiveThreadPool::~ActiveThreadPool() { - auto thrSize = _threads.size(); - auto i = (_lastThreadIndex++) % thrSize; - ActiveThread* pThread = _threads[i]; - return pThread; + delete m_impl; + m_impl = nullptr; } -ActiveThread* ActiveThreadPool::createThread() +int ActiveThreadPool::capacity() const { - std::ostringstream name; - name << _name << "[#active-thread-" << ++_serial << "]"; - return new ActiveThread(name.str(), _stackSize); + return m_impl->maxThreadCount; } -class ActiveThreadPoolSingletonHolder +void ActiveThreadPool::start(Runnable& target, int priority) { -public: - ActiveThreadPoolSingletonHolder() = default; - ~ActiveThreadPoolSingletonHolder() - { - delete _pPool; - } - ActiveThreadPool* pool() - { - FastMutex::ScopedLock lock(_mutex); + FastMutex::ScopedLock lock(m_impl->mutex); + + if (!m_impl->tryStart(&target)) { + m_impl->enqueueTask(&target, priority); - if (!_pPool) + if (!m_impl->waitingThreads.empty()) { - _pPool = new ActiveThreadPool("default-active"); + ActivePooledThread* pThread = m_impl->waitingThreads.front(); + m_impl->waitingThreads.pop_front(); + pThread->notifyRunnableReady(); } - return _pPool; } +} -private: - ActiveThreadPool* _pPool{nullptr}; - FastMutex _mutex; -}; + +void ActiveThreadPool::joinAll() +{ + m_impl->joinAll(); +} ActiveThreadPool& ActiveThreadPool::defaultPool() { - static ActiveThreadPoolSingletonHolder sh; - return *sh.pool(); + static ActiveThreadPool thePool; + return thePool; } +int ActiveThreadPool::getStackSize() const +{ + return m_impl->stackSize; +} + + +int ActiveThreadPool::expiryTimeout() const +{ + return m_impl->expiryTimeout; +} + + +void ActiveThreadPool::setExpiryTimeout(int expiryTimeout) +{ + if (m_impl->expiryTimeout != expiryTimeout) + { + m_impl->expiryTimeout = expiryTimeout; + } +} + + +const std::string& ActiveThreadPool::name() const +{ + return m_impl->name; +} + } // namespace Poco From acdb9ba6a690bec468e949d89124bee762a80b96 Mon Sep 17 00:00:00 2001 From: siren186 <765495939@qq.com> Date: Tue, 6 Aug 2024 10:21:18 +0800 Subject: [PATCH 02/24] code format --- Foundation/src/ActiveThreadPool.cpp | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/Foundation/src/ActiveThreadPool.cpp b/Foundation/src/ActiveThreadPool.cpp index 233de207a7..c774ad6723 100644 --- a/Foundation/src/ActiveThreadPool.cpp +++ b/Foundation/src/ActiveThreadPool.cpp @@ -258,7 +258,8 @@ void ActivePooledThread::run() QueuePage* page = _pool->runnables.front(); r = page->pop(); - if (page->isFinished()) { + if (page->isFinished()) + { _pool->runnables.pop_front(); delete page; } @@ -332,7 +333,8 @@ bool ActiveThreadPoolPrivate::tryStart(Runnable* runnable) return false; } - if (!waitingThreads.empty()) { + if (!waitingThreads.empty()) + { // recycle an available thread enqueueTask(runnable); ActivePooledThread* pThread = waitingThreads.front(); @@ -341,7 +343,8 @@ bool ActiveThreadPoolPrivate::tryStart(Runnable* runnable) return true; } - if (!expiredThreads.empty()) { + if (!expiredThreads.empty()) + { // restart an expired thread ActivePooledThread* pThread = expiredThreads.front(); expiredThreads.pop_front(); @@ -368,8 +371,10 @@ inline bool comparePriority(int priority, const QueuePage* p) void ActiveThreadPoolPrivate::enqueueTask(Runnable* runnable, int priority) { poco_assert(runnable != nullptr); - for (QueuePage* page : runnables) { - if (page->priority() == priority && !page->isFull()) { + for (QueuePage* page : runnables) + { + if (page->priority() == priority && !page->isFull()) + { page->push(runnable); return; } @@ -413,10 +418,11 @@ void ActiveThreadPoolPrivate::joinAll() waitingThreads.clear(); mutex.unlock(); - for (ActivePooledThread* pThread : allThreadsCopy) { - if (pThread->isRunning()) { + for (ActivePooledThread* pThread : allThreadsCopy) + { + if (pThread->isRunning()) + { pThread->notifyRunnableReady(); - pThread->join(); } delete pThread; } @@ -463,7 +469,8 @@ void ActiveThreadPool::start(Runnable& target, int priority) { FastMutex::ScopedLock lock(m_impl->mutex); - if (!m_impl->tryStart(&target)) { + if (!m_impl->tryStart(&target)) + { m_impl->enqueueTask(&target, priority); if (!m_impl->waitingThreads.empty()) From b407b831fb11c06b6affc59701d3d127881b83de Mon Sep 17 00:00:00 2001 From: siren186 <765495939@qq.com> Date: Tue, 6 Aug 2024 10:30:28 +0800 Subject: [PATCH 03/24] Fix ThreadSanitizer thread leak error --- Foundation/src/ActiveThreadPool.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Foundation/src/ActiveThreadPool.cpp b/Foundation/src/ActiveThreadPool.cpp index c774ad6723..3c95e75b57 100644 --- a/Foundation/src/ActiveThreadPool.cpp +++ b/Foundation/src/ActiveThreadPool.cpp @@ -351,6 +351,8 @@ bool ActiveThreadPoolPrivate::tryStart(Runnable* runnable) ++activeThreads; + // an expired thread must call join() before restart it, or it will cost thread leak + pThread->join(); pThread->setRunnable(runnable); pThread->start(); return true; @@ -424,6 +426,9 @@ void ActiveThreadPoolPrivate::joinAll() { pThread->notifyRunnableReady(); } + + // we must call join() before thread destruction, or it will cost thread leak + pThread->join(); delete pThread; } From 6d70ff84bee66c1cb107ec95eaea726facb9265a Mon Sep 17 00:00:00 2001 From: siren186 <765495939@qq.com> Date: Wed, 7 Aug 2024 09:23:36 +0800 Subject: [PATCH 04/24] enh(ActivePooledThread): Change pointers to references --- Foundation/src/ActiveThreadPool.cpp | 54 +++++++++++++---------------- 1 file changed, 25 insertions(+), 29 deletions(-) diff --git a/Foundation/src/ActiveThreadPool.cpp b/Foundation/src/ActiveThreadPool.cpp index 3c95e75b57..ec0244f4fd 100644 --- a/Foundation/src/ActiveThreadPool.cpp +++ b/Foundation/src/ActiveThreadPool.cpp @@ -32,7 +32,7 @@ class QueuePage MaxPageSize = 256 }; - QueuePage(Runnable* runnable, int priority): + QueuePage(Runnable& runnable, int priority): _priority(priority) { push(runnable); @@ -48,12 +48,11 @@ class QueuePage return _firstIndex > _lastIndex; } - void push(Runnable* runnable) + void push(Runnable& runnable) { - poco_assert(runnable); poco_assert(!isFull()); _lastIndex += 1; - _entries[_lastIndex] = runnable; + _entries[_lastIndex] = &runnable; } void skipToNextOrEnd() @@ -129,7 +128,7 @@ class ActivePooledThread: public Runnable void join(); bool isRunning() const; - void setRunnable(Runnable* runnable); + void setRunnable(Runnable& target); void notifyRunnableReady(); void registerThreadInactive(); @@ -137,7 +136,7 @@ class ActivePooledThread: public Runnable private: ActiveThreadPoolPrivate* _pool; - Runnable* _runnable; + Runnable* _pTarget; Condition _runnableReady; Thread _thread; }; @@ -150,9 +149,9 @@ class ActiveThreadPoolPrivate ActiveThreadPoolPrivate(int capacity, int stackSize, const std::string& name); ~ActiveThreadPoolPrivate(); - bool tryStart(Runnable* runnable); - void enqueueTask(Runnable* runnable, int priority = 0); - void startThread(Runnable* runnable); + bool tryStart(Runnable& target); + void enqueueTask(Runnable& target, int priority = 0); + void startThread(Runnable& target); void joinAll(); int activeThreadCount() const; @@ -176,7 +175,7 @@ class ActiveThreadPoolPrivate ActivePooledThread::ActivePooledThread(ActiveThreadPoolPrivate* pool): _pool(pool), - _runnable(nullptr) + _pTarget(nullptr) { poco_assert(_pool != nullptr); std::ostringstream name; @@ -192,9 +191,9 @@ void ActivePooledThread::start() } -void ActivePooledThread::setRunnable(Runnable* runnable) +void ActivePooledThread::setRunnable(Runnable& target) { - _runnable = runnable; + _pTarget = ⌖ } @@ -221,8 +220,8 @@ void ActivePooledThread::run() FastMutex::ScopedLock lock(_pool->mutex); for (;;) { - Runnable* r = _runnable; - _runnable = nullptr; + Runnable* r = _pTarget; + _pTarget = nullptr; do { @@ -319,12 +318,11 @@ ActiveThreadPoolPrivate::~ActiveThreadPoolPrivate() } -bool ActiveThreadPoolPrivate::tryStart(Runnable* runnable) +bool ActiveThreadPoolPrivate::tryStart(Runnable& target) { - poco_assert(runnable != nullptr); if (allThreads.empty()) { - startThread(runnable); + startThread(target); return true; } @@ -336,7 +334,7 @@ bool ActiveThreadPoolPrivate::tryStart(Runnable* runnable) if (!waitingThreads.empty()) { // recycle an available thread - enqueueTask(runnable); + enqueueTask(target); ActivePooledThread* pThread = waitingThreads.front(); waitingThreads.pop_front(); pThread->notifyRunnableReady(); @@ -353,13 +351,13 @@ bool ActiveThreadPoolPrivate::tryStart(Runnable* runnable) // an expired thread must call join() before restart it, or it will cost thread leak pThread->join(); - pThread->setRunnable(runnable); + pThread->setRunnable(target); pThread->start(); return true; } // start a new thread - startThread(runnable); + startThread(target); return true; } @@ -370,19 +368,18 @@ inline bool comparePriority(int priority, const QueuePage* p) } -void ActiveThreadPoolPrivate::enqueueTask(Runnable* runnable, int priority) +void ActiveThreadPoolPrivate::enqueueTask(Runnable& target, int priority) { - poco_assert(runnable != nullptr); for (QueuePage* page : runnables) { if (page->priority() == priority && !page->isFull()) { - page->push(runnable); + page->push(target); return; } } auto it = std::upper_bound(runnables.begin(), runnables.end(), priority, comparePriority); - runnables.insert(it, new QueuePage(runnable, priority)); + runnables.insert(it, new QueuePage(target, priority)); } @@ -392,13 +389,12 @@ int ActiveThreadPoolPrivate::activeThreadCount() const } -void ActiveThreadPoolPrivate::startThread(Runnable* runnable) +void ActiveThreadPoolPrivate::startThread(Runnable& target) { - poco_assert(runnable != nullptr); auto pThread = new ActivePooledThread(this); allThreads.insert(pThread); ++activeThreads; - pThread->setRunnable(runnable); + pThread->setRunnable(target); pThread->start(); } @@ -474,9 +470,9 @@ void ActiveThreadPool::start(Runnable& target, int priority) { FastMutex::ScopedLock lock(m_impl->mutex); - if (!m_impl->tryStart(&target)) + if (!m_impl->tryStart(target)) { - m_impl->enqueueTask(&target, priority); + m_impl->enqueueTask(target, priority); if (!m_impl->waitingThreads.empty()) { From d2744facb951c3e3af18d52a9c7f82a3b08c738d Mon Sep 17 00:00:00 2001 From: siren186 <765495939@qq.com> Date: Wed, 7 Aug 2024 09:26:01 +0800 Subject: [PATCH 05/24] enh(ActivePooledThread): remove unused method --- Foundation/src/ActiveThreadPool.cpp | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/Foundation/src/ActiveThreadPool.cpp b/Foundation/src/ActiveThreadPool.cpp index ec0244f4fd..0422f8d193 100644 --- a/Foundation/src/ActiveThreadPool.cpp +++ b/Foundation/src/ActiveThreadPool.cpp @@ -87,25 +87,6 @@ class QueuePage return runnable; } - bool tryTake(Runnable* runnable) - { - poco_assert(!isFinished()); - for (int i = _firstIndex; i <= _lastIndex; i++) - { - if (_entries[i] == runnable) - { - _entries[i] = nullptr; - if (i == _firstIndex) - { - // make sure first() does not return a nullptr - skipToNextOrEnd(); - } - return true; - } - } - return false; - } - int priority() const { return _priority; From df5c140b25b2c774f5683fd23cbb45da27d0a9bc Mon Sep 17 00:00:00 2001 From: siren186 <765495939@qq.com> Date: Wed, 21 Aug 2024 09:21:43 +0800 Subject: [PATCH 06/24] enh(Poco::ActiveThreadPool): Use std::unique_ptr instead of raw pointer --- Foundation/include/Poco/ActiveThreadPool.h | 3 ++- Foundation/src/ActiveThreadPool.cpp | 2 -- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/Foundation/include/Poco/ActiveThreadPool.h b/Foundation/include/Poco/ActiveThreadPool.h index ac28051097..1edbb5f579 100644 --- a/Foundation/include/Poco/ActiveThreadPool.h +++ b/Foundation/include/Poco/ActiveThreadPool.h @@ -21,6 +21,7 @@ #include "Poco/Foundation.h" #include "Poco/Thread.h" #include "Poco/Environment.h" +#include namespace Poco { @@ -88,7 +89,7 @@ class Foundation_API ActiveThreadPool ActiveThreadPool& operator = (const ActiveThreadPool& pool); private: - ActiveThreadPoolPrivate* m_impl; + std::unique_ptr m_impl; }; } // namespace Poco diff --git a/Foundation/src/ActiveThreadPool.cpp b/Foundation/src/ActiveThreadPool.cpp index 0422f8d193..1c8374a528 100644 --- a/Foundation/src/ActiveThreadPool.cpp +++ b/Foundation/src/ActiveThreadPool.cpp @@ -436,8 +436,6 @@ ActiveThreadPool::ActiveThreadPool(const std::string& name, int capacity, int st ActiveThreadPool::~ActiveThreadPool() { - delete m_impl; - m_impl = nullptr; } From fde68aa8cbd0ab2ffdfe7da85be17058a7182a94 Mon Sep 17 00:00:00 2001 From: siren186 <765495939@qq.com> Date: Wed, 21 Aug 2024 09:29:57 +0800 Subject: [PATCH 07/24] enh(Poco::ActiveThreadPool): Use C++ static_cast instead of C casting --- Foundation/src/ActiveThreadPool.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Foundation/src/ActiveThreadPool.cpp b/Foundation/src/ActiveThreadPool.cpp index 1c8374a528..983dd66e22 100644 --- a/Foundation/src/ActiveThreadPool.cpp +++ b/Foundation/src/ActiveThreadPool.cpp @@ -366,7 +366,8 @@ void ActiveThreadPoolPrivate::enqueueTask(Runnable& target, int priority) int ActiveThreadPoolPrivate::activeThreadCount() const { - return (int)(allThreads.size() - expiredThreads.size() - waitingThreads.size()); + std::size_t count = allThreads.size() - expiredThreads.size() - waitingThreads.size(); + return static_cast(count); } From 754e628eb8529b2fe049ad0eb30c22916e08dac6 Mon Sep 17 00:00:00 2001 From: siren186 <765495939@qq.com> Date: Wed, 21 Aug 2024 14:24:33 +0800 Subject: [PATCH 08/24] enh(Poco::ActiveThreadPool): Use standard containers instead of implementing own --- Foundation/src/ActiveThreadPool.cpp | 124 +++++++++++++--------------- 1 file changed, 56 insertions(+), 68 deletions(-) diff --git a/Foundation/src/ActiveThreadPool.cpp b/Foundation/src/ActiveThreadPool.cpp index 983dd66e22..6675012cb2 100644 --- a/Foundation/src/ActiveThreadPool.cpp +++ b/Foundation/src/ActiveThreadPool.cpp @@ -21,82 +21,92 @@ #include #include #include +#include namespace Poco { -class QueuePage + +class RunnableList + /// A list of the same priority runnables { public: - enum + RunnableList(Runnable& target, int priority) : _priority(priority) { - MaxPageSize = 256 - }; + push(target); + } - QueuePage(Runnable& runnable, int priority): - _priority(priority) + int priority() const { - push(runnable); + return _priority; } - bool isFull() + void push(Runnable& r) { - return _lastIndex >= MaxPageSize - 1; + _runnables.push_back(&r); } - bool isFinished() + Runnable* pop() { - return _firstIndex > _lastIndex; + Runnable* r = _runnables.front(); + _runnables.pop_front(); + return r; } - void push(Runnable& runnable) + bool empty() const { - poco_assert(!isFull()); - _lastIndex += 1; - _entries[_lastIndex] = &runnable; + return _runnables.empty(); } - void skipToNextOrEnd() + // for std::push_heap + bool operator< (const RunnableList& rhs) const { - while (!isFinished() && _entries[_firstIndex] == nullptr) - { - _firstIndex += 1; - } + return this->_priority < rhs._priority; } - Runnable* first() +private: + int _priority = 0; + std::list _runnables; +}; + + +class RunnablePriorityQueue + /// A priority queue of runnables +{ +public: + void push(Runnable& target, int priority) { - poco_assert(!isFinished()); - Runnable* runnable = _entries[_firstIndex]; - poco_assert(runnable); - return runnable; + for (auto& q : _queues) + { + if (q->priority() == priority) + { + q->push(target); + return; + } + } + auto q = std::make_shared(target, priority); + _queues.push_back(q); + std::push_heap(_queues.begin(), _queues.end()); } Runnable* pop() { - poco_assert(!isFinished()); - Runnable* runnable = first(); - poco_assert(runnable); - - // clear the entry although this should not be necessary - _entries[_firstIndex] = nullptr; - _firstIndex += 1; - - // make sure the next runnable returned by first() is not a nullptr - skipToNextOrEnd(); - - return runnable; + auto q = _queues.front(); + Runnable* r = q->pop(); + if (q->empty()) + { + std::pop_heap(_queues.begin(), _queues.end()); + _queues.pop_back(); + } + return r; } - int priority() const + bool empty() const { - return _priority; + return _queues.empty(); } private: - int _priority = 0; - int _firstIndex = 0; - int _lastIndex = -1; - Runnable* _entries[MaxPageSize]; + std::vector> _queues; }; @@ -143,7 +153,7 @@ class ActiveThreadPoolPrivate std::set allThreads; std::list waitingThreads; std::list expiredThreads; - std::list runnables; + RunnablePriorityQueue runnables; Condition noActiveThreads; int expiryTimeout = 30000; @@ -235,14 +245,7 @@ void ActivePooledThread::run() break; } - QueuePage* page = _pool->runnables.front(); - r = page->pop(); - - if (page->isFinished()) - { - _pool->runnables.pop_front(); - delete page; - } + r = _pool->runnables.pop(); } while (true); _pool->waitingThreads.push_back(this); @@ -343,24 +346,9 @@ bool ActiveThreadPoolPrivate::tryStart(Runnable& target) } -inline bool comparePriority(int priority, const QueuePage* p) -{ - return p->priority() < priority; -} - - void ActiveThreadPoolPrivate::enqueueTask(Runnable& target, int priority) { - for (QueuePage* page : runnables) - { - if (page->priority() == priority && !page->isFull()) - { - page->push(target); - return; - } - } - auto it = std::upper_bound(runnables.begin(), runnables.end(), priority, comparePriority); - runnables.insert(it, new QueuePage(target, priority)); + runnables.push(target, priority); } From 9717b53c1c790659aa32e3c4a417375462c071f5 Mon Sep 17 00:00:00 2001 From: siren186 <765495939@qq.com> Date: Wed, 21 Aug 2024 14:52:22 +0800 Subject: [PATCH 09/24] enh(Poco::ActiveThreadPool): Change pointer to reference --- Foundation/src/ActiveThreadPool.cpp | 43 ++++++++++++++--------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/Foundation/src/ActiveThreadPool.cpp b/Foundation/src/ActiveThreadPool.cpp index 6675012cb2..425d97b5f6 100644 --- a/Foundation/src/ActiveThreadPool.cpp +++ b/Foundation/src/ActiveThreadPool.cpp @@ -113,7 +113,7 @@ class RunnablePriorityQueue class ActivePooledThread: public Runnable { public: - ActivePooledThread(ActiveThreadPoolPrivate* pool); + ActivePooledThread(ActiveThreadPoolPrivate& pool); void start(); void join(); @@ -126,7 +126,7 @@ class ActivePooledThread: public Runnable virtual void run() override; private: - ActiveThreadPoolPrivate* _pool; + ActiveThreadPoolPrivate& _pool; Runnable* _pTarget; Condition _runnableReady; Thread _thread; @@ -164,15 +164,14 @@ class ActiveThreadPoolPrivate }; -ActivePooledThread::ActivePooledThread(ActiveThreadPoolPrivate* pool): +ActivePooledThread::ActivePooledThread(ActiveThreadPoolPrivate& pool): _pool(pool), _pTarget(nullptr) { - poco_assert(_pool != nullptr); std::ostringstream name; - name << _pool->name << "[#" << ++_pool->serial << "]"; + name << _pool.name << "[#" << ++_pool.serial << "]"; _thread.setName(name.str()); - _thread.setStackSize(_pool->stackSize); + _thread.setStackSize(_pool.stackSize); } @@ -208,7 +207,7 @@ void ActivePooledThread::join() void ActivePooledThread::run() { - FastMutex::ScopedLock lock(_pool->mutex); + FastMutex::ScopedLock lock(_pool.mutex); for (;;) { Runnable* r = _pTarget; @@ -218,7 +217,7 @@ void ActivePooledThread::run() { if (r) { - _pool->mutex.unlock(); + _pool.mutex.unlock(); try { r->run(); @@ -236,34 +235,34 @@ void ActivePooledThread::run() ErrorHandler::handle(); } ThreadLocalStorage::clear(); - _pool->mutex.lock(); + _pool.mutex.lock(); } - if (_pool->runnables.empty()) + if (_pool.runnables.empty()) { r = nullptr; break; } - r = _pool->runnables.pop(); + r = _pool.runnables.pop(); } while (true); - _pool->waitingThreads.push_back(this); + _pool.waitingThreads.push_back(this); registerThreadInactive(); // wait for work, exiting after the expiry timeout is reached - _runnableReady.tryWait(_pool->mutex, _pool->expiryTimeout); - ++_pool->activeThreads; + _runnableReady.tryWait(_pool.mutex, _pool.expiryTimeout); + ++_pool.activeThreads; - auto it = std::find(_pool->waitingThreads.begin(), _pool->waitingThreads.end(), this); - if (it != _pool->waitingThreads.end()) + auto it = std::find(_pool.waitingThreads.begin(), _pool.waitingThreads.end(), this); + if (it != _pool.waitingThreads.end()) { - _pool->waitingThreads.erase(it); - _pool->expiredThreads.push_back(this); + _pool.waitingThreads.erase(it); + _pool.expiredThreads.push_back(this); registerThreadInactive(); break; } - if (!_pool->allThreads.count(this)) + if (!_pool.allThreads.count(this)) { registerThreadInactive(); break; @@ -274,9 +273,9 @@ void ActivePooledThread::run() void ActivePooledThread::registerThreadInactive() { - if (--_pool->activeThreads == 0) + if (--_pool.activeThreads == 0) { - _pool->noActiveThreads.broadcast(); + _pool.noActiveThreads.broadcast(); } } @@ -361,7 +360,7 @@ int ActiveThreadPoolPrivate::activeThreadCount() const void ActiveThreadPoolPrivate::startThread(Runnable& target) { - auto pThread = new ActivePooledThread(this); + auto pThread = new ActivePooledThread(*this); allThreads.insert(pThread); ++activeThreads; pThread->setRunnable(target); From 4a3340bd70b66b330444fc8f58f2df6d6d6ce55a Mon Sep 17 00:00:00 2001 From: siren186 <765495939@qq.com> Date: Wed, 21 Aug 2024 16:18:08 +0800 Subject: [PATCH 10/24] enh(Poco::ActiveThreadPool): Use smart pointers instead of bare pointers --- Foundation/src/ActiveThreadPool.cpp | 40 +++++++++++++++++------------ 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/Foundation/src/ActiveThreadPool.cpp b/Foundation/src/ActiveThreadPool.cpp index 425d97b5f6..89076075fa 100644 --- a/Foundation/src/ActiveThreadPool.cpp +++ b/Foundation/src/ActiveThreadPool.cpp @@ -18,6 +18,8 @@ #include "Poco/ThreadLocal.h" #include "Poco/ErrorHandler.h" #include "Poco/Condition.h" +#include "Poco/RefCountedObject.h" +#include "Poco/AutoPtr.h" #include #include #include @@ -30,7 +32,7 @@ class RunnableList /// A list of the same priority runnables { public: - RunnableList(Runnable& target, int priority) : _priority(priority) + RunnableList(Runnable& target, int priority): _priority(priority) { push(target); } @@ -110,10 +112,12 @@ class RunnablePriorityQueue }; -class ActivePooledThread: public Runnable +class ActivePooledThread: public Runnable, public RefCountedObject { public: - ActivePooledThread(ActiveThreadPoolPrivate& pool); + using Ptr = Poco::AutoPtr; + + explicit ActivePooledThread(ActiveThreadPoolPrivate& pool); void start(); void join(); @@ -150,9 +154,9 @@ class ActiveThreadPoolPrivate public: mutable FastMutex mutex; std::string name; - std::set allThreads; - std::list waitingThreads; - std::list expiredThreads; + std::set allThreads; + std::list waitingThreads; + std::list expiredThreads; RunnablePriorityQueue runnables; Condition noActiveThreads; @@ -183,6 +187,7 @@ void ActivePooledThread::start() void ActivePooledThread::setRunnable(Runnable& target) { + poco_assert(_pTarget == nullptr); _pTarget = ⌖ } @@ -247,22 +252,23 @@ void ActivePooledThread::run() r = _pool.runnables.pop(); } while (true); - _pool.waitingThreads.push_back(this); + ActivePooledThread::Ptr thisCopy(this, true); + _pool.waitingThreads.push_back(thisCopy); registerThreadInactive(); // wait for work, exiting after the expiry timeout is reached _runnableReady.tryWait(_pool.mutex, _pool.expiryTimeout); ++_pool.activeThreads; - auto it = std::find(_pool.waitingThreads.begin(), _pool.waitingThreads.end(), this); + auto it = std::find(_pool.waitingThreads.begin(), _pool.waitingThreads.end(), thisCopy); if (it != _pool.waitingThreads.end()) { _pool.waitingThreads.erase(it); - _pool.expiredThreads.push_back(this); + _pool.expiredThreads.push_back(thisCopy); registerThreadInactive(); break; } - if (!_pool.allThreads.count(this)) + if (!_pool.allThreads.count(thisCopy)) { registerThreadInactive(); break; @@ -318,7 +324,7 @@ bool ActiveThreadPoolPrivate::tryStart(Runnable& target) { // recycle an available thread enqueueTask(target); - ActivePooledThread* pThread = waitingThreads.front(); + auto pThread = waitingThreads.front(); waitingThreads.pop_front(); pThread->notifyRunnableReady(); return true; @@ -327,7 +333,7 @@ bool ActiveThreadPoolPrivate::tryStart(Runnable& target) if (!expiredThreads.empty()) { // restart an expired thread - ActivePooledThread* pThread = expiredThreads.front(); + auto pThread = expiredThreads.front(); expiredThreads.pop_front(); ++activeThreads; @@ -360,7 +366,7 @@ int ActiveThreadPoolPrivate::activeThreadCount() const void ActiveThreadPoolPrivate::startThread(Runnable& target) { - auto pThread = new ActivePooledThread(*this); + ActivePooledThread::Ptr pThread = new ActivePooledThread(*this); allThreads.insert(pThread); ++activeThreads; pThread->setRunnable(target); @@ -379,13 +385,13 @@ void ActiveThreadPoolPrivate::joinAll() } // move the contents of the set out so that we can iterate without the lock - std::set allThreadsCopy; + std::set allThreadsCopy; allThreadsCopy.swap(allThreads); expiredThreads.clear(); waitingThreads.clear(); mutex.unlock(); - for (ActivePooledThread* pThread : allThreadsCopy) + for (auto pThread : allThreadsCopy) { if (pThread->isRunning()) { @@ -394,7 +400,7 @@ void ActiveThreadPoolPrivate::joinAll() // we must call join() before thread destruction, or it will cost thread leak pThread->join(); - delete pThread; + poco_assert(2 == pThread->referenceCount()); } mutex.lock(); @@ -443,7 +449,7 @@ void ActiveThreadPool::start(Runnable& target, int priority) if (!m_impl->waitingThreads.empty()) { - ActivePooledThread* pThread = m_impl->waitingThreads.front(); + auto pThread = m_impl->waitingThreads.front(); m_impl->waitingThreads.pop_front(); pThread->notifyRunnableReady(); } From e215cb7ac85c0d3702c1b26bcad23df938b12a91 Mon Sep 17 00:00:00 2001 From: siren186 <765495939@qq.com> Date: Wed, 21 Aug 2024 16:55:42 +0800 Subject: [PATCH 11/24] enh(Poco::ActiveThreadPool): Fix codeql warning: A stack address which arrived via a may be assigned to a non-local variable. --- Foundation/src/ActiveThreadPool.cpp | 39 +++++++++++++++-------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/Foundation/src/ActiveThreadPool.cpp b/Foundation/src/ActiveThreadPool.cpp index 89076075fa..1e752cfebe 100644 --- a/Foundation/src/ActiveThreadPool.cpp +++ b/Foundation/src/ActiveThreadPool.cpp @@ -24,6 +24,7 @@ #include #include #include +#include namespace Poco { @@ -32,7 +33,8 @@ class RunnableList /// A list of the same priority runnables { public: - RunnableList(Runnable& target, int priority): _priority(priority) + RunnableList(Runnable& target, int priority): + _priority(priority) { push(target); } @@ -44,12 +46,12 @@ class RunnableList void push(Runnable& r) { - _runnables.push_back(&r); + _runnables.push_back(std::ref(r)); } - Runnable* pop() + Runnable& pop() { - Runnable* r = _runnables.front(); + auto r = _runnables.front(); _runnables.pop_front(); return r; } @@ -67,7 +69,7 @@ class RunnableList private: int _priority = 0; - std::list _runnables; + std::list> _runnables; }; @@ -85,15 +87,15 @@ class RunnablePriorityQueue return; } } - auto q = std::make_shared(target, priority); + auto q = std::make_shared(std::ref(target), priority); _queues.push_back(q); std::push_heap(_queues.begin(), _queues.end()); } - Runnable* pop() + Runnable& pop() { auto q = _queues.front(); - Runnable* r = q->pop(); + auto& r = q->pop(); if (q->empty()) { std::pop_heap(_queues.begin(), _queues.end()); @@ -131,7 +133,7 @@ class ActivePooledThread: public Runnable, public RefCountedObject private: ActiveThreadPoolPrivate& _pool; - Runnable* _pTarget; + std::optional> _target{}; Condition _runnableReady; Thread _thread; }; @@ -169,8 +171,7 @@ class ActiveThreadPoolPrivate ActivePooledThread::ActivePooledThread(ActiveThreadPoolPrivate& pool): - _pool(pool), - _pTarget(nullptr) + _pool(pool) { std::ostringstream name; name << _pool.name << "[#" << ++_pool.serial << "]"; @@ -187,8 +188,8 @@ void ActivePooledThread::start() void ActivePooledThread::setRunnable(Runnable& target) { - poco_assert(_pTarget == nullptr); - _pTarget = ⌖ + poco_assert(_target.has_value() == false); + _target = std::ref(target); } @@ -215,17 +216,17 @@ void ActivePooledThread::run() FastMutex::ScopedLock lock(_pool.mutex); for (;;) { - Runnable* r = _pTarget; - _pTarget = nullptr; + auto r = _target; + _target.reset(); do { - if (r) + if (r.has_value()) { _pool.mutex.unlock(); try { - r->run(); + r.value().get().run(); } catch (Exception& exc) { @@ -245,11 +246,11 @@ void ActivePooledThread::run() if (_pool.runnables.empty()) { - r = nullptr; + r.reset(); break; } - r = _pool.runnables.pop(); + r = std::ref(_pool.runnables.pop()); } while (true); ActivePooledThread::Ptr thisCopy(this, true); From 86d9a1382eb5926be89eef2d59f6e915eac42d31 Mon Sep 17 00:00:00 2001 From: siren186 <765495939@qq.com> Date: Wed, 21 Aug 2024 17:48:43 +0800 Subject: [PATCH 12/24] enh(Poco::ActiveThreadPool): More test case --- .../testsuite/src/ActiveThreadPoolTest.cpp | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/Foundation/testsuite/src/ActiveThreadPoolTest.cpp b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp index 1bef49d0bb..74eda5f0bf 100644 --- a/Foundation/testsuite/src/ActiveThreadPoolTest.cpp +++ b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp @@ -73,6 +73,40 @@ void ActiveThreadPoolTest::testActiveThreadPool() pool.joinAll(); assertTrue (_count == 1000); + + pool.setExpiryTimeout(10); + assertTrue (pool.expiryTimeout() == 10); + + try + { + for (int i = 0; i < pool.capacity(); ++i) + { + pool.start(ra); + } + } + catch (...) + { + failmsg("wrong exception thrown"); + } + + // wait for the threads to expire + Poco::Thread::sleep(pool.expiryTimeout() * pool.capacity()); + + try + { + for (int i = 0; i < pool.capacity(); ++i) + { + pool.start(ra); // reuse expired threads + } + } + catch (...) + { + failmsg("wrong exception thrown"); + } + + // wait for the threads to expire + Poco::Thread::sleep(pool.expiryTimeout() * pool.capacity()); + pool.joinAll(); // join with no active threads } From cba4673b47761192d118eadf320b92f880071404 Mon Sep 17 00:00:00 2001 From: siren186 <765495939@qq.com> Date: Thu, 22 Aug 2024 09:50:04 +0800 Subject: [PATCH 13/24] enh(Poco::ActiveThreadPool): std::optional::value unavailable on earlier macOS versions --- Foundation/src/ActiveThreadPool.cpp | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/Foundation/src/ActiveThreadPool.cpp b/Foundation/src/ActiveThreadPool.cpp index 1e752cfebe..e6598e4819 100644 --- a/Foundation/src/ActiveThreadPool.cpp +++ b/Foundation/src/ActiveThreadPool.cpp @@ -24,7 +24,6 @@ #include #include #include -#include namespace Poco { @@ -133,7 +132,7 @@ class ActivePooledThread: public Runnable, public RefCountedObject private: ActiveThreadPoolPrivate& _pool; - std::optional> _target{}; + Runnable* _pTarget; Condition _runnableReady; Thread _thread; }; @@ -171,7 +170,8 @@ class ActiveThreadPoolPrivate ActivePooledThread::ActivePooledThread(ActiveThreadPoolPrivate& pool): - _pool(pool) + _pool(pool), + _pTarget(nullptr) { std::ostringstream name; name << _pool.name << "[#" << ++_pool.serial << "]"; @@ -188,8 +188,8 @@ void ActivePooledThread::start() void ActivePooledThread::setRunnable(Runnable& target) { - poco_assert(_target.has_value() == false); - _target = std::ref(target); + poco_assert(_pTarget == nullptr); + _pTarget = ⌖ } @@ -216,17 +216,17 @@ void ActivePooledThread::run() FastMutex::ScopedLock lock(_pool.mutex); for (;;) { - auto r = _target; - _target.reset(); + auto r = _pTarget; + _pTarget = nullptr; do { - if (r.has_value()) + if (r) { _pool.mutex.unlock(); try { - r.value().get().run(); + r->run(); } catch (Exception& exc) { @@ -246,11 +246,11 @@ void ActivePooledThread::run() if (_pool.runnables.empty()) { - r.reset(); + r = nullptr; break; } - r = std::ref(_pool.runnables.pop()); + r = &_pool.runnables.pop(); } while (true); ActivePooledThread::Ptr thisCopy(this, true); From 6bed6cba6f2623496d0d2a116f29a9a8bf61214b Mon Sep 17 00:00:00 2001 From: siren186 <765495939@qq.com> Date: Thu, 22 Aug 2024 14:38:21 +0800 Subject: [PATCH 14/24] enh(Poco::ActiveThreadPool): Fix compare function for make heap --- Foundation/src/ActiveThreadPool.cpp | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/Foundation/src/ActiveThreadPool.cpp b/Foundation/src/ActiveThreadPool.cpp index e6598e4819..12511cd8d4 100644 --- a/Foundation/src/ActiveThreadPool.cpp +++ b/Foundation/src/ActiveThreadPool.cpp @@ -60,18 +60,22 @@ class RunnableList return _runnables.empty(); } - // for std::push_heap - bool operator< (const RunnableList& rhs) const - { - return this->_priority < rhs._priority; - } - private: int _priority = 0; std::list> _runnables; }; +struct RunnablePriorityCompare +{ + // for make heap + bool operator()(const std::shared_ptr& left, const std::shared_ptr& right) const + { + return left->priority() < right->priority(); + } +}; + + class RunnablePriorityQueue /// A priority queue of runnables { @@ -88,7 +92,7 @@ class RunnablePriorityQueue } auto q = std::make_shared(std::ref(target), priority); _queues.push_back(q); - std::push_heap(_queues.begin(), _queues.end()); + std::push_heap(_queues.begin(), _queues.end(), _comp); } Runnable& pop() @@ -97,7 +101,7 @@ class RunnablePriorityQueue auto& r = q->pop(); if (q->empty()) { - std::pop_heap(_queues.begin(), _queues.end()); + std::pop_heap(_queues.begin(), _queues.end(), _comp); _queues.pop_back(); } return r; @@ -110,6 +114,7 @@ class RunnablePriorityQueue private: std::vector> _queues; + RunnablePriorityCompare _comp; }; From 7b26ab89ec17502ecf43819d22447a37fce434ec Mon Sep 17 00:00:00 2001 From: siren186 <765495939@qq.com> Date: Thu, 22 Aug 2024 14:48:07 +0800 Subject: [PATCH 15/24] enh(Poco::ActiveThreadPool): Add more test case --- .../testsuite/src/ActiveThreadPoolTest.cpp | 79 +++++++++++++++++-- .../testsuite/src/ActiveThreadPoolTest.h | 4 +- 2 files changed, 75 insertions(+), 8 deletions(-) diff --git a/Foundation/testsuite/src/ActiveThreadPoolTest.cpp b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp index 74eda5f0bf..40c983be84 100644 --- a/Foundation/testsuite/src/ActiveThreadPoolTest.cpp +++ b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp @@ -16,12 +16,44 @@ #include "Poco/Exception.h" #include "Poco/Thread.h" #include "Poco/Environment.h" +#include "Poco/RefCountedObject.h" +#include "Poco/AutoPtr.h" using Poco::ActiveThreadPool; using Poco::RunnableAdapter; using Poco::Thread; using Poco::Environment; +using Poco::Runnable; +using Poco::RefCountedObject; +using Poco::AutoPtr; + + +namespace +{ + class TestPriorityRunnable: public Runnable, public RefCountedObject + { + public: + using Ptr = AutoPtr; + + TestPriorityRunnable(int n, Poco::FastMutex& mutex, std::vector& result): + _n(n), + _mutex(mutex), + _result(result) + {} + + virtual void run() override + { + Poco::FastMutex::ScopedLock lock(_mutex); + _result.push_back(_n); + } + + private: + int _n; + Poco::FastMutex& _mutex; + std::vector& _result; + }; +} ActiveThreadPoolTest::ActiveThreadPoolTest(const std::string& name): CppUnit::TestCase(name) @@ -34,14 +66,13 @@ ActiveThreadPoolTest::~ActiveThreadPoolTest() } -void ActiveThreadPoolTest::testActiveThreadPool() +void ActiveThreadPoolTest::testSimpleCount() { ActiveThreadPool pool; - assertTrue (pool.capacity() == static_cast(Environment::processorCount()) + 1); - RunnableAdapter ra(*this, &ActiveThreadPoolTest::count); + _count = 0; try { for (int i = 0; i < 2000; ++i) @@ -53,9 +84,7 @@ void ActiveThreadPoolTest::testActiveThreadPool() { failmsg("wrong exception thrown"); } - pool.joinAll(); - assertTrue (_count == 2000); _count = 0; @@ -71,12 +100,19 @@ void ActiveThreadPoolTest::testActiveThreadPool() failmsg("wrong exception thrown"); } pool.joinAll(); - assertTrue (_count == 1000); +} + + +void ActiveThreadPoolTest::testExpiryTimeout() +{ + ActiveThreadPool pool; + RunnableAdapter ra(*this, &ActiveThreadPoolTest::count); pool.setExpiryTimeout(10); assertTrue (pool.expiryTimeout() == 10); + _count = 0; try { for (int i = 0; i < pool.capacity(); ++i) @@ -107,8 +143,35 @@ void ActiveThreadPoolTest::testActiveThreadPool() // wait for the threads to expire Poco::Thread::sleep(pool.expiryTimeout() * pool.capacity()); pool.joinAll(); // join with no active threads + assertTrue (_count == pool.capacity() * 2); } +void ActiveThreadPoolTest::testPriority() +{ + Poco::FastMutex mutex; + std::vector result; + ActiveThreadPool pool(1); + std::vector runnables; + + Poco::FastMutex::ScopedLock lock(mutex); // lock, to make sure runnables are queued + for (int priority = 0; priority < 1000; ++priority) + { + TestPriorityRunnable::Ptr r = new TestPriorityRunnable(priority, mutex, result); + runnables.push_back(r); + pool.start(*r, priority); + } + mutex.unlock(); // unlock, to let runnables go + pool.joinAll(); + + std::vector mock; + mock.push_back(0); // 0 is the first result + for (int i = 999; i > 0; --i) + { + mock.push_back(i); // other results should sort by priority + } + + assertTrue (std::equal(result.begin(), result.end(), mock.begin(), mock.end())); +} void ActiveThreadPoolTest::setUp() { @@ -131,7 +194,9 @@ CppUnit::Test* ActiveThreadPoolTest::suite() { CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("ActiveThreadPoolTest"); - CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool); + CppUnit_addTest(pSuite, ActiveThreadPoolTest, testSimpleCount); + CppUnit_addTest(pSuite, ActiveThreadPoolTest, testExpiryTimeout); + CppUnit_addTest(pSuite, ActiveThreadPoolTest, testPriority); return pSuite; } diff --git a/Foundation/testsuite/src/ActiveThreadPoolTest.h b/Foundation/testsuite/src/ActiveThreadPoolTest.h index 51df837355..0e78b65540 100644 --- a/Foundation/testsuite/src/ActiveThreadPoolTest.h +++ b/Foundation/testsuite/src/ActiveThreadPoolTest.h @@ -26,7 +26,9 @@ class ActiveThreadPoolTest: public CppUnit::TestCase ActiveThreadPoolTest(const std::string& name); ~ActiveThreadPoolTest(); - void testActiveThreadPool(); + void testSimpleCount(); + void testExpiryTimeout(); + void testPriority(); void setUp(); void tearDown(); From 81823a4224fd5ae4dd236d1da5ad39286eeb5770 Mon Sep 17 00:00:00 2001 From: siren186 <765495939@qq.com> Date: Thu, 22 Aug 2024 15:48:32 +0800 Subject: [PATCH 16/24] enh(Poco::ActiveThreadPool): Add more test case --- .../testsuite/src/ActiveThreadPoolTest.cpp | 31 ++++++++++++------- .../testsuite/src/ActiveThreadPoolTest.h | 6 ++-- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/Foundation/testsuite/src/ActiveThreadPoolTest.cpp b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp index 40c983be84..36415aa9a9 100644 --- a/Foundation/testsuite/src/ActiveThreadPoolTest.cpp +++ b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp @@ -66,7 +66,7 @@ ActiveThreadPoolTest::~ActiveThreadPoolTest() } -void ActiveThreadPoolTest::testSimpleCount() +void ActiveThreadPoolTest::testActiveThreadPool1() { ActiveThreadPool pool; assertTrue (pool.capacity() == static_cast(Environment::processorCount()) + 1); @@ -104,7 +104,7 @@ void ActiveThreadPoolTest::testSimpleCount() } -void ActiveThreadPoolTest::testExpiryTimeout() +void ActiveThreadPoolTest::testActiveThreadPool2() { ActiveThreadPool pool; RunnableAdapter ra(*this, &ActiveThreadPoolTest::count); @@ -146,23 +146,30 @@ void ActiveThreadPoolTest::testExpiryTimeout() assertTrue (_count == pool.capacity() * 2); } -void ActiveThreadPoolTest::testPriority() +void ActiveThreadPoolTest::testActiveThreadPool3() { Poco::FastMutex mutex; std::vector result; ActiveThreadPool pool(1); std::vector runnables; - Poco::FastMutex::ScopedLock lock(mutex); // lock, to make sure runnables are queued - for (int priority = 0; priority < 1000; ++priority) + mutex.lock(); // lock, to make sure runnables are queued + try { - TestPriorityRunnable::Ptr r = new TestPriorityRunnable(priority, mutex, result); - runnables.push_back(r); - pool.start(*r, priority); + for (int priority = 0; priority < 1000; ++priority) + { + TestPriorityRunnable::Ptr r = new TestPriorityRunnable(priority, mutex, result); + runnables.push_back(r); + pool.start(*r, priority); + } + } + catch (...) + { + failmsg("wrong exception thrown"); } mutex.unlock(); // unlock, to let runnables go - pool.joinAll(); + pool.joinAll(); std::vector mock; mock.push_back(0); // 0 is the first result for (int i = 999; i > 0; --i) @@ -194,9 +201,9 @@ CppUnit::Test* ActiveThreadPoolTest::suite() { CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("ActiveThreadPoolTest"); - CppUnit_addTest(pSuite, ActiveThreadPoolTest, testSimpleCount); - CppUnit_addTest(pSuite, ActiveThreadPoolTest, testExpiryTimeout); - CppUnit_addTest(pSuite, ActiveThreadPoolTest, testPriority); + CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool1); + CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool2); + CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool3); return pSuite; } diff --git a/Foundation/testsuite/src/ActiveThreadPoolTest.h b/Foundation/testsuite/src/ActiveThreadPoolTest.h index 0e78b65540..dec9e85df8 100644 --- a/Foundation/testsuite/src/ActiveThreadPoolTest.h +++ b/Foundation/testsuite/src/ActiveThreadPoolTest.h @@ -26,9 +26,9 @@ class ActiveThreadPoolTest: public CppUnit::TestCase ActiveThreadPoolTest(const std::string& name); ~ActiveThreadPoolTest(); - void testSimpleCount(); - void testExpiryTimeout(); - void testPriority(); + void testActiveThreadPool1(); + void testActiveThreadPool2(); + void testActiveThreadPool3(); void setUp(); void tearDown(); From 36e1ce5ae6a09e76c4720a13b37fd96782ea30b9 Mon Sep 17 00:00:00 2001 From: siren186 <765495939@qq.com> Date: Thu, 22 Aug 2024 16:13:34 +0800 Subject: [PATCH 17/24] enh(Poco::ActiveThreadPool): Code style --- Foundation/testsuite/src/ActiveThreadPoolTest.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Foundation/testsuite/src/ActiveThreadPoolTest.cpp b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp index 36415aa9a9..f38fb5acce 100644 --- a/Foundation/testsuite/src/ActiveThreadPoolTest.cpp +++ b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp @@ -202,8 +202,8 @@ CppUnit::Test* ActiveThreadPoolTest::suite() CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("ActiveThreadPoolTest"); CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool1); - CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool2); - CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool3); + CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool2); + CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool3); return pSuite; } From 363702286c93d64d8ede22d8452580ce4e3510da Mon Sep 17 00:00:00 2001 From: siren186 <765495939@qq.com> Date: Thu, 22 Aug 2024 16:45:16 +0800 Subject: [PATCH 18/24] enh(Poco::ActiveThreadPool): Test case --- Foundation/testsuite/src/ActiveThreadPoolTest.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Foundation/testsuite/src/ActiveThreadPoolTest.cpp b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp index f38fb5acce..8f23585c8f 100644 --- a/Foundation/testsuite/src/ActiveThreadPoolTest.cpp +++ b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp @@ -126,7 +126,7 @@ void ActiveThreadPoolTest::testActiveThreadPool2() } // wait for the threads to expire - Poco::Thread::sleep(pool.expiryTimeout() * pool.capacity()); + Thread::sleep(pool.expiryTimeout() * pool.capacity()); try { @@ -141,7 +141,7 @@ void ActiveThreadPoolTest::testActiveThreadPool2() } // wait for the threads to expire - Poco::Thread::sleep(pool.expiryTimeout() * pool.capacity()); + Thread::sleep(pool.expiryTimeout() * pool.capacity()); pool.joinAll(); // join with no active threads assertTrue (_count == pool.capacity() * 2); } @@ -202,7 +202,7 @@ CppUnit::Test* ActiveThreadPoolTest::suite() CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("ActiveThreadPoolTest"); CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool1); - CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool2); + //CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool2); CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool3); return pSuite; From a37bf5ad6e27af83d5c6da89996ac227e41378ef Mon Sep 17 00:00:00 2001 From: siren186 <765495939@qq.com> Date: Thu, 22 Aug 2024 17:37:24 +0800 Subject: [PATCH 19/24] enh(Poco::ActiveThreadPool): Test case --- Foundation/testsuite/src/ActiveThreadPoolTest.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Foundation/testsuite/src/ActiveThreadPoolTest.cpp b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp index 8f23585c8f..c8e091a937 100644 --- a/Foundation/testsuite/src/ActiveThreadPoolTest.cpp +++ b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp @@ -203,7 +203,7 @@ CppUnit::Test* ActiveThreadPoolTest::suite() CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool1); //CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool2); - CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool3); + //CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool3); return pSuite; } From 09db4e717c9b67dc14012a16613fe3ff6e45d890 Mon Sep 17 00:00:00 2001 From: siren186 <765495939@qq.com> Date: Fri, 23 Aug 2024 09:30:30 +0800 Subject: [PATCH 20/24] enh(Poco::ActiveThreadPool): Fix test case error --- Foundation/src/ActiveThreadPool.cpp | 9 ++++----- Foundation/testsuite/src/ActiveThreadPoolTest.cpp | 4 ++-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/Foundation/src/ActiveThreadPool.cpp b/Foundation/src/ActiveThreadPool.cpp index 12511cd8d4..f8a20f5807 100644 --- a/Foundation/src/ActiveThreadPool.cpp +++ b/Foundation/src/ActiveThreadPool.cpp @@ -258,23 +258,22 @@ void ActivePooledThread::run() r = &_pool.runnables.pop(); } while (true); - ActivePooledThread::Ptr thisCopy(this, true); - _pool.waitingThreads.push_back(thisCopy); + _pool.waitingThreads.push_back(ActivePooledThread::Ptr{ this, true }); registerThreadInactive(); // wait for work, exiting after the expiry timeout is reached _runnableReady.tryWait(_pool.mutex, _pool.expiryTimeout); ++_pool.activeThreads; - auto it = std::find(_pool.waitingThreads.begin(), _pool.waitingThreads.end(), thisCopy); + auto it = std::find(_pool.waitingThreads.begin(), _pool.waitingThreads.end(), ActivePooledThread::Ptr{ this, true }); if (it != _pool.waitingThreads.end()) { _pool.waitingThreads.erase(it); - _pool.expiredThreads.push_back(thisCopy); + _pool.expiredThreads.push_back(ActivePooledThread::Ptr{ this, true }); registerThreadInactive(); break; } - if (!_pool.allThreads.count(thisCopy)) + if (!_pool.allThreads.count(ActivePooledThread::Ptr{ this, true })) { registerThreadInactive(); break; diff --git a/Foundation/testsuite/src/ActiveThreadPoolTest.cpp b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp index c8e091a937..deffde56e4 100644 --- a/Foundation/testsuite/src/ActiveThreadPoolTest.cpp +++ b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp @@ -202,8 +202,8 @@ CppUnit::Test* ActiveThreadPoolTest::suite() CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("ActiveThreadPoolTest"); CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool1); - //CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool2); - //CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool3); + CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool2); + CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool3); return pSuite; } From 3a76e283800eaa4728bec536584b7025877f88ea Mon Sep 17 00:00:00 2001 From: Matej Kenda Date: Sat, 24 Aug 2024 18:17:32 +0200 Subject: [PATCH 21/24] Revert "enh(Poco::ActiveThreadPool): std::optional::value unavailable on earlier macOS versions" This reverts commit cba4673b47761192d118eadf320b92f880071404. --- Foundation/src/ActiveThreadPool.cpp | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/Foundation/src/ActiveThreadPool.cpp b/Foundation/src/ActiveThreadPool.cpp index f8a20f5807..1fd641ee25 100644 --- a/Foundation/src/ActiveThreadPool.cpp +++ b/Foundation/src/ActiveThreadPool.cpp @@ -24,6 +24,7 @@ #include #include #include +#include namespace Poco { @@ -137,7 +138,7 @@ class ActivePooledThread: public Runnable, public RefCountedObject private: ActiveThreadPoolPrivate& _pool; - Runnable* _pTarget; + std::optional> _target{}; Condition _runnableReady; Thread _thread; }; @@ -175,8 +176,7 @@ class ActiveThreadPoolPrivate ActivePooledThread::ActivePooledThread(ActiveThreadPoolPrivate& pool): - _pool(pool), - _pTarget(nullptr) + _pool(pool) { std::ostringstream name; name << _pool.name << "[#" << ++_pool.serial << "]"; @@ -193,8 +193,8 @@ void ActivePooledThread::start() void ActivePooledThread::setRunnable(Runnable& target) { - poco_assert(_pTarget == nullptr); - _pTarget = ⌖ + poco_assert(_target.has_value() == false); + _target = std::ref(target); } @@ -221,17 +221,17 @@ void ActivePooledThread::run() FastMutex::ScopedLock lock(_pool.mutex); for (;;) { - auto r = _pTarget; - _pTarget = nullptr; + auto r = _target; + _target.reset(); do { - if (r) + if (r.has_value()) { _pool.mutex.unlock(); try { - r->run(); + r.value().get().run(); } catch (Exception& exc) { @@ -251,11 +251,11 @@ void ActivePooledThread::run() if (_pool.runnables.empty()) { - r = nullptr; + r.reset(); break; } - r = &_pool.runnables.pop(); + r = std::ref(_pool.runnables.pop()); } while (true); _pool.waitingThreads.push_back(ActivePooledThread::Ptr{ this, true }); From 74915820a621b75b49b81c0bd24384adad98d17d Mon Sep 17 00:00:00 2001 From: Matej Kenda Date: Sat, 24 Aug 2024 18:18:33 +0200 Subject: [PATCH 22/24] enh(macOS): require min deployment macOS version 10.15 which has full support for C++17 --- build/config/Darwin-clang-libc++ | 4 ++-- cmake/DefinePlatformSpecifc.cmake | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/build/config/Darwin-clang-libc++ b/build/config/Darwin-clang-libc++ index bfd94fb376..07f22f2b1c 100644 --- a/build/config/Darwin-clang-libc++ +++ b/build/config/Darwin-clang-libc++ @@ -1,7 +1,7 @@ # # Darwin-clang-libc++ # -# Build settings for Mac OS X 10.11 and later (clang, libc++, x86_64/arm64) +# Build settings for Mac OS X 10.13 and later (clang, libc++, x86_64/arm64) # The build settings defined in this file are compatible # with XCode C++ projects. # @@ -13,7 +13,7 @@ LINKMODE ?= SHARED ARCHFLAGS ?= -arch $(POCO_HOST_OSARCH) SANITIZEFLAGS ?= -OSFLAGS ?= -mmacosx-version-min=10.11 -isysroot $(shell xcrun --show-sdk-path) +OSFLAGS ?= -mmacosx-version-min=10.15 -isysroot $(shell xcrun --show-sdk-path) ifeq ($(POCO_HOST_OSARCH),arm64) OPENSSL_DIR ?= /opt/homebrew/opt/openssl diff --git a/cmake/DefinePlatformSpecifc.cmake b/cmake/DefinePlatformSpecifc.cmake index 1b09993b01..380c03e571 100644 --- a/cmake/DefinePlatformSpecifc.cmake +++ b/cmake/DefinePlatformSpecifc.cmake @@ -77,6 +77,8 @@ else(BUILD_SHARED_LIBS) set(CMAKE_RELWITHDEBINFO_POSTFIX "${STATIC_POSTFIX}" CACHE STRING "Set RelWithDebInfo library postfix" FORCE) endif() +# MacOS version that has full support for C++17 +set(CMAKE_OSX_DEPLOYMENT_TARGET, 10.15) # OS Detection include(CheckTypeSize) From 32e1d33fed43d377d5a801697482c59819dd7eec Mon Sep 17 00:00:00 2001 From: siren186 <765495939@qq.com> Date: Mon, 26 Aug 2024 13:39:34 +0800 Subject: [PATCH 23/24] enh(Poco::ActiveThreadPool): Remove useless "{}" --- Foundation/src/ActiveThreadPool.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Foundation/src/ActiveThreadPool.cpp b/Foundation/src/ActiveThreadPool.cpp index 1fd641ee25..12f44e8656 100644 --- a/Foundation/src/ActiveThreadPool.cpp +++ b/Foundation/src/ActiveThreadPool.cpp @@ -138,7 +138,7 @@ class ActivePooledThread: public Runnable, public RefCountedObject private: ActiveThreadPoolPrivate& _pool; - std::optional> _target{}; + std::optional> _target; Condition _runnableReady; Thread _thread; }; From 40bedc94017fa6b7f48778db952875aae012b67d Mon Sep 17 00:00:00 2001 From: siren186 <765495939@qq.com> Date: Mon, 26 Aug 2024 16:59:45 +0800 Subject: [PATCH 24/24] enh(Poco::ActiveThreadPool): Rename member variable m_impl to _impl --- Foundation/include/Poco/ActiveThreadPool.h | 2 +- Foundation/src/ActiveThreadPool.cpp | 30 +++++++++++----------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/Foundation/include/Poco/ActiveThreadPool.h b/Foundation/include/Poco/ActiveThreadPool.h index 1edbb5f579..8b72d98f5c 100644 --- a/Foundation/include/Poco/ActiveThreadPool.h +++ b/Foundation/include/Poco/ActiveThreadPool.h @@ -89,7 +89,7 @@ class Foundation_API ActiveThreadPool ActiveThreadPool& operator = (const ActiveThreadPool& pool); private: - std::unique_ptr m_impl; + std::unique_ptr _impl; }; } // namespace Poco diff --git a/Foundation/src/ActiveThreadPool.cpp b/Foundation/src/ActiveThreadPool.cpp index 12f44e8656..857d44aea7 100644 --- a/Foundation/src/ActiveThreadPool.cpp +++ b/Foundation/src/ActiveThreadPool.cpp @@ -422,13 +422,13 @@ void ActiveThreadPoolPrivate::joinAll() ActiveThreadPool::ActiveThreadPool(int capacity, int stackSize): - m_impl(new ActiveThreadPoolPrivate(capacity, stackSize)) + _impl(new ActiveThreadPoolPrivate(capacity, stackSize)) { } ActiveThreadPool::ActiveThreadPool(const std::string& name, int capacity, int stackSize): - m_impl(new ActiveThreadPoolPrivate(capacity, stackSize, name)) + _impl(new ActiveThreadPoolPrivate(capacity, stackSize, name)) { } @@ -440,22 +440,22 @@ ActiveThreadPool::~ActiveThreadPool() int ActiveThreadPool::capacity() const { - return m_impl->maxThreadCount; + return _impl->maxThreadCount; } void ActiveThreadPool::start(Runnable& target, int priority) { - FastMutex::ScopedLock lock(m_impl->mutex); + FastMutex::ScopedLock lock(_impl->mutex); - if (!m_impl->tryStart(target)) + if (!_impl->tryStart(target)) { - m_impl->enqueueTask(target, priority); + _impl->enqueueTask(target, priority); - if (!m_impl->waitingThreads.empty()) + if (!_impl->waitingThreads.empty()) { - auto pThread = m_impl->waitingThreads.front(); - m_impl->waitingThreads.pop_front(); + auto pThread = _impl->waitingThreads.front(); + _impl->waitingThreads.pop_front(); pThread->notifyRunnableReady(); } } @@ -464,7 +464,7 @@ void ActiveThreadPool::start(Runnable& target, int priority) void ActiveThreadPool::joinAll() { - m_impl->joinAll(); + _impl->joinAll(); } @@ -477,28 +477,28 @@ ActiveThreadPool& ActiveThreadPool::defaultPool() int ActiveThreadPool::getStackSize() const { - return m_impl->stackSize; + return _impl->stackSize; } int ActiveThreadPool::expiryTimeout() const { - return m_impl->expiryTimeout; + return _impl->expiryTimeout; } void ActiveThreadPool::setExpiryTimeout(int expiryTimeout) { - if (m_impl->expiryTimeout != expiryTimeout) + if (_impl->expiryTimeout != expiryTimeout) { - m_impl->expiryTimeout = expiryTimeout; + _impl->expiryTimeout = expiryTimeout; } } const std::string& ActiveThreadPool::name() const { - return m_impl->name; + return _impl->name; } } // namespace Poco