Source code for pcapkit.interface.core

# -*- coding: utf-8 -*-
"""core user interface

:mod:`pcapkit.interface.core` defines core user-oriented
interfaces, variables, and etc., which wraps around the
foundation classes from :mod:`pcapkit.foundation`.

"""
import sys
from typing import TYPE_CHECKING

from pcapkit.foundation.extraction import Extractor
from pcapkit.foundation.reassembly.ipv4 import IPv4_Reassembly
from pcapkit.foundation.reassembly.ipv6 import IPv6_Reassembly
from pcapkit.foundation.reassembly.tcp import TCP_Reassembly
from pcapkit.foundation.traceflow import TraceFlow
from pcapkit.protocols.protocol import Protocol
from pcapkit.utilities.exceptions import FormatError

if TYPE_CHECKING:
    from typing import Any, Callable, Optional, Type, Union

    from typing_extensions import Literal

    from pcapkit.foundation.reassembly.reassembly import Reassembly
    from pcapkit.protocols.misc.pcap.frame import Frame

    Formats = Literal['pcap', 'json', 'tree', 'plist']
    Engines = Literal['default', 'pcapkit', 'dpkt', 'scapy', 'pyshark']
    Layers = Literal['link', 'internet', 'transport', 'application', 'none']

    Protocols = Union[str, Protocol, Type[Protocol]]
    VerboseHandler = Callable[['Extractor', Frame], Any]

__all__ = [
    'extract', 'reassemble', 'trace',                       # interface functions
    'TREE', 'JSON', 'PLIST', 'PCAP',                        # format macros
    'LINK', 'INET', 'TRANS', 'APP', 'RAW',                  # layer macros
    'DPKT', 'Scapy', 'PyShark', 'PCAPKit',                  # engine macros
]

# output file formats
TREE = 'tree'
JSON = 'json'
PLIST = 'plist'
PCAP = 'pcap'

# layer thresholds
RAW = 'none'
LINK = 'link'
INET = 'internet'
TRANS = 'transport'
APP = 'application'

# extraction engines
DPKT = 'dpkt'
Scapy = 'scapy'
PCAPKit = 'default'
PyShark = 'pyshark'


[docs]def extract(fin: 'Optional[str]' = None, fout: 'Optional[str]' = None, format: 'Optional[Formats]' = None, # basic settings # pylint: disable=redefined-builtin auto: 'bool' = True, extension: 'bool' = True, store: 'bool' = True, # internal settings # pylint: disable=line-too-long files: 'bool' = False, nofile: 'bool' = False, verbose: 'bool | VerboseHandler' = False, # output settings # pylint: disable=line-too-long engine: 'Optional[Engines]' = None, layer: 'Optional[Layers | Type[Protocol]]' = None, protocol: 'Optional[Protocols]' = None, # extraction settings # pylint: disable=line-too-long ip: 'bool' = False, ipv4: 'bool' = False, ipv6: 'bool' = False, tcp: 'bool' = False, strict: 'bool' = True, # reassembly settings # pylint: disable=line-too-long trace: 'bool' = False, trace_fout: 'Optional[str]' = None, trace_format: 'Optional[Formats]' = None, # trace settings # pylint: disable=line-too-long,redefined-outer-name trace_byteorder: 'Literal["big", "little"]' = sys.byteorder, trace_nanosecond: 'bool' = False) -> 'Extractor': # trace settings # pylint: disable=line-too-long """Extract a PCAP file. Arguments: fin: file name to be read; if file not exist, raise :exc:`FileNotFound` fout: file name to be written format: file format of output auto: if automatically run till EOF extension: if check and append extensions to output file store: if store extracted packet info files: if split each frame into different files nofile: if no output file is to be dumped verbose: a :obj:`bool` value or a function takes the :class:`Extract` instance and current parsed frame (depends on engine selected) as parameters to print verbose output information engine: extraction engine to be used layer: extract til which layer protocol: extract til which protocol ip: if record data for IPv4 & IPv6 reassembly ipv4: if perform IPv4 reassembly ipv6: if perform IPv6 reassembly tcp: if perform TCP reassembly strict: if set strict flag for reassembly trace: if trace TCP traffic flows trace_fout: path name for flow tracer if necessary trace_format: output file format of flow tracer trace_byteorder: output file byte order trace_nanosecond: output nanosecond-resolution file flag Returns: An :class:`~pcapkit.foundation.extraction.Extractor` object. """ if isinstance(layer, type) and issubclass(layer, Protocol): layer = (layer.__layer__ or 'none').lower() # type: ignore[assignment] return Extractor(fin=fin, fout=fout, format=format, store=store, files=files, nofile=nofile, auto=auto, verbose=verbose, extension=extension, engine=engine, layer=layer, protocol=protocol, # type: ignore[arg-type] ip=ip, ipv4=ipv4, ipv6=ipv6, tcp=tcp, strict=strict, trace=trace, trace_fout=trace_fout, trace_format=trace_format, trace_byteorder=trace_byteorder, trace_nanosecond=trace_nanosecond)
[docs]def reassemble(protocol: 'str | Type[Protocol]', strict: 'bool' = False) -> 'Reassembly': """Reassemble fragmented datagrams. Arguments: protocol: protocol to be reassembled strict: if return all datagrams (including those not implemented) when submit Returns: A :class:`~pcapkit.foundation.reassembly.reassembly.Reassembly` object of corresponding protocol. Raises: FormatError: If ``protocol`` is **NOT** any of IPv4, IPv6 or TCP. """ if isinstance(protocol, type) and issubclass(protocol, Protocol): protocol = protocol.id()[0] if protocol == 'IPv4': return IPv4_Reassembly(strict=strict) if protocol == 'IPv6': return IPv6_Reassembly(strict=strict) if protocol == 'TCP': return TCP_Reassembly(strict=strict) raise FormatError(f'Unsupported reassembly protocol: {protocol}')
[docs]def trace(fout: 'Optional[str]', format: 'Optional[str]', # pylint: disable=redefined-builtin byteorder: 'Literal["little", "big"]' = sys.byteorder, nanosecond: bool = False) -> 'TraceFlow': """Trace TCP flows. Arguments: fout: output path format: output format byteorder: output file byte order nanosecond: output nanosecond-resolution file flag Returns: A :class:`~pcapkit.foundation.traceflow.TraceFlow` object. """ return TraceFlow(fout=fout, format=format, byteorder=byteorder, nanosecond=nanosecond)