Source code for pcapkit.protocols.transport.transport

# -*- coding: utf-8 -*-
"""Base Protocol
===================

:mod:`pcapkit.protocols.transport.transport` contains
:class:`~pcapkit.protocols.transport.transport.Transport`,
which is a base class for transport layer protocols, eg.
:class:`~pcapkit.protocols.transport.transport.tcp.TCP` and
:class:`~pcapkit.protocols.transport.transport.udp.UDP`.

"""
import importlib
import io
from typing import TYPE_CHECKING, Generic, cast

from pcapkit.protocols.protocol import PT, Protocol
from pcapkit.utilities.exceptions import StructError, UnsupportedCall, stacklevel
from pcapkit.utilities.logging import DEVMODE, logger

if TYPE_CHECKING:
    from typing import Any, Optional, Type

    from typing_extensions import Literal

__all__ = ['Transport']


[docs]class Transport(Protocol[PT], Generic[PT]): # pylint: disable=abstract-method """Abstract base class for transport layer protocol family.""" ########################################################################## # Defaults. ########################################################################## #: Layer of protocol. __layer__ = 'Transport' # type: Literal['Transport'] ########################################################################## # Properties. ########################################################################## # protocol layer @property def layer(self) -> 'Literal["Transport"]': """Protocol layer.""" return self.__layer__ ########################################################################## # Methods. ##########################################################################
[docs] @classmethod def register(cls, code: 'int', module: str, class_: str) -> 'None': """Register a new protocol class. Important: This method must be called from a non-abstract class, as the protocol map should be associated directly with specific transport layer protocol type. Arguments: code: port number module: module name class_: class name Notes: The full qualified class name of the new protocol class should be as ``{module}.{class_}``. """ if cls is Transport: raise UnsupportedCall(f'{cls.__name__} is an abstract class') cls.__proto__[code] = (module, class_)
[docs] @classmethod def analyze(cls, ports: 'tuple[int, int]', payload: 'bytes', **kwargs: 'Any') -> 'Protocol': # type: ignore[override] # pylint: disable=arguments-renamed """Analyse packet payload. Args: ports: Source & destination port numbers. payload: Packet payload. **kwargs: Arbitrary keyword arguments. Returns: Parsed payload as a :class:`~pcapkit.protocols.protocol.Protocol` instance. """ if ports[0] in cls.__proto__: module, name = cls.__proto__[ports[0]] else: module, name = cls.__proto__[ports[1]] protocol = cast('Type[Protocol]', getattr(importlib.import_module(module), name)) payload_io = io.BytesIO(payload) try: report = protocol(payload_io, len(payload), **kwargs) # type: ignore[abstract] except Exception as exc: if isinstance(exc, StructError) and exc.eof: # pylint: disable=no-member from pcapkit.protocols.misc.null import NoPayload as protocol # type: ignore[no-redef] # pylint: disable=import-outside-toplevel # isort:skip else: from pcapkit.protocols.misc.raw import Raw as protocol # type: ignore[no-redef] # pylint: disable=import-outside-toplevel # isort:skip # error = traceback.format_exc(limit=1).strip().rsplit(os.linesep, maxsplit=1)[-1] # log error logger.error(str(exc), exc_info=exc, stack_info=DEVMODE, stacklevel=stacklevel()) report = protocol(payload_io, len(payload), **kwargs) # type: ignore[abstract] return report
########################################################################## # Utilities. ##########################################################################
[docs] def _decode_next_layer(self, dict_: 'PT', ports: 'tuple[int, int]', length: 'Optional[int]' = None) -> 'PT': # type: ignore[override] # pylint: disable=arguments-renamed """Decode next layer protocol. The method will check if the next layer protocol is supported based on the source and destination port numbers. We will use the lower port number from both ports as the primary key to lookup the next layer. Arguments: dict_: info buffer ports: source & destination port numbers length: valid (*non-padding*) length Returns: Current protocol with next layer extracted. """ sort_port = sorted(ports) proto = sort_port[0] if sort_port[0] in self.__proto__ else sort_port[1] return super()._decode_next_layer(dict_, proto, length)