From cfd6d463cd86f7e317b7f4e7efddec36732c18fb Mon Sep 17 00:00:00 2001 From: access pipeline-user Date: Tue, 25 Jun 2024 16:00:02 -0400 Subject: [PATCH 01/25] adding warning for access edge cases when linking --- .gitignore | 0 Makefile | 0 README.md | 0 apps/__init__.py | 0 apps/access/__init__.py | 40 +++++++++++++++++++++++---- apps/cleaning/__init__.py | 0 apps/cmoch/__init__.py | 0 apps/lims.py | 0 requirements.txt | 0 scripts/README.md | 0 scripts/bin/access_beagle_endpoint.py | 0 11 files changed, 35 insertions(+), 5 deletions(-) mode change 100644 => 100755 .gitignore mode change 100644 => 100755 Makefile mode change 100644 => 100755 README.md mode change 100644 => 100755 apps/__init__.py mode change 100644 => 100755 apps/access/__init__.py mode change 100644 => 100755 apps/cleaning/__init__.py mode change 100644 => 100755 apps/cmoch/__init__.py mode change 100644 => 100755 apps/lims.py mode change 100644 => 100755 requirements.txt mode change 100644 => 100755 scripts/README.md mode change 100644 => 100755 scripts/bin/access_beagle_endpoint.py diff --git a/.gitignore b/.gitignore old mode 100644 new mode 100755 diff --git a/Makefile b/Makefile old mode 100644 new mode 100755 diff --git a/README.md b/README.md old mode 100644 new mode 100755 diff --git a/apps/__init__.py b/apps/__init__.py old mode 100644 new mode 100755 diff --git a/apps/access/__init__.py b/apps/access/__init__.py old mode 100644 new mode 100755 index f4aa9a8..4227e72 --- a/apps/access/__init__.py +++ b/apps/access/__init__.py @@ -5,7 +5,9 @@ from pathlib import Path import shutil import requests - +import re +import pdb +import json FLAG_TO_APPS = { "dmpmanifest": ("access_manifest", "manifest"), "msi": ("access legacy MSI", "microsatellite_instability"), @@ -54,6 +56,7 @@ def get_operator_run(app_name, app_version=None, tags=None, config=None, show_al "app_name": app_name } + operator_look_ahead(latest_operator_run, config, tags) if show_all_runs: latest_operator_run.pop("status") @@ -69,6 +72,7 @@ def get_operator_run(app_name, app_version=None, tags=None, config=None, show_al if "igoRequestId" in tags: new_tag = tags.replace("igoRequestId", "requestId") return get_operator_run(app_name, app_version, tags=new_tag, config=config) + else: print("There are no completed operator runs for this request in the following app: %s:%s" % (str(app_name), str(app_version)), file=sys.stderr) @@ -76,6 +80,22 @@ def get_operator_run(app_name, app_version=None, tags=None, config=None, show_al return latest_runs[0] +def operator_look_ahead(latest_operator_run, config, tags): + threshold = float(.90) + complete = float(1.0) + request_id = json.loads(tags)['igoRequestId'] + latest_operator_run_look_ahead = latest_operator_run.copy() + latest_operator_run_look_ahead.pop("status") + response_look_ahead = requests.get(urljoin(config['beagle_endpoint'], config['api']['operator-runs']), + headers={'Authorization': 'Bearer %s' % config['token']}, + params=latest_operator_run) + total_r = response_look_ahead.json()["results"][0]["num_total_runs"] + completed_r = response_look_ahead.json()["results"][0]["num_completed_runs"] + percent_c = completed_r / total_r + if (percent_c >= threshold) and (percent_c < complete): + print(f"Warning there is a more recent operator run for request {request_id} that is incomplete, but with {percent_c} of runs completed. This may be the operator run you need for analysis. Consult the request's operator run history and consider using the --all-runs flag if appropriate.") + + def open_request_file(request_ids_file): try: with open(request_ids_file,'r') as file: @@ -114,13 +134,11 @@ def get_runs(operator_run_id, config, show_all_runs): "page_size": 1000, "status": "COMPLETED" } - if show_all_runs: run_params.pop("status") response = requests.get(urljoin(config['beagle_endpoint'], config['api']['run']), headers={'Authorization': 'Bearer %s' % config['token']}, params=run_params) - return response.json()["results"] def get_run_by_id(run_id, config): @@ -162,6 +180,7 @@ def link_app(operator_run, directory, request_id, sample_id, arguments, config, except Exception as e: print("could not delete symlink: {} ".format(path / run["id"]), file=sys.stderr) else: + is_run_manual(run, request_id) try: os.symlink(run["output_directory"], path / run["id"]) print((path / run["id"]).absolute(), file=sys.stdout) @@ -289,11 +308,11 @@ def link_bams_by_patient_id(operator_run, directory, request_id, sample_id, argu return files = [] # (sample_id, /path/to/file) - + for run in runs: for file_group in get_files_by_run_id(run["id"], config): files = files + find_files_by_sample(file_group["value"], sample_id=sample_id) - + accepted_file_types = ['.bam', '.bai'] for (sample_id, file) in files: file_path = get_file_path(file) @@ -339,6 +358,17 @@ def link_bams_by_patient_id(operator_run, directory, request_id, sample_id, argu pass return "Completed" +def is_run_manual(run, request_id): + pattern = "/work/access/production/data/bams/*" + match = re.match(pattern, run["output_directory"]) + patient_dir = run["output_directory"] + '/current' + proj_dir = Path("./") / ("Project_" + request_id) + if match: + if Path(patient_dir).is_dir(): + return True + else: + shutil.rmtree(proj_dir) + raise FileNotFoundError(f'The folder {patient_dir} does not exist. Bams for request {request_id} were manually imported to Voyager. Please link patients in the data folder before linking the project folder.') def find_files_by_sample(file_group, sample_id = None): def traverse(file_group): diff --git a/apps/cleaning/__init__.py b/apps/cleaning/__init__.py old mode 100644 new mode 100755 diff --git a/apps/cmoch/__init__.py b/apps/cmoch/__init__.py old mode 100644 new mode 100755 diff --git a/apps/lims.py b/apps/lims.py old mode 100644 new mode 100755 diff --git a/requirements.txt b/requirements.txt old mode 100644 new mode 100755 diff --git a/scripts/README.md b/scripts/README.md old mode 100644 new mode 100755 diff --git a/scripts/bin/access_beagle_endpoint.py b/scripts/bin/access_beagle_endpoint.py old mode 100644 new mode 100755 From 0132ec1b569e3ee5d0f989159b2f939408a57b1d Mon Sep 17 00:00:00 2001 From: Allan Bolipata Date: Mon, 8 Jul 2024 12:50:22 -0400 Subject: [PATCH 02/25] Remove calls to obsolete endpoints --- scripts/bin/access_beagle_endpoint.py | 13 +---- scripts/deregister_jobs_and_files.py | 72 ++------------------------- 2 files changed, 5 insertions(+), 80 deletions(-) diff --git a/scripts/bin/access_beagle_endpoint.py b/scripts/bin/access_beagle_endpoint.py index eda1090..67bbf5d 100644 --- a/scripts/bin/access_beagle_endpoint.py +++ b/scripts/bin/access_beagle_endpoint.py @@ -17,19 +17,8 @@ def run_url(self, url): req = requests.get(url, auth=self.auth, verify=False) return req.json() - # had to build url weird because the requests docs were busted and I kept running into issues - def get_etl_jobs_by_request(self, request_id): - url = "%s/v0/etl/jobs/?page_size=1000&request_id=%s" % (self.API, request_id) - return self.run_url(url) - - - def get_etl_job(self, run_id): - url = "%s/v0/etl/jobs/%s" % (self.API, run_id) - return self.run_url(url) - - def get_file_ids(self, request_id): - url = "%s/v0/fs/files/?page_size=1000&metadata=requestId:%s" % (self.API, request_id) + url = "%s/v0/fs/files/?page_size=1000&metadata=igoRequestId:%s" % (self.API, request_id) data = self.run_url(url) file_ids = list() for result in data['results']: diff --git a/scripts/deregister_jobs_and_files.py b/scripts/deregister_jobs_and_files.py index 5c112fb..94fb79f 100755 --- a/scripts/deregister_jobs_and_files.py +++ b/scripts/deregister_jobs_and_files.py @@ -12,83 +12,19 @@ BEAGLE = beagle_api.AccessBeagleEndpoint() -def get_jobs(request_id): - """ - From request_id, get all related run data - """ - data = BEAGLE.get_etl_jobs_by_request(request_id)['results'] - - ids_fetch_samples = set() - - for job in data: - run_type = job['run'] - if run_type == 'beagle_etl.jobs.lims_etl_jobs.fetch_samples': - ids_fetch_samples.add(job['id']) - - return ids_fetch_samples - -def get_children_from_job(run_id): - """ - Retrieves children from main job - """ - data = BEAGLE.get_etl_job(run_id) - return data['children'] - -def get_run_type(run_id): - """ - We're assuming we get just one record here - """ - data = BEAGLE.get_etl_job(run_id) - if 'run' in data: - return data['run'] - return None - - -def get_file_id_from_run_id(run_id): - """ - Get the file id from the run id that imported it - """ - data = BEAGLE.get_etl_job(run_id) - file_path = data['args']['filepath'] - return BEAGLE.get_file_id_by_path(file_path) - if __name__ == "__main__": REQUEST_ID = sys.argv[1] OUTSCRIPT = sys.argv[2] - print("Retrieving root etl job IDs for %s" % REQUEST_ID) - FETCH_SAMPLE_JOBS = get_jobs(REQUEST_ID) - #print(FETCH_SAMPLE_JOBS) - + print("Searching beagle db for igoRequestId %s" % REQUEST_ID) files_to_deregister = set(BEAGLE.get_file_ids(REQUEST_ID)) - runs_to_deregister = set() - - number_of_fetch_jobs = len(FETCH_SAMPLE_JOBS) - print("Compiling child jobs from %i fetched jobs..." % number_of_fetch_jobs) - - for job_id in FETCH_SAMPLE_JOBS: - child_jobs = get_children_from_job(job_id) - for child_job in child_jobs: - run_type = get_run_type(child_job) - if run_type: - if run_type == "beagle_etl.jobs.lims_etl_jobs.create_pooled_normal": - pooled_normal_file_id = get_file_id_from_run_id(child_job) - files_to_deregister.add(pooled_normal_file_id) - runs_to_deregister.add(child_job) - - runs_to_deregister = list(runs_to_deregister.union(set(FETCH_SAMPLE_JOBS))) files_to_deregister = list(files_to_deregister) -# CHILD_JOBS = get_child_jobs(REQUEST_JOB_ID) -# print(CHILD_JOBS) num_files_to_deregister = len(files_to_deregister) - num_runs_to_deregister = len(runs_to_deregister) - print("Got %i files and %i runs to deregister. See beaglecli command output to execute." - % (num_files_to_deregister,num_runs_to_deregister)) - with open(OUTSCRIPT, 'w') as output_file: - for i in runs_to_deregister: - output_file.write("../beaglecli etl delete --job-id=%s\n" % i) + print("Found %i files to deregister; run \n\n\tbash %s\n\nto complete processing." + % (num_files_to_deregister,OUTSCRIPT)) + with open(OUTSCRIPT, 'w') as output_file: for i in files_to_deregister: if i: output_file.write("../beaglecli files delete --file-id=%s\n" % i) From f4aa679bc853b928a7cbbb0bad83dc7922a3c6af Mon Sep 17 00:00:00 2001 From: access pipeline-user Date: Tue, 8 Oct 2024 11:30:51 -0400 Subject: [PATCH 03/25] edge cases fix --- apps/access/__init__.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/apps/access/__init__.py b/apps/access/__init__.py index 4227e72..ffa792e 100755 --- a/apps/access/__init__.py +++ b/apps/access/__init__.py @@ -83,17 +83,21 @@ def get_operator_run(app_name, app_version=None, tags=None, config=None, show_al def operator_look_ahead(latest_operator_run, config, tags): threshold = float(.90) complete = float(1.0) - request_id = json.loads(tags)['igoRequestId'] + try: + request_id = json.loads(tags)['igoRequestId'] + except: + request_id = json.loads(tags)['requestId'] latest_operator_run_look_ahead = latest_operator_run.copy() latest_operator_run_look_ahead.pop("status") response_look_ahead = requests.get(urljoin(config['beagle_endpoint'], config['api']['operator-runs']), headers={'Authorization': 'Bearer %s' % config['token']}, params=latest_operator_run) - total_r = response_look_ahead.json()["results"][0]["num_total_runs"] - completed_r = response_look_ahead.json()["results"][0]["num_completed_runs"] - percent_c = completed_r / total_r - if (percent_c >= threshold) and (percent_c < complete): - print(f"Warning there is a more recent operator run for request {request_id} that is incomplete, but with {percent_c} of runs completed. This may be the operator run you need for analysis. Consult the request's operator run history and consider using the --all-runs flag if appropriate.") + if response_look_ahead.json()["results"]: + total_r = response_look_ahead.json()["results"][0]["num_total_runs"] + completed_r = response_look_ahead.json()["results"][0]["num_completed_runs"] + percent_c = completed_r / total_r + if (percent_c >= threshold) and (percent_c < complete): + print(f"Warning there is a more recent operator run for request {request_id} that is incomplete, but with {percent_c} of runs completed. This may be the operator run you need for analysis. Consult the request's operator run history and consider using the --all-runs flag if appropriate.") def open_request_file(request_ids_file): @@ -367,7 +371,6 @@ def is_run_manual(run, request_id): if Path(patient_dir).is_dir(): return True else: - shutil.rmtree(proj_dir) raise FileNotFoundError(f'The folder {patient_dir} does not exist. Bams for request {request_id} were manually imported to Voyager. Please link patients in the data folder before linking the project folder.') def find_files_by_sample(file_group, sample_id = None): From bd395387c88365b004dd7354a233438cdd93e33f Mon Sep 17 00:00:00 2001 From: access pipeline-user Date: Tue, 29 Oct 2024 11:14:27 -0400 Subject: [PATCH 04/25] add bai files for manual runs --- apps/access/__init__.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/apps/access/__init__.py b/apps/access/__init__.py index ffa792e..9a08446 100755 --- a/apps/access/__init__.py +++ b/apps/access/__init__.py @@ -184,6 +184,7 @@ def link_app(operator_run, directory, request_id, sample_id, arguments, config, except Exception as e: print("could not delete symlink: {} ".format(path / run["id"]), file=sys.stderr) else: + breakpoint() is_run_manual(run, request_id) try: os.symlink(run["output_directory"], path / run["id"]) @@ -310,14 +311,20 @@ def link_bams_by_patient_id(operator_run, directory, request_id, sample_id, argu if not runs: return + + add_bai=False + if len(runs) == 1: + manual_name="Run Access Legacy Fastq to Bam (file outputs) - Manual" + if manual_name in runs[0]["name"]: + add_bai=True files = [] # (sample_id, /path/to/file) - for run in runs: for file_group in get_files_by_run_id(run["id"], config): files = files + find_files_by_sample(file_group["value"], sample_id=sample_id) accepted_file_types = ['.bam', '.bai'] + for (sample_id, file) in files: file_path = get_file_path(file) _, file_ext = os.path.splitext(file_path) @@ -346,6 +353,10 @@ def link_bams_by_patient_id(operator_run, directory, request_id, sample_id, argu try: os.symlink(file_path, sample_version_path / file_name) print((sample_version_path / file_name).absolute(), file=sys.stdout) + if add_bai: + file_name_index = file_name.replace(".bam", ".bai") + os.symlink(file_path, sample_version_path / file_name_index) + print((sample_version_path / file_name_index).absolute(), file=sys.stdout) except Exception as e: print("Could not create symlink from '{}' to '{}'".format(sample_version_path / file_name, file_path), file=sys.stderr) continue From 4443d7005144831077463c7449623e71f40306a4 Mon Sep 17 00:00:00 2001 From: access pipeline-user Date: Thu, 5 Dec 2024 11:25:02 -0500 Subject: [PATCH 05/25] update --- apps/access/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/access/__init__.py b/apps/access/__init__.py index 9a08446..2de2118 100755 --- a/apps/access/__init__.py +++ b/apps/access/__init__.py @@ -184,7 +184,6 @@ def link_app(operator_run, directory, request_id, sample_id, arguments, config, except Exception as e: print("could not delete symlink: {} ".format(path / run["id"]), file=sys.stderr) else: - breakpoint() is_run_manual(run, request_id) try: os.symlink(run["output_directory"], path / run["id"]) From 44e33d16f68be04a0eda31e9cffa3b140ec0ade4 Mon Sep 17 00:00:00 2001 From: Eric Buehler <31450790+buehlere@users.noreply.github.com> Date: Thu, 5 Dec 2024 11:29:27 -0500 Subject: [PATCH 06/25] Update __init__.py --- apps/access/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/access/__init__.py b/apps/access/__init__.py index 2de2118..e21ef46 100755 --- a/apps/access/__init__.py +++ b/apps/access/__init__.py @@ -6,7 +6,6 @@ import shutil import requests import re -import pdb import json FLAG_TO_APPS = { "dmpmanifest": ("access_manifest", "manifest"), From 8345471c62f9ce16da3730fe45ed2d38394d1e00 Mon Sep 17 00:00:00 2001 From: access pipeline-user Date: Wed, 5 Feb 2025 14:41:07 -0500 Subject: [PATCH 07/25] access edge cases --- apps/access/__init__.py | 16 ++++++++++------ apps/cmoch/__init__.py | 3 ++- beaglecli | 9 +++++++++ 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/apps/access/__init__.py b/apps/access/__init__.py index 2de2118..65dba9b 100755 --- a/apps/access/__init__.py +++ b/apps/access/__init__.py @@ -16,6 +16,8 @@ "snv": ("access legacy SNV", "small_variants"), "bams": ("access legacy", "bam_qc"), "nucleo": ("access nucleo", "bam_qc"), + "qc": ("access v2 nucleo qc", "quality_control"), + "qc_agg": ("access v2 nucleo qc agg", "quality_control_aggregate"), } def access_commands(arguments, config): @@ -184,12 +186,14 @@ def link_app(operator_run, directory, request_id, sample_id, arguments, config, except Exception as e: print("could not delete symlink: {} ".format(path / run["id"]), file=sys.stderr) else: - is_run_manual(run, request_id) - try: - os.symlink(run["output_directory"], path / run["id"]) - print((path / run["id"]).absolute(), file=sys.stdout) - except Exception as e: - print("could not create symlink from '{}' to '{}'".format(run["output_directory"], path / run["id"]), file=sys.stderr) + if is_run_manual(run, request_id): + print("Manual Run no linking in Project Directory, please see existing directory for results.", file=sys.stdout) + else: + try: + os.symlink(run["output_directory"], path / run["id"]) + print((path / run["id"]).absolute(), file=sys.stdout) + except Exception as e: + print("could not create symlink from '{}' to '{}'".format(run["output_directory"], path / run["id"]), file=sys.stderr) try: os.unlink(path_without_version / "current") diff --git a/apps/cmoch/__init__.py b/apps/cmoch/__init__.py index 26b672f..9cbaa02 100755 --- a/apps/cmoch/__init__.py +++ b/apps/cmoch/__init__.py @@ -10,7 +10,8 @@ "dmpmanifest": ("cmo_manifest", "manifest"), "bams": ("cmo-ch nucleo", "bams"), "qc": ("CMO-CH QC", "quality_control"), - "qc_agg": ("CMO-CH QC Agg", "quality_control_aggregate") + "qc_agg": ("CMO-CH QC Agg", "quality_control_aggregate"), + "chipvar": ("CMO-CH Chip-Var","chipvar") } diff --git a/beaglecli b/beaglecli index 14c07e5..6155043 100755 --- a/beaglecli +++ b/beaglecli @@ -30,6 +30,7 @@ import csv from apps.access import access_commands from apps.cmoch import cmoch_commands +from apps.heme import heme_commands from apps.lims import lims_commands from apps.cleaning import clean_json_comands @@ -98,6 +99,8 @@ Usage: beaglecli access link-patient [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] beaglecli cmoch link [--single-dir] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] beaglecli cmoch link-patient [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] + beaglecli heme link [--single-dir] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] + beaglecli heme link-patient [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] beaglecli --version Options: @@ -247,6 +250,12 @@ def command(arguments, config): 'beagle_endpoint': BEAGLE_ENDPOINT, 'api': API })) + if arguments.get('heme'): + return (heme_commands(arguments, { + 'token': config.token, + 'beagle_endpoint': BEAGLE_ENDPOINT, + 'api': API + })) if arguments.get('lims'): return (lims_commands(arguments, config)) From cdb8646dae0b1d7c6563fd7bbeef2119be92b4cc Mon Sep 17 00:00:00 2001 From: access pipeline-user Date: Wed, 5 Feb 2025 14:46:06 -0500 Subject: [PATCH 08/25] access edge updates --- apps/access/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/access/__init__.py b/apps/access/__init__.py index 65dba9b..454a249 100755 --- a/apps/access/__init__.py +++ b/apps/access/__init__.py @@ -15,7 +15,7 @@ "sv": ("access legacy SV", "structural_variants"), "snv": ("access legacy SNV", "small_variants"), "bams": ("access legacy", "bam_qc"), - "nucleo": ("access nucleo", "bam_qc"), + "nucleo": ("access v2 nucleo", "bam_qc"), "qc": ("access v2 nucleo qc", "quality_control"), "qc_agg": ("access v2 nucleo qc agg", "quality_control_aggregate"), } From 0f303383a94c781b7e4771db52be4be0186031b7 Mon Sep 17 00:00:00 2001 From: access pipeline-user Date: Wed, 5 Feb 2025 15:28:10 -0500 Subject: [PATCH 09/25] more access edge update --- apps/access/__init__.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/apps/access/__init__.py b/apps/access/__init__.py index 29ec2be..e993be5 100755 --- a/apps/access/__init__.py +++ b/apps/access/__init__.py @@ -216,10 +216,21 @@ def link_single_sample_workflows_by_patient_id(operator_run, directory, request_ for run_meta in runs: run = get_run_by_id(run_meta["id"], config) + sample_key = None if operator_run['app_name'] == 'cmo_manifest': sample_path = path / request_id else: - sample_id = run["tags"]["cmoSampleIds"][0] if isinstance(run["tags"]["cmoSampleIds"], list) else run["tags"]["cmoSampleIds"] + if "cmoSampleIds" in run["tags"].keys(): + sample_key = "cmoSampleIds" + sample_id = run["tags"][sample_key][0] if isinstance(run["tags"][sample_key], list) else run["tags"][sample_key] + elif "cmoSampleName" in run["tags"].keys(): + sample_key = "cmoSampleName" + sample_id = run["tags"][sample_key][0] if isinstance(run["tags"][sample_key], list) else run["tags"][sample_key] + elif "cmoSampleId" in run["tags"].keys(): + sample_key = "cmoSampleId" + sample_id = run["tags"][sample_key] if isinstance(run["tags"][sample_key], list) else run["tags"][sample_key] + else: + raise LookupError(f'Operator run {run["id"]} is missing Sample Meta Data') a, b, _ = sample_id.split("-", 2) patient_id = "-".join([a, b]) sample_path = path / patient_id / sample_id From 6884f67aadd1cd97ecc2ac9308b7c92f2c110156 Mon Sep 17 00:00:00 2001 From: access pipeline-user Date: Tue, 8 Apr 2025 15:44:35 -0400 Subject: [PATCH 10/25] adding v2 apps --- apps/access/__init__.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/apps/access/__init__.py b/apps/access/__init__.py index e993be5..1613fd8 100755 --- a/apps/access/__init__.py +++ b/apps/access/__init__.py @@ -9,16 +9,21 @@ import json FLAG_TO_APPS = { "dmpmanifest": ("access_manifest", "manifest"), + "bams": ("access legacy", "bam_qc"), "msi": ("access legacy MSI", "microsatellite_instability"), "cnv": ("access legacy CNV", "copy_number_variants"), "sv": ("access legacy SV", "structural_variants"), "snv": ("access legacy SNV", "small_variants"), - "bams": ("access legacy", "bam_qc"), - "nucleo": ("access v2 nucleo", "bam_qc"), - "qc": ("access v2 nucleo qc", "quality_control"), - "qc_agg": ("access v2 nucleo qc agg", "quality_control_aggregate"), + "bams_xs2":("access v2 nucleo", "bam_qc"), + "qc_xs2":("access v2 nucleo qc", "quality_control"), + "qc_agg_xs2":("access v2 nucleo qc agg", "quality_control_aggregate"), + "snv_xs2":("access v2 legacy SNV", "small_variants"), + "msi_xs2":("access v2 legacy MSI", "microsatellite_instability"), + "cnv_xs2":("access v2 legacy CNV", "copy_number_variants"), + "sv_xs2":("access v2 legacy SV", "structural_variants") } + def access_commands(arguments, config): print('Running ACCESS') From 89bae25ccf3467adf8c198997722424228c6f969 Mon Sep 17 00:00:00 2001 From: access pipeline-user Date: Tue, 8 Apr 2025 16:01:28 -0400 Subject: [PATCH 11/25] access v2 linking update --- apps/access/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/access/__init__.py b/apps/access/__init__.py index 1613fd8..971878f 100755 --- a/apps/access/__init__.py +++ b/apps/access/__init__.py @@ -222,7 +222,7 @@ def link_single_sample_workflows_by_patient_id(operator_run, directory, request_ for run_meta in runs: run = get_run_by_id(run_meta["id"], config) sample_key = None - if operator_run['app_name'] == 'cmo_manifest': + if operator_run['app_name'] in ['access v2 nucleo qc agg', 'cmo_manifest']: sample_path = path / request_id else: if "cmoSampleIds" in run["tags"].keys(): From 749fe34626a29923c10cde6d18619ead74c0a904 Mon Sep 17 00:00:00 2001 From: access pipeline-user Date: Tue, 8 Apr 2025 16:42:58 -0400 Subject: [PATCH 12/25] v2 linking update --- apps/access/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/access/__init__.py b/apps/access/__init__.py index 971878f..b729050 100755 --- a/apps/access/__init__.py +++ b/apps/access/__init__.py @@ -233,7 +233,7 @@ def link_single_sample_workflows_by_patient_id(operator_run, directory, request_ sample_id = run["tags"][sample_key][0] if isinstance(run["tags"][sample_key], list) else run["tags"][sample_key] elif "cmoSampleId" in run["tags"].keys(): sample_key = "cmoSampleId" - sample_id = run["tags"][sample_key] if isinstance(run["tags"][sample_key], list) else run["tags"][sample_key] + sample_id = run["tags"][sample_key][0] if isinstance(run["tags"][sample_key], list) else run["tags"][sample_key] else: raise LookupError(f'Operator run {run["id"]} is missing Sample Meta Data') a, b, _ = sample_id.split("-", 2) From 4aad235339d652610d2de821095feddf043c5d8d Mon Sep 17 00:00:00 2001 From: access pipeline-user Date: Tue, 15 Apr 2025 11:38:26 -0400 Subject: [PATCH 13/25] access v2 update --- apps/access/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/access/__init__.py b/apps/access/__init__.py index b729050..42f30a0 100755 --- a/apps/access/__init__.py +++ b/apps/access/__init__.py @@ -48,7 +48,7 @@ def access_commands(arguments, config): (app_name, directory) = FLAG_TO_APPS[app] operator_run = get_operator_run(app_name, app_version, tags, config, show_all_runs) if operator_run: - if(app == "bams"): + if(app in ["bams", "bams_xs2"]): link_bams_by_patient_id(operator_run, "bams", request, sample_id, arguments, config, show_all_runs) else: link_single_sample_workflows_by_patient_id(operator_run, directory, request, sample_id, arguments, From b4d9b288da1d417fdfd040e6a2272cc9173a9788 Mon Sep 17 00:00:00 2001 From: access pipeline-user Date: Mon, 25 Aug 2025 13:00:34 -0400 Subject: [PATCH 14/25] heme udpate --- apps/heme/__init__.py | 397 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 397 insertions(+) create mode 100755 apps/heme/__init__.py diff --git a/apps/heme/__init__.py b/apps/heme/__init__.py new file mode 100755 index 0000000..1caef87 --- /dev/null +++ b/apps/heme/__init__.py @@ -0,0 +1,397 @@ +import os +import sys +from collections import defaultdict +from urllib.parse import urljoin +from pathlib import Path +import shutil +import requests + +FLAG_TO_APPS = { + "dmpmanifest": ("cmo_manifest", "manifest"), + "bams": ("heme nucleo", "bams"), + "qc": ("heme nucleo qc", "quality_control"), + "qc_agg": ("heme nucleo qc agg", "quality_control_aggregate"), + "chipvar": ("Heme Chip-Var","chipvar") +} + + +def heme_commands(arguments, config): + print('Running HEME') + + request_ids, sample_id, apps, show_all_runs = get_arguments(arguments) + for request in request_ids: + tags = '{"cmoSampleId":"%s"}' % sample_id if sample_id else '{"igoRequestId":"%s"}' % request + if arguments.get('link'): + for (app, app_version) in apps: + (app_name, directory) = FLAG_TO_APPS[app] + operator_run = get_operator_run( + app_name, app_version, tags, config, show_all_runs) + if operator_run: + if arguments.get('--single-dir'): + if app == "bams": + link_bams_to_single_dir( + operator_run, app, request, sample_id, arguments, config, show_all_runs) + else: + print("Apps other than bams not supported at this time") + else: + link_app(operator_run, directory, request, + sample_id, arguments, config, show_all_runs) + + if arguments.get('link-patient'): + for (app, app_version) in apps: + (app_name, directory) = FLAG_TO_APPS[app] + operator_run = get_operator_run( + app_name, app_version, tags, config, show_all_runs) + if operator_run: + if(app == "bams"): + link_bams_by_patient_id( + operator_run, "bams", request, sample_id, arguments, config, show_all_runs) + else: + link_single_sample_workflows_by_patient_id(operator_run, directory, request, sample_id, arguments, + config, show_all_runs) + + +def get_operator_run(app_name, app_version=None, tags=None, config=None, show_all_runs=False): + latest_operator_run = { + "tags": tags, + "status": "COMPLETED", + "page_size": 1, + "app_name": app_name + } + + if show_all_runs: + latest_operator_run.pop("status") + + if app_version: + latest_operator_run["app_version"] = app_version + + response = requests.get(urljoin(config['beagle_endpoint'], config['api']['operator-runs']), + headers={ + 'Authorization': 'Bearer %s' % config['token']}, + params=latest_operator_run) + + latest_runs = response.json()["results"] + if not latest_runs: + print("There are no completed operator runs for this request in the following app: %s:%s" % + (str(app_name), str(app_version)), file=sys.stderr) + return None + + return latest_runs[0] + +def open_request_file(request_ids_file): + try: + with open(request_ids_file,'r') as file: + request_ids = [] + for line in file: + # Remove leading and trailing whitespaces and split the line by comma + request = line.strip(',\n') + # Append the list of items to the result list + request_ids.append(request) + except FileNotFoundError: + raise FileNotFoundError('Cannot find filename') + return request_ids + +def get_arguments(arguments): + request_ids = arguments.get('--request-ids') + request_ids_file = arguments.get('--request-ids-file') + sample_id = arguments.get('--sample-id') + app_tags = arguments.get('--apps') + show_all_runs = arguments.get('--all-runs') or False + if request_ids_file: + request_ids = open_request_file(request_ids_file) + + apps = [] # [(tag, version), ...] + for app in app_tags: + r = app.split(":") + if len(r) > 1: + apps.append((r[0], r[1])) + else: + apps.append((r[0], None)) + + return request_ids, sample_id, apps, show_all_runs + + +def get_runs(operator_run_id, config, show_all_runs): + run_params = { + "operator_run": operator_run_id, + "page_size": 1000, + "status": "COMPLETED" + } + + if show_all_runs: + run_params.pop("status") + + response = requests.get(urljoin(config['beagle_endpoint'], config['api']['run']), + headers={'Authorization': 'Bearer %s' % config['token']}, params=run_params) + + return response.json()["results"] + + +def get_run_by_id(run_id, config): + response = requests.get(urljoin(config['beagle_endpoint'], config['api']['run'] + run_id), + headers={'Authorization': 'Bearer %s' % config['token']}) + + return response.json() + + +def get_files_by_run_id(run_id, config): + response = requests.get(urljoin(config['beagle_endpoint'], config['api']['run'] + run_id), + headers={'Authorization': 'Bearer %s' % config['token']}) + + return response.json()["outputs"] + + +def get_file_path(file): + return file["location"][7:] + + +def link_app(operator_run, directory, request_id, sample_id, arguments, config, show_all_runs): + + version = arguments.get("--dir-version") or operator_run["app_version"] + should_delete = arguments.get("--delete") or False + + path = Path("./") + path_without_version = path / ("Project_" + request_id) / directory + path = path_without_version / version + path.mkdir(parents=True, exist_ok=True, mode=0o755) + + runs = get_runs(operator_run["id"], config, show_all_runs) + if not runs: + return + + files = [] # (sample_id, /path/to/file) + for run_meta in runs: + run = get_run_by_id(run_meta["id"], config) + if should_delete: + try: + os.unlink(path / run["id"]) + print((path / run["id"]).absolute(), file=sys.stdout) + except Exception as e: + print("could not delete symlink: {} ".format( + path / run["id"]), file=sys.stderr) + else: + try: + os.symlink(run["output_directory"], path / run["id"]) + print((path / run["id"]).absolute(), file=sys.stdout) + except Exception as e: + print("could not create symlink from '{}' to '{}'".format( + run["output_directory"], path / run["id"]), file=sys.stderr) + + try: + os.unlink(path_without_version / "current") + except: + pass + + if not should_delete: + os.symlink(path.absolute(), path_without_version / "current") + return "Completed" + + +def link_single_sample_workflows_by_patient_id(operator_run, directory, request_id, sample_id, arguments, config, show_all_runs): + version = arguments.get("--dir-version") or operator_run["app_version"] + should_delete = arguments.get("--delete") or False + + path = Path("./") / directory + + runs = get_runs(operator_run["id"], config, show_all_runs) + if not runs: + return + + for run_meta in runs: + run = get_run_by_id(run_meta["id"], config) + + if operator_run['app_name'] in ['heme nucleo qc agg', 'cmo_manifest'] : + sample_path = path / request_id + else: + if "cmoSampleIds" in run["tags"].keys(): + cmo_id = "cmoSampleIds" + else: + cmo_id = "cmoSampleId" + sample_id = run["tags"][cmo_id][0] if isinstance( + run["tags"][cmo_id], list) else run["tags"][cmo_id] + a, b, _ = sample_id.split("-", 2) + patient_id = "-".join([a, b]) + sample_path = path / patient_id / sample_id + sample_path.mkdir(parents=True, exist_ok=True, mode=0o755) + sample_version_path = sample_path / version + if should_delete: + try: + os.unlink(sample_version_path) + print(sample_version_path.absolute(), file=sys.stdout) + except Exception as e: + print("could not delete symlink: {} ".format( + sample_version_path), file=sys.stderr) + else: + try: + os.symlink(run["output_directory"], sample_version_path) + print(sample_version_path.absolute(), file=sys.stdout) + except Exception as e: + print("could not create symlink from '{}' to '{}'".format( + sample_version_path.absolute(), run["output_directory"]), file=sys.stderr) + + try: + os.unlink(sample_path / "current") + except: + pass + + if not should_delete: + os.symlink(sample_version_path.absolute(), sample_path / "current") + + return "Completed" + + +def link_bams_to_single_dir(operator_run, directory, request_id, sample_id, arguments, config, show_all_runs): + version = arguments.get("--dir-version") or operator_run["app_version"] + + path = Path("./") / directory / ("Project_" + request_id) + + runs = get_runs(operator_run["id"], config, show_all_runs) + + if not runs: + return + + files = [] # (sample_id, /path/to/file) + + for run in runs: + for file_group in get_files_by_run_id(run["id"], config): + files = files + \ + find_files_by_sample(file_group["value"], sample_id=sample_id) + + accepted_file_types = ['.bam', '.bai'] + for (sample_id, file) in files: + file_path = get_file_path(file) + _, file_ext = os.path.splitext(file_path) + + if file_ext not in accepted_file_types: + continue + + file_name = os.path.basename(file_path) + + sample_id, _ = file_name.split("_", 1) + a, b, _ = sample_id.split("-", 2) + patient_id = "-".join([a, b]) + + sample_path = path + sample_version_path = sample_path / version + sample_version_path.mkdir(parents=True, exist_ok=True, mode=0o755) + + try: + os.symlink(file_path, sample_version_path / file_name) + print((sample_version_path / file_name).absolute(), file=sys.stdout) + except Exception as e: + print("Could not create symlink from '{}' to '{}'".format( + sample_version_path / file_name, file_path), file=sys.stderr) + continue + + try: + os.unlink(sample_path / "current") + except Exception as e: + pass + + try: + os.symlink(sample_version_path.absolute(), sample_path / "current") + except Exception as e: + pass + + return "Completed" + + +def link_bams_by_patient_id(operator_run, directory, request_id, sample_id, arguments, config, show_all_runs): + version = arguments.get("--dir-version") or operator_run["app_version"] + should_delete = arguments.get("--delete") or False + + path = Path("./") / directory + + runs = get_runs(operator_run["id"], config, show_all_runs) + + if not runs: + return + + files = [] # (sample_id, /path/to/file) + + for run in runs: + for file_group in get_files_by_run_id(run["id"], config): + files = files + \ + find_files_by_sample(file_group["value"], sample_id=sample_id) + + accepted_file_types = ['.bam', '.bai'] + for (sample_id, file) in files: + file_path = get_file_path(file) + _, file_ext = os.path.splitext(file_path) + + if file_ext not in accepted_file_types: + continue + + file_name = os.path.basename(file_path) + + sample_id, _ = file_name.split("_", 1) + a, b, _ = sample_id.split("-", 2) + patient_id = "-".join([a, b]) + + sample_path = path / patient_id / sample_id + sample_version_path = sample_path / version + sample_version_path.mkdir(parents=True, exist_ok=True, mode=0o755) + + if should_delete: + try: + shutil.rmtree(sample_version_path) + print(sample_version_path.absolute(), file=sys.stdout) + except Exception as e: + print("could not delete folder: {} ".format( + sample_version_path), file=sys.stderr) + else: + try: + os.symlink(file_path, sample_version_path / file_name) + print((sample_version_path / file_name).absolute(), file=sys.stdout) + except Exception as e: + print("Could not create symlink from '{}' to '{}'".format( + sample_version_path / file_name, file_path), file=sys.stderr) + continue + + try: + os.unlink(sample_path / "current") + except Exception as e: + pass + + if not should_delete: + try: + os.symlink(sample_version_path.absolute(), + sample_path / "current") + except Exception as e: + pass + + return "Completed" + + +def find_files_by_sample(file_group, sample_id=None): + def traverse(file_group): + files = [] + if not file_group: + return [] + if type(file_group) == list: + if len(file_group) > 1: + return traverse(file_group[0]) + traverse(file_group[1:]) + elif file_group: + return traverse(file_group[0]) + elif "file" in file_group: + try: + file_sample_id = file_group["sampleId"] + if "File" == file_group["file"]["class"] and (not sample_id or + file_sample_id == + sample_id): + return [(file_sample_id, file_group["file"])] + [(file_sample_id, + f) for f in file_group["file"]["secondaryFiles"]] + except Exception as e: + print(e, file=sys.stderr) + elif "class" in file_group: + if file_group["class"] == "Directory": + return find_files_by_sample(file_group["listing"], sample_id=file_group["basename"]) + # TODO pull patient id here + elif file_group["class"] == "File": + secondary_files = [(sample_id, f) for f in file_group["secondaryFiles"] + ] if "secondaryFiles" in file_group else [] + return [(sample_id, file_group)] + secondary_files + + return [] + + return traverse(file_group) From 14ae36dfc0a4f50e01d693470b83d4c192ac3cf9 Mon Sep 17 00:00:00 2001 From: buehlere Date: Tue, 16 Sep 2025 09:45:34 -0400 Subject: [PATCH 15/25] Update __init__.py --- apps/access/__init__.py | 103 +++++++++++++++++++++------------------- 1 file changed, 55 insertions(+), 48 deletions(-) diff --git a/apps/access/__init__.py b/apps/access/__init__.py index 42f30a0..40586c3 100755 --- a/apps/access/__init__.py +++ b/apps/access/__init__.py @@ -25,36 +25,38 @@ def access_commands(arguments, config): - print('Running ACCESS') - - request_ids, sample_id, apps, show_all_runs = get_arguments(arguments) + print('Running ACCESS') + request_ids, sample_id, apps, uncompleted_runs = get_arguments(arguments) for request in request_ids: tags = '{"cmoSampleIds":"%s"}' % sample_id if sample_id else '{"igoRequestId":"%s"}' % request if arguments.get('link'): for (app, app_version) in apps: (app_name, directory) = FLAG_TO_APPS[app] - operator_run = get_operator_run(app_name, app_version, tags, config, show_all_runs) - if operator_run: + operator_runs = get_operator_run(app_name, arguments, app_version, tags, config, uncompleted_runs) + if operator_runs: if arguments.get('--single-dir'): if app == "bams": - link_bams_to_single_dir(operator_run, app, request, sample_id, arguments, config, show_all_runs) + link_bams_to_single_dir(operator_runs, app, request, sample_id, arguments, config, uncompleted_runs) else: print("Apps other than bams not supported at this time") else: - link_app(operator_run, directory, request, sample_id, arguments, config, show_all_runs) + link_app(operator_runs, directory, request, sample_id, arguments, config, uncompleted_runs) if arguments.get('link-patient'): for (app, app_version) in apps: (app_name, directory) = FLAG_TO_APPS[app] - operator_run = get_operator_run(app_name, app_version, tags, config, show_all_runs) - if operator_run: + operator_runs = get_operator_run(app_name, app_version, tags, config, uncompleted_runs) + if operator_runs: if(app in ["bams", "bams_xs2"]): - link_bams_by_patient_id(operator_run, "bams", request, sample_id, arguments, config, show_all_runs) + link_bams_by_patient_id(operator_runs, "bams", request, sample_id, arguments, config, uncompleted_runs) else: - link_single_sample_workflows_by_patient_id(operator_run, directory, request, sample_id, arguments, - config, show_all_runs) + link_single_sample_workflows_by_patient_id(operator_runs, directory, request, sample_id, arguments, + config, uncompleted_runs) -def get_operator_run(app_name, app_version=None, tags=None, config=None, show_all_runs=False): +def get_operator_run(app_name, arguments, app_version=None, tags=None, config=None, uncompleted_runs=False): + should_delete = arguments.get("--delete") or False + all_runs = arguments.get("--all-runs") or False + latest_operator_run = { "tags": tags, "status": "COMPLETED", @@ -62,8 +64,8 @@ def get_operator_run(app_name, app_version=None, tags=None, config=None, show_al "app_name": app_name } - operator_look_ahead(latest_operator_run, config, tags) - if show_all_runs: + # operator_look_ahead(latest_operator_run, config, tags) + if uncompleted_runs: latest_operator_run.pop("status") if app_version: @@ -72,19 +74,22 @@ def get_operator_run(app_name, app_version=None, tags=None, config=None, show_al response = requests.get(urljoin(config['beagle_endpoint'], config['api']['operator-runs']), headers={'Authorization': 'Bearer %s' % config['token']}, params=latest_operator_run) - latest_runs = response.json()["results"] if not latest_runs: if "igoRequestId" in tags: new_tag = tags.replace("igoRequestId", "requestId") - return get_operator_run(app_name, app_version, tags=new_tag, config=config) - + return get_operator_run(app_name, arguments, app_version, tags=new_tag, config=config) else: print("There are no completed operator runs for this request in the following app: %s:%s" % (str(app_name), str(app_version)), file=sys.stderr) return None - - return latest_runs[0] + if all_runs and should_delete: + next_response = response.json()['next'] + while next_response: + response = requests.get(next_response, headers={'Authorization': 'Bearer %s' % config['token']}) + next_response = response.json()['next'] + latest_runs.append(response.json()["results"][0]) + return latest_runs def operator_look_ahead(latest_operator_run, config, tags): threshold = float(.90) @@ -124,7 +129,7 @@ def get_arguments(arguments): request_ids_file = arguments.get('--request-ids-file') sample_id = arguments.get('--sample-id') app_tags = arguments.get('--apps') - show_all_runs = arguments.get('--all-runs') or False + uncompleted_runs = arguments.get('--uncompleted-runs') or False if request_ids_file: request_ids = open_request_file(request_ids_file) apps = [] # [(tag, version), ...] @@ -135,21 +140,25 @@ def get_arguments(arguments): else: apps.append((r[0], None)) - return request_ids, sample_id, apps, show_all_runs + return request_ids, sample_id, apps, uncompleted_runs -def get_runs(operator_run_id, config, show_all_runs): - run_params = { - "operator_run": operator_run_id, - "page_size": 1000, - "status": "COMPLETED" - } - if show_all_runs: - run_params.pop("status") +def get_runs(operator_run_id, config, uncompleted_runs): + response_rslts = [] + for oprn in operator_run_id: + run_params = { + "operator_run": oprn['id'], + "page_size": 1000, + "status": "COMPLETED" + } - response = requests.get(urljoin(config['beagle_endpoint'], config['api']['run']), - headers={'Authorization': 'Bearer %s' % config['token']}, params=run_params) - return response.json()["results"] + if uncompleted_runs: + run_params.pop("status") + + response = requests.get(urljoin(config['beagle_endpoint'], config['api']['run']), + headers={'Authorization': 'Bearer %s' % config['token']}, params=run_params) + response_rslts.append(response.json()["results"][0]) + return response_rslts def get_run_by_id(run_id, config): response = requests.get(urljoin(config['beagle_endpoint'], config['api']['run'] + run_id), @@ -167,19 +176,17 @@ def get_files_by_run_id(run_id, config): def get_file_path(file): return file["location"][7:] -def link_app(operator_run, directory, request_id, sample_id, arguments, config, show_all_runs): - version = arguments.get("--dir-version") or operator_run["app_version"] +def link_app(operator_runs, directory, request_id, sample_id, arguments, config, uncompleted_runs): + version = arguments.get("--dir-version") should_delete = arguments.get("--delete") or False path = Path("./") path_without_version = path / ("Project_" + request_id) / directory path = path_without_version / version path.mkdir(parents=True, exist_ok=True, mode=0o755) - - runs = get_runs(operator_run["id"], config, show_all_runs) + runs = get_runs(operator_runs, config, uncompleted_runs) if not runs: return - files = [] # (sample_id, /path/to/file) for run_meta in runs: run = get_run_by_id(run_meta["id"], config) @@ -209,20 +216,20 @@ def link_app(operator_run, directory, request_id, sample_id, arguments, config, return "Completed" -def link_single_sample_workflows_by_patient_id(operator_run, directory, request_id, sample_id, arguments, config, show_all_runs): - version = arguments.get("--dir-version") or operator_run["app_version"] +def link_single_sample_workflows_by_patient_id(operator_runs, directory, request_id, sample_id, arguments, config, uncompleted_runs): + version = arguments.get("--dir-version") or operator_runs["app_version"] should_delete = arguments.get("--delete") or False path = Path("./") / directory - runs = get_runs(operator_run["id"], config, show_all_runs) + runs = get_runs(operator_runs, config, uncompleted_runs) if not runs: return for run_meta in runs: run = get_run_by_id(run_meta["id"], config) sample_key = None - if operator_run['app_name'] in ['access v2 nucleo qc agg', 'cmo_manifest']: + if operator_runs['app_name'] in ['access v2 nucleo qc agg', 'cmo_manifest']: sample_path = path / request_id else: if "cmoSampleIds" in run["tags"].keys(): @@ -265,12 +272,12 @@ def link_single_sample_workflows_by_patient_id(operator_run, directory, request_ return "Completed" -def link_bams_to_single_dir(operator_run, directory, request_id, sample_id, arguments, config, show_all_runs): - version = arguments.get("--dir-version") or operator_run["app_version"] +def link_bams_to_single_dir(operator_runs, directory, request_id, sample_id, arguments, config, uncompleted_runs): + version = arguments.get("--dir-version") path = Path("./") / directory / ("Project_" + request_id) - runs = get_runs(operator_run["id"], config, show_all_runs) + runs = get_runs(operator_runs, config, uncompleted_runs) if not runs: return @@ -319,13 +326,13 @@ def link_bams_to_single_dir(operator_run, directory, request_id, sample_id, argu return "Completed" -def link_bams_by_patient_id(operator_run, directory, request_id, sample_id, arguments, config, show_all_runs): - version = arguments.get("--dir-version") or operator_run["app_version"] +def link_bams_by_patient_id(operator_runs, directory, request_id, sample_id, arguments, config, uncompleted_runs): + version = arguments.get("--dir-version") should_delete = arguments.get("--delete") or False path = Path("./") / directory - runs = get_runs(operator_run["id"], config, show_all_runs) + runs = get_runs(operator_runs, config, uncompleted_runs) if not runs: return From 93ae91886e019424ac51654de3840b57b71f34ff Mon Sep 17 00:00:00 2001 From: buehlere Date: Tue, 16 Sep 2025 11:21:47 -0400 Subject: [PATCH 16/25] delete old sample names --- apps/access/__init__.py | 6 +++++- beaglecli | 4 ++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/apps/access/__init__.py b/apps/access/__init__.py index 40586c3..493b4a9 100755 --- a/apps/access/__init__.py +++ b/apps/access/__init__.py @@ -45,7 +45,7 @@ def access_commands(arguments, config): if arguments.get('link-patient'): for (app, app_version) in apps: (app_name, directory) = FLAG_TO_APPS[app] - operator_runs = get_operator_run(app_name, app_version, tags, config, uncompleted_runs) + operator_runs = get_operator_run(app_name, arguments, app_version, tags, config, uncompleted_runs) if operator_runs: if(app in ["bams", "bams_xs2"]): link_bams_by_patient_id(operator_runs, "bams", request, sample_id, arguments, config, uncompleted_runs) @@ -130,6 +130,10 @@ def get_arguments(arguments): sample_id = arguments.get('--sample-id') app_tags = arguments.get('--apps') uncompleted_runs = arguments.get('--uncompleted-runs') or False + all_runs = arguments.get('--all-runs') + delete = arguments.get('--delete') or False + if all_runs and delete is False: + raise ValueError("The --all-runs flag must be used with the --delete flag to avoid accidental linking of multiple runs.") if request_ids_file: request_ids = open_request_file(request_ids_file) apps = [] # [(tag, version), ...] diff --git a/beaglecli b/beaglecli index 6155043..3dc7805 100755 --- a/beaglecli +++ b/beaglecli @@ -95,8 +95,8 @@ Usage: beaglecli tempo-mpgen beaglecli tempo-mpgen override --normals= --tumors= beaglecli lims metadata [--request-id=] - beaglecli access link [--single-dir] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] - beaglecli access link-patient [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] + beaglecli access link [--single-dir] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] [--uncompleted-runs] + beaglecli access link-patient [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] [--uncompleted-runs] beaglecli cmoch link [--single-dir] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] beaglecli cmoch link-patient [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] beaglecli heme link [--single-dir] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] From d5d0a26aa9361c4bed1cfac4df3f14714cba260b Mon Sep 17 00:00:00 2001 From: EricWilliam Buehler Date: Tue, 16 Sep 2025 16:26:47 -0400 Subject: [PATCH 17/25] delete empty sample folders --- apps/access/__init__.py | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/apps/access/__init__.py b/apps/access/__init__.py index 493b4a9..b2d10e3 100755 --- a/apps/access/__init__.py +++ b/apps/access/__init__.py @@ -161,7 +161,7 @@ def get_runs(operator_run_id, config, uncompleted_runs): response = requests.get(urljoin(config['beagle_endpoint'], config['api']['run']), headers={'Authorization': 'Bearer %s' % config['token']}, params=run_params) - response_rslts.append(response.json()["results"][0]) + response_rslts = response_rslts + response.json()["results"] return response_rslts def get_run_by_id(run_id, config): @@ -198,6 +198,10 @@ def link_app(operator_runs, directory, request_id, sample_id, arguments, config, try: os.unlink(path / run["id"]) print((path / run["id"]).absolute(), file=sys.stdout) + if len(os.listdir(path)) == 0: + print("no remaining runs in version, deleting directory {} ".format(path), file=sys.stderr) + print(f"deleting {path}") + shutil.rmtree(path) except Exception as e: print("could not delete symlink: {} ".format(path / run["id"]), file=sys.stderr) else: @@ -225,7 +229,6 @@ def link_single_sample_workflows_by_patient_id(operator_runs, directory, request should_delete = arguments.get("--delete") or False path = Path("./") / directory - runs = get_runs(operator_runs, config, uncompleted_runs) if not runs: return @@ -233,7 +236,7 @@ def link_single_sample_workflows_by_patient_id(operator_runs, directory, request for run_meta in runs: run = get_run_by_id(run_meta["id"], config) sample_key = None - if operator_runs['app_name'] in ['access v2 nucleo qc agg', 'cmo_manifest']: + if operator_runs[0]['app_name'] in ['access v2 nucleo qc agg', 'cmo_manifest']: sample_path = path / request_id else: if "cmoSampleIds" in run["tags"].keys(): @@ -257,6 +260,10 @@ def link_single_sample_workflows_by_patient_id(operator_runs, directory, request try: os.unlink(sample_version_path) print(sample_version_path.absolute(), file=sys.stdout) + if len(os.listdir(sample_path)) == 0: + print("no remaining sample versions, deleting sample directory {} ".format(sample_path), file=sys.stderr) + print(f"deleting {sample_path}") + shutil.rmtree(sample_path) except Exception as e: print("could not delete symlink: {} ".format(sample_version_path), file=sys.stderr) else: @@ -376,6 +383,15 @@ def link_bams_by_patient_id(operator_runs, directory, request_id, sample_id, arg try: shutil.rmtree(sample_version_path) print(sample_version_path.absolute(), file=sys.stdout) + # delete if there are no other versions remaining + # if len(os.listdir(sample_path)) == 1: + # if os.path.islink(sample_path / os.listdir(sample_path)[0]) and not os.path.exists(sample_path / os.listdir(sample_path)[0]): + # print("no remaining sample versions, deleting sample directory {} ".format(sample_path), file=sys.stderr) + # shutil.rmtree(sample_path) + if len(os.listdir(sample_path)) == 0: + print("no remaining sample versions, deleting sample directory {} ".format(sample_path), file=sys.stderr) + print(f"deleting {sample_path}") + shutil.rmtree(sample_path) except Exception as e: print("could not delete folder: {} ".format(sample_version_path), file=sys.stderr) else: From 5586da6c378a55f3e854d094a691e2bc1fca246f Mon Sep 17 00:00:00 2001 From: buehlere Date: Mon, 6 Oct 2025 11:45:25 -0400 Subject: [PATCH 18/25] Update access_beagle_endpoint.py --- scripts/bin/access_beagle_endpoint.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/bin/access_beagle_endpoint.py b/scripts/bin/access_beagle_endpoint.py index 8bedc98..423eefd 100755 --- a/scripts/bin/access_beagle_endpoint.py +++ b/scripts/bin/access_beagle_endpoint.py @@ -24,7 +24,7 @@ def get_file_ids(self, request_id): data = self.run_url(url) file_ids = list() for result in data['results']: - file_ids.append(result['id']) + file_ids.append(result['id']) return file_ids def get_file_id_by_path(self, path): From 8c3d706e80505aa1dab839dbf6d18486e7172e79 Mon Sep 17 00:00:00 2001 From: buehlere Date: Tue, 7 Oct 2025 14:49:35 -0400 Subject: [PATCH 19/25] merging XS apps to one script --- apps/access/__init__.py | 126 +++++++++++++++++++++++++++------------- beaglecli | 16 ++--- 2 files changed, 93 insertions(+), 49 deletions(-) diff --git a/apps/access/__init__.py b/apps/access/__init__.py index b2d10e3..edf1191 100755 --- a/apps/access/__init__.py +++ b/apps/access/__init__.py @@ -20,50 +20,73 @@ "snv_xs2":("access v2 legacy SNV", "small_variants"), "msi_xs2":("access v2 legacy MSI", "microsatellite_instability"), "cnv_xs2":("access v2 legacy CNV", "copy_number_variants"), - "sv_xs2":("access v2 legacy SV", "structural_variants") + "sv_xs2":("access v2 legacy SV", "structural_variants"), + "dmpmanifest_ch": ("cmo_manifest", "manifest"), + "bams_ch": ("cmo-ch nucleo", "bams"), + "qc_ch": ("CMO-CH QC", "quality_control"), + "qc_agg_ch": ("CMO-CH QC Agg", "quality_control_aggregate"), + "chipvar_ch": ("CMO-CH Chip-Var","chipvar"), + "dmpmanifest_heme": ("cmo_manifest", "manifest"), + "bams_heme": ("heme nucleo", "bams"), + "qc_heme": ("heme nucleo qc", "quality_control"), + "qc_agg_heme": ("heme nucleo qc agg", "quality_control_aggregate"), + "chipvar_heme": ("Heme Chip-Var","chipvar") } def access_commands(arguments, config): - print('Running ACCESS') + print('Running Linking') request_ids, sample_id, apps, uncompleted_runs = get_arguments(arguments) - for request in request_ids: - tags = '{"cmoSampleIds":"%s"}' % sample_id if sample_id else '{"igoRequestId":"%s"}' % request - if arguments.get('link'): - for (app, app_version) in apps: - (app_name, directory) = FLAG_TO_APPS[app] - operator_runs = get_operator_run(app_name, arguments, app_version, tags, config, uncompleted_runs) - if operator_runs: - if arguments.get('--single-dir'): - if app == "bams": - link_bams_to_single_dir(operator_runs, app, request, sample_id, arguments, config, uncompleted_runs) - else: - print("Apps other than bams not supported at this time") + if arguments.get('--all-requests'): + tags = '' + request = None + link_runs(apps, arguments, None, request, sample_id, tags, config, uncompleted_runs, app_prefix="JUNO: ") + elif arguments.get('--request-ids'): + for request in request_ids: + tags = '{"cmoSampleIds":"%s"}' % sample_id if sample_id else '{"igoRequestId":"%s"}' % request + link_runs(apps, arguments, None, request, sample_id, tags, config, uncompleted_runs) + else: + raise ValueError("Must provide either --all-requests or --request-ids") + + +def link_runs(apps, arguments, operator_runs, request, sample_id, tags, config, uncompleted_runs, app_prefix=''): + if arguments.get('link'): + for (app, app_version) in apps: + (app_name, directory) = app_prefix + FLAG_TO_APPS[app] + operator_runs = get_operator_run(app_name, arguments, app_version, tags, config, uncompleted_runs) + if operator_runs: + if arguments.get('--single-dir'): + if app == "bams": + link_bams_to_single_dir(operator_runs, app, request, sample_id, arguments, config, uncompleted_runs) else: - link_app(operator_runs, directory, request, sample_id, arguments, config, uncompleted_runs) - - if arguments.get('link-patient'): - for (app, app_version) in apps: - (app_name, directory) = FLAG_TO_APPS[app] - operator_runs = get_operator_run(app_name, arguments, app_version, tags, config, uncompleted_runs) - if operator_runs: - if(app in ["bams", "bams_xs2"]): - link_bams_by_patient_id(operator_runs, "bams", request, sample_id, arguments, config, uncompleted_runs) - else: - link_single_sample_workflows_by_patient_id(operator_runs, directory, request, sample_id, arguments, - config, uncompleted_runs) + print("Apps other than bams not supported at this time") + else: + link_app(operator_runs, directory, request, sample_id, arguments, config, uncompleted_runs) + + if arguments.get('link-patient'): + for (app, app_version) in apps: + (app_name, directory) = FLAG_TO_APPS[app] + operator_runs = get_operator_run(app_name, arguments, app_version, tags, config, uncompleted_runs) + operator_runs = operator_runs[0:2] + if operator_runs: + if(app in ["bams", "bams_xs2"]): + link_bams_by_patient_id(operator_runs, "bams", request, sample_id, arguments, config, uncompleted_runs) + else: + link_single_sample_workflows_by_patient_id(operator_runs, directory, request, sample_id, arguments, + config, uncompleted_runs) def get_operator_run(app_name, arguments, app_version=None, tags=None, config=None, uncompleted_runs=False): should_delete = arguments.get("--delete") or False all_runs = arguments.get("--all-runs") or False - + all_requests = arguments.get("--all-requests") or False + latest_operator_run = { "tags": tags, "status": "COMPLETED", "page_size": 1, "app_name": app_name } - + seen_request_ids = set() # operator_look_ahead(latest_operator_run, config, tags) if uncompleted_runs: latest_operator_run.pop("status") @@ -71,6 +94,9 @@ def get_operator_run(app_name, arguments, app_version=None, tags=None, config=No if app_version: latest_operator_run["app_version"] = app_version + if all_requests: + latest_operator_run.pop("tags") + latest_operator_run.pop("page_size") response = requests.get(urljoin(config['beagle_endpoint'], config['api']['operator-runs']), headers={'Authorization': 'Bearer %s' % config['token']}, params=latest_operator_run) @@ -83,12 +109,12 @@ def get_operator_run(app_name, arguments, app_version=None, tags=None, config=No print("There are no completed operator runs for this request in the following app: %s:%s" % (str(app_name), str(app_version)), file=sys.stderr) return None - if all_runs and should_delete: + if (all_runs and should_delete) or all_requests: next_response = response.json()['next'] while next_response: response = requests.get(next_response, headers={'Authorization': 'Bearer %s' % config['token']}) next_response = response.json()['next'] - latest_runs.append(response.json()["results"][0]) + latest_runs.extend(response.json()["results"]) return latest_runs def operator_look_ahead(latest_operator_run, config, tags): @@ -147,8 +173,9 @@ def get_arguments(arguments): return request_ids, sample_id, apps, uncompleted_runs -def get_runs(operator_run_id, config, uncompleted_runs): +def get_runs(operator_run_id, config, uncompleted_runs, all_requests=False): response_rslts = [] + seen_request_ids = {} for oprn in operator_run_id: run_params = { "operator_run": oprn['id'], @@ -161,7 +188,15 @@ def get_runs(operator_run_id, config, uncompleted_runs): response = requests.get(urljoin(config['beagle_endpoint'], config['api']['run']), headers={'Authorization': 'Bearer %s' % config['token']}, params=run_params) + if all_requests: + request_id = response.json()["results"][0]['request_id'] + date = response.json()["results"][0]['created_date'] + if request_id in seen_request_ids and seen_request_ids.get(request_id) > date: + continue + else: + seen_request_ids[request_id] = date response_rslts = response_rslts + response.json()["results"] + return response_rslts def get_run_by_id(run_id, config): @@ -183,16 +218,19 @@ def get_file_path(file): def link_app(operator_runs, directory, request_id, sample_id, arguments, config, uncompleted_runs): version = arguments.get("--dir-version") should_delete = arguments.get("--delete") or False - - path = Path("./") - path_without_version = path / ("Project_" + request_id) / directory - path = path_without_version / version - path.mkdir(parents=True, exist_ok=True, mode=0o755) - runs = get_runs(operator_runs, config, uncompleted_runs) + all_requests = arguments.get("--all-requests") or False + runs = get_runs(operator_runs, config, uncompleted_runs, all_requests) + if not runs: return files = [] # (sample_id, /path/to/file) for run_meta in runs: + if all_requests: + request_id = run_meta["request_id"] + path = Path("./") + path_without_version = path / ("Project_" + request_id) / directory + path = path_without_version / version + path.mkdir(parents=True, exist_ok=True, mode=0o755) run = get_run_by_id(run_meta["id"], config) if should_delete: try: @@ -205,9 +243,9 @@ def link_app(operator_runs, directory, request_id, sample_id, arguments, config, except Exception as e: print("could not delete symlink: {} ".format(path / run["id"]), file=sys.stderr) else: - if is_run_manual(run, request_id): - print("Manual Run no linking in Project Directory, please see existing directory for results.", file=sys.stdout) - else: + # if is_run_manual(run, request_id): + # print("Manual Run no linking in Project Directory, please see existing directory for results.", file=sys.stdout) + # else: try: os.symlink(run["output_directory"], path / run["id"]) print((path / run["id"]).absolute(), file=sys.stdout) @@ -360,7 +398,7 @@ def link_bams_by_patient_id(operator_runs, directory, request_id, sample_id, arg files = files + find_files_by_sample(file_group["value"], sample_id=sample_id) accepted_file_types = ['.bam', '.bai'] - + seen_paths = set() for (sample_id, file) in files: file_path = get_file_path(file) _, file_ext = os.path.splitext(file_path) @@ -377,6 +415,11 @@ def link_bams_by_patient_id(operator_runs, directory, request_id, sample_id, arg sample_path = path / patient_id / sample_id sample_version_path = sample_path / version + + if (sample_version_path / file_name) in seen_paths: + print(f"Skipping duplicate file {file_path} for sample {sample_id}", file=sys.stderr) + continue + sample_version_path.mkdir(parents=True, exist_ok=True, mode=0o755) if should_delete: @@ -398,6 +441,7 @@ def link_bams_by_patient_id(operator_runs, directory, request_id, sample_id, arg try: os.symlink(file_path, sample_version_path / file_name) print((sample_version_path / file_name).absolute(), file=sys.stdout) + seen_paths.add(sample_version_path / file_name) if add_bai: file_name_index = file_name.replace(".bam", ".bai") os.symlink(file_path, sample_version_path / file_name_index) diff --git a/beaglecli b/beaglecli index 3dc7805..e3c6ad8 100755 --- a/beaglecli +++ b/beaglecli @@ -95,12 +95,12 @@ Usage: beaglecli tempo-mpgen beaglecli tempo-mpgen override --normals= --tumors= beaglecli lims metadata [--request-id=] - beaglecli access link [--single-dir] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] [--uncompleted-runs] - beaglecli access link-patient [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] [--uncompleted-runs] - beaglecli cmoch link [--single-dir] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] - beaglecli cmoch link-patient [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] - beaglecli heme link [--single-dir] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] - beaglecli heme link-patient [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] + beaglecli access link [--single-dir] [--all-requests] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] [--uncompleted-runs] + beaglecli access link-patient [--all-requests] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] [--uncompleted-runs] + beaglecli cmoch link [--single-dir] [--all-requests] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] + beaglecli cmoch link-patient [--all-requests] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] + beaglecli heme link [--single-dir] [--all-requests] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] + beaglecli heme link-patient [--all-runs] [--all-requests] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] beaglecli --version Options: @@ -245,13 +245,13 @@ def command(arguments, config): 'api': API })) if arguments.get('cmoch'): - return (cmoch_commands(arguments, { + return (access_commands(arguments, { 'token': config.token, 'beagle_endpoint': BEAGLE_ENDPOINT, 'api': API })) if arguments.get('heme'): - return (heme_commands(arguments, { + return (access_commands(arguments, { 'token': config.token, 'beagle_endpoint': BEAGLE_ENDPOINT, 'api': API From 53f26bb0f0540dcaac5686e6af027c11019b7ee2 Mon Sep 17 00:00:00 2001 From: buehlere Date: Wed, 15 Oct 2025 10:26:33 -0400 Subject: [PATCH 20/25] Update __init__.py --- apps/access/__init__.py | 66 ++++++++++++++++++++++++++--------------- 1 file changed, 42 insertions(+), 24 deletions(-) diff --git a/apps/access/__init__.py b/apps/access/__init__.py index edf1191..1bbd0cd 100755 --- a/apps/access/__init__.py +++ b/apps/access/__init__.py @@ -36,23 +36,24 @@ def access_commands(arguments, config): print('Running Linking') - request_ids, sample_id, apps, uncompleted_runs = get_arguments(arguments) + request_ids, sample_id, apps, uncompleted_runs, app_prefix = get_arguments(arguments) if arguments.get('--all-requests'): - tags = '' request = None - link_runs(apps, arguments, None, request, sample_id, tags, config, uncompleted_runs, app_prefix="JUNO: ") + link_runs(apps, arguments, request, sample_id, config, uncompleted_runs, app_prefix) elif arguments.get('--request-ids'): for request in request_ids: - tags = '{"cmoSampleIds":"%s"}' % sample_id if sample_id else '{"igoRequestId":"%s"}' % request - link_runs(apps, arguments, None, request, sample_id, tags, config, uncompleted_runs) + link_runs(apps, arguments, request, sample_id, config, uncompleted_runs, app_prefix) else: raise ValueError("Must provide either --all-requests or --request-ids") -def link_runs(apps, arguments, operator_runs, request, sample_id, tags, config, uncompleted_runs, app_prefix=''): +def link_runs(apps, arguments, request, sample_id, config, uncompleted_runs, app_prefix=''): if arguments.get('link'): for (app, app_version) in apps: - (app_name, directory) = app_prefix + FLAG_TO_APPS[app] + (app_name, directory) = FLAG_TO_APPS[app] + app_name = f'{app_prefix} {app_name}' + all_requests = arguments.get("--all-requests") or False + tags = build_params(sample_id, request, app_name,all_requests, use_json_tags=False) operator_runs = get_operator_run(app_name, arguments, app_version, tags, config, uncompleted_runs) if operator_runs: if arguments.get('--single-dir'): @@ -66,8 +67,12 @@ def link_runs(apps, arguments, operator_runs, request, sample_id, tags, config, if arguments.get('link-patient'): for (app, app_version) in apps: (app_name, directory) = FLAG_TO_APPS[app] + if app_prefix: + app_name = f'{app_prefix} {app_name}' + all_requests = arguments.get("--all-requests") or False + tags = build_params(sample_id, request, app_name,all_requests, use_json_tags=False) operator_runs = get_operator_run(app_name, arguments, app_version, tags, config, uncompleted_runs) - operator_runs = operator_runs[0:2] + # operator_runs = operator_runs[0:2] if operator_runs: if(app in ["bams", "bams_xs2"]): link_bams_by_patient_id(operator_runs, "bams", request, sample_id, arguments, config, uncompleted_runs) @@ -75,18 +80,36 @@ def link_runs(apps, arguments, operator_runs, request, sample_id, tags, config, link_single_sample_workflows_by_patient_id(operator_runs, directory, request, sample_id, arguments, config, uncompleted_runs) +def build_params(sample_id, request, app, all_requests, use_json_tags): + params = { + "status": "COMPLETED", + "page_size": "1", + "app_name": app, + } + + if all_requests: + return params + + tags = {} + if sample_id: + tags["cmoSampleIds"] = sample_id + if request: + tags["igoRequestId"] = request + + if tags: + params["tags"] = json.dumps(tags) + + return params + + + + return params def get_operator_run(app_name, arguments, app_version=None, tags=None, config=None, uncompleted_runs=False): should_delete = arguments.get("--delete") or False all_runs = arguments.get("--all-runs") or False all_requests = arguments.get("--all-requests") or False - latest_operator_run = { - "tags": tags, - "status": "COMPLETED", - "page_size": 1, - "app_name": app_name - } - seen_request_ids = set() + latest_operator_run = tags # operator_look_ahead(latest_operator_run, config, tags) if uncompleted_runs: latest_operator_run.pop("status") @@ -94,13 +117,11 @@ def get_operator_run(app_name, arguments, app_version=None, tags=None, config=No if app_version: latest_operator_run["app_version"] = app_version - if all_requests: - latest_operator_run.pop("tags") - latest_operator_run.pop("page_size") + response = requests.get(urljoin(config['beagle_endpoint'], config['api']['operator-runs']), headers={'Authorization': 'Bearer %s' % config['token']}, params=latest_operator_run) - latest_runs = response.json()["results"] + latest_runs = response.json()["results"] if not latest_runs: if "igoRequestId" in tags: new_tag = tags.replace("igoRequestId", "requestId") @@ -158,6 +179,7 @@ def get_arguments(arguments): uncompleted_runs = arguments.get('--uncompleted-runs') or False all_runs = arguments.get('--all-runs') delete = arguments.get('--delete') or False + app_prefix = arguments.get('--app-prefix') or None if all_runs and delete is False: raise ValueError("The --all-runs flag must be used with the --delete flag to avoid accidental linking of multiple runs.") if request_ids_file: @@ -170,7 +192,7 @@ def get_arguments(arguments): else: apps.append((r[0], None)) - return request_ids, sample_id, apps, uncompleted_runs + return request_ids, sample_id, apps, uncompleted_runs, app_prefix def get_runs(operator_run_id, config, uncompleted_runs, all_requests=False): @@ -209,7 +231,6 @@ def get_run_by_id(run_id, config): def get_files_by_run_id(run_id, config): response = requests.get(urljoin(config['beagle_endpoint'], config['api']['run'] + run_id), headers={'Authorization': 'Bearer %s' % config['token']}) - return response.json()["outputs"] def get_file_path(file): @@ -336,7 +357,6 @@ def link_bams_to_single_dir(operator_runs, directory, request_id, sample_id, arg for run in runs: for file_group in get_files_by_run_id(run["id"], config): files = files + find_files_by_sample(file_group["value"], sample_id=sample_id) - accepted_file_types = ['.bam', '.bai'] for (sample_id, file) in files: file_path = get_file_path(file) @@ -380,7 +400,6 @@ def link_bams_by_patient_id(operator_runs, directory, request_id, sample_id, arg should_delete = arguments.get("--delete") or False path = Path("./") / directory - runs = get_runs(operator_runs, config, uncompleted_runs) if not runs: @@ -396,7 +415,6 @@ def link_bams_by_patient_id(operator_runs, directory, request_id, sample_id, arg for run in runs: for file_group in get_files_by_run_id(run["id"], config): files = files + find_files_by_sample(file_group["value"], sample_id=sample_id) - accepted_file_types = ['.bam', '.bai'] seen_paths = set() for (sample_id, file) in files: From 91165b37075b0cb3a84e9704ce15f6f33933005c Mon Sep 17 00:00:00 2001 From: buehlere Date: Wed, 15 Oct 2025 10:46:02 -0400 Subject: [PATCH 21/25] Update beaglecli --- beaglecli | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/beaglecli b/beaglecli index e3c6ad8..3675a20 100755 --- a/beaglecli +++ b/beaglecli @@ -35,7 +35,7 @@ from apps.lims import lims_commands from apps.cleaning import clean_json_comands -BEAGLE_ENDPOINT = os.environ.get('BEAGLE_ENDPOINT', 'http://voyager:5007') +BEAGLE_ENDPOINT = os.environ.get('BEAGLE_ENDPOINT', 'http://isvvoyagerstage01.mskcc.org:4007') BEAGLE_USER = os.environ.get('BEAGLE_USER', '') BEAGLE_PW = os.environ.get('BEAGLE_PW', '') @@ -95,12 +95,12 @@ Usage: beaglecli tempo-mpgen beaglecli tempo-mpgen override --normals= --tumors= beaglecli lims metadata [--request-id=] - beaglecli access link [--single-dir] [--all-requests] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] [--uncompleted-runs] - beaglecli access link-patient [--all-requests] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] [--uncompleted-runs] - beaglecli cmoch link [--single-dir] [--all-requests] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] - beaglecli cmoch link-patient [--all-requests] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] - beaglecli heme link [--single-dir] [--all-requests] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] - beaglecli heme link-patient [--all-runs] [--all-requests] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] + beaglecli access link [--single-dir] [--app-prefix=] [--single-dir] [--all-requests] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] [--uncompleted-runs] + beaglecli access link-patient [--all-requests] [--app-prefix=] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] [--uncompleted-runs] + beaglecli cmoch link [--single-dir] [--all-requests] [--app-prefix=] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] + beaglecli cmoch link-patient [--all-requests] [--all-runs] [--app-prefix=] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] + beaglecli heme link [--single-dir] [--all-requests] [--app-prefix=] [--all-runs] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] + beaglecli heme link-patient [--all-runs] [--all-requests] [--app-prefix=] [--request-ids=]... [--request-ids-file=] [--sample-id=] [--dir-version=] [--apps=]... [--delete] beaglecli --version Options: From 107384eb1b3efb26ee376faa9373fab2ac7a5d39 Mon Sep 17 00:00:00 2001 From: buehlere Date: Tue, 21 Oct 2025 11:59:36 -0400 Subject: [PATCH 22/25] Update __init__.py --- apps/access/__init__.py | 33 ++++++++++++++++++++++++++------- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/apps/access/__init__.py b/apps/access/__init__.py index 1bbd0cd..021ac2c 100755 --- a/apps/access/__init__.py +++ b/apps/access/__init__.py @@ -268,6 +268,10 @@ def link_app(operator_runs, directory, request_id, sample_id, arguments, config, # print("Manual Run no linking in Project Directory, please see existing directory for results.", file=sys.stdout) # else: try: + if os.path.islink(path / run["id"]) and not os.path.exists(path / run["id"]): + run_path = path / run["id"] + print(f"Removing broken symlink for: {run_path}") + os.remove(path / run["id"]) os.symlink(run["output_directory"], path / run["id"]) print((path / run["id"]).absolute(), file=sys.stdout) except Exception as e: @@ -327,8 +331,12 @@ def link_single_sample_workflows_by_patient_id(operator_runs, directory, request print("could not delete symlink: {} ".format(sample_version_path), file=sys.stderr) else: try: + if os.path.islink(sample_version_path) and not os.path.exists(sample_version_path): + print(f"Removing broken symlink for: {sample_version_path}") + os.remove(sample_version_path) os.symlink(run["output_directory"], sample_version_path) print(sample_version_path.absolute(), file=sys.stdout) + except Exception as e: print("could not create symlink from '{}' to '{}'".format(sample_version_path.absolute(), run["output_directory"]), file=sys.stderr) @@ -375,8 +383,12 @@ def link_bams_to_single_dir(operator_runs, directory, request_id, sample_id, arg sample_path = path sample_version_path = sample_path / version sample_version_path.mkdir(parents=True, exist_ok=True, mode=0o755) + full_path = sample_version_path / file_name try: + if os.path.islink(full_path) and not os.path.exists(full_path): + print(f"Removing broken symlink for: {full_path}") + os.remove(full_path) os.symlink(file_path, sample_version_path / file_name) print((sample_version_path / file_name).absolute(), file=sys.stdout) except Exception as e: @@ -433,6 +445,9 @@ def link_bams_by_patient_id(operator_runs, directory, request_id, sample_id, arg sample_path = path / patient_id / sample_id sample_version_path = sample_path / version + full_path = sample_version_path / file_name + file_name_index = file_name.replace(".bam", ".bai") + full_path_index = sample_version_path / file_name_index if (sample_version_path / file_name) in seen_paths: print(f"Skipping duplicate file {file_path} for sample {sample_id}", file=sys.stderr) @@ -457,15 +472,19 @@ def link_bams_by_patient_id(operator_runs, directory, request_id, sample_id, arg print("could not delete folder: {} ".format(sample_version_path), file=sys.stderr) else: try: - os.symlink(file_path, sample_version_path / file_name) - print((sample_version_path / file_name).absolute(), file=sys.stdout) - seen_paths.add(sample_version_path / file_name) + if os.path.islink(full_path) and not os.path.exists(full_path): + print(f"Removing broken symlink for: {full_path}") + os.remove(full_path) + if add_bai: + os.remove(full_path_index) + os.symlink(file_path, full_path) + print((full_path).absolute(), file=sys.stdout) + seen_paths.add(full_path) if add_bai: - file_name_index = file_name.replace(".bam", ".bai") - os.symlink(file_path, sample_version_path / file_name_index) - print((sample_version_path / file_name_index).absolute(), file=sys.stdout) + os.symlink(file_path, full_path_index) + print((full_path_index).absolute(), file=sys.stdout) except Exception as e: - print("Could not create symlink from '{}' to '{}'".format(sample_version_path / file_name, file_path), file=sys.stderr) + print("Could not create symlink from '{}' to '{}'".format(full_path, file_path), file=sys.stderr) continue try: From e924906b26a42796caace7691dc8d0f80beef19b Mon Sep 17 00:00:00 2001 From: buehlere Date: Tue, 21 Oct 2025 13:42:00 -0400 Subject: [PATCH 23/25] Update beaglecli --- beaglecli | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beaglecli b/beaglecli index 3675a20..03ee196 100755 --- a/beaglecli +++ b/beaglecli @@ -35,7 +35,7 @@ from apps.lims import lims_commands from apps.cleaning import clean_json_comands -BEAGLE_ENDPOINT = os.environ.get('BEAGLE_ENDPOINT', 'http://isvvoyagerstage01.mskcc.org:4007') +BEAGLE_ENDPOINT = os.environ.get('BEAGLE_ENDPOINT', 'http://isvvoyagerprod01.mskcc.org:5007') BEAGLE_USER = os.environ.get('BEAGLE_USER', '') BEAGLE_PW = os.environ.get('BEAGLE_PW', '') From a41bc4e327141aed4c20392f685de2b21619f038 Mon Sep 17 00:00:00 2001 From: SVC_core005_bot03 Date: Tue, 28 Oct 2025 10:03:48 -0400 Subject: [PATCH 24/25] iris prod changes --- apps/access/__init__.py | 54 ++++++++++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/apps/access/__init__.py b/apps/access/__init__.py index 021ac2c..81c614a 100755 --- a/apps/access/__init__.py +++ b/apps/access/__init__.py @@ -32,15 +32,16 @@ "qc_agg_heme": ("heme nucleo qc agg", "quality_control_aggregate"), "chipvar_heme": ("Heme Chip-Var","chipvar") } - +REQUEST_PATIENT_APPS = ['access v2 nucleo qc agg', 'cmo_manifest', "CMO-CH QC Agg", "heme nucleo qc agg",'JUNO PIPELINE: access v2 nucleo qc agg', 'JUNO PIPELINE: cmo_manifest', "JUNO PIPELINE: CMO-CH QC Agg", "JUNO PIPELINE: heme nucleo qc agg"] def access_commands(arguments, config): print('Running Linking') request_ids, sample_id, apps, uncompleted_runs, app_prefix = get_arguments(arguments) - if arguments.get('--all-requests'): + if arguments.get('--all-requests') and not arguments.get('--request-ids'): request = None + # request = request_ids[0] link_runs(apps, arguments, request, sample_id, config, uncompleted_runs, app_prefix) - elif arguments.get('--request-ids'): + elif arguments.get('--request-ids') and not arguments.get('--all-requests'): for request in request_ids: link_runs(apps, arguments, request, sample_id, config, uncompleted_runs, app_prefix) else: @@ -74,7 +75,7 @@ def link_runs(apps, arguments, request, sample_id, config, uncompleted_runs, app operator_runs = get_operator_run(app_name, arguments, app_version, tags, config, uncompleted_runs) # operator_runs = operator_runs[0:2] if operator_runs: - if(app in ["bams", "bams_xs2"]): + if(app in ["bams", "bams_xs2", "bams_ch", "bams_heme"]): link_bams_by_patient_id(operator_runs, "bams", request, sample_id, arguments, config, uncompleted_runs) else: link_single_sample_workflows_by_patient_id(operator_runs, directory, request, sample_id, arguments, @@ -108,7 +109,7 @@ def get_operator_run(app_name, arguments, app_version=None, tags=None, config=No should_delete = arguments.get("--delete") or False all_runs = arguments.get("--all-runs") or False all_requests = arguments.get("--all-requests") or False - + latest_operator_run = tags # operator_look_ahead(latest_operator_run, config, tags) if uncompleted_runs: @@ -121,7 +122,10 @@ def get_operator_run(app_name, arguments, app_version=None, tags=None, config=No response = requests.get(urljoin(config['beagle_endpoint'], config['api']['operator-runs']), headers={'Authorization': 'Bearer %s' % config['token']}, params=latest_operator_run) - latest_runs = response.json()["results"] + try: + latest_runs = response.json()["results"] + except: + breakpoint() if not latest_runs: if "igoRequestId" in tags: new_tag = tags.replace("igoRequestId", "requestId") @@ -198,6 +202,7 @@ def get_arguments(arguments): def get_runs(operator_run_id, config, uncompleted_runs, all_requests=False): response_rslts = [] seen_request_ids = {} + request_skips = set(["11674_L", "11674_P", "11674_K", "11674_Q_custom"]) for oprn in operator_run_id: run_params = { "operator_run": oprn['id'], @@ -210,10 +215,11 @@ def get_runs(operator_run_id, config, uncompleted_runs, all_requests=False): response = requests.get(urljoin(config['beagle_endpoint'], config['api']['run']), headers={'Authorization': 'Bearer %s' % config['token']}, params=run_params) + if all_requests: request_id = response.json()["results"][0]['request_id'] date = response.json()["results"][0]['created_date'] - if request_id in seen_request_ids and seen_request_ids.get(request_id) > date: + if (request_id in seen_request_ids and seen_request_ids.get(request_id) > date) or (request_id in request_skips): continue else: seen_request_ids[request_id] = date @@ -249,8 +255,12 @@ def link_app(operator_runs, directory, request_id, sample_id, arguments, config, if all_requests: request_id = run_meta["request_id"] path = Path("./") - path_without_version = path / ("Project_" + request_id) / directory - path = path_without_version / version + if request_id is None: + pass + else: + path_without_version = path / ("Project_" + request_id) / directory + path = path_without_version / version + path.mkdir(parents=True, exist_ok=True, mode=0o755) run = get_run_by_id(run_meta["id"], config) if should_delete: @@ -275,6 +285,7 @@ def link_app(operator_runs, directory, request_id, sample_id, arguments, config, os.symlink(run["output_directory"], path / run["id"]) print((path / run["id"]).absolute(), file=sys.stdout) except Exception as e: + breakpoint() print("could not create symlink from '{}' to '{}'".format(run["output_directory"], path / run["id"]), file=sys.stderr) try: @@ -290,17 +301,20 @@ def link_app(operator_runs, directory, request_id, sample_id, arguments, config, def link_single_sample_workflows_by_patient_id(operator_runs, directory, request_id, sample_id, arguments, config, uncompleted_runs): version = arguments.get("--dir-version") or operator_runs["app_version"] should_delete = arguments.get("--delete") or False - + all_requests = arguments.get("--all-requests") or False path = Path("./") / directory - runs = get_runs(operator_runs, config, uncompleted_runs) + runs = get_runs(operator_runs, config, uncompleted_runs, all_requests) + seen_request_ids = set() if not runs: return for run_meta in runs: run = get_run_by_id(run_meta["id"], config) sample_key = None - if operator_runs[0]['app_name'] in ['access v2 nucleo qc agg', 'cmo_manifest']: - sample_path = path / request_id + + if (operator_runs[0]['app_name'] in REQUEST_PATIENT_APPS) and (run_meta["request_id"] not in seen_request_ids): + sample_path = path / run_meta["request_id"] + seen_request_ids.add(sample_path) else: if "cmoSampleIds" in run["tags"].keys(): sample_key = "cmoSampleIds" @@ -338,6 +352,7 @@ def link_single_sample_workflows_by_patient_id(operator_runs, directory, request print(sample_version_path.absolute(), file=sys.stdout) except Exception as e: + breakpoint() print("could not create symlink from '{}' to '{}'".format(sample_version_path.absolute(), run["output_directory"]), file=sys.stderr) try: @@ -352,16 +367,15 @@ def link_single_sample_workflows_by_patient_id(operator_runs, directory, request def link_bams_to_single_dir(operator_runs, directory, request_id, sample_id, arguments, config, uncompleted_runs): version = arguments.get("--dir-version") - + all_requests = arguments.get("--all-requests") or False path = Path("./") / directory / ("Project_" + request_id) - runs = get_runs(operator_runs, config, uncompleted_runs) + runs = get_runs(operator_runs, config, uncompleted_runs, all_requests) if not runs: return files = [] # (sample_id, /path/to/file) - for run in runs: for file_group in get_files_by_run_id(run["id"], config): files = files + find_files_by_sample(file_group["value"], sample_id=sample_id) @@ -392,6 +406,7 @@ def link_bams_to_single_dir(operator_runs, directory, request_id, sample_id, arg os.symlink(file_path, sample_version_path / file_name) print((sample_version_path / file_name).absolute(), file=sys.stdout) except Exception as e: + breakpoint() print("Could not create symlink from '{}' to '{}'".format(sample_version_path / file_name, file_path), file=sys.stderr) continue @@ -410,9 +425,10 @@ def link_bams_to_single_dir(operator_runs, directory, request_id, sample_id, arg def link_bams_by_patient_id(operator_runs, directory, request_id, sample_id, arguments, config, uncompleted_runs): version = arguments.get("--dir-version") should_delete = arguments.get("--delete") or False + all_requests = arguments.get("--all-requests") or False path = Path("./") / directory - runs = get_runs(operator_runs, config, uncompleted_runs) + runs = get_runs(operator_runs, config, uncompleted_runs, all_requests) if not runs: return @@ -450,6 +466,8 @@ def link_bams_by_patient_id(operator_runs, directory, request_id, sample_id, arg full_path_index = sample_version_path / file_name_index if (sample_version_path / file_name) in seen_paths: + breakpoint() + # 11674_L and 11674_K are old runs with bad sample names print(f"Skipping duplicate file {file_path} for sample {sample_id}", file=sys.stderr) continue @@ -473,6 +491,7 @@ def link_bams_by_patient_id(operator_runs, directory, request_id, sample_id, arg else: try: if os.path.islink(full_path) and not os.path.exists(full_path): + breakpoint() print(f"Removing broken symlink for: {full_path}") os.remove(full_path) if add_bai: @@ -484,6 +503,7 @@ def link_bams_by_patient_id(operator_runs, directory, request_id, sample_id, arg os.symlink(file_path, full_path_index) print((full_path_index).absolute(), file=sys.stdout) except Exception as e: + breakpoint() print("Could not create symlink from '{}' to '{}'".format(full_path, file_path), file=sys.stderr) continue From 41c09fc8d0e8973e8df48e4144e435e2d8d3d480 Mon Sep 17 00:00:00 2001 From: SVC_core005_bot02 Date: Wed, 29 Oct 2025 09:42:54 -0400 Subject: [PATCH 25/25] fixing current folder link for all requests --- apps/access/__init__.py | 46 +++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 25 deletions(-) diff --git a/apps/access/__init__.py b/apps/access/__init__.py index 81c614a..269bd28 100755 --- a/apps/access/__init__.py +++ b/apps/access/__init__.py @@ -254,6 +254,12 @@ def link_app(operator_runs, directory, request_id, sample_id, arguments, config, for run_meta in runs: if all_requests: request_id = run_meta["request_id"] + if request_id_prev != request_id: + try: + os.symlink(path.absolute(), path_without_version / "current") + except: + print(f'Could not create current folder for: {path.absolute()}') + request_id_prev = request_id path = Path("./") if request_id is None: pass @@ -274,25 +280,20 @@ def link_app(operator_runs, directory, request_id, sample_id, arguments, config, except Exception as e: print("could not delete symlink: {} ".format(path / run["id"]), file=sys.stderr) else: - # if is_run_manual(run, request_id): - # print("Manual Run no linking in Project Directory, please see existing directory for results.", file=sys.stdout) - # else: - try: - if os.path.islink(path / run["id"]) and not os.path.exists(path / run["id"]): - run_path = path / run["id"] - print(f"Removing broken symlink for: {run_path}") - os.remove(path / run["id"]) - os.symlink(run["output_directory"], path / run["id"]) - print((path / run["id"]).absolute(), file=sys.stdout) - except Exception as e: - breakpoint() - print("could not create symlink from '{}' to '{}'".format(run["output_directory"], path / run["id"]), file=sys.stderr) - - try: + try: + if os.path.islink(path / run["id"]) and not os.path.exists(path / run["id"]): + run_path = path / run["id"] + print(f"Removing broken symlink for: {run_path}") + os.remove(path / run["id"]) + os.symlink(run["output_directory"], path / run["id"]) + print((path / run["id"]).absolute(), file=sys.stdout) + except Exception as e: + breakpoint() + print("could not create symlink from '{}' to '{}'".format(run["output_directory"], path / run["id"]), file=sys.stderr) + try: os.unlink(path_without_version / "current") - except: + except: pass - if not should_delete: os.symlink(path.absolute(), path_without_version / "current") return "Completed" @@ -520,16 +521,11 @@ def link_bams_by_patient_id(operator_runs, directory, request_id, sample_id, arg return "Completed" def is_run_manual(run, request_id): - pattern = "/work/access/production/data/bams/*" + pattern = r"/data1/core006/access/production/runs/Project_.*/bam_qc" match = re.match(pattern, run["output_directory"]) - patient_dir = run["output_directory"] + '/current' - proj_dir = Path("./") / ("Project_" + request_id) if match: - if Path(patient_dir).is_dir(): - return True - else: - raise FileNotFoundError(f'The folder {patient_dir} does not exist. Bams for request {request_id} were manually imported to Voyager. Please link patients in the data folder before linking the project folder.') - + return True + return False def find_files_by_sample(file_group, sample_id = None): def traverse(file_group): files = []