From d6112f78273edb07eb0366416f9e68884bb4e0a5 Mon Sep 17 00:00:00 2001 From: afrind Date: Mon, 25 May 2026 16:55:14 -0400 Subject: [PATCH] relay: add CrossExecForwarderCallback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Dispatches MoQForwarder::Callback (onEmpty, forwardChanged, newGroupRequested) to a target executor fire-and-forget. Locks the weak_ptr on the calling thread — where the forwarder is guaranteed alive — and moves the resulting shared_ptr into the lambda to keep it alive across the executor hop without forming a permanent ownership cycle (forwarder → callback → forwarder). The weak_ptr member avoids that permanent cycle; eager locking at call-site ensures the lambda always has a valid pointer to deliver. --- CMakeLists.txt | 1 + src/relay/CrossExecForwarderCallback.cpp | 47 ++++++++++++++++++++++++ src/relay/CrossExecForwarderCallback.h | 39 ++++++++++++++++++++ 3 files changed, 87 insertions(+) create mode 100644 src/relay/CrossExecForwarderCallback.cpp create mode 100644 src/relay/CrossExecForwarderCallback.h diff --git a/CMakeLists.txt b/CMakeLists.txt index ed309c93..a1b5d996 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -119,6 +119,7 @@ add_library(moqx_core STATIC src/relay/TopNFilter.cpp src/relay/PropertyRanking.cpp src/relay/CrossExecFilter.cpp + src/relay/CrossExecForwarderCallback.cpp src/relay/PublisherCrossExecFilter.cpp src/relay/SubscriberCrossExecFilter.cpp ) diff --git a/src/relay/CrossExecForwarderCallback.cpp b/src/relay/CrossExecForwarderCallback.cpp new file mode 100644 index 00000000..14968bc4 --- /dev/null +++ b/src/relay/CrossExecForwarderCallback.cpp @@ -0,0 +1,47 @@ +/* + * Copyright (c) OpenMOQ contributors. + * This source code is licensed under the Apache 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "CrossExecForwarderCallback.h" + +namespace openmoq::moqx { + +void CrossExecForwarderCallback::onEmpty(moxygen::MoQForwarder* /*forwarder*/) { + auto f = forwarder_.lock(); + if (!f) { + return; + } + targetExec_->add([f = std::move(f), downstream = downstream_]() mutable { + downstream->onEmpty(f.get()); + }); +} + +void CrossExecForwarderCallback::forwardChanged( + moxygen::MoQForwarder* /*forwarder*/, + bool forward +) { + auto f = forwarder_.lock(); + if (!f) { + return; + } + targetExec_->add([f = std::move(f), downstream = downstream_, forward]() mutable { + downstream->forwardChanged(f.get(), forward); + }); +} + +void CrossExecForwarderCallback::newGroupRequested( + moxygen::MoQForwarder* /*forwarder*/, + uint64_t group +) { + auto f = forwarder_.lock(); + if (!f) { + return; + } + targetExec_->add([f = std::move(f), downstream = downstream_, group]() mutable { + downstream->newGroupRequested(f.get(), group); + }); +} + +} // namespace openmoq::moqx diff --git a/src/relay/CrossExecForwarderCallback.h b/src/relay/CrossExecForwarderCallback.h new file mode 100644 index 00000000..33e6add2 --- /dev/null +++ b/src/relay/CrossExecForwarderCallback.h @@ -0,0 +1,39 @@ +/* + * Copyright (c) OpenMOQ contributors. + * This source code is licensed under the Apache 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include +#include + +namespace openmoq::moqx { + +// Dispatches MoQForwarder::Callback methods to a target executor (fire-and-forget). +// Locks the weak_ptr on the calling thread (where the forwarder is guaranteed alive) +// and moves the resulting shared_ptr into the lambda, keeping the forwarder alive +// across the executor hop without forming a permanent ownership cycle. +class CrossExecForwarderCallback final : public moxygen::MoQForwarder::Callback { +public: + CrossExecForwarderCallback( + folly::Executor* targetExec, + std::weak_ptr forwarder, + std::shared_ptr downstream + ) + : targetExec_(targetExec), forwarder_(std::move(forwarder)), + downstream_(std::move(downstream)) {} + + void onEmpty(moxygen::MoQForwarder* forwarder) override; + void forwardChanged(moxygen::MoQForwarder* forwarder, bool forward) override; + void newGroupRequested(moxygen::MoQForwarder* forwarder, uint64_t group) override; + +private: + folly::Executor* targetExec_; + std::weak_ptr forwarder_; + std::shared_ptr downstream_; +}; + +} // namespace openmoq::moqx