Source code for pcapkit.foundation.extraction
# -*- coding: utf-8 -*-
# pylint: disable=import-outside-toplevel,fixme
"""Extractor for PCAP Files
==============================
:mod:`pcapkit.foundation.extraction` contains
:class:`~pcapkit.foundation.extraction.Extractor` only,
which synthesises file I/O and protocol analysis,
coordinates information exchange in all network layers,
and extracts parameters from a PCAP file.
"""
# TODO: implement engine support for pypcap & pycapfile
import collections
import importlib
import os
import sys
from typing import TYPE_CHECKING, cast
from pcapkit.const.reg.linktype import LinkType as RegType_LinkType
from pcapkit.corekit.infoclass import Info
from pcapkit.protocols.misc.pcap.frame import Frame
from pcapkit.protocols.misc.pcap.header import Header
from pcapkit.utilities.exceptions import (CallableError, FileNotFound, FormatError, IterableError,
UnsupportedCall, stacklevel)
from pcapkit.utilities.logging import logger
from pcapkit.utilities.warnings import (AttributeWarning, DPKTWarning, EngineWarning, FormatWarning,
warn)
if TYPE_CHECKING:
from types import ModuleType, TracebackType
from typing import Any, BinaryIO, Callable, DefaultDict, Iterator, Optional, TextIO, Type, Union
from dictdumper.dumper import Dumper
from dpkt.dpkt import Packet as DPKTPacket
from pyshark.packet.packet import Packet as PySharkPacket
from scapy.packet import Packet as ScapyPacket
from typing_extensions import Literal
from pcapkit.corekit.version import VersionInfo
from pcapkit.foundation.reassembly.ip import Datagram as IP_Datagram
from pcapkit.foundation.reassembly.ipv4 import IPv4_Reassembly
from pcapkit.foundation.reassembly.ipv6 import IPv6_Reassembly
from pcapkit.foundation.reassembly.reassembly import Reassembly
from pcapkit.foundation.reassembly.tcp import Datagram as TCP_Datagram
from pcapkit.foundation.reassembly.tcp import TCP_Reassembly
from pcapkit.foundation.traceflow import Index, TraceFlow
from pcapkit.protocols.protocol import Protocol
Formats = Literal['pcap', 'json', 'tree', 'plist']
Engines = Literal['default', 'pcapkit', 'dpkt', 'scapy', 'pyshark']
Layers = Literal['link', 'internet', 'transport', 'application', 'none']
Protocols = Union[str, Protocol, Type[Protocol]]
VerboseHandler = Callable[['Extractor', Union[Frame, ScapyPacket, DPKTPacket, PySharkPacket]], Any]
__all__ = ['Extractor']
class ReassemblyData(Info):
    """Data storage for reassembly.

    Groups the reassembled datagrams per protocol. Each field is annotated
    as optional, so a field may hold :data:`None` instead of a tuple.
    """

    #: IPv4 reassembled data.
    ipv4: 'Optional[tuple[IP_Datagram, ...]]'
    #: IPv6 reassembled data.
    ipv6: 'Optional[tuple[IP_Datagram, ...]]'
    #: TCP reassembled data.
    tcp: 'Optional[tuple[TCP_Datagram, ...]]'

    if TYPE_CHECKING:
        # Signature stub visible to type checkers only; it is never defined at
        # runtime. NOTE(review): presumably ``Info`` supplies the real
        # initialiser -- confirm against ``pcapkit.corekit.infoclass``.
        def __init__(self, ipv4: 'Optional[tuple[IP_Datagram, ...]]', ipv6: 'Optional[tuple[IP_Datagram, ...]]', tcp: 'Optional[tuple[TCP_Datagram, ...]]') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long
class Extractor:
    """Extractor for PCAP files.

    The class reads an input PCAP file frame by frame with the selected
    engine, optionally dumps every parsed record to an output file, and can
    record data for IP/TCP reassembly and TCP flow tracing.

    Notes:
        For supported engines, please refer to
        :meth:`~pcapkit.foundation.extraction.Extractor.run`.

    """

    #: Input file name.
    _ifnm: 'str'
    #: Output file name (:data:`None` when output is disabled).
    _ofnm: 'Optional[str]'
    #: Output file extension (:data:`None` when output is disabled).
    _fext: 'Optional[str]'

    #: Auto extract flag.
    _flag_a: 'bool'
    #: Store data flag.
    _flag_d: 'bool'
    #: EOF flag.
    _flag_e: 'bool'
    #: Split file flag.
    _flag_f: 'bool'
    #: No output file.
    _flag_q: 'bool'
    #: Trace flag.
    _flag_t: 'bool'
    #: Verbose flag.
    _flag_v: 'bool'

    #: Verbose callback function (annotation left disabled in the original;
    #: the attribute itself is assigned in ``__init__``).
    #_vfunc: 'VerboseHandler'

    #: Frame number.
    _frnum: 'int'
    #: Frame records.
    _frame: 'list[Frame | ScapyPacket | DPKTPacket]'

    #: Frame record for reassembly (index 0: IPv4, 1: IPv6, 2: TCP).
    _reasm: 'list[Optional[Reassembly]]'
    #: Flow tracer.
    _trace: 'Optional[TraceFlow]'

    #: IPv4 reassembly flag.
    _ipv4: 'bool'
    #: IPv6 reassembly flag.
    _ipv6: 'bool'
    #: TCP reassembly flag.
    _tcp: 'bool'

    #: Extract til protocol.
    _exptl: 'Protocols'
    #: Extract til layer.
    _exlyr: 'Layers'
    #: Extract using engine.
    _exeng: 'Engines'
    #: Extract module instance.
    _expkg: 'Any'
    #: Extract iterator instance.
    _extmp: 'Any'

    #: Input file object.
    _ifile: 'BinaryIO'
    #: Output file object (the dumper class itself in split-file mode,
    #: otherwise a dumper instance).
    _ofile: 'Dumper | Type[Dumper]'

    #: Global header.
    _gbhdr: 'Header'
    #: Version info.
    _vinfo: 'VersionInfo'
    #: Data link layer protocol.
    _dlink: 'RegType_LinkType'
    #: Nanosecond flag.
    _nnsec: 'bool'
    #: Output format.
    _type: 'Formats'

    ##########################################################################
    # Defaults.
    ##########################################################################

    #: DefaultDict[str, tuple[str, str, str | None]]: Format dumper mapping for
    #: writing output files. The values should be a tuple representing the
    #: module name, class name and file extension (with the leading ``.``).
    #: Unknown formats fall back to ``NotImplementedIO`` with no extension.
    # NOTE(review): 'text' maps to a class named ``Text`` while 'tree'/'txt'
    # map to ``Tree`` -- confirm ``dictdumper`` actually exposes ``Text``.
    __output__ = collections.defaultdict(
        lambda: ('pcapkit.dumpkit', 'NotImplementedIO', None),
        {
            'pcap': ('pcapkit.dumpkit', 'PCAPIO', '.pcap'),
            'cap': ('pcapkit.dumpkit', 'PCAPIO', '.pcap'),
            'plist': ('dictdumper', 'PLIST', '.plist'),
            'xml': ('dictdumper', 'PLIST', '.plist'),
            'json': ('dictdumper', 'JSON', '.json'),
            'tree': ('dictdumper', 'Tree', '.txt'),
            'text': ('dictdumper', 'Text', '.txt'),
            'txt': ('dictdumper', 'Tree', '.txt'),
        }
    ) # type: DefaultDict[str, tuple[str, str, str | None]]
