diff --git a/.gitignore b/.gitignore index cb37fd0a..2efc5da7 100644 --- a/.gitignore +++ b/.gitignore @@ -28,6 +28,7 @@ _CPack_Packages/ # IDE / tools .vscode/ .claude/ +.cache/ # Docker secrets (never commit) docker/.env diff --git a/src/relay/CrossExecFilter.cpp b/src/relay/CrossExecFilter.cpp index 45647202..60789796 100644 --- a/src/relay/CrossExecFilter.cpp +++ b/src/relay/CrossExecFilter.cpp @@ -36,22 +36,24 @@ folly::Expected CrossExecSubgroupFilter:: bool finSubgroup ) { if (auto err = loadDeferredError()) { + enqueueDeactivate(); return folly::makeUnexpected(*err); } auto capturedPayload = maybeDeepCopy(payload, deepCopyPayload_); - targetExec_->add([self = shared_from_this(), + targetExec_->add([this, objectID, payload = std::move(capturedPayload), extensions = std::move(extensions), finSubgroup]() mutable { - if (!self->downstream_) { - self->storeDeferredError(moxygen::MoQPublishError::CANCELLED); - return; + if (downstream_) { + auto result = + downstream_->object(objectID, std::move(payload), std::move(extensions), finSubgroup); + if (result.hasError()) { + closeWithError(result.error(), std::move(downstream_)); + } } - auto result = - self->downstream_->object(objectID, std::move(payload), std::move(extensions), finSubgroup); - if (result.hasError()) { - self->storeDeferredError(result.error()); + if (finSubgroup) { + deactivate(); } }); return folly::unit; @@ -64,23 +66,23 @@ folly::Expected CrossExecSubgroupFilter:: moxygen::Extensions extensions ) { if (auto err = loadDeferredError()) { + enqueueDeactivate(); return folly::makeUnexpected(*err); } payloadTracker_.beginObject(length, initialPayload); auto capturedPayload = maybeDeepCopy(initialPayload, deepCopyPayload_); - targetExec_->add([self = shared_from_this(), + targetExec_->add([this, objectID, length, payload = std::move(capturedPayload), extensions = std::move(extensions)]() mutable { - if (!self->downstream_) { - self->storeDeferredError(moxygen::MoQPublishError::CANCELLED); + if (!downstream_) { return; } auto result = - self->downstream_->beginObject(objectID, length, std::move(payload), std::move(extensions)); + downstream_->beginObject(objectID, length, std::move(payload), std::move(extensions)); if (result.hasError()) { - self->storeDeferredError(result.error()); + closeWithError(result.error(), std::move(downstream_)); } }); return folly::unit; @@ -89,19 +91,20 @@ folly::Expected CrossExecSubgroupFilter:: folly::Expected CrossExecSubgroupFilter::objectPayload(moxygen::Payload payload, bool finSubgroup) { if (auto err = loadDeferredError()) { + enqueueDeactivate(); return folly::makeUnexpected(*err); } auto status = payloadTracker_.consume(payload); auto capturedPayload = maybeDeepCopy(payload, deepCopyPayload_); - targetExec_->add([self = shared_from_this(), payload = std::move(capturedPayload), finSubgroup]( - ) mutable { - if (!self->downstream_) { - self->storeDeferredError(moxygen::MoQPublishError::CANCELLED); - return; + targetExec_->add([this, payload = std::move(capturedPayload), finSubgroup]() mutable { + if (downstream_) { + auto result = downstream_->objectPayload(std::move(payload), finSubgroup); + if (result.hasError()) { + closeWithError(result.error(), std::move(downstream_)); + } } - auto result = self->downstream_->objectPayload(std::move(payload), finSubgroup); - if (result.hasError()) { - self->storeDeferredError(result.error()); + if (finSubgroup) { + deactivate(); } }); return status; @@ -110,16 +113,18 @@ CrossExecSubgroupFilter::objectPayload(moxygen::Payload payload, bool finSubgrou folly::Expected CrossExecSubgroupFilter::endOfGroup(uint64_t endOfGroupObjectID) { if (auto err = loadDeferredError()) { + enqueueDeactivate(); return folly::makeUnexpected(*err); } - targetExec_->add([self = shared_from_this(), endOfGroupObjectID]() { - if (self->downstream_) { - auto result = self->downstream_->endOfGroup(endOfGroupObjectID); + targetExec_->add([this, endOfGroupObjectID]() { + if (downstream_) { + auto result = downstream_->endOfGroup(endOfGroupObjectID); if (result.hasError()) { // terminal — no storeDeferredError needed XLOG(ERR) << "endOfGroup: " << result.error().describe(); } } + deactivate(); }); return folly::unit; } @@ -127,32 +132,36 @@ CrossExecSubgroupFilter::endOfGroup(uint64_t endOfGroupObjectID) { folly::Expected CrossExecSubgroupFilter::endOfTrackAndGroup(uint64_t endOfTrackObjectID) { if (auto err = loadDeferredError()) { + enqueueDeactivate(); return folly::makeUnexpected(*err); } - targetExec_->add([self = shared_from_this(), endOfTrackObjectID]() { - if (self->downstream_) { - auto result = self->downstream_->endOfTrackAndGroup(endOfTrackObjectID); + targetExec_->add([this, endOfTrackObjectID]() { + if (downstream_) { + auto result = downstream_->endOfTrackAndGroup(endOfTrackObjectID); if (result.hasError()) { // terminal — no storeDeferredError needed XLOG(ERR) << "endOfTrackAndGroup: " << result.error().describe(); } } + deactivate(); }); return folly::unit; } folly::Expected CrossExecSubgroupFilter::endOfSubgroup() { if (auto err = loadDeferredError()) { + enqueueDeactivate(); return folly::makeUnexpected(*err); } - targetExec_->add([self = shared_from_this()]() { - if (self->downstream_) { - auto result = self->downstream_->endOfSubgroup(); + targetExec_->add([this]() { + if (downstream_) { + auto result = downstream_->endOfSubgroup(); if (result.hasError()) { // terminal — no storeDeferredError needed XLOG(ERR) << "endOfSubgroup: " << result.error().describe(); } } + deactivate(); }); return folly::unit; } @@ -165,13 +174,18 @@ void CrossExecSubgroupFilter::reset(moxygen::ResetStreamErrorCode error) { if (self->downstream_) { self->downstream_->reset(error); } + self->deactivate(); }); } void CrossExecSubgroupFilter::checkpoint() { - targetExec_->add([self = shared_from_this()]() { - if (self->downstream_) { - self->downstream_->checkpoint(); + if (loadDeferredError()) { + enqueueDeactivate(); + return; + } + targetExec_->add([this]() { + if (downstream_) { + downstream_->checkpoint(); } }); } @@ -179,6 +193,7 @@ void CrossExecSubgroupFilter::checkpoint() { folly::Expected, moxygen::MoQPublishError> CrossExecSubgroupFilter::awaitReadyToConsume() { if (auto err = loadDeferredError()) { + enqueueDeactivate(); return folly::makeUnexpected(*err); } // TODO: backpressure not implemented; always reports ready @@ -192,14 +207,14 @@ CrossExecFilter::setTrackAlias(moxygen::TrackAlias alias) { if (auto err = loadDeferredError()) { return folly::makeUnexpected(*err); } - targetExec_->add([self = shared_from_this(), alias]() { - if (!self->downstream_) { - self->storeDeferredError(moxygen::MoQPublishError::WRITE_ERROR); + targetExec_->add([this, alias]() { + if (!downstream_) { + storeDeferredError(moxygen::MoQPublishError::WRITE_ERROR); return; } - auto result = self->downstream_->setTrackAlias(alias); + auto result = downstream_->setTrackAlias(alias); if (result.hasError()) { - self->storeDeferredError(result.error()); + storeDeferredError(result.error()); } }); return folly::unit; @@ -215,25 +230,21 @@ CrossExecFilter::beginSubgroup( if (auto err = loadDeferredError()) { return folly::makeUnexpected(*err); } - auto subFilter = std::make_shared(targetExec_, deepCopyPayload_); - targetExec_->add( - [self = shared_from_this(), subFilter, groupID, subgroupID, priority, containsLastInGroup]( - ) mutable { - if (!self->downstream_) { - subFilter->storeDeferredError(moxygen::MoQPublishError::WRITE_ERROR); - return; - } - auto result = - self->downstream_->beginSubgroup(groupID, subgroupID, priority, containsLastInGroup); - if (result.hasValue()) { - subFilter->setDownstream(std::move(result.value())); - subFilter->setKeepAlive(self->downstream_); - } else { - XLOG(ERR) << "CrossExecFilter beginSubgroup failed: " << result.error().describe(); - subFilter->storeDeferredError(result.error()); - } - } - ); + auto subFilter = CrossExecSubgroupFilter::create(targetExec_, deepCopyPayload_); + targetExec_->add([this, subFilter, groupID, subgroupID, priority, containsLastInGroup]() mutable { + if (!downstream_) { + subFilter->closeWithError(moxygen::MoQPublishError::WRITE_ERROR); + return; + } + auto result = downstream_->beginSubgroup(groupID, subgroupID, priority, containsLastInGroup); + if (result.hasValue()) { + subFilter->setDownstream(std::move(result.value())); + subFilter->setKeepAlive(downstream_); + } else { + XLOG(ERR) << "CrossExecFilter beginSubgroup failed: " << result.error().describe(); + subFilter->closeWithError(result.error()); + } + }); return subFilter; } @@ -255,18 +266,15 @@ folly::Expected CrossExecFilter::objectSt return folly::makeUnexpected(*err); } auto capturedPayload = maybeDeepCopy(payload, deepCopyPayload_); - targetExec_->add([self = shared_from_this(), - header, - payload = std::move(capturedPayload), - lastInGroup]() mutable { - if (!self->downstream_) { - self->objectStreamErrors_.fetch_add(1, std::memory_order_relaxed); + targetExec_->add([this, header, payload = std::move(capturedPayload), lastInGroup]() mutable { + if (!downstream_) { + objectStreamErrors_.fetch_add(1, std::memory_order_relaxed); XLOG(ERR) << "objectStream: no downstream"; return; } - auto result = self->downstream_->objectStream(header, std::move(payload), lastInGroup); + auto result = downstream_->objectStream(header, std::move(payload), lastInGroup); if (result.hasError()) { - self->objectStreamErrors_.fetch_add(1, std::memory_order_relaxed); + objectStreamErrors_.fetch_add(1, std::memory_order_relaxed); XLOG(ERR) << "objectStream: " << result.error().describe(); } }); @@ -282,18 +290,15 @@ folly::Expected CrossExecFilter::datagram return folly::makeUnexpected(*err); } auto capturedPayload = maybeDeepCopy(payload, deepCopyPayload_); - targetExec_->add([self = shared_from_this(), - header, - payload = std::move(capturedPayload), - lastInGroup]() mutable { - if (!self->downstream_) { - self->datagramErrors_.fetch_add(1, std::memory_order_relaxed); + targetExec_->add([this, header, payload = std::move(capturedPayload), lastInGroup]() mutable { + if (!downstream_) { + datagramErrors_.fetch_add(1, std::memory_order_relaxed); XLOG(ERR) << "datagram: no downstream"; return; } - auto result = self->downstream_->datagram(header, std::move(payload), lastInGroup); + auto result = downstream_->datagram(header, std::move(payload), lastInGroup); if (result.hasError()) { - self->datagramErrors_.fetch_add(1, std::memory_order_relaxed); + datagramErrors_.fetch_add(1, std::memory_order_relaxed); XLOG(ERR) << "datagram: " << result.error().describe(); } }); @@ -302,6 +307,8 @@ folly::Expected CrossExecFilter::datagram folly::Expected CrossExecFilter::publishDone(moxygen::PublishDone pubDone) { + // shared_from_this: always terminal; caller may drop its ref before the + // lambda executes on targetExec_. targetExec_->add([self = shared_from_this(), pubDone = std::move(pubDone)]() mutable { if (self->downstream_) { self->downstream_->publishDone(std::move(pubDone)); @@ -322,10 +329,11 @@ folly::Expected FetchCrossExecFilter::obj bool forwardingPreferenceIsDatagram ) { if (auto err = loadDeferredError()) { + enqueueDeactivate(); return folly::makeUnexpected(*err); } auto capturedPayload = maybeDeepCopy(payload, deepCopyPayload_); - targetExec_->add([self = shared_from_this(), + targetExec_->add([this, groupID, subgroupID, objectID, @@ -333,21 +341,22 @@ folly::Expected FetchCrossExecFilter::obj extensions = std::move(extensions), finFetch, forwardingPreferenceIsDatagram]() mutable { - if (!self->downstream_) { - self->storeDeferredError(moxygen::MoQPublishError::WRITE_ERROR); - return; + if (downstream_) { + auto result = downstream_->object( + groupID, + subgroupID, + objectID, + std::move(payload), + std::move(extensions), + finFetch, + forwardingPreferenceIsDatagram + ); + if (result.hasError()) { + closeWithError(result.error(), std::move(downstream_)); + } } - auto result = self->downstream_->object( - groupID, - subgroupID, - objectID, - std::move(payload), - std::move(extensions), - finFetch, - forwardingPreferenceIsDatagram - ); - if (result.hasError()) { - self->storeDeferredError(result.error()); + if (finFetch) { + deactivate(); } }); return folly::unit; @@ -362,22 +371,22 @@ folly::Expected FetchCrossExecFilter::beg moxygen::Extensions extensions ) { if (auto err = loadDeferredError()) { + enqueueDeactivate(); return folly::makeUnexpected(*err); } payloadTracker_.beginObject(length, initialPayload); auto capturedPayload = maybeDeepCopy(initialPayload, deepCopyPayload_); - targetExec_->add([self = shared_from_this(), + targetExec_->add([this, groupID, subgroupID, objectID, length, payload = std::move(capturedPayload), extensions = std::move(extensions)]() mutable { - if (!self->downstream_) { - self->storeDeferredError(moxygen::MoQPublishError::WRITE_ERROR); + if (!downstream_) { return; } - auto result = self->downstream_->beginObject( + auto result = downstream_->beginObject( groupID, subgroupID, objectID, @@ -386,28 +395,29 @@ folly::Expected FetchCrossExecFilter::beg std::move(extensions) ); if (result.hasError()) { - self->storeDeferredError(result.error()); + closeWithError(result.error(), std::move(downstream_)); } }); return folly::unit; } folly::Expected -FetchCrossExecFilter::objectPayload(moxygen::Payload payload, bool finSubgroup) { +FetchCrossExecFilter::objectPayload(moxygen::Payload payload, bool finFetch) { if (auto err = loadDeferredError()) { + enqueueDeactivate(); return folly::makeUnexpected(*err); } auto status = payloadTracker_.consume(payload); auto capturedPayload = maybeDeepCopy(payload, deepCopyPayload_); - targetExec_->add([self = shared_from_this(), payload = std::move(capturedPayload), finSubgroup]( - ) mutable { - if (!self->downstream_) { - self->storeDeferredError(moxygen::MoQPublishError::WRITE_ERROR); - return; + targetExec_->add([this, payload = std::move(capturedPayload), finFetch]() mutable { + if (downstream_) { + auto result = downstream_->objectPayload(std::move(payload), finFetch); + if (result.hasError()) { + closeWithError(result.error(), std::move(downstream_)); + } } - auto result = self->downstream_->objectPayload(std::move(payload), finSubgroup); - if (result.hasError()) { - self->storeDeferredError(result.error()); + if (finFetch) { + deactivate(); } }); return status; @@ -420,16 +430,19 @@ folly::Expected FetchCrossExecFilter::end bool finFetch ) { if (auto err = loadDeferredError()) { + enqueueDeactivate(); return folly::makeUnexpected(*err); } - targetExec_->add([self = shared_from_this(), groupID, subgroupID, objectID, finFetch]() { - if (self->downstream_) { - auto result = self->downstream_->endOfGroup(groupID, subgroupID, objectID, finFetch); + targetExec_->add([this, groupID, subgroupID, objectID, finFetch]() { + if (downstream_) { + auto result = downstream_->endOfGroup(groupID, subgroupID, objectID, finFetch); if (result.hasError()) { - // terminal — no storeDeferredError needed - XLOG(ERR) << "endOfGroup: " << result.error().describe(); + closeWithError(result.error(), std::move(downstream_)); } } + if (finFetch) { + deactivate(); + } }); return folly::unit; } @@ -437,40 +450,46 @@ folly::Expected FetchCrossExecFilter::end folly::Expected FetchCrossExecFilter::endOfTrackAndGroup(uint64_t groupID, uint64_t subgroupID, uint64_t objectID) { if (auto err = loadDeferredError()) { + enqueueDeactivate(); return folly::makeUnexpected(*err); } - targetExec_->add([self = shared_from_this(), groupID, subgroupID, objectID]() { - if (self->downstream_) { - auto result = self->downstream_->endOfTrackAndGroup(groupID, subgroupID, objectID); + targetExec_->add([this, groupID, subgroupID, objectID]() { + if (downstream_) { + auto result = downstream_->endOfTrackAndGroup(groupID, subgroupID, objectID); if (result.hasError()) { - // terminal — no storeDeferredError needed XLOG(ERR) << "endOfTrackAndGroup: " << result.error().describe(); } } + deactivate(); }); return folly::unit; } folly::Expected FetchCrossExecFilter::endOfFetch() { if (auto err = loadDeferredError()) { + enqueueDeactivate(); return folly::makeUnexpected(*err); } - targetExec_->add([self = shared_from_this()]() { - if (self->downstream_) { - auto result = self->downstream_->endOfFetch(); + targetExec_->add([this]() { + if (downstream_) { + auto result = downstream_->endOfFetch(); if (result.hasError()) { - // terminal — no storeDeferredError needed XLOG(ERR) << "endOfFetch: " << result.error().describe(); } } + deactivate(); }); return folly::unit; } void FetchCrossExecFilter::checkpoint() { - targetExec_->add([self = shared_from_this()]() { - if (self->downstream_) { - self->downstream_->checkpoint(); + if (loadDeferredError()) { + enqueueDeactivate(); + return; + } + targetExec_->add([this]() { + if (downstream_) { + downstream_->checkpoint(); } }); } @@ -478,33 +497,41 @@ void FetchCrossExecFilter::checkpoint() { folly::Expected FetchCrossExecFilter::endOfUnknownRange(uint64_t groupID, uint64_t objectID, bool finFetch) { if (auto err = loadDeferredError()) { + enqueueDeactivate(); return folly::makeUnexpected(*err); } - targetExec_->add([self = shared_from_this(), groupID, objectID, finFetch]() { - if (self->downstream_) { - auto result = self->downstream_->endOfUnknownRange(groupID, objectID, finFetch); + targetExec_->add([this, groupID, objectID, finFetch]() { + if (downstream_) { + auto result = downstream_->endOfUnknownRange(groupID, objectID, finFetch); if (result.hasError()) { - self->storeDeferredError(result.error()); + closeWithError(result.error(), std::move(downstream_)); } } + if (finFetch) { + deactivate(); + } }); return folly::unit; } void FetchCrossExecFilter::reset(moxygen::ResetStreamErrorCode error) { - // storeDeferredError on calling thread; lambda needs no storeDeferredError even if downstream_ is - // null + // storeDeferredError on calling thread so subsequent calls fail fast. + // shared_from_this: reset() bypasses loadDeferredError(), so the source may + // call it after enqueueDeactivate() has already moved selfGuard_ out; we need an + // independent ref to guarantee the object outlives the lambda. storeDeferredError(moxygen::MoQPublishError::CANCELLED); targetExec_->add([self = shared_from_this(), error]() { if (self->downstream_) { self->downstream_->reset(error); } + self->deactivate(); }); } folly::Expected, moxygen::MoQPublishError> FetchCrossExecFilter::awaitReadyToConsume() { if (auto err = loadDeferredError()) { + enqueueDeactivate(); return folly::makeUnexpected(*err); } // TODO: backpressure not implemented; always reports ready diff --git a/src/relay/CrossExecFilter.h b/src/relay/CrossExecFilter.h index 7be3ac67..f2e2f71a 100644 --- a/src/relay/CrossExecFilter.h +++ b/src/relay/CrossExecFilter.h @@ -49,16 +49,81 @@ class CrossExecBase { std::atomic deferredError_{0}; }; +// CRTP mixin adding self-lifetime management to CrossExec filter classes. +// +// create() constructs the object via a PrivateTag-taking constructor (preventing +// direct make_shared by callers) then sets selfGuard_ = this so the object keeps +// itself alive until a terminal lambda on targetExec_ calls deactivate(). +// +// PrivateTag is protected so only Derived::create() can construct objects this way. +// +// Lifetime protocol: +// selfGuard_ keeps the object alive until a terminal lambda calls deactivate(). +// Non-terminal [this] lambdas that encounter an error call closeWithError(), +// which stores the error, nulls downstream_ (via consumed), and enqueues a +// no-op guard lambda that releases selfGuard_ after all already-queued [this] +// lambdas have run (FIFO ordering guarantees the guard is always last among +// lambdas enqueued before closeWithError was called). +// +// Terminal [this] lambdas (finSubgroup/finFetch=true) call deactivate() +// unconditionally, even when !downstream_, so the object is not leaked when the +// source drops its ref after the terminal call without making another method call. +template class CrossExecLifetime : public CrossExecBase { +protected: + struct PrivateTag {}; + + CrossExecLifetime(folly::Executor* exec, bool deepCopy) : CrossExecBase(exec, deepCopy) {} + +public: + // Called at the end of each terminal lambda (on targetExec_). + // Drops the self-anchor; must be the last *this access in the lambda. + void deactivate() { selfGuard_.reset(); } + + // Called by source methods from their loadDeferredError() early-return blocks. + // Moves selfGuard_ into a no-op lambda so the release lands after all [this] + // lambdas already enqueued by this (sequential) source thread. Idempotent. + void enqueueDeactivate() { + if (selfGuard_) { + targetExec_->add([guard = std::move(selfGuard_)]() {}); + } + } + + // Store error and null downstream_ (via consumed). The source thread schedules + // the selfGuard_ release via enqueueDeactivate() in its loadDeferredError() block. + // Pass std::move(downstream_) as consumed to null the member before returning. + template + void + closeWithError(moxygen::MoQPublishError::Code code, std::shared_ptr /*consumed*/ = nullptr) { + storeDeferredError(code); + } + template + void closeWithError(const moxygen::MoQPublishError& err, std::shared_ptr consumed = nullptr) { + closeWithError(err.code, std::move(consumed)); + } + +protected: + // Set by create(), cleared by deactivate() or moved out by enqueueDeactivate(). + std::shared_ptr selfGuard_; +}; + // Forwards all SubgroupConsumer calls to a target executor (fire-and-forget). // downstream_ starts null and is populated by CrossExecFilter::beginSubgroup() // on the target executor. FIFO ordering guarantees it is set before any // object/endOf* calls enqueued afterward execute. class CrossExecSubgroupFilter final : public moxygen::SubgroupConsumerFilter, public std::enable_shared_from_this, - public CrossExecBase { + public CrossExecLifetime { public: - explicit CrossExecSubgroupFilter(folly::Executor* targetExec, bool deepCopyPayload = true) - : moxygen::SubgroupConsumerFilter(nullptr), CrossExecBase(targetExec, deepCopyPayload) {} + static std::shared_ptr + create(folly::Executor* targetExec, bool deepCopyPayload = true) { + auto f = std::make_shared(PrivateTag{}, targetExec, deepCopyPayload); + f->selfGuard_ = f; + return f; + } + + CrossExecSubgroupFilter(PrivateTag, folly::Executor* targetExec, bool deepCopyPayload = true) + : moxygen::SubgroupConsumerFilter(nullptr), + CrossExecLifetime(targetExec, deepCopyPayload) {} folly::Expected object( uint64_t objectID, @@ -177,14 +242,31 @@ class CrossExecFilter final : public moxygen::TrackConsumerFilter, // preserved without additional synchronization. class FetchCrossExecFilter final : public moxygen::FetchConsumer, public std::enable_shared_from_this, - public CrossExecBase { + public CrossExecLifetime { public: + static std::shared_ptr create( + folly::Executor* targetExec, + std::shared_ptr downstream, + bool deepCopyPayload = false + ) { + auto f = std::make_shared( + PrivateTag{}, + targetExec, + std::move(downstream), + deepCopyPayload + ); + f->selfGuard_ = f; + return f; + } + FetchCrossExecFilter( + PrivateTag, folly::Executor* targetExec, std::shared_ptr downstream, bool deepCopyPayload = false ) - : CrossExecBase(targetExec, deepCopyPayload), downstream_(std::move(downstream)) {} + : CrossExecLifetime(targetExec, deepCopyPayload), + downstream_(std::move(downstream)) {} folly::Expected object( uint64_t groupID, @@ -206,7 +288,7 @@ class FetchCrossExecFilter final : public moxygen::FetchConsumer, ) override; folly::Expected - objectPayload(moxygen::Payload payload, bool finSubgroup = false) override; + objectPayload(moxygen::Payload payload, bool finFetch = false) override; folly::Expected endOfGroup(uint64_t groupID, uint64_t subgroupID, uint64_t objectID, bool finFetch = false) diff --git a/test/CrossExecFilterTest.cpp b/test/CrossExecFilterTest.cpp index 9de6d946..67266307 100644 --- a/test/CrossExecFilterTest.cpp +++ b/test/CrossExecFilterTest.cpp @@ -132,13 +132,17 @@ TEST_F(CrossExecFilterTest, BeginSubgroupReturnsSubgroupImmediately) { ASSERT_NE(result.value(), nullptr); // The returned consumer is the cross-exec wrapper, not the inner subgroup EXPECT_NE(result.value().get(), static_cast(innerSubgroup_.get())); - exec_.drain(); // drive the pending beginSubgroup on inner + auto subFilter = result.value(); + subFilter->reset(ResetStreamErrorCode::CANCELLED); + exec_.drain(); } TEST_F(CrossExecFilterTest, BeginSubgroupRunsOnTargetExecutor) { EXPECT_CALL(*innerTrack_, beginSubgroup(1, 0, 128, false)).Times(1); auto result = filter_->beginSubgroup(1, 0, 128, false); EXPECT_TRUE(result.hasValue()); + auto subFilter = result.value(); + subFilter->reset(ResetStreamErrorCode::CANCELLED); exec_.drain(); } @@ -149,17 +153,20 @@ TEST_F(CrossExecFilterTest, SubgroupObjectEnqueuedAfterBeginSubgroup) { auto objResult = subFilter->object(0, nullptr, noExtensions(), false); EXPECT_TRUE(objResult.hasValue()); + subFilter->checkpoint(); // Before drain: inner not called EXPECT_CALL(*innerSubgroup_, object(_, _, _, _)).Times(0); EXPECT_CALL(*innerTrack_, beginSubgroup(_, _, _, _)).Times(0); - // After drain: beginSubgroup runs first, then object + // After drain: beginSubgroup runs first, then object, then checkpoint { InSequence seq; EXPECT_CALL(*innerTrack_, beginSubgroup(1, 0, 128, false)).Times(1); EXPECT_CALL(*innerSubgroup_, object(0, _, _, false)).Times(1); + EXPECT_CALL(*innerSubgroup_, checkpoint()).Times(1); } + subFilter->reset(ResetStreamErrorCode::CANCELLED); exec_.drain(); } @@ -225,6 +232,7 @@ TEST_F(CrossExecFilterTest, SubgroupBeginObjectEnqueued) { InSequence seq; EXPECT_CALL(*innerTrack_, beginSubgroup(_, _, _, _)).Times(1); EXPECT_CALL(*innerSubgroup_, beginObject(3, 100, _, _)).Times(1); + subFilter->reset(ResetStreamErrorCode::CANCELLED); exec_.drain(); } @@ -277,7 +285,8 @@ TEST_F(CrossExecFilterTest, ObjectStreamErrorBumpsCounterDoesNotGateTrack) { // Track-level gate is NOT set — beginSubgroup still works. EXPECT_CALL(*innerTrack_, beginSubgroup(_, _, _, _)).Times(1); auto subResult = filter_->beginSubgroup(1, 0, 128, false); - EXPECT_TRUE(subResult.hasValue()); + ASSERT_TRUE(subResult.hasValue()); + subResult.value()->reset(ResetStreamErrorCode::CANCELLED); exec_.drain(); } @@ -296,7 +305,9 @@ TEST_F(CrossExecFilterTest, DatagramErrorBumpsCounterDoesNotGateTrack) { EXPECT_TRUE(filter_->objectStream(makeHeader(3, 0, 2), nullptr, false).hasValue()); EXPECT_CALL(*innerTrack_, beginSubgroup(_, _, _, _)).Times(1); - EXPECT_TRUE(filter_->beginSubgroup(1, 0, 128, false).hasValue()); + auto subResult = filter_->beginSubgroup(1, 0, 128, false); + ASSERT_TRUE(subResult.hasValue()); + subResult.value()->reset(ResetStreamErrorCode::CANCELLED); exec_.drain(); } @@ -320,7 +331,8 @@ TEST_F(CrossExecFilterTest, BeginSubgroupFailureGatesSubgroupNotTrack) { // Track-level gate is NOT set — a new beginSubgroup still works. EXPECT_CALL(*innerTrack_, beginSubgroup(_, _, _, _)).Times(1); auto subResult2 = filter_->beginSubgroup(2, 0, 128, false); - EXPECT_TRUE(subResult2.hasValue()); + ASSERT_TRUE(subResult2.hasValue()); + subResult2.value()->reset(ResetStreamErrorCode::CANCELLED); exec_.drain(); } @@ -340,7 +352,7 @@ class FetchCrossExecFilterTest : public ::testing::Test { .WillByDefault(Return(folly::makeExpected(folly::unit))); ON_CALL(*inner_, endOfUnknownRange(_, _, _)) .WillByDefault(Return(folly::makeExpected(folly::unit))); - filter_ = std::make_shared(&exec_, inner_); + filter_ = FetchCrossExecFilter::create(&exec_, inner_); } folly::ManualExecutor exec_; @@ -374,4 +386,114 @@ TEST_F(FetchCrossExecFilterTest, EndOfUnknownRangeEnqueued) { exec_.drain(); } +// finSubgroup=true is the terminal signal: the subgroup must clean up without +// a separate reset() even if the caller drops its ref immediately after. +// Run under ASan to catch use-after-free in the [this] lambda path. +TEST_F(CrossExecFilterTest, SubgroupObjectFinSubgroupIsTerminal) { + auto subResult = filter_->beginSubgroup(1, 0, 128, false); + ASSERT_TRUE(subResult.hasValue()); + auto subFilter = subResult.value(); + + subFilter->object(0, nullptr, noExtensions(), true); + subFilter.reset(); // drop external ref; selfGuard_ keeps it alive until drain + + EXPECT_CALL(*innerTrack_, beginSubgroup(_, _, _, _)).Times(1); + EXPECT_CALL(*innerSubgroup_, object(0, _, _, true)).Times(1); + exec_.drain(); +} + +// If the inner's beginSubgroup fails, queued data lambdas see !downstream_ +// and clean up via deactivate(). The error is scoped to the subgroup; the +// parent CrossExecFilter remains open for new beginSubgroup/objectStream calls. +TEST_F(CrossExecFilterTest, BeginSubgroupInnerFailureCleansUp) { + EXPECT_CALL(*innerTrack_, beginSubgroup(_, _, _, _)) + .WillOnce(Return(folly::makeUnexpected(MoQPublishError(MoQPublishError::Code::WRITE_ERROR)))); + + auto subResult = filter_->beginSubgroup(1, 0, 128, false); + ASSERT_TRUE(subResult.hasValue()); + auto subFilter = subResult.value(); + + // Queue a data call before the setup lambda runs; it will see !downstream_. + EXPECT_CALL(*innerSubgroup_, object(_, _, _, _)).Times(0); + subFilter->object(0, nullptr, noExtensions(), false); + subFilter->reset(ResetStreamErrorCode::CANCELLED); // required terminal call + subFilter.reset(); + + exec_.drain(); + + // Failure is scoped to the subgroup; the parent track is still open. + EXPECT_CALL(*innerTrack_, objectStream(_, _, _)).Times(1); + auto result = filter_->objectStream(makeHeader(), nullptr); + EXPECT_TRUE(result.hasValue()); + exec_.drain(); // drain the objectStream lambda before filter_ is destroyed +} + +// UAF regression test: when beginSubgroup inner fails, closeWithError() on +// the subFilter drops selfGuard_ immediately while multiple [this]-capturing +// lambdas are still queued. Without the deferred-release fix, accessing +// this->downstream_ in the object and checkpoint lambdas is heap-use-after-free. +// Run under ASan to catch the UAF. +TEST_F(CrossExecFilterTest, SubgroupSetupFailureUAFOnMultipleQueuedLambdas) { + EXPECT_CALL(*innerTrack_, beginSubgroup(_, _, _, _)) + .WillOnce(Return(folly::makeUnexpected(MoQPublishError(MoQPublishError::Code::WRITE_ERROR)))); + + auto r = filter_->beginSubgroup(1, 0, 128, false); + ASSERT_TRUE(r.hasValue()); + auto subFilter = std::move(r.value()); // move out so r holds null — no extra ref + + // Two lambdas queued that capture [this]. Both see !downstream_ after setup + // fails; neither should be called on inner. + EXPECT_CALL(*innerSubgroup_, object(_, _, _, _)).Times(0); + EXPECT_CALL(*innerSubgroup_, checkpoint()).Times(0); + subFilter->object(0, nullptr, noExtensions(), false); + subFilter->checkpoint(); + subFilter->reset(ResetStreamErrorCode::CANCELLED); // required terminal call + subFilter.reset(); // drop external ref; selfGuard_ keeps object alive until reset lambda runs + + exec_.drain(); +} + +// UAF regression test: a non-terminal beginObject error calls closeWithError(), +// dropping selfGuard_ immediately while the queued objectPayload lambda still +// captures [this]. Without the fix, objectPayload accesses freed memory. +// Run under ASan to catch the UAF. +TEST_F(CrossExecFilterTest, SubgroupBeginObjectErrorUAFOnQueuedObjectPayload) { + EXPECT_CALL(*innerTrack_, beginSubgroup(_, _, _, _)).Times(1); + EXPECT_CALL(*innerSubgroup_, beginObject(0, 10, _, _)) + .WillOnce(Return(folly::makeUnexpected(MoQPublishError(MoQPublishError::Code::WRITE_ERROR)))); + EXPECT_CALL(*innerSubgroup_, objectPayload(_, _)).Times(0); + + auto r = filter_->beginSubgroup(1, 0, 128, false); + ASSERT_TRUE(r.hasValue()); + auto subFilter = std::move(r.value()); // move out so r holds null — no extra ref + + subFilter->beginObject(0, 10, nullptr, noExtensions()); + subFilter->objectPayload(nullptr, true); // lambda captures [this]; will UAF without fix + subFilter.reset(); // drop external ref; only selfGuard_ + setup-lambda-capture keep object alive + + // Buggy: beginObject lambda errors → closeWithError() → deactivate() → + // selfGuard_.reset() → freed. The objectPayload lambda then accesses + // this->downstream_ on freed memory. + exec_.drain(); +} + +// UAF regression test: a non-terminal object() error calls closeWithError(), +// dropping selfGuard_ immediately while the queued terminal object lambda still +// captures [this]. Without the fix, the second lambda accesses freed memory. +// Run under ASan to catch the UAF. +TEST_F(FetchCrossExecFilterTest, NonTerminalObjectErrorUAFOnQueuedTerminalLambda) { + EXPECT_CALL(*inner_, object(1, 0, 0, _, _, false, false)) + .WillOnce(Return(folly::makeUnexpected(MoQPublishError(MoQPublishError::Code::WRITE_ERROR)))); + EXPECT_CALL(*inner_, object(1, 0, 1, _, _, true, false)).Times(0); + + filter_->object(1, 0, 0, nullptr, noExtensions(), false, false); + filter_->object(1, 0, 1, nullptr, noExtensions(), true, false); + filter_.reset(); // drop external ref; only selfGuard_ keeps object alive + + // Buggy: first object lambda errors → closeWithError() → deactivate() → + // selfGuard_.reset() → freed. The second object lambda then accesses + // this->downstream_ on freed memory. + exec_.drain(); +} + } // namespace