diff --git a/CMakeLists.txt b/CMakeLists.txt index d04c82aa..ed309c93 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -119,6 +119,8 @@ add_library(moqx_core STATIC src/relay/TopNFilter.cpp src/relay/PropertyRanking.cpp src/relay/CrossExecFilter.cpp + src/relay/PublisherCrossExecFilter.cpp + src/relay/SubscriberCrossExecFilter.cpp ) target_include_directories(moqx_core diff --git a/src/relay/PublisherCrossExecFilter.cpp b/src/relay/PublisherCrossExecFilter.cpp new file mode 100644 index 00000000..0dfc4f18 --- /dev/null +++ b/src/relay/PublisherCrossExecFilter.cpp @@ -0,0 +1,56 @@ +/* + * Copyright (c) OpenMOQ contributors. + * This source code is licensed under the Apache 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "relay/PublisherCrossExecFilter.h" + +namespace openmoq::moqx { + +folly::coro::Task +PublisherCrossExecFilter::trackStatus(moxygen::TrackStatus trackStatus) { + co_return co_await folly::coro::co_withExecutor( + folly::getKeepAliveToken(targetExec_), + inner_->trackStatus(std::move(trackStatus)) + ); +} + +folly::coro::Task PublisherCrossExecFilter::subscribe( + moxygen::SubscribeRequest sub, + std::shared_ptr callback +) { + co_return co_await folly::coro::co_withExecutor( + folly::getKeepAliveToken(targetExec_), + inner_->subscribe(std::move(sub), std::move(callback)) + ); +} + +folly::coro::Task PublisherCrossExecFilter::fetch( + moxygen::Fetch fetchReq, + std::shared_ptr fetchCallback +) { + co_return co_await folly::coro::co_withExecutor( + folly::getKeepAliveToken(targetExec_), + inner_->fetch(std::move(fetchReq), std::move(fetchCallback)) + ); +} + +folly::coro::Task +PublisherCrossExecFilter::subscribeNamespace( + moxygen::SubscribeNamespace subAnn, + std::shared_ptr namespacePublishHandle +) { + co_return co_await folly::coro::co_withExecutor( + folly::getKeepAliveToken(targetExec_), + inner_->subscribeNamespace(std::move(subAnn), std::move(namespacePublishHandle)) + ); +} + +void PublisherCrossExecFilter::goaway(moxygen::Goaway goaway) { + targetExec_->add([inner = inner_, goaway = std::move(goaway)]() mutable { + inner->goaway(std::move(goaway)); + }); +} + +} // namespace openmoq::moqx diff --git a/src/relay/PublisherCrossExecFilter.h b/src/relay/PublisherCrossExecFilter.h new file mode 100644 index 00000000..c3c7499e --- /dev/null +++ b/src/relay/PublisherCrossExecFilter.h @@ -0,0 +1,47 @@ +/* + * Copyright (c) OpenMOQ contributors. + * This source code is licensed under the Apache 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include +#include + +namespace openmoq::moqx { + +// Forwards all Publisher calls to a target executor. +// Coroutine methods switch to targetExec_ before invoking the inner and return +// the result to the caller. goaway() is fire-and-forget. +// +// Requires targetExec_ to be a FIFO executor if call ordering matters. +class PublisherCrossExecFilter final : public moxygen::Publisher { +public: + PublisherCrossExecFilter(folly::Executor* targetExec, std::shared_ptr inner) + : targetExec_(targetExec), inner_(std::move(inner)) {} + + folly::coro::Task trackStatus(moxygen::TrackStatus trackStatus) override; + + folly::coro::Task subscribe( + moxygen::SubscribeRequest sub, + std::shared_ptr callback + ) override; + + folly::coro::Task + fetch(moxygen::Fetch fetchReq, std::shared_ptr fetchCallback) override; + + folly::coro::Task subscribeNamespace( + moxygen::SubscribeNamespace subAnn, + std::shared_ptr namespacePublishHandle + ) override; + + void goaway(moxygen::Goaway goaway) override; + +private: + folly::Executor* targetExec_; + std::shared_ptr inner_; +}; + +} // namespace openmoq::moqx diff --git a/src/relay/SubscriberCrossExecFilter.cpp b/src/relay/SubscriberCrossExecFilter.cpp new file mode 100644 index 00000000..888c3bd4 --- /dev/null +++ b/src/relay/SubscriberCrossExecFilter.cpp @@ -0,0 +1,68 @@ +/* + * Copyright (c) OpenMOQ contributors. + * This source code is licensed under the Apache 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "relay/SubscriberCrossExecFilter.h" + +#include "relay/CrossExecFilter.h" + +namespace openmoq::moqx { + +folly::coro::Task +SubscriberCrossExecFilter::publishNamespace( + moxygen::PublishNamespace ann, + std::shared_ptr callback +) { + co_return co_await folly::coro::co_withExecutor( + folly::getKeepAliveToken(targetExec_), + inner_->publishNamespace(std::move(ann), std::move(callback)) + ); +} + +moxygen::Subscriber::PublishResult SubscriberCrossExecFilter::publish( + moxygen::PublishRequest pub, + std::shared_ptr handle +) { + auto filter = std::make_shared(targetExec_, nullptr); + // co_invoke captures exec and inner by value here (while `this` is live), + // so the task's coroutine frame owns them and doesn't need `this` to survive. + auto reply = folly::coro::co_invoke( + [exec = targetExec_, + inner = inner_, + pub = std::move(pub), + handle = std::move(handle), + filter]( + ) mutable -> folly::coro::Task> { + co_return co_await folly::coro::co_withExecutor( + folly::getKeepAliveToken(exec), + doPublishOnExec(std::move(inner), std::move(pub), std::move(handle), std::move(filter)) + ); + } + ); + return PublishConsumerAndReplyTask{std::move(filter), std::move(reply)}; +} + +folly::coro::Task> +SubscriberCrossExecFilter::doPublishOnExec( + std::shared_ptr inner, + moxygen::PublishRequest pub, + std::shared_ptr handle, + std::shared_ptr filter +) { + auto result = inner->publish(std::move(pub), std::move(handle)); + if (result.hasError()) { + co_return folly::makeUnexpected(result.error()); + } + filter->setDownstream(std::move(result.value().consumer)); + co_return co_await std::move(result.value().reply); +} + +void SubscriberCrossExecFilter::goaway(moxygen::Goaway goaway) { + targetExec_->add([inner = inner_, goaway = std::move(goaway)]() mutable { + inner->goaway(std::move(goaway)); + }); +} + +} // namespace openmoq::moqx diff --git a/src/relay/SubscriberCrossExecFilter.h b/src/relay/SubscriberCrossExecFilter.h new file mode 100644 index 00000000..1f12cf99 --- /dev/null +++ b/src/relay/SubscriberCrossExecFilter.h @@ -0,0 +1,57 @@ +/* + * Copyright (c) OpenMOQ contributors. + * This source code is licensed under the Apache 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include +#include + +namespace openmoq::moqx { + +class CrossExecFilter; + +// Forwards all Subscriber calls to a target executor. +// publishNamespace() switches to targetExec_ before invoking the inner. +// publish() returns a CrossExecFilter with null downstream immediately and a +// reply Task; driving the reply Task calls inner_->publish() on targetExec_, +// wires the real consumer via setDownstream(), then awaits the reply. FIFO +// ordering guarantees setDownstream() runs before any data writes enqueued +// afterward. +// goaway() is fire-and-forget. +// +// Requires targetExec_ to be a FIFO executor if call ordering matters. +class SubscriberCrossExecFilter final : public moxygen::Subscriber { +public: + SubscriberCrossExecFilter(folly::Executor* targetExec, std::shared_ptr inner) + : targetExec_(targetExec), inner_(std::move(inner)) {} + + folly::coro::Task publishNamespace( + moxygen::PublishNamespace ann, + std::shared_ptr callback + ) override; + + PublishResult publish( + moxygen::PublishRequest pub, + std::shared_ptr handle + ) override; + + void goaway(moxygen::Goaway goaway) override; + +private: + static folly::coro::Task> + doPublishOnExec( + std::shared_ptr inner, + moxygen::PublishRequest pub, + std::shared_ptr handle, + std::shared_ptr filter + ); + + folly::Executor* targetExec_; + std::shared_ptr inner_; +}; + +} // namespace openmoq::moqx diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 246a2d7e..bc86edca 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -285,6 +285,34 @@ target_link_libraries(moqx_cross_exec_filter_test PRIVATE ) gtest_discover_tests(moqx_cross_exec_filter_test) +# PublisherCrossExecFilter unit tests +add_executable(moqx_publisher_cross_exec_filter_test + PublisherCrossExecFilterTest.cpp +) +target_link_libraries(moqx_publisher_cross_exec_filter_test PRIVATE + moqx_core + GTest::gtest_main + GTest::gmock + moxygen::moqtest_utils + Folly::folly_executors_manual_executor + Folly::folly_executors_cpu_thread_pool_executor +) +gtest_discover_tests(moqx_publisher_cross_exec_filter_test) + +# SubscriberCrossExecFilter unit tests +add_executable(moqx_subscriber_cross_exec_filter_test + SubscriberCrossExecFilterTest.cpp +) +target_link_libraries(moqx_subscriber_cross_exec_filter_test PRIVATE + moqx_core + GTest::gtest_main + GTest::gmock + moxygen::moqtest_utils + Folly::folly_executors_manual_executor + Folly::folly_executors_cpu_thread_pool_executor +) +gtest_discover_tests(moqx_subscriber_cross_exec_filter_test) + # --- MoqxCache tests --- add_executable(moqx_cache_test diff --git a/test/PublisherCrossExecFilterTest.cpp b/test/PublisherCrossExecFilterTest.cpp new file mode 100644 index 00000000..442fd07c --- /dev/null +++ b/test/PublisherCrossExecFilterTest.cpp @@ -0,0 +1,198 @@ +/* + * Copyright (c) OpenMOQ contributors. + * This source code is licensed under the Apache 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "relay/PublisherCrossExecFilter.h" + +#include +#include +#include +#include +#include +#include + +using namespace testing; +using namespace moxygen; +using namespace openmoq::moqx; + +namespace { + +class PublisherCrossExecFilterTest : public ::testing::Test { +protected: + void SetUp() override { + targetExec_ = std::make_shared(1); + inner_ = std::make_shared>(); + filter_ = std::make_shared(targetExec_.get(), inner_); + } + + std::shared_ptr targetExec_; + std::shared_ptr> inner_; + std::shared_ptr filter_; +}; + +// ---- subscribe ---- + +TEST_F(PublisherCrossExecFilterTest, SubscribeForwardsToInner) { + SubscribeOk ok; + ok.requestID = RequestID(1); + auto handle = std::make_shared>(ok); + EXPECT_CALL(*inner_, subscribe(_, _)) + .WillOnce( + [handle](SubscribeRequest, std::shared_ptr) + -> folly::coro::Task { + co_return folly::makeExpected(std::shared_ptr(handle + )); + } + ); + + SubscribeRequest sub; + sub.requestID = RequestID(1); + auto result = folly::coro::blockingWait(filter_->subscribe(std::move(sub), nullptr)); + EXPECT_TRUE(result.hasValue()); +} + +TEST_F(PublisherCrossExecFilterTest, SubscribeReturnsError) { + EXPECT_CALL(*inner_, subscribe(_, _)) + .WillOnce( + [](SubscribeRequest sub, + std::shared_ptr) -> folly::coro::Task { + co_return folly::makeUnexpected( + SubscribeError{sub.requestID, SubscribeErrorCode::NOT_SUPPORTED, "nope"} + ); + } + ); + + SubscribeRequest sub; + sub.requestID = RequestID(2); + auto result = folly::coro::blockingWait(filter_->subscribe(std::move(sub), nullptr)); + EXPECT_FALSE(result.hasValue()); + EXPECT_EQ(result.error().errorCode, SubscribeErrorCode::NOT_SUPPORTED); +} + +// ---- trackStatus ---- + +TEST_F(PublisherCrossExecFilterTest, TrackStatusForwardsToInner) { + EXPECT_CALL(*inner_, trackStatus(_)) + .WillOnce([](TrackStatus ts) -> folly::coro::Task { + TrackStatusOk ok; + ok.requestID = ts.requestID; + co_return folly::makeExpected(std::move(ok)); + }); + + TrackStatus ts; + ts.requestID = RequestID(3); + auto result = folly::coro::blockingWait(filter_->trackStatus(std::move(ts))); + EXPECT_TRUE(result.hasValue()); +} + +TEST_F(PublisherCrossExecFilterTest, TrackStatusReturnsError) { + EXPECT_CALL(*inner_, trackStatus(_)) + .WillOnce([](TrackStatus ts) -> folly::coro::Task { + co_return folly::makeUnexpected( + TrackStatusError{ts.requestID, TrackStatusErrorCode::NOT_SUPPORTED, "nope"} + ); + }); + + TrackStatus ts; + ts.requestID = RequestID(3); + auto result = folly::coro::blockingWait(filter_->trackStatus(std::move(ts))); + EXPECT_FALSE(result.hasValue()); + EXPECT_EQ(result.error().requestID, RequestID(3)); +} + +// ---- fetch ---- + +TEST_F(PublisherCrossExecFilterTest, FetchForwardsToInner) { + EXPECT_CALL(*inner_, fetch(_, _)) + .WillOnce( + [](Fetch, std::shared_ptr) -> folly::coro::Task { + co_return folly::makeUnexpected( + FetchError{RequestID(4), FetchErrorCode::NOT_SUPPORTED, "nope"} + ); + } + ); + + Fetch fetchReq; + fetchReq.requestID = RequestID(4); + auto result = folly::coro::blockingWait(filter_->fetch(std::move(fetchReq), nullptr)); + EXPECT_FALSE(result.hasValue()); + EXPECT_EQ(result.error().errorCode, FetchErrorCode::NOT_SUPPORTED); +} + +TEST_F(PublisherCrossExecFilterTest, FetchReturnsSuccess) { + FetchOk ok; + ok.requestID = RequestID(4); + auto handle = std::make_shared>(ok); + EXPECT_CALL(*inner_, fetch(_, _)) + .WillOnce( + [handle](Fetch, std::shared_ptr) + -> folly::coro::Task { + co_return folly::makeExpected(std::shared_ptr(handle + )); + } + ); + + Fetch fetchReq; + fetchReq.requestID = RequestID(4); + auto result = folly::coro::blockingWait(filter_->fetch(std::move(fetchReq), nullptr)); + ASSERT_TRUE(result.hasValue()); + ASSERT_NE(result.value(), nullptr); + EXPECT_EQ(result.value()->fetchOk().requestID, RequestID(4)); +} + +// ---- subscribeNamespace ---- + +TEST_F(PublisherCrossExecFilterTest, SubscribeNamespaceForwardsToInner) { + EXPECT_CALL(*inner_, subscribeNamespace(_, _)) + .WillOnce( + [](SubscribeNamespace subAnn, std::shared_ptr) + -> folly::coro::Task { + co_return folly::makeUnexpected(SubscribeNamespaceError{ + subAnn.requestID, + SubscribeNamespaceErrorCode::NOT_SUPPORTED, + "nope" + }); + } + ); + + SubscribeNamespace subAnn; + subAnn.requestID = RequestID(5); + auto result = folly::coro::blockingWait(filter_->subscribeNamespace(std::move(subAnn), nullptr)); + EXPECT_FALSE(result.hasValue()); +} + +TEST_F(PublisherCrossExecFilterTest, SubscribeNamespaceReturnsSuccess) { + auto handle = std::make_shared>(); + EXPECT_CALL(*inner_, subscribeNamespace(_, _)) + .WillOnce( + [handle](SubscribeNamespace, std::shared_ptr) + -> folly::coro::Task { + co_return folly::makeExpected( + std::shared_ptr(handle) + ); + } + ); + + SubscribeNamespace subAnn; + subAnn.requestID = RequestID(5); + auto result = folly::coro::blockingWait(filter_->subscribeNamespace(std::move(subAnn), nullptr)); + ASSERT_TRUE(result.hasValue()); + EXPECT_NE(result.value(), nullptr); +} + +// ---- goaway ---- + +TEST_F(PublisherCrossExecFilterTest, GoawayEnqueued) { + folly::ManualExecutor manualExec; + auto filter = std::make_shared(&manualExec, inner_); + + EXPECT_CALL(*inner_, goaway(_)).Times(0); + filter->goaway(Goaway{}); + + EXPECT_CALL(*inner_, goaway(_)).Times(1); + manualExec.drain(); +} + +} // namespace diff --git a/test/SubscriberCrossExecFilterTest.cpp b/test/SubscriberCrossExecFilterTest.cpp new file mode 100644 index 00000000..cec9c73b --- /dev/null +++ b/test/SubscriberCrossExecFilterTest.cpp @@ -0,0 +1,215 @@ +/* + * Copyright (c) OpenMOQ contributors. + * This source code is licensed under the Apache 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "relay/SubscriberCrossExecFilter.h" + +#include +#include +#include +#include +#include +#include +#include + +using namespace testing; +using namespace moxygen; +using namespace openmoq::moqx; + +namespace { + +class MockSubscriberWithGoaway : public MockSubscriber { +public: + MOCK_METHOD(void, goaway, (Goaway), (override)); +}; + +class SubscriberCrossExecFilterTest : public ::testing::Test { +protected: + void SetUp() override { + targetExec_ = std::make_shared(1); + inner_ = std::make_shared>(); + filter_ = std::make_shared(targetExec_.get(), inner_); + } + + std::shared_ptr targetExec_; + std::shared_ptr> inner_; + std::shared_ptr filter_; +}; + +// ---- publishNamespace ---- + +TEST_F(SubscriberCrossExecFilterTest, PublishNamespaceForwardsToInner) { + auto nsHandle = std::make_shared>(); + EXPECT_CALL(*inner_, publishNamespace(_, _)) + .WillOnce( + [nsHandle](PublishNamespace, std::shared_ptr) + -> folly::coro::Task { + co_return folly::makeExpected( + std::shared_ptr(nsHandle) + ); + } + ); + + PublishNamespace ann; + ann.requestID = RequestID(1); + auto result = folly::coro::blockingWait(filter_->publishNamespace(std::move(ann), nullptr)); + EXPECT_TRUE(result.hasValue()); +} + +TEST_F(SubscriberCrossExecFilterTest, PublishNamespaceReturnsError) { + EXPECT_CALL(*inner_, publishNamespace(_, _)) + .WillOnce( + [](PublishNamespace ann, std::shared_ptr) + -> folly::coro::Task { + co_return folly::makeUnexpected(PublishNamespaceError{ + ann.requestID, + PublishNamespaceErrorCode::NOT_SUPPORTED, + "nope" + }); + } + ); + + PublishNamespace ann; + ann.requestID = RequestID(2); + auto result = folly::coro::blockingWait(filter_->publishNamespace(std::move(ann), nullptr)); + EXPECT_FALSE(result.hasValue()); + EXPECT_EQ(result.error().errorCode, PublishNamespaceErrorCode::NOT_SUPPORTED); +} + +// ---- publish (sync, called directly) ---- + +TEST_F(SubscriberCrossExecFilterTest, PublishRunsOnTargetExec) { + PublishOk ok; + ok.requestID = RequestID(3); + EXPECT_CALL(*inner_, publish(_, _)) + .WillOnce( + [ok](PublishRequest, std::shared_ptr) -> Subscriber::PublishResult { + auto consumer = std::make_shared>(); + return Subscriber::PublishConsumerAndReplyTask{ + std::move(consumer), + folly::coro::makeTask>( + folly::makeExpected(ok) + ) + }; + } + ); + + PublishRequest pub; + pub.requestID = RequestID(3); + auto result = filter_->publish(std::move(pub), nullptr); + ASSERT_TRUE(result.hasValue()); + // consumer is the cross-exec filter returned immediately + EXPECT_NE(result.value().consumer, nullptr); + // driving the reply task causes inner->publish() to run on targetExec_ + auto reply = folly::coro::blockingWait(std::move(result.value().reply)); + EXPECT_TRUE(reply.hasValue()); + EXPECT_EQ(reply.value().requestID, RequestID(3)); +} + +TEST_F(SubscriberCrossExecFilterTest, PublishInnerReturnsError) { + EXPECT_CALL(*inner_, publish(_, _)) + .WillOnce( + [](PublishRequest pub, std::shared_ptr) -> Subscriber::PublishResult { + return folly::makeUnexpected( + PublishError{pub.requestID, PublishErrorCode::NOT_SUPPORTED, "nope"} + ); + } + ); + + PublishRequest pub; + pub.requestID = RequestID(4); + auto result = filter_->publish(std::move(pub), nullptr); + ASSERT_TRUE(result.hasValue()); + auto reply = folly::coro::blockingWait(std::move(result.value().reply)); + EXPECT_FALSE(reply.hasValue()); + EXPECT_EQ(reply.error().requestID, RequestID(4)); +} + +TEST_F(SubscriberCrossExecFilterTest, PublishDataFlowsThroughConsumer) { + auto innerConsumer = std::make_shared>(); + PublishOk ok; + ok.requestID = RequestID(5); + EXPECT_CALL(*inner_, publish(_, _)) + .WillOnce( + [innerConsumer, + ok](PublishRequest, std::shared_ptr) -> Subscriber::PublishResult { + return Subscriber::PublishConsumerAndReplyTask{ + innerConsumer, + folly::coro::makeTask>( + folly::makeExpected(ok) + ) + }; + } + ); + + PublishRequest pub; + pub.requestID = RequestID(5); + auto result = filter_->publish(std::move(pub), nullptr); + ASSERT_TRUE(result.hasValue()); + auto consumer = result.value().consumer; + + // Drive reply: calls inner->publish() on targetExec_ and wires up setDownstream(). + folly::coro::blockingWait(std::move(result.value().reply)); + + // objectStream through consumer should reach innerConsumer via CrossExecFilter. + std::promise done; + auto future = done.get_future(); + EXPECT_CALL(*innerConsumer, objectStream(_, _, _)) + .WillOnce([&done](const ObjectHeader&, Payload, bool) { + done.set_value(); + return folly::makeExpected(folly::unit); + }); + consumer->objectStream(ObjectHeader{0, 0, 0}, nullptr, false); + future.wait(); +} + +TEST_F(SubscriberCrossExecFilterTest, PublishSafeAfterFilterDestroyed) { + // Regression: the old coPublish member coroutine captured `this` implicitly. + // Destroying the filter before driving the reply task was a use-after-free. + // The fix captures exec and inner by value in co_invoke so `this` is not needed. + PublishOk ok; + ok.requestID = RequestID(6); + EXPECT_CALL(*inner_, publish(_, _)) + .WillOnce( + [ok](PublishRequest, std::shared_ptr) -> Subscriber::PublishResult { + return Subscriber::PublishConsumerAndReplyTask{ + std::make_shared>(), + folly::coro::makeTask>( + folly::makeExpected(ok) + ) + }; + } + ); + + // Obtain the reply task while the filter is alive on the stack, then let it die. + auto replyTask = [&]() { + SubscriberCrossExecFilter stackFilter(targetExec_.get(), inner_); + PublishRequest pub; + pub.requestID = RequestID(6); + auto result = stackFilter.publish(std::move(pub), nullptr); + EXPECT_TRUE(result.hasValue()); + return std::move(result.value().reply); + }(); + + // stackFilter is destroyed; driving the task must not access a dangling this. + auto reply = folly::coro::blockingWait(std::move(replyTask)); + EXPECT_TRUE(reply.hasValue()); + EXPECT_EQ(reply.value().requestID, RequestID(6)); +} + +// ---- goaway ---- + +TEST_F(SubscriberCrossExecFilterTest, GoawayEnqueued) { + folly::ManualExecutor manualExec; + auto filter = std::make_shared(&manualExec, inner_); + + EXPECT_CALL(*inner_, goaway(_)).Times(0); + filter->goaway(Goaway{}); + + EXPECT_CALL(*inner_, goaway(_)).Times(1); + manualExec.drain(); +} + +} // namespace