Source code for pcapkit.utilities.validations

# -*- coding: utf-8 -*-
"""validation utilities

:mod:`pcapkit.utilities.validations` contains functions to
validate arguments for functions and classes. It was first
used in `PyNTLib`_ as validators.

.. _PyNTLib: https://github.com/JarryShaw/pyntlib

"""
import collections.abc
import enum
import inspect
import io
import ipaddress
import numbers

import aenum

from pcapkit.utilities.exceptions import (BoolError, BytearrayError, BytesError, ComplexError,
                                          DictError, DigitError, EnumError, FragmentError,
                                          InfoError, IntError, IOObjError, IPError, ListError,
                                          PacketError, RealError, StringError, TupleError)

# Public API of this module. ``enum_check`` is a public validator defined
# below (and used by ``pkt_check``), so it is exported with the others.
__all__ = [
    'int_check', 'real_check', 'complex_check', 'number_check',
    'bool_check', 'bytes_check', 'bytearray_check', 'str_check',
    'list_check', 'dict_check', 'tuple_check', 'io_check',
    'frag_check', 'pkt_check', 'info_check', 'ip_check',
    'enum_check',
]


def int_check(*args, stacklevel=2):
    """Check if arguments are *integrals* (``int``).

    Every argument must be an instance of :class:`numbers.Integral`.

    Args:
        *args: Arguments to check.
        stacklevel (int): Stack level to fetch originated function name.
            A value larger than the current call depth falls back to the
            outermost frame.

    Raises:
        IntError: If any of the arguments is **NOT** *integral* (``int``).

    """
    for var in args:
        if not isinstance(var, numbers.Integral):
            name = type(var).__name__
            # Source context is not needed, only the function names.
            frames = inspect.stack(0)
            # Clamp the index so a shallow call stack (e.g. a call made from
            # module top level) still raises the documented error instead of
            # an IndexError.
            func = frames[min(stacklevel, len(frames) - 1)].function
            raise IntError(f'Function {func} expected integral number, {name} got instead.')
def real_check(*args, stacklevel=2):
    """Check if arguments are *real numbers* (``int`` and/or ``float``).

    Every argument must be an instance of :class:`numbers.Real`.

    Args:
        *args: Arguments to check.
        stacklevel (int): Stack level to fetch originated function name.
            A value larger than the current call depth falls back to the
            outermost frame.

    Raises:
        RealError: If any of the arguments is **NOT** *real number*
            (``int`` and/or ``float``).

    """
    for var in args:
        if not isinstance(var, numbers.Real):
            name = type(var).__name__
            # Source context is not needed, only the function names.
            frames = inspect.stack(0)
            # Clamp the index so a shallow call stack still raises the
            # documented error instead of an IndexError.
            func = frames[min(stacklevel, len(frames) - 1)].function
            raise RealError(f'Function {func} expected real number, {name} got instead.')
def complex_check(*args, stacklevel=2):
    """Check if arguments are *complex numbers* (``complex``).

    Every argument must be an instance of :class:`numbers.Complex`.

    Args:
        *args: Arguments to check.
        stacklevel (int): Stack level to fetch originated function name.
            A value larger than the current call depth falls back to the
            outermost frame.

    Raises:
        ComplexError: If any of the arguments is **NOT** *complex number*
            (``complex``).

    """
    for var in args:
        if not isinstance(var, numbers.Complex):
            name = type(var).__name__
            # Source context is not needed, only the function names.
            frames = inspect.stack(0)
            # Clamp the index so a shallow call stack still raises the
            # documented error instead of an IndexError.
            func = frames[min(stacklevel, len(frames) - 1)].function
            raise ComplexError(f'Function {func} expected complex number, {name} got instead.')
def number_check(*args, stacklevel=2):
    """Check if arguments are *numbers*.

    Every argument must be an instance of :class:`numbers.Number`.

    Args:
        *args: Arguments to check.
        stacklevel (int): Stack level to fetch originated function name.
            A value larger than the current call depth falls back to the
            outermost frame.

    Raises:
        DigitError: If any of the arguments is **NOT** *number*
            (``int``, ``float`` and/or ``complex``).

    """
    for var in args:
        if not isinstance(var, numbers.Number):
            name = type(var).__name__
            # Source context is not needed, only the function names.
            frames = inspect.stack(0)
            # Clamp the index so a shallow call stack still raises the
            # documented error instead of an IndexError.
            func = frames[min(stacklevel, len(frames) - 1)].function
            raise DigitError(f'Function {func} expected number, {name} got instead.')
