|
10 | 10 | import os |
11 | 11 | import sys |
12 | 12 | from copy import deepcopy |
13 | | -from shutil import rmtree |
14 | 13 | import pytest |
15 | 14 |
|
16 | 15 | from ... import engine as pe |
17 | 16 | from ....interfaces import base as nib |
18 | 17 | from ....interfaces import utility as niu |
19 | 18 | from .... import config |
20 | | -from ..utils import merge_dict, clean_working_directory, write_workflow_prov |
| 19 | +from ..utils import clean_working_directory, write_workflow_prov |
| 20 | + |
| 21 | + |
| 22 | +class InputSpec(nib.TraitedSpec): |
| 23 | + in_file = nib.File(exists=True, copyfile=True) |
| 24 | + |
| 25 | + |
| 26 | +class OutputSpec(nib.TraitedSpec): |
| 27 | + output1 = nib.traits.List(nib.traits.Int, desc='outputs') |
| 28 | + |
| 29 | + |
| 30 | +class UtilsTestInterface(nib.BaseInterface): |
| 31 | + input_spec = InputSpec |
| 32 | + output_spec = OutputSpec |
| 33 | + |
| 34 | + def _run_interface(self, runtime): |
| 35 | + runtime.returncode = 0 |
| 36 | + return runtime |
| 37 | + |
| 38 | + def _list_outputs(self): |
| 39 | + outputs = self._outputs().get() |
| 40 | + outputs['output1'] = [1] |
| 41 | + return outputs |
21 | 42 |
|
22 | 43 |
|
23 | 44 | def test_identitynode_removal(tmpdir): |
@@ -99,204 +120,11 @@ class InputSpec(nib.TraitedSpec): |
99 | 120 | config.set_default_config() |
100 | 121 |
|
101 | 122 |
|
102 | | -def test_outputs_removal(tmpdir): |
103 | | - def test_function(arg1): |
104 | | - import os |
105 | | - file1 = os.path.join(os.getcwd(), 'file1.txt') |
106 | | - file2 = os.path.join(os.getcwd(), 'file2.txt') |
107 | | - fp = open(file1, 'wt') |
108 | | - fp.write('%d' % arg1) |
109 | | - fp.close() |
110 | | - fp = open(file2, 'wt') |
111 | | - fp.write('%d' % arg1) |
112 | | - fp.close() |
113 | | - return file1, file2 |
114 | | - |
115 | | - n1 = pe.Node( |
116 | | - niu.Function( |
117 | | - input_names=['arg1'], |
118 | | - output_names=['file1', 'file2'], |
119 | | - function=test_function), |
120 | | - base_dir=tmpdir.strpath, |
121 | | - name='testoutputs') |
122 | | - n1.inputs.arg1 = 1 |
123 | | - n1.config = {'execution': {'remove_unnecessary_outputs': True}} |
124 | | - n1.config = merge_dict(deepcopy(config._sections), n1.config) |
125 | | - n1.run() |
126 | | - assert tmpdir.join(n1.name, 'file1.txt').check() |
127 | | - assert tmpdir.join(n1.name, 'file1.txt').check() |
128 | | - n1.needed_outputs = ['file2'] |
129 | | - n1.run() |
130 | | - assert not tmpdir.join(n1.name, 'file1.txt').check() |
131 | | - assert tmpdir.join(n1.name, 'file2.txt').check() |
132 | | - |
133 | | - |
134 | | -class InputSpec(nib.TraitedSpec): |
135 | | - in_file = nib.File(exists=True, copyfile=True) |
136 | | - |
137 | | - |
138 | | -class OutputSpec(nib.TraitedSpec): |
139 | | - output1 = nib.traits.List(nib.traits.Int, desc='outputs') |
140 | | - |
141 | | - |
142 | | -class UtilsTestInterface(nib.BaseInterface): |
143 | | - input_spec = InputSpec |
144 | | - output_spec = OutputSpec |
145 | | - |
146 | | - def _run_interface(self, runtime): |
147 | | - runtime.returncode = 0 |
148 | | - return runtime |
149 | | - |
150 | | - def _list_outputs(self): |
151 | | - outputs = self._outputs().get() |
152 | | - outputs['output1'] = [1] |
153 | | - return outputs |
154 | | - |
155 | | - |
156 | | -def test_inputs_removal(tmpdir): |
157 | | - file1 = tmpdir.join('file1.txt') |
158 | | - file1.write('dummy_file') |
159 | | - n1 = pe.Node( |
160 | | - UtilsTestInterface(), base_dir=tmpdir.strpath, name='testinputs') |
161 | | - n1.inputs.in_file = file1.strpath |
162 | | - n1.config = {'execution': {'keep_inputs': True}} |
163 | | - n1.config = merge_dict(deepcopy(config._sections), n1.config) |
164 | | - n1.run() |
165 | | - assert tmpdir.join(n1.name, 'file1.txt').check() |
166 | | - n1.inputs.in_file = file1.strpath |
167 | | - n1.config = {'execution': {'keep_inputs': False}} |
168 | | - n1.config = merge_dict(deepcopy(config._sections), n1.config) |
169 | | - n1.overwrite = True |
170 | | - n1.run() |
171 | | - assert not tmpdir.join(n1.name, 'file1.txt').check() |
172 | | - |
173 | | - |
174 | | -def test_outputs_removal_wf(tmpdir): |
175 | | - def test_function(arg1): |
176 | | - import os |
177 | | - file1 = os.path.join(os.getcwd(), 'file1.txt') |
178 | | - file2 = os.path.join(os.getcwd(), 'file2.txt') |
179 | | - file3 = os.path.join(os.getcwd(), 'file3.txt') |
180 | | - file4 = os.path.join(os.getcwd(), 'subdir', 'file1.txt') |
181 | | - files = [file1, file2, file3, file4] |
182 | | - os.mkdir("subdir") |
183 | | - for filename in files: |
184 | | - with open(filename, 'wt') as fp: |
185 | | - fp.write('%d' % arg1) |
186 | | - return file1, file2, os.path.join(os.getcwd(), "subdir") |
187 | | - |
188 | | - def test_function2(in_file, arg): |
189 | | - import os |
190 | | - in_arg = open(in_file).read() |
191 | | - file1 = os.path.join(os.getcwd(), 'file1.txt') |
192 | | - file2 = os.path.join(os.getcwd(), 'file2.txt') |
193 | | - file3 = os.path.join(os.getcwd(), 'file3.txt') |
194 | | - files = [file1, file2, file3] |
195 | | - for filename in files: |
196 | | - with open(filename, 'wt') as fp: |
197 | | - fp.write('%d' % arg + in_arg) |
198 | | - return file1, file2, 1 |
199 | | - |
200 | | - def test_function3(arg): |
201 | | - import os |
202 | | - return arg |
203 | | - |
204 | | - for plugin in ('Linear', ): # , 'MultiProc'): |
205 | | - n1 = pe.Node( |
206 | | - niu.Function( |
207 | | - input_names=['arg1'], |
208 | | - output_names=['out_file1', 'out_file2', 'dir'], |
209 | | - function=test_function), |
210 | | - name='n1', |
211 | | - base_dir=tmpdir.strpath) |
212 | | - n1.inputs.arg1 = 1 |
213 | | - |
214 | | - n2 = pe.Node( |
215 | | - niu.Function( |
216 | | - input_names=['in_file', 'arg'], |
217 | | - output_names=['out_file1', 'out_file2', 'n'], |
218 | | - function=test_function2), |
219 | | - name='n2', |
220 | | - base_dir=tmpdir.strpath) |
221 | | - n2.inputs.arg = 2 |
222 | | - |
223 | | - n3 = pe.Node( |
224 | | - niu.Function( |
225 | | - input_names=['arg'], |
226 | | - output_names=['n'], |
227 | | - function=test_function3), |
228 | | - name='n3', |
229 | | - base_dir=tmpdir.strpath) |
230 | | - |
231 | | - wf = pe.Workflow( |
232 | | - name="node_rem_test" + plugin, base_dir=tmpdir.strpath) |
233 | | - wf.connect(n1, "out_file1", n2, "in_file") |
234 | | - |
235 | | - wf.run(plugin='Linear') |
236 | | - |
237 | | - for remove_unnecessary_outputs in [True, False]: |
238 | | - config.set_default_config() |
239 | | - wf.config = { |
240 | | - 'execution': { |
241 | | - 'remove_unnecessary_outputs': remove_unnecessary_outputs |
242 | | - } |
243 | | - } |
244 | | - rmtree(os.path.join(wf.base_dir, wf.name)) |
245 | | - wf.run(plugin=plugin) |
246 | | - |
247 | | - assert os.path.exists( |
248 | | - os.path.join(wf.base_dir, wf.name, n1.name, |
249 | | - 'file2.txt')) != remove_unnecessary_outputs |
250 | | - assert os.path.exists( |
251 | | - os.path.join(wf.base_dir, wf.name, n1.name, "subdir", |
252 | | - 'file1.txt')) != remove_unnecessary_outputs |
253 | | - assert os.path.exists( |
254 | | - os.path.join(wf.base_dir, wf.name, n1.name, 'file1.txt')) |
255 | | - assert os.path.exists( |
256 | | - os.path.join(wf.base_dir, wf.name, n1.name, |
257 | | - 'file3.txt')) != remove_unnecessary_outputs |
258 | | - assert os.path.exists( |
259 | | - os.path.join(wf.base_dir, wf.name, n2.name, 'file1.txt')) |
260 | | - assert os.path.exists( |
261 | | - os.path.join(wf.base_dir, wf.name, n2.name, 'file2.txt')) |
262 | | - assert os.path.exists( |
263 | | - os.path.join(wf.base_dir, wf.name, n2.name, |
264 | | - 'file3.txt')) != remove_unnecessary_outputs |
265 | | - |
266 | | - n4 = pe.Node(UtilsTestInterface(), name='n4', base_dir=tmpdir.strpath) |
267 | | - wf.connect(n2, "out_file1", n4, "in_file") |
268 | | - |
269 | | - def pick_first(l): |
270 | | - return l[0] |
271 | | - |
272 | | - wf.connect(n4, ("output1", pick_first), n3, "arg") |
273 | | - for remove_unnecessary_outputs in [True, False]: |
274 | | - for keep_inputs in [True, False]: |
275 | | - config.set_default_config() |
276 | | - wf.config = { |
277 | | - 'execution': { |
278 | | - 'keep_inputs': keep_inputs, |
279 | | - 'remove_unnecessary_outputs': |
280 | | - remove_unnecessary_outputs |
281 | | - } |
282 | | - } |
283 | | - rmtree(os.path.join(wf.base_dir, wf.name)) |
284 | | - wf.run(plugin=plugin) |
285 | | - assert os.path.exists( |
286 | | - os.path.join(wf.base_dir, wf.name, n2.name, 'file1.txt')) |
287 | | - assert os.path.exists( |
288 | | - os.path.join(wf.base_dir, wf.name, n2.name, |
289 | | - 'file2.txt')) != remove_unnecessary_outputs |
290 | | - assert os.path.exists( |
291 | | - os.path.join(wf.base_dir, wf.name, n4.name, |
292 | | - 'file1.txt')) == keep_inputs |
293 | | - |
294 | | - |
295 | | -def fwhm(fwhm): |
296 | | - return fwhm |
297 | | - |
298 | | - |
299 | 123 | def create_wf(name): |
| 124 | + """Creates a workflow for the following tests""" |
| 125 | + def fwhm(fwhm): |
| 126 | + return fwhm |
| 127 | + |
300 | 128 | pipe = pe.Workflow(name=name) |
301 | 129 | process = pe.Node( |
302 | 130 | niu.Function( |
|
0 commit comments