Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 30 additions & 36 deletions 2-parsl-advanced-features.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
"from parsl.executors.threads import ThreadPoolExecutor\n",
"from parsl.executors import HighThroughputExecutor\n",
"from parsl.providers import LocalProvider\n",
"from parsl.channels import LocalChannel\n",
"\n",
"# Define a configuration for using local threads and pilot jobs\n",
"multi_site_config = Config(\n",
Expand All @@ -47,9 +46,8 @@
" HighThroughputExecutor(\n",
" label=\"local_htex\",\n",
" worker_debug=True,\n",
" max_workers=1,\n",
" max_workers_per_node=1,\n",
" provider=LocalProvider(\n",
" channel=LocalChannel(),\n",
" init_blocks=1,\n",
" max_blocks=1,\n",
" ),\n",
Expand Down Expand Up @@ -135,7 +133,7 @@
"\n",
"As a Parsl script is evaluated, it creates a collection of tasks for asynchronous execution. In most cases this stream of tasks is variable as different stages of the workflow are evaluated. To address this variability, Parsl is able to monitor the flow of tasks and elastically provision resources, within user specified bounds, in response. \n",
"\n",
"In the following example, we declare the range of blocks to be provisioned from 0 to 2 (min_blocks and max_blocks, respectively). We then set parallelism to 0.1, which means that Parsl will favor reusing resources rather than provisioning new resources. You should see that the app is executed on a single process ID. Note: we restrict Parsl to using one worker per block (max_workers=1)."
"In the following example, we declare the range of blocks to be provisioned from 0 to 2 (min_blocks and max_blocks, respectively). We then set parallelism to 0.1, which means that Parsl will favor reusing resources rather than provisioning new resources. You should see that the app is executed on a single process ID. Note: we restrict Parsl to using one worker per block (max_workers_per_node=1)."
]
},
{
Expand All @@ -147,17 +145,15 @@
"import parsl\n",
"from parsl.app.app import python_app, bash_app\n",
"from parsl.providers import LocalProvider\n",
"from parsl.channels import LocalChannel\n",
"from parsl.config import Config\n",
"from parsl.executors import HighThroughputExecutor\n",
"\n",
"local_htex = Config(\n",
" executors=[\n",
" HighThroughputExecutor(\n",
" label=\"local_htex\",\n",
" max_workers=1,\n",
" max_workers_per_node=1,\n",
" provider=LocalProvider(\n",
" channel=LocalChannel(),\n",
" init_blocks=1,\n",
" max_blocks=2,\n",
" parallelism=0.1,\n",
Expand Down Expand Up @@ -202,17 +198,15 @@
"import parsl\n",
"from parsl.app.app import python_app, bash_app\n",
"from parsl.providers import LocalProvider\n",
"from parsl.channels import LocalChannel\n",
"from parsl.config import Config\n",
"from parsl.executors import HighThroughputExecutor\n",
"\n",
"local_htex = Config(\n",
" executors=[\n",
" HighThroughputExecutor(\n",
" label=\"local_htex\",\n",
" max_workers=1,\n",
" max_workers_per_node=1,\n",
" provider=LocalProvider(\n",
" channel=LocalChannel(),\n",
" init_blocks=1,\n",
" max_blocks=2,\n",
" parallelism=1,\n",
Expand Down Expand Up @@ -267,7 +261,6 @@
"import parsl\n",
"from parsl.app.app import python_app, bash_app\n",
"from parsl.providers import LocalProvider\n",
"from parsl.channels import LocalChannel\n",
"from parsl.config import Config\n",
"from parsl.executors import HighThroughputExecutor\n",
"\n",
Expand All @@ -276,9 +269,8 @@
" HighThroughputExecutor(\n",
" label=\"htex_Local\",\n",
" worker_debug=True,\n",
" max_workers=1,\n",
" max_workers_per_node=1,\n",
" provider=LocalProvider(\n",
" channel=LocalChannel(),\n",
" init_blocks=1,\n",
" max_blocks=1,\n",
" )\n",
Expand Down Expand Up @@ -323,7 +315,7 @@
"\n",
"Checkpointing uses App caching to store results. Thus, the same caveats apply to non-deterministic functions. That is, the checkpoint saves results for an instance of an App when it has the same name, arguments, and function body. \n",
"\n",
"In this example we demonstrate how to automatically checkpoint workflows when tasks successfully execute. This is enabled in the config by setting `checkpointMode` to `task_exit`. Other checkpointing models are described in the [checkpointing documentation](https://parsl.readthedocs.io/en/latest/userguide/checkpoints.html).\n"
"In this example we demonstrate how to automatically checkpoint workflows when tasks successfully execute. This is enabled by creating a `BasicMemoizer` with `checkpoint_mode='task_exit'` and passing it to `Config(memoizer=...)`. Other checkpointing models are described in the [checkpointing documentation](https://parsl.readthedocs.io/en/latest/userguide/checkpoints.html).\n"
]
},
{
Expand All @@ -335,23 +327,22 @@
"import parsl\n",
"from parsl.app.app import python_app, bash_app\n",
"from parsl.providers import LocalProvider\n",
"from parsl.channels import LocalChannel\n",
"from parsl.config import Config\n",
"from parsl.executors import HighThroughputExecutor\n",
"from parsl.dataflow.memoization import BasicMemoizer\n",
"\n",
"local_htex = Config(\n",
" executors=[\n",
" HighThroughputExecutor(\n",
" label=\"local_htex\",\n",
" max_workers=1,\n",
" max_workers_per_node=1,\n",
" provider=LocalProvider(\n",
" channel=LocalChannel(),\n",
" init_blocks=1,\n",
" max_blocks=1,\n",
" )\n",
" )\n",
" ],\n",
" checkpoint_mode='task_exit',\n",
" memoizer=BasicMemoizer(checkpoint_mode='task_exit'),\n",
")\n",
"\n",
"dfk = parsl.load(local_htex)\n",
Expand All @@ -376,7 +367,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"To restart from a previous checkpoint the DFK must be configured with the appropriate checkpoint file. In most cases this is likely to be the most recent checkpoint file created. The following approach works with any checkpoint file, irrespective of which checkpointing method was used to create it. \n",
"To restart from a previous checkpoint the DFK must be configured with the appropriate checkpoint file via the memoizer. In most cases this is likely to be the most recent checkpoint file created. The following approach works with any checkpoint file, irrespective of which checkpointing method was used to create it. \n",
"\n",
"In this example we reload the most recent checkpoint and attempt to run the same workflow. The results return immediately as there is no need to re-execute each app. "
]
Expand All @@ -389,20 +380,20 @@
"source": [
"from parsl.utils import get_all_checkpoints\n",
"from parsl import set_stream_logger\n",
"from parsl.dataflow.memoization import BasicMemoizer\n",
"\n",
"local_htex = Config(\n",
" executors=[\n",
" HighThroughputExecutor(\n",
" label=\"local_htex\",\n",
" max_workers=1,\n",
" max_workers_per_node=1,\n",
" provider=LocalProvider(\n",
" channel=LocalChannel(),\n",
" init_blocks=1,\n",
" max_blocks=1,\n",
" )\n",
" )\n",
" ],\n",
" checkpoint_files = get_all_checkpoints(),\n",
" memoizer=BasicMemoizer(checkpoint_files=get_all_checkpoints()),\n",
")\n",
"\n",
"parsl.load(local_htex)\n",
Expand Down Expand Up @@ -535,7 +526,9 @@
"\n",
"To enable monitoring you must install Parsl with the monitoring module and add the monitoring hub to the configuration.\n",
"\n",
" $ pip install parsl[monitoring]\n",
" $ pip install parsl[monitoring] pandas plotly nbformat\n",
"\n",
"We use pandas and plotly for data visualization in the following walkthrough.\n",
"\n",
"Note: in this example we set the resource monitoring interval to 3 seconds so that we can capture resource information from short-running tasks. In practice you will likely use a longer interval."
]
Expand All @@ -555,18 +548,14 @@
"from parsl.executors import HighThroughputExecutor\n",
"from parsl.providers import LocalProvider\n",
"from parsl.addresses import address_by_hostname\n",
"from parsl.channels import LocalChannel\n",
"\n",
"import logging\n",
"\n",
"config = Config(\n",
" executors=[\n",
" HighThroughputExecutor(\n",
" label=\"local_htex\",\n",
" address=address_by_hostname(),\n",
" max_workers=1,\n",
" max_workers_per_node=1,\n",
" provider=LocalProvider(\n",
" channel=LocalChannel(),\n",
" init_blocks=1,\n",
" max_blocks=1,\n",
" ),\n",
Expand Down Expand Up @@ -622,7 +611,7 @@
"import sqlite3\n",
"import pandas as pd\n",
"\n",
"conn = sqlite3.connect('monitoring.db')"
"conn = sqlite3.connect('runinfo/monitoring.db')"
]
},
{
Expand Down Expand Up @@ -732,13 +721,18 @@
" 'Resource': last_status['task_status_name']\n",
" }\n",
" parsl_tasks.extend([last_status_bar])\n",
"colors = {'pending': 'rgb(168, 168, 168)',\n",
" 'launched': 'rgb(100, 255, 255)',\n",
" 'running': 'rgb(0, 0, 255)',\n",
" 'exec_done': 'rgb(0, 200, 0)',\n",
" 'done': 'rgb(0, 200, 0)',\n",
" 'failed': 'rgb(255, 100, 100)',\n",
" }\n",
"colors = {\n",
" \"pending\": 'rgb(168, 168, 168)',\n",
" \"launched\": 'rgb(100, 255, 255)',\n",
" \"running\": 'rgb(0, 0, 255)',\n",
" \"dep_fail\": 'rgb(255, 128, 255)',\n",
" \"failed\": 'rgb(200, 0, 0)',\n",
" \"exec_done\": 'rgb(0, 200, 0)',\n",
" \"memo_done\": 'rgb(64, 200, 64)',\n",
" \"fail_retryable\": 'rgb(200, 128, 128)',\n",
" \"joining\": 'rgb(128, 128, 255)',\n",
" \"running_ended\": 'rgb(64, 64, 255)',\n",
"}\n",
"fig = ff.create_gantt(parsl_tasks,\n",
" title=\"\",\n",
" colors=colors,\n",
Expand Down
4 changes: 1 addition & 3 deletions 3-parsl-workflows.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@
"from parsl.data_provider.files import File\n",
"import os\n",
"from parsl.config import Config\n",
"from parsl.channels import LocalChannel\n",
"from parsl.providers import SlurmProvider\n",
"from parsl.executors import HighThroughputExecutor\n",
"from parsl.launchers import SrunLauncher\n",
Expand All @@ -358,10 +357,9 @@
" HighThroughputExecutor(\n",
" label=\"frontera_htex\",\n",
" address=address_by_hostname(),\n",
" max_workers=56,\n",
" max_workers_per_node=56,\n",
" provider=SlurmProvider(\n",
" cmd_timeout=60, \n",
" channel=LocalChannel(),\n",
" nodes_per_block=1,\n",
" partition='development',\n",
" scheduler_options='#SBATCH -A ALLOCATION', # Enter allocation\n",
Expand Down