Skip to content

Commit f3640d6

Browse files
jacob720GDYendell
authored andcommitted
Add function to create interleaved VDS
1 parent d6971c2 commit f3640d6

5 files changed

Lines changed: 511 additions & 2 deletions

File tree

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ dependencies = [
1616
"numpy",
1717
"pillow",
1818
"typer",
19+
"h5py",
1920
] # Add project dependencies here, e.g. ["click", "numpy"]
2021
dynamic = ["version"]
2122
license.file = "LICENSE"
@@ -24,7 +25,7 @@ requires-python = ">=3.11"
2425

2526
[project.optional-dependencies]
2627
dev = [
27-
"tickit-devices>=0.4.1",
28+
"tickit-devices>=0.4.2",
2829
"aioca",
2930
"black",
3031
"copier",
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from fastcs.attributes import AttrRW
1+
from fastcs.attributes import AttrR, AttrRW
22
from fastcs_odin.controllers.odin_data.frame_processor import (
33
FrameProcessorAdapterController,
44
)
@@ -7,3 +7,5 @@
77
class EigerFrameProcessorAdapterController(FrameProcessorAdapterController):
88
data_compression: AttrRW[str]
99
data_datatype: AttrRW[str]
10+
data_dims_0: AttrR[int] # y
11+
data_dims_1: AttrR[int] # x

src/fastcs_eiger/controllers/odin/eiger_odin_controller.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11
import asyncio
2+
from pathlib import Path
23

34
from fastcs.attributes import AttrRW
45
from fastcs.connections import IPConnectionSettings
56
from fastcs.datatypes import Int
67
from fastcs.methods import command
78

89
from fastcs_eiger.controllers.eiger_controller import COMMAND_GROUP, EigerController
10+
from fastcs.datatypes import Bool
11+
from fastcs.methods import command
12+
13+
from fastcs_eiger.controllers.eiger_controller import EigerController
14+
from fastcs_eiger.controllers.odin.generate_vds import create_interleave_vds
915
from fastcs_eiger.controllers.odin.odin_controller import OdinController
1016
from fastcs_eiger.eiger_parameter import EigerAPIVersion
1117

@@ -19,6 +25,7 @@ class EigerOdinController(EigerController):
1925
description="Timeout for start writing command",
2026
group=COMMAND_GROUP,
2127
)
28+
enable_vds_creation = AttrRW(Bool())
2229

2330
def __init__(
2431
self,
@@ -63,6 +70,10 @@ async def start_writing(self):
6370
self.OD.FP.data_datatype.put(f"uint{self.detector.bit_depth_image.get()}"),
6471
)
6572

73+
path = Path(self.OD.file_path.get())
74+
prefix = self.OD.file_prefix.get()
75+
frame_count = self.OD.FP.frames.get()
76+
6677
await self.OD.FP.start_writing()
6778

