diff --git a/CMakeLists.txt b/CMakeLists.txt index a2ee9801..d04c82aa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -118,6 +118,7 @@ add_library(moqx_core STATIC src/UpstreamProvider.cpp src/relay/TopNFilter.cpp src/relay/PropertyRanking.cpp + src/relay/CrossExecFilter.cpp ) target_include_directories(moqx_core diff --git a/src/relay/CrossExecFilter.cpp b/src/relay/CrossExecFilter.cpp new file mode 100644 index 00000000..45647202 --- /dev/null +++ b/src/relay/CrossExecFilter.cpp @@ -0,0 +1,514 @@ +/* + * Copyright (c) OpenMOQ contributors. + * This source code is licensed under the Apache 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "relay/CrossExecFilter.h" + +#include + +namespace openmoq::moqx { + +namespace { + +// Returns a coalesced deep copy of payload bytes when deepCopy is true, +// otherwise moves it. cloneCoalesced() copies bytes into a single contiguous +// buffer; unshare() ensures independence for single-element chains where +// coalesce() is a no-op and the buffer would otherwise still be shared. +moxygen::Payload maybeDeepCopy(moxygen::Payload& payload, bool deepCopy) { + if (!deepCopy || !payload) { + return std::move(payload); + } + auto copy = payload->cloneCoalesced(); + copy->unshare(); + return copy; +} + +} // namespace + +// ---- CrossExecSubgroupFilter ---- + +folly::Expected CrossExecSubgroupFilter::object( + uint64_t objectID, + moxygen::Payload payload, + moxygen::Extensions extensions, + bool finSubgroup +) { + if (auto err = loadDeferredError()) { + return folly::makeUnexpected(*err); + } + auto capturedPayload = maybeDeepCopy(payload, deepCopyPayload_); + targetExec_->add([self = shared_from_this(), + objectID, + payload = std::move(capturedPayload), + extensions = std::move(extensions), + finSubgroup]() mutable { + if (!self->downstream_) { + self->storeDeferredError(moxygen::MoQPublishError::CANCELLED); + return; + } + auto result = + self->downstream_->object(objectID, std::move(payload), std::move(extensions), finSubgroup); + if (result.hasError()) { + self->storeDeferredError(result.error()); + } + }); + return folly::unit; +} + +folly::Expected CrossExecSubgroupFilter::beginObject( + uint64_t objectID, + uint64_t length, + moxygen::Payload initialPayload, + moxygen::Extensions extensions +) { + if (auto err = loadDeferredError()) { + return folly::makeUnexpected(*err); + } + payloadTracker_.beginObject(length, initialPayload); + auto capturedPayload = maybeDeepCopy(initialPayload, deepCopyPayload_); + targetExec_->add([self = shared_from_this(), + objectID, + length, + payload = std::move(capturedPayload), + extensions = std::move(extensions)]() mutable { + if (!self->downstream_) { + self->storeDeferredError(moxygen::MoQPublishError::CANCELLED); + return; + } + auto result = + self->downstream_->beginObject(objectID, length, std::move(payload), std::move(extensions)); + if (result.hasError()) { + self->storeDeferredError(result.error()); + } + }); + return folly::unit; +} + +folly::Expected +CrossExecSubgroupFilter::objectPayload(moxygen::Payload payload, bool finSubgroup) { + if (auto err = loadDeferredError()) { + return folly::makeUnexpected(*err); + } + auto status = payloadTracker_.consume(payload); + auto capturedPayload = maybeDeepCopy(payload, deepCopyPayload_); + targetExec_->add([self = shared_from_this(), payload = std::move(capturedPayload), finSubgroup]( + ) mutable { + if (!self->downstream_) { + self->storeDeferredError(moxygen::MoQPublishError::CANCELLED); + return; + } + auto result = self->downstream_->objectPayload(std::move(payload), finSubgroup); + if (result.hasError()) { + self->storeDeferredError(result.error()); + } + }); + return status; +} + +folly::Expected +CrossExecSubgroupFilter::endOfGroup(uint64_t endOfGroupObjectID) { + if (auto err = loadDeferredError()) { + return folly::makeUnexpected(*err); + } + targetExec_->add([self = shared_from_this(), endOfGroupObjectID]() { + if (self->downstream_) { + auto result = self->downstream_->endOfGroup(endOfGroupObjectID); + if (result.hasError()) { + // terminal — no storeDeferredError needed + XLOG(ERR) << "endOfGroup: " << result.error().describe(); + } + } + }); + return folly::unit; +} + +folly::Expected +CrossExecSubgroupFilter::endOfTrackAndGroup(uint64_t endOfTrackObjectID) { + if (auto err = loadDeferredError()) { + return folly::makeUnexpected(*err); + } + targetExec_->add([self = shared_from_this(), endOfTrackObjectID]() { + if (self->downstream_) { + auto result = self->downstream_->endOfTrackAndGroup(endOfTrackObjectID); + if (result.hasError()) { + // terminal — no storeDeferredError needed + XLOG(ERR) << "endOfTrackAndGroup: " << result.error().describe(); + } + } + }); + return folly::unit; +} + +folly::Expected CrossExecSubgroupFilter::endOfSubgroup() { + if (auto err = loadDeferredError()) { + return folly::makeUnexpected(*err); + } + targetExec_->add([self = shared_from_this()]() { + if (self->downstream_) { + auto result = self->downstream_->endOfSubgroup(); + if (result.hasError()) { + // terminal — no storeDeferredError needed + XLOG(ERR) << "endOfSubgroup: " << result.error().describe(); + } + } + }); + return folly::unit; +} + +void CrossExecSubgroupFilter::reset(moxygen::ResetStreamErrorCode error) { + // storeDeferredError on calling thread; lambda needs no storeDeferredError even if downstream_ is + // null + storeDeferredError(moxygen::MoQPublishError::CANCELLED); + targetExec_->add([self = shared_from_this(), error]() { + if (self->downstream_) { + self->downstream_->reset(error); + } + }); +} + +void CrossExecSubgroupFilter::checkpoint() { + targetExec_->add([self = shared_from_this()]() { + if (self->downstream_) { + self->downstream_->checkpoint(); + } + }); +} + +folly::Expected, moxygen::MoQPublishError> +CrossExecSubgroupFilter::awaitReadyToConsume() { + if (auto err = loadDeferredError()) { + return folly::makeUnexpected(*err); + } + // TODO: backpressure not implemented; always reports ready + return folly::makeSemiFuture(0); +} + +// ---- CrossExecFilter ---- + +folly::Expected +CrossExecFilter::setTrackAlias(moxygen::TrackAlias alias) { + if (auto err = loadDeferredError()) { + return folly::makeUnexpected(*err); + } + targetExec_->add([self = shared_from_this(), alias]() { + if (!self->downstream_) { + self->storeDeferredError(moxygen::MoQPublishError::WRITE_ERROR); + return; + } + auto result = self->downstream_->setTrackAlias(alias); + if (result.hasError()) { + self->storeDeferredError(result.error()); + } + }); + return folly::unit; +} + +folly::Expected, moxygen::MoQPublishError> +CrossExecFilter::beginSubgroup( + uint64_t groupID, + uint64_t subgroupID, + moxygen::Priority priority, + bool containsLastInGroup +) { + if (auto err = loadDeferredError()) { + return folly::makeUnexpected(*err); + } + auto subFilter = std::make_shared(targetExec_, deepCopyPayload_); + targetExec_->add( + [self = shared_from_this(), subFilter, groupID, subgroupID, priority, containsLastInGroup]( + ) mutable { + if (!self->downstream_) { + subFilter->storeDeferredError(moxygen::MoQPublishError::WRITE_ERROR); + return; + } + auto result = + self->downstream_->beginSubgroup(groupID, subgroupID, priority, containsLastInGroup); + if (result.hasValue()) { + subFilter->setDownstream(std::move(result.value())); + subFilter->setKeepAlive(self->downstream_); + } else { + XLOG(ERR) << "CrossExecFilter beginSubgroup failed: " << result.error().describe(); + subFilter->storeDeferredError(result.error()); + } + } + ); + return subFilter; +} + +folly::Expected, moxygen::MoQPublishError> +CrossExecFilter::awaitStreamCredit() { + if (auto err = loadDeferredError()) { + return folly::makeUnexpected(*err); + } + // TODO: stream credit not forwarded across executor; always reports ready + return folly::makeSemiFuture(); +} + +folly::Expected CrossExecFilter::objectStream( + const moxygen::ObjectHeader& header, + moxygen::Payload payload, + bool lastInGroup +) { + if (auto err = loadDeferredError()) { + return folly::makeUnexpected(*err); + } + auto capturedPayload = maybeDeepCopy(payload, deepCopyPayload_); + targetExec_->add([self = shared_from_this(), + header, + payload = std::move(capturedPayload), + lastInGroup]() mutable { + if (!self->downstream_) { + self->objectStreamErrors_.fetch_add(1, std::memory_order_relaxed); + XLOG(ERR) << "objectStream: no downstream"; + return; + } + auto result = self->downstream_->objectStream(header, std::move(payload), lastInGroup); + if (result.hasError()) { + self->objectStreamErrors_.fetch_add(1, std::memory_order_relaxed); + XLOG(ERR) << "objectStream: " << result.error().describe(); + } + }); + return folly::unit; +} + +folly::Expected CrossExecFilter::datagram( + const moxygen::ObjectHeader& header, + moxygen::Payload payload, + bool lastInGroup +) { + if (auto err = loadDeferredError()) { + return folly::makeUnexpected(*err); + } + auto capturedPayload = maybeDeepCopy(payload, deepCopyPayload_); + targetExec_->add([self = shared_from_this(), + header, + payload = std::move(capturedPayload), + lastInGroup]() mutable { + if (!self->downstream_) { + self->datagramErrors_.fetch_add(1, std::memory_order_relaxed); + XLOG(ERR) << "datagram: no downstream"; + return; + } + auto result = self->downstream_->datagram(header, std::move(payload), lastInGroup); + if (result.hasError()) { + self->datagramErrors_.fetch_add(1, std::memory_order_relaxed); + XLOG(ERR) << "datagram: " << result.error().describe(); + } + }); + return folly::unit; +} + +folly::Expected +CrossExecFilter::publishDone(moxygen::PublishDone pubDone) { + targetExec_->add([self = shared_from_this(), pubDone = std::move(pubDone)]() mutable { + if (self->downstream_) { + self->downstream_->publishDone(std::move(pubDone)); + } + }); + return folly::unit; +} + +// ---- FetchCrossExecFilter ---- + +folly::Expected FetchCrossExecFilter::object( + uint64_t groupID, + uint64_t subgroupID, + uint64_t objectID, + moxygen::Payload payload, + moxygen::Extensions extensions, + bool finFetch, + bool forwardingPreferenceIsDatagram +) { + if (auto err = loadDeferredError()) { + return folly::makeUnexpected(*err); + } + auto capturedPayload = maybeDeepCopy(payload, deepCopyPayload_); + targetExec_->add([self = shared_from_this(), + groupID, + subgroupID, + objectID, + payload = std::move(capturedPayload), + extensions = std::move(extensions), + finFetch, + forwardingPreferenceIsDatagram]() mutable { + if (!self->downstream_) { + self->storeDeferredError(moxygen::MoQPublishError::WRITE_ERROR); + return; + } + auto result = self->downstream_->object( + groupID, + subgroupID, + objectID, + std::move(payload), + std::move(extensions), + finFetch, + forwardingPreferenceIsDatagram + ); + if (result.hasError()) { + self->storeDeferredError(result.error()); + } + }); + return folly::unit; +} + +folly::Expected FetchCrossExecFilter::beginObject( + uint64_t groupID, + uint64_t subgroupID, + uint64_t objectID, + uint64_t length, + moxygen::Payload initialPayload, + moxygen::Extensions extensions +) { + if (auto err = loadDeferredError()) { + return folly::makeUnexpected(*err); + } + payloadTracker_.beginObject(length, initialPayload); + auto capturedPayload = maybeDeepCopy(initialPayload, deepCopyPayload_); + targetExec_->add([self = shared_from_this(), + groupID, + subgroupID, + objectID, + length, + payload = std::move(capturedPayload), + extensions = std::move(extensions)]() mutable { + if (!self->downstream_) { + self->storeDeferredError(moxygen::MoQPublishError::WRITE_ERROR); + return; + } + auto result = self->downstream_->beginObject( + groupID, + subgroupID, + objectID, + length, + std::move(payload), + std::move(extensions) + ); + if (result.hasError()) { + self->storeDeferredError(result.error()); + } + }); + return folly::unit; +} + +folly::Expected +FetchCrossExecFilter::objectPayload(moxygen::Payload payload, bool finSubgroup) { + if (auto err = loadDeferredError()) { + return folly::makeUnexpected(*err); + } + auto status = payloadTracker_.consume(payload); + auto capturedPayload = maybeDeepCopy(payload, deepCopyPayload_); + targetExec_->add([self = shared_from_this(), payload = std::move(capturedPayload), finSubgroup]( + ) mutable { + if (!self->downstream_) { + self->storeDeferredError(moxygen::MoQPublishError::WRITE_ERROR); + return; + } + auto result = self->downstream_->objectPayload(std::move(payload), finSubgroup); + if (result.hasError()) { + self->storeDeferredError(result.error()); + } + }); + return status; +} + +folly::Expected FetchCrossExecFilter::endOfGroup( + uint64_t groupID, + uint64_t subgroupID, + uint64_t objectID, + bool finFetch +) { + if (auto err = loadDeferredError()) { + return folly::makeUnexpected(*err); + } + targetExec_->add([self = shared_from_this(), groupID, subgroupID, objectID, finFetch]() { + if (self->downstream_) { + auto result = self->downstream_->endOfGroup(groupID, subgroupID, objectID, finFetch); + if (result.hasError()) { + // terminal — no storeDeferredError needed + XLOG(ERR) << "endOfGroup: " << result.error().describe(); + } + } + }); + return folly::unit; +} + +folly::Expected +FetchCrossExecFilter::endOfTrackAndGroup(uint64_t groupID, uint64_t subgroupID, uint64_t objectID) { + if (auto err = loadDeferredError()) { + return folly::makeUnexpected(*err); + } + targetExec_->add([self = shared_from_this(), groupID, subgroupID, objectID]() { + if (self->downstream_) { + auto result = self->downstream_->endOfTrackAndGroup(groupID, subgroupID, objectID); + if (result.hasError()) { + // terminal — no storeDeferredError needed + XLOG(ERR) << "endOfTrackAndGroup: " << result.error().describe(); + } + } + }); + return folly::unit; +} + +folly::Expected FetchCrossExecFilter::endOfFetch() { + if (auto err = loadDeferredError()) { + return folly::makeUnexpected(*err); + } + targetExec_->add([self = shared_from_this()]() { + if (self->downstream_) { + auto result = self->downstream_->endOfFetch(); + if (result.hasError()) { + // terminal — no storeDeferredError needed + XLOG(ERR) << "endOfFetch: " << result.error().describe(); + } + } + }); + return folly::unit; +} + +void FetchCrossExecFilter::checkpoint() { + targetExec_->add([self = shared_from_this()]() { + if (self->downstream_) { + self->downstream_->checkpoint(); + } + }); +} + +folly::Expected +FetchCrossExecFilter::endOfUnknownRange(uint64_t groupID, uint64_t objectID, bool finFetch) { + if (auto err = loadDeferredError()) { + return folly::makeUnexpected(*err); + } + targetExec_->add([self = shared_from_this(), groupID, objectID, finFetch]() { + if (self->downstream_) { + auto result = self->downstream_->endOfUnknownRange(groupID, objectID, finFetch); + if (result.hasError()) { + self->storeDeferredError(result.error()); + } + } + }); + return folly::unit; +} + +void FetchCrossExecFilter::reset(moxygen::ResetStreamErrorCode error) { + // storeDeferredError on calling thread; lambda needs no storeDeferredError even if downstream_ is + // null + storeDeferredError(moxygen::MoQPublishError::CANCELLED); + targetExec_->add([self = shared_from_this(), error]() { + if (self->downstream_) { + self->downstream_->reset(error); + } + }); +} + +folly::Expected, moxygen::MoQPublishError> +FetchCrossExecFilter::awaitReadyToConsume() { + if (auto err = loadDeferredError()) { + return folly::makeUnexpected(*err); + } + // TODO: backpressure not implemented; always reports ready + return folly::makeSemiFuture(0); +} + +} // namespace openmoq::moqx diff --git a/src/relay/CrossExecFilter.h b/src/relay/CrossExecFilter.h new file mode 100644 index 00000000..7be3ac67 --- /dev/null +++ b/src/relay/CrossExecFilter.h @@ -0,0 +1,235 @@ +/* + * Copyright (c) OpenMOQ contributors. + * This source code is licensed under the Apache 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include "relay/SubgroupConsumerUtil.h" + +#include +#include +#include +#include +#include + +namespace openmoq::moqx { + +// Non-virtual mixin holding the executor pointer, deep-copy flag, and +// deferred-error state shared by all CrossExec filter classes. +// +// deferredError_ is written on the target executor when downstream returns an +// error or is null, and read on the calling thread. It stores a +// MoQPublishError::Code cast to uint32_t; 0 means open. Storing the actual +// code (not just a bool) lets callers propagate CANCELLED vs WRITE_ERROR +// correctly to MoQForwarder's soft/hard error classification. +class CrossExecBase { +protected: + CrossExecBase(folly::Executor* targetExec, bool deepCopyPayload) + : targetExec_(targetExec), deepCopyPayload_(deepCopyPayload) {} + + std::optional loadDeferredError() const { + if (auto code = deferredError_.load(std::memory_order_relaxed)) { + return moxygen::MoQPublishError(static_cast(code)); + } + return std::nullopt; + } + +public: + void storeDeferredError(moxygen::MoQPublishError::Code code) { + deferredError_.store(static_cast(code), std::memory_order_relaxed); + } + void storeDeferredError(const moxygen::MoQPublishError& err) { storeDeferredError(err.code); } + +protected: + folly::Executor* targetExec_; + bool deepCopyPayload_; + // 0 = open; non-zero = MoQPublishError::Code value set by target thread. + std::atomic deferredError_{0}; +}; + +// Forwards all SubgroupConsumer calls to a target executor (fire-and-forget). +// downstream_ starts null and is populated by CrossExecFilter::beginSubgroup() +// on the target executor. FIFO ordering guarantees it is set before any +// object/endOf* calls enqueued afterward execute. +class CrossExecSubgroupFilter final : public moxygen::SubgroupConsumerFilter, + public std::enable_shared_from_this, + public CrossExecBase { +public: + explicit CrossExecSubgroupFilter(folly::Executor* targetExec, bool deepCopyPayload = true) + : moxygen::SubgroupConsumerFilter(nullptr), CrossExecBase(targetExec, deepCopyPayload) {} + + folly::Expected object( + uint64_t objectID, + moxygen::Payload payload, + moxygen::Extensions extensions = moxygen::noExtensions(), + bool finSubgroup = false + ) override; + + folly::Expected beginObject( + uint64_t objectID, + uint64_t length, + moxygen::Payload initialPayload, + moxygen::Extensions extensions = moxygen::noExtensions() + ) override; + + folly::Expected + objectPayload(moxygen::Payload payload, bool finSubgroup = false) override; + + folly::Expected endOfGroup(uint64_t endOfGroupObjectID + ) override; + + folly::Expected + endOfTrackAndGroup(uint64_t endOfTrackObjectID) override; + + folly::Expected endOfSubgroup() override; + + void reset(moxygen::ResetStreamErrorCode error) override; + + // Must be overridden: SubgroupConsumerFilter::checkpoint() calls + // downstream_->checkpoint() but our inherited downstream_ is always null. + void checkpoint() override; + + // Must be overridden: SubgroupConsumerFilter::awaitReadyToConsume() calls + // downstream_->awaitReadyToConsume() but our inherited downstream_ is always null. + folly::Expected, moxygen::MoQPublishError> + awaitReadyToConsume() override; + + // Called on targetExec_ by CrossExecFilter::beginSubgroup to keep the + // parent TrackConsumer (MoQForwarder) alive as long as this subgroup filter + // exists. SubgroupForwarder holds a raw back-pointer to the parent forwarder; + // without this, the forwarder can be freed while in-flight lambdas are still + // executing reset/endOf* against it. + void setKeepAlive(std::shared_ptr ka) { keepAlive_ = std::move(ka); } + +private: + ObjectPayloadByteTracker payloadTracker_; + std::shared_ptr keepAlive_; +}; + +// Forwards all TrackConsumer calls to a target executor (fire-and-forget). +// All data methods return success immediately on the calling thread after +// checking deferredError_; the actual call runs asynchronously on targetExec. +// beginSubgroup() returns a CrossExecSubgroupFilter that similarly defers +// to targetExec. +// +// Requires targetExec to be a FIFO executor so that delivery order is +// preserved without additional synchronization. +// +// For deferred use (e.g. publish()): construct with inner=nullptr, then call +// setDownstream() on targetExec_ before any data methods are enqueued. FIFO +// ordering guarantees setDownstream() runs before those lambdas execute. +class CrossExecFilter final : public moxygen::TrackConsumerFilter, + public std::enable_shared_from_this, + public CrossExecBase { +public: + CrossExecFilter( + folly::Executor* targetExec, + std::shared_ptr inner, + bool deepCopyPayload = true + ) + : moxygen::TrackConsumerFilter(std::move(inner)), CrossExecBase(targetExec, deepCopyPayload) { + } + + folly::Expected setTrackAlias(moxygen::TrackAlias alias + ) override; + + folly::Expected, moxygen::MoQPublishError> + beginSubgroup( + uint64_t groupID, + uint64_t subgroupID, + moxygen::Priority priority, + bool containsLastInGroup = false + ) override; + + folly::Expected, moxygen::MoQPublishError> + awaitStreamCredit() override; + + folly::Expected objectStream( + const moxygen::ObjectHeader& header, + moxygen::Payload payload, + bool lastInGroup = false + ) override; + + folly::Expected + datagram(const moxygen::ObjectHeader& header, moxygen::Payload payload, bool lastInGroup = false) + override; + + folly::Expected publishDone(moxygen::PublishDone pubDone + ) override; + + uint64_t objectStreamErrors() const { + return objectStreamErrors_.load(std::memory_order_relaxed); + } + uint64_t datagramErrors() const { return datagramErrors_.load(std::memory_order_relaxed); } + +private: + std::atomic objectStreamErrors_{0}; + std::atomic datagramErrors_{0}; +}; + +// Forwards all FetchConsumer calls to a target executor (fire-and-forget). +// All data methods return success immediately on the calling thread after +// checking deferredError_; the actual call runs asynchronously on targetExec. +// +// Requires targetExec to be a FIFO executor so that delivery order is +// preserved without additional synchronization. +class FetchCrossExecFilter final : public moxygen::FetchConsumer, + public std::enable_shared_from_this, + public CrossExecBase { +public: + FetchCrossExecFilter( + folly::Executor* targetExec, + std::shared_ptr downstream, + bool deepCopyPayload = false + ) + : CrossExecBase(targetExec, deepCopyPayload), downstream_(std::move(downstream)) {} + + folly::Expected object( + uint64_t groupID, + uint64_t subgroupID, + uint64_t objectID, + moxygen::Payload payload, + moxygen::Extensions extensions = moxygen::noExtensions(), + bool finFetch = false, + bool forwardingPreferenceIsDatagram = false + ) override; + + folly::Expected beginObject( + uint64_t groupID, + uint64_t subgroupID, + uint64_t objectID, + uint64_t length, + moxygen::Payload initialPayload, + moxygen::Extensions extensions = moxygen::noExtensions() + ) override; + + folly::Expected + objectPayload(moxygen::Payload payload, bool finSubgroup = false) override; + + folly::Expected + endOfGroup(uint64_t groupID, uint64_t subgroupID, uint64_t objectID, bool finFetch = false) + override; + + folly::Expected + endOfTrackAndGroup(uint64_t groupID, uint64_t subgroupID, uint64_t objectID) override; + + folly::Expected endOfFetch() override; + + void reset(moxygen::ResetStreamErrorCode error) override; + + void checkpoint() override; + + folly::Expected + endOfUnknownRange(uint64_t groupID, uint64_t objectID, bool finFetch = false) override; + + folly::Expected, moxygen::MoQPublishError> + awaitReadyToConsume() override; + +private: + std::shared_ptr downstream_; + ObjectPayloadByteTracker payloadTracker_; +}; + +} // namespace openmoq::moqx diff --git a/src/relay/SubgroupConsumerUtil.h b/src/relay/SubgroupConsumerUtil.h new file mode 100644 index 00000000..c816341e --- /dev/null +++ b/src/relay/SubgroupConsumerUtil.h @@ -0,0 +1,43 @@ +/* + * Copyright (c) OpenMOQ contributors. + * This source code is licensed under the Apache 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include + +namespace openmoq::moqx { + +// Tracks multi-part object byte delivery so objectPayload() can return the +// correct ObjectPublishStatus without depending on finSubgroup. +// +// Usage: +// Call beginObject(length) from beginObject(). +// Call consume(payload) from objectPayload() before moving the payload; +// the returned status is the value to return to the caller. +struct ObjectPayloadByteTracker { + void beginObject(uint64_t length, const moxygen::Payload& initialPayload = nullptr) { + remaining_ = length; + if (initialPayload) { + remaining_ -= + std::min(remaining_, static_cast(initialPayload->computeChainDataLength())); + } + } + + moxygen::ObjectPublishStatus consume(const moxygen::Payload& payload) { + if (payload && remaining_ > 0) { + auto n = payload->computeChainDataLength(); + remaining_ -= std::min(remaining_, static_cast(n)); + } + return remaining_ == 0 ? moxygen::ObjectPublishStatus::DONE + : moxygen::ObjectPublishStatus::IN_PROGRESS; + } + +private: + uint64_t remaining_{0}; +}; + +} // namespace openmoq::moqx diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 92d9e866..246a2d7e 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -272,6 +272,19 @@ target_link_libraries(track_filter_load_test PRIVATE moxygen::moxygen_util_insecure_verifier_dangerous_do_not_use_in_production ) +# CrossExecFilter unit tests +add_executable(moqx_cross_exec_filter_test + CrossExecFilterTest.cpp +) +target_link_libraries(moqx_cross_exec_filter_test PRIVATE + moqx_core + GTest::gtest_main + GTest::gmock + moxygen::moqtest_utils + Folly::folly_executors_manual_executor +) +gtest_discover_tests(moqx_cross_exec_filter_test) + # --- MoqxCache tests --- add_executable(moqx_cache_test diff --git a/test/CrossExecFilterTest.cpp b/test/CrossExecFilterTest.cpp new file mode 100644 index 00000000..9de6d946 --- /dev/null +++ b/test/CrossExecFilterTest.cpp @@ -0,0 +1,377 @@ +/* + * Copyright (c) OpenMOQ contributors. + * This source code is licensed under the Apache 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "relay/CrossExecFilter.h" + +#include +#include +#include +#include + +using namespace testing; +using namespace moxygen; +using namespace openmoq::moqx; + +namespace { + +const TrackAlias kAlias{42}; + +ObjectHeader makeHeader(uint64_t group = 1, uint64_t subgroup = 0, uint64_t id = 0) { + return ObjectHeader{group, subgroup, id}; +} + +class CrossExecFilterTest : public ::testing::Test { +protected: + void SetUp() override { + innerTrack_ = std::make_shared>(); + innerSubgroup_ = std::make_shared>(); + + ON_CALL(*innerTrack_, setTrackAlias(_)) + .WillByDefault(Return(folly::makeExpected(folly::unit))); + ON_CALL(*innerTrack_, beginSubgroup(_, _, _, _)) + .WillByDefault(Return( + folly::makeExpected>(innerSubgroup_) + )); + ON_CALL(*innerTrack_, objectStream(_, _, _)) + .WillByDefault(Return(folly::makeExpected(folly::unit))); + ON_CALL(*innerTrack_, datagram(_, _, _)) + .WillByDefault(Return(folly::makeExpected(folly::unit))); + ON_CALL(*innerTrack_, publishDone(_)) + .WillByDefault(Return(folly::makeExpected(folly::unit))); + + ON_CALL(*innerSubgroup_, object(_, _, _, _)) + .WillByDefault(Return(folly::makeExpected(folly::unit))); + ON_CALL(*innerSubgroup_, beginObject(_, _, _, _)) + .WillByDefault(Return(folly::makeExpected(folly::unit))); + ON_CALL(*innerSubgroup_, objectPayload(_, _)) + .WillByDefault(Return(folly::makeExpected(ObjectPublishStatus::IN_PROGRESS) + )); + ON_CALL(*innerSubgroup_, endOfGroup(_)) + .WillByDefault(Return(folly::makeExpected(folly::unit))); + ON_CALL(*innerSubgroup_, endOfTrackAndGroup(_)) + .WillByDefault(Return(folly::makeExpected(folly::unit))); + ON_CALL(*innerSubgroup_, endOfSubgroup()) + .WillByDefault(Return(folly::makeExpected(folly::unit))); + + filter_ = std::make_shared(&exec_, innerTrack_); + } + + folly::ManualExecutor exec_; + std::shared_ptr> innerTrack_; + std::shared_ptr> innerSubgroup_; + std::shared_ptr filter_; +}; + +// ---- setTrackAlias ---- + +TEST_F(CrossExecFilterTest, SetTrackAliasEnqueued) { + EXPECT_CALL(*innerTrack_, setTrackAlias(_)).Times(0); + auto result = filter_->setTrackAlias(kAlias); + EXPECT_TRUE(result.hasValue()); + + EXPECT_CALL(*innerTrack_, setTrackAlias(kAlias)).Times(1); + exec_.drain(); +} + +// ---- objectStream ---- + +TEST_F(CrossExecFilterTest, ObjectStreamEnqueued) { + auto hdr = makeHeader(2, 0, 5); + EXPECT_CALL(*innerTrack_, objectStream(_, _, _)).Times(0); + auto result = filter_->objectStream(hdr, nullptr, false); + EXPECT_TRUE(result.hasValue()); + + EXPECT_CALL(*innerTrack_, objectStream(hdr, _, false)).Times(1); + exec_.drain(); +} + +// ---- datagram ---- + +TEST_F(CrossExecFilterTest, DatagramEnqueued) { + auto hdr = makeHeader(3, 0, 1); + EXPECT_CALL(*innerTrack_, datagram(_, _, _)).Times(0); + auto result = filter_->datagram(hdr, nullptr, true); + EXPECT_TRUE(result.hasValue()); + + EXPECT_CALL(*innerTrack_, datagram(hdr, _, true)).Times(1); + exec_.drain(); +} + +// ---- publishDone ---- + +TEST_F(CrossExecFilterTest, PublishDoneEnqueued) { + PublishDone done; + done.statusCode = PublishDoneStatusCode::SUBSCRIPTION_ENDED; + + EXPECT_CALL(*innerTrack_, publishDone(_)).Times(0); + auto result = filter_->publishDone(std::move(done)); + EXPECT_TRUE(result.hasValue()); + + EXPECT_CALL(*innerTrack_, publishDone(_)).Times(1); + exec_.drain(); +} + +// ---- awaitStreamCredit ---- + +TEST_F(CrossExecFilterTest, AwaitStreamCreditReturnsReady) { + auto result = filter_->awaitStreamCredit(); + EXPECT_TRUE(result.hasValue()); + EXPECT_TRUE(result.value().isReady()); +} + +// ---- beginSubgroup + subgroup methods ---- + +TEST_F(CrossExecFilterTest, BeginSubgroupReturnsSubgroupImmediately) { + // beginSubgroup returns a cross-exec wrapper immediately; the inner's + // beginSubgroup runs only when the target executor is drained. + auto result = filter_->beginSubgroup(1, 0, 128, false); + EXPECT_TRUE(result.hasValue()); + ASSERT_NE(result.value(), nullptr); + // The returned consumer is the cross-exec wrapper, not the inner subgroup + EXPECT_NE(result.value().get(), static_cast(innerSubgroup_.get())); + exec_.drain(); // drive the pending beginSubgroup on inner +} + +TEST_F(CrossExecFilterTest, BeginSubgroupRunsOnTargetExecutor) { + EXPECT_CALL(*innerTrack_, beginSubgroup(1, 0, 128, false)).Times(1); + auto result = filter_->beginSubgroup(1, 0, 128, false); + EXPECT_TRUE(result.hasValue()); + exec_.drain(); +} + +TEST_F(CrossExecFilterTest, SubgroupObjectEnqueuedAfterBeginSubgroup) { + auto subResult = filter_->beginSubgroup(1, 0, 128, false); + ASSERT_TRUE(subResult.hasValue()); + auto subFilter = subResult.value(); + + auto objResult = subFilter->object(0, nullptr, noExtensions(), false); + EXPECT_TRUE(objResult.hasValue()); + + // Before drain: inner not called + EXPECT_CALL(*innerSubgroup_, object(_, _, _, _)).Times(0); + EXPECT_CALL(*innerTrack_, beginSubgroup(_, _, _, _)).Times(0); + + // After drain: beginSubgroup runs first, then object + { + InSequence seq; + EXPECT_CALL(*innerTrack_, beginSubgroup(1, 0, 128, false)).Times(1); + EXPECT_CALL(*innerSubgroup_, object(0, _, _, false)).Times(1); + } + exec_.drain(); +} + +TEST_F(CrossExecFilterTest, SubgroupEndOfSubgroupEnqueued) { + auto subResult = filter_->beginSubgroup(2, 1, 64, true); + ASSERT_TRUE(subResult.hasValue()); + auto subFilter = subResult.value(); + + subFilter->endOfSubgroup(); + + InSequence seq; + EXPECT_CALL(*innerTrack_, beginSubgroup(2, 1, 64, true)).Times(1); + EXPECT_CALL(*innerSubgroup_, endOfSubgroup()).Times(1); + exec_.drain(); +} + +TEST_F(CrossExecFilterTest, SubgroupEndOfGroupEnqueued) { + auto subResult = filter_->beginSubgroup(1, 0, 128, false); + ASSERT_TRUE(subResult.hasValue()); + auto subFilter = subResult.value(); + + subFilter->endOfGroup(5); + + InSequence seq; + EXPECT_CALL(*innerTrack_, beginSubgroup(_, _, _, _)).Times(1); + EXPECT_CALL(*innerSubgroup_, endOfGroup(5)).Times(1); + exec_.drain(); +} + +TEST_F(CrossExecFilterTest, SubgroupEndOfTrackAndGroupEnqueued) { + auto subResult = filter_->beginSubgroup(1, 0, 128, false); + ASSERT_TRUE(subResult.hasValue()); + auto subFilter = subResult.value(); + + subFilter->endOfTrackAndGroup(7); + + InSequence seq; + EXPECT_CALL(*innerTrack_, beginSubgroup(_, _, _, _)).Times(1); + EXPECT_CALL(*innerSubgroup_, endOfTrackAndGroup(7)).Times(1); + exec_.drain(); +} + +TEST_F(CrossExecFilterTest, SubgroupResetEnqueued) { + auto subResult = filter_->beginSubgroup(1, 0, 128, false); + ASSERT_TRUE(subResult.hasValue()); + auto subFilter = subResult.value(); + + subFilter->reset(ResetStreamErrorCode::CANCELLED); + + InSequence seq; + EXPECT_CALL(*innerTrack_, beginSubgroup(_, _, _, _)).Times(1); + EXPECT_CALL(*innerSubgroup_, reset(ResetStreamErrorCode::CANCELLED)).Times(1); + exec_.drain(); +} + +TEST_F(CrossExecFilterTest, SubgroupBeginObjectEnqueued) { + auto subResult = filter_->beginSubgroup(1, 0, 128, false); + ASSERT_TRUE(subResult.hasValue()); + auto subFilter = subResult.value(); + + subFilter->beginObject(3, 100, nullptr, noExtensions()); + + InSequence seq; + EXPECT_CALL(*innerTrack_, beginSubgroup(_, _, _, _)).Times(1); + EXPECT_CALL(*innerSubgroup_, beginObject(3, 100, _, _)).Times(1); + exec_.drain(); +} + +TEST_F(CrossExecFilterTest, SubgroupObjectPayloadEnqueued) { + auto subResult = filter_->beginSubgroup(1, 0, 128, false); + ASSERT_TRUE(subResult.hasValue()); + auto subFilter = subResult.value(); + + // beginObject must precede objectPayload so the byte tracker knows the length. + subFilter->beginObject(0, 10, nullptr); + + auto payloadResult = subFilter->objectPayload(nullptr, true); + EXPECT_TRUE(payloadResult.hasValue()); + EXPECT_EQ(payloadResult.value(), ObjectPublishStatus::IN_PROGRESS); + + InSequence seq; + EXPECT_CALL(*innerTrack_, beginSubgroup(_, _, _, _)).Times(1); + EXPECT_CALL(*innerSubgroup_, beginObject(0, 10, _, _)).Times(1); + EXPECT_CALL(*innerSubgroup_, objectPayload(_, true)).Times(1); + exec_.drain(); +} + +// Verify FIFO ordering: multiple track-level calls arrive in order +TEST_F(CrossExecFilterTest, MultipleCallsDeliveredInOrder) { + auto hdr1 = makeHeader(1, 0, 0); + auto hdr2 = makeHeader(1, 0, 1); + + filter_->objectStream(hdr1, nullptr, false); + filter_->objectStream(hdr2, nullptr, false); + filter_->publishDone(PublishDone{}); + + InSequence seq; + EXPECT_CALL(*innerTrack_, objectStream(hdr1, _, false)).Times(1); + EXPECT_CALL(*innerTrack_, objectStream(hdr2, _, false)).Times(1); + EXPECT_CALL(*innerTrack_, publishDone(_)).Times(1); + exec_.drain(); +} + +// objectStream failure bumps the counter but does NOT gate the track. +TEST_F(CrossExecFilterTest, ObjectStreamErrorBumpsCounterDoesNotGateTrack) { + EXPECT_CALL(*innerTrack_, objectStream(_, _, _)) + .WillOnce(Return(folly::makeUnexpected(MoQPublishError(MoQPublishError::Code::WRITE_ERROR)))); + + filter_->objectStream(makeHeader(), nullptr); + EXPECT_EQ(filter_->objectStreamErrors(), 0u); // counter updated on target exec, not yet + + exec_.drain(); // lambda runs; inner returns error; counter bumped + EXPECT_EQ(filter_->objectStreamErrors(), 1u); + + // Track-level gate is NOT set — beginSubgroup still works. + EXPECT_CALL(*innerTrack_, beginSubgroup(_, _, _, _)).Times(1); + auto subResult = filter_->beginSubgroup(1, 0, 128, false); + EXPECT_TRUE(subResult.hasValue()); + exec_.drain(); +} + +// datagram failure bumps the counter but does NOT gate objectStream or beginSubgroup. +TEST_F(CrossExecFilterTest, DatagramErrorBumpsCounterDoesNotGateTrack) { + EXPECT_CALL(*innerTrack_, datagram(_, _, _)) + .WillOnce(Return(folly::makeUnexpected(MoQPublishError(MoQPublishError::Code::WRITE_ERROR)))); + + filter_->datagram(makeHeader(3, 0, 1), nullptr, false); + exec_.drain(); + EXPECT_EQ(filter_->datagramErrors(), 1u); + EXPECT_EQ(filter_->objectStreamErrors(), 0u); + + // objectStream and beginSubgroup are unaffected. + EXPECT_CALL(*innerTrack_, objectStream(_, _, _)).Times(1); + EXPECT_TRUE(filter_->objectStream(makeHeader(3, 0, 2), nullptr, false).hasValue()); + + EXPECT_CALL(*innerTrack_, beginSubgroup(_, _, _, _)).Times(1); + EXPECT_TRUE(filter_->beginSubgroup(1, 0, 128, false).hasValue()); + + exec_.drain(); +} + +// beginSubgroup failure stores the error on the subgroup, not the track. +// Subsequent subgroup calls fail immediately; other track calls still work. +TEST_F(CrossExecFilterTest, BeginSubgroupFailureGatesSubgroupNotTrack) { + EXPECT_CALL(*innerTrack_, beginSubgroup(_, _, _, _)) + .WillOnce(Return(folly::makeUnexpected(MoQPublishError(MoQPublishError::Code::WRITE_ERROR)))); + + auto subResult = filter_->beginSubgroup(1, 0, 128, false); + ASSERT_TRUE(subResult.hasValue()); + auto subFilter = subResult.value(); + + exec_.drain(); // inner beginSubgroup fails; error stored on subFilter + + // Subsequent subgroup call fails immediately from the deferred error. + auto objResult = subFilter->object(0, nullptr, noExtensions(), false); + EXPECT_TRUE(objResult.hasError()); + + // Track-level gate is NOT set — a new beginSubgroup still works. + EXPECT_CALL(*innerTrack_, beginSubgroup(_, _, _, _)).Times(1); + auto subResult2 = filter_->beginSubgroup(2, 0, 128, false); + EXPECT_TRUE(subResult2.hasValue()); + exec_.drain(); +} + +// ---- FetchCrossExecFilter ---- + +class FetchCrossExecFilterTest : public ::testing::Test { +protected: + void SetUp() override { + inner_ = std::make_shared>(); + ON_CALL(*inner_, object(_, _, _, _, _, _, _)) + .WillByDefault(Return(folly::makeExpected(folly::unit))); + ON_CALL(*inner_, endOfGroup(_, _, _, _)) + .WillByDefault(Return(folly::makeExpected(folly::unit))); + ON_CALL(*inner_, endOfTrackAndGroup(_, _, _)) + .WillByDefault(Return(folly::makeExpected(folly::unit))); + ON_CALL(*inner_, endOfFetch()) + .WillByDefault(Return(folly::makeExpected(folly::unit))); + ON_CALL(*inner_, endOfUnknownRange(_, _, _)) + .WillByDefault(Return(folly::makeExpected(folly::unit))); + filter_ = std::make_shared(&exec_, inner_); + } + + folly::ManualExecutor exec_; + std::shared_ptr> inner_; + std::shared_ptr filter_; +}; + +TEST_F(FetchCrossExecFilterTest, MultipleCallsDeliveredInOrder) { + filter_->object(1, 0, 0, nullptr, noExtensions(), false, false); + filter_->checkpoint(); + filter_->object(1, 0, 1, nullptr, noExtensions(), false, false); + filter_->endOfFetch(); + + InSequence seq; + EXPECT_CALL(*inner_, object(1, 0, 0, _, _, false, false)).Times(1); + EXPECT_CALL(*inner_, checkpoint()).Times(1); + EXPECT_CALL(*inner_, object(1, 0, 1, _, _, false, false)).Times(1); + EXPECT_CALL(*inner_, endOfFetch()).Times(1); + exec_.drain(); +} + +TEST_F(FetchCrossExecFilterTest, EndOfUnknownRangeEnqueued) { + filter_->object(1, 0, 0, nullptr, noExtensions(), false, false); + filter_->endOfUnknownRange(1, 1, false); + filter_->endOfFetch(); + + InSequence seq; + EXPECT_CALL(*inner_, object(1, 0, 0, _, _, false, false)).Times(1); + EXPECT_CALL(*inner_, endOfUnknownRange(1, 1, false)).Times(1); + EXPECT_CALL(*inner_, endOfFetch()).Times(1); + exec_.drain(); +} + +} // namespace