Skip to content

[spark] Split non-partition and partition predicates from pushPredicates to limit pushdown#3397

Open
Yohahaha wants to merge 4 commits into
apache:mainfrom
Yohahaha:spark-pushdown-partition-filter
Open

[spark] Split non-partition and partition predicates from pushPredicates to limit pushdown#3397
Yohahaha wants to merge 4 commits into
apache:mainfrom
Yohahaha:spark-pushdown-partition-filter

Conversation

@Yohahaha
Copy link
Copy Markdown
Contributor

Purpose

Linked issue: close #xxx

Fix a bug where LIMIT is never pushed down when a query has partition filters (e.g., WHERE dt = 'x' LIMIT 1). The root cause is that pushPredicates returns all predicates including partition ones, causing Spark to keep a Filter node. Spark's pushDownLimit only invokes pushLimit when there are no filters (PhysicalOperation(_, Nil, ...)), so the Filter node blocks limit pushdown.

Brief change log

  • SparkPartitionPredicate.scala: Refactor extract() to return nonPartitionPredicates and partitionPredicate.
  • FlussScanBuilder.scala: Return only non-partition predicates from pushPredicates() across all scan builders (partition filters, ARROW filters, lake filters).

Tests

API and Format

Documentation

@Yohahaha
Copy link
Copy Markdown
Contributor Author

@fresh-borzoni @luoyuxia @YannByron PTAL!

Copy link
Copy Markdown
Member

@fresh-borzoni fresh-borzoni left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Yohahaha Ty for the PR and nice catch, LGTM overall.
A couple of minor comments, PTAL

val (partitionPredicates, nonPartitionPredicates) = predicates.partition {
sparkPredicate =>
SparkPredicateConverter
.convert(rowType, sparkPredicate)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small thing: we call convert(...) here to classify the predicate, then call it again right below to actually use it. Could we convert once and reuse the result?

Something like mapping each predicate to its converted form, then partition-ing on isDefined. Only runs at planning time so not a big deal, just avoids doing the same work twice.

def flussConfig: FlussConfiguration

override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
val nonPartitionPredicates = super.pushPredicates(predicates)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here the lake path calls super.pushPredicates, which for ARROW tables runs convertAndStorePredicates and sets pushedPredicate/acceptedPredicates, then we overwrite both a few lines down.
So that work gets thrown away every time.

Could we pull the partition-extraction bit into a small helper and call that instead of super, so we skip the ARROW step we don't use?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did a small refactor on filter pushdown. Lake and non-lake paths can now implement
pushdown logic freely. The common partition pushdown logic is in the parent class.

You can see the new interface and class inheritance structure below

SupportsPushDownV2Filters (Spark)
│
└── FlussSupportsPushDownPartitionFilters
    │
    ├── FlussSupportsPushDownV2Filters
    │   │
    │   ├── FlussAppendScanBuilder
    │   └── FlussUpsertScanBuilder
    │
    └── FlussLakeSupportsPushDownV2Filters
        │
        ├── FlussLakeAppendScanBuilder
        └── FlussLakeUpsertScanBuilder

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants