Source code for pcapkit.toolkit.pyshark
# -*- coding: utf-8 -*-
"""PyShark tools
:mod:`pcapkit.toolkit.pyshark` contains all you need for
:mod:`pcapkit` handy usage with `PyShark`_ engine. All
reforming functions returns with a flag to indicate if
usable for its caller.
.. _PyShark: https://kiminewt.github.io/pyshark
"""
import ipaddress
from pcapkit.const.reg.linktype import LinkType as LINKTYPE
__all__ = ['packet2dict', 'tcp_traceflow']
[docs]def packet2dict(packet):
"""Convert PyShark packet into :obj:`dict`.
Args:
packet (pyshark.packet.packet.Packet): Scapy packet.
Returns:
Dict[str, Any]: A :obj:`dict` mapping of packet data.
"""
dict_ = dict()
frame = packet.frame_info
for field in frame.field_names:
dict_[field] = getattr(frame, field)
tempdict = dict_
for layer in packet.layers:
tempdict[layer.layer_name.upper()] = dict()
tempdict = tempdict[layer.layer_name.upper()]
for field in layer.field_names:
tempdict[field] = getattr(layer, field)
return dict_
[docs]def tcp_traceflow(packet):
"""Trace packet flow for TCP.
Args:
packet (pyshark.packet.packet.Packet): Scapy packet.
Returns:
Tuple[bool, Dict[str, Any]]: A tuple of data for TCP reassembly.
* If the ``packet`` can be used for TCP flow tracing. A packet can be reassembled
if it contains TCP layer.
* If the ``packet`` can be reassembled, then the :obj:`dict` mapping of data for TCP
flow tracing (:term:`trace.packet`) will be returned; otherwise, returns :data:`None`.
See Also:
:class:`~pcapkit.foundation.traceflow.TraceFlow`
"""
if 'TCP' in packet:
ip = packet.ip if 'IP' in packet else packet.ipv6
tcp = packet.tcp
data = dict(
protocol=LINKTYPE.get(packet.layers[0].layer_name.upper()), # data link type from global header
index=int(packet.number), # frame number
frame=packet2dict(packet), # extracted packet
syn=bool(int(tcp.flags_syn)), # TCP synchronise (SYN) flag
fin=bool(int(tcp.flags_fin)), # TCP finish (FIN) flag
src=ipaddress.ip_address(ip.src), # source IP
dst=ipaddress.ip_address(ip.dst), # destination IP
srcport=int(tcp.srcport), # TCP source port
dstport=int(tcp.dstport), # TCP destination port
timestamp=packet.frame_info.time_epoch, # timestamp
)
return True, data
return False, None