Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
d997511
feat: infer memberOrganization stint dates from work-email activities
skwowet Apr 24, 2026
057464d
fix: resolve pr comments from ai bots
skwowet Apr 24, 2026
0af601c
fix: update redis member ID retrieval method and cleanup logic
skwowet Apr 24, 2026
34354a5
Update services/apps/data_sink_worker/src/service/member.service.ts
skwowet Apr 24, 2026
f74aef1
fix: resolve pr comments from ai bots
skwowet Apr 24, 2026
68a79fc
refactor: extract source rank logic into a separate func for clarity …
skwowet Apr 26, 2026
9553e2c
refactor: streamline email domain extraction logic in ActivityService
skwowet Apr 28, 2026
8a9f500
chore: enable debugger logs in prod
skwowet Apr 29, 2026
d7a18df
chore: add logging for member organization stint inference job start
skwowet Apr 29, 2026
15bd7ec
fix: debugger and info logs
skwowet Apr 29, 2026
8914f41
chore: remove unused constant and add TODO for applying stint changes…
skwowet Apr 29, 2026
498f0c1
feat: apply overrides in member organization stint changes job
skwowet Apr 29, 2026
4fead74
fix: resolve pr review comments
skwowet Apr 30, 2026
0f3d3a2
fix: redis key pruning script
skwowet Apr 30, 2026
21e42bf
chore: rm redis key clean up script
skwowet Apr 30, 2026
0adfab9
fix: lua script edge case
skwowet Apr 30, 2026
bdce7d1
fix: set organization source to UI for manual edits in member organiz…
skwowet Apr 30, 2026
c2f76de
Merge branch 'main' into improve/CM-1105
skwowet Apr 30, 2026
afc5734
chore: rm debugger env
skwowet May 3, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE INDEX CONCURRENTLY IF NOT EXISTS "ix_memberOrganizations_memberId_emailDomain"
ON "memberOrganizations" ("memberId")
WHERE "source" = 'email-domain' AND "deletedAt" IS NULL;
9 changes: 7 additions & 2 deletions backend/src/services/member/memberOrganizationsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
IOrganization,
IRenderFriendlyMemberOrganization,
MemberOrganizationUpdate,
OrganizationSource,
} from '@crowd/types'

import SequelizeRepository from '@/database/repositories/sequelizeRepository'
Expand Down Expand Up @@ -219,14 +220,18 @@ export default class MemberOrganizationsService extends LoggerBase {
title: data.title,
dateStart: data.dateStart,
dateEnd: data.dateEnd,
source: data.source,
verified: data.verified,
verifiedBy: data.verifiedBy,
}).filter(([, v]) => v !== undefined),
)

await cleanSoftDeletedMemberOrganization(qx, memberId, data.organizationId, data)
await updateMemberOrganization(qx, memberId, id, update)
// Any manual edit from the frontend promotes ownership to UI so automated
// sources (e.g. email-domain inference) no longer overwrite user intent.
await updateMemberOrganization(qx, memberId, id, {
...update,
source: OrganizationSource.UI,
})

await this.commonMemberService.startAffiliationRecalculation(memberId, [data.organizationId])

