Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion tools/afvs_prepare_folders.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,21 @@ def check_parameters(config):
if(empty_value(config, 'bash_template')):
print("* 'bash_template' must be set if batchsystem is 'bash'")
error = 1
elif(config['batchsystem'] == "lsf"):
if(empty_value(config, 'lsf_template')):
print("* 'lsf_template' must be set if batchsystem is 'lsf'")
error = 1
if(empty_value(config, 'lsf_cpus')):
print("* 'lsf_cpus' must be set if batchsystem is 'lsf'")
error = 1
if(empty_value(config, 'lsf_queue')):
print("* 'lsf_queue' must be set if batchsystem is 'lsf'")
error = 1
if(empty_value(config, 'lsf_job_submission_timeout')):
print("* 'lsf_job_submission_timeout' must be set if batchsystem is 'lsf'")
error = 1
else:
print(f"* batchsystem '{config['batchsystem']}' is not supported. Only awsbatch and slurm are supported")
print(f"* batchsystem '{config['batchsystem']}' is not supported. Only awsbatch, slurm, lsf and bash are supported")


if(empty_value(config, 'ligand_library_format')):
Expand Down
2 changes: 2 additions & 0 deletions tools/afvs_prepare_workunits.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ def process(ctx):
max_array_job_size = int(config['aws_batch_array_job_size'])
elif(config['batchsystem'] == "slurm"):
max_array_job_size = int(config['slurm_array_job_size'])
elif(config['batchsystem'] == "lsf"):
max_array_job_size = int(config['lsf_array_job_size'])
elif(config['batchsystem'] == "bash"):
max_array_job_size = int(config['bash_array_job_size'])

Expand Down
78 changes: 78 additions & 0 deletions tools/afvs_submit_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,82 @@ def submit_slurm(config, client, current_workunit, jobline):
# Slow ourselves down a bit
time.sleep(0.1)

def submit_lsf(config, client, current_workunit, jobline):
    """Submit every subjob of a workunit to LSF via individual ``bsub`` calls.

    Renders the configured Jinja2 LSF template once per subjob, writes each
    rendered submit file under the shared workunit directory, pipes it to
    ``bsub`` on stdin, and records the resulting LSF job IDs in
    ``current_workunit['status']``.

    Args:
        config: validated configuration mapping; reads ``lsf_template``,
            ``lsf_cpus``, ``lsf_account``, ``lsf_queue``,
            ``lsf_job_submission_timeout``, ``sharedfs_workunit_path``,
            ``job_letter``, ``job_name``, ``threads_to_use``,
            ``job_storage_mode``.
        client: unused here; kept for signature parity with the other
            ``submit_*`` backends (e.g. ``submit_slurm``).
        current_workunit: workunit dict; ``'subjobs'`` and
            ``'download_path'`` are read, ``'status'`` is written.
        jobline: identifier of the jobline being submitted.

    Raises:
        IOError: if the template cannot be read or a submit file cannot
            be written.
        Exception: if ``bsub`` times out, exits non-zero, or its output
            cannot be parsed for a job ID.
    """

    # Get the template
    try:
        with open(config['lsf_template']) as f:
            lsf_template = jinja2.Template(f.read())
    except IOError as error:
        print(f"Cannot open the lsf_template ({config['lsf_template']})")
        raise error

    jobline_str = str(jobline)

    # how many jobs are there that we need to submit?
    subjobs_count = len(current_workunit['subjobs'])

    # Where are we putting this file?
    batch_workunit_base = Path(config['sharedfs_workunit_path']) / jobline_str
    batch_workunit_base.mkdir(parents=True, exist_ok=True)

    # LSF doesn't have a good way to submit an array job with a dependency on the previous one,
    # so we will submit each subjob separately and then track the job IDs here.
    job_ids = {}

    for subjob_id in range(subjobs_count):

        batch_submit_file = batch_workunit_base / f"submit_{subjob_id}.lsf"

        # Values consumed by the {{...}} placeholders in the LSF template.
        # NOTE(review): lsf_account may be empty (check_parameters does not
        # validate it) -- the template then renders a bare '#BSUB -P'; confirm
        # the target LSF installation accepts that.
        template_values = {
            "job_letter": config['job_letter'],
            "job_name": config['job_name'],
            "threads_to_use": config['threads_to_use'],
            "subjob_id": subjob_id,
            "lsf_cpus": config['lsf_cpus'],
            "lsf_account": config['lsf_account'],
            "lsf_queue": config['lsf_queue'],
            "workunit_id": jobline_str,
            "job_storage_mode": config['job_storage_mode'],
            "job_tgz": current_workunit['download_path'],
            "batch_workunit_base": batch_workunit_base.resolve().as_posix()
        }
        render_output = lsf_template.render(template_values)

        try:
            with open(batch_submit_file, "w") as f:
                f.write(render_output)
        except IOError as error:
            print(f"Cannot write the workunit lsf file ({batch_submit_file})")
            raise error

        # bsub reads the job script (including its #BSUB directives) from stdin
        cmd = ["bsub"]

        try:
            with open(batch_submit_file, "r") as stdin_file:
                ret = subprocess.run(cmd, stdin=stdin_file, capture_output=True,
                    text=True, timeout=int(config['lsf_job_submission_timeout']))
        except subprocess.TimeoutExpired as err:
            # Chain the original TimeoutExpired so the root cause survives
            raise Exception("timeout on submission to bsub") from err

        if ret.returncode == 0:
            # Successful bsub output contains e.g. "Job <12345> is submitted ..."
            match = re.search(
                r'Job <(?P<value>\d+)> is submitted', ret.stdout)
            if match:
                job_ids[str(subjob_id)] = int(match.group('value'))
            else:
                raise Exception("bsub returned, but cannot parse output")
        else:
            raise Exception(f"bsub did not return successfully: {ret.stderr}")

        # Slow ourselves down a bit
        time.sleep(0.1)

    # Record the submission result so downstream tooling can track the jobs
    current_workunit['status'] = {
        'af_job_status': 'SUBMITTED',
        'job_name': f"afvs-{config['job_letter']}-{jobline_str}",
        'job_ids': job_ids
    }

