Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions objectstore-service/docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ backend; objects exceeding it go to the long-term backend.

See [`StorageConfig`] for available backend implementations.

## Redirect Tombstones

For large objects, the high-volume backend stores a **redirect tombstone** — a
zero-payload marker whose metadata has `is_redirect_tombstone: true`. This
allows reads to check only the high-volume backend and follow the tombstone to
long-term storage, without scanning both backends on every read.

See [`StorageService`] for write ordering, consistency guarantees, and known
gaps.

# Metadata and Payload

Every object consists of structured **metadata** and a binary **payload**.
Expand Down
135 changes: 102 additions & 33 deletions objectstore-service/src/backend/bigtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tonic::Code;

use crate::PayloadStream;
use crate::backend::common::{
Backend, DeleteOutcome, DeleteResponse, GetResponse, MetadataResponse, PutResponse,
Backend, ConditionalOutcome, DeleteResponse, GetResponse, MetadataResponse, PutResponse,
};
use crate::error::{Error, Result};
use crate::gcp_auth::PrefetchingTokenProvider;
Expand Down Expand Up @@ -100,6 +100,28 @@ fn column_filter(column: &[u8]) -> v2::RowFilter {
}
}

/// Builds the predicate filter matching rows that hold a redirect tombstone.
///
/// The check targets the metadata column's `is_redirect_tombstone` marker.
/// Payload-column presence is not usable as a signal: `put_row` writes a `p`
/// cell for every row, including tombstones (with empty bytes).
fn tombstone_predicate() -> v2::RowFilter {
    // RE2 full-match anchored at the start of the JSON document. Metadata's
    // field ordering guarantees `is_redirect_tombstone` is serialized first.
    let marker_filter = v2::RowFilter {
        filter: Some(v2::row_filter::Filter::ValueRegexFilter(
            b"^\\{\"is_redirect_tombstone\":true[,}].*".to_vec(),
        )),
    };
    let chain = v2::row_filter::Chain {
        filters: vec![column_filter(COLUMN_METADATA), marker_filter],
    };
    v2::RowFilter {
        filter: Some(v2::row_filter::Filter::Chain(chain)),
    }
}

/// Parsed data from a BigTable row's cells.
struct RowData {
metadata: Metadata,
Expand Down Expand Up @@ -292,22 +314,22 @@ impl BigTableBackend {
Ok(response.into_inner())
}

async fn put_row(
&self,
path: Vec<u8>,
metadata: &Metadata,
payload: Vec<u8>,
action: &str,
) -> Result<v2::MutateRowResponse> {
/// Builds the three mutations that write an object: delete the existing row,
/// set the payload cell, set the metadata cell.
///
/// Used by both `put_row` (unconditional write) and `put_non_tombstone`
/// (conditional write via `CheckAndMutateRowRequest`).
///
/// NB: We explicitly delete the row before writing to clear stale metadata
/// on overwrite.
fn write_mutations(metadata: &Metadata, payload: Vec<u8>) -> Result<[mutation::Mutation; 3]> {
let now = SystemTime::now();
let (family, timestamp_micros) = match metadata.expiration_policy {
ExpirationPolicy::Manual => (FAMILY_MANUAL, -1),
ExpirationPolicy::TimeToLive(ttl) => (FAMILY_GC, ttl_to_micros(ttl, now)?),
ExpirationPolicy::TimeToIdle(tti) => (FAMILY_GC, ttl_to_micros(tti, now)?),
};

let mutations = [
// NB: We explicitly delete the row to clear metadata on overwrite.
Ok([
mutation::Mutation::DeleteFromRow(mutation::DeleteFromRow {}),
mutation::Mutation::SetCell(mutation::SetCell {
family_name: family.to_owned(),
Expand All @@ -324,8 +346,18 @@ impl BigTableBackend {
cause,
})?,
}),
];
self.mutate(path, mutations, action).await
])
}

/// Unconditionally writes an object row (delete-then-set mutations built by
/// [`Self::write_mutations`]) via a plain `MutateRow` call.
async fn put_row(
    &self,
    path: Vec<u8>,
    metadata: &Metadata,
    payload: Vec<u8>,
    action: &str,
) -> Result<v2::MutateRowResponse> {
    let mutations = Self::write_mutations(metadata, payload)?;
    self.mutate(path, mutations, action).await
}
}

Expand Down Expand Up @@ -355,6 +387,56 @@ impl Backend for BigTableBackend {
Ok(())
}

