-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbuild_script.py
More file actions
507 lines (442 loc) · 15.9 KB
/
build_script.py
File metadata and controls
507 lines (442 loc) · 15.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
"""
Build script moved to python for better extendability and interoperability.
"""
import argparse
import os
from typing import Optional
import pandas as pd
from src.mapping import get_mapping_file, load_mapping_reference
from tqdm import tqdm
# DEFINE OUTPUT DIRECTORY
OUTPUT_DIR = "/tmp" # "/tmp"
# TODO: Write this function
def fitCurveFiles(morpho_behavior_tuples):
"""
get new curve fits, list of tuples of morpho/behavior pairs
"""
def combineFiles(location_list: pd.DataFrame, ftype: str) -> pd.DataFrame:
"""Combine files and account for duplicates and desired schema.
Parameters
----------
location_list : pd.DataFrame
_description_
ftype : str
File type, one of ["bmd", "fit", "dose"]
Returns
-------
pd.DataFrame
Table of concatenated files with duplicates removed
"""
dflist = []
required_cols = {
"bmd": [
"Chemical_ID",
"End_Point",
"Model",
"BMD10",
"BMD50",
"Min_Dose",
"Max_Dose",
"AUC_Norm",
"DataQC_Flag",
"BMD_Analysis_Flag",
], # ,"BMD10_Flag","BMD50_Flag{"),
"dose": ["Chemical_ID", "End_Point", "Dose", "Response", "CI_Lo", "CI_Hi"],
"fit": ["Chemical_ID", "End_Point", "X_vals", "Y_vals"],
}
tqdm.write(f"Concatenating {ftype}...")
for loc in location_list.location:
f = pd.read_csv(loc)[required_cols[ftype]]
dflist.append(f)
# Check for empty list
if not dflist:
tqdm.write("Warning: No valid files found for concatenation")
return pd.DataFrame(columns=required_cols[ftype])
df = pd.concat(dflist).drop_duplicates()
return df
def runSampMap(
is_sample: bool = False,
drcfiles: list = [],
sid: str = "",
smap: str = "",
cid: str = "",
emap: str = "",
cclass: str = "",
fses: str = "",
descfile: str = "",
output_dir: str = OUTPUT_DIR,
) -> list[str]:
"""Run sample-to-chemical mapping.
Parameters
----------
is_sample : bool, optional
If True, runs sample mapping mode; else, runs
chemical mapping mode, by default False
drcfiles : list, optional
List of dose-response curve files to process, by default []
sid : str, optional
Sample ID, by default ""
smap : str, optional
/path/to/sample_mapping_file, by default ""
cid : str, optional
Chemical ID, by default ""
emap : str, optional
/path/to/endpoint_mapping_file, by default ""
cclass : str, optional
/path/to/chemical_class_file, by default ""
fses : str, optional
/path/to/sample_files, by default ""
descfile : str, optional
/path/to/chemical_description_file, by default ""
output_dir : str, optional
Directory to save output, by default OUTPUT_DIR (='/tmp')
Returns
-------
list[str]
List of paths to the generated output files, including:
- Core data files
- samples.csv
- chemicals.csv
- samplesToChemicals.csv
- Zebrafish files for both chemical and sample measurements
- zebrafish{Samp,Chem}XYCoords.csv
- zebrafish{Samp,Chem}DoseResponse.csv
- zebrafish{Samp,Chem}BMDs.csv)
"""
import subprocess
drc = ",".join(drcfiles)
args = (
f"--sample_id={sid} "
f"--sample_map={smap} "
f"--chem_id={cid} "
f"--ep_map={emap} "
f"--chem_class={cclass} "
f"--sample_files={fses} "
f"--chem_desc={descfile} "
f"--output_dir={output_dir} "
)
if is_sample:
cmd = f"python sampleChemMapping/map_samples_to_chemicals.py --sample --drc_files={drc} {args}"
elif len(drcfiles) > 0:
cmd = f"python sampleChemMapping/map_samples_to_chemicals.py --chemical --drc_files={drc} {args}"
else:
cmd = f"python sampleChemMapping/map_samples_to_chemicals.py {args}"
try:
process = subprocess.run(cmd, capture_output=True, text=True, shell=True)
# Verify successful command execution
if process.returncode != 0:
raise subprocess.CalledProcessError(
returncode=process.returncode,
cmd=cmd,
output=process.stdout,
stderr=process.stderr,
)
# Show command line logging messages
for line in process.stdout.splitlines():
if line.strip():
tqdm.write(line)
except Exception as e:
tqdm.write(f"An error occurred while trying to run the command: {str(e)}")
raise e
##now we validate the files that came out.
dblist = [
os.path.join(output_dir, "samples.csv"),
os.path.join(output_dir, "chemicals.csv"),
os.path.join(output_dir, "samplesToChemicals.csv"),
]
for ftype in ["XYCoords.csv", "DoseResponse.csv", "BMDs.csv"]:
dblist.append(os.path.join(output_dir, f"zebrafishChem{ftype}"))
dblist.append(os.path.join(output_dir, f"zebrafishSamp{ftype}"))
return dblist
# runSchemaCheck(dblist)
def runExposome(
chem_id_file: str,
output_dir: str = OUTPUT_DIR,
) -> list[str]:
"""Pull exposome data.
Parameters
----------
chem_id_file : str
Path to file containing chemical IDs for which to pull
exposome data
Returns
-------
list[str]
List containing path to output exposomeGeneStats.csv file
"""
cmd = f"python exposome/exposome_summary_stats.py {chem_id_file}"
tqdm.write(cmd)
os.system(cmd)
return [os.path.join(output_dir, "exposomeGeneStats.csv")]
def runExpression(
gex: str,
chem: str,
ginfo: str,
output_dir: str = OUTPUT_DIR,
) -> list[str]:
"""Parse gene expression data using R.
Parameters
----------
gex : str
Path to gene expression data
chem : str
Path to chemical data file
ginfo : str
Path to gene info file
Returns
-------
list[str]
List of these three output files:
- "{OUTPUT_DIR}/srpDEGPathways.csv": Enriched pathways
in differentially expressed genes
- "{OUTPUT_DIR}/srpDEGStats.csv" : Summary statistics
for differentially expressed genes
- "{OUTPUT_DIR}/allGeneEx.csv" : All gene expression data
Note that OUTPUT_DIR = "/tmp" by default.
"""
cmd = f"Rscript zfExp/parseGexData.R {gex} {chem} {ginfo}"
tqdm.write(cmd)
os.system(cmd)
return [
os.path.join(output_dir, "srpDEGPathways.csv"),
os.path.join(output_dir, "srpDEGStats.csv"),
os.path.join(output_dir, "allGeneEx.csv"),
]
def runSchemaCheck(dbfiles: list[Optional[str]] = []):
"""Validate database files against schema using LinkML.
Parameters
----------
dbfiles : list[Optional[str]], optional
List of database files, by default []
"""
##TODO: make this work with internal calls
for filename in dbfiles:
classname = os.path.basename(filename).split(".")[0]
cmd = f"linkml-validate --schema srpAnalytics.yaml {filename} --target-class {classname}"
tqdm.write(cmd)
os.system(cmd)
def main():
"""Run data processing and analytics pipeline for Superfund data.
This is the main entrypoint for the Superfund data processing
pipeline designed to run inside a Docker container. The pipeline
processes various chemical and sample data files, performs
benchmark dose calculations, maps samples to chemicals, runs
exposome analyses, and processes gene expression data. The
workflow is modular, allowing specific components to be executed
at a time depending on the supplied command line arguments.
Workflow Components:
-------------------
1. Data Preparation:
- Loads mapping reference data
- Identifies morphology and behavior data pairs for chemicals
- Retrieves various mapping files (sample IDs, chemical IDs, endpoints, etc.)
2. Benchmark Dose (BMD) Analysis:
- Calculates dose-response curves and benchmark doses for chemical exposures
- Combines results across different sample types (chemical, extract) and data types (BMD, fit, dose)
3. Sample-Chemical Mapping:
- Links samples to chemicals using various reference files
- Validates outputs against schema definitions
4. Exposome Analysis:
- Processes exposome data for chemicals to identify environmental exposures
5. Gene Expression Analysis:
- Processes differential gene expression data associated with chemical exposures
- Performs pathway analysis on differentially expressed genes
Command-line Arguments:
----------------------
`--bmd` : Re-run benchmark dose calculation and dependent commands
`--samps` : Re-run sample-chemical mapping
`--expo` : Re-run exposome sample collection
`--geneEx` : Re-run gene expression generation
Outputs:
--------
Various CSV files stored in OUTPUT_DIR, including:
- Core data
- samples.csv
- chemicals.csv
- samplesToChemicals.csv
- Zebrafish assay data
- zebrafish{Chem,Samp}BMDs.csv
- zebrafish{Chem,Samp}DoseResponse.csv
- zebrafish{Chem,Samp}XYCoords.csv
- exposomeGeneStats.csv (exposome analysis)
- srpDEGPathways.csv, srpDEGStats.csv, allGeneEx.csv (gene expression results)
Notes:
------
- Intermediate files are created during processing and removed after use
- All outputs are validated against the LinkML schema definitions
- Progress is tracked using tqdm progress bars and informative messages
"""
df = load_mapping_reference()
####
# file parsing - collects all files we might need for the tool below
####
##first find the morphology and behavior pairs for chemical sources
chemdf = df.loc[df.sample_type == "chemical"]
morph = chemdf.loc[chemdf.data_type == "morphology"]
beh = chemdf.loc[chemdf.data_type == "behavior"]
tupes = []
for n in morph.name:
tupes.append(
[morph.loc[morph.name == n].location, beh.loc[beh.name == n].location]
)
##now map sample information
sid = get_mapping_file(df, "sampId")
cid = get_mapping_file(df, "chemId")
cclass = get_mapping_file(df, "class1")
emap = get_mapping_file(df, "endpointMap")
fses = get_mapping_file(df, "sample", return_first=False)
descfile = get_mapping_file(df, "chemdesc")
smap = get_mapping_file(df, "sampMap")
gex1 = get_mapping_file(df, "expression", return_first=False)
ginfo = get_mapping_file(df, "geneInfo")
###now we can call individiual commands
parser = argparse.ArgumentParser(
"Pull files from github list of files and call appropriate command"
)
parser.add_argument(
"--bmd",
dest="bmd",
action="store_true",
default=False,
help="Re-run benchmark dose calculation and dependent commands",
)
parser.add_argument(
"--samps",
dest="samps",
action="store_true",
default=False,
help="Re run sample-chem mapping",
)
parser.add_argument(
"--expo",
dest="expo",
action="store_true",
default=False,
help="Re run exposome sample collection",
)
parser.add_argument(
"--geneEx",
dest="geneEx",
action="store_true",
default=False,
help="Re run gene expression generation",
)
parser.add_argument(
"--output_dir",
dest="output_dir",
default=OUTPUT_DIR,
help="Directory to store output files (default: '/tmp')",
)
args = parser.parse_args()
##call bmdrc on all morphology/behavior pairs for sample sources
if args.bmd:
tqdm.write("Re-running benchmark dose collection...")
newbmds, newfits, newdoses = [], [], []
fitCurveFiles()
# ------------------------------------------------------------------------
# Benchmark Dose (BMD) Calculation / Sample-Chem Mapping (SAMPS) Workflows
# ------------------------------------------------------------------------
if args.bmd or args.samps: ### need to rerun samples if we have created new bmds
# add chemical BMDS, fits, curves to existing data
chem_files, sample_files = [], []
# Define files and set progress bar incrementes for concatenating each
sample_type = ["chemical", "extract"]
data_type = ["bmd", "fit", "dose"]
total_iterations = len(sample_type) * len(data_type)
progress_bar = tqdm(total=total_iterations, desc="Combining files")
for st in sample_type:
tqdm.write(f"Processing {st} samples...")
for dt in data_type:
fdf = combineFiles(
df.loc[df.sample_type == st].loc[df.data_type == dt], dt
)
fname = os.path.join(args.output_dir, f"tmp_{st}_{dt}.csv")
fdf.to_csv(fname, index=False)
if st == "chemical":
chem_files.append(fname)
else:
sample_files.append(fname)
progress_bar.update(1)
# Update progress bar after completion
progress_bar.set_description("Combining files... Done!")
progress_bar.close()
# Define fixed params for sample mapping
all_res = list()
sampmap_args = {
"sid": sid,
"smap": smap,
"cid": cid,
"emap": emap,
"cclass": cclass,
"fses": fses,
"descfile": descfile,
"output_dir": args.output_dir,
}
# Iterate through sampMap params
sampmap_params = [
{"is_sample": True, "drcfiles": sample_files},
{"is_sample": False, "drcfiles": chem_files},
{"is_sample": False, "drcfiles": []},
]
progress_bar = tqdm(
range(len(sampmap_params)),
desc="Running sample mapping",
)
# Perform sample mapping
for smp in sampmap_params:
smpargs = {**smp, **sampmap_args}
# tqdm.write(
# f"Performing sample mapping with the following parameters: {smpargs}"
# )
res = runSampMap(**smpargs)
all_res.extend(res)
progress_bar.update(1)
# Update progress bar after completion
progress_bar.set_description("Running sample mapping... Done!")
progress_bar.close()
# Collect all unique files and remove temp files
all_res = list(set(all_res))
for f in sample_files + chem_files:
# tqdm.write(f"Filename: {f}")
# os.system(f"head {f}")
os.system(f"rm {f}")
# Validate schema
runSchemaCheck(res)
# -----------------
# Exposome Workflow
# -----------------
if args.expo:
res = runExposome(cid, output_dir=args.output_dir)
# for f in res:
# tqdm.write(f"Filename: {f}")
# os.system(f"head {f}")
runSchemaCheck(res)
# ------------------------
# Gene Expression Workflow
# ------------------------
if args.geneEx:
if not os.path.exists(os.path.join(args.output_dir, "chemicals.csv")):
runSampMap(
is_sample=False,
drcfiles=[],
sid=sid,
smap=smap,
cid=cid,
emap=emap,
cclass=cclass,
fses=fses,
descfile=descfile,
output_dir=args.output_dir,
)
res = runExpression(
gex1,
os.path.join(args.output_dir, "chemicals.csv"),
ginfo,
output_dir=args.output_dir,
)
# for f in res:
# tqdm.write(f"Filename: {f}")
# os.system(f"head {f}")
runSchemaCheck(res)
if __name__ == "__main__":
main()