# Source code for pcapkit.foundation.reassembly.reassembly

# -*- coding: utf-8 -*-
"""Base Class
================

:mod:`pcapkit.foundation.reassembly.reassembly` contains only
:class:`~pcapkit.foundation.reassembly.reassembly.Reassembly`,
the abstract base class for all reassembly classes. It is
based on the algorithms described in :rfc:`791` and :rfc:`815`
and implements datagram reassembly of IP and TCP packets.

"""
import abc
from typing import TYPE_CHECKING, Generic, TypeVar

if TYPE_CHECKING:
    from typing import Any, Optional, Type

    from pcapkit.corekit.infoclass import Info
    from pcapkit.protocols.protocol import Protocol

__all__ = ['Reassembly']

# packet
PT = TypeVar('PT', bound='Info')
# datagram
DT = TypeVar('DT', bound='Info')
# buffer ID
IT = TypeVar('IT', bound='tuple')
# buffer
BT = TypeVar('BT', bound='Info')


class Reassembly(Generic[PT, DT, IT, BT], metaclass=abc.ABCMeta):
    """Base class for reassembly procedure.

    The class is generic over the packet type (``PT``), the reassembled
    datagram type (``DT``), the buffer ID type (``IT``) and the buffer
    type (``BT``). Concrete subclasses must provide :attr:`name`,
    :attr:`protocol`, :meth:`reassembly` and :meth:`submit`.

    Args:
        strict: if return all datagrams (including those not implemented)
            when submit; the base class only stores the flag for
            subclasses to consult.

    """

    #: bool: Strict mode flag.
    _strflg: 'bool'
    #: bool: New datagram flag; when set, cached results are dropped on
    #: the next access of :attr:`count` or :meth:`fetch`.
    _newflg: 'bool'
    #: dict[IT, BT]: Dict buffer field.
    _buffer: 'dict[IT, BT]'
    #: list[DT]: List reassembled datagram.
    _dtgram: 'list[DT]'

    # Internal data storage for cached properties.
    __cached__: 'dict[str, Any]'

    ##########################################################################
    # Properties.
    ##########################################################################

    # protocol name of current reassembly object
    @property
    @abc.abstractmethod
    def name(self) -> 'str':
        """Protocol name of current reassembly object."""

    # total number of reassembled packets
    @property
    def count(self) -> 'int':
        """Total number of reassembled packets.

        The value is the length of :attr:`datagram` and is memoised in the
        internal cache until new data is flagged through ``_newflg``.

        """
        if self._newflg:
            # new packets arrived since the last computation: start afresh
            self.__cached__.clear()
            self._newflg = False
        if (cached := self.__cached__.get('count')) is not None:
            return cached

        ret = len(self.datagram)
        self.__cached__['count'] = ret
        return ret

    # reassembled datagram
    @property
    def datagram(self) -> 'tuple[DT, ...]':
        """Reassembled datagram.

        When the buffer still holds entries, the result of :meth:`fetch`
        is returned; otherwise the stored datagrams are returned as a
        *tuple*.

        """
        if self._buffer:
            return self.fetch()
        return tuple(self._dtgram)

    @property
    @abc.abstractmethod
    def protocol(self) -> 'Type[Protocol]':
        """Protocol of current reassembly object."""

    ##########################################################################
    # Methods.
    ##########################################################################

    # reassembly procedure
    @abc.abstractmethod
    def reassembly(self, info: 'PT') -> 'None':
        """Reassembly procedure.

        The base implementation only invalidates the cached results of
        :attr:`count` and :meth:`fetch`; it is available to overriding
        methods through ``super().reassembly(info)``.

        Arguments:
            info: info dict of packets to be reassembled

        """
        # clear cache
        self._newflg = False
        self.__cached__['count'] = None
        self.__cached__['fetch'] = None

    # submit reassembled payload
    @abc.abstractmethod
    def submit(self, buf: 'BT', **kwargs: 'Any') -> 'list[DT]':
        """Submit reassembled payload.

        Arguments:
            buf: buffer dict of reassembled packets
            **kwargs: arbitrary keyword arguments

        Returns:
            List of datagrams produced from ``buf``.

        """

    # fetch datagram
    def fetch(self) -> 'tuple[DT, ...]':
        """Fetch datagram.

        If a cached result exists and no new data has been flagged, the
        cached *tuple* is returned. Otherwise, :meth:`submit` is called on
        every entry of the buffer (passing the buffer ID as ``bufid``) to
        *forcedly* obtain the reassembled payload; the datagrams already
        stored in ``_dtgram`` are appended afterwards and the resulting
        *tuple* is cached.

        Returns:
            Tuple of reassembled datagrams.

        """
        if self._newflg:
            # new packets arrived since the last computation: start afresh
            self.__cached__.clear()
            self._newflg = False
        if (cached := self.__cached__.get('fetch')) is not None:
            return cached

        temp_dtgram = []  # type: list[DT]
        for bufid, buffer in self._buffer.items():
            temp_dtgram.extend(self.submit(buffer, bufid=bufid))
        temp_dtgram.extend(self._dtgram)

        ret = tuple(temp_dtgram)
        self.__cached__['fetch'] = ret
        return ret

    # return datagram index
    def index(self, pkt_num: 'int') -> 'Optional[int]':
        """Return datagram index.

        Arguments:
            pkt_num: index of packet

        Returns:
            Reassembled datagram index which was from No. ``pkt_num`` packet;
            if not found, returns :obj:`None`.

        """
        for counter, datagram in enumerate(self.datagram):
            if pkt_num in datagram.index:  # type: ignore[attr-defined]
                return counter
        return None

    # run automatically
    def run(self, packets: 'list[PT]') -> 'None':
        """Run automatically.

        Each packet is handled exactly as in :meth:`__call__`: the new
        datagram flag is raised before :meth:`reassembly` is invoked.

        Arguments:
            packets: list of packet dicts to be reassembled

        """
        for packet in packets:
            # Flag new data just like ``__call__`` does; otherwise the cached
            # ``count``/``fetch`` results stay stale whenever the concrete
            # ``reassembly`` does not chain to the base implementation.
            self._newflg = True
            self.reassembly(packet)

    ##########################################################################
    # Data models.
    ##########################################################################

    def __new__(cls, *args: 'Any', **kwargs: 'Any') -> 'Reassembly[PT, DT, IT, BT]':  # pylint: disable=unused-argument
        """Create the instance and attach its private cache storage."""
        self = super().__new__(cls)

        # NOTE: Assign this attribute after ``__new__`` to avoid shared memory
        # reference between instances.
        self.__cached__ = {}

        return self

    def __init__(self, *, strict: 'bool' = True) -> 'None':
        """Initialise packet reassembly.

        Args:
            strict: if return all datagrams (including those
                not implemented) when submit

        """
        #: bool: Strict mode flag.
        self._strflg = strict
        #: bool: New datagram flag.
        self._newflg = False

        #: dict[IT, BT]: Dict buffer field.
        self._buffer = {}  # type: dict[IT, BT]
        #: list[DT]: List reassembled datagram.
        self._dtgram = []  # type: list[DT]

    def __call__(self, packet: 'PT') -> 'None':
        """Call packet reassembly.

        Arguments:
            packet: packet dict to be reassembled
                (detailed format described in
                corresponding protocol)

        """
        self._newflg = True
        self.reassembly(packet)