diff --git a/xrspatial/geotiff/__init__.py b/xrspatial/geotiff/__init__.py index 11e740d0..eb429782 100644 --- a/xrspatial/geotiff/__init__.py +++ b/xrspatial/geotiff/__init__.py @@ -1269,9 +1269,36 @@ def read_geotiff_dask(source: str, *, dtype=None, chunks: int | tuple = 512, if isinstance(source, str) and source.lower().endswith('.vrt'): return read_vrt(source, dtype=dtype, name=name, chunks=chunks) - # Metadata-only read: O(1) memory via mmap, no pixel decompression - geo_info, full_h, full_w, file_dtype, n_bands = _read_geo_info( - source, overview_level=overview_level) + # P5: HTTP COG sources used to fire one IFD/header GET per chunk + # task. Parse metadata once here so every delayed task can reuse it. + is_http = ( + isinstance(source, str) + and source.startswith(('http://', 'https://')) + ) + http_meta = None + if is_http: + from ._reader import _HTTPSource, _parse_cog_http_meta + _src = _HTTPSource(source) + try: + http_header, http_ifd, http_geo, _ = _parse_cog_http_meta( + _src, overview_level=overview_level) + finally: + _src.close() + http_meta = (http_header, http_ifd) + geo_info = http_geo + full_h = http_ifd.height + full_w = http_ifd.width + from ._dtypes import resolve_bits_per_sample, tiff_dtype_to_numpy + bps = resolve_bits_per_sample(http_ifd.bits_per_sample) + file_dtype = tiff_dtype_to_numpy(bps, http_ifd.sample_format) + n_bands = ( + http_ifd.samples_per_pixel + if http_ifd.samples_per_pixel > 1 else 0 + ) + else: + # Metadata-only read: O(1) memory via mmap, no pixel decompression + geo_info, full_h, full_w, file_dtype, n_bands = _read_geo_info( + source, overview_level=overview_level) nodata = geo_info.nodata # Nodata masking promotes integer arrays to float64 (for NaN). @@ -1353,7 +1380,8 @@ def read_geotiff_dask(source: str, *, dtype=None, chunks: int | tuple = 512, _delayed_read_window(source, r0, c0, r1, c1, overview_level, nodata, band_arg, - target_dtype=target_dtype if dtype is not None else None), + target_dtype=target_dtype if dtype is not None else None, + http_meta=http_meta), shape=block_shape, dtype=target_dtype, ) @@ -1374,13 +1402,35 @@ def read_geotiff_dask(source: str, *, dtype=None, chunks: int | tuple = 512, def _delayed_read_window(source, r0, c0, r1, c1, overview_level, nodata, - band, *, target_dtype=None): - """Dask-delayed function to read a single window.""" + band, *, target_dtype=None, http_meta=None): + """Dask-delayed function to read a single window. + + *http_meta* is an optional ``(TIFFHeader, IFD)`` tuple parsed once by + :func:`read_geotiff_dask`. When supplied, HTTP delayed tasks reuse + it instead of re-fetching the leading IFD bytes per chunk (P5). + For local sources this argument is ignored. 
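+
+    The tuple travels into each task as an ordinary delayed argument,
+    so it is serialised with the graph; it holds only the parsed header
+    and the selected IFD, which should keep the graph small while each
+    task issues data GETs only.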
+ """ import dask + @dask.delayed def _read(): - arr, _ = read_to_array(source, window=(r0, c0, r1, c1), - overview_level=overview_level, band=band) + if http_meta is not None and isinstance(source, str) and \ + source.startswith(('http://', 'https://')): + from ._reader import _HTTPSource, _fetch_decode_cog_http_tiles + header, ifd = http_meta + src = _HTTPSource(source) + try: + arr = _fetch_decode_cog_http_tiles( + src, header, ifd, window=(r0, c0, r1, c1)) + finally: + src.close() + if (arr.ndim == 3 and ifd.samples_per_pixel > 1 + and band is not None): + arr = arr[:, :, band] + else: + arr, _ = read_to_array(source, window=(r0, c0, r1, c1), + overview_level=overview_level, + band=band) if nodata is not None: if arr.dtype.kind == 'f' and not np.isnan(nodata): arr = arr.copy() diff --git a/xrspatial/geotiff/_reader.py b/xrspatial/geotiff/_reader.py index a99f1e63..bb640d64 100644 --- a/xrspatial/geotiff/_reader.py +++ b/xrspatial/geotiff/_reader.py @@ -270,6 +270,109 @@ def _get_http_pool(): _http_pool = None +# --------------------------------------------------------------------------- +# HTTP range coalescing +# --------------------------------------------------------------------------- + +#: Default gap threshold (bytes) for merging adjacent COG tile ranges into a +#: single GET. COG tiles are stored sequentially, so most adjacent ranges +#: differ by zero (back-to-back) or a few bytes; 1 MB tolerates small holes +#: caused by interleaved overview/mask data without ballooning over-fetch. +#: Most tiles are well under 1 MB compressed, so the coalesced GET stays +#: O(num_tiles) bytes plus at most one threshold of slack between tiles. +COALESCE_GAP_THRESHOLD_DEFAULT = 1 << 20 # 1 MB + + +def coalesce_ranges( + ranges: list[tuple[int, int]], + gap_threshold: int = COALESCE_GAP_THRESHOLD_DEFAULT, +) -> tuple[list[tuple[int, int]], list[tuple[int, int, int]]]: + """Merge nearby ``(offset, length)`` ranges into fewer larger ones. + + Parameters + ---------- + ranges : list of (offset, length) + Per-tile byte ranges to fetch. Order is preserved in the + ``mapping`` output so callers can reassemble per-tile bytes. + gap_threshold : int + Maximum gap, in bytes, between two adjacent ranges before they + are merged. A gap of zero means perfectly back-to-back; larger + gaps trade some over-fetch for fewer round-trips. + + Returns + ------- + merged : list of (start, length) + Coalesced ranges, sorted by ``start``. Issue one GET per entry. + mapping : list of (merged_idx, rel_offset, length) + For each input range (in input order), the index of the merged + range its bytes live in, the offset within that merged range, + and the original length. Use with :func:`split_coalesced_bytes`. + + Notes + ----- + Empty input returns ``([], [])``. Negative gap thresholds disable + merging entirely (every input becomes its own merged range). + """ + if not ranges: + return [], [] + + # Tag each input with its original index so we can rebuild mapping. + indexed = sorted( + ((off, length, i) for i, (off, length) in enumerate(ranges)), + key=lambda t: t[0], + ) + + merged: list[tuple[int, int]] = [] + # mapping[input_idx] -> (merged_idx, rel_offset, length) + mapping: list[tuple[int, int, int]] = [(0, 0, 0)] * len(ranges) + + cur_start, cur_length, first_idx = indexed[0] + cur_end = cur_start + cur_length + members = [(first_idx, cur_start, cur_length)] + + for off, length, orig_idx in indexed[1:]: + gap = off - cur_end + if gap_threshold >= 0 and gap <= gap_threshold: + # Extend current merged range. 
Gaps may be negative if a + # later-listed range overlaps an earlier one; clamp so the + # merged length covers both. + new_end = max(cur_end, off + length) + cur_length = new_end - cur_start + cur_end = new_end + members.append((orig_idx, off, length)) + else: + merged_idx = len(merged) + merged.append((cur_start, cur_length)) + for orig, m_off, m_len in members: + mapping[orig] = (merged_idx, m_off - cur_start, m_len) + cur_start, cur_length, cur_end = off, length, off + length + members = [(orig_idx, off, length)] + + merged_idx = len(merged) + merged.append((cur_start, cur_length)) + for orig, m_off, m_len in members: + mapping[orig] = (merged_idx, m_off - cur_start, m_len) + + return merged, mapping + + +def split_coalesced_bytes( + merged_bytes: list[bytes], + mapping: list[tuple[int, int, int]], +) -> list[bytes]: + """Slice merged-GET payloads back into per-tile bytes using *mapping*. + + Inverse of :func:`coalesce_ranges`. ``merged_bytes[i]`` must be the + bytes returned by the GET for the ``i``th merged range; the output + is one bytes object per original input range, in input order. + """ + out: list[bytes] = [b''] * len(mapping) + for orig_idx, (merged_idx, rel_off, length) in enumerate(mapping): + chunk = merged_bytes[merged_idx] + out[orig_idx] = chunk[rel_off:rel_off + length] + return out + + class _HTTPSource: """HTTP data source using range requests with connection reuse. @@ -332,6 +435,29 @@ def read_ranges( return results # type: ignore[return-value] + def read_ranges_coalesced( + self, + ranges: list[tuple[int, int]], + max_workers: int = 8, + gap_threshold: int = COALESCE_GAP_THRESHOLD_DEFAULT, + ) -> list[bytes]: + """Fetch *ranges* using merged GETs where adjacent ranges allow it. + + Wrapper around :meth:`read_ranges` that first calls + :func:`coalesce_ranges` to group nearby ranges into fewer larger + GETs, then splits the responses back per-input via + :func:`split_coalesced_bytes`. Returns bytes in input order, same + as :meth:`read_ranges`. + + Setting *gap_threshold* to a negative number disables merging + and falls back to one GET per input range. + """ + if not ranges: + return [] + merged, mapping = coalesce_ranges(ranges, gap_threshold=gap_threshold) + merged_bytes = self.read_ranges(merged, max_workers=max_workers) + return split_coalesced_bytes(merged_bytes, mapping) + def read_all(self) -> bytes: if self._pool is not None: resp = self._pool.request('GET', self._url) @@ -945,6 +1071,38 @@ def _decode_one(job): # COG HTTP reader # --------------------------------------------------------------------------- +def _parse_cog_http_meta( + source: _HTTPSource, + overview_level: int | None = None, +) -> tuple[TIFFHeader, IFD, GeoInfo, bytes]: + """Fetch + parse the leading IFDs of an HTTP COG once. + + Issues one (or rarely two) range GET(s) for the leading 16 KB / 64 KB + of the file, parses the header and IFD list, and returns the selected + IFD plus the raw header bytes (kept for ``extract_geo_info`` callers + that might want the IFD's tag offsets). + + Pulled out of :func:`_read_cog_http` so :func:`read_geotiff_dask` + can parse metadata once per graph rather than once per chunk task + (P5: each delayed task used to fire its own 16 KB header GET). 
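+
+    A typical call mirrors the pattern used by
+    :func:`read_geotiff_dask` (``url`` being any HTTP(S) COG URL)::
+
+        src = _HTTPSource(url)
+        try:
+            header, ifd, geo_info, _ = _parse_cog_http_meta(
+                src, overview_level=overview_level)
+        finally:
+            src.close()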
+ """ + header_bytes = source.read_range(0, 16384) + header = parse_header(header_bytes) + ifds = parse_all_ifds(header_bytes, header) + + # If we didn't get all IFDs, try a larger fetch + if len(ifds) == 0: + header_bytes = source.read_range(0, 65536) + ifds = parse_all_ifds(header_bytes, header) + + if len(ifds) == 0: + raise ValueError("No IFDs found in COG") + + ifd = select_overview_ifd(ifds, overview_level) + geo_info = extract_geo_info(ifd, header_bytes, header.byte_order) + return header, ifd, geo_info, header_bytes + + def _read_cog_http(url: str, overview_level: int | None = None, band: int | None = None, max_pixels: int = MAX_PIXELS_DEFAULT, @@ -972,35 +1130,43 @@ def _read_cog_http(url: str, overview_level: int | None = None, (array, geo_info) tuple """ source = _HTTPSource(url) + header, ifd, geo_info, header_bytes = _parse_cog_http_meta( + source, overview_level=overview_level) - # Initial fetch: get header + IFDs (COGs put metadata first) - header_bytes = source.read_range(0, 16384) - - header = parse_header(header_bytes) - ifds = parse_all_ifds(header_bytes, header) - - # If we didn't get all IFDs, try a larger fetch - if len(ifds) == 0: - header_bytes = source.read_range(0, 65536) - ifds = parse_all_ifds(header_bytes, header) - - if len(ifds) == 0: - raise ValueError("No IFDs found in COG") + arr = _fetch_decode_cog_http_tiles( + source, header, ifd, max_pixels=max_pixels, window=None) + source.close() + return arr, geo_info - # Select IFD based on overview level, skipping any mask IFDs - ifd = select_overview_ifd(ifds, overview_level) +def _fetch_decode_cog_http_tiles( + source: _HTTPSource, + header: TIFFHeader, + ifd: IFD, + *, + max_pixels: int = MAX_PIXELS_DEFAULT, + window: tuple[int, int, int, int] | None = None, +) -> np.ndarray: + """Fetch and decode the tiles of a tiled COG over HTTP. + + Pulled out of :func:`_read_cog_http` so that callers with + pre-parsed metadata (notably :func:`read_geotiff_dask`) can reuse a + single IFD parse across many tile-fetch calls. When *window* is + given, only tiles intersecting the window are fetched + decoded; + the result is sized to the (clamped) window rather than the full + image. Coalescing of adjacent ranges still applies. + """ bps = resolve_bits_per_sample(ifd.bits_per_sample) dtype = tiff_dtype_to_numpy(bps, ifd.sample_format) - geo_info = extract_geo_info(ifd, header_bytes, header.byte_order) - - # COGs are tiled -- fetch individual tiles if not ifd.is_tiled: - # Fallback: fetch entire file + # Stripped HTTP COG: fall back to a full read. Window is honoured + # by slicing the decoded array. all_data = source.read_all() arr = _read_strips(all_data, ifd, header, dtype) - source.close() - return arr, geo_info + if window is not None: + r0, c0, r1, c1 = window + return arr[r0:r1, c0:c1] + return arr width = ifd.width height = ifd.height @@ -1031,6 +1197,19 @@ def _read_cog_http(url: str, overview_level: int | None = None, # TileOffsets length. See issue #1219. validate_tile_layout(ifd) + if window is None: + r0_out, c0_out, r1_out, c1_out = 0, 0, height, width + else: + r0_out, c0_out, r1_out, c1_out = window + r0_out = max(0, r0_out) + c0_out = max(0, c0_out) + r1_out = min(height, r1_out) + c1_out = min(width, c1_out) + + out_h = r1_out - r0_out + out_w = c1_out - c0_out + _check_dimensions(out_w, out_h, samples, max_pixels) + # Sparse tiles (TileByteCounts == 0) need to land on the file's nodata # value (or 0 if unset) rather than uninitialised memory. 
Detect them # up front so the result buffer is pre-filled before tile placement. @@ -1038,13 +1217,18 @@ def _read_cog_http(url: str, overview_level: int | None = None, if sparse: fill = _sparse_fill_value(ifd, dtype) if samples > 1: - result = np.full((height, width, samples), fill, dtype=dtype) + result = np.full((out_h, out_w, samples), fill, dtype=dtype) else: - result = np.full((height, width), fill, dtype=dtype) + result = np.full((out_h, out_w), fill, dtype=dtype) elif samples > 1: - result = np.empty((height, width, samples), dtype=dtype) + result = np.empty((out_h, out_w, samples), dtype=dtype) else: - result = np.empty((height, width), dtype=dtype) + result = np.empty((out_h, out_w), dtype=dtype) + + tile_row_start = r0_out // th + tile_row_end = min(math.ceil(r1_out / th), tiles_down) + tile_col_start = c0_out // tw + tile_col_end = min(math.ceil(c1_out / tw), tiles_across) # Pass 1: collect every tile's range and where it lands in the output. # Empty tiles (byte_count == 0) and any tile_idx beyond the offsets @@ -1052,8 +1236,8 @@ def _read_cog_http(url: str, overview_level: int | None = None, # the placements list. fetch_ranges: list[tuple[int, int]] = [] placements: list[tuple[int, int]] = [] # (tr, tc) per fetched tile - for tr in range(tiles_down): - for tc in range(tiles_across): + for tr in range(tile_row_start, tile_row_end): + for tc in range(tile_col_start, tile_col_end): tile_idx = tr * tiles_across + tc if tile_idx >= len(offsets): continue @@ -1067,13 +1251,26 @@ def _read_cog_http(url: str, overview_level: int | None = None, # Pass 2: fetch all tile bytes in parallel. Worker pool size is tunable # via XRSPATIAL_COG_HTTP_WORKERS so users on very slow links can dial # it up without code changes. + # + # COG tile offsets are sorted and usually back-to-back, so we coalesce + # adjacent ranges into fewer larger GETs (P2). The 1 MB gap threshold + # tolerates small interleaved metadata between tiles without dragging + # in unrelated overview data. Set XRSPATIAL_COG_COALESCE_GAP=-1 to + # disable merging (one GET per tile, the legacy behaviour). try: workers = max(1, int(_os_module.environ.get('XRSPATIAL_COG_HTTP_WORKERS', '8'))) except ValueError: workers = 8 - tile_bytes_list = source.read_ranges(fetch_ranges, max_workers=workers) + try: + gap = int(_os_module.environ.get( + 'XRSPATIAL_COG_COALESCE_GAP', + str(COALESCE_GAP_THRESHOLD_DEFAULT))) + except ValueError: + gap = COALESCE_GAP_THRESHOLD_DEFAULT + tile_bytes_list = source.read_ranges_coalesced( + fetch_ranges, max_workers=workers, gap_threshold=gap) - # Pass 3: decode each tile and place it. + # Pass 3: decode each tile and place it (clipped to the window). for (tr, tc), tile_data in zip(placements, tile_bytes_list): tile_pixels = _decode_strip_or_tile( tile_data, compression, tw, th, samples, @@ -1081,16 +1278,35 @@ def _read_cog_http(url: str, overview_level: int | None = None, byte_order=header.byte_order, jpeg_tables=jpeg_tables) - y0 = tr * th - x0 = tc * tw - y1 = min(y0 + th, height) - x1 = min(x0 + tw, width) - actual_h = y1 - y0 - actual_w = x1 - x0 - result[y0:y1, x0:x1] = tile_pixels[:actual_h, :actual_w] + # Tile position in image coordinates. + ty0 = tr * th + tx0 = tc * tw + ty1 = ty0 + th + tx1 = tx0 + tw + + # Intersect with the requested window. + iy0 = max(ty0, r0_out) + ix0 = max(tx0, c0_out) + iy1 = min(ty1, r1_out) + ix1 = min(tx1, c1_out) + if iy1 <= iy0 or ix1 <= ix0: + continue + + # Source slice within the decoded tile pixels. 
+ sy0 = iy0 - ty0 + sx0 = ix0 - tx0 + sy1 = sy0 + (iy1 - iy0) + sx1 = sx0 + (ix1 - ix0) + + # Destination slice within the output buffer. + dy0 = iy0 - r0_out + dx0 = ix0 - c0_out + dy1 = iy1 - r0_out + dx1 = ix1 - c0_out + + result[dy0:dy1, dx0:dx1] = tile_pixels[sy0:sy1, sx0:sx1] - source.close() - return result, geo_info + return result # --------------------------------------------------------------------------- diff --git a/xrspatial/geotiff/tests/test_http_cog_coalesce.py b/xrspatial/geotiff/tests/test_http_cog_coalesce.py new file mode 100644 index 00000000..3b512a6d --- /dev/null +++ b/xrspatial/geotiff/tests/test_http_cog_coalesce.py @@ -0,0 +1,268 @@ +"""Coalesced HTTP COG range reads + once-per-graph IFD parsing. + +Covers two performance fixes: + +* P2 -- :func:`coalesce_ranges` merges adjacent tile byte ranges into + fewer larger GETs so HTTP wall time is bounded by ``ceil(N_merged/W) * + RTT`` rather than ``ceil(N_tiles/W) * RTT``. +* P5 -- :func:`read_geotiff_dask` parses IFDs once per graph and threads + the parsed metadata into delayed tasks, so an N-chunk HTTP COG no + longer fires N separate 16 KB header GETs. +""" +from __future__ import annotations + +import threading +import time + +import numpy as np +import pytest + +from xrspatial.geotiff import open_geotiff +from xrspatial.geotiff import read_geotiff_dask +from xrspatial.geotiff._reader import ( + COALESCE_GAP_THRESHOLD_DEFAULT, + _HTTPSource, + coalesce_ranges, + split_coalesced_bytes, +) +from xrspatial.geotiff._writer import write + + +# --------------------------------------------------------------------------- +# Pure unit tests on the coalescer +# --------------------------------------------------------------------------- + +def test_coalesce_empty_input(): + merged, mapping = coalesce_ranges([]) + assert merged == [] + assert mapping == [] + + +def test_coalesce_single_range(): + merged, mapping = coalesce_ranges([(100, 50)]) + assert merged == [(100, 50)] + # One input -> one merged entry, rel_offset 0, length 50. + assert mapping == [(0, 0, 50)] + + +def test_coalesce_merges_adjacent_ranges(): + # Three back-to-back ranges, each within the default gap threshold + # of the next, should collapse into a single merged GET. + ranges = [(0, 100), (100, 50), (150, 25)] + merged, mapping = coalesce_ranges(ranges) + assert len(merged) == 1 + start, length = merged[0] + assert start == 0 + assert length == 175 + # Each input maps back to merged_idx 0 with the right offset. + assert mapping[0] == (0, 0, 100) + assert mapping[1] == (0, 100, 50) + assert mapping[2] == (0, 150, 25) + + +def test_coalesce_does_not_merge_when_gap_exceeds_threshold(): + # 200-byte gap with gap_threshold=50 must split. + ranges = [(0, 100), (300, 50)] + merged, mapping = coalesce_ranges(ranges, gap_threshold=50) + assert merged == [(0, 100), (300, 50)] + assert mapping[0] == (0, 0, 100) + assert mapping[1] == (1, 0, 50) + + +def test_coalesce_with_unsorted_input(): + # Coalescing should sort by offset and still produce correct mapping + # in input order. + ranges = [(200, 30), (0, 100), (100, 50)] + merged, mapping = coalesce_ranges(ranges) + # All three are within 1 MB of each other so they merge into one. + assert len(merged) == 1 + start, length = merged[0] + assert start == 0 + assert length == 230 + # mapping is in input order, not sort order. 
+ assert mapping[0] == (0, 200, 30) + assert mapping[1] == (0, 0, 100) + assert mapping[2] == (0, 100, 50) + + +def test_coalesce_negative_threshold_disables_merging(): + ranges = [(0, 10), (10, 10), (20, 10)] + merged, mapping = coalesce_ranges(ranges, gap_threshold=-1) + # Every input becomes its own merged range. + assert len(merged) == 3 + for i, (orig, _) in enumerate(zip(ranges, merged)): + assert mapping[i][0] != mapping[(i + 1) % 3][0] or i == 2 + + +def test_coalesce_split_recovers_per_tile_bytes(): + # Round-trip: real bytes through a fake fetcher that mirrors what + # urllib3 would return for a Range request. + payload = bytes(range(256)) * 4 # 1024 unique-ish bytes + ranges = [(0, 100), (100, 50), (200, 30), (1000, 20)] + + merged, mapping = coalesce_ranges(ranges, gap_threshold=200) + # First three merge (gaps 0 and 50 -> within threshold), last splits + # (gap of 770 from end of third range). + assert len(merged) == 2 + + merged_bytes = [payload[s:s + le] for (s, le) in merged] + out = split_coalesced_bytes(merged_bytes, mapping) + + for (off, length), tile in zip(ranges, out): + assert tile == payload[off:off + length] + + +# --------------------------------------------------------------------------- +# Mocked HTTP source for perf and call-count assertions +# --------------------------------------------------------------------------- + +class _MockHTTPSource(_HTTPSource): + """``_HTTPSource`` that serves bytes from an in-memory buffer. + + Records every ``read_range`` call and optionally sleeps to simulate + RTT. Tracks calls separately by ``(start, length)`` so tests can + assert how many tile fetches versus IFD fetches happened. + """ + + def __init__(self, buf: bytes, rtt: float = 0.0): + self._url = 'mock://' + self._size = len(buf) + self._pool = None + self._buf = buf + self._rtt = rtt + self.calls: list[tuple[int, int]] = [] + self._lock = threading.Lock() + + def read_range(self, start: int, length: int) -> bytes: + with self._lock: + self.calls.append((start, length)) + if self._rtt > 0: + time.sleep(self._rtt) + return self._buf[start:start + length] + + def read_all(self) -> bytes: + with self._lock: + self.calls.append((0, len(self._buf))) + if self._rtt > 0: + time.sleep(self._rtt) + return self._buf + + +@pytest.fixture +def small_cog_bytes(tmp_path): + """Build a small tiled COG and return its raw bytes.""" + arr = np.arange(64 * 64, dtype=np.float32).reshape(64, 64) + path = str(tmp_path / 'cog.tif') + # 8x8 tile grid (tile_size=8, image=64x64) gives 64 tiles -- enough + # tile count to make coalescing observable. + write(arr, path, compression='deflate', tiled=True, tile_size=8, + cog=True) + with open(path, 'rb') as f: + return f.read(), arr, path + + +def test_read_cog_http_uses_coalesced_fetches(small_cog_bytes, monkeypatch): + """One coalesced GET for many adjacent COG tiles instead of N GETs.""" + from xrspatial.geotiff import _reader as _reader_mod + + buf, expected, _ = small_cog_bytes + src = _MockHTTPSource(buf) + + def _fake_http_source(url): + return src + + monkeypatch.setattr(_reader_mod, '_HTTPSource', _fake_http_source) + + arr, _ = _reader_mod._read_cog_http('http://mock/cog.tif') + np.testing.assert_array_equal(arr, expected) + + # All calls. We expect one or two header GETs (16 KB / optional 64 KB) + # plus ONE merged tile GET, not 64 tile GETs. 
+ tile_fetches = [ + (s, le) for (s, le) in src.calls + if s != 0 or le > 65536 # exclude header reads + ] + assert len(tile_fetches) == 1, ( + f'expected a single coalesced tile fetch, got {len(tile_fetches)}: ' + f'{tile_fetches[:5]}' + ) + + +def test_read_cog_http_perf_with_mock_rtt(small_cog_bytes, monkeypatch): + """Coalesced HTTP should beat the un-coalesced baseline at 50 ms RTT.""" + from xrspatial.geotiff import _reader as _reader_mod + + buf, expected, _ = small_cog_bytes + rtt = 0.05 + + # Baseline: disable coalescing via env var so each tile costs an RTT. + monkeypatch.setenv('XRSPATIAL_COG_COALESCE_GAP', '-1') + src1 = _MockHTTPSource(buf, rtt=rtt) + monkeypatch.setattr(_reader_mod, '_HTTPSource', lambda url: src1) + t0 = time.perf_counter() + arr1, _ = _reader_mod._read_cog_http('http://mock/cog.tif') + baseline = time.perf_counter() - t0 + np.testing.assert_array_equal(arr1, expected) + + # Coalesced path: leave env unset (default 1 MB threshold). + monkeypatch.delenv('XRSPATIAL_COG_COALESCE_GAP', raising=False) + src2 = _MockHTTPSource(buf, rtt=rtt) + monkeypatch.setattr(_reader_mod, '_HTTPSource', lambda url: src2) + t0 = time.perf_counter() + arr2, _ = _reader_mod._read_cog_http('http://mock/cog.tif') + coalesced = time.perf_counter() - t0 + np.testing.assert_array_equal(arr2, expected) + + # Coalesced has to be at least 2x faster on a 50 ms RTT 64-tile read. + # Baseline = ceil(64/8) * 50 ms = 400 ms; coalesced = ~50 ms for the + # single merged GET plus ~50 ms for the IFD read = ~100 ms. + assert coalesced * 2 < baseline, ( + f'coalesced wall time {coalesced:.3f}s should be much less than ' + f'baseline {baseline:.3f}s' + ) + + +# --------------------------------------------------------------------------- +# read_geotiff_dask: IFD parsing call count and correctness +# --------------------------------------------------------------------------- + +def test_dask_local_correctness(small_cog_bytes): + """Dask read of a local COG must equal the eager read bit-for-bit.""" + _, expected, path = small_cog_bytes + eager = open_geotiff(path) + lazy = read_geotiff_dask(path, chunks=16).compute() + np.testing.assert_array_equal(np.asarray(eager), np.asarray(lazy)) + np.testing.assert_array_equal(np.asarray(eager), expected) + + +def test_dask_http_parses_ifds_once(small_cog_bytes, monkeypatch): + """An N-chunk HTTP graph must trigger at most one IFD-header GET.""" + from xrspatial.geotiff import _reader as _reader_mod + + buf, expected, _ = small_cog_bytes + src_holder: list[_MockHTTPSource] = [] + + def _fake_http_source(url): + s = _MockHTTPSource(buf) + src_holder.append(s) + return s + + monkeypatch.setattr(_reader_mod, '_HTTPSource', _fake_http_source) + + # 16x16 chunks on 64x64 -> 16 chunks. Without P5 each chunk would + # spawn its own _HTTPSource and fire its own (0, 16384) GET. + da_arr = read_geotiff_dask('http://mock/cog.tif', chunks=16).compute() + np.testing.assert_array_equal(np.asarray(da_arr), expected) + + # Count "header" GETs across every _HTTPSource instance the read + # path created. The header probe is exactly (0, 16384) (or the + # fallback (0, 65536)). + header_calls = 0 + for s in src_holder: + for (start, length) in s.calls: + if start == 0 and length in (16384, 65536): + header_calls += 1 + assert header_calls <= 1, ( + f'expected <=1 header GETs across the dask graph, got ' + f'{header_calls} (over {len(src_holder)} sources)' + )