@@ -122,13 +122,16 @@ def __init__(self, plugin_args=None):
122122 non_daemon = self .plugin_args .get ('non_daemon' , True )
123123 maxtasks = self .plugin_args .get ('maxtasksperchild' , 10 )
124124 self .processors = self .plugin_args .get ('n_procs' , cpu_count ())
125- self .memory_gb = self .plugin_args .get ('memory_gb' , # Allocate 90% of system memory
126- get_system_total_memory_gb () * 0.9 )
127- self .raise_insufficient = self .plugin_args .get ('raise_insufficient' , True )
125+ self .memory_gb = self .plugin_args .get (
126+ 'memory_gb' , # Allocate 90% of system memory
127+ get_system_total_memory_gb () * 0.9 )
128+ self .raise_insufficient = self .plugin_args .get ('raise_insufficient' ,
129+ True )
128130
129131 # Instantiate different thread pools for non-daemon processes
130- logger .debug ('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f)' ,
131- 'non' * int (non_daemon ), self .processors , self .memory_gb )
132+ logger .debug ('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d,'
133+ 'mem_gb=%0.2f)' , 'non' * int (non_daemon ), self .processors ,
134+ self .memory_gb )
132135
133136 NipypePool = NonDaemonPool if non_daemon else Pool
134137 try :
@@ -205,12 +208,13 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
205208 Sends jobs to workers when system resources are available.
206209 """
207210
208- # Check to see if a job is available (jobs without dependencies not run)
211+ # Check to see if a job is available (jobs with all dependencies run)
209212 # See https://github.com/nipy/nipype/pull/2200#discussion_r141605722
210213 jobids = np .nonzero (~ self .proc_done & (self .depidx .sum (0 ) == 0 ))[1 ]
211214
212- # Check available system resources by summing all threads and memory used
213- free_memory_gb , free_processors = self ._check_resources (self .pending_tasks )
215+ # Check available resources by summing all threads and memory used
216+ free_memory_gb , free_processors = self ._check_resources (
217+ self .pending_tasks )
214218
215219 stats = (len (self .pending_tasks ), len (jobids ), free_memory_gb ,
216220 self .memory_gb , free_processors , self .processors )
@@ -229,7 +233,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
229233 'be submitted to the queue. Potential deadlock' )
230234 return
231235
232- jobids = self ._sort_jobs (jobids , scheduler = self .plugin_args .get ('scheduler' ))
236+ jobids = self ._sort_jobs (jobids ,
237+ scheduler = self .plugin_args .get ('scheduler' ))
233238
234239 # Run garbage collector before potentially submitting jobs
235240 gc .collect ()
@@ -265,9 +270,10 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
265270
266271 free_memory_gb -= next_job_gb
267272 free_processors -= next_job_th
268- logger .debug ('Allocating %s ID=%d (%0.2fGB, %d threads). Free: %0.2fGB, %d threads.' ,
269- self .procs [jobid ].fullname , jobid , next_job_gb , next_job_th ,
270- free_memory_gb , free_processors )
273+ logger .debug ('Allocating %s ID=%d (%0.2fGB, %d threads). Free: '
274+ '%0.2fGB, %d threads.' , self .procs [jobid ].fullname ,
275+ jobid , next_job_gb , next_job_th , free_memory_gb ,
276+ free_processors )
271277
272278 # change job status in appropriate queues
273279 self .proc_done [jobid ] = True
0 commit comments