Skip to content
347 changes: 320 additions & 27 deletions src/mysql-util/src/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,58 @@ use mz_repr::{Datum, Row, RowPacker, SqlScalarType};
use crate::desc::MySqlColumnMeta;
use crate::{MySqlColumnDesc, MySqlError, MySqlTableDesc};

/// Canonical text for the MySQL zero-date sentinel ('0000-00-00 00:00:00').
/// In binlog MYSQL_TYPE_DATETIME/MYSQL_TYPE_DATETIME2 encodings, sec=0 *cannot* represent unix
/// epoch 0. The TIMESTAMP type's supported range starts at '1970-01-01 00:00:01' UTC
/// (<https://dev.mysql.com/doc/refman/8.0/en/datetime.html>), so any sec=0 is
/// unambiguously this sentinel.
const MYSQL_ZERO_TIMESTAMP: &str = "0000-00-00 00:00:00";

/// Maximum fractional-seconds precision MySQL accepts for DATETIME(p) and
/// TIMESTAMP(p) — values are stored in microseconds, so 6 digits is the
/// upper bound (<https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html>).
const MYSQL_MAX_FRACTIONAL_PRECISION: u32 = 6;

/// Format the zero-date sentinel for a column with the given fractional
/// precision (matches the Date arm's `{:0precision$}` behavior).
fn mysql_zero_timestamp(precision: u32) -> String {
if precision > 0 {
format!(
"{}.{}",
MYSQL_ZERO_TIMESTAMP,
"0".repeat(usize::cast_from(precision))
)
} else {
MYSQL_ZERO_TIMESTAMP.to_string()
}
}

/// Format MySQL DATETIME/TIMESTAMP components as `YYYY-MM-DD HH:MM:SS[.ffff]`.
/// `micros` is the raw microseconds (0..1_000_000); only the leading
/// `precision` digits are kept, matching MySQL's DATETIME(p)/TIMESTAMP(p)
/// display.
fn format_mysql_timestamp(
y: u16,
m: u8,
d: u8,
hr: u8,
min: u8,
sec: u8,
micros: u32,
precision: u32,
) -> String {
if precision == 0 {
return format!("{y:04}-{m:02}-{d:02} {hr:02}:{min:02}:{sec:02}");
}
// Clamp defensively: MySQL itself rejects precision > 6, but upstream
// metadata is untrusted and a larger value would make `pow()` below
// overflow its u32 exponent.
let p = precision.min(MYSQL_MAX_FRACTIONAL_PRECISION);
let scaled = micros / 10u32.pow(MYSQL_MAX_FRACTIONAL_PRECISION - p);
let width = usize::cast_from(p);
format!("{y:04}-{m:02}-{d:02} {hr:02}:{min:02}:{sec:02}.{scaled:0width$}")
}

