Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 2 additions & 20 deletions extensions/oplogPopulator/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ const constants = {
'output.format.value': 'json',
'value.converter.schemas.enable': false,
'value.converter': 'org.apache.kafka.connect.storage.StringConverter',
// Kafka message key config
// The message key is set to only contain the bucket where the event happened.
// This will make events of the same bucket always land in the same partition
// as they will have the same key
'output.format.key': 'schema',
'output.schema.key': JSON.stringify({
type: 'record',
Expand All @@ -42,22 +38,8 @@ const constants = {
}],
}, 'null'],
}, {
name: 'fullDocument',
type: [{
type: 'record',
name: 'fullDocumentRecord',
fields: [{
name: 'value',
type: [{
type: 'record',
name: 'valueRecord',
fields: [{
name: 'key',
type: ['string', 'null'],
}],
}, 'null'],
}],
}, 'null'],
name: 'key',
type: ['string', 'null'],
Comment thread
delthas marked this conversation as resolved.
}],
}),
},
Expand Down
15 changes: 15 additions & 0 deletions extensions/oplogPopulator/pipeline/PipelineFactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,21 @@ class PipelineFactory {
}
const pipeline = [
stage,
// Synthesise a top-level 'key' from fullDocument.value.key
// (insert/replace) or updateDescription.updatedFields.value.key
// (update). Relies on object MD writes always $set-ing the whole
// 'value' subdocument; a partial dotted $set would mis-partition.
// See BB-768 (superseded SMT alternative in PR #2741).
{
$addFields: {
key: {
$ifNull: [
'$fullDocument.value.key',
'$updateDescription.updatedFields.value.key',
],
},
},
},
];
if (this._locationStrippingBytesThreshold > 0) {
pipeline.push({
Expand Down
38 changes: 38 additions & 0 deletions lib/queuePopulator/KafkaLogConsumer/KafkaLogConsumerMetrics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
const { ZenkoMetrics } = require('arsenal').metrics;

// Counts consumed oplog events that downstream queue populators will
// process but that lack a top-level 'key' field — i.e., the synthesised
// partitioning key produced by the connector pipeline (see BB-768).
// Should stay at zero in steady-state; a non-zero rate signals that
// metadata grew a write path that doesn't $set the whole 'value'
// subdocument, regressing the per-object partitioning fix.
//
// The counter declares a single label, 'opType', so the rate can be
// broken down by the operation type reported for the offending event.
const oplogEventMissingKey = ZenkoMetrics.createCounter({
name: 's3_oplog_event_missing_key_total',
help: 'Total number of oplog events processed by queue populators ' +
'with the synthesised top-level "key" field missing or null',
labelNames: ['opType'],
});

/**
 * Static helpers that update the Prometheus metrics of the Kafka log
 * consumer. Metric updates are best-effort: an error raised while
 * updating a metric is caught and reported through the logger instead
 * of being propagated to the caller.
 */
class KafkaLogConsumerMetrics {
    /**
     * Count one consumed oplog event whose top-level 'key' field is
     * missing or null.
     *
     * @param {object} log - logger, only its error() method is used
     * @param {string} [opType] - value of the 'opType' label; any falsy
     * value is reported as 'unknown'
     * @returns {undefined}
     */
    static onMissingKey(log, opType) {
        try {
            oplogEventMissingKey.inc({
                opType: opType || 'unknown',
            });
        } catch (err) {
            KafkaLogConsumerMetrics.handleError(
                log, err, 'KafkaLogConsumerMetrics.onMissingKey');
        }
    }

    /**
     * Log a failed metric update.
     *
     * Does nothing when no usable logger is given. The thrown value is
     * not assumed to be an Error: JavaScript allows throwing anything
     * (including null and undefined), and dereferencing '.message' on
     * such a value would make the error handler itself throw.
     *
     * @param {object} [log] - logger exposing an error() method
     * @param {*} err - value caught while updating the metric
     * @param {string} method - name of the method that failed
     * @returns {undefined}
     */
    static handleError(log, err, method) {
        if (!log || typeof log.error !== 'function') {
            return;
        }
        const hasMessage = err !== null && err !== undefined
            && err.message !== undefined;
        log.error('failed to update prometheus metrics', {
            method,
            // fall back to the string form of non-Error thrown values
            error: hasMessage ? err.message : String(err),
        });
    }
}

module.exports = KafkaLogConsumerMetrics;
5 changes: 5 additions & 0 deletions lib/queuePopulator/KafkaLogConsumer/ListRecordStream.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const stream = require('stream');
const KafkaLogConsumerMetrics = require('./KafkaLogConsumerMetrics');

class ListRecordStream extends stream.Transform {
/**
Expand Down Expand Up @@ -122,6 +123,10 @@ class ListRecordStream extends stream.Transform {
// skipping empty events
return callback(null, null);
}
if (changeStreamDocument.key === null || changeStreamDocument.key === undefined) {
KafkaLogConsumerMetrics.onMissingKey(
this._logger, changeStreamDocument.operationType);
}
const opType = this._getType(changeStreamDocument.operationType, objectMd);
const streamObject = {
// timestamp of the kafka message
Expand Down
45 changes: 44 additions & 1 deletion tests/functional/oplogPopulator/PipelineFactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ describe('PipelineFactory', function () {
const collectionName = 'test-pipeline-stripping';
let collection;
let setStage;
let addFieldsStage;

before(async () => {
await client.connect();
collection = db.collection(collectionName);

const factory = new MultipleBucketsPipelineFactory(THRESHOLD);
const pipeline = JSON.parse(factory.getPipeline(['test-bucket']));
setStage = pipeline[1];
addFieldsStage = pipeline[1];
setStage = pipeline[2];
});

afterEach(async () => {
Expand Down Expand Up @@ -147,4 +149,45 @@ describe('PipelineFactory', function () {
);
});
});

describe('key synthesis ($addFields)', () => {
it('should populate key from fullDocument.value.key on insert-shaped docs', async () => {
const doc = { fullDocument: { value: { key: 'my/object' } } };
await collection.insertOne(doc);
const results = await collection.aggregate([addFieldsStage]).toArray();
assert.strictEqual(results.length, 1);
assert.strictEqual(results[0].key, 'my/object');
});

it('should populate key from updateDescription.updatedFields.value.key on update-shaped docs', async () => {
const doc = { updateDescription: { updatedFields: { value: { key: 'my/object' } } } };
await collection.insertOne(doc);
const results = await collection.aggregate([addFieldsStage]).toArray();
assert.strictEqual(results.length, 1);
assert.strictEqual(results[0].key, 'my/object');
});

it('should prefer fullDocument.value.key when both are present', async () => {
const doc = {
fullDocument: { value: { key: 'from-full' } },
updateDescription: { updatedFields: { value: { key: 'from-update' } } },
};
await collection.insertOne(doc);
const results = await collection.aggregate([addFieldsStage]).toArray();
assert.strictEqual(results[0].key, 'from-full');
});

it('should leave key absent when neither path is populated', async () => {
Comment thread
delthas marked this conversation as resolved.
// $ifNull returns missing (not null) when all inputs are missing.
// The connector's nullable Avro key schema emits this as null on
// the wire, so the partition outcome is the same as an explicit
// null. In production deletes are ignored, so this case doesn't
// occur for consumed events.
const doc = { fullDocument: null };
await collection.insertOne(doc);
const results = await collection.aggregate([addFieldsStage]).toArray();
assert.strictEqual(results.length, 1);
assert.strictEqual(results[0].key, undefined);
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
const assert = require('assert');
const sinon = require('sinon');
const { ZenkoMetrics } = require('arsenal').metrics;
const KafkaLogConsumerMetrics =
require('../../../../../lib/queuePopulator/KafkaLogConsumer/KafkaLogConsumerMetrics');

describe('KafkaLogConsumerMetrics', () => {
    const METRIC_NAME = 's3_oplog_event_missing_key_total';
    const log = { error: sinon.stub() };

    // Replaces inc() on the registered counter with a sinon stub and
    // returns that stub; the stub is undone by sinon.restore() below.
    function stubCounterInc() {
        const counter = ZenkoMetrics.getMetric(METRIC_NAME);
        return sinon.stub(counter, 'inc');
    }

    afterEach(() => {
        sinon.restore();
        log.error.resetHistory();
    });

    describe('onMissingKey', () => {
        it('should increment the s3_oplog_event_missing_key_total counter', () => {
            const inc = stubCounterInc();

            KafkaLogConsumerMetrics.onMissingKey(log, 'update');

            assert(inc.calledOnceWith({ opType: 'update' }));
            assert(log.error.notCalled);
        });

        it('should label as "unknown" when opType is missing', () => {
            const inc = stubCounterInc();

            KafkaLogConsumerMetrics.onMissingKey(log, undefined);

            assert(inc.calledOnceWith({ opType: 'unknown' }));
        });

        it('should swallow + log errors from inc', () => {
            stubCounterInc().throws(new Error('boom'));

            const call = () => KafkaLogConsumerMetrics.onMissingKey(log, 'insert');

            assert.doesNotThrow(call);
            assert(log.error.calledOnce);
            assert(log.error.calledWithMatch('failed to update prometheus metrics', {
                method: 'KafkaLogConsumerMetrics.onMissingKey',
            }));
        });
    });
});
29 changes: 29 additions & 0 deletions tests/unit/lib/queuePopulator/kafkaLogConsumer/ListRecordStream.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
const assert = require('assert');
const sinon = require('sinon');
const werelogs = require('werelogs');
const logger = new werelogs.Logger('ListRecordStream');
const ListRecordStream =
require('../../../../../lib/queuePopulator/KafkaLogConsumer/ListRecordStream');
const KafkaLogConsumerMetrics =
require('../../../../../lib/queuePopulator/KafkaLogConsumer/KafkaLogConsumerMetrics');

const changeStreamDocument = {
ns: {
Expand Down Expand Up @@ -159,6 +162,32 @@ describe('ListRecordStream', () => {
});
});

it('should call onMissingKey when key is absent on a processed event', done => {
    // changeStreamDocument has no top-level 'key' field — emulates an
    // event that slipped past the $addFields stage. Should still be
    // processed (objectMd is present) but the metric helper should fire.
    const stub = sinon.stub(KafkaLogConsumerMetrics, 'onMissingKey');
    const kafkaMessage = getKafkaMessage(JSON.stringify(changeStreamDocument));
    listRecordStream.write(kafkaMessage);
    listRecordStream.once('data', () => {
        let failure;
        try {
            assert(stub.calledOnceWith(logger, 'insert'));
        } catch (err) {
            // Report through done() rather than throwing from inside the
            // stream's 'data' handler.
            failure = err;
        } finally {
            // Always un-stub, even on failure, so a failing assertion
            // cannot leak the stub into the following tests.
            stub.restore();
        }
        return done(failure);
    });
});

it('should NOT call onMissingKey when key is present', done => {
    const stub = sinon.stub(KafkaLogConsumerMetrics, 'onMissingKey');
    // Same event as the shared fixture, plus a top-level 'key' field.
    const doc = { ...changeStreamDocument, key: 'example-key' };
    const kafkaMessage = getKafkaMessage(JSON.stringify(doc));
    listRecordStream.write(kafkaMessage);
    listRecordStream.once('data', () => {
        let failure;
        try {
            assert(stub.notCalled);
        } catch (err) {
            // Report through done() rather than throwing from inside the
            // stream's 'data' handler.
            failure = err;
        } finally {
            // Always un-stub, even on failure, so a failing assertion
            // cannot leak the stub into the following tests.
            stub.restore();
        }
        return done(failure);
    });
});

it('should skip empty record', done => {
const kafkaMessage = getKafkaMessage(JSON.stringify(changeStreamDocument));
const EmptyKafkaMessage = getKafkaMessage('{}');
Expand Down
18 changes: 2 additions & 16 deletions tests/unit/oplogPopulator/ConnectorsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,8 @@ const connectorConfig = {
}],
}, 'null'],
}, {
name: 'fullDocument',
type: [{
type: 'record',
name: 'fullDocumentRecord',
fields: [{
name: 'value',
type: [{
type: 'record',
name: 'valueRecord',
fields: [{
name: 'key',
type: ['string', 'null'],
}],
}, 'null'],
}],
}, 'null'],
name: 'key',
type: ['string', 'null'],
}],
}),
'heartbeat.interval.ms': 10000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,28 +46,55 @@ describe('MultipleBucketsPipelineFactory', () => {
assert.strictEqual(result, '[]');
});

it('should return the pipeline with buckets and location stripping', () => {
it('should return the pipeline with buckets, key synthesis and location stripping', () => {
const buckets = ['bucket1', 'bucket2'];
const result = multipleBucketsPipelineFactory.getPipeline(buckets);
const pipeline = JSON.parse(result);

assert.strictEqual(pipeline.length, 2);
assert.strictEqual(pipeline.length, 3);
assert.deepStrictEqual(pipeline[0], {$match:{'ns.coll':{$in:['bucket1','bucket2']}}});
assert.deepStrictEqual(pipeline[1].$set['fullDocument.value.location'], {
assert.deepStrictEqual(pipeline[1], {
$addFields: {
key: {
$ifNull: [
'$fullDocument.value.key',
'$updateDescription.updatedFields.value.key',
],
},
},
});
assert.deepStrictEqual(pipeline[2].$set['fullDocument.value.location'], {
$cond: {
if: { $gte: ['$fullDocument.value.content-length', thresholdBytes] },
then: '$$REMOVE',
else: '$fullDocument.value.location',
},
});
assert.deepStrictEqual(pipeline[1].$set['updateDescription.updatedFields.value.location'], {
assert.deepStrictEqual(pipeline[2].$set['updateDescription.updatedFields.value.location'], {
$cond: {
if: { $gte: ['$updateDescription.updatedFields.value.content-length', thresholdBytes] },
then: '$$REMOVE',
else: '$updateDescription.updatedFields.value.location',
},
});
});

it('should return the pipeline with key synthesis when location stripping is disabled', () => {
    // Expected stages when the stripping threshold is 0: the bucket
    // filter followed by the key synthesis stage, and nothing else.
    const expectedMatchStage = { $match: { 'ns.coll': { $in: ['bucket1'] } } };
    const expectedKeyStage = {
        $addFields: {
            key: {
                $ifNull: [
                    '$fullDocument.value.key',
                    '$updateDescription.updatedFields.value.key',
                ],
            },
        },
    };

    const factory = new MultipleBucketsPipelineFactory(0);
    const serialized = factory.getPipeline(['bucket1']);
    const pipeline = JSON.parse(serialized);

    assert.strictEqual(pipeline.length, 2);
    assert.deepStrictEqual(pipeline[0], expectedMatchStage);
    assert.deepStrictEqual(pipeline[1], expectedKeyStage);
});
});

describe('getOldConnectorBucketList', () => {
Expand Down
Loading
Loading