1717 *
1818 */
1919
20- use std:: {
21- collections:: { HashMap , HashSet } ,
22- fs:: { self , File , OpenOptions , remove_file, write} ,
23- num:: NonZeroU32 ,
24- path:: { Path , PathBuf } ,
25- sync:: { Arc , Mutex , RwLock } ,
26- time:: { Instant , SystemTime , UNIX_EPOCH } ,
27- } ;
28-
2920use arrow_array:: RecordBatch ;
21+ use arrow_ipc:: reader:: StreamReader ;
3022use arrow_schema:: { Field , Fields , Schema } ;
3123use chrono:: { NaiveDateTime , Timelike , Utc } ;
3224use derive_more:: derive:: { Deref , DerefMut } ;
@@ -41,6 +33,15 @@ use parquet::{
4133 schema:: types:: ColumnPath ,
4234} ;
4335use relative_path:: RelativePathBuf ;
36+ use std:: io:: BufReader ;
37+ use std:: {
38+ collections:: { HashMap , HashSet } ,
39+ fs:: { self , File , OpenOptions , remove_file, write} ,
40+ num:: NonZeroU32 ,
41+ path:: { Path , PathBuf } ,
42+ sync:: { Arc , Mutex , RwLock } ,
43+ time:: { Instant , SystemTime , UNIX_EPOCH } ,
44+ } ;
4445use tokio:: task:: JoinSet ;
4546use tracing:: { error, info, trace, warn} ;
4647use ulid:: Ulid ;
@@ -61,7 +62,7 @@ use crate::{
6162} ;
6263
6364use super :: {
64- ARROW_FILE_EXTENSION , LogStream ,
65+ ARROW_FILE_EXTENSION , LogStream , PART_FILE_EXTENSION ,
6566 staging:: {
6667 StagingError ,
6768 reader:: { MergedRecordReader , MergedReverseRecordReader } ,
@@ -994,12 +995,112 @@ impl Stream {
994995 None
995996 }
996997
998+ /// Recovers orphaned .part files from a previous interrupted run.
999+ /// These are incomplete arrow files that weren't finalized before the server crashed.
1000+ /// Valid .part files are renamed to .arrows for processing, invalid ones are removed.
1001+ fn recover_orphan_part_files ( & self ) {
1002+ let Ok ( dir) = self . data_path . read_dir ( ) else {
1003+ return ;
1004+ } ;
1005+
1006+ for entry in dir. flatten ( ) {
1007+ let path = entry. path ( ) ;
1008+ if path
1009+ . extension ( )
1010+ . is_some_and ( |ext| ext == PART_FILE_EXTENSION )
1011+ {
1012+ info ! (
1013+ "Found orphaned .part file: {:?} for stream {}" ,
1014+ path, self . stream_name
1015+ ) ;
1016+
1017+ // Check if file is non-empty and potentially valid
1018+ match path. metadata ( ) {
1019+ Ok ( meta) if meta. len ( ) == 0 => {
1020+ warn ! (
1021+ "Removing empty orphaned .part file: {:?} for stream {}" ,
1022+ path, self . stream_name
1023+ ) ;
1024+ if let Err ( e) = remove_file ( & path) {
1025+ error ! ( "Failed to remove empty .part file {:?}: {e}" , path) ;
1026+ }
1027+ continue ;
1028+ }
1029+ Ok ( _) => {
1030+ // Try to validate the arrow file by reading its schema
1031+ match File :: open ( & path) {
1032+ Ok ( file) => {
1033+ match StreamReader :: try_new ( BufReader :: new ( file) , None ) {
1034+ Ok ( _reader) => {
1035+ // File has valid schema, rename to .arrows
1036+ let mut arrow_path = path. clone ( ) ;
1037+ arrow_path. set_extension ( ARROW_FILE_EXTENSION ) ;
1038+
1039+ // If arrow file with same name exists, generate a unique name
1040+ if arrow_path. exists ( ) {
1041+ let file_name =
1042+ arrow_path. file_name ( ) . unwrap ( ) . to_string_lossy ( ) ;
1043+ if let Some ( date_pos) = file_name. find ( ".date" ) {
1044+ let random_suffix = ulid:: Ulid :: new ( ) . to_string ( ) ;
1045+ let new_name = format ! (
1046+ "{}{}" ,
1047+ random_suffix,
1048+ & file_name[ date_pos..]
1049+ ) ;
1050+ arrow_path. set_file_name ( new_name) ;
1051+ }
1052+ }
1053+
1054+ info ! (
1055+ "Recovering orphaned .part file: {:?} -> {:?} for stream {}" ,
1056+ path, arrow_path, self . stream_name
1057+ ) ;
1058+ if let Err ( e) = std:: fs:: rename ( & path, & arrow_path) {
1059+ error ! (
1060+ "Failed to rename .part file {:?} to {:?}: {e}" ,
1061+ path, arrow_path
1062+ ) ;
1063+ }
1064+ }
1065+ Err ( e) => {
1066+ // File is invalid/corrupted, remove it
1067+ warn ! (
1068+ "Removing invalid/corrupted .part file: {:?} for stream {}: {e}" ,
1069+ path, self . stream_name
1070+ ) ;
1071+ if let Err ( e) = remove_file ( & path) {
1072+ error ! (
1073+ "Failed to remove invalid .part file {:?}: {e}" ,
1074+ path
1075+ ) ;
1076+ }
1077+ }
1078+ }
1079+ }
1080+ Err ( e) => {
1081+ error ! ( "Failed to open .part file {:?} for validation: {e}" , path) ;
1082+ }
1083+ }
1084+ }
1085+ Err ( e) => {
1086+ warn ! ( "Could not get metadata for .part file {:?}: {e}" , path) ;
1087+ }
1088+ }
1089+ }
1090+ }
1091+ }
1092+
9971093 /// First flushes arrows onto disk and then converts the arrow into parquet
9981094 pub fn flush_and_convert (
9991095 & self ,
10001096 init_signal : bool ,
10011097 shutdown_signal : bool ,
10021098 ) -> Result < ( ) , StagingError > {
1099+ // On init, recover any orphaned .part files from previous interrupted runs
1100+ if init_signal {
1101+ self . recover_orphan_part_files ( ) ;
1102+ }
1103+
10031104 let start_flush = Instant :: now ( ) ;
10041105 // Force flush for init or shutdown signals to convert all .part files to .arrows
10051106 // For regular cycles, use false to only flush non-current writers
0 commit comments