diff --git a/docs/src/format/table/branch_tag.md b/docs/src/format/table/branch_tag.md index e3bd328d5d..e04fc4200e 100644 --- a/docs/src/format/table/branch_tag.md +++ b/docs/src/format/table/branch_tag.md @@ -114,8 +114,10 @@ Tags are always stored at the root dataset level, regardless of which branch the Each tag file is a JSON file with the following fields: -| JSON Key | Type | Optional | Description | -|-----------------|--------|----------|--------------------------------------------------------------------------| -| `branch` | string | Yes | Branch name being tagged. `null` or absent indicates main branch. | -| `version` | number | | Version number being tagged within that branch. | -| `manifest_size` | number | | Size of the manifest file in bytes. Used for efficient manifest loading. | +| JSON Key | Type | Optional | Description | +|----------------|--------|----------|-----------------------------------------------------------------------------------| +| `branch` | string | Yes | Branch name being tagged. `null` or absent indicates main branch. | +| `version` | number | | Version number being tagged within that branch. | +| `createdAt` | string | Yes | RFC 3339 UTC timestamp when the tag was first created. Historical tags may omit it. | +| `updatedAt` | string | Yes | RFC 3339 UTC timestamp when the tag was last modified. On creation it matches `createdAt`. | +| `manifestSize` | number | | Size of the manifest file in bytes. Used for efficient manifest loading. | diff --git a/docs/src/guide/tags_and_branches.md b/docs/src/guide/tags_and_branches.md index 02701f29e8..8af2302bfb 100644 --- a/docs/src/guide/tags_and_branches.md +++ b/docs/src/guide/tags_and_branches.md @@ -36,10 +36,10 @@ print(ds.tags.list()) # {} ds.tags.create("v1-prod", (None, 1)) print(ds.tags.list()) -# {'v1-prod': {'version': 1, 'manifest_size': ...}} +# {'v1-prod': {'version': 1, 'created_at': ..., 'updated_at': ..., 'manifest_size': ...}} ds.tags.update("v1-prod", (None, 2)) print(ds.tags.list()) -# {'v1-prod': {'version': 2, 'manifest_size': ...}} +# {'v1-prod': {'version': 2, 'created_at': ..., 'updated_at': ..., 'manifest_size': ...}} ds.tags.delete("v1-prod") print(ds.tags.list()) # {} @@ -47,10 +47,10 @@ print(ds.tags.list_ordered()) # [] ds.tags.create("v1-prod", (None, 1)) print(ds.tags.list_ordered()) -# [('v1-prod', {'version': 1, 'manifest_size': ...})] +# [('v1-prod', {'version': 1, 'created_at': ..., 'updated_at': ..., 'manifest_size': ...})] ds.tags.update("v1-prod", (None, 2)) print(ds.tags.list_ordered()) -# [('v1-prod', {'version': 2, 'manifest_size': ...})] +# [('v1-prod', {'version': 2, 'created_at': ..., 'updated_at': ..., 'manifest_size': ...})] ds.tags.delete("v1-prod") print(ds.tags.list_ordered()) # [] @@ -122,4 +122,4 @@ print(ds.branches.list_ordered(order="desc")) Branches hold references to data files. Lance ensures that cleanup does not delete files still referenced by any branch. - Delete unused branches to allow their referenced files to be cleaned up by `cleanup_old_versions()`. \ No newline at end of file + Delete unused branches to allow their referenced files to be cleaned up by `cleanup_old_versions()`. diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index cc82c8acee..c7fe287002 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -2364,6 +2364,26 @@ fn inner_list_tags<'local>( env: &mut JNIEnv<'local>, java_dataset: JObject, ) -> Result> { + fn optional_datetime_to_java_instant<'local>( + env: &mut JNIEnv<'local>, + timestamp: Option<&DateTime>, + ) -> Result> { + if let Some(timestamp) = timestamp { + let seconds = timestamp.timestamp(); + let nanos = timestamp.timestamp_subsec_nanos() as i64; + Ok(env + .call_static_method( + "java/time/Instant", + "ofEpochSecond", + "(JJ)Ljava/time/Instant;", + &[JValue::Long(seconds), JValue::Long(nanos)], + )? + .l()?) + } else { + Ok(JObject::null()) + } + } + let tag_map = { let dataset_guard = unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?; @@ -2377,14 +2397,18 @@ fn inner_list_tags<'local>( } else { JObject::null() }; + let created_at = optional_datetime_to_java_instant(env, tag_contents.created_at.as_ref())?; + let updated_at = optional_datetime_to_java_instant(env, tag_contents.updated_at.as_ref())?; let java_tag = env.new_object( "org/lance/Tag", - "(Ljava/lang/String;Ljava/lang/String;JI)V", + "(Ljava/lang/String;Ljava/lang/String;JILjava/time/Instant;Ljava/time/Instant;)V", &[ JValue::Object(&env.new_string(tag_name)?.into()), JValue::Object(&branch_name), JValue::Long(tag_contents.version as i64), JValue::Int(tag_contents.manifest_size as i32), + JValue::Object(&created_at), + JValue::Object(&updated_at), ], )?; env.call_method( diff --git a/java/src/main/java/org/lance/Tag.java b/java/src/main/java/org/lance/Tag.java index f7ce7be83c..7aabf6f451 100644 --- a/java/src/main/java/org/lance/Tag.java +++ b/java/src/main/java/org/lance/Tag.java @@ -15,6 +15,7 @@ import com.google.common.base.MoreObjects; +import java.time.Instant; import java.util.Objects; import java.util.Optional; @@ -23,12 +24,31 @@ public class Tag { private final Optional branch; private final long version; private final int manifestSize; + private final Optional createdAt; + private final Optional updatedAt; public Tag(String name, String branch, long version, int manifestSize) { + this(name, branch, version, manifestSize, null, null); + } + + /** + * Constructor used by JNI when reading tag metadata from native code. + * + *

