From 96456194150a6ba9e25dbe04b4969c3327eb7a64 Mon Sep 17 00:00:00 2001 From: linfeng Date: Wed, 1 Jul 2026 21:45:53 +0800 Subject: [PATCH 1/3] Iceberg changelog scan field-id projectio --- .../spark/source/AuronIcebergSourceUtil.scala | 17 ++++++++++ .../auron/iceberg/IcebergScanSupport.scala | 33 +++++++++++++++++-- .../AuronIcebergIntegrationSuite.scala | 29 ++++++++++++++++ 3 files changed, 76 insertions(+), 3 deletions(-) diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/AuronIcebergSourceUtil.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/AuronIcebergSourceUtil.scala index c1d0a58d6..50520db62 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/AuronIcebergSourceUtil.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/AuronIcebergSourceUtil.scala @@ -18,6 +18,8 @@ package org.apache.iceberg.spark.source import scala.collection.JavaConverters._ +import org.apache.commons.lang3.reflect.FieldUtils +import org.apache.iceberg.Table import org.apache.iceberg.types.TypeUtil object AuronIcebergSourceUtil { @@ -37,8 +39,23 @@ object AuronIcebergSourceUtil { expectedSchema.columns().asScala.map(field => field.name() -> field.fieldId()).toMap } + def expectedFieldIdsForChangelogScan(scan: AnyRef): Map[String, Int] = { + val expectedSchema = + FieldUtils.readField(scan, "expectedSchema", true).asInstanceOf[org.apache.iceberg.Schema] + expectedSchema.columns().asScala.map(field => field.name() -> field.fieldId()).toMap + } + def detectRenameOrDrop(scan: AnyRef): RenameOrDrop = { val table = asBatchQueryScan(scan).table() + detectRenameOrDrop(table) + } + + def detectRenameOrDropForChangelogScan(scan: AnyRef): RenameOrDrop = { + val table = FieldUtils.readField(scan, "table", true).asInstanceOf[Table] + detectRenameOrDrop(table) + } + + private def detectRenameOrDrop(table: Table): RenameOrDrop = { val currentFields = collectFieldIdToName(table.schema()) table diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala index 3aa85b2de..6651a2743 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala @@ -211,6 +211,31 @@ object IcebergScanSupport extends Logging { } val (fileSchema, partitionSchema) = schemas.get + val fieldIdsByName = + try { + AuronIcebergSourceUtil.expectedFieldIdsForChangelogScan(scan.asInstanceOf[AnyRef]) + } catch { + case NonFatal(t) => + logWarning(s"Failed to inspect Iceberg changelog field ids for $scan.", t) + return None + } + + val renameOrDrop = + try { + AuronIcebergSourceUtil.detectRenameOrDropForChangelogScan(scan.asInstanceOf[AnyRef]) + } catch { + case NonFatal(t) => + logWarning(s"Failed to inspect Iceberg changelog schema history for $scan.", t) + return None + } + assert(!renameOrDrop.nested, "Nested Iceberg rename or drop is not supported.") + + val missingFieldIds = + fileSchema.fields.filterNot(field => fieldIdsByName.contains(field.name)).map(_.name) + assert( + missingFieldIds.isEmpty, + s"Missing Iceberg field ids for columns: ${missingFieldIds.mkString(", ")}") + val partitions = inputPartitions(exec) if (partitions.isEmpty) { return Some( @@ -221,7 +246,7 @@ object IcebergScanSupport extends Logging { fileSchema, partitionSchema, Seq.empty, - Map.empty)) + fieldIdsByName)) } val icebergPartitions = partitions.flatMap(icebergPartition) @@ -256,7 +281,9 @@ object IcebergScanSupport extends Logging { } val format = formats.headOption.getOrElse(FileFormat.PARQUET) - if (format != FileFormat.PARQUET && format != FileFormat.ORC) { + val supportedFormat = + format == FileFormat.PARQUET || (format == FileFormat.ORC && !renameOrDrop.topLevel) + if (!supportedFormat) { return None } @@ -270,7 +297,7 @@ object IcebergScanSupport extends Logging { fileSchema, partitionSchema, pruningPredicates, - Map.empty)) + fieldIdsByName)) } private def supportedSchemas( diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala index 1142b6e03..77ab99d69 100644 --- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala +++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala @@ -475,6 +475,35 @@ class AuronIcebergIntegrationSuite } } + test("iceberg changelog scan reads renamed columns by field id") { + withTable("local.db.t_changelog_rename") { + withTempView("t_changelog_rename_changes") { + sql(""" + |create table local.db.t_changelog_rename (id int, old_name string) + |using iceberg + |tblproperties ('format-version' = '2') + |""".stripMargin) + sql("insert into local.db.t_changelog_rename values (0, 'initial')") + val startSnapshotId = currentSnapshotId("local.db.t_changelog_rename") + sql("insert into local.db.t_changelog_rename values (1, 'before')") + sql("alter table local.db.t_changelog_rename rename column old_name to new_name") + sql("insert into local.db.t_changelog_rename values (2, 'after')") + val endSnapshotId = currentSnapshotId("local.db.t_changelog_rename") + createChangelogView( + "local.db.t_changelog_rename", + "t_changelog_rename_changes", + startSnapshotId, + endSnapshotId) + + checkSparkAnswerAndOperator(""" + |select id, new_name, _change_type, _change_ordinal, _commit_snapshot_id + |from t_changelog_rename_changes + |order by id + |""".stripMargin) + } + } + } + test("iceberg changelog scan falls back when delete changes exist") { withTable("local.db.t_changelog_delete") { withTempView("t_changelog_delete_changes") { From cebbb719d751c0fc9367602ac9a21789363a66b6 Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Thu, 2 Jul 2026 13:44:18 +0800 Subject: [PATCH 2/3] enhance iceberg suites --- .../AuronIcebergIntegrationSuite.scala | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala index 77ab99d69..093712c4c 100644 --- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala +++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala @@ -504,6 +504,36 @@ class AuronIcebergIntegrationSuite } } + test("iceberg changelog scan does not reuse dropped field id for an added column") { + withTable("local.db.t_changelog_drop_add") { + withTempView("t_changelog_drop_add_changes") { + sql(""" + |create table local.db.t_changelog_drop_add (id int, value string) + |using iceberg + |tblproperties ('format-version' = '2') + |""".stripMargin) + sql("insert into local.db.t_changelog_drop_add values (0, 'initial')") + val startSnapshotId = currentSnapshotId("local.db.t_changelog_drop_add") + sql("insert into local.db.t_changelog_drop_add values (1, 'old')") + sql("alter table local.db.t_changelog_drop_add drop column value") + sql("alter table local.db.t_changelog_drop_add add column value string") + sql("insert into local.db.t_changelog_drop_add values (2, 'new')") + val endSnapshotId = currentSnapshotId("local.db.t_changelog_drop_add") + createChangelogView( + "local.db.t_changelog_drop_add", + "t_changelog_drop_add_changes", + startSnapshotId, + endSnapshotId) + + checkSparkAnswerAndOperator(""" + |select id, value, _change_type, _change_ordinal, _commit_snapshot_id + |from t_changelog_drop_add_changes + |order by id + |""".stripMargin) + } + } + } + test("iceberg changelog scan falls back when delete changes exist") { withTable("local.db.t_changelog_delete") { withTempView("t_changelog_delete_changes") { From 276bfb9aa8b08906521923107b4876f2849e932f Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Thu, 2 Jul 2026 13:49:56 +0800 Subject: [PATCH 3/3] lint --- .../org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala index 093712c4c..e6f75e3c7 100644 --- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala +++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala @@ -533,7 +533,7 @@ class AuronIcebergIntegrationSuite } } } - + test("iceberg changelog scan falls back when delete changes exist") { withTable("local.db.t_changelog_delete") { withTempView("t_changelog_delete_changes") {