from __future__ import annotations
import calendar
import re
from datetime import datetime
from typing import Any, Mapping
import tls_client.response
from ..models import Post, Thread
from ._base import ResourceBase
__all__ = ['ThreadsResource']
_POST_ID_PATTERN = re.compile(r'post_(\d+)')
_UID_PATTERN = re.compile(r'uid=(\d+)')
_DATE_TITLE_PATTERN = re.compile(r'^\d{2}-\d{2}-\d{4}')
_PROFILE_LINK_PATTERN = re.compile(r'^https?://oguser\.com/[A-Za-z0-9_-]+/?$')
_PAGE_OF_PATTERN = re.compile(r'(\d+)\s*/\s*(\d+)')
[docs]
class ThreadsResource(ResourceBase):
    """View threads / forums and post replies and new threads.

    Paths:
        - ``/showthread.php?tid={tid}`` — read a thread.
        - ``/forumdisplay.php?fid={fid}`` — read a forum's thread list.
        - ``/newreply.php?tid={tid}`` — reply form.
        - ``/newreply.php?tid={tid}&processed=1`` (POST) — submit reply.
        - ``/newthread.php?fid={fid}`` — new-thread form.
        - ``/newthread.php?fid={fid}&processed=1`` (POST) — submit new thread.
    """

    # Link shapes whose query string carries a numeric user id. They are tried
    # in this order for every post; compiling them once here avoids rebuilding
    # the patterns for each post on the page.
    _AUTHOR_ID_PATTERNS = (
        re.compile(r'reputation\.php\?uid=(\d+)'),
        re.compile(r'vouches\.php\?id=(\d+)'),
        re.compile(r'member\.php\?action=profile&uid=(\d+)'),
    )

    async def get(
        self,
        thread_id: str | int,
        *,
        page: int | None = None,
    ) -> tls_client.response.Response:
        """Fetch a thread by numeric id.

        Args:
            thread_id: Numeric tid.
            page: 1-indexed page within the thread (newer pages of replies).
                Omitted from the URL when ``None``.

        Returns:
            Raw response for ``/showthread.php``.
        """
        path = f'/showthread.php?tid={thread_id}'
        if page is not None:
            path += f'&page={page}'
        return await self._http.get(path)

    async def read(
        self,
        thread_id: str | int,
        *,
        page: int | None = None,
    ) -> Thread:
        """Fetch and parse a thread page into a :class:`~ogu_api.Thread` dataclass.

        Each post becomes a :class:`~ogu_api.Post` with ``pid``, ``author``,
        ``author_id``, ``body``, ``date`` (UTC unix timestamp), and the
        original ``date_label`` from the page.

        Args:
            thread_id: Numeric tid.
            page: 1-indexed page within the thread. A falsy value is reported
                as page 1 on the returned :class:`~ogu_api.Thread`.

        Returns:
            Parsed :class:`~ogu_api.Thread`.

        Raises:
            ValueError: If ``thread_id`` cannot be converted to ``int``. The
                conversion happens before any request is sent.

        Example:
            >>> thread = await client.threads.read(1286827)
            >>> print(thread.title, thread.page, thread.total_pages)
            >>> for post in thread.posts:
            ...     print(post.author, post.date_label, post.body[:80])
        """
        # Convert up front so a non-numeric id fails before the network call
        # instead of after the page has already been downloaded.
        tid = int(thread_id)
        response = await self.get(tid, page = page)
        return self.parse(response.text, thread_id = tid, page = page or 1)

    async def get_by_link(self, link: str) -> tls_client.response.Response:
        """Fetch a thread by an arbitrary link (numeric or slug-rewritten).

        Args:
            link: Path returned by :meth:`FeedResource.extract_thread_links`,
                e.g. ``/Thread-Card-to-Crypto`` or
                ``/showthread.php?tid=123&action=lastpost``. It is passed to
                the HTTP client unchanged.
        """
        return await self._http.get(link)

    async def get_lastpost(self, thread_id: str | int) -> tls_client.response.Response:
        """Jump to the last post of a thread (``?action=lastpost``)."""
        return await self._http.get(f'/showthread.php?tid={thread_id}&action=lastpost')

    async def get_forum(
        self,
        forum_id: str | int,
        *,
        page: int | None = None,
    ) -> tls_client.response.Response:
        """Fetch a forum's thread listing.

        Args:
            forum_id: Numeric fid.
            page: 1-indexed page of the listing. Omitted from the URL when
                ``None``.
        """
        path = f'/forumdisplay.php?fid={forum_id}'
        if page is not None:
            path += f'&page={page}'
        return await self._http.get(path)

    async def get_reply_page(self, thread_id: str | int) -> tls_client.response.Response:
        """Fetch the reply form for a thread (raw HTML)."""
        return await self._http.get(f'/newreply.php?tid={thread_id}')

    async def reply(
        self,
        thread_id: str | int,
        message: str,
        *,
        my_post_key: str | None = None,
        hidden: Mapping[str, Any] | None = None,
    ) -> tls_client.response.Response:
        """Post a reply to a thread.

        Auto-fetches the reply form when ``hidden`` is omitted, so the common
        case is just ``await client.threads.reply(tid, message)``.

        Args:
            thread_id: Numeric tid.
            message: Reply body (BBCode supported by the forum).
            my_post_key: Pre-fetched CSRF token. When omitted it is taken from
                ``hidden``; if ``hidden`` has no usable token either, the
                reply form is fetched to obtain one.
            hidden: Pre-fetched hidden form fields. Includes ``posthash``,
                ``subject``, etc.

        Returns:
            Raw POST response.

        Example:
            >>> await client.threads.reply(1286827, 'great post')
        """
        form_fields = None
        if hidden is None:
            form = await self.get_reply_page(thread_id)
            form_fields = self.extract_reply_hidden(form.text)
            hidden = form_fields
        if my_post_key is None:
            my_post_key = hidden.get('my_post_key') or ''
            if not my_post_key and form_fields is None:
                # Caller-supplied fields carry no token: fetch the form rather
                # than submitting the POST with an empty my_post_key.
                form = await self.get_reply_page(thread_id)
                form_fields = self.extract_reply_hidden(form.text)
                my_post_key = form_fields.get('my_post_key') or ''
        return await self._http.post(
            f'/newreply.php?tid={thread_id}&processed=1',
            data = {
                # Explicit keys come after the spread so they override any
                # same-named entries in ``hidden``.
                **hidden,
                'action': 'do_newreply',
                'my_post_key': my_post_key,
                'tid': str(thread_id),
                'message': message,
            },
        )

    async def get_new_thread_page(self, forum_id: str | int) -> tls_client.response.Response:
        """Fetch the new-thread form for a forum (raw HTML)."""
        return await self._http.get(f'/newthread.php?fid={forum_id}')

    async def create(
        self,
        forum_id: str | int,
        subject: str,
        message: str,
        *,
        my_post_key: str | None = None,
        hidden: Mapping[str, Any] | None = None,
    ) -> tls_client.response.Response:
        """Create a new thread in a forum.

        Auto-fetches the new-thread form when ``hidden`` is omitted.

        Args:
            forum_id: Numeric fid of the destination forum.
            subject: Thread title.
            message: Thread body.
            my_post_key: Pre-fetched CSRF token. When omitted it is taken from
                ``hidden``; if ``hidden`` has no usable token either, the
                new-thread form is fetched to obtain one.
            hidden: Pre-fetched hidden form fields.

        Returns:
            Raw POST response.

        Example:
            >>> await client.threads.create(13, subject = 'hi', message = 'first post')
        """
        form_fields = None
        if hidden is None:
            form = await self.get_new_thread_page(forum_id)
            form_fields = self.extract_new_thread_hidden(form.text)
            hidden = form_fields
        if my_post_key is None:
            my_post_key = hidden.get('my_post_key') or ''
            if not my_post_key and form_fields is None:
                # Caller-supplied fields carry no token: fetch the form rather
                # than submitting the POST with an empty my_post_key.
                form = await self.get_new_thread_page(forum_id)
                form_fields = self.extract_new_thread_hidden(form.text)
                my_post_key = form_fields.get('my_post_key') or ''
        return await self._http.post(
            f'/newthread.php?fid={forum_id}&processed=1',
            data = {
                # Explicit keys come after the spread so they override any
                # same-named entries in ``hidden``.
                **hidden,
                'action': 'do_newthread',
                'my_post_key': my_post_key,
                'fid': str(forum_id),
                'subject': subject,
                'message': message,
            },
        )

    @classmethod
    def parse(cls, page_html: str, *, thread_id: int, page: int = 1) -> Thread:
        """Parse a ``/showthread.php`` page into a :class:`~ogu_api.Thread`.

        Every ``<article>`` containing a ``<div id="post_N">`` becomes one
        :class:`~ogu_api.Post`; articles without such a div are skipped.

        Args:
            page_html: Raw HTML body of a thread page.
            thread_id: Numeric tid (used as the ``Thread.tid`` field).
            page: 1-indexed page number (used as the ``Thread.page`` field).

        Returns:
            Parsed :class:`~ogu_api.Thread`.
        """
        soup = cls._soup(page_html)

        title_node = soup.find('title')
        title_text = title_node.get_text(strip = True) if title_node else ''
        site_suffix = ' | OGU'
        if title_text.endswith(site_suffix):
            title_text = title_text[:-len(site_suffix)]

        posts: list[Post] = []
        for article in soup.find_all('article'):
            pid = cls._extract_post_id(article)
            if pid is None:
                continue
            # Must run before the author lookups: it removes signature blocks
            # from the tree, so links inside them are no longer candidates.
            body_text, signature_text = cls._extract_body_and_signature(article)
            author = cls._extract_author(article)
            author_id = cls._extract_author_id(article)
            date_unix, date_label = cls._extract_date(article)
            posts.append(Post(
                pid = pid,
                author = author,
                author_id = author_id,
                body = body_text,
                date = date_unix,
                date_label = date_label,
                signature = signature_text,
            ))

        return Thread(
            tid = thread_id,
            title = title_text,
            posts = tuple(posts),
            page = page,
            total_pages = _extract_total_pages(soup),
        )

    @staticmethod
    def _extract_post_id(article: Any) -> int | None:
        """Return the numeric id from the article's ``post_N`` div, or ``None``."""
        post_div = article.find('div', id = _POST_ID_PATTERN)
        if not post_div:
            return None
        match = _POST_ID_PATTERN.search(post_div.get('id') or '')
        return int(match.group(1)) if match else None

    @staticmethod
    def _extract_body_and_signature(article: Any) -> tuple[str, str]:
        """Return ``(body, signature)`` text, detaching signatures from the tree.

        Only ``signature`` elements that are direct children of the
        ``post_body`` div are treated as signatures. Each one is removed from
        the tree so its text does not end up in the body; when several are
        present the last non-empty one wins.
        """
        body_div = article.find('div', class_ = 'post_body')
        if not body_div:
            return '', ''
        signature_text = ''
        direct_signatures = [
            node for node in body_div.find_all(class_ = 'signature')
            if node.parent is body_div
        ]
        for node in direct_signatures:
            text = node.get_text('\n', strip = True)
            if text:
                signature_text = text
            node.decompose()
        return body_div.get_text('\n', strip = True), signature_text

    @staticmethod
    def _extract_author(article: Any) -> str | None:
        """Return the text of the first non-empty profile link, or ``None``."""
        for anchor in article.find_all('a', href = True):
            if not _PROFILE_LINK_PATTERN.match(anchor.get('href') or ''):
                continue
            # Profile links without text are skipped; keep scanning.
            text = anchor.get_text(strip = True)
            if text:
                return text
        return None

    @classmethod
    def _extract_author_id(cls, article: Any) -> int | None:
        """Return the uid from the first matching link shape, or ``None``."""
        for pattern in cls._AUTHOR_ID_PATTERNS:
            link = article.find('a', href = pattern)
            if not link:
                continue
            uid_match = pattern.search(link.get('href') or '')
            if uid_match:
                try:
                    return int(uid_match.group(1))
                except ValueError:
                    # int() can still reject an extremely long digit run.
                    return None
        return None

    @staticmethod
    def _extract_date(article: Any) -> tuple[int, str]:
        """Return ``(unix_timestamp, visible_label)`` from the first dated span.

        A span qualifies when its ``title`` attribute starts with
        ``MM-DD-YYYY``. Returns ``(0, '')`` when no span qualifies.
        """
        for span in article.find_all('span', title = True):
            title_attr = span.get('title') or ''
            if _DATE_TITLE_PATTERN.match(title_attr):
                return _parse_date(title_attr), span.get_text(' ', strip = True)
        return 0, ''
