Source code for ogu_api.resources.threads

from __future__ import annotations

import calendar
import re
from datetime import datetime
from typing import Any, Mapping

import tls_client.response

from ..models import Post, Thread
from ._base import ResourceBase

__all__ = ['ThreadsResource']


# Matches a post container id such as ``post_123``; group 1 is the numeric
# post id. ``ThreadsResource.parse`` uses it both to locate the post ``<div>``
# and to extract the pid.
_POST_ID_PATTERN = re.compile(r'post_(\d+)')
# Captures the digits that follow ``uid=``.
# NOTE(review): not referenced in the visible part of this module.
_UID_PATTERN = re.compile(r'uid=(\d+)')
# Matches strings that start with an ``NN-NN-NNNN`` digit group. Used to pick
# out ``<span title=...>`` values that are then handed to ``_parse_date``.
_DATE_TITLE_PATTERN = re.compile(r'^\d{2}-\d{2}-\d{4}')
# Matches an absolute http(s) ``oguser.com`` URL whose path is a single
# segment of letters, digits, ``_`` or ``-`` (optional trailing slash). Used
# to recognise the author's profile link inside a post.
_PROFILE_LINK_PATTERN = re.compile(r'^https?://oguser\.com/[A-Za-z0-9_-]+/?$')
# Captures two integers separated by ``/`` with optional whitespace
# (e.g. ``3 / 12``).
# NOTE(review): not referenced in the visible part of this module.
_PAGE_OF_PATTERN = re.compile(r'(\d+)\s*/\s*(\d+)')


