Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
"script:refreshGithubRepoSettings": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/refresh-github-repo-settings.ts",
"script:fix-duplicate-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-duplicate-members.ts",
"script:fix-members-activities-after-unaffilation": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-members-activities-after-unaffilation.ts",
"script:process-bot-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/process-bot-members.ts"
"script:process-bot-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/process-bot-members.ts",
"script:remove-member-org-stint-hash-keys": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/remove-member-org-stint-hash-keys.ts"
},
"lint-staged": {
"**/*.ts": [
Expand Down
81 changes: 81 additions & 0 deletions backend/src/bin/scripts/remove-member-org-stint-hash-keys.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import commandLineArgs from 'command-line-args'

import {
MEMBER_ORG_STINT_CHANGES_DATES_PREFIX,
MEMBER_ORG_STINT_CHANGES_QUEUE,
} from '@crowd/common_services'
import { getServiceLogger } from '@crowd/logging'
import { getRedisClient, stopClient } from '@crowd/redis'

import { REDIS_CONFIG } from '@/conf'

const log = getServiceLogger()

const options = [
{
name: 'confirm',
alias: 'c',
type: Boolean,
description: 'Actually delete old hash keys. Defaults to dry-run.',
},
{
name: 'count',
type: Number,
defaultValue: 500,
description: 'SCAN count hint.',
},
]

const parameters = commandLineArgs(options)

function memberIdFromDatesKey(key: string): string {
return key.slice(`${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:`.length)
}

setImmediate(async () => {
const dryRun = !parameters.confirm
const scanCount = parameters.count
const redis = await getRedisClient(REDIS_CONFIG, true)
const pattern = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:*`

let scanned = 0
let hashKeys = 0
let deleted = 0
let cursor = 0

log.info({ dryRun, pattern, scanCount }, 'Removing old member organization stint hash keys.')

try {
do {
const result = await redis.scan(cursor, {
MATCH: pattern,
COUNT: scanCount,
})

cursor = Number(result.cursor)
scanned += result.keys.length

for (const key of result.keys) {
const type = await redis.type(key)
if (type === 'hash') {
hashKeys++
const memberId = memberIdFromDatesKey(key)

if (dryRun) {
log.info({ key, memberId }, 'Would remove old hash key and queue member.')
} else {
await redis.multi().del(key).sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId).exec()

deleted++
}
}
}
} while (cursor !== 0)

log.info({ dryRun, scanned, hashKeys, deleted }, 'Finished removing old hash keys.')
} finally {
await stopClient(redis)
}

process.exit(0)
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE INDEX CONCURRENTLY IF NOT EXISTS "ix_memberOrganizations_memberId_emailDomain"
ON "memberOrganizations" ("memberId")
WHERE "source" = 'email-domain' AND "deletedAt" IS NULL;
2 changes: 1 addition & 1 deletion services/apps/cron_service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@crowd/cron-service",
"private": true,
"scripts": {
"start": "SERVICE=cron-service tsx src/main.ts",
"start": "SERVICE=cron-service LOG_LEVEL=trace tsx src/main.ts",
Comment thread
skwowet marked this conversation as resolved.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove log level trace here.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, added it for debugging during tests. Will remove it after.

"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=cron-service LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts",
"start:debug": "SERVICE=cron-service LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts",
"dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import CronTime from 'cron-time-generator'

import {
MEMBER_ORG_STINT_CHANGES_DATES_PREFIX,
MEMBER_ORG_STINT_CHANGES_QUEUE,
inferMemberOrganizationStintChanges,
} from '@crowd/common_services'
import {
QueryExecutor,
changeMemberOrganizationAffiliationOverrides,
checkOrganizationAffiliationPolicy,
createMemberOrganization,
fetchMemberOrganizationsBySource,
updateMemberOrganization,
} from '@crowd/data-access-layer'
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { REDIS_CONFIG, RedisCache, getRedisClient } from '@crowd/redis'
import { MemberOrgDate, MemberOrgStintChange, OrganizationSource } from '@crowd/types'

import { IJobDefinition } from '../types'

const job: IJobDefinition = {
name: 'infer-member-organization-stint-changes',
cronTime: CronTime.every(5).minutes(),
timeout: 10 * 60,
process: async (ctx) => {
const redis = await getRedisClient(REDIS_CONFIG())
const db = await getDbConnection(WRITE_DB_CONFIG())
const qx = pgpQx(db)

ctx.log.info('Starting member organization stint inference job.')

const memberIds = await redis.sRandMemberCount(MEMBER_ORG_STINT_CHANGES_QUEUE, 500)
if (!memberIds?.length) return

ctx.log.info({ count: memberIds.length }, 'Processing members from queue.')

let processed = 0

for (const memberId of memberIds) {
try {
const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}`
const rawMembers = await redis.sMembers(datesKey)

if (!rawMembers?.length) {
await redis.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId)
continue
}

const orgDates = parseSetMembers(rawMembers)

if (orgDates.length > 0) {
const existingOrgs = await fetchMemberOrganizationsBySource(
qx,
memberId,
OrganizationSource.EMAIL_DOMAIN,
)

const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, orgDates)

if (changes.length > 0) {
ctx.log.debug({ memberId, changes }, 'Stint changes identified.')
await qx.tx((tx) => applyStintChanges(tx, changes))
}
}
Comment thread
skwowet marked this conversation as resolved.

// Atomically remove only the values we read.
// If no new values were added, remove the member from the queue.
await RedisCache.ackSetMembers(
redis,
datesKey,
MEMBER_ORG_STINT_CHANGES_QUEUE,
memberId,
rawMembers,
)

processed++
} catch (err) {
ctx.log.error(err, { memberId }, 'Failed to process member stint inference.')
throw err
}
Comment thread
skwowet marked this conversation as resolved.
}

