From 4867fe591d40a270bba79d3949f436e781dfee63 Mon Sep 17 00:00:00 2001 From: rajuGT Date: Wed, 10 Jun 2026 15:22:41 +0700 Subject: [PATCH 01/10] feat(sink): add SINK_CSV_* config constants for CSV sink Introduce configuration keys and defaults for the upcoming CSV sink: base path, write mode (APPEND/OVERWRITE, default OVERWRITE), date format, delimiter, header toggle, and filename prefix. Co-authored-by: Cursor --- .../gotocompany/dagger/core/utils/Constants.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java index 2e658730d..41d489083 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java @@ -202,6 +202,22 @@ public enum ExternalPostProcessorVariableType { REQUEST_VARIABLES, HEADER_VARIAB public static final String SINK_ERROR_TYPES_FOR_FAILURE = "SINK_ERROR_TYPES_FOR_FAILURE"; public static final String SINK_ERROR_TYPES_FOR_FAILURE_DEFAULT = ""; + // CSV sink. Output path = SINK_CSV_BASE_PATH/<job_id>/<filename_prefix>-<date>.csv + public static final String SINK_CSV_BASE_PATH_KEY = "SINK_CSV_BASE_PATH"; + public static final String SINK_CSV_WRITE_MODE_KEY = "SINK_CSV_WRITE_MODE"; + public static final String SINK_CSV_WRITE_MODE_APPEND = "APPEND"; + public static final String SINK_CSV_WRITE_MODE_OVERWRITE = "OVERWRITE"; + public static final String SINK_CSV_WRITE_MODE_DEFAULT = SINK_CSV_WRITE_MODE_OVERWRITE; + // Java DateTimeFormatter pattern (dd day-of-month, MMM month abbrev, yyyy year). Rendered with Locale.ENGLISH. 
+ public static final String SINK_CSV_DATE_FORMAT_KEY = "SINK_CSV_DATE_FORMAT"; + public static final String SINK_CSV_DATE_FORMAT_DEFAULT = "dd-MMM-yyyy"; + public static final String SINK_CSV_DELIMITER_KEY = "SINK_CSV_DELIMITER"; + public static final String SINK_CSV_DELIMITER_DEFAULT = ","; + public static final String SINK_CSV_WRITE_HEADER_KEY = "SINK_CSV_WRITE_HEADER"; + public static final boolean SINK_CSV_WRITE_HEADER_DEFAULT = true; + public static final String SINK_CSV_FILENAME_PREFIX_KEY = "SINK_CSV_FILENAME_PREFIX"; + public static final String SINK_CSV_FILENAME_PREFIX_DEFAULT = "output"; + public static final String[] SUPPORTED_SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL = {"SASL_PLAINTEXT", "SASL_SSL", "SSL"}; public static final String[] SUPPORTED_SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM = {"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"}; From 0842387db9b3c886bfaf5b4894620ef70ad61cb5 Mon Sep 17 00:00:00 2001 From: rajuGT Date: Wed, 10 Jun 2026 15:43:04 +0700 Subject: [PATCH 02/10] feat(sink): add Flink FileSystem-backed storage client for CSV sink Add FileStorageClient abstraction (read/exists/write with whole-object semantics) and a single FlinkFileSystemStorageClient implementation backed by Flink's FileSystem, so the path scheme (file/gs/oss/cosn/s3) selects the backend without any cloud-provider SDKs. Includes unit tests. 
Co-authored-by: Cursor --- .../sink/csv/storage/FileStorageClient.java | 29 ++++++++ .../storage/FlinkFileSystemStorageClient.java | 54 ++++++++++++++ .../FlinkFileSystemStorageClientTest.java | 72 +++++++++++++++++++ 3 files changed, 155 insertions(+) create mode 100644 dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/storage/FileStorageClient.java create mode 100644 dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/storage/FlinkFileSystemStorageClient.java create mode 100644 dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/storage/FlinkFileSystemStorageClientTest.java diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/storage/FileStorageClient.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/storage/FileStorageClient.java new file mode 100644 index 000000000..24cb9931f --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/storage/FileStorageClient.java @@ -0,0 +1,29 @@ +package com.gotocompany.dagger.core.sink.csv.storage; + +import java.io.IOException; +import java.io.Serializable; + +/** + * Abstraction over the destination file store used by the CSV sink. Implementations are expected to + * support whole-object read and overwrite semantics (object stores like GCS/OSS/COS/S3 do not allow + * true append, so the sink relies on read-modify-write). + */ +public interface FileStorageClient extends Serializable { + + /** + * @return true if an object already exists at the given path. + */ + boolean exists(String path) throws IOException; + + /** + * Reads the full content of the object at the given path. + * + * @return the object bytes, or an empty array if the object does not exist. + */ + byte[] read(String path) throws IOException; + + /** + * Writes (creating or fully replacing) the object at the given path with the given content. 
+ */ + void write(String path, byte[] content) throws IOException; +} diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/storage/FlinkFileSystemStorageClient.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/storage/FlinkFileSystemStorageClient.java new file mode 100644 index 000000000..99685c3aa --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/storage/FlinkFileSystemStorageClient.java @@ -0,0 +1,54 @@ +package com.gotocompany.dagger.core.sink.csv.storage; + +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +/** + * {@link FileStorageClient} backed by Flink's {@link FileSystem} abstraction. The scheme of the + * supplied path (e.g. {@code file://}, {@code gs://}, {@code oss://}, {@code cosn://}, {@code s3://}) + * selects the underlying filesystem, so credentials and protocol handling are delegated to the + * filesystem Flink already configures. No cloud-provider SDKs are required. 
+ */ +public class FlinkFileSystemStorageClient implements FileStorageClient { + + private static final long serialVersionUID = 1L; + private static final int BUFFER_SIZE = 8192; + + @Override + public boolean exists(String path) throws IOException { + Path filePath = new Path(path); + return filePath.getFileSystem().exists(filePath); + } + + @Override + public byte[] read(String path) throws IOException { + Path filePath = new Path(path); + FileSystem fileSystem = filePath.getFileSystem(); + if (!fileSystem.exists(filePath)) { + return new byte[0]; + } + try (FSDataInputStream inputStream = fileSystem.open(filePath); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + byte[] buffer = new byte[BUFFER_SIZE]; + int bytesRead; + while ((bytesRead = inputStream.read(buffer)) != -1) { + outputStream.write(buffer, 0, bytesRead); + } + return outputStream.toByteArray(); + } + } + + @Override + public void write(String path, byte[] content) throws IOException { + Path filePath = new Path(path); + FileSystem fileSystem = filePath.getFileSystem(); + try (FSDataOutputStream outputStream = fileSystem.create(filePath, FileSystem.WriteMode.OVERWRITE)) { + outputStream.write(content); + } + } +} diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/storage/FlinkFileSystemStorageClientTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/storage/FlinkFileSystemStorageClientTest.java new file mode 100644 index 000000000..4441a115e --- /dev/null +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/storage/FlinkFileSystemStorageClientTest.java @@ -0,0 +1,72 @@ +package com.gotocompany.dagger.core.sink.csv.storage; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.nio.charset.StandardCharsets; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static 
org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class FlinkFileSystemStorageClientTest { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private FlinkFileSystemStorageClient storageClient; + + @Before + public void setup() { + storageClient = new FlinkFileSystemStorageClient(); + } + + private String pathFor(String fileName) { + return new File(temporaryFolder.getRoot(), fileName).getAbsolutePath(); + } + + @Test + public void shouldReturnFalseWhenFileDoesNotExist() throws Exception { + assertFalse(storageClient.exists(pathFor("missing.csv"))); + } + + @Test + public void shouldReturnEmptyBytesWhenReadingMissingFile() throws Exception { + assertEquals(0, storageClient.read(pathFor("missing.csv")).length); + } + + @Test + public void shouldWriteAndReadBackContent() throws Exception { + String path = pathFor("output.csv"); + byte[] content = "id,name\n1,foo\n".getBytes(StandardCharsets.UTF_8); + + storageClient.write(path, content); + + assertTrue(storageClient.exists(path)); + assertArrayEquals(content, storageClient.read(path)); + } + + @Test + public void shouldOverwriteExistingContent() throws Exception { + String path = pathFor("output.csv"); + + storageClient.write(path, "first".getBytes(StandardCharsets.UTF_8)); + storageClient.write(path, "second".getBytes(StandardCharsets.UTF_8)); + + assertEquals("second", new String(storageClient.read(path), StandardCharsets.UTF_8)); + } + + @Test + public void shouldCreateParentDirectoriesWhenWriting() throws Exception { + String path = pathFor("my-job-id/nested/output.csv"); + byte[] content = "a,b\n1,2\n".getBytes(StandardCharsets.UTF_8); + + storageClient.write(path, content); + + assertArrayEquals(content, storageClient.read(path)); + } +} From 1ed0786329197743972cef26066d8b41e5c71d4c Mon Sep 17 00:00:00 2001 From: rajuGT Date: Wed, 10 Jun 2026 18:48:39 +0700 Subject: [PATCH 03/10] feat(sink): add APPEND/OVERWRITE write strategies for CSV sink Add 
FileWriteStrategy with AppendWriteStrategy (read-modify-write, time-series) and OverwriteWriteStrategy (full-replace snapshot, no-op on empty buffer), plus a factory keyed on SINK_CSV_WRITE_MODE. Both no-op when the buffer is empty. Includes tests and an in-memory storage test helper. Co-authored-by: Cursor --- .gitignore | 4 +- .../csv/writemode/AppendWriteStrategy.java | 40 +++++++++++++ .../sink/csv/writemode/FileWriteStrategy.java | 24 ++++++++ .../writemode/FileWriteStrategyFactory.java | 26 ++++++++ .../csv/writemode/OverwriteWriteStrategy.java | 33 +++++++++++ .../sink/csv/InMemoryFileStorageClient.java | 41 +++++++++++++ .../writemode/AppendWriteStrategyTest.java | 57 ++++++++++++++++++ .../FileWriteStrategyFactoryTest.java | 29 +++++++++ .../writemode/OverwriteWriteStrategyTest.java | 59 +++++++++++++++++++ 9 files changed, 312 insertions(+), 1 deletion(-) create mode 100644 dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/writemode/AppendWriteStrategy.java create mode 100644 dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/writemode/FileWriteStrategy.java create mode 100644 dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/writemode/FileWriteStrategyFactory.java create mode 100644 dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/writemode/OverwriteWriteStrategy.java create mode 100644 dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/InMemoryFileStorageClient.java create mode 100644 dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/writemode/AppendWriteStrategyTest.java create mode 100644 dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/writemode/FileWriteStrategyFactoryTest.java create mode 100644 dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/writemode/OverwriteWriteStrategyTest.java diff --git a/.gitignore b/.gitignore index 50e2532d8..4921785c2 100644 --- a/.gitignore +++ b/.gitignore @@ -13,4 +13,6 @@ bin .settings .gradletasknamecache 
.DS_Store -dagger-common/src/generated-sources/ \ No newline at end of file +dagger-common/src/generated-sources/ +plans +.cursor \ No newline at end of file diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/writemode/AppendWriteStrategy.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/writemode/AppendWriteStrategy.java new file mode 100644 index 000000000..587fdbb51 --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/writemode/AppendWriteStrategy.java @@ -0,0 +1,40 @@ +package com.gotocompany.dagger.core.sink.csv.writemode; + +import com.gotocompany.dagger.core.sink.csv.storage.FileStorageClient; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; + +/** + * Appends the buffered lines to the existing daily file (read-modify-write, since object stores do + * not support true append). The header is written only when the file is new/empty. Produces a + * growing time-series file. At-least-once: on restart, replayed rows may be appended again. 
+ */ +public class AppendWriteStrategy implements FileWriteStrategy { + + private static final long serialVersionUID = 1L; + private static final String LINE_SEPARATOR = "\n"; + + @Override + public void flush(FileStorageClient storageClient, String path, String headerLine, List bufferedLines) throws IOException { + if (bufferedLines.isEmpty()) { + return; + } + byte[] existingContent = storageClient.read(path); + boolean fileIsEmpty = existingContent == null || existingContent.length == 0; + + StringBuilder content = new StringBuilder(); + if (fileIsEmpty) { + if (headerLine != null) { + content.append(headerLine).append(LINE_SEPARATOR); + } + } else { + content.append(new String(existingContent, StandardCharsets.UTF_8)); + } + for (String line : bufferedLines) { + content.append(line).append(LINE_SEPARATOR); + } + storageClient.write(path, content.toString().getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/writemode/FileWriteStrategy.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/writemode/FileWriteStrategy.java new file mode 100644 index 000000000..f91e7fa96 --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/writemode/FileWriteStrategy.java @@ -0,0 +1,24 @@ +package com.gotocompany.dagger.core.sink.csv.writemode; + +import com.gotocompany.dagger.core.sink.csv.storage.FileStorageClient; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +/** + * Strategy for turning a buffer of formatted CSV lines into a write against the destination file. + * Implementations decide whether the new lines are appended to the existing daily file or fully + * replace it. All implementations must be a no-op when the buffer is empty (so empty checkpoints do + * not blank or needlessly rewrite the file). + */ +public interface FileWriteStrategy extends Serializable { + + /** + * @param storageClient the destination store. 
+ * @param path the full target object path for the current day. + * @param headerLine the CSV header line, or null when headers are disabled. + * @param bufferedLines the formatted CSV rows accumulated since the last flush. + */ + void flush(FileStorageClient storageClient, String path, String headerLine, List bufferedLines) throws IOException; +} diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/writemode/FileWriteStrategyFactory.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/writemode/FileWriteStrategyFactory.java new file mode 100644 index 000000000..a4912c08c --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/writemode/FileWriteStrategyFactory.java @@ -0,0 +1,26 @@ +package com.gotocompany.dagger.core.sink.csv.writemode; + +import com.gotocompany.dagger.core.utils.Constants; + +/** + * Resolves the configured {@code SINK_CSV_WRITE_MODE} into a {@link FileWriteStrategy}. Extensible: + * a future UPSERT (merge-by-key) mode can be added here without touching the writer. + */ +public class FileWriteStrategyFactory { + + private FileWriteStrategyFactory() { + } + + public static FileWriteStrategy getWriteStrategy(String writeMode) { + String normalizedWriteMode = writeMode == null ? "" : writeMode.trim().toUpperCase(); + switch (normalizedWriteMode) { + case Constants.SINK_CSV_WRITE_MODE_APPEND: + return new AppendWriteStrategy(); + case Constants.SINK_CSV_WRITE_MODE_OVERWRITE: + return new OverwriteWriteStrategy(); + default: + throw new IllegalArgumentException("Unsupported CSV sink write mode: '" + writeMode + + "'. 
Supported values: " + Constants.SINK_CSV_WRITE_MODE_APPEND + ", " + Constants.SINK_CSV_WRITE_MODE_OVERWRITE); + } + } +} diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/writemode/OverwriteWriteStrategy.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/writemode/OverwriteWriteStrategy.java new file mode 100644 index 000000000..1f5a183bd --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/writemode/OverwriteWriteStrategy.java @@ -0,0 +1,33 @@ +package com.gotocompany.dagger.core.sink.csv.writemode; + +import com.gotocompany.dagger.core.sink.csv.storage.FileStorageClient; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; + +/** + * Fully replaces the daily file with the buffered lines on each flush, producing a snapshot of the + * most recent window. When the buffer is empty the file is left untouched (no blanking between + * windows). Restart-safe by construction: a replayed window simply overwrites with the same content. 
+ */ +public class OverwriteWriteStrategy implements FileWriteStrategy { + + private static final long serialVersionUID = 1L; + private static final String LINE_SEPARATOR = "\n"; + + @Override + public void flush(FileStorageClient storageClient, String path, String headerLine, List bufferedLines) throws IOException { + if (bufferedLines.isEmpty()) { + return; + } + StringBuilder content = new StringBuilder(); + if (headerLine != null) { + content.append(headerLine).append(LINE_SEPARATOR); + } + for (String line : bufferedLines) { + content.append(line).append(LINE_SEPARATOR); + } + storageClient.write(path, content.toString().getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/InMemoryFileStorageClient.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/InMemoryFileStorageClient.java new file mode 100644 index 000000000..67a86dd57 --- /dev/null +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/InMemoryFileStorageClient.java @@ -0,0 +1,41 @@ +package com.gotocompany.dagger.core.sink.csv; + +import com.gotocompany.dagger.core.sink.csv.storage.FileStorageClient; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +/** + * In-memory {@link FileStorageClient} for tests. Records content per path and counts writes so tests + * can assert no-op behaviour. 
+ */ +public class InMemoryFileStorageClient implements FileStorageClient { + + private final Map files = new HashMap<>(); + private int writeCount = 0; + + @Override + public boolean exists(String path) { + return files.containsKey(path); + } + + @Override + public byte[] read(String path) { + return files.getOrDefault(path, new byte[0]); + } + + @Override + public void write(String path, byte[] content) { + files.put(path, content); + writeCount++; + } + + public String readAsString(String path) { + return new String(files.getOrDefault(path, new byte[0]), StandardCharsets.UTF_8); + } + + public int getWriteCount() { + return writeCount; + } +} diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/writemode/AppendWriteStrategyTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/writemode/AppendWriteStrategyTest.java new file mode 100644 index 000000000..cf0e5d970 --- /dev/null +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/writemode/AppendWriteStrategyTest.java @@ -0,0 +1,57 @@ +package com.gotocompany.dagger.core.sink.csv.writemode; + +import com.gotocompany.dagger.core.sink.csv.InMemoryFileStorageClient; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class AppendWriteStrategyTest { + + private static final String PATH = "/tmp/job/output-09-Jun-2026.csv"; + private static final String HEADER = "service_type,booking_count"; + + private AppendWriteStrategy strategy; + private InMemoryFileStorageClient storageClient; + + @Before + public void setup() { + strategy = new AppendWriteStrategy(); + storageClient = new InMemoryFileStorageClient(); + } + + @Test + public void shouldNotWriteWhenBufferIsEmpty() throws Exception { + strategy.flush(storageClient, PATH, HEADER, Collections.emptyList()); + + assertEquals(0, 
storageClient.getWriteCount()); + assertFalse(storageClient.exists(PATH)); + } + + @Test + public void shouldWriteHeaderAndRowsWhenFileIsNew() throws Exception { + strategy.flush(storageClient, PATH, HEADER, Arrays.asList("GO_RIDE,120", "GO_CAR,85")); + + assertEquals("service_type,booking_count\nGO_RIDE,120\nGO_CAR,85\n", storageClient.readAsString(PATH)); + } + + @Test + public void shouldAppendRowsWithoutRepeatingHeaderWhenFileExists() throws Exception { + strategy.flush(storageClient, PATH, HEADER, Collections.singletonList("GO_RIDE,120")); + strategy.flush(storageClient, PATH, HEADER, Collections.singletonList("GO_CAR,85")); + + assertEquals("service_type,booking_count\nGO_RIDE,120\nGO_CAR,85\n", storageClient.readAsString(PATH)); + assertEquals(2, storageClient.getWriteCount()); + } + + @Test + public void shouldWriteOnlyRowsWhenHeaderIsDisabled() throws Exception { + strategy.flush(storageClient, PATH, null, Arrays.asList("GO_RIDE,120", "GO_CAR,85")); + + assertEquals("GO_RIDE,120\nGO_CAR,85\n", storageClient.readAsString(PATH)); + } +} diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/writemode/FileWriteStrategyFactoryTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/writemode/FileWriteStrategyFactoryTest.java new file mode 100644 index 000000000..1b0da64e3 --- /dev/null +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/writemode/FileWriteStrategyFactoryTest.java @@ -0,0 +1,29 @@ +package com.gotocompany.dagger.core.sink.csv.writemode; + +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +public class FileWriteStrategyFactoryTest { + + @Test + public void shouldReturnAppendStrategy() { + assertTrue(FileWriteStrategyFactory.getWriteStrategy("APPEND") instanceof AppendWriteStrategy); + } + + @Test + public void shouldReturnOverwriteStrategy() { + assertTrue(FileWriteStrategyFactory.getWriteStrategy("OVERWRITE") instanceof OverwriteWriteStrategy); + } + + @Test + public 
void shouldBeCaseInsensitiveAndTrimmed() { + assertTrue(FileWriteStrategyFactory.getWriteStrategy(" overwrite ") instanceof OverwriteWriteStrategy); + assertTrue(FileWriteStrategyFactory.getWriteStrategy("append") instanceof AppendWriteStrategy); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowForUnsupportedWriteMode() { + FileWriteStrategyFactory.getWriteStrategy("UPSERT"); + } +} diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/writemode/OverwriteWriteStrategyTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/writemode/OverwriteWriteStrategyTest.java new file mode 100644 index 000000000..b24e5481a --- /dev/null +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/writemode/OverwriteWriteStrategyTest.java @@ -0,0 +1,59 @@ +package com.gotocompany.dagger.core.sink.csv.writemode; + +import com.gotocompany.dagger.core.sink.csv.InMemoryFileStorageClient; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class OverwriteWriteStrategyTest { + + private static final String PATH = "/tmp/job/output-09-Jun-2026.csv"; + private static final String HEADER = "service_type,booking_count"; + + private OverwriteWriteStrategy strategy; + private InMemoryFileStorageClient storageClient; + + @Before + public void setup() { + strategy = new OverwriteWriteStrategy(); + storageClient = new InMemoryFileStorageClient(); + } + + @Test + public void shouldNotWriteWhenBufferIsEmpty() throws Exception { + strategy.flush(storageClient, PATH, HEADER, Collections.emptyList()); + + assertEquals(0, storageClient.getWriteCount()); + assertFalse(storageClient.exists(PATH)); + } + + @Test + public void shouldNotBlankExistingFileOnEmptyBuffer() throws Exception { + strategy.flush(storageClient, PATH, HEADER, 
Collections.singletonList("GO_RIDE,120")); + strategy.flush(storageClient, PATH, HEADER, Collections.emptyList()); + + assertEquals("service_type,booking_count\nGO_RIDE,120\n", storageClient.readAsString(PATH)); + assertEquals(1, storageClient.getWriteCount()); + } + + @Test + public void shouldReplaceFileContentWithLatestBuffer() throws Exception { + strategy.flush(storageClient, PATH, HEADER, Arrays.asList("GO_RIDE,120", "GO_CAR,85")); + strategy.flush(storageClient, PATH, HEADER, Arrays.asList("GO_RIDE,95", "GO_CAR,70")); + + assertEquals("service_type,booking_count\nGO_RIDE,95\nGO_CAR,70\n", storageClient.readAsString(PATH)); + assertEquals(2, storageClient.getWriteCount()); + } + + @Test + public void shouldWriteOnlyRowsWhenHeaderIsDisabled() throws Exception { + strategy.flush(storageClient, PATH, null, Collections.singletonList("GO_RIDE,120")); + + assertEquals("GO_RIDE,120\n", storageClient.readAsString(PATH)); + } +} From b10002919e44a74cecbca4657ec29ec2dadb98bf Mon Sep 17 00:00:00 2001 From: rajuGT Date: Wed, 10 Jun 2026 20:24:15 +0700 Subject: [PATCH 04/10] feat(sink): add CsvSink and CsvSinkWriter for daily-rolling CSV output Add the Flink Sink V1 CsvSink and its CsvSinkWriter, which buffers Row outputs and flushes them on every checkpoint (snapshotState) and on close. Output rolls over daily to basePath/<jobId>/<filenamePrefix>-<date>.csv. Values are RFC-4180 escaped, nulls become empty cells, and Map/Collection/array values are JSON-encoded into a single cell. A Clock is injectable for deterministic date tests. Includes CsvSinkWriterTest. 
Co-authored-by: Cursor --- .../dagger/core/sink/csv/CsvSink.java | 63 ++++++++ .../dagger/core/sink/csv/CsvSinkConfig.java | 52 +++++++ .../dagger/core/sink/csv/CsvSinkWriter.java | 143 ++++++++++++++++++ .../core/sink/csv/CsvSinkWriterTest.java | 121 +++++++++++++++ 4 files changed, 379 insertions(+) create mode 100644 dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSink.java create mode 100644 dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkConfig.java create mode 100644 dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriter.java create mode 100644 dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriterTest.java diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSink.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSink.java new file mode 100644 index 000000000..451261574 --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSink.java @@ -0,0 +1,63 @@ +package com.gotocompany.dagger.core.sink.csv; + +import com.gotocompany.dagger.core.sink.csv.storage.FileStorageClient; +import com.gotocompany.dagger.core.sink.csv.writemode.FileWriteStrategy; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.types.Row; + +import java.util.List; +import java.util.Optional; + +/** + * Sink that writes the output {@link Row}s as a daily-rolling CSV file to any Flink filesystem. + * Buffered rows are flushed on every checkpoint (via the writer's snapshotState). At-least-once; + * the sink must run with parallelism 1 since a single daily object cannot be written concurrently. 
+ */ +public class CsvSink implements Sink { + + private final String[] columnNames; + private final CsvSinkConfig config; + private final FileStorageClient storageClient; + private final FileWriteStrategy writeStrategy; + + public CsvSink(String[] columnNames, CsvSinkConfig config, FileStorageClient storageClient, FileWriteStrategy writeStrategy) { + this.columnNames = columnNames; + this.config = config; + this.storageClient = storageClient; + this.writeStrategy = writeStrategy; + } + + @Override + public SinkWriter createWriter(InitContext context, List states) { + return new CsvSinkWriter(columnNames, config, storageClient, writeStrategy); + } + + @Override + public Optional> getWriterStateSerializer() { + return Optional.empty(); + } + + @Override + public Optional> createCommitter() { + return Optional.empty(); + } + + @Override + public Optional> createGlobalCommitter() { + return Optional.empty(); + } + + @Override + public Optional> getCommittableSerializer() { + return Optional.empty(); + } + + @Override + public Optional> getGlobalCommittableSerializer() { + return Optional.empty(); + } +} diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkConfig.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkConfig.java new file mode 100644 index 000000000..f16b0ef2f --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkConfig.java @@ -0,0 +1,52 @@ +package com.gotocompany.dagger.core.sink.csv; + +import java.io.Serializable; + +/** + * Immutable, serializable configuration for the CSV sink. The full output path for a given day is + * {@code basePath/<jobId>/<filenamePrefix>-<date>.csv}. 
+ */ +public class CsvSinkConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String basePath; + private final String jobId; + private final String filenamePrefix; + private final String dateFormat; + private final String delimiter; + private final boolean writeHeader; + + public CsvSinkConfig(String basePath, String jobId, String filenamePrefix, String dateFormat, String delimiter, boolean writeHeader) { + this.basePath = basePath; + this.jobId = jobId; + this.filenamePrefix = filenamePrefix; + this.dateFormat = dateFormat; + this.delimiter = delimiter; + this.writeHeader = writeHeader; + } + + public String getBasePath() { + return basePath; + } + + public String getJobId() { + return jobId; + } + + public String getFilenamePrefix() { + return filenamePrefix; + } + + public String getDateFormat() { + return dateFormat; + } + + public String getDelimiter() { + return delimiter; + } + + public boolean isWriteHeader() { + return writeHeader; + } +} diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriter.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriter.java new file mode 100644 index 000000000..43fe45341 --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriter.java @@ -0,0 +1,143 @@ +package com.gotocompany.dagger.core.sink.csv; + +import com.gotocompany.dagger.core.sink.csv.storage.FileStorageClient; +import com.gotocompany.dagger.core.sink.csv.writemode.FileWriteStrategy; +import com.google.gson.Gson; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.types.Row; + +import java.io.IOException; +import java.time.Clock; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +/** + * Buffers formatted 
CSV rows and flushes them on every checkpoint (snapshotState) and on close. + * The destination path rolls over daily: {@code basePath//-.csv}. Whether the + * flush appends or fully replaces the file is decided by the configured {@link FileWriteStrategy}. + */ +public class CsvSinkWriter implements SinkWriter { + + private static final Gson GSON = new Gson(); + private static final String PATH_SEPARATOR = "/"; + private static final String FILE_EXTENSION = ".csv"; + + private final String[] columnNames; + private final CsvSinkConfig config; + private final FileStorageClient storageClient; + private final FileWriteStrategy writeStrategy; + private final DateTimeFormatter dateTimeFormatter; + private final String sanitizedJobId; + private final String basePath; + private final Clock clock; + private final List bufferedLines = new ArrayList<>(); + + public CsvSinkWriter(String[] columnNames, CsvSinkConfig config, FileStorageClient storageClient, FileWriteStrategy writeStrategy) { + this(columnNames, config, storageClient, writeStrategy, Clock.systemDefaultZone()); + } + + CsvSinkWriter(String[] columnNames, CsvSinkConfig config, FileStorageClient storageClient, FileWriteStrategy writeStrategy, Clock clock) { + this.columnNames = columnNames; + this.config = config; + this.storageClient = storageClient; + this.writeStrategy = writeStrategy; + this.clock = clock; + this.dateTimeFormatter = DateTimeFormatter.ofPattern(config.getDateFormat(), Locale.ENGLISH); + this.sanitizedJobId = sanitize(config.getJobId()); + this.basePath = stripTrailingSeparator(config.getBasePath()); + } + + @Override + public void write(Row row, Context context) { + bufferedLines.add(formatRow(row)); + } + + @Override + public List snapshotState(long checkpointId) throws IOException { + flush(); + return Collections.emptyList(); + } + + @Override + public List prepareCommit(boolean flush) { + return Collections.emptyList(); + } + + @Override + public void close() throws Exception { + flush(); + } 
+ + private void flush() throws IOException { + writeStrategy.flush(storageClient, buildPath(), buildHeaderLine(), bufferedLines); + bufferedLines.clear(); + } + + private String buildPath() { + String date = LocalDate.now(clock).format(dateTimeFormatter); + return basePath + PATH_SEPARATOR + sanitizedJobId + PATH_SEPARATOR + + config.getFilenamePrefix() + "-" + date + FILE_EXTENSION; + } + + private String buildHeaderLine() { + if (!config.isWriteHeader()) { + return null; + } + StringBuilder header = new StringBuilder(); + for (int i = 0; i < columnNames.length; i++) { + if (i > 0) { + header.append(config.getDelimiter()); + } + header.append(escape(columnNames[i])); + } + return header.toString(); + } + + private String formatRow(Row row) { + StringBuilder line = new StringBuilder(); + for (int i = 0; i < columnNames.length; i++) { + if (i > 0) { + line.append(config.getDelimiter()); + } + line.append(escape(formatValue(row.getField(i)))); + } + return line.toString(); + } + + private String formatValue(Object value) { + if (value == null) { + return ""; + } + if (value instanceof Map || value instanceof Collection || value.getClass().isArray()) { + return GSON.toJson(value); + } + return String.valueOf(value); + } + + private String escape(String field) { + String delimiter = config.getDelimiter(); + boolean needsQuoting = field.contains(delimiter) || field.contains("\"") + || field.contains("\n") || field.contains("\r"); + if (!needsQuoting) { + return field; + } + return "\"" + field.replace("\"", "\"\"") + "\""; + } + + private static String sanitize(String jobId) { + return jobId.trim().replaceAll("\\s+", "_"); + } + + private static String stripTrailingSeparator(String path) { + if (path.endsWith(PATH_SEPARATOR)) { + return path.substring(0, path.length() - 1); + } + return path; + } +} diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriterTest.java 
b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriterTest.java new file mode 100644 index 000000000..279e0d203 --- /dev/null +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriterTest.java @@ -0,0 +1,121 @@ +package com.gotocompany.dagger.core.sink.csv; + +import com.gotocompany.dagger.core.sink.csv.writemode.OverwriteWriteStrategy; +import org.apache.flink.types.Row; +import org.junit.Before; +import org.junit.Test; + +import java.time.Clock; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class CsvSinkWriterTest { + + private static final String BASE_PATH = "file:///tmp/out"; + private static final Clock DAY_ONE = Clock.fixed(Instant.parse("2026-06-09T10:00:00Z"), ZoneOffset.UTC); + private static final Clock DAY_TWO = Clock.fixed(Instant.parse("2026-06-10T10:00:00Z"), ZoneOffset.UTC); + + private InMemoryFileStorageClient storageClient; + + @Before + public void setup() { + storageClient = new InMemoryFileStorageClient(); + } + + private CsvSinkConfig config(boolean writeHeader) { + return new CsvSinkConfig(BASE_PATH, "my bookings job", "output", "dd-MMM-yyyy", ",", writeHeader); + } + + private CsvSinkWriter writer(String[] columnNames, boolean writeHeader, Clock clock) { + return new CsvSinkWriter(columnNames, config(writeHeader), storageClient, new OverwriteWriteStrategy(), clock); + } + + @Test + public void shouldWriteHeaderAndRowOnSnapshot() throws Exception { + CsvSinkWriter writer = writer(new String[]{"service_type", "booking_count"}, true, DAY_ONE); + + writer.write(Row.of("GO_RIDE", 120L), null); + writer.snapshotState(1L); + + String expectedPath = "file:///tmp/out/my_bookings_job/output-09-Jun-2026.csv"; + assertTrue(storageClient.exists(expectedPath)); + assertEquals("service_type,booking_count\nGO_RIDE,120\n", storageClient.readAsString(expectedPath)); + 
} + + @Test + public void shouldSanitizeJobIdAndBuildDailyPath() throws Exception { + CsvSinkWriter writer = writer(new String[]{"a"}, false, DAY_ONE); + + writer.write(Row.of("x"), null); + writer.snapshotState(1L); + + assertTrue(storageClient.exists("file:///tmp/out/my_bookings_job/output-09-Jun-2026.csv")); + } + + @Test + public void shouldRollOverToNextDayFile() throws Exception { + writer(new String[]{"a"}, false, DAY_TWO).write(Row.of("x"), null); + CsvSinkWriter dayTwoWriter = writer(new String[]{"a"}, false, DAY_TWO); + dayTwoWriter.write(Row.of("x"), null); + dayTwoWriter.snapshotState(1L); + + assertTrue(storageClient.exists("file:///tmp/out/my_bookings_job/output-10-Jun-2026.csv")); + } + + @Test + public void shouldFormatNullFieldAsEmpty() throws Exception { + CsvSinkWriter writer = writer(new String[]{"a", "b"}, false, DAY_ONE); + + writer.write(Row.of("foo", null), null); + writer.snapshotState(1L); + + assertEquals("foo,\n", storageClient.readAsString(pathDayOne())); + } + + @Test + public void shouldFormatLocalDateTimeAsIso() throws Exception { + CsvSinkWriter writer = writer(new String[]{"window_timestamp"}, false, DAY_ONE); + + writer.write(Row.of(LocalDateTime.of(2026, 6, 9, 12, 3, 0)), null); + writer.snapshotState(1L); + + assertEquals("2026-06-09T12:03\n", storageClient.readAsString(pathDayOne())); + } + + @Test + public void shouldQuoteFieldsContainingDelimiterQuoteOrNewline() throws Exception { + CsvSinkWriter writer = writer(new String[]{"a", "b", "c"}, false, DAY_ONE); + + writer.write(Row.of("has,comma", "has\"quote", "has\nnewline"), null); + writer.snapshotState(1L); + + assertEquals("\"has,comma\",\"has\"\"quote\",\"has\nnewline\"\n", storageClient.readAsString(pathDayOne())); + } + + @Test + public void shouldJsonEncodeCompositeValues() throws Exception { + CsvSinkWriter writer = writer(new String[]{"ids"}, false, DAY_ONE); + + writer.write(Row.of((Object) new int[]{1, 2, 3}), null); + writer.snapshotState(1L); + + 
assertEquals("\"[1,2,3]\"\n", storageClient.readAsString(pathDayOne())); + } + + @Test + public void shouldNotWriteWhenNoRowsBuffered() throws Exception { + CsvSinkWriter writer = writer(new String[]{"a"}, true, DAY_ONE); + + writer.snapshotState(1L); + + assertEquals(0, storageClient.getWriteCount()); + } + + private String pathDayOne() { + return "file:///tmp/out/my_bookings_job/output-09-Jun-2026.csv"; + } +} From fa6a77daf82e02ffb7f6d3a24aa6ec757d8e8863 Mon Sep 17 00:00:00 2001 From: rajuGT Date: Wed, 10 Jun 2026 20:41:26 +0700 Subject: [PATCH 05/10] feat(sink): wire CSV sink into orchestrator with parallelism-1 guard Register SINK_TYPE=csv in SinkOrchestrator via a new CsvSinkBuilder that reads Dagger Configuration (SINK_CSV_BASE_PATH required) and assembles the sink with its write strategy and Flink filesystem storage client. Pin the CSV sink to parallelism 1 in DaggerSqlJobBuilder so a single subtask owns the daily file, leaving other sinks untouched. Add SINK_TYPE_* constants and tests for the builder and orchestrator wiring. 
Co-authored-by: Cursor --- .../dagger/core/DaggerSqlJobBuilder.java | 12 +++- .../dagger/core/sink/SinkOrchestrator.java | 4 ++ .../dagger/core/sink/csv/CsvSinkBuilder.java | 40 +++++++++++ .../dagger/core/utils/Constants.java | 4 ++ .../core/sink/SinkOrchestratorTest.java | 21 ++++++ .../core/sink/csv/CsvSinkBuilderTest.java | 67 +++++++++++++++++++ 6 files changed, 147 insertions(+), 1 deletion(-) create mode 100644 dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilder.java create mode 100644 dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilderTest.java diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java index b5c65ee90..6f1b5fc3b 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java @@ -24,6 +24,7 @@ import com.gotocompany.dagger.functions.udfs.python.PythonUdfManager; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.ApiExpression; import org.apache.flink.table.api.Table; @@ -231,6 +232,15 @@ private StreamInfo addPreProcessor(StreamInfo streamInfo, String tableName) { private void addSink(StreamInfo streamInfo) { SinkOrchestrator sinkOrchestrator = new SinkOrchestrator(telemetryExporter); sinkOrchestrator.addSubscriber(telemetryExporter); - streamInfo.getDataStream().sinkTo(sinkOrchestrator.getSink(configuration, streamInfo.getColumnNames(), stencilClientOrchestrator, daggerStatsDReporter, InfluxSinkOverrides.none())); + DataStreamSink dataStreamSink = streamInfo.getDataStream() + .sinkTo(sinkOrchestrator.getSink(configuration, 
streamInfo.getColumnNames(), stencilClientOrchestrator, daggerStatsDReporter, InfluxSinkOverrides.none())); + + // make sure previous tasks aggregate the data to one value if you don't want multiple rows for same keys/labels in csv + // For example + // GoFood created 35 (from task 0) + // GoFood created 15 (from task 1) + if (Constants.SINK_TYPE_CSV.equals(configuration.getString(Constants.SINK_TYPE_KEY, Constants.SINK_TYPE_DEFAULT))) { + dataStreamSink.setParallelism(1).name("csv-file-sink"); + } } } diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java index f8f418a77..82d83bb32 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java @@ -5,6 +5,7 @@ import com.gotocompany.dagger.core.metrics.telemetry.TelemetryPublisher; import com.gotocompany.dagger.core.metrics.telemetry.TelemetryTypes; import com.gotocompany.dagger.core.sink.bigquery.BigQuerySinkBuilder; +import com.gotocompany.dagger.core.sink.csv.CsvSinkBuilder; import com.gotocompany.dagger.core.sink.influx.ErrorHandler; import com.gotocompany.dagger.core.sink.influx.InfluxDBFactoryWrapper; import com.gotocompany.dagger.core.sink.influx.InfluxDBSink; @@ -80,6 +81,9 @@ public Sink getSink(Configuration configuration, String[] columnNames, StencilCl case "log": sink = new LogSink(columnNames); break; + case "csv": + sink = CsvSinkBuilder.build(configuration, columnNames); + break; case "bigquery": sink = BigQuerySinkBuilder.create() .setColumnNames(columnNames) diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilder.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilder.java new file mode 100644 index 000000000..15e3d16f1 --- /dev/null +++ 
b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilder.java @@ -0,0 +1,40 @@ +package com.gotocompany.dagger.core.sink.csv; + +import com.gotocompany.dagger.common.configuration.Configuration; +import com.gotocompany.dagger.core.sink.csv.storage.FileStorageClient; +import com.gotocompany.dagger.core.sink.csv.storage.FlinkFileSystemStorageClient; +import com.gotocompany.dagger.core.sink.csv.writemode.FileWriteStrategy; +import com.gotocompany.dagger.core.sink.csv.writemode.FileWriteStrategyFactory; +import com.gotocompany.dagger.core.utils.Constants; + +/** + * Builds a {@link CsvSink} from Dagger {@link Configuration}, wiring the write strategy and the + * Flink filesystem-backed storage client. SINK_CSV_BASE_PATH is required; everything else has a + * sensible default defined in {@link Constants}. + */ +public class CsvSinkBuilder { + + private CsvSinkBuilder() { + } + + public static CsvSink build(Configuration configuration, String[] columnNames) { + String basePath = configuration.getString(Constants.SINK_CSV_BASE_PATH_KEY, ""); + if (basePath == null || basePath.trim().isEmpty()) { + throw new IllegalArgumentException("Missing required configuration '" + Constants.SINK_CSV_BASE_PATH_KEY + + "' for CSV sink. 
Example: oss://bucket-name/some-folder"); + } + + String writeMode = configuration.getString(Constants.SINK_CSV_WRITE_MODE_KEY, Constants.SINK_CSV_WRITE_MODE_DEFAULT); + String dateFormat = configuration.getString(Constants.SINK_CSV_DATE_FORMAT_KEY, Constants.SINK_CSV_DATE_FORMAT_DEFAULT); + String delimiter = configuration.getString(Constants.SINK_CSV_DELIMITER_KEY, Constants.SINK_CSV_DELIMITER_DEFAULT); + boolean writeHeader = configuration.getBoolean(Constants.SINK_CSV_WRITE_HEADER_KEY, Constants.SINK_CSV_WRITE_HEADER_DEFAULT); + String filenamePrefix = configuration.getString(Constants.SINK_CSV_FILENAME_PREFIX_KEY, Constants.SINK_CSV_FILENAME_PREFIX_DEFAULT); + String jobId = configuration.getString(Constants.FLINK_JOB_ID_KEY, Constants.FLINK_JOB_ID_DEFAULT); + + CsvSinkConfig config = new CsvSinkConfig(basePath.trim(), jobId, filenamePrefix, dateFormat, delimiter, writeHeader); + FileWriteStrategy writeStrategy = FileWriteStrategyFactory.getWriteStrategy(writeMode); + FileStorageClient storageClient = new FlinkFileSystemStorageClient(); + + return new CsvSink(columnNames, config, storageClient, writeStrategy); + } +} diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java index 41d489083..b5dc9339c 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java @@ -202,6 +202,10 @@ public enum ExternalPostProcessorVariableType { REQUEST_VARIABLES, HEADER_VARIAB public static final String SINK_ERROR_TYPES_FOR_FAILURE = "SINK_ERROR_TYPES_FOR_FAILURE"; public static final String SINK_ERROR_TYPES_FOR_FAILURE_DEFAULT = ""; + public static final String SINK_TYPE_KEY = "SINK_TYPE"; + public static final String SINK_TYPE_DEFAULT = "influx"; + public static final String SINK_TYPE_CSV = "csv"; + // CSV sink. 
Output path = SINK_CSV_BASE_PATH//-.csv public static final String SINK_CSV_BASE_PATH_KEY = "SINK_CSV_BASE_PATH"; public static final String SINK_CSV_WRITE_MODE_KEY = "SINK_CSV_WRITE_MODE"; diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java index 30b4981db..05566c083 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java @@ -5,6 +5,7 @@ import com.gotocompany.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter; import com.gotocompany.dagger.core.processors.telemetry.processor.MetricsTelemetryExporter; import com.gotocompany.dagger.core.sink.bigquery.BigQuerySink; +import com.gotocompany.dagger.core.sink.csv.CsvSink; import com.gotocompany.dagger.core.sink.influx.InfluxDBSink; import com.gotocompany.dagger.core.sink.influx.InfluxSinkOverrides; import com.gotocompany.dagger.core.sink.log.LogSink; @@ -113,6 +114,26 @@ public void shouldReturnSinkMetrics() { assertEquals(expectedMetrics, sinkOrchestrator.getTelemetry()); } + @Test + public void shouldGiveCsvSinkWhenConfiguredToUseCsv() throws Exception { + when(configuration.getString(eq("SINK_TYPE"), anyString())).thenReturn("csv"); + when(configuration.getString(eq(Constants.SINK_CSV_BASE_PATH_KEY), anyString())).thenReturn("file:///tmp/out"); + when(configuration.getString(eq(Constants.SINK_CSV_WRITE_MODE_KEY), anyString())).thenReturn(Constants.SINK_CSV_WRITE_MODE_OVERWRITE); + when(configuration.getBoolean(eq(Constants.SINK_CSV_WRITE_HEADER_KEY), anyBoolean())).thenReturn(true); + + Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{"id"}, stencilClientOrchestrator, daggerStatsDReporter, influxSinkOverrides); + + assertThat(sinkFunction, instanceOf(CsvSink.class)); + } + + @Test + public void 
shouldThrowWhenCsvSinkConfiguredWithoutBasePath() { + when(configuration.getString(eq("SINK_TYPE"), anyString())).thenReturn("csv"); + + Assert.assertThrows(IllegalArgumentException.class, + () -> sinkOrchestrator.getSink(configuration, new String[]{"id"}, stencilClientOrchestrator, daggerStatsDReporter, influxSinkOverrides)); + } + @Test public void shouldReturnBigQuerySink() { when(configuration.getString(eq("SINK_TYPE"), anyString())).thenReturn("bigquery"); diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilderTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilderTest.java new file mode 100644 index 000000000..72b6dadfb --- /dev/null +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilderTest.java @@ -0,0 +1,67 @@ +package com.gotocompany.dagger.core.sink.csv; + +import com.gotocompany.dagger.common.configuration.Configuration; +import com.gotocompany.dagger.core.utils.Constants; +import org.apache.flink.api.java.utils.ParameterTool; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; + +public class CsvSinkBuilderTest { + + private static final String[] COLUMN_NAMES = new String[]{"id", "name"}; + + private Configuration configurationOf(Map values) { + return new Configuration(ParameterTool.fromMap(values)); + } + + @Test + public void shouldBuildCsvSinkWhenBasePathIsProvided() { + Map values = new HashMap<>(); + values.put(Constants.SINK_CSV_BASE_PATH_KEY, "file:///tmp/out"); + + CsvSink sink = CsvSinkBuilder.build(configurationOf(values), COLUMN_NAMES); + + assertNotNull(sink); + } + + @Test + public void shouldBuildCsvSinkWithExplicitAppendWriteMode() { + Map values = new HashMap<>(); + values.put(Constants.SINK_CSV_BASE_PATH_KEY, "file:///tmp/out"); + values.put(Constants.SINK_CSV_WRITE_MODE_KEY, Constants.SINK_CSV_WRITE_MODE_APPEND); + + 
CsvSink sink = CsvSinkBuilder.build(configurationOf(values), COLUMN_NAMES); + + assertNotNull(sink); + } + + @Test + public void shouldThrowWhenBasePathIsMissing() { + assertThrows(IllegalArgumentException.class, + () -> CsvSinkBuilder.build(configurationOf(new HashMap<>()), COLUMN_NAMES)); + } + + @Test + public void shouldThrowWhenBasePathIsBlank() { + Map values = new HashMap<>(); + values.put(Constants.SINK_CSV_BASE_PATH_KEY, " "); + + assertThrows(IllegalArgumentException.class, + () -> CsvSinkBuilder.build(configurationOf(values), COLUMN_NAMES)); + } + + @Test + public void shouldThrowForUnsupportedWriteMode() { + Map values = new HashMap<>(); + values.put(Constants.SINK_CSV_BASE_PATH_KEY, "file:///tmp/out"); + values.put(Constants.SINK_CSV_WRITE_MODE_KEY, "UPSERT"); + + assertThrows(IllegalArgumentException.class, + () -> CsvSinkBuilder.build(configurationOf(values), COLUMN_NAMES)); + } +} From 282f9f9661476772bfddaca2fffeece0335656a2 Mon Sep 17 00:00:00 2001 From: rajuGT Date: Wed, 10 Jun 2026 20:48:12 +0700 Subject: [PATCH 06/10] docs: document CSV sink configuration and usage Add the CSV sink to configuration.md (TOC, SINK_TYPE list, and a full CSV Sink section covering SINK_CSV_* keys with examples and defaults) and to create_dagger.md (supported-sinks list, prerequisites bullet, and a CSV Sink section with a sample properties block and OVERWRITE/APPEND guidance). 
Co-authored-by: Cursor --- docs/docs/guides/create_dagger.md | 27 ++++++++++++- docs/docs/reference/configuration.md | 59 +++++++++++++++++++++++++++- 2 files changed, 84 insertions(+), 2 deletions(-) diff --git a/docs/docs/guides/create_dagger.md b/docs/docs/guides/create_dagger.md index 428f50c14..ee5814eec 100644 --- a/docs/docs/guides/create_dagger.md +++ b/docs/docs/guides/create_dagger.md @@ -119,7 +119,7 @@ $ java -jar dagger-core/build/libs/dagger-core--fat.jar ConfigFi #### `Sinks` -- The current version of dagger supports Log, BigQuery, InfluxDB and Kafka as supported sinks to push the data after processing. You need to set up the desired sinks beforehand so that data can be pushed seamlessly. +- The current version of dagger supports Log, BigQuery, InfluxDB, Kafka and CSV as supported sinks to push the data after processing. You need to set up the desired sinks beforehand so that data can be pushed seamlessly. ##### `Influx Sink` @@ -137,6 +137,11 @@ $ java -jar dagger-core/build/libs/dagger-core--fat.jar ConfigFi - Bigquery Sink is created using the GOTO Depot library. - Depot is a sink connector, which acts as a bridge between data processing systems and real sink. You can check out the Depot Github repository [here](https://github.com/goto/depot/tree/main/docs). + ##### `CSV Sink` : + + - CSV sink writes the processed data as a daily-rolling CSV file to any Flink-supported filesystem (`file://`, `gs://`, `oss://`, `cosn://`, `s3://`). It is handy when downstream consumers want to download the file and import it into spreadsheet tools like Google Sheets or Lark. + - You only need write access to the target path; no extra service needs to be set up beyond the relevant Flink filesystem plugin/credentials for the chosen scheme. 
+ ## Common Configurations @@ -258,6 +263,26 @@ OUTPUT_KAFKA_TOPIC=test-kafka-output - [Errors Handling](https://github.com/goto/depot/blob/main/docs/sinks/bigquery.md#errors-handling) - [Google Cloud Bigquery IAM Permission](https://github.com/goto/depot/blob/main/docs/sinks/bigquery.md#google-cloud-bigquery-iam-permission) +## CSV Sink + +- CSV sink writes the query output as a daily-rolling CSV file to any Flink-supported filesystem. The output file for a given day is `<SINK_CSV_BASE_PATH>/<job_id>/<SINK_CSV_FILENAME_PREFIX>-<date>.csv`. +- Listing some of the configurations essential for CSV sink Dagger. Find more about them [here](../reference/configuration.md#csv-sink). + +```properties +# === sink config === +SINK_TYPE=csv +SINK_CSV_BASE_PATH=oss://bucket-name/some-folder +# === optional (defaults shown) === +SINK_CSV_WRITE_MODE=OVERWRITE +SINK_CSV_DATE_FORMAT=dd-MMM-yyyy +SINK_CSV_DELIMITER=, +SINK_CSV_WRITE_HEADER=true +SINK_CSV_FILENAME_PREFIX=output +``` + +- Rows are buffered and flushed on every Flink checkpoint, so the effective write cadence follows your windowing query and `FLINK_CHECKPOINT_INTERVAL`. The sink always runs with parallelism 1 and provides at-least-once delivery. +- Use `OVERWRITE` for windowed/aggregated snapshots (the file always reflects the latest window) and `APPEND` for time-series/passthrough output (rows accumulate over the day). Flatten nested columns in SQL using dot-notation and aliases, e.g. `SELECT emp.code AS emp_code`. + ## Advanced Data Processing - Dagger's inbuilt SQL enables users to do some complex stream processing like aggregations, joins, windowing, union etc. Find more of the sample queries [here](./guides/query_examples.md). diff --git a/docs/docs/reference/configuration.md b/docs/docs/reference/configuration.md index 8c632c2c9..67f81eb5b 100644 --- a/docs/docs/reference/configuration.md +++ b/docs/docs/reference/configuration.md @@ -8,6 +8,7 @@ This page contains references for all the application configurations for Dagger. 
* [Influx Sink](configuration.md#influx-sink) * [Kafka Sink](configuration.md#kafka-sink) * [BigQuery Sink](configuration.md#bigquery-sink) +* [CSV Sink](configuration.md#csv-sink) * [Schema Registry](configuration.md#schema-registry) * [Flink](configuration.md#flink) * [Darts](configuration.md#darts) @@ -287,7 +288,7 @@ STREAMS = [ #### `SINK_TYPE` -Defines the Dagger sink type. At present, we support `log`, `influx`, `kafka`, `bigquery` +Defines the Dagger sink type. At present, we support `log`, `influx`, `kafka`, `bigquery`, `csv` * Example value: `log` * Type: `required` @@ -467,6 +468,62 @@ Contains the error types for which the dagger should throw an exception if such - Type: `optional` +### CSV Sink + +A CSV sink Dagger \(`SINK_TYPE`=`csv`\) writes the query output as a daily-rolling CSV file to any Flink-supported filesystem (e.g. `file://`, `gs://`, `oss://`, `cosn://`, `s3://`). The full output path for a given day is `<SINK_CSV_BASE_PATH>/<job_id>/<SINK_CSV_FILENAME_PREFIX>-<date>.csv`. + +The sink buffers output rows and flushes them on every Flink checkpoint (see `FLINK_CHECKPOINT_INTERVAL`); an empty buffer results in a no-op flush, so the file is only (re)written when there is new data. The sink always runs with parallelism 1 so that a single subtask owns the daily file. Delivery is at-least-once. + +Nested/composite columns are not expanded automatically — flatten them in your SQL using dot-notation and aliases (e.g. `SELECT emp.code AS emp_code`). Any non-scalar value that still reaches the sink (`Map`/`Collection`/array) is JSON-encoded into a single cell. + +#### `SINK_CSV_BASE_PATH` + +Defines the root path under which the daily CSV files are written. The job id and file name are appended to this base path. + +* Example value: `oss://bucket-name/some-folder` +* Type: `required` + +#### `SINK_CSV_WRITE_MODE` + +Defines how each flush writes to the daily file. 
`OVERWRITE` fully replaces the file with the latest buffer (best for windowed/aggregated snapshots), while `APPEND` performs a read-modify-write to accumulate rows (best for time-series/passthrough). `APPEND` is at-least-once and may produce duplicate rows on restart. + +* Example value: `APPEND` +* Type: `optional` +* Default value: `OVERWRITE` + +#### `SINK_CSV_DATE_FORMAT` + +Defines the date pattern used in the daily file name. It is a Java `DateTimeFormatter` pattern, rendered with `Locale.ENGLISH`. + +* Example value: `dd-MMM-yyyy` +* Type: `optional` +* Default value: `dd-MMM-yyyy` + +#### `SINK_CSV_DELIMITER` + +Defines the field delimiter used between columns. Values containing the delimiter, double-quotes, or newlines are quoted following RFC 4180. + +* Example value: `,` +* Type: `optional` +* Default value: `,` + +#### `SINK_CSV_WRITE_HEADER` + +Enable/Disable writing the column-name header as the first line of the file. + +* Example value: `true` +* Type: `optional` +* Default value: `true` + +#### `SINK_CSV_FILENAME_PREFIX` + +Defines the prefix of the daily file name; the date and `.csv` extension are appended to it. + +* Example value: `output` +* Type: `optional` +* Default value: `output` + + ### Schema Registry Stencil is dynamic schema registry for protobuf. Find more details about Stencil [here](https://github.com/goto/stencil#stencil). From 7634bffd0a0c64042a33fe00492f2d28ddca7aec Mon Sep 17 00:00:00 2001 From: rajuGT Date: Wed, 10 Jun 2026 23:11:03 +0700 Subject: [PATCH 07/10] fix(sink): flush CSV sink on prepareCommit and add logging Flush from prepareCommit instead of snapshotState: for a stateless sink (no writer-state serializer) Flink's SinkOperator never calls the writer's snapshotState, so checkpoint flushes never fired and no file was written despite healthy throughput. prepareCommit is invoked on every checkpoint (prepareSnapshotPreBarrier) and once with endOfInput=true at end of input, and close() remains a safety net. 
snapshotState is now a no-op. Also add INFO logging across init/flush/write (including the resolved filesystem and an error-with-rethrow on flush failure, which the job's tolerable-checkpoint-failure setting would otherwise hide) and strip all trailing slashes from SINK_CSV_BASE_PATH. Update and extend tests. Co-authored-by: Cursor --- .../dagger/core/sink/csv/CsvSinkWriter.java | 41 ++++++++--- .../storage/FlinkFileSystemStorageClient.java | 5 ++ .../core/sink/csv/CsvSinkWriterTest.java | 68 ++++++++++++++++--- 3 files changed, 96 insertions(+), 18 deletions(-) diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriter.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriter.java index 43fe45341..4c01599fc 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriter.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriter.java @@ -5,6 +5,8 @@ import com.google.gson.Gson; import org.apache.flink.api.connector.sink.SinkWriter; import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.time.Clock; @@ -18,12 +20,16 @@ import java.util.Map; /** - * Buffers formatted CSV rows and flushes them on every checkpoint (snapshotState) and on close. + * Buffers formatted CSV rows and flushes them on every checkpoint and on close. The flush is driven + * from {@code prepareCommit}, which Flink invokes on every checkpoint (via prepareSnapshotPreBarrier) + * and once with endOfInput=true at end of input; this is the reliable hook for a stateless sink, + * since {@code snapshotState} is only invoked for sinks that expose a writer-state serializer. * The destination path rolls over daily: {@code basePath//-.csv}. Whether the * flush appends or fully replaces the file is decided by the configured {@link FileWriteStrategy}. 
*/ public class CsvSinkWriter implements SinkWriter { + private static final Logger LOGGER = LoggerFactory.getLogger(CsvSinkWriter.class); private static final Gson GSON = new Gson(); private static final String PATH_SEPARATOR = "/"; private static final String FILE_EXTENSION = ".csv"; @@ -51,6 +57,8 @@ public CsvSinkWriter(String[] columnNames, CsvSinkConfig config, FileStorageClie this.dateTimeFormatter = DateTimeFormatter.ofPattern(config.getDateFormat(), Locale.ENGLISH); this.sanitizedJobId = sanitize(config.getJobId()); this.basePath = stripTrailingSeparator(config.getBasePath()); + LOGGER.info("CSV sink writer initialized: basePath={}, jobId={}, writeStrategy={}, writeHeader={}, parallelism=1", + basePath, sanitizedJobId, writeStrategy.getClass().getSimpleName(), config.isWriteHeader()); } @Override @@ -59,24 +67,39 @@ public void write(Row row, Context context) { } @Override - public List snapshotState(long checkpointId) throws IOException { + public List prepareCommit(boolean endOfInput) throws IOException { + LOGGER.info("CSV sink flushing ({} buffered row(s), endOfInput={})", bufferedLines.size(), endOfInput); flush(); return Collections.emptyList(); } @Override - public List prepareCommit(boolean flush) { + public List snapshotState(long checkpointId) { return Collections.emptyList(); } @Override public void close() throws Exception { + LOGGER.info("CSV sink closing, flushing {} buffered row(s)", bufferedLines.size()); flush(); } private void flush() throws IOException { - writeStrategy.flush(storageClient, buildPath(), buildHeaderLine(), bufferedLines); - bufferedLines.clear(); + String path = buildPath(); + if (bufferedLines.isEmpty()) { + LOGGER.info("CSV sink: nothing to flush (empty buffer) for {}", path); + return; + } + int rowCount = bufferedLines.size(); + try { + LOGGER.info("CSV sink: writing {} row(s) to {}", rowCount, path); + writeStrategy.flush(storageClient, path, buildHeaderLine(), bufferedLines); + bufferedLines.clear(); + 
LOGGER.info("CSV sink: successfully wrote {} row(s) to {}", rowCount, path); + } catch (IOException | RuntimeException e) { + LOGGER.error("CSV sink: failed to write {} row(s) to {} : {}", rowCount, path, e.getMessage(), e); + throw e; + } } private String buildPath() { @@ -135,9 +158,11 @@ private static String sanitize(String jobId) { } private static String stripTrailingSeparator(String path) { - if (path.endsWith(PATH_SEPARATOR)) { - return path.substring(0, path.length() - 1); + String trimmed = path.trim(); + int end = trimmed.length(); + while (end > 0 && trimmed.charAt(end - 1) == '/') { + end--; } - return path; + return trimmed.substring(0, end); } } diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/storage/FlinkFileSystemStorageClient.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/storage/FlinkFileSystemStorageClient.java index 99685c3aa..8f598da9a 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/storage/FlinkFileSystemStorageClient.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/storage/FlinkFileSystemStorageClient.java @@ -4,6 +4,8 @@ import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -16,6 +18,7 @@ */ public class FlinkFileSystemStorageClient implements FileStorageClient { + private static final Logger LOGGER = LoggerFactory.getLogger(FlinkFileSystemStorageClient.class); private static final long serialVersionUID = 1L; private static final int BUFFER_SIZE = 8192; @@ -47,8 +50,10 @@ public byte[] read(String path) throws IOException { public void write(String path, byte[] content) throws IOException { Path filePath = new Path(path); FileSystem fileSystem = filePath.getFileSystem(); + LOGGER.info("Writing {} bytes to {} using filesystem {}", 
content.length, filePath, fileSystem.getClass().getName()); try (FSDataOutputStream outputStream = fileSystem.create(filePath, FileSystem.WriteMode.OVERWRITE)) { outputStream.write(content); } + LOGGER.info("Finished writing {} bytes to {}", content.length, filePath); } } diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriterTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriterTest.java index 279e0d203..120f09299 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriterTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriterTest.java @@ -27,31 +27,59 @@ public void setup() { } private CsvSinkConfig config(boolean writeHeader) { - return new CsvSinkConfig(BASE_PATH, "my bookings job", "output", "dd-MMM-yyyy", ",", writeHeader); + return config(BASE_PATH, writeHeader); + } + + private CsvSinkConfig config(String basePath, boolean writeHeader) { + return new CsvSinkConfig(basePath, "my bookings job", "output", "dd-MMM-yyyy", ",", writeHeader); } private CsvSinkWriter writer(String[] columnNames, boolean writeHeader, Clock clock) { return new CsvSinkWriter(columnNames, config(writeHeader), storageClient, new OverwriteWriteStrategy(), clock); } + private CsvSinkWriter writerWithBasePath(String basePath) { + return new CsvSinkWriter(new String[]{"a"}, config(basePath, false), storageClient, new OverwriteWriteStrategy(), DAY_ONE); + } + @Test - public void shouldWriteHeaderAndRowOnSnapshot() throws Exception { + public void shouldWriteHeaderAndRowOnPrepareCommit() throws Exception { CsvSinkWriter writer = writer(new String[]{"service_type", "booking_count"}, true, DAY_ONE); writer.write(Row.of("GO_RIDE", 120L), null); - writer.snapshotState(1L); + writer.prepareCommit(false); String expectedPath = "file:///tmp/out/my_bookings_job/output-09-Jun-2026.csv"; assertTrue(storageClient.exists(expectedPath)); 
assertEquals("service_type,booking_count\nGO_RIDE,120\n", storageClient.readAsString(expectedPath)); } + @Test + public void shouldNotFlushOnSnapshotState() throws Exception { + CsvSinkWriter writer = writer(new String[]{"a"}, true, DAY_ONE); + + writer.write(Row.of("x"), null); + writer.snapshotState(1L); + + assertEquals(0, storageClient.getWriteCount()); + } + + @Test + public void shouldFlushBufferedRowsOnClose() throws Exception { + CsvSinkWriter writer = writer(new String[]{"a"}, false, DAY_ONE); + + writer.write(Row.of("x"), null); + writer.close(); + + assertTrue(storageClient.exists(pathDayOne())); + } + @Test public void shouldSanitizeJobIdAndBuildDailyPath() throws Exception { CsvSinkWriter writer = writer(new String[]{"a"}, false, DAY_ONE); writer.write(Row.of("x"), null); - writer.snapshotState(1L); + writer.prepareCommit(false); assertTrue(storageClient.exists("file:///tmp/out/my_bookings_job/output-09-Jun-2026.csv")); } @@ -61,17 +89,37 @@ public void shouldRollOverToNextDayFile() throws Exception { writer(new String[]{"a"}, false, DAY_TWO).write(Row.of("x"), null); CsvSinkWriter dayTwoWriter = writer(new String[]{"a"}, false, DAY_TWO); dayTwoWriter.write(Row.of("x"), null); - dayTwoWriter.snapshotState(1L); + dayTwoWriter.prepareCommit(false); assertTrue(storageClient.exists("file:///tmp/out/my_bookings_job/output-10-Jun-2026.csv")); } + @Test + public void shouldStripSingleTrailingSlashFromBasePath() throws Exception { + CsvSinkWriter writer = writerWithBasePath("file:///tmp/out/"); + + writer.write(Row.of("x"), null); + writer.prepareCommit(false); + + assertTrue(storageClient.exists("file:///tmp/out/my_bookings_job/output-09-Jun-2026.csv")); + } + + @Test + public void shouldStripMultipleTrailingSlashesFromBasePath() throws Exception { + CsvSinkWriter writer = writerWithBasePath("file:///tmp/out///"); + + writer.write(Row.of("x"), null); + writer.prepareCommit(false); + + 
assertTrue(storageClient.exists("file:///tmp/out/my_bookings_job/output-09-Jun-2026.csv")); + } + @Test public void shouldFormatNullFieldAsEmpty() throws Exception { CsvSinkWriter writer = writer(new String[]{"a", "b"}, false, DAY_ONE); writer.write(Row.of("foo", null), null); - writer.snapshotState(1L); + writer.prepareCommit(false); assertEquals("foo,\n", storageClient.readAsString(pathDayOne())); } @@ -81,7 +129,7 @@ public void shouldFormatLocalDateTimeAsIso() throws Exception { CsvSinkWriter writer = writer(new String[]{"window_timestamp"}, false, DAY_ONE); writer.write(Row.of(LocalDateTime.of(2026, 6, 9, 12, 3, 0)), null); - writer.snapshotState(1L); + writer.prepareCommit(false); assertEquals("2026-06-09T12:03\n", storageClient.readAsString(pathDayOne())); } @@ -91,7 +139,7 @@ public void shouldQuoteFieldsContainingDelimiterQuoteOrNewline() throws Exceptio CsvSinkWriter writer = writer(new String[]{"a", "b", "c"}, false, DAY_ONE); writer.write(Row.of("has,comma", "has\"quote", "has\nnewline"), null); - writer.snapshotState(1L); + writer.prepareCommit(false); assertEquals("\"has,comma\",\"has\"\"quote\",\"has\nnewline\"\n", storageClient.readAsString(pathDayOne())); } @@ -101,7 +149,7 @@ public void shouldJsonEncodeCompositeValues() throws Exception { CsvSinkWriter writer = writer(new String[]{"ids"}, false, DAY_ONE); writer.write(Row.of((Object) new int[]{1, 2, 3}), null); - writer.snapshotState(1L); + writer.prepareCommit(false); assertEquals("\"[1,2,3]\"\n", storageClient.readAsString(pathDayOne())); } @@ -110,7 +158,7 @@ public void shouldJsonEncodeCompositeValues() throws Exception { public void shouldNotWriteWhenNoRowsBuffered() throws Exception { CsvSinkWriter writer = writer(new String[]{"a"}, true, DAY_ONE); - writer.snapshotState(1L); + writer.prepareCommit(false); assertEquals(0, storageClient.getWriteCount()); } From 386fd9f8f0d56defe7d23736d2d1f3ab22f61b73 Mon Sep 17 00:00:00 2001 From: rajuGT Date: Fri, 12 Jun 2026 20:28:50 +0700 Subject: [PATCH 
08/10] feat(sink): make CSV sink rolling pattern-driven via SINK_CSV_PARTITION_DATE_FORMAT Format a LocalDateTime so the date pattern can carry time fields, letting the pattern's finest field decide the file rolling/partitioning granularity (yearly, daily, hourly, minutely, ...). Rename SINK_CSV_DATE_FORMAT to SINK_CSV_PARTITION_DATE_FORMAT, default to yyyy-MMM-dd-HH-mm, and validate that the pattern uses only DateTimeFormatter letters plus '-' and '_'. Co-authored-by: Cursor --- .../dagger/core/sink/csv/CsvSinkBuilder.java | 24 ++++++++++- .../dagger/core/sink/csv/CsvSinkWriter.java | 9 ++-- .../dagger/core/utils/Constants.java | 8 ++-- .../core/sink/SinkOrchestratorTest.java | 1 + .../core/sink/csv/CsvSinkBuilderTest.java | 34 +++++++++++++++ .../core/sink/csv/CsvSinkWriterTest.java | 43 +++++++++++++++++++ docs/docs/guides/create_dagger.md | 2 +- docs/docs/reference/configuration.md | 14 ++++-- version.txt | 2 +- 9 files changed, 124 insertions(+), 13 deletions(-) diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilder.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilder.java index 15e3d16f1..867b6f69b 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilder.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilder.java @@ -7,6 +7,10 @@ import com.gotocompany.dagger.core.sink.csv.writemode.FileWriteStrategyFactory; import com.gotocompany.dagger.core.utils.Constants; +import java.time.format.DateTimeFormatter; +import java.util.Locale; +import java.util.regex.Pattern; + /** * Builds a {@link CsvSink} from Dagger {@link Configuration}, wiring the write strategy and the * Flink filesystem-backed storage client. 
SINK_CSV_BASE_PATH is required; everything else has a @@ -14,6 +18,8 @@ */ public class CsvSinkBuilder { + private static final Pattern ALLOWED_DATE_FORMAT = Pattern.compile("^[A-Za-z_-]+$"); + private CsvSinkBuilder() { } @@ -24,8 +30,10 @@ public static CsvSink build(Configuration configuration, String[] columnNames) { + "' for CSV sink. Example: oss://bucket-name/some-folder"); } + String dateFormat = configuration.getString(Constants.SINK_CSV_PARTITION_DATE_FORMAT_KEY, Constants.SINK_CSV_PARTITION_DATE_FORMAT_DEFAULT); + validateDateFormat(dateFormat); + String writeMode = configuration.getString(Constants.SINK_CSV_WRITE_MODE_KEY, Constants.SINK_CSV_WRITE_MODE_DEFAULT); - String dateFormat = configuration.getString(Constants.SINK_CSV_DATE_FORMAT_KEY, Constants.SINK_CSV_DATE_FORMAT_DEFAULT); String delimiter = configuration.getString(Constants.SINK_CSV_DELIMITER_KEY, Constants.SINK_CSV_DELIMITER_DEFAULT); boolean writeHeader = configuration.getBoolean(Constants.SINK_CSV_WRITE_HEADER_KEY, Constants.SINK_CSV_WRITE_HEADER_DEFAULT); String filenamePrefix = configuration.getString(Constants.SINK_CSV_FILENAME_PREFIX_KEY, Constants.SINK_CSV_FILENAME_PREFIX_DEFAULT); @@ -37,4 +45,18 @@ public static CsvSink build(Configuration configuration, String[] columnNames) { return new CsvSink(columnNames, config, storageClient, writeStrategy); } + + private static void validateDateFormat(String dateFormat) { + if (dateFormat == null || !ALLOWED_DATE_FORMAT.matcher(dateFormat).matches()) { + throw new IllegalArgumentException("Invalid '" + Constants.SINK_CSV_PARTITION_DATE_FORMAT_KEY + "' value '" + dateFormat + + "' for CSV sink. Allowed characters: pattern letters (y, M, d, H, m, s, ...) and the separators " + + "'-' and '_'. 
Characters such as '/', ':', '|', '.', and spaces are not allowed."); + } + try { + DateTimeFormatter.ofPattern(dateFormat, Locale.ENGLISH); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid '" + Constants.SINK_CSV_PARTITION_DATE_FORMAT_KEY + "' value '" + dateFormat + + "' for CSV sink: not a valid date-time pattern.", e); + } + } } diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriter.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriter.java index 4c01599fc..cab21820f 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriter.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriter.java @@ -10,7 +10,7 @@ import java.io.IOException; import java.time.Clock; -import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Collection; @@ -24,8 +24,9 @@ * from {@code prepareCommit}, which Flink invokes on every checkpoint (via prepareSnapshotPreBarrier) * and once with endOfInput=true at end of input; this is the reliable hook for a stateless sink, * since {@code snapshotState} is only invoked for sinks that expose a writer-state serializer. - * The destination path rolls over daily: {@code basePath//-.csv}. Whether the - * flush appends or fully replaces the file is decided by the configured {@link FileWriteStrategy}. + * The destination path is {@code basePath/{sanitizedJobId}/{filenamePrefix}-{date}.csv}; the rolling/sharding + * granularity (yearly, daily, hourly, minutely, ...) is decided by the SINK_CSV_PARTITION_DATE_FORMAT pattern. + * Whether the flush appends or fully replaces the file is decided by the configured {@link FileWriteStrategy}. 
*/ public class CsvSinkWriter implements SinkWriter { @@ -103,7 +104,7 @@ private void flush() throws IOException { } private String buildPath() { - String date = LocalDate.now(clock).format(dateTimeFormatter); + String date = LocalDateTime.now(clock).format(dateTimeFormatter); return basePath + PATH_SEPARATOR + sanitizedJobId + PATH_SEPARATOR + config.getFilenamePrefix() + "-" + date + FILE_EXTENSION; } diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java index b5dc9339c..4f57bd4e2 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java @@ -212,9 +212,11 @@ public enum ExternalPostProcessorVariableType { REQUEST_VARIABLES, HEADER_VARIAB public static final String SINK_CSV_WRITE_MODE_APPEND = "APPEND"; public static final String SINK_CSV_WRITE_MODE_OVERWRITE = "OVERWRITE"; public static final String SINK_CSV_WRITE_MODE_DEFAULT = SINK_CSV_WRITE_MODE_OVERWRITE; - // Java DateTimeFormatter pattern (dd day-of-month, MMM month abbrev, yyyy year). Rendered with Locale.ENGLISH. - public static final String SINK_CSV_DATE_FORMAT_KEY = "SINK_CSV_DATE_FORMAT"; - public static final String SINK_CSV_DATE_FORMAT_DEFAULT = "dd-MMM-yyyy"; + // Java DateTimeFormatter pattern rendered with Locale.ENGLISH. Its finest field decides the file rolling/sharding + // granularity (e.g. yyyy=yearly, yyyy-MM=monthly, dd-MMM-yyyy=daily, yyyy-MMM-dd-HH-mm=minutely). + // Allowed characters: pattern letters (y, M, d, H, m, s, ...) and the separators '-' and '_'; anything else is rejected. 
+ public static final String SINK_CSV_PARTITION_DATE_FORMAT_KEY = "SINK_CSV_PARTITION_DATE_FORMAT"; + public static final String SINK_CSV_PARTITION_DATE_FORMAT_DEFAULT = "yyyy-MMM-dd-HH-mm"; public static final String SINK_CSV_DELIMITER_KEY = "SINK_CSV_DELIMITER"; public static final String SINK_CSV_DELIMITER_DEFAULT = ","; public static final String SINK_CSV_WRITE_HEADER_KEY = "SINK_CSV_WRITE_HEADER"; diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java index 05566c083..2cec9b373 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java @@ -119,6 +119,7 @@ public void shouldGiveCsvSinkWhenConfiguredToUseCsv() throws Exception { when(configuration.getString(eq("SINK_TYPE"), anyString())).thenReturn("csv"); when(configuration.getString(eq(Constants.SINK_CSV_BASE_PATH_KEY), anyString())).thenReturn("file:///tmp/out"); when(configuration.getString(eq(Constants.SINK_CSV_WRITE_MODE_KEY), anyString())).thenReturn(Constants.SINK_CSV_WRITE_MODE_OVERWRITE); + when(configuration.getString(eq(Constants.SINK_CSV_PARTITION_DATE_FORMAT_KEY), anyString())).thenReturn(Constants.SINK_CSV_PARTITION_DATE_FORMAT_DEFAULT); when(configuration.getBoolean(eq(Constants.SINK_CSV_WRITE_HEADER_KEY), anyBoolean())).thenReturn(true); Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{"id"}, stencilClientOrchestrator, daggerStatsDReporter, influxSinkOverrides); diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilderTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilderTest.java index 72b6dadfb..7b37e0027 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilderTest.java +++ 
b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilderTest.java @@ -64,4 +64,38 @@ public void shouldThrowForUnsupportedWriteMode() { assertThrows(IllegalArgumentException.class, () -> CsvSinkBuilder.build(configurationOf(values), COLUMN_NAMES)); } + + @Test + public void shouldBuildCsvSinkWithValidDateFormats() { + for (String dateFormat : new String[]{"yyyy", "yyyy-MMM-dd-HH-mm", "yyyy_MM_dd"}) { + Map values = new HashMap<>(); + values.put(Constants.SINK_CSV_BASE_PATH_KEY, "file:///tmp/out"); + values.put(Constants.SINK_CSV_PARTITION_DATE_FORMAT_KEY, dateFormat); + + assertNotNull(CsvSinkBuilder.build(configurationOf(values), COLUMN_NAMES)); + } + } + + @Test + public void shouldThrowForDateFormatWithDisallowedCharacters() { + for (String dateFormat : new String[]{"yyyy/MM/dd", "yyyy:MM", "yyyy|MM", "yyyy.MM.dd", "yyyy MM dd"}) { + Map values = new HashMap<>(); + values.put(Constants.SINK_CSV_BASE_PATH_KEY, "file:///tmp/out"); + values.put(Constants.SINK_CSV_PARTITION_DATE_FORMAT_KEY, dateFormat); + + assertThrows("Expected rejection for date format '" + dateFormat + "'", IllegalArgumentException.class, + () -> CsvSinkBuilder.build(configurationOf(values), COLUMN_NAMES)); + } + } + + @Test + public void shouldThrowForUnparseableDateFormat() { + // 'J' passes the character allowlist but is an unknown DateTimeFormatter pattern letter. 
+ Map values = new HashMap<>(); + values.put(Constants.SINK_CSV_BASE_PATH_KEY, "file:///tmp/out"); + values.put(Constants.SINK_CSV_PARTITION_DATE_FORMAT_KEY, "J"); + + assertThrows(IllegalArgumentException.class, + () -> CsvSinkBuilder.build(configurationOf(values), COLUMN_NAMES)); + } } diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriterTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriterTest.java index 120f09299..9db8c3436 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriterTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriterTest.java @@ -42,6 +42,11 @@ private CsvSinkWriter writerWithBasePath(String basePath) { return new CsvSinkWriter(new String[]{"a"}, config(basePath, false), storageClient, new OverwriteWriteStrategy(), DAY_ONE); } + private CsvSinkWriter writerWithFormat(String dateFormat, Clock clock) { + CsvSinkConfig config = new CsvSinkConfig(BASE_PATH, "my bookings job", "output", dateFormat, ",", false); + return new CsvSinkWriter(new String[]{"a"}, config, storageClient, new OverwriteWriteStrategy(), clock); + } + @Test public void shouldWriteHeaderAndRowOnPrepareCommit() throws Exception { CsvSinkWriter writer = writer(new String[]{"service_type", "booking_count"}, true, DAY_ONE); @@ -94,6 +99,44 @@ public void shouldRollOverToNextDayFile() throws Exception { assertTrue(storageClient.exists("file:///tmp/out/my_bookings_job/output-10-Jun-2026.csv")); } + @Test + public void shouldRollYearlyWithYearPattern() throws Exception { + CsvSinkWriter writer = writerWithFormat("yyyy", DAY_ONE); + + writer.write(Row.of("x"), null); + writer.prepareCommit(false); + + assertTrue(storageClient.exists("file:///tmp/out/my_bookings_job/output-2026.csv")); + } + + @Test + public void shouldRollMinutelyWithMinutePattern() throws Exception { + CsvSinkWriter writer = writerWithFormat("yyyy-MMM-dd-HH-mm", DAY_ONE); + + 
writer.write(Row.of("x"), null); + writer.prepareCommit(false); + + assertTrue(storageClient.exists("file:///tmp/out/my_bookings_job/output-2026-Jun-09-10-00.csv")); + } + + @Test + public void shouldShardIntoSeparateFilesPerMinute() throws Exception { + Clock minuteZero = Clock.fixed(Instant.parse("2026-06-09T10:00:00Z"), ZoneOffset.UTC); + Clock minuteOne = Clock.fixed(Instant.parse("2026-06-09T10:01:00Z"), ZoneOffset.UTC); + + CsvSinkWriter firstMinuteWriter = writerWithFormat("yyyy-MMM-dd-HH-mm", minuteZero); + firstMinuteWriter.write(Row.of("x"), null); + firstMinuteWriter.prepareCommit(false); + + CsvSinkWriter secondMinuteWriter = writerWithFormat("yyyy-MMM-dd-HH-mm", minuteOne); + secondMinuteWriter.write(Row.of("y"), null); + secondMinuteWriter.prepareCommit(false); + + assertTrue(storageClient.exists("file:///tmp/out/my_bookings_job/output-2026-Jun-09-10-00.csv")); + assertTrue(storageClient.exists("file:///tmp/out/my_bookings_job/output-2026-Jun-09-10-01.csv")); + assertEquals(2, storageClient.getWriteCount()); + } + @Test public void shouldStripSingleTrailingSlashFromBasePath() throws Exception { CsvSinkWriter writer = writerWithBasePath("file:///tmp/out/"); diff --git a/docs/docs/guides/create_dagger.md b/docs/docs/guides/create_dagger.md index ee5814eec..1c0498a44 100644 --- a/docs/docs/guides/create_dagger.md +++ b/docs/docs/guides/create_dagger.md @@ -274,7 +274,7 @@ SINK_TYPE=csv SINK_CSV_BASE_PATH=oss://bucket-name/some-folder # === optional (defaults shown) === SINK_CSV_WRITE_MODE=OVERWRITE -SINK_CSV_DATE_FORMAT=dd-MMM-yyyy +SINK_CSV_PARTITION_DATE_FORMAT=yyyy-MMM-dd-HH-mm SINK_CSV_DELIMITER=, SINK_CSV_WRITE_HEADER=true SINK_CSV_FILENAME_PREFIX=output diff --git a/docs/docs/reference/configuration.md b/docs/docs/reference/configuration.md index 67f81eb5b..7d96e1ee7 100644 --- a/docs/docs/reference/configuration.md +++ b/docs/docs/reference/configuration.md @@ -491,13 +491,21 @@ Defines how each flush writes to the daily file. 
`OVERWRITE` fully replaces the * Type: `optional` * Default value: `OVERWRITE` -#### `SINK_CSV_DATE_FORMAT` +#### `SINK_CSV_PARTITION_DATE_FORMAT` -Defines the date pattern used in the daily file name. It is a Java `DateTimeFormatter` pattern, rendered with `Locale.ENGLISH`. +Defines the date-time pattern used both to name and to partition the output files. It is a Java `DateTimeFormatter` pattern, rendered with `Locale.ENGLISH`. The finest field in the pattern decides the file rolling/partitioning granularity, for example: + +* `yyyy` -> `output-2026.csv` (yearly) +* `yyyy-MM` -> `output-2026-06.csv` (monthly) +* `dd-MMM-yyyy` -> `output-12-Jun-2026.csv` (daily) +* `yyyy-MMM-dd-HH` -> `output-2026-Jun-12-06.csv` (hourly) +* `yyyy-MMM-dd-HH-mm` -> `output-2026-Jun-12-06-18.csv` (minutely) + +Allowed characters: the `DateTimeFormatter` pattern letters (such as `y`, `M`, `d`, `H`, `m`, `s`) and the separators hyphen (`-`) and underscore (`_`). Any other character (including `/`, `:`, `|`, `.`, and spaces) is rejected at startup; this keeps file names cross-platform and prevents the date value from injecting subfolders into the path. 
* Example value: `dd-MMM-yyyy` * Type: `optional` -* Default value: `dd-MMM-yyyy` +* Default value: `yyyy-MMM-dd-HH-mm` #### `SINK_CSV_DELIMITER` diff --git a/version.txt b/version.txt index e01e0ddd8..54d1a4f2a 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.12.4 +0.13.0 From 1743631773e6075267c4304eb191d8080210d66c Mon Sep 17 00:00:00 2001 From: rajuGT Date: Tue, 16 Jun 2026 22:29:06 +0700 Subject: [PATCH 09/10] Introduce CSV partition timezone configuration, so that rolling file works on the configured timezone instead of system time (which is mostly utc) --- .../dagger/core/sink/csv/CsvSinkBuilder.java | 16 ++++++++++++- .../dagger/core/sink/csv/CsvSinkConfig.java | 9 +++++++- .../dagger/core/sink/csv/CsvSinkWriter.java | 2 +- .../dagger/core/utils/Constants.java | 3 +++ .../core/sink/SinkOrchestratorTest.java | 1 + .../core/sink/csv/CsvSinkBuilderTest.java | 23 +++++++++++++++++++ .../core/sink/csv/CsvSinkWriterTest.java | 18 +++++++++++++-- docs/docs/guides/create_dagger.md | 1 + docs/docs/reference/configuration.md | 8 +++++++ 9 files changed, 76 insertions(+), 5 deletions(-) diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilder.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilder.java index 867b6f69b..2fbe2f255 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilder.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilder.java @@ -7,6 +7,8 @@ import com.gotocompany.dagger.core.sink.csv.writemode.FileWriteStrategyFactory; import com.gotocompany.dagger.core.utils.Constants; +import java.time.DateTimeException; +import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Locale; import java.util.regex.Pattern; @@ -33,13 +35,16 @@ public static CsvSink build(Configuration configuration, String[] columnNames) { String dateFormat = 
configuration.getString(Constants.SINK_CSV_PARTITION_DATE_FORMAT_KEY, Constants.SINK_CSV_PARTITION_DATE_FORMAT_DEFAULT); validateDateFormat(dateFormat); + String timezone = configuration.getString(Constants.SINK_CSV_PARTITION_TIMEZONE_KEY, Constants.SINK_CSV_PARTITION_TIMEZONE_DEFAULT); + ZoneId zoneId = validateTimezone(timezone); + String writeMode = configuration.getString(Constants.SINK_CSV_WRITE_MODE_KEY, Constants.SINK_CSV_WRITE_MODE_DEFAULT); String delimiter = configuration.getString(Constants.SINK_CSV_DELIMITER_KEY, Constants.SINK_CSV_DELIMITER_DEFAULT); boolean writeHeader = configuration.getBoolean(Constants.SINK_CSV_WRITE_HEADER_KEY, Constants.SINK_CSV_WRITE_HEADER_DEFAULT); String filenamePrefix = configuration.getString(Constants.SINK_CSV_FILENAME_PREFIX_KEY, Constants.SINK_CSV_FILENAME_PREFIX_DEFAULT); String jobId = configuration.getString(Constants.FLINK_JOB_ID_KEY, Constants.FLINK_JOB_ID_DEFAULT); - CsvSinkConfig config = new CsvSinkConfig(basePath.trim(), jobId, filenamePrefix, dateFormat, delimiter, writeHeader); + CsvSinkConfig config = new CsvSinkConfig(basePath.trim(), jobId, filenamePrefix, dateFormat, zoneId, delimiter, writeHeader); FileWriteStrategy writeStrategy = FileWriteStrategyFactory.getWriteStrategy(writeMode); FileStorageClient storageClient = new FlinkFileSystemStorageClient(); @@ -59,4 +64,13 @@ private static void validateDateFormat(String dateFormat) { + "' for CSV sink: not a valid date-time pattern.", e); } } + + private static ZoneId validateTimezone(String timezone) { + try { + return ZoneId.of(timezone); + } catch (DateTimeException e) { + throw new IllegalArgumentException("Invalid '" + Constants.SINK_CSV_PARTITION_TIMEZONE_KEY + "' value '" + timezone + + "' for CSV sink. 
Expected an IANA timezone id such as 'Asia/Jakarta' or 'UTC'.", e); + } + } } diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkConfig.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkConfig.java index f16b0ef2f..984a43bd1 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkConfig.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkConfig.java @@ -1,6 +1,7 @@ package com.gotocompany.dagger.core.sink.csv; import java.io.Serializable; +import java.time.ZoneId; /** * Immutable, serializable configuration for the CSV sink. The full output path for a given day is @@ -14,14 +15,16 @@ public class CsvSinkConfig implements Serializable { private final String jobId; private final String filenamePrefix; private final String dateFormat; + private final ZoneId zoneId; private final String delimiter; private final boolean writeHeader; - public CsvSinkConfig(String basePath, String jobId, String filenamePrefix, String dateFormat, String delimiter, boolean writeHeader) { + public CsvSinkConfig(String basePath, String jobId, String filenamePrefix, String dateFormat, ZoneId zoneId, String delimiter, boolean writeHeader) { this.basePath = basePath; this.jobId = jobId; this.filenamePrefix = filenamePrefix; this.dateFormat = dateFormat; + this.zoneId = zoneId; this.delimiter = delimiter; this.writeHeader = writeHeader; } @@ -42,6 +45,10 @@ public String getDateFormat() { return dateFormat; } + public ZoneId getZoneId() { + return zoneId; + } + public String getDelimiter() { return delimiter; } diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriter.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriter.java index cab21820f..d5bdaccaf 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriter.java +++ 
b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriter.java @@ -46,7 +46,7 @@ public class CsvSinkWriter implements SinkWriter { private final List bufferedLines = new ArrayList<>(); public CsvSinkWriter(String[] columnNames, CsvSinkConfig config, FileStorageClient storageClient, FileWriteStrategy writeStrategy) { - this(columnNames, config, storageClient, writeStrategy, Clock.systemDefaultZone()); + this(columnNames, config, storageClient, writeStrategy, Clock.system(config.getZoneId())); } CsvSinkWriter(String[] columnNames, CsvSinkConfig config, FileStorageClient storageClient, FileWriteStrategy writeStrategy, Clock clock) { diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java index 4f57bd4e2..a2b6b62c1 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java @@ -217,6 +217,9 @@ public enum ExternalPostProcessorVariableType { REQUEST_VARIABLES, HEADER_VARIAB // Allowed characters: pattern letters (y, M, d, H, m, s, ...) and the separators '-' and '_'; anything else is rejected. public static final String SINK_CSV_PARTITION_DATE_FORMAT_KEY = "SINK_CSV_PARTITION_DATE_FORMAT"; public static final String SINK_CSV_PARTITION_DATE_FORMAT_DEFAULT = "yyyy-MMM-dd-HH-mm"; + // IANA timezone id (e.g. Asia/Jakarta, UTC) used to resolve the wall-clock date that drives the partition boundary. 
+ public static final String SINK_CSV_PARTITION_TIMEZONE_KEY = "SINK_CSV_PARTITION_TIMEZONE"; + public static final String SINK_CSV_PARTITION_TIMEZONE_DEFAULT = "Asia/Jakarta"; public static final String SINK_CSV_DELIMITER_KEY = "SINK_CSV_DELIMITER"; public static final String SINK_CSV_DELIMITER_DEFAULT = ","; public static final String SINK_CSV_WRITE_HEADER_KEY = "SINK_CSV_WRITE_HEADER"; diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java index 2cec9b373..781b0c4a3 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java @@ -120,6 +120,7 @@ public void shouldGiveCsvSinkWhenConfiguredToUseCsv() throws Exception { when(configuration.getString(eq(Constants.SINK_CSV_BASE_PATH_KEY), anyString())).thenReturn("file:///tmp/out"); when(configuration.getString(eq(Constants.SINK_CSV_WRITE_MODE_KEY), anyString())).thenReturn(Constants.SINK_CSV_WRITE_MODE_OVERWRITE); when(configuration.getString(eq(Constants.SINK_CSV_PARTITION_DATE_FORMAT_KEY), anyString())).thenReturn(Constants.SINK_CSV_PARTITION_DATE_FORMAT_DEFAULT); + when(configuration.getString(eq(Constants.SINK_CSV_PARTITION_TIMEZONE_KEY), anyString())).thenReturn(Constants.SINK_CSV_PARTITION_TIMEZONE_DEFAULT); when(configuration.getBoolean(eq(Constants.SINK_CSV_WRITE_HEADER_KEY), anyBoolean())).thenReturn(true); Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{"id"}, stencilClientOrchestrator, daggerStatsDReporter, influxSinkOverrides); diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilderTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilderTest.java index 7b37e0027..a7639d33b 100644 --- 
a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilderTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkBuilderTest.java @@ -88,6 +88,29 @@ public void shouldThrowForDateFormatWithDisallowedCharacters() { } } + @Test + public void shouldBuildCsvSinkWithValidTimezones() { + for (String timezone : new String[]{"Asia/Jakarta", "UTC", "Europe/London"}) { + Map values = new HashMap<>(); + values.put(Constants.SINK_CSV_BASE_PATH_KEY, "file:///tmp/out"); + values.put(Constants.SINK_CSV_PARTITION_TIMEZONE_KEY, timezone); + + assertNotNull(CsvSinkBuilder.build(configurationOf(values), COLUMN_NAMES)); + } + } + + @Test + public void shouldThrowForInvalidTimezone() { + for (String timezone : new String[]{"Indonesia/Jakarta", "Not/AZone", "GMT+25"}) { + Map values = new HashMap<>(); + values.put(Constants.SINK_CSV_BASE_PATH_KEY, "file:///tmp/out"); + values.put(Constants.SINK_CSV_PARTITION_TIMEZONE_KEY, timezone); + + assertThrows("Expected rejection for timezone '" + timezone + "'", IllegalArgumentException.class, + () -> CsvSinkBuilder.build(configurationOf(values), COLUMN_NAMES)); + } + } + @Test public void shouldThrowForUnparseableDateFormat() { // 'J' passes the character allowlist but is an unknown DateTimeFormatter pattern letter. 
diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriterTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriterTest.java index 9db8c3436..513fc1a4c 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriterTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/csv/CsvSinkWriterTest.java @@ -8,6 +8,7 @@ import java.time.Clock; import java.time.Instant; import java.time.LocalDateTime; +import java.time.ZoneId; import java.time.ZoneOffset; import static org.junit.Assert.assertEquals; @@ -31,7 +32,7 @@ private CsvSinkConfig config(boolean writeHeader) { } private CsvSinkConfig config(String basePath, boolean writeHeader) { - return new CsvSinkConfig(basePath, "my bookings job", "output", "dd-MMM-yyyy", ",", writeHeader); + return new CsvSinkConfig(basePath, "my bookings job", "output", "dd-MMM-yyyy", ZoneId.of("UTC"), ",", writeHeader); } private CsvSinkWriter writer(String[] columnNames, boolean writeHeader, Clock clock) { @@ -43,7 +44,7 @@ private CsvSinkWriter writerWithBasePath(String basePath) { } private CsvSinkWriter writerWithFormat(String dateFormat, Clock clock) { - CsvSinkConfig config = new CsvSinkConfig(BASE_PATH, "my bookings job", "output", dateFormat, ",", false); + CsvSinkConfig config = new CsvSinkConfig(BASE_PATH, "my bookings job", "output", dateFormat, ZoneId.of("UTC"), ",", false); return new CsvSinkWriter(new String[]{"a"}, config, storageClient, new OverwriteWriteStrategy(), clock); } @@ -137,6 +138,19 @@ public void shouldShardIntoSeparateFilesPerMinute() throws Exception { assertEquals(2, storageClient.getWriteCount()); } + @Test + public void shouldResolvePartitionDateInConfiguredTimezone() throws Exception { + // 2026-06-09T17:30:00Z is already 2026-06-10 00:30 in Asia/Jakarta (UTC+7), so the daily partition rolls to the 10th. 
+ Clock jakartaClock = Clock.fixed(Instant.parse("2026-06-09T17:30:00Z"), ZoneId.of("Asia/Jakarta")); + CsvSinkConfig config = new CsvSinkConfig(BASE_PATH, "my bookings job", "output", "dd-MMM-yyyy", ZoneId.of("Asia/Jakarta"), ",", false); + CsvSinkWriter writer = new CsvSinkWriter(new String[]{"a"}, config, storageClient, new OverwriteWriteStrategy(), jakartaClock); + + writer.write(Row.of("x"), null); + writer.prepareCommit(false); + + assertTrue(storageClient.exists("file:///tmp/out/my_bookings_job/output-10-Jun-2026.csv")); + } + @Test public void shouldStripSingleTrailingSlashFromBasePath() throws Exception { CsvSinkWriter writer = writerWithBasePath("file:///tmp/out/"); diff --git a/docs/docs/guides/create_dagger.md b/docs/docs/guides/create_dagger.md index 1c0498a44..8a6e7c62c 100644 --- a/docs/docs/guides/create_dagger.md +++ b/docs/docs/guides/create_dagger.md @@ -275,6 +275,7 @@ SINK_CSV_BASE_PATH=oss://bucket-name/some-folder # === optional (defaults shown) === SINK_CSV_WRITE_MODE=OVERWRITE SINK_CSV_PARTITION_DATE_FORMAT=yyyy-MMM-dd-HH-mm +SINK_CSV_PARTITION_TIMEZONE=Asia/Jakarta SINK_CSV_DELIMITER=, SINK_CSV_WRITE_HEADER=true SINK_CSV_FILENAME_PREFIX=output diff --git a/docs/docs/reference/configuration.md b/docs/docs/reference/configuration.md index 7d96e1ee7..65e736035 100644 --- a/docs/docs/reference/configuration.md +++ b/docs/docs/reference/configuration.md @@ -507,6 +507,14 @@ Allowed characters: the `DateTimeFormatter` pattern letters (such as `y`, `M`, ` * Type: `optional` * Default value: `yyyy-MMM-dd-HH-mm` +#### `SINK_CSV_PARTITION_TIMEZONE` + +Defines the timezone used to resolve the wall-clock date that drives the partition boundary (the `SINK_CSV_PARTITION_DATE_FORMAT` value). It must be a valid IANA timezone id; an invalid id is rejected at startup. Set this to your local zone so files roll over at local midnight (or the local hour/minute) rather than at UTC. 
+ * Example value: `Asia/Jakarta` +* Type: `optional` +* Default value: `Asia/Jakarta` + #### `SINK_CSV_DELIMITER` Defines the field delimiter used between columns. Values containing the delimiter, double-quotes, or newlines are quoted following RFC 4180. From c027a3b0705c9079da20aef8c32b471e74e06feb Mon Sep 17 00:00:00 2001 From: rajuGT Date: Tue, 16 Jun 2026 23:30:31 +0700 Subject: [PATCH 10/10] feat(sink): change SINK_CSV_WRITE_MODE_DEFAULT to APPEND --- .../java/com/gotocompany/dagger/core/utils/Constants.java | 2 +- docs/docs/guides/create_dagger.md | 2 +- docs/docs/reference/configuration.md | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java index a2b6b62c1..7b0a0f5e2 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java @@ -211,7 +211,7 @@ public enum ExternalPostProcessorVariableType { REQUEST_VARIABLES, HEADER_VARIAB public static final String SINK_CSV_WRITE_MODE_KEY = "SINK_CSV_WRITE_MODE"; public static final String SINK_CSV_WRITE_MODE_APPEND = "APPEND"; public static final String SINK_CSV_WRITE_MODE_OVERWRITE = "OVERWRITE"; - public static final String SINK_CSV_WRITE_MODE_DEFAULT = SINK_CSV_WRITE_MODE_OVERWRITE; + public static final String SINK_CSV_WRITE_MODE_DEFAULT = SINK_CSV_WRITE_MODE_APPEND; // Java DateTimeFormatter pattern rendered with Locale.ENGLISH. Its finest field decides the file rolling/sharding // granularity (e.g. yyyy=yearly, yyyy-MM=monthly, dd-MMM-yyyy=daily, yyyy-MMM-dd-HH-mm=minutely). // Allowed characters: pattern letters (y, M, d, H, m, s, ...) and the separators '-' and '_'; anything else is rejected. 
diff --git a/docs/docs/guides/create_dagger.md b/docs/docs/guides/create_dagger.md index 8a6e7c62c..775764351 100644 --- a/docs/docs/guides/create_dagger.md +++ b/docs/docs/guides/create_dagger.md @@ -273,7 +273,7 @@ OUTPUT_KAFKA_TOPIC=test-kafka-output SINK_TYPE=csv SINK_CSV_BASE_PATH=oss://bucket-name/some-folder # === optional (defaults shown) === -SINK_CSV_WRITE_MODE=OVERWRITE +SINK_CSV_WRITE_MODE=APPEND SINK_CSV_PARTITION_DATE_FORMAT=yyyy-MMM-dd-HH-mm SINK_CSV_PARTITION_TIMEZONE=Asia/Jakarta SINK_CSV_DELIMITER=, diff --git a/docs/docs/reference/configuration.md b/docs/docs/reference/configuration.md index 65e736035..e4a2b56cb 100644 --- a/docs/docs/reference/configuration.md +++ b/docs/docs/reference/configuration.md @@ -487,9 +487,9 @@ Defines the root path under which the daily CSV files are written. The job id an Defines how each flush writes to the daily file. `OVERWRITE` fully replaces the file with the latest buffer (best for windowed/aggregated snapshots), while `APPEND` performs a read-modify-write to accumulate rows (best for time-series/passthrough). `APPEND` is at-least-once and may produce duplicate rows on restart. -* Example value: `APPEND` +* Example value: `OVERWRITE` * Type: `optional` -* Default value: `OVERWRITE` +* Default value: `APPEND` #### `SINK_CSV_PARTITION_DATE_FORMAT`