Source code for pcapkit.protocols.link.ospf
# -*- coding: utf-8 -*-
"""open shortest path first
:mod:`pcapkit.protocols.link.ospf` contains
:class:`~pcapkit.protocols.link.ospf.OSPF` only,
which implements extractor for Open Shortest Path
First (OSPF) [*]_, whose structure is described
as below:
+========+=======+====================+=================================+
| Octets | Bits | Name | Description |
+========+=======+====================+=================================+
| 0 | 0 | ``ospf.version`` | Version Number |
+--------+-------+--------------------+---------------------------------+
| 0 | 0 | ``ospf.type`` | Type |
+--------+-------+--------------------+---------------------------------+
| 0 | 1 | ``ospf.len`` | Packet Length (header included) |
+--------+-------+--------------------+---------------------------------+
| 0 | 2 | ``ospf.router_id`` | Router ID |
+--------+-------+--------------------+---------------------------------+
| 0 | 4 | ``ospf.area_id`` | Area ID |
+--------+-------+--------------------+---------------------------------+
| 0 | 6 | ``ospf.chksum`` | Checksum |
+--------+-------+--------------------+---------------------------------+
| 0 | 7 | ``ospf.autype`` | Authentication Type |
+--------+-------+--------------------+---------------------------------+
| 1 | 8 | ``ospf.auth`` | Authentication |
+--------+-------+--------------------+---------------------------------+
.. [*] https://en.wikipedia.org/wiki/Open_Shortest_Path_First
"""
import ipaddress
from pcapkit.const.ospf.authentication import Authentication as AUTH
from pcapkit.const.ospf.packet import Packet as TYPE
from pcapkit.protocols.link.link import Link
from pcapkit.utilities.exceptions import UnsupportedCall
__all__ = ['OSPF']
[docs]class OSPF(Link):
"""This class implements Open Shortest Path First."""
##########################################################################
# Properties.
##########################################################################
@property
def name(self):
"""Name of current protocol.
:rtype: str
"""
return f'Open Shortest Path First version {self._info.version}' # pylint: disable=E1101
@property
def alias(self):
"""Acronym of current protocol.
:rtype: str
"""
return f'OSPFv{self._info.version}' # pylint: disable=E1101
@property
def length(self):
"""Header length of current protocol.
:rtype: Literal[24]
"""
return 24
@property
def type(self):
"""OSPF packet type.
:rtype: pcapkit.const.ospf.packet.Packet
"""
return self._info.type # pylint: disable=E1101
##########################################################################
# Methods.
##########################################################################
[docs] def read(self, length=None, **kwargs): # pylint: disable=unused-argument
"""Read Open Shortest Path First.
Structure of OSPF header [:rfc:`2328`]::
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Version # | Type | Packet length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Router ID |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Area ID |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Checksum | AuType |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Authentication |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Authentication |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Args:
length (Optional[int]): Length of packet data.
Keyword Args:
**kwargs: Arbitrary keyword arguments.
Returns:
DataType_OSPF: Parsed packet data.
"""
if length is None:
length = len(self)
_vers = self._read_unpack(1)
_type = self._read_unpack(1)
_tlen = self._read_unpack(2)
_rtid = self._read_id_numbers()
_area = self._read_id_numbers()
_csum = self._read_fileng(2)
_autp = self._read_unpack(2)
ospf = dict(
version=_vers,
type=TYPE.get(_type),
len=_tlen,
router_id=_rtid,
area_id=_area,
chksum=_csum,
autype=AUTH.get(_autp) or 'Reserved',
)
if _autp == 2:
ospf['auth'] = self._read_encrypt_auth()
else:
ospf['auth'] = self._read_fileng(8)
length = ospf['len'] - 24
ospf['packet'] = self._read_packet(header=24, payload=length)
return self._decode_next_layer(ospf, length)
[docs] def make(self, **kwargs):
"""Make (construct) packet data.
Keyword Args:
**kwargs: Arbitrary keyword arguments.
Returns:
bytes: Constructed packet data.
"""
raise NotImplementedError
##########################################################################
# Data models.
##########################################################################
[docs] def __length_hint__(self):
"""Return an estimated length for the object.
:rtype: Literal[24]
"""
return 24
[docs] @classmethod
def __index__(cls): # pylint: disable=invalid-index-returned
"""Numeral registry index of the protocol.
Raises:
UnsupportedCall: This protocol has no registry entry.
"""
raise UnsupportedCall(f'{cls.__name__!r} object cannot be interpreted as an integer')
##########################################################################
# Utilities.
##########################################################################
[docs] def _read_id_numbers(self):
"""Read router and area IDs.
Returns:
IPv4Address: Parsed IDs as an IPv4 address.
"""
#_byte = self._read_fileng(4)
#_addr = '.'.join(str(_) for _ in _byte)
return ipaddress.ip_address(self._read_fileng(4))
[docs] def _read_encrypt_auth(self):
"""Read Authentication field when Cryptographic Authentication is employed,
i.e. :attr:`~OSPF.autype` is ``2``.
Structure of Cryptographic Authentication [:rfc:`2328`]::
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| 0 | Key ID | Auth Data Len |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Cryptographic sequence number |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Args:
length (int): packet length
Returns:
DataType_Auth: Parsed packet data.
class Auth(TypedDict):
\"\"\"Cryptographic authentication.\"\"\"
#: key ID
key_id: int
#: authentication data length
len: int
#: cryptographic sequence number
seq: int
"""
_resv = self._read_fileng(2)
_keys = self._read_unpack(1)
_alen = self._read_unpack(1)
_seqn = self._read_unpack(4)
auth = dict(
key_id=_keys,
len=_alen,
seq=_seqn,
)
return auth