diff --git a/2-parsl-advanced-features.ipynb b/2-parsl-advanced-features.ipynb index bfbbd22..3d28aca 100644 --- a/2-parsl-advanced-features.ipynb +++ b/2-parsl-advanced-features.ipynb @@ -35,7 +35,6 @@ "from parsl.executors.threads import ThreadPoolExecutor\n", "from parsl.executors import HighThroughputExecutor\n", "from parsl.providers import LocalProvider\n", - "from parsl.channels import LocalChannel\n", "\n", "# Define a configuration for using local threads and pilot jobs\n", "multi_site_config = Config(\n", @@ -47,9 +46,8 @@ " HighThroughputExecutor(\n", " label=\"local_htex\",\n", " worker_debug=True,\n", - " max_workers=1,\n", + " max_workers_per_node=1,\n", " provider=LocalProvider(\n", - " channel=LocalChannel(),\n", " init_blocks=1,\n", " max_blocks=1,\n", " ),\n", @@ -135,7 +133,7 @@ "\n", "As a Parsl script is evaluated, it creates a collection of tasks for asynchronous execution. In most cases this stream of tasks is variable as different stages of the workflow are evaluated. To address this variability, Parsl is able to monitor the flow of tasks and elastically provision resources, within user specified bounds, in response. \n", "\n", - "In the following example, we declare the range of blocks to be provisioned from 0 to 2 (minBlocks and maxBlocks, respectively). We then set parallelism to 0.1, which means that Parsl will favor reusing resources rather than provisioning new resources. You should see that the app is executed on one process IDs. Note: we restrict Parsl to using one worker per block (max_workers=1)." + "In the following example, we declare the range of blocks to be provisioned from 0 to 2 (minBlocks and maxBlocks, respectively). We then set parallelism to 0.1, which means that Parsl will favor reusing resources rather than provisioning new resources. You should see that the app is executed on one process IDs. Note: we restrict Parsl to using one worker per block (max_workers_per_node=1)." ] }, { @@ -147,7 +145,6 @@ "import parsl\n", "from parsl.app.app import python_app, bash_app\n", "from parsl.providers import LocalProvider\n", - "from parsl.channels import LocalChannel\n", "from parsl.config import Config\n", "from parsl.executors import HighThroughputExecutor\n", "\n", @@ -155,9 +152,8 @@ " executors=[\n", " HighThroughputExecutor(\n", " label=\"local_htex\",\n", - " max_workers=1,\n", + " max_workers_per_node=1,\n", " provider=LocalProvider(\n", - " channel=LocalChannel(),\n", " init_blocks=1,\n", " max_blocks=2,\n", " parallelism=0.1,\n", @@ -202,7 +198,6 @@ "import parsl\n", "from parsl.app.app import python_app, bash_app\n", "from parsl.providers import LocalProvider\n", - "from parsl.channels import LocalChannel\n", "from parsl.config import Config\n", "from parsl.executors import HighThroughputExecutor\n", "\n", @@ -210,9 +205,8 @@ " executors=[\n", " HighThroughputExecutor(\n", " label=\"local_htex\",\n", - " max_workers=1,\n", + " max_workers_per_node=1,\n", " provider=LocalProvider(\n", - " channel=LocalChannel(),\n", " init_blocks=1,\n", " max_blocks=2,\n", " parallelism=1,\n", @@ -267,7 +261,6 @@ "import parsl\n", "from parsl.app.app import python_app, bash_app\n", "from parsl.providers import LocalProvider\n", - "from parsl.channels import LocalChannel\n", "from parsl.config import Config\n", "from parsl.executors import HighThroughputExecutor\n", "\n", @@ -276,9 +269,8 @@ " HighThroughputExecutor(\n", " label=\"htex_Local\",\n", " worker_debug=True,\n", - " max_workers=1,\n", + " max_workers_per_node=1,\n", " provider=LocalProvider(\n", - " channel=LocalChannel(),\n", " init_blocks=1,\n", " max_blocks=1,\n", " )\n", @@ -323,7 +315,7 @@ "\n", "Checkpointing uses App caching to store results. Thus, the same caveats apply to non-deterministic functions. That is, the checkpoint saves results for an instance of an App when it has the same name, arguments, and function body. \n", "\n", - "In this example we demonstrate how to automatically checkpoint workflows when tasks succesfully execute. This is enabled in the config by setting `checkpointMode` to `task_exit`. Other checkpointing models are described in the [checkpointing documentation](https://parsl.readthedocs.io/en/latest/userguide/checkpoints.html).\n" + "In this example we demonstrate how to automatically checkpoint workflows when tasks succesfully execute. This is enabled by creating a `BasicMemoizer` with `checkpoint_mode='task_exit'` and passing it to `Config(memoizer=...)`. Other checkpointing models are described in the [checkpointing documentation](https://parsl.readthedocs.io/en/latest/userguide/checkpoints.html).\n" ] }, { @@ -335,23 +327,22 @@ "import parsl\n", "from parsl.app.app import python_app, bash_app\n", "from parsl.providers import LocalProvider\n", - "from parsl.channels import LocalChannel\n", "from parsl.config import Config\n", "from parsl.executors import HighThroughputExecutor\n", + "from parsl.dataflow.memoization import BasicMemoizer\n", "\n", "local_htex = Config(\n", " executors=[\n", " HighThroughputExecutor(\n", " label=\"local_htex\",\n", - " max_workers=1,\n", + " max_workers_per_node=1,\n", " provider=LocalProvider(\n", - " channel=LocalChannel(),\n", " init_blocks=1,\n", " max_blocks=1,\n", " )\n", " )\n", " ],\n", - " checkpoint_mode='task_exit',\n", + " memoizer=BasicMemoizer(checkpoint_mode='task_exit'),\n", ")\n", "\n", "dfk = parsl.load(local_htex)\n", @@ -376,7 +367,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "To restart from a previous checkpoint the DFK must be configured with the appropriate checkpoint file. In most cases this is likley to be the most recent checkpoint file created. The following approach works with any checkpoint file, irrespective of which checkpointing method was used to create it. \n", + "To restart from a previous checkpoint the DFK must be configured with the appropriate checkpoint file via the memoizer. In most cases this is likley to be the most recent checkpoint file created. The following approach works with any checkpoint file, irrespective of which checkpointing method was used to create it. \n", "\n", "In this example we reload the most recent checkpoint and attempt to run the same workflow. The results return immediately as there is no need to rexecute each app. " ] @@ -389,20 +380,20 @@ "source": [ "from parsl.utils import get_all_checkpoints\n", "from parsl import set_stream_logger\n", + "from parsl.dataflow.memoization import BasicMemoizer\n", "\n", "local_htex = Config(\n", " executors=[\n", " HighThroughputExecutor(\n", " label=\"local_htex\",\n", - " max_workers=1,\n", + " max_workers_per_node=1,\n", " provider=LocalProvider(\n", - " channel=LocalChannel(),\n", " init_blocks=1,\n", " max_blocks=1,\n", " )\n", " )\n", " ],\n", - " checkpoint_files = get_all_checkpoints(),\n", + " memoizer=BasicMemoizer(checkpoint_files=get_all_checkpoints()),\n", ")\n", "\n", "parsl.load(local_htex)\n", @@ -535,7 +526,9 @@ "\n", "To enable monitoring you must install Parsl with the monitoring module and add the monitoring hub to the configuration.\n", "\n", - " $ pip install parsl[monitoring]\n", + " $ pip install parsl[monitoring] pandas plotly nbformat\n", + "\n", + "We use pandas and plotly for data visualization in the following walkthrough.\n", "\n", "Note: in this example we set the resource monitoring interval to 3 seconds so that we can capture resource information from short running tasks. In practice you will likely use a longer interval." ] @@ -555,18 +548,14 @@ "from parsl.executors import HighThroughputExecutor\n", "from parsl.providers import LocalProvider\n", "from parsl.addresses import address_by_hostname\n", - "from parsl.channels import LocalChannel\n", - "\n", - "import logging\n", "\n", "config = Config(\n", " executors=[\n", " HighThroughputExecutor(\n", " label=\"local_htex\",\n", " address=address_by_hostname(),\n", - " max_workers=1,\n", + " max_workers_per_node=1,\n", " provider=LocalProvider(\n", - " channel=LocalChannel(),\n", " init_blocks=1,\n", " max_blocks=1,\n", " ),\n", @@ -622,7 +611,7 @@ "import sqlite3\n", "import pandas as pd\n", "\n", - "conn = sqlite3.connect('monitoring.db')" + "conn = sqlite3.connect('runinfo/monitoring.db')" ] }, { @@ -732,13 +721,18 @@ " 'Resource': last_status['task_status_name']\n", " }\n", " parsl_tasks.extend([last_status_bar])\n", - "colors = {'pending': 'rgb(168, 168, 168)',\n", - " 'launched': 'rgb(100, 255, 255)',\n", - " 'running': 'rgb(0, 0, 255)',\n", - " 'exec_done': 'rgb(0, 200, 0)',\n", - " 'done': 'rgb(0, 200, 0)',\n", - " 'failed': 'rgb(255, 100, 100)',\n", - " }\n", + "colors = {\n", + " \"pending\": 'rgb(168, 168, 168)',\n", + " \"launched\": 'rgb(100, 255, 255)',\n", + " \"running\": 'rgb(0, 0, 255)',\n", + " \"dep_fail\": 'rgb(255, 128, 255)',\n", + " \"failed\": 'rgb(200, 0, 0)',\n", + " \"exec_done\": 'rgb(0, 200, 0)',\n", + " \"memo_done\": 'rgb(64, 200, 64)',\n", + " \"fail_retryable\": 'rgb(200, 128, 128)',\n", + " \"joining\": 'rgb(128, 128, 255)',\n", + " \"running_ended\": 'rgb(64, 64, 255)',\n", + "}\n", "fig = ff.create_gantt(parsl_tasks,\n", " title=\"\",\n", " colors=colors,\n", diff --git a/3-parsl-workflows.ipynb b/3-parsl-workflows.ipynb index 4ab538a..f5a3d6b 100644 --- a/3-parsl-workflows.ipynb +++ b/3-parsl-workflows.ipynb @@ -344,7 +344,6 @@ "from parsl.data_provider.files import File\n", "import os\n", "from parsl.config import Config\n", - "from parsl.channels import LocalChannel\n", "from parsl.providers import SlurmProvider\n", "from parsl.executors import HighThroughputExecutor\n", "from parsl.launchers import SrunLauncher\n", @@ -358,10 +357,9 @@ " HighThroughputExecutor(\n", " label=\"frontera_htex\",\n", " address=address_by_hostname(),\n", - " max_workers=56,\n", + " max_workers_per_node=56,\n", " provider=SlurmProvider(\n", " cmd_timeout=60, \n", - " channel=LocalChannel(),\n", " nodes_per_block=1,\n", " partition='development',\n", " scheduler_options='#SBATCH -A ALLOCATION', # Enter allocation\n",