Source code for pcapkit.protocols.internet.ipv4

# -*- coding: utf-8 -*-
"""internet protocol version 4

:mod:`pcapkit.protocols.internet.ipv4` contains
:class:`~pcapkit.protocols.internet.ipv4.IPv4` only,
which implements extractor for Internet Protocol
version 4 (IPv4) [*]_, whose structure is described
as below:

======= ========= ====================== =============================================
Octets      Bits        Name                    Description
======= ========= ====================== =============================================
  0           0   ``ip.version``              Version (``4``)
  0           4   ``ip.hdr_len``              Internal Header Length (IHL)
  1           8   ``ip.dsfield.dscp``         Differentiated Services Code Point (DSCP)
  1          14   ``ip.dsfield.ecn``          Explicit Congestion Notification (ECN)
  2          16   ``ip.len``                  Total Length
  4          32   ``ip.id``                   Identification
  6          48                               Reserved Bit (must be ``\\x00``)
  6          49   ``ip.flags.df``             Don't Fragment (DF)
  6          50   ``ip.flags.mf``             More Fragments (MF)
  6          51   ``ip.frag_offset``          Fragment Offset
  8          64   ``ip.ttl``                  Time To Live (TTL)
  9          72   ``ip.proto``                Protocol (Transport Layer)
  10         80   ``ip.checksum``             Header Checksum
  12         96   ``ip.src``                  Source IP Address
  16        128   ``ip.dst``                  Destination IP Address
  20        160   ``ip.options``              IP Options (if IHL > ``5``)
======= ========= ====================== =============================================

.. [*] https://en.wikipedia.org/wiki/IPv4

"""
import datetime
import ipaddress

from pcapkit.const.ipv4.classification_level import ClassificationLevel as _CLASSIFICATION_LEVEL
from pcapkit.const.ipv4.option_class import OptionClass as opt_class
from pcapkit.const.ipv4.option_number import OptionNumber as OPT_TYPE
from pcapkit.const.ipv4.protection_authority import ProtectionAuthority as _PROTECTION_AUTHORITY
from pcapkit.const.ipv4.qs_function import QSFunction as QS_FUNC
from pcapkit.const.ipv4.router_alert import RouterAlert as _ROUTER_ALERT
from pcapkit.const.reg.transtype import TransType
from pcapkit.const.ipv4.tos_del import ToSDelay as TOS_DEL
from pcapkit.const.ipv4.tos_ecn import ToSECN as TOS_ECN
from pcapkit.const.ipv4.tos_pre import ToSPrecedence as TOS_PRE
from pcapkit.const.ipv4.tos_rel import ToSReliability as TOS_REL
from pcapkit.const.ipv4.tos_thr import ToSThroughput as TOS_THR
from pcapkit.corekit.infoclass import Info
from pcapkit.protocols.internet.ip import IP
from pcapkit.utilities.exceptions import ProtocolError

__all__ = ['IPv4']

T = True
F = False

# pylint: disable=protected-access
process_opt = {
    0: lambda self, size, kind: self._read_mode_donone(size, kind),    # do nothing
    1: lambda self, size, kind: self._read_mode_unpack(size, kind),    # unpack according to size
    2: lambda self, size, kind: self._read_mode_route(size, kind),     # route data
    3: lambda self, size, kind: self._read_mode_qs(size, kind),        # Quick-Start
    4: lambda self, size, kind: self._read_mode_ts(size, kind),        # Time Stamp
    5: lambda self, size, kind: self._read_mode_tr(size, kind),        # Traceroute
    6: lambda self, size, kind: self._read_mode_sec(size, kind),       # (Extended) Security
    7: lambda self, size, kind: self._read_mode_rsralt(size, kind),    # Router Alert
}

