Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
ba2daa7
feat(switch): implement SWITCH relay state machine (PR #1378 POC)
gwendalsimon May 4, 2026
59b85e6
chore(deps): update moxygen submodule — SWITCH type 0x75 → 0x1F
gwendalsimon May 5, 2026
92a2621
chore(deps): update moxygen — SWITCH type corrected to 0x1B
gwendalsimon May 5, 2026
99e5e84
test(switch): add test_switch.sh integration script and port reservation
gwendalsimon May 7, 2026
07e1f44
test(switch): register switch_relay ctest target
gwendalsimon May 7, 2026
8a12e98
chore(deps): update moxygen — switch test binaries and framing fixes
gwendalsimon May 7, 2026
91f2f32
chore(deps): update moxygen — fix install() targets for switch binaries
gwendalsimon May 7, 2026
3a0fca5
fix(switch): wire setRelay via validateAuthority, not createSession
gwendalsimon May 8, 2026
5b7445a
fix(build): update folly/experimental/coro paths to folly/coro
gwendalsimon May 8, 2026
93a89fb
fix(switch): fix MoqxSession API mismatches vs installed moxygen headers
gwendalsimon May 8, 2026
830c8c1
chore: update moxygen submodule to da7212da (switch unit tests + GCC …
gwendalsimon May 8, 2026
3c42bf3
test(switch): expand SwitchHandlerTest with SwitchTypes and GroupStar…
gwendalsimon May 8, 2026
495a0ba
test(switch): fix integration test; both scenarios now passing
gwendalsimon May 11, 2026
d067929
feat(switch): catch-up on dedicated uni FETCH stream, high priority
gwendalsimon May 15, 2026
daa1807
fix(switch): drop currentSubscribeRequestID from publishForSwitch; su…
gwendalsimon May 18, 2026
2fad587
chore: ignore CLAUDE.md (local Claude Code context, not for VCS)
gwendalsimon May 18, 2026
c623350
test(switch): add SWITCH_TRANSITION parameter encoding/decoding tests
gwendalsimon May 18, 2026
591f63c
refactor(switch): extract findGswitch to SwitchAlgorithm.h; virtual d…
gwendalsimon May 18, 2026
15c220e
chore: use HTTPS URL for moxygen submodule
gwendalsimon May 18, 2026
213dae6
test(switch): add deferred gswitch path test (co_await gswitchFound)
gwendalsimon May 18, 2026
4692e14
test(switch): add CUT_OLD and Phase 2 drain loop unit tests
gwendalsimon May 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ _CPack_Packages/
.vscode/
.claude/

# Local Claude Code context (project-specific, not for version control)
CLAUDE.md

# Docker secrets (never commit)
docker/.env
docker/cloudflare.ini
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[submodule "deps/moxygen"]
path = deps/moxygen
url = git@github.com:openmoq/moxygen.git
url = https://github.com/openmoq/moxygen.git
branch = main
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ add_library(moqx_core STATIC
src/MoqxRelay.cpp
src/MoqxRelayContext.cpp
src/MoqxRelayServer.cpp
src/MoqxSession.cpp
src/switch/SwitchHandler.cpp
src/MoqxPicoRelayServer.cpp
src/ServiceMatcher.cpp
src/admin/AdminServer.cpp
Expand Down
2 changes: 1 addition & 1 deletion deps/moxygen
Submodule moxygen updated 74 files
+124 −0 .github/CONTRIBUTING.md
+2 −1 .github/workflows/omoq-auto-merge-sync.yml
+37 −31 .github/workflows/omoq-ci-main.yml
+171 −0 .github/workflows/omoq-picoquic-sync.yml
+29 −48 .github/workflows/omoq-upstream-sync.yml
+311 −32 .github/workflows/omoq-version-release.yml
+0 −114 OPENMOQ_CONTRIBUTING.md
+1 −1 build/deps/github_hashes/facebook/folly-rev.txt
+1 −1 build/deps/github_hashes/facebook/mvfst-rev.txt
+1 −1 build/deps/github_hashes/facebook/proxygen-rev.txt
+1 −1 build/deps/github_hashes/facebook/wangle-rev.txt
+1 −1 build/deps/github_hashes/facebookincubator/fizz-rev.txt
+1 −0 build/deps/github_hashes/openmoq/picoquic-rev.txt
+9 −1 build/fbcode_builder/getdeps.py
+2 −1 build/fbcode_builder/getdeps/fetcher.py
+11 −3 build/fbcode_builder/getdeps/load.py
+1 −0 build/fbcode_builder/manifests/cachelib
+0 −28 build/fbcode_builder/manifests/cinderx-3_14
+0 −34 build/fbcode_builder/manifests/cinderx-main
+1 −0 build/fbcode_builder/manifests/fboss
+2 −2 build/fbcode_builder/manifests/picoquic
+46 −32 moxygen/CMakeLists.txt
+20 −0 moxygen/MoQCodec.cpp
+38 −33 moxygen/MoQCodec.h
+64 −0 moxygen/MoQFramer.cpp
+8 −0 moxygen/MoQFramer.h
+83 −89 moxygen/MoQRelaySession.cpp
+17 −14 moxygen/MoQRelaySession.h
+3 −0 moxygen/MoQServerBase.cpp
+2 −1 moxygen/MoQServerBase.h
+470 −221 moxygen/MoQSession.cpp
+174 −52 moxygen/MoQSession.h
+95 −0 moxygen/MoQTypes.h
+0 −1 moxygen/MoQWebTransportClient.cpp
+1 −3 moxygen/dejitter/CMakeLists.txt
+1 −3 moxygen/flv_parser/CMakeLists.txt
+2 −1 moxygen/mlog/CMakeLists.txt
+1 −3 moxygen/moq_mi/CMakeLists.txt
+1 −1 moxygen/moqtest/MoQTestClient.cpp
+2 −1 moxygen/moqtest/MoQTestServer.cpp
+3 −4 moxygen/moqtest/MoQTestServer.h
+17 −1 moxygen/openmoq/transport/pico/CMakeLists.txt
+2 −3 moxygen/openmoq/transport/pico/MoQPicoServerBase.cpp
+2 −5 moxygen/relay/CMakeLists.txt
+669 −320 moxygen/relay/MoQCache.cpp
+66 −23 moxygen/relay/MoQCache.h
+75 −58 moxygen/relay/MoQForwarder.cpp
+5 −0 moxygen/relay/MoQForwarder.h
+0 −17 moxygen/relay/MoQRelay.cpp
+11 −0 moxygen/relay/test/CMakeLists.txt
+1,516 −148 moxygen/relay/test/MoQCacheTests.cpp
+1,101 −0 moxygen/relay/test/MoQForwarderTest.cpp
+321 −1,238 moxygen/relay/test/MoQRelayTest.cpp
+1 −0 moxygen/samples/CMakeLists.txt
+77 −0 moxygen/samples/switch/CMakeLists.txt
+158 −0 moxygen/samples/switch/MoQSwitchPub.cpp
+426 −0 moxygen/samples/switch/MoQSwitchSub.cpp
+33 −1 moxygen/test/MoQCodecTest.cpp
+32 −0 moxygen/test/MoQFramerTest.cpp
+42 −0 moxygen/test/MoQLocationTest.cpp
+22 −0 moxygen/test/MoQSessionTrackStatusTests.cpp
+1 −0 moxygen/test/Mocks.h
+2 −1 moxygen/tools/moqperf/MoQPerfServer.cpp
+1 −3 moxygen/tools/moqperf/MoQPerfServer.h
+0 −113 moxygen/util/BidiIterator.h
+11 −8 moxygen/util/CMakeLists.txt
+0 −81 moxygen/util/FetchIntervalSet.h
+187 −0 moxygen/util/LocationIntervalSet.cpp
+136 −0 moxygen/util/LocationIntervalSet.h
+18 −0 moxygen/util/test/CMakeLists.txt
+575 −0 moxygen/util/test/LocationIntervalSetTest.cpp
+31 −2 openmoq/scripts/collect-artifacts-standalone.sh
+8 −3 openmoq/scripts/publish-artifacts.sh
+24 −7 standalone/CMakeLists.txt
54 changes: 54 additions & 0 deletions src/MoqxRelay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
*/

#include "MoqxRelay.h"
#include "MoqxSession.h"
#include "switch/SwitchHandler.h"
#include <folly/container/F14Set.h>
#include <moxygen/MoQFilters.h>
#include <moxygen/MoQTrackProperties.h>
Expand Down Expand Up @@ -1610,4 +1612,56 @@ void MoqxRelay::dumpState(RelayStateVisitor& visitor) const {
}
}

void MoqxRelay::handleSwitch(
std::shared_ptr<MoqxSession> session,
moxygen::Switch sw) {
auto handler = std::make_shared<SwitchHandler>(session, std::move(sw), *this);
folly::coro::co_withExecutor(
session->getExecutor(),
folly::coro::co_invoke([handler]() -> folly::coro::Task<void> {
co_await handler->run();
}))
.start();
}

std::shared_ptr<moxygen::MoQForwarder> MoqxRelay::getForwarder(
moxygen::RequestID requestID,
moxygen::MoQSession* session) const {
// First try to find by requestID + session pointer (upstream requestID match).
for (auto& [ftn, sub] : subscriptions_) {
if (sub.requestID == requestID && sub.forwarder &&
sub.forwarder->getSubscriber(session)) {
return sub.forwarder;
}
}
// Fallback: scan by session pointer alone. Disambiguates by session only.
// TODO: store subscriber requestID in RelaySubscription for exact lookup.
for (auto& [ftn, sub] : subscriptions_) {
if (sub.forwarder && sub.forwarder->getSubscriber(session)) {
return sub.forwarder;
}
}
return nullptr;
}

std::shared_ptr<moxygen::Publisher::SubscriptionHandle>
MoqxRelay::getSubscriptionHandle(
const moxygen::FullTrackName& trackName) const {
auto it = subscriptions_.find(trackName);
if (it != subscriptions_.end()) {
return it->second.handle;
}
return nullptr;
}

folly::coro::Task<std::shared_ptr<moxygen::MoQForwarder>>
MoqxRelay::getOrSubscribeForwarder(const moxygen::FullTrackName& trackName) {
auto it = subscriptions_.find(trackName);
if (it != subscriptions_.end() && it->second.forwarder) {
co_return it->second.forwarder;
}
// TODO: subscribe upstream and await non-null largest().
co_return nullptr;
}

} // namespace openmoq::moqx
35 changes: 35 additions & 0 deletions src/MoqxRelay.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

namespace openmoq::moqx {

class MoqxSession;

// Visitor interface for relay state inspection.
// MoqxRelay::dumpState() calls these methods while walking internal state.
// Implement this to serialize state into any format without adding format
Expand Down Expand Up @@ -221,13 +223,46 @@ class MoqxRelay : public moxygen::Publisher,
// MUST be called on the relay's worker EVB.
void dumpState(RelayStateVisitor& visitor) const;

// --- SWITCH support ---

// Dispatch a SWITCH message from a session. Schedules SwitchHandler::run()
// on the session's executor and returns immediately.
void handleSwitch(
std::shared_ptr<MoqxSession> session,
moxygen::Switch sw);

// Returns the forwarder the session is currently subscribed to.
// requestID provided for a future exact lookup; POC scans by session pointer.
std::shared_ptr<moxygen::MoQForwarder> getForwarder(
moxygen::RequestID requestID,
moxygen::MoQSession* session) const;

// Returns existing forwarder for trackName, or subscribes upstream and waits.
folly::coro::Task<std::shared_ptr<moxygen::MoQForwarder>>
getOrSubscribeForwarder(const moxygen::FullTrackName& trackName);

// Returns SubscriptionHandle for an existing subscription.
std::shared_ptr<moxygen::Publisher::SubscriptionHandle>
getSubscriptionHandle(const moxygen::FullTrackName& trackName) const;

moxygen::MoQCache* cache() const {
return cache_.get();
}

// Test accessor: check if a publish exists and return node/publish state
struct PublishState {
bool nodeExists{false}; // true if tree node exists
std::shared_ptr<moxygen::MoQSession> session{nullptr}; // publish session if exists
};
PublishState findPublishState(const moxygen::FullTrackName& ftn);

// Test accessor: returns the MoQForwarder for a track, or null if not found.
std::shared_ptr<moxygen::MoQForwarder> getForwarderByName(
const moxygen::FullTrackName& ftn) const {
auto it = subscriptions_.find(ftn);
return it != subscriptions_.end() ? it->second.forwarder : nullptr;
}

private:
class NamespaceSubscription;
class TerminationFilter;
Expand Down
4 changes: 4 additions & 0 deletions src/MoqxRelayContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

#include "MoqxRelayContext.h"
#include "MoqxSession.h"
#include "stats/MoQStatsCollector.h"
#include <moxygen/events/MoQFollyExecutorImpl.h>
#include <moxygen/util/InsecureVerifierDangerousDoNotUseInProduction.h>
Expand Down Expand Up @@ -163,6 +164,9 @@ folly::Expected<folly::Unit, SessionCloseErrorCode> MoqxRelayContext::validateAu
CHECK(it != services_.end()) << "Service '" << *matchedName << "' matched but no entry found";
session->setPublishHandler(it->second.relay);
session->setSubscribeHandler(it->second.relay);
if (auto moqxSession = std::dynamic_pointer_cast<MoqxSession>(session)) {
moqxSession->setRelay(it->second.relay);
}
return folly::unit;
}

Expand Down
6 changes: 3 additions & 3 deletions src/MoqxRelayServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

#include "MoqxRelayServer.h"
#include "MoqxSession.h"
#include "stats/QuicStatsCollector.h"
#include <moxygen/MoQRelaySession.h>
#include <moxygen/events/MoQFollyExecutorImpl.h>
Expand Down Expand Up @@ -150,11 +151,10 @@ std::shared_ptr<MoQSession> MoqxRelayServer::createSession(
folly::MaybeManagedPtr<proxygen::WebTransport> wt,
std::shared_ptr<MoQExecutor> executor
) {
return std::make_shared<MoQRelaySession>(
return std::make_shared<MoqxSession>(
folly::MaybeManagedPtr<proxygen::WebTransport>(std::move(wt)),
*this,
std::move(executor)
);
std::move(executor));
}

} // namespace openmoq::moqx
142 changes: 142 additions & 0 deletions src/MoqxSession.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright (c) Synamedia
* SPDX-License-Identifier: Apache-2.0
*/

