feat: Support Spark expression: percentile_cont #3757

YutaLin wants to merge 2 commits into apache:main
Conversation
```scala
if (expr.percentageExpression.dataType.isInstanceOf[ArrayType]) {
  withInfo(aggExpr, "array of percentiles not supported")
  return None
}
```
nit: might it be a better idea to test for a scalar type and reject all other inputs, to make the code more defensive?
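The defensive check the reviewer suggests could look something like the sketch below. It uses a simplified stand-in for Spark's `DataType` hierarchy; `checkPercentageType` and the type names here are illustrative, not the PR's actual code:

```scala
// Minimal stand-in for Spark's DataType hierarchy (illustrative only).
sealed trait DataType
case object DoubleType extends DataType
case class ArrayType(elementType: DataType) extends DataType
case object StringType extends DataType

// Defensive variant: accept only a scalar double percentage and reject
// everything else, rather than rejecting just arrays.
def checkPercentageType(dt: DataType): Option[String] = dt match {
  case DoubleType => None // scalar percentile: supported
  case other      => Some(s"percentage type $other not supported")
}
```

With this shape, any future type (arrays, strings, structs) is rejected by default instead of slipping through.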
```scala
val childExpr = exprToProto(expr.child, inputs, binding)
val percentileExpr = exprToProto(expr.percentageExpression, inputs, binding)
val dataType = serializeDataType(expr.dataType)
```
nit: any reason this if condition wouldn't always be true?
```diff
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, BitAndAgg, BitOrAgg, BitXorAgg, BloomFilterAggregate, CentralMomentAgg, Corr, Count, Covariance, CovPopulation, CovSample, First, Last, Max, Min, Percentile, StddevPop, StddevSamp, Sum, VariancePop, VarianceSamp}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{ByteType, DataTypes, DecimalType, IntegerType, LongType, ShortType, StringType}
+import org.apache.spark.sql.types.{ArrayType, ByteType, DataTypes, DayTimeIntervalType, DecimalType, IntegerType, LongType, NumericType, , StringType, YearMonthIntervalType}
```
There is a stray comma in this import; suggested fix:

```suggestion
import org.apache.spark.sql.types.{ArrayType, ByteType, DataTypes, DayTimeIntervalType, DecimalType, IntegerType, LongType, NumericType, StringType, YearMonthIntervalType}
```
```scala
// Support numeric types and interval types
expr.child.dataType match {
  case _: NumericType =>
  case _: DecimalType =>
```
The match on `DecimalType` is unreachable, because the `NumericType` case already handles it:

```scala
case _: DecimalType =>
```
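In Spark's Catalyst type hierarchy, `DecimalType` is a subtype of `NumericType`, so the first pattern already matches decimal values. A minimal sketch with stand-in classes (not Spark's actual code) shows why the second case can never fire:

```scala
// Stand-ins mirroring the relevant part of Spark's hierarchy:
// DecimalType extends NumericType (illustrative, not Spark code).
sealed trait DataType
class NumericType extends DataType
class DecimalType extends NumericType

def classify(dt: DataType): String = dt match {
  case _: NumericType => "numeric" // also matches DecimalType
  case _: DecimalType => "decimal" // unreachable: shadowed by the case above
  case _              => "other"
}
```

Because pattern matching tries cases top to bottom, a `DecimalType` value is always consumed by the `NumericType` case first.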
```sql
-- Uses similar test data as Spark's percentiles.sql

statement
CREATE TABLE test_percentile(k int, v int) USING parquet
```
Thanks for adding the SQL tests! It would be great to also cover negative values, boundary percentiles (0.0 and 1.0), all-null groups, single-value groups, and DOUBLE/FLOAT column types. The negative-values test is especially important, since sort order matters for correctness and I suspect there may currently be bugs in the implementation; it would be good to start by adding tests.
Which issue does this PR close?
Comet does not currently support the Spark percentile_cont function, causing queries using this function to fall back to Spark's JVM execution instead of running natively on DataFusion.
PercentileCont calculates a percentile value based on a continuous distribution of numeric or ANSI interval columns at a given percentage. It implements the SQL PERCENTILE_CONT function which uses linear interpolation between values when the exact percentile position falls between two data points. This expression is a runtime-replaceable aggregate that delegates to the internal Percentile implementation.
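The linear-interpolation rule described above can be sketched as follows. This is a self-contained illustration of the PERCENTILE_CONT semantics, not the PR's implementation; the function name is made up:

```scala
// percentile_cont via linear interpolation: for percentage p over n sorted
// values, the target rank is p * (n - 1); when it falls between two indices,
// interpolate linearly between the neighboring values.
def percentileCont(values: Seq[Double], p: Double): Double = {
  val sorted = values.sorted
  val rank   = p * (sorted.length - 1)
  val lo     = math.floor(rank).toInt
  val hi     = math.ceil(rank).toInt
  val frac   = rank - lo
  sorted(lo) + (sorted(hi) - sorted(lo)) * frac
}

// Median of 1..4 falls between 2 and 3: 2 + 0.5 * (3 - 2) = 2.5
assert(percentileCont(Seq(1.0, 2.0, 3.0, 4.0), 0.5) == 2.5)
```

Note that sorting happens before interpolation, which is why the reviewer's point about negative values and sort order matters for correctness.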
Supporting this expression would allow more Spark workloads to benefit from Comet's native acceleration.
Array percentiles and weighted percentiles are not included for now.
Closes #3190
What changes are included in this PR?

- Add a `PercentileCont` message to `expr.proto`
- Add `CometPercentile` with validations
- Register the `Percentile` class in `QueryPlanSerde`
- Handle the `PercentileCont` protobuf in `planner.rs`
- Add a custom `percentile.rs` with a binary state, because DataFusion's `percentile_cont` stores all values as a `List`, and shuffling that state causes a `Cannot cast list to non-list data types` error

How are these changes tested?

Added SQL tests covering numeric and interval types.
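The binary-state workaround described above amounts to packing the accumulated values into a single byte blob, so the shuffle layer sees a binary column instead of a list column. A rough sketch of the idea (the PR's actual state encoding lives in Rust in `percentile.rs`; this is only an assumed illustration of the technique):

```scala
import java.nio.ByteBuffer

// Encode the accumulator's values as one binary blob so intermediate
// state can be shuffled as a plain binary column rather than a list.
def encodeState(values: Seq[Double]): Array[Byte] = {
  val buf = ByteBuffer.allocate(values.length * 8)
  values.foreach(v => buf.putDouble(v))
  buf.array()
}

// Decode the blob back into values on the merge side.
def decodeState(bytes: Array[Byte]): Seq[Double] = {
  val buf = ByteBuffer.wrap(bytes)
  Seq.fill(bytes.length / 8)(buf.getDouble)
}
```

The round trip is lossless for doubles, which is what makes the binary representation a safe substitute for the list state.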