Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,34 @@ const job: IJobDefinition = {
)
).count

// Break down errors by errorMessage + location, enriched with platform info
// Break down errors by errorMessage + location, enriched with platform info.
// Prefer metadata.errorMessage when set — the data sink worker writes specific
// values there (e.g. "noMerge blocked", "merge failed") so those surface as
// distinct groups rather than collapsing into the generic outer errorMessage.
const errorGroups = await dbConnection.any<IErrorGroup>(
`
SELECT
COALESCE(r.error->>'errorMessage', '[no errorMessage]') AS "errorMessage",
COALESCE(r.error->>'location', '[no location]') AS location,
count(*)::int AS count,
round(avg(r.retries), 1)::float AS "avgRetries",
max(r.retries)::int AS "maxRetries",
min(r."createdAt") AS oldest,
max(r."updatedAt") AS newest,
COALESCE(
r.error->'metadata'->>'errorMessage',
r.error->>'errorMessage',
'[no errorMessage]'
) AS "errorMessage",
COALESCE(r.error->>'location', '[no location]') AS location,
count(*)::int AS count,
round(avg(r.retries), 1)::float AS "avgRetries",
max(r.retries)::int AS "maxRetries",
min(r."createdAt") AS oldest,
max(r."updatedAt") AS newest,
string_agg(DISTINCT i.platform, ', ' ORDER BY i.platform) AS platforms
FROM integration.results r
LEFT JOIN integrations i ON i.id = r."integrationId"
WHERE r.state = 'error'
GROUP BY
r.error->>'errorMessage',
COALESCE(
r.error->'metadata'->>'errorMessage',
r.error->>'errorMessage',
'[no errorMessage]'
),
r.error->>'location'
ORDER BY count DESC
LIMIT 20
Expand Down
191 changes: 128 additions & 63 deletions services/apps/data_sink_worker/src/service/activity.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1196,7 +1196,6 @@ export default class ActivityService extends LoggerBase {
reach: value.member.reach,
},
value.platform,
undefined,
orgPromiseCache,
value.timestamp,
)
Expand Down Expand Up @@ -1313,16 +1312,27 @@ export default class ActivityService extends LoggerBase {
dbMemberIdentities = await findIdentitiesForMembers(this.pgQx, Array.from(memberIds))
}

// Tracks merge redirects across payloads in this batch. When a member update redirects
// X→Y (merge), subsequent payloads for the same platform:username skip the update and
// reuse Y — avoids calling update() on an already-absorbed member row.
const memberMap = new Map<string, string>()

for (const payload of relevantPayloads) {
// contains the merged member ids
const memberMap = new Map<string, string>()
const memberKey = `${payload.platform}:${payload.activity.username}`
const objectMemberKey = `${payload.platform}:${payload.activity.objectMemberUsername}`
// When actor and objectActor are the same person, skip the objectMember update and
// copy memberId after Promise.all resolves.
const sameActorKey = !!(
payload.dbMember &&
payload.dbObjectMember &&
memberKey === objectMemberKey
)

const promises = []
// update members and orgs with them
if (payload.dbMember) {
const key = `${payload.platform}:${payload.activity.username}`
if (memberMap.has(key)) {
payload.memberId = memberMap.get(key)
if (memberMap.has(memberKey)) {
payload.memberId = memberMap.get(memberKey)
} else {
Comment thread
themarolt marked this conversation as resolved.
promises.push(
memberService
Expand All @@ -1342,12 +1352,14 @@ export default class ActivityService extends LoggerBase {
payload.dbMember,
dbMemberIdentities.get(payload.dbMember.id),
payload.platform,
undefined,
orgPromiseCache,
payload.activity.timestamp,
)
.then(() => {
payload.memberId = payload.dbMember.id
.then((redirectId?: string) => {
payload.memberId = redirectId ?? payload.dbMember.id
if (redirectId) {
memberMap.set(memberKey, redirectId)
}
Comment thread
themarolt marked this conversation as resolved.
Comment thread
cursor[bot] marked this conversation as resolved.
})
.catch(async (err) => {
const result = await this.handleMemberIdentityError(
Expand All @@ -1359,7 +1371,7 @@ export default class ActivityService extends LoggerBase {
if (result) {
if (typeof result === 'string') {
payload.memberId = result
memberMap.set(key, result)
memberMap.set(memberKey, result)
} else {
resultMap.set(payload.resultId, {
success: false,
Expand All @@ -1378,10 +1390,9 @@ export default class ActivityService extends LoggerBase {
}
}

if (payload.dbObjectMember) {
const key = `${payload.platform}:${payload.activity.objectMemberUsername}`
if (memberMap.has(key)) {
payload.objectMemberId = memberMap.get(key)
if (payload.dbObjectMember && !sameActorKey) {
if (memberMap.has(objectMemberKey)) {
payload.objectMemberId = memberMap.get(objectMemberKey)
Comment thread
themarolt marked this conversation as resolved.
} else {
promises.push(
memberService
Expand All @@ -1401,12 +1412,14 @@ export default class ActivityService extends LoggerBase {
payload.dbObjectMember,
dbMemberIdentities.get(payload.dbObjectMember.id),
payload.platform,
undefined,
orgPromiseCache,
payload.activity.timestamp,
)
.then(() => {
payload.objectMemberId = payload.dbObjectMember.id
.then((redirectId?: string) => {
payload.objectMemberId = redirectId ?? payload.dbObjectMember.id
if (redirectId) {
memberMap.set(objectMemberKey, redirectId)
}
})
.catch(async (err) => {
const result = await this.handleMemberIdentityError(
Expand All @@ -1418,7 +1431,7 @@ export default class ActivityService extends LoggerBase {
if (result) {
if (typeof result === 'string') {
payload.objectMemberId = result
memberMap.set(key, result)
memberMap.set(objectMemberKey, result)
} else {
resultMap.set(payload.resultId, {
success: false,
Expand All @@ -1439,6 +1452,10 @@ export default class ActivityService extends LoggerBase {

await Promise.all(promises)

if (sameActorKey) {
payload.objectMemberId = payload.memberId
}

if (resultMap.has(payload.resultId)) {
continue
}
Expand Down Expand Up @@ -1696,65 +1713,63 @@ export default class ActivityService extends LoggerBase {
error.constructor &&
error.constructor.name === 'DatabaseError' &&
error.constraint &&
error.constraint === 'uix_memberIdentities_platform_value_type_verified' &&
error.detail
error.constraint === 'uix_memberIdentities_platform_value_type_verified'
) {
return true
}

return false
}

const extractMetadata = async (
error: any,
): Promise<string | Record<string, unknown> | undefined> => {
const extractMetadata = async (): Promise<string | Record<string, unknown> | undefined> => {
const metadata: Record<string, unknown> = {}

// extract the platform, value, type from the detail
const detail = error.detail
const regex = /\(platform, value, type\)=\((.*?)\)/
const match = detail.match(regex)

if (!match || match.length < 2) {
return
}

// Split the matched string by commas
const values = match[1].split(',').map((val) => val.trim())
const incomingIdentities =
memberType === 'member'
? payload.activity.member.identities
: payload.activity.objectMember.identities
const verifiedIncoming = incomingIdentities.filter((i) => i.verified)
Comment thread
themarolt marked this conversation as resolved.
Comment thread
themarolt marked this conversation as resolved.

// Extract platform, value, and type
const [platform, value, type] = values
metadata.verifiedIdentities = verifiedIncoming

metadata.erroredVerifiedIdentity = {
platform,
value,
type,
if (verifiedIncoming.length === 0) {
return undefined
}

const membersWithIdentity = await findMembersByIdentities(
this.pgQx,
[
{
platform,
value,
type,
verified: true,
} as IMemberIdentity,
],
undefined,
true,
)
// Use the structured identities array to find the owner — avoids fragile Postgres
// Detail text parsing (format not stable; breaks if value contains a comma).
const owners = await findMembersByIdentities(this.pgQx, verifiedIncoming, undefined, true)
Comment thread
themarolt marked this conversation as resolved.

// Map keys are `${platform}:${type}:${value}` (from db rows). Match case-insensitively.
let conflictIdentity: IMemberIdentity | undefined
let ownerId: string | undefined
outer: for (const id of verifiedIncoming) {
for (const [key, oid] of owners) {
const sep1 = key.indexOf(':')
const sep2 = key.indexOf(':', sep1 + 1)
if (sep1 < 0 || sep2 < 0) continue
if (
key.slice(0, sep1) === id.platform &&
key.slice(sep1 + 1, sep2) === id.type &&
key.slice(sep2 + 1).toLowerCase() === id.value.toLowerCase()
) {
conflictIdentity = id
ownerId = oid
break outer
}
}
}

if (memberType === 'member') {
metadata.verifiedIdentities = payload.activity.member.identities.filter((i) => i.verified)
} else {
metadata.verifiedIdentities = payload.activity.objectMember.identities.filter(
(i) => i.verified,
)
if (conflictIdentity) {
metadata.erroredVerifiedIdentity = {
platform: conflictIdentity.platform,
value: conflictIdentity.value,
type: conflictIdentity.type,
}
}

if (membersWithIdentity.size > 0) {
metadata.memberWithIdentity = membersWithIdentity.values().next().value
if (ownerId) {
metadata.memberWithIdentity = ownerId
}

if (dbMember) {
Expand All @@ -1768,6 +1783,20 @@ export default class ActivityService extends LoggerBase {
}
}

if (
metadata.memberWithIdentity &&
metadata.memberIdToUpdate &&
metadata.memberWithIdentity === metadata.memberIdToUpdate
) {
// The member already owns the conflicting identity — stale prefetch race.
// The identity is already present so treat this as a no-op success.
this.log.warn(
{ memberId: metadata.memberIdToUpdate, identity: metadata.erroredVerifiedIdentity },
'Verified identity already belongs to this member (stale prefetch) — treating as success',
)
return metadata.memberIdToUpdate as string
Comment thread
themarolt marked this conversation as resolved.
Comment thread
themarolt marked this conversation as resolved.
}
Comment thread
themarolt marked this conversation as resolved.

if (
metadata.memberWithIdentity &&
metadata.memberIdToUpdate &&
Expand All @@ -1789,33 +1818,69 @@ export default class ActivityService extends LoggerBase {
return originalId
} else {
metadata.noMerge = true
metadata.errorMessage = 'noMerge blocked — verified identity conflict'
}
} catch (err) {
metadata.mergeError = {
errorMessage: err?.message ?? '<no error message>',
errorStack: err?.stack,
err,
}
metadata.errorMessage = 'merge failed — auto-merge threw an error'
}
}

if (!metadata.errorMessage) {
metadata.errorMessage = 'verified identity conflict — identity owner not found'
}

return metadata
}

if (error instanceof ApplicationError && error.metadata?.mergeCount !== undefined) {
return {
...error.metadata,
errorMessage: error.message,
memberType,
memberIdToUpdate: dbMember?.id,
memberSource:
memberType === 'member' ? payload.dbMemberSource : payload.dbObjectMemberSource,
}
}

// syncIdentitiesAfterRedirect wraps constraint errors with the current survivingId when the
// original member has already been absorbed by a prior merge. Unwrap and re-handle using a
// synthetic dbMember with the surviving ID so mergeIfAllowed targets the right member.
if (
error instanceof ApplicationError &&
error.metadata?.survivingId !== undefined &&
error.originalError
) {
const survivingDbMember = dbMember
? { ...dbMember, id: error.metadata.survivingId as string }
: undefined
return this.handleMemberIdentityError(
error.originalError,
payload,
memberType,
survivingDbMember,
)
}

if (error instanceof ApplicationError) {
let nextError: any = error.originalError

while (nextError) {
if (checkForIdentityConstraint(nextError)) {
return extractMetadata(nextError)
return extractMetadata()
} else if (nextError instanceof ApplicationError) {
nextError = nextError.originalError
} else {
nextError = undefined
}
}
} else if (checkForIdentityConstraint(error)) {
return extractMetadata(error)
return extractMetadata()
}

return undefined
Expand Down
Loading
Loading