Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package org.apache.iceberg.spark.source

import scala.collection.JavaConverters._

import org.apache.commons.lang3.reflect.FieldUtils
import org.apache.iceberg.Table
import org.apache.iceberg.types.TypeUtil

object AuronIcebergSourceUtil {
Expand All @@ -37,8 +39,23 @@ object AuronIcebergSourceUtil {
expectedSchema.columns().asScala.map(field => field.name() -> field.fieldId()).toMap
}

/**
 * Builds a column-name -> Iceberg field-id map for a changelog scan.
 *
 * The schema is obtained reflectively: the field named "expectedSchema" is read from `scan`
 * with forced access and cast to `org.apache.iceberg.Schema`. The field name is a plain string
 * with no compile-time check, so it must be re-verified whenever the Iceberg dependency is
 * upgraded.
 *
 * @param scan the changelog scan object to inspect (presumably Iceberg's SparkChangelogScan --
 *             TODO confirm against the caller)
 * @return one entry per column returned by `expectedSchema.columns()`, keyed by column name
 */
def expectedFieldIdsForChangelogScan(scan: AnyRef): Map[String, Int] = {
val expectedSchema =
FieldUtils.readField(scan, "expectedSchema", true).asInstanceOf[org.apache.iceberg.Schema]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file-scan path reads the schema and table through the public expectedSchema() / table() methods (:38, :49), while the changelog path reaches them by string-keyed reflection into SparkChangelogScan (FieldUtils.readField(scan, "expectedSchema", true) here and "table" at :54). Since those are Iceberg-internal field names with no compile-time check, an Iceberg version that renames or restructures the field would slip through silently. The caller does guard both reads with try/NonFatal → return None (IcebergScanSupport.scala:214-230), so the worst case is a quiet fallback rather than a crash — which is a good safety net. Does SparkChangelogScan expose any public accessor we could use instead, or is reflection the only door in? If it's the only option, would a one-line note on the field-name assumption help whoever does the next Iceberg bump?

expectedSchema.columns().asScala.map(field => field.name() -> field.fieldId()).toMap
}

/**
 * Detects column renames/drops for a batch query scan.
 *
 * Converts `scan` with `asBatchQueryScan`, takes its `table()`, and delegates to the
 * `Table`-based overload of `detectRenameOrDrop`.
 *
 * @param scan the scan object to inspect
 * @return the rename/drop summary computed for the scan's table
 */
def detectRenameOrDrop(scan: AnyRef): RenameOrDrop =
  detectRenameOrDrop(asBatchQueryScan(scan).table())

/**
 * Detects column renames/drops for a changelog scan.
 *
 * The table is obtained reflectively: the field named "table" is read from `scan` with forced
 * access and cast to `Table`. The field name is a plain string with no compile-time check, so
 * it must be re-verified whenever the Iceberg dependency is upgraded.
 *
 * @param scan the changelog scan object to inspect
 * @return the rename/drop summary computed for the scan's table
 */
def detectRenameOrDropForChangelogScan(scan: AnyRef): RenameOrDrop =
  detectRenameOrDrop(FieldUtils.readField(scan, "table", true).asInstanceOf[Table])

private def detectRenameOrDrop(table: Table): RenameOrDrop = {
val currentFields = collectFieldIdToName(table.schema())

table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,31 @@ object IcebergScanSupport extends Logging {
}
val (fileSchema, partitionSchema) = schemas.get

val fieldIdsByName =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block — field-id extraction, rename/drop detection, the two asserts, and the supportedFormat line at :284 — now repeats about 25 lines from planFileScan (:121-144, :187-191), differing only in which util method each calls. Worth noting this bug itself came from the two paths drifting apart: the changelog path shipped with Map.empty while the file-scan path already passed field-ids. Would it be worth factoring the shared portion into one helper parameterized by the two scan → … lookups, so a future field-id change can't miss one path again? Genuinely open — if you'd rather keep the two paths explicit for readability, that's a reasonable call too.

try {
AuronIcebergSourceUtil.expectedFieldIdsForChangelogScan(scan.asInstanceOf[AnyRef])
} catch {
case NonFatal(t) =>
logWarning(s"Failed to inspect Iceberg changelog field ids for $scan.", t)
return None
}

val renameOrDrop =
try {
AuronIcebergSourceUtil.detectRenameOrDropForChangelogScan(scan.asInstanceOf[AnyRef])
} catch {
case NonFatal(t) =>
logWarning(s"Failed to inspect Iceberg changelog schema history for $scan.", t)
return None
}
assert(!renameOrDrop.nested, "Nested Iceberg rename or drop is not supported.")

val missingFieldIds =
fileSchema.fields.filterNot(field => fieldIdsByName.contains(field.name)).map(_.name)
assert(
missingFieldIds.isEmpty,
s"Missing Iceberg field ids for columns: ${missingFieldIds.mkString(", ")}")

val partitions = inputPartitions(exec)
if (partitions.isEmpty) {
return Some(
Expand All @@ -221,7 +246,7 @@ object IcebergScanSupport extends Logging {
fileSchema,
partitionSchema,
Seq.empty,
Map.empty))
fieldIdsByName))
}

