Trace TCP Flows

pcapkit.foundation.traceflow provides the interface for tracing TCP flows from a series of packets and connections.

Note

This was implemented at the request of my mate @gousaiyang.

class pcapkit.foundation.traceflow.TraceFlow(fout, format, byteorder='little', nanosecond=False)[source]

Bases: object

Trace TCP flows.

Parameters
  • *args (Any) – Arbitrary positional arguments.

  • **kwargs (Any) – Arbitrary keyword arguments.

Return type

TraceFlow

__init__(fout, format, byteorder='little', nanosecond=False)[source]

Initialise instance.

Parameters
  • fout (Optional[str]) – output path

  • format (Optional[str]) – output format

  • byteorder (Literal['little', 'big']) – output file byte order

  • nanosecond (bool) – output nanosecond-resolution file flag

Return type

None

__call__(packet)[source]

Dump frame to output files.

Parameters

packet (Packet) – a flow packet (trace.packet)

Return type

None

property index: tuple[Index, ...]

Index table for traced flow.

dump(packet)[source]

Dump frame to output files.

Parameters

packet (Dict[str, Any]) – a flow packet (trace.packet)

Return type

None

trace(packet: Packet, *, output: Literal[True] = False) Dumper[source]
trace(packet: Packet, *, output: Literal[False] = False) str

Trace packets.

Parameters
  • packet – a flow packet (trace.packet)

  • output – whether to return the formatted dumper instead of the flow label

Returns

If output is True, returns the initiated Dumper object, which will dump data to the output file named after the flow label; otherwise, returns the flow label itself.

Notes

The flow label is formatted as follows:

f'{packet.src}_{packet.srcport}-{packet.dst}_{packet.dstport}-{packet.timestamp}'
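
As a minimal sketch of the label format above, the following builds a label from a hypothetical stand-in object. DemoPacket and flow_label are illustrative names, not part of pcapkit, and the sketch assumes all four endpoint fields are read from the same packet object.

```python
from ipaddress import IPv4Address, IPv6Address, ip_address
from typing import NamedTuple, Union


class DemoPacket(NamedTuple):
    """Hypothetical stand-in holding only the fields the label uses."""
    src: Union[IPv4Address, IPv6Address]
    srcport: int
    dst: Union[IPv4Address, IPv6Address]
    dstport: int
    timestamp: float


def flow_label(packet: DemoPacket) -> str:
    """Format a flow label as source, destination and timestamp."""
    return (f'{packet.src}_{packet.srcport}-'
            f'{packet.dst}_{packet.dstport}-{packet.timestamp}')


demo = DemoPacket(ip_address('192.168.0.1'), 50000,
                  ip_address('10.0.0.2'), 80, 1700000000.5)
label = flow_label(demo)
print(label)
```

Because the timestamp is part of the label, two flows between the same endpoints that start at different times get different labels in this sketch.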

submit()[source]

Submit traced TCP flows.

Returns

Traced TCP flow (trace.index).

Return type

tuple[Index, ...]

classmethod register(format, module, class_, ext)[source]

Register a new dumper class.

Notes

The fully qualified class name of the new dumper class should be {module}.{class_}.

Parameters
  • format (str) – format name

  • module (str) – module name

  • class_ (str) – class name

  • ext (str) – file extension

Return type

None

classmethod make_fout(fout='./tmp', fmt='pcap')[source]

Make root path for output.

Parameters
  • fout (str) – root path for output

  • fmt (str) – output format

Returns

Dumper of specified format and file extension of output file.

Warns
  • FormatWarning – If fmt is not supported.

  • FileWarning – If fout exists and fmt is None.

Raises

FileExists – If fout exists and fmt is NOT None.

Return type

tuple[Type[Dumper], str | None]

__output__: DefaultDict[str, tuple[str, str, str | None]]

Format dumper mapping for writing output files. Each value should be a tuple giving the module name, class name and file extension.

Type

DefaultDict[str, tuple[str, str, str | None]]

_buffer: dict[BufferID, Buffer]

Buffer field (trace.buffer).

Type

dict[BufferID, Buffer]

Terminology

trace.packet

Data structure for TCP flow tracing (TraceFlow.dump) is as follows:

trace_dict = dict(
    protocol=data_link,                     # data link type from global header
    index=frame.info.number,                # frame number
    frame=frame.info,                       # extracted frame info
    syn=tcp.flags.syn,                      # TCP synchronise (SYN) flag
    fin=tcp.flags.fin,                      # TCP finish (FIN) flag
    src=ip.src,                             # source IP
    dst=ip.dst,                             # destination IP
    srcport=tcp.srcport,                    # TCP source port
    dstport=tcp.dstport,                    # TCP destination port
    timestamp=frame.info.time_epoch,        # frame timestamp
)

trace.buffer

Data structure for internal buffering when performing flow tracing algorithms (TraceFlow._buffer) is as follows:

(dict) buffer --> memory buffer for reassembly
 |--> (tuple) BUFID : (dict)
 |       |--> ip.src      |
 |       |--> tcp.srcport |
 |       |--> ip.dst      |
 |       |--> tcp.dstport |
 |                        |--> 'fpout' : (dictdumper.dumper.Dumper) output dumper object
 |                        |--> 'index': (list) list of frame indices
 |                        |              |--> (int) frame index
 |                        |--> 'label': (str) flow label generated from ``BUFID``
 |--> (tuple) BUFID ...
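
The buffer layout above can be sketched as a dictionary keyed by the (source IP, source port, destination IP, destination port) tuple. This is a simplified illustration, not pcapkit's code: DemoBuffer and record are hypothetical names, 'fpout' holds a plain filename string instead of a real Dumper, and the sketch assumes an entry is created the first time its BUFID is seen.

```python
from dataclasses import dataclass, field
from ipaddress import IPv4Address, IPv6Address, ip_address
from typing import Dict, List, Tuple, Union

IPAddress = Union[IPv4Address, IPv6Address]
BufferID = Tuple[IPAddress, int, IPAddress, int]


@dataclass
class DemoBuffer:
    """Hypothetical stand-in for a buffer entry."""
    fpout: str                      # a filename here, not a real Dumper object
    label: str                      # flow label generated from the BUFID
    index: List[int] = field(default_factory=list)  # frame indices seen so far


buffer: Dict[BufferID, DemoBuffer] = {}


def record(src: IPAddress, srcport: int, dst: IPAddress, dstport: int,
           frame_no: int, timestamp: float) -> DemoBuffer:
    """Append a frame number to the entry for its BUFID, creating it if needed."""
    bufid: BufferID = (src, srcport, dst, dstport)
    if bufid not in buffer:
        label = f'{src}_{srcport}-{dst}_{dstport}-{timestamp}'
        buffer[bufid] = DemoBuffer(fpout=f'{label}.pcap', label=label)
    buffer[bufid].index.append(frame_no)
    return buffer[bufid]


a, b = ip_address('192.168.0.1'), ip_address('10.0.0.2')
record(a, 50000, b, 80, 1, 100.0)
record(a, 50000, b, 80, 3, 100.2)
record(b, 80, a, 50000, 2, 100.1)  # the reverse direction has a different BUFID
```

In this sketch the tuple is ordered, so the two directions of a connection land in separate entries.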

trace.index

Data structure for TCP flow tracing (element from TraceFlow.index tuple) is as follows:

(tuple) index
 |--> (Info) data
 |     |--> 'fpout' : (Optional[str]) output filename if exists
 |     |--> 'index': (tuple) tuple of frame indices
 |     |              |--> (int) frame index
 |     |--> 'label': (str) flow label generated from ``BUFID``
 |--> (Info) data ...
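
To illustrate how the index entries differ from the buffer entries (an optional filename instead of a dumper, and an immutable tuple instead of a list), here is a minimal sketch. DemoIndex and freeze are hypothetical names, the input is a simplified label-to-frames mapping, and deriving the filename from the label plus an extension is an assumption made for the example.

```python
from typing import Dict, List, NamedTuple, Optional, Tuple


class DemoIndex(NamedTuple):
    """Hypothetical stand-in for one index entry."""
    fpout: Optional[str]       # output filename, or None when nothing is written
    index: Tuple[int, ...]     # frame indices, frozen into a tuple
    label: str                 # flow label


def freeze(flows: Dict[str, List[int]],
           ext: Optional[str]) -> Tuple[DemoIndex, ...]:
    """Turn a label -> frame-list mapping into a tuple of index entries."""
    return tuple(
        DemoIndex(
            fpout=f'{label}{ext}' if ext is not None else None,
            index=tuple(frames),
            label=label,
        )
        for label, frames in flows.items()
    )


flows = {'192.168.0.1_50000-10.0.0.2_80-100.0': [1, 3]}
with_file = freeze(flows, '.pcap')
without_file = freeze(flows, None)
print(with_file[0].fpout, with_file[0].index)
```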

Data Structures

pcapkit.foundation.traceflow.BufferID: tuple[IPAddress, int, IPAddress, int]

Buffer ID is a tuple of source IP, source port, destination IP, and destination port.

class pcapkit.foundation.traceflow.Packet(protocol, index, frame, syn, fin, src, dst, srcport, dstport, timestamp)[source]

Bases: Info, Generic[IPAddress]

Data structure for TCP flow tracing.

Parameters
  • *args (VT) – Arbitrary positional arguments.

  • **kwargs (VT) – Arbitrary keyword arguments.

Return type

Info

protocol: RegType_LinkType

Data link type from global header.

index: int

Frame number.

frame: DataType_Frame | dict[str, Any]

Extracted frame info.

syn: bool

TCP synchronise (SYN) flag.

fin: bool

TCP finish (FIN) flag.

src: IPAddress

Source IP.

dst: IPAddress

Destination IP.

srcport: int

TCP source port.

dstport: int

TCP destination port.

timestamp: float

Frame timestamp.

class pcapkit.foundation.traceflow.Buffer(fpout, index, label)[source]

Bases: Info

Data structure for the internal flow tracing buffer (trace.buffer).

Parameters
  • *args (VT) – Arbitrary positional arguments.

  • **kwargs (VT) – Arbitrary keyword arguments.

Return type

Info

fpout: Dumper

Output dumper object.

index: list[int]

List of frame indices.

label: str

Flow label generated from BUFID.

class pcapkit.foundation.traceflow.Index(fpout, index, label)[source]

Bases: Info

Data structure for a traced flow index entry (trace.index).

Parameters
  • *args (VT) – Arbitrary positional arguments.

  • **kwargs (VT) – Arbitrary keyword arguments.

Return type

Info

fpout: Optional[str]

Output filename if exists.

index: tuple[int, ...]

Tuple of frame indices.

label: str

Flow label generated from BUFID.