This document describes the complete workflow for batch document processing using Exosphere.
The workflow consists of seven nodes that process documents in batches, extract information with Google Gemini, write validated results to a database, and record failures in a CSV file for retry.
flowchart LR
A[CSV Input] --> B[Chunking]
B --> C[Batch Processing]
C --> D[Polling]
D --> E[Validation]
E --> F[Database Write]
E --> G[Failure Handling]
G --> H[Failure CSV]
- Purpose: Reads file paths from a CSV input file
- Inputs:
csv_file_path: Path to the CSV file containing document paths
- Outputs:
file_paths: JSON string containing array of file paths
- Function: Parses CSV file and extracts file paths from the first column
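The first-column extraction described above could look roughly like the following. This is a minimal sketch, not the node's actual code: the function names `parse_file_paths` and `read_file_paths` are hypothetical, and it assumes the input CSV has no header row and that blank rows should be skipped.

```python
import csv
import json


def parse_file_paths(lines) -> str:
    """Return a JSON array (as a string) of the first column of each non-empty row.

    `lines` is any iterable of CSV lines, for example an open file object.
    Assumption: the CSV has no header row.
    """
    paths = []
    for row in csv.reader(lines):
        # csv.reader yields an empty list for a blank line; skip those.
        if row and row[0].strip():
            paths.append(row[0].strip())
    return json.dumps(paths)


def read_file_paths(csv_file_path: str) -> str:
    """Open the CSV at `csv_file_path` and return the `file_paths` JSON string."""
    with open(csv_file_path, newline="", encoding="utf-8") as handle:
        return parse_file_paths(handle)


if __name__ == "__main__":
    print(parse_file_paths(["/path/to/doc1.pdf,invoice", "", "/path/to/doc2.docx"]))
```

Returning a JSON string rather than a Python list mirrors the `file_paths` output described for this node.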
- Purpose: Creates chunks of file paths for batch processing
- Inputs:
file_paths: JSON string containing array of file paths
chunk_size: Size of each chunk (default: 10)
- Outputs:
chunk: JSON string containing a chunk of file paths
- Function: Splits file paths into manageable chunks for parallel processing
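As an illustration of the chunking step, the sketch below splits the `file_paths` JSON string into JSON-encoded chunks of at most `chunk_size` entries. The function name `make_chunks` is hypothetical, and returning all chunks as a list is an assumption; the real node may emit them one at a time.

```python
import json


def make_chunks(file_paths: str, chunk_size: int = 10) -> list:
    """Split a JSON array of paths into JSON strings of at most `chunk_size` paths each."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    paths = json.loads(file_paths)
    # Step through the list in strides of chunk_size; the last chunk may be shorter.
    return [
        json.dumps(paths[start:start + chunk_size])
        for start in range(0, len(paths), chunk_size)
    ]


if __name__ == "__main__":
    all_paths = json.dumps([f"/path/to/doc{i}.pdf" for i in range(25)])
    for chunk in make_chunks(all_paths):
        print(len(json.loads(chunk)), "paths in chunk")
```

With 25 paths and the default chunk size of 10, this yields three chunks holding 10, 10, and 5 paths.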
- Purpose: Processes each chunk as a batch and sends to Gemini
- Inputs:
chunk: JSON string containing chunk of file paths
prompt: Processing prompt for Gemini
- Outputs:
task_id: Unique task ID for tracking
batch_info: JSON string with batch information
- Secrets:
gemini_api_key: Gemini API key
- Function: Sends batch to Gemini endpoint and returns task ID
- Purpose: Polls for task completion with requeue mechanism
- Inputs:
task_id: Task ID to poll for
batch_info: JSON string with batch information
- Outputs:
task_result: JSON string with task results
status: Status of the task (completed, failed, pending)
- Secrets:
gemini_api_key: Gemini API key
- Function: Polls Gemini for task completion, requeues if not complete
- Purpose: Validates extracted JSON information
- Inputs:
task_result: JSON string with task results
batch_info: JSON string with batch information
- Outputs:
validated_data: JSON string with validated data
validation_status: Status of validation (valid, invalid, partial)
- Function: Validates extracted JSON against schema and performs quality checks
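The three validation statuses can be illustrated with a small sketch. Everything about the schema here is an assumption: the document does not define it, so the example supposes each extracted record carries `file_path`, `title`, and `content` fields, and uses a hypothetical minimum content length. The function names are also hypothetical.

```python
import json
from typing import Optional

MIN_CONTENT_LENGTH = 20  # hypothetical threshold, not taken from the workflow


def check_record(record: dict) -> Optional[str]:
    """Return a failure reason for one record, or None if it passes."""
    if not str(record.get("title") or "").strip():
        return "missing_title"
    if len(str(record.get("content") or "")) < MIN_CONTENT_LENGTH:
        return "content_too_short"
    return None


def validate(task_result: str):
    """Return (validated_data JSON string, validation_status, list of failures).

    Assumption: `task_result` is a JSON array of record objects.
    """
    valid, failures = [], []
    for record in json.loads(task_result):
        reason = check_record(record)
        if reason is None:
            valid.append(record)
        else:
            failures.append(
                {"file_path": record.get("file_path"), "failure_reason": reason}
            )
    if valid and not failures:
        status = "valid"
    elif valid:
        status = "partial"  # some records passed, some failed
    else:
        status = "invalid"  # nothing passed (including an empty result)
    return json.dumps(valid), status, failures
```

In this sketch a batch is `partial` when at least one record passes and at least one fails, so passing records can still be written while failures are routed to failure handling.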
- Purpose: Writes validated information to database
- Inputs:
validated_data: JSON string with validated data
batch_info: JSON string with batch information
- Outputs:
write_status: Status of database write operation
record_count: Number of records written
- Secrets:
database_url: Database connection string
- Function: Writes validated data to MongoDB database
- Purpose: Handles validation failures and creates retry CSV
- Inputs:
validation_status: Status of validation
batch_info: JSON string with batch information
task_result: JSON string with task results
- Outputs:
failure_csv_path: Path to the failure CSV file
retry_count: Number of files that need retry
- Function: Creates failure CSV for documents that need retry
- CSV file with document paths
- Processing prompt
- Chunk size parameter
- File paths are chunked into batches
- Each batch is sent to Gemini for processing
- Results are polled until completion
- Extracted data is validated
- Validated data stored in database
- Failure CSV for retry mechanism
- Processing statistics and logs
- Documents with invalid extracted data are logged
- Failure CSV is created with retry information
- Failed documents can be reprocessed
- Gemini API failures are handled gracefully
- Tasks are requeued with exponential backoff
- Timeout handling for long-running tasks
- Database connection issues are logged
- Partial writes are handled appropriately
- Transaction rollback on critical failures
file_path,failure_reason,task_id,timestamp
/path/to/doc1.pdf,missing_title,task-123,2024-01-01T12:00:00Z
/path/to/doc2.docx,content_too_short,task-123,2024-01-01T12:00:00Z
- Failed documents are logged to failure CSV
- Failure CSV can be used as input for retry
- Retry count is tracked to prevent infinite loops
- Exponential backoff for API retries
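A failure CSV with the four columns shown above could be written and read back along these lines. This is a sketch under assumptions: the function names are hypothetical, each failure is assumed to be a dict with `file_path` and `failure_reason` keys, and the timestamp passed in is assumed to be in UTC.

```python
import csv
import io
from datetime import datetime, timezone

FAILURE_COLUMNS = ["file_path", "failure_reason", "task_id", "timestamp"]


def write_failure_csv(out, failures, task_id, now=None) -> int:
    """Write one row per failed document to the file-like `out`; return the row count.

    `now` is assumed to be a UTC datetime; it defaults to the current UTC time.
    """
    stamp = (now or datetime.now(timezone.utc)).strftime("%Y-%m-%dT%H:%M:%SZ")
    writer = csv.DictWriter(out, fieldnames=FAILURE_COLUMNS)
    writer.writeheader()
    for failure in failures:
        writer.writerow(
            {
                "file_path": failure["file_path"],
                "failure_reason": failure["failure_reason"],
                "task_id": task_id,
                "timestamp": stamp,
            }
        )
    return len(failures)


def read_retry_paths(source) -> list:
    """Read the file paths back from a failure CSV so they can be reprocessed."""
    return [row["file_path"] for row in csv.DictReader(source)]


if __name__ == "__main__":
    buffer = io.StringIO()
    write_failure_csv(
        buffer,
        [{"file_path": "/path/to/doc1.pdf", "failure_reason": "missing_title"}],
        task_id="task-123",
    )
    print(buffer.getvalue())
```

Because the failure CSV starts with a header row, a reader that takes the first column of every row would also pick up the literal `file_path` header; reading by column name, as `read_retry_paths` does, avoids that.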
- Optimal batch size depends on document complexity
- Larger batches reduce API calls but increase processing time
- Default batch size of 10 is recommended
- Polling interval starts at 5 seconds
- Maximum 10 polling attempts
- Exponential backoff for failed requests
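The polling settings above (5-second starting interval, at most 10 attempts, exponential backoff) can be sketched as follows. The doubling factor and the 300-second cap are assumptions, as are the function names. The loop sleeps in-process for simplicity, whereas the workflow itself is described as requeuing the task between attempts.

```python
import time


def backoff_delays(initial=5.0, max_attempts=10, factor=2.0, cap=300.0) -> list:
    """Return the wait (in seconds) after each attempt: initial * factor**i, capped."""
    return [min(initial * factor ** attempt, cap) for attempt in range(max_attempts)]


def poll_until_done(check_status, sleep=time.sleep, **backoff_options) -> str:
    """Call `check_status` until it reports a terminal state or attempts run out.

    `check_status` is a hypothetical callable returning "completed", "failed",
    or "pending". Returns "pending" if no terminal state was reached.
    """
    for delay in backoff_delays(**backoff_options):
        status = check_status()
        if status in ("completed", "failed"):
            return status
        sleep(delay)  # wait longer after each unsuccessful attempt
    return "pending"


if __name__ == "__main__":
    print(backoff_delays())
```

Passing `sleep` as a parameter keeps the loop testable without real waiting, for example by supplying a function that only records the requested delays.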
- Indexes on task_id, file_path, and status
- Flexible document storage for extracted data (JSONB if PostgreSQL is used; MongoDB stores JSON-like documents natively)
- Connection pooling for high throughput
- DEBUG: Detailed processing information
- INFO: General workflow progress
- WARNING: Non-critical issues
- ERROR: Critical failures
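The four levels above map directly onto Python's standard `logging` module. The following is a minimal sketch of one way to set that up; the logger name `batch_workflow` and the message format are assumptions for illustration.

```python
import logging


def configure_logging(level: str = "INFO") -> logging.Logger:
    """Return a logger that emits messages at `level` and above to stderr."""
    logger = logging.getLogger("batch_workflow")  # hypothetical logger name
    logger.setLevel(level.upper())  # accepts "DEBUG", "INFO", "WARNING", "ERROR"
    if not logger.handlers:  # avoid attaching duplicate handlers on repeat calls
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
        )
        logger.addHandler(handler)
    return logger


if __name__ == "__main__":
    log = configure_logging("DEBUG")
    log.debug("chunk contains 10 file paths")
    log.info("batch submitted")
    log.warning("poll attempt returned pending")
    log.error("database write failed")
```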
- Total documents processed
- Success/failure rates
- Processing time per batch
- API response times
- Database connectivity
- Gemini API availability
- File system access
- Memory and CPU usage
- Gemini API key stored as secret
- Database credentials in environment variables
- No hardcoded credentials in code
- Document paths are logged (ensure no sensitive data in paths)
- Extracted data stored in secure database
- Access controls on database
- CSV file format validation
- File path sanitization
- Prompt injection prevention
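File path sanitization could, for example, reject paths that escape an allowed base directory or carry an unexpected extension. The sketch below is one possible approach, not the workflow's own check: the function name, the base-directory rule, and the allowed extensions are all assumptions.

```python
from pathlib import Path

ALLOWED_SUFFIXES = {".pdf", ".docx"}  # hypothetical allow-list


def sanitize_path(raw: str, base_dir: str) -> Path:
    """Resolve `raw` against `base_dir` and reject anything outside it."""
    if "\x00" in raw:
        raise ValueError("path contains a null byte")
    base = Path(base_dir).resolve()
    # resolve() collapses ".." segments, so traversal attempts become visible.
    candidate = (base / raw.strip()).resolve()
    if candidate != base and base not in candidate.parents:
        raise ValueError(f"path escapes base directory: {raw!r}")
    if candidate.suffix.lower() not in ALLOWED_SUFFIXES:
        raise ValueError(f"unsupported file type: {candidate.suffix!r}")
    return candidate


if __name__ == "__main__":
    print(sanitize_path("reports/doc1.pdf", "/srv/docs"))
```

An absolute `raw` path replaces the base when joined, so it is accepted only if it already lies inside `base_dir`.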
- CSV file not found: Check file path and permissions
- Gemini API errors: Verify API key and rate limits
- Database connection failed: Check connection string and network
- Validation failures: Review prompt and document format
- Check logs for error messages
- Verify environment variables
- Test database connectivity
- Validate CSV file format
- Check Gemini API status
- Restart failed nodes
- Retry with failure CSV
- Check database integrity
- Verify file system access
- Monitor resource usage