#[tracing::instrument(level = "trace", fields(?id), skip_all)]
async fn put_non_tombstone(
    &self,
    id: &ObjectId,
    metadata: &Metadata,
    mut stream: PayloadStream,
) -> Result<ConditionalOutcome> {
    tracing::debug!("Conditional write to Bigtable backend");

    // Fully buffer the payload ahead of the conditional RPC so the operation
    // appears atomic: when the predicate is false we write everything at
    // once; when it matches we simply drop the buffer and report Tombstone.
    let mut buffered = ChunkedBytes::new(0);
    while let Some(chunk) = stream.try_next().await? {
        buffered.push(chunk);
    }

    let row_key = id.as_storage_path().to_string().into_bytes();
    let mutations = Self::write_mutations(metadata, buffered.into_bytes().into())?
        .into_iter()
        .map(|m| v2::Mutation { mutation: Some(m) })
        .collect::<Vec<_>>();

    let request = v2::CheckAndMutateRowRequest {
        table_name: self.table_path.clone(),
        row_key,
        predicate_filter: Some(tombstone_predicate()),
        true_mutations: vec![],      // Tombstone matched → reject (no write).
        false_mutations: mutations,  // Not a tombstone → write.
        ..Default::default()
    };

    let response = self
        .with_retry("put_non_tombstone", || async {
            self.bigtable
                .client()
                .check_and_mutate_row(request.clone())
                .await
        })
        .await?;

    // `predicate_matched` reflects whether the tombstone filter hit the row.
    if response.predicate_matched {
        Ok(ConditionalOutcome::Tombstone)
    } else {
        Ok(ConditionalOutcome::Executed)
    }
}

#[tracing::instrument(level = "trace", fields(?id), skip_all)]
async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
tracing::debug!("Reading from Bigtable backend");
Expand Down Expand Up @@ -422,7 +504,7 @@ impl Backend for BigTableBackend {
}

