Source code for pcapkit.protocols.application.httpv1
# -*- coding: utf-8 -*-
"""hypertext transfer protocol (HTTP/1.*)
:mod:`pcapkit.protocols.application.httpv1` contains
:class:`~pcapkit.protocols.application.httpv1.HTTPv1`
only, which implements extractor for Hypertext Transfer
Protocol (HTTP/1.*) [*]_, whose structure is described
as below::
METHOD URL HTTP/VERSION\\r\\n :==: REQUEST LINE
<key> : <value>\\r\\n :==: REQUEST HEADER
............ (Ellipsis) :==: REQUEST HEADER
\\r\\n :==: REQUEST SEPARATOR
<body> :==: REQUEST BODY (optional)
HTTP/VERSION CODE DESP \\r\\n :==: RESPONSE LINE
<key> : <value>\\r\\n :==: RESPONSE HEADER
............ (Ellipsis) :==: RESPONSE HEADER
\\r\\n :==: RESPONSE SEPARATOR
<body> :==: RESPONSE BODY (optional)
.. [*] https://en.wikipedia.org/wiki/Hypertext_Transfer_Protocol
"""
import re
from pcapkit.protocols.application.http import HTTP
from pcapkit.utilities.exceptions import ProtocolError
__all__ = ['HTTPv1']
#: Supported HTTP method.
HTTP_METHODS = [
'GET', 'HEAD', 'POST', 'PUT',
'DELETE', 'TRACE', 'OPTIONS',
'CONNECT', 'PATCH',
]
#: Regular expression to match HTTP methods.
_RE_METHOD = re.compile(r'|'.join(HTTP_METHODS).encode())
#: Regular expression to match HTTP version string.
_RE_VERSION = re.compile(rb"HTTP/(?P<version>\d\.\d)")
#: Regular expression to match HTTP status code.
_RE_STATUS = re.compile(rb'\d{3}')
[docs]class HTTPv1(HTTP):
"""This class implements Hypertext Transfer Protocol (HTTP/1.*)."""
##########################################################################
# Defaults.
##########################################################################
#: Literal['request', 'response']: Type of HTTP receipt.
_receipt = None
##########################################################################
# Properties.
##########################################################################
@property
def alias(self):
"""Acronym of current protocol.
:rtype: Literal['HTTP/0.9', 'HTTP/1.0', 'HTTP/1.1']
"""
return f'HTTP/{self._info.header[self._receipt].version}' # pylint: disable=E1101
##########################################################################
# Methods.
##########################################################################
[docs] def read(self, length=None, **kwargs): # pylint: disable=unused-argument
"""Read Hypertext Transfer Protocol (HTTP/1.*).
Structure of HTTP/1.* packet [:rfc:`7230`]::
HTTP-message :==: start-line
*( header-field CRLF )
CRLF
[ message-body ]
Args:
length (Optional[int]): Length of packet data.
Keyword Args:
**kwargs: Arbitrary keyword arguments.
Returns:
DataType_HTTP: Parsed packet data.
Raises:
ProtocolError: If the packet is malformed.
"""
if length is None:
length = len(self)
packet = self._file.read(length)
try:
header, body = packet.split(b'\r\n\r\n', maxsplit=1)
except ValueError:
raise ProtocolError('HTTP: invalid format', quiet=True)
header_unpacked, http_receipt = self._read_http_header(header)
body_unpacked = self._read_http_body(body) or None
http = dict(
receipt=http_receipt,
header=header_unpacked,
body=body_unpacked,
raw=dict(
header=header,
body=body,
packet=self._read_packet(length),
),
)
self._receipt = http_receipt
return http
[docs] def make(self, **kwargs):
"""Make (construct) packet data.
Keyword Args:
**kwargs: Arbitrary keyword arguments.
Returns:
bytes: Constructed packet data.
"""
raise NotImplementedError
[docs] @classmethod
def id(cls):
"""Index ID of the protocol.
Returns:
Literal['HTTPv1']: Index ID of the protocol.
"""
return cls.__name__
##########################################################################
# Utilities.
##########################################################################
[docs] def _read_http_body(self, body): # pylint: disable=no-self-use
"""Read HTTP/1.* body.
Args:
body (bytes): HTTP body data.
Returns:
str: Raw HTTP body.
"""
return body