Source code for pcapkit.protocols.internet.internet

# -*- coding: utf-8 -*-
# pylint: disable=bad-whitespace
"""root internet layer protocol

:mod:`pcapkit.protocols.internet.internet` contains :class:`~pcapkit.protocols.internet.internet.Internet`,
which is a base class for internet layer protocols, eg. :class:`~pcapkit.protocols.internet.ah.AH`,
:class:`~pcapkit.protocols.internet.ipsec.IPsec`, :class:`~pcapkit.protocols.internet.ipv4.IPv4`,
:class:`~pcapkit.protocols.internet.ipv6.IPv6`, :class:`~pcapkit.protocols.internet.ipx.IPX`, and etc.

"""
import collections
import importlib

from pcapkit.const.reg.ethertype import EtherType as ETHERTYPE
from pcapkit.const.reg.transtype import TransType as TP_PROTO
from pcapkit.corekit.protochain import ProtoChain
from pcapkit.protocols.protocol import Protocol
from pcapkit.utilities.decorators import beholder

__all__ = ['Internet', 'ETHERTYPE']


[docs]class Internet(Protocol): # pylint: disable=abstract-method """Abstract base class for internet layer protocol family.""" ########################################################################## # Defaults. ########################################################################## #: Layer of protocol. __layer__ = 'Internet' #: DefaultDict[int, Tuple[str, str]]: Protocol index mapping for decoding next layer, #: c.f. :meth:`self._decode_next_layer <pcapkit.protocols.protocol.Protocol._decode_next_layer>` #: & :meth:`self._import_next_layer <pcapkit.protocols.internet.link.Link._import_next_layer>`. __proto__ = collections.defaultdict(lambda: ('pcapkit.protocols.raw', 'Raw'), { 0: ('pcapkit.protocols.internet.hopopt', 'HOPOPT'), 4: ('pcapkit.protocols.internet.ipv4', 'IPv4'), 6: ('pcapkit.protocols.transport.tcp', 'TCP'), 17: ('pcapkit.protocols.transport.udp', 'UDP'), 41: ('pcapkit.protocols.internet.ipv6', 'IPv6'), 43: ('pcapkit.protocols.internet.ipv6_route', 'IPv6_Route'), 44: ('pcapkit.protocols.internet.ipv6_frag', 'IPv6_Frag'), 51: ('pcapkit.protocols.internet.ah', 'AH'), 60: ('pcapkit.protocols.internet.ipv6_opts', 'IPv6_Opts'), 111: ('pcapkit.protocols.internet.ipx', 'IPX'), 135: ('pcapkit.protocols.internet.mh', 'MH'), 139: ('pcapkit.protocols.internet.hip', 'HIP'), }) ########################################################################## # Properties. ########################################################################## # protocol layer @property def layer(self): """Protocol layer. :rtype: Literal['Internet'] """ return self.__layer__ ########################################################################## # Utilities. ##########################################################################
[docs] def _read_protos(self, size): """Read next layer protocol type. Arguments: size (int): buffer size Returns: pcapkit.const.reg.transtype.TransType: next layer's protocol enumeration """ _byte = self._read_unpack(size) _prot = TP_PROTO.get(_byte) return _prot
[docs] def _decode_next_layer(self, dict_, proto=None, length=None, *, version=4, ipv6_exthdr=None): # pylint: disable=arguments-differ """Decode next layer extractor. Arguments: dict_ (dict): info buffer proto (int): next layer protocol index length (int): valid (*non-padding*) length Keyword Arguments: version (Literal[4, 6]): IP version ipv6_exthdr (pcapkit.corekit.protochain.ProtoChain): protocol chain of IPv6 extension headers Returns: dict: current protocol with next layer extracted """ if self._onerror: next_ = beholder(self._import_next_layer)(self, proto, length, version=version) else: next_ = self._import_next_layer(proto, length, version=version) info, chain = next_.info, next_.protochain # make next layer protocol name layer = next_.alias.lower() # proto = next_.__class__.__name__ # write info and protocol chain into dict dict_[layer] = info self._next = next_ # pylint: disable=attribute-defined-outside-init if ipv6_exthdr is not None: for proto_cls in reversed(ipv6_exthdr): chain = ProtoChain(proto_cls.__class__, proto_cls.alias, basis=chain) self._protos = ProtoChain(self.__class__, self.alias, basis=chain) # pylint: disable=attribute-defined-outside-init return dict_
[docs] def _import_next_layer(self, proto, length=None, *, version=4, extension=False): # pylint: disable=arguments-differ """Import next layer extractor. This method currently supports following protocols as registered in :data:`~pcapkit.const.reg.transtype.TransType`: .. list-table:: :header-rows: 1 * - ``proto`` - Class * - 0 - :class:`~pcapkit.protocols.internet.hopopt.HOPOPT` * - 4 - :class:`~pcapkit.protocols.internet.ipv4.IPv4` * - 6 - :class:`~pcapkit.protocols.transport.tcp.TCP` * - 17 - :class:`~pcapkit.protocols.transport.udp.UDP` * - 41 - :class:`~pcapkit.protocols.internet.ipv6.IPv6` * - 43 - :class:`~pcapkit.protocols.internet.ipv6_route.IPv6_Route` * - 44 - :class:`~pcapkit.protocols.internet.ipv6_frag.IPv6_Frag` * - 51 - :class:`~pcapkit.protocols.internet.ah.AH` * - 60 - :class:`~pcapkit.protocols.internet.ipv6_opts.IPv6_Opts` * - 111 - :class:`~pcapkit.protocols.internet.ipx.IPX` * - 135 - :class:`~pcapkit.protocols.internet.mh.MH` * - 139 - :class:`~pcapkit.protocols.internet.hip.HIP` Arguments: proto (int): next layer protocol index length (int): valid (*non-padding*) length Keyword Arguments: version (Literal[4, 6]): IP protocol version extension (bool): if is extension header Returns: pcapkit.protocols.protocol.Protocol: instance of next layer """ if length == 0: from pcapkit.protocols.null import NoPayload as protocol # pylint: disable=import-outside-toplevel elif self._sigterm or proto == 59: # No Next Header for IPv6 from pcapkit.protocols.raw import Raw as protocol # pylint: disable=import-outside-toplevel else: module, name = self.__proto__[proto] protocol = getattr(importlib.import_module(module), name) next_ = protocol(self._file, length, version=version, extension=extension, error=self._onerror, layer=self._exlayer, protocol=self._exproto) return next_