Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ const defaults = {
POSTGRES_URL: "postgresql://postgres:postgres@localhost/yasp", // connection string for PostgreSQL
READONLY_POSTGRES_URL: "postgresql://readonly:readonly@localhost/yasp", // readonly connection string for PostgreSQL
POSTGRES_MAX_CONNECTIONS: "10", // Number of maximum connections in pool for postgres (per process)
POSTGRES_STATEMENT_TIMEOUT_MS: "", // Knex pool: SET statement_timeout per connection (ms, empty = off)
POSTGRES_LOCK_TIMEOUT_MS: "", // Knex pool: SET lock_timeout per connection (ms, empty = off)
POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT_MS: "", // Knex pool: SET idle_in_transaction_session_timeout (ms, empty = off)
REDIS_URL: "redis://127.0.0.1:6379/0", // connection string for Redis
CASSANDRA_URL: "cassandra://localhost/yasp", // connection string for Cassandra
RETRIEVER_SECRET: "", // string to use as shared secret with retriever/parser
Expand Down
11 changes: 10 additions & 1 deletion svc/api/spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import {
wordcloudCols,
recentMatchesCols,
} from "./playerFields.ts";
import db from "../store/db.ts";
import db, { pgApplicationName } from "../store/db.ts";
import redis, { redisCount, redisCountDistinct } from "../store/redis.ts";
import packageJson from "../../package.json" with { type: "json" };
import generateOperationId from "./generateOperationId.ts";
Expand Down Expand Up @@ -91,15 +91,24 @@ import { PRIORITY } from "../util/priority.ts";
import { getPatchIndex } from "../util/compute.ts";
import { matchupToString } from "../util/matchups.ts";

const explorerPoolAppName = pgApplicationName("explorer");
const pool = new Pool({
connectionString: config.READONLY_POSTGRES_URL,
application_name: explorerPoolAppName,
statement_timeout: 15000,
query_timeout: 15000,
lock_timeout: 15000,
connectionTimeoutMillis: 15000,
max: 5,
});

console.log(
"[POSTGRES] explorer pool application_name=%s max=%s statement_timeout_ms=%s",
explorerPoolAppName,
5,
15000,
);

const parameters = {
...heroParams,
...leagueParams,
Expand Down
55 changes: 30 additions & 25 deletions svc/rater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,32 +74,37 @@ await runInLoop(async function rate() {
const ratingDiff2 = kFactor * (win2 - e2);
// Start transaction
const trx = await db.transaction();
// Rate each player
console.log("match %s, radiant_win: %s", row.match_id, row.radiant_win);
for (let p of gcMatch.players) {
const oldRating = ratingMap.get(p.account_id!) ?? DEFAULT_RATING;
const delta = isRadiant(p) ? ratingDiff1 : ratingDiff2;
// apply delta to each player (all players on a team will have the same rating change)
const newRating = oldRating + delta;
console.log(
"account_id: %s, oldRating: %s, newRating: %s, delta: %s",
p.account_id,
oldRating,
newRating,
delta,
);
// Write ratings back to players
try {
// Rate each player
console.log("match %s, radiant_win: %s", row.match_id, row.radiant_win);
for (let p of gcMatch.players) {
const oldRating = ratingMap.get(p.account_id!) ?? DEFAULT_RATING;
const delta = isRadiant(p) ? ratingDiff1 : ratingDiff2;
// apply delta to each player (all players on a team will have the same rating change)
const newRating = oldRating + delta;
console.log(
"account_id: %s, oldRating: %s, newRating: %s, delta: %s",
p.account_id,
oldRating,
newRating,
delta,
);
// Write ratings back to players
await trx.raw(
`INSERT INTO ${tableName}(account_id, computed_mmr, delta, match_id) VALUES(?, ?, ?, ?) ON CONFLICT(account_id) DO UPDATE SET computed_mmr = EXCLUDED.computed_mmr, delta = EXCLUDED.delta, match_id = EXCLUDED.match_id`,
[p.account_id, newRating, delta, row.match_id],
);
}
// Delete row
await trx.raw(
`INSERT INTO ${tableName}(account_id, computed_mmr, delta, match_id) VALUES(?, ?, ?, ?) ON CONFLICT(account_id) DO UPDATE SET computed_mmr = EXCLUDED.computed_mmr, delta = EXCLUDED.delta, match_id = EXCLUDED.match_id`,
[p.account_id, newRating, delta, row.match_id],
"DELETE FROM rating_queue WHERE match_seq_num = ?",
row.match_seq_num,
);
// Commit transaction
await trx.commit();
redisCount("rater");
} catch (e) {
await trx.rollback();
throw e;
}
// Delete row
await trx.raw(
"DELETE FROM rating_queue WHERE match_seq_num = ?",
row.match_seq_num,
);
// Commit transaction
await trx.commit();
redisCount("rater");
}, 0);
76 changes: 66 additions & 10 deletions svc/store/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,81 @@ import util from "node:util";

// remember: all values returned from the server are either NULL or a string
pg.types.setTypeParser(20, (val) => (val === null ? null : parseInt(val, 10)));

/** pg application_name is limited to 63 bytes; keep ASCII-safe for pg_stat_activity. */
export function pgApplicationName(role: string): string {
const app = config.APP_NAME || "unknown";
const safeApp = app.replace(/[^\w.-]/g, "_").substring(0, 40);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like having this length validation is an overkill (an LLM-ism, I presume)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100%, it is agent-assisted coding. Here it's one of those "solving a problem before it's a problem" things. So it's good, any name will work now, because it normalises the input. But at the same time, it's not really helpful when it wasn't a problem yet. I didn't ask it to solve it, but it's technically good, and once it's produced, we may as well keep it 🤷‍♂️

const safeRole = role.replace(/[^\w.-]/g, "_").substring(0, 20);
const name = `odota:${safeApp}:${safeRole}`;
return name.substring(0, 63);
}

function parsePositiveMsEnv(key: string): string | null {
const raw = (config as Record<string, string | undefined>)[key];
if (raw === undefined || raw === "") {
return null;
}
const n = Number(raw);
if (!Number.isFinite(n) || n < 0) {
return null;
}
return `${Math.floor(n)}ms`;
}

function knexPoolSessionStatements(): string[] {
const stmts: string[] = [];
const st = parsePositiveMsEnv("POSTGRES_STATEMENT_TIMEOUT_MS");
const lt = parsePositiveMsEnv("POSTGRES_LOCK_TIMEOUT_MS");
const idle = parsePositiveMsEnv(
"POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT_MS",
);
if (st) {
stmts.push(`SET statement_timeout TO '${st}'`);
}
if (lt) {
stmts.push(`SET lock_timeout TO '${lt}'`);
}
if (idle) {
stmts.push(`SET idle_in_transaction_session_timeout TO '${idle}'`);
}
return stmts;
}

const knexAppName = pgApplicationName("knex");
const sessionStmts = knexPoolSessionStatements();

const knexTimeoutSummary = sessionStmts.length
? [
`statement=${config.POSTGRES_STATEMENT_TIMEOUT_MS || "0"}ms`,
`lock=${config.POSTGRES_LOCK_TIMEOUT_MS || "0"}ms`,
`idle_in_tx=${config.POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT_MS || "0"}ms`,
].join(" ")
: "off";

console.log(
"[POSTGRES] connecting %s with %s max connections",
config.POSTGRES_URL,
"[POSTGRES] knex role=%s max=%s application_name=%s session_timeouts=%s",
config.APP_NAME || "(unset)",
config.POSTGRES_MAX_CONNECTIONS,
knexAppName,
knexTimeoutSummary,
);

export const db = knex({
client: "pg",
connection: config.POSTGRES_URL,
connection: {
connectionString: config.POSTGRES_URL,
application_name: knexAppName,
},
pool: {
min: 0,
max: Number(config.POSTGRES_MAX_CONNECTIONS),
// afterCreate: (conn, done) => {
// // Set the minimum similarity for pg_trgm
// conn.query('SELECT set_limit(0.6);', (err) => {
// // if err is not falsy, connection is discarded from pool
// done(err, conn);
// });
// },
afterCreate(conn: pg.PoolClient, done: (err?: Error) => void) {
if (!sessionStmts.length) {
return done();
}
conn.query(sessionStmts.join("; "), (err) => done(err));
},
},
});

Expand Down
15 changes: 13 additions & 2 deletions svc/store/queue.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import moment from "moment";
import redis, { redisCount } from "./redis.ts";
import db from "./db.ts";
import db, { pgApplicationName } from "./db.ts";
import config from "../../config.ts";
import { Client } from "pg";
import c from "ansi-colors";
Expand Down Expand Up @@ -55,8 +55,19 @@ export async function runReliableQueue(
getCapacity?: () => Promise<number>,
) {
const executor = async (i: number) => {
const consumer = new Client(config.POSTGRES_URL);
const appName = pgApplicationName(`queue:${queueName}:${i}`);
const consumer = new Client({
connectionString: config.POSTGRES_URL,
application_name: appName,
});
await consumer.connect();
console.log(
"[POSTGRES] reliable queue consumer role=%s queue=%s slot=%d application_name=%s",
config.APP_NAME || "(unset)",
queueName,
i,
appName,
);
while (true) {
// If we have a way to measure capacity, throttle the processing speed based on capacity
if (getCapacity && i >= (await getCapacity())) {
Expand Down
23 changes: 14 additions & 9 deletions svc/syncSubs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@ await runInLoop(async function doSyncSubs() {
}
console.log(result.length, "subs");
const trx = await db.transaction();
// Delete all status from subscribers
await trx.raw("UPDATE subscriber SET status = NULL");
for (let sub of result) {
// Mark list of subscribers as active
await trx.raw("UPDATE subscriber SET status = ? WHERE customer_id = ?", [
sub.status,
sub.customer,
]);
try {
// Delete all status from subscribers
await trx.raw("UPDATE subscriber SET status = NULL");
for (let sub of result) {
// Mark list of subscribers as active
await trx.raw("UPDATE subscriber SET status = ? WHERE customer_id = ?", [
sub.status,
sub.customer,
]);
}
await trx.commit();
} catch (e) {
await trx.rollback();
throw e;
}
await trx.commit();
}, 60 * 1000);
43 changes: 24 additions & 19 deletions svc/util/insert.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,25 +98,30 @@ export async function insertMatch(

let doGcData = false;
const trx = await db.transaction();
await upsertLeagueMatch(trx);
await upsertMatchPostgres(trx, isProTier);
await updateCounts(
trx,
match as ApiData,
average_rank,
num_rank_tier,
isProTier,
);
await discoverPlayers(trx);
await queueMmr(trx);
await queueGcData(trx);
await queueRate(trx);
await queueScenarios(trx);
await postParsedMatch(trx);
await delInsertQueue(trx);
const parseJob = await queueParse(trx);
await trx.commit();
return { parseJob, pgroup };
try {
await upsertLeagueMatch(trx);
await upsertMatchPostgres(trx, isProTier);
await updateCounts(
trx,
match as ApiData,
average_rank,
num_rank_tier,
isProTier,
);
await discoverPlayers(trx);
await queueMmr(trx);
await queueGcData(trx);
await queueRate(trx);
await queueScenarios(trx);
await postParsedMatch(trx);
await delInsertQueue(trx);
const parseJob = await queueParse(trx);
await trx.commit();
return { parseJob, pgroup };
} catch (e) {
await trx.rollback();
throw e;
}

async function upsertLeagueMatch(trx: knex.Knex.Transaction) {
// Index the matchid to the league
Expand Down
Loading