diff --git a/db/queries/queries.go b/db/queries/queries.go index 8dfe37c..1e6b4e9 100644 --- a/db/queries/queries.go +++ b/db/queries/queries.go @@ -2913,18 +2913,104 @@ func DropCDCMetadataTable(ctx context.Context, db DBQuerier) error { return nil } -func GetCDCMetadata(ctx context.Context, db DBQuerier, publicationName string) (string, string, []string, error) { +// pubCommitLSN is empty for legacy metadata rows that pre-date the +// pub_commit_lsn column; callers must treat empty as "invariant +// uncheckable" and skip the publication-commit guard with a warning. +func GetCDCMetadata(ctx context.Context, db DBQuerier, publicationName string) (slotName, startLSN string, tables []string, pubCommitLSN string, err error) { sql, err := RenderSQL(SQLTemplates.GetCDCMetadata, nil) if err != nil { - return "", "", nil, err + return "", "", nil, "", err } - var slotName, startLSN string - var tables []string - err = db.QueryRow(ctx, sql, publicationName).Scan(&slotName, &startLSN, &tables) + err = db.QueryRow(ctx, sql, publicationName).Scan(&slotName, &startLSN, &tables, &pubCommitLSN) + if err != nil { + return "", "", nil, "", err + } + return slotName, startLSN, tables, pubCommitLSN, nil +} + +// Init path only. Ongoing flushes use UpdateCDCMetadata, which deliberately +// leaves pub_commit_lsn untouched so the listen.go guard always compares +// against the LSN captured at the matching init. +func InitCDCMetadata(ctx context.Context, db DBQuerier, publicationName, slotName, startLSN, pubCommitLSN string, tables []string) error { + sql, err := RenderSQL(SQLTemplates.InitCDCMetadata, nil) + if err != nil { + return fmt.Errorf("failed to render InitCDCMetadata SQL: %w", err) + } + if tables == nil { + tables = []string{} + } + _, err = db.Exec(ctx, sql, publicationName, slotName, startLSN, pubCommitLSN, tables) + if err != nil { + return fmt.Errorf("query to init cdc metadata failed: %w", err) + } + return nil +} + +// Called mid-Phase-A, after CREATE PUBLICATION, so the captured value is +// strictly less than Phase A's commit LSN. The slot created in Phase B +// has consistent_point >= that commit LSN, hence consistent_point > +// captured value: a safe lower bound for any valid replication start LSN. +func CurrentWalInsertLSN(ctx context.Context, db DBQuerier) (string, error) { + sql, err := RenderSQL(SQLTemplates.CurrentWalInsertLSN, nil) + if err != nil { + return "", fmt.Errorf("failed to render CurrentWalInsertLSN SQL: %w", err) + } + var lsn string + if err := db.QueryRow(ctx, sql).Scan(&lsn); err != nil { + return "", fmt.Errorf("failed to fetch current WAL insert LSN: %w", err) + } + return lsn, nil +} + +// Caller must have verified Spock is installed; this function does not +// gate on extension presence and will error otherwise. +func EnumerateMetadataRepsets(ctx context.Context, db DBQuerier, aceSchema string) ([]string, error) { + sql, err := RenderSQL(SQLTemplates.EnumerateMetadataRepsets, nil) if err != nil { - return "", "", nil, err + return nil, fmt.Errorf("failed to render EnumerateMetadataRepsets SQL: %w", err) } - return slotName, startLSN, tables, nil + rows, err := db.Query(ctx, sql, aceSchema) + if err != nil { + return nil, fmt.Errorf("query to enumerate metadata repsets failed: %w", err) + } + defer rows.Close() + var sets []string + for rows.Next() { + var s string + if err := rows.Scan(&s); err != nil { + return nil, fmt.Errorf("scan repset name: %w", err) + } + sets = append(sets, s) + } + return sets, rows.Err() +} + +// spock.repset_remove_table is a function call, not DDL — it does NOT +// propagate cross-node. Callers that need every node to drop a table +// from a repset must invoke this per-node. +func RepsetRemoveTable(ctx context.Context, db DBQuerier, setName, qualifiedTable string) error { + sql, err := RenderSQL(SQLTemplates.RepsetRemoveTable, nil) + if err != nil { + return fmt.Errorf("failed to render RepsetRemoveTable SQL: %w", err) + } + if _, err := db.Exec(ctx, sql, setName, qualifiedTable); err != nil { + return fmt.Errorf("repset_remove_table(%s, %s) failed: %w", setName, qualifiedTable, err) + } + return nil +} + +// CAUTION: session-scoped, not tx-scoped. Rollback does NOT reset it. +// Pair Set(true) with Set(false) on every path or the pooled conn +// returns poisoned and the next borrower silently won't replicate. +func SetSpockRepairMode(ctx context.Context, db DBQuerier, on bool) error { + sql, err := RenderSQL(SQLTemplates.SetSpockRepairMode, nil) + if err != nil { + return fmt.Errorf("failed to render SetSpockRepairMode SQL: %w", err) + } + if _, err := db.Exec(ctx, sql, on); err != nil { + return fmt.Errorf("spock.repair_mode(%v) failed: %w", on, err) + } + return nil } func UpdateMtreeCounters(ctx context.Context, db DBQuerier, mtreeTable string, isComposite bool, compositeTypeName string, pkeyType string, inserts, deletes, updates []string) error { diff --git a/db/queries/templates.go b/db/queries/templates.go index 6a0ffb8..13d5a3f 100644 --- a/db/queries/templates.go +++ b/db/queries/templates.go @@ -145,6 +145,12 @@ type Templates struct { ResetReplicationOriginSession *template.Template SetupReplicationOriginXact *template.Template ResetReplicationOriginXact *template.Template + + InitCDCMetadata *template.Template + EnumerateMetadataRepsets *template.Template + RepsetRemoveTable *template.Template + SetSpockRepairMode *template.Template + CurrentWalInsertLSN *template.Template } var SQLTemplates = Templates{ @@ -221,6 +227,39 @@ var SQLTemplates = Templates{ last_updated = EXCLUDED.last_updated `)), + // On conflict every column including pub_commit_lsn is refreshed: + // reaching this query path means a re-init, which created a fresh + // publication with a new commit LSN, and listen.go's guard must + // compare against that current LSN — not a stale prior one. + InitCDCMetadata: template.Must(template.New("initCdcMetadata").Funcs(aceTemplateFuncs).Parse(` + INSERT INTO + {{aceSchema}}.ace_cdc_metadata ( + publication_name, + slot_name, + start_lsn, + pub_commit_lsn, + tables, + last_updated + ) + VALUES + ( + $1, + $2, + $3, + $4, + $5, + current_timestamp + ) + ON CONFLICT (publication_name) DO + UPDATE + SET + slot_name = EXCLUDED.slot_name, + start_lsn = EXCLUDED.start_lsn, + pub_commit_lsn = EXCLUDED.pub_commit_lsn, + tables = EXCLUDED.tables, + last_updated = EXCLUDED.last_updated + `)), + DropPublication: template.Must(template.New("dropPublication").Parse(` DROP PUBLICATION IF EXISTS {{.PublicationName}} `)), @@ -244,7 +283,8 @@ var SQLTemplates = Templates{ SELECT slot_name, start_lsn, - tables + tables, + COALESCE(pub_commit_lsn, '') AS pub_commit_lsn FROM {{aceSchema}}.ace_cdc_metadata WHERE @@ -338,9 +378,13 @@ var SQLTemplates = Templates{ publication_name text PRIMARY KEY, slot_name text, start_lsn text, + pub_commit_lsn text, tables text[], last_updated timestamptz - )`), + ); + -- Forward-compatible addition for clusters that ran older versions. + ALTER TABLE {{aceSchema}}.ace_cdc_metadata + ADD COLUMN IF NOT EXISTS pub_commit_lsn text;`), ), GetPrimaryKey: template.Must(template.New("getPrimaryKey").Parse(` SELECT @@ -1599,4 +1643,42 @@ var SQLTemplates = Templates{ ResetReplicationOriginXact: template.Must(template.New("resetReplicationOriginXact").Parse(` SELECT pg_replication_origin_xact_reset() `)), + + // EnumerateMetadataRepsets lists every Spock replication set that + // contains pgedge_ace.ace_cdc_metadata on the current node. Used by + // cdc.ExcludeMetadataFromSpockRepsets to undo Spock's DDL-triggered + // auto-add of the metadata table. + // + // spock.tables surfaces every known relation; rows where the table is + // not in any replication set carry a NULL set_name and must be + // filtered out — otherwise the scan fails with "cannot scan NULL into + // *string" the moment the metadata table exists but has not yet been + // added to a repset (i.e., every fresh init on a cluster with DDL + // replication disabled). + EnumerateMetadataRepsets: template.Must(template.New("enumerateMetadataRepsets").Funcs(aceTemplateFuncs).Parse(` + SELECT set_name + FROM spock.tables + WHERE nspname = $1 AND relname = 'ace_cdc_metadata' AND set_name IS NOT NULL + `)), + + // RepsetRemoveTable wraps spock.repset_remove_table. Returns void; we + // don't care about the return value, only that the call doesn't error. + RepsetRemoveTable: template.Must(template.New("repsetRemoveTable").Parse(` + SELECT spock.repset_remove_table($1, $2) + `)), + + // CAUTION: session-scoped, not tx-scoped. Pair $1=true with + // $1=false on every path or the pool returns poisoned. + SetSpockRepairMode: template.Must(template.New("setSpockRepairMode").Parse(` + SELECT spock.repair_mode($1) + `)), + + // CurrentWalInsertLSN returns pg_current_wal_insert_lsn() as text. Used + // at MtreeInit time, just after CREATE PUBLICATION, as the lower bound + // for any future replication start LSN. The slot's consistent point is + // guaranteed to be >= this value because the slot is created after this + // transaction commits. + CurrentWalInsertLSN: template.Must(template.New("currentWalInsertLSN").Parse(` + SELECT pg_current_wal_insert_lsn()::text + `)), } diff --git a/internal/consistency/mtree/merkle.go b/internal/consistency/mtree/merkle.go index 4b6bd03..1e870a7 100644 --- a/internal/consistency/mtree/merkle.go +++ b/internal/consistency/mtree/merkle.go @@ -807,51 +807,120 @@ func (m *MerkleTreeTask) MtreeInit() (err error) { cfg := config.Get().MTree.CDC for _, nodeInfo := range m.ClusterNodes { - logger.Info("Initialising Merkle tree objects on node: %s", nodeInfo["Name"]) - - lsn, err := cdc.SetupReplicationSlot(m.Ctx, nodeInfo) - if err != nil { - return fmt.Errorf("failed to set up replication slot on node %s: %w", nodeInfo["Name"], err) + if err := m.initOneNode(nodeInfo, cfg.PublicationName, cfg.SlotName); err != nil { + return err } + } + return nil +} - pool, err := auth.GetClusterNodeConnection(m.Ctx, nodeInfo, m.connOpts()) - if err != nil { - return fmt.Errorf("failed to get connection pool for node %s: %w", nodeInfo["Name"], err) - } - defer pool.Close() +// Per-node init in three phases so the slot's consistent point lands +// after the publication's commit (fixing the 42704 replay): +// +// A (tx 1): schema, helpers, metadata table, repset exclusion, CREATE +// PUBLICATION, capture pub_commit_lsn. Commit. +// B: CREATE replication slot — consistent_point > commit_lsn. +// C (tx 2): write metadata under spock.repair_mode so start_lsn stays +// node-local even while peer repsets still hold the table. +// +// Wrapped in a closure so per-node defers (Close/Rollback) fire here, +// not at MtreeInit return. +func (m *MerkleTreeTask) initOneNode(nodeInfo map[string]any, publicationName, slotName string) error { + logger.Info("Initialising Merkle tree objects on node: %s", nodeInfo["Name"]) - if err = queries.CreateSchema(m.Ctx, pool, m.aceSchema()); err != nil { - return fmt.Errorf("failed to create schema '%s': %w", m.aceSchema(), err) - } + pool, err := auth.GetClusterNodeConnection(m.Ctx, nodeInfo, m.connOpts()) + if err != nil { + return fmt.Errorf("failed to get connection pool for node %s: %w", nodeInfo["Name"], err) + } + defer pool.Close() + if err := queries.CreateSchema(m.Ctx, pool, m.aceSchema()); err != nil { + return fmt.Errorf("failed to create schema '%s' on node %s: %w", m.aceSchema(), nodeInfo["Name"], err) + } + + var pubCommitLSN string + if err := func() (err error) { tx, err := pool.Begin(m.Ctx) if err != nil { - return fmt.Errorf("failed to begin transaction on node %s: %w", nodeInfo["Name"], err) + return err } defer tx.Rollback(m.Ctx) - if err = queries.CreateXORFunction(m.Ctx, tx); err != nil { - return fmt.Errorf("failed to create xor function: %w", err) + if err := queries.CreateXORFunction(m.Ctx, tx); err != nil { + return fmt.Errorf("create xor function: %w", err) } - - if err = queries.CreateCDCMetadataTable(m.Ctx, tx); err != nil { - return fmt.Errorf("failed to create cdc metadata table: %w", err) + if err := queries.CreateCDCMetadataTable(m.Ctx, tx); err != nil { + return fmt.Errorf("create cdc metadata table: %w", err) } + if err := cdc.ExcludeMetadataFromSpockRepsets(m.Ctx, tx, m.aceSchema()); err != nil { + return fmt.Errorf("exclude ace_cdc_metadata from spock repsets: %w", err) + } + if err := cdc.SetupPublication(m.Ctx, tx, publicationName); err != nil { + return fmt.Errorf("setup publication: %w", err) + } + // Captured mid-tx after CREATE PUBLICATION, so the value is + // strictly less than this tx's commit LSN; Phase B's slot + // consistent_point is >= that commit LSN. listen.go's guard + // therefore catches any start LSN that predates the publication. + pubCommitLSN, err = queries.CurrentWalInsertLSN(m.Ctx, tx) + if err != nil { + return fmt.Errorf("capture publication commit LSN: %w", err) + } + return tx.Commit(m.Ctx) + }(); err != nil { + return fmt.Errorf("phase A failed on node %s: %w", nodeInfo["Name"], err) + } - if err := cdc.SetupPublication(m.Ctx, tx, cfg.PublicationName); err != nil { - return fmt.Errorf("failed to setup publication on node %s: %w", nodeInfo["Name"], err) + slotLSN, err := cdc.SetupReplicationSlot(m.Ctx, nodeInfo) + if err != nil { + return fmt.Errorf("phase B (replication slot) failed on node %s: %w", nodeInfo["Name"], err) + } + + if err := func() (err error) { + // repair_mode is a session GUC; borrow a conn so defer + // Set(false) runs before Release. + c, err := pool.Acquire(m.Ctx) + if err != nil { + return fmt.Errorf("acquire connection for cdc metadata write: %w", err) } + defer c.Release() - if err = queries.UpdateCDCMetadata(m.Ctx, tx, cfg.PublicationName, cfg.SlotName, lsn.String(), []string{}); err != nil { - return fmt.Errorf("failed to update cdc metadata: %w", err) + if err := cdc.SetSpockRepairMode(m.Ctx, c, true); err != nil { + return fmt.Errorf("enable spock repair_mode: %w", err) } + // Reset must survive m.Ctx cancellation, else conn returns + // to the pool with the GUC on. + resetCtx := context.WithoutCancel(m.Ctx) + defer func() { + if err := cdc.SetSpockRepairMode(resetCtx, c, false); err != nil { + logger.Warn("failed to reset spock repair_mode before connection release: %v", err) + } + }() - if err := tx.Commit(m.Ctx); err != nil { - return fmt.Errorf("failed to commit transaction on node %s: %w", nodeInfo["Name"], err) + tx, err := c.Begin(m.Ctx) + if err != nil { + return err } + defer tx.Rollback(m.Ctx) - logger.Info("Merkle tree objects initialised on node: %s", nodeInfo["Name"]) + if err := queries.InitCDCMetadata(m.Ctx, tx, + publicationName, slotName, + slotLSN.String(), pubCommitLSN, []string{}); err != nil { + return fmt.Errorf("write cdc metadata: %w", err) + } + return tx.Commit(m.Ctx) + }(); err != nil { + // Slot leak mitigation: Phase B created a slot that we failed to + // record. A subsequent SetupReplicationSlot will drop any prior + // slot of the same name, so a re-run reaps it — but try a best + // effort drop here too to keep the cluster tidy. + if dropErr := queries.DropReplicationSlot(m.Ctx, pool, slotName); dropErr != nil { + logger.Warn("failed to drop orphaned slot %s on node %s: %v", slotName, nodeInfo["Name"], dropErr) + } + return fmt.Errorf("phase C failed on node %s: %w", nodeInfo["Name"], err) } + + logger.Info("Merkle tree objects initialised on node: %s", nodeInfo["Name"]) return nil } @@ -1427,7 +1496,7 @@ func (m *MerkleTreeTask) BuildMtree() (err error) { } defer tx.Rollback(m.Ctx) - slotName, startLSN, tables, err := queries.GetCDCMetadata(m.Ctx, tx, publicationName) + slotName, startLSN, tables, _, err := queries.GetCDCMetadata(m.Ctx, tx, publicationName) if err != nil { pool.Close() return fmt.Errorf("failed to get cdc metadata on node %s: %w", nodeInfo["Name"], err) diff --git a/internal/infra/cdc/listen.go b/internal/infra/cdc/listen.go index 0038dae..2001c8e 100644 --- a/internal/infra/cdc/listen.go +++ b/internal/infra/cdc/listen.go @@ -65,6 +65,7 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont publication := cfg.PublicationName slotName := cfg.SlotName var startLSNStr string + var pubCommitLSNStr string var tables []string func() { tx, err := pool.Begin(ctx) @@ -74,13 +75,14 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont } defer tx.Rollback(ctx) - metaSlot, metaLSN, metaTables, err := queries.GetCDCMetadata(ctx, tx, publication) + metaSlot, metaLSN, metaTables, metaPubCommit, err := queries.GetCDCMetadata(ctx, tx, publication) if err != nil { logger.Error("failed to get cdc metadata: %v", err) startLSNStr = "" return } startLSNStr = metaLSN + pubCommitLSNStr = metaPubCommit tables = metaTables if metaSlot != "" { slotName = metaSlot @@ -98,10 +100,28 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont return fmt.Errorf("failed to parse lsn: %v", err) } - slotConfirmedLSN, err := getSlotConfirmedFlushLSN(ctx, pool, slotName) - if err != nil { - logger.Error("failed to get confirmed_flush_lsn for slot %s: %v", slotName, err) - return fmt.Errorf("failed to get confirmed_flush_lsn for slot %s: %w", slotName, err) + // Tripwire: replay from start_lsn < pub_commit_lsn aborts with + // SQLSTATE 42704 (publication not yet in pgoutput's historical + // catalog snapshot). Catches partial state on upgrade and narrow + // cross-node leaks only; general defense is repset exclusion + + // repair_mode. Empty pub_commit_lsn = legacy row, warn and skip. + if pubCommitLSNStr == "" { + logger.Warn("ace_cdc_metadata has no pub_commit_lsn for publication %s; cannot verify slot/publication ordering. If you see SQLSTATE 42704, run 'ace mtree teardown' and 'ace mtree init' to recover", publication) + } else { + pubCommitLSN, err := pglogrepl.ParseLSN(pubCommitLSNStr) + if err != nil { + logger.Error("failed to parse pub_commit_lsn %s: %v", pubCommitLSNStr, err) + return fmt.Errorf("failed to parse pub_commit_lsn %s: %w", pubCommitLSNStr, err) + } + if startLSN < pubCommitLSN { + return fmt.Errorf( + "ace_cdc_metadata.start_lsn (%s) is older than publication commit LSN (%s) "+ + "for slot %s; this typically indicates ace_cdc_metadata was replicated "+ + "cross-node by Spock, or the slot/metadata is from a prior init. "+ + "Run 'ace mtree teardown' and 'ace mtree init' to recover", + startLSN, pubCommitLSN, slotName, + ) + } } targetFlushLSN := pglogrepl.LSN(0) @@ -119,9 +139,6 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont } lastLSN := startLSN - if slotConfirmedLSN != 0 && slotConfirmedLSN < lastLSN { - lastLSN = slotConfirmedLSN - } var lastFlushedLSN pglogrepl.LSN = lastLSN var lastLSNVal atomic.Uint64 lastLSNVal.Store(uint64(lastLSN)) @@ -429,19 +446,43 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont } if !continuous { - tx, err := pool.Begin(processingCtx) + // repair_mode is a session GUC; borrow a conn so defer + // Set(false) runs before Release. + c, err := pool.Acquire(processingCtx) if err != nil { if conn != nil { conn.Close(ctx) } - return fmt.Errorf("failed to begin metadata update transaction: %w", err) + return fmt.Errorf("failed to acquire connection for metadata update: %w", err) } - defer tx.Rollback(processingCtx) + defer c.Release() if tables == nil { tables = []string{} } + // start_lsn is node-local; cross-node propagation triggers 42704. + if err := SetSpockRepairMode(processingCtx, c, true); err != nil { + if conn != nil { + conn.Close(ctx) + } + return fmt.Errorf("failed to enable spock repair_mode: %w", err) + } + defer func() { + if err := SetSpockRepairMode(processingCtx, c, false); err != nil { + logger.Warn("failed to reset spock repair_mode before connection release: %v", err) + } + }() + + tx, err := c.Begin(processingCtx) + if err != nil { + if conn != nil { + conn.Close(ctx) + } + return fmt.Errorf("failed to begin metadata update transaction: %w", err) + } + defer tx.Rollback(processingCtx) + if err := queries.UpdateCDCMetadata(processingCtx, tx, publication, slotName, lastLSN.String(), tables); err != nil { if conn != nil { conn.Close(ctx) @@ -472,22 +513,6 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont return nil } -func getSlotConfirmedFlushLSN(ctx context.Context, pool *pgxpool.Pool, slotName string) (pglogrepl.LSN, error) { - var lsnStr string - err := pool.QueryRow(ctx, "SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = $1", slotName).Scan(&lsnStr) - if err != nil { - return 0, fmt.Errorf("failed to fetch confirmed_flush_lsn for slot %s: %w", slotName, err) - } - if lsnStr == "" { - return 0, fmt.Errorf("confirmed_flush_lsn empty for slot %s", slotName) - } - lsn, err := pglogrepl.ParseLSN(lsnStr) - if err != nil { - return 0, fmt.Errorf("failed to parse confirmed_flush_lsn %s for slot %s: %w", lsnStr, slotName, err) - } - return lsn, nil -} - func getCurrentWalFlushLSN(ctx context.Context, pool *pgxpool.Pool) (pglogrepl.LSN, error) { var lsnStr string err := pool.QueryRow(ctx, "SELECT pg_current_wal_flush_lsn()").Scan(&lsnStr) @@ -509,7 +534,29 @@ func flushMetadata(ctx context.Context, pool *pgxpool.Pool, publication, slotNam tables = []string{} } - tx, err := pool.Begin(ctx) + // repair_mode is a session GUC; borrow a conn so defer Set(false) + // runs before Release. Periodic+shutdown path — leaks here poison + // the pool for the rest of the CDC stream. + c, err := pool.Acquire(ctx) + if err != nil { + return fmt.Errorf("failed to acquire connection for metadata flush: %w", err) + } + defer c.Release() + + // start_lsn is node-local; cross-node propagation triggers 42704. + if err := SetSpockRepairMode(ctx, c, true); err != nil { + return fmt.Errorf("failed to enable spock repair_mode: %w", err) + } + // Reset must run even after caller ctx cancellation (shutdown + // path), or the conn returns to the pool with the GUC on. + resetCtx := context.WithoutCancel(ctx) + defer func() { + if err := SetSpockRepairMode(resetCtx, c, false); err != nil { + logger.Warn("failed to reset spock repair_mode before connection release: %v", err) + } + }() + + tx, err := c.Begin(ctx) if err != nil { return fmt.Errorf("failed to begin metadata update transaction: %w", err) } diff --git a/internal/infra/cdc/setup.go b/internal/infra/cdc/setup.go index a135cb8..907e5e2 100644 --- a/internal/infra/cdc/setup.go +++ b/internal/infra/cdc/setup.go @@ -38,6 +38,58 @@ func SetupPublication(ctx context.Context, db queries.DBQuerier, publicationName return nil } +// ExcludeMetadataFromSpockRepsets removes .ace_cdc_metadata from +// every Spock replication set it currently belongs to on the current node. +// +// Spock's DDL-replication path auto-adds new tables to the default repset on +// each node that applies the replicated CREATE TABLE. The metadata table +// holds node-local LSNs that must never propagate cross-node, so we strip +// it out of every repset after creation. This is per-node — repset_remove_table +// is a function call, not DDL, so it does not propagate to peers. +// +// No-op when Spock is not installed. +func ExcludeMetadataFromSpockRepsets(ctx context.Context, db queries.DBQuerier, aceSchema string) error { + spockInstalled, err := queries.CheckSpockInstalled(ctx, db) + if err != nil { + return fmt.Errorf("failed to detect spock extension: %w", err) + } + if !spockInstalled { + return nil + } + + sets, err := queries.EnumerateMetadataRepsets(ctx, db, aceSchema) + if err != nil { + return fmt.Errorf("failed to enumerate repsets containing ace_cdc_metadata: %w", err) + } + if len(sets) == 0 { + return nil + } + + qualified := fmt.Sprintf("%s.ace_cdc_metadata", aceSchema) + for _, set := range sets { + if err := queries.RepsetRemoveTable(ctx, db, set, qualified); err != nil { + return err + } + logger.Info("Removed %s from spock repset '%s'", qualified, set) + } + return nil +} + +// Required around ace_cdc_metadata writes: start_lsn is node-local +// and the table may briefly live in a peer's repset during per-node +// init. CAUTION: session-scoped, not tx-scoped — rollback does not +// reset it. No-op when Spock is not installed. +func SetSpockRepairMode(ctx context.Context, db queries.DBQuerier, on bool) error { + spockInstalled, err := queries.CheckSpockInstalled(ctx, db) + if err != nil { + return fmt.Errorf("failed to detect spock extension: %w", err) + } + if !spockInstalled { + return nil + } + return queries.SetSpockRepairMode(ctx, db, on) +} + func SetupReplicationSlot(ctx context.Context, nodeInfo map[string]any) (pglogrepl.LSN, error) { cfg := config.Get().MTree.CDC slot := cfg.SlotName diff --git a/tests/integration/cdc_busy_table_test.go b/tests/integration/cdc_busy_table_test.go index 8a2771d..fb13b5c 100644 --- a/tests/integration/cdc_busy_table_test.go +++ b/tests/integration/cdc_busy_table_test.go @@ -252,7 +252,7 @@ func currentMetadataLSN(t *testing.T, ctx context.Context) pglogrepl.LSN { require.NoError(t, err) defer tx.Rollback(ctx) - _, lsnStr, _, err := queries.GetCDCMetadata(ctx, tx, config.Cfg.MTree.CDC.PublicationName) + _, lsnStr, _, _, err := queries.GetCDCMetadata(ctx, tx, config.Cfg.MTree.CDC.PublicationName) require.NoError(t, err) lsn, err := pglogrepl.ParseLSN(lsnStr) diff --git a/tests/integration/cdc_init_ordering_test.go b/tests/integration/cdc_init_ordering_test.go new file mode 100644 index 0000000..093efae --- /dev/null +++ b/tests/integration/cdc_init_ordering_test.go @@ -0,0 +1,289 @@ +// /////////////////////////////////////////////////////////////////////////// +// +// # ACE - Active Consistency Engine +// +// Copyright (C) 2023 - 2026, pgEdge (https://www.pgedge.com/) +// +// This software is released under the PostgreSQL License: +// https://opensource.org/license/postgresql +// +// /////////////////////////////////////////////////////////////////////////// + +// Regression coverage for SQLSTATE 42704 "publication \"ace_mtree_pub\" +// does not exist" during mtree table-diff on Spock clusters. Two defects +// combined to cause it: +// +// 1. MtreeInit created the replication slot before the publication was +// committed, so the slot's consistent point preceded the publication. +// pgoutput's get_publication_oid then failed during replay. +// 2. Spock DDL-replicated CREATE TABLE on ace_cdc_metadata auto-added the +// table to the default replication set on every node. Peers' INSERTs into +// the table propagated cross-node, overwriting each node's local +// start_lsn with a peer's LSN. + +package integration + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/pgedge/ace/internal/infra/cdc" + "github.com/pgedge/ace/pkg/config" + "github.com/stretchr/testify/require" +) + +// cdcMetadataTable returns the safely-quoted "schema"."ace_cdc_metadata" +// identifier. Built once with pgx.Identifier.Sanitize() so call sites can +// concatenate it into SQL without fmt.Sprintf — matches the production +// convention in queries package and keeps Codacy/Semgrep's +// concat-sqli pattern quiet (schema is config-loaded, never user input). +func cdcMetadataTable() string { + return pgx.Identifier{config.Cfg.MTree.Schema, "ace_cdc_metadata"}.Sanitize() +} + +// TestMtreeInitSlotIsAfterPublication verifies bug 1's fix: the replication +// slot's consistent point sits at or beyond the publication's commit LSN, so +// pgoutput can always find the publication in the historical catalog snapshot. +func TestMtreeInitSlotIsAfterPublication(t *testing.T) { + ctx := context.Background() + tableName := "customers_pub_ordering" + qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName) + + setupCDCTestTable(t, ctx, tableName) + + mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1, serviceN2}) + require.NoError(t, mtreeTask.RunChecks(false)) + require.NoError(t, mtreeTask.MtreeInit()) + t.Cleanup(func() { + if err := mtreeTask.MtreeTeardown(); err != nil { + t.Logf("MtreeTeardown during cleanup: %v", err) + } + dropCDCTestTable(t, tableName) + }) + + for _, np := range []struct { + name string + pool *pgxpool.Pool + }{ + {serviceN1, pgCluster.Node1Pool}, + {serviceN2, pgCluster.Node2Pool}, + } { + // pub_commit_lsn must be populated by the new InitCDCMetadata path. + var pubCommitLSN string + // FP: identifier sanitised by pgx.Identifier; user values via $N. + // nosemgrep + err := np.pool.QueryRow(ctx, + "SELECT COALESCE(pub_commit_lsn, '') FROM "+cdcMetadataTable()+" WHERE publication_name = $1", + config.Cfg.MTree.CDC.PublicationName, + ).Scan(&pubCommitLSN) + require.NoError(t, err, "read pub_commit_lsn on %s", np.name) + require.NotEmpty(t, pubCommitLSN, "pub_commit_lsn must be set on %s (proves new InitCDCMetadata ran)", np.name) + + // The slot's confirmed_flush_lsn is the consistent point of the + // replication slot. It must be >= the publication's commit LSN, or + // pgoutput would replay from a snapshot that doesn't see the + // publication. This is the core ordering invariant the fix restores. + var slotConfirmed string + err = np.pool.QueryRow(ctx, + "SELECT confirmed_flush_lsn::text FROM pg_replication_slots WHERE slot_name = $1", + config.Cfg.MTree.CDC.SlotName, + ).Scan(&slotConfirmed) + require.NoError(t, err, "read slot confirmed_flush_lsn on %s", np.name) + + var aheadOrEqual bool + err = np.pool.QueryRow(ctx, + "SELECT $1::pg_lsn >= $2::pg_lsn", + slotConfirmed, pubCommitLSN, + ).Scan(&aheadOrEqual) + require.NoError(t, err, "compare LSNs on %s", np.name) + require.True(t, aheadOrEqual, + "slot consistent point %s must be >= publication commit LSN %s on %s", + slotConfirmed, pubCommitLSN, np.name) + } +} + +// TestMtreeInitExcludesMetadataFromSpock verifies bug 2's fix: the +// ace_cdc_metadata table is removed from every Spock replication set during +// init, and writes to it stay node-local. +// +// In production this matters because Spock's DDL-replication path auto-adds +// new tables to the default replication set on every node. The test cluster +// runs with DDL replication off, so to genuinely exercise the fix we run +// init, manually add the metadata table to the default repset (simulating +// Spock's auto-add), then re-run init. The fix's +// ExcludeMetadataFromSpockRepsets must clean the entry back out; pre-fix +// init had no such step, so the entry would persist. +func TestMtreeInitExcludesMetadataFromSpock(t *testing.T) { + ctx := context.Background() + tableName := "customers_metadata_isolation" + qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName) + + setupCDCTestTable(t, ctx, tableName) + + mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1, serviceN2}) + require.NoError(t, mtreeTask.RunChecks(false)) + require.NoError(t, mtreeTask.MtreeInit()) + t.Cleanup(func() { + if err := mtreeTask.MtreeTeardown(); err != nil { + t.Logf("MtreeTeardown during cleanup: %v", err) + } + dropCDCTestTable(t, tableName) + }) + + qualifiedMetadata := cdcMetadataTable() + + // Force the state that Spock would create under DDL replication: add + // the metadata table to the default repset on each node. + seedMetadataInDefaultRepset(t, ctx, qualifiedMetadata) + + // Re-run init — the fix's ExcludeMetadataFromSpockRepsets must reverse + // the auto-add. Pre-fix init has no such step, so the table stays in + // the default repset and the assertion below fails. + require.NoError(t, mtreeTask.RunChecks(false)) + require.NoError(t, mtreeTask.MtreeInit()) + + assertMetadataNotInAnyRepset(t, ctx) + assertSentinelDoesNotPropagate(t, ctx) +} + +// seedMetadataInDefaultRepset adds ace_cdc_metadata to Spock's default repset +// on every node, simulating Spock's DDL-replicated auto-add that the test +// cluster (DDL replication off) does not produce naturally. +func seedMetadataInDefaultRepset(t *testing.T, ctx context.Context, qualifiedMetadata string) { + t.Helper() + for _, np := range mtreeTestNodes() { + _, err := np.pool.Exec(ctx, + "SELECT spock.repset_add_table('default', $1::regclass)", + qualifiedMetadata, + ) + require.NoError(t, err, "seed: add metadata to default repset on %s", np.name) + } +} + +// assertMetadataNotInAnyRepset asserts the fix's post-init invariant: no row +// in spock.tables has a non-NULL set_name for ace_cdc_metadata. (The view +// surfaces every relation; NULL set_name means "not in any repset".) +func assertMetadataNotInAnyRepset(t *testing.T, ctx context.Context) { + t.Helper() + for _, np := range mtreeTestNodes() { + var inRepset bool + err := np.pool.QueryRow(ctx, + "SELECT EXISTS (SELECT 1 FROM spock.tables WHERE nspname = $1 AND relname = 'ace_cdc_metadata' AND set_name IS NOT NULL)", + config.Cfg.MTree.Schema, + ).Scan(&inRepset) + require.NoError(t, err, "check spock.tables on %s", np.name) + require.False(t, inRepset, + "ace_cdc_metadata must not be in any spock replication set on %s after re-init; "+ + "otherwise node-local start_lsn leaks cross-node", np.name) + } +} + +// assertSentinelDoesNotPropagate writes a sentinel start_lsn on n1 and +// verifies it never appears on n2 within a 3 s window. Under bug 2 the +// sentinel would propagate via Spock apply within seconds. +func assertSentinelDoesNotPropagate(t *testing.T, ctx context.Context) { + t.Helper() + sentinel := "FFFF/FFFFFFF0" + // FP: identifier sanitised by pgx.Identifier; user values via $N. + // nosemgrep + _, err := pgCluster.Node1Pool.Exec(ctx, + "UPDATE "+cdcMetadataTable()+" SET start_lsn = $1 WHERE publication_name = $2", + sentinel, config.Cfg.MTree.CDC.PublicationName, + ) + require.NoError(t, err, "write sentinel start_lsn on n1") + + // Long enough for Spock apply lag on slow CI. + deadline := time.Now().Add(10 * time.Second) + for time.Now().Before(deadline) { + var n2StartLSN string + // FP: identifier sanitised by pgx.Identifier; user values via $N. + // nosemgrep + err := pgCluster.Node2Pool.QueryRow(ctx, + "SELECT start_lsn FROM "+cdcMetadataTable()+" WHERE publication_name = $1", + config.Cfg.MTree.CDC.PublicationName, + ).Scan(&n2StartLSN) + require.NoError(t, err) + require.NotEqual(t, sentinel, n2StartLSN, + "sentinel start_lsn leaked from n1 to n2 via spock replication") + time.Sleep(200 * time.Millisecond) + } +} + +// mtreeTestNodes returns the (name, pool) pairs the regression tests fan out +// over. Centralised so the two-node iteration body need not be duplicated. +func mtreeTestNodes() []struct { + name string + pool *pgxpool.Pool +} { + return []struct { + name string + pool *pgxpool.Pool + }{ + {serviceN1, pgCluster.Node1Pool}, + {serviceN2, pgCluster.Node2Pool}, + } +} + +// TestProcessReplicationStreamRejectsStaleStartLSN verifies the new +// publication-commit guard in processReplicationStream. We corrupt n1's +// start_lsn to a value strictly older than pub_commit_lsn (which is what +// bug 2 used to do via Spock cross-node leak) and assert that UpdateFromCDC +// returns an explicit, actionable error rather than the cryptic SQLSTATE +// 42704 the customer saw. +func TestProcessReplicationStreamRejectsStaleStartLSN(t *testing.T) { + ctx := context.Background() + tableName := "customers_stale_start_lsn" + qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName) + + setupCDCTestTable(t, ctx, tableName) + + mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1}) + require.NoError(t, mtreeTask.RunChecks(false)) + require.NoError(t, mtreeTask.MtreeInit()) + t.Cleanup(func() { + if err := mtreeTask.MtreeTeardown(); err != nil { + t.Logf("MtreeTeardown during cleanup: %v", err) + } + dropCDCTestTable(t, tableName) + }) + + // Read pub_commit_lsn and craft a stale start_lsn strictly below it. + var pubCommitLSN string + // FP: identifier sanitised by pgx.Identifier; user values via $N. + // nosemgrep + err := pgCluster.Node1Pool.QueryRow(ctx, + "SELECT pub_commit_lsn FROM "+cdcMetadataTable()+" WHERE publication_name = $1", + config.Cfg.MTree.CDC.PublicationName, + ).Scan(&pubCommitLSN) + require.NoError(t, err) + require.NotEmpty(t, pubCommitLSN, "fix relies on pub_commit_lsn being populated") + + var staleLSN string + err = pgCluster.Node1Pool.QueryRow(ctx, + "SELECT ($1::pg_lsn - 16)::text", pubCommitLSN, + ).Scan(&staleLSN) + require.NoError(t, err) + + // FP: identifier sanitised by pgx.Identifier; user values via $N. + // nosemgrep + _, err = pgCluster.Node1Pool.Exec(ctx, + "UPDATE "+cdcMetadataTable()+" SET start_lsn = $1 WHERE publication_name = $2", + staleLSN, config.Cfg.MTree.CDC.PublicationName, + ) + require.NoError(t, err, "inject stale start_lsn") + + nodeInfo := pgCluster.ClusterNodes[0] + err = cdc.UpdateFromCDC(context.Background(), nodeInfo) + require.Error(t, err, "UpdateFromCDC must refuse a stream whose start_lsn precedes the publication commit") + + msg := err.Error() + require.Contains(t, msg, "publication commit LSN", + "error must mention the publication-commit invariant, got: %s", msg) + require.NotContains(t, strings.ToLower(msg), "42704", + "caller should see the new guard's actionable error, not the raw 42704 from PostgreSQL: %s", msg) +}