Source code for ogu_api._http

from __future__ import annotations

import asyncio
import logging
from dataclasses import dataclass, field
from typing import Any, Mapping, MutableMapping

import tls_client
import tls_client.response

from .errors import (
    OGUAPIError,
    OGUAuthenticationError,
    OGUAuthorizationError,
    OGUNetworkError,
    OGUNotFoundError,
    OGURateLimitError,
    OGUServerError,
    OGUTimeoutError,
    OGUValidationError,
)
from .proxy import Proxy

__all__ = [
    'HttpClientConfig',
    'HttpClient',
]


logger = logging.getLogger(__name__)


DEFAULT_BASE_URL = 'https://oguser.com'

DEFAULT_TIMEOUT_SECONDS = 30.0

DEFAULT_RETRIES = 0

DEFAULT_RETRY_BACKOFF_SECONDS = 0.5

DEFAULT_CLIENT_IDENTIFIER = 'chrome131'

DEFAULT_HEADERS: Mapping[str, str] = {
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Encoding': 'gzip, deflate, br, zstd',
    'Accept-Language': 'en-US,en;q=0.5',
    'Connection': 'keep-alive',
    'Host': 'oguser.com',
    'Priority': 'u=0, i',
    'Referer': 'https://oguser.com/',
    'Sec-Fetch-Dest': 'document',
    'Sec-Fetch-Mode': 'navigate',
    'Sec-Fetch-Site': 'same-origin',
    'Sec-Fetch-User': '?1',
    'Sec-GPC': '1',
    'TE': 'trailers',
    'Upgrade-Insecure-Requests': '1',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:140.0) Gecko/20100101 Firefox/140.0',
}

RETRYABLE_STATUS_CODES: frozenset[int] = frozenset({429, 500, 502, 503, 504})


