feat: Support Spark expression window_time #3732
Conversation
"(cast('2023-01-01 12:15:00' as timestamp), 3)")

// basic window_time with aggregation
checkSparkAnswer(
checkSparkAnswer doesn't verify that the query is actually running in Comet. It would be better to add SQL-based tests instead, as @comphead pointed out.
Thanks @comphead and @andygrove for the review and for pointing this out. I'll add the SQL tests in separate SQL files and remove the previous test. By the way, I discovered that using
INSERT INTO test_window_time VALUES (timestamp('2023-01-01 12:00:00'), 1), (timestamp('2023-01-01 12:05:00'), 2), (timestamp('2023-01-01 12:15:00'), 3)

-- basic window_time with tumbling window
query spark_answer_only
Both queries use spark_answer_only. Is there a reason these can't run with the default mode that verifies native execution? If there's another operator in the plan that causes fallback, could you note which one?
Thanks @andygrove for the review.
While this PR supports native execution of PreciseTimestampConversion, Spark's window() introduces unsupported operators that trigger a full fallback, so we temporarily use spark_answer_only for result verification. I'll add an inline comment to document this.
If I have misunderstood, please correct me.
Yes, windowed aggregates are marked as incompatible. Maybe you can enable them in the test with spark.comet.operator.WindowExec.allowIncompatible=true? You may need to check the exact config key, but it should be something like this.
Thanks @andygrove . I dug into this and tried enabling the config spark.comet.operator.WindowExec.allowIncompatible=true, but the test still fails if I remove spark_answer_only.
The root cause might be at the expression level: Spark's TimeWindowing analyzer rule automatically expands window() into a CreateNamedStruct wrapped inside a KnownNullable expression. Since Comet doesn't currently support KnownNullable natively, the planner is forced to fall back to Spark anyway.
https://github.com/search?q=repo%3Aapache%2Fspark+KnownNullable&type=code
Fully supporting native struct creation and KnownNullable would be a broader effort outside the scope of this PR (which focuses on the PreciseTimestampConversion issue for window_time).
Because of this, it might be better to keep spark_answer_only here and open a follow-up issue for those specific expressions. I'll make sure to update the inline comment to explicitly document the exact cause of the fallback for future reference.
Let me know your thoughts on this.
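For context on what the analyzer expands window() into before the CreateNamedStruct/KnownNullable wrapping, here is a minimal plain-Scala sketch with no Spark dependency. All names (WindowStruct, assignWindow) are illustrative, not Spark internals; it only models the tumbling-window arithmetic that produces the (start, end) struct.

```scala
object TumblingWindowSketch {
  // Model of the (start, end) struct Spark's TimeWindowing rule computes for a
  // tumbling window; Spark wraps the real version in CreateNamedStruct/KnownNullable.
  // All values are epoch microseconds; `end` is exclusive.
  case class WindowStruct(startUs: Long, endUs: Long)

  def assignWindow(eventUs: Long, windowUs: Long): WindowStruct = {
    // Floor the event time down to the nearest window boundary
    // (manual floor-mod so negative pre-epoch timestamps also work).
    val rem = ((eventUs % windowUs) + windowUs) % windowUs
    val start = eventUs - rem
    WindowStruct(start, start + windowUs)
  }

  def main(args: Array[String]): Unit = {
    val tenMinUs = 10L * 60L * 1000000L
    // 2023-01-01 12:05:00 UTC (one of the rows in the test data), as epoch micros
    val eventUs = java.time.Instant.parse("2023-01-01T12:05:00Z").toEpochMilli * 1000L
    val w = assignWindow(eventUs, tenMinUs)
    println(java.time.Instant.ofEpochMilli(w.startUs / 1000L)) // window start 12:00:00
    println(java.time.Instant.ofEpochMilli(w.endUs / 1000L))   // window end   12:10:00
  }
}
```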
CREATE TABLE test_window_time(time timestamp, value int) USING parquet

statement
INSERT INTO test_window_time VALUES (timestamp('2023-01-01 12:00:00'), 1), (timestamp('2023-01-01 12:05:00'), 2), (timestamp('2023-01-01 12:15:00'), 3)
Could you add a row with a null timestamp to the test data? Something like (NULL, 4)?
Sure, I'll update this. Thanks.
spark/src/test/resources/sql-tests/expressions/datetime/window_time.sql
// Both types are i64 micros in Arrow, so no conversion needed — return child directly.
exprToProtoInternal(expr.child, inputs, binding)
Thanks @0lai0, this is looking good so far. Could you also update the user docs to add this to the list of supported expressions?
Co-authored-by: Andy Grove <agrove@apache.org>
Which issue does this PR close?
Closes #3138
Rationale for this change
Comet previously did not support the Spark PreciseTimestampConversion expression. This expression is not called directly by users, but is generated internally by Spark's Analyzer when it rewrites window_time(window_column) into a combination of GetStructField, Subtract, and PreciseTimestampConversion. Since Comet already supports GetStructField and Subtract but not PreciseTimestampConversion, queries using window_time would fall back to JVM execution.
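A rough plain-Scala model of that analyzer rewrite may help. This is illustrative only: the real version is a Catalyst expression tree, and WindowStruct/windowTimeUs are invented names. The point is that the struct field access and subtraction operate on epoch-micros longs, and PreciseTimestampConversion reinterprets the result back as a timestamp.

```scala
object WindowTimeRewriteSketch {
  // window_time(w) is rewritten roughly as
  //   PreciseTimestampConversion(GetStructField(w, "end") - 1, LongType, TimestampType)
  // i.e. one microsecond before the window's exclusive end.
  case class WindowStruct(startUs: Long, endUs: Long) // epoch micros, end exclusive

  def windowTimeUs(w: WindowStruct): Long = {
    val endAsLong: Long = w.endUs // GetStructField; timestamp-to-long is a no-op on i64
    endAsLong - 1L                // Subtract(_, 1); converting back is again a no-op
  }

  def main(args: Array[String]): Unit = {
    val w = WindowStruct(1672574400000000L, 1672575000000000L) // 12:00 -> 12:10 UTC
    println(windowTimeUs(w)) // last representable microsecond inside the window
  }
}
```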
What changes are included in this PR?
This change adds a Serde handler for PreciseTimestampConversion that reuses the existing Cast protobuf and Rust implementation. Since TimestampType (microseconds) and LongType share the same 64-bit memory layout in Arrow, a simple cast suffices without any new native code paths.
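A small plain-Scala illustration of why the cast is safe. This models the idea only, not Comet's actual Rust code path, and the helper names are invented: an epoch-micros long and a microsecond-precision timestamp carry the same 64-bit value, so converting between them is lossless.

```scala
object MicrosLayoutSketch {
  // TimestampType(micros) and LongType both store an i64 in Arrow, so
  // timestamp -> long -> timestamp is a pure reinterpretation: no data changes.
  def toMicros(ts: java.time.Instant): Long =
    ts.getEpochSecond * 1000000L + ts.getNano / 1000L

  def fromMicros(us: Long): java.time.Instant = {
    // Floor division so pre-epoch (negative) micros also round-trip correctly.
    val secs = if (us >= 0) us / 1000000L else (us - 999999L) / 1000000L
    java.time.Instant.ofEpochSecond(secs, (us - secs * 1000000L) * 1000L)
  }

  def main(args: Array[String]): Unit = {
    val ts = java.time.Instant.parse("2023-01-01T12:15:00Z")
    val asLong = toMicros(ts)        // the "cast" to LongType: same i64 payload
    assert(fromMicros(asLong) == ts) // round trip is exact at microsecond precision
    println(asLong)
  }
}
```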
datetime.scala: Added CometPreciseTimestampConversion handler
QueryPlanSerde.scala: Registered the handler in the temporalExpressions map
window_time.sql: Added window_time test
How are these changes tested?
Added a Scala unit test for window_time expression support in CometExpressionSuite.

./mvnw test -Dsuites="org.apache.comet.CometExpressionSuite" -Dscala.useZincServer=false