diff --git a/objectstore-service/docs/architecture.md b/objectstore-service/docs/architecture.md index d17b7450..7b0f3a02 100644 --- a/objectstore-service/docs/architecture.md +++ b/objectstore-service/docs/architecture.md @@ -70,6 +70,16 @@ backend; objects exceeding it go to the long-term backend. See [`StorageConfig`] for available backend implementations. +## Redirect Tombstones + +For large objects, the high-volume backend stores a **redirect tombstone** — a +zero-payload marker whose metadata has `is_redirect_tombstone: true`. This +allows reads to check only the high-volume backend and follow the tombstone to +long-term storage, without scanning both backends on every read. + +See [`StorageService`] for write ordering, consistency guarantees, and known +gaps. + # Metadata and Payload Every object consists of structured **metadata** and a binary **payload**. diff --git a/objectstore-service/src/backend/bigtable.rs b/objectstore-service/src/backend/bigtable.rs index 33818261..540e670e 100644 --- a/objectstore-service/src/backend/bigtable.rs +++ b/objectstore-service/src/backend/bigtable.rs @@ -13,7 +13,7 @@ use tonic::Code; use crate::PayloadStream; use crate::backend::common::{ - Backend, DeleteOutcome, DeleteResponse, GetResponse, MetadataResponse, PutResponse, + Backend, ConditionalOutcome, DeleteResponse, GetResponse, MetadataResponse, PutResponse, }; use crate::error::{Error, Result}; use crate::gcp_auth::PrefetchingTokenProvider; @@ -100,6 +100,28 @@ fn column_filter(column: &[u8]) -> v2::RowFilter { } } +/// Creates a predicate filter that matches rows containing a redirect tombstone. +/// +/// Checks the metadata column for the `is_redirect_tombstone` marker. We cannot +/// use payload-column presence because `put_row` always writes a `p` cell — even +/// for tombstones (with empty bytes). 
+fn tombstone_predicate() -> v2::RowFilter {
+    v2::RowFilter {
+        filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
+            filters: vec![
+                column_filter(COLUMN_METADATA),
+                v2::RowFilter {
+                    filter: Some(v2::row_filter::Filter::ValueRegexFilter(
+                        // RE2 full-match anchored to the JSON start. The field ordering of
+                        // Metadata ensures `is_redirect_tombstone` is serialized first.
+                        b"^\\{\"is_redirect_tombstone\":true[,}].*".to_vec(),
+                    )),
+                },
+            ],
+        })),
+    }
+}
+
 /// Parsed data from a BigTable row's cells.
 struct RowData {
     metadata: Metadata,
@@ -292,22 +314,22 @@ impl BigTableBackend {
         Ok(response.into_inner())
     }
 
-    async fn put_row(
-        &self,
-        path: Vec<u8>,
-        metadata: &Metadata,
-        payload: Vec<u8>,
-        action: &str,
-    ) -> Result<PutResponse> {
+    /// Builds the three mutations that write an object: delete the existing row,
+    /// set the payload cell, set the metadata cell.
+    ///
+    /// Used by both `put_row` (unconditional write) and `put_non_tombstone`
+    /// (conditional write via `CheckAndMutateRowRequest`).
+    ///
+    /// NB: We explicitly delete the row before writing to clear stale metadata
+    /// on overwrite.
+    fn write_mutations(metadata: &Metadata, payload: Vec<u8>) -> Result<[mutation::Mutation; 3]> {
         let now = SystemTime::now();
         let (family, timestamp_micros) = match metadata.expiration_policy {
             ExpirationPolicy::Manual => (FAMILY_MANUAL, -1),
             ExpirationPolicy::TimeToLive(ttl) => (FAMILY_GC, ttl_to_micros(ttl, now)?),
             ExpirationPolicy::TimeToIdle(tti) => (FAMILY_GC, ttl_to_micros(tti, now)?),
         };
-
-        let mutations = [
-            // NB: We explicitly delete the row to clear metadata on overwrite.
+        Ok([
             mutation::Mutation::DeleteFromRow(mutation::DeleteFromRow {}),
             mutation::Mutation::SetCell(mutation::SetCell {
                 family_name: family.to_owned(),
@@ -324,8 +346,18 @@ impl BigTableBackend {
                 cause,
             })?,
         }),
-        ];
-        self.mutate(path, mutations, action).await
+        ])
+    }
+
+    async fn put_row(
+        &self,
+        path: Vec<u8>,
+        metadata: &Metadata,
+        payload: Vec<u8>,
+        action: &str,
+    ) -> Result<PutResponse> {
+        self.mutate(path, Self::write_mutations(metadata, payload)?, action)
+            .await
+    }
 }
 
@@ -355,6 +387,56 @@ impl Backend for BigTableBackend {
         Ok(())
     }
 
