POC - CSV Sink#71

Open
rajuGT wants to merge 6 commits into main from feat/csv-sink

@rajuGT rajuGT commented Jun 10, 2026

No description provided.

rajuGT and others added 6 commits June 10, 2026 15:22

Introduce configuration keys and defaults for the upcoming CSV sink:
base path, write mode (APPEND/OVERWRITE, default OVERWRITE), date format,
delimiter, header toggle, and filename prefix.

Co-authored-by: Cursor <cursoragent@cursor.com>
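
The defaulting described in this commit can be sketched roughly as follows. This is an illustration only, not the PR's code: the key names `SINK_CSV_BASE_PATH` and `SINK_CSV_WRITE_MODE` are taken from later commits in this PR, while the class name `CsvSinkConfigSketch`, the use of a plain `Map` in place of Dagger's `Configuration`, and the case-insensitive parsing are assumptions.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical helper; the PR reads these values from Dagger's Configuration instead.
final class CsvSinkConfigSketch {
    enum WriteMode { APPEND, OVERWRITE }

    static final String SINK_CSV_BASE_PATH = "SINK_CSV_BASE_PATH";
    static final String SINK_CSV_WRITE_MODE = "SINK_CSV_WRITE_MODE";

    // Falls back to OVERWRITE when the key is absent or blank (the documented default).
    static WriteMode writeMode(Map<String, String> config) {
        String raw = config.get(SINK_CSV_WRITE_MODE);
        if (raw == null || raw.trim().isEmpty()) {
            return WriteMode.OVERWRITE;
        }
        // Assumption: values are matched case-insensitively.
        return WriteMode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    // The base path has no default, so a missing value is rejected.
    static String basePath(Map<String, String> config) {
        String path = config.get(SINK_CSV_BASE_PATH);
        if (path == null || path.trim().isEmpty()) {
            throw new IllegalArgumentException(SINK_CSV_BASE_PATH + " is required");
        }
        return path.trim();
    }
}
```

An unknown write mode makes `WriteMode.valueOf` throw `IllegalArgumentException`, which surfaces a misconfiguration at job start rather than at the first flush.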

Add FileStorageClient abstraction (read/exists/write with whole-object
semantics) and a single FlinkFileSystemStorageClient implementation backed
by Flink's FileSystem, so the path scheme (file/gs/oss/cosn/s3) selects the
backend without any cloud-provider SDKs. Includes unit tests.

Co-authored-by: Cursor <cursoragent@cursor.com>
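
A minimal sketch of what a whole-object storage abstraction of this shape might look like. The method signatures are assumptions (the PR only names read/exists/write), and the implementation below uses `java.nio.file` on the local disk as a stand-in so the example runs without Flink on the classpath; the PR's `FlinkFileSystemStorageClient` is backed by Flink's `FileSystem` instead.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical shape of the abstraction: every call handles a whole object.
interface FileStorageClientSketch {
    boolean exists(String path);
    byte[] read(String path);
    void write(String path, byte[] content);
}

// Local-disk stand-in; not the Flink-backed client from the PR.
final class LocalFileStorageClientSketch implements FileStorageClientSketch {
    @Override
    public boolean exists(String path) {
        return Files.exists(Paths.get(path));
    }

    @Override
    public byte[] read(String path) {
        try {
            return Files.readAllBytes(Paths.get(path));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void write(String path, byte[] content) {
        try {
            Path target = Paths.get(path);
            if (target.getParent() != null) {
                Files.createDirectories(target.getParent());
            }
            // Default options create the file or truncate an existing one,
            // which gives the full-replace, whole-object semantics.
            Files.write(target, content);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Keeping the interface to three whole-object calls is what lets object stores without an append operation sit behind the same contract.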

Add FileWriteStrategy with AppendWriteStrategy (read-modify-write,
time-series) and OverwriteWriteStrategy (full-replace snapshot), plus a
factory keyed on SINK_CSV_WRITE_MODE. Both strategies are a no-op when the
buffer is empty. Includes tests and an in-memory storage test helper.

Co-authored-by: Cursor <cursoragent@cursor.com>
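
The two write modes could be sketched as below, against an in-memory store similar in spirit to the test helper this commit mentions. All type and method names here (`WriteStrategySketch`, `Storage`, `flush`, `forMode`) are hypothetical, content is modelled as `String` for brevity, and writing the header only when the file is first created is an assumption about how APPEND treats headers.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class WriteStrategySketch {
    // Hypothetical whole-object storage contract.
    interface Storage {
        boolean exists(String path);
        String read(String path);
        void write(String path, String content);
    }

    // In-memory stand-in for a real storage backend.
    static final class InMemoryStorage implements Storage {
        private final Map<String, String> objects = new HashMap<>();
        public boolean exists(String path) { return objects.containsKey(path); }
        public String read(String path) { return objects.get(path); }
        public void write(String path, String content) { objects.put(path, content); }
    }

    interface FileWriteStrategy {
        void flush(Storage storage, String path, String header, List<String> rows);
    }

    // Full-replace snapshot: the file holds only the latest buffer.
    static final class Overwrite implements FileWriteStrategy {
        public void flush(Storage storage, String path, String header, List<String> rows) {
            if (rows.isEmpty()) return; // no-op on empty buffer
            storage.write(path, header + "\n" + join(rows));
        }
    }

    // Read-modify-write: existing content is read back and extended.
    static final class Append implements FileWriteStrategy {
        public void flush(Storage storage, String path, String header, List<String> rows) {
            if (rows.isEmpty()) return; // no-op on empty buffer
            String existing = storage.exists(path) ? storage.read(path) : header + "\n";
            storage.write(path, existing + join(rows));
        }
    }

    static String join(List<String> rows) {
        return String.join("\n", rows) + "\n";
    }

    // Factory keyed on the write-mode value.
    static FileWriteStrategy forMode(String mode) {
        switch (mode) {
            case "APPEND": return new Append();
            case "OVERWRITE": return new Overwrite();
            default: throw new IllegalArgumentException("Unknown write mode: " + mode);
        }
    }
}
```

Because APPEND re-reads and rewrites the whole object on each flush, its cost grows with file size; the daily rollover introduced in the next commit bounds that growth.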

Add the Flink Sink V1 CsvSink and its CsvSinkWriter, which buffers Row
outputs and flushes them on every checkpoint (snapshotState) and on close.
Output rolls over daily to basePath/<jobId>/<prefix>-<date>.csv. Values are
RFC-4180 escaped, nulls become empty cells, and Map/Collection/array values
are JSON-encoded into a single cell. A Clock is injectable for deterministic
date tests. Includes CsvSinkWriterTest.

Co-authored-by: Cursor <cursoragent@cursor.com>
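
The cell escaping and the daily file naming can be illustrated with a small sketch. The names (`CsvRowSketch`, `escape`, `toLine`, `dailyPath`) are hypothetical and not taken from the PR; JSON encoding of Map/Collection/array values is left out, and resolving the date in the zone of the injected `Clock` is an assumption.

```java
import java.time.Clock;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.stream.Collectors;

final class CsvRowSketch {
    // RFC 4180 style escaping: quote a field that contains the delimiter,
    // a double quote, or a line break, and double any embedded quotes.
    static String escape(Object value, char delimiter) {
        if (value == null) {
            return ""; // nulls become empty cells
        }
        String s = String.valueOf(value);
        boolean needsQuotes = s.indexOf(delimiter) >= 0
                || s.indexOf('"') >= 0
                || s.indexOf('\n') >= 0
                || s.indexOf('\r') >= 0;
        return needsQuotes ? "\"" + s.replace("\"", "\"\"") + "\"" : s;
    }

    // Joins one row's values into a single CSV line (without a line terminator).
    static String toLine(List<?> values, char delimiter) {
        return values.stream()
                .map(v -> escape(v, delimiter))
                .collect(Collectors.joining(String.valueOf(delimiter)));
    }

    // Builds basePath/<jobId>/<prefix>-<date>.csv; the Clock is injected so
    // tests can pin the date.
    static String dailyPath(String basePath, String jobId, String prefix,
                            String datePattern, Clock clock) {
        String date = LocalDate.now(clock).format(DateTimeFormatter.ofPattern(datePattern));
        return basePath + "/" + jobId + "/" + prefix + "-" + date + ".csv";
    }
}
```

With a clock fixed at `2026-06-10T15:22:00Z` in UTC, `dailyPath("/data", "job-1", "out", "yyyy-MM-dd", clock)` yields `/data/job-1/out-2026-06-10.csv`; the job id, prefix, and pattern here are made-up values.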

Register SINK_TYPE=csv in SinkOrchestrator via a new CsvSinkBuilder that
reads Dagger Configuration (SINK_CSV_BASE_PATH required) and assembles the
sink with its write strategy and Flink filesystem storage client. Pin the
CSV sink to parallelism 1 in DaggerSqlJobBuilder so a single subtask owns
the daily file, leaving other sinks untouched. Add SINK_TYPE_* constants
and tests for the builder and orchestrator wiring.

Co-authored-by: Cursor <cursoragent@cursor.com>

Add the CSV sink to configuration.md (TOC, SINK_TYPE list, and a full CSV
Sink section covering SINK_CSV_* keys with examples and defaults) and to
create_dagger.md (supported-sinks list, prerequisites bullet, and a CSV
Sink section with a sample properties block and OVERWRITE/APPEND guidance).

Co-authored-by: Cursor <cursoragent@cursor.com>