# Source code for pcapkit.foundation.analysis
# -*- coding: utf-8 -*-
# pylint: disable=import-outside-toplevel
"""analyser for application layer
:mod:`pcapkit.foundation.analysis` works as a headquarters to
analyse and match application layer protocols. It then calls the
corresponding modules and functions to extract the attributes.
"""
import importlib
import os
from pcapkit.protocols.raw import Raw
from pcapkit.utilities.decorators import seekset_ng
from pcapkit.utilities.exceptions import ProtocolError
###############################################################################
# from pcapkit.protocols.application.ftp import FTP
# from pcapkit.protocols.application.httpv1 import HTTPv1
# from pcapkit.protocols.application.httpv2 import HTTPv2
###############################################################################
__all__ = ['analyse']

#: List of protocols supported by the analyser, stored as
#: ``(module name, class name)`` string pairs. :func:`analyse` walks this
#: list in order and imports each module by name when it is called, so the
#: order of the entries is the order in which protocols are tried.
#: :func:`register` appends or inserts further entries at runtime.
ANALYSE_PROTO = [
    ('pcapkit.protocols.application.ftp', 'FTP'),
    ('pcapkit.protocols.application.httpv1', 'HTTPv1'),
    ('pcapkit.protocols.application.httpv2', 'HTTPv2'),
]
def analyse(file, length=None, *, termination=False):
    """Analyse application layer packets.

    The current offset of ``file`` is recorded first. Unless ``termination``
    is set, every ``(module, class)`` pair in
    :data:`~pcapkit.foundation.analysis.ANALYSE_PROTO` is tried in list
    order: the class is looked up by name and handed to :func:`_analyse`,
    and the first non-:data:`None` result is returned. Entries whose module
    cannot be imported, or whose class attribute is missing, are skipped.

    If nothing matched (or ``termination`` is set), ``file`` is moved back
    to the recorded offset and the data is wrapped as
    :class:`~pcapkit.protocols.raw.Raw`.

    Args:
        file (io.BytesIO): source data stream
        length (Optional[int]): packet length

    Keyword Args:
        termination (bool): If true, skip application layer protocol
            matching and return the raw packet directly.

    Returns:
        Protocol: Parsed application layer protocol.

    Notes:
        With the default contents of
        :data:`~pcapkit.foundation.analysis.ANALYSE_PROTO`, the analysis
        proceeds in the following order:

        1. :class:`~pcapkit.protocols.application.ftp.FTP`
        2. :class:`HTTP/1.* <pcapkit.protocols.application.httpv1.HTTPv1>`
        3. :class:`HTTP/2 <pcapkit.protocols.application.httpv2.HTTPv2>`

        and :class:`~pcapkit.protocols.raw.Raw` as the fallback result.

    See Also:
        The analysis order is defined by
        :data:`~pcapkit.foundation.analysis.ANALYSE_PROTO`.

    """
    origin = file.tell()

    # An empty candidate sequence makes the loop a no-op when terminating.
    candidates = () if termination else ANALYSE_PROTO
    for mod_name, cls_name in candidates:
        try:
            proto_cls = getattr(importlib.import_module(mod_name), cls_name)
        except (ImportError, AttributeError):
            # protocol implementation unavailable; try the next one
            continue

        parsed = _analyse(proto_cls, file, length, seekset=origin)
        if parsed is not None:
            return parsed

    # rewind to where we started before falling back
    file.seek(origin, os.SEEK_SET)

    # raw packet analysis
    return Raw(file, length)
@seekset_ng
def _analyse(protocol, file, length=None, *, seekset=os.SEEK_SET):  # pylint: disable=unused-argument
    """Try to parse a packet with one protocol class.

    Args:
        protocol (Protocol): target protocol class
        file (io.BytesIO): source data stream
        length (Optional[int]): packet length

    Keyword Args:
        seekset (int): original file offset; not read in this body
            (presumably consumed by the ``seekset_ng`` decorator --
            confirm against its implementation).

    Returns:
        Optional[Protocol]: The packet built by ``protocol(file, length)``,
        or :data:`None` if construction raised
        :exc:`~pcapkit.utilities.exceptions.ProtocolError`.

    """
    try:
        return protocol(file, length)
    except ProtocolError:
        # data does not match this protocol
        return None
def register(module, class_, *, index=None):
    """Register a new protocol class.

    The ``(module, class_)`` pair is added to
    :data:`~pcapkit.foundation.analysis.ANALYSE_PROTO`; no import or
    validation happens here.

    Args:
        module (str): module name
        class_ (str): class name

    Keyword Args:
        index (Optional[int]): Position at which the pair is inserted into
            :data:`~pcapkit.foundation.analysis.ANALYSE_PROTO`. When
            :data:`None`, the pair is appended to the end.

    Notes:
        The fully qualified name of the new protocol class
        should be ``{module}.{class_}``.

    """
    entry = (module, class_)

    if index is not None:
        ANALYSE_PROTO.insert(index, entry)
        return

    ANALYSE_PROTO.append(entry)