diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/configuration/Configuration.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/configuration/Configuration.java
index eec8ce16d..ba4103cd7 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/configuration/Configuration.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/configuration/Configuration.java
@@ -5,25 +5,68 @@
import java.io.Serializable;
import java.util.Arrays;
+/**
+ * Serializable, typed view over a Flink {@link ParameterTool} that exposes a Dagger job's
+ * configuration.
+ *
+ * <p>It wraps the parameters supplied at job submission and provides typed accessors (string,
+ * string array, integer, boolean, long) with optional defaults, so the rest of the codebase can
+ * read configuration keys without dealing with parsing. Being {@link Serializable}, the
+ * configuration can be captured by operators and shipped with the Flink job graph.
+ */
public class Configuration implements Serializable {
+ /** The underlying Flink parameter source backing all lookups. */
private final ParameterTool param;
+ /**
+ * Wraps the given Flink parameters.
+ *
+ * @param param the parameter source holding the job's configuration key/value pairs
+ */
public Configuration(ParameterTool param) {
this.param = param;
}
+ /**
+ * Returns the underlying Flink parameter source.
+ *
+ * @return the wrapped {@link ParameterTool}
+ */
public ParameterTool getParam() {
return param;
}
+ /**
+ * Returns the value of a configuration key as a string.
+ *
+ * @param configKey the configuration key to look up
+ * @return the configured value, or {@code null} if the key is absent
+ */
public String getString(String configKey) {
return param.get(configKey);
}
+ /**
+ * Returns the value of a configuration key as a string, or a default when absent.
+ *
+ * @param configKey the configuration key to look up
+ * @param defaultValue the value to return when the key is not present
+ * @return the configured value, or {@code defaultValue} if the key is absent
+ */
public String getString(String configKey, String defaultValue) {
return param.get(configKey, defaultValue);
}
+ /**
+ * Returns the value of a configuration key split into a trimmed string array.
+ *
+ * <p>The raw value is split on commas with each element trimmed. When the key is missing or its
+ * value is blank, the supplied default array is returned instead.
+ *
+ * @param configKey the configuration key to look up
+ * @param defaultValue the array to return when the key is absent or blank
+ * @return the parsed comma-separated values, or {@code defaultValue} if none are present
+ */
public String[] getStringArray(String configKey, String[] defaultValue) {
String value = param.get(configKey);
if (value == null || value.trim().isEmpty()) {
@@ -33,14 +76,35 @@ public String[] getStringArray(String configKey, String[] defaultValue) {
return Arrays.stream(value.split(",")).map(String::trim).toArray(String[]::new);
}
+ /**
+ * Returns the value of a configuration key as an integer, or a default when absent.
+ *
+ * @param configKey the configuration key to look up
+ * @param defaultValue the value to return when the key is not present
+ * @return the configured integer, or {@code defaultValue} if the key is absent
+ */
public Integer getInteger(String configKey, Integer defaultValue) {
return param.getInt(configKey, defaultValue);
}
+ /**
+ * Returns the value of a configuration key as a boolean, or a default when absent.
+ *
+ * @param configKey the configuration key to look up
+ * @param defaultValue the value to return when the key is not present
+ * @return the configured boolean, or {@code defaultValue} if the key is absent
+ */
public Boolean getBoolean(String configKey, Boolean defaultValue) {
return param.getBoolean(configKey, defaultValue);
}
+ /**
+ * Returns the value of a configuration key as a long, or a default when absent.
+ *
+ * @param configKey the configuration key to look up
+ * @param defaultValue the value to return when the key is not present
+ * @return the configured long, or {@code defaultValue} if the key is absent
+ */
public Long getLong(String configKey, Long defaultValue) {
return param.getLong(configKey, defaultValue);
}
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/core/Constants.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/core/Constants.java
index d1ab88b6f..ab3125e9c 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/core/Constants.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/core/Constants.java
@@ -1,33 +1,67 @@
package com.gotocompany.dagger.common.core;
+/**
+ * Centralized constants shared across Dagger's common module.
+ *
+ * <p>Holds the configuration keys (and their default values) for the Stencil schema-registry
+ * client, a handful of telemetry/UDF identifiers, stream input-schema configuration keys, and the
+ * internal column names Dagger appends to every deserialized {@link org.apache.flink.types.Row}.
+ * This is a constant holder and is not intended to be instantiated.
+ */
public class Constants {
+ /** Configuration key toggling use of the remote Stencil schema registry. */
public static final String SCHEMA_REGISTRY_STENCIL_ENABLE_KEY = "SCHEMA_REGISTRY_STENCIL_ENABLE";
+ /** Default for {@link #SCHEMA_REGISTRY_STENCIL_ENABLE_KEY}: remote Stencil disabled. */
public static final boolean SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT = false;
+ /** Configuration key holding the comma-separated Stencil registry URLs. */
public static final String SCHEMA_REGISTRY_STENCIL_URLS_KEY = "SCHEMA_REGISTRY_STENCIL_URLS";
+ /** Default for {@link #SCHEMA_REGISTRY_STENCIL_URLS_KEY}: no URLs configured. */
public static final String SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT = "";
+ /** Configuration key for the Stencil descriptor fetch timeout, in milliseconds. */
public static final String SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS = "SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS";
+ /** Default Stencil fetch timeout: {@code 10000} ms (10 seconds). */
public static final Integer SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT = 10000;
+ /** Configuration key for the comma-separated {@code name:value} HTTP headers sent to Stencil. */
public static final String SCHEMA_REGISTRY_STENCIL_FETCH_HEADERS_KEY = "SCHEMA_REGISTRY_STENCIL_FETCH_HEADERS";
+ /** Default for {@link #SCHEMA_REGISTRY_STENCIL_FETCH_HEADERS_KEY}: no headers. */
public static final String SCHEMA_REGISTRY_STENCIL_FETCH_HEADERS_DEFAULT = "";
+ /** Configuration key toggling automatic refresh of the Stencil descriptor cache. */
public static final String SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH_KEY = "SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH";
+ /** Default for {@link #SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH_KEY}: auto-refresh disabled. */
public static final boolean SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH_DEFAULT = false;
+ /** Configuration key for the Stencil descriptor cache time-to-live, in milliseconds. */
public static final String SCHEMA_REGISTRY_STENCIL_CACHE_TTL_MS_KEY = "SCHEMA_REGISTRY_STENCIL_CACHE_TTL_MS";
+ /** Default Stencil cache TTL: {@code 900000} ms (15 minutes). */
public static final Long SCHEMA_REGISTRY_STENCIL_CACHE_TTL_MS_DEFAULT = 900000L;
+ /** Configuration key selecting the Stencil schema refresh strategy. */
public static final String SCHEMA_REGISTRY_STENCIL_REFRESH_STRATEGY_KEY = "SCHEMA_REGISTRY_STENCIL_REFRESH_STRATEGY";
+ /** Default Stencil refresh strategy: {@code "LONG_POLLING"}. */
public static final String SCHEMA_REGISTRY_STENCIL_REFRESH_STRATEGY_DEFAULT = "LONG_POLLING";
+ /** Configuration key for the minimum back-off between Stencil fetch retries, in milliseconds. */
public static final String SCHEMA_REGISTRY_STENCIL_FETCH_BACKOFF_MIN_MS_KEY = "SCHEMA_REGISTRY_STENCIL_FETCH_BACKOFF_MIN_MS";
+ /** Default minimum Stencil fetch back-off: {@code 60000} ms (1 minute). */
public static final Long SCHEMA_REGISTRY_STENCIL_FETCH_BACKOFF_MIN_MS_DEFAULT = 60000L;
+ /** Configuration key for the number of times a failed Stencil fetch is retried. */
public static final String SCHEMA_REGISTRY_STENCIL_FETCH_RETRIES_KEY = "SCHEMA_REGISTRY_STENCIL_FETCH_RETRIES";
+ /** Default number of Stencil fetch retries: {@code 4}. */
public static final Integer SCHEMA_REGISTRY_STENCIL_FETCH_RETRIES_DEFAULT = 4;
+ /** Metric group key under which user-defined function (UDF) telemetry is reported. */
public static final String UDF_TELEMETRY_GROUP_KEY = "udf";
+ /** Aspect/field name used when publishing gauge metric values. */
public static final String GAUGE_ASPECT_NAME = "value";
+ /** Length of the sliding time window used when aggregating/reporting metrics. */
public static final long SLIDING_TIME_WINDOW = 10;
+ /** Stream configuration key naming the protobuf class for an input stream's schema. */
public static final String STREAM_INPUT_SCHEMA_PROTO_CLASS = "INPUT_SCHEMA_PROTO_CLASS";
+ /** Stream configuration key naming the table/alias for an input stream. */
public static final String STREAM_INPUT_SCHEMA_TABLE = "INPUT_SCHEMA_TABLE";
+ /** Configuration key holding the definition of the job's input streams. */
public static final String INPUT_STREAMS = "STREAMS";
+ /** Name of the internal boolean column Dagger appends to flag whether a record parsed successfully. */
public static final String INTERNAL_VALIDATION_FIELD_KEY = "__internal_validation_field__";
+ /** Default name of the event-time (rowtime) attribute column appended to every row. */
public static final String ROWTIME = "rowtime";
}
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/core/DaggerContext.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/core/DaggerContext.java
index 79c8eaa2b..af4350aba 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/core/DaggerContext.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/core/DaggerContext.java
@@ -13,10 +13,25 @@
* It initializes with StreamExecutionEnvironment, StreamTableEnvironment and Configuration.
*/
public class DaggerContext {
+ /**
+ * Logger used to record lifecycle events of the {@link DaggerContext} singleton.
+ */
private static final Logger LOGGER = LoggerFactory.getLogger(DaggerContext.class.getName());
+ /**
+ * The lazily-created, {@code volatile} singleton instance shared across the Dagger job.
+ */
private static volatile DaggerContext daggerContext = null;
+ /**
+ * The Flink {@link StreamExecutionEnvironment} that backs the streaming job.
+ */
private final StreamExecutionEnvironment executionEnvironment;
+ /**
+ * The Flink {@link StreamTableEnvironment} used to evaluate the Table/SQL pipeline.
+ */
private final StreamTableEnvironment tableEnvironment;
+ /**
+ * The user-supplied {@link Configuration} that parameterizes the Dagger job.
+ */
private final Configuration configuration;
/**
@@ -55,14 +70,29 @@ public static synchronized DaggerContext init(Configuration configuration) {
return daggerContext;
}
+ /**
+ * Returns the Flink {@link StreamExecutionEnvironment} held by this context.
+ *
+ * @return the stream execution environment
+ */
public StreamExecutionEnvironment getExecutionEnvironment() {
return executionEnvironment;
}
+ /**
+ * Returns the Flink {@link StreamTableEnvironment} held by this context.
+ *
+ * @return the stream table environment
+ */
public StreamTableEnvironment getTableEnvironment() {
return tableEnvironment;
}
+ /**
+ * Returns the {@link Configuration} that was used to initialize this context.
+ *
+ * @return the configuration
+ */
public Configuration getConfiguration() {
return configuration;
}
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/core/FieldDescriptorCache.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/core/FieldDescriptorCache.java
index 8b265dc87..c4b255617 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/core/FieldDescriptorCache.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/core/FieldDescriptorCache.java
@@ -8,15 +8,42 @@
import java.util.Map;
+/**
+ * Serializable cache of protobuf field positions, used to keep field indices stable when the
+ * Stencil schema is refreshed at runtime.
+ *
+ * <p>On construction it walks a protobuf {@link Descriptors.Descriptor} recursively (descending into
+ * nested {@code MESSAGE} fields) and records, for every field, its original declared index keyed by
+ * fully-qualified name, plus the field count (arity) of every message type keyed by fully-qualified
+ * name. The protobuf and Parquet deserializers consult this cache so that rows are built against the
+ * field layout captured at startup rather than against a possibly-reordered refreshed descriptor.
+ */
public class FieldDescriptorCache implements Serializable {
+ /** Maps each field's fully-qualified name to its original declared index within its message. */
private final Map fieldDescriptorIndexMap = new HashMap<>();
+ /** Maps each message type's fully-qualified name to its original field count (arity). */
private final Map protoDescriptorArityMap = new HashMap<>();
+ /**
+ * Builds a cache by recursively indexing the given descriptor and all nested message types.
+ *
+ * @param descriptor the root protobuf descriptor to index
+ */
public FieldDescriptorCache(Descriptors.Descriptor descriptor) {
cacheFieldDescriptorMap(descriptor);
}
+ /**
+ * Recursively records field indices and message arities for the given descriptor.
+ *
+ * <p>Returns early if this message type has already been cached, which both avoids repeated work
+ * and guards against recursive or self-referential schemas. For every field it stores the
+ * field's original index, and for nested {@code MESSAGE} fields it recurses into the referenced
+ * message type.
+ *
+ * @param descriptor the protobuf descriptor whose fields should be cached
+ */
public void cacheFieldDescriptorMap(Descriptors.Descriptor descriptor) {
if (protoDescriptorArityMap.containsKey(descriptor.getFullName())) {
@@ -37,6 +64,13 @@ public void cacheFieldDescriptorMap(Descriptors.Descriptor descriptor) {
}
}
+ /**
+ * Returns the cached original index of a protobuf field.
+ *
+ * @param fieldDescriptor the field whose original index is requested
+ * @return the field's original declared index captured when it was cached
+ * @throws IllegalArgumentException if the field is not present in the cache
+ */
public int getOriginalFieldIndex(Descriptors.FieldDescriptor fieldDescriptor) {
if (!fieldDescriptorIndexMap.containsKey(fieldDescriptor.getFullName())) {
throw new IllegalArgumentException("The Field Descriptor " + fieldDescriptor.getFullName() + " was not found in the cache");
@@ -44,11 +78,24 @@ public int getOriginalFieldIndex(Descriptors.FieldDescriptor fieldDescriptor) {
return fieldDescriptorIndexMap.get(fieldDescriptor.getFullName());
}
+ /**
+ * Indicates whether a field (by fully-qualified name) is present in the cache.
+ *
+ * @param fieldName the fully-qualified field name to look up
+ * @return {@code true} if the field has been cached, {@code false} otherwise
+ */
public boolean containsField(String fieldName) {
return fieldDescriptorIndexMap.containsKey(fieldName);
}
+ /**
+ * Returns the cached original field count (arity) of a protobuf message type.
+ *
+ * @param descriptor the message descriptor whose original field count is requested
+ * @return the number of fields the message declared when it was cached
+ * @throws IllegalArgumentException if the descriptor is not present in the cache
+ */
public int getOriginalFieldCount(Descriptors.Descriptor descriptor) {
if (!protoDescriptorArityMap.containsKey(descriptor.getFullName())) {
throw new IllegalArgumentException("The Proto Descriptor " + descriptor.getFullName() + " was not found in the cache");
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/core/StencilClientOrchestrator.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/core/StencilClientOrchestrator.java
index f182e66f1..cfcbe84eb 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/core/StencilClientOrchestrator.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/core/StencilClientOrchestrator.java
@@ -23,9 +23,21 @@
* The Stencil client orchestrator for dagger.
*/
public class StencilClientOrchestrator implements Serializable {
+ /**
+ * The process-wide {@link StencilClient} cached and shared across orchestrator instances.
+ */
private static StencilClient stencilClient;
+ /**
+ * Logger used to report invalid header configuration and other diagnostics.
+ */
private static final Logger LOGGER = LoggerFactory.getLogger(StencilClientOrchestrator.class);
+ /**
+ * The Dagger {@link Configuration} from which Stencil settings are read.
+ */
private Configuration configuration;
+ /**
+ * The de-duplicated set of Stencil (schema registry) URLs to fetch descriptors from.
+ */
private HashSet stencilUrls;
/**
@@ -38,6 +50,12 @@ public StencilClientOrchestrator(Configuration configuration) {
this.stencilUrls = getStencilUrls();
}
+ /**
+ * Builds a {@link StencilConfig} from the Dagger {@link Configuration}, wiring fetch headers,
+ * timeouts, cache behaviour, refresh strategy and retry/backoff settings.
+ *
+ * @return the assembled Stencil configuration
+ */
public StencilConfig createStencilConfig() {
return StencilConfig.builder()
.fetchHeaders(getHeaders(configuration))
@@ -50,6 +68,16 @@ public StencilConfig createStencilConfig() {
.build();
}
+ /**
+ * Resolves the {@link SchemaRefreshStrategy} to use for the Stencil cache.
+ *
+ * <p>Returns a version-based refresh strategy when {@code refreshStrategy} equals
+ * {@code "VERSION_BASED_REFRESH"} (case-insensitive); otherwise it falls back to the
+ * long-polling strategy, including when {@code refreshStrategy} is {@code null}.
+ *
+ * @param refreshStrategy the configured refresh-strategy name, may be {@code null}
+ * @return the matching schema refresh strategy
+ */
private SchemaRefreshStrategy getSchemaRefreshStrategy(String refreshStrategy) {
if (refreshStrategy == null) {
return SchemaRefreshStrategy.longPollingStrategy();
@@ -61,6 +89,12 @@ private SchemaRefreshStrategy getSchemaRefreshStrategy(String refreshStrategy) {
}
+ /**
+ * Reads the configured fetch-header string and parses it into HTTP headers.
+ *
+ * @param config the configuration to read the header string from
+ * @return the parsed list of {@code Header} objects; empty when none are configured
+ */
private List getHeaders(Configuration config) {
String headerString = config.getString(SCHEMA_REGISTRY_STENCIL_FETCH_HEADERS_KEY, SCHEMA_REGISTRY_STENCIL_FETCH_HEADERS_DEFAULT);
return parseHeaders(headerString);
@@ -97,6 +131,16 @@ public StencilClient enrichStencilClient(List additionalStencilUrls) {
return stencilClient;
}
+ /**
+ * Creates a {@link StencilClient} for the given URLs.
+ *
+ * <p>When remote Stencil is enabled in the configuration a registry-backed client is built
+ * from {@code urls} and the {@link StencilConfig}; otherwise a default in-classpath client
+ * is returned.
+ *
+ * @param urls the Stencil registry URLs to fetch descriptors from
+ * @return the initialized Stencil client
+ */
private StencilClient initStencilClient(List urls) {
StencilConfig stencilConfig = createStencilConfig();
boolean enableRemoteStencil = configuration.getBoolean(SCHEMA_REGISTRY_STENCIL_ENABLE_KEY, SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT);
@@ -105,6 +149,13 @@ private StencilClient initStencilClient(List urls) {
: StencilClientFactory.getClient();
}
+ /**
+ * Splits a comma-separated header string into individual, validated HTTP headers.
+ *
+ * @param headersString the raw {@code key:value} header pairs separated by commas,
+ * treated as empty when {@code null}
+ * @return the list of valid parsed headers; invalid entries are skipped
+ */
private List parseHeaders(String headersString) {
headersString = headersString == null ? "" : headersString;
return Arrays.stream(headersString.split(","))
@@ -114,6 +165,15 @@ private List parseHeaders(String headersString) {
.collect(Collectors.toList());
}
+ /**
+ * Checks whether a single header entry is well-formed, i.e. it contains exactly one
+ * non-empty key and one non-empty value separated by a colon.
+ *
+ * <p>A non-empty but malformed entry is logged and treated as invalid.
+ *
+ * @param headerString the trimmed {@code key:value} entry to validate
+ * @return {@code true} if the entry has a valid key and value, otherwise {@code false}
+ */
private Boolean isValidHeader(String headerString) {
Boolean isValid = Arrays.stream(headerString.split(":")).map(String::trim).filter(a -> !a.isEmpty()).count() == 2;
if (!isValid && !headerString.isEmpty()) {
@@ -122,11 +182,22 @@ private Boolean isValidHeader(String headerString) {
return isValid;
}
+ /**
+ * Converts a single {@code key:value} entry into a {@link BasicHeader}.
+ *
+ * @param headerString the entry to parse; expected to contain a colon separator
+ * @return the header built from the trimmed key and value
+ */
private BasicHeader parseHeader(String headerString) {
String[] split = headerString.split(":");
return new BasicHeader(split[0].trim(), split[1].trim());
}
+ /**
+ * Reads the configured Stencil URLs and collects them into a de-duplicated set.
+ *
+ * @return the set of trimmed, unique Stencil registry URLs
+ */
private HashSet getStencilUrls() {
stencilUrls = Arrays.stream(configuration.getString(SCHEMA_REGISTRY_STENCIL_URLS_KEY, SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT).split(","))
.map(String::trim)
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/core/StreamInfo.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/core/StreamInfo.java
index 7b87b9fb4..f55d717f1 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/core/StreamInfo.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/core/StreamInfo.java
@@ -7,7 +7,13 @@
* The class to hold the data stream and column names.
*/
public class StreamInfo {
+ /**
+ * The underlying Flink data stream of {@link Row} records.
+ */
private DataStream dataStream;
+ /**
+ * The column names describing the schema of each {@link Row} in the stream.
+ */
private String[] columnNames;
/**
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/core/Transformer.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/core/Transformer.java
index 5f31d33a5..fe9db85c7 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/core/Transformer.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/core/Transformer.java
@@ -4,5 +4,14 @@
* The interface for all the transformer.
*/
public interface Transformer {
+ /**
+ * Applies this transformation to the given {@link StreamInfo} and returns the result.
+ *
+ * <p>Implementations typically derive a new {@code DataStream} and/or column layout
+ * from the input and wrap them in the returned {@link StreamInfo}.
+ *
+ * @param streamInfo the input stream and its column metadata to transform
+ * @return the transformed stream information
+ */
StreamInfo transform(StreamInfo streamInfo);
}
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/exceptions/DescriptorNotFoundException.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/exceptions/DescriptorNotFoundException.java
index f249d2639..e483180ec 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/exceptions/DescriptorNotFoundException.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/exceptions/DescriptorNotFoundException.java
@@ -4,6 +4,7 @@
* The class Exception if Descriptor not found.
*/
public class DescriptorNotFoundException extends RuntimeException {
+ /** Default detail message used when no specific descriptor error message is supplied. */
public static final String DESCRIPTOR_NOT_FOUND = "descriptor not found";
/**
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/exceptions/serde/DaggerDeserializationException.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/exceptions/serde/DaggerDeserializationException.java
index 4baddb397..3be5eccaf 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/exceptions/serde/DaggerDeserializationException.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/exceptions/serde/DaggerDeserializationException.java
@@ -13,6 +13,11 @@ public DaggerDeserializationException(Exception innerException) {
super(innerException);
}
+ /**
+ * Instantiates a new Dagger deserialization exception with the specified detail message.
+ *
+ * @param message the detail message describing the deserialization failure
+ */
public DaggerDeserializationException(String message) {
super(message);
}
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/exceptions/serde/InvalidJSONSchemaException.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/exceptions/serde/InvalidJSONSchemaException.java
index a3def3374..e7750fbb1 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/exceptions/serde/InvalidJSONSchemaException.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/exceptions/serde/InvalidJSONSchemaException.java
@@ -1,6 +1,19 @@
package com.gotocompany.dagger.common.exceptions.serde;
+/**
+ * Unchecked exception thrown when a configured JSON schema cannot be parsed or is structurally
+ * invalid.
+ *
+ * <p>Dagger can derive types and field mappings from a JSON schema when consuming or producing
+ * JSON-encoded data. If that schema is malformed or cannot be interpreted, this exception wraps the
+ * underlying parsing failure so the job fails fast instead of proceeding with an unusable schema.
+ */
public class InvalidJSONSchemaException extends RuntimeException {
+ /**
+ * Creates a new exception that wraps the underlying cause of the schema parsing failure.
+ *
+ * @param innerException the exception thrown while reading or parsing the JSON schema
+ */
public InvalidJSONSchemaException(Exception innerException) {
super(innerException);
}
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/exceptions/serde/SimpleGroupParsingException.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/exceptions/serde/SimpleGroupParsingException.java
index e2b162605..cf2084c48 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/exceptions/serde/SimpleGroupParsingException.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/exceptions/serde/SimpleGroupParsingException.java
@@ -5,6 +5,11 @@
**/
public class SimpleGroupParsingException extends RuntimeException {
+ /**
+ * Instantiates a new Simple group parsing exception with the specified detail message.
+ *
+ * @param message the detail message describing why the field could not be parsed
+ */
public SimpleGroupParsingException(String message) {
super(message);
}
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/metrics/aspects/AspectType.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/metrics/aspects/AspectType.java
index 47b19499b..ccf09a197 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/metrics/aspects/AspectType.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/metrics/aspects/AspectType.java
@@ -4,8 +4,12 @@
* The enum Aspect type.
*/
public enum AspectType {
+ /** Aspect reported as a point-in-time gauge value. */
Gauge,
+ /** Aspect reported as a distribution of values via a histogram. */
Histogram,
+ /** Aspect reported as a metered rate of events. */
Metric,
+ /** Aspect reported as a monotonically increasing counter. */
Counter
}
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/metrics/aspects/Aspects.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/metrics/aspects/Aspects.java
index 6d66ef558..38a3fea74 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/metrics/aspects/Aspects.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/metrics/aspects/Aspects.java
@@ -4,7 +4,17 @@
* The interface for aspects.
*/
public interface Aspects {
+ /**
+ * Returns the metric name used when registering this aspect with a metric group.
+ *
+ * @return the metric name of the aspect
+ */
String getValue();
+ /**
+ * Returns the kind of metric this aspect represents.
+ *
+ * @return the {@link AspectType} of the aspect
+ */
AspectType getAspectType();
}
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/metrics/managers/CounterStatsManager.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/metrics/managers/CounterStatsManager.java
index 46f9dede1..006bb48ac 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/metrics/managers/CounterStatsManager.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/metrics/managers/CounterStatsManager.java
@@ -12,7 +12,9 @@
* The Counter stats manager.
*/
public class CounterStatsManager {
+ /** The Flink metric group under which counters are registered. */
private MetricGroup metricGroup;
+ /** Registered counters keyed by the aspect they measure. */
private Map counters = new HashMap<>();
/**
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/metrics/managers/GaugeStatsManager.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/metrics/managers/GaugeStatsManager.java
index ddcde1f91..2b3a08139 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/metrics/managers/GaugeStatsManager.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/metrics/managers/GaugeStatsManager.java
@@ -9,7 +9,9 @@
*/
public class GaugeStatsManager {
+ /** Whether gauge registration is enabled; when {@code false} all register calls are no-ops. */
private final Boolean enabled;
+ /** The Flink metric group under which gauges are registered. */
private final MetricGroup metricGroup;
/**
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/metrics/managers/MeterStatsManager.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/metrics/managers/MeterStatsManager.java
index 968267639..3986cb018 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/metrics/managers/MeterStatsManager.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/metrics/managers/MeterStatsManager.java
@@ -20,9 +20,13 @@
* The Meter stats manager.
*/
public class MeterStatsManager {
+ /** Histograms keyed by aspect, used to record value distributions over a sliding time window. */
private final HashMap histogramMap;
+ /** Whether metric registration and updates are enabled; when {@code false} calls are no-ops. */
private Boolean enabled;
+ /** Meters keyed by aspect, used to record event rates. */
private HashMap meterMap;
+ /** The Flink metric group under which histograms and meters are registered. */
private MetricGroup metricGroup;
/**
@@ -65,6 +69,12 @@ public void register(String groupName, Aspects[] aspects) {
}
}
+ /**
+ * Creates a Dropwizard-compatible histogram backed by a sliding time window reservoir.
+ *
+ * @return a new {@code com.codahale.metrics.Histogram} that retains samples for the
+ * configured {@code SLIDING_TIME_WINDOW} number of seconds
+ */
private com.codahale.metrics.Histogram getHistogram() {
return new com.codahale.metrics.Histogram(new SlidingTimeWindowReservoir(SLIDING_TIME_WINDOW, TimeUnit.SECONDS));
}
@@ -105,6 +115,16 @@ public void register(String groupKey, String groupValue, Aspects[] aspects) {
}
}
+ /**
+ * Registers the given aspects against the supplied metric group.
+ *
+ * <p>For each aspect, a Dropwizard-backed histogram is created when its
+ * {@link AspectType} is {@link AspectType#Histogram}, and a meter is created when it is
+ * {@link AspectType#Metric}; aspects of any other type are ignored.
+ *
+ * @param group the Flink metric group to register the metrics under
+ * @param aspects the aspects to register
+ */
private void register(MetricGroup group, Aspects[] aspects) {
for (Aspects aspect : aspects) {
if (AspectType.Histogram.equals(aspect.getAspectType())) {
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/DaggerDeserializer.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/DaggerDeserializer.java
index e24cf608e..395489997 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/DaggerDeserializer.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/DaggerDeserializer.java
@@ -4,6 +4,24 @@
import java.io.Serializable;
+/**
+ * Common contract implemented by every Dagger source deserializer.
+ *
+ * <p>A {@code DaggerDeserializer} turns raw bytes consumed from a source (for example a Kafka
+ * record value or a Parquet {@code SimpleGroup}) into a Flink {@link org.apache.flink.types.Row}
+ * that subsequently flows through the streaming job. Concrete implementations include the
+ * protobuf, JSON and Parquet deserializers, all of which carry this marker so they can be wired
+ * into Dagger sources interchangeably.
+ *
+ * <p>The interface deliberately combines two concerns required by Flink. It extends
+ * {@link Serializable} so the deserializer can be shipped to task managers as part of the
+ * serialized job graph, and it extends {@link ResultTypeQueryable} so Flink can statically query
+ * the {@code TypeInformation} of the records this deserializer produces (needed for the SQL
+ * planner and for state/serializer selection).
+ *
+ * @param <T> the element type emitted by the deserializer, typically
+ * {@link org.apache.flink.types.Row}
+ */
public interface DaggerDeserializer extends Serializable, ResultTypeQueryable {
}
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/DaggerInternalTypeInformation.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/DaggerInternalTypeInformation.java
index 2dd563df6..2c794bcde 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/DaggerInternalTypeInformation.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/DaggerInternalTypeInformation.java
@@ -10,9 +10,43 @@
import java.util.ArrayList;
import java.util.Arrays;
+/**
+ * Contract for source schema adapters that expose the Flink {@link Row} type produced by a Dagger
+ * input stream and that append Dagger's internal trailing columns to it.
+ *
+ * <p>Implementations such as {@code ProtoType} and {@code JsonType} translate an external schema
+ * (a protobuf descriptor or a JSON schema string) into a Flink {@code TypeInformation}. On top
+ * of the user-visible columns, every row emitted by a Dagger deserializer carries two extra
+ * trailing fields used internally by the pipeline; {@link #addInternalFields} centralizes how
+ * those columns are added so all input formats stay consistent.
+ */
public interface DaggerInternalTypeInformation {
+
+ /**
+ * Builds the complete Flink row type for this stream, including Dagger's internal trailing
+ * columns.
+ *
+ * @return the {@code TypeInformation} describing the {@link Row} emitted by the
+ * corresponding deserializer, with the validation flag and rowtime columns appended
+ */
TypeInformation getRowType();
+ /**
+ * Appends Dagger's two internal trailing columns to an existing row type.
+ *
+ * <p>The supplied {@code initialTypeInfo} (which is expected to be a {@link RowTypeInfo}
+ * describing the user-facing columns) is extended with two fields, in this exact order: a
+ * boolean validation flag stored under {@link Constants#INTERNAL_VALIDATION_FIELD_KEY} that
+ * records whether the source record was parsed successfully, followed by an event-time column
+ * named {@code rowtimeAttributeName} of type {@code SQL_TIMESTAMP} that Flink uses as the
+ * rowtime attribute for watermarking and windowing.
+ *
+ * @param initialTypeInfo the row type describing the user-visible columns; cast to
+ * {@link RowTypeInfo} to read its field names and types
+ * @param rowtimeAttributeName the name to assign to the appended event-time (rowtime) column
+ * @return a new named {@code TypeInformation} containing the original fields followed by
+ * the boolean validation flag and the rowtime timestamp column
+ */
default TypeInformation addInternalFields(TypeInformation initialTypeInfo, String rowtimeAttributeName) {
RowTypeInfo rowTypeInfo = (RowTypeInfo) initialTypeInfo;
ArrayList fieldNames = new ArrayList<>(Arrays.asList(rowTypeInfo.getFieldNames()));
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/DataTypes.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/DataTypes.java
index 6be1bf8e2..6f80edd73 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/DataTypes.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/DataTypes.java
@@ -1,6 +1,15 @@
package com.gotocompany.dagger.common.serde;
+/**
+ * Enumerates the wire formats Dagger can deserialize records from on an input stream.
+ *
+ * <p>The selected value is typically derived from stream configuration and used to choose the
+ * matching {@code DaggerDeserializer} implementation (a JSON-backed deserializer versus a
+ * protobuf-backed one) when a Dagger source is constructed.
+ */
public enum DataTypes {
+ /** Records encoded as JSON documents, deserialized against a configured JSON schema. */
JSON,
+ /** Records encoded as Protocol Buffers messages, deserialized via a Stencil descriptor. */
PROTO
}
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/json/deserialization/JsonDeserializer.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/json/deserialization/JsonDeserializer.java
index 9bce511b6..6c8132699 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/json/deserialization/JsonDeserializer.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/json/deserialization/JsonDeserializer.java
@@ -17,11 +17,35 @@
import static com.gotocompany.dagger.common.core.Constants.ROWTIME;
+/**
+ * Kafka deserialization schema that converts JSON-encoded record values into Flink {@link Row}
+ * instances for a Dagger input stream.
+ *
+ * <p>Parsing is delegated to Flink's {@link JsonRowDeserializationSchema}, which is built from the
+ * row type produced by {@link JsonType} (the user-visible columns plus Dagger's internal trailing
+ * columns). After parsing, each row is post-processed so that the trailing validation flag is set
+ * and the rowtime column is populated with a {@link Timestamp} derived from the configured source
+ * field. Any parsing or conversion failure is surfaced as a {@link DaggerDeserializationException}
+ * so that Dagger sources can handle errors uniformly.
+ */
public class JsonDeserializer implements KafkaDeserializationSchema, DaggerDeserializer {
+ /** Backing Flink schema that parses JSON bytes into a {@link Row} matching {@link #typeInformation}. */
private final JsonRowDeserializationSchema jsonRowDeserializationSchema;
+ /** Index, within the produced row, of the field whose value supplies the event-time (rowtime). */
private final int rowtimeIdx;
+ /** Full produced {@link Row} type, including Dagger's internal validation and rowtime columns. */
private final TypeInformation typeInformation;
+ /**
+ * Instantiates a new JSON deserializer for a Dagger source.
+ *
+ * <p>Builds the produced row type from the JSON schema (appending Dagger's internal columns
+ * under the {@code ROWTIME} attribute name), constructs the backing Flink JSON schema from that
+ * type, and resolves the index of the field that carries the event time.
+ *
+ * @param jsonSchema the JSON schema string describing the stream's records
+ * @param rowtimeFieldName the name of the field whose value is used as the event-time (rowtime)
+ */
public JsonDeserializer(String jsonSchema, String rowtimeFieldName) {
this.typeInformation = new JsonType(jsonSchema, ROWTIME).getRowType();
this.jsonRowDeserializationSchema = new JsonRowDeserializationSchema.Builder(typeInformation).build();
@@ -29,11 +53,32 @@ public JsonDeserializer(String jsonSchema, String rowtimeFieldName) {
this.rowtimeIdx = rowTypeInfo.getFieldIndex(rowtimeFieldName);
}
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Dagger streams are unbounded, so this always returns {@code false}; no element is ever
+ * treated as an end-of-stream marker.
+ *
+ * @param nextElement the most recently deserialized row (ignored)
+ * @return {@code false} always
+ */
@Override
public boolean isEndOfStream(Row nextElement) {
return false;
}
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Parses the Kafka record's value bytes as JSON into a {@link Row} using the backing
+ * {@link JsonRowDeserializationSchema}, then populates Dagger's internal validation and rowtime
+ * columns via {@link #addTimestampFieldToRow(Row)}.
+ *
+ * @param consumerRecord the Kafka record whose value holds the JSON payload
+ * @return the deserialized row with its validation flag set and rowtime column populated
+ * @throws DaggerDeserializationException if the payload cannot be parsed, or the rowtime field
+ * carries an unsupported type
+ */
@Override
public Row deserialize(ConsumerRecord consumerRecord) {
try {
@@ -44,11 +89,33 @@ public Row deserialize(ConsumerRecord consumerRecord) {
}
}
+ /**
+ * {@inheritDoc}
+ *
+ * @return the {@link Row} {@code TypeInformation} produced by the backing JSON schema, that is
+ * the user columns followed by Dagger's internal trailing columns
+ */
@Override
public TypeInformation getProducedType() {
return jsonRowDeserializationSchema.getProducedType();
}
+ /**
+ * Copies the user-visible columns of a freshly parsed row and fills in Dagger's internal
+ * trailing columns.
+ *
+ * <p>All fields except the final two are copied verbatim into a new {@link Row} of the same
+ * arity. The event-time value is read from {@link #rowtimeIdx}: a {@link BigDecimal} is
+ * interpreted as epoch seconds and converted with {@link Instant#ofEpochSecond(long)}, while an
+ * existing {@link Timestamp} is used as-is. The last column is then set to that timestamp and
+ * the second-to-last column (the validation flag) is set to {@code true}.
+ *
+ * @param row the row produced by the backing JSON schema
+ * @return a new row holding the user columns plus the populated validation flag and rowtime
+ * timestamp
+ * @throws DaggerDeserializationException if the rowtime field is neither a {@link BigDecimal}
+ * nor a {@link Timestamp}
+ */
private Row addTimestampFieldToRow(Row row) {
Row finalRecord = new Row(row.getArity());
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/json/deserialization/JsonType.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/json/deserialization/JsonType.java
index 53aea049b..f1d9955fc 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/json/deserialization/JsonType.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/json/deserialization/JsonType.java
@@ -8,15 +8,43 @@
import java.io.Serializable;
+/**
+ * Derives the Flink {@link Row} {@code TypeInformation} for a JSON-encoded input stream from its
+ * JSON schema.
+ *
+ * <p>The configured JSON schema string is converted into a named row type using Flink's
+ * {@link JsonRowSchemaConverter}, after which Dagger's internal trailing columns (the boolean
+ * validation flag and the rowtime timestamp) are appended through {@link #addInternalFields}.
+ * Instances are {@link Serializable} so the schema description can travel with the serialized job
+ * graph; the resulting type is consumed when constructing the JSON deserializer for a Kafka source.
+ */
public class JsonType implements Serializable, DaggerInternalTypeInformation {
+ /** JSON schema string (JSON-Schema syntax) describing the user-visible columns of the stream. */
private String jsonSchema;
+ /** Name to assign to the appended event-time (rowtime) column. */
private String rowtimeAttributeName;
+ /**
+ * Instantiates a new JSON type descriptor for a stream.
+ *
+ * @param jsonSchema the JSON schema string describing the stream's records
+ * @param rowtimeAttributeName the name of the event-time (rowtime) column to append
+ */
public JsonType(String jsonSchema, String rowtimeAttributeName) {
this.jsonSchema = jsonSchema;
this.rowtimeAttributeName = rowtimeAttributeName;
}
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Converts the configured JSON schema into a named row type via
+ * {@link JsonRowSchemaConverter#convert(String)} and then appends Dagger's internal validation
+ * and rowtime columns.
+ *
+ * @return the {@code TypeInformation} for the JSON stream, including the internal trailing
+ * columns
+ */
public TypeInformation getRowType() {
TypeInformation rowNamed = JsonRowSchemaConverter.convert(jsonSchema);
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/parquet/SimpleGroupValidation.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/parquet/SimpleGroupValidation.java
index 07d5e18d4..0cbf36dcf 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/parquet/SimpleGroupValidation.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/parquet/SimpleGroupValidation.java
@@ -9,7 +9,26 @@
import static org.apache.parquet.schema.Type.Repetition.REPEATED;
import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+/**
+ * Static helpers that inspect the schema of a Parquet {@link SimpleGroup} to determine how its
+ * fields are encoded.
+ *
+ * <p>Dagger reads Parquet records as {@link SimpleGroup} instances and converts them into Flink
+ * rows. Because map-typed fields may be written either with the legacy two-field repeated-group
+ * layout or with the standard Parquet {@code MAP} logical-type layout, these utilities let the
+ * deserializer detect which encoding is present (and whether a field is present and populated at
+ * all) before attempting to read values. The class exposes only static methods and is not intended
+ * to be instantiated.
+ */
public class SimpleGroupValidation {
+ /**
+ * Checks that a field is declared in the group's schema and actually carries at least one value.
+ *
+ * @param simpleGroup the Parquet group to inspect
+ * @param fieldName the name of the field to look for
+ * @return {@code true} if the schema contains {@code fieldName} and its repetition count is
+ * non-zero, {@code false} otherwise
+ */
public static boolean checkFieldExistsAndIsInitialized(SimpleGroup simpleGroup, String fieldName) {
return simpleGroup.getType().containsField(fieldName) && simpleGroup.getFieldRepetitionCount(fieldName) != 0;
}
@@ -66,6 +85,18 @@ public static boolean checkIsStandardSimpleGroupMap(SimpleGroup simpleGroup, Str
&& applyNestedKeyValueFieldValidations(simpleGroup, fieldName);
}
+ /**
+ * Validates the outer group of a candidate standard Parquet map field.
+ *
+ * <p>For the field to qualify it must be a {@link GroupType} whose repetition is {@code OPTIONAL}
+ * or {@code REQUIRED}, whose logical type annotation is {@link LogicalTypeAnnotation#mapType()},
+ * and which has exactly one child (the nested {@code key_value} group).
+ *
+ * @param simpleGroup the Parquet group containing the candidate map field
+ * @param fieldName the name of the candidate map field
+ * @return {@code true} if the outer map group matches the standard specification, {@code false}
+ * otherwise (including when the field is not a group)
+ */
private static boolean applyMapFieldValidations(SimpleGroup simpleGroup, String fieldName) {
Type mapType = simpleGroup.getType().getType(fieldName);
if (mapType instanceof GroupType) {
@@ -78,6 +109,18 @@ private static boolean applyMapFieldValidations(SimpleGroup simpleGroup, String
return false;
}
+ /**
+ * Validates the nested {@code key_value} group of a candidate standard Parquet map field.
+ *
+ * <p>Assumes the outer field is already known to be a group. The nested {@code key_value} child
+ * must exist, be a {@code REPEATED} {@link GroupType}, contain a {@code key} field, and that
+ * {@code key} must itself be {@code REQUIRED}, matching the Apache Parquet map specification.
+ *
+ * @param simpleGroup the Parquet group containing the map field
+ * @param fieldName the name of the map field
+ * @return {@code true} if the nested key/value structure matches the specification,
+ * {@code false} otherwise
+ */
private static boolean applyNestedKeyValueFieldValidations(SimpleGroup simpleGroup, String fieldName) {
GroupType mapGroupType = simpleGroup.getType().getType(fieldName).asGroupType();
if (mapGroupType.containsField("key_value")) {
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/parquet/deserialization/SimpleGroupDeserializer.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/parquet/deserialization/SimpleGroupDeserializer.java
index f9a0e49e9..51e877281 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/parquet/deserialization/SimpleGroupDeserializer.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/parquet/deserialization/SimpleGroupDeserializer.java
@@ -15,12 +15,38 @@
import java.sql.Timestamp;
import java.time.Instant;
+/**
+ * Deserializes Parquet records (read as {@link SimpleGroup} instances) into Flink {@link Row}
+ * objects for a Dagger Parquet source.
+ *
+ * <p>Dagger's Parquet files are written from protobuf schemas, so the row layout is derived from a
+ * protobuf descriptor resolved by class name through the {@link StencilClientOrchestrator}. Each
+ * {@link SimpleGroup} is mapped column-by-column via {@link RowFactory}, leaving two trailing slots
+ * for Dagger's internal columns, which are then filled with a boolean validation flag and an
+ * event-time (rowtime) {@link Timestamp} extracted from the configured timestamp field.
+ */
public class SimpleGroupDeserializer implements DaggerDeserializer {
+ /** Fully-qualified protobuf class name whose descriptor defines the Parquet record schema. */
private final String protoClassName;
+ /** Proto field number of the timestamp field used to derive the event-time (rowtime). */
private final int timestampFieldIndex;
+ /** Resolves protobuf descriptors from the Stencil schema registry. */
private final StencilClientOrchestrator stencilClientOrchestrator;
+ /** Produced {@link Row} type, including Dagger's internal validation and rowtime columns. */
private final TypeInformation typeInformation;
+ /**
+ * Instantiates a new Parquet {@link SimpleGroup} deserializer.
+ *
+ * <p>Eagerly builds the produced row type from the protobuf schema via {@link ProtoType} so
+ * Flink can query it; descriptor resolution for actual deserialization happens lazily when
+ * {@link #deserialize(SimpleGroup)} is invoked.
+ *
+ * @param protoClassName the fully-qualified protobuf class name describing the schema
+ * @param timestampFieldIndex the proto field number of the timestamp field used for rowtime
+ * @param rowtimeAttributeName the name to assign to the appended event-time (rowtime) column
+ * @param stencilClientOrchestrator the orchestrator used to resolve protobuf descriptors
+ */
public SimpleGroupDeserializer(String protoClassName, int timestampFieldIndex, String rowtimeAttributeName, StencilClientOrchestrator stencilClientOrchestrator) {
this.protoClassName = protoClassName;
this.timestampFieldIndex = timestampFieldIndex;
@@ -28,6 +54,13 @@ public SimpleGroupDeserializer(String protoClassName, int timestampFieldIndex, S
this.typeInformation = new ProtoType(protoClassName, rowtimeAttributeName, stencilClientOrchestrator).getRowType();
}
+ /**
+ * Resolves the protobuf descriptor for {@link #protoClassName} from the Stencil registry.
+ *
+ * @return the descriptor describing the Parquet record schema
+ * @throws DescriptorNotFoundException if no descriptor is registered for the configured class
+ * name
+ */
private Descriptors.Descriptor getProtoParser() {
Descriptors.Descriptor dsc = stencilClientOrchestrator.getStencilClient().get(protoClassName);
if (dsc == null) {
@@ -36,6 +69,17 @@ private Descriptors.Descriptor getProtoParser() {
return dsc;
}
+ /**
+ * Converts a single Parquet {@link SimpleGroup} into a Flink {@link Row}.
+ *
+ * <p>The row is built from the resolved descriptor with two extra trailing slots reserved for
+ * Dagger's internal columns, which are subsequently populated by
+ * {@link #addTimestampFieldToRow(Row, SimpleGroup, Descriptors.Descriptor)}.
+ *
+ * @param simpleGroup the Parquet record to convert
+ * @return the resulting row with its validation flag and rowtime timestamp populated
+ * @throws DaggerDeserializationException if conversion of the record fails
+ */
public Row deserialize(SimpleGroup simpleGroup) {
Descriptors.Descriptor descriptor = getProtoParser();
try {
@@ -46,6 +90,19 @@ public Row deserialize(SimpleGroup simpleGroup) {
}
}
+ /**
+ * Populates Dagger's internal trailing columns on an already-mapped Parquet row.
+ *
+ * <p>Reads the configured timestamp field (located by {@link #timestampFieldIndex}) using a
+ * {@link TimestampHandler}, which yields a two-element row of epoch seconds and nanoseconds. The
+ * second-to-last column (the validation flag) is set to {@code true} and the last column is set
+ * to the corresponding {@link Timestamp} built via {@link Instant#ofEpochSecond(long, long)}.
+ *
+ * @param row the row already populated with the user-visible columns
+ * @param simpleGroup the source Parquet record holding the timestamp field
+ * @param descriptor the protobuf descriptor used to locate the timestamp field by number
+ * @return the same row instance, with its validation flag and rowtime column populated
+ */
private Row addTimestampFieldToRow(Row row, SimpleGroup simpleGroup, Descriptors.Descriptor descriptor) {
Descriptors.FieldDescriptor fieldDescriptor = descriptor.findFieldByNumber(timestampFieldIndex);
TimestampHandler timestampHandler = new TimestampHandler(fieldDescriptor);
@@ -58,6 +115,12 @@ private Row addTimestampFieldToRow(Row row, SimpleGroup simpleGroup, Descriptors
return row;
}
+ /**
+ * {@inheritDoc}
+ *
+ * @return the produced {@link Row} {@code TypeInformation}, including Dagger's internal trailing
+ * columns
+ */
@Override
public TypeInformation getProducedType() {
return this.typeInformation;
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializer.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializer.java
index 6da14e54c..5a56507a2 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializer.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoDeserializer.java
@@ -25,12 +25,33 @@
*/
public class ProtoDeserializer implements KafkaDeserializationSchema, DaggerDeserializer {
+ /**
+ * Logger used to warn about null payloads and invalid protobuf records.
+ */
private static final Logger LOGGER = LoggerFactory.getLogger(ProtoDeserializer.class);
+ /**
+ * The fully-qualified protobuf class name used to resolve the message descriptor.
+ */
private final String protoClassName;
+ /**
+ * The field number of the protobuf timestamp field appended as the rowtime column.
+ */
private final int timestampFieldIndex;
+ /**
+ * The orchestrator used to obtain the Stencil client that resolves proto descriptors.
+ */
private final StencilClientOrchestrator stencilClientOrchestrator;
+ /**
+ * The Flink {@link TypeInformation} describing the {@link Row} produced by this deserializer.
+ */
private final TypeInformation typeInformation;
+ /**
+ * Cache of field descriptors used when building rows (NOTE(review): confirm whether it is consulted when auto-refresh is on or off).
+ */
private final FieldDescriptorCache fieldDescriptorCache;
+ /**
+ * Whether the Stencil cache auto-refresh is enabled; selects which row-building path is taken.
+ */
private final boolean stencilAutoRefreshEnable;
/**
@@ -50,11 +71,33 @@ public ProtoDeserializer(String protoClassName, int timestampFieldIndex, String
this.stencilAutoRefreshEnable = stencilClientOrchestrator.createStencilConfig().getCacheAutoRefresh();
}
+ /**
+ * {@inheritDoc}
+ *
+ * <p>This stream is unbounded, so the implementation always reports that the end of stream
+ * has not been reached.
+ *
+ * @param nextElement the most recently deserialized row
+ * @return {@code false} always, since the Kafka source is treated as never-ending
+ */
@Override
public boolean isEndOfStream(Row nextElement) {
return false;
}
+ /**
+ * Deserializes a Kafka record into a Flink {@link Row}.
+ *
+ * <p>A {@code null} payload, or a record that fails protobuf parsing, yields a default
+ * "invalid" row (flagged as invalid with a zero timestamp) rather than failing the job;
+ * a successfully parsed message is converted and augmented with its rowtime timestamp.
+ *
+ * @param consumerRecord the Kafka record whose key and value byte arrays are read
+ * @return the deserialized row, or a default invalid row when the value is {@code null}
+ * or cannot be parsed as the expected protobuf message
+ * @throws DescriptorNotFoundException if the proto descriptor cannot be resolved
+ * @throws DaggerDeserializationException if an unexpected runtime error occurs while parsing
+ */
@Override
public Row deserialize(ConsumerRecord consumerRecord) {
Descriptors.Descriptor descriptor = getProtoParser();
@@ -76,11 +119,23 @@ public Row deserialize(ConsumerRecord consumerRecord) {
}
}
+ /**
+ * {@inheritDoc}
+ *
+ * @return the {@link TypeInformation} of the {@link Row} this deserializer produces
+ */
@Override
public TypeInformation getProducedType() {
return this.typeInformation;
}
+ /**
+ * Resolves the protobuf message {@link Descriptors.Descriptor} for {@code protoClassName}
+ * from the Stencil client.
+ *
+ * @return the descriptor for the configured proto class
+ * @throws DescriptorNotFoundException if no descriptor is registered for {@code protoClassName}
+ */
private Descriptors.Descriptor getProtoParser() {
Descriptors.Descriptor dsc = stencilClientOrchestrator.getStencilClient().get(protoClassName);
if (dsc == null) {
@@ -89,6 +144,15 @@ private Descriptors.Descriptor getProtoParser() {
return dsc;
}
+ /**
+ * Builds a placeholder {@link Row} for records that cannot be deserialized.
+ *
+ * <p>The row is created from the proto default instance with two extra trailing columns,
+ * the validity flag set to {@code false} and the rowtime set to epoch zero.
+ *
+ * @param defaultInstance the default protobuf message instance used to shape the row
+ * @return a row flagged as invalid with a zero timestamp
+ */
private Row createDefaultInvalidRow(DynamicMessage defaultInstance) {
Row row;
if (stencilAutoRefreshEnable) {
@@ -101,6 +165,15 @@ private Row createDefaultInvalidRow(DynamicMessage defaultInstance) {
return row;
}
+ /**
+ * Converts a parsed protobuf message into a {@link Row} and appends rowtime metadata.
+ *
+ * <p>Two trailing columns are added: a validity flag set to {@code true} and a
+ * {@link Timestamp} derived from the seconds and nanos of the configured timestamp field.
+ *
+ * @param proto the successfully parsed protobuf message
+ * @return the row representation including the validity flag and event-time timestamp
+ */
private Row addTimestampFieldToRow(DynamicMessage proto) {
Row finalRecord;
if (stencilAutoRefreshEnable) {
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoType.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoType.java
index 3e154a16e..14dadc439 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoType.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/proto/deserialization/ProtoType.java
@@ -17,9 +17,22 @@
*/
public class ProtoType implements Serializable, DaggerInternalTypeInformation {
+ /**
+ * The cached, lazily-resolved protobuf {@link Descriptor} for {@code protoClassName};
+ * marked {@code transient} because descriptors are not serializable.
+ */
private transient Descriptor protoFieldDescriptor;
+ /**
+ * The fully-qualified protobuf class name whose schema drives the row type.
+ */
private String protoClassName;
+ /**
+ * The name of the attribute that carries the Flink rowtime (event-time) field.
+ */
private String rowtimeAttributeName;
+ /**
+ * The orchestrator used to obtain the Stencil client that resolves proto descriptors.
+ */
private StencilClientOrchestrator stencilClientOrchestrator;
/**
@@ -45,6 +58,11 @@ public TypeInformation getRowType() {
return addInternalFields(rowNamed, rowtimeAttributeName);
}
+ /**
+ * Returns the protobuf {@link Descriptor}, resolving and caching it on first access.
+ *
+ * @return the proto field descriptor for {@code protoClassName}
+ */
private Descriptor getProtoFieldDescriptor() {
if (protoFieldDescriptor == null) {
protoFieldDescriptor = createFieldDescriptor();
@@ -52,6 +70,12 @@ private Descriptor getProtoFieldDescriptor() {
return protoFieldDescriptor;
}
+ /**
+ * Resolves the protobuf {@link Descriptor} for {@code protoClassName} via the Stencil client.
+ *
+ * @return the resolved descriptor
+ * @throws DescriptorNotFoundException if no descriptor is registered for {@code protoClassName}
+ */
private Descriptor createFieldDescriptor() {
Descriptors.Descriptor dsc = stencilClientOrchestrator.getStencilClient().get(protoClassName);
if (dsc == null) {
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/proto/serialization/KafkaProtoSerializer.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/proto/serialization/KafkaProtoSerializer.java
index 3716b8daa..d698a83b0 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/proto/serialization/KafkaProtoSerializer.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/proto/serialization/KafkaProtoSerializer.java
@@ -11,25 +11,76 @@
import java.util.Objects;
+/**
+ * Kafka sink serialization schema that turns Flink {@link Row} records into protobuf-encoded
+ * {@link ProducerRecord}s for a Dagger Kafka sink.
+ *
+ * <p>The actual row-to-protobuf encoding is delegated to a {@link ProtoSerializer}, which produces
+ * the key and value byte arrays; this class wraps those bytes into a {@link ProducerRecord} aimed
+ * at the configured output topic. It implements Flink's {@code KafkaRecordSerializationSchema} so it
+ * can be plugged directly into a Flink {@code KafkaSink}.
+ */
public class KafkaProtoSerializer implements KafkaRecordSerializationSchema {
+ /** Kafka topic the serialized records are published to; must be non-empty when serializing. */
private final String outputTopic;
+ /** Delegate that encodes a {@link Row} into protobuf key and value byte arrays. */
private final ProtoSerializer protoSerializer;
+ /** Logger (named {@code "KafkaSink"}) used to trace rows being written to Kafka. */
private static final Logger LOGGER = LoggerFactory.getLogger("KafkaSink");
+ /**
+ * Creates a serializer with an empty output topic, delegating to
+ * {@link #KafkaProtoSerializer(ProtoSerializer, String)}.
+ *
+ * <p>An output topic must be configured before records are written, because
+ * {@link #serialize(Row, KafkaSinkContext, Long)} rejects an empty topic with a
+ * {@link DaggerSerializationException}.
+ *
+ * @param protoSerializer the delegate that encodes rows into protobuf key/value bytes
+ */
public KafkaProtoSerializer(ProtoSerializer protoSerializer) {
this(protoSerializer, "");
}
+ /**
+ * Creates a serializer targeting a specific Kafka topic.
+ *
+ * @param protoSerializer the delegate that encodes rows into protobuf key/value bytes
+ * @param outputTopic the Kafka topic to publish serialized records to
+ */
public KafkaProtoSerializer(ProtoSerializer protoSerializer, String outputTopic) {
this.protoSerializer = protoSerializer;
this.outputTopic = outputTopic;
}
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Delegates to the default {@link KafkaRecordSerializationSchema} initialization; this
+ * serializer holds no additional state that needs setting up.
+ *
+ * @param context the serialization initialization context
+ * @param sinkContext the Kafka sink context
+ * @throws Exception if the default initialization fails
+ */
@Override
public void open(InitializationContext context, KafkaSinkContext sinkContext) throws Exception {
KafkaRecordSerializationSchema.super.open(context, sinkContext);
}
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Encodes the given row into protobuf key and value byte arrays via the delegate
+ * {@link ProtoSerializer} and wraps them in a {@link ProducerRecord} for the configured output
+ * topic. The row being written is logged at info level.
+ *
+ * @param row the Flink row to serialize
+ * @param context the Kafka sink context (unused)
+ * @param timestamp the event timestamp supplied by Flink (unused)
+ * @return a {@link ProducerRecord} carrying the protobuf key and value for the output topic
+ * @throws DaggerSerializationException if no output topic has been configured
+ */
@Override
public ProducerRecord serialize(Row row, KafkaSinkContext context, Long timestamp) {
if (Objects.isNull(outputTopic) || outputTopic.equals("")) {
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/proto/serialization/ProtoSerializer.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/proto/serialization/ProtoSerializer.java
index 1ddcf5e77..9d61aa00e 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/proto/serialization/ProtoSerializer.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/proto/serialization/ProtoSerializer.java
@@ -14,13 +14,42 @@
import java.util.Arrays;
import java.util.Objects;
+/**
+ * Serializes Flink {@link Row} records into protobuf byte arrays for Kafka keys and values.
+ *
+ * <p>Column names supplied at construction map row fields to protobuf fields (supporting nested
+ * fields via dot-separated names), and a {@link StencilClientOrchestrator} resolves the key and
+ * message descriptors from the schema registry.
+ */
public class ProtoSerializer implements Serializable {
+ /**
+ * The fully-qualified protobuf class name used to serialize the Kafka record key;
+ * may be {@code null} or empty when no key is produced.
+ */
private final String keyProtoClassName;
+ /**
+ * The row column names, positionally aligned with the fields of each {@link Row}.
+ */
private final String[] columnNames;
+ /**
+ * The orchestrator used to resolve protobuf descriptors from the Stencil registry.
+ */
private final StencilClientOrchestrator stencilClientOrchestrator;
+ /**
+ * The fully-qualified protobuf class name used to serialize the Kafka record value.
+ */
private final String messageProtoClassName;
+ /**
+ * Instantiates a new proto serializer.
+ *
+ * @param keyProtoClassName the protobuf class name for the record key, may be {@code null} or empty
+ * @param messageProtoClassName the protobuf class name for the record value; required and non-empty
+ * @param columnNames the column names mapping row fields to protobuf fields
+ * @param stencilClientOrchestrator the orchestrator used to resolve proto descriptors
+ * @throws DaggerSerializationException if {@code messageProtoClassName} is {@code null} or empty
+ */
public ProtoSerializer(String keyProtoClassName, String messageProtoClassName, String[] columnNames, StencilClientOrchestrator stencilClientOrchestrator) {
this.keyProtoClassName = keyProtoClassName;
this.columnNames = columnNames;
@@ -29,6 +58,11 @@ public ProtoSerializer(String keyProtoClassName, String messageProtoClassName, S
checkValidity();
}
+ /**
+ * Validates that a non-empty message proto class name was supplied.
+ *
+ * @throws DaggerSerializationException if {@code messageProtoClassName} is {@code null} or empty
+ */
private void checkValidity() {
if (Objects.isNull(messageProtoClassName) || messageProtoClassName.isEmpty()) {
throw new DaggerSerializationException("messageProtoClassName is required");
@@ -46,10 +80,26 @@ public byte[] serializeKey(Row row) {
: parse(row, getDescriptor(keyProtoClassName)).toByteArray();
}
+ /**
+ * Serializes the value portion of a row into protobuf bytes.
+ *
+ * @param row the row to serialize using {@code messageProtoClassName}
+ * @return the serialized protobuf message as a byte array
+ */
public byte[] serializeValue(Row row) {
return parse(row, getDescriptor(messageProtoClassName)).toByteArray();
}
+ /**
+ * Builds a {@link DynamicMessage} from a row using the given descriptor.
+ *
+ * <p>Each column is mapped onto the corresponding protobuf field; dot-separated column
+ * names are routed to nested message builders, while unknown top-level fields are skipped.
+ *
+ * @param element the row whose fields are written into the message
+ * @param descriptor the descriptor of the protobuf message being built
+ * @return the populated protobuf message
+ */
private DynamicMessage parse(Row element, Descriptors.Descriptor descriptor) {
int numberOfElements = element.getArity();
DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
@@ -71,6 +121,13 @@ private DynamicMessage parse(Row element, Descriptors.Descriptor descriptor) {
return builder.build();
}
+ /**
+ * Resolves the protobuf {@link Descriptors.Descriptor} for the given class name via Stencil.
+ *
+ * @param className the fully-qualified protobuf class name to resolve
+ * @return the descriptor for {@code className}
+ * @throws DescriptorNotFoundException if no descriptor is registered for {@code className}
+ */
private Descriptors.Descriptor getDescriptor(String className) {
Descriptors.Descriptor dsc = stencilClientOrchestrator.getStencilClient().get(className);
if (dsc == null) {
@@ -79,6 +136,20 @@ private Descriptors.Descriptor getDescriptor(String className) {
return dsc;
}
+ /**
+ * Recursively populates a nested protobuf field addressed by a dot-separated column path.
+ *
+ * <p>The first element of {@code nestedColumnNames} selects a child field on
+ * {@code parentDescriptor}; the method descends into the child message builder until the
+ * leaf field is reached, then writes {@code data} into it.
+ *
+ * @param parentDescriptor the descriptor of the message currently being populated
+ * @param nestedColumnNames the remaining path segments identifying the target field
+ * @param parentBuilder the builder of the message currently being populated
+ * @param data the value to set on the leaf field
+ * @return the parent builder with the nested field populated
+ * @throws InvalidColumnMappingException if a path segment does not exist on the descriptor
+ */
private DynamicMessage.Builder populateNestedBuilder(Descriptors.Descriptor parentDescriptor, String[] nestedColumnNames, DynamicMessage.Builder parentBuilder, Object data) {
String childColumnName = nestedColumnNames[0];
Descriptors.FieldDescriptor childFieldDescriptor = parentDescriptor.findFieldByName(childColumnName);
@@ -95,6 +166,18 @@ private DynamicMessage.Builder populateNestedBuilder(Descriptors.Descriptor pare
return parentBuilder;
}
+ /**
+ * Sets a single, non-nested protobuf field on the given builder.
+ *
+ * <p>A {@code null} field descriptor or {@code null} data leaves the builder unchanged;
+ * otherwise the value is converted by the {@link TypeHandler} resolved for the field.
+ *
+ * @param builder the builder to populate
+ * @param fieldDescriptor the target protobuf field, may be {@code null}
+ * @param data the value to write, may be {@code null}
+ * @return the (possibly unchanged) builder
+ * @throws InvalidColumnMappingException if {@code data}'s type does not match the field type
+ */
private DynamicMessage.Builder populateBuilder(DynamicMessage.Builder builder, Descriptors.FieldDescriptor fieldDescriptor, Object data) {
if (fieldDescriptor == null) {
return builder;
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/PrimitiveTypeHandler.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/PrimitiveTypeHandler.java
index 43b904077..8ad899125 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/PrimitiveTypeHandler.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/PrimitiveTypeHandler.java
@@ -14,6 +14,10 @@
* The type Primitive proto handler.
*/
public class PrimitiveTypeHandler implements TypeHandler {
+ /**
+ * The protobuf {@code FieldDescriptor} describing the primitive field this handler
+ * converts between its protobuf representation and the corresponding Flink type.
+ */
private Descriptors.FieldDescriptor fieldDescriptor;
/**
@@ -25,21 +29,54 @@ public PrimitiveTypeHandler(Descriptors.FieldDescriptor fieldDescriptor) {
this.fieldDescriptor = fieldDescriptor;
}
+ /**
+ * Indicates that this handler can process the field.
+ *
+ * <p>{@code PrimitiveTypeHandler} is the fallback handler chosen by
+ * {@code TypeHandlerFactory} when no more specific handler matches, so it always
+ * reports that it can handle the field.
+ *
+ * @return {@code true}, always
+ */
@Override
public boolean canHandle() {
return true;
}
+ /**
+ * Writes the given primitive value onto the supplied protobuf message builder.
+ *
+ * <p>When {@code field} is {@code null} the builder is returned untouched; otherwise the
+ * value is parsed into the descriptor's primitive type before being set.
+ *
+ * @param builder the dynamic message builder being populated
+ * @param field the Flink-side value to convert and set, or {@code null} to skip
+ * @return the same {@code builder}, with the field set when a non-null value was provided
+ */
@Override
public DynamicMessage.Builder transformToProtoBuilder(DynamicMessage.Builder builder, Object field) {
return field != null ? builder.setField(fieldDescriptor, transform(field)) : builder;
}
+ /**
+ * Converts a value coming from a post processor into the descriptor's primitive Java type.
+ *
+ * @param field the raw value emitted by an upstream post processor
+ * @return the parsed primitive value
+ */
@Override
public Object transformFromPostProcessor(Object field) {
return transform(field);
}
+ /**
+ * Parses the raw value into the descriptor's primitive type using the matching
+ * {@code PrimitiveHandler}.
+ *
+ * @param field the raw value to parse
+ * @return the parsed primitive value
+ * @throws InvalidDataTypeException if the value cannot be parsed into the field's expected type
+ */
private Object transform(Object field) {
PrimitiveHandler primitiveHandler = PrimitiveHandlerFactory.getTypeHandler(fieldDescriptor);
try {
@@ -50,27 +87,63 @@ private Object transform(Object field) {
}
}
+ /**
+ * Returns the primitive value read from a protobuf message unchanged.
+ *
+ * <p>Primitive protobuf values already map directly onto Flink types, so no conversion
+ * is required.
+ *
+ * @param field the value read from the protobuf message
+ * @return the same {@code field} value
+ */
@Override
public Object transformFromProto(Object field) {
return field;
}
+ /**
+ * Returns the primitive protobuf value unchanged, ignoring the descriptor cache.
+ *
+ * <p>The {@code cache} is accepted for interface compatibility but is not needed for
+ * primitive fields, which require no nested descriptor lookups.
+ *
+ * @param field the value read from the protobuf message
+ * @param cache the field descriptor cache, unused for primitive fields
+ * @return the same {@code field} value
+ */
@Override
public Object transformFromProtoUsingCache(Object field, FieldDescriptorCache cache) {
return field;
}
+ /**
+ * Reads the primitive value for this field from a Parquet {@code SimpleGroup}.
+ *
+ * @param simpleGroup the Parquet group holding the row being deserialized
+ * @return the parsed primitive value, or the type's default when the field is absent
+ */
@Override
public Object transformFromParquet(SimpleGroup simpleGroup) {
PrimitiveHandler primitiveHandler = PrimitiveHandlerFactory.getTypeHandler(fieldDescriptor);
return primitiveHandler.parseSimpleGroup(simpleGroup);
}
+ /**
+ * Returns the primitive value unchanged for JSON serialization.
+ *
+ * @param field the primitive value to emit
+ * @return the same {@code field} value
+ */
@Override
public Object transformToJson(Object field) {
return field;
}
+ /**
+ * Returns the Flink {@code TypeInformation} for this primitive field.
+ *
+ * @return the type information supplied by the matching {@code PrimitiveHandler}
+ */
@Override
public TypeInformation getTypeInformation() {
PrimitiveHandler primitiveHandler = PrimitiveHandlerFactory.getTypeHandler(fieldDescriptor);
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/RowFactory.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/RowFactory.java
index 790d707a7..198a90141 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/RowFactory.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/RowFactory.java
@@ -81,6 +81,18 @@ public static Row createRow(DynamicMessage proto, int extraColumns, FieldDescrip
}
+ /**
+ * Builds a Flink {@code Row} from a Parquet {@code SimpleGroup}, reserving extra trailing columns.
+ *
+ * <p>Each field declared by the descriptor is deserialized through its matching
+ * {@code TypeHandler} and placed at the field's index; the row is widened by
+ * {@code extraColumns} so callers can append derived columns afterwards.
+ *
+ * @param descriptor the protobuf descriptor describing the row layout
+ * @param simpleGroup the Parquet group holding the encoded record
+ * @param extraColumns the number of additional, initially empty columns to append
+ * @return the populated row
+ */
public static Row createRow(Descriptors.Descriptor descriptor, SimpleGroup simpleGroup, int extraColumns) {
List descriptorFields = descriptor.getFields();
Row row = new Row(descriptorFields.size() + extraColumns);
@@ -91,6 +103,13 @@ public static Row createRow(Descriptors.Descriptor descriptor, SimpleGroup simpl
return row;
}
+ /**
+ * Builds a Flink {@code Row} from a Parquet {@code SimpleGroup} with no extra columns.
+ *
+ * @param descriptor the protobuf descriptor describing the row layout
+ * @param simpleGroup the Parquet group holding the encoded record
+ * @return the populated row
+ */
public static Row createRow(Descriptors.Descriptor descriptor, SimpleGroup simpleGroup) {
return createRow(descriptor, simpleGroup, 0);
}
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactory.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactory.java
index 852155260..0a5d84e53 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactory.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/TypeHandlerFactory.java
@@ -23,6 +23,12 @@
* The factory class for Type handler.
*/
public class TypeHandlerFactory {
+ /**
+ * Cache of previously resolved handlers, keyed by each field's fully qualified name.
+ *
+ * <p>Every entry pairs the field descriptor's hash code with its {@code TypeHandler} so a
+ * cached handler can be reused, while still being rebuilt whenever the descriptor changes.
+ */
private static Map> typeHandlerMap = new ConcurrentHashMap<>();
/**
@@ -60,6 +66,16 @@ protected static void clearTypeHandlerMap() {
typeHandlerMap.clear();
}
+ /**
+ * Builds the ordered list of candidate handlers for the given field.
+ *
+ * <p>Ordering is significant: {@code getTypeHandler} selects the first handler whose
+ * {@code canHandle()} returns {@code true}, so more specific handlers (maps, timestamps,
+ * enums, structs and repeated variants) are listed before the generic message handler.
+ *
+ * @param fieldDescriptor the field descriptor to build candidate handlers for
+ * @return the ordered list of candidate handlers to try
+ */
private static List getSpecificHandlers(Descriptors.FieldDescriptor fieldDescriptor) {
return Arrays.asList(
new MapHandler(fieldDescriptor),
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/EnumHandler.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/EnumHandler.java
index 0b0621cd6..ac0652707 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/EnumHandler.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/EnumHandler.java
@@ -15,6 +15,10 @@
* The type Enum proto handler.
*/
public class EnumHandler implements TypeHandler {
+ /**
+ * The protobuf {@code FieldDescriptor} of the enum field this handler converts to and
+ * from its string name representation.
+ */
private Descriptors.FieldDescriptor fieldDescriptor;
/**
@@ -26,11 +30,27 @@ public EnumHandler(Descriptors.FieldDescriptor fieldDescriptor) {
this.fieldDescriptor = fieldDescriptor;
}
+ /**
+ * Determines whether this handler applies to the field.
+ *
+ * @return {@code true} if the field is a non-repeated protobuf {@code enum}
+ */
@Override
public boolean canHandle() {
return fieldDescriptor.getJavaType() == Descriptors.FieldDescriptor.JavaType.ENUM && !fieldDescriptor.isRepeated();
}
+ /**
+ * Sets the enum field on the builder by resolving the value's name to an enum constant.
+ *
+ * <p>The incoming value is treated as the enum constant name (trimmed). When the handler
+ * cannot apply or {@code field} is {@code null}, the builder is returned unchanged.
+ *
+ * @param builder the dynamic message builder being populated
+ * @param field the enum constant name to set, or {@code null} to skip
+ * @return the same {@code builder}, with the enum field set when resolvable
+ * @throws EnumFieldNotFoundException if the name does not match any constant of the enum
+ */
@Override
public DynamicMessage.Builder transformToProtoBuilder(DynamicMessage.Builder builder, Object field) {
if (!canHandle() || field == null) {
@@ -44,6 +64,15 @@ public DynamicMessage.Builder transformToProtoBuilder(DynamicMessage.Builder bui
return builder.setField(fieldDescriptor, valueByName);
}
+ /**
+ * Resolves a post-processor value to a protobuf enum constant name.
+ *
+ * <p>The input may be either the enum's numeric position or its name; when it matches
+ * neither, the enum's zero-numbered (default) constant name is returned.
+ *
+ * @param field the value to resolve, defaulting to {@code "0"} when {@code null}
+ * @return the resolved enum constant name
+ */
@Override
public Object transformFromPostProcessor(Object field) {
String input = field != null ? field.toString() : "0";
@@ -57,16 +86,37 @@ public Object transformFromPostProcessor(Object field) {
}
}
+ /**
+ * Converts an enum value read from a protobuf message into its trimmed string name.
+ *
+ * @param field the enum value descriptor read from the message
+ * @return the enum constant name as a string
+ */
@Override
public Object transformFromProto(Object field) {
return String.valueOf(field).trim();
}
+ /**
+ * Converts the protobuf enum value into its trimmed string name, ignoring the cache.
+ *
+ * @param field the enum value descriptor read from the message
+ * @param cache the field descriptor cache, unused for enum fields
+ * @return the enum constant name as a string
+ */
@Override
public Object transformFromProtoUsingCache(Object field, FieldDescriptorCache cache) {
return String.valueOf(field).trim();
}
+ /**
+ * Reads the enum field from a Parquet {@code SimpleGroup} as its constant name.
+ *
+ * <p>Unknown or absent values fall back to the enum's zero-numbered default constant.
+ *
+ * @param simpleGroup the Parquet group holding the encoded record
+ * @return the resolved enum constant name, or the default constant name when missing
+ */
@Override
public Object transformFromParquet(SimpleGroup simpleGroup) {
String defaultEnumValue = fieldDescriptor.getEnumType().findValueByNumber(0).getName();
@@ -79,11 +129,22 @@ public Object transformFromParquet(SimpleGroup simpleGroup) {
return defaultEnumValue;
}
+ /**
+ * Returns the enum constant name unchanged for JSON serialization.
+ *
+ * @param field the enum constant name
+ * @return the same {@code field} value
+ */
@Override
public Object transformToJson(Object field) {
return field;
}
+ /**
+ * Returns the Flink {@code TypeInformation} used to represent this enum field.
+ *
+ * @return {@code Types.STRING}, since enum constants are represented by their name
+ */
@Override
public TypeInformation getTypeInformation() {
return Types.STRING;
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandler.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandler.java
index 9076d6b98..92f4e2933 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandler.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/GoogleProtobufComplexMessageHandler.java
@@ -30,6 +30,11 @@
*/
public class GoogleProtobufComplexMessageHandler implements TypeHandler {
+ /**
+ * The set of fully qualified protobuf message names that this handler recognizes as
+ * dynamic, JSON-like complex types ({@code Struct}, {@code Value}, {@code ListValue}
+ * and {@code NullValue}).
+ */
private static final Set RECOGNIZED_COMPLEX_TYPES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
"google.protobuf.Struct",
"google.protobuf.Value",
@@ -37,18 +42,42 @@ public class GoogleProtobufComplexMessageHandler implements TypeHandler {
"google.protobuf.NullValue"
)));
+ /**
+ * The protobuf {@code FieldDescriptor} of the complex message field handled here.
+ */
private final Descriptors.FieldDescriptor fieldDescriptor;
+ /**
+ * Instantiates a new handler for dynamic Google Protobuf complex message types.
+ *
+ * @param fieldDescriptor the descriptor of the complex message field to handle
+ */
public GoogleProtobufComplexMessageHandler(Descriptors.FieldDescriptor fieldDescriptor) {
this.fieldDescriptor = fieldDescriptor;
}
+ /**
+ * Determines whether this handler applies to the field.
+ *
+ * @return {@code true} if the field is a message whose type is one of the recognized
+ * complex protobuf types
+ */
@Override
public boolean canHandle() {
return fieldDescriptor.getJavaType() == Descriptors.FieldDescriptor.JavaType.MESSAGE
&& RECOGNIZED_COMPLEX_TYPES.contains(fieldDescriptor.getMessageType().getFullName());
}
+ /**
+ * Serializes a recognized complex protobuf message into its raw byte-array form.
+ *
+ * <p>Because these {@code Struct}-family types are dynamic and recursive, they are stored
+ * as the message's serialized bytes and reconstructed later using the field descriptor.
+ *
+ * @param field the protobuf value read from the message; expected to be a {@code DynamicMessage}
+ * @return the serialized bytes of the message, or {@code null} when the value is empty,
+ * absent, or not a {@code DynamicMessage}
+ */
@Override
public Object transformFromProto(Object field) {
if (field == null) {
@@ -65,11 +94,33 @@ public Object transformFromProto(Object field) {
return null;
}
+ /**
+ * Serializes the complex protobuf message to bytes, delegating to {@code transformFromProto}.
+ *
+ * <p>The descriptor cache is not required for these self-describing complex types.
+ *
+ * @param field the protobuf value read from the message
+ * @param cache the field descriptor cache, unused for this conversion
+ * @return the serialized bytes of the message, or {@code null} when there is no usable value
+ */
@Override
public Object transformFromProtoUsingCache(Object field, FieldDescriptorCache cache) {
return transformFromProto(field);
}
+ /**
+ * Reconstructs a complex protobuf message from its byte-array form and sets it on the builder.
+ *
+ * <p>The {@code field} is expected to be the serialized bytes previously produced by
+ * {@code transformFromProto}; it is parsed back into a {@code DynamicMessage} using the
+ * field's message type. When the handler cannot apply or {@code field} is {@code null},
+ * the builder is returned unchanged.
+ *
+ * @param builder the dynamic message builder being populated
+ * @param field the serialized message bytes to parse and set, or {@code null} to skip
+ * @return the same {@code builder}, with the field set when bytes were provided
+ * @throws RuntimeException if the bytes cannot be parsed into the field's message type
+ */
@Override
public DynamicMessage.Builder transformToProtoBuilder(DynamicMessage.Builder builder, Object field) {
if (!canHandle() || field == null) {
@@ -85,21 +136,44 @@ public DynamicMessage.Builder transformToProtoBuilder(DynamicMessage.Builder bui
}
}
+ /**
+ * Returns the post-processor value unchanged.
+ *
+ * @param field the value supplied by an upstream post processor
+ * @return the same {@code field} value
+ */
@Override
public Object transformFromPostProcessor(Object field) {
return field;
}
+ /**
+ * Returns {@code null}, as these complex types are not read from Parquet sources.
+ *
+ * @param simpleGroup the Parquet group holding the encoded record
+ * @return {@code null}, always
+ */
@Override
public Object transformFromParquet(SimpleGroup simpleGroup) {
return null;
}
+ /**
+ * Returns {@code null}, as these complex types are not emitted to JSON by this handler.
+ *
+ * @param field the value that would be serialized
+ * @return {@code null}, always
+ */
@Override
public Object transformToJson(Object field) {
return null;
}
+ /**
+ * Returns the Flink {@code TypeInformation} used to represent this field.
+ *
+ * @return a primitive byte-array type, matching the serialized byte representation
+ */
@Override
public TypeInformation getTypeInformation() {
return Types.PRIMITIVE_ARRAY(Types.BYTE);
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/MapHandler.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/MapHandler.java
index 266129138..6c4dccfe2 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/MapHandler.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/MapHandler.java
@@ -27,7 +27,13 @@
*/
public class MapHandler implements TypeHandler {
+ /**
+ * The protobuf {@code FieldDescriptor} of the map field being converted.
+ */
private Descriptors.FieldDescriptor fieldDescriptor;
+ /**
+ * Delegate handler that treats the map's entries as a repeated key/value message.
+ */
private TypeHandler repeatedMessageHandler;
/**
@@ -40,11 +46,27 @@ public MapHandler(Descriptors.FieldDescriptor fieldDescriptor) {
this.repeatedMessageHandler = new RepeatedMessageHandler(fieldDescriptor);
}
+ /**
+ * Determines whether this handler applies to the field.
+ *
+ * @return {@code true} if the field is a protobuf {@code map} field
+ */
@Override
public boolean canHandle() {
return fieldDescriptor.isMapField();
}
+ /**
+ * Sets the map field on the builder by encoding its entries as repeated key/value messages.
+ *
+ * <p>A {@code Map} input is first turned into rows of {@code (key, value)} pairs; any other
+ * input is passed straight to the underlying repeated-message handler. When the handler
+ * cannot apply or {@code field} is {@code null}, the builder is returned unchanged.
+ *
+ * @param builder the dynamic message builder being populated
+ * @param field the map (or pre-built rows) to encode, or {@code null} to skip
+ * @return the same {@code builder}, with the map entries set when provided
+ */
@Override
public DynamicMessage.Builder transformToProtoBuilder(DynamicMessage.Builder builder, Object field) {
if (!canHandle() || field == null) {
@@ -61,6 +83,16 @@ public DynamicMessage.Builder transformToProtoBuilder(DynamicMessage.Builder bui
return repeatedMessageHandler.transformToProtoBuilder(builder, field);
}
+ /**
+ * Converts a post-processor value into an array of key/value {@code Row} entries.
+ *
+ * <p>For a {@code Map} input, each entry's key and value are converted with their own
+ * handlers; a {@code List} input is delegated to the repeated-message handler. Any other
+ * input (including {@code null}) yields an empty array.
+ *
+ * @param field the map or list value emitted by an upstream post processor
+ * @return an array of two-field rows, one per map entry
+ */
@Override
public Object transformFromPostProcessor(Object field) {
ArrayList rows = new ArrayList<>();
@@ -85,16 +117,38 @@ public Object transformFromPostProcessor(Object field) {
return rows.toArray();
}
+ /**
+ * Converts the map entries read from a protobuf message into key/value rows.
+ *
+ * @param field the repeated map-entry value read from the message
+ * @return an array of two-field rows, one per map entry
+ */
@Override
public Object transformFromProto(Object field) {
return repeatedMessageHandler.transformFromProto(field);
}
+ /**
+ * Converts the protobuf map entries into key/value rows using the descriptor cache.
+ *
+ * @param field the repeated map-entry value read from the message
+ * @param cache the field descriptor cache used to resolve nested field indices
+ * @return an array of two-field rows, one per map entry
+ */
@Override
public Object transformFromProtoUsingCache(Object field, FieldDescriptorCache cache) {
return repeatedMessageHandler.transformFromProtoUsingCache(field, cache);
}
+ /**
+ * Reads the map field from a Parquet {@code SimpleGroup} into key/value rows.
+ *
+ * <p>Both the legacy and the standard ({@code key_value}-wrapped) Parquet map encodings are
+ * supported; an empty array is returned when the field is missing.
+ *
+ * @param simpleGroup the Parquet group holding the encoded record
+ * @return an array of two-field rows, one per map entry, or an empty array when absent
+ */
@Override
public Object transformFromParquet(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
@@ -108,6 +162,13 @@ public Object transformFromParquet(SimpleGroup simpleGroup) {
return new Row[0];
}
+ /**
+ * Deserializes a legacy-encoded Parquet map, where entries are repeated directly on the field.
+ *
+ * @param simpleGroup the Parquet group containing the map field
+ * @param fieldName the name of the map field to read
+ * @return the deserialized key/value rows
+ */
private Row[] transformLegacyMapFromSimpleGroup(SimpleGroup simpleGroup, String fieldName) {
ArrayList deserializedRows = new ArrayList<>();
int repetitionCount = simpleGroup.getFieldRepetitionCount(fieldName);
@@ -119,6 +180,14 @@ private Row[] transformLegacyMapFromSimpleGroup(SimpleGroup simpleGroup, String
return deserializedRows.toArray(new Row[]{});
}
+ /**
+ * Deserializes a standard-encoded Parquet map, where entries are nested under a
+ * {@code key_value} group.
+ *
+ * @param simpleGroup the Parquet group containing the map field
+ * @param fieldName the name of the map field to read
+ * @return the deserialized key/value rows
+ */
private Row[] transformStandardMapFromSimpleGroup(SimpleGroup simpleGroup, String fieldName) {
ArrayList deserializedRows = new ArrayList<>();
final String innerFieldName = "key_value";
@@ -132,11 +201,22 @@ private Row[] transformStandardMapFromSimpleGroup(SimpleGroup simpleGroup, Strin
return deserializedRows.toArray(new Row[]{});
}
+ /**
+ * Returns {@code null}, as map fields are not serialized to JSON by this handler.
+ *
+ * @param field the value that would be serialized
+ * @return {@code null}, always
+ */
@Override
public Object transformToJson(Object field) {
return null;
}
+ /**
+ * Returns the Flink {@code TypeInformation} used to represent this map field.
+ *
+ * @return an object-array type whose element is the key/value row type
+ */
@Override
public TypeInformation getTypeInformation() {
return Types.OBJECT_ARRAY(TypeInformationFactory.getRowType(fieldDescriptor.getMessageType()));
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/MessageHandler.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/MessageHandler.java
index c7dcd12f9..0e5bd1909 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/MessageHandler.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/MessageHandler.java
@@ -25,9 +25,21 @@
* The type Message proto handler.
*/
public class MessageHandler implements TypeHandler {
+ /**
+ * The protobuf {@code FieldDescriptor} of the nested message field being converted.
+ */
private FieldDescriptor fieldDescriptor;
+ /**
+ * Lazily created schema used to serialize the message row to JSON.
+ */
private JsonRowSerializationSchema jsonRowSerializationSchema;
+ /**
+ * The default (empty) instance of the message, used when a Parquet value is absent.
+ */
private DynamicMessage defaultMessageInstance;
+ /**
+ * The descriptor of the nested message type, cached for deserialization.
+ */
private Descriptors.Descriptor fieldMessageDescriptor;
/**
@@ -43,11 +55,27 @@ public MessageHandler(FieldDescriptor fieldDescriptor) {
}
}
+ /**
+ * Determines whether this handler applies to the field.
+ *
+ * @return {@code true} if the field is a message type other than {@code google.protobuf.Timestamp}
+ */
@Override
public boolean canHandle() {
return fieldDescriptor.getJavaType() == MESSAGE && !fieldDescriptor.getMessageType().getFullName().equals("google.protobuf.Timestamp");
}
+ /**
+ * Builds the nested protobuf message from a Flink {@code Row} and sets it on the builder.
+ *
+ * <p>Each nested field present in the row is converted with its own handler before the
+ * assembled message is attached. When the handler cannot apply or {@code field} is
+ * {@code null}, the builder is returned unchanged.
+ *
+ * @param builder the dynamic message builder being populated
+ * @param field the row holding the nested message's field values, or {@code null} to skip
+ * @return the same {@code builder}, with the nested message set when provided
+ */
@Override
public Builder transformToProtoBuilder(Builder builder, Object field) {
if (!canHandle() || field == null) {
@@ -71,21 +99,48 @@ public Builder transformToProtoBuilder(Builder builder, Object field) {
return builder.setField(fieldDescriptor, elementBuilder.build());
}
+ /**
+ * Converts a post-processor map into a Flink {@code Row} for the nested message.
+ *
+ * @param field the nested message values as a map keyed by field name
+ * @return the populated row representing the nested message
+ */
@Override
public Object transformFromPostProcessor(Object field) {
return RowFactory.createRow((Map) field, fieldDescriptor.getMessageType());
}
+ /**
+ * Converts a nested protobuf message read from the parent into a Flink {@code Row}.
+ *
+ * @param field the nested {@code DynamicMessage} read from the parent message
+ * @return the populated row representing the nested message
+ */
@Override
public Object transformFromProto(Object field) {
return RowFactory.createRow((DynamicMessage) field);
}
+ /**
+ * Converts the nested protobuf message into a row using the descriptor cache.
+ *
+ * @param field the nested {@code DynamicMessage} read from the parent message
+ * @param cache the field descriptor cache used to resolve nested field indices
+ * @return the populated row representing the nested message
+ */
@Override
public Object transformFromProtoUsingCache(Object field, FieldDescriptorCache cache) {
return RowFactory.createRow((DynamicMessage) field, cache);
}
+ /**
+ * Reads the nested message field from a Parquet {@code SimpleGroup} into a row.
+ *
+ * <p>When the field is missing, a row built from the message's default instance is returned.
+ *
+ * @param simpleGroup the Parquet group holding the encoded record
+ * @return the populated row, or a default row when the field is absent
+ */
@Override
public Object transformFromParquet(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
@@ -96,6 +151,14 @@ public Object transformFromParquet(SimpleGroup simpleGroup) {
return RowFactory.createRow(defaultMessageInstance);
}
+ /**
+ * Serializes the nested message row to its JSON string representation.
+ *
+ * <p>The JSON serialization schema is created lazily on first use.
+ *
+ * @param field the nested message row to serialize
+ * @return the JSON string for the row
+ */
@Override
public Object transformToJson(Object field) {
if (jsonRowSerializationSchema == null) {
@@ -104,11 +167,21 @@ public Object transformToJson(Object field) {
return new String(jsonRowSerializationSchema.serialize((Row) field));
}
+ /**
+ * Returns the Flink {@code TypeInformation} used to represent this nested message.
+ *
+ * @return the row type derived from the message descriptor
+ */
@Override
public TypeInformation getTypeInformation() {
return TypeInformationFactory.getRowType(fieldDescriptor.getMessageType());
}
+ /**
+ * Builds the JSON row serialization schema for the nested message type.
+ *
+ * @return a schema configured with this message's row type information
+ */
private JsonRowSerializationSchema createJsonRowSchema() {
return JsonRowSerializationSchema
.builder()
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/StructMessageHandler.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/StructMessageHandler.java
index bb3833acb..a0ded77cc 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/StructMessageHandler.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/StructMessageHandler.java
@@ -13,6 +13,9 @@
* The type Struct message proto handler.
*/
public class StructMessageHandler implements TypeHandler {
+ /**
+ * The protobuf {@code FieldDescriptor} of the {@code google.protobuf.Struct} field handled here.
+ */
private Descriptors.FieldDescriptor fieldDescriptor;
/**
@@ -24,42 +27,92 @@ public StructMessageHandler(Descriptors.FieldDescriptor fieldDescriptor) {
this.fieldDescriptor = fieldDescriptor;
}
+ /**
+ * Determines whether this handler applies to the field.
+ *
+ * @return {@code true} if the field is a non-repeated {@code google.protobuf.Struct} message
+ */
@Override
public boolean canHandle() {
return fieldDescriptor.getJavaType() == Descriptors.FieldDescriptor.JavaType.MESSAGE
&& fieldDescriptor.toProto().getTypeName().equals(".google.protobuf.Struct") && !fieldDescriptor.isRepeated();
}
+ /**
+ * Returns the builder unchanged.
+ *
+ * <p>This handler does not currently serialize {@code Struct} values back into protobuf.
+ *
+ * @param builder the dynamic message builder being populated
+ * @param field the value that would be set, ignored here
+ * @return the supplied {@code builder}, unchanged
+ */
@Override
public DynamicMessage.Builder transformToProtoBuilder(DynamicMessage.Builder builder, Object field) {
return builder;
}
+ /**
+ * Returns {@code null}, as {@code Struct} values are not produced from post-processor input.
+ *
+ * @param field the value supplied by an upstream post processor
+ * @return {@code null}, always
+ */
@Override
public Object transformFromPostProcessor(Object field) {
return null;
}
+ /**
+ * Returns {@code null}, as {@code Struct} values are not read from protobuf by this handler.
+ *
+ * @param field the value read from the protobuf message
+ * @return {@code null}, always
+ */
@Override
public Object transformFromProto(Object field) {
return null;
}
+ /**
+ * Returns {@code null}, ignoring the descriptor cache.
+ *
+ * @param field the value read from the protobuf message
+ * @param cache the field descriptor cache, unused here
+ * @return {@code null}, always
+ */
@Override
public Object transformFromProtoUsingCache(Object field, FieldDescriptorCache cache) {
return null;
}
+ /**
+ * Returns {@code null}, as {@code Struct} values are not read from Parquet by this handler.
+ *
+ * @param simpleGroup the Parquet group holding the encoded record
+ * @return {@code null}, always
+ */
@Override
public Object transformFromParquet(SimpleGroup simpleGroup) {
return null;
}
+ /**
+ * Returns {@code null}, as {@code Struct} values are not emitted to JSON by this handler.
+ *
+ * @param field the value that would be serialized
+ * @return {@code null}, always
+ */
@Override
public Object transformToJson(Object field) {
return null;
}
+ /**
+ * Returns the Flink {@code TypeInformation} used to represent this field.
+ *
+ * @return an empty named-row type
+ */
@Override
public TypeInformation getTypeInformation() {
return Types.ROW_NAMED(new String[]{});
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/TimestampHandler.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/TimestampHandler.java
index 32f659452..e39ccbed0 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/TimestampHandler.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/complex/TimestampHandler.java
@@ -27,11 +27,29 @@
* The type Timestamp proto handler.
*/
public class TimestampHandler implements TypeHandler {
+ /**
+ * The number of milliseconds in one second, used to split epoch values into seconds.
+ */
private static final int SECOND_TO_MS_FACTOR = 1000;
+ /**
+ * The default seconds component used when a timestamp value is absent.
+ */
private static final long DEFAULT_SECONDS_VALUE = 0L;
+ /**
+ * The default nanoseconds component used when a timestamp value is absent.
+ */
private static final int DEFAULT_NANOS_VALUE = 0;
+ /**
+ * The number of nanoseconds in one millisecond, used when converting Parquet millis.
+ */
private static final int MS_TO_NANOS_FACTOR = 1000_000;
+ /**
+ * The {@code yyyy-MM-dd HH:mm:ss} format used to render timestamps as strings for JSON output;
+ * its time zone is set to UTC by the constructor.
+ */
private static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ /**
+ * The protobuf {@code FieldDescriptor} of the {@code google.protobuf.Timestamp} field handled here.
+ */
private Descriptors.FieldDescriptor fieldDescriptor;
/**
@@ -44,11 +62,29 @@ public TimestampHandler(Descriptors.FieldDescriptor fieldDescriptor) {
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
}
+ /**
+ * Determines whether this handler applies to the field.
+ *
+ * @return {@code true} if the field is a {@code google.protobuf.Timestamp} message
+ */
@Override
public boolean canHandle() {
return fieldDescriptor.getJavaType() == Descriptors.FieldDescriptor.JavaType.MESSAGE && fieldDescriptor.getMessageType().getFullName().equals("google.protobuf.Timestamp");
}
+ /**
+ * Converts a variety of time representations into a protobuf {@code Timestamp} and sets it.
+ *
+ * <p>Supported inputs include {@code java.sql.Timestamp}, {@code Instant},
+ * {@code LocalDateTime}, a two-field {@code Row} of {@code (seconds, nanos)}, an ISO-8601
+ * {@code String}, and any {@code Number} of epoch seconds. When the handler cannot apply or
+ * {@code field} is {@code null}, the builder is returned unchanged.
+ *
+ * @param builder the dynamic message builder being populated
+ * @param field the time value to convert and set, or {@code null} to skip
+ * @return the same {@code builder}, with the timestamp set when a value could be derived
+ * @throws IllegalArgumentException if a {@code Row} input does not have exactly two fields
+ */
@Override
public DynamicMessage.Builder transformToProtoBuilder(DynamicMessage.Builder builder, Object field) {
if (!canHandle() || field == null) {
@@ -93,27 +129,62 @@ public DynamicMessage.Builder transformToProtoBuilder(DynamicMessage.Builder bui
return builder;
}
+ /**
+ * Converts a {@code LocalDateTime} into a protobuf {@code Timestamp} at UTC.
+ *
+ * @param timeField the local date-time to convert, interpreted as UTC
+ * @return a protobuf timestamp holding the epoch seconds only; the nanosecond part is not set
+ */
private Timestamp convertLocalDateTime(LocalDateTime timeField) {
return Timestamp.newBuilder()
.setSeconds(timeField.toEpochSecond(ZoneOffset.UTC))
.build();
}
+ /**
+ * Converts a post-processor value into its ISO-8601 string form when it is a valid instant.
+ *
+ * @param field the value emitted by an upstream post processor
+ * @return the value's string representation, or {@code null} if it is not a valid timestamp
+ */
@Override
public Object transformFromPostProcessor(Object field) {
return isValid(field) ? field.toString() : null;
}
+ /**
+ * Converts a protobuf {@code Timestamp} message read from the parent into a Flink {@code Row}.
+ *
+ * @param field the nested {@code DynamicMessage} timestamp read from the parent message
+ * @return a row holding the timestamp's {@code seconds} and {@code nanos} fields
+ */
@Override
public Object transformFromProto(Object field) {
return RowFactory.createRow((DynamicMessage) field);
}
+ /**
+ * Converts the protobuf timestamp into a row using the descriptor cache.
+ *
+ * @param field the nested {@code DynamicMessage} timestamp read from the parent message
+ * @param cache the field descriptor cache used to resolve nested field indices
+ * @return a row holding the timestamp's {@code seconds} and {@code nanos} fields
+ */
@Override
public Object transformFromProtoUsingCache(Object field, FieldDescriptorCache cache) {
return RowFactory.createRow((DynamicMessage) field, cache);
}
+ /**
+ * Reads the timestamp field from a Parquet {@code SimpleGroup} into a {@code (seconds, nanos)} row.
+ *
+ * <p>Both the {@code INT64} millisecond encoding and the nested group encoding (with
+ * {@code seconds} and {@code nanos} fields) are supported; a default zero timestamp is
+ * returned when the field is absent.
+ *
+ * @param simpleGroup the Parquet group holding the encoded record
+ * @return a two-field row of seconds and nanos
+ */
@Override
public Object transformFromParquet(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
@@ -128,6 +199,13 @@ public Object transformFromParquet(SimpleGroup simpleGroup) {
return Row.of(DEFAULT_SECONDS_VALUE, DEFAULT_NANOS_VALUE);
}
+ /**
+ * Parses an {@code INT64} millisecond timestamp from a Parquet group into seconds and nanos.
+ *
+ * @param simpleGroup the Parquet group containing the timestamp field
+ * @param timestampFieldName the name of the timestamp field to read
+ * @return a two-field row of seconds and nanos
+ */
private Row parseInt64TimestampFromSimpleGroup(SimpleGroup simpleGroup, String timestampFieldName) {
/* conversion from ms to nanos borrowed from Instant.java class and inlined here for performance reasons */
long timeInMillis = simpleGroup.getLong(timestampFieldName, 0);
@@ -137,6 +215,13 @@ private Row parseInt64TimestampFromSimpleGroup(SimpleGroup simpleGroup, String t
return Row.of(seconds, nanos);
}
+ /**
+ * Parses a nested-group timestamp (with {@code seconds} and {@code nanos}) from a Parquet group.
+ *
+ * @param simpleGroup the Parquet group containing the timestamp field
+ * @param timestampFieldName the name of the timestamp group field to read
+ * @return a two-field row of seconds and nanos, defaulting to zero for missing components
+ */
private Row parseGroupTypeTimestampFromSimpleGroup(SimpleGroup simpleGroup, String timestampFieldName) {
SimpleGroup timestampGroup = (SimpleGroup) simpleGroup.getGroup(timestampFieldName, 0);
long seconds = 0L;
@@ -150,6 +235,13 @@ private Row parseGroupTypeTimestampFromSimpleGroup(SimpleGroup simpleGroup, Stri
return Row.of(seconds, nanos);
}
+ /**
+ * Renders the timestamp row as a UTC date-time string for JSON output.
+ *
+ * @param field the timestamp {@code Row} of {@code (seconds, nanos)}
+ * @return the formatted UTC date-time string, or the original value when it is not a
+ * two-field row
+ */
@Override
public Object transformToJson(Object field) {
Row timeField = (Row) field;
@@ -161,11 +253,22 @@ public Object transformToJson(Object field) {
}
}
+ /**
+ * Returns the Flink {@code TypeInformation} used to represent this timestamp field.
+ *
+ * @return the row type derived from the timestamp message descriptor
+ */
@Override
public TypeInformation getTypeInformation() {
return TypeInformationFactory.getRowType(fieldDescriptor.getMessageType());
}
+ /**
+ * Converts a {@code java.sql.Timestamp} into a protobuf {@code Timestamp}.
+ *
+ * @param field the SQL timestamp to convert
+ * @return the equivalent protobuf timestamp, preserving seconds and nanoseconds
+ */
private Timestamp convertSqlTimestamp(java.sql.Timestamp field) {
long timestampSeconds = field.getTime() / SECOND_TO_MS_FACTOR;
int timestampNanos = field.getNanos();
@@ -175,6 +278,12 @@ private Timestamp convertSqlTimestamp(java.sql.Timestamp field) {
.build();
}
+ /**
+ * Checks whether the given value can be parsed as an ISO-8601 instant.
+ *
+ * @param field the value to validate
+ * @return {@code true} if the value is non-null and parses as an {@code Instant}
+ */
private boolean isValid(Object field) {
if (field == null) {
return false;
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/BooleanHandler.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/BooleanHandler.java
index 46917901f..8c9c9df97 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/BooleanHandler.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/BooleanHandler.java
@@ -14,6 +14,9 @@
* The type Boolean primitive type handler.
*/
public class BooleanHandler implements PrimitiveHandler {
+ /**
+ * The protobuf {@code FieldDescriptor} of the boolean field this handler processes.
+ */
private Descriptors.FieldDescriptor fieldDescriptor;
/**
@@ -25,16 +28,33 @@ public BooleanHandler(Descriptors.FieldDescriptor fieldDescriptor) {
this.fieldDescriptor = fieldDescriptor;
}
+ /**
+ * Determines whether this handler applies to the field.
+ *
+ * @return {@code true} if the field's Java type is {@code BOOLEAN}
+ */
@Override
public boolean canHandle() {
return fieldDescriptor.getJavaType() == JavaType.BOOLEAN;
}
+ /**
+ * Parses the given value into a Java {@code boolean}.
+ *
+ * @param field the value to parse, defaulting to {@code false} when {@code null}
+ * @return the parsed boolean value
+ */
@Override
public Object parseObject(Object field) {
return Boolean.parseBoolean(getValueOrDefault(field, "false"));
}
+ /**
+ * Reads the boolean value for this field from a Parquet {@code SimpleGroup}.
+ *
+ * @param simpleGroup the Parquet group holding the encoded record
+ * @return the boolean value, or {@code false} when the field is absent
+ */
@Override
public Object parseSimpleGroup(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
@@ -48,6 +68,12 @@ public Object parseSimpleGroup(SimpleGroup simpleGroup) {
}
}
+ /**
+ * Converts a list of boolean values into a primitive {@code boolean[]}.
+ *
+ * @param field the list of boolean values, or {@code null}
+ * @return the values as a {@code boolean[]}, empty when {@code field} is {@code null}
+ */
@Override
public Object parseRepeatedObjectField(Object field) {
boolean[] inputValues = new boolean[0];
@@ -57,6 +83,12 @@ public Object parseRepeatedObjectField(Object field) {
return inputValues;
}
+ /**
+ * Reads the repeated boolean field from a Parquet {@code SimpleGroup} into a {@code boolean[]}.
+ *
+ * @param simpleGroup the Parquet group holding the encoded record
+ * @return the boolean array, empty when the field is absent
+ */
@Override
public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
@@ -71,11 +103,21 @@ public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) {
return new boolean[0];
}
+ /**
+ * Returns the Flink {@code TypeInformation} for a single boolean value.
+ *
+ * @return {@code Types.BOOLEAN}
+ */
@Override
public TypeInformation getTypeInformation() {
return Types.BOOLEAN;
}
+ /**
+ * Returns the Flink {@code TypeInformation} for a repeated boolean field.
+ *
+ * @return a primitive boolean-array type
+ */
@Override
public TypeInformation getArrayType() {
return Types.PRIMITIVE_ARRAY(Types.BOOLEAN);
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/ByteStringHandler.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/ByteStringHandler.java
index 146ace65f..18ac1b4dc 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/ByteStringHandler.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/ByteStringHandler.java
@@ -16,6 +16,9 @@
* The type Byte string primitive type handler.
*/
public class ByteStringHandler implements PrimitiveHandler {
+ /**
+ * The protobuf {@code FieldDescriptor} of the byte-string field this handler processes.
+ */
private Descriptors.FieldDescriptor fieldDescriptor;
/**
@@ -27,16 +30,33 @@ public ByteStringHandler(Descriptors.FieldDescriptor fieldDescriptor) {
this.fieldDescriptor = fieldDescriptor;
}
+ /**
+ * Determines whether this handler applies to the field.
+ *
+ * @return {@code true} if the field's Java type is {@code BYTE_STRING}
+ */
@Override
public boolean canHandle() {
return fieldDescriptor.getJavaType() == JavaType.BYTE_STRING;
}
+ /**
+ * Returns the given byte-string value unchanged.
+ *
+ * @param field the value to pass through
+ * @return the same {@code field} value
+ */
@Override
public Object parseObject(Object field) {
return field;
}
+ /**
+ * Reads the byte-string value for this field from a Parquet {@code SimpleGroup}.
+ *
+ * @param simpleGroup the Parquet group holding the encoded record
+ * @return the value as a {@code ByteString}, or {@code null} when the field is absent
+ */
@Override
public Object parseSimpleGroup(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
@@ -50,6 +70,12 @@ public Object parseSimpleGroup(SimpleGroup simpleGroup) {
}
}
+ /**
+ * Converts a list of byte-string values into a {@code ByteString[]}.
+ *
+ * @param field the list of {@code ByteString} values, or {@code null}
+ * @return the values as a {@code ByteString[]}, empty when {@code field} is {@code null}
+ */
@Override
public Object parseRepeatedObjectField(Object field) {
List inputValues = new ArrayList<>();
@@ -59,6 +85,12 @@ public Object parseRepeatedObjectField(Object field) {
return inputValues.toArray(new ByteString[]{});
}
+ /**
+ * Reads the repeated byte-string field from a Parquet {@code SimpleGroup} into a {@code ByteString[]}.
+ *
+ * @param simpleGroup the Parquet group holding the encoded record
+ * @return the byte-string array, empty when the field is absent
+ */
@Override
public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
@@ -73,11 +105,21 @@ public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) {
return byteStringList.toArray(new ByteString[]{});
}
+ /**
+ * Returns the Flink {@code TypeInformation} for a single byte-string value.
+ *
+ * @return the type information for {@code ByteString}
+ */
@Override
public TypeInformation getTypeInformation() {
return TypeInformation.of(ByteString.class);
}
+ /**
+ * Returns the Flink {@code TypeInformation} for a repeated byte-string field.
+ *
+ * @return an object-array type of {@code ByteString}
+ */
@Override
public TypeInformation getArrayType() {
return Types.OBJECT_ARRAY(TypeInformation.of(ByteString.class));
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/DoubleHandler.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/DoubleHandler.java
index e326c677d..fba790802 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/DoubleHandler.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/DoubleHandler.java
@@ -14,6 +14,9 @@
* The type Double primitive type handler.
*/
public class DoubleHandler implements PrimitiveHandler {
+ /**
+ * The protobuf {@code FieldDescriptor} of the double field this handler processes.
+ */
private Descriptors.FieldDescriptor fieldDescriptor;
/**
@@ -25,16 +28,33 @@ public DoubleHandler(Descriptors.FieldDescriptor fieldDescriptor) {
this.fieldDescriptor = fieldDescriptor;
}
+ /**
+ * Determines whether this handler applies to the field.
+ *
+ * @return {@code true} if the field's Java type is {@code DOUBLE}
+ */
@Override
public boolean canHandle() {
return fieldDescriptor.getJavaType() == JavaType.DOUBLE;
}
+ /**
+ * Parses the given value into a Java {@code double}.
+ *
+ * @param field the value to parse, defaulting to {@code 0} when {@code null}
+ * @return the parsed double value
+ */
@Override
public Object parseObject(Object field) {
return Double.parseDouble(getValueOrDefault(field, "0"));
}
+ /**
+ * Reads the double value for this field from a Parquet {@code SimpleGroup}.
+ *
+ * @param simpleGroup the Parquet group holding the encoded record
+ * @return the double value, or {@code 0.0} when the field is absent
+ */
@Override
public Object parseSimpleGroup(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
@@ -48,6 +68,12 @@ public Object parseSimpleGroup(SimpleGroup simpleGroup) {
}
}
+ /**
+ * Converts a list of double values into a primitive {@code double[]}.
+ *
+ * @param field the list of double values, or {@code null}
+ * @return the values as a {@code double[]}, empty when {@code field} is {@code null}
+ */
@Override
public Object parseRepeatedObjectField(Object field) {
double[] inputValues = new double[0];
@@ -57,6 +83,12 @@ public Object parseRepeatedObjectField(Object field) {
return inputValues;
}
+ /**
+ * Reads the repeated double field from a Parquet {@code SimpleGroup} into a {@code double[]}.
+ *
+ * @param simpleGroup the Parquet group holding the encoded record
+ * @return the double array, empty when the field is absent
+ */
@Override
public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
@@ -71,11 +103,21 @@ public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) {
return new double[0];
}
+ /**
+ * Returns the Flink {@code TypeInformation} for a single double value.
+ *
+ * @return {@code Types.DOUBLE}
+ */
@Override
public TypeInformation getTypeInformation() {
return Types.DOUBLE;
}
+ /**
+ * Returns the Flink {@code TypeInformation} for a repeated double field.
+ *
+ * @return a primitive double-array type
+ */
@Override
public TypeInformation getArrayType() {
return Types.PRIMITIVE_ARRAY(Types.DOUBLE);
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/FloatHandler.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/FloatHandler.java
index 6083800d1..1f6b32f9f 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/FloatHandler.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/FloatHandler.java
@@ -14,6 +14,9 @@
* The type Float primitive type handler.
*/
public class FloatHandler implements PrimitiveHandler {
+ /**
+ * The protobuf {@code FieldDescriptor} of the float field this handler processes.
+ */
private Descriptors.FieldDescriptor fieldDescriptor;
/**
@@ -25,16 +28,33 @@ public FloatHandler(Descriptors.FieldDescriptor fieldDescriptor) {
this.fieldDescriptor = fieldDescriptor;
}
+ /**
+ * Determines whether this handler applies to the field.
+ *
+ * @return {@code true} if the field's Java type is {@code FLOAT}
+ */
@Override
public boolean canHandle() {
return fieldDescriptor.getJavaType() == JavaType.FLOAT;
}
+ /**
+ * Parses the given value into a Java {@code float}.
+ *
+ * @param field the value to parse, defaulting to {@code 0} when {@code null}
+ * @return the parsed float value
+ */
@Override
public Object parseObject(Object field) {
return Float.parseFloat(getValueOrDefault(field, "0"));
}
+ /**
+ * Reads the float value for this field from a Parquet {@code SimpleGroup}.
+ *
+ * @param simpleGroup the Parquet group holding the encoded record
+ * @return the float value, or {@code 0.0F} when the field is absent
+ */
@Override
public Object parseSimpleGroup(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
@@ -48,6 +68,12 @@ public Object parseSimpleGroup(SimpleGroup simpleGroup) {
}
}
+ /**
+ * Converts a list of float values into a primitive {@code float[]}.
+ *
+ * @param field the list of float values, or {@code null}
+ * @return the values as a {@code float[]}, empty when {@code field} is {@code null}
+ */
@Override
public Object parseRepeatedObjectField(Object field) {
@@ -58,6 +84,12 @@ public Object parseRepeatedObjectField(Object field) {
return inputValues;
}
+ /**
+ * Reads the repeated float field from a Parquet {@code SimpleGroup} into a {@code float[]}.
+ *
+ * @param simpleGroup the Parquet group holding the encoded record
+ * @return the float array, empty when the field is absent
+ */
@Override
public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
@@ -72,11 +104,21 @@ public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) {
return new float[0];
}
+ /**
+ * Returns the Flink {@code TypeInformation} for a single float value.
+ *
+ * @return {@code Types.FLOAT}
+ */
@Override
public TypeInformation getTypeInformation() {
return Types.FLOAT;
}
+ /**
+ * Returns the Flink {@code TypeInformation} for a repeated float field.
+ *
+ * @return a primitive float-array type
+ */
@Override
public TypeInformation getArrayType() {
return Types.PRIMITIVE_ARRAY(Types.FLOAT);
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/IntegerHandler.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/IntegerHandler.java
index c181776dc..8923ed4e0 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/IntegerHandler.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/IntegerHandler.java
@@ -14,6 +14,9 @@
* The type Integer primitive type handler.
*/
public class IntegerHandler implements PrimitiveHandler {
+ /**
+ * The protobuf {@code FieldDescriptor} of the integer field this handler processes.
+ */
private Descriptors.FieldDescriptor fieldDescriptor;
/**
@@ -25,16 +28,33 @@ public IntegerHandler(Descriptors.FieldDescriptor fieldDescriptor) {
this.fieldDescriptor = fieldDescriptor;
}
+ /**
+ * Determines whether this handler applies to the field.
+ *
+ * @return {@code true} if the field's Java type is {@code INT}
+ */
@Override
public boolean canHandle() {
return fieldDescriptor.getJavaType() == JavaType.INT;
}
+ /**
+ * Parses the given value into a Java {@code int}.
+ *
+ * @param field the value to parse, defaulting to {@code 0} when {@code null}
+ * @return the parsed integer value
+ */
@Override
public Object parseObject(Object field) {
return Integer.parseInt(getValueOrDefault(field, "0"));
}
+ /**
+ * Reads the integer value for this field from a Parquet {@code SimpleGroup}.
+ *
+ * @param simpleGroup the Parquet group holding the encoded record
+ * @return the integer value, or {@code 0} when the field is absent
+ */
@Override
public Object parseSimpleGroup(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
@@ -48,6 +68,12 @@ public Object parseSimpleGroup(SimpleGroup simpleGroup) {
}
}
+ /**
+ * Converts a list of integer values into a primitive {@code int[]}.
+ *
+ * @param field the list of integer values, or {@code null}
+ * @return the values as an {@code int[]}, empty when {@code field} is {@code null}
+ */
@Override
public Object parseRepeatedObjectField(Object field) {
int[] inputValues = new int[0];
@@ -57,6 +83,12 @@ public Object parseRepeatedObjectField(Object field) {
return inputValues;
}
+ /**
+ * Reads the repeated integer field from a Parquet {@code SimpleGroup} into an {@code int[]}.
+ *
+ * @param simpleGroup the Parquet group holding the encoded record
+ * @return the integer array, empty when the field is absent
+ */
@Override
public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
@@ -71,11 +103,21 @@ public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) {
return new int[0];
}
+ /**
+ * Returns the Flink {@code TypeInformation} for a single integer value.
+ *
+ * @return {@code Types.INT}
+ */
@Override
public TypeInformation getTypeInformation() {
return Types.INT;
}
+ /**
+ * Returns the Flink {@code TypeInformation} for a repeated integer field.
+ *
+ * @return a primitive int-array type
+ */
@Override
public TypeInformation getArrayType() {
return Types.PRIMITIVE_ARRAY(Types.INT);
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/LongHandler.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/LongHandler.java
index 9292f7cf0..c35271ee3 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/LongHandler.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/LongHandler.java
@@ -14,6 +14,9 @@
* The type Long primitive type handler.
*/
public class LongHandler implements PrimitiveHandler {
+ /**
+ * The protobuf {@code FieldDescriptor} of the long field this handler processes.
+ */
private Descriptors.FieldDescriptor fieldDescriptor;
/**
@@ -25,16 +28,33 @@ public LongHandler(Descriptors.FieldDescriptor fieldDescriptor) {
this.fieldDescriptor = fieldDescriptor;
}
+ /**
+ * Determines whether this handler applies to the field.
+ *
+ * @return {@code true} if the field's Java type is {@code LONG}
+ */
@Override
public boolean canHandle() {
return fieldDescriptor.getJavaType() == JavaType.LONG;
}
+ /**
+ * Parses the given value into a Java {@code long}.
+ *
+ * @param field the value to parse, defaulting to {@code 0} when {@code null}
+ * @return the parsed long value
+ */
@Override
public Object parseObject(Object field) {
return Long.parseLong(getValueOrDefault(field, "0"));
}
+ /**
+ * Reads the long value for this field from a Parquet {@code SimpleGroup}.
+ *
+ * @param simpleGroup the Parquet group holding the encoded record
+ * @return the long value, or {@code 0L} when the field is absent
+ */
@Override
public Object parseSimpleGroup(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
@@ -48,6 +68,12 @@ public Object parseSimpleGroup(SimpleGroup simpleGroup) {
}
}
+ /**
+ * Converts a list of long values into a {@code Long[]}.
+ *
+ * @param field the list of long values, or {@code null}
+ * @return the values as a {@code Long[]}, empty when {@code field} is {@code null}
+ */
@Override
public Object parseRepeatedObjectField(Object field) {
List inputValues = new ArrayList<>();
@@ -57,6 +83,12 @@ public Object parseRepeatedObjectField(Object field) {
return inputValues.toArray(new Long[]{});
}
+ /**
+ * Reads the repeated long field from a Parquet {@code SimpleGroup} into a {@code Long[]}.
+ *
+ * @param simpleGroup the Parquet group holding the encoded record
+ * @return the long array, empty when the field is absent
+ */
@Override
public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
@@ -70,11 +102,21 @@ public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) {
return longArrayList.toArray(new Long[]{});
}
+ /**
+ * Returns the Flink {@code TypeInformation} for a single long value.
+ *
+ * @return {@code Types.LONG}
+ */
@Override
public TypeInformation getTypeInformation() {
return Types.LONG;
}
+ /**
+ * Returns the Flink {@code TypeInformation} for a repeated long field.
+ *
+ * @return an object-array type of {@code Long}
+ */
@Override
public TypeInformation getArrayType() {
return Types.OBJECT_ARRAY(Types.LONG);
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/PrimitiveHandlerFactory.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/PrimitiveHandlerFactory.java
index bd4d1fb40..41741b6ab 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/PrimitiveHandlerFactory.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/PrimitiveHandlerFactory.java
@@ -26,6 +26,15 @@ public static PrimitiveHandler getTypeHandler(Descriptors.FieldDescriptor fieldD
return filteredTypeHandlers.orElseThrow(() -> new DataTypeNotSupportedException("Data type " + fieldDescriptor.getJavaType() + " not supported in primitive type handlers"));
}
+ /**
+ * Builds the ordered list of candidate primitive handlers for the given field.
+ *
+ * <p>{@code getTypeHandler} selects the first handler whose {@code canHandle()} returns
+ * {@code true}, so the order determines which handler claims each Java type.
+ *
+ * @param fieldDescriptor the field descriptor to build candidate handlers for
+ * @return the ordered list of candidate primitive handlers to try
+ */
private static List getSpecificHandlers(Descriptors.FieldDescriptor fieldDescriptor) {
return Arrays.asList(
new IntegerHandler(fieldDescriptor),
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/StringHandler.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/StringHandler.java
index 71258dbf6..c2172a022 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/StringHandler.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/primitive/StringHandler.java
@@ -15,6 +15,9 @@
* The type String primitive type handler.
*/
public class StringHandler implements PrimitiveHandler {
+ /**
+ * The protobuf {@code FieldDescriptor} of the string field this handler processes.
+ */
private Descriptors.FieldDescriptor fieldDescriptor;
/**
@@ -26,16 +29,33 @@ public StringHandler(Descriptors.FieldDescriptor fieldDescriptor) {
this.fieldDescriptor = fieldDescriptor;
}
+ /**
+ * Determines whether this handler applies to the field.
+ *
+ * @return {@code true} if the field's Java type is {@code STRING}
+ */
@Override
public boolean canHandle() {
return fieldDescriptor.getJavaType() == JavaType.STRING;
}
+ /**
+ * Converts the given value into its string form.
+ *
+ * @param field the value to convert, defaulting to an empty string when {@code null}
+ * @return the value's string representation
+ */
@Override
public Object parseObject(Object field) {
return getValueOrDefault(field, "");
}
+ /**
+ * Reads the string value for this field from a Parquet {@code SimpleGroup}.
+ *
+ * @param simpleGroup the Parquet group holding the encoded record
+ * @return the string value, or an empty string when the field is absent
+ */
@Override
public Object parseSimpleGroup(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
@@ -49,6 +69,12 @@ public Object parseSimpleGroup(SimpleGroup simpleGroup) {
}
}
+ /**
+ * Converts a list of string values into a {@code String[]}.
+ *
+ * @param field the list of string values, or {@code null}
+ * @return the values as a {@code String[]}, empty when {@code field} is {@code null}
+ */
@Override
public Object parseRepeatedObjectField(Object field) {
List inputValues = new ArrayList<>();
@@ -58,6 +84,12 @@ public Object parseRepeatedObjectField(Object field) {
return inputValues.toArray(new String[]{});
}
+ /**
+ * Reads the repeated string field from a Parquet {@code SimpleGroup} into a {@code String[]}.
+ *
+ * @param simpleGroup the Parquet group holding the encoded record
+ * @return the string array, empty when the field is absent
+ */
@Override
public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
@@ -72,11 +104,21 @@ public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) {
return new String[0];
}
+ /**
+ * Returns the Flink {@code TypeInformation} for a single string value.
+ *
+ * @return {@code Types.STRING}
+ */
@Override
public TypeInformation getTypeInformation() {
return Types.STRING;
}
+ /**
+ * Returns the Flink {@code TypeInformation} for a repeated string field.
+ *
+ * @return an object-array type of {@code String}
+ */
@Override
public TypeInformation getArrayType() {
return ObjectArrayTypeInfo.getInfoFor(Types.STRING);
diff --git a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/repeated/RepeatedEnumHandler.java b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/repeated/RepeatedEnumHandler.java
index a916d9b85..1ce7cfd0a 100644
--- a/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/repeated/RepeatedEnumHandler.java
+++ b/dagger-common/src/main/java/com/gotocompany/dagger/common/serde/typehandler/repeated/RepeatedEnumHandler.java
@@ -23,7 +23,13 @@
* The type Repeated enum proto handler.
*/
public class RepeatedEnumHandler implements TypeHandler {
+ /**
+ * The protobuf {@code FieldDescriptor} of the repeated enum field this handler processes.
+ */
private Descriptors.FieldDescriptor fieldDescriptor;
+ /**
+ * Shared Gson instance used to serialize the enum-name array to JSON.
+ */
private static final Gson GSON = new Gson();
/**
@@ -35,11 +41,27 @@ public RepeatedEnumHandler(Descriptors.FieldDescriptor fieldDescriptor) {
this.fieldDescriptor = fieldDescriptor;
}
+ /**
+ * Determines whether this handler applies to the field.
+ *
+ * @return {@code true} if the field is a repeated protobuf {@code enum}
+ */
@Override
public boolean canHandle() {
return fieldDescriptor.getJavaType() == ENUM && fieldDescriptor.isRepeated();
}
+ /**
+ * Sets the repeated enum field on the builder by resolving each value's name to a constant.
+ *
+ * <p>The input may be an array or a {@code List} of enum constant names. When the handler
+ * cannot apply or {@code field} is {@code null}, the builder is returned unchanged.
+ *
+ * @param builder the dynamic message builder being populated
+ * @param field the collection of enum constant names to set, or {@code null} to skip
+ * @return the same {@code builder}, with the repeated enum field set when provided
+ * @throws EnumFieldNotFoundException if any name does not match a constant of the enum
+ */
@Override
public DynamicMessage.Builder transformToProtoBuilder(DynamicMessage.Builder builder, Object field) {
if (!canHandle() || field == null) {
@@ -55,6 +77,13 @@ public DynamicMessage.Builder transformToProtoBuilder(DynamicMessage.Builder bui
return builder;
}
+ /**
+ * Resolves a single value to its protobuf enum value descriptor by name.
+ *
+ * @param field the enum constant name to resolve
+ * @return the matching enum value descriptor
+ * @throws EnumFieldNotFoundException if the name does not match any constant of the enum
+ */
private Descriptors.EnumValueDescriptor getEnumValue(Object field) {
String stringValue = String.valueOf(field).trim();
Descriptors.EnumValueDescriptor valueByName = fieldDescriptor.getEnumType().findValueByName(stringValue);
@@ -64,21 +93,48 @@ private Descriptors.EnumValueDescriptor getEnumValue(Object field) {
return valueByName;
}
+ /**
+ * Converts a post-processor value into an array of enum constant names.
+ *
+ * @param field the collection of enum values emitted by an upstream post processor
+ * @return a {@code String[]} of enum constant names
+ */
@Override
public Object transformFromPostProcessor(Object field) {
return getValue(field);
}
+ /**
+ * Converts the repeated enum values read from a protobuf message into an array of names.
+ *
+ * @param field the repeated enum value read from the message
+ * @return a {@code String[]} of enum constant names
+ */
@Override
public Object transformFromProto(Object field) {
return getValue(field);
}
+ /**
+ * Converts the repeated protobuf enum values into an array of names, ignoring the cache.
+ *
+ * @param field the repeated enum value read from the message
+ * @param cache the field descriptor cache, unused for enum fields
+ * @return a {@code String[]} of enum constant names
+ */
@Override
public Object transformFromProtoUsingCache(Object field, FieldDescriptorCache cache) {
return getValue(field);
}
+ /**
+ * Reads the repeated enum field from a Parquet {@code SimpleGroup} into an array of names.
+ *
+ * <p>Unknown values fall back to the enum's zero-numbered default constant.
+ *
+ * @param simpleGroup the Parquet group holding the encoded record
+ * @return a {@code String[]} of enum constant names, empty when the field is absent
+ */
@Override
public Object transformFromParquet(SimpleGroup simpleGroup) {
String defaultEnumValue = fieldDescriptor.getEnumType().findValueByNumber(0).getName();
@@ -96,16 +152,33 @@ public Object transformFromParquet(SimpleGroup simpleGroup) {
return enumArrayList.toArray(new String[]{});
}
+ /**
+ * Serializes the repeated enum values to a JSON array of constant names.
+ *
+ * @param field the repeated enum value to serialize
+ * @return the JSON string for the array of enum names
+ */
@Override
public Object transformToJson(Object field) {
return GSON.toJson(getValue(field));
}
+ /**
+ * Returns the Flink {@code TypeInformation} used to represent this repeated enum field.
+ *
+ * @return an object-array type of {@code String}
+ */
@Override
public TypeInformation getTypeInformation() {
return ObjectArrayTypeInfo.getInfoFor(Types.STRING);
}
+ /**
+ * Converts a collection of enum values into an array of their string names.
+ *
+ * @param field the collection of enum values, or {@code null}
+ * @return a {@code String[]} of names, empty when {@code field} is {@code null}
+ */
private Object getValue(Object field) {
List values = new ArrayList<>();
if (field != null) {
@@ -114,6 +187,12 @@ private Object getValue(Object field) {
return values.toArray(new String[]{});
}
+ /**
+ * Maps each element of the given list to its string representation.
+ *
+ * @param protos the list of enum values to stringify
+ * @return the list of string names
+ */
private List getStringRow(List