Skip to content
Open
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ bin
.settings
.gradletasknamecache
.DS_Store
dagger-common/src/generated-sources/
dagger-common/src/generated-sources/
plans
.cursor
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.gotocompany.dagger.functions.udfs.python.PythonUdfManager;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ApiExpression;
import org.apache.flink.table.api.Table;
Expand Down Expand Up @@ -231,6 +232,15 @@ private StreamInfo addPreProcessor(StreamInfo streamInfo, String tableName) {
/**
 * Attaches the configured sink to the final data stream of the job.
 *
 * <p>The sink is resolved through a {@link SinkOrchestrator} (which also reports to the telemetry
 * exporter) and attached to the stream exactly once. When the configured sink type is CSV, the
 * sink operator is additionally pinned to parallelism 1 and named {@code csv-file-sink}.
 *
 * @param streamInfo the stream (and its column names) whose rows should be written to the sink
 */
private void addSink(StreamInfo streamInfo) {
    SinkOrchestrator sinkOrchestrator = new SinkOrchestrator(telemetryExporter);
    sinkOrchestrator.addSubscriber(telemetryExporter);

    // Call sinkTo only once: every call adds another sink operator to the job graph, which would
    // make each row be written more than once. Keep the returned handle so that the operator can
    // be tuned below.
    DataStreamSink<Row> dataStreamSink = streamInfo.getDataStream()
            .sinkTo(sinkOrchestrator.getSink(configuration, streamInfo.getColumnNames(), stencilClientOrchestrator, daggerStatsDReporter, InfluxSinkOverrides.none()));

    // The CSV sink is forced to run as a single task. Rows coming from the upstream parallel tasks
    // are written as they arrive, so if the query does not already aggregate to one value per
    // key upstream, the CSV will contain several rows for the same keys/labels. For example:
    //   GoFood created 35 (from task 0)
    //   GoFood created 15 (from task 1)
    if (Constants.SINK_TYPE_CSV.equals(configuration.getString(Constants.SINK_TYPE_KEY, Constants.SINK_TYPE_DEFAULT))) {
        dataStreamSink.setParallelism(1).name("csv-file-sink");
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.gotocompany.dagger.core.metrics.telemetry.TelemetryPublisher;
import com.gotocompany.dagger.core.metrics.telemetry.TelemetryTypes;
import com.gotocompany.dagger.core.sink.bigquery.BigQuerySinkBuilder;
import com.gotocompany.dagger.core.sink.csv.CsvSinkBuilder;
import com.gotocompany.dagger.core.sink.influx.ErrorHandler;
import com.gotocompany.dagger.core.sink.influx.InfluxDBFactoryWrapper;
import com.gotocompany.dagger.core.sink.influx.InfluxDBSink;
Expand Down Expand Up @@ -80,6 +81,9 @@ public Sink getSink(Configuration configuration, String[] columnNames, StencilCl
case "log":
sink = new LogSink(columnNames);
break;
case "csv":
sink = CsvSinkBuilder.build(configuration, columnNames);
break;
case "bigquery":
sink = BigQuerySinkBuilder.create()
.setColumnNames(columnNames)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.gotocompany.dagger.core.sink.csv;

import com.gotocompany.dagger.core.sink.csv.storage.FileStorageClient;
import com.gotocompany.dagger.core.sink.csv.writemode.FileWriteStrategy;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.types.Row;

import java.util.List;
import java.util.Optional;

/**
 * {@link Sink} implementation that emits the job's output {@link Row}s as a daily-rolling CSV file
 * on any Flink filesystem.
 *
 * <p>Rows are buffered by the writer and flushed on every checkpoint (through the writer's
 * snapshotState), which gives at-least-once delivery. Because a single daily object cannot be
 * written concurrently, this sink has to be run with parallelism 1.
 *
 * <p>The sink is declared with {@code Void} for its committable, writer-state and global-committable
 * types: it exposes no committer, no global committer and no serializers.
 */
public class CsvSink implements Sink<Row, Void, Void, Void> {

    private final String[] columnNames;
    private final CsvSinkConfig config;
    private final FileStorageClient storageClient;
    private final FileWriteStrategy writeStrategy;

    /**
     * Creates the sink from its collaborators; all of them are handed unchanged to the writer.
     *
     * @param columnNames   names of the columns of the rows being written
     * @param config        CSV sink settings
     * @param storageClient client used to access the target storage
     * @param writeStrategy strategy describing how the file is written
     */
    public CsvSink(final String[] columnNames,
                   final CsvSinkConfig config,
                   final FileStorageClient storageClient,
                   final FileWriteStrategy writeStrategy) {
        this.writeStrategy = writeStrategy;
        this.storageClient = storageClient;
        this.config = config;
        this.columnNames = columnNames;
    }

    /**
     * Builds a fresh {@link CsvSinkWriter}. Neither the init context nor the restored
     * {@code states} are consulted.
     */
    @Override
    public SinkWriter<Row, Void, Void> createWriter(InitContext context, List<Void> states) {
        final SinkWriter<Row, Void, Void> csvWriter =
                new CsvSinkWriter(columnNames, config, storageClient, writeStrategy);
        return csvWriter;
    }

    /** @return always empty, since the writer keeps no state that needs a serializer */
    @Override
    public Optional<SimpleVersionedSerializer<Void>> getWriterStateSerializer() {
        return Optional.empty();
    }

    /** @return always empty; this sink has no committer */
    @Override
    public Optional<Committer<Void>> createCommitter() {
        return Optional.empty();
    }

    /** @return always empty; this sink has no global committer */
    @Override
    public Optional<GlobalCommitter<Void, Void>> createGlobalCommitter() {
        return Optional.empty();
    }

    /** @return always empty; there are no committables to serialize */
    @Override
    public Optional<SimpleVersionedSerializer<Void>> getCommittableSerializer() {
        return Optional.empty();
    }

    /** @return always empty; there are no global committables to serialize */
    @Override
    public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
        return Optional.empty();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.gotocompany.dagger.core.sink.csv;

import com.gotocompany.dagger.common.configuration.Configuration;
import com.gotocompany.dagger.core.sink.csv.storage.FileStorageClient;
import com.gotocompany.dagger.core.sink.csv.storage.FlinkFileSystemStorageClient;
import com.gotocompany.dagger.core.sink.csv.writemode.FileWriteStrategy;
import com.gotocompany.dagger.core.sink.csv.writemode.FileWriteStrategyFactory;
import com.gotocompany.dagger.core.utils.Constants;

import java.time.DateTimeException;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.regex.Pattern;

/**
 * Builds a {@link CsvSink} from Dagger {@link Configuration}, wiring the write strategy and the
 * Flink filesystem-backed storage client. SINK_CSV_BASE_PATH is required; everything else has a
 * sensible default defined in {@link Constants}.
 *
 * <p>This is a static utility; it cannot be instantiated.
 */
public class CsvSinkBuilder {

    // Only ASCII letters, '_' and '-' are accepted in the partition date format.
    // NOTE(review): presumably because the formatted date ends up in the output file name - confirm.
    private static final Pattern ALLOWED_DATE_FORMAT = Pattern.compile("^[A-Za-z_-]+$");

    private CsvSinkBuilder() {
    }

    /**
     * Reads the CSV sink settings from {@code configuration}, validates them and assembles the sink.
     *
     * @param configuration Dagger configuration holding the {@code SINK_CSV_*} settings
     * @param columnNames   names of the columns of the rows the sink will write
     * @return a sink wired with a {@link FlinkFileSystemStorageClient} and the write strategy
     *         selected by the configured write mode
     * @throws IllegalArgumentException if the base path is missing or blank, or if the partition
     *                                  date format or the partition timezone is invalid
     */
    public static CsvSink build(Configuration configuration, String[] columnNames) {
        String basePath = configuration.getString(Constants.SINK_CSV_BASE_PATH_KEY, "");
        if (basePath == null || basePath.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required configuration '" + Constants.SINK_CSV_BASE_PATH_KEY
                    + "' for CSV sink. Example: oss://bucket-name/some-folder");
        }

        String dateFormat = configuration.getString(Constants.SINK_CSV_PARTITION_DATE_FORMAT_KEY, Constants.SINK_CSV_PARTITION_DATE_FORMAT_DEFAULT);
        validateDateFormat(dateFormat);

        String timezone = configuration.getString(Constants.SINK_CSV_PARTITION_TIMEZONE_KEY, Constants.SINK_CSV_PARTITION_TIMEZONE_DEFAULT);
        ZoneId zoneId = validateTimezone(timezone);

        String writeMode = configuration.getString(Constants.SINK_CSV_WRITE_MODE_KEY, Constants.SINK_CSV_WRITE_MODE_DEFAULT);
        String delimiter = configuration.getString(Constants.SINK_CSV_DELIMITER_KEY, Constants.SINK_CSV_DELIMITER_DEFAULT);
        boolean writeHeader = configuration.getBoolean(Constants.SINK_CSV_WRITE_HEADER_KEY, Constants.SINK_CSV_WRITE_HEADER_DEFAULT);
        String filenamePrefix = configuration.getString(Constants.SINK_CSV_FILENAME_PREFIX_KEY, Constants.SINK_CSV_FILENAME_PREFIX_DEFAULT);
        String jobId = configuration.getString(Constants.FLINK_JOB_ID_KEY, Constants.FLINK_JOB_ID_DEFAULT);

        // The base path is stored trimmed; all other values are passed through as configured.
        CsvSinkConfig config = new CsvSinkConfig(basePath.trim(), jobId, filenamePrefix, dateFormat, zoneId, delimiter, writeHeader);
        FileWriteStrategy writeStrategy = FileWriteStrategyFactory.getWriteStrategy(writeMode);
        FileStorageClient storageClient = new FlinkFileSystemStorageClient();

        return new CsvSink(columnNames, config, storageClient, writeStrategy);
    }

    /**
     * Checks that the date format uses only the allowed characters and is a pattern that
     * {@link DateTimeFormatter} accepts.
     *
     * @throws IllegalArgumentException if the value is null, contains a disallowed character, or
     *                                  is not a valid date-time pattern
     */
    private static void validateDateFormat(String dateFormat) {
        if (dateFormat == null || !ALLOWED_DATE_FORMAT.matcher(dateFormat).matches()) {
            throw new IllegalArgumentException("Invalid '" + Constants.SINK_CSV_PARTITION_DATE_FORMAT_KEY + "' value '" + dateFormat
                    + "' for CSV sink. Allowed characters: pattern letters (y, M, d, H, m, s, ...) and the separators "
                    + "'-' and '_'. Characters such as '/', ':', '|', '.', and spaces are not allowed.");
        }
        try {
            // Letters that pass the character check may still not be valid pattern letters.
            DateTimeFormatter.ofPattern(dateFormat, Locale.ENGLISH);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Invalid '" + Constants.SINK_CSV_PARTITION_DATE_FORMAT_KEY + "' value '" + dateFormat
                    + "' for CSV sink: not a valid date-time pattern.", e);
        }
    }

    /**
     * Resolves the configured timezone id.
     *
     * @return the {@link ZoneId} for {@code timezone}
     * @throws IllegalArgumentException if the value is null or is not a zone id that
     *                                  {@link ZoneId#of(String)} accepts
     */
    private static ZoneId validateTimezone(String timezone) {
        // ZoneId.of(null) throws a bare NullPointerException; report it like any other invalid
        // value so that the error names the offending configuration key.
        if (timezone == null) {
            throw new IllegalArgumentException(invalidTimezoneMessage(null));
        }
        try {
            return ZoneId.of(timezone);
        } catch (DateTimeException e) {
            throw new IllegalArgumentException(invalidTimezoneMessage(timezone), e);
        }
    }

    /** Builds the error message reported for an unusable timezone value. */
    private static String invalidTimezoneMessage(String timezone) {
        return "Invalid '" + Constants.SINK_CSV_PARTITION_TIMEZONE_KEY + "' value '" + timezone
                + "' for CSV sink. Expected an IANA timezone id such as 'Asia/Jakarta' or 'UTC'.";
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.gotocompany.dagger.core.sink.csv;

import java.io.Serializable;
import java.time.ZoneId;

/**
 * Value holder for the CSV sink settings. Instances are immutable (every field is final and
 * there are no setters) and serializable.
 *
 * <p>The full output path for a given day is
 * {@code basePath/<sanitized jobId>/<filenamePrefix>-<date>.csv}; the path itself is assembled
 * outside this class.
 */
public class CsvSinkConfig implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String basePath;
    private final String jobId;
    private final String filenamePrefix;
    private final String dateFormat;
    private final ZoneId zoneId;
    private final String delimiter;
    private final boolean writeHeader;

    /**
     * Stores the given values as-is; no validation or normalization is performed here.
     *
     * @param basePath       root location under which the files are written
     * @param jobId          id of the job
     * @param filenamePrefix prefix of the file name
     * @param dateFormat     date pattern used for the date part of the file name
     * @param zoneId         timezone associated with the date partition
     * @param delimiter      column delimiter
     * @param writeHeader    whether a header should be written
     */
    public CsvSinkConfig(final String basePath,
                         final String jobId,
                         final String filenamePrefix,
                         final String dateFormat,
                         final ZoneId zoneId,
                         final String delimiter,
                         final boolean writeHeader) {
        this.basePath = basePath;
        this.jobId = jobId;
        this.filenamePrefix = filenamePrefix;
        this.dateFormat = dateFormat;
        this.zoneId = zoneId;
        this.delimiter = delimiter;
        this.writeHeader = writeHeader;
    }

    /** @return the base path given at construction */
    public String getBasePath() {
        return this.basePath;
    }

    /** @return the job id given at construction */
    public String getJobId() {
        return this.jobId;
    }

    /** @return the file name prefix given at construction */
    public String getFilenamePrefix() {
        return this.filenamePrefix;
    }

    /** @return the date pattern given at construction */
    public String getDateFormat() {
        return this.dateFormat;
    }

    /** @return the timezone given at construction */
    public ZoneId getZoneId() {
        return this.zoneId;
    }

    /** @return the column delimiter given at construction */
    public String getDelimiter() {
        return this.delimiter;
    }

    /** @return {@code true} if a header should be written */
    public boolean isWriteHeader() {
        return this.writeHeader;
    }
}
Loading
Loading