Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
57bcb94
fix(ddp-client): never queue non-method frames behind a wait block
ggazzo Apr 28, 2026
eee75f9
fix(useFormatDate): default Message_DateFormat to 'LL' to avoid date-…
ggazzo Apr 28, 2026
10fc544
feat(sdk): scaffold @rocket.chat/ddp-client alongside Meteor.connection
ggazzo Apr 28, 2026
38afd89
feat(sdk): route Meteor.connection method calls and subs through DDPSDK
ggazzo Apr 28, 2026
43a8b7b
test(e2e): make HomeContent.sendMessage transport-agnostic
ggazzo Apr 28, 2026
448c9ca
fix(sdk): route methods via REST, login via DDPSDK, surface auth errors
ggazzo Apr 28, 2026
c6b337a
fix(sdk): defer DDPSDK stream subscriptions until the socket is authe…
ggazzo Apr 29, 2026
e802bab
fix(sdk): bypass login resume so it routes to ddp-streamer in microse…
ggazzo Apr 29, 2026
b8fef3e
fix(sdk): never route fresh logins (SAML/password/OAuth) through DDPSDK
ggazzo Apr 29, 2026
a037265
feat(sdk): prototype stub Meteor stream to consolidate WS transports
ggazzo Apr 29, 2026
b3e3644
fix(sdk): filter SDK-internal ids in stream bridge; adopt login result
ggazzo Apr 29, 2026
a54cb9f
fix(sdk): dedup login on boot to prevent duplicate Presence connections
ggazzo Apr 29, 2026
b3b3f87
chore(sdk): satisfy lint on stub stream + bridge
ggazzo Apr 29, 2026
629c09e
fix(sdk): dedup runUserDataSync between onLoggedIn and userIdStore su…
ggazzo Apr 29, 2026
1754343
fix(sdk): clear local credentials when SDK auto-relogin fails on reco…
ggazzo Apr 29, 2026
83183b0
fix(sdk): share inflight-login lock between stub-routed and SDK paths
ggazzo Apr 29, 2026
d31efca
fix(sdk): move force-logout cleanup to useForceLogout, revert loginWi…
ggazzo Apr 30, 2026
2e721a2
fix(sdk): bound Accounts.loggingIn gate + retry on syncOnce failure
ggazzo Apr 30, 2026
a940e3f
fix(sdk): re-introduce loginWithToken wrap with stricter guards
ggazzo Apr 30, 2026
26cebbe
fix(sdk): defer loginWithToken auth-error cleanup to avoid racing fre…
ggazzo Apr 30, 2026
841c0d2
fix(sdk): replace broken Accounts.loggingIn gate with sdk.account poll
ggazzo Apr 30, 2026
310a44f
fix(sdk): reset syncOnce dedup on uid transition to undefined
ggazzo Apr 30, 2026
f6cd3b7
fix(sdk): only bridge result/updated frames when Meteor still has the…
ggazzo Apr 30, 2026
76c79d1
fix(sdk): catch async throws from bridged frames so the queue keeps d…
ggazzo Apr 30, 2026
261c266
fix(sdk): retry login through Meteor instead of clearing creds on aut…
ggazzo Apr 30, 2026
db2a823
chore(sdk): instrument loginWithToken auth-error path to diagnose CI …
ggazzo Apr 30, 2026
7c5512d
fix(sdk): re-emit reset on SDK reconnect so Meteor's onReconnect runs
ggazzo Apr 30, 2026
3048858
fix(sdk): invoke DDP.onReconnect callbacks directly, skip full onReset
ggazzo May 1, 2026
39bf2fc
fix(sdk): revert to full fire('reset') on SDK reconnect
ggazzo May 1, 2026
5e5ec97
chore(sdk): instrument SDK reconnect + Accounts._pollStoredLoginToken
ggazzo May 1, 2026
64f0445
fix(ddp-streamer): use graceful ws.close() on user.forceLogout
ggazzo May 1, 2026
8861751
fix(sdk): resend outstanding methods on SDK reconnect, skip _reconnec…
ggazzo May 1, 2026
42a6987
fix(sdk): revert to full fire('reset') — _reconnectStopper is needed
ggazzo May 1, 2026
7742877
fix(ddp-client): make Connection.connect/reconnect idempotent
ggazzo May 1, 2026
135bb4b
fix(sdk): reset Accounts._lastLoginTokenWhenPolled on SDK disconnect
ggazzo May 1, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/ddp-client-idempotent-reconnect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@rocket.chat/ddp-client": patch
---

