diff --git a/native-engine/datafusion-ext-plans/src/orc_exec.rs b/native-engine/datafusion-ext-plans/src/orc_exec.rs index 8f78e3427..2e7b55096 100644 --- a/native-engine/datafusion-ext-plans/src/orc_exec.rs +++ b/native-engine/datafusion-ext-plans/src/orc_exec.rs @@ -473,33 +473,44 @@ fn collect_and_predicates( /// Recursively collect all OR sub-conditions and flatten nested OR /// structures. +/// +/// Returns `false` if any disjunct cannot be converted. OR pushdown must be +/// all-or-nothing: a pushed predicate is only used to skip row groups whose +/// statistics prove no row can match, so it must be implied by the true filter +/// (`true_filter => pushed`). Dropping a disjunct narrows the OR into a subset +/// of the true filter, which makes the reader skip row groups that actually +/// contain matching rows. (Dropping AND conjuncts only loosens the predicate, +/// so that stays safe.) fn collect_or_predicates( expr: &Arc, schema: &SchemaRef, predicates: &mut Vec, -) { +) -> bool { // Handle short-circuit OR expression (SCOrExpr) if let Some(sc_or) = expr.as_any().downcast_ref::() { // Recursively collect OR sub-conditions from both sides - collect_or_predicates(&sc_or.left, schema, predicates); - collect_or_predicates(&sc_or.right, schema, predicates); - return; + return collect_or_predicates(&sc_or.left, schema, predicates) + && collect_or_predicates(&sc_or.right, schema, predicates); } // Handle BinaryExpr with OR operator if let Some(binary) = expr.as_any().downcast_ref::() { if matches!(binary.op(), Operator::Or) { // Recursively collect OR sub-conditions from both sides - collect_or_predicates(binary.left(), schema, predicates); - collect_or_predicates(binary.right(), schema, predicates); - return; + return collect_or_predicates(binary.left(), schema, predicates) + && collect_or_predicates(binary.right(), schema, predicates); } } - // Not an OR expression, convert the whole expression - // (could be AND, comparison, IS NULL, etc.) - if let Some(pred) = convert_expr_to_orc(expr, schema) { - predicates.push(pred); + // Not an OR expression, convert the whole expression as a single disjunct + // (could be AND, comparison, IS NULL, etc.). If it cannot be converted, the + // entire OR is unpushable. + match convert_expr_to_orc(expr, schema) { + Some(pred) => { + predicates.push(pred); + true + } + None => false, } } @@ -531,7 +542,11 @@ fn convert_expr_to_orc( // Handle top-level short-circuit OR expression (SCOrExpr) if let Some(_sc_or) = expr.as_any().downcast_ref::() { let mut predicates = Vec::new(); - collect_or_predicates(expr, schema, &mut predicates); + if !collect_or_predicates(expr, schema, &mut predicates) { + // an OR disjunct could not be converted: pushing a narrowed + // predicate would skip row groups holding matching rows + return None; + } if predicates.is_empty() { return None; @@ -574,7 +589,11 @@ fn convert_expr_to_orc( // Handle top-level OR expression (BinaryExpr with OR operator) if matches!(binary.op(), Operator::Or) { let mut predicates = Vec::new(); - collect_or_predicates(expr, schema, &mut predicates); + if !collect_or_predicates(expr, schema, &mut predicates) { + // an OR disjunct could not be converted: pushing a narrowed + // predicate would skip row groups holding matching rows + return None; + } if predicates.is_empty() { return None; @@ -1122,6 +1141,115 @@ mod tests { assert_eq!(condition_count, 3); } + #[test] + fn test_or_with_unconvertible_disjunct_not_pushed() { + let schema = create_test_schema(); + // id = 1 OR (id = age) + // The second disjunct compares two columns and cannot be converted. + // The whole OR must NOT push down a narrowed predicate, otherwise the + // reader would skip row groups that only satisfy the dropped disjunct. + let id = Arc::new(Column::new("id", 0)); + let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(1)))); + let conv = Arc::new(BinaryExpr::new(id.clone(), Operator::Eq, lit1)); + + let age = Arc::new(Column::new("age", 2)); + let unconv = Arc::new(BinaryExpr::new(id, Operator::Eq, age)); + + let or_expr = Arc::new(BinaryExpr::new(conv, Operator::Or, unconv)); + + let result = convert_predicate_to_orc(Some(or_expr), &schema); + assert!( + result.is_none(), + "OR with an unconvertible disjunct must not push down, got: {result:?}" + ); + } + + #[test] + fn test_or_with_unconvertible_and_branch_not_pushed() { + let schema = create_test_schema(); + // (id = age AND age = score) OR (id = 2) + // The first disjunct is an AND of two column-column comparisons, neither + // of which converts, so the AND yields no predicate. That disjunct must + // poison the whole OR rather than being silently dropped. This mirrors + // the production bug where `cast(type)=2 AND cast(gjo)=1` was dropped. + let id = Arc::new(Column::new("id", 0)); + let age = Arc::new(Column::new("age", 2)); + let id_eq_age = Arc::new(BinaryExpr::new(id.clone(), Operator::Eq, age.clone())); + let age_eq_score = Arc::new(BinaryExpr::new( + age, + Operator::Eq, + Arc::new(Column::new("score", 3)), + )); + let and_branch = Arc::new(BinaryExpr::new(id_eq_age, Operator::And, age_eq_score)); + + let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(2)))); + let id_eq_2 = Arc::new(BinaryExpr::new(id, Operator::Eq, lit2)); + + let or_expr = Arc::new(BinaryExpr::new(and_branch, Operator::Or, id_eq_2)); + + let result = convert_predicate_to_orc(Some(or_expr), &schema); + assert!( + result.is_none(), + "OR whose disjunct is a fully-unconvertible AND must not push down, got: {result:?}" + ); + } + + #[test] + fn test_and_keeps_convertible_conjunct_when_or_unconvertible() { + let schema = create_test_schema(); + // name = "x" AND (id = 1 OR id = age) + // The OR is unconvertible, but it is an AND conjunct. Dropping it only + // loosens the pushed predicate, so the convertible name = "x" conjunct + // must still push down. + let name = Arc::new(Column::new("name", 1)); + let name_lit = Arc::new(Literal::new(ScalarValue::Utf8(Some("x".to_string())))); + let name_eq = Arc::new(BinaryExpr::new(name, Operator::Eq, name_lit)); + + let id = Arc::new(Column::new("id", 0)); + let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(1)))); + let id_eq_1 = Arc::new(BinaryExpr::new(id.clone(), Operator::Eq, lit1)); + let age = Arc::new(Column::new("age", 2)); + let id_eq_age = Arc::new(BinaryExpr::new(id, Operator::Eq, age)); + let or_expr = Arc::new(BinaryExpr::new(id_eq_1, Operator::Or, id_eq_age)); + + let and_expr = Arc::new(BinaryExpr::new(name_eq, Operator::And, or_expr)); + + let result = convert_predicate_to_orc(Some(and_expr), &schema); + assert!(result.is_some()); + let debug_str = format!("{:?}", result.expect("Expected valid ORC predicate")); + // Only the name = "x" conjunct survives; the unconvertible OR is dropped. + assert!( + debug_str.contains("\"name\"") && debug_str.contains("Equal"), + "Expected name = \"x\" to push down, got: {debug_str}" + ); + assert!( + !debug_str.contains("Or("), + "Unconvertible OR must not appear in the pushed predicate, got: {debug_str}" + ); + } + + #[test] + fn test_sc_or_with_unconvertible_disjunct_not_pushed() { + let schema = create_test_schema(); + // Short-circuit OR: id = 1 OR (id = age) + // Same invariant as the BinaryExpr::Or case, but exercising the + // SCOrExpr path that the fix also updates. + let id = Arc::new(Column::new("id", 0)); + let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(1)))); + let conv = Arc::new(BinaryExpr::new(id.clone(), Operator::Eq, lit1)); + + let age = Arc::new(Column::new("age", 2)); + let unconv = Arc::new(BinaryExpr::new(id, Operator::Eq, age)); + + let sc_or_expr = Arc::new(SCOrExpr::new(conv, unconv)); + + let result = convert_predicate_to_orc(Some(sc_or_expr), &schema); + assert!( + result.is_none(), + "SCOrExpr with an unconvertible disjunct must not push down, got: {result:?}" + ); + } + #[test] fn test_complex_mixed_predicates() { let schema = create_test_schema(); diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronFunctionSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronFunctionSuite.scala index 0a34dedf0..9a2a5622c 100644 --- a/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronFunctionSuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronFunctionSuite.scala @@ -1137,4 +1137,17 @@ class AuronFunctionSuite extends AuronQueryTest with BaseAuronSQLSuite { } } } + + test("test OR pushdown with an unconvertible disjunct for orc table") { + withTable("orc_or") { + sql("create table orc_or(id int, b string) using orc") + // enough rows so `id` statistics differ across row groups and pruning kicks in + sql("insert into orc_or select cast(id as int), cast(id as string) from range(0, 1000000)") + // `b = 900000` (string col vs int literal) -> cast(b as double)=2.0 -> not convertible + // `id = 5` -> convertible + // OR drops the b-branch -> pushes only `id = 5` -> row groups without id=5 are skipped, + // losing the row where b='900000' + checkSparkAnswerAndOperator("select * from orc_or where id = 5 or b = 900000") + } + } }