from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass, field
from typing import Any, Mapping, MutableMapping
import tls_client
import tls_client.response
from .errors import (
OGUAPIError,
OGUAuthenticationError,
OGUAuthorizationError,
OGUNetworkError,
OGUNotFoundError,
OGURateLimitError,
OGUServerError,
OGUTimeoutError,
OGUValidationError,
)
from .proxy import Proxy
# Public names exported by this module.
__all__ = [
'HttpClientConfig',
'HttpClient',
]
# Module-level logger; HttpClient uses it to report retry delays at DEBUG level.
logger = logging.getLogger(__name__)
# Defaults for the corresponding HttpClientConfig fields.
DEFAULT_BASE_URL = 'https://oguser.com'
DEFAULT_TIMEOUT_SECONDS = 30.0
# Zero means a request is attempted exactly once (no retries).
DEFAULT_RETRIES = 0
# Base of the exponential backoff: the delay doubles with each further attempt.
DEFAULT_RETRY_BACKOFF_SECONDS = 0.5
# Passed to tls_client.Session as ``client_identifier``.
DEFAULT_CLIENT_IDENTIFIER = 'chrome131'
# Browser-like headers copied into every request before per-request headers
# are merged on top.
# NOTE(review): the User-Agent below advertises Firefox while
# DEFAULT_CLIENT_IDENTIFIER selects a Chrome TLS profile — confirm the
# mismatch is intentional.
# NOTE(review): Host and Referer are pinned to oguser.com and are sent even
# when base_url is overridden or an absolute URL to another host is
# requested — confirm that is acceptable.
DEFAULT_HEADERS: Mapping[str, str] = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Encoding': 'gzip, deflate, br, zstd',
'Accept-Language': 'en-US,en;q=0.5',
'Connection': 'keep-alive',
'Host': 'oguser.com',
'Priority': 'u=0, i',
'Referer': 'https://oguser.com/',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'same-origin',
'Sec-Fetch-User': '?1',
'Sec-GPC': '1',
'TE': 'trailers',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:140.0) Gecko/20100101 Firefox/140.0',
}
# Response statuses that HttpClient.request retries while attempts remain;
# on the final attempt they are mapped to exceptions like any other error.
RETRYABLE_STATUS_CODES: frozenset[int] = frozenset({429, 500, 502, 503, 504})
[docs]
@dataclass(frozen=True)
class HttpClientConfig:
    """Immutable settings consumed by :class:`HttpClient`.

    Every field has a module-level default, so ``HttpClientConfig()`` is a
    complete, usable configuration.

    Attributes:
        base_url: Forum root that relative request paths are joined onto
            (default ``https://oguser.com``).
        timeout_seconds: Timeout, in seconds, intended for each request.
        max_retries: Number of additional attempts after the first one
            (default ``0`` — a request is tried exactly once).
        retry_backoff_seconds: Base value of the exponential retry backoff;
            it is doubled for each further attempt.
        client_identifier: TLS fingerprint identifier handed to
            ``tls_client.Session`` (default ``'chrome131'``).
        default_headers: Headers applied to every request. Per-request
            ``extra_headers`` are merged on top and win on conflict. Each
            instance receives its own copy of ``DEFAULT_HEADERS``.
    """

    base_url: str = DEFAULT_BASE_URL
    timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS
    max_retries: int = DEFAULT_RETRIES
    retry_backoff_seconds: float = DEFAULT_RETRY_BACKOFF_SECONDS
    client_identifier: str = DEFAULT_CLIENT_IDENTIFIER
    # A factory (not a shared literal) so instances never alias one dict.
    default_headers: Mapping[str, str] = field(
        default_factory=lambda: {**DEFAULT_HEADERS},
    )
