# -*- coding: utf-8 -*-
# pylint: disable=import-outside-toplevel
"""Trace TCP Flows
=====================

:mod:`pcapkit.foundation.traceflow` is the interface to trace
TCP flows from a series of packets and connections.

.. note::

   This was implemented at the request of my mate
   `@gousaiyang <https://github.com/gousaiyang>`__.

"""
import collections
import importlib
import os
import sys
from typing import TYPE_CHECKING, Generic, TypeVar, overload

from pcapkit.corekit.infoclass import Info
from pcapkit.utilities.compat import Tuple
from pcapkit.utilities.exceptions import FileExists, stacklevel
from pcapkit.utilities.logging import logger
from pcapkit.utilities.warnings import FileWarning, FormatWarning, warn

if TYPE_CHECKING:
    from ipaddress import IPv4Address, IPv6Address
    from typing import Any, DefaultDict, Optional, TextIO, Type

    from dictdumper.dumper import Dumper
    from typing_extensions import Literal

    from pcapkit.const.reg.linktype import LinkType as RegType_LinkType
    from pcapkit.protocols.data.misc.pcap.frame import Frame as DataType_Frame

__all__ = ['TraceFlow']

IPAddress = TypeVar('IPAddress', 'IPv4Address', 'IPv6Address')

###############################################################################
# Data Models
###############################################################################

BufferID = Tuple[IPAddress, int, IPAddress, int]


class Packet(Info, Generic[IPAddress]):
    """Data structure for **TCP flow tracing**.

    See Also:
        * :meth:`pcapkit.foundation.traceflow.TraceFlow.dump`
        * :term:`trace.packet`

    """

    #: Data link type from global header.
    protocol: 'RegType_LinkType'
    #: Frame number.
    index: 'int'
    #: Extracted frame info.
    frame: 'DataType_Frame | dict[str, Any]'
    #: TCP synchronise (SYN) flag.
    syn: 'bool'
    #: TCP finish (FIN) flag.
    fin: 'bool'
    #: Source IP.
    src: 'IPAddress'
    #: Destination IP.
    dst: 'IPAddress'
    #: TCP source port.
    srcport: 'int'
    #: TCP destination port.
    dstport: 'int'
    #: Frame timestamp.
    timestamp: 'float'

    if TYPE_CHECKING:
        def __init__(self, protocol: 'RegType_LinkType', index: 'int', frame: 'DataType_Frame | dict[str, Any]',  # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long
                     syn: 'bool', fin: 'bool', src: 'IPAddress', dst: 'IPAddress',
                     srcport: 'int', dstport: 'int', timestamp: 'float') -> 'None': ...
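
# A minimal construction sketch for :class:`Packet` (illustrative values only:
# the addresses, ports and timestamp are made up, the ``LinkType.ETHERNET``
# member name is assumed, and the ``frame`` payload is a plain ``dict``, which
# the annotation explicitly allows):
#
#     from ipaddress import ip_address
#     from pcapkit.const.reg.linktype import LinkType
#
#     pkt = Packet(
#         protocol=LinkType.ETHERNET, index=1, frame={'tcp': {'flags': 'SYN'}},
#         syn=True, fin=False,
#         src=ip_address('192.0.2.1'), dst=ip_address('192.0.2.2'),
#         srcport=20000, dstport=80, timestamp=1234567890.0,
#     )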


class Buffer(Info):
    """Data structure for **TCP flow tracing**.

    See Also:
        * :attr:`pcapkit.foundation.traceflow.TraceFlow.index`
        * :term:`trace.buffer`

    """

    #: Output dumper object.
    fpout: 'Dumper'
    #: List of frame index.
    index: 'list[int]'
    #: Flow label generated from ``BUFID``.
    label: 'str'

    if TYPE_CHECKING:
        def __init__(self, fpout: 'Dumper', index: 'list[int]', label: 'str') -> 'None': ...  # pylint: disable=unused-argument,super-init-not-called,multiple-statements


class Index(Info):
    """Data structure for **TCP flow tracing**.

    See Also:
        * element from :attr:`pcapkit.foundation.traceflow.TraceFlow.index` *tuple*
        * :term:`trace.index`

    """

    #: Output filename if exists.
    fpout: 'Optional[str]'
    #: Tuple of frame index.
    index: 'tuple[int, ...]'
    #: Flow label generated from ``BUFID``.
    label: 'str'

    if TYPE_CHECKING:
        def __init__(self, fpout: 'Optional[str]', index: 'tuple[int, ...]', label: 'str') -> 'None': ...  # pylint: disable=unused-argument,super-init-not-called,multiple-statements
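
# Lifecycle note, sketched from :meth:`TraceFlow.trace` below: a :class:`Buffer`
# entry lives in ``TraceFlow._buffer`` while a flow is open and accumulates
# frame indices; once a FIN is seen (or on :meth:`TraceFlow.submit`), it is
# frozen into an :class:`Index` entry, e.g. (hypothetical values)
#
#     Index(fpout='./tmp/192.0.2.1_20000-192.0.2.2_80-1234567890.0.json',
#           index=(1, 2, 3), label='192.0.2.1_20000-192.0.2.2_80-1234567890.0')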


###############################################################################
# Algorithm Implementation
###############################################################################


class TraceFlow:
    """Trace TCP flows."""

    # Internal data storage for cached properties.
    __cached__: 'dict[str, Any]'

    ##########################################################################
    # Defaults.
    ##########################################################################

    #: DefaultDict[str, tuple[str, str, str | None]]: Format dumper mapping for
    #: writing output files. The values should be a tuple representing the
    #: module name, class name and file extension.
    __output__ = collections.defaultdict(
        lambda: ('pcapkit.dumpkit', 'NotImplementedIO', None),
        {
            'pcap': ('pcapkit.dumpkit', 'PCAPIO', '.pcap'),
            'cap': ('pcapkit.dumpkit', 'PCAPIO', '.pcap'),
            'plist': ('dictdumper', 'PLIST', '.plist'),
            'xml': ('dictdumper', 'PLIST', '.plist'),
            'json': ('dictdumper', 'JSON', '.json'),
            'tree': ('dictdumper', 'Tree', '.txt'),
            'text': ('dictdumper', 'Text', '.txt'),
            'txt': ('dictdumper', 'Tree', '.txt'),
        },
    )  # type: DefaultDict[str, tuple[str, str, str | None]]

    ##########################################################################
    # Properties.
    ##########################################################################

    @property
    def index(self) -> 'tuple[Index, ...]':
        """Index table for traced flow."""
        if self._buffer:
            return self.submit()
        return tuple(self._stream)

    ##########################################################################
    # Methods.
    ##########################################################################
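
    # For orientation before the methods below: the ``__output__`` mapping
    # resolves like this (values taken directly from the defaultdict above;
    # any unregistered key falls back to the no-op dumper):
    #
    #     TraceFlow.__output__['json']  # ('dictdumper', 'JSON', '.json')
    #     TraceFlow.__output__['html']  # ('pcapkit.dumpkit', 'NotImplementedIO', None)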

    @classmethod
    def register(cls, format: 'str', module: 'str', class_: 'str', ext: 'str') -> 'None':  # pylint: disable=redefined-builtin
        r"""Register a new dumper class.

        Notes:
            The full qualified class name of the new dumper class
            should be as ``{module}.{class_}``.

        Arguments:
            format: format name
            module: module name
            class\_: class name
            ext: file extension

        """
        cls.__output__[format] = (module, class_, ext)
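
    # Registration sketch: ``mypkg.dumpers`` and ``YAMLDumper`` are hypothetical
    # names used purely for illustration; a real dumper class must implement the
    # ``dictdumper.dumper.Dumper`` interface.
    #
    #     TraceFlow.register('yaml', 'mypkg.dumpers', 'YAMLDumper', '.yaml')
    #     # TraceFlow(fout, 'yaml') would then dump each flow via YAMLDumper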

    @classmethod
    def make_fout(cls, fout: 'str' = './tmp', fmt: 'str' = 'pcap') -> 'tuple[Type[Dumper], str | None]':
        """Make root path for output.

        Args:
            fout: root path for output
            fmt: output format

        Returns:
            Dumper of specified format and file extension of output file.

        Warns:
            FormatWarning: If ``fmt`` is not supported.
            FileWarning: If ``fout`` exists and ``fmt`` is :data:`None`.

        Raises:
            FileExists: If ``fout`` exists and ``fmt`` is **NOT** :data:`None`.

        """
        module, class_, ext = cls.__output__[fmt]
        if ext is None:
            warn(f'Unsupported output format: {fmt}; disabled file output feature',
                 FormatWarning, stacklevel=stacklevel())
        output = getattr(importlib.import_module(module), class_)  # type: Type[Dumper]

        try:
            os.makedirs(fout, exist_ok=True)
        except FileExistsError as error:
            if ext is None:
                warn(error.strerror, FileWarning, stacklevel=stacklevel())
            else:
                raise FileExists(*error.args).with_traceback(error.__traceback__)

        class DictDumper(output):  # type: ignore[valid-type,misc]
            """Customised :class:`~dictdumper.dumper.Dumper` object."""

            def object_hook(self, o: 'Any') -> 'Any':
                """Convert content for function call.

                Args:
                    o: object to convert

                Returns:
                    Converted object.

                """
                import datetime
                import decimal
                import enum
                import ipaddress

                import aenum

                if isinstance(o, decimal.Decimal):
                    return str(o)
                if isinstance(o, datetime.timedelta):
                    return o.total_seconds()
                if isinstance(o, Info):
                    return o.to_dict()
                if isinstance(o, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
                    return str(o)
                if isinstance(o, (enum.IntEnum, aenum.IntEnum)):
                    return dict(
                        name=f'{type(o).__name__}::{o.name}',
                        value=o.value,
                    )
                return super().object_hook(o)  # type: ignore[unreachable]

            def default(self, o: 'Any') -> 'Literal["fallback"]':  # pylint: disable=unused-argument
                """Check content type for function call."""
                return 'fallback'

            def _append_fallback(self, value: 'Any', file: 'TextIO') -> 'None':
                if hasattr(value, '__slots__'):
                    new_value = {key: getattr(value, key) for key in value.__slots__}
                elif hasattr(value, '__dict__'):
                    new_value = vars(value)
                else:
                    logger.warning('unsupported object type: %s', type(value))
                    new_value = str(value)  # type: ignore[assignment]

                func = self._encode_func(new_value)
                func(new_value, file)

        return DictDumper, ext
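
    # Usage sketch for :meth:`make_fout` (format names are those registered in
    # ``__output__``; the directory is an arbitrary example path):
    #
    #     dumper_cls, ext = TraceFlow.make_fout('./tmp', 'json')
    #     # dumper_cls is the customised DictDumper subclass of dictdumper.JSON
    #     # and ext == '.json'; an unsupported format warns and yields ext None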

    def dump(self, packet: 'Packet') -> 'None':
        """Dump frame to output files.

        Arguments:
            packet: a flow packet (:term:`trace.packet`)

        """
        # fetch flow label
        output = self.trace(packet, output=True)
        # dump files
        output(packet.frame, name=f'Frame {packet.index}')  # pylint: disable=not-callable

    @overload
    def trace(self, packet: 'Packet', *, output: 'Literal[True]' = ...) -> 'Dumper': ...
    @overload
    def trace(self, packet: 'Packet', *, output: 'Literal[False]' = ...) -> 'str': ...

    def trace(self, packet: 'Packet', *, output: 'bool' = False) -> 'Dumper | str':
        """Trace packets.

        Arguments:
            packet: a flow packet (:term:`trace.packet`)
            output: if :data:`True`, return the initiated dumper object;
                otherwise, return the flow label

        Returns:
            If ``output`` is :data:`True`, returns the initiated
            :class:`~dictdumper.dumper.Dumper` object, which will dump data to
            the output file named after the flow label; otherwise, returns the
            flow label itself.

        Notes:
            The flow label is formatted as follows:

            .. code-block:: python

               f'{packet.src}_{packet.srcport}-{packet.dst}_{packet.dstport}-{packet.timestamp}'

        """
        # clear cache
        self.__cached__['submit'] = None

        # Buffer Identifier
        BUFID = (packet.src, packet.srcport, packet.dst, packet.dstport)  # type: BufferID
        # SYN = packet.syn  # Synchronise Flag (Establishment)
        FIN = packet.fin     # Finish Flag (Termination)

        # # when SYN is set, reset buffer of this session
        # if SYN and BUFID in self._buffer:
        #     temp = self._buffer.pop(BUFID)
        #     temp['fpout'] = (self._fproot, self._fdpext)
        #     temp['index'] = tuple(temp['index'])
        #     self._stream.append(Info(temp))

        # initialise buffer with BUFID
        if BUFID not in self._buffer:
            label = f'{packet.src}_{packet.srcport}-{packet.dst}_{packet.dstport}-{packet.timestamp}'
            self._buffer[BUFID] = Buffer(
                fpout=self._foutio(fname=f'{self._fproot}/{label}{self._fdpext or ""}', protocol=packet.protocol,
                                   byteorder=self._endian, nanosecond=self._nnsecd),
                index=[],
                label=label,
            )

        # trace frame record
        self._buffer[BUFID].index.append(packet.index)
        fpout = self._buffer[BUFID].fpout
        label = self._buffer[BUFID].label

        # when FIN is set, submit buffer of this session
        if FIN:
            buf = self._buffer.pop(BUFID)
            # fpout, label = buf['fpout'], buf['label']
            self._stream.append(Index(
                fpout=f'{self._fproot}/{label}{self._fdpext}' if self._fdpext is not None else None,
                index=tuple(buf.index),
                label=label,
            ))

        # return label or output object
        return fpout if output else label
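
    # Worked example of the flow label described above (made-up endpoints):
    # a packet from 192.0.2.1:20000 to 192.0.2.2:80 captured at timestamp
    # 1234567890.0 is filed under
    #
    #     '192.0.2.1_20000-192.0.2.2_80-1234567890.0'
    #
    # ``trace(pkt)`` returns that label; ``trace(pkt, output=True)`` returns
    # the dumper writing to ``{self._fproot}/{label}{self._fdpext}``.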

    def submit(self) -> 'tuple[Index, ...]':
        """Submit traced TCP flows.

        Returns:
            Traced TCP flow (:term:`trace.index`).

        """
        if (cached := self.__cached__.get('submit')) is not None:
            return cached

        ret = []  # type: list[Index]
        for buf in self._buffer.values():
            ret.append(Index(
                fpout=f"{self._fproot}/{buf.label}{self._fdpext}" if self._fdpext else None,
                index=tuple(buf.index),
                label=buf.label,
            ))
        ret.extend(self._stream)
        ret_submit = tuple(ret)

        self.__cached__['submit'] = ret_submit
        return ret_submit
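
    # Note, per the :attr:`index` property above: flows still open (no FIN seen
    # yet) are reported through :meth:`submit` as well, e.g. (hypothetical name)
    #
    #     flows = trace_flow.index   # tuple[Index, ...] of closed *and* open flows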

    ##########################################################################
    # Data models.
    ##########################################################################

    def __new__(cls, *args: 'Any', **kwargs: 'Any') -> 'TraceFlow':  # pylint: disable=unused-argument
        self = super().__new__(cls)

        # NOTE: Assign this attribute after ``__new__`` to avoid shared memory
        # reference between instances.
        self.__cached__ = {}

        return self

    def __init__(self, fout: 'Optional[str]', format: 'Optional[str]',  # pylint: disable=redefined-builtin
                 byteorder: 'Literal["little", "big"]' = sys.byteorder,
                 nanosecond: bool = False) -> 'None':
        """Initialise instance.

        Arguments:
            fout: output path
            format: output format
            byteorder: output file byte order
            nanosecond: output nanosecond-resolution file flag

        """
        if fout is None:
            fout = './tmp'
        if format is None:
            format = 'pcap'

        #: str: Output root path.
        self._fproot = fout

        #: dict[BufferID, Buffer]: Buffer field (:term:`trace.buffer`).
        self._buffer = {}  # type: dict[BufferID, Buffer]
        #: list[Index]: Stream index (:term:`trace.index`).
        self._stream = []  # type: list[Index]

        #: Literal['little', 'big']: Output file byte order.
        self._endian = byteorder
        #: bool: Output nanosecond-resolution file flag.
        self._nnsecd = nanosecond

        # dump I/O object
        fio, ext = self.make_fout(fout, format)
        #: Type[Dumper]: Dumper class.
        self._foutio = fio
        #: Optional[str]: Output file extension.
        self._fdpext = ext

    def __call__(self, packet: 'Packet') -> 'None':
        """Dump frame to output files.

        Arguments:
            packet: a flow packet (:term:`trace.packet`)

        """
        # trace frame record
        self.dump(packet)
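

# A minimal end-to-end sketch (assumptions: the output directory, endpoints,
# ports and timestamps are made-up values; ``frame`` payloads are plain dicts,
# which :class:`Packet` permits; the ``LinkType.ETHERNET`` member name is assumed):
#
#     from ipaddress import ip_address
#     from pcapkit.const.reg.linktype import LinkType
#
#     flow = TraceFlow(fout='./tmp', format='json')
#     for idx, (syn, fin) in enumerate([(True, False), (False, False), (False, True)], start=1):
#         flow(Packet(
#             protocol=LinkType.ETHERNET, index=idx, frame={'tcp': {'seq': idx}},
#             syn=syn, fin=fin,
#             src=ip_address('192.0.2.1'), dst=ip_address('192.0.2.2'),
#             srcport=20000, dstport=80, timestamp=1234567890.0 + idx,
#         ))
#     print(flow.index)   # one Index entry once the FIN packet is processed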