From 036df5912f0c8fe020ed4479e2137adac8d5b60b Mon Sep 17 00:00:00 2001 From: afrind Date: Mon, 25 May 2026 13:03:23 -0400 Subject: [PATCH] relay: add PublisherCrossExecFilter and SubscriberCrossExecFilter Summary: Adds two executor-crossing filter classes that wrap the Publisher and Subscriber interfaces and forward all calls to a target executor. PublisherCrossExecFilter uses co_withExecutor on every coroutine method (subscribe, fetch, trackStatus, subscribeNamespace) to switch to the target executor before invoking the inner. goaway() is fire-and-forget via executor->add(). SubscriberCrossExecFilter does the same for publishNamespace() and goaway(). publish() is more involved: it returns a CrossExecFilter as the consumer immediately (before the inner is called), then the reply Task switches to the target executor, calls inner->publish(), and wires setDownstream() to connect the real consumer. FIFO ordering on the target executor guarantees setDownstream() runs before any data writes enqueued afterward. Both classes require the target executor to be FIFO if call ordering matters. Unit tests cover success/error paths for all methods plus the goaway enqueue-vs-invoke ordering. Test Plan: moqx_publisher_cross_exec_filter_test moqx_subscriber_cross_exec_filter_test --- CMakeLists.txt | 2 + src/relay/PublisherCrossExecFilter.cpp | 56 ++++++ src/relay/PublisherCrossExecFilter.h | 47 ++++++ src/relay/SubscriberCrossExecFilter.cpp | 68 ++++++++ src/relay/SubscriberCrossExecFilter.h | 57 +++++++ test/CMakeLists.txt | 28 +++ test/PublisherCrossExecFilterTest.cpp | 198 ++++++++++++++++++++++ test/SubscriberCrossExecFilterTest.cpp | 215 ++++++++++++++++++++++++ 8 files changed, 671 insertions(+) create mode 100644 src/relay/PublisherCrossExecFilter.cpp create mode 100644 src/relay/PublisherCrossExecFilter.h create mode 100644 src/relay/SubscriberCrossExecFilter.cpp create mode 100644 src/relay/SubscriberCrossExecFilter.h create mode 100644 test/PublisherCrossExecFilterTest.cpp create mode 100644 test/SubscriberCrossExecFilterTest.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index d04c82aa..ed309c93 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -119,6 +119,8 @@ add_library(moqx_core STATIC src/relay/TopNFilter.cpp src/relay/PropertyRanking.cpp src/relay/CrossExecFilter.cpp + src/relay/PublisherCrossExecFilter.cpp + src/relay/SubscriberCrossExecFilter.cpp ) target_include_directories(moqx_core diff --git a/src/relay/PublisherCrossExecFilter.cpp b/src/relay/PublisherCrossExecFilter.cpp new file mode 100644 index 00000000..0dfc4f18 --- /dev/null +++ b/src/relay/PublisherCrossExecFilter.cpp @@ -0,0 +1,56 @@ +/* + * Copyright (c) OpenMOQ contributors. + * This source code is licensed under the Apache 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "relay/PublisherCrossExecFilter.h" + +namespace openmoq::moqx { + +folly::coro::Task +PublisherCrossExecFilter::trackStatus(moxygen::TrackStatus trackStatus) { + co_return co_await folly::coro::co_withExecutor( + folly::getKeepAliveToken(targetExec_), + inner_->trackStatus(std::move(trackStatus)) + ); +} + +folly::coro::Task PublisherCrossExecFilter::subscribe( + moxygen::SubscribeRequest sub, + std::shared_ptr callback +) { + co_return co_await folly::coro::co_withExecutor( + folly::getKeepAliveToken(targetExec_), + inner_->subscribe(std::move(sub), std::move(callback)) + ); +} + +folly::coro::Task PublisherCrossExecFilter::fetch( + moxygen::Fetch fetchReq, + std::shared_ptr fetchCallback +) { + co_return co_await folly::coro::co_withExecutor( + folly::getKeepAliveToken(targetExec_), + inner_->fetch(std::move(fetchReq), std::move(fetchCallback)) + ); +} + +folly::coro::Task +PublisherCrossExecFilter::subscribeNamespace( + moxygen::SubscribeNamespace subAnn, + std::shared_ptr namespacePublishHandle +) { + co_return co_await folly::coro::co_withExecutor( + folly::getKeepAliveToken(targetExec_), + inner_->subscribeNamespace(std::move(subAnn), std::move(namespacePublishHandle)) + ); +} + +void PublisherCrossExecFilter::goaway(moxygen::Goaway goaway) { + targetExec_->add([inner = inner_, goaway = std::move(goaway)]() mutable { + inner->goaway(std::move(goaway)); + }); +} + +} // namespace openmoq::moqx diff --git a/src/relay/PublisherCrossExecFilter.h b/src/relay/PublisherCrossExecFilter.h new file mode 100644 index 00000000..c3c7499e --- /dev/null +++ b/src/relay/PublisherCrossExecFilter.h @@ -0,0 +1,47 @@ +/* + * Copyright (c) OpenMOQ contributors. + * This source code is licensed under the Apache 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include +#include + +namespace openmoq::moqx { + +// Forwards all Publisher calls to a target executor. +// Coroutine methods switch to targetExec_ before invoking the inner and return +// the result to the caller. goaway() is fire-and-forget. +// +// Requires targetExec_ to be a FIFO executor if call ordering matters. +class PublisherCrossExecFilter final : public moxygen::Publisher { +public: + PublisherCrossExecFilter(folly::Executor* targetExec, std::shared_ptr inner) + : targetExec_(targetExec), inner_(std::move(inner)) {} + + folly::coro::Task trackStatus(moxygen::TrackStatus trackStatus) override; + + folly::coro::Task subscribe( + moxygen::SubscribeRequest sub, + std::shared_ptr callback + ) override; + + folly::coro::Task + fetch(moxygen::Fetch fetchReq, std::shared_ptr fetchCallback) override; + + folly::coro::Task subscribeNamespace( + moxygen::SubscribeNamespace subAnn, + std::shared_ptr namespacePublishHandle + ) override; + + void goaway(moxygen::Goaway goaway) override; + +private: + folly::Executor* targetExec_; + std::shared_ptr inner_; +}; + +} // namespace openmoq::moqx diff --git a/src/relay/SubscriberCrossExecFilter.cpp b/src/relay/SubscriberCrossExecFilter.cpp new file mode 100644 index 00000000..888c3bd4 --- /dev/null +++ b/src/relay/SubscriberCrossExecFilter.cpp @@ -0,0 +1,68 @@ +/* + * Copyright (c) OpenMOQ contributors. + * This source code is licensed under the Apache 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "relay/SubscriberCrossExecFilter.h" + +#include "relay/CrossExecFilter.h" + +namespace openmoq::moqx { + +folly::coro::Task +SubscriberCrossExecFilter::publishNamespace( + moxygen::PublishNamespace ann, + std::shared_ptr callback +) { + co_return co_await folly::coro::co_withExecutor( + folly::getKeepAliveToken(targetExec_), + inner_->publishNamespace(std::move(ann), std::move(callback)) + ); +} + +moxygen::Subscriber::PublishResult SubscriberCrossExecFilter::publish( + moxygen::PublishRequest pub, + std::shared_ptr handle +) { + auto filter = std::make_shared(targetExec_, nullptr); + // co_invoke captures exec and inner by value here (while `this` is live), + // so the task's coroutine frame owns them and doesn't need `this` to survive. + auto reply = folly::coro::co_invoke( + [exec = targetExec_, + inner = inner_, + pub = std::move(pub), + handle = std::move(handle), + filter]( + ) mutable -> folly::coro::Task> { + co_return co_await folly::coro::co_withExecutor( + folly::getKeepAliveToken(exec), + doPublishOnExec(std::move(inner), std::move(pub), std::move(handle), std::move(filter)) + ); + } + ); + return PublishConsumerAndReplyTask{std::move(filter), std::move(reply)}; +} + +folly::coro::Task> +SubscriberCrossExecFilter::doPublishOnExec( + std::shared_ptr inner, + moxygen::PublishRequest pub, + std::shared_ptr handle, + std::shared_ptr filter +) { + auto result = inner->publish(std::move(pub), std::move(handle)); + if (result.hasError()) { + co_return folly::makeUnexpected(result.error()); + } + filter->setDownstream(std::move(result.value().consumer)); + co_return co_await std::move(result.value().reply); +} + +void SubscriberCrossExecFilter::goaway(moxygen::Goaway goaway) { + targetExec_->add([inner = inner_, goaway = std::move(goaway)]() mutable { + inner->goaway(std::move(goaway)); + }); +} + +} // namespace openmoq::moqx diff --git a/src/relay/SubscriberCrossExecFilter.h b/src/relay/SubscriberCrossExecFilter.h new file mode 100644 index 00000000..1f12cf99 --- /dev/null +++ b/src/relay/SubscriberCrossExecFilter.h @@ -0,0 +1,57 @@ +/* + * Copyright (c) OpenMOQ contributors. + * This source code is licensed under the Apache 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include +#include + +namespace openmoq::moqx { + +class CrossExecFilter; + +// Forwards all Subscriber calls to a target executor. +// publishNamespace() switches to targetExec_ before invoking the inner. +// publish() returns a CrossExecFilter with null downstream immediately and a +// reply Task; driving the reply Task calls inner_->publish() on targetExec_, +// wires the real consumer via setDownstream(), then awaits the reply. FIFO +// ordering guarantees setDownstream() runs before any data writes enqueued +// afterward. +// goaway() is fire-and-forget. +// +// Requires targetExec_ to be a FIFO executor if call ordering matters. +class SubscriberCrossExecFilter final : public moxygen::Subscriber { +public: + SubscriberCrossExecFilter(folly::Executor* targetExec, std::shared_ptr inner) + : targetExec_(targetExec), inner_(std::move(inner)) {} + + folly::coro::Task publishNamespace( + moxygen::PublishNamespace ann, + std::shared_ptr callback + ) override; + + PublishResult publish( + moxygen::PublishRequest pub, + std::shared_ptr handle + ) override; + + void goaway(moxygen::Goaway goaway) override; + +private: + static folly::coro::Task> + doPublishOnExec( + std::shared_ptr inner, + moxygen::PublishRequest pub, + std::shared_ptr handle, + std::shared_ptr filter + ); + + folly::Executor* targetExec_; + std::shared_ptr inner_; +}; + +} // namespace openmoq::moqx diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 246a2d7e..bc86edca 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -285,6 +285,34 @@ target_link_libraries(moqx_cross_exec_filter_test PRIVATE ) gtest_discover_tests(moqx_cross_exec_filter_test) +# PublisherCrossExecFilter unit tests +add_executable(moqx_publisher_cross_exec_filter_test + PublisherCrossExecFilterTest.cpp +) +target_link_libraries(moqx_publisher_cross_exec_filter_test PRIVATE + moqx_core + GTest::gtest_main + GTest::gmock + moxygen::moqtest_utils + Folly::folly_executors_manual_executor + Folly::folly_executors_cpu_thread_pool_executor +) +gtest_discover_tests(moqx_publisher_cross_exec_filter_test) + +# SubscriberCrossExecFilter unit tests +add_executable(moqx_subscriber_cross_exec_filter_test + SubscriberCrossExecFilterTest.cpp +) +target_link_libraries(moqx_subscriber_cross_exec_filter_test PRIVATE + moqx_core + GTest::gtest_main + GTest::gmock + moxygen::moqtest_utils + Folly::folly_executors_manual_executor + Folly::folly_executors_cpu_thread_pool_executor +) +gtest_discover_tests(moqx_subscriber_cross_exec_filter_test) + # --- MoqxCache tests --- add_executable(moqx_cache_test diff --git a/test/PublisherCrossExecFilterTest.cpp b/test/PublisherCrossExecFilterTest.cpp new file mode 100644 index 00000000..442fd07c --- /dev/null +++ b/test/PublisherCrossExecFilterTest.cpp @@ -0,0 +1,198 @@ +/* + * Copyright (c) OpenMOQ contributors. + * This source code is licensed under the Apache 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "relay/PublisherCrossExecFilter.h" + +#include +#include +#include +#include +#include +#include + +using namespace testing; +using namespace moxygen; +using namespace openmoq::moqx; + +namespace { + +class PublisherCrossExecFilterTest : public ::testing::Test { +protected: + void SetUp() override { + targetExec_ = std::make_shared(1); + inner_ = std::make_shared>(); + filter_ = std::make_shared(targetExec_.get(), inner_); + } + + std::shared_ptr targetExec_; + std::shared_ptr> inner_; + std::shared_ptr filter_; +}; + +// ---- subscribe ---- + +TEST_F(PublisherCrossExecFilterTest, SubscribeForwardsToInner) { + SubscribeOk ok; + ok.requestID = RequestID(1); + auto handle = std::make_shared>(ok); + EXPECT_CALL(*inner_, subscribe(_, _)) + .WillOnce( + [handle](SubscribeRequest, std::shared_ptr) + -> folly::coro::Task { + co_return folly::makeExpected(std::shared_ptr(handle + )); + } + ); + + SubscribeRequest sub; + sub.requestID = RequestID(1); + auto result = folly::coro::blockingWait(filter_->subscribe(std::move(sub), nullptr)); + EXPECT_TRUE(result.hasValue()); +} + +TEST_F(PublisherCrossExecFilterTest, SubscribeReturnsError) { + EXPECT_CALL(*inner_, subscribe(_, _)) + .WillOnce( + [](SubscribeRequest sub, + std::shared_ptr) -> folly::coro::Task { + co_return folly::makeUnexpected( + SubscribeError{sub.requestID, SubscribeErrorCode::NOT_SUPPORTED, "nope"} + ); + } + ); + + SubscribeRequest sub; + sub.requestID = RequestID(2); + auto result = folly::coro::blockingWait(filter_->subscribe(std::move(sub), nullptr)); + EXPECT_FALSE(result.hasValue()); + EXPECT_EQ(result.error().errorCode, SubscribeErrorCode::NOT_SUPPORTED); +} + +// ---- trackStatus ---- + +TEST_F(PublisherCrossExecFilterTest, TrackStatusForwardsToInner) { + EXPECT_CALL(*inner_, trackStatus(_)) + .WillOnce([](TrackStatus ts) -> folly::coro::Task { + TrackStatusOk ok; + ok.requestID = ts.requestID; + co_return folly::makeExpected(std::move(ok)); + }); + + TrackStatus ts; + ts.requestID = RequestID(3); + auto result = folly::coro::blockingWait(filter_->trackStatus(std::move(ts))); + EXPECT_TRUE(result.hasValue()); +} + +TEST_F(PublisherCrossExecFilterTest, TrackStatusReturnsError) { + EXPECT_CALL(*inner_, trackStatus(_)) + .WillOnce([](TrackStatus ts) -> folly::coro::Task { + co_return folly::makeUnexpected( + TrackStatusError{ts.requestID, TrackStatusErrorCode::NOT_SUPPORTED, "nope"} + ); + }); + + TrackStatus ts; + ts.requestID = RequestID(3); + auto result = folly::coro::blockingWait(filter_->trackStatus(std::move(ts))); + EXPECT_FALSE(result.hasValue()); + EXPECT_EQ(result.error().requestID, RequestID(3)); +} + +// ---- fetch ---- + +TEST_F(PublisherCrossExecFilterTest, FetchForwardsToInner) { + EXPECT_CALL(*inner_, fetch(_, _)) + .WillOnce( + [](Fetch, std::shared_ptr) -> folly::coro::Task { + co_return folly::makeUnexpected( + FetchError{RequestID(4), FetchErrorCode::NOT_SUPPORTED, "nope"} + ); + } + ); + + Fetch fetchReq; + fetchReq.requestID = RequestID(4); + auto result = folly::coro::blockingWait(filter_->fetch(std::move(fetchReq), nullptr)); + EXPECT_FALSE(result.hasValue()); + EXPECT_EQ(result.error().errorCode, FetchErrorCode::NOT_SUPPORTED); +} + +TEST_F(PublisherCrossExecFilterTest, FetchReturnsSuccess) { + FetchOk ok; + ok.requestID = RequestID(4); + auto handle = std::make_shared>(ok); + EXPECT_CALL(*inner_, fetch(_, _)) + .WillOnce( + [handle](Fetch, std::shared_ptr) + -> folly::coro::Task { + co_return folly::makeExpected(std::shared_ptr(handle + )); + } + ); + + Fetch fetchReq; + fetchReq.requestID = RequestID(4); + auto result = folly::coro::blockingWait(filter_->fetch(std::move(fetchReq), nullptr)); + ASSERT_TRUE(result.hasValue()); + ASSERT_NE(result.value(), nullptr); + EXPECT_EQ(result.value()->fetchOk().requestID, RequestID(4)); +} + +// ---- subscribeNamespace ---- + +TEST_F(PublisherCrossExecFilterTest, SubscribeNamespaceForwardsToInner) { + EXPECT_CALL(*inner_, subscribeNamespace(_, _)) + .WillOnce( + [](SubscribeNamespace subAnn, std::shared_ptr) + -> folly::coro::Task { + co_return folly::makeUnexpected(SubscribeNamespaceError{ + subAnn.requestID, + SubscribeNamespaceErrorCode::NOT_SUPPORTED, + "nope" + }); + } + ); + + SubscribeNamespace subAnn; + subAnn.requestID = RequestID(5); + auto result = folly::coro::blockingWait(filter_->subscribeNamespace(std::move(subAnn), nullptr)); + EXPECT_FALSE(result.hasValue()); +} + +TEST_F(PublisherCrossExecFilterTest, SubscribeNamespaceReturnsSuccess) { + auto handle = std::make_shared>(); + EXPECT_CALL(*inner_, subscribeNamespace(_, _)) + .WillOnce( + [handle](SubscribeNamespace, std::shared_ptr) + -> folly::coro::Task { + co_return folly::makeExpected( + std::shared_ptr(handle) + ); + } + ); + + SubscribeNamespace subAnn; + subAnn.requestID = RequestID(5); + auto result = folly::coro::blockingWait(filter_->subscribeNamespace(std::move(subAnn), nullptr)); + ASSERT_TRUE(result.hasValue()); + EXPECT_NE(result.value(), nullptr); +} + +// ---- goaway ---- + +TEST_F(PublisherCrossExecFilterTest, GoawayEnqueued) { + folly::ManualExecutor manualExec; + auto filter = std::make_shared(&manualExec, inner_); + + EXPECT_CALL(*inner_, goaway(_)).Times(0); + filter->goaway(Goaway{}); + + EXPECT_CALL(*inner_, goaway(_)).Times(1); + manualExec.drain(); +} + +} // namespace diff --git a/test/SubscriberCrossExecFilterTest.cpp b/test/SubscriberCrossExecFilterTest.cpp new file mode 100644 index 00000000..cec9c73b --- /dev/null +++ b/test/SubscriberCrossExecFilterTest.cpp @@ -0,0 +1,215 @@ +/* + * Copyright (c) OpenMOQ contributors. + * This source code is licensed under the Apache 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "relay/SubscriberCrossExecFilter.h" + +#include +#include +#include +#include +#include +#include +#include + +using namespace testing; +using namespace moxygen; +using namespace openmoq::moqx; + +namespace { + +class MockSubscriberWithGoaway : public MockSubscriber { +public: + MOCK_METHOD(void, goaway, (Goaway), (override)); +}; + +class SubscriberCrossExecFilterTest : public ::testing::Test { +protected: + void SetUp() override { + targetExec_ = std::make_shared(1); + inner_ = std::make_shared>(); + filter_ = std::make_shared(targetExec_.get(), inner_); + } + + std::shared_ptr targetExec_; + std::shared_ptr> inner_; + std::shared_ptr filter_; +}; + +// ---- publishNamespace ---- + +TEST_F(SubscriberCrossExecFilterTest, PublishNamespaceForwardsToInner) { + auto nsHandle = std::make_shared>(); + EXPECT_CALL(*inner_, publishNamespace(_, _)) + .WillOnce( + [nsHandle](PublishNamespace, std::shared_ptr) + -> folly::coro::Task { + co_return folly::makeExpected( + std::shared_ptr(nsHandle) + ); + } + ); + + PublishNamespace ann; + ann.requestID = RequestID(1); + auto result = folly::coro::blockingWait(filter_->publishNamespace(std::move(ann), nullptr)); + EXPECT_TRUE(result.hasValue()); +} + +TEST_F(SubscriberCrossExecFilterTest, PublishNamespaceReturnsError) { + EXPECT_CALL(*inner_, publishNamespace(_, _)) + .WillOnce( + [](PublishNamespace ann, std::shared_ptr) + -> folly::coro::Task { + co_return folly::makeUnexpected(PublishNamespaceError{ + ann.requestID, + PublishNamespaceErrorCode::NOT_SUPPORTED, + "nope" + }); + } + ); + + PublishNamespace ann; + ann.requestID = RequestID(2); + auto result = folly::coro::blockingWait(filter_->publishNamespace(std::move(ann), nullptr)); + EXPECT_FALSE(result.hasValue()); + EXPECT_EQ(result.error().errorCode, PublishNamespaceErrorCode::NOT_SUPPORTED); +} + +// ---- publish (sync, called directly) ---- + +TEST_F(SubscriberCrossExecFilterTest, PublishRunsOnTargetExec) { + PublishOk ok; + ok.requestID = RequestID(3); + EXPECT_CALL(*inner_, publish(_, _)) + .WillOnce( + [ok](PublishRequest, std::shared_ptr) -> Subscriber::PublishResult { + auto consumer = std::make_shared>(); + return Subscriber::PublishConsumerAndReplyTask{ + std::move(consumer), + folly::coro::makeTask>( + folly::makeExpected(ok) + ) + }; + } + ); + + PublishRequest pub; + pub.requestID = RequestID(3); + auto result = filter_->publish(std::move(pub), nullptr); + ASSERT_TRUE(result.hasValue()); + // consumer is the cross-exec filter returned immediately + EXPECT_NE(result.value().consumer, nullptr); + // driving the reply task causes inner->publish() to run on targetExec_ + auto reply = folly::coro::blockingWait(std::move(result.value().reply)); + EXPECT_TRUE(reply.hasValue()); + EXPECT_EQ(reply.value().requestID, RequestID(3)); +} + +TEST_F(SubscriberCrossExecFilterTest, PublishInnerReturnsError) { + EXPECT_CALL(*inner_, publish(_, _)) + .WillOnce( + [](PublishRequest pub, std::shared_ptr) -> Subscriber::PublishResult { + return folly::makeUnexpected( + PublishError{pub.requestID, PublishErrorCode::NOT_SUPPORTED, "nope"} + ); + } + ); + + PublishRequest pub; + pub.requestID = RequestID(4); + auto result = filter_->publish(std::move(pub), nullptr); + ASSERT_TRUE(result.hasValue()); + auto reply = folly::coro::blockingWait(std::move(result.value().reply)); + EXPECT_FALSE(reply.hasValue()); + EXPECT_EQ(reply.error().requestID, RequestID(4)); +} + +TEST_F(SubscriberCrossExecFilterTest, PublishDataFlowsThroughConsumer) { + auto innerConsumer = std::make_shared>(); + PublishOk ok; + ok.requestID = RequestID(5); + EXPECT_CALL(*inner_, publish(_, _)) + .WillOnce( + [innerConsumer, + ok](PublishRequest, std::shared_ptr) -> Subscriber::PublishResult { + return Subscriber::PublishConsumerAndReplyTask{ + innerConsumer, + folly::coro::makeTask>( + folly::makeExpected(ok) + ) + }; + } + ); + + PublishRequest pub; + pub.requestID = RequestID(5); + auto result = filter_->publish(std::move(pub), nullptr); + ASSERT_TRUE(result.hasValue()); + auto consumer = result.value().consumer; + + // Drive reply: calls inner->publish() on targetExec_ and wires up setDownstream(). + folly::coro::blockingWait(std::move(result.value().reply)); + + // objectStream through consumer should reach innerConsumer via CrossExecFilter. + std::promise done; + auto future = done.get_future(); + EXPECT_CALL(*innerConsumer, objectStream(_, _, _)) + .WillOnce([&done](const ObjectHeader&, Payload, bool) { + done.set_value(); + return folly::makeExpected(folly::unit); + }); + consumer->objectStream(ObjectHeader{0, 0, 0}, nullptr, false); + future.wait(); +} + +TEST_F(SubscriberCrossExecFilterTest, PublishSafeAfterFilterDestroyed) { + // Regression: the old coPublish member coroutine captured `this` implicitly. + // Destroying the filter before driving the reply task was a use-after-free. + // The fix captures exec and inner by value in co_invoke so `this` is not needed. + PublishOk ok; + ok.requestID = RequestID(6); + EXPECT_CALL(*inner_, publish(_, _)) + .WillOnce( + [ok](PublishRequest, std::shared_ptr) -> Subscriber::PublishResult { + return Subscriber::PublishConsumerAndReplyTask{ + std::make_shared>(), + folly::coro::makeTask>( + folly::makeExpected(ok) + ) + }; + } + ); + + // Obtain the reply task while the filter is alive on the stack, then let it die. + auto replyTask = [&]() { + SubscriberCrossExecFilter stackFilter(targetExec_.get(), inner_); + PublishRequest pub; + pub.requestID = RequestID(6); + auto result = stackFilter.publish(std::move(pub), nullptr); + EXPECT_TRUE(result.hasValue()); + return std::move(result.value().reply); + }(); + + // stackFilter is destroyed; driving the task must not access a dangling this. + auto reply = folly::coro::blockingWait(std::move(replyTask)); + EXPECT_TRUE(reply.hasValue()); + EXPECT_EQ(reply.value().requestID, RequestID(6)); +} + +// ---- goaway ---- + +TEST_F(SubscriberCrossExecFilterTest, GoawayEnqueued) { + folly::ManualExecutor manualExec; + auto filter = std::make_shared(&manualExec, inner_); + + EXPECT_CALL(*inner_, goaway(_)).Times(0); + filter->goaway(Goaway{}); + + EXPECT_CALL(*inner_, goaway(_)).Times(1); + manualExec.drain(); +} + +} // namespace