Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimesta
*
* @param <K> key type of the store
* @param <V> value type of the store
* @return {@link QueryableStoreTypes.TimestampedKeyValueStoreType}
* @return {@link QueryableStoreTypes.TimestampedKeyValueStoreWithHeadersType}
*/
public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>>> timestampedKeyValueStoreWithHeaders() {
return new TimestampedKeyValueStoreWithHeadersType<>();
Expand Down Expand Up @@ -91,7 +91,7 @@ public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, ValueAndTimestamp
}

/**
* A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore ReadOnlyWindowStore&lt;K, ValueTimestampeHeaders&lt;V&gt;&gt;}.
* A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore ReadOnlyWindowStore&lt;K, ValueTimestampHeaders&lt;V&gt;&gt;}.
*
* @param <K> key type of the store
* @param <V> value type of the store
Expand All @@ -113,7 +113,7 @@ public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, V>> sessionStore
}

/**
* A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore ReadOnlySessionStoree&lt;K, AggregationWithHeaders&lt;V&gt;&gt;}.
* A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore ReadOnlySessionStore&lt;K, AggregationWithHeaders&lt;V&gt;&gt;}.
*
* @param <K> key type of the store
* @param <V> value type of the store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -989,19 +989,22 @@ private void throwIfBuiltInStore(final StateStore stateStore) {
}

/**
* Get the {@link KeyValueStore} or {@link TimestampedKeyValueStore} with the given name.
* Get the {@link KeyValueStore}, {@link TimestampedKeyValueStore}, or {@link TimestampedKeyValueStoreWithHeaders}
* with the given name.
* The store can be a "regular" or global store.
* <p>
* If the registered store is a {@link TimestampedKeyValueStore} this method will return a value-only query
* interface. <strong>It is highly recommended to update the code for this case to avoid bugs and to use
* {@link #getTimestampedKeyValueStore(String)} for full store access instead.</strong>
* If the registered store is a {@link TimestampedKeyValueStore} or {@link TimestampedKeyValueStoreWithHeaders}
* this method will return a value-only query interface.
* <strong>It is highly recommended to update the code for this case to avoid bugs and to use
* {@link #getTimestampedKeyValueStore(String)} or {@link #getTimestampedKeyValueStoreWithHeaders(String)}
* for full store access instead.</strong>
* <p>
* This is often useful in test cases to pre-populate the store before the test case instructs the topology to
* {@link TestInputTopic#pipeInput(TestRecord) process an input message}, and/or to check the store afterward.
*
* @param name the name of the store
* @return the key value store, or {@code null} if no {@link KeyValueStore} or {@link TimestampedKeyValueStore}
* has been registered with the given name
* @return the key value store, or {@code null} if no {@link KeyValueStore}, {@link TimestampedKeyValueStore}, or
* {@link TimestampedWindowStoreWithHeaders} has been registered with the given name
* @see #getAllStateStores()
* @see #getStateStore(String)
* @see #getTimestampedKeyValueStore(String)
Expand All @@ -1028,14 +1031,19 @@ public <K, V> KeyValueStore<K, V> getKeyValueStore(final String name) {
}

/**
* Get the {@link TimestampedKeyValueStore} with the given name.
* Get the {@link TimestampedKeyValueStore} or {@link TimestampedKeyValueStoreWithHeaders} with the given name.
* The store can be a "regular" or global store.
* <p>
* If the registered store is a {@link TimestampedKeyValueStoreWithHeaders} this method will return a value-ts-only query interface.
* <strong>It is highly recommended to update the code for this case to avoid bugs and to use
* {@link #getTimestampedKeyValueStoreWithHeaders(String)} for full store access instead.</strong>
* <p>
* This is often useful in test cases to pre-populate the store before the test case instructs the topology to
* {@link TestInputTopic#pipeInput(TestRecord) process an input message}, and/or to check the store afterward.
*
* @param name the name of the store
* @return the key value store, or {@code null} if no {@link TimestampedKeyValueStore} has been registered with the given name
* @return the key value store, or {@code null} if no {@link TimestampedKeyValueStore} or
* {@link TimestampedKeyValueStoreWithHeaders }has been registered with the given name
* @see #getAllStateStores()
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
Expand Down Expand Up @@ -1090,7 +1098,7 @@ public <K, V> KeyValueStore<K, ValueTimestampHeaders<V>> getTimestampedKeyValueS
* {@link TestInputTopic#pipeInput(TestRecord) process an input message}, and/or to check the store afterward.
*
* @param name the name of the store
* @return the key value store, or {@code null} if no {@link VersionedKeyValueStore} has been registered with the given name
* @return the versioned store, or {@code null} if no {@link VersionedKeyValueStore} has been registered with the given name
* @see #getAllStateStores()
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
Expand All @@ -1109,19 +1117,22 @@ public <K, V> VersionedKeyValueStore<K, V> getVersionedKeyValueStore(final Strin
}

/**
* Get the {@link WindowStore} or {@link TimestampedWindowStore} with the given name.
* Get the {@link WindowStore}, {@link TimestampedWindowStore}, or {@link TimestampedWindowStoreWithHeaders}
* with the given name.
* The store can be a "regular" or global store.
* <p>
* If the registered store is a {@link TimestampedWindowStore} this method will return a value-only query
* interface. <strong>It is highly recommended to update the code for this case to avoid bugs and to use
* {@link #getTimestampedWindowStore(String)} for full store access instead.</strong>
* If the registered store is a {@link TimestampedWindowStore} or {@link TimestampedWindowStoreWithHeaders}
* this method will return a value-only query interface.
* <strong>It is highly recommended to update the code for this case to avoid bugs and to use
* {@link #getTimestampedWindowStore(String)} or {@link #getTimestampedWindowStoreWithHeaders(String)}
* for full store access instead.</strong>
* <p>
* This is often useful in test cases to pre-populate the store before the test case instructs the topology to
* {@link TestInputTopic#pipeInput(TestRecord) process an input message}, and/or to check the store afterward.
*
* @param name the name of the store
* @return the key value store, or {@code null} if no {@link WindowStore} or {@link TimestampedWindowStore}
* has been registered with the given name
* @return the window store, or {@code null} if no {@link WindowStore}, {@link TimestampedWindowStore}, or
* {@link TimestampedWindowStoreWithHeaders} has been registered with the given name
* @see #getAllStateStores()
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
Expand All @@ -1148,14 +1159,20 @@ public <K, V> WindowStore<K, V> getWindowStore(final String name) {
}

/**
* Get the {@link TimestampedWindowStore} with the given name.
* Get the {@link TimestampedWindowStore} or {@link TimestampedWindowStoreWithHeaders} with the given name.
* The store can be a "regular" or global store.
* <p>
* If the registered store is a {@link TimestampedWindowStoreWithHeaders}
* this method will return a value-ts--only query interface.
* <strong>It is highly recommended to update the code for this case to avoid bugs and to use
* {@link #getTimestampedWindowStoreWithHeaders(String)} for full store access instead.</strong>
* <p>
* This is often useful in test cases to pre-populate the store before the test case instructs the topology to
* {@link TestInputTopic#pipeInput(TestRecord) process an input message}, and/or to check the store afterward.
*
* @param name the name of the store
* @return the key value store, or {@code null} if no {@link TimestampedWindowStore} has been registered with the given name
* @return the window store, or {@code null} if no {@link TimestampedWindowStore} or
* {@link TimestampedWindowStoreWithHeaders} has been registered with the given name
* @see #getAllStateStores()
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
Expand Down Expand Up @@ -1203,14 +1220,20 @@ public <K, V> WindowStore<K, ValueTimestampHeaders<V>> getTimestampedWindowStore
}

/**
* Get the {@link SessionStore} with the given name.
* Get the {@link SessionStore} or {@link SessionStoreWithHeaders} with the given name.
* The store can be a "regular" or global store.
* <p>
* If the registered store is a {@link SessionStoreWithHeaders}
* this method will return a value--only query interface.
* <strong>It is highly recommended to update the code for this case to avoid bugs and to use
* {@link #getSessionStoreWithHeaders(String)} for full store access instead.</strong>
* <p>
* This is often useful in test cases to pre-populate the store before the test case instructs the topology to
* {@link TestInputTopic#pipeInput(TestRecord) process an input message}, and/or to check the store afterward.
*
* @param name the name of the store
* @return the key value store, or {@code null} if no {@link SessionStore} has been registered with the given name
* @return the session store, or {@code null} if no {@link SessionStore} or {@link SessionStoreWithHeaders}
* has been registered with the given name
* @see #getAllStateStores()
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
Expand All @@ -1232,14 +1255,14 @@ public <K, V> SessionStore<K, V> getSessionStore(final String name) {
}

/**
* Get the {@link SessionStore} with the given name.
* Get the {@link SessionStoreWithHeaders} with the given name.
* The store can be a "regular" or global store.
* <p>
* This is often useful in test cases to pre-populate the store before the test case instructs the topology to
* {@link TestInputTopic#pipeInput(TestRecord) process an input message}, and/or to check the store afterward.
*
* @param name the name of the store
* @return the key value store, or {@code null} if no {@link SessionStore} has been registered with the given name
* @return the session store, or {@code null} if no {@link SessionStoreWithHeaders} has been registered with the given name
* @see #getAllStateStores()
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@ public void shouldReturnCorrectInMemoryStoreTypeOnly() {
private void shouldReturnCorrectStoreTypeOnly(final boolean persistent) {
final String keyValueStoreName = "keyValueStore";
final String timestampedKeyValueStoreName = "keyValueTimestampStore";
final String timestampedKeyValueStoreWithHeaderName = "keyValueTimestampStoreWithHeaders";
final String timestampedKeyValueStoreWithHeadersName = "keyValueTimestampStoreWithHeaders";
final String versionedKeyValueStoreName = "keyValueVersionedStore";
final String windowStoreName = "windowStore";
final String timestampedWindowStoreName = "windowTimestampStore";
Expand All @@ -933,7 +933,7 @@ private void shouldReturnCorrectStoreTypeOnly(final boolean persistent) {
persistent,
keyValueStoreName,
timestampedKeyValueStoreName,
timestampedKeyValueStoreWithHeaderName,
timestampedKeyValueStoreWithHeadersName,
versionedKeyValueStoreName,
windowStoreName,
timestampedWindowStoreName,
Expand Down Expand Up @@ -981,15 +981,15 @@ private void shouldReturnCorrectStoreTypeOnly(final boolean persistent) {
assertNull(testDriver.getSessionStoreWithHeaders(versionedKeyValueStoreName));
}

assertNotNull(testDriver.getKeyValueStore(timestampedKeyValueStoreWithHeaderName));
assertNotNull(testDriver.getTimestampedKeyValueStore(timestampedKeyValueStoreWithHeaderName));
assertNotNull(testDriver.getTimestampedKeyValueStoreWithHeaders(timestampedKeyValueStoreWithHeaderName));
assertNull(testDriver.getVersionedKeyValueStore(timestampedKeyValueStoreWithHeaderName));
assertNull(testDriver.getWindowStore(timestampedKeyValueStoreWithHeaderName));
assertNull(testDriver.getTimestampedWindowStore(timestampedKeyValueStoreWithHeaderName));
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(timestampedKeyValueStoreWithHeaderName));
assertNull(testDriver.getSessionStore(timestampedKeyValueStoreWithHeaderName));
assertNull(testDriver.getSessionStoreWithHeaders(timestampedKeyValueStoreWithHeaderName));
assertNotNull(testDriver.getKeyValueStore(timestampedKeyValueStoreWithHeadersName));
assertNotNull(testDriver.getTimestampedKeyValueStore(timestampedKeyValueStoreWithHeadersName));
assertNotNull(testDriver.getTimestampedKeyValueStoreWithHeaders(timestampedKeyValueStoreWithHeadersName));
assertNull(testDriver.getVersionedKeyValueStore(timestampedKeyValueStoreWithHeadersName));
assertNull(testDriver.getWindowStore(timestampedKeyValueStoreWithHeadersName));
assertNull(testDriver.getTimestampedWindowStore(timestampedKeyValueStoreWithHeadersName));
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(timestampedKeyValueStoreWithHeadersName));
assertNull(testDriver.getSessionStore(timestampedKeyValueStoreWithHeadersName));
assertNull(testDriver.getSessionStoreWithHeaders(timestampedKeyValueStoreWithHeadersName));

assertNull(testDriver.getKeyValueStore(windowStoreName));
assertNull(testDriver.getTimestampedKeyValueStore(windowStoreName));
Expand Down
Loading