Timestamps are system-generated metadata and are not part of the public Java input surface. + */ + private Tag( + String name, + String branch, + long version, + int manifestSize, + Instant createdAt, + Instant updatedAt) { this.name = name; this.branch = Optional.ofNullable(branch); this.version = version; this.manifestSize = manifestSize; + this.createdAt = Optional.ofNullable(createdAt); + this.updatedAt = Optional.ofNullable(updatedAt); } public String getName() { @@ -47,6 +67,14 @@ public int getManifestSize() { return manifestSize; } + public Optional getCreatedAt() { + return createdAt; + } + + public Optional getUpdatedAt() { + return updatedAt; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -54,6 +82,8 @@ public String toString() { .add("branch", branch) .add("version", version) .add("manifestSize", manifestSize) + .add("createdAt", createdAt) + .add("updatedAt", updatedAt) .toString(); } diff --git a/java/src/test/java/org/lance/DatasetTest.java b/java/src/test/java/org/lance/DatasetTest.java index a707b4f4a3..0da50bfed7 100644 --- a/java/src/test/java/org/lance/DatasetTest.java +++ b/java/src/test/java/org/lance/DatasetTest.java @@ -61,6 +61,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.Clock; +import java.time.Instant; import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Arrays; @@ -317,8 +318,15 @@ void testTags(@TempDir Path tempDir) { try (Dataset dataset = testDataset.createEmptyDataset()) { assertEquals(1, dataset.version()); dataset.tags().create("tag1", Ref.ofMain()); - assertEquals(1, dataset.tags().list().size()); - assertEquals(1, dataset.tags().list().get(0).getVersion()); + List tags = dataset.tags().list(); + Tag tag1 = tags.get(0); + assertEquals(1, tags.size()); + assertEquals(1, tag1.getVersion()); + assertEquals(new Tag("tag1", null, 1, tag1.getManifestSize()), tag1); + assertTrue(new HashSet<>(tags).contains(new Tag("tag1", null, 1, tag1.getManifestSize()))); + assertTrue(tag1.getCreatedAt().isPresent()); + assertTrue(tag1.getUpdatedAt().isPresent()); + assertEquals(tag1.getCreatedAt(), tag1.getUpdatedAt()); assertEquals(1, dataset.tags().getVersion("tag1")); } @@ -332,10 +340,41 @@ void testTags(@TempDir Path tempDir) { assertEquals(2, dataset2.tags().list().size()); assertEquals(1, dataset2.tags().getVersion("tag1")); assertEquals(2, dataset2.tags().getVersion("tag2")); + Instant tag2CreatedAt = + dataset2.tags().list().stream() + .filter(t -> t.getName().equals("tag2")) + .findFirst() + .orElseThrow() + .getCreatedAt() + .orElseThrow(); + Instant tag2UpdatedAt = + dataset2.tags().list().stream() + .filter(t -> t.getName().equals("tag2")) + .findFirst() + .orElseThrow() + .getUpdatedAt() + .orElseThrow(); + assertEquals(tag2CreatedAt, tag2UpdatedAt); dataset2.tags().update("tag2", Ref.ofMain(1)); assertEquals(2, dataset2.tags().list().size()); assertEquals(1, dataset2.tags().list().get(0).getVersion()); assertEquals(1, dataset2.tags().list().get(1).getVersion()); + Instant updatedTag2CreatedAt = + dataset2.tags().list().stream() + .filter(t -> t.getName().equals("tag2")) + .findFirst() + .orElseThrow() + .getCreatedAt() + .orElseThrow(); + Instant updatedTag2 = + dataset2.tags().list().stream() + .filter(t -> t.getName().equals("tag2")) + .findFirst() + .orElseThrow() + .getUpdatedAt() + .orElseThrow(); + assertEquals(updatedTag2CreatedAt, tag2CreatedAt); + assertFalse(updatedTag2.isBefore(tag2UpdatedAt)); assertEquals(1, dataset2.tags().getVersion("tag1")); assertEquals(1, dataset2.tags().getVersion("tag2")); dataset2.tags().delete("tag2"); diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 7496746285..fb2ef39811 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -4370,6 +4370,8 @@ class Transaction: class Tag(TypedDict): branch: Optional[str] version: int + created_at: Optional[datetime] + updated_at: Optional[datetime] manifest_size: int @@ -5727,7 +5729,7 @@ def list(self) -> dict[str, Tag]: Returns ------- dict[str, Tag] - A dictionary mapping tag names to version numbers. + A dictionary mapping tag names to tag metadata. """ return self._ds.tags() diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 889c37036a..185aef059a 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -472,6 +472,12 @@ def test_tag(tmp_path: Path): ds.tags.create("tag1", 1) assert len(ds.tags.list()) == 1 + tag1_meta = ds.tags.list()["tag1"] + assert tag1_meta["created_at"] is not None + assert isinstance(tag1_meta["created_at"], datetime) + assert tag1_meta["updated_at"] is not None + assert isinstance(tag1_meta["updated_at"], datetime) + assert tag1_meta["created_at"] == tag1_meta["updated_at"] with pytest.raises(ValueError): ds.tags.create("tag1", 1) @@ -505,7 +511,16 @@ def test_tag(tmp_path: Path): ): ds.tags.update("tag3", 1) + tag1_meta = ds.tags.list()["tag1"] + tag1_created_at = tag1_meta["created_at"] + tag1_updated_at = tag1_meta["updated_at"] + assert tag1_created_at is not None + assert tag1_updated_at is not None ds.tags.update("tag1", 2) + updated_tag1_meta = ds.tags.list()["tag1"] + assert updated_tag1_meta["created_at"] == tag1_created_at + assert updated_tag1_meta["updated_at"] is not None + assert updated_tag1_meta["updated_at"] >= tag1_updated_at ds = lance.dataset(base_dir, "tag1") assert ds.version == 2 @@ -524,7 +539,6 @@ def test_tag(tmp_path: Path): assert target_tag is not None assert target_tag["version"] == 1 assert target_tag["branch"] == "branch" - ds.tags.update("tag3", (None, 2)) target_tag = ds.tags.list()["tag3"] assert ds.tags.get_version("tag3") == 2 @@ -558,6 +572,11 @@ def test_tag_order(tmp_path: Path): tags_asc = ds.tags.list_ordered(order="asc") assert len(tags_asc) == 3 + first_tag = tags_asc[0][1] + assert first_tag["created_at"] is not None + assert isinstance(first_tag["created_at"], datetime) + assert first_tag["updated_at"] is not None + assert isinstance(first_tag["updated_at"], datetime) tag_names_asc = [t[0] for t in tags_asc] assert tag_names_asc == sorted(expected_tags.keys()), ( f"Unexpected ascending order: {tag_names_asc}" diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 35306636c9..c9a22d4e2c 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -1681,6 +1681,8 @@ impl Dataset { for (tag_name, tag_content) in tags { let dict = PyDict::new(py); dict.set_item("version", tag_content.version)?; + dict.set_item("created_at", tag_content.created_at)?; + dict.set_item("updated_at", tag_content.updated_at)?; dict.set_item("manifest_size", tag_content.manifest_size)?; pylist.append((tag_name.as_str(), dict))?; @@ -1698,6 +1700,8 @@ impl Dataset { let dict = PyDict::new(py); dict.set_item("branch", v.branch.clone())?; dict.set_item("version", v.version)?; + dict.set_item("created_at", v.created_at)?; + dict.set_item("updated_at", v.updated_at)?; dict.set_item("manifest_size", v.manifest_size)?; pytags.set_item(k, dict.into_py_any(py)?)?; } diff --git a/rust/lance/src/dataset/refs.rs b/rust/lance/src/dataset/refs.rs index 15d4e74a50..a301cc6f19 100644 --- a/rust/lance/src/dataset/refs.rs +++ b/rust/lance/src/dataset/refs.rs @@ -3,6 +3,7 @@ use std::ops::Range; +use chrono::{DateTime, Utc}; use futures::stream::{StreamExt, TryStreamExt}; use itertools::Itertools; use lance_io::object_store::ObjectStore; @@ -13,6 +14,7 @@ use std::sync::Arc; use crate::dataset::branch_location::BranchLocation; use crate::dataset::refs::Ref::{Tag, Version, VersionNumber}; +use crate::utils::temporal::utc_now; use crate::{Error, Result}; use serde::de::DeserializeOwned; use std::cmp::Ordering; @@ -221,7 +223,10 @@ impl Tags<'_> { message: format!("tag {} already exists", tag), }); } - let tag_contents = self.build_tag_content_by_ref(reference).await?; + let now = utc_now(); + let tag_contents = self + .build_tag_content_by_ref(reference, Some(now), Some(now)) + .await?; self.object_store() .put( @@ -257,7 +262,10 @@ impl Tags<'_> { message: format!("tag {} does not exist", tag), }); } - let tag_contents = self.build_tag_content_by_ref(reference).await?; + let previous_tag = self.get(tag).await?; + let tag_contents = self + .build_tag_content_by_ref(reference, previous_tag.created_at, Some(utc_now())) + .await?; self.object_store() .put( @@ -268,7 +276,12 @@ impl Tags<'_> { .map(|_| ()) } - async fn build_tag_content_by_ref(&self, reference: impl Into) -> Result { + async fn build_tag_content_by_ref( + &self, + reference: impl Into, + created_at: Option>, + updated_at: Option>, + ) -> Result { let reference = reference.into(); let (branch, version_number) = match reference { Version(branch, version_number) => (branch, version_number), @@ -313,6 +326,8 @@ impl Tags<'_> { let tag_contents = TagContents { branch, version: manifest_file.version, + created_at, + updated_at, manifest_size, }; Ok(tag_contents) @@ -654,6 +669,9 @@ impl<'a> BranchRelativePath<'a> { pub struct TagContents { pub branch: Option, pub version: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub created_at: Option>, + pub updated_at: Option>, pub manifest_size: usize, } @@ -1095,6 +1113,8 @@ mod tests { let tag_contents = TagContents { branch: Some("feature".to_string()), version: 10, + created_at: Some(chrono::DateTime::from_timestamp(1_234_567_000, 456_000_000).unwrap()), + updated_at: Some(chrono::DateTime::from_timestamp(1_234_567_890, 123_000_000).unwrap()), manifest_size: 2048, }; @@ -1102,13 +1122,43 @@ mod tests { let json = serde_json::to_string(&tag_contents).unwrap(); assert!(json.contains("branch")); assert!(json.contains("version")); + assert!(json.contains("createdAt")); + assert!(json.contains("updatedAt")); assert!(json.contains("manifestSize")); // Test deserialization let deserialized: TagContents = serde_json::from_str(&json).unwrap(); assert_eq!(deserialized.branch, tag_contents.branch); assert_eq!(deserialized.version, tag_contents.version); + assert_eq!(deserialized.created_at, tag_contents.created_at); + assert_eq!(deserialized.updated_at, tag_contents.updated_at); assert_eq!(deserialized.manifest_size, tag_contents.manifest_size); + + let tag_contents_without_created_at = TagContents { + branch: Some("feature".to_string()), + version: 10, + created_at: None, + updated_at: Some(chrono::DateTime::from_timestamp(1_234_567_890, 123_000_000).unwrap()), + manifest_size: 2048, + }; + let json_without_created_at = + serde_json::to_string(&tag_contents_without_created_at).unwrap(); + assert!(!json_without_created_at.contains("createdAt")); + assert!(json_without_created_at.contains("updatedAt")); + + let legacy_json = r#"{"branch":"feature","version":10,"manifestSize":2048}"#; + let legacy_deserialized: TagContents = serde_json::from_str(legacy_json).unwrap(); + assert_eq!(legacy_deserialized.created_at, None); + assert_eq!(legacy_deserialized.updated_at, None); + + let legacy_updated_only_json = r#"{"branch":"feature","version":10,"updatedAt":"2009-02-13T23:31:30.123Z","manifestSize":2048}"#; + let legacy_updated_only_deserialized: TagContents = + serde_json::from_str(legacy_updated_only_json).unwrap(); + assert_eq!(legacy_updated_only_deserialized.created_at, None); + assert_eq!( + legacy_updated_only_deserialized.updated_at, + Some(chrono::DateTime::from_timestamp(1_234_567_890, 123_000_000).unwrap()) + ); } #[rstest] diff --git a/rust/lance/src/dataset/tests/dataset_versioning.rs b/rust/lance/src/dataset/tests/dataset_versioning.rs index e9253cc69f..ef0cd67c3b 100644 --- a/rust/lance/src/dataset/tests/dataset_versioning.rs +++ b/rust/lance/src/dataset/tests/dataset_versioning.rs @@ -327,6 +327,14 @@ async fn test_tag( dataset.tags().create("tag1", 1).await.unwrap(); assert_eq!(dataset.tags().list().await.unwrap().len(), 1); + let tag1_metadata = dataset.tags().get("tag1").await.unwrap(); + let tag1_created_at = tag1_metadata + .created_at + .expect("newly created tag should have created_at"); + let tag1_updated_at = tag1_metadata + .updated_at + .expect("newly created tag should have updated_at"); + assert_eq!(tag1_created_at, tag1_updated_at); let another_bad_tag_creation = dataset.tags().create("tag1", 1).await; assert_eq!( @@ -406,6 +414,12 @@ async fn test_tag( ); dataset.tags().update("tag1", 2).await.unwrap(); + let tag1_after_first_update = dataset.tags().get("tag1").await.unwrap(); + assert_eq!(tag1_after_first_update.created_at, Some(tag1_created_at)); + let tag1_updated_after_first_update = tag1_after_first_update + .updated_at + .expect("updated tag should have updated_at"); + assert!(tag1_updated_after_first_update >= tag1_updated_at); dataset = dataset.checkout_version("tag1").await.unwrap(); assert_eq!(dataset.manifest.version, 2);