Source code for pcapkit.toolkit.pyshark
# -*- coding: utf-8 -*-
""":mod:`PyShark <pyshark>` Tools
====================================
:mod:`pcapkit.toolkit.pyshark` contains all you need for
:mod:`pcapkit` handy usage with `PyShark`_ engine. All
reforming functions returns with a flag to indicate if
usable for its caller.
.. _PyShark: https://kiminewt.github.io/pyshark
"""
import ipaddress
from typing import TYPE_CHECKING, cast
from pcapkit.const.reg.linktype import LinkType as RegType_LinkType
from pcapkit.foundation.traceflow import Packet as TF_Packet
if TYPE_CHECKING:
from typing import Any
from pyshark.packet.packet import Packet
__all__ = ['packet2dict', 'tcp_traceflow']
[docs]def packet2dict(packet: 'Packet') -> 'dict[str, Any]':
"""Convert PyShark packet into :obj:`dict`.
Args:
packet: Scapy packet.
Returns:
A :obj:`dict` mapping of packet data.
"""
dict_ = {} # type: dict[str, Any]
frame = packet.frame_info
for field in frame.field_names:
dict_[field] = getattr(frame, field)
tempdict = dict_
for layer in packet.layers:
tempdict[layer.layer_name.upper()] = {}
tempdict = tempdict[layer.layer_name.upper()]
for field in layer.field_names:
tempdict[field] = getattr(layer, field)
return dict_
[docs]def tcp_traceflow(packet: 'Packet') -> 'TF_Packet | None':
"""Trace packet flow for TCP.
Args:
packet: Scapy packet.
Returns:
Tuple[bool, Dict[str, Any]]: A tuple of data for TCP reassembly.
* If the ``packet`` can be used for TCP flow tracing. A packet can be reassembled
if it contains TCP layer.
* If the ``packet`` can be reassembled, then the :obj:`dict` mapping of data for TCP
flow tracing (:term:`trace.packet`) will be returned; otherwise, returns :data:`None`.
See Also:
:class:`~pcapkit.foundation.traceflow.TraceFlow`
"""
if 'IP' in packet:
ip = cast('Packet', packet.ip)
elif 'IPv6' in packet:
ip = cast('Packet', packet.ipv6)
else:
return None
if 'TCP' in packet:
tcp = cast('Packet', packet.tcp)
data = TF_Packet( # type: ignore[type-var]
protocol=RegType_LinkType.get(packet.layers[0].layer_name.upper()), # data link type from global header
index=int(packet.number), # frame number
frame=packet2dict(packet), # extracted packet
syn=bool(int(tcp.flags_syn)), # TCP synchronise (SYN) flag
fin=bool(int(tcp.flags_fin)), # TCP finish (FIN) flag
src=ipaddress.ip_address(ip.src), # source IP
dst=ipaddress.ip_address(ip.dst), # destination IP
srcport=int(tcp.srcport), # TCP source port
dstport=int(tcp.dstport), # TCP destination port
timestamp=packet.frame_info.time_epoch, # timestamp
)
return data
return None