Source code for

# -*- coding: utf-8 -*-
# pylint: disable=import-outside-toplevel
"""Trace TCP Flows

This module is the interface to trace
TCP flows from a series of packets and connections.

.. note::

   This was implemented at the request of my mate
   `@gousaiyang <https://github.com/gousaiyang>`__.
"""

import collections
import importlib
import os
import sys
from typing import TYPE_CHECKING, Generic, TypeVar, overload

from pcapkit.corekit.infoclass import Info
from pcapkit.utilities.compat import Tuple
from pcapkit.utilities.exceptions import FileExists, stacklevel
from pcapkit.utilities.logging import logger
from pcapkit.utilities.warnings import FileWarning, FormatWarning, warn

    from ipaddress import IPv4Address, IPv6Address
    from typing import Any, DefaultDict, Optional, TextIO, Type

    from dictdumper.dumper import Dumper
    from typing_extensions import Literal

    from pcapkit.const.reg.linktype import LinkType as RegType_LinkType
    from import Frame as DataType_Frame

__all__ = ['TraceFlow']

#: Type variable for the IP address type carried by a flow; constrained to
#: either :class:`~ipaddress.IPv4Address` or :class:`~ipaddress.IPv6Address`.
IPAddress = TypeVar('IPAddress', 'IPv4Address', 'IPv6Address')

# Data Models

#: Buffer identifier of a TCP flow, i.e. ``(src, srcport, dst, dstport)``
#: as built in :meth:`TraceFlow.trace`.
BufferID = Tuple[IPAddress, int, IPAddress, int]

class Packet(Info, Generic[IPAddress]):
    """Data structure for **TCP flow tracing**.

    Each instance describes one frame handed to the tracer; the
    ``(src, srcport, dst, dstport)`` tuple identifies the flow (buffer)
    the frame belongs to.

    See Also:
        * :meth:`TraceFlow.dump`
        * :term:`trace.packet`

    """

    #: Data link type from global header.
    protocol: 'RegType_LinkType'
    #: Frame number.
    index: 'int'
    #: Extracted frame info.
    frame: 'DataType_Frame | dict[str, Any]'
    #: TCP synchronise (SYN) flag.
    syn: 'bool'
    #: TCP finish (FIN) flag; when set, the flow is closed and moved to the
    #: completed stream index.
    fin: 'bool'
    #: Source IP.
    src: 'IPAddress'
    #: Destination IP.
    dst: 'IPAddress'
    #: TCP source port.
    srcport: 'int'
    #: TCP destination port.
    dstport: 'int'
    #: Frame timestamp (also used in the generated flow label).
    timestamp: 'float'

    if TYPE_CHECKING:
        # Signature stub for static type checkers only; not defined at runtime.
        def __init__(self, protocol: 'RegType_LinkType', index: 'int', frame: 'DataType_Frame | dict[str, Any]', syn: 'bool', fin: 'bool', src: 'IPAddress', dst: 'IPAddress', srcport: 'int', dstport: 'int', timestamp: 'float') -> 'None': ...  # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long
class Buffer(Info):
    """Data structure for **TCP flow tracing**.

    Holds the state of a flow that is still open (no FIN seen yet).

    See Also:
        * :attr:`TraceFlow._buffer`
        * :term:`trace.buffer`

    """

    #: Output dumper object.
    fpout: 'Dumper'
    #: List of frame index.
    index: 'list[int]'
    #: Flow label generated from ``BUFID``.
    label: 'str'

    if TYPE_CHECKING:
        # Signature stub for static type checkers only; not defined at runtime.
        def __init__(self, fpout: 'Dumper', index: 'list[int]', label: 'str') -> 'None': ...  # pylint: disable=unused-argument,super-init-not-called,multiple-statements
