Source code for pcapkit.protocols.misc.raw
# -*- coding: utf-8 -*-
"""Raw Packet Data
=====================
:mod:`pcapkit.protocols.misc.raw` contains
:class:`~pcapkit.protocols.misc.raw.Raw` only, which implements
extractor for unknown protocol, and constructs a
:class:`~pcapkit.protocols.protocol.Protocol` like object.
"""
import io
from typing import TYPE_CHECKING, overload
from pcapkit.corekit.protochain import ProtoChain
from pcapkit.protocols.data.misc.raw import Raw as DataType_Raw
from pcapkit.protocols.misc.null import NoPayload
from pcapkit.protocols.protocol import Protocol
from pcapkit.utilities.exceptions import UnsupportedCall
if TYPE_CHECKING:
from typing import Any, BinaryIO, NoReturn, Optional
from typing_extensions import Literal
__all__ = ['Raw']
[docs]class Raw(Protocol[DataType_Raw]):
"""This class implements universal unknown protocol."""
##########################################################################
# Properties.
##########################################################################
# name of current protocol
@property
def name(self) -> 'Literal["Unknown"]':
"""Name of current protocol."""
return 'Unknown'
# header length of current protocol
@property
def length(self) -> 'Literal[0]':
"""Header length of current protocol."""
return 0
# name of next layer protocol
@property
def protocol(self) -> 'NoReturn':
"""Name of next layer protocol.
Raises:
UnsupportedCall: This protocol doesn't support :attr:`protocol`.
"""
raise UnsupportedCall(f"{self.__class__.__name__!r} object has no attribute 'protocol'")
##########################################################################
# Methods.
##########################################################################
[docs] def read(self, length: 'Optional[int]' = None, *, error: 'Optional[Exception]' = None, # pylint: disable=arguments-differ
alias: 'Optional[int]' = None, **kwargs: 'Any') -> 'DataType_Raw': # pylint: disable=unused-argument
"""Read raw packet data.
Args:
length: Length of packet data.
error: Parsing errors if any.
alias: Original enumeration of the unknown protocol.
**kwargs: Arbitrary keyword arguments.
Returns:
The parsed packet data.
"""
raw = DataType_Raw(
protocol=alias,
packet=self._data,
error=error,
)
return raw
[docs] def make(self, **kwargs: 'Any') -> 'bytes':
"""Make raw packet data.
Args:
packet (bytes): Raw packet data.
**kwargs: Arbitrary keyword arguments.
Returns:
Constructed packet data.
"""
packet = kwargs.get('packet', b'')
return packet
##########################################################################
# Data models.
##########################################################################
@overload
def __post_init__(self, file: 'BinaryIO', length: 'Optional[int]' = ..., **kwargs: 'Any') -> 'None': ...
@overload
def __post_init__(self, **kwargs: 'Any') -> 'None': ... # pylint: disable=arguments-differ
[docs] def __post_init__(self, file: 'Optional[BinaryIO]' = None,
length: 'Optional[int]' = None, **kwargs: 'Any') -> 'None':
"""Post initialisation hook.
Would :mod:`pcapkit` encounter malformed packets, the original parsing
error instance will be provided as in ``error``.
Args:
file: Source packet stream.
length: Length of packet data.
error (Optional[Exception]): Parsing errors if any (for parsing).
alias (Optional[int]): Original enumeration of the unknown protocol.
**kwargs: Arbitrary keyword arguments.
See Also:
For construction argument, please refer to :meth:`make`.
"""
if file is None:
_data = self.make(**kwargs)
else:
_data = file.read(length) # type: ignore[arg-type]
#: bytes: Raw packet data.
self._data = _data
#: io.BytesIO: Source packet stream.
self._file = io.BytesIO(self._data)
#: pcapkit.protocols.data.misc.raw.Raw: Parsed packet data.
self._info = self.read(length, **kwargs)
if self._info.protocol is not None and hasattr(self._info.protocol, 'name'):
alias = self._info.protocol.name # type: ignore[attr-defined]
else:
alias = self.alias
#: pcapkit.protocols.null.NoPayload: Next layer (no payload).
self._next = NoPayload()
#: pcapkit.corekit.protochain.ProtoChain: Protocol chain from current layer.
self._protos = ProtoChain(self.__class__, alias)
[docs] @classmethod
def __index__(cls) -> 'NoReturn':
"""Numeral registry index of the protocol.
Raises:
UnsupportedCall: This protocol has no registry entry.
"""
raise UnsupportedCall(f'{cls.__name__!r} object cannot be interpreted as an integer')