IPv4_OPT = {                 # # copy  class  number  kind  length  process          name
    0:    (F, 'eool'),       # #   0     0       0      0      -       -     [RFC 791] End of Option List
    1:    (F, 'nop'),        # #   0     0       1      1      -       -     [RFC 791] No-Operation
    7:    (T, 'rr', 2),      # #   0     0       7      7      N       2     [RFC 791] Record Route
    11:   (T, 'mtup', 1),    # #   0     0      11     11      4       1     [RFC 1063][RFC 1191] MTU Probe
    12:   (T, 'mtur', 1),    # #   0     0      12     12      4       1     [RFC 1063][RFC 1191] MTU Reply
    25:   (T, 'qs', 3),      # #   0     0      25     25      8       3     [RFC 4782] Quick-Start
    68:   (T, 'ts', 4),      # #   0     2       4     68      N       4     [RFC 791] Time Stamp
    82:   (T, 'tr', 5),      # #   0     2      18     82      N       5     [RFC 1393][RFC 6814] Traceroute
    130:  (T, 'sec', 6),     # #   1     0       2    130      N       6     [RFC 1108] Security
    131:  (T, 'lsr', 2),     # #   1     0       3    131      N       2     [RFC 791] Loose Source Route
    133:  (T, 'esec', 6),    # #   1     0       5    133      N       6     [RFC 1108] Extended Security
    136:  (T, 'sid', 1),     # #   1     0       8    136      4       1     [RFC 791][RFC 6814] Stream ID
    137:  (T, 'ssr', 2),     # #   1     0       9    137      N       2     [RFC 791] Strict Source Route
    145:  (T, 'eip', 0),     # #   1     0      17    145      N       0     [RFC 1385][RFC 6814] Ext. Inet. Protocol
    148:  (T, 'rtralt', 7),  # #   1     0      20    148      4       7     [RFC 2113] Router Alert
}
"""IPv4 Option Utility Table

T | F
    bool, short of True / False

IPv4_OPT
    dict, IPv4 option dict.
    Value is a tuple which contains:
        |--> bool, if length greater than 1
        |       |--> T - True
        |       |--> F - False
        |--> str, description string, also attribute name
        |--> (optional) int, process that data bytes need (when length greater than 2)
                |--> 0: do nothing
                |--> 1: unpack according to size
                |--> 2: unpack route data options then add to dict
                |--> 3: unpack Quick-Start then add to dict
                |--> 4: unpack Time Stamp then add to dict
                |--> 5: unpack Traceroute then add to dict
                |--> 6: unpack (Extended) Security then add tot dict
                |--> 7: unpack Router Alert then add to dict

"""


