diff --git a/scripts/bench_decode_snapshot.py b/scripts/bench_decode_snapshot.py
new file mode 100644
index 0000000..4b4f2fd
--- /dev/null
+++ b/scripts/bench_decode_snapshot.py
@@ -0,0 +1,146 @@
+#!/usr/bin/env python3
+"""Local decode benchmark snapshot (not used by CI).
+
+Usage:
+    python scripts/bench_decode_snapshot.py
+    python scripts/bench_decode_snapshot.py --warmups 3 --samples 5
+"""
+
+from __future__ import annotations
+
+import argparse
+import statistics
+import sys
+import time
+from dataclasses import dataclass
+from pathlib import Path
+
+
+ROOT = Path(__file__).resolve().parents[1]
+sys.path.insert(0, str(ROOT / "src"))
+
+from qs_codec import DecodeOptions, decode
+
+
+@dataclass(frozen=True)
+class DecodeCase:
+    """Parameters describing one decode benchmark scenario."""
+
+    name: str  # label printed in the report
+    count: int  # number of generated key=value pairs
+    comma: bool  # enable the comma option; every tenth value becomes "a,b,c"
+    utf8_sentinel: bool  # prepend utf8=%E2%9C%93 and enable charset_sentinel
+    value_len: int  # length of each generated value
+    iterations: int  # decode calls per timed sample
+
+
+CASES = (
+    DecodeCase(name="C1", count=100, comma=False, utf8_sentinel=False, value_len=8, iterations=120),
+    DecodeCase(name="C2", count=1000, comma=False, utf8_sentinel=False, value_len=40, iterations=16),
+    DecodeCase(name="C3", count=1000, comma=True, utf8_sentinel=True, value_len=40, iterations=16),
+)
+
+
+def make_value(length: int, seed: int) -> str:
+    """Return a deterministic pseudo-random alphanumeric string of ``length`` characters.
+
+    A 32-bit xorshift-style state (shifts 13/17/5) is derived from ``seed``;
+    after each step ``state % 62`` selects one character from ``0-9A-Za-z``.
+    """
+    mask = 0xFFFFFFFF
+    alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
+    state = (seed * 2654435761 + 1013904223) & mask
+    chars: list[str] = []
+    for _ in range(length):
+        state ^= (state << 13) & mask
+        state ^= (state >> 17) & mask
+        state ^= (state << 5) & mask
+        chars.append(alphabet[state % 62])
+    return "".join(chars)
+
+
+def build_query(count: int, comma_lists: bool, utf8_sentinel: bool, value_len: int) -> str:
+    """Assemble an ``&``-joined query string containing ``count`` ``k<i>=<value>`` pairs.
+
+    When ``utf8_sentinel`` is true the string starts with ``utf8=%E2%9C%93``.
+    When ``comma_lists`` is true every tenth value (``i % 10 == 0``) is the
+    literal ``a,b,c``; all other values come from ``make_value(value_len, i)``.
+    """
+    pairs: list[str] = ["utf8=%E2%9C%93"] if utf8_sentinel else []
+    for index in range(count):
+        if comma_lists and index % 10 == 0:
+            payload = "a,b,c"
+        else:
+            payload = make_value(value_len, index)
+        pairs.append(f"k{index}={payload}")
+    return "&".join(pairs)
+
+
+def measure_case(case: DecodeCase, warmups: int, samples: int) -> tuple[float, int]:
+    """Time ``decode`` for one benchmark case.
+
+    Returns the median per-call latency in milliseconds over ``samples`` timed
+    batches of ``case.iterations`` calls each, together with ``len()`` of the
+    last decoded result.
+    """
+    query = build_query(case.count, case.comma, case.utf8_sentinel, case.value_len)
+    options = DecodeOptions(
+        comma=case.comma,
+        parse_lists=True,
+        parameter_limit=float("inf"),
+        raise_on_limit_exceeded=False,
+        interpret_numeric_entities=False,
+        charset_sentinel=case.utf8_sentinel,
+        ignore_query_prefix=False,
+    )
+
+    # Untimed warm-up calls.
+    for _ in range(warmups):
+        decode(query, options=options)
+
+    per_call_ms: list[float] = []
+    key_count = 0
+    for _ in range(samples):
+        parsed: dict[str, object] = {}
+        started = time.perf_counter()
+        for _ in range(case.iterations):
+            parsed = decode(query, options=options)
+        batch_seconds = time.perf_counter() - started
+        per_call_ms.append(batch_seconds * 1000.0 / case.iterations)
+        key_count = len(parsed)
+
+    return statistics.median(per_call_ms), key_count
+
+
+def parse_args() -> argparse.Namespace:
+    """Parse ``--warmups`` (default 5) and ``--samples`` (default 7) from the command line."""
+    cli = argparse.ArgumentParser(description="Benchmark decode performance snapshot cases (C1/C2/C3).")
+    cli.add_argument("--warmups", type=int, default=5, help="warm-up runs per case")
+    cli.add_argument("--samples", type=int, default=7, help="timed samples per case")
+    return cli.parse_args()
+
+
+def main() -> None:
+    """Validate the arguments, run every case in ``CASES`` and print one line per case."""
+    args = parse_args()
+    if args.warmups < 0:
+        raise ValueError("--warmups must be >= 0")
+    if args.samples <= 0:
+        raise ValueError("--samples must be > 0")
+
+    print(f"qs.py decode perf snapshot (median of {args.samples} samples)")
+    print("Decode (public API):")
+    for case in CASES:
+        median_ms, key_count = measure_case(case, warmups=args.warmups, samples=args.samples)
+        print(
+            " "
+            f"{case.name}: count={case.count:>4}, "
+            f"comma={case.comma!s:<5}, "
+            f"utf8={case.utf8_sentinel!s:<5}, "
+            f"len={case.value_len:>2}: "
+            f"{median_ms:7.3f} ms/op | keys={key_count}"
+        )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/bench_encode_depth.py b/scripts/bench_encode_depth.py
new file mode 100644
index 0000000..0fffd04
--- /dev/null
+++ b/scripts/bench_encode_depth.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 +"""Local deep-encode benchmark (not used by CI). + +Usage: + python scripts/bench_encode_depth.py + python scripts/bench_encode_depth.py --runs 5 --depths 2000 5000 12000 +""" + +from __future__ import annotations + +import argparse +import statistics +import sys +import time +import typing as t +from pathlib import Path + + +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT / "src")) + +from qs_codec import encode +from qs_codec.models.encode_options import EncodeOptions + + +def make_nested(depth: int) -> t.Dict[str, t.Any]: + data: t.Dict[str, t.Any] = {"leaf": "x"} + for _ in range(depth): + data = {"a": data} + return data + + +def run_once(depth: int) -> float: + data = make_nested(depth) + start = time.perf_counter() + result = encode(data, options=EncodeOptions(encode=False)) + elapsed = time.perf_counter() - start + if not result.endswith("=x"): + raise RuntimeError(f"unexpected encoded output for depth={depth}") + return elapsed + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Benchmark deep encode performance.") + parser.add_argument("--runs", type=int, default=3, help="timed runs per depth") + parser.add_argument("--warmups", type=int, default=1, help="warm-up runs per depth") + parser.add_argument( + "--depths", + type=int, + nargs="+", + default=[2000, 5000, 12000], + help="depth values to benchmark", + ) + return parser.parse_args() + + +def main() -> None: + args = parse_args() + if args.runs <= 0: + raise ValueError("--runs must be > 0") + if args.warmups < 0: + raise ValueError("--warmups must be >= 0") + if not args.depths: + raise ValueError("--depths must not be empty") + + print(f"python={sys.version.split()[0]} runs={args.runs} warmups={args.warmups}") + for depth in args.depths: + for _ in range(args.warmups): + run_once(depth) + + times = [run_once(depth) for _ in range(args.runs)] + median = 
statistics.median(times) + print(f"depth={depth} median={median:.6f}s " f"runs=[{', '.join(f'{t_:.6f}' for t_ in times)}]") + + +if __name__ == "__main__": + main() diff --git a/src/qs_codec/constants/__init__.py b/src/qs_codec/constants/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/qs_codec/constants/encode_constants.py b/src/qs_codec/constants/encode_constants.py new file mode 100644 index 0000000..a61d3bc --- /dev/null +++ b/src/qs_codec/constants/encode_constants.py @@ -0,0 +1,6 @@ +"""Constants used in the encoding process of the qs_codec library.""" + +MAX_ENCODING_DEPTH_EXCEEDED: str = "Maximum encoding depth exceeded" +PHASE_START: int = 0 +PHASE_ITERATE: int = 1 +PHASE_AWAIT_CHILD: int = 2 diff --git a/src/qs_codec/decode.py b/src/qs_codec/decode.py index 4f7a052..08261cc 100644 --- a/src/qs_codec/decode.py +++ b/src/qs_codec/decode.py @@ -17,7 +17,6 @@ import re import typing as t from collections.abc import Mapping -from dataclasses import replace from math import isinf from .enums.charset import Charset @@ -26,6 +25,7 @@ from .enums.sentinel import Sentinel from .models.decode_options import DecodeOptions from .models.overflow_dict import OverflowDict +from .models.structured_key_scan import StructuredKeyScan from .models.undefined import UNDEFINED from .utils.decode_utils import DecodeUtils from .utils.utils import Utils @@ -69,37 +69,58 @@ def decode( if not isinstance(value, (str, Mapping)): raise ValueError("value must be a str or a Mapping[str, Any]") - # Work on a local copy so any internal toggles don't leak to caller - opts = replace(options) if options is not None else DecodeOptions() - - # Temporarily toggle parse_lists for THIS call only, and only for raw strings - orig_parse_lists = opts.parse_lists - try: - if isinstance(value, str) and orig_parse_lists: - # Pre-count parameters so we can decide on toggling before tokenization/decoding - _s = value.replace("?", "", 1) if opts.ignore_query_prefix else value - if 
isinstance(opts.delimiter, re.Pattern): - _parts_count = len(re.split(opts.delimiter, _s)) if _s else 0 - else: - _parts_count = (_s.count(opts.delimiter) + 1) if _s else 0 - if 0 < opts.list_limit < _parts_count: - opts.parse_lists = False - temp_obj: t.Optional[t.Dict[str, t.Any]] = ( - _parse_query_string_values(value, opts) if isinstance(value, str) else dict(value) + opts = options if options is not None else DecodeOptions() + decode_from_string = isinstance(value, str) + str_value: str = t.cast(str, value) if decode_from_string else "" + mapping_value: t.Mapping[str, t.Any] = t.cast(t.Mapping[str, t.Any], value) if not decode_from_string else {} + + parse_lists_effective = opts.parse_lists + if decode_from_string and parse_lists_effective: + # Keep caller options immutable: compute a local parse_lists switch only for this invocation. + query = str_value.replace("?", "", 1) if opts.ignore_query_prefix else str_value + if isinstance(opts.delimiter, re.Pattern): + parts_count = len(re.split(opts.delimiter, query)) if query else 0 + else: + parts_count = (query.count(opts.delimiter) + 1) if query else 0 + if 0 < opts.list_limit < parts_count: + parse_lists_effective = False + + if decode_from_string: + temp_obj: t.Optional[t.Dict[str, t.Any]] = _parse_query_string_values( + str_value, opts, parse_lists=parse_lists_effective ) + else: + temp_obj = dict(mapping_value) + if not temp_obj: + return obj - # Iterate over the keys and setup the new object - if temp_obj: - for key, val in temp_obj.items(): - new_obj: t.Any = _parse_keys(key, val, opts, isinstance(value, str)) + structured_scan = _scan_structured_keys(temp_obj, opts) if decode_from_string else StructuredKeyScan.empty() + if decode_from_string and not structured_scan.has_any_structured_syntax: + return Utils.compact(temp_obj) + + # Iterate over the keys and setup the new object + for key, val in temp_obj.items(): + if ( + decode_from_string + and key not in structured_scan.structured_keys + and key not in 
structured_scan.structured_roots + ): + # Fast path for flat keys: direct assignment when safe. + # If a structured key already materialized the same key, preserve + # historical merge semantics instead of overwriting. + if key in obj: + obj = Utils.merge(obj, {key: val}, opts) # type: ignore [assignment] + else: + obj[key] = val + continue - if not obj and isinstance(new_obj, dict): - obj = new_obj - continue + new_obj: t.Any = _parse_keys(key, val, opts, decode_from_string, parse_lists=parse_lists_effective) - obj = Utils.merge(obj, new_obj, opts) # type: ignore [assignment] - finally: - opts.parse_lists = orig_parse_lists + if not obj and isinstance(new_obj, dict): + obj = new_obj + continue + + obj = Utils.merge(obj, new_obj, opts) # type: ignore [assignment] return Utils.compact(obj) @@ -117,6 +138,80 @@ def loads(value: t.Optional[str], options: t.Optional[DecodeOptions] = None) -> return decode(value, options) +def _first_structured_split_index(key: str, allow_dots: bool) -> int: + """Return the earliest index that indicates structured syntax in ``key``.""" + split_at = key.find("[") + if not allow_dots: + return split_at + + dot_index = key.find(".") + if dot_index >= 0 and (split_at < 0 or dot_index < split_at): + split_at = dot_index + + encoded_dot_index = -1 + if "%" in key: + upper = key.find("%2E") + lower = key.find("%2e") + if upper >= 0 and lower >= 0: + encoded_dot_index = upper if upper < lower else lower + else: + encoded_dot_index = upper if upper >= 0 else lower + + if encoded_dot_index >= 0 and (split_at < 0 or encoded_dot_index < split_at): + split_at = encoded_dot_index + + return split_at + + +def _leading_structured_root(key: str, options: DecodeOptions) -> str: + """Extract root key for leading-bracket structured keys (``[]`` normalizes to ``"0"``).""" + segments = DecodeUtils.split_key_into_segments( + original_key=key, + allow_dots=t.cast(bool, options.allow_dots), + max_depth=options.depth, + strict_depth=options.strict_depth, + ) + 
if not segments: + return key + + first = segments[0] + if not first.startswith("["): + return first + + last = first.rfind("]") + clean_root = first[1:last] if last > 0 else first[1:] + return clean_root or "0" + + +def _scan_structured_keys(temp_obj: Mapping[str, t.Any], options: DecodeOptions) -> StructuredKeyScan: + """Pre-scan keys to enable flat-query and mixed-query decode bypasses.""" + if not temp_obj: + return StructuredKeyScan.empty() + + allow_dots = t.cast(bool, options.allow_dots) + structured_roots: t.Set[str] = set() + structured_keys: t.Set[str] = set() + + for key in temp_obj.keys(): + split_at = _first_structured_split_index(key, allow_dots) + if split_at < 0: + continue + structured_keys.add(key) + if split_at == 0: + structured_roots.add(_leading_structured_root(key, options)) + else: + structured_roots.add(key[:split_at]) + + if not structured_keys: + return StructuredKeyScan.empty() + + return StructuredKeyScan( + has_any_structured_syntax=True, + structured_roots=frozenset(structured_roots), + structured_keys=frozenset(structured_keys), + ) + + def _interpret_numeric_entities(value: str) -> str: """Convert HTML numeric entities (e.g., ``©``) to their character equivalents. @@ -162,7 +257,9 @@ def _parse_array_value(value: t.Any, options: DecodeOptions, current_list_length return value -def _parse_query_string_values(value: str, options: DecodeOptions) -> t.Dict[str, t.Any]: +def _parse_query_string_values( + value: str, options: DecodeOptions, *, parse_lists: t.Optional[bool] = None +) -> t.Dict[str, t.Any]: """Tokenize a raw query string into a flat ``Dict[str, Any]``. Responsibilities @@ -185,6 +282,7 @@ def _parse_query_string_values(value: str, options: DecodeOptions) -> t.Dict[str ``_parse_keys`` / ``_parse_object``. 
""" obj: t.Dict[str, t.Any] = {} + parse_lists_enabled = options.parse_lists if parse_lists is None else parse_lists clean_str: str = value.replace("?", "", 1) if options.ignore_query_prefix else value # Normalize %5B/%5D to literal brackets before splitting (case-insensitive). @@ -244,6 +342,7 @@ def _parse_query_string_values(value: str, options: DecodeOptions) -> t.Dict[str # Local, non-optional decoder reference for type-checkers decoder_fn: t.Callable[..., t.Optional[str]] = options.decoder or DecodeUtils.decode + duplicates = options.duplicates # Iterate over parts and decode each key/value pair. for i, _ in enumerate(parts): @@ -251,53 +350,67 @@ def _parse_query_string_values(value: str, options: DecodeOptions) -> t.Dict[str continue part: str = parts[i] + if not part: + continue bracket_equals_pos: int = part.find("]=") pos: int = part.find("=") if bracket_equals_pos == -1 else (bracket_equals_pos + 1) # Decode key and value with a key-aware decoder; skip pairs whose key decodes to None + raw_key = "" if pos == -1: key_decoded = decoder_fn(part, charset, kind=DecodeKind.KEY) if key_decoded is None: continue key: str = key_decoded + if not key: + continue val: t.Any = None if options.strict_null_handling else "" else: - key_decoded = decoder_fn(part[:pos], charset, kind=DecodeKind.KEY) + raw_key = part[:pos] + key_decoded = decoder_fn(raw_key, charset, kind=DecodeKind.KEY) if key_decoded is None: continue key = key_decoded - val = Utils.apply( - _parse_array_value( - part[pos + 1 :], - options, - len(obj[key]) if key in obj and isinstance(obj[key], (list, tuple)) else 0, - ), - lambda v: decoder_fn(v, charset, kind=DecodeKind.VALUE), + if not key: + continue + parsed_value = _parse_array_value( + part[pos + 1 :], + options, + len(obj[key]) if key in obj and isinstance(obj[key], (list, tuple)) else 0, ) + if isinstance(parsed_value, (list, tuple)): + val = [decoder_fn(v, charset, kind=DecodeKind.VALUE) for v in parsed_value] + else: + val = 
decoder_fn(parsed_value, charset, kind=DecodeKind.VALUE) if val and options.interpret_numeric_entities and charset == Charset.LATIN1: val = _interpret_numeric_entities( val if isinstance(val, str) else ",".join(val) if isinstance(val, (list, tuple)) else str(val) ) - # If the pair used empty brackets syntax and list parsing is enabled, force an array container. - # Always wrap exactly once to preserve list-of-lists semantics when comma splitting applies. - if options.parse_lists and "[]=" in part: + # Upstream parity: if token contains "[]=", only wrap values that are already arrays + # (typically produced by comma splitting), preserving list-of-lists semantics. + if parse_lists_enabled and pos != -1 and "[]=" in part and isinstance(val, (list, tuple)): val = [val] existing: bool = key in obj # Combine/overwrite according to the configured duplicates policy. - if existing and options.duplicates == Duplicates.COMBINE: + if existing and duplicates == Duplicates.COMBINE: obj[key] = Utils.combine(obj[key], val, options) - elif not existing or options.duplicates == Duplicates.LAST: + elif not existing or duplicates == Duplicates.LAST: obj[key] = val return obj def _parse_object( - chain: t.Union[t.List[str], t.Tuple[str, ...]], val: t.Any, options: DecodeOptions, values_parsed: bool + chain: t.Union[t.List[str], t.Tuple[str, ...]], + val: t.Any, + options: DecodeOptions, + values_parsed: bool, + *, + parse_lists: t.Optional[bool] = None, ) -> t.Any: """Fold a flat key-path chain into nested containers. @@ -326,6 +439,7 @@ def _parse_object( handled by the splitter. - When list parsing is disabled and an empty segment is encountered, coerces to ``{"0": leaf}`` to preserve round-trippability with other ports. """ + parse_lists_enabled = options.parse_lists if parse_lists is None else parse_lists current_list_length: int = 0 # If the chain ends with an empty list marker, compute current list length for limit checks. 
@@ -361,7 +475,7 @@ def _parse_object( obj: t.Optional[t.Union[t.Dict[str, t.Any], t.List[t.Any]]] root: str = chain[i] - if root == "[]" and options.parse_lists: + if root == "[]" and parse_lists_enabled: if Utils.is_overflow(leaf): obj = leaf elif options.allow_empty_lists and (leaf == "" or (options.strict_null_handling and leaf is None)): @@ -393,7 +507,7 @@ def _parse_object( except (ValueError, TypeError): index = None - if not options.parse_lists and decoded_root == "": + if not parse_lists_enabled and decoded_root == "": if Utils.is_overflow(leaf): obj = leaf else: @@ -403,20 +517,29 @@ def _parse_object( and index >= 0 and root != decoded_root and str(index) == decoded_root - and options.parse_lists + and parse_lists_enabled and index <= options.list_limit ): obj = [UNDEFINED for _ in range(index + 1)] obj[index] = leaf else: - obj[str(index) if index is not None else decoded_root] = leaf + # Preserve the literal decoded key for non-array roots (e.g. "[01]" -> "01"), + # matching Node `qs` behavior for leading-zero numeric-like segments. + obj[decoded_root] = leaf leaf = obj return leaf -def _parse_keys(given_key: t.Optional[str], val: t.Any, options: DecodeOptions, values_parsed: bool) -> t.Any: +def _parse_keys( + given_key: t.Optional[str], + val: t.Any, + options: DecodeOptions, + values_parsed: bool, + *, + parse_lists: t.Optional[bool] = None, +) -> t.Any: """Split a full key string into segments and dispatch to ``_parse_object``. Returns ``None`` for empty keys (mirrors upstream behavior). 
@@ -431,4 +554,4 @@ def _parse_keys(given_key: t.Optional[str], val: t.Any, options: DecodeOptions, strict_depth=options.strict_depth, ) - return _parse_object(keys, val, options, values_parsed) + return _parse_object(keys, val, options, values_parsed, parse_lists=parse_lists) diff --git a/src/qs_codec/encode.py b/src/qs_codec/encode.py index 712017d..6c94b47 100644 --- a/src/qs_codec/encode.py +++ b/src/qs_codec/encode.py @@ -18,15 +18,17 @@ from collections.abc import Mapping as ABCMapping from collections.abc import Sequence as ABCSequence from copy import deepcopy -from dataclasses import dataclass, field from datetime import datetime from functools import cmp_to_key from weakref import WeakKeyDictionary +from .constants.encode_constants import MAX_ENCODING_DEPTH_EXCEEDED, PHASE_AWAIT_CHILD, PHASE_ITERATE, PHASE_START from .enums.charset import Charset from .enums.format import Format from .enums.list_format import ListFormat from .enums.sentinel import Sentinel +from .models.cycle_state import CycleState +from .models.encode_frame import EncodeFrame from .models.encode_options import EncodeOptions from .models.key_path_node import KeyPathNode from .models.undefined import UNDEFINED, Undefined @@ -174,10 +176,6 @@ def encode(value: t.Any, options: EncodeOptions = EncodeOptions()) -> str: # Unique placeholder used as a key within the side-channel chain to pass context down traversal frames. 
_sentinel: WeakWrapper = WeakWrapper({}) -MAX_ENCODING_DEPTH_EXCEEDED = "Maximum encoding depth exceeded" -_PHASE_START = 0 -_PHASE_ITERATE = 1 -_PHASE_AWAIT_CHILD = 2 def _get_max_encode_depth(max_depth: t.Optional[int]) -> int: @@ -186,159 +184,6 @@ def _get_max_encode_depth(max_depth: t.Optional[int]) -> int: return max_depth -class _EncodeFrame: - """Mutable traversal frame for iterative encoding.""" - - __slots__ = ( - "add_query_prefix", - "adjusted_path", - "allow_dots", - "allow_empty_lists", - "charset", - "comma_compact_nulls", - "comma_round_trip", - "cycle_level", - "cycle_pushed", - "cycle_state", - "depth", - "encode_dot_in_keys", - "encode_values_only", - "encoder", - "filter_", - "format", - "formatter", - "generate_array_prefix", - "index", - "is_mapping", - "is_sequence", - "is_undefined", - "max_depth", - "obj", - "obj_id", - "obj_keys", - "path", - "phase", - "prefix", - "serialize_date", - "side_channel", - "skip_nulls", - "sort", - "step", - "strict_null_handling", - "value", - "values", - ) - value: t.Any - is_undefined: bool - side_channel: WeakKeyDictionary - prefix: t.Optional[str] - comma_round_trip: t.Optional[bool] - comma_compact_nulls: bool - encoder: t.Optional[t.Callable[[t.Any, t.Optional[Charset], t.Optional[Format]], str]] - serialize_date: t.Union[t.Callable[[datetime], t.Optional[str]], str] - sort: t.Optional[t.Callable[[t.Any, t.Any], int]] - filter_: t.Optional[t.Union[t.Callable, t.Sequence[t.Union[str, int]]]] - formatter: t.Optional[t.Callable[[str], str]] - format: Format - generate_array_prefix: t.Callable[[str, t.Optional[str]], str] - allow_empty_lists: bool - strict_null_handling: bool - skip_nulls: bool - encode_dot_in_keys: bool - allow_dots: bool - encode_values_only: bool - charset: t.Optional[Charset] - add_query_prefix: bool - depth: int - max_depth: t.Optional[int] - path: t.Optional[KeyPathNode] - phase: int - obj: t.Any - obj_id: t.Optional[int] - is_mapping: bool - is_sequence: bool - step: int - obj_keys: 
t.List[t.Any] - values: t.List[t.Any] - index: int - adjusted_path: t.Optional[KeyPathNode] - cycle_state: t.Optional["_CycleState"] - cycle_level: t.Optional[int] - cycle_pushed: bool - - def __init__( - self, - value: t.Any, - is_undefined: bool, - side_channel: WeakKeyDictionary, - prefix: t.Optional[str], - comma_round_trip: t.Optional[bool], - comma_compact_nulls: bool, - encoder: t.Optional[t.Callable[[t.Any, t.Optional[Charset], t.Optional[Format]], str]], - serialize_date: t.Union[t.Callable[[datetime], t.Optional[str]], str], - sort: t.Optional[t.Callable[[t.Any, t.Any], int]], - filter_: t.Optional[t.Union[t.Callable, t.Sequence[t.Union[str, int]]]], - formatter: t.Optional[t.Callable[[str], str]], - format: Format, - generate_array_prefix: t.Callable[[str, t.Optional[str]], str], - allow_empty_lists: bool, - strict_null_handling: bool, - skip_nulls: bool, - encode_dot_in_keys: bool, - allow_dots: bool, - encode_values_only: bool, - charset: t.Optional[Charset], - add_query_prefix: bool, - depth: int, - max_depth: t.Optional[int], - path: t.Optional[KeyPathNode] = None, - cycle_state: t.Optional["_CycleState"] = None, - cycle_level: t.Optional[int] = None, - ) -> None: - self.value = value - self.is_undefined = is_undefined - self.side_channel = side_channel - self.prefix = prefix - self.comma_round_trip = comma_round_trip - self.comma_compact_nulls = comma_compact_nulls - self.encoder = encoder - self.serialize_date = serialize_date - self.sort = sort - self.filter_ = filter_ - self.formatter = formatter - self.format = format - self.generate_array_prefix = generate_array_prefix - self.allow_empty_lists = allow_empty_lists - self.strict_null_handling = strict_null_handling - self.skip_nulls = skip_nulls - self.encode_dot_in_keys = encode_dot_in_keys - self.allow_dots = allow_dots - self.encode_values_only = encode_values_only - self.charset = charset - self.add_query_prefix = add_query_prefix - self.depth = depth - self.max_depth = max_depth - self.path 
= path - self.phase = _PHASE_START - self.obj = None - self.obj_id = None - self.is_mapping = False - self.is_sequence = False - self.step = 0 - self.obj_keys = [] - self.values = [] - self.index = 0 - self.adjusted_path = None - self.cycle_state = cycle_state - self.cycle_level = cycle_level - self.cycle_pushed = False - - -@dataclass -class _CycleState: - entries: t.Dict[int, t.List[t.Tuple[int, t.Any, bool]]] = field(default_factory=dict) - - def _identity_key(value: t.Any) -> int: """Return an identity-stable integer key for cycle bookkeeping. @@ -359,7 +204,7 @@ def _identity_key(value: t.Any) -> int: return id(value) -def _bootstrap_cycle_state_from_side_channel(side_channel: WeakKeyDictionary) -> t.Tuple[_CycleState, int]: +def _bootstrap_cycle_state_from_side_channel(side_channel: WeakKeyDictionary) -> t.Tuple[CycleState, int]: """ Build O(1) ancestry lookup state from an existing side-channel chain. @@ -373,7 +218,7 @@ def _bootstrap_cycle_state_from_side_channel(side_channel: WeakKeyDictionary) -> chain.append(tmp_sc) tmp_sc = tmp_sc.get(_sentinel) # type: ignore[assignment] - state = _CycleState() + state = CycleState() for level, ancestor in enumerate(reversed(chain)): is_top = ancestor.get(_sentinel) is None for key, pos in ancestor.items(): @@ -384,7 +229,7 @@ def _bootstrap_cycle_state_from_side_channel(side_channel: WeakKeyDictionary) -> return state, len(chain) -def _compute_step_and_check_cycle(state: _CycleState, node_key: t.Any, current_level: int) -> int: +def _compute_step_and_check_cycle(state: CycleState, node_key: t.Any, current_level: int) -> int: """ Compute the current cycle-detection "step" and raise on circular reference. 
@@ -406,12 +251,12 @@ def _compute_step_and_check_cycle(state: _CycleState, node_key: t.Any, current_l return 0 if is_top else distance -def _push_current_node(state: _CycleState, node_key: t.Any, current_level: int, pos: int, is_top: bool) -> None: +def _push_current_node(state: CycleState, node_key: t.Any, current_level: int, pos: int, is_top: bool) -> None: key_id = node_key if isinstance(node_key, int) else _identity_key(node_key) state.entries.setdefault(key_id, []).append((current_level, pos, is_top)) -def _pop_current_node(state: _CycleState, node_key: t.Any) -> None: +def _pop_current_node(state: CycleState, node_key: t.Any) -> None: key_id = node_key if isinstance(node_key, int) else _identity_key(node_key) entries = state.entries.get(key_id) if not entries: @@ -515,8 +360,8 @@ def _encode( """ last_result: t.Union[t.List[t.Any], t.Tuple[t.Any, ...], t.Any, None] = None - stack: t.List[_EncodeFrame] = [ - _EncodeFrame( + stack: t.List[EncodeFrame] = [ + EncodeFrame( value=value, is_undefined=is_undefined, side_channel=side_channel, @@ -546,7 +391,7 @@ def _encode( while stack: frame = stack[-1] - if frame.phase == _PHASE_START: + if frame.phase == PHASE_START: if frame.max_depth is None: frame.max_depth = _get_max_encode_depth(None) if frame.depth > frame.max_depth: @@ -678,10 +523,10 @@ def _encode( continue frame.index = 0 - frame.phase = _PHASE_ITERATE + frame.phase = PHASE_ITERATE continue - elif frame.phase == _PHASE_ITERATE: + elif frame.phase == PHASE_ITERATE: if frame.index >= len(frame.obj_keys): if frame.cycle_pushed and frame.obj_id is not None and frame.cycle_state is not None: _pop_current_node(frame.cycle_state, frame.obj_id) @@ -745,9 +590,9 @@ def _encode( else: child_path = adjusted_path.append(f".{encoded_key}" if frame.allow_dots else f"[{encoded_key}]") - frame.phase = _PHASE_AWAIT_CHILD + frame.phase = PHASE_AWAIT_CHILD stack.append( - _EncodeFrame( + EncodeFrame( value=_value, is_undefined=_value_undefined, 
side_channel=frame.side_channel, @@ -785,13 +630,13 @@ def _encode( continue else: - if frame.phase != _PHASE_AWAIT_CHILD: # pragma: no cover - internal invariant + if frame.phase != PHASE_AWAIT_CHILD: # pragma: no cover - internal invariant raise RuntimeError("Unexpected _encode frame phase") # noqa: TRY003 if isinstance(last_result, (list, tuple)): frame.values.extend(last_result) else: frame.values.append(last_result) - frame.phase = _PHASE_ITERATE + frame.phase = PHASE_ITERATE return [] if last_result is None else last_result diff --git a/src/qs_codec/models/cycle_state.py b/src/qs_codec/models/cycle_state.py new file mode 100644 index 0000000..7d829f1 --- /dev/null +++ b/src/qs_codec/models/cycle_state.py @@ -0,0 +1,11 @@ +"""CycleState model for storing cycle state information.""" + +import typing as t +from dataclasses import dataclass, field + + +@dataclass +class CycleState: + """Model for storing cycle state information.""" + + entries: t.Dict[int, t.List[t.Tuple[int, t.Any, bool]]] = field(default_factory=dict) diff --git a/src/qs_codec/models/decode_options.py b/src/qs_codec/models/decode_options.py index eb7479d..0404697 100644 --- a/src/qs_codec/models/decode_options.py +++ b/src/qs_codec/models/decode_options.py @@ -160,6 +160,14 @@ def __post_init__(self) -> None: if raw_dec is None: raw_dec = DecodeUtils.decode + default_decoder = DecodeUtils.decode + # Fast path for the library default decoder: skip adapter dispatch completely. + if raw_dec is default_decoder or getattr(raw_dec, "__func__", None) is getattr( + default_decoder, "__func__", None + ): + self.decoder = DecodeUtils.decode + return + user_dec = raw_dec # Precompute dispatch to avoid per-call introspection. 
diff --git a/src/qs_codec/models/encode_frame.py b/src/qs_codec/models/encode_frame.py new file mode 100644 index 0000000..0292c57 --- /dev/null +++ b/src/qs_codec/models/encode_frame.py @@ -0,0 +1,160 @@ +"""Mutable traversal frame for iterative encoding.""" + +import typing as t +from datetime import datetime +from weakref import WeakKeyDictionary + +from ..constants.encode_constants import PHASE_START +from ..enums.charset import Charset +from ..enums.format import Format +from ..models.key_path_node import KeyPathNode +from .cycle_state import CycleState + + +class EncodeFrame: + """Mutable traversal frame for iterative encoding.""" + + __slots__ = ( + "add_query_prefix", + "adjusted_path", + "allow_dots", + "allow_empty_lists", + "charset", + "comma_compact_nulls", + "comma_round_trip", + "cycle_level", + "cycle_pushed", + "cycle_state", + "depth", + "encode_dot_in_keys", + "encode_values_only", + "encoder", + "filter_", + "format", + "formatter", + "generate_array_prefix", + "index", + "is_mapping", + "is_sequence", + "is_undefined", + "max_depth", + "obj", + "obj_id", + "obj_keys", + "path", + "phase", + "prefix", + "serialize_date", + "side_channel", + "skip_nulls", + "sort", + "step", + "strict_null_handling", + "value", + "values", + ) + value: t.Any + is_undefined: bool + side_channel: WeakKeyDictionary + prefix: t.Optional[str] + comma_round_trip: t.Optional[bool] + comma_compact_nulls: bool + encoder: t.Optional[t.Callable[[t.Any, t.Optional[Charset], t.Optional[Format]], str]] + serialize_date: t.Union[t.Callable[[datetime], t.Optional[str]], str] + sort: t.Optional[t.Callable[[t.Any, t.Any], int]] + filter_: t.Optional[t.Union[t.Callable, t.Sequence[t.Union[str, int]]]] + formatter: t.Optional[t.Callable[[str], str]] + format: Format + generate_array_prefix: t.Callable[[str, t.Optional[str]], str] + allow_empty_lists: bool + strict_null_handling: bool + skip_nulls: bool + encode_dot_in_keys: bool + allow_dots: bool + encode_values_only: bool + 
charset: t.Optional[Charset] + add_query_prefix: bool + depth: int + max_depth: t.Optional[int] + path: t.Optional[KeyPathNode] + phase: int + obj: t.Any + obj_id: t.Optional[int] + is_mapping: bool + is_sequence: bool + step: int + obj_keys: t.List[t.Any] + values: t.List[t.Any] + index: int + adjusted_path: t.Optional[KeyPathNode] + cycle_state: t.Optional[CycleState] + cycle_level: t.Optional[int] + cycle_pushed: bool + + def __init__( + self, + value: t.Any, + is_undefined: bool, + side_channel: WeakKeyDictionary, + prefix: t.Optional[str], + comma_round_trip: t.Optional[bool], + comma_compact_nulls: bool, + encoder: t.Optional[t.Callable[[t.Any, t.Optional[Charset], t.Optional[Format]], str]], + serialize_date: t.Union[t.Callable[[datetime], t.Optional[str]], str], + sort: t.Optional[t.Callable[[t.Any, t.Any], int]], + filter_: t.Optional[t.Union[t.Callable, t.Sequence[t.Union[str, int]]]], + formatter: t.Optional[t.Callable[[str], str]], + format: Format, + generate_array_prefix: t.Callable[[str, t.Optional[str]], str], + allow_empty_lists: bool, + strict_null_handling: bool, + skip_nulls: bool, + encode_dot_in_keys: bool, + allow_dots: bool, + encode_values_only: bool, + charset: t.Optional[Charset], + add_query_prefix: bool, + depth: int, + max_depth: t.Optional[int], + path: t.Optional[KeyPathNode] = None, + cycle_state: t.Optional[CycleState] = None, + cycle_level: t.Optional[int] = None, + ) -> None: + """Initialize an EncodeFrame with the given parameters.""" + self.value = value + self.is_undefined = is_undefined + self.side_channel = side_channel + self.prefix = prefix + self.comma_round_trip = comma_round_trip + self.comma_compact_nulls = comma_compact_nulls + self.encoder = encoder + self.serialize_date = serialize_date + self.sort = sort + self.filter_ = filter_ + self.formatter = formatter + self.format = format + self.generate_array_prefix = generate_array_prefix + self.allow_empty_lists = allow_empty_lists + self.strict_null_handling = 
strict_null_handling + self.skip_nulls = skip_nulls + self.encode_dot_in_keys = encode_dot_in_keys + self.allow_dots = allow_dots + self.encode_values_only = encode_values_only + self.charset = charset + self.add_query_prefix = add_query_prefix + self.depth = depth + self.max_depth = max_depth + self.path = path + self.phase = PHASE_START + self.obj = None + self.obj_id = None + self.is_mapping = False + self.is_sequence = False + self.step = 0 + self.obj_keys = [] + self.values = [] + self.index = 0 + self.adjusted_path = None + self.cycle_state = cycle_state + self.cycle_level = cycle_level + self.cycle_pushed = False diff --git a/src/qs_codec/models/structured_key_scan.py b/src/qs_codec/models/structured_key_scan.py new file mode 100644 index 0000000..49780c8 --- /dev/null +++ b/src/qs_codec/models/structured_key_scan.py @@ -0,0 +1,18 @@ +"""Defines the StructuredKeyScan dataclass for representing the results of scanning for structured keys in query strings.""" + +import typing as t +from dataclasses import dataclass + + +@dataclass(frozen=True) +class StructuredKeyScan: + """Represents the results of scanning for structured keys in query strings.""" + + has_any_structured_syntax: bool + structured_roots: t.FrozenSet[str] + structured_keys: t.FrozenSet[str] + + @classmethod + def empty(cls) -> "StructuredKeyScan": + """Factory for an empty scan result when no structured syntax is detected.""" + return cls(has_any_structured_syntax=False, structured_roots=frozenset(), structured_keys=frozenset()) diff --git a/tests/unit/decode_test.py b/tests/unit/decode_test.py index cf9521e..bf52c24 100644 --- a/tests/unit/decode_test.py +++ b/tests/unit/decode_test.py @@ -7,7 +7,13 @@ import pytest from qs_codec import Charset, DecodeOptions, Duplicates, decode, load, loads -from qs_codec.decode import _parse_object +from qs_codec.decode import ( + _first_structured_split_index, + _leading_structured_root, + _parse_keys, + _parse_object, + _scan_structured_keys, +) from 
qs_codec.enums.decode_kind import DecodeKind from qs_codec.models.overflow_dict import OverflowDict from qs_codec.utils.decode_utils import DecodeUtils @@ -859,6 +865,12 @@ def _decoder(s: t.Optional[str], charset: t.Optional[Charset]) -> t.Any: {"foo": [["1", "2", "3"], "a"]}, id="string-second-list", ), + pytest.param( + "a[b]=x,y[]=z", + DecodeOptions(comma=True), + {"a": {"b": [["x", "y[]=z"]]}}, + id="comma-value-containing-empty-brackets-marker", + ), ], ) def test_parses_brackets_holds_list_of_lists_when_having_two_parts_of_strings_with_comma_as_list_divider( @@ -866,6 +878,9 @@ def test_parses_brackets_holds_list_of_lists_when_having_two_parts_of_strings_wi ) -> None: assert decode(query, options) == expected + def test_does_not_force_list_when_empty_brackets_marker_is_only_in_value(self) -> None: + assert decode("a[foo]=x[]=", DecodeOptions(comma=True)) == {"a": {"foo": "x[]="}} + @pytest.mark.parametrize( "query, expected", [ @@ -1474,6 +1489,199 @@ def test_parse_lists_toggle_does_not_leak_across_calls(self) -> None: assert res2 == {"a": ["1", "2"]} +class TestDecodeFastPathParity: + @pytest.mark.parametrize( + "query, options, expected", + [ + pytest.param("a=1&b=2", None, {"a": "1", "b": "2"}, id="flat-default"), + pytest.param( + "foo=bar&foo=baz", + DecodeOptions(duplicates=Duplicates.COMBINE), + {"foo": ["bar", "baz"]}, + id="flat-duplicates-combine", + ), + pytest.param( + "foo=bar&foo=baz", + DecodeOptions(duplicates=Duplicates.FIRST), + {"foo": "bar"}, + id="flat-duplicates-first", + ), + pytest.param( + "foo=bar&foo=baz", + DecodeOptions(duplicates=Duplicates.LAST), + {"foo": "baz"}, + id="flat-duplicates-last", + ), + pytest.param( + "a&b=", + DecodeOptions(strict_null_handling=True), + {"a": None, "b": ""}, + id="flat-strict-null-handling", + ), + pytest.param( + "a=%F8", + DecodeOptions(charset_sentinel=True, charset=Charset.LATIN1), + {"a": "ø"}, + id="flat-charset-sentinel-absent", + ), + pytest.param( + "utf8=%E2%9C%93&a=%C3%B8", + 
DecodeOptions(charset_sentinel=True, charset=Charset.LATIN1), + {"a": "ø"}, + id="flat-charset-sentinel-present", + ), + pytest.param( + "002=1&2=2", + None, + {"002": "1", "2": "2"}, + id="flat-leading-zero-key-remains-distinct", + ), + ], + ) + def test_flat_decode_parity( + self, query: str, options: t.Optional[DecodeOptions], expected: t.Mapping[str, t.Any] + ) -> None: + if options is not None: + assert decode(query, options) == expected + else: + assert decode(query) == expected + + def test_kind_aware_decoder_still_receives_key_value_for_flat_query(self) -> None: + calls: t.List[DecodeKind] = [] + + def _decoder(s: t.Optional[str], charset: t.Optional[Charset], *, kind: DecodeKind = DecodeKind.VALUE) -> t.Any: + calls.append(kind) + return DecodeUtils.decode(s, charset=charset, kind=kind) + + assert decode("k1=v1&k2=v2", DecodeOptions(decoder=_decoder)) == {"k1": "v1", "k2": "v2"} + assert calls == [DecodeKind.KEY, DecodeKind.VALUE, DecodeKind.KEY, DecodeKind.VALUE] + + def test_legacy_decoder_still_works_for_flat_query(self) -> None: + calls: t.List[t.Tuple[t.Optional[str], t.Optional[Charset]]] = [] + + def _legacy(token: t.Optional[str], charset: t.Optional[Charset]) -> t.Optional[str]: + calls.append((token, charset)) + return DecodeUtils.decode(token, charset=charset) + + assert decode("a=b&c=d", DecodeOptions(decoder=None, legacy_decoder=_legacy)) == {"a": "b", "c": "d"} + # KEY and VALUE for each pair still route through the legacy adapter. 
+ assert [token for token, _ in calls] == ["a", "b", "c", "d"] + + @pytest.mark.parametrize( + "duplicates, expected", + [ + pytest.param(Duplicates.COMBINE, {"2": ["1", "3"], "002": "2"}, id="combine"), + pytest.param(Duplicates.FIRST, {"2": "1", "002": "2"}, id="first"), + pytest.param(Duplicates.LAST, {"2": "3", "002": "2"}, id="last"), + ], + ) + def test_duplicate_policy_does_not_merge_leading_zero_numeric_keys( + self, duplicates: Duplicates, expected: t.Mapping[str, t.Any] + ) -> None: + query = "2=1&002=2&2=3" + result = decode(query, DecodeOptions(duplicates=duplicates)) + assert result == expected + + +class TestDecodeMixedBypassParity: + @pytest.mark.parametrize( + "query, options, expected", + [ + pytest.param("a=1&a[b]=2", None, {"a": ["1", {"b": "2"}]}, id="flat-before-structured"), + pytest.param("a[b]=2&a=1", None, {"a": {"b": "2"}}, id="structured-before-flat"), + pytest.param("0=y&[]=x", None, {"0": "x"}, id="flat-zero-collides-leading-bracket-root"), + pytest.param("[]=x&0=y", None, {"0": ["x", "y"]}, id="leading-bracket-root-collides-flat-zero"), + pytest.param( + "a[b]=1&002=2", + None, + {"a": {"b": "1"}, "002": "2"}, + id="mixed-structured-and-leading-zero-flat-key", + ), + pytest.param( + "a=2&a.b=1", + DecodeOptions(allow_dots=True), + {"a": ["2", {"b": "1"}]}, + id="allow-dots-flat-root-collision", + ), + pytest.param( + "a%252Eb=1&a=2", + DecodeOptions(allow_dots=True, decode_dot_in_keys=True), + {"a.b": "1", "a": "2"}, + id="encoded-dot-structured-plus-flat-root", + ), + pytest.param( + "[]=a&[]=b&1=c", + None, + {"0": "a", "1": ["b", "c"]}, + id="flat-key-collides-with-index-materialized-by-empty-brackets", + ), + pytest.param( + "[01]=x&1=y", + None, + {"01": "x", "1": "y"}, + id="leading-zero-bracket-root-does-not-collide-with-canonical-numeric-flat-key", + ), + pytest.param( + "[01]=x&01=y", + None, + {"01": ["x", "y"]}, + id="leading-zero-bracket-root-collides-with-same-literal-flat-key", + ), + pytest.param( + "01=y&[01]=x", + 
None, + {"01": ["y", "x"]}, + id="leading-zero-flat-key-merges-with-later-leading-zero-bracket-root", + ), + ], + ) + def test_mixed_decode_parity( + self, query: str, options: t.Optional[DecodeOptions], expected: t.Mapping[str, t.Any] + ) -> None: + if options is not None: + assert decode(query, options) == expected + else: + assert decode(query) == expected + + +class TestDecodeInternalCoverage: + def test_first_structured_split_index_prefers_earliest_encoded_dot_variant(self) -> None: + key = "a%2eb%2Ec" + assert _first_structured_split_index(key, allow_dots=True) == key.find("%2e") + + def test_leading_structured_root_returns_plain_root_segment(self) -> None: + assert _leading_structured_root("a[b]=1", DecodeOptions()) == "a" + + def test_leading_structured_root_returns_key_when_split_has_no_segments( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + def _empty_segments(*_args: t.Any, **_kwargs: t.Any) -> t.List[str]: + return [] + + monkeypatch.setattr(DecodeUtils, "split_key_into_segments", _empty_segments) + assert _leading_structured_root("a[b]=1", DecodeOptions()) == "a[b]=1" + + def test_scan_structured_keys_empty_input(self) -> None: + scan = _scan_structured_keys({}, DecodeOptions()) + assert scan.has_any_structured_syntax is False + assert scan.structured_keys == frozenset() + assert scan.structured_roots == frozenset() + + def test_decode_skips_no_equals_pair_when_key_decodes_to_empty_string(self) -> None: + def _decoder( + token: t.Optional[str], charset: t.Optional[Charset], *, kind: DecodeKind = DecodeKind.VALUE + ) -> t.Optional[str]: + if kind is DecodeKind.KEY and token == "drop": + return "" + return DecodeUtils.decode(token, charset=charset, kind=kind) + + result = decode("drop&keep=1", DecodeOptions(decoder=_decoder)) + assert result == {"keep": "1"} + + def test_parse_keys_returns_none_for_empty_key(self) -> None: + assert _parse_keys(None, "value", DecodeOptions(), values_parsed=True) is None + + class 
TestCSharpParityEncodedDotBehavior: def test_top_level_allowdots_true_decodedot_true_splits_plain_and_encoded_dot(self) -> None: opt = DecodeOptions(allow_dots=True, decode_dot_in_keys=True) diff --git a/tests/unit/encode_test.py b/tests/unit/encode_test.py index 92f84ed..3f1a9e8 100644 --- a/tests/unit/encode_test.py +++ b/tests/unit/encode_test.py @@ -11,7 +11,8 @@ import pytest from qs_codec import Charset, EncodeOptions, Format, ListFormat, dumps, encode -from qs_codec.encode import _CycleState, _encode, _pop_current_node, _sentinel +from qs_codec.encode import _encode, _pop_current_node, _sentinel +from qs_codec.models.cycle_state import CycleState from qs_codec.models.undefined import Undefined from qs_codec.models.weak_wrapper import WeakWrapper from qs_codec.utils.encode_utils import EncodeUtils @@ -1971,7 +1972,7 @@ def test_encode_cycle_state_bootstrap_matches_legacy_side_channel_behavior(self) def test_pop_current_node_noop_when_wrapper_not_present(self) -> None: value: t.Dict[str, t.Any] = {"child": "value"} wrapper = WeakWrapper(value) - state = _CycleState() + state = CycleState() with does_not_raise(): _pop_current_node(state, wrapper) diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py index 9db1352..6963ba0 100644 --- a/tests/unit/utils_test.py +++ b/tests/unit/utils_test.py @@ -538,6 +538,12 @@ def test_merge_true_into_null(self) -> None: def test_merge_null_into_array(self) -> None: assert Utils.merge(None, [42]) == [None, 42] + def test_merge_with_none_source_returns_target(self) -> None: + target = {"a": 1} + result = Utils.merge(target, None) + assert result == {"a": 1} + assert result is target + def test_merge_promotes_list_with_undefined_when_lists_disabled(self) -> None: options = DecodeOptions(parse_lists=False) target = [Undefined(), "keep"] @@ -682,6 +688,9 @@ def test_combine_neither_is_an_array(self) -> None: assert b is not combined assert combined == [1, 2] + def test_apply_maps_scalar_values(self) -> None: + assert 
Utils.apply("x", lambda value: f"{value}!") == "x!" + def test_combine_list_limit_exceeded_creates_overflow_dict(self) -> None: default_limit = DecodeOptions().list_limit a = [1] * max(1, default_limit)