Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ add_library(moqx_core STATIC
src/relay/TopNFilter.cpp
src/relay/PropertyRanking.cpp
src/relay/CrossExecFilter.cpp
src/relay/PublisherCrossExecFilter.cpp
src/relay/SubscriberCrossExecFilter.cpp
)

target_include_directories(moqx_core
Expand Down
56 changes: 56 additions & 0 deletions src/relay/PublisherCrossExecFilter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) OpenMOQ contributors.
* This source code is licensed under the Apache 2.0 license found in the
* LICENSE file in the root directory of this source tree.
*/

#include "relay/PublisherCrossExecFilter.h"

namespace openmoq::moqx {

folly::coro::Task<moxygen::Publisher::TrackStatusResult>
PublisherCrossExecFilter::trackStatus(moxygen::TrackStatus trackStatus) {
co_return co_await folly::coro::co_withExecutor(
folly::getKeepAliveToken(targetExec_),
inner_->trackStatus(std::move(trackStatus))
);
}

folly::coro::Task<moxygen::Publisher::SubscribeResult> PublisherCrossExecFilter::subscribe(
moxygen::SubscribeRequest sub,
std::shared_ptr<moxygen::TrackConsumer> callback
) {
co_return co_await folly::coro::co_withExecutor(
folly::getKeepAliveToken(targetExec_),
inner_->subscribe(std::move(sub), std::move(callback))
);
}

folly::coro::Task<moxygen::Publisher::FetchResult> PublisherCrossExecFilter::fetch(
moxygen::Fetch fetchReq,
std::shared_ptr<moxygen::FetchConsumer> fetchCallback
) {
co_return co_await folly::coro::co_withExecutor(
folly::getKeepAliveToken(targetExec_),
inner_->fetch(std::move(fetchReq), std::move(fetchCallback))
);
}

folly::coro::Task<moxygen::Publisher::SubscribeNamespaceResult>
PublisherCrossExecFilter::subscribeNamespace(
moxygen::SubscribeNamespace subAnn,
std::shared_ptr<NamespacePublishHandle> namespacePublishHandle
) {
co_return co_await folly::coro::co_withExecutor(
folly::getKeepAliveToken(targetExec_),
inner_->subscribeNamespace(std::move(subAnn), std::move(namespacePublishHandle))
);
}

void PublisherCrossExecFilter::goaway(moxygen::Goaway goaway) {
targetExec_->add([inner = inner_, goaway = std::move(goaway)]() mutable {
inner->goaway(std::move(goaway));
});
}

} // namespace openmoq::moqx
47 changes: 47 additions & 0 deletions src/relay/PublisherCrossExecFilter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) OpenMOQ contributors.
* This source code is licensed under the Apache 2.0 license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <folly/Executor.h>
#include <memory>
#include <moxygen/Publisher.h>

namespace openmoq::moqx {

// Forwards all Publisher calls to a target executor.
// Coroutine methods switch to targetExec_ before invoking the inner and return
// the result to the caller. goaway() is fire-and-forget.
//
// Requires targetExec_ to be a FIFO executor if call ordering matters.
class PublisherCrossExecFilter final : public moxygen::Publisher {
public:
PublisherCrossExecFilter(folly::Executor* targetExec, std::shared_ptr<moxygen::Publisher> inner)
: targetExec_(targetExec), inner_(std::move(inner)) {}

folly::coro::Task<TrackStatusResult> trackStatus(moxygen::TrackStatus trackStatus) override;

folly::coro::Task<SubscribeResult> subscribe(
moxygen::SubscribeRequest sub,
std::shared_ptr<moxygen::TrackConsumer> callback
) override;

folly::coro::Task<FetchResult>
fetch(moxygen::Fetch fetchReq, std::shared_ptr<moxygen::FetchConsumer> fetchCallback) override;

folly::coro::Task<SubscribeNamespaceResult> subscribeNamespace(
moxygen::SubscribeNamespace subAnn,
std::shared_ptr<NamespacePublishHandle> namespacePublishHandle
) override;

void goaway(moxygen::Goaway goaway) override;

private:
folly::Executor* targetExec_;
std::shared_ptr<moxygen::Publisher> inner_;
};

} // namespace openmoq::moqx
68 changes: 68 additions & 0 deletions src/relay/SubscriberCrossExecFilter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) OpenMOQ contributors.
* This source code is licensed under the Apache 2.0 license found in the
* LICENSE file in the root directory of this source tree.
*/

#include "relay/SubscriberCrossExecFilter.h"

#include "relay/CrossExecFilter.h"

