Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 66 additions & 33 deletions moxygen/MoQSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2795,6 +2795,65 @@ MoQSession::getSubscribeTrackReceiveState(TrackAlias trackAlias) {
return trackIt->second;
}

folly::Expected<
std::shared_ptr<MoQSession::SubscribeTrackReceiveState>,
FetchError>
MoQSession::resolveJoiningFetch(
JoiningFetch& joining, const FullTrackName& fullTrackName) {
if (joining.joiningRequestID == kAutoRequestID) {
for (const auto& [reqID, pending] : pendingRequests_) {
if (const auto* trackPtr = pending->tryGetSubscribeTrack()) {
if ((*trackPtr)->fullTrackName() == fullTrackName) {
joining.joiningRequestID = reqID;
return *trackPtr;
}
}
}
for (const auto& [alias, state] : subTracks_) {
if (state->fullTrackName() == fullTrackName) {
joining.joiningRequestID = state->getRequestID();
return state;
}
}
XLOG(ERR) << "kAutoRequestID joining FETCH found no match"
<< " ftn=" << fullTrackName << " sess=" << this;
return folly::makeUnexpected(FetchError{
kAutoRequestID,
FetchErrorCode::INTERNAL_ERROR,
"No matching subscribe for auto joining FETCH"});
}

std::shared_ptr<SubscribeTrackReceiveState> state;
auto pendingIt = pendingRequests_.find(joining.joiningRequestID);
if (pendingIt != pendingRequests_.end()) {
if (auto* trackPtr = pendingIt->second->tryGetSubscribeTrack()) {
state = *trackPtr;
}
} else {
auto subIt = reqIdToTrackAlias_.find(joining.joiningRequestID);
if (subIt != reqIdToTrackAlias_.end()) {
state = getSubscribeTrackReceiveState(subIt->second);
}
}
if (!state) {
XLOG(ERR) << "API error, joining FETCH for invalid requestID="
<< joining.joiningRequestID.value << " sess=" << this;
return folly::makeUnexpected(FetchError{
std::numeric_limits<uint64_t>::max(),
FetchErrorCode::INTERNAL_ERROR,
"Invalid JSID"});
}
if (fullTrackName != state->fullTrackName()) {
XLOG(ERR) << "API error, track name mismatch=" << fullTrackName << ","
<< state->fullTrackName() << " sess=" << this;
return folly::makeUnexpected(FetchError{
std::numeric_limits<uint64_t>::max(),
FetchErrorCode::INTERNAL_ERROR,
"Track name mismatch"});
}
return state;
}