##########################################################################
# Properties.
##########################################################################
@property
def info(self) -> 'VersionInfo':
    """Version of input PCAP file.

    Raises:
        UnsupportedCall: If :attr:`self._exeng <pcapkit.foundation.extraction.Extractor._exeng>`
            is ``'scapy'`` or ``'pyshark'``, as such engines do not
            retain this information.

    """
    # happy path first: every other engine keeps the version info
    if self._exeng not in ('scapy', 'pyshark'):
        return self._vinfo
    raise UnsupportedCall(f"'Extractor(engine={self._exeng})' object has no attribute 'info'")
@property
def length(self) -> 'int':
    """Frame number (of the current extracted frame, or of all frames)."""
    frame_count = self._frnum
    return frame_count
@property
def format(self) -> 'Formats':
    """Format of output file.

    Raises:
        UnsupportedCall: If :attr:`self._flag_q <pcapkit.foundation.extraction.Extractor._flag_q>`
            is set as :data:`True`, i.e. output was disabled at initialisation.

    """
    if not self._flag_q:
        return self._type
    raise UnsupportedCall("'Extractor(nofile=True)' object has no attribute 'format'")
@property
def input(self) -> 'str':
    """Name of input PCAP file."""
    input_name = self._ifnm
    return input_name
@property
def output(self) -> 'str':
    """Name of output file.

    Raises:
        UnsupportedCall: If :attr:`self._flag_q <pcapkit.foundation.extraction.Extractor._flag_q>`
            is set as :data:`True`, as output is disabled by initialisation parameter.

    """
    if self._flag_q:
        # the message names *this* attribute (it used to say 'format')
        raise UnsupportedCall("'Extractor(nofile=True)' object has no attribute 'output'")
    # the annotation allows None, but the name is only None when output is disabled
    return cast('str', self._ofnm)
@property
def header(self) -> 'Header':
    """Global header of the input PCAP file."""
    global_header = self._gbhdr
    return global_header
@property
def frame(self) -> 'tuple[Frame, ...]':
    """Extracted frames.

    Raises:
        UnsupportedCall: If :attr:`self._flag_d <pcapkit.foundation.extraction.Extractor._flag_d>`
            is :data:`False`, as storing frame data is disabled.

    """
    if not self._flag_d:
        raise UnsupportedCall("'Extractor(store=False)' object has no attribute 'frame'")
    # hand out an immutable snapshot of the stored records
    return tuple(self._frame)
@property
def reassembly(self) -> 'ReassemblyData':
    """Frame record for reassembly.

    * ``ipv4`` -- tuple of reassembled IPv4 datagrams (:term:`ipv4.datagram`)
    * ``ipv6`` -- tuple of reassembled IPv6 datagrams (:term:`ipv6.datagram`)
    * ``tcp`` -- tuple of reassembled TCP datagrams (:term:`tcp.datagram`)

    A field is :data:`None` when the corresponding reassembly flag is off.

    """
    ipv4_data = ipv6_data = tcp_data = None
    if self._ipv4:
        ipv4_data = tuple(cast('IPv4_Reassembly', self._reasm[0]).datagram)
    if self._ipv6:
        ipv6_data = tuple(cast('IPv6_Reassembly', self._reasm[1]).datagram)
    if self._tcp:
        tcp_data = tuple(cast('TCP_Reassembly', self._reasm[2]).datagram)
    return ReassemblyData(ipv4=ipv4_data, ipv6=ipv6_data, tcp=tcp_data)
@property
def trace(self) -> 'tuple[Index, ...]':
    """Index table for traced flow.

    Raises:
        UnsupportedCall: If :attr:`self._flag_t <pcapkit.foundation.extraction.Extractor._flag_t>`
            is :data:`False`, as TCP flow tracing is disabled.

    """
    if not self._flag_t:
        raise UnsupportedCall("'Extractor(trace=False)' object has no attribute 'trace'")
    tracer = cast('TraceFlow', self._trace)
    return tracer.index
@property
def engine(self) -> 'Engines':
    """PCAP extraction engine in use."""
    engine_name = self._exeng
    return engine_name
