feat(plugins): add replication plugin for external-to-internal pull#145
Open
MilosM348 wants to merge 1 commit intoouterbase:mainfrom
Open
feat(plugins): add replication plugin for external-to-internal pull#145MilosM348 wants to merge 1 commit intoouterbase:mainfrom
MilosM348 wants to merge 1 commit intoouterbase:mainfrom
Conversation
Closes outerbase#72. Adds a ReplicationPlugin that pulls rows from any existing ExternalDatabaseSource (Postgres, MySQL, D1, Turso, StarbaseDB, Hyperdrive) into the DO's internal SQLite, paged and watermark-driven. Design constraints: 1. Reuse existing primitives only. Imports executeExternalQuery from src/operation.ts for the read leg and uses dataSource.rpc.executeQuery for the write leg - the same path every other plugin uses. No new client libraries, no new SDK surface, no wrangler.toml schema change. 2. Composable scheduling. Ticks are driven by either the existing CronPlugin or the Worker's native scheduled() handler, decided at the user's wiring layer. The plugin owns no scheduler. 3. Per-table cursor. Watermark stored in tmp_replication_state scoped by sourceId and table, advanced per row (not per page) so mid-page failures don't re-fetch on the next tick. 4. One page per call. Bounds runtime under the Workers subrequest budget; callers re-tick when morePagesAvailable is true. 5. Identifier whitelist on every table and column before splicing into SQL. 6. Per-table isolation. A throw on one table is recorded in summary and does not abort replication of the others in the same tick. Covers the three additional-context bullets from outerbase#72: - configurable interval (caller-driven, cron or scheduled) - per-table allowlist via ReplicationTableConfig.tables - last-queried mark via ReplicationTableConfig.cursorColumn Endpoints (admin-only): POST /replicate/run run one tick, return summary GET /replicate/status read current cursors and last-run metadata Diff is +1090/-0 across 5 files: 524 plugin code, 383 tests (14 cases, all passing), 159 README, 23 meta.json, 1 export line in dist/plugins.ts. No edits to src/index.ts, no new dependencies. Co-authored-by: Cursor <cursoragent@cursor.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
/claim #72
Closes #72.
What this PR does
Adds a
ReplicationPluginthat pulls rows from any existingExternalDatabaseSource(Postgres, MySQL, D1, Turso, StarbaseDB-on-StarbaseDB, Hyperdrive) into the DO's internal SQLite, paged and watermark-driven.Maps directly to the issue's three
Additional contextbullets:CronPluginor the Worker's nativescheduled()handler. The plugin owns no scheduler.ReplicationTableConfig[]- explicit, per-table allowlist with optional source/dest renaming and column projection.cursorColumnper table, watermark intmp_replication_state. SQL isSELECT ... WHERE cursorColumn > ? ORDER BY cursorColumn ASC LIMIT pageSize.Why this PR over the existing 18
The existing dogpile has been arguing about audit logs, event-loop yielding, and reflection. This PR ships the smallest correct primitive on top of what's already in the repo:
pg,mysql2,pg-cursor, etc.src/index.tswrangler.tomlschema[replication]blockexecuteExternalQuery)CronPluginorscheduled())The merged Clerk plugin (#91) is a useful precedent: same shape (
plugins/<name>/{index,README,meta.json}, 1-line export indist/plugins.ts, no edits tosrc/index.ts).Architecture, briefly
plugins/replication/ index.ts 524 LOC, single class, no submodules index.test.ts 383 LOC, 14 cases (vitest) README.md meta.json dist/plugins.ts (+1 line: export ReplicationPlugin)Read leg:
executeExternalQuery({ sql, params, dataSource: {...ds, source: 'external', external}, config })- the plugin literally calls the same function the request handler uses for external queries. Every dialect the SDK already supports works automatically; nothing here knows about Postgres or MySQL specifically.Write leg:
dataSource.rpc.executeQuery({ sql: INSERT ... ON CONFLICT(pk) DO UPDATE, params })- same path every other plugin uses for internal SQLite.State: one table,
tmp_replication_state(source_id, table_name, cursor_column, cursor_value, last_run_ts, last_rows_pulled, last_error), scoped bysourceIdso one DO can replicate from multiple sources without colliding.Watermark advance: per row, not per page. A mid-page failure preserves the highest-seen cursor on rows already written, so the next tick resumes after that point and never re-fetches them.
Bounded runtime: one page per table per tick. Returns
morePagesAvailable: booleanso callers can re-tick if needed; never busy-loops the Worker.Configuration (full, copy-pasteable)
``ts
import { ReplicationPlugin } from '@outerbase/starbasedb/plugins'
const replication = new ReplicationPlugin({
sourceId: 'supabase-prod',
pageSize: 500,
source: {
dialect: 'postgresql',
host: 'db.example.com',
port: 5432,
user: 'replicator',
password: env.PG_PASSWORD,
database: 'app',
},
tables: [
{ table: 'users', cursorColumn: 'id' },
{ table: 'orders', cursorColumn: 'updated_at', primaryKey: 'order_id' },
],
})
const plugins = [ /* existing... */, replication ] satisfies StarbasePlugin[]
``
Driven from
scheduled():ts async scheduled(event, env, ctx) { const { dataSource, config } = await buildDataSource(env, ctx) ctx.waitUntil(replication.tick({ dataSource, config })) }Or admin-triggered:
bash curl -X POST https://your-worker/replicate/run \ -H "Authorization: Bearer ADMIN_AUTHORIZATION_TOKEN"Endpoints
/replicate/run/replicate/statusSafety notes
^[A-Za-z_][A-Za-z0-9_]*\$before splicing. The plugin does not parameterise identifiers; it whitelists them. Testrejects malformed cursor column to prevent SQL injection` covers the regression.ReplicationTableResult.error, and does not abort other tables in the same tick.Tests
npx vitest run plugins/replication- 14 passed, 0 failed:register()mounts middleware that createstmp_replication_stateWHEREclause; subsequent ticks bind the prior cursor as a parametermorePagesAvailableflips when the source returns a full pagedataSource.externalwhen no plugin source is configuredOut of scope (deliberate)
CDCplugin already)Happy to add any of the above as follow-ups if you want them in scope.