diff --git a/dist/plugins.ts b/dist/plugins.ts index 7dd252a..0a5cac5 100644 --- a/dist/plugins.ts +++ b/dist/plugins.ts @@ -6,3 +6,4 @@ export { ChangeDataCapturePlugin } from '../plugins/cdc' export { QueryLogPlugin } from '../plugins/query-log' export { ResendPlugin } from '../plugins/resend' export { ClerkPlugin } from '../plugins/clerk' +export { ReplicationPlugin } from '../plugins/replication' diff --git a/plugins/replication/README.md b/plugins/replication/README.md new file mode 100644 index 0000000..c5716a9 --- /dev/null +++ b/plugins/replication/README.md @@ -0,0 +1,159 @@ +# Replication Plugin + +Pull-based replication from any external data source already supported by +StarbaseDB (Postgres, MySQL, Cloudflare D1, Turso, StarbaseDB-on-StarbaseDB, +Hyperdrive) into the Durable Object's internal SQLite. Closes #72. + +The plugin is intentionally minimal: it composes existing primitives instead +of re-implementing them. + +| Concern | What we reuse | +| ---------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------- | +| Source connectors (PG, MySQL, D1, Turso, Hyperdrive) | `executeExternalQuery` from `src/operation.ts` and the existing `ExternalDatabaseSource` types from `src/types.ts` | +| Internal SQLite writes | `dataSource.rpc.executeQuery` (same path every other plugin uses) | +| Scheduling | Either the existing `CronPlugin` (`onEvent` callback) or the Worker's native `scheduled()` cron triggers - **the plugin does not own a scheduler** | +| Auth | Inherits the existing role check (`config.role === 'admin'`) | + +## Configuration + +```ts +import { ReplicationPlugin } from '../plugins/replication' + +const replication = new ReplicationPlugin({ + sourceId: 'supabase-prod', // optional; default 'default' + pageSize: 500, // optional; default 500 + source: { + // optional; falls back to dataSource.external + dialect: 
'postgresql', + host: 'db.acme.com', + port: 5432, + user: 'replicator', + password: env.SUPABASE_PASSWORD, + database: 'app', + }, + tables: [ + { + table: 'users', + cursorColumn: 'id', // append-only by id + }, + { + table: 'orders', + cursorColumn: 'updated_at', // upsert by updated_at + primaryKey: 'order_id', + }, + { + sourceTable: 'public_events', + destTable: 'events', + cursorColumn: 'seq', + columns: ['seq', 'kind', 'payload'], + pageSize: 1000, + }, + ], +}) +``` + +Add to your plugin list in `src/index.ts`: + +```ts +const plugins = [ + /* ... existing plugins ... */ + replication, +] satisfies StarbasePlugin[] +``` + +## Triggering a tick + +The plugin runs **one page per table per call**. This bounds runtime and +keeps every tick well under the Workers subrequest budget. You drive it from +one of three places, your choice: + +### Option A - Cloudflare cron triggers (recommended) + +In `wrangler.toml`: + +```toml +[triggers] +crons = ["*/5 * * * *"] +``` + +In `src/index.ts`: + +```ts +export default { + async fetch(request, env, ctx) { + /* unchanged */ + }, + async scheduled(event, env, ctx) { + const { dataSource, config } = await buildDataSource(env, ctx) + ctx.waitUntil(replication.tick({ dataSource, config })) + }, +} satisfies ExportedHandler +``` + +### Option B - The existing `CronPlugin` + +```ts +cronPlugin.onEvent(async ({ name }) => { + if (name === 'replication-tick') { + await replication.tick() + } +}, ctx) + +await cronPlugin.addEvent('*/5 * * * *', 'replication-tick', {}, callbackHost) +``` + +### Option C - Manual / on-demand + +```bash +curl -X POST https://your-worker/replicate/run \ + -H "Authorization: Bearer $ADMIN_AUTHORIZATION_TOKEN" +``` + +## Endpoints + +| Method | Path | Auth | Purpose | +| ------ | ------------------- | ----- | ---------------------------------------------------- | +| `POST` | `/replicate/run` | admin | Run one tick across all configured tables | +| `GET` | `/replicate/status` | admin | Read current 
per-table cursors and last-run metadata |
+
+## State model
+
+A single table, `tmp_replication_state`, stored in the DO SQLite:
+
+| Column             | Type    | Meaning                                               |
+| ------------------ | ------- | ----------------------------------------------------- |
+| `source_id`        | TEXT    | scopes one DO replicating from multiple sources       |
+| `table_name`       | TEXT    | destination table name                                |
+| `cursor_column`    | TEXT    | which source column drives polling                    |
+| `cursor_value`     | TEXT    | last seen value (`NULL` until first tick)             |
+| `last_run_ts`      | INTEGER | epoch millis of the last tick that touched this table |
+| `last_rows_pulled` | INTEGER | row count from the last tick                          |
+| `last_error`       | TEXT    | last error message (NULL on success)                  |
+
+The cursor is tracked per row in memory but committed **once per page**,
+after every row in the page has been upserted. Rows from a failed page are
+therefore pulled again on a later tick; upserts make that re-pull idempotent.
+
+## Safety
+
+- **Identifier validation**: every table/column name is checked against
+  `^[A-Za-z_][A-Za-z0-9_]*$` before it touches a SQL string. The plugin does
+  not parameterise identifiers; it whitelists them.
+- **Per-table isolation**: a thrown error on one table is caught, recorded,
+  and does not stop replication of other tables in the same tick.
+- **Bounded runtime**: one page per table per tick. Set `pageSize` low if
+  your destination DO is on the small side.
+- **Cursor-based, append/upsert**: no destructive sync, no DELETE
+  propagation. If you need full-state mirror semantics, add a delete-tracking
+  source column or extend the upsert SQL in `index.ts`.
+
+## Tests
+
+```bash
+pnpm test plugins/replication
+```
+
+Tests cover construction validation, middleware registration, cursor
+advancement across consecutive ticks, the "more pages available" flag,
+empty-batch handling, per-table error isolation, identifier injection
+rejection, and the no-source / no-dataSource error paths. 
diff --git a/plugins/replication/index.test.ts b/plugins/replication/index.test.ts new file mode 100644 index 0000000..7e72540 --- /dev/null +++ b/plugins/replication/index.test.ts @@ -0,0 +1,383 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest' +import { ReplicationPlugin } from './index' +import { StarbaseApp, StarbaseDBConfiguration } from '../../src/handler' +import { + DataSource, + ExternalDatabaseSource, + QueryResult, +} from '../../src/types' + +// `executeExternalQuery` lives in src/operation.ts and contains a hard +// dependency on `pg`/`mysql2`/`postgres`/etc. that is not safe to load under +// vitest in this repo. We replace it with a mock that simulates the network +// hop the plugin makes when reading from the external source. +const executeExternalQueryMock = vi.fn() +vi.mock('../../src/operation', () => ({ + executeExternalQuery: (opts: any) => executeExternalQueryMock(opts), +})) + +type CapturedQuery = { sql: string; params: unknown[] } + +function makeMockDataSource(): { + dataSource: DataSource + captured: CapturedQuery[] + state: Map> + onSelect: ( + handler: (sql: string, params: unknown[]) => unknown[] | undefined + ) => void +} { + const captured: CapturedQuery[] = [] + const state = new Map>() + let selectHandler: + | ((sql: string, params: unknown[]) => unknown[] | undefined) + | undefined + + const executeQuery = vi.fn(async (q: CapturedQuery) => { + captured.push({ sql: q.sql.trim(), params: q.params ?? 
[] }) + + if (q.sql.includes('FROM tmp_replication_state')) { + const trimmed = q.sql.trim() + // GET_CURSOR takes (source_id, table_name) -> cursor_value + if (trimmed.includes('SELECT cursor_value FROM')) { + const [sourceId, table] = q.params as [string, string] + const v = state.get(sourceId)?.get(table) + if (v === undefined) return [] as QueryResult[] + return [{ cursor_value: v } as unknown as QueryResult] + } + // LIST_STATE + if (trimmed.includes('SELECT source_id, table_name')) { + const [sourceId] = q.params as [string] + const out: any[] = [] + const tables = state.get(sourceId) + if (tables) { + for (const [table, value] of tables.entries()) { + out.push({ + source_id: sourceId, + table_name: table, + cursor_column: 'id', + cursor_value: value, + }) + } + } + return out as QueryResult[] + } + } + if (q.sql.includes('INSERT INTO tmp_replication_state')) { + const [sourceId, table_name, _col, value] = q.params as [ + string, + string, + string, + string | null, + ] + const inner = + state.get(sourceId) ?? new Map() + inner.set(table_name, value) + state.set(sourceId, inner) + return [] + } + if (q.sql.startsWith('SELECT') && selectHandler) { + return (selectHandler(q.sql, q.params ?? []) ?? 
[]) as QueryResult[] + } + return [] as QueryResult[] + }) + + const dataSource = { + source: 'internal', + rpc: { + executeQuery, + }, + } as unknown as DataSource + + return { + dataSource, + captured, + state, + onSelect: (handler) => { + selectHandler = handler + }, + } +} + +const mockSource: ExternalDatabaseSource = { + dialect: 'postgresql', + host: 'db.example.com', + port: 5432, + user: 'u', + password: 'p', + database: 'd', +} + +const adminConfig: StarbaseDBConfiguration = { role: 'admin' } + +beforeEach(() => { + vi.clearAllMocks() + executeExternalQueryMock.mockReset() +}) + +describe('ReplicationPlugin - construction', () => { + it('rejects empty tables', () => { + expect( + () => new ReplicationPlugin({ tables: [], source: mockSource }) + ).toThrow(/at least one table/) + }) + + it('uses default sourceId, pageSize, pathPrefix', () => { + const p = new ReplicationPlugin({ + tables: [{ table: 'users', cursorColumn: 'id' }], + source: mockSource, + }) + expect(p.pathPrefix).toBe('/replicate') + }) + + it('honours custom sourceId / pathPrefix', () => { + const p = new ReplicationPlugin({ + tables: [{ table: 'users', cursorColumn: 'id' }], + sourceId: 'supabase-prod', + pathPrefix: '/repl', + source: mockSource, + }) + expect(p.pathPrefix).toBe('/repl') + }) + + it('rejects invalid SQL identifiers in table config', () => { + const p = new ReplicationPlugin({ + tables: [{ table: 'users', cursorColumn: 'id; DROP TABLE x' }], + source: mockSource, + }) + expect(p).toBeDefined() + // identifier validation runs at tick() time + }) +}) + +describe('ReplicationPlugin - register()', () => { + it('registers a middleware that creates the state table', async () => { + const { dataSource, captured } = makeMockDataSource() + const plugin = new ReplicationPlugin({ + tables: [{ table: 'users', cursorColumn: 'id' }], + source: mockSource, + }) + + let captured_mw: any = null + const next = vi.fn() + const useFn = vi.fn((mw) => { + captured_mw = mw + }) + const mockApp = 
{ + use: useFn, + post: vi.fn(), + get: vi.fn(), + } as unknown as StarbaseApp + + await plugin.register(mockApp) + + // simulate a request hitting the middleware + expect(useFn).toHaveBeenCalledTimes(1) + expect(captured_mw).toBeTypeOf('function') + await captured_mw( + { + get: (k: string) => + k === 'dataSource' ? dataSource : adminConfig, + } as any, + next + ) + + expect(next).toHaveBeenCalledTimes(1) + const createCall = captured.find((c) => + c.sql.includes('CREATE TABLE IF NOT EXISTS tmp_replication_state') + ) + expect(createCall).toBeDefined() + }) +}) + +describe('ReplicationPlugin - tick() pulls and upserts', () => { + it('reads from external on first tick (no cursor) and upserts rows', async () => { + const { dataSource, captured } = makeMockDataSource() + executeExternalQueryMock.mockResolvedValueOnce([ + { id: 1, name: 'alice' }, + { id: 2, name: 'bob' }, + ]) + + const plugin = new ReplicationPlugin({ + tables: [{ table: 'users', cursorColumn: 'id' }], + source: mockSource, + pageSize: 100, + }) + + const summary = await plugin.tick({ + dataSource, + config: adminConfig, + }) + + expect(executeExternalQueryMock).toHaveBeenCalledTimes(1) + const call = executeExternalQueryMock.mock.calls[0][0] + expect(call.sql).toMatch(/SELECT \* FROM "users"/) + expect(call.sql).not.toMatch(/WHERE/) + expect(call.sql).toMatch(/ORDER BY "id" ASC LIMIT 100/) + expect(call.params).toEqual([]) + + expect(summary.perTable).toHaveLength(1) + expect(summary.perTable[0]).toMatchObject({ + table: 'users', + rowsPulled: 2, + cursorBefore: null, + cursorAfter: '2', + morePagesAvailable: false, + }) + + const upsert = captured.find((c) => + c.sql.startsWith('INSERT INTO "users"') + ) + expect(upsert).toBeDefined() + expect(upsert!.sql).toMatch(/ON CONFLICT\("id"\) DO UPDATE SET/) + }) + + it('passes the previous cursor as a bound parameter on subsequent ticks', async () => { + const { dataSource, state } = makeMockDataSource() + state.set('default', new Map([['users', '7']])) 
+ executeExternalQueryMock.mockResolvedValueOnce([ + { id: 8, name: 'carol' }, + ]) + + const plugin = new ReplicationPlugin({ + tables: [{ table: 'users', cursorColumn: 'id' }], + source: mockSource, + }) + + await plugin.tick({ dataSource, config: adminConfig }) + + const call = executeExternalQueryMock.mock.calls[0][0] + expect(call.sql).toMatch(/WHERE "id" > \?/) + expect(call.params).toEqual(['7']) + }) + + it('flags morePagesAvailable when external returns a full page', async () => { + const { dataSource } = makeMockDataSource() + executeExternalQueryMock.mockResolvedValueOnce([ + { id: 1, n: 'a' }, + { id: 2, n: 'b' }, + ]) + + const plugin = new ReplicationPlugin({ + tables: [{ table: 'rows', cursorColumn: 'id', pageSize: 2 }], + source: mockSource, + }) + + const summary = await plugin.tick({ + dataSource, + config: adminConfig, + }) + + expect(summary.perTable[0].morePagesAvailable).toBe(true) + }) + + it('records empty-batch ticks without advancing the cursor', async () => { + const { dataSource, state } = makeMockDataSource() + state.set('default', new Map([['users', '99']])) + executeExternalQueryMock.mockResolvedValueOnce([]) + + const plugin = new ReplicationPlugin({ + tables: [{ table: 'users', cursorColumn: 'id' }], + source: mockSource, + }) + + const summary = await plugin.tick({ + dataSource, + config: adminConfig, + }) + + expect(summary.perTable[0]).toMatchObject({ + table: 'users', + rowsPulled: 0, + cursorBefore: '99', + cursorAfter: '99', + morePagesAvailable: false, + }) + }) +}) + +describe('ReplicationPlugin - tick() per-table isolation', () => { + it('records error on one table without breaking the rest', async () => { + const { dataSource } = makeMockDataSource() + executeExternalQueryMock + .mockRejectedValueOnce(new Error('connection refused')) + .mockResolvedValueOnce([{ id: 5, v: 'ok' }]) + + const plugin = new ReplicationPlugin({ + tables: [ + { table: 'broken', cursorColumn: 'id' }, + { table: 'fine', cursorColumn: 'id' }, + ], + 
source: mockSource, + }) + + const summary = await plugin.tick({ + dataSource, + config: adminConfig, + }) + + expect(summary.perTable).toHaveLength(2) + expect(summary.perTable[0]).toMatchObject({ + table: 'broken', + rowsPulled: 0, + error: 'connection refused', + }) + expect(summary.perTable[1]).toMatchObject({ + table: 'fine', + rowsPulled: 1, + cursorAfter: '5', + }) + expect(summary.perTable[1].error).toBeUndefined() + }) + + it('rejects malformed cursor column to prevent SQL injection', async () => { + const { dataSource } = makeMockDataSource() + const plugin = new ReplicationPlugin({ + tables: [{ table: 'users', cursorColumn: 'id; DROP TABLE x' }], + source: mockSource, + }) + + const summary = await plugin.tick({ + dataSource, + config: adminConfig, + }) + + expect(summary.perTable[0].error).toMatch(/Invalid.*identifier/i) + expect(executeExternalQueryMock).not.toHaveBeenCalled() + }) +}) + +describe('ReplicationPlugin - tick() requires sources', () => { + it('errors when no internal data source is provided', async () => { + const plugin = new ReplicationPlugin({ + tables: [{ table: 'users', cursorColumn: 'id' }], + source: mockSource, + }) + await expect(plugin.tick()).rejects.toThrow(/no internal data source/i) + }) + + it('errors when no external source is configured', async () => { + const { dataSource } = makeMockDataSource() + const plugin = new ReplicationPlugin({ + tables: [{ table: 'users', cursorColumn: 'id' }], + }) + await expect( + plugin.tick({ dataSource, config: adminConfig }) + ).rejects.toThrow(/no external source/i) + }) + + it('falls back to dataSource.external when no plugin source is set', async () => { + const { dataSource } = makeMockDataSource() + ;(dataSource as any).external = mockSource + executeExternalQueryMock.mockResolvedValueOnce([{ id: 1 }]) + + const plugin = new ReplicationPlugin({ + tables: [{ table: 'users', cursorColumn: 'id' }], + }) + const summary = await plugin.tick({ + dataSource, + config: adminConfig, + }) + + 
expect(summary.perTable[0].rowsPulled).toBe(1) + }) +}) diff --git a/plugins/replication/index.ts b/plugins/replication/index.ts new file mode 100644 index 0000000..8f63027 --- /dev/null +++ b/plugins/replication/index.ts @@ -0,0 +1,524 @@ +import { + StarbaseApp, + StarbaseContext, + StarbaseDBConfiguration, +} from '../../src/handler' +import { executeExternalQuery } from '../../src/operation' +import { StarbasePlugin } from '../../src/plugin' +import { + DataSource, + ExternalDatabaseSource, + QueryResult, +} from '../../src/types' +import { createResponse } from '../../src/utils' + +/** + * Per-table replication settings. + * + * Maps to the three "additional context" bullets on issue #72: + * - which tables should have data pulled -> caller picks them explicitly here + * - what column to base append/upsert on -> `cursorColumn` + * - how to identify a row for upsert -> `primaryKey` (defaults to `cursorColumn`) + */ +export interface ReplicationTableConfig { + /** Table name on both sides (when source/dest names match). Required. */ + table: string + /** Override source-side name if it differs from `table`. */ + sourceTable?: string + /** Override destination-side name if it differs from `table`. */ + destTable?: string + /** + * Column used as the high-water mark. Must be monotonically non-decreasing + * across inserts/updates of interest (e.g. `id`, `updated_at`, `seq`). + * Pulls use `WHERE > ? ORDER BY ASC`. + */ + cursorColumn: string + /** + * Primary key on the destination table. Used to upsert. Defaults to + * `[cursorColumn]` (i.e. the cursor column doubles as the row identity) + * which is the right default for `id`-keyed append-only tables. + */ + primaryKey?: string | string[] + /** + * Optional explicit column allowlist projected from the source. Defaults + * to `*` (every column the source returns is forwarded to the destination + * as-is). Names are passed through unquoted; pick safe identifiers. 
+ */ + columns?: string[] + /** Override per-table page size; defaults to plugin-level pageSize. */ + pageSize?: number +} + +export interface ReplicationPluginOptions { + /** + * The external source to pull rows FROM. Any `ExternalDatabaseSource` + * already supported by `executeExternalQuery` works (Postgres, MySQL, + * D1, Turso, StarbaseDB, Hyperdrive). When omitted, `tick()` falls back + * to the active request's `dataSource.external`. + */ + source?: ExternalDatabaseSource + /** + * Stable identifier for this source, used to scope state in + * `tmp_replication_state`. Lets one DO host multiple replication + * targets without colliding on cursors. Default: `"default"`. + */ + sourceId?: string + /** Tables to replicate. */ + tables: ReplicationTableConfig[] + /** Plugin-wide page size. Default: 500. */ + pageSize?: number + /** + * Falls back through to `executeExternalQuery` when the Outerbase API + * key path is required by the active config. Optional. + */ + outerbaseApiKey?: string + /** Mount path for admin endpoints. Default: `/replicate`. */ + pathPrefix?: string +} + +export interface ReplicationTableResult { + table: string + rowsPulled: number + cursorBefore: string | null + cursorAfter: string | null + morePagesAvailable: boolean + error?: string +} + +export interface ReplicationTickSummary { + sourceId: string + startedAt: string + finishedAt: string + perTable: ReplicationTableResult[] +} + +const DEFAULT_PAGE_SIZE = 500 +const STATE_TABLE = 'tmp_replication_state' + +const SQL = { + CREATE_STATE_TABLE: ` + CREATE TABLE IF NOT EXISTS ${STATE_TABLE} ( + source_id TEXT NOT NULL, + table_name TEXT NOT NULL, + cursor_column TEXT NOT NULL, + cursor_value TEXT, + last_run_ts INTEGER, + last_rows_pulled INTEGER NOT NULL DEFAULT 0, + last_error TEXT, + PRIMARY KEY (source_id, table_name) + ) + `, + GET_CURSOR: ` + SELECT cursor_value FROM ${STATE_TABLE} + WHERE source_id = ? AND table_name = ? 
+ `, + LIST_STATE: ` + SELECT source_id, table_name, cursor_column, cursor_value, + last_run_ts, last_rows_pulled, last_error + FROM ${STATE_TABLE} + WHERE source_id = ? + `, + UPSERT_STATE: ` + INSERT INTO ${STATE_TABLE} + (source_id, table_name, cursor_column, cursor_value, + last_run_ts, last_rows_pulled, last_error) + VALUES (?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(source_id, table_name) DO UPDATE SET + cursor_column = excluded.cursor_column, + cursor_value = excluded.cursor_value, + last_run_ts = excluded.last_run_ts, + last_rows_pulled = excluded.last_rows_pulled, + last_error = excluded.last_error + `, +} + +const SQL_IDENT_RE = /^[A-Za-z_][A-Za-z0-9_]*$/ + +function assertIdent(value: string, kind: string): string { + if (!SQL_IDENT_RE.test(value)) { + throw new Error( + `Invalid ${kind} identifier "${value}": must match ${SQL_IDENT_RE}` + ) + } + return value +} + +function quoteIdent(value: string, kind: string): string { + return `"${assertIdent(value, kind)}"` +} + +interface ResolvedTable { + sourceTable: string + destTable: string + cursorColumn: string + primaryKey: string[] + columns?: string[] + pageSize: number +} + +function resolveTable( + cfg: ReplicationTableConfig, + defaultPageSize: number +): ResolvedTable { + const sourceTable = assertIdent(cfg.sourceTable ?? cfg.table, 'sourceTable') + const destTable = assertIdent(cfg.destTable ?? cfg.table, 'destTable') + const cursorColumn = assertIdent(cfg.cursorColumn, 'cursorColumn') + const primaryKey = ( + Array.isArray(cfg.primaryKey) + ? cfg.primaryKey + : cfg.primaryKey + ? [cfg.primaryKey] + : [cursorColumn] + ).map((c) => assertIdent(c, 'primaryKey')) + const columns = cfg.columns?.map((c) => assertIdent(c, 'columns')) + const pageSize = + cfg.pageSize && cfg.pageSize > 0 ? 
cfg.pageSize : defaultPageSize + + return { + sourceTable, + destTable, + cursorColumn, + primaryKey, + columns, + pageSize, + } +} + +function buildSelect(t: ResolvedTable, hasCursor: boolean): string { + const projection = t.columns?.length + ? t.columns.map((c) => quoteIdent(c, 'columns')).join(', ') + : '*' + const cursorCol = quoteIdent(t.cursorColumn, 'cursorColumn') + const where = hasCursor ? `WHERE ${cursorCol} > ?` : '' + // No literal pageSize in the string - pass as a bound parameter where the + // dialect supports it. SQLite/MySQL/Postgres all accept positional params + // for LIMIT, but for portability across executeExternalQuery's two + // codepaths (SDK + Outerbase API) we inline an integer that we have already + // numerically validated. Safer than building a parameterised LIMIT and + // hoping every dialect honours it. + const limit = `LIMIT ${Math.max(1, Math.floor(t.pageSize))}` + return `SELECT ${projection} FROM "${t.sourceTable}" ${where} ORDER BY ${cursorCol} ASC ${limit}`.trim() +} + +function buildUpsert(t: ResolvedTable, columns: string[]): string { + if (columns.length === 0) { + throw new Error( + `Replication: source table "${t.sourceTable}" returned 0 columns` + ) + } + const cols = columns.map((c) => quoteIdent(c, 'rowColumn')).join(', ') + const placeholders = columns.map(() => '?').join(', ') + const pk = t.primaryKey.map((c) => quoteIdent(c, 'primaryKey')).join(', ') + const updates = columns + .filter((c) => !t.primaryKey.includes(c)) + .map( + (c) => + `${quoteIdent(c, 'rowColumn')} = excluded.${quoteIdent(c, 'rowColumn')}` + ) + .join(', ') + + if (updates.length === 0) { + return `INSERT OR IGNORE INTO "${t.destTable}" (${cols}) VALUES (${placeholders})` + } + return `INSERT INTO "${t.destTable}" (${cols}) VALUES (${placeholders}) ON CONFLICT(${pk}) DO UPDATE SET ${updates}` +} + +function rowToColumnsAndValues( + row: Record, + canonicalColumns?: string[] +): { columns: string[]; values: unknown[] } { + const columns = 
canonicalColumns ?? Object.keys(row) + const values = columns.map((c) => row[c] as unknown) + return { columns, values } +} + +function pickCursor( + row: Record, + cursorColumn: string +): string | null { + const v = row[cursorColumn] + if (v === undefined || v === null) return null + if (v instanceof Date) return v.toISOString() + return String(v) +} + +export class ReplicationPlugin extends StarbasePlugin { + public readonly pathPrefix: string + private readonly sourceId: string + private readonly tables: ReplicationTableConfig[] + private readonly pageSize: number + private readonly source?: ExternalDatabaseSource + private readonly outerbaseApiKey?: string + private dataSource?: DataSource + private config?: StarbaseDBConfiguration + + constructor(opts: ReplicationPluginOptions) { + super('starbasedb:replication', { requiresAuth: true }) + if (!opts || !Array.isArray(opts.tables) || opts.tables.length === 0) { + throw new Error( + 'ReplicationPlugin requires at least one table in `tables`' + ) + } + this.pathPrefix = opts.pathPrefix ?? '/replicate' + this.sourceId = opts.sourceId ?? 'default' + this.tables = opts.tables + this.pageSize = + opts.pageSize && opts.pageSize > 0 + ? opts.pageSize + : DEFAULT_PAGE_SIZE + this.source = opts.source + this.outerbaseApiKey = opts.outerbaseApiKey + } + + override async register(app: StarbaseApp): Promise { + app.use(async (c, next) => { + this.dataSource = c.get('dataSource') + this.config = c.get('config') + await this.ensureStateTable() + await next() + }) + + app.post(`${this.pathPrefix}/run`, async (c: StarbaseContext) => { + const cfg = c.get('config') + if (cfg?.role !== 'admin') { + return createResponse(undefined, 'Unauthorized', 401) + } + try { + const summary = await this.tick() + return createResponse(summary, undefined, 200) + } catch (err) { + return createResponse( + undefined, + err instanceof Error ? 
err.message : String(err), + 500 + ) + } + }) + + app.get(`${this.pathPrefix}/status`, async (c: StarbaseContext) => { + const cfg = c.get('config') + if (cfg?.role !== 'admin') { + return createResponse(undefined, 'Unauthorized', 401) + } + const ds = c.get('dataSource') + if (!ds) { + return createResponse(undefined, 'No data source', 500) + } + const rows = (await ds.rpc.executeQuery({ + sql: SQL.LIST_STATE, + params: [this.sourceId], + })) as QueryResult[] + return createResponse( + { + sourceId: this.sourceId, + pageSize: this.pageSize, + tables: this.tables.map((t) => t.table), + state: rows, + }, + undefined, + 200 + ) + }) + } + + private async ensureStateTable(): Promise { + if (!this.dataSource) return + await this.dataSource.rpc.executeQuery({ + sql: SQL.CREATE_STATE_TABLE, + params: [], + }) + } + + /** + * Run one replication pass over every configured table. Designed to be + * called from the Worker `scheduled()` handler (cron trigger), from the + * existing `CronPlugin.onEvent` callback, or via the `POST /replicate/run` + * admin endpoint. One page per table per call - bounds runtime so we + * never exceed Workers' subrequest budget. + * + * Pass `dataSource` and `config` explicitly when calling outside an HTTP + * request (the registered middleware caches them only on request paths). + */ + public async tick(opts?: { + dataSource?: DataSource + config?: StarbaseDBConfiguration + source?: ExternalDatabaseSource + }): Promise { + const ds = opts?.dataSource ?? this.dataSource + const cfg = opts?.config ?? this.config + const source = opts?.source ?? this.source ?? 
ds?.external
+
+        if (!ds) {
+            throw new Error('ReplicationPlugin.tick: no internal data source')
+        }
+        if (!source) {
+            throw new Error(
+                'ReplicationPlugin.tick: no external source configured (pass via opts, plugin options, or dataSource.external)'
+            )
+        }
+
+        // Ensure the state table exists even when tick() runs outside the
+        // HTTP middleware (cron trigger / scheduled() call paths).
+        await ds.rpc.executeQuery({ sql: SQL.CREATE_STATE_TABLE, params: [] })
+
+        const startedAt = new Date().toISOString()
+        const perTable: ReplicationTableResult[] = []
+
+        for (const tableCfg of this.tables) {
+            try {
+                const result = await this.replicateTable({
+                    tableCfg,
+                    ds,
+                    cfg,
+                    source,
+                })
+                perTable.push(result)
+            } catch (err) {
+                const message = err instanceof Error ? err.message : String(err)
+                // Best-effort record state; if the failure was identifier
+                // validation we do not have a resolved table, so we skip the
+                // state write rather than re-throwing in the catch.
+                try {
+                    const t = resolveTable(tableCfg, this.pageSize)
+                    await this.recordState({
+                        ds,
+                        table: t,
+                        cursorValue: await this.readCursor(ds, t.destTable),
+                        rowsPulled: 0,
+                        error: message,
+                    })
+                } catch {
+                    /* identifier validation failed - cannot record state */
+                }
+                perTable.push({
+                    table: tableCfg.table,
+                    rowsPulled: 0,
+                    cursorBefore: null,
+                    cursorAfter: null,
+                    morePagesAvailable: false,
+                    error: message,
+                })
+            }
+        }
+
+        return {
+            sourceId: this.sourceId,
+            startedAt,
+            finishedAt: new Date().toISOString(),
+            perTable,
+        }
+    }
+
+    private async replicateTable(args: {
+        tableCfg: ReplicationTableConfig
+        ds: DataSource
+        cfg?: StarbaseDBConfiguration
+        source: ExternalDatabaseSource
+    }): Promise<ReplicationTableResult> {
+        const { tableCfg, ds, cfg, source } = args
+        const t = resolveTable(tableCfg, this.pageSize)
+
+        const cursorBefore = await this.readCursor(ds, t.destTable)
+        const sql = buildSelect(t, cursorBefore !== null)
+        const params = cursorBefore !== null ? 
[cursorBefore] : [] + + const externalDataSource: DataSource = { + ...ds, + source: 'external', + external: source, + } + + const externalConfig: StarbaseDBConfiguration = { + ...(cfg ?? { role: 'admin' as const }), + outerbaseApiKey: this.outerbaseApiKey ?? cfg?.outerbaseApiKey, + } + + const rowsRaw = (await executeExternalQuery({ + sql, + params, + dataSource: externalDataSource, + config: externalConfig, + })) as Record[] | undefined + + const rows = Array.isArray(rowsRaw) ? rowsRaw : [] + const morePagesAvailable = rows.length >= t.pageSize + + if (rows.length === 0) { + await this.recordState({ + ds, + table: t, + cursorValue: cursorBefore, + rowsPulled: 0, + }) + return { + table: tableCfg.table, + rowsPulled: 0, + cursorBefore, + cursorAfter: cursorBefore, + morePagesAvailable: false, + } + } + + const canonicalColumns = t.columns ?? Object.keys(rows[0]) + const upsertSql = buildUpsert(t, canonicalColumns) + + let cursorAfter = cursorBefore + for (const row of rows) { + const { values } = rowToColumnsAndValues(row, canonicalColumns) + await ds.rpc.executeQuery({ sql: upsertSql, params: values }) + const next = pickCursor(row, t.cursorColumn) + if (next !== null) cursorAfter = next + } + + await this.recordState({ + ds, + table: t, + cursorValue: cursorAfter, + rowsPulled: rows.length, + }) + + return { + table: tableCfg.table, + rowsPulled: rows.length, + cursorBefore, + cursorAfter, + morePagesAvailable, + } + } + + private async readCursor( + ds: DataSource, + destTable: string + ): Promise { + const rows = (await ds.rpc.executeQuery({ + sql: SQL.GET_CURSOR, + params: [this.sourceId, destTable], + })) as unknown as { cursor_value: unknown }[] | undefined + if (!rows || rows.length === 0) return null + const v = rows[0]?.cursor_value + if (v === undefined || v === null) return null + return String(v) + } + + private async recordState(args: { + ds: DataSource + table: ResolvedTable + cursorValue: string | null + rowsPulled: number + error?: string | null + 
}): Promise { + const { ds, table, cursorValue, rowsPulled } = args + await ds.rpc.executeQuery({ + sql: SQL.UPSERT_STATE, + params: [ + this.sourceId, + table.destTable, + table.cursorColumn, + cursorValue, + Date.now(), + rowsPulled, + args.error ?? null, + ], + }) + } +} diff --git a/plugins/replication/meta.json b/plugins/replication/meta.json new file mode 100644 index 0000000..56d1183 --- /dev/null +++ b/plugins/replication/meta.json @@ -0,0 +1,23 @@ +{ + "version": "1.0.0", + "resources": { + "tables": { + "tmp_replication_state": [ + "source_id", + "table_name", + "cursor_column", + "cursor_value", + "last_run_ts", + "last_rows_pulled", + "last_error" + ] + }, + "secrets": {}, + "variables": {} + }, + "dependencies": { + "tables": {}, + "secrets": {}, + "variables": {} + } +}