From 990468dcd2d8c95319d48b4604e4083d82bf45d1 Mon Sep 17 00:00:00 2001 From: Andrei Lepikhov Date: Fri, 15 May 2026 14:05:34 +0200 Subject: [PATCH 1/6] fix: mtree init publication ordering + exclude metadata from spock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes 'publication "ace_mtree_pub" does not exist' (SQLSTATE 42704) during 'ace mtree table-diff' on Spock-enabled clusters. Two root causes addressed: 1. Init order: the replication slot was being created before the publication was committed to WAL. The slot's consistent point therefore preceded the publication, and pgoutput's get_publication_oid failed during change-callback replay. MtreeInit is now split into three phases per node: Phase A commits schema + metadata table + publication and captures the publication commit LSN; Phase B creates the slot (its consistent point is now strictly after the publication); Phase C persists slot/start_lsn/pub_commit_lsn. 2. Spock replication of ace_cdc_metadata: under DDL replication the metadata table was auto-added to Spock's default repset, so each node's node-local start_lsn leaked across the cluster and overwrote peer LSNs. ExcludeMetadataFromSpockRepsets enumerates spock.tables and removes the metadata table from every repset it landed in; all metadata writes (init, periodic flush, on-shutdown flush, final update) now wrap their write in spock.repair_mode to defend against the per-node-init race where a peer's repset still contains the table. spock.tables surfaces every Spock-known relation with NULL set_name for tables not in any repset, so the enumeration filters AND set_name IS NOT NULL. CAUTION applied at every SetSpockRepairMode site: repair_mode is session-scoped, not transaction-scoped — SQL ROLLBACK does NOT reset it. Callers must reach Set(false) on every code path or the pooled connection returns poisoned. A new pub_commit_lsn column is added to ace_cdc_metadata (additive, ALTER TABLE IF NOT EXISTS). 
processReplicationStream refuses to open a stream whose start_lsn is older than pub_commit_lsn, returning an actionable error instead of silently rewinding to a pre-publication LSN. Empty pub_commit_lsn (legacy metadata rows) skips the check with a warning. Existing broken installs are not auto-repaired; users must run 'ace mtree teardown' + 'ace mtree init' + 'ace mtree build' after upgrade. '--skip-cdc' remains a valid interim workaround for pre-upgrade clusters. Co-Authored-By: Claude Opus 4.7 (1M context) --- db/queries/queries.go | 102 ++++++++++++++++++-- db/queries/templates.go | 88 ++++++++++++++++- internal/consistency/mtree/merkle.go | 116 +++++++++++++++++------ internal/infra/cdc/listen.go | 82 +++++++++++----- internal/infra/cdc/setup.go | 60 ++++++++++++ tests/integration/cdc_busy_table_test.go | 2 +- 6 files changed, 388 insertions(+), 62 deletions(-) diff --git a/db/queries/queries.go b/db/queries/queries.go index 8dfe37c..aed7cab 100644 --- a/db/queries/queries.go +++ b/db/queries/queries.go @@ -2913,18 +2913,106 @@ func DropCDCMetadataTable(ctx context.Context, db DBQuerier) error { return nil } -func GetCDCMetadata(ctx context.Context, db DBQuerier, publicationName string) (string, string, []string, error) { +// pubCommitLSN is empty for legacy metadata rows that pre-date the +// pub_commit_lsn column; callers must treat empty as "invariant +// uncheckable" and skip the publication-commit guard with a warning. 
+func GetCDCMetadata(ctx context.Context, db DBQuerier, publicationName string) (slotName, startLSN string, tables []string, pubCommitLSN string, err error) { sql, err := RenderSQL(SQLTemplates.GetCDCMetadata, nil) if err != nil { - return "", "", nil, err + return "", "", nil, "", err } - var slotName, startLSN string - var tables []string - err = db.QueryRow(ctx, sql, publicationName).Scan(&slotName, &startLSN, &tables) + err = db.QueryRow(ctx, sql, publicationName).Scan(&slotName, &startLSN, &tables, &pubCommitLSN) + if err != nil { + return "", "", nil, "", err + } + return slotName, startLSN, tables, pubCommitLSN, nil +} + +// Init path only. Ongoing flushes use UpdateCDCMetadata, which deliberately +// leaves pub_commit_lsn untouched so the listen.go guard always compares +// against the LSN captured at the matching init. +func InitCDCMetadata(ctx context.Context, db DBQuerier, publicationName, slotName, startLSN, pubCommitLSN string, tables []string) error { + sql, err := RenderSQL(SQLTemplates.InitCDCMetadata, nil) + if err != nil { + return fmt.Errorf("failed to render InitCDCMetadata SQL: %w", err) + } + if tables == nil { + tables = []string{} + } + _, err = db.Exec(ctx, sql, publicationName, slotName, startLSN, pubCommitLSN, tables) + if err != nil { + return fmt.Errorf("query to init cdc metadata failed: %w", err) + } + return nil +} + +// Called mid-Phase-A, after CREATE PUBLICATION, so the captured value is +// strictly less than Phase A's commit LSN. The slot created in Phase B +// has consistent_point >= that commit LSN, hence consistent_point > +// captured value: a safe lower bound for any valid replication start LSN. 
+func CurrentWalInsertLSN(ctx context.Context, db DBQuerier) (string, error) { + sql, err := RenderSQL(SQLTemplates.CurrentWalInsertLSN, nil) + if err != nil { + return "", fmt.Errorf("failed to render CurrentWalInsertLSN SQL: %w", err) + } + var lsn string + if err := db.QueryRow(ctx, sql).Scan(&lsn); err != nil { + return "", fmt.Errorf("failed to fetch current WAL insert LSN: %w", err) + } + return lsn, nil +} + +// Caller must have verified Spock is installed; this function does not +// gate on extension presence and will error otherwise. +func EnumerateMetadataRepsets(ctx context.Context, db DBQuerier, aceSchema string) ([]string, error) { + sql, err := RenderSQL(SQLTemplates.EnumerateMetadataRepsets, nil) if err != nil { - return "", "", nil, err + return nil, fmt.Errorf("failed to render EnumerateMetadataRepsets SQL: %w", err) } - return slotName, startLSN, tables, nil + rows, err := db.Query(ctx, sql, aceSchema) + if err != nil { + return nil, fmt.Errorf("query to enumerate metadata repsets failed: %w", err) + } + defer rows.Close() + var sets []string + for rows.Next() { + var s string + if err := rows.Scan(&s); err != nil { + return nil, fmt.Errorf("scan repset name: %w", err) + } + sets = append(sets, s) + } + return sets, rows.Err() +} + +// spock.repset_remove_table is a function call, not DDL — it does NOT +// propagate cross-node. Callers that need every node to drop a table +// from a repset must invoke this per-node. +func RepsetRemoveTable(ctx context.Context, db DBQuerier, setName, qualifiedTable string) error { + sql, err := RenderSQL(SQLTemplates.RepsetRemoveTable, nil) + if err != nil { + return fmt.Errorf("failed to render RepsetRemoveTable SQL: %w", err) + } + if _, err := db.Exec(ctx, sql, setName, qualifiedTable); err != nil { + return fmt.Errorf("repset_remove_table(%s, %s) failed: %w", setName, qualifiedTable, err) + } + return nil +} + +// CAUTION: spock.repair_mode is SESSION-scoped, not transaction-scoped. 
+// SQL ROLLBACK does NOT reset it. Every Set(true) must be paired with a +// Set(false) on every code path, including error paths — otherwise the +// pooled connection is returned with repair_mode still on and subsequent +// borrowers silently fail to replicate their writes. +func SetSpockRepairMode(ctx context.Context, db DBQuerier, on bool) error { + sql, err := RenderSQL(SQLTemplates.SetSpockRepairMode, nil) + if err != nil { + return fmt.Errorf("failed to render SetSpockRepairMode SQL: %w", err) + } + if _, err := db.Exec(ctx, sql, on); err != nil { + return fmt.Errorf("spock.repair_mode(%v) failed: %w", on, err) + } + return nil } func UpdateMtreeCounters(ctx context.Context, db DBQuerier, mtreeTable string, isComposite bool, compositeTypeName string, pkeyType string, inserts, deletes, updates []string) error { diff --git a/db/queries/templates.go b/db/queries/templates.go index 6a0ffb8..64f4379 100644 --- a/db/queries/templates.go +++ b/db/queries/templates.go @@ -145,6 +145,12 @@ type Templates struct { ResetReplicationOriginSession *template.Template SetupReplicationOriginXact *template.Template ResetReplicationOriginXact *template.Template + + InitCDCMetadata *template.Template + EnumerateMetadataRepsets *template.Template + RepsetRemoveTable *template.Template + SetSpockRepairMode *template.Template + CurrentWalInsertLSN *template.Template } var SQLTemplates = Templates{ @@ -221,6 +227,39 @@ var SQLTemplates = Templates{ last_updated = EXCLUDED.last_updated `)), + // On conflict every column including pub_commit_lsn is refreshed: + // reaching this query path means a re-init, which created a fresh + // publication with a new commit LSN, and listen.go's guard must + // compare against that current LSN — not a stale prior one. 
+ InitCDCMetadata: template.Must(template.New("initCdcMetadata").Funcs(aceTemplateFuncs).Parse(` + INSERT INTO + {{aceSchema}}.ace_cdc_metadata ( + publication_name, + slot_name, + start_lsn, + pub_commit_lsn, + tables, + last_updated + ) + VALUES + ( + $1, + $2, + $3, + $4, + $5, + current_timestamp + ) + ON CONFLICT (publication_name) DO + UPDATE + SET + slot_name = EXCLUDED.slot_name, + start_lsn = EXCLUDED.start_lsn, + pub_commit_lsn = EXCLUDED.pub_commit_lsn, + tables = EXCLUDED.tables, + last_updated = EXCLUDED.last_updated + `)), + DropPublication: template.Must(template.New("dropPublication").Parse(` DROP PUBLICATION IF EXISTS {{.PublicationName}} `)), @@ -244,7 +283,8 @@ var SQLTemplates = Templates{ SELECT slot_name, start_lsn, - tables + tables, + COALESCE(pub_commit_lsn, '') AS pub_commit_lsn FROM {{aceSchema}}.ace_cdc_metadata WHERE @@ -338,9 +378,13 @@ var SQLTemplates = Templates{ publication_name text PRIMARY KEY, slot_name text, start_lsn text, + pub_commit_lsn text, tables text[], last_updated timestamptz - )`), + ); + -- Forward-compatible addition for clusters that ran older versions. + ALTER TABLE {{aceSchema}}.ace_cdc_metadata + ADD COLUMN IF NOT EXISTS pub_commit_lsn text;`), ), GetPrimaryKey: template.Must(template.New("getPrimaryKey").Parse(` SELECT @@ -1599,4 +1643,44 @@ var SQLTemplates = Templates{ ResetReplicationOriginXact: template.Must(template.New("resetReplicationOriginXact").Parse(` SELECT pg_replication_origin_xact_reset() `)), + + // EnumerateMetadataRepsets lists every Spock replication set that + // contains pgedge_ace.ace_cdc_metadata on the current node. Used by + // cdc.ExcludeMetadataFromSpockRepsets to undo Spock's DDL-triggered + // auto-add of the metadata table. 
+ // + // spock.tables surfaces every known relation; rows where the table is + // not in any replication set carry a NULL set_name and must be + // filtered out — otherwise the scan fails with "cannot scan NULL into + // *string" the moment the metadata table exists but has not yet been + // added to a repset (i.e., every fresh init on a cluster with DDL + // replication disabled). + EnumerateMetadataRepsets: template.Must(template.New("enumerateMetadataRepsets").Funcs(aceTemplateFuncs).Parse(` + SELECT set_name + FROM spock.tables + WHERE nspname = $1 AND relname = 'ace_cdc_metadata' AND set_name IS NOT NULL + `)), + + // RepsetRemoveTable wraps spock.repset_remove_table. Returns void; we + // don't care about the return value, only that the call doesn't error. + RepsetRemoveTable: template.Must(template.New("repsetRemoveTable").Parse(` + SELECT spock.repset_remove_table($1, $2) + `)), + + // CAUTION: SESSION-scoped, not transaction-scoped. SQL ROLLBACK does + // NOT reset this. Callers must pair $1=true with a matching $1=false + // on every code path or the pooled connection returns with repair + // mode still on, silently suppressing replication for the next user. + SetSpockRepairMode: template.Must(template.New("setSpockRepairMode").Parse(` + SELECT spock.repair_mode($1) + `)), + + // CurrentWalInsertLSN returns pg_current_wal_insert_lsn() as text. Used + // at MtreeInit time, just after CREATE PUBLICATION, as the lower bound + // for any future replication start LSN. The slot's consistent point is + // guaranteed to be >= this value because the slot is created after this + // transaction commits. 
+ CurrentWalInsertLSN: template.Must(template.New("currentWalInsertLSN").Parse(` + SELECT pg_current_wal_insert_lsn()::text + `)), } diff --git a/internal/consistency/mtree/merkle.go b/internal/consistency/mtree/merkle.go index 4b6bd03..e232717 100644 --- a/internal/consistency/mtree/merkle.go +++ b/internal/consistency/mtree/merkle.go @@ -807,51 +807,111 @@ func (m *MerkleTreeTask) MtreeInit() (err error) { cfg := config.Get().MTree.CDC for _, nodeInfo := range m.ClusterNodes { - logger.Info("Initialising Merkle tree objects on node: %s", nodeInfo["Name"]) + if err := m.initOneNode(nodeInfo, cfg.PublicationName, cfg.SlotName); err != nil { + return err + } + } + return nil +} - lsn, err := cdc.SetupReplicationSlot(m.Ctx, nodeInfo) +// initOneNode runs MtreeInit on a single node in three phases: +// +// 1. Phase A (tx 1, after the schema is created outside the tx): create +// helpers and the CDC metadata table, exclude it from Spock repsets, +// create the publication, capture a lower bound on its commit LSN. Commit. +// 2. Phase B (no tx): create the logical replication slot via a fresh +// replication-mode connection. Its consistent point is now +// guaranteed to be >= the publication's commit LSN, fixing the +// "publication does not exist" replay error. +// 3. Phase C (tx 2): persist slot/start_lsn/pub_commit_lsn into +// ace_cdc_metadata, wrapped in spock.repair_mode(true) so the +// write stays node-local while peer nodes may still have the +// metadata table in their replication set. +// +// Each iteration runs inside this function so deferred Close/Rollback +// calls fire per-node rather than at MtreeInit return. 
+func (m *MerkleTreeTask) initOneNode(nodeInfo map[string]any, publicationName, slotName string) error { + logger.Info("Initialising Merkle tree objects on node: %s", nodeInfo["Name"]) + + pool, err := auth.GetClusterNodeConnection(m.Ctx, nodeInfo, m.connOpts()) + if err != nil { + return fmt.Errorf("failed to get connection pool for node %s: %w", nodeInfo["Name"], err) + } + defer pool.Close() + + if err := queries.CreateSchema(m.Ctx, pool, m.aceSchema()); err != nil { + return fmt.Errorf("failed to create schema '%s' on node %s: %w", m.aceSchema(), nodeInfo["Name"], err) + } + + var pubCommitLSN string + if err := func() (err error) { + tx, err := pool.Begin(m.Ctx) if err != nil { - return fmt.Errorf("failed to set up replication slot on node %s: %w", nodeInfo["Name"], err) + return err } + defer tx.Rollback(m.Ctx) - pool, err := auth.GetClusterNodeConnection(m.Ctx, nodeInfo, m.connOpts()) + if err := queries.CreateXORFunction(m.Ctx, tx); err != nil { + return fmt.Errorf("create xor function: %w", err) + } + if err := queries.CreateCDCMetadataTable(m.Ctx, tx); err != nil { + return fmt.Errorf("create cdc metadata table: %w", err) + } + if err := cdc.ExcludeMetadataFromSpockRepsets(m.Ctx, tx, m.aceSchema()); err != nil { + return fmt.Errorf("exclude ace_cdc_metadata from spock repsets: %w", err) + } + if err := cdc.SetupPublication(m.Ctx, tx, publicationName); err != nil { + return fmt.Errorf("setup publication: %w", err) + } + // Captured mid-tx after CREATE PUBLICATION, so the value is + // strictly less than this tx's commit LSN; Phase B's slot + // consistent_point is >= that commit LSN. listen.go's guard + // therefore catches any start LSN that predates the publication. 
+ pubCommitLSN, err = queries.CurrentWalInsertLSN(m.Ctx, tx) if err != nil { - return fmt.Errorf("failed to get connection pool for node %s: %w", nodeInfo["Name"], err) + return fmt.Errorf("capture publication commit LSN: %w", err) } - defer pool.Close() + return tx.Commit(m.Ctx) + }(); err != nil { + return fmt.Errorf("phase A failed on node %s: %w", nodeInfo["Name"], err) + } - if err = queries.CreateSchema(m.Ctx, pool, m.aceSchema()); err != nil { - return fmt.Errorf("failed to create schema '%s': %w", m.aceSchema(), err) - } + slotLSN, err := cdc.SetupReplicationSlot(m.Ctx, nodeInfo) + if err != nil { + return fmt.Errorf("phase B (replication slot) failed on node %s: %w", nodeInfo["Name"], err) + } + if err := func() (err error) { tx, err := pool.Begin(m.Ctx) if err != nil { - return fmt.Errorf("failed to begin transaction on node %s: %w", nodeInfo["Name"], err) + return err } defer tx.Rollback(m.Ctx) - if err = queries.CreateXORFunction(m.Ctx, tx); err != nil { - return fmt.Errorf("failed to create xor function: %w", err) + if err := cdc.SetSpockRepairMode(m.Ctx, tx, true); err != nil { + return fmt.Errorf("enable spock repair_mode: %w", err) } - - if err = queries.CreateCDCMetadataTable(m.Ctx, tx); err != nil { - return fmt.Errorf("failed to create cdc metadata table: %w", err) + if err := queries.InitCDCMetadata(m.Ctx, tx, + publicationName, slotName, + slotLSN.String(), pubCommitLSN, []string{}); err != nil { + return fmt.Errorf("write cdc metadata: %w", err) } - - if err := cdc.SetupPublication(m.Ctx, tx, cfg.PublicationName); err != nil { - return fmt.Errorf("failed to setup publication on node %s: %w", nodeInfo["Name"], err) + if err := cdc.SetSpockRepairMode(m.Ctx, tx, false); err != nil { + return fmt.Errorf("disable spock repair_mode: %w", err) } - - if err = queries.UpdateCDCMetadata(m.Ctx, tx, cfg.PublicationName, cfg.SlotName, lsn.String(), []string{}); err != nil { - return fmt.Errorf("failed to update cdc metadata: %w", err) + return 
tx.Commit(m.Ctx) + }(); err != nil { + // Slot leak mitigation: Phase B created a slot that we failed to + // record. A subsequent SetupReplicationSlot will drop any prior + // slot of the same name, so a re-run reaps it — but try a best + // effort drop here too to keep the cluster tidy. + if dropErr := queries.DropReplicationSlot(m.Ctx, pool, slotName); dropErr != nil { + logger.Warn("failed to drop orphaned slot %s on node %s: %v", slotName, nodeInfo["Name"], dropErr) } - - if err := tx.Commit(m.Ctx); err != nil { - return fmt.Errorf("failed to commit transaction on node %s: %w", nodeInfo["Name"], err) - } - - logger.Info("Merkle tree objects initialised on node: %s", nodeInfo["Name"]) + return fmt.Errorf("phase C failed on node %s: %w", nodeInfo["Name"], err) } + + logger.Info("Merkle tree objects initialised on node: %s", nodeInfo["Name"]) return nil } @@ -1427,7 +1487,7 @@ func (m *MerkleTreeTask) BuildMtree() (err error) { } defer tx.Rollback(m.Ctx) - slotName, startLSN, tables, err := queries.GetCDCMetadata(m.Ctx, tx, publicationName) + slotName, startLSN, tables, _, err := queries.GetCDCMetadata(m.Ctx, tx, publicationName) if err != nil { pool.Close() return fmt.Errorf("failed to get cdc metadata on node %s: %w", nodeInfo["Name"], err) diff --git a/internal/infra/cdc/listen.go b/internal/infra/cdc/listen.go index 0038dae..c517fa3 100644 --- a/internal/infra/cdc/listen.go +++ b/internal/infra/cdc/listen.go @@ -65,6 +65,7 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont publication := cfg.PublicationName slotName := cfg.SlotName var startLSNStr string + var pubCommitLSNStr string var tables []string func() { tx, err := pool.Begin(ctx) @@ -74,13 +75,14 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont } defer tx.Rollback(ctx) - metaSlot, metaLSN, metaTables, err := queries.GetCDCMetadata(ctx, tx, publication) + metaSlot, metaLSN, metaTables, metaPubCommit, err := queries.GetCDCMetadata(ctx, 
tx, publication) if err != nil { logger.Error("failed to get cdc metadata: %v", err) startLSNStr = "" return } startLSNStr = metaLSN + pubCommitLSNStr = metaPubCommit tables = metaTables if metaSlot != "" { slotName = metaSlot @@ -98,10 +100,33 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont return fmt.Errorf("failed to parse lsn: %v", err) } - slotConfirmedLSN, err := getSlotConfirmedFlushLSN(ctx, pool, slotName) - if err != nil { - logger.Error("failed to get confirmed_flush_lsn for slot %s: %v", slotName, err) - return fmt.Errorf("failed to get confirmed_flush_lsn for slot %s: %w", slotName, err) + // Publication-commit guard. If metadata records the LSN at which the + // publication was committed during MtreeInit, refuse to start a stream + // from any LSN strictly older than that — pgoutput's historical + // catalog snapshot would not yet contain the publication and the + // stream would abort with "publication does not exist" (SQLSTATE + // 42704). + // + // Empty pubCommitLSNStr means the metadata row was written by a + // version of ACE that pre-dates this fix; we cannot check the + // invariant and fall through with a warning. + if pubCommitLSNStr == "" { + logger.Warn("ace_cdc_metadata has no pub_commit_lsn for publication %s; cannot verify slot/publication ordering. If you see SQLSTATE 42704, run 'ace mtree teardown' and 'ace mtree init' to recover", publication) + } else { + pubCommitLSN, err := pglogrepl.ParseLSN(pubCommitLSNStr) + if err != nil { + logger.Error("failed to parse pub_commit_lsn %s: %v", pubCommitLSNStr, err) + return fmt.Errorf("failed to parse pub_commit_lsn %s: %w", pubCommitLSNStr, err) + } + if startLSN < pubCommitLSN { + return fmt.Errorf( + "ace_cdc_metadata.start_lsn (%s) is older than publication commit LSN (%s) "+ + "for slot %s; this typically indicates ace_cdc_metadata was replicated "+ + "cross-node by Spock, or the slot/metadata is from a prior init. 
"+ + "Run 'ace mtree teardown' and 'ace mtree init' to recover", + startLSN, pubCommitLSN, slotName, + ) + } } targetFlushLSN := pglogrepl.LSN(0) @@ -119,9 +144,6 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont } lastLSN := startLSN - if slotConfirmedLSN != 0 && slotConfirmedLSN < lastLSN { - lastLSN = slotConfirmedLSN - } var lastFlushedLSN pglogrepl.LSN = lastLSN var lastLSNVal atomic.Uint64 lastLSNVal.Store(uint64(lastLSN)) @@ -442,6 +464,16 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont tables = []string{} } + // start_lsn is the local slot's progress; cross-node propagation + // triggers SQLSTATE 42704 on peers. See SetSpockRepairMode's + // CAUTION on session-scoped state — Set(false) below MUST run. + if err := SetSpockRepairMode(processingCtx, tx, true); err != nil { + if conn != nil { + conn.Close(ctx) + } + return fmt.Errorf("failed to enable spock repair_mode: %w", err) + } + if err := queries.UpdateCDCMetadata(processingCtx, tx, publication, slotName, lastLSN.String(), tables); err != nil { if conn != nil { conn.Close(ctx) @@ -449,6 +481,13 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont return fmt.Errorf("failed to update cdc metadata: %w", err) } + if err := SetSpockRepairMode(processingCtx, tx, false); err != nil { + if conn != nil { + conn.Close(ctx) + } + return fmt.Errorf("failed to disable spock repair_mode: %w", err) + } + if err := tx.Commit(processingCtx); err != nil { if conn != nil { conn.Close(ctx) @@ -472,22 +511,6 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont return nil } -func getSlotConfirmedFlushLSN(ctx context.Context, pool *pgxpool.Pool, slotName string) (pglogrepl.LSN, error) { - var lsnStr string - err := pool.QueryRow(ctx, "SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = $1", slotName).Scan(&lsnStr) - if err != nil { - return 0, fmt.Errorf("failed to fetch 
confirmed_flush_lsn for slot %s: %w", slotName, err) - } - if lsnStr == "" { - return 0, fmt.Errorf("confirmed_flush_lsn empty for slot %s", slotName) - } - lsn, err := pglogrepl.ParseLSN(lsnStr) - if err != nil { - return 0, fmt.Errorf("failed to parse confirmed_flush_lsn %s for slot %s: %w", lsnStr, slotName, err) - } - return lsn, nil -} - func getCurrentWalFlushLSN(ctx context.Context, pool *pgxpool.Pool) (pglogrepl.LSN, error) { var lsnStr string err := pool.QueryRow(ctx, "SELECT pg_current_wal_flush_lsn()").Scan(&lsnStr) @@ -515,10 +538,21 @@ func flushMetadata(ctx context.Context, pool *pgxpool.Pool, publication, slotNam } defer tx.Rollback(ctx) + // start_lsn is the local slot's progress; cross-node propagation + // triggers SQLSTATE 42704 on peers. See SetSpockRepairMode's + // CAUTION on session-scoped state — Set(false) below MUST run. + if err := SetSpockRepairMode(ctx, tx, true); err != nil { + return fmt.Errorf("failed to enable spock repair_mode: %w", err) + } + if err := queries.UpdateCDCMetadata(ctx, tx, publication, slotName, lsn.String(), tables); err != nil { return fmt.Errorf("failed to update cdc metadata: %w", err) } + if err := SetSpockRepairMode(ctx, tx, false); err != nil { + return fmt.Errorf("failed to disable spock repair_mode: %w", err) + } + if err := tx.Commit(ctx); err != nil { return fmt.Errorf("failed to commit cdc metadata update: %w", err) } diff --git a/internal/infra/cdc/setup.go b/internal/infra/cdc/setup.go index a135cb8..7ebb304 100644 --- a/internal/infra/cdc/setup.go +++ b/internal/infra/cdc/setup.go @@ -38,6 +38,66 @@ func SetupPublication(ctx context.Context, db queries.DBQuerier, publicationName return nil } +// ExcludeMetadataFromSpockRepsets removes aceSchema.ace_cdc_metadata from +// every Spock replication set it currently belongs to on the current node. +// +// Spock's DDL-replication path auto-adds new tables to the default repset on +// each node that applies the replicated CREATE TABLE. 
The metadata table +// holds node-local LSNs that must never propagate cross-node, so we strip +// it out of every repset after creation. This is per-node — repset_remove_table +// is a function call, not DDL, so it does not propagate to peers. +// +// No-op when Spock is not installed. +func ExcludeMetadataFromSpockRepsets(ctx context.Context, db queries.DBQuerier, aceSchema string) error { + spockInstalled, err := queries.CheckSpockInstalled(ctx, db) + if err != nil { + return fmt.Errorf("failed to detect spock extension: %w", err) + } + if !spockInstalled { + return nil + } + + sets, err := queries.EnumerateMetadataRepsets(ctx, db, aceSchema) + if err != nil { + return fmt.Errorf("failed to enumerate repsets containing ace_cdc_metadata: %w", err) + } + if len(sets) == 0 { + return nil + } + + qualified := fmt.Sprintf("%s.ace_cdc_metadata", aceSchema) + for _, set := range sets { + if err := queries.RepsetRemoveTable(ctx, db, set, qualified); err != nil { + return err + } + logger.Info("Removed %s from spock repset '%s'", qualified, set) + } + return nil +} + +// While repair mode is active, local writes are NOT replicated to peer +// nodes. Required around writes to ace_cdc_metadata: start_lsn is the +// node's own slot position and must stay node-local even if the table +// briefly lives in a peer's repset during the per-node init loop. +// +// CAUTION: repair_mode is SESSION-scoped, not transaction-scoped. SQL +// ROLLBACK does NOT reset it. Every Set(true) must reach a matching +// Set(false) on every code path; an error between the two leaves the +// pooled connection poisoned and the next borrower's writes will not +// replicate. +// +// No-op when Spock is not installed. 
+func SetSpockRepairMode(ctx context.Context, db queries.DBQuerier, on bool) error { + spockInstalled, err := queries.CheckSpockInstalled(ctx, db) + if err != nil { + return fmt.Errorf("failed to detect spock extension: %w", err) + } + if !spockInstalled { + return nil + } + return queries.SetSpockRepairMode(ctx, db, on) +} + func SetupReplicationSlot(ctx context.Context, nodeInfo map[string]any) (pglogrepl.LSN, error) { cfg := config.Get().MTree.CDC slot := cfg.SlotName diff --git a/tests/integration/cdc_busy_table_test.go b/tests/integration/cdc_busy_table_test.go index 8a2771d..fb13b5c 100644 --- a/tests/integration/cdc_busy_table_test.go +++ b/tests/integration/cdc_busy_table_test.go @@ -252,7 +252,7 @@ func currentMetadataLSN(t *testing.T, ctx context.Context) pglogrepl.LSN { require.NoError(t, err) defer tx.Rollback(ctx) - _, lsnStr, _, err := queries.GetCDCMetadata(ctx, tx, config.Cfg.MTree.CDC.PublicationName) + _, lsnStr, _, _, err := queries.GetCDCMetadata(ctx, tx, config.Cfg.MTree.CDC.PublicationName) require.NoError(t, err) lsn, err := pglogrepl.ParseLSN(lsnStr) From d0e483c288dbab58a401282440c9968961ed1ce7 Mon Sep 17 00:00:00 2001 From: Andrei Lepikhov Date: Fri, 15 May 2026 14:05:43 +0200 Subject: [PATCH 2/6] test: regression coverage for mtree CDC init bug (42704) Add tests/integration/cdc_init_ordering_test.go covering the three invariants the fix preserves: - slot's confirmed_flush_lsn >= ace_cdc_metadata.pub_commit_lsn on every node (Bug 1: slot consistent point must not precede publication commit). - ace_cdc_metadata is removed from every Spock repset on every node; the test seeds it into 'default' between two init calls so the assertion bites on pre-fix code (Bug 2). - processReplicationStream returns the actionable guard error, not raw SQLSTATE 42704, when start_lsn precedes pub_commit_lsn. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/integration/cdc_init_ordering_test.go | 249 ++++++++++++++++++++ 1 file changed, 249 insertions(+) create mode 100644 tests/integration/cdc_init_ordering_test.go diff --git a/tests/integration/cdc_init_ordering_test.go b/tests/integration/cdc_init_ordering_test.go new file mode 100644 index 0000000..71d4408 --- /dev/null +++ b/tests/integration/cdc_init_ordering_test.go @@ -0,0 +1,249 @@ +// /////////////////////////////////////////////////////////////////////////// +// +// # ACE - Active Consistency Engine +// +// Copyright (C) 2023 - 2026, pgEdge (https://www.pgedge.com/) +// +// This software is released under the PostgreSQL License: +// https://opensource.org/license/postgresql +// +// /////////////////////////////////////////////////////////////////////////// + +// Regression coverage for SQLSTATE 42704 "publication \"ace_mtree_pub\" +// does not exist" during mtree table-diff on Spock clusters. Two defects +// combined to cause it: +// +// 1. MtreeInit created the replication slot before the publication was +// committed, so the slot's consistent point preceded the publication. +// pgoutput's get_publication_oid then failed during replay. +// 2. Spock DDL-replicated CREATE TABLE on ace_cdc_metadata auto-added the +// table to the default replication set on every node. Peers' INSERTs into +// the table propagated cross-node, overwriting each node's local +// start_lsn with a peer's LSN. + +package integration + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/jackc/pgx/v5/pgxpool" + "github.com/pgedge/ace/internal/infra/cdc" + "github.com/pgedge/ace/pkg/config" + "github.com/stretchr/testify/require" +) + +// TestMtreeInitSlotIsAfterPublication verifies bug 1's fix: the replication +// slot's consistent point sits at or beyond the publication's commit LSN, so +// pgoutput can always find the publication in the historical catalog snapshot. 
+func TestMtreeInitSlotIsAfterPublication(t *testing.T) { + ctx := context.Background() + tableName := "customers_pub_ordering" + qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName) + + setupCDCTestTable(t, ctx, tableName) + + mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1, serviceN2}) + require.NoError(t, mtreeTask.RunChecks(false)) + require.NoError(t, mtreeTask.MtreeInit()) + t.Cleanup(func() { + if err := mtreeTask.MtreeTeardown(); err != nil { + t.Logf("MtreeTeardown during cleanup: %v", err) + } + dropCDCTestTable(t, tableName) + }) + + for _, np := range []struct { + name string + pool *pgxpool.Pool + }{ + {serviceN1, pgCluster.Node1Pool}, + {serviceN2, pgCluster.Node2Pool}, + } { + // pub_commit_lsn must be populated by the new InitCDCMetadata path. + var pubCommitLSN string + err := np.pool.QueryRow(ctx, fmt.Sprintf( + "SELECT COALESCE(pub_commit_lsn, '') FROM %s.ace_cdc_metadata WHERE publication_name = $1", + config.Cfg.MTree.Schema, + ), config.Cfg.MTree.CDC.PublicationName).Scan(&pubCommitLSN) + require.NoError(t, err, "read pub_commit_lsn on %s", np.name) + require.NotEmpty(t, pubCommitLSN, "pub_commit_lsn must be set on %s (proves new InitCDCMetadata ran)", np.name) + + // The slot's confirmed_flush_lsn is the consistent point of the + // replication slot. It must be >= the publication's commit LSN, or + // pgoutput would replay from a snapshot that doesn't see the + // publication. This is the core ordering invariant the fix restores. 
+ var slotConfirmed string + err = np.pool.QueryRow(ctx, + "SELECT confirmed_flush_lsn::text FROM pg_replication_slots WHERE slot_name = $1", + config.Cfg.MTree.CDC.SlotName, + ).Scan(&slotConfirmed) + require.NoError(t, err, "read slot confirmed_flush_lsn on %s", np.name) + + var aheadOrEqual bool + err = np.pool.QueryRow(ctx, + "SELECT $1::pg_lsn >= $2::pg_lsn", + slotConfirmed, pubCommitLSN, + ).Scan(&aheadOrEqual) + require.NoError(t, err, "compare LSNs on %s", np.name) + require.True(t, aheadOrEqual, + "slot consistent point %s must be >= publication commit LSN %s on %s", + slotConfirmed, pubCommitLSN, np.name) + } +} + +// TestMtreeInitExcludesMetadataFromSpock verifies bug 2's fix: the +// ace_cdc_metadata table is removed from every Spock replication set during +// init, and writes to it stay node-local. +// +// In production this matters because Spock's DDL-replication path auto-adds +// new tables to the default replication set on every node. The test cluster +// runs with DDL replication off, so to genuinely exercise the fix we run +// init, manually add the metadata table to the default repset (simulating +// Spock's auto-add), then re-run init. The fix's +// ExcludeMetadataFromSpockRepsets must clean the entry back out; pre-fix +// init had no such step, so the entry would persist. 
+func TestMtreeInitExcludesMetadataFromSpock(t *testing.T) { + ctx := context.Background() + tableName := "customers_metadata_isolation" + qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName) + + setupCDCTestTable(t, ctx, tableName) + + mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1, serviceN2}) + require.NoError(t, mtreeTask.RunChecks(false)) + require.NoError(t, mtreeTask.MtreeInit()) + t.Cleanup(func() { + if err := mtreeTask.MtreeTeardown(); err != nil { + t.Logf("MtreeTeardown during cleanup: %v", err) + } + dropCDCTestTable(t, tableName) + }) + + qualifiedMetadata := fmt.Sprintf("%s.ace_cdc_metadata", config.Cfg.MTree.Schema) + + // Force the state that Spock would create under DDL replication: add + // the metadata table to the default repset on each node. + for _, np := range []struct { + name string + pool *pgxpool.Pool + }{ + {serviceN1, pgCluster.Node1Pool}, + {serviceN2, pgCluster.Node2Pool}, + } { + _, err := np.pool.Exec(ctx, + "SELECT spock.repset_add_table('default', $1::regclass)", + qualifiedMetadata, + ) + require.NoError(t, err, "seed: add metadata to default repset on %s", np.name) + } + + // Re-run init — the fix's ExcludeMetadataFromSpockRepsets must reverse + // the auto-add. Pre-fix init has no such step, so the table stays in + // the default repset and the assertion below fails. + require.NoError(t, mtreeTask.RunChecks(false)) + require.NoError(t, mtreeTask.MtreeInit()) + + for _, np := range []struct { + name string + pool *pgxpool.Pool + }{ + {serviceN1, pgCluster.Node1Pool}, + {serviceN2, pgCluster.Node2Pool}, + } { + // spock.tables surfaces every known relation, with set_name NULL + // when the table is not in any replication set. The fix's invariant + // is "not in any repset", i.e., no row with a non-NULL set_name. 
+ var inRepset bool + err := np.pool.QueryRow(ctx, + "SELECT EXISTS (SELECT 1 FROM spock.tables WHERE nspname = $1 AND relname = 'ace_cdc_metadata' AND set_name IS NOT NULL)", + config.Cfg.MTree.Schema, + ).Scan(&inRepset) + require.NoError(t, err, "check spock.tables on %s", np.name) + require.False(t, inRepset, + "ace_cdc_metadata must not be in any spock replication set on %s after re-init; "+ + "otherwise node-local start_lsn leaks cross-node", np.name) + } + + // End-to-end demonstration: write a sentinel start_lsn into n1's metadata + // and confirm it does not propagate to n2. Under bug 2, the sentinel would + // appear on n2 within seconds (Spock apply latency). + sentinel := "FFFF/FFFFFFF0" + _, err := pgCluster.Node1Pool.Exec(ctx, fmt.Sprintf( + "UPDATE %s.ace_cdc_metadata SET start_lsn = $1 WHERE publication_name = $2", + config.Cfg.MTree.Schema, + ), sentinel, config.Cfg.MTree.CDC.PublicationName) + require.NoError(t, err, "write sentinel start_lsn on n1") + + // Give Spock a generous window to apply if it were going to. + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + var n2StartLSN string + err := pgCluster.Node2Pool.QueryRow(ctx, fmt.Sprintf( + "SELECT start_lsn FROM %s.ace_cdc_metadata WHERE publication_name = $1", + config.Cfg.MTree.Schema, + ), config.Cfg.MTree.CDC.PublicationName).Scan(&n2StartLSN) + require.NoError(t, err) + require.NotEqual(t, sentinel, n2StartLSN, + "sentinel start_lsn leaked from n1 to n2 via spock replication") + time.Sleep(200 * time.Millisecond) + } +} + +// TestProcessReplicationStreamRejectsStaleStartLSN verifies the new +// publication-commit guard in processReplicationStream. We corrupt n1's +// start_lsn to a value strictly older than pub_commit_lsn (which is what +// bug 2 used to do via Spock cross-node leak) and assert that UpdateFromCDC +// returns an explicit, actionable error rather than the cryptic SQLSTATE +// 42704 the customer saw. 
+func TestProcessReplicationStreamRejectsStaleStartLSN(t *testing.T) { + ctx := context.Background() + tableName := "customers_stale_start_lsn" + qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName) + + setupCDCTestTable(t, ctx, tableName) + + mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1}) + require.NoError(t, mtreeTask.RunChecks(false)) + require.NoError(t, mtreeTask.MtreeInit()) + t.Cleanup(func() { + if err := mtreeTask.MtreeTeardown(); err != nil { + t.Logf("MtreeTeardown during cleanup: %v", err) + } + dropCDCTestTable(t, tableName) + }) + + // Read pub_commit_lsn and craft a stale start_lsn strictly below it. + var pubCommitLSN string + err := pgCluster.Node1Pool.QueryRow(ctx, fmt.Sprintf( + "SELECT pub_commit_lsn FROM %s.ace_cdc_metadata WHERE publication_name = $1", + config.Cfg.MTree.Schema, + ), config.Cfg.MTree.CDC.PublicationName).Scan(&pubCommitLSN) + require.NoError(t, err) + require.NotEmpty(t, pubCommitLSN, "fix relies on pub_commit_lsn being populated") + + var staleLSN string + err = pgCluster.Node1Pool.QueryRow(ctx, + "SELECT ($1::pg_lsn - 16)::text", pubCommitLSN, + ).Scan(&staleLSN) + require.NoError(t, err) + + _, err = pgCluster.Node1Pool.Exec(ctx, fmt.Sprintf( + "UPDATE %s.ace_cdc_metadata SET start_lsn = $1 WHERE publication_name = $2", + config.Cfg.MTree.Schema, + ), staleLSN, config.Cfg.MTree.CDC.PublicationName) + require.NoError(t, err, "inject stale start_lsn") + + nodeInfo := pgCluster.ClusterNodes[0] + err = cdc.UpdateFromCDC(context.Background(), nodeInfo) + require.Error(t, err, "UpdateFromCDC must refuse a stream whose start_lsn precedes the publication commit") + + msg := err.Error() + require.Contains(t, msg, "publication commit LSN", + "error must mention the publication-commit invariant, got: %s", msg) + require.NotContains(t, strings.ToLower(msg), "42704", + "caller should see the new guard's actionable error, not the raw 42704 from PostgreSQL: %s", msg) +} From 
77abfa5c9dedad271e32c969d54d52c8c2f6ecd4 Mon Sep 17 00:00:00 2001 From: Andrei Lepikhov Date: Fri, 15 May 2026 14:18:00 +0200 Subject: [PATCH 3/6] test: extract helpers in TestMtreeInitExcludesMetadataFromSpock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codacy flagged the test body as 61 LOC (limit 50). Pull the two-node fan-out loops and the sentinel-propagation block into named helpers: seedMetadataInDefaultRepset, assertMetadataNotInAnyRepset, assertSentinelDoesNotPropagate, plus mtreeTestNodes() to centralise the (name, pool) pair list. The test body is now 21 LOC and reads as arrange → act → assert. The six "SQL injection" findings Codacy raised on the same file are AI-classified false positives: the %s substitution is config.Cfg.MTree.Schema (a config-loaded identifier sanitised at load time), and the line at seedMetadataInDefaultRepset's spock.repset_add_table call is in fact parameterised via $1 — Codacy misread the surrounding fmt.Sprintf. No code change needed for those. Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/integration/cdc_init_ordering_test.go | 73 +++++++++++++-------- 1 file changed, 46 insertions(+), 27 deletions(-) diff --git a/tests/integration/cdc_init_ordering_test.go b/tests/integration/cdc_init_ordering_test.go index 71d4408..8ce1263 100644 --- a/tests/integration/cdc_init_ordering_test.go +++ b/tests/integration/cdc_init_ordering_test.go @@ -127,19 +127,7 @@ func TestMtreeInitExcludesMetadataFromSpock(t *testing.T) { // Force the state that Spock would create under DDL replication: add // the metadata table to the default repset on each node. 
- for _, np := range []struct { - name string - pool *pgxpool.Pool - }{ - {serviceN1, pgCluster.Node1Pool}, - {serviceN2, pgCluster.Node2Pool}, - } { - _, err := np.pool.Exec(ctx, - "SELECT spock.repset_add_table('default', $1::regclass)", - qualifiedMetadata, - ) - require.NoError(t, err, "seed: add metadata to default repset on %s", np.name) - } + seedMetadataInDefaultRepset(t, ctx, qualifiedMetadata) // Re-run init — the fix's ExcludeMetadataFromSpockRepsets must reverse // the auto-add. Pre-fix init has no such step, so the table stays in @@ -147,16 +135,30 @@ func TestMtreeInitExcludesMetadataFromSpock(t *testing.T) { require.NoError(t, mtreeTask.RunChecks(false)) require.NoError(t, mtreeTask.MtreeInit()) - for _, np := range []struct { - name string - pool *pgxpool.Pool - }{ - {serviceN1, pgCluster.Node1Pool}, - {serviceN2, pgCluster.Node2Pool}, - } { - // spock.tables surfaces every known relation, with set_name NULL - // when the table is not in any replication set. The fix's invariant - // is "not in any repset", i.e., no row with a non-NULL set_name. + assertMetadataNotInAnyRepset(t, ctx) + assertSentinelDoesNotPropagate(t, ctx) +} + +// seedMetadataInDefaultRepset adds ace_cdc_metadata to Spock's default repset +// on every node, simulating Spock's DDL-replicated auto-add that the test +// cluster (DDL replication off) does not produce naturally. +func seedMetadataInDefaultRepset(t *testing.T, ctx context.Context, qualifiedMetadata string) { + t.Helper() + for _, np := range mtreeTestNodes() { + _, err := np.pool.Exec(ctx, + "SELECT spock.repset_add_table('default', $1::regclass)", + qualifiedMetadata, + ) + require.NoError(t, err, "seed: add metadata to default repset on %s", np.name) + } +} + +// assertMetadataNotInAnyRepset asserts the fix's post-init invariant: no row +// in spock.tables has a non-NULL set_name for ace_cdc_metadata. (The view +// surfaces every relation; NULL set_name means "not in any repset".) 
+func assertMetadataNotInAnyRepset(t *testing.T, ctx context.Context) { + t.Helper() + for _, np := range mtreeTestNodes() { var inRepset bool err := np.pool.QueryRow(ctx, "SELECT EXISTS (SELECT 1 FROM spock.tables WHERE nspname = $1 AND relname = 'ace_cdc_metadata' AND set_name IS NOT NULL)", @@ -167,10 +169,13 @@ func TestMtreeInitExcludesMetadataFromSpock(t *testing.T) { "ace_cdc_metadata must not be in any spock replication set on %s after re-init; "+ "otherwise node-local start_lsn leaks cross-node", np.name) } +} - // End-to-end demonstration: write a sentinel start_lsn into n1's metadata - // and confirm it does not propagate to n2. Under bug 2, the sentinel would - // appear on n2 within seconds (Spock apply latency). +// assertSentinelDoesNotPropagate writes a sentinel start_lsn on n1 and +// verifies it never appears on n2 within a 3 s window. Under bug 2 the +// sentinel would propagate via Spock apply within seconds. +func assertSentinelDoesNotPropagate(t *testing.T, ctx context.Context) { + t.Helper() sentinel := "FFFF/FFFFFFF0" _, err := pgCluster.Node1Pool.Exec(ctx, fmt.Sprintf( "UPDATE %s.ace_cdc_metadata SET start_lsn = $1 WHERE publication_name = $2", @@ -178,7 +183,6 @@ func TestMtreeInitExcludesMetadataFromSpock(t *testing.T) { ), sentinel, config.Cfg.MTree.CDC.PublicationName) require.NoError(t, err, "write sentinel start_lsn on n1") - // Give Spock a generous window to apply if it were going to. deadline := time.Now().Add(3 * time.Second) for time.Now().Before(deadline) { var n2StartLSN string @@ -193,6 +197,21 @@ func TestMtreeInitExcludesMetadataFromSpock(t *testing.T) { } } +// mtreeTestNodes returns the (name, pool) pairs the regression tests fan out +// over. Centralised so the two-node iteration body need not be duplicated. 
+func mtreeTestNodes() []struct { + name string + pool *pgxpool.Pool +} { + return []struct { + name string + pool *pgxpool.Pool + }{ + {serviceN1, pgCluster.Node1Pool}, + {serviceN2, pgCluster.Node2Pool}, + } +} + // TestProcessReplicationStreamRejectsStaleStartLSN verifies the new // publication-commit guard in processReplicationStream. We corrupt n1's // start_lsn to a value strictly older than pub_commit_lsn (which is what From d6a8a9343354fb8523eae7ebbc9ad04728d0716c Mon Sep 17 00:00:00 2001 From: Andrei Lepikhov Date: Fri, 15 May 2026 14:29:13 +0200 Subject: [PATCH 4/6] fix: ensure spock.repair_mode reset on error paths (Site 1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit processReplicationStream's non-continuous tail leaked repair_mode when any step between Set(true) and Commit returned early — the session GUC persists across SQL ROLLBACK, so the pooled connection went back poisoned and the next borrower's writes silently did not replicate. Switch to pool.Acquire: defer Set(false) on the borrowed *pgxpool.Conn runs before Release, on every path. flushMetadata and MtreeInit Phase C share the shape; tracked separately. Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/infra/cdc/listen.go | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/internal/infra/cdc/listen.go b/internal/infra/cdc/listen.go index c517fa3..c79e4a7 100644 --- a/internal/infra/cdc/listen.go +++ b/internal/infra/cdc/listen.go @@ -451,41 +451,53 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont } if !continuous { - tx, err := pool.Begin(processingCtx) + // repair_mode is a SESSION GUC; SQL ROLLBACK does not reset it + // and a deferred reset against the tx fails because the tx is + // already closed at commit time. 
Acquire a dedicated conn so + // Set(false) runs against the still-live session before the + // conn returns to the pool — preventing pool poisoning if any + // step between Set(true) and Commit returns early. + c, err := pool.Acquire(processingCtx) if err != nil { if conn != nil { conn.Close(ctx) } - return fmt.Errorf("failed to begin metadata update transaction: %w", err) + return fmt.Errorf("failed to acquire connection for metadata update: %w", err) } - defer tx.Rollback(processingCtx) + defer c.Release() if tables == nil { tables = []string{} } // start_lsn is the local slot's progress; cross-node propagation - // triggers SQLSTATE 42704 on peers. See SetSpockRepairMode's - // CAUTION on session-scoped state — Set(false) below MUST run. - if err := SetSpockRepairMode(processingCtx, tx, true); err != nil { + // triggers SQLSTATE 42704 on peers. + if err := SetSpockRepairMode(processingCtx, c, true); err != nil { if conn != nil { conn.Close(ctx) } return fmt.Errorf("failed to enable spock repair_mode: %w", err) } + defer func() { + if err := SetSpockRepairMode(processingCtx, c, false); err != nil { + logger.Warn("failed to reset spock repair_mode before connection release: %v", err) + } + }() - if err := queries.UpdateCDCMetadata(processingCtx, tx, publication, slotName, lastLSN.String(), tables); err != nil { + tx, err := c.Begin(processingCtx) + if err != nil { if conn != nil { conn.Close(ctx) } - return fmt.Errorf("failed to update cdc metadata: %w", err) + return fmt.Errorf("failed to begin metadata update transaction: %w", err) } + defer tx.Rollback(processingCtx) - if err := SetSpockRepairMode(processingCtx, tx, false); err != nil { + if err := queries.UpdateCDCMetadata(processingCtx, tx, publication, slotName, lastLSN.String(), tables); err != nil { if conn != nil { conn.Close(ctx) } - return fmt.Errorf("failed to disable spock repair_mode: %w", err) + return fmt.Errorf("failed to update cdc metadata: %w", err) } if err := tx.Commit(processingCtx); err != 
nil { From dcc6d9c2409983c002b7b719711a857ddc1d784e Mon Sep 17 00:00:00 2001 From: Andrei Lepikhov Date: Tue, 19 May 2026 11:19:45 +0200 Subject: [PATCH 5/6] fix: address PR review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - flushMetadata (listen.go) and MtreeInit Phase C (merkle.go): port to the borrowed-*pgxpool.Conn pattern from Site 1, so defer Set(false) runs on the live session before Release. flushMetadata is on the periodic+shutdown path and was the highest-leak remaining site. - Guard scope (listen.go): clarify the pub_commit_lsn check is a tripwire for partial-state-on-upgrade and narrow cross-node leaks, not a general leak detector — the real defense is repset exclusion plus repair_mode wrapping every metadata write. - Test timing (cdc_init_ordering_test.go): bump assertSentinelDoesNotPropagate's window from 3 s to 10 s for slow CI. --- db/queries/queries.go | 8 ++- db/queries/templates.go | 6 +-- internal/consistency/mtree/merkle.go | 51 ++++++++++-------- internal/infra/cdc/listen.go | 59 +++++++++++---------- internal/infra/cdc/setup.go | 16 ++---- tests/integration/cdc_init_ordering_test.go | 3 +- 6 files changed, 71 insertions(+), 72 deletions(-) diff --git a/db/queries/queries.go b/db/queries/queries.go index aed7cab..1e6b4e9 100644 --- a/db/queries/queries.go +++ b/db/queries/queries.go @@ -2999,11 +2999,9 @@ func RepsetRemoveTable(ctx context.Context, db DBQuerier, setName, qualifiedTabl return nil } -// CAUTION: spock.repair_mode is SESSION-scoped, not transaction-scoped. -// SQL ROLLBACK does NOT reset it. Every Set(true) must be paired with a -// Set(false) on every code path, including error paths — otherwise the -// pooled connection is returned with repair_mode still on and subsequent -// borrowers silently fail to replicate their writes. +// CAUTION: session-scoped, not tx-scoped. Rollback does NOT reset it. 
+// Pair Set(true) with Set(false) on every path or the pooled conn +// returns poisoned and the next borrower silently won't replicate. func SetSpockRepairMode(ctx context.Context, db DBQuerier, on bool) error { sql, err := RenderSQL(SQLTemplates.SetSpockRepairMode, nil) if err != nil { diff --git a/db/queries/templates.go b/db/queries/templates.go index 64f4379..13d5a3f 100644 --- a/db/queries/templates.go +++ b/db/queries/templates.go @@ -1667,10 +1667,8 @@ var SQLTemplates = Templates{ SELECT spock.repset_remove_table($1, $2) `)), - // CAUTION: SESSION-scoped, not transaction-scoped. SQL ROLLBACK does - // NOT reset this. Callers must pair $1=true with a matching $1=false - // on every code path or the pooled connection returns with repair - // mode still on, silently suppressing replication for the next user. + // CAUTION: session-scoped, not tx-scoped. Pair $1=true with + // $1=false on every path or the pool returns poisoned. SetSpockRepairMode: template.Must(template.New("setSpockRepairMode").Parse(` SELECT spock.repair_mode($1) `)), diff --git a/internal/consistency/mtree/merkle.go b/internal/consistency/mtree/merkle.go index e232717..1e870a7 100644 --- a/internal/consistency/mtree/merkle.go +++ b/internal/consistency/mtree/merkle.go @@ -814,22 +814,17 @@ func (m *MerkleTreeTask) MtreeInit() (err error) { return nil } -// initOneNode runs MtreeInit on a single node in three phases: +// Per-node init in three phases so the slot's consistent point lands +// after the publication's commit (fixing the 42704 replay): // -// 1. Phase A (tx 1): create schema, helpers, the CDC metadata table, -// exclude it from Spock replication sets, create the publication, -// and capture the publication's commit LSN. Commit. -// 2. Phase B (no tx): create the logical replication slot via a fresh -// replication-mode connection. Its consistent point is now -// guaranteed to be > the publication's commit LSN, fixing the -// "publication does not exist" replay error. -// 3. 
Phase C (tx 2): persist slot/start_lsn/pub_commit_lsn into -// ace_cdc_metadata, wrapped in spock.repair_mode(true) so the -// write stays node-local while peer nodes may still have the -// metadata table in their replication set. +// A (tx 1): schema, helpers, metadata table, repset exclusion, CREATE +// PUBLICATION, capture pub_commit_lsn. Commit. +// B: CREATE replication slot — consistent_point > commit_lsn. +// C (tx 2): write metadata under spock.repair_mode so start_lsn stays +// node-local even while peer repsets still hold the table. // -// Each iteration runs inside this function so deferred Close/Rollback -// calls fire per-node rather than at MtreeInit return. +// Wrapped in a closure so per-node defers (Close/Rollback) fire here, +// not at MtreeInit return. func (m *MerkleTreeTask) initOneNode(nodeInfo map[string]any, publicationName, slotName string) error { logger.Info("Initialising Merkle tree objects on node: %s", nodeInfo["Name"]) @@ -882,23 +877,37 @@ func (m *MerkleTreeTask) initOneNode(nodeInfo map[string]any, publicationName, s } if err := func() (err error) { - tx, err := pool.Begin(m.Ctx) + // repair_mode is a session GUC; borrow a conn so defer + // Set(false) runs before Release. + c, err := pool.Acquire(m.Ctx) if err != nil { - return err + return fmt.Errorf("acquire connection for cdc metadata write: %w", err) } - defer tx.Rollback(m.Ctx) + defer c.Release() - if err := cdc.SetSpockRepairMode(m.Ctx, tx, true); err != nil { + if err := cdc.SetSpockRepairMode(m.Ctx, c, true); err != nil { return fmt.Errorf("enable spock repair_mode: %w", err) } + // Reset must survive m.Ctx cancellation, else conn returns + // to the pool with the GUC on. 
+ resetCtx := context.WithoutCancel(m.Ctx) + defer func() { + if err := cdc.SetSpockRepairMode(resetCtx, c, false); err != nil { + logger.Warn("failed to reset spock repair_mode before connection release: %v", err) + } + }() + + tx, err := c.Begin(m.Ctx) + if err != nil { + return err + } + defer tx.Rollback(m.Ctx) + if err := queries.InitCDCMetadata(m.Ctx, tx, publicationName, slotName, slotLSN.String(), pubCommitLSN, []string{}); err != nil { return fmt.Errorf("write cdc metadata: %w", err) } - if err := cdc.SetSpockRepairMode(m.Ctx, tx, false); err != nil { - return fmt.Errorf("disable spock repair_mode: %w", err) - } return tx.Commit(m.Ctx) }(); err != nil { // Slot leak mitigation: Phase B created a slot that we failed to diff --git a/internal/infra/cdc/listen.go b/internal/infra/cdc/listen.go index c79e4a7..2001c8e 100644 --- a/internal/infra/cdc/listen.go +++ b/internal/infra/cdc/listen.go @@ -100,16 +100,11 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont return fmt.Errorf("failed to parse lsn: %v", err) } - // Publication-commit guard. If metadata records the LSN at which the - // publication was committed during MtreeInit, refuse to start a stream - // from any LSN strictly older than that — pgoutput's historical - // catalog snapshot would not yet contain the publication and the - // stream would abort with "publication does not exist" (SQLSTATE - // 42704). - // - // Empty pubCommitLSNStr means the metadata row was written by a - // version of ACE that pre-dates this fix; we cannot check the - // invariant and fall through with a warning. + // Tripwire: replay from start_lsn < pub_commit_lsn aborts with + // SQLSTATE 42704 (publication not yet in pgoutput's historical + // catalog snapshot). Catches partial state on upgrade and narrow + // cross-node leaks only; general defense is repset exclusion + + // repair_mode. Empty pub_commit_lsn = legacy row, warn and skip. 
if pubCommitLSNStr == "" { logger.Warn("ace_cdc_metadata has no pub_commit_lsn for publication %s; cannot verify slot/publication ordering. If you see SQLSTATE 42704, run 'ace mtree teardown' and 'ace mtree init' to recover", publication) } else { @@ -451,12 +446,8 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont } if !continuous { - // repair_mode is a SESSION GUC; SQL ROLLBACK does not reset it - // and a deferred reset against the tx fails because the tx is - // already closed at commit time. Acquire a dedicated conn so - // Set(false) runs against the still-live session before the - // conn returns to the pool — preventing pool poisoning if any - // step between Set(true) and Commit returns early. + // repair_mode is a session GUC; borrow a conn so defer + // Set(false) runs before Release. c, err := pool.Acquire(processingCtx) if err != nil { if conn != nil { @@ -470,8 +461,7 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont tables = []string{} } - // start_lsn is the local slot's progress; cross-node propagation - // triggers SQLSTATE 42704 on peers. + // start_lsn is node-local; cross-node propagation triggers 42704. if err := SetSpockRepairMode(processingCtx, c, true); err != nil { if conn != nil { conn.Close(ctx) @@ -544,25 +534,36 @@ func flushMetadata(ctx context.Context, pool *pgxpool.Pool, publication, slotNam tables = []string{} } - tx, err := pool.Begin(ctx) + // repair_mode is a session GUC; borrow a conn so defer Set(false) + // runs before Release. Periodic+shutdown path — leaks here poison + // the pool for the rest of the CDC stream. 
+ c, err := pool.Acquire(ctx) if err != nil { - return fmt.Errorf("failed to begin metadata update transaction: %w", err) + return fmt.Errorf("failed to acquire connection for metadata flush: %w", err) } - defer tx.Rollback(ctx) + defer c.Release() - // start_lsn is the local slot's progress; cross-node propagation - // triggers SQLSTATE 42704 on peers. See SetSpockRepairMode's - // CAUTION on session-scoped state — Set(false) below MUST run. - if err := SetSpockRepairMode(ctx, tx, true); err != nil { + // start_lsn is node-local; cross-node propagation triggers 42704. + if err := SetSpockRepairMode(ctx, c, true); err != nil { return fmt.Errorf("failed to enable spock repair_mode: %w", err) } + // Reset must run even after caller ctx cancellation (shutdown + // path), or the conn returns to the pool with the GUC on. + resetCtx := context.WithoutCancel(ctx) + defer func() { + if err := SetSpockRepairMode(resetCtx, c, false); err != nil { + logger.Warn("failed to reset spock repair_mode before connection release: %v", err) + } + }() - if err := queries.UpdateCDCMetadata(ctx, tx, publication, slotName, lsn.String(), tables); err != nil { - return fmt.Errorf("failed to update cdc metadata: %w", err) + tx, err := c.Begin(ctx) + if err != nil { + return fmt.Errorf("failed to begin metadata update transaction: %w", err) } + defer tx.Rollback(ctx) - if err := SetSpockRepairMode(ctx, tx, false); err != nil { - return fmt.Errorf("failed to disable spock repair_mode: %w", err) + if err := queries.UpdateCDCMetadata(ctx, tx, publication, slotName, lsn.String(), tables); err != nil { + return fmt.Errorf("failed to update cdc metadata: %w", err) } if err := tx.Commit(ctx); err != nil { diff --git a/internal/infra/cdc/setup.go b/internal/infra/cdc/setup.go index 7ebb304..907e5e2 100644 --- a/internal/infra/cdc/setup.go +++ b/internal/infra/cdc/setup.go @@ -75,18 +75,10 @@ func ExcludeMetadataFromSpockRepsets(ctx context.Context, db queries.DBQuerier, return nil } -// While repair 
mode is active, local writes are NOT replicated to peer -// nodes. Required around writes to ace_cdc_metadata: start_lsn is the -// node's own slot position and must stay node-local even if the table -// briefly lives in a peer's repset during the per-node init loop. -// -// CAUTION: repair_mode is SESSION-scoped, not transaction-scoped. SQL -// ROLLBACK does NOT reset it. Every Set(true) must reach a matching -// Set(false) on every code path; an error between the two leaves the -// pooled connection poisoned and the next borrower's writes will not -// replicate. -// -// No-op when Spock is not installed. +// Required around ace_cdc_metadata writes: start_lsn is node-local +// and the table may briefly live in a peer's repset during per-node +// init. CAUTION: session-scoped, not tx-scoped — rollback does not +// reset it. No-op when Spock is not installed. func SetSpockRepairMode(ctx context.Context, db queries.DBQuerier, on bool) error { spockInstalled, err := queries.CheckSpockInstalled(ctx, db) if err != nil { diff --git a/tests/integration/cdc_init_ordering_test.go b/tests/integration/cdc_init_ordering_test.go index 8ce1263..fa7bf62 100644 --- a/tests/integration/cdc_init_ordering_test.go +++ b/tests/integration/cdc_init_ordering_test.go @@ -183,7 +183,8 @@ func assertSentinelDoesNotPropagate(t *testing.T, ctx context.Context) { ), sentinel, config.Cfg.MTree.CDC.PublicationName) require.NoError(t, err, "write sentinel start_lsn on n1") - deadline := time.Now().Add(3 * time.Second) + // Long enough for Spock apply lag on slow CI. 
+ deadline := time.Now().Add(10 * time.Second) for time.Now().Before(deadline) { var n2StartLSN string err := pgCluster.Node2Pool.QueryRow(ctx, fmt.Sprintf( From 04b11aff204a8dcdcbd979d77221445a5521d00b Mon Sep 17 00:00:00 2001 From: Andrei Lepikhov Date: Wed, 20 May 2026 08:35:18 +0200 Subject: [PATCH 6/6] Fix: address Codacy complaints --- tests/integration/cdc_init_ordering_test.go | 62 ++++++++++++++------- 1 file changed, 41 insertions(+), 21 deletions(-) diff --git a/tests/integration/cdc_init_ordering_test.go b/tests/integration/cdc_init_ordering_test.go index fa7bf62..093efae 100644 --- a/tests/integration/cdc_init_ordering_test.go +++ b/tests/integration/cdc_init_ordering_test.go @@ -30,12 +30,22 @@ import ( "testing" "time" + "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/pgedge/ace/internal/infra/cdc" "github.com/pgedge/ace/pkg/config" "github.com/stretchr/testify/require" ) +// cdcMetadataTable returns the safely-quoted "schema"."ace_cdc_metadata" +// identifier. Built once with pgx.Identifier.Sanitize() so call sites can +// concatenate it into SQL without fmt.Sprintf — matches the production +// convention in queries package and keeps Codacy/Semgrep's +// concat-sqli pattern quiet (schema is config-loaded, never user input). +func cdcMetadataTable() string { + return pgx.Identifier{config.Cfg.MTree.Schema, "ace_cdc_metadata"}.Sanitize() +} + // TestMtreeInitSlotIsAfterPublication verifies bug 1's fix: the replication // slot's consistent point sits at or beyond the publication's commit LSN, so // pgoutput can always find the publication in the historical catalog snapshot. @@ -65,10 +75,12 @@ func TestMtreeInitSlotIsAfterPublication(t *testing.T) { } { // pub_commit_lsn must be populated by the new InitCDCMetadata path. 
var pubCommitLSN string - err := np.pool.QueryRow(ctx, fmt.Sprintf( - "SELECT COALESCE(pub_commit_lsn, '') FROM %s.ace_cdc_metadata WHERE publication_name = $1", - config.Cfg.MTree.Schema, - ), config.Cfg.MTree.CDC.PublicationName).Scan(&pubCommitLSN) + // FP: identifier sanitised by pgx.Identifier; user values via $N. + // nosemgrep + err := np.pool.QueryRow(ctx, + "SELECT COALESCE(pub_commit_lsn, '') FROM "+cdcMetadataTable()+" WHERE publication_name = $1", + config.Cfg.MTree.CDC.PublicationName, + ).Scan(&pubCommitLSN) require.NoError(t, err, "read pub_commit_lsn on %s", np.name) require.NotEmpty(t, pubCommitLSN, "pub_commit_lsn must be set on %s (proves new InitCDCMetadata ran)", np.name) @@ -123,7 +135,7 @@ func TestMtreeInitExcludesMetadataFromSpock(t *testing.T) { dropCDCTestTable(t, tableName) }) - qualifiedMetadata := fmt.Sprintf("%s.ace_cdc_metadata", config.Cfg.MTree.Schema) + qualifiedMetadata := cdcMetadataTable() // Force the state that Spock would create under DDL replication: add // the metadata table to the default repset on each node. @@ -177,20 +189,24 @@ func assertMetadataNotInAnyRepset(t *testing.T, ctx context.Context) { func assertSentinelDoesNotPropagate(t *testing.T, ctx context.Context) { t.Helper() sentinel := "FFFF/FFFFFFF0" - _, err := pgCluster.Node1Pool.Exec(ctx, fmt.Sprintf( - "UPDATE %s.ace_cdc_metadata SET start_lsn = $1 WHERE publication_name = $2", - config.Cfg.MTree.Schema, - ), sentinel, config.Cfg.MTree.CDC.PublicationName) + // FP: identifier sanitised by pgx.Identifier; user values via $N. + // nosemgrep + _, err := pgCluster.Node1Pool.Exec(ctx, + "UPDATE "+cdcMetadataTable()+" SET start_lsn = $1 WHERE publication_name = $2", + sentinel, config.Cfg.MTree.CDC.PublicationName, + ) require.NoError(t, err, "write sentinel start_lsn on n1") // Long enough for Spock apply lag on slow CI. 
deadline := time.Now().Add(10 * time.Second) for time.Now().Before(deadline) { var n2StartLSN string - err := pgCluster.Node2Pool.QueryRow(ctx, fmt.Sprintf( - "SELECT start_lsn FROM %s.ace_cdc_metadata WHERE publication_name = $1", - config.Cfg.MTree.Schema, - ), config.Cfg.MTree.CDC.PublicationName).Scan(&n2StartLSN) + // FP: identifier sanitised by pgx.Identifier; user values via $N. + // nosemgrep + err := pgCluster.Node2Pool.QueryRow(ctx, + "SELECT start_lsn FROM "+cdcMetadataTable()+" WHERE publication_name = $1", + config.Cfg.MTree.CDC.PublicationName, + ).Scan(&n2StartLSN) require.NoError(t, err) require.NotEqual(t, sentinel, n2StartLSN, "sentinel start_lsn leaked from n1 to n2 via spock replication") @@ -238,10 +254,12 @@ func TestProcessReplicationStreamRejectsStaleStartLSN(t *testing.T) { // Read pub_commit_lsn and craft a stale start_lsn strictly below it. var pubCommitLSN string - err := pgCluster.Node1Pool.QueryRow(ctx, fmt.Sprintf( - "SELECT pub_commit_lsn FROM %s.ace_cdc_metadata WHERE publication_name = $1", - config.Cfg.MTree.Schema, - ), config.Cfg.MTree.CDC.PublicationName).Scan(&pubCommitLSN) + // FP: identifier sanitised by pgx.Identifier; user values via $N. + // nosemgrep + err := pgCluster.Node1Pool.QueryRow(ctx, + "SELECT pub_commit_lsn FROM "+cdcMetadataTable()+" WHERE publication_name = $1", + config.Cfg.MTree.CDC.PublicationName, + ).Scan(&pubCommitLSN) require.NoError(t, err) require.NotEmpty(t, pubCommitLSN, "fix relies on pub_commit_lsn being populated") @@ -251,10 +269,12 @@ func TestProcessReplicationStreamRejectsStaleStartLSN(t *testing.T) { ).Scan(&staleLSN) require.NoError(t, err) - _, err = pgCluster.Node1Pool.Exec(ctx, fmt.Sprintf( - "UPDATE %s.ace_cdc_metadata SET start_lsn = $1 WHERE publication_name = $2", - config.Cfg.MTree.Schema, - ), staleLSN, config.Cfg.MTree.CDC.PublicationName) + // FP: identifier sanitised by pgx.Identifier; user values via $N. 
+ // nosemgrep + _, err = pgCluster.Node1Pool.Exec(ctx, + "UPDATE "+cdcMetadataTable()+" SET start_lsn = $1 WHERE publication_name = $2", + staleLSN, config.Cfg.MTree.CDC.PublicationName, + ) require.NoError(t, err, "inject stale start_lsn") nodeInfo := pgCluster.ClusterNodes[0]