#[tracing::instrument(level = "trace", fields(?id), skip_all)]
async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<DeleteOutcome> {
async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<ConditionalOutcome> {
tracing::debug!("Conditional delete from Bigtable backend");

let path = id.as_storage_path().to_string().into_bytes();
Expand All @@ -439,20 +521,7 @@ impl Backend for BigTableBackend {
let request = v2::CheckAndMutateRowRequest {
table_name: self.table_path.clone(),
row_key: path,
predicate_filter: Some(v2::RowFilter {
filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain {
filters: vec![
column_filter(COLUMN_METADATA),
v2::RowFilter {
filter: Some(v2::row_filter::Filter::ValueRegexFilter(
// RE2 full-match anchored to the JSON start. The field ordering of
// Metadata ensures `is_redirect_tombstone` is serialized first.
b"^\\{\"is_redirect_tombstone\":true[,}].*".to_vec(),
)),
},
],
})),
}),
predicate_filter: Some(tombstone_predicate()),
true_mutations: vec![], // Tombstone matched → leave intact (no mutations).
false_mutations: vec![delete_mutation], // Not a tombstone → delete the row.
..Default::default()
Expand All @@ -469,9 +538,9 @@ impl Backend for BigTableBackend {
.predicate_matched;

if is_tombstone {
Ok(DeleteOutcome::Tombstone)
Ok(ConditionalOutcome::Tombstone)
} else {
Ok(DeleteOutcome::Deleted)
Ok(ConditionalOutcome::Executed)
}
}
}
Expand Down Expand Up @@ -873,7 +942,7 @@ mod tests {
.await?;

let result = backend.delete_non_tombstone(&id).await?;
assert_eq!(result, DeleteOutcome::Deleted);
assert_eq!(result, ConditionalOutcome::Executed);

let get_result = backend.get_object(&id).await?;
assert!(get_result.is_none());
Expand All @@ -894,7 +963,7 @@ mod tests {
backend.put_object(&id, &metadata, make_stream(b"")).await?;

let result = backend.delete_non_tombstone(&id).await?;
assert_eq!(result, DeleteOutcome::Tombstone);
assert_eq!(result, ConditionalOutcome::Tombstone);

// Tombstone should still exist — delete_non_tombstone leaves it intact.
let get_result = backend.get_metadata(&id).await?;
Expand All @@ -912,7 +981,7 @@ mod tests {

let id = make_id();
let result = backend.delete_non_tombstone(&id).await?;
assert_eq!(result, DeleteOutcome::Deleted);
assert_eq!(result, ConditionalOutcome::Executed);

Ok(())
}
Expand Down
45 changes: 35 additions & 10 deletions objectstore-service/src/backend/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ pub(super) type MetadataResponse = Option<Metadata>;
/// Backend response for delete operations.
pub(super) type DeleteResponse = ();

/// Response from [`Backend::delete_non_tombstone`].
/// Outcome of a conditional operation that is skipped when a redirect tombstone is present.
///
/// Returned by [`Backend::put_non_tombstone`] and [`Backend::delete_non_tombstone`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeleteOutcome {
/// The entity was a redirect tombstone; it was left intact.
pub enum ConditionalOutcome {
/// The operation was executed (no tombstone was present).
Executed,
/// A redirect tombstone was found; the operation was skipped.
Tombstone,
/// The entity was a regular object (now deleted) or non-existent.
Deleted,
}

/// A type-erased [`Backend`] instance.
Expand All @@ -48,6 +50,29 @@ pub trait Backend: Debug + Send + Sync + 'static {
stream: PayloadStream,
) -> Result<PutResponse>;

/// Writes the object only if the row does NOT already contain a redirect tombstone.
///
/// Returns [`ConditionalOutcome::Executed`] after a successful write, or
/// [`ConditionalOutcome::Tombstone`] (leaving the row intact) when a redirect
/// tombstone is already present.
///
/// The default implementation is a non-atomic read-then-write; backends
/// should override this with an atomic conditional mutation where possible.
async fn put_non_tombstone(
    &self,
    id: &ObjectId,
    metadata: &Metadata,
    stream: PayloadStream,
) -> Result<ConditionalOutcome> {
    // Guard: a pre-existing redirect tombstone short-circuits the write.
    let found_tombstone = self
        .get_metadata(id)
        .await?
        .is_some_and(|m| m.is_tombstone());
    if found_tombstone {
        return Ok(ConditionalOutcome::Tombstone);
    }

    self.put_object(id, metadata, stream).await?;
    Ok(ConditionalOutcome::Executed)
}

/// Retrieves an object at the given path, returning its metadata and a stream of bytes.
async fn get_object(&self, id: &ObjectId) -> Result<GetResponse>;

Expand All @@ -64,16 +89,16 @@ pub trait Backend: Debug + Send + Sync + 'static {

/// Deletes the object only if it is NOT a redirect tombstone.
///
/// Returns [`DeleteOutcome::Tombstone`] (leaving the row intact) when
/// the object is a redirect tombstone, or [`DeleteOutcome::Deleted`]
/// Returns [`ConditionalOutcome::Tombstone`] (leaving the row intact) when
/// the object is a redirect tombstone, or [`ConditionalOutcome::Executed`]
/// (after deleting it) for regular objects and non-existent rows.
async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<DeleteOutcome> {
async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<ConditionalOutcome> {
let metadata = self.get_metadata(id).await?;
if metadata.is_some_and(|m| m.is_tombstone()) {
Ok(DeleteOutcome::Tombstone)
Ok(ConditionalOutcome::Tombstone)
} else {
self.delete_object(id).await?;
Ok(DeleteOutcome::Deleted)
Ok(ConditionalOutcome::Executed)
}
}
}
Expand Down
21 changes: 20 additions & 1 deletion objectstore-service/src/backend/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use bytes::{Bytes, BytesMut};
use futures_util::{StreamExt, TryStreamExt};
use objectstore_types::metadata::Metadata;

use super::common::{DeleteResponse, GetResponse, PutResponse};
use super::common::{ConditionalOutcome, DeleteResponse, GetResponse, PutResponse};
use crate::PayloadStream;
use crate::error::Result;
use crate::id::ObjectId;
Expand Down Expand Up @@ -76,6 +76,25 @@ impl super::common::Backend for InMemoryBackend {
Ok(())
}

async fn put_non_tombstone(
    &self,
    id: &ObjectId,
    metadata: &Metadata,
    stream: PayloadStream,
) -> Result<ConditionalOutcome> {
    // Collect the full payload up front so the store lock is held only for
    // the tombstone check plus the insert, keeping that pair atomic.
    let buffered: BytesMut = stream.try_collect().await?;

    let mut guard = self.store.lock().unwrap();
    let has_tombstone = guard.get(id).is_some_and(|(m, _)| m.is_tombstone());
    if has_tombstone {
        return Ok(ConditionalOutcome::Tombstone);
    }

    let mut stored_metadata = metadata.clone();
    stored_metadata.size = Some(buffered.len());
    guard.insert(id.clone(), (stored_metadata, buffered.freeze()));
    Ok(ConditionalOutcome::Executed)
}

async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> {
let entry = self.store.lock().unwrap().get(id).cloned();
Ok(entry.map(|(metadata, bytes)| {
Expand Down
Loading
Loading