diff --git a/config.ts b/config.ts index f7adae077..c40b6f6e9 100755 --- a/config.ts +++ b/config.ts @@ -30,6 +30,9 @@ const defaults = { POSTGRES_URL: "postgresql://postgres:postgres@localhost/yasp", // connection string for PostgreSQL READONLY_POSTGRES_URL: "postgresql://readonly:readonly@localhost/yasp", // readonly connection string for PostgreSQL POSTGRES_MAX_CONNECTIONS: "10", // Number of maximum connections in pool for postgres (per process) + POSTGRES_STATEMENT_TIMEOUT_MS: "", // Knex pool: SET statement_timeout per connection (ms, empty = off) + POSTGRES_LOCK_TIMEOUT_MS: "", // Knex pool: SET lock_timeout per connection (ms, empty = off) + POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT_MS: "", // Knex pool: SET idle_in_transaction_session_timeout (ms, empty = off) REDIS_URL: "redis://127.0.0.1:6379/0", // connection string for Redis CASSANDRA_URL: "cassandra://localhost/yasp", // connection string for Cassandra RETRIEVER_SECRET: "", // string to use as shared secret with retriever/parser diff --git a/svc/api/spec.ts b/svc/api/spec.ts index 436afceb6..8f493073d 100644 --- a/svc/api/spec.ts +++ b/svc/api/spec.ts @@ -23,7 +23,7 @@ import { wordcloudCols, recentMatchesCols, } from "./playerFields.ts"; -import db from "../store/db.ts"; +import db, { pgApplicationName } from "../store/db.ts"; import redis, { redisCount, redisCountDistinct } from "../store/redis.ts"; import packageJson from "../../package.json" with { type: "json" }; import generateOperationId from "./generateOperationId.ts"; @@ -91,8 +91,10 @@ import { PRIORITY } from "../util/priority.ts"; import { getPatchIndex } from "../util/compute.ts"; import { matchupToString } from "../util/matchups.ts"; +const explorerPoolAppName = pgApplicationName("explorer"); const pool = new Pool({ connectionString: config.READONLY_POSTGRES_URL, + application_name: explorerPoolAppName, statement_timeout: 15000, query_timeout: 15000, lock_timeout: 15000, @@ -100,6 +102,13 @@ const pool = new Pool({ max: 5, }); +console.log( + "[POSTGRES] explorer pool application_name=%s max=%s statement_timeout_ms=%s", + explorerPoolAppName, + 5, + 15000, +); + const parameters = { ...heroParams, ...leagueParams, diff --git a/svc/rater.ts b/svc/rater.ts index c11ba01c9..b39f3049c 100644 --- a/svc/rater.ts +++ b/svc/rater.ts @@ -74,32 +74,37 @@ await runInLoop(async function rate() { const ratingDiff2 = kFactor * (win2 - e2); // Start transaction const trx = await db.transaction(); - // Rate each player - console.log("match %s, radiant_win: %s", row.match_id, row.radiant_win); - for (let p of gcMatch.players) { - const oldRating = ratingMap.get(p.account_id!) ?? DEFAULT_RATING; - const delta = isRadiant(p) ? ratingDiff1 : ratingDiff2; - // apply delta to each player (all players on a team will have the same rating change) - const newRating = oldRating + delta; - console.log( - "account_id: %s, oldRating: %s, newRating: %s, delta: %s", - p.account_id, - oldRating, - newRating, - delta, - ); - // Write ratings back to players + try { + // Rate each player + console.log("match %s, radiant_win: %s", row.match_id, row.radiant_win); + for (let p of gcMatch.players) { + const oldRating = ratingMap.get(p.account_id!) ?? DEFAULT_RATING; + const delta = isRadiant(p) ? ratingDiff1 : ratingDiff2; + // apply delta to each player (all players on a team will have the same rating change) + const newRating = oldRating + delta; + console.log( + "account_id: %s, oldRating: %s, newRating: %s, delta: %s", + p.account_id, + oldRating, + newRating, + delta, + ); + // Write ratings back to players + await trx.raw( + `INSERT INTO ${tableName}(account_id, computed_mmr, delta, match_id) VALUES(?, ?, ?, ?) ON CONFLICT(account_id) DO UPDATE SET computed_mmr = EXCLUDED.computed_mmr, delta = EXCLUDED.delta, match_id = EXCLUDED.match_id`, + [p.account_id, newRating, delta, row.match_id], + ); + } + // Delete row await trx.raw( - `INSERT INTO ${tableName}(account_id, computed_mmr, delta, match_id) VALUES(?, ?, ?, ?) ON CONFLICT(account_id) DO UPDATE SET computed_mmr = EXCLUDED.computed_mmr, delta = EXCLUDED.delta, match_id = EXCLUDED.match_id`, - [p.account_id, newRating, delta, row.match_id], + "DELETE FROM rating_queue WHERE match_seq_num = ?", + row.match_seq_num, ); + // Commit transaction + await trx.commit(); + redisCount("rater"); + } catch (e) { + await trx.rollback(); + throw e; } - // Delete row - await trx.raw( - "DELETE FROM rating_queue WHERE match_seq_num = ?", - row.match_seq_num, - ); - // Commit transaction - await trx.commit(); - redisCount("rater"); }, 0); diff --git a/svc/store/db.ts b/svc/store/db.ts index b961e8f9c..f831f3227 100644 --- a/svc/store/db.ts +++ b/svc/store/db.ts @@ -6,25 +6,81 @@ import util from "node:util"; // remember: all values returned from the server are either NULL or a string pg.types.setTypeParser(20, (val) => (val === null ? null : parseInt(val, 10))); + +/** pg application_name is limited to 63 bytes; keep ASCII-safe for pg_stat_activity. */ +export function pgApplicationName(role: string): string { + const app = config.APP_NAME || "unknown"; + const safeApp = app.replace(/[^\w.-]/g, "_").substring(0, 40); + const safeRole = role.replace(/[^\w.-]/g, "_").substring(0, 20); + const name = `odota:${safeApp}:${safeRole}`; + return name.substring(0, 63); +} + +function parsePositiveMsEnv(key: string): string | null { + const raw = (config as Record)[key]; + if (raw === undefined || raw === "") { + return null; + } + const n = Number(raw); + if (!Number.isFinite(n) || n < 0) { + return null; + } + return `${Math.floor(n)}ms`; +} + +function knexPoolSessionStatements(): string[] { + const stmts: string[] = []; + const st = parsePositiveMsEnv("POSTGRES_STATEMENT_TIMEOUT_MS"); + const lt = parsePositiveMsEnv("POSTGRES_LOCK_TIMEOUT_MS"); + const idle = parsePositiveMsEnv( + "POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT_MS", + ); + if (st) { + stmts.push(`SET statement_timeout TO '${st}'`); + } + if (lt) { + stmts.push(`SET lock_timeout TO '${lt}'`); + } + if (idle) { + stmts.push(`SET idle_in_transaction_session_timeout TO '${idle}'`); + } + return stmts; +} + +const knexAppName = pgApplicationName("knex"); +const sessionStmts = knexPoolSessionStatements(); + +const knexTimeoutSummary = sessionStmts.length + ? [ + `statement=${config.POSTGRES_STATEMENT_TIMEOUT_MS || "0"}ms`, + `lock=${config.POSTGRES_LOCK_TIMEOUT_MS || "0"}ms`, + `idle_in_tx=${config.POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT_MS || "0"}ms`, + ].join(" ") + : "off"; + console.log( - "[POSTGRES] connecting %s with %s max connections", - config.POSTGRES_URL, + "[POSTGRES] knex role=%s max=%s application_name=%s session_timeouts=%s", + config.APP_NAME || "(unset)", config.POSTGRES_MAX_CONNECTIONS, + knexAppName, + knexTimeoutSummary, ); export const db = knex({ client: "pg", - connection: config.POSTGRES_URL, + connection: { + connectionString: config.POSTGRES_URL, + application_name: knexAppName, + }, pool: { min: 0, max: Number(config.POSTGRES_MAX_CONNECTIONS), - // afterCreate: (conn, done) => { - // // Set the minimum similarity for pg_trgm - // conn.query('SELECT set_limit(0.6);', (err) => { - // // if err is not falsy, connection is discarded from pool - // done(err, conn); - // }); - // }, + afterCreate(conn: pg.PoolClient, done: (err?: Error) => void) { + if (!sessionStmts.length) { + return done(); + } + conn.query(sessionStmts.join("; "), (err) => done(err)); + }, }, }); diff --git a/svc/store/queue.ts b/svc/store/queue.ts index 4f6fc7786..e2cc03054 100644 --- a/svc/store/queue.ts +++ b/svc/store/queue.ts @@ -1,6 +1,6 @@ import moment from "moment"; import redis, { redisCount } from "./redis.ts"; -import db from "./db.ts"; +import db, { pgApplicationName } from "./db.ts"; import config from "../../config.ts"; import { Client } from "pg"; import c from "ansi-colors"; @@ -55,8 +55,19 @@ export async function runReliableQueue( getCapacity?: () => Promise, ) { const executor = async (i: number) => { - const consumer = new Client(config.POSTGRES_URL); + const appName = pgApplicationName(`queue:${queueName}:${i}`); + const consumer = new Client({ + connectionString: config.POSTGRES_URL, + application_name: appName, + }); await consumer.connect(); + console.log( + "[POSTGRES] reliable queue consumer role=%s queue=%s slot=%d application_name=%s", + config.APP_NAME || "(unset)", + queueName, + i, + appName, + ); while (true) { // If we have a way to measure capacity, throttle the processing speed based on capacity if (getCapacity && i >= (await getCapacity())) { diff --git a/svc/syncSubs.ts b/svc/syncSubs.ts index f8d663d8f..b91826d2a 100644 --- a/svc/syncSubs.ts +++ b/svc/syncSubs.ts @@ -15,14 +15,19 @@ await runInLoop(async function doSyncSubs() { } console.log(result.length, "subs"); const trx = await db.transaction(); - // Delete all status from subscribers - await trx.raw("UPDATE subscriber SET status = NULL"); - for (let sub of result) { - // Mark list of subscribers as active - await trx.raw("UPDATE subscriber SET status = ? WHERE customer_id = ?", [ - sub.status, - sub.customer, - ]); + try { + // Delete all status from subscribers + await trx.raw("UPDATE subscriber SET status = NULL"); + for (let sub of result) { + // Mark list of subscribers as active + await trx.raw("UPDATE subscriber SET status = ? WHERE customer_id = ?", [ + sub.status, + sub.customer, + ]); + } + await trx.commit(); + } catch (e) { + await trx.rollback(); + throw e; } - await trx.commit(); }, 60 * 1000); diff --git a/svc/util/insert.ts b/svc/util/insert.ts index f62e7c1df..99d15c469 100644 --- a/svc/util/insert.ts +++ b/svc/util/insert.ts @@ -98,25 +98,30 @@ export async function insertMatch( let doGcData = false; const trx = await db.transaction(); - await upsertLeagueMatch(trx); - await upsertMatchPostgres(trx, isProTier); - await updateCounts( - trx, - match as ApiData, - average_rank, - num_rank_tier, - isProTier, - ); - await discoverPlayers(trx); - await queueMmr(trx); - await queueGcData(trx); - await queueRate(trx); - await queueScenarios(trx); - await postParsedMatch(trx); - await delInsertQueue(trx); - const parseJob = await queueParse(trx); - await trx.commit(); - return { parseJob, pgroup }; + try { + await upsertLeagueMatch(trx); + await upsertMatchPostgres(trx, isProTier); + await updateCounts( + trx, + match as ApiData, + average_rank, + num_rank_tier, + isProTier, + ); + await discoverPlayers(trx); + await queueMmr(trx); + await queueGcData(trx); + await queueRate(trx); + await queueScenarios(trx); + await postParsedMatch(trx); + await delInsertQueue(trx); + const parseJob = await queueParse(trx); + await trx.commit(); + return { parseJob, pgroup }; + } catch (e) { + await trx.rollback(); + throw e; + } async function upsertLeagueMatch(trx: knex.Knex.Transaction) { // Index the matchid to the league