6879
try:
@@ -71,3 +82,18 @@ async def start_writing(self):
7182
)
7283
except TimeoutError as e:
7384
raise TimeoutError("File writers failed to start") from e
85+
86+
if self.enable_vds_creation.get():
87+
create_interleave_vds(
88+
path=path,
89+
prefix=prefix,
90+
datasets=["data"],
91+
frame_count=frame_count,
92+
frames_per_block=self.OD.block_size.get(),
93+
blocks_per_file=self.OD.FP.process_blocks_per_file.get(),
94+
frame_shape=(
95+
self.OD.FP.data_dims_1.get(),
96+
self.OD.FP.data_dims_0.get(),
97+
),
98+
dtype=self.OD.FP.data_datatype.get(),
99+
)
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import math
2+
from dataclasses import dataclass
3+
from pathlib import Path
4+
5+
import h5py
6+
from fastcs.logging import bind_logger
7+
8+
logger = bind_logger(__name__)
9+
10+
11+
@dataclass
12+
class FileFrames:
13+
frames: int
14+
start: int
15+
frames_per_block: int
16+
17+
@property
18+
def blocks(self):
19+
return self.frames // self.frames_per_block
20+
21+
@property
22+
def remainder_frames(self):
23+
return self.frames % self.frames_per_block
24+
25+
26+
def _get_frames_per_file_writer(
27+
frame_count: int, frames_per_block: int, n_file_writers: int
28+
) -> list[int]:
29+
n_blocks = math.ceil(frame_count / frames_per_block)
30+
min_blocks_per_file = n_blocks // n_file_writers
31+
remainder = n_blocks - min_blocks_per_file * n_file_writers
32+
33+
frames_per_file_writer = []
34+
for i in range(n_file_writers):
35+
blocks = min_blocks_per_file + (i < remainder)
36+
frames_per_file_writer.append(blocks * frames_per_block)
37+
38+
overflow = sum(frames_per_file_writer) - frame_count
39+
frames_per_file_writer[remainder - 1] -= overflow
40+
return frames_per_file_writer
41+
42+
43+
def _calculate_frame_distribution(
44+
frame_count, frames_per_block, blocks_per_file, n_file_writers
45+
) -> dict[int, FileFrames]:
46+
47+
frame_distribution: dict[int, FileFrames] = {}
48+
49+
max_frames_per_file = (
50+
frames_per_block * blocks_per_file if blocks_per_file else frame_count
51+
)
52+
# total frames written before one of the file writers has to start a new file
53+
frames_before_new_file = n_file_writers * max_frames_per_file
54+
frames_per_file_writer = _get_frames_per_file_writer(
55+
frame_count, frames_per_block, n_file_writers
56+
)
57+
for file_writer_idx, n_frames in enumerate(frames_per_file_writer):
58+
n_files = math.ceil(n_frames / max_frames_per_file)
59+
for i in range(n_files):
60+
file_idx = file_writer_idx + i * n_file_writers
61+
62+
frame_distribution[file_idx + 1] = FileFrames(
63+
frames=min(n_frames - i * max_frames_per_file, max_frames_per_file),
64+
frames_per_block=frames_per_block,
65+
start=frames_per_block * file_writer_idx
66+
+ file_idx // n_file_writers * frames_before_new_file,
67+
)
68+
69+
return frame_distribution
70+
71+
72+
def create_interleave_vds(
73+
path: Path,
74+
prefix: str,
75+
datasets: list[str],
76+
frame_count: int,
77+
frames_per_block: int,
78+
blocks_per_file: int,
79+
frame_shape: tuple[int, int],
80+
dtype: str = "float",
81+
n_file_writers: int = 4,
82+
) -> None:
83+
frame_distribution = _calculate_frame_distribution(
84+
frame_count, frames_per_block, blocks_per_file, n_file_writers
85+
)
86+
stride = n_file_writers * frames_per_block
87+
filepath = f"{path / prefix}_vds.h5"
88+
logger.info(f"Writing virtual dataset at {filepath}")
89+
with h5py.File(f"{path / prefix}_vds.h5", "w", libver="latest") as f:
90+
for dataset_name in datasets:
91+
v_layout = h5py.VirtualLayout(
92+
shape=(frame_count, frame_shape[0], frame_shape[1]),
93+
dtype=dtype,
94+
)
95+
for file_number, file_frames in frame_distribution.items():
96+
full_block_frames = file_frames.blocks * frames_per_block
97+
v_source = h5py.VirtualSource(
98+
f"{prefix}_{str(file_number).zfill(6)}.h5",
99+
name=dataset_name,
100+
shape=(file_frames.frames, frame_shape[0], frame_shape[1]),
101+
dtype=dtype,
102+
)
103+
if file_frames.blocks:
104+
source = v_source[:full_block_frames, :, :]
105+
v_layout[
106+
h5py.MultiBlockSlice(
107+
start=file_frames.start,
108+
stride=stride,
109+
count=file_frames.blocks,
110+
block=frames_per_block,
111+
),
112+
:,
113+
:,
114+
] = source
115+
if file_frames.remainder_frames:
116+
# Last few frames that don't fit into a block
117+
source = v_source[full_block_frames : file_frames.frames, :, :]
118+
v_layout[
119+
frame_count - file_frames.remainder_frames : frame_count, :, :
120+
] = source
121+
122+
f.create_virtual_dataset(dataset_name, v_layout)

0 commit comments

Comments
 (0)