Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 95 additions & 7 deletions db/queries/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -2913,18 +2913,106 @@ func DropCDCMetadataTable(ctx context.Context, db DBQuerier) error {
return nil
}

func GetCDCMetadata(ctx context.Context, db DBQuerier, publicationName string) (string, string, []string, error) {
// pubCommitLSN is empty for legacy metadata rows that pre-date the
// pub_commit_lsn column; callers must treat empty as "invariant
// uncheckable" and skip the publication-commit guard with a warning.
func GetCDCMetadata(ctx context.Context, db DBQuerier, publicationName string) (slotName, startLSN string, tables []string, pubCommitLSN string, err error) {
sql, err := RenderSQL(SQLTemplates.GetCDCMetadata, nil)
if err != nil {
return "", "", nil, err
return "", "", nil, "", err
}
var slotName, startLSN string
var tables []string
err = db.QueryRow(ctx, sql, publicationName).Scan(&slotName, &startLSN, &tables)
err = db.QueryRow(ctx, sql, publicationName).Scan(&slotName, &startLSN, &tables, &pubCommitLSN)
if err != nil {
return "", "", nil, "", err
}
return slotName, startLSN, tables, pubCommitLSN, nil
}

// Init path only. Ongoing flushes use UpdateCDCMetadata, which deliberately
// leaves pub_commit_lsn untouched so the listen.go guard always compares
// against the LSN captured at the matching init.
func InitCDCMetadata(ctx context.Context, db DBQuerier, publicationName, slotName, startLSN, pubCommitLSN string, tables []string) error {
sql, err := RenderSQL(SQLTemplates.InitCDCMetadata, nil)
if err != nil {
return fmt.Errorf("failed to render InitCDCMetadata SQL: %w", err)
}
if tables == nil {
tables = []string{}
}
_, err = db.Exec(ctx, sql, publicationName, slotName, startLSN, pubCommitLSN, tables)
if err != nil {
return fmt.Errorf("query to init cdc metadata failed: %w", err)
}
return nil
}

// Called mid-Phase-A, after CREATE PUBLICATION, so the captured value is
// strictly less than Phase A's commit LSN. The slot created in Phase B
// has consistent_point >= that commit LSN, hence consistent_point >
// captured value: a safe lower bound for any valid replication start LSN.
func CurrentWalInsertLSN(ctx context.Context, db DBQuerier) (string, error) {
sql, err := RenderSQL(SQLTemplates.CurrentWalInsertLSN, nil)
if err != nil {
return "", fmt.Errorf("failed to render CurrentWalInsertLSN SQL: %w", err)
}
var lsn string
if err := db.QueryRow(ctx, sql).Scan(&lsn); err != nil {
return "", fmt.Errorf("failed to fetch current WAL insert LSN: %w", err)
}
return lsn, nil
}

// Caller must have verified Spock is installed; this function does not
// gate on extension presence and will error otherwise.
func EnumerateMetadataRepsets(ctx context.Context, db DBQuerier, aceSchema string) ([]string, error) {
sql, err := RenderSQL(SQLTemplates.EnumerateMetadataRepsets, nil)
if err != nil {
return "", "", nil, err
return nil, fmt.Errorf("failed to render EnumerateMetadataRepsets SQL: %w", err)
}
return slotName, startLSN, tables, nil
rows, err := db.Query(ctx, sql, aceSchema)
if err != nil {
return nil, fmt.Errorf("query to enumerate metadata repsets failed: %w", err)
}
defer rows.Close()
var sets []string
for rows.Next() {
var s string
if err := rows.Scan(&s); err != nil {
return nil, fmt.Errorf("scan repset name: %w", err)
}
sets = append(sets, s)
}
return sets, rows.Err()
}

// spock.repset_remove_table is a function call, not DDL — it does NOT
// propagate cross-node. Callers that need every node to drop a table
// from a repset must invoke this per-node.
func RepsetRemoveTable(ctx context.Context, db DBQuerier, setName, qualifiedTable string) error {
sql, err := RenderSQL(SQLTemplates.RepsetRemoveTable, nil)
if err != nil {
return fmt.Errorf("failed to render RepsetRemoveTable SQL: %w", err)
}
if _, err := db.Exec(ctx, sql, setName, qualifiedTable); err != nil {
return fmt.Errorf("repset_remove_table(%s, %s) failed: %w", setName, qualifiedTable, err)
}
return nil
}

// CAUTION: spock.repair_mode is SESSION-scoped, not transaction-scoped.
// SQL ROLLBACK does NOT reset it. Every Set(true) must be paired with a
// Set(false) on every code path, including error paths — otherwise the
// pooled connection is returned with repair_mode still on and subsequent
// borrowers silently fail to replicate their writes.
func SetSpockRepairMode(ctx context.Context, db DBQuerier, on bool) error {
sql, err := RenderSQL(SQLTemplates.SetSpockRepairMode, nil)
if err != nil {
return fmt.Errorf("failed to render SetSpockRepairMode SQL: %w", err)
}
if _, err := db.Exec(ctx, sql, on); err != nil {
return fmt.Errorf("spock.repair_mode(%v) failed: %w", on, err)
}
return nil
}

func UpdateMtreeCounters(ctx context.Context, db DBQuerier, mtreeTable string, isComposite bool, compositeTypeName string, pkeyType string, inserts, deletes, updates []string) error {
Expand Down
88 changes: 86 additions & 2 deletions db/queries/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ type Templates struct {
ResetReplicationOriginSession *template.Template
SetupReplicationOriginXact *template.Template
ResetReplicationOriginXact *template.Template

InitCDCMetadata *template.Template
EnumerateMetadataRepsets *template.Template
RepsetRemoveTable *template.Template
SetSpockRepairMode *template.Template
CurrentWalInsertLSN *template.Template
}

var SQLTemplates = Templates{
Expand Down Expand Up @@ -221,6 +227,39 @@ var SQLTemplates = Templates{
last_updated = EXCLUDED.last_updated
`)),

// On conflict every column including pub_commit_lsn is refreshed:
// reaching this query path means a re-init, which created a fresh
// publication with a new commit LSN, and listen.go's guard must
// compare against that current LSN — not a stale prior one.
InitCDCMetadata: template.Must(template.New("initCdcMetadata").Funcs(aceTemplateFuncs).Parse(`
INSERT INTO
{{aceSchema}}.ace_cdc_metadata (
publication_name,
slot_name,
start_lsn,
pub_commit_lsn,
tables,
last_updated
)
VALUES
(
$1,
$2,
$3,
$4,
$5,
current_timestamp
)
ON CONFLICT (publication_name) DO
UPDATE
SET
slot_name = EXCLUDED.slot_name,
start_lsn = EXCLUDED.start_lsn,
pub_commit_lsn = EXCLUDED.pub_commit_lsn,
tables = EXCLUDED.tables,
last_updated = EXCLUDED.last_updated
`)),

DropPublication: template.Must(template.New("dropPublication").Parse(`
DROP PUBLICATION IF EXISTS {{.PublicationName}}
`)),
Expand All @@ -244,7 +283,8 @@ var SQLTemplates = Templates{
SELECT
slot_name,
start_lsn,
tables
tables,
COALESCE(pub_commit_lsn, '') AS pub_commit_lsn
FROM
{{aceSchema}}.ace_cdc_metadata
WHERE
Expand Down Expand Up @@ -338,9 +378,13 @@ var SQLTemplates = Templates{
publication_name text PRIMARY KEY,
slot_name text,
start_lsn text,
pub_commit_lsn text,
tables text[],
last_updated timestamptz
)`),
);
-- Forward-compatible addition for clusters that ran older versions.
ALTER TABLE {{aceSchema}}.ace_cdc_metadata
ADD COLUMN IF NOT EXISTS pub_commit_lsn text;`),
),
GetPrimaryKey: template.Must(template.New("getPrimaryKey").Parse(`
SELECT
Expand Down Expand Up @@ -1599,4 +1643,44 @@ var SQLTemplates = Templates{
ResetReplicationOriginXact: template.Must(template.New("resetReplicationOriginXact").Parse(`
SELECT pg_replication_origin_xact_reset()
`)),

// EnumerateMetadataRepsets lists every Spock replication set that
// contains pgedge_ace.ace_cdc_metadata on the current node. Used by
// cdc.ExcludeMetadataFromSpockRepsets to undo Spock's DDL-triggered
// auto-add of the metadata table.
//
// spock.tables surfaces every known relation; rows where the table is
// not in any replication set carry a NULL set_name and must be
// filtered out — otherwise the scan fails with "cannot scan NULL into
// *string" the moment the metadata table exists but has not yet been
// added to a repset (i.e., every fresh init on a cluster with DDL
// replication disabled).
EnumerateMetadataRepsets: template.Must(template.New("enumerateMetadataRepsets").Funcs(aceTemplateFuncs).Parse(`
SELECT set_name
FROM spock.tables
WHERE nspname = $1 AND relname = 'ace_cdc_metadata' AND set_name IS NOT NULL
Copy link
Copy Markdown
Contributor

@ibrarahmad ibrarahmad May 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Schema is parameterized via $1 but the relname 'ace_cdc_metadata' is hardcoded. A shared const cdcMetadataTableName referenced from both the Go side and this template would prevent drift if the table is ever renamed. Minor, not blocking.

`)),

// RepsetRemoveTable wraps spock.repset_remove_table. Returns void; we
// don't care about the return value, only that the call doesn't error.
RepsetRemoveTable: template.Must(template.New("repsetRemoveTable").Parse(`
SELECT spock.repset_remove_table($1, $2)
`)),

// CAUTION: SESSION-scoped, not transaction-scoped. SQL ROLLBACK does
// NOT reset this. Callers must pair $1=true with a matching $1=false
// on every code path or the pooled connection returns with repair
// mode still on, silently suppressing replication for the next user.
SetSpockRepairMode: template.Must(template.New("setSpockRepairMode").Parse(`
SELECT spock.repair_mode($1)
`)),

// CurrentWalInsertLSN returns pg_current_wal_insert_lsn() as text. Used
// at MtreeInit time, just after CREATE PUBLICATION, as the lower bound
// for any future replication start LSN. The slot's consistent point is
// guaranteed to be >= this value because the slot is created after this
// transaction commits.
CurrentWalInsertLSN: template.Must(template.New("currentWalInsertLSN").Parse(`
SELECT pg_current_wal_insert_lsn()::text
`)),
}
116 changes: 88 additions & 28 deletions internal/consistency/mtree/merkle.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,51 +807,111 @@ func (m *MerkleTreeTask) MtreeInit() (err error) {
cfg := config.Get().MTree.CDC

for _, nodeInfo := range m.ClusterNodes {
logger.Info("Initialising Merkle tree objects on node: %s", nodeInfo["Name"])
if err := m.initOneNode(nodeInfo, cfg.PublicationName, cfg.SlotName); err != nil {
return err
}
}
return nil
}

lsn, err := cdc.SetupReplicationSlot(m.Ctx, nodeInfo)
// initOneNode runs MtreeInit on a single node in three phases:
//
// 1. Phase A (tx 1): create schema, helpers, the CDC metadata table,
// exclude it from Spock replication sets, create the publication,
// and capture the publication's commit LSN. Commit.
// 2. Phase B (no tx): create the logical replication slot via a fresh
// replication-mode connection. Its consistent point is now
// guaranteed to be > the publication's commit LSN, fixing the
// "publication does not exist" replay error.
// 3. Phase C (tx 2): persist slot/start_lsn/pub_commit_lsn into
// ace_cdc_metadata, wrapped in spock.repair_mode(true) so the
// write stays node-local while peer nodes may still have the
// metadata table in their replication set.
//
// Each iteration runs inside this function so deferred Close/Rollback
// calls fire per-node rather than at MtreeInit return.
func (m *MerkleTreeTask) initOneNode(nodeInfo map[string]any, publicationName, slotName string) error {
logger.Info("Initialising Merkle tree objects on node: %s", nodeInfo["Name"])

pool, err := auth.GetClusterNodeConnection(m.Ctx, nodeInfo, m.connOpts())
if err != nil {
return fmt.Errorf("failed to get connection pool for node %s: %w", nodeInfo["Name"], err)
}
defer pool.Close()

if err := queries.CreateSchema(m.Ctx, pool, m.aceSchema()); err != nil {
return fmt.Errorf("failed to create schema '%s' on node %s: %w", m.aceSchema(), nodeInfo["Name"], err)
}

var pubCommitLSN string
if err := func() (err error) {
tx, err := pool.Begin(m.Ctx)
if err != nil {
return fmt.Errorf("failed to set up replication slot on node %s: %w", nodeInfo["Name"], err)
return err
}
defer tx.Rollback(m.Ctx)

pool, err := auth.GetClusterNodeConnection(m.Ctx, nodeInfo, m.connOpts())
if err := queries.CreateXORFunction(m.Ctx, tx); err != nil {
return fmt.Errorf("create xor function: %w", err)
}
if err := queries.CreateCDCMetadataTable(m.Ctx, tx); err != nil {
return fmt.Errorf("create cdc metadata table: %w", err)
}
if err := cdc.ExcludeMetadataFromSpockRepsets(m.Ctx, tx, m.aceSchema()); err != nil {
return fmt.Errorf("exclude ace_cdc_metadata from spock repsets: %w", err)
}
if err := cdc.SetupPublication(m.Ctx, tx, publicationName); err != nil {
return fmt.Errorf("setup publication: %w", err)
}
// Captured mid-tx after CREATE PUBLICATION, so the value is
// strictly less than this tx's commit LSN; Phase B's slot
// consistent_point is >= that commit LSN. listen.go's guard
// therefore catches any start LSN that predates the publication.
pubCommitLSN, err = queries.CurrentWalInsertLSN(m.Ctx, tx)
Comment thread
danolivo marked this conversation as resolved.
if err != nil {
return fmt.Errorf("failed to get connection pool for node %s: %w", nodeInfo["Name"], err)
return fmt.Errorf("capture publication commit LSN: %w", err)
}
defer pool.Close()
return tx.Commit(m.Ctx)
}(); err != nil {
return fmt.Errorf("phase A failed on node %s: %w", nodeInfo["Name"], err)
}

if err = queries.CreateSchema(m.Ctx, pool, m.aceSchema()); err != nil {
return fmt.Errorf("failed to create schema '%s': %w", m.aceSchema(), err)
}
slotLSN, err := cdc.SetupReplicationSlot(m.Ctx, nodeInfo)
if err != nil {
return fmt.Errorf("phase B (replication slot) failed on node %s: %w", nodeInfo["Name"], err)
}

if err := func() (err error) {
tx, err := pool.Begin(m.Ctx)
if err != nil {
return fmt.Errorf("failed to begin transaction on node %s: %w", nodeInfo["Name"], err)
return err
}
defer tx.Rollback(m.Ctx)

if err = queries.CreateXORFunction(m.Ctx, tx); err != nil {
return fmt.Errorf("failed to create xor function: %w", err)
if err := cdc.SetSpockRepairMode(m.Ctx, tx, true); err != nil {
return fmt.Errorf("enable spock repair_mode: %w", err)
}

if err = queries.CreateCDCMetadataTable(m.Ctx, tx); err != nil {
return fmt.Errorf("failed to create cdc metadata table: %w", err)
if err := queries.InitCDCMetadata(m.Ctx, tx,
Copy link
Copy Markdown
Contributor

@ibrarahmad ibrarahmad May 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same hole as flushMetadata (listen.go:556). If InitCDCMetadata errors, Set(false) on line 899 is skipped and the pool conn returns poisoned. Please port to the borrowed-conn pattern in this PR rather than a follow-up.

publicationName, slotName,
slotLSN.String(), pubCommitLSN, []string{}); err != nil {
return fmt.Errorf("write cdc metadata: %w", err)
}

if err := cdc.SetupPublication(m.Ctx, tx, cfg.PublicationName); err != nil {
return fmt.Errorf("failed to setup publication on node %s: %w", nodeInfo["Name"], err)
if err := cdc.SetSpockRepairMode(m.Ctx, tx, false); err != nil {
return fmt.Errorf("disable spock repair_mode: %w", err)
}

if err = queries.UpdateCDCMetadata(m.Ctx, tx, cfg.PublicationName, cfg.SlotName, lsn.String(), []string{}); err != nil {
return fmt.Errorf("failed to update cdc metadata: %w", err)
return tx.Commit(m.Ctx)
}(); err != nil {
// Slot leak mitigation: Phase B created a slot that we failed to
// record. A subsequent SetupReplicationSlot will drop any prior
// slot of the same name, so a re-run reaps it — but try a best
// effort drop here too to keep the cluster tidy.
if dropErr := queries.DropReplicationSlot(m.Ctx, pool, slotName); dropErr != nil {
logger.Warn("failed to drop orphaned slot %s on node %s: %v", slotName, nodeInfo["Name"], dropErr)
}

if err := tx.Commit(m.Ctx); err != nil {
return fmt.Errorf("failed to commit transaction on node %s: %w", nodeInfo["Name"], err)
}

logger.Info("Merkle tree objects initialised on node: %s", nodeInfo["Name"])
return fmt.Errorf("phase C failed on node %s: %w", nodeInfo["Name"], err)
}

logger.Info("Merkle tree objects initialised on node: %s", nodeInfo["Name"])
return nil
}

Expand Down Expand Up @@ -1427,7 +1487,7 @@ func (m *MerkleTreeTask) BuildMtree() (err error) {
}
defer tx.Rollback(m.Ctx)

slotName, startLSN, tables, err := queries.GetCDCMetadata(m.Ctx, tx, publicationName)
slotName, startLSN, tables, _, err := queries.GetCDCMetadata(m.Ctx, tx, publicationName)
if err != nil {
pool.Close()
return fmt.Errorf("failed to get cdc metadata on node %s: %w", nodeInfo["Name"], err)
Expand Down
Loading
Loading