Source code for pcapkit.interface

# -*- coding: utf-8 -*-
# pylint: disable=bad-continuation
"""user interface

:mod:`pcapkit.interface` defines several user-oriented
interfaces, variables, etc. These interfaces are
designed to simplify the usage of :mod:`pcapkit`.

"""
import io
import sys

from pcapkit.foundation.analysis import analyse as analyse2
from pcapkit.foundation.extraction import Extractor
from pcapkit.foundation.traceflow import TraceFlow
from pcapkit.protocols.protocol import Protocol
from pcapkit.reassembly.ipv4 import IPv4_Reassembly
from pcapkit.reassembly.ipv6 import IPv6_Reassembly
from pcapkit.reassembly.tcp import TCP_Reassembly
from pcapkit.utilities.exceptions import FormatError
from pcapkit.utilities.validations import bool_check, int_check, io_check, str_check

# Names exported by ``from pcapkit.interface import *``, grouped by role.
__all__ = [
    # interface functions
    'extract', 'analyse', 'reassemble', 'trace',
    # format macros
    'TREE', 'JSON', 'PLIST', 'PCAP',
    # layer macros
    'LINK', 'INET', 'TRANS', 'APP', 'RAW',
    # engine macros
    'DPKT', 'Scapy', 'PyShark', 'MPServer', 'MPPipeline', 'PCAPKit',
]

# ---------------------------------------------------------------------------
# Format macros: string values for the ``format`` / ``trace_format`` arguments.
# ---------------------------------------------------------------------------
TREE = 'tree'
JSON = 'json'
PLIST = 'plist'
PCAP = 'pcap'

# ---------------------------------------------------------------------------
# Layer macros: string values for the ``layer`` argument of ``extract``.
# NOTE: ``RAW`` is the *string* ``'None'``, not the ``None`` object.
# ---------------------------------------------------------------------------
RAW = 'None'
LINK = 'Link'
INET = 'Internet'
TRANS = 'Transport'
APP = 'Application'

# ---------------------------------------------------------------------------
# Engine macros: string values for the ``engine`` argument of ``extract``.
# ---------------------------------------------------------------------------
PCAPKit = 'default'
DPKT = 'dpkt'
Scapy = 'scapy'
PyShark = 'pyshark'
MPServer = 'server'
MPPipeline = 'pipeline'


def extract(
        # basic settings
        fin=None, fout=None, format=None,  # pylint: disable=redefined-builtin
        # internal settings
        auto=True, extension=True, store=True,
        # output settings
        files=False, nofile=False, verbose=False,
        # extraction settings
        engine=None, layer=None, protocol=None,
        # reassembly settings
        ip=False, ipv4=False, ipv6=False, tcp=False, strict=True,
        # trace settings
        trace=False, trace_fout=None, trace_format=None,  # pylint: disable=redefined-outer-name
        trace_byteorder=sys.byteorder, trace_nanosecond=False):
    """Extract a PCAP file.

    The string-like and boolean arguments are type-checked through
    ``str_check`` and ``bool_check``; then every argument is forwarded, by
    keyword, to :class:`~pcapkit.foundation.extraction.Extractor`.
    ``trace_byteorder`` and ``trace_nanosecond`` are forwarded without any
    check performed here.

    Arguments:
        fin (Optional[str]): file name to be read; if file not exist,
            raise :exc:`FileNotFound`
        fout (Optional[str]): file name to be written
        format (Optional[Literal['plist', 'json', 'tree']]): file format of
            output

        auto (bool): if automatically run till EOF
        extension (bool): if check and append extensions to output file
        store (bool): if store extracted packet info

        files (bool): if split each frame into different files
        nofile (bool): if no output file is to be dumped
        verbose (bool): if print verbose output information

        engine (Optional[Literal['default', 'pcapkit', 'dpkt', 'scapy', 'pyshark', 'server', 'pipeline']]):
            extraction engine to be used
        layer (Optional[Literal['Link', 'Internet', 'Transport', 'Application']]):
            extract until which layer; a :class:`Protocol` subclass is
            replaced by its ``__layer__`` attribute
        protocol (Optional[Union[str, Tuple[str], Type[Protocol]]]): extract
            until which protocol; a :class:`Protocol` subclass is replaced
            by the result of its ``id()`` method

        ip (bool): if record data for IPv4 & IPv6 reassembly
        ipv4 (bool): if perform IPv4 reassembly
        ipv6 (bool): if perform IPv6 reassembly
        tcp (bool): if perform TCP reassembly
        strict (bool): if set strict flag for reassembly

        trace (bool): if trace TCP traffic flows
        trace_fout (Optional[str]): path name for flow tracer if necessary
        trace_format (Optional[Literal['plist', 'json', 'tree', 'pcap']]):
            output file format of flow tracer
        trace_byteorder (Literal['little', 'big']): output file byte order
        trace_nanosecond (bool): output nanosecond-resolution file flag

    Returns:
        Extractor: an :class:`~pcapkit.foundation.extraction.Extractor`
        object

    """
    # Protocol classes may be given in place of plain layer/protocol names.
    if isinstance(layer, type) and issubclass(layer, Protocol):
        layer = layer.__layer__
    if isinstance(protocol, type) and issubclass(protocol, Protocol):
        protocol = protocol.id()

    # ``None`` (or any falsy value) is checked as an empty string instead.
    text_args = (fin, fout, format, trace_fout, trace_format, engine, layer)
    # ``protocol`` is unpacked, so a tuple of names is checked item by item
    # (a single string is checked character by character).
    str_check(*(arg or '' for arg in text_args), *(protocol or ''))

    bool_check(files, nofile, verbose, auto, extension, store,
               ip, ipv4, ipv6, tcp, strict, trace)

    options = dict(
        fin=fin, fout=fout, format=format,
        store=store, files=files, nofile=nofile,
        auto=auto, verbose=verbose, extension=extension,
        engine=engine, layer=layer, protocol=protocol,
        ip=ip, ipv4=ipv4, ipv6=ipv6, tcp=tcp, strict=strict,
        trace=trace, trace_fout=trace_fout, trace_format=trace_format,
        trace_byteorder=trace_byteorder, trace_nanosecond=trace_nanosecond,
    )
    return Extractor(**options)