std::shared_ptr<MoQSession::FetchTrackReceiveState>
MoQSession::getFetchTrackReceiveState(RequestID requestID) {
XLOG(DBG3) << "getTrack reqID=" << requestID;
Expand Down Expand Up @@ -5324,40 +5383,14 @@ folly::coro::Task<Publisher::FetchResult> MoQSession::fetch(
auto [standalone, joining] = fetchType(fetch);
FullTrackName fullTrackName = fetch.fullTrackName;
if (joining) {
std::shared_ptr<SubscribeTrackReceiveState> state;
auto pendingIt = pendingRequests_.find(joining->joiningRequestID);
if (pendingIt != pendingRequests_.end()) {
if (auto* trackPtr = pendingIt->second->tryGetSubscribeTrack()) {
state = *trackPtr;
}
} else {
auto subIt = reqIdToTrackAlias_.find(joining->joiningRequestID);
if (subIt != reqIdToTrackAlias_.end()) {
state = getSubscribeTrackReceiveState(subIt->second);
}
}
if (!state) {
XLOG(ERR) << "API error, joining FETCH for invalid requestID="
<< joining->joiningRequestID.value << " sess=" << this;
FetchError fetchError = {
std::numeric_limits<uint64_t>::max(),
FetchErrorCode::INTERNAL_ERROR,
"Invalid JSID"};
MOQ_SUBSCRIBER_STATS(
subscriberStatsCallback_, onFetchError, fetchError.errorCode);
co_return folly::makeUnexpected(fetchError);
}

if (fullTrackName != state->fullTrackName()) {
XLOG(ERR) << "API error, track name mismatch=" << fullTrackName << ","
<< state->fullTrackName() << " sess=" << this;
FetchError fetchError = {
std::numeric_limits<uint64_t>::max(),
FetchErrorCode::INTERNAL_ERROR,
"Track name mismatch"};
// May populate joining->joiningRequestID when kAutoRequestID is passed.
auto stateResult = resolveJoiningFetch(*joining, fullTrackName);
if (stateResult.hasError()) {
MOQ_SUBSCRIBER_STATS(
subscriberStatsCallback_, onFetchError, fetchError.errorCode);
co_return folly::makeUnexpected(fetchError);
subscriberStatsCallback_,
onFetchError,
stateResult.error().errorCode);
co_return folly::makeUnexpected(stateResult.error());
}
}
auto reqID = getNextRequestID();
Expand Down
6 changes: 6 additions & 0 deletions moxygen/MoQSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,12 @@ class MoQSession : public Subscriber,
Parameters& params,
const std::optional<uint64_t>& forceVersion = std::nullopt);
RequestID getNextRequestID();
// Resolves joining.joiningRequestID (including kAutoRequestID) and validates
// the resulting state against fullTrackName. Sets joining.joiningRequestID
// to the resolved value when kAutoRequestID is used.
folly::Expected<std::shared_ptr<SubscribeTrackReceiveState>, FetchError>
resolveJoiningFetch(
JoiningFetch& joining, const FullTrackName& fullTrackName);
void setRequestSession() {
folly::RequestContext::get()->setContextData(
sessionRequestToken(),
Expand Down
9 changes: 7 additions & 2 deletions moxygen/MoQTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <folly/hash/Hash.h>
#include <folly/io/IOBuf.h>
#include <algorithm>
#include <limits>
#include <optional>
#include <vector>

Expand Down Expand Up @@ -741,8 +742,8 @@ struct TrackAlias {
std::ostream& operator<<(std::ostream& os, TrackAlias alias);

struct RequestID {
/* implicit */ RequestID(uint64_t v) : value(v) {}
RequestID() = default;
constexpr /* implicit */ RequestID(uint64_t v) : value(v) {}
constexpr RequestID() = default;
uint64_t value{0};
bool operator==(const RequestID& s) const {
return value == s.value;
Expand Down Expand Up @@ -770,6 +771,10 @@ struct RequestID {
};
std::ostream& operator<<(std::ostream& os, RequestID id);

// Sentinel value meaning "let the session resolve the request ID automatically"
// (e.g. by searching pending requests and active tracks for the matching track name).
constexpr RequestID kAutoRequestID{std::numeric_limits<uint64_t>::max()};

// TrackIdentifier variant removed - ObjectHeader now uses TrackAlias directly

struct Extension {
Expand Down
22 changes: 18 additions & 4 deletions moxygen/relay/MoQForwarder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ std::shared_ptr<MoQForwarder::Subscriber> MoQForwarder::addSubscriber(
std::shared_ptr<MoQForwarder::Subscriber> MoQForwarder::addChannelSubscriber(
folly::Executor* exec,
bool forward,
std::shared_ptr<TrackConsumer> consumer) {
std::shared_ptr<TrackConsumer> consumer,
bool passive) {
if (draining_) {
XLOG(ERR) << "addChannelSubscriber called on draining track";
return nullptr;
Expand All @@ -312,10 +313,15 @@ std::shared_ptr<MoQForwarder::Subscriber> MoQForwarder::addChannelSubscriber(
SubscribeRange{{0, 0}, kLocationMax},
std::move(consumer),
forward);
subscriber->passive = passive;
subscriber->mapKey = key;
auto [it, inserted] = subscribers_.emplace(key, subscriber);
if (inserted && forward) {
addForwardingSubscriber();
if (inserted) {
if (passive) {
passiveCount_++;
} else if (forward) {
addForwardingSubscriber();
}
}
return it->second;
}
Expand Down Expand Up @@ -597,7 +603,15 @@ MoQForwarder::beginSubgroup(
if (it != sub->subgroups.end() && it->second) {
it->second->reset(ResetStreamErrorCode::CANCELLED);
sub->subgroups.erase(it);
anyReset = true;
// Passive subscribers (e.g. the relay's own top-N/cache observer chain)
// are not real downstream consumers: they never stop_sending, so they
// must not mask the "no active consumers" signal. Reset their stale
// subgroup but do not count them toward anyReset, otherwise a duplicate
// subgroup would never propagate CANCELLED back to the publisher once
// all real consumers have stop_sent.
if (!sub->passive) {
anyReset = true;
}
}
}
existingIt->second->detach();
Expand Down
9 changes: 8 additions & 1 deletion moxygen/relay/MoQForwarder.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,17 @@ class MoQForwarder : public TrackConsumer {
// the unique map key so only one cross-exec filter per executor is added.
// Returns the Subscriber handle; call removeChannelSubscriber(handle) when
// the local forwarder drains.
//
// passive=true marks the channel subscriber as not counting toward
// forwardingSubscribers_ or blocking onEmpty (see addSubscriber). Use it for
// the relay's own internal chain (top-N/termination/cache) attached below a
// local-forwarder primary, so the primary's onEmpty still fires once the last
// real cross-exec subscriber leaves.
std::shared_ptr<MoQForwarder::Subscriber> addChannelSubscriber(
folly::Executor* exec,
bool forward,
std::shared_ptr<TrackConsumer> consumer);
std::shared_ptr<TrackConsumer> consumer,
bool passive = false);

// Remove a channel subscriber added via addChannelSubscriber().
void removeChannelSubscriber(
Expand Down
Loading