# Source code for pcapkit.protocols.misc.pcap.frame

# -*- coding: utf-8 -*-
"""Frame Header
==================

:mod:`pcapkit.protocols.misc.pcap.frame` contains
:class:`~pcapkit.protocols.misc.pcap.frame.Frame` only,
which implements an extractor for the frame headers [*]_ of PCAP files,
whose structure is described below:

.. code-block:: c

    typedef struct pcaprec_hdr_s {
        guint32 ts_sec;     /* timestamp seconds */
        guint32 ts_usec;    /* timestamp microseconds */
        guint32 incl_len;   /* number of octets of packet saved in file */
        guint32 orig_len;   /* actual length of packet */
    } pcaprec_hdr_t;

.. [*] https://wiki.wireshark.org/Development/LibpcapFileFormat#Record_.28Packet.29_Header

"""
import collections
import datetime
import decimal
import io
import sys
import time
from typing import TYPE_CHECKING, overload

from pcapkit.const.reg.linktype import LinkType as RegType_LinkType
from pcapkit.protocols.data.misc.pcap.frame import Frame as DataType_Frame
from pcapkit.protocols.data.misc.pcap.frame import FrameInfo as DataType_FrameInfo
from pcapkit.protocols.protocol import Protocol
from pcapkit.utilities.exceptions import StructError, UnsupportedCall

if TYPE_CHECKING:
    from decimal import Decimal
    from typing import Any, BinaryIO, Optional, Type

    from typing_extensions import Literal

    from pcapkit.protocols.data.misc.pcap.header import Header as DataType_Header

__all__ = ['Frame']

# Check Python version: ``time.time_ns`` (used for nanosecond timestamps) only
# exists on Python 3.7+. A plain tuple comparison is used instead of an
# assignment expression so that this module still *parses* on the older
# interpreters the flag is meant to detect, and so that any future major
# version is correctly reported as "at least 3.7".
py37 = sys.version_info >= (3, 7)


