Source code for pcapkit.reassembly.reassembly
# -*- coding: utf-8 -*-
"""fragmented packets reassembly
:mod:`pcapkit.reassembly.reassembly` contains
class:`~pcapkit.reassembly.reassembly.Reassembly` only,
which is an abstract base class for all reassembly classes,
bases on algorithms described in :rfc:`815`, implements
datagram reassembly of IP and TCP packets.
"""
import abc
import copy
from pcapkit.corekit.infoclass import Info
from pcapkit.utilities.validations import frag_check, int_check
__all__ = ['Reassembly']
[docs]class Reassembly(metaclass=abc.ABCMeta):
"""Base class for reassembly procedure."""
##########################################################################
# Properties.
##########################################################################
# protocol of current packet
@property
@abc.abstractmethod
def name(self):
"""Protocol of current packet.
:rtype: str
"""
# total number of reassembled packets
@property
def count(self):
"""Total number of reassembled packets.
:rtype: int
"""
return len(self.fetch())
# reassembled datagram
@property
def datagram(self):
"""Reassembled datagram.
:rtype: tuple
"""
return self.fetch()
@property
@abc.abstractmethod
def protocol(self):
"""Protocol of current reassembly object.
:rtype: str
"""
##########################################################################
# Methods.
##########################################################################
# reassembly procedure
[docs] @abc.abstractmethod
def reassembly(self, info):
"""Reassembly procedure.
Arguments:
info (pcapkit.corekit.infoclass.Info): info dict of packets to be reassembled
"""
# submit reassembled payload
[docs] @abc.abstractmethod
def submit(self, buf, **kwargs):
"""Submit reassembled payload.
Arguments:
buf (dict): buffer dict of reassembled packets
"""
# fetch datagram
[docs] def fetch(self):
"""Fetch datagram.
Returns:
Tuple[dict]: Tuple of reassembled datagrams.
Fetch reassembled datagrams from
:attr:`~pcapkit.reassembly.reassembly.Reassembly._dtgram`
and returns a *tuple* of such datagrams.
If :attr:`~pcapkit.reassembly.reassembly.Reassembly._newflg`
set as ``True``, the method will call
:meth:`~pcapkit.reassembly.reassembly.Reassembly.submit` to
(*force*) obtain newly reassembled payload. Otherwise, the
already calculated :attr:`~pcapkit.reassembly.reassembly.Reassembly._dtgram`
will be returned.
"""
if self._newflg:
self._newflg = False
temp_dtgram = copy.deepcopy(self._dtgram)
for (bufid, buffer) in self._buffer.items():
temp_dtgram += self.submit(buffer, bufid=bufid)
return tuple(temp_dtgram)
return tuple(self._dtgram)
# return datagram index
[docs] def index(self, pkt_num):
"""Return datagram index.
Arguments:
pkt_num (int): index of packet
Returns:
Optional[int]: reassembled datagram index which was from No. ``pkt_num`` packet;
if not found, returns ``None``
"""
int_check(pkt_num)
for counter, datagram in enumerate(self.datagram):
if pkt_num in datagram.index:
return counter
return None
# run automatically
[docs] def run(self, packets):
"""Run automatically.
Arguments:
packets (List[dict]): list of packet dicts to be reassembled
"""
for packet in packets:
frag_check(packet, protocol=self.protocol)
info = Info(packet)
self.reassembly(info)
self._newflg = True
##########################################################################
# Data models.
##########################################################################
#: Not hashable.
__hash__ = None
[docs] def __init__(self, *, strict=True):
"""Initialise packet reassembly.
Keyword arguments:
strict (bool): if return all datagrams (including those not
implemented) when submit
"""
#: bool: if new packets reassembled flag
self._newflg = False
#: bool: strict mode flag
self._strflg = strict
#: dict buffer field
self._buffer = dict()
#: list reassembled datagram
self._dtgram = list()
[docs] def __call__(self, packet):
"""Call packet reassembly.
Arguments:
packet (dict): packet dict to be reassembled
(detailed format described in corresponding protocol)
"""
frag_check(packet, protocol=self.protocol)
info = Info(packet)
self.reassembly(info)
self._newflg = True