namespace openmoq::moqx {

folly::coro::Task<moxygen::Subscriber::PublishNamespaceResult>
SubscriberCrossExecFilter::publishNamespace(
moxygen::PublishNamespace ann,
std::shared_ptr<PublishNamespaceCallback> callback
) {
co_return co_await folly::coro::co_withExecutor(
folly::getKeepAliveToken(targetExec_),
inner_->publishNamespace(std::move(ann), std::move(callback))
);
}

moxygen::Subscriber::PublishResult SubscriberCrossExecFilter::publish(
moxygen::PublishRequest pub,
std::shared_ptr<moxygen::SubscriptionHandle> handle
) {
auto filter = std::make_shared<CrossExecFilter>(targetExec_, nullptr);
// co_invoke captures exec and inner by value here (while `this` is live),
// so the task's coroutine frame owns them and doesn't need `this` to survive.
auto reply = folly::coro::co_invoke(
[exec = targetExec_,
inner = inner_,
pub = std::move(pub),
handle = std::move(handle),
filter](
) mutable -> folly::coro::Task<folly::Expected<moxygen::PublishOk, moxygen::PublishError>> {
co_return co_await folly::coro::co_withExecutor(
folly::getKeepAliveToken(exec),
doPublishOnExec(std::move(inner), std::move(pub), std::move(handle), std::move(filter))
);
}
);
return PublishConsumerAndReplyTask{std::move(filter), std::move(reply)};
}

folly::coro::Task<folly::Expected<moxygen::PublishOk, moxygen::PublishError>>
SubscriberCrossExecFilter::doPublishOnExec(
std::shared_ptr<moxygen::Subscriber> inner,
moxygen::PublishRequest pub,
std::shared_ptr<moxygen::SubscriptionHandle> handle,
std::shared_ptr<CrossExecFilter> filter
) {
auto result = inner->publish(std::move(pub), std::move(handle));
if (result.hasError()) {
co_return folly::makeUnexpected(result.error());
}
filter->setDownstream(std::move(result.value().consumer));
co_return co_await std::move(result.value().reply);
}

void SubscriberCrossExecFilter::goaway(moxygen::Goaway goaway) {
targetExec_->add([inner = inner_, goaway = std::move(goaway)]() mutable {
inner->goaway(std::move(goaway));
});
}

} // namespace openmoq::moqx
57 changes: 57 additions & 0 deletions src/relay/SubscriberCrossExecFilter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) OpenMOQ contributors.
* This source code is licensed under the Apache 2.0 license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <folly/Executor.h>
#include <memory>
#include <moxygen/Subscriber.h>

namespace openmoq::moqx {

class CrossExecFilter;

// Forwards all Subscriber calls to a target executor.
// publishNamespace() switches to targetExec_ before invoking the inner.
// publish() returns a CrossExecFilter with null downstream immediately and a
// reply Task; driving the reply Task calls inner_->publish() on targetExec_,
// wires the real consumer via setDownstream(), then awaits the reply. FIFO
// ordering guarantees setDownstream() runs before any data writes enqueued
// afterward.
// goaway() is fire-and-forget.
//
// Requires targetExec_ to be a FIFO executor if call ordering matters.
class SubscriberCrossExecFilter final : public moxygen::Subscriber {
public:
SubscriberCrossExecFilter(folly::Executor* targetExec, std::shared_ptr<moxygen::Subscriber> inner)
: targetExec_(targetExec), inner_(std::move(inner)) {}

folly::coro::Task<PublishNamespaceResult> publishNamespace(
moxygen::PublishNamespace ann,
std::shared_ptr<PublishNamespaceCallback> callback
) override;

PublishResult publish(
moxygen::PublishRequest pub,
std::shared_ptr<moxygen::SubscriptionHandle> handle
) override;

void goaway(moxygen::Goaway goaway) override;

private:
static folly::coro::Task<folly::Expected<moxygen::PublishOk, moxygen::PublishError>>
doPublishOnExec(
std::shared_ptr<moxygen::Subscriber> inner,
moxygen::PublishRequest pub,
std::shared_ptr<moxygen::SubscriptionHandle> handle,
std::shared_ptr<CrossExecFilter> filter
);

folly::Executor* targetExec_;
std::shared_ptr<moxygen::Subscriber> inner_;
};

} // namespace openmoq::moqx
28 changes: 28 additions & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,34 @@ target_link_libraries(moqx_cross_exec_filter_test PRIVATE
)
gtest_discover_tests(moqx_cross_exec_filter_test)

# PublisherCrossExecFilter unit tests
add_executable(moqx_publisher_cross_exec_filter_test
PublisherCrossExecFilterTest.cpp
)
target_link_libraries(moqx_publisher_cross_exec_filter_test PRIVATE
moqx_core
GTest::gtest_main
GTest::gmock
moxygen::moqtest_utils
Folly::folly_executors_manual_executor
Folly::folly_executors_cpu_thread_pool_executor
)
gtest_discover_tests(moqx_publisher_cross_exec_filter_test)

# SubscriberCrossExecFilter unit tests
add_executable(moqx_subscriber_cross_exec_filter_test
SubscriberCrossExecFilterTest.cpp
)
target_link_libraries(moqx_subscriber_cross_exec_filter_test PRIVATE
moqx_core
GTest::gtest_main
GTest::gmock
moxygen::moqtest_utils
Folly::folly_executors_manual_executor
Folly::folly_executors_cpu_thread_pool_executor
)
gtest_discover_tests(moqx_subscriber_cross_exec_filter_test)

# --- MoqxCache tests ---

add_executable(moqx_cache_test
Expand Down
Loading
Loading