def analyse(file, length=None):
    """Analyse application layer packets.

    A :obj:`bytes` input is wrapped in an :class:`io.BytesIO` before it is
    checked and analysed; any other input is passed through as is.

    Arguments:
        file (Union[bytes, io.BytesIO]): packet to be analysed
        length (Optional[int]): length of the analysing packet

    Returns:
        Analysis: an :class:`~pcapkit.foundation.analysis.Analysis` object

    """
    stream = io.BytesIO(file) if isinstance(file, bytes) else file
    io_check(stream)

    # A missing (or otherwise falsy) length is checked as ``sys.maxsize``;
    # the original ``length`` value is still what gets handed to the analyser.
    checked_length = length if length else sys.maxsize
    int_check(checked_length)

    return analyse2(stream, length)
def reassemble(protocol, strict=False):
    """Reassemble fragmented datagrams.

    Arguments:
        protocol (Union[str, Type[Protocol]]): protocol to be reassembled;
            a :class:`Protocol` subclass is replaced by the result of its
            ``id()`` method
        strict (bool): if return all datagrams (including those not
            implemented) when submit

    Returns:
        Union[IPv4_Reassembly, IPv6_Reassembly, TCP_Reassembly]: a
        :class:`~pcapkit.reassembly.reassembly.Reassembly` object of the
        corresponding protocol

    Raises:
        FormatError: If ``protocol`` is **NOT** any of IPv4, IPv6 or TCP.

    """
    if isinstance(protocol, type) and issubclass(protocol, Protocol):
        protocol = protocol.id()

    str_check(protocol)
    bool_check(strict)

    # Supported protocol names, matched in order (case-sensitive comparison).
    supported = (
        ('IPv4', IPv4_Reassembly),
        ('IPv6', IPv6_Reassembly),
        ('TCP', TCP_Reassembly),
    )
    for name, reassembly_cls in supported:
        if protocol == name:
            return reassembly_cls(strict=strict)

    raise FormatError(f'Unsupported reassembly protocol: {protocol}')
def trace(fout=None, format=None, byteorder=sys.byteorder, nanosecond=False):  # pylint: disable=redefined-builtin
    """Trace TCP flows.

    Only ``fout`` and ``format`` are type-checked here (through
    ``str_check``); all four arguments are then forwarded, by keyword, to
    :class:`~pcapkit.foundation.traceflow.TraceFlow`.

    Arguments:
        fout (str): output path
        format (Optional[str]): output format
        byteorder (str): output file byte order
        nanosecond (bool): output nanosecond-resolution file flag

    Returns:
        TraceFlow: a :class:`~pcapkit.foundation.traceflow.TraceFlow` object

    """
    # ``None`` (or any falsy value) is checked as an empty string instead.
    checked_fout = fout or ''
    checked_format = format or ''
    str_check(checked_fout, checked_format)

    options = dict(fout=fout, format=format,
                   byteorder=byteorder, nanosecond=nanosecond)
    return TraceFlow(**options)