Source code for pcapkit.protocols.application.httpv1
# -*- coding: utf-8 -*-
r"""HTTP/1.* - Hypertext Transfer Protocol
============================================
:mod:`pcapkit.protocols.application.httpv1` contains
:class:`~pcapkit.protocols.application.httpv1.HTTP`
only, which implements extractor for Hypertext Transfer
Protocol (HTTP/1.*) [*]_, whose structure is described
as below:
.. code-block:: text
METHOD URL HTTP/VERSION\r\n :==: REQUEST LINE
<key> : <value>\r\n :==: REQUEST HEADER
............ (Ellipsis) :==: REQUEST HEADER
\r\n :==: REQUEST SEPARATOR
<body> :==: REQUEST BODY (optional)
HTTP/VERSION CODE DESP \r\n :==: RESPONSE LINE
<key> : <value>\r\n :==: RESPONSE HEADER
............ (Ellipsis) :==: RESPONSE HEADER
\r\n :==: RESPONSE SEPARATOR
<body> :==: RESPONSE BODY (optional)
.. [*] https://en.wikipedia.org/wiki/Hypertext_Transfer_Protocol
"""
import re
from typing import TYPE_CHECKING
from pcapkit.corekit.multidict import OrderedMultiDict
from pcapkit.protocols.application.http import HTTP as HTTPBase
from pcapkit.protocols.data.application.httpv1 import HTTP as DataType_HTTP
from pcapkit.protocols.data.application.httpv1 import RequestHeader as DataType_RequestHeader
from pcapkit.protocols.data.application.httpv1 import ResponseHeader as DataType_ResponseHeader
from pcapkit.utilities.exceptions import ProtocolError
if TYPE_CHECKING:
from typing import Any, NoReturn, Optional
from typing_extensions import Literal
from pcapkit.protocols.data.application.httpv1 import Header as DataType_Header
__all__ = ['HTTP']
#: Supported HTTP method.
HTTP_METHODS = [
'GET', 'HEAD', 'POST', 'PUT',
'DELETE', 'TRACE', 'OPTIONS',
'CONNECT', 'PATCH',
]
#: Regular expression to match HTTP methods.
_RE_METHOD = re.compile(r'|'.join(HTTP_METHODS).encode())
#: Regular expression to match HTTP version string.
_RE_VERSION = re.compile(rb"HTTP/(?P<version>\d\.\d)")
#: Regular expression to match HTTP status code.
_RE_STATUS = re.compile(rb'\d{3}')
[docs]class HTTP(HTTPBase[DataType_HTTP]):
"""This class implements Hypertext Transfer Protocol (HTTP/1.*)."""
##########################################################################
# Defaults.
##########################################################################
#: Literal['request', 'response']: Type of HTTP receipt.
_receipt: 'Literal["request", "response"]'
##########################################################################
# Properties.
##########################################################################
@property
def alias(self) -> 'Literal["HTTP/0.9", "HTTP/1.0", "HTTP/1.1"]':
"""Acronym of current protocol."""
return f'HTTP/{self.version}' # type: ignore[return-value]
@property
def version(self) -> 'Literal["0.9", "1.0", "1.1"]':
"""Version of current protocol."""
return self._info.receipt.version # type: ignore[attr-defined]
##########################################################################
# Methods.
##########################################################################
[docs] def read(self, length: 'Optional[int]' = None, **kwargs: 'Any') -> 'DataType_HTTP': # pylint: disable=unused-argument
"""Read Hypertext Transfer Protocol (HTTP/1.*).
Structure of HTTP/1.* packet [:rfc:`7230`]:
.. code-block:: text
HTTP-message :==: start-line
*( header-field CRLF )
CRLF
[ message-body ]
Args:
length: Length of packet data.
**kwargs: Arbitrary keyword arguments.
Returns:
Parsed packet data.
Raises:
ProtocolError: If the packet is malformed.
"""
if length is None:
length = len(self)
packet = self._file.read(length)
header, body = packet.split(b'\r\n\r\n', maxsplit=1)
header_line, header_unpacked = self._read_http_header(header)
body_unpacked = self._read_http_body(body) or None
http = DataType_HTTP(
receipt=header_line,
header=header_unpacked,
body=body_unpacked,
)
self._receipt = header_line.type
self._version = header_line.version # type: ignore[attr-defined]
self._length = len(packet)
return http
[docs] def make(self, **kwargs: 'Any') -> 'NoReturn':
"""Make (construct) packet data.
Args:
**kwargs: Arbitrary keyword arguments.
Returns:
Constructed packet data.
"""
raise NotImplementedError
[docs] @classmethod
def id(cls) -> 'tuple[Literal["HTTP"]]': # type: ignore[override]
"""Index ID of the protocol.
Returns:
Index ID of the protocol.
"""
return (cls.__name__,) # type: ignore[return-value]
##########################################################################
# Utilities.
##########################################################################
[docs] def _read_http_body(self, body: 'bytes') -> 'bytes':
"""Read HTTP/1.* body.
Args:
body: HTTP body data.
Returns:
Raw HTTP body.
"""
return body