def bytes_check(*args, stacklevel=2):
    """Check if arguments are :obj:`bytes` type.

    An argument passes when it is an instance of :obj:`bytes` or of
    :class:`collections.abc.ByteString`.

    Args:
        *args: Arguments to check.
        stacklevel (int): Stack level to fetch originated function name.
            A value larger than the current call depth falls back to the
            outermost frame.

    Raises:
        BytesError: If any of the arguments is **NOT** :obj:`bytes` type.

    """
    for var in args:
        if not isinstance(var, (bytes, collections.abc.ByteString)):
            name = type(var).__name__
            # Source context is not needed, only the function names.
            frames = inspect.stack(0)
            # Clamp the index so a shallow call stack still raises the
            # documented error instead of an IndexError.
            func = frames[min(stacklevel, len(frames) - 1)].function
            raise BytesError(f'Function {func} expected bytes, {name} got instead.')
def bytearray_check(*args, stacklevel=2):
    """Check if arguments are ``bytearray`` type.

    An argument passes when it is an instance of ``bytearray``,
    :class:`collections.abc.ByteString` or
    :class:`collections.abc.MutableSequence`.

    Args:
        *args: Arguments to check.
        stacklevel (int): Stack level to fetch originated function name.
            A value larger than the current call depth falls back to the
            outermost frame.

    Raises:
        BytearrayError: If any of the arguments is **NOT** ``bytearray`` type.

    """
    accepted = (bytearray, collections.abc.ByteString, collections.abc.MutableSequence)
    for var in args:
        if not isinstance(var, accepted):
            # The type name and the caller name are two separate lookups; the
            # type name must be read, never assigned to.
            name = type(var).__name__
            # Source context is not needed, only the function names.
            frames = inspect.stack(0)
            # Clamp the index so a shallow call stack still raises the
            # documented error instead of an IndexError.
            func = frames[min(stacklevel, len(frames) - 1)].function
            raise BytearrayError(f'Function {func} expected bytearray, {name} got instead.')
def str_check(*args, stacklevel=2):
    """Check if arguments are :obj:`str` type.

    An argument passes when it is an instance of :obj:`str`,
    :class:`collections.UserString` or :class:`collections.abc.Sequence`.

    Note:
        Because :class:`collections.abc.Sequence` is accepted, other
        sequences such as lists and tuples also pass this check.

    Args:
        *args: Arguments to check.
        stacklevel (int): Stack level to fetch originated function name.
            A value larger than the current call depth falls back to the
            outermost frame.

    Raises:
        StringError: If any of the arguments is **NOT** :obj:`str` type.

    """
    for var in args:
        if not isinstance(var, (str, collections.UserString, collections.abc.Sequence)):
            name = type(var).__name__
            # Source context is not needed, only the function names.
            frames = inspect.stack(0)
            # Clamp the index so a shallow call stack still raises the
            # documented error instead of an IndexError.
            func = frames[min(stacklevel, len(frames) - 1)].function
            raise StringError(f'Function {func} expected str, {name} got instead.')
def bool_check(*args, stacklevel=2):
    """Check if arguments are ``bool`` type.

    Args:
        *args: Arguments to check.
        stacklevel (int): Stack level to fetch originated function name.
            A value larger than the current call depth falls back to the
            outermost frame.

    Raises:
        BoolError: If any of the arguments is **NOT** ``bool`` type.

    """
    for var in args:
        if not isinstance(var, bool):
            name = type(var).__name__
            # Source context is not needed, only the function names.
            frames = inspect.stack(0)
            # Clamp the index so a shallow call stack still raises the
            # documented error instead of an IndexError.
            func = frames[min(stacklevel, len(frames) - 1)].function
            raise BoolError(f'Function {func} expected bool, {name} got instead.')
