Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,34 @@ const job: IJobDefinition = {
)
).count

// Break down errors by errorMessage + location, enriched with platform info
// Break down errors by errorMessage + location, enriched with platform info.
// Prefer metadata.errorMessage when set — the data sink worker writes specific
// values there (e.g. "noMerge blocked", "merge failed") so those surface as
// distinct groups rather than collapsing into the generic outer errorMessage.
const errorGroups = await dbConnection.any<IErrorGroup>(
`
SELECT
COALESCE(r.error->>'errorMessage', '[no errorMessage]') AS "errorMessage",
COALESCE(r.error->>'location', '[no location]') AS location,
count(*)::int AS count,
round(avg(r.retries), 1)::float AS "avgRetries",
max(r.retries)::int AS "maxRetries",
min(r."createdAt") AS oldest,
max(r."updatedAt") AS newest,
COALESCE(
r.error->'metadata'->>'errorMessage',
r.error->>'errorMessage',
'[no errorMessage]'
) AS "errorMessage",
COALESCE(r.error->>'location', '[no location]') AS location,
count(*)::int AS count,
round(avg(r.retries), 1)::float AS "avgRetries",
max(r.retries)::int AS "maxRetries",
min(r."createdAt") AS oldest,
max(r."updatedAt") AS newest,
string_agg(DISTINCT i.platform, ', ' ORDER BY i.platform) AS platforms
FROM integration.results r
LEFT JOIN integrations i ON i.id = r."integrationId"
WHERE r.state = 'error'
GROUP BY
r.error->>'errorMessage',
COALESCE(
r.error->'metadata'->>'errorMessage',
r.error->>'errorMessage',
'[no errorMessage]'
),
r.error->>'location'
ORDER BY count DESC
LIMIT 20
Expand Down
67 changes: 60 additions & 7 deletions services/apps/data_sink_worker/src/service/activity.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1196,7 +1196,6 @@ export default class ActivityService extends LoggerBase {
reach: value.member.reach,
},
value.platform,
undefined,
orgPromiseCache,
value.timestamp,
)
Expand Down Expand Up @@ -1342,12 +1341,14 @@ export default class ActivityService extends LoggerBase {
payload.dbMember,
dbMemberIdentities.get(payload.dbMember.id),
payload.platform,
undefined,
orgPromiseCache,
payload.activity.timestamp,
)
.then(() => {
payload.memberId = payload.dbMember.id
.then((redirectId?: string) => {
payload.memberId = redirectId ?? payload.dbMember.id
if (redirectId) {
memberMap.set(key, redirectId)
}
Comment thread
themarolt marked this conversation as resolved.
Comment thread
cursor[bot] marked this conversation as resolved.
})
.catch(async (err) => {
const result = await this.handleMemberIdentityError(
Expand Down Expand Up @@ -1401,12 +1402,14 @@ export default class ActivityService extends LoggerBase {
payload.dbObjectMember,
dbMemberIdentities.get(payload.dbObjectMember.id),
payload.platform,
undefined,
orgPromiseCache,
payload.activity.timestamp,
)
.then(() => {
payload.objectMemberId = payload.dbObjectMember.id
.then((redirectId?: string) => {
payload.objectMemberId = redirectId ?? payload.dbObjectMember.id
if (redirectId) {
memberMap.set(key, redirectId)
}
})
.catch(async (err) => {
const result = await this.handleMemberIdentityError(
Expand Down Expand Up @@ -1768,6 +1771,20 @@ export default class ActivityService extends LoggerBase {
}
}

if (
metadata.memberWithIdentity &&
metadata.memberIdToUpdate &&
metadata.memberWithIdentity === metadata.memberIdToUpdate
) {
// The member already owns the conflicting identity — stale prefetch race.
// The identity is already present so treat this as a no-op success.
this.log.warn(
{ memberId: metadata.memberIdToUpdate, identity: metadata.erroredVerifiedIdentity },
'Verified identity already belongs to this member (stale prefetch) — treating as success',
)
return metadata.memberIdToUpdate as string
Comment thread
themarolt marked this conversation as resolved.
Comment thread
themarolt marked this conversation as resolved.
}
Comment thread
themarolt marked this conversation as resolved.

if (
metadata.memberWithIdentity &&
metadata.memberIdToUpdate &&
Expand All @@ -1789,19 +1806,55 @@ export default class ActivityService extends LoggerBase {
return originalId
} else {
metadata.noMerge = true
metadata.errorMessage = 'noMerge blocked — verified identity conflict'
}
} catch (err) {
metadata.mergeError = {
errorMessage: err?.message ?? '<no error message>',
errorStack: err?.stack,
err,
}
metadata.errorMessage = 'merge failed — auto-merge threw an error'
}
}

if (!metadata.errorMessage) {
metadata.errorMessage = 'verified identity conflict — identity owner not found'
}

return metadata
}

if (error instanceof ApplicationError && error.metadata?.mergeCount !== undefined) {
return {
...error.metadata,
errorMessage: error.message,
memberType,
memberIdToUpdate: dbMember?.id,
memberSource:
memberType === 'member' ? payload.dbMemberSource : payload.dbObjectMemberSource,
}
}

// syncIdentitiesAfterRedirect wraps constraint errors with the current survivingId when the
// original member has already been absorbed by a prior merge. Unwrap and re-handle using a
// synthetic dbMember with the surviving ID so mergeIfAllowed targets the right member.
if (
error instanceof ApplicationError &&
error.metadata?.survivingId !== undefined &&
error.originalError
) {
const survivingDbMember = dbMember
? { ...dbMember, id: error.metadata.survivingId as string }
: undefined
return this.handleMemberIdentityError(
error.originalError,
payload,
memberType,
survivingDbMember,
)
}

if (error instanceof ApplicationError) {
let nextError: any = error.originalError

Expand Down
Loading
Loading