Skip to content

Commit b7b2ddb

Browse files
authored
Add compact chat completion trace fetcher (#716)
1 parent 025ad6c commit b7b2ddb

1 file changed

Lines changed: 383 additions & 0 deletions

File tree

Lines changed: 383 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,383 @@
1+
/**
2+
* Fetch and reconstruct recent chat completion traces from BigQuery.
3+
*
4+
* Usage:
5+
* bun scripts/fetch-recent-chat-completion-traces.ts
6+
* bun scripts/fetch-recent-chat-completion-traces.ts --prod --limit 5
7+
* infisical run --env=prod --silent -- bun scripts/fetch-recent-chat-completion-traces.ts --prod
8+
*/
9+
10+
import { BigQuery } from '@google-cloud/bigquery'
11+
import { mkdir, writeFile } from 'node:fs/promises'
12+
import { join, resolve } from 'node:path'
13+
14+
type Args = {
15+
dataset: string
16+
limit: number
17+
lookbackHours: number
18+
outDir: string
19+
traceSessionId: string | null
20+
}
21+
22+
type TraceRow = {
23+
trace_session_id: string
24+
agent_id: string
25+
created_at: unknown
26+
message_count: number
27+
message_start_index: number
28+
messages_json: string | null
29+
}
30+
31+
type ChatMessage = Record<string, unknown> & {
32+
role?: string
33+
content?: unknown
34+
}
35+
36+
type TraceCall = Omit<TraceRow, 'messages_json'> & {
37+
created_at: string
38+
messages: ChatMessage[]
39+
}
40+
41+
type TraceSession = {
42+
trace_session_id: string
43+
agent_ids: string[]
44+
first_created_at: string
45+
last_created_at: string
46+
messages: ChatMessage[]
47+
incomplete: boolean
48+
}
49+
50+
type TraceSessionIndexEntry = {
51+
trace_session_id: string
52+
file: string
53+
first_created_at: string
54+
last_created_at: string
55+
agent_ids: string[]
56+
message_count: number
57+
incomplete: boolean
58+
}
59+
60+
type TraceFile = {
61+
trace_session_id: string
62+
messages: ChatMessage[]
63+
}
64+
65+
function printHelp() {
66+
console.log(`Fetch recent chat completion traces from BigQuery.
67+
68+
Usage:
69+
bun scripts/fetch-recent-chat-completion-traces.ts [options]
70+
71+
Options:
72+
--prod Use codebuff_data instead of codebuff_data_dev.
73+
--dataset name Explicit BigQuery dataset name.
74+
--limit n Number of recent trace sessions to fetch. Default: 3.
75+
--lookback-hours n Recent window to scan and reconstruct. Default: 24.
76+
--trace-session-id id Fetch one known trace session id.
77+
--out-dir path Output directory. Default: .context/recent-chat-completion-traces.
78+
--help Show this message.
79+
`)
80+
}
81+
82+
function readNumberFlag(
83+
argv: string[],
84+
name: string,
85+
fallback: number,
86+
): number {
87+
const idx = argv.indexOf(name)
88+
if (idx < 0) return fallback
89+
90+
const raw = argv[idx + 1]
91+
const parsed = raw ? Number.parseInt(raw, 10) : Number.NaN
92+
if (!Number.isFinite(parsed) || parsed <= 0) {
93+
throw new Error(`${name} must be a positive integer`)
94+
}
95+
return parsed
96+
}
97+
98+
function readStringFlag(
99+
argv: string[],
100+
name: string,
101+
fallback: string | null,
102+
): string | null {
103+
const idx = argv.indexOf(name)
104+
return idx >= 0 && argv[idx + 1] ? argv[idx + 1]! : fallback
105+
}
106+
107+
function parseArgs(): Args {
108+
const argv = process.argv.slice(2)
109+
if (argv.includes('--help') || argv.includes('-h')) {
110+
printHelp()
111+
process.exit(0)
112+
}
113+
114+
return {
115+
dataset:
116+
readStringFlag(argv, '--dataset', null) ??
117+
(argv.includes('--prod') ? 'codebuff_data' : 'codebuff_data_dev'),
118+
limit: readNumberFlag(argv, '--limit', 3),
119+
lookbackHours: readNumberFlag(argv, '--lookback-hours', 24),
120+
traceSessionId: readStringFlag(argv, '--trace-session-id', null),
121+
outDir:
122+
readStringFlag(argv, '--out-dir', null) ??
123+
readStringFlag(argv, '--out', null) ??
124+
'.context/recent-chat-completion-traces',
125+
}
126+
}
127+
128+
function toIso(value: unknown): string {
129+
if (value instanceof Date) return value.toISOString()
130+
if (value && typeof value === 'object' && 'value' in value) {
131+
return String((value as { value: unknown }).value)
132+
}
133+
return String(value)
134+
}
135+
136+
function parseJson<T>(value: string | null, fallback: T): T {
137+
if (!value) return fallback
138+
const parsed = JSON.parse(value)
139+
return parsed === null ? fallback : (parsed as T)
140+
}
141+
142+
function safeFilePart(value: string): string {
143+
return value.replace(/[^a-zA-Z0-9._-]+/g, '-')
144+
}
145+
146+
function redactForPreview(text: string): string {
147+
return text
148+
.replace(
149+
/\b(?:api[_-]?key|access[_-]?key|secret|token|password|passwd|pwd)=([^&\s"'`]+)/gi,
150+
(match) => `${match.split('=')[0]}=[REDACTED]`,
151+
)
152+
.replace(
153+
/\b[A-Za-z0-9_-]{24,}\.[A-Za-z0-9_-]{12,}\.[A-Za-z0-9_-]{12,}\b/g,
154+
'[REDACTED_TOKEN]',
155+
)
156+
.replace(
157+
/\b(?:sk|pk|gho|ghp|glpat|xox[baprs])-?[A-Za-z0-9_-]{16,}\b/g,
158+
'[REDACTED_TOKEN]',
159+
)
160+
.replace(/\b[A-Fa-f0-9]{32,}\b/g, '[REDACTED_HEX]')
161+
.replace(/\b[A-Za-z0-9+/]{32,}={0,2}\b/g, (match) =>
162+
/[A-Za-z]/.test(match) && /\d/.test(match) ? '[REDACTED_SECRET]' : match,
163+
)
164+
}
165+
166+
function getMessagePreview(message: ChatMessage | undefined): string {
167+
if (!message) return '(none)'
168+
const role = typeof message.role === 'string' ? message.role : 'unknown'
169+
const content =
170+
typeof message.content === 'string'
171+
? message.content
172+
: JSON.stringify(message.content)
173+
return `${role}: ${redactForPreview((content ?? '').replace(/\s+/g, ' ')).slice(0, 120)}`
174+
}
175+
176+
function applyMessageDelta(params: {
177+
existingMessages: ChatMessage[]
178+
row: TraceCall
179+
}) {
180+
const { existingMessages, row } = params
181+
const reconstructed = [...existingMessages]
182+
row.messages.forEach((message, index) => {
183+
reconstructed[row.message_start_index + index] = message
184+
})
185+
return reconstructed.slice(0, row.message_count)
186+
}
187+
188+
function normalizeRow(row: TraceRow): TraceCall {
189+
return {
190+
...row,
191+
created_at: toIso(row.created_at),
192+
messages: parseJson<ChatMessage[]>(row.messages_json, []),
193+
}
194+
}
195+
196+
function reconstructTraceSessions(rows: TraceRow[]): TraceSession[] {
197+
const sessions = new Map<string, TraceSession>()
198+
const workingMessages = new Map<string, ChatMessage[]>()
199+
200+
for (const rawRow of rows) {
201+
const row = normalizeRow(rawRow)
202+
const sessionKey = row.trace_session_id
203+
204+
let session = sessions.get(sessionKey)
205+
if (!session) {
206+
session = {
207+
trace_session_id: row.trace_session_id,
208+
agent_ids: [],
209+
first_created_at: row.created_at,
210+
last_created_at: row.created_at,
211+
messages: [],
212+
incomplete: false,
213+
}
214+
sessions.set(sessionKey, session)
215+
}
216+
217+
if (!session.agent_ids.includes(row.agent_id)) {
218+
session.agent_ids.push(row.agent_id)
219+
}
220+
const currentMessages = workingMessages.get(sessionKey) ?? []
221+
if (row.message_start_index > currentMessages.length) {
222+
session.incomplete = true
223+
}
224+
225+
const reconstructedMessages = applyMessageDelta({
226+
existingMessages: currentMessages,
227+
row,
228+
})
229+
workingMessages.set(sessionKey, reconstructedMessages)
230+
if (
231+
reconstructedMessages.length > 0 &&
232+
reconstructedMessages.length >= session.messages.length
233+
) {
234+
session.messages = reconstructedMessages
235+
}
236+
237+
if (row.created_at < session.first_created_at) {
238+
session.first_created_at = row.created_at
239+
}
240+
if (row.created_at > session.last_created_at) {
241+
session.last_created_at = row.created_at
242+
}
243+
}
244+
245+
return [...sessions.values()].sort((a, b) =>
246+
b.last_created_at.localeCompare(a.last_created_at),
247+
)
248+
}
249+
250+
async function fetchRows(args: Args): Promise<TraceRow[]> {
251+
const bigquery = new BigQuery()
252+
const table = `\`${args.dataset}.chat_completion_traces\``
253+
const fields = `
254+
trace_session_id,
255+
agent_id,
256+
created_at,
257+
message_count,
258+
message_start_index,
259+
TO_JSON_STRING(messages) AS messages_json
260+
`
261+
262+
const query = args.traceSessionId
263+
? `
264+
SELECT ${fields}
265+
FROM ${table}
266+
WHERE trace_session_id = @traceSessionId
267+
AND trace_lineage_id = trace_session_id
268+
AND created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL @lookbackHours HOUR)
269+
ORDER BY trace_session_id, created_at, id
270+
`
271+
: `
272+
WITH recent_rows AS (
273+
SELECT *
274+
FROM ${table}
275+
WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL @lookbackHours HOUR)
276+
AND trace_lineage_id = trace_session_id
277+
),
278+
recent_sessions AS (
279+
SELECT trace_session_id, MAX(created_at) AS last_created_at
280+
FROM recent_rows
281+
GROUP BY trace_session_id
282+
ORDER BY last_created_at DESC
283+
LIMIT @limit
284+
)
285+
SELECT ${fields}
286+
FROM recent_rows
287+
JOIN recent_sessions USING (trace_session_id)
288+
ORDER BY trace_session_id, created_at, id
289+
`
290+
291+
const [rows] = await bigquery.query({
292+
query,
293+
params: {
294+
limit: args.limit,
295+
lookbackHours: args.lookbackHours,
296+
...(args.traceSessionId ? { traceSessionId: args.traceSessionId } : {}),
297+
},
298+
})
299+
300+
return rows as TraceRow[]
301+
}
302+
303+
function printSummary(args: Args, sessions: TraceSession[]) {
304+
console.log(
305+
`Fetched ${sessions.length} trace session(s) from ${args.dataset}.chat_completion_traces`,
306+
)
307+
console.log(`Lookback: ${args.lookbackHours} hour(s)`)
308+
309+
for (const session of sessions) {
310+
console.log('')
311+
console.log(`Trace session: ${session.trace_session_id}`)
312+
console.log(` messages=${session.messages.length}`)
313+
console.log(
314+
` first=${session.first_created_at} last=${session.last_created_at}`,
315+
)
316+
console.log(
317+
` agents=${session.agent_ids.join(', ')}${session.incomplete ? ' incomplete=true' : ''}`,
318+
)
319+
console.log(` last_message=${getMessagePreview(session.messages.at(-1))}`)
320+
}
321+
}
322+
323+
function buildIndexEntry(session: TraceSession): TraceSessionIndexEntry {
324+
return {
325+
trace_session_id: session.trace_session_id,
326+
file: `${safeFilePart(session.trace_session_id)}.json`,
327+
first_created_at: session.first_created_at,
328+
last_created_at: session.last_created_at,
329+
agent_ids: session.agent_ids,
330+
message_count: session.messages.length,
331+
incomplete: session.incomplete,
332+
}
333+
}
334+
335+
function buildTraceFile(session: TraceSession): TraceFile {
336+
return {
337+
trace_session_id: session.trace_session_id,
338+
messages: session.messages,
339+
}
340+
}
341+
342+
async function main() {
343+
const args = parseArgs()
344+
const rows = await fetchRows(args)
345+
const sessions = reconstructTraceSessions(rows)
346+
const outDir = resolve(args.outDir)
347+
const indexEntries = sessions.map(buildIndexEntry)
348+
349+
await mkdir(outDir, { recursive: true })
350+
await Promise.all(
351+
sessions.map((session, index) =>
352+
writeFile(
353+
join(outDir, indexEntries[index]!.file),
354+
JSON.stringify(buildTraceFile(session), null, 2),
355+
),
356+
),
357+
)
358+
await writeFile(
359+
join(outDir, 'index.json'),
360+
JSON.stringify(
361+
{
362+
generated_at: new Date().toISOString(),
363+
dataset: args.dataset,
364+
lookback_hours: args.lookbackHours,
365+
trace_session_id: args.traceSessionId,
366+
sessions: indexEntries,
367+
},
368+
null,
369+
2,
370+
),
371+
)
372+
373+
printSummary(args, sessions)
374+
console.log('')
375+
console.log(
376+
`Wrote ${sessions.length} trace file(s) and index.json to ${outDir}`,
377+
)
378+
}
379+
380+
main().catch((error) => {
381+
console.error(error)
382+
process.exit(1)
383+
})

0 commit comments

Comments
 (0)