[docs]class IPv4(IP): """This class implements Internet Protocol version 4.""" ########################################################################## # Properties. ########################################################################## @property def name(self): """Name of corresponding protocol. :rtype: Literal['Internet Protocol version 4'] """ return 'Internet Protocol version 4' @property def length(self): """Header length of corresponding protocol. :rtype: int """ return self._info.hdr_len # pylint: disable=E1101 @property def protocol(self): """Name of next layer protocol. :rtype: pcapkit.const.reg.transtype.TransType """ return self._info.proto # pylint: disable=E1101 ########################################################################## # Methods. ##########################################################################
[docs] def read(self, length=None, **kwargs): # pylint: disable=unused-argument """Read Internet Protocol version 4 (IPv4). Structure of IPv4 header [:rfc:`791`]:: 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |Version| IHL |Type of Service| Total Length | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Identification |Flags| Fragment Offset | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Time to Live | Protocol | Header Checksum | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Source Address | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Destination Address | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Options | Padding | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Args: length (Optional[int]): Length of packet data. Keyword Args: **kwargs: Arbitrary keyword arguments. Returns: DataType_IPv4: Parsed packet data. """ if length is None: length = len(self) _vihl = self._read_fileng(1).hex() _dscp = self._read_binary(1) _tlen = self._read_unpack(2) _iden = self._read_unpack(2) _frag = self._read_binary(2) _ttol = self._read_unpack(1) _prot = self._read_protos(1) _csum = self._read_fileng(2) _srca = self._read_ipv4_addr() _dsta = self._read_ipv4_addr() ipv4 = dict( version=int(_vihl[0]), hdr_len=int(_vihl[1], base=16) * 4, dsfield=dict( dscp={ 'pre': TOS_PRE.get(int(_dscp[:3], base=2)), 'del': TOS_DEL.get(int(_dscp[3], base=2)), 'thr': TOS_THR.get(int(_dscp[4], base=2)), 'rel': TOS_REL.get(int(_dscp[5], base=2)), }, ecn=TOS_ECN.get(int(_dscp[-2:], base=2)), ), len=_tlen, id=_iden, flags=dict( df=bool(int(_frag[1])), mf=bool(int(_frag[2])), ), frag_offset=int(_frag[3:], base=2) * 8, ttl=_ttol, proto=_prot, checksum=_csum, src=_srca, dst=_dsta, ) _optl = ipv4['hdr_len'] - 20 if _optl: options = self._read_ipv4_options(_optl) ipv4['opt'] = options[0] # tuple of option acronyms ipv4.update(options[1]) # merge option info to buffer # ipv4['opt'] = self._read_fileng(_optl) or None hdr_len = ipv4['hdr_len'] raw_len = ipv4['len'] - hdr_len ipv4['packet'] = self._read_packet(header=hdr_len, payload=raw_len) return self._decode_next_layer(ipv4, _prot, raw_len)
[docs] def make(self, **kwargs): """Make (construct) packet data. Keyword Args: **kwargs: Arbitrary keyword arguments. Returns: bytes: Constructed packet data. """ raise NotImplementedError
[docs] @classmethod def id(cls): """Index ID of the protocol. Returns: Literal['IPv4']: Index ID of the protocol. """ return cls.__name__
########################################################################## # Data models. ##########################################################################
[docs] def __length_hint__(self): """Return an estimated length for the object. :rtype: Literal[20] """ return 20
[docs] @classmethod def __index__(cls): # pylint: disable=invalid-index-returned """Numeral registry index of the protocol. Returns: pcapkit.const.reg.transtype.TransType: Numeral registry index of the protocol in `IANA`_. .. _IANA: https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml """ return TransType(4)
########################################################################## # Utilities. ##########################################################################
[docs] def _read_ipv4_addr(self): """Read IP address. Returns: ipaddress.IPv4Address: Parsed IP address. """ # _byte = self._read_fileng(4) # _addr = '.'.join([str(_) for _ in _byte]) # return _addr return ipaddress.ip_address(self._read_fileng(4))
[docs] def _read_opt_type(self, kind): # pylint: disable=no-self-use """Read option type field. Arguments: kind (int): option kind value Returns: DataType_IPv4_Option_Type: extracted IPv4 option """ bin_ = bin(kind)[2:].zfill(8) type_ = { 'copy': bool(int(bin_[0], base=2)), 'class': opt_class.get(int(bin_[1:3], base=2)), 'number': int(bin_[3:], base=2), } return type_
[docs] def _read_ipv4_options(self, size=None): """Read IPv4 option list. Arguments: size (Optional[int]): buffer size Returns: Tuple[Tuple[pcapkit.const.ipv4.option_number.OptionNumber], Dict[str, Union[DataType_Opt, Tuple[DataType_Opt]]]]: IPv4 option list and extracted IPv4 options """ counter = 0 # length of read option list optkind = list() # option kind list options = dict() # dict of option data while counter < size: # get option kind kind = self._read_unpack(1) # fetch corresponding option tuple opts = IPv4_OPT.get(kind) if opts is None: len_ = size - counter counter = size options['Unknown'] = self._read_fileng(len_) break # extract option dscp = OPT_TYPE.get(kind) desc = dscp.name if opts[0]: byte = self._read_unpack(1) if byte: # check option process mode data = process_opt[opts[2]](self, byte, kind) else: # permission options (length is 2) data = dict( kind=kind, # option kind type=self._read_opt_type(kind), # option type info length=2, # option length flag=True, # permission flag ) else: # 1-byte options byte = 1 data = dict( kind=kind, # option kind type=self._read_opt_type(kind), # option type info length=1, # option length ) # record option data counter += byte if dscp in optkind: if isinstance(options[desc], tuple): options[desc] += (Info(data),) else: options[desc] = (Info(options[desc]), Info(data)) else: optkind.append(dscp) options[desc] = data # break when eol triggered if not kind: break # get padding if counter < size: len_ = size - counter self._read_binary(len_) return tuple(optkind), options
[docs] def _read_mode_donone(self, size, kind): """Read options require no process. Arguments: size (int): length of option kind (int): option kind value Returns: DataType_Opt_Do_None: extracted option Raises: ProtocolError: If ``size`` is **LESS THAN** ``3``. """ if size < 3: raise ProtocolError(f'{self.alias}: [OptNo {kind}] invalid format') data = dict( kind=kind, type=self._read_opt_type(kind), length=size, data=self._read_fileng(size), ) return data
[docs] def _read_mode_unpack(self, size, kind): """Read options require unpack process. Arguments: size (int): length of option kind (int): option kind value Returns: DataType_Opt_Unpack: extracted option Raises: ProtocolError: If ``size`` is **LESS THAN** ``3``. """ if size < 3: raise ProtocolError(f'{self.alias}: [OptNo {kind}] invalid format') data = dict( kind=kind, type=self._read_opt_type(kind), length=size, data=self._read_unpack(size), ) return data
[docs] def _read_mode_route(self, size, kind): """Read options with route data. Structure of these options [:rfc:`791`]: * Loose Source Route .. code:: text +--------+--------+--------+---------//--------+ |10000011| length | pointer| route data | +--------+--------+--------+---------//--------+ * Strict Source Route .. code:: text +--------+--------+--------+---------//--------+ |10001001| length | pointer| route data | +--------+--------+--------+---------//--------+ * Record Route .. code:: text +--------+--------+--------+---------//--------+ |00000111| length | pointer| route data | +--------+--------+--------+---------//--------+ Arguments: size (int): length of option kind (Literal[7, 131, 137]): option kind value (RR/LSR/SSR) Returns: DataType_Opt_Route_Data: extracted option with route data Raises: ProtocolError: If the option is malformed. """ if size < 3 or (size - 3) % 4 != 0: raise ProtocolError(f'{self.alias}: [OptNo {kind}] invalid format') _rptr = self._read_unpack(1) if _rptr < 4: raise ProtocolError(f'{self.alias}: [OptNo {kind}] invalid format') data = dict( kind=kind, type=self._read_opt_type(kind), length=size, pointer=_rptr, ) counter = 4 address = list() endpoint = min(_rptr, size) while counter < endpoint: counter += 4 address.append(self._read_ipv4_addr()) data['data'] = tuple(address) or None return data
[docs] def _read_mode_qs(self, size, kind): """Read Quick Start option. Structure of Quick-Start (QS) option [:rfc:`4782`]: * A Quick-Start Request .. code:: text 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Option | Length=8 | Func. | Rate | QS TTL | | | | 0000 |Request| | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | QS Nonce | R | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * Report of Approved Rate .. code:: text 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Option | Length=8 | Func. | Rate | Not Used | | | | 1000 | Report| | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | QS Nonce | R | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Arguments: size (int): length of option kind (Literal[25]): option kind value (QS) Returns: DataType_Opt_QuickStart: extracted Quick Start option Raises: ProtocolError: If the option is malformed. """ if size != 8: raise ProtocolError(f'{self.alias}: [OptNo {kind}] invalid format') _type = self._read_opt_type(kind) _fcrr = self._read_binary(1) _func = int(_fcrr[:4], base=2) _rate = int(_fcrr[4:], base=2) _ttlv = self._read_unpack(1) _nonr = self._read_binary(4) _qsnn = int(_nonr[:30], base=2) if _func not in (0, 8): raise ProtocolError(f'{self.alias}: [OptNo {kind}] invalid format') data = dict( kind=kind, type=_type, length=size, func=QS_FUNC.get(_func), rate=40000 * (2 ** _rate) / 1000, ttl=None if _func else _rate, nounce=_qsnn, ) return data
[docs] def _read_mode_ts(self, size, kind): """Read Time Stamp option. Structure of Timestamp (TS) option [:rfc:`791`]:: +--------+--------+--------+--------+ |01000100| length | pointer|oflw|flg| +--------+--------+--------+--------+ | internet address | +--------+--------+--------+--------+ | timestamp | +--------+--------+--------+--------+ | . | . . Arguments: size (int): length of option kind (Literal[68]): option kind value (TS) Returns: DataType_Opt_TimeStamp: extracted Time Stamp option Raises: ProtocolError: If the option is malformed. """ if size > 40 or size < 4: raise ProtocolError(f'{self.alias}: [OptNo {kind}] invalid format') _tptr = self._read_unpack(1) _oflg = self._read_binary(1) _oflw = int(_oflg[:4], base=2) _flag = int(_oflg[4:], base=2) if _tptr < 5: raise ProtocolError(f'{self.alias}: [OptNo {kind}] invalid format') data = dict( kind=kind, type=self._read_opt_type(kind), length=size, pointer=_tptr, overflow=_oflw, flag=_flag, ) endpoint = min(_tptr, size) if _flag == 0: if (size - 4) % 4 != 0: raise ProtocolError(f'{self.alias}: [OptNo {kind}] invalid format') counter = 5 timestamp = list() while counter < endpoint: counter += 4 time = self._read_unpack(4, lilendian=True) timestamp.append(datetime.datetime.fromtimestamp(time)) data['timestamp'] = timestamp or None elif _flag in (1, 3): if (size - 4) % 8 != 0: raise ProtocolError(f'{self.alias}: [OptNo {kind}] invalid format') counter = 5 ipaddress = list() # pylint: disable=redefined-outer-name timestamp = list() while counter < endpoint: counter += 8 ipaddress.append(self._read_ipv4_addr()) time = self._read_unpack(4, lilendian=True) timestamp.append(datetime.datetime.fromtimestamp(time)) data['ip'] = tuple(ipaddress) or None data['timestamp'] = tuple(timestamp) or None else: data['data'] = self._read_fileng(size - 4) or None return data
[docs] def _read_mode_tr(self, size, kind): """Read Traceroute option. Structure of Traceroute (TR) option [:rfc:`6814`]:: 0 8 16 24 +-+-+-+-+-+-+-+-+---------------+---------------+---------------+ |F| C | Number | Length | ID Number | +-+-+-+-+-+-+-+-+---------------+---------------+---------------+ | Outbound Hop Count | Return Hop Count | +---------------+---------------+---------------+---------------+ | Originator IP Address | +---------------+---------------+---------------+---------------+ Arguments: size (int): length of option kind (Literal[82]): option kind value (TR) Returns: DataType_Opt_Traceroute: extracted Traceroute option Raises: ProtocolError: If ``size`` is **NOT** ``12``. """ if size != 12: raise ProtocolError(f'{self.alias}: [OptNo {kind}] invalid format') _idnm = self._read_unpack(2) _ohcn = self._read_unpack(2) _rhcn = self._read_unpack(2) _ipad = self._read_ipv4_addr() data = dict( kind=kind, type=self._read_opt_type(kind), length=size, id=_idnm, ohc=_ohcn, rhc=_rhcn, ip=_ipad, ) return data
[docs] def _read_mode_sec(self, size, kind): """Read options with security info. Structure of these options [:rfc:`1108`]: * Security (SEC) .. code:: text +------------+------------+------------+-------------//----------+ | 10000010 | XXXXXXXX | SSSSSSSS | AAAAAAA[1] AAAAAAA0 | | | | | [0] | +------------+------------+------------+-------------//----------+ TYPE = 130 LENGTH CLASSIFICATION PROTECTION LEVEL AUTHORITY FLAGS * Extended Security (ESEC) .. code:: text +------------+------------+------------+-------//-------+ | 10000101 | 000LLLLL | AAAAAAAA | add sec info | +------------+------------+------------+-------//-------+ TYPE = 133 LENGTH ADDITIONAL ADDITIONAL SECURITY INFO SECURITY FORMAT CODE INFO c """ if size < 3: raise ProtocolError(f'{self.alias}: [OptNo {kind}] invalid format') _clvl = self._read_unpack(1) data = dict( kind=kind, type=self._read_opt_type(kind), length=size, level=_CLASSIFICATION_LEVEL.get(_clvl, _clvl), ) if size > 3: _list = list() for counter in range(3, size): _flag = self._read_binary(1) if (counter < size - 1 and not int(_flag[7], base=2)) \ or (counter == size - 1 and int(_flag[7], base=2)): raise ProtocolError(f'{self.alias}: [OptNo {kind}] invalid format') _dict = dict() for (index, bit) in enumerate(_flag[:5]): _auth = _PROTECTION_AUTHORITY.get(index) _dict[_auth] = bool(int(bit, base=2)) _list.append(Info(_dict)) data['flags'] = tuple(_list) return data
[docs] def _read_mode_rsralt(self, size, kind): """Read Router Alert option. Structure of Router Alert (RTRALT) option [:rfc:`2113`]:: +--------+--------+--------+--------+ |10010100|00000100| 2 octet value | +--------+--------+--------+--------+ Arguments: size (int): length of option kind (Literal[140]): option kind value (RTRALT) Returns: DataType_Opt_RouterAlert: extracted option with security info Raises: ProtocolError: If ``size`` is **NOT** ``4``. """ if size != 4: raise ProtocolError(f'{self.alias}: [OptNo {kind}] invalid format') _code = self._read_unpack(2) data = dict( kind=kind, type=self._read_opt_type(kind), length=size, alert=_ROUTER_ALERT.get(_code), code=_code, ) return data