Skip to content

feat: Native Parquet Iceberg Data File Writes In Comet#4487

Open
jordepic wants to merge 3 commits into
apache:mainfrom
jordepic:main
Open

feat: Native Parquet Iceberg Data File Writes In Comet#4487
jordepic wants to merge 3 commits into
apache:mainfrom
jordepic:main

Conversation

@jordepic

Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Closes #4322.

Rationale for this change

Comet, up until this point, has mainly been focused with accelerating reads from iceberg tables. However, a significant resources are being spent across various companies in order to rewrite iceberg data. Large tables need to be compacted to maintain their sort/Z order, and general data pipelines may write significant amounts of iceberg data. Having to do a large transpose between columnar and row-wise data is inefficient, and we'd much prefer to go directly from arrow-based column batches to parquet on disk.

What changes are included in this PR?

This change is split into three parts.

  1. Splitting the existing iceberg V2 spark write command into two: a "writer" spark operator and a "committer" spark operator.
  • This allows the iceberg data file write to be treated identically to a spark V1 parquet writing command, thereby allowing using similar code to handle the operator
  • Without it, both the data file writing and committing would live outside of the scope of spark adaptive query execution operator wrappers - we instead want the write operator to be within AQE so that as the data feeds into it gets re-planned we can determine whether the upstream data of the write is columnar
  1. Determining which operators should be converted to native code
  • This follows a simple philosophy: only convert writes to native code if they produce an "identical" outcome as the Java path
  • Disclaimer: nothing truly produces an identical result because parquet row group flushing is different
  • Besides that though, we can generally convert "normal" writes (parquet, default settings, some flexibility to change other settings as outlined in iceberg-writes.md, no delete files since iceberg-rust doesn't support positional deletes/DVs)
  1. Native iceberg write operator
  • The JVM is responsible for computing all iceberg writing settings and using a protobuf object to pass them to rust
  • Rust uses iceberg-rs in order to write the file and return avro-encoded dummy manifest bytes back to the JVM

How are these changes tested?

This change is tested extensively.

  1. Tests to ensure that iceberg writes are replaced by our "two-operator" structure
  2. Tests to ensure that the comet JVM properly serializes relevant data to protocol buffer form to go to the native layer
  3. Tests to ensure that native writes are only performed under very specific iceberg table properties
  4. Tests to ensure that native writes actually function as expected
  5. Tests to ensure that compaction/sorting/z-ordering can now be fully accelerated with native writing

@jordepic jordepic force-pushed the main branch 4 times, most recently from 7118d08 to 15c8f15 Compare May 29, 2026 15:25

@comphead comphead left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @jordepic this is epic PR, @mbutrovich FYI, my understanding though that writes should be addressed in iceberg-rs to be supported by iceberg community and reusable by other users

@jordepic

Copy link
Copy Markdown
Contributor Author

Thanks @jordepic this is epic PR, @mbutrovich FYI, my understanding though that writes should be addressed in iceberg-rs to be supported by iceberg community and reusable by other users

Can you elaborate on that @comphead ? I use iceberg-rs to perform the writes here!

@jordepic

jordepic commented Jun 9, 2026

Copy link
Copy Markdown
Contributor Author

@mbutrovich sorry for the additional tag here. I've actually been running this locally now and it's really been effective. Would you like me to try and split it a bit further, clean up some comments, etc?

@andygrove

Copy link
Copy Markdown
Member

Thanks for the epic PR @jordepic. I've started looking through it and using AI to help me comprehend and review this, since I am not an Iceberg expert.

I noticed that this PR is creating directly from the main branch of your fork - I'd recommend creating a separate branch.

I like this this functionality is disabled by default so users can opt-in while this goes through more testing.

Could you share some performance numbers and explain how you are benchmarking this?

@jordepic

jordepic commented Jun 18, 2026

Copy link
Copy Markdown
Contributor Author

Thanks, Andy! I'd always be happy to jump on a call to discuss the PR in more detail. I'm not really an expert of anything (and just have medium knowledge in Spark and slightly better for iceberg). Let me see if I can go about getting benchmarks! I find that with wider datasets the difference is more apparent because there is more of a penalty to doing an extra transpose when reading iceberg into rows and writing it back.

Also, I'm starting to break this PR into chunks to make it easier for review so that Matt can review it when he is back from pto. Here is the first link:
#4658

@jordepic

jordepic commented Jun 18, 2026

Copy link
Copy Markdown
Contributor Author

Methodology:

  • Write a 2 million row parquet file, which came to 1.5 GB, use it for all tests
  • Read it in and write it back to iceberg
  • We can read the file with or without comet, and write it with or without comet
  • Use reading in comet but writing with iceberg-java as a control
A baseline B comet read C comet read+write total A→C
100 columns (2M rows) 16.53s 13.83s 4.57s 3.6×
1 column (86.5M rows) 7.55s 7.74s 3.16s 2.4×

Like every great benchmark, this was taken on my Mac with a bunch of other shit running on it (I could tell you what but then I'd have to kill you).

You can see that the full native pipeline is 3.6x as fast as the writing pipeline with 100 columns. You can see that much of the performance gain can be attributed to enabling native writes. However, I postulated that much of that could have been due to having to pivot data. For that reason, I made a subsequent test doing the same thing but on a 1 column parquet file. You can see that the majority of improvement in performance actually comes from enabling writes, even though the relative difference between all JVM and all native is less.

In practice, I've had some phenomenal numbers. I'm working with a 5k column dataset that is virtually unwritable with Comet. Now, I can treat it like any other dataset! I find that datasets with fewer columns may exhibit equal performance to spark, I imagine there may be some weird overhead spinning up comet executors on K8s but I haven't investigated this too much yet. I'll attach an image of a query plan too.

image (17)

Here is a plan that goes through many different join operations and keeps the whole pipeline columnar, including the write at the end! This can be really impactful for ETL jobs and compactions :).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Writes to Apache Iceberg Tables

3 participants