Source code for pcapkit.dumpkit.pcap

# -*- coding: utf-8 -*-
"""PCAP Dumper
=================

:mod:`pcapkit.dumpkit.pcap` is the dumper for :mod:`pcapkit` implementation,
specifically for PCAP format, which is alike those described in
:mod:`dictdumper`.

"""
import sys
from typing import TYPE_CHECKING

import dictdumper

from pcapkit.protocols.misc.pcap.frame import Frame
from pcapkit.protocols.misc.pcap.header import Header

if TYPE_CHECKING:
    from enum import IntEnum as StdlibIntEnum
    from typing import Any, BinaryIO, Optional

    from aenum import IntEnum as AenumIntEnum
    from typing_extensions import Literal

    from pcapkit.const.reg.linktype import LinkType as RegType_LinkType

__all__ = [
    'PCAPIO',
]


[docs]class PCAPIO(dictdumper.Dumper): """PCAP file dumper.""" ########################################################################## # Properties. ########################################################################## @property def kind(self) -> 'Literal["pcap"]': """File format of current dumper.""" return 'pcap' ########################################################################## # Data models. ########################################################################## def __init__(self, fname: 'str', *, protocol: 'RegType_LinkType | StdlibIntEnum | AenumIntEnum | str | int', byteorder: 'Literal["big", "little"]' = sys.byteorder, nanosecond: 'bool' = False, **kwargs: 'Any') -> 'None': # pylint: disable=arguments-differ """Initialise dumper. Args: fname: output file name protocol: data link type byteorder: header byte order nanosecond: nanosecond-resolution file flag **kwargs: arbitrary keyword arguments """ #: int: Frame counter. self._fnum = 1 #: bool: Nanosecond-resolution file flag. self._nsec = nanosecond #: RegType_LinkType | StdlibIntEnum | AenumIntEnum | str | int: Data link type. self._link = protocol super().__init__(fname, protocol=protocol, byteorder=byteorder, nanosecond=nanosecond, **kwargs) def __call__(self, value: 'Frame', name: 'Optional[str]' = None) -> 'PCAPIO': """Dump a new frame. Args: value: content to be dumped name: name of current content block Returns: The dumper class itself (to support chain calling). """ with open(self._file, 'ab') as file: self._append_value(value, file, name or '') return self ########################################################################## # Utilities. ########################################################################## def _dump_header(self, *, protocol: 'RegType_LinkType | StdlibIntEnum | AenumIntEnum | str | int', # pylint: disable=arguments-differ byteorder: 'Literal["big", "little"]' = sys.byteorder, nanosecond: 'bool' = False, **kwargs: 'Any') -> 'None': # pylint: disable=unused-argument """Initially dump file heads and tails. Args: protocol: data link type byteorder: header byte order nanosecond: nanosecond-resolution file flag **kwargs: arbitrary keyword arguments """ packet = Header( network=protocol, byteorder=byteorder, nanosecond=nanosecond, ).data with open(self._file, 'wb') as file: file.write(packet) def _append_value(self, value: 'Frame', file: 'BinaryIO', name: 'str') -> 'None': # pylint: disable=unused-argument """Call this function to write contents. Args: value: content to be dumped file: output file name: name of current content block """ packet = Frame( nanosecond=self._nsec, num=self._fnum, proto=self._link, packet=value.packet, **value.info.frame_info, ).data file.write(packet) self._fnum += 1