def list_check(*args, stacklevel=2):
    """Check if arguments are ``list`` type.

    An argument passes when it is an instance of ``list``,
    :class:`collections.UserList` or
    :class:`collections.abc.MutableSequence`.

    Args:
        *args: Arguments to check.
        stacklevel (int): Stack level to fetch originated function name.
            A value larger than the current call depth falls back to the
            outermost frame.

    Raises:
        ListError: If any of the arguments is **NOT** ``list`` type.

    """
    for var in args:
        if not isinstance(var, (list, collections.UserList, collections.abc.MutableSequence)):
            name = type(var).__name__
            # Source context is not needed, only the function names.
            frames = inspect.stack(0)
            # Clamp the index so a shallow call stack still raises the
            # documented error instead of an IndexError.
            func = frames[min(stacklevel, len(frames) - 1)].function
            raise ListError(f'Function {func} expected list, {name} got instead.')
def dict_check(*args, stacklevel=2):
    """Check if arguments are :obj:`dict` type.

    An argument passes when it is an instance of :obj:`dict`,
    :class:`collections.UserDict` or
    :class:`collections.abc.MutableMapping`.

    Args:
        *args: Arguments to check.
        stacklevel (int): Stack level to fetch originated function name.
            A value larger than the current call depth falls back to the
            outermost frame.

    Raises:
        DictError: If any of the arguments is **NOT** :obj:`dict` type.

    """
    for var in args:
        if not isinstance(var, (dict, collections.UserDict, collections.abc.MutableMapping)):
            name = type(var).__name__
            # Source context is not needed, only the function names.
            frames = inspect.stack(0)
            # Clamp the index so a shallow call stack still raises the
            # documented error instead of an IndexError.
            func = frames[min(stacklevel, len(frames) - 1)].function
            raise DictError(f'Function {func} expected dict, {name} got instead.')
def tuple_check(*args, stacklevel=2):
    """Check if arguments are :obj:`tuple` type.

    An argument passes when it is an instance of :obj:`tuple` or of
    :class:`collections.abc.Sequence`.

    Note:
        Because :class:`collections.abc.Sequence` is accepted, other
        sequences such as lists and strings also pass this check.

    Args:
        *args: Arguments to check.
        stacklevel (int): Stack level to fetch originated function name.
            A value larger than the current call depth falls back to the
            outermost frame.

    Raises:
        TupleError: If any of the arguments is **NOT** :obj:`tuple` type.

    """
    for var in args:
        if not isinstance(var, (tuple, collections.abc.Sequence)):
            name = type(var).__name__
            # Source context is not needed, only the function names.
            frames = inspect.stack(0)
            # Clamp the index so a shallow call stack still raises the
            # documented error instead of an IndexError.
            func = frames[min(stacklevel, len(frames) - 1)].function
            raise TupleError(f'Function {func} expected tuple, {name} got instead.')
def io_check(*args, stacklevel=2):
    """Check if arguments are *file-like object* (``io.IOBase``).

    Args:
        *args: Arguments to check.
        stacklevel (int): Stack level to fetch originated function name.
            A value larger than the current call depth falls back to the
            outermost frame.

    Raises:
        IOObjError: If any of the arguments is **NOT** *file-like object*
            (``io.IOBase``).

    """
    for var in args:
        if not isinstance(var, io.IOBase):
            name = type(var).__name__
            # Source context is not needed, only the function names.
            frames = inspect.stack(0)
            # Clamp the index so a shallow call stack still raises the
            # documented error instead of an IndexError.
            func = frames[min(stacklevel, len(frames) - 1)].function
            raise IOObjError(f'Function {func} expected file-like object, {name} got instead.')
def info_check(*args, stacklevel=2):
    """Check if arguments are :class:`~pcapkit.corekit.infoclass.Info` instances.

    Args:
        *args: Arguments to check.
        stacklevel (int): Stack level to fetch originated function name.
            A value larger than the current call depth falls back to the
            outermost frame.

    Raises:
        InfoError: If any of the arguments is **NOT**
            :class:`~pcapkit.corekit.infoclass.Info` instance.

    """
    # Imported at call time rather than at module import time.
    from pcapkit.corekit.infoclass import Info  # pylint: disable=import-outside-toplevel

    for var in args:
        if not isinstance(var, Info):
            name = type(var).__name__
            # Source context is not needed, only the function names.
            frames = inspect.stack(0)
            # Clamp the index so a shallow call stack still raises the
            # documented error instead of an IndexError.
            func = frames[min(stacklevel, len(frames) - 1)].function
            raise InfoError(f'Function {func} expected Info instance, {name} got instead.')