Expand Down
2 changes: 1 addition & 1 deletion services/apps/cron_service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@crowd/cron-service",
"private": true,
"scripts": {
"start": "SERVICE=cron-service tsx src/main.ts",
"start": "SERVICE=cron-service LOG_LEVEL=trace tsx src/main.ts",
Comment thread
skwowet marked this conversation as resolved.
Outdated
Comment thread
skwowet marked this conversation as resolved.
Outdated
"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=cron-service LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts",
"start:debug": "SERVICE=cron-service LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts",
"dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import CronTime from 'cron-time-generator'

import {
MEMBER_ORG_STINT_CHANGES_DATES_PREFIX,
MEMBER_ORG_STINT_CHANGES_QUEUE,
inferMemberOrganizationStintChanges,
} from '@crowd/common_services'
import {
QueryExecutor,
changeMemberOrganizationAffiliationOverrides,
checkOrganizationAffiliationPolicy,
createMemberOrganization,
fetchMemberOrganizationsBySource,
updateMemberOrganization,
} from '@crowd/data-access-layer'
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { REDIS_CONFIG, RedisCache, getRedisClient } from '@crowd/redis'
import { MemberOrgDate, MemberOrgStintChange, OrganizationSource } from '@crowd/types'

import { IJobDefinition } from '../types'

const job: IJobDefinition = {
name: 'infer-member-organization-stint-changes',
cronTime: CronTime.every(5).minutes(),
timeout: 10 * 60,
process: async (ctx) => {
const redis = await getRedisClient(REDIS_CONFIG())
const db = await getDbConnection(WRITE_DB_CONFIG())
const qx = pgpQx(db)

ctx.log.info('Starting member organization stint inference job.')

const memberIds = await redis.sRandMemberCount(MEMBER_ORG_STINT_CHANGES_QUEUE, 500)
if (!memberIds?.length) return

ctx.log.info({ count: memberIds.length }, 'Processing members from queue.')

let processed = 0

for (const memberId of memberIds) {
try {
const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}`
const rawMembers = await redis.sMembers(datesKey)

if (!rawMembers?.length) {
await redis.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId)
continue
}

const orgDates = parseSetMembers(rawMembers)

if (orgDates.length > 0) {
const existingOrgs = await fetchMemberOrganizationsBySource(
qx,
memberId,
OrganizationSource.EMAIL_DOMAIN,
)

const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, orgDates)

if (changes.length > 0) {
ctx.log.debug({ memberId, changes }, 'Stint changes identified.')
await qx.tx((tx) => applyStintChanges(tx, changes))
}
}
Comment thread
skwowet marked this conversation as resolved.

// Atomically remove only the values we read.
// If no new values were added, remove the member from the queue.
await RedisCache.ackSetMembers(
redis,
datesKey,
MEMBER_ORG_STINT_CHANGES_QUEUE,
memberId,
rawMembers,
)

processed++
} catch (err) {
ctx.log.error(err, { memberId }, 'Failed to process member stint inference.')
throw err
Comment thread
skwowet marked this conversation as resolved.
}
Comment thread
skwowet marked this conversation as resolved.
}

ctx.log.info({ processed }, 'Batch complete.')
},
}

/**
* Parses set members of the form "orgId|date" into typed activity dates.
*/
function parseSetMembers(members: string[]): MemberOrgDate[] {
const results: MemberOrgDate[] = []

for (const m of members) {
const idx = m.indexOf('|')
if (idx > 0) {
results.push({ organizationId: m.slice(0, idx), date: m.slice(idx + 1) })
}
}

return results
}

/**
* Applies the stint changes to the database.
*/
async function applyStintChanges(qx: QueryExecutor, changes: MemberOrgStintChange[]) {
for (const change of changes) {
if (change.type === 'insert') {
const memberOrganizationId = await createMemberOrganization(qx, change.memberId, {
organizationId: change.organizationId,
dateStart: change.dateStart,
dateEnd: change.dateEnd,
source: OrganizationSource.EMAIL_DOMAIN,
})

const isAffiliationBlocked = await checkOrganizationAffiliationPolicy(
qx,
change.organizationId,
)

if (memberOrganizationId && isAffiliationBlocked) {
await changeMemberOrganizationAffiliationOverrides(qx, [
{
memberId: change.memberId,
memberOrganizationId,
allowAffiliation: false,
},
])
}
} else {
await updateMemberOrganization(qx, change.memberId, change.id, {
dateStart: change.dateStart,
dateEnd: change.dateEnd,
})
}
}
}

export default job
4 changes: 1 addition & 3 deletions services/apps/data_sink_worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@crowd/data-sink-worker",
"private": true,
"scripts": {
"start": "SERVICE=data-sink-worker tsx src/main.ts",
"start": "SERVICE=data-sink-worker LOG_LEVEL=trace tsx src/main.ts",
Comment thread
skwowet marked this conversation as resolved.
Outdated
Comment thread
skwowet marked this conversation as resolved.
Outdated
"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=data-sink-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9233 src/main.ts",
"start:debug": "SERVICE=data-sink-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9233 src/main.ts",
"dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",
Expand All @@ -17,8 +17,6 @@
"script:restart-result": "SERVICE=script tsx src/bin/restart-result.ts",
"script:process-results": "SERVICE=script tsx src/bin/process-results.ts",
"script:trigger-results-for-tenant": "SERVICE=script tsx src/bin/trigger-results-for-tenant.ts",
"script:map-tenant-members-to-org": "SERVICE=script tsx src/bin/map-tenant-members-to-org.ts",
"script:map-member-to-org": "SERVICE=script tsx src/bin/map-member-to-org.ts",
"script:fix-activity-obj-member-data": "SERVICE=script tsx src/bin/fix-activity-obj-member-data.ts",
"script:fix-member-displayName": "SERVICE=script tsx src/bin/fix-member-displayName.ts",
"script:fix-members-joinedAt": "SERVICE=script tsx src/bin/fix-members-joinedAt.ts",
Expand Down
97 changes: 0 additions & 97 deletions services/apps/data_sink_worker/src/bin/map-member-to-org.ts

This file was deleted.

This file was deleted.

Loading
Loading