From 058101d26a23eef213acdcdb57339d61e56ad4eb Mon Sep 17 00:00:00 2001 From: Mourits de Beer Date: Fri, 1 May 2026 17:56:53 +0200 Subject: [PATCH 1/6] feat(db): name Knex connections and optional session timeouts - Add POSTGRES_*_TIMEOUT_MS env knobs (empty = disabled) for statement, lock, and idle-in-transaction limits on new pool connections. - Set PostgreSQL application_name to odota::knex for pg_stat_activity. - Log pool max, application_name, and whether session timeouts are configured. Co-authored-by: Cursor --- config.ts | 4 +++ svc/store/db.ts | 75 ++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 69 insertions(+), 10 deletions(-) diff --git a/config.ts b/config.ts index f7adae077..aa0c82376 100755 --- a/config.ts +++ b/config.ts @@ -30,6 +30,10 @@ const defaults = { POSTGRES_URL: "postgresql://postgres:postgres@localhost/yasp", // connection string for PostgreSQL READONLY_POSTGRES_URL: "postgresql://readonly:readonly@localhost/yasp", // readonly connection string for PostgreSQL POSTGRES_MAX_CONNECTIONS: "10", // Number of maximum connections in pool for postgres (per process) + /** Non-empty integer milliseconds applied per Knex pool connection via SET ... (disabled when unset). */ + POSTGRES_STATEMENT_TIMEOUT_MS: "", + POSTGRES_LOCK_TIMEOUT_MS: "", + POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT_MS: "", REDIS_URL: "redis://127.0.0.1:6379/0", // connection string for Redis CASSANDRA_URL: "cassandra://localhost/yasp", // connection string for Cassandra RETRIEVER_SECRET: "", // string to use as shared secret with retriever/parser diff --git a/svc/store/db.ts b/svc/store/db.ts index b961e8f9c..b6b60aa23 100644 --- a/svc/store/db.ts +++ b/svc/store/db.ts @@ -6,25 +6,80 @@ import util from "node:util"; // remember: all values returned from the server are either NULL or a string pg.types.setTypeParser(20, (val) => (val === null ? null : parseInt(val, 10))); + +/** pg application_name is limited to 63 bytes; keep ASCII-safe for pg_stat_activity. */ +export function pgApplicationName(role: string): string { + const app = config.APP_NAME || "unknown"; + const safeApp = app.replace(/[^\w.-]/g, "_").substring(0, 40); + const safeRole = role.replace(/[^\w.-]/g, "_").substring(0, 20); + const name = `odota:${safeApp}:${safeRole}`; + return name.substring(0, 63); +} + +function parsePositiveMsEnv(key: string): string | null { + const raw = (config as Record)[key]; + if (raw === undefined || raw === "") { + return null; + } + const n = Number(raw); + if (!Number.isFinite(n) || n < 0) { + return null; + } + return `${Math.floor(n)}ms`; +} + +function knexPoolSessionStatements(): string[] { + const stmts: string[] = []; + const st = parsePositiveMsEnv("POSTGRES_STATEMENT_TIMEOUT_MS"); + const lt = parsePositiveMsEnv("POSTGRES_LOCK_TIMEOUT_MS"); + const idle = parsePositiveMsEnv( + "POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT_MS", + ); + if (st) { + stmts.push(`SET statement_timeout TO '${st}'`); + } + if (lt) { + stmts.push(`SET lock_timeout TO '${lt}'`); + } + if (idle) { + stmts.push(`SET idle_in_transaction_session_timeout TO '${idle}'`); + } + return stmts; +} + +const knexAppName = pgApplicationName("knex"); +const sessionStmts = knexPoolSessionStatements(); + console.log( - "[POSTGRES] connecting %s with %s max connections", - config.POSTGRES_URL, + "[POSTGRES] knex connecting role=%s max=%s application_name=%s session_timeouts=%s", + config.APP_NAME || "(unset)", config.POSTGRES_MAX_CONNECTIONS, + knexAppName, + sessionStmts.length + ? { + statement_ms: config.POSTGRES_STATEMENT_TIMEOUT_MS || "(off)", + lock_ms: config.POSTGRES_LOCK_TIMEOUT_MS || "(off)", + idle_in_tx_ms: + config.POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT_MS || "(off)", + } + : "(none)", ); export const db = knex({ client: "pg", - connection: config.POSTGRES_URL, + connection: { + connectionString: config.POSTGRES_URL, + application_name: knexAppName, + }, pool: { min: 0, max: Number(config.POSTGRES_MAX_CONNECTIONS), - // afterCreate: (conn, done) => { - // // Set the minimum similarity for pg_trgm - // conn.query('SELECT set_limit(0.6);', (err) => { - // // if err is not falsy, connection is discarded from pool - // done(err, conn); - // }); - // }, + afterCreate(conn: pg.PoolClient, done: (err?: Error) => void) { + if (!sessionStmts.length) { + return done(); + } + conn.query(sessionStmts.join("; "), (err) => done(err)); + }, }, }); From c9cdcd050bfe458170fe84ee3f870ad8004bf770 Mon Sep 17 00:00:00 2001 From: Mourits de Beer Date: Fri, 1 May 2026 17:57:08 +0200 Subject: [PATCH 2/6] feat(api): set application_name on readonly /explorer pg pool Expose explorer sessions in pg_stat_activity as odota::explorer. Log pool identity alongside existing 15s timeouts. Co-authored-by: Cursor --- svc/api/spec.ts | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/svc/api/spec.ts b/svc/api/spec.ts index 436afceb6..8f493073d 100644 --- a/svc/api/spec.ts +++ b/svc/api/spec.ts @@ -23,7 +23,7 @@ import { wordcloudCols, recentMatchesCols, } from "./playerFields.ts"; -import db from "../store/db.ts"; +import db, { pgApplicationName } from "../store/db.ts"; import redis, { redisCount, redisCountDistinct } from "../store/redis.ts"; import packageJson from "../../package.json" with { type: "json" }; import generateOperationId from "./generateOperationId.ts"; @@ -91,8 +91,10 @@ import { PRIORITY } from "../util/priority.ts"; import { getPatchIndex } from "../util/compute.ts"; import { matchupToString } from "../util/matchups.ts"; +const explorerPoolAppName = pgApplicationName("explorer"); const pool = new Pool({ connectionString: config.READONLY_POSTGRES_URL, + application_name: explorerPoolAppName, statement_timeout: 15000, query_timeout: 15000, lock_timeout: 15000, @@ -100,6 +102,13 @@ const pool = new Pool({ max: 5, }); +console.log( + "[POSTGRES] explorer pool application_name=%s max=%s statement_timeout_ms=%s", + explorerPoolAppName, + 5, + 15000, +); + const parameters = { ...heroParams, ...leagueParams, From 31aeddb328c6c562d167e127f4165596c0f4bbe7 Mon Sep 17 00:00:00 2001 From: Mourits de Beer Date: Fri, 1 May 2026 17:57:16 +0200 Subject: [PATCH 3/6] feat(queue): application_name for reliable queue pg clients Each runReliableQueue executor connects with odota::queue:: so queue-driven locks appear distinctly in pg_stat_activity. Co-authored-by: Cursor --- svc/store/queue.ts | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/svc/store/queue.ts b/svc/store/queue.ts index 4f6fc7786..e2cc03054 100644 --- a/svc/store/queue.ts +++ b/svc/store/queue.ts @@ -1,6 +1,6 @@ import moment from "moment"; import redis, { redisCount } from "./redis.ts"; -import db from "./db.ts"; +import db, { pgApplicationName } from "./db.ts"; import config from "../../config.ts"; import { Client } from "pg"; import c from "ansi-colors"; @@ -55,8 +55,19 @@ export async function runReliableQueue( getCapacity?: () => Promise, ) { const executor = async (i: number) => { - const consumer = new Client(config.POSTGRES_URL); + const appName = pgApplicationName(`queue:${queueName}:${i}`); + const consumer = new Client({ + connectionString: config.POSTGRES_URL, + application_name: appName, + }); await consumer.connect(); + console.log( + "[POSTGRES] reliable queue consumer role=%s queue=%s slot=%d application_name=%s", + config.APP_NAME || "(unset)", + queueName, + i, + appName, + ); while (true) { // If we have a way to measure capacity, throttle the processing speed based on capacity if (getCapacity && i >= (await getCapacity())) { From cf0be483256e8e60d66c0e0f38fc9fa17c4f80cb Mon Sep 17 00:00:00 2001 From: Mourits de Beer Date: Fri, 1 May 2026 17:57:35 +0200 Subject: [PATCH 4/6] fix(insert): rollback insertMatch transaction on any failure If work after db.transaction() throws, release the client with rollback before rethrowing so the pool is not left with an open transaction. Co-authored-by: Cursor --- svc/util/insert.ts | 43 ++++++++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/svc/util/insert.ts b/svc/util/insert.ts index f62e7c1df..99d15c469 100644 --- a/svc/util/insert.ts +++ b/svc/util/insert.ts @@ -98,25 +98,30 @@ export async function insertMatch( let doGcData = false; const trx = await db.transaction(); - await upsertLeagueMatch(trx); - await upsertMatchPostgres(trx, isProTier); - await updateCounts( - trx, - match as ApiData, - average_rank, - num_rank_tier, - isProTier, - ); - await discoverPlayers(trx); - await queueMmr(trx); - await queueGcData(trx); - await queueRate(trx); - await queueScenarios(trx); - await postParsedMatch(trx); - await delInsertQueue(trx); - const parseJob = await queueParse(trx); - await trx.commit(); - return { parseJob, pgroup }; + try { + await upsertLeagueMatch(trx); + await upsertMatchPostgres(trx, isProTier); + await updateCounts( + trx, + match as ApiData, + average_rank, + num_rank_tier, + isProTier, + ); + await discoverPlayers(trx); + await queueMmr(trx); + await queueGcData(trx); + await queueRate(trx); + await queueScenarios(trx); + await postParsedMatch(trx); + await delInsertQueue(trx); + const parseJob = await queueParse(trx); + await trx.commit(); + return { parseJob, pgroup }; + } catch (e) { + await trx.rollback(); + throw e; + } async function upsertLeagueMatch(trx: knex.Knex.Transaction) { // Index the matchid to the league From 5b0fc5eb6def47e4d2ab8c84c9442b9389737965 Mon Sep 17 00:00:00 2001 From: Mourits de Beer Date: Fri, 1 May 2026 17:57:38 +0200 Subject: [PATCH 5/6] fix(db): rollback rater and syncSubs transactions on error Mirror insertMatch behavior so exceptions after db.transaction() always call trx.rollback() before propagating. Co-authored-by: Cursor --- svc/rater.ts | 55 +++++++++++++++++++++++++++---------------------- svc/syncSubs.ts | 23 +++++++++++++-------- 2 files changed, 44 insertions(+), 34 deletions(-) diff --git a/svc/rater.ts b/svc/rater.ts index c11ba01c9..b39f3049c 100644 --- a/svc/rater.ts +++ b/svc/rater.ts @@ -74,32 +74,37 @@ await runInLoop(async function rate() { const ratingDiff2 = kFactor * (win2 - e2); // Start transaction const trx = await db.transaction(); - // Rate each player - console.log("match %s, radiant_win: %s", row.match_id, row.radiant_win); - for (let p of gcMatch.players) { - const oldRating = ratingMap.get(p.account_id!) ?? DEFAULT_RATING; - const delta = isRadiant(p) ? ratingDiff1 : ratingDiff2; - // apply delta to each player (all players on a team will have the same rating change) - const newRating = oldRating + delta; - console.log( - "account_id: %s, oldRating: %s, newRating: %s, delta: %s", - p.account_id, - oldRating, - newRating, - delta, - ); - // Write ratings back to players + try { + // Rate each player + console.log("match %s, radiant_win: %s", row.match_id, row.radiant_win); + for (let p of gcMatch.players) { + const oldRating = ratingMap.get(p.account_id!) ?? DEFAULT_RATING; + const delta = isRadiant(p) ? ratingDiff1 : ratingDiff2; + // apply delta to each player (all players on a team will have the same rating change) + const newRating = oldRating + delta; + console.log( + "account_id: %s, oldRating: %s, newRating: %s, delta: %s", + p.account_id, + oldRating, + newRating, + delta, + ); + // Write ratings back to players + await trx.raw( + `INSERT INTO ${tableName}(account_id, computed_mmr, delta, match_id) VALUES(?, ?, ?, ?) ON CONFLICT(account_id) DO UPDATE SET computed_mmr = EXCLUDED.computed_mmr, delta = EXCLUDED.delta, match_id = EXCLUDED.match_id`, + [p.account_id, newRating, delta, row.match_id], + ); + } + // Delete row await trx.raw( - `INSERT INTO ${tableName}(account_id, computed_mmr, delta, match_id) VALUES(?, ?, ?, ?) ON CONFLICT(account_id) DO UPDATE SET computed_mmr = EXCLUDED.computed_mmr, delta = EXCLUDED.delta, match_id = EXCLUDED.match_id`, - [p.account_id, newRating, delta, row.match_id], + "DELETE FROM rating_queue WHERE match_seq_num = ?", + row.match_seq_num, ); + // Commit transaction + await trx.commit(); + redisCount("rater"); + } catch (e) { + await trx.rollback(); + throw e; } - // Delete row - await trx.raw( - "DELETE FROM rating_queue WHERE match_seq_num = ?", - row.match_seq_num, - ); - // Commit transaction - await trx.commit(); - redisCount("rater"); }, 0); diff --git a/svc/syncSubs.ts b/svc/syncSubs.ts index f8d663d8f..b91826d2a 100644 --- a/svc/syncSubs.ts +++ b/svc/syncSubs.ts @@ -15,14 +15,19 @@ await runInLoop(async function doSyncSubs() { } console.log(result.length, "subs"); const trx = await db.transaction(); - // Delete all status from subscribers - await trx.raw("UPDATE subscriber SET status = NULL"); - for (let sub of result) { - // Mark list of subscribers as active - await trx.raw("UPDATE subscriber SET status = ? WHERE customer_id = ?", [ - sub.status, - sub.customer, - ]); + try { + // Delete all status from subscribers + await trx.raw("UPDATE subscriber SET status = NULL"); + for (let sub of result) { + // Mark list of subscribers as active + await trx.raw("UPDATE subscriber SET status = ? WHERE customer_id = ?", [ + sub.status, + sub.customer, + ]); + } + await trx.commit(); + } catch (e) { + await trx.rollback(); + throw e; } - await trx.commit(); }, 60 * 1000); From 47a5f037c5fbe535c1e75e863b8c703894ecc3e3 Mon Sep 17 00:00:00 2001 From: Mourits de Beer Date: Fri, 1 May 2026 18:15:38 +0200 Subject: [PATCH 6/6] :art: --- config.ts | 7 +++---- svc/store/db.ts | 19 ++++++++++--------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/config.ts b/config.ts index aa0c82376..c40b6f6e9 100755 --- a/config.ts +++ b/config.ts @@ -30,10 +30,9 @@ const defaults = { POSTGRES_URL: "postgresql://postgres:postgres@localhost/yasp", // connection string for PostgreSQL READONLY_POSTGRES_URL: "postgresql://readonly:readonly@localhost/yasp", // readonly connection string for PostgreSQL POSTGRES_MAX_CONNECTIONS: "10", // Number of maximum connections in pool for postgres (per process) - /** Non-empty integer milliseconds applied per Knex pool connection via SET ... (disabled when unset). */ - POSTGRES_STATEMENT_TIMEOUT_MS: "", - POSTGRES_LOCK_TIMEOUT_MS: "", - POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT_MS: "", + POSTGRES_STATEMENT_TIMEOUT_MS: "", // Knex pool: SET statement_timeout per connection (ms, empty = off) + POSTGRES_LOCK_TIMEOUT_MS: "", // Knex pool: SET lock_timeout per connection (ms, empty = off) + POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT_MS: "", // Knex pool: SET idle_in_transaction_session_timeout (ms, empty = off) REDIS_URL: "redis://127.0.0.1:6379/0", // connection string for Redis CASSANDRA_URL: "cassandra://localhost/yasp", // connection string for Cassandra RETRIEVER_SECRET: "", // string to use as shared secret with retriever/parser diff --git a/svc/store/db.ts b/svc/store/db.ts index b6b60aa23..f831f3227 100644 --- a/svc/store/db.ts +++ b/svc/store/db.ts @@ -50,19 +50,20 @@ function knexPoolSessionStatements(): string[] { const knexAppName = pgApplicationName("knex"); const sessionStmts = knexPoolSessionStatements(); +const knexTimeoutSummary = sessionStmts.length + ? [ + `statement=${config.POSTGRES_STATEMENT_TIMEOUT_MS || "0"}ms`, + `lock=${config.POSTGRES_LOCK_TIMEOUT_MS || "0"}ms`, + `idle_in_tx=${config.POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT_MS || "0"}ms`, + ].join(" ") + : "off"; + console.log( - "[POSTGRES] knex connecting role=%s max=%s application_name=%s session_timeouts=%s", + "[POSTGRES] knex role=%s max=%s application_name=%s session_timeouts=%s", config.APP_NAME || "(unset)", config.POSTGRES_MAX_CONNECTIONS, knexAppName, - sessionStmts.length - ? { - statement_ms: config.POSTGRES_STATEMENT_TIMEOUT_MS || "(off)", - lock_ms: config.POSTGRES_LOCK_TIMEOUT_MS || "(off)", - idle_in_tx_ms: - config.POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT_MS || "(off)", - } - : "(none)", + knexTimeoutSummary, ); export const db = knex({