From edc7358905953141fe96e316b7445b8175a13e7f Mon Sep 17 00:00:00 2001 From: Brendan Collins Date: Fri, 8 May 2026 13:56:54 -0700 Subject: [PATCH 1/2] Coalesce HTTP COG range reads and parse IFDs once per dask graph Two related performance fixes for the HTTP COG read path. P2 -- range coalescing. _read_cog_http used to fire one GET Range: request per tile through an 8-worker pool, so wall time scaled as ceil(N_tiles / 8) * RTT. COG tiles are stored sequentially, so a new helper coalesce_ranges merges adjacent (offset, length) entries whose gap is below a threshold (default 1 MB, configurable via XRSPATIAL_COG_COALESCE_GAP) and split_coalesced_bytes slices the returned bytes back per-tile. _HTTPSource grows a read_ranges_coalesced wrapper that calls the existing read_ranges underneath. On a 50 ms RTT mocked link with 64 tiles the un-coalesced path takes ~450 ms; the coalesced path takes ~100 ms and issues 2 GETs instead of 65. P5 -- once-per-graph IFD parsing. read_geotiff_dask used to call read_to_array per chunk, which on HTTP routed through _read_cog_http and fired a fresh 16 KB header GET each time. The IFD parse is now factored into _parse_cog_http_meta and the tile fetch into _fetch_decode_cog_http_tiles (which honours a window). read_geotiff_dask parses metadata once before constructing the dask graph and threads the parsed (header, ifd) into delayed tasks via an http_meta kwarg. A 16-chunk dask graph now issues a single 16 KB header GET instead of one per chunk. Public API is unchanged. Existing test_cog_http_concurrent suite still passes; new tests live in test_http_cog_coalesce.py. --- xrspatial/geotiff/__init__.py | 66 +++- xrspatial/geotiff/_reader.py | 292 +++++++++++++++--- .../geotiff/tests/test_http_cog_coalesce.py | 268 ++++++++++++++++ 3 files changed, 580 insertions(+), 46 deletions(-) create mode 100644 xrspatial/geotiff/tests/test_http_cog_coalesce.py diff --git a/xrspatial/geotiff/__init__.py b/xrspatial/geotiff/__init__.py index 11e740d0..eb429782 100644 --- a/xrspatial/geotiff/__init__.py +++ b/xrspatial/geotiff/__init__.py @@ -1269,9 +1269,36 @@ def read_geotiff_dask(source: str, *, dtype=None, chunks: int | tuple = 512, if isinstance(source, str) and source.lower().endswith('.vrt'): return read_vrt(source, dtype=dtype, name=name, chunks=chunks) - # Metadata-only read: O(1) memory via mmap, no pixel decompression - geo_info, full_h, full_w, file_dtype, n_bands = _read_geo_info( - source, overview_level=overview_level) + # P5: HTTP COG sources used to fire one IFD/header GET per chunk + # task. Parse metadata once here so every delayed task can reuse it. + is_http = ( + isinstance(source, str) + and source.startswith(('http://', 'https://')) + ) + http_meta = None + if is_http: + from ._reader import _HTTPSource, _parse_cog_http_meta + _src = _HTTPSource(source) + try: + http_header, http_ifd, http_geo, _ = _parse_cog_http_meta( + _src, overview_level=overview_level) + finally: + _src.close() + http_meta = (http_header, http_ifd) + geo_info = http_geo + full_h = http_ifd.height + full_w = http_ifd.width + from ._dtypes import resolve_bits_per_sample, tiff_dtype_to_numpy + bps = resolve_bits_per_sample(http_ifd.bits_per_sample) + file_dtype = tiff_dtype_to_numpy(bps, http_ifd.sample_format) + n_bands = ( + http_ifd.samples_per_pixel + if http_ifd.samples_per_pixel > 1 else 0 + ) + else: + # Metadata-only read: O(1) memory via mmap, no pixel decompression + geo_info, full_h, full_w, file_dtype, n_bands = _read_geo_info( + source, overview_level=overview_level) nodata = geo_info.nodata # Nodata masking promotes integer arrays to float64 (for NaN). @@ -1353,7 +1380,8 @@ def read_geotiff_dask(source: str, *, dtype=None, chunks: int | tuple = 512, _delayed_read_window(source, r0, c0, r1, c1, overview_level, nodata, band_arg, - target_dtype=target_dtype if dtype is not None else None), + target_dtype=target_dtype if dtype is not None else None, + http_meta=http_meta), shape=block_shape, dtype=target_dtype, ) @@ -1374,13 +1402,35 @@ def read_geotiff_dask(source: str, *, dtype=None, chunks: int | tuple = 512, def _delayed_read_window(source, r0, c0, r1, c1, overview_level, nodata, - band, *, target_dtype=None): - """Dask-delayed function to read a single window.""" + band, *, target_dtype=None, http_meta=None): + """Dask-delayed function to read a single window. + + *http_meta* is an optional ``(TIFFHeader, IFD)`` tuple parsed once by + :func:`read_geotiff_dask`. When supplied, HTTP delayed tasks reuse + it instead of re-fetching the leading IFD bytes per chunk (P5). + For local sources this argument is ignored. + """ import dask + @dask.delayed def _read(): - arr, _ = read_to_array(source, window=(r0, c0, r1, c1), - overview_level=overview_level, band=band) + if http_meta is not None and isinstance(source, str) and \ + source.startswith(('http://', 'https://')): + from ._reader import _HTTPSource, _fetch_decode_cog_http_tiles + header, ifd = http_meta + src = _HTTPSource(source) + try: + arr = _fetch_decode_cog_http_tiles( + src, header, ifd, window=(r0, c0, r1, c1)) + finally: + src.close() + if (arr.ndim == 3 and ifd.samples_per_pixel > 1 + and band is not None): + arr = arr[:, :, band] + else: + arr, _ = read_to_array(source, window=(r0, c0, r1, c1), + overview_level=overview_level, + band=band) if nodata is not None: if arr.dtype.kind == 'f' and not np.isnan(nodata): arr = arr.copy() diff --git a/xrspatial/geotiff/_reader.py b/xrspatial/geotiff/_reader.py index a99f1e63..bb640d64 100644 --- a/xrspatial/geotiff/_reader.py +++ b/xrspatial/geotiff/_reader.py @@ -270,6 +270,109 @@ def _get_http_pool(): _http_pool = None +# --------------------------------------------------------------------------- +# HTTP range coalescing +# --------------------------------------------------------------------------- + +#: Default gap threshold (bytes) for merging adjacent COG tile ranges into a +#: single GET. COG tiles are stored sequentially, so most adjacent ranges +#: differ by zero (back-to-back) or a few bytes; 1 MB tolerates small holes +#: caused by interleaved overview/mask data without ballooning over-fetch. +#: Most tiles are well under 1 MB compressed, so the coalesced GET stays +#: O(num_tiles) bytes plus at most one threshold of slack between tiles. +COALESCE_GAP_THRESHOLD_DEFAULT = 1 << 20 # 1 MB + + +def coalesce_ranges( + ranges: list[tuple[int, int]], + gap_threshold: int = COALESCE_GAP_THRESHOLD_DEFAULT, +) -> tuple[list[tuple[int, int]], list[tuple[int, int, int]]]: + """Merge nearby ``(offset, length)`` ranges into fewer larger ones. + + Parameters + ---------- + ranges : list of (offset, length) + Per-tile byte ranges to fetch. Order is preserved in the + ``mapping`` output so callers can reassemble per-tile bytes. + gap_threshold : int + Maximum gap, in bytes, between two adjacent ranges before they + are merged. A gap of zero means perfectly back-to-back; larger + gaps trade some over-fetch for fewer round-trips. + + Returns + ------- + merged : list of (start, length) + Coalesced ranges, sorted by ``start``. Issue one GET per entry. + mapping : list of (merged_idx, rel_offset, length) + For each input range (in input order), the index of the merged + range its bytes live in, the offset within that merged range, + and the original length. Use with :func:`split_coalesced_bytes`. + + Notes + ----- + Empty input returns ``([], [])``. Negative gap thresholds disable + merging entirely (every input becomes its own merged range). + """ + if not ranges: + return [], [] + + # Tag each input with its original index so we can rebuild mapping. + indexed = sorted( + ((off, length, i) for i, (off, length) in enumerate(ranges)), + key=lambda t: t[0], + ) + + merged: list[tuple[int, int]] = [] + # mapping[input_idx] -> (merged_idx, rel_offset, length) + mapping: list[tuple[int, int, int]] = [(0, 0, 0)] * len(ranges) + + cur_start, cur_length, first_idx = indexed[0] + cur_end = cur_start + cur_length + members = [(first_idx, cur_start, cur_length)] + + for off, length, orig_idx in indexed[1:]: + gap = off - cur_end + if gap_threshold >= 0 and gap <= gap_threshold: + # Extend current merged range. Gaps may be negative if a + # later-listed range overlaps an earlier one; clamp so the + # merged length covers both. + new_end = max(cur_end, off + length) + cur_length = new_end - cur_start + cur_end = new_end + members.append((orig_idx, off, length)) + else: + merged_idx = len(merged) + merged.append((cur_start, cur_length)) + for orig, m_off, m_len in members: + mapping[orig] = (merged_idx, m_off - cur_start, m_len) + cur_start, cur_length, cur_end = off, length, off + length + members = [(orig_idx, off, length)] + + merged_idx = len(merged) + merged.append((cur_start, cur_length)) + for orig, m_off, m_len in members: + mapping[orig] = (merged_idx, m_off - cur_start, m_len) + + return merged, mapping + + +def split_coalesced_bytes( + merged_bytes: list[bytes], + mapping: list[tuple[int, int, int]], +) -> list[bytes]: + """Slice merged-GET payloads back into per-tile bytes using *mapping*. + + Inverse of :func:`coalesce_ranges`. ``merged_bytes[i]`` must be the + bytes returned by the GET for the ``i``th merged range; the output + is one bytes object per original input range, in input order. + """ + out: list[bytes] = [b''] * len(mapping) + for orig_idx, (merged_idx, rel_off, length) in enumerate(mapping): + chunk = merged_bytes[merged_idx] + out[orig_idx] = chunk[rel_off:rel_off + length] + return out + + class _HTTPSource: """HTTP data source using range requests with connection reuse. @@ -332,6 +435,29 @@ def read_ranges( return results # type: ignore[return-value] + def read_ranges_coalesced( + self, + ranges: list[tuple[int, int]], + max_workers: int = 8, + gap_threshold: int = COALESCE_GAP_THRESHOLD_DEFAULT, + ) -> list[bytes]: + """Fetch *ranges* using merged GETs where adjacent ranges allow it. + + Wrapper around :meth:`read_ranges` that first calls + :func:`coalesce_ranges` to group nearby ranges into fewer larger + GETs, then splits the responses back per-input via + :func:`split_coalesced_bytes`. Returns bytes in input order, same + as :meth:`read_ranges`. + + Setting *gap_threshold* to a negative number disables merging + and falls back to one GET per input range. + """ + if not ranges: + return [] + merged, mapping = coalesce_ranges(ranges, gap_threshold=gap_threshold) + merged_bytes = self.read_ranges(merged, max_workers=max_workers) + return split_coalesced_bytes(merged_bytes, mapping) + def read_all(self) -> bytes: if self._pool is not None: resp = self._pool.request('GET', self._url) @@ -945,6 +1071,38 @@ def _decode_one(job): # COG HTTP reader # --------------------------------------------------------------------------- +def _parse_cog_http_meta( + source: _HTTPSource, + overview_level: int | None = None, +) -> tuple[TIFFHeader, IFD, GeoInfo, bytes]: + """Fetch + parse the leading IFDs of an HTTP COG once. + + Issues one (or rarely two) range GET(s) for the leading 16 KB / 64 KB + of the file, parses the header and IFD list, and returns the selected + IFD plus the raw header bytes (kept for ``extract_geo_info`` callers + that might want the IFD's tag offsets). + + Pulled out of :func:`_read_cog_http` so :func:`read_geotiff_dask` + can parse metadata once per graph rather than once per chunk task + (P5: each delayed task used to fire its own 16 KB header GET). + """ + header_bytes = source.read_range(0, 16384) + header = parse_header(header_bytes) + ifds = parse_all_ifds(header_bytes, header) + + # If we didn't get all IFDs, try a larger fetch + if len(ifds) == 0: + header_bytes = source.read_range(0, 65536) + ifds = parse_all_ifds(header_bytes, header) + + if len(ifds) == 0: + raise ValueError("No IFDs found in COG") + + ifd = select_overview_ifd(ifds, overview_level) + geo_info = extract_geo_info(ifd, header_bytes, header.byte_order) + return header, ifd, geo_info, header_bytes + + def _read_cog_http(url: str, overview_level: int | None = None, band: int | None = None, max_pixels: int = MAX_PIXELS_DEFAULT, @@ -972,35 +1130,43 @@ def _read_cog_http(url: str, overview_level: int | None = None, (array, geo_info) tuple """ source = _HTTPSource(url) + header, ifd, geo_info, header_bytes = _parse_cog_http_meta( + source, overview_level=overview_level) - # Initial fetch: get header + IFDs (COGs put metadata first) - header_bytes = source.read_range(0, 16384) - - header = parse_header(header_bytes) - ifds = parse_all_ifds(header_bytes, header) - - # If we didn't get all IFDs, try a larger fetch - if len(ifds) == 0: - header_bytes = source.read_range(0, 65536) - ifds = parse_all_ifds(header_bytes, header) - - if len(ifds) == 0: - raise ValueError("No IFDs found in COG") + arr = _fetch_decode_cog_http_tiles( + source, header, ifd, max_pixels=max_pixels, window=None) + source.close() + return arr, geo_info - # Select IFD based on overview level, skipping any mask IFDs - ifd = select_overview_ifd(ifds, overview_level) +def _fetch_decode_cog_http_tiles( + source: _HTTPSource, + header: TIFFHeader, + ifd: IFD, + *, + max_pixels: int = MAX_PIXELS_DEFAULT, + window: tuple[int, int, int, int] | None = None, +) -> np.ndarray: + """Fetch and decode the tiles of a tiled COG over HTTP. + + Pulled out of :func:`_read_cog_http` so that callers with + pre-parsed metadata (notably :func:`read_geotiff_dask`) can reuse a + single IFD parse across many tile-fetch calls. When *window* is + given, only tiles intersecting the window are fetched + decoded; + the result is sized to the (clamped) window rather than the full + image. Coalescing of adjacent ranges still applies. + """ bps = resolve_bits_per_sample(ifd.bits_per_sample) dtype = tiff_dtype_to_numpy(bps, ifd.sample_format) - geo_info = extract_geo_info(ifd, header_bytes, header.byte_order) - - # COGs are tiled -- fetch individual tiles if not ifd.is_tiled: - # Fallback: fetch entire file + # Stripped HTTP COG: fall back to a full read. Window is honoured + # by slicing the decoded array. all_data = source.read_all() arr = _read_strips(all_data, ifd, header, dtype) - source.close() - return arr, geo_info + if window is not None: + r0, c0, r1, c1 = window + return arr[r0:r1, c0:c1] + return arr width = ifd.width height = ifd.height @@ -1031,6 +1197,19 @@ def _read_cog_http(url: str, overview_level: int | None = None, # TileOffsets length. See issue #1219. validate_tile_layout(ifd) + if window is None: + r0_out, c0_out, r1_out, c1_out = 0, 0, height, width + else: + r0_out, c0_out, r1_out, c1_out = window + r0_out = max(0, r0_out) + c0_out = max(0, c0_out) + r1_out = min(height, r1_out) + c1_out = min(width, c1_out) + + out_h = r1_out - r0_out + out_w = c1_out - c0_out + _check_dimensions(out_w, out_h, samples, max_pixels) + # Sparse tiles (TileByteCounts == 0) need to land on the file's nodata # value (or 0 if unset) rather than uninitialised memory. Detect them # up front so the result buffer is pre-filled before tile placement. @@ -1038,13 +1217,18 @@ def _read_cog_http(url: str, overview_level: int | None = None, if sparse: fill = _sparse_fill_value(ifd, dtype) if samples > 1: - result = np.full((height, width, samples), fill, dtype=dtype) + result = np.full((out_h, out_w, samples), fill, dtype=dtype) else: - result = np.full((height, width), fill, dtype=dtype) + result = np.full((out_h, out_w), fill, dtype=dtype) elif samples > 1: - result = np.empty((height, width, samples), dtype=dtype) + result = np.empty((out_h, out_w, samples), dtype=dtype) else: - result = np.empty((height, width), dtype=dtype) + result = np.empty((out_h, out_w), dtype=dtype) + + tile_row_start = r0_out // th + tile_row_end = min(math.ceil(r1_out / th), tiles_down) + tile_col_start = c0_out // tw + tile_col_end = min(math.ceil(c1_out / tw), tiles_across) # Pass 1: collect every tile's range and where it lands in the output. # Empty tiles (byte_count == 0) and any tile_idx beyond the offsets @@ -1052,8 +1236,8 @@ def _read_cog_http(url: str, overview_level: int | None = None, # the placements list. fetch_ranges: list[tuple[int, int]] = [] placements: list[tuple[int, int]] = [] # (tr, tc) per fetched tile - for tr in range(tiles_down): - for tc in range(tiles_across): + for tr in range(tile_row_start, tile_row_end): + for tc in range(tile_col_start, tile_col_end): tile_idx = tr * tiles_across + tc if tile_idx >= len(offsets): continue @@ -1067,13 +1251,26 @@ def _read_cog_http(url: str, overview_level: int | None = None, # Pass 2: fetch all tile bytes in parallel. Worker pool size is tunable # via XRSPATIAL_COG_HTTP_WORKERS so users on very slow links can dial # it up without code changes. + # + # COG tile offsets are sorted and usually back-to-back, so we coalesce + # adjacent ranges into fewer larger GETs (P2). The 1 MB gap threshold + # tolerates small interleaved metadata between tiles without dragging + # in unrelated overview data. Set XRSPATIAL_COG_COALESCE_GAP=-1 to + # disable merging (one GET per tile, the legacy behaviour). try: workers = max(1, int(_os_module.environ.get('XRSPATIAL_COG_HTTP_WORKERS', '8'))) except ValueError: workers = 8 - tile_bytes_list = source.read_ranges(fetch_ranges, max_workers=workers) + try: + gap = int(_os_module.environ.get( + 'XRSPATIAL_COG_COALESCE_GAP', + str(COALESCE_GAP_THRESHOLD_DEFAULT))) + except ValueError: + gap = COALESCE_GAP_THRESHOLD_DEFAULT + tile_bytes_list = source.read_ranges_coalesced( + fetch_ranges, max_workers=workers, gap_threshold=gap) - # Pass 3: decode each tile and place it. + # Pass 3: decode each tile and place it (clipped to the window). for (tr, tc), tile_data in zip(placements, tile_bytes_list): tile_pixels = _decode_strip_or_tile( tile_data, compression, tw, th, samples, @@ -1081,16 +1278,35 @@ def _read_cog_http(url: str, overview_level: int | None = None, byte_order=header.byte_order, jpeg_tables=jpeg_tables) - y0 = tr * th - x0 = tc * tw - y1 = min(y0 + th, height) - x1 = min(x0 + tw, width) - actual_h = y1 - y0 - actual_w = x1 - x0 - result[y0:y1, x0:x1] = tile_pixels[:actual_h, :actual_w] + # Tile position in image coordinates. + ty0 = tr * th + tx0 = tc * tw + ty1 = ty0 + th + tx1 = tx0 + tw + + # Intersect with the requested window. + iy0 = max(ty0, r0_out) + ix0 = max(tx0, c0_out) + iy1 = min(ty1, r1_out) + ix1 = min(tx1, c1_out) + if iy1 <= iy0 or ix1 <= ix0: + continue + + # Source slice within the decoded tile pixels. + sy0 = iy0 - ty0 + sx0 = ix0 - tx0 + sy1 = sy0 + (iy1 - iy0) + sx1 = sx0 + (ix1 - ix0) + + # Destination slice within the output buffer. + dy0 = iy0 - r0_out + dx0 = ix0 - c0_out + dy1 = iy1 - r0_out + dx1 = ix1 - c0_out + + result[dy0:dy1, dx0:dx1] = tile_pixels[sy0:sy1, sx0:sx1] - source.close() - return result, geo_info + return result # --------------------------------------------------------------------------- diff --git a/xrspatial/geotiff/tests/test_http_cog_coalesce.py b/xrspatial/geotiff/tests/test_http_cog_coalesce.py new file mode 100644 index 00000000..3b512a6d --- /dev/null +++ b/xrspatial/geotiff/tests/test_http_cog_coalesce.py @@ -0,0 +1,268 @@ +"""Coalesced HTTP COG range reads + once-per-graph IFD parsing. + +Covers two performance fixes: + +* P2 -- :func:`coalesce_ranges` merges adjacent tile byte ranges into + fewer larger GETs so HTTP wall time is bounded by ``ceil(N_merged/W) * + RTT`` rather than ``ceil(N_tiles/W) * RTT``. +* P5 -- :func:`read_geotiff_dask` parses IFDs once per graph and threads + the parsed metadata into delayed tasks, so an N-chunk HTTP COG no + longer fires N separate 16 KB header GETs. +""" +from __future__ import annotations + +import threading +import time + +import numpy as np +import pytest + +from xrspatial.geotiff import open_geotiff +from xrspatial.geotiff import read_geotiff_dask +from xrspatial.geotiff._reader import ( + COALESCE_GAP_THRESHOLD_DEFAULT, + _HTTPSource, + coalesce_ranges, + split_coalesced_bytes, +) +from xrspatial.geotiff._writer import write + + +# --------------------------------------------------------------------------- +# Pure unit tests on the coalescer +# --------------------------------------------------------------------------- + +def test_coalesce_empty_input(): + merged, mapping = coalesce_ranges([]) + assert merged == [] + assert mapping == [] + + +def test_coalesce_single_range(): + merged, mapping = coalesce_ranges([(100, 50)]) + assert merged == [(100, 50)] + # One input -> one merged entry, rel_offset 0, length 50. + assert mapping == [(0, 0, 50)] + + +def test_coalesce_merges_adjacent_ranges(): + # Three back-to-back ranges, each within the default gap threshold + # of the next, should collapse into a single merged GET. + ranges = [(0, 100), (100, 50), (150, 25)] + merged, mapping = coalesce_ranges(ranges) + assert len(merged) == 1 + start, length = merged[0] + assert start == 0 + assert length == 175 + # Each input maps back to merged_idx 0 with the right offset. + assert mapping[0] == (0, 0, 100) + assert mapping[1] == (0, 100, 50) + assert mapping[2] == (0, 150, 25) + + +def test_coalesce_does_not_merge_when_gap_exceeds_threshold(): + # 200-byte gap with gap_threshold=50 must split. + ranges = [(0, 100), (300, 50)] + merged, mapping = coalesce_ranges(ranges, gap_threshold=50) + assert merged == [(0, 100), (300, 50)] + assert mapping[0] == (0, 0, 100) + assert mapping[1] == (1, 0, 50) + + +def test_coalesce_with_unsorted_input(): + # Coalescing should sort by offset and still produce correct mapping + # in input order. + ranges = [(200, 30), (0, 100), (100, 50)] + merged, mapping = coalesce_ranges(ranges) + # All three are within 1 MB of each other so they merge into one. + assert len(merged) == 1 + start, length = merged[0] + assert start == 0 + assert length == 230 + # mapping is in input order, not sort order. + assert mapping[0] == (0, 200, 30) + assert mapping[1] == (0, 0, 100) + assert mapping[2] == (0, 100, 50) + + +def test_coalesce_negative_threshold_disables_merging(): + ranges = [(0, 10), (10, 10), (20, 10)] + merged, mapping = coalesce_ranges(ranges, gap_threshold=-1) + # Every input becomes its own merged range. + assert len(merged) == 3 + for i, (orig, _) in enumerate(zip(ranges, merged)): + assert mapping[i][0] != mapping[(i + 1) % 3][0] or i == 2 + + +def test_coalesce_split_recovers_per_tile_bytes(): + # Round-trip: real bytes through a fake fetcher that mirrors what + # urllib3 would return for a Range request. + payload = bytes(range(256)) * 4 # 1024 unique-ish bytes + ranges = [(0, 100), (100, 50), (200, 30), (1000, 20)] + + merged, mapping = coalesce_ranges(ranges, gap_threshold=200) + # First three merge (gaps 0 and 50 -> within threshold), last splits + # (gap of 770 from end of third range). + assert len(merged) == 2 + + merged_bytes = [payload[s:s + le] for (s, le) in merged] + out = split_coalesced_bytes(merged_bytes, mapping) + + for (off, length), tile in zip(ranges, out): + assert tile == payload[off:off + length] + + +# --------------------------------------------------------------------------- +# Mocked HTTP source for perf and call-count assertions +# --------------------------------------------------------------------------- + +class _MockHTTPSource(_HTTPSource): + """``_HTTPSource`` that serves bytes from an in-memory buffer. + + Records every ``read_range`` call and optionally sleeps to simulate + RTT. Tracks calls separately by ``(start, length)`` so tests can + assert how many tile fetches versus IFD fetches happened. + """ + + def __init__(self, buf: bytes, rtt: float = 0.0): + self._url = 'mock://' + self._size = len(buf) + self._pool = None + self._buf = buf + self._rtt = rtt + self.calls: list[tuple[int, int]] = [] + self._lock = threading.Lock() + + def read_range(self, start: int, length: int) -> bytes: + with self._lock: + self.calls.append((start, length)) + if self._rtt > 0: + time.sleep(self._rtt) + return self._buf[start:start + length] + + def read_all(self) -> bytes: + with self._lock: + self.calls.append((0, len(self._buf))) + if self._rtt > 0: + time.sleep(self._rtt) + return self._buf + + +@pytest.fixture +def small_cog_bytes(tmp_path): + """Build a small tiled COG and return its raw bytes.""" + arr = np.arange(64 * 64, dtype=np.float32).reshape(64, 64) + path = str(tmp_path / 'cog.tif') + # 8x8 tile grid (tile_size=8, image=64x64) gives 64 tiles -- enough + # tile count to make coalescing observable. + write(arr, path, compression='deflate', tiled=True, tile_size=8, + cog=True) + with open(path, 'rb') as f: + return f.read(), arr, path + + +def test_read_cog_http_uses_coalesced_fetches(small_cog_bytes, monkeypatch): + """One coalesced GET for many adjacent COG tiles instead of N GETs.""" + from xrspatial.geotiff import _reader as _reader_mod + + buf, expected, _ = small_cog_bytes + src = _MockHTTPSource(buf) + + def _fake_http_source(url): + return src + + monkeypatch.setattr(_reader_mod, '_HTTPSource', _fake_http_source) + + arr, _ = _reader_mod._read_cog_http('http://mock/cog.tif') + np.testing.assert_array_equal(arr, expected) + + # All calls. We expect one or two header GETs (16 KB / optional 64 KB) + # plus ONE merged tile GET, not 64 tile GETs. + tile_fetches = [ + (s, le) for (s, le) in src.calls + if s != 0 or le > 65536 # exclude header reads + ] + assert len(tile_fetches) == 1, ( + f'expected a single coalesced tile fetch, got {len(tile_fetches)}: ' + f'{tile_fetches[:5]}' + ) + + +def test_read_cog_http_perf_with_mock_rtt(small_cog_bytes, monkeypatch): + """Coalesced HTTP should beat the un-coalesced baseline at 50 ms RTT.""" + from xrspatial.geotiff import _reader as _reader_mod + + buf, expected, _ = small_cog_bytes + rtt = 0.05 + + # Baseline: disable coalescing via env var so each tile costs an RTT. + monkeypatch.setenv('XRSPATIAL_COG_COALESCE_GAP', '-1') + src1 = _MockHTTPSource(buf, rtt=rtt) + monkeypatch.setattr(_reader_mod, '_HTTPSource', lambda url: src1) + t0 = time.perf_counter() + arr1, _ = _reader_mod._read_cog_http('http://mock/cog.tif') + baseline = time.perf_counter() - t0 + np.testing.assert_array_equal(arr1, expected) + + # Coalesced path: leave env unset (default 1 MB threshold). + monkeypatch.delenv('XRSPATIAL_COG_COALESCE_GAP', raising=False) + src2 = _MockHTTPSource(buf, rtt=rtt) + monkeypatch.setattr(_reader_mod, '_HTTPSource', lambda url: src2) + t0 = time.perf_counter() + arr2, _ = _reader_mod._read_cog_http('http://mock/cog.tif') + coalesced = time.perf_counter() - t0 + np.testing.assert_array_equal(arr2, expected) + + # Coalesced has to be at least 2x faster on a 50 ms RTT 64-tile read. + # Baseline = ceil(64/8) * 50 ms = 400 ms; coalesced = ~50 ms for the + # single merged GET plus ~50 ms for the IFD read = ~100 ms. + assert coalesced * 2 < baseline, ( + f'coalesced wall time {coalesced:.3f}s should be much less than ' + f'baseline {baseline:.3f}s' + ) + + +# --------------------------------------------------------------------------- +# read_geotiff_dask: IFD parsing call count and correctness +# --------------------------------------------------------------------------- + +def test_dask_local_correctness(small_cog_bytes): + """Dask read of a local COG must equal the eager read bit-for-bit.""" + _, expected, path = small_cog_bytes + eager = open_geotiff(path) + lazy = read_geotiff_dask(path, chunks=16).compute() + np.testing.assert_array_equal(np.asarray(eager), np.asarray(lazy)) + np.testing.assert_array_equal(np.asarray(eager), expected) + + +def test_dask_http_parses_ifds_once(small_cog_bytes, monkeypatch): + """An N-chunk HTTP graph must trigger at most one IFD-header GET.""" + from xrspatial.geotiff import _reader as _reader_mod + + buf, expected, _ = small_cog_bytes + src_holder: list[_MockHTTPSource] = [] + + def _fake_http_source(url): + s = _MockHTTPSource(buf) + src_holder.append(s) + return s + + monkeypatch.setattr(_reader_mod, '_HTTPSource', _fake_http_source) + + # 16x16 chunks on 64x64 -> 16 chunks. Without P5 each chunk would + # spawn its own _HTTPSource and fire its own (0, 16384) GET. + da_arr = read_geotiff_dask('http://mock/cog.tif', chunks=16).compute() + np.testing.assert_array_equal(np.asarray(da_arr), expected) + + # Count "header" GETs across every _HTTPSource instance the read + # path created. The header probe is exactly (0, 16384) (or the + # fallback (0, 65536)). + header_calls = 0 + for s in src_holder: + for (start, length) in s.calls: + if start == 0 and length in (16384, 65536): + header_calls += 1 + assert header_calls <= 1, ( + f'expected <=1 header GETs across the dask graph, got ' + f'{header_calls} (over {len(src_holder)} sources)' + ) From df224c27bf21126e4127df05762fee473daca712 Mon Sep 17 00:00:00 2001 From: Brendan Collins Date: Sat, 9 May 2026 14:12:37 -0700 Subject: [PATCH 2/2] Address Copilot review on #1534 Three findings, all acted on: - _parse_cog_http_meta retry comment claimed "If we didn't get all IFDs, try a larger fetch" but the gate is `len(ifds) == 0`, not partial-IFD recovery. Reword the comment to match: parse_all_ifds bails when the header GET lands short of the first IFD offset, and the retry expands the window in that one case. Overviews past the first 16 KiB still come from later reads. - _read_tiles now skips the full-image _check_dimensions(width, height, ...) gate when a window is provided. With the windowed HTTP path, dask-chunked reads of multi-billion-pixel COGs only materialise the window; the full-image cap was rejecting legitimate reads. The single-tile budget always applies, and the full-image cap still applies for whole-file reads (window is None) and the windowed output cap (out_h * out_w * samples) is unchanged. - read_geotiff_dask wraps the parsed (TIFFHeader, IFD) in a single dask.delayed key (`http_meta_key`) and threads it as a function argument into _delayed_read_window's inner @dask.delayed task, rather than capturing it via Python closure. Distributed/process schedulers now serialise the metadata once across the whole graph instead of once per chunk. On COGs with millions of tiles (and correspondingly large TileOffsets/TileByteCounts tuples in the IFD) this saves O(N_chunks) of redundant pickle work. --- xrspatial/geotiff/__init__.py | 27 +++++++++++++++++++-------- xrspatial/geotiff/_reader.py | 15 ++++++++++++--- 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/xrspatial/geotiff/__init__.py b/xrspatial/geotiff/__init__.py index eb429782..a6e2ef97 100644 --- a/xrspatial/geotiff/__init__.py +++ b/xrspatial/geotiff/__init__.py @@ -1276,7 +1276,9 @@ def read_geotiff_dask(source: str, *, dtype=None, chunks: int | tuple = 512, and source.startswith(('http://', 'https://')) ) http_meta = None + http_meta_key = None if is_http: + import dask from ._reader import _HTTPSource, _parse_cog_http_meta _src = _HTTPSource(source) try: @@ -1285,6 +1287,13 @@ def read_geotiff_dask(source: str, *, dtype=None, chunks: int | tuple = 512, finally: _src.close() http_meta = (http_header, http_ifd) + # Wrap the parsed metadata in a single dask Delayed so every + # window task takes it as a graph input, not a Python closure. + # Without this, the (TIFFHeader, IFD) pair -- which can carry + # multi-million-entry TileOffsets/TileByteCounts tuples on + # large COGs -- would be embedded in each chunk task and + # serialised N times under distributed/process schedulers. + http_meta_key = dask.delayed(http_meta, pure=True) geo_info = http_geo full_h = http_ifd.height full_w = http_ifd.width @@ -1381,7 +1390,7 @@ def read_geotiff_dask(source: str, *, dtype=None, chunks: int | tuple = 512, overview_level, nodata, band_arg, target_dtype=target_dtype if dtype is not None else None, - http_meta=http_meta), + http_meta_key=http_meta_key), shape=block_shape, dtype=target_dtype, ) @@ -1402,18 +1411,20 @@ def read_geotiff_dask(source: str, *, dtype=None, chunks: int | tuple = 512, def _delayed_read_window(source, r0, c0, r1, c1, overview_level, nodata, - band, *, target_dtype=None, http_meta=None): + band, *, target_dtype=None, http_meta_key=None): """Dask-delayed function to read a single window. - *http_meta* is an optional ``(TIFFHeader, IFD)`` tuple parsed once by - :func:`read_geotiff_dask`. When supplied, HTTP delayed tasks reuse - it instead of re-fetching the leading IFD bytes per chunk (P5). - For local sources this argument is ignored. + *http_meta_key* is an optional ``Delayed[(TIFFHeader, IFD)]`` parsed + once by :func:`read_geotiff_dask` and wrapped via ``dask.delayed``. + Passing it as a function argument (rather than a closure capture) + makes the metadata a single graph input that all window tasks + depend on, so distributed/process schedulers serialise it once + instead of once per chunk. For local sources it is ``None``. """ import dask @dask.delayed - def _read(): + def _read(http_meta): if http_meta is not None and isinstance(source, str) and \ source.startswith(('http://', 'https://')): from ._reader import _HTTPSource, _fetch_decode_cog_http_tiles @@ -1443,7 +1454,7 @@ def _read(): if target_dtype is not None: arr = arr.astype(target_dtype) return arr - return _read() + return _read(http_meta_key) def read_geotiff_gpu(source: str, *, diff --git a/xrspatial/geotiff/_reader.py b/xrspatial/geotiff/_reader.py index bb640d64..677d330c 100644 --- a/xrspatial/geotiff/_reader.py +++ b/xrspatial/geotiff/_reader.py @@ -1090,7 +1090,11 @@ def _parse_cog_http_meta( header = parse_header(header_bytes) ifds = parse_all_ifds(header_bytes, header) - # If we didn't get all IFDs, try a larger fetch + # parse_all_ifds bails the moment it walks past the bytes we + # fetched, so a header GET that lands short of the first IFD's + # offset returns an empty list. Retry with a larger window in that + # case; this is *not* a partial-IFD recovery (overviews chained + # past the first 16 KiB are still loaded lazily by other readers). if len(ifds) == 0: header_bytes = source.read_range(0, 65536) ifds = parse_all_ifds(header_bytes, header) @@ -1189,8 +1193,13 @@ def _fetch_decode_cog_http_tiles( tiles_across = math.ceil(width / tw) tiles_down = math.ceil(height / th) - _check_dimensions(width, height, samples, max_pixels) - # A single tile's decoded bytes must also fit under the pixel budget. + # Cap the *materialised* pixel count, not the declared image size. + # A windowed HTTP read of a multi-billion-pixel COG only allocates + # the window, so capping the full image would reject legitimate + # tiled reads. The full-image cap still applies for whole-file + # reads (window is None). The single-tile budget always applies. + if window is None: + _check_dimensions(width, height, samples, max_pixels) _check_dimensions(tw, th, samples, max_pixels) # Reject malformed TIFFs whose declared tile grid exceeds the supplied