1515See the License for the specific language governing permissions and
1616limitations under the License.
1717"""
18+ import calendar
1819import struct
1920import unittest
2021from datetime import datetime
2122from decimal import Decimal
2223
2324from pypaimon .schema .data_types import AtomicType , DataField
24- from pypaimon .table .row .generic_row import GenericRow , GenericRowSerializer , GenericRowDeserializer
25+ from pypaimon .table .row .generic_row import (
26+ GenericRow , GenericRowSerializer , GenericRowDeserializer ,
27+ _datetime_to_millis_and_nanos ,
28+ )
2529from pypaimon .table .row .row_kind import RowKind
2630
2731
def _to_millis(dt: datetime) -> int:
    """Return the epoch-millisecond component of ``dt``.

    Delegates to ``_datetime_to_millis_and_nanos`` -- the helper imported
    from ``generic_row`` -- and discards the second (nanosecond) element
    of the pair, so tests compare values using the same conversion logic
    as serialization.
    """
    epoch_millis, _unused_nanos = _datetime_to_millis_and_nanos(dt)
    return epoch_millis
36+
37+
2838class TimestampTest (unittest .TestCase ):
29- """Tests for timestamp serialization/deserialization in GenericRow,
30- aligned with Java BinaryRow's compact/non-compact format."""
39+ """Tests for timestamp serialization/deserialization in GenericRow."""
3140
def test_timestamp_compact(self):
    """Compact timestamp (precision <= 3): round-trip preserves millis."""
    # The microsecond field is a whole number of milliseconds (.123 s).
    original = datetime(2025, 4, 8, 10, 30, 0, 123000)
    for type_str in ("TIMESTAMP(0)", "TIMESTAMP(3)"):
        with self.subTest(type=type_str):
            schema = [DataField(0, "ts", AtomicType(type_str))]
            payload = GenericRowSerializer.to_bytes(
                GenericRow([original], schema, RowKind.INSERT)
            )
            decoded = GenericRowDeserializer.from_bytes(payload, schema)
            # Compare at millisecond granularity via the shared helper.
            self.assertEqual(
                _to_millis(decoded.values[0]),
                _to_millis(original),
            )
4551
def test_timestamp_compact_binary_format(self):
    """Verify compact binary layout: epoch millis in fixed slot, no variable area."""
    schema = [DataField(0, "ts", AtomicType("TIMESTAMP(3)"))]
    moment = datetime(2025, 4, 8, 10, 30, 0)
    encoded = GenericRowSerializer.to_bytes(
        GenericRow([moment], schema, RowKind.INSERT)
    )

    # Skip the 4-byte arity prefix.
    body = encoded[4:]
    null_bits_size = 8
    expected_length = null_bits_size + 1 * 8

    # No variable area for a compact timestamp: only null bits + one slot.
    self.assertEqual(len(body), expected_length)

    # The single fixed slot holds epoch millis as a little-endian int64.
    (stored_millis,) = struct.unpack_from('<q', body, null_bits_size)
    self.assertEqual(stored_millis, _to_millis(moment))
6266
def test_timestamp_non_compact(self):
    """Non-compact timestamp (precision 4-9): round-trip preserves microseconds."""
    # .123456 seconds -- exercises the sub-millisecond part.
    original = datetime(2025, 4, 8, 10, 30, 0, 123456)
    for type_str in ("TIMESTAMP(6)", "TIMESTAMP(9)"):
        with self.subTest(type=type_str):
            schema = [DataField(0, "ts", AtomicType(type_str))]
            source_row = GenericRow([original], schema, RowKind.INSERT)
            payload = GenericRowSerializer.to_bytes(source_row)
            decoded = GenericRowDeserializer.from_bytes(payload, schema)
            # Python datetime has microsecond precision, so the value
            # must come back exactly equal.
            self.assertEqual(decoded.values[0], original)