[docs]
class HttpClient:
    """Async wrapper around ``tls_client.Session``.

    Runs the synchronous ``tls_client`` calls in a worker thread via
    ``asyncio.to_thread``, retries timeouts, network errors and the statuses
    in ``RETRYABLE_STATUS_CODES`` with exponential backoff, and maps HTTP
    error statuses to typed exceptions from :mod:`ogu_api.errors`.

    You typically don't construct this directly — :class:`OGUClient` does it
    for you. Drop down to ``client.http`` when you need to hit an endpoint
    not yet covered by a resource.

    Args:
        proxy: Optional proxy string (see :class:`Proxy` for accepted forms).
        config: Configuration. ``None`` for defaults.
        session: Pre-configured ``tls_client.Session``. If provided, the
            client will not close it on shutdown and will not change its
            timeout.
    """

    def __init__(
        self,
        *,
        proxy: str | None = None,
        config: HttpClientConfig | None = None,
        session: tls_client.Session | None = None,
    ) -> None:
        self._config: HttpClientConfig = config or HttpClientConfig()
        self._proxy: Proxy = Proxy(proxy)
        # Ownership is decided by identity, so the fallback session is built
        # on the same ``is None`` test rather than on truthiness.
        self._owns_session: bool = session is None
        if session is None:
            session = self._create_session(
                self._proxy,
                client_identifier = self._config.client_identifier,
            )
            timeout = self._whole_timeout_seconds(self._config.timeout_seconds)
            if timeout is not None:
                # NOTE(review): assumes tls_client reads
                # ``Session.timeout_seconds`` (whole seconds) as the request
                # timeout — confirm against the installed version.
                session.timeout_seconds = timeout
        self._session: tls_client.Session = session
        # Seeded on every session, including caller-supplied ones.
        # NOTE(review): purpose not visible here — presumably cookies the
        # forum expects to be present; confirm.
        self._session.cookies.set('ogumybbuser', '1')
        self._session.cookies.set('oguloginattempts', '1')

    async def __aenter__(self) -> HttpClient:
        return self

    async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
        self.close()

    def close(self) -> None:
        """Close the underlying session, but only if this client created it."""
        if not self._owns_session:
            return
        # Looked up defensively so a session object without ``close`` is fine.
        closer = getattr(self._session, 'close', None)
        if callable(closer):
            closer()

    @property
    def session(self) -> tls_client.Session:
        """The underlying ``tls_client.Session``."""
        return self._session

    @property
    def cookies(self) -> Any:
        """Cookie jar of the underlying session."""
        return self._session.cookies

    @property
    def base_url(self) -> str:
        """Base URL taken from the configuration."""
        return self._config.base_url

    async def get(
        self,
        path: str,
        *,
        query: Mapping[str, Any] | None = None,
        extra_headers: Mapping[str, str] | None = None,
        allow_redirects: bool = True,
    ) -> tls_client.response.Response:
        """Convenience wrapper for ``request('GET', ...)``."""
        return await self.request(
            'GET',
            path,
            query = query,
            extra_headers = extra_headers,
            allow_redirects = allow_redirects,
        )

    async def post(
        self,
        path: str,
        *,
        data: Mapping[str, Any] | None = None,
        json_body: Any = None,
        query: Mapping[str, Any] | None = None,
        extra_headers: Mapping[str, str] | None = None,
        allow_redirects: bool = True,
    ) -> tls_client.response.Response:
        """Convenience wrapper for ``request('POST', ...)``."""
        return await self.request(
            'POST',
            path,
            data = data,
            json_body = json_body,
            query = query,
            extra_headers = extra_headers,
            allow_redirects = allow_redirects,
        )

    async def request(
        self,
        method: str,
        path: str,
        *,
        query: Mapping[str, Any] | None = None,
        data: Mapping[str, Any] | None = None,
        json_body: Any = None,
        extra_headers: Mapping[str, str] | None = None,
        allow_redirects: bool = True,
    ) -> tls_client.response.Response:
        """Issue an HTTP request through the underlying ``tls_client.Session``.

        The request is attempted ``max_retries + 1`` times. Timeouts, network
        errors and responses whose status is in ``RETRYABLE_STATUS_CODES``
        trigger another attempt after an exponential backoff; the outcome of
        the final attempt is returned or raised as-is.

        Args:
            method: HTTP verb (``'GET'``, ``'POST'``, …).
            path: Request path (relative to ``base_url``) or absolute URL.
            query: Query-string parameters.
            data: Form-urlencoded body for POST.
            json_body: JSON body. Mutually exclusive with ``data``.
            extra_headers: Per-request headers; merged on top of
                :attr:`HttpClientConfig.default_headers`.
            allow_redirects: Follow 3xx redirects.

        Returns:
            ``tls_client.response.Response`` for any status below 400.

        Raises:
            ValueError: Both ``data`` and ``json_body`` were supplied.
            OGUAuthenticationError: 401.
            OGUAuthorizationError: 403.
            OGUNotFoundError: 404.
            OGUValidationError: 400, 422.
            OGURateLimitError: 429.
            OGUServerError: 5xx.
            OGUTimeoutError: Request timed out.
            OGUNetworkError: Other network-level failures.
        """
        if data is not None and json_body is not None:
            # Previously the two were forwarded together, pairing a form
            # Content-Type with whichever body the session happened to pick.
            raise ValueError('data and json_body are mutually exclusive')

        url = self._build_url(path)
        headers = self._build_headers(extra_headers, has_form_body = data is not None)
        attempts = max(1, self._config.max_retries + 1)

        for attempt in range(1, attempts + 1):
            is_last_attempt = attempt >= attempts
            try:
                response = await asyncio.to_thread(
                    self._dispatch,
                    method = method,
                    url = url,
                    query = query,
                    data = data,
                    json_body = json_body,
                    headers = headers,
                    allow_redirects = allow_redirects,
                )
            except (OGUTimeoutError, OGUNetworkError):
                if is_last_attempt:
                    raise
                await self._sleep_for_retry(attempt)
                continue
            if response.status_code in RETRYABLE_STATUS_CODES and not is_last_attempt:
                await self._sleep_for_retry(attempt)
                continue
            return self._handle_response(response, method = method, url = url)

        # Defensive only: the final loop iteration always returns or raises.
        raise OGUNetworkError(f'Exhausted retries for {method} {url}')

    @staticmethod
    def _create_session(proxy: Proxy, *, client_identifier: str) -> tls_client.Session:
        """Build a new ``tls_client.Session``, routed through *proxy* if set."""
        session = tls_client.Session(
            client_identifier = client_identifier,
            random_tls_extension_order = True,
        )
        if proxy.proxy:
            session.proxies.update({
                'http': proxy.proxy,
                'https': proxy.proxy,
            })
        return session

    @staticmethod
    def _whole_timeout_seconds(timeout_seconds: float) -> int | None:
        """Round a timeout up to whole seconds.

        Returns ``None`` when the value is zero, negative, NaN or infinite,
        in which case the session's own timeout is left untouched.
        """
        try:
            if not timeout_seconds > 0:
                return None
            whole = int(timeout_seconds)
        except (TypeError, ValueError, OverflowError):
            return None
        # ``int`` truncates, so bump fractional values up to the next second.
        return whole + 1 if whole < timeout_seconds else whole

    def _dispatch(
        self,
        *,
        method: str,
        url: str,
        query: Mapping[str, Any] | None,
        data: Mapping[str, Any] | None,
        json_body: Any,
        headers: Mapping[str, str],
        allow_redirects: bool,
    ) -> tls_client.response.Response:
        """Perform one blocking request; meant to run in a worker thread.

        Any exception raised by the session call is wrapped: messages that
        mention a timeout become :class:`OGUTimeoutError`, everything else
        becomes :class:`OGUNetworkError`.
        """
        request_func = getattr(self._session, method.lower())
        try:
            return request_func(
                url,
                # Empty mappings are sent as "no value" rather than as ``{}``.
                params = dict(query) if query else None,
                data = dict(data) if data else None,
                json = json_body,
                headers = dict(headers),
                allow_redirects = allow_redirects,
            )
        except Exception as E:
            message = str(E) or E.__class__.__name__
            lowered = message.lower()
            # Timeouts are recognised by their message text only.
            if 'timeout' in lowered or 'timed out' in lowered:
                raise OGUTimeoutError(f'Request timed out: {method} {url}') from E
            raise OGUNetworkError(f'Network error during {method} {url}: {message}') from E

    def _build_url(self, path: str) -> str:
        """Return *path* unchanged if absolute, else join it onto ``base_url``."""
        if path.startswith(('http://', 'https://')):
            return path
        base = self._config.base_url.rstrip('/')
        suffix = path if path.startswith('/') else '/' + path
        return f'{base}{suffix}'

    def _build_headers(
        self,
        extra_headers: Mapping[str, str] | None,
        *,
        has_form_body: bool,
    ) -> MutableMapping[str, str]:
        """Merge configured default headers with per-request overrides.

        Precedence, lowest to highest: configured defaults, the form
        ``Content-Type`` (only when *has_form_body*), then *extra_headers*.
        """
        merged: MutableMapping[str, str] = dict(self._config.default_headers)
        if has_form_body:
            merged['Content-Type'] = 'application/x-www-form-urlencoded'
        if extra_headers:
            merged.update(extra_headers)
        return merged

    def _retry_delay(self, attempt: int) -> float:
        """Backoff before the retry that follows 1-based *attempt*."""
        return self._config.retry_backoff_seconds * (2 ** (attempt - 1))

    async def _sleep_for_retry(self, attempt: int) -> None:
        """Log and then actually wait out the backoff for *attempt*.

        Uses ``asyncio.sleep`` so the event loop stays free while waiting.
        """
        delay = self._retry_delay(attempt)
        logger.debug('Retrying in %.2fs (attempt %d)', delay, attempt)
        if delay > 0:
            await asyncio.sleep(delay)

    def _handle_response(
        self,
        response: tls_client.response.Response,
        *,
        method: str,
        url: str,
    ) -> tls_client.response.Response:
        """Return *response* for statuses below 400, else raise a typed error.

        Raises:
            OGUAuthenticationError: 401.
            OGUAuthorizationError: 403.
            OGUNotFoundError: 404.
            OGUValidationError: 400, 422.
            OGURateLimitError: 429 (carries the parsed ``Retry-After``).
            OGUServerError: 5xx.
            OGUAPIError: Any other status of 400 or above.
        """
        status = response.status_code
        if status < 400:
            return response

        message = f'{method} {url} -> {status}'
        details = {
            'status_code': status,
            'method': method,
            'url': url,
            'body': getattr(response, 'text', None),
        }

        if status == 429:
            raise OGURateLimitError(
                message,
                retry_after_seconds = _parse_retry_after(response),
                **details,
            )

        exact_matches = {
            400: OGUValidationError,
            401: OGUAuthenticationError,
            403: OGUAuthorizationError,
            404: OGUNotFoundError,
            422: OGUValidationError,
        }
        error_type = exact_matches.get(status)
        if error_type is None:
            error_type = OGUServerError if 500 <= status < 600 else OGUAPIError
        raise error_type(message, **details)
