|
12 | 12 | from multiprocessing import Process, Pool, cpu_count, pool |
13 | 13 | from traceback import format_exception |
14 | 14 | import sys |
| 15 | +from textwrap import indent |
| 16 | +from logging import INFO |
15 | 17 |
|
16 | 18 | from copy import deepcopy |
17 | 19 | import numpy as np |
18 | | - |
19 | 20 | from ... import logging |
20 | 21 | from ...utils.profiler import get_system_total_memory_gb |
21 | 22 | from ..engine import MapNode |
@@ -126,7 +127,7 @@ def __init__(self, plugin_args=None): |
126 | 127 | self.raise_insufficient = self.plugin_args.get('raise_insufficient', True) |
127 | 128 |
|
128 | 129 | # Instantiate different thread pools for non-daemon processes |
129 | | - logger.debug('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f)', |
| 130 | + logger.debug('[MultiProc] Starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f)', |
130 | 131 | 'non' * int(non_daemon), self.processors, self.memory_gb) |
131 | 132 |
|
132 | 133 | NipypePool = NonDaemonPool if non_daemon else Pool |
@@ -158,7 +159,7 @@ def _submit_job(self, node, updatehash=False): |
158 | 159 | run_node, (node, updatehash, self._taskid), |
159 | 160 | callback=self._async_callback) |
160 | 161 |
|
161 | | - logger.debug('MultiProc submitted task %s (taskid=%d).', |
| 162 | + logger.debug('[MultiProc] Submitted task %s (taskid=%d).', |
162 | 163 | node.fullname, self._taskid) |
163 | 164 | return self._taskid |
164 | 165 |
|
@@ -214,9 +215,17 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): |
214 | 215 | stats = (len(self.pending_tasks), len(jobids), free_memory_gb, |
215 | 216 | self.memory_gb, free_processors, self.processors) |
216 | 217 | if self._stats != stats: |
217 | | - logger.info('Currently running %d tasks, and %d jobs ready. Free ' |
218 | | - 'memory (GB): %0.2f/%0.2f, Free processors: %d/%d', |
219 | | - *stats) |
| 218 | + tasks_list_msg = '' |
| 219 | + if logger.level <= INFO: |
| 220 | + running_tasks = [' * %s' % self.procs[jobid].fullname |
| 221 | + for _, jobid in self.pending_tasks] |
| 222 | + if running_tasks: |
| 223 | + tasks_list_msg = '\nCurrently running:\n' |
| 224 | + tasks_list_msg += '\n'.join(running_tasks) |
| 225 | + tasks_list_msg = indent(tasks_list_msg, ' ' * 21) |
| 226 | + logger.info('[MultiProc] Running %d tasks, and %d jobs ready. Free ' |
| 227 | + 'memory (GB): %0.2f/%0.2f, Free processors: %d/%d.%s', |
| 228 | + *stats, tasks_list_msg) |
220 | 229 | self._stats = stats |
221 | 230 |
|
222 | 231 | if free_memory_gb < 0.01 or free_processors == 0: |
|
0 commit comments