Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ Expressions that are not Spark-compatible will fall back to Spark by default and
| DateFormat | `date_format` | Yes | Partial support. Only specific format patterns are supported. |
| DateSub | `date_sub` | Yes | |
| DatePart | `date_part(field, source)` | Yes | Supported values of `field`: `year`/`month`/`week`/`day`/`dayofweek`/`dayofweek_iso`/`doy`/`quarter`/`hour`/`minute` |
| Days | `days` | Yes | V2 partition transform. Supports DateType and TimestampType inputs. |
| Extract | `extract(field FROM source)` | Yes | Supported values of `field`: `year`/`month`/`week`/`day`/`dayofweek`/`dayofweek_iso`/`doy`/`quarter`/`hour`/`minute` |
| FromUnixTime | `from_unixtime` | No | Does not support format, supports only -8334601211038 <= sec <= 8210266876799 |
| Hour | `hour` | No | Incorrectly applies timezone conversion to TimestampNTZ inputs ([#3180](https://github.com/apache/datafusion-comet/issues/3180)) |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
classOf[DateAdd] -> CometDateAdd,
classOf[DateDiff] -> CometDateDiff,
classOf[DateFormatClass] -> CometDateFormat,
classOf[Days] -> CometDays,
classOf[DateSub] -> CometDateSub,
classOf[UnixDate] -> CometUnixDate,
classOf[FromUnixTime] -> CometFromUnixTime,
Expand Down
53 changes: 52 additions & 1 deletion spark/src/main/scala/org/apache/comet/serde/datetime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ package org.apache.comet.serde

import java.util.Locale

import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, GetDateField, Hour, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String

import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.expressions.{CometCast, CometEvalMode}
import org.apache.comet.serde.CometGetDateField.CometGetDateField
import org.apache.comet.serde.ExprOuterClass.Expr
import org.apache.comet.serde.QueryPlanSerde._
Expand Down Expand Up @@ -586,3 +588,52 @@ object CometDateFormat extends CometExpressionSerde[DateFormatClass] {
}
}
}

/**
* Converts a timestamp or date to the number of days since Unix epoch (1970-01-01). This is a V2
* partition transform expression.
*
* For DateType: dates are internally stored as days since epoch, so this is a simple cast to
* integer (same as CometUnixDate).
*
* For TimestampType: uses a timezone-aware Cast(Timestamp to Date) followed by Cast(Date to Int).
* The first cast respects the session timezone to correctly determine the date boundary.
*/
/**
 * Serde for the `Days` V2 partition transform: maps a date or timestamp to the
 * number of days elapsed since the Unix epoch (1970-01-01).
 *
 * DateType values are already stored as epoch days internally, so they only need a
 * cast to integer (mirroring CometUnixDate). TimestampType values are first cast to
 * DateType using the session-local timezone — that cast is what decides which calendar
 * day the instant falls on — and the resulting date is then cast to integer.
 * Any other input type is rejected and falls back to Spark via `withInfo`.
 */
object CometDays extends CometExpressionSerde[Days] {
  override def convert(
      expr: Days,
      inputs: Seq[Attribute],
      binding: Boolean): Option[ExprOuterClass.Expr] = {
    val protoChild = exprToProtoInternal(expr.child, inputs, binding)

    // Step 1: normalize the child to a DateType expression.
    val asDate: Option[ExprOuterClass.Expr] = expr.child.dataType match {
      case DateType =>
        // Already a date; nothing to do before the final integer cast.
        protoChild
      case TimestampType =>
        // Timezone-aware cast so the date boundary matches Spark's semantics.
        protoChild.flatMap { child =>
          CometCast.castToProto(
            expr,
            Some(SQLConf.get.sessionLocalTimeZone),
            DateType,
            child,
            CometEvalMode.LEGACY)
        }
      case unsupported =>
        withInfo(expr, s"Days does not support input type: $unsupported")
        None
    }

    // Step 2: cast the DateType expression to IntegerType (days since epoch).
    val result = asDate.map { dateExpr =>
      val cast = ExprOuterClass.Cast
        .newBuilder()
        .setChild(dateExpr)
        .setDatatype(serializeDataType(IntegerType).get)
        .setEvalMode(ExprOuterClass.EvalMode.LEGACY)
        .setAllowIncompat(false)
        .build()
      Expr.newBuilder().setCast(cast).build()
    }

    optExprWithInfo(result, expr, expr.child)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ package org.apache.comet

import scala.util.Random

import org.apache.spark.sql.{CometTestBase, Row, SaveMode}
import org.apache.spark.sql.{CometTestBase, DataFrame, Row, SaveMode}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Days, Literal}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

Expand Down Expand Up @@ -395,4 +398,95 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH
// Test null handling
checkSparkAnswerAndOperator("SELECT unix_date(NULL)")
}