def _parse_retry_after(response: tls_client.response.Response) -> float | None:
    """Extract the ``Retry-After`` delay, in seconds, from *response*.

    Both forms of the header are understood: a number of seconds and an
    HTTP-date, which is converted to the time remaining from now. The header
    name is matched case-insensitively. If the value is a list or tuple, its
    first element is used.

    Args:
        response: Object exposing a mapping-like ``headers`` attribute.

    Returns:
        A non-negative number of seconds, or ``None`` when the header is
        absent, empty, non-finite or otherwise unparseable. Negative numbers
        and dates in the past yield ``0.0``.
    """
    from datetime import datetime, timezone
    from email.utils import parsedate_to_datetime
    from math import isfinite

    headers = getattr(response, 'headers', None)
    if headers is None:
        return None

    raw = headers.get('Retry-After') or headers.get('retry-after')
    if raw is None and hasattr(headers, 'items'):
        # Plain dicts are case-sensitive, so scan for any other spelling.
        raw = next(
            (value for key, value in headers.items() if str(key).lower() == 'retry-after'),
            None,
        )
    if isinstance(raw, (list, tuple)):
        raw = raw[0] if raw else None
    if raw is None:
        return None

    try:
        seconds = float(raw)
    except (TypeError, ValueError):
        seconds = None
    if seconds is not None:
        # ``float`` accepts 'nan' and 'inf', neither of which is a usable delay.
        return max(0.0, seconds) if isfinite(seconds) else None

    # Not a number: try the HTTP-date form.
    try:
        retry_at = parsedate_to_datetime(str(raw))
    except (TypeError, ValueError):
        return None
    if retry_at is None:
        return None
    if retry_at.tzinfo is None:
        # A naive result carries no offset information; treat it as UTC.
        retry_at = retry_at.replace(tzinfo = timezone.utc)
    remaining = (retry_at - datetime.now(timezone.utc)).total_seconds()
    return max(0.0, remaining)