# Source code for pcapkit.foundation.extraction
# -*- coding: utf-8 -*-
# pylint: disable=import-outside-toplevel,fixme
"""extractor for PCAP files
:mod:`pcapkit.foundation.extraction` contains
:class:`~pcapkit.foundation.extraction.Extractor` only,
which synthesises file I/O and protocol analysis,
coordinates information exchange in all network layers,
extracts parametres from a PCAP file.
"""
# TODO: implement engine support for pypcap & pycapfile
import builtins
import collections
import copy
import datetime
import importlib
import ipaddress
import os
import pathlib
import random
import re
import sys
import time
import warnings
from pcapkit.corekit.infoclass import Info
from pcapkit.protocols.pcap.frame import Frame
from pcapkit.protocols.pcap.header import Header
from pcapkit.utilities.compat import pathlib
from pcapkit.utilities.exceptions import (CallableError, FileNotFound, FormatError, IterableError,
UnsupportedCall, stacklevel)
from pcapkit.utilities.warnings import (AttributeWarning, DPKTWarning, EngineWarning, FormatWarning,
LayerWarning, ProtocolWarning)
###############################################################################
# import enum
# import multiprocessing
#
# import aenum
# import dpkt
# from dictdumper import JSON, PLIST, XML, JavaScript, Tree
# from pcapkit.foundation.traceflow import TraceFlow
# from pcapkit.reassembly.ipv4 import IPv4_Reassembly
# from pcapkit.reassembly.ipv6 import IPv6_Reassembly
# from pcapkit.reassembly.tcp import TCP_Reassembly
# from pcapkit.toolkit.default import (ipv4_reassembly, ipv6_reassembly,
# tcp_reassembly, tcp_traceflow)
# from pcapkit.toolkit.dpkt import (ipv4_reassembly, ipv6_reassembly,
# packet2chain, packet2dict, tcp_reassembly,
# tcp_traceflow)
# from pcapkit.toolkit.pyshark import packet2dict, tcp_traceflow
# from pcapkit.toolkit.scapy import (ipv4_reassembly, ipv6_reassembly,
# packet2chain, packet2dict, tcp_reassembly,
# tcp_traceflow)
#
# import scapy.all
###############################################################################
__all__ = ['Extractor']
# check list
#: List of layers.
LAYER_LIST = {
    'None',
    'Link',
    'Internet',
    'Transport',
    'Application',
}

#: List of protocols.
PROTO_LIST = {
    # base protocols
    'null', 'protocol', 'raw',
    # PCAP headers
    'header', 'frame',
    # Link layer
    'link', 'arp', 'inarp', 'ethernet', 'l2tp', 'ospf', 'rarp', 'drarp', 'vlan',
    # Internet layer
    'internet', 'ah', 'hip', 'hopopt', 'ip', 'ipsec', 'ipv4', 'ipv6', 'ipv6_frag',
    'ipv6_opts', 'ipv6_route', 'ipx', 'mh',
    # Transport layer
    'transport', 'tcp', 'udp',
    # Application layer
    'application', 'ftp', 'http', 'httpv1', 'httpv2',
}


def _count_cpus():
    """Return the CPU count used to decide whether multiprocessing engines apply.

    Preference order: the POSIX ``SC_NPROCESSORS_CONF`` sysconf value, then the
    size of the current process's CPU affinity set, then :func:`os.cpu_count`
    (falling back to ``1`` when that is unknown).
    """
    if os.name == 'posix' and 'SC_NPROCESSORS_CONF' in os.sysconf_names:
        return os.sysconf('SC_NPROCESSORS_CONF')
    if 'sched_getaffinity' in os.__all__:
        return len(os.sched_getaffinity(0))  # pylint: disable=E1101
    return os.cpu_count() or 1


#: CPU number.
CPU_CNT = _count_cpus()
[docs]class Extractor:
"""Extractor for PCAP files.
For supported engines, please refer to corresponding driver method for more information:
* Default drivers:
* Global header: :meth:`~pcapkit.foundation.extraction.Extractor.record_header`
* Packet frames: :meth:`~pcapkit.foundation.extraction.Extractor.record_frames`
* DPKT driver: :meth:`~pcapkit.foundation.extraction.Extractor._run_dpkt`
* Scapy driver: :meth:`~pcapkit.foundation.extraction.Extractor._run_scapy`
* PyShark driver: :meth:`~pcapkit.foundation.extraction.Extractor._run_pyshark`
* Multiprocessing driver:
* Pipeline model: :meth:`~pcapkit.foundation.extraction.Extractor._run_pipeline`
* Server model: :meth:`~pcapkit.foundation.extraction.Extractor._run_server`
"""
##########################################################################
# Properties.
##########################################################################
@property
def info(self):
"""Version of input PCAP file.
Raises:
UnsupportedCall: If :attr:`self._exeng <pcapkit.foundation.extraction.Extractor._exeng>`
is ``'scapy'`` or ``'pyshark'``, as such engines does not reserve such information.
:rtype: VersionInfo
"""
if self._exeng in ('scapy', 'pyshark'):
raise UnsupportedCall(f"'Extractor(engine={self._exeng})' object has no attribute 'info'")
return self._vinfo
@property
def length(self):
"""Frame number (of current extracted frame or all).
:rtype: int
"""
return self._frnum
@property
def format(self):
"""Format of output file.
Raises:
UnsupportedCall: If :attr:`self._flag_q <pcapkit.foundation.extraction.Extractor._flag_q>`
is set as :data:`True`, as output is disabled by initialisation parameter.
:rtype: str
"""
if self._flag_q:
raise UnsupportedCall("'Extractor(nofile=True)' object has no attribute 'format'")
return self._type
@property
def input(self):
"""Name of input PCAP file.
:rtype: str
"""
return self._ifnm
@property
def output(self):
"""Name of output file.
Raises:
UnsupportedCall: If :attr:`self._flag_q <pcapkit.foundation.extraction.Extractor._flag_q>`
is set as :data:`True`, as output is disabled by initialisation parameter.
:rtype: str
"""
if self._flag_q:
raise UnsupportedCall("'Extractor(nofile=True)' object has no attribute 'format'")
return self._ofnm
@property
def header(self):
"""Global header.
Raises:
UnsupportedCall: If :attr:`self._exeng <pcapkit.foundation.extraction.Extractor._exeng>`
is ``'scapy'`` or ``'pyshark'``, as such engines does not reserve such information.
:rtype: Info[DataType_Header]
"""
if self._exeng in ('scapy', 'pyshark'):
raise UnsupportedCall(f"'Extractor(engine={self._exeng})' object has no attribute 'header'")
return self._gbhdr
@property
def protocol(self):
"""Protocol chain of current frame.
Raises:
UnsupportedCall: If :attr:`self._flag_a <pcapkit.foundation.extraction.Extractor._flag_a>`
is :data:`True`, as such attribute is not applicable.
:rtype: ProtoChain
"""
if self._flag_a:
raise UnsupportedCall("'Extractor(auto=True)' object has no attribute 'protocol'")
return self._proto
@property
def frame(self):
"""Extracted frames.
Raises:
UnsupportedCall: If :attr:`self._flag_d <pcapkit.foundation.extraction.Extractor._flag_d>`
is :data:`True`, as storing frame data is disabled.
:rtype: Tuple[Info[DataType_Frame]]
"""
if self._flag_d:
return tuple(self._frame)
raise UnsupportedCall("'Extractor(store=False)' object has no attribute 'frame'")
@property
def reassembly(self):
    """Frame record for reassembly.

    * ``ipv4`` -- tuple of the ``datagram`` records of the IPv4 reassembler
      (:class:`~pcapkit.reassembly.ipv4.IPv4_Reassembly`)
    * ``ipv6`` -- tuple of the ``datagram`` records of the IPv6 reassembler
      (:class:`~pcapkit.reassembly.ipv6.IPv6_Reassembly`)
    * ``tcp`` -- tuple of the ``datagram`` records of the TCP reassembler
      (:class:`~pcapkit.reassembly.tcp.TCP_Reassembly`)

    Each entry is :data:`None` when the corresponding reassembly is disabled.

    :rtype: Info
    """
    data = Info(
        ipv4=tuple(self._reasm[0].datagram) if self._ipv4 else None,
        ipv6=tuple(self._reasm[1].datagram) if self._ipv6 else None,
        tcp=tuple(self._reasm[2].datagram) if self._tcp else None,
    )
    return data
