Source code for pcapkit.foundation.reassembly.tcp

# -*- coding: utf-8 -*-
"""TCP Datagram Reassembly
=============================

:mod:`pcapkit.foundation.reassembly.tcp` contains
:class:`~pcapkit.foundation.reassembly.reassembly.Reassembly` only,
which reconstructs fragmented TCP packets back to origin.

"""
import sys
from typing import TYPE_CHECKING, Generic, TypeVar

from pcapkit.corekit.infoclass import Info
from pcapkit.foundation.reassembly.reassembly import Reassembly
from pcapkit.protocols.transport.tcp import TCP
from pcapkit.utilities.compat import Tuple

if TYPE_CHECKING:
    from ipaddress import IPv4Address, IPv6Address
    from typing import Optional, Type, overload

    from typing_extensions import Literal

    from pcapkit.protocols.protocol import Protocol

__all__ = ['TCP_Reassembly']

IPAddress = TypeVar('IPAddress', 'IPv4Address', 'IPv6Address')

###############################################################################
# Data Models
###############################################################################

BufferID = Tuple[IPAddress, int, IPAddress, int]


[docs]class Packet(Info): """Data model for :term:`tcp.packet`.""" #: Buffer ID. bufid: 'BufferID' #: Data sequence number. dsn: 'int' #: Acknowledgment number. ack: 'int' #: Original packet range number. num: 'int' #: Synchronise flag. syn: 'bool' #: Finish flag. fin: 'bool' #: Reset connection flag. rst: 'bool' #: Payload length, header excluded. len: 'int' #: This sequence number. first: 'int' #: Next (wanted) sequence number. last: 'int' #: Raw :obj:`bytes` type header. header: 'bytes' #: Raw :obj:`bytearray` type payload. payload: 'bytearray' if TYPE_CHECKING: def __init__(self, bufid: 'BufferID', dsn: 'int', ack: 'int', num: 'int', syn: 'bool', fin: 'bool', rst: 'bool', len: 'int', first: 'int', last: 'int', header: 'bytes', payload: 'bytearray') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long,redefined-builtin
[docs]class DatagramID(Info, Generic[IPAddress]): """Data model for :term:`tcp.datagram` original packet identifier.""" #: Source address. src: 'tuple[IPAddress, int]' #: Destination address. dst: 'tuple[IPAddress, int]' #: Original packet ACK number. ack: 'int' if TYPE_CHECKING: def __init__(self, src: 'tuple[IPAddress, int]', dst: 'tuple[IPAddress, int]', ack: 'int') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long,redefined-builtin
[docs]class Datagram(Info, Generic[IPAddress]): """Data model for :term:`tcp.datagram`.""" #: Completed flag. completed: 'bool' #: Original packet identifier. id: 'DatagramID[IPAddress]' #: Packet numbers. index: 'tuple[int, ...]' #: Initial TCP header. header: 'bytes' #: Reassembled payload (application layer data). payload: 'bytes | tuple[bytes, ...]' #: Parsed reassembled payload. packet: 'Optional[Protocol]' if TYPE_CHECKING: @overload # pylint: disable=used-before-assignment def __init__(self, completed: 'Literal[True]', id: 'DatagramID[IPAddress]', index: 'tuple[int, ...]', header: 'bytes', payload: 'bytes', packet: 'Protocol') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long,redefined-builtin @overload def __init__(self, completed: 'Literal[False]', id: 'DatagramID[IPAddress]', index: 'tuple[int, ...]', header: 'bytes', payload: 'tuple[bytes, ...]', packet: 'None') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long,redefined-builtin def __init__(self, completed: 'bool', id: 'DatagramID[IPAddress]', index: 'tuple[int, ...]', header: 'bytes', payload: 'bytes | tuple[bytes, ...]', packet: 'Optional[Protocol]') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long,redefined-builtin
[docs]class HoleDiscriptor(Info): """Data model for :term:`tcp.buffer` hole descriptor.""" #: Start of hole. first: 'int' #: Stop of hole. last: 'int' if TYPE_CHECKING: def __init__(self, first: 'int', last: 'int') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long,redefined-builtin
[docs]class Fragment(Info): """Data model for :term:`tcp.buffer` ACK list fragment item.""" #: List of reassembled packets. ind: 'list[int]' #: ISN of payload buffer. isn: 'int' #: Length of payload buffer. len: 'int' #: Reassembled payload holes set to b'\x00'. raw: 'bytearray' if TYPE_CHECKING: def __init__(self, ind: 'list[int]', isn: 'int', len: 'int', raw: 'bytearray') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long,redefined-builtin
[docs]class Buffer(Info): """Data model for :term:`tcp.buffer`.""" #: Hole descriptor list. hdl: 'list[HoleDiscriptor]' #: Initial TCP header. hdr: 'bytes' #: ACK list. ack: 'dict[int, Fragment]' if TYPE_CHECKING: def __init__(self, hdl: 'list[HoleDiscriptor]', hdr: 'bytes', ack: 'dict[int, Fragment]') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long,redefined-builtin
############################################################################### # Algorithm Implementation ###############################################################################
[docs]class TCP_Reassembly(Reassembly[Packet, Datagram, BufferID, Buffer]): """Reassembly for TCP payload. Example: >>> from pcapkit.reassembly import TCP_Reassembly # Initialise instance: >>> tcp_reassembly = TCP_Reassembly() # Call reassembly: >>> tcp_reassembly(packet_dict) # Fetch result: >>> result = tcp_reassembly.datagram """ ########################################################################## # Methods. ########################################################################## @property def name(self) -> 'Literal["Transmission Control Protocol"]': """Protocol of current packet.""" return 'Transmission Control Protocol' @property def protocol(self) -> 'Type[TCP]': """Protocol of current reassembly object.""" return TCP ########################################################################## # Methods. ##########################################################################
[docs] def reassembly(self, info: 'Packet') -> 'None': """Reassembly procedure. Arguments: info: :term:`info <tcp.packet>` dict of packets to be reassembled """ # clear cache self._newflg = False self.__cached__.clear() BUFID = info.bufid # Buffer Identifier DSN = info.dsn # Data Sequence Number ACK = info.ack # Acknowledgement Number FIN = info.fin # Finish Flag (Termination) RST = info.rst # Reset Connection Flag (Termination) SYN = info.syn # Synchronise Flag (Establishment) # when SYN is set, reset buffer of existing session if SYN and BUFID in self._buffer: self._dtgram.extend( self.submit(self._buffer.pop(BUFID), bufid=BUFID) ) # initialise buffer with BUFID & ACK if BUFID not in self._buffer: self._buffer[BUFID] = Buffer( hdl=[ HoleDiscriptor( first=info.len, last=sys.maxsize, ), ], hdr=info.header if SYN else b'', ack={ ACK: Fragment( ind=[ info.num, ], isn=info.dsn, len=info.len, raw=info.payload, ), }, ) else: # initialise buffer with ACK if ACK not in self._buffer[BUFID].ack: self._buffer[BUFID].ack[ACK] = Fragment( ind=[ info.num, ], isn=info.dsn, len=info.len, raw=info.payload, ) else: # put header into header buffer if SYN: self._buffer[BUFID].__update__(hdr=info.header) # append packet index self._buffer[BUFID].ack[ACK].ind.append(info.num) # record fragment payload ISN = self._buffer[BUFID].ack[ACK].isn # Initial Sequence Number RAW = self._buffer[BUFID].ack[ACK].raw # Raw Payload Data if DSN >= ISN: # if fragment goes after existing payload LEN = self._buffer[BUFID].ack[ACK].len GAP = DSN - (ISN + LEN) # gap length between payloads if GAP >= 0: # if fragment goes after existing payload RAW += bytearray(GAP) + info.payload else: # if fragment partially overlaps existing payload RAW[DSN-ISN:] = info.payload else: # if fragment exceeds existing payload LEN = info.len GAP = ISN - (DSN + LEN) # gap length between payloads self._buffer[BUFID].ack[ACK].__update__( isn=DSN, ) if GAP >= 0: # if fragment exceeds existing payload RAW = info.payload + bytearray(GAP) + RAW else: # if fragment partially overlaps existing payload RAW = info.payload + RAW[ISN-GAP:] #self._buffer[BUFID].ack[ACK].raw = RAW # update payload datagram #self._buffer[BUFID].ack[ACK].len = len(RAW) # update payload length self._buffer[BUFID].ack[ACK].__update__( raw=RAW, # update payload datagram len=len(RAW), # update payload length ) # update hole descriptor list HDL = self._buffer[BUFID].hdl # HDL alias for (index, hole) in enumerate(HDL): # step one if info.first > hole.last: # step two continue if info.last < hole.first: # step three continue del HDL[index] # step four if info.first > hole.first: # step five new_hole = HoleDiscriptor( first=hole.first, last=info.first - 1, ) HDL.insert(index, new_hole) index += 1 if info.last < hole.last and not FIN and not RST: # step six new_hole = HoleDiscriptor( first=info.last + 1, last=hole.last ) HDL.insert(index, new_hole) break # step seven #self._buffer[BUFID].hdl = HDL # update HDL # when FIN/RST is set, submit buffer of this session if FIN or RST: self._dtgram.extend( self.submit(self._buffer.pop(BUFID), bufid=BUFID) )
[docs] def submit(self, buf: 'Buffer', *, bufid: 'BufferID') -> 'list[Datagram]': # type: ignore[override] # pylint: disable=arguments-differ """Submit reassembled payload. Arguments: buf: :term:`buffer <tcp.buffer>` dict of reassembled packets bufid: buffer identifier Returns: Reassembled :term:`packets <tcp.datagram>`. """ datagram = [] # type: list[Datagram] # reassembled datagram HDL = buf.hdl # hole descriptor list # check through every buffer with ACK for (ack, buffer) in buf.ack.items(): # if this buffer is not implemented # go through every hole and extract received payload if len(HDL) > 2 and self._strflg: data = [] # type: list[bytes] start = stop = 0 for hole in HDL: stop = hole.first byte = buffer.raw[start:stop] start = hole.last if byte: # strip empty payload data.append(byte) byte = buffer.raw[start:] if byte: # strip empty payload data.append(bytes(byte)) if data: # strip empty buffer packet = Datagram( completed=False, id=DatagramID( src=(bufid[0], bufid[1]), dst=(bufid[2], bufid[3]), ack=ack, ), index=tuple(buffer.ind), header=buf.hdr, payload=tuple(data), packet=None, ) datagram.append(packet) # if this buffer is implemented # export payload data & convert into bytes else: payload = buffer.raw if payload: # strip empty buffer packet = Datagram( completed=True, id=DatagramID( src=(bufid[0], bufid[1]), dst=(bufid[2], bufid[3]), ack=ack, ), index=tuple(buffer.ind), header=buf.hdr, payload=bytes(payload), packet=self.protocol.analyze((bufid[1], bufid[3]), bytes(payload)), ) datagram.append(packet) return datagram