byIndexStream(String indexName, String indexKey);
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java
index de4d00d717..6f9e73d8ed 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java
@@ -37,7 +37,8 @@
/**
* Provides useful operations to manipulate resources (server-side apply, patch, etc.) in an
* idiomatic way, in particular to make sure that the latest version of the resource is present in
- * the caches for the next reconciliation.
+ * the caches for the next reconciliation. In other words provides read-cache-after-write
+ * consistency.
*
* @param the resource type on which this object operates
*/
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
index 6632ce631e..a3ef72a2a3 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
@@ -256,12 +256,13 @@ public void addIndexers(Map>> indexers) {
this.indexers.putAll(indexers);
}
+ public Stream byIndexStream(String indexName, String indexKey) {
+ return sources.values().stream().map(s -> s.byIndex(indexName, indexKey)).flatMap(List::stream);
+ }
+
@Override
public List byIndex(String indexName, String indexKey) {
- return sources.values().stream()
- .map(s -> s.byIndex(indexName, indexKey))
- .flatMap(List::stream)
- .collect(Collectors.toList());
+ return byIndexStream(indexName, indexKey).collect(Collectors.toList());
}
@Override
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java
index 541068aa93..8885c225c8 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java
@@ -195,6 +195,11 @@ public List byIndex(String indexName, String indexKey) {
return informer.getIndexer().byIndex(indexName, indexKey);
}
+ @Override
+ public Stream byIndexStream(String indexName, String indexKey) {
+ return byIndex(indexName, indexKey).stream();
+ }
+
@Override
public String toString() {
return informerInfo() + " (" + informer + ')';
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java
index 26543e8322..69a5f36bf4 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java
@@ -23,6 +23,7 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
@@ -111,7 +112,6 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator<
res.ifPresentOrElse(
r -> {
R latestResource = (R) r.getResource().orElseThrow();
-
// as previous resource version we use the one from successful update, since
// we process new event here only if that is more recent then the event from our update.
// Note that this is equivalent with the scenario when an informer watch connection
@@ -222,11 +222,6 @@ public Optional getCachedValue(ResourceID resourceID) {
return get(resourceID);
}
- @Override
- public Stream list(String namespace, Predicate predicate) {
- return manager().list(namespace, predicate);
- }
-
void setTemporalResourceCache(TemporaryResourceCache temporaryResourceCache) {
this.temporaryResourceCache = temporaryResourceCache;
}
@@ -239,19 +234,134 @@ public void addIndexers(Map>> indexers) {
this.indexers.putAll(indexers);
}
+ @Override
+ public Stream list(String namespace, Predicate predicate) {
+ return manager().list(namespace, predicate);
+ }
+
+ @Override
+ public Stream list(Predicate predicate) {
+ return cache.list(predicate);
+ }
+
@Override
public List byIndex(String indexName, String indexKey) {
return manager().byIndex(indexName, indexKey);
}
- @Override
- public Stream keys() {
- return cache.keys();
+ public Stream byIndexStream(String indexName, String indexKey) {
+ return manager().byIndexStream(indexName, indexKey);
+ }
+
+ /**
+ * Like {@link #list(String, Predicate)} but for read-cache-after-write consistency. This is
+ * useful when resources are updated using {@link
+ * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}.
+ */
+ public Stream listWithStrongConsistency(String namespace, Predicate predicate) {
+ return mergeWithWithTempCacheResources(
+ manager().list(namespace, predicate), namespace, predicate);
+ }
+
+ /**
+ * Like {@link #list(Predicate)} but for read-cache-after-write consistency. This is useful when
+ * resources are updated using {@link
+ * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}.
+ */
+ public Stream listWithStrongConsistency(Predicate predicate) {
+ return mergeWithWithTempCacheResources(cache.list(predicate), null, predicate);
+ }
+
+ /**
+ * Like {@link #byIndexStream(String, String)} but for read-cache-after-write consistency. This is
+ * useful when resources are updated using {@link
+ * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}.
+ */
+ public Stream byIndexStreamWithStrongConsistency(String indexName, String indexKey) {
+ return mergeWithWithTempCacheResources(
+ manager().byIndexStream(indexName, indexKey), indexName, indexKey);
+ }
+
+ private Stream mergeWithWithTempCacheResources(
+ Stream stream, String indexName, String indexKey) {
+ return mergeWithWithTempCacheResources(stream, null, null, indexName, indexKey);
+ }
+
+ private Stream mergeWithWithTempCacheResources(
+ Stream stream, String namespace, Predicate predicate) {
+ return mergeWithWithTempCacheResources(stream, namespace, predicate, null, null);
+ }
+
+ private Stream mergeWithWithTempCacheResources(
+ Stream stream,
+ String namespace,
+ Predicate predicate,
+ String indexName,
+ String indexKey) {
+ if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) {
+ return stream;
+ }
+ var allTempResources = temporaryResourceCache.getResources();
+ Map tempResources;
+ if (namespace == null && predicate == null) {
+ tempResources = new HashMap<>(allTempResources);
+ } else {
+ // filtering the temp cache according the user input (predicate, namespace)
+ tempResources =
+ allTempResources.entrySet().stream()
+ .filter(
+ e -> {
+ if (namespace != null) {
+ var res =
+ e.getKey().getNamespace().map(ns -> ns.equals(namespace)).orElse(false);
+ if (!res) return false;
+ }
+ if (predicate != null) {
+ return predicate.test(e.getValue());
+ }
+ return true;
+ })
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+ if (tempResources.isEmpty()) {
+ return stream;
+ }
+ var upToDateList =
+ stream
+ .map(
+ r -> {
+ var resourceID = ResourceID.fromResource(r);
+ // removing the id from the related temp resources
+ // this is important so we can detect ghost resources:
+ // all that remains is ghost resource
+ var tempResource = tempResources.remove(resourceID);
+ // using the latest version
+ if (tempResource != null
+ && ReconcilerUtilsInternal.compareResourceVersions(tempResource, r) > 0) {
+ return tempResource;
+ }
+ return r;
+ })
+ .toList();
+ Stream tempResourceStream;
+ // ghost resource handling
+ if (indexName != null && indexKey != null) {
+ var indexer = indexers.get(indexName);
+ if (indexer == null) {
+ throw new IllegalArgumentException("Indexer not found for: " + indexName);
+ }
+ // we check if the ghost resource is part of the index
+ tempResourceStream =
+ tempResources.values().stream().filter(r -> indexer.apply(r).contains(indexKey));
+ } else {
+ tempResourceStream = tempResources.values().stream();
+ }
+ return Stream.concat(tempResourceStream, upToDateList.stream());
}
@Override
- public Stream list(Predicate predicate) {
- return cache.list(predicate);
+ public Stream keys() {
+ return cache.keys();
}
@Override
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java
index 5a4486f756..39543843b8 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java
@@ -275,4 +275,12 @@ private void checkGhostResources() {
public synchronized Optional getResourceFromCache(ResourceID resourceID) {
return Optional.ofNullable(cache.get(resourceID));
}
+
+ synchronized boolean isEmpty() {
+ return cache.isEmpty();
+ }
+
+ synchronized Map getResources() {
+ return new HashMap<>(cache);
+ }
}
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java
index e60ac02280..7313cc3a48 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java
@@ -16,11 +16,14 @@
package io.javaoperatorsdk.operator.processing.event.source.informer;
import java.time.Duration;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -56,7 +59,9 @@
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@@ -87,10 +92,12 @@ void setup() {
when(secondaryToPrimaryMapper.toPrimaryResourceIDs(any()))
.thenReturn(Set.of(ResourceID.fromResource(testDeployment())));
when(informerEventSourceConfiguration.getInformerConfig()).thenReturn(informerConfig);
- when(informerConfig.getEffectiveNamespaces(any())).thenReturn(DEFAULT_NAMESPACES_SET);
+
when(informerEventSourceConfiguration.getResourceClass()).thenReturn(Deployment.class);
when(informerConfig.getGhostResourceCacheCheckInterval())
.thenReturn(Constants.DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL);
+ when(informerConfig.isComparableResourceVersions()).thenReturn(true);
+ when(informerConfig.getEffectiveNamespaces(any())).thenReturn(DEFAULT_NAMESPACES_SET);
informerEventSource =
spy(
new InformerEventSource<>(informerEventSourceConfiguration, clientMock) {
@@ -533,6 +540,135 @@ void informerStoppedHandlerShouldBeCalledWhenInformerStops() {
verify(informerStoppedHandler, atLeastOnce()).onStop(any(), eq(exception));
}
+ @Test
+ void listWithStrongConsistencyReplacesResourceFromTempCache() {
+ var original = testDeployment();
+ var newer = testDeployment();
+ newer.getMetadata().setResourceVersion("5");
+
+ when(temporaryResourceCache.getResources())
+ .thenReturn(Map.of(ResourceID.fromResource(original), newer));
+
+ var mim = mock(InformerManager.class);
+ when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original));
+ doReturn(mim).when(informerEventSource).manager();
+
+ var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList();
+
+ assertThat(result).containsExactly(newer);
+ }
+
+ @Test
+ void listWithStrongConsistencyKeepsResourceWhenNotInTempCache() {
+ var original = testDeployment();
+
+ when(temporaryResourceCache.getResources()).thenReturn(Map.of());
+
+ var mim = mock(InformerManager.class);
+ when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original));
+ doReturn(mim).when(informerEventSource).manager();
+
+ var result = informerEventSource.listWithStrongConsistency("default", r -> true).toList();
+
+ assertThat(result).containsExactly(original);
+ }
+
+ @Test
+ void listWithStrongConsistencyReplacesOnlyMatchingResources() {
+ var dep1 = testDeployment();
+ var dep2 = testDeployment();
+ dep2.getMetadata().setName("other");
+ var newerDep1 = testDeployment();
+ newerDep1.getMetadata().setResourceVersion("5");
+
+ when(temporaryResourceCache.getResources())
+ .thenReturn(Map.of(ResourceID.fromResource(dep1), newerDep1));
+
+ var informerManager = mock(InformerManager.class);
+ when(informerManager.list(nullable(String.class), any())).thenReturn(Stream.of(dep1, dep2));
+ doReturn(informerManager).when(informerEventSource).manager();
+
+ var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList();
+
+ assertThat(result).containsExactlyInAnyOrder(newerDep1, dep2);
+ }
+
+ @Test
+ void byIndexStreamWithStrongConsistencyReplacesFromTempCache() {
+ var original = testDeployment();
+ var newer = testDeployment();
+ newer.getMetadata().setResourceVersion("5");
+
+ when(temporaryResourceCache.getResources())
+ .thenReturn(Map.of(ResourceID.fromResource(original), newer));
+
+ var informerManager = mock(InformerManager.class);
+ when(informerManager.byIndexStream(any(), any())).thenReturn(Stream.of(original));
+ doReturn(informerManager).when(informerEventSource).manager();
+ informerEventSource.addIndexers(Map.of("idx", d -> List.of("key")));
+
+ var result = informerEventSource.byIndexStreamWithStrongConsistency("idx", "key").toList();
+
+ assertThat(result).containsExactly(newer);
+ }
+
+ @Test
+ void listWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion() {
+ var original = testDeployment();
+ original.getMetadata().setResourceVersion("5");
+ var olderTemp = testDeployment();
+ olderTemp.getMetadata().setResourceVersion("3");
+
+ when(temporaryResourceCache.getResources())
+ .thenReturn(Map.of(ResourceID.fromResource(original), olderTemp));
+
+ var mim = mock(InformerManager.class);
+ when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original));
+ doReturn(mim).when(informerEventSource).manager();
+
+ var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList();
+
+ assertThat(result).containsExactly(original);
+ }
+
+ @Test
+ void byIndexStreamWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion() {
+ var original = testDeployment();
+ original.getMetadata().setResourceVersion("5");
+ var olderTemp = testDeployment();
+ olderTemp.getMetadata().setResourceVersion("3");
+
+ when(temporaryResourceCache.getResources())
+ .thenReturn(Map.of(ResourceID.fromResource(original), olderTemp));
+
+ var mim = mock(InformerManager.class);
+ when(mim.byIndexStream(any(), any())).thenReturn(Stream.of(original));
+ doReturn(mim).when(informerEventSource).manager();
+ informerEventSource.addIndexers(Map.of("idx", d -> List.of("key")));
+
+ var result = informerEventSource.byIndexStreamWithStrongConsistency("idx", "key").toList();
+
+ assertThat(result).containsExactly(original);
+ }
+
+ @Test
+ void listWithStrongConsistencyAddsGhostResources() {
+ var resource = testDeployment();
+ var ghostResource = testDeployment();
+ ghostResource.getMetadata().setName("ghost");
+
+ when(temporaryResourceCache.getResources())
+ .thenReturn(Map.of(ResourceID.fromResource(ghostResource), ghostResource));
+
+ var mim = mock(InformerManager.class);
+ when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(resource));
+ doReturn(mim).when(informerEventSource).manager();
+
+ var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList();
+
+ assertThat(result).containsExactlyInAnyOrder(resource, ghostResource);
+ }
+
Deployment testDeployment() {
Deployment deployment = new Deployment();
deployment.setMetadata(new ObjectMeta());
diff --git a/operator-framework-junit/pom.xml b/operator-framework-junit/pom.xml
index e43c38e534..aa18d5c778 100644
--- a/operator-framework-junit/pom.xml
+++ b/operator-framework-junit/pom.xml
@@ -21,7 +21,7 @@
io.javaoperatorsdk
java-operator-sdk
- 5.3.4-SNAPSHOT
+ 999-SNAPSHOT
operator-framework-junit
diff --git a/operator-framework/pom.xml b/operator-framework/pom.xml
index 90fc061b0f..f94dfa757d 100644
--- a/operator-framework/pom.xml
+++ b/operator-framework/pom.xml
@@ -21,7 +21,7 @@
io.javaoperatorsdk
java-operator-sdk
- 5.3.4-SNAPSHOT
+ 999-SNAPSHOT
operator-framework
diff --git a/pom.xml b/pom.xml
index 7fa7964a9b..3059ee2a00 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,7 +21,7 @@
io.javaoperatorsdk
java-operator-sdk
- 5.3.4-SNAPSHOT
+ 999-SNAPSHOT
pom
Operator SDK for Java
Java SDK for implementing Kubernetes operators
diff --git a/sample-operators/controller-namespace-deletion/pom.xml b/sample-operators/controller-namespace-deletion/pom.xml
index 066a812307..af4be01972 100644
--- a/sample-operators/controller-namespace-deletion/pom.xml
+++ b/sample-operators/controller-namespace-deletion/pom.xml
@@ -22,7 +22,7 @@
io.javaoperatorsdk
sample-operators
- 5.3.4-SNAPSHOT
+ 999-SNAPSHOT
sample-controller-namespace-deletion
diff --git a/sample-operators/leader-election/pom.xml b/sample-operators/leader-election/pom.xml
index e1df94fa1e..4f896485d1 100644
--- a/sample-operators/leader-election/pom.xml
+++ b/sample-operators/leader-election/pom.xml
@@ -22,7 +22,7 @@
io.javaoperatorsdk
sample-operators
- 5.3.4-SNAPSHOT
+ 999-SNAPSHOT
sample-leader-election
diff --git a/sample-operators/mysql-schema/pom.xml b/sample-operators/mysql-schema/pom.xml
index 283aead757..d2872c921a 100644
--- a/sample-operators/mysql-schema/pom.xml
+++ b/sample-operators/mysql-schema/pom.xml
@@ -22,7 +22,7 @@
io.javaoperatorsdk
sample-operators
- 5.3.4-SNAPSHOT
+ 999-SNAPSHOT
sample-mysql-schema-operator
diff --git a/sample-operators/operations/pom.xml b/sample-operators/operations/pom.xml
index 8667604f96..30762126c6 100644
--- a/sample-operators/operations/pom.xml
+++ b/sample-operators/operations/pom.xml
@@ -22,7 +22,7 @@
io.javaoperatorsdk
sample-operators
- 5.3.4-SNAPSHOT
+ 999-SNAPSHOT
sample-operations
@@ -106,11 +106,6 @@
operations-operator
-
-
- -Dlog4j.configurationFile=/config/log4j2.xml
-
-
diff --git a/sample-operators/pom.xml b/sample-operators/pom.xml
index 504aba5e78..9313095584 100644
--- a/sample-operators/pom.xml
+++ b/sample-operators/pom.xml
@@ -22,7 +22,7 @@
io.javaoperatorsdk
java-operator-sdk
- 5.3.4-SNAPSHOT
+ 999-SNAPSHOT
sample-operators
diff --git a/sample-operators/tomcat-operator/pom.xml b/sample-operators/tomcat-operator/pom.xml
index d769c09bda..ea964a2b07 100644
--- a/sample-operators/tomcat-operator/pom.xml
+++ b/sample-operators/tomcat-operator/pom.xml
@@ -22,7 +22,7 @@
io.javaoperatorsdk
sample-operators
- 5.3.4-SNAPSHOT
+ 999-SNAPSHOT
sample-tomcat-operator
diff --git a/sample-operators/webpage/pom.xml b/sample-operators/webpage/pom.xml
index 304486e5f4..d50e5ef03c 100644
--- a/sample-operators/webpage/pom.xml
+++ b/sample-operators/webpage/pom.xml
@@ -22,7 +22,7 @@
io.javaoperatorsdk
sample-operators
- 5.3.4-SNAPSHOT
+ 999-SNAPSHOT
sample-webpage-operator
diff --git a/test-index-processor/pom.xml b/test-index-processor/pom.xml
index 51e15247ff..2ae7c5f454 100644
--- a/test-index-processor/pom.xml
+++ b/test-index-processor/pom.xml
@@ -22,7 +22,7 @@
io.javaoperatorsdk
java-operator-sdk
- 5.3.4-SNAPSHOT
+ 999-SNAPSHOT
test-index-processor