class ThreadsResource(ResourceBase):
    """Read threads and forum listings, and submit replies and new threads.

    Endpoints used:

    - ``/showthread.php?tid={tid}`` — read a thread.
    - ``/forumdisplay.php?fid={fid}`` — read a forum's thread list.
    - ``/newreply.php?tid={tid}`` — reply form.
    - ``/newreply.php?tid={tid}&processed=1`` (POST) — submit reply.
    - ``/newthread.php?fid={fid}`` — new-thread form.
    - ``/newthread.php?fid={fid}&processed=1`` (POST) — submit new thread.
    """

    async def get(
        self,
        thread_id: str | int,
        *,
        page: int | None = None,
    ) -> tls_client.response.Response:
        """Request a thread page and return the raw response.

        Args:
            thread_id: Numeric tid.
            page: 1-indexed page within the thread. When ``None`` no
                ``page`` query parameter is sent.
        """
        page_suffix = '' if page is None else f'&page={page}'
        return await self._http.get(f'/showthread.php?tid={thread_id}{page_suffix}')

    async def read(
        self,
        thread_id: str | int,
        *,
        page: int | None = None,
    ) -> Thread:
        """Fetch a thread page and parse it into a :class:`~ogu_api.Thread`.

        The page is fetched with :meth:`get` and its text is passed to
        :meth:`parse`. Every parsed :class:`~ogu_api.Post` carries ``pid``,
        ``author``, ``author_id``, ``body``, ``date``, ``date_label`` and
        ``signature``.

        Args:
            thread_id: Numeric tid. It is converted with ``int()`` before
                being stored on the result.
            page: 1-indexed page within the thread. A falsy value is
                recorded as page ``1`` on the returned object.

        Returns:
            Parsed :class:`~ogu_api.Thread`.

        Example:
            >>> thread = await client.threads.read(1286827)
            >>> print(thread.title, thread.page, thread.total_pages)
            >>> for post in thread.posts:
            ...     print(post.author, post.date_label, post.body[:80])
        """
        fetched = await self.get(thread_id, page = page)
        page_html = fetched.text
        return self.parse(page_html, thread_id = int(thread_id), page = page or 1)

    async def get_lastpost(self, thread_id: str | int) -> tls_client.response.Response:
        """Request a thread with ``action=lastpost`` and return the raw response."""
        lastpost_path = f'/showthread.php?tid={thread_id}&action=lastpost'
        return await self._http.get(lastpost_path)

    async def get_forum(
        self,
        forum_id: str | int,
        *,
        page: int | None = None,
    ) -> tls_client.response.Response:
        """Request a forum's thread listing and return the raw response.

        Args:
            forum_id: Numeric fid.
            page: 1-indexed page of the listing. When ``None`` no ``page``
                query parameter is sent.
        """
        page_suffix = '' if page is None else f'&page={page}'
        return await self._http.get(f'/forumdisplay.php?fid={forum_id}{page_suffix}')

    async def get_reply_page(self, thread_id: str | int) -> tls_client.response.Response:
        """Request the reply form of a thread and return the raw response."""
        form_path = f'/newreply.php?tid={thread_id}'
        return await self._http.get(form_path)

    async def reply(
        self,
        thread_id: str | int,
        message: str,
        *,
        my_post_key: str | None = None,
        hidden: Mapping[str, Any] | None = None,
    ) -> tls_client.response.Response:
        """Submit a reply to a thread.

        When ``hidden`` is not supplied the reply form is fetched first and
        its hidden inputs are used, so the usual call is simply
        ``await client.threads.reply(tid, message)``.

        Args:
            thread_id: Numeric tid.
            message: Reply body.
            my_post_key: Pre-fetched post key. When ``None`` it is taken from
                ``hidden`` (empty string if the field is absent).
            hidden: Pre-fetched hidden form fields. They are sent along with
                the request; the explicit fields set here take precedence.

        Returns:
            Raw POST response.

        Example:
            >>> await client.threads.reply(1286827, 'great post')
        """
        if hidden is None:
            form_page = await self.get_reply_page(thread_id)
            hidden = self.extract_reply_hidden(form_page.text)
        if my_post_key is None:
            my_post_key = hidden.get('my_post_key', '')

        # Start from the hidden fields, then let the explicit values win.
        payload = dict(hidden)
        payload.update({
            'action': 'do_newreply',
            'my_post_key': my_post_key,
            'tid': str(thread_id),
            'message': message,
        })
        return await self._http.post(
            f'/newreply.php?tid={thread_id}&processed=1',
            data = payload,
        )

    async def get_new_thread_page(self, forum_id: str | int) -> tls_client.response.Response:
        """Request the new-thread form of a forum and return the raw response."""
        form_path = f'/newthread.php?fid={forum_id}'
        return await self._http.get(form_path)

    async def create(
        self,
        forum_id: str | int,
        subject: str,
        message: str,
        *,
        my_post_key: str | None = None,
        hidden: Mapping[str, Any] | None = None,
    ) -> tls_client.response.Response:
        """Submit a new thread to a forum.

        When ``hidden`` is not supplied the new-thread form is fetched first
        and its hidden inputs are used.

        Args:
            forum_id: Numeric fid of the destination forum.
            subject: Thread title.
            message: Thread body.
            my_post_key: Pre-fetched post key. When ``None`` it is taken from
                ``hidden`` (empty string if the field is absent).
            hidden: Pre-fetched hidden form fields. They are sent along with
                the request; the explicit fields set here take precedence.

        Returns:
            Raw POST response.

        Example:
            >>> await client.threads.create(13, subject = 'hi', message = 'first post')
        """
        if hidden is None:
            form_page = await self.get_new_thread_page(forum_id)
            hidden = self.extract_new_thread_hidden(form_page.text)
        if my_post_key is None:
            my_post_key = hidden.get('my_post_key', '')

        # Start from the hidden fields, then let the explicit values win.
        payload = dict(hidden)
        payload.update({
            'action': 'do_newthread',
            'my_post_key': my_post_key,
            'fid': str(forum_id),
            'subject': subject,
            'message': message,
        })
        return await self._http.post(
            f'/newthread.php?fid={forum_id}&processed=1',
            data = payload,
        )

    @classmethod
    def parse(cls, page_html: str, *, thread_id: int, page: int = 1) -> Thread:
        """Turn the HTML of a ``/showthread.php`` page into a :class:`~ogu_api.Thread`.

        Args:
            page_html: Raw HTML body of a thread page.
            thread_id: Numeric tid, stored as ``Thread.tid``.
            page: 1-indexed page number, stored as ``Thread.page``.

        Returns:
            Parsed :class:`~ogu_api.Thread`. ``<article>`` elements that do
            not contain a ``<div>`` with a ``post_<digits>`` id are skipped.
        """
        soup = ResourceBase._soup(page_html)

        # Thread title: the <title> text without the trailing site suffix.
        site_suffix = ' | OGU'
        title_node = soup.find('title')
        title_text = title_node.get_text(strip = True) if title_node else ''
        if title_text.endswith(site_suffix):
            title_text = title_text[:-len(site_suffix)]

        # Links that may carry the author's numeric id, in order of preference.
        uid_link_patterns = tuple(
            re.compile(source)
            for source in (
                r'reputation\.php\?uid=(\d+)',
                r'vouches\.php\?id=(\d+)',
                r'member\.php\?action=profile&uid=(\d+)',
            )
        )

        posts: list[Post] = []
        for article in soup.find_all('article'):
            post_div = article.find('div', id = _POST_ID_PATTERN)
            if not post_div:
                continue
            pid_match = _POST_ID_PATTERN.search(post_div.get('id') or '')
            if not pid_match:
                continue
            pid = int(pid_match.group(1))

            # Body and signature: signatures that are direct children of the
            # body are recorded and removed before the body text is read.
            body_text = ''
            signature_text = ''
            body_div = article.find('div', class_ = 'post_body')
            if body_div:
                direct_signatures = [
                    node
                    for node in body_div.find_all(class_ = 'signature')
                    if node.parent is body_div
                ]
                for signature in direct_signatures:
                    signature_text = signature.get_text('\n', strip = True) or signature_text
                    signature.decompose()
                body_text = body_div.get_text('\n', strip = True)

            # Author: text of the first profile link that has non-empty text.
            profile_labels = (
                anchor.get_text(strip = True)
                for anchor in article.find_all('a', href = True)
                if _PROFILE_LINK_PATTERN.match(anchor.get('href') or '')
            )
            author = next((label for label in profile_labels if label), None)

            # Author id: taken from the first kind of link present in the post.
            author_id: int | None = None
            for uid_pattern in uid_link_patterns:
                link = article.find('a', href = uid_pattern)
                if not link:
                    continue
                uid_match = uid_pattern.search(link.get('href') or '')
                if uid_match:
                    try:
                        author_id = int(uid_match.group(1))
                    except ValueError:
                        author_id = None
                    break

            # Date: first <span> whose title attribute looks like a date.
            date_unix = 0
            date_label = ''
            for span in article.find_all('span', title = True):
                stamp = span.get('title') or ''
                if _DATE_TITLE_PATTERN.match(stamp):
                    date_label = span.get_text(' ', strip = True)
                    date_unix = _parse_date(stamp)
                    break

            posts.append(Post(
                pid = pid,
                author = author,
                author_id = author_id,
                body = body_text,
                date = date_unix,
                date_label = date_label,
                signature = signature_text,
            ))

        return Thread(
            tid = thread_id,
            title = title_text,
            posts = tuple(posts),
            page = page,
            total_pages = _extract_total_pages(soup),
        )

    @staticmethod
    def extract_reply_hidden(page_html: str) -> dict[str, Any]:
        """Return the hidden inputs of the reply form (``form[action*="newreply.php"]``)."""
        selector = 'form[action*="newreply.php"]'
        return ResourceBase._extract_hidden(page_html, form_selector = selector)

    @staticmethod
    def extract_new_thread_hidden(page_html: str) -> dict[str, Any]:
        """Return the hidden inputs of the new-thread form (``form[action*="newthread.php"]``)."""
        selector = 'form[action*="newthread.php"]'
        return ResourceBase._extract_hidden(page_html, form_selector = selector)
