# Source code for pcapkit.utilities.decorators
# -*- coding: utf-8 -*-
# pylint: disable=protected-access
"""Decorator Functions
=========================
:mod:`pcapkit.utilities.decorators` contains several useful
decorators, including :func:`~pcapkit.utilities.decorators.seekset`
and :func:`~pcapkit.utilities.decorators.beholder`.
"""
import functools
import io
import os
from typing import TYPE_CHECKING, cast
from pcapkit.utilities.exceptions import StructError
if TYPE_CHECKING:
from typing import Callable, Optional, TypeVar
from typing_extensions import Concatenate, ParamSpec
from pcapkit.protocols.protocol import Protocol
P = ParamSpec('P')
R = TypeVar('R')
__all__ = ['seekset', 'beholder']
def seekset(func: 'Callable[Concatenate[Protocol, P], R]') -> 'Callable[P, R]':
    """Read file from start then set back to original.

    Important:
        This decorator function is designed for decorating *class methods*.

    The decorator records the current offset of
    :attr:`self._file <pcapkit.protocols.protocol.Protocol._file>`, moves the
    file pointer to ``self._seekset`` and then calls the decorated function.
    Afterwards, it rewinds
    :attr:`self._file <pcapkit.protocols.protocol.Protocol._file>` to the
    recorded offset and returns the return value from the decorated function.
    The offset is restored even if the decorated function raises; the
    exception is then propagated to the caller unchanged.

    Note:
        The decorated function should have following signature::

            func(self, *args, **kw)

    See Also:
        :meth:`pcapkit.protocols.protocol.Protocol._read_packet`

    :param func: decorated function

    :meta decorator:

    """
    @functools.wraps(func)
    def seekcur(*args: 'P.args', **kw: 'P.kwargs') -> 'R':
        # extract self object
        self = cast('Protocol', args[0])

        # remember where we are, then jump to the start offset
        seek_cur = self._file.tell()
        self._file.seek(self._seekset, os.SEEK_SET)

        try:
            # call method
            return func(*args, **kw)
        finally:
            # always reset the file pointer, so that a failing call
            # does not leave the file at an unexpected offset
            self._file.seek(seek_cur, os.SEEK_SET)
    return seekcur
def beholder(func: 'Callable[Concatenate[Protocol, int, Optional[int], P], R]') -> 'Callable[P, R]':
    """Behold extraction procedure.

    Important:
        This decorator function is designed for decorating *class methods*.

    This decorator first records the current offset of
    :attr:`self._file <pcapkit.protocols.protocol.Protocol._file>`, then
    tries to call the decorated function. Should any exception be raised, it
    rewinds :attr:`self._file <pcapkit.protocols.protocol.Protocol._file>`
    to the recorded offset, reads ``length`` bytes through
    ``self._read_fileng`` and wraps them in a fallback protocol created with
    ``error=str(exc)``:

    * :class:`~pcapkit.protocols.misc.null.NoPayload` if the exception is a
      :exc:`~pcapkit.utilities.exceptions.StructError` whose ``eof``
      attribute is true;
    * :class:`~pcapkit.protocols.misc.raw.Raw` otherwise.

    ``length`` is taken from the third positional argument if present,
    otherwise from the ``length`` keyword argument, and defaults to
    :data:`None`.

    Note:
        The decorated function should have following signature::

            func(self, proto, length, *args, **kwargs)

    See Also:
        :meth:`pcapkit.protocols.protocol.Protocol._decode_next_layer`

    :param func: decorated function

    :meta decorator:

    """
    @functools.wraps(func)
    def behold(*args: 'P.args', **kwargs: 'P.kwargs') -> 'R':
        # extract self object & args
        self = cast('Protocol', args[0])
        if len(args) > 2:
            length = cast('Optional[int]', args[2])
        else:
            # ``length`` may also have been passed by keyword (or omitted)
            length = cast('Optional[int]', kwargs.get('length'))

        # record file pointer
        seek_cur = self._file.tell()
        try:
            # call method
            return func(*args, **kwargs)
        except Exception as exc:
            # the fallback protocols are imported lazily, inside the handler
            if isinstance(exc, StructError) and exc.eof:  # pylint: disable=no-member
                from pcapkit.protocols.misc.null import NoPayload as protocol  # isort: skip # pylint: disable=import-outside-toplevel
            else:
                from pcapkit.protocols.misc.raw import Raw as protocol  # type: ignore[no-redef] # isort: skip # pylint: disable=import-outside-toplevel

            # rewind, then re-read the payload for the fallback protocol
            self._file.seek(seek_cur, os.SEEK_SET)
            next_ = protocol(io.BytesIO(self._read_fileng(length)), length, error=str(exc))
            return cast('R', next_)
    return behold