From 00c8947710ac9f020017009fbacda5f53760e409 Mon Sep 17 00:00:00 2001 From: 0lai0 Date: Fri, 20 Mar 2026 23:32:36 +0800 Subject: [PATCH 1/3] support expression spark days --- .../apache/comet/serde/QueryPlanSerde.scala | 1 + .../org/apache/comet/serde/datetime.scala | 53 ++++++++++- .../comet/CometTemporalExpressionSuite.scala | 89 +++++++++++++++++++ 3 files changed, 142 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 8c39ba779d..02a76f69f0 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -196,6 +196,7 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[DateAdd] -> CometDateAdd, classOf[DateDiff] -> CometDateDiff, classOf[DateFormatClass] -> CometDateFormat, + classOf[Days] -> CometDays, classOf[DateSub] -> CometDateSub, classOf[UnixDate] -> CometUnixDate, classOf[FromUnixTime] -> CometFromUnixTime, diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index 0720f785dd..8f3894c1ac 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -21,11 +21,13 @@ package org.apache.comet.serde import java.util.Locale -import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year} +import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, GetDateField, Hour, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType} import org.apache.spark.unsafe.types.UTF8String import org.apache.comet.CometSparkSessionExtensions.withInfo +import org.apache.comet.expressions.{CometCast, CometEvalMode} import org.apache.comet.serde.CometGetDateField.CometGetDateField import org.apache.comet.serde.ExprOuterClass.Expr import org.apache.comet.serde.QueryPlanSerde._ @@ -586,3 +588,52 @@ object CometDateFormat extends CometExpressionSerde[DateFormatClass] { } } } + +/** + * Converts a timestamp or date to the number of days since Unix epoch (1970-01-01). This is a V2 + * partition transform expression. + * + * For DateType: dates are internally stored as days since epoch, so this is a simple cast to + * integer (same as CometUnixDate). + * + * For TimestampType: uses a timezone-aware Cast(Timestamp to Date) followed by Cast(Date to Int). + * The first cast respects the session timezone to correctly determine the date boundary. + */ +object CometDays extends CometExpressionSerde[Days] { + override def convert( + expr: Days, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val childExpr = exprToProtoInternal(expr.child, inputs, binding) + + // Normalize input to DateType (Timestamp converts to Date first) + val dateExprOpt = expr.child.dataType match { + case DateType => childExpr + case TimestampType => + val timezone = SQLConf.get.sessionLocalTimeZone + childExpr.flatMap { child => + CometCast.castToProto(expr, Some(timezone), DateType, child, CometEvalMode.LEGACY) + } + case other => + withInfo(expr, s"Days does not support input type: $other") + None + } + + // Convert DateType to IntegerType (days since epoch) + val optExpr = dateExprOpt.map { dateExpr => + Expr + .newBuilder() + .setCast( + ExprOuterClass.Cast + .newBuilder() + .setChild(dateExpr) + .setDatatype(serializeDataType(IntegerType).get) + .setEvalMode(ExprOuterClass.EvalMode.LEGACY) + .setAllowIncompat(false) + .build()) + .build() + } + + optExprWithInfo(optExpr, expr, expr.child) + } +} diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index 1ae6926e05..1f46b0984b 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -395,4 +395,93 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH // Test null handling checkSparkAnswerAndOperator("SELECT unix_date(NULL)") } + + test("days") { + import org.apache.spark.sql.Column + import org.apache.spark.sql.DataFrame + import org.apache.spark.sql.catalyst.expressions.{Days, Literal} + import org.apache.spark.sql.functions.col + + def checkDays(cometDF: DataFrame, baselineDF: DataFrame): Unit = { + // Ensure the expected answer is evaluated solely by native Spark JVM (Comet off) + var expected: Array[org.apache.spark.sql.Row] = Array.empty + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + expected = baselineDF.collect() + } + checkAnswer(cometDF, expected.toSeq) + checkCometOperators(stripAQEPlan(cometDF.queryExecution.executedPlan)) + } + + // === DateType input === + val r = new Random(42) + val dateSchema = StructType(Seq(StructField("d", DataTypes.DateType, true))) + val dateDF = FuzzDataGenerator.generateDataFrame(r, spark, dateSchema, 1000, DataGenOptions()) + + checkDays( + dateDF.select(col("d"), new Column(Days(col("d").expr))), + dateDF.selectExpr("d", "unix_date(d)")) + + // === TimestampType input with timezone tests === + val tsSchema = StructType(Seq(StructField("ts", DataTypes.TimestampType, true))) + val tsDF = + FuzzDataGenerator.generateDataFrame(r, spark, tsSchema, 1000, DataGenOptions()) + + for (timezone <- Seq("UTC", "America/Los_Angeles", "Asia/Tokyo")) { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timezone) { + checkDays( + tsDF.select(col("ts"), new Column(Days(col("ts").expr))), + tsDF.selectExpr("ts", "unix_date(cast(ts as date))")) + } + } + + // === Literal edge cases === + withSQLConf( + SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> + "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { + + val dummyDF = spark.range(1) + + // Pre-epoch (should return negative day numbers) + checkDays( + dummyDF.select( + new Column( + Days(Literal.create(java.sql.Date.valueOf("1969-12-31"), DataTypes.DateType))), + new Column( + Days(Literal.create(java.sql.Date.valueOf("1960-01-01"), DataTypes.DateType)))), + dummyDF.selectExpr("unix_date(DATE('1969-12-31'))", "unix_date(DATE('1960-01-01'))")) + + // Epoch and post-epoch + checkDays( + dummyDF.select( + new Column( + Days(Literal.create(java.sql.Date.valueOf("1970-01-01"), DataTypes.DateType))), + new Column( + Days(Literal.create(java.sql.Date.valueOf("1970-01-02"), DataTypes.DateType))), + new Column( + Days(Literal.create(java.sql.Date.valueOf("2024-01-01"), DataTypes.DateType)))), + dummyDF.selectExpr( + "unix_date(DATE('1970-01-01'))", + "unix_date(DATE('1970-01-02'))", + "unix_date(DATE('2024-01-01'))")) + + // Timestamp literals + checkDays( + dummyDF.select( + new Column(Days(Literal + .create(java.sql.Timestamp.valueOf("1970-01-01 00:00:00"), DataTypes.TimestampType))), + new Column( + Days( + Literal.create( + java.sql.Timestamp.valueOf("2024-06-15 10:30:00"), + DataTypes.TimestampType)))), + dummyDF.selectExpr( + "unix_date(cast(TIMESTAMP('1970-01-01 00:00:00') as date))", + "unix_date(cast(TIMESTAMP('2024-06-15 10:30:00') as date))")) + + // Null handling + checkDays( + dummyDF.select(new Column(Days(Literal.create(null, DataTypes.DateType)))), + dummyDF.selectExpr("unix_date(cast(NULL as date))")) + } + } } From dee727361ca5bcf58d809cf1711ce05ba9827c4b Mon Sep 17 00:00:00 2001 From: 0lai0 Date: Sat, 21 Mar 2026 00:52:43 +0800 Subject: [PATCH 2/3] add list of supported expressions --- docs/source/user-guide/latest/expressions.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/user-guide/latest/expressions.md b/docs/source/user-guide/latest/expressions.md index 57b7a3455a..ced31c9165 100644 --- a/docs/source/user-guide/latest/expressions.md +++ b/docs/source/user-guide/latest/expressions.md @@ -98,6 +98,7 @@ Expressions that are not Spark-compatible will fall back to Spark by default and | DateDiff | `datediff` | Yes | | | DateFormat | `date_format` | Yes | Partial support. Only specific format patterns are supported. | | DateSub | `date_sub` | Yes | | +| Days | `days` | Yes | V2 partition transform. Supports DateType and TimestampType inputs. | | DatePart | `date_part(field, source)` | Yes | Supported values of `field`: `year`/`month`/`week`/`day`/`dayofweek`/`dayofweek_iso`/`doy`/`quarter`/`hour`/`minute` | | Extract | `extract(field FROM source)` | Yes | Supported values of `field`: `year`/`month`/`week`/`day`/`dayofweek`/`dayofweek_iso`/`doy`/`quarter`/`hour`/`minute` | | FromUnixTime | `from_unixtime` | No | Does not support format, supports only -8334601211038 <= sec <= 8210266876799 | From b5e4222452635dd7c8810186b59c67d1281b5e87 Mon Sep 17 00:00:00 2001 From: 0lai0 Date: Sun, 22 Mar 2026 13:06:46 +0800 Subject: [PATCH 3/3] address comment and fix spark 4.0 compilation --- docs/source/user-guide/latest/expressions.md | 2 +- .../comet/CometTemporalExpressionSuite.scala | 65 ++++++++++--------- 2 files changed, 36 insertions(+), 31 deletions(-) diff --git a/docs/source/user-guide/latest/expressions.md b/docs/source/user-guide/latest/expressions.md index ced31c9165..c8e5475d0d 100644 --- a/docs/source/user-guide/latest/expressions.md +++ b/docs/source/user-guide/latest/expressions.md @@ -98,8 +98,8 @@ Expressions that are not Spark-compatible will fall back to Spark by default and | DateDiff | `datediff` | Yes | | | DateFormat | `date_format` | Yes | Partial support. Only specific format patterns are supported. | | DateSub | `date_sub` | Yes | | -| Days | `days` | Yes | V2 partition transform. Supports DateType and TimestampType inputs. | | DatePart | `date_part(field, source)` | Yes | Supported values of `field`: `year`/`month`/`week`/`day`/`dayofweek`/`dayofweek_iso`/`doy`/`quarter`/`hour`/`minute` | +| Days | `days` | Yes | V2 partition transform. Supports DateType and TimestampType inputs. | | Extract | `extract(field FROM source)` | Yes | Supported values of `field`: `year`/`month`/`week`/`day`/`dayofweek`/`dayofweek_iso`/`doy`/`quarter`/`hour`/`minute` | | FromUnixTime | `from_unixtime` | No | Does not support format, supports only -8334601211038 <= sec <= 8210266876799 | | Hour | `hour` | No | Incorrectly applies timezone conversion to TimestampNTZ inputs ([#3180](https://github.com/apache/datafusion-comet/issues/3180)) | diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index 1f46b0984b..9f5413933a 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -21,8 +21,11 @@ package org.apache.comet import scala.util.Random -import org.apache.spark.sql.{CometTestBase, Row, SaveMode} +import org.apache.spark.sql.{CometTestBase, DataFrame, Row, SaveMode} +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions.{Days, Literal} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.functions.col import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataTypes, StructField, StructType} @@ -396,45 +399,47 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH checkSparkAnswerAndOperator("SELECT unix_date(NULL)") } - test("days") { - import org.apache.spark.sql.Column - import org.apache.spark.sql.DataFrame - import org.apache.spark.sql.catalyst.expressions.{Days, Literal} - import org.apache.spark.sql.functions.col - - def checkDays(cometDF: DataFrame, baselineDF: DataFrame): Unit = { - // Ensure the expected answer is evaluated solely by native Spark JVM (Comet off) - var expected: Array[org.apache.spark.sql.Row] = Array.empty - withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - expected = baselineDF.collect() - } - checkAnswer(cometDF, expected.toSeq) - checkCometOperators(stripAQEPlan(cometDF.queryExecution.executedPlan)) + /** + * Checks that the Comet-evaluated DataFrame produces the same results as the baseline DataFrame + * evaluated by native Spark JVM, and that Comet native operators are used. This is needed + * because Days is a PartitionTransformExpression that extends Unevaluable, so + * checkSparkAnswerAndOperator cannot be used directly. + */ + private def checkDays(cometDF: DataFrame, baselineDF: DataFrame): Unit = { + // Ensure the expected answer is evaluated solely by native Spark JVM (Comet off) + var expected: Array[Row] = Array.empty + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + expected = baselineDF.collect() } + checkAnswer(cometDF, expected.toSeq) + checkCometOperators(stripAQEPlan(cometDF.queryExecution.executedPlan)) + } - // === DateType input === + test("days - date input") { val r = new Random(42) val dateSchema = StructType(Seq(StructField("d", DataTypes.DateType, true))) val dateDF = FuzzDataGenerator.generateDataFrame(r, spark, dateSchema, 1000, DataGenOptions()) checkDays( - dateDF.select(col("d"), new Column(Days(col("d").expr))), + dateDF.select(col("d"), getColumnFromExpression(Days(UnresolvedAttribute("d")))), dateDF.selectExpr("d", "unix_date(d)")) + } - // === TimestampType input with timezone tests === + test("days - timestamp input") { + val r = new Random(42) val tsSchema = StructType(Seq(StructField("ts", DataTypes.TimestampType, true))) - val tsDF = - FuzzDataGenerator.generateDataFrame(r, spark, tsSchema, 1000, DataGenOptions()) + val tsDF = FuzzDataGenerator.generateDataFrame(r, spark, tsSchema, 1000, DataGenOptions()) for (timezone <- Seq("UTC", "America/Los_Angeles", "Asia/Tokyo")) { withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timezone) { checkDays( - tsDF.select(col("ts"), new Column(Days(col("ts").expr))), + tsDF.select(col("ts"), getColumnFromExpression(Days(UnresolvedAttribute("ts")))), tsDF.selectExpr("ts", "unix_date(cast(ts as date))")) } } + } - // === Literal edge cases === + test("days - literal edge cases") { withSQLConf( SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { @@ -444,20 +449,20 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH // Pre-epoch (should return negative day numbers) checkDays( dummyDF.select( - new Column( + getColumnFromExpression( Days(Literal.create(java.sql.Date.valueOf("1969-12-31"), DataTypes.DateType))), - new Column( + getColumnFromExpression( Days(Literal.create(java.sql.Date.valueOf("1960-01-01"), DataTypes.DateType)))), dummyDF.selectExpr("unix_date(DATE('1969-12-31'))", "unix_date(DATE('1960-01-01'))")) // Epoch and post-epoch checkDays( dummyDF.select( - new Column( + getColumnFromExpression( Days(Literal.create(java.sql.Date.valueOf("1970-01-01"), DataTypes.DateType))), - new Column( + getColumnFromExpression( Days(Literal.create(java.sql.Date.valueOf("1970-01-02"), DataTypes.DateType))), - new Column( + getColumnFromExpression( Days(Literal.create(java.sql.Date.valueOf("2024-01-01"), DataTypes.DateType)))), dummyDF.selectExpr( "unix_date(DATE('1970-01-01'))", @@ -467,9 +472,9 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH // Timestamp literals checkDays( dummyDF.select( - new Column(Days(Literal + getColumnFromExpression(Days(Literal .create(java.sql.Timestamp.valueOf("1970-01-01 00:00:00"), DataTypes.TimestampType))), - new Column( + getColumnFromExpression( Days( Literal.create( java.sql.Timestamp.valueOf("2024-06-15 10:30:00"), @@ -480,7 +485,7 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH // Null handling checkDays( - dummyDF.select(new Column(Days(Literal.create(null, DataTypes.DateType)))), + dummyDF.select(getColumnFromExpression(Days(Literal.create(null, DataTypes.DateType)))), dummyDF.selectExpr("unix_date(cast(NULL as date))")) } }