def ip_check(*args, stacklevel=2):
    """Check if arguments are *IP addresses*.

    Every argument must be an instance of ``ipaddress.IPv4Address`` and/or
    ``ipaddress.IPv6Address``.

    Args:
        *args: Arguments to check.
        stacklevel (int): Stack level to fetch originated function name.
            A value larger than the current call depth falls back to the
            outermost frame.

    Raises:
        IPError: If any of the arguments is **NOT** *IP address*
            (``ipaddress.IPv4Address`` and/or ``ipaddress.IPv6Address``).

    """
    for var in args:
        if not isinstance(var, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
            name = type(var).__name__
            # Source context is not needed, only the function names.
            frames = inspect.stack(0)
            # Clamp the index so a shallow call stack still raises the
            # documented error instead of an IndexError.
            func = frames[min(stacklevel, len(frames) - 1)].function
            raise IPError(f'Function {func} expected IP address, {name} got instead.')
def enum_check(*args, stacklevel=2):
    """Check if arguments are of *enumeration protocol* type.

    Every argument must be an instance of ``enum.EnumMeta`` and/or
    ``aenum.EnumMeta``, i.e. the check is made against the metaclass.

    Args:
        *args: Arguments to check.
        stacklevel (int): Stack level to fetch originated function name.
            A value larger than the current call depth falls back to the
            outermost frame.

    Raises:
        EnumError: If any of the arguments is **NOT** *enumeration protocol*
            type (``enum.EnumMeta`` and/or ``aenum.EnumMeta``).

    """
    for var in args:
        if not isinstance(var, (enum.EnumMeta, aenum.EnumMeta)):
            name = type(var).__name__
            # Source context is not needed, only the function names.
            frames = inspect.stack(0)
            # Clamp the index so a shallow call stack still raises the
            # documented error instead of an IndexError.
            func = frames[min(stacklevel, len(frames) - 1)].function
            raise EnumError(f'Function {func} expected enumeration, {name} got instead.')
def frag_check(*args, protocol, stacklevel=3):
    """Check if arguments are valid fragments.

    The validator is selected by substring match on ``protocol``: a name
    containing ``'IP'`` is checked first, then one containing ``'TCP'``.

    Args:
        *args: Arguments to check.
        protocol (str): Originated fragmentation protocol (IPv4, IPv6 or TCP).
        stacklevel (int): Stack level to fetch originated function name.

    - If the protocol is IPv4, the fragment should be as an IPv4
      :term:`fragmentation <ipv4.packet>`.
    - If the protocol is IPv6, the fragment should be as an IPv6
      :term:`fragmentation <ipv6.packet>`.
    - If the protocol is TCP, the fragment should be as an TCP
      :term:`fragmentation <tcp.packet>`.

    Raises:
        FragmentError: If any of the arguments is **NOT** valid fragment,
            or if ``protocol`` matches neither ``'IP'`` nor ``'TCP'``. The
            underlying exception is attached as the cause.

    See Also:
        * :func:`pcapkit.utilities.validations._ip_frag_check`
        * :func:`pcapkit.utilities.validations._tcp_frag_check`

    """
    if 'IP' in protocol:
        checker = _ip_frag_check
    elif 'TCP' in protocol:
        checker = _tcp_frag_check
    else:
        raise FragmentError(f'Unknown fragmented protocol {protocol}.')

    # The checker is called directly from this frame so that ``stacklevel``
    # resolves to the same caller as before.
    try:
        checker(*args, stacklevel=stacklevel)
    except KeyError as error:
        raise FragmentError(f'Missing fragment key: {error.args[0]}') from error
    except Exception as error:
        raise FragmentError(f'Malformed fragment object: {error}') from error
def _ip_frag_check(*args, stacklevel=3):
    """Validate IP fragments (:term:`IPv4 <ipv4.packet>` and/or :term:`IPv6 <ipv6.packet>` packet).

    Each argument must be a dict-like object. Its fields are checked in
    this order:

    - ``bufid[3]`` is a string;
    - ``mf`` is a boolean;
    - ``bufid[0]`` and ``bufid[1]`` are IP addresses;
    - ``header`` and ``payload`` are bytearrays;
    - ``bufid[2]``, ``num``, ``fo``, ``ihl`` and ``tl`` are integers.

    Errors raised by the individual validators and by missing keys are
    not handled here.

    Args:
        *args: Arguments to check.
        stacklevel (int): Stack level to fetch originated function name.

    See Also:
        * :func:`pcapkit.toolkit.default.ipv4_reassembly`
        * :func:`pcapkit.toolkit.default.ipv6_reassembly`

    """
    level = {'stacklevel': stacklevel}
    for fragment in args:
        dict_check(fragment, **level)

        buffer_id = fragment['bufid']
        str_check(buffer_id[3], **level)
        bool_check(fragment['mf'], **level)
        ip_check(buffer_id[0], buffer_id[1], **level)
        bytearray_check(fragment['header'], fragment['payload'], **level)
        int_check(
            buffer_id[2],
            fragment['num'],
            fragment['fo'],
            fragment['ihl'],
            fragment['tl'],
            **level,
        )
def _tcp_frag_check(*args, stacklevel=3):
    """Validate TCP fragments (:term:`TCP packet <tcp.packet>`).

    Each argument must be a dict-like object. Its fields are checked in
    this order:

    - ``bufid[0]`` and ``bufid[1]`` are IP addresses;
    - ``payload`` is a bytearray;
    - ``syn`` and ``fin`` are booleans;
    - ``bufid[2]``, ``bufid[3]``, ``num``, ``ack``, ``dsn``, ``first``,
      ``last`` and ``len`` are integers.

    Errors raised by the individual validators and by missing keys are
    not handled here.

    Args:
        *args: Arguments to check.
        stacklevel (int): Stack level to fetch originated function name.

    See Also:
        :func:`pcapkit.toolkit.default.tcp_reassembly`

    """
    level = {'stacklevel': stacklevel}
    for fragment in args:
        dict_check(fragment, **level)

        buffer_id = fragment['bufid']
        ip_check(buffer_id[0], buffer_id[1], **level)
        bytearray_check(fragment['payload'], **level)
        bool_check(fragment['syn'], fragment['fin'], **level)
        int_check(
            buffer_id[2],
            buffer_id[3],
            fragment['num'],
            fragment['ack'],
            fragment['dsn'],
            fragment['first'],
            fragment['last'],
            fragment['len'],
            **level,
        )
def pkt_check(*args, stacklevel=3):
    """Check if arguments are valid packets (:term:`TCP packet <trace.packet>`).

    Each argument must be a dict-like object. Its fields are checked in
    this order:

    - ``frame`` is a dict-like object;
    - ``protocol`` is an enumeration;
    - ``timestamp`` is a real number;
    - ``src`` and ``dst`` are IP addresses;
    - ``syn`` and ``fin`` are booleans;
    - ``srcport``, ``dstport`` and ``index`` are integers.

    Args:
        *args: Arguments to check.
        stacklevel (int): Stack level to fetch originated function name.

    Raises:
        PacketError: If any of the arguments is **NOT** valid packet. The
            underlying exception is attached as the cause.

    See Also:
        :func:`pcapkit.toolkit.default.tcp_traceflow`

    """
    try:
        for var in args:
            dict_check(var, stacklevel=stacklevel)
            dict_check(var['frame'], stacklevel=stacklevel)
            enum_check(var['protocol'], stacklevel=stacklevel)
            real_check(var['timestamp'], stacklevel=stacklevel)
            ip_check(var['src'], var['dst'], stacklevel=stacklevel)
            bool_check(var['syn'], var['fin'], stacklevel=stacklevel)
            int_check(var['srcport'], var['dstport'], var['index'], stacklevel=stacklevel)
    except KeyError as error:
        # Chain explicitly so the original lookup failure is kept as the cause.
        raise PacketError(f'Missing packet key: {error.args[0]}') from error
    except Exception as error:
        raise PacketError(f'Malformed packet object: {error}') from error
############################################################################### # Test Codes # # func = sys._getframe().f_back.f_code.co_name # spec = inspect.getfullargspec(test) # argv = spec.args + spec.kwonlyargs # for index, var in enumerate(args): # if not isinstance(var, numbers.Integral): # raise IntError( f'Function {func.__name__} expected argument {argv[index]} ' # f'to be integral number, {type(var).__name__} got instead.' ) # ###############################################################################