pub fn pack_mysql_row(
row_container: &mut Row,
row: MySqlRow,
Expand Down Expand Up @@ -325,34 +377,78 @@ fn pack_val_as_datum(
}
}
Some(MySqlColumnMeta::Timestamp(precision)) => {
// Some MySQL dates are invalid in chrono/NaiveDate (e.g. 0000-00-00), so
// we need to handle them directly as strings
if let Value::Date(y, m, d, h, mm, s, ms) = value {
Comment thread
martykulma marked this conversation as resolved.
if *precision > 0 {
let precision: usize = (*precision).try_into()?;
packer.push(Datum::String(&format!(
"{:04}-{:02}-{:02} {:02}:{:02}:{:02}.{:0precision$}",
y,
m,
d,
h,
mm,
s,
ms,
precision = precision
)));
} else {
packer.push(Datum::String(&format!(
"{:04}-{:02}-{:02} {:02}:{:02}:{:02}",
y, m, d, h, mm, s
)));
// Materialize treats DATETIME and TIMESTAMP as MySqlColumnMeta::Timestamp,
// but they have slightly different semantics as far as the range of dates
// they can represent.
// (see https://dev.mysql.com/doc/refman/8.0/en/date-and-time-types.html).
//
// Three mysql_common::Value variants exist, which are mapped to
// [`MySqlColumnMeta::Timestamp`]
// (see https://github.com/blackbeam/rust_mysql_common/blob/2e6f6696de03c91b9fd95a87356d081285290704/src/binlog/value.rs):
// Value::Date — MZ snapshot & binlog MYSQL_TYPE_DATETIME/MYSQL_TYPE_DATETIME2
// (value/mod.rs:443-445, binlog/value.rs:109-161)
// Value::Int — legacy binlog MYSQL_TYPE_TIMESTAMP, pre-5.6,
// 4-byte unix epoch (binlog/value.rs:87-90)
// Value::Bytes — binlog MYSQL_TYPE_TIMESTAMP2, 5.6+,
// "<sec>" or "<sec>.<usec>" (binlog/value.rs:145-154)
let str_timestamp = match value {
Value::Date(y, m, d, h, mm, s, ms) => {
format_mysql_timestamp(y, m, d, h, mm, s, ms, *precision)
}
} else {
Err(anyhow::anyhow!(
// Pre-5.6 unix epoch, no fractional seconds.
// val == 0 is the zero-date sentinel, not epoch 0.
Value::Int(0) => mysql_zero_timestamp(*precision),
Value::Int(val) => chrono::DateTime::from_timestamp(val, 0)
.ok_or_else(|| {
anyhow::anyhow!("received invalid timestamp value: {}", val)
})?
.naive_utc()
.format("%Y-%m-%d %H:%M:%S")
.to_string(),
// 5.6+ epoch string; parse + reformat so all variants emit the
// same canonical YYYY-MM-DD HH:MM:SS[.ffff] text.
Value::Bytes(data) => {
let s = std::str::from_utf8(&data).map_err(|_| {
anyhow::anyhow!("received invalid timestamp value: {:?}", data)
})?;
// sec=0 (with or without fractional component) is the
// zero-date sentinel.
if s.split('.').next() == Some("0") {
mysql_zero_timestamp(*precision)
} else {
let dt = if s.contains('.') {
chrono::NaiveDateTime::parse_from_str(s, "%s%.6f")
} else {
chrono::NaiveDateTime::parse_from_str(s, "%s")
}
.map_err(|_| {
anyhow::anyhow!("received invalid timestamp value: {:?}", s)
})?;
use chrono::{Datelike, Timelike};
let y = u16::try_from(dt.year()).map_err(|_| {
anyhow::anyhow!(
"timestamp year out of range: {}",
dt.year()
)
})?;
format_mysql_timestamp(
y,
u8::try_from(dt.month())?,
u8::try_from(dt.day())?,
u8::try_from(dt.hour())?,
u8::try_from(dt.minute())?,
u8::try_from(dt.second())?,
dt.nanosecond() / 1000,
*precision,
)
}
}
_ => Err(anyhow::anyhow!(
"received unexpected value for timestamp type: {:?}",
value
))?;
}
))?,
};
packer.push(Datum::String(&str_timestamp));
}
Some(MySqlColumnMeta::Bit(_)) => unreachable!("parsed as a u64"),
None => {
Expand Down Expand Up @@ -396,8 +492,8 @@ fn pack_val_as_datum(
// Timestamps are encoded as different mysql_common::Value types depending on
// whether they are from a binlog event or a query, and depending on which
// mysql timestamp version is used. We handle those cases here
// https://github.com/blackbeam/rust_mysql_common/blob/v0.31.0/src/binlog/value.rs#L87-L155
// https://github.com/blackbeam/rust_mysql_common/blob/v0.31.0/src/value/mod.rs#L332
// https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/binlog/value.rs#L87-L155
// https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/value/mod.rs#L332
let chrono_timestamp = match value {
Value::Date(..) => from_value_opt::<chrono::NaiveDateTime>(value)?,
// old temporal format from before MySQL 5.6; didn't support fractional seconds
Expand Down Expand Up @@ -481,3 +577,200 @@ fn check_char_length(
}
Ok(())
}

#[cfg(test)]
mod tests {
//! Unit tests for the TEXT-COLUMNS decoding of MySQL TIMESTAMP values.
//!
//! These cover the regression where a MySQL TIMESTAMP column declared as
//! a TEXT COLUMN fails to decode when the wire value arrives as
//! `Value::Bytes("<unix-epoch>")` or `Value::Int(<unix-epoch>)` instead
//! of `Value::Date(..)`. The integration test in
//! `test/mysql-cdc/text-columns-timestamp.td` exercises this through
//! a real MySQL container but is non-deterministic: which `Value`
//! variant `mysql-async` produces depends on connection-state timing.
//! These unit tests pin each variant down directly.
//!
//! The wire-variant matrix exercised below is derived from mysql_common
//! v0.35.5:
//!
//! * Value::Int(epoch) — binlog MYSQL_TYPE_TIMESTAMP (pre-5.6):
//! https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/binlog/value.rs#L87-L90
//! * Value::Bytes("<sec>"/"<sec>.<usec>") — binlog MYSQL_TYPE_TIMESTAMP2 (5.6+):
//! https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/binlog/value.rs#L145-L154
//! * Value::Date(...) — binary query response + binlog DATETIME[2]:
//! https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/value/mod.rs#L443-L445
//! https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/binlog/value.rs#L109-L161
//!
//! MySQL semantics referenced by the zero-date and fractional-precision
//! cases:
//!
//! * Zero-date allowed when sql_mode disables NO_ZERO_DATE:
//! https://dev.mysql.com/doc/refman/8.0/en/sql-mode.html#sqlmode_no_zero_date
//! * TIMESTAMP(p) / DATETIME(p) fractional seconds:
//! https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html
use super::*;
use mz_repr::{SqlColumnType, SqlScalarType};

fn timestamp_text_col(precision: u32) -> MySqlColumnDesc {
MySqlColumnDesc {
name: "created_at".to_string(),
column_type: Some(SqlColumnType {
scalar_type: SqlScalarType::String,
nullable: true,
}),
meta: Some(MySqlColumnMeta::Timestamp(precision)),
}
}

fn pack_one(value: Value, col: &MySqlColumnDesc) -> Result<String, anyhow::Error> {
let mut row = Row::default();
pack_val_as_datum(value, col, &mut row.packer())?;
Ok(row.unpack_first().unwrap_str().to_string())
}

#[mz_ore::test]
fn timestamp_value_date_no_precision() {
let col = timestamp_text_col(0);
let s = pack_one(Value::Date(2024, 4, 3, 10, 15, 13, 0), &col).unwrap();
assert_eq!(s, "2024-04-03 10:15:13");
}

#[mz_ore::test]
fn timestamp_value_date_with_precision() {
let col = timestamp_text_col(6);
let s = pack_one(Value::Date(2024, 4, 3, 10, 15, 13, 123456), &col).unwrap();
assert_eq!(s, "2024-04-03 10:15:13.123456");
}

#[mz_ore::test]
fn timestamp_value_date_zero_date() {
// The whole reason TEXT COLUMNS exists for TIMESTAMP: a
// zero-date arriving as Value::Date(0,..) should decode to the same
// "zero" timestamp value MySQL would display.
let col = timestamp_text_col(0);
let s = pack_one(Value::Date(0, 0, 0, 0, 0, 0, 0), &col).unwrap();
assert_eq!(s, "0000-00-00 00:00:00");
}

/// Regression: Value::Int (pre-5.6 legacy temporal format, unix
/// epoch seconds) was previously rejected with
/// `received unexpected value for timestamp type: Int(..)`.
#[mz_ore::test]
fn timestamp_value_int_epoch() {
let col = timestamp_text_col(0);
// 1743661234 == 2025-04-03 06:20:34 UTC
let s = pack_one(Value::Int(1_743_661_234), &col).unwrap();
assert_eq!(s, "2025-04-03 06:20:34");
}

/// sec=0 in the legacy TIMESTAMP encoding is the zero-date sentinel,
/// not unix epoch 0 — TIMESTAMP's range starts at '1970-01-01 00:00:01'
/// UTC so epoch 0 isn't a representable column value.
#[mz_ore::test]
fn timestamp_value_int_zero_is_sentinel() {
let col = timestamp_text_col(0);
let s = pack_one(Value::Int(0), &col).unwrap();
assert_eq!(s, "0000-00-00 00:00:00");
}

/// Out-of-range epochs must error rather than silently producing
/// a zero-timestamp — they aren't the MySQL zero-date marker, just
/// garbage chrono can't represent.
#[mz_ore::test]
fn timestamp_value_int_out_of_range_errors() {
let col = timestamp_text_col(0);
let err = pack_one(Value::Int(i64::MAX), &col).unwrap_err();
assert!(
err.to_string().contains("invalid timestamp value"),
"unexpected error message: {err}"
);
}

/// Regression: Value::Bytes carrying a unix-epoch string is the
/// wire variant that triggered the production failure
/// received unexpected value for timestamp type: Bytes("17436613..")
#[mz_ore::test]
fn timestamp_value_bytes_epoch() {
let col = timestamp_text_col(0);
let s = pack_one(Value::Bytes(b"1743661234".to_vec()), &col).unwrap();
assert_eq!(s, "2025-04-03 06:20:34");
}

/// sec=0 in the TIMESTAMP2 encoding is the zero-date sentinel; same
/// reasoning as `timestamp_value_int_zero_is_sentinel`.
#[mz_ore::test]
fn timestamp_value_bytes_zero_is_sentinel() {
let col = timestamp_text_col(0);
let s = pack_one(Value::Bytes(b"0".to_vec()), &col).unwrap();
assert_eq!(s, "0000-00-00 00:00:00");
}

/// Sentinel detection survives a fractional component ("0.NNNNNN"),
/// and the helper pads the output to the column's precision so that
/// snapshot and binlog paths produce identical text for the same
/// upstream row.
#[mz_ore::test]
fn timestamp_value_bytes_zero_with_fractional_is_sentinel() {
let col = timestamp_text_col(6);
let s = pack_one(Value::Bytes(b"0.000000".to_vec()), &col).unwrap();
assert_eq!(s, "0000-00-00 00:00:00.000000");
}

/// Fractional form of the TIMESTAMP2 binlog encoding —
/// "<sec>.<usec>" wrapped in Value::Bytes (binlog/value.rs:151-153).
/// Hits the `s.contains('.')` branch and the precision-aware
/// reformat.
#[mz_ore::test]
fn timestamp_value_bytes_epoch_fractional() {
let col = timestamp_text_col(6);
let s = pack_one(Value::Bytes(b"1743661234.123456".to_vec()), &col).unwrap();
assert_eq!(s, "2025-04-03 06:20:34.123456");
}

/// Bytes that aren't valid UTF-8 should produce a meaningful error,
/// not a panic.
#[mz_ore::test]
fn timestamp_value_bytes_invalid_utf8_errors() {
let col = timestamp_text_col(0);
// 0xC3 0x28 is an invalid 2-byte UTF-8 sequence.
let err = pack_one(Value::Bytes(vec![0xC3, 0x28]), &col).unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("invalid timestamp value"),
"unexpected error message: {msg}"
);
}

/// Bytes that are valid UTF-8 but not parseable as a unix epoch
/// should produce the same structured error as invalid UTF-8 —
/// covers the chrono parse failure path that
/// `timestamp_value_bytes_invalid_utf8_errors` doesn't reach.
#[mz_ore::test]
fn timestamp_value_bytes_unparseable_errors() {
let col = timestamp_text_col(0);
// "2024-04-03 10:15:13" is not valid because Value::Bytes must contain seconds since
// epoch, Value::Bytes("<sec>"/"<sec>.<usec>")
for payload in [&b""[..], &b"not-an-epoch"[..], &b"2024-04-03 10:15:13"[..]] {
Comment thread
martykulma marked this conversation as resolved.
let err = pack_one(Value::Bytes(payload.to_vec()), &col).unwrap_err();
assert!(
err.to_string().contains("invalid timestamp value"),
"payload {payload:?}: unexpected error message: {err}"
);
}
}

/// Variants that have no defined mapping for a TIMESTAMP column
/// must still produce the existing structured decode error so the
/// source health surface can flag them.
#[mz_ore::test]
fn timestamp_value_unsupported_variant_errors() {
let col = timestamp_text_col(0);
let err = pack_one(Value::Float(1.0), &col).unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("unexpected value for timestamp"),
"unexpected error message: {msg}"
);
}
}
Loading
Loading