# -*- coding: utf-8 -*-
# pylint: disable=import-outside-toplevel
"""trace TCP flows
:mod:`pcapkit.foundation.traceflow` is the interface to trace
TCP flows from a series of packets and connections.
.. note::
This was implemented as the demand of my mate @gousaiyang.
Glossary
--------
trace.packet
Data structure for **TCP flow tracing**
(:meth:`~pcapkit.foundation.traceflow.TraceFlow.dump`)
is as following:
.. code:: python
tract_dict = dict(
protocol=data_link, # data link type from global header
index=frame.info.number, # frame number
frame=frame.info, # extracted frame info
syn=tcp.flags.syn, # TCP synchronise (SYN) flag
fin=tcp.flags.fin, # TCP finish (FIN) flag
src=ip.src, # source IP
dst=ip.dst, # destination IP
srcport=tcp.srcport, # TCP source port
dstport=tcp.dstport, # TCP destination port
timestamp=frame.info.time_epoch, # frame timestamp
)
trace.buffer
Data structure for internal buffering when performing flow tracing algorithms
(:attr:`~pcapkit.foundation.traceflow.TraceFlow._buffer`) is as following:
.. code:: python
(dict) buffer --> memory buffer for reassembly
|--> (tuple) BUFID : (dict)
| |--> ip.src |
| |--> ip.dst |
| |--> tcp.srcport |
| |--> tcp.dstport |
| |--> 'fpout' : (dictdumper.dumper.Dumper) output dumper object
| |--> 'index': (list) list of frame index
| | |--> (int) frame index
| |--> 'label': (str) flow label generated from ``BUFID``
|--> (tuple) BUFID ...
trace.index
Data structure for **TCP flow tracing** (element from
:attr:`~pcapkit.foundation.traceflow.TraceFlow.index` *tuple*)
is as following:
.. code:: python
(tuple) index
|--> (Info) data
| |--> 'fpout' : (Optional[str]) output filename if exists
| |--> 'index': (tuple) tuple of frame index
| | |--> (int) frame index
| |--> 'label': (str) flow label generated from ``BUFID``
|--> (Info) data ...
"""
import ipaddress
import pathlib
import sys
import warnings
from pcapkit.corekit.infoclass import Info
from pcapkit.utilities.compat import pathlib
from pcapkit.utilities.exceptions import FileExists, stacklevel
from pcapkit.utilities.validations import pkt_check
from pcapkit.utilities.warnings import FileWarning, FormatWarning
###############################################################################
# from dictdumper import JSON, PLIST, XML, JavaScript, Tree
# from pcapkit.dumpkit import PCAP, NotImplementedIO
###############################################################################
__all__ = ['TraceFlow']
[docs]class TraceFlow:
"""Trace TCP flows."""
##########################################################################
# Properties.
##########################################################################
@property
def index(self):
"""Index table for traced flow.
:rtype: Tuple[Info]
"""
if self._newflg:
return self.submit()
return tuple(self._stream)
##########################################################################
# Methods.
##########################################################################
[docs] @staticmethod
def make_fout(fout='./tmp', fmt='pcap'):
"""Make root path for output.
Positional arguments:
fout (str): root path for output
fmt (str): output format
Returns:
Tuple[Type[dictdumper.dumper.Dumper], str]: dumper of specified format and file
extension of output file
Warns:
FormatWarning: If ``fmt`` is not supported.
FileWarning: If ``fout`` exists and ``fmt`` is :data:`None`.
Raises:
FileExists: If ``fout`` exists and ``fmt`` is **NOT** :data:`None`.
"""
if fmt == 'pcap': # output PCAP file
from pcapkit.dumpkit import PCAP as output
elif fmt == 'plist': # output PLIST file
from dictdumper import PLIST as output
elif fmt == 'json': # output JSON file
from dictdumper import JSON as output
elif fmt == 'tree': # output treeview text file
from dictdumper import Tree as output
fmt = 'txt'
elif fmt == 'html': # output JavaScript file
from dictdumper import VueJS as output
fmt = 'js'
elif fmt == 'xml': # output XML file
from dictdumper import XML as output
else: # no output file
from pcapkit.dumpkit import NotImplementedIO as output
if fmt is not None:
warnings.warn(f'Unsupported output format: {fmt}; disabled file output feature',
FormatWarning, stacklevel=stacklevel())
return output, ''
try:
pathlib.Path(fout).mkdir(parents=True, exist_ok=True)
except FileExistsError as error:
if fmt is None:
warnings.warn(error.strerror, FileWarning, stacklevel=stacklevel())
else:
raise FileExists(*error.args).with_traceback(error.__traceback__) from None
class DictDumper(output):
"""Customised :class:`~dictdumper.dumper.Dumper` object."""
def object_hook(self, o):
"""Convert content for function call.
Args:
o (:obj:`Any`): object to convert
Returns:
:obj:`Any`: the converted object
"""
import enum
import aenum
if isinstance(o, (enum.IntEnum, aenum.IntEnum)):
return dict(
enum=type(o).__name__,
desc=o.__doc__,
name=o.name,
value=o.value,
)
if isinstance(o, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
return str(o)
if isinstance(o, Info):
return o.info2dict()
return super().object_hook(o)
return DictDumper, fmt
[docs] def dump(self, packet):
"""Dump frame to output files.
Arguments:
packet (Dict[str, Any]): a flow packet (:term:`trace.packet`)
"""
# fetch flow label
output = self.trace(packet, check=False, output=True)
# dump files
output(packet['frame'], name=f"Frame {packet['index']}")
[docs] def trace(self, packet, *, check=True, output=False):
"""Trace packets.
Arguments:
packet (Dict[str, Any]): a flow packet (:term:`trace.packet`)
Keyword Arguments:
check (bool): flag if run validations
output (bool): flag if has formatted dumper
Returns:
Union[dictdumper.dumper.Dumper, str]: If ``output`` is :data:`True`,
returns the initiated :class:`~dictdumper.dumper.Dumper` object, which
will dump data to the output file named after the flow label;
otherwise, returns the flow label itself.
Notes:
The flow label is formatted as following::
f'{packet.src}_{packet.srcport}-{packet.dst}_{info.dstport}-{packet.timestamp}'
"""
self._newflg = True
if check:
pkt_check(packet)
info = Info(packet)
# Buffer Identifier
BUFID = tuple(sorted([str(info.src), str(info.srcport), # pylint: disable=E1101
str(info.dst), str(info.dstport)])) # pylint: disable=E1101
# SYN = info.syn # Synchronise Flag (Establishment)
# Finish Flag (Termination)
FIN = info.fin # pylint: disable=E1101
# # when SYN is set, reset buffer of this seesion
# if SYN and BUFID in self._buffer:
# temp = self._buffer.pop(BUFID)
# temp['fpout'] = (self._fproot, self._fdpext)
# temp['index'] = tuple(temp['index'])
# self._stream.append(Info(temp))
# initialise buffer with BUFID
if BUFID not in self._buffer:
label = f'{info.src}_{info.srcport}-{info.dst}_{info.dstport}-{info.timestamp}' # pylint: disable=E1101
self._buffer[BUFID] = dict(
fpout=self._foutio(fname=f'{self._fproot}/{label}.{self._fdpext}', protocol=info.protocol, # pylint: disable=E1101
byteorder=self._endian, nanosecond=self._nnsecd),
index=list(),
label=label,
)
# trace frame record
self._buffer[BUFID]['index'].append(info.index) # pylint: disable=E1101
fpout = self._buffer[BUFID]['fpout']
label = self._buffer[BUFID]['label']
# when FIN is set, submit buffer of this session
if FIN:
buf = self._buffer.pop(BUFID)
# fpout, label = buf['fpout'], buf['label']
if self._fdpext:
buf['fpout'] = f'{self._fproot}/{label}.{self._fdpext}'
else:
del buf['fpout']
buf['index'] = tuple(buf['index'])
self._stream.append(Info(buf))
# return label or output object
return fpout if output else label
[docs] def submit(self):
"""Submit traced TCP flows.
Returns:
Tuple[Info]: traced TCP flow (:term:`trace.buffer`)
"""
self._newflg = False
ret = list()
for buf in self._buffer.values():
lbl = buf['label']
ret.append(Info(fpout=f"{self._fproot}/{lbl}.{self._fdpext}" if self._fdpext else NotImplemented,
index=tuple(buf['index']),
label=lbl,))
ret += self._stream
return tuple(ret)
##########################################################################
# Data models.
##########################################################################
[docs] def __init__(self, fout=None, format=None, byteorder=sys.byteorder, nanosecond=False): # pylint: disable=redefined-builtin
"""Initialise instance.
Arguments:
fout (Optional[str]): output path
format (Optional[str]): output format
byteorder (str): output file byte order
nanosecond (bool): output nanosecond-resolution file flag
"""
#: bool: New packet flag.
self._newflg = False
#: str: Output root path.
self._fproot = fout
#: dict: Buffer field (:term:`trace.buffer`).
self._buffer = dict()
#: list: Stream index (:term:`trace.index`).
self._stream = list()
#: Literal['little', 'big']: Output file byte order.
self._endian = byteorder
#: bool: Output nanosecond-resolution file flag.
self._nnsecd = nanosecond
# dump I/O object
fio, ext = self.make_fout(fout, format)
#: Type[dictdumper.dumper.Dumper]: Dumper class.
self._foutio = fio
#: str: Output file extension.
self._fdpext = ext
[docs] def __call__(self, packet):
"""Dump frame to output files.
Arguments:
packet (Dict[str, Any]): a flow packet (:term:`trace.packet`)
"""
self._newflg = True
self.dump(packet)