[docs] @dataclass(frozen = True) class HttpClientConfig: """Frozen configuration for :class:`HttpClient`. Attributes: base_url: Forum base URL, default ``https://oguser.com``. timeout_seconds: Per-request timeout. max_retries: How many times to retry on ``429`` and ``5xx`` (default ``0`` — no retries). retry_backoff_seconds: Base delay for exponential backoff between retries. client_identifier: TLS fingerprint identifier passed to ``tls_client.Session`` (default ``'chrome131'``). default_headers: Per-request default headers. Merged with per-request ``extra_headers``; per-request values win. """ base_url: str = DEFAULT_BASE_URL timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS max_retries: int = DEFAULT_RETRIES retry_backoff_seconds: float = DEFAULT_RETRY_BACKOFF_SECONDS client_identifier: str = DEFAULT_CLIENT_IDENTIFIER default_headers: Mapping[str, str] = field(default_factory = lambda: dict(DEFAULT_HEADERS))
[docs] class HttpClient: """Async wrapper around ``tls_client.Session``. Runs the synchronous ``tls_client`` calls in a worker thread via ``asyncio.to_thread``, applies retries on ``429`` / ``5xx``, and maps HTTP status codes to typed exceptions from :mod:`ogu_api.errors`. You typically don't construct this directly — :class:`OGUClient` does it for you. Drop down to ``client.http`` when you need to hit an endpoint not yet covered by a resource. Args: proxy: Optional proxy string (see :class:`Proxy` for accepted forms). config: Configuration. ``None`` for defaults. session: Pre-configured ``tls_client.Session``. If provided, the client will not close it on shutdown. """ def __init__( self, *, proxy: str | None = None, config: HttpClientConfig | None = None, session: tls_client.Session | None = None, ) -> None: self._config: HttpClientConfig = config or HttpClientConfig() self._proxy: Proxy = Proxy(proxy) self._session: tls_client.Session = session or self._create_session( self._proxy, client_identifier = self._config.client_identifier, ) self._owns_session: bool = session is None self._session.cookies.set('ogumybbuser', '1') self._session.cookies.set('oguloginattempts', '1') async def __aenter__(self) -> HttpClient: return self async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> None: self.close() def close(self) -> None: if self._owns_session: close = getattr(self._session, 'close', None) if callable(close): close() @property def session(self) -> tls_client.Session: return self._session @property def cookies(self) -> Any: return self._session.cookies @property def base_url(self) -> str: return self._config.base_url
[docs] async def get( self, path: str, *, query: Mapping[str, Any] | None = None, extra_headers: Mapping[str, str] | None = None, allow_redirects: bool = True, ) -> tls_client.response.Response: """Convenience wrapper for ``request('GET', ...)``.""" return await self.request( 'GET', path, query = query, extra_headers = extra_headers, allow_redirects = allow_redirects, )
[docs] async def post( self, path: str, *, data: Mapping[str, Any] | None = None, json_body: Any = None, query: Mapping[str, Any] | None = None, extra_headers: Mapping[str, str] | None = None, allow_redirects: bool = True, ) -> tls_client.response.Response: """Convenience wrapper for ``request('POST', ...)``.""" return await self.request( 'POST', path, data = data, json_body = json_body, query = query, extra_headers = extra_headers, allow_redirects = allow_redirects, )
[docs] async def request( self, method: str, path: str, *, query: Mapping[str, Any] | None = None, data: Mapping[str, Any] | None = None, json_body: Any = None, extra_headers: Mapping[str, str] | None = None, allow_redirects: bool = True, ) -> tls_client.response.Response: """Issue an HTTP request through the underlying ``tls_client.Session``. Args: method: HTTP verb (``'GET'``, ``'POST'``, …). path: Request path (relative to ``base_url``) or absolute URL. query: Query-string parameters. data: Form-urlencoded body for POST. json_body: JSON body. Mutually exclusive with ``data``. extra_headers: Per-request headers; merged on top of :attr:`HttpClientConfig.default_headers`. allow_redirects: Follow 3xx redirects. Returns: ``tls_client.response.Response`` for any 2xx / 3xx status. Raises: OGUAuthenticationError: 401. OGUAuthorizationError: 403. OGUNotFoundError: 404. OGUValidationError: 400, 422. OGURateLimitError: 429. OGUServerError: 5xx. OGUTimeoutError: Request timed out. OGUNetworkError: Other network-level failures. """ url = self._build_url(path) headers = self._build_headers(extra_headers, has_form_body = data is not None) attempts = max(1, self._config.max_retries + 1) last_error: Exception | None = None for attempt in range(1, attempts + 1): try: response = await asyncio.to_thread( self._dispatch, method = method, url = url, query = query, data = data, json_body = json_body, headers = headers, allow_redirects = allow_redirects, ) except OGUTimeoutError as E: last_error = E if attempt >= attempts: raise self._sleep_for_retry(attempt) continue except OGUNetworkError as E: last_error = E if attempt >= attempts: raise self._sleep_for_retry(attempt) continue if response.status_code in RETRYABLE_STATUS_CODES and attempt < attempts: self._sleep_for_retry(attempt) continue return self._handle_response(response, method = method, url = url) if last_error is not None: raise last_error raise OGUNetworkError(f'Exhausted retries for {method} {url}')
@staticmethod def _create_session(proxy: Proxy, *, client_identifier: str) -> tls_client.Session: session = tls_client.Session( client_identifier = client_identifier, random_tls_extension_order = True, ) if proxy.proxy: session.proxies.update({ 'http': proxy.proxy, 'https': proxy.proxy, }) return session def _dispatch( self, *, method: str, url: str, query: Mapping[str, Any] | None, data: Mapping[str, Any] | None, json_body: Any, headers: Mapping[str, str], allow_redirects: bool, ) -> tls_client.response.Response: request_func = getattr(self._session, method.lower()) try: return request_func( url, params = dict(query) if query else None, data = dict(data) if data else None, json = json_body, headers = dict(headers), allow_redirects = allow_redirects, ) except Exception as E: message = str(E) or E.__class__.__name__ if 'timeout' in message.lower() or 'timed out' in message.lower(): raise OGUTimeoutError(f'Request timed out: {method} {url}') from E raise OGUNetworkError(f'Network error during {method} {url}: {message}') from E def _build_url(self, path: str) -> str: if path.startswith(('http://', 'https://')): return path base = self._config.base_url.rstrip('/') if not path.startswith('/'): path = '/' + path return f'{base}{path}' def _build_headers( self, extra_headers: Mapping[str, str] | None, *, has_form_body: bool, ) -> MutableMapping[str, str]: headers: MutableMapping[str, str] = dict(self._config.default_headers) if has_form_body: headers['Content-Type'] = 'application/x-www-form-urlencoded' if extra_headers: for key, value in extra_headers.items(): headers[key] = value return headers def _sleep_for_retry(self, attempt: int) -> None: delay = self._config.retry_backoff_seconds * (2 ** (attempt - 1)) logger.debug('Retrying in %.2fs (attempt %d)', delay, attempt) def _handle_response( self, response: tls_client.response.Response, *, method: str, url: str, ) -> tls_client.response.Response: status = response.status_code if status < 400: return response body = response.text if hasattr(response, 'text') else None message = f'{method} {url} -> {status}' if status == 401: raise OGUAuthenticationError(message, status_code = status, method = method, url = url, body = body) if status == 403: raise OGUAuthorizationError(message, status_code = status, method = method, url = url, body = body) if status == 404: raise OGUNotFoundError(message, status_code = status, method = method, url = url, body = body) if status in (400, 422): raise OGUValidationError(message, status_code = status, method = method, url = url, body = body) if status == 429: retry_after = _parse_retry_after(response) raise OGURateLimitError( message, status_code = status, method = method, url = url, body = body, retry_after_seconds = retry_after, ) if 500 <= status < 600: raise OGUServerError(message, status_code = status, method = method, url = url, body = body) raise OGUAPIError(message, status_code = status, method = method, url = url, body = body)
def _parse_retry_after(response: tls_client.response.Response) -> float | None: raw = None headers = getattr(response, 'headers', None) if headers is not None: raw = headers.get('Retry-After') or headers.get('retry-after') if raw is None: return None try: return float(raw) except (TypeError, ValueError): return None