From e46bab4f16eb34525bcad4cd4a5641630aa636a2 Mon Sep 17 00:00:00 2001 From: Shi Zhang Date: Wed, 18 Mar 2026 11:39:54 -0400 Subject: [PATCH] add LSF template and scripts focusing on job arrays --- tools/afvs_prepare_folders.py | 15 +++++- tools/afvs_prepare_workunits.py | 2 + tools/afvs_submit_jobs.py | 78 ++++++++++++++++++++++++++++++++ tools/templates/all.ctrl | 22 ++++++++- tools/templates/template1.lsf.sh | 64 ++++++++++++++++++++++++++ 5 files changed, 179 insertions(+), 2 deletions(-) create mode 100644 tools/templates/template1.lsf.sh diff --git a/tools/afvs_prepare_folders.py b/tools/afvs_prepare_folders.py index dce66e1..f249b16 100755 --- a/tools/afvs_prepare_folders.py +++ b/tools/afvs_prepare_folders.py @@ -217,8 +217,21 @@ def check_parameters(config): if(empty_value(config, 'bash_template')): print("* 'bash_template' must be set if batchsystem is 'bash'") error = 1 + elif(config['batchsystem'] == "lsf"): + if(empty_value(config, 'lsf_template')): + print("* 'lsf_template' must be set if batchsystem is 'lsf'") + error = 1 + if(empty_value(config, 'lsf_cpus')): + print("* 'lsf_cpus' must be set if batchsystem is 'lsf'") + error = 1 + if(empty_value(config, 'lsf_queue')): + print("* 'lsf_queue' must be set if batchsystem is 'lsf'") + error = 1 + if(empty_value(config, 'lsf_job_submission_timeout')): + print("* 'lsf_job_submission_timeout' must be set if batchsystem is 'lsf'") + error = 1 else: - print(f"* batchsystem '{config['batchsystem']}' is not supported. Only awsbatch and slurm are supported") + print(f"* batchsystem '{config['batchsystem']}' is not supported. Only awsbatch, slurm, lsf and bash are supported") if(empty_value(config, 'ligand_library_format')): diff --git a/tools/afvs_prepare_workunits.py b/tools/afvs_prepare_workunits.py index 708a611..2ba17f8 100755 --- a/tools/afvs_prepare_workunits.py +++ b/tools/afvs_prepare_workunits.py @@ -329,6 +329,8 @@ def process(ctx): max_array_job_size = int(config['aws_batch_array_job_size']) elif(config['batchsystem'] == "slurm"): max_array_job_size = int(config['slurm_array_job_size']) + elif(config['batchsystem'] == "lsf"): + max_array_job_size = int(config['lsf_array_job_size']) elif(config['batchsystem'] == "bash"): max_array_job_size = int(config['bash_array_job_size']) diff --git a/tools/afvs_submit_jobs.py b/tools/afvs_submit_jobs.py index 42446cf..1b2b5ab 100755 --- a/tools/afvs_submit_jobs.py +++ b/tools/afvs_submit_jobs.py @@ -173,6 +173,82 @@ def submit_slurm(config, client, current_workunit, jobline): # Slow ourselves down a bit time.sleep(0.1) +def submit_lsf(config, client, current_workunit, jobline): + + # Get the template + try: + with open(config['lsf_template']) as f: + lsf_template = jinja2.Template(f.read()) + except IOError as error: + print(f"Cannot open the lsf_template ({config['lsf_template']})") + raise error + + jobline_str = str(jobline) + + # how many jobs are there that we need to submit? + subjobs_count = len(current_workunit['subjobs']) + + # Where are we putting this file? + batch_workunit_base = Path(config['sharedfs_workunit_path']) / jobline_str + batch_workunit_base.mkdir(parents=True, exist_ok=True) + + # LSF doesn't have a good way to submit an array job with a dependency on the previous one, + # so we will submit each subjob separately and then track the job IDs here.
+ job_ids = {} + + for subjob_id in range(subjobs_count): + + batch_submit_file = batch_workunit_base / f"submit_{subjob_id}.lsf" + + template_values = { + "job_letter": config['job_letter'], + "job_name": config['job_name'], + "threads_to_use": config['threads_to_use'], + "subjob_id": subjob_id, + "lsf_cpus": config['lsf_cpus'], + "lsf_account": config['lsf_account'], + "lsf_queue": config['lsf_queue'], + "workunit_id": jobline_str, + "job_storage_mode": config['job_storage_mode'], + "job_tgz": current_workunit['download_path'], + "batch_workunit_base": batch_workunit_base.resolve().as_posix() + } + render_output = lsf_template.render(template_values) + + try: + with open(batch_submit_file, "w") as f: + f.write(render_output) + except IOError as error: + print(f"Cannot write the workunit lsf file ({batch_submit_file})") + raise error + + cmd = ["bsub"] + + try: + with open(batch_submit_file, "r") as stdin_file: + ret = subprocess.run(cmd, stdin=stdin_file, capture_output=True, + text=True, timeout=int(config['lsf_job_submission_timeout'])) + except subprocess.TimeoutExpired as err: + raise Exception("timeout on submission to bsub") + + if ret.returncode == 0: + match = re.search( + r'Job <(?P<value>\d+)> is submitted', ret.stdout) + if match: + job_ids[str(subjob_id)] = int(match.groupdict()['value']) + else: + raise Exception("bsub returned, but cannot parse output") + else: + raise Exception(f"bsub did not return successfully: {ret.stderr}") + + # Slow ourselves down a bit + time.sleep(0.1) + + current_workunit['status'] = { + 'af_job_status': 'SUBMITTED', + 'job_name': f"afvs-{config['job_letter']}-{jobline_str}", + 'job_ids': job_ids + } def submit_aws_batch(config, client, current_workunit, jobline): @@ -313,6 +389,8 @@ def process(config, start, stop): submit_slurm(config, client, current_workunit, jobline) elif(submit_type == "bash"): run_bash(config, current_workunit, jobline) + elif(submit_type == "lsf"): + submit_lsf(config, client, current_workunit, jobline)
else: print(f"Unknown submit type {submit_type}") diff --git a/tools/templates/all.ctrl b/tools/templates/all.ctrl index 6c7d1b6..cd9e9f6 100644 --- a/tools/templates/all.ctrl +++ b/tools/templates/all.ctrl @@ -22,7 +22,7 @@ program_timeout=90 ************************************************ batchsystem=awsbatch -# Possible values: awsbatch, slurm +# Possible values: awsbatch, slurm, lsf ****** AWS Batch Options (if batchsystem=awsbatch) @@ -92,6 +92,26 @@ slurm_array_job_size=100 slurm_job_submission_timeout=10 # Timeout for submission of slurm jobs +****** LSF Options (if batchsystem=lsf) + +lsf_template=./templates/template1.lsf.sh +# Template for the LSF job + +lsf_account= +# LSF project/account to use (-P). If not set, default is used + +lsf_queue=normal +# Queue to submit the job (-q) + +lsf_cpus=18 +# Number of CPUs per job (-n) + +lsf_job_submission_timeout=10 +# Timeout in seconds for each individual bsub call + +lsf_array_job_size=100 +# Maximum number of subjobs to group under a single workunit + ****** Bash Options (if batchsystem=bash) bash_template=./templates/template1.bash diff --git a/tools/templates/template1.lsf.sh b/tools/templates/template1.lsf.sh new file mode 100644 index 0000000..8487020 --- /dev/null +++ b/tools/templates/template1.lsf.sh @@ -0,0 +1,64 @@ +#!/usr/bin/env bash + +# Copyright (C) 2019 Christoph Gorgulla +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# This file is part of AdaptiveFlow. +# +# AdaptiveFlow is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 2 of the License, or +# (at your option) any later version. +# +# AdaptiveFlow is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with AdaptiveFlow. If not, see <http://www.gnu.org/licenses/>. + +# --------------------------------------------------------------------------- +# +# Description: LSF job file. +# +# --------------------------------------------------------------------------- + +# Update the BSUB section if needed for your particular LSF +# installation. If a line starts with "##" (two #s) it will be +# ignored + + +#BSUB -J {{job_letter}}-{{workunit_id}}-{{subjob_id}} +#BSUB -n {{lsf_cpus}} +##BSUB -W 12:00 +##BSUB -M 800 +#BSUB -o {{batch_workunit_base}}/{{subjob_id}}.out +#BSUB -e {{batch_workunit_base}}/{{subjob_id}}.err +#BSUB -q {{lsf_queue}} +#BSUB -P {{lsf_account}} + + +# If you are using a virtualenv, make sure the correct one +# is being activated + +source $HOME/afvs_env/bin/activate + + +# Job Information -- generally nothing in this +# section should be changed +################################################################################## + +export AFVS_WORKUNIT={{workunit_id}} +export AFVS_JOB_STORAGE_MODE={{job_storage_mode}} +export AFVS_WORKUNIT_SUBJOB={{subjob_id}} +export AFVS_TMP_PATH=/dev/shm +export AFVS_CONFIG_JOB_TGZ={{job_tgz}} +export AFVS_TOOLS_PATH=${PWD}/bin +export AFVS_VCPUS={{threads_to_use}} + +################################################################################## + +date +%s > {{batch_workunit_base}}/{{subjob_id}}.start +./templates/afvs_run.py +date +%s > {{batch_workunit_base}}/{{subjob_id}}.end \ No newline at end of file