Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions xrspatial/geotiff/_header.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@
MAX_IFD_ENTRY_COUNT = 100_000
MAX_IFD_ENTRY_BYTES = 1 << 18 # 256 KiB

# Maximum number of IFDs we walk in `parse_all_ifds` before giving up.
# Real-world COGs carry the full-resolution IFD plus a handful of overview
# levels and (optionally) per-band masks, so they sit comfortably below 64
# even for deep pyramids. A crafted TIFF can chain millions of distinct
# IFD offsets via `next_ifd_offset`; the cycle-detection `seen` set won't
# catch those because every offset is unique. 256 is a generous ceiling
# that bounds memory while leaving plenty of headroom for any legitimate
# pyramid layout.
MAX_IFDS = 256

# Well-known TIFF tag IDs
TAG_NEW_SUBFILE_TYPE = 254
TAG_IMAGE_WIDTH = 256
Expand Down Expand Up @@ -648,6 +658,16 @@ def parse_all_ifds(data: bytes | memoryview,
break
ifd = parse_ifd(data, offset, header)
ifds.append(ifd)
# The `seen` set catches cycles, but a crafted file can chain a
# very long list of distinct offsets, each pointing at a small
# valid IFD. Cap the chain at MAX_IFDS to bound memory. A chain
# of exactly MAX_IFDS is allowed; only MAX_IFDS + 1 raises (same
# convention as MAX_IFD_ENTRY_COUNT).
if len(ifds) > MAX_IFDS:
raise ValueError(
f"TIFF IFD chain exceeds limit (MAX_IFDS={MAX_IFDS}); "
f"file is malformed or attempting denial-of-service"
)
offset = ifd.next_ifd_offset

return ifds
144 changes: 144 additions & 0 deletions xrspatial/geotiff/tests/test_ifd_chain_cap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
"""Tests for the IFD-chain length cap in parse_all_ifds (security S3).

A crafted TIFF can chain millions of distinct IFD offsets via
``next_ifd_offset``; the cycle-detection ``seen`` set in
``parse_all_ifds`` won't catch those because every offset is unique.
``MAX_IFDS`` bounds the chain length to keep memory predictable on
untrusted input.
"""
from __future__ import annotations

import struct

import numpy as np
import pytest

from xrspatial.geotiff import to_geotiff
from xrspatial.geotiff._header import (
MAX_IFDS,
TAG_IMAGE_WIDTH,
parse_all_ifds,
parse_header,
)


def _build_chained_ifd_bytes(n_ifds: int, big_endian: bool = False) -> bytes:
    """Serialize a classic TIFF whose IFD chain is exactly ``n_ifds`` long.

    Every IFD carries a single ImageWidth entry (enough for the parser to
    accept it) followed by a next-IFD pointer that links to the IFD placed
    immediately after it in the file.  The final pointer is 0, so the chain
    terminates cleanly and — absent the MAX_IFDS cap — ``parse_all_ifds``
    would walk all ``n_ifds`` of them.
    """
    prefix = '>' if big_endian else '<'

    # Classic-TIFF IFD layout used here:
    #   2 bytes  entry count
    #   12 bytes one tag entry
    #   4 bytes  next-IFD pointer
    # for 18 bytes per IFD; the file header occupies the first 8 bytes.
    IFD_SIZE = 18
    HEADER_SIZE = 8

    parts = [
        b'MM' if big_endian else b'II',
        struct.pack(f'{prefix}H', 42),
        struct.pack(f'{prefix}I', HEADER_SIZE),  # first IFD right after header
    ]

    for idx in range(n_ifds):
        is_last = idx == n_ifds - 1
        link = 0 if is_last else HEADER_SIZE + (idx + 1) * IFD_SIZE
        parts.append(struct.pack(f'{prefix}H', 1))  # one entry in this IFD
        # ImageWidth tag, type LONG (4), count 1, inline value idx + 1.
        parts.append(struct.pack(f'{prefix}HHI', TAG_IMAGE_WIDTH, 4, 1))
        parts.append(struct.pack(f'{prefix}I', idx + 1))
        parts.append(struct.pack(f'{prefix}I', link))

    return b''.join(parts)


class TestIFDChainCap:

    def test_ifd_chain_at_limit_rejected(self):
        """A chain far beyond MAX_IFDS must raise, never silently grow."""
        payload = _build_chained_ifd_bytes(MAX_IFDS + 50)
        hdr = parse_header(payload)
        with pytest.raises(ValueError, match=str(MAX_IFDS)):
            parse_all_ifds(payload, hdr)

    def test_chain_at_boundary_passes(self):
        """Exactly MAX_IFDS IFDs parse; MAX_IFDS + 1 is rejected.

        ``parse_all_ifds`` raises only once ``len(ifds) > MAX_IFDS``
        after appending, mirroring the ``> MAX_IFD_ENTRY_COUNT``
        convention elsewhere in the module — so "MAX = N" reads as
        "up to and including N is allowed".
        """
        # Chain of length MAX_IFDS: accepted, all IFDs returned.
        payload = _build_chained_ifd_bytes(MAX_IFDS)
        parsed = parse_all_ifds(payload, parse_header(payload))
        assert len(parsed) == MAX_IFDS

        # One past the cap: rejected.
        payload = _build_chained_ifd_bytes(MAX_IFDS + 1)
        with pytest.raises(ValueError, match=str(MAX_IFDS)):
            parse_all_ifds(payload, parse_header(payload))

    def test_error_message_mentions_dos_and_limit(self):
        payload = _build_chained_ifd_bytes(MAX_IFDS + 5)
        hdr = parse_header(payload)
        with pytest.raises(ValueError) as err:
            parse_all_ifds(payload, hdr)
        text = str(err.value)
        assert "MAX_IFDS" in text
        assert str(MAX_IFDS) in text
        # Operators should see threat-model wording explaining the trip.
        assert "denial-of-service" in text or "malformed" in text

    def test_short_chain_passes(self):
        """A handful of IFDs (typical pyramid depth) parses cleanly."""
        payload = _build_chained_ifd_bytes(8)
        parsed = parse_all_ifds(payload, parse_header(payload))
        assert len(parsed) == 8
        # Each IFD's ImageWidth was written as its index + 1; verify the
        # tag values survived the round trip.
        for pos, entry in enumerate(parsed):
            assert entry.width == pos + 1

    def test_legitimate_cog_with_overviews_passes(self, tmp_path):
        """A genuine COG with several overview levels parses fine.

        Real-world COGs hold fewer than ~30 IFDs even with many
        overview levels and per-band masks; the cap must never get in
        their way.
        """
        # A 256 x 256 array with explicit overview levels [2, 4, 8]
        # yields the full-resolution IFD plus 3 overviews = 4 IFDs.
        arr = np.arange(256 * 256, dtype=np.float32).reshape(256, 256)
        path = str(tmp_path / 'real_cog.tif')
        to_geotiff(arr, path, compression='deflate', tiled=True,
                   tile_size=64, cog=True, overview_levels=[2, 4, 8])

        with open(path, 'rb') as f:
            payload = f.read()
        parsed = parse_all_ifds(payload, parse_header(payload))
        assert 1 < len(parsed) < MAX_IFDS
        assert len(parsed) <= 16  # well under the cap


class TestIFDChainCapBigEndian:
    """The chain cap applies identically to big-endian ('MM') files."""

    def test_big_endian_chain_rejected(self):
        payload = _build_chained_ifd_bytes(MAX_IFDS + 10, big_endian=True)
        hdr = parse_header(payload)
        assert hdr.byte_order == '>'
        with pytest.raises(ValueError, match=str(MAX_IFDS)):
            parse_all_ifds(payload, hdr)
Loading