diff --git a/.changeset/ddp-client-idempotent-reconnect.md b/.changeset/ddp-client-idempotent-reconnect.md new file mode 100644 index 0000000000000..65e5a74cb5c56 --- /dev/null +++ b/.changeset/ddp-client-idempotent-reconnect.md @@ -0,0 +1,5 @@ +--- +"@rocket.chat/ddp-client": patch +--- + +Make `Connection.connect()` and `Connection.reconnect()` idempotent. Previously they rejected with `Error('Connection in progress')` when called while a connection was already in flight or established. Because the internal retry timer (`ws.onclose` → `setTimeout(() => void this.reconnect(), …)`) fires with no `.catch`, that rejection surfaced as an unhandled rejection at the page level whenever an external caller (e.g. an SDK consumer's bootstrap path) won the race against the timer. The timer now no-ops when the connection has already been re-established, and a stale `ws.onclose` from a replaced socket no longer clobbers the new socket's status or schedules a redundant retry. diff --git a/apps/meteor/app/notifications/client/lib/Presence.ts b/apps/meteor/app/notifications/client/lib/Presence.ts index d6890d12aa52d..c5fd6ce1289f8 100644 --- a/apps/meteor/app/notifications/client/lib/Presence.ts +++ b/apps/meteor/app/notifications/client/lib/Presence.ts @@ -2,12 +2,21 @@ import { UserStatus } from '@rocket.chat/core-typings'; import { Meteor } from 'meteor/meteor'; import { Presence } from '../../../../client/lib/presence'; +import { getDdpSdk } from '../../../../client/lib/sdk/ddpSdk'; +import { createDdpSdkStreamerAdapter } from '../../../../client/lib/sdk/streamerAdapter'; import { streamerCentral } from '../../../../client/lib/streamer'; // TODO implement API on Streamer to be able to listen to all streamed data // this is a hacky way to listen to all streamed data from user-presence Streamer +// Register the presence streamer on BOTH transports. The subscribe call in +// client/lib/presence.ts routes through DDPSDK when it's ready and falls back +// to Meteor otherwise, so the corresponding messages can arrive on either WS. +// StreamerCentral uses a per-connection `hasMeteorStreamerEventListeners` flag, +// so calling setupDdpConnection twice with distinct connection objects +// installs both listeners without duplicating within the same transport. streamerCentral.getStreamer('user-presence', { ddpConnection: Meteor.connection }); +streamerCentral.setupDdpConnection('user-presence', createDdpSdkStreamerAdapter(getDdpSdk())); type args = [username: string, statusChanged?: UserStatus, statusText?: string]; diff --git a/apps/meteor/app/utils/client/lib/SDKClient.ts b/apps/meteor/app/utils/client/lib/SDKClient.ts index d064f52e9ef74..ac91705fa04e3 100644 --- a/apps/meteor/app/utils/client/lib/SDKClient.ts +++ b/apps/meteor/app/utils/client/lib/SDKClient.ts @@ -2,10 +2,10 @@ import type { RestClientInterface } from '@rocket.chat/api-client'; import type { SDK, ClientStream, StreamKeys, StreamNames, StreamerCallbackArgs, ServerMethods } from '@rocket.chat/ddp-client'; import { Emitter } from '@rocket.chat/emitter'; import { Accounts } from 'meteor/accounts-base'; -import { DDPCommon } from 'meteor/ddp-common'; import { Meteor } from 'meteor/meteor'; import { APIClient } from './RestApiClient'; +import { ensureConnectedAndAuthenticated, getDdpSdk } from '../../../../client/lib/sdk/ddpSdk'; declare module '@rocket.chat/ddp-client' { // eslint-disable-next-line @typescript-eslint/naming-convention @@ -19,30 +19,6 @@ declare module '@rocket.chat/ddp-client' { } } -const isChangedCollectionPayload = ( - msg: any, -): msg is { msg: 'changed'; collection: string; fields: { eventName: string; args: unknown[] } } => { - if (typeof msg !== 'object' && (msg !== null || msg !== undefined)) { - return false; - } - if (msg.msg !== 'changed') { - return false; - } - if (typeof msg.collection !== 'string') { - return false; - } - if (typeof msg.fields !== 'object' && (msg.fields !== null || msg.fields !== undefined)) { - return false; - } - if (typeof msg.fields.eventName !== 'string') { - return false; - } - if (!Array.isArray(msg.fields.args)) { - return false; - } - return true; -}; - type EventMap = StreamKeys> = { [key in `stream-${N}/${K}`]: StreamerCallbackArgs; }; @@ -57,83 +33,101 @@ type StreamMapValue = { unsubList: Set<() => void>; }; -const createNewMeteorStream = (streamName: StreamNames, key: StreamKeys, args: unknown[]): StreamMapValue => { +const createNewDdpSdkStream = ( + streamProxy: Emitter, + streamName: StreamNames, + key: StreamKeys, + args: unknown[], +): StreamMapValue => { const ee = new Emitter<{ ready: [error: any] | [undefined, any]; error: [error: any]; stop: undefined; }>(); - const meta = { - ready: false, - }; + const meta = { ready: false }; + + // Defer the actual `subscribe` until DDPSDK is authenticated. Without this, + // stream subscriptions fired immediately after re-login (e.g. the + // SubscriptionsCachedStore's `notify-user//subscriptions-changed` + // listener that re-arms via onLoggedIn) hit the SDK socket while it's + // still anonymous — server rejects with `not-allowed`/`nosub`, the + // stream's `ready` promise emits an error, and the cached store never + // receives subsequent server events. The visible failure: an agent that + // just took a livechat chat post-relogin sees the chat work but the + // "Move to the queue" button never appears, because the new subscription + // the server creates for that agent is never replicated to the client's + // Subscriptions store, and pseudoRoom (= {...sub, ...room}) ends up with + // no `u` for the canMoveQueue check. + let subscription: ReturnType['client']['subscribe']> | undefined; + let offCollection: (() => void) | undefined; + let stopped = false; + + void ensureConnectedAndAuthenticated() + .catch(() => undefined) + .then(() => { + if (stopped) return; + const sdk = getDdpSdk(); + subscription = sdk.client.subscribe(`stream-${streamName}`, key, { useCollection: false, args }); + + subscription + .ready() + .then(() => { + if (stopped) return; + meta.ready = true; + ee.emit('ready', [undefined, { msg: 'ready', subs: [subscription!.id] }]); + }) + .catch((err) => { + if (stopped) return; + ee.emit('ready', [err]); + ee.emit('error', err); + }); - const sub = Meteor.connection.subscribe( - `stream-${streamName}`, - key, - { useCollection: false, args }, - { - onReady: (args: any) => { - meta.ready = true; - ee.emit('ready', [undefined, args]); - }, - onError: (err: any) => { - ee.emit('ready', [err]); - ee.emit('error', err); - }, - onStop: () => { - ee.emit('stop'); - }, - }, - ); + offCollection = sdk.client.onCollection(`stream-${streamName}`, (data: any) => { + if (data?.msg !== 'changed') return; + if (data.collection !== `stream-${streamName}`) return; + if (data.fields?.eventName !== key) return; + streamProxy.emit(`stream-${streamName}/${key}` as keyof EventMap, data.fields.args); + }); + }); const onChange: ReturnType['onChange'] = (cb) => { if (meta.ready) { - cb({ - msg: 'ready', - - subs: [], - }); + cb({ msg: 'ready', subs: [] }); return; } ee.once('ready', ([error, result]) => { if (error) { - cb({ - msg: 'nosub', - - id: '', - error, - }); + cb({ msg: 'nosub', id: '', error }); return; } - cb(result); }); }; - const ready = () => { - if (meta.ready) { - return Promise.resolve(); - } - return new Promise((resolve, reject) => { - ee.once('ready', ([err]) => { - if (err) { - reject(err); - return; - } - resolve(); - }); - }); - }; - return { - stop: sub.stop, + stop: () => { + // Mirror Meteor's subscription semantics: explicit stop() does not fire the + // 'stop' event (onStop is reserved for server-initiated closures). + // Emitting it here would recurse through the onStop handler that + // createStreamManager registers, which itself iterates the unsubList. + stopped = true; + offCollection?.(); + subscription?.stop(); + }, onChange, - ready, - onError: (cb: (...args: any[]) => void) => - ee.once('error', (error) => { - cb(error); - }), - + ready: () => { + if (meta.ready) return Promise.resolve(); + return new Promise((resolve, reject) => { + ee.once('ready', ([err]) => { + if (err) { + reject(err); + return; + } + resolve(); + }); + }); + }, + onError: (cb: (...args: any[]) => void) => ee.once('error', (error) => cb(error)), onStop: (cb: () => void) => ee.once('stop', cb), get isReady() { return meta.ready; @@ -157,14 +151,6 @@ const createStreamManager = () => { }); }); - Meteor.connection._stream.on('message', (rawMsg: string) => { - const msg = DDPCommon.parseDDP(rawMsg); - if (!isChangedCollectionPayload(msg)) { - return; - } - streamProxy.emit(`${msg.collection}/${msg.fields.eventName}` as any, msg.fields.args as any); - }); - const stream: SDK['stream'] = >( name: N, data: [key: K, ...args: unknown[]], @@ -186,7 +172,8 @@ const createStreamManager = () => { streamProxy.on(eventLiteral, proxyCallback); - const stream = streams.get(eventLiteral) || createNewMeteorStream(name, key, args); + const stream = + streams.get(eventLiteral) || createNewDdpSdkStream(streamProxy, name as StreamNames, key as StreamKeys, args); const stop = (): void => { streamProxy.off(eventLiteral, proxyCallback); @@ -242,29 +229,25 @@ export const createSDK = (rest: RestClientInterface) => { const { stream, stopAll } = createStreamManager(); const publish = (name: string, args: unknown[]) => { - Meteor.call(`stream-${name}`, ...args); + // DDPSDK queues outbound frames until the WebSocket handshake completes, + // so there's no need to gate on an isReady flag here. + void getDdpSdk().client.callAsync(`stream-${name}`, ...args); }; + // Methods route through Meteor.callAsync which goes through + // Meteor.connection._send → ddpOverREST → REST. Going via the DDPSDK + // socket directly would bypass that wrapper and hit the same + // authenticated-but-empty-result race the cached stores caught. const call = (method: T, ...args: Parameters): Promise> => { return Meteor.callAsync(method, ...args); }; - const disconnect = () => { - Meteor.disconnect(); - }; - - const reconnect = () => { - Meteor.reconnect(); - }; - return { rest, stop: stopAll, stream, publish, call, - disconnect, - reconnect, }; }; diff --git a/apps/meteor/client/hooks/useFormatDate.ts b/apps/meteor/client/hooks/useFormatDate.ts index 35410591ddd9d..a13f77b9850de 100644 --- a/apps/meteor/client/hooks/useFormatDate.ts +++ b/apps/meteor/client/hooks/useFormatDate.ts @@ -4,6 +4,6 @@ import { useCallback } from 'react'; import { formatDate } from '../lib/utils/dateFormat'; export const useFormatDate = () => { - const format = useSetting('Message_DateFormat'); - return useCallback((time: string | Date | number) => formatDate(time, String(format)), [format]); + const format = useSetting('Message_DateFormat', 'LL'); + return useCallback((time: string | Date | number) => formatDate(time, format), [format]); }; diff --git a/apps/meteor/client/lib/cachedStores/CachedStore.ts b/apps/meteor/client/lib/cachedStores/CachedStore.ts index 591de8226cf79..8dcd731a630b3 100644 --- a/apps/meteor/client/lib/cachedStores/CachedStore.ts +++ b/apps/meteor/client/lib/cachedStores/CachedStore.ts @@ -50,7 +50,11 @@ export abstract class CachedStore implements protected eventType: StreamNames; - private readonly version = 18; + // Bumped from 18 → 19 to invalidate caches populated before the DDPSDK + // wire encoding was switched from JSON to EJSON. Entries written by the + // JSON window stored dates as ISO strings instead of Date instances, so + // fields like subscription.ls would fail `.getTime()` when read back. + private readonly version = 19; private updatedAt = new Date(0); diff --git a/apps/meteor/client/lib/loggedIn.ts b/apps/meteor/client/lib/loggedIn.ts index 859ff2db7a4de..ddcaa23a4c3e1 100644 --- a/apps/meteor/client/lib/loggedIn.ts +++ b/apps/meteor/client/lib/loggedIn.ts @@ -1,20 +1,47 @@ import { Accounts } from 'meteor/accounts-base'; -import { getUserId } from './user'; +import { getUserId, userIdStore } from './user'; const isLoggedIn = () => { const uid = getUserId(); return !!uid; }; +/** + * Fire `cb` whenever the local userId transitions from absent → present. + * + * `Accounts.onLogin` would normally cover this, but Meteor only invokes + * the onLogin hook from inside a Tracker.autorun that waits for + * `Meteor.userAsync()` to resolve to a real user doc. When a login goes + * through our REST fallback (e.g. logout → fresh login while DDPSDK is + * reconnecting), the user document never lands in Meteor.users — it + * normally arrives as a DDP collection frame, but the REST endpoint + * only returns the method result. The autorun then sees a null user + * forever, and onLogin never fires. By piggybacking on userIdStore (which + * is updated synchronously the moment Accounts.connection.userId() is + * set), we get a reliable login signal regardless of how the user doc + * eventually arrives. + */ +const subscribeToLogin = (handler: () => void): (() => void) => { + let lastSeen = userIdStore.getState(); + return userIdStore.subscribe((next) => { + if (next === lastSeen) return; + const wasLoggedOut = !lastSeen; + lastSeen = next; + if (next && wasLoggedOut) { + handler(); + } + }); +}; + export const whenLoggedIn = () => { if (isLoggedIn()) { return Promise.resolve(); } return new Promise((resolve) => { - const subscription = Accounts.onLogin(() => { - subscription.stop(); + const stop = subscribeToLogin(() => { + stop(); resolve(); }); }); @@ -30,11 +57,17 @@ export const onLoggedIn = (cb: (() => () => void) | (() => Promise<() => void>) } }; - const subscription = Accounts.onLogin(handler); + // Belt-and-braces: still register with Accounts.onLogin so consumers + // pick up loginDetails when Meteor's own autorun does fire (resume on + // page load, where the user doc lands via DDP and unblocks the + // autorun). The userIdStore subscription covers everything else. + const accountsSubscription = Accounts.onLogin(handler); + const stopUserIdSubscription = subscribeToLogin(handler); if (isLoggedIn()) handler(); return () => { - subscription.stop(); + accountsSubscription.stop(); + stopUserIdSubscription(); cleanup?.(); }; }; diff --git a/apps/meteor/client/lib/presence.ts b/apps/meteor/client/lib/presence.ts index 16a5282c56921..39d2e73268579 100644 --- a/apps/meteor/client/lib/presence.ts +++ b/apps/meteor/client/lib/presence.ts @@ -4,8 +4,21 @@ import type { EventHandlerOf } from '@rocket.chat/emitter'; import { Emitter } from '@rocket.chat/emitter'; import { Meteor } from 'meteor/meteor'; +import { getDdpSdk } from './sdk/ddpSdk'; import { sdk } from '../../app/utils/client/lib/SDKClient'; +const subscribeUserPresence = (payload: { added?: string[]; removed?: string[] }): void => { + const ddp = getDdpSdk(); + if (ddp.connection.status === 'connected' && ddp.account.uid) { + // Fire the command-style subscription over our SDK; it has no lifecycle + // (the server registers the added/removed uids and moves on), matching + // Meteor.subscribe's behaviour here. + ddp.client.subscribe('stream-user-presence', '', payload); + return; + } + Meteor.subscribe('stream-user-presence', '', payload); +}; + type InternalEvents = { remove: IUser['_id']; reset: undefined; @@ -55,7 +68,7 @@ const getPresence = ((): ((uid: UserPresence['_id']) => void) => { const ids = Array.from(currentUids); const removed = Array.from(deletedUids); - Meteor.subscribe('stream-user-presence', '', { + subscribeUserPresence({ ...(ids.length > 0 && { added: Array.from(currentUids) }), ...(removed.length && { removed: Array.from(deletedUids) }), }); diff --git a/apps/meteor/client/lib/sdk/ddpSdk.ts b/apps/meteor/client/lib/sdk/ddpSdk.ts new file mode 100644 index 0000000000000..4b7f93d70266c --- /dev/null +++ b/apps/meteor/client/lib/sdk/ddpSdk.ts @@ -0,0 +1,269 @@ +import { DDPSDK } from '@rocket.chat/ddp-client'; +import EJSON from 'ejson'; +import { Accounts } from 'meteor/accounts-base'; +import { Meteor } from 'meteor/meteor'; + +import { userIdStore } from '../user'; + +const stripTrailingSlash = (value: string): string => (value.endsWith('/') ? value.slice(0, -1) : value); + +const computeDdpUrl = (): string => { + const rootUrl = typeof __meteor_runtime_config__ !== 'undefined' ? __meteor_runtime_config__.ROOT_URL : undefined; + const source = rootUrl && rootUrl !== '/' ? rootUrl : window.location.origin; + return stripTrailingSlash(source.replace(/^http/, 'ws')); +}; + +let instance: DDPSDK | undefined; +let connectPromise: Promise | undefined; + +const applyEjsonEncoding = (sdk: DDPSDK): void => { + const { ddp } = sdk.client as unknown as { ddp: { encode: unknown; decode: unknown } }; + if (!ddp) return; + ddp.encode = EJSON.stringify; + ddp.decode = EJSON.parse; +}; + +const startConnect = (sdk: DDPSDK): Promise => { + if (connectPromise) return connectPromise; + connectPromise = sdk.connection.connect().catch((err) => { + console.warn('[ddpSdk] connect failed', err); + // Allow a retry on the next call. + connectPromise = undefined; + }); + return connectPromise; +}; + +const waitForConnected = (sdk: DDPSDK): Promise => { + if (sdk.connection.status === 'connected') return Promise.resolve(); + return new Promise((resolve) => { + const stop = sdk.connection.on('connected', () => { + stop(); + resolve(); + }); + }); +}; + +export const getDdpSdk = (): DDPSDK => { + if (!instance) { + instance = DDPSDK.create(computeDdpUrl()); + applyEjsonEncoding(instance); + void startConnect(instance); + } + return instance; +}; + +const readStoredLoginToken = (): string | null => (typeof window !== 'undefined' ? window.localStorage.getItem('Meteor.loginToken') : null); + +let inflightLogin: Promise | undefined; + +export const ensureConnectedAndAuthenticated = async (): Promise => { + const sdk = getDdpSdk(); + + // IMPORTANT: must wait for the DDP `connected` handshake before issuing + // any wait-method (login uses wait:true). DDPDispatcher serializes wait + // blocks at the queue head, so a login dispatched while connecting + // queues ahead of the connect frame ws.onopen later emits — the connect + // frame ends up wedged in a non-wait block behind the wait block and + // never flushes, leaving the socket open but DDP-unhandshaked. + if ( + sdk.connection.status === 'idle' || + sdk.connection.status === 'closed' || + sdk.connection.status === 'disconnected' || + sdk.connection.status === 'failed' + ) { + void startConnect(sdk); + } + await waitForConnected(sdk); + + const token = readStoredLoginToken(); + if (!token || sdk.account.uid) { + return; + } + + if (inflightLogin) { + await inflightLogin; + return; + } + + // Give Meteor's own login flow (resume routed through stubMeteorStream + // + adoptAccountFromMeteorLoginResult) time to populate sdk.account + // before we issue our own loginWithToken. If adopt fires first, we can + // short-circuit and avoid sending a second login frame on the SDK + // socket — which would otherwise create a duplicate Presence + // connection (processConnectionStatus prefers ONLINE over AWAY in the + // aggregate, breaking the auto-away flow). 500ms covers a single + // server roundtrip in CI; if the stub-routed login hasn't completed by + // then, fall back to issuing our own loginWithToken below. + for (let i = 0; i < 20 && !sdk.account.uid; i++) { + await new Promise((resolve) => setTimeout(resolve, 25)); + } + if (sdk.account.uid) { + return; + } + + inflightLogin = (async () => { + try { + await sdk.account.loginWithToken(token); + } finally { + inflightLogin = undefined; + } + })(); + + try { + await inflightLogin; + } catch (error) { + if (isAuthError(error) && readStoredLoginToken() === token) { + // Server rejected the stored token. Without this branch the stored + // token stays in localStorage forever and the router keeps the user + // wedged on /home with no main UI and no login form: ddpOverREST + // routes Meteor's resume login through DDPSDK / REST (not Meteor's + // own WS), and on rejection the resume invoker errors but the + // account state isn't cleared automatically. The token-stable + // guard (readStoredLoginToken() === token) avoids kicking the user + // out when localStorage was updated mid-flight by a parallel flow + // (fresh registration, Meteor's own resume) — the 401 is then on a + // stale token a newer credential already replaced. Drop the local + // credentials manually instead of calling Meteor.logout(): the + // latter dispatches a `logout` method which itself races against + // parallel re-auth flows in CI's parallel-shard environment and + // kicked otherwise-healthy tests out. + Accounts._unstoreLoginToken(); + (Meteor.connection as unknown as { setUserId: (uid: string | null) => void }).setUserId(null); + return; + } + console.warn('[ddpSdk] loginWithToken failed', error); + } +}; + +const isAuthError = (error: unknown): boolean => { + if (!error || typeof error !== 'object') return false; + const e = error as { error?: unknown; reason?: unknown }; + return ( + e.error === 401 || + e.error === 403 || + e.reason === 'User not found' || + e.reason === 'Login token expired' || + e.reason === 'You are not allowed to use this token' + ); +}; + +/** + * When Meteor.applyAsync('login', ...) is routed through ddpOverREST it lands on + * DDPSDK as `client.callAsync('login', ...)`. The result authenticates the + * underlying DDP socket — server-side the session is now logged in — but + * `sdk.account` is bypassed entirely (only `sdk.account.loginWithToken` populates + * `account.uid` / `account.user`). Without this sync, our userIdStore subscriber + * sees uid set, calls ensureConnectedAndAuthenticated, finds `account.uid` empty, + * and fires a SECOND login on the same socket. The server happily honours both, + * issuing two different login tokens; whichever arrives second wins on the + * server but on the client we end up with `account.user.token !== Meteor.loginToken`, + * which surfaces later as auth-mismatched subscription errors and React crashes + * mid-flow. + * + * Call this from ddpOverREST after a successful 'login' method result so DDPSDK's + * `account` reflects the same credentials Meteor stored, and ensureConnectedAndAuthenticated + * short-circuits its own loginWithToken path. + */ +export const adoptAccountFromMeteorLoginResult = (result: unknown): void => { + if (!result || typeof result !== 'object') return; + const r = result as { id?: unknown; token?: unknown; tokenExpires?: unknown }; + if (typeof r.id !== 'string' || typeof r.token !== 'string') return; + const tokenExpiresRaw = r.tokenExpires; + let tokenExpires: Date | undefined; + if (tokenExpiresRaw instanceof Date) { + tokenExpires = tokenExpiresRaw; + } else if (typeof tokenExpiresRaw === 'object' && tokenExpiresRaw !== null && '$date' in tokenExpiresRaw) { + const d = (tokenExpiresRaw as { $date: number | string }).$date; + tokenExpires = new Date(typeof d === 'string' ? parseInt(d, 10) : d); + } + const sdk = getDdpSdk(); + sdk.account.user = { ...sdk.account.user, token: r.token, tokenExpires, id: r.id } as typeof sdk.account.user; + sdk.account.uid = r.id; +}; + +const teardownAuthenticatedConnection = (): void => { + if (!instance) return; + try { + instance.connection.close(); + } catch { + // ignore + } + instance.account.uid = undefined; + instance.account.user = undefined; + connectPromise = undefined; +}; + +declare global { + // eslint-disable-next-line @typescript-eslint/naming-convention + interface Window { + __rocketChatSdk?: DDPSDK; + } +} + +if (typeof window !== 'undefined') { + const sdk = getDdpSdk(); + window.__rocketChatSdk = sdk; + + // DDPSDK auto-fires loginWithToken on every `connected` event using the + // in-memory account.user.token (DDPSDK.create line 115-122). When the + // server force-logs the user out (resetUserE2EKey → + // Users.unsetLoginTokens → meteor.service force_logout listener closes + // the user's WebSocket sessions), the SDK reconnects and immediately + // retries the now-dead token. DDPSDK calls this with `void` so the + // rejection is swallowed; account.user stays populated, Meteor.userId() + // stays set, and the navbar continues to render Home with stale creds. + // + // Wrap account.loginWithToken so we can observe rejections from the + // auto-retry. To avoid breaking the SAML/password login flows where a + // fresh login is concurrently in flight, only act when: + // - the error is auth-shaped (`isAuthError`) AND + // - the token in localStorage still matches the one we tried with + // (nothing rotated it mid-flight) AND + // - the SDK account didn't get refreshed by a successful adopt while + // we were awaiting (sdk.account.uid still maps to this token's user) + // Wrap account.loginWithToken so the SDK's auto-relogin rejection (called + // with `void` in DDPSDK.create) doesn't surface as an unhandled rejection + // (window.onunhandledrejection → pageError). The actual recovery from a + // failed auto-relogin is now driven by Meteor's `DDP.onReconnect` + // callback (registered by `callLoginMethod`), which fires after + // stubMeteorStream re-emits `reset` on each SDK 'connected' event. That + // callback retries login with the latest stored token and calls + // `makeClientLoggedOut` on failure — no need to duplicate that logic. + const account = sdk.account as unknown as { loginWithToken: (token: string) => Promise }; + const originalLogin = account.loginWithToken.bind(sdk.account); + account.loginWithToken = async (token: string) => { + try { + return await originalLogin(token); + } catch (error) { + if (isAuthError(error)) { + // Meteor's onReconnect path will retry through stubMeteorStream + // with the current localStorage token; nothing for us to do here + // beyond not letting the rejection escape. + return undefined; + } + throw error; + } + }; + + // Boot-time auth is now driven by Meteor's login resume routed through + // stubMeteorStream, which calls adoptAccountFromMeteorLoginResult on + // success. Calling ensureConnectedAndAuthenticated here as well would + // fire a *second* loginWithToken on the SDK socket before the Meteor + // resume completes — server-side that ends up as TWO Accounts.onLogin + // fires → TWO Presence.newConnection inserts in usersSessions, with + // duplicate entries that confuse processConnectionStatus (one stays + // online while the other goes away, aggregating to online — auto-away + // never propagates). + + userIdStore.subscribe((uid) => { + if (uid) { + // Subsequent userId transitions (logout → login) still need to + // re-establish auth on the SDK socket; adopt only kicks in for + // login frames going through the stub, not for the post-logout + // re-auth that doesn't necessarily go through Meteor. + void ensureConnectedAndAuthenticated(); + } else { + teardownAuthenticatedConnection(); + } + }); +} diff --git a/apps/meteor/client/lib/sdk/streamerAdapter.ts b/apps/meteor/client/lib/sdk/streamerAdapter.ts new file mode 100644 index 0000000000000..63e5a9e40296a --- /dev/null +++ b/apps/meteor/client/lib/sdk/streamerAdapter.ts @@ -0,0 +1,47 @@ +import type { DDPSDK } from '@rocket.chat/ddp-client'; +import EJSON from 'ejson'; + +type StreamerDDPConnection = { + _stream: { + on: { + (key: 'message', callback: (data: string) => void): void; + (key: 'reset', callback: () => void): void; + }; + }; + subscribe(name: string, ...args: unknown[]): { stop: () => void }; + call(methodName: string, ...args: unknown[]): void; + hasMeteorStreamerEventListeners?: boolean; +}; + +/** + * Presents a DDPSDK instance with the shape `StreamerCentral` expects + * (the subset of Meteor.connection exposed as `StreamerDDPConnection`). + * Lets us run the existing streamer infrastructure against our SDK's + * WebSocket without rewriting StreamerCentral. + */ +export const createDdpSdkStreamerAdapter = (sdk: DDPSDK): StreamerDDPConnection => { + const { ddp } = sdk.client as unknown as { ddp: { onMessage: (cb: (payload: unknown) => void) => () => void } }; + + return { + _stream: { + on: ((key: 'message' | 'reset', callback: ((data: string) => void) | (() => void)): void => { + if (key === 'message') { + ddp.onMessage((payload) => { + // StreamerCentral re-parses the string with JSON.parse; hand it + // an EJSON-serialised payload so Dates/undefined round-trip the + // same way Meteor.connection's raw WS frames did. + (callback as (data: string) => void)(EJSON.stringify(payload)); + }); + return; + } + if (key === 'reset') { + sdk.connection.on('disconnected', callback as () => void); + } + }) as StreamerDDPConnection['_stream']['on'], + }, + subscribe: (name: string, ...args: unknown[]) => sdk.client.subscribe(name, ...args), + call: (methodName: string, ...args: unknown[]): void => { + void sdk.client.callAsync(methodName, ...args); + }, + }; +}; diff --git a/apps/meteor/client/main.ts b/apps/meteor/client/main.ts index 08d11bb14b734..be87f6e655f5b 100644 --- a/apps/meteor/client/main.ts +++ b/apps/meteor/client/main.ts @@ -1,5 +1,6 @@ import './meteor/overrides'; import './meteor/startup'; +import './lib/sdk/ddpSdk'; import './serviceWorker'; import('./meteor/login') diff --git a/apps/meteor/client/meteor/overrides/ddpOverREST.ts b/apps/meteor/client/meteor/overrides/ddpOverREST.ts index e7fb05275e3ab..38d70865c75ff 100644 --- a/apps/meteor/client/meteor/overrides/ddpOverREST.ts +++ b/apps/meteor/client/meteor/overrides/ddpOverREST.ts @@ -6,12 +6,22 @@ import { getUserId } from '../../lib/user'; const bypassMethods: string[] = ['setUserStatus', 'logout']; +const isResumeLogin = ({ method, params }: Meteor.IDDPMessage): boolean => method === 'login' && Boolean(params?.[0]?.resume); + const shouldBypass = ({ msg, method, params }: Meteor.IDDPMessage): boolean => { if (msg !== 'method') { return true; } - if (method === 'login' && params[0]?.resume) { + // In microservices CI, ddp-streamer-service registers `login`, `logout`, + // `setUserStatus`, and `UserPresence:*` as native methods (configureServer.ts + // in ee/apps/ddp-streamer); every other method delegates to the Meteor + // service via callMethodWithToken (extra hop). Bypassing these to Meteor's + // own WS routes them straight to ddp-streamer for the fast path; routing + // them through REST would wedge them on the slow rocketchat-main path + // instead, blowing past the 5s `expect(...).toBeVisible()` timeouts the + // post-relogin tests rely on. + if (method === 'login' && params?.[0]?.resume) { return true; } @@ -32,16 +42,10 @@ const withDDPOverREST = (_send: (this: Meteor.IMeteorConnection, message: Meteor return _send.call(this, message, ...args); } - const endpoint = !getUserId() ? 'method.callAnon' : 'method.call'; - - const restParams = { - message: DDPCommon.stringifyDDP({ ...message }), - }; - - const processResult = (_message: string): void => { - // Prevent error on reconnections and method retry. - // On those cases the API will be called 2 times but - // the handler will be deleted after the first execution. + const processResult = (resultMessage: string): void => { + // Prevent error on reconnections and method retry: on those cases the + // API will be called twice but the handler is deleted after the first + // execution. if (!this._methodInvokers[message.id]) { return; } @@ -49,8 +53,33 @@ const withDDPOverREST = (_send: (this: Meteor.IMeteorConnection, message: Meteor msg: 'updated', methods: [message.id], }); + this._streamHandlers.onMessage(resultMessage); + }; + + const wasResumeLogin = isResumeLogin(message); + + // Note on login routing: `login + resume` is bypassed in shouldBypass + // (handled by Meteor's own WS → ddp-streamer's native handler). Other + // login flavours (SAML credential exchange, password, OAuth) MUST route + // through REST below — never through the DDPSDK socket. ddp-streamer + // only exposes `login` natively for the resume shape; non-resume logins + // get delegated via MeteorService.callMethodWithToken (extra hop), and + // the SDK socket would also race the follow-up `Meteor.loginWithToken` + // resume that gets queued from the success handler — two logins on + // different sockets for the same user, with diverging account state. + // REST → rocketchat-main is one hop and lets the resume follow-up + // settle the SDK auth via ensureConnectedAndAuthenticated. + + // Login itself is the call that establishes auth — running it through + // `method.call` would force the REST middleware to validate the very + // token we're trying to use, and the server would 401 with "You must + // be logged in" before even invoking the login method. The 401 then + // short-circuits the resume callback, leaving the stale token in + // localStorage and the user wedged on /home with no main UI. + const endpoint = !getUserId() || wasResumeLogin ? 'method.callAnon' : 'method.call'; - this._streamHandlers.onMessage(_message); + const restParams = { + message: DDPCommon.stringifyDDP({ ...message }), }; const method = encodeURIComponent(message.method.replace(/\//g, ':')); @@ -58,40 +87,42 @@ const withDDPOverREST = (_send: (this: Meteor.IMeteorConnection, message: Meteor sdk.rest .post(`/v1/${endpoint}/${method}`, restParams) .then(({ message: _message }) => { - // Calling Meteor.loginWithToken before processing the result of the first login will ensure that the new login request - // is added to the top of the list of methodInvokers. - // The request itself will only be sent after the first login result is processed, but - // the Accounts.onLogin callbacks will be called before this request is effectively sent; - // This way, any requests done inside of onLogin callbacks will be added to the list - // and processed only after the Meteor.loginWithToken request is done - // So, the effective order is: - // 1. regular login with password is sent - // 2. result of the password login is received - // 3. login with token is added to the list of pending requests - // 4. result of the password login is processed - // 5. Accounts.onLogin callbacks are triggered, Meteor.userId is set - // 6. the request for the login with token is effectively sent - // 7. login with token result is processed - // 8. requests initiated inside of the Accounts.onLogin callback are then finally sent - // - // Keep in mind that there's a difference in how meteor3 processes the request results, compared to older meteor versions - // On meteor3, any collection writes triggered by a request result are done async, which means that the `processResult` call - // will not immediatelly trigger the callbacks, like it used to in older versions. - // That means that on meteor3+, it doesn't really make a difference if processResult is called before or after the Meteor.loginWithToken here - // as the result will be processed async, the loginWithToken call will be initiated before it is effectively processed anyway. - if (message.method === 'login') { + // Skip Meteor.loginWithToken on resume responses: Meteor itself called + // us with `login({resume})` and is already wiring up the new token via + // its invoker. Calling loginWithToken again would re-enter this _send + // override, dispatch another login method, and loop — locking the + // boot in a chain of resume calls and rate-limiting the server. + if (!wasResumeLogin && message.method === 'login') { const parsedMessage = DDPCommon.parseDDP(_message) as { result?: { token?: string } }; if (parsedMessage.result?.token) { Meteor.loginWithToken(parsedMessage.result.token); } } - processResult(_message); }) - .catch(async (error) => { - if ('message' in error && error.message) { - processResult(error.message); - } + .catch((error: unknown) => { + // The Rocket.Chat REST middleware throws the parsed JSON body, which + // is shaped like { success: false, error, status, message } for a 401 + // — NOT as a DDP-encoded result frame. If we feed `error.message` + // (just a plain string) to processResult, Meteor's `_streamHandlers` + // can't parse it and the invoker never sees the rejection: the + // stored token stays in localStorage, Meteor.userId() stays set, and + // the user is wedged on /home with no main UI and no login form. + // Re-encode it as a proper DDP error result so Accounts' resume + // callback runs and clears the stale credentials. + const e = (error ?? {}) as { error?: unknown; reason?: unknown; message?: unknown }; + const errorMessage = DDPCommon.stringifyDDP({ + msg: 'result', + id: message.id, + error: { + isClientSafe: true, + error: e.error ?? 'unknown', + reason: (e.reason as string) ?? (e.message as string) ?? 'Unknown error', + message: (e.message as string) ?? (e.reason as string) ?? 'Unknown error', + errorType: 'Meteor.Error', + } as unknown as Meteor.Error, + }); + processResult(errorMessage); console.error(error); }); }; diff --git a/apps/meteor/client/meteor/overrides/ddpSdkCollectionBridge.ts b/apps/meteor/client/meteor/overrides/ddpSdkCollectionBridge.ts new file mode 100644 index 0000000000000..18ad25de7d49a --- /dev/null +++ b/apps/meteor/client/meteor/overrides/ddpSdkCollectionBridge.ts @@ -0,0 +1,96 @@ +import { DDPCommon } from 'meteor/ddp-common'; +import { Meteor } from 'meteor/meteor'; + +import { getDdpSdk } from '../../lib/sdk/ddpSdk'; + +/** + * Bridge incoming DDPSDK frames into Meteor.connection's collection dispatch. + * + * Without this, routing Meteor.apply methods through DDPSDK would leave the + * application in a broken state: Meteor-registered collections (Meteor.users, + * every Mongo.Collection subscribers of the Meteor.connection publications, + * etc.) only react to frames they receive on Meteor.connection's own socket. + * A successful login via the DDPSDK socket pushes the current user document + * and follow-up subscription payloads on that socket — not Meteor's — so the + * Users Zustand store (exposed as Meteor.users through userAndUsers.ts) + * never populates and useMainReady stays false. + * + * By tapping DDPSDK's MinimalDDPClient.onMessage, we selectively re-feed + * collection-mutation, ready and nosub messages through + * Meteor.connection._streamHandlers.onMessage, reusing Meteor's dispatch + * logic for _stores without having to duplicate it. + * + * Method results / updated / heartbeat frames are NOT re-emitted — those are + * already handled by either Meteor's own invokers (when the method went + * through Meteor.connection) or by ddpOverSDK's processResult (when it went + * through DDPSDK). Duplicating them would confuse Meteor's invoker state. + */ + +type ParsedDdpFrame = { msg?: string; id?: unknown; methods?: unknown } & Record; + +const COLLECTION_FRAMES = new Set(['added', 'changed', 'removed', 'addedBefore', 'movedBefore']); +const SUBSCRIPTION_LIFECYCLE_FRAMES = new Set(['ready', 'nosub']); + +// SDK-internal ids are 'rc-ddp-client-N'; Meteor's are numeric strings ('1', +// '2', ...). Method-result frames addressed to SDK-internal ids must NOT +// reach Meteor's _streamHandlers — Meteor's `updated` handler throws "No +// callback invoker for method ..." when the id is missing from +// _methodInvokers (document_processors.js:168). Filter those out at the +// bridge so SDK's own callAsync flows aren't surfaced into Meteor. +const isSdkInternalId = (id: unknown): boolean => typeof id === 'string' && id.startsWith('rc-ddp-client-'); + +const shouldBridgeToMeteor = (frame: ParsedDdpFrame): boolean => { + if (!frame || typeof frame.msg !== 'string') return false; + + if (COLLECTION_FRAMES.has(frame.msg) || SUBSCRIPTION_LIFECYCLE_FRAMES.has(frame.msg)) { + return true; + } + + if (frame.msg === 'result') { + return !isSdkInternalId(frame.id); + } + if (frame.msg === 'updated') { + const methods = Array.isArray(frame.methods) ? (frame.methods as unknown[]) : []; + // If any of the methodIds in the `updated` frame is SDK-internal, drop + // the whole frame: Meteor processes every id and would throw on the + // first miss. In practice an `updated` frame carries ids from a single + // originating method call, so this is all-or-nothing anyway. + return methods.length > 0 && !methods.some(isSdkInternalId); + } + + return false; +}; + +export const installDdpSdkCollectionBridge = (): void => { + const sdk = getDdpSdk(); + const { ddp } = sdk.client as unknown as { ddp: { onMessage: (cb: (payload: ParsedDdpFrame) => void) => () => void } }; + if (!ddp?.onMessage) return; + + ddp.onMessage((frame) => { + if (!shouldBridgeToMeteor(frame)) return; + + // `_streamHandlers.onMessage` returns a Promise (the message handler is an + // async generator). A throw inside the inner `_process_updated` / + // `_process_result` (e.g. "No callback invoker for method N" when a + // stale frame arrives after a force-logout cycle invalidates the + // invoker) would otherwise escape this scope as an unhandled rejection, + // aborting Meteor's frame queue and leaving subsequent login result + // frames unprocessed. Wrap the call so both sync throws and async + // rejections are contained — Meteor keeps draining the queue even when + // individual frames hit dead invokers. + try { + const result = Meteor.connection._streamHandlers.onMessage( + DDPCommon.stringifyDDP(frame as Parameters[0]), + ) as unknown; + if (result && typeof (result as Promise).then === 'function') { + (result as Promise).catch((err) => { + console.warn('[ddpSdk] bridge frame drop (async)', frame.msg, err); + }); + } + } catch (err) { + console.warn('[ddpSdk] bridge frame drop', frame.msg, err); + } + }); +}; + +installDdpSdkCollectionBridge(); diff --git a/apps/meteor/client/meteor/overrides/index.ts b/apps/meteor/client/meteor/overrides/index.ts index f3370db0592f5..b039f3861af3a 100644 --- a/apps/meteor/client/meteor/overrides/index.ts +++ b/apps/meteor/client/meteor/overrides/index.ts @@ -1,4 +1,8 @@ import './ddpOverREST'; +import './ddpSdkCollectionBridge'; +import './subscribeViaSDK'; +import './stubMeteorStream'; +import './killMeteorStream'; import './desktopInjection'; import './oauthRedirectUri'; import './settings'; diff --git a/apps/meteor/client/meteor/overrides/killMeteorStream.ts b/apps/meteor/client/meteor/overrides/killMeteorStream.ts new file mode 100644 index 0000000000000..f0d7138609188 --- /dev/null +++ b/apps/meteor/client/meteor/overrides/killMeteorStream.ts @@ -0,0 +1,110 @@ +import { Accounts } from 'meteor/accounts-base'; +import { Meteor } from 'meteor/meteor'; + +import { userIdStore } from '../../lib/user'; + +/** + * Reset Meteor.connection's revival/quiescence bookkeeping at boot. + * + * Meteor's bootstrap subscriptions (loginServiceConfiguration, + * autoupdate) are opened before our overrides load. Until those are + * "revived" against the live DDP session, `_waitingForQuiescence()` + * returns true and `_livedata_data` buffers every incoming frame + * instead of processing it — including the synthetic `updated` frame + * that ddpOverREST.processResult emits to drive method invoker + * callbacks. Wiping the revival/quiescence state lets that synthetic + * frame reach the invoker in the same tick. + * + * NOTE: an earlier revision of this file also called + * `_stream.disconnect({ _permanent: true })` to make DDPSDK the sole + * transport. That broke `MethodInvoker.sendMessage()`'s + * `if (this.connection._stream._connected) { _send(...) }` gate — with + * the stream dead, sendMessage queues the invoker waiting for a + * connection that never returns and ddpOverREST's `_send` wrapper + * never fires for any method. Lying `_connected = true` after the + * disconnect makes `sendMessage` proceed but causes other Meteor + * internals to dispatch on the dead socket and crash the page. So + * Meteor's WS now stays connected — invokers reach `_send`, which + * ddpOverREST intercepts and routes to REST (or DDPSDK for `login`). + */ +const conn = Meteor.connection as unknown as { + _subsBeingRevived: Record; + _methodsBlockingQuiescence: Record; + _messagesBufferedUntilQuiescence: unknown[]; + _outstandingMethodBlocks: unknown[]; + _methodInvokers: Record; +}; + +conn._subsBeingRevived = Object.create(null); +conn._methodsBlockingQuiescence = Object.create(null); +conn._messagesBufferedUntilQuiescence = []; + +/** + * Force-clear Accounts._loggingIn once a uid lands in userIdStore. + * + * Meteor's loggedInAndDataReadyCallback flips _loggingIn back to false + * from inside a Tracker.autorun that awaits Meteor.userAsync(). Our + * findOneAsync await boundary breaks Tracker dep propagation, and the + * same autorun is where Accounts.onLogin would normally fire — so + * neither hook fires when synchronizeUserData later writes the user + * into the store, and the UI stays on "Connecting..." with + * Meteor.loggingIn() pinned to true. + * + * userIdStore is updated by the userAndUsers Tracker.autorun the moment + * Accounts.connection.userId() flips, which happens inside + * makeClientLoggedIn before the broken autorun is even installed. So + * subscribing here gives a reliable "login completed" signal — just + * skip the synchronous boot snapshot and react only to subsequent + * transitions. + */ +let saw: string | undefined = userIdStore.getState(); +userIdStore.subscribe((next) => { + if (next === saw) return; + saw = next; + if (next) { + (Accounts as unknown as { _setLoggingIn?: (v: boolean) => void })._setLoggingIn?.(false); + } +}); + +/** + * Drain Meteor's outstanding-method queue on logout. + * + * Accounts.logout's `applyAsync` resolves immediately via Meteor's + * fire-and-forget client path (the actual server response is awaited + * only by the MethodInvoker callback). Inside that resolved `.then`, + * makeClientLoggedOut clears userId, which fires our userIdStore + * subscriber to teardown DDPSDK. By the time the server's logout result + * frame would arrive, the DDPSDK socket is already closed — so the + * logout MethodInvoker stays in `_outstandingMethodBlocks` with + * `sentMessage=true` forever. + * + * The next `_addOutstandingMethod` call (e.g. token-resume right after + * logout) checks `_outstandingMethodBlocks.length === 1` to decide + * whether to send immediately. With the orphaned logout block ahead of + * it, the new method is silently queued and never sent. Visible + * failure: re-login after logout never produces a login `_send` and the + * UI hangs on PageLoading. + * + * Drain inside Accounts.onLogout because it fires synchronously from + * makeClientLoggedOut *before* setUserId(null) and before any + * subsequent applyAsync can enqueue. Doing the same drain on the + * userIdStore transition would race the test's `_pollStoredLoginToken` + * call, which can enqueue the new login between makeClientLoggedOut and + * Tracker's deferred autorun re-run — we'd then wipe the new login + * along with the dead logout. + */ +Accounts.onLogout(() => { + conn._outstandingMethodBlocks = []; + conn._methodInvokers = Object.create(null); + // Also wipe _methodsBlockingQuiescence: the logout method's wait-flag + // is still here because its server response never landed (DDPSDK was + // torn down by makeClientLoggedOut). With it left in place, + // _waitingForQuiescence() stays true and _livedata_data buffers every + // subsequent frame — including the synthetic `updated` we inject via + // processResult to trigger dataVisible on the next method invoker. The + // invoker would then sit with _methodResult set, _dataVisible false, + // and the login callback never fires. + conn._methodsBlockingQuiescence = Object.create(null); + conn._messagesBufferedUntilQuiescence = []; + conn._subsBeingRevived = Object.create(null); +}); diff --git a/apps/meteor/client/meteor/overrides/stubMeteorStream.ts b/apps/meteor/client/meteor/overrides/stubMeteorStream.ts new file mode 100644 index 0000000000000..f1985baf88921 --- /dev/null +++ b/apps/meteor/client/meteor/overrides/stubMeteorStream.ts @@ -0,0 +1,268 @@ +import { Accounts } from 'meteor/accounts-base'; +import { DDPCommon } from 'meteor/ddp-common'; +import { Meteor } from 'meteor/meteor'; +import { Tracker } from 'meteor/tracker'; + +import { adoptAccountFromMeteorLoginResult, getDdpSdk } from '../../lib/sdk/ddpSdk'; + +/** + * Replace Meteor.connection._stream with a stub that pretends to be a + * connected DDP stream and forwards outbound frames through the DDPSDK + * socket. The goal: only one WebSocket per page (the DDPSDK one). Meteor + * still owns its Connection / MethodInvoker / _streamHandlers machinery — + * we just swap the transport underneath. + * + * What goes through here: + * - method frames bypassed by ddpOverREST (login resume, UserPresence:*, + * setUserStatus, logout) — routed via the SDK socket so they hit + * ddp-streamer's native handlers. + * - sub/unsub frames Meteor sends internally (resubscriptions on reset, + * bootstrap subs that escape Meteor.connection.subscribe) — routed via + * the SDK socket; the responses (ready/nosub/added/changed) are bridged + * back to Meteor's _streamHandlers in ddpSdkCollectionBridge. + * - ping frames from Meteor's heartbeat — answered locally with a synthetic + * pong fed back into _streamHandlers so the heartbeat stays satisfied. + * - connect/pong frames — discarded; the SDK socket has its own handshake. + */ + +type MeteorIDDPStream = { + currentStatus: { + status: string; + connected: boolean; + retryCount: number; + retryTime?: number; + reason?: string; + }; + eventCallbacks?: Record void>>; + statusListeners?: { changed(): void }; + on(event: string, callback: (...args: unknown[]) => void): void; + forEachCallback(name: string, cb: (callback: (...args: unknown[]) => void) => void): void; + send(data: string): void; + status(): MeteorIDDPStream['currentStatus']; + statusChanged(): void; + reconnect(options?: unknown): void; + disconnect(options?: { _permanent?: boolean; _error?: unknown }): void; + _lostConnection(error?: unknown): void; +}; + +type MeteorConnectionInternals = { + _stream: MeteorIDDPStream; + _streamHandlers: { + onMessage(raw: string): void; + onReset(): void; + }; +}; + +const conn = Meteor.connection as unknown as MeteorConnectionInternals; + +const realStream = conn._stream; + +// Carry Meteor's already-registered handlers (registered in the Connection +// constructor BEFORE we got a chance to swap `_stream`) over to the stub — +// onMessage, onReset, onDisconnect all live in `realStream.eventCallbacks`. +const inheritedCallbacks = realStream.eventCallbacks ?? {}; + +// Drop Meteor's WS. The stub takes over before any user code is gated on +// _stream._connected, so closing the real socket does not strand any send(). +try { + realStream.disconnect({ _permanent: true }); +} catch { + // already closed / never opened +} + +const eventCallbacks: Record void>> = Object.create(null); +for (const [name, callbacks] of Object.entries(inheritedCallbacks)) { + eventCallbacks[name] = (callbacks as Array<(...args: unknown[]) => void>).slice(); +} +const fire = (name: string, ...args: unknown[]): void => { + const list = eventCallbacks[name]; + if (!list) return; + list.slice().forEach((cb) => cb(...args)); +}; + +const TrackerDependency = (Tracker as unknown as { Dependency?: new () => { changed(): void } }).Dependency; +const statusListeners = TrackerDependency ? new TrackerDependency() : undefined; + +const stub: MeteorIDDPStream = { + currentStatus: { + status: 'connected', + connected: true, + retryCount: 0, + }, + + eventCallbacks, + statusListeners, + + on(name, callback) { + if (name !== 'message' && name !== 'reset' && name !== 'disconnect') { + throw new Error(`unknown event type: ${name}`); + } + (eventCallbacks[name] ||= []).push(callback); + }, + + forEachCallback(name, cb) { + (eventCallbacks[name] || []).slice().forEach(cb); + }, + + send(data) { + let frame: { msg?: string; id?: string; method?: string; name?: string; params?: unknown[] } | undefined; + try { + frame = DDPCommon.parseDDP(data) as typeof frame; + } catch { + return; + } + if (!frame || typeof frame.msg !== 'string') return; + void routeOutbound(frame); + }, + + status() { + statusListeners?.changed?.(); + return this.currentStatus; + }, + + statusChanged() { + statusListeners?.changed?.(); + }, + + reconnect() { + // SDK owns reconnection; no-op here. + }, + disconnect() { + // SDK owns disconnection; no-op so Meteor.disconnect() is harmless. + }, + _lostConnection() { + // Nothing to do — heartbeat over the stub never times out. + }, +}; + +conn._stream = stub; + +const bridgePongFor = (id?: string): void => { + conn._streamHandlers.onMessage( + DDPCommon.stringifyDDP({ msg: 'pong', ...(id != null && { id }) } as unknown as Parameters[0]), + ); +}; + +type SdkDdp = { + emit(event: string, payload: unknown): void; + onResult(id: string, cb: (payload: { msg: 'result'; id: string; error?: unknown; result?: unknown }) => void): () => void; +}; + +const routeOutbound = (frame: { msg?: string; id?: string; method?: string; name?: string; params?: unknown[] }): void => { + const sdk = getDdpSdk(); + const { ddp } = sdk.client as unknown as { ddp: SdkDdp }; + + switch (frame.msg) { + case 'connect': + // SDK already negotiated DDP version on its own socket. + return; + case 'pong': + return; + case 'ping': + bridgePongFor(frame.id); + return; + case 'method': + // Meteor's `login` resume goes through here. The SDK socket session + // is authenticated server-side by the resume frame, but `sdk.account` + // only gets populated by `sdk.account.loginWithToken`. Without + // adopting Meteor's login result here, the userIdStore subscriber in + // ddpSdk would notice `sdk.account.uid` is empty and dispatch a + // SECOND `loginWithToken` on the same socket — extra ~100-200ms on + // every page load and a divergent token in `sdk.account.user`. + if (frame.method === 'login' && typeof frame.id === 'string') { + ddp.onResult(frame.id, (payload) => { + if ('error' in payload && payload.error) return; + if (payload.result) adoptAccountFromMeteorLoginResult(payload.result); + }); + } + ddp.emit('send', frame); + return; + case 'sub': + case 'unsub': + // ddpSdkCollectionBridge re-feeds the matching response frames + // (result, updated, ready, nosub, added/changed/removed) into + // Meteor.connection._streamHandlers, where the existing + // _methodInvokers / _subsBeingRevived bookkeeping picks them up by + // id. We only need to put the outbound frame on the wire here. + ddp.emit('send', frame); + break; + + default: + // Unknown frame type; drop quietly. + } +}; + +// If Meteor already finished its DDP handshake before we got swapped in, +// _lastSessionId is set and we do nothing: heartbeat is running, onConnected +// has fired, etc. If not, synthesize a `connected` frame so Meteor moves out +// of its initial "connecting" state. Heartbeat pings land in stub.send and +// are answered locally with synthetic pongs. +queueMicrotask(() => { + const c = conn as unknown as { _lastSessionId?: string | null }; + if (c._lastSessionId) return; + try { + conn._streamHandlers.onMessage( + DDPCommon.stringifyDDP({ + msg: 'connected', + session: 'sdk-bridged', + } as unknown as Parameters[0]), + ); + fire('reset'); + } catch (err) { + console.warn('[stubMeteorStream] failed to bootstrap connected state', err); + } +}); + +// When the underlying SDK socket reconnects (e.g. after a server-side +// ws.close / ws.terminate from force-logout in microservices), Meteor's +// connection sees no transport event because the stub keeps reporting +// 'connected'. Without help, both the in-flight method machinery and +// accounts-base's reconnect-time login retry stay dormant — methods sent +// on the prior SDK session are stranded with sentMessage=true, and the +// per-call _reconnectStopper from callLoginMethod (accounts_client.js:292) +// never runs. Force-logout flows then leave the user with stale +// credentials. +// +// Fire `reset` on every subsequent SDK 'connected' event: this drives +// _streamHandlers.onReset → _handleOutstandingMethodsOnReset (resends +// pending methods so message-actions / report-message tests don't wedge) +// AND _callOnReconnectAndSendAppropriateOutstandingMethods → DDP._reconnectHook +// callbacks → the _reconnectStopper that retries login with the latest +// stored token and calls makeClientLoggedOut on failure (so the +// account-manage-devices / admin-device-management / e2ee-key-reset +// force-logout tests recover). The first connect is handled by the +// queueMicrotask above; skip it here. The "method result but no methods +// outstanding" / "No callback invoker" warnings the resent blocks +// occasionally generate are caught by the bridge's async catch in +// ddpSdkCollectionBridge. +const sdk = getDdpSdk(); +let firstConnectHandled = false; +sdk.connection.on('connected', () => { + if (!firstConnectHandled) { + firstConnectHandled = true; + return; + } + try { + fire('reset'); + } catch (err) { + console.warn('[stubMeteorStream] reset on SDK reconnect failed', err); + } +}); + +// Belt-and-suspenders: when the underlying SDK socket disconnects, also reset +// `Accounts._lastLoginTokenWhenPolled` so the next `_pollStoredLoginToken` +// (whether triggered by the 3s polling timer or an external poke like a test's +// `loginByUserState`) is forced to compare against `null` and fire a fresh +// login if the stored token still exists. This covers the gap where neither +// `useForceLogout` (stream message lost in the broker race) nor +// `_reconnectStopper`'s `makeClientLoggedOut` ran — without this, a stored +// token equal to the cached `_lastLoginTokenWhenPolled` short-circuits the +// poller and the user sits with stale credentials until the next genuine +// token rotation. +sdk.connection.on('disconnected', () => { + try { + (Accounts as unknown as { _lastLoginTokenWhenPolled?: string | null })._lastLoginTokenWhenPolled = null; + } catch { + // ignore — we just want the poller to wake up next time + } +}); diff --git a/apps/meteor/client/meteor/overrides/subscribeViaSDK.ts b/apps/meteor/client/meteor/overrides/subscribeViaSDK.ts new file mode 100644 index 0000000000000..8d23633bf8ef8 --- /dev/null +++ b/apps/meteor/client/meteor/overrides/subscribeViaSDK.ts @@ -0,0 +1,64 @@ +import { Meteor } from 'meteor/meteor'; + +import { getDdpSdk } from '../../lib/sdk/ddpSdk'; + +/** + * Route Meteor.connection.subscribe through DDPSDK so the few direct + * publications that Accounts and Meteor core still open + * (loginServiceConfiguration, meteor_autoupdate_clientVersions, ...) + * ride our single WebSocket instead of Meteor's. + * + * The collection frames produced by those subscriptions are re-fed into + * Meteor.connection._streamHandlers by ddpSdkCollectionBridge, so + * Meteor's collection dispatch keeps working unchanged. + */ + +type SubscribeCallbacks = { + onReady?: () => void; + onError?: (err: Error) => void; + onStop?: (err?: Error) => void; +}; + +const extractCallbacks = (args: unknown[]): { params: unknown[]; callbacks: SubscribeCallbacks } => { + if (args.length === 0) return { params: [], callbacks: {} }; + + const last = args[args.length - 1]; + + if (typeof last === 'function') { + return { params: args.slice(0, -1), callbacks: { onReady: last as () => void } }; + } + + if ( + last !== null && + typeof last === 'object' && + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (typeof (last as any).onReady === 'function' || + typeof (last as any).onError === 'function' || + typeof (last as any).onStop === 'function') + ) { + return { params: args.slice(0, -1), callbacks: last as SubscribeCallbacks }; + } + + return { params: args, callbacks: {} }; +}; + +type MeteorSubscriptionHandle = Meteor.SubscriptionHandle; + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +(Meteor.connection as any).subscribe = ((name: string, ...rest: unknown[]): MeteorSubscriptionHandle => { + const { params, callbacks } = extractCallbacks(rest); + const subscription = getDdpSdk().client.subscribe(name, ...params); + + subscription + .ready() + .then(() => callbacks.onReady?.()) + .catch((err: Error) => callbacks.onError?.(err)); + + return { + stop: () => { + subscription.stop(); + callbacks.onStop?.(); + }, + ready: () => subscription.isReady, + } as MeteorSubscriptionHandle; +}) as Meteor.IMeteorConnection['subscribe']; diff --git a/apps/meteor/client/providers/ServerProvider.tsx b/apps/meteor/client/providers/ServerProvider.tsx index 6694c2352ebae..c66218742d910 100644 --- a/apps/meteor/client/providers/ServerProvider.tsx +++ b/apps/meteor/client/providers/ServerProvider.tsx @@ -11,6 +11,7 @@ import type { Method, PathFor, OperationParams, OperationResult, UrlParams, Path import type { UploadResult, ServerContextValue } from '@rocket.chat/ui-contexts'; import { ServerContext } from '@rocket.chat/ui-contexts'; import { Meteor } from 'meteor/meteor'; +import { Tracker } from 'meteor/tracker'; import { compile } from 'path-to-regexp'; import { useMemo, type ReactNode } from 'react'; @@ -18,11 +19,12 @@ import { sdk } from '../../app/utils/client/lib/SDKClient'; import { Info as info } from '../../app/utils/rocketchat.info'; import { useReactiveValue } from '../hooks/useReactiveValue'; import { absoluteUrl } from '../lib/absoluteUrl'; +import { ensureConnectedAndAuthenticated, getDdpSdk } from '../lib/sdk/ddpSdk'; const callMethod = ( methodName: MethodName, ...args: ServerMethodParameters -): Promise> => Meteor.callAsync(methodName, ...args); +): Promise> => sdk.call(methodName, ...(args as any)) as Promise>; const callEndpoint = ({ method, @@ -73,11 +75,63 @@ const getStream = const writeStream = >(streamName: N, streamKey: K, ...args: StreamerCallbackArgs) => sdk.publish(streamName, [streamKey, ...args]); -const disconnect = () => Meteor.disconnect(); +const disconnect = () => { + Meteor.disconnect(); + try { + getDdpSdk().connection.close(); + } catch { + // no-op — DDPSDK may not be connected yet + } +}; + +const reconnect = () => { + Meteor.reconnect(); + // ensureConnectedAndAuthenticated handles both 'connect' and loginWithToken, + // so reconnecting here also re-establishes the DDPSDK session with the same + // token Meteor resumes with. + void ensureConnectedAndAuthenticated(); +}; -const reconnect = () => Meteor.reconnect(); +// Combine Meteor's DDP status with our DDPSDK's connection status so the +// ConnectionStatusBar / idle-connection hooks reflect the worst-case of both +// transports: if either socket is down, UI shows disconnected. Meteor.status() +// is Tracker-reactive; bridge DDPSDK's connection events into a local +// Tracker.Dependency so the same useReactiveValue autorun re-fires on either +// transport's transitions. +const ddpSdkStatusDep = new Tracker.Dependency(); +getDdpSdk().connection.on('connection', () => ddpSdkStatusDep.changed()); + +type CombinedStatus = ReturnType; + +// DDPSDK is now the primary transport for every method/subscription in +// the client, so its connection state drives the UI. Meteor.connection +// still exists as a legacy anchor for Accounts internals; we only fall +// back to its status fields when DDPSDK hasn't reported a retry yet. +const sdkStatusToMeteor = (sdkStatus: string, meteor: CombinedStatus): CombinedStatus => { + const retry = { retryCount: meteor.retryCount, retryTime: meteor.retryTime }; + + switch (sdkStatus) { + case 'connected': + return { status: 'connected', connected: true, ...retry }; + case 'connecting': + return { status: 'connecting', connected: false, ...retry }; + case 'reconnecting': + return { status: 'connecting', connected: false, ...retry }; + case 'failed': + return { status: 'failed', connected: false, ...retry }; + case 'closed': + case 'disconnected': + return { status: 'waiting', connected: false, ...retry }; + case 'idle': + default: + return { status: 'offline', connected: false, ...retry }; + } +}; -const getStatus = () => ({ ...Meteor.status() }); +const getStatus = () => { + ddpSdkStatusDep.depend(); + return sdkStatusToMeteor(getDdpSdk().connection.status, Meteor.status()); +}; type ServerProviderProps = { children?: ReactNode }; diff --git a/apps/meteor/client/startup/startup.ts b/apps/meteor/client/startup/startup.ts index 10bcbd8171cf9..8a70a3ff2e201 100644 --- a/apps/meteor/client/startup/startup.ts +++ b/apps/meteor/client/startup/startup.ts @@ -4,6 +4,7 @@ import { Accounts } from 'meteor/accounts-base'; import 'highlight.js/styles/github.css'; import { sdk } from '../../app/utils/client/lib/SDKClient'; import { onLoggedIn } from '../lib/loggedIn'; +import { ensureConnectedAndAuthenticated } from '../lib/sdk/ddpSdk'; import { userIdStore } from '../lib/user'; import { removeLocalUserData, synchronizeUserData } from '../lib/userData'; import { fireGlobalEvent } from '../lib/utils/fireGlobalEvent'; @@ -17,10 +18,19 @@ const emitStatusChange = (next: UserStatus | undefined) => { fireGlobalEvent('status-changed', status); }; -onLoggedIn(async () => { - const uid = userIdStore.getState(); - if (!uid) return; - +const runUserDataSync = async (uid: string) => { + // synchronizeUserData opens a `stream-notify-user/${uid}/userData` sub + // over DDPSDK. The server rejects that sub with "not-allowed" until + // DDPSDK has completed loginWithToken on its own socket. Both + // runUserDataSync and ensureConnectedAndAuthenticated are subscribers + // of userIdStore, so without sequencing the sub races the auth and + // hits the rejection on every re-login. Await the SDK auth here so + // the sub fires authenticated. + try { + await ensureConnectedAndAuthenticated(); + } catch { + // non-fatal: sdk.stream queues until DDPSDK eventually auths + } const user = await synchronizeUserData(uid); if (!user) return; @@ -30,8 +40,51 @@ onLoggedIn(async () => { } emitStatusChange(user.status); +}; + +// Both `onLoggedIn` (from accounts-base) and `userIdStore.subscribe` +// (belt-and-braces in case the loggedInAndDataReadyCallback's user-await +// autorun gets wedged on logout → fresh login) fire for the same uid on a +// successful login. runUserDataSync calls userSetUtcOffset which is +// rate-limited on CI/prod, so without a shared guard the second call +// returns 400 too-many-requests and downstream REST calls (sessions/list +// etc.) start coming back 401 because the rate limiter throttles auth +// checks for the rest of the window. Use a single guarded sync gate, but +// reset it on failure so SAML/oauth/post-logout flows can retry — those +// flows depend on a second runUserDataSync after the SDK socket finishes +// authenticating, otherwise the userData stream subscription comes back +// nosub and synchronizeUserData throws, leaving useUserDataSyncReady +// false and the page stuck on PageLoading. +let lastSyncedUid: string | undefined; +const syncOnce = (uid: string | undefined): void => { + // Reset on logout transitions so a subsequent re-login (same uid or different) + // runs a fresh sync. Force-logout via the SDK loginWithToken wrap clears + // creds via Accounts._unstoreLoginToken() + setUserId(null), which does NOT + // fire Accounts.onLogout — so without this branch, lastSyncedUid stays set, + // the next login is deduped, runUserDataSync is skipped, and + // useUserDataSyncReady stays false (page wedged on PageLoading). + if (!uid) { + lastSyncedUid = undefined; + return; + } + if (uid === lastSyncedUid) return; + lastSyncedUid = uid; + void runUserDataSync(uid).catch((err) => { + console.warn('[startup] runUserDataSync failed; clearing dedup to allow a retry', err); + if (lastSyncedUid === uid) lastSyncedUid = undefined; + }); +}; + +onLoggedIn(() => { + syncOnce(userIdStore.getState()); }); +userIdStore.subscribe((uid) => { + syncOnce(uid); +}); + +syncOnce(userIdStore.getState()); + Users.use.subscribe(() => { const uid = userIdStore.getState(); if (!uid) return; @@ -43,6 +96,7 @@ Users.use.subscribe(() => { Accounts.onLogout(() => { removeLocalUserData(); status = undefined; + lastSyncedUid = undefined; }); // Session-resume failure (expired stored token on page load): Meteor has already diff --git a/apps/meteor/client/views/root/hooks/loggedIn/useForceLogout.ts b/apps/meteor/client/views/root/hooks/loggedIn/useForceLogout.ts index 07390ddb0d4aa..bd8fa327cfc16 100644 --- a/apps/meteor/client/views/root/hooks/loggedIn/useForceLogout.ts +++ b/apps/meteor/client/views/root/hooks/loggedIn/useForceLogout.ts @@ -1,4 +1,6 @@ import { useStream, useSessionDispatch } from '@rocket.chat/ui-contexts'; +import { Accounts } from 'meteor/accounts-base'; +import { Meteor } from 'meteor/meteor'; import { useEffect } from 'react'; export const useForceLogout = (userId: string) => { @@ -10,6 +12,25 @@ export const useForceLogout = (userId: string) => { const unsubscribe = getNotifyUserStream(`${userId}/force_logout`, () => { setForceLogout(true); + + // Trigger an actual local logout. With the SDK socket as the + // transport, the legacy "server closes the WS, client reconnects, + // loginWithToken fails, accounts-base bounces to Login" chain no + // longer fires reliably: DDPSDK auto-retries loginWithToken on + // every `connected` and swallows the rejection with `void`, so + // the navbar stays on Home with stale credentials. Wipe Meteor's + // stored login token + userId here so the router falls back to + // /login. + try { + Accounts._unstoreLoginToken(); + } catch { + // ignore + } + try { + (Meteor.connection as unknown as { setUserId: (uid: string | null) => void }).setUserId(null); + } catch { + // ignore + } }); return unsubscribe; diff --git a/apps/meteor/package.json b/apps/meteor/package.json index 6d7732d83c0fb..73192ad7a937c 100644 --- a/apps/meteor/package.json +++ b/apps/meteor/package.json @@ -102,6 +102,7 @@ "@rocket.chat/core-typings": "workspace:^", "@rocket.chat/cron": "workspace:^", "@rocket.chat/css-in-js": "^0.32.0", + "@rocket.chat/ddp-client": "workspace:^", "@rocket.chat/emitter": "^0.32.0", "@rocket.chat/favicon": "workspace:^", "@rocket.chat/federation-matrix": "workspace:^", diff --git a/apps/meteor/tests/e2e/page-objects/fragments/home-content.ts b/apps/meteor/tests/e2e/page-objects/fragments/home-content.ts index 761520ad34e0a..d6dd034ae616d 100644 --- a/apps/meteor/tests/e2e/page-objects/fragments/home-content.ts +++ b/apps/meteor/tests/e2e/page-objects/fragments/home-content.ts @@ -125,24 +125,27 @@ export class HomeContent { await expect(this.composer.inputMessage).toBeEnabled(); await this.composer.inputMessage.fill(text); - if (enforce) { - const responsePromise = this.page.waitForResponse( - (response) => - /api\/v1\/method.call\/sendMessage/.test(response.url()) && response.status() === 200 && response.request().method() === 'POST', - ); - - await this.composer.btnSend.click(); - - const response = await (await responsePromise).json(); - - const mid = JSON.parse(response.message).result._id; - const messageLocator = this.getMessageById(mid); - - await expect(messageLocator).toBeVisible(); - await expect(messageLocator).not.toHaveClass('rcx-message--pending'); - } else { + if (!enforce) { await this.composer.btnSend.click(); + return; } + + // Wait for the message to settle in the DOM. Transport-agnostic — the + // message may be sent via REST (`/api/v1/method.call/sendMessage`) or + // via DDP/WS depending on whether DDPSDK is the active dispatcher; + // either way Meteor's optimistic insert renders the new list item + // before the server confirms, and the `rcx-message--pending` class + // drops once the server result lands. + const before = await this.messageListItems.count(); + + await this.composer.btnSend.click(); + + // Use `>=` rather than `==` because some flows (e.g. just-created + // encrypted channels) drop additional list items in alongside the + // user's send (other in-flight messages, decryption-status items), + // so an exact count is racy. + await expect.poll(() => this.messageListItems.count(), { timeout: 10_000 }).toBeGreaterThanOrEqual(before + 1); + await expect(this.lastUserMessage).not.toHaveClass(/rcx-message--pending/); } async dispatchSlashCommand(text: string): Promise { diff --git a/ee/apps/ddp-streamer/src/DDPStreamer.ts b/ee/apps/ddp-streamer/src/DDPStreamer.ts index c9e4ffedbe490..ccf5f2b1b8e49 100644 --- a/ee/apps/ddp-streamer/src/DDPStreamer.ts +++ b/ee/apps/ddp-streamer/src/DDPStreamer.ts @@ -60,7 +60,23 @@ export class DDPStreamer extends ServiceClass { return; } if (client?.userId === uid) { - ws.terminate(); + // Graceful close: lets the WS lib flush queued frames (including + // the `notify-user//force_logout` stream message that the + // monolith listener at apps/meteor/server/modules/listeners/listeners.module.ts:49 + // just enqueued) before the socket goes down. Previously this was + // `ws.terminate()`, which sends a TCP RST immediately and drops + // the queued frames — clients depending on the stream message + // (useForceLogout hook → Accounts._unstoreLoginToken + setUserId(null)) + // then never see the cleanup, leaving stale credentials in + // localStorage. Falls back to terminate() after a short grace + // period for unresponsive sockets. + ws.close(); + const guard = setTimeout(() => { + if (ws.readyState !== ws.CLOSED) { + ws.terminate(); + } + }, 5000); + ws.once('close', () => clearTimeout(guard)); } }); }); diff --git a/packages/ddp-client/__tests__/Connection.spec.ts b/packages/ddp-client/__tests__/Connection.spec.ts index 7ef9248abaa5b..10d0d24d2581f 100644 --- a/packages/ddp-client/__tests__/Connection.spec.ts +++ b/packages/ddp-client/__tests__/Connection.spec.ts @@ -139,20 +139,111 @@ it('should queue messages if the connection is not ready', async () => { await handleMethod(server, 'method', ['arg1', 'arg2'], '1'); }); -it('should throw an error if a reconnect is called while a connection is in progress', async () => { +it('should be idempotent if reconnect is called while already connected', async () => { const client = new MinimalDDPClient(); const connection = ConnectionImpl.create('ws://localhost:1234', globalThis.WebSocket, client, { retryCount: 0, retryTime: 0 }); await handleConnection(server, connection.connect()); - await expect(connection.reconnect()).rejects.toThrow('Connection in progress'); + // Previous behavior was to throw "Connection in progress" — the consumer's + // `void this.reconnect()` paths (notably the ws.onclose retry timer) + // surfaced that as an unhandled rejection / pageError. Now a redundant + // reconnect is just a no-op resolving with the current state. + await expect(connection.reconnect()).resolves.toBe(true); + expect(connection.status).toBe('connected'); }); -it('should throw an error if a connect is called while a connection is in progress', async () => { +it('should be idempotent if connect is called while already connected', async () => { const client = new MinimalDDPClient(); const connection = ConnectionImpl.create('ws://localhost:1234', globalThis.WebSocket, client, { retryCount: 0, retryTime: 0 }); await handleConnection(server, connection.connect()); - await expect(connection.connect()).rejects.toThrow('Connection in progress'); + await expect(connection.connect()).resolves.toBe(true); + expect(connection.status).toBe('connected'); +}); + +it('should not surface the retry timer rejection when an external connect won the race', async () => { + // Regression: ws.onclose schedules a `void this.reconnect()` timer; if the + // consumer (e.g. ddpSdk.ts startConnect) opens a fresh socket before that + // timer fires, the timer used to reject with "Connection in progress" and, + // because of the leading `void`, the rejection became an unhandled + // rejection on the page. The timer must now no-op silently when the + // connection has already been re-established. + const client = new MinimalDDPClient(); + const connection = ConnectionImpl.create('ws://localhost:1234', WebSocket, client, { retryCount: 1, retryTime: 100 }); + + await handleConnection(server, connection.connect()); + expect(connection.status).toBe('connected'); + + jest.useFakeTimers(); + + server.close(); + WS.clean(); + server = new WS('ws://localhost:1234/websocket'); + + expect(connection.status).toBe('disconnected'); + + // Track unhandled rejections on the timer's promise. + const unhandled = jest.fn(); + process.on('unhandledRejection', unhandled); + + // External code opens a new connection BEFORE the retry timer fires. + const externalConnect = handleConnection(server, connection.connect()); + + // Run the timer. + await jest.advanceTimersByTimeAsync(200); + await externalConnect; + + // Drain any microtasks the timer might have queued. + await Promise.resolve(); + await Promise.resolve(); + + expect(connection.status).toBe('connected'); + expect(unhandled).not.toHaveBeenCalled(); + process.off('unhandledRejection', unhandled); + jest.useRealTimers(); +}); + +it('should ignore a stale ws.onclose that fires after the socket has been replaced', async () => { + // Regression: ws.onclose handlers were closed over the original ws but + // mutated `this.status`/`this.retryCount` unconditionally. If a late close + // event from an old socket arrives after a new socket is connected, the + // handler would flip status back to 'disconnected' and schedule another + // retry timer. + const client = new MinimalDDPClient(); + const connection = ConnectionImpl.create('ws://localhost:1234', WebSocket, client, { retryCount: 1, retryTime: 100 }); + + await handleConnection(server, connection.connect()); + const firstWs = (connection as unknown as { ws: WebSocket }).ws; + expect(connection.status).toBe('connected'); + + jest.useFakeTimers(); + server.close(); + WS.clean(); + server = new WS('ws://localhost:1234/websocket'); + + expect(connection.status).toBe('disconnected'); + + await handleConnection( + server, + jest.advanceTimersByTimeAsync(200), + new Promise((resolve) => connection.once('reconnecting', () => resolve(undefined))), + new Promise((resolve) => connection.once('connection', (data) => resolve(data))), + ); + + expect(connection.status).toBe('connected'); + jest.useRealTimers(); + const secondWs = (connection as unknown as { ws: WebSocket }).ws; + expect(secondWs).not.toBe(firstWs); + + const statusBefore = connection.status; + const retryBefore = (connection as unknown as { retryCount: number }).retryCount; + + // Synthesize a late `close` event on the original socket — the handler + // must short-circuit because `this.ws !== ws` for the closed-over ws. + (firstWs as unknown as { onclose?: () => void }).onclose?.(); + + expect(connection.status).toBe(statusBefore); + expect((connection as unknown as { retryCount: number }).retryCount).toBe(retryBefore); }); diff --git a/packages/ddp-client/__tests__/DDPDispatcher.spec.ts b/packages/ddp-client/__tests__/DDPDispatcher.spec.ts index 7dfde76260ae4..ef61bedd654b6 100644 --- a/packages/ddp-client/__tests__/DDPDispatcher.spec.ts +++ b/packages/ddp-client/__tests__/DDPDispatcher.spec.ts @@ -72,6 +72,34 @@ it('should send outstanding blocks if there is no block waiting and item is adde expect(fn).toHaveBeenCalledTimes(1); }); +it('emits non-method payloads immediately, even when a wait block is at the head', () => { + // Regression: a connect frame dispatched while a wait `login` method is + // queued must still reach the server. Otherwise the DDP handshake never + // completes and the socket wedges open but unconnected. + const fn = jest.fn(); + const ddpDispatcher = new DDPDispatcher(); + ddpDispatcher.on('send', fn); + + const login = ddp.call('login'); + ddpDispatcher.dispatch(login, { wait: true }); + expect(fn).toHaveBeenCalledTimes(1); + expect(fn).toHaveBeenNthCalledWith(1, login); + + const connectPayload = { msg: 'connect' as const, version: '1', support: ['1'] }; + ddpDispatcher.dispatch(connectPayload); + expect(fn).toHaveBeenCalledTimes(2); + expect(fn).toHaveBeenNthCalledWith(2, connectPayload); + + const subPayload = { msg: 'sub' as const, id: 'a', name: 'foo', params: [] }; + ddpDispatcher.dispatch(subPayload); + expect(fn).toHaveBeenCalledTimes(3); + expect(fn).toHaveBeenNthCalledWith(3, subPayload); + + // Wait block remains pending — only the wait method is queued, the + // non-method frames bypassed it. + expect(ddpDispatcher.queue).toEqual([{ wait: true, items: [login] }]); +}); + it('should send the next blocks if the outstanding block was completed', () => { const fn = jest.fn(); diff --git a/packages/ddp-client/src/Connection.ts b/packages/ddp-client/src/Connection.ts index 0702103bd1276..70dd959d45c4d 100644 --- a/packages/ddp-client/src/Connection.ts +++ b/packages/ddp-client/src/Connection.ts @@ -109,8 +109,17 @@ export class ConnectionImpl } reconnect(): Promise { + // Idempotent — if another caller already started (or finished) a connection + // since this reconnect was scheduled, we don't need to do anything. The + // retry timer enqueued by `ws.onclose` runs with no awareness of any + // concurrent `connect()` (e.g. the consumer's own bootstrap or + // resume-on-userId-change path), so without this guard a late timer + // rejected with "Connection in progress" — and because the timer fires + // from `void this.reconnect()` the rejection became an unhandled + // rejection at the page level. if (this.status === 'connecting' || this.status === 'connected') { - return Promise.reject(new Error('Connection in progress')); + clearTimeout(this.retryOptions.retryTimer); + return Promise.resolve(true); } clearTimeout(this.retryOptions.retryTimer); @@ -123,8 +132,14 @@ export class ConnectionImpl } connect() { + // Same idempotency guard as `reconnect()` — multiple call sites + // (`reconnect()`, ws.onclose retry timer, external `startConnect`) can + // race; rejecting forced every caller to wrap in `.catch(() => {})` + // just to silence noise, and the internal timer's `void this.reconnect()` + // path didn't have a catch at all. if (this.status === 'connecting' || this.status === 'connected') { - return Promise.reject(new Error('Connection in progress')); + clearTimeout(this.retryOptions.retryTimer); + return Promise.resolve(true); } this.status = 'connecting'; @@ -183,6 +198,15 @@ export class ConnectionImpl }; ws.onclose = () => { + // If a newer ws has already taken over (this socket was closed + // after `connect()` opened a replacement), ignore the late + // onclose. Otherwise its handler would clobber `this.status` and + // `retryCount`, and could even schedule a redundant retry timer + // that fires while the new socket is healthy — observed as the + // "Connection in progress" pageError racing on every reconnect. + if (this.ws !== ws) { + return; + } clearTimeout(this.retryOptions.retryTimer); if (this.status === 'closed') { return; @@ -198,6 +222,16 @@ export class ConnectionImpl this.retryCount += 1; this.retryOptions.retryTimer = setTimeout(() => { + // Re-check the status when the timer actually fires. If the + // consumer bootstrapped a fresh `connect()` in the meantime + // (status flipped from 'disconnected' to 'connecting' or + // 'connected'), there's nothing for us to do. Without this + // the timer would call `this.reconnect()`, which (pre-this + // patch) rejected with "Connection in progress" and surfaced + // as an unhandled rejection. + if (this.status === 'connecting' || this.status === 'connected' || this.status === 'closed') { + return; + } void this.reconnect(); }, this.retryOptions.retryTime * this.retryCount); }; diff --git a/packages/ddp-client/src/DDPDispatcher.ts b/packages/ddp-client/src/DDPDispatcher.ts index b24d1d12a0be9..c2c61f33df201 100644 --- a/packages/ddp-client/src/DDPDispatcher.ts +++ b/packages/ddp-client/src/DDPDispatcher.ts @@ -3,6 +3,7 @@ */ import { MinimalDDPClient } from './MinimalDDPClient'; +import type { OutgoingPayload } from './types/OutgoingPayload'; import type { MethodPayload } from './types/methodsPayloads'; type Blocks = { @@ -15,12 +16,25 @@ type Queue = Blocks[]; export class DDPDispatcher extends MinimalDDPClient { queue: Queue = []; - override dispatch(msg: MethodPayload, options?: { wait?: boolean }) { + override dispatch(payload: OutgoingPayload, options?: { wait?: boolean }) { + // Only method payloads participate in the wait/queue serialization that + // implements Meteor's wait-method semantics. Protocol-level frames + // (connect, sub, unsub, ping, pong) must be delivered immediately: + // queueing a `connect` frame behind a wait `login` block, for example, + // wedges the DDP handshake — the socket opens but `connect` never + // reaches the server, so the server never replies `connected` and the + // session never establishes. + if (payload.msg !== 'method') { + this.emit('send', payload); + return; + } + if (options?.wait) { - this.wait(msg); + this.wait(payload); return; } - this.pushItem(msg); + + this.pushItem(payload); } wait(block: MethodPayload) { diff --git a/yarn.lock b/yarn.lock index 55dc6caa2167e..36302043c9ec6 100644 --- a/yarn.lock +++ b/yarn.lock @@ -9900,6 +9900,7 @@ __metadata: "@rocket.chat/core-typings": "workspace:^" "@rocket.chat/cron": "workspace:^" "@rocket.chat/css-in-js": "npm:^0.32.0" + "@rocket.chat/ddp-client": "workspace:^" "@rocket.chat/desktop-api": "workspace:~" "@rocket.chat/emitter": "npm:^0.32.0" "@rocket.chat/favicon": "workspace:^"