diff --git a/moxygen/MoQSession.cpp b/moxygen/MoQSession.cpp index 6756237b..951c48c7 100644 --- a/moxygen/MoQSession.cpp +++ b/moxygen/MoQSession.cpp @@ -2795,6 +2795,65 @@ MoQSession::getSubscribeTrackReceiveState(TrackAlias trackAlias) { return trackIt->second; } +folly::Expected< + std::shared_ptr, + FetchError> +MoQSession::resolveJoiningFetch( + JoiningFetch& joining, const FullTrackName& fullTrackName) { + if (joining.joiningRequestID == kAutoRequestID) { + for (const auto& [reqID, pending] : pendingRequests_) { + if (const auto* trackPtr = pending->tryGetSubscribeTrack()) { + if ((*trackPtr)->fullTrackName() == fullTrackName) { + joining.joiningRequestID = reqID; + return *trackPtr; + } + } + } + for (const auto& [alias, state] : subTracks_) { + if (state->fullTrackName() == fullTrackName) { + joining.joiningRequestID = state->getRequestID(); + return state; + } + } + XLOG(ERR) << "kAutoRequestID joining FETCH found no match" + << " ftn=" << fullTrackName << " sess=" << this; + return folly::makeUnexpected(FetchError{ + kAutoRequestID, + FetchErrorCode::INTERNAL_ERROR, + "No matching subscribe for auto joining FETCH"}); + } + + std::shared_ptr state; + auto pendingIt = pendingRequests_.find(joining.joiningRequestID); + if (pendingIt != pendingRequests_.end()) { + if (auto* trackPtr = pendingIt->second->tryGetSubscribeTrack()) { + state = *trackPtr; + } + } else { + auto subIt = reqIdToTrackAlias_.find(joining.joiningRequestID); + if (subIt != reqIdToTrackAlias_.end()) { + state = getSubscribeTrackReceiveState(subIt->second); + } + } + if (!state) { + XLOG(ERR) << "API error, joining FETCH for invalid requestID=" + << joining.joiningRequestID.value << " sess=" << this; + return folly::makeUnexpected(FetchError{ + std::numeric_limits::max(), + FetchErrorCode::INTERNAL_ERROR, + "Invalid JSID"}); + } + if (fullTrackName != state->fullTrackName()) { + XLOG(ERR) << "API error, track name mismatch=" << fullTrackName << "," + << state->fullTrackName() << " sess=" << this; + return folly::makeUnexpected(FetchError{ + std::numeric_limits::max(), + FetchErrorCode::INTERNAL_ERROR, + "Track name mismatch"}); + } + return state; +} + std::shared_ptr MoQSession::getFetchTrackReceiveState(RequestID requestID) { XLOG(DBG3) << "getTrack reqID=" << requestID; @@ -5324,40 +5383,14 @@ folly::coro::Task MoQSession::fetch( auto [standalone, joining] = fetchType(fetch); FullTrackName fullTrackName = fetch.fullTrackName; if (joining) { - std::shared_ptr state; - auto pendingIt = pendingRequests_.find(joining->joiningRequestID); - if (pendingIt != pendingRequests_.end()) { - if (auto* trackPtr = pendingIt->second->tryGetSubscribeTrack()) { - state = *trackPtr; - } - } else { - auto subIt = reqIdToTrackAlias_.find(joining->joiningRequestID); - if (subIt != reqIdToTrackAlias_.end()) { - state = getSubscribeTrackReceiveState(subIt->second); - } - } - if (!state) { - XLOG(ERR) << "API error, joining FETCH for invalid requestID=" - << joining->joiningRequestID.value << " sess=" << this; - FetchError fetchError = { - std::numeric_limits::max(), - FetchErrorCode::INTERNAL_ERROR, - "Invalid JSID"}; - MOQ_SUBSCRIBER_STATS( - subscriberStatsCallback_, onFetchError, fetchError.errorCode); - co_return folly::makeUnexpected(fetchError); - } - - if (fullTrackName != state->fullTrackName()) { - XLOG(ERR) << "API error, track name mismatch=" << fullTrackName << "," - << state->fullTrackName() << " sess=" << this; - FetchError fetchError = { - std::numeric_limits::max(), - FetchErrorCode::INTERNAL_ERROR, - "Track name mismatch"}; + // May populate joining->joiningRequestID when kAutoRequestID is passed. + auto stateResult = resolveJoiningFetch(*joining, fullTrackName); + if (stateResult.hasError()) { MOQ_SUBSCRIBER_STATS( - subscriberStatsCallback_, onFetchError, fetchError.errorCode); - co_return folly::makeUnexpected(fetchError); + subscriberStatsCallback_, + onFetchError, + stateResult.error().errorCode); + co_return folly::makeUnexpected(stateResult.error()); } } auto reqID = getNextRequestID(); diff --git a/moxygen/MoQSession.h b/moxygen/MoQSession.h index 03af8e29..747ab131 100644 --- a/moxygen/MoQSession.h +++ b/moxygen/MoQSession.h @@ -764,6 +764,12 @@ class MoQSession : public Subscriber, Parameters& params, const std::optional& forceVersion = std::nullopt); RequestID getNextRequestID(); + // Resolves joining.joiningRequestID (including kAutoRequestID) and validates + // the resulting state against fullTrackName. Sets joining.joiningRequestID + // to the resolved value when kAutoRequestID is used. + folly::Expected, FetchError> + resolveJoiningFetch( + JoiningFetch& joining, const FullTrackName& fullTrackName); void setRequestSession() { folly::RequestContext::get()->setContextData( sessionRequestToken(), diff --git a/moxygen/MoQTypes.h b/moxygen/MoQTypes.h index 915d8853..b187a48d 100644 --- a/moxygen/MoQTypes.h +++ b/moxygen/MoQTypes.h @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -741,8 +742,8 @@ struct TrackAlias { std::ostream& operator<<(std::ostream& os, TrackAlias alias); struct RequestID { - /* implicit */ RequestID(uint64_t v) : value(v) {} - RequestID() = default; + constexpr /* implicit */ RequestID(uint64_t v) : value(v) {} + constexpr RequestID() = default; uint64_t value{0}; bool operator==(const RequestID& s) const { return value == s.value; @@ -770,6 +771,10 @@ struct RequestID { }; std::ostream& operator<<(std::ostream& os, RequestID id); +// Sentinel value meaning "let the session resolve the request ID automatically" +// (e.g. by searching pending requests and active tracks for the matching track name). +constexpr RequestID kAutoRequestID{std::numeric_limits::max()}; + // TrackIdentifier variant removed - ObjectHeader now uses TrackAlias directly struct Extension { diff --git a/moxygen/relay/MoQForwarder.cpp b/moxygen/relay/MoQForwarder.cpp index 27466e29..6e2d569c 100644 --- a/moxygen/relay/MoQForwarder.cpp +++ b/moxygen/relay/MoQForwarder.cpp @@ -289,7 +289,8 @@ std::shared_ptr MoQForwarder::addSubscriber( std::shared_ptr MoQForwarder::addChannelSubscriber( folly::Executor* exec, bool forward, - std::shared_ptr consumer) { + std::shared_ptr consumer, + bool passive) { if (draining_) { XLOG(ERR) << "addChannelSubscriber called on draining track"; return nullptr; @@ -312,10 +313,15 @@ std::shared_ptr MoQForwarder::addChannelSubscriber( SubscribeRange{{0, 0}, kLocationMax}, std::move(consumer), forward); + subscriber->passive = passive; subscriber->mapKey = key; auto [it, inserted] = subscribers_.emplace(key, subscriber); - if (inserted && forward) { - addForwardingSubscriber(); + if (inserted) { + if (passive) { + passiveCount_++; + } else if (forward) { + addForwardingSubscriber(); + } } return it->second; } @@ -597,7 +603,15 @@ MoQForwarder::beginSubgroup( if (it != sub->subgroups.end() && it->second) { it->second->reset(ResetStreamErrorCode::CANCELLED); sub->subgroups.erase(it); - anyReset = true; + // Passive subscribers (e.g. the relay's own top-N/cache observer chain) + // are not real downstream consumers: they never stop_sending, so they + // must not mask the "no active consumers" signal. Reset their stale + // subgroup but do not count them toward anyReset, otherwise a duplicate + // subgroup would never propagate CANCELLED back to the publisher once + // all real consumers have stop_sent. + if (!sub->passive) { + anyReset = true; + } } } existingIt->second->detach(); diff --git a/moxygen/relay/MoQForwarder.h b/moxygen/relay/MoQForwarder.h index 9fe8e641..6457bef4 100644 --- a/moxygen/relay/MoQForwarder.h +++ b/moxygen/relay/MoQForwarder.h @@ -211,10 +211,17 @@ class MoQForwarder : public TrackConsumer { // the unique map key so only one cross-exec filter per executor is added. // Returns the Subscriber handle; call removeChannelSubscriber(handle) when // the local forwarder drains. + // + // passive=true marks the channel subscriber as not counting toward + // forwardingSubscribers_ or blocking onEmpty (see addSubscriber). Use it for + // the relay's own internal chain (top-N/termination/cache) attached below a + // local-forwarder primary, so the primary's onEmpty still fires once the last + // real cross-exec subscriber leaves. std::shared_ptr addChannelSubscriber( folly::Executor* exec, bool forward, - std::shared_ptr consumer); + std::shared_ptr consumer, + bool passive = false); // Remove a channel subscriber added via addChannelSubscriber(). void removeChannelSubscriber(