diff --git a/src/common/lib/client/modularplugins.ts b/src/common/lib/client/modularplugins.ts index 99c0a3c5b..97e060b8f 100644 --- a/src/common/lib/client/modularplugins.ts +++ b/src/common/lib/client/modularplugins.ts @@ -41,7 +41,7 @@ export interface ModularPlugins { FetchRequest?: typeof fetchRequest; MessageInteractions?: typeof FilteredSubscriptions; Push?: typeof PushPlugin; - Objects?: typeof ObjectsPlugin; + Objects?: typeof ObjectsPlugin; // PC5, PT2b } export const allCommonModularPlugins: ModularPlugins = { Rest }; diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 8a95b2ce5..b83068f9b 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -149,11 +149,12 @@ class RealtimeChannel extends EventEmitter { return this._push; } + /** @spec RTL27 */ get objects() { if (!this._objects) { - Utils.throwMissingPluginError('Objects'); + Utils.throwMissingPluginError('Objects'); // RTL27b } - return this._objects; + return this._objects; // RTL27a } invalidStateError(): ErrorInfo { @@ -613,6 +614,7 @@ class RealtimeChannel extends EventEmitter { break; } + // RTL1 // OBJECT and OBJECT_SYNC message processing share most of the logic, so group them together case actions.OBJECT: case actions.OBJECT_SYNC: { diff --git a/src/common/lib/types/protocolmessage.ts b/src/common/lib/types/protocolmessage.ts index 873d7d198..76ceb78b0 100644 --- a/src/common/lib/types/protocolmessage.ts +++ b/src/common/lib/types/protocolmessage.ts @@ -169,7 +169,7 @@ class ProtocolMessage { /** * This will be undefined if we skipped decoding this property due to user not requesting Objects functionality — see {@link fromDeserialized} */ - state?: ObjectsPlugin.ObjectMessage[]; + state?: ObjectsPlugin.ObjectMessage[]; // TR4r auth?: unknown; connectionDetails?: Record; params?: Record; diff --git a/src/common/lib/types/protocolmessagecommon.ts b/src/common/lib/types/protocolmessagecommon.ts index 8c1e43a64..bb85accf9 100644 --- a/src/common/lib/types/protocolmessagecommon.ts +++ b/src/common/lib/types/protocolmessagecommon.ts @@ -1,6 +1,7 @@ // constant definitions that can be imported by anyone without worrying about circular // deps +// TR2 export const actions = { HEARTBEAT: 0, ACK: 1, @@ -31,6 +32,7 @@ Object.keys(actions).forEach(function (name) { ActionName[(actions as { [key: string]: number })[name]] = name; }); +// TR3 export const flags: { [key: string]: number } = { /* Channel attach state flags */ HAS_PRESENCE: 1 << 0, diff --git a/src/common/lib/util/utils.ts b/src/common/lib/util/utils.ts index 351daa96b..df02eac98 100644 --- a/src/common/lib/util/utils.ts +++ b/src/common/lib/util/utils.ts @@ -285,8 +285,8 @@ export function inspectBody(body: unknown): string { * Returns the byte size of the provided data based on the spec: * - TM6a - size of the string is byte length of the string * - TM6c - size of the buffer is its size in bytes - * - OD3c - size of a number is 8 bytes - * - OD3d - size of a boolean is 1 byte + * - OD3d - size of a number is 8 bytes + * - OD3b - size of a boolean is 1 byte */ export function dataSizeBytes(data: string | number | boolean | Bufferlike): number { if (Platform.BufferUtils.isBuffer(data)) { diff --git a/src/plugins/objects/livecounter.ts b/src/plugins/objects/livecounter.ts index d04ebb659..7a760fca0 100644 --- a/src/plugins/objects/livecounter.ts +++ b/src/plugins/objects/livecounter.ts @@ -4,18 +4,20 @@ import { ObjectMessage, ObjectOperation, ObjectOperationAction, ObjectsCounterOp import { Objects } from './objects'; export interface LiveCounterData extends LiveObjectData { - data: number; + data: number; // RTLC3 } export interface LiveCounterUpdate extends LiveObjectUpdate { update: { amount: number }; } +/** @spec RTLC1, RTLC2 */ export class LiveCounter extends LiveObject { /** * Returns a {@link LiveCounter} instance with a 0 value. * * @internal + * @spec RTLC4 */ static zeroValue(objects: Objects, objectId: string): LiveCounter { return new LiveCounter(objects, objectId); @@ -122,9 +124,10 @@ export class LiveCounter extends LiveObject }; } + /** @spec RTLC5 */ value(): number { - this._objects.throwIfInvalidAccessApiConfiguration(); - return this._dataRef.data; + this._objects.throwIfInvalidAccessApiConfiguration(); // RTLC5a, RTLC5b + return this._dataRef.data; // RTLC5c } /** @@ -221,6 +224,7 @@ export class LiveCounter extends LiveObject /** * @internal + * @spec RTLC6 */ overrideWithObjectState(objectState: ObjectState): LiveCounterUpdate | LiveObjectUpdateNoop { if (objectState.objectId !== this.getObjectId()) { @@ -252,7 +256,7 @@ export class LiveCounter extends LiveObject // object's site serials are still updated even if it is tombstoned, so always use the site serials received from the operation. // should default to empty map if site serials do not exist on the object state, so that any future operation may be applied to this object. - this._siteTimeserials = objectState.siteTimeserials ?? {}; + this._siteTimeserials = objectState.siteTimeserials ?? {}; // RTLC6a if (this.isTombstoned()) { // this object is tombstoned. this is a terminal state which can't be overridden. skip the rest of object state message processing @@ -265,8 +269,9 @@ export class LiveCounter extends LiveObject this.tombstone(); } else { // override data for this object with data from the object state - this._createOperationIsMerged = false; - this._dataRef = { data: objectState.counter?.count ?? 0 }; + this._createOperationIsMerged = false; // RTLC6b + this._dataRef = { data: objectState.counter?.count ?? 0 }; // RTLC6c + // RTLC6d if (!this._client.Utils.isNil(objectState.createOp)) { this._mergeInitialDataFromCreateOperation(objectState.createOp); } @@ -285,6 +290,7 @@ export class LiveCounter extends LiveObject return; } + /** @spec RTLC4 */ protected _getZeroValueData(): LiveCounterData { return { data: 0 }; } @@ -299,8 +305,8 @@ export class LiveCounter extends LiveObject // note that it is intentional to SUM the incoming count from the create op. // if we got here, it means that current counter instance is missing the initial value in its data reference, // which we're going to add now. - this._dataRef.data += objectOperation.counter?.count ?? 0; - this._createOperationIsMerged = true; + this._dataRef.data += objectOperation.counter?.count ?? 0; // RTLC6d1 + this._createOperationIsMerged = true; // RTLC6d2 return { update: { amount: objectOperation.counter?.count ?? 0 } }; } diff --git a/src/plugins/objects/livemap.ts b/src/plugins/objects/livemap.ts index 38e42b2ca..478661422 100644 --- a/src/plugins/objects/livemap.ts +++ b/src/plugins/objects/livemap.ts @@ -49,13 +49,14 @@ export interface LiveMapEntry { } export interface LiveMapData extends LiveObjectData { - data: Map; + data: Map; // RTLM3 } export interface LiveMapUpdate extends LiveObjectUpdate { update: { [keyName in keyof T & string]?: 'updated' | 'removed' }; } +/** @spec RTLM1, RTLM2 */ export class LiveMap extends LiveObject> { constructor( objects: Objects, @@ -69,6 +70,7 @@ export class LiveMap extends LiveObject(objects: Objects, objectId: string): LiveMap { return new LiveMap(objects, ObjectsMapSemantics.LWW, objectId); @@ -200,7 +202,7 @@ export class LiveMap extends LiveObject extends LiveObject extends LiveObject(key: TKey): T[TKey] | undefined { - this._objects.throwIfInvalidAccessApiConfiguration(); + this._objects.throwIfInvalidAccessApiConfiguration(); // RTLM5b, RTLM5c if (this.isTombstoned()) { return undefined as T[TKey]; @@ -305,10 +309,12 @@ export class LiveMap extends LiveObject extends LiveObject | LiveObjectUpdateNoop { if (objectState.objectId !== this.getObjectId()) { @@ -512,7 +519,7 @@ export class LiveMap extends LiveObject extends LiveObject extends LiveObject this._dataRef.data.delete(x)); } + /** @spec RTLM4 */ protected _getZeroValueData(): LiveMapData { return { data: new Map() }; } @@ -621,6 +630,7 @@ export class LiveMap extends LiveObject = { update: {} }; + // RTLM6d1 // in order to apply MAP_CREATE op for an existing map, we should merge their underlying entries keys. // we can do this by iterating over entries from MAP_CREATE op and apply changes on per-key basis as if we had MAP_SET, MAP_REMOVE operations. Object.entries(objectOperation.map.entries ?? {}).forEach(([key, entry]) => { @@ -628,10 +638,10 @@ export class LiveMap extends LiveObject | LiveObjectUpdateNoop; if (entry.tombstone === true) { - // entry in MAP_CREATE op is removed, try to apply MAP_REMOVE op + // RTLM6d1b - entry in MAP_CREATE op is removed, try to apply MAP_REMOVE op update = this._applyMapRemove({ key }, opSerial); } else { - // entry in MAP_CREATE op is not removed, try to set it via MAP_SET op + // RTLM6d1a - entry in MAP_CREATE op is not removed, try to set it via MAP_SET op update = this._applyMapSet({ key, data: entry.data }, opSerial); } @@ -644,7 +654,7 @@ export class LiveMap extends LiveObject extends LiveObject | LiveObjectUpdateNoop { const { ErrorInfo, Utils } = this._client; const existingEntry = this._dataRef.data.get(op.key); + // RTLM7a if (existingEntry && !this._canApplyMapOperation(existingEntry.timeserial, opSerial)) { - // the operation's serial <= the entry's serial, ignore the operation. + // RTLM7a1 - the operation's serial <= the entry's serial, ignore the operation. this._client.Logger.logAction( this._client.logger, this._client.Logger.LOG_MICRO, @@ -713,13 +725,14 @@ export class LiveMap extends LiveObject extends LiveObject extends LiveObject | LiveObjectUpdateNoop { const existingEntry = this._dataRef.data.get(op.key); + // RTLM8a if (existingEntry && !this._canApplyMapOperation(existingEntry.timeserial, opSerial)) { - // the operation's serial <= the entry's serial, ignore the operation. + // RTLM8a1 - the operation's serial <= the entry's serial, ignore the operation. this._client.Logger.logAction( this._client.logger, this._client.Logger.LOG_MICRO, @@ -766,13 +783,15 @@ export class LiveMap extends LiveObject extends LiveObject mapEntrySerial; } @@ -859,23 +879,23 @@ export class LiveMap extends LiveObject extends LiveObject ObjectData; +/** @spec OOP2 */ export enum ObjectOperationAction { MAP_CREATE = 0, MAP_SET = 1, @@ -20,160 +21,187 @@ export enum ObjectOperationAction { OBJECT_DELETE = 5, } +/** @spec OMP2 */ export enum ObjectsMapSemantics { LWW = 0, } -/** An ObjectData represents a value in an object on a channel. */ +/** + * An ObjectData represents a value in an object on a channel. + * @spec OD1 + */ export interface ObjectData { /** A reference to another object, used to support composable object structures. */ - objectId?: string; + objectId?: string; // OD2a - /** Can be set by the client to indicate that value in `string` or `bytes` field have an encoding. */ - encoding?: string; + /** May be set by the client to indicate that value in `string` field have an encoding. */ + encoding?: string; // OD2b /** A primitive boolean leaf value in the object graph. Only one value field can be set. */ - boolean?: boolean; + boolean?: boolean; // OD2c /** A primitive binary leaf value in the object graph. Only one value field can be set. */ - bytes?: Bufferlike; + bytes?: Bufferlike; // OD2d /** A primitive number leaf value in the object graph. Only one value field can be set. */ - number?: number; + number?: number; // OD2e /** A primitive string leaf value in the object graph. Only one value field can be set. */ - string?: string; + string?: string; // OD2f } -/** An ObjectsMapOp describes an operation to be applied to a Map object. */ +/** + * An ObjectsMapOp describes an operation to be applied to a Map object. + * @spec OMO1 + */ export interface ObjectsMapOp { /** The key of the map entry to which the operation should be applied. */ - key: string; + key: string; // OMO2a /** The data that the map entry should contain if the operation is a MAP_SET operation. */ - data?: ObjectData; + data?: ObjectData; // OMO2b } -/** An ObjectsCounterOp describes an operation to be applied to a Counter object. */ +/** + * An ObjectsCounterOp describes an operation to be applied to a Counter object. + * @spec OCO1 + */ export interface ObjectsCounterOp { /** The data value that should be added to the counter */ - amount: number; + amount: number; // OCO2a } -/** An ObjectsMapEntry represents the value at a given key in a Map object. */ +/** + * An ObjectsMapEntry represents the value at a given key in a Map object. + * @spec OME1 + */ export interface ObjectsMapEntry { /** Indicates whether the map entry has been removed. */ - tombstone?: boolean; + tombstone?: boolean; // OME2a /** * The {@link ObjectMessage.serial} value of the last operation that was applied to the map entry. * * It is optional in a MAP_CREATE operation and might be missing, in which case the client should use a nullish value for it * and treat it as the "earliest possible" serial for comparison purposes. */ - timeserial?: string; + timeserial?: string; // OME2b /** The data that represents the value of the map entry. */ - data?: ObjectData; + data?: ObjectData; // OME2c } -/** An ObjectsMap object represents a map of key-value pairs. */ +/** + * An ObjectsMap object represents a map of key-value pairs. + * @spec OMP1 + */ export interface ObjectsMap { /** The conflict-resolution semantics used by the map object. */ - semantics?: ObjectsMapSemantics; + semantics?: ObjectsMapSemantics; // OMP3a // The map entries, indexed by key. - entries?: Record; + entries?: Record; // OMP3b } -/** An ObjectsCounter object represents an incrementable and decrementable value */ +/** + * An ObjectsCounter object represents an incrementable and decrementable value + * @spec OCN1 + */ export interface ObjectsCounter { /** The value of the counter */ - count?: number; + count?: number; // OCN2a } -/** An ObjectOperation describes an operation to be applied to an object on a channel. */ +/** + * An ObjectOperation describes an operation to be applied to an object on a channel. + * @spec OOP1 + */ export interface ObjectOperation { /** Defines the operation to be applied to the object. */ - action: ObjectOperationAction; + action: ObjectOperationAction; // OOP3a /** The object ID of the object on a channel to which the operation should be applied. */ - objectId: string; + objectId: string; // OOP3b /** The payload for the operation if it is an operation on a Map object type. */ - mapOp?: ObjectsMapOp; + mapOp?: ObjectsMapOp; // OOP3c /** The payload for the operation if it is an operation on a Counter object type. */ - counterOp?: ObjectsCounterOp; + counterOp?: ObjectsCounterOp; // OOP3d /** * The payload for the operation if the operation is MAP_CREATE. * Defines the initial value for the Map object. */ - map?: ObjectsMap; + map?: ObjectsMap; // OOP3e /** * The payload for the operation if the operation is COUNTER_CREATE. * Defines the initial value for the Counter object. */ - counter?: ObjectsCounter; + counter?: ObjectsCounter; // OOP3f /** * The nonce, must be present on create operations. This is the random part * that has been hashed with the type and initial value to create the object ID. */ - nonce?: string; + nonce?: string; // OOP3g /** * The initial value bytes for the object. These bytes should be used along with the nonce * and timestamp to create the object ID. Frontdoor will use this to verify the object ID. * After verification the bytes will be decoded into the Map or Counter objects and * the initialValue, nonce, and initialValueEncoding will be removed. */ - initialValue?: Bufferlike; + initialValue?: Bufferlike; // OOP3h /** The initial value encoding defines how the initialValue should be interpreted. */ - initialValueEncoding?: Utils.Format; + initialValueEncoding?: Utils.Format; // OOP3i } -/** An ObjectState describes the instantaneous state of an object on a channel. */ +/** + * An ObjectState describes the instantaneous state of an object on a channel. + * @spec OST1 + */ export interface ObjectState { /** The identifier of the object. */ - objectId: string; + objectId: string; // OST2a /** A map of serials keyed by a {@link ObjectMessage.siteCode}, representing the last operations applied to this object */ - siteTimeserials: Record; + siteTimeserials: Record; // OST2b /** True if the object has been tombstoned. */ - tombstone: boolean; + tombstone: boolean; // OST2c /** * The operation that created the object. * * Can be missing if create operation for the object is not known at this point. */ - createOp?: ObjectOperation; + createOp?: ObjectOperation; // OST2d /** * The data that represents the result of applying all operations to a Map object * excluding the initial value from the create operation if it is a Map object type. */ - map?: ObjectsMap; + map?: ObjectsMap; // OST2e /** * The data that represents the result of applying all operations to a Counter object * excluding the initial value from the create operation if it is a Counter object type. */ - counter?: ObjectsCounter; + counter?: ObjectsCounter; // OST2f } // TODO: tidy up encoding/decoding logic for ObjectMessage: // Should have separate WireObjectMessage with the correct types received from the server, do the necessary encoding/decoding there. // For reference, see WireMessage and WirePresenceMessage /** + * An individual object message to be sent or received via the Ably Realtime service. + * @spec OM1 * @internal */ export class ObjectMessage { - id?: string; - timestamp?: number; - clientId?: string; - connectionId?: string; - extras?: any; + id?: string; // OM2a + clientId?: string; // OM2b + connectionId?: string; // OM2c + extras?: any; // OM2d + timestamp?: number; // OM2e /** * Describes an operation to be applied to an object. * * Mutually exclusive with the `object` field. This field is only set on object messages if the `action` field of the `ProtocolMessage` encapsulating it is `OBJECT`. */ - operation?: ObjectOperation; + operation?: ObjectOperation; // OM2f /** * Describes the instantaneous state of an object. * * Mutually exclusive with the `operation` field. This field is only set on object messages if the `action` field of the `ProtocolMessage` encapsulating it is `OBJECT_SYNC`. */ - object?: ObjectState; + object?: ObjectState; // OM2g /** An opaque string that uniquely identifies this object message. */ - serial?: string; + serial?: string; // OM2h /** An opaque string used as a key to update the map of serial values on an object. */ - siteCode?: string; + siteCode?: string; // OM2i constructor( private _utils: typeof Utils, @@ -185,6 +213,8 @@ export class ObjectMessage { * Mutates the provided ObjectMessage. * * Uses encoding functions from regular `Message` processing. + * + * @spec OM4 */ static encode(message: ObjectMessage, client: BaseClient): ObjectMessage { const encodeInitialValueFn: EncodeInitialValueFunction = (data, encoding) => { @@ -232,6 +262,8 @@ export class ObjectMessage { * Format is used to decode the bytes value as it's implicitly encoded depending on the protocol used: * - json: bytes are base64 encoded string * - msgpack: bytes have a binary representation and don't need to be decoded + * + * @spec OM5 */ static async decode( message: ObjectMessage, @@ -339,6 +371,7 @@ export class ObjectMessage { } } + /** @spec OD5 */ private static async _decodeObjectData( objectData: ObjectData, client: BaseClient, @@ -351,11 +384,12 @@ export class ObjectMessage { // - if connection is json - "bytes" was received as a base64 string, need to decode it to a buffer if (format !== 'msgpack' && objectData.bytes != null) { - // connection is using JSON protocol, decode bytes value + // OD5b2 - connection is using JSON protocol, decode bytes value objectData.bytes = client.Platform.BufferUtils.base64Decode(String(objectData.bytes)); } } + /** @spec OOP5 */ private static _encodeObjectOperation( objectOperation: ObjectOperation, encodeObjectDataFn: EncodeObjectDataFunction, @@ -421,6 +455,7 @@ export class ObjectMessage { return objectStateCopy; } + /** @spec OD4 */ private static _encodeObjectData(data: ObjectData, encodeFn: EncodeObjectDataFunction): ObjectData { const encodedData = encodeFn(data); return encodedData; @@ -440,6 +475,7 @@ export class ObjectMessage { objectState?: ObjectState; } { const encodeInitialValueFn: EncodeInitialValueFunction = (data, encoding) => { + // OOP5a1, OOP5b1 - initialValue encoded based on the protocol used const { data: encodedData, encoding: newEncoding } = messageEncoding.encodeDataForWire(data, encoding, format); return { data: encodedData, @@ -456,6 +492,7 @@ export class ObjectMessage { let encodedBytes: any = data.bytes; if (data.bytes != null) { + // OD4c2, OD4d2 const result = messageEncoding.encodeDataForWire(data.bytes, data.encoding, format); encodedBytes = result.data; // no need to change the encoding @@ -529,123 +566,142 @@ export class ObjectMessage { return result; } + /** @spec OM3 */ getMessageSize(): number { let size = 0; - size += this.clientId?.length ?? 0; + // OM3a + size += this.clientId?.length ?? 0; // OM3f if (this.operation) { - size += this._getObjectOperationSize(this.operation); + size += this._getObjectOperationSize(this.operation); // OM3b } if (this.object) { - size += this._getObjectStateSize(this.object); + size += this._getObjectStateSize(this.object); // OM3c } if (this.extras) { - size += JSON.stringify(this.extras).length; + size += JSON.stringify(this.extras).length; // OM3d } return size; } + /** @spec OOP4 */ private _getObjectOperationSize(operation: ObjectOperation): number { let size = 0; + // OOP4a if (operation.mapOp) { - size += this._getMapOpSize(operation.mapOp); + size += this._getMapOpSize(operation.mapOp); // OOP4b } if (operation.counterOp) { - size += this._getCounterOpSize(operation.counterOp); + size += this._getCounterOpSize(operation.counterOp); // OOP4c } if (operation.map) { - size += this._getObjectMapSize(operation.map); + size += this._getObjectMapSize(operation.map); // OOP4d } if (operation.counter) { - size += this._getObjectCounterSize(operation.counter); + size += this._getObjectCounterSize(operation.counter); // OOP4e } return size; } + /** @spec OST3 */ private _getObjectStateSize(obj: ObjectState): number { let size = 0; + // OST3a if (obj.map) { - size += this._getObjectMapSize(obj.map); + size += this._getObjectMapSize(obj.map); // OST3b } if (obj.counter) { - size += this._getObjectCounterSize(obj.counter); + size += this._getObjectCounterSize(obj.counter); // OST3c } if (obj.createOp) { - size += this._getObjectOperationSize(obj.createOp); + size += this._getObjectOperationSize(obj.createOp); // OST3d } return size; } + /** @spec OMP4 */ private _getObjectMapSize(map: ObjectsMap): number { let size = 0; + // OMP4a Object.entries(map.entries ?? {}).forEach(([key, entry]) => { - size += key?.length ?? 0; + size += key?.length ?? 0; // OMP4a1 if (entry) { - size += this._getMapEntrySize(entry); + size += this._getMapEntrySize(entry); // OMP4a2 } }); return size; } + /** @spec OCN3 */ private _getObjectCounterSize(counter: ObjectsCounter): number { + // OCN3b if (counter.count == null) { return 0; } + // OCN3a return 8; } + /** @spec OME3 */ private _getMapEntrySize(entry: ObjectsMapEntry): number { let size = 0; + // OME3a if (entry.data) { - size += this._getObjectDataSize(entry.data); + size += this._getObjectDataSize(entry.data); // OME3b } return size; } + /** @spec OMO3 */ private _getMapOpSize(mapOp: ObjectsMapOp): number { let size = 0; - size += mapOp.key?.length ?? 0; - + // OMO3a + size += mapOp.key?.length ?? 0; // OMO3d if (mapOp.data) { - size += this._getObjectDataSize(mapOp.data); + size += this._getObjectDataSize(mapOp.data); // OMO3b } return size; } + /** @spec OCO3 */ private _getCounterOpSize(operation: ObjectsCounterOp): number { + // OCO3b if (operation.amount == null) { return 0; } + // OCO3a return 8; } + /** @spec OD3 */ private _getObjectDataSize(data: ObjectData): number { let size = 0; + // OD3a if (data.boolean != null) { - size += this._utils.dataSizeBytes(data.boolean); + size += this._utils.dataSizeBytes(data.boolean); // OD3b } if (data.bytes != null) { - size += this._utils.dataSizeBytes(data.bytes); + size += this._utils.dataSizeBytes(data.bytes); // OD3c } if (data.number != null) { - size += this._utils.dataSizeBytes(data.number); + size += this._utils.dataSizeBytes(data.number); // OD3d } if (data.string != null) { - size += this._utils.dataSizeBytes(data.string); + size += this._utils.dataSizeBytes(data.string); // OD3e } return size; diff --git a/src/plugins/objects/objects.ts b/src/plugins/objects/objects.ts index 45223cee8..0c9859935 100644 --- a/src/plugins/objects/objects.ts +++ b/src/plugins/objects/objects.ts @@ -45,7 +45,7 @@ export class Objects { private _eventEmitterInternal: EventEmitter; // related to RTC10, should have a separate EventEmitter for users of the library private _eventEmitterPublic: EventEmitter; - private _objectsPool: ObjectsPool; + private _objectsPool: ObjectsPool; // RTO3 private _syncObjectsDataPool: SyncObjectsDataPool; private _currentSyncId: string | undefined; private _currentSyncCursor: string | undefined; @@ -69,16 +69,17 @@ export class Objects { * When called without a type variable, we return a default root type which is based on globally defined interface for Objects feature. * A user can provide an explicit type for the getRoot method to explicitly set the type structure on this particular channel. * This is useful when working with multiple channels with different underlying data structure. + * @spec RTO1 */ async getRoot(): Promise> { - this.throwIfInvalidAccessApiConfiguration(); + this.throwIfInvalidAccessApiConfiguration(); // RTO1a, RTO1b // if we're not synced yet, wait for sync sequence to finish before returning root if (this._state !== ObjectsState.synced) { - await this._eventEmitterInternal.once(ObjectsEvent.synced); + await this._eventEmitterInternal.once(ObjectsEvent.synced); // RTO1c } - return this._objectsPool.get(ROOT_OBJECT_ID) as LiveMap; + return this._objectsPool.get(ROOT_OBJECT_ID) as LiveMap; // RTO1d } /** @@ -211,17 +212,20 @@ export class Objects { /** * @internal + * @spec RTO5 */ handleObjectSyncMessages(objectMessages: ObjectMessage[], syncChannelSerial: string | null | undefined): void { - const { syncId, syncCursor } = this._parseSyncChannelSerial(syncChannelSerial); + const { syncId, syncCursor } = this._parseSyncChannelSerial(syncChannelSerial); // RTO5a const newSyncSequence = this._currentSyncId !== syncId; if (newSyncSequence) { - this._startNewSync(syncId, syncCursor); + // RTO5a2 - new sync sequence started + this._startNewSync(syncId, syncCursor); // RTO5a2a } - this._syncObjectsDataPool.applyObjectSyncMessages(objectMessages); + // RTO5a3 - continue current sync sequence + this._syncObjectsDataPool.applyObjectSyncMessages(objectMessages); // RTO5b - // if this is the last (or only) message in a sequence of sync updates, end the sync + // RTO5a4 - if this is the last (or only) message in a sequence of sync updates, end the sync if (!syncCursor) { // defer the state change event until the next tick if this was a new sync sequence // to allow any event listeners to process the start of the new sequence event that was emitted earlier during this event loop. @@ -247,6 +251,7 @@ export class Objects { /** * @internal + * @spec RTO4 */ onAttached(hasObjects?: boolean): void { this._client.Logger.logAction( @@ -256,6 +261,7 @@ export class Objects { `channel=${this._channel.name}, hasObjects=${hasObjects}`, ); + // RTO4a const fromInitializedState = this._state === ObjectsState.initialized; if (hasObjects || fromInitializedState) { // should always start a new sync sequence if we're in the initialized state, no matter the HAS_OBJECTS flag value. @@ -263,14 +269,15 @@ export class Objects { this._startNewSync(); } + // RTO4b if (!hasObjects) { // if no HAS_OBJECTS flag received on attach, we can end sync sequence immediately and treat it as no objects on a channel. // reset the objects pool to its initial state, and emit update events so subscribers to root object get notified about changes. - this._objectsPool.resetToInitialPool(true); - this._syncObjectsDataPool.clear(); + this._objectsPool.resetToInitialPool(true); // RTO4b1, RTO4b2 + this._syncObjectsDataPool.clear(); // RTO4b3 // defer the state change event until the next tick if we started a new sequence just now due to being in initialized state. // this allows any event listeners to process the start of the new sequence event that was emitted earlier during this event loop. - this._endSync(fromInitializedState); + this._endSync(fromInitializedState); // RTO4b4 } } @@ -338,6 +345,7 @@ export class Objects { this._stateChange(ObjectsState.syncing, false); } + /** @spec RTO5c */ private _endSync(deferStateEvent: boolean): void { this._applySync(); // should apply buffered object operations after we applied the sync. @@ -345,9 +353,9 @@ export class Objects { this._applyObjectMessages(this._bufferedObjectOperations); this._bufferedObjectOperations = []; - this._syncObjectsDataPool.clear(); - this._currentSyncId = undefined; - this._currentSyncCursor = undefined; + this._syncObjectsDataPool.clear(); // RTO5c4 + this._currentSyncId = undefined; // RTO5c3 + this._currentSyncCursor = undefined; // RTO5c3 this._stateChange(ObjectsState.synced, deferStateEvent); } @@ -358,6 +366,7 @@ export class Objects { let match: RegExpMatchArray | null; let syncId: string | undefined = undefined; let syncCursor: string | undefined = undefined; + // RTO5a1 - syncChannelSerial is a two-part identifier: : if (syncChannelSerial && (match = syncChannelSerial.match(/^([\w-]+):(.*)$/))) { syncId = match[1]; syncCursor = match[2]; @@ -377,38 +386,41 @@ export class Objects { const receivedObjectIds = new Set(); const existingObjectUpdates: { object: LiveObject; update: LiveObjectUpdate | LiveObjectUpdateNoop }[] = []; + // RTO5c1 for (const [objectId, entry] of this._syncObjectsDataPool.entries()) { receivedObjectIds.add(objectId); const existingObject = this._objectsPool.get(objectId); + // RTO5c1a if (existingObject) { - const update = existingObject.overrideWithObjectState(entry.objectState); + const update = existingObject.overrideWithObjectState(entry.objectState); // RTO5c1a1 // store updates to call subscription callbacks for all of them once the sync sequence is completed. // this will ensure that clients get notified about the changes only once everything has been applied. existingObjectUpdates.push({ object: existingObject, update }); continue; } + // RTO5c1b, let newObject: LiveObject; // assign to a variable so TS doesn't complain about 'never' type in the default case const objectType = entry.objectType; switch (objectType) { case 'LiveCounter': - newObject = LiveCounter.fromObjectState(this, entry.objectState); + newObject = LiveCounter.fromObjectState(this, entry.objectState); // RTO5c1b1a break; case 'LiveMap': - newObject = LiveMap.fromObjectState(this, entry.objectState); + newObject = LiveMap.fromObjectState(this, entry.objectState); // RTO5c1b1b break; default: - throw new this._client.ErrorInfo(`Unknown LiveObject type: ${objectType}`, 50000, 500); + throw new this._client.ErrorInfo(`Unknown LiveObject type: ${objectType}`, 50000, 500); // RTO5c1b1c } - this._objectsPool.set(objectId, newObject); + this._objectsPool.set(objectId, newObject); // RTO5c1b1 } - // need to remove LiveObject instances from the ObjectsPool for which objectIds were not received during the sync sequence + // RTO5c2 - need to remove LiveObject instances from the ObjectsPool for which objectIds were not received during the sync sequence this._objectsPool.deleteExtraObjectIds([...receivedObjectIds]); // call subscription callbacks for all updated existing objects @@ -457,14 +469,15 @@ export class Objects { } } + /** @spec RTO2 */ private _throwIfMissingChannelMode(expectedMode: 'object_subscribe' | 'object_publish'): void { - // channel.modes is only populated on channel attachment, so use it only if it is set, - // otherwise as a best effort use user provided channel options + // RTO2a - channel.modes is only populated on channel attachment, so use it only if it is set if (this._channel.modes != null && !this._channel.modes.includes(expectedMode)) { - throw new this._client.ErrorInfo(`"${expectedMode}" channel mode must be set for this operation`, 40024, 400); + throw new this._client.ErrorInfo(`"${expectedMode}" channel mode must be set for this operation`, 40024, 400); // RTO2a2 } + // RTO2b - otherwise as a best effort use user provided channel options if (!this._client.Utils.allToLowerCase(this._channel.channelOptions.modes ?? []).includes(expectedMode)) { - throw new this._client.ErrorInfo(`"${expectedMode}" channel mode must be set for this operation`, 40024, 400); + throw new this._client.ErrorInfo(`"${expectedMode}" channel mode must be set for this operation`, 40024, 400); // RTO2b2 } } diff --git a/src/plugins/objects/objectspool.ts b/src/plugins/objects/objectspool.ts index 99a9a1f1f..d062d659b 100644 --- a/src/plugins/objects/objectspool.ts +++ b/src/plugins/objects/objectspool.ts @@ -10,10 +10,11 @@ export const ROOT_OBJECT_ID = 'root'; /** * @internal + * @spec RTO3 */ export class ObjectsPool { private _client: BaseClient; - private _pool: Map; + private _pool: Map; // RTO3a private _gcInterval: ReturnType; constructor(private _objects: Objects) { @@ -70,22 +71,23 @@ export class ObjectsPool { } } + /** @spec RTO6 */ createZeroValueObjectIfNotExists(objectId: string): LiveObject { const existingObject = this.get(objectId); if (existingObject) { - return existingObject; + return existingObject; // RTO6a } - const parsedObjectId = ObjectId.fromString(this._client, objectId); + const parsedObjectId = ObjectId.fromString(this._client, objectId); // RTO6b let zeroValueObject: LiveObject; switch (parsedObjectId.type) { case 'map': { - zeroValueObject = LiveMap.zeroValue(this._objects, objectId); + zeroValueObject = LiveMap.zeroValue(this._objects, objectId); // RTO6b2 break; } case 'counter': - zeroValueObject = LiveCounter.zeroValue(this._objects, objectId); + zeroValueObject = LiveCounter.zeroValue(this._objects, objectId); // RTO6b3 break; } @@ -95,6 +97,7 @@ export class ObjectsPool { private _createInitialPool(): Map { const pool = new Map(); + // RTO3b const root = LiveMap.zeroValue(this._objects, ROOT_OBJECT_ID); pool.set(root.getObjectId(), root); return pool;