 from multiprocessing import Process, Pool, cpu_count, pool
 from traceback import format_exception
 import sys
+import gc
 
 from copy import deepcopy
 import numpy as np
@@ -121,13 +122,16 @@ def __init__(self, plugin_args=None):
         non_daemon = self.plugin_args.get('non_daemon', True)
         maxtasks = self.plugin_args.get('maxtasksperchild', 10)
         self.processors = self.plugin_args.get('n_procs', cpu_count())
-        self.memory_gb = self.plugin_args.get('memory_gb',  # Allocate 90% of system memory
-                                              get_system_total_memory_gb() * 0.9)
-        self.raise_insufficient = self.plugin_args.get('raise_insufficient', True)
+        self.memory_gb = self.plugin_args.get(
+            'memory_gb',  # Allocate 90% of system memory
+            get_system_total_memory_gb() * 0.9)
+        self.raise_insufficient = self.plugin_args.get('raise_insufficient',
+                                                       True)
 
         # Instantiate different thread pools for non-daemon processes
-        logger.debug('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f)',
-                     'non' * int(non_daemon), self.processors, self.memory_gb)
+        logger.debug('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d, '
+                     'mem_gb=%0.2f)', 'non' * int(non_daemon), self.processors,
+                     self.memory_gb)
 
         NipypePool = NonDaemonPool if non_daemon else Pool
         try:
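
Note on `NonDaemonPool`: the non-daemon pool exists so that workers can themselves spawn child processes, which ordinary daemonic `Pool` workers are not allowed to do. Below is a minimal sketch of the underlying trick, using only the standard library; the class name and the usage are illustrative, not the plugin's exact implementation.

from multiprocessing import Process


class NonDaemonProcess(Process):
    """Worker process whose 'daemon' flag cannot be switched on.

    multiprocessing.Pool marks its workers as daemons, and daemonic
    processes are not allowed to have children; a no-op setter lets a
    pool worker fork grandchildren (e.g. interfaces calling external tools).
    """

    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass  # silently ignore Pool's attempt to set daemon=True


if __name__ == '__main__':
    proc = NonDaemonProcess(target=print, args=('non-daemon worker',))
    proc.daemon = True            # ignored by the no-op setter above
    assert proc.daemon is False
    proc.start()
    proc.join()

The plugin wires such a process class into a `multiprocessing.pool.Pool` subclass, and the `non_daemon` plugin argument above selects between that pool and the stock daemonic one.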
@@ -204,12 +208,13 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
         Sends jobs to workers when system resources are available.
         """
 
-        # Check to see if a job is available (jobs without dependencies not run)
+        # Check to see if a job is available (jobs with all dependencies run)
         # See https://github.com/nipy/nipype/pull/2200#discussion_r141605722
         jobids = np.nonzero(~self.proc_done & (self.depidx.sum(0) == 0))[1]
 
-        # Check available system resources by summing all threads and memory used
-        free_memory_gb, free_processors = self._check_resources(self.pending_tasks)
+        # Check available resources by summing all threads and memory used
+        free_memory_gb, free_processors = self._check_resources(
+            self.pending_tasks)
 
         stats = (len(self.pending_tasks), len(jobids), free_memory_gb,
                  self.memory_gb, free_processors, self.processors)
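
For orientation, `_check_resources` amounts to subtracting the estimated footprint of tasks still pending from the configured budget. A rough sketch of that bookkeeping, assuming illustrative attribute names (`mem_gb`, `n_threads`) rather than the plugin's actual internals:

def check_resources(pending_tasks, total_memory_gb, total_processors):
    """Return (free_memory_gb, free_processors) for the pending workload."""
    busy_memory_gb = sum(task.mem_gb for task in pending_tasks)      # GB already claimed
    busy_processors = sum(task.n_threads for task in pending_tasks)  # threads already claimed
    return (total_memory_gb - busy_memory_gb,
            total_processors - busy_processors)

The `stats` tuple built right after pairs these free values with the configured totals so the scheduler can log utilization on each pass.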
@@ -228,7 +233,11 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                          'be submitted to the queue. Potential deadlock')
             return
 
-        jobids = self._sort_jobs(jobids, scheduler=self.plugin_args.get('scheduler'))
+        jobids = self._sort_jobs(jobids,
+                                 scheduler=self.plugin_args.get('scheduler'))
+
+        # Run garbage collector before potentially submitting jobs
+        gc.collect()
 
         # Submit jobs
         for jobid in jobids:
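
The new `gc.collect()` call forces a full collection pass before jobs are sized against the free-memory estimate, so memory held only by reference cycles is released first. This is plain standard-library usage:

import gc

# Collect all generations; the return value is the number of unreachable
# objects found, which can be handy when chasing memory pressure.
n_unreachable = gc.collect()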
@@ -261,9 +270,10 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
 
             free_memory_gb -= next_job_gb
             free_processors -= next_job_th
-            logger.debug('Allocating %s ID=%d (%0.2fGB, %d threads). Free: %0.2fGB, %d threads.',
-                         self.procs[jobid].fullname, jobid, next_job_gb, next_job_th,
-                         free_memory_gb, free_processors)
+            logger.debug('Allocating %s ID=%d (%0.2fGB, %d threads). Free: '
+                         '%0.2fGB, %d threads.', self.procs[jobid].fullname,
+                         jobid, next_job_gb, next_job_th, free_memory_gb,
+                         free_processors)
 
             # change job status in appropriate queues
             self.proc_done[jobid] = True
@@ -292,6 +302,9 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                 free_processors += next_job_th
                 # Display stats next loop
                 self._stats = None
+
+                # Clean up any debris from running node in main process
+                gc.collect()
                 continue
 
             # Task should be submitted to workers