Source code for pcapkit.corekit.protochain

# -*- coding: utf-8 -*-
"""protocol chain collection

:mod:`pcapkit.corekit.protochain` contains special protocol
collection class :class:`~pcapkit.corekit.protochain.ProtoChain`.

"""
import collections.abc
import contextlib
import numbers
import re

from pcapkit.corekit.infoclass import Info
from pcapkit.utilities.compat import Collection, cached_property
from pcapkit.utilities.exceptions import IndexNotFound, IntError

###############################################################################
# from pcapkit.protocols.protocol import Protocol
###############################################################################

__all__ = ['ProtoChain']


[docs]class _ProtoList(Collection): """List of protocol classes for :class:`ProtoChain`.""" @property def data(self): """Protocol data. :rtype: List[pcapkit.protocols.protocol.Protocol] """ return self.__data__
[docs] def __init__(self, data=None, *, base=None): """Initialisation. Args: data (Optional[pcapkit.protocols.protocol.Protocol]): New protocol class on the top stack. Keyword Args: base (Union[pcapkit.corekit.protochain._ProtoList, List[pcapkit.protocols.protocol.Protocol]]): Original protocol class chain as base stacks. """ self.__data__ = list() if data is not None: self.__data__.append(data) if base is not None: if isinstance(base, _ProtoList): self.__data__.extend(base.data) else: self.__data__.extend(base)
[docs] def __len__(self): """Length of the protocol chain. :rtype: int """ return len(self.__data__)
[docs] def __iter__(self): """Iterate through the protocol chain. :rtype: Iterator[pcapkit.protocols.protocol.Protocol] """ return iter(self.__data__)
[docs] def __contains__(self, x): """Returns if ``x`` is in the chain. Args: x (Union[str, pcapkit.protocols.protocol.Protocol, Type[pcapkit.protocols.protocol, Protocol]]): name to search Returns: bool: if ``x`` is in the chain """ from pcapkit.protocols.protocol import Protocol # pylint: disable=import-outside-toplevel try: flag = issubclass(x, Protocol) except TypeError: flag = issubclass(type(x), Protocol) if flag or isinstance(x, Protocol): return x in self.__data__ with contextlib.suppress(Exception): for data in self.__data__: index = data.id() if isinstance(index, tuple): index = r'|'.join(index) if re.fullmatch(index, x, re.IGNORECASE): return True return False
[docs]class _AliasList(collections.abc.Sequence): """List of protocol aliases for ProtoChain""" @property def data(self): """Protocol alias data. :rtype: List[str] """ return self.__data__
[docs] def __init__(self, data=None, *, base=None): """Initialisation. Args: data (Optional[str]): New protocol alias on top stack. Keyword Args: base (Union[pcapkit.corekit.protochain._AliasLists, List[str]]): Original protocol alias chain as base stacks. """ self.__data__ = list() if data is not None: self.__data__.append(data) if base is not None: if isinstance(base, _AliasList): self.__data__.extend(base.data) else: self.__data__.extend(base)
[docs] def __len__(self): """Length of the protocol chain. :rtype: int """ return len(self.__data__)
[docs] def __iter__(self): """Iterate through the protocol chain. :rtype: Iterator[str] """ return iter(self.__data__)
[docs] def __getitem__(self, index): """Subscription (``getitem``) support. Args: index (int): Indexing key. Returns: str: Protocol alias at such index. """ return self.__data__[index]
[docs] def __reversed__(self): """Reverse the protocol alias chain. :rtype: List[str] """ return reversed(self.__data__)
[docs] def __contains__(self, x): """Returns if ``x`` is in the chain. Args: x (Union[str, pcapkit.protocols.protocol.Protocol, Type[pcapkit.protocols.protocol, Protocol]]): name to search Returns: bool: if ``x`` is in the chain """ from pcapkit.protocols.protocol import Protocol # pylint: disable=import-outside-toplevel try: flag = issubclass(x, Protocol) except TypeError: flag = issubclass(type(x), Protocol) if flag or isinstance(x, Protocol): x = x.id() if isinstance(x, tuple): x = r'|'.join(x) with contextlib.suppress(Exception): for data in self.__data__: if re.fullmatch(x, data, re.IGNORECASE): return True return False
[docs] def count(self, value): """Number of occurrences of ``value``. Args: value: (Union[str, pcapkit.protocols.protocol.Protocol, Type[pcapkit.protocols.protocol, Protocol]]): value to search Returns: int: Number of occurrences of ``value``. """ from pcapkit.protocols.protocol import Protocol # pylint: disable=import-outside-toplevel try: flag = issubclass(value, Protocol) except TypeError: flag = issubclass(type(value), Protocol) if flag or isinstance(value, Protocol): value = value.id() if isinstance(value, tuple): value = r'|'.join(value) with contextlib.suppress(Exception): return sum(1 for data in self.__data__ if re.fullmatch(value, data, re.IGNORECASE) is not None) return 0
[docs] def index(self, value, start=0, stop=None): # pylint: disable=inconsistent-return-statements """First index of ``value``. Args: value (Union[str, pcapkit.protocols.protocol.Protocol, Type[pcapkit.protocols.protocol, Protocol]]): value to search start (int): start offset stop (int): stop offset Returns: int: First index of ``value``. Raises: IntError: If the value is not present. """ if start is not None and start < 0: start = max(len(self) + start, 0) if stop is not None and stop < 0: stop += len(self) try: if not isinstance(start, numbers.Integral): start = self.index(start) if not isinstance(stop, numbers.Integral): stop = self.index(stop) except IndexNotFound: raise IntError('slice indices must be integers or have an __index__ method') from None from pcapkit.protocols.protocol import Protocol # pylint: disable=import-outside-toplevel try: flag = issubclass(value, Protocol) except TypeError: flag = issubclass(type(value), Protocol) if flag or isinstance(value, Protocol): value = value.id() if isinstance(value, tuple): value = r'|'.join(value) try: for index, data in enumerate(self.__data__[start:stop]): if re.fullmatch(value, data, re.IGNORECASE): return index except Exception: raise IndexNotFound(f'{value!r} is not in {self.__class__.__name__!r}') return -1
[docs]class ProtoChain(collections.abc.Container): """Protocols chain.""" ########################################################################## # Properties. ########################################################################## @property def proto(self): """Protocol classes chain. :rtype: pcapkit.corekit.protocol._ProtoList """ return self.__proto__ @property def alias(self): """Protocol aliases chain. :rtype: pcapkit.corekit.protocol._AliasList """ return self.__alias__ @cached_property def tuple(self): """Protocol names. :rtype: Tuple[str] """ return tuple(proto.__name__ for proto in self.__proto__.data) @property def chain(self): """Protocol chain string. :rtype: str """ return self.__str__() ########################################################################## # Methods. ##########################################################################
[docs] def index(self, value, start=None, stop=None): """First index of ``value``. Args: value (Union[str, pcapkit.protocols.protocol.Protocol, Type[pcapkit.protocols.protocol, Protocol]]): value to search start (int): start offset stop (int): stop offset Returns: int: First index of ``value``. Raises: IntError: If the value is not present. See Also: This method calls :meth:`self.__alias__.index <pcapkit.corekit.protochain._AliasList.index>` for the actual processing. """ return self.__alias__.index(value, start, stop)
[docs] def count(self, value): """Number of occurrences of ``value``. Args: value: (Union[str, pcapkit.protocols.protocol.Protocol, Type[pcapkit.protocols.protocol, Protocol]]): value to search Returns: int: Number of occurrences of ``value``. See Also: This method calls :meth:`self.__alias__.count <pcapkit.corekit.protochain._AliasList.count>` for the actual processing. """ return self.__alias__.count(value)
########################################################################## # Data modules. ##########################################################################
[docs] def __init__(self, proto=None, alias=None, *, basis=None): """Initialisation. Args: proto (Optional[pcapkit.protocols.protocol.Protocol]): New protocol class on the top stack. alias (Optional[str]): New protocol alias on the top stack. Keyword Args: basis (pcapkit.corekit.protochain.ProtoChain): Original protocol chain as base stacks. """ if alias is None and proto is not None: alias = getattr(proto, '__name__', type(proto).__name__) if basis is None: basis = Info(proto=None, alias=None) self.__proto__ = _ProtoList(proto, base=basis.proto) self.__alias__ = _AliasList(alias, base=basis.alias)
[docs] def __repr__(self): """Returns representation of protocol chain data. Example: >>> protochain ProtoChain(<class 'pcapkit.protocols.link.ethernet.Ethernet'>, ...) """ return f"ProtoChain({', '.join(map(lambda p: p.__name__, self.__proto__.data))})"
[docs] def __str__(self): """Returns formatted hex representation of source data stream. Example: >>> protochain ProtoChain(<class 'pcapkit.protocols.link.ethernet.Ethernet'>, ...) >>> print(protochain) Ethernet:IPv6:Raw """ return ':'.join(self.__alias__.data)
[docs] def __contains__(self, name): """Returns if ``name`` is in the chain. Args: name (Union[str, pcapkit.protocols.protocol.Protocol, Type[pcapkit.protocols.protocol, Protocol]]): name to search Returns: bool: if ``name`` is in the chain """ return (name in self.__proto__) or (name in self.__alias__)
[docs] def __getitem__(self, index): """Subscription (``getitem``) support. Args: index (int): Indexing key. Returns: str: Protocol alias at such index. """ return self.__alias__[index]