diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/AuronIcebergSourceUtil.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/AuronIcebergSourceUtil.scala index c1d0a58d6..50520db62 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/AuronIcebergSourceUtil.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/AuronIcebergSourceUtil.scala @@ -18,6 +18,8 @@ package org.apache.iceberg.spark.source import scala.collection.JavaConverters._ +import org.apache.commons.lang3.reflect.FieldUtils +import org.apache.iceberg.Table import org.apache.iceberg.types.TypeUtil object AuronIcebergSourceUtil { @@ -37,8 +39,23 @@ object AuronIcebergSourceUtil { expectedSchema.columns().asScala.map(field => field.name() -> field.fieldId()).toMap } + def expectedFieldIdsForChangelogScan(scan: AnyRef): Map[String, Int] = { + val expectedSchema = + FieldUtils.readField(scan, "expectedSchema", true).asInstanceOf[org.apache.iceberg.Schema] + expectedSchema.columns().asScala.map(field => field.name() -> field.fieldId()).toMap + } + def detectRenameOrDrop(scan: AnyRef): RenameOrDrop = { val table = asBatchQueryScan(scan).table() + detectRenameOrDrop(table) + } + + def detectRenameOrDropForChangelogScan(scan: AnyRef): RenameOrDrop = { + val table = FieldUtils.readField(scan, "table", true).asInstanceOf[Table] + detectRenameOrDrop(table) + } + + private def detectRenameOrDrop(table: Table): RenameOrDrop = { val currentFields = collectFieldIdToName(table.schema()) table diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala index 3aa85b2de..6651a2743 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala @@ -211,6 +211,31 @@ object IcebergScanSupport extends Logging { } val (fileSchema, partitionSchema) = schemas.get + val fieldIdsByName = + try { + AuronIcebergSourceUtil.expectedFieldIdsForChangelogScan(scan.asInstanceOf[AnyRef]) + } catch { + case NonFatal(t) => + logWarning(s"Failed to inspect Iceberg changelog field ids for $scan.", t) + return None + } + + val renameOrDrop = + try { + AuronIcebergSourceUtil.detectRenameOrDropForChangelogScan(scan.asInstanceOf[AnyRef]) + } catch { + case NonFatal(t) => + logWarning(s"Failed to inspect Iceberg changelog schema history for $scan.", t) + return None + } + assert(!renameOrDrop.nested, "Nested Iceberg rename or drop is not supported.") + + val missingFieldIds = + fileSchema.fields.filterNot(field => fieldIdsByName.contains(field.name)).map(_.name) + assert( + missingFieldIds.isEmpty, + s"Missing Iceberg field ids for columns: ${missingFieldIds.mkString(", ")}") + val partitions = inputPartitions(exec) if (partitions.isEmpty) { return Some( @@ -221,7 +246,7 @@ object IcebergScanSupport extends Logging { fileSchema, partitionSchema, Seq.empty, - Map.empty)) + fieldIdsByName)) } val icebergPartitions = partitions.flatMap(icebergPartition) @@ -256,7 +281,9 @@ object IcebergScanSupport extends Logging { } val format = formats.headOption.getOrElse(FileFormat.PARQUET) - if (format != FileFormat.PARQUET && format != FileFormat.ORC) { + val supportedFormat = + format == FileFormat.PARQUET || (format == FileFormat.ORC && !renameOrDrop.topLevel) + if (!supportedFormat) { return None } @@ -270,7 +297,7 @@ object IcebergScanSupport extends Logging { fileSchema, partitionSchema, pruningPredicates, - Map.empty)) + fieldIdsByName)) } private def supportedSchemas( diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala index 1142b6e03..e6f75e3c7 100644 --- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala +++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala @@ -475,6 +475,65 @@ class AuronIcebergIntegrationSuite } } + test("iceberg changelog scan reads renamed columns by field id") { + withTable("local.db.t_changelog_rename") { + withTempView("t_changelog_rename_changes") { + sql(""" + |create table local.db.t_changelog_rename (id int, old_name string) + |using iceberg + |tblproperties ('format-version' = '2') + |""".stripMargin) + sql("insert into local.db.t_changelog_rename values (0, 'initial')") + val startSnapshotId = currentSnapshotId("local.db.t_changelog_rename") + sql("insert into local.db.t_changelog_rename values (1, 'before')") + sql("alter table local.db.t_changelog_rename rename column old_name to new_name") + sql("insert into local.db.t_changelog_rename values (2, 'after')") + val endSnapshotId = currentSnapshotId("local.db.t_changelog_rename") + createChangelogView( + "local.db.t_changelog_rename", + "t_changelog_rename_changes", + startSnapshotId, + endSnapshotId) + + checkSparkAnswerAndOperator(""" + |select id, new_name, _change_type, _change_ordinal, _commit_snapshot_id + |from t_changelog_rename_changes + |order by id + |""".stripMargin) + } + } + } + + test("iceberg changelog scan does not reuse dropped field id for an added column") { + withTable("local.db.t_changelog_drop_add") { + withTempView("t_changelog_drop_add_changes") { + sql(""" + |create table local.db.t_changelog_drop_add (id int, value string) + |using iceberg + |tblproperties ('format-version' = '2') + |""".stripMargin) + sql("insert into local.db.t_changelog_drop_add values (0, 'initial')") + val startSnapshotId = currentSnapshotId("local.db.t_changelog_drop_add") + sql("insert into local.db.t_changelog_drop_add values (1, 'old')") + sql("alter table local.db.t_changelog_drop_add drop column value") + sql("alter table local.db.t_changelog_drop_add add column value string") + sql("insert into local.db.t_changelog_drop_add values (2, 'new')") + val endSnapshotId = currentSnapshotId("local.db.t_changelog_drop_add") + createChangelogView( + "local.db.t_changelog_drop_add", + "t_changelog_drop_add_changes", + startSnapshotId, + endSnapshotId) + + checkSparkAnswerAndOperator(""" + |select id, value, _change_type, _change_ordinal, _commit_snapshot_id + |from t_changelog_drop_add_changes + |order by id + |""".stripMargin) + } + } + } + test("iceberg changelog scan falls back when delete changes exist") { withTable("local.db.t_changelog_delete") { withTempView("t_changelog_delete_changes") {