Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 141 additions & 13 deletions native-engine/datafusion-ext-plans/src/orc_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,33 +473,44 @@ fn collect_and_predicates(

/// Recursively collect all OR sub-conditions and flatten nested OR
/// structures.
///
/// Returns `false` if any disjunct cannot be converted. OR pushdown must be
/// all-or-nothing: a pushed predicate is only used to skip row groups whose
/// statistics prove no row can match, so it must be implied by the true filter
/// (`true_filter => pushed`). Dropping a disjunct narrows the OR into a subset
/// of the true filter, which makes the reader skip row groups that actually
/// contain matching rows. (Dropping AND conjuncts only loosens the predicate,
/// so that stays safe.)
fn collect_or_predicates(
expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
schema: &SchemaRef,
predicates: &mut Vec<Predicate>,
) {
) -> bool {
// Handle short-circuit OR expression (SCOrExpr)
if let Some(sc_or) = expr.as_any().downcast_ref::<SCOrExpr>() {
// Recursively collect OR sub-conditions from both sides
collect_or_predicates(&sc_or.left, schema, predicates);
collect_or_predicates(&sc_or.right, schema, predicates);
return;
return collect_or_predicates(&sc_or.left, schema, predicates)
&& collect_or_predicates(&sc_or.right, schema, predicates);
}

// Handle BinaryExpr with OR operator
if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
if matches!(binary.op(), Operator::Or) {
// Recursively collect OR sub-conditions from both sides
collect_or_predicates(binary.left(), schema, predicates);
collect_or_predicates(binary.right(), schema, predicates);
return;
return collect_or_predicates(binary.left(), schema, predicates)
&& collect_or_predicates(binary.right(), schema, predicates);
}
}

// Not an OR expression, convert the whole expression
// (could be AND, comparison, IS NULL, etc.)
if let Some(pred) = convert_expr_to_orc(expr, schema) {
predicates.push(pred);
// Not an OR expression, convert the whole expression as a single disjunct
// (could be AND, comparison, IS NULL, etc.). If it cannot be converted, the
// entire OR is unpushable.
match convert_expr_to_orc(expr, schema) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this still have a similar implication issue through `NOT`? `NotExpr` converts its child with `convert_expr_to_orc` and then wraps the result in `Predicate::not(...)`, but child conversion can be partial for `AND`: unconvertible conjuncts are dropped. Dropping a conjunct only loosens the predicate, which is safe before negation, but negating a loosened predicate tightens it, so it may no longer be safe afterwards.

For example, `NOT(id = 1 AND id = age)` could become `NOT(id = 1)` if `id = age` is not convertible. Since `NOT(A AND B)` is equivalent to `NOT A OR NOT B`, `NOT(id = 1)` is narrower than the original predicate and might let ORC pruning skip row groups that contain valid rows.

Would it be worth making NOT conversion all-or-nothing, or adding a regression test for this shape?

Some(pred) => {
predicates.push(pred);
true
}
None => false,
}
}

