Source code for pcapkit.foundation.reassembly.ip

# -*- coding: utf-8 -*-
"""IP Datagram Reassembly
============================

:mod:`pcapkit.foundation.reassembly.ip` contains
:class:`~pcapkit.foundation.reassembly.ip.IP_Reassembly`
only, which reconstructs fragmented IP packets back to
origin. The following algorithm implement is based on IP
reassembly procedure introduced in :rfc:`791`, using
``RCVBT`` (fragment receivedbit table). Though another
algorithm is explained in :rfc:`815`, replacing ``RCVBT``,
however, this implement still used the elder one.

"""
from typing import TYPE_CHECKING, Generic, TypeVar

from pcapkit.corekit.infoclass import Info
from pcapkit.foundation.reassembly.reassembly import Reassembly
from pcapkit.utilities.compat import Tuple

if TYPE_CHECKING:
    from ipaddress import IPv4Address, IPv6Address
    from typing import Optional, overload

    from typing_extensions import Literal

    from pcapkit.const.reg.transtype import TransType
    from pcapkit.protocols.protocol import Protocol

__all__ = ['IP_Reassembly']

AT = TypeVar('AT', 'IPv4Address', 'IPv6Address')

###############################################################################
# Data Models
###############################################################################


[docs]class Packet(Info, Generic[AT]): """Data model for :term:`ipv4.packet` / :term:`ipv6.packet`.""" #: Buffer ID. bufid: 'tuple[AT, AT, int, TransType]' #: Original packet range number. num: 'int' #: Fragment offset. fo: 'int' #: Internet header length. ihl: 'int' #: More fragments flag. mf: 'bool' #: Total length, header included. tl: 'int' #: Raw :obj:`bytes` type header. header: 'bytes' #: Raw :obj:`bytearray` type payload. payload: 'bytearray' if TYPE_CHECKING: def __init__(self, bufid: 'tuple[AT, AT, int, TransType]', num: 'int', fo: 'int', ihl: 'int', mf: 'bool', tl: 'int', header: 'bytes', payload: 'bytearray') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long,redefined-builtin
[docs]class DatagramID(Info, Generic[AT]): """Data model for :term:`ipv4.datagram` / :term:`ipv6.datagram` original packet identifier.""" #: Source address. src: 'AT' #: Destination address. dst: 'AT' #: IP protocol identifier. id: 'int' #: Payload protocol type. proto: 'TransType' if TYPE_CHECKING: def __init__(self, src: 'AT', dst: 'AT', id: 'int', proto: 'TransType') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long,redefined-builtin
[docs]class Datagram(Info, Generic[AT]): """Data model for :term:`ipv4.datagram` / :term:`ipv6.datagram`.""" #: Completed flag. completed: 'bool' #: Original packet identifier. id: 'DatagramID[AT]' #: Packet numbers. index: 'tuple[int, ...]' #: Initial IP header. header: 'bytes' #: Reassembled IP payload. payload: 'bytes | tuple[bytes, ...]' #: Parsed IP payload. packet: 'Optional[Protocol]' if TYPE_CHECKING: @overload #pylint: disable=used-before-assignment def __init__(self, completed: 'Literal[True]', id: 'DatagramID[AT]', index: 'tuple[int, ...]', header: 'bytes', payload: 'bytes', packet: 'Protocol') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long,redefined-builtin @overload def __init__(self, completed: 'Literal[False]', id: 'DatagramID[AT]', index: 'tuple[int, ...]', header: 'bytes', payload: 'tuple[bytes, ...]', packet: 'None') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long,redefined-builtin def __init__(self, completed: 'bool', id: 'DatagramID[AT]', index: 'tuple[int, ...]', header: 'bytes', payload: 'bytes | tuple[bytes, ...]', packet: 'Optional[Protocol]') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long,redefined-builtin
[docs]class Buffer(Info, Generic[AT]): """Data model for :term:`ipv4.buffer` / :term:`ipv6.buffer`.""" #: Total data length. TDL: 'int' #: Fragment received bit table. RCVBT: 'bytearray' #: List of reassembled packets. index: 'list[int]' #: Header buffer. header: 'bytes' #: Data buffer, holes set to ``b'\x00'``. datagram: 'bytearray' if TYPE_CHECKING: def __init__(self, TDL: 'int', RCVBT: 'bytearray', index: 'list[int]', header: 'bytes', datagram: 'bytearray') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long,redefined-builtin
############################################################################### # Algorithm Implementation ###############################################################################
[docs]class IP_Reassembly(Reassembly[Packet[AT], Datagram[AT], Tuple[AT, AT, 'int', 'TransType'], Buffer[AT]], Generic[AT]): # pylint: disable=abstract-method """Reassembly for IP payload. Important: This class is not intended to be instantiated directly, but rather used as a base class for the protocol-aware reassembly classes. """ ########################################################################## # Methods. ##########################################################################
[docs] def reassembly(self, info: 'Packet[AT]') -> 'None': """Reassembly procedure. Arguments: info: info dict of packets to be reassembled """ # clear cache self._newflg = False self.__cached__.clear() BUFID = info.bufid # Buffer Identifier FO = info.fo # Fragment Offset IHL = info.ihl # Internet Header Length MF = info.mf # More Fragments flag TL = info.tl # Total Length # when non-fragmented (possibly discarded) packet received if not FO and not MF: if BUFID in self._buffer: self._dtgram.extend( self.submit(self._buffer.pop(BUFID), bufid=BUFID) ) return # initialise buffer with BUFID if BUFID not in self._buffer: self._buffer[BUFID] = Buffer( TDL=-1, # Total Data Length RCVBT=bytearray(8191), # Fragment Received Bit Table index=[], # index record header=b'' if FO else info.header, # header buffer datagram=bytearray(65535), # data buffer ) else: # put header into header buffer if not FO: # pylint: disable=else-if-used self._buffer[BUFID].__update__(header=info.header) # append packet index self._buffer[BUFID].index.append(info.num) # put data into data buffer start = FO stop = TL - IHL + FO self._buffer[BUFID].datagram[start:stop] = info.payload # set RCVBT bits (in 8 octets) start = FO // 8 stop = FO // 8 + (TL - IHL + 7) // 8 self._buffer[BUFID].RCVBT[start:stop] = b'\x01' * (stop - start + 1) # get total data length (header excludes) TDL = 0 if not MF: TDL = TL - IHL + FO self._buffer[BUFID].__update__(TDL=TDL) # when datagram is reassembled in whole start = 0 stop = (TDL + 7) // 8 if TDL and all(self._buffer[BUFID].RCVBT[start:stop]): self._dtgram.extend( self.submit(self._buffer.pop(BUFID), bufid=BUFID, checked=True) )
[docs] def submit(self, buf: 'Buffer[AT]', *, bufid: 'tuple[AT, AT, int, TransType]', # type: ignore[override] # pylint: disable=arguments-differ checked: 'bool' = False) -> 'list[Datagram[AT]]': """Submit reassembled payload. Arguments: buf: buffer dict of reassembled packets bufid: buffer identifier checked: buffer consistency checked flag Returns: Reassembled packets. """ TDL = buf.TDL RCVBT = buf.RCVBT index = buf.index header = buf.header datagram = buf.datagram start = 0 stop = (TDL + 7) // 8 flag = checked or (TDL and all(RCVBT[start:stop])) # if datagram is not implemented if not flag and self._strflg: data = [] # type: list[bytes] byte = bytearray() # extract received payload for (bctr, bit) in enumerate(RCVBT): if bit: # received bit this = bctr * 8 that = this + 8 byte += datagram[this:that] else: # missing bit if byte: # strip empty payload data.append(bytes(byte)) byte = bytearray() # strip empty packets if data or header: packet = Datagram( completed=False, id=DatagramID( src=bufid[0], dst=bufid[1], id=bufid[2], proto=bufid[3], ), index=tuple(index), header=header, payload=tuple(data), packet=None, ) # if datagram is reassembled in whole else: payload = datagram[:TDL] packet = Datagram( completed=True, id=DatagramID( src=bufid[0], dst=bufid[1], id=bufid[2], proto=bufid[3], ), index=tuple(index), header=header, payload=bytes(payload), packet=self.protocol.analyze(bufid[3], bytes(payload)), ) return [packet]