|
1 | 1 | import os |
2 | 2 | import re |
3 | 3 | import subprocess |
| 4 | +import glob |
| 5 | +from typing import List |
4 | 6 |
|
5 | 7 |
|
6 | 8 | def write_stats(stats: dict, output_dir: str): |
@@ -83,3 +85,145 @@ def pairing(x): |
83 | 85 |
|
84 | 86 | # Return a list with the base name appended by "RIGHT" and "LEFT" |
85 | 87 | return [clear_primer + "RIGHT", clear_primer + "LEFT"] |
| 88 | + |
| 89 | + |
def collect_fasta_files(inputs: List[str]) -> List[str]:
    """
    Collect all FASTA files from input paths (files or directories).

    Args:
        inputs: List of input paths. Each can be:
            - A FASTA file (*.fa, *.fna, *.fasta or their .gz versions)
            - A directory containing FASTA files (searched non-recursively)

    Returns:
        Sorted list of paths to all FASTA files found.

    Raises:
        ValueError: If an input path doesn't exist, an input file does not
            have a recognized FASTA extension, a directory contains no
            FASTA files, or no FASTA files are found at all (for example
            when ``inputs`` is empty).
    """
    # A tuple lets str.endswith test every extension in a single call.
    fasta_extensions = ('.fa', '.fna', '.fasta', '.fa.gz', '.fna.gz', '.fasta.gz')
    fasta_files = []

    for input_path in inputs:
        if not os.path.exists(input_path):
            raise ValueError(f"Input path does not exist: {input_path}")

        if os.path.isfile(input_path):
            # Check if it's a FASTA file
            if not input_path.endswith(fasta_extensions):
                raise ValueError(f"Input file is not a recognized FASTA file: {input_path}")
            fasta_files.append(input_path)
        elif os.path.isdir(input_path):
            # Escape the directory part so glob metacharacters in its name
            # ('[', ']', '*', '?') are matched literally, not as a pattern.
            escaped_dir = glob.escape(input_path)
            dir_files = []
            for ext in fasta_extensions:
                pattern = os.path.join(escaped_dir, f"*{ext}")
                for candidate in glob.glob(pattern):
                    # glob also returns sub-directories whose names happen
                    # to end with a FASTA extension; keep regular files only.
                    if os.path.isfile(candidate):
                        dir_files.append(candidate)

            if not dir_files:
                raise ValueError(f"No FASTA files found in directory: {input_path}")

            fasta_files.extend(dir_files)
        else:
            raise ValueError(f"Input path is neither a file nor a directory: {input_path}")

    if not fasta_files:
        raise ValueError("No FASTA files found in any of the provided inputs")

    # One final sort gives a deterministic order across all inputs.
    return sorted(fasta_files)
| 136 | + |
| 137 | + |
def _clone_args_for_input(args, fasta_file: str):
    """Copy the public attributes of ``args`` onto a fresh object whose ``input`` is ``fasta_file``."""
    temp_args = type('Args', (), {})()
    for attr in dir(args):
        if not attr.startswith('_'):
            setattr(temp_args, attr, getattr(args, attr))
    temp_args.input = fasta_file
    return temp_args


def _append_fasta(source_path: str, outfile) -> bool:
    """Stream a non-empty FASTA file into ``outfile``; return False if it is missing or empty."""
    if not (os.path.exists(source_path) and os.path.getsize(source_path) > 0):
        return False

    last_line = "\n"
    with open(source_path, 'r') as infile:
        for line in infile:
            outfile.write(line)
            last_line = line

    # Guarantee a line break between concatenated files; otherwise the next
    # file's first line would be glued onto this file's last line.
    if not last_line.endswith("\n"):
        outfile.write("\n")
    return True


def process_multiple_inputs(
    args: 'Namespace',
    input_fasta_files: List[str],
    output_dir: str,
    initial_generator: str = 'primer3'
) -> str:
    """
    Process multiple input FASTA files separately for initial set generation and merge their outputs.

    This function processes each input FASTA file independently:
    1. Creates a per-input sub-directory ``input_<idx>/`` inside ``output_dir``
    2. Runs uniline_fasta and initial_set_generation for each
    3. Runs merge on each input's ``output.fa`` to produce ``merged.fa``
    4. Concatenates all per-input results into ``output_dir/output.fa``

    Args:
        args: Arguments object containing all pipeline parameters. Its public
            attributes are copied for every input; ``algorithm`` and
            ``script_path`` are read here and passed to merge.
        input_fasta_files: List of paths to FASTA files to process
        output_dir: Output directory path (e.g., .tmp/0/)
        initial_generator: Initial generator to use ('primer3' or 'oligominer')

    Returns:
        Path to the final merged output.fa file

    Raises:
        ValueError: If initial_generator is unknown. This is checked before
            any directory is created or any input is processed.
        ImportError: If required modules cannot be imported
    """
    # Fail fast: reject an unknown generator before touching the filesystem.
    if initial_generator not in ("primer3", "oligominer"):
        raise ValueError(f"Unknown initial generator: {initial_generator}")

    # Import here to avoid circular dependencies
    from PROBESt.bash_wrappers import uniline_fasta
    from PROBESt.merge import merge

    # The generator choice is loop-invariant, so resolve it once up front.
    if initial_generator == "primer3":
        from PROBESt.primer3 import initial_set_generation
    else:
        from PROBESt.oligominer import initial_set_generation

    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # Process each input FASTA separately for initial set generation
    temp_output_dirs = []
    total_inputs = len(input_fasta_files)

    for idx, fasta_file in enumerate(input_fasta_files):
        print(f"\nProcessing input {idx + 1}/{total_inputs}: {fasta_file}")

        # Per-input working directory; the trailing slash is kept as in the
        # original path string handed to the downstream helpers.
        temp_dir = os.path.join(output_dir, f"input_{idx}/")
        os.makedirs(temp_dir, exist_ok=True)
        temp_output_dirs.append(temp_dir)

        # Args copy pointing at this specific input file
        temp_args = _clone_args_for_input(args, fasta_file)

        # Make uniline fasta for this input
        uniline_fasta(temp_args, temp_dir)
        print(f"Input fasta parsed: {fasta_file}")

        # Template generation for this input
        initial_set_generation(temp_args, temp_dir)

        # Merge this input's output
        # NOTE(review): NNN=10 is a fixed value whose meaning is defined by
        # PROBESt.merge -- confirm whether it should come from args.
        merge(algo=temp_args.algorithm,
              input=os.path.join(temp_dir, "output.fa"),
              output=os.path.join(temp_dir, "merged.fa"),
              tmp=os.path.join(temp_dir, "fasta_table.tsv"),
              NNN=10,
              script_path=temp_args.script_path)

    # Concatenate every per-input result into output_dir/output.fa
    print("\nMerging all initial set outputs...")
    final_output_fa = os.path.join(output_dir, "output.fa")
    with open(final_output_fa, 'w') as outfile:
        for temp_dir in temp_output_dirs:
            # Prefer merged.fa; fall back to output.fa when merged.fa is
            # missing or empty.
            if _append_fasta(os.path.join(temp_dir, "merged.fa"), outfile):
                continue
            if not _append_fasta(os.path.join(temp_dir, "output.fa"), outfile):
                print(f"Warning: no initial set output found in {temp_dir}")

    return final_output_fa
0 commit comments