Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE INDEX CONCURRENTLY IF NOT EXISTS "ix_memberOrganizations_memberId_emailDomain"
ON "memberOrganizations" ("memberId")
WHERE "source" = 'email-domain' AND "deletedAt" IS NULL;
2 changes: 1 addition & 1 deletion services/apps/cron_service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@crowd/cron-service",
"private": true,
"scripts": {
"start": "SERVICE=cron-service tsx src/main.ts",
"start": "SERVICE=cron-service LOG_LEVEL=trace tsx src/main.ts",
Comment thread
skwowet marked this conversation as resolved.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove log level trace here.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, added it for debugging during tests. Will remove it after.

"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=cron-service LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts",
"start:debug": "SERVICE=cron-service LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts",
"dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import CronTime from 'cron-time-generator'

import {
MEMBER_ORG_STINT_CHANGES_DATES_PREFIX,
MEMBER_ORG_STINT_CHANGES_QUEUE,
inferMemberOrganizationStintChanges,
} from '@crowd/common_services'
import { fetchMemberOrganizationsBySource } from '@crowd/data-access-layer'
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { REDIS_CONFIG, getRedisClient } from '@crowd/redis'
import { OrganizationSource } from '@crowd/types'

import { IJobDefinition } from '../types'

const job: IJobDefinition = {
name: 'infer-member-organization-stint-changes',
cronTime: CronTime.every(5).minutes(),
timeout: 10 * 60,
process: async (ctx) => {
const redis = await getRedisClient(REDIS_CONFIG())
const db = await getDbConnection(WRITE_DB_CONFIG(), 2, 0)
const qx = pgpQx(db)

// 1. Get a batch of work
const memberIds = await redis.sRandMemberCount(MEMBER_ORG_STINT_CHANGES_QUEUE, 500)
Comment thread
skwowet marked this conversation as resolved.
Outdated
if (!memberIds?.length) return

ctx.log.info({ count: memberIds.length }, 'Processing pending members.')
const stats = { processed: 0, inserts: 0, updates: 0 }

for (const memberId of memberIds) {
try {
const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}`
const hash = await redis.hGetAll(datesKey)

// If no data, just remove from queue and move on
if (!hash || Object.keys(hash).length === 0) {
await redis.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId)
continue
}

// 2. Parse Redis data into domain objects
const { activityDates, orgIds } = parseMemberActivityHash(hash)

if (activityDates.length > 0) {
// 3. Compare with DB and calculate delta
const existingOrgs = await fetchMemberOrganizationsBySource(
qx,
memberId,
OrganizationSource.EMAIL_DOMAIN,
)

const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, activityDates)

if (changes.length > 0) {
ctx.log.info({ memberId, count: changes.length }, 'Stint changes identified.')
stats.inserts += changes.filter((c) => c.type === 'insert').length
stats.updates += changes.filter((c) => c.type === 'update').length
}
}
Comment thread
skwowet marked this conversation as resolved.
Comment thread
skwowet marked this conversation as resolved.
Outdated
Comment thread
skwowet marked this conversation as resolved.
Outdated

// 4. Cleanup: Remove only the fields we actually read
await redis
.multi()
.hDel(datesKey, orgIds)
.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId)
.exec()
Comment thread
skwowet marked this conversation as resolved.
Outdated

stats.processed++
Comment thread
skwowet marked this conversation as resolved.
Outdated
} catch (err) {
ctx.log.error(err, { memberId }, 'Failed to process member stint inference.')
}
Comment thread
skwowet marked this conversation as resolved.
}

ctx.log.info(stats, 'Batch complete.')
},
}

/**
* Parses the Redis hash into a clean, typed list of activity dates.
*/
function parseMemberActivityHash(hash: Record<string, string>) {
const orgIds = Object.keys(hash)
const activityDates = orgIds.flatMap((organizationId) => {
try {
const dates = JSON.parse(hash[organizationId])
return Array.isArray(dates)
? dates
.filter((d): d is string => typeof d === 'string')
.map((date) => ({ organizationId, date }))
: []
} catch {
return []
}
})
return { activityDates, orgIds }
}

export default job
4 changes: 1 addition & 3 deletions services/apps/data_sink_worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@crowd/data-sink-worker",
"private": true,
"scripts": {
"start": "SERVICE=data-sink-worker tsx src/main.ts",
"start": "SERVICE=data-sink-worker LOG_LEVEL=trace tsx src/main.ts",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove log level trace here.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

Comment thread
skwowet marked this conversation as resolved.
"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=data-sink-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9233 src/main.ts",
"start:debug": "SERVICE=data-sink-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9233 src/main.ts",
"dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",
Expand All @@ -17,8 +17,6 @@
"script:restart-result": "SERVICE=script tsx src/bin/restart-result.ts",
"script:process-results": "SERVICE=script tsx src/bin/process-results.ts",
"script:trigger-results-for-tenant": "SERVICE=script tsx src/bin/trigger-results-for-tenant.ts",
"script:map-tenant-members-to-org": "SERVICE=script tsx src/bin/map-tenant-members-to-org.ts",
"script:map-member-to-org": "SERVICE=script tsx src/bin/map-member-to-org.ts",
"script:fix-activity-obj-member-data": "SERVICE=script tsx src/bin/fix-activity-obj-member-data.ts",
"script:fix-member-displayName": "SERVICE=script tsx src/bin/fix-member-displayName.ts",
"script:fix-members-joinedAt": "SERVICE=script tsx src/bin/fix-members-joinedAt.ts",
Expand Down
97 changes: 0 additions & 97 deletions services/apps/data_sink_worker/src/bin/map-member-to-org.ts

This file was deleted.

This file was deleted.

10 changes: 10 additions & 0 deletions services/apps/data_sink_worker/src/service/activity.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
distinctBy,
escapeNullByte,
generateUUIDv1,
isDomainExcluded,
isValidEmail,
parseGitHubNoreplyEmail,
single,
Expand Down Expand Up @@ -1197,6 +1198,7 @@ export default class ActivityService extends LoggerBase {
value.platform,
undefined,
orgPromiseCache,
value.timestamp,
)
.then((memberId) => {
// map ids for members
Expand Down Expand Up @@ -1342,6 +1344,7 @@ export default class ActivityService extends LoggerBase {
payload.platform,
undefined,
orgPromiseCache,
payload.activity.timestamp,
)
.then(() => {
payload.memberId = payload.dbMember.id
Expand Down Expand Up @@ -1400,6 +1403,7 @@ export default class ActivityService extends LoggerBase {
payload.platform,
undefined,
orgPromiseCache,
payload.activity.timestamp,
)
.then(() => {
payload.objectMemberId = payload.dbObjectMember.id
Expand Down Expand Up @@ -1447,11 +1451,17 @@ export default class ActivityService extends LoggerBase {
) as boolean

if (!isBot) {
const emailDomain = payload.activity.member.identities
?.filter((i) => i.type === MemberIdentityType.EMAIL && i.verified)
.map((i) => i.value.split('@')[1]?.toLowerCase())
.find((domain) => domain && !isDomainExcluded(domain))

// associate activity with organization
payload.organizationId = await this.commonMemberService.findAffiliation(
payload.memberId,
payload.segmentId,
payload.activity.timestamp,
emailDomain,
)
} else {
// for bot members, we don't want to affiliate the activity with an organization
Expand Down
Loading
Loading