Files
media-downloader/modules/forum_downloader.py
Todd 0d7b2b1aab Initial commit
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-29 22:42:55 -04:00

5029 lines
222 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Forum Downloader Module
Advanced forum scraping with database tracking, search monitoring, image host support,
and comprehensive authentication for major forum platforms.
Supported Forum Platforms:
- XenForo (1.x and 2.x)
- vBulletin (3.x, 4.x, 5.x)
- phpBB (all versions)
- Discourse
- Invision Power Board (IPB 4.x)
- MyBB
- Simple Machines Forum (SMF)
Key Features:
- Automatic forum type detection
- User authentication with cookie persistence
- Database tracking to avoid re-downloads
- Search monitoring with auto-tracking
- Bulk downloading from forum sections
- Support for multiple image hosting services
- Thread update monitoring
- Rate limiting and retry logic
Authentication:
The module supports automatic login for all major forum platforms.
Login credentials are used to access private/members-only content.
Cookies are saved for session persistence across runs.
Usage:
# Initialize downloader (use with ForumDatabaseAdapter for unified database)
from modules.forum_db_adapter import ForumDatabaseAdapter
forum_db_adapter = ForumDatabaseAdapter(unified_db)
downloader = ForumDownloader(
headless=True,
show_progress=True,
use_database=True,
db_path=forum_db_adapter # Pass adapter for unified database
)
# Login to forum (auto-detects forum type)
downloader.login(
forum_name="MyForum",
username="your_username",
password="your_password",
forum_url="https://forum.example.com"
)
# Download private thread
downloader.download_thread(
thread_url="https://forum.example.com/private/thread/123",
forum_name="MyForum",
username="your_username", # Optional if already logged in
password="your_password" # Optional if already logged in
)
# Download entire forum section
downloader.download_forum_section(
section_url="https://forum.example.com/forums/general-discussion",
forum_name="MyForum",
max_threads=50,
username="your_username",
password="your_password"
)
Based on FastDL architecture
"""
# Suppress pkg_resources deprecation warning from face_recognition_models
import warnings
warnings.filterwarnings('ignore', category=UserWarning, message='.*pkg_resources is deprecated.*')
from pathlib import Path
from datetime import datetime, timedelta
from urllib.parse import urlparse, urljoin
import os
import re
import sqlite3
import json
import hashlib
import time
import random
import platform
import subprocess
from typing import Dict, List, Optional, Tuple
import requests
from bs4 import BeautifulSoup
from enum import Enum
from modules.base_module import LoggingMixin
from modules.universal_logger import get_logger
# Module-level logger for classes without instance logger (ForumAuthenticator, etc.)
forum_logger = get_logger('Forum')
# Set Playwright browser path - use environment variable if set, otherwise use standard location
if 'PLAYWRIGHT_BROWSERS_PATH' not in os.environ:
os.environ['PLAYWRIGHT_BROWSERS_PATH'] = '/root/.cache/ms-playwright'
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout
import nest_asyncio
# Apply nest_asyncio to allow Playwright in asyncio contexts
try:
nest_asyncio.apply()
except Exception as e:
pass
# Import shared date utilities
try:
from modules.date_utils import DateHandler, extract_date, update_timestamps
from modules.download_manager import DownloadManager, DownloadItem
from modules.move_module import MoveManager
DATE_UTILS_AVAILABLE = True
# OMDB API key is now set dynamically from settings in ForumDownloader.__init__
except ImportError:
DATE_UTILS_AVAILABLE = False
from datetime import datetime as dt
forum_logger.warning("date_utils module not found, using built-in date handling", module="Import")
# Optional imports
try:
from tqdm import tqdm
TQDM_AVAILABLE = True
except ImportError:
TQDM_AVAILABLE = False
# Cloudflare handler for protected sites.
# The handler module is optional: when it cannot be imported, the three helper
# functions the rest of this file relies on are replaced by static fallbacks.
try:
    from modules.cloudflare_handler import (
        CloudflareHandler, SiteStatus, get_flaresolverr_user_agent,
        get_playwright_context_options, get_playwright_stealth_scripts
    )
except ImportError:
    CLOUDFLARE_HANDLER_AVAILABLE = False

    def get_flaresolverr_user_agent():
        """Return a fixed desktop Chrome user agent string."""
        return 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36'

    def get_playwright_context_options():
        """Return static Playwright context options built around the fallback user agent."""
        return dict(
            viewport=dict(width=1920, height=1080),
            user_agent=get_flaresolverr_user_agent(),
            locale='en-US',
            timezone_id='America/New_York',
            color_scheme='light',
        )

    def get_playwright_stealth_scripts():
        """Return a minimal init script that hides ``navigator.webdriver``."""
        return "Object.defineProperty(navigator, 'webdriver', { get: () => undefined });"
else:
    CLOUDFLARE_HANDLER_AVAILABLE = True
class ForumType(Enum):
    """Supported forum types.

    Each member's value is the lowercase identifier that is written to the
    log when a forum type is detected or explicitly selected.
    """
    # NOTE: "XENOFORO" is a misspelling of XenForo. The member name is kept
    # as-is because other code in this file refers to it by this name.
    XENOFORO = "xenoforo"
    VBULLETIN = "vbulletin"
    PHPBB = "phpbb"
    DISCOURSE = "discourse"
    INVISION = "invision"
    MYBB = "mybb"
    SMF = "smf"
    # Returned by forum-type detection when no known marker is found.
    UNKNOWN = "unknown"
class ForumAuthenticator:
    """Handle authentication for various forum platforms.

    Each engine-specific ``login_*`` method drives a Playwright ``page``
    through that engine's login form and returns ``True`` when the resulting
    page looks like a logged-in session (see ``_verify_login``).  Errors in
    those methods are logged through the module-level ``forum_logger`` and
    reported as ``False``.
    """

    # Upper bound (ms) for waiting on Playwright's 'networkidle' state.
    _IDLE_TIMEOUT_MS = 15000
    # Grace period (ms) granted when 'networkidle' is never reached.
    _SETTLE_MS = 3000

    def __init__(self, log_func=None):
        """Create an authenticator.

        Args:
            log_func: Optional callable used for debug logging; it is called
                as ``log_func(message, level)``.  When omitted, messages are
                discarded.
        """
        self.credentials = {}
        self.cookies = {}
        self.session_data = {}
        # The no-op sink accepts keyword arguments too, so callers that pass
        # ``level=...`` by keyword do not raise TypeError.
        self.log = log_func if log_func else lambda *args, **kwargs: None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _wait_until_idle(self, page, timeout: int = None, settle_ms: int = None) -> None:
        """Wait for 'networkidle' without treating a timeout as fatal.

        Cloudflare-protected sites may keep background requests running, so
        'networkidle' can time out on a page that is perfectly usable.

        Args:
            page: Playwright page object.
            timeout: Maximum wait in ms (defaults to ``_IDLE_TIMEOUT_MS``).
            settle_ms: Fixed pause in ms applied only when the wait timed
                out (defaults to ``_SETTLE_MS``; ``0`` disables the pause).
        """
        if timeout is None:
            timeout = self._IDLE_TIMEOUT_MS
        if settle_ms is None:
            settle_ms = self._SETTLE_MS
        try:
            page.wait_for_load_state('networkidle', timeout=timeout)
        except PlaywrightTimeout:
            if settle_ms:
                page.wait_for_timeout(settle_ms)

    @staticmethod
    def _click_first_login_button(page, selectors) -> bool:
        """Click the first matching button whose text does not mention search.

        Returns:
            True when a button was clicked, False when none qualified.
        """
        for selector in selectors:
            try:
                if page.locator(selector).count() > 0:
                    button = page.locator(selector).first
                    # Guard against clicking a search button that shares the selector.
                    if 'search' not in button.inner_text().lower():
                        button.click()
                        return True
            except Exception:
                continue
        return False

    # ------------------------------------------------------------------
    # Forum type detection
    # ------------------------------------------------------------------
    def detect_forum_type(self, page) -> ForumType:
        """Detect the forum software type from the current page.

        Checks run in a fixed order and the first match wins.

        Returns:
            The detected ``ForumType``, or ``ForumType.UNKNOWN`` when no
            marker matches or reading the page fails.
        """
        try:
            html = page.content()
            html_lower = html.lower()
            url = page.url.lower()

            # Known XenForo forums - check URL first to avoid false positives
            if 'phun.org' in url:
                return ForumType.XENOFORO
            if 'data-app="public"' in html or 'XenForo' in html or 'xf-init' in html:
                return ForumType.XENOFORO
            if 'vBulletin' in html or 'vbulletin_' in html or 'vbmenu_' in html:
                return ForumType.VBULLETIN
            if 'phpBB' in html or 'phpbb' in html or 'viewtopic.php' in html:
                return ForumType.PHPBB
            if 'discourse' in html_lower:
                return ForumType.DISCOURSE
            # 'ipb' must start a word: a bare substring test also matches
            # unrelated text such as "clipboard".
            if 'ips4' in html or re.search(r'\bipb', html_lower) or 'invisioncommunity' in html:
                return ForumType.INVISION
            if 'mybb' in html_lower:
                return ForumType.MYBB
            if 'SMF' in html or 'smf_' in html:
                return ForumType.SMF
        except Exception as e:
            forum_logger.error(f"Error detecting forum type: {e}")
        return ForumType.UNKNOWN

    # ------------------------------------------------------------------
    # Engine-specific logins
    # ------------------------------------------------------------------
    def login_xenoforo(self, page, username: str, password: str, login_url: str) -> bool:
        """Login to XenForo forums (1.x and 2.x form layouts)."""
        try:
            page.goto(login_url, wait_until='domcontentloaded', timeout=60000)
            self._wait_until_idle(page)

            if page.locator('input[name="login"]').count() > 0:
                # XenForo 2.x
                page.fill('input[name="login"]', username)
                page.fill('input[name="password"]', password)
                if page.locator('input[name="remember"]').count() > 0:
                    page.check('input[name="remember"]')
                login_buttons = [
                    'button:has-text("Log in")',
                    'button:has-text("Login")',
                    'button.button--primary:not(.button--icon)',
                    'button[type="submit"]:visible',
                    'input[type="submit"]:visible',
                    'button.button--primary[type="submit"]'
                ]
                if not self._click_first_login_button(page, login_buttons):
                    # No usable button: submit by pressing Enter in the password field
                    page.locator('input[name="password"]').press('Enter')
            elif page.locator('input[id="ctrl_pageLogin_login"]').count() > 0:
                # XenForo 1.x
                page.fill('input[id="ctrl_pageLogin_login"]', username)
                page.fill('input[id="ctrl_pageLogin_password"]', password)
                if page.locator('input[id="ctrl_pageLogin_remember"]').count() > 0:
                    page.check('input[id="ctrl_pageLogin_remember"]')
                page.click('input[type="submit"]')

            # Give the login request time to complete before inspecting the page
            self._wait_until_idle(page, settle_ms=0)
            page.wait_for_timeout(2000)
            return self._verify_login(page, username)
        except Exception as e:
            forum_logger.error(f"XenForo login error: {e}")
            return False

    def login_vbulletin(self, page, username: str, password: str, login_url: str) -> bool:
        """Login to vBulletin forums (5.x and 3.x/4.x form layouts)."""
        try:
            page.goto(login_url, wait_until='domcontentloaded', timeout=60000)
            self._wait_until_idle(page)

            if page.locator('input[name="username"]').count() > 0:
                # vBulletin 5.x
                page.fill('input[name="username"]', username)
                page.fill('input[name="password"]', password)
                if page.locator('input[name="cookieuser"]').count() > 0:
                    page.check('input[name="cookieuser"]')
                page.click('input[type="submit"], button[type="submit"]')
            elif page.locator('input[name="vb_login_username"]').count() > 0:
                # vBulletin 3.x/4.x
                page.fill('input[name="vb_login_username"]', username)
                page.fill('input[name="vb_login_password"]', password)
                if page.locator('input[name="cookieuser"]').count() > 0:
                    page.check('input[name="cookieuser"]')
                page.click('input[type="submit"]')

            self._wait_until_idle(page)
            return self._verify_login(page, username)
        except Exception as e:
            forum_logger.error(f"vBulletin login error: {e}")
            return False

    def login_phpbb(self, page, username: str, password: str, login_url: str) -> bool:
        """Login to phpBB forums."""
        try:
            page.goto(login_url)
            self._wait_until_idle(page)

            if page.locator('input[name="username"]').count() > 0:
                page.fill('input[name="username"]', username)
                page.fill('input[name="password"]', password)
                if page.locator('input[name="autologin"]').count() > 0:
                    page.check('input[name="autologin"]')
                page.click('input[name="login"], input[type="submit"]')

            self._wait_until_idle(page)
            return self._verify_login(page, username)
        except Exception as e:
            forum_logger.error(f"phpBB login error: {e}")
            return False

    def login_discourse(self, page, username: str, password: str, login_url: str) -> bool:
        """Login to Discourse forums."""
        try:
            page.goto(login_url)
            self._wait_until_idle(page)

            # Open the login form first when the page only shows a login button
            if page.locator('button.login-button').count() > 0:
                page.click('button.login-button')
                page.wait_for_timeout(1000)

            if page.locator('input[id="login-account-name"]').count() > 0:
                page.fill('input[id="login-account-name"]', username)
                page.fill('input[id="login-account-password"]', password)
                page.click('button[id="login-button"]')

            self._wait_until_idle(page)
            return self._verify_login(page, username)
        except Exception as e:
            forum_logger.error(f"Discourse login error: {e}")
            return False

    def login_invision(self, page, username: str, password: str, login_url: str) -> bool:
        """Login to Invision Power Board forums."""
        try:
            page.goto(login_url)
            self._wait_until_idle(page)

            if page.locator('input[name="auth"]').count() > 0:
                # IPB 4.x
                page.fill('input[name="auth"]', username)
                page.fill('input[name="password"]', password)
                if page.locator('input[name="remember_me"]').count() > 0:
                    page.check('input[name="remember_me"]')
                page.click('button[type="submit"]')
            elif page.locator('input[name="UserName"]').count() > 0:
                # Older versions
                page.fill('input[name="UserName"]', username)
                page.fill('input[name="PassWord"]', password)
                page.click('input[type="submit"]')

            self._wait_until_idle(page)
            return self._verify_login(page, username)
        except Exception as e:
            forum_logger.error(f"Invision login error: {e}")
            return False

    def login_mybb(self, page, username: str, password: str, login_url: str) -> bool:
        """Login to MyBB forums."""
        try:
            page.goto(login_url)
            self._wait_until_idle(page)

            if page.locator('input[name="username"]').count() > 0:
                page.fill('input[name="username"]', username)
                page.fill('input[name="password"]', password)
                if page.locator('input[name="remember"]').count() > 0:
                    page.check('input[name="remember"]')
                page.click('input[type="submit"]')

            self._wait_until_idle(page)
            return self._verify_login(page, username)
        except Exception as e:
            forum_logger.error(f"MyBB login error: {e}")
            return False

    def login_smf(self, page, username: str, password: str, login_url: str) -> bool:
        """Login to Simple Machines Forum.

        Requesting a persistent session is best-effort: a failure to set the
        "stay logged in" control never aborts the login itself.
        """
        try:
            page.goto(login_url)
            self._wait_until_idle(page)

            if page.locator('input[name="user"]').count() > 0:
                page.fill('input[name="user"]', username)
                page.fill('input[name="passwrd"]', password)
                try:
                    # Only call select_option when a <select> really exists;
                    # a plain <input name="cookielength"> cannot be "selected".
                    if page.locator('select[name="cookielength"]').count() > 0:
                        page.select_option('select[name="cookielength"]', 'always', timeout=2000)
                    # NOTE(review): presumably the "never expire" checkbox of
                    # older SMF themes - confirm against a live forum.
                    elif page.locator('input[name="cookieneverexp"]').count() > 0:
                        page.check('input[name="cookieneverexp"]')
                except Exception as e:
                    self.log(f"Could not request persistent SMF session: {e}", "debug")
                page.click('input[type="submit"]')

            self._wait_until_idle(page)
            return self._verify_login(page, username)
        except Exception as e:
            forum_logger.error(f"SMF login error: {e}")
            return False

    # ------------------------------------------------------------------
    # Verification and dispatch
    # ------------------------------------------------------------------
    def _verify_login(self, page, username: str) -> bool:
        """Heuristically decide whether the current page is a logged-in view.

        Success markers are checked first; error markers are only consulted
        when no success marker is present.  The result is ``False`` when
        neither kind of marker is found or the page cannot be read.
        """
        try:
            html = page.content().lower()
            username_lower = (username or '').lower()

            success_indicators = []
            # An empty username would be "contained" in every page, so the
            # username-based markers are only used for a non-empty name.
            if username_lower.strip():
                success_indicators += [
                    f'welcome, {username_lower}',
                    f'hello {username_lower}',
                    f'logged in as {username_lower}',
                    username_lower,
                ]
            success_indicators += [
                'logout',
                'log out',
                'sign out',
                'private messages',
                'notifications',
                'user cp',
                'control panel'
            ]
            if any(indicator in html for indicator in success_indicators):
                return True

            error_indicators = [
                'invalid',
                'incorrect',
                'error',
                'failed',
                'wrong password',
                'not found'
            ]
            if 'login' in html and any(error in html for error in error_indicators):
                return False
        except Exception as e:
            forum_logger.error(f"Login verification error: {e}")
        return False

    def login_with_type(self, page, username: str, password: str, forum_url: str, forum_type_str: str) -> bool:
        """Login with an explicitly specified forum type.

        Args:
            page: Playwright page object.
            username: Account name.
            password: Account password.
            forum_url: Base URL of the forum; a trailing slash is tolerated.
            forum_type_str: Case-insensitive forum type name.  Unrecognised
                names fall back to ``auto_login``.

        Returns:
            True when the login appears to have succeeded.
        """
        forum_type_map = {
            'xenoforo': ForumType.XENOFORO,
            'xenforo': ForumType.XENOFORO,  # correct spelling, accepted as an alias
            'vbulletin': ForumType.VBULLETIN,
            'phpbb': ForumType.PHPBB,
            'discourse': ForumType.DISCOURSE,
            'ipb': ForumType.INVISION,
            'invision': ForumType.INVISION,
            'mybb': ForumType.MYBB,
            'smf': ForumType.SMF,
            'unknown': ForumType.UNKNOWN
        }
        type_key = (forum_type_str or '').strip().lower()
        forum_type = forum_type_map.get(type_key, ForumType.UNKNOWN)
        forum_logger.info(f"Using specified forum type: {forum_type.value}")

        # (login path, handler) per forum type
        handlers = {
            ForumType.XENOFORO: ('/login', self.login_xenoforo),
            ForumType.VBULLETIN: ('/login.php', self.login_vbulletin),
            ForumType.PHPBB: ('/ucp.php?mode=login', self.login_phpbb),
            ForumType.DISCOURSE: ('/login', self.login_discourse),
            ForumType.INVISION: ('/login', self.login_invision),
            ForumType.MYBB: ('/member.php?action=login', self.login_mybb),
            ForumType.SMF: ('/index.php?action=login', self.login_smf)
        }
        entry = handlers.get(forum_type)
        if entry:
            login_path, login_method = entry
            # Strip a trailing slash so the URL does not end up as ".../ /login"
            login_url = f"{forum_url.rstrip('/')}{login_path}"
            return login_method(page, username, password, login_url)

        forum_logger.warning(f"Unknown forum type: {forum_type_str}, attempting auto-detection")
        return self.auto_login(page, username, password, forum_url)

    def auto_login(self, page, username: str, password: str, forum_url: str) -> bool:
        """Automatically detect the forum type and login.

        Navigation errors while loading ``forum_url`` are not caught here.

        Returns:
            True when the login appears to have succeeded, False when the
            forum type is unsupported, no login page is found, or the login
            itself fails.
        """
        # Navigate to the forum URL first to detect forum type
        page.goto(forum_url, wait_until='domcontentloaded')
        page.wait_for_timeout(1000)
        forum_type = self.detect_forum_type(page)
        forum_logger.info(f"Detected forum type: {forum_type.value}")

        login_methods = {
            ForumType.XENOFORO: self.login_xenoforo,
            ForumType.VBULLETIN: self.login_vbulletin,
            ForumType.PHPBB: self.login_phpbb,
            ForumType.DISCOURSE: self.login_discourse,
            ForumType.INVISION: self.login_invision,
            ForumType.MYBB: self.login_mybb,
            ForumType.SMF: self.login_smf
        }
        login_method = login_methods.get(forum_type)
        if login_method is None:
            forum_logger.warning(f"Unsupported or unknown forum type: {forum_type.value}")
            return False

        login_url = self._find_login_url(page, forum_url)
        if not login_url:
            forum_logger.warning(f"Could not find a login page for {forum_url}")
            return False
        return login_method(page, username, password, login_url)

    def _find_login_url(self, page, base_url: str) -> Optional[str]:
        """Find the login URL for a forum.

        Well-known login paths are probed first; if none responds with a
        login-looking page, the base page is searched for a login link.

        Returns:
            An absolute login URL, or None when nothing was found.
        """
        common_paths = [
            '/login',
            '/login/',
            '/index.php?login/',
            '/login.php',
            '/member.php?action=login',
            '/ucp.php?mode=login',
            '/index.php?action=login',
            '/account/login',
            '/signin',
            '/user/login'
        ]
        for path in common_paths:
            login_url = urljoin(base_url, path)
            try:
                page.goto(login_url, wait_until='domcontentloaded', timeout=5000)
                content = page.content().lower()
                if 'login' in content or 'sign in' in content:
                    return login_url
            except Exception as e:
                self.log(f"Failed to check login path {path}: {e}", "debug")
                continue

        # Fall back to a login link on the forum's landing page
        try:
            page.goto(base_url)
            links = page.locator('a:has-text("Login"), a:has-text("Sign In"), a:has-text("Log In")')
            # A Locator object is always truthy; count() tells whether it matches.
            if links.count() > 0:
                href = links.first.get_attribute('href')
                if href:
                    # hrefs are frequently relative; callers need a navigable URL
                    return urljoin(base_url, href)
        except Exception as e:
            self.log(f"Failed to find login link on base page: {e}", "debug")
        return None

    # ------------------------------------------------------------------
    # Cookie persistence
    # ------------------------------------------------------------------
    def save_cookies(self, page, forum_name: str):
        """Save the page context's cookies for session persistence.

        Cookies are cached on ``self.cookies`` and written as a JSON list to
        ``cookies/forum_cookies_<forum_name>.json`` (relative to the current
        working directory).
        """
        cookies = page.context.cookies()
        self.cookies[forum_name] = cookies

        cookies_dir = Path("cookies")
        cookies_dir.mkdir(exist_ok=True)
        cookies_file = cookies_dir / f"forum_cookies_{forum_name}.json"
        with open(cookies_file, 'w') as f:
            json.dump(cookies, f)

    def load_cookies(self, context, forum_name: str) -> bool:
        """Load saved cookies into a Playwright browser context.

        Accepts either a raw JSON list of cookies or a dict with a
        ``'cookies'`` list.  Expiry is read from ``'expiry'`` or
        ``'expires'``, because both spellings occur in saved files.

        Returns:
            True when cookies were loaded into ``context``, False otherwise.
        """
        # Prioritize cookies directory, then check root for backwards compatibility
        possible_paths = [
            Path("cookies") / f"forum_cookies_{forum_name}.json",
            Path(f"forum_cookies_{forum_name}.json")  # backwards compatibility
        ]
        cookies_file = next((path for path in possible_paths if path.exists()), None)
        if cookies_file is None:
            self.log(f"No cookie file found for {forum_name}", "debug")
            return False
        self.log(f"Found cookie file at: {cookies_file}", "debug")

        try:
            with open(cookies_file, 'r') as f:
                data = json.load(f)

            if isinstance(data, list):
                cookies = data
            elif isinstance(data, dict) and 'cookies' in data:
                cookies = data['cookies']
            else:
                self.log(f"Unknown cookie format for {forum_name}", "debug")
                return False

            formatted_cookies = []
            for c in cookies:
                cookie = {
                    'name': c['name'],
                    'value': c['value'],
                    'domain': c['domain'],
                    'path': c.get('path', '/'),
                    'secure': c.get('secure', True),
                    'httpOnly': c.get('httpOnly', False)
                }
                expires = c.get('expiry') or c.get('expires')
                # Non-positive values denote session cookies; leave them unset.
                if isinstance(expires, (int, float)) and expires > 0:
                    cookie['expires'] = expires
                # Only forward sameSite values the browser context accepts.
                same_site = str(c.get('sameSite') or '').capitalize()
                if same_site in ('Strict', 'Lax', 'None'):
                    cookie['sameSite'] = same_site
                formatted_cookies.append(cookie)

            context.add_cookies(formatted_cookies)
            self.cookies[forum_name] = cookies
            self.log(f"Successfully loaded {len(cookies)} cookies for {forum_name}", "debug")
            return True
        except Exception as e:
            self.log(f"Error loading cookies: {e}", "debug")
        return False
class ImageHostHandler:
    """Handle downloads from various image hosting services.

    ``IMAGE_HOSTS`` maps a host key to the domains it is served from and to
    URL patterns for that host.  A ``domains`` entry is either a plain
    domain name (which also covers its subdomains) or, when it contains
    regex metacharacters, a regular expression for the full host name.
    """
    # Supported image hosts and their patterns
    IMAGE_HOSTS = {
        'imgur': {
            'domains': ['imgur.com', 'i.imgur.com'],
            'patterns': [
                r'https?://(?:i\.)?imgur\.com/([a-zA-Z0-9]+)(?:\.([a-z]+))?',
                r'https?://imgur\.com/a/([a-zA-Z0-9]+)',  # Albums
                r'https?://imgur\.com/gallery/([a-zA-Z0-9]+)'  # Galleries
            ]
        },
        'imgbb': {
            'domains': ['imgbb.com', 'i.ibb.co', 'ibb.co'],
            'patterns': [
                r'https?://(?:i\.)?ibb\.co/([a-zA-Z0-9]+)',
                r'https?://imgbb\.com/image/([a-zA-Z0-9]+)'
            ]
        },
        'postimage': {
            'domains': ['postimg.cc', 'postimages.org', 'i.postimg.cc'],
            'patterns': [
                r'https?://(?:i\.)?postimg\.cc/([a-zA-Z0-9]+)/([a-zA-Z0-9\-]+)',
                r'https?://postimages\.org/image/([a-zA-Z0-9]+)'
            ]
        },
        'imagebam': {
            'domains': ['imagebam.com', 'www.imagebam.com'],
            'patterns': [
                r'https?://(?:www\.)?imagebam\.com/(?:image|view)/([a-zA-Z0-9]+)'
            ]
        },
        'imagevenue': {
            'domains': ['imagevenue.com', 'img[0-9]+.imagevenue.com'],
            'patterns': [
                r'https?://img[0-9]+\.imagevenue\.com/.*?/([a-zA-Z0-9_]+\.(?:jpg|jpeg|png|gif))'
            ]
        },
        'pixhost': {
            'domains': ['pixhost.to', 't.pixhost.to'],
            'patterns': [
                r'https?://(?:t\.)?pixhost\.to/(?:show|thumbs)/([0-9]+)/([a-zA-Z0-9_\-]+)'
            ]
        },
        'catbox': {
            'domains': ['catbox.moe', 'files.catbox.moe'],
            'patterns': [
                r'https?://files\.catbox\.moe/([a-zA-Z0-9]+\.[a-z]+)'
            ]
        },
        'imagetwist': {
            'domains': ['imagetwist.com', 'phun.imagetwist.com', 'i.imagetwist.com'],
            'patterns': [
                r'https?://(?:phun\.)?imagetwist\.com/([a-zA-Z0-9]+)',
                r'https?://i\.imagetwist\.com/[^/]+/([a-zA-Z0-9]+\.[a-z]+)'
            ]
        }
    }

    # Characters whose presence marks a ``domains`` entry as a regex.
    _REGEX_CHARS = frozenset('[]()*+?\\|^$')

    @classmethod
    def _domain_matches(cls, hostname: str, entry: str) -> bool:
        """Return True when ``hostname`` belongs to the ``domains`` entry.

        Plain entries match the domain itself and any subdomain; a bare
        substring test would also accept look-alikes such as
        ``notimgur.com``.  Entries containing regex metacharacters are
        matched against the whole host name.
        """
        if hostname == entry or hostname.endswith('.' + entry):
            return True
        if cls._REGEX_CHARS.intersection(entry):
            return re.fullmatch(entry, hostname) is not None
        return False

    @classmethod
    def identify_host(cls, url: str) -> Optional[str]:
        """Identify which image host a URL belongs to.

        Args:
            url: Absolute URL of an image or image page.

        Returns:
            The ``IMAGE_HOSTS`` key of the matching host, or None when the
            URL has no host name or belongs to no known host.
        """
        try:
            # ``hostname`` excludes port and credentials, unlike ``netloc``.
            hostname = (urlparse(url).hostname or '').lower()
        except ValueError:
            return None
        if not hostname:
            return None
        for host_name, host_info in cls.IMAGE_HOSTS.items():
            if any(cls._domain_matches(hostname, entry) for entry in host_info['domains']):
                return host_name
        return None

    @classmethod
    def extract_direct_url(cls, url: str, page_content: str = None) -> Optional[str]:
        """Extract the direct image URL for a known image host.

        Args:
            url: URL pointing at an image host.
            page_content: Optional HTML of the host's image page; only used
                for hosts whose direct URL has to be scraped (imgbb).

        Returns:
            The direct image URL, or None when the host is unknown or the
            direct URL cannot be derived.
        """
        host = cls.identify_host(url)
        if not host:
            return None

        if host == 'imgur':
            # Albums and galleries would need the Imgur API or scraping
            if '/a/' in url or '/gallery/' in url:
                return None
            if 'i.imgur.com' not in url:
                match = re.search(r'imgur\.com/([a-zA-Z0-9]+)(?:\.([a-zA-Z0-9]+))?', url)
                if match:
                    # Keep an extension the URL already carries; default to jpg
                    extension = match.group(2) or 'jpg'
                    return f"https://i.imgur.com/{match.group(1)}.{extension}"
            return url

        if host == 'imgbb':
            if 'i.ibb.co' in url:
                return url  # Already direct
            if page_content:
                soup = BeautifulSoup(page_content, 'html.parser')
                img = soup.find('img', {'class': 'main-image'}) or soup.find('img', {'id': 'image-viewer-container'})
                if img and img.get('src'):
                    return img['src']
        elif host == 'catbox':
            if 'files.catbox.moe' in url:
                return url  # Already direct

        # Add more host-specific extraction logic as needed
        return None
class ForumDownloader(LoggingMixin):
"""
Forum downloader with database tracking and monitoring
Features:
- Download threads, posts, and search results
- Monitor searches for new content
- Track threads for updates
- Support multiple image hosts
- Database tracking to avoid re-downloads
- Automatic retry and rate limiting
"""
def __init__(self,
             headless: bool = True,
             show_progress: bool = True,
             use_database: bool = True,
             db_path = None,
             download_dir: str = "forum_downloads",
             max_retries: int = 3,
             rate_limit: Tuple[int, int] = (1, 3),
             user_agent: str = None,
             forum_type: str = None,
             log_callback=None):
    """
    Set up the downloader's configuration and runtime state.

    Args:
        headless: Run browser in headless mode
        show_progress: Show progress messages
        use_database: Enable database tracking
        db_path: Path to SQLite database, or a database adapter object
            (recognised by its ``unified_db`` attribute)
        download_dir: Base directory for downloads
        max_retries: Maximum retry attempts
        rate_limit: (min, max) seconds between requests
        user_agent: Custom user agent string
        forum_type: Optional forum type name; a recognised name pre-sets
            the forum type so detection can be skipped
        log_callback: Optional callback handed to the logging mixin
    """
    self.headless = headless
    self.show_progress = show_progress
    self.use_database = use_database

    # ``db_path`` may carry an adapter object instead of a filesystem path.
    adapter_given = hasattr(db_path, 'unified_db')
    self.db_adapter = db_path if adapter_given else None
    self.db_path = None if adapter_given else db_path
    if adapter_given:
        # An adapter always implies database tracking.
        self.use_database = True

    # The directory is not created here, only when something is downloaded.
    self.download_dir = Path(download_dir)
    self.max_retries = max_retries
    self.rate_limit = rate_limit
    self.user_agent = user_agent if user_agent else self._get_random_user_agent()

    # Logging comes from the mixin
    self._init_logger('Forum', log_callback, default_module='Download')

    # Run statistics, all starting at zero
    self.stats = dict.fromkeys(
        (
            'threads_processed',
            'posts_downloaded',
            'images_downloaded',
            'searches_monitored',
            'new_threads_found',
            'errors',
        ),
        0,
    )
    # Downloads whose database recording is deferred
    self.pending_downloads = []

    # Authentication state
    self.authenticator = ForumAuthenticator(log_func=self.log)
    self.logged_in_forums = {}

    # Browser handles, kept for session persistence
    self.browser = None
    self.context = None
    self.playwright = None

    # Optional pre-set forum type (skips detection)
    self.forum_type = None
    if forum_type:
        known_types = {
            'xenoforo': ForumType.XENOFORO,
            'xenforo': ForumType.XENOFORO,
            'vbulletin': ForumType.VBULLETIN,
            'phpbb': ForumType.PHPBB,
            'discourse': ForumType.DISCOURSE,
            'invision': ForumType.INVISION,
            'mybb': ForumType.MYBB,
            'smf': ForumType.SMF
        }
        self.forum_type = known_types.get(forum_type.lower())

    # FlareSolverr configuration
    self.flaresolverr_url = "http://localhost:8191/v1"
    self.flaresolverr_enabled = True  # Set to False to disable
    if not user_agent:
        # Without a custom agent, align the User-Agent with FlareSolverr's
        self.user_agent = get_flaresolverr_user_agent()

    # The built-in database is only initialised when no adapter is in use
    if self.use_database and not self.db_adapter:
        self._init_database()

    # Activity status manager for real-time updates
    from modules.activity_status import get_activity_manager
    unified_db_instance = self.db_adapter.unified_db if self.db_adapter else None
    self.unified_db = unified_db_instance  # kept for scraper config access
    self.activity_manager = get_activity_manager(unified_db_instance)

    # OMDB API key (TV show date lookups) comes from settings when available
    if DATE_UTILS_AVAILABLE and unified_db_instance:
        try:
            from modules.settings_manager import SettingsManager
            omdb_section = SettingsManager(unified_db_instance).get('omdb', {})
            omdb_api_key = omdb_section.get('api_key', '')
            if omdb_api_key:
                DateHandler.set_omdb_api_key(omdb_api_key)
                self.log("OMDB API key configured for date lookups", "debug")
        except Exception as e:
            self.log(f"Could not load OMDB API key from settings: {e}", "debug")
def _create_browser_context(self, browser, **extra_options):
    """Build a browser context from the shared Playwright context options.

    Args:
        browser: Playwright browser instance
        **extra_options: Extra context options (e.g., proxy); these take
            precedence over the shared defaults

    Returns:
        The new browser context, with the stealth init script installed
    """
    options = get_playwright_context_options()
    options.update(extra_options)

    headers = options.get('extra_http_headers', {})
    sec_ch_ua = headers.get('Sec-Ch-Ua', 'unknown')
    self.log(f"Using fingerprint: Chrome {sec_ch_ua[:30]}...", "debug")

    new_context = browser.new_context(**options)
    # Anti-detection script runs in every page of this context
    new_context.add_init_script(get_playwright_stealth_scripts())
    return new_context
def _get_forum_scraper_id(self, forum_name: str) -> str:
    """Build the scraper ID for a forum name.

    The name is lowercased, spaces, dots and dashes become underscores,
    and the result is prefixed with ``forum_``.
    """
    separators_to_underscore = str.maketrans({' ': '_', '.': '_', '-': '_'})
    slug = forum_name.lower().translate(separators_to_underscore)
    return "forum_" + slug
def _get_forum_scraper_config(self, forum_name: str) -> Optional[Dict]:
    """Look up a forum's scraper configuration in the unified database.

    Returns:
        Whatever ``unified_db.get_scraper`` returns for the forum's scraper
        ID, or None when no unified database is attached or the lookup
        raises (the error is logged as a warning).
    """
    if not self.unified_db:
        return None

    scraper_id = self._get_forum_scraper_id(forum_name)
    try:
        config = self.unified_db.get_scraper(scraper_id)
    except Exception as exc:
        self.log(f"Error getting scraper config for {forum_name}: {exc}", "warning")
        config = None
    return config
def _get_forum_proxy_url(self, forum_name: str) -> Optional[str]:
    """Return the forum's configured proxy URL, if a proxy is enabled.

    Returns:
        The ``proxy_url`` from the forum's scraper config when the config
        exists, ``proxy_enabled`` is truthy and the URL is non-empty;
        otherwise None.
    """
    config = self._get_forum_scraper_config(forum_name)
    if not config:
        return None
    if not config.get('proxy_enabled'):
        return None
    return config.get('proxy_url') or None
def _get_cookies_for_requests(self, forum_name: str = None):
    """Get cookies as a ``{name: value}`` dict for the requests library.

    Sources are tried in order: the unified database (when attached), then
    the cookie file ``cookies/forum_cookies_<forum_name>.json``.  The file
    may hold either a raw list of cookies or a dict with a ``'cookies'``
    list.

    Args:
        forum_name: Forum whose cookies are wanted; without it no source
            can be consulted.

    Returns:
        Dict of cookie names to values; empty when nothing could be loaded.
        Read and parse problems are logged at debug level, never raised.
    """
    cookies = {}
    if not forum_name:
        return cookies

    # Try database first if available
    if self.unified_db:
        scraper_id = self._get_forum_scraper_id(forum_name)
        try:
            cookie_list = self.unified_db.get_scraper_cookies(scraper_id)
            if cookie_list:
                for cookie in cookie_list:
                    cookies[cookie['name']] = cookie['value']
                return cookies
        except Exception as e:
            self.log(f"Error loading cookies from database for {forum_name}: {e}", "debug")

    # Fall back to cookie file
    cookies_file = Path(f"cookies/forum_cookies_{forum_name}.json")
    if cookies_file.exists():
        try:
            with open(cookies_file, 'r') as f:
                data = json.load(f)
            if isinstance(data, dict) and 'cookies' in data:
                # Dict wrapper format (cookies plus metadata)
                entries = data['cookies']
            elif isinstance(data, list):
                # Raw cookie list
                entries = data
            else:
                entries = []
            for cookie in entries:
                cookies[cookie['name']] = cookie['value']
        except (OSError, json.JSONDecodeError, KeyError, TypeError) as e:
            # Must name ``cookies_file``: a misspelled variable here would
            # turn a corrupt cookie file into a NameError.
            self.log(f"Failed to parse cookies from {cookies_file}: {e}", "debug")
    return cookies
def _navigate_with_cloudflare(self, page, url: str, forum_name: str = None,
cloudflare_enabled: bool = False,
wait_until: str = 'networkidle',
timeout: int = 60000) -> bool:
"""Navigate to a URL with Cloudflare bypass support
When cloudflare_enabled is False, or CloudflareHandler is unavailable, this is a
plain page.goto() using wait_until. Otherwise stored cookies are loaded into the
page's browser context, refreshed via FlareSolverr if the handler reports them
expired, and the page is opened with 'domcontentloaded'. If the loaded page looks
like a Cloudflare challenge, the method polls for up to 120 seconds for it to clear.
Args:
page: Playwright page object
url: URL to navigate to
forum_name: Forum name for cookie management
cloudflare_enabled: Whether this forum uses Cloudflare protection
wait_until: Playwright wait condition (only used on the non-Cloudflare paths)
timeout: Navigation timeout in ms
Returns:
True if navigation succeeded, False otherwise
"""
if not cloudflare_enabled:
# Standard navigation without Cloudflare handling
try:
page.goto(url, wait_until=wait_until, timeout=timeout)
return True
except PlaywrightTimeout:
self.log(f"Navigation timeout for {url}", "error")
return False
# Cloudflare-protected navigation
if not CLOUDFLARE_HANDLER_AVAILABLE:
self.log("CloudflareHandler not available, falling back to standard navigation", "warning")
try:
page.goto(url, wait_until=wait_until, timeout=timeout)
return True
except PlaywrightTimeout:
# Unlike the non-Cloudflare path above, this timeout is not logged
return False
# Parse domain for CloudflareHandler
parsed = urlparse(url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
# Get proxy URL from database config if available
proxy_url = self._get_forum_proxy_url(forum_name) if forum_name else None
# Use database for cookies if unified_db available, otherwise use file
cookie_file = None
if not self.unified_db:
cookie_file = f"cookies/forum_cookies_{forum_name}.json" if forum_name else "cookies/forum_cloudflare.json"
# Initialize CloudflareHandler for this forum
cf_handler = CloudflareHandler(
module_name=f"Forum.{forum_name}" if forum_name else "Forum",
cookie_file=cookie_file,
flaresolverr_url=self.flaresolverr_url,
flaresolverr_enabled=self.flaresolverr_enabled,
user_agent=self.user_agent,
logger=self.logger,
aggressive_expiry=False, # Use conservative expiry for forum cookies
proxy_url=proxy_url # Pass proxy to FlareSolverr
)
# Load cookies from database if available
if self.unified_db and forum_name:
scraper_id = self._get_forum_scraper_id(forum_name)
try:
cookies = self.unified_db.get_scraper_cookies(scraper_id)
if cookies:
# NOTE(review): assigns the handler's private _cookies attribute directly;
# presumably CloudflareHandler exposes no public setter - confirm
cf_handler._cookies = cookies
except Exception as e:
self.log(f"Error loading cookies from database: {e}", "debug")
# Always load existing cookies into the page context first
# This is critical for new pages that don't have cookies loaded
existing_cookies = cf_handler.get_cookies_list()
if existing_cookies:
self.log(f"Loading {len(existing_cookies)} existing Cloudflare cookies for {forum_name}", "debug")
try:
page.context.add_cookies(existing_cookies)
except Exception as e:
self.log(f"Error loading cookies: {e}", "debug")
# Check if we need fresh cookies
if cf_handler.cookies_expired():
self.log(f"Cloudflare cookies expired for {forum_name}, refreshing via FlareSolverr...", "info")
if cf_handler.get_cookies_via_flaresolverr(base_url):
self.log(f"Successfully refreshed Cloudflare cookies for {forum_name}", "success")
# Reload cookies into browser context
cookies = cf_handler.get_cookies_list()
if cookies:
# NOTE(review): unlike the add_cookies call above, this one is not wrapped in try/except
page.context.add_cookies(cookies)
# Save cookies to database
if self.unified_db and forum_name:
scraper_id = self._get_forum_scraper_id(forum_name)
try:
self.unified_db.save_scraper_cookies(scraper_id, cookies, self.user_agent)
self.log(f"Saved {len(cookies)} Cloudflare cookies to database for {forum_name}", "debug")
except Exception as e:
self.log(f"Error saving cookies to database: {e}", "debug")
else:
# A failed refresh is only logged; navigation is still attempted below
self.log(f"Failed to refresh Cloudflare cookies for {forum_name}", "warning")
# Navigate to the URL with longer timeout for Cloudflare
try:
# Use domcontentloaded instead of networkidle for Cloudflare pages
# networkidle can timeout during challenge
page.goto(url, wait_until='domcontentloaded', timeout=timeout)
# Wait a moment for any Cloudflare JavaScript to execute
page.wait_for_timeout(3000)
# Check for Cloudflare challenge
try:
content = page.content().lower()
except Exception as e:
# Page might still be navigating
self.log(f"Page still loading, waiting...", "debug")
page.wait_for_timeout(5000)
# A second failure here propagates to the outer handlers at the end of the method
content = page.content().lower()
# Substrings matched against the lower-cased page HTML
challenge_indicators = [
'challenge-platform',
'checking your browser',
'just a moment',
'verify you are human',
'cf-challenge'
]
# Only consider it a challenge if we find indicators AND the page is short
# (Real forum pages are much longer than Cloudflare challenge pages)
is_challenge = any(indicator in content for indicator in challenge_indicators) and len(content) < 10000
if is_challenge:
self.log(f"Cloudflare challenge detected for {forum_name}, waiting for resolution...", "info")
# Wait for challenge to resolve (up to 120 seconds)
start_time = time.time()
while time.time() - start_time < 120:
try:
page.wait_for_timeout(3000)
content = page.content().lower()
# Check if challenge is still present
still_challenge = any(ind in content for ind in challenge_indicators) and len(content) < 10000
if not still_challenge:
self.log(f"Cloudflare challenge resolved for {forum_name}", "success")
# Save the new cookies
cf_handler.save_cookies_from_playwright(page.context)
# Also save to database if available
if self.unified_db and forum_name:
scraper_id = self._get_forum_scraper_id(forum_name)
cookies = cf_handler.get_cookies_list()
if cookies:
try:
self.unified_db.save_scraper_cookies(scraper_id, cookies, self.user_agent)
self.log(f"Saved {len(cookies)} Cloudflare cookies to database for {forum_name}", "debug")
except Exception as e:
self.log(f"Error saving cookies to database: {e}", "debug")
return True
# Log progress
# elapsed is truncated to whole seconds, so this only logs on iterations
# that happen to land on a multiple of 15
elapsed = int(time.time() - start_time)
if elapsed % 15 == 0 and elapsed > 0:
self.log(f"Still waiting for Cloudflare ({elapsed}s)...", "debug")
except Exception as e:
# Errors while polling are logged and the loop keeps waiting
self.log(f"Error during Cloudflare wait: {e}", "debug")
self.log(f"Cloudflare challenge did not resolve for {forum_name} after 120s", "error")
return False
# No challenge detected - check if we're on the right page
# Try to wait for networkidle, but don't fail if it times out
try:
page.wait_for_load_state('networkidle', timeout=15000)
except PlaywrightTimeout:
# Page may be loaded enough even if networkidle times out
self.log(f"networkidle timeout for {url}, checking if page is usable...", "debug")
# Verify we're on the expected page (not blocked/redirected)
# NOTE(review): both outcomes return True; the 'celebboard' test only decides
# whether the success message is logged
if 'celebboard' in url.lower() and 'celebboard' in page.url.lower():
self.log(f"Successfully navigated to {page.url}", "success")
return True
return True
except PlaywrightTimeout:
self.log(f"Navigation timeout for Cloudflare-protected URL: {url}", "error")
return False
except Exception as e:
self.log(f"Navigation error for {url}: {e}", "error")
return False
def _get_random_user_agent(self) -> str:
"""Get random user agent for requests"""
agents = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15",
"Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0"
]
return random.choice(agents)
def get_pending_downloads(self):
    """Return a copy of the downloads whose recording was deferred.

    The copy lets callers iterate or modify the result without touching
    the downloader's own ``pending_downloads`` container.
    """
    snapshot = self.pending_downloads.copy()
    return snapshot
def clear_pending_downloads(self):
    """Reset the deferred-download list once its entries have been recorded.

    A new empty list is assigned rather than clearing in place, so lists
    handed out earlier are left untouched.
    """
    self.pending_downloads = list()
def _get_db_connection(self):
"""Get database connection - either from adapter or direct"""
if self.db_adapter:
# Use temporary database for compatibility
if not hasattr(self, '_temp_db_path'):
import tempfile
self._temp_db_path = tempfile.mktemp(suffix='.db')
# Initialize temporary database
temp_conn = sqlite3.connect(self._temp_db_path)
self._init_database_conn(temp_conn)
temp_conn.close()
return sqlite3.connect(self._temp_db_path)
elif self.db_path:
return sqlite3.connect(self.db_path)
else:
# No database configured - create in-memory database
return sqlite3.connect(':memory:')
def _init_database_conn(self, conn):
"""Initialize database schema using provided connection"""
cursor = conn.cursor()
self._create_database_tables(cursor)
conn.commit()
def _init_database(self):
"""Initialize SQLite database for tracking"""
if self.db_adapter:
# Skip initialization when using adapter - it has its own database
return
conn = self._get_db_connection()
cursor = conn.cursor()
self._create_database_tables(cursor)
conn.commit()
conn.close()
def _create_database_tables(self, cursor):
"""Create database tables, indexes, pragmas and cleanup triggers
Every CREATE statement uses IF NOT EXISTS, so running this against an
already-initialised database is harmless. Nothing is committed here;
the caller commits.
Args:
cursor: Open SQLite cursor on the target database
"""
# Threads table: one row per tracked forum thread
cursor.execute('''
CREATE TABLE IF NOT EXISTS threads (
thread_id TEXT PRIMARY KEY,
forum_name TEXT,
thread_url TEXT UNIQUE,
thread_title TEXT,
author TEXT,
created_date DATETIME,
last_checked DATETIME,
last_post_date DATETIME,
post_count INTEGER DEFAULT 0,
status TEXT DEFAULT 'active',
monitor_until DATETIME,
metadata TEXT
)
''')
# Posts table: one row per post, linked to its thread
cursor.execute('''
CREATE TABLE IF NOT EXISTS posts (
post_id TEXT PRIMARY KEY,
thread_id TEXT,
post_url TEXT UNIQUE,
author TEXT,
post_date DATETIME,
content_hash TEXT,
has_images BOOLEAN DEFAULT 0,
downloaded BOOLEAN DEFAULT 0,
download_date DATETIME,
metadata TEXT,
FOREIGN KEY (thread_id) REFERENCES threads (thread_id)
)
''')
# Images table: one row per image, linked to its post
cursor.execute('''
CREATE TABLE IF NOT EXISTS images (
image_id TEXT PRIMARY KEY,
post_id TEXT,
image_url TEXT,
direct_url TEXT,
filename TEXT,
file_hash TEXT,
downloaded BOOLEAN DEFAULT 0,
download_date DATETIME,
file_size INTEGER,
metadata TEXT,
FOREIGN KEY (post_id) REFERENCES posts (post_id)
)
''')
# Searches table: saved searches and how often to re-check them
cursor.execute('''
CREATE TABLE IF NOT EXISTS searches (
search_id TEXT PRIMARY KEY,
forum_name TEXT,
search_query TEXT,
search_url TEXT,
last_checked DATETIME,
check_frequency_hours INTEGER DEFAULT 24,
active BOOLEAN DEFAULT 1,
results_found INTEGER DEFAULT 0,
metadata TEXT
)
''')
# Search results table (links searches to threads)
cursor.execute('''
CREATE TABLE IF NOT EXISTS search_results (
search_id TEXT,
thread_id TEXT,
found_date DATETIME,
PRIMARY KEY (search_id, thread_id),
FOREIGN KEY (search_id) REFERENCES searches (search_id),
FOREIGN KEY (thread_id) REFERENCES threads (thread_id)
)
''')
# Download queue table (similar to fastdl_module)
# url is UNIQUE, so each URL can be queued at most once regardless of status
cursor.execute('''
CREATE TABLE IF NOT EXISTS download_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
url TEXT UNIQUE NOT NULL,
referer TEXT,
save_path TEXT NOT NULL,
thread_id TEXT,
post_id TEXT,
forum_name TEXT,
status TEXT DEFAULT 'pending',
attempts INTEGER DEFAULT 0,
created_date DATETIME DEFAULT CURRENT_TIMESTAMP,
downloaded_date DATETIME,
error_message TEXT,
file_hash TEXT,
metadata TEXT
)
''')
# Create indexes - both single and composite for optimization
# Single column indexes
cursor.execute('CREATE INDEX IF NOT EXISTS idx_threads_status ON threads(status)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_threads_monitor ON threads(monitor_until)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_posts_thread ON posts(thread_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_posts_downloaded ON posts(downloaded)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_images_post ON images(post_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_searches_active ON searches(active)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_queue_status ON download_queue(status)')
# NOTE(review): url is already declared UNIQUE above, which presumably gives it an
# implicit index; this explicit one looks redundant - confirm before removing
cursor.execute('CREATE INDEX IF NOT EXISTS idx_queue_url ON download_queue(url)')
# Composite indexes for common query patterns
cursor.execute('CREATE INDEX IF NOT EXISTS idx_threads_forum_status ON threads(forum_name, status)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_posts_thread_downloaded ON posts(thread_id, downloaded)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_images_post_downloaded ON images(post_id, downloaded)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_queue_status_attempts ON download_queue(status, attempts)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_searches_forum_active ON searches(forum_name, active)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_threads_monitor_status ON threads(monitor_until, status)')
# Enable WAL mode for better concurrency
cursor.execute('PRAGMA journal_mode=WAL')
cursor.execute('PRAGMA synchronous=NORMAL')
# Create triggers for automatic cleanup
# After each insert into download_queue, once more than 10000 completed rows exist,
# delete completed rows whose downloaded_date is older than 90 days
cursor.execute('''
CREATE TRIGGER IF NOT EXISTS cleanup_old_downloads
AFTER INSERT ON download_queue
WHEN (SELECT COUNT(*) FROM download_queue WHERE status = 'completed') > 10000
BEGIN
DELETE FROM download_queue
WHERE status = 'completed'
AND downloaded_date < datetime('now', '-90 days');
END
''')
# After each insert into threads, mark active threads whose monitor_until
# has passed as 'expired'
cursor.execute('''
CREATE TRIGGER IF NOT EXISTS cleanup_expired_monitors
AFTER INSERT ON threads
BEGIN
UPDATE threads
SET status = 'expired'
WHERE monitor_until IS NOT NULL
AND monitor_until < datetime('now')
AND status = 'active';
END
''')
# After each insert into search_results, once more than 50000 rows exist,
# delete rows whose found_date is older than 180 days
cursor.execute('''
CREATE TRIGGER IF NOT EXISTS cleanup_old_search_results
AFTER INSERT ON search_results
WHEN (SELECT COUNT(*) FROM search_results) > 50000
BEGIN
DELETE FROM search_results
WHERE found_date < datetime('now', '-180 days');
END
''')
def reset_download_queue(self, forum_name=None, status=None):
    """Delete rows from the download queue, optionally filtered.

    Args:
        forum_name: When given, only rows for this forum are removed.
        status: When given, only rows with this status are removed.

    Returns:
        Number of rows deleted (0 when database tracking is disabled).
    """
    if not self.use_database:
        return 0

    # Pick the statement first; each filter combination has its own SQL.
    if forum_name and status:
        statement = "DELETE FROM download_queue WHERE forum_name = ? AND status = ?"
        bindings = (forum_name, status)
    elif forum_name:
        statement = "DELETE FROM download_queue WHERE forum_name = ?"
        bindings = (forum_name,)
    elif status:
        statement = "DELETE FROM download_queue WHERE status = ?"
        bindings = (status,)
    else:
        statement = "DELETE FROM download_queue"
        bindings = None

    conn = self._get_db_connection()
    cursor = conn.cursor()
    try:
        if bindings is None:
            cursor.execute(statement)
        else:
            cursor.execute(statement, bindings)
        removed = cursor.rowcount
        conn.commit()
        self.log(f"Deleted {removed} records from download queue", "info")
        return removed
    finally:
        conn.close()
def add_to_download_queue(self, url, save_path, referer=None, thread_id=None,
                          post_id=None, forum_name=None, metadata=None):
    """Add an item to the download queue.

    When a database adapter is attached, the call is delegated to it.
    Otherwise the item is inserted into the local ``download_queue`` table
    with status ``pending``.

    Args:
        url: URL to download
        save_path: Where to save the file
        referer: Referer URL
        thread_id: Associated thread ID
        post_id: Associated post ID
        forum_name: Forum name
        metadata: Additional metadata as dict (stored as JSON)

    Returns:
        True if added; False if database tracking is disabled or the URL
        already has a queue row, whatever its status.
    """
    if not self.use_database:
        return False

    # Use adapter if available
    if self.db_adapter:
        return self.db_adapter.add_to_download_queue(
            url=url, referer=referer, save_path=save_path,
            thread_id=thread_id, post_id=post_id,
            forum_name=forum_name, metadata=metadata
        )

    conn = self._get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT status FROM download_queue WHERE url = ?",
            (url,)
        )
        existing = cursor.fetchone()
        if existing:
            if existing[0] == 'completed':
                self.log(f"Skipping already downloaded: {Path(save_path).name}", "info")
            # url is UNIQUE, so any existing row (completed, pending, failed...)
            # blocks a second insert. Previously a 'failed' row fell through
            # to the INSERT and raised IntegrityError. Failed rows are
            # re-queued by retry_failed_downloads() instead.
            return False

        metadata_str = json.dumps(metadata) if metadata else None
        cursor.execute('''
            INSERT INTO download_queue
            (url, referer, save_path, thread_id, post_id, forum_name, status, metadata)
            VALUES (?, ?, ?, ?, ?, ?, 'pending', ?)
        ''', (url, referer, str(save_path), thread_id, post_id, forum_name, metadata_str))
        conn.commit()
        return True
    finally:
        conn.close()
def is_in_download_queue(self, url):
    """Report whether a URL is waiting in the download queue.

    Args:
        url: URL to look up

    Returns:
        bool: True only if a queue row for the URL exists with status
        'pending'. When an adapter is attached, its answer is returned.
    """
    if not self.use_database:
        return False

    adapter = self.db_adapter
    if adapter:
        return adapter.is_in_download_queue(url)

    conn = self._get_db_connection()
    try:
        row = conn.cursor().execute(
            "SELECT status FROM download_queue WHERE url = ? AND status = 'pending'",
            (url,)
        ).fetchone()
        return row is not None
    finally:
        conn.close()
def process_download_queue(self, context=None, max_items=None):
    """Download every pending item in the download queue.

    Each item is fetched through a Playwright page when ``context`` is
    given, otherwise through ``requests`` with the forum's stored cookies.
    Successful items are marked 'completed'; any failure marks the item
    'failed', increments its attempt counter and stores the error message.
    File timestamps are set from a date found in the post title, falling
    back to the post date stored in the item's metadata.

    Args:
        context: Playwright browser context to download with (optional;
            without it authentication held by the browser is lost)
        max_items: Maximum number of items to process (falsy = no limit)

    Returns:
        Dict with 'processed', 'successful' and 'failed' counts
    """
    stats = {'processed': 0, 'successful': 0, 'failed': 0}
    if not self.use_database:
        return stats

    conn = self._get_db_connection()
    try:
        cursor = conn.cursor()

        # Get pending items with metadata
        query = """
            SELECT id, url, referer, save_path, thread_id, post_id,
                   forum_name, metadata
            FROM download_queue
            WHERE status = 'pending'
            ORDER BY created_date
        """
        params = ()
        if max_items:
            # Bind the limit instead of formatting it into the SQL text.
            query += " LIMIT ?"
            params = (int(max_items),)
        cursor.execute(query, params)
        items = cursor.fetchall()

        for item in items:
            item_id, url, referer, save_path, thread_id, post_id, forum_name, metadata_str = item
            save_path = Path(save_path)

            # Parse metadata; a corrupt value must not abort the whole queue
            try:
                metadata = json.loads(metadata_str) if metadata_str else {}
            except (TypeError, ValueError):
                metadata = {}
            if not isinstance(metadata, dict):
                metadata = {}

            post_date = None
            post_title = metadata.get('post_title', '')

            # Extract date from title first (takes precedence)
            if post_title:
                post_date = DateHandler.extract_date_from_text(post_title)

            # Fall back to post date if no date in title
            if not post_date and metadata.get('post_date'):
                try:
                    post_date = datetime.fromisoformat(metadata['post_date'])
                except (ValueError, TypeError):
                    pass  # Invalid date format in metadata, use None

            try:
                # Download using Playwright if context available
                if context:
                    page = context.new_page()
                    try:
                        # Set referer if provided
                        if referer:
                            page.set_extra_http_headers({'Referer': referer})

                        # Direct pixhost URLs (img*.pixhost.to) download as-is.
                        # "show" URLs should have been converted during
                        # scraping; resolve them here as a fallback.
                        if 'pixhost.to/show/' in url:
                            page.goto(url, wait_until='domcontentloaded', timeout=30000)
                            page.wait_for_timeout(1000)  # Wait for JS

                            # Find the actual image
                            img_element = page.query_selector('img#image')
                            if not img_element:
                                raise Exception("No image found on pixhost page")
                            actual_url = img_element.get_attribute('src')
                            if not actual_url:
                                raise Exception("No src attribute on image")

                            # Download the actual image
                            response = page.goto(actual_url, timeout=30000)
                            if not (response and response.ok):
                                raise Exception(f"Failed to download image from {actual_url}")
                            content = response.body()
                        else:
                            # Regular download (including direct pixhost URLs)
                            response = page.goto(url, wait_until='domcontentloaded', timeout=60000)
                            if not (response and response.ok):
                                raise Exception(f"HTTP {response.status if response else 'No response'}")
                            content = response.body()

                        # Check if it's HTML (error page)
                        head = content[:1000].lower()
                        if b'<!doctype' in head or b'<html' in head:
                            raise Exception("Got HTML instead of image")

                        # Save the file
                        save_path.parent.mkdir(parents=True, exist_ok=True)
                        save_path.write_bytes(content)

                        # Update timestamps if we have a date
                        if post_date:
                            DateHandler.update_file_timestamps(save_path, post_date)
                            self.log(f"Updated timestamps to {post_date.strftime('%Y-%m-%d')}", "debug")

                        # Update database
                        cursor.execute('''
                            UPDATE download_queue
                            SET status = 'completed',
                                downloaded_date = CURRENT_TIMESTAMP,
                                file_hash = ?
                            WHERE id = ?
                        ''', (hashlib.sha256(content).hexdigest(), item_id))
                        stats['successful'] += 1
                        self.log(f"Downloaded: {save_path.name}", "success")
                    finally:
                        page.close()
                else:
                    # Fallback to requests (NOT RECOMMENDED - loses authentication)
                    headers = {'User-Agent': self.user_agent}
                    if referer:
                        headers['Referer'] = referer
                    response = requests.get(url, headers=headers, timeout=30, cookies=self._get_cookies_for_requests(forum_name))
                    response.raise_for_status()

                    save_path.parent.mkdir(parents=True, exist_ok=True)
                    save_path.write_bytes(response.content)

                    # Update timestamps if we have a date
                    if post_date:
                        DateHandler.update_file_timestamps(save_path, post_date)

                    cursor.execute('''
                        UPDATE download_queue
                        SET status = 'completed',
                            downloaded_date = CURRENT_TIMESTAMP
                        WHERE id = ?
                    ''', (item_id,))
                    stats['successful'] += 1
            except Exception as e:
                # Mark as failed
                cursor.execute('''
                    UPDATE download_queue
                    SET status = 'failed',
                        attempts = attempts + 1,
                        error_message = ?
                    WHERE id = ?
                ''', (str(e), item_id))
                stats['failed'] += 1
                self.log(f"Failed to download {url}: {e}", "error")

            stats['processed'] += 1
            self._apply_rate_limit()

        conn.commit()
        return stats
    finally:
        # Close the connection even when an unexpected error escapes the loop.
        conn.close()
def cleanup(self):
    """Clean up browser and playwright resources.

    The browser context, browser and Playwright driver are shut down in
    that order. Each step is attempted independently, so a failure closing
    one resource no longer prevents the remaining ones from being released.
    Every handle is reset to None afterwards, whether or not its shutdown
    call succeeded.
    """
    teardown_steps = (
        ('context', 'close'),
        ('browser', 'close'),
        ('playwright', 'stop'),
    )
    for attr_name, method_name in teardown_steps:
        resource = getattr(self, attr_name, None)
        if not resource:
            continue
        try:
            getattr(resource, method_name)()
        except Exception as e:
            self.log(f"Error during cleanup: {e}", "debug")
        finally:
            # A handle whose shutdown failed is not reusable either.
            setattr(self, attr_name, None)
def keep_alive(self):
    """Ping the browser context by opening and closing a blank page.

    The ping is only sent from the thread recorded in ``_context_thread_id``
    and only when a context exists; otherwise the call does nothing.
    Failures are logged at debug level and never raised.
    """
    import threading

    owner_thread = getattr(self, '_context_thread_id', None)
    caller_thread = threading.current_thread().ident

    # Thread-safety guard: skip when there is no context or a foreign thread calls.
    if not self.context or owner_thread != caller_thread:
        return

    try:
        ping_page = self.context.new_page()
        ping_page.goto("about:blank")
        ping_page.close()
        self.log("Browser keep-alive ping", "debug")
    except Exception as exc:
        self.log(f"Keep-alive failed: {exc}", "debug")
def cleanup_old_downloads(self, days=30):
    """Remove completed download records older than the specified age.

    The cutoff is computed inside SQLite with ``datetime('now', ...)``, so
    it uses the same UTC ``YYYY-MM-DD HH:MM:SS`` text format that
    ``CURRENT_TIMESTAMP`` writes into ``downloaded_date``. A Python-side
    local-time ISO string (with a ``T`` separator) does not compare
    correctly against those values.

    Args:
        days: Number of days to keep records

    Returns:
        Number of records deleted (0 when database tracking is disabled)
    """
    if not self.use_database:
        return 0

    conn = self._get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            DELETE FROM download_queue
            WHERE status = 'completed'
            AND downloaded_date < datetime('now', ?)
        ''', (f"{-days} days",))
        deleted = cursor.rowcount
        conn.commit()
    finally:
        conn.close()

    self.log(f"Cleaned up {deleted} old download records", "info")
    return deleted
def get_queue_status(self):
    """Get current queue status.

    Returns:
        Dict with 'pending', 'completed' and 'failed' row counts plus
        'total', which counts rows of every status. An empty dict is
        returned when database tracking is disabled.
    """
    if not self.use_database:
        return {}

    conn = self._get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT status, COUNT(*)
            FROM download_queue
            GROUP BY status
        ''')
        counts = dict(cursor.fetchall())
    finally:
        # Close even when the query fails (e.g. missing table).
        conn.close()

    return {
        'pending': counts.get('pending', 0),
        'completed': counts.get('completed', 0),
        'failed': counts.get('failed', 0),
        'total': sum(counts.values())
    }
def retry_failed_downloads(self, max_attempts=3):
    """Put failed downloads back into the pending state.

    Only rows whose ``attempts`` counter is below ``max_attempts`` are
    reset; the counter itself is left unchanged.

    Args:
        max_attempts: Maximum number of attempts before giving up

    Returns:
        Number of items retried (0 when database tracking is disabled)
    """
    if not self.use_database:
        return 0

    conn = self._get_db_connection()
    try:
        cursor = conn.cursor()
        # Reset failed items that haven't exceeded max attempts
        cursor.execute('''
            UPDATE download_queue
            SET status = 'pending'
            WHERE status = 'failed'
            AND attempts < ?
        ''', (max_attempts,))
        retried = cursor.rowcount
        conn.commit()
    finally:
        # Close even when the update fails.
        conn.close()

    self.log(f"Retrying {retried} failed downloads", "info")
    return retried
def _apply_rate_limit(self):
"""Apply random delay for rate limiting"""
delay = random.uniform(self.rate_limit[0], self.rate_limit[1])
time.sleep(delay)
def _get_content_hash(self, content: str) -> str:
"""Generate hash of content for duplicate detection"""
return hashlib.sha256(content.encode('utf-8')).hexdigest()
def _download_image(self, url: str, save_path: Path, referer: str = None, external_only: bool = False, context=None) -> bool:
"""Download an image from URL, optionally filtering for external hosts only"""
try:
# If external_only is True, skip forum's internal attachments
if external_only:
# Skip forum's internal attachments
if '/attachments/' in url:
self.log(f"Skipping forum attachment: {url}", "debug")
return False
# Only download if it's an external image host or external URL
host = ImageHostHandler.identify_host(url)
if not host and not any(domain in url for domain in ['fastdl.app', 'picturepub.net']):
# Not a recognized external host, check if it's still external
if not url.startswith('http'):
self.log(f"Skipping non-external URL: {url}", "debug")
return False
# Check for image hosts that need special handling
host = ImageHostHandler.identify_host(url)
if host:
self.log(f"Detected {host} image host, extracting full image...", "debug")
# For image hosts, we need to visit the page and extract the full image
if host == 'imagebam':
# ImageBam requires visiting the page to get the full image
return self._download_from_imagebam(url, save_path, referer)
elif host == 'imagetwist':
# ImageTwist requires parsing the page to get direct image URL
return self._download_from_imagetwist(url, save_path, referer)
elif host == 'imgur':
# Imgur - convert to direct link
direct_url = ImageHostHandler.extract_direct_url(url)
if direct_url:
url = direct_url
else:
# Try generic extraction
direct_url = ImageHostHandler.extract_direct_url(url)
if direct_url:
url = direct_url
# Download using Playwright if context available, otherwise use requests
save_path.parent.mkdir(parents=True, exist_ok=True)
if context:
# Use Playwright for authenticated download
page = context.new_page()
try:
# Navigate to the image URL and get the response
response = page.goto(url, wait_until='networkidle')
if response:
# Get the response body (image bytes)
image_bytes = response.body()
# Check if we got HTML instead of an image
if image_bytes[:100].lower().find(b'<html') != -1 or image_bytes[:100].lower().find(b'<!doctype') != -1:
self.log(f"Got HTML instead of image for {url}", "warning")
page.close()
return False
# Save the image
with open(save_path, 'wb') as f:
f.write(image_bytes)
else:
self.log(f"Failed to get response for {url}", "warning")
page.close()
return False
finally:
page.close()
else:
# Fallback to requests if no context
headers = {
'User-Agent': self.user_agent,
'Referer': referer or url
}
response = requests.get(url, headers=headers, timeout=30, stream=True, cookies=self._get_cookies_for_requests())
response.raise_for_status()
# Read first chunk to validate content type
first_chunk = None
chunks = []
for chunk in response.iter_content(chunk_size=8192):
if first_chunk is None:
first_chunk = chunk
# Check if we got HTML instead of an image
if first_chunk[:100].lower().find(b'<html') != -1 or \
first_chunk[:100].lower().find(b'<!doctype') != -1 or \
first_chunk[:100].lower().find(b'<head>') != -1 or \
first_chunk[:100].lower().find(b'<script') != -1:
self.log(f"Got HTML instead of image for {url} (requests fallback)", "warning")
return False
chunks.append(chunk)
# Save the image if validation passed
with open(save_path, 'wb') as f:
for chunk in chunks:
f.write(chunk)
# Check for duplicate hash before marking as successful
if self.db_adapter and hasattr(self.db_adapter, 'unified_db'):
from pathlib import Path as PathLib
file_hash = self.db_adapter.unified_db.get_file_hash(str(save_path))
if file_hash:
existing = self.db_adapter.unified_db.get_download_by_file_hash(file_hash)
if existing and existing.get('file_path') and str(save_path) != existing.get('file_path'):
existing_path = PathLib(existing['file_path'])
if existing_path.exists():
self.log(f"⚠ Duplicate file detected: {save_path.name} matches {existing['filename']} from {existing['platform']}/{existing['source']}", "warning")
try:
save_path.unlink()
self.log(f"Deleted duplicate: {save_path.name}", "debug")
return False
except Exception as e:
self.log(f"Failed to delete duplicate {save_path.name}: {e}", "warning")
return False
return True
except Exception as e:
self.log(f"Failed to download image {url}: {e}", "error")
# Try gallery-dl as fallback
if self._try_gallery_dl_fallback(url, save_path, referer):
self.log(f"Successfully downloaded via gallery-dl: {save_path.name}", "success")
return True
return False
def _try_gallery_dl_fallback(self, url: str, save_path: Path, referer: str = None) -> bool:
"""Try to download using gallery-dl as fallback for unsupported hosts"""
try:
import subprocess
# Check if gallery-dl is installed
result = subprocess.run(["which", "gallery-dl"], capture_output=True)
if result.returncode != 0:
self.log("gallery-dl not installed, skipping fallback", "debug")
return False
self.log(f"Attempting download with gallery-dl: {url}", "debug")
# Build gallery-dl command
cmd = [
"gallery-dl",
"--dest", str(save_path.parent),
"--filename", f"{save_path.name}",
"--no-skip",
"--no-part",
"--quiet"
]
# Add referer if provided
if referer:
cmd.extend(["--header", f"Referer: {referer}"])
# Add the URL
cmd.append(url)
# Run gallery-dl with timeout
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=60
)
if result.returncode == 0 and save_path.exists():
return True
# Check if file was saved with different extension
base_name = save_path.stem
for file in save_path.parent.glob(f"{base_name}.*"):
if file != save_path:
# Rename to expected path
file.rename(save_path)
return True
return False
except subprocess.TimeoutExpired:
self.log("gallery-dl timeout", "debug")
return False
except Exception as e:
self.log(f"gallery-dl fallback failed: {e}", "debug")
return False
def _download_with_retry(self, download_func, *args, max_retries=3, **kwargs):
"""Download with exponential backoff retry logic"""
import time
for attempt in range(max_retries):
try:
result = download_func(*args, **kwargs)
if result:
return True
# If download returned False (not an exception), might be 404
if attempt == max_retries - 1:
return False
except requests.exceptions.HTTPError as e:
if e.response.status_code in [404, 410]:
# Don't retry on not found
self.log(f"Resource not found (HTTP {e.response.status_code})", "warning")
return False
elif e.response.status_code == 429:
# Rate limited - wait longer
wait_time = min(60, (2 ** attempt) * 5)
self.log(f"Rate limited, waiting {wait_time}s", "warning")
time.sleep(wait_time)
elif e.response.status_code >= 500:
# Server error - retry with backoff
wait_time = min(30, (2 ** attempt) * 2)
self.log(f"Server error {e.response.status_code}, retrying in {wait_time}s", "warning")
time.sleep(wait_time)
else:
raise
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
if attempt < max_retries - 1:
# Network error - retry with exponential backoff
wait_time = min(30, (2 ** attempt) * 2)
self.log(f"Network error, retrying in {wait_time}s (attempt {attempt + 1}/{max_retries})", "info")
time.sleep(wait_time)
else:
self.log(f"Failed after {max_retries} attempts: {e}", "error")
return False
except Exception as e:
self.log(f"Unexpected error in download: {e}", "error")
return False
return False
def _download_from_imagebam(self, url: str, save_path: Path, referer: str = None) -> bool:
    """Download the full-size image behind an ImageBam viewer page.

    ImageBam shows an interstitial that must be dismissed with a "Continue"
    button, so the viewer page is opened in a throwaway headless Chromium
    instance. The image URL is then located by trying, in order:

    1. ``<img>`` tags whose ``src`` looks like an ImageBam-hosted full image,
    2. a set of well-known container selectors,
    3. a regex over the raw page HTML.

    The image itself is fetched with ``requests`` and rejected when the
    response body starts with HTML markers or is empty. When a unified
    database is available, a file whose hash matches an existing download
    on disk is deleted and reported as a failure.

    Args:
        url: ImageBam viewer page URL.
        save_path: Destination file; parent directories are created.
        referer: Optional Referer header sent when loading the viewer page.

    Returns:
        True if the image was saved and is not a duplicate, False otherwise.
        Every exception is logged and converted to False.
    """
    import re

    chromium_path = '/opt/media-downloader/.playwright/chromium-1187/chrome-linux/chrome'
    continue_selectors = [
        'button:has-text("Continue")',
        'a:has-text("Continue")',
        'input[value*="Continue"]',
        '.continue-button',
        'button:has-text("Continue to image")',
        'a:has-text("Continue to image")',
        'a:has-text("Continue to your image")'
    ]
    image_suffixes = ('.jpg', '.jpeg', '.png', '.gif')
    html_markers = (b'<html', b'<!doctype', b'<head>', b'<script')

    def click_continue(page):
        """Click the first interstitial button that is present (best effort)."""
        for selector in continue_selectors:
            try:
                if page.locator(selector).count() > 0:
                    page.locator(selector).first.click()
                    page.wait_for_timeout(2000)
                    break
            except Exception:
                # A selector that fails to click is not fatal; try the next one.
                continue

    def find_image_url(page):
        """Return the full image URL found on the page, or None."""
        # Method 1: ImageBam-hosted <img> tags that are not logos/thumbnails.
        for img in page.locator('img').all():
            src = img.get_attribute('src')
            if not src:
                continue
            if 'images' in src and 'imagebam.com' in src and src.endswith(image_suffixes):
                lowered = src.lower()
                if 'logo' not in lowered and 'thumb' not in lowered:
                    return src

        # Method 2: known containers. A Locator object is always truthy and
        # get_attribute() waits (then raises) when nothing matches, so the
        # match count has to be checked explicitly before reading the attribute.
        containers = page.locator('#imageTarget, .main-image, .the-image, #thepic')
        if containers.count() > 0:
            src = containers.first.get_attribute('src')
            if src:
                return src

        # Method 3: last resort, scan the raw HTML for an image-looking URL.
        match = re.search(r'(https?://[^"]+images[^"]+\.(?:jpg|jpeg|png|gif))', page.content())
        return match.group(1) if match else None

    def resolve_image_url():
        """Open the viewer page in headless Chromium and extract the image URL."""
        with sync_playwright() as p:
            browser = p.chromium.launch(
                headless=True,
                executable_path=chromium_path if os.path.exists(chromium_path) else None
            )
            try:
                page = browser.new_page(user_agent=self.user_agent)
                if referer:
                    page.set_extra_http_headers({'Referer': referer})
                page.goto(url, wait_until='domcontentloaded')
                page.wait_for_timeout(2000)
                click_continue(page)
                return find_image_url(page)
            finally:
                # Release the browser even when navigation or scraping fails.
                browser.close()

    try:
        img_url = resolve_image_url()
        if not img_url:
            self.log(f"Could not find image URL on ImageBam page: {url}", "warning")
            return False

        # Make sure it's a full URL
        if not img_url.startswith('http'):
            img_url = urljoin(url, img_url)

        headers = {
            'User-Agent': self.user_agent,
            'Referer': url
        }
        chunks = []
        # The context manager releases the streamed connection on every exit path.
        with requests.get(img_url, headers=headers, timeout=30, stream=True,
                          cookies=self._get_cookies_for_requests()) as response:
            response.raise_for_status()
            for index, chunk in enumerate(response.iter_content(chunk_size=8192)):
                if index == 0:
                    # Hosts answer with an HTML page when hotlinking is refused.
                    head = chunk[:100].lower()
                    if any(marker in head for marker in html_markers):
                        self.log(f"Got HTML instead of image for ImageBam {img_url}", "warning")
                        return False
                chunks.append(chunk)

        if not chunks:
            # Never report a zero-byte file as a successful download.
            self.log(f"Empty response for ImageBam image {img_url}", "warning")
            return False

        save_path.parent.mkdir(parents=True, exist_ok=True)
        with open(save_path, 'wb') as f:
            f.writelines(chunks)
        self.log(f"Successfully downloaded ImageBam image: {save_path.name}", "success")

        # Check for duplicate hash before marking as successful
        if self.db_adapter and hasattr(self.db_adapter, 'unified_db'):
            unified_db = self.db_adapter.unified_db
            file_hash = unified_db.get_file_hash(str(save_path))
            if file_hash:
                existing = unified_db.get_download_by_file_hash(file_hash)
                if existing and existing.get('file_path') and str(save_path) != existing.get('file_path'):
                    if Path(existing['file_path']).exists():
                        self.log(f"⚠ Duplicate file detected: {save_path.name} matches {existing['filename']} from {existing['platform']}/{existing['source']}", "warning")
                        try:
                            save_path.unlink()
                            self.log(f"Deleted duplicate: {save_path.name}", "debug")
                        except OSError as e:
                            self.log(f"Failed to delete duplicate {save_path.name}: {e}", "warning")
                        # A duplicate is a failure whether or not the delete worked.
                        return False
        return True
    except Exception as e:
        self.log(f"Failed to download from ImageBam {url}: {e}", "error")
        return False
def _download_from_imagetwist(self, url: str, save_path: Path, referer: str = None) -> bool:
    """Download an image from ImageTwist by scraping its viewer page.

    The viewer page is fetched with ``requests`` and the direct image URL is
    looked up in three ways, in order: an ``<img class="pic">`` tag, a regex
    for ``i*.imagetwist.com/i/`` URLs, and an ``<a class="ddownloader">``
    link. Requests to ImageTwist are spaced at least two seconds apart,
    because the host answers with an error image when hit too quickly.

    A response is rejected when it is the known 8346-byte error placeholder,
    when the body starts with HTML markers, or when it is empty. When a
    unified database is available, a file whose hash matches an existing
    download on disk is deleted and reported as a failure.

    Args:
        url: ImageTwist viewer page URL.
        save_path: Destination file; parent directories are created.
        referer: Referer for the page request (defaults to the phun.org forum).

    Returns:
        True if the image was saved and is not a duplicate, False otherwise.
        Every exception is logged and converted to False.
    """
    import time

    min_interval = 2.0  # Minimum seconds between ImageTwist requests
    error_placeholder_size = 8346  # Size of the "rate limited or deleted" PNG
    html_markers = (b'<html', b'<!doctype', b'<head>', b'<script')

    if not hasattr(self, '_imagetwist_last_request'):
        self._imagetwist_last_request = 0

    def wait_for_slot():
        """Sleep until min_interval has passed since the last request."""
        elapsed = time.time() - self._imagetwist_last_request
        if elapsed < min_interval:
            time.sleep(min_interval - elapsed)

    wait_for_slot()
    try:
        self.log(f"Fetching ImageTwist page: {url}", "debug")
        headers = {
            'User-Agent': self.user_agent,
            'Referer': referer or 'https://forum.phun.org/'
        }
        try:
            response = requests.get(url, headers=headers, timeout=30)
        finally:
            # Stamp failed attempts too, so a retry is throttled as well.
            self._imagetwist_last_request = time.time()
        response.raise_for_status()
        page_content = response.text

        # ImageTwist pattern:
        # <img src="https://i*phun.imagetwist.com/i/XXXXX/HASH.jpg/filename.jpg" class="pic img img-responsive"
        img_url = None
        soup = BeautifulSoup(page_content, 'html.parser')

        # Method 1: the main picture tag (most reliable)
        pic_img = soup.find('img', class_='pic')
        if pic_img and pic_img.get('src'):
            img_url = pic_img['src']
            self.log(f"Found ImageTwist direct URL via pic class: {img_url}", "debug")

        # Method 2: regex for the i*.imagetwist.com/i/ pattern
        if not img_url:
            match = re.search(r'(https?://i\d*(?:phun)?\.imagetwist\.com/i/[^"\'>\s]+)', page_content)
            if match:
                img_url = match.group(1)
                self.log(f"Found ImageTwist direct URL via regex: {img_url}", "debug")

        # Method 3: the download link
        if not img_url:
            download_link = soup.find('a', class_='ddownloader')
            if download_link and download_link.get('href'):
                img_url = download_link['href']
                self.log(f"Found ImageTwist direct URL via download link: {img_url}", "debug")

        if not img_url:
            self.log(f"Could not find direct image URL on ImageTwist page: {url}", "warning")
            return False

        # Scraped attributes may be relative or protocol-relative; absolute
        # URLs pass through urljoin unchanged.
        img_url = urljoin(url, img_url)

        # Rate limit before the image download too
        wait_for_slot()
        self.log(f"Downloading ImageTwist image: {img_url}", "debug")
        img_headers = {
            'User-Agent': self.user_agent,
            'Referer': url  # The image host expects the viewer page as Referer
        }
        try:
            img_response = requests.get(img_url, headers=img_headers, timeout=30, stream=True)
        finally:
            self._imagetwist_last_request = time.time()

        chunks = []
        # The context manager releases the streamed connection on every exit path.
        with img_response:
            img_response.raise_for_status()
            if img_response.headers.get('Content-Length') == str(error_placeholder_size):
                self.log(f"ImageTwist returned error image (rate limited or unavailable): {url}", "warning")
                return False
            for index, chunk in enumerate(img_response.iter_content(chunk_size=8192)):
                if index == 0:
                    # An HTML body means the host served a page, not the image.
                    head = chunk[:100].lower()
                    if any(marker in head for marker in html_markers):
                        self.log(f"Got HTML instead of image for ImageTwist {img_url}", "warning")
                        return False
                chunks.append(chunk)

        total_size = sum(len(chunk) for chunk in chunks)
        if total_size == 0:
            # Never report a zero-byte file as a successful download.
            self.log(f"Empty response for ImageTwist image {img_url}", "warning")
            return False
        if total_size == error_placeholder_size:
            # Catches the placeholder when no Content-Length header was sent.
            self.log(f"ImageTwist returned error image (rate limited or unavailable): {url}", "warning")
            return False

        save_path.parent.mkdir(parents=True, exist_ok=True)
        with open(save_path, 'wb') as f:
            f.writelines(chunks)

        actual_size = save_path.stat().st_size
        self.log(f"Downloaded ImageTwist image: {save_path.name} ({actual_size} bytes)", "success")

        # Check for duplicate hash
        if self.db_adapter and hasattr(self.db_adapter, 'unified_db'):
            unified_db = self.db_adapter.unified_db
            file_hash = unified_db.get_file_hash(str(save_path))
            if file_hash:
                existing = unified_db.get_download_by_file_hash(file_hash)
                if existing and existing.get('file_path') and str(save_path) != existing.get('file_path'):
                    if Path(existing['file_path']).exists():
                        self.log(f"Duplicate file detected: {save_path.name} matches {existing['filename']}", "warning")
                        try:
                            save_path.unlink()
                        except OSError as e:
                            self.log(f"Failed to delete duplicate {save_path.name}: {e}", "warning")
                        # A duplicate is a failure whether or not the delete worked.
                        return False
        return True
    except Exception as e:
        self.log(f"Failed to download from ImageTwist {url}: {e}", "error")
        return False
def login(self, forum_name: str, username: str, password: str, forum_url: str = None,
          forum_type: str = None, cloudflare_enabled: bool = False) -> bool:
    """
    Login to a forum and keep the browser context alive for subsequent operations.

    Playwright, the browser and the context are created lazily and stored on
    the instance. Saved cookies are loaded first; when ``forum_url`` is known
    the restored session is verified there, and a fresh login is performed
    only if that verification fails (or cannot be attempted).

    Args:
        forum_name: Name identifier for the forum (also the cookie storage key)
        username: Login username
        password: Login password
        forum_url: Base URL of the forum. Optional; without it no page is
            navigated and a restored cookie session cannot be verified.
        forum_type: Forum software type (e.g. xenforo, vbulletin, phpbb,
            discourse, ipb, mybb, smf). If not specified, will auto-detect.
        cloudflare_enabled: Whether this forum uses Cloudflare protection

    Returns:
        bool: True if a session was restored or the login succeeded. On a
        failed login the browser is closed and the stored browser/context
        references are cleared.
    """
    chromium_path = '/opt/media-downloader/.playwright/chromium-1187/chrome-linux/chrome'

    # Only create new browser objects if we don't have them yet
    if not self.playwright:
        self.playwright = sync_playwright().start()
    if not self.browser:
        self.browser = self.playwright.chromium.launch(
            headless=self.headless,
            executable_path=chromium_path if os.path.exists(chromium_path) else None
        )
    if not self.context:
        self.context = self._create_browser_context(self.browser)
        import threading
        # Remember the owning thread; the context is only reused from that thread.
        self._context_thread_id = threading.current_thread().ident

    # Try to restore an existing session from saved cookies. The cookies are
    # always loaded; verification needs a URL to visit, so it is skipped when
    # forum_url is missing instead of crashing on page.goto(None).
    cookies_loaded = self.authenticator.load_cookies(self.context, forum_name)
    if cookies_loaded and forum_url:
        page = self.context.new_page()
        if cloudflare_enabled:
            if not self._navigate_with_cloudflare(page, forum_url, forum_name, cloudflare_enabled):
                self.log(f"Failed to navigate to {forum_name} (Cloudflare)", "error")
                page.close()
                return False
        else:
            page.goto(forum_url)

        if self.authenticator._verify_login(page, username):
            self.logged_in_forums[forum_name] = True
            self.log(f"Restored session for {forum_name}", "debug")
            # Keep browser open for subsequent operations
            return True
        page.close()

    page = self.context.new_page()

    # Navigate to forum (with Cloudflare support if needed)
    if forum_url:
        if cloudflare_enabled:
            if not self._navigate_with_cloudflare(page, forum_url, forum_name, cloudflare_enabled):
                self.log(f"Failed to navigate to {forum_name} for login", "error")
                # Do not leave the unused page open in the shared context.
                page.close()
                return False
        else:
            page.goto(forum_url)

    # Use provided forum_type or auto-detect
    if forum_type:
        success = self.authenticator.login_with_type(page, username, password, forum_url, forum_type)
    else:
        success = self.authenticator.auto_login(page, username, password, forum_url)

    if success:
        self.authenticator.save_cookies(page, forum_name)
        self.logged_in_forums[forum_name] = True
        self.log(f"Successfully logged in to {forum_name}", "success")
    else:
        self.log(f"Failed to login to {forum_name}", "error")
        # Close browser on failure; closing it also disposes the context.
        self.browser.close()
        self.browser = None
        self.context = None

    # Keep browser open for subsequent operations if successful
    return success
def monitor_search(self,
                   forum_name: str,
                   search_query: str,
                   search_url: str = None,
                   forum_url: str = None,
                   check_frequency_hours: int = 24,
                   auto_track_days: int = 30,
                   number_of_days: int = None,
                   base_download_path: str = None,
                   destination_path: str = None,
                   username: str = None,
                   password: str = None,
                   newer_than_days: int = None,
                   older_than_days: int = None,
                   external_only: bool = True,
                   cloudflare_enabled: bool = False) -> Dict:
    """
    Monitor a search for new threads/posts.

    The search is skipped when it was checked less than
    ``check_frequency_hours`` ago. Otherwise the results are scraped (or an
    advanced date-filtered search is performed first), reduced to threads
    whose title contains every word of ``search_query``, recorded in the
    ``searches`` / ``search_results`` tables, and classified as new, still
    monitored, or expired.

    Args:
        forum_name: Name of the forum
        search_query: Search query string
        search_url: URL of the search results (optional if using date filters)
        forum_url: Base URL of the forum (e.g., https://example.com)
        check_frequency_hours: How often to check (hours)
        auto_track_days: Days to track new threads found
        number_of_days: Accepted for interface compatibility; not used here
        base_download_path: Accepted for interface compatibility; not used here
        destination_path: Accepted for interface compatibility; not used here
        username: Optional username for login (advanced search only)
        password: Optional password for login (advanced search only)
        newer_than_days: Search for threads newer than N days
        older_than_days: Search for threads older than N days
        external_only: Accepted for interface compatibility; not used here
        cloudflare_enabled: Whether this forum uses Cloudflare protection

    Returns:
        ``{}`` when the database is disabled; a ``status`` of ``'skipped'``
        or ``'error'`` for the early exits; otherwise a ``'success'`` dict
        with ``total_results``, ``new_threads``, ``skipped_threads``,
        ``search_id`` and ``results`` (new threads plus existing threads
        that are still inside their monitoring window).
    """
    if not self.use_database:
        self.log("Database required for search monitoring", "error")
        return {}

    def deadline_passed(value):
        """Return True/False for a parseable deadline, None when it cannot be parsed."""
        try:
            deadline = value if isinstance(value, datetime) else datetime.fromisoformat(value)
            return datetime.now() > deadline
        except (TypeError, ValueError):
            # Unparseable deadline: the caller keeps monitoring.
            return None

    conn = self._get_db_connection()
    # try/finally guarantees the connection is released on every exit path,
    # including exceptions raised while scraping.
    try:
        cursor = conn.cursor()

        # Generate search ID
        search_id = hashlib.sha256(f"{forum_name}:{search_query}".encode()).hexdigest()

        # Check if search exists
        cursor.execute(
            "SELECT last_checked FROM searches WHERE search_id = ?",
            (search_id,)
        )
        existing = cursor.fetchone()

        # Check if we should run the search (a NULL last_checked means "never")
        if existing and existing[0]:
            last_checked = existing[0] if isinstance(existing[0], datetime) else datetime.fromisoformat(existing[0])
            if datetime.now() - last_checked < timedelta(hours=check_frequency_hours):
                self.log(f"Search '{search_query}' checked recently, skipping", "info")
                return {'status': 'skipped', 'reason': 'checked_recently'}

        # Perform the search
        self.log(f"Monitoring search: {search_query}", "info")

        # If date filters are provided and no search_url, perform advanced search
        if (newer_than_days or older_than_days) and not search_url:
            search_url = self._perform_advanced_search(
                forum_name=forum_name,
                search_query=search_query,
                forum_url=forum_url,
                newer_than_days=newer_than_days,
                older_than_days=older_than_days,
                username=username,
                password=password,
                cloudflare_enabled=cloudflare_enabled
            )
            if not search_url:
                self.log("Advanced search failed", "error")
                return {'status': 'error', 'message': 'Advanced search failed'}
        elif not search_url:
            self.log("Search URL required when not using date filters", "error")
            return {'status': 'error', 'message': 'Search URL required'}

        # Check for special phun.org marker (results already scraped to avoid Cloudflare)
        if search_url == "PHUN_RESULTS_READY":
            results = getattr(self, '_phun_search_results', [])
            self._phun_search_results = []  # Clear after use
        else:
            results = self._scrape_search_results(search_url)
        # Treat a missing result set as "no results" so len() below is safe.
        results = results or []

        # Keep only threads that contain ALL search terms in the title
        if search_query and results:
            search_terms = search_query.lower().split()
            filtered_results = []
            for result in results:
                title = result.get('title', '').lower()
                if title and all(term in title for term in search_terms):
                    filtered_results.append(result)
                else:
                    self.log(f"Skipping thread (search term not in title): {result.get('title', 'Unknown')[:60]}...", "debug")
            if len(filtered_results) < len(results):
                self.log(f"Filtered {len(results) - len(filtered_results)} threads that don't match search query", "info")
            results = filtered_results

        # Update or insert search record
        if existing:
            cursor.execute('''
                UPDATE searches
                SET last_checked = ?, results_found = ?
                WHERE search_id = ?
            ''', (datetime.now().isoformat(), len(results), search_id))
        else:
            cursor.execute('''
                INSERT INTO searches
                (search_id, forum_name, search_query, search_url, last_checked,
                 check_frequency_hours, active, results_found)
                VALUES (?, ?, ?, ?, ?, ?, TRUE, ?)
            ''', (search_id, forum_name, search_query, search_url,
                  datetime.now().isoformat(), check_frequency_hours, len(results)))

        # Process results
        new_threads = 0
        new_thread_results = []  # New or still-monitored threads to download/check
        monitor_until = datetime.now() + timedelta(days=auto_track_days)

        for result in results:
            thread_url = result.get('url')
            thread_id = result.get('thread_id')
            if not thread_id:
                if not thread_url:
                    self.log("Skipping search result without thread ID or URL", "warning")
                    continue
                # One fallback ID, used for every lookup, update and link below,
                # so the thread row and its search_results link always agree.
                thread_id = hashlib.sha256(thread_url.encode()).hexdigest()
            title = result.get('title', 'Unknown')
            short_title = str(title)[:60]

            # Check if thread exists and if it's still being monitored
            thread_exists = False
            should_monitor = True

            if self.db_adapter:
                # Check if URL is already downloaded in unified database
                thread_exists = self.db_adapter.is_already_downloaded(thread_url, forum_name=forum_name)
                if thread_exists:
                    thread_data = self.db_adapter.db_get_thread(thread_id)
                    if thread_data and thread_data.get('monitor_until'):
                        expired = deadline_passed(thread_data.get('monitor_until'))
                        if expired:
                            should_monitor = False
                            self.log(f"Thread monitoring expired, skipping: {short_title}...", "debug")
                        elif expired is False:
                            self.log(f"Thread exists but still monitoring for updates: {short_title}...", "debug")
            else:
                # Fallback to local database check
                cursor.execute(
                    "SELECT thread_id, monitor_until FROM threads WHERE thread_id = ? OR thread_url = ?",
                    (thread_id, thread_url)
                )
                row = cursor.fetchone()
                if row:
                    thread_exists = True
                    if row[1] and deadline_passed(row[1]):
                        should_monitor = False

            if not thread_exists or should_monitor:
                # New thread OR existing thread still being monitored
                if not thread_exists:
                    # Add new thread to tracking
                    if self.db_adapter:
                        thread_added = self.db_adapter.db_add_thread(
                            thread_id=thread_id,
                            forum_name=forum_name,
                            thread_url=thread_url,
                            thread_title=title,
                            monitor_until=monitor_until
                        )
                        if thread_added:
                            self.log(f"Added thread to monitoring for {auto_track_days} days: {short_title}...", "info")
                    else:
                        # Fallback to local database
                        raw_date = result.get('date')
                        created_date = raw_date if isinstance(raw_date, str) else datetime.now().isoformat()
                        cursor.execute('''
                            INSERT OR IGNORE INTO threads
                            (thread_id, forum_name, thread_url, thread_title,
                             author, created_date, last_checked, status, monitor_until)
                            VALUES (?, ?, ?, ?, ?, ?, ?, 'active', ?)
                        ''', (
                            thread_id,
                            forum_name,
                            thread_url,
                            title,
                            result.get('author', 'Unknown'),
                            created_date,
                            datetime.now().isoformat(),
                            monitor_until.isoformat()
                        ))
                    new_threads += 1
                    self.log(f"New thread found: {short_title}...", "info")
                else:
                    # Existing thread still being monitored - just update last_checked
                    if self.db_adapter:
                        self.db_adapter.db_update_thread(
                            thread_id=thread_id,
                            last_post_date=None,
                            post_count=None
                        )
                    self.log(f"Checking monitored thread for updates: {short_title}...", "info")

                # Add to results list for downloading/checking
                new_thread_results.append(result)
            else:
                # Thread already downloaded and no longer monitored - skip it
                self.log(f"Thread already downloaded, skipping: {short_title}...", "info")
                if self.db_adapter:
                    self.db_adapter.db_update_thread(
                        thread_id=thread_id,
                        last_post_date=None,
                        post_count=None
                    )
                else:
                    # Thread exists - update monitor_until if it's NULL
                    cursor.execute('''
                        UPDATE threads
                        SET monitor_until = ?, last_checked = ?
                        WHERE thread_id = ? AND monitor_until IS NULL
                    ''', (monitor_until.isoformat(), datetime.now().isoformat(), thread_id))

            # Link to search
            cursor.execute('''
                INSERT OR IGNORE INTO search_results
                (search_id, thread_id, found_date)
                VALUES (?, ?, ?)
            ''', (search_id, thread_id, datetime.now().isoformat()))

        conn.commit()
    finally:
        conn.close()

    self.stats['searches_monitored'] += 1
    self.stats['new_threads_found'] += new_threads

    skipped_threads = len(results) - new_threads
    if skipped_threads > 0:
        self.log(f"Search complete: {len(results)} results found, {new_threads} new threads, {skipped_threads} already downloaded", "success")
    else:
        self.log(f"Search complete: {len(results)} results, {new_threads} new threads", "success")

    # Don't close browser here - it might be needed for downloads
    # Let download_thread handle its own browser lifecycle
    return {
        'status': 'success',
        'total_results': len(results),
        'new_threads': new_threads,
        'skipped_threads': skipped_threads,
        'search_id': search_id,
        'results': new_thread_results  # New and still-monitored threads only
    }
def download_thread(self,
thread_url: str,
forum_name: str = None,
download_images: bool = True,
update_existing: bool = True,
number_of_days: int = None,
base_download_path: str = None,
destination_path: str = None,
username: str = None,
password: str = None,
external_only: bool = True,
recycle_context: bool = True,
skip_file_move: bool = False,
cloudflare_enabled: bool = False,
defer_database: bool = False,
auto_track_days: int = 30) -> Dict:
"""
Download a forum thread with all posts and images
Args:
thread_url: URL of the thread
forum_name: Name of the forum (auto-detected if not provided)
download_images: Whether to download images
update_existing: Update existing posts
number_of_days: Only download posts from last N days (None = all)
base_download_path: Temporary download path (default: downloads/{forum_name}/temp)
destination_path: Final destination path (default: downloads/{forum_name})
defer_database: If True, don't record to unified database immediately - store in
pending_downloads for later recording after file move is complete
auto_track_days: Number of days to monitor the thread for updates (default: 30)
Returns:
Dictionary with download results
"""
# Store defer_database and cloudflare_enabled for use in method
self._current_defer_database = defer_database
self._current_cloudflare_enabled = cloudflare_enabled
self.log(f"Downloading thread: {thread_url}", "info")
self.activity_manager.update_status(f"Checking forum thread: {forum_name or 'unknown'}")
# Extract thread ID from URL (forum-specific)
thread_id = self._extract_thread_id(thread_url)
if self.use_database:
conn = self._get_db_connection()
cursor = conn.cursor()
# Check if thread exists
cursor.execute(
"SELECT last_post_date, post_count FROM threads WHERE thread_id = ?",
(thread_id,)
)
existing = cursor.fetchone()
if existing and not update_existing:
self.log(f"Thread {thread_id} already downloaded, skipping", "info")
conn.close()
return {'status': 'skipped', 'thread_id': thread_id}
# Setup authentication if needed
context = None
browser = None
thread_data = None
local_playwright = None
# Check if we're running in a different thread than where self.context was created
# Playwright contexts cannot be shared across threads
import threading
current_thread_id = threading.current_thread().ident
context_thread_id = getattr(self, '_context_thread_id', None)
can_reuse_context = (self.context and self.browser and
context_thread_id == current_thread_id)
# Check if we already have a browser context from login() in the SAME thread
if can_reuse_context:
# Use existing authenticated browser context
context = self.context
browser = self.browser
self.log(f"Using existing browser context for {forum_name}", "debug")
thread_data = self._scrape_thread(thread_url, context)
elif username and password:
# Create new browser context if not already logged in
local_playwright = sync_playwright().start()
browser = local_playwright.chromium.launch(
headless=self.headless,
executable_path='/opt/media-downloader/.playwright/chromium-1187/chrome-linux/chrome' if os.path.exists('/opt/media-downloader/.playwright/chromium-1187/chrome-linux/chrome') else None
)
context = self._create_browser_context(browser)
# Try to load existing cookies first
cookies_loaded = forum_name and self.authenticator.load_cookies(context, forum_name)
if cookies_loaded:
self.log(f"Loaded saved cookies for {forum_name}", "debug")
# Visit forum base URL to renew session (xf_user remember-me cookie)
try:
from urllib.parse import urlparse
base_url = f"{urlparse(thread_url).scheme}://{urlparse(thread_url).netloc}/"
temp_page = context.new_page()
temp_page.goto(base_url, wait_until='load', timeout=15000)
temp_page.wait_for_timeout(2000)
if self.authenticator._verify_login(temp_page, username):
self.logged_in_forums[forum_name] = True
self.authenticator.save_cookies(temp_page, forum_name)
self.log(f"Session renewed for {forum_name}", "debug")
else:
self.log(f"Session expired for {forum_name}, will re-login", "debug")
cookies_loaded = False
temp_page.close()
except Exception as e:
self.log(f"Error renewing session: {e}", "debug")
cookies_loaded = False
# Login if no cookies or session expired
if not cookies_loaded and forum_name and forum_name not in self.logged_in_forums:
temp_page = context.new_page()
if self.authenticator.auto_login(temp_page, username, password, thread_url):
self.authenticator.save_cookies(temp_page, forum_name)
self.logged_in_forums[forum_name] = True
self.log(f"Logged in to {forum_name}", "success")
temp_page.close()
# Scrape thread within the context
thread_data = self._scrape_thread(thread_url, context)
else:
# Scrape without authentication
thread_data = self._scrape_thread(thread_url, None)
if not thread_data:
self.log(f"Failed to scrape thread: {thread_url}", "error")
return {'status': 'error', 'thread_id': thread_id}
# Create thread directory with custom paths
safe_title = re.sub(r'[<>:"/\\|?*]', '_', thread_data['title'][:100])
# Use custom paths if provided
if base_download_path:
base_path = Path(base_download_path)
else:
base_path = self.download_dir / (forum_name or 'unknown') / 'temp'
if destination_path:
dest_path = Path(destination_path)
else:
dest_path = self.download_dir / (forum_name or 'unknown')
# Initially download to base path
thread_dir = base_path / safe_title
thread_dir.mkdir(parents=True, exist_ok=True)
# Final destination directory
final_dir = dest_path / safe_title
final_dir.mkdir(parents=True, exist_ok=True)
# Save thread info
if self.use_database:
# Add to unified database if using adapter
if self.db_adapter:
# Calculate monitor_until using configured auto_track_days
monitor_until = datetime.now() + timedelta(days=auto_track_days)
thread_added = self.db_adapter.db_add_thread(
thread_id=thread_id,
forum_name=forum_name or self._detect_forum(thread_url),
thread_url=thread_url,
thread_title=thread_data['title'],
monitor_until=monitor_until
)
if thread_added:
self.log(f"Added thread to monitoring database for {auto_track_days} days", "debug")
# Update with post count and mark as just checked
self.db_adapter.db_update_thread(
thread_id=thread_id,
last_post_date=thread_data.get('last_post_date'),
post_count=len(thread_data.get('posts', []))
)
else:
# Fallback to local database
cursor.execute('''
INSERT INTO threads
(thread_id, forum_name, thread_url, thread_title, author,
created_date, last_checked, last_post_date, post_count, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (thread_id) DO UPDATE SET
forum_name = EXCLUDED.forum_name,
thread_url = EXCLUDED.thread_url,
thread_title = EXCLUDED.thread_title,
author = EXCLUDED.author,
last_checked = EXCLUDED.last_checked,
last_post_date = EXCLUDED.last_post_date,
post_count = EXCLUDED.post_count,
status = EXCLUDED.status
''', (
thread_id,
forum_name or self._detect_forum(thread_url),
thread_url,
thread_data['title'],
thread_data.get('author', 'Unknown'),
thread_data.get('created_date') if isinstance(thread_data.get('created_date'), str) else datetime.now().isoformat(),
datetime.now().isoformat(),
thread_data.get('last_post_date') if isinstance(thread_data.get('last_post_date'), str) else datetime.now().isoformat(),
len(thread_data.get('posts', [])),
'active'
))
conn.commit()
conn.close() # Close connection before queueing to avoid database lock
# Process posts
downloaded_posts = 0
downloaded_images = 0
queued_images = 0
images_to_queue = [] # Collect images to queue
# Apply date filtering if specified
cutoff_date = None
if number_of_days:
cutoff_date = datetime.now() - timedelta(days=number_of_days)
self.log(f"Filtering posts from last {number_of_days} days (after {cutoff_date.strftime('%Y-%m-%d')})", "info")
# Reopen database connection for post processing
if self.use_database:
conn = self._get_db_connection()
cursor = conn.cursor()
for post in thread_data.get('posts', []):
# Check date filter
if cutoff_date and post.get('date'):
try:
post_date = datetime.fromisoformat(post.get('date').replace('Z', '+00:00'))
if post_date < cutoff_date:
continue # Skip posts older than cutoff
except Exception:
pass # If can't parse date, include the post
post_id = post.get('id') or hashlib.sha256(
f"{thread_id}:{post.get('author')}:{post.get('date')}".encode()
).hexdigest()
# Check if post exists
if self.use_database:
cursor.execute(
"SELECT downloaded FROM posts WHERE post_id = ?",
(post_id,)
)
post_exists = cursor.fetchone()
if post_exists and not update_existing:
continue
# Skip JSON saving - we only want images
# Save to database
if self.use_database:
cursor.execute('''
INSERT INTO posts
(post_id, thread_id, post_url, author, post_date,
content_hash, has_images, downloaded, download_date)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (post_id) DO UPDATE SET
thread_id = EXCLUDED.thread_id,
post_url = EXCLUDED.post_url,
author = EXCLUDED.author,
post_date = EXCLUDED.post_date,
content_hash = EXCLUDED.content_hash,
has_images = EXCLUDED.has_images,
downloaded = EXCLUDED.downloaded,
download_date = EXCLUDED.download_date
''', (
post_id,
thread_id,
post.get('url') or None,
post.get('author', 'Unknown'),
post.get('date', datetime.now().isoformat()) if not isinstance(post.get('date'), str) else post.get('date'),
self._get_content_hash(post.get('content', '')),
len(post.get('images', [])) > 0,
True,
datetime.now().isoformat()
))
downloaded_posts += 1
# Collect images for download if requested
if download_images and post.get('images'):
# Extract date for timestamp updating and filename prefix
post_date_str = None
post_date_obj = None
thread_title = thread_data.get('title', '')
# Try to extract date from thread title first (most reliable for these forums)
if DATE_UTILS_AVAILABLE and thread_title:
post_date_obj = DateHandler.extract_date_from_text(thread_title)
if post_date_obj:
self.log(f"Extracted date from title for filename: {post_date_obj.strftime('%Y%m%d_%H%M%S')}", "debug")
# Fall back to post date from forum
if not post_date_obj and post.get('date'):
post_date_str = post.get('date')
try:
if 'T' in str(post_date_str):
post_date_obj = datetime.fromisoformat(post_date_str.replace('Z', '+00:00'))
else:
# Try common forum date formats
for fmt in ['%b %d, %Y at %I:%M %p', '%B %d, %Y', '%d %b %Y', '%Y-%m-%d', '%m/%d/%Y']:
try:
post_date_obj = datetime.strptime(str(post_date_str).strip(), fmt)
break
except ValueError:
continue
except Exception:
pass
# Prepare metadata for queue
metadata = {
'post_title': thread_title, # Thread title for date extraction
'post_date': post_date_obj.isoformat() if post_date_obj else post_date_str,
'post_author': post.get('author', 'Unknown'),
'thread_title': thread_title
}
# Collect images to queue later
for img_url in post.get('images', []):
# Skip if external_only and it's an internal attachment
if external_only and '/attachments/' in img_url:
continue
# Don't process pixhost URLs here - let download manager handle it
img_filename = self._get_image_filename(img_url, post_date=post_date_obj)
img_path = thread_dir / img_filename
# Check if already exists locally
if img_path.exists():
self.log(f"Skipping existing local file: {img_filename}", "info")
continue
# Collect item to queue later (after closing DB)
images_to_queue.append({
'url': img_url,
'save_path': img_path,
'referer': thread_url,
'thread_id': thread_id,
'post_id': post_id,
'forum_name': forum_name,
'metadata': metadata
})
if self.use_database:
conn.commit()
conn.close()
# Now add collected images to queue (after DB is closed)
# Track which URLs were actually added (not duplicates)
newly_queued_urls = set()
for item in images_to_queue:
if self.add_to_download_queue(**item):
queued_images += 1
newly_queued_urls.add(item['url'])
self.log(f"Queued: {Path(item['save_path']).name}", "debug")
# Count how many were actually queued vs skipped
skipped_count = len(images_to_queue) - queued_images
if skipped_count > 0:
self.log(f"Skipped {skipped_count} duplicate images from database", "info")
# Process downloads with the new DownloadManager
if queued_images > 0:
self.log(f"Processing {queued_images} new images with multi-threaded downloader...", "info")
# Create download manager with appropriate settings
# When using adapter (db_path is None), disable download manager's own database
if self.db_path:
dm_db_path = str(self.db_path).replace('.db', '_downloads.db')
dm_use_db = self.use_database
else:
dm_db_path = ":memory:" # Use in-memory database
dm_use_db = False # Don't track in download manager's DB
download_manager = DownloadManager(
max_workers=10, # Increased concurrent downloads
rate_limit=0.2, # Faster rate limit
timeout=60, # Increased timeout for large images
show_progress=self.show_progress,
show_debug=False, # Hide debug messages
use_database=dm_use_db,
db_path=dm_db_path
)
# Set Playwright context for authenticated downloads
if context:
download_manager.set_playwright_context(context)
# Convert to DownloadItem objects (only newly queued items)
download_items = []
for item in images_to_queue:
# Skip items that weren't actually queued in THIS run (duplicates or already pending from previous runs)
if item['url'] not in newly_queued_urls:
continue
# Extract post date for timestamp updating
post_date = None
fallback_date = None
if item['metadata']:
# First try to get the actual post date as fallback
if item['metadata'].get('post_date'):
try:
fallback_date = datetime.fromisoformat(item['metadata']['post_date'])
except Exception:
pass
# Try to extract date from post title, with post date as fallback
post_title = item['metadata'].get('post_title', '')
if post_title:
post_date = DateHandler.extract_date_from_text(post_title, fallback_date=fallback_date)
else:
# No title to extract from, use the post date directly
post_date = fallback_date
download_items.append(DownloadItem(
url=item['url'],
save_path=item['save_path'],
referer=item['referer'],
metadata=item['metadata'],
post_date=post_date
))
# Close browser context NOW if we're downloading external images only
# The download manager uses requests for external images, not playwright
if external_only:
self.log("Closing browser pages (keeping context alive for reuse)", "debug")
try:
# Only close the page, keep context alive for next thread
if 'page' in locals() and page:
page.close()
page = None
# If recycle_context is True and this is self.context, recycle it
if recycle_context and context == self.context:
self.log("Recycling browser context", "debug")
if self.context:
self.context.close()
self.context = None
# Create new context for next use
if self.browser:
self.context = self._create_browser_context(self.browser)
import threading
self._context_thread_id = threading.current_thread().ident
# Reload cookies for authenticated forums
if forum_name and forum_name in self.logged_in_forums:
self.log(f"Reloading cookies for {forum_name}", "debug")
self.authenticator.load_cookies(self.context, forum_name)
# Only close local browser/context if different from self
elif context and context != self.context:
context.close()
context = None
if 'browser' in locals() and browser and browser != self.browser:
browser.close()
browser = None
except Exception as e:
self.log(f"Error managing browser context: {e}", "debug")
# Download all items - split large batches to prevent timeouts
if len(download_items) > 50:
self.log(f"Large batch ({len(download_items)} images), downloading in chunks", "info")
all_results = []
chunk_size = 30
for i in range(0, len(download_items), chunk_size):
chunk = download_items[i:i+chunk_size]
self.log(f"Downloading chunk {i//chunk_size + 1}/{(len(download_items)-1)//chunk_size + 1} ({len(chunk)} images)", "info")
chunk_results = download_manager.download_batch(chunk)
all_results.extend(chunk_results)
# Keep browser alive between chunks if still in use
if self.context and i + chunk_size < len(download_items):
try:
self.keep_alive()
except Exception:
pass # Browser may already be closed for external downloads
results = all_results
else:
# Download all items at once for small batches
results = download_manager.download_batch(download_items)
# Count successful downloads
downloaded_images = len([r for r in results if r.success])
failed_images = len([r for r in results if not r.success])
self.log(f"Download complete: {downloaded_images} successful, {failed_images} failed", "success")
# Update download_queue status for successful downloads
if self.use_database and results:
conn = self._get_db_connection()
cursor = conn.cursor()
for result in results:
if result.success:
# Mark as completed in download_queue
cursor.execute('''
UPDATE download_queue
SET status = 'completed',
downloaded_date = CURRENT_TIMESTAMP
WHERE url = ? AND status = 'pending'
''', (result.item.url,))
# Also record in unified database if using adapter
if self.db_adapter:
try:
metadata = result.item.metadata or {}
# Extract filename and file_path from save_path
filename = result.item.save_path.name if result.item.save_path else None
file_path = str(result.item.save_path) if result.item.save_path else None
# Get post_date from the DownloadItem
item_post_date = result.item.post_date if hasattr(result.item, 'post_date') else None
# If deferred, store for later recording after file move
if getattr(self, '_current_defer_database', False):
self.pending_downloads.append({
'url': result.item.url,
'thread_id': metadata.get('thread_id'),
'post_id': metadata.get('post_id'),
'filename': filename,
'file_path': file_path,
'metadata': metadata,
'post_date': item_post_date
})
self.log(f"Deferred recording for {filename}", "debug")
else:
self.db_adapter.record_download(
url=result.item.url,
thread_id=metadata.get('thread_id'),
post_id=metadata.get('post_id'),
filename=filename,
metadata=metadata,
file_path=file_path,
post_date=item_post_date
)
except Exception as e:
self.log(f"Failed to record download in unified database: {e}", "error")
conn.commit()
conn.close()
self.log(f"Updated {downloaded_images} items in download queue to completed", "debug")
# Update our stats already included in downloaded_images
self.stats['threads_processed'] += 1
self.stats['posts_downloaded'] += downloaded_posts
self.stats['images_downloaded'] += downloaded_images
# Track downloaded file paths for notification attachments
downloaded_file_paths = []
# Move files from base_path to destination_path if different (unless skip_file_move is True)
if not skip_file_move and base_download_path and destination_path and thread_dir != final_dir and downloaded_images > 0:
try:
import shutil
# Use MoveManager to move files (handles file_inventory registration and face recognition)
unified_db = self.db_adapter.unified_db if self.db_adapter and hasattr(self.db_adapter, 'unified_db') else None
move_manager = MoveManager(
unified_db=unified_db,
face_recognition_enabled=True # Enable face recognition for forum downloads
)
# Set batch context for proper file_inventory registration
move_manager.batch_context = {
'platform': 'forums',
'source': forum_name
}
files_moved = 0
# Get post date from thread data (actual forum post date)
post_date = None
if thread_data:
# First try to get the actual last_post_date from the forum
last_post_date = thread_data.get('last_post_date')
if last_post_date:
try:
if isinstance(last_post_date, str):
post_date = datetime.fromisoformat(last_post_date.replace('Z', '+00:00'))
elif isinstance(last_post_date, datetime):
post_date = last_post_date
if post_date:
self.log(f"Using forum post date: {post_date.strftime('%Y-%m-%d %H:%M')}", "debug")
except Exception as e:
self.log(f"Failed to parse last_post_date: {e}", "debug")
# Fallback: try to extract from title if no post date
if not post_date and DATE_UTILS_AVAILABLE:
thread_title = thread_data.get('title', '')
if thread_title:
post_date = DateHandler.extract_date_from_text(thread_title)
if post_date:
self.log(f"Extracted date from title: {post_date.strftime('%Y-%m-%d')}", "debug")
for file in thread_dir.rglob('*'):
if file.is_file():
relative_path = file.relative_to(thread_dir)
dest_file = final_dir / relative_path
dest_file.parent.mkdir(parents=True, exist_ok=True)
# Use MoveManager.move_file() which handles:
# - Duplicate detection via hash
# - file_inventory registration
# - EXIF and filesystem timestamp updates (centralized)
# - Face recognition (moves to review queue if no match)
if move_manager.move_file(file, dest_file, timestamp=post_date):
files_moved += 1
elif dest_file.exists():
# File was skipped (already exists at destination)
pass
# Only add files that matched faces (not review queue) to notification list
# move_manager.moved_files contains only matched files
# move_manager.review_queue_files contains files without face matches
matched_count = len(move_manager.moved_files)
for file_info in move_manager.moved_files:
file_path = file_info.get('file_path')
if file_path:
downloaded_file_paths.append(file_path)
# DEBUG: Log what we're adding to notification
self.log(f"Added {matched_count} face-matched files to notification list", "debug")
if matched_count > 0:
for fp in downloaded_file_paths[:3]: # Log first 3
self.log(f" - {Path(fp).name}", "debug")
# Log review queue files for debugging
review_count = len(move_manager.review_queue_files)
if review_count > 0:
self.log(f"{review_count} files moved to review queue (no face match)", "info")
# Clean up temp directory completely
if thread_dir.exists():
try:
# Force remove the entire thread directory and all its contents
import shutil
shutil.rmtree(thread_dir, ignore_errors=True)
self.log(f"Removed thread directory: {thread_dir}", "debug")
except Exception as e:
self.log(f"Failed to remove thread directory {thread_dir}: {e}", "warning")
# Clean up all parent directories up to base_download_path
# Start from the parent of thread_dir and work up
parent = thread_dir.parent if not thread_dir.exists() else thread_dir.parent
base_path = Path(base_download_path)
# Keep going up until we reach base_download_path or its parent
while parent and parent != base_path.parent and parent != base_path.parent.parent:
try:
if parent.exists():
# Check if directory is empty
if not any(parent.iterdir()):
parent.rmdir()
self.log(f"Removed empty parent directory: {parent}", "debug")
else:
# Directory not empty, check if it only contains empty subdirs
subdirs = [d for d in parent.iterdir() if d.is_dir()]
if subdirs and all(not any(d.iterdir()) for d in subdirs):
# All subdirs are empty, remove them
for subdir in subdirs:
try:
subdir.rmdir()
self.log(f"Removed empty subdirectory: {subdir}", "debug")
except Exception:
pass
# Try to remove parent again if now empty
if not any(parent.iterdir()):
parent.rmdir()
self.log(f"Removed parent directory after cleaning subdirs: {parent}", "debug")
# Move up one level
parent = parent.parent
except Exception as e:
self.log(f"Error cleaning parent directory {parent}: {e}", "debug")
break
if files_moved > 0:
self.log(f"Moved {files_moved} files to: {final_dir}", "info")
except Exception as e:
self.log(f"Error moving files: {e}", "error")
elif downloaded_images > 0 and final_dir and final_dir.exists():
# Files were downloaded directly to final location (no move needed)
# Track the file paths for notification attachments
for file in final_dir.rglob('*'):
if file.is_file():
downloaded_file_paths.append(str(file))
if downloaded_file_paths:
self.log(f"Tracked {len(downloaded_file_paths)} files in: {final_dir}", "debug")
self.log(
f"Thread complete: {downloaded_posts} posts, {downloaded_images} images",
"success"
)
# Update last_checked timestamp to prevent immediate re-checking by monitor
if self.use_database and thread_id:
try:
conn = self._get_db_connection()
cursor = conn.cursor()
# Update last_checked to current time
cursor.execute('''
UPDATE threads
SET last_checked = ?
WHERE thread_id = ?
''', (datetime.now().isoformat(), thread_id))
conn.commit()
conn.close()
self.log(f"Updated last_checked timestamp for thread {thread_id}", "debug")
except Exception as e:
self.log(f"Failed to update last_checked timestamp: {e}", "warning")
# Also update in unified database if available
if self.db_adapter and thread_id:
try:
self.db_adapter.db_update_thread_last_checked(thread_id)
except Exception as e:
self.log(f"Failed to update last_checked in unified database: {e}", "warning")
# Close browser only if we created it locally (not if using existing from login())
if local_playwright and browser:
browser.close()
local_playwright.stop()
return {
'status': 'success',
'thread_id': thread_id,
'posts_downloaded': downloaded_posts,
'images_downloaded': downloaded_images,
'thread_dir': str(thread_dir), # Temp directory where files were downloaded
'final_dir': str(final_dir) if destination_path else None, # Final destination directory
'downloaded_file_paths': downloaded_file_paths # List of final file paths for notifications
}
def update_monitored_threads(self, force_all: bool = False) -> Dict:
    """
    Update all monitored threads

    Selects every thread whose status is 'active' (optionally limited to
    threads whose monitor_until is unset or still in the future) and re-runs
    download_thread() on each one with update_existing=True. The rate limit
    is applied after every thread, whether it succeeded or failed.

    Args:
        force_all: Update all threads regardless of monitor_until date

    Returns:
        Dictionary with update results ('total', 'updated', 'new_posts',
        'errors'), or an empty dict when the database is disabled.
        'new_posts' is reserved: nothing increments it yet.
    """
    if not self.use_database:
        self.log("Database required for thread monitoring", "error")
        return {}

    conn = self._get_db_connection()
    try:
        cursor = conn.cursor()

        # Get threads to update
        if force_all:
            cursor.execute(
                "SELECT thread_id, thread_url, forum_name FROM threads WHERE status = 'active'"
            )
        else:
            cursor.execute('''
                SELECT thread_id, thread_url, forum_name
                FROM threads
                WHERE status = 'active'
                AND (monitor_until IS NULL OR monitor_until > ?)
            ''', (datetime.now().isoformat(),))

        threads = cursor.fetchall()
    finally:
        # Release the connection even when the query fails; the downloads
        # below open their own connections and may run for a long time.
        conn.close()

    self.log(f"Updating {len(threads)} monitored threads", "info")

    results = {
        'total': len(threads),
        'updated': 0,
        'new_posts': 0,
        'errors': 0
    }

    for thread_id, thread_url, forum_name in threads:
        try:
            result = self.download_thread(
                thread_url,
                forum_name=forum_name,
                update_existing=True
            )
            if result['status'] == 'success':
                results['updated'] += 1
                # Track new posts (would need to compare with previous count)
        except Exception as e:
            # One broken thread must not stop the remaining updates.
            self.log(f"Error updating thread {thread_id}: {e}", "error")
            results['errors'] += 1
            self.stats['errors'] += 1

        self._apply_rate_limit()

    return results
def _perform_advanced_search(self,
forum_name: str,
search_query: str,
forum_url: str = None,
newer_than_days: int = None,
older_than_days: int = None,
username: str = None,
password: str = None,
cloudflare_enabled: bool = False) -> str:
"""
Perform advanced search with date filters
Returns the search results URL or None if failed
"""
from datetime import datetime, timedelta
# Calculate dates
newer_date = (datetime.now() - timedelta(days=newer_than_days)).strftime('%m/%d/%Y') if newer_than_days else None
older_date = (datetime.now() - timedelta(days=older_than_days)).strftime('%m/%d/%Y') if older_than_days else None
page = None
try:
# Check thread safety before using self.context - Playwright contexts
# cannot be shared across threads (causes "Cannot switch to a different thread" error)
import threading
current_thread_id = threading.current_thread().ident
context_thread_id = getattr(self, '_context_thread_id', None)
can_use_self_context = (self.context and context_thread_id == current_thread_id)
# Use existing context if available (from login session) AND in same thread
if can_use_self_context:
page = self.context.new_page()
else:
# Need to create a new browser context (thread-safe)
if not self.playwright:
self.playwright = sync_playwright().start()
if not self.browser:
self.browser = self.playwright.chromium.launch(
headless=self.headless,
executable_path='/opt/media-downloader/.playwright/chromium-1187/chrome-linux/chrome' if os.path.exists('/opt/media-downloader/.playwright/chromium-1187/chrome-linux/chrome') else None
)
if not self.context:
self.context = self._create_browser_context(self.browser)
self._context_thread_id = current_thread_id
page = self.context.new_page()
# Validate forum URL
if not forum_url:
self.log(f"Forum URL is required for {forum_name}", "error")
return None
# Special handling for PicturePub - use form with date fields
if forum_name == 'PicturePub':
return self._perform_picturepub_search(page, forum_url, newer_date, older_date, search_query)
# Special handling for phun.org - use direct URL search to avoid Cloudflare form challenge
if 'phun.org' in forum_url.lower():
return self._perform_phun_search(page, forum_url, newer_date, search_query, cloudflare_enabled, forum_name)
# Navigate to search page (with Cloudflare support)
search_page_url = f"{forum_url}/search/"
if not self._navigate_with_cloudflare(page, search_page_url, forum_name, cloudflare_enabled):
self.log(f"Failed to navigate to search page for {forum_name}", "error")
return None
page.wait_for_timeout(500)
# Click "Search everything" tab - try multiple selectors (English and German)
search_tab_selectors = [
"text='Search everything'",
"text='Everything'",
"text='Alles durchsuchen'",
"a:has-text('Everything')",
"a:has-text('Alles')",
"a[data-nav-id='everything']",
".tabPanes a:first"
]
tab_clicked = False
for selector in search_tab_selectors:
try:
if page.locator(selector).count() > 0:
page.locator(selector).first.click()
tab_clicked = True
break
except Exception:
continue
if tab_clicked:
page.wait_for_timeout(500)
# Scroll down to see date fields
page.evaluate("window.scrollBy(0, 400)")
page.wait_for_timeout(300)
# Fill date filters FIRST (important for XenForo)
if newer_date:
# Try multiple selectors for newer date field
newer_selectors = [
'input[name="c[newer_than]"]',
'input[name="newer_than"]',
'input[placeholder*="Newer"]',
'input.input--date:first'
]
newer_field = None
for selector in newer_selectors:
try:
if page.locator(selector).count() > 0:
newer_field = page.locator(selector).first
break
except Exception:
continue
if newer_field:
newer_field.click()
newer_field.clear()
newer_field.type(newer_date, delay=50)
page.keyboard.press('Tab')
self.log(f"Set newer_than: {newer_date}", "info")
if older_date:
# Try multiple selectors for older date field
older_selectors = [
'input[name="c[older_than]"]',
'input[name="older_than"]',
'input[placeholder*="Older"]',
'input.input--date:last'
]
older_field = None
for selector in older_selectors:
try:
if page.locator(selector).count() > 0:
older_field = page.locator(selector).first
break
except Exception:
continue
if older_field:
older_field.click()
older_field.clear()
older_field.type(older_date, delay=50)
page.keyboard.press('Tab')
self.log(f"Set older_than: {older_date}", "info")
page.wait_for_timeout(300)
# Check "Search titles only" checkbox (supports English and German)
titles_selectors = [
'label:has-text("Search titles only")',
'label:has-text("Nur Titel durchsuchen")',
'input[name="c[title_only]"]'
]
for selector in titles_selectors:
try:
elem = page.locator(selector).last
if elem.count() > 0:
elem.click(timeout=5000)
page.wait_for_timeout(300)
break
except Exception:
continue
# Fill keywords LAST (important for XenForo)
# Try multiple selectors for different languages
keywords_selectors = [
'input[name="keywords"][type="search"]',
'input[name="keywords"]',
page.get_by_role("searchbox", name="Keywords:"),
page.get_by_role("searchbox", name="Schlüsselwörter:")
]
keywords_field = None
for selector in keywords_selectors:
try:
if isinstance(selector, str):
elem = page.locator(selector)
else:
elem = selector
if elem.count() > 0:
keywords_field = elem.last
break
except Exception:
continue
if keywords_field:
keywords_field.click()
keywords_field.clear()
keywords_field.type(search_query, delay=50)
self.log(f"Set keywords: {search_query}", "info")
page.wait_for_timeout(300)
# Scroll to search button and click
page.evaluate("window.scrollBy(0, 200)")
page.wait_for_timeout(200)
# Try multiple search button selectors (English and German)
search_button_selectors = [
'.formSubmitRow button[type="submit"]',
'button.button--icon--search[type="submit"]',
'button:has-text("Search")',
'button:has-text("Suche")',
'button[type="submit"]:visible'
]
search_button = None
for selector in search_button_selectors:
try:
btn = page.locator(selector).first
if btn.count() > 0:
search_button = btn
break
except Exception:
continue
if search_button:
try:
search_button.scroll_into_view_if_needed(timeout=5000)
except Exception:
pass
search_button.click(force=True)
# Wait for results
page.wait_for_load_state('networkidle')
# Try to wait for results, but don't fail if none found
try:
page.wait_for_selector('.contentRow-title', timeout=5000)
except Exception:
# Check if "no results" message is shown
no_results = page.query_selector(':text("No results found")')
if no_results:
self.log("Search returned no results", "info")
else:
self.log("Waiting for results timed out", "warning")
# Get the final search URL
final_url = page.url
self.log(f"Advanced search URL: {final_url}", "info")
# Verify date filters are in URL
if newer_than_days and "newer_than" not in final_url:
self.log("Warning: newer_than filter may not be applied", "warning")
if older_than_days and "older_than" not in final_url:
self.log("Warning: older_than filter may not be applied", "warning")
return final_url
except Exception as e:
self.log(f"Advanced search error: {e}", "error")
return None
finally:
if page:
page.close()
def _perform_picturepub_search(self, page, forum_url: str, newer_date: str, older_date: str, search_query: str) -> str:
"""
Perform PicturePub-specific advanced search using form with date fields
Returns the search results URL or None if failed
"""
from datetime import datetime
try:
self.log("Using PicturePub-specific advanced search with date fields", "info")
# Navigate to search page
page.goto(f"{forum_url}/search/", wait_until='networkidle')
page.wait_for_timeout(2000)
# Find the form that has date input fields (advanced form)
forms = page.locator('form[action="/search/search"]').all()
advanced_form = None
for form in forms:
# Check if this form has date fields
newer_input = form.locator('input[name="c[newer_than]"]')
if newer_input.count() > 0:
# This is the advanced form with date fields
advanced_form = form
self.log("Found PicturePub advanced search form with date fields", "info")
# Fill keywords in THIS form
keywords = form.locator('input[name="keywords"]')
if keywords.count() > 0:
keywords.fill(search_query)
self.log(f"Filled keywords: {search_query}", "info")
# Fill newer_than date
if newer_date:
# Convert date format from MM/DD/YYYY to YYYY-MM-DD
date_obj = datetime.strptime(newer_date, '%m/%d/%Y')
formatted_date = date_obj.strftime('%Y-%m-%d')
newer_input.fill(formatted_date)
self.log(f"Set newer_than date: {formatted_date}", "info")
# Fill older_than date if provided
if older_date:
older_input = form.locator('input[name="c[older_than]"]')
if older_input.count() > 0:
date_obj = datetime.strptime(older_date, '%m/%d/%Y')
formatted_date = date_obj.strftime('%Y-%m-%d')
older_input.fill(formatted_date)
self.log(f"Set older_than date: {formatted_date}", "info")
# Check titles only (optional - skip if blocked)
try:
titles_checkbox = form.locator('input[name="c[title_only]"]')
if titles_checkbox.count() > 0:
# Try to check with force to bypass overlays
titles_checkbox.check(force=True)
self.log("Checked 'Search titles only'", "info")
except Exception:
self.log("Could not check titles only checkbox (optional)", "debug")
# Submit this form
form.evaluate('form => form.submit()')
self.log("Submitted PicturePub advanced search form", "info")
break
if not advanced_form:
self.log("Could not find PicturePub advanced form, using simple search", "warning")
# Wait for results
page.wait_for_timeout(5000)
final_url = page.url
return final_url
except Exception as e:
self.log(f"PicturePub search error: {e}", "error")
return None
def _perform_phun_search(self, page, forum_url: str, newer_date: str, search_query: str,
cloudflare_enabled: bool, forum_name: str) -> str:
"""
Perform phun.org-specific search using direct URL to avoid Cloudflare form challenge.
phun.org uses an older XenForo theme that triggers Cloudflare on form submissions.
Returns a special marker with results to avoid double navigation.
"""
from urllib.parse import quote_plus, urljoin
try:
self.log("Using phun.org direct URL search (bypasses Cloudflare form challenge)", "info")
# Build direct search URL - phun.org/XenForo 1.x format
# Uses keywords=, order=, title_only=1, date=UNIX_TS (no c[] wrapper)
encoded_query = quote_plus(search_query)
# Convert newer_date to Unix timestamp for XenForo 1.x date parameter
date_param = ""
if newer_date:
try:
from datetime import datetime
# Try multiple date formats
dt = None
for fmt in ["%Y-%m-%d", "%m/%d/%Y", "%d/%m/%Y"]:
try:
dt = datetime.strptime(newer_date, fmt)
break
except ValueError:
continue
if dt:
unix_ts = int(dt.timestamp())
date_param = f"&date={unix_ts}"
self.log(f"Filtering to posts newer than: {newer_date} (ts={unix_ts})", "info")
else:
self.log(f"Failed to parse date {newer_date}", "warning")
except Exception as e:
self.log(f"Failed to parse date {newer_date}: {e}", "warning")
# XenForo 1.x format: keywords, order, title_only, date (no c[] wrapper)
search_url = f"{forum_url}/search/search?keywords={encoded_query}&order=date&title_only=1{date_param}"
self.log(f"Direct search URL: {search_url}", "debug")
# Navigate with Cloudflare support
if not self._navigate_with_cloudflare(page, search_url, forum_name, cloudflare_enabled):
self.log(f"Failed to navigate to search results for {forum_name}", "error")
return None
page.wait_for_timeout(3000)
final_url = page.url
self.log(f"phun.org search result URL: {final_url}", "info")
# Scrape results directly on this page (avoid double navigation/Cloudflare)
results = []
# Debug: check what selectors are available
phun_count = page.locator('li.searchResult h3 a').count()
xf2_count = page.locator('.contentRow-title a').count()
thread_links_count = page.locator('a[href*="/threads/"]').count()
self.log(f"phun.org selector counts: li.searchResult={phun_count}, contentRow={xf2_count}, threads={thread_links_count}", "debug")
# Try phun.org-specific selector first
if phun_count > 0:
thread_links = page.locator('li.searchResult h3 a').all()
self.log(f"Found {len(thread_links)} phun.org-style search results", "info")
for link in thread_links:
try:
result = {
'title': link.inner_text(),
'url': link.get_attribute('href'),
'author': 'Unknown',
'date': datetime.now().isoformat()
}
if result['url'] and not result['url'].startswith('http'):
result['url'] = urljoin(forum_url, result['url'])
if result.get('url') and '/threads/' in result['url']:
result['thread_id'] = self._extract_thread_id(result['url'])
results.append(result)
self.log(f"Added: {result['title'][:50]}", "debug")
except Exception as e:
self.log(f"Error parsing result: {e}", "debug")
# Try XenForo 2.x selector
elif xf2_count > 0:
thread_links = page.locator('.contentRow-title a').all()
self.log(f"Found {len(thread_links)} XenForo 2.x search results", "info")
for link in thread_links:
try:
result = {
'title': link.inner_text(),
'url': link.get_attribute('href'),
'author': 'Unknown',
'date': datetime.now().isoformat()
}
if result['url'] and not result['url'].startswith('http'):
result['url'] = urljoin(forum_url, result['url'])
if result.get('url') and '/threads/' in result['url']:
result['thread_id'] = self._extract_thread_id(result['url'])
results.append(result)
except Exception as e:
self.log(f"Error parsing result: {e}", "debug")
# Fallback: find any thread links
elif thread_links_count > 0:
thread_links = page.locator('a[href*="/threads/"]').all()
self.log(f"Using fallback: found {len(thread_links)} thread links", "info")
for link in thread_links:
try:
href = link.get_attribute('href')
title = link.inner_text().strip()
if href and title and len(title) > 5:
result = {
'title': title,
'url': href if href.startswith('http') else urljoin(forum_url, href),
'author': 'Unknown',
'date': datetime.now().isoformat()
}
result['thread_id'] = self._extract_thread_id(result['url'])
if result not in results:
results.append(result)
except Exception:
pass
self.log(f"phun.org search found {len(results)} threads", "info")
# Store results and return special marker
self._phun_search_results = results
return "PHUN_RESULTS_READY"
except Exception as e:
self.log(f"phun.org search error: {e}", "error")
return None
def _scrape_search_results(self, search_url: str, context=None) -> List[Dict]:
"""Scrape search results page with support for multiple forum types"""
results = []
browser = None
page = None
local_playwright = None
try:
# Check thread safety before using self.context - Playwright contexts
# cannot be shared across threads (causes "Cannot switch to a different thread" error)
import threading
current_thread_id = threading.current_thread().ident
context_thread_id = getattr(self, '_context_thread_id', None)
can_use_self_context = (self.context and context_thread_id == current_thread_id)
# Use existing context if available (from login session) AND in same thread
if can_use_self_context:
page = self.context.new_page()
elif context:
page = context.new_page()
else:
local_playwright = sync_playwright().start()
browser = local_playwright.chromium.launch(
headless=self.headless,
executable_path='/opt/media-downloader/.playwright/chromium-1187/chrome-linux/chrome' if os.path.exists('/opt/media-downloader/.playwright/chromium-1187/chrome-linux/chrome') else None
)
page = browser.new_page(user_agent=self.user_agent)
# Use 'load' instead of 'networkidle' - phun.org has many ads/trackers that prevent networkidle
try:
page.goto(search_url, wait_until='load', timeout=30000)
page.wait_for_timeout(2000) # Brief wait for dynamic content
except Exception as nav_error:
self.log(f"Navigation timeout, trying domcontentloaded: {nav_error}", "warning")
page.goto(search_url, wait_until='domcontentloaded', timeout=30000)
# Use pre-set forum type or detect it
if hasattr(self, 'forum_type') and self.forum_type:
forum_type = self.forum_type
self.log(f"Using pre-set forum type: {forum_type.value}", "info")
else:
forum_type = self.authenticator.detect_forum_type(page) if hasattr(self, 'authenticator') else ForumType.UNKNOWN
self.log(f"Detected forum type: {forum_type.value}", "info")
# Debug: check what selectors are available
phun_count = page.locator('li.searchResult h3 a').count()
xf2_count = page.locator('.contentRow-title a').count()
block_count = page.locator('.block-row a').count()
thread_links_count = page.locator('a[href*="/threads/"]').count()
self.log(f"Selector counts: li.searchResult={phun_count}, contentRow={xf2_count}, block-row={block_count}, threads={thread_links_count}", "debug")
# Debug: save HTML snippet to file for analysis
if thread_links_count == 0:
try:
html_snippet = page.content()[:5000]
with open('/tmp/phun_debug.html', 'w') as f:
f.write(html_snippet)
self.log("Saved HTML snippet to /tmp/phun_debug.html", "debug")
except Exception:
pass
# phun.org / XenForo with listBlock structure (older theme)
if phun_count > 0:
thread_links = page.locator('li.searchResult h3 a').all()
self.log(f"Found {len(thread_links)} phun.org-style search results", "info")
for link in thread_links:
result = {}
try:
result['title'] = link.inner_text()
result['url'] = link.get_attribute('href')
if result['url'] and not result['url'].startswith('http'):
result['url'] = urljoin(search_url, result['url'])
if result.get('url'):
result['thread_id'] = self._extract_thread_id(result['url'])
result['author'] = 'Unknown'
result['date'] = datetime.now().isoformat()
if result.get('url') and result.get('title'):
# Only include thread links, skip post links
if '/threads/' in result['url']:
results.append(result)
self.log(f"Added result: {result['title'][:50]}", "debug")
else:
self.log(f"Skipped (not a thread): {result.get('url', 'No URL')}", "debug")
except Exception as e:
self.log(f"Error parsing search result: {e}", "debug")
continue
# XenForo 2.x - contentRow structure
elif page.locator('.contentRow-title a').count() > 0:
# Look for all thread links in search results
thread_links = page.locator('.contentRow-title a').all()
self.log(f"Found {len(thread_links)} XenForo 2.x search results", "info")
for link in thread_links:
result = {}
# Extract title and URL
try:
result['title'] = link.inner_text()
result['url'] = link.get_attribute('href')
self.log(f"Raw URL: {result['url']}", "debug")
if result['url'] and not result['url'].startswith('http'):
result['url'] = urljoin(search_url, result['url'])
self.log(f"Processed URL: {result['url']}", "debug")
# Extract thread ID from URL
if result.get('url'):
result['thread_id'] = self._extract_thread_id(result['url'])
# For XenForo search results, we may not have all metadata
# but we have title and URL which is enough
result['author'] = 'Unknown'
result['date'] = datetime.now().isoformat()
# Accept any URL that looks like it could be a thread
if result.get('url') and result.get('title'):
# Skip obvious non-thread links
skip_patterns = ['/members/', '/forums/', '/search/', '/login', '/register']
if not any(p in result['url'] for p in skip_patterns):
results.append(result)
self.log(f"Added result: {result['title'][:50]}", "debug")
else:
self.log(f"Skipped (non-thread pattern): {result.get('url', 'No URL')}", "debug")
except Exception as e:
self.log(f"Error parsing search result: {e}", "debug")
continue
# XenForo 1.x - ol.searchResults structure
elif page.locator('ol.searchResults li').count() > 0:
search_items = page.locator('ol.searchResults li').all()
self.log(f"Found {len(search_items)} XenForo 1.x search results", "info")
for item in search_items:
result = {}
title_elem = item.locator('h3.title a').first
if title_elem:
result['title'] = title_elem.inner_text()
result['url'] = title_elem.get_attribute('href')
if result['url'] and not result['url'].startswith('http'):
result['url'] = urljoin(search_url, result['url'])
if result.get('url'):
result['thread_id'] = self._extract_thread_id(result['url'])
meta_elem = item.locator('.meta').first
if meta_elem:
result['author'] = meta_elem.inner_text().split(',')[0].strip()
if result.get('url'):
results.append(result)
# vBulletin structure
elif page.locator('li.searchResult').count() > 0:
search_items = page.locator('li.searchResult').all()
self.log(f"Found {len(search_items)} vBulletin search results", "info")
for item in search_items:
result = {}
title_elem = item.locator('h3 a').first
if title_elem:
result['title'] = title_elem.inner_text()
result['url'] = title_elem.get_attribute('href')
if result['url'] and not result['url'].startswith('http'):
result['url'] = urljoin(search_url, result['url'])
if result.get('url'):
results.append(result)
# Generic fallback
else:
# Check if page explicitly says no results
no_results_text = page.locator('text=/no results/i, text=/no threads found/i, text=/no matches/i').first
if no_results_text:
self.log("Search returned no results (detected 'no results' message)", "info")
# Don't use generic parser when we know there are no results
else:
# Try to find any links that look like thread URLs
thread_links = page.locator('a[href*="/threads/"], a[href*="/topic/"], a[href*="showthread"]').all()
self.log(f"Using generic parser, found {len(thread_links)} potential threads", "info")
for link in thread_links:
result = {
'title': link.inner_text(),
'url': link.get_attribute('href'),
'author': 'Unknown'
}
if result['url'] and not result['url'].startswith('http'):
result['url'] = urljoin(search_url, result['url'])
if result['url'] and result['title']:
results.append(result)
# Only close if we created them locally (not using persistent context)
if page and not self.context and not context:
page.close()
if browser:
browser.close()
if local_playwright:
local_playwright.stop()
except Exception as e:
self.log(f"Error scraping search results: {e}", "error")
if page and not self.context and not context:
page.close()
if browser:
browser.close()
if local_playwright:
local_playwright.stop()
return results
def _scrape_thread_impl(self, thread_url: str, context=None, saved_cookies=None) -> Optional[Dict]:
    """Scrape one thread page into a dict of title, posts and external image links.

    Implementation of thread scraping - runs in separate thread to avoid async
    context issues.

    Args:
        thread_url: URL of the thread page to load.
        context: Optional Playwright browser context to open the page in. It is
            used only when ``self.context`` cannot be used from the current thread.
        saved_cookies: Accepted for call compatibility; this method does not read it.

    Returns:
        A dict with keys 'title', 'author', 'created_date', 'last_post_date' and
        'posts', or None when any step raised. 'author' and 'created_date' keep
        their initial values here. Every post dict has 'images' and, when the
        matching element was found, 'content', 'author' and 'date'.
    """
    thread_data = {
        'title': '',
        'author': '',
        'created_date': None,
        'last_post_date': None,
        'posts': []
    }
    browser = None
    page = None
    local_playwright = None
    try:
        # Check thread safety before using self.context - Playwright contexts
        # cannot be shared across threads (causes "Cannot switch to a different thread" error)
        import threading
        current_thread_id = threading.current_thread().ident
        context_thread_id = getattr(self, '_context_thread_id', None)
        can_use_self_context = (self.context and context_thread_id == current_thread_id)
        # Use existing context if available (from login session) AND in same thread
        if can_use_self_context:
            page = self.context.new_page()
        elif context:
            # Use provided context
            page = context.new_page()
        else:
            # Create new context (always safe - new playwright instance per thread)
            chromium_path = '/opt/media-downloader/.playwright/chromium-1187/chrome-linux/chrome'
            local_playwright = sync_playwright().start()
            browser = local_playwright.chromium.launch(
                headless=self.headless,
                executable_path=chromium_path if os.path.exists(chromium_path) else None
            )
            page = browser.new_page(user_agent=self.user_agent)
        page.goto(thread_url, wait_until='networkidle')
        # Extract thread info (forum-specific)
        title_elem = page.query_selector('h1, .thread-title')
        if title_elem:
            thread_data['title'] = title_elem.inner_text()
        # Extract posts based on forum type
        # XenForo 1.x uses li.message, XenForo 2.x uses article.message
        page_html = page.content().lower()  # fetched once and reused for both checks
        if 'xenforo' in page_html or 'xf' in page_html:
            # Try XenForo 2 first (article.message), then XenForo 1 (li.message)
            posts = page.query_selector_all('article.message')
            if not posts:
                posts = page.query_selector_all('li.message')
        else:
            posts = page.query_selector_all('.post, .message, article')
        # Look for external image host links
        link_selectors = [
            'a[href*="imagebam"]',
            'a[href*="imgbox"]',
            'a[href*="imgur"]',
            'a[href*="postimg"]',
            'a[href*="imgbb"]',
            'a[href*="pixhost"]',
            'a[href*="imagevenue"]',
            'a[href*="catbox"]',
            'a[href*="fastdl.app"]',
            'a[href*="picturepub.net"]',
            'a[href*="imagetwist"]',
            'a.file-preview'
        ]
        for post in posts:
            post_data = {}
            # Extract post content
            # XenForo 2: .message-body, XenForo 1: .messageContent, .messageText
            content_elem = post.query_selector('.message-body, .post-content, .messageContent, .messageText, .message-content')
            if content_elem:
                post_data['content'] = content_elem.inner_text()
            # Extract author
            author_elem = post.query_selector('.message-name, .author, .username')
            if author_elem:
                post_data['author'] = author_elem.inner_text()
            # Extract date
            date_elem = post.query_selector('time, .date, .timestamp')
            if date_elem:
                post_data['date'] = date_elem.get_attribute('datetime') or date_elem.inner_text()
            # Extract EXTERNAL image links (not inline forum attachments)
            images = []
            for selector in link_selectors:
                for link in post.query_selector_all(selector):
                    href = link.get_attribute('href')
                    if not href:
                        continue
                    # Make sure it's a full URL before de-duplicating, so the same
                    # relative link matched by two selectors is stored only once
                    if not href.startswith('http'):
                        href = urljoin(thread_url, href)
                    if href in images:
                        continue
                    # Skip forum's internal attachments
                    if '/attachments/' in href:
                        continue
                    # Skip thumbnails (imgbox thumbs2, ImageBam thumbs, or _t.jpg/_t.png endings)
                    if ('thumbs' in href and ('imgbox.com' in href or 'imagebam.com' in href)) or href.endswith('_t.jpg') or href.endswith('_t.png'):
                        continue
                    images.append(href)
            # Also check for any external links that might be images
            for link in post.query_selector_all('a[href^="http"]'):
                href = link.get_attribute('href')
                if href and '/attachments/' not in href:
                    # Check if it's an image host we support
                    if ImageHostHandler.identify_host(href) and href not in images:
                        images.append(href)
            post_data['images'] = images
            thread_data['posts'].append(post_data)
        # Extract last_post_date from the posts (use the most recent post's date)
        latest_date = None
        for post in thread_data['posts']:
            post_date_str = post.get('date')
            if not post_date_str:
                continue
            try:
                # Try ISO format first (datetime attribute)
                if 'T' in str(post_date_str):
                    parsed_date = datetime.fromisoformat(post_date_str.replace('Z', '+00:00'))
                else:
                    # Try common forum date formats
                    for fmt in ['%b %d, %Y at %I:%M %p', '%B %d, %Y', '%d %b %Y', '%Y-%m-%d', '%m/%d/%Y']:
                        try:
                            parsed_date = datetime.strptime(str(post_date_str).strip(), fmt)
                            break
                        except ValueError:
                            continue
                    else:
                        parsed_date = None
                # Comparing a timezone-aware with a naive datetime raises TypeError,
                # which the handler below treats as "skip this post"
                if parsed_date and (latest_date is None or parsed_date > latest_date):
                    latest_date = parsed_date
            except (ValueError, TypeError, AttributeError):
                pass  # Invalid date format, skip this post
        if latest_date:
            thread_data['last_post_date'] = latest_date.isoformat()
            self.log(f"Extracted last_post_date: {latest_date.strftime('%Y-%m-%d %H:%M')}", "debug")
    except Exception as e:
        self.log(f"Error scraping thread: {e}", "error")
        return None
    finally:
        # Always close the page we opened - on a shared context it would otherwise
        # stay open forever. The context itself is left alive for reuse.
        if page:
            try:
                page.close()
            except Exception as close_error:
                self.log(f"Error closing thread page: {close_error}", "debug")
        # Browser and Playwright instance exist only when created locally above
        if browser:
            try:
                browser.close()
            except Exception as close_error:
                self.log(f"Error closing browser: {close_error}", "debug")
        if local_playwright:
            try:
                local_playwright.stop()
            except Exception as close_error:
                self.log(f"Error stopping Playwright: {close_error}", "debug")
    return thread_data
def _scrape_thread(self, thread_url: str, context=None) -> Optional[Dict]:
    """Scrape a forum thread with authentication support.

    Args:
        thread_url: URL of the thread page to load.
        context: Optional Playwright browser context to open the page in. It is
            ignored for phun.org URLs, which always get a freshly launched browser.

    Returns:
        A dict with keys 'title', 'author', 'created_date', 'last_post_date' and
        'posts'. 'author' and 'created_date' keep their initial values here. When
        the Cloudflare-aware navigation for phun.org fails, the dict is returned
        with no posts. Returns None when an exception was raised while scraping.
        Each post dict may contain 'content', 'author', 'date' and, only when
        links were found, 'images'.
    """
    thread_data = {
        'title': '',
        'author': '',
        'created_date': None,
        'last_post_date': None,
        'posts': []
    }
    browser = None
    page = None
    local_playwright = None
    try:
        # For phun.org, we need a fresh context with the correct user-agent
        # because cf_clearance cookies are tied to browser fingerprint
        use_fresh_context = 'phun.org' in thread_url
        # Use provided context first (passed from download_thread with thread-safe handling)
        # Only fall back to self.context if no context passed and not Cloudflare site
        # IMPORTANT: Check thread safety before using self.context - Playwright contexts
        # cannot be shared across threads (causes "Cannot switch to a different thread" error)
        import threading
        current_thread_id = threading.current_thread().ident
        context_thread_id = getattr(self, '_context_thread_id', None)
        can_use_self_context = (self.context and not use_fresh_context and
                                context_thread_id == current_thread_id)
        if context and not use_fresh_context:
            # Use provided context (thread-safe - created in same thread)
            page = context.new_page()
        elif can_use_self_context:
            # Fall back to self.context only if in same thread (verified thread-safe)
            page = self.context.new_page()
        else:
            # Create new context (or forced for Cloudflare-protected sites)
            chromium_path = '/opt/media-downloader/.playwright/chromium-1187/chrome-linux/chrome'
            local_playwright = sync_playwright().start()
            browser = local_playwright.chromium.launch(
                headless=self.headless,
                executable_path=chromium_path if os.path.exists(chromium_path) else None
            )
            # For Cloudflare-protected sites, use the stored user_agent from cookies
            # cf_clearance cookies are tied to browser fingerprint
            effective_user_agent = self.user_agent
            if use_fresh_context:
                cf_handler = CloudflareHandler(
                    module_name="Forum.phun.org",
                    cookie_file="cookies/forum_cookies_phun.org.json",
                    flaresolverr_url=self.flaresolverr_url,
                    flaresolverr_enabled=self.flaresolverr_enabled,
                    user_agent=self.user_agent,
                    logger=self.logger
                )
                stored_ua = cf_handler.get_user_agent()
                if stored_ua:
                    effective_user_agent = stored_ua
                    self.log("Using stored user-agent for phun.org Cloudflare cookies", "debug")
            page = browser.new_page(user_agent=effective_user_agent)
        # Use Cloudflare bypass for phun.org
        if use_fresh_context:
            # Always enable Cloudflare for phun.org - it requires it regardless of config
            cloudflare_enabled = True
            self.log("phun.org thread: forcing cloudflare_enabled=True", "debug")
            # Use 'load' instead of 'networkidle' to avoid timeout on ad-heavy pages
            if not self._navigate_with_cloudflare(page, thread_url, 'phun.org', cloudflare_enabled,
                                                  wait_until='load', timeout=30000):
                self.log(f"Failed to navigate to thread with Cloudflare bypass: {thread_url}", "error")
                return thread_data  # cleanup still runs in the finally block
            page.wait_for_timeout(3000)  # Longer wait for Cloudflare
        else:
            # Use 'load' instead of 'networkidle' for other forums
            try:
                page.goto(thread_url, wait_until='load', timeout=30000)
                # Wait for post content to render (XenForo 2.x or 1.x)
                try:
                    page.wait_for_selector('article.message, li.message, .post, .message', timeout=10000)
                except Exception:
                    pass  # Timeout waiting for posts - page may have no posts or different structure
            except Exception as nav_error:
                self.log(f"Thread navigation timeout, trying domcontentloaded: {nav_error}", "warning")
                page.goto(thread_url, wait_until='domcontentloaded', timeout=30000)
                page.wait_for_timeout(3000)
        # Extract thread info (forum-specific)
        title_elem = page.query_selector('h1, .thread-title')
        if title_elem:
            thread_data['title'] = title_elem.inner_text()
        # Extract posts based on forum type
        # XenForo 1.x uses li.message, XenForo 2.x uses article.message
        raw_html = page.content()  # kept so the debug dump below needs no second fetch
        html_content = raw_html.lower()
        if 'xenforo' in html_content or 'xf' in html_content:
            # Try XenForo 2 first (article.message), then XenForo 1 (li.message)
            posts = page.query_selector_all('article.message')
            if not posts:
                posts = page.query_selector_all('li.message')
                self.log(f"XenForo 1.x detected, found {len(posts)} li.message posts", "debug")
            else:
                self.log(f"XenForo 2.x detected, found {len(posts)} article.message posts", "debug")
        else:
            posts = page.query_selector_all('.post, .message, article')
            self.log(f"Generic forum, found {len(posts)} posts", "debug")
        # Debug: check if we're hitting Cloudflare (only if no posts found)
        if not posts and ('just a moment' in html_content or 'cf-challenge' in html_content):
            self.log("WARNING: Thread page shows Cloudflare challenge!", "warning")
            # Save HTML for debugging (best effort - a failed dump must not abort the scrape)
            try:
                with open('/tmp/phun_thread_debug.html', 'w', encoding='utf-8') as f:
                    f.write(raw_html[:10000])
            except Exception:
                pass
        # Look for external image host links
        link_selectors = [
            'a[href*="imagebam"]',
            'a[href*="imgbox"]',
            'a[href*="imgur"]',
            'a[href*="postimg"]',
            'a[href*="imgbb"]',
            'a[href*="pixhost"]',
            'a[href*="imagevenue"]',
            'a[href*="catbox"]',
            'a[href*="fastdl.app"]',
            'a[href*="picturepub.net"]',
            'a[href*="imagetwist"]',
            'a.file-preview'
        ]
        for post in posts:
            post_data = {}
            # Extract post content
            # XenForo 2: .message-body, XenForo 1: .messageContent, .messageText
            content_elem = post.query_selector('.message-body, .post-content, .messageContent, .messageText, .message-content')
            if content_elem:
                post_data['content'] = content_elem.inner_text()
            # Extract author
            author_elem = post.query_selector('.message-name, .author, .username')
            if author_elem:
                post_data['author'] = author_elem.inner_text()
            # Extract date
            date_elem = post.query_selector('time, .date, .timestamp')
            if date_elem:
                post_data['date'] = date_elem.get_attribute('datetime') or date_elem.inner_text()
            # Extract EXTERNAL image links (not inline forum attachments)
            images = []
            for selector in link_selectors:
                for link in post.query_selector_all(selector):
                    href = link.get_attribute('href')
                    if href:
                        images.append(href)
            # Also look for direct image links in the content (but exclude thumbnails)
            for img in post.query_selector_all('img'):
                src = img.get_attribute('src')
                if not src:
                    continue
                # Skip ImageBam thumbnails (they're on thumbs*.imagebam.com)
                if 'thumbs' in src and 'imagebam.com' in src:
                    continue
                # Skip imgbox thumbnails (they're on thumbs2.imgbox.com or end with _t.jpg)
                if ('thumbs' in src and 'imgbox.com' in src) or (src.endswith('_t.jpg') or src.endswith('_t.png')):
                    continue
                # Skip ImageTwist thumbnail URLs - we get proper URLs from <a href> links
                # Thumbnails are on i*.imagetwist.com/th/ which we can't convert properly
                if 'imagetwist.com' in src and '/th/' in src:
                    continue
                # Only add direct images from these hosts (not imagebam since we want the link not the thumb)
                if any(host in src for host in ['imgbox', 'imgur', 'postimg', 'imgbb']):
                    images.append(src)
            if images:
                # Remove duplicates while keeping page order (a set would shuffle it)
                post_data['images'] = list(dict.fromkeys(images))
            thread_data['posts'].append(post_data)
        # Extract last_post_date from the posts (use the most recent post's date)
        latest_date = None
        for post in thread_data['posts']:
            post_date_str = post.get('date')
            if not post_date_str:
                continue
            try:
                # Try ISO format first (datetime attribute)
                if 'T' in str(post_date_str):
                    parsed_date = datetime.fromisoformat(post_date_str.replace('Z', '+00:00'))
                else:
                    # Try common forum date formats
                    for fmt in ['%b %d, %Y at %I:%M %p', '%B %d, %Y', '%d %b %Y', '%Y-%m-%d', '%m/%d/%Y']:
                        try:
                            parsed_date = datetime.strptime(str(post_date_str).strip(), fmt)
                            break
                        except ValueError:
                            continue
                    else:
                        parsed_date = None
                # Comparing a timezone-aware with a naive datetime raises TypeError,
                # which the handler below treats as "skip this post"
                if parsed_date and (latest_date is None or parsed_date > latest_date):
                    latest_date = parsed_date
            except (ValueError, TypeError, AttributeError):
                pass  # Invalid date format, skip this post
        if latest_date:
            thread_data['last_post_date'] = latest_date.isoformat()
            self.log(f"Extracted last_post_date: {latest_date.strftime('%Y-%m-%d %H:%M')}", "debug")
    except Exception as e:
        # Cleanup happens exactly once, in the finally block below
        self.log(f"Error scraping thread: {e}", "error")
        return None
    finally:
        # Close only the page, keep context alive for reuse
        if page:
            try:
                page.close()
            except Exception:
                pass
        # Only close browser if we created it locally; guarded so a failing
        # close cannot replace the method's return value with an exception
        if browser:
            try:
                browser.close()
            except Exception as close_error:
                self.log(f"Error closing browser: {close_error}", "debug")
        if local_playwright:
            try:
                local_playwright.stop()
            except Exception as close_error:
                self.log(f"Error stopping Playwright: {close_error}", "debug")
    return thread_data
def _extract_thread_id(self, url: str) -> str:
    """Return an identifier for the thread behind ``url``.

    A handful of URL shapes are probed in priority order and the first
    numeric capture wins. When none of them matches, the SHA-256 hex digest
    of the whole URL is returned instead.
    """
    id_regexes = (
        r'/threads?/([0-9]+)',
        r'/t/([0-9]+)',
        r'[?&]t=([0-9]+)',
        r'/topic/([0-9]+)',
        r'/viewtopic\.php\?.*t=([0-9]+)',
    )
    # Lazily evaluate the searches so probing stops at the first hit
    attempts = (re.search(regex, url) for regex in id_regexes)
    first_hit = next((hit for hit in attempts if hit), None)
    if first_hit is None:
        # No recognisable ID in the URL - derive a stable one from its text
        return hashlib.sha256(url.encode()).hexdigest()
    return first_hit.group(1)
def _detect_forum(self, url: str) -> str:
    """Guess the forum software from hints in ``url``.

    Each entry pairs a product keyword (matched case-insensitively) with a
    characteristic path fragment (matched as written). The first entry that
    matches decides the label; with no match the URL's network location is
    returned.
    """
    domain = urlparse(url).netloc
    lowered = url.lower()
    # (label, keyword looked up in the lower-cased URL, fragment looked up in the raw URL)
    signatures = (
        ('vBulletin', 'vbulletin', '/showthread.php'),
        ('phpBB', 'phpbb', '/viewtopic.php'),
        ('Discourse', 'discourse', '/t/'),
        ('XenForo', 'xenforo', '/threads/'),
        ('SMF', 'smf', 'index.php?topic='),
        ('Invision', 'invision', '/topic/'),
    )
    for label, keyword, fragment in signatures:
        if keyword in lowered or fragment in url:
            return label
    return domain
def _extract_date_from_post(self, post: Dict, thread_data: Dict) -> Optional[datetime]:
    """Extract date from post or thread title.

    The thread title is searched first for an embedded date, e.g.
    "15.08.2025", "08/15/2025", "15-08-2025", "2025-08-15",
    "August 15, 2025" or "15 Aug 2025". If no valid date is found there,
    the post's own 'date' string is parsed instead.

    Args:
        post: Post dict; only its optional 'date' string is read.
        thread_data: Thread dict; only its optional 'title' is read.

    Returns:
        A naive datetime for title dates and for non-ISO post dates. ISO post
        dates that carry an offset (or 'Z') yield a timezone-aware datetime.
        None when nothing could be parsed.
    """
    month_numbers = {
        'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5, 'jun': 6,
        'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10, 'nov': 11, 'dec': 12
    }
    # Full month name or its three-letter abbreviation (case-sensitive)
    month_token = (
        r'(Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|'
        r'Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)'
    )

    def numeric_year_last(match):
        # Separator of the matched text (not of the whole title) picks the order:
        # '.' means DD.MM.YYYY, otherwise MM/DD/YYYY. The other order is the fallback
        # so that e.g. "15-08-2025" still resolves.
        first, second, year = (int(group) for group in match.groups())
        if '.' in match.group(0):
            return [(year, second, first), (year, first, second)]
        return [(year, first, second), (year, second, first)]

    def numeric_year_first(match):
        year, month, day = (int(group) for group in match.groups())
        return [(year, month, day)]

    def month_name_first(match):
        month = month_numbers[match.group(1)[:3].lower()]
        return [(int(match.group(3)), month, int(match.group(2)))]

    def day_then_month_name(match):
        month = month_numbers[match.group(2)[:3].lower()]
        return [(int(match.group(3)), month, int(match.group(1)))]

    # Common date patterns in titles, tried in this order
    title_patterns = [
        (r'(\d{1,2})[\.\/\-](\d{1,2})[\.\/\-](\d{4})', numeric_year_last),  # DD.MM.YYYY or MM/DD/YYYY
        (r'(\d{4})[\-\/](\d{1,2})[\-\/](\d{1,2})', numeric_year_first),  # YYYY-MM-DD
        (month_token + r'\s+(\d{1,2}),?\s+(\d{4})', month_name_first),  # Month DD, YYYY
        (r'(\d{1,2})\s+' + month_token + r'\s+(\d{4})', day_then_month_name),  # DD Mon YYYY
    ]

    # First try to extract from thread title
    title = thread_data.get('title') or ''
    if isinstance(title, str):
        for pattern, to_candidates in title_patterns:
            match = re.search(pattern, title)
            if not match:
                continue
            for year, month, day in to_candidates(match):
                try:
                    return datetime(year, month, day)
                except ValueError:
                    continue  # impossible calendar date - try the next reading

    # Fallback to post date if available
    date_str = post.get('date')
    if isinstance(date_str, str) and date_str.strip():
        date_str = date_str.strip()
        # Try ISO format first
        if 'T' in date_str:
            try:
                return datetime.fromisoformat(date_str.replace('Z', '+00:00'))
            except ValueError:
                pass  # not ISO after all - fall through to the explicit formats
        # Includes the textual formats the thread scrapers in this file parse
        for fmt in ['%Y-%m-%d', '%m/%d/%Y', '%d.%m.%Y',
                    '%b %d, %Y at %I:%M %p', '%B %d, %Y', '%d %b %Y']:
            try:
                return datetime.strptime(date_str, fmt)
            except ValueError:
                continue
    return None
def _extract_pixhost_direct_url(self, show_url: str) -> Optional[str]:
    """Resolve a pixhost "show" page URL to a direct image URL.

    The directory id and filename are taken from the show URL, then a fixed
    list of imgNN.pixhost.to hosts is probed with HEAD requests, trying the
    original filename first and then the same name with other common image
    extensions. The first URL answering 200 with an image content type (and
    not redirected to a 'removed.png' placeholder) is returned.

    Returns None when the URL is not a pixhost show URL or no probe succeeds.
    """
    import re
    parsed = re.match(r"https?://(?:www\.)?pixhost\.to/show/(\d+)/([^/]+)$", show_url, re.IGNORECASE)
    if parsed is None:
        return None
    dir_id, filename = parsed.groups()

    # Candidate filenames: the original, then each alternative extension
    stem, dot, suffix = filename.rpartition(".")
    candidates = [filename]
    if dot:
        candidates += [
            f"{stem}.{alt_ext}"
            for alt_ext in ("jpg", "jpeg", "png", "webp", "gif")
            if alt_ext.lower() != suffix.lower()
        ]

    # Start with commonly used hosts
    for host_num in (1, 2, 3, 4, 5, 10, 15, 20, 25, 30, 40, 50, 60, 70, 80, 90, 100):
        for candidate in candidates:
            direct_url = f"https://img{host_num}.pixhost.to/images/{dir_id}/{candidate}"
            # Quick check with HEAD request; any failure just moves on to the next probe
            try:
                response = requests.head(direct_url, timeout=2, allow_redirects=True,
                                         headers={"User-Agent": self.user_agent})
                is_live_image = (
                    response.status_code == 200
                    and 'image' in response.headers.get('Content-Type', '')
                    and 'removed.png' not in response.url
                )
                if is_live_image:
                    self.log(f"Found pixhost image on img{host_num}", "debug")
                    return direct_url
            except Exception:
                continue
    # If common hosts fail, return None and let regular download handle it
    return None
def _get_image_filename(self, url: str, post_date: datetime = None) -> str:
    """Build the local filename for an image URL.

    Args:
        url: Image URL
        post_date: Optional datetime; when given, its YYYYMMDD_HHMMSS_ rendering
            is prepended to the name

    Returns:
        The last path segment of the URL when it contains a dot, otherwise the
        SHA-256 hex digest of the URL plus a guessed extension. Example:
        "20251215_195700_3.jpg" with post_date, "3.jpg" without.
    """
    name = os.path.basename(urlparse(url).path)
    if not name or '.' not in name:
        # No usable name in the path: hash the URL and guess the extension
        # from the first known marker found anywhere in it (default .jpg)
        lowered = url.lower()
        suffix = next(
            (marker for marker in ('.png', '.gif', '.webp') if marker in lowered),
            '.jpg',
        )
        name = hashlib.sha256(url.encode()).hexdigest() + suffix
    if not post_date:
        return name
    # Date/time prefix makes filenames unique across downloads
    return post_date.strftime('%Y%m%d_%H%M%S_') + name
def download_forum_section(self,
                           section_url: str,
                           forum_name: str,
                           max_pages: int = 10,
                           max_threads: int = None,
                           username: str = None,
                           password: str = None) -> Dict:
    """
    Collect thread links from a forum section/category and download each thread.

    Args:
        section_url: URL of the forum section
        forum_name: Name of the forum (used for cookie lookup and login tracking)
        max_pages: Maximum number of listing pages to scan
        max_threads: Stop collecting links once this many were found (no limit when falsy)
        username: Login username (optional)
        password: Login password (optional)

    Returns:
        Dictionary with 'threads_found', 'threads_downloaded', 'errors' and
        'thread_urls'. Failures are counted in 'errors' rather than raised.
    """
    self.log(f"Downloading forum section: {section_url}", "info")
    results = {
        'threads_found': 0,
        'threads_downloaded': 0,
        'errors': 0,
        'thread_urls': []
    }
    try:
        def run_section_download():
            # Phase 1: open the section in a fresh browser and harvest thread links.
            # Fills and returns the enclosing ``results`` dict.
            chromium_path = '/opt/media-downloader/.playwright/chromium-1187/chrome-linux/chrome'
            with sync_playwright() as p:
                browser = p.chromium.launch(
                    headless=self.headless,
                    executable_path=chromium_path if os.path.exists(chromium_path) else None
                )
                context = self._create_browser_context(browser)
                # Handle authentication - saved cookies win over a fresh login
                if forum_name:
                    if self.authenticator.load_cookies(context, forum_name):
                        self.logged_in_forums[forum_name] = True
                        self.log(f"Loaded saved cookies for {forum_name}", "debug")
                    elif username and password and forum_name not in self.logged_in_forums:
                        # Only login if we have credentials and no valid cookies
                        login_page = context.new_page()
                        if self.authenticator.auto_login(login_page, username, password, section_url):
                            self.authenticator.save_cookies(login_page, forum_name)
                            self.logged_in_forums[forum_name] = True
                            self.log(f"Logged in to {forum_name}", "success")
                        login_page.close()
                page = context.new_page()
                # Detect forum type from the first listing page
                page.goto(section_url)
                forum_type = self.authenticator.detect_forum_type(page)
                collected = []
                for page_num in range(1, max_pages + 1):
                    if page_num > 1:
                        # Navigate to next page (forum-specific); stop when no URL can be built
                        next_url = self._get_next_page_url(section_url, page_num, forum_type)
                        if not next_url:
                            break
                        page.goto(next_url)
                    # Pick the thread-link selector for this forum type
                    if forum_type == ForumType.XENOFORO:
                        selector = 'h3.contentRow-title a, .structItem-title a'
                    elif forum_type == ForumType.VBULLETIN:
                        selector = 'a.title, .threadtitle a'
                    elif forum_type == ForumType.PHPBB:
                        selector = 'a.topictitle, .topic-title a'
                    elif forum_type == ForumType.DISCOURSE:
                        selector = '.topic-list-item a.title'
                    elif forum_type == ForumType.INVISION:
                        selector = '.ipsDataItem_title a, h4.ipsType_large a'
                    elif forum_type == ForumType.MYBB:
                        selector = '.subject_new a, .subject_old a'
                    elif forum_type == ForumType.SMF:
                        selector = '.subject a, span.preview a'
                    else:
                        # Generic fallback
                        selector = 'a[href*="thread"], a[href*="topic"], a[href*="/t/"]'
                    for anchor in page.locator(selector).all():
                        href = anchor.get_attribute('href')
                        if not href:
                            continue
                        absolute = urljoin(section_url, href)
                        if absolute in collected:
                            continue
                        collected.append(absolute)
                        if max_threads and len(collected) >= max_threads:
                            break
                    # Leave the page loop too once the limit is reached
                    if max_threads and len(collected) >= max_threads:
                        break
                    self._apply_rate_limit()
                browser.close()
            results['threads_found'] = len(collected)
            results['thread_urls'] = collected
            return results

        # The helper is called directly, on the current thread
        results = run_section_download()
        # Phase 2: download each collected thread
        total = len(results['thread_urls'])
        for i, thread_url in enumerate(results.get('thread_urls', []), 1):
            self.log(f"Downloading thread {i}/{total}: {thread_url}", "info")
            try:
                outcome = self.download_thread(
                    thread_url,
                    forum_name=forum_name,
                    username=username,
                    password=password
                )
                if outcome.get('status') == 'success':
                    results['threads_downloaded'] += 1
            except Exception as e:
                self.log(f"Error downloading thread: {e}", "error")
                results['errors'] += 1
            self._apply_rate_limit()
    except Exception as e:
        self.log(f"Error downloading forum section: {e}", "error")
        results['errors'] += 1
    return results
def _get_next_page_url(self, base_url: str, page_num: int, forum_type: ForumType) -> Optional[str]:
    """Generate next page URL based on forum type.

    Args:
        base_url: URL of the first page of the section.
        page_num: 1-based number of the page to build a URL for.
        forum_type: Detected forum software.

    Returns:
        The URL of the requested page, or None for a forum type with no known
        pagination scheme.
    """
    # A parameter must be joined with '&' when base_url already has a query
    # string and with '?' when it has none
    separator = '&' if '?' in base_url else '?'
    if forum_type in (ForumType.XENOFORO, ForumType.VBULLETIN, ForumType.DISCOURSE,
                      ForumType.INVISION, ForumType.MYBB):
        return f"{base_url}{separator}page={page_num}"
    if forum_type == ForumType.PHPBB:
        return f"{base_url}{separator}start={(page_num - 1) * 25}"  # Usually 25 topics per page
    if forum_type == ForumType.SMF:
        offset = (page_num - 1) * 20  # Usually 20 topics per page
        # SMF board URLs already end in an offset ("board=5.0"); replace it
        # instead of appending a second one ("board=5.0.20")
        if re.search(r'board=\d+\.\d+$', base_url):
            return re.sub(r'\.\d+$', f'.{offset}', base_url)
        return f"{base_url}.{offset}"
    return None
def get_statistics(self) -> Dict:
    """Get downloader statistics.

    Returns:
        A copy of ``self.stats``. When ``self.use_database`` is truthy it is
        extended with 'total_threads', 'total_posts', 'total_images' (rows
        flagged as downloaded), 'active_searches' and 'monitored_threads'
        (active threads whose ``monitor_until`` is later than now).

    Raises:
        Whatever the database driver raises for a failed query; the
        connection is closed before the error propagates.
    """
    stats = self.stats.copy()
    if not self.use_database:
        return stats
    conn = self._get_db_connection()
    try:
        cursor = conn.cursor()
        # Get database stats
        cursor.execute("SELECT COUNT(*) FROM threads")
        stats['total_threads'] = cursor.fetchone()[0]
        cursor.execute("SELECT COUNT(*) FROM posts")
        stats['total_posts'] = cursor.fetchone()[0]
        cursor.execute("SELECT COUNT(*) FROM images WHERE downloaded = TRUE")
        stats['total_images'] = cursor.fetchone()[0]
        cursor.execute("SELECT COUNT(*) FROM searches WHERE active = TRUE")
        stats['active_searches'] = cursor.fetchone()[0]
        cursor.execute(
            "SELECT COUNT(*) FROM threads WHERE status = 'active' AND monitor_until > ?",
            (datetime.now().isoformat(),)
        )
        stats['monitored_threads'] = cursor.fetchone()[0]
    finally:
        # Release the connection even when one of the queries fails
        conn.close()
    return stats
# Example usage: runs only when this module is executed as a script
if __name__ == "__main__":
    from pathlib import Path

    # Placeholder forum and credentials shared by every call below
    demo_forum = "MyForum"
    demo_login = {"username": "your_username", "password": "your_password"}
    downloads_root = Path(__file__).parent.parent / "forum_downloads"

    # Database tracking is switched off for this standalone run
    downloader = ForumDownloader(
        headless=True,
        show_progress=True,
        use_database=False,
        db_path=None,
        download_dir=str(downloads_root)
    )

    # Login to the forum (supports XenForo, vBulletin, phpBB, Discourse, Invision, MyBB, SMF);
    # the login method will auto-detect the forum type
    downloader.login(
        forum_name=demo_forum,
        forum_url="https://forum.example.com",
        **demo_login
    )

    # Monitor a search with authentication (credentials optional if already logged in)
    downloader.monitor_search(
        forum_name=demo_forum,
        search_query="interesting topic",
        search_url="https://forum.example.com/search?q=interesting+topic",
        check_frequency_hours=6,
        auto_track_days=30,
        **demo_login
    )

    # Download a public thread, then one from a private/members-only section
    # (authentication is required for the latter)
    downloader.download_thread(
        thread_url="https://forum.example.com/threads/12345",
        forum_name=demo_forum,
        download_images=True,
        **demo_login
    )
    private_result = downloader.download_thread(
        thread_url="https://forum.example.com/private/threads/67890",
        forum_name=demo_forum,
        download_images=True,
        **demo_login
    )

    # Update all monitored threads
    downloader.update_monitored_threads()

    # Show statistics
    summary = downloader.get_statistics()
    forum_logger.info("Statistics:")
    for stat_name, stat_value in summary.items():
        forum_logger.info(f" {stat_name}: {stat_value}")

    # Supported forum types:
    # - XenForo (1.x and 2.x)
    # - vBulletin (3.x, 4.x, 5.x)
    # - phpBB (all versions)
    # - Discourse
    # - Invision Power Board (IPB)
    # - MyBB
    # - Simple Machines Forum (SMF)
    # The module will automatically detect and handle each forum type