"""
Generic XenForo Forum Client for Paid Content

Scrapes XenForo-based celebrity image forums (HQCelebCorner, PicturePub, etc.)
treating each celebrity name as a "creator" and each matching thread as a post.

Images are hosted on external hosts (imagebam, pixhost, imagetwist, etc.)
and resolved via ImageHostHandler from forum_downloader.
"""

import asyncio
import html
import json
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Set
from urllib.parse import urlparse, unquote_plus

import aiohttp

from modules.base_module import LoggingMixin
from .models import Post, Attachment


class XenForoForumClient(LoggingMixin):
    """Generic client for scraping XenForo-based forum threads.

    Pages are fetched with aiohttp using cookies loaded from a JSON cookie
    file; when the forum answers 403 (or the request itself fails) the page
    is re-requested through a FlareSolverr instance at ``FLARESOLVERR_URL``.
    """

    FLARESOLVERR_URL = 'http://localhost:8191/v1'

    HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                      '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.9',
    }

    IMAGE_EXTS = {'jpg', 'jpeg', 'png', 'gif', 'webp', 'bmp', 'tiff'}

    # External image host domains to look for in post links
    IMAGE_HOST_DOMAINS = [
        'imagebam.com', 'pixhost.to', 'imagetwist.com', 'imgur.com',
        'imgbox.com', 'postimg.cc', 'postimages.org', 'catbox.moe',
        'turboimagehost.com', 'imageban.ru', 'img.yt', 'acidimg.cc',
        'pixxxels.cc', 'imx.to', 'imgbb.com', 'ibb.co',
    ]

    def __init__(self, service_id: str, base_url: str, cookie_path: str, log_callback=None):
        """Configure the client for one forum.

        Args:
            service_id: Identifier stored as ``SERVICE_ID`` and passed to the
                logger as its default module name.
            base_url: Forum root URL; a trailing slash is stripped.
            cookie_path: Path of the JSON cookie file read by ``_load_cookies``.
            log_callback: Forwarded to ``_init_logger`` (presumably provided by
                ``LoggingMixin`` - confirm against that class).
        """
        self.SERVICE_ID = service_id
        self.BASE_URL = base_url.rstrip('/')
        self.COOKIE_PATH = cookie_path
        self._init_logger('PaidContent', log_callback, default_module=service_id)
        # Lazily populated by _load_cookies(); None means "not loaded yet".
        self._cookies: Optional[Dict[str, str]] = None
        # None = not looked up yet, False = import failed, otherwise the class.
        self._image_host_handler = None

    # ------------------------------------------------------------------
    # Cookie handling
    # ------------------------------------------------------------------

    def _load_cookies(self) -> Dict[str, str]:
        """Load Playwright-format cookies and convert to {name: value} dict.

        The result is cached on the instance. A missing file, unreadable
        JSON, or entries lacking ``name``/``value`` produce an empty dict
        (and a warning) rather than an exception.
        """
        if self._cookies is not None:
            return self._cookies

        try:
            cookie_path = Path(self.COOKIE_PATH)
            if cookie_path.exists():
                # JSON is UTF-8 by specification; do not depend on the locale default.
                with open(cookie_path, 'r', encoding='utf-8') as f:
                    raw_cookies = json.load(f)
                self._cookies = {c['name']: c['value'] for c in raw_cookies}
                self.log(f"Loaded {len(self._cookies)} cookies from {self.COOKIE_PATH}", 'debug')
            else:
                self.log(f"Cookie file not found: {self.COOKIE_PATH}", 'warning')
                self._cookies = {}
        except Exception as e:
            # Best-effort: scraping can proceed unauthenticated.
            self.log(f"Error loading cookies: {e}", 'warning')
            self._cookies = {}

        return self._cookies

    def _get_cookie_header(self) -> str:
        """Build Cookie header string (``k=v; k2=v2``) from loaded cookies."""
        cookies = self._load_cookies()
        return '; '.join(f'{k}={v}' for k, v in cookies.items())

    def _get_request_headers(self) -> Dict[str, str]:
        """Return a copy of ``HEADERS`` with a ``Cookie`` header added when cookies exist."""
        headers = dict(self.HEADERS)
        cookie_str = self._get_cookie_header()
        if cookie_str:
            headers['Cookie'] = cookie_str
        return headers

    # ------------------------------------------------------------------
    # Image host handling
    # ------------------------------------------------------------------

    def _get_image_host_handler(self):
        """Return the ``ImageHostHandler`` class, importing it on first use.

        Note that the class itself is cached, not an instance. Returns None
        when ``modules.forum_downloader`` cannot be imported; the failure is
        remembered so the import is attempted only once.
        """
        if self._image_host_handler is None:
            try:
                from modules.forum_downloader import ImageHostHandler
                self._image_host_handler = ImageHostHandler
                self.log("Loaded ImageHostHandler from forum_downloader", 'debug')
            except ImportError:
                self.log("ImageHostHandler not available", 'warning')
                self._image_host_handler = False  # sentinel to avoid retrying
        return self._image_host_handler if self._image_host_handler is not False else None

    # ------------------------------------------------------------------
    # HTTP helpers
    # ------------------------------------------------------------------

    async def _fetch_page(self, session: aiohttp.ClientSession, url: str) -> Optional[str]:
        """Fetch a page with cookies. Falls back to FlareSolverr on 403.

        Args:
            session: aiohttp session used for the direct request.
            url: Page URL.

        Returns:
            The response body on HTTP 200, the FlareSolverr result on HTTP 403
            or when the request raises, otherwise None.
        """
        headers = self._get_request_headers()
        try:
            async with session.get(url, headers=headers, allow_redirects=True) as resp:
                status = resp.status
                if status == 200:
                    return await resp.text()
        except Exception as e:
            self.log(f"Error fetching {url}: {e}", 'warning')
            return await self._fetch_via_flaresolverr(url)

        # The response is released at this point, so the (slow) FlareSolverr
        # round trip does not keep an aiohttp connection checked out.
        if status == 403:
            self.log(f"Got 403 for {url}, trying FlareSolverr", 'debug')
            return await self._fetch_via_flaresolverr(url)

        self.log(f"HTTP {status} for {url}", 'warning')
        return None

    async def _fetch_via_flaresolverr(self, url: str) -> Optional[str]:
        """Fetch a page using FlareSolverr to bypass Cloudflare.

        Creates a FlareSolverr session, issues ``request.get`` with the loaded
        cookies, and always attempts to destroy the session afterwards. The
        blocking ``requests`` calls run in the default executor so the event
        loop stays responsive while FlareSolverr works (up to ~70 s).

        Returns:
            The solved page HTML, or None on any failure (including the
            ``requests`` package being unavailable).
        """
        try:
            import requests as std_requests
        except ImportError:
            self.log("requests library not available for FlareSolverr", 'warning')
            return None

        loop = asyncio.get_running_loop()

        async def _post(payload: Dict, timeout: int):
            # requests is synchronous; run it in a worker thread.
            return await loop.run_in_executor(
                None,
                lambda: std_requests.post(self.FLARESOLVERR_URL, json=payload, timeout=timeout),
            )

        fs_session_id = None
        try:
            # Create session
            resp = await _post({'cmd': 'sessions.create'}, 30)
            data = resp.json()
            if data.get('status') != 'ok':
                self.log("Failed to create FlareSolverr session", 'warning')
                return None
            fs_session_id = data.get('session')

            # Fetch page
            cookies = self._load_cookies()
            resp = await _post({
                'cmd': 'request.get',
                'url': url,
                'session': fs_session_id,
                'cookies': [{'name': k, 'value': v} for k, v in cookies.items()],
                'maxTimeout': 60000,
            }, 70)
            page_data = resp.json()
            if page_data.get('status') == 'ok':
                return page_data.get('solution', {}).get('response', '')
            self.log(f"FlareSolverr failed for {url}: {page_data.get('message', 'unknown')}", 'warning')
            return None
        except Exception as e:
            self.log(f"FlareSolverr error for {url}: {e}", 'warning')
            return None
        finally:
            if fs_session_id:
                try:
                    await _post({
                        'cmd': 'sessions.destroy',
                        'session': fs_session_id,
                    }, 10)
                except Exception as e:
                    # Cleanup is best-effort; never mask the fetch result.
                    self.log(f"Failed to destroy FlareSolverr session {fs_session_id}: {e}", 'debug')

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def search_threads(self, query: str) -> List[Dict]:
        """Search for threads matching a celebrity name.

        Fetches the search form to obtain the ``_xfToken`` CSRF value, posts a
        title-only search ordered by date, then follows result pagination.

        Returns list of {thread_id, title, url, reply_count}. The list is
        empty when the search page or the search request fails.
        """
        threads = []
        timeout = aiohttp.ClientTimeout(total=30)

        async with aiohttp.ClientSession(timeout=timeout) as session:
            # XenForo search: POST form to /search/search
            search_url = f'{self.BASE_URL}/search/search'
            headers = self._get_request_headers()
            headers['Content-Type'] = 'application/x-www-form-urlencoded'

            # Need CSRF token - fetch search page first
            search_page_url = f'{self.BASE_URL}/search/'
            page_html = await self._fetch_page(session, search_page_url)
            if not page_html:
                self.log("Failed to fetch search page", 'warning')
                return threads

            # Extract CSRF token
            csrf_match = re.search(r'name="_xfToken"\s+value="([^"]+)"', page_html)
            xf_token = csrf_match.group(1) if csrf_match else ''
            if not xf_token:
                self.log("No _xfToken found on search page; submitting search without it", 'debug')

            form_data = {
                'keywords': query,
                'search_type': 'post',
                'c[title_only]': '1',
                'order': 'date',
                '_xfToken': xf_token,
            }

            try:
                async with session.post(search_url, headers=headers, data=form_data,
                                        allow_redirects=True) as resp:
                    if resp.status != 200:
                        self.log(f"Search returned HTTP {resp.status}", 'warning')
                        return threads
                    result_html = await resp.text()
                    result_url = str(resp.url)
            except Exception as e:
                self.log(f"Search failed: {e}", 'error')
                return threads

            threads = self._parse_search_results(result_html)

            # Handle search result pagination
            visited_urls: Set[str] = {result_url}
            page = 2
            while True:
                next_url = self._find_next_search_page(result_html, result_url, page)
                # Stop on "no next page" and also if we are handed a URL we
                # already fetched, which would otherwise loop forever.
                if not next_url or next_url in visited_urls:
                    break
                visited_urls.add(next_url)
                await asyncio.sleep(0.3)
                result_html = await self._fetch_page(session, next_url)
                if not result_html:
                    break
                more = self._parse_search_results(result_html)
                if not more:
                    break
                threads.extend(more)
                page += 1

        self.log(f"Search for '{query}' found {len(threads)} threads", 'info')
        return threads

    async def get_thread_info(self, thread_url: str) -> Optional[Dict]:
        """Fetch page 1 of a thread and extract metadata.

        Returns {thread_id, title, reply_count, page_count, url}, or None when
        the page cannot be fetched or parsing raises. ``url`` is ``thread_url``
        with any ``#fragment`` and trailing slash removed; ``title`` falls back
        to ``'Untitled'``.
        """
        timeout = aiohttp.ClientTimeout(total=30)
        try:
            async with aiohttp.ClientSession(timeout=timeout) as session:
                page_html = await self._fetch_page(session, thread_url)
                if not page_html:
                    return None

                title = self._extract_title(page_html)
                page_count = self._extract_page_count(page_html)
                reply_count = self._extract_reply_count(page_html)
                thread_id = self._extract_thread_id(thread_url)

                return {
                    'thread_id': thread_id,
                    'title': title or 'Untitled',
                    'reply_count': reply_count,
                    'page_count': page_count,
                    'url': thread_url.split('#')[0].rstrip('/'),
                }
        except Exception as e:
            self.log(f"Error getting thread info for {thread_url}: {e}", 'error')
            return None

    async def get_thread_images(self, thread_url: str, page_count: Optional[int] = None,
                                start_page: int = 1) -> List[Dict]:
        """Scrape all pages of a thread and extract image host links.

        Args:
            thread_url: URL of the thread.
            page_count: Number of pages in the thread. When None, page 1 is
                fetched to determine it, its images are collected, and
                scraping continues from page 2 (``start_page`` is ignored in
                that case).
            start_page: First page to scrape when ``page_count`` is given.

        Returns list of {url, host, post_number} dicts (deduplicated by
        ``url``). Scraping stops at the first page that fails to load.
        """
        images: List[Dict] = []
        seen_urls: Set[str] = set()

        def _add_new(found: List[Dict]) -> int:
            # Append links not seen before; return how many were added.
            added = 0
            for img in found:
                if img['url'] not in seen_urls:
                    seen_urls.add(img['url'])
                    images.append(img)
                    added += 1
            return added

        timeout = aiohttp.ClientTimeout(total=30)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            # If page_count not provided, fetch page 1 to determine it
            if page_count is None:
                page1_html = await self._fetch_page(session, thread_url)
                if not page1_html:
                    return images
                page_count = self._extract_page_count(page1_html)
                _add_new(self._extract_image_links(page1_html))
                start_page = 2

            for page_num in range(start_page, page_count + 1):
                page_url = self._build_page_url(thread_url, page_num)
                await asyncio.sleep(0.5)  # Rate limit

                page_html = await self._fetch_page(session, page_url)
                if not page_html:
                    self.log(f"Failed to fetch page {page_num}, stopping", 'warning')
                    break

                new_count = _add_new(self._extract_image_links(page_html))
                self.log(f"Page {page_num}/{page_count}: {new_count} new image links", 'debug')

        self.log(f"Total: {len(images)} unique image links from {page_count} pages", 'info')
        return images

    async def resolve_image_url(self, host_page_url: str,
                                session: Optional[aiohttp.ClientSession] = None) -> Optional[str]:
        """Resolve an image host page URL to a direct image URL.

        Uses ImageHostHandler from forum_downloader where possible. Resolution
        order: handler without page content, imgbox thumbnail rewrite, handler
        with the fetched page content, then ``_extract_direct_image_from_html``.

        Args:
            host_page_url: URL of the image host page (or thumbnail).
            session: Optional aiohttp session to reuse. When omitted, a
                temporary session is created and closed before returning.

        Returns:
            The direct image URL, or None when the page cannot be fetched or
            no URL can be extracted.
        """
        handler = self._get_image_host_handler()

        # Try direct extraction without fetching the page
        if handler:
            direct = handler.extract_direct_url(host_page_url)
            if direct:
                return direct

        # imgbox thumbnail → full image conversion (thumbs2 → images2)
        m = re.match(r'https?://thumbs(\d*)\.imgbox\.com/([a-f0-9]+/[a-f0-9]+/)(\w+)_t\.\w+', host_page_url)
        if m:
            return f"https://images{m.group(1)}.imgbox.com/{m.group(2)}{m.group(3)}_o.jpg"

        # For hosts that need page content, fetch and parse
        own_session = session is None
        if own_session:
            timeout = aiohttp.ClientTimeout(total=30)
            session = aiohttp.ClientSession(timeout=timeout)

        try:
            # ImageBam requires sfw_inter=1 cookie to bypass consent page
            headers = dict(self.HEADERS)
            if 'imagebam' in host_page_url:
                headers['Cookie'] = 'sfw_inter=1'

            try:
                async with session.get(host_page_url, headers=headers,
                                       allow_redirects=True) as resp:
                    if resp.status != 200:
                        return None
                    page_content = await resp.text()
                    final_url = str(resp.url)
            except Exception as e:
                self.log(f"Failed to fetch image host page {host_page_url}: {e}", 'debug')
                return None

            # Try handler with page content
            if handler:
                direct = handler.extract_direct_url(host_page_url, page_content=page_content)
                if direct:
                    return direct

            # Manual extraction fallbacks
            return self._extract_direct_image_from_html(host_page_url, page_content, final_url)
        finally:
            # Only close a session this call created; a caller's session stays open.
            if own_session:
                await session.close()

    # ------------------------------------------------------------------
    # HTML parsing helpers
    # ------------------------------------------------------------------

    def _parse_search_results(self, html_content: str) -> List[Dict]:
        """Parse XenForo search results page for thread links."""
        threads = []  # 
Parse each contentRow block to extract title, URL, and date for block_match in re.finditer( r']*>(.*?)\s*\s*', html_content, re.DOTALL ): block = block_match.group(1) # Extract thread URL and title title_match = re.search( r'class="contentRow-title">\s*]*>(.*?)', block, re.DOTALL ) if not title_match: continue url = title_match.group(1) title_raw = title_match.group(2) title_raw = re.sub(r']*>.*?', '', title_raw) title_raw = re.sub(r']*>.*?', '', title_raw) title_raw = re.sub(r']*>(.*?)', r'\1', title_raw) title = html.unescape(re.sub(r'<[^>]+>', '', title_raw).strip()) if not title: continue if not url.startswith('http'): url = self.BASE_URL + url thread_id = self._extract_thread_id(url) if not thread_id: continue # Extract date from