Skip to content

Commit 960fb5a

Browse files
Multithreading apply_macros (MetOffice#225)
* add concurrency to writing * clean imports
1 parent 515e549 commit 960fb5a

1 file changed

Lines changed: 68 additions & 20 deletions

File tree

lfric_macros/apply_macros.py

Lines changed: 68 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import yaml
2222
import networkx as nx
2323
from collections import defaultdict
24+
from concurrent.futures import as_completed, ThreadPoolExecutor
2425
from pathlib import Path
2526

2627
BLACK_COMMAND = "black --line-length=80"
@@ -927,7 +928,16 @@ def order_meta_dirs(self) -> list[str]:
927928
# guaranteed for valid rose metadata
928929
return list(nx.topological_sort(import_graph))
929930

930-
def preprocess_macros(self) -> None:
931+
def write_macros(self, meta_dir: Path, full_command: str) -> None:
932+
"""
933+
Overarching function for writing processed macros to versions files
934+
"""
935+
936+
self.write_python_imports(meta_dir)
937+
self.write_new_macro(meta_dir, full_command, self.target_macros[meta_dir])
938+
self.sections_with_macro.append(meta_dir)
939+
940+
def preprocess_macros(self, nproc: int) -> None:
931941
"""
932942
Overarching function to pre-process added macros
933943
Run before running any rose macro upgrade commands"
@@ -991,6 +1001,7 @@ def preprocess_macros(self) -> None:
9911001
# Now reconstruct the macro for all applications which have the newly
9921002
# added macro or import metadata with the new macro
9931003
# The macro sections need to be processed in the order of import
1004+
macros_to_write = {}
9941005
for meta_dir in self.order_meta_dirs():
9951006
import_order = self.determine_import_order(meta_dir)
9961007
full_command = self.combine_macros(import_order)
@@ -1009,22 +1020,34 @@ def preprocess_macros(self) -> None:
10091020
if last_after_tag:
10101021
self.target_macros[meta_dir]["before_tag"] = last_after_tag
10111022

1023+
macros_to_write[meta_dir] = full_command
1024+
1025+
with ThreadPoolExecutor(max_workers=nproc) as executor:
1026+
meta_order = sorted(macros_to_write.keys())
1027+
write_tasks = [
1028+
executor.submit(self.write_macros, meta_dir, macros_to_write[meta_dir])
1029+
for meta_dir in meta_order
1030+
]
1031+
for task in as_completed(write_tasks):
1032+
exception = task.exception()
1033+
if exception is not None:
1034+
executor.shutdown(wait=False, cancel_futures=True)
1035+
raise exception
10121036
print(
1013-
"[INFO] Writing macros to",
1014-
self.parse_application_section(meta_dir),
1037+
"[INFO] Processed macro successfully written to "
1038+
f"{
1039+
self.parse_application_section(
1040+
meta_order[write_tasks.index(task)]
1041+
)
1042+
}"
10151043
)
1016-
self.write_python_imports(meta_dir)
1017-
self.write_new_macro(
1018-
meta_dir, full_command, self.target_macros[meta_dir]
1019-
)
1020-
self.sections_with_macro.append(meta_dir)
10211044

10221045
############################################################################
10231046
# Upgrade Apps Functions
10241047
############################################################################
10251048

10261049
def metadata_check(self, meta_dir: Path) -> None:
1027-
""" "
1050+
"""
10281051
Note: Not currently run - see comment below
10291052
Run rose metadata-check on rose metadata directories to check the
10301053
validity of the metadata.
@@ -1091,16 +1114,14 @@ def run_app_upgrade(self, app_path: Path) -> None:
10911114
- app_path, the path to this app
10921115
"""
10931116
app = app_path.name
1094-
print(f"[INFO] Upgrading the rose-stem app {app}")
10951117
command = f"rose app-upgrade -a -y -C {app_path} {self.tag}"
10961118
result = run_command(command)
10971119
if result.returncode:
1098-
print(f"[FAIL] The rose-stem app {app} failed to upgrade")
10991120
raise RuntimeError(
1121+
f"[FAIL] The rose-stem app {app} failed to upgrade\n\n"
11001122
f"\nThe command run:\n{command}"
11011123
f"\nThe error message produced:\n{result.stderr}"
11021124
)
1103-
print(f"[PASS] Upgraded rose-stem app {app} successfully")
11041125

11051126
def run_macro_fix(self, app_path: Path) -> None:
11061127
"""
@@ -1110,18 +1131,16 @@ def run_macro_fix(self, app_path: Path) -> None:
11101131
- app_path, the path to this app
11111132
"""
11121133
app = app_path.name
1113-
print(f"[INFO] Forcing metadata consistency in app {app}")
11141134
command = f"rose macro --fix -y -C {app_path}"
11151135
result = run_command(command)
11161136
if result.returncode:
1117-
print(f"[FAIL] Unable to force metadata consistency in {app}")
11181137
raise RuntimeError(
1138+
f"[FAIL] Unable to force metadata consistency in {app}\n\n"
11191139
f"\nThe command run:\n{command}"
11201140
f"\nThe error message produced:\n{result.stderr}"
11211141
)
1122-
print(f"[PASS] Successfully forced metadata consistency in {app}")
11231142

1124-
def upgrade_apps(self) -> None:
1143+
def upgrade_apps(self, nproc: int) -> None:
11251144
"""
11261145
Overarching function to run rose commands to apply upgrade macros to
11271146
rose-stem apps.
@@ -1147,10 +1166,36 @@ def upgrade_apps(self) -> None:
11471166
banner_print("[INFO] Upgrading Apps")
11481167
upgradeable_apps = self.apps_to_upgrade()
11491168
for app_path in upgradeable_apps:
1150-
self.run_app_upgrade(app_path)
1151-
self.run_macro_fix(app_path)
11521169
if self.core_source in app_path.parents:
11531170
self.upgraded_core = True
1171+
break
1172+
1173+
with ThreadPoolExecutor(max_workers=nproc) as executor:
1174+
upgrade_tasks = [
1175+
executor.submit(self.run_app_upgrade, app) for app in upgradeable_apps
1176+
]
1177+
for task in as_completed(upgrade_tasks):
1178+
exception = task.exception()
1179+
if exception is not None:
1180+
executor.shutdown(wait=False, cancel_futures=True)
1181+
raise exception
1182+
print(
1183+
"[PASS] Upgraded rose-stem app "
1184+
f"{upgradeable_apps[upgrade_tasks.index(task)].name} successfully"
1185+
)
1186+
1187+
fix_tasks = [
1188+
executor.submit(self.run_macro_fix, app) for app in upgradeable_apps
1189+
]
1190+
for task in as_completed(fix_tasks):
1191+
exception = task.exception()
1192+
if exception is not None:
1193+
executor.shutdown(wait=False, cancel_futures=True)
1194+
raise exception
1195+
print(
1196+
"[PASS] Successfully forced metadata consistency in "
1197+
f"{upgradeable_apps[fix_tasks.index(task)].name}"
1198+
)
11541199

11551200

11561201
def check_tag(opt: str | None) -> str | None:
@@ -1223,6 +1268,9 @@ def parse_args() -> argparse.Namespace:
12231268
"Either a path to a working copy or a git source."
12241269
"If not set, will be read from the dependencies.yaml",
12251270
)
1271+
parser.add_argument(
1272+
"-p", "--processes", type=int, default=4, help="Number of processes to use"
1273+
)
12261274
return parser.parse_args()
12271275

12281276

@@ -1243,10 +1291,10 @@ def apply_macros_main(
12431291

12441292
# Pre-process macros
12451293
banner_print("Pre-Processing Macros")
1246-
macro_object.preprocess_macros()
1294+
macro_object.preprocess_macros(args.processes)
12471295

12481296
# Upgrade Rose Stem Apps
1249-
macro_object.upgrade_apps()
1297+
macro_object.upgrade_apps(args.processes)
12501298

12511299
# Clean up temporary directories
12521300
for repo, directory in macro_object.temp_dirs.items():

0 commit comments

Comments
 (0)