##########################################################################
# Methods.
##########################################################################
@classmethod
def register(cls, format: 'str', module: 'str', class_: 'str', ext: 'str') -> 'None':  # pylint: disable=redefined-builtin
    r"""Register a new dumper class.

    Notes:
        The full qualified class name of the new dumper class
        should be as ``{module}.{class_}``.

    Arguments:
        format: format name
        module: module name
        class\_: class name
        ext: file extension

    """
    # an entry is stored as (module name, class name, file extension)
    dumper_entry = (module, class_, ext)
    cls.__output__[format] = dumper_entry
def run(self) -> 'None':  # pylint: disable=inconsistent-return-statements
    """Start extraction.

    We use :meth:`~pcapkit.foundation.extraction.Extractor.import_test` to check if
    a certain engine is available or not. For supported engines, each engine has
    a different driver method:

    * Default drivers:

      * Global header: :meth:`~pcapkit.foundation.extraction.Extractor.record_header`
      * Packet frames: :meth:`~pcapkit.foundation.extraction.Extractor.record_frames`

    * DPKT driver: :meth:`~pcapkit.foundation.extraction.Extractor._run_dpkt`
    * Scapy driver: :meth:`~pcapkit.foundation.extraction.Extractor._run_scapy`
    * PyShark driver: :meth:`~pcapkit.foundation.extraction.Extractor._run_pyshark`

    Warns:
        EngineWarning: If the extraction engine is not available. This is either due to
            dependency not installed, or supplied engine unknown.

    """
    # engine name -> (module to import, display name, driver method name)
    third_party = {
        'dpkt': ('dpkt', 'DPKT', '_run_dpkt'),
        'scapy': ('scapy.all', 'Scapy', '_run_scapy'),
        'pyshark': ('pyshark', 'PyShark', '_run_pyshark'),
    }

    spec = third_party.get(self._exeng)
    if spec is not None:
        module_name, display_name, driver_name = spec
        module = self.import_test(module_name, name=display_name)
        if module is not None:
            return getattr(self, driver_name)(module)
        # import failed (already warned): fall through to the default engine
    elif self._exeng not in ('default', 'pcapkit'):
        warn(f'unsupported extraction engine: {self._exeng}; '
             'using default engine instead', EngineWarning, stacklevel=stacklevel())

    self._exeng = 'default'  # using default/pcapkit engine
    self.record_header()     # read PCAP global header
    self.record_frames()     # read frames
@staticmethod
def import_test(engine: 'str', *, name: 'Optional[str]' = None) -> 'Optional[ModuleType]':
    """Test import for extraction engine.

    Args:
        engine: Extraction engine module name.
        name: Extraction engine display name; falls back to ``engine``
            in the warning message when not given.

    Warns:
        EngineWarning: If the engine module is not installed.

    Returns:
        If succeeded, returns the module; otherwise, returns :data:`None`.

    """
    try:
        return importlib.import_module(engine)
    except ImportError:
        warn(f"extraction engine '{name or engine}' not available; "
             'using default engine instead', EngineWarning, stacklevel=stacklevel())
    return None
@classmethod
def make_name(cls, fin: 'str' = 'in.pcap', fout: 'str' = 'out', fmt: 'Formats' = 'tree',
              extension: 'bool' = True, *, files: 'bool' = False,
              nofile: 'bool' = False) -> 'tuple[str, Optional[str], Formats, Optional[str], bool]':
    """Generate input and output filenames.

    The method performs the following processing:

    1. derive the input filename from ``fin``: when ``extension`` is
       :data:`True` and ``fin`` does not end with ``.pcap``, the extension
       is appended; the resulting file must exist;
    2. if ``nofile`` is :data:`True`, the remaining steps are skipped and
       both the output name and the output extension are :data:`None`;
    3. look up the output file extension registered for ``fmt``; if there
       is none, raise :exc:`~pcapkit.utilities.exceptions.FormatError`;
    4. create the parent directory of ``fout`` if it has one;
    5. if ``files`` is :data:`True`, ``fout`` is used as a directory name
       and created; otherwise, when ``extension`` is :data:`True`, the
       output extension is appended to ``fout`` unless already present.

    Args:
        fin: Input filename.
        fout: Output filename.
        fmt: Output file format.
        extension: If append ``.pcap`` file extension to the input filename
            if ``fin`` does not have such file extension; if check and append extensions
            to output file.
        files: If split each frame into different files.
        nofile: If no output file is to be dumped.

    Returns:
        Generated input and output filenames:

        0. input filename
        1. output filename / directory name
        2. output format
        3. output file extension (including the leading ``.``)
        4. if split each frame into different files

    Raises:
        FileNotFound: If input file does not exist.
        FormatError: If no file extension is registered for the output format.

    """
    # --- input side -----------------------------------------------------
    ifnm = fin
    if extension and os.path.splitext(fin)[1] != '.pcap':
        ifnm = f'{fin}.pcap'
    if not os.path.isfile(ifnm):
        raise FileNotFound(2, 'No such file or directory', ifnm)

    # --- output disabled ------------------------------------------------
    if nofile:
        return ifnm, None, fmt, None, files

    # --- output side ----------------------------------------------------
    ext = cls.__output__[fmt][2]
    if ext is None:
        raise FormatError(f'unknown output format: {fmt}')

    parent = os.path.split(fout)[0]
    if parent:
        os.makedirs(parent, exist_ok=True)

    if files:
        # split-file mode: the output name is a directory
        ofnm = fout
        os.makedirs(ofnm, exist_ok=True)
    elif extension and os.path.splitext(fout)[1] != ext:
        ofnm = f'{fout}{ext}'
    else:
        ofnm = fout

    return ifnm, ofnm, fmt, ext, files
