diff --git a/scapy/layers/dhcp.py b/scapy/layers/dhcp.py index 39250cf74a8..c39d299cecc 100644 --- a/scapy/layers/dhcp.py +++ b/scapy/layers/dhcp.py @@ -10,6 +10,7 @@ - rfc951 - BOOTSTRAP PROTOCOL (BOOTP) - rfc1542 - Clarifications and Extensions for the Bootstrap Protocol - rfc1533 - DHCP Options and BOOTP Vendor Extensions +- rfc3396 - Encoding Long Options in DHCPv4 """ try: @@ -436,8 +437,62 @@ def i2repr(self, pkt, x): s.append(sane(v)) return "[%s]" % (" ".join(s)) + def _find_overload(self, x): + """ + Quickly scans the raw options buffer to find the value + of the Option Overload option (code 52). + Returns 0 if the option is not present. + """ + while x: + o = orb(x[0]) + if o == 255: + break + if o == 0: + x = x[1:] + continue + if len(x) < 2: + break + olen = orb(x[1]) + if len(x) < olen + 2: + break + if o == 52 and olen == 1: + return orb(x[2]) + x = x[olen + 2:] + return 0 + def getfield(self, pkt, s): - return b"", self.m2i(pkt, s) + """ + Retrieve the binary value of the s option field. + RFC 3396: build the aggregated options buffer (options, then file, then sname) + if the overload option is present. + """ + aggregate = s + overload = self._find_overload(s) + if (overload + and pkt.underlayer is not None + and isinstance(pkt.underlayer, BOOTP)): + if overload in (1, 3): + aggregate += pkt.underlayer.file + if overload in (2, 3): + aggregate += pkt.underlayer.sname + return b"", self.m2i(pkt, aggregate) + + def _concat_fragments(self, x, o): + """ + RFC 3396: concatenate consecutive option fragments with the same code o + found at the beginning of buffer x. + Returns (raw_value, remaining_buffer) where: + - raw_value contains the concatenated data from all consumed fragments. + - remaining_buffer is x advanced past those fragments. + """ + raw_value = b"" + while (x and len(x) >= 2 and orb(x[0]) == o): + next_olen = orb(x[1]) + if len(x) < next_olen + 2: + break + raw_value += x[2:next_olen + 2] + x = x[next_olen + 2:] + return raw_value, x def m2i(self, pkt, x): opt = [] @@ -456,40 +511,51 @@ def m2i(self, pkt, x): break elif o in DHCPOptions: f = DHCPOptions[o] + olen = orb(x[1]) + x_before = x + raw_value = x[2:olen + 2] + x = x[olen + 2:] + + # RFC 3396: concatenate subsequent fragments + extra, x = self._concat_fragments(x, o) + raw_value += extra if isinstance(f, str): - olen = orb(x[1]) - opt.append((f, x[2:olen + 2])) - x = x[olen + 2:] + opt.append((f, raw_value)) else: - olen = orb(x[1]) lval = [f.name] - if olen == 0: + if len(raw_value) == 0: try: _, val = f.getfield(pkt, b'') except Exception: - opt.append(x) + opt.append(x_before) break else: lval.append(val) try: - left = x[2:olen + 2] + left = raw_value while left: left, val = f.getfield(pkt, left) lval.append(val) except Exception: - opt.append(x) + opt.append(x_before) break else: otuple = tuple(lval) opt.append(otuple) - x = x[olen + 2:] else: olen = orb(x[1]) - opt.append((o, x[2:olen + 2])) + x_before = x + raw_value = x[2:olen + 2] x = x[olen + 2:] + + # RFC 3396: concatenate subsequent fragments + extra, x = self._concat_fragments(x, o) + raw_value += extra + + opt.append((o, raw_value)) return opt def i2m(self, pkt, x): @@ -514,8 +580,11 @@ def i2m(self, pkt, x): warning("Unknown field option %s", name) continue - s += struct.pack("!BB", onum, len(oval)) - s += oval + while oval: + chunk = oval[:255] + oval = oval[255:] + s += struct.pack("!BB", onum, len(chunk)) + s += chunk elif (isinstance(o, str) and o in DHCPRevOptions and DHCPRevOptions[o][1] is None): diff --git a/test/scapy/layers/dhcp.uts b/test/scapy/layers/dhcp.uts index 8b226b267af..4eb31047193 100644 --- a/test/scapy/layers/dhcp.uts +++ b/test/scapy/layers/dhcp.uts @@ -137,3 +137,39 @@ assert result in [ '', '', ] + += RFC 3396 - Encoding long DHCPv4 options (issue #4642) +# i2m: option > 255 bytes is split into fragments (test case from issue) +# m2i: consecutive fragments with same code are concatenated +# m2i: non-consecutive same code options are NOT concatenated +# m2i: concatenation works for unknown option codes +# roundtrip: long option survives encode/decode +# getfield: sname/file not aggregated without overload +# getfield: overload=1 aggregates file field + +import struct + +r = raw(DHCP(options=[('captive-portal', 'a'*256), 'end'])) +assert r[:2] == b'\x72\xff' and r[2:257] == b'a'*255 +assert r[257:260] == b'\x72\x01a' and r[260:261] == b'\xff' + +assert DHCP(b'\x06\x02\x01\x02\x06\x02\x03\x04').options == DHCP(b'\x06\x04\x01\x02\x03\x04').options + +p = DHCP(b'\x0c\x02sc\x06\x04\x01\x02\x03\x04\x0c\x02py') +assert p.options == [('hostname', b'sc'), ('name_server', '1.2.3.4'), ('hostname', b'py')] + +assert DHCP(b'\xfe\x02AB\xfe\x02CD').options[0] == (254, b'ABCD') + +pkt2 = DHCP(raw(DHCP(options=[('captive-portal', 'a'*400), 'end']))) +assert pkt2.options[0] == ('captive-portal', b'a'*400) and pkt2.options[-1] == 'end' + +bootp_pkt = BOOTP(chaddr="00:01:02:03:04:05", sname=b'myserver'+b'\x00'*56, file=b'bootfile'+b'\x00'*120, options=b'c\x82Sc') / DHCP(options=[('message-type', 'discover'), 'end']) +p = BOOTP(raw(bootp_pkt)) +assert p[DHCP].options[0] == ('message-type', 1) and p[BOOTP].sname[:8] == b'myserver' + +magic = b'\x63\x82\x53\x63' +opts = b'\x34\x01\x01' + b'\x35\x01\x01' + b'\xff' +file_field = (b'\x0c\x05scapy' + b'\xff' + b'\x00'*120)[:128] +bootp_raw = struct.pack("!4B", 1, 1, 6, 0) + b'\x00'*4 + b'\x00'*4 + b'\x00'*16 + b'\x00'*16 + b'\x00'*64 + file_field + magic + opts +p = BOOTP(bootp_raw) +assert DHCP in p and ('hostname', b'scapy') in p[DHCP].options \ No newline at end of file