val icebergPartitions = partitions.flatMap(icebergPartition)
Expand Down Expand Up @@ -256,7 +281,9 @@ object IcebergScanSupport extends Logging {
}

val format = formats.headOption.getOrElse(FileFormat.PARQUET)
if (format != FileFormat.PARQUET && format != FileFormat.ORC) {
val supportedFormat =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small consistency thing: the file-scan copy of this supportedFormat line has a comment just above it (:185-186) explaining why a top-level rename/drop makes older ORC files unsafe for native matching, but that comment didn't come across to the changelog copy. Worth mirroring it here so the ORC branch reads the same on both paths?

format == FileFormat.PARQUET || (format == FileFormat.ORC && !renameOrDrop.topLevel)
if (!supportedFormat) {
return None
}

Expand All @@ -270,7 +297,7 @@ object IcebergScanSupport extends Logging {
fileSchema,
partitionSchema,
pruningPredicates,
Map.empty))
fieldIdsByName))
}

private def supportedSchemas(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,65 @@ class AuronIcebergIntegrationSuite
}
}

// Changelog scan across a column rename: data is written under `old_name`, the column is
// renamed to `new_name`, more data is written, and the changelog view is then queried through
// the new name.
test("iceberg changelog scan reads renamed columns by field id") {
  withTable("local.db.t_changelog_rename") {
    withTempView("t_changelog_rename_changes") {
      val createTableDdl =
        """
          |create table local.db.t_changelog_rename (id int, old_name string)
          |using iceberg
          |tblproperties ('format-version' = '2')
          |""".stripMargin
      sql(createTableDdl)

      // The snapshot produced by this first insert is used as the start of the changelog range.
      sql("insert into local.db.t_changelog_rename values (0, 'initial')")
      val fromSnapshot = currentSnapshotId("local.db.t_changelog_rename")

      // One insert before the rename and one after it, so the range spans both column names.
      sql("insert into local.db.t_changelog_rename values (1, 'before')")
      sql("alter table local.db.t_changelog_rename rename column old_name to new_name")
      sql("insert into local.db.t_changelog_rename values (2, 'after')")
      val toSnapshot = currentSnapshotId("local.db.t_changelog_rename")

      createChangelogView(
        "local.db.t_changelog_rename",
        "t_changelog_rename_changes",
        fromSnapshot,
        toSnapshot)

      val changelogQuery =
        """
          |select id, new_name, _change_type, _change_ordinal, _commit_snapshot_id
          |from t_changelog_rename_changes
          |order by id
          |""".stripMargin
      checkSparkAnswerAndOperator(changelogQuery)
    }
  }
}

// Changelog scan across a drop + re-add of a same-named column: `value` is written, dropped,
// added again with the same name, and written once more before the changelog view is queried.
test("iceberg changelog scan does not reuse dropped field id for an added column") {
  withTable("local.db.t_changelog_drop_add") {
    withTempView("t_changelog_drop_add_changes") {
      val createTableDdl =
        """
          |create table local.db.t_changelog_drop_add (id int, value string)
          |using iceberg
          |tblproperties ('format-version' = '2')
          |""".stripMargin
      sql(createTableDdl)

      // The snapshot produced by this first insert is used as the start of the changelog range.
      sql("insert into local.db.t_changelog_drop_add values (0, 'initial')")
      val fromSnapshot = currentSnapshotId("local.db.t_changelog_drop_add")

      // One insert under the original column, then drop/re-add, then one insert under the
      // re-added column.
      sql("insert into local.db.t_changelog_drop_add values (1, 'old')")
      sql("alter table local.db.t_changelog_drop_add drop column value")
      sql("alter table local.db.t_changelog_drop_add add column value string")
      sql("insert into local.db.t_changelog_drop_add values (2, 'new')")
      val toSnapshot = currentSnapshotId("local.db.t_changelog_drop_add")

      createChangelogView(
        "local.db.t_changelog_drop_add",
        "t_changelog_drop_add_changes",
        fromSnapshot,
        toSnapshot)

      val changelogQuery =
        """
          |select id, value, _change_type, _change_ordinal, _commit_snapshot_id
          |from t_changelog_drop_add_changes
          |order by id
          |""".stripMargin
      checkSparkAnswerAndOperator(changelogQuery)
    }
  }
}

test("iceberg changelog scan falls back when delete changes exist") {
withTable("local.db.t_changelog_delete") {
withTempView("t_changelog_delete_changes") {
Expand Down
Loading