#!/usr/bin/env python3
"""
Subprocess wrapper for forum operations to avoid asyncio event loop conflicts
This runs forum operations in a completely isolated subprocess

Protocol: the parent process writes a JSON config object to this script's
stdin; the script prints a single JSON result object to stdout and exits
0 on success, 1 on error.  All human-readable logging goes to stderr via
stderr_log() so stdout stays valid JSON.
"""
import sys
import json
from pathlib import Path

# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))

from wrappers.base_subprocess_wrapper import (
    setup_signal_handlers,
    stderr_log,
    set_database_reference
)

# Setup signal handlers early for graceful termination
setup_signal_handlers("Forums")


def run_forum_download(config: dict) -> dict:
    """Run forum download in isolated subprocess.

    Orchestrates one full monitoring cycle for a single forum:
    login (if credentials supplied) -> monitor_search for new threads ->
    per-thread download into a temp dir -> face-recognition-filtered move
    to the final destination -> deferred database recording of the files
    that actually reached the destination.

    Args:
        config: Parsed JSON dict from the parent process.  Must contain
            'download_dir', 'forum_name', 'forum_config', 'temp_dir' and
            'dest_dir'; may also contain 'db_path' and 'headless'.

    Returns:
        dict with 'status' ('success'/'error') and 'count'; on success
        also 'new_threads', 'total_results', 'final_files' and
        'review_files' (lists of file descriptors for notification).
        Never raises — all exceptions are converted to an error dict.
    """
    # Redirect stdout to stderr to prevent library output from polluting JSON result
    # (InsightFace/ONNX prints status messages to stdout)
    original_stdout = sys.stdout
    sys.stdout = sys.stderr

    # Project imports happen AFTER the stdout redirect so any import-time
    # prints from these modules land on stderr, not in the JSON output.
    from modules.forum_downloader import ForumDownloader
    from modules.unified_database import UnifiedDatabase
    from modules.forum_db_adapter import ForumDatabaseAdapter
    from modules.move_module import MoveManager
    from modules.monitor_wrapper import log_download_result

    # Create unified database (use db_path from config like other wrappers)
    db_path = config.get('db_path', '/opt/media-downloader/database/media_downloader.db')
    unified_db = UnifiedDatabase(db_path, use_pool=False)
    set_database_reference(unified_db)  # For graceful cleanup on SIGTERM

    # Track all moved files across threads for notification
    all_final_files: list = []
    all_review_files: list = []

    try:
        # Validate required config parameters
        required_keys = ['download_dir', 'forum_name', 'forum_config', 'temp_dir', 'dest_dir']
        missing_keys = [key for key in required_keys if key not in config]
        if missing_keys:
            return {'status': 'error', 'message': f'Missing required config keys: {missing_keys}', 'count': 0}

        forum_db_adapter = ForumDatabaseAdapter(unified_db)

        # Create ForumDownloader - uses its own universal logger now
        # NOTE(review): an adapter OBJECT is passed for the 'db_path'
        # parameter — presumably ForumDownloader accepts either a path or
        # an adapter; confirm against its signature.
        forum_module = ForumDownloader(
            headless=config.get('headless', True),
            show_progress=True,
            use_database=True,
            db_path=forum_db_adapter,
            download_dir=config['download_dir'],
            log_callback=None  # Module uses universal logger
        )

        # Create MoveManager - uses its own universal logger now
        move_manager = MoveManager(
            log_callback=None,  # Module uses universal logger
            notifier=None,  # Notifications handled by main process
            unified_db=unified_db,
            face_recognition_enabled=True  # Enable face recognition for forum files
        )

        forum_name = config['forum_name']
        forum_config = config['forum_config']
        temp_dir = Path(config['temp_dir'])
        dest_dir = Path(config['dest_dir'])

        # Create directories
        temp_dir.mkdir(parents=True, exist_ok=True)
        dest_dir.mkdir(parents=True, exist_ok=True)

        # Login if credentials provided
        if forum_config.get('username') and forum_config.get('password'):
            success = forum_module.login(
                forum_name=forum_name,
                username=forum_config['username'],
                password=forum_config['password'],
                forum_url=forum_config.get('forum_url'),
                forum_type=forum_config.get('forum_type'),
                cloudflare_enabled=forum_config.get('cloudflare_enabled', False)
            )
            if not success:
                return {'status': 'error', 'message': 'Login failed', 'count': 0}

        # Use monitor_search to find threads
        search_result = forum_module.monitor_search(
            forum_name=forum_name,
            search_query=forum_config.get('search_query', ''),
            search_url=forum_config.get('search_url'),
            forum_url=forum_config.get('forum_url'),
            auto_track_days=forum_config.get('auto_track_days', 30),
            base_download_path=str(temp_dir),
            destination_path=str(dest_dir),
            username=forum_config.get('username'),
            password=forum_config.get('password'),
            newer_than_days=forum_config.get('newer_than_days', 3),
            older_than_days=forum_config.get('older_than_days'),
            external_only=forum_config.get('external_only', True),
            check_frequency_hours=0,
            cloudflare_enabled=forum_config.get('cloudflare_enabled', False)
        )

        # Download the threads that were found
        total_images = 0
        new_threads = search_result.get('new_threads', 0)
        total_results = search_result.get('total_results', 0)

        if search_result.get('status') == 'success' and search_result.get('results'):
            stderr_log(f"Downloading {len(search_result['results'])} new threads...")

            for thread_result in search_result['results']:
                thread_url = thread_result.get('url')
                if thread_url:
                    # Download thread in a separate thread to avoid Playwright async conflict
                    # Playwright sync API doesn't work inside asyncio loops
                    import concurrent.futures

                    # Closure reads thread_url from the enclosing loop; this is
                    # safe only because the future is resolved before the next
                    # iteration rebinds thread_url (future.result() below).
                    def _download_in_thread():
                        return forum_module.download_thread(
                            thread_url=thread_url,
                            forum_name=forum_name,
                            download_images=True,
                            base_download_path=str(temp_dir),
                            destination_path=str(dest_dir),
                            username=forum_config.get('username'),
                            password=forum_config.get('password'),
                            external_only=forum_config.get('external_only', True),
                            skip_file_move=True,  # Keep files in temp for move_manager
                            cloudflare_enabled=forum_config.get('cloudflare_enabled', False),
                            defer_database=True,  # Defer recording until after file move
                            auto_track_days=forum_config.get('auto_track_days', 30)
                        )

                    # max_workers=1: threads are downloaded strictly one at a
                    # time; the executor exists only to escape the asyncio loop.
                    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
                        future = executor.submit(_download_in_thread)
                        download_result = future.result()

                    if download_result.get('status') == 'success':
                        images_downloaded = download_result.get('images_downloaded', 0)

                        # Use move_manager to process files (with face recognition)
                        if images_downloaded > 0:
                            thread_temp_dir = download_result.get('thread_dir')
                            final_dir = download_result.get('final_dir')

                            if thread_temp_dir and final_dir:
                                stderr_log(f"Processing {images_downloaded} images with face recognition...")

                                # Start batch context for proper database tracking
                                thread_title = thread_result.get('title', 'Unknown Thread')
                                move_manager.start_batch(
                                    platform='forums',
                                    source=forum_name,
                                    content_type='image',
                                    search_term=thread_title
                                )

                                # Move files with face recognition filtering
                                # preserve_if_no_timestamp=True preserves the timestamps already set on files
                                stats = move_manager.move_files_batch(
                                    source_dir=thread_temp_dir,
                                    dest_dir=final_dir,
                                    file_timestamps=None,
                                    extensions=['.jpg', '.jpeg', '.png', '.gif', '.webp', '.mp4', '.webm'],
                                    preserve_if_no_timestamp=True
                                )

                                # Capture file lists BEFORE end_batch() clears them
                                # These will be returned for notification with correct paths
                                for f in move_manager.moved_files:
                                    all_final_files.append({
                                        'file_path': f.get('file_path'),
                                        'filename': f.get('filename'),
                                        'content_type': f.get('content_type') or 'image'
                                    })
                                for f in move_manager.review_queue_files:
                                    all_review_files.append({
                                        'file_path': f.get('file_path'),
                                        'filename': f.get('filename'),
                                        'content_type': f.get('content_type') or 'image'
                                    })

                                # End batch to finalize database records
                                move_manager.end_batch()

                                # Record pending downloads after successful move
                                # This ensures database records only exist for files that made it to final destination
                                pending_downloads = forum_module.get_pending_downloads()
                                if pending_downloads and stats.get('moved', 0) > 0:
                                    for download in pending_downloads:
                                        try:
                                            # Update file_path to final destination
                                            filename = download.get('filename')
                                            if filename:
                                                # Sanitize filename to prevent path traversal
                                                safe_filename = Path(filename).name  # Strip any directory components
                                                actual_path = Path(final_dir) / safe_filename
                                                # Validate path is within final_dir (defense in depth)
                                                try:
                                                    actual_path.resolve().relative_to(Path(final_dir).resolve())
                                                except ValueError:
                                                    # NOTE(review): f-string has no placeholder, so the log
                                                    # always prints the literal "(unknown)" — looks like the
                                                    # offending filename was meant to be interpolated; confirm.
                                                    stderr_log(f"Path traversal attempt blocked: (unknown)", "warning")
                                                    continue
                                                # Only record files that actually exist at the destination
                                                # (face-recognition filtering may have diverted them).
                                                if actual_path.exists():
                                                    forum_db_adapter.record_download(
                                                        url=download.get('url'),
                                                        thread_id=download.get('thread_id'),
                                                        post_id=download.get('post_id'),
                                                        filename=safe_filename,
                                                        metadata=download.get('metadata'),
                                                        file_path=str(actual_path),
                                                        post_date=download.get('post_date')
                                                    )
                                        except Exception as e:
                                            # Best-effort per-file recording: one failure must not
                                            # abort the remaining pending downloads.
                                            stderr_log(f"Failed to record pending download: {e}")
                                    forum_module.clear_pending_downloads()
                                    stderr_log(f"Recorded {len(pending_downloads)} downloads to database after move")

                                # Update total with files that went to FINAL destination only (not review queue)
                                # stats['moved'] includes ALL moves, stats['review_queue'] tracks review queue moves
                                actual_final_count = stats.get('moved', 0) - stats.get('review_queue', 0)
                                total_images += actual_final_count
                                stderr_log(f"Moved {actual_final_count} images to final destination, {stats.get('review_queue', 0)} to review queue")
                            else:
                                # Fallback if dirs not provided
                                total_images += images_downloaded
                                stderr_log(f"Downloaded {images_downloaded} images from thread: {thread_result.get('title', 'Unknown')[:60]}...")
                        else:
                            stderr_log(f"No new images in thread: {thread_result.get('title', 'Unknown')[:60]}...")
                    else:
                        stderr_log(f"Failed to download thread: {thread_result.get('title', 'Unknown')}")

        # Cleanup
        forum_module.cleanup()

        # Log to monitor (success)
        forum_name = config['forum_name']
        log_download_result('forums', forum_name, total_images, error=None)

        # Return result with file lists for notification
        return {
            'status': 'success',
            'count': total_images,
            'new_threads': new_threads,
            'total_results': total_results,
            'final_files': all_final_files,  # Files that went to final destination
            'review_files': all_review_files  # Files that went to review queue
        }

    except Exception as e:
        forum_name = config.get('forum_name', 'unknown')
        stderr_log(f"Forum download error: {e}", "error")
        import traceback
        stderr_log(traceback.format_exc(), "error")
        # Log failure to monitor
        log_download_result('forums', forum_name, 0, error=str(e))
        return {
            'status': 'error',
            'message': str(e),
            'count': 0
        }
    finally:
        # Restore stdout before returning (so JSON output goes to real stdout)
        sys.stdout = original_stdout
        # Explicitly close database connection before subprocess exits
        if unified_db:
            try:
                unified_db.close()
                stderr_log("Database connection closed")
            except Exception as e:
                stderr_log(f"Error closing database: {e}")


if __name__ == '__main__':
    try:
        # Read config from stdin
        config_json = sys.stdin.read()
        config = json.loads(config_json)
        # Run forum download
        result = run_forum_download(config)
    except Exception as e:
        # Ensure we always output valid JSON even on early errors
        import traceback
        stderr_log(f"Fatal subprocess error: {e}")
        stderr_log(traceback.format_exc())
        result = {
            'status': 'error',
            'message': f'Subprocess error: {str(e)}',
            'count': 0
        }

    # Output result as JSON - ensure flushing before exit
    stderr_log(f"Subprocess complete, outputting JSON result: status={result.get('status')}, count={result.get('count')}")
    print(json.dumps(result), flush=True)
    sys.stdout.flush()
    sys.exit(0 if result['status'] == 'success' else 1)