Source code for headerparser.scanner

from __future__ import annotations
from collections.abc import Iterable, Iterator
import re
from typing import Optional, Tuple, Union
import attr
from deprecated import deprecated
from .errors import MalformedHeaderError, ScannerEOFError, UnexpectedFoldingError
from .util import ascii_splitlines

# A separator regex may be given either as a pattern string or as an
# already-compiled pattern object; `convert_sep()` normalizes both forms.
RgxType = Union[str, "re.Pattern[str]"]

# A scanned ``(name, value)`` pair.  The name slot is `None` for the
# ``(None, body)`` pair that `Scanner.scan()` yields for the message body.
FieldType = Tuple[Optional[str], str]

# Default name-value separator: a colon together with any spaces and/or tabs
# on either side of it (so that they are trimmed from the name & value).
DEFAULT_SEPARATOR_REGEX = re.compile(r"[ \t]*:[ \t]*")


def data2iter(data: str | Iterable[str]) -> Iterator[str]:
    """
    Convert scanner input into an iterator of lines.

    A string is first broken into lines with `ascii_splitlines()`; any other
    iterable is assumed to already produce lines and is iterated over as-is.

    :param data: a string or an iterable of lines
    :return: an iterator over the lines of ``data``
    """
    lines = ascii_splitlines(data) if isinstance(data, str) else data
    return iter(lines)


def convert_sep(v: Optional[RgxType]) -> re.Pattern[str]:
    """
    Normalize a separator regex argument to a compiled pattern.

    :param v:
        a regex as a string or compiled pattern, or `None` to select
        `DEFAULT_SEPARATOR_REGEX`
    :return: the compiled separator pattern
    """
    return DEFAULT_SEPARATOR_REGEX if v is None else re.compile(v)


def none2false(v: Optional[bool]) -> bool:
    """
    Map `None` to `False`, returning any other value unchanged.

    :param v: a boolean or `None`
    :return: `False` if ``v`` is `None`, otherwise ``v`` itself
    """
    if v is None:
        return False
    return v


