Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions apps/webapp/src/script/mls/MLSConversations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ export async function initMLSGroupConversation(
onError?: (conversation: Conversation, error: unknown) => void;
},
): Promise<void> {
logger.info('Initialising MLS group conversation', {
conversationId: mlsConversation.qualifiedId,
groupId: mlsConversation.groupId,
});
const {mls: mlsService, conversation: conversationService} = core.service || {};
if (!mlsService || !conversationService) {
throw new Error('MLS or Conversation service is not available!');
Expand All @@ -112,7 +116,6 @@ export async function initMLSGroupConversation(
await conversationRepository.ensureConversationExists({
groupId,
conversationId: qualifiedId,
epoch: mlsConversation.epoch,
core,
});

Expand All @@ -138,6 +141,7 @@ export async function initialiseSelfAndTeamConversations(
selfClientId: string,
core: Account,
): Promise<void> {
logger.info('Initialising self and team conversations');
const {mls: mlsService, conversation: conversationService} = core.service || {};
if (!mlsService || !conversationService) {
throw new Error('MLS or Conversation service is not available!');
Expand Down Expand Up @@ -169,17 +173,15 @@ export async function initialiseSelfAndTeamConversations(
return Promise.resolve();
}

logger.info('Conversation does not exist, ensuring establishment', {
logger.info('Conversation is not established, trying to establish', {
conversationId: conversation.qualifiedId,
groupId: conversation.groupId,
epoch: conversation.epoch,
});

// Otherwise, we need to ensure the conversation exists by establishing it or joining it by external commit.
await conversationRepository.ensureConversationExists({
conversationId: conversation.qualifiedId,
groupId: conversation.groupId,
epoch: conversation.epoch,
core,
});
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2223,50 +2223,57 @@ export class ConversationRepository {
public ensureConversationExists = async ({
conversationId,
groupId,
epoch,
core = this.core,
retry = true,
}: {
conversationId: QualifiedId;
groupId: string;
epoch: number;
core?: Account;
retry?: boolean;
}): Promise<void> => {
this.logger.info('Ensuring conversation exists', {conversationId, groupId, epoch});
if (await this.conversationService.mlsGroupExistsLocally(groupId)) {
const coreCryptoEpochNumber = await core.service?.mls?.getEpoch(groupId);
this.logger.info('Conversation already exists locally', {conversationId, groupId, epoch, coreCryptoEpochNumber});
if (coreCryptoEpochNumber === 0) {
if (!retry) {
this.logger.error('Epoch is 0, but retry is false, not retrying again', {
conversationId,
groupId,
epoch,
coreCryptoEpochNumber,
});
return;
}
return this.recoverFromLocalUnestablishedMLSConversations({
conversationId,
groupId,
epoch: coreCryptoEpochNumber,
core,
});
}
const conversationExistsOnCoreCrypto = await this.conversationService.mlsGroupExistsLocally(groupId);
const coreCryptoEpochNumberResult = await core.service?.mls?.getSafeEpoch(groupId);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid optional chaining here and introduce an explicit type guard for the MLS service before calling getSafeEpoch? Right now core.service?.mls?.getSafeEpoch(groupId) can return undefined, but line 2234 treats the result as present.

const coreCryptoEpochNumber = coreCryptoEpochNumberResult.isOk ? coreCryptoEpochNumberResult.value : undefined;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we return or throw immediately on coreCryptoEpochNumberResult.isErr instead of logging and continuing? A failed epoch read means we do not have a valid number to reason about. Continuing mixes the error case into the normal control flow.


this.logger.info('Ensuring conversation exists', {
conversationId,
groupId,
coreCryptoEpochNumber,
conversationExistsOnCoreCrypto,
});

if (coreCryptoEpochNumberResult.isErr) {
this.logger.warn(
'A conversation exists on core crypto but there was an error when retrieving its epoch number',
coreCryptoEpochNumberResult.error,
);
}

if (conversationExistsOnCoreCrypto && coreCryptoEpochNumber > 0) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid comparing a possibly undefined value here? coreCryptoEpochNumber is undefined when the result is Err, so this condition depends on a value that is not cleanly narrowed.

this.logger.info('Conversation is established and epoch is greater than 0, no action needed');
// return early so we don't make unnecessary calls to the backend to fetch remote conversation
return;
}

// establish the conversation if epoch is 0
if (epoch === 0) {
this.logger.info('Establishing conversation as epoch is 0', {conversationId, groupId, epoch});
await this.establishMlsGroupConversation({conversationId, groupId, epoch, core});
if (conversationExistsOnCoreCrypto && coreCryptoEpochNumber === 0) {
this.logger.info('Conversation already exists locally but epoch is 0, wiping it');

// The conversation was created locally but the initial commit was never accepted by the backend
// Wipe the conversation locally and join by external commit
await core.service?.conversation?.wipeMLSConversation(groupId);
}

const remoteConversation = await this.conversationService.getConversationById(conversationId);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add an explicit type guard for remoteConversation.epoch right after fetching the remote conversation? This method’s behavior depends entirely on epoch being a valid number, but the current code accepts any shape and lets invalid values fall through.

const remoteEpoch = remoteConversation.epoch;

// establish the conversation if remote epoch is 0
if (remoteEpoch === 0) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make the epoch decision exhaustive instead of relying on partial conditions? Or in other words: this handles 0 but there is no corresponding explicit branch for invalid epoch values.

this.logger.info('Establishing conversation as remote epoch is 0', {remoteEpoch});
await this.establishMlsGroupConversation({conversationId, groupId, epoch: remoteEpoch, core});
return;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a missing/invalid remote epoch, now silently no-ops. The removed recovery code threw when the remote epoch was unavailable. The new implementation does nothing if remoteConversation.epoch is undefined, null or otherwise falsy but not exactly 0. In the message-send path, ensureConversationExists can return successfully and then sending continues.

}

// join by external commit
this.logger.info('Joining conversation by external commit', {conversationId, epoch});
if (epoch && epoch > 0) {
if (remoteEpoch && remoteEpoch > 0) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid the truthy/falsy check here?

if (remoteEpoch && remoteEpoch > 0) conflates “missing”, “zero”, and “invalid” with normal control flow. Or in other words: undefined or null will skip joining and the method still resolves.

this.logger.info('Joining conversation by external commit', {remoteEpoch});
await this.core.service?.conversation?.joinByExternalCommit(conversationId);
}
};
Expand Down Expand Up @@ -2310,55 +2317,6 @@ export class ConversationRepository {
);
};

/**
* Recovers from local unestablished MLS conversations by refetching metadata and re-establishing the conversation.
* This is typically needed when the local epoch is 0 but the epoch on backend is greater than 0
* indicating that the conversation has not been properly established.
* throws error in case both local and remote MLS group are at epoch 0 or remote epoch is not available
*/
private recoverFromLocalUnestablishedMLSConversations = async ({
conversationId,
groupId,
epoch,
core = this.core,
}: {
conversationId: QualifiedId;
groupId: string;
epoch: number;
core?: Account;
}) => {
try {
this.logger.info('Epoch is 0, refetching conversation metadata and re-establishing', {
conversationId,
groupId,
epoch,
});
await core.service?.conversation?.wipeMLSConversation(groupId);
const remoteConversation = await this.conversationService.getConversationById(conversationId);
const remoteEpoch = remoteConversation.epoch;
if (!remoteEpoch) {
this.logger.error('Remote epoch is not available!', {remoteConversation});
throw new Error('Remote epoch is not available!');
}
if (remoteEpoch === epoch) {
const errorMessage =
'Cannot recover: both local and remote MLS group are at epoch 0, the conversation was never established on the backend';
this.logger.error(errorMessage, {remoteEpoch, epoch});
throw new Error(errorMessage);
}

return this.ensureConversationExists({conversationId, groupId, epoch: remoteEpoch, core, retry: false});
} catch (error: unknown) {
this.logger.error('Failed to recover from local unestablished MLS conversation', {
error,
conversationId,
groupId,
epoch,
});
throw error;
}
};

/**
* will locally delete conversations that no longer exist on backend side
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1003,10 +1003,10 @@ export class MessageRepository {

try {
if (isMLSConversation(conversation)) {
this.logger.info('Sending message to a MLS conversation, ensuring conversation exists');
await this.conversationRepositoryProvider().ensureConversationExists({
conversationId: conversation.qualifiedId,
groupId: conversation.groupId,
epoch: conversation.epoch,
});
}
const result = await this.conversationService.send(sendOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@
* The join operation itself is not automatically re-run by policy.
*/
public async joinByExternalCommit(conversationId: QualifiedId): Promise<void> {
this.logger.info('Joining MLS conversation via external commit (orchestrated)', {conversationId});
await this.MLSRecoveryOrchestrator.execute({
context: {operationName: OperationName.joinExternalCommit, qualifiedConversationId: conversationId},
callBack: () => this.performJoinByExternalCommitAPI(conversationId),
Expand All @@ -574,6 +575,7 @@

// Low-level API call for joining via external commit (no recovery logic)
private async performJoinByExternalCommitAPI(conversationId: QualifiedId): Promise<void> {
this.logger.info('Joining MLS conversation via external commit (low-level)', {conversationId});
await this.mlsService.joinByExternalCommit(() => this.apiClient.api.conversation.getGroupInfo(conversationId));
}

Expand Down Expand Up @@ -727,7 +729,7 @@
};

private async matchesEpoch(groupId: string, backendEpoch: number): Promise<boolean> {
const localEpoch = await this.mlsService.getEpoch(groupId);

Check warning on line 732 in libraries/core/src/conversation/conversationService/conversationService.ts

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

'(groupId: string | Uint8Array<ArrayBufferLike>): any' is deprecated.

See more on https://sonarcloud.io/project/issues?id=wireapp_wire-webapp&issues=AZ3ZNe78nQQXsZ059EIt&open=AZ3ZNe78nQQXsZ059EIt&pullRequest=21174

this.logger.debug(
`Comparing conversation's (group_id: ${groupId}) local and backend epoch number: {local: ${String(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ export const handleMLSMessageAdd = async ({

const groupIdBytes = Decoder.fromBase64(groupId).asBytes;

// Helpful for correlating decrypt failures with rejoin/epoch transitions.
const coreCryptoEpochNumber = await mlsService.getSafeEpoch(groupId);
logger.info('Decrypting MLS message-add payload', {
qualifiedConversationId,
groupId,
coreCryptoEpochNumber: coreCryptoEpochNumber.isOk ? coreCryptoEpochNumber.value : 'Error retrieving epoch number',
eventTime: event.time,
});

const decryptedMessage = await mlsService.decryptMessage(
new ConversationId(groupIdBytes),
encryptedData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import {ConversationMLSMessageAddEvent, ConversationMLSWelcomeEvent} from '@wireapp/api-client/lib/event';
import {QualifiedId} from '@wireapp/api-client/lib/user';
import {Converter, Decoder, Encoder} from 'bazinga64';
import {Task, task} from 'true-myth';

import {APIClient} from '@wireapp/api-client';
import {LogFactory, TimeUtil, TypedEventEmitter} from '@wireapp/commons';
Expand Down Expand Up @@ -464,11 +465,29 @@
return {keyPackages: coreCryptoKeyPackagesPayload, failures};
}

/**
* @deprecated Use `getSafeEpoch` to receive a `Task` with explicit error handling.
*/
public getEpoch(groupId: string | Uint8Array) {
const groupIdBytes = typeof groupId === 'string' ? Decoder.fromBase64(groupId).asBytes : groupId;
return this.coreCryptoClient.conversationEpoch(new ConversationId(groupIdBytes));
}

/**
* Safely reads the current MLS epoch for a conversation.
*
* Returns a `Task` so errors are modeled in the type instead of surfacing
* as an uncaught promise rejection.
*
* A common rejection reason is a core-crypto error equivalent to:
* `MlsErrorOther: Couldn't find conversation` (for example when the group is
* not present on core-crypto level).
*/
public getSafeEpoch(groupId: string | Uint8Array): Task<number, unknown> {
const groupIdBytes = typeof groupId === 'string' ? Decoder.fromBase64(groupId).asBytes : groupId;
return task.fromPromise(this.coreCryptoClient.conversationEpoch(new ConversationId(groupIdBytes)));
}

public async joinByExternalCommit(getGroupInfo: () => Promise<Uint8Array>) {
try {
this.logger.info('Trying to join MLS group via external commit');
Expand All @@ -486,7 +505,7 @@
if (welcomeBundle.id) {
//after we've successfully joined via external commit, we schedule periodic key material renewal
const groupIdStr = Encoder.toBase64(welcomeBundle.id.copyBytes()).asString;
const newEpoch = await this.getEpoch(groupIdStr);

Check warning on line 508 in libraries/core/src/messagingProtocols/mls/mlsService/mlsService.ts

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

'(groupId: string | Uint8Array<ArrayBufferLike>): any' is deprecated.

See more on https://sonarcloud.io/project/issues?id=wireapp_wire-webapp&issues=AZ3ZNezCnQQXsZ059EIr&open=AZ3ZNezCnQQXsZ059EIr&pullRequest=21174

// Schedule the next key material renewal
await this.scheduleKeyMaterialRenewal(groupIdStr);
Expand Down Expand Up @@ -536,7 +555,15 @@
this.logger.info('Message decrypted successfully', {qualifiedConversationId, duration: Date.now() - start});
return decryptedMessage;
} catch (error: unknown) {
this.logger.warn('Failed to decrypt MLS message', {qualifiedConversationId, error});
// This is safe (read-only) and helps correlate decryption failures with local epoch.
const coreCryptoEpochNumber = await task.fromPromise(this.coreCryptoClient.conversationEpoch(conversationId));
this.logger.warn('Failed to decrypt MLS message', {
qualifiedConversationId,
coreCryptoEpochNumber: coreCryptoEpochNumber.isOk
? coreCryptoEpochNumber.value
: 'Error retrieving epoch number',
error,
});
// According to CoreCrypto JS doc on .decryptMessage method, we should ignore some errors (corecrypto handle them internally)
if (shouldMLSDecryptionErrorBeIgnored(error)) {
return {
Expand Down Expand Up @@ -802,7 +829,7 @@
*/
public async isConversationEstablished(groupId: string): Promise<boolean> {
const doesConversationExist = await this.conversationExists(groupId);
return doesConversationExist && (await this.getEpoch(groupId)) > 0;

Check warning on line 832 in libraries/core/src/messagingProtocols/mls/mlsService/mlsService.ts

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

'(groupId: string | Uint8Array<ArrayBufferLike>): any' is deprecated.

See more on https://sonarcloud.io/project/issues?id=wireapp_wire-webapp&issues=AZ3ZNezCnQQXsZ059EIs&open=AZ3ZNezCnQQXsZ059EIs&pullRequest=21174
}

public async clientValidKeypackagesCount(): Promise<number> {
Expand Down Expand Up @@ -1010,6 +1037,7 @@

await this.schedulePendingProposalsTask(groupId, firingDate);
} else {
this.logger.info('Trying to commit pending proposals immediately', {groupId});
await this.commitPendingProposals(groupId);
}
}
Expand All @@ -1018,7 +1046,10 @@
await this.coreDatabase.put('pendingProposals', {groupId, firingDate}, groupId);

TaskScheduler.addTask({
task: () => this.commitPendingProposals(groupId),
task: () => {
this.logger.info('Trying to commit pending proposals from scheduled task', {groupId});
return this.commitPendingProposals(groupId);
},
firingDate,
key: this.createPendingProposalsTaskKey(groupId),
});
Expand Down Expand Up @@ -1098,7 +1129,12 @@
}

TaskScheduler.addTask({
task: () => this.commitPendingProposals(groupId),
task: () => {
this.logger.info('Trying to commit pending proposals from startup rehydration', {
groupId,
});
return this.commitPendingProposals(groupId);
},
firingDate,
key: this.createPendingProposalsTaskKey(groupId),
});
Expand Down
Loading