-
Notifications
You must be signed in to change notification settings - Fork 731
feat: infer memberOrganization stint dates from work-email activities (CM-1105) #4054
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
d997511
057464d
0af601c
34354a5
f74aef1
68a79fc
9553e2c
8a9f500
d7a18df
15bd7ec
8914f41
498f0c1
4fead74
0f3d3a2
21e42bf
0adfab9
bdce7d1
c2f76de
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| CREATE INDEX CONCURRENTLY IF NOT EXISTS "ix_memberOrganizations_memberId_emailDomain" | ||
| ON "memberOrganizations" ("memberId") | ||
| WHERE "source" = 'email-domain' AND "deletedAt" IS NULL; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,7 +2,7 @@ | |
| "name": "@crowd/cron-service", | ||
| "private": true, | ||
| "scripts": { | ||
| "start": "SERVICE=cron-service tsx src/main.ts", | ||
| "start": "SERVICE=cron-service LOG_LEVEL=trace tsx src/main.ts", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove log level trace here.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, added it for debugging during tests. Will remove it after. |
||
| "start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=cron-service LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts", | ||
| "start:debug": "SERVICE=cron-service LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts", | ||
| "dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local", | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,141 @@ | ||
| import CronTime from 'cron-time-generator' | ||
|
|
||
| import { | ||
| MEMBER_ORG_STINT_CHANGES_DATES_PREFIX, | ||
| MEMBER_ORG_STINT_CHANGES_QUEUE, | ||
| inferMemberOrganizationStintChanges, | ||
| } from '@crowd/common_services' | ||
| import { | ||
| QueryExecutor, | ||
| changeMemberOrganizationAffiliationOverrides, | ||
| checkOrganizationAffiliationPolicy, | ||
| createMemberOrganization, | ||
| fetchMemberOrganizationsBySource, | ||
| updateMemberOrganization, | ||
| } from '@crowd/data-access-layer' | ||
| import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database' | ||
| import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor' | ||
| import { REDIS_CONFIG, RedisCache, getRedisClient } from '@crowd/redis' | ||
| import { MemberOrgDate, MemberOrgStintChange, OrganizationSource } from '@crowd/types' | ||
|
|
||
| import { IJobDefinition } from '../types' | ||
|
|
||
| const job: IJobDefinition = { | ||
| name: 'infer-member-organization-stint-changes', | ||
| cronTime: CronTime.every(5).minutes(), | ||
| timeout: 10 * 60, | ||
| process: async (ctx) => { | ||
| const redis = await getRedisClient(REDIS_CONFIG()) | ||
| const db = await getDbConnection(WRITE_DB_CONFIG()) | ||
| const qx = pgpQx(db) | ||
|
|
||
| ctx.log.info('Starting member organization stint inference job.') | ||
|
|
||
| const memberIds = await redis.sRandMemberCount(MEMBER_ORG_STINT_CHANGES_QUEUE, 500) | ||
| if (!memberIds?.length) return | ||
|
|
||
| ctx.log.info({ count: memberIds.length }, 'Processing members from queue.') | ||
|
|
||
| let processed = 0 | ||
|
|
||
| for (const memberId of memberIds) { | ||
| try { | ||
| const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}` | ||
| const rawMembers = await redis.sMembers(datesKey) | ||
|
|
||
| if (!rawMembers?.length) { | ||
| await redis.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId) | ||
| continue | ||
| } | ||
|
|
||
| const orgDates = parseSetMembers(rawMembers) | ||
|
|
||
| if (orgDates.length > 0) { | ||
| const existingOrgs = await fetchMemberOrganizationsBySource( | ||
| qx, | ||
| memberId, | ||
| OrganizationSource.EMAIL_DOMAIN, | ||
| ) | ||
|
|
||
| const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, orgDates) | ||
|
|
||
| if (changes.length > 0) { | ||
| ctx.log.debug({ memberId, changes }, 'Stint changes identified.') | ||
| await qx.tx((tx) => applyStintChanges(tx, changes)) | ||
| } | ||
| } | ||
|
skwowet marked this conversation as resolved.
|
||
|
|
||
| // Atomically remove only the values we read. | ||
| // If no new values were added, remove the member from the queue. | ||
| await RedisCache.ackSetMembers( | ||
| redis, | ||
| datesKey, | ||
| MEMBER_ORG_STINT_CHANGES_QUEUE, | ||
| memberId, | ||
| rawMembers, | ||
| ) | ||
|
|
||
| processed++ | ||
| } catch (err) { | ||
| ctx.log.error(err, { memberId }, 'Failed to process member stint inference.') | ||
| throw err | ||
| } | ||
|
skwowet marked this conversation as resolved.
|
||
| } | ||
|
|
||
| ctx.log.info({ processed }, 'Batch complete.') | ||
| }, | ||
| } | ||
|
|
||
| /** | ||
| * Parses set members of the form "orgId|date" into typed activity dates. | ||
| */ | ||
| function parseSetMembers(members: string[]): MemberOrgDate[] { | ||
| const results: MemberOrgDate[] = [] | ||
|
|
||
| for (const m of members) { | ||
| const idx = m.indexOf('|') | ||
| if (idx > 0) { | ||
| results.push({ organizationId: m.slice(0, idx), date: m.slice(idx + 1) }) | ||
| } | ||
| } | ||
|
|
||
| return results | ||
| } | ||
|
|
||
| /** | ||
| * Applies the stint changes to the database. | ||
| */ | ||
| async function applyStintChanges(qx: QueryExecutor, changes: MemberOrgStintChange[]) { | ||
| for (const change of changes) { | ||
| if (change.type === 'insert') { | ||
| const memberOrganizationId = await createMemberOrganization(qx, change.memberId, { | ||
| organizationId: change.organizationId, | ||
| dateStart: change.dateStart, | ||
| dateEnd: change.dateEnd, | ||
| source: OrganizationSource.EMAIL_DOMAIN, | ||
| }) | ||
|
|
||
| const isAffiliationBlocked = await checkOrganizationAffiliationPolicy( | ||
| qx, | ||
| change.organizationId, | ||
| ) | ||
|
|
||
| if (memberOrganizationId && isAffiliationBlocked) { | ||
| await changeMemberOrganizationAffiliationOverrides(qx, [ | ||
| { | ||
| memberId: change.memberId, | ||
| memberOrganizationId, | ||
| allowAffiliation: false, | ||
| }, | ||
| ]) | ||
| } | ||
| } else { | ||
| await updateMemberOrganization(qx, change.memberId, change.id, { | ||
| dateStart: change.dateStart, | ||
| dateEnd: change.dateEnd, | ||
| }) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| export default job | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,7 +2,7 @@ | |
| "name": "@crowd/data-sink-worker", | ||
| "private": true, | ||
| "scripts": { | ||
| "start": "SERVICE=data-sink-worker tsx src/main.ts", | ||
| "start": "SERVICE=data-sink-worker LOG_LEVEL=trace tsx src/main.ts", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove log level trace here.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here
skwowet marked this conversation as resolved.
|
||
| "start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=data-sink-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9233 src/main.ts", | ||
| "start:debug": "SERVICE=data-sink-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9233 src/main.ts", | ||
| "dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local", | ||
|
|
@@ -17,8 +17,6 @@ | |
| "script:restart-result": "SERVICE=script tsx src/bin/restart-result.ts", | ||
| "script:process-results": "SERVICE=script tsx src/bin/process-results.ts", | ||
| "script:trigger-results-for-tenant": "SERVICE=script tsx src/bin/trigger-results-for-tenant.ts", | ||
| "script:map-tenant-members-to-org": "SERVICE=script tsx src/bin/map-tenant-members-to-org.ts", | ||
| "script:map-member-to-org": "SERVICE=script tsx src/bin/map-member-to-org.ts", | ||
| "script:fix-activity-obj-member-data": "SERVICE=script tsx src/bin/fix-activity-obj-member-data.ts", | ||
| "script:fix-member-displayName": "SERVICE=script tsx src/bin/fix-member-displayName.ts", | ||
| "script:fix-members-joinedAt": "SERVICE=script tsx src/bin/fix-members-joinedAt.ts", | ||
|
|
||
This file was deleted.
This file was deleted.
Uh oh!
There was an error while loading. Please reload this page.