|
| 1 | +from __future__ import annotations |
| 2 | + |
1 | 3 | import hashlib |
2 | 4 | import logging |
3 | 5 | import os |
4 | 6 | import pathlib |
5 | 7 | from dataclasses import dataclass |
6 | 8 | from datetime import datetime |
7 | | -from typing import Dict, List, Literal, Mapping, Optional |
| 9 | +from typing import Any, Dict, List, Literal, Mapping, Optional |
8 | 10 |
|
9 | 11 | import ispyb |
10 | 12 | import pydantic |
@@ -351,28 +353,28 @@ def trigger_dimple( |
351 | 353 |
|
352 | 354 | dcid = parameters.dcid |
353 | 355 |
|
354 | | - pdb_files = self.get_linked_pdb_files_for_dcid( |
| 356 | + pdb_files_or_codes = self.get_linked_pdb_files_for_dcid( |
355 | 357 | session, |
356 | 358 | dcid, |
357 | 359 | parameters.pdb_tmpdir, |
358 | 360 | user_pdb_dir=parameters.user_pdb_directory, |
359 | 361 | ) |
360 | 362 |
|
361 | | - if not pdb_files: |
| 363 | + if not pdb_files_or_codes: |
362 | 364 | self.log.info( |
363 | 365 | "Skipping dimple trigger: DCID %s has no associated PDB information" |
364 | 366 | % dcid |
365 | 367 | ) |
366 | 368 | return {"success": True} |
367 | | - pdb_files = [str(p) for p in pdb_files] |
| 369 | + pdb_files = [str(p) for p in pdb_files_or_codes] |
368 | 370 | self.log.info("PDB files: %s", ", ".join(pdb_files)) |
369 | 371 |
|
370 | 372 | dc = ( |
371 | 373 | session.query(DataCollection) |
372 | 374 | .filter(DataCollection.dataCollectionId == dcid) |
373 | 375 | .one() |
374 | 376 | ) |
375 | | - dimple_parameters = { |
| 377 | + dimple_parameters: dict[str, list[Any]] = { |
376 | 378 | "data": [os.fspath(parameters.mtz)], |
377 | 379 | "scaling_id": [parameters.scaling_id], |
378 | 380 | "pdb": pdb_files, |
@@ -766,17 +768,19 @@ def trigger_mrbump( |
766 | 768 | self.log.info("Skipping mrbump trigger: sequence information not available") |
767 | 769 | return {"success": True} |
768 | 770 |
|
769 | | - pdb_files = self.get_linked_pdb_files_for_dcid( |
770 | | - session, |
771 | | - dcid, |
772 | | - parameters.pdb_tmpdir, |
773 | | - user_pdb_dir=parameters.user_pdb_directory, |
774 | | - ignore_pdb_codes=True, |
| 771 | + pdb_files = tuple( |
| 772 | + self.get_linked_pdb_files_for_dcid( |
| 773 | + session, |
| 774 | + dcid, |
| 775 | + parameters.pdb_tmpdir, |
| 776 | + user_pdb_dir=parameters.user_pdb_directory, |
| 777 | + ignore_pdb_codes=True, |
| 778 | + ) |
775 | 779 | ) |
776 | 780 |
|
777 | 781 | jobids = [] |
778 | 782 |
|
779 | | - for pdb_files in {(), tuple(pdb_files)}: |
| 783 | + for pdb_files in {(), pdb_files}: |
780 | 784 | jp = self.ispyb.mx_processing.get_job_params() |
781 | 785 | jp["automatic"] = parameters.automatic |
782 | 786 | jp["comments"] = parameters.comment |
@@ -806,6 +810,7 @@ def trigger_mrbump( |
806 | 810 | self.log.debug(f"mrbump trigger: generated JobParameterID {jppid}") |
807 | 811 |
|
808 | 812 | for pdb_file in pdb_files: |
| 813 | + assert pdb_file.filepath is not None |
809 | 814 | filepath = pdb_file.filepath |
810 | 815 | if pdb_file.source == "AlphaFold": |
811 | 816 | trimmed = filepath.with_name( |
@@ -1130,7 +1135,7 @@ def trigger_big_ep( |
1130 | 1135 | class BigEPParams(pydantic.BaseModel): |
1131 | 1136 | data: pathlib.Path |
1132 | 1137 | scaled_unmerged_mtz: pathlib.Path |
1133 | | - path_ext: Optional[str] = pydantic.Field( |
| 1138 | + path_ext: str = pydantic.Field( |
1134 | 1139 | default_factory=lambda: datetime.now().strftime("%Y%m%d_%H%M%S") |
1135 | 1140 | ) |
1136 | 1141 |
|
@@ -1294,7 +1299,7 @@ def trigger_multiplex( |
1294 | 1299 | status["ntry"] += 1 |
1295 | 1300 | self.log.debug(f"dcid={dcid}\nmessage_delay={message_delay}\n{status}") |
1296 | 1301 |
|
1297 | | - multiplex_job_dcids = [] |
| 1302 | + multiplex_job_dcids: list[set[int]] = [] |
1298 | 1303 | jobids = [] |
1299 | 1304 |
|
1300 | 1305 | for group in related_dcids: |
|
0 commit comments