Source code for pcapkit.protocols.pcap.frame

# -*- coding: utf-8 -*-
"""frame header

:mod:`pcapkit.protocols.pcap.frame` contains
:class:`~pcapkit.protocols.pcap.frame.Frame` only,
which implements extractor for frame headers of PCAP,
whose structure is described as below:

.. code:: c

    typedef struct pcaprec_hdr_s {
        guint32 ts_sec;     /* timestamp seconds */
        guint32 ts_usec;    /* timestamp microseconds */
        guint32 incl_len;   /* number of octets of packet saved in file */
        guint32 orig_len;   /* actual length of packet */
    } pcaprec_hdr_t;

"""
import collections
import datetime
import importlib
import io
import os
import sys
import time
import traceback

from pcapkit.corekit.infoclass import Info
from pcapkit.protocols.protocol import Protocol
from pcapkit.utilities.compat import cached_property
from pcapkit.utilities.decorators import beholder
from pcapkit.utilities.exceptions import UnsupportedCall

__all__ = ['Frame']

# check Python version
version_info = sys.version_info
py37 = (version_info.major >= 3 and version_info.minor >= 7)


[docs]class Frame(Protocol): """Per packet frame header extractor.""" ########################################################################## # Defaults. ########################################################################## #: DefaultDict[int, Tuple[str, str]]: Protocol index mapping for decoding next layer, #: c.f. :meth:`self._decode_next_layer <pcapkit.protocols.protocol.Protocol._decode_next_layer>` #: & :meth:`self._import_next_layer <pcapkit.protocols.protocol.Protocol._import_next_layer>`. #: The values should be a tuple representing the module name and class name. __proto__ = collections.defaultdict(lambda: ('pcapkit.protocols.raw', 'Raw'), { 1: ('pcapkit.protocols.link', 'Ethernet'), 228: ('pcapkit.protocols.internet', 'IPv4'), 229: ('pcapkit.protocols.internet', 'IPv6'), }) ########################################################################## # Properties. ########################################################################## @property def name(self): """Name of corresponding protocol. :rtype: str """ return f'Frame {self._fnum}' @property def length(self): """Header length of corresponding protocol. :rtype: Literal[16] """ return 16 ########################################################################## # Methods. ##########################################################################
[docs] def index(self, name): """Call :meth:`ProtoChain.index <pcapkit.corekit.protochain.ProtoChain.index>`. Args: name (Union[str, Protocol, Type[Protocol]]): ``name`` to be searched Returns: int: first index of ``name`` Raises: IndexNotFound: if ``name`` is not present """ return self._protos.index(name)
[docs] def read(self, length=None, **kwargs): # pylint: disable=unused-argument """Read each block after global header. Args: length (Optional[int]): Length of packet data. Keyword Args: **kwargs: Arbitrary keyword arguments. Returns: DataType_Frame: Parsed packet data. Raises: EOFError: If :attr:`self._file <pcapkit.protocols.pcap.frame.Frame._file>` reaches EOF. """ # _scur = self._file.tell() _temp = self._read_unpack(4, lilendian=True, quiet=True) if _temp is None: raise EOFError _tsss = _temp _tsus = self._read_unpack(4, lilendian=True) _ilen = self._read_unpack(4, lilendian=True) _olen = self._read_unpack(4, lilendian=True) if self._nsec: _epch = _tsss + _tsus / 1000000000 else: _epch = _tsss + _tsus / 1000000 _time = datetime.datetime.fromtimestamp(_epch) frame = dict( frame_info=dict( ts_sec=_tsss, ts_usec=_tsus, incl_len=_ilen, orig_len=_olen, ), time=_time, number=self._fnum, time_epoch=_epch, len=_ilen, cap_len=_olen, ) # load packet data length = frame['len'] bytes_ = self._file.read(length) # record file pointer if self._mpkt and self._mpfp: # print(self._fnum, 'ready') self._mpfp.put(self._file.tell()) self._mpkt.pool += 1 # make BytesIO from frame packet data frame['packet'] = bytes_ self._file = io.BytesIO(bytes_) # frame['packet'] = self._read_packet(header=0, payload=length, discard=True) return self._decode_next_layer(frame, length)
[docs] def make(self, **kwargs): """Make frame packet data. Keyword Args: timestamp (float): UNIX-Epoch timestamp ts_sec (int): timestamp seconds ts_usec (int): timestamp microseconds incl_len (int): number of octets of packet saved in file orig_len (int): actual length of packet packet (bytes): raw packet data (default: ``b''``) nanosecond (bool): nanosecond-resolution file flag (default: :data:`False`) **kwargs: Arbitrary keyword arguments. Returns: bytes: Constructed packet data. """ # fetch values ts_sec, ts_usec = self._make_timestamp(**kwargs) packet = kwargs.get('packet', bytes()) # raw packet data incl_len = kwargs.get('incl_len', len(packet)) # number of octets of packet saved in file orig_len = kwargs.get('orig_len', len(packet)) # actual length of packet # make packet return b'%s%s%s%s%s' % ( self._make_pack(ts_sec, size=4, lilendian=True), self._make_pack(ts_usec, size=4, lilendian=True), self._make_pack(incl_len, size=4, lilendian=True), self._make_pack(orig_len, size=4, lilendian=True), packet, )
########################################################################## # Data models. ##########################################################################
[docs] def __post_init__(self, file=None, length=None, *, num, proto, nanosecond, **kwargs): # pylint: disable=arguments-differ """Initialisation. Args: file (Optional[io.BytesIO]): Source packet stream. length (Optional[int]): Length of packet data. Keyword Args: num (int): Frame index number (:attr:`self._fnum <pcapkit.protocols.pcap.frame.Frame._fnum>`). proto (pcapkit.const.reg.linktype.LinkType): Next layer protocol index (:attr:`self._prot <pcapkit.protocols.pcap.frame.Frame._prot>`). nanosecond (bool): Nanosecond-timestamp PCAP flag (:attr:`self._nsec <pcapkit.protocols.pcap.frame.Frame._nsec>`). mpfdp (multiprocessing.Queue): Multiprocessing file descriptor queue (:attr:`self._mpfp <pcapkit.protocols.pcap.frame.Frame._mpfp>`). mpkit (multiprocessing.Namespace): Multiprocessing auxiliaries (:attr:`self._mpkt <pcapkit.protocols.pcap.frame.Frame._mpkt>`). **kwargs: Arbitrary keyword arguments. For *multiprocessing* related parameters, please refer to :class:`pcapkit.foundation.extration.Extrator` for more information. See Also: For construction argument, please refer to :meth:`make`. """ #: int: frame index number self._fnum = num #: pcapkit.const.reg.linktype.LinkType: next layer protocol index self._prot = proto #: bool: nanosecond-timestamp PCAP flag self._nsec = nanosecond #: multiprocessing.Queue: multiprocessing file descriptor queue (*not available after initialisation*) self._mpfp = kwargs.pop('mpfdp', None) #: multiprocessing.Namespace: multiprocessing auxiliaries (*not available after initialisation*) self._mpkt = kwargs.pop('mpkit', None) if file is None: #: bytes: Raw packet data. self._data = self.make(**kwargs) #: io.BytesIO: Source packet stream. self._file = io.BytesIO(self._data) #: pcapkit.corekit.infoclass.Info: Parsed packet data. self._info = Info(self.read()) else: #: io.BytesIO: Source packet stream. self._file = file #: pcapkit.corekit.infoclass.Info: Parsed packet data. self._info = Info(self.read()) #: bytes: Raw packet data. self._data = self._read_packet(self._info.len) # pylint: disable=no-member #: io.BytesIO: Source packet stream. self._file = io.BytesIO(self._data) # remove temporary multiprocessing support attributes [delattr(self, attr) for attr in filter(lambda attr: attr.startswith('_mp'), dir(self))] # pylint: disable=expression-not-assigned
@cached_property def __len__(self): """Total length of corresponding protocol.""" return self._info.len # pylint: disable=no-member
[docs] def __length_hint__(self): """Return an estimated length for the object. :rtype: Literal[16] """ return 16
[docs] def __getitem__(self, key): """Subscription (``getitem``) support. This method fist checks if ``key`` exists in :attr:`self._info <pcapkit.protocols.pcap.frame.Frame._info>`. If so, returns the corresponding value, else calls the original :meth:`~pcapkit.protocols.protocol.Protocol.__getitem__` method. Args: key (Union[str, Protocol, Type[Protocol]]): Indexing key. Returns: * If ``key`` exists in :attr:`self._info <pcapkit.protocols.pcap.frame.Frame._info>`, returns the value of the ``key``; * else returns the sub-packet from the current packet of indexed protocol. """ # if requested attributes in info dict, # else call the original function try: return self._info[key] except KeyError: return super().__getitem__(key)
[docs] def __index__(self=None): """Index of the protocol. Returns: int: If the object is initiated, i.e. :attr:`self._fnum <pcapkit.protocols.pcap.frame.Frame._fnum>` exists, returns the frame index number of itself; else raises :exc:`UnsupportedCall`. Raises: UnsupportedCall: This protocol has no registry entry. """ if self is None: return 'Frame' if getattr(self, '_fnum', None) is None: raise UnsupportedCall("'Frame' object cannot be interpreted as an integer") return self._fnum
[docs] def __contains__(self, name): """Returns if ``name`` is in :attr:`self._info <pcapkit.protocols.protocol.Protocol._info>` or in the frame packet :attr:`self._protos <pcapkit.protocols.protocol.Protocol._protos>`. Args: name (Any): name to search Returns: bool: if ``name`` exists """ if isinstance(name, type) and issubclass(name, Protocol): name = name.id() if isinstance(name, tuple): for item in name: if item in self._protos: return True return False return (name in self._info) or (name in self._protos)
########################################################################## # Utilities. ##########################################################################
[docs] def _make_timestamp(self, **kwargs): # pylint: disable=no-self-use """Make timestamp. Keyword Args: **kwargs: Arbitrary keyword arguments. Returns: Tuple[int, int]: Second and microsecond/nanosecond value of timestamp. """ nanosecond = kwargs.get('nanosecond', False) # nanosecond-resolution file flag timestamp = kwargs.get('timestamp', time.time()) # timestamp ts_sec = kwargs.get('ts_sec', int(timestamp)) # timestamp seconds if py37 and nanosecond: _default_ts_usec = time.time_ns() % 1000000000 else: _default_ts_usec = int((timestamp - ts_sec) * (1000000000 if nanosecond else 1000000)) ts_usec = kwargs.get('ts_usec', _default_ts_usec) # timestamp microseconds return ts_sec, ts_usec
[docs] def _decode_next_layer(self, data, length=None): # pylint: disable=arguments-differ """Decode next layer protocol. Positional arguments: data (dict): info buffer length (int): valid (*non-padding*) length Returns: dict: current protocol with packet extracted """ seek_cur = self._file.tell() try: next_ = self._import_next_layer(self._prot, length) except Exception: data['error'] = traceback.format_exc(limit=1).strip().split(os.linesep)[-1] self._file.seek(seek_cur, os.SEEK_SET) next_ = beholder(self._import_next_layer)(self, self._prot, length, error=True) info, chain = next_.info, next_.protochain # make next layer protocol name layer = next_.alias.lower() # proto = next_.__class__.__name__ # write info and protocol chain into dict self._next = next_ # pylint: disable=attribute-defined-outside-init self._protos = chain # pylint: disable=attribute-defined-outside-init data[layer] = info data['protocols'] = self._protos.chain return data
[docs] def _import_next_layer(self, proto, length, error=False): # pylint: disable=arguments-differ """Import next layer extractor. This method currently supports following protocols as registered in :data:`~pcapkit.const.reg.linktype.LinkType`: .. list-table:: :header-rows: 1 * - ``proto`` - Protocol * - 1 - :class:`~pcapkit.protocols.link.ethernet.Ethernet` * - 228 - :class:`~pcapkit.protocols.internet.ipv4.IPv4` * - 229 - :class:`~pcapkit.protocols.internet.ipv6.IPv6` Arguments: proto (pcapkit.const.reg.linktype.LinkType): next layer protocol index length (int): valid (*non-padding*) length Keyword arguments: error (bool): if function called on error Returns: pcapkit.protocols.protocol.Protocol: instance of next layer """ module, name = self.__proto__[int(proto)] protocol = getattr(importlib.import_module(module), name) next_ = protocol(self._file, length, error=error, layer=self._exlayer, protocol=self._exproto) return next_