def record_header(self) -> 'None':
    """Read global header.

    The method parses the PCAP global header and saves the parsed result
    as :attr:`self._gbhdr <Extractor._gbhdr>`. The PCAP version, data link
    layer protocol type and nanosecond flag are also saved on the current
    :class:`Extractor` instance.

    If TCP flow tracing is enabled, the byteorder and nanosecond flag of the
    header are copied onto the flow tracer.

    For output, the method dumps the parsed PCAP global header under
    the name of ``Global Header`` and records the dumper's ``kind`` as the
    output format. Nothing is dumped when output is disabled.

    """
    # pylint: disable=attribute-defined-outside-init,protected-access
    self._gbhdr = Header(self._ifile)
    self._vinfo = self._gbhdr.version
    self._dlink = self._gbhdr.protocol
    self._nnsec = self._gbhdr.nanosecond

    if self._trace is not None:
        self._trace._endian = self._gbhdr.byteorder
        self._trace._nnsecd = self._gbhdr.nanosecond

    if self._flag_q:
        return

    if self._flag_f:
        # split-file mode: ``self._ofile`` is the dumper class and
        # ``self._fext`` already carries its leading dot, so it is appended
        # directly (an extra '.' would yield 'Global Header..ext')
        ofile = self._ofile(f'{self._ofnm}/Global Header{self._fext}')
        ofile(self._gbhdr.info, name='Global Header')
    else:
        self._ofile(self._gbhdr.info, name='Global Header')
        ofile = self._ofile
    self._type = ofile.kind
def record_frames(self) -> 'None':
    """Read packet frames.

    The method calls :meth:`_read_frame` repeatedly to parse each frame from
    the input PCAP file, and calls :meth:`_cleanup` upon completion.

    Notes:
        Under non-auto mode, i.e. :attr:`self._flag_a <Extractor._flag_a>` is
        :data:`False`, the method performs no action.

    """
    if not self._flag_a:
        return

    exhausted = False
    while not exhausted:
        try:
            self._read_frame()
        except (EOFError, StopIteration):
            # end of input reached
            exhausted = True
    self._cleanup()
