diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java index 1f6f1b8890b82..9f23e5855811b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java @@ -62,7 +62,7 @@ public static QueryableStoreType key type of the store * @param value type of the store - * @return {@link QueryableStoreTypes.TimestampedKeyValueStoreType} + * @return {@link QueryableStoreTypes.TimestampedKeyValueStoreWithHeadersType} */ public static QueryableStoreType>> timestampedKeyValueStoreWithHeaders() { return new TimestampedKeyValueStoreWithHeadersType<>(); @@ -91,7 +91,7 @@ public static QueryableStoreType key type of the store * @param value type of the store @@ -113,7 +113,7 @@ public static QueryableStoreType> sessionStore } /** - * A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore ReadOnlySessionStoree<K, AggregationWithHeaders<V>>}. + * A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore ReadOnlySessionStore<K, AggregationWithHeaders<V>>}. * * @param key type of the store * @param value type of the store diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index b7300f1445477..f9a85d83b0386 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -989,19 +989,22 @@ private void throwIfBuiltInStore(final StateStore stateStore) { } /** - * Get the {@link KeyValueStore} or {@link TimestampedKeyValueStore} with the given name. + * Get the {@link KeyValueStore}, {@link TimestampedKeyValueStore}, or {@link TimestampedKeyValueStoreWithHeaders} + * with the given name. * The store can be a "regular" or global store. *

- * If the registered store is a {@link TimestampedKeyValueStore} this method will return a value-only query - * interface. It is highly recommended to update the code for this case to avoid bugs and to use - * {@link #getTimestampedKeyValueStore(String)} for full store access instead. + * If the registered store is a {@link TimestampedKeyValueStore} or {@link TimestampedKeyValueStoreWithHeaders} + * this method will return a value-only query interface. + * It is highly recommended to update the code for this case to avoid bugs and to use + * {@link #getTimestampedKeyValueStore(String)} or {@link #getTimestampedKeyValueStoreWithHeaders(String)} + * for full store access instead. *

* This is often useful in test cases to pre-populate the store before the test case instructs the topology to * {@link TestInputTopic#pipeInput(TestRecord) process an input message}, and/or to check the store afterward. * * @param name the name of the store - * @return the key value store, or {@code null} if no {@link KeyValueStore} or {@link TimestampedKeyValueStore} - * has been registered with the given name + * @return the key value store, or {@code null} if no {@link KeyValueStore}, {@link TimestampedKeyValueStore}, or + * {@link TimestampedWindowStoreWithHeaders} has been registered with the given name * @see #getAllStateStores() * @see #getStateStore(String) * @see #getTimestampedKeyValueStore(String) @@ -1028,14 +1031,19 @@ public KeyValueStore getKeyValueStore(final String name) { } /** - * Get the {@link TimestampedKeyValueStore} with the given name. + * Get the {@link TimestampedKeyValueStore} or {@link TimestampedKeyValueStoreWithHeaders} with the given name. * The store can be a "regular" or global store. *

+ * If the registered store is a {@link TimestampedKeyValueStoreWithHeaders} this method will return a value-ts-only query interface. + * It is highly recommended to update the code for this case to avoid bugs and to use + * {@link #getTimestampedKeyValueStoreWithHeaders(String)} for full store access instead. + *

* This is often useful in test cases to pre-populate the store before the test case instructs the topology to * {@link TestInputTopic#pipeInput(TestRecord) process an input message}, and/or to check the store afterward. * * @param name the name of the store - * @return the key value store, or {@code null} if no {@link TimestampedKeyValueStore} has been registered with the given name + * @return the key value store, or {@code null} if no {@link TimestampedKeyValueStore} or + * {@link TimestampedKeyValueStoreWithHeaders }has been registered with the given name * @see #getAllStateStores() * @see #getStateStore(String) * @see #getKeyValueStore(String) @@ -1090,7 +1098,7 @@ public KeyValueStore> getTimestampedKeyValueS * {@link TestInputTopic#pipeInput(TestRecord) process an input message}, and/or to check the store afterward. * * @param name the name of the store - * @return the key value store, or {@code null} if no {@link VersionedKeyValueStore} has been registered with the given name + * @return the versioned store, or {@code null} if no {@link VersionedKeyValueStore} has been registered with the given name * @see #getAllStateStores() * @see #getStateStore(String) * @see #getKeyValueStore(String) @@ -1109,19 +1117,22 @@ public VersionedKeyValueStore getVersionedKeyValueStore(final Strin } /** - * Get the {@link WindowStore} or {@link TimestampedWindowStore} with the given name. + * Get the {@link WindowStore}, {@link TimestampedWindowStore}, or {@link TimestampedWindowStoreWithHeaders} + * with the given name. * The store can be a "regular" or global store. *

- * If the registered store is a {@link TimestampedWindowStore} this method will return a value-only query - * interface. It is highly recommended to update the code for this case to avoid bugs and to use - * {@link #getTimestampedWindowStore(String)} for full store access instead. + * If the registered store is a {@link TimestampedWindowStore} or {@link TimestampedWindowStoreWithHeaders} + * this method will return a value-only query interface. + * It is highly recommended to update the code for this case to avoid bugs and to use + * {@link #getTimestampedWindowStore(String)} or {@link #getTimestampedWindowStoreWithHeaders(String)} + * for full store access instead. *

* This is often useful in test cases to pre-populate the store before the test case instructs the topology to * {@link TestInputTopic#pipeInput(TestRecord) process an input message}, and/or to check the store afterward. * * @param name the name of the store - * @return the key value store, or {@code null} if no {@link WindowStore} or {@link TimestampedWindowStore} - * has been registered with the given name + * @return the window store, or {@code null} if no {@link WindowStore}, {@link TimestampedWindowStore}, or + * {@link TimestampedWindowStoreWithHeaders} has been registered with the given name * @see #getAllStateStores() * @see #getStateStore(String) * @see #getKeyValueStore(String) @@ -1148,14 +1159,20 @@ public WindowStore getWindowStore(final String name) { } /** - * Get the {@link TimestampedWindowStore} with the given name. + * Get the {@link TimestampedWindowStore} or {@link TimestampedWindowStoreWithHeaders} with the given name. * The store can be a "regular" or global store. *

+ * If the registered store is a {@link TimestampedWindowStoreWithHeaders} + * this method will return a value-ts--only query interface. + * It is highly recommended to update the code for this case to avoid bugs and to use + * {@link #getTimestampedWindowStoreWithHeaders(String)} for full store access instead. + *

* This is often useful in test cases to pre-populate the store before the test case instructs the topology to * {@link TestInputTopic#pipeInput(TestRecord) process an input message}, and/or to check the store afterward. * * @param name the name of the store - * @return the key value store, or {@code null} if no {@link TimestampedWindowStore} has been registered with the given name + * @return the window store, or {@code null} if no {@link TimestampedWindowStore} or + * {@link TimestampedWindowStoreWithHeaders} has been registered with the given name * @see #getAllStateStores() * @see #getStateStore(String) * @see #getKeyValueStore(String) @@ -1203,14 +1220,20 @@ public WindowStore> getTimestampedWindowStore } /** - * Get the {@link SessionStore} with the given name. + * Get the {@link SessionStore} or {@link SessionStoreWithHeaders} with the given name. * The store can be a "regular" or global store. *

+ * If the registered store is a {@link SessionStoreWithHeaders} + * this method will return a value--only query interface. + * It is highly recommended to update the code for this case to avoid bugs and to use + * {@link #getSessionStoreWithHeaders(String)} for full store access instead. + *

* This is often useful in test cases to pre-populate the store before the test case instructs the topology to * {@link TestInputTopic#pipeInput(TestRecord) process an input message}, and/or to check the store afterward. * * @param name the name of the store - * @return the key value store, or {@code null} if no {@link SessionStore} has been registered with the given name + * @return the session store, or {@code null} if no {@link SessionStore} or {@link SessionStoreWithHeaders} + * has been registered with the given name * @see #getAllStateStores() * @see #getStateStore(String) * @see #getKeyValueStore(String) @@ -1232,14 +1255,14 @@ public SessionStore getSessionStore(final String name) { } /** - * Get the {@link SessionStore} with the given name. + * Get the {@link SessionStoreWithHeaders} with the given name. * The store can be a "regular" or global store. *

* This is often useful in test cases to pre-populate the store before the test case instructs the topology to * {@link TestInputTopic#pipeInput(TestRecord) process an input message}, and/or to check the store afterward. * * @param name the name of the store - * @return the key value store, or {@code null} if no {@link SessionStore} has been registered with the given name + * @return the session store, or {@code null} if no {@link SessionStoreWithHeaders} has been registered with the given name * @see #getAllStateStores() * @see #getStateStore(String) * @see #getKeyValueStore(String) diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java index 7555c0783305c..c8651c50ec6cb 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java @@ -915,7 +915,7 @@ public void shouldReturnCorrectInMemoryStoreTypeOnly() { private void shouldReturnCorrectStoreTypeOnly(final boolean persistent) { final String keyValueStoreName = "keyValueStore"; final String timestampedKeyValueStoreName = "keyValueTimestampStore"; - final String timestampedKeyValueStoreWithHeaderName = "keyValueTimestampStoreWithHeaders"; + final String timestampedKeyValueStoreWithHeadersName = "keyValueTimestampStoreWithHeaders"; final String versionedKeyValueStoreName = "keyValueVersionedStore"; final String windowStoreName = "windowStore"; final String timestampedWindowStoreName = "windowTimestampStore"; @@ -933,7 +933,7 @@ private void shouldReturnCorrectStoreTypeOnly(final boolean persistent) { persistent, keyValueStoreName, timestampedKeyValueStoreName, - timestampedKeyValueStoreWithHeaderName, + timestampedKeyValueStoreWithHeadersName, versionedKeyValueStoreName, windowStoreName, timestampedWindowStoreName, @@ -981,15 +981,15 @@ private void shouldReturnCorrectStoreTypeOnly(final boolean persistent) { assertNull(testDriver.getSessionStoreWithHeaders(versionedKeyValueStoreName)); } - assertNotNull(testDriver.getKeyValueStore(timestampedKeyValueStoreWithHeaderName)); - assertNotNull(testDriver.getTimestampedKeyValueStore(timestampedKeyValueStoreWithHeaderName)); - assertNotNull(testDriver.getTimestampedKeyValueStoreWithHeaders(timestampedKeyValueStoreWithHeaderName)); - assertNull(testDriver.getVersionedKeyValueStore(timestampedKeyValueStoreWithHeaderName)); - assertNull(testDriver.getWindowStore(timestampedKeyValueStoreWithHeaderName)); - assertNull(testDriver.getTimestampedWindowStore(timestampedKeyValueStoreWithHeaderName)); - assertNull(testDriver.getTimestampedWindowStoreWithHeaders(timestampedKeyValueStoreWithHeaderName)); - assertNull(testDriver.getSessionStore(timestampedKeyValueStoreWithHeaderName)); - assertNull(testDriver.getSessionStoreWithHeaders(timestampedKeyValueStoreWithHeaderName)); + assertNotNull(testDriver.getKeyValueStore(timestampedKeyValueStoreWithHeadersName)); + assertNotNull(testDriver.getTimestampedKeyValueStore(timestampedKeyValueStoreWithHeadersName)); + assertNotNull(testDriver.getTimestampedKeyValueStoreWithHeaders(timestampedKeyValueStoreWithHeadersName)); + assertNull(testDriver.getVersionedKeyValueStore(timestampedKeyValueStoreWithHeadersName)); + assertNull(testDriver.getWindowStore(timestampedKeyValueStoreWithHeadersName)); + assertNull(testDriver.getTimestampedWindowStore(timestampedKeyValueStoreWithHeadersName)); + assertNull(testDriver.getTimestampedWindowStoreWithHeaders(timestampedKeyValueStoreWithHeadersName)); + assertNull(testDriver.getSessionStore(timestampedKeyValueStoreWithHeadersName)); + assertNull(testDriver.getSessionStoreWithHeaders(timestampedKeyValueStoreWithHeadersName)); assertNull(testDriver.getKeyValueStore(windowStoreName)); assertNull(testDriver.getTimestampedKeyValueStore(windowStoreName));