Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/event/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,12 @@ impl EventFormat for Event {

// If there are conflicts, rename the fields in JSON values
let value_arr = if !conflicts.is_empty() {
rename_conflicting_fields_in_json(value_arr, &conflicts)
rename_conflicting_fields_in_json(
value_arr,
&conflicts,
stream_schema,
schema_version,
)
} else {
value_arr
};
Expand Down
75 changes: 70 additions & 5 deletions src/event/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,9 +528,21 @@ pub fn detect_schema_conflicts(

/// Renames fields in JSON values according to the provided field mapping.
/// Used to resolve schema conflicts by renaming fields with conflicting types.
/// Renames conflicting JSON fields to typed sibling columns, but only for
/// records whose value is incompatible with the existing field type.
///
/// `field_mapping` is produced by `detect_schema_conflicts` and maps an
/// original field name to its typed sibling name, e.g. `level -> level_utf8`.
/// The mapping applies per value, not blindly to every record in the batch:
/// if existing schema has `level: Float64`, then `{ "level": 30 }` stays as
/// `level`, while `{ "level": "info" }` becomes `{ "level_utf8": "info" }`.
/// This avoids routing compatible values into the sibling column and then
/// failing validation, e.g. `level_utf8` expecting `Utf8` but receiving `30`.
pub fn rename_conflicting_fields_in_json(
values: Vec<Value>,
field_mapping: &HashMap<String, String>,
existing_schema: &HashMap<String, Arc<Field>>,
schema_version: SchemaVersion,
) -> Vec<Value> {
if field_mapping.is_empty() {
return values;
Expand All @@ -544,7 +556,13 @@ pub fn rename_conflicting_fields_in_json(
.into_iter()
.map(|(key, val)| {
if let Some(new_key) = field_mapping.get(&key) {
(new_key.clone(), val)
if existing_schema.get(&key).is_some_and(|field| {
value_compatible_with_type(&val, field.data_type(), schema_version)
}) {
(key, val)
} else {
(new_key.clone(), val)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
} else {
(key, val)
}
Expand Down Expand Up @@ -776,8 +794,8 @@ mod tests {
#[test]
fn test_rename_conflicting_fields_in_json() {
let values = vec![
json!({"body_timestamp": "2025-01-01T00:00:00Z", "message": "hello"}),
json!({"body_timestamp": "2025-01-02T00:00:00Z", "message": "world"}),
json!({"body_timestamp": "not a timestamp", "message": "hello"}),
json!({"body_timestamp": "also not a timestamp", "message": "world"}),
];

let mut field_mapping = HashMap::new();
Expand All @@ -786,7 +804,22 @@ mod tests {
"body_timestamp_timestamp_ms".to_string(),
);

let renamed = rename_conflicting_fields_in_json(values, &field_mapping);
let mut existing_schema: HashMap<String, Arc<Field>> = HashMap::new();
existing_schema.insert(
"body_timestamp".to_string(),
Arc::new(Field::new(
"body_timestamp",
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
)),
);

let renamed = rename_conflicting_fields_in_json(
values,
&field_mapping,
&existing_schema,
SchemaVersion::V1,
);

assert_eq!(renamed.len(), 2);
assert!(renamed[0].get("body_timestamp_timestamp_ms").is_some());
Expand All @@ -799,12 +832,44 @@ mod tests {
let values = vec![json!({"body_timestamp": "2025-01-01T00:00:00Z"})];

let field_mapping = HashMap::new();
let renamed = rename_conflicting_fields_in_json(values.clone(), &field_mapping);
let existing_schema = HashMap::new();
let renamed = rename_conflicting_fields_in_json(
values.clone(),
&field_mapping,
&existing_schema,
SchemaVersion::V1,
);

// Should return values unchanged
assert_eq!(renamed, values);
}

#[test]
fn test_rename_conflicting_fields_in_json_only_incompatible_records() {
let values = vec![json!({"level": 30}), json!({"level": "info"})];

let mut field_mapping = HashMap::new();
field_mapping.insert("level".to_string(), "level_utf8".to_string());

let mut existing_schema: HashMap<String, Arc<Field>> = HashMap::new();
existing_schema.insert(
"level".to_string(),
Arc::new(Field::new("level", DataType::Float64, true)),
);

let renamed = rename_conflicting_fields_in_json(
values,
&field_mapping,
&existing_schema,
SchemaVersion::V1,
);

assert_eq!(renamed[0].get("level"), Some(&json!(30)));
assert!(renamed[0].get("level_utf8").is_none());
assert_eq!(renamed[1].get("level_utf8"), Some(&json!("info")));
assert!(renamed[1].get("level").is_none());
}

#[test]
fn test_detect_schema_conflicts_timestamp_vs_utf8() {
// Existing schema has body_timestamp as Timestamp
Expand Down
Loading