# -*- coding: utf-8 -*-
"""DPKT tools
:mod:`pcapkit.toolkit.dpkt` contains all you need for
:mod:`pcapkit` handy usage with `DPKT`_ engine. All reforming
functions returns with a flag to indicate if usable for
its caller.
.. _DPKT: https://dpkt.readthedocs.io
"""
import ipaddress
from pcapkit.const.reg.transtype import TransType as TP_PROTO
__all__ = [
'ipv6_hdr_len', 'packet2chain', 'packet2dict',
'ipv4_reassembly', 'ipv6_reassembly', 'tcp_reassembly', 'tcp_traceflow'
]
[docs]def ipv6_hdr_len(ipv6):
"""Calculate length of headers before IPv6 Fragment header.
Args:
ipv6 (dpkt.ip6.IP6): DPKT IPv6 packet.
Returns:
int: Length of headers before IPv6 Fragment header
:class:`dpkt.ip6.IP6FragmentHeader` (:rfc:`2460#section-4.5`).
As specified in :rfc:`2460#section-4.1`, such headers (before the IPv6 Fragment Header)
includes Hop-by-Hop Options header :class:`dpkt.ip6.IP6HopOptsHeader` (:rfc:`2460#section-4.3`),
Destination Options header :class:`dpkt.ip6.IP6DstOptHeader` (:rfc:`2460#section-4.6`) and
Routing header :class:`dpkt.ip6.IP6RoutingHeader` (:rfc:`2460#section-4.4`).
"""
hdr_len = ipv6.__hdr_len__
# IP6HopOptsHeader / IP6DstOptHeader / IP6RoutingHeader
for code in (0, 60, 43):
ext_hdr = ipv6.extension_hdrs.get(code)
if ext_hdr is not None:
hdr_len += ext_hdr.length
return hdr_len
[docs]def packet2chain(packet):
"""Fetch DPKT packet protocol chain.
Args:
packet (dpkt.dpkt.Packet): DPKT packet.
Returns:
str: Colon (``:``) seperated list of protocol chain.
"""
chain = [type(packet).__name__]
payload = packet.data
while not isinstance(payload, bytes):
chain.append(type(payload).__name__)
payload = payload.data
return ':'.join(chain)
[docs]def packet2dict(packet, timestamp, *, data_link):
"""Convert DPKT packet into :obj:`dict`.
Args:
packet (c): Scapy packet.
Returns:
Dict[str, Any]: A :obj:`dict` mapping of packet data.
"""
def wrapper(packet):
dict_ = dict()
for field in packet.__hdr_fields__:
dict_[field] = getattr(packet, field, None)
payload = packet.data
if not isinstance(payload, bytes):
dict_[type(payload).__name__] = wrapper(payload)
return dict_
return {
'timestamp': timestamp,
'packet': packet.pack(),
data_link.name: wrapper(packet),
}
[docs]def ipv4_reassembly(packet, *, count=NotImplemented):
"""Make data for IPv4 reassembly.
Args:
packet (dpkt.dpkt.Packet): DPKT packet.
Keyword Args:
count (int): Packet index. If not provided, default to ``NotImplemented``.
Returns:
Tuple[bool, Dict[str, Any]]: A tuple of data for IPv4 reassembly.
* If the ``packet`` can be used for IPv4 reassembly. A packet can be reassembled
if it contains IPv4 layer (:class:`dpkt.ip.IP`) and the **DF** (:attr:`dpkt.ip.IP.df`)
flag is ``False``.
* If the ``packet`` can be reassembled, then the :obj:`dict` mapping of data for IPv4
reassembly (:term:`ipv4.packet`) will be returned; otherwise, returns :data:`None`.
See Also:
:class:`~pcapkit.reassembly.ipv4.IPv4Reassembly`
"""
ipv4 = getattr(packet, 'ip', None)
if ipv4 is not None:
if ipv4.df: # dismiss not fragmented packet
return False, None
data = dict(
bufid=(
ipaddress.ip_address(ipv4.src), # source IP address
ipaddress.ip_address(ipv4.dst), # destination IP address
ipv4.id, # identification
TP_PROTO.get(ipv4.p).name, # payload protocol type
),
num=count, # original packet range number
fo=ipv4.off, # fragment offset
ihl=ipv4.__hdr_len__, # internet header length
mf=bool(ipv4.mf), # more fragment flag
tl=ipv4.len, # total length, header includes
header=bytearray(ipv4.pack()[:ipv4.__hdr_len__]), # raw bytearray type header
payload=bytearray(ipv4.pack()[ipv4.__hdr_len__:]), # raw bytearray type payload
)
return True, data
return False, None
[docs]def ipv6_reassembly(packet, *, count=NotImplemented):
"""Make data for IPv6 reassembly.
Args:
packet (dpkt.dpkt.Packet): DPKT packet.
Keyword Args:
count (int): Packet index. If not provided, default to ``NotImplemented``.
Returns:
Tuple[bool, Dict[str, Any]]: A tuple of data for IPv6 reassembly.
* If the ``packet`` can be used for IPv6 reassembly. A packet can be reassembled
if it contains IPv6 layer (:class:`dpkt.ip6.IP6`) and IPv6 Fragment header
(:rfc:`2460#section-4.5`, :class:`dpkt.ip6.IP6FragmentHeader`).
* If the ``packet`` can be reassembled, then the :obj:`dict` mapping of data for IPv6
reassembly (:term:`ipv6.packet`) will be returned; otherwise, returns :data:`None`.
See Also:
:class:`~pcapkit.reassembly.ipv6.IPv6Reassembly`
"""
ipv6 = getattr(packet, 'ip6', None)
if ipv6 is not None:
ipv6_frag = ipv6.extension_hdrs.get(44)
if ipv6_frag is None: # dismiss not fragmented packet
return False, None
hdr_len = ipv6_hdr_len(ipv6)
data = dict(
bufid=(
ipaddress.ip_address(ipv6.src), # source IP address
ipaddress.ip_address(ipv6.dst), # destination IP address
ipv6.flow, # label
TP_PROTO.get(ipv6_frag.nh).name, # next header field in IPv6 Fragment Header
),
num=count, # original packet range number
fo=ipv6_frag.nxt, # fragment offset
ihl=hdr_len, # header length, only headers before IPv6-Frag
mf=bool(ipv6_frag.m_flag), # more fragment flag
tl=len(ipv6), # total length, header includes
header=bytearray(ipv6.pack()[:hdr_len]), # raw bytearray type header before IPv6-Frag
payload=bytearray(ipv6.pack()[hdr_len+ipv6_frag:]), # raw bytearray type payload after IPv6-Frag
)
return True, data
return False, None
[docs]def tcp_reassembly(packet, *, count=NotImplemented):
"""Make data for TCP reassembly.
Args:
packet (dpkt.dpkt.Packet): DPKT packet.
Keyword Args:
count (int): Packet index. If not provided, default to ``NotImplemented``.
Returns:
Tuple[bool, Dict[str, Any]]: A tuple of data for TCP reassembly.
* If the ``packet`` can be used for TCP reassembly. A packet can be reassembled
if it contains TCP layer (:class:`dpkt.tcp.TCP`).
* If the ``packet`` can be reassembled, then the :obj:`dict` mapping of data for TCP
reassembly (:term:`tcp.packet`) will be returned; otherwise, returns :data:`None`.
See Also:
:class:`~pcapkit.reassembly.tcp.TCPReassembly`
"""
if getattr(packet, 'ip', None):
ip = packet['ip']
elif getattr(packet, 'ip6', None):
ip = packet['ip6']
else:
return False, None
tcp = getattr(ip, 'tcp', None)
if tcp is not None:
flags = bin(tcp.flags)[2:].zfill(8)
data = dict(
bufid=(
ipaddress.ip_address(ip.src), # source IP address
ipaddress.ip_address(ip.dst), # destination IP address
tcp.sport, # source port
tcp.dport, # destination port
),
num=count, # original packet range number
ack=tcp.ack, # acknowledgement
dsn=tcp.seq, # data sequence number
rst=bool(int(flags[5])), # reset connection flag
syn=bool(int(flags[6])), # synchronise flag
fin=bool(int(flags[7])), # finish flag
payload=bytearray(tcp.pack()[tcp.__hdr_len__:]), # raw bytearray type payload
)
raw_len = len(tcp.data) # payload length, header excludes
data['first'] = tcp.seq # this sequence number
data['last'] = tcp.seq + raw_len # next (wanted) sequence number
data['len'] = raw_len # payload length, header excludes
return True, data
return False, None
[docs]def tcp_traceflow(packet, timestamp, *, data_link, count=NotImplemented):
"""Trace packet flow for TCP.
Args:
packet (dpkt.dpkt.Packet): DPKT packet.
timestamp (float): Timestamp of the packet.
Keyword Args:
data_link (str): Data link layer protocol (from global header).
count (int): Packet index. If not provided, default to ``NotImplemented``.
Returns:
Tuple[bool, Dict[str, Any]]: A tuple of data for TCP reassembly.
* If the ``packet`` can be used for TCP flow tracing. A packet can be reassembled
if it contains TCP layer (:class:`dpkt.tcp.TCP`).
* If the ``packet`` can be reassembled, then the :obj:`dict` mapping of data for TCP
flow tracing (:term:`trace.packet`) will be returned; otherwise, returns :data:`None`.
See Also:
:class:`~pcapkit.foundation.traceflow.TraceFlow`
"""
if getattr(packet, 'ip', None):
ip = packet['ip']
elif getattr(packet, 'ip6', None):
ip = packet['ip6']
else:
return False, None
tcp = getattr(ip, 'tcp', None)
if tcp is not None:
flags = bin(tcp.flags)[2:].zfill(8)
data = dict(
protocol=data_link, # data link type from global header
index=count, # frame number
frame=packet2dict(packet, timestamp, data_link=data_link), # extracted packet
syn=bool(int(flags[6])), # TCP synchronise (SYN) flag
fin=bool(int(flags[7])), # TCP finish (FIN) flag
src=ipaddress.ip_address(ip.src), # source IP
dst=ipaddress.ip_address(ip.dst), # destination IP
srcport=tcp.sport, # TCP source port
dstport=tcp.dport, # TCP destination port
timestamp=timestamp, # timestamp
)
return True, data
return False, None