
Commit a52d993

Update Processing Lambda to Support PADRE Craft (#1)

Authored by Alrobbertz and Copilot

* Update Requirements
* Add Details for Testing with Private Codebase
* Remove Reference for Private Repos
* Update Processing to support None returns
* Clean up Log Statements
* Formatting
* Formatting
* Update Processing for Local Files & Update Actions Workflow
* Update Volume Mount in Workflow
* Update Filename
* Update Volume Mount
* Update README.md

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

1 parent 8d476d1 commit a52d993

6 files changed: 195 additions & 100 deletions


.github/workflows/calibration.yml

Lines changed: 14 additions & 3 deletions
```diff
@@ -15,11 +15,22 @@ jobs:
       - name: Build Lambda Docker Image
         run: |
           cd lambda_function
-          docker build -t processing_function:latest .
+          export BASE_IMAGE=public.ecr.aws/w5r9l1c8/dev-padre-swsoc-docker-lambda-base:latest
+          export REQUIREMENTS_FILE=padre-requirements.txt
+          docker build \
+            --build-arg BASE_IMAGE=$BASE_IMAGE \
+            --build-arg REQUIREMENTS_FILE=$REQUIREMENTS_FILE \
+            -t processing_function:latest .

       - name: Run Lambda Docker Container
         run: |
-          docker run -d --name processing_lambda -p 9000:8080 -e USE_INSTRUMENT_TEST_DATA=True -e SWXSOC_MISSION=hermes processing_function:latest
+          docker run -d \
+            --name processing_lambda \
+            -p 9000:8080 \
+            -v "${{ github.workspace }}/lambda_function/tests/test_data/:/test_data" \
+            -e SWXSOC_MISSION=padre \
+            -e SDC_AWS_FILE_PATH=/test_data/padre_get_CUBEADCS_GEN2_OP_STATUS_APP_Data_1761936771334_1762106179414.csv \
+            processing_function:latest
           container_id=$(docker ps -qf "ancestor=processing_function:latest")
           echo "Container ID: $container_id"
```
```diff
@@ -30,7 +41,7 @@ jobs:
         id: test-lambda
         run: |
           # Run curl and write the HTTP status code to a variable
-          HTTP_STATUS=$(curl -XPOST "http://localhost:9000/2015-03-31/functions/function/invocations" -d @lambda_function/tests/test_data/test_eea_event.json)
+          HTTP_STATUS=$(curl -XPOST "http://localhost:9000/2015-03-31/functions/function/invocations" -d @lambda_function/tests/test_data/test_craft_event.json)
           echo "HTTP Status: $HTTP_STATUS"

           # Grep the HTTP status code from the curl output for 200 (success)
```

README.md

Lines changed: 19 additions & 4 deletions
````diff
@@ -13,15 +13,30 @@ The container will contain the latest release code as the production environment
 ### **Testing Locally (Using own Test Data)**:
 1. Build the lambda container image (from within the lambda_function folder) you'd like to test:

-    `docker build -t processing_function:latest . --no-cache`
+    ```sh
+    docker build \
+    --build-arg BASE_IMAGE=$BASE_IMAGE \ # Optional: specify base image
+    --build-arg REQUIREMENTS_FILE=$REQUIREMENTS_FILE \ # Optional: specify requirements file
+    -t sdc_aws_processing_lambda:latest . \
+    --network host
+    ```

 2. Run the lambda container image you've built, this will start the lambda runtime environment:

-    `docker run -p 9000:8080 -v <directory_for_processed_files>:/test_data -e SDC_AWS_FILE_PATH=/test_data/<file_to_process_name> processing_function:latest`
+    ```sh
+    docker run \
+    -p 9000:8080 \
+    -v <directory_for_processed_files>:/test_data \
+    -e SDC_AWS_FILE_PATH=/test_data/<file_to_process_name> \
+    sdc_aws_processing_lambda:latest
+    ```

 3. From a `separate` terminal, make a curl request to the running lambda function:

-    `curl -XPOST "http://localhost:9000/2015-03-31/functions/function/invocations" -d @lambda_function/tests/test_data/test_eea_event.json`
+    ```sh
+    curl -XPOST "http://localhost:9000/2015-03-31/functions/function/invocations" \
+    -d @lambda_function/tests/test_data/test_eea_event.json
+    ```

 4. Close original terminal running the docker image.
````
```diff
@@ -50,4 +65,4 @@ The container will contain the latest release code as the production environment
 ### **How this Lambda Function is deployed**
-This lambda function is part of the main SWxSOC Pipeline ([Architecture Repo Link](https://github.com/HERMES-SOC/sdc_aws_pipeline_architecture)). It is deployed via AWS Codebuild within that repository. It is first built and tagged within the appropriate production or development repository (depending if it is a release or commit). View the Codebuild CI/CD file [here](buildspec.yml).
+This lambda function is part of the main SWxSOC Pipeline ([Architecture Repo Link](https://github.com/swxsoc/sdc_aws_architecture)). It is deployed via AWS Codebuild within that repository. It is first built and tagged within the appropriate production or development repository (depending if it is a release or commit). View the Codebuild CI/CD file [here](buildspec.yml).
```

lambda_function/padre-requirements.txt

Lines changed: 1 addition & 0 deletions
```diff
@@ -2,4 +2,5 @@ sdc_aws_utils @ git+https://github.com/swxsoc/sdc_aws_utils.git
 metatracker @ git+https://github.com/swxsoc/MetaTracker.git
 padre_meddea @ git+https://github.com/PADRESat/padre_meddea.git
 padre_sharp @ git+https://github.com/PADRESat/padre_sharp.git
+padre_craft @ git+https://github.com/PADRESat/padre_craft.git
 tenacity==9.1.2
```

lambda_function/src/file_processor/file_processor.py

Lines changed: 135 additions & 93 deletions
```diff
@@ -12,6 +12,7 @@
 from itertools import combinations
 import shutil
 import traceback
+from typing import Tuple

 import swxsoc
```
```diff
@@ -164,10 +165,10 @@ def _process_file(self) -> None:
             )

             FileProcessor._track_file_metatracker(
-                science_filename_parser,
-                Path(file_path),
-                self.file_key,
-                self.instrument_bucket_name,
+                science_filename_parser=science_filename_parser,
+                file_path=Path(file_path),
+                s3_key=self.file_key,
+                s3_bucket=self.instrument_bucket_name,
                 status=status,
             )
```
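The switch to keyword arguments above can be illustrated with a small stand-alone sketch; the `track` helper and its values are hypothetical stand-ins, not the real `_track_file_metatracker`:

```python
from pathlib import Path

def track(science_filename_parser, file_path, s3_key, s3_bucket,
          science_product_id=None, status=None):
    """Hypothetical stand-in with the same parameter order as the tracker."""
    return (s3_key, s3_bucket)

# Explicit keywords (as the diff switches to) keep each value bound to the
# intended parameter, even if the signature is reordered in a later refactor.
result = track(
    science_filename_parser=str,
    file_path=Path("/tmp/padre.csv"),
    s3_key="padre.csv",
    s3_bucket="padre-bucket",
)
print(result)
```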
```diff
@@ -182,15 +183,16 @@ def _process_file(self) -> None:
             # If calibrated files are found, set status to success
             status = self.build_status(
                 status=Status.SUCCESS,
-                message=f"File Processed Successfully",
+                message="File Processed Successfully",
                 total_time=total_time,
             )

+            # Track the original science file as processed successfully
             science_file_id, science_product_id = FileProcessor._track_file_metatracker(
-                science_filename_parser,
-                Path(file_path),
-                self.file_key,
-                self.instrument_bucket_name,
+                science_filename_parser=science_filename_parser,
+                file_path=Path(file_path),
+                s3_key=self.file_key,
+                s3_bucket=self.instrument_bucket_name,
                 status=status,
             )
```
```diff
@@ -203,6 +205,10 @@ def _process_file(self) -> None:
                 }
             )

+            # Filter out None values from calibrated filenames
+            calibrated_filenames = [
+                fname for fname in calibrated_filenames if fname is not None
+            ]
             # Push file to S3 Bucket
             for calibrated_filename in calibrated_filenames:
                 status = self.build_status(
```
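The None-filtering step added above can be run in isolation; the filenames here are hypothetical examples, since the calibration step may interleave real output names with `None` placeholders:

```python
# Hypothetical result of calibration: real filenames mixed with None
# placeholders for inputs that produced no output file.
calibrated_filenames = [
    "padre_meddea_example_l1.fits",
    None,
    "padre_craft_example_status.csv",
]

# Drop the None entries so the S3 upload loop only sees real files.
calibrated_filenames = [
    fname for fname in calibrated_filenames if fname is not None
]
print(calibrated_filenames)
```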
```diff
@@ -220,11 +226,11 @@ def _process_file(self) -> None:

                 # Track the calibrated file in the CDF Tracker
                 self._track_file_metatracker(
-                    science_filename_parser,
-                    Path("/tmp") / calibrated_filename,
-                    calibrated_filename,
-                    destination_bucket,
-                    science_product_id,
+                    science_filename_parser=science_filename_parser,
+                    file_path=Path("/tmp") / calibrated_filename,
+                    s3_key=calibrated_filename,
+                    s3_bucket=destination_bucket,
+                    science_product_id=science_product_id,
                     status=status,
                 )
```
```diff
@@ -260,43 +266,61 @@ def _calibrate_file(instrument, file_path, dry_run=False):
                 fromlist=["data"],
             )
             # Get all files in test data directory
-            test_data_dir = Path(instr_pkg_data.__path__[0])
+            test_data_dir = Path(instr_pkg_data.__path__[0]) / "test"
+            log.info(f"Test data directory: {test_data_dir}")
             test_data_files = list(test_data_dir.glob("**/*"))
             log.info(f"Found {len(test_data_files)} files in test data directory")
             log.info(f"Using {test_data_files} as test data")
-            # Get any files ending in .bin or .cdf and calibrate them
+            # Stub path list for calibrated files
+            path_list = []
+            # Loop the test data files for calibration
             for test_data_file in test_data_files:
-                if test_data_file.suffix in [".bin", ".cdf", ".fits"]:
-                    log.info(f"Calibrating {test_data_file}")
+                if test_data_file.suffix in [".bin", ".cdf", ".fits", ".csv"]:
+                    log.info(f"Calibrating {test_data_file.name}")
                     # Make /test_data directory if it doesn't exist
                     Path("/test_data").mkdir(parents=True, exist_ok=True)
                     # Copy file to /test_data directory using shutil
                     test_data_file_path = Path(test_data_file)
                     file_path = Path(f"/test_data/{test_data_file_path.name}")
                     shutil.copy(test_data_file_path, file_path)
                     # Calibrate file
-                    calibrated_filename = calibration.process_file(file_path)[0]
-                    # Copy calibrated file to test data directory
-                    calibrated_file_path = Path(calibrated_filename)
-                    # Return name of calibrated file
-                    log.info(f"Calibrated file saved as {calibrated_file_path}")
-
-                    return calibrated_filename
+                    files_list = calibration.process_file(file_path)
+
+                    if len(files_list) == 0:
+                        log.warning(
+                            f"No calibrated files generated for {file_path}"
+                        )
+                        continue
+                    for generated_file in files_list:
+                        if generated_file is not None:
+                            new_file_path = Path(generated_file)
+                            calibrated_filename = new_file_path.name
+                            path_list.append(calibrated_filename)
+                            log.info(
+                                f"Calibrated file saved as {calibrated_filename}"
+                            )
+                        else:
+                            # Pass-through None values to indicate no file was created
+                            path_list.append(None)
+                            log.warning(f"'None' file generated for {file_path}")
+            # Return list of calibrated files
+            return path_list

-            # If no files ending in .bin or .cdf are found, raise an error
-            raise FileNotFoundError(
-                "No files ending in .bin or .cdf found in test data directory"
-            )
         log.info(f"Calibrating {file_path}")
         # Get name of new file
         files_list = calibration.process_file(Path(file_path))

         path_list = []
         for generated_file in files_list:
-            new_file_path = Path(generated_file)
-            calibrated_filename = new_file_path.name
-            path_list.append(calibrated_filename)
-            log.info(f"Calibrated file saved as {calibrated_filename}")
+            if generated_file is not None:
+                new_file_path = Path(generated_file)
+                calibrated_filename = new_file_path.name
+                path_list.append(calibrated_filename)
+                log.info(f"Calibrated file saved as {calibrated_filename}")
+            else:
+                # Pass-through None values to indicate no file was created
+                path_list.append(None)
+                log.warning(f"'None' file generated for {file_path}")

         return path_list
```
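The collection loop introduced above follows a simple pattern: keep the basename of each generated file and pass `None` through unchanged so the caller can tell which inputs produced no output. A minimal sketch of that pattern, with hypothetical paths and without the logging:

```python
from pathlib import Path

def collect_calibrated(files_list):
    """Keep the basename of each generated file; pass None placeholders
    through so the caller can see which inputs produced no output."""
    path_list = []
    for generated_file in files_list:
        if generated_file is not None:
            path_list.append(Path(generated_file).name)
        else:
            path_list.append(None)
    return path_list

print(collect_calibrated(["/tmp/padre_example_l1.fits", None]))
```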
```diff
@@ -336,84 +360,102 @@ def _calibrate_file(instrument, file_path, dry_run=False):
         reraise=True,
     )
     def _track_file_metatracker(
-        science_filename_parser,
-        file_path,
-        s3_key,
-        s3_bucket,
-        science_product_id=None,
-        status=None,
-    ) -> int:
+        science_filename_parser: callable,
+        file_path: Path,
+        s3_key: str,
+        s3_bucket: str,
+        science_product_id: int = None,
+        status: dict = None,
+    ) -> Tuple[int, int]:
         """
-        Tracks processed science product in the CDF Tracker file database.
+        Tracks processed science product in the File Metadata tracker database.
         It involves initializing the database engine, setting up database tables,
         and tracking both the original and processed files.

-        :param science_filename_parser: The parser function to process file names.
-        :type science_filename_parser: function
-        :param file_path: The path of the original file.
-        :type file_path: Path
+        Parameters
+        ----------
+        science_filename_parser : function
+            The parser function to process file names.
+        file_path : Path
+            The path of the file in the filesystem.
+        s3_key : str
+            The S3 key of the file.
+        s3_bucket : str
+            The S3 bucket of the file.
+        science_product_id : int, optional
+            The ID of the science product, by default None.
+        status : dict, optional
+            The status dictionary for tracking, by default None.
+
+        Returns
+        -------
+        Tuple[int, int]
+            A tuple containing the science file ID and science product ID.
         """
         secret_arn = os.getenv("RDS_SECRET_ARN", None)
-        if secret_arn:
-            try:
-                # Validate file path
-                if not file_path or not isinstance(file_path, Path):
-                    raise ValueError("Invalid file path provided.")
-                # Check if file exists
-                if not file_path.exists():
-                    raise FileNotFoundError(f"File not found: {file_path}")
-
-                # Get Database Credentials
-                session = boto3.session.Session()
-                client = session.client(service_name="secretsmanager")
-                response = client.get_secret_value(SecretId=secret_arn)
-                secret = json.loads(response["SecretString"])
-                connection_string = (
-                    f"postgresql://{secret['username']}:{secret['password']}@"
-                    f"{secret['host']}:{secret['port']}/{secret['dbname']}"
-                )
+        if not secret_arn:
+            log.error(
+                f"Failed to update MetaTracker for file {file_path}. No RDS Secret ARN found in environment variables.",
+            )
+            return None, None

-                metatracker_config = FileProcessor.get_metatracker_config(swxsoc.config)
+        try:
+            # Validate file path
+            if not file_path or not isinstance(file_path, Path):
+                raise ValueError("Invalid file path provided.")
+            # Check if file exists
+            if not file_path.exists():
+                raise FileNotFoundError(f"File not found: {file_path}")
+
+            # Get Database Credentials
+            session = boto3.session.Session()
+            client = session.client(service_name="secretsmanager")
+            response = client.get_secret_value(SecretId=secret_arn)
+            secret = json.loads(response["SecretString"])
+            connection_string = (
+                f"postgresql://{secret['username']}:{secret['password']}@"
+                f"{secret['host']}:{secret['port']}/{secret['dbname']}"
+            )

-                log.debug(swxsoc.config)
+            metatracker_config = FileProcessor.get_metatracker_config(swxsoc.config)

-                log.debug(metatracker_config)
+            log.debug(swxsoc.config)

-                metatracker.set_config(metatracker_config)
+            log.debug(metatracker_config)

-                from metatracker.database import create_engine
-                from metatracker.database.tables import create_tables
-                from metatracker.tracker import tracker
+            metatracker.set_config(metatracker_config)

-                # Initialize the database engine
-                database_engine = create_engine(connection_string)
+            from metatracker.database import create_engine
+            from metatracker.database.tables import create_tables
+            from metatracker.tracker import tracker

-                # Create tables if they do not exist
-                create_tables(database_engine)
+            # Initialize the database engine
+            database_engine = create_engine(connection_string)

-                # Set tracker to MetaTracker
-                meta_tracker = tracker.MetaTracker(
-                    database_engine, science_filename_parser
-                )
+            # Create tables if they do not exist
+            create_tables(database_engine)

-                if meta_tracker:
-                    science_file_id, science_product_id = meta_tracker.track(
-                        file_path, s3_key, s3_bucket, status=status
-                    )
+            # Set tracker to MetaTracker
+            meta_tracker = tracker.MetaTracker(database_engine, science_filename_parser)

-                    return science_file_id, science_product_id
+            if meta_tracker:
+                science_file_id, science_product_id = meta_tracker.track(
+                    file_path, s3_key, s3_bucket, status=status
+                )

-                return None
+                return science_file_id, science_product_id

-            except Exception as e:
-                log.error(
-                    {
-                        "status": "ERROR",
-                        "message": str(e),
-                        "traceback": traceback.format_exc(),
-                    }
-                )
-                return None
+            return None, None
+
+        except Exception as e:
+            log.error(
+                {
+                    "status": "ERROR",
+                    "message": str(e),
+                    "traceback": traceback.format_exc(),
+                }
+            )
+            return None, None

     @staticmethod
     def get_metatracker_config(swxsoc_config: dict) -> dict:
```
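The refactor above replaces the nested `if secret_arn:` body with a guard clause that bails out early and always returns a two-element tuple, so callers that unpack `science_file_id, science_product_id` never hit an unpacking error. A minimal sketch of that shape (the function body and return IDs are placeholders, not the real tracker logic):

```python
import os
from typing import Optional, Tuple

def track_file(file_path: str) -> Tuple[Optional[int], Optional[int]]:
    """Guard-clause shape from the diff above: return a (None, None)
    sentinel early when RDS_SECRET_ARN is unset, instead of nesting the
    whole body under `if secret_arn:`."""
    secret_arn = os.getenv("RDS_SECRET_ARN", None)
    if not secret_arn:
        # No credentials configured: signal "not tracked" to the caller.
        return None, None
    # ... credentials lookup and MetaTracker calls would go here ...
    return 1, 1  # placeholder IDs standing in for real database IDs
```

Because both branches return a tuple, `file_id, product_id = track_file(...)` is safe in every case, which is exactly what the change from `return None` to `return None, None` in the diff guarantees.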
