Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions apps/server/src/ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,13 @@ const makeWsRpcLayer = (currentSessionId: AuthSessionId) =>
snapshot,
}),
liveStream,
).pipe(
Stream.throttle({
cost: () => 1,
units: 50,
duration: "100 millis",
strategy: "shape",
}),
);
}),
{ "rpc.aggregate": "orchestration" },
Expand Down Expand Up @@ -981,6 +988,13 @@ const makeWsRpcLayer = (currentSessionId: AuthSessionId) =>
),
}),
),
).pipe(
Stream.throttle({
cost: () => 1,
units: 10,
duration: "100 millis",
strategy: "shape",
}),
),
{ "rpc.aggregate": "vcs" },
),
Expand Down Expand Up @@ -1068,6 +1082,13 @@ const makeWsRpcLayer = (currentSessionId: AuthSessionId) =>
terminalManager.subscribe((event) => Queue.offer(queue, event)),
(unsubscribe) => Effect.sync(unsubscribe),
),
).pipe(
Stream.throttle({
cost: () => 1,
units: 20,
duration: "50 millis",
strategy: "shape",
}),
),
{ "rpc.aggregate": "terminal" },
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ function makeStatus(overrides: Partial<WsConnectionStatus> = {}): WsConnectionSt
online: true,
phase: "idle",
reconnectAttemptCount: 0,
reconnectMaxAttempts: 8,
reconnectMaxAttempts: null,
reconnectPhase: "idle",
socketUrl: null,
...overrides,
Expand Down Expand Up @@ -67,15 +67,15 @@ describe("WebSocketConnectionSurface.logic", () => {
).toBe(false);
});