def _parse_date(title_attr: str) -> int: for fmt in ('%m-%d-%Y, %I:%M %p', '%m-%d-%Y'): try: dt = datetime.strptime(title_attr, fmt) except ValueError: continue return int(calendar.timegm(dt.timetuple())) return 0 def _extract_total_pages(soup: Any) -> int: posts_container = soup.find(id = 'posts') or soup.find(id = 'noa-posttransition') candidates: list[Any] = [] if posts_container is not None: for sibling in list(posts_container.previous_siblings) + list(posts_container.next_siblings): if getattr(sibling, 'find_all', None) is None: continue candidates.extend(sibling.find_all(class_ = re.compile(r'^pagination'))) for pagination in candidates: last = pagination.find('a', class_ = re.compile(r'pagination_last')) if last: href = last.get('href') or '' page_match = re.search(r'(?:[?&]|^)page=(\d+)', href) if page_match: try: return int(page_match.group(1)) except ValueError: pass # Fallback: largest page=N value across anchor hrefs max_page = 1 for anchor in pagination.find_all('a', href = True): page_match = re.search(r'(?:[?&])page=(\d+)', anchor.get('href') or '') if not page_match: continue try: page_num = int(page_match.group(1)) except ValueError: continue if page_num > max_page: max_page = page_num if max_page > 1: return max_page return 1