Source code for pcapkit.foundation.analysis

# -*- coding: utf-8 -*-
# pylint: disable=import-outside-toplevel
"""analyser for application layer

:mod:`pcapkit.foundation.analysis` works as a header quarter to
analyse and match application layer protocol. Then, call
corresponding modules and functions to extract the attributes.

"""
import os

from pcapkit.protocols.raw import Raw
from pcapkit.utilities.decorators import seekset_ng
from pcapkit.utilities.exceptions import ProtocolError

###############################################################################
# from pcapkit.protocols.application.ftp import FTP
# from pcapkit.protocols.application.httpv1 import HTTPv1
# from pcapkit.protocols.application.httpv2 import HTTPv2
###############################################################################

__all__ = ['analyse']


[docs]def analyse(file, length=None, *, termination=False): """Analyse application layer packets. Args: file (io.BytesIO): source data stream length (Optional[int]): packet length Keyword Args: termination (bool): If terminate parsing application layer protocol. Returns: Protocol: Parsed application layer protocol. Notes: Currently, the analysis processes in following order: 1. :class:`~pcapkit.protocols.application.ftp.FTP` 2. :class:`HTTP/1.* <pcapkit.protocols.application.httpv1.HTTPv1>` 3. :class:`HTTP/2 <pcapkit.protocols.application.httpv2.HTTPv2>` and :class:`~pcapkit.protocols.raw.Raw` as the fallback result. """ seekset = file.tell() if not termination: # FTP analysis flag, ftp = _analyse_ftp(file, length, seekset=seekset) if flag: return ftp # HTTP/1.* analysis flag, http = _analyse_httpv1(file, length, seekset=seekset) if flag: return http # HTTP/2 analysis flag, http = _analyse_httpv2(file, length, seekset=seekset) if flag: return http # backup file offset file.seek(seekset, os.SEEK_SET) # raw packet analysis return Raw(file, length)
[docs]@seekset_ng def _analyse_httpv1(file, length=None, *, seekset=os.SEEK_SET): # pylint: disable=unused-argument """Analyse HTTP/1.* packet. Args: file (io.BytesIO): source data stream length (Optional[int]): packet length Keyword Args: seekset (int): original file offset Returns: Tuple[bool, Optional[HTTPv1]]: If the packet is HTTP/1.*, returns :data:`True` and parsed HTTP/1.* packet; otherwise returns :data:`False` and :data:`None`. """ try: from pcapkit.protocols.application.httpv1 import HTTPv1 http = HTTPv1(file, length) except ProtocolError: return False, None return True, http
[docs]@seekset_ng def _analyse_httpv2(file, length, *, seekset=os.SEEK_SET): # pylint: disable=unused-argument """Analyse HTTP/2 packet. Args: file (io.BytesIO): source data stream length (Optional[int]): packet length Keyword Args: seekset (int): original file offset Returns: Tuple[bool, Optional[HTTPv1]]: If the packet is HTTP/2, returns :data:`True` and parsed HTTP/2 packet; otherwise returns :data:`False` and :data:`None`. """ try: from pcapkit.protocols.application.httpv2 import HTTPv2 http = HTTPv2(file, length) except ProtocolError: return False, None return True, http
[docs]@seekset_ng def _analyse_ftp(file, length, *, seekset=os.SEEK_SET): # pylint: disable=unused-argument """Analyse FTP packet. Args: file (io.BytesIO): source data stream length (Optional[int]): packet length Keyword Args: seekset (int): original file offset Returns: Tuple[bool, Optional[HTTPv1]]: If the packet is FTP, returns :data:`True` and parsed FTP packet; otherwise returns :data:`False` and :data:`None`. """ try: from pcapkit.protocols.application.ftp import FTP ftp = FTP(file, length) except ProtocolError: return False, None return True, ftp