Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 54 additions & 32 deletions src/hottier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ impl HotTierManager {
.await?;

let mut stream_hot_tier: StreamHotTier = serde_json::from_slice(&bytes)?;
stream_hot_tier.oldest_date_time_entry = self.get_oldest_date_time_entry(stream).await?;
stream_hot_tier.oldest_date_time_entry =
self.get_oldest_date_time_entry(stream, tenant_id).await?;

Ok(stream_hot_tier)
}
Expand Down Expand Up @@ -450,15 +451,15 @@ impl HotTierManager {
self.put_hot_tier(stream, &mut stream_hot_tier, tenant_id)
.await?;
file_processed = true;
let path = self.get_stream_path_for_date(stream, &date);
let path = self.get_stream_path_for_date(stream, &date, tenant_id);
let mut hot_tier_manifest = HotTierManager::get_hot_tier_manifest_from_path(path).await?;
hot_tier_manifest.files.push(parquet_file.clone());
hot_tier_manifest
.files
.sort_by_key(|file| file.file_path.clone());
// write the manifest file to the hot tier directory
let manifest_path = self
.get_stream_path_for_date(stream, &date)
.get_stream_path_for_date(stream, &date, tenant_id)
.join("hottier.manifest.json");
fs::create_dir_all(manifest_path.parent().unwrap()).await?;
fs::write(manifest_path, serde_json::to_vec(&hot_tier_manifest)?).await?;
Expand All @@ -467,9 +468,18 @@ impl HotTierManager {
}

///fetch the list of dates available in the hot tier directory for the stream and sort them
pub async fn fetch_hot_tier_dates(&self, stream: &str) -> Result<Vec<NaiveDate>, HotTierError> {
pub async fn fetch_hot_tier_dates(
&self,
stream: &str,
tenant_id: &Option<String>,
) -> Result<Vec<NaiveDate>, HotTierError> {
let mut date_list = Vec::new();
let path = self.hot_tier_path.join(stream);
let path = if let Some(tenant) = tenant_id.as_ref() {
self.hot_tier_path.join(tenant).join(stream)
} else {
self.hot_tier_path.join(stream)
};
// let path = self.hot_tier_path.join(stream);
if !path.exists() {
return Ok(date_list);
}
Expand Down Expand Up @@ -524,37 +534,47 @@ impl HotTierManager {
}

/// get hot tier path for the stream and date
pub fn get_stream_path_for_date(&self, stream: &str, date: &NaiveDate) -> PathBuf {
self.hot_tier_path.join(stream).join(format!("date={date}"))
pub fn get_stream_path_for_date(
&self,
stream: &str,
date: &NaiveDate,
tenant_id: &Option<String>,
) -> PathBuf {
if let Some(tenant) = tenant_id.as_ref() {
self.hot_tier_path
.join(tenant)
.join(stream)
.join(format!("date={date}"))
} else {
self.hot_tier_path.join(stream).join(format!("date={date}"))
}
}

/// Returns the list of manifest files present in hot tier directory for the stream
pub async fn get_hot_tier_manifest_files(
&self,
stream: &str,
manifest_files: &mut Vec<File>,
) -> Result<Vec<File>, HotTierError> {
// Fetch the list of hot tier parquet files for the given stream.
let mut hot_tier_files = self.get_hot_tier_parquet_files(stream).await?;

// Retain only the files in `hot_tier_files` that also exist in `manifest_files`.
hot_tier_files.retain(|file| {
manifest_files
.iter()
.any(|manifest_file| manifest_file.file_path.eq(&file.file_path))
});
// Check which query-relevant files exist locally in the hot tier directory.
let mut hot_tier_files = Vec::new();
let mut remaining = Vec::with_capacity(manifest_files.len());

for file in manifest_files.drain(..) {
let hot_tier_path = self.hot_tier_path.join(&file.file_path);
if let Ok(meta) = fs::metadata(&hot_tier_path).await
&& meta.len() == file.file_size
{
hot_tier_files.push(file);
continue;
}

// Sort `hot_tier_files` in descending order by file path.
hot_tier_files.sort_unstable_by(|a, b| b.file_path.cmp(&a.file_path));
remaining.push(file);
}

// Update `manifest_files` to exclude files that are present in the filtered `hot_tier_files`.
manifest_files.retain(|manifest_file| {
hot_tier_files
.iter()
.all(|file| !file.file_path.eq(&manifest_file.file_path))
});
*manifest_files = remaining;

// Sort `manifest_files` in descending order by file path.
// Sort both lists in descending order by file path.
hot_tier_files.sort_unstable_by(|a, b| b.file_path.cmp(&a.file_path));
manifest_files.sort_unstable_by(|a, b| b.file_path.cmp(&a.file_path));

Ok(hot_tier_files)
Expand All @@ -564,16 +584,17 @@ impl HotTierManager {
pub async fn get_hot_tier_parquet_files(
&self,
stream: &str,
tenant_id: &Option<String>,
) -> Result<Vec<File>, HotTierError> {
// Fetch list of dates for the given stream
let date_list = self.fetch_hot_tier_dates(stream).await?;
let date_list = self.fetch_hot_tier_dates(stream, tenant_id).await?;

// Create an unordered iter of futures to async collect files
let mut tasks = FuturesUnordered::new();

// For each date, fetch the manifest and extract parquet files
for date in date_list {
let path = self.get_stream_path_for_date(stream, &date);
let path = self.get_stream_path_for_date(stream, &date, tenant_id);
tasks.push(async move {
HotTierManager::get_hot_tier_manifest_from_path(path)
.await
Expand Down Expand Up @@ -621,9 +642,9 @@ impl HotTierManager {
tenant_id: &Option<String>,
) -> Result<bool, HotTierError> {
let mut delete_successful = false;
let dates = self.fetch_hot_tier_dates(stream).await?;
let dates = self.fetch_hot_tier_dates(stream, tenant_id).await?;
'loop_dates: for date in dates {
let path = self.get_stream_path_for_date(stream, &date);
let path = self.get_stream_path_for_date(stream, &date, tenant_id);
if !path.exists() {
continue;
}
Expand Down Expand Up @@ -712,14 +733,15 @@ impl HotTierManager {
pub async fn get_oldest_date_time_entry(
&self,
stream: &str,
tenant_id: &Option<String>,
) -> Result<Option<String>, HotTierError> {
let date_list = self.fetch_hot_tier_dates(stream).await?;
let date_list = self.fetch_hot_tier_dates(stream, tenant_id).await?;
if date_list.is_empty() {
return Ok(None);
}

for date in date_list {
let path = self.get_stream_path_for_date(stream, &date);
let path = self.get_stream_path_for_date(stream, &date, tenant_id);
let hours_dir = ReadDirStream::new(fs::read_dir(&path).await?);
let mut hours: Vec<DirEntry> = hours_dir.try_collect().await?;
hours.retain(|entry| {
Expand Down
10 changes: 3 additions & 7 deletions src/query/stream_schema_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,11 @@ impl StandardTableProvider {
time_partition: Option<String>,
) -> Result<(), DataFusionError> {
let hot_tier_files = hot_tier_manager
.get_hot_tier_manifest_files(&self.stream, manifest_files)
.get_hot_tier_manifest_files(manifest_files)
.await
.map_err(|err| DataFusionError::External(Box::new(err)))?;

let hot_tier_files = hot_tier_files
let hot_tier_files: Vec<File> = hot_tier_files
.into_iter()
.map(|mut file| {
let path = PARSEABLE
Expand All @@ -221,11 +221,7 @@ impl StandardTableProvider {
.collect();

let (partitioned_files, statistics) = self.partitioned_files(hot_tier_files);
// let object_store_url = if let Some(tenant_id) = self.tenant_id.as_ref() {
// &format!("file:///{tenant_id}/")
// } else {
// "file:///"
// };

let object_store_url = "file:///";
self.create_parquet_physical_plan(
execution_plans,
Expand Down
Loading