@attr.define
class Scanner:
    """
    .. versionadded:: 0.5.0

    A class for scanning text for RFC 822-style header fields.  Each method
    processes some portion of the input yet unscanned; the `scan()`,
    `scan_stanzas()`, and `get_unscanned()` methods process the entirety of the
    remaining input, while the `scan_next_stanza()` method only processes up
    through the first blank line.

    :param data:
        The text to scan.  This may be a string, a text-file-like object, or an
        iterable of lines.  If it is a string, it will be broken into lines on
        CR, LF, and CR LF boundaries.

    :param separator_regex:
        A regex (as a `str` or compiled regex object) defining the name-value
        separator; defaults to :regexp:`[ \\\\t]*:[ \\\\t]*`.  When the regex
        is found in a line, everything before the matched substring becomes the
        field name, and everything after becomes the first line of the field
        value.  Note that the regex must match any surrounding whitespace in
        order for it to be trimmed from the key & value.

    :param bool skip_leading_newlines:
        If `True`, blank lines at the beginning of the input will be discarded.
        If `False`, a blank line at the beginning of the input marks the end of
        an empty header section.
    """

    # Iterator over the not-yet-consumed lines of input
    _data: Iterator[str] = attr.field(converter=data2iter)
    # Compiled name-value separator; `None` is converted to the default
    separator_regex: re.Pattern[str] = attr.field(
        default=DEFAULT_SEPARATOR_REGEX,
        converter=convert_sep,
        kw_only=True,
    )
    # Whether blank lines before the first field are discarded; `None` is
    # converted to `False`
    skip_leading_newlines: bool = attr.field(
        default=False, kw_only=True, converter=none2false
    )
    # Set once all of the input has been consumed; the public methods raise
    # `ScannerEOFError` when called after that point
    _eof: bool = attr.field(default=False, init=False)

    def scan(self) -> Iterator[FieldType]:
        """
        Scan the remaining input for RFC 822-style header fields and return a
        generator of ``(name, value)`` pairs for each header field
        encountered, plus a ``(None, body)`` pair representing the body (if
        any) after the header section.

        All lines after the first blank line are concatenated & yielded as-is
        in a ``(None, body)`` pair.  (Note that body lines which do not end
        with a line terminator will not have one appended.)  If there is no
        empty line in the input, then no body pair is yielded.  If the empty
        line is the last line in the input, the body will be the empty string.
        If the empty line is the *first* line in the input and the
        ``skip_leading_newlines`` option is false (the default), then all other
        lines will be treated as part of the body and will not be scanned for
        header fields.

        :raises ScannerError: if the header section is malformed
        :raises ScannerEOFError:
            if all of the input has already been consumed
        """
        yield from self.scan_next_stanza()
        try:
            body = self.get_unscanned()
        except ScannerEOFError:
            # The header section ran to the end of the input without a
            # terminating blank line, so there is no body to report.
            pass
        else:
            yield (None, body)

    def scan_next_stanza(self) -> Iterator[tuple[str, str]]:
        """
        Scan the remaining input for RFC 822-style header fields and return a
        generator of ``(name, value)`` pairs for each header field in the
        input.  Input processing stops as soon as a blank line is encountered.
        (If ``skip_leading_newlines`` is true, the function only stops on a
        blank line after a non-blank line.)

        :raises ScannerError: if the header section is malformed
        :raises ScannerEOFError:
            if all of the input has already been consumed
        """
        if self._eof:
            raise ScannerEOFError()
        # Field currently being accumulated; `name` stays `None` until the
        # first header line has been seen.
        name: Optional[str] = None
        value = ""
        # Whether any non-blank line has been seen yet
        begun = False
        # Whether the loop stopped at a blank line rather than at end of input
        more_left = False
        for line in self._data:
            line = line.rstrip("\r\n")
            if line.startswith((" ", "\t")):
                # Continuation line: fold it into the current field's value
                begun = True
                if name is not None:
                    value += "\n" + line
                else:
                    raise UnexpectedFoldingError(line)
            else:
                m = self.separator_regex.search(line)
                if m:
                    begun = True
                    # A new field starts here, so the previous one is complete
                    if name is not None:
                        yield (name, value)
                    name = line[: m.start()]
                    value = line[m.end() :]
                elif line == "":
                    if self.skip_leading_newlines and not begun:
                        continue
                    else:
                        more_left = True
                        break
                else:
                    raise MalformedHeaderError(line)
        if name is not None:
            yield (name, value)
        if not more_left:
            # The loop ran off the end of the input
            self._eof = True

    def scan_stanzas(self) -> Iterator[list[tuple[str, str]]]:
        """
        Scan the remaining input for zero or more stanzas of RFC 822-style
        header fields and return a generator of lists of ``(name, value)``
        pairs, where each list represents a stanza of header fields in the
        input.

        The stanzas are terminated by blank lines.  Consecutive blank lines
        between stanzas are treated as a single blank line.  Blank lines at the
        end of the input are discarded without creating a new stanza.

        :raises ScannerError: if the header section is malformed
        :raises ScannerEOFError:
            if all of the input has already been consumed
        """
        if self._eof:
            raise ScannerEOFError()
        while True:
            try:
                fields = list(self.scan_next_stanza())
            except ScannerEOFError:
                break
            if fields or not self._eof:
                yield fields
            else:
                # An empty stanza that ran to the end of the input: only blank
                # lines remained, so no further stanza is produced.
                break  # type: ignore[unreachable]
            # From the second stanza onwards, extra blank lines between
            # stanzas are skipped rather than producing empty stanzas.
            self.skip_leading_newlines = True

    def get_unscanned(self) -> str:
        """
        Return all of the input that has not yet been processed.  After calling
        this method, calling any method again on the same `Scanner` instance
        will raise `ScannerEOFError`.

        :raises ScannerEOFError:
            if all of the input has already been consumed
        """
        if self._eof:
            raise ScannerEOFError()
        # Record that the input is exhausted so that subsequent method calls
        # raise `ScannerEOFError` as documented instead of silently operating
        # on an empty iterator.
        self._eof = True
        return "".join(self._data)
