Source code for pcapkit.reassembly.ip

# -*- coding: utf-8 -*-
"""IP fragments reassembly

:mod:`pcapkit.reassembly.ipv4` contains
:class:`~pcapkit.reassembly.ipv4.IP_Reassembly`
only, which reconstructs fragmented IP packets back to
origin. The following algorithm implement is based on IP
reassembly procedure introduced in :rfc:`791`, using
``RCVBT`` (fragment receivedbit table). Though another
algorithm is explained in :rfc:`815`, replacing ``RCVBT``,
however, this implement still used the elder one.

Notations::

    FO    - Fragment Offset
    IHL   - Internet Header Length
    MF    - More Fragments flag
    TTL   - Time To Live
    NFB   - Number of Fragment Blocks
    TL    - Total Length
    TDL   - Total Data Length
    BUFID - Buffer Identifier
    RCVBT - Fragment Received Bit Table
    TLB   - Timer Lower Bound

Algorithm::

    DO {
        BUFID <- source|destination|protocol|identification;

        IF (FO = 0 AND MF = 0) {
            IF (buffer with BUFID is allocated) {
                flush all reassembly for this BUFID;
                Submit datagram to next step;
                DONE.
            }
        }

        IF (no buffer with BUFID is allocated) {
            allocate reassembly resources with BUFID;
            TIMER <- TLB;
            TDL <- 0;
            put data from fragment into data buffer with BUFID
                [from octet FO*8 to octet (TL-(IHL*4))+FO*8];
            set RCVBT bits [from FO to FO+((TL-(IHL*4)+7)/8)];
        }

        IF (MF = 0) {
            TDL <- TL-(IHL*4)+(FO*8)
        }

        IF (FO = 0) {
            put header in header buffer
        }

        IF (TDL # 0 AND all RCVBT bits [from 0 to (TDL+7)/8] are set) {
            TL <- TDL+(IHL*4)
            Submit datagram to next step;
            free all reassembly resources for this BUFID;
            DONE.
        }

        TIMER <- MAX(TIMER,TTL);

    } give up until (next fragment or timer expires);

    timer expires: {
        flush all reassembly with this BUFID;
        DONE.
    }

"""
from pcapkit.corekit.infoclass import Info
from pcapkit.reassembly.reassembly import Reassembly


__all__ = ['IP_Reassembly']


[docs]class IP_Reassembly(Reassembly): # pylint: disable=abstract-method """Reassembly for IP payload.""" ########################################################################## # Methods. ##########################################################################
[docs] def reassembly(self, info): """Reassembly procedure. Arguments: info (pcapkit.corekit.infoclass.Info): info dict of packets to be reassembled """ BUFID = info.bufid # Buffer Identifier FO = info.fo # Fragment Offset IHL = info.ihl # Internet Header Length MF = info.mf # More Fragments flag TL = info.tl # Total Length # when non-fragmented (possibly discarded) packet received if not FO and not MF: if BUFID in self._buffer: self._dtgram += self.submit(self._buffer[BUFID]) del self._buffer[BUFID] return # initialise buffer with BUFID if BUFID not in self._buffer: self._buffer[BUFID] = dict( TDL=0, # Total Data Length RCVBT=bytearray(8191), # Fragment Received Bit Table index=list(), # index record header=bytearray(), # header buffer datagram=bytearray(65535), # data buffer ) # append packet index self._buffer[BUFID]['index'].append(info.num) # put data into data buffer start = FO stop = TL - IHL + FO self._buffer[BUFID]['datagram'][start:stop] = info.payload # set RCVBT bits (in 8 octets) start = FO // 8 stop = FO // 8 + (TL - IHL + 7) // 8 self._buffer[BUFID]['RCVBT'][start:stop] = b'\x01' * (stop - start + 1) # get total data length (header excludes) if not MF: TDL = TL - IHL + FO # put header into header buffer if not FO: self._buffer[BUFID]['header'] = info.header # when datagram is reassembled in whole start = 0 stop = (TDL + 7) // 8 if TDL and all(self._buffer[BUFID]['RCVBT'][start:stop]): self._dtgram += self.submit(self._buffer[BUFID], checked=True) del self._buffer[BUFID]
[docs] def submit(self, buf, *, checked=False): # pylint: disable=arguments-differ """Submit reassembled payload. Arguments: buf (dict): buffer dict of reassembled packets Keyword Arguments: bufid (tuple): buffer identifier Returns: list: reassembled packets """ TDL = buf['TDL'] RCVBT = buf['RCVBT'] index = buf['index'] header = buf['header'] datagram = buf['datagram'] start = 0 stop = (TDL + 7) // 8 flag = checked or (TDL and all(RCVBT[start:stop])) # if datagram is not implemented if not flag and self._strflg: data = list() byte = bytearray() # extract received payload for (bctr, bit) in enumerate(RCVBT): if bit: # received bit this = bctr * 8 that = this + 8 byte += datagram[this:that] else: # missing bit if byte: # strip empty payload data.append(bytes(byte)) byte = bytearray() # strip empty packets if data or header: packet = Info( NotImplemented=True, index=tuple(index), header=header or None, payload=tuple(data) or None, ) # if datagram is reassembled in whole else: payload = datagram[:TDL] packet = Info( NotImplemented=False, index=tuple(index), packet=(bytes(header) + bytes(payload)) or None, ) return [packet]