##########################################################################
# Data models.
##########################################################################
def __init__(self,
             fin: 'Optional[str]' = None, fout: 'Optional[str]' = None, format: 'Optional[Formats]' = None, # basic settings # pylint: disable=redefined-builtin
             auto: 'bool' = True, extension: 'bool' = True, store: 'bool' = True, # internal settings # pylint: disable=line-too-long
             files: 'bool' = False, nofile: 'bool' = False, verbose: 'bool | VerboseHandler' = False, # output settings # pylint: disable=line-too-long
             engine: 'Optional[Engines]' = None, layer: 'Optional[Layers]' = None, protocol: 'Optional[Protocols]' = None, # extraction settings # pylint: disable=line-too-long
             ip: 'bool' = False, ipv4: 'bool' = False, ipv6: 'bool' = False, tcp: 'bool' = False, strict: 'bool' = True, # reassembly settings # pylint: disable=line-too-long
             trace: 'bool' = False, trace_fout: 'Optional[str]' = None, trace_format: 'Optional[Formats]' = None, # trace settings # pylint: disable=line-too-long
             trace_byteorder: 'Literal["big", "little"]' = sys.byteorder, trace_nanosecond: 'bool' = False) -> 'None': # trace settings # pylint: disable=line-too-long
    """Initialise PCAP Reader.

    The initialiser resolves file names, stores all flags, sets up the
    optional reassembly and flow tracing helpers, opens the input file,
    prepares the output dumper and finally calls :meth:`run`.

    Args:
        fin: file name to be read (defaults to ``in.pcap``); if file not
            exist, raise :exc:`FileNotFound`
        fout: file name to be written (defaults to ``out``)
        format: file format of output (defaults to ``tree``)

        auto: if automatically run till EOF
        extension: if check and append extensions to output file
        store: if store extracted packet info

        files: if split each frame into different files
        nofile: if no output file is to be dumped
        verbose: a :obj:`bool` value or a function takes the :class:`Extractor`
            instance and current parsed frame (depends on engine selected) as
            parameters to print verbose output information

        engine: extraction engine to be used
        layer: extract til which layer
        protocol: extract til which protocol

        ip: if record data for IPv4 & IPv6 reassembly
        ipv4: if perform IPv4 reassembly
        ipv6: if perform IPv6 reassembly
        tcp: if perform TCP reassembly
        strict: if set strict flag for reassembly

        trace: if trace TCP traffic flows
        trace_fout: path name for flow tracer if necessary
        trace_format: output file format of flow tracer
        trace_byteorder: output file byte order
        trace_nanosecond: output nanosecond-resolution file flag

    Warns:
        FormatWarning: Warns under following circumstances:

            * If using PCAP output for TCP flow tracing while the extraction engine is PyShark.
            * If output file format is not supported.

    """
    # fill in defaults for the basic settings
    if fin is None:
        fin = 'in.pcap'
    if fout is None:
        fout = 'out'
    if format is None:
        format = 'tree'

    # resolve the actual input/output names and the output extension
    ifnm, ofnm, fmt, oext, files = self.make_name(fin, fout, format, extension, files=files, nofile=nofile)

    self._ifnm = ifnm # input file name
    self._ofnm = ofnm # output file name
    self._fext = oext # output file extension

    self._flag_a = auto # auto extract flag
    self._flag_d = store # store data flag
    self._flag_e = False # EOF flag
    self._flag_f = files # split file flag
    self._flag_q = nofile # no output flag
    self._flag_t = trace # trace flag
    self._flag_v = False # verbose flag

    # verbose callback function
    if isinstance(verbose, bool):
        self._flag_v = verbose
        if verbose:
            # default handler prints the frame number and protocol chain
            self._vfunc = lambda e, f: print(
                f'Frame {e._frnum:>3d}: {f.protochain}' # pylint: disable=protected-access
            ) # pylint: disable=logging-fstring-interpolation
        else:
            # no-op handler so call sites never need to check the flag
            self._vfunc = lambda e, f: None
    else:
        # anything that is not a bool is used as the handler itself
        self._flag_v = True
        self._vfunc = verbose

    self._frnum = 0 # frame number
    self._frame = [] # frame record

    self._reasm = [None for _ in range(3)] # frame record for reassembly (IPv4 / IPv6 / TCP)
    self._trace = None # flow tracer

    # ``ip`` switches on both IPv4 and IPv6 reassembly
    self._ipv4 = ipv4 or ip # IPv4 Reassembly
    self._ipv6 = ipv6 or ip # IPv6 Reassembly
    self._tcp = tcp # TCP Reassembly

    self._exptl = protocol or 'null' # extract til protocol
    self._exlyr = cast('Layers', (layer or 'none').lower()) # extract til layer
    self._exeng = cast('Engines', (engine or 'default').lower()) # extract using engine

    # reassembly helpers are imported lazily, only when requested
    if self._ipv4:
        from pcapkit.foundation.reassembly.ipv4 import IPv4_Reassembly
        self._reasm[0] = IPv4_Reassembly(strict=strict)
    if self._ipv6:
        from pcapkit.foundation.reassembly.ipv6 import IPv6_Reassembly
        self._reasm[1] = IPv6_Reassembly(strict=strict)
    if self._tcp:
        from pcapkit.foundation.reassembly.tcp import TCP_Reassembly
        self._reasm[2] = TCP_Reassembly(strict=strict)

    if trace:
        from pcapkit.foundation.traceflow import TraceFlow # isort: skip
        # the PyShark engine cannot be combined with PCAP trace output
        if self._exeng in ('pyshark',) and trace_format in ('pcap',):
            warn(f"'Extractor(engine={self._exeng})' does not support 'trace_format={trace_format}'; "
                 "using 'trace_format=None' instead", FormatWarning, stacklevel=stacklevel())
            trace_format = None
        self._trace = TraceFlow(fout=trace_fout, format=trace_format,
                                byteorder=trace_byteorder, nanosecond=trace_nanosecond)

    # the file stays open beyond this method; it is closed in ``_cleanup``/``__exit__``
    self._ifile = open(ifnm, 'rb') # input file # pylint: disable=unspecified-encoding,consider-using-with

    if not self._flag_q:
        module, class_, ext = self.__output__[fmt]
        if ext is None:
            warn(f'Unsupported output format: {fmt}; disabled file output feature',
                 FormatWarning, stacklevel=stacklevel())
        # resolve the dumper class registered for the output format
        output = getattr(importlib.import_module(module), class_) # type: Type[Dumper]

        class DictDumper(output): # type: ignore[valid-type,misc]
            """Customised :class:`~dictdumper.dumper.Dumper` object."""

            def object_hook(self, o: 'Any') -> 'Any':
                """Convert content for function call.

                Args:
                    o: object to convert

                Returns:
                    Converted object.

                """
                import datetime
                import decimal
                import enum
                import ipaddress

                import aenum

                if isinstance(o, decimal.Decimal):
                    return str(o)
                if isinstance(o, datetime.timedelta):
                    return o.total_seconds()
                if isinstance(o, Info):
                    return o.to_dict()
                if isinstance(o, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
                    return str(o)
                if isinstance(o, (enum.IntEnum, aenum.IntEnum)):
                    # enumerations are dumped as their qualified name plus value
                    return dict(
                        name=f'{type(o).__name__}::{o.name}',
                        value=o.value,
                    )
                # everything else is left to the parent dumper
                return super().object_hook(o) # type: ignore[unreachable]

            def default(self, o: 'Any') -> 'Literal["fallback"]': # pylint: disable=unused-argument
                """Check content type for function call."""
                return 'fallback'

            def _append_fallback(self, value: 'Any', file: 'TextIO') -> 'None':
                # turn an arbitrary object into a mapping of its attributes
                # (or its string form) before encoding it
                if hasattr(value, '__slots__'):
                    new_value = {key: getattr(value, key) for key in value.__slots__}
                elif hasattr(value, '__dict__'):
                    new_value = vars(value)
                else:
                    logger.warning('unsupported object type: %s', type(value))
                    new_value = str(value) # type: ignore[assignment]

                func = self._encode_func(new_value)
                func(new_value, file)

        # split-file mode keeps the class (one dumper per frame is created
        # later); otherwise a single dumper instance is bound to the output file
        self._ofile = DictDumper if self._flag_f else DictDumper(ofnm) # output file

    self.run() # start extraction
def __iter__(self) -> 'Extractor':
    """Iterate and parse PCAP frame.

    Raises:
        IterableError: If :attr:`self._flag_a <pcapkit.foundation.extraction.Extractor._flag_a>`
            is :data:`True`, as such operation is not applicable.

    """
    if self._flag_a:
        raise IterableError("'Extractor(auto=True)' object is not iterable")
    # the extractor is its own iterator
    return self
def __next__(self) -> 'Frame | ScapyPacket | DPKTPacket':
    """Iterate and parse next PCAP frame.

    It calls :meth:`_read_frame` to parse the next PCAP frame; once the end
    of input is reached, it calls :meth:`_cleanup` and stops the iteration.

    """
    try:
        parsed = self._read_frame()
    except (EOFError, StopIteration):
        self._cleanup()
        raise StopIteration  # pylint: disable=raise-missing-from
    return parsed
def __call__(self) -> 'Frame | ScapyPacket | DPKTPacket':
    """Works as a simple wrapper for the iteration protocol.

    Unlike :meth:`__next__`, the end-of-input exception raised by
    :meth:`_read_frame` is re-raised as is after :meth:`_cleanup`.

    Raises:
        CallableError: If :attr:`self._flag_a <pcapkit.foundation.extraction.Extractor._flag_a>`
            is :data:`True`, as calling is not applicable.

    """
    if self._flag_a:
        raise CallableError("'Extractor(auto=True)' object is not callable")

    try:
        return self._read_frame()
    except (EOFError, StopIteration) as exc:
        self._cleanup()
        raise exc
def __enter__(self) -> 'Extractor':
    """Enter the runtime context; the :class:`Extractor` itself is the target."""
    context_target = self
    return context_target
def __exit__(self, exc_type: 'Type[BaseException] | None', exc_value: 'BaseException | None',
             traceback: 'TracebackType | None') -> 'None':  # pylint: disable=unused-argument
    """Close the input file when leaving the runtime context.

    The exception details are ignored, and nothing is returned, so an
    exception raised inside the ``with`` block keeps propagating.

    """
    input_file = self._ifile
    input_file.close()
##########################################################################
# Utilities.
##########################################################################
def _cleanup(self) -> 'None':
    """Cleanup after extraction & analysis.

    The method clears the :attr:`self._expkg <Extractor._expkg>` and
    :attr:`self._extmp <Extractor._extmp>` attributes, sets
    :attr:`self._flag_e <pcapkit.foundation.extraction.Extractor._flag_e>`
    as :data:`True` and closes the input file.

    """
    # pylint: disable=attribute-defined-outside-init
    final_state = (
        ('_expkg', None),   # engine module / packet class
        ('_extmp', None),   # engine packet iterator
        ('_flag_e', True),  # EOF reached
    )
    for attribute, value in final_state:
        setattr(self, attribute, value)
    self._ifile.close()
def _read_frame(self) -> 'Frame | ScapyPacket | DPKTPacket':
    """Headquarters for frame reader.

    This method is a dispatcher for parsing frames.

    * For Scapy engine, calls :meth:`_scapy_read_frame`.
    * For DPKT engine, calls :meth:`_dpkt_read_frame`.
    * For PyShark engine, calls :meth:`_pyshark_read_frame`.
    * For default (PyPCAPKit) engine, calls :meth:`_default_read_frame`.

    Returns:
        The parsed frame instance.

    """
    engine_readers = {
        'scapy': '_scapy_read_frame',
        'dpkt': '_dpkt_read_frame',
        'pyshark': '_pyshark_read_frame',
    }
    # any other engine name is served by the default reader
    reader_name = engine_readers.get(self._exeng, '_default_read_frame')
    return getattr(self, reader_name)()
def _default_read_frame(self) -> 'Frame':
    """Read frames with default engine.

    This method performs following operations:

    - extract frames and each layer of packets;
    - call the verbose handler;
    - write to output file with corresponding dumper;
    - record data for IP and/or TCP reassembly;
    - trace TCP flows if any;
    - record the frame to frame storage.

    Returns:
        Parsed frame instance.

    """
    from pcapkit.toolkit.default import (ipv4_reassembly, ipv6_reassembly, tcp_reassembly,
                                         tcp_traceflow)

    # read frame header; the counter is only bumped once parsing succeeded
    frame = Frame(self._ifile, num=self._frnum+1, header=self._gbhdr.info,
                  layer=self._exlyr, protocol=self._exptl, nanosecond=self._nnsec)
    self._frnum += 1

    # verbose output
    self._vfunc(self, frame)

    # write plist
    frnum = f'Frame {self._frnum}'
    if not self._flag_q:
        if self._flag_f:
            # ``self._fext`` already carries its leading dot, so append it
            # directly (an extra '.' would yield 'Frame N..ext')
            ofile = self._ofile(f'{self._ofnm}/{frnum}{self._fext}')
            ofile(frame.info, name=frnum)
        else:
            self._ofile(frame.info, name=frnum)

    # record fragments
    if self._ipv4:
        data_ipv4 = ipv4_reassembly(frame)
        if data_ipv4 is not None:
            cast('IPv4_Reassembly', self._reasm[0])(data_ipv4)
    if self._ipv6:
        data_ipv6 = ipv6_reassembly(frame)
        if data_ipv6 is not None:
            cast('IPv6_Reassembly', self._reasm[1])(data_ipv6)
    if self._tcp:
        data_tcp = tcp_reassembly(frame)
        if data_tcp is not None:
            cast('TCP_Reassembly', self._reasm[2])(data_tcp)

    # trace flows
    if self._flag_t:
        data_tf = tcp_traceflow(frame, data_link=self._dlink)
        if data_tf is not None:
            cast('TraceFlow', self._trace)(data_tf)

    # record frames
    if self._flag_d:
        self._frame.append(frame)

    # return frame record
    return frame
def _run_scapy(self, scapy_all: 'ModuleType') -> 'None':
    """Call :func:`scapy.all.sniff` to extract PCAP files.

    This method assigns :attr:`self._expkg <Extractor._expkg>` as :mod:`scapy.all`
    and :attr:`self._extmp <Extractor._extmp>` as an iterator over the result of
    :func:`scapy.all.sniff`, then starts reading frames.

    Args:
        scapy_all: The :mod:`scapy.all` module.

    Warns:
        AttributeWarning: If :attr:`self._exlyr <Extractor._exlyr>` and/or
            :attr:`self._exptl <Extractor._exptl>` is provided as the Scapy
            engine currently does not support such operations.

    """
    if self._exlyr != 'none' or self._exptl != 'null':
        warn("'Extractor(engine=scapy)' does not support protocol and layer threshold; "
             f"'layer={self._exlyr}' and 'protocol={self._exptl}' ignored",
             AttributeWarning, stacklevel=stacklevel())

    # setup verbose handler
    if self._flag_v:
        from pcapkit.toolkit.scapy import packet2chain  # isort:skip

        def show_chain(e: 'Any', f: 'Any') -> 'None':
            # pylint: disable=protected-access
            print(f'Frame {e._frnum:>3d}: {packet2chain(f)}')

        self._vfunc = show_chain

    # extract global header, then rewind the input file
    self.record_header()
    self._ifile.seek(0, os.SEEK_SET)

    # extract & analyse file
    self._expkg = scapy_all
    sniffed = scapy_all.sniff(offline=self._ifnm)
    self._extmp = iter(sniffed)  # type: Iterator[ScapyPacket]

    # start iteration
    self.record_frames()
def _scapy_read_frame(self) -> 'ScapyPacket':
    """Read frames with Scapy engine.

    The next packet is taken from :attr:`self._extmp <Extractor._extmp>`;
    it is then reported, dumped, fed to reassembly / flow tracing and stored
    according to the flags of the instance.

    Returns:
        Parsed frame instance.

    See Also:
        Please refer to :meth:`_default_read_frame` for more operational information.

    """
    from pcapkit.toolkit.scapy import (ipv4_reassembly, ipv6_reassembly, packet2dict,
                                       tcp_reassembly, tcp_traceflow)

    # fetch Scapy packet (raises StopIteration once the iterator is exhausted)
    packet = next(self._extmp)

    # verbose output
    self._frnum += 1
    self._vfunc(self, packet)

    # write plist
    frnum = f'Frame {self._frnum}'
    if not self._flag_q:
        info = packet2dict(packet)
        if self._flag_f:
            # ``self._fext`` already carries its leading dot, so append it
            # directly (an extra '.' would yield 'Frame N..ext')
            ofile = self._ofile(f'{self._ofnm}/{frnum}{self._fext}')
            ofile(info, name=frnum)
        else:
            self._ofile(info, name=frnum)

    # record fragments
    if self._ipv4:
        data_ipv4 = ipv4_reassembly(packet, count=self._frnum)
        if data_ipv4 is not None:
            cast('IPv4_Reassembly', self._reasm[0])(data_ipv4)
    if self._ipv6:
        data_ipv6 = ipv6_reassembly(packet, count=self._frnum)
        if data_ipv6 is not None:
            cast('IPv6_Reassembly', self._reasm[1])(data_ipv6)
    if self._tcp:
        data_tcp = tcp_reassembly(packet, count=self._frnum)
        if data_tcp is not None:
            cast('TCP_Reassembly', self._reasm[2])(data_tcp)

    # trace flows
    if self._flag_t:
        data_tf = tcp_traceflow(packet, count=self._frnum)
        if data_tf is not None:
            cast('TraceFlow', self._trace)(data_tf)

    # record frames
    if self._flag_d:
        self._frame.append(packet)

    # return frame record
    return packet
def _run_dpkt(self, dpkt: 'ModuleType') -> 'None':
    """Call :class:`dpkt.pcap.Reader` to extract PCAP files.

    This method assigns :attr:`self._expkg <Extractor._expkg>` as the
    :mod:`dpkt` packet class matching the data link type of the file and
    :attr:`self._extmp <Extractor._extmp>` as an iterator from
    :class:`dpkt.pcap.Reader`, then starts reading frames.

    Args:
        dpkt: The :mod:`dpkt` module.

    Warns:
        AttributeWarning: If :attr:`self._exlyr <Extractor._exlyr>` and/or
            :attr:`self._exptl <Extractor._exptl>` is provided as the DPKT
            engine currently does not support such operations.
        DPKTWarning: If the data link type is neither Ethernet, IPv4 nor
            IPv6; packets are then wrapped as raw data.

    """
    if TYPE_CHECKING:
        import dpkt  # type: ignore[no-redef]

    if self._exlyr != 'none' or self._exptl != 'null':
        warn("'Extractor(engine=dpkt)' does not support protocol and layer threshold; "
             f"'layer={self._exlyr}' and 'protocol={self._exptl}' ignored",
             AttributeWarning, stacklevel=stacklevel())

    # setup verbose handler
    if self._flag_v:
        from pcapkit.toolkit.dpkt import packet2chain  # isort:skip
        self._vfunc = lambda e, f: print(
            f'Frame {e._frnum:>3d}: {packet2chain(f)}'  # pylint: disable=protected-access
        )  # pylint: disable=logging-fstring-interpolation

    # extract global header, then rewind so the DPKT reader sees the whole file
    self.record_header()
    self._ifile.seek(0, os.SEEK_SET)

    # pick the packet class for the link type; all three branches compare
    # the link type member itself, so the checks agree with each other
    if self._dlink == RegType_LinkType.ETHERNET:
        pkg = dpkt.ethernet.Ethernet
    elif self._dlink == RegType_LinkType.IPV4:
        pkg = dpkt.ip.IP
    elif self._dlink == RegType_LinkType.IPV6:
        pkg = dpkt.ip6.IP6
    else:
        warn('unrecognised link layer protocol; all analysis functions ignored',
             DPKTWarning, stacklevel=stacklevel())

        class RawPacket(dpkt.dpkt.Packet):  # type: ignore[name-defined]
            """Raw packet."""

            def __len__(self) -> 'int':
                return len(self.data)

            def __bytes__(self) -> 'bytes':
                return self.data

            def unpack(self, buf: 'bytes') -> 'None':
                # keep the payload untouched instead of parsing it
                self.data = buf

        pkg = RawPacket

    # extract & analyse file
    self._expkg = pkg
    self._extmp = iter(dpkt.pcap.Reader(self._ifile))  # type: Iterator[tuple[float, DPKTPacket]]

    # start iteration
    self.record_frames()
def _dpkt_read_frame(self) -> 'DPKTPacket':
    """Read frames with DPKT engine.

    The next ``(timestamp, data)`` pair is taken from
    :attr:`self._extmp <Extractor._extmp>` and the data is wrapped with the
    packet class stored in :attr:`self._expkg <Extractor._expkg>`; the packet
    is then reported, dumped, fed to reassembly / flow tracing and stored
    according to the flags of the instance.

    Returns:
        dpkt.dpkt.Packet: Parsed frame instance.

    See Also:
        Please refer to :meth:`_default_read_frame` for more operational information.

    """
    from pcapkit.toolkit.dpkt import (ipv4_reassembly, ipv6_reassembly, packet2dict,
                                      tcp_reassembly, tcp_traceflow)

    # fetch DPKT packet (raises StopIteration once the iterator is exhausted)
    timestamp, pkt = cast('tuple[float, bytes]', next(self._extmp))
    packet = self._expkg(pkt)  # type: DPKTPacket

    # verbose output
    self._frnum += 1
    self._vfunc(self, packet)

    # write plist
    frnum = f'Frame {self._frnum}'
    if not self._flag_q:
        info = packet2dict(packet, timestamp, data_link=self._dlink)
        if self._flag_f:
            # ``self._fext`` already carries its leading dot, so append it
            # directly (an extra '.' would yield 'Frame N..ext')
            ofile = self._ofile(f'{self._ofnm}/{frnum}{self._fext}')
            ofile(info, name=frnum)
        else:
            self._ofile(info, name=frnum)

    # record fragments
    if self._ipv4:
        data_ipv4 = ipv4_reassembly(packet, count=self._frnum)
        if data_ipv4 is not None:
            cast('IPv4_Reassembly', self._reasm[0])(data_ipv4)
    if self._ipv6:
        data_ipv6 = ipv6_reassembly(packet, count=self._frnum)
        if data_ipv6 is not None:
            cast('IPv6_Reassembly', self._reasm[1])(data_ipv6)
    if self._tcp:
        data_tcp = tcp_reassembly(packet, count=self._frnum)
        if data_tcp is not None:
            cast('TCP_Reassembly', self._reasm[2])(data_tcp)

    # trace flows
    if self._flag_t:
        data_tf = tcp_traceflow(packet, timestamp, data_link=self._dlink, count=self._frnum)
        if data_tf is not None:
            cast('TraceFlow', self._trace)(data_tf)

    # record frames
    if self._flag_d:
        self._frame.append(packet)

    # return frame record
    return packet
def _run_pyshark(self, pyshark: 'ModuleType') -> 'None':
    """Call :class:`pyshark.FileCapture` to extract PCAP files.

    This method assigns :attr:`self._expkg <Extractor._expkg>` as :mod:`pyshark` and
    :attr:`self._extmp <Extractor._extmp>` as an iterator from :class:`pyshark.FileCapture`,
    then starts reading frames.

    Args:
        pyshark: The :mod:`pyshark` module.

    Warns:
        AttributeWarning: Warns under following circumstances:

            * if :attr:`self._exlyr <Extractor._exlyr>` and/or
              :attr:`self._exptl <Extractor._exptl>` is provided as the
              PyShark engine currently does not support such operations.
            * if reassembly is enabled, as the PyShark engine currently
              does not support such operation; the reassembly flags and
              records are reset in that case.

    """
    if self._exlyr != 'none' or self._exptl != 'null':
        warn("'Extractor(engine=pyshark)' does not support protocol and layer threshold; "
             f"'layer={self._exlyr}' and 'protocol={self._exptl}' ignored",
             AttributeWarning, stacklevel=stacklevel())

    if self._ipv4 or self._ipv6 or self._tcp:
        # remember what was requested *before* resetting the flags, so the
        # warning reports the caller's settings rather than always ``False``
        asked_ipv4, asked_ipv6, asked_tcp = self._ipv4, self._ipv6, self._tcp
        self._ipv4 = self._ipv6 = self._tcp = False
        self._reasm = [None, None, None]
        warn("'Extractor(engine=pyshark)' object dose not support reassembly; "
             f"so 'ipv4={asked_ipv4}', 'ipv6={asked_ipv6}' and 'tcp={asked_tcp}' will be ignored",
             AttributeWarning, stacklevel=stacklevel())

    # setup verbose handler
    if self._flag_v:
        self._vfunc = lambda e, f: print(
            f'Frame {e._frnum:>3d}: {f.frame_info.protocols}'  # pylint: disable=protected-access
        )  # pylint: disable=logging-fstring-interpolation

    # extract & analyse file
    self._expkg = pyshark
    self._extmp = iter(pyshark.FileCapture(self._ifnm, keep_packets=False))

    # start iteration
    self.record_frames()
def _pyshark_read_frame(self) -> 'PySharkPacket':
    """Read frames with PyShark engine.

    The next packet is taken from :attr:`self._extmp <Extractor._extmp>`;
    the frame number is taken from the packet's own ``number`` attribute
    rather than counted locally. The packet is then reported, dumped, fed
    to flow tracing and stored according to the flags of the instance.

    Returns:
        Parsed frame instance.

    See Also:
        Please refer to :meth:`_default_read_frame` for more operational information.

    """
    from pcapkit.toolkit.pyshark import packet2dict, tcp_traceflow

    # fetch PyShark packet (raises StopIteration once the iterator is exhausted)
    packet = cast('PySharkPacket', next(self._extmp))

    # verbose output
    self._frnum = int(packet.number)
    self._vfunc(self, packet)

    # write plist
    frnum = f'Frame {self._frnum}'
    if not self._flag_q:
        info = packet2dict(packet)
        if self._flag_f:
            # ``self._fext`` already carries its leading dot, so append it
            # directly (an extra '.' would yield 'Frame N..ext')
            ofile = self._ofile(f'{self._ofnm}/{frnum}{self._fext}')
            ofile(info, name=frnum)
        else:
            self._ofile(info, name=frnum)

    # trace flows
    if self._flag_t:
        data_tf = tcp_traceflow(packet)
        if data_tf is not None:
            cast('TraceFlow', self._trace)(data_tf)

    # record frames
    if self._flag_d:
        self._frame.append(packet)

    # return frame record
    return packet