@deprecated(version="0.5.0", reason="use scan() instead")
def scan_string(
    s: str,
    *,
    separator_regex: Optional[RgxType] = None,
    skip_leading_newlines: bool = False,
) -> Iterator[FieldType]:
    """
    Scan a string for RFC 822-style header fields and return a generator of
    ``(name, value)`` pairs for each header field in the input, plus a
    ``(None, body)`` pair representing the body (if any) after the header
    section.

    See `scan()` for more information on the exact behavior of the scanner.

    .. deprecated:: 0.5.0
        Use `scan()` instead.

    :param s:
        a string which will be broken into lines on CR, LF, and CR LF
        boundaries and passed to `scan()`
    :param separator_regex: passed through to `scan()`
    :param skip_leading_newlines: passed through to `scan()`
    :rtype: generator of pairs of strings
    :raises ScannerError: if the header section is malformed
    """
    # This function is a plain alias kept for backwards compatibility.
    return scan(  # pragma: no cover
        s,
        skip_leading_newlines=skip_leading_newlines,
        separator_regex=separator_regex,
    )
def scan(
    data: str | Iterable[str],
    *,
    separator_regex: Optional[RgxType] = None,
    skip_leading_newlines: bool = False,
) -> Iterator[FieldType]:
    """
    .. versionadded:: 0.4.0

    Scan a string, text-file-like object, or iterable of lines for RFC
    822-style header fields and return a generator of ``(name, value)`` pairs
    for each header field in the input, plus a ``(None, body)`` pair
    representing the body (if any) after the header section.  If ``data`` is a
    string, it will be broken into lines on CR, LF, and CR LF boundaries.

    All lines after the first blank line are concatenated & yielded as-is in a
    ``(None, body)`` pair.  (Note that body lines which do not end with a line
    terminator will not have one appended.)  If there is no empty line in
    ``data``, then no body pair is yielded.  If the empty line is the last line
    in ``data``, the body will be the empty string.  If the empty line is the
    *first* line in ``data`` and the ``skip_leading_newlines`` option is false
    (the default), then all other lines will be treated as part of the body and
    will not be scanned for header fields.

    .. versionchanged:: 0.5.0
        ``data`` can now be a string.

    :param data:
        a string, text-file-like object, or iterable of strings representing
        lines of input
    :param separator_regex: passed to the `Scanner` constructor
    :param skip_leading_newlines: passed to the `Scanner` constructor
    :rtype: generator of pairs of strings
    :raises ScannerError: if the header section is malformed
    """
    scanner = Scanner(
        data,
        separator_regex=separator_regex,
        skip_leading_newlines=skip_leading_newlines,
    )
    return scanner.scan()
@deprecated(version="0.5.0", reason="use Scanner.scan_next_stanza() instead")
def scan_next_stanza(
    iterator: Iterator[str],
    *,
    separator_regex: Optional[RgxType] = None,
    skip_leading_newlines: bool = False,
) -> Iterator[tuple[str, str]]:
    """
    .. versionadded:: 0.4.0

    Scan a text-file-like object or iterator of lines for RFC 822-style header
    fields and return a generator of ``(name, value)`` pairs for each header
    field in the input.  Input processing stops as soon as a blank line is
    encountered, leaving the rest of the iterator unconsumed.  (If
    ``skip_leading_newlines`` is true, the function only stops on a blank line
    after a non-blank line.)

    .. deprecated:: 0.5.0
        Use `Scanner.scan_next_stanza()` instead

    :param iterator:
        a text-file-like object or iterator of strings representing lines of
        input
    :param separator_regex: passed to the `Scanner` constructor
    :param skip_leading_newlines: passed to the `Scanner` constructor
    :rtype: generator of pairs of strings
    :raises ScannerError: if the header section is malformed
    """
    scanner = Scanner(
        iterator,
        separator_regex=separator_regex,
        skip_leading_newlines=skip_leading_newlines,
    )
    return scanner.scan_next_stanza()
