diff --git a/patches/19/pg19-015-attoptions.diff b/patches/19/pg19-015-attoptions.diff new file mode 100644 index 00000000..726b01da --- /dev/null +++ b/patches/19/pg19-015-attoptions.diff @@ -0,0 +1,199 @@ +diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c +index 50747c16396..6cdbbe1d0dd 100644 +--- a/src/backend/access/common/reloptions.c ++++ b/src/backend/access/common/reloptions.c +@@ -166,6 +166,15 @@ static relopt_bool boolRelOpts[] = + }, + true + }, ++ { ++ { ++ "log_old_value", ++ "Add old value of attribute to WAL for logical decoding", ++ RELOPT_KIND_ATTRIBUTE, ++ ShareUpdateExclusiveLock ++ }, ++ false ++ }, + /* list terminator */ + {{NULL}} + }; +@@ -557,6 +566,19 @@ static relopt_enum enumRelOpts[] = + + static relopt_string stringRelOpts[] = + { ++ { ++ { ++ "delta_apply_function", ++ "Function called to perform delta conflict avoidance", ++ RELOPT_KIND_ATTRIBUTE, ++ ShareUpdateExclusiveLock ++ }, ++ -1, ++ true, ++ NULL, ++ NULL, ++ NULL ++ }, + /* list terminator */ + {{NULL}} + }; +@@ -2106,7 +2128,9 @@ attribute_reloptions(Datum reloptions, bool validate) + { + static const relopt_parse_elt tab[] = { + {"n_distinct", RELOPT_TYPE_REAL, offsetof(AttributeOpts, n_distinct)}, +- {"n_distinct_inherited", RELOPT_TYPE_REAL, offsetof(AttributeOpts, n_distinct_inherited)} ++ {"n_distinct_inherited", RELOPT_TYPE_REAL, offsetof(AttributeOpts, n_distinct_inherited)}, ++ {"log_old_value", RELOPT_TYPE_BOOL, offsetof(AttributeOpts, log_old_value)}, ++ {"delta_apply_function", RELOPT_TYPE_STRING, offsetof(AttributeOpts, delta_apply_function)} + }; + + return (bytea *) build_reloptions(reloptions, validate, +diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c +index 0dcd6ee817e..dcf26167cae 100644 +--- a/src/backend/access/heap/heapam.c ++++ b/src/backend/access/heap/heapam.c +@@ -52,4 +52,5 @@ + #include "storage/procarray.h" ++#include "utils/attoptcache.h" + #include "utils/datum.h" + #include "utils/injection_point.h" + #include "utils/inval.h" +@@ -67,6 +68,7 @@ static void check_lock_if_inplace_updateable_rel(Relation relation, + HeapTuple newtup); + static void check_inplace_rel_lock(HeapTuple oldtup); + #endif ++static Bitmapset *HeapDetermineLogOldColumns(Relation relation); + static Bitmapset *HeapDetermineColumnsInfo(Relation relation, + Bitmapset *interesting_cols, + Bitmapset *external_cols, +@@ -104,6 +106,7 @@ static void index_delete_sort(TM_IndexDeleteOp *delstate); + static int bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate); + static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup); + static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required, ++ Bitmapset *logged_old_attrs, + bool *copy); + + +@@ -3016,8 +3019,8 @@ l1: + * Compute replica identity tuple before entering the critical section so + * we don't PANIC upon a memory allocation failure. + */ +- old_key_tuple = walLogical ? +- ExtractReplicaIdentity(relation, &tp, true, &old_key_copied) : NULL; ++ old_key_tuple = walLogical ? ++ ExtractReplicaIdentity(relation, &tp, true, NULL, &old_key_copied) : NULL; + + /* + * If this is the first possibly-multixact-able operation in the current +@@ -3249,6 +3252,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, + Bitmapset *id_attrs; + Bitmapset *interesting_attrs; + Bitmapset *modified_attrs; ++ Bitmapset *logged_old_attrs; + ItemId lp; + HeapTupleData oldtup; + HeapTuple heaptup; +@@ -3419,6 +3423,12 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, + id_attrs, &oldtup, + newtup, &id_has_external); + ++ if (!IsCatalogRelationOid(relation->rd_id)) ++ logged_old_attrs = HeapDetermineLogOldColumns(relation); ++ else ++ /* No need to log old values for catalog tables */ ++ logged_old_attrs = NULL; ++ + /* + * If we're not updating any "key" column, we can grab a weaker lock type. + * This allows for more concurrency when we are running simultaneously +@@ -3692,6 +3702,7 @@ l2: + bms_free(key_attrs); + bms_free(id_attrs); + bms_free(modified_attrs); ++ bms_free(logged_old_attrs); + bms_free(interesting_attrs); + return result; + } +@@ -4041,6 +4052,7 @@ l2: + old_key_tuple = ExtractReplicaIdentity(relation, &oldtup, + bms_overlap(modified_attrs, id_attrs) || + id_has_external, ++ logged_old_attrs, + &old_key_copied); + + /* NO EREPORT(ERROR) from here till changes are logged */ +@@ -4207,6 +4219,7 @@ l2: + bms_free(key_attrs); + bms_free(id_attrs); + bms_free(modified_attrs); ++ bms_free(logged_old_attrs); + bms_free(interesting_attrs); + + return TM_Ok; +@@ -4379,6 +4392,26 @@ heap_attr_equals(TupleDesc tupdesc, int attrnum, Datum value1, Datum value2, + } + } + ++static Bitmapset * ++HeapDetermineLogOldColumns(Relation relation) ++{ ++ int attnum; ++ Bitmapset *logged_cols = NULL; ++ TupleDesc tupdesc = RelationGetDescr(relation); ++ AttributeOpts *aopt; ++ ++ for (attnum = 1; attnum <= tupdesc->natts; attnum++) ++ { ++ aopt = get_attribute_options(relation->rd_id, attnum); ++ if (aopt != NULL && aopt->log_old_value) ++ logged_cols = bms_add_member(logged_cols, ++ attnum - ++ FirstLowInvalidHeapAttributeNumber); ++ } ++ ++ return logged_cols; ++} ++ + /* + * Check which columns are being updated. + * +@@ -9132,6 +9165,7 @@ log_heap_new_cid(Relation relation, HeapTuple tup) + */ + static HeapTuple + ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required, ++ Bitmapset *logged_old_attrs, + bool *copy) + { + TupleDesc desc = RelationGetDescr(relation); +@@ -9164,13 +9198,16 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required, + } + + /* if the key isn't required and we're only logging the key, we're done */ +- if (!key_required) ++ if (!key_required && logged_old_attrs == NULL) + return NULL; + + /* find out the replica identity columns */ + idattrs = RelationGetIndexAttrBitmap(relation, + INDEX_ATTR_BITMAP_IDENTITY_KEY); + ++ /* merge the columns that are marked LOG_OLD_VALUE */ ++ idattrs = bms_union(idattrs, logged_old_attrs); ++ + /* + * If there's no defined replica identity columns, treat as !key_required. + * (This case should not be reachable from heap_update, since that should +diff --git a/src/include/utils/attoptcache.h b/src/include/utils/attoptcache.h +index f684a772af5..6c965fede13 100644 +--- a/src/include/utils/attoptcache.h ++++ b/src/include/utils/attoptcache.h +@@ -21,6 +21,8 @@ typedef struct AttributeOpts + int32 vl_len_; /* varlena header (do not touch directly!) */ + float8 n_distinct; + float8 n_distinct_inherited; ++ bool log_old_value; ++ Oid delta_apply_function; + } AttributeOpts; + + extern AttributeOpts *get_attribute_options(Oid attrelid, int attnum); diff --git a/patches/19/pg19-025-logical_commit_clock.diff b/patches/19/pg19-025-logical_commit_clock.diff new file mode 100644 index 00000000..f64e83cb --- /dev/null +++ b/patches/19/pg19-025-logical_commit_clock.diff @@ -0,0 +1,354 @@ +diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c +index b885513f765..62219cc44eb 100644 +--- a/src/backend/access/transam/xact.c ++++ b/src/backend/access/transam/xact.c +@@ -135,6 +135,15 @@ static TransactionId *ParallelCurrentXids; + */ + int MyXactFlags; + ++/* ++ * Spock: ++ * Incoming remote commit timestamp used in our monotonically increasing ++ * logical clock. This will be set by a logical replication apply worker ++ * to bump our local logical clock forward in case we receive a remote ++ * transaction that appears to have happened in the future. ++ */ ++TimestampTz remoteTransactionStopTimestamp = 0; ++ + /* + * transaction states - transaction state from server perspective + */ +@@ -371,6 +380,13 @@ static void ShowTransactionStateRec(const char *str, TransactionState s); + static const char *BlockStateAsString(TBlockState blockState); + static const char *TransStateAsString(TransState state); + ++/* ++ * Spock: ++ * Hook function to be called while holding the WAL insert spinlock ++ * to adjust commit timestamps via Lamport clock if needed. ++ */ ++static void EnsureMonotonicTransactionStopTimestamp(void *data); ++ + + /* ---------------------------------------------------------------- + * transaction state accessors +@@ -2198,6 +2214,13 @@ StartTransaction(void) + /* Mark xactStopTimestamp as unset. */ + xactStopTimestamp = 0; + ++ /* ++ * Spock: ++ * Reset the remoteTransactionStopTimestamp in case we are a ++ * replication apply worker. ++ */ ++ remoteTransactionStopTimestamp = 0; ++ + /* + * initialize other subsystems for new transaction + */ +@@ -2215,6 +2238,13 @@ StartTransaction(void) + if (TransactionTimeout > 0) + enable_timeout_after(TRANSACTION_TIMEOUT, TransactionTimeout); + ++ /* ++ * Spock: ++ * Reset XLogReserveInsertHook ++ */ ++ XLogReserveInsertHook = NULL; ++ XLogReserveInsertHookData = NULL; ++ + ShowTransactionState("StartTransaction"); + } + +@@ -5830,6 +5860,7 @@ XactLogCommitRecord(TimestampTz commit_time, + xl_xact_twophase xl_twophase; + xl_xact_origin xl_origin; + uint8 info; ++ XLogRecPtr result; + + Assert(CritSectionCount > 0); + +@@ -5973,7 +6004,20 @@ XactLogCommitRecord(TimestampTz commit_time, + /* we allow filtering by xacts */ + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + +- return XLogInsert(RM_XACT_ID, info); ++ /* ++ * Spock: ++ * Install our hook for the call to XLogInsert() so that we can ++ * modify the xactStopTimestamp and the xact_time of the xlrec ++ * while holding the lock that determines the commit-LSN to ensure ++ * the commit timestamps are monotonically increasing. ++ */ ++ XLogReserveInsertHook = EnsureMonotonicTransactionStopTimestamp; ++ XLogReserveInsertHookData = (void *)&xlrec; ++ result = XLogInsert(RM_XACT_ID, info); ++ XLogReserveInsertHook = NULL; ++ XLogReserveInsertHookData = NULL; ++ ++ return result; + } + + /* +@@ -6444,3 +6488,60 @@ xact_redo(XLogReaderState *record) + else + elog(PANIC, "xact_redo: unknown op code %u", info); + } ++ ++/* ++ * Spock: ++ * Hook function used in XactLogCommitRecord() to ensure that the ++ * commit timestamp is monotonically increasing in commit-LSN order. ++ */ ++static void ++EnsureMonotonicTransactionStopTimestamp(void *data) ++{ ++ xl_xact_commit *xlrec = (xl_xact_commit *)data; ++ TimestampTz logical_clock; ++ ++ logical_clock = XLogGetLastTransactionStopTimestamp(); ++ ++ if (remoteTransactionStopTimestamp != 0) ++ { ++ /* ++ * We are committing a replication apply worker transaction. ++ * In this case we only make sure that the logical clock is ++ * max(logical_clock, new xact_time, remote_xact_time). ++ * This is sufficient because the apply logic will use the ++ * tracked remote timestamp or the delta apply tracking data ++ * in the future, so no need to adjust the timestamp of the ++ * replication transaction itself. ++ */ ++ if (xlrec->xact_time > logical_clock) ++ logical_clock = xlrec->xact_time; ++ if (remoteTransactionStopTimestamp > logical_clock) ++ logical_clock = remoteTransactionStopTimestamp; ++ } ++ else ++ { ++ /* ++ * This is a local transaction. Make sure that the xact_time ++ * higher than any timestamp we have seen thus far. ++ * ++ * TODO: This is not postmaster restart safe. If the local ++ * system clock is further behind other nodes than it takes ++ * for the postmaster to restart (time between it stops ++ * accepting new transactions and time when it becomes ready ++ * to accept new transactions), local transactions will not ++ * be bumped into the future correctly. ++ */ ++ if (logical_clock >= xlrec->xact_time) ++ { ++ logical_clock++; ++ xlrec->xact_time = logical_clock; ++ xactStopTimestamp = logical_clock; ++ ++ XLogReserveInsertHookModifiedRecord = true; ++ } ++ else ++ logical_clock = xlrec->xact_time; ++ } ++ ++ XLogSetLastTransactionStopTimestamp(logical_clock); ++} +diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c +index 596d2ca5836..ee52552a79f 100644 +--- a/src/backend/access/transam/xlog.c ++++ b/src/backend/access/transam/xlog.c +@@ -156,6 +156,16 @@ int wal_segment_size = DEFAULT_XLOG_SEG_SIZE; + */ + int CheckPointSegments; + ++/* ++ * Spock: ++ * Hook used to ensure commit timestamps are monotonically increasing ++ * in commit-LSN order and a flag that the tells if the hook changed ++ * the record itself requiring to correct the CRC. ++ */ ++XLogReserveInsertHookType XLogReserveInsertHook = NULL; ++void *XLogReserveInsertHookData = NULL; ++bool XLogReserveInsertHookModifiedRecord = false; ++ + /* Estimated distance between checkpoints, in bytes */ + static double CheckPointDistanceEstimate = 0; + static double PrevCheckPointDistance = 0; +@@ -572,4 +572,12 @@ typedef struct XLogCtlData + slock_t info_lck; /* locks shared variables shown above */ ++ ++ /* ++ * Spock: ++ * This is our shared, logical clock that we use to force ++ * commit timestamps to be monotonically increasing in ++ * commit-LSN order. ++ */ ++ TimestampTz lastTransactionStopTimestamp; + } XLogCtlData; + + /* +@@ -701,6 +719,7 @@ static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch, + XLogRecData *rdata, + XLogRecPtr StartPos, XLogRecPtr EndPos, + TimeLineID tli); ++static void XLogRecordCorrectCRC(XLogRecData *rdata); + static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, + XLogRecPtr *EndPos, XLogRecPtr *PrevPtr); + static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, +@@ -779,6 +798,13 @@ XLogInsertRecord(XLogRecData *rdata, + if (!XLogInsertAllowed()) + elog(ERROR, "cannot make new WAL entries during recovery"); + ++ /* ++ * Spock: ++ * Make sure the flag telling that ReserveXLog...() modified the ++ * record is false at this point. ++ */ ++ XLogReserveInsertHookModifiedRecord = false; ++ + /* + * Given that we're not in recovery, InsertTimeLineID is set and can't + * change, so we can read it without a lock. +@@ -907,6 +933,17 @@ XLogInsertRecord(XLogRecData *rdata, + + if (inserted) + { ++ /* ++ * Spock: ++ * If our logical_clock hook modified the XLog Record, ++ * recalculate the CRC. ++ */ ++ if (XLogReserveInsertHookModifiedRecord) ++ { ++ XLogRecordCorrectCRC(rdata); ++ XLogReserveInsertHookModifiedRecord = false; ++ } ++ + /* + * Now that xl_prev has been filled in, calculate CRC of the record + * header. +@@ -1090,6 +1127,27 @@ XLogInsertRecord(XLogRecData *rdata, + return EndPos; + } + ++/* ++ * Spock: ++ * Function to recalculate the WAL Record's CRC in case it was ++ * altered to ensure a monotonically increasing commit timestamp ++ * in LSN order. ++ */ ++static void ++XLogRecordCorrectCRC(XLogRecData *rdata) ++{ ++ XLogRecData *rdt; ++ XLogRecord *rechdr = (XLogRecord *) rdata->data; ++ pg_crc32c rdata_crc; ++ ++ INIT_CRC32C(rdata_crc); ++ COMP_CRC32C(rdata_crc, ((char *) rechdr) + SizeOfXLogRecord, rdata->len - SizeOfXLogRecord); ++ for (rdt = rdata->next; rdt != NULL; rdt = rdt->next) ++ COMP_CRC32C(rdata_crc, rdt->data, rdt->len); ++ ++ rechdr->xl_crc = rdata_crc; ++} ++ + /* + * Reserves the right amount of space for a record of given size from the WAL. + * *StartPos is set to the beginning of the reserved section, *EndPos to +@@ -1134,6 +1192,13 @@ ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos, + */ + SpinLockAcquire(&Insert->insertpos_lck); + ++ /* ++ * Spock: ++ * If set call the XLogReserveInsertHook function ++ */ ++ if (XLogReserveInsertHook != NULL) ++ XLogReserveInsertHook(XLogReserveInsertHookData); ++ + startbytepos = Insert->CurrBytePos; + endbytepos = startbytepos + size; + prevbytepos = Insert->PrevBytePos; +@@ -1193,6 +1258,12 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr) + return false; + } + ++ /* Spock: ++ * If set call the XLogReserveInsertHook function ++ */ ++ if (XLogReserveInsertHook != NULL) ++ XLogReserveInsertHook(XLogReserveInsertHookData); ++ + endbytepos = startbytepos + size; + prevbytepos = Insert->PrevBytePos; + +@@ -9564,3 +9635,15 @@ SetWalWriterSleeping(bool sleeping) + XLogCtl->WalWriterSleeping = sleeping; + SpinLockRelease(&XLogCtl->info_lck); + } ++ ++extern TimestampTz ++XLogGetLastTransactionStopTimestamp(void) ++{ ++ return XLogCtl->lastTransactionStopTimestamp; ++} ++ ++extern void ++XLogSetLastTransactionStopTimestamp(TimestampTz ts) ++{ ++ XLogCtl->lastTransactionStopTimestamp = ts; ++} +diff --git a/src/include/access/xact.h b/src/include/access/xact.h +index b2bc10ee041..0ab9948b4cf 100644 +--- a/src/include/access/xact.h ++++ b/src/include/access/xact.h +@@ -95,6 +95,13 @@ extern PGDLLIMPORT bool bsysscan; + */ + extern PGDLLIMPORT int MyXactFlags; + ++/* ++ * Spock: ++ * Incoming remote commit timestamp used in our monotonically increasing ++ * logical clock. ++ */ ++extern PGDLLIMPORT TimestampTz remoteTransactionStopTimestamp; ++ + /* + * XACT_FLAGS_ACCESSEDTEMPNAMESPACE - set when a temporary object is accessed. + * We don't allow PREPARE TRANSACTION in that case. +diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h +index adddac6710e..99864217c5c 100644 +--- a/src/include/access/xlog.h ++++ b/src/include/access/xlog.h +@@ -199,6 +199,18 @@ typedef enum WALAvailability + struct XLogRecData; + struct XLogReaderState; + ++/* ++ * Spock: ++ * Hook called from inside of holding the lock that determines ++ * the LSN order of commit records. We use this to ensure that ++ * commit timestamps are monotonically increasing in their LSN ++ * order. ++ */ ++typedef void (*XLogReserveInsertHookType)(void *data); ++extern XLogReserveInsertHookType XLogReserveInsertHook; ++extern void *XLogReserveInsertHookData; ++extern bool XLogReserveInsertHookModifiedRecord; ++ + extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata, + XLogRecPtr fpw_lsn, + uint8 flags, +@@ -272,6 +284,14 @@ extern bool IsInstallXLogFileSegmentActive(void); + extern void ResetInstallXLogFileSegmentActive(void); + extern void XLogShutdownWalRcv(void); + ++/* ++ * Spock: ++ * Functions to access the last commit Lamport timestamp held in ++ * XLogCtl. ++ */ ++extern TimestampTz XLogGetLastTransactionStopTimestamp(void); ++extern void XLogSetLastTransactionStopTimestamp(TimestampTz tz); ++ + /* + * Routines to start, stop, and get status of a base backup. + */ diff --git a/patches/19/pg19-030-per-subtrans-commit-ts.diff b/patches/19/pg19-030-per-subtrans-commit-ts.diff new file mode 100644 index 00000000..e9e9ab53 --- /dev/null +++ b/patches/19/pg19-030-per-subtrans-commit-ts.diff @@ -0,0 +1,237 @@ +diff --git a/src/backend/access/rmgrdesc/committsdesc.c b/src/backend/access/rmgrdesc/committsdesc.c +index a6ab9dd78de..45b57d9dade 100644 +--- a/src/backend/access/rmgrdesc/committsdesc.c ++++ b/src/backend/access/rmgrdesc/committsdesc.c +@@ -37,6 +37,13 @@ commit_ts_desc(StringInfo buf, XLogReaderState *record) + appendStringInfo(buf, "pageno %" PRId64 ", oldestXid %u", + trunc->pageno, trunc->oldestXid); + } ++ else if (info == COMMIT_TS_SUBTRANS_TS) ++ { ++ SubTransactionCommitTsEntry *entry = (SubTransactionCommitTsEntry *)rec; ++ ++ appendStringInfo(buf, "xid %u, time "INT64_FORMAT", nodeid %d", ++ entry->xid, entry->time, entry->nodeid); ++ } + } + + const char * +@@ -48,6 +55,8 @@ commit_ts_identify(uint8 info) + return "ZEROPAGE"; + case COMMIT_TS_TRUNCATE: + return "TRUNCATE"; ++ case COMMIT_TS_SUBTRANS_TS: ++ return "SUBTRANS_TS"; + default: + return NULL; + } +diff --git a/src/backend/access/transam/commit_ts.c b/src/backend/access/transam/commit_ts.c +index 225ff7ca9f2..4235350db90 100644 +--- a/src/backend/access/transam/commit_ts.c ++++ b/src/backend/access/transam/commit_ts.c +@@ -97,0 +97,7 @@ static SlruDesc CommitTsSlruDesc; ++/* ++ * Data to override CommitTsData for individual subtransaction. ++ * This is needed for pgEdge Delta Apply CommitTs tracking. ++ */ ++static SubTransactionCommitTsEntry *sub_trans_commit_ts_data = NULL; ++static int sub_trans_commit_n_alloc = 0; ++static int sub_trans_commit_n_used = 0; +@@ -130,4 +130,6 @@ static void ActivateCommitTs(void); + static void DeactivateCommitTs(void); + static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid); ++static void WriteSubTransTsXlogRec(TransactionId xid, TimestampTz ts, ++ ReplOriginId nodeid); + + /* +@@ -212,6 +222,15 @@ TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, + if (TransactionIdPrecedes(TransamVariables->newestCommitTsXid, newestXact)) + TransamVariables->newestCommitTsXid = newestXact; + LWLockRelease(CommitTsLock); ++ ++ /* Cleanup subtransaction commit ts override data */ ++ if (sub_trans_commit_ts_data != NULL) ++ { ++ pfree(sub_trans_commit_ts_data); ++ sub_trans_commit_ts_data = NULL; ++ sub_trans_commit_n_used = 0; ++ sub_trans_commit_n_alloc = 0; ++ } + } + + /* +@@ -251,12 +270,24 @@ TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, + { + int entryno = TransactionIdToCTsEntry(xid); + CommitTimestampEntry entry; ++ int i; + + Assert(TransactionIdIsNormal(xid)); + + entry.time = ts; + entry.nodeid = nodeid; + ++ /* Override the time and nodeid if an individual entry was recorded */ ++ for (i = 0; i < sub_trans_commit_n_used; i++) ++ { ++ if (sub_trans_commit_ts_data[i].xid == xid) ++ { ++ entry.time = sub_trans_commit_ts_data[i].time; ++ entry.nodeid = sub_trans_commit_ts_data[i].nodeid; ++ break; ++ } ++ } ++ + memcpy(CommitTsCtl->shared->page_buffer[slotno] + + SizeOfCommitTimestampEntry * entryno, + &entry, SizeOfCommitTimestampEntry); +@@ -377,6 +408,60 @@ GetLatestCommitTsData(TimestampTz *ts, ReplOriginId *nodeid) + return xid; + } + ++/* ++ * Record a different CommitTsData entry for a given subtransaction ++ * ++ * pgEdge uses this in Spock to track the correct commit ts and origin ++ * in case a delta apply had to force an update to a row that would ++ * otherwise not be updated because last-update-wins found in favor of ++ * the existing local row. ++ */ ++void ++SubTransactionIdSetCommitTsData(TransactionId xid, TimestampTz ts, ++ ReplOriginId nodeid) ++{ ++ SubTransactionCommitTsEntry *ent; ++ ++ /* Ensure we have space in the tracking array */ ++ if (sub_trans_commit_n_used >= sub_trans_commit_n_alloc) ++ { ++ /* ++ * We allocate this in the top memory context. This could accumulate ++ * if transactions over and over record entries and then abort. ++ * Under the Spock apply worker that cannot happen as such error ++ * condition would restart the backend. ++ */ ++ MemoryContext oldcontext; ++ ++ oldcontext = MemoryContextSwitchTo(TopMemoryContext); ++ ++ if (sub_trans_commit_ts_data == NULL) ++ { ++ sub_trans_commit_n_alloc = 32; ++ sub_trans_commit_ts_data = (SubTransactionCommitTsEntry *) ++ palloc(sizeof(SubTransactionCommitTsEntry) * sub_trans_commit_n_alloc); ++ } ++ else ++ { ++ sub_trans_commit_n_alloc *= 2; ++ sub_trans_commit_ts_data = (SubTransactionCommitTsEntry *) ++ repalloc(sub_trans_commit_ts_data, ++ sizeof(SubTransactionCommitTsEntry) * sub_trans_commit_n_alloc); ++ } ++ ++ MemoryContextSwitchTo(oldcontext); ++ } ++ ++ ent = &sub_trans_commit_ts_data[sub_trans_commit_n_used++]; ++ ++ ent->xid = xid; ++ ent->time = ts; ++ ent->nodeid = nodeid; ++ ++ if (!RecoveryInProgress()) ++ WriteSubTransTsXlogRec(xid, ts, nodeid); ++} ++ + static void + error_commit_ts_disabled(void) + { +@@ -1016,6 +1101,23 @@ WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid) + (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE); + } + ++/* ++ * Write a SUBTRANS_TS xlog record ++ */ ++static void ++WriteSubTransTsXlogRec(TransactionId xid, TimestampTz time, ReplOriginId nodeid) ++{ ++ SubTransactionCommitTsEntry entry; ++ ++ entry.xid = xid; ++ entry.time = time; ++ entry.nodeid = nodeid; ++ ++ XLogBeginInsert(); ++ XLogRegisterData((char *) (&entry), sizeof(entry)); ++ (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SUBTRANS_TS); ++} ++ + /* + * CommitTS resource manager's routines + */ +@@ -1059,6 +1161,19 @@ commit_ts_redo(XLogReaderState *record) + + SimpleLruTruncate(CommitTsCtl, trunc->pageno); + } ++ else if (info == COMMIT_TS_SUBTRANS_TS) ++ { ++ SubTransactionCommitTsEntry entry; ++ ++ /* ++ * Redo of the commit record does also restore the commit_ts data, ++ * including for all subtransactions. We need to create the same ++ * override information as done during the original replication ++ * transaction on delta-apply. ++ */ ++ memcpy(&entry, XLogRecGetData(record), sizeof(entry)); ++ SubTransactionIdSetCommitTsData(entry.xid, entry.time, entry.nodeid); ++ } + else + elog(PANIC, "commit_ts_redo: unknown op code %u", info); + } +diff --git a/src/include/access/commit_ts.h b/src/include/access/commit_ts.h +index b8294e41b97..dacade2a216 100644 +--- a/src/include/access/commit_ts.h ++++ b/src/include/access/commit_ts.h +@@ -1,3 +1,4 @@ ++ + /* + * commit_ts.h + * +@@ -16,6 +17,21 @@ + #include "replication/origin.h" + #include "storage/sync.h" + ++/* ++ * pgEdge requires to override the CommitTimestampEntry for individual ++ * subtransactions in the case that delta-apply needs to override losing ++ * last-update-wins. spock_apply_heap will then perform this (rare) ++ * update in a subtransaction so that this individual row gets its own ++ * xmin. By overriding this xid's CommitTsData the row can retain its ++ * original CommitTsData and not assume that of the overall replication ++ * transaction. ++ */ ++typedef struct SubTransactionCommitTsEntry ++{ ++ TransactionId xid; ++ TimestampTz time; ++ ReplOriginId nodeid; ++} SubTransactionCommitTsEntry; + + extern PGDLLIMPORT bool track_commit_timestamp; + +@@ -43,4 +43,7 @@ extern bool TransactionIdGetCommitTsData(TransactionId xid, + extern TransactionId GetLatestCommitTsData(TimestampTz *ts, + ReplOriginId *nodeid); ++extern void SubTransactionIdSetCommitTsData(TransactionId xid, ++ TimestampTz ts, ++ ReplOriginId nodeid); + + extern void BootStrapCommitTs(void); +@@ -60,4 +60,5 @@ extern int committssyncfiletag(const FileTag *ftag, char *path); + #define COMMIT_TS_ZEROPAGE 0x00 + #define COMMIT_TS_TRUNCATE 0x10 ++#define COMMIT_TS_SUBTRANS_TS 0x20 + + typedef struct xl_commit_ts_truncate diff --git a/patches/19/pg19-035-row-filter-check.diff b/patches/19/pg19-035-row-filter-check.diff new file mode 100644 index 00000000..0d1fee03 --- /dev/null +++ b/patches/19/pg19-035-row-filter-check.diff @@ -0,0 +1,25 @@ +diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c +index 803c26ab216..a60f9f1d0d8 100644 +--- a/src/backend/commands/publicationcmds.c ++++ b/src/backend/commands/publicationcmds.c +@@ -683,7 +683,7 @@ check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate) + * + * See check_simple_rowfilter_expr_walker for details. + */ +-static bool ++bool + check_simple_rowfilter_expr(Node *node, ParseState *pstate) + { + return check_simple_rowfilter_expr_walker(node, pstate); +diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h +index f90cf1ef896..c7ce64dc7f2 100644 +--- a/src/include/commands/publicationcmds.h ++++ b/src/include/commands/publicationcmds.h +@@ -39,5 +39,7 @@ extern bool pub_contains_invalid_column(Oid pubid, Relation relation, + bool *invalid_column_list, + bool *invalid_gen_col); + extern void InvalidatePubRelSyncCache(Oid pubid, bool puballtables); ++extern bool check_simple_rowfilter_expr(Node *node, ParseState *pstate); ++ + + #endif /* PUBLICATIONCMDS_H */ diff --git a/patches/19/pg19-090-init_template_fix.diff b/patches/19/pg19-090-init_template_fix.diff new file mode 100644 index 00000000..912a36b2 --- /dev/null +++ b/patches/19/pg19-090-init_template_fix.diff @@ -0,0 +1,22 @@ +diff --git a/src/Makefile.global.in b/src/Makefile.global.in +index 8b1b357beaa..227004fcd36 100644 +--- a/src/Makefile.global.in ++++ b/src/Makefile.global.in +@@ -425,10 +425,15 @@ ld_library_path_var = LD_LIBRARY_PATH + # with_temp_install_extra is for individual ports to define if they + # need something more here. If not defined then the expansion does + # nothing. +-with_temp_install = \ ++ifdef NO_TEMP_INSTALL ++with_temp_install = ++else ++with_temp_install = INITDB_TEMPLATE='$(abs_top_builddir)'/tmp_install/initdb-template ++endif ++ ++with_temp_install += \ + PATH="$(abs_top_builddir)/tmp_install$(bindir):$(CURDIR):$$PATH" \ + $(call add_to_path,$(strip $(ld_library_path_var)),$(abs_top_builddir)/tmp_install$(libdir)) \ +- INITDB_TEMPLATE='$(abs_top_builddir)'/tmp_install/initdb-template \ + $(with_temp_install_extra) + + temp-install: | submake-generated-headers