Protocol Family

pcapkit.protocols is a collection of all protocol families, with detailed implementations and methods.

Base Protocol

class pcapkit.protocols.protocol.Protocol(file=None, length=None, **kwargs)[source]

Bases: object

Abstract base class for all protocol families.

__layer__ = None

Layer of the protocol. Can be one of Link, Internet, Transport or Application.

Type

Literal['Link', 'Internet', 'Transport', 'Application']

__proto__ = {}

Protocol index mapping for decoding the next layer, cf. self._decode_next_layer and self._import_next_layer. Each value should be a tuple of the module name and the class name.

Type

DefaultDict[int, Tuple[str, str]]
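
As a rough illustration of how a mapping of this shape could drive next-layer lookup, the sketch below resolves an index to a class with importlib. The registry contents (standard-library classes used as stand-ins), the fallback entry and the helper name import_next_layer are assumptions made for this example, not pcapkit's actual tables or code.

```python
import collections
import importlib

# Hypothetical registry: protocol index -> (module name, class name).
# Standard-library classes stand in for real protocol classes.
FALLBACK = ('collections', 'UserDict')
registry = collections.defaultdict(lambda: FALLBACK, {
    4: ('collections', 'OrderedDict'),
    6: ('fractions', 'Fraction'),
})


def import_next_layer(proto):
    """Resolve a protocol index to the class named in the registry."""
    module_name, class_name = registry[proto]
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


known = import_next_layer(4)     # class registered under index 4
unknown = import_next_layer(99)  # unregistered index uses the fallback
```

Storing names rather than classes means a module is only imported once its index is actually looked up.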

__bytes__()[source]

Returns source data stream in bytes.

__contains__(name)[source]

Returns whether name is in self._info.

Parameters

name (Any) – name to search

Returns

Whether name exists.

Return type

bool

classmethod __eq__(other)[source]

Returns whether other is of the same protocol as the current object.

Parameters

other (Union[Protocol, Type[Protocol]]) – Object to compare against.

Returns

Whether other is of the same protocol as the current object.

Return type

bool

__getitem__(key)[source]

Subscription (getitem) support.

  • If key is a slice object, ProtocolUnbound will be raised.

  • If key is a Protocol object, the method will fetch its index (id()).

  • The chain of protocols in the packet is then searched with the calculated key.

  • If nothing matches, ProtocolNotFound is raised.

Parameters

key (Union[str, Protocol, Type[Protocol]]) – Indexing key.

Returns

The sub-packet from the current packet of indexed protocol.

Return type

pcapkit.protocols.protocol.Protocol

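Raises
  • ProtocolUnbound – If key is a slice object.

  • ProtocolNotFound – If key matches no protocol in the current packet.
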
__hash__()[source]

Return the hash value for self._data.

abstract classmethod __index__()[source]

Numeral registry index of the protocol.

Returns

Numeral registry index of the protocol.

Return type

enum.IntEnum

__init__(file=None, length=None, **kwargs)[source]

Initialisation.

Parameters
  • file (Optional[io.BytesIO]) – Source packet stream.

  • length (Optional[int]) – Length of packet data.

Keyword Arguments
  • _error (bool) – Whether the object is initialised after parsing errors (self._onerror).

  • _layer (str) – Parse packet until _layer (self._exlayer).

  • _protocol (str) – Parse packet until _protocol (self._exproto).

  • **kwargs – Arbitrary keyword arguments.

__iter__()[source]

Iterate through self._data.

__length_hint__()[source]

Return an estimated length for the object.

__post_init__(file=None, length=None, **kwargs)[source]

Post initialisation hook.

Parameters
  • file (Optional[io.BytesIO]) – Source packet stream.

  • length (Optional[int]) – Length of packet data.

Keyword Arguments

**kwargs – Arbitrary keyword arguments.

See also

For construction arguments, please refer to make().

__repr__()[source]

Returns representation of parsed protocol data.

Example

>>> protocol
<Frame Info(..., ethernet=Info(...), protocols='Ethernet:IPv6:Raw')>

_check_term_threshold()[source]

Check whether the termination threshold has been reached.

Returns

Whether the termination threshold has been reached.

Return type

bool

_decode_next_layer(dict_, proto=None, length=None)[source]

Decode next layer protocol.

Parameters
  • dict_ (dict) – info buffer

  • proto (int) – next layer protocol index

  • length (int) – valid (non-padding) length

Returns

current protocol with next layer extracted

Return type

dict

_import_next_layer(proto, length=None)[source]

Import next layer extractor.

Parameters
  • proto (int) – next layer protocol index

  • length (int) – valid (non-padding) length

Returns

instance of next layer

Return type

pcapkit.protocols.protocol.Protocol

classmethod _make_index(name, default=None, *, namespace=None, reversed=False, pack=False, size=4, signed=False, lilendian=False)[source]

Return first index of name from a dict or enumeration.

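Parameters
  • name – item to look up in namespace

  • default – index to fall back on if name is not in namespace

Keyword Arguments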
  • namespace (Union[dict, enum.EnumMeta]) – namespace for item

  • reversed (bool) – if namespace is str -> int pairs

  • pack (bool) – if need struct.pack() to pack the result

  • size (int) – buffer size

  • signed (bool) – signed flag

  • lilendian (bool) – little-endian flag

Returns

Index of name from a dict or enumeration. If pack is True, returns bytes; otherwise, returns int.

Return type

Union[int, bytes]

Raises

ProtocolNotImplemented – If name is NOT in namespace and default is None.
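
The lookup described above might be sketched roughly as follows. This is a standalone approximation, not pcapkit's implementation: the function make_index, the Transport enumeration and the use of LookupError in place of ProtocolNotImplemented are all assumptions made for illustration, and the namespace is assumed to hold index -> name pairs unless reversed is set.

```python
import enum


def make_index(name, default=None, *, namespace=None, reversed=False,
               pack=False, size=4, signed=False, lilendian=False):
    """Hypothetical stand-in: find the first index registered for name."""
    if isinstance(namespace, enum.EnumMeta):
        # View the enumeration as index -> name pairs.
        pairs = {member.value: member.name for member in namespace}
    elif reversed:
        # Namespace holds name -> index pairs: flip them.
        pairs = {index: key for key, index in (namespace or {}).items()}
    else:
        pairs = dict(namespace or {})
    # First index whose registered name matches, else the default.
    index = next((key for key, value in pairs.items() if value == name), default)
    if index is None:
        # Stand-in for ProtocolNotImplemented.
        raise LookupError(f'{name!r} not found in namespace')
    if pack:
        byteorder = 'little' if lilendian else 'big'
        return index.to_bytes(size, byteorder, signed=signed)
    return index


class Transport(enum.IntEnum):
    """Hypothetical enumeration used only for this example."""
    TCP = 6
    UDP = 17


as_int = make_index('TCP', namespace=Transport)
as_bytes = make_index('UDP', namespace=Transport, pack=True, size=2)
```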

classmethod _make_pack(integer, *, size=1, signed=False, lilendian=False)[source]

Pack integers to bytes.

Parameters

integer (int) – integer to be packed

Keyword Arguments
  • size (int) – buffer size

  • signed (bool) – signed flag

  • lilendian (bool) – little-endian flag

Returns

Packed data upon success.

Return type

bytes

Raises

StructError – If failed to pack the integer.
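
A minimal sketch of this kind of packing with the standard struct module is shown below. The standalone function make_pack and the size-to-format table are assumptions for illustration; the sketch lets struct.error propagate where the method above raises StructError.

```python
import struct

# Assumed mapping from buffer size (in bytes) to signed struct format codes.
_FORMATS = {1: 'b', 2: 'h', 4: 'i', 8: 'q'}


def make_pack(integer, *, size=1, signed=False, lilendian=False):
    """Pack an integer into size bytes (hypothetical helper)."""
    endian = '<' if lilendian else '>'
    code = _FORMATS[size]
    if not signed:
        code = code.upper()  # upper-case codes are the unsigned variants
    return struct.pack(endian + code, integer)


big = make_pack(1, size=2)                     # big-endian by default
little = make_pack(1, size=2, lilendian=True)  # little-endian on request
```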

_read_binary(size=1)[source]

Read bytes and convert them into a string of binary digits.

Parameters

size (int) – buffer size

Returns

binary bits (0/1)

Return type

str
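
For illustration, the conversion can be approximated on any binary stream as below. The standalone function read_binary and its stream argument are assumptions for this sketch; the method above reads from the file buffer of the protocol object instead.

```python
import io


def read_binary(stream, size=1):
    """Read size bytes and render each one as eight 0/1 characters."""
    data = stream.read(size)
    return ''.join(format(byte, '08b') for byte in data)


bits = read_binary(io.BytesIO(b'\x45\x00'), 2)
```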

_read_fileng(*args, **kwargs)[source]

Read file buffer (self._file).

This method wraps the file.read() call.

Parameters

*args – arbitrary positional arguments

Keyword Arguments

**kwargs – arbitrary keyword arguments

Returns

Data read from file buffer.

Return type

bytes

_read_packet(length=None, *, header=None, payload=None, discard=False)[source]

Read raw packet data.

Parameters

length (int) – length of the packet

Keyword Arguments
  • header (Optional[int]) – length of the packet header

  • payload (Optional[int]) – length of the packet payload

  • discard (bool) – whether to discard the header data

Returns

  • If header is omitted, returns the whole packet data in bytes.

  • If discard is set as True, returns the packet body (in bytes) only.

  • Otherwise, returns the header and payload data as a dict:

    class Packet(TypedDict):
        """Header and payload data."""
    
        #: packet header
        header: bytes
        #: packet payload
        payload: bytes
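
The three return shapes listed above could be sketched as follows. The standalone function read_packet, its stream argument and the way a missing payload length is derived from length are assumptions for this example rather than the library's actual logic.

```python
import io


def read_packet(stream, length=None, *, header=None, payload=None, discard=False):
    """Hypothetical sketch of the three return shapes."""
    if header is None:
        # No header length given: return the whole packet.
        return stream.read(length)
    head = stream.read(header)
    if payload is None and length is not None:
        # Assumption: the payload is whatever remains of the packet.
        payload = length - header
    body = stream.read(payload)  # read(None) reads to the end of the stream
    if discard:
        return body
    return {'header': head, 'payload': body}


whole = read_packet(io.BytesIO(b'HEADdata'), 8)
split = read_packet(io.BytesIO(b'HEADdata'), 8, header=4)
body = read_packet(io.BytesIO(b'HEADdata'), 8, header=4, discard=True)
```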
    

_read_protos(size)[source]

Read next layer protocol type.

Parameters

size (int) – buffer size

Returns

  • On success, returns the name of the next layer protocol (str).

  • On failure, returns None.

_read_unpack(size=1, *, signed=False, lilendian=False, quiet=False)[source]

Read bytes and unpack for integers.

Parameters

size (int) – buffer size

Keyword Arguments
  • signed (bool) – signed flag

  • lilendian (bool) – little-endian flag

  • quiet (bool) – quiet (no exception) flag

Returns

unpacked data upon success

Return type

Optional[int]

Raises

StructError – If unpacking (struct.unpack()) failed and struct.error was raised.
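
The behaviour of the quiet flag can be sketched with the standard struct module as below. The standalone function read_unpack, its stream argument and the size-to-format table are assumptions for illustration; the sketch re-raises struct.error where the method above raises StructError.

```python
import io
import struct

# Assumed mapping from buffer size (in bytes) to signed struct format codes.
_FORMATS = {1: 'b', 2: 'h', 4: 'i', 8: 'q'}


def read_unpack(stream, size=1, *, signed=False, lilendian=False, quiet=False):
    """Read size bytes and unpack them as one integer (hypothetical helper)."""
    endian = '<' if lilendian else '>'
    code = _FORMATS[size] if signed else _FORMATS[size].upper()
    data = stream.read(size)
    try:
        return struct.unpack(endian + code, data)[0]
    except struct.error:
        if quiet:
            return None  # quiet mode: swallow the error
        raise


value = read_unpack(io.BytesIO(b'\x01\x00'), 2)
truncated = read_unpack(io.BytesIO(b'\x01'), 2, quiet=True)
```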

static decode(byte, *, encoding=None, errors='strict')[source]

Decode bytes into str.

Should decoding fail using encoding, the method will try again, decoding the bytes as 'unicode_escape'.

Parameters

byte (bytes) – Source bytestring.

Keyword Arguments
  • encoding (Optional[str]) – The encoding with which to decode the bytes. If not provided, pcapkit will first try detecting its encoding using chardet. The fallback encoding is UTF-8.

  • errors (str) – The error handling scheme to use for the handling of decoding errors. The default is 'strict' meaning that decoding errors raise a UnicodeDecodeError. Other possible values are 'ignore' and 'replace' as well as any other name registered with codecs.register_error() that can handle UnicodeDecodeError.

Returns

Decoded string.

Return type

str

See also

bytes.decode()
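
The retry with 'unicode_escape' can be sketched as below. This standalone decode function is an approximation for illustration: it skips the chardet detection step entirely and simply assumes UTF-8 when no encoding is given.

```python
def decode(byte, *, encoding=None, errors='strict'):
    """Decode bytes, retrying as 'unicode_escape' on failure (sketch)."""
    # Assumption: no charset detection here, UTF-8 when encoding is omitted.
    charset = encoding or 'utf-8'
    try:
        return byte.decode(charset, errors)
    except UnicodeError:
        return byte.decode('unicode_escape')


text = decode(b'caf\xc3\xa9')  # valid UTF-8 decodes directly
fallback = decode(b'\xff')     # invalid UTF-8 takes the fallback path
```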

classmethod id()[source]

Index ID of the protocol.

By default, it returns the name of the protocol.

Returns

Index ID of the protocol.

Return type

Union[str, Tuple[str]]

abstract make(**kwargs)[source]

Make (construct) packet data.

Keyword Arguments

**kwargs – Arbitrary keyword arguments.

Returns

Constructed packet data.

Return type

bytes

abstract read(length=None, **kwargs)[source]

Read (parse) packet data.

Parameters

length (Optional[int]) – Length of packet data.

Keyword Arguments

**kwargs – Arbitrary keyword arguments.

Returns

Parsed packet data.

Return type

dict

static unquote(url, *, encoding='utf-8', errors='replace')[source]

Unquote URLs into readable format.

Should decoding fail, the method will try again, replacing '%' with '\x' and then decoding the url as 'unicode_escape'.

Parameters

url (str) – URL string.

Keyword Arguments
  • encoding (str) – The encoding with which to decode the bytes.

  • errors (str) – The error handling scheme to use for the handling of decoding errors. The default is 'replace', meaning that invalid sequences are replaced by a placeholder character. Other possible values are 'strict' (decoding errors raise a UnicodeDecodeError) and 'ignore', as well as any other name registered with codecs.register_error() that can handle UnicodeDecodeError.

Returns

Unquoted string.

Return type

str
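
A rough sketch of this two-step approach, built on urllib.parse.unquote() from the standard library, is given below. Whether pcapkit itself delegates to urllib.parse is an assumption here, and the standalone function is for illustration only.

```python
import urllib.parse


def unquote(url, *, encoding='utf-8', errors='replace'):
    """Unquote a URL, retrying via 'unicode_escape' on failure (sketch)."""
    try:
        return urllib.parse.unquote(url, encoding=encoding, errors=errors)
    except UnicodeError:
        # Turn '%xx' into '\xXX' escapes and decode those instead.
        return url.replace('%', r'\x').encode().decode('unicode_escape')


plain = unquote('a%20b%2Fc')
recovered = unquote('%ff', errors='strict')  # invalid UTF-8 takes the fallback
```

With the default errors='replace' the first attempt rarely fails, so the fallback mainly matters when a stricter scheme is requested.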

_exlayer = None

Parse packet until such layer.

Type

str

_exproto = None

Parse packet until such protocol.

Type

str

_onerror = None

Whether the object is initialised after parsing errors.

Type

bool

_seekset = None

Initial offset of self._file.

Type

int

_sigterm = None

Whether to terminate parsing of the next layer protocol.

Type

bool

property alias

Acronym of current protocol.

Return type

str

property data

Binary packet data of current instance.

Return type

bytes

property info

Info dict of current instance.

Return type

pcapkit.corekit.infoclass.Info

abstract property length

Header length of current protocol.

Return type

int

abstract property name

Name of current protocol.

Return type

str

property payload

Payload of current instance.

Return type

pcapkit.protocols.protocol.Protocol

property protochain

Protocol chain of current instance.

Return type

pcapkit.corekit.protochain.ProtoChain

property protocol

Name of next layer protocol (if any).

Return type

Optional[str]