diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index c92994a..9f639ea 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -13,7 +13,7 @@ Concise, project-specific guidance for AI coding agents working on this repo. Fo ## 2. Key Behavioral Invariants - DO NOT mutate caller inputs—copy/normalize (shallow copy for mappings; deep-copy only when a callable filter may mutate; index-projection for sequences) before traversal. - Cycle detection in `encode._encode` must raise `ValueError("Circular reference detected")`—preserve side-channel algorithm. -- Depth, list, and parameter limits are security/safety features: respect `depth`, `max_depth`, `list_limit`, `parameter_limit`, and `strict_depth` / `raise_on_limit_exceeded` exactly as tests assert. `max_depth` is capped to the current recursion limit. +- Depth, list, and parameter limits are security/safety features: respect `depth`, `max_depth`, `list_limit`, `parameter_limit`, and `strict_depth` / `raise_on_limit_exceeded` exactly as tests assert. For encoding, `max_depth=None` means unbounded traversal (`sys.maxsize`); explicit values are enforced directly without recursion-limit capping, and exceeding an explicit `max_depth` raises `ValueError("Maximum encoding depth exceeded")`. - Duplicate key handling delegated to `Duplicates` enum: COMBINE → list accumulation; FIRST/LAST semantics enforced during merge. - List format semantics (`ListFormat` enum) change how prefixes are generated; COMMA + `comma_round_trip=True` must emit single-element marker for round-trip fidelity. - Charset sentinel logic: when `charset_sentinel=True`, prepend sentinel *before* payload; obey override rules when both charset and sentinel present. diff --git a/AGENTS.md b/AGENTS.md index 6515853..3971c56 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -22,7 +22,8 @@ - Preserve or raise the coverage level tracked in `coverage.xml`; CI flags regressions. 
- Name tests `test_{feature}_{scenario}` and refresh fixtures whenever query-string semantics shift. - When touching cross-language behavior, run `tests/comparison/compare_outputs.sh` to confirm parity with the Node reference. - - For encoding depth changes, cover `EncodeOptions.max_depth` (positive int/None) and cap-to-recursion behavior. + - For encoding depth changes, cover `EncodeOptions.max_depth` (positive int/None): `None` means unbounded traversal + (`sys.maxsize`) and explicit values are enforced directly (no recursion-limit capping). ## Commit & Pull Request Guidelines - Follow the emoji-prefixed summaries visible in `git log` (e.g., `:arrow_up: Bump actions/setup-python from 5 to 6 (#26)`), using the imperative mood. diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ed1619..e9c80a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +## 1.4.1-wip + +* [FIX] harden encoder traversal to an iterative implementation to avoid recursion-based crashes on very deep nested input +* [FIX] harden decode merge path (`Utils.merge`) with iterative traversal to prevent `RecursionError` on deep conflicting merges +* [FIX] update `EncodeOptions.max_depth` semantics: `None` is unbounded by this option; explicit limits are enforced directly +* [FIX] preserve legacy map-merge key collision semantics for mixed key types (`'1'` vs `1`) in iterative merge +* [CHORE] optimize deep encode performance by replacing per-frame side-channel chain scans with O(1) ancestry cycle state lookups +* [CHORE] add deep stack-safety regression tests (depth 12_000) and cycle-state compatibility tests +* [CHORE] update encoding depth documentation + ## 1.4.0 * [FEAT] add `EncodeOptions.max_depth` to cap encoding traversal depth (capped to the current recursion limit) diff --git a/README.rst b/README.rst index d0a2bdb..c57d228 100644 --- a/README.rst +++ b/README.rst @@ -463,8 +463,7 @@ Maximum encoding depth You can cap how deep the encoder will traverse by setting the `max_depth `__ -option.
If unset, the encoder derives a safe limit from the interpreter recursion limit; when set, the effective -limit is capped to the current recursion limit to avoid ``RecursionError``. +option. If unset, traversal is unbounded by this option. When set, the provided limit is enforced directly. .. code:: python diff --git a/docs/README.rst b/docs/README.rst index d73f8f4..f7b7e4e 100644 --- a/docs/README.rst +++ b/docs/README.rst @@ -413,9 +413,8 @@ Maximum encoding depth ^^^^^^^^^^^^^^^^^^^^^^ You can cap how deep the encoder will traverse by setting the -:py:attr:`max_depth ` option. If unset, the encoder derives a -safe limit from the interpreter recursion limit; when set, the effective limit is capped to the current recursion -limit to avoid ``RecursionError``. +:py:attr:`max_depth ` option. +If unset, traversal is unbounded by this option. When set, the provided limit is enforced directly. .. code:: python diff --git a/src/qs_codec/encode.py b/src/qs_codec/encode.py index a6084fd..9ad276c 100644 --- a/src/qs_codec/encode.py +++ b/src/qs_codec/encode.py @@ -17,6 +17,7 @@ import typing as t from collections.abc import Sequence as ABCSequence from copy import deepcopy +from dataclasses import dataclass, field from datetime import datetime from functools import cmp_to_key from weakref import WeakKeyDictionary @@ -95,7 +96,7 @@ def encode(value: t.Any, options: EncodeOptions = EncodeOptions()) -> str: if options.sort is not None and callable(options.sort): obj_keys = sorted(obj_keys, key=cmp_to_key(options.sort)) - # Side channel for cycle detection across recursive calls. + # Side channel seed for legacy `_encode` compatibility (and cycle-state bootstrap when provided). side_channel: WeakKeyDictionary = WeakKeyDictionary() max_depth = _get_max_encode_depth(options.max_depth) @@ -162,15 +163,123 @@ def encode(value: t.Any, options: EncodeOptions = EncodeOptions()) -> str: # Unique placeholder used as a key within the side-channel chain to pass context down recursion. 
_sentinel: WeakWrapper = WeakWrapper({}) -# Keep a safety buffer below Python's recursion limit to avoid RecursionError on deep inputs. -_DEPTH_MARGIN: int = 50 +MAX_ENCODING_DEPTH_EXCEEDED = "Maximum encoding depth exceeded" def _get_max_encode_depth(max_depth: t.Optional[int]) -> int: - limit = max(0, sys.getrecursionlimit() - _DEPTH_MARGIN) if max_depth is None: - return limit - return min(max_depth, limit) + return sys.maxsize + return max_depth + + +@dataclass +class _EncodeFrame: + value: t.Any + is_undefined: bool + side_channel: WeakKeyDictionary + prefix: t.Optional[str] + comma_round_trip: t.Optional[bool] + comma_compact_nulls: bool + encoder: t.Optional[t.Callable[[t.Any, t.Optional[Charset], t.Optional[Format]], str]] + serialize_date: t.Union[t.Callable[[datetime], t.Optional[str]], str] + sort: t.Optional[t.Callable[[t.Any, t.Any], int]] + filter_: t.Optional[t.Union[t.Callable, t.Sequence[t.Union[str, int]]]] + formatter: t.Optional[t.Callable[[str], str]] + format: Format + generate_array_prefix: t.Callable[[str, t.Optional[str]], str] + allow_empty_lists: bool + strict_null_handling: bool + skip_nulls: bool + encode_dot_in_keys: bool + allow_dots: bool + encode_values_only: bool + charset: t.Optional[Charset] + add_query_prefix: bool + depth: int + max_depth: t.Optional[int] + phase: str = "start" + obj: t.Any = None + obj_wrapper: t.Optional[WeakWrapper] = None + step: int = 0 + obj_keys: t.List[t.Any] = field(default_factory=list) + values: t.List[t.Any] = field(default_factory=list) + index: int = 0 + adjusted_prefix: str = "" + cycle_state: t.Optional["_CycleState"] = None + cycle_level: t.Optional[int] = None + cycle_pushed: bool = False + + +@dataclass +class _CycleEntry: + level: int + pos: t.Any + is_top: bool + + +@dataclass +class _CycleState: + entries: t.Dict[WeakWrapper, t.List[_CycleEntry]] = field(default_factory=dict) + + +def _bootstrap_cycle_state_from_side_channel(side_channel: WeakKeyDictionary) -> t.Tuple[_CycleState, int]: + 
""" + Build O(1) ancestry lookup state from an existing side-channel chain. + + Returns: + Tuple of (state, current_level), where current_level is the chain length + from the current frame to the top-most side-channel mapping. + """ + chain: t.List[WeakKeyDictionary] = [] + tmp_sc: t.Optional[WeakKeyDictionary] = side_channel.get(_sentinel) # type: ignore[assignment] + while tmp_sc is not None: + chain.append(tmp_sc) + tmp_sc = tmp_sc.get(_sentinel) # type: ignore[assignment] + + state = _CycleState() + for level, ancestor in enumerate(reversed(chain)): + is_top = ancestor.get(_sentinel) is None + for key, pos in ancestor.items(): + if key is _sentinel or not isinstance(key, WeakWrapper): + continue + state.entries.setdefault(key, []).append(_CycleEntry(level=level, pos=pos, is_top=is_top)) + + return state, len(chain) + + +def _compute_step_and_check_cycle(state: _CycleState, wrapper: WeakWrapper, current_level: int) -> int: + """ + Compute the current cycle-detection "step" and raise on circular reference. 
+ + Semantics intentionally match the legacy side-channel chain scan: + * nearest ancestor match wins + * raise when ancestor_pos == distance + * return 0 when no match or when nearest match is the top-most side-channel + """ + entries = state.entries.get(wrapper) + if not entries: + return 0 + + nearest = entries[-1] + distance = current_level - nearest.level + if nearest.pos == distance: + raise ValueError("Circular reference detected") # noqa: TRY003 + + return 0 if nearest.is_top else distance + + +def _push_current_node(state: _CycleState, wrapper: WeakWrapper, current_level: int, pos: int, is_top: bool) -> None: + state.entries.setdefault(wrapper, []).append(_CycleEntry(level=current_level, pos=pos, is_top=is_top)) + + +def _pop_current_node(state: _CycleState, wrapper: WeakWrapper) -> None: + entries = state.entries.get(wrapper) + if not entries: + return + + entries.pop() + if not entries: + del state.entries[wrapper] def _encode( @@ -199,18 +308,19 @@ def _encode( _max_depth: t.Optional[int] = None, ) -> t.Union[t.List[t.Any], t.Tuple[t.Any, ...], t.Any]: """ - Recursive worker that produces `key=value` tokens for a single subtree. + Iterative worker that produces `key=value` tokens for a single subtree. This function returns either: * a list/tuple of tokens (strings) to be appended to the parent list, or * a single token string (when a scalar is reached). - It threads a *side-channel* (a chained `WeakKeyDictionary`) through recursion to detect cycles by remembering where each visited object last appeared. + It uses an internal O(1) cycle-state map to detect cycles while preserving compatibility with legacy direct `_encode` + callers that provide a chained side-channel via `_sentinel`. Args: value: Current subtree value. is_undefined: Whether the current key was absent in the parent mapping. - side_channel: Cycle-detection chain; child frames point to their parent via `_sentinel`. + side_channel: Legacy side-channel seed. 
If pre-seeded via `_sentinel`, it is bootstrapped once into cycle state. prefix: The key path accumulated so far (unencoded except for dot-encoding when requested). comma_round_trip: Whether a single-element list should emit `[]` to ensure round-trip with comma format. comma_compact_nulls: When True (and using comma list format), drop `None` entries before joining. @@ -233,206 +343,17 @@ def _encode( Returns: Either a list/tuple of tokens or a single token string. """ - if _max_depth is None: - _max_depth = _get_max_encode_depth(None) - if _depth > _max_depth: - raise ValueError("Maximum encoding depth exceeded") - - # Establish a starting prefix for the top-most invocation (used when called directly). - if prefix is None: - prefix = "?" if add_query_prefix else "" - - # Infer comma round-trip when using the COMMA generator and the flag was not explicitly provided. - if comma_round_trip is None: - comma_round_trip = generate_array_prefix is ListFormat.COMMA.generator - - # Choose a formatter if one wasn't provided (based on the selected format). - if formatter is None: - formatter = format.formatter - - # Work with the original; we never mutate in place (we build new lists/maps when normalizing). - obj: t.Any = value - - # --- Cycle detection via chained side-channel ----------------------------------------- - obj_wrapper: WeakWrapper = WeakWrapper(value) - tmp_sc: t.Optional[WeakKeyDictionary] = side_channel - step: int = 0 - find_flag: bool = False - - # Walk up the chain looking for `obj_wrapper`. If we see it at the same "step" - # again we've closed a loop. 
- while (tmp_sc := tmp_sc.get(_sentinel)) and not find_flag: # type: ignore [union-attr] - # Where `value` last appeared in the ref tree - pos: t.Optional[int] = tmp_sc.get(obj_wrapper) - step += 1 - if pos is not None: - if pos == step: - raise ValueError("Circular reference detected") - else: - find_flag = True # Break while - if tmp_sc.get(_sentinel) is None: - step = 0 - - # --- Pre-processing: filter & datetime handling --------------------------------------- - filter_opt = filter_ - if callable(filter_opt): - # Callable filter can transform the object for this prefix. - obj = filter_opt(prefix, obj) - else: - # Normalize datetimes both for scalars and (in COMMA mode) list elements. - if isinstance(obj, datetime): - obj = serialize_date(obj) if callable(serialize_date) else obj.isoformat() - elif generate_array_prefix is ListFormat.COMMA.generator and isinstance(obj, (list, tuple)): - if callable(serialize_date): - obj = [serialize_date(x) if isinstance(x, datetime) else x for x in obj] - else: - obj = [x.isoformat() if isinstance(x, datetime) else x for x in obj] - - # --- Null handling -------------------------------------------------------------------- - if not is_undefined and obj is None: - if strict_null_handling: - # Bare key (no '=value') when strict handling is requested. - return encoder(prefix, charset, format) if callable(encoder) and not encode_values_only else prefix - # Otherwise treat `None` as empty string. 
- obj = "" - - # --- Fast path for primitives/bytes --------------------------------------------------- - if Utils.is_non_nullish_primitive(obj, skip_nulls) or isinstance(obj, bytes): - # When a custom encoder is provided, still coerce Python bools to lowercase JSON style - if callable(encoder): - key_value = prefix if encode_values_only else encoder(prefix, charset, format) - if isinstance(obj, bool): - value_part = "true" if obj else "false" - else: - value_part = encoder(obj, charset, format) - return [f"{formatter(key_value)}={formatter(value_part)}"] - # Default fallback (no custom encoder): ensure lowercase boolean literals - if isinstance(obj, bool): - value_str = "true" if obj else "false" - else: - value_str = str(obj) - return [f"{formatter(prefix)}={formatter(value_str)}"] - - values: t.List[t.Any] = [] - - # If the *key itself* was undefined (not present in the parent), there is nothing to emit. - if is_undefined: - return values - - # --- Determine which keys/indices to traverse ---------------------------------------- - comma_effective_length: t.Optional[int] = None - obj_keys: t.List[t.Any] - if generate_array_prefix == ListFormat.COMMA.generator and isinstance(obj, (list, tuple)): - # In COMMA mode we join the elements into a single token at this level. 
- comma_items: t.List[t.Any] = list(obj) - if comma_compact_nulls: - comma_items = [item for item in comma_items if item is not None] - comma_effective_length = len(comma_items) - - if encode_values_only and callable(encoder): - encoded_items = Utils.apply(comma_items, encoder) - obj_keys_value = ",".join(("" if e is None else str(e)) for e in encoded_items) - else: - obj_keys_value = ",".join(Utils.normalize_comma_elem(e) for e in comma_items) - - if comma_items: - obj_keys = [{"value": obj_keys_value if obj_keys_value else None}] - else: - obj_keys = [{"value": UNDEFINED}] - elif ( - filter_opt is not None - and isinstance(filter_opt, ABCSequence) - and not isinstance(filter_opt, (str, bytes, bytearray)) - ): - # Iterable filter restricts traversal to a fixed key/index set. - obj_keys = list(filter_opt) - else: - # Default: enumerate keys/indices from mappings or sequences. - if isinstance(obj, t.Mapping): - keys = list(obj.keys()) - elif isinstance(obj, (list, tuple)): - keys = list(range(len(obj))) - else: - keys = [] - obj_keys = sorted(keys, key=cmp_to_key(sort)) if sort is not None else keys + last_result: t.Union[t.List[t.Any], t.Tuple[t.Any, ...], t.Any, None] = None - # Percent-encode literal dots in key names when requested. - encoded_prefix: str = prefix.replace(".", "%2E") if encode_dot_in_keys else prefix - - # In comma round-trip mode, ensure a single-element list appends `[]` to preserve type on decode. - single_item_for_round_trip: bool = False - if comma_round_trip and isinstance(obj, (list, tuple)): - if generate_array_prefix == ListFormat.COMMA.generator and comma_effective_length is not None: - single_item_for_round_trip = comma_effective_length == 1 - else: - single_item_for_round_trip = len(obj) == 1 - adjusted_prefix: str = f"{encoded_prefix}[]" if single_item_for_round_trip else encoded_prefix - - # Optionally emit empty lists as `key[]=`. 
- if allow_empty_lists and isinstance(obj, (list, tuple)) and not obj: - return [f"{adjusted_prefix}[]"] - - # --- Recurse for each child ----------------------------------------------------------- - for _key in obj_keys: - # Resolve the child value and whether it was "undefined" at this level. - _value: t.Any - _value_undefined: bool - if isinstance(_key, t.Mapping) and "value" in _key and not isinstance(_key.get("value"), Undefined): - _value = _key.get("value") - _value_undefined = False - else: - try: - if isinstance(obj, t.Mapping): - _value = obj.get(_key) - _value_undefined = _key not in obj - elif isinstance(obj, (list, tuple)): - if isinstance(_key, int): - _value = obj[_key] - _value_undefined = False - else: - _value = None - _value_undefined = True - else: - _value = obj[_key] - _value_undefined = False - except Exception: # pylint: disable=W0718 - _value = None - _value_undefined = True - - # Optionally drop null children. - if skip_nulls and _value is None: - continue - - # When using dotted paths and also encoding dots in keys, percent-escape '.' inside key names. - encoded_key: str = str(_key).replace(".", "%2E") if allow_dots and encode_dot_in_keys else str(_key) - - # Build the child key path depending on whether we're traversing a list or a mapping. - key_prefix: str = ( - generate_array_prefix(adjusted_prefix, encoded_key) - if isinstance(obj, (list, tuple)) - else f"{adjusted_prefix}{f'.{encoded_key}' if allow_dots else f'[{encoded_key}]'}" - ) - - # Update side-channel for the child call and thread the parent channel via `_sentinel`. - side_channel[obj_wrapper] = step - value_side_channel: WeakKeyDictionary = WeakKeyDictionary() - value_side_channel[_sentinel] = side_channel - - # Recurse into the child. 
- encoded: t.Union[t.List[t.Any], t.Tuple[t.Any, ...], t.Any] = _encode( - value=_value, - is_undefined=_value_undefined, - side_channel=value_side_channel, - prefix=key_prefix, + stack: t.List[_EncodeFrame] = [ + _EncodeFrame( + value=value, + is_undefined=is_undefined, + side_channel=side_channel, + prefix=prefix, comma_round_trip=comma_round_trip, comma_compact_nulls=comma_compact_nulls, - encoder=( - None - if generate_array_prefix is ListFormat.COMMA.generator - and encode_values_only - and isinstance(obj, (list, tuple)) - else encoder - ), + encoder=encoder, serialize_date=serialize_date, sort=sort, filter_=filter_, @@ -446,14 +367,273 @@ def _encode( allow_dots=allow_dots, encode_values_only=encode_values_only, charset=charset, - _depth=_depth + 1, - _max_depth=_max_depth, + add_query_prefix=add_query_prefix, + depth=_depth, + max_depth=_max_depth, ) + ] + + while stack: + frame = stack[-1] + + if frame.phase == "start": + if frame.max_depth is None: + frame.max_depth = _get_max_encode_depth(None) + if frame.depth > frame.max_depth: + raise ValueError(MAX_ENCODING_DEPTH_EXCEEDED) + + if frame.prefix is None: + frame.prefix = "?" if frame.add_query_prefix else "" + if frame.comma_round_trip is None: + frame.comma_round_trip = frame.generate_array_prefix is ListFormat.COMMA.generator + if frame.formatter is None: + frame.formatter = frame.format.formatter + + # Work with the original; we never mutate in place (we build new lists/maps when normalizing). + obj: t.Any = frame.value + + # --- Pre-processing: filter & datetime handling ------------------------------- + filter_opt = frame.filter_ + if callable(filter_opt): + # Callable filter can transform the object for this prefix. + obj = filter_opt(frame.prefix, obj) + else: + # Normalize datetimes both for scalars and (in COMMA mode) list elements. 
+ if isinstance(obj, datetime): + obj = frame.serialize_date(obj) if callable(frame.serialize_date) else obj.isoformat() + elif frame.generate_array_prefix is ListFormat.COMMA.generator and isinstance(obj, (list, tuple)): + if callable(frame.serialize_date): + obj = [frame.serialize_date(x) if isinstance(x, datetime) else x for x in obj] + else: + obj = [x.isoformat() if isinstance(x, datetime) else x for x in obj] + + # --- Null handling ------------------------------------------------------------ + if not frame.is_undefined and obj is None: + if frame.strict_null_handling: + # Bare key (no '=value') when strict handling is requested. + result_token = ( + frame.encoder(frame.prefix, frame.charset, frame.format) + if callable(frame.encoder) and not frame.encode_values_only + else frame.prefix + ) + stack.pop() + last_result = result_token + continue + # Otherwise treat `None` as empty string. + obj = "" + + # --- Fast path for primitives/bytes ----------------------------------------- + if Utils.is_non_nullish_primitive(obj, frame.skip_nulls) or isinstance(obj, bytes): + # When a custom encoder is provided, still coerce Python bools to lowercase JSON style. + if callable(frame.encoder): + key_value = ( + frame.prefix + if frame.encode_values_only + else frame.encoder(frame.prefix, frame.charset, frame.format) + ) + if isinstance(obj, bool): + value_part = "true" if obj else "false" + else: + value_part = frame.encoder(obj, frame.charset, frame.format) + result_tokens = [f"{frame.formatter(key_value)}={frame.formatter(value_part)}"] + else: + # Default fallback (no custom encoder): ensure lowercase boolean literals. 
+ if isinstance(obj, bool): + value_str = "true" if obj else "false" + else: + value_str = str(obj) + result_tokens = [f"{frame.formatter(frame.prefix)}={frame.formatter(value_str)}"] + + stack.pop() + last_result = result_tokens + continue + + frame.obj = obj + frame.values = [] + + # If the *key itself* was undefined (not present in the parent), there is nothing to emit. + if frame.is_undefined: + stack.pop() + last_result = frame.values + continue + + # --- Cycle detection via ancestry lookup state -------------------------------- + # Only needed for traversable containers; primitive/bytes values return via fast path above. + obj_wrapper: WeakWrapper = WeakWrapper(obj) + if frame.cycle_state is None or frame.cycle_level is None: + frame.cycle_state, frame.cycle_level = _bootstrap_cycle_state_from_side_channel(frame.side_channel) + step = _compute_step_and_check_cycle(frame.cycle_state, obj_wrapper, frame.cycle_level) + + frame.obj_wrapper = obj_wrapper + frame.step = step + + # --- Determine which keys/indices to traverse ------------------------------- + comma_effective_length: t.Optional[int] = None + if frame.generate_array_prefix is ListFormat.COMMA.generator and isinstance(obj, (list, tuple)): + # In COMMA mode we join the elements into a single token at this level. 
+ comma_items: t.List[t.Any] = list(obj) + if frame.comma_compact_nulls: + comma_items = [item for item in comma_items if item is not None] + comma_effective_length = len(comma_items) + + if frame.encode_values_only and callable(frame.encoder): + encoded_items = Utils.apply(comma_items, frame.encoder) + obj_keys_value = ",".join(("" if e is None else str(e)) for e in encoded_items) + else: + obj_keys_value = ",".join(Utils.normalize_comma_elem(e) for e in comma_items) + + if comma_items: + frame.obj_keys = [{"value": obj_keys_value if obj_keys_value else None}] + else: + frame.obj_keys = [{"value": UNDEFINED}] + elif ( + filter_opt is not None + and isinstance(filter_opt, ABCSequence) + and not isinstance(filter_opt, (str, bytes, bytearray)) + ): + # Iterable filter restricts traversal to a fixed key/index set. + frame.obj_keys = list(filter_opt) + else: + # Default: enumerate keys/indices from mappings or sequences. + if isinstance(obj, t.Mapping): + keys = list(obj.keys()) + elif isinstance(obj, (list, tuple)): + keys = list(range(len(obj))) + else: + keys = [] + frame.obj_keys = sorted(keys, key=cmp_to_key(frame.sort)) if frame.sort is not None else keys + + # Percent-encode literal dots in key names when requested. + encoded_prefix: str = frame.prefix.replace(".", "%2E") if frame.encode_dot_in_keys else frame.prefix + + # In comma round-trip mode, ensure a single-element list appends `[]` to preserve type on decode. + single_item_for_round_trip: bool = False + if frame.comma_round_trip and isinstance(obj, (list, tuple)): + if frame.generate_array_prefix is ListFormat.COMMA.generator and comma_effective_length is not None: + single_item_for_round_trip = comma_effective_length == 1 + else: + single_item_for_round_trip = len(obj) == 1 + + frame.adjusted_prefix = f"{encoded_prefix}[]" if single_item_for_round_trip else encoded_prefix + + # Optionally emit empty lists as `key[]=`. 
+ if frame.allow_empty_lists and isinstance(obj, (list, tuple)) and not obj: + stack.pop() + last_result = [f"{frame.adjusted_prefix}[]"] + continue + + frame.index = 0 + frame.phase = "iterate" + continue + + if frame.phase == "iterate": + if frame.index >= len(frame.obj_keys): + if frame.cycle_pushed and frame.obj_wrapper is not None and frame.cycle_state is not None: + _pop_current_node(frame.cycle_state, frame.obj_wrapper) + frame.cycle_pushed = False + stack.pop() + last_result = frame.values + continue + + if not frame.cycle_pushed and frame.obj_wrapper is not None and frame.cycle_state is not None: + _push_current_node( + frame.cycle_state, + frame.obj_wrapper, + frame.cycle_level if frame.cycle_level is not None else 0, + frame.step, + (frame.cycle_level == 0), + ) + frame.side_channel[frame.obj_wrapper] = frame.step + frame.cycle_pushed = True + + _key = frame.obj_keys[frame.index] + frame.index += 1 + + # Resolve the child value and whether it was "undefined" at this level. + _value: t.Any + _value_undefined: bool + if isinstance(_key, t.Mapping) and "value" in _key and not isinstance(_key.get("value"), Undefined): + _value = _key.get("value") + _value_undefined = False + else: + try: + if isinstance(frame.obj, t.Mapping): + _value = frame.obj.get(_key) + _value_undefined = _key not in frame.obj + elif isinstance(frame.obj, (list, tuple)): + if isinstance(_key, int): + _value = frame.obj[_key] + _value_undefined = False + else: + _value = None + _value_undefined = True + else: + _value = frame.obj[_key] + _value_undefined = False + except Exception: # noqa: BLE001 # pylint: disable=W0718 + # User-provided __getitem__/mapping accessors may raise arbitrary exceptions. + _value = None + _value_undefined = True + + # Optionally drop null children. + if frame.skip_nulls and _value is None: + continue + + # When using dotted paths and also encoding dots in keys, percent-escape '.' inside key names. 
+ encoded_key: str = ( + str(_key).replace(".", "%2E") if frame.allow_dots and frame.encode_dot_in_keys else str(_key) + ) + + # Build the child key path depending on whether we're traversing a list or a mapping. + key_prefix: str = ( + frame.generate_array_prefix(frame.adjusted_prefix, encoded_key) + if isinstance(frame.obj, (list, tuple)) + else f"{frame.adjusted_prefix}{f'.{encoded_key}' if frame.allow_dots else f'[{encoded_key}]'}" + ) + + frame.phase = "await_child" + stack.append( + _EncodeFrame( + value=_value, + is_undefined=_value_undefined, + side_channel=frame.side_channel, + prefix=key_prefix, + comma_round_trip=frame.comma_round_trip, + comma_compact_nulls=frame.comma_compact_nulls, + encoder=( + None + if frame.generate_array_prefix is ListFormat.COMMA.generator + and frame.encode_values_only + and isinstance(frame.obj, (list, tuple)) + else frame.encoder + ), + serialize_date=frame.serialize_date, + sort=frame.sort, + filter_=frame.filter_, + formatter=frame.formatter, + format=frame.format, + generate_array_prefix=frame.generate_array_prefix, + allow_empty_lists=frame.allow_empty_lists, + strict_null_handling=frame.strict_null_handling, + skip_nulls=frame.skip_nulls, + encode_dot_in_keys=frame.encode_dot_in_keys, + allow_dots=frame.allow_dots, + encode_values_only=frame.encode_values_only, + charset=frame.charset, + add_query_prefix=False, + depth=frame.depth + 1, + max_depth=frame.max_depth, + cycle_state=frame.cycle_state, + cycle_level=(frame.cycle_level + 1) if frame.cycle_level is not None else None, + ) + ) + continue - # Flatten nested results into the `values` list. 
- if isinstance(encoded, (list, tuple)): - values.extend(encoded) + # frame.phase == "await_child" + if isinstance(last_result, (list, tuple)): + frame.values.extend(last_result) else: - values.append(encoded) + frame.values.append(last_result) + frame.phase = "iterate" - return values + return [] if last_result is None else last_result diff --git a/src/qs_codec/models/encode_options.py b/src/qs_codec/models/encode_options.py index db9ef4b..0c2a0c1 100644 --- a/src/qs_codec/models/encode_options.py +++ b/src/qs_codec/models/encode_options.py @@ -120,8 +120,8 @@ def encoder(self, value: t.Optional[t.Callable[[t.Any, t.Optional[Charset], t.Op max_depth: t.Optional[int] = None """Maximum nesting depth allowed during encoding. - When ``None``, the encoder derives a safe limit from the interpreter recursion limit (minus a safety margin). - When set, the effective limit is capped to the current recursion limit to avoid ``RecursionError``. + When ``None``, encoding depth is unbounded by this option. + When set, the value is enforced directly. 
""" def __post_init__(self) -> None: diff --git a/src/qs_codec/utils/utils.py b/src/qs_codec/utils/utils.py index 1b829dd..1bd6bc2 100644 --- a/src/qs_codec/utils/utils.py +++ b/src/qs_codec/utils/utils.py @@ -19,6 +19,7 @@ import typing as t from collections import deque +from dataclasses import dataclass, field from datetime import datetime, timedelta from decimal import Decimal from enum import Enum @@ -44,6 +45,25 @@ def _numeric_key_pairs(mapping: t.Mapping[t.Any, t.Any]) -> t.List[t.Tuple[int, return pairs +@dataclass +class _MergeFrame: + target: t.Any + source: t.Any + options: DecodeOptions + phase: str = "start" + merge_target: t.Optional[t.MutableMapping[t.Any, t.Any]] = None + merge_existing_keys: t.Set[t.Any] = field(default_factory=set) + pending_updates: t.Dict[t.Any, t.Any] = field(default_factory=dict) + source_items: t.List[t.Tuple[t.Any, t.Any]] = field(default_factory=list) + entry_index: int = 0 + pending_key: t.Any = None + list_target: t.Dict[int, t.Any] = field(default_factory=dict) + list_source: t.Dict[int, t.Any] = field(default_factory=dict) + list_max_len: int = 0 + list_index: int = 0 + list_merged: t.List[t.Any] = field(default_factory=list) + + class Utils: """ Namespace container for stateless utility routines. @@ -96,148 +116,245 @@ def merge( The merged structure. May be the original `target` object when `source` is ``None``. """ - if source is None: - # Nothing to merge — keep the original target as‑is. - return target - - if options is None: - # Use default decode options when none are provided. - options = DecodeOptions() - - if not isinstance(source, t.Mapping): - # Fast‑path: merging a non‑mapping (list/tuple/scalar) into target. - if isinstance(target, (list, tuple)): - # If the target sequence contains `Undefined`, we may need to promote it - # to a dict keyed by string indices for stable writes. - if any(isinstance(el, Undefined) for el in target): - # Create an index → value view so we can overwrite by position. 
- target_: t.Dict[int, t.Any] = dict(enumerate(target)) - - if isinstance(source, (list, tuple)): - for i, item in enumerate(source): - if not isinstance(item, Undefined): - target_[i] = item - else: - target_[len(target_)] = source + opts = options if options is not None else DecodeOptions() + last_result: t.Any = None - # When list parsing is disabled, collapse to a string‑keyed dict and drop sentinels. - if not options.parse_lists and any(isinstance(value, Undefined) for value in target_.values()): - target = {str(i): target_[i] for i in target_ if not isinstance(target_[i], Undefined)} - else: - target = [el for el in target_.values() if not isinstance(el, Undefined)] - else: - if isinstance(source, (list, tuple)): - if all(isinstance(el, (t.Mapping, Undefined)) for el in target) and all( - isinstance(el, (t.Mapping, Undefined)) for el in source - ): - target_dict: t.Dict[int, t.Any] = dict(enumerate(target)) - source_dict: t.Dict[int, t.Any] = dict(enumerate(source)) - max_len = max(len(target_dict), len(source_dict)) - merged_list: t.List[t.Any] = [] - for i in range(max_len): - has_t = i in target_dict - has_s = i in source_dict - if has_t and has_s: - merged_list.append(Utils.merge(target_dict[i], source_dict[i], options)) - elif has_t: - tv = target_dict[i] - if not isinstance(tv, Undefined): - merged_list.append(tv) - elif has_s: - sv = source_dict[i] - if not isinstance(sv, Undefined): - merged_list.append(sv) - target = merged_list - else: - # Tuples are immutable; work with a list when mutating. - if isinstance(target, tuple): - target = list(target) - target.extend(el for el in source if not isinstance(el, Undefined)) - elif source is not None: - # Tuples are immutable; work with a list when mutating. 
- if isinstance(target, tuple): - target = list(target) - target.append(source) - elif isinstance(target, t.Mapping): - if Utils.is_overflow(target): - return Utils.combine(target, source, options) - - # Target is a mapping but source is a sequence — coerce indices to string keys. - if isinstance(source, (list, tuple)): - _new = dict(target) - for i, item in enumerate(source): - if not isinstance(item, Undefined): - _new[str(i)] = item - target = _new - elif source is not None: - if not isinstance(target, (list, tuple)) and isinstance(source, (list, tuple)): - return [target, *(el for el in source if not isinstance(el, Undefined))] - return [target, source] - - return target - - # Source is a mapping but target is not — coerce target to a mapping or - # concatenate as a list, then proceed. - if target is None or not isinstance(target, t.Mapping): - if isinstance(target, (list, tuple)): - return { - **{str(i): item for i, item in enumerate(target) if not isinstance(item, Undefined)}, - **source, - } - - if Utils.is_overflow(source): - source_of = t.cast(OverflowDict, source) - sorted_pairs = sorted(_numeric_key_pairs(source_of), key=lambda item: item[0]) - numeric_keys = {key for _, key in sorted_pairs} - result = OverflowDict() - offset = 0 - if not isinstance(target, Undefined): - result["0"] = target - offset = 1 - for numeric_key, key in sorted_pairs: - val = source_of[key] - if not isinstance(val, Undefined): - # Offset ensures target occupies index "0"; source indices shift up by 1 - result[str(numeric_key + offset)] = val - for key, val in source_of.items(): - if key in numeric_keys: + stack: t.List[_MergeFrame] = [_MergeFrame(target=target, source=source, options=opts)] + + while stack: + frame = stack[-1] + + if frame.phase == "start": + current_target = frame.target + current_source = frame.source + + if current_source is None: + stack.pop() + last_result = current_target + continue + + if not isinstance(current_source, t.Mapping): + # Fast-path: merging a 
non-mapping (list/tuple/scalar) into target. + if isinstance(current_target, (list, tuple)): + # If the target sequence contains `Undefined`, we may need to promote it + # to a dict keyed by indices for stable writes. + if any(isinstance(el, Undefined) for el in current_target): + target_: t.Dict[int, t.Any] = dict(enumerate(current_target)) + + if isinstance(current_source, (list, tuple)): + for i, item in enumerate(current_source): + if not isinstance(item, Undefined): + target_[i] = item + else: + target_[len(target_)] = current_source + + # When list parsing is disabled, collapse to a string-keyed dict and drop sentinels. + if not frame.options.parse_lists and any( + isinstance(value, Undefined) for value in target_.values() + ): + result: t.Any = { + str(i): target_[i] for i in target_ if not isinstance(target_[i], Undefined) + } + else: + result = [el for el in target_.values() if not isinstance(el, Undefined)] + stack.pop() + last_result = result + continue + + if isinstance(current_source, (list, tuple)): + if all(isinstance(el, (t.Mapping, Undefined)) for el in current_target) and all( + isinstance(el, (t.Mapping, Undefined)) for el in current_source + ): + frame.list_target = dict(enumerate(current_target)) + frame.list_source = dict(enumerate(current_source)) + frame.list_max_len = max(len(frame.list_target), len(frame.list_source)) + frame.list_index = 0 + frame.list_merged = [] + frame.phase = "list_iter" + continue + + mutable_target = ( + list(current_target) if isinstance(current_target, tuple) else current_target + ) + # Mutates in-place by design for list targets to preserve merge performance. 
+ mutable_target.extend(el for el in current_source if not isinstance(el, Undefined)) + stack.pop() + last_result = mutable_target + continue + + mutable_target = list(current_target) if isinstance(current_target, tuple) else current_target + mutable_target.append(current_source) + stack.pop() + last_result = mutable_target continue - if not isinstance(val, Undefined): - result[key] = val - return result - - _res: t.List[t.Any] = [] - _iter1 = target if isinstance(target, (list, tuple)) else [target] - for _el in _iter1: - if not isinstance(_el, Undefined): - _res.append(_el) - _iter2 = [source] - for _el in _iter2: - if not isinstance(_el, Undefined): - _res.append(_el) - return _res - - # Prepare a mutable target we can merge into; reuse dict targets for performance. - merge_target: t.Dict[str, t.Any] - if isinstance(target, dict): - merge_target = target - else: - merge_target = dict(target) - - # For overlapping keys, merge recursively; otherwise, take the new value. - merged_updates: t.Dict[t.Any, t.Any] = {} - # Prefer exact key matches; fall back to string normalization only when needed. - for key, value in source.items(): - normalized_key = str(key) - if key in merge_target: - merged_updates[key] = Utils.merge(merge_target[key], value, options) - elif normalized_key in merge_target: - merged_updates[normalized_key] = Utils.merge(merge_target[normalized_key], value, options) - else: - merged_updates[key] = value - if merged_updates: - merge_target.update(merged_updates) - return merge_target + + if isinstance(current_target, t.Mapping): + if Utils.is_overflow(current_target): + stack.pop() + last_result = Utils.combine(current_target, current_source, frame.options) + continue + + # Target is a mapping but source is a sequence — coerce indices to string keys. 
+ if isinstance(current_source, (list, tuple)): + new_target = dict(current_target) + for i, item in enumerate(current_source): + if not isinstance(item, Undefined): + new_target[str(i)] = item + stack.pop() + last_result = new_target + continue + + stack.pop() + last_result = current_target + continue + + if not isinstance(current_target, (list, tuple)) and isinstance(current_source, (list, tuple)): + stack.pop() + last_result = [ + current_target, + *(el for el in current_source if not isinstance(el, Undefined)), + ] + continue + + stack.pop() + last_result = [current_target, current_source] + continue + + # Source is a mapping but target is not — coerce target to a mapping or + # concatenate as a list, then proceed. + if current_target is None or not isinstance(current_target, t.Mapping): + if isinstance(current_target, (list, tuple)): + stack.pop() + last_result = { + **{ + str(i): item for i, item in enumerate(current_target) if not isinstance(item, Undefined) + }, + **current_source, + } + continue + + if Utils.is_overflow(current_source): + source_of = t.cast(OverflowDict, current_source) + sorted_pairs = sorted(_numeric_key_pairs(source_of), key=lambda item: item[0]) + numeric_keys = {key for _, key in sorted_pairs} + result = OverflowDict() + offset = 0 + if not isinstance(current_target, Undefined): + result["0"] = current_target + offset = 1 + for numeric_key, key in sorted_pairs: + val = source_of[key] + if not isinstance(val, Undefined): + # Offset ensures target occupies index "0"; source indices shift up by 1. 
+ result[str(numeric_key + offset)] = val + for key, val in source_of.items(): + if key in numeric_keys: + continue + if not isinstance(val, Undefined): + result[key] = val + stack.pop() + last_result = result + continue + + result_list: t.List[t.Any] = [] + for element in (current_target,): + if not isinstance(element, Undefined): + result_list.append(element) + for element in (current_source,): + if not isinstance(element, Undefined): + result_list.append(element) + stack.pop() + last_result = result_list + continue + + # Prepare a mutable target we can merge into; reuse dict targets for performance. + frame.merge_target = current_target if isinstance(current_target, dict) else dict(current_target) + frame.merge_existing_keys = set(frame.merge_target.keys()) + frame.pending_updates = {} + frame.source_items = list(current_source.items()) + frame.entry_index = 0 + frame.pending_key = None + frame.phase = "map_iter" + continue + + if frame.phase == "map_iter": + merge_target = frame.merge_target + if merge_target is None: # pragma: no cover - internal invariant + raise RuntimeError("merge target is not initialized") # noqa: TRY003 + + if frame.entry_index >= len(frame.source_items): + if frame.pending_updates: + merge_target.update(frame.pending_updates) + stack.pop() + last_result = merge_target + continue + + key, value = frame.source_items[frame.entry_index] + frame.entry_index += 1 + normalized_key = str(key) + + if key in frame.merge_existing_keys: + frame.pending_key = key + frame.phase = "map_wait_child" + stack.append(_MergeFrame(target=merge_target[key], source=value, options=frame.options)) + continue + if normalized_key in frame.merge_existing_keys: + frame.pending_key = normalized_key + frame.phase = "map_wait_child" + stack.append(_MergeFrame(target=merge_target[normalized_key], source=value, options=frame.options)) + continue + + frame.pending_updates[key] = value + continue + + if frame.phase == "map_wait_child": + merge_target = frame.merge_target + 
if merge_target is None: # pragma: no cover - internal invariant + raise RuntimeError("merge target is not initialized") # noqa: TRY003 + frame.pending_updates[frame.pending_key] = last_result + frame.pending_key = None + frame.phase = "map_iter" + continue + + if frame.phase == "list_iter": + if frame.list_index >= frame.list_max_len: + stack.pop() + last_result = frame.list_merged + continue + + idx = frame.list_index + frame.list_index += 1 + has_target = idx in frame.list_target + has_source = idx in frame.list_source + + if has_target and has_source: + target_value = frame.list_target[idx] + source_value = frame.list_source[idx] + + if isinstance(source_value, Undefined): + if not isinstance(target_value, Undefined): + frame.list_merged.append(target_value) + continue + + frame.phase = "list_wait_child" + stack.append(_MergeFrame(target=target_value, source=source_value, options=frame.options)) + continue + + if has_target: + target_value = frame.list_target[idx] + if not isinstance(target_value, Undefined): + frame.list_merged.append(target_value) + elif has_source: + source_value = frame.list_source[idx] + if not isinstance(source_value, Undefined): + frame.list_merged.append(source_value) + continue + + # frame.phase == "list_wait_child" + frame.list_merged.append(last_result) + frame.phase = "list_iter" + + return last_result @staticmethod def compact(root: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: diff --git a/tests/unit/decode_test.py b/tests/unit/decode_test.py index 8156e45..cf9521e 100644 --- a/tests/unit/decode_test.py +++ b/tests/unit/decode_test.py @@ -929,6 +929,30 @@ def test_does_not_crash_when_parsing_deep_dicts(self) -> None: assert actual_depth == depth + def test_does_not_crash_when_merging_very_deep_keys(self) -> None: + # Single high-depth canary for the deep conflicting merge path. 
+ depth = 12_000 + + path = "a" + ("[p]" * depth) + query = f"{path}[left]=1&{path}[right]=2" + + parsed: t.Optional[t.Mapping[str, t.Any]] + + with does_not_raise(): + parsed = decode(query, DecodeOptions(depth=depth + 2, parameter_limit=10_000_000)) + + assert parsed is not None + assert "a" in parsed + + current = parsed["a"] + for _ in range(depth): + assert isinstance(current, dict) + current = current["p"] + + assert isinstance(current, dict) + assert current["left"] == "1" + assert current["right"] == "2" + def test_parses_null_dicts_correctly(self) -> None: a: t.Dict[str, t.Any] = {"b": "c"} assert decode(a) == {"b": "c"} diff --git a/tests/unit/encode_test.py b/tests/unit/encode_test.py index f42526a..b1115d0 100644 --- a/tests/unit/encode_test.py +++ b/tests/unit/encode_test.py @@ -11,7 +11,7 @@ import pytest from qs_codec import Charset, EncodeOptions, Format, ListFormat, dumps, encode -from qs_codec.encode import _encode, _sentinel +from qs_codec.encode import _CycleState, _encode, _pop_current_node, _sentinel from qs_codec.models.undefined import Undefined from qs_codec.models.weak_wrapper import WeakWrapper from qs_codec.utils.encode_utils import EncodeUtils @@ -867,23 +867,33 @@ def test_encode_depth_guard_prevents_recursion_errors(self) -> None: with pytest.raises(ValueError, match="Maximum encoding depth exceeded"): encode(data, options=EncodeOptions(max_depth=3)) - def test_encode_depth_guard_caps_to_recursion_limit(self, monkeypatch: pytest.MonkeyPatch) -> None: - import importlib - - encode_module = importlib.import_module("qs_codec.encode") - - limit = encode_module._DEPTH_MARGIN + 3 - monkeypatch.setattr(encode_module.sys, "getrecursionlimit", lambda: limit) - + def test_encode_depth_guard_does_not_cap_to_recursion_limit(self) -> None: + # `_get_max_encode_depth` now uses `sys.maxsize` for None and explicit values directly, + # so monkeypatching `sys.getrecursionlimit` is intentionally unnecessary here. 
data: t.Dict[str, t.Any] = {} current = data for _ in range(5): nxt: t.Dict[str, t.Any] = {} current["a"] = nxt current = nxt + current["leaf"] = "x" - with pytest.raises(ValueError, match="Maximum encoding depth exceeded"): - encode(data, options=EncodeOptions(max_depth=10_000)) + with does_not_raise(): + result = encode(data, options=EncodeOptions(max_depth=10_000, encode=False)) + + assert result.endswith("=x") + + def test_encode_deep_nesting_iterative_stack_safety(self) -> None: + # Keep this above common recursion limits so recursion regressions still fail quickly. + depth = 12_000 + data: t.Dict[str, t.Any] = {"leaf": "x"} + for _ in range(depth): + data = {"a": data} + + with does_not_raise(): + result = encode(data, options=EncodeOptions(encode=False)) + + assert result.endswith("=x") @pytest.mark.parametrize( "data, options, expected", @@ -1853,6 +1863,93 @@ def test_encode_cycle_detection_marks_prior_visit_without_raising(self) -> None: assert tokens == ["root%5Bchild%5D=value"] + def test_encode_cycle_state_prefers_nearest_ancestor_mapping(self) -> None: + value: t.Dict[str, t.Any] = {"child": "value"} + wrapper = WeakWrapper(value) + + top_channel: WeakKeyDictionary = WeakKeyDictionary() + top_channel[wrapper] = 2 + + mid_channel: WeakKeyDictionary = WeakKeyDictionary() + mid_channel[_sentinel] = top_channel + mid_channel[wrapper] = 99 + + side_channel: WeakKeyDictionary = WeakKeyDictionary() + side_channel[_sentinel] = mid_channel + + tokens = _encode( + value=value, + is_undefined=False, + side_channel=side_channel, + prefix="root", + comma_round_trip=False, + comma_compact_nulls=False, + encoder=EncodeUtils.encode, + serialize_date=EncodeUtils.serialize_date, + sort=None, + filter_=None, + formatter=Format.RFC3986.formatter, + format=Format.RFC3986, + generate_array_prefix=ListFormat.INDICES.generator, + allow_empty_lists=False, + strict_null_handling=False, + skip_nulls=False, + encode_dot_in_keys=False, + allow_dots=False, + 
encode_values_only=False, + charset=Charset.UTF8, + ) + + assert tokens == ["root%5Bchild%5D=value"] + + def test_encode_cycle_state_bootstrap_matches_legacy_side_channel_behavior(self) -> None: + value: t.Dict[str, t.Any] = {"child": "value"} + wrapper = WeakWrapper(value) + + top_channel: WeakKeyDictionary = WeakKeyDictionary() + top_channel[wrapper] = 99 + + parent_channel: WeakKeyDictionary = WeakKeyDictionary() + parent_channel[_sentinel] = top_channel + + side_channel: WeakKeyDictionary = WeakKeyDictionary() + side_channel[_sentinel] = parent_channel + + tokens = _encode( + value=value, + is_undefined=False, + side_channel=side_channel, + prefix="root", + comma_round_trip=False, + comma_compact_nulls=False, + encoder=EncodeUtils.encode, + serialize_date=EncodeUtils.serialize_date, + sort=None, + filter_=None, + formatter=Format.RFC3986.formatter, + format=Format.RFC3986, + generate_array_prefix=ListFormat.INDICES.generator, + allow_empty_lists=False, + strict_null_handling=False, + skip_nulls=False, + encode_dot_in_keys=False, + allow_dots=False, + encode_values_only=False, + charset=Charset.UTF8, + ) + + assert tokens == ["root%5Bchild%5D=value"] + + def test_pop_current_node_noop_when_wrapper_not_present(self) -> None: + value: t.Dict[str, t.Any] = {"child": "value"} + wrapper = WeakWrapper(value) + state = _CycleState() + + with does_not_raise(): + _pop_current_node(state, wrapper) + + assert state.entries == {} + def test_encode_handles_iterable_filter_for_indexable_object(self, monkeypatch: pytest.MonkeyPatch) -> None: class Indexable: def __getitem__(self, key: str) -> str: diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py index bb02e73..9db1352 100644 --- a/tests/unit/utils_test.py +++ b/tests/unit/utils_test.py @@ -594,6 +594,52 @@ def test_merges_array_into_object(self) -> None: {"foo": ["xyzzy"]}, ) == {"foo": {"bar": "baz", "0": "xyzzy"}} + def test_merge_mapping_target_with_scalar_source_returns_target_unchanged(self) -> None: + 
target = {"a": "b"} + source = "scalar" + + result = Utils.merge(target, source) # type: ignore[arg-type] + + assert result == {"a": "b"} + assert result is target + + def test_merge_structured_lists_prefers_source_when_target_slot_is_undefined(self) -> None: + options = DecodeOptions() + target = [Undefined()] + source = [{"from_source": 1}] + + result = Utils.merge(target, source, options) + + assert result == [{"from_source": 1}] + + def test_merge_deep_maps_without_stack_overflow(self) -> None: + # Keep this above common recursion limits so recursion regressions still fail quickly. + depth = 12_000 + + left: t.Dict[str, t.Any] = {} + cursor = left + for _ in range(depth): + nxt: t.Dict[str, t.Any] = {} + cursor["a"] = nxt + cursor = nxt + + right: t.Dict[str, t.Any] = {} + right_cursor = right + for _ in range(depth): + nxt = {} + right_cursor["a"] = nxt + right_cursor = nxt + right_cursor["leaf"] = "x" + + merged = Utils.merge(left, right) + walk = merged + for _ in range(depth): + assert isinstance(walk, dict) + walk = walk["a"] + + assert isinstance(walk, dict) + assert walk["leaf"] == "x" + def test_combine_both_arrays(self) -> None: a: t.List[int] = [1] b: t.List[int] = [2] @@ -950,6 +996,24 @@ def test_merge_prefers_exact_key_match_before_string_normalization(self) -> None assert result == {"1": {"a": "x", "b": "y"}} assert 1 not in result + def test_merge_does_not_match_source_keys_inserted_earlier_in_same_map_pass(self) -> None: + target = {"3": ""} + source = {"1": {"a": 1}, 1: ("x",)} + + result = Utils.merge(target, source) # type: ignore[arg-type] + + assert result == {"3": "", "1": {"a": 1}, 1: ("x",)} + assert "1" in result + assert 1 in result + + def test_merge_normalized_collision_with_existing_key_uses_existing_target_slot(self) -> None: + target = {"1": {"z": 0}} + source = {"1": {"a": 1}, 1: ("x",)} + + result = Utils.merge(target, source) # type: ignore[arg-type] + + assert result == {"1": {"z": 0, "a": 1, "0": "x"}} + def 
test_overflow_dict_copy_preserves_type(self) -> None: target = OverflowDict({"0": "a"}) result = target.copy()