def submit_aws_batch(config, client, current_workunit, jobline):

Expand Down Expand Up @@ -313,6 +389,8 @@ def process(config, start, stop):
submit_slurm(config, client, current_workunit, jobline)
elif(submit_type == "bash"):
run_bash(config, current_workunit, jobline)
elif(submit_type == "lsf"):
submit_lsf(config, client, current_workunit, jobline)
else:
print(f"Unknown submit type {submit_type}")

Expand Down
22 changes: 21 additions & 1 deletion tools/templates/all.ctrl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ program_timeout=90
************************************************

batchsystem=awsbatch
# Possible values: awsbatch, slurm
# Possible values: awsbatch, slurm, lsf, bash

****** AWS Batch Options (if batchsystem=awsbatch)

Expand Down Expand Up @@ -92,6 +92,26 @@ slurm_array_job_size=100
slurm_job_submission_timeout=10
# Timeout for submission of slurm jobs

****** LSF Options (if batchsystem=lsf)

lsf_template=./templates/template1.lsf.sh
# Template for the LSF job

lsf_account=
# LSF project/account to use (-P). If not set, default is used

lsf_queue=normal
# Queue to submit the job (-q)

lsf_cpus=18
# Number of CPUs per job (-n)

lsf_job_submission_timeout=10
# Timeout in seconds for each individual bsub call

lsf_array_job_size=100
# Maximum number of subjobs to group under a single workunit

****** Bash Options (if batchsystem=bash)

bash_template=./templates/template1.bash
Expand Down
64 changes: 64 additions & 0 deletions tools/templates/template1.lsf.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#!/usr/bin/env bash

# Copyright (C) 2019 Christoph Gorgulla
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# This file is part of AdaptiveFlow.
#
# AdaptiveFlow is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# AdaptiveFlow is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with AdaptiveFlow. If not, see <https://www.gnu.org/licenses/>.

# ---------------------------------------------------------------------------
#
# Description: LSF job file.
#
# This is a Jinja2 template: the {{...}} placeholders are filled in by
# submit_lsf() in tools/afvs_submit_jobs.py, which renders one copy per
# subjob and pipes the result to bsub on stdin.
#
# ---------------------------------------------------------------------------

# Update the BSUB section if needed for your particular LSF
# installation. If a line starts with "##" (two #s) it will be
# ignored


#BSUB -J {{job_letter}}-{{workunit_id}}-{{subjob_id}}
#BSUB -n {{lsf_cpus}}
##BSUB -W 12:00
##BSUB -M 800
#BSUB -o {{batch_workunit_base}}/{{subjob_id}}.out
#BSUB -e {{batch_workunit_base}}/{{subjob_id}}.err
#BSUB -q {{lsf_queue}}
# NOTE(review): rendered even when lsf_account is empty, producing a bare
# '#BSUB -P' -- confirm your LSF installation tolerates that
#BSUB -P {{lsf_account}}


# If you are using a virtualenv, make sure the correct one
# is being activated

source $HOME/afvs_env/bin/activate


# Job Information -- generally nothing in this
# section should be changed
##################################################################################

export AFVS_WORKUNIT={{workunit_id}}
export AFVS_JOB_STORAGE_MODE={{job_storage_mode}}
export AFVS_WORKUNIT_SUBJOB={{subjob_id}}
export AFVS_TMP_PATH=/dev/shm
export AFVS_CONFIG_JOB_TGZ={{job_tgz}}
export AFVS_TOOLS_PATH=${PWD}/bin
export AFVS_VCPUS={{threads_to_use}}

##################################################################################

# Record start/end epoch timestamps next to the job's stdout/stderr files
date +%s > {{batch_workunit_base}}/{{subjob_id}}.start
./templates/afvs_run.py
date +%s > {{batch_workunit_base}}/{{subjob_id}}.end