/**
* Checks that the Comet-evaluated DataFrame produces the same results as the baseline DataFrame
* evaluated by native Spark JVM, and that Comet native operators are used. This is needed
* because Days is a PartitionTransformExpression that extends Unevaluable, so
* checkSparkAnswerAndOperator cannot be used directly.
*/
/**
 * Verifies that `cometDF` (evaluated with Comet enabled) matches `baselineDF`
 * (evaluated with Comet disabled, i.e. plain Spark), and that the Comet plan
 * actually ran native operators. Needed because Days is a
 * PartitionTransformExpression extending Unevaluable, which rules out
 * checkSparkAnswerAndOperator.
 */
private def checkDays(cometDF: DataFrame, baselineDF: DataFrame): Unit = {
  // withSQLConf takes a by-name Unit block, so the baseline rows are
  // captured by mutation rather than returned.
  var baselineRows: Seq[Row] = Seq.empty
  withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
    baselineRows = baselineDF.collect().toSeq
  }
  checkAnswer(cometDF, baselineRows)
  checkCometOperators(stripAQEPlan(cometDF.queryExecution.executedPlan))
}

test("days - date input") {
  // Fuzz a nullable date column; Days on a date equals unix_date on that date.
  val rng = new Random(42)
  val schema = StructType(Seq(StructField("d", DataTypes.DateType, true)))
  val df = FuzzDataGenerator.generateDataFrame(rng, spark, schema, 1000, DataGenOptions())

  val cometSide = df.select(col("d"), getColumnFromExpression(Days(UnresolvedAttribute("d"))))
  val sparkSide = df.selectExpr("d", "unix_date(d)")
  checkDays(cometSide, sparkSide)
}

test("days - timestamp input") {
  // Fuzz a nullable timestamp column and exercise several session timezones,
  // since the timestamp->date boundary is timezone dependent.
  val rng = new Random(42)
  val schema = StructType(Seq(StructField("ts", DataTypes.TimestampType, true)))
  val df = FuzzDataGenerator.generateDataFrame(rng, spark, schema, 1000, DataGenOptions())

  Seq("UTC", "America/Los_Angeles", "Asia/Tokyo").foreach { tz =>
    withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) {
      val cometSide =
        df.select(col("ts"), getColumnFromExpression(Days(UnresolvedAttribute("ts"))))
      val sparkSide = df.selectExpr("ts", "unix_date(cast(ts as date))")
      checkDays(cometSide, sparkSide)
    }
  }
}

test("days - literal edge cases") {
  // Disable constant folding so the Days literals survive until Comet sees them.
  withSQLConf(
    SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
      "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") {

    // Local builders keep each case to a single readable line.
    def daysOfDate(s: String) =
      getColumnFromExpression(Days(Literal.create(java.sql.Date.valueOf(s), DataTypes.DateType)))
    def daysOfTs(s: String) =
      getColumnFromExpression(
        Days(Literal.create(java.sql.Timestamp.valueOf(s), DataTypes.TimestampType)))

    val base = spark.range(1)

    // Pre-epoch dates must yield negative day counts.
    checkDays(
      base.select(daysOfDate("1969-12-31"), daysOfDate("1960-01-01")),
      base.selectExpr("unix_date(DATE('1969-12-31'))", "unix_date(DATE('1960-01-01'))"))

    // Epoch itself and post-epoch dates.
    checkDays(
      base.select(daysOfDate("1970-01-01"), daysOfDate("1970-01-02"), daysOfDate("2024-01-01")),
      base.selectExpr(
        "unix_date(DATE('1970-01-01'))",
        "unix_date(DATE('1970-01-02'))",
        "unix_date(DATE('2024-01-01'))"))

    // Timestamp literals: equivalent to casting to date first.
    checkDays(
      base.select(daysOfTs("1970-01-01 00:00:00"), daysOfTs("2024-06-15 10:30:00")),
      base.selectExpr(
        "unix_date(cast(TIMESTAMP('1970-01-01 00:00:00') as date))",
        "unix_date(cast(TIMESTAMP('2024-06-15 10:30:00') as date))"))

    // A null date literal must propagate null.
    checkDays(
      base.select(getColumnFromExpression(Days(Literal.create(null, DataTypes.DateType)))),
      base.selectExpr("unix_date(cast(NULL as date))"))
  }
}
}
Loading