Source code for pcapkit.protocols.internet.ipv6_route

# -*- coding: utf-8 -*-
"""IPv6-Route - Routing Header for IPv6
==========================================

:mod:`pcapkit.protocols.internet.ipv6_route` contains
:class:`~pcapkit.protocols.internet.ipv6_route.IPv6_Route`
only, which implements extractor for Routing Header for IPv6
(IPv6-Route) [*]_, whose structure is described as below:

======= ========= ==================== ===============================
Octets      Bits        Name                    Description
======= ========= ==================== ===============================
  0           0   ``route.next``            Next Header
  1           8   ``route.length``          Header Extensive Length
  2          16   ``route.type``            Routing Type
  3          24   ``route.seg_left``        Segments Left
  4          32   ``route.data``            Type-Specific Data
======= ========= ==================== ===============================

.. [*] https://en.wikipedia.org/wiki/IPv6_packet#Routing

"""
import collections
import ipaddress
from typing import TYPE_CHECKING, overload

from pcapkit.const.ipv6.routing import Routing as RegType_Routing
from pcapkit.const.reg.transtype import TransType as RegType_TransType
from pcapkit.protocols.data.internet.ipv6_route import RPL as DataType_RPL
from pcapkit.protocols.data.internet.ipv6_route import IPv6_Route as DataType_IPv6_Route
from pcapkit.protocols.data.internet.ipv6_route import SourceRoute as DataType_SourceRoute
from pcapkit.protocols.data.internet.ipv6_route import Type2 as DataType_Type2
from pcapkit.protocols.data.internet.ipv6_route import UnknownType as DataType_UnknownType
from pcapkit.protocols.internet.internet import Internet
from pcapkit.utilities.exceptions import ProtocolError, UnsupportedCall

if TYPE_CHECKING:
    from ipaddress import IPv6Address
    from typing import Any, BinaryIO, Callable, DefaultDict, NoReturn, Optional

    from typing_extensions import Literal

    from pcapkit.corekit.protochain import ProtoChain
    from pcapkit.protocols.data.internet.ipv6_route import RoutingType as DataType_RoutingType
    from pcapkit.protocols.protocol import Protocol

    TypeParser = Callable[['IPv6_Route', int], DataType_RoutingType]

__all__ = ['IPv6_Route']