@deprecated(version="0.5.0", reason="use Scanner.scan_next_stanza() instead")
def scan_next_stanza_string(
    s: str,
    *,
    separator_regex: Optional[RgxType] = None,
    skip_leading_newlines: bool = False,
) -> tuple[list[tuple[str, str]], str]:
    """
    .. versionadded:: 0.4.0

    Scan a string for RFC 822-style header fields and return a pair ``(fields,
    extra)`` where ``fields`` is a list of ``(name, value)`` pairs for each
    header field in the input up to the first blank line and ``extra`` is
    everything after the first blank line.  (If ``skip_leading_newlines`` is
    true, the dividing point is instead the first blank line after a non-blank
    line.)  If there is no appropriate blank line in the input, ``extra`` is
    the empty string.

    .. deprecated:: 0.5.0
        Use `Scanner.scan_next_stanza()` instead

    :param s: a string to scan
    :param separator_regex: passed to the `Scanner` constructor
    :param skip_leading_newlines: passed to the `Scanner` constructor
    :rtype: pair of a list of pairs of strings and a string
    :raises ScannerError: if the header section is malformed
    """
    scanner = Scanner(
        s,
        separator_regex=separator_regex,
        skip_leading_newlines=skip_leading_newlines,
    )
    fields = list(scanner.scan_next_stanza())
    try:
        return (fields, scanner.get_unscanned())
    except ScannerEOFError:
        # The stanza ran to the end of the input, so nothing is left over
        return (fields, "")
def scan_stanzas(
    data: str | Iterable[str],
    *,
    separator_regex: Optional[RgxType] = None,
    skip_leading_newlines: bool = False,
) -> Iterator[list[tuple[str, str]]]:
    """
    .. versionadded:: 0.4.0

    Scan a string, text-file-like object, or iterable of lines for zero or more
    stanzas of RFC 822-style header fields and return a generator of lists of
    ``(name, value)`` pairs, where each list represents a stanza of header
    fields in the input.  If ``data`` is a string, it will be broken into lines
    on CR, LF, and CR LF boundaries.

    The stanzas are terminated by blank lines.  Consecutive blank lines between
    stanzas are treated as a single blank line.  Blank lines at the end of the
    input are discarded without creating a new stanza.

    .. versionchanged:: 0.5.0
        ``data`` can now be a string.

    :param data:
        a string, text-file-like object, or iterable of strings representing
        lines of input
    :param separator_regex: passed to the `Scanner` constructor
    :param skip_leading_newlines: passed to the `Scanner` constructor
    :rtype: generator of lists of pairs of strings
    :raises ScannerError: if the header section is malformed
    """
    scanner = Scanner(
        data,
        separator_regex=separator_regex,
        skip_leading_newlines=skip_leading_newlines,
    )
    return scanner.scan_stanzas()
@deprecated(version="0.5.0", reason="use scan_stanzas() instead")
def scan_stanzas_string(
    s: str,
    *,
    separator_regex: Optional[RgxType] = None,
    skip_leading_newlines: bool = False,
) -> Iterator[list[tuple[str, str]]]:
    """
    .. versionadded:: 0.4.0

    Scan a string for zero or more stanzas of RFC 822-style header fields and
    return a generator of lists of ``(name, value)`` pairs, where each list
    represents a stanza of header fields in the input.

    The stanzas are terminated by blank lines.  Consecutive blank lines between
    stanzas are treated as a single blank line.  Blank lines at the end of the
    input are discarded without creating a new stanza.

    .. deprecated:: 0.5.0
        Use `scan_stanzas()` instead

    :param s:
        a string which will be broken into lines on CR, LF, and CR LF
        boundaries and passed to `scan_stanzas()`
    :param separator_regex: passed through to `scan_stanzas()`
    :param skip_leading_newlines: passed through to `scan_stanzas()`
    :rtype: generator of lists of pairs of strings
    :raises ScannerError: if the header section is malformed
    """
    # This function is a plain alias kept for backwards compatibility.
    return scan_stanzas(  # pragma: no cover
        s,
        skip_leading_newlines=skip_leading_newlines,
        separator_regex=separator_regex,
    )