it("forces reconnect on focus for exhausted reconnect loops", () => {
it("forces reconnect on focus for high reconnect attempt counts in waiting phase", () => {
expect(
shouldAutoReconnect(
makeStatus({
hasConnected: true,
online: true,
phase: "disconnected",
reconnectAttemptCount: 8,
reconnectPhase: "exhausted",
reconnectAttemptCount: 20,
reconnectPhase: "waiting",
}),
"focus",
),
Expand Down
76 changes: 22 additions & 54 deletions apps/web/src/components/WebSocketConnectionSurface.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import {
type WsConnectionStatus,
type WsConnectionUiState,
useWsConnectionStatus,
WS_RECONNECT_MAX_ATTEMPTS,
} from "../rpc/wsConnectionState";
import { stackedThreadToast, toastManager } from "./ui/toast";
import { getPrimaryEnvironmentConnection } from "../environments/runtime";
Expand Down Expand Up @@ -42,15 +41,8 @@ function describeOfflineToast(): string {
}

function formatReconnectAttemptLabel(status: WsConnectionStatus): string {
const reconnectAttempt = Math.max(
1,
Math.min(status.reconnectAttemptCount, WS_RECONNECT_MAX_ATTEMPTS),
);
return `Attempt ${reconnectAttempt}/${status.reconnectMaxAttempts}`;
}

function describeExhaustedToast(): string {
return "Retries exhausted trying to reconnect";
const reconnectAttempt = Math.max(1, status.reconnectAttemptCount);
return `Attempt ${reconnectAttempt}`;
}

function getConnectionDisplayName(status: WsConnectionStatus): string {
Expand Down Expand Up @@ -118,19 +110,10 @@ export function shouldAutoReconnect(
const uiState = getWsConnectionUiState(status);

if (trigger === "online") {
return (
uiState === "offline" ||
uiState === "reconnecting" ||
uiState === "error" ||
status.reconnectPhase === "exhausted"
);
return uiState === "offline" || uiState === "reconnecting" || uiState === "error";
}

return (
status.online &&
status.hasConnected &&
(uiState === "reconnecting" || status.reconnectPhase === "exhausted")
);
return status.online && status.hasConnected && uiState === "reconnecting";
}

export function shouldRestartStalledReconnect(
Expand Down Expand Up @@ -273,17 +256,16 @@ export function WebSocketConnectionCoordinator() {
const previousDisconnectedAt = previousDisconnectedAtRef.current;
const shouldShowReconnectToast = status.hasConnected && uiState === "reconnecting";
const shouldShowOfflineToast = uiState === "offline" && status.disconnectedAt !== null;
const shouldShowExhaustedToast = status.hasConnected && status.reconnectPhase === "exhausted";

if (
toastResetTimerRef.current !== null &&
(shouldShowReconnectToast || shouldShowOfflineToast || shouldShowExhaustedToast)
(shouldShowReconnectToast || shouldShowOfflineToast)
) {
window.clearTimeout(toastResetTimerRef.current);
toastResetTimerRef.current = null;
}

if (shouldShowReconnectToast || shouldShowOfflineToast || shouldShowExhaustedToast) {
if (shouldShowReconnectToast || shouldShowOfflineToast) {
const toastPayload = shouldShowOfflineToast
? stackedThreadToast({
data: {
Expand All @@ -294,36 +276,22 @@ export function WebSocketConnectionCoordinator() {
title: "Offline",
type: "warning",
})
: shouldShowExhaustedToast
? stackedThreadToast({
actionProps: {
children: "Retry",
onClick: triggerManualReconnect,
},
data: {
hideCopyButton: true,
},
description: describeExhaustedToast(),
timeout: 0,
title: buildReconnectTitle(status),
type: "error",
})
: stackedThreadToast({
actionProps: {
children: "Retry now",
onClick: triggerManualReconnect,
},
data: {
hideCopyButton: true,
},
description:
status.nextRetryAt === null
? `Reconnecting... ${formatReconnectAttemptLabel(status)}`
: `Reconnecting in ${formatRetryCountdown(status.nextRetryAt, nowMs)}... ${formatReconnectAttemptLabel(status)}`,
timeout: 0,
title: buildReconnectTitle(status),
type: "loading",
});
: stackedThreadToast({
actionProps: {
children: "Retry now",
onClick: triggerManualReconnect,
},
data: {
hideCopyButton: true,
},
description:
status.nextRetryAt === null
? `Reconnecting... ${formatReconnectAttemptLabel(status)}`
: `Reconnecting in ${formatRetryCountdown(status.nextRetryAt, nowMs)}... ${formatReconnectAttemptLabel(status)}`,
timeout: 0,
title: buildReconnectTitle(status),
type: "loading",
});

if (toastIdRef.current) {
toastManager.update(toastIdRef.current, toastPayload);
Expand Down
6 changes: 4 additions & 2 deletions apps/web/src/rpc/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import {
recordWsConnectionErrored,
recordWsConnectionOpened,
type WsConnectionMetadata,
WS_RECONNECT_MAX_RETRIES,
} from "./wsConnectionState";

export interface WsProtocolCloseContext {
Expand Down Expand Up @@ -213,7 +212,7 @@ export function createWsRpcProtocolLayer(
const socketLayer = Socket.layerWebSocket(resolvedUrl).pipe(
Layer.provide(trackingWebSocketConstructorLayer),
);
const retryPolicy = Schedule.addDelay(Schedule.recurs(WS_RECONNECT_MAX_RETRIES), (retryCount) =>
const retryPolicy = Schedule.addDelay(Schedule.forever, (retryCount) =>
Effect.succeed(Duration.millis(getWsReconnectDelayMsForRetry(retryCount) ?? 0)),
);
const protocolLayer = Layer.effect(
Expand Down Expand Up @@ -304,6 +303,9 @@ export function createWsRpcProtocolLayer(
}),
onPingTimeout: Effect.sync(() => {
if (lifecycle.isActive()) {
if (typeof document !== "undefined" && document.visibilityState === "hidden") {
return;
}
clearAllTrackedRpcRequests();
recordWsConnectionErrored(
"WebSocket heartbeat timed out.",
Expand Down
11 changes: 5 additions & 6 deletions apps/web/src/rpc/wsConnectionState.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import {
recordWsConnectionOpened,
resetWsConnectionStateForTests,
setBrowserOnlineStatus,
WS_RECONNECT_MAX_ATTEMPTS,
} from "./wsConnectionState";

describe("wsConnectionState", () => {
Expand Down Expand Up @@ -92,16 +91,16 @@ describe("wsConnectionState", () => {
});
});

it("marks the reconnect cycle as exhausted after the final attempt fails", () => {
for (let attempt = 0; attempt < WS_RECONNECT_MAX_ATTEMPTS; attempt += 1) {
it("keeps retrying indefinitely and stays in the waiting phase after many failed attempts", () => {
const manyAttempts = 20;
for (let attempt = 0; attempt < manyAttempts; attempt += 1) {
recordWsConnectionAttempt("ws://localhost:3020/ws");
recordWsConnectionErrored("Unable to connect to the T3 server WebSocket.");
}

expect(getWsConnectionStatus()).toMatchObject({
nextRetryAt: null,
reconnectAttemptCount: WS_RECONNECT_MAX_ATTEMPTS,
reconnectPhase: "exhausted",
reconnectAttemptCount: manyAttempts,
reconnectPhase: "waiting",
});
});
});
19 changes: 6 additions & 13 deletions apps/web/src/rpc/wsConnectionState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ import { Atom } from "effect/unstable/reactivity";
import { appAtomRegistry } from "./atomRegistry";

export type WsConnectionUiState = "connected" | "connecting" | "error" | "offline" | "reconnecting";
export type WsReconnectPhase = "attempting" | "exhausted" | "idle" | "waiting";
export type WsReconnectPhase = "attempting" | "idle" | "waiting";

export const WS_RECONNECT_INITIAL_DELAY_MS = 1_000;
export const WS_RECONNECT_BACKOFF_FACTOR = 2;
export const WS_RECONNECT_MAX_DELAY_MS = 64_000;
export const WS_RECONNECT_MAX_RETRIES = 7;
export const WS_RECONNECT_MAX_ATTEMPTS = WS_RECONNECT_MAX_RETRIES + 1;

export interface WsConnectionStatus {
readonly attemptCount: number;
Expand All @@ -26,7 +24,7 @@ export interface WsConnectionStatus {
readonly online: boolean;
readonly phase: "idle" | "connecting" | "connected" | "disconnected";
readonly reconnectAttemptCount: number;
readonly reconnectMaxAttempts: number;
readonly reconnectMaxAttempts: null;
readonly reconnectPhase: WsReconnectPhase;
readonly socketUrl: string | null;
}
Expand All @@ -45,7 +43,7 @@ const INITIAL_WS_CONNECTION_STATUS = Object.freeze<WsConnectionStatus>({
online: typeof navigator === "undefined" ? true : navigator.onLine !== false,
phase: "idle",
reconnectAttemptCount: 0,
reconnectMaxAttempts: WS_RECONNECT_MAX_ATTEMPTS,
reconnectMaxAttempts: null,
reconnectPhase: "idle",
socketUrl: null,
});
Expand Down Expand Up @@ -201,7 +199,7 @@ export function useWsConnectionStatus(): WsConnectionStatus {
}

export function getWsReconnectDelayMsForRetry(retryIndex: number): number | null {
if (!Number.isInteger(retryIndex) || retryIndex < 0 || retryIndex >= WS_RECONNECT_MAX_RETRIES) {
if (!Number.isInteger(retryIndex) || retryIndex < 0) {
return null;
}

Expand All @@ -220,7 +218,7 @@ function applyDisconnectState(
): WsConnectionStatus {
const disconnectedAt = current.disconnectedAt ?? isoNow();
const nextRetryDelayMs =
current.nextRetryAt !== null || current.reconnectPhase === "exhausted"
current.nextRetryAt !== null
? null
: getWsReconnectDelayMsForRetry(Math.max(0, current.reconnectAttemptCount - 1));

Expand All @@ -234,11 +232,6 @@ function applyDisconnectState(
? current.nextRetryAt
: new Date(Date.now() + nextRetryDelayMs).toISOString(),
phase: "disconnected",
reconnectPhase:
current.reconnectPhase === "waiting" || current.reconnectPhase === "exhausted"
? current.reconnectPhase
: nextRetryDelayMs === null
? "exhausted"
: "waiting",
reconnectPhase: current.reconnectPhase === "waiting" ? current.reconnectPhase : "waiting",
};
}
28 changes: 27 additions & 1 deletion apps/web/src/rpc/wsTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ export class WsTransport {
private session: TransportSession;
private lastHeartbeatPongAt = 0;
private readonly streamRequestStartListeners = new Set<(info: StreamRequestStartInfo) => void>();
private readonly _wakeReconnect = new Set<() => void>();
private _visibilityHandler: (() => void) | null = null;

constructor(
url: WsRpcProtocolSocketUrlProvider,
Expand All @@ -74,6 +76,14 @@ export class WsTransport {
this.url = url;
this.lifecycleHandlers = lifecycleHandlers;
this.session = this.createSession();
if (typeof document !== "undefined") {
this._visibilityHandler = () => {
if (document.visibilityState === "visible") {
for (const wake of this._wakeReconnect) wake();
}
};
document.addEventListener("visibilitychange", this._visibilityHandler);
}
}

async request<TSuccess>(
Expand Down Expand Up @@ -187,7 +197,17 @@ export class WsTransport {
});
}
this.hasReportedTransportDisconnect = true;
await sleep(retryDelayMs);
const isLikelyCongestion =
formattedError.includes("heartbeat timed out") ||
formattedError.includes("ping timeout");
const effectiveDelay = isLikelyCongestion ? Math.max(retryDelayMs, 8_000) : retryDelayMs;
await Promise.race([
sleep(effectiveDelay),
new Promise<void>((resolve) => {
this._wakeReconnect.add(resolve);
void sleep(effectiveDelay).then(() => this._wakeReconnect.delete(resolve));
}),
]);
}
}
})();
Expand Down Expand Up @@ -227,6 +247,12 @@ export class WsTransport {
if (this.disposed) {
return;
}
if (this._visibilityHandler) {
document.removeEventListener("visibilitychange", this._visibilityHandler);
this._visibilityHandler = null;
}
for (const wake of this._wakeReconnect) wake();
this._wakeReconnect.clear();
this.disposed = true;
await this.closeSession(this.session);
}
Expand Down
Loading
Loading