@property
def trace(self):
"""Index table for traced flow.
Raises:
UnsupportedCall: If :attr:`self._flag_t <pcapkit.foundation.extraction.Extractor._flag_t>`
is :data:`True`, as TCP flow tracing is disabled.
:rtype: Tuple[Info]
"""
if self._flag_t:
return self._trace.index
raise UnsupportedCall("'Extractor(trace=False)' object has no attribute 'trace'")
@property
def engine(self):
"""PCAP extraction engine.
:rtype: str
"""
return self._exeng
##########################################################################
# Methods.
##########################################################################
[docs] def run(self): # pylint: disable=inconsistent-return-statements
"""Start extraction.
We uses :meth:`~pcapkit.foundation.extraction.Extractor.import_test` to check if
a certain engine is available or not. For supported engines, each engine has
different driver method:
* Default drivers:
* Global header: :meth:`~pcapkit.foundation.extraction.Extractor.record_header`
* Packet frames: :meth:`~pcapkit.foundation.extraction.Extractor.record_frames`
* DPKT driver: :meth:`~pcapkit.foundation.extraction.Extractor._run_dpkt`
* Scapy driver: :meth:`~pcapkit.foundation.extraction.Extractor._run_scapy`
* PyShark driver: :meth:`~pcapkit.foundation.extraction.Extractor._run_pyshark`
* Multiprocessing driver:
* Pipeline model: :meth:`~pcapkit.foundation.extraction.Extractor._run_pipeline`
* Server model: :meth:`~pcapkit.foundation.extraction.Extractor._run_server`
Warns:
EngineWarning: If the extraction engine is not available. This is either due to
dependency not installed, number of CPUs is not enough, or supplied engine
unknown.
"""
flag = True
if self._exeng == 'dpkt':
flag, engine = self.import_test('dpkt', name='DPKT')
if flag:
return self._run_dpkt(engine)
elif self._exeng == 'scapy':
flag, engine = self.import_test('scapy.all', name='Scapy')
if flag:
return self._run_scapy(engine)
elif self._exeng == 'pyshark':
flag, engine = self.import_test('pyshark', name='PyShark')
if flag:
return self._run_pyshark(engine)
elif self._exeng == 'pipeline':
flag, engine = self.import_test('multiprocessing', name='Pipeline Multiprocessing')
self._flag_m = flag = bool(flag and (self._flag_a and CPU_CNT > 1))
if self._flag_m:
return self._run_pipeline(engine)
warnings.warn('extraction engine Pipeline Multiprocessing is not available; '
'using default engine instead', EngineWarning, stacklevel=stacklevel())
elif self._exeng == 'server':
flag, engine = self.import_test('multiprocessing', name='Server Multiprocessing')
self._flag_m = flag = bool(flag and (self._flag_a and CPU_CNT > 2))
if self._flag_m:
return self._run_server(engine)
warnings.warn('extraction engine Server Multiprocessing is not available; '
'using default engine instead', EngineWarning, stacklevel=stacklevel())
elif self._exeng not in ('default', 'pcapkit'):
flag = False
warnings.warn(f'unsupported extraction engine: {self._exeng}; '
'using default engine instead', EngineWarning, stacklevel=stacklevel())
# using default/pcapkit engine
self._exeng = self._exeng if flag else 'default'
self.record_header() # read PCAP global header
self.record_frames() # read frames
def check(self):
    """Check layer and protocol thresholds.

    Warns:
        LayerWarning: If :attr:`self._exlyr <pcapkit.foundation.extraction.Extractor._exlyr>`
            is not recognised.
        ProtocolWarning: If :attr:`self._exptl <pcapkit.foundation.extraction.Extractor._exptl>`
            (or any entry of it, when it is a tuple) is not recognised. One warning
            is issued per unrecognised entry, naming that entry. The comparison
            is case-insensitive.

    See Also:
        * List of available layers: :data:`~pcapkit.foundation.extraction.LAYER_LIST`
        * List of available protocols: :data:`~pcapkit.foundation.extraction.PROTO_LIST`
    """
    layer = self._exlyr
    if layer is not None and layer not in LAYER_LIST:
        warnings.warn(f'unrecognised layer: {layer}', LayerWarning, stacklevel=stacklevel())

    protocol = self._exptl
    if protocol is None:
        return
    # a tuple carries several protocol names; otherwise a single name was given
    names = protocol if isinstance(protocol, tuple) else (protocol,)
    for name in names:
        if name.lower() not in PROTO_LIST:
            # report the offending entry itself, not the whole tuple
            warnings.warn(f'unrecognised protocol: {name}', ProtocolWarning, stacklevel=stacklevel())
[docs] @staticmethod
def import_test(engine, *, name=None):
"""Test import for extractcion engine.
Args:
engine (str): Extraction engine module name.
Keyword Args:
name (Optional[str]): Extraction engine display name.
Warns:
EngineWarning: If the engine module is not installed.
Returns:
Tuple[bool, Optional[ModuleType]]: If succeeded, returns :data:`True`
and the module; otherwise, returns :data:`False` and :data:`None`.
"""
try:
engine = importlib.import_module(engine)
return True, engine
except ImportError:
warnings.warn(f"extraction engine '{name or engine}' not available; "
'using default engine instead', EngineWarning, stacklevel=stacklevel())
return False, None
[docs] @classmethod
def make_name(cls, fin, fout, fmt, extension, *, files=False, nofile=False):
"""Generate input and output filenames.
The method will perform following processing:
1. sanitise ``fin`` as the input PCAP filename; ``in.pcap`` as default value and
append ``.pcap`` extension if needed and ``extension`` is :data:`True`; as well
as test if the file exists;
2. if ``nofile`` is :data:`True`, skips following processing;
3. if ``fmt`` provided, then it presumes corresponding output file extension;
4. if ``fout`` not provided, it presumes the output file name based on the presumptive
file extension; the stem of the output file name is set as ``out``; should the file
extension is not available, then it raises :exc:`~pcapkit.utilities.exceptions.FormatError`;
5. if ``fout`` provided, it presumes corresponding output format if needed; should the
presumption cannot be made, then it raises :exc:`~pcapkit.utilities.exceptions.FormatError`;
6. it will also append corresponding file extension to the output file name if needed
and ``extension`` is :data:`True`.
Args:
fin (Optional[str]): Input filename.
fout (Optional[str]): Output filename.
fmt (str): Output file format.
extension (bool): If append ``.pcap`` file extension to the input filename
if ``fin`` does not have such file extension; if check and append extensions
to output file.
Keyword Args:
files (bool): If split each frame into different files.
nofile (bool): If no output file is to be dumped.
Returns:
Tuple[str, str, str, str, bool]: Generated input and output filenames:
0. input filename
1. output filename / directory name
2. output format
3. output file extension (without ``.``)
4. if split each frame into different files
Raises:
FileNotFound: If input file does not exists.
FormatError: If output format not provided and cannot be presumpted.
"""
if fin is None:
ifnm = 'in.pcap'
else:
if extension: # pylint: disable=else-if-used
ifnm = fin if os.path.splitext(fin)[1] == '.pcap' else f'{fin}.pcap'
else:
ifnm = fin
if not os.path.isfile(ifnm):
raise FileNotFound(2, 'No such file or directory', ifnm)
if nofile:
ofnm = None
ext = None
else:
fmt_none = (fmt is None)
if fmt == 'html':
ext = 'js'
elif fmt == 'tree':
ext = 'txt'
else:
ext = fmt
if fout is None:
if fmt_none: # pylint: disable=no-else-raise
raise FormatError('Output format unspecified.')
elif files:
ofnm = 'out'
pathlib.Path(ofnm).mkdir(parents=True, exist_ok=True)
else:
ofnm = f'out.{ext}'
else:
fext = os.path.splitext(fout)[1]
pathlib.Path(os.path.split(fout)[0]).mkdir(parents=True, exist_ok=True)
if fext:
files = False
ofnm = fout
fmt = fmt or fext[1:] or None
if fmt is None:
raise FormatError('Output format unspecified.')
elif fmt_none:
raise FormatError('Output format unspecified.')
elif files:
ofnm = fout
pathlib.Path(ofnm).mkdir(parents=True, exist_ok=True)
elif extension:
ofnm = f'{fout}.{ext}'
else:
ofnm = fout
return ifnm, ofnm, fmt, ext, files
def record_header(self):
    """Parse the PCAP global header.

    The parsed :class:`~pcapkit.protocols.pcap.header.Header` is kept as
    :attr:`self._gbhdr <Extractor._gbhdr>`, and its version, data link type and
    nanosecond flag are cached on this instance. When a flow tracer is present,
    it receives the header's byte order and nanosecond flag so that its own
    output files use the same settings.

    Unless output is disabled, the header is dumped under the name
    ``Global Header`` (to its own file when frames are split into files), and
    the dumper's ``kind`` is remembered as the output format.
    """
    # pylint: disable=attribute-defined-outside-init,protected-access
    header = Header(self._ifile)
    self._gbhdr = header
    self._vinfo = header.version
    self._dlink = header.protocol
    self._nnsec = header.nanosecond

    tracer = self._trace
    if tracer is not NotImplemented:
        tracer._endian = header.byteorder
        tracer._nnsecd = header.nanosecond

    if self._flag_q:
        return
    if self._flag_f:
        # split-files mode: ``self._ofile`` is the dumper class, one file per record
        dumper = self._ofile(f'{self._ofnm}/Global Header.{self._fext}')
        dumper(header.info, name='Global Header')
        self._type = dumper.kind
    else:
        self._ofile(header.info, name='Global Header')
        self._type = self._ofile.kind
