Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ add_library(moqx_core STATIC
src/relay/TopNFilter.cpp
src/relay/PropertyRanking.cpp
src/relay/CrossExecFilter.cpp
src/relay/CrossExecForwarderCallback.cpp
src/relay/PublisherCrossExecFilter.cpp
src/relay/SubscriberCrossExecFilter.cpp
)
Expand Down
47 changes: 47 additions & 0 deletions src/relay/CrossExecForwarderCallback.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) OpenMOQ contributors.
* This source code is licensed under the Apache 2.0 license found in the
* LICENSE file in the root directory of this source tree.
*/

#include "CrossExecForwarderCallback.h"

namespace openmoq::moqx {

void CrossExecForwarderCallback::onEmpty(moxygen::MoQForwarder* /*forwarder*/) {
auto f = forwarder_.lock();
if (!f) {
return;
}
targetExec_->add([f = std::move(f), downstream = downstream_]() mutable {
downstream->onEmpty(f.get());
});
}

void CrossExecForwarderCallback::forwardChanged(
moxygen::MoQForwarder* /*forwarder*/,
bool forward
) {
auto f = forwarder_.lock();
if (!f) {
return;
}
targetExec_->add([f = std::move(f), downstream = downstream_, forward]() mutable {
downstream->forwardChanged(f.get(), forward);
});
}

void CrossExecForwarderCallback::newGroupRequested(
moxygen::MoQForwarder* /*forwarder*/,
uint64_t group
) {
auto f = forwarder_.lock();
if (!f) {
return;
}
targetExec_->add([f = std::move(f), downstream = downstream_, group]() mutable {
downstream->newGroupRequested(f.get(), group);
});
}

} // namespace openmoq::moqx
39 changes: 39 additions & 0 deletions src/relay/CrossExecForwarderCallback.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) OpenMOQ contributors.
* This source code is licensed under the Apache 2.0 license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <folly/Executor.h>
#include <memory>
#include <moxygen/relay/MoQForwarder.h>

namespace openmoq::moqx {

// Dispatches MoQForwarder::Callback methods to a target executor (fire-and-forget).
// Locks the weak_ptr on the calling thread (where the forwarder is guaranteed alive)
// and moves the resulting shared_ptr into the lambda, keeping the forwarder alive
// across the executor hop without forming a permanent ownership cycle.
class CrossExecForwarderCallback final : public moxygen::MoQForwarder::Callback {
public:
CrossExecForwarderCallback(
folly::Executor* targetExec,
std::weak_ptr<moxygen::MoQForwarder> forwarder,
std::shared_ptr<moxygen::MoQForwarder::Callback> downstream
)
: targetExec_(targetExec), forwarder_(std::move(forwarder)),
downstream_(std::move(downstream)) {}

void onEmpty(moxygen::MoQForwarder* forwarder) override;
void forwardChanged(moxygen::MoQForwarder* forwarder, bool forward) override;
void newGroupRequested(moxygen::MoQForwarder* forwarder, uint64_t group) override;

private:
folly::Executor* targetExec_;
std::weak_ptr<moxygen::MoQForwarder> forwarder_;
std::shared_ptr<moxygen::MoQForwarder::Callback> downstream_;
};

} // namespace openmoq::moqx
Loading