From f30ded4389d3dc398d5c6a4f63de2169996ab879 Mon Sep 17 00:00:00 2001 From: Javier Tausia Hoyal Date: Tue, 14 Jan 2025 18:11:18 +0100 Subject: [PATCH 1/5] [JTH] add flexible run case options for different launchers --- bluemath_tk/wrappers/_base_wrappers.py | 191 +++++++++++++++--- bluemath_tk/wrappers/_utils_wrappers.py | 51 +++++ bluemath_tk/wrappers/swash/swash_example.py | 4 +- bluemath_tk/wrappers/swash/swash_wrapper.py | 40 ++++ bluemath_tk/wrappers/swash/templates/slurm.sh | 8 + 5 files changed, 260 insertions(+), 34 deletions(-) create mode 100644 bluemath_tk/wrappers/_utils_wrappers.py create mode 100644 bluemath_tk/wrappers/swash/templates/slurm.sh diff --git a/bluemath_tk/wrappers/_base_wrappers.py b/bluemath_tk/wrappers/_base_wrappers.py index 03563d5..4fb305a 100644 --- a/bluemath_tk/wrappers/_base_wrappers.py +++ b/bluemath_tk/wrappers/_base_wrappers.py @@ -6,6 +6,8 @@ import numpy as np from jinja2 import Environment, FileSystemLoader from ..core.models import BlueMathModel +from ._utils_wrappers import write_array_in_file, copy_files +from concurrent.futures import ThreadPoolExecutor, as_completed class BaseModelWrapper(BlueMathModel): @@ -49,12 +51,21 @@ class BaseModelWrapper(BlueMathModel): Copy file(s) from source to destination. build_cases(mode="all_combinations") Create the cases folders and render the input files. - run_cases() - Run the cases. + run_case(case_dir, launcher=None, script=None, params=None) + Run a single case based on the launcher, script, and parameters. + run_cases(launcher=None, script=None, params=None, parallel=False) + Run the cases based on the launcher, script, and parameters. + Parallel execution is optional. run_model(case_dir) Run the model for a specific case (abstract method). + run_model_with_apptainer(case_dir) + Run the model for a specific case using Apptainer. + run_model_with_docker(case_dir) + Run the model for a specific case using Docker. """ + available_launchers = ["sbatch", "apptainer", "docker"] + def __init__( self, templates_dir: str, @@ -264,15 +275,7 @@ def write_array_in_file(self, array: np.ndarray, filename: str) -> None: The name of the file. """ - with open(filename, "w") as f: - if array.ndim == 1: - for item in array: - f.write(f"{item}\n") - elif array.ndim == 2: - for row in array: - f.write(" ".join(map(str, row)) + "\n") - else: - raise ValueError("Only 1D and 2D arrays are supported") + write_array_in_file(array=array, filename=filename) def copy_files(self, src: str, dst: str) -> None: """ @@ -286,18 +289,7 @@ def copy_files(self, src: str, dst: str) -> None: The destination file. """ - if os.path.isdir(src): - os.makedirs(dst, exist_ok=True) - for file in os.listdir(src): - with open(file, "r") as f: - content = f.read() - with open(os.path.join(dst, file), "w") as f: - f.write(content) - else: - with open(src, "r") as f: - content = f.read() - with open(dst, "w") as f: - f.write(content) + copy_files(src=src, dst=dst) def build_cases(self, mode: str = "all_combinations") -> None: """ @@ -331,18 +323,125 @@ def build_cases(self, mode: str = "all_combinations") -> None: f"{len(self.cases_dirs)} cases created in {mode} mode and saved in {self.output_dir}" ) - def run_cases(self) -> None: + def run_case( + self, + case_dir: str, + launcher: str = None, + script: str = None, + params: str = None, + ) -> None: """ - Run the cases. + Run a single case based on the launcher, script, and parameters. + + Parameters + ---------- + case_dir : str + The case directory. + launcher : str, optional + The launcher to run the case. Default is None. + script : str, optional + The script to run the case. Default is None. + + Notes + ----- + - If launcher is None, the method run_model will be called. + - If launcher is not recognized, the method _exec_bash_commands will be called. """ - if self.cases_dirs: - for case_dir in self.cases_dirs: - self.logger.info(f"Running case in {case_dir}") - self.run_model(case_dir=case_dir) - self.logger.info("All cases ran successfully.") + self.logger.info(f"Running case in {case_dir}") + if launcher is None: + self.run_model(case_dir=case_dir) + elif launcher == "apptainer": + self.run_model_with_apptainer(case_dir=case_dir) + elif launcher == "docker": + self.run_model_with_docker(case_dir=case_dir) else: - raise ValueError("No cases to run.") + self._exec_bash_commands(str_cmd=f"{launcher} {params} {script}") + + def run_cases( + self, + launcher: str = None, + script: str = None, + params: str = None, + parallel: bool = False, + ) -> None: + """ + Run the cases based on the launcher, script, and parameters. + Parallel execution is optional. + + Parameters + ---------- + launcher : str, optional + The launcher to run the cases. Default is None. + script : str, optional + The script to run the cases. Default is None. + params : str, optional + The parameters to run the cases. Default is None. + parallel : bool, optional + If True, the cases will be run in parallel. Default is False. + + Raises + ------ + ValueError + If the launcher is not recognized or the script does not exist. + + Notes + ----- + - Sbatch is the only option different from None, Apptainer, and Docker. + """ + + if launcher is not None: + if launcher not in self.available_launchers: + raise ValueError( + f"Invalid launcher: {launcher}, not in {self.available_launchers}." + ) + if launcher == "sbatch": + if not os.path.exists(script): + raise ValueError(f"Script {script} does not exist.") + self.logger.info("Running cases with sbatch.") + self._exec_bash_commands(str_cmd=f"{launcher} {params} {script}") + else: + if parallel: + num_threads = self.get_num_processors_available() + self.logger.info( + f"Running cases in parallel with launcher={launcher}. Number of threads: {num_threads}." + ) + with ThreadPoolExecutor(max_workers=num_threads) as executor: + future_to_case = { + executor.submit( + self.run_case, case_dir, launcher, script, params + ): case_dir + for case_dir in self.cases_dirs + } + for future in as_completed(future_to_case): + case_dir = future_to_case[future] + try: + future.result() + except Exception as exc: + self.logger.error( + f"Case {case_dir} generated an exception: {exc}." + ) + else: + self.logger.info( + f"Running cases sequentially with launcher={launcher}." + ) + for case_dir in self.cases_dirs: + try: + self.run_case( + case_dir=case_dir, + launcher=launcher, + script=script, + params=params, + ) + except Exception as exc: + self.logger.error( + f"Case {case_dir} generated an exception: {exc}." + ) + if launcher == "docker": + # Remove stopped containers after running all cases + remove_stopped_containers_cmd = 'docker ps -a --filter "ancestor=tausiaj/swash-image:latest" -q | xargs docker rm' + self._exec_bash_commands(str_cmd=remove_stopped_containers_cmd) + self.logger.info("All cases ran successfully.") @abstractmethod def run_model(self, case_dir: str) -> None: @@ -352,7 +451,35 @@ def run_model(self, case_dir: str) -> None: Parameters ---------- case_dir : str - The directory of the case. + The case directory. """ pass + + def run_model_with_apptainer(self, case_dir: str) -> None: + """ + Run the model for the specified case using Apptainer. + + Parameters + ---------- + case_dir : str + The case directory. + """ + + raise NotImplementedError( + "The method run_model_with_apptainer must be implemented." + ) + + def run_model_with_docker(self, case_dir: str) -> None: + """ + Run the model for the specified case using Docker. + + Parameters + ---------- + case_dir : str + The case directory. + """ + + raise NotImplementedError( + "The method run_model_with_docker must be implemented." + ) diff --git a/bluemath_tk/wrappers/_utils_wrappers.py b/bluemath_tk/wrappers/_utils_wrappers.py new file mode 100644 index 0000000..dbb59b9 --- /dev/null +++ b/bluemath_tk/wrappers/_utils_wrappers.py @@ -0,0 +1,51 @@ +import os +import numpy as np + + +def write_array_in_file(array: np.ndarray, filename: str) -> None: + """ + Write an array in a file. + + Parameters + ---------- + array : np.ndarray + The array to be written. Can be 1D or 2D. + filename : str + The name of the file. + """ + + with open(filename, "w") as f: + if array.ndim == 1: + for item in array: + f.write(f"{item}\n") + elif array.ndim == 2: + for row in array: + f.write(" ".join(map(str, row)) + "\n") + else: + raise ValueError("Only 1D and 2D arrays are supported") + + +def copy_files(src: str, dst: str) -> None: + """ + Copy file(s) from source to destination. + + Parameters + ---------- + src : str + The source file. + dst : str + The destination file. + """ + + if os.path.isdir(src): + os.makedirs(dst, exist_ok=True) + for file in os.listdir(src): + with open(file, "r") as f: + content = f.read() + with open(os.path.join(dst, file), "w") as f: + f.write(content) + else: + with open(src, "r") as f: + content = f.read() + with open(dst, "w") as f: + f.write(content) diff --git a/bluemath_tk/wrappers/swash/swash_example.py b/bluemath_tk/wrappers/swash/swash_example.py index 9ad69a1..0aff295 100644 --- a/bluemath_tk/wrappers/swash/swash_example.py +++ b/bluemath_tk/wrappers/swash/swash_example.py @@ -136,10 +136,10 @@ def build_cases( output_dir=output_dir, ) # Build the input files - swan_model.build_cases(mode="one_by_one", depth=linear_depth, plants=plants) + swan_model.build_cases(mode="all_combinations", depth=linear_depth, plants=plants) # Set the SWASH executable swan_model.set_swash_exec( "/home/tausiaj/GeoOcean-Execs/SWASH-10.05-Linux/bin/swashrun" ) # Run the model - swan_model.run_cases() + swan_model.run_cases(parallel=True) diff --git a/bluemath_tk/wrappers/swash/swash_wrapper.py b/bluemath_tk/wrappers/swash/swash_wrapper.py index 1cb5ccb..7b68201 100644 --- a/bluemath_tk/wrappers/swash/swash_wrapper.py +++ b/bluemath_tk/wrappers/swash/swash_wrapper.py @@ -156,6 +156,7 @@ def run_model(self, case_dir: str, log_file: str = "swash_exec.log") -> None: if not self.swash_exec: raise ValueError("The SWASH executable was not set.") + # check if windows OS is_win = sys.platform.startswith("win") if is_win: @@ -166,3 +167,42 @@ def run_model(self, case_dir: str, log_file: str = "swash_exec.log") -> None: cmd += f" 2>&1 > {log_file}" # execute command self._exec_bash_commands(str_cmd=cmd) + + def run_model_with_apptainer(self, case_dir: str, apptainer_image: str) -> None: + """ + Run the SWASH model for the specified case using Apptainer. + + Parameters + ---------- + case_dir : str + The case directory. + apptainer_image : str + The Apptainer image. + """ + + # Construct the Apptainer command + apptainer_cmd = f"apptainer exec --bind {case_dir}:/tmp/swash --pwd /tmp/swash {apptainer_image} swashrun -input input.sws" + # Execute the Apptainer command + self._exec_bash_commands(str_cmd=apptainer_cmd) + + def run_model_with_docker( + self, + case_dir: str, + docker_image: str = "tausiaj/swash-image:latest", + ) -> None: + """ + Run the SWASH model for the specified case using Docker. + + Parameters + ---------- + case_dir : str + The case directory. + docker_image : str, optional + The Docker image. Default is "tausiaj/swash-image:latest". + """ + + # Construct the Docker command + # TODO: Check why --rm flag is not removing the container after execution + docker_cmd = f"docker run --rm -v {case_dir}:/case_dir -w /case_dir {docker_image} swashrun -input input.sws" + # Execute the Docker command + self._exec_bash_commands(str_cmd=docker_cmd) diff --git a/bluemath_tk/wrappers/swash/templates/slurm.sh b/bluemath_tk/wrappers/swash/templates/slurm.sh new file mode 100644 index 0000000..0f6f4e1 --- /dev/null +++ b/bluemath_tk/wrappers/swash/templates/slurm.sh @@ -0,0 +1,8 @@ +#!/bin/bash +#SBATCH --job-name=df_bs +#SBATCH --ntasks=8 +#SBATCH --mem=15GB +#SBATCH --time=24:00:00 +#SBATCH --partition=geocean + +/software/geocean/path/to/lanzador.sh /path/to/project/cases/case_$SLURM_ARRAY_TASK_ID/input.sws \ No newline at end of file From a87ba03db090ab4e73604c309dc630271e6a993e Mon Sep 17 00:00:00 2001 From: Javier Tausia Hoyal Date: Wed, 15 Jan 2025 10:36:47 +0100 Subject: [PATCH 2/5] [JTH] add new input sws with updated example --- bluemath_tk/wrappers/swash/swash_example.py | 42 +++++-------------- .../wrappers/swash/templates/input.sws | 15 +++---- 2 files changed, 19 insertions(+), 38 deletions(-) diff --git a/bluemath_tk/wrappers/swash/swash_example.py b/bluemath_tk/wrappers/swash/swash_example.py index 0aff295..c343016 100644 --- a/bluemath_tk/wrappers/swash/swash_example.py +++ b/bluemath_tk/wrappers/swash/swash_example.py @@ -2,7 +2,6 @@ import numpy as np from bluemath_tk.datamining.lhs import LHS from bluemath_tk.datamining.mda import MDA -from bluemath_tk.topo_bathy.profiles import linear from bluemath_tk.waves.series import series_TMA from bluemath_tk.wrappers.swash.swash_wrapper import SwashModelWrapper @@ -16,8 +15,6 @@ def build_case( self, case_context: dict, case_dir: str, - depth: np.ndarray = None, - plants: np.ndarray = None, ) -> None: """ Build the input files for a case. @@ -34,16 +31,15 @@ def build_case( The plants array. Default is None. """ - if depth is not None: - # Save the depth to a file - self.write_array_in_file( - array=depth, filename=os.path.join(case_dir, "depth.bot") - ) - if plants is not None: - # Save the plants to a file - self.write_array_in_file( - array=plants, filename=os.path.join(case_dir, "plants.txt") - ) + # Copy test depth and plants files + self.copy_files( + src="/home/tausiaj/GitHub-GeoOcean/BlueMath/test_data/swash-depth.bot", + dst=os.path.join(case_dir, "depth.bot"), + ) + self.copy_files( + src="/home/tausiaj/GitHub-GeoOcean/BlueMath/test_data/swash-plants.txt", + dst=os.path.join(case_dir, "plants.txt"), + ) # Build the input waves waves_dict = { "H": case_context["Hs"], @@ -55,7 +51,7 @@ def build_case( "deltat": 1, "tendc": 1800, } - waves = series_TMA(waves=waves_dict, depth=depth[0]) + waves = series_TMA(waves=waves_dict, depth=10.0) # Save the waves to a file self.write_array_in_file( array=waves, filename=os.path.join(case_dir, "waves.bnd") @@ -64,8 +60,6 @@ def build_case( def build_cases( self, mode: str = "all_combinations", - depth: np.ndarray = None, - plants: np.ndarray = None, ) -> None: """ Build the input files for all cases. @@ -92,8 +86,6 @@ def build_cases( self.build_case( case_context=case_context, case_dir=case_dir, - depth=depth, - plants=plants, ) @@ -116,18 +108,6 @@ def build_cases( mda.fit(data=lhs_data) model_parameters = mda.centroids.to_dict(orient="list") output_dir = "/home/tausiaj/GitHub-GeoOcean/BlueMath/test_cases/swash/" - # Create the depth - """ - dx: bathymetry mesh resolution at x axes (m) - h0: offshore depth (m) - bCrest: beach heigh (m) - m: profile slope - Wfore: flume length before slope toe (m) - """ - linear_depth = linear(dx=0.05, h0=10, bCrest=5, m=1, Wfore=10) - # Create the plants - plants = np.zeros(linear_depth.size) - plants[(linear_depth < 1) & (linear_depth > 0)] = 1.0 # Create an instance of the SWASH model wrapper swan_model = VeggySwashModelWrapper( templates_dir=templates_dir, @@ -136,7 +116,7 @@ def build_cases( output_dir=output_dir, ) # Build the input files - swan_model.build_cases(mode="all_combinations", depth=linear_depth, plants=plants) + swan_model.build_cases(mode="one_by_one") # Set the SWASH executable swan_model.set_swash_exec( "/home/tausiaj/GeoOcean-Execs/SWASH-10.05-Linux/bin/swashrun" diff --git a/bluemath_tk/wrappers/swash/templates/input.sws b/bluemath_tk/wrappers/swash/templates/input.sws index c2761f7..e157495 100644 --- a/bluemath_tk/wrappers/swash/templates/input.sws +++ b/bluemath_tk/wrappers/swash/templates/input.sws @@ -1,14 +1,14 @@ -$ Project name +$Project name PROJECT '{{ case_num }}' $ $Set water level -SET LEVEL=1.0 +SET LEVEL=1.0040963361065678 $ $(1D-mode, flume) or (2D-mode, basin) MODE DYNanic ONED COORD CARTesian $Computational grid: geographic location, size, resolution and orientation -CGRID 0 0 0 1399 0 1567 0 +CGRID 0 0 0 1399 0 7345 0 $ $Multi-layered mode VERT 1 @@ -18,7 +18,7 @@ INPGRID BOTTOM 0 0 0 1399 0 1.00 1 READINP BOTTOM 1 'depth.bot' 1 0 FREE $ $Reading vegetation values from file -INPGRID NPLANTS 0 0 0 1399 0 1 1 +INPGRID NPLANTS 0 0 0 0 1 1 READINP NPLANTS 1 'plants.txt' 1 0 FREE $ $Initial values for flow variables @@ -31,9 +31,10 @@ SPON E 10 $ $Physics BREAK +FRIC MANNING 0.01 VEGETATION {{ vegetation_height }} 0.5009019570650376 1 1.0 $Numerics -NONHYDrostatic BOX 1. PREConditioner ILU +NONHYDrostatic $ $Output quantities DISCRET UPW MOM @@ -48,12 +49,12 @@ QUANTITY XP hexp=10 QUANT RUNUP delrp 0.01 $ CURVE 'line' 0 0 1399 1399 0 -TABLE 'line' HEAD 'output.tab' TSEC XP YP BOTL WATL QMAG OUTPUT 0 1 SEC +TABLE 'line' HEAD 'output.tab' TSEC XP YP WATL OUTPUT 0 1 SEC $ TABLE 'NOGRID' HEAD 'run.tab' TSEC RUNUP OUTPUT 0 1 SEC $ $Starts computation TEST 1,0 -COMPUTE 000000.000 0.0168 SEC 002000.000 +COMPUTE 000000.000 0.0048 SEC 003430.000 STOP $ \ No newline at end of file From 0b95729340fe0d5e9aec89750180000ea5e573fd Mon Sep 17 00:00:00 2001 From: Javier Tausia Hoyal Date: Wed, 15 Jan 2025 10:55:16 +0100 Subject: [PATCH 3/5] [JTH] little change in input.sws to run model properly --- .gitignore | 1 + bluemath_tk/wrappers/swash/swash_example.py | 2 +- bluemath_tk/wrappers/swash/templates/input.sws | 17 ++++++++--------- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index 730cae1..afe09fc 100644 --- a/.gitignore +++ b/.gitignore @@ -99,4 +99,5 @@ venv.bak/ # Local notebooks notebooks/ test_cases/ +test_data/ TODO.md \ No newline at end of file diff --git a/bluemath_tk/wrappers/swash/swash_example.py b/bluemath_tk/wrappers/swash/swash_example.py index c343016..01874e1 100644 --- a/bluemath_tk/wrappers/swash/swash_example.py +++ b/bluemath_tk/wrappers/swash/swash_example.py @@ -122,4 +122,4 @@ def build_cases( "/home/tausiaj/GeoOcean-Execs/SWASH-10.05-Linux/bin/swashrun" ) # Run the model - swan_model.run_cases(parallel=True) + swan_model.run_cases(launcher="docker") diff --git a/bluemath_tk/wrappers/swash/templates/input.sws b/bluemath_tk/wrappers/swash/templates/input.sws index e157495..bd341a0 100644 --- a/bluemath_tk/wrappers/swash/templates/input.sws +++ b/bluemath_tk/wrappers/swash/templates/input.sws @@ -1,14 +1,14 @@ $Project name -PROJECT '{{ case_num }}' +PROJECT 'myproj' '{{ case_num }}' $ $Set water level -SET LEVEL=1.0040963361065678 +SET LEVEL=1.0 $ $(1D-mode, flume) or (2D-mode, basin) MODE DYNanic ONED COORD CARTesian $Computational grid: geographic location, size, resolution and orientation -CGRID 0 0 0 1399 0 7345 0 +CGRID 0 0 0 1399 0 2892 0 $ $Multi-layered mode VERT 1 @@ -18,7 +18,7 @@ INPGRID BOTTOM 0 0 0 1399 0 1.00 1 READINP BOTTOM 1 'depth.bot' 1 0 FREE $ $Reading vegetation values from file -INPGRID NPLANTS 0 0 0 0 1 1 +INPGRID NPLANTS 0 0 0 1399 0 1 1 READINP NPLANTS 1 'plants.txt' 1 0 FREE $ $Initial values for flow variables @@ -31,10 +31,9 @@ SPON E 10 $ $Physics BREAK -FRIC MANNING 0.01 -VEGETATION {{ vegetation_height }} 0.5009019570650376 1 1.0 +VEGETATION 1.4849813414456121 0.5009019570650376 1 1.0 $Numerics -NONHYDrostatic +NONHYDrostatic BOX 1. PREConditioner ILU $ $Output quantities DISCRET UPW MOM @@ -49,12 +48,12 @@ QUANTITY XP hexp=10 QUANT RUNUP delrp 0.01 $ CURVE 'line' 0 0 1399 1399 0 -TABLE 'line' HEAD 'output.tab' TSEC XP YP WATL OUTPUT 0 1 SEC +TABLE 'line' HEAD 'output.tab' TSEC XP YP BOTL WATL QMAG OUTPUT 0 1 SEC $ TABLE 'NOGRID' HEAD 'run.tab' TSEC RUNUP OUTPUT 0 1 SEC $ $Starts computation TEST 1,0 -COMPUTE 000000.000 0.0048 SEC 003430.000 +COMPUTE 000000.000 0.0133 SEC 002000.000 STOP $ \ No newline at end of file From 9da319c6e1d06087e5f2c9b9ecb3b5d174583f0b Mon Sep 17 00:00:00 2001 From: Javier Tausia Hoyal Date: Wed, 15 Jan 2025 12:56:03 +0100 Subject: [PATCH 4/5] [JTH] change name in slurm and swash example --- bluemath_tk/wrappers/swash/swash_wrapper.py | 2 +- bluemath_tk/wrappers/swash/templates/{slurm.sh => sbatch.sh} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename bluemath_tk/wrappers/swash/templates/{slurm.sh => sbatch.sh} (100%) diff --git a/bluemath_tk/wrappers/swash/swash_wrapper.py b/bluemath_tk/wrappers/swash/swash_wrapper.py index 7b68201..a0df65d 100644 --- a/bluemath_tk/wrappers/swash/swash_wrapper.py +++ b/bluemath_tk/wrappers/swash/swash_wrapper.py @@ -188,7 +188,7 @@ def run_model_with_apptainer(self, case_dir: str, apptainer_image: str) -> None: def run_model_with_docker( self, case_dir: str, - docker_image: str = "tausiaj/swash-image:latest", + docker_image: str = "tausiaj/swash-geoocean:10.05", ) -> None: """ Run the SWASH model for the specified case using Docker. diff --git a/bluemath_tk/wrappers/swash/templates/slurm.sh b/bluemath_tk/wrappers/swash/templates/sbatch.sh similarity index 100% rename from bluemath_tk/wrappers/swash/templates/slurm.sh rename to bluemath_tk/wrappers/swash/templates/sbatch.sh From ed359ddf0eb410f97c98304932530e6dbc01e333 Mon Sep 17 00:00:00 2001 From: Javier Tausia Hoyal Date: Wed, 15 Jan 2025 21:56:57 +0100 Subject: [PATCH 5/5] [JTH] add more docu and extra features to swash and base model wrappers --- .gitignore | 1 - bluemath_tk/wrappers/_base_wrappers.py | 200 +++++++++++++------- bluemath_tk/wrappers/swash/swash_example.py | 22 +-- bluemath_tk/wrappers/swash/swash_wrapper.py | 12 +- docs/wrappers/intro.md | 65 +++---- docs/wrappers/schedulers.md | 3 + 6 files changed, 178 insertions(+), 125 deletions(-) create mode 100644 docs/wrappers/schedulers.md diff --git a/.gitignore b/.gitignore index afe09fc..1d0a12a 100644 --- a/.gitignore +++ b/.gitignore @@ -49,7 +49,6 @@ coverage.xml *.py,cover .hypothesis/ .pytest_cache/ -tests_data/ # mypy .mypy_cache/ diff --git a/bluemath_tk/wrappers/_base_wrappers.py b/bluemath_tk/wrappers/_base_wrappers.py index 4fb305a..1991af3 100644 --- a/bluemath_tk/wrappers/_base_wrappers.py +++ b/bluemath_tk/wrappers/_base_wrappers.py @@ -1,6 +1,6 @@ import os +import copy import itertools -from abc import abstractmethod from typing import List import subprocess import numpy as np @@ -16,6 +16,10 @@ class BaseModelWrapper(BlueMathModel): Attributes ---------- + available_launchers : List[str] + The available launchers. + available_schedulers : List[str] + The available schedulers. templates_dir : str The directory where the templates are stored. templates_name : List[str] @@ -37,6 +41,10 @@ class BaseModelWrapper(BlueMathModel): Check if the parameters have the correct type. _exec_bash_commands(str_cmd, out_file=None, err_file=None) Execute bash commands. + list_available_launchers() + List the available launchers. + list_available_schedulers() + List the available schedulers. create_cases_context_one_by_one() Create an array of dictionaries with the combinations of values from the input dictionary, one by one. @@ -49,22 +57,25 @@ class BaseModelWrapper(BlueMathModel): Write an array in a file. copy_files(src, dst) Copy file(s) from source to destination. - build_cases(mode="all_combinations") + build_cases(mode="one_by_one") Create the cases folders and render the input files. run_case(case_dir, launcher=None, script=None, params=None) Run a single case based on the launcher, script, and parameters. run_cases(launcher=None, script=None, params=None, parallel=False) Run the cases based on the launcher, script, and parameters. Parallel execution is optional. - run_model(case_dir) - Run the model for a specific case (abstract method). - run_model_with_apptainer(case_dir) + run_cases_with_scheduler(scheduler, script, params=None) + Run the cases based on the scheduler, script, and parameters. + run_model() + Run the model for a specific case. + run_model_with_apptainer() Run the model for a specific case using Apptainer. - run_model_with_docker(case_dir) + run_model_with_docker() Run the model for a specific case using Docker. """ - available_launchers = ["sbatch", "apptainer", "docker"] + available_launchers = ["bash", "sh", "./", "apptainer", "docker", "qsub"] + available_schedulers = ["sbatch"] def __init__( self, @@ -187,6 +198,30 @@ def _exec_bash_commands( _stderr.flush() _stderr.close() + def list_available_launchers(self) -> List[str]: + """ + List the available launchers. + + Returns + ------- + list + A list with the available launchers. + """ + + return self.available_launchers + + def list_available_schedulers(self) -> List[str]: + """ + List the available schedulers. + + Returns + ------- + list + A list with the available schedulers. + """ + + return self.available_schedulers + def create_cases_context_one_by_one(self) -> List[dict]: """ Create an array of dictionaries with the combinations of values from the @@ -291,7 +326,7 @@ def copy_files(self, src: str, dst: str) -> None: copy_files(src=src, dst=dst) - def build_cases(self, mode: str = "all_combinations") -> None: + def build_cases(self, mode: str = "one_by_one") -> None: """ Create the cases folders and render the input files. @@ -364,6 +399,7 @@ def run_cases( script: str = None, params: str = None, parallel: bool = False, + cases_to_run: List[int] = None, ) -> None: """ Run the cases based on the launcher, script, and parameters. @@ -379,15 +415,13 @@ def run_cases( The parameters to run the cases. Default is None. parallel : bool, optional If True, the cases will be run in parallel. Default is False. + cases_to_run : List[int], optional + The list with the cases to run. Default is None. Raises ------ ValueError If the launcher is not recognized or the script does not exist. - - Notes - ----- - - Sbatch is the only option different from None, Apptainer, and Docker. """ if launcher is not None: @@ -395,89 +429,113 @@ def run_cases( raise ValueError( f"Invalid launcher: {launcher}, not in {self.available_launchers}." ) - if launcher == "sbatch": - if not os.path.exists(script): - raise ValueError(f"Script {script} does not exist.") - self.logger.info("Running cases with sbatch.") - self._exec_bash_commands(str_cmd=f"{launcher} {params} {script}") else: - if parallel: - num_threads = self.get_num_processors_available() - self.logger.info( - f"Running cases in parallel with launcher={launcher}. Number of threads: {num_threads}." - ) - with ThreadPoolExecutor(max_workers=num_threads) as executor: - future_to_case = { - executor.submit( - self.run_case, case_dir, launcher, script, params - ): case_dir - for case_dir in self.cases_dirs - } - for future in as_completed(future_to_case): - case_dir = future_to_case[future] - try: - future.result() - except Exception as exc: - self.logger.error( - f"Case {case_dir} generated an exception: {exc}." - ) - else: - self.logger.info( - f"Running cases sequentially with launcher={launcher}." - ) - for case_dir in self.cases_dirs: + self.logger.warning( + "Launcher is None, so the method run_model will be called." + ) + + if cases_to_run is not None: + self.logger.warning( + f"Cases to run was specified, so just {cases_to_run} will be run." + ) + cases_dir_to_run = [self.cases_dirs[case] for case in cases_to_run] + else: + cases_dir_to_run = copy.deepcopy(self.cases_dirs) + + if parallel: + num_threads = self.get_num_processors_available() + self.logger.info( + f"Running cases in parallel with launcher={launcher}. Number of threads: {num_threads}." + ) + with ThreadPoolExecutor(max_workers=num_threads) as executor: + future_to_case = { + executor.submit( + self.run_case, case_dir, launcher, script, params + ): case_dir + for case_dir in cases_dir_to_run + } + for future in as_completed(future_to_case): + case_dir = future_to_case[future] try: - self.run_case( - case_dir=case_dir, - launcher=launcher, - script=script, - params=params, - ) + future.result() except Exception as exc: self.logger.error( f"Case {case_dir} generated an exception: {exc}." ) - if launcher == "docker": - # Remove stopped containers after running all cases - remove_stopped_containers_cmd = 'docker ps -a --filter "ancestor=tausiaj/swash-image:latest" -q | xargs docker rm' - self._exec_bash_commands(str_cmd=remove_stopped_containers_cmd) - self.logger.info("All cases ran successfully.") + else: + self.logger.info(f"Running cases sequentially with launcher={launcher}.") + for case_dir in cases_dir_to_run: + try: + self.run_case( + case_dir=case_dir, + launcher=launcher, + script=script, + params=params, + ) + except Exception as exc: + self.logger.error(f"Case {case_dir} generated an exception: {exc}.") + + if launcher == "docker": + # Remove stopped containers after running all cases + remove_stopped_containers_cmd = 'docker ps -a --filter "ancestor=tausiaj/swash-image:latest" -q | xargs docker rm' + self._exec_bash_commands(str_cmd=remove_stopped_containers_cmd) + self.logger.info("All cases ran successfully.") - @abstractmethod - def run_model(self, case_dir: str) -> None: + def run_cases_with_scheduler( + self, + scheduler: str, + script: str, + params: str = None, + ) -> None: """ - Run the model. + Run the cases based on the scheduler, script, and parameters. Parameters ---------- - case_dir : str - The case directory. + scheduler : str + The scheduler to run the cases. + script : str + The script to run the cases. + params : str, optional + The parameters to run the cases. Default is None. + + Raises + ------ + ValueError + If the scheduler is not recognized or the script does not exist. """ - pass + if scheduler not in self.available_schedulers: + raise ValueError( + f"Invalid scheduler: {scheduler}, not in {self.available_schedulers}." + ) + if not os.path.exists(script): + raise ValueError(f"Script {script} does not exist.") + self.logger.info(f"Running cases with scheduler={scheduler}.") + self._exec_bash_commands(str_cmd=f"{scheduler} {params} {script}") - def run_model_with_apptainer(self, case_dir: str) -> None: + @staticmethod + def run_model() -> None: + """ + Run the model. """ - Run the model for the specified case using Apptainer. - Parameters - ---------- - case_dir : str - The case directory. + raise NotImplementedError("The method run_model must be implemented.") + + @staticmethod + def run_model_with_apptainer() -> None: + """ + Run the model for the specified case using Apptainer. """ raise NotImplementedError( "The method run_model_with_apptainer must be implemented." ) - def run_model_with_docker(self, case_dir: str) -> None: + @staticmethod + def run_model_with_docker() -> None: """ Run the model for the specified case using Docker. - - Parameters - ---------- - case_dir : str - The case directory. """ raise NotImplementedError( diff --git a/bluemath_tk/wrappers/swash/swash_example.py b/bluemath_tk/wrappers/swash/swash_example.py index 01874e1..632957a 100644 --- a/bluemath_tk/wrappers/swash/swash_example.py +++ b/bluemath_tk/wrappers/swash/swash_example.py @@ -25,19 +25,15 @@ def build_case( The case context. case_dir : str The case directory. - depth : np.ndarray, optional - The depth array. Default is None. - plants : np.ndarray, optional - The plants array. Default is None. """ # Copy test depth and plants files self.copy_files( - src="/home/tausiaj/GitHub-GeoOcean/BlueMath/test_data/swash-depth.bot", + src="C:/Users/UsuarioUC/Documents/BlueMath_tk/test_data/swash-depth.bot", dst=os.path.join(case_dir, "depth.bot"), ) self.copy_files( - src="/home/tausiaj/GitHub-GeoOcean/BlueMath/test_data/swash-plants.txt", + src="C:/Users/UsuarioUC/Documents/BlueMath_tk/test_data/swash-plants.txt", dst=os.path.join(case_dir, "plants.txt"), ) # Build the input waves @@ -59,7 +55,7 @@ def build_case( def build_cases( self, - mode: str = "all_combinations", + mode: str = "one_by_one", ) -> None: """ Build the input files for all cases. @@ -67,11 +63,7 @@ def build_cases( Parameters ---------- mode : str, optional - The mode to build the cases. Default is "all_combinations". - depth : np.ndarray, optional - The depth array. Default is None. - plants : np.ndarray, optional - The plants array. Default is None. + The mode to build the cases. Default is "one_by_one". Raises ------ @@ -93,7 +85,7 @@ def build_cases( if __name__ == "__main__": # Define the input parameters templates_dir = ( - "/home/tausiaj/GitHub-GeoOcean/BlueMath/bluemath_tk/wrappers/swash/templates/" + "C:/Users/UsuarioUC/Documents/BlueMath_tk/bluemath_tk/wrappers/swash/templates" ) templates_name = ["input.sws"] # Get 5 cases using LHS and MDA @@ -107,7 +99,7 @@ def build_cases( mda = MDA(num_centers=5) mda.fit(data=lhs_data) model_parameters = mda.centroids.to_dict(orient="list") - output_dir = "/home/tausiaj/GitHub-GeoOcean/BlueMath/test_cases/swash/" + output_dir = "C:/Users/UsuarioUC/Documents/BlueMath_tk/test_cases/swash/" # Create an instance of the SWASH model wrapper swan_model = VeggySwashModelWrapper( templates_dir=templates_dir, @@ -117,7 +109,7 @@ def build_cases( ) # Build the input files swan_model.build_cases(mode="one_by_one") - # Set the SWASH executable + # Set the SWASH executable (not used if docker is used) swan_model.set_swash_exec( "/home/tausiaj/GeoOcean-Execs/SWASH-10.05-Linux/bin/swashrun" ) diff --git a/bluemath_tk/wrappers/swash/swash_wrapper.py b/bluemath_tk/wrappers/swash/swash_wrapper.py index a0df65d..3dba69a 100644 --- a/bluemath_tk/wrappers/swash/swash_wrapper.py +++ b/bluemath_tk/wrappers/swash/swash_wrapper.py @@ -189,6 +189,8 @@ def run_model_with_docker( self, case_dir: str, docker_image: str = "tausiaj/swash-geoocean:10.05", + docker_out_logs: str = "docker_out.log", + docker_err_logs: str = "docker_err.log", ) -> None: """ Run the SWASH model for the specified case using Docker. @@ -198,11 +200,17 @@ def run_model_with_docker( case_dir : str The case directory. docker_image : str, optional - The Docker image. Default is "tausiaj/swash-image:latest". + The Docker image. Default is "tausiaj/swash-geoocean:10.05". + docker_out_logs : str, optional + The Docker output log file. Default is "docker_out.log". + docker_err_logs : str, optional + The Docker error log file. Default is "docker_err.log". """ # Construct the Docker command # TODO: Check why --rm flag is not removing the container after execution docker_cmd = f"docker run --rm -v {case_dir}:/case_dir -w /case_dir {docker_image} swashrun -input input.sws" # Execute the Docker command - self._exec_bash_commands(str_cmd=docker_cmd) + self._exec_bash_commands( + str_cmd=docker_cmd, out_file=docker_out_logs, err_file=docker_err_logs + ) diff --git a/docs/wrappers/intro.md b/docs/wrappers/intro.md index 7362b97..b398479 100644 --- a/docs/wrappers/intro.md +++ b/docs/wrappers/intro.md @@ -12,14 +12,26 @@ The [`BaseModelWrapper`](base_wrapper.md) class serves as the base class for all The [`SwashModelWrapper`](swash_wrapper.md) class is a specific implementation of the `BaseModelWrapper` for the SWASH model. It extends the base functionality to handle SWASH-specific requirements. -### Example Usage +### Example Usage (VeggySwashModelWrapper) + +To properly use wrappers, several bullet points must be understood: + +1. As shown in the example below, your model of interest base class, ``SwashModelWrapper` in this case, can be inherited to overwrite methods and build / run cases as needed. + +2. `build_case` method is essential to properly create the needed files to execute the model in each folder. In the example below, we copy a couple of files and then create an *waves* array that is written in another file. + +3. To **RUN** the cases, couple of methos are available: `run_cases` and `run_cases_with_scheduler`. In this section, we will focus on the `run_cases` method, for information regarding the `run_cases_with_scheduler` method, go [here](schedulers.md). +Then, the `run_cases` method allows the user to run the model for the different cases, directory by directory, as it is usually done. + +The method parameters are `launcher`, `script`, `params`, `parallel` and `cases_to_run`. Depending on the launcher, there might be some methods available, so please check if the launcher you want to use already has an implemented method. *If this is the case, overwrite this method in your class if you do not want to use the implemented version.* If your launcher has not an implemented method, you can either implement a method to be called, or use the parameters `launcher`, `script` and `params`, as the wrapper will execute `launcher params script` in each case directory. + +If no **launcher** is specified, wrapper will try to run the model locally, so the *executable* file must be set. Other way, wrapper should raise an Error. ```python import os import numpy as np from bluemath_tk.datamining.lhs import LHS from bluemath_tk.datamining.mda import MDA -from bluemath_tk.topo_bathy.profiles import linear from bluemath_tk.waves.series import series_TMA from bluemath_tk.wrappers.swash.swash_wrapper import SwashModelWrapper @@ -33,8 +45,6 @@ class VeggySwashModelWrapper(SwashModelWrapper): self, case_context: dict, case_dir: str, - depth: np.ndarray = None, - plants: np.ndarray = None, ) -> None: """ Build the input files for a case. @@ -51,16 +61,15 @@ class VeggySwashModelWrapper(SwashModelWrapper): The plants array. Default is None. """ - if depth is not None: - # Save the depth to a file - self.write_array_in_file( - array=depth, filename=os.path.join(case_dir, "depth.bot") - ) - if plants is not None: - # Save the plants to a file - self.write_array_in_file( - array=plants, filename=os.path.join(case_dir, "plants.txt") - ) + # Copy test depth and plants files + self.copy_files( + src="C:/Users/UsuarioUC/Documents/BlueMath_tk/test_data/swash-depth.bot", + dst=os.path.join(case_dir, "depth.bot"), + ) + self.copy_files( + src="C:/Users/UsuarioUC/Documents/BlueMath_tk/test_data/swash-plants.txt", + dst=os.path.join(case_dir, "plants.txt"), + ) # Build the input waves waves_dict = { "H": case_context["Hs"], @@ -72,7 +81,7 @@ class VeggySwashModelWrapper(SwashModelWrapper): "deltat": 1, "tendc": 1800, } - waves = series_TMA(waves=waves_dict, depth=depth[0]) + waves = series_TMA(waves=waves_dict, depth=10.0) # Save the waves to a file self.write_array_in_file( array=waves, filename=os.path.join(case_dir, "waves.bnd") @@ -81,8 +90,6 @@ class VeggySwashModelWrapper(SwashModelWrapper): def build_cases( self, mode: str = "all_combinations", - depth: np.ndarray = None, - plants: np.ndarray = None, ) -> None: """ Build the input files for all cases. @@ -109,8 +116,6 @@ class VeggySwashModelWrapper(SwashModelWrapper): self.build_case( case_context=case_context, case_dir=case_dir, - depth=depth, - plants=plants, ) @@ -118,7 +123,7 @@ class VeggySwashModelWrapper(SwashModelWrapper): if __name__ == "__main__": # Define the input parameters templates_dir = ( - "/home/tausiaj/GitHub-GeoOcean/BlueMath/bluemath_tk/wrappers/swash/templates/" + "C:/Users/UsuarioUC/Documents/BlueMath_tk/bluemath_tk/wrappers/swash/templates" ) templates_name = ["input.sws"] # Get 5 cases using LHS and MDA @@ -132,19 +137,7 @@ if __name__ == "__main__": mda = MDA(num_centers=5) mda.fit(data=lhs_data) model_parameters = mda.centroids.to_dict(orient="list") - output_dir = "/home/tausiaj/GitHub-GeoOcean/BlueMath/test_cases/swash/" - # Create the depth - """ - dx: bathymetry mesh resolution at x axes (m) - h0: offshore depth (m) - bCrest: beach heigh (m) - m: profile slope - Wfore: flume length before slope toe (m) - """ - linear_depth = linear(dx=0.05, h0=10, bCrest=5, m=1, Wfore=10) - # Create the plants - plants = np.zeros(linear_depth.size) - plants[(linear_depth < 1) & (linear_depth > 0)] = 1.0 + output_dir = "C:/Users/UsuarioUC/Documents/BlueMath_tk/test_cases/swash/" # Create an instance of the SWASH model wrapper swan_model = VeggySwashModelWrapper( templates_dir=templates_dir, @@ -153,11 +146,11 @@ if __name__ == "__main__": output_dir=output_dir, ) # Build the input files - swan_model.build_cases(mode="one_by_one", depth=linear_depth, plants=plants) - # Set the SWASH executable + swan_model.build_cases(mode="one_by_one") + # Set the SWASH executable (not used if docker is used) swan_model.set_swash_exec( "/home/tausiaj/GeoOcean-Execs/SWASH-10.05-Linux/bin/swashrun" ) # Run the model - swan_model.run_cases() + swan_model.run_cases(launcher="docker") ``` diff --git a/docs/wrappers/schedulers.md b/docs/wrappers/schedulers.md new file mode 100644 index 0000000..a42ff22 --- /dev/null +++ b/docs/wrappers/schedulers.md @@ -0,0 +1,3 @@ +# Schedulers documentation + +Under development... \ No newline at end of file