Source code for pcapkit.toolkit.scapy

# -*- coding: utf-8 -*-
"""Scapy tools

:mod:`pcapkit.toolkit.scapy` contains all you need for
:mod:`pcapkit` handy usage with `Scapy`_ engine. All reforming
functions returns with a flag to indicate if usable for
its caller.

.. _Scapy: https://scapy.net

Warns:
    ScapyWarning: If `Scapy`_ is not installed.

"""
import ipaddress
import time
import warnings

from pcapkit.const.reg.linktype import LinkType as LINKTYPE
from pcapkit.const.reg.transtype import TransType as TP_PROTO
from pcapkit.utilities.exceptions import ModuleNotFound, stacklevel
from pcapkit.utilities.warnings import ScapyWarning

###############################################################################
# import scapy.all
###############################################################################

try:
    import scapy.all as scapy_all
except ImportError:
    scapy_all = None
    warnings.warn("dependency package 'Scapy' not found",
                  ScapyWarning, stacklevel=stacklevel())

__all__ = [
    'packet2chain', 'packet2dict',
    'ipv4_reassembly', 'ipv6_reassembly', 'tcp_reassembly', 'tcp_traceflow'
]


[docs]def packet2chain(packet): """Fetch Scapy packet protocol chain. Args: packet (scapy.packet.Packet): Scapy packet. Returns: str: Colon (``:``) seperated list of protocol chain. Raises: ModuleNotFound: If `Scapy`_ is not installed. """ if scapy_all is None: raise ModuleNotFound("No module named 'scapy'", name='scapy') chain = [packet.name] payload = packet.payload while not isinstance(payload, scapy_all.packet.NoPayload): chain.append(payload.name) payload = payload.payload return ':'.join(chain)
[docs]def packet2dict(packet): """Convert Scapy packet into :obj:`dict`. Args: packet (scapy.packet.Packet): Scapy packet. Returns: Dict[str, Any]: A :obj:`dict` mapping of packet data. Raises: ModuleNotFound: If `Scapy`_ is not installed. """ if scapy_all is None: raise ModuleNotFound("No module named 'scapy'", name='scapy') def wrapper(packet): dict_ = packet.fields payload = packet.payload if not isinstance(payload, scapy_all.packet.NoPayload): dict_[payload.name] = wrapper(payload) return dict_ return { 'packet': bytes(packet), packet.name: wrapper(packet), }
[docs]def ipv4_reassembly(packet, *, count=NotImplemented): """Make data for IPv4 reassembly. Args: packet (scapy.packet.Packet): Scapy packet. Keyword Args: count (int): Packet index. If not provided, default to ``NotImplemented``. Returns: Tuple[bool, Dict[str, Any]]: A tuple of data for IPv4 reassembly. * If the ``packet`` can be used for IPv4 reassembly. A packet can be reassembled if it contains IPv4 layer (:class:`scapy.layers.inet.IP`) and the **DF** (:attr:`scapy.layers.inet.IP.flags.DF`) flag is ``False``. * If the ``packet`` can be reassembled, then the :obj:`dict` mapping of data for IPv4 reassembly (:term:`ipv4.packet`) will be returned; otherwise, returns :data:`None`. See Also: :class:`~pcapkit.reassembly.ipv4.IPv4Reassembly` """ if 'IP' in packet: ipv4 = packet['IP'] if ipv4.flags.DF: # dismiss not fragmented packet return False, None data = dict( bufid=( ipaddress.ip_address(ipv4.src), # source IP address ipaddress.ip_address(ipv4.dst), # destination IP address ipv4.id, # identification TP_PROTO.get(ipv4.proto).name, # payload protocol type ), num=count, # original packet range number fo=ipv4.frag, # fragment offset ihl=ipv4.ihl, # internet header length mf=bool(ipv4.flags.MF), # more fragment flag tl=ipv4.len, # total length, header includes header=bytearray(ipv4.raw_packet_cache), # raw bytearray type header payload=bytearray(bytes(ipv4.payload)), # raw bytearray type payload ) return True, data return False, None
[docs]def ipv6_reassembly(packet, *, count=NotImplemented): """Make data for IPv6 reassembly. Args: packet (scapy.packet.Packet): Scapy packet. Keyword Args: count (int): Packet index. If not provided, default to ``NotImplemented``. Returns: Tuple[bool, Dict[str, Any]]: A tuple of data for IPv6 reassembly. * If the ``packet`` can be used for IPv6 reassembly. A packet can be reassembled if it contains IPv6 layer (:class:`scapy.layers.inet6.IPv6`) and IPv6 Fragment header (:rfc:`2460#section-4.5`, :class:`scapy.layers.inet6.IPv6ExtHdrFragment`). * If the ``packet`` can be reassembled, then the :obj:`dict` mapping of data for IPv6 reassembly (:term:`ipv6.packet`) will be returned; otherwise, returns :data:`None`. Raises: ModuleNotFound: If `Scapy`_ is not installed. See Also: :class:`~pcapkit.reassembly.ipv6.IPv6Reassembly` """ if scapy_all is None: raise ModuleNotFound("No module named 'scapy'", name='scapy') if 'IPv6' in packet: ipv6 = packet['IPv6'] if scapy_all.IPv6ExtHdrFragment not in ipv6: # pylint: disable=E1101 return False, None # dismiss not fragmented packet ipv6_frag = ipv6['IPv6ExtHdrFragment'] data = dict( bufid=( ipaddress.ip_address(ipv6.src), # source IP address ipaddress.ip_address(ipv6.dst), # destination IP address ipv6.fl, # label TP_PROTO.get(ipv6_frag.nh).name, # next header field in IPv6 Fragment Header ), num=count, # original packet range number fo=ipv6_frag.offset, # fragment offset ihl=len(ipv6) - len(ipv6_frag), # header length, only headers before IPv6-Frag mf=bool(ipv6_frag.m), # more fragment flag tl=len(ipv6), # total length, header includes header=bytearray(bytes(ipv6)[:-len(ipv6_frag)]), # raw bytearray type header before IPv6-Frag payload=bytearray(bytes(ipv6_frag.payload)), # raw bytearray type payload after IPv6-Frag ) return True, data return False, None
[docs]def tcp_reassembly(packet, *, count=NotImplemented): """Store data for TCP reassembly. Args: packet (scapy.packet.Packet): Scapy packet. Keyword Args: count (int): Packet index. If not provided, default to ``NotImplemented``. Returns: Tuple[bool, Dict[str, Any]]: A tuple of data for TCP reassembly. * If the ``packet`` can be used for TCP reassembly. A packet can be reassembled if it contains TCP layer (:class:`scapy.layers.inet.TCP`). * If the ``packet`` can be reassembled, then the :obj:`dict` mapping of data for TCP reassembly (:term:`tcp.packet`) will be returned; otherwise, returns :data:`None`. See Also: :class:`~pcapkit.reassembly.tcp.TCPReassembly` """ if 'TCP' in packet: ip = packet['IP'] if 'IP' in packet else packet['IPv6'] tcp = packet['TCP'] data = dict( bufid=( ipaddress.ip_address(ip.src), # source IP address ipaddress.ip_address(ip.dst), # destination IP address tcp.sport, # source port tcp.dport, # destination port ), num=count, # original packet range number ack=tcp.ack, # acknowledgement dsn=tcp.seq, # data sequence number syn=bool(tcp.flags.S), # synchronise flag fin=bool(tcp.flags.F), # finish flag rst=bool(tcp.flags.R), # reset connection flag payload=bytearray(bytes(tcp.payload)), # raw bytearray type payload ) raw_len = len(tcp.payload) # payload length, header excludes data['first'] = tcp.seq # this sequence number data['last'] = tcp.seq + raw_len # next (wanted) sequence number data['len'] = raw_len # payload length, header excludes return True, data return False, None
[docs]def tcp_traceflow(packet, *, count=NotImplemented): """Trace packet flow for TCP. Args: packet (scapy.packet.Packet): Scapy packet. Keyword Args: count (int): Packet index. If not provided, default to ``NotImplemented``. Returns: Tuple[bool, Dict[str, Any]]: A tuple of data for TCP reassembly. * If the ``packet`` can be used for TCP flow tracing. A packet can be reassembled if it contains TCP layer (:class:`scapy.layers.inet.TCP`). * If the ``packet`` can be reassembled, then the :obj:`dict` mapping of data for TCP flow tracing (:term:`trace.packet`) will be returned; otherwise, returns :data:`None`. See Also: :class:`~pcapkit.foundation.traceflow.TraceFlow` """ if 'TCP' in packet: ip = packet['IP'] if 'IP' in packet else packet['IPv6'] tcp = packet['TCP'] data = dict( protocol=LINKTYPE.get(packet.name.upper()), # data link type from global header index=count, # frame number frame=packet2dict(packet), # extracted packet syn=bool(tcp.flags.S), # TCP synchronise (SYN) flag fin=bool(tcp.flags.F), # TCP finish (FIN) flag src=ipaddress.ip_address(ip.src), # source IP dst=ipaddress.ip_address(ip.dst), # destination IP srcport=tcp.sport, # TCP source port dstport=tcp.dport, # TCP destination port timestamp=time.time(), # timestamp ) return True, data return False, None