From 98c878f096290918975500fbbc04c07211925322 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ma=C3=ABl=20DONNART?= Date: Wed, 27 May 2026 01:01:32 +0200 Subject: [PATCH 1/3] Support per-backend role and destination in replication engine Read destination bucket and role from each backends[] entry instead of the shared top-level fields, so a single source object can be replicated to multiple CRR destinations with their own role. Legacy entries without per-backend fields keep working via top-level fallback in ObjectQueueEntry's site-aware getters; MongoQueueProcessor's oplog path now matches every applicable rule and dedups backends per the design's (site, destination, role) rule. Issue: BB-762 --- .../mongoProcessor/MongoQueueProcessor.js | 90 +++------ .../queueProcessor/QueueProcessor.js | 8 +- .../replication/tasks/MultipleBackendTask.js | 16 +- .../replication/tasks/ReplicateObject.js | 33 +++- .../utils/getLocationsFromStorageClass.js | 10 - lib/models/ObjectQueueEntry.js | 3 +- .../unit/lib/models/ObjectQueueEntry.spec.js | Bin 2339 -> 6645 bytes .../MongoQueueProcessor.spec.js | 187 ++++++++++++++++++ .../unit/replication/ReplicateObject.spec.js | 119 +++++++++++ .../getLocationsFromStorageClass.js | 23 --- 10 files changed, 370 insertions(+), 119 deletions(-) delete mode 100644 extensions/replication/utils/getLocationsFromStorageClass.js create mode 100644 tests/unit/mongoProcessor/MongoQueueProcessor.spec.js delete mode 100644 tests/unit/replication/getLocationsFromStorageClass.js diff --git a/extensions/mongoProcessor/MongoQueueProcessor.js b/extensions/mongoProcessor/MongoQueueProcessor.js index 6a3cff545..cc2843f07 100644 --- a/extensions/mongoProcessor/MongoQueueProcessor.js +++ b/extensions/mongoProcessor/MongoQueueProcessor.js @@ -7,7 +7,7 @@ const errors = require('arsenal').errors; const { replicationBackends, emptyFileMd5 } = require('arsenal').constants; const MongoClient = require('arsenal').storage .metadata.mongoclient.MongoClientInterface; -const { 
ObjectMD } = require('arsenal').models; +const { ObjectMD, ReplicationConfiguration } = require('arsenal').models; const { VersionID } = require('arsenal').versioning; const { extractVersionId } = require('../../lib/util/versioning'); @@ -234,24 +234,6 @@ class MongoQueueProcessor { }); } - /** - * get dataStoreVersionId, if exists - * @param {ObjectMDData} objMd - object md fetched from mongo - * @param {String} site - storage location name - * @return {String} dataStoreVersionId - */ - _getDataStoreVersionId(objMd, site) { - let dataStoreVersionId = ''; - if (objMd.replicationInfo && objMd.replicationInfo.backends) { - const backend = objMd.replicationInfo.backends - .find(l => l.site === site); - if (backend && backend.dataStoreVersionId) { - dataStoreVersionId = backend.dataStoreVersionId; - } - } - return dataStoreVersionId; - } - /** * Update ingested entry metadata fields: owner-id, owner-display-name * @param {ObjectQueueEntry} entry - object queue entry object @@ -340,53 +322,33 @@ class MongoQueueProcessor { const objectMDModel = new ObjectMD(); entry.setReplicationInfo(objectMDModel.getReplicationInfo()); - // TODO: refactor based off cloudserver getReplicationInfo - if (bucketRepInfo) { - const { role, destination, rules } = bucketRepInfo; - const rule = rules.find(r => - (entry.getObjectKey().startsWith(r.prefix) && r.enabled)); - - if (rule) { - const replicationInfo = {}; - const storageTypes = []; - const backends = []; - const storageClasses = rule.storageClass.split(','); - - storageClasses.forEach(storageClass => { - const storageClassName = - storageClass.endsWith(':preferred_read') ? 
- storageClass.split(':')[0] : storageClass; - const location = this._bootstrapList.find(l => - (l.site === storageClassName)); - if (location && replicationBackends[location.type]) { - storageTypes.push(location.type); - } - let dataStoreVersionId = ''; - if (zenkoObjMd) { - dataStoreVersionId = this._getDataStoreVersionId( - zenkoObjMd, storageClassName); - } - backends.push({ - site: storageClassName, - status: 'PENDING', - dataStoreVersionId, - }); - }); + if (!bucketRepInfo) { + return; + } - // save updated replication info - replicationInfo.status = 'PENDING'; - replicationInfo.backends = backends; - replicationInfo.content = content; - replicationInfo.destination = destination; - replicationInfo.storageClass = storageClasses.join(','); - replicationInfo.role = role; - replicationInfo.storageType = storageTypes.join(','); - replicationInfo.isNFS = bucketInfo.isNFS(); - - // apply changes - entry.setReplicationInfo(replicationInfo); - } + const isCloud = site => { + const location = this._bootstrapList.find(l => l.site === site); + return !!(location && replicationBackends[location.type]); + }; + + const backends = ReplicationConfiguration.resolveBackends( + bucketRepInfo, + entry.getObjectKey(), + isCloud, + zenkoObjMd?.replicationInfo?.backends, + ); + + if (backends.length === 0) { + return; } + + entry.setReplicationInfo({ + status: 'PENDING', + backends, + content, + role: ReplicationConfiguration.resolveSourceRole(bucketRepInfo.role), + isNFS: bucketInfo.isNFS(), + }); } /** diff --git a/extensions/replication/queueProcessor/QueueProcessor.js b/extensions/replication/queueProcessor/QueueProcessor.js index 0ad73aae2..9ea5e574a 100644 --- a/extensions/replication/queueProcessor/QueueProcessor.js +++ b/extensions/replication/queueProcessor/QueueProcessor.js @@ -18,8 +18,6 @@ const QueueEntry = require('../../../lib/models/QueueEntry'); const TaskScheduler = require('../../../lib/tasks/TaskScheduler'); const { getTaskSchedulerQueueKey, 
getTaskSchedulerDedupeKey } = require('./taskSchedulerHelpers'); -const getLocationsFromStorageClass = - require('../utils/getLocationsFromStorageClass'); const ReplicateObject = require('../tasks/ReplicateObject'); const MultipleBackendTask = require('../tasks/MultipleBackendTask'); const CopyLocationTask = require('../tasks/CopyLocationTask'); @@ -881,9 +879,9 @@ class QueueProcessor extends EventEmitter { } // ignore bucket entry if echo mode disabled } else if (sourceEntry instanceof ObjectQueueEntry) { - const replicationStorageClass = - sourceEntry.getReplicationStorageClass(); - const sites = getLocationsFromStorageClass(replicationStorageClass); + const sites = sourceEntry.getReplicationBackends() + .filter(b => b.status === 'PENDING') + .map(b => b.site); if (sites.includes(this.site)) { if (this.destConfig.replicationEndpoint && replicationBackends.includes(this.destConfig.replicationEndpoint.type)) { diff --git a/extensions/replication/tasks/MultipleBackendTask.js b/extensions/replication/tasks/MultipleBackendTask.js index f0a427ccb..d27cf23dd 100644 --- a/extensions/replication/tasks/MultipleBackendTask.js +++ b/extensions/replication/tasks/MultipleBackendTask.js @@ -317,7 +317,7 @@ class MultipleBackendTask extends ReplicateObject { const command = new MultipleBackendAbortMPUCommand({ Bucket: sourceEntry.getBucket(), Key: sourceEntry.getObjectKey(), - StorageType: sourceEntry.getReplicationStorageType(), + StorageType: this._getReplicationEndpointType(), StorageClass: this.site, UploadId: uploadId, RequestUids: log.getSerializedUids(), @@ -353,7 +353,7 @@ class MultipleBackendTask extends ReplicateObject { const command = new MultipleBackendCompleteMPUCommand({ Bucket: sourceEntry.getBucket(), Key: sourceEntry.getObjectKey(), - StorageType: sourceEntry.getReplicationStorageType(), + StorageType: this._getReplicationEndpointType(), StorageClass: this.site, VersionId: sourceEntry.getEncodedVersionId() || 'null', UserMetaData: 
sourceEntry.getUserMetadata(), @@ -445,7 +445,7 @@ class MultipleBackendTask extends ReplicateObject { const command = new MultipleBackendPutMPUPartCommand({ Bucket: sourceEntry.getBucket(), Key: sourceEntry.getObjectKey(), - StorageType: sourceEntry.getReplicationStorageType(), + StorageType: this._getReplicationEndpointType(), StorageClass: this.site, PartNumber: partNumber, UploadId: uploadId, @@ -506,7 +506,7 @@ class MultipleBackendTask extends ReplicateObject { const command = new MultipleBackendInitiateMPUCommand({ Bucket: sourceEntry.getBucket(), Key: sourceEntry.getObjectKey(), - StorageType: sourceEntry.getReplicationStorageType(), + StorageType: this._getReplicationEndpointType(), StorageClass: this.site, VersionId: sourceEntry.getEncodedVersionId() || 'null', UserMetaData: sourceEntry.getUserMetadata(), @@ -951,7 +951,7 @@ class MultipleBackendTask extends ReplicateObject { Key: sourceEntry.getObjectKey(), CanonicalID: sourceEntry.getOwnerId(), ContentMD5: sourceEntry.getContentMd5(), - StorageType: sourceEntry.getReplicationStorageType(), + StorageType: this._getReplicationEndpointType(), StorageClass: this.site, VersionId: sourceEntry.getEncodedVersionId() || 'null', UserMetaData: sourceEntry.getUserMetadata(), @@ -1008,7 +1008,7 @@ class MultipleBackendTask extends ReplicateObject { const command = new MultipleBackendPutObjectTaggingCommand({ Bucket: sourceEntry.getBucket(), Key: sourceEntry.getObjectKey(), - StorageType: sourceEntry.getReplicationStorageType(), + StorageType: this._getReplicationEndpointType(), StorageClass: this.site, DataStoreVersionId: sourceEntry.getReplicationSiteDataStoreVersionId(this.site), @@ -1065,7 +1065,7 @@ class MultipleBackendTask extends ReplicateObject { const command = new MultipleBackendDeleteObjectTaggingCommand({ Bucket: sourceEntry.getBucket(), Key: sourceEntry.getObjectKey(), - StorageType: sourceEntry.getReplicationStorageType(), + StorageType: this._getReplicationEndpointType(), StorageClass: this.site, 
DataStoreVersionId: sourceEntry.getReplicationSiteDataStoreVersionId(this.site), @@ -1146,7 +1146,7 @@ class MultipleBackendTask extends ReplicateObject { const command = new MultipleBackendDeleteObjectCommand({ Bucket: sourceEntry.getBucket(), Key: sourceEntry.getObjectKey(), - StorageType: sourceEntry.getReplicationStorageType(), + StorageType: this._getReplicationEndpointType(), StorageClass: this.site, RequestUids: log.getSerializedUids(), }); diff --git a/extensions/replication/tasks/ReplicateObject.js b/extensions/replication/tasks/ReplicateObject.js index 34a1d2496..226d7e398 100644 --- a/extensions/replication/tasks/ReplicateObject.js +++ b/extensions/replication/tasks/ReplicateObject.js @@ -4,6 +4,7 @@ const { S3Client, GetBucketReplicationCommand, GetObjectCommand } = require('@aw const errors = require('arsenal').errors; const jsutil = require('arsenal').jsutil; const ObjectMDLocation = require('arsenal').models.ObjectMDLocation; +const ReplicationConfiguration = require('arsenal').models.ReplicationConfiguration; const ClientManager = require('../../../lib/clients/ClientManager'); const BackbeatMetadataProxy = require('../../../lib/BackbeatMetadataProxy'); @@ -227,7 +228,7 @@ class ReplicateObject extends BackbeatTask { _setupRolesOnce(entry, log, cb) { log.debug('getting bucket replication', { entry: entry.getLogInfo() }); - const entryRolesString = entry.getReplicationRoles(); + const entryRolesString = entry.getReplicationRoles(this.site); let entryRoles; if (entryRolesString !== undefined) { entryRoles = entryRolesString.split(','); @@ -266,9 +267,9 @@ class ReplicateObject extends BackbeatTask { 'replication disabled for object')); } const roles = data.ReplicationConfiguration.Role.split(','); - if (roles.length !== 2) { - log.error('expecting two roles separated by a ' + - 'comma in bucket replication configuration', + if (roles.length < 1 || roles.length > 2) { + log.error('expecting one or two roles in bucket ' + + 'replication configuration', { 
method: 'ReplicateObject._setupRolesOnce', entry: entry.getLogInfo(), @@ -287,18 +288,36 @@ class ReplicateObject extends BackbeatTask { }); return cb(errors.BadRole); } - if (roles[1] !== entryRoles[1]) { + // Multi-destination CRR: derive the expected destination + // role for this site from the matching rule's Account + // override; legacy configs without Account fall back to + // the literal two-comma role equality check. + const matchingRule = data.ReplicationConfiguration.Rules.find( + rule => rule.Status === 'Enabled' && + entry.getObjectKey().startsWith(rule.Prefix) && + rule.Destination && + rule.Destination.StorageClass === this.site); + let expectedDestRole; + if (matchingRule && matchingRule.Destination.Account) { + expectedDestRole = ReplicationConfiguration + .resolveDestinationRole( + data.ReplicationConfiguration.Role, + matchingRule.Destination.Account); + } else { + expectedDestRole = roles[1]; + } + if (expectedDestRole !== entryRoles[1]) { log.error('role in replication entry for target does ' + 'not match role in bucket replication configuration ', { method: 'ReplicateObject._setupRolesOnce', entry: entry.getLogInfo(), entryRole: entryRoles[1], - bucketRole: roles[1], + bucketRole: expectedDestRole, }); return cb(errors.BadRole); } - return cb(null, roles[0], roles[1]); + return cb(null, entryRoles[0], entryRoles[1]); }) .catch(err => { // eslint-disable-next-line no-param-reassign diff --git a/extensions/replication/utils/getLocationsFromStorageClass.js b/extensions/replication/utils/getLocationsFromStorageClass.js deleted file mode 100644 index c8f06d7a3..000000000 --- a/extensions/replication/utils/getLocationsFromStorageClass.js +++ /dev/null @@ -1,10 +0,0 @@ -function getLocationsFromStorageClass(replicationStorageClass) { - return replicationStorageClass.split(',').map(s => { - if (s.endsWith(':preferred_read')) { - return s.split(':')[0]; - } - return s; - }); -} - -module.exports = getLocationsFromStorageClass; diff --git 
a/lib/models/ObjectQueueEntry.js b/lib/models/ObjectQueueEntry.js index 550e9893b..6cf8fa95e 100644 --- a/lib/models/ObjectQueueEntry.js +++ b/lib/models/ObjectQueueEntry.js @@ -183,7 +183,7 @@ class ObjectQueueEntry extends ObjectMD { const newEntry = this.clone(); newEntry .setAccountId(this.getAccountId()) - .setBucket(this.getReplicationTargetBucket()) + .setBucket(this.getReplicationTargetBucket(site)) .setReplicationSiteStatus(site, 'REPLICA') .setReplicationStatus('REPLICA'); return newEntry; @@ -218,7 +218,6 @@ class ObjectQueueEntry extends ObjectMD { .setAccountId(this.getAccountId()) .setReplicationBackends(this.getReplicationBackends().filter(o => o.site === site)) .setReplicationSiteStatus(site, 'PENDING') - .setReplicationStorageClass(site) .setReplicationStatus('PENDING'); } diff --git a/tests/unit/lib/models/ObjectQueueEntry.spec.js b/tests/unit/lib/models/ObjectQueueEntry.spec.js index 3900eea5b8641d538065da098f2e9a008aabe5b8..08d05ca1fb4d68d29f76078a5ae36a77ea0e7f2c 100644 GIT binary patch literal 6645 zcmeHLZExE)5Z-713VxG}#g3hHU8|u3Vk9lnZb_E}+lL?+Xo<45$fQeB35>x1eRrf@ zsF!$gngH7xA&pa$JRP6sx#Jx}!IdUdDJC^}MKi{V)+ZZ zVJWzKx9*d>9)U+Pok>nOyCuy{``$DIBih>`-Zfi&_x8;XqtWQi>+$}pS3i#5n&<4p zXf!5d1aquCwCOFPf#G9t!=wV=ynyWOnAe$M2f8tjQf=(oUSz0nF@&a z`RL^E_~e6ES;Ar9STn8za_Jo&TpXaYzaCv2n7^;;QzNEy!VPm^j!GWTTNSANAP7=6 zqv2|RGaB}#NSQM`p^G3GkH-OizpG@pQ`j*hdwY9!Bx)Hdi70_sSDf^%r)v=Hy4M3Y zLYYh<8k(HI_WVA1_j8VRHF3%0KqZ>F@+32+H89Xvgs3Ij_hOo+ilAE{4OGVaA6|#9ywl?yT>K?+!6E}AjS{muT40KsHW#yNcOuG z)Nus!J+iLzdj;rOoA$R&B|`<#7wV%g8BP7#ILYGoaC&N7N|?f6ukW3f@K}a0!7DbCk9N!*xWFnjc_Sec7;tXIGW_) z!kQE&+eqQD?oYRHe}8|!itVQR&#k=@Zbx_s+0$u!;-SX#_e}9@LHqc$1b$!UK+++6 zWx{9QX685p092JSN(3#Aa5B~oiw7Pfo0tg2Vks0;M2Sp>ko}Q?@9)h#IQ)`v3O3;L zCFAht)-0m^*evDvhUw|i@l;FQo8pO!0UZ8C;Z#(|Rc&-b(+r{w>e?yPk&BmD%46wS zj=v2x*Vl{cE7Vl0V>YQti@d5lGDR^Iwu_}L3r(r+R9D2FyDxgm^@Jb)qugvSBJ#fX z02*-HcGUGz)(utVg)r3CPByNE99du6k#`Fxu;-cB7RRak=))~cx-GOSN8CeML4xG- z&KCC$ouY}14%uTLrRE|_BWO7_2~$R8M;mC%OlY{|3X0ASI~9Tka)7V8bQy+{QO({< 
zvG`Qn)2#d)pz?mF0jZ^<-H?{V)O*QuXlf5z-7D-ZZX#XqT!2y*hz9>rP11M za;JT~ws+}(ft(_`_&96{o~}s3!!(N+iD{Cu$TaxS0c-eQ*JT7+u)BX!j*KddzMcij zNpZv0AEu1Y^n8bm9Y}Cf+b~m^yBu8&@qB=8tgey*@#3=zrY<3O1cO79T|p>mUn`PZ zxIYw`CIZTwIinGY6P89#!`xk|83YI)Q&I)w)naDY(sqDe$|)%DPb)6kr!FSWbPb)CBIZDG!;5J8=AGF&HDAX|$Of zI+Ro{{=`;3rf}Sk^G?_CEr|a%Sx0sYxxY7RZ!40-LZGs!gqz2PN^pJiJC^_)|Ia9= ja&{Oowmf(4PfO9A=DERu1$W6p*y|$zr$d7X1^)XR|LAe1 delta 17 ZcmexryjW;L*2L`-C(AG!ZxZ^-4gg3k2UY+8 diff --git a/tests/unit/mongoProcessor/MongoQueueProcessor.spec.js b/tests/unit/mongoProcessor/MongoQueueProcessor.spec.js new file mode 100644 index 000000000..92d520de2 --- /dev/null +++ b/tests/unit/mongoProcessor/MongoQueueProcessor.spec.js @@ -0,0 +1,187 @@ +const assert = require('assert'); + +const MongoQueueProcessor = + require('../../../extensions/mongoProcessor/MongoQueueProcessor'); +const ObjectQueueEntry = + require('../../../lib/models/ObjectQueueEntry'); + +function _makeProcessor(bootstrapList) { + const proc = Object.create(MongoQueueProcessor.prototype); + proc._bootstrapList = bootstrapList; + return proc; +} + +function _makeEntry(objectKey = 'docs/2024/report.pdf') { + return new ObjectQueueEntry('bucket', `${objectKey} v`, { + 'md-model-version': 2, + 'replicationInfo': { + status: '', + backends: [], + content: [], + destination: '', + storageClass: '', + role: '', + storageType: '', + }, + }); +} + +function _makeBucketInfo(repCfg) { + return { + getReplicationConfiguration: () => repCfg, + isNFS: () => false, + }; +} + +describe('MongoQueueProcessor._updateReplicationInfo', () => { + it('builds per-backend destination/role from per-rule fields', () => { + const proc = _makeProcessor([ + { site: 'crr-a', type: 'scality' }, + { site: 'crr-b', type: 'scality' }, + ]); + const entry = _makeEntry(); + const bucketInfo = _makeBucketInfo({ + role: 'arn:aws:iam::111:role/src,arn:aws:iam::000:role/repRule', + destination: 'arn:aws:s3:::legacy-bucket', + rules: [{ + id: 'r-a', prefix: '', enabled: true, priority: 
1, + storageClass: 'crr-a', + destination: 'arn:aws:s3:::bucket-a', + account: '222', + }, { + id: 'r-b', prefix: '', enabled: true, priority: 2, + storageClass: 'crr-b', + destination: 'arn:aws:s3:::bucket-b', + account: '333', + }], + }); + + proc._updateReplicationInfo(entry, bucketInfo, ['DATA', 'METADATA']); + + const info = entry.getReplicationInfo(); + assert.strictEqual(info.status, 'PENDING'); + assert.strictEqual(info.role, 'arn:aws:iam::111:role/src'); + assert.strictEqual(info.destination, undefined); + const backendsBySite = Object.fromEntries( + info.backends.map(b => [b.site, b])); + assert.deepStrictEqual( + info.backends.map(b => b.site).sort(), ['crr-a', 'crr-b']); + assert.strictEqual(backendsBySite['crr-a'].destination, + 'arn:aws:s3:::bucket-a'); + assert.strictEqual(backendsBySite['crr-a'].role, + 'arn:aws:iam::222:role/repRule'); + assert.strictEqual(backendsBySite['crr-b'].destination, + 'arn:aws:s3:::bucket-b'); + assert.strictEqual(backendsBySite['crr-b'].role, + 'arn:aws:iam::333:role/repRule'); + }); + + it('derives per-rule role via account substitution when absent', () => { + const proc = _makeProcessor([{ site: 'crr-a', type: 'scality' }]); + const entry = _makeEntry(); + const bucketInfo = _makeBucketInfo({ + role: 'arn:aws:iam::111:role/src,arn:aws:iam::000:role/repRule', + rules: [{ + id: 'r-a', prefix: '', enabled: true, + storageClass: 'crr-a', + destination: 'arn:aws:s3:::bucket-a', + account: '222', + }], + }); + + proc._updateReplicationInfo(entry, bucketInfo, ['DATA']); + + const [backend] = entry.getReplicationInfo().backends; + assert.strictEqual(backend.role, 'arn:aws:iam::222:role/repRule'); + }); + + it('dedups CRR backends on (site, destination, role)', () => { + const proc = _makeProcessor([{ site: 'crr-a', type: 'scality' }]); + const entry = _makeEntry(); + const bucketInfo = _makeBucketInfo({ + role: 'arn:aws:iam::111:role/src,arn:aws:iam::000:role/x', + rules: [{ + id: 'low', prefix: '', enabled: true, priority: 1, 
+ storageClass: 'crr-a', + destination: 'arn:aws:s3:::bucket-a', + account: '222', + }, { + id: 'high', prefix: 'docs', enabled: true, priority: 10, + storageClass: 'crr-a', + destination: 'arn:aws:s3:::bucket-a', + account: '222', + }], + }); + + proc._updateReplicationInfo(entry, bucketInfo, ['DATA']); + + const backends = entry.getReplicationInfo().backends; + assert.strictEqual(backends.length, 1); + assert.strictEqual(backends[0].role, 'arn:aws:iam::222:role/x'); + }); + + it('omits destination/role on cloud backends', () => { + const proc = _makeProcessor([{ site: 'cloud-a', type: 'aws_s3' }]); + const entry = _makeEntry(); + const bucketInfo = _makeBucketInfo({ + role: 'arn:aws:iam::111:role/src', + rules: [{ + id: 'r-a', prefix: '', enabled: true, + storageClass: 'cloud-a', + destination: 'arn:aws:s3:::ignored', + account: '222', + }], + }); + + proc._updateReplicationInfo(entry, bucketInfo, ['DATA']); + + const [backend] = entry.getReplicationInfo().backends; + assert.strictEqual(backend.site, 'cloud-a'); + assert.strictEqual(backend.destination, undefined); + assert.strictEqual(backend.role, undefined); + }); + + it('handles legacy V1 comma-separated storageClass form', () => { + const proc = _makeProcessor([ + { site: 'crr-a', type: 'scality' }, + { site: 'crr-b', type: 'scality' }, + ]); + const entry = _makeEntry(); + const bucketInfo = _makeBucketInfo({ + role: 'arn:aws:iam::111:role/src,arn:aws:iam::222:role/dst', + destination: 'arn:aws:s3:::legacy-bucket', + rules: [{ + id: 'r1', prefix: '', enabled: true, + storageClass: 'crr-a,crr-b', + }], + }); + + proc._updateReplicationInfo(entry, bucketInfo, ['DATA']); + + const sites = entry.getReplicationInfo().backends.map(b => b.site); + assert.deepStrictEqual(sites.sort(), ['crr-a', 'crr-b']); + // Each CRR backend falls back to top-level destination/role. 
+ entry.getReplicationInfo().backends.forEach(b => { + assert.strictEqual(b.destination, 'arn:aws:s3:::legacy-bucket'); + assert.strictEqual(b.role, 'arn:aws:iam::222:role/dst'); + }); + }); + + it('produces no backends when no rule matches', () => { + const proc = _makeProcessor([{ site: 'crr-a', type: 'scality' }]); + const entry = _makeEntry('other/path'); + const bucketInfo = _makeBucketInfo({ + role: 'arn:aws:iam::111:role/src', + rules: [{ + id: 'r-a', prefix: 'docs', enabled: true, + storageClass: 'crr-a', + destination: 'arn:aws:s3:::bucket-a', + account: '222', + }], + }); + + proc._updateReplicationInfo(entry, bucketInfo, ['DATA']); + + assert.deepStrictEqual(entry.getReplicationInfo().backends, []); + }); +}); diff --git a/tests/unit/replication/ReplicateObject.spec.js b/tests/unit/replication/ReplicateObject.spec.js index 038afe021..1cef402d4 100644 --- a/tests/unit/replication/ReplicateObject.spec.js +++ b/tests/unit/replication/ReplicateObject.spec.js @@ -132,6 +132,125 @@ describe('ReplicateObject', () => { }); }); + describe('_setupRolesOnce', () => { + const ObjectQueueEntry = + require('../../../lib/models/ObjectQueueEntry'); + + function _makeEntry(backends) { + return new ObjectQueueEntry('source-bucket', 'k v', { + 'md-model-version': 2, + 'replicationInfo': { + status: 'PENDING', + content: ['DATA', 'METADATA'], + destination: '', + role: 'arn:aws:iam::111:role/src', + backends, + }, + }); + } + + it('validates per-backend role via account substitution', done => { + task.site = 'site'; + sinon.stub(task, '_setupSourceClients').returns(); + task.S3source = { + send: () => Promise.resolve({ + ReplicationConfiguration: { + Role: 'arn:aws:iam::111:role/src,arn:aws:iam::000:role/repRule', + Rules: [{ + Status: 'Enabled', + Prefix: '', + Destination: { + Bucket: 'arn:aws:s3:::bucket-a', + StorageClass: 'site', + Account: '222', + }, + }], + }, + }), + }; + const entry = _makeEntry([{ + site: 'site', status: 'PENDING', dataStoreVersionId: '', + 
destination: 'arn:aws:s3:::bucket-a', + role: 'arn:aws:iam::222:role/repRule', + }]); + task._setupRolesOnce(entry, fakeLogger, (err, src, dst) => { + assert.ifError(err); + assert.strictEqual(src, 'arn:aws:iam::111:role/src'); + assert.strictEqual(dst, 'arn:aws:iam::222:role/repRule'); + done(); + }); + }); + + it('rejects when per-backend role does not match substituted role', done => { + task.site = 'site'; + sinon.stub(task, '_setupSourceClients').returns(); + task.S3source = { + send: () => Promise.resolve({ + ReplicationConfiguration: { + Role: 'arn:aws:iam::111:role/src,arn:aws:iam::000:role/repRule', + Rules: [{ + Status: 'Enabled', + Prefix: '', + Destination: { + Bucket: 'arn:aws:s3:::bucket-a', + StorageClass: 'site', + Account: '222', + }, + }], + }, + }), + }; + const entry = _makeEntry([{ + site: 'site', status: 'PENDING', dataStoreVersionId: '', + destination: 'arn:aws:s3:::bucket-a', + role: 'arn:aws:iam::999:role/repRule', + }]); + task._setupRolesOnce(entry, fakeLogger, err => { + assert(err); + assert.strictEqual(err.is.BadRole, true); + done(); + }); + }); + + it('falls back to literal compare for legacy configs without Account', done => { + task.site = 'site'; + sinon.stub(task, '_setupSourceClients').returns(); + task.S3source = { + send: () => Promise.resolve({ + ReplicationConfiguration: { + Role: 'arn:aws:iam::111:role/src,arn:aws:iam::222:role/legacy', + Rules: [{ + Status: 'Enabled', + Prefix: '', + Destination: { + Bucket: 'arn:aws:s3:::bucket-a', + StorageClass: 'site', + }, + }], + }, + }), + }; + const entry = new ObjectQueueEntry('source-bucket', 'k v', { + 'md-model-version': 2, + 'replicationInfo': { + status: 'PENDING', + content: ['DATA', 'METADATA'], + destination: 'arn:aws:s3:::bucket-a', + role: 'arn:aws:iam::111:role/src,arn:aws:iam::222:role/legacy', + backends: [{ + site: 'site', status: 'PENDING', dataStoreVersionId: '', + }], + }, + }); + task._setupRolesOnce(entry, fakeLogger, (err, src, dst) => { + assert.ifError(err); 
+ assert.strictEqual(src, 'arn:aws:iam::111:role/src'); + assert.strictEqual(dst, 'arn:aws:iam::222:role/legacy'); + done(); + }); + }); + }); + describe('_setupDestClients', () => { it('should setup destination client with proper creds when using assumeRole', () => { sinon.stub(ClientManager.prototype, 'initCredentialsManager').returns(null); diff --git a/tests/unit/replication/getLocationsFromStorageClass.js b/tests/unit/replication/getLocationsFromStorageClass.js deleted file mode 100644 index 5d45249ed..000000000 --- a/tests/unit/replication/getLocationsFromStorageClass.js +++ /dev/null @@ -1,23 +0,0 @@ -const assert = require('assert'); -const getLocationsFromStorageClass = -require('../../../extensions/replication/utils/getLocationsFromStorageClass'); - -describe('getLocationsFromStorageClass', () => { - it('should return correct locations if preferred read location is not ' + - 'specified inside the location storage class', - () => { - const replicationStorageClass = 'awslocation,gcplocation'; - const locations = getLocationsFromStorageClass(replicationStorageClass); - const expectedLocations = ['awslocation', 'gcplocation']; - assert.deepStrictEqual(locations, expectedLocations); - }); - it('should return correct locations if preferred read location is ' + - 'specified inside the location storage class', - () => { - const replicationStorageClass = - 'awslocation,gcplocation:preferred_read'; - const locations = getLocationsFromStorageClass(replicationStorageClass); - const expectedLocations = ['awslocation', 'gcplocation']; - assert.deepStrictEqual(locations, expectedLocations); - }); -}); From 83c75ef6b534778f95b14b016dc533ae45ba578e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ma=C3=ABl=20DONNART?= Date: Fri, 29 May 2026 15:05:19 +0200 Subject: [PATCH 2/3] Bump arsenal dependency Issue: BB-762 --- package.json | 2 +- yarn.lock | 22 ++++++++++++++++++---- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/package.json b/package.json index 
5d1b941d3..8a22f158a 100644 --- a/package.json +++ b/package.json @@ -54,7 +54,7 @@ "@scality/cloudserverclient": "^1.0.8", "@smithy/node-http-handler": "^3.3.3", "JSONStream": "^1.3.5", - "arsenal": "git+https://github.com/scality/arsenal#8.3.9", + "arsenal": "git+https://github.com/scality/arsenal#39c1a642d77436d871305435dcf244a64423a4b8", "async": "^2.3.0", "backo": "^1.1.0", "breakbeat": "scality/breakbeat#v1.0.3", diff --git a/yarn.lock b/yarn.lock index aa740f23e..34d39c5e3 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2738,6 +2738,11 @@ resolved "https://registry.yarnpkg.com/@opentelemetry/api/-/api-1.9.0.tgz#d03eba68273dc0f7509e2a3d5cba21eae10379fe" integrity sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg== +"@opentelemetry/api@^1.9.0": + version "1.9.1" + resolved "https://registry.yarnpkg.com/@opentelemetry/api/-/api-1.9.1.tgz#c1b0346de336ba55af2d5a7970882037baedec05" + integrity sha512-gLyJlPHPZYdAk1JENA9LeHejZe1Ti77/pTeFm/nMXmQH/HFZlcS/O2XJB+L8fkbrNSqhdtlvjBVjxwUYanNH5Q== + "@pkgjs/parseargs@^0.11.0": version "0.11.0" resolved "https://registry.yarnpkg.com/@pkgjs/parseargs/-/parseargs-0.11.0.tgz#a77ea742fab25775145434eb1d2328cf5013ac33" @@ -2765,6 +2770,14 @@ httpagent "github:scality/httpagent#1.1.0" werelogs "github:scality/werelogs#8.2.2" +"@scality/hdclient@^1.3.2": + version "1.3.2" + resolved "https://registry.yarnpkg.com/@scality/hdclient/-/hdclient-1.3.2.tgz#544d08a5b88869a9c30107c05d774e13595966fb" + integrity sha512-voy67AlH1irNmaXno0KP/KpiEBYzzdW8EoGjBXsVztLFjGe+RQnNtAyzCn05secZZy43jBYLOrZ7032gorxvrg== + dependencies: + httpagent "github:scality/httpagent#1.1.0" + werelogs "github:scality/werelogs#8.2.2" + "@senx/warp10@^2.0.3": version "2.0.3" resolved "https://registry.yarnpkg.com/@senx/warp10/-/warp10-2.0.3.tgz#dcce3890d491c6380f2967abcf126909ed208969" @@ -4978,9 +4991,9 @@ arraybuffer.prototype.slice@^1.0.4: optionalDependencies: ioctl "^2.0.2" 
-"arsenal@git+https://github.com/scality/arsenal#8.3.9": - version "8.3.9" - resolved "git+https://github.com/scality/arsenal#51e5b761f7f0612a722c828fa3d43b438c50ab7c" +"arsenal@git+https://github.com/scality/arsenal#39c1a642d77436d871305435dcf244a64423a4b8": + version "8.4.3" + resolved "git+https://github.com/scality/arsenal#39c1a642d77436d871305435dcf244a64423a4b8" dependencies: "@aws-sdk/client-kms" "^3.975.0" "@aws-sdk/client-s3" "^3.975.0" @@ -4989,7 +5002,8 @@ arraybuffer.prototype.slice@^1.0.4: "@azure/identity" "^4.13.0" "@azure/storage-blob" "^12.31.0" "@js-sdsl/ordered-set" "^4.4.2" - "@scality/hdclient" "^1.3.1" + "@opentelemetry/api" "^1.9.0" + "@scality/hdclient" "^1.3.2" "@smithy/node-http-handler" "^4.3.0" "@smithy/protocol-http" "^5.3.5" JSONStream "^1.3.5" From 3426739093fc07c8f98e44103fe255a0eaac7336 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ma=C3=ABl=20DONNART?= Date: Mon, 1 Jun 2026 14:15:33 +0200 Subject: [PATCH 3/3] fixup! Support per-backend role and destination in replication engine --- extensions/replication/tasks/MultipleBackendTask.js | 3 ++- extensions/replication/tasks/ReplicateObject.js | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/extensions/replication/tasks/MultipleBackendTask.js b/extensions/replication/tasks/MultipleBackendTask.js index d27cf23dd..a9ee7ca7a 100644 --- a/extensions/replication/tasks/MultipleBackendTask.js +++ b/extensions/replication/tasks/MultipleBackendTask.js @@ -85,7 +85,8 @@ class MultipleBackendTask extends ReplicateObject { .then(data => { const replicationEnabled = data.ReplicationConfiguration.Rules .some(rule => rule.Status === 'Enabled' && - entry.getObjectKey().startsWith(rule.Prefix)); + entry.getObjectKey().startsWith( + rule.Filter?.Prefix ?? rule.Prefix ?? 
'')); if (!replicationEnabled) { errMessage = 'replication disabled for object'; log.debug(errMessage, { diff --git a/extensions/replication/tasks/ReplicateObject.js b/extensions/replication/tasks/ReplicateObject.js index 226d7e398..eaf4d6e65 100644 --- a/extensions/replication/tasks/ReplicateObject.js +++ b/extensions/replication/tasks/ReplicateObject.js @@ -255,7 +255,8 @@ class ReplicateObject extends BackbeatTask { .then(data => { const replicationEnabled = ( data.ReplicationConfiguration.Rules.some( - rule => entry.getObjectKey().startsWith(rule.Prefix) + rule => entry.getObjectKey().startsWith( + rule.Filter?.Prefix ?? rule.Prefix ?? '') && rule.Status === 'Enabled')); if (!replicationEnabled) { log.debug('replication disabled for object',