ctx.log.info({ processed }, 'Batch complete.')
},
}

/**
* Parses set members of the form "orgId|date" into typed activity dates.
*/
function parseSetMembers(members: string[]): MemberOrgDate[] {
const results: MemberOrgDate[] = []

for (const m of members) {
const idx = m.indexOf('|')
if (idx > 0) {
results.push({ organizationId: m.slice(0, idx), date: m.slice(idx + 1) })
}
}

return results
}

/**
* Applies the stint changes to the database.
*/
async function applyStintChanges(qx: QueryExecutor, changes: MemberOrgStintChange[]) {
for (const change of changes) {
if (change.type === 'insert') {
const memberOrganizationId = await createMemberOrganization(qx, change.memberId, {
organizationId: change.organizationId,
dateStart: change.dateStart,
dateEnd: change.dateEnd,
source: OrganizationSource.EMAIL_DOMAIN,
})

const isAffiliationBlocked = await checkOrganizationAffiliationPolicy(
qx,
change.organizationId,
)

if (memberOrganizationId && isAffiliationBlocked) {
await changeMemberOrganizationAffiliationOverrides(qx, [
{
memberId: change.memberId,
memberOrganizationId,
allowAffiliation: false,
},
])
}
} else {
await updateMemberOrganization(qx, change.memberId, change.id, {
dateStart: change.dateStart,
dateEnd: change.dateEnd,
})
}
}
}

export default job
4 changes: 1 addition & 3 deletions services/apps/data_sink_worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@crowd/data-sink-worker",
"private": true,
"scripts": {
"start": "SERVICE=data-sink-worker tsx src/main.ts",
"start": "SERVICE=data-sink-worker LOG_LEVEL=trace tsx src/main.ts",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove log level trace here.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

Comment thread
skwowet marked this conversation as resolved.
"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=data-sink-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9233 src/main.ts",
"start:debug": "SERVICE=data-sink-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9233 src/main.ts",
"dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",
Expand All @@ -17,8 +17,6 @@
"script:restart-result": "SERVICE=script tsx src/bin/restart-result.ts",
"script:process-results": "SERVICE=script tsx src/bin/process-results.ts",
"script:trigger-results-for-tenant": "SERVICE=script tsx src/bin/trigger-results-for-tenant.ts",
"script:map-tenant-members-to-org": "SERVICE=script tsx src/bin/map-tenant-members-to-org.ts",
"script:map-member-to-org": "SERVICE=script tsx src/bin/map-member-to-org.ts",
"script:fix-activity-obj-member-data": "SERVICE=script tsx src/bin/fix-activity-obj-member-data.ts",
"script:fix-member-displayName": "SERVICE=script tsx src/bin/fix-member-displayName.ts",
"script:fix-members-joinedAt": "SERVICE=script tsx src/bin/fix-members-joinedAt.ts",
Expand Down
97 changes: 0 additions & 97 deletions services/apps/data_sink_worker/src/bin/map-member-to-org.ts

This file was deleted.

Loading
Loading