Make `Connection.connect()` and `Connection.reconnect()` idempotent. Previously they rejected with `Error('Connection in progress')` when called while a connection was already in flight or established. Because the internal retry timer (`ws.onclose` → `setTimeout(() => void this.reconnect(), …)`) fires with no `.catch`, that rejection surfaced as an unhandled rejection at the page level whenever an external caller (e.g. an SDK consumer's bootstrap path) won the race against the timer. The timer now no-ops when the connection has already been re-established, and a stale `ws.onclose` from a replaced socket no longer clobbers the new socket's status or schedules a redundant retry.
9 changes: 9 additions & 0 deletions apps/meteor/app/notifications/client/lib/Presence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,21 @@ import { UserStatus } from '@rocket.chat/core-typings';
import { Meteor } from 'meteor/meteor';

import { Presence } from '../../../../client/lib/presence';
import { getDdpSdk } from '../../../../client/lib/sdk/ddpSdk';
import { createDdpSdkStreamerAdapter } from '../../../../client/lib/sdk/streamerAdapter';
import { streamerCentral } from '../../../../client/lib/streamer';

// TODO implement API on Streamer to be able to listen to all streamed data
// this is a hacky way to listen to all streamed data from user-presence Streamer

// Register the presence streamer on BOTH transports. The subscribe call in
// client/lib/presence.ts routes through DDPSDK when it's ready and falls back
// to Meteor otherwise, so the corresponding messages can arrive on either WS.
// StreamerCentral uses a per-connection `hasMeteorStreamerEventListeners` flag,
// so calling setupDdpConnection twice with distinct connection objects
// installs both listeners without duplicating within the same transport.
streamerCentral.getStreamer('user-presence', { ddpConnection: Meteor.connection });
streamerCentral.setupDdpConnection('user-presence', createDdpSdkStreamerAdapter(getDdpSdk()));

type args = [username: string, statusChanged?: UserStatus, statusText?: string];

Expand Down
185 changes: 84 additions & 101 deletions apps/meteor/app/utils/client/lib/SDKClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ import type { RestClientInterface } from '@rocket.chat/api-client';
import type { SDK, ClientStream, StreamKeys, StreamNames, StreamerCallbackArgs, ServerMethods } from '@rocket.chat/ddp-client';
import { Emitter } from '@rocket.chat/emitter';
import { Accounts } from 'meteor/accounts-base';
import { DDPCommon } from 'meteor/ddp-common';
import { Meteor } from 'meteor/meteor';

import { APIClient } from './RestApiClient';
import { ensureConnectedAndAuthenticated, getDdpSdk } from '../../../../client/lib/sdk/ddpSdk';

declare module '@rocket.chat/ddp-client' {
// eslint-disable-next-line @typescript-eslint/naming-convention
Expand All @@ -19,30 +19,6 @@ declare module '@rocket.chat/ddp-client' {
}
}

const isChangedCollectionPayload = (
msg: any,
): msg is { msg: 'changed'; collection: string; fields: { eventName: string; args: unknown[] } } => {
if (typeof msg !== 'object' && (msg !== null || msg !== undefined)) {
return false;
}
if (msg.msg !== 'changed') {
return false;
}
if (typeof msg.collection !== 'string') {
return false;
}
if (typeof msg.fields !== 'object' && (msg.fields !== null || msg.fields !== undefined)) {
return false;
}
if (typeof msg.fields.eventName !== 'string') {
return false;
}
if (!Array.isArray(msg.fields.args)) {
return false;
}
return true;
};

type EventMap<N extends StreamNames = StreamNames, K extends StreamKeys<N> = StreamKeys<N>> = {
[key in `stream-${N}/${K}`]: StreamerCallbackArgs<N, K>;
};
Expand All @@ -57,83 +33,101 @@ type StreamMapValue = {
unsubList: Set<() => void>;
};

const createNewMeteorStream = (streamName: StreamNames, key: StreamKeys<StreamNames>, args: unknown[]): StreamMapValue => {
const createNewDdpSdkStream = (
streamProxy: Emitter<EventMap>,
streamName: StreamNames,
key: StreamKeys<StreamNames>,
args: unknown[],
): StreamMapValue => {
const ee = new Emitter<{
ready: [error: any] | [undefined, any];
error: [error: any];
stop: undefined;
}>();
const meta = {
ready: false,
};
const meta = { ready: false };

// Defer the actual `subscribe` until DDPSDK is authenticated. Without this,
// stream subscriptions fired immediately after re-login (e.g. the
// SubscriptionsCachedStore's `notify-user/<uid>/subscriptions-changed`
// listener that re-arms via onLoggedIn) hit the SDK socket while it's
// still anonymous — server rejects with `not-allowed`/`nosub`, the
// stream's `ready` promise emits an error, and the cached store never
// receives subsequent server events. The visible failure: an agent that
// just took a livechat chat post-relogin sees the chat work but the
// "Move to the queue" button never appears, because the new subscription
// the server creates for that agent is never replicated to the client's
// Subscriptions store, and pseudoRoom (= {...sub, ...room}) ends up with
// no `u` for the canMoveQueue check.
let subscription: ReturnType<ReturnType<typeof getDdpSdk>['client']['subscribe']> | undefined;
let offCollection: (() => void) | undefined;
let stopped = false;

void ensureConnectedAndAuthenticated()
.catch(() => undefined)
.then(() => {
if (stopped) return;
const sdk = getDdpSdk();
subscription = sdk.client.subscribe(`stream-${streamName}`, key, { useCollection: false, args });

subscription
.ready()
.then(() => {
if (stopped) return;
meta.ready = true;
ee.emit('ready', [undefined, { msg: 'ready', subs: [subscription!.id] }]);
})
.catch((err) => {
if (stopped) return;
ee.emit('ready', [err]);
ee.emit('error', err);
});

const sub = Meteor.connection.subscribe(
`stream-${streamName}`,
key,
{ useCollection: false, args },
{
onReady: (args: any) => {
meta.ready = true;
ee.emit('ready', [undefined, args]);
},
onError: (err: any) => {
ee.emit('ready', [err]);
ee.emit('error', err);
},
onStop: () => {
ee.emit('stop');
},
},
);
offCollection = sdk.client.onCollection(`stream-${streamName}`, (data: any) => {
if (data?.msg !== 'changed') return;
if (data.collection !== `stream-${streamName}`) return;
if (data.fields?.eventName !== key) return;
streamProxy.emit(`stream-${streamName}/${key}` as keyof EventMap, data.fields.args);
});
});

const onChange: ReturnType<ClientStream['subscribe']>['onChange'] = (cb) => {
if (meta.ready) {
cb({
msg: 'ready',

subs: [],
});
cb({ msg: 'ready', subs: [] });
return;
}
ee.once('ready', ([error, result]) => {
if (error) {
cb({
msg: 'nosub',

id: '',
error,
});
cb({ msg: 'nosub', id: '', error });
return;
}

cb(result);
});
};

const ready = () => {
if (meta.ready) {
return Promise.resolve();
}
return new Promise<void>((resolve, reject) => {
ee.once('ready', ([err]) => {
if (err) {
reject(err);
return;
}
resolve();
});
});
};

return {
stop: sub.stop,
stop: () => {
// Mirror Meteor's subscription semantics: explicit stop() does not fire the
// 'stop' event (onStop is reserved for server-initiated closures).
// Emitting it here would recurse through the onStop handler that
// createStreamManager registers, which itself iterates the unsubList.
stopped = true;
offCollection?.();
subscription?.stop();
},
onChange,
ready,
onError: (cb: (...args: any[]) => void) =>
ee.once('error', (error) => {
cb(error);
}),

ready: () => {
if (meta.ready) return Promise.resolve();
return new Promise<void>((resolve, reject) => {
ee.once('ready', ([err]) => {
if (err) {
reject(err);
return;
}
resolve();
});
});
},
onError: (cb: (...args: any[]) => void) => ee.once('error', (error) => cb(error)),
onStop: (cb: () => void) => ee.once('stop', cb),
get isReady() {
return meta.ready;
Expand All @@ -157,14 +151,6 @@ const createStreamManager = () => {
});
});

Meteor.connection._stream.on('message', (rawMsg: string) => {
const msg = DDPCommon.parseDDP(rawMsg);
if (!isChangedCollectionPayload(msg)) {
return;
}
streamProxy.emit(`${msg.collection}/${msg.fields.eventName}` as any, msg.fields.args as any);
});

const stream: SDK['stream'] = <N extends StreamNames, K extends StreamKeys<N>>(
name: N,
data: [key: K, ...args: unknown[]],
Expand All @@ -186,7 +172,8 @@ const createStreamManager = () => {

streamProxy.on(eventLiteral, proxyCallback);

const stream = streams.get(eventLiteral) || createNewMeteorStream(name, key, args);
const stream =
streams.get(eventLiteral) || createNewDdpSdkStream(streamProxy, name as StreamNames, key as StreamKeys<StreamNames>, args);

const stop = (): void => {
streamProxy.off(eventLiteral, proxyCallback);
Expand Down Expand Up @@ -242,29 +229,25 @@ export const createSDK = (rest: RestClientInterface) => {
const { stream, stopAll } = createStreamManager();

const publish = (name: string, args: unknown[]) => {
Meteor.call(`stream-${name}`, ...args);
// DDPSDK queues outbound frames until the WebSocket handshake completes,
// so there's no need to gate on an isReady flag here.
void getDdpSdk().client.callAsync(`stream-${name}`, ...args);
};

// Methods route through Meteor.callAsync which goes through
// Meteor.connection._send → ddpOverREST → REST. Going via the DDPSDK
// socket directly would bypass that wrapper and hit the same
// authenticated-but-empty-result race the cached stores caught.
const call = <T extends keyof ServerMethods>(method: T, ...args: Parameters<ServerMethods[T]>): Promise<ReturnType<ServerMethods[T]>> => {
return Meteor.callAsync(method, ...args);
};

const disconnect = () => {
Meteor.disconnect();
};

const reconnect = () => {
Meteor.reconnect();
};

return {
rest,
stop: stopAll,
stream,
publish,
call,
disconnect,
reconnect,
};
};

Expand Down
4 changes: 2 additions & 2 deletions apps/meteor/client/hooks/useFormatDate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ import { useCallback } from 'react';
import { formatDate } from '../lib/utils/dateFormat';

export const useFormatDate = () => {
const format = useSetting('Message_DateFormat');
return useCallback((time: string | Date | number) => formatDate(time, String(format)), [format]);
const format = useSetting('Message_DateFormat', 'LL');
return useCallback((time: string | Date | number) => formatDate(time, format), [format]);
};
6 changes: 5 additions & 1 deletion apps/meteor/client/lib/cachedStores/CachedStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ export abstract class CachedStore<T extends IRocketChatRecord, U = T> implements

protected eventType: StreamNames;

private readonly version = 18;
// Bumped from 18 → 19 to invalidate caches populated before the DDPSDK
// wire encoding was switched from JSON to EJSON. Entries written by the
// JSON window stored dates as ISO strings instead of Date instances, so
// fields like subscription.ls would fail `.getTime()` when read back.
private readonly version = 19;

private updatedAt = new Date(0);

Expand Down
43 changes: 38 additions & 5 deletions apps/meteor/client/lib/loggedIn.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,47 @@
import { Accounts } from 'meteor/accounts-base';

import { getUserId } from './user';
import { getUserId, userIdStore } from './user';

const isLoggedIn = () => {
const uid = getUserId();
return !!uid;
};

/**
* Fire `cb` whenever the local userId transitions from absent → present.
*
* `Accounts.onLogin` would normally cover this, but Meteor only invokes
* the onLogin hook from inside a Tracker.autorun that waits for
* `Meteor.userAsync()` to resolve to a real user doc. When a login goes
* through our REST fallback (e.g. logout → fresh login while DDPSDK is
* reconnecting), the user document never lands in Meteor.users — it
* normally arrives as a DDP collection frame, but the REST endpoint
* only returns the method result. The autorun then sees a null user
* forever, and onLogin never fires. By piggybacking on userIdStore (which
* is updated synchronously the moment Accounts.connection.userId() is
* set), we get a reliable login signal regardless of how the user doc
* eventually arrives.
*/
const subscribeToLogin = (handler: () => void): (() => void) => {
let lastSeen = userIdStore.getState();
return userIdStore.subscribe((next) => {
if (next === lastSeen) return;
const wasLoggedOut = !lastSeen;
lastSeen = next;
if (next && wasLoggedOut) {
handler();
}
});
};

export const whenLoggedIn = () => {
if (isLoggedIn()) {
return Promise.resolve();
}

return new Promise<void>((resolve) => {
const subscription = Accounts.onLogin(() => {
subscription.stop();
const stop = subscribeToLogin(() => {
stop();
resolve();
});
});
Expand All @@ -30,11 +57,17 @@ export const onLoggedIn = (cb: (() => () => void) | (() => Promise<() => void>)
}
};

const subscription = Accounts.onLogin(handler);
// Belt-and-braces: still register with Accounts.onLogin so consumers
// pick up loginDetails when Meteor's own autorun does fire (resume on
// page load, where the user doc lands via DDP and unblocks the
// autorun). The userIdStore subscription covers everything else.
const accountsSubscription = Accounts.onLogin(handler);
const stopUserIdSubscription = subscribeToLogin(handler);
if (isLoggedIn()) handler();

return () => {
subscription.stop();
accountsSubscription.stop();
stopUserIdSubscription();
cleanup?.();
};
};
Loading
Loading