Source code for pcapkit.protocols.internet.internet

# -*- coding: utf-8 -*-
"""Base Protocol
===================

:mod:`pcapkit.protocols.internet.internet` contains :class:`~pcapkit.protocols.internet.internet.Internet`,
which is a base class for internet layer protocols, e.g. :class:`~pcapkit.protocols.internet.ah.AH`,
:class:`~pcapkit.protocols.internet.ipsec.IPsec`, :class:`~pcapkit.protocols.internet.ipv4.IPv4`,
:class:`~pcapkit.protocols.internet.ipv6.IPv6`, :class:`~pcapkit.protocols.internet.ipx.IPX`, etc.

"""
import collections
import importlib
from typing import TYPE_CHECKING, Generic, cast

from pcapkit.const.reg.transtype import TransType as RegType_TransType
from pcapkit.corekit.protochain import ProtoChain
from pcapkit.protocols.protocol import PT, Protocol
from pcapkit.utilities.decorators import beholder

if TYPE_CHECKING:
    from typing import Optional, Type

    from typing_extensions import Literal

__all__ = ['Internet']


class Internet(Protocol[PT], Generic[PT]):  # pylint: disable=abstract-method
    """Abstract base class for internet layer protocol family.

    Next-layer protocols are resolved through the
    :attr:`self.__proto__ <pcapkit.protocols.internet.internet.Internet.__proto__>`
    registry, which ships with the following entries:

    .. list-table::
       :header-rows: 1

       * - Index
         - Protocol
       * - :attr:`~pcapkit.const.reg.transtype.TransType.HOPOPT`
         - :class:`pcapkit.protocols.internet.hopopt.HOPOPT`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.IPv4`
         - :class:`pcapkit.protocols.internet.ipv4.IPv4`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.TCP`
         - :class:`pcapkit.protocols.transport.tcp.TCP`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.UDP`
         - :class:`pcapkit.protocols.transport.udp.UDP`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.IPv6`
         - :class:`pcapkit.protocols.internet.ipv6.IPv6`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.IPv6_Route`
         - :class:`pcapkit.protocols.internet.ipv6_route.IPv6_Route`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.IPv6_Frag`
         - :class:`pcapkit.protocols.internet.ipv6_frag.IPv6_Frag`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.AH`
         - :class:`pcapkit.protocols.internet.ah.AH`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.IPv6_NoNxt`
         - :class:`pcapkit.protocols.misc.raw.Raw`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.IPv6_Opts`
         - :class:`pcapkit.protocols.internet.ipv6_opts.IPv6_Opts`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.IPX_in_IP`
         - :class:`pcapkit.protocols.internet.ipx.IPX`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.Mobility_Header`
         - :class:`pcapkit.protocols.internet.mh.MH`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.HIP`
         - :class:`pcapkit.protocols.internet.hip.HIP`

    """

    ##########################################################################
    # Defaults.
    ##########################################################################

    #: Layer of protocol.
    __layer__ = 'Internet'  # type: Literal['Internet']

    #: DefaultDict[int, Tuple[str, str]]: Protocol index mapping for decoding next layer,
    #: c.f. :meth:`self._decode_next_layer <pcapkit.protocols.internet.internet.Internet._decode_next_layer>`
    #: & :meth:`self._import_next_layer <pcapkit.protocols.internet.internet.Internet._import_next_layer>`.
    #: Each value is a ``(module, class name)`` pair; keys without an entry
    #: resolve to :class:`pcapkit.protocols.misc.raw.Raw`.
    __proto__ = collections.defaultdict(
        lambda: ('pcapkit.protocols.misc.raw', 'Raw'),
        {
            RegType_TransType.HOPOPT:          ('pcapkit.protocols.internet.hopopt', 'HOPOPT'),
            RegType_TransType.IPv4:            ('pcapkit.protocols.internet.ipv4', 'IPv4'),
            RegType_TransType.TCP:             ('pcapkit.protocols.transport.tcp', 'TCP'),
            RegType_TransType.UDP:             ('pcapkit.protocols.transport.udp', 'UDP'),
            RegType_TransType.IPv6:            ('pcapkit.protocols.internet.ipv6', 'IPv6'),
            RegType_TransType.IPv6_Route:      ('pcapkit.protocols.internet.ipv6_route', 'IPv6_Route'),
            RegType_TransType.IPv6_Frag:       ('pcapkit.protocols.internet.ipv6_frag', 'IPv6_Frag'),
            RegType_TransType.AH:              ('pcapkit.protocols.internet.ah', 'AH'),
            RegType_TransType.IPv6_NoNxt:      ('pcapkit.protocols.misc.raw', 'Raw'),
            RegType_TransType.IPv6_Opts:       ('pcapkit.protocols.internet.ipv6_opts', 'IPv6_Opts'),
            RegType_TransType.IPX_in_IP:       ('pcapkit.protocols.internet.ipx', 'IPX'),
            RegType_TransType.Mobility_Header: ('pcapkit.protocols.internet.mh', 'MH'),
            RegType_TransType.HIP:             ('pcapkit.protocols.internet.hip', 'HIP'),
        },
    )

    ##########################################################################
    # Properties.
    ##########################################################################

    # protocol layer
    @property
    def layer(self) -> 'Literal["Internet"]':
        """Protocol layer."""
        return self.__layer__

    ##########################################################################
    # Methods.
    ##########################################################################

    @classmethod
    def register(cls, code: 'RegType_TransType', module: str, class_: str) -> 'None':
        r"""Register a new protocol class.

        Notes:
            The new protocol class is later located by its fully qualified
            name, i.e. ``{module}.{class_}``.

        Arguments:
            code: protocol code as in :class:`~pcapkit.const.reg.transtype.TransType`
            module: module name
            class\_: class name

        """
        # stored as a (module, class name) pair and only imported on demand
        entry = (module, class_)
        cls.__proto__[code] = entry

    ##########################################################################
    # Utilities.
    ##########################################################################

    def _read_protos(self, size: 'int') -> 'RegType_TransType':
        """Read next layer protocol type.

        Arguments:
            size: buffer size

        Returns:
            Next layer's protocol enumeration.

        """
        return RegType_TransType.get(self._read_unpack(size))

    def _decode_next_layer(self, dict_: 'PT', proto: 'Optional[int]' = None,  # pylint: disable=arguments-differ
                           length: 'Optional[int]' = None, *, version: 'Literal[4, 6]' = 4,
                           ipv6_exthdr: 'Optional[ProtoChain]' = None) -> 'PT':
        r"""Decode next layer extractor.

        The next layer is built through :meth:`_import_next_layer`. Its info is
        written into ``dict_`` under the next layer's ``info_name``, the
        instance is kept as ``self._next``, and ``self._protos`` is rebuilt on
        top of the next layer's protocol chain.

        Arguments:
            dict\_: info buffer
            proto: next layer protocol index
            length: valid (*non-padding*) length
            version: IP version
            ipv6_exthdr: protocol chain of IPv6 extension headers; when given,
                it is placed in front of the next layer's chain

        Returns:
            Current protocol with next layer extracted.

        """
        upper = self._import_next_layer(proto, length, version=version)  # type: ignore[misc,call-arg,arg-type]
        upper_info = upper.info
        upper_chain = upper.protochain
        upper_name = upper.info_name

        # write info and protocol chain into dict
        dict_.__update__([(upper_name, upper_info)])
        self._next = upper  # pylint: disable=attribute-defined-outside-init

        if ipv6_exthdr is not None:
            # extension headers precede whatever chain the next layer reported
            upper_chain = ipv6_exthdr if upper_chain is None else ipv6_exthdr + upper_chain

        self._protos = ProtoChain(self.__class__, self.alias, basis=upper_chain)  # pylint: disable=attribute-defined-outside-init
        return dict_

    @beholder  # type: ignore[arg-type]
    def _import_next_layer(self, proto: 'int', length: 'Optional[int]' = None, *,  # pylint: disable=arguments-differ
                           version: 'Literal[4, 6]' = 4, extension: 'bool' = False) -> 'Protocol':
        """Import next layer extractor.

        The extractor class is chosen as follows: a ``length`` of zero selects
        :class:`~pcapkit.protocols.misc.null.NoPayload`; otherwise a truthy
        ``self._sigterm`` selects :class:`~pcapkit.protocols.misc.raw.Raw`;
        otherwise the class is looked up in ``self.__proto__`` by ``proto``
        and imported dynamically.

        Arguments:
            proto: next layer protocol index
            length: valid (*non-padding*) length
            version: IP protocol version
            extension: if is extension header

        Returns:
            Instance of next layer.

        """
        if TYPE_CHECKING:
            next_cls: 'Type[Protocol]'

        if length == 0:
            from pcapkit.protocols.misc.null import NoPayload as next_cls  # type: ignore[no-redef] # isort: skip # pylint: disable=import-outside-toplevel
        elif self._sigterm:
            from pcapkit.protocols.misc.raw import Raw as next_cls  # type: ignore[no-redef] # isort: skip # pylint: disable=import-outside-toplevel
        else:
            module_name, class_name = self.__proto__[proto]
            loaded = importlib.import_module(module_name)
            next_cls = cast('Type[Protocol]', getattr(loaded, class_name))

        return next_cls(self._file, length, version=version, extension=extension,  # type: ignore[abstract]
                        alias=proto, layer=self._exlayer, protocol=self._exproto)