[docs]class IPv6_Route(Internet[DataType_IPv6_Route]): """This class implements Routing Header for IPv6. This class currently supports parsing of the following Routing Header for IPv6 routing data types, which are registered in the :attr:`self.__routing__ <pcapkit.protocols.internet.ipv6_route.IPv6_Route.__routing__>` attribute: .. list-table:: :header-rows: 1 * - Routing Code - Data Parser * - :attr:`~pcapkit.const.ipv6.routing.Routing.Source_Route` - :meth:`~pcapkit.protocols.internet.ipv6_route.IPv6_Route._read_data_type_src` * - :attr:`~pcapkit.const.ipv6.routing.Routing.Type_2_Routing_Header` - :meth:`~pcapkit.protocols.internet.ipv6_route.IPv6_Route._read_data_type_2` * - :attr:`~pcapkit.const.ipv6.routing.Routing.RPL_Source_Route_Header` - :meth:`~pcapkit.protocols.internet.ipv6_route.IPv6_Route._read_data_type_rpl` """ ########################################################################## # Defaults. ########################################################################## #: DefaultDict[RegType_Routing, str | TypeParser]: Type code to method #: mapping. Method names are expected to be referred to the class by #: ``_read_data_type_${name}``, and if such name not found, the value should #: then be a method that can parse the routing type by itself. __routing__ = collections.defaultdict( lambda: 'none', { RegType_Routing.Source_Route: 'src', # [RFC 5095] DEPRECATED RegType_Routing.Type_2_Routing_Header: '2', # [RFC 6275] RegType_Routing.RPL_Source_Route_Header: 'rpl', # [RFC 6554] }, ) # type: DefaultDict[int, str | TypeParser] ########################################################################## # Properties. ########################################################################## @property def name(self) -> 'Literal["Routing Header for IPv6"]': """Name of current protocol.""" return 'Routing Header for IPv6' @property def alias(self) -> 'Literal["IPv6-Route"]': """Acronym of corresponding protocol.""" return 'IPv6-Route' @property def length(self) -> 'int': """Header length of current protocol.""" return self._info.length @property def payload(self) -> 'Protocol | NoReturn': """Payload of current instance. Raises: UnsupportedCall: if the protocol is used as an IPv6 extension header :rtype: pcapkit.protocols.protocol.Protocol """ if self._extf: raise UnsupportedCall(f"'{self.__class__.__name__}' object has no attribute 'payload'") return self._next @property def protocol(self) -> 'Optional[str] | NoReturn': """Name of next layer protocol (if any). Raises: UnsupportedCall: if the protocol is used as an IPv6 extension header """ if self._extf: raise UnsupportedCall(f"'{self.__class__.__name__}' object has no attribute 'protocol'") return super().protocol @property def protochain(self) -> 'ProtoChain | NoReturn': """Protocol chain of current instance. Raises: UnsupportedCall: if the protocol is used as an IPv6 extension header """ if self._extf: raise UnsupportedCall(f"'{self.__class__.__name__}' object has no attribute 'protochain'") return super().protochain ########################################################################## # Methods. ##########################################################################
[docs] def read(self, length: 'Optional[int]' = None, *, extension: 'bool' = False, # pylint: disable=arguments-differ **kwargs: 'Any') -> 'DataType_IPv6_Route': # pylint: disable=unused-argument """Read Routing Header for IPv6. Structure of IPv6-Route header [:rfc:`8200`][:rfc:`5095`]: .. code-block:: text +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Next Header | Hdr Ext Len | Routing Type | Segments Left | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | | . . . type-specific data . . . | | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Args: length: Length of packet data. extension: If the packet is used as an IPv6 extension header. **kwargs: Arbitrary keyword arguments. Returns: Parsed packet data. """ if length is None: length = len(self) _next = self._read_protos(1) _hlen = self._read_unpack(1) _type = self._read_unpack(1) _left = self._read_unpack(1) ipv6_route = DataType_IPv6_Route( next=_next, length=(_hlen + 1) * 8, type=RegType_Routing.get(_type), seg_left=_left, ) _dlen = _hlen * 8 - 4 if _dlen: _name = self.__routing__[ipv6_route.type] # type: str | TypeParser if isinstance(_name, str): _name = f'_read_data_type_{_name.lower()}' _meth = getattr(self, _name, self._read_data_type_none) # type: Callable[[int], DataType_RoutingType] _data = _meth(_dlen) else: _data = _name(self, _dlen) ipv6_route.__update__(**_data) if extension: return ipv6_route return self._decode_next_layer(ipv6_route, _next, length - ipv6_route.length)
[docs] def make(self, **kwargs: 'Any') -> 'NoReturn': """Make (construct) packet data. Args: **kwargs: Arbitrary keyword arguments. Returns: Constructed packet data. """ raise NotImplementedError
[docs] @classmethod def register_routing(cls, code: 'RegType_Routing', meth: 'str | TypeParser') -> 'None': """Register an routing data parser. Args: code: IPv6-Route data type code. meth: Method name or callable to parse the data. """ cls.__routing__[code] = meth
########################################################################## # Data models. ########################################################################## @overload def __post_init__(self, file: 'BinaryIO', length: 'Optional[int]' = ..., *, # pylint: disable=arguments-differ extension: 'bool' = ..., **kwargs: 'Any') -> 'None': ... @overload def __post_init__(self, **kwargs: 'Any') -> 'None': ... # pylint: disable=arguments-differ
[docs] def __post_init__(self, file: 'Optional[BinaryIO]' = None, length: 'Optional[int]' = None, *, # pylint: disable=arguments-differ extension: 'bool' = False, **kwargs: 'Any') -> 'None': """Post initialisation hook. Args: file: Source packet stream. length: Length of packet data. Keyword Args: extension: If the protocol is used as an IPv6 extension header. **kwargs: Arbitrary keyword arguments. See Also: For construction argument, please refer to :meth:`make`. """ #: bool: If the protocol is used as an IPv6 extension header. self._extf = extension # call super __post_init__ super().__post_init__(file, length, extension=extension, **kwargs) # type: ignore[arg-type]
def __length_hint__(self) -> 'Literal[4]': """Return an estimated length for the object.""" return 4
[docs] @classmethod def __index__(cls) -> 'RegType_TransType': # pylint: disable=invalid-index-returned """Numeral registry index of the protocol. Returns: Numeral registry index of the protocol in `IANA`_. .. _IANA: https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml """ return RegType_TransType.IPv6_Route # type: ignore[return-value]
########################################################################## # Utilities. ##########################################################################
[docs] def _read_data_type_none(self, length: 'int') -> 'DataType_UnknownType': """Read IPv6-Route unknown type data. Structure of IPv6-Route unknown type data [:rfc:`8200`][:rfc:`5095`]: .. code-block:: text +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Next Header | Hdr Ext Len | Routing Type | Segments Left | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | | . . . type-specific data . . . | | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Args: length: route data length Returns: Parsed route data. """ _data = self._read_fileng(length) data = DataType_UnknownType( data=_data, ) return data
[docs] def _read_data_type_src(self, length: 'int') -> 'DataType_SourceRoute': """Read IPv6-Route Source Route data. Structure of IPv6-Route Source Route data [:rfc:`5095`]: .. code-block:: text +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Next Header | Hdr Ext Len | Routing Type=0| Segments Left | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Reserved | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | | + + | | + Address[1] + | | + + | | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | | + + | | + Address[2] + | | + + | | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ . . . . . . . . . +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | | + + | | + Address[n] + | | + + | | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Args: length: route data length Returns: Parsed route data. """ _resv = self._read_fileng(4) _addr = [] # type: list[IPv6Address] for _ in range((length - 4) // 16): _addr.append(ipaddress.ip_address(self._read_fileng(16))) # type: ignore[arg-type] data = DataType_SourceRoute( ip=tuple(_addr), ) return data
[docs] def _read_data_type_2(self, length: 'int') -> 'DataType_Type2': """Read IPv6-Route Type 2 data. Structure of IPv6-Route Type 2 data [:rfc:`6275`]: .. code-block:: text +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Next Header | Hdr Ext Len=2 | Routing Type=2|Segments Left=1| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Reserved | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | | + + | | + Home Address + | | + + | | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Args: length: route data length Returns: Parsed route data. Raises: ProtocolError: If ``length`` is **NOT** ``20``. """ if length != 20: raise ProtocolError(f'{self.alias}: [TypeNo 2] invalid format') _resv = self._read_fileng(4) _home = self._read_fileng(16) data = DataType_Type2( ip=ipaddress.ip_address(_home), # type: ignore[arg-type] # type: ignore[arg-type] ) return data
[docs] def _read_data_type_rpl(self, length: 'int') -> 'DataType_RPL': """Read IPv6-Route RPL Source data. Structure of IPv6-Route RPL Source data [:rfc:`6554`]: .. code-block:: text 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Next Header | Hdr Ext Len | Routing Type | Segments Left | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | CmprI | CmprE | Pad | Reserved | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | | . . . Addresses[1..n] . . . | | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Args: length: route data length Returns: Parsed route data. Raises: ProtocolError: If ``length`` is **NOT** ``20``. """ _cmpr = self._read_binary(1) _padr = self._read_binary(1) _resv = self._read_fileng(2) _inti = int(_cmpr[:4], base=2) _inte = int(_cmpr[4:], base=2) _plen = int(_padr[:4], base=2) _ilen = 16 - _inti _elen = 16 - _inte _addr = [] # type: list[IPv6Address] for _ in range(((length - 4) - _elen - _plen) // _ilen): _addr.append(ipaddress.ip_address(self._read_fileng(_ilen))) # type: ignore[arg-type] _addr.append(ipaddress.ip_address(self._read_fileng(_elen))) # type: ignore[arg-type] _pads = self._read_fileng(_plen) data = DataType_RPL( cmpr_i=_inti, cmpr_e=_inte, pad=_plen, ip=tuple(_addr), ) return data