Expand Down Expand Up @@ -531,7 +542,11 @@ fn convert_expr_to_orc(
// Handle top-level short-circuit OR expression (SCOrExpr)
if let Some(_sc_or) = expr.as_any().downcast_ref::<SCOrExpr>() {
let mut predicates = Vec::new();
collect_or_predicates(expr, schema, &mut predicates);
if !collect_or_predicates(expr, schema, &mut predicates) {
// an OR disjunct could not be converted: pushing a narrowed
// predicate would skip row groups holding matching rows
return None;
}

if predicates.is_empty() {
return None;
Expand Down Expand Up @@ -574,7 +589,11 @@ fn convert_expr_to_orc(
// Handle top-level OR expression (BinaryExpr with OR operator)
if matches!(binary.op(), Operator::Or) {
let mut predicates = Vec::new();
collect_or_predicates(expr, schema, &mut predicates);
if !collect_or_predicates(expr, schema, &mut predicates) {
// an OR disjunct could not be converted: pushing a narrowed
// predicate would skip row groups holding matching rows
return None;
}

if predicates.is_empty() {
return None;
Expand Down Expand Up @@ -1122,6 +1141,115 @@ mod tests {
assert_eq!(condition_count, 3);
}

#[test]
fn test_or_with_unconvertible_disjunct_not_pushed() {
    // Predicate under test: id = 1 OR (id = age)
    //
    // The right-hand disjunct compares two columns, which has no ORC
    // predicate equivalent. Pushing only `id = 1` would narrow the filter and
    // let the reader skip row groups whose rows match just the dropped
    // disjunct, so the conversion has to yield nothing at all.
    let schema = create_test_schema();

    let id_col = Arc::new(Column::new("id", 0));
    let age_col = Arc::new(Column::new("age", 2));
    let one = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));

    let convertible = Arc::new(BinaryExpr::new(id_col.clone(), Operator::Eq, one));
    let unconvertible = Arc::new(BinaryExpr::new(id_col, Operator::Eq, age_col));
    let disjunction = Arc::new(BinaryExpr::new(convertible, Operator::Or, unconvertible));

    let result = convert_predicate_to_orc(Some(disjunction), &schema);
    assert!(
        result.is_none(),
        "OR with an unconvertible disjunct must not push down, got: {result:?}"
    );
}

#[test]
fn test_or_with_unconvertible_and_branch_not_pushed() {
    // Predicate under test: (id = age AND age = score) OR (id = 2)
    //
    // Both conjuncts of the left disjunct are column-to-column comparisons,
    // so the AND as a whole produces no predicate. That empty disjunct has to
    // poison the entire OR instead of quietly disappearing. This is the shape
    // of the production bug where `cast(type)=2 AND cast(gjo)=1` was dropped.
    let schema = create_test_schema();

    let id = Arc::new(Column::new("id", 0));
    let age = Arc::new(Column::new("age", 2));
    let score = Arc::new(Column::new("score", 3));
    let two = Arc::new(Literal::new(ScalarValue::Int32(Some(2))));

    // Left disjunct: an AND in which nothing is convertible.
    let lhs_cmp = Arc::new(BinaryExpr::new(id.clone(), Operator::Eq, age.clone()));
    let rhs_cmp = Arc::new(BinaryExpr::new(age, Operator::Eq, score));
    let dead_and = Arc::new(BinaryExpr::new(lhs_cmp, Operator::And, rhs_cmp));

    // Right disjunct: a plain, convertible comparison.
    let live_cmp = Arc::new(BinaryExpr::new(id, Operator::Eq, two));

    let disjunction = Arc::new(BinaryExpr::new(dead_and, Operator::Or, live_cmp));

    let result = convert_predicate_to_orc(Some(disjunction), &schema);
    assert!(
        result.is_none(),
        "OR whose disjunct is a fully-unconvertible AND must not push down, got: {result:?}"
    );
}

#[test]
fn test_and_keeps_convertible_conjunct_when_or_unconvertible() {
    // Predicate under test: name = "x" AND (id = 1 OR id = age)
    //
    // The OR cannot be converted, but here it is only one conjunct of an AND.
    // Leaving a conjunct out merely loosens the pushed predicate, so the
    // convertible `name = "x"` side is still expected to be pushed down.
    let schema = create_test_schema();

    // Unconvertible conjunct: id = 1 OR id = age
    let id_col = Arc::new(Column::new("id", 0));
    let age_col = Arc::new(Column::new("age", 2));
    let one = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));
    let id_is_one = Arc::new(BinaryExpr::new(id_col.clone(), Operator::Eq, one));
    let id_is_age = Arc::new(BinaryExpr::new(id_col, Operator::Eq, age_col));
    let dead_or = Arc::new(BinaryExpr::new(id_is_one, Operator::Or, id_is_age));

    // Convertible conjunct: name = "x"
    let name_col = Arc::new(Column::new("name", 1));
    let x_lit = Arc::new(Literal::new(ScalarValue::Utf8(Some("x".to_string()))));
    let name_is_x = Arc::new(BinaryExpr::new(name_col, Operator::Eq, x_lit));

    let conjunction = Arc::new(BinaryExpr::new(name_is_x, Operator::And, dead_or));

    let result = convert_predicate_to_orc(Some(conjunction), &schema);
    assert!(result.is_some());
    let debug_str = format!("{:?}", result.expect("Expected valid ORC predicate"));

    // The surviving predicate is just the name comparison ...
    let mentions_name = debug_str.contains("\"name\"");
    let mentions_equal = debug_str.contains("Equal");
    assert!(
        mentions_name && mentions_equal,
        "Expected name = \"x\" to push down, got: {debug_str}"
    );
    // ... and no trace of the dropped OR.
    let mentions_or = debug_str.contains("Or(");
    assert!(
        !mentions_or,
        "Unconvertible OR must not appear in the pushed predicate, got: {debug_str}"
    );
}

#[test]
fn test_sc_or_with_unconvertible_disjunct_not_pushed() {
    // Predicate under test (short-circuit OR): id = 1 OR (id = age)
    //
    // Same all-or-nothing expectation as for `BinaryExpr` with `Operator::Or`,
    // this time going through the `SCOrExpr` branch of the conversion.
    let schema = create_test_schema();

    let id_col = Arc::new(Column::new("id", 0));
    let age_col = Arc::new(Column::new("age", 2));
    let one = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));

    let convertible = Arc::new(BinaryExpr::new(id_col.clone(), Operator::Eq, one));
    let unconvertible = Arc::new(BinaryExpr::new(id_col, Operator::Eq, age_col));
    let short_circuit_or = Arc::new(SCOrExpr::new(convertible, unconvertible));

    let result = convert_predicate_to_orc(Some(short_circuit_or), &schema);
    assert!(
        result.is_none(),
        "SCOrExpr with an unconvertible disjunct must not push down, got: {result:?}"
    );
}

#[test]
fn test_complex_mixed_predicates() {
let schema = create_test_schema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1137,4 +1137,17 @@ class AuronFunctionSuite extends AuronQueryTest with BaseAuronSQLSuite {
}
}
}

test("test OR pushdown with an unconvertible disjunct for orc table") {
// Regression test: an OR containing a disjunct that cannot be converted to an ORC
// predicate must not be pushed down with that disjunct silently removed.
withTable("orc_or") {
sql("create table orc_or(id int, b string) using orc")
// enough rows so `id` statistics differ across row groups and pruning kicks in
sql("insert into orc_or select cast(id as int), cast(id as string) from range(0, 1000000)")
// `b = 900000` (string col vs int literal) -> Spark inserts an implicit cast on `b`
//   -> not convertible (NOTE(review): exact cast type/literal form not verified here)
// `id = 5` -> convertible
// Before the fix, the OR dropped the b-branch and pushed only `id = 5`, so row groups
// without id=5 were skipped, losing the row where b='900000'. The query must return
// both the id=5 row and the b='900000' row.
checkSparkAnswerAndOperator("select * from orc_or where id = 5 or b = 900000")
}
}
}
Loading