[docs] def record_frames(self):
"""Read packet frames.
The method calls :meth:`_read_frame` to parse each frame from the input
PCAP file; and calls :meth:`_cleanup` upon complision.
Notes:
Under non-auto mode, i.e. :attr:`self._flag_a <Extractor._flag_a>` is
:data:`False`, the method performs no action.
"""
if self._flag_a:
while True:
try:
self._read_frame()
except (EOFError, StopIteration):
# quit when EOF
break
self._cleanup()
##########################################################################
# Data modules.
##########################################################################
def __init__(self,
             fin=None, fout=None, format=None,                          # basic settings # pylint: disable=redefined-builtin
             auto=True, extension=True, store=True,                     # internal settings
             files=False, nofile=False, verbose=False,                  # output settings
             engine=None, layer=None, protocol=None,                    # extraction settings
             ip=False, ipv4=False, ipv6=False, tcp=False, strict=True,  # reassembly settings
             trace=False, trace_fout=None, trace_format=None,           # trace settings
             trace_byteorder=sys.byteorder, trace_nanosecond=False):    # trace settings
    """Initialise PCAP Reader.

    The constructor resolves input/output file names through :meth:`make_name`,
    stores all settings, sets up the requested reassembly helpers and flow
    tracer, opens the input file, prepares the output dumper (unless ``nofile``
    is set), and finally calls :meth:`check` and :meth:`run`. Extraction
    therefore starts as part of construction.

    Arguments:
        fin (Optional[str]): file name to be read; if file not exist, raise :exc:`FileNotFound`
        fout (Optional[str]): file name to be written
        format (Optional[Literal['plist', 'json', 'tree']]): file format of output
        auto (bool): if automatically run till EOF
        extension (bool): if check and append extensions to output file
        store (bool): if store extracted packet info
        files (bool): if split each frame into different files
        nofile (bool): if no output file is to be dumped
        verbose (Union[bool, Callable[[pcapkit.foundation.extraction.Extractor,
            pcapkit.protocols.pcap.frame.Frame]]]): a :obj:`bool` value or a function that takes
            the :class:`Extractor` instance and the current parsed frame (its type depends on the
            engine selected) as parameters to print verbose output information
        engine (Optional[Literal['default', 'pcapkit', 'dpkt', 'scapy', 'pyshark', 'server', 'pipeline']]):
            extraction engine to be used
        layer (Optional[Literal['Link', 'Internet', 'Transport', 'Application']]): extract up to which layer
        protocol (Optional[Union[str, Tuple[str], Type[Protocol]]]): extract up to which protocol
        ip (bool): if record data for IPv4 & IPv6 reassembly
        ipv4 (bool): if perform IPv4 reassembly
        ipv6 (bool): if perform IPv6 reassembly
        tcp (bool): if perform TCP reassembly
        strict (bool): if set strict flag for reassembly
        trace (bool): if trace TCP traffic flows
        trace_fout (Optional[str]): path name for flow tracer if necessary
        trace_format (Optional[Literal['plist', 'json', 'tree', 'pcap']]): output file
            format of flow tracer
        trace_byteorder (Literal['little', 'big']): output file byte order
        trace_nanosecond (bool): output nanosecond-resolution file flag

    Warns:
        FormatWarning: Warns under following circumstances:

            * If using PCAP output for TCP flow tracing while the extraction engine is PyShark.
            * If output file format is not supported.

    Raises:
        FileNotFound: Propagated from :meth:`make_name` if the input file does not exist.
        FormatError: Propagated from :meth:`make_name` if the output format cannot be determined.
    """
    ifnm, ofnm, fmt, ext, files = self.make_name(fin, fout, format, extension, files=files, nofile=nofile)

    # put back builtin: the ``format`` parameter shadows the builtin of the same name
    format = builtins.format

    self._ifnm = ifnm                               # input file name
    self._ofnm = ofnm                               # output file name
    self._fext = ext                                # output file extension

    self._flag_a = auto                             # auto extract flag
    self._flag_d = store                            # store data flag
    self._flag_e = False                            # EOF flag
    self._flag_f = files                            # split file flag
    self._flag_m = False                            # multiprocessing flag
    self._flag_q = nofile                           # no output flag
    self._flag_t = trace                            # trace flag
    self._flag_v = bool(verbose)                    # verbose output flag

    # verbose callback function; a plain bool means "use the built-in printout"
    if isinstance(verbose, bool):
        self._vfunc = NotImplemented
    else:
        self._vfunc = verbose

    self._frnum = 0                                 # frame number
    self._frame = list()                            # frame record
    self._proto = None                              # frame ProtoChain
    self._reasm = [None for _ in range(3)]          # frame record for reassembly (IPv4 / IPv6 / TCP)
    self._trace = NotImplemented                    # flow tracer

    self._ipv4 = ipv4 or ip                         # IPv4 Reassembly
    self._ipv6 = ipv6 or ip                         # IPv6 Reassembly
    self._tcp = tcp                                 # TCP Reassembly

    self._exptl = protocol or 'null'                # extract up to this protocol ('null' if none given)
    self._exlyr = (layer or 'none').capitalize()    # extract up to this layer ('None' if none given)
    self._exeng = (engine or 'default').lower()     # extract using engine

    # reassembly helpers are imported lazily, only when requested
    if self._ipv4:
        from pcapkit.reassembly.ipv4 import IPv4_Reassembly
        self._reasm[0] = IPv4_Reassembly(strict=strict)
    if self._ipv6:
        from pcapkit.reassembly.ipv6 import IPv6_Reassembly
        self._reasm[1] = IPv6_Reassembly(strict=strict)
    if self._tcp:
        from pcapkit.reassembly.tcp import TCP_Reassembly
        self._reasm[2] = TCP_Reassembly(strict=strict)

    if trace:
        from pcapkit.foundation.traceflow import TraceFlow
        # PCAP output of traced flows is not offered with the PyShark engine
        if self._exeng in ('pyshark',) and re.fullmatch('pcap', str(trace_format), re.IGNORECASE):
            warnings.warn(f"'Extractor(engine={self._exeng})' does not support 'trace_format={trace_format}'; "
                          "using 'trace_format=None' instead", FormatWarning, stacklevel=stacklevel())
            trace_format = None
        self._trace = TraceFlow(fout=trace_fout, format=trace_format,
                                byteorder=trace_byteorder, nanosecond=trace_nanosecond)

    self._ifile = open(ifnm, 'rb')                  # input file
    if not self._flag_q:
        if fmt == 'plist':
            from dictdumper import PLIST as output  # output PLIST file
        elif fmt == 'json':
            from dictdumper import JSON as output   # output JSON file
        elif fmt == 'tree':
            from dictdumper import Tree as output   # output treeview text file
        #elif fmt == 'html':
        #    from dictdumper import VueJS as output  # output JavaScript file
        #elif fmt == 'xml':
        #    from dictdumper import XML as output    # output XML file
        else:
            from pcapkit.dumpkit import NotImplementedIO as output  # no output file
            warnings.warn(f'unsupported output format: {fmt}; disabled file output feature',
                          FormatWarning, stacklevel=stacklevel())

        class DictDumper(output):
            """Customised :class:`~dictdumper.dumper.Dumper` object."""

            def object_hook(self, o):
                """Convert content for function call.

                Args:
                    o (:obj:`Any`): object to convert

                Returns:
                    :obj:`Any`: the converted object
                """
                import enum
                import aenum

                # integer enums are dumped as a small descriptive mapping
                if isinstance(o, (enum.IntEnum, aenum.IntEnum)):
                    return dict(
                        enum=type(o).__name__,
                        desc=o.__doc__,
                        name=o.name,
                        value=o.value,
                    )
                if isinstance(o, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
                    return str(o)
                if isinstance(o, Info):
                    return o.info2dict()
                return super().object_hook(o)

        # in split-files mode the dumper *class* is kept and instantiated once per
        # output file; otherwise a single dumper writes to ``ofnm``
        self._ofile = DictDumper if self._flag_f else DictDumper(ofnm)  # output file

    self.check()                                    # check layer & protocol
    self.run()                                      # start extraction
[docs] def __iter__(self):
"""Iterate and parse PCAP frame.
Raises:
IterableError: If :attr:`self._flag_a <pcapkit.foundation.extraction.Extractor._flag_a>`
is :data:`True`, as such operation is not applicable.
"""
if not self._flag_a:
return self
raise IterableError("'Extractor(auto=True)' object is not iterable")
[docs] def __next__(self):
"""Iterate and parse next PCAP frame.
It will call :meth:`_read_frame` to parse next PCAP frame internally,
until the EOF reached; then it calls :meth:`_cleanup` for the aftermath.
"""
try:
return self._read_frame()
except (EOFError, StopIteration):
self._cleanup()
raise StopIteration
[docs] def __call__(self):
"""Works as a simple wrapper for the iteration protocol.
Raises:
IterableError: If :attr:`self._flag_a <pcapkit.foundation.extraction.Extractor._flag_a>`
is :data:`True`, as iteration is not applicable.
"""
if not self._flag_a:
try:
return self._read_frame()
except (EOFError, StopIteration) as error:
self._cleanup()
raise error from None
raise CallableError("'Extractor(auto=True)' object is not callable")
[docs] def __exit__(self, exc_type, exc_value, traceback): # pylint: disable=unused-argument
"""Close the input file when exits."""
self._ifile.close()
##########################################################################
# Utilities.
##########################################################################
[docs] def _cleanup(self):
"""Cleanup after extraction & analysis.
The method clears the :attr:`self._expkg <Extractor._expkg>` and
:attr:`self._extmp <Extractor._extmp>` attributes, sets
:attr:`self._flag_e <pcapkit.foundation.extraction.Extractor._flag_e>`
as :data:`True` and closes the input file.
"""
# pylint: disable=attribute-defined-outside-init
self._expkg = None
self._extmp = None
self._flag_e = True
self._ifile.close()
[docs] def _aftermathmp(self):
"""Aftermath for multiprocessing.
The method will *join* all child processes forked/spawned as in
:attr:`self._mpprc <Extractor._mpprc>`, and will *join*
:attr:`self._mpsrv <Extractor._mpsrv>` server process if using
multiprocessing server engine.
For multiprocessing server engine, it will
* assign :attr:`self._mpfrm <Extractor._mpfrm>` to :attr:`self._frame <Extractor._frame>`
* assign :attr:`self._mprsm <Extractor._mprsm>` to :attr:`self._reasm <Extractor._reasm>`
* copy :attr:`self._mpkit.trace <Extractor._mpkit.trace>` to :attr:`self._trace <Extractor._trace>`
For multiprocessing pipeline engine, it will
* restore :attr:`self._frame <Extractor._frame>` from :attr:`self._mpkit.frames <Extractor._mpkit.frames>`
* copy :attr:`self._mpkit.reassembly <Extractor._mpkit.reassembly>` to :attr:`self._reasm <Extractor._reasm>`
* copy :attr:`self._mpkit.trace <Extractor._mpkit.trace>` to :attr:`self._trace <Extractor._trace>`
After restoring attributes, it will *shutdown* multiprocessing manager context
:attr:`self._mpmng <Extractor._mpmng>`, delete all multiprocessing attributes (i.e. starts with `_mp`),
and deduct the frame number :attr:`self._frnum <Extractor._frnum>` by 2 (*hacking solution*).
Notes:
If :attr:`self._flag_e <pcapkit.foundation.extraction.Extractor._flag_e>` is already
set as :data:`True`, do nothing.
Raises:
UnsupportedCall: If :attr:`self._flag_m <pcapkit.foundation.extraction.Extractor._flag_m>`
is :data:`False`, as such operation is not applicable.
"""
if not self._flag_m:
raise UnsupportedCall(f"Extractor(engine={self._exeng})' has no attribute '_aftermathmp'")
if self._flag_e:
return
# join processes
[proc.join() for proc in self._mpprc] # pylint: disable=expression-not-assigned
if self._exeng == 'server':
self._mpsrv.join()
# restore attributes
if self._exeng == 'server':
self._frame = list(self._mpfrm)
self._reasm = list(self._mprsm)
self._trace = copy.deepcopy(self._mpkit.trace)
if self._exeng == 'pipeline':
self._frame = [self._mpkit.frames[x] for x in sorted(self._mpkit.frames)]
self._reasm = copy.deepcopy(self._mpkit.reassembly)
self._trace = copy.deepcopy(self._mpkit.trace)
# shutdown & cleanup
self._mpmng.shutdown()
[delattr(self, attr) for attr in filter(lambda s: s.startswith('_mp'), dir(self))] # pylint: disable=expression-not-assigned
self._frnum -= 2
[docs] def _update_eof(self):
"""Update EOF flag.
This method calls :meth:`_aftermathmp` to cleanup multiproccessing stuff,
closes the input file and toggle :attr:`self._flag_e <pcapkit.foundation.extraction.Extractor._flag_e>`
as :data:`True`.
"""
self._aftermathmp()
self._ifile.close()
self._flag_e = True
[docs] def _read_frame(self):
"""Headquarters for frame reader.
This method is a dispatcher for parsing frames.
* For Scapy engine, calls :meth:`_scapy_read_frame`.
* For DPKT engine, calls :meth:`_dpkt_read_frame`.
* For PyShark engine, calls :meth:`_pyshark_read_frame`.
* For default (PyPCAPKit) engine, calls :meth:`_default_read_frame`.
Returns:
The parsed frame instance.
"""
if self._exeng == 'scapy':
return self._scapy_read_frame()
if self._exeng == 'dpkt':
return self._dpkt_read_frame()
if self._exeng == 'pyshark':
return self._pyshark_read_frame()
return self._default_read_frame()
def _default_read_frame(self, *, frame=None, mpkit=None):
    """Read frames with default engine.

    This method performs following operations:

    - extract frames and each layer of packets;
    - make :class:`~pcapkit.corekit.infoclass.Info` object out of frame properties;
    - write to output file with corresponding dumper;
    - reassemble IP and/or TCP datagram;
    - trace TCP flows if any;
    - record frame :class:`~pcapkit.corekit.infoclass.Info` object to frame storage.

    When multiprocessing is active (:attr:`self._flag_m <Extractor._flag_m>` is
    :data:`True`), no frame is read from the input file here; the ``frame``
    argument is processed instead. How frames are recorded then depends on the
    engine: ``'pipeline'`` stores into ``mpkit.frames`` and advances
    ``mpkit.current``, while ``'server'`` appends to :attr:`self._frame` and
    advances :attr:`self._frnum`.

    Keyword Args:
        frame (Optional[pcapkit.protocols.pcap.frame.Frame]): The fallback ``frame`` data
            (for multiprocessing engines).
        mpkit (multiprocessing.managers.SyncManager.Namespace): The multiprocess data kit.

    Returns:
        Optional[pcapkit.protocols.pcap.frame.Frame]: Parsed frame instance.
    """
    from pcapkit.toolkit.default import (ipv4_reassembly, ipv6_reassembly,
                                         tcp_reassembly, tcp_traceflow)

    # read frame header (single-process only; multiprocessing passes ``frame`` in)
    if not self._flag_m:
        frame = Frame(self._ifile, num=self._frnum+1, proto=self._dlink,
                      layer=self._exlyr, protocol=self._exptl, nanosecond=self._nnsec)
        self._frnum += 1

    # verbose output
    if self._flag_v:
        if self._vfunc is NotImplemented:
            print(f' - Frame {self._frnum:>3d}: {frame.protochain}')
        else:
            self._vfunc(self, frame)

    # write plist
    frnum = f'Frame {self._frnum}'
    if not self._flag_q:
        if self._flag_f:
            # split-files mode: ``self._ofile`` is the dumper class, one file per frame
            ofile = self._ofile(f'{self._ofnm}/{frnum}.{self._fext}')
            ofile(frame.info, name=frnum)
        else:
            self._ofile(frame.info, name=frnum)

    # record fragments
    if self._ipv4:
        flag, data = ipv4_reassembly(frame)
        if flag:
            self._reasm[0](data)  # pylint: disable=E1102
    if self._ipv6:
        flag, data = ipv6_reassembly(frame)
        if flag:
            self._reasm[1](data)  # pylint: disable=E1102
    if self._tcp:
        flag, data = tcp_reassembly(frame)
        if flag:
            self._reasm[2](data)  # pylint: disable=E1102

    # trace flows
    if self._flag_t:
        flag, data = tcp_traceflow(frame, data_link=self._dlink)
        if flag:
            self._trace(data)

    # record frames
    if self._exeng == 'pipeline':
        if self._flag_d:
            # frame._file = NotImplemented
            # NOTE(review): keyed by self._frnum, which this method does not advance
            # in multiprocessing mode; presumably set per task by the pipeline driver
            mpkit.frames[self._frnum] = frame
            # print(self._frnum, 'stored')
        mpkit.current += 1
    elif self._exeng == 'server':
        # record frames
        if self._flag_d:
            # frame._file = NotImplemented
            self._frame.append(frame)
            # print(self._frnum, 'stored')
        self._frnum += 1
    else:
        if self._flag_d:
            self._frame.append(frame)
        self._proto = frame.protochain.chain

    # return frame record
    return frame
[docs] def _run_scapy(self, scapy_all):
"""Call :func:`scapy.all.sniff` to extract PCAP files.
This method assigns :attr:`self._expkg <Extractor._expkg>` as :mod:`scapy.all`
and :attr:`self._extmp <Extractor._extmp>` as an iterator from
:func:`scapy.all.sniff`.
Args:
scapy_all (types.ModuleType): The :mod:`scapy.all` module.
Warns:
AttributeWarning: If :attr:`self._exlyr <Extractor._exlyr>` and/or
:attr:`self._exptl <Extractor._exptl>` is provided as the Scapy
engine currently does not support such operations.
"""
# pylint: disable=attribute-defined-outside-init
# if not self._flag_a:
# self._flag_a = True
# warnings.warn(f"'Extractor(engine=scapy)' object is not iterable; "
# "so 'auto=False' will be ignored", AttributeWarning, stacklevel=stacklevel())
if self._exlyr != 'None' or self._exptl != 'null':
warnings.warn("'Extractor(engine=scapy)' does not support protocol and layer threshold; "
f"'layer={self._exlyr}' and 'protocol={self._exptl}' ignored",
AttributeWarning, stacklevel=stacklevel())
# extract & analyse file
self._expkg = scapy_all
self._extmp = iter(scapy_all.sniff(offline=self._ifnm))
# start iteration
self.record_frames()
def _scapy_read_frame(self):
    """Read and process the next frame with the Scapy engine.

    The next packet is taken from the iterator in
    :attr:`self._extmp <Extractor._extmp>`. The frame counter is advanced and
    the protocol chain computed. The packet is then, in this order, reported
    verbosely, dumped to the output file, stored, fed to the enabled
    reassemblers (IPv4, IPv6, TCP), and passed to the flow tracer.

    Returns:
        scapy.packet.Packet: Parsed frame instance.

    Raises:
        StopIteration: Propagated from the packet iterator once it is exhausted.

    See Also:
        Please refer to :meth:`_default_read_frame` for more operational information.
    """
    from pcapkit.toolkit.scapy import (ipv4_reassembly, ipv6_reassembly,
                                       packet2chain, packet2dict, tcp_reassembly,
                                       tcp_traceflow)

    packet = next(self._extmp)
    self._frnum += 1
    self._proto = packet2chain(packet)

    # verbose output
    if self._flag_v:
        if self._vfunc is NotImplemented:
            print(f' - Frame {self._frnum:>3d}: {self._proto}')
        else:
            self._vfunc(self, packet)

    # dump to output file(s)
    label = f'Frame {self._frnum}'
    if not self._flag_q:
        record = packet2dict(packet)
        if self._flag_f:
            self._ofile(f'{self._ofnm}/{label}.{self._fext}')(record, name=label)
        else:
            self._ofile(record, name=label)

    # keep the raw packet
    if self._flag_d:
        self._frame.append(packet)

    # feed the enabled reassemblers, in IPv4 / IPv6 / TCP order
    for enabled, extract, slot in ((self._ipv4, ipv4_reassembly, 0),
                                   (self._ipv6, ipv6_reassembly, 1),
                                   (self._tcp, tcp_reassembly, 2)):
        if enabled:
            ok, data = extract(packet, count=self._frnum)
            if ok:
                self._reasm[slot](data)  # pylint: disable=E1102

    # trace flows
    if self._flag_t:
        ok, data = tcp_traceflow(packet, count=self._frnum)
        if ok:
            self._trace(data)

    return packet
[docs] def _run_dpkt(self, dpkt):
"""Call :class:`dpkt.pcap.Reader` to extract PCAP files.
This method assigns :attr:`self._expkg <Extractor._expkg>` as :mod:`dpkt` and
:attr:`self._extmp <Extractor._extmp>` as an iterator from :class:`dpkt.pcap.Reader`.
Args:
dpkt (types.ModuleType): The :mod:`dpkt` module.
Warns:
AttributeWarning: If :attr:`self._exlyr <Extractor._exlyr>` and/or
:attr:`self._exptl <Extractor._exptl>` is provided as the DPKT
engine currently does not support such operations.
"""
# pylint: disable=attribute-defined-outside-init
# if not self._flag_a:
# self._flag_a = True
# warnings.warn(f"'Extractor(engine=dpkt)' object is not iterable; "
# "so 'auto=False' will be ignored", AttributeWarning, stacklevel=stacklevel())
if self._exlyr != 'None' or self._exptl != 'null':
warnings.warn("'Extractor(engine=dpkt)' does not support protocol and layer threshold; "
f"'layer={self._exlyr}' and 'protocol={self._exptl}' ignored",
AttributeWarning, stacklevel=stacklevel())
# extract global header
self.record_header()
self._ifile.seek(0, os.SEEK_SET)
# extract & analyse file
self._expkg = dpkt
self._extmp = iter(dpkt.pcap.Reader(self._ifile))
# start iteration
self.record_frames()
def _dpkt_read_frame(self):
    """Read and process the next frame with the DPKT engine.

    The next ``(timestamp, raw bytes)`` pair is taken from the reader iterator,
    and the bytes are decoded according to the data link type from the global
    header: 1 -> Ethernet, 228 -> IPv4, 229 -> IPv6. For any other link type a
    :exc:`DPKTWarning` is issued and the raw packet is only counted (and stored,
    if enabled) without further analysis.

    Otherwise the packet is, in this order, reported verbosely, dumped to the
    output file, stored (with ``packet2dict``/``packet2chain`` helpers attached),
    fed to the enabled reassemblers (IPv4, IPv6, TCP), and passed to the flow tracer.

    Returns:
        dpkt.dpkt.Packet: Parsed frame instance.

    Raises:
        StopIteration: Propagated from the reader iterator once it is exhausted.

    See Also:
        Please refer to :meth:`_default_read_frame` for more operational information.
    """
    from pcapkit.toolkit.dpkt import (ipv4_reassembly, ipv6_reassembly,
                                      packet2chain, packet2dict, tcp_reassembly,
                                      tcp_traceflow)

    timestamp, packet = next(self._extmp)

    # link type -> (dpkt submodule, decoder class)
    decoder = {
        1: ('ethernet', 'Ethernet'),
        228: ('ip', 'IP'),
        229: ('ip6', 'IP6'),
    }.get(self._dlink.value)
    if decoder is None:
        warnings.warn('unrecognised link layer protocol; all analysis functions ignored',
                      DPKTWarning, stacklevel=stacklevel())
        self._frnum += 1
        if self._flag_d:
            self._frame.append(packet)
        return packet
    submodule, class_name = decoder
    packet = getattr(getattr(self._expkg, submodule), class_name)(packet)

    self._frnum += 1
    self._proto = packet2chain(packet)

    # verbose output
    if self._flag_v:
        if self._vfunc is NotImplemented:
            print(f' - Frame {self._frnum:>3d}: {self._proto}')
        else:
            self._vfunc(self, packet)

    # dump to output file(s)
    label = f'Frame {self._frnum}'
    if not self._flag_q:
        record = packet2dict(packet, timestamp, data_link=self._dlink)
        if self._flag_f:
            self._ofile(f'{self._ofnm}/{label}.{self._fext}')(record, name=label)
        else:
            self._ofile(record, name=label)

    # keep the packet, with conversion helpers attached for later use
    if self._flag_d:
        setattr(packet, 'packet2dict', packet2dict)
        setattr(packet, 'packet2chain', packet2chain)
        self._frame.append(packet)

    # feed the enabled reassemblers, in IPv4 / IPv6 / TCP order
    for enabled, extract, slot in ((self._ipv4, ipv4_reassembly, 0),
                                   (self._ipv6, ipv6_reassembly, 1),
                                   (self._tcp, tcp_reassembly, 2)):
        if enabled:
            ok, data = extract(packet, count=self._frnum)
            if ok:
                self._reasm[slot](data)  # pylint: disable=E1102

    # trace flows
    if self._flag_t:
        ok, data = tcp_traceflow(packet, timestamp, data_link=self._dlink, count=self._frnum)
        if ok:
            self._trace(data)

    return packet
def _run_pyshark(self, pyshark):
    """Call :class:`pyshark.FileCapture` to extract PCAP files.

    This method assigns :attr:`self._expkg <Extractor._expkg>` as :mod:`pyshark` and
    :attr:`self._extmp <Extractor._extmp>` as an iterator from :class:`pyshark.FileCapture`,
    then starts frame iteration via :meth:`record_frames`.

    Args:
        pyshark (types.ModuleType): The :mod:`pyshark` module.

    Warns:
        AttributeWarning: Warns under following circumstances:

            * if :attr:`self._exlyr <Extractor._exlyr>` and/or
              :attr:`self._exptl <Extractor._exptl>` is provided, as the
              PyShark engine currently does not support such operations.
            * if reassembly is enabled, as the PyShark engine currently
              does not support such operation. Reassembly is then disabled
              (``ipv4``/``ipv6``/``tcp`` flags cleared and the reassembly
              buffers dropped); the warning reports the values that were
              originally requested.

    """
    # pylint: disable=attribute-defined-outside-init
    if self._exlyr != 'None' or self._exptl != 'null':
        warnings.warn("'Extractor(engine=pyshark)' does not support protocol and layer threshold; "
                      f"'layer={self._exlyr}' and 'protocol={self._exptl}' ignored",
                      AttributeWarning, stacklevel=stacklevel())
    if self._ipv4 or self._ipv6 or self._tcp:
        # Format the message *before* clearing the flags; formatting it afterwards
        # would always report ``False`` for all three options.
        message = ("'Extractor(engine=pyshark)' object does not support reassembly; "
                   f"so 'ipv4={self._ipv4}', 'ipv6={self._ipv6}' and 'tcp={self._tcp}' will be ignored")
        self._ipv4 = self._ipv6 = self._tcp = False
        self._reasm = [None, None, None]
        warnings.warn(message, AttributeWarning, stacklevel=stacklevel())

    # extract & analyse file; PyShark opens the file by name, and
    # ``keep_packets=False`` asks it not to retain packets after iteration
    self._expkg = pyshark
    self._extmp = iter(pyshark.FileCapture(self._ifnm, keep_packets=False))

    # start iteration
    self.record_frames()
def _pyshark_read_frame(self):
    """Read and process one frame using the PyShark engine.

    Returns:
        pyshark.packet.packet.Packet: Parsed frame instance.

    Notes:
        When frames are being stored, :func:`~pcapkit.toolkit.pyshark.packet2dict` is
        attached to the parsed frame instance as its ``packet2dict`` attribute
        (:meth:`~pyshark.packet.packet.Packet.packet2dict`).

    See Also:
        Please refer to :meth:`_default_read_frame` for more operational information.

    """
    from pcapkit.toolkit.pyshark import packet2dict, tcp_traceflow

    packet = next(self._extmp)

    # frame number and protocol chain are taken straight from the PyShark packet
    self._frnum = int(packet.number)
    self._proto = packet.frame_info.protocols

    # verbose output
    if self._flag_v and self._vfunc is NotImplemented:
        print(f' - Frame {self._frnum:>3d}: {self._proto}')
    elif self._flag_v:
        self._vfunc(self, packet)

    # dump frame info, either to a per-frame file or to the shared output
    label = f'Frame {self._frnum}'
    if not self._flag_q:
        info = packet2dict(packet)
        dumper = self._ofile(f'{self._ofnm}/{label}.{self._fext}') if self._flag_f else self._ofile
        dumper(info, name=label)

    # keep the frame, with the dict converter attached
    if self._flag_d:
        packet.packet2dict = packet2dict
        self._frame.append(packet)

    # TCP flow tracing
    if self._flag_t:
        ready, data = tcp_traceflow(packet)
        if ready:
            self._trace(data)
    return packet
def _run_pipeline(self, multiprocessing):
    """Use pipeline multiprocessing to extract PCAP files.

    Notes:
        The basic concept of the multiprocessing pipeline engine is that we parse the PCAP
        file as a pipeline, one frame per worker. Once the length of a frame is known, i.e.
        the PCAP frame header is parsed, we can start a new worker and begin parsing the
        next frame concurrently. However, as datagram reassembly and TCP flow tracing
        require linear sequential processing, we still need to *wait* for the analysis of
        previous frames to complete before proceeding with such operations.

        This method assigns :attr:`self._expkg <Extractor._expkg>` as :mod:`multiprocessing`,
        and creates a file pointer storage :attr:`self._mpfdp <Extractor._mpfdp>`, a manager
        context :attr:`self._mpmng <Extractor._mpmng>` and a namespace
        :attr:`self._mpkit <Extractor._mpkit>`. In the namespace, we initiate the number of
        (on duty) workers as ``counter``, the pool of (ready) workers as ``pool``, the
        current frame number as ``current``, the EOF flag as ``eof``, the frame storage as
        ``frames``, the TCP flow tracer :attr:`self._trace <Extractor._trace>` as ``trace``
        and the reassembly buffers :attr:`self._reasm <Extractor._reasm>` as ``reassembly``.

        After the initial setup, the method calls :meth:`record_header` to parse the PCAP
        global header and *puts* the file offset into :attr:`self._mpfdp <Extractor._mpfdp>`
        as the start of the first frame. Then it starts parsing each PCAP frame.

        During this phase, it runs a :token:`while <while_stmt>` loop until
        :attr:`self._mpkit.eof <Extractor._mpkit.eof>` is set to :data:`True`, at which point
        it calls :meth:`_update_eof` and breaks. Inside the loop it maintains a
        :class:`multiprocessing.pool.Pool`-like worker pool, checking
        :attr:`self._mpkit.pool <Extractor._mpkit.pool>` for available workers and
        :attr:`self._mpkit.counter <Extractor._mpkit.counter>` for active workers.

        When starting a new worker, it first moves the input file to the offset stored in
        :attr:`self._mpfdp <Extractor._mpfdp>`. It then creates a child process running
        :meth:`_pipeline_read_frame` with keyword arguments ``mpkit`` as
        :attr:`self._mpkit <Extractor._mpkit>` and ``mpfdp`` as the corresponding
        :class:`~multiprocessing.Queue` from :attr:`self._mpfdp <Extractor._mpfdp>`. It then
        decrements :attr:`self._mpkit.pool <Extractor._mpkit.pool>` and increments
        :attr:`self._mpkit.counter <Extractor._mpkit.counter>`, both by ``1``. The child
        process is appended to :attr:`self._mpprc <Extractor._mpprc>`.

        Once at least :data:`CPU_CNT` processes are recorded in
        :attr:`self._mpprc <Extractor._mpprc>`, all but the four most recent ones are
        *joined* and their references removed.

    Args:
        multiprocessing (types.ModuleType): The :mod:`multiprocessing` module.

    Warns:
        AttributeWarning: If :attr:`self._flag_q <pcapkit.foundation.extraction.Extractor._flag_q>`
            is :data:`False`, as multiprocessing engines do not support output.

    Raises:
        UnsupportedCall: If :attr:`self._flag_m <pcapkit.foundation.extraction.Extractor._flag_m>`
            is :data:`False`, as such operation is not applicable.

    """
    # pylint: disable=attribute-defined-outside-init
    if not self._flag_m:
        raise UnsupportedCall(f"'Extractor(engine={self._exeng})' has no attribute '_run_pipeline'")
    if not self._flag_q:
        self._flag_q = True
        warnings.warn("'Extractor(engine=pipeline)' does not support output; "
                      f"'fout={self._ofnm}' ignored", AttributeWarning, stacklevel=stacklevel())

    self._frnum = 1                                                 # frame number (revised)
    self._expkg = multiprocessing                                   # multiprocessing module
    self._mpprc = list()                                            # multiprocessing process list
    self._mpfdp = collections.defaultdict(multiprocessing.Queue)    # multiprocessing file pointer

    self._mpmng = multiprocessing.Manager()                         # multiprocessing manager
    self._mpkit = self._mpmng.Namespace()                           # multiprocessing work kit

    self._mpkit.counter = 0                                         # work count (on duty)
    self._mpkit.pool = 1                                            # work pool (ready)
    self._mpkit.current = 1                                         # current frame number
    self._mpkit.eof = False                                         # EOF flag
    self._mpkit.frames = dict()                                     # frame storage
    self._mpkit.trace = self._trace                                 # flow tracer
    self._mpkit.reassembly = copy.deepcopy(self._reasm)             # reassembly buffers

    # number of most recently started workers left un-joined when trimming the list
    keep_alive = 4

    # preparation: the first frame starts right after the global header
    self.record_header()
    self._mpfdp[0].put(self._gbhdr.length)

    # extraction
    # NOTE(review): ``pool``/``counter`` updates through the manager namespace are
    # read-modify-write and not atomic across processes -- confirm this is acceptable.
    while True:
        # check EOF
        if self._mpkit.eof:
            self._update_eof()
            break

        # check counter
        if self._mpkit.pool and self._mpkit.counter < CPU_CNT:
            # move to the offset published by the previous frame's worker
            self._ifile.seek(self._mpfdp.pop(self._frnum-1).get(), os.SEEK_SET)

            # create worker
            proc = multiprocessing.Process(
                target=self._pipeline_read_frame,
                kwargs={'mpkit': self._mpkit, 'mpfdp': self._mpfdp[self._frnum]}
            )

            # update status
            self._mpkit.pool -= 1
            self._mpkit.counter += 1

            # start and record
            proc.start()
            self._frnum += 1
            self._mpprc.append(proc)

        # check buffer: reap older workers so the process list stays bounded
        if len(self._mpprc) >= CPU_CNT:
            for stale in self._mpprc[:-keep_alive]:
                stale.join()
            del self._mpprc[:-keep_alive]
def _pipeline_read_frame(self, *, mpfdp, mpkit):
    """Extract frame with multiprocessing pipeline engine.

    The method calls :class:`~pcapkit.protocols.pcap.Frame` to parse the PCAP frame data.
    Should :exc:`EOFError` be raised, it sets :attr:`self._mpkit.eof <Extractor._mpkit.eof>`
    to :data:`True`. Finally, it decrements :attr:`self._mpkit.counter <Extractor._mpkit.counter>`
    by ``1`` and closes the input source file (as the child process exits).

    For the parsed :class:`~pcapkit.protocols.pcap.Frame` instance, the worker first waits
    until :attr:`self._mpkit.current <Extractor._mpkit.current>` equals
    :attr:`self._frnum <Extractor._frnum>`, i.e. until it is this frame's turn to be
    processed in linear sequential order.

    It then calls :meth:`_default_read_frame`, temporarily assigning
    :attr:`self._mpkit.trace <Extractor._mpkit.trace>` to :attr:`self._trace <Extractor._trace>`
    and :attr:`self._mpkit.reassembly <Extractor._mpkit.reassembly>` to
    :attr:`self._reasm <Extractor._reasm>`, and afterwards stores deep copies of both back
    into the namespace.

    Keyword Args:
        mpfdp (multiprocessing.Queue): :class:`~multiprocessing.Queue` for multiprocessing file pointer (offset).
        mpkit (multiprocessing.managers.SyncManager.Namespace):
            :class:`~multiprocessing.managers.SyncManager.Namespace` instance as
            :attr:`self._mpkit <Extractor._mpkit>`.

    Raises:
        EOFError: If :attr:`self._flag_e <pcapkit.foundation.extraction.Extractor._flag_e>`
            is :data:`True`, as the parsing had finished.

    """
    # check EOF
    if self._flag_e:
        raise EOFError

    def _analyse_frame(*, frame, mpkit):
        """Wait for this frame's turn, then run the sequential analysis on it."""
        # Poll until every preceding frame has been analysed; ``sleep(0)`` just
        # yields the processor between checks.
        while mpkit.current != self._frnum:
            time.sleep(0)

        # analysis and storage, using the shared tracer / reassembly state
        self._trace = mpkit.trace
        self._reasm = mpkit.reassembly
        self._default_read_frame(frame=frame, mpkit=mpkit)
        # write back copies so the next worker picks up the updated state
        mpkit.trace = copy.deepcopy(self._trace)
        mpkit.reassembly = copy.deepcopy(self._reasm)

    # extract frame
    try:
        frame = Frame(self._ifile, num=self._frnum, proto=self._dlink, layer=self._exlyr,
                      protocol=self._exptl, nanosecond=self._nnsec, mpkit=mpkit, mpfdp=mpfdp)
        _analyse_frame(frame=frame, mpkit=mpkit)
    except EOFError:
        mpkit.eof = True
    finally:
        # this worker is done: release its slot and its file handle
        mpkit.counter -= 1
        self._ifile.close()
def _run_server(self, multiprocessing):
    """Use server multiprocessing to extract PCAP files.

    Notes:
        The basic concept of the multiprocessing server engine is that, compared to the
        multiprocessing pipeline engine (c.f. :meth:`_run_pipeline`), we further separate
        PCAP frame parsing from analysis/processing.

        We start a *server* process to perform datagram reassembly, TCP flow tracing, etc.
        on all parsed PCAP frames, whilst each PCAP frame is parsed in the same manner as in
        the pipeline engine, i.e. one frame per worker.

        This method assigns :attr:`self._expkg <Extractor._expkg>` as :mod:`multiprocessing`,
        and creates a file pointer storage :attr:`self._mpfdp <Extractor._mpfdp>`, a manager
        context :attr:`self._mpmng <Extractor._mpmng>` and a namespace
        :attr:`self._mpkit <Extractor._mpkit>`. It also maintains the active process list
        :attr:`self._mpprc <Extractor._mpprc>` as in :meth:`_run_pipeline`.

        It also creates a :obj:`dict` :attr:`self._mpbuf <Extractor._mpbuf>` as the frame
        buffer (temporary storage) from which the server process obtains parsed frames; a
        :obj:`list` :attr:`self._mpfrm <Extractor._mpfrm>` as the eventual frame storage; and
        another :obj:`list` :attr:`self._mprsm <Extractor._mprsm>` that receives the
        reassembly buffers :attr:`self._reasm <Extractor._reasm>` before the server exits.

        In the namespace, we initiate the number of (on duty) workers as ``counter``, the
        pool of (ready) workers as ``pool``, the EOF flag as ``eof``, and ``trace`` for
        storing the TCP flow tracer :attr:`self._trace <Extractor._trace>` before the server
        process exits.

        After the initial setup, the method calls :meth:`record_header` to parse the PCAP
        global header and *puts* the file offset into :attr:`self._mpfdp <Extractor._mpfdp>`
        as the start of the first frame. It then starts the server process
        :attr:`self._mpsrv <Extractor._mpsrv>` running :meth:`_server_analyse_frame`, and
        finally starts parsing each PCAP frame.

        During this phase, it runs a :token:`while <while_stmt>` loop until
        :attr:`self._mpkit.eof <Extractor._mpkit.eof>` is set to :data:`True`, at which point
        it calls :meth:`_update_eof` and breaks. Inside the loop it maintains a
        :class:`multiprocessing.pool.Pool`-like worker pool, checking
        :attr:`self._mpkit.pool <Extractor._mpkit.pool>` for available workers and
        :attr:`self._mpkit.counter <Extractor._mpkit.counter>` for active workers. One CPU
        is reserved for the server process, so at most ``CPU_CNT - 1`` workers run at a
        time -- but always at least one, so that single-CPU machines still make progress.

        When starting a new worker, it first moves the input file to the offset stored in
        :attr:`self._mpfdp <Extractor._mpfdp>`. It then creates a child process running
        :meth:`_server_extract_frame` with keyword arguments ``mpkit`` as
        :attr:`self._mpkit <Extractor._mpkit>`, ``mpbuf`` as :attr:`self._mpbuf <Extractor._mpbuf>`
        and ``mpfdp`` as the corresponding :class:`~multiprocessing.Queue` from
        :attr:`self._mpfdp <Extractor._mpfdp>`. It then decrements
        :attr:`self._mpkit.pool <Extractor._mpkit.pool>` and increments
        :attr:`self._mpkit.counter <Extractor._mpkit.counter>`, both by ``1``. The child
        process is appended to :attr:`self._mpprc <Extractor._mpprc>`.

        Once the worker limit is reached in :attr:`self._mpprc <Extractor._mpprc>`, all but
        the four most recent processes are *joined* and their references removed.

    Args:
        multiprocessing (types.ModuleType): The :mod:`multiprocessing` module.

    Warns:
        AttributeWarning: If :attr:`self._flag_q <pcapkit.foundation.extraction.Extractor._flag_q>`
            is :data:`False`, as multiprocessing engines do not support output.

    Raises:
        UnsupportedCall: If :attr:`self._flag_m <pcapkit.foundation.extraction.Extractor._flag_m>`
            is :data:`False`, as such operation is not applicable.

    """
    # pylint: disable=attribute-defined-outside-init
    if not self._flag_m:
        raise UnsupportedCall(f"'Extractor(engine={self._exeng})' has no attribute '_run_server'")
    if not self._flag_q:
        self._flag_q = True
        warnings.warn("'Extractor(engine=server)' does not support output; "
                      f"'fout={self._ofnm}' ignored", AttributeWarning, stacklevel=stacklevel())

    # One CPU is reserved for the server process. Still allow at least one parsing
    # worker: with CPU_CNT == 1 the bound ``counter < 0`` could never hold, no
    # worker would ever start, EOF would never be reached and the loop would spin forever.
    max_workers = max(CPU_CNT - 1, 1)
    # number of most recently started workers left un-joined when trimming the list
    keep_alive = 4

    self._frnum = 1                                                 # frame number (revised)
    self._expkg = multiprocessing                                   # multiprocessing module
    self._mpsrv = NotImplemented                                    # multiprocessing server process
    self._mpprc = list()                                            # multiprocessing process list
    self._mpfdp = collections.defaultdict(multiprocessing.Queue)    # multiprocessing file pointer

    self._mpmng = multiprocessing.Manager()                         # multiprocessing manager
    self._mpbuf = self._mpmng.dict()                                # multiprocessing frame dict
    self._mpfrm = self._mpmng.list()                                # multiprocessing frame storage
    self._mprsm = self._mpmng.list()                                # multiprocessing reassembly buffer
    self._mpkit = self._mpmng.Namespace()                           # multiprocessing work kit

    self._mpkit.counter = 0                                         # work count (on duty)
    self._mpkit.pool = 1                                            # work pool (ready)
    self._mpkit.eof = False                                         # EOF flag
    self._mpkit.trace = None                                        # flow tracer

    # preparation: the first frame starts right after the global header
    self.record_header()
    self._mpfdp[0].put(self._gbhdr.length)
    self._mpsrv = multiprocessing.Process(
        target=self._server_analyse_frame,
        kwargs={'mpfrm': self._mpfrm, 'mprsm': self._mprsm, 'mpbuf': self._mpbuf, 'mpkit': self._mpkit}
    )
    self._mpsrv.start()

    # extraction
    # NOTE(review): ``pool``/``counter`` updates through the manager namespace are
    # read-modify-write and not atomic across processes -- confirm this is acceptable.
    while True:
        # check EOF
        if self._mpkit.eof:
            self._update_eof()
            break

        # check counter
        if self._mpkit.pool and self._mpkit.counter < max_workers:
            # move to the offset published by the previous frame's worker
            self._ifile.seek(self._mpfdp.pop(self._frnum-1).get(), os.SEEK_SET)

            # create worker
            proc = multiprocessing.Process(
                target=self._server_extract_frame,
                kwargs={'mpkit': self._mpkit, 'mpbuf': self._mpbuf, 'mpfdp': self._mpfdp[self._frnum]}
            )

            # update status
            self._mpkit.pool -= 1
            self._mpkit.counter += 1

            # start and record
            proc.start()
            self._frnum += 1
            self._mpprc.append(proc)

        # check buffer: reap older workers so the process list stays bounded
        if len(self._mpprc) >= max_workers:
            for stale in self._mpprc[:-keep_alive]:
                stale.join()
            del self._mpprc[:-keep_alive]
def _server_extract_frame(self, *, mpfdp, mpkit, mpbuf):
    """Parse one PCAP frame on behalf of the multiprocessing server engine.

    A :class:`~pcapkit.protocols.pcap.Frame` is built from the input file and published in
    ``mpbuf`` under the key :attr:`self._frnum <Extractor._frnum>`, from where the server
    process :attr:`self._mpsrv <Extractor._mpsrv>` picks it up.

    On :exc:`EOFError`, the :exc:`EOFError` class itself is stored under that key as an
    end-of-stream marker, and :attr:`self._mpkit.eof <Extractor._mpkit.eof>` is set to
    :data:`True`. In every case ``mpkit.counter`` is then decremented by ``1`` and the
    input file is closed, as the child process is about to exit.

    Keyword Args:
        mpfdp (multiprocessing.Queue): :class:`~multiprocessing.Queue` for multiprocessing file pointer (offset).
        mpkit (multiprocessing.managers.SyncManager.Namespace):
            :class:`~multiprocessing.managers.SyncManager.Namespace` instance as :attr:`_mpkit`.
        mpbuf (multiprocessing.managers.SyncManager.dict): Frame buffer (temporary storage) for the server process
            :attr:`self._mpsrv <Extractor._mpsrv>` to obtain the parsed frames.

    Raises:
        EOFError: If :attr:`self._flag_e <pcapkit.foundation.extraction.Extractor._flag_e>`
            is :data:`True`, as the parsing had finished.

    """
    if self._flag_e:
        raise EOFError

    frame_no = self._frnum
    try:
        mpbuf[frame_no] = Frame(self._ifile, num=frame_no, proto=self._dlink, layer=self._exlyr,
                                protocol=self._exptl, nanosecond=self._nnsec, mpkit=mpkit, mpfdp=mpfdp)
    except EOFError:
        # the class object itself (not an instance) serves as the end-of-stream sentinel
        mpbuf[frame_no] = EOFError
        mpkit.eof = True
    finally:
        # this worker is done: release its slot and its file handle
        mpkit.counter -= 1
        self._ifile.close()
def _server_analyse_frame(self, *, mpkit, mpfrm, mprsm, mpbuf):
    """Analyse frame using multiprocessing server engine.

    This method runs a :token:`while <while_stmt>` loop. In each round it *pops* frame
    :attr:`self._frnum <Extractor._frnum>` from ``mpbuf``. If an entry is present, it calls
    :meth:`_default_read_frame` to perform datagram reassembly, TCP flow tracing, etc. If no
    entry is present yet (the frame has not been parsed), it simply tries again.

    Once the popped entry is :exc:`EOFError`, i.e. frame parsing has finished, it breaks from
    the loop. It then extends ``mpfrm`` with :attr:`self._frame <Extractor._frame>` and ``mprsm``
    with :attr:`self._reasm <Extractor._reasm>`, and stores a deep copy of
    :attr:`self._trace <Extractor._trace>` as ``mpkit.trace``.

    Keyword Args:
        mpkit (multiprocessing.managers.SyncManager.Namespace):
            :class:`~multiprocessing.managers.SyncManager.Namespace` instance as :attr:`_mpkit`.
        mpfrm (multiprocessing.managers.SyncManager.list): Frame storage.
        mprsm (multiprocessing.managers.SyncManager.list): Reassembly buffers.
        mpbuf (multiprocessing.managers.SyncManager.dict): Frame buffer (temporary storage) for the server process
            :attr:`self._mpsrv <Extractor._mpsrv>` to obtain the parsed frames.

    Note:
        NOTE(review): the loop polls ``mpbuf`` without any sleep between attempts, and it only
        moves on to the next key if :meth:`_default_read_frame` (not shown here) advances
        :attr:`self._frnum <Extractor._frnum>` -- confirm.

    """
    while True:
        # fetch frame
        # print(self._frnum, 'trying')
        frame = mpbuf.pop(self._frnum, None)
        # the EOFError class is the end-of-stream marker written by _server_extract_frame
        if frame is EOFError:
            break
        # frame not parsed yet -- poll again
        if frame is None:
            continue
        # print(self._frnum, 'get')
        self._default_read_frame(frame=frame)
    # publish results back to the parent through the manager proxies
    # (``+=`` on a manager list proxy extends the shared list in place)
    mpfrm += self._frame
    mprsm += self._reasm
    mpkit.trace = copy.deepcopy(self._trace)