Skip to content
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/src/main/java/io/ably/lib/objects/LiveObjects.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.ably.lib.objects;

import io.ably.lib.objects.state.ObjectsStateChange;
import io.ably.lib.objects.type.counter.LiveCounter;
import io.ably.lib.objects.type.map.LiveMap;
import io.ably.lib.types.Callback;
import org.jetbrains.annotations.Blocking;
import org.jetbrains.annotations.NonBlocking;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
* s.unsubscribe();
* }
* </pre>
* Spec: RTLO4b5
*/
public interface ObjectsSubscription {
/**
* This method should be called when the subscription is no longer needed,
* it will make sure no further events will be sent to the subscriber and
* that references to the subscriber are cleaned up.
* Spec: RTLO4b5a
*/
void unsubscribe();
}
27 changes: 27 additions & 0 deletions lib/src/main/java/io/ably/lib/objects/type/LiveObjectUpdate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.ably.lib.objects.type;

import org.jetbrains.annotations.Nullable;

/**
* Abstract base class for all LiveObject update notifications.
* Provides common structure for updates that occur on LiveMap and LiveCounter objects.
* Contains the update data that describes what changed in the live object.
* Spec: RTLO4b4
*/
public abstract class LiveObjectUpdate {
/**
* The update data containing details about the change that occurred
* Spec: RTLO4b4a
*/
@Nullable
protected final Object update;

/**
* Creates a LiveObjectUpdate with the specified update data.
*
* @param update the data describing the change, or null for no-op updates
*/
protected LiveObjectUpdate(@Nullable Object update) {
this.update = update;
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.ably.lib.objects;
package io.ably.lib.objects.type.counter;

import io.ably.lib.types.Callback;
import org.jetbrains.annotations.Blocking;
Expand All @@ -11,7 +11,7 @@
* It allows incrementing, decrementing, and retrieving the current value of the counter,
* both synchronously and asynchronously.
*/
public interface LiveCounter {
public interface LiveCounter extends LiveCounterChange {

/**
* Increments the value of the counter by 1.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.ably.lib.objects.type.counter;

import io.ably.lib.objects.ObjectsSubscription;
import org.jetbrains.annotations.NonBlocking;
import org.jetbrains.annotations.NotNull;

/**
* Provides methods to subscribe to real-time updates on LiveCounter objects.
* Enables clients to receive notifications when counter values change due to
* operations performed by any client connected to the same channel.
*/
public interface LiveCounterChange {

/**
* Subscribes to real-time updates on this LiveCounter object.
* Multiple listeners can be subscribed to the same object independently.
* Spec: RTLO4b
*
* @param listener the listener to be notified of counter updates
* @return an ObjectsSubscription for managing this specific listener
*/
@NonBlocking
@NotNull ObjectsSubscription subscribe(@NotNull Listener listener);

/**
* Unsubscribes a specific listener from receiving updates.
* Has no effect if the listener is not currently subscribed.
* Spec: RTLO4c
*
* @param listener the listener to be unsubscribed
*/
@NonBlocking
void unsubscribe(@NotNull Listener listener);

/**
* Unsubscribes all listeners from receiving updates.
* No notifications will be delivered until new listeners are subscribed.
* Spec: RTLO4d
*/
@NonBlocking
void unsubscribeAll();

/**
* Listener interface for receiving LiveCounter updates.
* Spec: RTLO4b3
*/
interface Listener {
/**
* Called when the LiveCounter has been updated.
* Should execute quickly as it's called from the real-time processing thread.
*
* @param update details about the counter change
*/
void onUpdated(@NotNull LiveCounterUpdate update);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.ably.lib.objects.type.counter;

import io.ably.lib.objects.type.LiveObjectUpdate;
import org.jetbrains.annotations.NotNull;

/**
* Represents an update that occurred on a LiveCounter object.
* Contains information about counter value changes from increment/decrement operations.
* Updates can represent positive changes (increments) or negative changes (decrements).
*
* @spec RTLC11, RTLC11a - LiveCounter update structure and behavior
*/
public class LiveCounterUpdate extends LiveObjectUpdate {

/**
* Creates a no-op LiveCounterUpdate representing no actual change.
*/
public LiveCounterUpdate() {
super(null);
}

/**
* Creates a LiveCounterUpdate with the specified amount change.
*
* @param amount the amount by which the counter changed (positive = increment, negative = decrement)
*/
public LiveCounterUpdate(@NotNull Double amount) {
super(new Update(amount));
}

/**
* Gets the update information containing the amount of change.
*
* @return the Update object with the counter modification amount
*/
@NotNull
public LiveCounterUpdate.Update getUpdate() {
return (Update) update;
}

/**
* Returns a string representation of this LiveCounterUpdate.
*
* @return a string showing the amount of change to the counter
*/
@Override
public String toString() {
if (update == null) {
return "LiveCounterUpdate{no change}";
}
return "LiveCounterUpdate{amount=" + getUpdate().getAmount() + "}";
}

/**
* Contains the specific details of a counter update operation.
*
* @spec RTLC11b, RTLC11b1 - Counter update data structure
*/
public static class Update {
private final @NotNull Double amount;

/**
* Creates an Update with the specified amount.
*
* @param amount the counter change amount (positive = increment, negative = decrement)
*/
public Update(@NotNull Double amount) {
this.amount = amount;
}

/**
* Gets the amount by which the counter value was modified.
*
* @return the change amount (positive for increments, negative for decrements)
*/
public @NotNull Double getAmount() {
return amount;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.ably.lib.objects;
package io.ably.lib.objects.type.map;

import io.ably.lib.types.Callback;
import org.jetbrains.annotations.Blocking;
Expand All @@ -14,7 +14,7 @@
* The LiveMap interface provides methods to interact with a live, real-time map structure.
* It supports both synchronous and asynchronous operations for managing key-value pairs.
*/
public interface LiveMap {
public interface LiveMap extends LiveMapChange {

/**
* Retrieves the value associated with the specified key.
Expand Down
56 changes: 56 additions & 0 deletions lib/src/main/java/io/ably/lib/objects/type/map/LiveMapChange.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.ably.lib.objects.type.map;

import io.ably.lib.objects.ObjectsSubscription;
import org.jetbrains.annotations.NonBlocking;
import org.jetbrains.annotations.NotNull;

/**
* Provides methods to subscribe to real-time updates on LiveMap objects.
* Enables clients to receive notifications when map entries are added, updated, or removed.
* Uses last-write-wins conflict resolution when multiple clients modify the same key.
*/
public interface LiveMapChange {

/**
* Subscribes to real-time updates on this LiveMap object.
* Multiple listeners can be subscribed to the same object independently.
* Spec: RTLO4b
*
* @param listener the listener to be notified of map updates
* @return an ObjectsSubscription for managing this specific listener
*/
@NonBlocking
@NotNull ObjectsSubscription subscribe(@NotNull Listener listener);

/**
* Unsubscribes a specific listener from receiving updates.
* Has no effect if the listener is not currently subscribed.
* Spec: RTLO4c
*
* @param listener the listener to be unsubscribed
*/
@NonBlocking
void unsubscribe(@NotNull Listener listener);

/**
* Unsubscribes all listeners from receiving updates.
* No notifications will be delivered until new listeners are subscribed.
* Spec: RTLO4d
*/
@NonBlocking
void unsubscribeAll();

/**
* Listener interface for receiving LiveMap updates.
* Spec: RTLO4b3
*/
interface Listener {
/**
* Called when the LiveMap has been updated.
* Should execute quickly as it's called from the real-time processing thread.
*
* @param update details about which keys were modified and how
*/
void onUpdated(@NotNull LiveMapUpdate update);
}
}
66 changes: 66 additions & 0 deletions lib/src/main/java/io/ably/lib/objects/type/map/LiveMapUpdate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package io.ably.lib.objects.type.map;

import io.ably.lib.objects.type.LiveObjectUpdate;
import org.jetbrains.annotations.NotNull;

import java.util.Map;

/**
* Represents an update that occurred on a LiveMap object.
* Contains information about which keys were modified and whether they were updated or removed.
*
* @spec RTLM18, RTLM18a - LiveMap update structure and behavior
*/
public class LiveMapUpdate extends LiveObjectUpdate {

/**
* Creates a no-op LiveMapUpdate representing no actual change.
*/
public LiveMapUpdate() {
super(null);
}

/**
* Creates a LiveMapUpdate with the specified key changes.
*
* @param update map of key names to their change types (UPDATED or REMOVED)
*/
public LiveMapUpdate(@NotNull Map<String, Change> update) {
super(update);
}

/**
* Gets the map of key changes that occurred in this update.
*
* @return map of key names to their change types
*/
@NotNull
public Map<String, Change> getUpdate() {
return (Map<String, Change>) update;
}

/**
* Returns a string representation of this LiveMapUpdate.
*
* @return a string showing the map key changes in this update
*/
@Override
public String toString() {
if (update == null) {
return "LiveMapUpdate{no change}";
}
return "LiveMapUpdate{changes=" + getUpdate() + "}";
}

/**
* Indicates the type of change that occurred to a map key.
*
* @spec RTLM18b - Map change types
*/
public enum Change {
/** The key was added or its value was modified */
UPDATED,
/** The key was removed from the map */
REMOVED
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package io.ably.lib.objects

import io.ably.lib.objects.state.ObjectsStateChange
import io.ably.lib.objects.state.ObjectsStateEvent
import io.ably.lib.objects.type.counter.LiveCounter
import io.ably.lib.objects.type.map.LiveMap
import io.ably.lib.realtime.ChannelState
import io.ably.lib.types.AblyException
import io.ably.lib.types.Callback
Expand Down Expand Up @@ -126,7 +128,7 @@ internal class DefaultLiveObjects(internal val channelName: String, internal val
* @spec OM2 - Populates missing fields from parent protocol message
*/
private fun initializeHandlerForIncomingObjectMessages(): Job {
return sequentialScope.launch {
return sequentialScope.launch {
objectsEventBus.collect { protocolMessage ->
// OM2 - Populate missing fields from parent
val objects = protocolMessage.state.filterIsInstance<ObjectMessage>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ internal fun LiveObjectsAdapter.setChannelSerial(channelName: String, protocolMe
setChannelSerial(channelName, channelSerial)
}

// Spec: RTLO4b1, RTLO4b2
internal fun LiveObjectsAdapter.throwIfInvalidAccessApiConfiguration(channelName: String) {
throwIfMissingChannelMode(channelName, ChannelMode.object_subscribe)
throwIfInChannelState(channelName, arrayOf(ChannelState.detached, ChannelState.failed))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.ably.lib.objects

import io.ably.lib.objects.type.BaseLiveObject
import io.ably.lib.objects.type.LiveObjectUpdate
import io.ably.lib.objects.type.livecounter.DefaultLiveCounter
import io.ably.lib.objects.type.livemap.DefaultLiveMap
import io.ably.lib.util.Log
Expand Down Expand Up @@ -125,7 +126,8 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje
}

val receivedObjectIds = mutableSetOf<String>()
val existingObjectUpdates = mutableListOf<Pair<BaseLiveObject, Any>>()
// RTO5c1a2 - List to collect updates for existing objects
val existingObjectUpdates = mutableListOf<Pair<BaseLiveObject, LiveObjectUpdate>>()

// RTO5c1
for ((objectId, objectState) in syncObjectsDataPool) {
Expand All @@ -148,7 +150,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje
// RTO5c2 - need to remove LiveObject instances from the ObjectsPool for which objectIds were not received during the sync sequence
liveObjects.objectsPool.deleteExtraObjectIds(receivedObjectIds)

// call subscription callbacks for all updated existing objects
// RTO5c7 - call subscription callbacks for all updated existing objects
existingObjectUpdates.forEach { (obj, update) ->
obj.notifyUpdated(update)
}
Expand Down
Loading
Loading