Source code for pcapkit.protocols.internet.ipv6
# -*- coding: utf-8 -*-
"""internet protocol version 6
:mod:`pcapkit.protocols.internet.ipv6` contains
:class:`~pcapkit.protocols.internet.ipv6.IPv6` only,
which implements extractor for Internet Protocol
version 6 (IPv6) [*]_, whose structure is described
as below:
======= ========= ===================== =======================================
Octets Bits Name Description
======= ========= ===================== =======================================
0 0 ``ip.version`` Version (``6``)
0 4 ``ip.class`` Traffic Class
1 12 ``ip.label`` Flow Label
4 32 ``ip.payload`` Payload Length (header excludes)
6 48 ``ip.next`` Next Header
7 56 ``ip.limit`` Hop Limit
8 64 ``ip.src`` Source Address
24 192 ``ip.dst`` Destination Address
======= ========= ===================== =======================================
.. [*] https://en.wikipedia.org/wiki/IPv6_packet
"""
import ipaddress
from pcapkit.const.ipv6.extension_header import ExtensionHeader as EXT_HDR
from pcapkit.const.reg.transtype import TransType
from pcapkit.protocols.internet.ip import IP
__all__ = ['IPv6']
[docs]class IPv6(IP):
"""This class implements Internet Protocol version 6."""
##########################################################################
# Properties.
##########################################################################
@property
def name(self):
"""Name of corresponding protocol.
:rtype: Literal['Internet Protocol version 6']
"""
return 'Internet Protocol version 6'
@property
def length(self):
"""Header length of corresponding protocol.
:rtype: int
"""
return self._info.hdr_len # pylint: disable=E1101
@property
def protocol(self):
"""Name of next layer protocol.
:rtype: pcapkit.const.reg.transtype.TransType
"""
return self._info.protocol # pylint: disable=E1101
##########################################################################
# Methods.
##########################################################################
[docs] def read(self, length=None, **kwargs): # pylint: disable=unused-argument
"""Read Internet Protocol version 6 (IPv6).
Structure of IPv6 header [:rfc:`2460`]::
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|Version| Traffic Class | Flow Label |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Payload Length | Next Header | Hop Limit |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| |
+ +
| |
+ Source Address +
| |
+ +
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| |
+ +
| |
+ Destination Address +
| |
+ +
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Args:
length (Optional[int]): Length of packet data.
Keyword Args:
**kwargs: Arbitrary keyword arguments.
Returns:
DataType_IPv6: Parsed packet data.
"""
if length is None:
length = len(self)
_htet = self._read_ip_hextet()
_plen = self._read_unpack(2)
_next = self._read_protos(1)
_hlmt = self._read_unpack(1)
_srca = self._read_ip_addr()
_dsta = self._read_ip_addr()
ipv6 = {
'version': _htet[0],
'class': _htet[1],
'label': _htet[2],
'payload': _plen,
'next': _next,
'limit': _hlmt,
'src': _srca,
'dst': _dsta,
}
hdr_len = 40
raw_len = ipv6['payload']
ipv6['packet'] = self._read_packet(header=hdr_len, payload=raw_len)
return self._decode_next_layer(ipv6, _next, raw_len)
[docs] def make(self, **kwargs):
"""Make (construct) packet data.
Keyword Args:
**kwargs: Arbitrary keyword arguments.
Returns:
bytes: Constructed packet data.
"""
raise NotImplementedError
[docs] @classmethod
def id(cls):
"""Index ID of the protocol.
Returns:
Literal['IPv6']: Index ID of the protocol.
"""
return cls.__name__
##########################################################################
# Data models.
##########################################################################
[docs] def __length_hint__(self):
"""Return an estimated length for the object.
:rtype: Literal[40]
"""
return 40
[docs] @classmethod
def __index__(cls): # pylint: disable=invalid-index-returned
"""Numeral registry index of the protocol.
Returns:
pcapkit.const.reg.transtype.TransType: Numeral registry index of the
protocol in `IANA`_.
.. _IANA: https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml
"""
return TransType(41)
##########################################################################
# Utilities.
##########################################################################
[docs] def _read_ip_hextet(self):
"""Read first four hextets of IPv6.
Returns:
Tuple[int, int, int]: Parsed hextets data, including version number,
traffic class and flow label.
"""
_htet = self._read_fileng(4).hex()
_vers = int(_htet[0], base=16) # version number (6)
_tcls = int(_htet[0:2], base=16) # traffic class
_flow = int(_htet[2:], base=16) # flow label
return (_vers, _tcls, _flow)
[docs] def _read_ip_addr(self):
"""Read IP address.
Returns:
ipaddress.IPv6Address: Parsed IP address.
"""
return ipaddress.ip_address(self._read_fileng(16))
[docs] def _decode_next_layer(self, ipv6, proto=None, length=None): # pylint: disable=arguments-differ
"""Decode next layer extractor.
Arguments:
ipv6 (DataType_IPv6): info buffer
proto (str): next layer protocol name
length (int): valid (*not padding*) length
Returns:
DataType_IPv6: current protocol with next layer extracted
"""
hdr_len = 40 # header length
raw_len = ipv6['payload'] # payload length
_protos = list() # ProtoChain buffer
# traverse if next header is an extensive header
while proto in EXT_HDR:
# keep original data after fragment header
if proto.value == 44:
ipv6['fragment'] = self._read_packet(header=hdr_len, payload=raw_len)
# # directly break when No Next Header occurs
# if proto.name == 'IPv6-NoNxt':
# proto = None
# break
# make protocol name
next_ = self._import_next_layer(proto, version=6, extension=True)
info = next_.info
name = next_.alias.lstrip('IPv6-').lower()
ipv6[name] = info
# record protocol name
# self._protos = ProtoChain(name, chain, alias)
_protos.append(next_)
proto = info.next # pylint: disable=E1101
# update header & payload length
hdr_len += info.length # pylint: disable=E1101
raw_len -= info.length # pylint: disable=E1101
# record real header & payload length (headers exclude)
ipv6['hdr_len'] = hdr_len
ipv6['raw_len'] = raw_len
# update next header
ipv6['protocol'] = proto
return super()._decode_next_layer(ipv6, proto, raw_len, ipv6_exthdr=_protos)