From 1b7cc000c14294b0db1ad4895888a824d8f3dc55 Mon Sep 17 00:00:00 2001 From: afrind Date: Sat, 6 Jun 2026 12:18:38 -0400 Subject: [PATCH 1/3] relay_thread config and allow > 1 thread in config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a use_relay_thread boolean config option (default: true) that controls whether relay state is isolated on a dedicated executor thread. Disabling it is intended for baseline performance comparison only. Also removes the hard error that rejected threads > 1, replacing it with a targeted check: threads > 1 requires use_relay_thread=true. This unlocks the config validation only — threads > 1 with use_relay_thread=true will race on shared relay state until the following commit wires up the dedicated relay executor. --- src/config/Config.h | 1 + src/config/ConfigResolver.cpp | 8 ++++++-- src/config/loader/ParsedConfig.h | 5 +++++ test/config/ConfigResolverTest.cpp | 21 +++++++++++++++++++-- 4 files changed, 31 insertions(+), 4 deletions(-) diff --git a/src/config/Config.h b/src/config/Config.h index b81ccf34..5fb7600a 100644 --- a/src/config/Config.h +++ b/src/config/Config.h @@ -215,6 +215,7 @@ struct Config { std::optional admin; std::string relayID; // always set: from config or randomly generated uint32_t threads{1}; + bool useRelayThread{true}; bool mvfstBpfSteering{true}; }; diff --git a/src/config/ConfigResolver.cpp b/src/config/ConfigResolver.cpp index 5176b5f7..5ba34613 100644 --- a/src/config/ConfigResolver.cpp +++ b/src/config/ConfigResolver.cpp @@ -896,8 +896,11 @@ folly::Expected resolveConfig(const ParsedConfig& c const uint32_t threads = config.threads.value().value_or(1); if (threads == 0) { errors.push_back("threads must be >= 1"); - } else if (threads > 1) { - errors.push_back("threads > 1 is not yet supported"); + } + + const bool useRelayThread = config.use_relay_thread.value().value_or(true); + if (threads > 1 && !useRelayThread) { + errors.push_back("use_relay_thread 
must be true when threads > 1"); } const bool mvfstBpfSteering = config.mvfst_bpf_steering.value().value_or(true); @@ -951,6 +954,7 @@ folly::Expected resolveConfig(const ParsedConfig& c .admin = std::move(adminConfig), .relayID = std::move(relayID), .threads = threads, + .useRelayThread = useRelayThread, .mvfstBpfSteering = mvfstBpfSteering, }, .warnings = std::move(warnings), diff --git a/src/config/loader/ParsedConfig.h b/src/config/loader/ParsedConfig.h index 4a10c8b7..d7469269 100644 --- a/src/config/loader/ParsedConfig.h +++ b/src/config/loader/ParsedConfig.h @@ -420,6 +420,11 @@ struct ParsedConfig { std::optional> listener_defaults; rfl::Description<"Number of IO worker threads (default: 1)", std::optional> threads; + rfl::Description< + "Dedicate one relay thread per service for relay state isolation (default: true). " + "Disable for baseline performance comparison.", + std::optional> + use_relay_thread; rfl::Description< "Attach a classic BPF reuseport filter to steer QUIC packets to the correct mvfst worker " "based on the connection ID's workerId field (Linux only, mvfst stack only, default: true). 
" diff --git a/test/config/ConfigResolverTest.cpp b/test/config/ConfigResolverTest.cpp index ef048291..7a8b556f 100644 --- a/test/config/ConfigResolverTest.cpp +++ b/test/config/ConfigResolverTest.cpp @@ -1086,12 +1086,29 @@ TEST(ResolveConfig, ThreadsZeroRejected) { EXPECT_THAT(result.error(), HasSubstr("threads must be >= 1")); } -TEST(ResolveConfig, ThreadsGreaterThanOneRejected) { +TEST(ResolveConfig, ThreadsGreaterThanOneAccepted) { auto cfg = makeMinimalInsecureConfig(); cfg.threads = std::optional{2}; auto result = resolveConfig(cfg); + ASSERT_TRUE(result.hasValue()); + EXPECT_EQ(result.value().config.threads, 2u); +} + +TEST(ResolveConfig, UseRelayThreadFalseWithOneThreadAccepted) { + auto cfg = makeMinimalInsecureConfig(); + cfg.use_relay_thread = std::optional{false}; + auto result = resolveConfig(cfg); + ASSERT_TRUE(result.hasValue()); + EXPECT_FALSE(result.value().config.useRelayThread); +} + +TEST(ResolveConfig, UseRelayThreadFalseWithMultipleThreadsRejected) { + auto cfg = makeMinimalInsecureConfig(); + cfg.threads = std::optional{2}; + cfg.use_relay_thread = std::optional{false}; + auto result = resolveConfig(cfg); ASSERT_TRUE(result.hasError()); - EXPECT_THAT(result.error(), HasSubstr("threads > 1 is not yet supported")); + EXPECT_THAT(result.error(), HasSubstr("use_relay_thread must be true when threads > 1")); } // --- multiple listeners tests --- From 13bbd0e28d8ab682513bd05791b7ef3cce331834 Mon Sep 17 00:00:00 2001 From: afrind Date: Sat, 6 Jun 2026 12:18:38 -0400 Subject: [PATCH 2/3] relay: isolate relay state on dedicated executor to support multiple I/O threads --- deps/moxygen | 2 +- src/MoqxRelay.cpp | 358 ++++++++++++++++++++---------- src/MoqxRelay.h | 106 ++++++++- src/MoqxRelayContext.cpp | 82 +++++-- src/MoqxRelayContext.h | 9 +- src/SubscriptionRegistry.cpp | 17 +- src/SubscriptionRegistry.h | 18 +- src/main.cpp | 3 +- src/relay/RelayExecUtil.h | 72 ++++++ test/SubscriptionRegistryTest.cpp | 22 +- 10 files changed, 530 insertions(+), 
159 deletions(-) create mode 100644 src/relay/RelayExecUtil.h diff --git a/deps/moxygen b/deps/moxygen index 4a020047..a27f5c10 160000 --- a/deps/moxygen +++ b/deps/moxygen @@ -1 +1 @@ -Subproject commit 4a020047e344bfa47362d58f46c0bc29d39ef91d +Subproject commit a27f5c10c659580a8773eb9e76d09b79df1e85bd diff --git a/src/MoqxRelay.cpp b/src/MoqxRelay.cpp index 449dc72b..ed6d0a78 100644 --- a/src/MoqxRelay.cpp +++ b/src/MoqxRelay.cpp @@ -7,6 +7,9 @@ */ #include "MoqxRelay.h" +#include "relay/CrossExecForwarderCallback.h" +#include "relay/RelayExecUtil.h" +#include "relay/SubscriberCrossExecFilter.h" #include #include #include @@ -28,9 +31,11 @@ class MoqxRelayNamespaceHandle : public Publisher::NamespacePublishHandle { MoqxRelayNamespaceHandle( std::weak_ptr relay, std::shared_ptr session, - std::string peerID = {} + std::string peerID = {}, + folly::Executor* relayExec = nullptr ) - : relay_(std::move(relay)), session_(std::move(session)), peerID_(std::move(peerID)) {} + : relay_(std::move(relay)), session_(std::move(session)), peerID_(std::move(peerID)), + relayExec_(relayExec) {} ~MoqxRelayNamespaceHandle() { auto relay = relay_.lock(); @@ -38,52 +43,68 @@ class MoqxRelayNamespaceHandle : public Publisher::NamespacePublishHandle { return; } for (const auto& ns : activeNamespaces_) { - relay->doPublishNamespaceDone(ns, session_); + runOnExec(relayExec_, [relay, ns, session = session_]() mutable { + relay->doPublishNamespaceDone(ns, session); + }); } } void namespaceMsg(const TrackNamespace& suffix) override { - auto relay = relay_.lock(); - if (!relay || !session_) { - return; - } activeNamespaces_.insert(suffix); PublishNamespace pubNs; pubNs.trackNamespace = suffix; - relay->doPublishNamespace(std::move(pubNs), session_, nullptr, peerID_); + runOnExec( + relayExec_, + [relay = relay_, pubNs = std::move(pubNs), session = session_, peerID = peerID_]() mutable { + if (auto r = relay.lock()) { + r->doPublishNamespace(std::move(pubNs), session, nullptr, peerID); + } + 
} + ); } void namespaceDoneMsg(const TrackNamespace& suffix) override { - auto relay = relay_.lock(); - if (!relay || !session_) { - return; - } activeNamespaces_.erase(suffix); - relay->doPublishNamespaceDone(suffix, session_); + runOnExec(relayExec_, [relay = relay_, suffix, session = session_]() mutable { + if (auto r = relay.lock()) { + r->doPublishNamespaceDone(suffix, session); + } + }); } private: std::weak_ptr relay_; std::shared_ptr session_; std::string peerID_; + folly::Executor* relayExec_; folly::F14FastSet activeNamespaces_; }; std::shared_ptr makeNamespaceBridgeHandle( std::weak_ptr relay, std::shared_ptr session, - std::string peerID + std::string peerID, + folly::Executor* relayExec ) { return std::make_shared( std::move(relay), std::move(session), - std::move(peerID) + std::move(peerID), + relayExec ); } folly::coro::Task MoqxRelay::onUpstreamConnect(std::shared_ptr session) { - auto nsHandle = makeNamespaceBridgeHandle(weak_from_this(), session); - auto result = co_await session->subscribeNamespace(makePeerSubNs(relayID_), nsHandle); + co_return co_await onUpstreamConnectImpl(std::move(session)); +} + +folly::coro::Task MoqxRelay::onUpstreamConnectImpl(std::shared_ptr session) { + auto nsHandle = makeNamespaceBridgeHandle(weak_from_this(), session, {}, relayExec_); + // subscribeNamespace must run on the upstream session's executor + auto result = co_await folly::coro::co_withExecutor( + folly::getKeepAliveToken(session->getExecutor()), + session->subscribeNamespace(makePeerSubNs(relayID_), nsHandle) + ); if (result.hasValue()) { upstreamSubNsHandle_ = std::move(result.value()); } else { @@ -96,10 +117,16 @@ void MoqxRelay::onUpstreamDisconnect() { } std::shared_ptr MoqxRelay::createPublisherFilter() { + if (relayExec_) { + return std::make_shared(relayExec_, shared_from_this()); + } return shared_from_this(); } std::shared_ptr MoqxRelay::createSubscriberFilter() { + if (relayExec_) { + return std::make_shared(relayExec_, shared_from_this()); + } 
return shared_from_this(); } @@ -175,7 +202,7 @@ std::shared_ptr MoqxRelay::doPublishNamespac if (outSession != session && (info.options == SubscribeNamespaceOptions::NAMESPACE || info.options == SubscribeNamespaceOptions::BOTH)) { if (info.namespacePublishHandle) { - // Draft 16+: send NAMESPACE message on the bidi stream + // Draft 16+: send NAMESPACE message on the bidi stream. TrackNamespace suffix(std::vector( pubNs.trackNamespace.trackNamespace.begin() + info.trackNamespacePrefix.size(), pubNs.trackNamespace.trackNamespace.end() @@ -194,6 +221,13 @@ std::shared_ptr MoqxRelay::doPublishNamespac folly::coro::Task MoqxRelay::publishNamespace( PublishNamespace pubNs, std::shared_ptr callback +) { + return publishNamespaceImpl(std::move(pubNs), std::move(callback)); +} + +folly::coro::Task MoqxRelay::publishNamespaceImpl( + PublishNamespace pubNs, + std::shared_ptr callback ) { // TODO: store auth for forwarding on future SubscribeNamespace? auto session = MoQSession::getRequestSession(); @@ -281,19 +315,28 @@ Subscriber::PublishResult MoqxRelay::publish(PublishRequest pub, std::shared_ptr handle) { XLOG(DBG1) << __func__ << " ftn=" << pub.fullTrackName; XCHECK(handle) << "Publish handle cannot be null"; + // getRequestSession() stays valid on relayExec_: RequestContext propagates + // across the filter's executor hop. Authorize/validate before touching state. 
auto session = MoQSession::getRequestSession(); if (!pub.fullTrackName.trackNamespace.startsWith(allowedNamespacePrefix_)) { return folly::makeUnexpected( PublishError{pub.requestID, PublishErrorCode::UNINTERESTED, "bad namespace"} ); } - if (pub.fullTrackName.trackNamespace.empty()) { return folly::makeUnexpected( PublishError({pub.requestID, PublishErrorCode::INTERNAL_ERROR, "namespace required"}) ); } + maybeSetSessionExec(*session); + return publishWithSession(std::move(pub), std::move(handle), std::move(session)); +} +Subscriber::PublishResult MoqxRelay::publishWithSession( + PublishRequest pub, + std::shared_ptr handle, + std::shared_ptr session +) { // Handle duplicate publisher at relay level before registering in the tree. // Move the forwarder out and erase the entry BEFORE calling publishDone. // publishDone iterates subscribers via forEachSubscriber; if a subscriber @@ -308,10 +351,12 @@ MoqxRelay::publish(PublishRequest pub, std::shared_ptr(pub.fullTrackName, pub.largest); forwarder->setExtensions(pub.extensions); + auto publisherWrapped = maybeWrapPublisher(relayExec_, session); auto publishEntry = registry_.createFromPublish( pub.fullTrackName, forwarder, session, + std::move(publisherWrapped), pub.requestID, std::move(handle), [&](std::shared_ptr f) { return buildFilterChain(pub.fullTrackName, f); } @@ -354,6 +399,14 @@ MoqxRelay::publish(PublishRequest pub, std::shared_ptrsetCallback( + std::make_shared(relayExec_, forwarder, shared_from_this()) + ); + } else { + forwarder->setCallback(shared_from_this()); + } + uint64_t nSubscribers = 0; bool hasTrackFilterSub = false; for (auto& [outSession, info] : sessions) { @@ -366,11 +419,12 @@ MoqxRelay::publish(PublishRequest pub, std::shared_ptrgetExecutor(); - co_withExecutor(exec, publishToSession(outSession, forwarder, info.forward)).start(); + if (!addSubscriberAndPublish(outSession, forwarder, info.forward, /*pinned=*/true)) { + XLOG(ERR) << "addSubscriberAndPublish failed for " << 
forwarder->fullTrackName(); + continue; + } } } - forwarder->setCallback(shared_from_this()); // Forward if there are direct subscribers OR TRACK_FILTER subscribers // (PropertyRanking needs objects to evaluate property values for ranking). @@ -391,49 +445,90 @@ MoqxRelay::publish(PublishRequest pub, std::shared_ptr MoqxRelay::publishToSession( - std::shared_ptr session, +namespace { + +// Free-function coroutine: awaits the publish reply and calls onPublishOk. +// Holds forwarder alive because subscriber keeps a raw ref into it. +folly::coro::Task awaitPublishReply( std::shared_ptr forwarder, - bool forward, - bool trackFilterSubscriber + std::shared_ptr subscriber, + folly::coro::Task> reply ) { - if (session->isClosed()) { - XLOG(WARN) << "publishToSession: session closed, skipping " << forwarder->fullTrackName(); + auto result = co_await co_awaitTry(std::move(reply)); + if (result.hasException()) { + XLOG(ERR) << "Publish reply exception for " << forwarder->fullTrackName() + << " subscriber=" << subscriber.get() << ": " << result.exception().what(); + subscriber->unsubscribe(); co_return; } - auto subscriber = forwarder->addSubscriber(session, forward); - if (!subscriber) { - XLOG(ERR) << "Subscribe failed: addSubscriber returned null for " << forwarder->fullTrackName(); + if (result.value().hasError()) { + XLOG(ERR) << "Publish reply error for " << forwarder->fullTrackName() + << " subscriber=" << subscriber.get() << ": " << result.value().error().reasonPhrase; + subscriber->unsubscribe(); co_return; } - // Direct subscribers are pinned (not evictable by PropertyRanking). - // TRACK_FILTER subscribers are unpinned so onTrackEvicted can remove them. 
- subscriber->pinned = !trackFilterSubscriber; - XLOG(DBG4) << "added subscriber for ftn=" << forwarder->fullTrackName(); - auto guard = folly::makeGuard([subscriber] { subscriber->unsubscribe(); }); + XLOG(DBG1) << "Received PublishOk for " << forwarder->fullTrackName() + << " subscriber=" << subscriber.get(); + subscriber->onPublishOk(result.value().value()); +} - auto pubInitial = session->publish(subscriber->getPublishRequest(), subscriber); - if (pubInitial.hasError()) { - XLOG(ERR) << "Publish failed err=" << pubInitial.error().reasonPhrase; - co_return; +} // namespace + +std::optional MoqxRelay::startPublish( + std::shared_ptr session, + std::shared_ptr forwarder, + bool forward, + bool pinned, + folly::Executor* subscriberExec +) { + auto subscriber = forwarder->addSubscriber(session, forward); + if (!subscriber) { + XLOG(ERR) << "startPublish: addSubscriber null for " << forwarder->fullTrackName(); + return std::nullopt; } - subscriber->trackConsumer = std::move(pubInitial->consumer); - auto pubResult = co_await co_awaitTry(std::move(pubInitial->reply)); - if (pubResult.hasException()) { - XLOG(ERR) << "Publish failed err=" << pubResult.exception().what(); - co_return; + subscriber->pinned = pinned; + XLOG(DBG4) << "added subscriber for ftn=" << forwarder->fullTrackName(); + Subscriber::PublishResult pub; + if (subscriberExec) { + SubscriberCrossExecFilter wrapped(subscriberExec, session); + pub = wrapped.publish(subscriber->getPublishRequest(), subscriber); + } else { + pub = session->publish(subscriber->getPublishRequest(), subscriber); } - if (pubResult.value().hasError()) { - XLOG(ERR) << "Publish failed err=" << pubResult.value().error().reasonPhrase; - co_return; + if (pub.hasError()) { + XLOG(ERR) << "startPublish: publish failed: " << pub.error().reasonPhrase; + subscriber->unsubscribe(); + return std::nullopt; } - guard.dismiss(); - XLOG(DBG1) << "Publish OK sess=" << session.get(); - auto& pubOk = pubResult.value().value(); + 
subscriber->trackConsumer = std::move(pub->consumer); + return PreparedPublish{std::move(subscriber), std::move(pub->reply)}; +} - // Process the PUBLISH_OK response - updates range, forward flag, and - // handles NEW_GROUP_REQUEST forwarding via callback - subscriber->onPublishOk(pubOk); +bool MoqxRelay::addSubscriberAndPublish( + std::shared_ptr session, + std::shared_ptr forwarder, + bool forward, + bool pinned +) { + auto p = startPublish( + session, + forwarder, + forward, + pinned, + relayExec_ ? session->getExecutor() : nullptr + ); + if (!p) { + return false; + } + // Run awaitPublishReply on relayExec_ so onPublishOk and detach() (from + // publishDone) are always on the same thread and cannot race. For + // single-thread (relayExec_ == nullptr) this is the subscriber's exec. + co_withExecutor( + relayExec_ ? static_cast(relayExec_) : session->getExecutor(), + awaitPublishReply(forwarder, std::move(p->subscriber), std::move(p->reply)) + ) + .start(); + return true; } class MoqxRelay::NamespaceSubscription : public Publisher::SubscribeNamespaceHandle { @@ -530,6 +625,13 @@ MoqxRelay::buildFilterChain(const FullTrackName& ftn, std::shared_ptr MoqxRelay::subscribeNamespace( SubscribeNamespace subNs, std::shared_ptr namespacePublishHandle +) { + return subscribeNamespaceImpl(std::move(subNs), std::move(namespacePublishHandle)); +} + +folly::coro::Task MoqxRelay::subscribeNamespaceImpl( + SubscribeNamespace subNs, + std::shared_ptr namespacePublishHandle ) { XLOG(DBG1) << __func__ << " nsp=" << subNs.trackNamespacePrefix; @@ -545,10 +647,11 @@ folly::coro::Task MoqxRelay::subscribeNames << ", reciprocating peer subNs"; // Tag with the peer's relay ID so we suppress echoing these namespaces // back to that peer on reconnect. 
- auto handle = makeNamespaceBridgeHandle(weak_from_this(), session, incomingPeerID); - auto recipResult = co_await session->subscribeNamespace( - makePeerSubNs(), - handle + auto handle = makeNamespaceBridgeHandle(weak_from_this(), session, incomingPeerID, relayExec_); + // subscribeNamespace must run on the peer session's executor. + auto recipResult = co_await folly::coro::co_withExecutor( + folly::getKeepAliveToken(session->getExecutor()), + session->subscribeNamespace(makePeerSubNs(), handle) ); // no token: reciprocal, prevents loop if (recipResult.hasError()) { XLOG(ERR) << "Reciprocal peer subNs failed: " << recipResult.error().reasonPhrase; @@ -599,7 +702,7 @@ folly::coro::Task MoqxRelay::subscribeNames // If TRACK_FILTER is present, enroll session in PropertyRanking for top-N selection. // NOTE: onSelected callbacks fire synchronously within addSessionToTopNGroup() for - // tracks already in top-N, triggering publishToSession() before this call returns. + // tracks already in top-N, triggering onTrackSelected() before this call returns. 
if (trackFilter) { auto ranking = getOrCreateRanking(nodePtr, trackFilter->propertyType, subNs.trackNamespacePrefix); @@ -654,7 +757,10 @@ folly::coro::Task MoqxRelay::subscribeNames (subNs.options == SubscribeNamespaceOptions::BOTH || subNs.options == SubscribeNamespaceOptions::PUBLISH)) { if (publishSession != session) { - co_withExecutor(exec, publishToSession(session, forwarder, subNs.forward)).start(); + if (!addSubscriberAndPublish(session, forwarder, subNs.forward, /*pinned=*/true)) { + XLOG(ERR) << "addSubscriberAndPublish failed for " << ftn; + return; + } } } }); @@ -703,7 +809,13 @@ MoqxRelay::PublishState MoqxRelay::findPublishState(const FullTrackName& ftn) { folly::coro::Task MoqxRelay::subscribe(SubscribeRequest subReq, std::shared_ptr consumer) { + return subscribeImpl(std::move(subReq), std::move(consumer)); +} + +folly::coro::Task +MoqxRelay::subscribeImpl(SubscribeRequest subReq, std::shared_ptr consumer) { auto session = MoQSession::getRequestSession(); + maybeSetSessionExec(*session); const auto& ftn = subReq.fullTrackName; if (ftn.trackNamespace.empty()) { @@ -715,8 +827,7 @@ MoqxRelay::subscribe(SubscribeRequest subReq, std::shared_ptr con // TOCTOU fix: if we might be the first subscriber, wait for the upstream // connection before branching. A concurrent coroutine may emplace the entry // while we are suspended, so we re-check inside getOrCreateFromSubscribe. 
- if (!registry_.exists(ftn) && upstream_ && - !namespaceTree_.findPublisherSession(ftn.trackNamespace)) { + if (!registry_.exists(ftn) && upstream_ && !findUpstreamPublisher(ftn.trackNamespace)) { co_await upstream_->waitForConnected(kUpstreamConnectWaitTimeout); } @@ -733,6 +844,7 @@ MoqxRelay::subscribe(SubscribeRequest subReq, std::shared_ptr con {subReq.requestID, SubscribeErrorCode::TRACK_NOT_EXIST, "no such namespace or track"} )); } // pending destructor fires on early return above + auto upstreamPublisher = maybeWrapPublisher(relayExec_, upstreamSession); // Add subscriber first (with the client's original request) in case objects // arrive before subscribe OK. @@ -757,9 +869,8 @@ MoqxRelay::subscribe(SubscribeRequest subReq, std::shared_ptr con subReq.locType = LocationType::LargestObject; // Per the spec, we're supposed to always forward=1 upstream subReq.forward = first->forwarder->numForwardingSubscribers() > 0; - subReq.requestID = upstreamSession->peekNextRequestID(); - auto subRes = co_await upstreamSession->subscribe(subReq, first->consumer); + auto subRes = co_await upstreamPublisher->subscribe(subReq, first->consumer); if (subRes.hasError()) { co_return folly::makeUnexpected(SubscribeError( {clientRequestID, @@ -783,7 +894,8 @@ MoqxRelay::subscribe(SubscribeRequest subReq, std::shared_ptr con first->forwarder->tryProcessNewGroupRequest(subReq.params, /*fire=*/false); auto requestID = subRes.value()->subscribeOk().requestID; - if (!first->pending.complete(std::move(subRes.value()), requestID, upstreamSession)) { + if (!first->pending + .complete(std::move(subRes.value()), requestID, upstreamSession, upstreamPublisher)) { XLOG(ERR) << "Subscription replaced by reconnecting publisher: " << ftn; co_return folly::makeUnexpected(SubscribeError{ clientRequestID, @@ -824,6 +936,11 @@ MoqxRelay::subscribe(SubscribeRequest subReq, std::shared_ptr con folly::coro::Task MoqxRelay::fetch(Fetch fetch, std::shared_ptr consumer) { + return 
fetchImpl(std::move(fetch), std::move(consumer)); +} + +folly::coro::Task +MoqxRelay::fetchImpl(Fetch fetch, std::shared_ptr consumer) { auto session = MoQSession::getRequestSession(); if (fetch.fullTrackName.trackNamespace.empty()) { @@ -848,33 +965,30 @@ MoqxRelay::fetch(Fetch fetch, std::shared_ptr consumer) { fetch.args = StandaloneFetch(res.value().start, res.value().end); joining = nullptr; } else { - // Upstream is resolving the subscribe, forward joining fetch - joining->joiningRequestID = fetchView->requestID; + // Upstream is resolving the subscribe; let MoQSession resolve the + // request ID by track name to avoid a cross-executor data race. + joining->joiningRequestID = kAutoRequestID; } } - auto upstreamSession = namespaceTree_.findPublisherSession(fetch.fullTrackName.trackNamespace); - if (!upstreamSession && upstream_) { + auto upstreamPublisher = findUpstreamPublisher(fetch.fullTrackName.trackNamespace); + if (!upstreamPublisher && upstream_) { co_await upstream_->waitForConnected(kUpstreamConnectWaitTimeout); - upstreamSession = namespaceTree_.findPublisherSession(fetch.fullTrackName.trackNamespace); + upstreamPublisher = findUpstreamPublisher(fetch.fullTrackName.trackNamespace); } - if (!upstreamSession) { + if (!upstreamPublisher) { // Attempt to find matching upstream subscription (from publish) if (auto fetchView = registry_.getFetchView(fetch.fullTrackName)) { - upstreamSession = fetchView->upstream; + upstreamPublisher = fetchView->publisher; } - if (!upstreamSession) { + if (!upstreamPublisher) { co_return folly::makeUnexpected( FetchError({fetch.requestID, FetchErrorCode::TRACK_NOT_EXIST, "no upstream for fetch"}) ); } } - if (session.get() == upstreamSession.get()) { - co_return folly::makeUnexpected( - FetchError({fetch.requestID, FetchErrorCode::INTERNAL_ERROR, "self fetch"}) - ); - } fetch.priority = kDefaultUpstreamPriority; + if (!cache_ || joining) { // We can't use the cache on an unresolved joining fetch - we don't know // which 
objects are being requested. However, once we have that resolved, @@ -883,12 +997,18 @@ MoqxRelay::fetch(Fetch fetch, std::shared_ptr consumer) { XLOG(DBG1) << "Upstream fetch {" << standalone->start.group << "," << standalone->start.object << "}.." << standalone->end.group << "," << standalone->end.object << "}"; } - co_return co_await upstreamSession->fetch(fetch, std::move(consumer)); + co_return co_await upstreamPublisher->fetch(std::move(fetch), std::move(consumer)); } - co_return co_await cache_->fetch(fetch, std::move(consumer), std::move(upstreamSession)); + co_return co_await cache_ + ->fetch(std::move(fetch), std::move(consumer), std::move(upstreamPublisher)); } folly::coro::Task MoqxRelay::trackStatus(TrackStatus trackStatus) { + return trackStatusImpl(std::move(trackStatus)); +} + +folly::coro::Task MoqxRelay::trackStatusImpl(TrackStatus trackStatus +) { XLOG(DBG1) << __func__ << " ftn=" << trackStatus.fullTrackName; auto session = MoQSession::getRequestSession(); @@ -928,25 +1048,26 @@ folly::coro::Task MoqxRelay::trackStatus(TrackStat << " statusCode=" << (uint32_t)statusCode; co_return trackStatusOk; } else { - // No subscription - forward to upstream - auto upstreamSession = - namespaceTree_.findPublisherSession(trackStatus.fullTrackName.trackNamespace); - if (!upstreamSession && upstream_) { - co_await upstream_->waitForConnected(kUpstreamConnectWaitTimeout); - upstreamSession = - namespaceTree_.findPublisherSession(trackStatus.fullTrackName.trackNamespace); + // No active subscription — try registry publisher first, then namespace tree + std::shared_ptr upstreamPublisher; + if (upstreamView) { + upstreamPublisher = upstreamView->publisher; + } else { + upstreamPublisher = findUpstreamPublisher(trackStatus.fullTrackName.trackNamespace); + if (!upstreamPublisher && upstream_) { + co_await upstream_->waitForConnected(kUpstreamConnectWaitTimeout); + upstreamPublisher = findUpstreamPublisher(trackStatus.fullTrackName.trackNamespace); + } } - if 
(!upstreamSession) { - XLOG(DBG1) << "No upstream session for track: " << trackStatus.fullTrackName; + if (!upstreamPublisher) { + XLOG(DBG1) << "No upstream for track: " << trackStatus.fullTrackName; co_return folly::makeUnexpected(TrackStatusError{ trackStatus.requestID, TrackStatusErrorCode::TRACK_NOT_EXIST, "no such namespace or track" }); } - - // Forward the trackStatus request to the upstream publisher session - auto result = co_await upstreamSession->trackStatus(trackStatus); + auto result = co_await upstreamPublisher->trackStatus(std::move(trackStatus)); if (result.hasError()) { XLOG(DBG1) << "Upstream trackStatus failed: " << result.error().reasonPhrase; @@ -958,7 +1079,10 @@ folly::coro::Task MoqxRelay::trackStatus(TrackStat } void MoqxRelay::onEmpty(MoQForwarder* forwarder) { - const auto& ftn = forwarder->fullTrackName(); + onEmptyImpl(forwarder->fullTrackName()); +} + +void MoqxRelay::onEmptyImpl(const FullTrackName& ftn) { auto upstreamView = registry_.getUpstreamView(ftn); if (!upstreamView) { return; @@ -976,8 +1100,12 @@ void MoqxRelay::onEmpty(MoQForwarder* forwarder) { if (upstreamView->isPublish) { // if it's publish, don't unsubscribe, just subscribeUpdate forward=false XLOG(DBG1) << "Updating upstream subscription forward=false"; - auto exec = upstreamView->upstream->getExecutor(); - co_withExecutor(exec, doSubscribeUpdate(upstreamView->handle, /*forward=*/false)).start(); + auto exec = relayExec(); + co_withExecutor( + folly::getKeepAliveToken(exec), + doSubscribeUpdate(upstreamView->handle, /*forward=*/false) + ) + .start(); } else { upstreamView->handle->unsubscribe(); XLOG(DBG4) << "Erasing subscription to " << ftn; @@ -986,7 +1114,10 @@ void MoqxRelay::onEmpty(MoQForwarder* forwarder) { } void MoqxRelay::forwardChanged(MoQForwarder* forwarder, bool forward) { - const auto& ftn = forwarder->fullTrackName(); + forwardChangedImpl(forwarder->fullTrackName(), forward); +} + +void MoqxRelay::forwardChangedImpl(const FullTrackName& ftn, bool 
forward) { auto upstreamView = registry_.getUpstreamView(ftn); if (!upstreamView) { return; @@ -1002,12 +1133,16 @@ void MoqxRelay::forwardChanged(MoQForwarder* forwarder, bool forward) { } XLOG(INFO) << "Updating forward for " << ftn << " forward=" << forward; - auto exec = upstreamView->upstream->getExecutor(); - co_withExecutor(exec, doSubscribeUpdate(upstreamView->handle, forward)).start(); + auto exec = relayExec(); + co_withExecutor(folly::getKeepAliveToken(exec), doSubscribeUpdate(upstreamView->handle, forward)) + .start(); } void MoqxRelay::newGroupRequested(MoQForwarder* forwarder, uint64_t group) { - const auto& ftn = forwarder->fullTrackName(); + newGroupRequestedImpl(forwarder->fullTrackName(), group); +} + +void MoqxRelay::newGroupRequestedImpl(const FullTrackName& ftn, uint64_t group) { auto upstreamView = registry_.getUpstreamView(ftn); // Check if handle is still valid (publisher may have terminated) if (!upstreamView || !upstreamView->handle) { @@ -1016,9 +1151,10 @@ void MoqxRelay::newGroupRequested(MoQForwarder* forwarder, uint64_t group) { } XLOG(INFO) << "New group request detected for " << ftn; - auto exec = upstreamView->upstream->getExecutor(); + auto exec = relayExec(); auto handle = upstreamView->handle; - co_withExecutor(exec, doNewGroupRequestUpdate(std::move(handle), group)).start(); + co_withExecutor(folly::getKeepAliveToken(exec), doNewGroupRequestUpdate(std::move(handle), group)) + .start(); } // TRACK_FILTER support @@ -1133,8 +1269,8 @@ void MoqxRelay::onTrackSelected( XLOG(DBG4) << "[MoqxRelay] Track selected: " << ftn << " session=" << session.get() << " forward=" << forward; - if (!session || session->isClosed()) { - XLOG(ERR) << "onTrackSelected: session null or closed, skipping " << ftn; + if (!session) { + XLOG(ERR) << "onTrackSelected: null session for " << ftn; return; } @@ -1147,20 +1283,18 @@ void MoqxRelay::onTrackSelected( auto exec = session->getExecutor(); XCHECK(exec) << "onTrackSelected: null executor for session " 
<< session.get(); - // TODO: Consider batching multiple publishToSession calls on the same executor - // when multiple tracks are selected for the same session in a single ranking update. - co_withExecutor( - exec, - publishToSession(session, trackForwarder, forward, /*trackFilterSubscriber=*/true) - ) - .start(); + // TODO: Consider batching multiple addSubscriberAndPublish calls on the same + // executor when multiple tracks are selected for the same session in a single + // ranking update. + // TRACK_FILTER subscribers are unpinned so onTrackEvicted can remove them. + addSubscriberAndPublish(session, trackForwarder, forward, /*pinned=*/false); } void MoqxRelay::onTrackEvicted(const FullTrackName& ftn, std::shared_ptr session) { XLOG(DBG4) << "[MoqxRelay] Track evicted: " << ftn << " session=" << session.get(); - if (!session || session->isClosed()) { - XLOG(WARN) << "onTrackEvicted: session null or closed, skipping " << ftn; + if (!session) { + XLOG(WARN) << "onTrackEvicted: null session for " << ftn; return; } diff --git a/src/MoqxRelay.h b/src/MoqxRelay.h index 5ac4c3e1..6723d91b 100644 --- a/src/MoqxRelay.h +++ b/src/MoqxRelay.h @@ -14,9 +14,11 @@ #include "UpstreamProvider.h" #include "config/Config.h" #include "relay/PropertyRanking.h" +#include "relay/RelayExecUtil.h" #include #include +#include #include #include #include @@ -24,6 +26,10 @@ #include #include +namespace openmoq::moqx { +class CrossExecForwarderCallback; +} // namespace openmoq::moqx + namespace openmoq::moqx { // Visitor interface for relay state inspection. @@ -115,6 +121,21 @@ class MoqxRelay : public moxygen::Publisher, } } + // Optionally isolate relay state on a dedicated executor thread. + // When set, all public entry points switch to relayExec before touching + // relay state, and consumer callbacks to/from sessions are wrapped with + // cross-executor filters. relayExec must outlive this relay. + // If not set (default), all operations run on the calling thread. 
+ void setRelayExec(folly::Executor* relayExec) { relayExec_ = relayExec; } + + // Takes ownership of exec and uses it as the relay executor. + void setRelayExec(std::shared_ptr exec) { + ownedRelayExec_ = std::move(exec); + relayExec_ = ownedRelayExec_.get(); + } + + folly::Executor* getRelayExec() const { return relayExec_; } + void setAllowedNamespacePrefix(moxygen::TrackNamespace allowed) { allowedNamespacePrefix_ = std::move(allowed); } @@ -252,17 +273,41 @@ class MoqxRelay : public moxygen::Publisher, void forwardChanged(moxygen::MoQForwarder* forwarder, bool forward) override; void newGroupRequested(moxygen::MoQForwarder* forwarder, uint64_t group) override; + // FTN-keyed impl variants — called by CrossExecForwarderCallback (relay exec) + // or directly from the non-cross-exec callbacks above. + friend class CrossExecForwarderCallback; + void onEmptyImpl(const moxygen::FullTrackName& ftn); + void forwardChangedImpl(const moxygen::FullTrackName& ftn, bool forward); + void newGroupRequestedImpl(const moxygen::FullTrackName& ftn, uint64_t group); + folly::coro::Task publishNamespaceToSession( std::shared_ptr session, moxygen::PublishNamespace pubNs, std::shared_ptr nodePtr ); - folly::coro::Task publishToSession( + // Sync setup: addSubscriber → set pinned → session->publish (via + // SubscriberCrossExecFilter when subscriberExec is non-null) → set + // trackConsumer. Returns nullopt and cleans up on any synchronous failure. + struct PreparedPublish { + std::shared_ptr subscriber; + folly::coro::Task> reply; + }; + std::optional startPublish( std::shared_ptr session, std::shared_ptr forwarder, bool forward, - bool trackFilterSubscriber = false + bool pinned, + folly::Executor* subscriberExec + ); + + // Calls startPublish and fires the reply as a free-running coroutine. + // Returns false and cleans up on any synchronous failure. 
+ bool addSubscriberAndPublish( + std::shared_ptr session, + std::shared_ptr forwarder, + bool forward, + bool pinned ); folly::coro::Task @@ -324,6 +369,54 @@ class MoqxRelay : public moxygen::Publisher, const moxygen::FullTrackName& ftn, std::shared_ptr consumer ); + + // Impl methods — run on relayExec_ when set, or inline when relayExec_==nullptr. + folly::coro::Task + subscribeImpl(moxygen::SubscribeRequest subReq, std::shared_ptr consumer); + folly::coro::Task + fetchImpl(moxygen::Fetch fetch, std::shared_ptr consumer); + folly::coro::Task subscribeNamespaceImpl( + moxygen::SubscribeNamespace subNs, + std::shared_ptr namespacePublishHandle + ); + folly::coro::Task publishNamespaceImpl( + moxygen::PublishNamespace pubNs, + std::shared_ptr callback + ); + folly::coro::Task trackStatusImpl(moxygen::TrackStatus req + ); + folly::coro::Task onUpstreamConnectImpl(std::shared_ptr session); + + // Contains all the inline publish() logic, taking session explicitly so it + // can be called from either the I/O thread (relayExec_==nullptr) or from + // coPublish on relay exec (where getRequestSession() would return null). + PublishResult publishWithSession( + moxygen::PublishRequest pub, + std::shared_ptr handle, + std::shared_ptr session + ); + + std::shared_ptr ownedRelayExec_; + folly::Executor* relayExec_{nullptr}; + // Only set in single-threaded mode (relayExec_ == null); used as the + // coroutine start executor for fire-and-forget tasks like doSubscribeUpdate. + folly::Executor* sessionExec_{nullptr}; + + void maybeSetSessionExec(moxygen::MoQSession& session) { + if (!relayExec_ && !sessionExec_) { + sessionExec_ = session.getExecutor(); + } + } + + folly::Executor* relayExec() const { return relayExec_ ? 
relayExec_ : sessionExec_; } + + std::shared_ptr findUpstreamPublisher(const moxygen::TrackNamespace& ns) { + auto session = namespaceTree_.findPublisherSession(ns); + if (!session) { + return nullptr; + } + return maybeWrapPublisher(relayExec_, std::move(session)); + } std::unique_ptr cache_; uint64_t maxDeselected_{kDefaultMaxDeselected}; @@ -334,12 +427,15 @@ class MoqxRelay : public moxygen::Publisher, }; // Creates a NamespacePublishHandle that bridges NAMESPACE/NAMESPACE_DONE -// messages from a peer relay into relay->doPublishNamespace() synchronously. -// Used for both the initiating (UpstreamProvider) and reciprocal (MoqxRelay) paths. +// messages from a peer relay into relay->doPublishNamespace(). When relayExec +// is non-null, callbacks are dispatched to it so relay state is only mutated +// on the relay executor thread. Used for both the initiating (UpstreamProvider) +// and reciprocal (MoqxRelay) paths. std::shared_ptr makeNamespaceBridgeHandle( std::weak_ptr relay, std::shared_ptr session, - std::string peerID = {} + std::string peerID = {}, + folly::Executor* relayExec = nullptr ); } // namespace openmoq::moqx diff --git a/src/MoqxRelayContext.cpp b/src/MoqxRelayContext.cpp index 96a528dd..1d281b7d 100644 --- a/src/MoqxRelayContext.cpp +++ b/src/MoqxRelayContext.cpp @@ -6,11 +6,15 @@ #include "MoqxRelayContext.h" #include "relay/AuthFilters.h" +#include "relay/PublisherCrossExecFilter.h" +#include "relay/RelayExecUtil.h" +#include "relay/SubscriberCrossExecFilter.h" #include "stats/MoQStatsCollector.h" #include #include #include +#include #include using namespace moxygen; @@ -19,18 +23,41 @@ namespace openmoq::moqx { MoqxRelayContext::MoqxRelayContext( const folly::F14FastMap& services, - const std::string& relayID + const std::string& relayID, + bool useRelayThread ) : serviceMatcher_(services), relayID_(relayID) { - for (const auto& [name, svc] : services) { - services_.emplace( - name, - ServiceEntry{ - svc, - std::make_shared(svc.cache, 
relayID), - std::make_shared(svc.auth) - } + if (useRelayThread && !services.empty()) { + relayThreadPool_ = std::make_shared( + services.size(), + std::make_shared("moqx-relay") ); + auto evbs = relayThreadPool_->getAllEventBases(); + XCHECK_EQ(evbs.size(), services.size()); + size_t i = 0; + for (const auto& [name, svc] : services) { + auto relay = std::make_shared(svc.cache, relayID); + relay->setRelayExec(std::make_shared(evbs[i++].get())); + services_.emplace( + name, + ServiceEntry{ + svc, + std::move(relay), + std::make_shared(svc.auth) + } + ); + } + } else { + for (const auto& [name, svc] : services) { + services_.emplace( + name, + ServiceEntry{ + svc, + std::make_shared(svc.cache, relayID), + std::make_shared(svc.auth) + } + ); + } } } @@ -61,10 +88,7 @@ void MoqxRelayContext::initUpstreams(folly::EventBase* workerEvb) { CHECK(workerEvb) << "initUpstreams: workerEvb must not be null"; workerEvb_ = workerEvb; - // Use the provided worker EVB for all upstream connections. - // Per-EVB providers (one per worker thread) are a follow-up. 
- auto exec = std::make_shared(workerEvb); - + auto workerExec = std::make_shared(workerEvb); for (auto& [name, entry] : services_) { if (!entry.config.upstream) { continue; @@ -72,15 +96,31 @@ void MoqxRelayContext::initUpstreams(folly::EventBase* workerEvb) { const auto& cfg = *entry.config.upstream; auto verifier = makeUpstreamVerifier(cfg.tls); auto relay = entry.relay; - auto onConnect = [relay](std::shared_ptr session) -> folly::coro::Task { - co_await relay->onUpstreamConnect(session); + auto* relayExec = relay->getRelayExec(); + auto onConnect = [relay, + relayExec](std::shared_ptr session) -> folly::coro::Task { + if (relayExec) { + co_return co_await folly::coro::co_withExecutor( + folly::getKeepAliveToken(relayExec), + relay->onUpstreamConnect(session) + ); + } + co_return co_await relay->onUpstreamConnect(session); }; - auto onDisconnect = [relay]() { relay->onUpstreamDisconnect(); }; + auto onDisconnect = [relay, relayExec]() { + runOnExec(relayExec, [relay]() { relay->onUpstreamDisconnect(); }); + }; + std::shared_ptr pubHandler = relay; + std::shared_ptr subHandler = relay; + if (relayExec) { + pubHandler = std::make_shared(relayExec, relay); + subHandler = std::make_shared(relayExec, relay); + } auto provider = std::make_shared( - exec, + workerExec, proxygen::URL(cfg.url), - /*publishHandler=*/entry.relay, - /*subscribeHandler=*/entry.relay, + /*publishHandler=*/pubHandler, + /*subscribeHandler=*/subHandler, verifier, std::move(onConnect), std::move(onDisconnect), @@ -92,7 +132,7 @@ void MoqxRelayContext::initUpstreams(folly::EventBase* workerEvb) { // Eagerly connect so the peering handshake fires before any subscribers // arrive. The connection is lazy in UpstreamProvider but we kick it off // now so the upstream namespace tree is ready. 
- co_withExecutor(workerEvb, provider->start()).start(); + co_withExecutor(workerExec.get(), provider->start()).start(); } } @@ -169,6 +209,8 @@ folly::Expected MoqxRelayContext::validateAu } // Route: verify the setup token, then install the session's filter handlers. + // createPublisher/SubscriberFilter wraps the relay in cross-exec filters when + // the service runs on a dedicated relay thread. auto it = services_.find(*matchedName); CHECK(it != services_.end()) << "Service '" << *matchedName << "' matched but no entry found"; auto& entry = it->second; diff --git a/src/MoqxRelayContext.h b/src/MoqxRelayContext.h index 5c4305a3..8fce86dd 100644 --- a/src/MoqxRelayContext.h +++ b/src/MoqxRelayContext.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -56,7 +57,8 @@ class MoqxRelayContext { MoqxRelayContext( const folly::F14FastMap& services, - const std::string& relayID + const std::string& relayID, + bool useRelayThread = true ); void setStatsRegistry(std::shared_ptr registry); @@ -110,6 +112,11 @@ class MoqxRelayContext { ); private: + // When use_relay_thread=true: one dedicated thread per service, each with its + // own executor, isolating relay state from I/O threads. Null when disabled. + // Each relay owns its MoQFollyExecutorImpl; the pool just keeps threads alive. 
+ std::shared_ptr relayThreadPool_; + folly::F14FastMap services_; ServiceMatcher serviceMatcher_; std::string relayID_; diff --git a/src/SubscriptionRegistry.cpp b/src/SubscriptionRegistry.cpp index 41acd6aa..bfe27e0e 100644 --- a/src/SubscriptionRegistry.cpp +++ b/src/SubscriptionRegistry.cpp @@ -13,7 +13,8 @@ namespace openmoq::moqx { bool SubscriptionRegistry::UpstreamSubscribePending::complete( std::shared_ptr handle, moxygen::RequestID requestID, - std::shared_ptr upstreamSession + std::shared_ptr upstreamSession, + std::shared_ptr publisher ) { active_ = false; return registry_->completeSubscription( @@ -21,7 +22,8 @@ bool SubscriptionRegistry::UpstreamSubscribePending::complete( weakForwarder_, std::move(handle), requestID, - std::move(upstreamSession) + std::move(upstreamSession), + std::move(publisher) ); } @@ -85,7 +87,8 @@ bool SubscriptionRegistry::completeSubscription( std::weak_ptr weakForwarder, std::shared_ptr handle, moxygen::RequestID requestID, - std::shared_ptr upstreamSession + std::shared_ptr upstreamSession, + std::shared_ptr publisher ) { auto it = subscriptions_.find(ftn); if (it == subscriptions_.end() || it->second.forwarder != weakForwarder.lock()) { @@ -95,6 +98,7 @@ bool SubscriptionRegistry::completeSubscription( rsub.handle = std::move(handle); rsub.requestID = requestID; rsub.upstream = std::move(upstreamSession); + rsub.publisher = std::move(publisher); rsub.promise.setValue(folly::unit); return true; } @@ -114,6 +118,7 @@ SubscriptionRegistry::PublishEntry SubscriptionRegistry::createFromPublish( const moxygen::FullTrackName& ftn, std::shared_ptr forwarder, std::shared_ptr session, + std::shared_ptr publisher, moxygen::RequestID requestID, std::shared_ptr handle, folly::FunctionRef)> chainBuilder @@ -137,6 +142,7 @@ SubscriptionRegistry::PublishEntry SubscriptionRegistry::createFromPublish( rsub.promise.setValue(folly::unit); rsub.requestID = requestID; rsub.handle = std::move(handle); + rsub.publisher = std::move(publisher); 
rsub.isPublish = true; auto [consumer, topNFilter] = chainBuilder(forwarder); @@ -174,7 +180,7 @@ SubscriptionRegistry::getUpstreamView(const moxygen::FullTrackName& ftn) const { const auto& rsub = it->second; return UpstreamView{ rsub.forwarder, - rsub.upstream, + rsub.publisher, rsub.handle, rsub.requestID, rsub.isPublish, @@ -189,7 +195,7 @@ SubscriptionRegistry::getFetchView(const moxygen::FullTrackName& ftn) const { return std::nullopt; } const auto& rsub = it->second; - return FetchView{rsub.forwarder, rsub.upstream, rsub.requestID, rsub.promise.isFulfilled()}; + return FetchView{rsub.forwarder, rsub.publisher, rsub.requestID, rsub.promise.isFulfilled()}; } std::shared_ptr @@ -201,6 +207,7 @@ SubscriptionRegistry::onPublisherTerminated(const moxygen::FullTrackName& ftn) { auto& rsub = it->second; rsub.handle.reset(); rsub.upstream.reset(); + rsub.publisher.reset(); if (rsub.forwarder->empty()) { subscriptions_.erase(it); return nullptr; diff --git a/src/SubscriptionRegistry.h b/src/SubscriptionRegistry.h index 4f5630dd..76faf574 100644 --- a/src/SubscriptionRegistry.h +++ b/src/SubscriptionRegistry.h @@ -53,12 +53,13 @@ class SubscriptionRegistry { // Identity-checked success path. Re-finds by ftn; checks forwarder identity // to detect a reconnecting publisher that replaced the entry during the - // caller's co_await suspension. Sets handle, requestID, upstreamSession; - // fulfills promise. Returns false if entry is gone or replaced. + // caller's co_await suspension. Sets handle, requestID, upstreamSession, + // publisher; fulfills promise. Returns false if entry is gone or replaced. 
bool complete( std::shared_ptr handle, moxygen::RequestID requestID, - std::shared_ptr upstreamSession + std::shared_ptr upstreamSession, + std::shared_ptr publisher ); ~UpstreamSubscribePending(); @@ -109,6 +110,7 @@ class SubscriptionRegistry { const moxygen::FullTrackName& ftn, std::shared_ptr forwarder, std::shared_ptr session, + std::shared_ptr publisher, moxygen::RequestID requestID, std::shared_ptr handle, folly::FunctionRef)> chainBuilder @@ -126,10 +128,10 @@ class SubscriptionRegistry { }; std::optional getTopNView(const moxygen::FullTrackName& ftn) const; - // For onEmpty / forwardChanged / newGroupRequested + // For onEmpty / forwardChanged / newGroupRequested / trackStatus struct UpstreamView { std::shared_ptr forwarder; - std::shared_ptr upstream; + std::shared_ptr publisher; std::shared_ptr handle; moxygen::RequestID requestID; bool isPublish; @@ -140,7 +142,7 @@ class SubscriptionRegistry { // For fetch() struct FetchView { std::shared_ptr forwarder; - std::shared_ptr upstream; + std::shared_ptr publisher; moxygen::RequestID requestID; bool isReady; }; @@ -182,6 +184,7 @@ class SubscriptionRegistry { std::shared_ptr forwarder; std::shared_ptr upstream; + std::shared_ptr publisher; moxygen::RequestID requestID{0}; std::shared_ptr handle; folly::coro::SharedPromise promise; @@ -196,7 +199,8 @@ class SubscriptionRegistry { std::weak_ptr weakForwarder, std::shared_ptr handle, moxygen::RequestID requestID, - std::shared_ptr upstreamSession + std::shared_ptr upstreamSession, + std::shared_ptr publisher ); // Called by UpstreamSubscribePending destructor on failure. diff --git a/src/main.cpp b/src/main.cpp index 5b748aba..84c49971 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -117,7 +117,8 @@ int main(int argc, char* argv[]) { // === 6. Initialize services === // Construct and configure the application's own services // (MoqxRelayContext, MoqxRelayServer, etc.) 
- auto context = std::make_shared(config.services, config.relayID); + auto context = + std::make_shared(config.services, config.relayID, config.useRelayThread); // === 6a. Stats registry === auto statsRegistry = std::make_shared(); diff --git a/src/relay/RelayExecUtil.h b/src/relay/RelayExecUtil.h new file mode 100644 index 00000000..ae406288 --- /dev/null +++ b/src/relay/RelayExecUtil.h @@ -0,0 +1,72 @@ +/* + * Copyright (c) OpenMOQ contributors. + * This source code is licensed under the Apache 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include "CrossExecFilter.h" +#include "PublisherCrossExecFilter.h" +#include +#include +#include +#include + +namespace openmoq::moqx { + +// Wraps c in a CrossExecFilter targeting exec, or returns c if exec is null. +inline std::shared_ptr +maybeCrossExec(folly::Executor* exec, std::shared_ptr c) { + if (!exec) { + return c; + } + return std::make_shared(exec, std::move(c), /*deepCopyPayload=*/false); +} + +// Wraps c in a FetchCrossExecFilter targeting exec, or returns c if exec is null. +inline std::shared_ptr +maybeCrossExec(folly::Executor* exec, std::shared_ptr c) { + if (!exec) { + return c; + } + return FetchCrossExecFilter::create(exec, std::move(c), /*deepCopyPayload=*/false); +} + +// Wraps p in a PublisherCrossExecFilter targeting exec, or returns p if exec is null. +inline std::shared_ptr +maybeCrossExec(folly::Executor* exec, std::shared_ptr p) { + if (!exec) { + return p; + } + return std::make_shared(exec, std::move(p)); +} + +// Wraps session as a Publisher, targeting its executor when relayExec is set. +inline std::shared_ptr +maybeWrapPublisher(folly::Executor* relayExec, std::shared_ptr session) { + // Evaluate getExecutor() before std::move(session) to avoid unspecified + // argument evaluation order leaving session moved-from. + auto* exec = relayExec ? 
session->getExecutor() : nullptr; + return maybeCrossExec(exec, std::shared_ptr(std::move(session))); +} + +// Runs fn on exec (fire-and-forget) if exec is non-null; otherwise runs it +// inline on the calling thread. +template void runOnExec(folly::Executor* exec, Fn&& fn) { + if (exec) { + exec->add(std::forward(fn)); + } else { + std::forward(fn)(); + } +} + +// When relayExec is set, dispatches fn to sessionExec (fire-and-forget). +// Otherwise runs fn inline (caller is already on the correct thread). +// Use this when relay state changes need to notify a specific session's executor. +template +void runOnSessionExec(folly::Executor* relayExec, folly::Executor* sessionExec, Fn&& fn) { + runOnExec(relayExec ? sessionExec : nullptr, std::forward(fn)); +} + +} // namespace openmoq::moqx diff --git a/test/SubscriptionRegistryTest.cpp b/test/SubscriptionRegistryTest.cpp index 2db00234..1ad6c20a 100644 --- a/test/SubscriptionRegistryTest.cpp +++ b/test/SubscriptionRegistryTest.cpp @@ -44,7 +44,7 @@ TEST(SubscriptionRegistryTest, AwaitSubsequentSucceeds) { registry.getOrCreateFromSubscribe(kFtn, nullptr, subscribeChain) ); - EXPECT_TRUE(first.pending.complete(nullptr, RequestID(0), nullptr)); + EXPECT_TRUE(first.pending.complete(nullptr, RequestID(0), nullptr, nullptr)); folly::EventBase evb; auto sub = folly::coro::blockingWait(std::move(task), &evb); @@ -89,7 +89,7 @@ TEST(SubscriptionRegistryTest, PendingCompleteReturnsFalseWhenEntryGone) { registry.remove(kFtn); // simulate publisher replacing the entry mid-subscribe - EXPECT_FALSE(first.pending.complete(nullptr, RequestID(0), nullptr)); + EXPECT_FALSE(first.pending.complete(nullptr, RequestID(0), nullptr, nullptr)); } // Regression: awaitSubsequent must re-find after suspension; erased entry throws. @@ -114,7 +114,7 @@ TEST(SubscriptionRegistryTest, AwaitSubsequentHandlesErasedEntry) { std::move(std::get>(token2)); // Upstream subscribe succeeds — fulfills the promise. 
- EXPECT_TRUE(first.pending.complete(nullptr, RequestID(0), nullptr)); + EXPECT_TRUE(first.pending.complete(nullptr, RequestID(0), nullptr, nullptr)); // Entry is erased before the subsequent coroutine resumes. registry.remove(kFtn); @@ -131,11 +131,18 @@ TEST(SubscriptionRegistryTest, CreateFromPublishEvictsSubscribeEntry) { registry.getOrCreateFromSubscribe(kFtn, nullptr, subscribeChain) ); auto originalForwarder = first.forwarder; - first.pending.complete(nullptr, RequestID(0), nullptr); + first.pending.complete(nullptr, RequestID(0), nullptr, nullptr); auto newForwarder = std::make_shared(kFtn, std::nullopt); - auto entry = - registry.createFromPublish(kFtn, newForwarder, nullptr, RequestID(1), nullptr, publishChain); + auto entry = registry.createFromPublish( + kFtn, + newForwarder, + nullptr, + nullptr, + RequestID(1), + nullptr, + publishChain + ); ASSERT_TRUE(entry.evicted.has_value()); EXPECT_EQ(entry.evicted->forwarder, originalForwarder); @@ -147,7 +154,8 @@ TEST(SubscriptionRegistryTest, CreateFromPublishEvictsSubscribeEntry) { TEST(SubscriptionRegistryTest, OnPublisherTerminatedErasesEmptyEntry) { SubscriptionRegistry registry; auto forwarder = std::make_shared(kFtn, std::nullopt); - registry.createFromPublish(kFtn, forwarder, nullptr, RequestID(0), nullptr, publishChain); + registry + .createFromPublish(kFtn, forwarder, nullptr, nullptr, RequestID(0), nullptr, publishChain); auto result = registry.onPublisherTerminated(kFtn); EXPECT_EQ(result, nullptr); From 1cc0aba30d4d96ed99c33ecf80b8724066ea10b8 Mon Sep 17 00:00:00 2001 From: afrind Date: Sat, 6 Jun 2026 12:49:21 -0400 Subject: [PATCH 3/3] test: add MultiThread relay test mode Add RelayMode::MultiThread to the MoQRelayTest parameterized suite so the relay is exercised across an executor boundary, alongside the existing SingleThread mode. Fixture (MoqxRelayTestFixture.h/.cpp, MoqxRelayTestModes.cpp): - New RelayMode::MultiThread enum value, instantiated in the AllModes suite. 
- SetUp() starts a ScopedEventBaseThread, wires the relay onto it via setRelayExec, and wraps it in PublisherCrossExecFilter / SubscriberCrossExecFilter. TearDown() drains pending relay-exec tasks and then tears everything down in a defined order, to avoid a use-after-free on NamespaceTree. - TestMoQExecutor::drive() does a loopOnce(), then in MT mode round-trips through the relay EVB twice to flush tasks posted to the relay as well as tasks the relay itself creates. - Add driveIfMultiThread(), make verifyOnRelayExec() hop to the relay EVB, and add driveUntil(pred) to advance async cascades deterministically instead of using a fixed drive count (capped at one iteration in ST mode). Tests (DataPlane, NGR, Peer, Publish, SubNs, Subscribe): - Insert driveIfMultiThread() where work crosses executors. - Replace fixed-drive-then-read with driveUntil() plus std::atomic flags. - DuplicateSubgroupCancelledWhenNoActiveConsumers splits ST/MT expectations: in MT the CrossExecFilter defers the CANCELLED error to the next op, so it probes via endOfSubgroup(). - Add reset() calls to simulate the publisher/QUIC resetting open streams. 
Test Plan: --- test/MoqxRelayDataPlaneTests.cpp | 57 ++++++++++++++++++++++--- test/MoqxRelayNGRTests.cpp | 34 +++++++++++++-- test/MoqxRelayPeerTests.cpp | 1 + test/MoqxRelayPublishTests.cpp | 73 ++++++++++++++++++++++---------- test/MoqxRelaySubNsTests.cpp | 1 + test/MoqxRelaySubscribeTests.cpp | 3 ++ test/MoqxRelayTestFixture.cpp | 39 ++++++++++++++++- test/MoqxRelayTestFixture.h | 44 ++++++++++++++++++- test/MoqxRelayTestModes.cpp | 4 +- 9 files changed, 222 insertions(+), 34 deletions(-) diff --git a/test/MoqxRelayDataPlaneTests.cpp b/test/MoqxRelayDataPlaneTests.cpp index 0155841c..c5b3ccb9 100644 --- a/test/MoqxRelayDataPlaneTests.cpp +++ b/test/MoqxRelayDataPlaneTests.cpp @@ -57,20 +57,27 @@ TEST_P(MoQRelayTest, DuplicateSubgroupReplacesActiveConsumers) { auto sgForwarder1 = publishConsumer->beginSubgroup(0, 0, 0); EXPECT_TRUE(sgForwarder1.hasValue()); + driveIfMultiThread( + ); // flush so beginSubgroup wires downstream_ and calls mockConsumer beginSubgroup // Duplicate beginSubgroup - should reset v1 consumers and return new - // forwarder + // forwarder. Simulate publisher resetting the old stream. 
auto sgForwarder2 = publishConsumer->beginSubgroup(0, 0, 0); + driveIfMultiThread(); // flush duplicate so v1 consumers are reset and v2 consumers are created + (*sgForwarder1)->reset(moxygen::ResetStreamErrorCode::CANCELLED); + driveIfMultiThread(); // flush publisher reset of old stream EXPECT_TRUE(sgForwarder2.hasValue()); EXPECT_NE(sgForwarder1.value(), sgForwarder2.value()); // Close the new subgroup cleanly before teardown to avoid reset during // cleanup EXPECT_TRUE(sgForwarder2.value()->endOfSubgroup().hasValue()); + driveIfMultiThread(); // flush endOfSubgroup removeSession(publisherSession); removeSession(sub1); removeSession(sub2); + driveIfMultiThread(); // flush relay cleanup so it drops session refs before mocks are destroyed } // Test: Duplicate beginSubgroup after all subscribers have stop_sending'd @@ -99,17 +106,40 @@ TEST_P(MoQRelayTest, DuplicateSubgroupCancelledWhenNoActiveConsumers) { auto sgRes = publishConsumer->beginSubgroup(0, 0, 0); ASSERT_TRUE(sgRes.hasValue()); auto sg = sgRes.value(); + driveIfMultiThread(); // flush so beginSubgroup wires sg.downstream_ before object() enqueues // Trigger stop_sending tombstone via CANCELLED error from object() + if (relayEvb_) { + // MT: enqueue an extra object() after the beginSubgroup lambda + sg->object(0, nullptr, {}, false); + driveIfMultiThread(); // flush so object() runs and tombstones the subscriber + } sg->object(0, nullptr, {}, false); + driveIfMultiThread(); // flush so object() runs and tombstones the subscriber - // Duplicate beginSubgroup - all consumers tombstoned, should return CANCELLED + // Duplicate beginSubgroup - all consumers tombstoned, should return CANCELLED. 
auto dupRes = publishConsumer->beginSubgroup(0, 0, 0); - EXPECT_TRUE(dupRes.hasError()); - EXPECT_EQ(dupRes.error().code, MoQPublishError::CANCELLED); + if (relayMode() == RelayMode::MultiThread) { + // MT mode: a CrossExecFilter sits between publisher and relay, so it always + // returns a subFilter and the error is deferred until the next operation. + ASSERT_TRUE(dupRes.hasValue()); + driveIfMultiThread(); // flush so object() runs and tombstones the subscriber + auto probeRes = dupRes.value()->endOfSubgroup(); + EXPECT_TRUE(probeRes.hasError()); + if (probeRes.hasError()) { + EXPECT_EQ(probeRes.error().code, MoQPublishError::CANCELLED); + } + } else { + // ST and LocalForwarderMT: the publisher writes directly to the (local) + // forwarder with no cross-exec hop, so CANCELLED is returned synchronously. + EXPECT_TRUE(dupRes.hasError()); + EXPECT_EQ(dupRes.error().code, MoQPublishError::CANCELLED); + } removeSession(publisherSession); removeSession(subscriber); + sg->reset(ResetStreamErrorCode::CANCELLED); + driveIfMultiThread(); // flush relay cleanup so it drops session refs before mocks are destroyed } // Test: Duplicate beginSubgroup with partial stop_sending - active subscriber @@ -143,6 +173,11 @@ TEST_P(MoQRelayTest, DuplicateSubgroupSkipsTombstonedSubscriber) { // CANCELLED to simulate stop_sending EXPECT_CALL(*sgAv1, object(_, _, _, _)) .WillOnce(Return(folly::makeExpected(folly::unit))); + if (relayEvb_) { + EXPECT_CALL(*sgAv1, object(_, _, _, _)) + .WillOnce(Return(folly::makeExpected(folly::unit))) + .RetiresOnSaturation(); + } EXPECT_CALL(*sgBv1, object(_, _, _, _)) .WillOnce( Return(folly::makeUnexpected(MoQPublishError(MoQPublishError::CANCELLED, "stop sending"))) @@ -160,22 +195,34 @@ TEST_P(MoQRelayTest, DuplicateSubgroupSkipsTombstonedSubscriber) { auto sgForwarder1 = publishConsumer->beginSubgroup(0, 0, 0); ASSERT_TRUE(sgForwarder1.hasValue()); + driveIfMultiThread(); // flush so beginSubgroup wires downstream_ before object() enqueues + if 
(relayEvb_) { + // MT: enqueue an extra object() after the beginSubgroup lambda + sgForwarder1.value()->object(0, nullptr, {}, false); + driveIfMultiThread(); // flush so object() runs and tombstones the subscriber + } // Trigger tombstone for sub B via CANCELLED from object() sgForwarder1.value()->object(0, nullptr, {}, false); + driveIfMultiThread(); // flush so object() runs and tombstones sub B // Duplicate beginSubgroup: sub A gets reset+new, sub B is skipped - // (tombstoned) + // (tombstoned). Simulate publisher resetting the old stream. auto sgForwarder2 = publishConsumer->beginSubgroup(0, 0, 0); + driveIfMultiThread(); // flush duplicate so sgAv1 is reset and sgAv2 is created + (*sgForwarder1)->reset(moxygen::ResetStreamErrorCode::CANCELLED); + driveIfMultiThread(); // flush publisher reset of old stream EXPECT_TRUE(sgForwarder2.hasValue()); EXPECT_NE(sgForwarder1.value(), sgForwarder2.value()); // Close the new subgroup cleanly before teardown EXPECT_TRUE(sgForwarder2.value()->endOfSubgroup().hasValue()); + driveIfMultiThread(); // flush endOfSubgroup removeSession(publisherSession); removeSession(subA); removeSession(subB); + driveIfMultiThread(); // flush relay cleanup so it drops session refs before mocks are destroyed } } // namespace moxygen::test diff --git a/test/MoqxRelayNGRTests.cpp b/test/MoqxRelayNGRTests.cpp index 827fb86d..3a998f6e 100644 --- a/test/MoqxRelayNGRTests.cpp +++ b/test/MoqxRelayNGRTests.cpp @@ -7,6 +7,7 @@ */ #include "MoqxRelayTestFixture.h" +#include namespace moxygen::test { @@ -40,6 +41,7 @@ TEST_P(MoQRelayTest, RelayPublishPropagatesDynamicGroupsToSubscribers) { removeSession(subscriberSession); exec_->drive(); removeSession(publisherSession); + driveIfMultiThread(); } // Test: relay SUBSCRIBE path – dynamic groups from the upstream SubscribeOk is @@ -88,6 +90,7 @@ TEST_P(MoQRelayTest, RelaySubscribePropagatesDynamicGroupsToAllSubscribers) { removeSession(publisherSession); removeSession(subscriber1); 
removeSession(subscriber2); + driveIfMultiThread(); } // Relay test: When a late-joining subscriber sends NEW_GROUP_REQUEST in its @@ -214,6 +217,7 @@ TEST_P(MoQRelayTest, RelayRequestUpdateNGRCascadedUpstream) { removeSession(publisherSession); removeSession(subscriberSession); + driveIfMultiThread(); } // Relay test: downstream subscriber returns PublishOk carrying NEW_GROUP_REQUEST; @@ -248,10 +252,13 @@ TEST_P(MoQRelayTest, PublishOkNewNGRForwardedUpstream) { doSubscribeNamespace(subscriberSession, kTestNamespace); auto publishHandle = makePublishHandle(); + std::atomic updates{0}; { testing::InSequence seq; - EXPECT_CALL(*publishHandle, requestUpdateCalled(_)).Times(1); // forward=true update - EXPECT_CALL(*publishHandle, requestUpdateCalled(_)).WillOnce([](const RequestUpdate& update) { + EXPECT_CALL(*publishHandle, requestUpdateCalled(_)) // forward=true update + .WillOnce([&](const RequestUpdate&) { ++updates; }); + EXPECT_CALL(*publishHandle, requestUpdateCalled(_)).WillOnce([&](const RequestUpdate& update) { + ++updates; auto ngrValue = getFirstIntParam(update.params, TrackRequestParamKey::NEW_GROUP_REQUEST); ASSERT_TRUE(ngrValue.has_value()); EXPECT_EQ(*ngrValue, 8u); @@ -272,8 +279,16 @@ TEST_P(MoQRelayTest, PublishOkNewNGRForwardedUpstream) { }); exec_->drive(); + // Wait for the async cascade (forward=true + NGR=8) to actually land rather + // than driving a fixed number of times, then lock in the assertion before + // teardown so the trailing forwardChanged(false) → requestUpdate at + // removeSession can't over-saturate the expectation. 
+ EXPECT_TRUE(driveUntil([&] { return updates.load() >= 2; })) + << "NGR cascade incomplete: " << updates.load() << "/2 requestUpdates"; + removeSession(publisherSession); removeSession(subscriberSession); + driveIfMultiThread(); } // Relay test: a second subscriber returning the same NEW_GROUP_REQUEST value in @@ -320,10 +335,13 @@ TEST_P(MoQRelayTest, PublishOkDuplicateNGRNotForwardedUpstream) { doSubscribeNamespace(subscriber2, kTestNamespace); auto publishHandle = makePublishHandle(); + std::atomic updates{0}; { testing::InSequence seq; - EXPECT_CALL(*publishHandle, requestUpdateCalled(_)).Times(1); // forward=true update - EXPECT_CALL(*publishHandle, requestUpdateCalled(_)).Times(1); // NGR update (deduplicated) + EXPECT_CALL(*publishHandle, requestUpdateCalled(_)) // forward=true update + .WillOnce([&](const RequestUpdate&) { ++updates; }); + EXPECT_CALL(*publishHandle, requestUpdateCalled(_)) // NGR update (deduplicated) + .WillOnce([&](const RequestUpdate&) { ++updates; }); } PublishRequest pub; @@ -340,9 +358,17 @@ TEST_P(MoQRelayTest, PublishOkDuplicateNGRNotForwardedUpstream) { }); exec_->drive(); + // Wait for the deduplicated cascade (forward=true + one NGR) to land, then + // lock in the assertion before teardown; the trailing forwardChanged(false) → + // requestUpdate at subscriber teardown is not asserted here and would + // otherwise race in and over-saturate the expectation. 
+ EXPECT_TRUE(driveUntil([&] { return updates.load() >= 2; })) + << "NGR cascade incomplete: " << updates.load() << "/2 requestUpdates"; + removeSession(publisherSession); removeSession(subscriber1); removeSession(subscriber2); + driveIfMultiThread(); } } // namespace moxygen::test diff --git a/test/MoqxRelayPeerTests.cpp b/test/MoqxRelayPeerTests.cpp index 086a0d0d..16ca38d7 100644 --- a/test/MoqxRelayPeerTests.cpp +++ b/test/MoqxRelayPeerTests.cpp @@ -134,6 +134,7 @@ TEST_P(MoQRelayTest, LocalNamespaceDeliveredToPeerOnReconnect) { removeSession(localPublisher); removeSession(peerSession); + driveIfMultiThread(); // flush relay cleanup so it drops session refs before mocks are destroyed } // A mock session that simulates a peer announcing peerNs when the relay diff --git a/test/MoqxRelayPublishTests.cpp b/test/MoqxRelayPublishTests.cpp index 74faf3c6..776efc16 100644 --- a/test/MoqxRelayPublishTests.cpp +++ b/test/MoqxRelayPublishTests.cpp @@ -7,6 +7,7 @@ */ #include "MoqxRelayTestFixture.h" +#include namespace moxygen::test { @@ -55,10 +56,13 @@ TEST_P(MoQRelayTest, PublishExtensionsForwardedToSubscribers) { // Subscribe to namespace first auto mockConsumer = createMockConsumer(); Extensions receivedExtensions; + std::atomic published{false}; EXPECT_CALL(*subscriber, publish(testing::_, testing::_)) .WillOnce([&mockConsumer, - &receivedExtensions](const PublishRequest& pubReq, auto /*subHandle*/) { + &receivedExtensions, + &published](const PublishRequest& pubReq, auto /*subHandle*/) { receivedExtensions = pubReq.extensions; + published.store(true); return Subscriber::PublishResult(Subscriber::PublishConsumerAndReplyTask{ mockConsumer, []() -> folly::coro::Task> { @@ -94,12 +98,19 @@ TEST_P(MoQRelayTest, PublishExtensionsForwardedToSubscribers) { }); exec_->drive(); + // Wait until the relay has actually forwarded the publish to the subscriber + // (the mock sets `published`), rather than reading receivedExtensions after a + // fixed drive — under parallel load 
the forwarding may not have completed yet. + ASSERT_TRUE(driveUntil([&] { return published.load(); })) + << "publish was not forwarded to the subscriber"; + // Verify extensions were forwarded EXPECT_EQ(receivedExtensions.getIntExtension(kDeliveryTimeoutExtensionType), 5000); EXPECT_EQ(receivedExtensions.getIntExtension(0xBEEF'0000), 42); removeSession(publisherSession); removeSession(subscriber); + driveIfMultiThread(); // flush relay cleanup so it drops session refs before mocks are destroyed } // ============================================================ @@ -153,29 +164,38 @@ TEST_P(MoQRelayTest, PublishExtensionsForwardedToLateJoiners) { // Late-joining subscriber 2 should also get extensions Extensions receivedExtensions; + std::atomic published2{false}; auto mockConsumer2 = createMockConsumer(); EXPECT_CALL(*subscriber2, publish(testing::_, testing::_)) - .WillOnce([&mockConsumer2, &receivedExtensions](const PublishRequest& pubReq, auto) { - receivedExtensions = pubReq.extensions; - return Subscriber::PublishResult(Subscriber::PublishConsumerAndReplyTask{ - mockConsumer2, - []() -> folly::coro::Task> { - co_return PublishOk{ - RequestID(2), - true, - 0, - GroupOrder::OldestFirst, - LocationType::LargestObject, - std::nullopt, - std::nullopt - }; - }() - }); - }); + .WillOnce( + [&mockConsumer2, &receivedExtensions, &published2](const PublishRequest& pubReq, auto) { + receivedExtensions = pubReq.extensions; + published2.store(true); + return Subscriber::PublishResult(Subscriber::PublishConsumerAndReplyTask{ + mockConsumer2, + []() -> folly::coro::Task> { + co_return PublishOk{ + RequestID(2), + true, + 0, + GroupOrder::OldestFirst, + LocationType::LargestObject, + std::nullopt, + std::nullopt + }; + }() + }); + } + ); doSubscribeNamespace(subscriber2, kTestNamespace); exec_->drive(); + // Wait until the relay forwards the publish to the late joiner before reading + // the extensions (async under parallel load). 
+ ASSERT_TRUE(driveUntil([&] { return published2.load(); })) + << "publish was not forwarded to the late-joining subscriber"; + // Verify late-joiner received extensions EXPECT_EQ(receivedExtensions.getIntExtension(kDeliveryTimeoutExtensionType), 3000); EXPECT_EQ(receivedExtensions.getIntExtension(0xCAFE'0000), 99); @@ -183,6 +203,7 @@ TEST_P(MoQRelayTest, PublishExtensionsForwardedToLateJoiners) { removeSession(publisherSession); removeSession(subscriber1); removeSession(subscriber2); + driveIfMultiThread(); // flush relay cleanup so it drops session refs before mocks are destroyed } // Regression test: publisher reconnect after disconnect with active subscriber @@ -231,6 +252,8 @@ TEST_P(MoQRelayTest, PublisherReconnectWithOpenSubgroupNoSegfault) { moxygen::BeginSubgroupOptions{} ); ASSERT_TRUE(sgRes.hasValue()) << "beginSubgroup should succeed"; + // Simulate QUIC resetting the open stream on connection drop. + (*sgRes)->reset(moxygen::ResetStreamErrorCode::INTERNAL_ERROR); }); // Step 3: session A's connection drops WITHOUT closing the subgroup. @@ -288,9 +311,9 @@ TEST_P(MoQRelayTest, PublishReplacesSubscribeDrainsOldAndServesNew) { // Subscribe to the track (creates subscribe-path subscription) auto oldConsumer = createMockConsumer(); - bool publishDoneReceived = false; + std::atomic publishDoneReceived{false}; EXPECT_CALL(*oldConsumer, publishDone(_)).WillOnce([&publishDoneReceived](const PublishDone&) { - publishDoneReceived = true; + publishDoneReceived.store(true); return folly::makeExpected(folly::unit); }); auto handle = subscribeToTrack( @@ -306,8 +329,11 @@ TEST_P(MoQRelayTest, PublishReplacesSubscribeDrainsOldAndServesNew) { auto publishConsumer = doPublish(publisherSession, kTestTrackName); ASSERT_NE(publishConsumer, nullptr); - // Old subscriber must have been drained - EXPECT_TRUE(publishDoneReceived) << "Old subscribe-path subscriber should receive publishDone"; + // Old subscriber must have been drained. 
publishDone crosses executors to the + // old subscribe-path consumer, so wait for it rather than asserting after a + // fixed drive. + EXPECT_TRUE(driveUntil([&] { return publishDoneReceived.load(); })) + << "Old subscribe-path subscriber should receive publishDone"; // New publish-path subscription should be functional: subscribe a new // downstream consumer and verify it receives data from the publisher @@ -324,9 +350,11 @@ TEST_P(MoQRelayTest, PublishReplacesSubscribeDrainsOldAndServesNew) { auto sgRes = publishConsumer->beginSubgroup(0, 0, 0); ASSERT_TRUE(sgRes.hasValue()); EXPECT_TRUE(sgRes.value()->endOfSubgroup().hasValue()); + driveIfMultiThread(); // flush relayExec_ so beginSubgroup/endOfSubgroup reach subscribers removeSession(publisherSession); removeSession(subscriberSession); + driveIfMultiThread(); // flush relay cleanup so it drops session refs before mocks are destroyed } // Regression test: publisher reconnects while a subscribe coroutine is @@ -543,6 +571,7 @@ TEST_P(MoQRelayTest, PublishDonePrunesNamespaceTreeNode) { exec_->drive(); removeSession(publisher); + driveIfMultiThread(); // flush relay cleanup so it drops session refs before mocks are destroyed } // Empty namespace: publishNamespace with an empty TrackNamespace must not crash. 
diff --git a/test/MoqxRelaySubNsTests.cpp b/test/MoqxRelaySubNsTests.cpp index 7305987f..1bbd2921 100644 --- a/test/MoqxRelaySubNsTests.cpp +++ b/test/MoqxRelaySubNsTests.cpp @@ -87,6 +87,7 @@ TEST_P(MoQRelayTest, SubscribeNamespaceDoesntAddDrainingPublish) { removeSession(publisherSession); removeSession(subscriber1); removeSession(subscriber2); + driveIfMultiThread(); // flush relay cleanup so it drops session refs before mocks are destroyed } TEST_P(MoQRelayTest, SubscribeNamespaceEmptyPrefixRejectedPreV16) { diff --git a/test/MoqxRelaySubscribeTests.cpp b/test/MoqxRelaySubscribeTests.cpp index 8ec8e3dc..929a2a9c 100644 --- a/test/MoqxRelaySubscribeTests.cpp +++ b/test/MoqxRelaySubscribeTests.cpp @@ -34,6 +34,8 @@ TEST_P(MoQRelayTest, ForwardChangedAfterPublisherTermination) { }); auto subgroupRes = publishConsumer->beginSubgroup(0, 0, 0); ASSERT_TRUE(subgroupRes.hasValue()); + driveIfMultiThread( + ); // flush beginSubgroup so relay subgroup forwarder is wired before publishDone // Publisher terminates — onPublishDone clears handle/upstream. 
// forwarder->publishDone sets draining and calls drainSubscriber, but the @@ -61,6 +63,7 @@ TEST_P(MoQRelayTest, ForwardChangedAfterPublisherTermination) { removeSession(publisherSession); removeSession(subSession); + driveIfMultiThread(); // flush pending lambdas (sg->reset, cleanup) before mocks are destroyed } // Bug: when a second subscriber with forward=true joins an existing PUBLISH-path diff --git a/test/MoqxRelayTestFixture.cpp b/test/MoqxRelayTestFixture.cpp index 051d97ff..75ec9e53 100644 --- a/test/MoqxRelayTestFixture.cpp +++ b/test/MoqxRelayTestFixture.cpp @@ -16,7 +16,16 @@ void TestMoQExecutor::add(folly::Func func) { MoQFollyExecutorImpl::add(std::move(func)); } void TestMoQExecutor::drive() { - if (auto* evb = getBackingEventBase()) { + auto* evb = getBackingEventBase(); + if (!evb) { + return; + } + evb->loopOnce(); + if (relayEvb_) { + // flush our pending task to relay + relayEvb_->runInEventBaseThreadAndWait([]() {}); + // now flush any tasks created by our task + relayEvb_->runInEventBaseThreadAndWait([]() {}); evb->loopOnce(); } } @@ -30,14 +39,40 @@ void MoQRelayTest::SetUp() { exec_ = std::make_shared(); relay_ = std::make_shared(config::CacheConfig{.maxCachedTracks = 0}); relay_->setAllowedNamespacePrefix(kAllowedPrefix); + if (relayMode() == RelayMode::MultiThread) { + relayThread_ = std::make_unique("relay-test"); + relayEvb_ = relayThread_->getEventBase(); + relay_->setRelayExec(relayEvb_); + exec_->setRelayEvb(relayEvb_); + ASSERT_NE(relay_->getRelayExec(), nullptr); + publisherInterface_ = + std::make_shared(relay_->getRelayExec(), relay_); + subscriberInterface_ = + std::make_shared(relay_->getRelayExec(), relay_); + } } void MoQRelayTest::resetRelay(std::shared_ptr relay) { relay_ = std::move(relay); + if (relayEvb_) { + relay_->setRelayExec(relayEvb_); + publisherInterface_ = std::make_shared(relayEvb_, relay_); + subscriberInterface_ = std::make_shared(relayEvb_, relay_); + } } void MoQRelayTest::TearDown() { + // Drain any 
pending relay exec tasks (e.g., async publishNamespaceDone dispatches + // from cleanup) before destroying relay state to avoid use-after-free on NamespaceTree. + if (relayEvb_) { + relayEvb_->runInEventBaseThreadAndWait([]() {}); + relayEvb_->runInEventBaseThreadAndWait([]() {}); + } + exec_->setRelayEvb(nullptr); + publisherInterface_.reset(); + subscriberInterface_.reset(); relay_.reset(); + relayThread_.reset(); } std::shared_ptr MoQRelayTest::createMockSession() { @@ -195,6 +230,7 @@ std::shared_ptr MoQRelayTest::doPublish( } co_withExecutor(static_cast(exec_.get()), std::move(res->reply)) .start(); + driveIfMultiThread(); // flush reply to relay exec so publish state is ready return res->consumer; } return std::shared_ptr(nullptr); @@ -255,6 +291,7 @@ std::shared_ptr MoQRelayTest::doPublishWithHandle( getOrCreateMockState(session)->publishConsumers.push_back(consumer); co_withExecutor(static_cast(exec_.get()), std::move(res->reply)) .start(); + driveIfMultiThread(); // flush reply to relay exec so publish state is ready return consumer; }); } diff --git a/test/MoqxRelayTestFixture.h b/test/MoqxRelayTestFixture.h index 245a1cbe..460b6c66 100644 --- a/test/MoqxRelayTestFixture.h +++ b/test/MoqxRelayTestFixture.h @@ -12,8 +12,11 @@ #pragma once #include "MoqxRelay.h" +#include "relay/PublisherCrossExecFilter.h" +#include "relay/SubscriberCrossExecFilter.h" #include #include +#include #include #include #include @@ -31,6 +34,8 @@ namespace moxygen::test { enum class RelayMode { SingleThread, + MultiThread, + // Future modes: add here, then update PrintTo, SetUp(), and the INSTANTIATE entry }; inline void PrintTo(RelayMode mode, std::ostream* os) { @@ -38,6 +43,9 @@ inline void PrintTo(RelayMode mode, std::ostream* os) { case RelayMode::SingleThread: *os << "SingleThread"; return; + case RelayMode::MultiThread: + *os << "MultiThread"; + return; } } @@ -55,8 +63,11 @@ class TestMoQExecutor : public MoQFollyExecutorImpl, public folly::DrivableExecu void drive() 
override; void driveFor(int n); + void setRelayEvb(folly::EventBase* evb) { relayEvb_ = evb; } + private: folly::EventBase evb_; + folly::EventBase* relayEvb_{nullptr}; }; // Test fixture for MoqxRelay and NamespaceTree tests. @@ -83,7 +94,36 @@ class MoQRelayTest : public ::testing::TestWithParam { folly::Optional expectedError = folly::none ); - template void verifyOnRelayExec(Func&& func) { func(); } + template void verifyOnRelayExec(Func&& func) { + if (relayEvb_) { + relayEvb_->runInEventBaseThreadAndWait(std::forward(func)); + } else { + func(); + } + } + + void driveIfMultiThread() { + if (relayEvb_) { + exec_->drive(); + } + } + + // Drive both executors until `done()` is true or maxIters is reached. Each + // exec_->drive() flushes exec_ and (in MultiThread mode) synchronizes + // with relayEvb_ via runInEventBaseThreadAndWait, so this deterministically + // advances the relay's async cascades (e.g. forwardChanged/NGR requestUpdate) + // instead of guessing a fixed drive count. Returns done() — false means + // maxIters was exhausted. In SingleThread mode drive() degrades to a single loopOnce. + template bool driveUntil(Pred&& done, int maxIters = 500) { + // SingleThread mode is fully synchronous: there is no relay thread to await, + // so at most one loopOnce can make new progress. Cap iterations at 1 to + // avoid spinning loopOnce maxIters times when a predicate stays false. + int iters = relayEvb_ ? 
maxIters : 1; + for (int i = 0; i < iters && !done(); ++i) { + exec_->drive(); + } + return done(); + } template auto withSessionContext(std::shared_ptr session, Func&& func) -> decltype(func()) { @@ -166,6 +206,8 @@ class MoQRelayTest : public ::testing::TestWithParam { std::shared_ptr exec_; std::shared_ptr relay_; + std::unique_ptr relayThread_; + folly::EventBase* relayEvb_{nullptr}; }; } // namespace moxygen::test diff --git a/test/MoqxRelayTestModes.cpp b/test/MoqxRelayTestModes.cpp index 38faea89..bc8d5427 100644 --- a/test/MoqxRelayTestModes.cpp +++ b/test/MoqxRelayTestModes.cpp @@ -9,11 +9,13 @@ namespace moxygen::test { INSTANTIATE_TEST_SUITE_P( AllModes, MoQRelayTest, - ::testing::Values(RelayMode::SingleThread), + ::testing::Values(RelayMode::SingleThread, RelayMode::MultiThread), [](const ::testing::TestParamInfo& info) -> std::string { switch (info.param) { case RelayMode::SingleThread: return "SingleThread"; + case RelayMode::MultiThread: + return "MultiThread"; } return "Unknown"; }