
Conversation

@Shekharrajak
Contributor

Ref #3053

Rationale for this change

Dynamic Partition Pruning (DPP) was previously disabled for the native_datafusion scan due to subquery handling concerns. However, Spark's DPP mechanism already evaluates the dynamic filters and provides a pre-filtered partition list via dynamicallySelectedPartitions. This PR enables DPP support for the native scan by leveraging Spark's existing DPP infrastructure, achieving up to a 301x I/O reduction in the included benchmark.
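
For context, DPP targets joins on a partition column where the dimension side carries a selective filter: Spark plans a subquery over the dimension table and attaches its result to the fact table's scan as a dynamic pruning filter. A minimal Scala illustration of that query shape (paths, table and column names are made up for this example):

// Spark inserts a dynamicpruningexpression(...) into the fact scan's PartitionFilters and
// hands the scan only the partitions that survive it (dynamicallySelectedPartitions).
spark.range(0, 1000000L)
  .selectExpr("id", "id % 100 AS dim_key", "rand() AS value")
  .write.partitionBy("dim_key").mode("overwrite").parquet("/tmp/dpp_demo/fact")
spark.range(0, 100)
  .selectExpr("id", "concat('n', id) AS name")
  .write.mode("overwrite").parquet("/tmp/dpp_demo/dim")

spark.read.parquet("/tmp/dpp_demo/fact").createOrReplaceTempView("fact")
spark.read.parquet("/tmp/dpp_demo/dim").createOrReplaceTempView("dim")

spark.sql(
  """SELECT f.id, f.value, d.name
    |FROM fact f JOIN dim d ON f.dim_key = d.id
    |WHERE d.name = 'n7'""".stripMargin)
  .explain()  // look for dynamicpruningexpression(...) in the fact scan's PartitionFilters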

What changes are included in this PR?

  • Remove the DPP fallback in CometNativeScan.isSupported() - the native scan now accepts DPP queries
  • Add DPP auto-selection in CometScanRule.selectScan() - automatically selects native_datafusion for queries with dynamic pruning filters (a simplified sketch of this selection follows the list)
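
A simplified sketch of the selection idea (hypothetical helper names; the real logic lives in CometScanRule.selectScan() and CometNativeScan.isSupported()):

import org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression
import org.apache.spark.sql.execution.FileSourceScanExec

// When a scan carries a dynamic pruning filter, Spark has already planned the DPP subquery
// and will hand the scan a pre-filtered partition list, so native_datafusion can be selected.
def hasDynamicPruningFilter(scan: FileSourceScanExec): Boolean =
  scan.partitionFilters.exists(_.isInstanceOf[DynamicPruningExpression])

def selectScanImpl(scan: FileSourceScanExec, configuredImpl: String): String =
  if (configuredImpl == "auto" && hasDynamicPruningFilter(scan)) "native_datafusion"
  else configuredImpl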

How are these changes tested?

Unit tests

Add CometDPPSuite - a test suite validating DPP with the native scan (a minimal sketch of such a test follows)
Add CometDPPBenchmark - a benchmark reporting the I/O reduction metrics shown below
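
A minimal sketch of the kind of check such a suite can perform (the real CometDPPSuite may differ; this assumes Spark's test helpers QueryTest, SharedSparkSession, withSQLConf, withTempPath, and the spark.comet.scan.impl config key):

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession

class DppNativeScanSketch extends QueryTest with SharedSparkSession {
  test("DPP join returns correct results with native_datafusion scan") {
    withSQLConf(
      "spark.comet.enabled" -> "true",
      "spark.comet.exec.enabled" -> "true",
      "spark.comet.scan.impl" -> "native_datafusion",  // assumed config key for the scan impl
      "spark.sql.optimizer.dynamicPartitionPruning.enabled" -> "true") {
      withTempPath { dir =>
        val factPath = dir.getCanonicalPath + "/fact"
        // Partitioned fact table: DPP should prune partitions by dim_key.
        spark.range(0, 10000)
          .selectExpr("id", "id % 100 AS dim_key", "rand() AS value")
          .write.partitionBy("dim_key").parquet(factPath)
        spark.read.parquet(factPath).createOrReplaceTempView("fact")
        spark.range(0, 100).filter("id < 5").createOrReplaceTempView("dim")

        val df = spark.sql(
          "SELECT f.id, f.value FROM fact f JOIN dim d ON f.dim_key = d.id")
        // 100 rows per dim_key, 5 surviving keys.
        assert(df.count() == 500)
      }
    }
  }
}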

Implementation                    numOutputRows       Reduction Factor
--------------------------------------------------------------------------------
Spark (baseline)                       15,781,140       1.0x
Comet (auto scan)                       5,295,380       3.0x
Comet (native_datafusion + DPP)            52,500       301x

Run the benchmark using

SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometDPPBenchmark

# check the result: cat spark/benchmarks/CometDPPBenchmark-jdk17-results.txt

@Shekharrajak
Contributor Author

Shekharrajak commented Jan 9, 2026

Query Plans (selectivity=0.01)


--- Spark (baseline) ---
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) Project [id#9L, dim_key#10L, value#11, name#22]
   +- *(2) BroadcastHashJoin [dim_key#10L], [id#21L], Inner, BuildRight, false
      :- *(2) Filter isnotnull(dim_key#10L)
      :  +- *(2) ColumnarToRow
      :     +- FileScan parquet [id#9L,dim_key#10L,value#11] Batched: true, DataFilters: [isnotnull(dim_key#10L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-b9..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key)], ReadSchema: struct<id:bigint,dim_key:bigint,value:double>
      +- BroadcastQueryStage 0
         +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=108]
            +- *(1) Filter isnotnull(id#21L)
               +- *(1) ColumnarToRow
                  +- FileScan parquet [id#21L,name#22] Batched: true, DataFilters: [isnotnull(id#21L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-b9..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string>
+- == Initial Plan ==
   Project [id#9L, dim_key#10L, value#11, name#22]
   +- BroadcastHashJoin [dim_key#10L], [id#21L], Inner, BuildRight, false
      :- Filter isnotnull(dim_key#10L)
      :  +- FileScan parquet [id#9L,dim_key#10L,value#11] Batched: true, DataFilters: [isnotnull(dim_key#10L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-b9..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key)], ReadSchema: struct<id:bigint,dim_key:bigint,value:double>
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=94]
         +- Filter isnotnull(id#21L)
            +- FileScan parquet [id#21L,name#22] Batched: true, DataFilters: [isnotnull(id#21L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-b9..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string>


--- Comet (auto scan) ---
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) CometColumnarToRow
   +- CometProject [id#9L, dim_key#10L, value#11, name#22], [id#9L, dim_key#10L, value#11, name#22]
      +- CometBroadcastHashJoin [dim_key#10L], [id#21L], Inner, BuildRight
         :- CometFilter [id#9L, dim_key#10L, value#11], isnotnull(dim_key#10L)
         :  +- CometScan [native_iceberg_compat] parquet [id#9L,dim_key#10L,value#11] Batched: true, DataFilters: [isnotnull(dim_key#10L)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-b9..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key)], ReadSchema: struct<id:bigint,dim_key:bigint,value:double>
         +- BroadcastQueryStage 0
            +- CometBroadcastExchange [id#21L, name#22]
               +- CometFilter [id#21L, name#22], isnotnull(id#21L)
                  +- CometScan [native_iceberg_compat] parquet [id#21L,name#22] Batched: true, DataFilters: [isnotnull(id#21L)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-b9..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string>
+- == Initial Plan ==
   CometProject [id#9L, dim_key#10L, value#11, name#22], [id#9L, dim_key#10L, value#11, name#22]
   +- CometBroadcastHashJoin [dim_key#10L], [id#21L], Inner, BuildRight
      :- CometFilter [id#9L, dim_key#10L, value#11], isnotnull(dim_key#10L)
      :  +- CometScan [native_iceberg_compat] parquet [id#9L,dim_key#10L,value#11] Batched: true, DataFilters: [isnotnull(dim_key#10L)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-b9..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key)], ReadSchema: struct<id:bigint,dim_key:bigint,value:double>
      +- CometBroadcastExchange [id#21L, name#22]
         +- CometFilter [id#21L, name#22], isnotnull(id#21L)
            +- CometScan [native_iceberg_compat] parquet [id#21L,name#22] Batched: true, DataFilters: [isnotnull(id#21L)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-b9..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string>


--- Comet (native_datafusion + DPP) ---
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) CometColumnarToRow
   +- CometProject [id#9L, dim_key#10L, value#11, name#22], [id#9L, dim_key#10L, value#11, name#22]
      +- CometBroadcastHashJoin [dim_key#10L], [id#21L], Inner, BuildRight
         :- CometFilter [id#9L, dim_key#10L, value#11], isnotnull(dim_key#10L)
         :  +- CometNativeScan parquet [id#9L,dim_key#10L,value#11] Batched: true, DataFilters: [isnotnull(dim_key#10L)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-b9..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key)], ReadSchema: struct<id:bigint,dim_key:bigint,value:double>
         +- BroadcastQueryStage 0
            +- CometBroadcastExchange [id#21L, name#22]
               +- CometFilter [id#21L, name#22], isnotnull(id#21L)
                  +- CometNativeScan parquet [id#21L,name#22] Batched: true, DataFilters: [isnotnull(id#21L)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-b9..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string>
+- == Initial Plan ==
   CometProject [id#9L, dim_key#10L, value#11, name#22], [id#9L, dim_key#10L, value#11, name#22]
   +- CometBroadcastHashJoin [dim_key#10L], [id#21L], Inner, BuildRight
      :- CometFilter [id#9L, dim_key#10L, value#11], isnotnull(dim_key#10L)
      :  +- CometNativeScan parquet [id#9L,dim_key#10L,value#11] Batched: true, DataFilters: [isnotnull(dim_key#10L)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-b9..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key)], ReadSchema: struct<id:bigint,dim_key:bigint,value:double>
      +- CometBroadcastExchange [id#21L, name#22]
         +- CometFilter [id#21L, name#22], isnotnull(id#21L)
            +- CometNativeScan parquet [id#21L,name#22] Batched: true, DataFilters: [isnotnull(id#21L)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-b9..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string>


@Shekharrajak
Contributor Author

Shekharrajak commented Jan 9, 2026

Query Plans (selectivity=0.1)

--- Spark (baseline) ---
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) Project [id#61L, dim_key#62L, value#63, name#74]
   +- *(2) BroadcastHashJoin [dim_key#62L], [id#73L], Inner, BuildRight, false
      :- *(2) Filter isnotnull(dim_key#62L)
      :  +- *(2) ColumnarToRow
      :     +- FileScan parquet [id#61L,dim_key#62L,value#63] Batched: true, DataFilters: [isnotnull(dim_key#62L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-c9..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key)], ReadSchema: struct<id:bigint,dim_key:bigint,value:double>
      +- BroadcastQueryStage 0
         +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=502]
            +- *(1) Filter isnotnull(id#73L)
               +- *(1) ColumnarToRow
                  +- FileScan parquet [id#73L,name#74] Batched: true, DataFilters: [isnotnull(id#73L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-c9..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string>
+- == Initial Plan ==
   Project [id#61L, dim_key#62L, value#63, name#74]
   +- BroadcastHashJoin [dim_key#62L], [id#73L], Inner, BuildRight, false
      :- Filter isnotnull(dim_key#62L)
      :  +- FileScan parquet [id#61L,dim_key#62L,value#63] Batched: true, DataFilters: [isnotnull(dim_key#62L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-c9..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key)], ReadSchema: struct<id:bigint,dim_key:bigint,value:double>
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=488]
         +- Filter isnotnull(id#73L)
            +- FileScan parquet [id#73L,name#74] Batched: true, DataFilters: [isnotnull(id#73L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-c9..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string>


--- Comet (auto scan) ---
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) CometColumnarToRow
   +- CometProject [id#61L, dim_key#62L, value#63, name#74], [id#61L, dim_key#62L, value#63, name#74]
      +- CometBroadcastHashJoin [dim_key#62L], [id#73L], Inner, BuildRight
         :- CometFilter [id#61L, dim_key#62L, value#63], isnotnull(dim_key#62L)
         :  +- CometScan [native_iceberg_compat] parquet [id#61L,dim_key#62L,value#63] Batched: true, DataFilters: [isnotnull(dim_key#62L)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-c9..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key)], ReadSchema: struct<id:bigint,dim_key:bigint,value:double>
         +- BroadcastQueryStage 0
            +- CometBroadcastExchange [id#73L, name#74]
               +- CometFilter [id#73L, name#74], isnotnull(id#73L)
                  +- CometScan [native_iceberg_compat] parquet [id#73L,name#74] Batched: true, DataFilters: [isnotnull(id#73L)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-c9..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string>
+- == Initial Plan ==
   CometProject [id#61L, dim_key#62L, value#63, name#74], [id#61L, dim_key#62L, value#63, name#74]
   +- CometBroadcastHashJoin [dim_key#62L], [id#73L], Inner, BuildRight
      :- CometFilter [id#61L, dim_key#62L, value#63], isnotnull(dim_key#62L)
      :  +- CometScan [native_iceberg_compat] parquet [id#61L,dim_key#62L,value#63] Batched: true, DataFilters: [isnotnull(dim_key#62L)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-c9..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key)], ReadSchema: struct<id:bigint,dim_key:bigint,value:double>
      +- CometBroadcastExchange [id#73L, name#74]
         +- CometFilter [id#73L, name#74], isnotnull(id#73L)
            +- CometScan [native_iceberg_compat] parquet [id#73L,name#74] Batched: true, DataFilters: [isnotnull(id#73L)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-c9..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string>


--- Comet (native_datafusion + DPP) ---
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) CometColumnarToRow
   +- CometProject [id#61L, dim_key#62L, value#63, name#74], [id#61L, dim_key#62L, value#63, name#74]
      +- CometBroadcastHashJoin [dim_key#62L], [id#73L], Inner, BuildRight
         :- CometFilter [id#61L, dim_key#62L, value#63], isnotnull(dim_key#62L)
         :  +- CometNativeScan parquet [id#61L,dim_key#62L,value#63] Batched: true, DataFilters: [isnotnull(dim_key#62L)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-c9..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key)], ReadSchema: struct<id:bigint,dim_key:bigint,value:double>
         +- BroadcastQueryStage 0
            +- CometBroadcastExchange [id#73L, name#74]
               +- CometFilter [id#73L, name#74], isnotnull(id#73L)
                  +- CometNativeScan parquet [id#73L,name#74] Batched: true, DataFilters: [isnotnull(id#73L)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-c9..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string>
+- == Initial Plan ==
   CometProject [id#61L, dim_key#62L, value#63, name#74], [id#61L, dim_key#62L, value#63, name#74]
   +- CometBroadcastHashJoin [dim_key#62L], [id#73L], Inner, BuildRight
      :- CometFilter [id#61L, dim_key#62L, value#63], isnotnull(dim_key#62L)
      :  +- CometNativeScan parquet [id#61L,dim_key#62L,value#63] Batched: true, DataFilters: [isnotnull(dim_key#62L)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-c9..., PartitionFilters: [], PushedFilters: [IsNotNull(dim_key)], ReadSchema: struct<id:bigint,dim_key:bigint,value:double>
      +- CometBroadcastExchange [id#73L, name#74]
         +- CometFilter [id#73L, name#74], isnotnull(id#73L)
            +- CometNativeScan parquet [id#73L,name#74] Batched: true, DataFilters: [isnotnull(id#73L)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2l/scrh4gwd32lf1rrytysnzmjw0000gn/T/spark-c9..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string>


Note:

  • Spark: FileScan parquet (standard Spark scan)
  • Comet auto: CometScan [native_iceberg_compat]
  • Comet native_datafusion: CometNativeScan (with DPP support)

@codecov-commenter

codecov-commenter commented Jan 9, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 54.59%. Comparing base (f09f8af) to head (4568c16).
⚠️ Report is 835 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #3060      +/-   ##
============================================
- Coverage     56.12%   54.59%   -1.54%     
- Complexity      976     1262     +286     
============================================
  Files           119      167      +48     
  Lines         11743    15522    +3779     
  Branches       2251     2574     +323     
============================================
+ Hits           6591     8474    +1883     
- Misses         4012     5830    +1818     
- Partials       1140     1218      +78     

@Shekharrajak Shekharrajak force-pushed the feature/dpp-phase1-mvp branch from 58f60e7 to 1b6f816 Compare January 10, 2026 06:38

// Add runtime filter bounds if available
// These are pushed down from join operators to enable I/O reduction
addRuntimeFilterBounds(scan, nativeScanBuilder)
Member

The dataFilters have been pushed down to the native scan.

val dataFilters = new ListBuffer[Expr]()
for (filter <- scan.supportedDataFilters) {
  exprToProto(filter, scan.output) match {
    case Some(proto) => dataFilters += proto
    case _ =>
      logWarning(s"Unsupported data filter $filter")
  }
}
nativeScanBuilder.addAllDataFilters(dataFilters.asJava)

Contributor Author

Thanks for sharing @wForget - looks like these changes were not required.

Contributor Author

Filters are pushed down, but native filtering is not implemented, right? I think those fall back to Spark.

Member

wForget commented Jan 12, 2026

Filters are pushed down, but native filtering is not implemented, right? I think those fall back to Spark.

There are several possibilities (a plan-inspection sketch follows this list):

  1. Runtime filters have been pushed down to the native Parquet reader, but the reader is not enabled or does not support them. (Parquet row-group filters will be ineffective, but the subsequent Filter operator will still be effective.)
  2. Runtime filters have been generated, but they caused the Spark plan to fall back to vanilla Spark. (As far as I know, Comet’s support for subqueries may not be complete.)
  3. Runtime filters were not generated correctly.
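
One quick way to narrow down which of these cases applies is to inspect the final physical plan after execution. A minimal sketch (table and column names are illustrative; the operator names match the plans shown above):

// Force execution so AQE finalizes the plan, then look at the plan text.
val df = spark.sql(
  "SELECT f.* FROM fact f JOIN dim d ON f.dim_key = d.id WHERE d.name = 'n7'")
df.collect()

val plan = df.queryExecution.executedPlan.toString
val usesNativeScan = plan.contains("CometNativeScan")      // native_datafusion scan kept
val fellBackToSpark = plan.contains("FileScan parquet")    // case 2: plan fell back to vanilla Spark
val hasDppFilter = plan.toLowerCase.contains("dynamicpruning")  // cases 1/3: filter generated at all?
println(s"native scan: $usesNativeScan, spark fallback: $fellBackToSpark, DPP filter: $hasDppFilter")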

Contributor Author

Thanks for sharing. I tried executing with native runtime filters but do not see any improvement.

The benchmark shows Comet is slower than Spark for these workloads. This could be due to:

  • JNI overhead: significant cost of crossing the JVM/native boundary
  • Spark's vectorized reader: highly optimized for in-memory Parquet reading
  • Runtime filters not effective: the filters may not be pruning row groups, because the benchmark Parquet files don't have bloom filters written and the data sits in only a few row groups (small files); a write sketch addressing this follows
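
A minimal sketch of how the benchmark data could be written so that row-group and bloom-filter pruning have something to work with (this assumes the parquet-hadoop write options that Spark forwards to the Parquet writer; paths, sizes, and NDV values are illustrative):

val factDf = spark.range(0, 20000000L)
  .selectExpr("id", "id % 300 AS dim_key", "rand() AS value")

factDf
  .repartition(64)  // more, smaller files
  .write
  .option("parquet.bloom.filter.enabled#dim_key", "true")       // write a bloom filter for the join key
  .option("parquet.bloom.filter.expected.ndv#dim_key", "300")
  .option("parquet.block.size", (8 * 1024 * 1024).toString)     // smaller row groups -> finer-grained pruning
  .mode("overwrite")
  .parquet("/tmp/dpp_bench/fact")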

@Shekharrajak Shekharrajak marked this pull request as draft January 10, 2026 08:43
@Shekharrajak Shekharrajak force-pushed the feature/dpp-phase1-mvp branch from 2b5c120 to 1add4cf Compare January 10, 2026 08:52
withInfo(scanExec, s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled")
}

// Native DataFusion doesn't support subqueries/dynamic pruning
Contributor Author

With or without this if condition I am seeing the same benchmark results; not sure when this path is used.

@Shekharrajak
Contributor Author

Update:

Although I could not achieve what I was originally trying to do, the takeaway is:

Even for non-partitioned tables where DPP operates via dataFilters (not partitionFilters), Comet's native DataFusion scan shows:

  • Massive I/O reduction through efficient filter pushdown
  • Native vectorized execution benefits without requiring partition-level optimizations