#include "MoqxSession.h"
#include "MoqxRelay.h"
#include "switch/SwitchTypes.h"
#include <moxygen/MoQFramer.h>
#include <quic/priority/HTTPPriorityQueue.h>

namespace openmoq::moqx {

void MoqxSession::setRelay(std::shared_ptr<MoqxRelay> relay) {
relay_ = std::move(relay);
}

void MoqxSession::onSwitch(moxygen::Switch sw) {
if (relay_) {
// shared_from_this() returns shared_ptr<MoQSession> (enable_shared_from_this
// is on the base class). Cast to MoqxSession before passing to handleSwitch.
relay_->handleSwitch(
std::static_pointer_cast<MoqxSession>(shared_from_this()),
std::move(sw));
}
}

std::optional<SwitchPublishResult> MoqxSession::publishForSwitch(
moxygen::PublishRequest pub,
uint64_t switchingGroupID,
uint64_t liveEdgeGroupID,
std::shared_ptr<moxygen::Publisher::SubscriptionHandle> handle) {

// Step 1: encode SWITCH_TRANSITION value bytes and insert into pub.params.
// Parameters::params_ is private — must use insertParam(), not push_back().
// Parameters::isParamAllowed() returns true for unknown keys (backward compat),
// so kSwitchTransitionParamKey (0xFF01) will be accepted.
// writeTrackRequestParams() handles delta-encoding for v16+ automatically.
folly::IOBufQueue valBuf{folly::IOBufQueue::cacheChainLength()};
folly::io::QueueAppender va(&valBuf, 16);
(void)quic::encodeQuicInteger(switchingGroupID, [&](auto b) { va.writeBE(b); });
(void)quic::encodeQuicInteger(liveEdgeGroupID, [&](auto b) { va.writeBE(b); });
auto valStr = valBuf.move()->moveToFbString().toStdString();
pub.params.insertParam(moxygen::Parameter{kSwitchTransitionParamKey, valStr});

// Step 2: assign requestID/trackAlias.
pub.requestID = getNextRequestID();
pub.trackAlias = moxygen::TrackAlias(pub.requestID.value);

// Step 3: register TrackPublisherImpl BEFORE opening the bidi stream.
// onPublishOk() looks up pendingRequests_ by requestID immediately when
// PUBLISH_OK arrives. Registration must precede the wire send.
pub.forward = true;
auto consumer = registerPublishConsumerForSwitch(pub, handle);

// Step 4: write PUBLISH frame and open bidi stream (draft >= 17).
folly::IOBufQueue writeBuf{folly::IOBufQueue::cacheChainLength()};
auto writeRes = moqFrameWriter_.writePublish(writeBuf, pub);
if (!writeRes) {
return std::nullopt;
}

// sendRequest() returns Expected<StreamWriteHandle*, string>.
auto sendRes = sendRequest(
writeBuf,
{moxygen::FrameType::PUBLISH_OK, moxygen::FrameType::PUBLISH_ERROR},
pub.requestID,
/*minBidiDraftVersion=*/17);
if (sendRes.hasError()) {
return std::nullopt;
}

return SwitchPublishResult{consumer};
}

void MoqxSession::writeCatchup(
const moxygen::FullTrackName& trackName,
uint64_t gswitch,
uint64_t liveEdge,
moxygen::RequestID currentSubscribeRequestID,
moxygen::MoQCache* cache) {
// moqFrameWriter_ is protected in MoQSession — accessible here (MoqxSession
// IS-A MoQSession) but not from SwitchHandler (a separate class).
if (!cache) {
return;
}
auto wt = getWebTransport();
if (!wt) {
return;
}
auto stream = wt->createUniStream();
if (!stream) {
XLOG(ERR) << "writeCatchup: failed to create uni stream for SWITCH catch-up";
return;
}
// High priority (urgency=1) so catch-up arrives before live subgroup streams.
stream.value()->setPriority(
quic::HTTPPriorityQueue::Priority(1, false, 0));

// Write FETCH_HEADER to the uni stream.
folly::IOBufQueue fetchHeaderBuf{folly::IOBufQueue::cacheChainLength()};
moqFrameWriter_.writeFetchHeader(fetchHeaderBuf, currentSubscribeRequestID);
stream.value()->writeStreamData(fetchHeaderBuf.move(), /*eof=*/false, /*callback=*/nullptr);

for (uint64_t g = gswitch; g < liveEdge; ++g) {
for (uint64_t objID = 0;; ++objID) {
auto* entry =
cache->getObject(trackName, moxygen::AbsoluteLocation{g, objID});
if (!entry) {
break; // nullptr = not cached or gap; treat as end of group for POC
}
// writeStreamObject() CHECKs that if objectPayload is non-empty, length
// must be set. Set length = payloadSize for NORMAL objects.
std::optional<uint64_t> objLen = std::nullopt;
if (entry->status == moxygen::ObjectStatus::NORMAL &&
entry->payloadSize > 0) {
objLen = entry->payloadSize;
}
moxygen::ObjectHeader hdr(
g,
entry->subgroup,
objID,
/*priority=*/std::nullopt,
entry->status,
entry->extensions,
objLen);
folly::IOBufQueue objBuf{folly::IOBufQueue::cacheChainLength()};
moqFrameWriter_.writeStreamObject(
objBuf,
moxygen::StreamType::FETCH_HEADER,
hdr,
entry->payload ? entry->payload->clone() : nullptr,
entry->forwardingPreferenceIsDatagram);
stream.value()->writeStreamData(objBuf.move(), /*eof=*/false, /*callback=*/nullptr);
}
}
// FIN the stream — catch-up delivery is complete.
folly::IOBufQueue emptyBuf{folly::IOBufQueue::cacheChainLength()};
stream.value()->writeStreamData(emptyBuf.move(), /*eof=*/true, /*callback=*/nullptr);
}

} // namespace openmoq::moqx
58 changes: 58 additions & 0 deletions src/MoqxSession.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) Synamedia
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <moxygen/MoQRelaySession.h>
#include <moxygen/MoQTypes.h>
#include <moxygen/relay/MoQCache.h>
#include <proxygen/lib/http/webtransport/WebTransport.h>
#include <memory>
#include <optional>

namespace openmoq::moqx {

class MoqxRelay;

struct SwitchPublishResult {
// TrackConsumer for Phase 2 subgroup delivery after catch-up.
std::shared_ptr<moxygen::TrackConsumer> consumer;
};

// Extends MoQRelaySession with SWITCH message dispatch and bidi PUBLISH support.
class MoqxSession : public moxygen::MoQRelaySession {
public:
using MoQRelaySession::MoQRelaySession;

void setRelay(std::shared_ptr<MoqxRelay> relay);

// MoQControlCodec::ControlCallback override
void onSwitch(moxygen::Switch sw) override;

// Open relay-initiated PUBLISH bidi stream for SWITCH response.
// Writes PUBLISH (with SWITCH_TRANSITION param embedded) + FETCH_HEADER.
// Returns {writeHandle, consumer} on success, nullopt on failure.
virtual std::optional<SwitchPublishResult> publishForSwitch(
moxygen::PublishRequest pub,
uint64_t switchingGroupID,
uint64_t liveEdgeGroupID,
std::shared_ptr<moxygen::Publisher::SubscriptionHandle> handle);

// Open a unidirectional FETCH stream and write FETCH_HEADER + catch-up
// objects [gswitch, liveEdge) from cache. High priority (urgency=1).
// moqFrameWriter_ is protected in MoQSession — must live here, not in
// SwitchHandler.
virtual void writeCatchup(
const moxygen::FullTrackName& trackName,
uint64_t gswitch,
uint64_t liveEdge,
moxygen::RequestID currentSubscribeRequestID,
moxygen::MoQCache* cache);

private:
std::shared_ptr<MoqxRelay> relay_;
};

} // namespace openmoq::moqx
Loading