class Index(Info):
    """Data structure for **TCP flow tracing**.

    Immutable summary of one traced flow, as reported by
    :meth:`TraceFlow.submit`.

    See Also:
        * element from :attr:`TraceFlow.index` *tuple*
        * :term:`trace.index`

    """

    #: Output filename if exists (:data:`None` when file output is disabled).
    fpout: 'Optional[str]'
    #: Tuple of frame index.
    index: 'tuple[int, ...]'
    #: Flow label generated from ``BUFID``.
    label: 'str'

    if TYPE_CHECKING:
        # Signature stub for static type checkers only; not defined at runtime.
        def __init__(self, fpout: 'Optional[str]', index: 'tuple[int, ...]', label: 'str') -> 'None': ...  # pylint: disable=unused-argument,super-init-not-called,multiple-statements
############################################################################### # Algorithm Implementation ###############################################################################
class TraceFlow:
    """Trace TCP flows.

    Frames passed to :meth:`dump` (or to the instance itself) are grouped
    into flows keyed by ``(src, srcport, dst, dstport)``. Each flow is
    written to its own output file through the dumper class chosen by
    :meth:`make_fout`, and is moved to the completed stream once a frame
    with the FIN flag is seen.

    """

    # Internal data storage for cached properties.
    __cached__: 'dict[str, Any]'

    ##########################################################################
    # Defaults.
    ##########################################################################

    #: DefaultDict[str, tuple[str, str, str | None]]: Format dumper mapping for
    #: writing output files. The values should be a tuple representing the
    #: module name, class name and file extension. Unknown formats fall back
    #: to ``NotImplementedIO`` with a :data:`None` extension.
    __output__ = collections.defaultdict(
        lambda: ('pcapkit.dumpkit', 'NotImplementedIO', None),
        {
            'pcap': ('pcapkit.dumpkit', 'PCAPIO', '.pcap'),
            'cap': ('pcapkit.dumpkit', 'PCAPIO', '.pcap'),
            'plist': ('dictdumper', 'PLIST', '.plist'),
            'xml': ('dictdumper', 'PLIST', '.plist'),
            'json': ('dictdumper', 'JSON', '.json'),
            'tree': ('dictdumper', 'Tree', '.txt'),
            'text': ('dictdumper', 'Text', '.txt'),
            'txt': ('dictdumper', 'Tree', '.txt'),
        }
    )  # type: DefaultDict[str, tuple[str, str, str | None]]

    ##########################################################################
    # Properties.
    ##########################################################################

    @property
    def index(self) -> 'tuple[Index, ...]':
        """Index table for traced flow.

        When flows are still buffered (no FIN seen yet), they are included
        via :meth:`submit`; otherwise only the completed flows are returned.

        """
        if self._buffer:
            return self.submit()
        return tuple(self._stream)

    ##########################################################################
    # Methods.
    ##########################################################################

    @classmethod
    def register(cls, format: 'str', module: 'str', class_: 'str', ext: 'str') -> 'None':  # pylint: disable=redefined-builtin
        r"""Register a new dumper class.

        Notes:
            The full qualified class name of the new dumper class
            should be as ``{module}.{class_}``.

        Arguments:
            format: format name
            module: module name
            class\_: class name
            ext: file extension

        """
        cls.__output__[format] = (module, class_, ext)

    @classmethod
    def make_fout(cls, fout: 'str' = './tmp', fmt: 'str' = 'pcap') -> 'tuple[Type[Dumper], str | None]':
        """Make root path for output.

        Args:
            fout: root path for output
            fmt: output format

        Returns:
            Dumper of specified format and file extension of output file.
            The extension is :data:`None` when ``fmt`` is not supported,
            in which case file output is disabled.

        Warns:
            FormatWarning: If ``fmt`` is not supported.
            FileWarning: If ``fout`` exists but is not a directory and
                ``fmt`` is not supported.

        Raises:
            FileExists: If ``fout`` exists but is not a directory and
                ``fmt`` is supported.

        """
        module, class_, ext = cls.__output__[fmt]
        if ext is None:
            warn(f'Unsupported output format: {fmt}; disabled file output feature',
                 FormatWarning, stacklevel=stacklevel())
        output = getattr(importlib.import_module(module), class_)  # type: Type[Dumper]

        try:
            # ``exist_ok=True`` only raises when ``fout`` exists as a non-directory
            os.makedirs(fout, exist_ok=True)
        except FileExistsError as error:
            if ext is None:
                warn(error.strerror, FileWarning, stacklevel=stacklevel())
            else:
                raise FileExists(*error.args).with_traceback(error.__traceback__)

        class DictDumper(output):  # type: ignore[valid-type,misc]
            """Customised :class:`~dictdumper.dumper.Dumper` object."""

            def object_hook(self, o: 'Any') -> 'Any':
                """Convert content for function call.

                Args:
                    o: object to convert

                Returns:
                    Converted object.

                """
                import datetime
                import decimal
                import enum
                import ipaddress

                import aenum

                if isinstance(o, decimal.Decimal):
                    return str(o)
                if isinstance(o, datetime.timedelta):
                    return o.total_seconds()
                if isinstance(o, Info):
                    return o.to_dict()
                if isinstance(o, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
                    return str(o)
                if isinstance(o, (enum.IntEnum, aenum.IntEnum)):
                    # serialise enum members as ``ClassName::MEMBER`` plus value
                    return dict(
                        name=f'{type(o).__name__}::{o.name}',
                        value=o.value,
                    )
                return super().object_hook(o)  # type: ignore[unreachable]

            def default(self, o: 'Any') -> 'Literal["fallback"]':  # pylint: disable=unused-argument
                """Check content type for function call."""
                return 'fallback'

            def _append_fallback(self, value: 'Any', file: 'TextIO') -> 'None':
                """Serialise ``value`` by first converting it to a dict (or string)."""
                if hasattr(value, '__slots__'):
                    slots = value.__slots__
                    # ``__slots__`` may legally be a single string naming one slot;
                    # iterating it directly would yield its characters instead.
                    if isinstance(slots, str):
                        slots = (slots,)
                    new_value = {key: getattr(value, key) for key in slots}
                elif hasattr(value, '__dict__'):
                    new_value = vars(value)
                else:
                    logger.warning('unsupported object type: %s', type(value))
                    new_value = str(value)  # type: ignore[assignment]

                func = self._encode_func(new_value)
                func(new_value, file)

        return DictDumper, ext

    def dump(self, packet: 'Packet') -> 'None':
        """Dump frame to output files.

        Arguments:
            packet: a flow packet (:term:`trace.packet`)

        """
        # fetch the dumper of the flow this packet belongs to
        output = self.trace(packet, output=True)

        # dump files
        output(packet.frame, name=f'Frame {packet.index}')  # pylint: disable=not-callable

    # ``output=True`` must be passed explicitly to get the dumper; the default
    # (``False``) returns the label, so the two overloads do not overlap.
    @overload
    def trace(self, packet: 'Packet', *, output: 'Literal[True]') -> 'Dumper': ...
    @overload
    def trace(self, packet: 'Packet', *, output: 'Literal[False]' = ...) -> 'str': ...

    def trace(self, packet: 'Packet', *, output: 'bool' = False) -> 'Dumper | str':
        """Trace packets.

        Arguments:
            packet: a flow packet (:term:`trace.packet`)
            output: flag if has formatted dumper

        Returns:
            If ``output`` is :data:`True`, returns the initiated
            :class:`~dictdumper.dumper.Dumper` object, which will dump data to
            the output file named after the flow label; otherwise, returns the
            flow label itself.

        Notes:
            The flow label is formatted as following:

            .. code-block:: python

               f'{packet.src}_{packet.srcport}-{packet.dst}_{packet.dstport}-{packet.timestamp}'

            The SYN flag (``packet.syn``) is currently not used to reset an
            existing buffer of the same session.

        """
        # a new frame invalidates any cached ``submit()`` result
        self.__cached__['submit'] = None

        # Buffer Identifier
        BUFID = (packet.src, packet.srcport, packet.dst, packet.dstport)  # type: BufferID
        FIN = packet.fin  # Finish Flag (Termination)

        # initialise buffer with BUFID
        buf = self._buffer.get(BUFID)
        if buf is None:
            label = f'{packet.src}_{packet.srcport}-{packet.dst}_{packet.dstport}-{packet.timestamp}'
            buf = self._buffer[BUFID] = Buffer(
                fpout=self._foutio(fname=f'{self._fproot}/{label}{self._fdpext or ""}', protocol=packet.protocol,
                                   byteorder=self._endian, nanosecond=self._nnsecd),
                index=[],
                label=label,
            )

        # trace frame record
        buf.index.append(packet.index)
        fpout, label = buf.fpout, buf.label

        # when FIN is set, submit buffer of this session
        if FIN:
            del self._buffer[BUFID]
            self._stream.append(Index(
                fpout=f'{self._fproot}/{label}{self._fdpext}' if self._fdpext is not None else None,
                index=tuple(buf.index),
                label=label,
            ))

        # return label or output object
        return fpout if output else label

    def submit(self) -> 'tuple[Index, ...]':
        """Submit traced TCP flows.

        Still-open flows (from the buffer) come first, followed by the
        completed flows in the order they were closed. The result is cached
        until the next call to :meth:`trace`.

        Returns:
            Traced TCP flow (:term:`trace.index`).

        """
        if (cached := self.__cached__.get('submit')) is not None:
            return cached

        # ``is not None`` (not truthiness) to agree with :meth:`trace`, so an
        # empty-string extension still reports the written file path
        ret = [
            Index(
                fpout=f'{self._fproot}/{buf.label}{self._fdpext}' if self._fdpext is not None else None,
                index=tuple(buf.index),
                label=buf.label,
            )
            for buf in self._buffer.values()
        ]  # type: list[Index]
        ret.extend(self._stream)

        ret_submit = tuple(ret)
        self.__cached__['submit'] = ret_submit
        return ret_submit

    ##########################################################################
    # Data models.
    ##########################################################################

    def __new__(cls, *args: 'Any', **kwargs: 'Any') -> 'TraceFlow':  # pylint: disable=unused-argument
        self = super().__new__(cls)

        # NOTE: Assign this attribute after ``__new__`` to avoid shared memory
        # reference between instances.
        self.__cached__ = {}

        return self

    def __init__(self, fout: 'Optional[str]', format: 'Optional[str]',  # pylint: disable=redefined-builtin
                 byteorder: 'Literal["little", "big"]' = sys.byteorder,
                 nanosecond: bool = False) -> 'None':
        """Initialise instance.

        Arguments:
            fout: output path (defaults to ``'./tmp'`` when :data:`None`)
            format: output format (defaults to ``'pcap'`` when :data:`None`)
            byteorder: output file byte order
            nanosecond: output nanosecond-resolution file flag

        """
        if fout is None:
            fout = './tmp'
        if format is None:
            format = 'pcap'

        #: str: Output root path.
        self._fproot = fout

        #: dict[BufferID, Buffer]: Buffer field (:term:`trace.buffer`).
        self._buffer = {}  # type: dict[BufferID, Buffer]
        #: list[Index]: Stream index (:term:`trace.index`).
        self._stream = []  # type: list[Index]

        #: Literal['little', 'big']: Output file byte order.
        self._endian = byteorder
        #: bool: Output nanosecond-resolution file flag.
        self._nnsecd = nanosecond

        # dump I/O object
        fio, ext = self.make_fout(fout, format)

        #: Type[Dumper]: Dumper class.
        self._foutio = fio
        #: Optional[str]: Output file extension.
        self._fdpext = ext

    def __call__(self, packet: 'Packet') -> 'None':
        """Dump frame to output files.

        Arguments:
            packet: a flow packet (:term:`trace.packet`)

        """
        # trace frame record
        self.dump(packet)