class Frame(Protocol[DataType_Frame]):
    """Per packet frame header extractor.

    This class currently supports parsing of the following protocols, which are
    registered in the :attr:`self.__proto__ <pcapkit.protocols.misc.pcap.frame.Frame.__proto__>`
    attribute:

    .. list-table::
       :header-rows: 1

       * - Index
         - Protocol
       * - :attr:`pcapkit.const.reg.linktype.LinkType.ETHERNET`
         - :class:`pcapkit.protocols.link.ethernet.Ethernet`
       * - :attr:`pcapkit.const.reg.linktype.LinkType.IPV4`
         - :class:`pcapkit.protocols.internet.ipv4.IPv4`
       * - :attr:`pcapkit.const.reg.linktype.LinkType.IPV6`
         - :class:`pcapkit.protocols.internet.ipv6.IPv6`

    """

    ##########################################################################
    # Defaults.
    ##########################################################################

    #: DefaultDict[int, tuple[str, str]]: Protocol index mapping for decoding next layer,
    #: c.f. :meth:`self._decode_next_layer <pcapkit.protocols.protocol.Protocol._decode_next_layer>`
    #: & :meth:`self._import_next_layer <pcapkit.protocols.protocol.Protocol._import_next_layer>`.
    #: The values should be a tuple representing the module name and class name.
    #: Link types that are not registered fall back to the ``Raw`` protocol.
    __proto__ = collections.defaultdict(
        lambda: ('pcapkit.protocols.misc.raw', 'Raw'),
        {
            RegType_LinkType.ETHERNET: ('pcapkit.protocols.link', 'Ethernet'),
            RegType_LinkType.IPV4: ('pcapkit.protocols.internet', 'IPv4'),
            RegType_LinkType.IPV6: ('pcapkit.protocols.internet', 'IPv6'),
        },
    )

    ##########################################################################
    # Properties.
    ##########################################################################

    @property
    def name(self) -> 'str':
        """Name of corresponding protocol."""
        return f'Frame {self._fnum}'

    @property
    def length(self) -> 'Literal[16]':
        """Header length of corresponding protocol."""
        return 16

    @property
    def header(self) -> 'DataType_Header':
        """Global header of the PCAP file."""
        return self._ghdr

    ##########################################################################
    # Methods.
    ##########################################################################

    @classmethod
    def register(cls, code: 'RegType_LinkType', module: 'str', class_: 'str') -> 'None':
        r"""Register a new protocol class.

        Notes:
            The full qualified class name of the new protocol class
            should be as ``{module}.{class_}``.

        Arguments:
            code: protocol code as in :class:`~pcapkit.const.reg.linktype.LinkType`
            module: module name
            class\_: class name

        """
        cls.__proto__[code] = (module, class_)

    def index(self, name: 'str | Protocol | Type[Protocol]') -> 'int':
        """Call :meth:`ProtoChain.index <pcapkit.corekit.protochain.ProtoChain.index>`.

        Args:
            name: ``name`` to be searched

        Returns:
            First index of ``name``.

        Raises:
            IndexNotFound: if ``name`` is not present

        """
        return self._protos.index(name)

    def read(self, length: 'Optional[int]' = None, *, _read: 'bool' = True, **kwargs: 'Any') -> 'DataType_Frame':
        r"""Read each block after global header.

        The four 32-bit little-endian header fields (``ts_sec``, ``ts_usec``,
        ``incl_len``, ``orig_len``) are read from :attr:`self._file`, the
        timestamp is assembled according to the nanosecond flag of the global
        header, and the payload is then handed to the next-layer decoder.

        Args:
            length: Length of packet data (not used by this implementation).
            \_read: If the class is called in a parsing scenario.
            **kwargs: Arbitrary keyword arguments.

        Returns:
            DataType_Frame: Parsed packet data.

        Raises:
            EOFError: If :attr:`self._file <pcapkit.protocols.protocol.Protocol._file>` reaches EOF.

        """
        try:
            _temp = self._read_unpack(4, lilendian=True)
        except StructError as exc:
            # an exhausted stream on the very first field means "no more frames"
            if exc.eof:
                raise EOFError  # pylint: disable=raise-missing-from
            raise

        _tsss = _temp
        _tsus = self._read_unpack(4, lilendian=True)
        _ilen = self._read_unpack(4, lilendian=True)
        _olen = self._read_unpack(4, lilendian=True)

        # the sub-second field counts nanoseconds or microseconds depending on
        # the magic number of the global header
        if self._nsec:
            _epch = _tsss + decimal.Decimal(_tsus) / 1_000_000_000
        else:
            _epch = _tsss + decimal.Decimal(_tsus) / 1_000_000
        _time = datetime.datetime.fromtimestamp(float(_epch))

        frame = DataType_Frame(
            frame_info=DataType_FrameInfo(
                ts_sec=_tsss,
                ts_usec=_tsus,
                incl_len=_ilen,
                orig_len=_olen,
            ),
            time=_time,
            number=self._fnum,
            time_epoch=_epch,
            # NOTE: ``len`` carries ``incl_len`` (octets saved in file) and is
            # the amount of payload consumed below; ``cap_len`` carries
            # ``orig_len``.
            len=_ilen,
            cap_len=_olen,
        )

        if not _read:
            # move backward to the beginning of the packet
            self._file.seek(-self.length, io.SEEK_CUR)
        else:
            # NOTE: We create a copy of the frame packet data here for parsing
            # scenarios to keep the original packet data intact.

            # move backward to the beginning of the frame
            self._file.seek(-self.length, io.SEEK_CUR)

            #: bytes: Raw packet data.
            self._data = self._read_fileng(self.length + frame.len)
            #: io.BytesIO: Source packet stream.
            self._file = io.BytesIO(self._data)

        # move forward to the beginning of frame's first packet
        self._file.seek(self.length, io.SEEK_CUR)

        return self._decode_next_layer(frame, self._ghdr.network, frame.len)

    def make(self, *, timestamp: 'Optional[float | Decimal]' = None,  # type: ignore[override] # pylint: disable=arguments-differ
             ts_sec: 'Optional[int]' = None, ts_usec: 'Optional[int]' = None,
             incl_len: 'Optional[int]' = None, orig_len: 'Optional[int]' = None,
             packet: 'bytes', nanosecond: 'bool' = False, **kwargs: 'Any') -> 'bytes':
        """Make frame packet data.

        Args:
            timestamp: UNIX-Epoch timestamp
            ts_sec: timestamp seconds
            ts_usec: timestamp microseconds
            incl_len: number of octets of packet saved in file; defaults to
                the packet length capped by the snapshot length of the global
                header
            orig_len: actual length of packet; defaults to ``len(packet)``
            packet: raw packet data
            nanosecond: nanosecond-resolution file flag
            **kwargs: Arbitrary keyword arguments.

        Returns:
            Constructed packet data.

        """
        ts_sec, ts_usec = self._make_timestamp(timestamp, ts_sec, ts_usec, nanosecond)

        if incl_len is None:
            incl_len = min(len(packet), self._ghdr.snaplen)
        if orig_len is None:
            orig_len = len(packet)

        # make packet: 16-octet record header followed by the (truncated) payload
        return b'%s%s%s%s%s' % (
            self._make_pack(ts_sec, size=4, lilendian=True),
            self._make_pack(ts_usec, size=4, lilendian=True),
            self._make_pack(incl_len, size=4, lilendian=True),
            self._make_pack(orig_len, size=4, lilendian=True),
            packet[:incl_len],
        )

    ##########################################################################
    # Data models.
    ##########################################################################

    @overload  # type: ignore[override]
    def __post_init__(self, file: 'BinaryIO', length: 'Optional[int]' = ..., *,  # pylint: disable=arguments-differ
                      num: 'int', header: 'DataType_Header', **kwargs: 'Any') -> 'None': ...
    @overload
    def __post_init__(self, *, num: 'int', header: 'DataType_Header',  # pylint: disable=arguments-differ
                      **kwargs: 'Any') -> 'None': ...

    def __post_init__(self, file: 'Optional[BinaryIO]' = None, length: 'Optional[int]' = None, *,  # pylint: disable=arguments-differ
                      num: 'int', header: 'DataType_Header', **kwargs: 'Any') -> 'None':
        """Initialisation.

        When ``file`` is :obj:`None` the frame is first built from ``kwargs``
        through :meth:`make` and then parsed back from that buffer; otherwise
        the frame is parsed from ``file``.

        Args:
            file: Source packet stream.
            length: Length of packet data.
            num: Frame index number.
            header: Global header of the PCAP file.
            **kwargs: Arbitrary keyword arguments.

        See Also:
            For construction argument, please refer to :meth:`make`.

        """
        #: int: frame index number
        self._fnum = num
        #: pcapkit.protocols.misc.pcap.header.Header: Global header of the PCAP file.
        self._ghdr = header

        #: pcapkit.const.reg.linktype.LinkType: next layer protocol index
        self._prot = header.network
        #: bool: nanosecond-timestamp PCAP flag
        self._nsec = header.magic_number.nanosecond

        if file is None:
            _read = False
            #: bytes: Raw packet data.
            self._data = self.make(**kwargs)
            #: io.BytesIO: Source packet stream.
            self._file = io.BytesIO(self._data)
        else:
            _read = True
            #: io.BytesIO: Source packet stream.
            self._file = file

        #: pcapkit.corekit.infoclass.Info: Parsed packet data.
        self._info = self.read(length, _read=_read, **kwargs)

    def __length_hint__(self) -> 'Literal[16]':
        """Return an estimated length for the object."""
        return 16

    # NOTE: This is a hack to make the ``__index__`` method work both as a
    # class method and an instance method.
    def __index__(self: 'Optional[Frame]' = None) -> 'int':  # type: ignore[override]
        """Index of the frame.

        Args:
            self: :class:`Frame` object or :obj:`None`.

        Returns:
            If the object is initiated, i.e. :attr:`self._fnum <pcapkit.protocols.misc.pcap.frame.Frame._fnum>`
            exists, returns the frame index number of itself; else raises :exc:`UnsupportedCall`.

        Raises:
            UnsupportedCall: If called without an instance (``self`` is :obj:`None`).

        """
        if self is None:
            raise UnsupportedCall("'Frame' object cannot be interpreted as an integer")
        return self._fnum

    ##########################################################################
    # Utilities.
    ##########################################################################

    def _make_timestamp(self, timestamp: 'Optional[float | Decimal]' = None, ts_sec: 'Optional[int]' = None,
                        ts_usec: 'Optional[int]' = None, nanosecond: 'bool' = False) -> 'tuple[int, int]':
        """Make timestamp.

        Explicit ``ts_sec`` / ``ts_usec`` values take precedence; whichever of
        them is missing is derived from ``timestamp`` (or from the current
        time when ``timestamp`` is :obj:`None`).

        Args:
            timestamp: UNIX-Epoch timestamp
            ts_sec: timestamp seconds
            ts_usec: timestamp microseconds
            nanosecond: nanosecond-resolution file flag

        Returns:
            Second and microsecond/nanosecond value of timestamp. The
            sub-second value is truncated (not rounded) to an integer.

        """
        if timestamp is None:
            if py37 and nanosecond:
                # ``time.time_ns`` keeps full nanosecond precision (3.7+ only)
                timestamp = decimal.Decimal(time.time_ns()) / 1_000_000_000
            else:
                timestamp = decimal.Decimal(time.time())
        else:
            timestamp = decimal.Decimal(timestamp)

        if ts_sec is None:
            ts_sec = int(timestamp)
        if ts_usec is None:
            # scale the fractional part *before* truncating to an integer;
            # truncating first would always discard the sub-second component
            scale = 1_000_000_000 if nanosecond else 1_000_000
            ts_usec = int((timestamp - ts_sec) * scale)
        return ts_sec, ts_usec

    def _decode_next_layer(self, dict_: 'DataType_Frame', proto: 'Optional[int]' = None,
                           length: 'Optional[int]' = None) -> 'DataType_Frame':  # pylint: disable=arguments-differ
        r"""Decode next layer protocol.

        Arguments:
            dict\_: info buffer
            proto: next layer protocol index
            length: valid (*non-padding*) length

        Returns:
            The same ``dict_`` object, updated with the next layer's info
            (keyed by its ``info_name``) and the ``protocols`` chain.

        """
        next_ = self._import_next_layer(proto, length)  # type: ignore[misc,call-arg]
        info, chain = next_.info, next_.protochain

        # make next layer protocol name
        layer = next_.info_name

        # write info and protocol chain into dict
        dict_.__update__([
            (layer, info),
            ('protocols', chain.chain),
        ])
        self._next = next_  # pylint: disable=attribute-defined-outside-init
        self._protos = chain  # pylint: disable=attribute-defined-outside-init
        return dict_