def _parse_date(title_attr: str) -> int:
for fmt in ('%m-%d-%Y, %I:%M %p', '%m-%d-%Y'):
try:
dt = datetime.strptime(title_attr, fmt)
except ValueError:
continue
return int(calendar.timegm(dt.timetuple()))
return 0
def _extract_total_pages(soup: Any) -> int:
    """Work out how many pages a thread has from its pagination links.

    Pagination elements are looked up inside the siblings of the posts
    container (``id="posts"``, falling back to ``id="noa-posttransition"``).
    For each one, the ``page=N`` value of its "last" link is returned when
    available; otherwise the largest ``page=N`` among its links is returned
    if that is greater than 1.

    Args:
        soup: Parsed thread page (BeautifulSoup-style tree).

    Returns:
        Total page count, or ``1`` when no usable pagination is found.
    """
    # Compiled once per call instead of once per sibling / pagination element.
    pagination_class = re.compile(r'^pagination')
    last_link_class = re.compile(r'pagination_last')
    # One pattern for both the "last" link and the fallback scan, so the two
    # lookups agree on what counts as a page parameter.
    page_param = re.compile(r'(?:[?&]|^)page=(\d+)')

    def page_number(href: Any) -> int | None:
        """Return the ``page=N`` value in *href*, or ``None`` if unusable."""
        found = page_param.search(href or '')
        if not found:
            return None
        try:
            return int(found.group(1))
        except ValueError:
            return None

    posts_container = soup.find(id = 'posts') or soup.find(id = 'noa-posttransition')
    if posts_container is None:
        return 1

    candidates: list[Any] = []
    for sibling in [*posts_container.previous_siblings, *posts_container.next_siblings]:
        # Text nodes between tags cannot be searched; skip them.
        if getattr(sibling, 'find_all', None) is None:
            continue
        # NOTE(review): find_all only looks at descendants, so a sibling that
        # is itself the pagination element is not picked up — confirm whether
        # the site ever renders it that way.
        candidates.extend(sibling.find_all(class_ = pagination_class))

    for pagination in candidates:
        last = pagination.find('a', class_ = last_link_class)
        if last:
            last_page = page_number(last.get('href'))
            if last_page is not None:
                return last_page
        # Fallback: largest page=N value across anchor hrefs
        max_page = 1
        for anchor in pagination.find_all('a', href = True):
            page_num = page_number(anchor.get('href'))
            if page_num is not None and page_num > max_page:
                max_page = page_num
        if max_page > 1:
            return max_page
    return 1