From 2dff34e0eaf67953c3512c1d6e5be6164770ce38 Mon Sep 17 00:00:00 2001 From: afrind Date: Sun, 31 May 2026 22:55:39 -0400 Subject: [PATCH 1/2] Add kAutoRequestID and resolveJoiningFetch helper kAutoRequestID is a sentinel RequestID (max uint64) that callers can pass in a JoiningFetch to let the session find the matching subscribe by track name rather than requiring the caller to track the request ID manually. resolveJoiningFetch handles both the auto case (searches pendingRequests_ and subTracks_ by FullTrackName, writes back the resolved ID) and the explicit case (existing lookup + validation), replacing the inline block in MoQSession::fetch with a single call. Co-Authored-By: Claude Sonnet 4.6 --- moxygen/MoQSession.cpp | 99 ++++++++++++++++++++++++++++-------------- moxygen/MoQSession.h | 6 +++ moxygen/MoQTypes.h | 9 +++- 3 files changed, 79 insertions(+), 35 deletions(-) diff --git a/moxygen/MoQSession.cpp b/moxygen/MoQSession.cpp index 6756237b..951c48c7 100644 --- a/moxygen/MoQSession.cpp +++ b/moxygen/MoQSession.cpp @@ -2795,6 +2795,65 @@ MoQSession::getSubscribeTrackReceiveState(TrackAlias trackAlias) { return trackIt->second; } +folly::Expected< + std::shared_ptr, + FetchError> +MoQSession::resolveJoiningFetch( + JoiningFetch& joining, const FullTrackName& fullTrackName) { + if (joining.joiningRequestID == kAutoRequestID) { + for (const auto& [reqID, pending] : pendingRequests_) { + if (const auto* trackPtr = pending->tryGetSubscribeTrack()) { + if ((*trackPtr)->fullTrackName() == fullTrackName) { + joining.joiningRequestID = reqID; + return *trackPtr; + } + } + } + for (const auto& [alias, state] : subTracks_) { + if (state->fullTrackName() == fullTrackName) { + joining.joiningRequestID = state->getRequestID(); + return state; + } + } + XLOG(ERR) << "kAutoRequestID joining FETCH found no match" + << " ftn=" << fullTrackName << " sess=" << this; + return folly::makeUnexpected(FetchError{ + kAutoRequestID, + FetchErrorCode::INTERNAL_ERROR, + "No matching subscribe for auto joining FETCH"}); + } + + std::shared_ptr state; + auto pendingIt = pendingRequests_.find(joining.joiningRequestID); + if (pendingIt != pendingRequests_.end()) { + if (auto* trackPtr = pendingIt->second->tryGetSubscribeTrack()) { + state = *trackPtr; + } + } else { + auto subIt = reqIdToTrackAlias_.find(joining.joiningRequestID); + if (subIt != reqIdToTrackAlias_.end()) { + state = getSubscribeTrackReceiveState(subIt->second); + } + } + if (!state) { + XLOG(ERR) << "API error, joining FETCH for invalid requestID=" + << joining.joiningRequestID.value << " sess=" << this; + return folly::makeUnexpected(FetchError{ + std::numeric_limits::max(), + FetchErrorCode::INTERNAL_ERROR, + "Invalid JSID"}); + } + if (fullTrackName != state->fullTrackName()) { + XLOG(ERR) << "API error, track name mismatch=" << fullTrackName << "," + << state->fullTrackName() << " sess=" << this; + return folly::makeUnexpected(FetchError{ + std::numeric_limits::max(), + FetchErrorCode::INTERNAL_ERROR, + "Track name mismatch"}); + } + return state; +} + std::shared_ptr MoQSession::getFetchTrackReceiveState(RequestID requestID) { XLOG(DBG3) << "getTrack reqID=" << requestID; @@ -5324,40 +5383,14 @@ folly::coro::Task MoQSession::fetch( auto [standalone, joining] = fetchType(fetch); FullTrackName fullTrackName = fetch.fullTrackName; if (joining) { - std::shared_ptr state; - auto pendingIt = pendingRequests_.find(joining->joiningRequestID); - if (pendingIt != pendingRequests_.end()) { - if (auto* trackPtr = pendingIt->second->tryGetSubscribeTrack()) { - state = *trackPtr; - } - } else { - auto subIt = reqIdToTrackAlias_.find(joining->joiningRequestID); - if (subIt != reqIdToTrackAlias_.end()) { - state = getSubscribeTrackReceiveState(subIt->second); - } - } - if (!state) { - XLOG(ERR) << "API error, joining FETCH for invalid requestID=" - << joining->joiningRequestID.value << " sess=" << this; - FetchError fetchError = { - std::numeric_limits::max(), - FetchErrorCode::INTERNAL_ERROR, - "Invalid JSID"}; - MOQ_SUBSCRIBER_STATS( - subscriberStatsCallback_, onFetchError, fetchError.errorCode); - co_return folly::makeUnexpected(fetchError); - } - - if (fullTrackName != state->fullTrackName()) { - XLOG(ERR) << "API error, track name mismatch=" << fullTrackName << "," - << state->fullTrackName() << " sess=" << this; - FetchError fetchError = { - std::numeric_limits::max(), - FetchErrorCode::INTERNAL_ERROR, - "Track name mismatch"}; + // May populate joining->joiningRequestID when kAutoRequestID is passed. + auto stateResult = resolveJoiningFetch(*joining, fullTrackName); + if (stateResult.hasError()) { MOQ_SUBSCRIBER_STATS( - subscriberStatsCallback_, onFetchError, fetchError.errorCode); - co_return folly::makeUnexpected(fetchError); + subscriberStatsCallback_, + onFetchError, + stateResult.error().errorCode); + co_return folly::makeUnexpected(stateResult.error()); } } auto reqID = getNextRequestID(); diff --git a/moxygen/MoQSession.h b/moxygen/MoQSession.h index 03af8e29..747ab131 100644 --- a/moxygen/MoQSession.h +++ b/moxygen/MoQSession.h @@ -764,6 +764,12 @@ class MoQSession : public Subscriber, Parameters& params, const std::optional& forceVersion = std::nullopt); RequestID getNextRequestID(); + // Resolves joining.joiningRequestID (including kAutoRequestID) and validates + // the resulting state against fullTrackName. Sets joining.joiningRequestID + // to the resolved value when kAutoRequestID is used. + folly::Expected, FetchError> + resolveJoiningFetch( + JoiningFetch& joining, const FullTrackName& fullTrackName); void setRequestSession() { folly::RequestContext::get()->setContextData( sessionRequestToken(), diff --git a/moxygen/MoQTypes.h b/moxygen/MoQTypes.h index 915d8853..b187a48d 100644 --- a/moxygen/MoQTypes.h +++ b/moxygen/MoQTypes.h @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -741,8 +742,8 @@ struct TrackAlias { std::ostream& operator<<(std::ostream& os, TrackAlias alias); struct RequestID { - /* implicit */ RequestID(uint64_t v) : value(v) {} - RequestID() = default; + constexpr /* implicit */ RequestID(uint64_t v) : value(v) {} + constexpr RequestID() = default; uint64_t value{0}; bool operator==(const RequestID& s) const { return value == s.value; @@ -770,6 +771,10 @@ struct RequestID { }; std::ostream& operator<<(std::ostream& os, RequestID id); +// Sentinel value meaning "let the session resolve the request ID automatically" +// (e.g. by searching pending requests and active tracks for the matching track name). +constexpr RequestID kAutoRequestID{std::numeric_limits::max()}; + // TrackIdentifier variant removed - ObjectHeader now uses TrackAlias directly struct Extension { From 8850292d49db9ca70a7bf627b2ba3d5539e9a2d0 Mon Sep 17 00:00:00 2001 From: afrind Date: Mon, 1 Jun 2026 02:45:12 -0400 Subject: [PATCH 2/2] MoQForwarder: add passive channel subscribers that don't gate forwarding Summary: Adds a `passive` flag to addChannelSubscriber() for the relay's own internal observer chain (top-N / termination / cache) attached below a local-forwarder primary. Passive subscribers are not real downstream consumers, so they must not influence the forwarder's "are there active consumers" accounting: - They increment passiveCount_ instead of calling addForwardingSubscriber(), so they don't count toward forwardingSubscribers_ or block onEmpty. The primary's onEmpty still fires once the last real cross-exec subscriber leaves. - In beginSubgroup(), resetting a passive subscriber's stale subgroup no longer sets anyReset. Passive subscribers never stop_sending, so counting them would mask the "no active consumers" signal and prevent a duplicate subgroup from propagating CANCELLED back to the publisher after all real consumers have stop_sent. Test Plan: --- moxygen/relay/MoQForwarder.cpp | 22 ++++++++++++++++++---- moxygen/relay/MoQForwarder.h | 9 ++++++++- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/moxygen/relay/MoQForwarder.cpp b/moxygen/relay/MoQForwarder.cpp index 27466e29..6e2d569c 100644 --- a/moxygen/relay/MoQForwarder.cpp +++ b/moxygen/relay/MoQForwarder.cpp @@ -289,7 +289,8 @@ std::shared_ptr MoQForwarder::addSubscriber( std::shared_ptr MoQForwarder::addChannelSubscriber( folly::Executor* exec, bool forward, - std::shared_ptr consumer) { + std::shared_ptr consumer, + bool passive) { if (draining_) { XLOG(ERR) << "addChannelSubscriber called on draining track"; return nullptr; @@ -312,10 +313,15 @@ std::shared_ptr MoQForwarder::addChannelSubscriber( SubscribeRange{{0, 0}, kLocationMax}, std::move(consumer), forward); + subscriber->passive = passive; subscriber->mapKey = key; auto [it, inserted] = subscribers_.emplace(key, subscriber); - if (inserted && forward) { - addForwardingSubscriber(); + if (inserted) { + if (passive) { + passiveCount_++; + } else if (forward) { + addForwardingSubscriber(); + } } return it->second; } @@ -597,7 +603,15 @@ MoQForwarder::beginSubgroup( if (it != sub->subgroups.end() && it->second) { it->second->reset(ResetStreamErrorCode::CANCELLED); sub->subgroups.erase(it); - anyReset = true; + // Passive subscribers (e.g. the relay's own top-N/cache observer chain) + // are not real downstream consumers: they never stop_sending, so they + // must not mask the "no active consumers" signal. Reset their stale + // subgroup but do not count them toward anyReset, otherwise a duplicate + // subgroup would never propagate CANCELLED back to the publisher once + // all real consumers have stop_sent. + if (!sub->passive) { + anyReset = true; + } } } existingIt->second->detach(); diff --git a/moxygen/relay/MoQForwarder.h b/moxygen/relay/MoQForwarder.h index 9fe8e641..6457bef4 100644 --- a/moxygen/relay/MoQForwarder.h +++ b/moxygen/relay/MoQForwarder.h @@ -211,10 +211,17 @@ class MoQForwarder : public TrackConsumer { // the unique map key so only one cross-exec filter per executor is added. // Returns the Subscriber handle; call removeChannelSubscriber(handle) when // the local forwarder drains. + // + // passive=true marks the channel subscriber as not counting toward + // forwardingSubscribers_ or blocking onEmpty (see addSubscriber). Use it for + // the relay's own internal chain (top-N/termination/cache) attached below a + // local-forwarder primary, so the primary's onEmpty still fires once the last + // real cross-exec subscriber leaves. std::shared_ptr addChannelSubscriber( folly::Executor* exec, bool forward, - std::shared_ptr consumer); + std::shared_ptr consumer, + bool passive = false); // Remove a channel subscriber added via addChannelSubscriber(). void removeChannelSubscriber(