From 78e7bb1fa02ec5c8206ff6bbf6523718c5f66ed6 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 30 Jun 2026 22:45:23 +0700 Subject: [PATCH] fix: schema conflict scenario: field exists in schema `level` with data type `float64` subsequent batch: there are multiple events in the same ingestion batch, and a field exists with different data types in the same batch eg. ``` [{ "level":false }, { "level":"false" }, { "level":0 }] ``` the ingestion fails with schema conflict error fix: apply correct data type to each value so "level":false becomes level_bool, "level":"false" becomes level_utf8 and "level":0 remains level --- src/event/format/json.rs | 7 +++- src/event/format/mod.rs | 75 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 76 insertions(+), 6 deletions(-) diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 02c9a51aa..5fd8b8f9a 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -109,7 +109,12 @@ impl EventFormat for Event { // If there are conflicts, rename the fields in JSON values let value_arr = if !conflicts.is_empty() { - rename_conflicting_fields_in_json(value_arr, &conflicts) + rename_conflicting_fields_in_json( + value_arr, + &conflicts, + stream_schema, + schema_version, + ) } else { value_arr }; diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index ae015a8e9..fbd1b4cae 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -528,9 +528,21 @@ pub fn detect_schema_conflicts( /// Renames fields in JSON values according to the provided field mapping. /// Used to resolve schema conflicts by renaming fields with conflicting types. +/// Renames conflicting JSON fields to typed sibling columns, but only for +/// records whose value is incompatible with the existing field type. +/// +/// `field_mapping` is produced by `detect_schema_conflicts` and maps an +/// original field name to its typed sibling name, e.g. `level -> level_utf8`. +/// The mapping applies per value, not blindly to every record in the batch: +/// if existing schema has `level: Float64`, then `{ "level": 30 }` stays as +/// `level`, while `{ "level": "info" }` becomes `{ "level_utf8": "info" }`. +/// This avoids routing compatible values into the sibling column and then +/// failing validation, e.g. `level_utf8` expecting `Utf8` but receiving `30`. pub fn rename_conflicting_fields_in_json( values: Vec, field_mapping: &HashMap, + existing_schema: &HashMap>, + schema_version: SchemaVersion, ) -> Vec { if field_mapping.is_empty() { return values; @@ -544,7 +556,13 @@ pub fn rename_conflicting_fields_in_json( .into_iter() .map(|(key, val)| { if let Some(new_key) = field_mapping.get(&key) { - (new_key.clone(), val) + if existing_schema.get(&key).is_some_and(|field| { + value_compatible_with_type(&val, field.data_type(), schema_version) + }) { + (key, val) + } else { + (new_key.clone(), val) + } } else { (key, val) } @@ -776,8 +794,8 @@ mod tests { #[test] fn test_rename_conflicting_fields_in_json() { let values = vec![ - json!({"body_timestamp": "2025-01-01T00:00:00Z", "message": "hello"}), - json!({"body_timestamp": "2025-01-02T00:00:00Z", "message": "world"}), + json!({"body_timestamp": "not a timestamp", "message": "hello"}), + json!({"body_timestamp": "also not a timestamp", "message": "world"}), ]; let mut field_mapping = HashMap::new(); @@ -786,7 +804,22 @@ mod tests { "body_timestamp_timestamp_ms".to_string(), ); - let renamed = rename_conflicting_fields_in_json(values, &field_mapping); + let mut existing_schema: HashMap> = HashMap::new(); + existing_schema.insert( + "body_timestamp".to_string(), + Arc::new(Field::new( + "body_timestamp", + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + )), + ); + + let renamed = rename_conflicting_fields_in_json( + values, + &field_mapping, + &existing_schema, + SchemaVersion::V1, + ); assert_eq!(renamed.len(), 2); assert!(renamed[0].get("body_timestamp_timestamp_ms").is_some()); @@ -799,12 +832,44 @@ mod tests { let values = vec![json!({"body_timestamp": "2025-01-01T00:00:00Z"})]; let field_mapping = HashMap::new(); - let renamed = rename_conflicting_fields_in_json(values.clone(), &field_mapping); + let existing_schema = HashMap::new(); + let renamed = rename_conflicting_fields_in_json( + values.clone(), + &field_mapping, + &existing_schema, + SchemaVersion::V1, + ); // Should return values unchanged assert_eq!(renamed, values); } + #[test] + fn test_rename_conflicting_fields_in_json_only_incompatible_records() { + let values = vec![json!({"level": 30}), json!({"level": "info"})]; + + let mut field_mapping = HashMap::new(); + field_mapping.insert("level".to_string(), "level_utf8".to_string()); + + let mut existing_schema: HashMap> = HashMap::new(); + existing_schema.insert( + "level".to_string(), + Arc::new(Field::new("level", DataType::Float64, true)), + ); + + let renamed = rename_conflicting_fields_in_json( + values, + &field_mapping, + &existing_schema, + SchemaVersion::V1, + ); + + assert_eq!(renamed[0].get("level"), Some(&json!(30))); + assert!(renamed[0].get("level_utf8").is_none()); + assert_eq!(renamed[1].get("level_utf8"), Some(&json!("info"))); + assert!(renamed[1].get("level").is_none()); + } + #[test] fn test_detect_schema_conflicts_timestamp_vs_utf8() { // Existing schema has body_timestamp as Timestamp