
[SPARK-56371][SQL] Support _metadata.row_index for V2 Parquet reads#55321

Draft
LuciferYang wants to merge 13 commits into apache:master from LuciferYang:SPARK-56371

Conversation

@LuciferYang
Contributor

What changes were proposed in this pull request?

Add support for the Parquet-specific generated row_index field on the V2
_metadata struct, completing V1 metadata-column parity for V2 Parquet tables.
This is the follow-up to SPARK-56335 (constant metadata fields).

This PR also restores vectorized columnar reads for V2 file metadata queries.
SPARK-56335 had to disable them because ConstantColumnVector cannot represent
a struct column; the new CompositeStructColumnVector lifts that restriction.

Key changes:

  1. CompositeStructColumnVector (new, Java): A minimal struct-typed
    ColumnVector that wraps a fixed array of arbitrary child column vectors.
    Used by the metadata wrapper to compose _metadata columnar batches whose
    children are a mix of ConstantColumnVector (for constant fields like
    file_path) and per-row vectors supplied by the format reader (e.g.,
    Parquet's _tmp_metadata_row_index). Zero-copy.

  2. ParquetTable.metadataSchemaFields: Overrides the FileTable extension
    point to append ParquetFileFormat.ROW_INDEX_FIELD, mirroring the V1
    ParquetFileFormat.metadataSchemaFields.

  3. FileScanBuilder.pruneColumns: Now inspects each requested _metadata
    sub-field. Constant fields flow through requestedMetadataFields unchanged.
    For generated fields (matched via FileSourceGeneratedMetadataStructField),
    the corresponding internal column (e.g., _tmp_metadata_row_index) is
    appended to requiredSchema so the format reader populates it. Internal
    columns are added with nullable = true so the Parquet reader treats them as
    synthetic via missingColumns / ParquetRowIndexUtil rather than failing
    the required-column check.

  4. FileScan.readSchema: Hides internal columns from the user-visible scan
    output. They live inside readDataSchema for the format reader, but must not
    appear in readSchema() because V2's PushDownUtils.toOutputAttrs looks
    each output column up by name in the relation output and the internal name is
    not a real column.

  5. MetadataAppendingFilePartitionReaderFactory (rewritten):

    • Row path: uses UnsafeProjection.create over BoundReferences and
      CreateNamedStruct. Constant metadata values are baked in as Literals
      for the split; generated values come from BoundReferences into the base
      row at the position of the internal column.
    • Columnar path (newly enabled): takes the input ColumnarBatch, drops
      the internal columns from the top-level column array, and appends a
      CompositeStructColumnVector for _metadata whose children are
      ConstantColumnVectors (constants) and direct references to the format
      reader's column vectors (generated). Zero-copy.
    • supportColumnarReads now delegates to the wrapped factory.
  6. wrapWithMetadataIfNeeded: Takes the read data schema as a parameter so
    the wrapper can compute the visible/internal column split. ParquetScan
    passes effectiveReadDataSchema (variant-pushdown aware); other scans pass
    their readDataSchema.
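
The columnar composition in items 1 and 5 can be sketched in isolation. The following is a minimal standalone model, not Spark's actual `ColumnVector` API (`Vec`, `ConstantVec`, and the other names here are illustrative): the struct vector only holds references to its children, so mixing per-split constants with per-row reader vectors is zero-copy.

```java
// Minimal model of composing a struct column from heterogeneous children.
// Illustrative only: Spark's real ColumnVector hierarchy is far richer.
interface Vec {
    long getLong(int rowId);
}

final class ConstantVec implements Vec {
    private final long value;
    ConstantVec(long value) { this.value = value; }
    public long getLong(int rowId) { return value; }  // same value for every row
}

final class PerRowVec implements Vec {
    private final long[] values;
    PerRowVec(long[] values) { this.values = values; }
    public long getLong(int rowId) { return values[rowId]; }
}

// Struct-typed vector holding references to its children: no data is copied.
final class CompositeStructVec {
    private final Vec[] children;
    CompositeStructVec(Vec[] children) { this.children = children; }
    Vec getChild(int ordinal) { return children[ordinal]; }
}

public class Demo {
    public static void main(String[] args) {
        Vec fileSize = new ConstantVec(1024L);              // constant per split
        Vec rowIndex = new PerRowVec(new long[]{0, 1, 2});  // per-row, from the reader
        CompositeStructVec metadata =
            new CompositeStructVec(new Vec[]{fileSize, rowIndex});
        for (int r = 0; r < 3; r++) {
            System.out.println(metadata.getChild(0).getLong(r) + ","
                + metadata.getChild(1).getLong(r));  // prints 1024,0 / 1024,1 / 1024,2
        }
    }
}
```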

Why are the changes needed?

_metadata.row_index works on V1 Parquet but was unresolved on V2 Parquet
tables, forcing fallback to the V1 path. This blocks deprecating the V1 file
sources (SPARK-56170).

The vectorized restoration also recovers the performance regression SPARK-56335
introduced for _metadata queries: with SPARK-56335 alone, any _metadata.*
reference disabled columnar reads and fell back to the row path. After this
change, columnar reads work for both constant and generated metadata fields.

Does this PR introduce any user-facing change?

Yes. SELECT _metadata.row_index FROM parquet_table now works against V2
Parquet with the same semantics as V1. Vectorized reads are no longer disabled
when _metadata is referenced.

How was this patch tested?

  • New ParquetMetadataRowIndexV2Suite.
  • Pass GitHub Actions

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code

…Frame API writes and delete FallBackFileSourceV2

Key changes:
- FileWrite: added partitionSchema, customPartitionLocations,
  dynamicPartitionOverwrite, isTruncate; path creation and truncate
  logic; dynamic partition overwrite via FileCommitProtocol
- FileTable: createFileWriteBuilder with SupportsDynamicOverwrite
  and SupportsTruncate; capabilities now include TRUNCATE and
  OVERWRITE_DYNAMIC; fileIndex skips file existence checks when
  userSpecifiedSchema is provided (write path)
- All file format writes (Parquet, ORC, CSV, JSON, Text, Avro) use
  createFileWriteBuilder with partition/truncate/overwrite support
- DataFrameWriter.lookupV2Provider: enabled FileDataSourceV2 for
  non-partitioned Append and Overwrite via df.write.save(path)
- DataFrameWriter.insertInto: V1 fallback for file sources
  (TODO: SPARK-56175)
- DataFrameWriter.saveAsTable: V1 fallback for file sources
  (TODO: SPARK-56230, needs StagingTableCatalog)
- DataSourceV2Utils.getTableProvider: V1 fallback for file sources
  (TODO: SPARK-56175)
- Removed FallBackFileSourceV2 rule
- V2SessionCatalog.createTable: V1 FileFormat data type validation
…catalog table loading, and gate removal

Key changes:
- FileTable extends SupportsPartitionManagement with createPartition,
  dropPartition, listPartitionIdentifiers, partitionSchema
- Partition operations sync to catalog metastore (best-effort)
- V2SessionCatalog.loadTable returns FileTable instead of V1Table,
  sets catalogTable and useCatalogFileIndex on FileTable
- V2SessionCatalog.getDataSourceOptions includes storage.properties
  for proper option propagation (header, ORC bloom filter, etc.)
- V2SessionCatalog.createTable validates data types via FileTable
- FileTable.columns() restores NOT NULL constraints from catalogTable
- FileTable.partitioning() falls back to userSpecifiedPartitioning
  or catalog partition columns
- FileTable.fileIndex uses CatalogFileIndex when catalog has
  registered partitions (custom partition locations)
- FileTable.schema checks column name duplication for non-catalog
  tables only
- DataSourceV2Utils.getTableProvider: removed FileDataSourceV2 gate
- DataFrameWriter.insertInto: enabled V2 for file sources
- DataFrameWriter.saveAsTable: V1 fallback (TODO: SPARK-56230)
- ResolveSessionCatalog: V1 fallback for FileTable-backed commands
  (AnalyzeTable, AnalyzeColumn, TruncateTable, TruncatePartition,
  ShowPartitions, RecoverPartitions, AddPartitions, RenamePartitions,
  DropPartitions, SetTableLocation, CREATE TABLE validation,
  REPLACE TABLE blocking)
- FindDataSourceTable: streaming V1 fallback for FileTable
  (TODO: SPARK-56233)
- DataSource.planForWritingFileFormat: graceful V2 handling

Enable bucketed writes for V2 file tables via catalog BucketSpec.

Key changes:
- FileWrite: add bucketSpec field, use V1WritesUtils.getWriterBucketSpec()
  instead of hardcoded None
- FileTable: createFileWriteBuilder passes catalogTable.bucketSpec
  to the write pipeline
- FileDataSourceV2: getTable uses collect to skip BucketTransform
  (handled via catalogTable.bucketSpec instead)
- FileWriterFactory: use DynamicPartitionDataConcurrentWriter for
  bucketed writes since V2's RequiresDistributionAndOrdering cannot
  express hash-based ordering
- All 6 format Write/Table classes updated with BucketSpec parameter

Note: bucket pruning and bucket join (read-path optimization) are
not included in this patch (tracked under SPARK-56231).
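
The writer-side bucket assignment behind a `BucketSpec` boils down to a non-negative hash modulo per row key. A hedged sketch of that idea only (Spark's actual writer hash function differs, and `bucketId` here is an illustrative name):

```java
// Illustrative bucket assignment: same key always maps to the same bucket file.
public class BucketSketch {
    // Non-negative modulo keeps the id in [0, numBuckets), even for negative hashes.
    static int bucketId(Object key, int numBuckets) {
        int mod = key.hashCode() % numBuckets;
        return mod < 0 ? mod + numBuckets : mod;
    }

    public static void main(String[] args) {
        // Deterministic: repeated calls with the same key agree.
        System.out.println(bucketId("user-42", 8) == bucketId("user-42", 8));  // true
    }
}
```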

Add RepairTableExec to sync filesystem partition directories with
catalog metastore for V2 file tables.

Key changes:
- New RepairTableExec: scans filesystem partitions via
  FileTable.listPartitionIdentifiers(), compares with catalog,
  registers missing partitions and drops orphaned entries
- DataSourceV2Strategy: route RepairTable and RecoverPartitions
  for FileTable to new V2 exec node
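
At its core, the repair step is a two-way set difference between filesystem partitions and catalog partitions. A minimal sketch of that logic (not the real `RepairTableExec`; partition identifiers are modeled as plain strings):

```java
// Illustrative MSCK REPAIR-style diff between disk and catalog partition sets.
import java.util.*;

public class RepairSketch {
    // Returns [toAdd, toDrop]: partitions on disk but not in the catalog,
    // and catalog entries with no backing directory.
    static List<Set<String>> diff(Set<String> filesystem, Set<String> catalog) {
        Set<String> toAdd = new TreeSet<>(filesystem);
        toAdd.removeAll(catalog);          // missing from catalog -> register
        Set<String> toDrop = new TreeSet<>(catalog);
        toDrop.removeAll(filesystem);      // orphaned in catalog -> drop
        return Arrays.asList(toAdd, toDrop);
    }

    public static void main(String[] args) {
        List<Set<String>> d = diff(Set.of("p=1", "p=2", "p=3"), Set.of("p=2", "p=4"));
        System.out.println("add=" + d.get(0) + " drop=" + d.get(1));
        // prints add=[p=1, p=3] drop=[p=4]
    }
}
```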

Implement SupportsOverwriteV2 for V2 file tables to support static
partition overwrite (INSERT OVERWRITE TABLE t PARTITION(p=1) SELECT ...).

Key changes:
- FileTable: replace SupportsTruncate with SupportsOverwriteV2 on
  WriteBuilder, implement overwrite(predicates)
- FileWrite: extend toBatch() to delete only the matching partition
  directory, ordered by partitionSchema
- FileTable.CAPABILITIES: add OVERWRITE_BY_FILTER
- All 6 format Write/Table classes: plumb overwritePredicates parameter

This is a prerequisite for SPARK-56304 (ifPartitionNotExists).
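
The directory targeting for static partition overwrite can be sketched as building a path prefix from the static predicate values in partition-schema order. This is an illustration of the shape only, not the real `FileWrite.toBatch()` code:

```java
// Illustrative: derive the partition directory to truncate from static predicates,
// ordered by the table's partition schema (only a leading prefix can be static).
import java.util.*;

public class OverwriteSketch {
    static String partitionDir(List<String> partitionSchema, Map<String, String> predicates) {
        StringBuilder path = new StringBuilder();
        for (String col : partitionSchema) {
            String v = predicates.get(col);
            if (v == null) break;  // stop at the first non-static partition column
            path.append(col).append('=').append(v).append('/');
        }
        return path.toString();
    }

    public static void main(String[] args) {
        // INSERT OVERWRITE ... PARTITION(p=1) on a table partitioned by (p, q)
        System.out.println(partitionDir(List.of("p", "q"), Map.of("p", "1")));  // p=1/
    }
}
```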

…EAD)

### What changes were proposed in this pull request?

Implements `MicroBatchStream` support for V2 file tables, enabling structured streaming reads through the V2 path instead of falling back to V1 `FileStreamSource`.

Key changes:
- New `FileMicroBatchStream` class implementing `MicroBatchStream`, `SupportsAdmissionControl`, and `SupportsTriggerAvailableNow` — handles file discovery, offset management, rate limiting, and partition planning
- Override `FileScan.toMicroBatchStream()` to return `FileMicroBatchStream`
- Add `withFileIndex` method to `FileScan` and all 6 concrete scans for creating batch-specific scans
- Add `MICRO_BATCH_READ` to `FileTable.CAPABILITIES`
- Update `ResolveDataSource` to allow `FileDataSourceV2` into the V2 streaming path (respects `USE_V1_SOURCE_LIST` for backward compatibility)
- Remove the `FileTable` streaming fallback in `FindDataSourceTable`
- Reuses V1 infrastructure (`FileStreamSourceLog`, `FileStreamSourceOffset`, `SeenFilesMap`) for checkpoint compatibility
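
The admission-control part (rate limiting via something like `maxFilesPerTrigger`) can be modeled as advancing the offset by at most N newly seen files per trigger. An illustrative sketch, not the real `FileMicroBatchStream`:

```java
// Illustrative file admission control for a micro-batch file source.
import java.util.*;

public class AdmissionSketch {
    // Admits up to maxFiles not-yet-seen files and records them as seen.
    static List<String> nextBatch(List<String> discovered, Set<String> seen, int maxFiles) {
        List<String> batch = new ArrayList<>();
        for (String f : discovered) {
            if (seen.contains(f)) continue;       // processed in an earlier batch
            batch.add(f);
            if (batch.size() == maxFiles) break;  // rate limit per trigger
        }
        seen.addAll(batch);
        return batch;
    }

    public static void main(String[] args) {
        Set<String> seen = new HashSet<>();
        List<String> files = List.of("a", "b", "c");
        System.out.println(nextBatch(files, seen, 2));  // [a, b]
        System.out.println(nextBatch(files, seen, 2));  // [c]
    }
}
```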

### Why are the changes needed?

V2 file tables cannot be fully adopted until streaming reads are supported. Without this, the V1 `FileStreamSource` fallback prevents deprecation of V1 file source code.

### Does this PR introduce _any_ user-facing change?

No. By default, `USE_V1_SOURCE_LIST` includes all file formats, so streaming reads still use V1. Users can opt into V2 by clearing the list. Existing checkpoints are compatible.

### How was this patch tested?

New `FileStreamV2ReadSuite` with 6 E2E tests. Existing `FileStreamSourceSuite` (76 tests) passes with V1 forced via `USE_V1_SOURCE_LIST`.

…ITE)

### What changes were proposed in this pull request?

Implements `StreamingWrite` support for V2 file tables, enabling structured streaming writes through the V2 path instead of falling back to V1 `FileStreamSink`.

Key changes:
- New `FileStreamingWrite` class implementing `StreamingWrite` — uses `ManifestFileCommitProtocol` for file commit and `FileStreamSinkLog` for metadata tracking
- New `FileStreamingWriterFactory` bridging `DataWriterFactory` to `StreamingDataWriterFactory`
- Override `FileWrite.toStreaming()` to return `FileStreamingWrite`
- Add `STREAMING_WRITE` to `FileTable.CAPABILITIES`
- Idempotent `commit(epochId, messages)` — skips already-committed batches
- Supports `retention` option for metadata log cleanup (V1 parity)
- Checkpoint compatible with V1 `FileStreamSink` (same `_spark_metadata` format)
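
The idempotent `commit(epochId, messages)` behavior amounts to keeping a commit log keyed by epoch and skipping replays after a restart. A hedged sketch of that contract, not the real `FileStreamingWrite` (which persists the log via `FileStreamSinkLog`):

```java
// Illustrative idempotent epoch commit: replayed batches are no-ops.
import java.util.*;

public class CommitSketch {
    private final Set<Long> committed = new HashSet<>();  // stands in for the metadata log

    // Returns true if this call performed the commit, false if it was a replay.
    boolean commit(long epochId, List<String> files) {
        if (committed.contains(epochId)) {
            return false;  // batch already committed: skip
        }
        // ... record `files` in the durable metadata log here ...
        committed.add(epochId);
        return true;
    }

    public static void main(String[] args) {
        CommitSketch sink = new CommitSketch();
        System.out.println(sink.commit(0, List.of("part-0000")));  // true
        System.out.println(sink.commit(0, List.of("part-0000")));  // false: replay skipped
    }
}
```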

### Why are the changes needed?

V2 file tables cannot be fully adopted until streaming writes are supported. Without this, the V1 `FileStreamSink` fallback prevents deprecation of V1 file source code. Together with SPARK-56232 (streaming read), this completes the streaming support needed for V1 deprecation.

### Does this PR introduce _any_ user-facing change?

No. By default, `USE_V1_SOURCE_LIST` includes all file formats, so streaming writes still use V1. Users can opt into V2 by clearing the list. Existing checkpoints are compatible.

### How was this patch tested?

New `FileStreamV2WriteSuite` with 4 E2E tests. Existing `FileStreamSinkV1Suite` passes. All 108 streaming file tests pass.

Exposes the V1-compatible `_metadata` struct column (`file_path`, `file_name`,
`file_size`, `file_block_start`, `file_block_length`, `file_modification_time`)
on V2 file-based tables so that queries like
`` SELECT _metadata.file_path FROM parquet.`<path>` `` work against the V2 scan
path instead of forcing a V1 fallback.

The wiring is:

* `FileTable` implements `SupportsMetadataColumns.metadataColumns()` and returns
  a single `_metadata` struct column whose fields come from
  `FileFormat.BASE_METADATA_FIELDS`. Formats may extend `metadataSchemaFields`
  later to expose additional fields (e.g., Parquet's `row_index`, tracked in
  SPARK-56371).
* `FileScanBuilder.pruneColumns` intercepts the `_metadata` field from the
  required schema, stores the pruned metadata struct on
  `requestedMetadataFields`, and keeps it out of `readDataSchema` so the
  format-specific reader stays unchanged.
* `FileScan.readSchema` re-exposes `_metadata` as a trailing struct field when
  metadata is requested, so `V2ScanRelationPushDown` can rebind the downstream
  attribute reference back to the scan output.
* A new `MetadataAppendingFilePartitionReaderFactory` wraps the format-specific
  reader factory and appends a single `_metadata` struct value (via
  `JoinedRow` + an inner `GenericInternalRow`) to each row. Columnar reads are
  disabled while metadata is requested since `ConstantColumnVector` is scalar
  and cannot represent a struct column; queries fall back to the row path.
* All six concrete scans (Parquet/ORC/CSV/JSON/Text/Avro) take
  `requestedMetadataFields` as a trailing default-valued case-class parameter
  and call the new `wrapWithMetadataIfNeeded` helper when constructing their
  reader factory. Their `ScanBuilder.build()` implementations pass the field
  through from `FileScanBuilder`.
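
The row-path append described above (one struct value attached to every data row) can be modeled without Spark types; here `JoinedRow` and `GenericInternalRow` are replaced by plain arrays, so this is only an illustration of the shape, not the real reader factory:

```java
// Illustrative per-row metadata append: the constant struct is built once per
// split and the same instance is attached to every row.
import java.util.*;

public class MetadataAppendSketch {
    // Appends one struct-valued column (modeled as an Object[]) to a base row.
    static Object[] withMetadata(Object[] base, Object[] metadataStruct) {
        Object[] out = Arrays.copyOf(base, base.length + 1);
        out[base.length] = metadataStruct;  // shared instance, no per-row copy
        return out;
    }

    public static void main(String[] args) {
        Object[] metadataStruct = {"/data/f0.parquet", 1024L};  // file_path, file_size
        for (Object[] base : List.of(new Object[]{1, "a"}, new Object[]{2, "b"})) {
            System.out.println(Arrays.deepToString(withMetadata(base, metadataStruct)));
            // prints [1, a, [/data/f0.parquet, 1024]] then [2, b, [/data/f0.parquet, 1024]]
        }
    }
}
```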

Parquet's generated `row_index` metadata field is intentionally out of scope;
follow-up work is tracked in SPARK-56371.

Before this change, `_metadata` on a DSv2 file table was unresolvable and the
query fell back to the V1 `FileSourceScanExec` path, which is one of the
remaining blockers for deprecating the V1 file sources (SPARK-56170).

Yes. `_metadata.*` queries now work against the V2 file sources with the same
semantics as V1.

New `FileMetadataColumnsV2Suite` exercises read and projection paths for
Parquet/ORC/JSON/CSV/Text, forcing the V2 path via `useV1SourceList`, and
asserts the metadata struct values against the underlying file's
`java.io.File` stats. All 16 tests pass.

Adds support for the Parquet-specific generated `row_index` field on the V2
`_metadata` struct, completing V1 metadata-column parity for V2 Parquet tables.
This is the follow-up to SPARK-56335 (constant metadata fields).

The implementation also restores vectorized columnar reads for any V2 file
metadata query (SPARK-56335 had to disable them because `ConstantColumnVector`
cannot represent a struct column; the new `CompositeStructColumnVector` lifts
that restriction).

* `CompositeStructColumnVector` (Java) - a minimal struct-typed `ColumnVector`
  that wraps a fixed array of arbitrary child column vectors. Used by the
  metadata wrapper to compose `_metadata` columnar batches whose children are
  a mix of `ConstantColumnVector` (for constant fields like `file_path`) and
  per-row vectors supplied by the format reader (e.g., Parquet's
  `_tmp_metadata_row_index`).

* `ParquetTable.metadataSchemaFields` - overrides the V2 `FileTable` extension
  point to append `ParquetFileFormat.ROW_INDEX_FIELD`, mirroring V1
  `ParquetFileFormat.metadataSchemaFields`.

* `FileScanBuilder.pruneColumns` - now inspects each requested `_metadata`
  sub-field. Constant fields continue to flow through `requestedMetadataFields`
  unchanged; for generated fields (matched via
  `FileSourceGeneratedMetadataStructField`), the corresponding internal column
  (e.g., `_tmp_metadata_row_index`) is appended to `requiredSchema` so the
  format reader populates it. Internal columns are added with `nullable = true`
  so the Parquet reader treats them as synthetic via `missingColumns` /
  `ParquetRowIndexUtil` rather than failing the required-column check.

* `FileScan.readSchema` - hides internal columns from the user-visible scan
  output. They live inside `readDataSchema` for the format reader, but must not
  appear in `readSchema()`: V2's `PushDownUtils.toOutputAttrs` looks each output
  column up by name in the relation output and the internal name is not a real
  column.

* `MetadataAppendingFilePartitionReaderFactory` - rewritten:
  - Row path uses `UnsafeProjection.create` over `BoundReference`s and
    `CreateNamedStruct`. Constant metadata values are baked in as `Literal`s
    for the split; generated values come from `BoundReference`s into the
    base row at the position of the internal column.
  - Columnar path (newly enabled) takes the input `ColumnarBatch`, drops the
    internal columns from the top-level column array, and appends a
    `CompositeStructColumnVector` for `_metadata` whose children are
    `ConstantColumnVector`s (constants) and direct references to the format
    reader's column vectors (generated). Zero-copy.
  - `supportColumnarReads` now delegates to the wrapped factory.

* `wrapWithMetadataIfNeeded` takes the read data schema as a parameter so the
  wrapper can compute the visible/internal column split. ParquetScan passes
  `effectiveReadDataSchema` (variant pushdown aware); other scans pass their
  `readDataSchema`.
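
The visible/internal schema split performed by `pruneColumns` and `readSchema` can be sketched with plain string schemas. Names are illustrative and this is not the real `FileScanBuilder`: constant fields never enter the read schema, generated fields map to an internal column that the reader must populate but that stays hidden from the scan output.

```java
// Illustrative split of requested _metadata fields into reader-populated
// internal columns vs wrapper-populated constants.
import java.util.*;

public class SchemaSplitSketch {
    // Generated metadata fields and their internal per-row columns.
    static final Map<String, String> INTERNAL = Map.of("row_index", "_tmp_metadata_row_index");

    // Returns {requiredSchema (what the format reader produces),
    //          readSchema     (what the scan exposes to the user)}.
    static List<List<String>> split(List<String> dataColumns, List<String> requestedMetadata) {
        List<String> required = new ArrayList<>(dataColumns);
        for (String f : requestedMetadata) {
            String internal = INTERNAL.get(f);
            if (internal != null) {
                required.add(internal);  // generated: the reader must populate it
            }
            // constant fields are filled in by the metadata wrapper, not read from files
        }
        List<String> visible = new ArrayList<>();
        for (String c : required) {
            if (!INTERNAL.containsValue(c)) visible.add(c);  // hide internal columns
        }
        return List.of(required, visible);
    }

    public static void main(String[] args) {
        List<List<String>> s = split(List.of("id", "value"), List.of("file_path", "row_index"));
        System.out.println("required=" + s.get(0));  // required=[id, value, _tmp_metadata_row_index]
        System.out.println("visible=" + s.get(1));   // visible=[id, value]
    }
}
```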

`_metadata.row_index` works on V1 Parquet but was unresolved on V2 Parquet
tables, forcing fallback to the V1 path. This blocks deprecating the V1 file
sources (SPARK-56170). With this change, `SELECT _metadata.row_index FROM t`
works against V2 Parquet with the same semantics as V1.

The vectorized restoration also recovers the performance regression SPARK-56335
introduced for plain `_metadata.file_path`-style queries.

Yes:
1. `_metadata.row_index` is now available on V2 Parquet tables.
2. Queries that select any `_metadata.*` columns on V2 file tables now use
   vectorized reads when the underlying format supports them, instead of
   falling back to the row-based path.

* New `ParquetMetadataRowIndexV2Suite` (8 tests):
  - per-row values via vectorized + row-based readers
  - row_index resets per file across multiple files
  - combined constant + generated metadata fields in one query
  - filter on `_metadata.row_index`
  - metadata-only projection (no data columns)
  - row_index with partitioned table
  - EXPLAIN shows row_index in the MetadataColumns entry

* Existing suites still pass: `FileMetadataColumnsV2Suite` (24, SPARK-56335),
  `FileMetadataStructSuite` (V1, ~100), `MetadataColumnSuite` (~4). 136 tests
  total across these suites.

* Scalastyle: `sql`, `sql/Test`, `avro` clean.

Builds on top of SPARK-56335 (constant metadata column support for V2 file
tables).
@LuciferYang LuciferYang marked this pull request as draft April 13, 2026 07:12
@LuciferYang
Contributor Author

This is the 13th PR for SPARK-56170

