From d9975110a6ed581500399a01d82dd7422d113674 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Fri, 24 Apr 2026 15:40:58 +0530 Subject: [PATCH 01/18] feat: infer memberOrganization stint dates from work-email activities Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- ...ganizations-email-domain-partial-index.sql | 3 + ...inferMemberOrganizationStintChanges.job.ts | 129 +++++++++ services/apps/data_sink_worker/package.json | 2 - .../src/bin/map-member-to-org.ts | 97 ------- .../src/bin/map-tenant-members-to-org.ts | 98 ------- .../src/service/activity.service.ts | 11 + .../src/service/member.service.ts | 56 +++- .../src/services/common.member.service.ts | 40 ++- .../common_services/src/services/index.ts | 2 +- .../src/services/member-organization.ts | 251 ++++++++++++++++++ .../src/services/member/unmerge.ts | 2 +- .../src/services/memberOrganization.ts | 113 -------- .../src/members/organizations.ts | 17 ++ .../data-access-layer/src/members/segments.ts | 20 +- .../repo/memberAffiliation.data.ts | 3 + services/libs/types/src/organizations.ts | 16 ++ 16 files changed, 542 insertions(+), 318 deletions(-) create mode 100644 backend/src/database/migrations/V1776931245__member-organizations-email-domain-partial-index.sql create mode 100644 services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts delete mode 100644 services/apps/data_sink_worker/src/bin/map-member-to-org.ts delete mode 100644 services/apps/data_sink_worker/src/bin/map-tenant-members-to-org.ts create mode 100644 services/libs/common_services/src/services/member-organization.ts delete mode 100644 services/libs/common_services/src/services/memberOrganization.ts diff --git a/backend/src/database/migrations/V1776931245__member-organizations-email-domain-partial-index.sql b/backend/src/database/migrations/V1776931245__member-organizations-email-domain-partial-index.sql new file mode 100644 index 0000000000..404f5c18be --- /dev/null +++ b/backend/src/database/migrations/V1776931245__member-organizations-email-domain-partial-index.sql @@ -0,0 +1,3 @@ +CREATE INDEX CONCURRENTLY IF NOT EXISTS "ix_memberOrganizations_memberId_emailDomain" + ON "memberOrganizations" ("memberId") + WHERE "source" = 'email-domain' AND "deletedAt" IS NULL; diff --git a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts new file mode 100644 index 0000000000..56fec83064 --- /dev/null +++ b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts @@ -0,0 +1,129 @@ +import CronTime from 'cron-time-generator' +import { + inferMemberOrganizationStintChanges, + MEMBER_ORG_STINT_CHANGES_DATES_PREFIX, + MEMBER_ORG_STINT_CHANGES_QUEUE +} from '@crowd/common_services' +import { + createMemberOrganization, + fetchMemberOrganizationsBySource, + updateMemberOrganization, +} from '@crowd/data-access-layer' +import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database' +import { pgpQx, QueryExecutor } from '@crowd/data-access-layer/src/queryExecutor' +import { REDIS_CONFIG, RedisClient, getRedisClient } from '@crowd/redis' +import { MemberOrgDate, MemberOrgStintChange, OrganizationSource } from '@crowd/types' +import { IJobDefinition } from '../types' + +const job: IJobDefinition = { + name: 'infer-member-organization-stint-changes', + cronTime: CronTime.every(5).minutes(), + timeout: 10 * 60, + process: async (ctx) => { + const redis = await getRedisClient(REDIS_CONFIG()) + const db = await getDbConnection(WRITE_DB_CONFIG(), 2, 0) + const qx = pgpQx(db) + + // 1. Fetch a batch of work triggers (Member IDs) + const memberIds = await redis.sPop(MEMBER_ORG_STINT_CHANGES_QUEUE, 500) + if (!memberIds?.length) return + + ctx.log.info({ count: memberIds.length }, 'Processing pending members.') + const stats = { processed: 0, inserts: 0, updates: 0 } + + for (const memberId of memberIds) { + try { + // 2. Get the activity dates for this member + const activityDates = await popMemberOrganizationActivityDates(redis, memberId) + + // If the member has no activity dates, move to the next member + if (activityDates.length === 0) continue + + // 3. Sync with existing database state + const existingOrgs = await fetchMemberOrganizationsBySource( + qx, + memberId, + OrganizationSource.EMAIL_DOMAIN, + ) + + // 4. Calculate required stint changes + const stintChanges = inferMemberOrganizationStintChanges( + memberId, + existingOrgs, + activityDates, + ) + + if (stintChanges.length === 0) continue + + const counts = { + inserts: stintChanges.filter((c) => c.type === 'insert').length, + updates: stintChanges.filter((c) => c.type === 'update').length, + } + + ctx.log.info({ memberId, ...counts }, 'Stint changes identified.') + + ctx.log.debug( + { memberId, activityDates, existingOrgs, stintChanges }, + 'Stint inference trace.', + ) + + // @todo: Enable writes after dry-run validation + // await applyStintChanges(qx, stintChanges) + + stats.processed++ + stats.inserts += counts.inserts + stats.updates += counts.updates + } catch (err) { + ctx.log.error(err, { memberId }, 'Failed to process member stint inference.') + } + } + + ctx.log.info(stats, 'Batch inference complete.') + }, +} + +async function popMemberOrganizationActivityDates( + redis: RedisClient, + memberId: string, +): Promise { + const key = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}` + + // hGetAll + del in a multi block makes the "Pop" atomic for the entire Hash + const [hash] = (await redis.multi().hGetAll(key).del(key).exec()) as [Record | null, number] + + if (!hash || Object.keys(hash).length === 0) return [] + + return Object.entries(hash) + .flatMap(([organizationId, datesJson]) => + (JSON.parse(datesJson) as string[]).map((date) => ({ organizationId, date })), + ) + .sort((a, b) => a.date.localeCompare(b.date)) +} + +async function applyStintChanges( + qx: QueryExecutor, + stintChanges: MemberOrgStintChange[], +): Promise { + for (const change of stintChanges) { + if (change.type === 'insert') { + await createMemberOrganization(qx, change.memberId, { + organizationId: change.organizationId, + dateStart: change.dateStart, + dateEnd: change.dateEnd, + source: OrganizationSource.EMAIL_DOMAIN, + }) + continue + } + + if (!change.id) { + throw new Error('Missing id for update stint change.') + } + + await updateMemberOrganization(qx, change.memberId, change.id, { + dateStart: change.dateStart, + dateEnd: change.dateEnd, + }) + } +} + +export default job \ No newline at end of file diff --git a/services/apps/data_sink_worker/package.json b/services/apps/data_sink_worker/package.json index c62c1e20d8..f9a2363081 100644 --- a/services/apps/data_sink_worker/package.json +++ b/services/apps/data_sink_worker/package.json @@ -17,8 +17,6 @@ "script:restart-result": "SERVICE=script tsx src/bin/restart-result.ts", "script:process-results": "SERVICE=script tsx src/bin/process-results.ts", "script:trigger-results-for-tenant": "SERVICE=script tsx src/bin/trigger-results-for-tenant.ts", - "script:map-tenant-members-to-org": "SERVICE=script tsx src/bin/map-tenant-members-to-org.ts", - "script:map-member-to-org": "SERVICE=script tsx src/bin/map-member-to-org.ts", "script:fix-activity-obj-member-data": "SERVICE=script tsx src/bin/fix-activity-obj-member-data.ts", "script:fix-member-displayName": "SERVICE=script tsx src/bin/fix-member-displayName.ts", "script:fix-members-joinedAt": "SERVICE=script tsx src/bin/fix-members-joinedAt.ts", diff --git a/services/apps/data_sink_worker/src/bin/map-member-to-org.ts b/services/apps/data_sink_worker/src/bin/map-member-to-org.ts deleted file mode 100644 index 7f63a2dc18..0000000000 --- a/services/apps/data_sink_worker/src/bin/map-member-to-org.ts +++ /dev/null @@ -1,97 +0,0 @@ -import { DataSinkWorkerEmitter, SearchSyncWorkerEmitter } from '@crowd/common_services' -import { dbStoreQx, findIdentitiesForMembers } from '@crowd/data-access-layer' -import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database' -import DataSinkRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo' -import MemberRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/member.repo' -import { getServiceLogger } from '@crowd/logging' -import { QueueFactory } from '@crowd/queue' -import { getRedisClient } from '@crowd/redis' -import { Client as TemporalClient, getTemporalClient } from '@crowd/temporal' -import { MemberIdentityType } from '@crowd/types' - -import { DB_CONFIG, QUEUE_CONFIG, REDIS_CONFIG, TEMPORAL_CONFIG } from '../conf' -import MemberService from '../service/member.service' -import { OrganizationService } from '../service/organization.service' - -const log = getServiceLogger() - -const processArguments = process.argv.slice(2) - -if (processArguments.length !== 1) { - log.error('Expected 1 argument: memberId') - process.exit(1) -} - -const memberId = processArguments[0] - -setImmediate(async () => { - let temporal: TemporalClient | undefined - if (TEMPORAL_CONFIG().serverUrl) { - temporal = await getTemporalClient(TEMPORAL_CONFIG()) - } - - const redis = await getRedisClient(REDIS_CONFIG()) - - const dbConnection = await getDbConnection(DB_CONFIG()) - const store = new DbStore(log, dbConnection) - - const queueClient = QueueFactory.createQueueService(QUEUE_CONFIG()) - const emitter = new DataSinkWorkerEmitter(queueClient, log) - await emitter.init() - - const dataSinkRepo = new DataSinkRepository(store, log) - const memberRepo = new MemberRepository(store, log) - - const searchSyncWorkerEmitter = new SearchSyncWorkerEmitter(queueClient, log) - await searchSyncWorkerEmitter.init() - - const memberService = new MemberService(store, redis, temporal, log) - const orgService = new OrganizationService(store, log) - - try { - const members = await memberRepo.findByIds([memberId]) - - const member = members[0] - - if (!member) { - log.error({ memberId }, 'Member not found!') - process.exit(1) - } - - const identities = (await findIdentitiesForMembers(dbStoreQx(store), [memberId])).get(memberId) - log.info(`Processing memberId: ${member.id}`) - - const segmentIds = await dataSinkRepo.getSegmentIds() - const segmentId = segmentIds[segmentIds.length - 1] // leaf segment id - - const emailIdentities = identities.filter( - (i) => i.verified && i.type === MemberIdentityType.EMAIL, - ) - if (emailIdentities.length > 0) { - const emails = emailIdentities.map((i) => i.value) - log.info({ memberId, emails }, 'Member emails!') - const orgs = await memberService.assignOrganizationByEmailDomain(null, emails) - - if (orgs.length > 0) { - log.info('Organizations found with matching email domains:', JSON.stringify(orgs)) - orgService.addToMember([segmentId], member.id, orgs) - - for (const org of orgs) { - await searchSyncWorkerEmitter.triggerOrganizationSync(org.id, true, segmentId) - } - - await searchSyncWorkerEmitter.triggerMemberSync(member.id, true, segmentId) - log.info('Done mapping member to organizations!') - } else { - log.info('No organizations found with matching email domains!') - } - } else { - log.info('No emails found for member!') - } - - process.exit(0) - } catch (err) { - log.error('Failed to map organizations for member!', err) - process.exit(1) - } -}) diff --git a/services/apps/data_sink_worker/src/bin/map-tenant-members-to-org.ts b/services/apps/data_sink_worker/src/bin/map-tenant-members-to-org.ts deleted file mode 100644 index 04d23a1179..0000000000 --- a/services/apps/data_sink_worker/src/bin/map-tenant-members-to-org.ts +++ /dev/null @@ -1,98 +0,0 @@ -import { DataSinkWorkerEmitter, SearchSyncWorkerEmitter } from '@crowd/common_services' -import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database' -import DataSinkRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo' -import MemberRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/member.repo' -import { getServiceLogger } from '@crowd/logging' -import { QueueFactory } from '@crowd/queue' -import { getRedisClient } from '@crowd/redis' -import { Client as TemporalClient, getTemporalClient } from '@crowd/temporal' - -import { DB_CONFIG, QUEUE_CONFIG, REDIS_CONFIG, TEMPORAL_CONFIG } from '../conf' -import MemberService from '../service/member.service' -import { OrganizationService } from '../service/organization.service' - -const log = getServiceLogger() - -setImmediate(async () => { - let temporal: TemporalClient | undefined - if (TEMPORAL_CONFIG().serverUrl) { - temporal = await getTemporalClient(TEMPORAL_CONFIG()) - } - - const dbConnection = await getDbConnection(DB_CONFIG()) - const store = new DbStore(log, dbConnection) - - const redis = await getRedisClient(REDIS_CONFIG()) - - const queueClient = QueueFactory.createQueueService(QUEUE_CONFIG()) - const emitter = new DataSinkWorkerEmitter(queueClient, log) - await emitter.init() - - const dataSinkRepo = new DataSinkRepository(store, log) - const memberRepo = new MemberRepository(store, log) - - const segmentIds = await dataSinkRepo.getSegmentIds() - const segmentId = segmentIds[segmentIds.length - 1] // leaf segment id - - const searchSyncWorkerEmitter = new SearchSyncWorkerEmitter(queueClient, log) - await searchSyncWorkerEmitter.init() - - const memberService = new MemberService(store, redis, temporal, log) - const orgService = new OrganizationService(store, log) - - const limit = 100 - let offset = 0 - let processedMembers = 0 - - let currentMemberId = null - let currentEmails = null - - try { - const { totalCount } = await memberRepo.getMemberIdsAndEmailsAndCount(segmentIds, { - limit, - offset, - countOnly: true, - }) - - log.info(`Total members found in the tenant: ${totalCount}`) - - do { - const { members } = await memberRepo.getMemberIdsAndEmailsAndCount(segmentIds, { - limit, - offset, - }) - - // member -> organization based on email domain - for (const member of members) { - currentMemberId = member.id - currentEmails = member.emails - if (member.emails) { - const orgs = await memberService.assignOrganizationByEmailDomain(null, member.emails) - - if (orgs.length > 0) { - orgService.addToMember([segmentId], member.id, orgs) - - for (const org of orgs) { - await searchSyncWorkerEmitter.triggerOrganizationSync(org.id, true, segmentId) - } - - await searchSyncWorkerEmitter.triggerMemberSync(member.id, true, segmentId) - } - } - - processedMembers++ - log.info(`Processed member ${member.id}. Progress: ${processedMembers}/${totalCount}`) - } - offset += limit - } while (totalCount > offset) - - log.info(`Member to organization association completed`) - process.exit(0) - } catch (err) { - log.error( - `Failed to assign member to organizations. Member ID: ${currentMemberId}, Emails: ${currentEmails}`, - err, - ) - process.exit(1) - } -}) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 5e4b4cdb7f..46d4f23c6d 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -1197,6 +1197,7 @@ export default class ActivityService extends LoggerBase { value.platform, undefined, orgPromiseCache, + value.timestamp, ) .then((memberId) => { // map ids for members @@ -1342,6 +1343,7 @@ export default class ActivityService extends LoggerBase { payload.platform, undefined, orgPromiseCache, + payload.activity.timestamp, ) .then(() => { payload.memberId = payload.dbMember.id @@ -1400,6 +1402,7 @@ export default class ActivityService extends LoggerBase { payload.platform, undefined, orgPromiseCache, + payload.activity.timestamp, ) .then(() => { payload.objectMemberId = payload.dbObjectMember.id @@ -1447,11 +1450,19 @@ export default class ActivityService extends LoggerBase { ) as boolean if (!isBot) { + const verifiedEmailIdentity = payload.activity.member.identities?.find( + (i) => i.type === MemberIdentityType.EMAIL && i.verified, + ) + const emailDomain = verifiedEmailIdentity + ? verifiedEmailIdentity.value.split('@')[1] + : undefined + // associate activity with organization payload.organizationId = await this.commonMemberService.findAffiliation( payload.memberId, payload.segmentId, payload.activity.timestamp, + emailDomain, ) } else { // for bot members, we don't want to affiliate the activity with an organization diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index 8aa78b1cac..25ba57c79b 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -12,7 +12,12 @@ import { isObjectEmpty, singleOrDefault, } from '@crowd/common' -import { BotDetectionService, CommonMemberService } from '@crowd/common_services' +import { + BotDetectionService, + CommonMemberService, + MEMBER_ORG_STINT_CHANGES_DATES_PREFIX, + MEMBER_ORG_STINT_CHANGES_QUEUE, +} from '@crowd/common_services' import { QueryExecutor, createMember, dbStoreQx, updateMember } from '@crowd/data-access-layer' import { findIdentitiesForMembers, @@ -121,6 +126,7 @@ export default class MemberService extends LoggerBase { platform: PlatformType, releaseMemberLock?: () => Promise, orgPromiseCache?: Map>, + activityTimestamp?: string, ): Promise { return logExecutionTimeV2( async () => { @@ -448,6 +454,8 @@ export default class MemberService extends LoggerBase { integrationId, emailIdentities.map((i) => i.value), orgPromiseCache, + id, + activityTimestamp, ), this.log, 'memberService -> create -> assignOrganizationByEmailDomain', @@ -505,6 +513,7 @@ export default class MemberService extends LoggerBase { platform: PlatformType, releaseMemberLock?: () => Promise, orgPromiseCache?: Map>, + activityTimestamp?: string, ): Promise { await logExecutionTimeV2( async () => { @@ -682,6 +691,8 @@ export default class MemberService extends LoggerBase { integrationId, emailIdentities.map((i) => i.value), orgPromiseCache, + id, + activityTimestamp, ), this.log, 'memberService -> update -> assignOrganizationByEmailDomain', @@ -733,6 +744,8 @@ export default class MemberService extends LoggerBase { integrationId: string, emails: string[], orgPromiseCache?: Map>, + memberId?: string, + activityTimestamp?: string, ): Promise { const orgService = new OrganizationService(this.store, this.log) const organizations: IOrganizationIdSource[] = [] @@ -791,6 +804,9 @@ export default class MemberService extends LoggerBase { id: orgId, source: orgSource, }) + if (memberId && activityTimestamp) { + await this.bufferMemberOrganizationActivityDates(memberId, orgId, activityTimestamp) + } } } @@ -1047,4 +1063,42 @@ export default class MemberService extends LoggerBase { }, }) } + + /** + * Queues member activity dates in Redis as raw input for stint inference. + * + * To maximize throughput, this uses a non-atomic HGET/HSET pattern. While + * concurrent writes may occasionally drop a date, the system is self-healing + * as future activity will re-populate the buffer. + */ + private async bufferMemberOrganizationActivityDates( + memberId: string, + organizationId: string, + activityTimestamp: string, + ): Promise { + // 1. Normalize the timestamp to a simple YYYY-MM-DD string + const date = new Date(activityTimestamp).toISOString().slice(0, 10) + const key = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}` + + // 2. Fetch and de-duplicate the date within the organization's buffer + const existing = await this.redisClient.hGet(key, organizationId) + const dates: string[] = existing ? JSON.parse(existing) : [] + + if (dates.includes(date)) { + this.log.trace({ memberId, organizationId, date }, 'Date already buffered, skipping.') + } else { + dates.push(date) + dates.sort() + + // 3. Update the buffer with the new sorted date list + await this.redisClient.hSet(key, organizationId, JSON.stringify(dates)) + this.log.debug( + { memberId, organizationId, date, totalDates: dates.length }, + 'Buffered activity date for stint inference.', + ) + } + + // 4. add the member to the inference queue + await this.redisClient.sAdd(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId) + } } diff --git a/services/libs/common_services/src/services/common.member.service.ts b/services/libs/common_services/src/services/common.member.service.ts index 70c70e0647..e4ecd1956b 100644 --- a/services/libs/common_services/src/services/common.member.service.ts +++ b/services/libs/common_services/src/services/common.member.service.ts @@ -51,6 +51,7 @@ import { IWorkExperienceData } from '@crowd/data-access-layer/src/old/apps/data_ import { addOrgsToSegments } from '@crowd/data-access-layer/src/organizations' import { Logger, LoggerBase } from '@crowd/logging' import { Client as TemporalClient, WorkflowIdReusePolicy } from '@crowd/temporal' +import { OrganizationSource } from '@crowd/types' import { MergeActionState, MergeActionStep, @@ -169,6 +170,7 @@ export class CommonMemberService extends LoggerBase { memberId: string, segmentId: string, timestamp: string, + emailDomain?: string, ): Promise { const manualAffiliation = await findMemberManualAffiliation( this.qx, @@ -180,7 +182,12 @@ export class CommonMemberService extends LoggerBase { return manualAffiliation.organizationId } - const currentEmployments = await findMemberWorkExperience(this.qx, memberId, timestamp) + const currentEmployments = await findMemberWorkExperience( + this.qx, + memberId, + timestamp, + emailDomain, + ) if (currentEmployments.length > 0) { return this.decidePrimaryOrganizationId(currentEmployments) } @@ -217,10 +224,37 @@ export class CommonMemberService extends LoggerBase { return primaryEmployment.organizationId } + // Filter by source priority + // Source rank: ui > email-domain > enrichment-* > Other + const rankSource = (source?: string) => { + if (source === OrganizationSource.UI) return 0 + if (source === OrganizationSource.EMAIL_DOMAIN) return 1 + if (source?.startsWith('enrichment-')) return 2 + return 3 + } + + let bestRank = 4 + let highestPrioritySourceExperiences: IWorkExperienceData[] = [] + + for (const exp of experiences) { + const rank = rankSource(exp.source) + if (rank < bestRank) { + bestRank = rank + highestPrioritySourceExperiences = [exp] + } else if (rank === bestRank) { + highestPrioritySourceExperiences.push(exp) + } + } + + // Keep only candidates from the highest-priority source tier + if (highestPrioritySourceExperiences.length === 1) { + return highestPrioritySourceExperiences[0].organizationId + } + // decide based on the member count in the organizations const memberCounts = await findMemberCountEstimateOfOrganizations( this.qx, - experiences.map((e) => e.organizationId), + highestPrioritySourceExperiences.map((e) => e.organizationId), ) if (memberCounts[0].memberCount > memberCounts[1].memberCount) { @@ -230,7 +264,7 @@ export class CommonMemberService extends LoggerBase { } // if there's a draw in the member count, use the one with the longer period - return getLongestDateRange(experiences).organizationId + return getLongestDateRange(highestPrioritySourceExperiences).organizationId } return null diff --git a/services/libs/common_services/src/services/index.ts b/services/libs/common_services/src/services/index.ts index 227ef64e34..8034c4cfc6 100644 --- a/services/libs/common_services/src/services/index.ts +++ b/services/libs/common_services/src/services/index.ts @@ -3,7 +3,7 @@ export * from './llm.service' export * from './common.member.service' export * from './member/unmerge' export * from './member/cache' -export * from './memberOrganization' +export * from './member-organization' export * from './bot.service' export * from './emitters' export * from './github.integration.service' diff --git a/services/libs/common_services/src/services/member-organization.ts b/services/libs/common_services/src/services/member-organization.ts new file mode 100644 index 0000000000..ec094e24c9 --- /dev/null +++ b/services/libs/common_services/src/services/member-organization.ts @@ -0,0 +1,251 @@ +import { RedisClient } from '@crowd/redis' +import { + IMemberOrganization, + MemberOrgDate, + MemberOrgStintChange, + MemberRoleUnmergeStrategy, +} from '@crowd/types' + +function roleKey( + role: IMemberOrganization, + strategy: MemberRoleUnmergeStrategy, +): string | undefined { + if (strategy === MemberRoleUnmergeStrategy.SAME_MEMBER) { + return role.organizationId + } + return role.memberId +} + +function roleExistsInArray( + role: IMemberOrganization, + roles: IMemberOrganization[], + strategy: MemberRoleUnmergeStrategy, +): boolean { + const key = roleKey(role, strategy) + return roles.some( + (r) => + roleKey(r, strategy) === key && + r.title === role.title && + r.dateStart === role.dateStart && + r.dateEnd === role.dateEnd, + ) +} + +export function rolesIntersect( + roleA: IMemberOrganization, + roleB: IMemberOrganization, + strategy: MemberRoleUnmergeStrategy, +): boolean { + if (roleKey(roleA, strategy) !== roleKey(roleB, strategy) || roleA.title !== roleB.title) { + return false + } + + const startA = new Date(roleA.dateStart).getTime() + const endA = new Date(roleA.dateEnd).getTime() + const startB = new Date(roleB.dateStart).getTime() + const endB = new Date(roleB.dateEnd).getTime() + + return ( + (startA < startB && endA > startB) || + (startB < startA && endB > startA) || + (startA < startB && endA > endB) || + (startB < startA && endB > endA) + ) +} + +export function unmergeRoles( + mergedRoles: IMemberOrganization[], + primaryBackupRoles: IMemberOrganization[], + secondaryBackupRoles: IMemberOrganization[], + strategy: MemberRoleUnmergeStrategy, +): IMemberOrganization[] { + const unmergedRoles: IMemberOrganization[] = mergedRoles.filter( + (role) => + role.source === 'ui' || + !secondaryBackupRoles.some((r) => roleKey(r, strategy) === roleKey(role, strategy)), + ) + + const editableRoles = mergedRoles.filter( + (role) => + role.source !== 'ui' && + secondaryBackupRoles.some((r) => roleKey(r, strategy) === roleKey(role, strategy)), + ) + + for (const secondaryBackupRole of secondaryBackupRoles) { + const { dateStart, dateEnd } = secondaryBackupRole + + if (dateStart === null && dateEnd === null) { + if ( + roleExistsInArray(secondaryBackupRole, editableRoles, strategy) && + roleExistsInArray(secondaryBackupRole, primaryBackupRoles, strategy) + ) { + unmergedRoles.push(secondaryBackupRole) + } + } else if (dateStart !== null && dateEnd === null) { + const currentRoleFromPrimaryBackup = primaryBackupRoles.find( + (r) => + roleKey(r, strategy) === roleKey(secondaryBackupRole, strategy) && + r.title === secondaryBackupRole.title && + r.dateStart !== null && + r.dateEnd === null, + ) + if (currentRoleFromPrimaryBackup) { + unmergedRoles.push(currentRoleFromPrimaryBackup) + } + } else if (dateStart !== null && dateEnd !== null) { + if ( + roleExistsInArray(secondaryBackupRole, editableRoles, strategy) && + roleExistsInArray(secondaryBackupRole, primaryBackupRoles, strategy) + ) { + unmergedRoles.push(secondaryBackupRole) + } else { + const intersecting = editableRoles.find((r) => + rolesIntersect(secondaryBackupRole, r, strategy), + ) + + if (intersecting) { + const fromBackup = primaryBackupRoles.find((r) => + rolesIntersect(secondaryBackupRole, r, strategy), + ) + if (fromBackup) { + unmergedRoles.push(fromBackup) + } + } + } + } + } + + return unmergedRoles +} + +export const MEMBER_ORG_STINT_CHANGES_QUEUE = 'infer-member-organization-stint-changes:members' +export const MEMBER_ORG_STINT_CHANGES_DATES_PREFIX = 'infer-member-organization-stint-changes:dates' + +interface Stint { + id: string | null + organizationId: string + dateStart: string | null + dateEnd: string | null + isDirty: boolean + isNew: boolean +} + +/** + * Core logic to determine if activity dates should expand existing stints or create new ones + */ +export function inferMemberOrganizationStintChanges( + memberId: string, + existingRows: IMemberOrganization[], + orgDates: MemberOrgDate[], +): MemberOrgStintChange[] { + const toIso = (v: string | Date) => new Date(v).toISOString().split('T')[0] + const diff = (a: string, b: string) => Math.abs(Date.parse(b) - Date.parse(a)) / 86_400_000 + + // 1. Initialize local state to track modifications and new records + const stints: Stint[] = existingRows.map((r) => ({ + id: r.id ?? null, + organizationId: r.organizationId, + dateStart: r.dateStart ? toIso(r.dateStart) : null, + dateEnd: r.dateEnd ? toIso(r.dateEnd) : null, + isDirty: false, + isNew: false, + })) + + for (const { organizationId, date: targetDate } of orgDates) { + const orgStints = stints.filter((s) => s.organizationId === organizationId) + + // 2. Skip if the date is already covered by an existing stint + if ( + orgStints.some( + (s) => s.dateStart && s.dateEnd && targetDate >= s.dateStart && targetDate <= s.dateEnd, + ) + ) + continue + + // 3. Fill undated placeholder only when no dated stint exists yet (Rule 2) + const dated = orgStints.filter((s) => s.dateStart && s.dateEnd) + const placeholder = orgStints.find((s) => !s.dateStart && !s.dateEnd) + if (placeholder && dated.length === 0) { + placeholder.dateStart = placeholder.dateEnd = targetDate + placeholder.isDirty = true + continue + } + + // 4. Find the closest neighbor stint to see if expansion is possible + let neighbor: Stint | null = null + let minGap = Infinity + + for (const s of dated) { + const gap = + targetDate > s.dateEnd! ? diff(s.dateEnd!, targetDate) : diff(targetDate, s.dateStart!) + if (gap < minGap) { + minGap = gap + neighbor = s + } + } + + if (!neighbor) { + stints.push({ + id: null, + organizationId, + dateStart: targetDate, + dateEnd: targetDate, + isDirty: true, + isNew: true, + }) + continue + } + + // 5. Determine the gap window between neighbor and targetDate, then check if another org + // holds a significant (>= 30 day) dated stint that overlaps that window (multi-stint guard) + const gapStart = targetDate > neighbor.dateEnd! ? neighbor.dateEnd! : targetDate + const gapEnd = targetDate > neighbor.dateEnd! ? targetDate : neighbor.dateStart! + const hasConflict = stints.some( + (s) => + s.organizationId !== organizationId && + s.dateStart && + s.dateEnd && + s.dateStart < gapEnd && + s.dateEnd > gapStart && + diff(s.dateStart, s.dateEnd) >= 30, + ) + + const isForward = targetDate > neighbor.dateEnd! + + if (hasConflict) { + // 6a. Another org owns the gap — start a fresh stint rather than bridging + stints.push({ + id: null, + organizationId, + dateStart: targetDate, + dateEnd: targetDate, + isDirty: true, + isNew: true, + }) + } else if (isForward && minGap <= 30) { + // 6b. Forward extension within the debounce window — skip to avoid thrashing dateEnd. + // Backward extensions are not debounced (rare, only during historical re-ingestion). + continue + } else { + // 6c. Extend the neighbor in the appropriate direction + if (isForward) neighbor.dateEnd = targetDate + else neighbor.dateStart = targetDate + neighbor.isDirty = true + } + } + + // 7. Map only modified or new stints back to change objects + return stints + .filter((s) => s.isDirty) + .map((s): MemberOrgStintChange => { + const payload = { + memberId, + organizationId: s.organizationId, + dateStart: s.dateStart as string, + dateEnd: s.dateEnd as string, + } + + if (s.isNew) return { type: 'insert', ...payload } + return { type: 'update', id: s.id as string, ...payload } + }) +} diff --git a/services/libs/common_services/src/services/member/unmerge.ts b/services/libs/common_services/src/services/member/unmerge.ts index 7d79631d92..e56a0d6ab0 100644 --- a/services/libs/common_services/src/services/member/unmerge.ts +++ b/services/libs/common_services/src/services/member/unmerge.ts @@ -58,7 +58,7 @@ import { } from '@crowd/types' import { BotDetectionService } from '../bot.service' -import { unmergeRoles } from '../memberOrganization' +import { unmergeRoles } from '../member-organization' const logger = getServiceLogger() diff --git a/services/libs/common_services/src/services/memberOrganization.ts b/services/libs/common_services/src/services/memberOrganization.ts deleted file mode 100644 index b20139e3e4..0000000000 --- a/services/libs/common_services/src/services/memberOrganization.ts +++ /dev/null @@ -1,113 +0,0 @@ -import { IMemberOrganization, MemberRoleUnmergeStrategy } from '@crowd/types' - -function roleKey( - role: IMemberOrganization, - strategy: MemberRoleUnmergeStrategy, -): string | undefined { - if (strategy === MemberRoleUnmergeStrategy.SAME_MEMBER) { - return role.organizationId - } - return role.memberId -} - -function roleExistsInArray( - role: IMemberOrganization, - roles: IMemberOrganization[], - strategy: MemberRoleUnmergeStrategy, -): boolean { - const key = roleKey(role, strategy) - return roles.some( - (r) => - roleKey(r, strategy) === key && - r.title === role.title && - r.dateStart === role.dateStart && - r.dateEnd === role.dateEnd, - ) -} - -export function rolesIntersect( - roleA: IMemberOrganization, - roleB: IMemberOrganization, - strategy: MemberRoleUnmergeStrategy, -): boolean { - if (roleKey(roleA, strategy) !== roleKey(roleB, strategy) || roleA.title !== roleB.title) { - return false - } - - const startA = new Date(roleA.dateStart).getTime() - const endA = new Date(roleA.dateEnd).getTime() - const startB = new Date(roleB.dateStart).getTime() - const endB = new Date(roleB.dateEnd).getTime() - - return ( - (startA < startB && endA > startB) || - (startB < startA && endB > startA) || - (startA < startB && endA > endB) || - (startB < startA && endB > endA) - ) -} - -export function unmergeRoles( - mergedRoles: IMemberOrganization[], - primaryBackupRoles: IMemberOrganization[], - secondaryBackupRoles: IMemberOrganization[], - strategy: MemberRoleUnmergeStrategy, -): IMemberOrganization[] { - const unmergedRoles: IMemberOrganization[] = mergedRoles.filter( - (role) => - role.source === 'ui' || - !secondaryBackupRoles.some((r) => roleKey(r, strategy) === roleKey(role, strategy)), - ) - - const editableRoles = mergedRoles.filter( - (role) => - role.source !== 'ui' && - secondaryBackupRoles.some((r) => roleKey(r, strategy) === roleKey(role, strategy)), - ) - - for (const secondaryBackupRole of secondaryBackupRoles) { - const { dateStart, dateEnd } = secondaryBackupRole - - if (dateStart === null && dateEnd === null) { - if ( - roleExistsInArray(secondaryBackupRole, editableRoles, strategy) && - roleExistsInArray(secondaryBackupRole, primaryBackupRoles, strategy) - ) { - unmergedRoles.push(secondaryBackupRole) - } - } else if (dateStart !== null && dateEnd === null) { - const currentRoleFromPrimaryBackup = primaryBackupRoles.find( - (r) => - roleKey(r, strategy) === roleKey(secondaryBackupRole, strategy) && - r.title === secondaryBackupRole.title && - r.dateStart !== null && - r.dateEnd === null, - ) - if (currentRoleFromPrimaryBackup) { - unmergedRoles.push(currentRoleFromPrimaryBackup) - } - } else if (dateStart !== null && dateEnd !== null) { - if ( - roleExistsInArray(secondaryBackupRole, editableRoles, strategy) && - roleExistsInArray(secondaryBackupRole, primaryBackupRoles, strategy) - ) { - unmergedRoles.push(secondaryBackupRole) - } else { - const intersecting = editableRoles.find((r) => - rolesIntersect(secondaryBackupRole, r, strategy), - ) - - if (intersecting) { - const fromBackup = primaryBackupRoles.find((r) => - rolesIntersect(secondaryBackupRole, r, strategy), - ) - if (fromBackup) { - unmergedRoles.push(fromBackup) - } - } - } - } - } - - return unmergedRoles -} diff --git a/services/libs/data-access-layer/src/members/organizations.ts b/services/libs/data-access-layer/src/members/organizations.ts index 205c871a82..fcd87bbf18 100644 --- a/services/libs/data-access-layer/src/members/organizations.ts +++ b/services/libs/data-access-layer/src/members/organizations.ts @@ -42,6 +42,23 @@ export async function fetchMemberOrganizations( ) } +export async function fetchMemberOrganizationsBySource( + qx: QueryExecutor, + memberId: string, + source: OrganizationSource, +): Promise { + return qx.select( + ` + SELECT "id", "organizationId", "dateStart", "dateEnd", "title", "memberId", "source" + FROM "memberOrganizations" + WHERE "memberId" = $(memberId) + AND "source" = $(source) + AND "deletedAt" IS NULL + `, + { memberId, source }, + ) +} + export async function fetchOrganizationMemberIds( qx: QueryExecutor, organizationId: string, diff --git a/services/libs/data-access-layer/src/members/segments.ts b/services/libs/data-access-layer/src/members/segments.ts index 4960dc079a..34a15f7b5a 100644 --- a/services/libs/data-access-layer/src/members/segments.ts +++ b/services/libs/data-access-layer/src/members/segments.ts @@ -206,7 +206,21 @@ export async function findMemberWorkExperience( qx: QueryExecutor, memberId: string, timestamp: string, + emailDomain?: string, ): Promise { + let domainOrClause = '' + if (emailDomain) { + domainOrClause = ` + OR (mo."source" = 'email-domain' AND EXISTS ( + SELECT 1 FROM "organizationIdentities" oi + WHERE oi."organizationId" = mo."organizationId" + AND oi.type = 'primary-domain' + AND oi.verified = true + AND LOWER(oi.value) = LOWER($(emailDomain)) + )) + ` + } + const result = await qx.select( ` SELECT @@ -215,17 +229,19 @@ export async function findMemberWorkExperience( FROM "memberOrganizations" mo LEFT JOIN "memberOrganizationAffiliationOverrides" ovr on ovr."memberOrganizationId" = mo."id" WHERE mo."memberId" = $(memberId) + AND mo."deletedAt" IS NULL + AND coalesce(ovr."allowAffiliation", true) = true AND ( (mo."dateStart" <= $(timestamp) AND mo."dateEnd" >= $(timestamp)) OR (mo."dateStart" <= $(timestamp) AND mo."dateEnd" IS NULL) + ${domainOrClause} ) - AND mo."deletedAt" IS NULL - AND coalesce(ovr."allowAffiliation", true) = true ORDER BY mo."dateStart" DESC, mo.id `, { memberId, timestamp, + emailDomain, }, ) diff --git a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/memberAffiliation.data.ts b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/memberAffiliation.data.ts index df444d7631..5c53320503 100644 --- a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/memberAffiliation.data.ts +++ b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/memberAffiliation.data.ts @@ -1,3 +1,5 @@ +import { OrganizationSource } from '@crowd/types' + interface BaseData { memberId: string organizationId: string @@ -15,6 +17,7 @@ export interface IManualAffiliationData extends BaseData { export interface IWorkExperienceData extends BaseData { id: string + source: OrganizationSource } export interface IOrganizationMemberCount { diff --git a/services/libs/types/src/organizations.ts b/services/libs/types/src/organizations.ts index d5b01d6beb..649cc27f6b 100644 --- a/services/libs/types/src/organizations.ts +++ b/services/libs/types/src/organizations.ts @@ -92,6 +92,22 @@ export interface IMemberRoleWithOrganization extends IMemberOrganization { organizationLogo: string } +export interface MemberOrgDate { + organizationId: string + date: string // YYYY-MM-DD +} + +interface MemberOrgStintChangeBase { + memberId: string + organizationId: string + dateStart: string + dateEnd: string +} + +export type MemberOrgStintChange = + | ({ type: 'insert' } & MemberOrgStintChangeBase) + | ({ type: 'update'; id: string } & MemberOrgStintChangeBase) + export interface IExecutiveChange { joined_date?: string pdl_id?: string From 057464d0dec92f1af7c30c3043334b09791b7b73 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Fri, 24 Apr 2026 23:04:04 +0530 Subject: [PATCH 02/18] fix: resolve pr comments from ai bots Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- ...inferMemberOrganizationStintChanges.job.ts | 154 ++++++++---------- .../src/service/member.service.ts | 41 +++-- .../src/services/member-organization.ts | 18 +- 3 files changed, 101 insertions(+), 112 deletions(-) diff --git a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts index 56fec83064..5e31bbf46c 100644 --- a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts +++ b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts @@ -1,18 +1,16 @@ import CronTime from 'cron-time-generator' -import { - inferMemberOrganizationStintChanges, - MEMBER_ORG_STINT_CHANGES_DATES_PREFIX, - MEMBER_ORG_STINT_CHANGES_QUEUE -} from '@crowd/common_services' + import { - createMemberOrganization, - fetchMemberOrganizationsBySource, - updateMemberOrganization, -} from '@crowd/data-access-layer' + MEMBER_ORG_STINT_CHANGES_DATES_PREFIX, + MEMBER_ORG_STINT_CHANGES_QUEUE, + inferMemberOrganizationStintChanges, +} from '@crowd/common_services' +import { fetchMemberOrganizationsBySource } from '@crowd/data-access-layer' import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database' -import { pgpQx, QueryExecutor } from '@crowd/data-access-layer/src/queryExecutor' -import { REDIS_CONFIG, RedisClient, getRedisClient } from '@crowd/redis' -import { MemberOrgDate, MemberOrgStintChange, OrganizationSource } from '@crowd/types' +import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor' +import { REDIS_CONFIG, getRedisClient } from '@crowd/redis' +import { OrganizationSource } from '@crowd/types' + import { IJobDefinition } from '../types' const job: IJobDefinition = { @@ -24,8 +22,8 @@ const job: IJobDefinition = { const db = await getDbConnection(WRITE_DB_CONFIG(), 2, 0) const qx = pgpQx(db) - // 1. Fetch a batch of work triggers (Member IDs) - const memberIds = await redis.sPop(MEMBER_ORG_STINT_CHANGES_QUEUE, 500) + // 1. Get a batch of work + const memberIds = await redis.sRandMember(MEMBER_ORG_STINT_CHANGES_QUEUE, 500) if (!memberIds?.length) return ctx.log.info({ count: memberIds.length }, 'Processing pending members.') @@ -33,97 +31,73 @@ const job: IJobDefinition = { for (const memberId of memberIds) { try { - // 2. Get the activity dates for this member - const activityDates = await popMemberOrganizationActivityDates(redis, memberId) - - // If the member has no activity dates, move to the next member - if (activityDates.length === 0) continue - - // 3. Sync with existing database state - const existingOrgs = await fetchMemberOrganizationsBySource( - qx, - memberId, - OrganizationSource.EMAIL_DOMAIN, - ) - - // 4. Calculate required stint changes - const stintChanges = inferMemberOrganizationStintChanges( - memberId, - existingOrgs, - activityDates, - ) - - if (stintChanges.length === 0) continue - - const counts = { - inserts: stintChanges.filter((c) => c.type === 'insert').length, - updates: stintChanges.filter((c) => c.type === 'update').length, + const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}` + const hash = await redis.hGetAll(datesKey) + + // If no data, just remove from queue and move on + if (!hash || Object.keys(hash).length === 0) { + await redis.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId) + continue } - ctx.log.info({ memberId, ...counts }, 'Stint changes identified.') + // 2. Parse Redis data into domain objects + const { activityDates, orgIds } = parseMemberActivityHash(hash) - ctx.log.debug( - { memberId, activityDates, existingOrgs, stintChanges }, - 'Stint inference trace.', - ) + if (activityDates.length > 0) { + // 3. Compare with DB and calculate delta + const existingOrgs = await fetchMemberOrganizationsBySource( + qx, + memberId, + OrganizationSource.EMAIL_DOMAIN, + ) - // @todo: Enable writes after dry-run validation - // await applyStintChanges(qx, stintChanges) + const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, activityDates) + + if (changes.length > 0) { + ctx.log.info({ memberId, count: changes.length }, 'Stint changes identified.') + stats.inserts += changes.filter((c) => c.type === 'insert').length + stats.updates += changes.filter((c) => c.type === 'update').length + } + } + + // 4. Cleanup: Remove only the fields we actually read + await redis + .multi() + .hDel(datesKey, ...orgIds) + .sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId) + .exec() stats.processed++ - stats.inserts += counts.inserts - stats.updates += counts.updates } catch (err) { ctx.log.error(err, { memberId }, 'Failed to process member stint inference.') } } - ctx.log.info(stats, 'Batch inference complete.') + ctx.log.info(stats, 'Batch complete.') }, } -async function popMemberOrganizationActivityDates( - redis: RedisClient, - memberId: string, -): Promise { - const key = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}` - - // hGetAll + del in a multi block makes the "Pop" atomic for the entire Hash - const [hash] = (await redis.multi().hGetAll(key).del(key).exec()) as [Record | null, number] - - if (!hash || Object.keys(hash).length === 0) return [] - - return Object.entries(hash) - .flatMap(([organizationId, datesJson]) => - (JSON.parse(datesJson) as string[]).map((date) => ({ organizationId, date })), - ) +/** + * Parses the Redis hash into a clean, typed list of activity dates. + */ +function parseMemberActivityHash(hash: Record) { + const orgIds = Object.keys(hash) + const activityDates = orgIds + .flatMap((organizationId) => { + try { + const dates = JSON.parse(hash[organizationId]) + return Array.isArray(dates) + ? dates + .filter((d): d is string => typeof d === 'string') + .map((date) => ({ organizationId, date })) + : [] + } catch { + return [] + } + }) .sort((a, b) => a.date.localeCompare(b.date)) -} - -async function applyStintChanges( - qx: QueryExecutor, - stintChanges: MemberOrgStintChange[], -): Promise { - for (const change of stintChanges) { - if (change.type === 'insert') { - await createMemberOrganization(qx, change.memberId, { - organizationId: change.organizationId, - dateStart: change.dateStart, - dateEnd: change.dateEnd, - source: OrganizationSource.EMAIL_DOMAIN, - }) - continue - } - - if (!change.id) { - throw new Error('Missing id for update stint change.') - } - await updateMemberOrganization(qx, change.memberId, change.id, { - dateStart: change.dateStart, - dateEnd: change.dateEnd, - }) - } + return { activityDates, orgIds } } -export default job \ No newline at end of file +export default job diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index 25ba57c79b..3ce69e4acf 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -1076,29 +1076,42 @@ export default class MemberService extends LoggerBase { organizationId: string, activityTimestamp: string, ): Promise { - // 1. Normalize the timestamp to a simple YYYY-MM-DD string - const date = new Date(activityTimestamp).toISOString().slice(0, 10) + const date = new Date(activityTimestamp).toISOString().split('T')[0] const key = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}` - // 2. Fetch and de-duplicate the date within the organization's buffer + // Safe parser for existing Redis strings. + // Returns a valid string array even if data is corrupt or missing. + const parseExistingDates = (value: string | null | undefined): string[] => { + if (!value) return [] + try { + const parsed = JSON.parse(value) + return Array.isArray(parsed) ? parsed.filter((d): d is string => typeof d === 'string') : [] + } catch { + this.log.warn('Corrupt dates buffer value detected during buffering.') + return [] + } + } + + // 1. Fetch current dates for this specific organization const existing = await this.redisClient.hGet(key, organizationId) - const dates: string[] = existing ? JSON.parse(existing) : [] + const dates: string[] = parseExistingDates(existing) - if (dates.includes(date)) { - this.log.trace({ memberId, organizationId, date }, 'Date already buffered, skipping.') - } else { + // 2. If the date is new, update the set and the queue + if (!dates.includes(date)) { dates.push(date) dates.sort() - // 3. Update the buffer with the new sorted date list - await this.redisClient.hSet(key, organizationId, JSON.stringify(dates)) + await Promise.all([ + // Update the specific org field in the hash + this.redisClient.hSet(key, organizationId, JSON.stringify(dates)), + // Ensure the member is in the processing queue + this.redisClient.sAdd(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId), + ]) + this.log.debug( - { memberId, organizationId, date, totalDates: dates.length }, - 'Buffered activity date for stint inference.', + { memberId, organizationId, date, count: dates.length }, + 'Buffered activity date and queued member.', ) } - - // 4. add the member to the inference queue - await this.redisClient.sAdd(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId) } } diff --git a/services/libs/common_services/src/services/member-organization.ts b/services/libs/common_services/src/services/member-organization.ts index ec094e24c9..23e93a50de 100644 --- a/services/libs/common_services/src/services/member-organization.ts +++ b/services/libs/common_services/src/services/member-organization.ts @@ -1,4 +1,3 @@ -import { RedisClient } from '@crowd/redis' import { IMemberOrganization, MemberOrgDate, @@ -130,6 +129,8 @@ interface Stint { isNew: boolean } +type DatedStint = Stint & { dateStart: string; dateEnd: string } + /** * Core logic to determine if activity dates should expand existing stints or create new ones */ @@ -163,7 +164,9 @@ export function inferMemberOrganizationStintChanges( continue // 3. Fill undated placeholder only when no dated stint exists yet (Rule 2) - const dated = orgStints.filter((s) => s.dateStart && s.dateEnd) + const dated = orgStints.filter( + (s): s is DatedStint => s.dateStart !== null && s.dateEnd !== null, + ) const placeholder = orgStints.find((s) => !s.dateStart && !s.dateEnd) if (placeholder && dated.length === 0) { placeholder.dateStart = placeholder.dateEnd = targetDate @@ -172,12 +175,12 @@ export function inferMemberOrganizationStintChanges( } // 4. Find the closest neighbor stint to see if expansion is possible - let neighbor: Stint | null = null + let neighbor: DatedStint | null = null let minGap = Infinity for (const s of dated) { const gap = - targetDate > s.dateEnd! ? diff(s.dateEnd!, targetDate) : diff(targetDate, s.dateStart!) + targetDate > s.dateEnd ? diff(s.dateEnd, targetDate) : diff(targetDate, s.dateStart) if (gap < minGap) { minGap = gap neighbor = s @@ -198,8 +201,9 @@ export function inferMemberOrganizationStintChanges( // 5. Determine the gap window between neighbor and targetDate, then check if another org // holds a significant (>= 30 day) dated stint that overlaps that window (multi-stint guard) - const gapStart = targetDate > neighbor.dateEnd! ? neighbor.dateEnd! : targetDate - const gapEnd = targetDate > neighbor.dateEnd! ? targetDate : neighbor.dateStart! + const isForward = targetDate > neighbor.dateEnd + const gapStart = isForward ? neighbor.dateEnd : targetDate + const gapEnd = isForward ? targetDate : neighbor.dateStart const hasConflict = stints.some( (s) => s.organizationId !== organizationId && @@ -210,8 +214,6 @@ export function inferMemberOrganizationStintChanges( diff(s.dateStart, s.dateEnd) >= 30, ) - const isForward = targetDate > neighbor.dateEnd! - if (hasConflict) { // 6a. Another org owns the gap — start a fresh stint rather than bridging stints.push({ From 0af601cf4ae41cc26eb5894bf64f810e453a290e Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Fri, 24 Apr 2026 23:20:32 +0530 Subject: [PATCH 03/18] fix: update redis member ID retrieval method and cleanup logic Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- .../src/jobs/inferMemberOrganizationStintChanges.job.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts index 5e31bbf46c..40b5929fd7 100644 --- a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts +++ b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts @@ -23,7 +23,7 @@ const job: IJobDefinition = { const qx = pgpQx(db) // 1. Get a batch of work - const memberIds = await redis.sRandMember(MEMBER_ORG_STINT_CHANGES_QUEUE, 500) + const memberIds = await redis.sRandMemberCount(MEMBER_ORG_STINT_CHANGES_QUEUE, 500) if (!memberIds?.length) return ctx.log.info({ count: memberIds.length }, 'Processing pending members.') @@ -63,7 +63,7 @@ const job: IJobDefinition = { // 4. Cleanup: Remove only the fields we actually read await redis .multi() - .hDel(datesKey, ...orgIds) + .hDel(datesKey, orgIds) .sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId) .exec() From 34354a596cacd2fa96dd1641e854becc1939148e Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Sat, 25 Apr 2026 01:19:01 +0530 Subject: [PATCH 04/18] Update services/apps/data_sink_worker/src/service/member.service.ts Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- services/apps/data_sink_worker/src/service/member.service.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index 3ce69e4acf..cbe971d0d9 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -1087,7 +1087,10 @@ export default class MemberService extends LoggerBase { const parsed = JSON.parse(value) return Array.isArray(parsed) ? parsed.filter((d): d is string => typeof d === 'string') : [] } catch { - this.log.warn('Corrupt dates buffer value detected during buffering.') + this.log.warn( + { memberId, organizationId, key }, + 'Corrupt dates buffer value detected during buffering for member organization activity dates.', + ) return [] } } From f74aef1eef55c5cf013135f0748a780c73b3a160 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Sat, 25 Apr 2026 02:44:47 +0530 Subject: [PATCH 05/18] fix: resolve pr comments from ai bots Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- ...inferMemberOrganizationStintChanges.job.ts | 27 +++++++++---------- .../src/services/member-organization.ts | 4 ++- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts index 40b5929fd7..2f6a88b6b6 100644 --- a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts +++ b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts @@ -82,21 +82,18 @@ const job: IJobDefinition = { */ function parseMemberActivityHash(hash: Record) { const orgIds = Object.keys(hash) - const activityDates = orgIds - .flatMap((organizationId) => { - try { - const dates = JSON.parse(hash[organizationId]) - return Array.isArray(dates) - ? dates - .filter((d): d is string => typeof d === 'string') - .map((date) => ({ organizationId, date })) - : [] - } catch { - return [] - } - }) - .sort((a, b) => a.date.localeCompare(b.date)) - + const activityDates = orgIds.flatMap((organizationId) => { + try { + const dates = JSON.parse(hash[organizationId]) + return Array.isArray(dates) + ? dates + .filter((d): d is string => typeof d === 'string') + .map((date) => ({ organizationId, date })) + : [] + } catch { + return [] + } + }) return { activityDates, orgIds } } diff --git a/services/libs/common_services/src/services/member-organization.ts b/services/libs/common_services/src/services/member-organization.ts index 23e93a50de..fc8a7c38db 100644 --- a/services/libs/common_services/src/services/member-organization.ts +++ b/services/libs/common_services/src/services/member-organization.ts @@ -152,7 +152,9 @@ export function inferMemberOrganizationStintChanges( isNew: false, })) - for (const { organizationId, date: targetDate } of orgDates) { + const sortedDates = [...orgDates].sort((a, b) => a.date.localeCompare(b.date)) + + for (const { organizationId, date: targetDate } of sortedDates) { const orgStints = stints.filter((s) => s.organizationId === organizationId) // 2. Skip if the date is already covered by an existing stint From 68a79fc1b17eeba88b4c9f48cb6f8429836c45fb Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Sun, 26 Apr 2026 20:42:25 +0530 Subject: [PATCH 06/18] refactor: extract source rank logic into a separate func for clarity and reuse Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- services/libs/common/src/member.ts | 3 +++ .../src/services/common.member.service.ts | 13 ++----------- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/services/libs/common/src/member.ts b/services/libs/common/src/member.ts index 2ca0d2c455..3518fe8884 100644 --- a/services/libs/common/src/member.ts +++ b/services/libs/common/src/member.ts @@ -82,6 +82,9 @@ export const calculateReach = (oldReach: any, newReach: any): { total: number } return out } +/** + * Lower rank wins when multiple member-organization sources overlap. + */ export function getMemberOrganizationSourceRank(source: string | null | undefined): number { if (source === OrganizationSource.UI) return 0 if (source === OrganizationSource.EMAIL_DOMAIN) return 1 diff --git a/services/libs/common_services/src/services/common.member.service.ts b/services/libs/common_services/src/services/common.member.service.ts index e4ecd1956b..f4752b6667 100644 --- a/services/libs/common_services/src/services/common.member.service.ts +++ b/services/libs/common_services/src/services/common.member.service.ts @@ -13,6 +13,7 @@ import { calculateReach, getEarliestValidDate, getLongestDateRange, + getMemberOrganizationSourceRank, mergeObjects, safeObjectMerge, } from '@crowd/common' @@ -51,7 +52,6 @@ import { IWorkExperienceData } from '@crowd/data-access-layer/src/old/apps/data_ import { addOrgsToSegments } from '@crowd/data-access-layer/src/organizations' import { Logger, LoggerBase } from '@crowd/logging' import { Client as TemporalClient, WorkflowIdReusePolicy } from '@crowd/temporal' -import { OrganizationSource } from '@crowd/types' import { MergeActionState, MergeActionStep, @@ -224,20 +224,11 @@ export class CommonMemberService extends LoggerBase { return primaryEmployment.organizationId } - // Filter by source priority - // Source rank: ui > email-domain > enrichment-* > Other - const rankSource = (source?: string) => { - if (source === OrganizationSource.UI) return 0 - if (source === OrganizationSource.EMAIL_DOMAIN) return 1 - if (source?.startsWith('enrichment-')) return 2 - return 3 - } - let bestRank = 4 let highestPrioritySourceExperiences: IWorkExperienceData[] = [] for (const exp of experiences) { - const rank = rankSource(exp.source) + const rank = getMemberOrganizationSourceRank(exp.source) if (rank < bestRank) { bestRank = rank highestPrioritySourceExperiences = [exp] From 9553e2cbce8ae8ef3543e18286fe962cbd65da03 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Tue, 28 Apr 2026 21:18:33 +0530 Subject: [PATCH 07/18] refactor: streamline email domain extraction logic in ActivityService Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- .../data_sink_worker/src/service/activity.service.ts | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 46d4f23c6d..1ca2d5a4c1 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -9,6 +9,7 @@ import { distinctBy, escapeNullByte, generateUUIDv1, + isDomainExcluded, isValidEmail, parseGitHubNoreplyEmail, single, @@ -1450,12 +1451,10 @@ export default class ActivityService extends LoggerBase { ) as boolean if (!isBot) { - const verifiedEmailIdentity = payload.activity.member.identities?.find( - (i) => i.type === MemberIdentityType.EMAIL && i.verified, - ) - const emailDomain = verifiedEmailIdentity - ? verifiedEmailIdentity.value.split('@')[1] - : undefined + const emailDomain = payload.activity.member.identities + ?.filter((i) => i.type === MemberIdentityType.EMAIL && i.verified) + .map((i) => i.value.split('@')[1]?.toLowerCase()) + .find((domain) => domain && !isDomainExcluded(domain)) // associate activity with organization payload.organizationId = await this.commonMemberService.findAffiliation( From 8a9f500a4f4d945ee88925831ad30e9cd24d2bac Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Wed, 29 Apr 2026 17:37:29 +0530 Subject: [PATCH 08/18] chore: enable debugger logs in prod Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- services/apps/cron_service/package.json | 2 +- services/apps/data_sink_worker/package.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/services/apps/cron_service/package.json b/services/apps/cron_service/package.json index 1077a2e0b1..7740957821 100644 --- a/services/apps/cron_service/package.json +++ b/services/apps/cron_service/package.json @@ -2,7 +2,7 @@ "name": "@crowd/cron-service", "private": true, "scripts": { - "start": "SERVICE=cron-service tsx src/main.ts", + "start": "SERVICE=cron-service LOG_LEVEL=trace tsx src/main.ts", "start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=cron-service LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts", "start:debug": "SERVICE=cron-service LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts", "dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local", diff --git a/services/apps/data_sink_worker/package.json b/services/apps/data_sink_worker/package.json index f9a2363081..71ec155d52 100644 --- a/services/apps/data_sink_worker/package.json +++ b/services/apps/data_sink_worker/package.json @@ -2,7 +2,7 @@ "name": "@crowd/data-sink-worker", "private": true, "scripts": { - "start": "SERVICE=data-sink-worker tsx src/main.ts", + "start": "SERVICE=data-sink-worker LOG_LEVEL=trace tsx src/main.ts", "start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=data-sink-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9233 src/main.ts", "start:debug": "SERVICE=data-sink-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9233 src/main.ts", "dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local", From d7a18df9e756318a27b05f30fb775b5182dfa847 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Wed, 29 Apr 2026 18:02:53 +0530 Subject: [PATCH 09/18] chore: add logging for member organization stint inference job start Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- .../src/jobs/inferMemberOrganizationStintChanges.job.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts index 2f6a88b6b6..3fd00c020c 100644 --- a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts +++ b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts @@ -19,9 +19,11 @@ const job: IJobDefinition = { timeout: 10 * 60, process: async (ctx) => { const redis = await getRedisClient(REDIS_CONFIG()) - const db = await getDbConnection(WRITE_DB_CONFIG(), 2, 0) + const db = await getDbConnection(WRITE_DB_CONFIG()) const qx = pgpQx(db) + ctx.log.info('Starting member organization stint inference job!') + // 1. Get a batch of work const memberIds = await redis.sRandMemberCount(MEMBER_ORG_STINT_CHANGES_QUEUE, 500) if (!memberIds?.length) return From 15bd7ec6f22f5ebfdf95c81c6306c1ca05030fb2 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Wed, 29 Apr 2026 18:45:00 +0530 Subject: [PATCH 10/18] fix: debugger and info logs Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- ...inferMemberOrganizationStintChanges.job.ts | 54 +++++++++++++------ 1 file changed, 39 insertions(+), 15 deletions(-) diff --git a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts index 3fd00c020c..6271f03f27 100644 --- a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts +++ b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts @@ -5,14 +5,21 @@ import { MEMBER_ORG_STINT_CHANGES_QUEUE, inferMemberOrganizationStintChanges, } from '@crowd/common_services' -import { fetchMemberOrganizationsBySource } from '@crowd/data-access-layer' +import { + QueryExecutor, + createMemberOrganization, + fetchMemberOrganizationsBySource, + updateMemberOrganization, +} from '@crowd/data-access-layer' import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database' import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor' import { REDIS_CONFIG, getRedisClient } from '@crowd/redis' -import { OrganizationSource } from '@crowd/types' +import { MemberOrgStintChange, OrganizationSource } from '@crowd/types' import { IJobDefinition } from '../types' +const APPLY_STINT_CHANGES = false + const job: IJobDefinition = { name: 'infer-member-organization-stint-changes', cronTime: CronTime.every(5).minutes(), @@ -22,31 +29,28 @@ const job: IJobDefinition = { const db = await getDbConnection(WRITE_DB_CONFIG()) const qx = pgpQx(db) - ctx.log.info('Starting member organization stint inference job!') + ctx.log.info('Starting member organization stint inference job.') - // 1. Get a batch of work const memberIds = await redis.sRandMemberCount(MEMBER_ORG_STINT_CHANGES_QUEUE, 500) if (!memberIds?.length) return - ctx.log.info({ count: memberIds.length }, 'Processing pending members.') - const stats = { processed: 0, inserts: 0, updates: 0 } + ctx.log.info({ count: memberIds.length }, 'Processing members from queue.') + + let processed = 0 for (const memberId of memberIds) { try { const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}` const hash = await redis.hGetAll(datesKey) - // If no data, just remove from queue and move on if (!hash || Object.keys(hash).length === 0) { await redis.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId) continue } - // 2. Parse Redis data into domain objects const { activityDates, orgIds } = parseMemberActivityHash(hash) if (activityDates.length > 0) { - // 3. Compare with DB and calculate delta const existingOrgs = await fetchMemberOrganizationsBySource( qx, memberId, @@ -56,26 +60,25 @@ const job: IJobDefinition = { const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, activityDates) if (changes.length > 0) { - ctx.log.info({ memberId, count: changes.length }, 'Stint changes identified.') - stats.inserts += changes.filter((c) => c.type === 'insert').length - stats.updates += changes.filter((c) => c.type === 'update').length + ctx.log.debug({ memberId, changes }, 'Stint changes identified.') + // await applyStintChanges(qx, changes) } } - // 4. Cleanup: Remove only the fields we actually read + // Remove only the fields we actually read await redis .multi() .hDel(datesKey, orgIds) .sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId) .exec() - stats.processed++ + processed++ } catch (err) { ctx.log.error(err, { memberId }, 'Failed to process member stint inference.') } } - ctx.log.info(stats, 'Batch complete.') + ctx.log.info({ processed }, 'Batch complete.') }, } @@ -99,4 +102,25 @@ function parseMemberActivityHash(hash: Record) { return { activityDates, orgIds } } +/** + * Applies the stint changes to the database. + */ +async function applyStintChanges(qx: QueryExecutor, changes: MemberOrgStintChange[]) { + for (const change of changes) { + if (change.type === 'insert') { + await createMemberOrganization(qx, change.memberId, { + organizationId: change.organizationId, + dateStart: change.dateStart, + dateEnd: change.dateEnd, + source: OrganizationSource.EMAIL_DOMAIN, + }) + } else { + await updateMemberOrganization(qx, change.memberId, change.id, { + dateStart: change.dateStart, + dateEnd: change.dateEnd, + }) + } + } +} + export default job From 8914f410f280ef5210e8d349695bbfefbfad9c79 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Wed, 29 Apr 2026 18:45:48 +0530 Subject: [PATCH 11/18] chore: remove unused constant and add TODO for applying stint changes in member organization job Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- .../src/jobs/inferMemberOrganizationStintChanges.job.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts index 6271f03f27..a10a6b1e16 100644 --- a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts +++ b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts @@ -18,8 +18,6 @@ import { MemberOrgStintChange, OrganizationSource } from '@crowd/types' import { IJobDefinition } from '../types' -const APPLY_STINT_CHANGES = false - const job: IJobDefinition = { name: 'infer-member-organization-stint-changes', cronTime: CronTime.every(5).minutes(), @@ -61,6 +59,7 @@ const job: IJobDefinition = { if (changes.length > 0) { ctx.log.debug({ memberId, changes }, 'Stint changes identified.') + // TODO: Uncomment after validating preview logs. // await applyStintChanges(qx, changes) } } From 498f0c156f1f05ff49f3a3e19000fac412697a76 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Wed, 29 Apr 2026 19:55:10 +0530 Subject: [PATCH 12/18] feat: apply overrides in member organization stint changes job Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- ...inferMemberOrganizationStintChanges.job.ts | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts index a10a6b1e16..c509ad1e2c 100644 --- a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts +++ b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts @@ -7,6 +7,8 @@ import { } from '@crowd/common_services' import { QueryExecutor, + changeMemberOrganizationAffiliationOverrides, + checkOrganizationAffiliationPolicy, createMemberOrganization, fetchMemberOrganizationsBySource, updateMemberOrganization, @@ -59,8 +61,7 @@ const job: IJobDefinition = { if (changes.length > 0) { ctx.log.debug({ memberId, changes }, 'Stint changes identified.') - // TODO: Uncomment after validating preview logs. - // await applyStintChanges(qx, changes) + await applyStintChanges(qx, changes) } } @@ -107,12 +108,27 @@ function parseMemberActivityHash(hash: Record) { async function applyStintChanges(qx: QueryExecutor, changes: MemberOrgStintChange[]) { for (const change of changes) { if (change.type === 'insert') { - await createMemberOrganization(qx, change.memberId, { + const memberOrganizationId = await createMemberOrganization(qx, change.memberId, { organizationId: change.organizationId, dateStart: change.dateStart, dateEnd: change.dateEnd, source: OrganizationSource.EMAIL_DOMAIN, }) + + const isAffiliationBlocked = await checkOrganizationAffiliationPolicy( + qx, + change.organizationId, + ) + + if (memberOrganizationId && isAffiliationBlocked) { + await changeMemberOrganizationAffiliationOverrides(qx, [ + { + memberId: change.memberId, + memberOrganizationId, + allowAffiliation: false, + }, + ]) + } } else { await updateMemberOrganization(qx, change.memberId, change.id, { dateStart: change.dateStart, From 4fead74ce90b54e8e59eb0a2436749ab51bb6129 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Thu, 30 Apr 2026 14:14:19 +0530 Subject: [PATCH 13/18] fix: resolve pr review comments Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- backend/package.json | 3 +- .../remove-member-org-stint-hash-keys.ts | 81 +++++++++++++++++++ ...inferMemberOrganizationStintChanges.job.ts | 58 ++++++------- .../src/service/member.service.ts | 52 +++--------- .../src/services/common.member.service.ts | 7 +- services/libs/redis/src/cache.ts | 35 +++++++- 6 files changed, 160 insertions(+), 76 deletions(-) create mode 100644 backend/src/bin/scripts/remove-member-org-stint-hash-keys.ts diff --git a/backend/package.json b/backend/package.json index 5bc3ff15d7..9b1fc3d4a9 100644 --- a/backend/package.json +++ b/backend/package.json @@ -32,7 +32,8 @@ "script:refreshGithubRepoSettings": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/refresh-github-repo-settings.ts", "script:fix-duplicate-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-duplicate-members.ts", "script:fix-members-activities-after-unaffilation": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-members-activities-after-unaffilation.ts", - "script:process-bot-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/process-bot-members.ts" + "script:process-bot-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/process-bot-members.ts", + "script:remove-member-org-stint-hash-keys": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/remove-member-org-stint-hash-keys.ts" }, "lint-staged": { "**/*.ts": [ diff --git a/backend/src/bin/scripts/remove-member-org-stint-hash-keys.ts b/backend/src/bin/scripts/remove-member-org-stint-hash-keys.ts new file mode 100644 index 0000000000..1dec70c86a --- /dev/null +++ b/backend/src/bin/scripts/remove-member-org-stint-hash-keys.ts @@ -0,0 +1,81 @@ +import commandLineArgs from 'command-line-args' + +import { + MEMBER_ORG_STINT_CHANGES_DATES_PREFIX, + MEMBER_ORG_STINT_CHANGES_QUEUE, +} from '@crowd/common_services' +import { getServiceLogger } from '@crowd/logging' +import { getRedisClient, stopClient } from '@crowd/redis' + +import { REDIS_CONFIG } from '@/conf' + +const log = getServiceLogger() + +const options = [ + { + name: 'confirm', + alias: 'c', + type: Boolean, + description: 'Actually delete old hash keys. Defaults to dry-run.', + }, + { + name: 'count', + type: Number, + defaultValue: 500, + description: 'SCAN count hint.', + }, +] + +const parameters = commandLineArgs(options) + +function memberIdFromDatesKey(key: string): string { + return key.slice(`${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:`.length) +} + +setImmediate(async () => { + const dryRun = !parameters.confirm + const scanCount = parameters.count + const redis = await getRedisClient(REDIS_CONFIG, true) + const pattern = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:*` + + let scanned = 0 + let hashKeys = 0 + let deleted = 0 + let cursor = 0 + + log.info({ dryRun, pattern, scanCount }, 'Removing old member organization stint hash keys.') + + try { + do { + const result = await redis.scan(cursor, { + MATCH: pattern, + COUNT: scanCount, + }) + + cursor = Number(result.cursor) + scanned += result.keys.length + + for (const key of result.keys) { + const type = await redis.type(key) + if (type === 'hash') { + hashKeys++ + const memberId = memberIdFromDatesKey(key) + + if (dryRun) { + log.info({ key, memberId }, 'Would remove old hash key and queue member.') + } else { + await redis.multi().del(key).sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId).exec() + + deleted++ + } + } + } + } while (cursor !== 0) + + log.info({ dryRun, scanned, hashKeys, deleted }, 'Finished removing old hash keys.') + } finally { + await stopClient(redis) + } + + process.exit(0) +}) diff --git a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts index c509ad1e2c..01df93291a 100644 --- a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts +++ b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts @@ -15,8 +15,8 @@ import { } from '@crowd/data-access-layer' import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database' import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor' -import { REDIS_CONFIG, getRedisClient } from '@crowd/redis' -import { MemberOrgStintChange, OrganizationSource } from '@crowd/types' +import { REDIS_CONFIG, RedisCache, getRedisClient } from '@crowd/redis' +import { MemberOrgDate, MemberOrgStintChange, OrganizationSource } from '@crowd/types' import { IJobDefinition } from '../types' @@ -41,40 +41,44 @@ const job: IJobDefinition = { for (const memberId of memberIds) { try { const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}` - const hash = await redis.hGetAll(datesKey) + const rawMembers = await redis.sMembers(datesKey) - if (!hash || Object.keys(hash).length === 0) { + if (!rawMembers?.length) { await redis.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId) continue } - const { activityDates, orgIds } = parseMemberActivityHash(hash) + const orgDates = parseSetMembers(rawMembers) - if (activityDates.length > 0) { + if (orgDates.length > 0) { const existingOrgs = await fetchMemberOrganizationsBySource( qx, memberId, OrganizationSource.EMAIL_DOMAIN, ) - const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, activityDates) + const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, orgDates) if (changes.length > 0) { ctx.log.debug({ memberId, changes }, 'Stint changes identified.') - await applyStintChanges(qx, changes) + await qx.tx((tx) => applyStintChanges(tx, changes)) } } - // Remove only the fields we actually read - await redis - .multi() - .hDel(datesKey, orgIds) - .sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId) - .exec() + // Atomically remove only the values we read. + // If no new values were added, remove the member from the queue. + await RedisCache.ackSetMembers( + redis, + datesKey, + MEMBER_ORG_STINT_CHANGES_QUEUE, + memberId, + rawMembers, + ) processed++ } catch (err) { ctx.log.error(err, { memberId }, 'Failed to process member stint inference.') + throw err } } @@ -83,23 +87,19 @@ const job: IJobDefinition = { } /** - * Parses the Redis hash into a clean, typed list of activity dates. + * Parses set members of the form "orgId|date" into typed activity dates. */ -function parseMemberActivityHash(hash: Record) { - const orgIds = Object.keys(hash) - const activityDates = orgIds.flatMap((organizationId) => { - try { - const dates = JSON.parse(hash[organizationId]) - return Array.isArray(dates) - ? dates - .filter((d): d is string => typeof d === 'string') - .map((date) => ({ organizationId, date })) - : [] - } catch { - return [] +function parseSetMembers(members: string[]): MemberOrgDate[] { + const results: MemberOrgDate[] = [] + + for (const m of members) { + const idx = m.indexOf('|') + if (idx > 0) { + results.push({ organizationId: m.slice(0, idx), date: m.slice(idx + 1) }) } - }) - return { activityDates, orgIds } + } + + return results } /** diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index cbe971d0d9..63ad00480b 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -1065,11 +1065,8 @@ export default class MemberService extends LoggerBase { } /** - * Queues member activity dates in Redis as raw input for stint inference. - * - * To maximize throughput, this uses a non-atomic HGET/HSET pattern. While - * concurrent writes may occasionally drop a date, the system is self-healing - * as future activity will re-populate the buffer. + * Queues a member activity date in Redis for stint inference. + * Uses SADD for natural concurrency safety and deduplication. */ private async bufferMemberOrganizationActivityDates( memberId: string, @@ -1077,44 +1074,17 @@ export default class MemberService extends LoggerBase { activityTimestamp: string, ): Promise { const date = new Date(activityTimestamp).toISOString().split('T')[0] - const key = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}` - - // Safe parser for existing Redis strings. - // Returns a valid string array even if data is corrupt or missing. - const parseExistingDates = (value: string | null | undefined): string[] => { - if (!value) return [] - try { - const parsed = JSON.parse(value) - return Array.isArray(parsed) ? parsed.filter((d): d is string => typeof d === 'string') : [] - } catch { - this.log.warn( - { memberId, organizationId, key }, - 'Corrupt dates buffer value detected during buffering for member organization activity dates.', - ) - return [] - } - } - // 1. Fetch current dates for this specific organization - const existing = await this.redisClient.hGet(key, organizationId) - const dates: string[] = parseExistingDates(existing) + // Each member gets one flat set: values are "orgId|date" + const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}` + const value = `${organizationId}|${date}` - // 2. If the date is new, update the set and the queue - if (!dates.includes(date)) { - dates.push(date) - dates.sort() + await this.redisClient + .multi() + .sAdd(datesKey, value) + .sAdd(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId) + .exec() - await Promise.all([ - // Update the specific org field in the hash - this.redisClient.hSet(key, organizationId, JSON.stringify(dates)), - // Ensure the member is in the processing queue - this.redisClient.sAdd(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId), - ]) - - this.log.debug( - { memberId, organizationId, date, count: dates.length }, - 'Buffered activity date and queued member.', - ) - } + this.log.debug({ memberId, organizationId, date }, 'Buffered activity date and queued member.') } } diff --git a/services/libs/common_services/src/services/common.member.service.ts b/services/libs/common_services/src/services/common.member.service.ts index f4752b6667..5f55b2d5d5 100644 --- a/services/libs/common_services/src/services/common.member.service.ts +++ b/services/libs/common_services/src/services/common.member.service.ts @@ -248,13 +248,12 @@ export class CommonMemberService extends LoggerBase { highestPrioritySourceExperiences.map((e) => e.organizationId), ) - if (memberCounts[0].memberCount > memberCounts[1].memberCount) { + // memberCounts is sorted desc by memberCount — pick the winner if it's strictly highest + if (memberCounts?.length >= 2 && memberCounts[0].memberCount > memberCounts[1].memberCount) { return memberCounts[0].organizationId - } else if (memberCounts[0].memberCount < memberCounts[1].memberCount) { - return memberCounts[1].organizationId } - // if there's a draw in the member count, use the one with the longer period + // tie or no data — fall back to longest date range return getLongestDateRange(highestPrioritySourceExperiences).organizationId } diff --git a/services/libs/redis/src/cache.ts b/services/libs/redis/src/cache.ts index fac4bb1bfd..29db9cb7c0 100644 --- a/services/libs/redis/src/cache.ts +++ b/services/libs/redis/src/cache.ts @@ -1,7 +1,7 @@ import { Logger, LoggerBase } from '@crowd/logging' import { ICache } from '@crowd/types' -import { RedisClient } from './types' +import type { RedisClient } from './types' export class RedisCache extends LoggerBase implements ICache { private readonly prefixer: (key: string) => string @@ -202,6 +202,39 @@ return cjson.encode(results)` return map } + /** + * Atomically removes members from a set and cleans up the tracking queue + * if the set is now empty. Prevents race conditions between SCARD and SREM. + * + * @returns 1 if the queue entry was removed, 0 otherwise. + */ + public static async ackSetMembers( + client: RedisClient, + setKey: string, + queueKey: string, + queueMember: string, + members: string[], + ): Promise { + // Guard clause: Redis errors if you call SREM with no members + if (members.length === 0) return 0 + + // Uses 'unpack' for O(1) script execution instead of a loop + const script = ` + redis.call('SREM', KEYS[1], unpack(ARGV, 2)) + if redis.call('SCARD', KEYS[1]) == 0 then + return redis.call('SREM', KEYS[2], ARGV[1]) + end + return 0 + ` + + const result = await client.eval(script, { + keys: [setKey, queueKey], + arguments: [queueMember, ...members], + }) + + return Number(result) + } + public async setIfNotExistsOrGet( key: string, value: string, From 0f3d3a2ae353895f0c36f0ef41fd1aeb5edc2ed0 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Thu, 30 Apr 2026 14:30:04 +0530 Subject: [PATCH 14/18] fix: redis key pruning script Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- .../remove-member-org-stint-hash-keys.ts | 71 ++++--------------- 1 file changed, 13 insertions(+), 58 deletions(-) diff --git a/backend/src/bin/scripts/remove-member-org-stint-hash-keys.ts b/backend/src/bin/scripts/remove-member-org-stint-hash-keys.ts index 1dec70c86a..b4e49ec27f 100644 --- a/backend/src/bin/scripts/remove-member-org-stint-hash-keys.ts +++ b/backend/src/bin/scripts/remove-member-org-stint-hash-keys.ts @@ -1,5 +1,3 @@ -import commandLineArgs from 'command-line-args' - import { MEMBER_ORG_STINT_CHANGES_DATES_PREFIX, MEMBER_ORG_STINT_CHANGES_QUEUE, @@ -11,71 +9,28 @@ import { REDIS_CONFIG } from '@/conf' const log = getServiceLogger() -const options = [ - { - name: 'confirm', - alias: 'c', - type: Boolean, - description: 'Actually delete old hash keys. Defaults to dry-run.', - }, - { - name: 'count', - type: Number, - defaultValue: 500, - description: 'SCAN count hint.', - }, -] - -const parameters = commandLineArgs(options) - -function memberIdFromDatesKey(key: string): string { - return key.slice(`${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:`.length) -} - setImmediate(async () => { - const dryRun = !parameters.confirm - const scanCount = parameters.count const redis = await getRedisClient(REDIS_CONFIG, true) - const pattern = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:*` - - let scanned = 0 - let hashKeys = 0 - let deleted = 0 - let cursor = 0 - - log.info({ dryRun, pattern, scanCount }, 'Removing old member organization stint hash keys.') + const prefixes = [MEMBER_ORG_STINT_CHANGES_QUEUE, `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:*`] try { - do { - const result = await redis.scan(cursor, { - MATCH: pattern, - COUNT: scanCount, - }) - - cursor = Number(result.cursor) - scanned += result.keys.length + for (const pattern of prefixes) { + let cursor = 0 + log.info({ pattern }, 'Nuking keys matching pattern.') - for (const key of result.keys) { - const type = await redis.type(key) - if (type === 'hash') { - hashKeys++ - const memberId = memberIdFromDatesKey(key) + do { + const result = await redis.scan(cursor, { MATCH: pattern, COUNT: 1000 }) + cursor = Number(result.cursor) - if (dryRun) { - log.info({ key, memberId }, 'Would remove old hash key and queue member.') - } else { - await redis.multi().del(key).sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId).exec() - - deleted++ - } + if (result.keys.length > 0) { + await redis.del(result.keys) + log.info({ count: result.keys.length }, 'Deleted batch of keys.') } - } - } while (cursor !== 0) + } while (cursor !== 0) + } - log.info({ dryRun, scanned, hashKeys, deleted }, 'Finished removing old hash keys.') + log.info('Cleanup complete.') } finally { await stopClient(redis) } - - process.exit(0) }) From 21e42bf760753af739f42a935778641b49882d96 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Thu, 30 Apr 2026 14:44:09 +0530 Subject: [PATCH 15/18] chore: rm redis key clean up script Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- backend/package.json | 3 +- .../remove-member-org-stint-hash-keys.ts | 36 ------------------- 2 files changed, 1 insertion(+), 38 deletions(-) delete mode 100644 backend/src/bin/scripts/remove-member-org-stint-hash-keys.ts diff --git a/backend/package.json b/backend/package.json index 9b1fc3d4a9..5bc3ff15d7 100644 --- a/backend/package.json +++ b/backend/package.json @@ -32,8 +32,7 @@ "script:refreshGithubRepoSettings": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/refresh-github-repo-settings.ts", "script:fix-duplicate-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-duplicate-members.ts", "script:fix-members-activities-after-unaffilation": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-members-activities-after-unaffilation.ts", - "script:process-bot-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/process-bot-members.ts", - "script:remove-member-org-stint-hash-keys": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/remove-member-org-stint-hash-keys.ts" + "script:process-bot-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/process-bot-members.ts" }, "lint-staged": { "**/*.ts": [ diff --git a/backend/src/bin/scripts/remove-member-org-stint-hash-keys.ts b/backend/src/bin/scripts/remove-member-org-stint-hash-keys.ts deleted file mode 100644 index b4e49ec27f..0000000000 --- a/backend/src/bin/scripts/remove-member-org-stint-hash-keys.ts +++ /dev/null @@ -1,36 +0,0 @@ -import { - MEMBER_ORG_STINT_CHANGES_DATES_PREFIX, - MEMBER_ORG_STINT_CHANGES_QUEUE, -} from '@crowd/common_services' -import { getServiceLogger } from '@crowd/logging' -import { getRedisClient, stopClient } from '@crowd/redis' - -import { REDIS_CONFIG } from '@/conf' - -const log = getServiceLogger() - -setImmediate(async () => { - const redis = await getRedisClient(REDIS_CONFIG, true) - const prefixes = [MEMBER_ORG_STINT_CHANGES_QUEUE, `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:*`] - - try { - for (const pattern of prefixes) { - let cursor = 0 - log.info({ pattern }, 'Nuking keys matching pattern.') - - do { - const result = await redis.scan(cursor, { MATCH: pattern, COUNT: 1000 }) - cursor = Number(result.cursor) - - if (result.keys.length > 0) { - await redis.del(result.keys) - log.info({ count: result.keys.length }, 'Deleted batch of keys.') - } - } while (cursor !== 0) - } - - log.info('Cleanup complete.') - } finally { - await stopClient(redis) - } -}) From 0adfab9f50f4a1dddfa9bbda2e05b84e210a439a Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Thu, 30 Apr 2026 15:07:08 +0530 Subject: [PATCH 16/18] fix: lua script edge case Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- services/libs/redis/src/cache.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/services/libs/redis/src/cache.ts b/services/libs/redis/src/cache.ts index 29db9cb7c0..1dfc52f8db 100644 --- a/services/libs/redis/src/cache.ts +++ b/services/libs/redis/src/cache.ts @@ -218,9 +218,12 @@ return cjson.encode(results)` // Guard clause: Redis errors if you call SREM with no members if (members.length === 0) return 0 - // Uses 'unpack' for O(1) script execution instead of a loop const script = ` - redis.call('SREM', KEYS[1], unpack(ARGV, 2)) + local chunkSize = 500 + for i = 2, #ARGV, chunkSize do + redis.call('SREM', KEYS[1], unpack(ARGV, i, math.min(i + chunkSize - 1, #ARGV))) + end + if redis.call('SCARD', KEYS[1]) == 0 then return redis.call('SREM', KEYS[2], ARGV[1]) end From bdce7d13a390a34878c7484d9db631b809a590a2 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Thu, 30 Apr 2026 19:37:43 +0530 Subject: [PATCH 17/18] fix: set organization source to UI for manual edits in member organization updates Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- .../src/services/member/memberOrganizationsService.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/backend/src/services/member/memberOrganizationsService.ts b/backend/src/services/member/memberOrganizationsService.ts index 251ea59019..4ccfa1fd22 100644 --- a/backend/src/services/member/memberOrganizationsService.ts +++ b/backend/src/services/member/memberOrganizationsService.ts @@ -22,6 +22,7 @@ import { IOrganization, IRenderFriendlyMemberOrganization, MemberOrganizationUpdate, + OrganizationSource, } from '@crowd/types' import SequelizeRepository from '@/database/repositories/sequelizeRepository' @@ -219,14 +220,18 @@ export default class MemberOrganizationsService extends LoggerBase { title: data.title, dateStart: data.dateStart, dateEnd: data.dateEnd, - source: data.source, verified: data.verified, verifiedBy: data.verifiedBy, }).filter(([, v]) => v !== undefined), ) await cleanSoftDeletedMemberOrganization(qx, memberId, data.organizationId, data) - await updateMemberOrganization(qx, memberId, id, update) + // Any manual edit from the frontend promotes ownership to UI so automated + // sources (e.g. email-domain inference) no longer overwrite user intent. + await updateMemberOrganization(qx, memberId, id, { + ...update, + source: OrganizationSource.UI, + }) await this.commonMemberService.startAffiliationRecalculation(memberId, [data.organizationId]) From afc5734b99eaaa1da91d0f00c167313731ef721b Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Sun, 3 May 2026 19:06:02 +0530 Subject: [PATCH 18/18] chore: rm debugger env Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- services/apps/cron_service/package.json | 2 +- services/apps/data_sink_worker/package.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/services/apps/cron_service/package.json b/services/apps/cron_service/package.json index 7740957821..1077a2e0b1 100644 --- a/services/apps/cron_service/package.json +++ b/services/apps/cron_service/package.json @@ -2,7 +2,7 @@ "name": "@crowd/cron-service", "private": true, "scripts": { - "start": "SERVICE=cron-service LOG_LEVEL=trace tsx src/main.ts", + "start": "SERVICE=cron-service tsx src/main.ts", "start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=cron-service LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts", "start:debug": "SERVICE=cron-service LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts", "dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local", diff --git a/services/apps/data_sink_worker/package.json b/services/apps/data_sink_worker/package.json index 71ec155d52..f9a2363081 100644 --- a/services/apps/data_sink_worker/package.json +++ b/services/apps/data_sink_worker/package.json @@ -2,7 +2,7 @@ "name": "@crowd/data-sink-worker", "private": true, "scripts": { - "start": "SERVICE=data-sink-worker LOG_LEVEL=trace tsx src/main.ts", + "start": "SERVICE=data-sink-worker tsx src/main.ts", "start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=data-sink-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9233 src/main.ts", "start:debug": "SERVICE=data-sink-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9233 src/main.ts", "dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",