Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 82 additions & 13 deletions scapy/layers/dhcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- rfc951 - BOOTSTRAP PROTOCOL (BOOTP)
- rfc1542 - Clarifications and Extensions for the Bootstrap Protocol
- rfc1533 - DHCP Options and BOOTP Vendor Extensions
- rfc3396 - Encoding Long Options in DHCPv4
"""

try:
Expand Down Expand Up @@ -436,8 +437,62 @@ def i2repr(self, pkt, x):
s.append(sane(v))
return "[%s]" % (" ".join(s))

def _find_overload(self, x):
"""
Quickly scans the raw options buffer to find the value
of the Option Overload option (code 52).
Returns 0 if the option is not present.
"""
while x:
o = orb(x[0])
if o == 255:
break
if o == 0:
x = x[1:]
continue
if len(x) < 2:
break
olen = orb(x[1])
if len(x) < olen + 2:
break
if o == 52 and olen == 1:
return orb(x[2])
x = x[olen + 2:]
return 0

def getfield(self, pkt, s):
return b"", self.m2i(pkt, s)
"""
Retrieve the binary value of the s option field.
RFC 3396: build the aggregated options buffer (options, then file, then sname)
if the overload option is present.
"""
aggregate = s
overload = self._find_overload(s)
if (overload
and pkt.underlayer is not None
and isinstance(pkt.underlayer, BOOTP)):
if overload in (1, 3):
aggregate += pkt.underlayer.file
if overload in (2, 3):
aggregate += pkt.underlayer.sname
return b"", self.m2i(pkt, aggregate)

def _concat_fragments(self, x, o):
"""
RFC 3396: concatenate consecutive option fragments with the same code o
found at the beginning of buffer x.
Returns (raw_value, remaining_buffer) where:
- raw_value contains the concatenated data from all consumed fragments.
- remaining_buffer is x advanced past those fragments.
"""
raw_value = b""
while (x and len(x) >= 2 and orb(x[0]) == o):
next_olen = orb(x[1])
if len(x) < next_olen + 2:
break
raw_value += x[2:next_olen + 2]
x = x[next_olen + 2:]
return raw_value, x

def m2i(self, pkt, x):
opt = []
Expand All @@ -456,40 +511,51 @@ def m2i(self, pkt, x):
break
elif o in DHCPOptions:
f = DHCPOptions[o]
olen = orb(x[1])
x_before = x
raw_value = x[2:olen + 2]
x = x[olen + 2:]

# RFC 3396: concatenate subsequent fragments
extra, x = self._concat_fragments(x, o)
raw_value += extra

if isinstance(f, str):
olen = orb(x[1])
opt.append((f, x[2:olen + 2]))
x = x[olen + 2:]
opt.append((f, raw_value))
else:
olen = orb(x[1])
lval = [f.name]

if olen == 0:
if len(raw_value) == 0:
try:
_, val = f.getfield(pkt, b'')
except Exception:
opt.append(x)
opt.append(x_before)
break
else:
lval.append(val)

try:
left = x[2:olen + 2]
left = raw_value
while left:
left, val = f.getfield(pkt, left)
lval.append(val)
except Exception:
opt.append(x)
opt.append(x_before)
break
else:
otuple = tuple(lval)
opt.append(otuple)
x = x[olen + 2:]
else:
olen = orb(x[1])
opt.append((o, x[2:olen + 2]))
x_before = x
raw_value = x[2:olen + 2]
x = x[olen + 2:]

# RFC 3396: concatenate subsequent fragments
extra, x = self._concat_fragments(x, o)
raw_value += extra

opt.append((o, raw_value))
return opt

def i2m(self, pkt, x):
Expand All @@ -514,8 +580,11 @@ def i2m(self, pkt, x):
warning("Unknown field option %s", name)
continue

s += struct.pack("!BB", onum, len(oval))
s += oval
while oval:
chunk = oval[:255]
oval = oval[255:]
s += struct.pack("!BB", onum, len(chunk))
s += chunk

elif (isinstance(o, str) and o in DHCPRevOptions and
DHCPRevOptions[o][1] is None):
Expand Down
36 changes: 36 additions & 0 deletions test/scapy/layers/dhcp.uts
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,39 @@ assert result in [
'<function scapy.ansmachine.dhcpd(self, pool: Union[scapy.base_classes.Net, List[str]] = Net("192.168.1.128/25"), network: str = \'192.168.1.0/24\', gw: str = \'192.168.1.1\', nameserver: Union[str, List[str]] = None, domain: Union[str, NoneType] = None, renewal_time: int = 60, lease_time: int = 1800, **kwargs)>',
'<function scapy.ansmachine.dhcpd(self, pool=Net("192.168.1.128/25"), network=\'192.168.1.0/24\', gw=\'192.168.1.1\', nameserver=None, domain=None, renewal_time=60, lease_time=1800, **kwargs)>',
]

= RFC 3396 - Encoding long DHCPv4 options (issue #4642)
# i2m: option > 255 bytes is split into fragments (test case from issue)
# m2i: consecutive fragments with same code are concatenated
# m2i: non-consecutive same code options are NOT concatenated
# m2i: concatenation works for unknown option codes
# roundtrip: long option survives encode/decode
# getfield: sname/file not aggregated without overload
# getfield: overload=1 aggregates file field

import struct

r = raw(DHCP(options=[('captive-portal', 'a'*256), 'end']))
assert r[:2] == b'\x72\xff' and r[2:257] == b'a'*255
assert r[257:260] == b'\x72\x01a' and r[260:261] == b'\xff'

assert DHCP(b'\x06\x02\x01\x02\x06\x02\x03\x04').options == DHCP(b'\x06\x04\x01\x02\x03\x04').options

p = DHCP(b'\x0c\x02sc\x06\x04\x01\x02\x03\x04\x0c\x02py')
assert p.options == [('hostname', b'sc'), ('name_server', '1.2.3.4'), ('hostname', b'py')]

assert DHCP(b'\xfe\x02AB\xfe\x02CD').options[0] == (254, b'ABCD')

pkt2 = DHCP(raw(DHCP(options=[('captive-portal', 'a'*400), 'end'])))
assert pkt2.options[0] == ('captive-portal', b'a'*400) and pkt2.options[-1] == 'end'

bootp_pkt = BOOTP(chaddr="00:01:02:03:04:05", sname=b'myserver'+b'\x00'*56, file=b'bootfile'+b'\x00'*120, options=b'c\x82Sc') / DHCP(options=[('message-type', 'discover'), 'end'])
p = BOOTP(raw(bootp_pkt))
assert p[DHCP].options[0] == ('message-type', 1) and p[BOOTP].sname[:8] == b'myserver'

magic = b'\x63\x82\x53\x63'
opts = b'\x34\x01\x01' + b'\x35\x01\x01' + b'\xff'
file_field = (b'\x0c\x05scapy' + b'\xff' + b'\x00'*120)[:128]
bootp_raw = struct.pack("!4B", 1, 1, 6, 0) + b'\x00'*4 + b'\x00'*4 + b'\x00'*16 + b'\x00'*16 + b'\x00'*64 + file_field + magic + opts
p = BOOTP(bootp_raw)
assert DHCP in p and ('hostname', b'scapy') in p[DHCP].options