+    #[tracing::instrument(level = "trace", fields(?id), skip_all)]
+    async fn put_non_tombstone(
+        &self,
+        id: &ObjectId,
+        metadata: &Metadata,
+        mut stream: PayloadStream,
+    ) -> Result<ConditionalOutcome> {
+        tracing::debug!("Conditional write to Bigtable backend");
+
+        // Buffer the payload before the conditional RPC so the operation
+        // appears atomic: if the predicate is false we write, otherwise we
+        // discard the buffered bytes and return Tombstone.
+        let mut payload = ChunkedBytes::new(0);
+        while let Some(chunk) = stream.try_next().await? {
+            payload.push(chunk);
+        }
+
+        let path = id.as_storage_path().to_string().into_bytes();
+        let write_mutations: Vec<v2::Mutation> =
+            Self::write_mutations(metadata, payload.into_bytes().into())?
+                .into_iter()
+                .map(|m| v2::Mutation { mutation: Some(m) })
+                .collect();
+
+        let request = v2::CheckAndMutateRowRequest {
+            table_name: self.table_path.clone(),
+            row_key: path,
+            predicate_filter: Some(tombstone_predicate()),
+            true_mutations: vec![], // Tombstone matched → reject (no write).
+            false_mutations: write_mutations, // Not a tombstone → write.
+            ..Default::default()
+        };
+
+        let is_tombstone = self
+            .with_retry("put_non_tombstone", || async {
+                self.bigtable
+                    .client()
+                    .check_and_mutate_row(request.clone())
+                    .await
+            })
+            .await?
+ .predicate_matched; + + Ok(if is_tombstone { + ConditionalOutcome::Tombstone + } else { + ConditionalOutcome::Executed + }) + } + #[tracing::instrument(level = "trace", fields(?id), skip_all)] async fn get_object(&self, id: &ObjectId) -> Result { tracing::debug!("Reading from Bigtable backend"); @@ -422,7 +504,7 @@ impl Backend for BigTableBackend { } #[tracing::instrument(level = "trace", fields(?id), skip_all)] - async fn delete_non_tombstone(&self, id: &ObjectId) -> Result { + async fn delete_non_tombstone(&self, id: &ObjectId) -> Result { tracing::debug!("Conditional delete from Bigtable backend"); let path = id.as_storage_path().to_string().into_bytes(); @@ -439,20 +521,7 @@ impl Backend for BigTableBackend { let request = v2::CheckAndMutateRowRequest { table_name: self.table_path.clone(), row_key: path, - predicate_filter: Some(v2::RowFilter { - filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain { - filters: vec![ - column_filter(COLUMN_METADATA), - v2::RowFilter { - filter: Some(v2::row_filter::Filter::ValueRegexFilter( - // RE2 full-match anchored to the JSON start. The field ordering of - // Metadata ensures `is_redirect_tombstone` is serialized first. - b"^\\{\"is_redirect_tombstone\":true[,}].*".to_vec(), - )), - }, - ], - })), - }), + predicate_filter: Some(tombstone_predicate()), true_mutations: vec![], // Tombstone matched → leave intact (no mutations). false_mutations: vec![delete_mutation], // Not a tombstone → delete the row. 
..Default::default() @@ -469,9 +538,9 @@ impl Backend for BigTableBackend { .predicate_matched; if is_tombstone { - Ok(DeleteOutcome::Tombstone) + Ok(ConditionalOutcome::Tombstone) } else { - Ok(DeleteOutcome::Deleted) + Ok(ConditionalOutcome::Executed) } } } @@ -873,7 +942,7 @@ mod tests { .await?; let result = backend.delete_non_tombstone(&id).await?; - assert_eq!(result, DeleteOutcome::Deleted); + assert_eq!(result, ConditionalOutcome::Executed); let get_result = backend.get_object(&id).await?; assert!(get_result.is_none()); @@ -894,7 +963,7 @@ mod tests { backend.put_object(&id, &metadata, make_stream(b"")).await?; let result = backend.delete_non_tombstone(&id).await?; - assert_eq!(result, DeleteOutcome::Tombstone); + assert_eq!(result, ConditionalOutcome::Tombstone); // Tombstone should still exist — delete_non_tombstone leaves it intact. let get_result = backend.get_metadata(&id).await?; @@ -912,7 +981,7 @@ mod tests { let id = make_id(); let result = backend.delete_non_tombstone(&id).await?; - assert_eq!(result, DeleteOutcome::Deleted); + assert_eq!(result, ConditionalOutcome::Executed); Ok(()) } diff --git a/objectstore-service/src/backend/common.rs b/objectstore-service/src/backend/common.rs index 164be3f9..b856f30e 100644 --- a/objectstore-service/src/backend/common.rs +++ b/objectstore-service/src/backend/common.rs @@ -22,13 +22,15 @@ pub(super) type MetadataResponse = Option; /// Backend response for delete operations. pub(super) type DeleteResponse = (); -/// Response from [`Backend::delete_non_tombstone`]. +/// Outcome of a conditional operation that is skipped when a redirect tombstone is present. +/// +/// Returned by [`Backend::put_non_tombstone`] and [`Backend::delete_non_tombstone`]. #[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum DeleteOutcome { - /// The entity was a redirect tombstone; it was left intact. +pub enum ConditionalOutcome { + /// The operation was executed (no tombstone was present). 
+ Executed, + /// A redirect tombstone was found; the operation was skipped. Tombstone, - /// The entity was a regular object (now deleted) or non-existent. - Deleted, } /// A type-erased [`Backend`] instance. @@ -48,6 +50,29 @@ pub trait Backend: Debug + Send + Sync + 'static { stream: PayloadStream, ) -> Result; + /// Writes the object only if the row does NOT already contain a redirect tombstone. + /// + /// Returns [`ConditionalOutcome::Executed`] after a successful write, or + /// [`ConditionalOutcome::Tombstone`] (leaving the row intact) when a redirect + /// tombstone is already present. + /// + /// The default implementation is a non-atomic read-then-write; backends + /// should override this with an atomic conditional mutation where possible. + async fn put_non_tombstone( + &self, + id: &ObjectId, + metadata: &Metadata, + stream: PayloadStream, + ) -> Result { + let existing = self.get_metadata(id).await?; + if existing.is_some_and(|m| m.is_tombstone()) { + Ok(ConditionalOutcome::Tombstone) + } else { + self.put_object(id, metadata, stream).await?; + Ok(ConditionalOutcome::Executed) + } + } + /// Retrieves an object at the given path, returning its metadata and a stream of bytes. async fn get_object(&self, id: &ObjectId) -> Result; @@ -64,16 +89,16 @@ pub trait Backend: Debug + Send + Sync + 'static { /// Deletes the object only if it is NOT a redirect tombstone. /// - /// Returns [`DeleteOutcome::Tombstone`] (leaving the row intact) when - /// the object is a redirect tombstone, or [`DeleteOutcome::Deleted`] + /// Returns [`ConditionalOutcome::Tombstone`] (leaving the row intact) when + /// the object is a redirect tombstone, or [`ConditionalOutcome::Executed`] /// (after deleting it) for regular objects and non-existent rows. 
- async fn delete_non_tombstone(&self, id: &ObjectId) -> Result { + async fn delete_non_tombstone(&self, id: &ObjectId) -> Result { let metadata = self.get_metadata(id).await?; if metadata.is_some_and(|m| m.is_tombstone()) { - Ok(DeleteOutcome::Tombstone) + Ok(ConditionalOutcome::Tombstone) } else { self.delete_object(id).await?; - Ok(DeleteOutcome::Deleted) + Ok(ConditionalOutcome::Executed) } } } diff --git a/objectstore-service/src/backend/in_memory.rs b/objectstore-service/src/backend/in_memory.rs index ceb6da53..0b60dd7f 100644 --- a/objectstore-service/src/backend/in_memory.rs +++ b/objectstore-service/src/backend/in_memory.rs @@ -12,7 +12,7 @@ use bytes::{Bytes, BytesMut}; use futures_util::{StreamExt, TryStreamExt}; use objectstore_types::metadata::Metadata; -use super::common::{DeleteResponse, GetResponse, PutResponse}; +use super::common::{ConditionalOutcome, DeleteResponse, GetResponse, PutResponse}; use crate::PayloadStream; use crate::error::Result; use crate::id::ObjectId; @@ -76,6 +76,25 @@ impl super::common::Backend for InMemoryBackend { Ok(()) } + async fn put_non_tombstone( + &self, + id: &ObjectId, + metadata: &Metadata, + stream: PayloadStream, + ) -> Result { + // Buffer the payload before acquiring the lock so the lock is held + // only for the check-and-write, making the operation atomic. 
+ let payload: BytesMut = stream.try_collect().await?; + let mut store = self.store.lock().unwrap(); + if store.get(id).is_some_and(|(m, _)| m.is_tombstone()) { + return Ok(ConditionalOutcome::Tombstone); + } + let mut metadata = metadata.clone(); + metadata.size = Some(payload.len()); + store.insert(id.clone(), (metadata, payload.freeze())); + Ok(ConditionalOutcome::Executed) + } + async fn get_object(&self, id: &ObjectId) -> Result { let entry = self.store.lock().unwrap().get(id).cloned(); Ok(entry.map(|(metadata, bytes)| { diff --git a/objectstore-service/src/service.rs b/objectstore-service/src/service.rs index e9d7def7..87cc3a66 100644 --- a/objectstore-service/src/service.rs +++ b/objectstore-service/src/service.rs @@ -107,16 +107,26 @@ pub const DEFAULT_CONCURRENCY_LIMIT: usize = 500; /// /// The tombstone system maintains consistency through operation ordering rather /// than distributed locks. The invariant is: a redirect tombstone is always the -/// **last thing written** and the **last thing removed**. +/// **first thing written** and the **last thing removed**. /// -/// - On **write**, the real object is persisted before the tombstone. If the -/// tombstone write fails, the real object is rolled back. +/// - On **write**, the tombstone is written to the high-volume backend before +/// the real object is written to long-term storage. If the tombstone write +/// fails, nothing is written. If the data write fails, a *headless tombstone* +/// remains: the tombstone exists in HV but no data in LT. This is an accepted +/// state — reads return `None`, deletes remove it, and re-inserts overwrite it. +/// For small objects, `put_non_tombstone` is used instead: it atomically +/// rejects the write if a tombstone is already present, routing the data to +/// long-term storage instead. /// - On **delete**, the real object is removed before the tombstone. If the /// long-term delete fails, the tombstone remains and the data stays reachable. 
/// /// This ensures that at every intermediate step, either the data is fully -/// reachable (tombstone points to data) or fully absent — never an orphan in -/// either direction. +/// reachable (tombstone points to data), a headless tombstone is present (safe, +/// recoverable), or the object is fully absent. +/// +/// There is one known gap: a concurrent insert and delete can race to produce +/// an **orphaned long-term object** — data in LT with no tombstone in HV. This +/// will be addressed in the future. /// /// See the individual methods for per-operation tombstone behavior. /// @@ -265,18 +275,18 @@ impl StorageService { /// # Run-to-completion /// /// Once called, the operation runs to completion even if the returned future - /// is dropped (e.g., on client disconnect). This guarantees that partially - /// written objects are never left without their redirect tombstone. + /// is dropped (e.g., on client disconnect). For large objects this ensures + /// that once the redirect tombstone is written to the high-volume backend, + /// the subsequent long-term data write is not abandoned mid-flight. /// /// # Tombstone handling /// - /// If the object has a caller-provided key and a redirect tombstone already - /// exists at that key, the new write is routed to the long-term backend - /// (preserving the existing tombstone as a redirect to the new data). - /// - /// For long-term writes, the real object is persisted first, then the - /// tombstone. If the tombstone write fails, the real object is rolled back - /// to avoid orphans. + /// For large objects the redirect tombstone is written to the high-volume + /// backend first, then the data to the long-term backend. For small objects, + /// `put_non_tombstone` is used: if a tombstone is already present at the + /// key, the data is routed to the long-term backend instead (preserving the + /// existing tombstone). See [`StorageService`] for the full consistency + /// model. 
     pub async fn insert_object(
         &self,
         context: ObjectContext,
@@ -674,12 +684,12 @@ mod tests {
         // the task can finish writing.
         lt.resume.notify_one();
 
-        // Wait for the tombstone write to the high-volume backend, which is the
-        // last step of the long-term insert path.
-        let on_put = Arc::clone(&hv.on_put);
+        // Wait for the long-term data write to complete — this is the final step
+        // of the insert, so once it fires both the tombstone and the data are present.
+        let on_put = Arc::clone(&lt.on_put);
         tokio::time::timeout(Duration::from_secs(5), on_put.notified())
             .await
-            .expect("timed out waiting for tombstone write");
+            .expect("timed out waiting for long-term write");
 
         // Verify the object was fully written despite the caller being dropped.
         let id = ObjectId::new(make_context(), "completion-test".into());
diff --git a/objectstore-service/src/stream.rs b/objectstore-service/src/stream.rs
index 2ea91700..947c5541 100644
--- a/objectstore-service/src/stream.rs
+++ b/objectstore-service/src/stream.rs
@@ -156,6 +156,22 @@ where
             stream: None,
         })
     }
+
+    /// Consumes self and returns all bytes as a single [`Bytes`].
+    ///
+    /// If the peek limit was exceeded, drains the remaining stream before
+    /// returning. Always correct regardless of [`is_exhausted`](Self::is_exhausted).
+    pub async fn into_bytes(mut self) -> io::Result<Bytes> {
+        if let Some(pending) = self.pending.take() {
+            self.buffer.push(pending);
+        }
+        if let Some(mut stream) = self.stream.take() {
+            while let Some(chunk) = stream.try_next().await?
{ + self.buffer.push(chunk); + } + } + Ok(self.buffer.into_bytes()) + } } impl SizedPeek diff --git a/objectstore-service/src/tiered.rs b/objectstore-service/src/tiered.rs index fbd69088..eab631cb 100644 --- a/objectstore-service/src/tiered.rs +++ b/objectstore-service/src/tiered.rs @@ -12,7 +12,7 @@ use futures_util::StreamExt; use objectstore_types::metadata::Metadata; use crate::PayloadStream; -use crate::backend::common::{BoxedBackend, DeleteOutcome}; +use crate::backend::common::{BoxedBackend, ConditionalOutcome}; use crate::error::Result; use crate::id::{ObjectContext, ObjectId}; use crate::service::{DeleteResponse, GetResponse, InsertResponse, MetadataResponse}; @@ -21,6 +21,7 @@ use crate::stream::SizedPeek; /// The threshold up until which we will go to the "high volume" backend. const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB +#[derive(Clone, Copy)] enum BackendChoice { HighVolume, LongTerm, @@ -66,7 +67,7 @@ impl TieredStorage { let start = Instant::now(); let peeked = SizedPeek::new(stream, BACKEND_SIZE_THRESHOLD).await?; - let mut backend = if peeked.is_exhausted() { + let initial_backend = if peeked.is_exhausted() { BackendChoice::HighVolume } else { BackendChoice::LongTerm @@ -75,34 +76,54 @@ impl TieredStorage { objectstore_metrics::distribution!( "put.first_chunk.latency"@s: start.elapsed(), "usecase" => context.usecase.as_str(), - "backend_choice" => backend, + "backend_choice" => initial_backend, ); - let has_key = key.is_some(); let id = ObjectId::optional(context, key); - // There might currently be a tombstone at the given path from a previously stored object. 
- if has_key { - let metadata = self.high_volume_backend.get_metadata(&id).await?; - if metadata.is_some_and(|m| m.is_tombstone()) { - // Write the object to the other backend and keep the tombstone in place - backend = BackendChoice::LongTerm; - } - }; - - let (backend_ty, stored_size) = match backend { + let (final_backend_choice, backend_ty, stored_size) = match initial_backend { BackendChoice::HighVolume => { + // All data fits in the peek buffer. Extract the bytes so they can + // be re-streamed to long-term storage if HV rejects with a tombstone. let stored_size = peeked.len() as u64; - let stream = peeked.into_stream().boxed(); - - self.high_volume_backend - .put_object(&id, metadata, stream) + let bytes = peeked.into_bytes().await?; + + let outcome = self + .high_volume_backend + .put_non_tombstone( + &id, + metadata, + futures_util::stream::once(std::future::ready(Ok(bytes.clone()))).boxed(), + ) .await?; - (self.high_volume_backend.name(), stored_size) + + match outcome { + ConditionalOutcome::Executed => ( + BackendChoice::HighVolume, + self.high_volume_backend.name(), + stored_size, + ), + ConditionalOutcome::Tombstone => { + // A tombstone already exists in HV; write the data directly to + // long-term storage. No need to write the tombstone again. + self.long_term_backend + .put_object( + &id, + metadata, + futures_util::stream::once(std::future::ready(Ok(bytes))).boxed(), + ) + .await?; + ( + BackendChoice::LongTerm, + self.long_term_backend.name(), + stored_size, + ) + } + } } BackendChoice::LongTerm => { let stored_size = Arc::new(AtomicU64::new(0)); - let stream = peeked + let lt_stream = peeked .into_stream() .inspect({ let stored_size = Arc::clone(&stored_size); @@ -114,30 +135,30 @@ impl TieredStorage { }) .boxed(); - // first write the object - self.long_term_backend - .put_object(&id, metadata, stream) - .await?; - + // Write the tombstone to HV first. 
If this fails, no data is written + // to either backend — the operation fails cleanly with no orphan. let redirect_metadata = Metadata { is_redirect_tombstone: Some(true), expiration_policy: metadata.expiration_policy, ..Default::default() }; - let redirect_stream = futures_util::stream::empty().boxed(); - let redirect_request = - self.high_volume_backend - .put_object(&id, &redirect_metadata, redirect_stream); - - // then we write the tombstone - let redirect_result = redirect_request.await; - if redirect_result.is_err() { - // and clean up on any kind of error - self.long_term_backend.delete_object(&id).await?; - } - redirect_result?; + self.high_volume_backend + .put_object( + &id, + &redirect_metadata, + futures_util::stream::empty().boxed(), + ) + .await?; + + // Write data to long-term storage. On failure, an OrphanHV + // remains — tombstone in HV, no data in LT. Reads return None; + // deletes and re-inserts clean it up. + self.long_term_backend + .put_object(&id, metadata, lt_stream) + .await?; ( + BackendChoice::LongTerm, self.long_term_backend.name(), stored_size.load(Ordering::Acquire), ) @@ -147,13 +168,13 @@ impl TieredStorage { objectstore_metrics::distribution!( "put.latency"@s: start.elapsed(), "usecase" => id.usecase(), - "backend_choice" => backend, + "backend_choice" => final_backend_choice, "backend_type" => backend_ty ); objectstore_metrics::distribution!( "put.size"@b: stored_size, "usecase" => id.usecase(), - "backend_choice" => backend, + "backend_choice" => final_backend_choice, "backend_type" => backend_ty ); @@ -226,7 +247,7 @@ impl TieredStorage { let mut backend_type = self.high_volume_backend.name(); let outcome = self.high_volume_backend.delete_non_tombstone(id).await?; - if outcome == DeleteOutcome::Tombstone { + if outcome == ConditionalOutcome::Tombstone { backend_choice = "long-term"; backend_type = self.long_term_backend.name(); // Delete the long-term object first, then clean up the tombstone. 
@@ -267,7 +288,7 @@ mod tests { use objectstore_types::scope::{Scope, Scopes}; use super::*; - use crate::backend::common::BoxedBackend; + use crate::backend::in_memory::InMemoryBackend; use crate::error::Error; use crate::stream::make_stream; @@ -329,7 +350,7 @@ mod tests { assert_eq!(body.as_ref(), b"auto-keyed"); } - // --- Size-based routing tests --- + // --- Routing --- #[tokio::test] async fn small_object_goes_to_high_volume() { @@ -365,28 +386,63 @@ mod tests { .await .unwrap(); - // Real payload should be in long-term let (lt_meta, lt_bytes) = lt.get_stored(&id).unwrap(); assert_eq!(lt_bytes.len(), payload.len()); assert!(!lt_meta.is_tombstone()); - // A redirect tombstone should exist in high-volume let (hv_meta, _) = hv.get_stored(&id).unwrap(); assert!(hv_meta.is_tombstone()); } #[tokio::test] - async fn reinsert_with_existing_tombstone_routes_to_long_term() { + async fn tombstone_inherits_expiration_policy() { let (storage, hv, lt) = make_tiered_storage(); - // First: insert a large object → creates tombstone in hv, payload in lt - let large_payload = vec![0xABu8; 2 * 1024 * 1024]; + let metadata_in = Metadata { + content_type: "image/png".into(), + expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(3600)), + origin: Some("10.0.0.1".into()), + ..Default::default() + }; + let payload = vec![0u8; 2 * 1024 * 1024]; // force long-term + + let id = storage + .insert_object( + make_context(), + Some("expiry-test".into()), + &metadata_in, + make_stream(&payload), + ) + .await + .unwrap(); + + // Tombstone in HV should have only expiration_policy copied. + let (tombstone, _) = hv.get_stored(&id).unwrap(); + assert!(tombstone.is_tombstone()); + assert_eq!(tombstone.expiration_policy, metadata_in.expiration_policy); + assert_eq!(tombstone.content_type, Metadata::default().content_type); + assert!(tombstone.origin.is_none()); + + // Long-term object should have the full metadata. 
+ let (lt_meta, _) = lt.get_stored(&id).unwrap(); + assert!(!lt_meta.is_tombstone()); + assert_eq!(lt_meta.content_type, "image/png"); + assert_eq!(lt_meta.expiration_policy, metadata_in.expiration_policy); + } + + /// A small object with a pre-existing tombstone at the same key is detected + /// atomically by `put_non_tombstone` and routed to long-term storage. + #[tokio::test] + async fn reinsert_small_with_existing_tombstone_routes_to_long_term() { + let (storage, hv, lt) = make_tiered_storage(); + + // Establish a tombstone via a large insert. let id = storage .insert_object( make_context(), Some("reinsert-key".into()), &Default::default(), - make_stream(&large_payload), + make_stream(&vec![0xABu8; 2 * 1024 * 1024]), ) .await .unwrap(); @@ -394,9 +450,8 @@ mod tests { let (hv_meta, _) = hv.get_stored(&id).unwrap(); assert!(hv_meta.is_tombstone()); - // Now re-insert a SMALL payload with the same key. The service should - // detect the existing tombstone and route to long-term anyway. - let small_payload = vec![0xCDu8; 100]; // well under 1 MiB threshold + // Re-insert a small payload at the same key. + let small_payload = vec![0xCDu8; 100]; storage .insert_object( make_context(), @@ -407,88 +462,157 @@ mod tests { .await .unwrap(); - // The small object should be in long-term (not high-volume) + // Small object goes to LT; tombstone in HV is preserved. let (lt_meta, lt_bytes) = lt.get_stored(&id).unwrap(); assert!(!lt_meta.is_tombstone()); assert_eq!(lt_bytes.len(), small_payload.len()); - - // The tombstone in hv should still be present let (hv_meta, _) = hv.get_stored(&id).unwrap(); assert!(hv_meta.is_tombstone()); } + /// A small object at a key is overwritten by a large one: the large insert + /// replaces the small HV entry with a tombstone and writes data to LT. 
#[tokio::test] - async fn tombstone_inherits_expiration_policy() { + async fn overwrite_small_with_large_no_prior_tombstone() { let (storage, hv, lt) = make_tiered_storage(); - let metadata_in = Metadata { - content_type: "image/png".into(), - expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(3600)), - origin: Some("10.0.0.1".into()), - ..Default::default() - }; - let payload = vec![0u8; 2 * 1024 * 1024]; // force long-term - let id = storage .insert_object( make_context(), - Some("expiry-test".into()), - &metadata_in, - make_stream(&payload), + Some("overwrite-key".into()), + &Default::default(), + make_stream(&[0xAAu8; 100]), ) .await .unwrap(); - // The tombstone in hv should have ONLY expiration_policy copied - let (tombstone, _) = hv.get_stored(&id).unwrap(); - assert!(tombstone.is_tombstone()); - assert_eq!(tombstone.expiration_policy, metadata_in.expiration_policy); - assert_eq!(tombstone.content_type, Metadata::default().content_type); - assert!(tombstone.origin.is_none()); + assert!(hv.contains(&id)); + assert!(!lt.contains(&id)); - // The long-term object should have the full metadata - let (lt_meta, _) = lt.get_stored(&id).unwrap(); + let large_payload = vec![0xBBu8; 2 * 1024 * 1024]; + storage + .insert_object( + make_context(), + Some("overwrite-key".into()), + &Default::default(), + make_stream(&large_payload), + ) + .await + .unwrap(); + + let (hv_meta, _) = hv.get_stored(&id).unwrap(); + assert!( + hv_meta.is_tombstone(), + "HV must have tombstone after large overwrite" + ); + let (lt_meta, lt_bytes) = lt.get_stored(&id).unwrap(); assert!(!lt_meta.is_tombstone()); - assert_eq!(lt_meta.content_type, "image/png"); - assert_eq!(lt_meta.expiration_policy, metadata_in.expiration_policy); + assert_eq!(lt_bytes.len(), large_payload.len()); } - // --- Tombstone redirect tests --- - + /// After a large object is fully deleted, a small re-insert at the same key + /// goes directly to HV (no tombstone present). 
#[tokio::test] - async fn reads_follow_tombstone_redirect() { - let (storage, _hv, _lt) = make_tiered_storage(); - let payload = vec![0xCDu8; 2 * 1024 * 1024]; // 2 MiB + async fn overwrite_large_with_small_after_delete() { + let (storage, hv, lt) = make_tiered_storage(); - let metadata_in = Metadata { - content_type: "image/png".into(), - ..Default::default() - }; let id = storage .insert_object( make_context(), - Some("redirect-read".into()), - &metadata_in, - make_stream(&payload), + Some("cycle-key".into()), + &Default::default(), + make_stream(&vec![0xABu8; 2 * 1024 * 1024]), ) .await .unwrap(); - // get_object should transparently follow the tombstone - let (metadata, stream) = storage.get_object(&id).await.unwrap().unwrap(); - let body: BytesMut = stream.try_collect().await.unwrap(); - assert_eq!(body.len(), payload.len()); - assert!(!metadata.is_tombstone()); + storage.delete_object(&id).await.unwrap(); + assert!(!hv.contains(&id)); + assert!(!lt.contains(&id)); - // get_metadata should also follow the tombstone - let metadata = storage.get_metadata(&id).await.unwrap().unwrap(); - assert!(!metadata.is_tombstone()); - assert_eq!(metadata.content_type, "image/png"); + let small_payload = vec![0xCDu8; 100]; + storage + .insert_object( + make_context(), + Some("cycle-key".into()), + &Default::default(), + make_stream(&small_payload), + ) + .await + .unwrap(); + + let (hv_meta, hv_bytes) = hv.get_stored(&id).unwrap(); + assert!( + !hv_meta.is_tombstone(), + "small object must be in HV, not a tombstone" + ); + assert_eq!(hv_bytes.len(), small_payload.len()); + assert!(!lt.contains(&id)); } - // --- Tombstone inconsistency tests --- + // --- Multi-chunk streaming --- + + #[tokio::test] + async fn multi_chunk_large_object_chains_buffered_and_remaining() { + let (storage, _hv, lt) = make_tiered_storage(); + + // Deliver a 2 MiB payload across multiple chunks that individually + // fit under the threshold but collectively exceed it. 
+ let chunk_size = 512 * 1024; // 512 KiB per chunk + let chunk_count = 4; // 4 × 512 KiB = 2 MiB total + let chunks: Vec> = (0..chunk_count) + .map(|i| Ok(bytes::Bytes::from(vec![i as u8; chunk_size]))) + .collect(); + let stream = futures_util::stream::iter(chunks).boxed(); + + let id = storage + .insert_object( + make_context(), + Some("multi-chunk".into()), + &Default::default(), + stream, + ) + .await + .unwrap(); + + let (lt_meta, lt_bytes) = lt.get_stored(&id).unwrap(); + assert!(!lt_meta.is_tombstone()); + assert_eq!(lt_bytes.len(), chunk_size * chunk_count); - /// A backend where put_object always fails, but reads/deletes work normally. + for i in 0..chunk_count { + let offset = i * chunk_size; + assert!( + lt_bytes[offset..offset + chunk_size] + .iter() + .all(|&b| b == i as u8), + "data mismatch in chunk {i}" + ); + } + } + + // --- Tombstone consistency --- + // + // These tests verify the invariant: every object in long-term storage must be + // reachable via a redirect tombstone in the high-volume backend, and every + // tombstone must either point to existing LT data or be safely recoverable. 
+ // + // Operation Scenario Outcome Test + // --------- ----------------------------------------- ------------------------- ---- + // read tombstone present, LT data present consistent reads_follow_tombstone_redirect + // read tombstone present, LT data absent headless tombstone orphan_hv_returns_none + // insert HV put_non_tombstone fails (small) consistent insert_small_hv_put_fails + // insert HV tombstone write fails (large) consistent insert_large_hv_tombstone_write_fails + // insert LT data write fails after HV tombstone headless tombstone insert_large_lt_put_fails_leaves_orphan_hv + // insert pod kill after HV tombstone, before LT headless tombstone pod_kill_during_insert_large_after_hv_tombstone + // delete clean delete consistent delete_cleans_up_both_backends + // delete LT delete fails consistent tombstone_preserved_when_long_term_delete_fails + // insert×2 concurrent large inserts (HV-first) consistent race_concurrent_insert_insert_no_orphan + // insert×2 small insert during large LT write consistent race_small_insert_during_large_lt_write + // insert+delete concurrent insert and delete race ORPHAN VIOLATION race_concurrent_insert_delete_causes_orphan_lt + + // Mock backends used by the consistency tests. + + /// A backend where `put_object` always fails; reads and deletes delegate normally. #[derive(Debug)] struct FailingPutBackend(InMemoryBackend); @@ -506,7 +630,7 @@ mod tests { ) -> Result<()> { Err(Error::Io(std::io::Error::new( std::io::ErrorKind::ConnectionRefused, - "simulated tombstone write failure", + "simulated put failure", ))) } @@ -519,191 +643,478 @@ mod tests { } } - /// If the tombstone write to the high-volume backend fails after the long-term - /// write succeeds, the long-term object must be cleaned up so we never leave - /// an unreachable orphan in long-term storage. + /// A backend where `delete_object` always fails; reads and puts delegate normally. 
+ #[derive(Debug)] + struct FailingDeleteBackend(InMemoryBackend); + + #[async_trait::async_trait] + impl crate::backend::common::Backend for FailingDeleteBackend { + fn name(&self) -> &'static str { + "failing-delete" + } + + async fn put_object( + &self, + id: &ObjectId, + metadata: &Metadata, + stream: PayloadStream, + ) -> Result<()> { + self.0.put_object(id, metadata, stream).await + } + + async fn get_object(&self, id: &ObjectId) -> Result<Option<GetResponse>> { + self.0.get_object(id).await + } + + async fn delete_object(&self, _id: &ObjectId) -> Result<()> { + Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::ConnectionRefused, + "simulated delete failure", + ))) + } + } + + /// A backend that notifies when `put_object` starts and waits to be resumed, + /// allowing tests to observe and control intermediate write state. + #[derive(Debug)] + struct SyncBackend { + inner: InMemoryBackend, + put_started: std::sync::Arc<tokio::sync::Notify>, + put_resume: std::sync::Arc<tokio::sync::Notify>, + } + + #[async_trait::async_trait] + impl crate::backend::common::Backend for SyncBackend { + fn name(&self) -> &'static str { + "sync" + } + + async fn put_object( + &self, + id: &ObjectId, + metadata: &Metadata, + stream: PayloadStream, + ) -> Result<()> { + self.put_started.notify_one(); + self.put_resume.notified().await; + self.inner.put_object(id, metadata, stream).await + } + + async fn get_object(&self, id: &ObjectId) -> Result<Option<GetResponse>> { + self.inner.get_object(id).await + } + + async fn delete_object(&self, id: &ObjectId) -> Result<()> { + self.inner.delete_object(id).await + } + } + + // Reads + + #[tokio::test] + async fn reads_follow_tombstone_redirect() { + let (storage, _hv, _lt) = make_tiered_storage(); + let metadata_in = Metadata { + content_type: "image/png".into(), + ..Default::default() + }; + let payload = vec![0xCDu8; 2 * 1024 * 1024]; + let id = storage + .insert_object( + make_context(), + Some("redirect-read".into()), + &metadata_in, + make_stream(&payload), + ) + .await + .unwrap(); + + let (metadata, 
stream) = storage.get_object(&id).await.unwrap().unwrap(); + let body: BytesMut = stream.try_collect().await.unwrap(); + assert_eq!(body.len(), payload.len()); + assert!(!metadata.is_tombstone()); + + let metadata = storage.get_metadata(&id).await.unwrap().unwrap(); + assert!(!metadata.is_tombstone()); + assert_eq!(metadata.content_type, "image/png"); + } + + /// An OrphanHV (tombstone in HV, no corresponding data in LT) is a + /// recoverable state: reads return None rather than an error. #[tokio::test] - async fn no_orphan_when_tombstone_write_fails() { + async fn orphan_hv_returns_none() { + let (storage, _hv, lt) = make_tiered_storage(); + let id = storage + .insert_object( + make_context(), + Some("orphan-tombstone".into()), + &Default::default(), + make_stream(&vec![0xCDu8; 2 * 1024 * 1024]), + ) + .await + .unwrap(); + + lt.remove(&id); // simulate missing LT data + + assert!(storage.get_object(&id).await.unwrap().is_none()); + assert!(storage.get_metadata(&id).await.unwrap().is_none()); + } + + // Insert failures — clean + + /// Small-object write via `put_non_tombstone`: if HV fails, the insert fails + /// cleanly with nothing written to either backend. 
+ #[tokio::test] + async fn insert_small_hv_put_fails() { let lt = InMemoryBackend::new("lt"); - let hv: BoxedBackend = Box::new(FailingPutBackend(InMemoryBackend::new("hv"))); let storage = TieredStorage { - high_volume_backend: hv, + high_volume_backend: Box::new(FailingPutBackend(InMemoryBackend::new("hv"))), long_term_backend: Box::new(lt.clone()), }; - let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB -> long-term path let result = storage .insert_object( make_context(), - Some("orphan-test".into()), + Some("small-fail".into()), &Default::default(), - make_stream(&payload), + make_stream(b"tiny"), ) .await; assert!(result.is_err()); - assert!(lt.is_empty(), "long-term object not cleaned up"); + assert!(lt.is_empty()); } - /// If a tombstone exists in high-volume but the corresponding object is - /// missing from long-term storage (e.g. due to a race condition or partial - /// cleanup), reads should gracefully return None rather than error. + /// Large-object HV-first: if the tombstone write fails, the insert fails cleanly + /// with nothing written to either backend. 
#[tokio::test] - async fn orphan_tombstone_returns_none() { - let (storage, _hv, lt) = make_tiered_storage(); - let payload = vec![0xCDu8; 2 * 1024 * 1024]; // 2 MiB + async fn insert_large_hv_tombstone_write_fails() { + let lt = InMemoryBackend::new("lt"); + let storage = TieredStorage { + high_volume_backend: Box::new(FailingPutBackend(InMemoryBackend::new("hv"))), + long_term_backend: Box::new(lt.clone()), + }; - let id = storage + let result = storage .insert_object( make_context(), - Some("orphan-tombstone".into()), + Some("hv-fail-test".into()), &Default::default(), - make_stream(&payload), + make_stream(&vec![0xABu8; 2 * 1024 * 1024]), ) - .await - .unwrap(); + .await; + + assert!(result.is_err()); + assert!(lt.is_empty()); + } - // Remove the long-term object, leaving an orphan tombstone in hv - lt.remove(&id); + // Insert failures — note: OrphanHV possible - assert!( - storage.get_object(&id).await.unwrap().is_none(), - "orphan tombstone should resolve to None on get_object" - ); - assert!( - storage.get_metadata(&id).await.unwrap().is_none(), - "orphan tombstone should resolve to None on get_metadata" - ); + /// When the LT data write fails after the HV tombstone is written, an + /// OrphanHV is left: tombstone in HV, no data in LT. Reads return None, + /// deletes remove the tombstone, and re-inserts overwrite it. + #[tokio::test] + async fn insert_large_lt_put_fails_leaves_orphan_hv() { + let hv = InMemoryBackend::new("hv"); + let lt_inner = InMemoryBackend::new("lt-inner"); + let storage = TieredStorage { + high_volume_backend: Box::new(hv.clone()), + long_term_backend: Box::new(FailingPutBackend(lt_inner.clone())), + }; + + let result = storage + .insert_object( + make_context(), + Some("lt-fail-test".into()), + &Default::default(), + make_stream(&vec![0xABu8; 2 * 1024 * 1024]), + ) + .await; + + assert!(result.is_err()); + // OrphanHV: tombstone in HV, no data in LT. 
+ let id = ObjectId::new(make_context(), "lt-fail-test".into()); + let (hv_meta, _) = hv.get_stored(&id).expect("tombstone must be in HV"); + assert!(hv_meta.is_tombstone()); + assert!(lt_inner.is_empty()); } - // --- Delete tests --- + /// If the process is killed after the HV tombstone is written but before the LT + /// write completes, an OrphanHV is left: tombstone in HV, no data in LT. + /// Reads return None; deletes and re-inserts recover the key. + #[tokio::test] + async fn pod_kill_during_insert_large_after_hv_tombstone() { + let hv = InMemoryBackend::new("hv"); + let lt_inner = InMemoryBackend::new("lt"); + let put_started = std::sync::Arc::new(tokio::sync::Notify::new()); + let put_resume = std::sync::Arc::new(tokio::sync::Notify::new()); + + let storage = std::sync::Arc::new(TieredStorage { + high_volume_backend: Box::new(hv.clone()), + long_term_backend: Box::new(SyncBackend { + inner: lt_inner.clone(), + put_started: std::sync::Arc::clone(&put_started), + put_resume: std::sync::Arc::clone(&put_resume), + }), + }); + + let id = ObjectId::new(make_context(), "pod-kill-key".into()); + let storage_task = std::sync::Arc::clone(&storage); + let insert_task = tokio::spawn(async move { + storage_task + .insert_object( + make_context(), + Some("pod-kill-key".into()), + &Default::default(), + make_stream(&vec![0xABu8; 2 * 1024 * 1024]), + ) + .await + }); + + // Wait until the HV tombstone is written and the LT put is blocked. + put_started.notified().await; + let (hv_meta, _) = hv.get_stored(&id).expect("tombstone must exist"); + assert!(hv_meta.is_tombstone()); + + // Simulate pod kill. + insert_task.abort(); + put_resume.notify_one(); // unblock backend to avoid deadlock + let _ = insert_task.await; + + // OrphanHV: tombstone in HV, no data in LT. 
+ let (hv_meta, _) = hv.get_stored(&id).expect("tombstone must survive pod kill"); + assert!(hv_meta.is_tombstone()); + assert!(lt_inner.is_empty()); + assert!(storage.get_object(&id).await.unwrap().is_none()); + } + + // Delete #[tokio::test] async fn delete_cleans_up_both_backends() { let (storage, hv, lt) = make_tiered_storage(); - let payload = vec![0u8; 2 * 1024 * 1024]; // 2 MiB - let id = storage .insert_object( make_context(), Some("delete-both".into()), &Default::default(), - make_stream(&payload), + make_stream(&vec![0u8; 2 * 1024 * 1024]), ) .await .unwrap(); storage.delete_object(&id).await.unwrap(); - assert!(!hv.contains(&id), "tombstone not cleaned up"); - assert!(!lt.contains(&id), "object not cleaned up"); + assert!(!hv.contains(&id)); + assert!(!lt.contains(&id)); } - /// A backend wrapper that delegates everything except `delete_object`, which always fails. - #[derive(Debug)] - struct FailingDeleteBackend(InMemoryBackend); - - #[async_trait::async_trait] - impl crate::backend::common::Backend for FailingDeleteBackend { - fn name(&self) -> &'static str { - "failing-delete" - } - - async fn put_object( - &self, - id: &ObjectId, - metadata: &Metadata, - stream: PayloadStream, - ) -> Result<()> { - self.0.put_object(id, metadata, stream).await - } - - async fn get_object(&self, id: &ObjectId) -> Result<Option<GetResponse>> { - self.0.get_object(id).await - } - - async fn delete_object(&self, _id: &ObjectId) -> Result<()> { - Err(Error::Io(std::io::Error::new( - std::io::ErrorKind::ConnectionRefused, - "simulated long-term delete failure", - ))) - } - } - - /// When the long-term delete fails, the tombstone must be preserved so the - /// object remains reachable and no data is orphaned. + /// When the LT delete fails, the tombstone is preserved so the object remains + /// reachable — no data is orphaned. 
#[tokio::test] async fn tombstone_preserved_when_long_term_delete_fails() { let hv = InMemoryBackend::new("hv"); - let lt: BoxedBackend = Box::new(FailingDeleteBackend(InMemoryBackend::new("lt"))); let storage = TieredStorage { high_volume_backend: Box::new(hv.clone()), - long_term_backend: lt, + long_term_backend: Box::new(FailingDeleteBackend(InMemoryBackend::new("lt"))), }; - let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB -> goes to long-term let id = storage .insert_object( make_context(), Some("fail-delete".into()), &Default::default(), - make_stream(&payload), + make_stream(&vec![0xABu8; 2 * 1024 * 1024]), ) .await .unwrap(); - let result = storage.delete_object(&id).await; - assert!(result.is_err()); + assert!(storage.delete_object(&id).await.is_err()); - let (hv_meta, _) = hv.get_stored(&id).expect("tombstone removed"); + let (hv_meta, _) = hv.get_stored(&id).expect("tombstone must be preserved"); assert!(hv_meta.is_tombstone()); - // The object should still be reachable through the service let (metadata, stream) = storage.get_object(&id).await.unwrap().unwrap(); let body: BytesMut = stream.try_collect().await.unwrap(); - assert_eq!(body.len(), payload.len()); + assert_eq!(body.len(), 2 * 1024 * 1024); assert!(!metadata.is_tombstone()); } - // --- Multi-chunk streaming tests --- + // Concurrent insert + insert — clean + /// Two concurrent large inserts at the same key: both write (or overwrite) the + /// tombstone in HV, both write to LT — last write wins, tombstone stays, no + /// orphan. #[tokio::test] - async fn multi_chunk_large_object_chains_buffered_and_remaining() { - let (storage, _hv, lt) = make_tiered_storage(); + async fn race_concurrent_insert_insert_no_orphan() { + let (storage, hv, lt) = make_tiered_storage(); + let storage = std::sync::Arc::new(storage); + let payload = vec![0xABu8; 2 * 1024 * 1024]; - // Deliver a 2 MiB payload across multiple chunks that individually - // fit under the threshold but collectively exceed it. 
- let chunk_size = 512 * 1024; // 512 KiB per chunk - let chunk_count = 4; // 4 × 512 KiB = 2 MiB total - let chunks: Vec<Result<bytes::Bytes>> = (0..chunk_count) - .map(|i| Ok(bytes::Bytes::from(vec![i as u8; chunk_size]))) - .collect(); - let stream = futures_util::stream::iter(chunks).boxed(); + let s1 = std::sync::Arc::clone(&storage); + let p1 = payload.clone(); + let t1 = tokio::spawn(async move { + s1.insert_object( + make_context(), + Some("race-key".into()), + &Default::default(), + make_stream(&p1), + ) + .await + }); - let id = storage - .insert_object( + let s2 = std::sync::Arc::clone(&storage); + let t2 = tokio::spawn(async move { + s2.insert_object( make_context(), - Some("multi-chunk".into()), + Some("race-key".into()), &Default::default(), - stream, + make_stream(&payload), ) .await - .unwrap(); + }); - // Should have been routed to long-term (over 1 MiB). - let (lt_meta, lt_bytes) = lt.get_stored(&id).unwrap(); - assert!(!lt_meta.is_tombstone()); - assert_eq!(lt_bytes.len(), chunk_size * chunk_count); + t1.await.unwrap().unwrap(); + t2.await.unwrap().unwrap(); - // Verify data integrity — each chunk's fill byte should appear in order. - for i in 0..chunk_count { - let offset = i * chunk_size; - assert!( - lt_bytes[offset..offset + chunk_size] - .iter() - .all(|&b| b == i as u8), - "data mismatch in chunk {i}" - ); - } + let id = ObjectId::new(make_context(), "race-key".into()); + let (hv_meta, _) = hv.get_stored(&id).expect("HV must have tombstone"); + assert!(hv_meta.is_tombstone()); + assert!(lt.contains(&id)); + assert!(storage.get_object(&id).await.unwrap().is_some()); + } + + /// A small insert that arrives while a large insert's LT write is in progress + /// sees the HV tombstone via `put_non_tombstone` and routes its payload to LT. + /// Both writes complete consistently. + /// + /// Interleaving: + /// 1. Large insert writes tombstone to HV, blocks in LT. + /// 2. Small insert: `put_non_tombstone` sees tombstone → routes payload to LT. + /// 3. 
Both LT writes complete → tombstone in HV, data in LT. + #[tokio::test] + async fn race_small_insert_during_large_lt_write() { + let hv = InMemoryBackend::new("hv"); + let lt_inner = InMemoryBackend::new("lt"); + let put_started = std::sync::Arc::new(tokio::sync::Notify::new()); + let put_resume = std::sync::Arc::new(tokio::sync::Notify::new()); + + let storage = std::sync::Arc::new(TieredStorage { + high_volume_backend: Box::new(hv.clone()), + long_term_backend: Box::new(SyncBackend { + inner: lt_inner.clone(), + put_started: std::sync::Arc::clone(&put_started), + put_resume: std::sync::Arc::clone(&put_resume), + }), + }); + + let id = ObjectId::new(make_context(), "concurrent-key".into()); + + // Start large insert: HV tombstone is written immediately, LT write blocks. + let storage_large = std::sync::Arc::clone(&storage); + let large_task = tokio::spawn(async move { + storage_large + .insert_object( + make_context(), + Some("concurrent-key".into()), + &Default::default(), + make_stream(&vec![0xABu8; 2 * 1024 * 1024]), + ) + .await + }); + + // Wait until large insert is blocked in LT (tombstone already in HV). + put_started.notified().await; + + // Small insert arrives. put_non_tombstone sees the tombstone → routes to LT. + // LT is still SyncBackend, so this write blocks too. + let storage_small = std::sync::Arc::clone(&storage); + let small_task = tokio::spawn(async move { + storage_small + .insert_object( + make_context(), + Some("concurrent-key".into()), + &Default::default(), + make_stream(b"small payload"), + ) + .await + }); + + // Wait until small insert is also blocked in LT. + put_started.notified().await; + + // Both tasks are waiting on put_resume. Yield to ensure both have reached + // the await point, then wake them together. + tokio::task::yield_now().await; + put_resume.notify_waiters(); + + large_task.await.unwrap().unwrap(); + small_task.await.unwrap().unwrap(); + + // Both writes completed consistently: tombstone in HV, data in LT. 
+ let (hv_meta, _) = hv.get_stored(&id).expect("tombstone must be in HV"); + assert!(hv_meta.is_tombstone()); + assert!(lt_inner.contains(&id)); + assert!(storage.get_object(&id).await.unwrap().is_some()); + } + + // Concurrent insert + delete — VIOLATION: OrphanLT + + /// Known gap: a concurrent insert and delete can race to leave an OrphanLT. + /// + /// Interleaving (HV-first): + /// 1. Insert writes tombstone to HV. + /// 2. Delete sees tombstone → deletes LT (empty) → deletes HV tombstone. + /// 3. Insert writes data to LT (tombstone already gone). + /// + /// Result: LT has data with no tombstone in HV — unreachable via the service. + /// Requires per-key serialization to fix; out of scope. + #[tokio::test] + async fn race_concurrent_insert_delete_causes_orphan_lt() { + let hv = InMemoryBackend::new("hv"); + let lt_inner = InMemoryBackend::new("lt"); + let put_started = std::sync::Arc::new(tokio::sync::Notify::new()); + let put_resume = std::sync::Arc::new(tokio::sync::Notify::new()); + + let storage = std::sync::Arc::new(TieredStorage { + high_volume_backend: Box::new(hv.clone()), + long_term_backend: Box::new(SyncBackend { + inner: lt_inner.clone(), + put_started: std::sync::Arc::clone(&put_started), + put_resume: std::sync::Arc::clone(&put_resume), + }), + }); + + let id = ObjectId::new(make_context(), "race-del-key".into()); + + let storage_insert = std::sync::Arc::clone(&storage); + let insert_task = tokio::spawn(async move { + storage_insert + .insert_object( + make_context(), + Some("race-del-key".into()), + &Default::default(), + make_stream(&vec![0xABu8; 2 * 1024 * 1024]), + ) + .await + }); + + // Wait until the HV tombstone is written and the LT put is blocked. + put_started.notified().await; + let (hv_meta, _) = hv.get_stored(&id).expect("tombstone must exist"); + assert!(hv_meta.is_tombstone()); + + // Delete runs concurrently: sees tombstone → deletes LT (empty) → removes tombstone. 
+ storage.delete_object(&id).await.unwrap(); + assert!(!hv.contains(&id)); + + // Resume insert: LT write completes after the tombstone is gone. + put_resume.notify_one(); + insert_task.await.unwrap().unwrap(); + + // OrphanLT: data in LT, no tombstone in HV → unreachable. + assert!(lt_inner.contains(&id)); + assert!(!hv.contains(&id)); + assert!(storage.get_object(&id).await.unwrap().is_none()); } }