7477
7578 def test_timestamp_non_compact_binary_format (self ):
76- """Verify non-compact timestamp binary layout matches Java:
77- - fixed slot: (offset << 32) | nanoOfMillisecond
78- - variable area: epoch millis (8 bytes)
79- """
79+ """Verify non-compact binary layout: (offset << 32 | nanoOfMilli) in fixed slot,
80+ epoch millis in variable area."""
8081 fields = [DataField (0 , "ts" , AtomicType ("TIMESTAMP(6)" ))]
81- ts = datetime (2025 , 4 , 8 , 10 , 30 , 0 , 123456 ) # .123456 seconds
82+ ts = datetime (2025 , 4 , 8 , 10 , 30 , 0 , 123456 )
8283 row = GenericRow ([ts ], fields , RowKind .INSERT )
8384 serialized = GenericRowSerializer .to_bytes (row )
8485
85- data = serialized [4 :] # skip arity prefix
86+ data = serialized [4 :]
8687 null_bits_size = 8
8788 fixed_part_size = null_bits_size + 1 * 8
8889
@@ -95,15 +96,13 @@ def test_timestamp_non_compact_binary_format(self):
9596 cursor = (offset_and_nano >> 32 ) & 0xFFFFFFFF
9697 nano_of_millisecond = offset_and_nano & 0xFFFFFFFF
9798
98- # cursor should point to variable area
9999 self .assertEqual (cursor , fixed_part_size )
100100 # 123456 us = 123 ms + 456 us = 123 ms + 456000 ns
101101 self .assertEqual (nano_of_millisecond , 456000 )
102102
103103 # Variable area contains epoch millis
104104 var_millis = struct .unpack ('<q' , data [cursor :cursor + 8 ])[0 ]
105- expected_millis = int (ts .timestamp () * 1000 )
106- self .assertEqual (var_millis , expected_millis )
105+ self .assertEqual (var_millis , _to_millis (ts ))
107106
108107 def test_timestamp_non_compact_null (self ):
109108 """Non-compact timestamp null value."""
@@ -116,19 +115,17 @@ def test_timestamp_non_compact_null(self):
def test_timestamp_boundary_precision(self):
    """Precision 3 is last compact, precision 4 is first non-compact."""
    ts = datetime(2025, 4, 8, 10, 30, 0, 123456)
    fixed_part_size = 8 + 1 * 8

    # (type string, expected bytes after the 4-byte prefix), checked in order:
    #   precision=3 -> compact, no variable area
    #   precision=4 -> non-compact, has 8-byte variable area
    expectations = (
        ("TIMESTAMP(3)", fixed_part_size),
        ("TIMESTAMP(4)", fixed_part_size + 8),
    )
    for type_str, expected_size in expectations:
        schema = [DataField(0, "ts", AtomicType(type_str))]
        encoded = GenericRowSerializer.to_bytes(
            GenericRow([ts], schema, RowKind.INSERT)
        )
        self.assertEqual(len(encoded[4:]), expected_size)
132129
133130 def test_timestamp_mixed_with_other_types (self ):
134131 """Non-compact timestamp mixed with other types in a single row."""
@@ -153,7 +150,7 @@ def test_timestamp_mixed_with_other_types(self):
153150
154151 self .assertEqual (result .values [0 ], 42 )
155152 self .assertEqual (result .values [1 ], "hello" )
156- self .assertEqual (int (result .values [2 ]. timestamp () * 1000 ), int (ts_compact . timestamp () * 1000 ))
153+ self .assertEqual (_to_millis (result .values [2 ]), _to_millis (ts_compact ))
157154 self .assertEqual (result .values [3 ], ts_non_compact )
158155 self .assertEqual (result .values [4 ], dec_val )
159156
@@ -164,15 +161,13 @@ def test_timestamp_default_precision(self):
164161 row = GenericRow ([ts ], fields , RowKind .INSERT )
165162 serialized = GenericRowSerializer .to_bytes (row )
166163 result = GenericRowDeserializer .from_bytes (serialized , fields )
167- expected_millis = int (ts .timestamp () * 1000 )
168- actual_millis = int (result .values [0 ].timestamp () * 1000 )
169- self .assertEqual (actual_millis , expected_millis )
164+ self .assertEqual (_to_millis (result .values [0 ]), _to_millis (ts ))
170165
171166 def test_timestamp_pre_epoch (self ):
172167 """Dates before 1970-01-01 (negative epoch millis) should round-trip correctly."""
173168 # Compact
174169 fields3 = [DataField (0 , "ts" , AtomicType ("TIMESTAMP(3)" ))]
175- ts_pre = datetime (1969 , 7 , 20 , 20 , 17 , 0 ) # Apollo 11 moon landing
170+ ts_pre = datetime (1969 , 7 , 20 , 20 , 17 , 0 )
176171 row = GenericRow ([ts_pre ], fields3 , RowKind .INSERT )
177172 serialized = GenericRowSerializer .to_bytes (row )
178173 result = GenericRowDeserializer .from_bytes (serialized , fields3 )
0 commit comments