Skip to content

Commit 6b3bfcb

Browse files
committed
feat: add recalculate affiliations script
Signed-off-by: Umberto Sgueglia <usgueglia@contractor.linuxfoundation.org>
1 parent 141bc2b commit 6b3bfcb

File tree

3 files changed

+269
-0
lines changed

3 files changed

+269
-0
lines changed

pnpm-lock.yaml

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

services/apps/script_executor_worker/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
"dev": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug",
99
"cleanup-fork-activities": "npx tsx src/bin/cleanup-fork-activities-and-maintainers.ts",
1010
"cleanup-member-aggregates": "npx tsx src/bin/cleanup-member-aggregates.ts",
11+
"recalculate-enrichment-affiliations": "npx tsx src/bin/recalculate-enrichment-affiliations.ts",
1112
"add-lf-projects-to-collection": "npx tsx src/bin/add-lf-projects-to-collection.ts",
1213
"lint": "npx eslint --ext .ts src --max-warnings=0",
1314
"format": "npx prettier --write \"src/**/*.ts\"",
@@ -24,6 +25,7 @@
2425
"@crowd/redis": "workspace:*",
2526
"@crowd/snowflake": "workspace:*",
2627
"@crowd/types": "workspace:*",
28+
"@crowd/temporal": "workspace:*",
2729
"@temporalio/client": "~1.11.8",
2830
"@temporalio/workflow": "~1.11.8",
2931
"axios": "^1.6.8",
Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
/**
2+
* Recalculate Enrichment Affiliations Script
3+
*
4+
* PROBLEM:
5+
* Before fix CM-1118, the members_enrichment_worker modified work experiences
6+
* (memberOrganizations) without triggering affiliation recalculation. As a result,
7+
* activityRelations.organizationId may be stale for members whose work experiences
8+
* were created, updated, or deleted by enrichment.
9+
*
10+
* SOLUTION:
11+
* This script finds all members with enrichment-sourced work experiences and triggers
12+
* a memberUpdate Temporal workflow for each, which recalculates affiliations and syncs
13+
* to OpenSearch.
14+
*
15+
* Usage:
16+
* # Via package.json script (recommended):
17+
* pnpm run recalculate-enrichment-affiliations -- [options]
18+
*
19+
* # Or directly with tsx:
20+
* npx tsx src/bin/recalculate-enrichment-affiliations.ts [options]
21+
*
22+
* Options:
23+
* --page-size <n> Number of members to fetch per DB page (default: 1000)
24+
* --concurrency <n> Max concurrent Temporal workflow starts per page (default: 20)
25+
* --page-delay <ms> Milliseconds to wait between pages (default: 5000)
26+
* --start-after <id> Resume from a specific memberId (exclusive, for restarts)
27+
* --dry-run Log what would be processed without starting workflows
28+
* --limit <n> Stop after processing at most N members total (for testing)
29+
*
30+
* Environment Variables Required:
31+
* CROWD_DB_WRITE_HOST - Postgres write host
32+
* CROWD_DB_PORT - Postgres port
33+
* CROWD_DB_USERNAME - Postgres username
34+
* CROWD_DB_PASSWORD - Postgres password
35+
* CROWD_DB_DATABASE - Postgres database name
36+
* CROWD_TEMPORAL_SERVER_URL - Temporal server URL
37+
* CROWD_TEMPORAL_NAMESPACE - Temporal namespace
38+
* SERVICE - Service identifier (used by Temporal client)
39+
*/
40+
import { DEFAULT_TENANT_ID } from '@crowd/common'
41+
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
42+
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
43+
import { getServiceChildLogger } from '@crowd/logging'
44+
import { TEMPORAL_CONFIG, getTemporalClient } from '@crowd/temporal'
45+
import { WorkflowIdReusePolicy } from '@temporalio/client'
46+
47+
const ENRICHMENT_SOURCES = ['enrichment-progai', 'enrichment-clearbit', 'enrichment-crustdata']
48+
49+
const log = getServiceChildLogger('recalculate-enrichment-affiliations')
50+
51+
interface MemberWithOrgs {
52+
memberId: string
53+
organizationIds: string[]
54+
}
55+
56+
interface ScriptOptions {
57+
pageSize: number
58+
concurrency: number
59+
pageDelayMs: number
60+
startAfter: string | null
61+
dryRun: boolean
62+
limit: number | null
63+
}
64+
65+
function parseArgs(): ScriptOptions {
66+
const args = process.argv.slice(2)
67+
68+
const getArg = (flag: string): string | undefined => {
69+
const idx = args.indexOf(flag)
70+
if (idx !== -1 && idx + 1 < args.length) return args[idx + 1]
71+
return undefined
72+
}
73+
74+
const pageSize = parseInt(getArg('--page-size') ?? '1000', 10)
75+
const concurrency = parseInt(getArg('--concurrency') ?? '20', 10)
76+
const pageDelayMs = parseInt(getArg('--page-delay') ?? '5000', 10)
77+
const startAfter = getArg('--start-after') ?? null
78+
const dryRun = args.includes('--dry-run')
79+
const limitRaw = getArg('--limit')
80+
const limit = limitRaw !== undefined ? parseInt(limitRaw, 10) : null
81+
82+
if (isNaN(pageSize) || pageSize <= 0) {
83+
log.error('--page-size must be a positive integer')
84+
process.exit(1)
85+
}
86+
if (isNaN(concurrency) || concurrency <= 0) {
87+
log.error('--concurrency must be a positive integer')
88+
process.exit(1)
89+
}
90+
if (isNaN(pageDelayMs) || pageDelayMs < 0) {
91+
log.error('--page-delay must be a non-negative integer')
92+
process.exit(1)
93+
}
94+
if (limit !== null && (isNaN(limit) || limit <= 0)) {
95+
log.error('--limit must be a positive integer')
96+
process.exit(1)
97+
}
98+
99+
return { pageSize, concurrency, pageDelayMs, startAfter, dryRun, limit }
100+
}
101+
102+
async function fetchPage(
103+
qx: ReturnType<typeof pgpQx>,
104+
afterMemberId: string | null,
105+
pageSize: number,
106+
): Promise<MemberWithOrgs[]> {
107+
const cursorClause = afterMemberId ? `AND "memberId" > $(afterMemberId)` : ''
108+
109+
// Selects only members that have at least one work experience created by enrichment
110+
// (source IN enrichment-progai, enrichment-clearbit, enrichment-crustdata).
111+
// organizationIds contains only the enrichment-sourced org IDs for that member —
112+
// used by the memberUpdate workflow to determine which orgs to sync to OpenSearch.
113+
const rows = await qx.select(
114+
`
115+
SELECT "memberId", array_agg(DISTINCT "organizationId") AS "organizationIds"
116+
FROM "memberOrganizations"
117+
WHERE source = ANY($(sources))
118+
${cursorClause}
119+
GROUP BY "memberId"
120+
ORDER BY "memberId"
121+
LIMIT $(pageSize)
122+
`,
123+
{ sources: ENRICHMENT_SOURCES, afterMemberId, pageSize },
124+
)
125+
126+
return rows as MemberWithOrgs[]
127+
}
128+
129+
async function runWithConcurrency<T>(
130+
items: T[],
131+
concurrency: number,
132+
fn: (item: T) => Promise<void>,
133+
): Promise<{ succeeded: number; failed: number }> {
134+
let succeeded = 0
135+
let failed = 0
136+
let index = 0
137+
138+
async function worker() {
139+
while (index < items.length) {
140+
const item = items[index++]
141+
try {
142+
await fn(item)
143+
succeeded++
144+
} catch (err) {
145+
failed++
146+
log.error({ err }, 'Failed to process member')
147+
}
148+
}
149+
}
150+
151+
const workers = Array.from({ length: Math.min(concurrency, items.length) }, worker)
152+
await Promise.all(workers)
153+
154+
return { succeeded, failed }
155+
}
156+
157+
async function main() {
158+
const opts = parseArgs()
159+
160+
log.info('='.repeat(80))
161+
log.info('Recalculate Enrichment Affiliations Script')
162+
log.info('='.repeat(80))
163+
log.info(`Page size: ${opts.pageSize}`)
164+
log.info(`Concurrency: ${opts.concurrency}`)
165+
log.info(`Page delay: ${opts.pageDelayMs}ms`)
166+
log.info(`Start after: ${opts.startAfter ?? '(beginning)'}`)
167+
log.info(`Mode: ${opts.dryRun ? 'DRY RUN' : 'LIVE'}`)
168+
log.info(`Limit: ${opts.limit ?? '(none)'}`)
169+
log.info('='.repeat(80))
170+
171+
// Init DB
172+
const dbConnection = await getDbConnection(WRITE_DB_CONFIG())
173+
const qx = pgpQx(dbConnection)
174+
175+
// Init Temporal
176+
const temporal = await getTemporalClient(TEMPORAL_CONFIG())
177+
178+
let cursor: string | null = opts.startAfter
179+
let pageNum = 0
180+
let totalSucceeded = 0
181+
let totalFailed = 0
182+
let totalProcessed = 0
183+
184+
while (true) {
185+
pageNum++
186+
187+
const remaining = opts.limit !== null ? opts.limit - totalProcessed : opts.pageSize
188+
if (remaining <= 0) {
189+
log.info(`Limit of ${opts.limit} members reached.`)
190+
break
191+
}
192+
193+
const fetchSize = Math.min(opts.pageSize, remaining)
194+
const membersPage = await fetchPage(qx, cursor, fetchSize)
195+
196+
if (membersPage.length === 0) {
197+
log.info('No more members to process.')
198+
break
199+
}
200+
201+
const lastMemberId = membersPage[membersPage.length - 1].memberId
202+
log.info(`Page ${pageNum}: ${membersPage.length} members | cursor: ${lastMemberId}`)
203+
204+
if (opts.dryRun) {
205+
log.info(`[DRY RUN] Would trigger ${membersPage.length} workflows`)
206+
totalProcessed += membersPage.length
207+
} else {
208+
const { succeeded, failed } = await runWithConcurrency(
209+
membersPage,
210+
opts.concurrency,
211+
async ({ memberId, organizationIds }) => {
212+
await temporal.workflow.start('memberUpdate', {
213+
taskQueue: 'profiles',
214+
workflowId: `member-update/${DEFAULT_TENANT_ID}/${memberId}`,
215+
workflowIdReusePolicy:
216+
WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING,
217+
retry: { maximumAttempts: 10 },
218+
args: [
219+
{
220+
member: { id: memberId },
221+
memberOrganizationIds: organizationIds,
222+
syncToOpensearch: true,
223+
},
224+
],
225+
searchAttributes: { TenantId: [DEFAULT_TENANT_ID] },
226+
})
227+
},
228+
)
229+
230+
totalSucceeded += succeeded
231+
totalFailed += failed
232+
totalProcessed += succeeded + failed
233+
log.info(`Page ${pageNum} done: ${succeeded} ok, ${failed} failed`)
234+
}
235+
236+
log.info(`Resume with: --start-after ${lastMemberId}`)
237+
cursor = lastMemberId
238+
239+
if (membersPage.length < fetchSize) {
240+
// Last page (fewer results than requested means no more data)
241+
break
242+
}
243+
244+
if (opts.pageDelayMs > 0) {
245+
await new Promise((resolve) => setTimeout(resolve, opts.pageDelayMs))
246+
}
247+
}
248+
249+
log.info('='.repeat(80))
250+
log.info('Summary')
251+
log.info('='.repeat(80))
252+
log.info(`Pages processed: ${pageNum}`)
253+
if (!opts.dryRun) {
254+
log.info(`Total succeeded: ${totalSucceeded}`)
255+
log.info(`Total failed: ${totalFailed}`)
256+
}
257+
258+
process.exit(totalFailed > 0 ? 1 : 0)
259+
}
260+
261+
main().catch((err) => {
262+
log.error({ err }, 'Unexpected error')
263+
process.exit(1)
264+
})

0 commit comments

Comments
 (0)