[AURON #2375] Fix Iceberg changelog scan field-id projection #2376
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -211,6 +211,31 @@ object IcebergScanSupport extends Logging { | |
| } | ||
| val (fileSchema, partitionSchema) = schemas.get | ||
|
|
||
| val fieldIdsByName = | ||
|
Contributor
This block: field-id extraction, rename/drop detection, the two asserts, and the |
||
| try { | ||
| AuronIcebergSourceUtil.expectedFieldIdsForChangelogScan(scan.asInstanceOf[AnyRef]) | ||
| } catch { | ||
| case NonFatal(t) => | ||
| logWarning(s"Failed to inspect Iceberg changelog field ids for $scan.", t) | ||
| return None | ||
| } | ||
|
|
||
| val renameOrDrop = | ||
| try { | ||
| AuronIcebergSourceUtil.detectRenameOrDropForChangelogScan(scan.asInstanceOf[AnyRef]) | ||
| } catch { | ||
| case NonFatal(t) => | ||
| logWarning(s"Failed to inspect Iceberg changelog schema history for $scan.", t) | ||
| return None | ||
| } | ||
| assert(!renameOrDrop.nested, "Nested Iceberg rename or drop is not supported.") | ||
|
|
||
| val missingFieldIds = | ||
| fileSchema.fields.filterNot(field => fieldIdsByName.contains(field.name)).map(_.name) | ||
| assert( | ||
| missingFieldIds.isEmpty, | ||
| s"Missing Iceberg field ids for columns: ${missingFieldIds.mkString(", ")}") | ||
|
|
||
| val partitions = inputPartitions(exec) | ||
| if (partitions.isEmpty) { | ||
| return Some( | ||
|
|
@@ -221,7 +246,7 @@ object IcebergScanSupport extends Logging { | |
| fileSchema, | ||
| partitionSchema, | ||
| Seq.empty, | ||
| Map.empty)) | ||
| fieldIdsByName)) | ||
| } | ||
|
|
||
| val icebergPartitions = partitions.flatMap(icebergPartition) | ||
|
|
@@ -256,7 +281,9 @@ object IcebergScanSupport extends Logging { | |
| } | ||
|
|
||
| val format = formats.headOption.getOrElse(FileFormat.PARQUET) | ||
| if (format != FileFormat.PARQUET && format != FileFormat.ORC) { | ||
| val supportedFormat = | ||
|
Contributor
Small consistency thing: the file-scan copy of this |
||
| format == FileFormat.PARQUET || (format == FileFormat.ORC && !renameOrDrop.topLevel) | ||
| if (!supportedFormat) { | ||
| return None | ||
| } | ||
|
|
||
|
|
@@ -270,7 +297,7 @@ object IcebergScanSupport extends Logging { | |
| fileSchema, | ||
| partitionSchema, | ||
| pruningPredicates, | ||
| Map.empty)) | ||
| fieldIdsByName)) | ||
| } | ||
|
|
||
| private def supportedSchemas( | ||
|
|
||
The file-scan path reads the schema and table through the public `expectedSchema()`/`table()` methods (`:38`, `:49`), while the changelog path reaches them by string-keyed reflection into `SparkChangelogScan` (`FieldUtils.readField(scan, "expectedSchema", true)` here and `"table"` at `:54`). Since those are Iceberg-internal field names with no compile-time check, an Iceberg version that renames or restructures the field would slip through silently. The caller does guard both reads with `try`/`NonFatal` → `return None` (`IcebergScanSupport.scala:214-230`), so the worst case is a quiet fallback rather than a crash, which is a good safety net. Does `SparkChangelogScan` expose any public accessor we could use instead, or is reflection the only door in? If it's the only option, would a one-line note on the field-name assumption help whoever does the next Iceberg bump?
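
If reflection does turn out to be the only way in, something along these lines might make the field-name assumption explicit. This is only a sketch: `FakeChangelogScan` and `ReflectiveFieldReader` are hypothetical names, not code from this PR or from Iceberg, and the sketch uses plain `java.lang.reflect` instead of the `FieldUtils.readField` call so that it stays self-contained.

```scala
import scala.util.control.NonFatal

// Hypothetical stand-in for an Iceberg-internal scan class.
// This is NOT the real SparkChangelogScan; it only has a private field to read.
final class FakeChangelogScan(schema: String) {
  private val expectedSchema: String = schema
  override def toString: String = s"FakeChangelogScan($expectedSchema)"
}

object ReflectiveFieldReader {
  // ASSUMPTION: this private field name matches the Iceberg version in use.
  // It has no compile-time check, so revisit it on every Iceberg upgrade.
  val ExpectedSchemaField: String = "expectedSchema"

  // Reads a private field declared directly on the target's class.
  // Unlike FieldUtils.readField, this does not search superclasses.
  // Any non-fatal failure (for example a renamed field) becomes None,
  // so the caller can fall back quietly instead of crashing.
  def readPrivateField(target: AnyRef, name: String): Option[AnyRef] =
    try {
      val field = target.getClass.getDeclaredField(name)
      field.setAccessible(true)
      Option(field.get(target))
    } catch {
      case NonFatal(_) => None
    }
}
```

Keeping the field name in one named constant with a comment would give whoever does the next Iceberg bump a single place to check, while the `Option` return keeps the same quiet-fallback behaviour the caller already relies on.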