5118 lines
244 KiB
Python
Executable File
5118 lines
244 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Media Downloader - Complete unified media downloading system
|
|
Integrates all functionality: Instagram (InstaLoader + FastDL), TikTok, Forums with batch downloads,
|
|
Download Manager, EXIF timestamps, ImageBam support, and gallery-dl fallback
|
|
"""
|
|
|
|
# IMPORTANT: Apply nest_asyncio FIRST before any other imports that might create event loops
|
|
# This allows Playwright sync API to work inside asyncio contexts
|
|
try:
|
|
import nest_asyncio
|
|
nest_asyncio.apply()
|
|
except ImportError:
|
|
pass
|
|
|
|
# Suppress pkg_resources deprecation warning from face_recognition_models
|
|
import warnings
|
|
warnings.filterwarnings("ignore", message=".*pkg_resources is deprecated.*")
|
|
|
|
import os
|
|
import sys
|
|
|
|
# Bootstrap database backend (must be before any sqlite3 imports)
|
|
sys.path.insert(0, str(__import__('pathlib').Path(__file__).parent))
|
|
import modules.db_bootstrap # noqa: E402,F401
|
|
import json
|
|
import sqlite3
|
|
import logging
|
|
import argparse
|
|
import time
|
|
import subprocess
|
|
import random
|
|
import gc
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional, Any, Set, Tuple
|
|
import requests
|
|
from dataclasses import dataclass
|
|
|
|
# sqlite3 type hooks: datetime values are written as ISO-8601 text, and columns
# declared "datetime" are parsed back into datetime objects whenever a
# connection is opened with detect_types=sqlite3.PARSE_DECLTYPES.
sqlite3.register_adapter(datetime, lambda value: value.isoformat())
sqlite3.register_converter("datetime", lambda raw: datetime.fromisoformat(raw.decode()))

# Make both the project root and its modules/ directory importable
# (inserted in this order, so modules/ ends up first on sys.path).
_script_dir = Path(__file__).parent
for _import_dir in (_script_dir, _script_dir / 'modules'):
    sys.path.insert(0, str(_import_dir))
del _script_dir, _import_dir
|
|
|
|
# ============================================================================
|
|
# MODULE IMPORTS
|
|
# ============================================================================
|
|
|
|
try:
    from modules.instaloader_module import InstaLoaderModule as InstaLoaderDownloader
    from modules.fastdl_module import FastDLDownloader
    from modules.imginn_module import ImgInnDownloader
    from modules.imginn_api_module import ImgInnAPIDownloader
    from modules.instagram_client_module import InstagramClientDownloader
    from modules.toolzu_module import ToolzuDownloader
    from modules.snapchat_scraper import SnapchatDirectScraper
    from modules.snapchat_client_module import SnapchatClientDownloader
    from modules.tiktok_module import TikTokDownloader
    from modules.forum_downloader import ForumDownloader
    from modules.coppermine_module import CoppermineDownloader
    from modules.download_manager import DownloadManager, DownloadItem
    from modules.settings_manager import SettingsManager
    from modules.date_utils import DateHandler, extract_date, update_timestamps
    from modules.move_module import MoveManager
    from modules.unified_database import UnifiedDatabase
    from modules.universal_logger import get_logger
    from modules.forum_db_adapter import ForumDatabaseAdapter
    from modules.pushover_notifier import PushoverNotifier, create_notifier_from_config
    from modules.service_health_monitor import ServiceHealthMonitor
    from modules.dependency_updater import DependencyUpdater
    from modules.downloader_monitor import get_monitor
    from modules.activity_status import get_activity_manager
except ImportError as e:
    # A project dependency is missing: try to install the requirements, then
    # exit so the next run starts with a clean import state.
    print(f"Error importing modules: {e}")
    print("Installing missing dependencies...")
    # Resolve requirements.txt next to this script rather than relative to the
    # current working directory, which differs when launched from cron,
    # systemd or another directory.
    requirements_file = Path(__file__).parent / 'requirements.txt'
    subprocess.run([sys.executable, "-m", "pip", "install", "-r", str(requirements_file)])
    sys.exit(1)
|
|
|
|
# The UnifiedDatabase is now imported from modules.unified_database
|
|
|
|
# Keeping a minimal legacy adapter for backward compatibility
|
|
class DatabaseAdapter:
    """Thin legacy facade kept so older call sites using ``self.db`` keep working.

    Download and file-inventory records are forwarded to the UnifiedDatabase;
    the ``download_queue`` table is managed directly with short-lived sqlite3
    connections to the same database file (``unified_db.db_path``).
    """

    def __init__(self, unified_db):
        self.unified_db = unified_db
        self.db_path = unified_db.db_path

    def _open_connection(self):
        """Open a fresh connection to the shared database file; the caller closes it."""
        return sqlite3.connect(self.db_path, detect_types=sqlite3.PARSE_DECLTYPES)

    def add_download(self, platform: str, media_id: str, **kwargs):
        """Record a finished download via ``unified_db.record_download``.

        ``url`` falls back to *media_id* and ``source`` to ``username`` (or '').
        Returns whatever ``record_download`` returns.
        """
        filepath = kwargs.get('filepath')
        # NOTE(review): filename receives the full filepath as well — confirm
        # whether a basename was intended here.
        return self.unified_db.record_download(
            url=kwargs.get('url', media_id),
            platform=platform,
            source=kwargs.get('username', ''),
            content_type=kwargs.get('content_type'),
            filename=filepath,
            file_path=filepath,
            file_size=kwargs.get('file_size'),
            file_hash=kwargs.get('file_hash'),
            post_date=kwargs.get('post_date'),
            metadata=kwargs.get('metadata', {}),
        )

    def is_downloaded(self, platform: str, media_id: str) -> bool:
        """Return whether *media_id* is already recorded for *platform*."""
        return self.unified_db.is_downloaded(media_id, platform=platform)

    def add_to_queue(self, url: str, source: str, referer: str = None, priority: int = 0):
        """Insert *url* into download_queue (INSERT OR IGNORE: rows that would
        violate a uniqueness constraint are skipped silently)."""
        connection = self._open_connection()
        try:
            connection.cursor().execute(
                "INSERT OR IGNORE INTO download_queue (url, source, referer, priority) "
                "VALUES (?, ?, ?, ?)",
                (url, source, referer, priority),
            )
            connection.commit()
        finally:
            connection.close()

    def get_pending_downloads(self, source: str = None, limit: int = 100) -> List[Dict]:
        """Return up to *limit* pending queue rows, highest priority first, then oldest id.

        When *source* is truthy only rows for that source are returned. Each row is
        a dict with keys id, url, referer, source, priority.
        """
        conditions = ["status = 'pending'"]
        params = []
        if source:
            conditions.append("source = ?")
            params.append(source)
        params.append(limit)

        sql = (
            "SELECT id, url, referer, source, priority "
            "FROM download_queue "
            f"WHERE {' AND '.join(conditions)} "
            "ORDER BY priority DESC, id ASC "
            "LIMIT ?"
        )
        columns = ('id', 'url', 'referer', 'source', 'priority')

        connection = self._open_connection()
        try:
            cursor = connection.cursor()
            cursor.execute(sql, tuple(params))
            return [dict(zip(columns, row)) for row in cursor.fetchall()]
        finally:
            connection.close()

    def update_queue_status(self, queue_id: int, status: str, error: str = None):
        """Set the status of queue row *queue_id*.

        'completed' also stamps completed_date; 'failed' also increments
        attempts and stores *error*; any other status only updates the column.
        """
        if status == 'completed':
            sql = ("UPDATE download_queue "
                   "SET status = ?, completed_date = CURRENT_TIMESTAMP "
                   "WHERE id = ?")
            params = (status, queue_id)
        elif status == 'failed':
            sql = ("UPDATE download_queue "
                   "SET status = ?, attempts = attempts + 1, error = ? "
                   "WHERE id = ?")
            params = (status, error, queue_id)
        else:
            sql = "UPDATE download_queue SET status = ? WHERE id = ?"
            params = (status, queue_id)

        connection = self._open_connection()
        try:
            connection.cursor().execute(sql, params)
            connection.commit()
        finally:
            connection.close()

    def upsert_file_inventory(self, file_path: str, filename: str, platform: str,
                              source: str = None, content_type: str = None,
                              file_size: int = None, location: str = None,
                              method: str = None):
        """Pass-through to ``unified_db.upsert_file_inventory`` (all arguments as keywords)."""
        return self.unified_db.upsert_file_inventory(
            file_path=file_path,
            filename=filename,
            platform=platform,
            source=source,
            content_type=content_type,
            file_size=file_size,
            location=location,
            method=method,
        )
|
|
|
|
# ============================================================================
|
|
# LAZY MODULE DICTIONARY
|
|
# ============================================================================
|
|
|
|
class LazyModuleDict:
    """Dict-like container that defers module instantiation until first access.

    Factories are stored via register() and only called the first time their
    key is read through ``__getitem__``/``get()``. Values assigned directly with
    ``d[key] = value`` (e.g. the 'pending' sentinel, or a replacement module)
    take precedence over both factories and already-created instances.
    release() frees a module after use; a registered factory re-creates it on
    the next access.
    """

    def __init__(self):
        self._factories = {}  # key -> zero-arg callable returning a module instance
        self._instances = {}  # key -> instance created from its factory (on first access)
        self._direct = {}     # key -> directly assigned value; overrides the two above

    def register(self, key, factory):
        """Register a lazy factory for *key*; it is not called until first access."""
        self._factories[key] = factory

    def __contains__(self, key):
        return key in self._factories or key in self._instances or key in self._direct

    def __getitem__(self, key):
        # Directly assigned values win, so assignment really replaces a module
        # even after its factory has already produced an instance.
        if key in self._direct:
            return self._direct[key]
        # Return an existing factory-created instance
        if key in self._instances:
            return self._instances[key]
        # Instantiate from factory on first access
        if key in self._factories:
            instance = self._factories[key]()
            self._instances[key] = instance
            return instance
        raise KeyError(key)

    def __setitem__(self, key, value):
        """Direct assignment (used for the 'pending' sentinel or replacing a module).

        The assigned value overrides any registered factory and any instance
        already created from it.
        """
        self._direct[key] = value

    def __delitem__(self, key):
        """Forget *key* entirely (factory, instance and direct value); nothing is closed."""
        self._factories.pop(key, None)
        self._instances.pop(key, None)
        self._direct.pop(key, None)

    def get(self, key, default=None):
        """Like ``d[key]`` (may instantiate a factory) but returns *default* if unknown."""
        try:
            return self[key]
        except KeyError:
            return default

    def keys(self):
        """Return the set of all known keys (registered, instantiated or assigned)."""
        return set(self._factories.keys()) | set(self._instances.keys()) | set(self._direct.keys())

    def items(self):
        """Return (key, value) pairs; note this instantiates every pending factory."""
        return [(k, self[k]) for k in self.keys()]

    def values(self):
        """Return all values; note this instantiates every pending factory."""
        return [self[k] for k in self.keys()]

    def __iter__(self):
        return iter(self.keys())

    def __len__(self):
        return len(self.keys())

    def release(self, key):
        """Release *key*'s module to free memory.

        Both a factory-created instance and a directly assigned value are
        dropped, and each gets its ``close()``/``cleanup()`` called when it has
        them (best-effort: errors are ignored). A registered factory stays, so
        the module is re-created on next access; a key without a factory is
        removed entirely.
        """
        instance = self._instances.pop(key, None)
        direct = self._direct.pop(key, None)
        self._dispose(instance)
        # Avoid closing the same object twice if it was stored in both places
        if direct is not instance:
            self._dispose(direct)

    @staticmethod
    def _dispose(obj):
        """Best-effort close()/cleanup() of a released value; None and sentinels are no-ops."""
        if obj is None:
            return
        for method_name in ('close', 'cleanup'):
            if hasattr(obj, method_name):
                try:
                    getattr(obj, method_name)()
                except Exception:
                    # Release is best-effort: a failing cleanup must not abort the caller
                    pass
|
|
|
|
# ============================================================================
|
|
# MEDIA DOWNLOADER ORCHESTRATOR
|
|
# ============================================================================
|
|
|
|
class MediaDownloader:
|
|
"""Main orchestrator for all download modules"""
|
|
|
|
def __init__(self, config_path: str = None, enable_notifications: bool = False, unified_db=None):
    """Initialize Media Downloader with configuration.

    Settings are loaded from the database (via SettingsManager), all download
    modules are registered lazily, and leftover temp files from previous runs
    are cleaned up at the end.

    Args:
        config_path: Path to configuration file; stored and forwarded to the
            scheduler that tracks manual runs.
        enable_notifications: True in scheduler mode (Pushover notifications,
            service health monitoring and dependency auto-updates are set up);
            False for manual runs, where those three attributes stay None.
        unified_db: Optional existing UnifiedDatabase instance to share
            (avoids creating a duplicate DB pool).

    Raises:
        ValueError: if the database contains no settings (raised by _load_config).
    """
    # Setup paths
    self.base_path = Path(__file__).parent
    self.config_path = config_path  # Stored for scheduler initialization

    # Same database as the web API; computed once and reused below
    db_path = self.base_path / 'database' / 'media_downloader.db'

    # Settings live in the database
    self.settings_manager = SettingsManager(str(db_path))
    self.config = self._load_config()

    # Setup logging
    self.logger = get_logger('Media_Downloader')

    # Reuse a caller-provided UnifiedDatabase to avoid duplicate connection pools
    if unified_db is not None:
        self.unified_db = unified_db
        self.logger.debug("Reusing shared UnifiedDatabase instance", module="Core")
    else:
        self.unified_db = UnifiedDatabase(str(db_path), use_pool=True, pool_size=5)
    # Backward compatibility adapter for older call sites
    self.db = DatabaseAdapter(self.unified_db)

    # Scheduler-only services: notifier, health monitor, dependency updater.
    # They remain None for manual runs.
    self.notifier = None
    self.health_monitor = None
    self.dependency_updater = None
    if enable_notifications:
        # Pushover notifier (None when disabled in config)
        self.notifier = create_notifier_from_config(self.config, unified_db=self.unified_db)
        if self.notifier:
            self.logger.info("Pushover notifications enabled (scheduler mode)", module="Core")
        else:
            self.logger.debug("Pushover notifications disabled in config", module="Core")

        # Service health monitor; error monitoring settings drive the push alert delay
        health_config = self.config.get('service_monitoring', {})
        error_monitoring_config = self.settings_manager.get('error_monitoring', {}) if self.settings_manager else {}
        self.health_monitor = ServiceHealthMonitor(
            config=health_config,
            error_monitoring_config=error_monitoring_config,
            pushover_notifier=self.notifier,
            scheduler_mode=True
        )
        if health_config.get('enabled', True):
            self.logger.info("Service health monitoring enabled (scheduler mode)", module="Core")
        else:
            self.logger.debug("Service health monitoring disabled in config", module="Core")

        # Dependency updater; both config keys are checked for backwards compatibility
        update_config = self.config.get('dependency_updater', {}) or self.config.get('dependency_updates', {})
        self.dependency_updater = DependencyUpdater(
            config=update_config,
            pushover_notifier=self.notifier,
            scheduler_mode=True
        )
        if update_config.get('enabled', True):
            self.logger.info("Dependency auto-updates enabled (scheduler mode)", module="Core")
        else:
            self.logger.debug("Dependency auto-updates disabled in config", module="Core")
    else:
        self.logger.debug("Pushover notifications disabled (manual mode)", module="Core")
        self.logger.debug("Service health monitoring disabled (manual mode)", module="Core")
        self.logger.debug("Dependency auto-updates disabled (manual mode)", module="Core")

    # Move Manager with notifier and database for file hash deduplication
    self.move_manager = MoveManager(
        log_callback=self._log_callback,
        notifier=self.notifier,
        unified_db=self.unified_db
    )

    # Downloader monitor
    self.monitor = get_monitor(self.unified_db, self.settings_manager)
    self.logger.debug("Downloader monitor initialized", module="Core")

    # Download Manager for multi-threaded downloads (DB bookkeeping is done here, not there)
    self.download_manager = DownloadManager(
        max_workers=5,
        rate_limit=0.5,
        timeout=30,
        use_database=False,
        show_progress=True
    )

    # OMDB API key for date extraction (optional)
    omdb_key = os.environ.get('OMDB_API_KEY')
    if omdb_key:
        DateHandler.set_omdb_api_key(omdb_key)

    # Register download modules lazily (instantiated on first access)
    self.modules = LazyModuleDict()
    self._init_modules()

    # Scheduler used only to track manual runs: state is loaded, scheduler not started
    self.scheduler = None
    try:
        from modules.scheduler import DownloadScheduler
        # NOTE(review): when config_path is None this passes the string 'None' —
        # confirm DownloadScheduler tolerates/ignores it.
        self.scheduler = DownloadScheduler(
            config_path=str(self.config_path),
            log_callback=self._log_callback,
            unified_db=self.unified_db,
            settings_manager=self.settings_manager
        )
        self.scheduler._load_state()
    except Exception as e:
        self.logger.debug(f"Scheduler not initialized for manual run: {e}", module="Core")

    # Clean up any leftover temp files from previous runs
    self.cleanup_all_temp_dirs()

    self.logger.info("Media Downloader initialized successfully", module="Core")
|
|
|
|
def _load_config(self) -> Dict:
|
|
"""Load configuration from database settings"""
|
|
config = self.settings_manager.get_all()
|
|
if not config:
|
|
raise ValueError("No settings found in database. Please configure via web interface.")
|
|
return config
|
|
|
|
def _log_callback(self, *args):
|
|
"""Centralized logging callback for modules - handles multiple signatures
|
|
|
|
NOTE: With universal logging now integrated in all modules, this callback
|
|
is kept for backwards compatibility but does NOT re-log messages since
|
|
each module has its own logger that already handles file logging.
|
|
This prevents duplicate log entries.
|
|
"""
|
|
# Since all modules now use universal_logger, they handle their own logging
|
|
# We don't need to re-log here as that creates duplicate entries
|
|
# This callback is kept only for backwards compatibility
|
|
pass
|
|
|
|
def _run_subprocess_with_streaming_logs(self, wrapper_path: str, config: Dict, timeout: int = 600) -> Tuple[int, str]:
|
|
"""Run a subprocess wrapper with real-time log streaming
|
|
|
|
Args:
|
|
wrapper_path: Path to the subprocess wrapper script
|
|
config: Configuration dictionary to pass to subprocess
|
|
timeout: Timeout in seconds
|
|
|
|
Returns:
|
|
Tuple of (return_code, stdout_output)
|
|
"""
|
|
venv_python = self.base_path / 'venv' / 'bin' / 'python'
|
|
|
|
# Prepare environment with Playwright browser path
|
|
import os
|
|
env = os.environ.copy()
|
|
env['PLAYWRIGHT_BROWSERS_PATH'] = '/root/.cache/ms-playwright'
|
|
|
|
# Run subprocess with real-time log streaming
|
|
# start_new_session=True creates a new process group so we can kill the
|
|
# entire tree (including Playwright browser processes) on shutdown
|
|
process = subprocess.Popen(
|
|
[str(venv_python), wrapper_path],
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
bufsize=1, # Line buffered
|
|
env=env,
|
|
start_new_session=True
|
|
)
|
|
|
|
# Use communicate() to send config via stdin and read stdout/stderr.
|
|
# communicate() handles writing, closing stdin, and reading pipes
|
|
# in one call, avoiding I/O deadlock and double-close issues.
|
|
import sys
|
|
try:
|
|
stdout, stderr = process.communicate(input=json.dumps(config), timeout=timeout)
|
|
except subprocess.TimeoutExpired:
|
|
# Kill entire process group (subprocess + any browser children)
|
|
import os, signal as _signal
|
|
try:
|
|
os.killpg(process.pid, _signal.SIGKILL)
|
|
except OSError:
|
|
process.kill()
|
|
process.communicate() # Drain pipes after kill
|
|
raise
|
|
|
|
# Forward stderr to our own stderr for real-time monitoring
|
|
if stderr:
|
|
for line in stderr.splitlines():
|
|
if line.strip():
|
|
print(line.rstrip(), file=sys.stderr, flush=True)
|
|
|
|
return process.returncode, stdout
|
|
|
|
def _init_modules(self):
|
|
"""Register all download modules as lazy factories (instantiated on first access)"""
|
|
|
|
# Instagram - InstaLoader
|
|
if self.config.get('instagram', {}).get('enabled'):
|
|
ig_config = self.config.get('instagram', {})
|
|
if ig_config.get('method') == 'instaloader':
|
|
self.modules.register('instagram', lambda: InstaLoaderDownloader(
|
|
username=ig_config.get('username'),
|
|
password=ig_config.get('password'),
|
|
session_file=ig_config.get('session_file'),
|
|
totp_secret=ig_config.get('totp_secret'),
|
|
use_database=True,
|
|
log_callback=self._log_callback,
|
|
unified_db=self.unified_db,
|
|
require_valid_session=ig_config.get('require_valid_session', True)
|
|
))
|
|
self.logger.info("InstaLoader module registered (lazy)", module="Instagram")
|
|
self.logger.debug(f"InstaLoader config: session_file={ig_config.get('session_file')}, require_valid_session={ig_config.get('require_valid_session', True)}", module="Instagram")
|
|
|
|
# Instagram - FastDL (as fallback or alternative)
|
|
if self.config.get('fastdl', {}).get('enabled'):
|
|
fastdl_config = self.config.get('fastdl', {})
|
|
high_res_enabled = fastdl_config.get('high_res', False)
|
|
self.modules.register('fastdl', lambda: FastDLDownloader(
|
|
headless=True,
|
|
show_progress=True,
|
|
use_database=True,
|
|
log_callback=self._log_callback,
|
|
unified_db=self.unified_db,
|
|
high_res=high_res_enabled
|
|
))
|
|
self.logger.info("FastDL module registered (lazy)", module="Instagram")
|
|
self.logger.debug(f"FastDL config: headless=True, show_progress=True, use_database=True, high_res={high_res_enabled}", module="Instagram")
|
|
|
|
# ImgInn
|
|
if self.config.get('imginn', {}).get('enabled'):
|
|
imginn_config = self.config.get('imginn', {})
|
|
self.modules.register('imginn', lambda: ImgInnDownloader(
|
|
cookie_file=imginn_config.get('cookie_file', '/opt/media-downloader/cookies/imginn_cookies.json'),
|
|
headless=False,
|
|
show_progress=True,
|
|
use_database=True,
|
|
log_callback=self._log_callback,
|
|
unified_db=self.unified_db
|
|
))
|
|
self.logger.info("ImgInn module registered (lazy)", module="Instagram")
|
|
self.logger.debug(f"ImgInn config: cookie_file={imginn_config.get('cookie_file')}", module="Instagram")
|
|
|
|
# ImgInn API (curl_cffi-based, no Playwright)
|
|
if self.config.get('imginn_api', {}).get('enabled'):
|
|
self.modules.register('imginn_api', lambda: ImgInnAPIDownloader(
|
|
headless=True,
|
|
show_progress=True,
|
|
use_database=True,
|
|
log_callback=self._log_callback,
|
|
unified_db=self.unified_db
|
|
))
|
|
self.logger.info("ImgInn API module registered (lazy)", module="Instagram")
|
|
|
|
# Instagram Client (direct API)
|
|
if self.config.get('instagram_client', {}).get('enabled'):
|
|
self.modules.register('instagram_client', lambda: InstagramClientDownloader(
|
|
show_progress=True,
|
|
use_database=True,
|
|
log_callback=self._log_callback,
|
|
unified_db=self.unified_db
|
|
))
|
|
self.logger.info("Instagram Client (API) module registered (lazy)", module="Instagram")
|
|
|
|
# Toolzu (1920x1440 resolution Instagram downloader)
|
|
if self.config.get('toolzu', {}).get('enabled'):
|
|
toolzu_config = self.config.get('toolzu', {})
|
|
self.modules.register('toolzu', lambda: ToolzuDownloader(
|
|
headless=False,
|
|
show_progress=True,
|
|
use_database=True,
|
|
log_callback=self._log_callback,
|
|
unified_db=self.unified_db,
|
|
cookie_file=toolzu_config.get('cookie_file', '/opt/media-downloader/cookies/toolzu_cookies.json')
|
|
))
|
|
self.logger.info("Toolzu module registered (lazy)", module="Toolzu")
|
|
self.logger.debug(f"Toolzu config: cookies={bool(toolzu_config.get('cookie_file'))}", module="Toolzu")
|
|
|
|
# Snapchat (Direct Scraper)
|
|
if self.config.get('snapchat', {}).get('enabled'):
|
|
self.modules.register('snapchat', lambda: SnapchatDirectScraper(
|
|
headless=False,
|
|
show_progress=True,
|
|
use_database=True,
|
|
log_callback=self._log_callback,
|
|
unified_db=self.unified_db
|
|
))
|
|
self.logger.info("Snapchat Direct Scraper registered (lazy)", module="Snapchat")
|
|
|
|
# Snapchat Client (direct API, no Playwright)
|
|
if self.config.get('snapchat_client', {}).get('enabled'):
|
|
self.modules.register('snapchat_client', lambda: SnapchatClientDownloader(
|
|
show_progress=True,
|
|
use_database=True,
|
|
log_callback=self._log_callback,
|
|
unified_db=self.unified_db
|
|
))
|
|
self.logger.info("Snapchat Client (API) module registered (lazy)", module="Snapchat")
|
|
|
|
# TikTok
|
|
if self.config.get('tiktok', {}).get('enabled'):
|
|
self.modules.register('tiktok', lambda: TikTokDownloader(
|
|
log_callback=self._log_callback,
|
|
unified_db=self.unified_db
|
|
))
|
|
self.logger.info("TikTok module registered (lazy)", module="TikTok")
|
|
self.logger.debug("TikTok module configured with unified database", module="TikTok")
|
|
|
|
# Forums - store config but don't initialize yet
|
|
if self.config.get('forums', {}).get('enabled'):
|
|
# Store forum configurations for later initialization
|
|
self.forum_configs = {}
|
|
|
|
# Support new format with configs array
|
|
if 'configs' in self.config['forums']:
|
|
for forum_config in self.config['forums']['configs']:
|
|
forum_name = forum_config.get('name')
|
|
if forum_name and forum_config.get('enabled'):
|
|
self.forum_configs[forum_name] = forum_config
|
|
self.logger.info(f"Forum configured: {forum_name}", module="Forum")
|
|
else:
|
|
# Support old format (backwards compatibility)
|
|
for forum_name, forum_config in self.config['forums'].items():
|
|
if forum_name not in ['enabled', 'configs'] and isinstance(forum_config, dict) and forum_config.get('enabled'):
|
|
self.forum_configs[forum_name] = forum_config
|
|
self.logger.info(f"Forum configured: {forum_name}", module="Forum")
|
|
|
|
# Mark forums as available but not initialized
|
|
if self.forum_configs:
|
|
self.modules['forums'] = 'pending' # Will initialize when needed
|
|
|
|
# Coppermine galleries - log configuration
|
|
if self.config.get('coppermine', {}).get('enabled'):
|
|
galleries = self.config.get('coppermine', {}).get('galleries', [])
|
|
for gallery in galleries:
|
|
if gallery.get('enabled', True): # Default to enabled if not specified
|
|
gallery_name = gallery.get('name', 'Unnamed')
|
|
self.logger.info(f"Coppermine gallery configured: {gallery_name}", module="Coppermine")
|
|
if galleries:
|
|
self.modules['coppermine'] = 'pending' # Will initialize when needed
|
|
|
|
def _update_scheduler_for_manual_run(self, platform: str, username: str = None):
|
|
"""Update scheduler next_run time when a manual run happens (thread-safe)"""
|
|
if not self.scheduler:
|
|
return
|
|
|
|
# Build task_id to match scheduler format
|
|
if username:
|
|
task_id = f"{platform}:{username}"
|
|
else:
|
|
task_id = platform
|
|
|
|
# Get the interval from config first (read-only, no lock needed)
|
|
interval_hours = 6 # Default
|
|
|
|
if platform == 'instagram':
|
|
# Check for per-account interval
|
|
accounts = self.config.get('instagram', {}).get('accounts', [])
|
|
for account in accounts:
|
|
if account.get('username') == username:
|
|
interval_hours = account.get('check_interval_hours', 6)
|
|
break
|
|
else:
|
|
# Fallback to global setting
|
|
interval_hours = self.config.get('instagram', {}).get('check_interval_hours', 6)
|
|
|
|
elif platform == 'fastdl':
|
|
# FastDL uses global interval
|
|
interval_hours = self.config.get('fastdl', {}).get('check_interval_hours', 6)
|
|
|
|
elif platform == 'imginn':
|
|
# ImgInn uses global interval (both regular and phrase search users)
|
|
interval_hours = self.config.get('imginn', {}).get('check_interval_hours', 6)
|
|
|
|
elif platform == 'imginn_api':
|
|
# ImgInn API uses global interval
|
|
interval_hours = self.config.get('imginn_api', {}).get('check_interval_hours', 6)
|
|
|
|
elif platform == 'toolzu':
|
|
# Toolzu uses global interval
|
|
interval_hours = self.config.get('toolzu', {}).get('check_interval_hours', 4)
|
|
|
|
elif platform == 'snapchat':
|
|
# Snapchat uses global interval
|
|
interval_hours = self.config.get('snapchat', {}).get('check_interval_hours', 6)
|
|
|
|
elif platform == 'tiktok':
|
|
# Check for per-account interval
|
|
accounts = self.config.get('tiktok', {}).get('accounts', [])
|
|
for account in accounts:
|
|
if account.get('username') == username:
|
|
interval_hours = account.get('check_interval_hours', 12)
|
|
break
|
|
else:
|
|
# Fallback to global setting
|
|
interval_hours = self.config.get('tiktok', {}).get('check_interval_hours', 12)
|
|
|
|
elif platform == 'forum':
|
|
# Forums have per-forum intervals
|
|
forums = self.config.get('forums', {}).get('configs', [])
|
|
for forum in forums:
|
|
if forum.get('name') == username: # username is forum name in this case
|
|
interval_hours = forum.get('check_interval_hours', 12)
|
|
break
|
|
else:
|
|
interval_hours = 12 # Default for forums
|
|
|
|
elif platform == 'coppermine':
|
|
# Coppermine galleries have per-gallery intervals
|
|
galleries = self.config.get('coppermine', {}).get('galleries', [])
|
|
for gallery in galleries:
|
|
if gallery.get('name') == username: # username is gallery name in this case
|
|
interval_hours = gallery.get('check_interval_hours', 12)
|
|
break
|
|
else:
|
|
interval_hours = self.config.get('coppermine', {}).get('check_interval_hours', 12)
|
|
|
|
# Use scheduler's lock to safely update all shared state atomically
|
|
with self.scheduler._state_lock:
|
|
now = datetime.now()
|
|
self.scheduler.last_run_times[task_id] = now
|
|
next_run = self.scheduler.calculate_next_run(task_id, interval_hours)
|
|
self.scheduler.scheduled_tasks[task_id] = next_run
|
|
self.scheduler._save_state()
|
|
|
|
self.logger.debug(f"Updated scheduler for manual run of {task_id}, interval: {interval_hours}h, next run: {next_run.strftime('%Y-%m-%d %H:%M:%S')}", module="Core")
|
|
|
|
def download_instagram_unified(self):
    """Download Instagram content using the unified ``instagram_unified`` configuration.

    Each account is processed through the scraper(s) assigned to its enabled
    content types (``scraper_assignment`` maps content type -> scraper key).
    For every scraper run, ``self.config[scraper_key]`` is temporarily
    replaced with a single-account configuration and the previous value (or
    absence) of that key is always restored afterwards, even on error. An
    optional phrase search is then run through the scraper assigned to posts.

    Returns:
        Total number of files reported by the scraper methods; 0 when the
        unified configuration is disabled.
    """
    unified = self.config.get('instagram_unified', {})
    if not unified.get('enabled'):
        self.logger.warning("Instagram unified is not enabled", module="Instagram")
        return 0

    scraper_assignment = unified.get('scraper_assignment', {})
    content_types = unified.get('content_types', {})
    accounts = unified.get('accounts', [])
    phrase_search = unified.get('phrase_search', {})
    user_delay = unified.get('user_delay_seconds', 20)

    # Scraper key -> name of the method on self that runs that scraper
    scraper_methods = {
        'fastdl': 'download_fastdl',
        'imginn_api': 'download_imginn_api',
        'imginn': 'download_imginn',
        'toolzu': 'download_toolzu',
        'instagram_client': 'download_instagram_client',
        'instagram': 'download_instagram',
    }
    ct_keys = ['posts', 'stories', 'reels', 'tagged']
    missing = object()  # Marks "scraper had no config section before the run"

    def run_with_temp_config(scraper_key, method_name, scraper_cfg):
        """Call self.<method_name>() with scraper_cfg swapped in for self.config[scraper_key].

        The previous value is restored (or the key removed if it did not
        exist) even when the download method raises.
        """
        previous = self.config.get(scraper_key, missing)
        self.config[scraper_key] = scraper_cfg
        try:
            return getattr(self, method_name)()
        finally:
            if previous is missing:
                self.config.pop(scraper_key, None)
            else:
                self.config[scraper_key] = previous

    total_count = 0
    active_accounts = [a for a in accounts if any(a.get(ct) for ct in ct_keys)]
    # Randomize account order to avoid detection patterns
    random.shuffle(active_accounts)
    self.logger.info(f"Instagram: processing {len(active_accounts)} accounts", module="Instagram")

    for idx, account in enumerate(active_accounts):
        username = account.get('username', '')
        if not username:
            continue

        # Group this account's enabled content types by their assigned scraper
        scraper_groups = {}
        for ct in ct_keys:
            if not account.get(ct):
                continue
            if not content_types.get(ct, {}).get('enabled'):
                continue
            scraper_key = scraper_assignment.get(ct)
            if scraper_key:
                scraper_groups.setdefault(scraper_key, []).append(ct)

        if not scraper_groups:
            continue

        self.logger.info(f"Instagram: @{username} ({idx+1}/{len(active_accounts)})", module="Instagram")

        for scraper_key, cts in scraper_groups.items():
            method_name = scraper_methods.get(scraper_key)
            if not method_name or not hasattr(self, method_name):
                continue

            try:
                # Single-account config layered over the scraper's own settings
                scraper_cfg = dict(self.config.get(scraper_key) or {})
                scraper_cfg['enabled'] = True
                if scraper_key == 'instagram':
                    scraper_cfg['accounts'] = [{'username': username, 'check_interval_hours': 8, 'run_at_start': False}]
                else:
                    scraper_cfg['usernames'] = [username]

                for ct in ct_keys:
                    if ct in cts:
                        ct_global = content_types.get(ct, {})
                        scraper_cfg[ct] = {
                            'enabled': True,
                            'days_back': ct_global.get('days_back', 7),
                            'destination_path': ct_global.get('destination_path', ''),
                            'temp_dir': f'temp/{scraper_key}/{ct}',
                        }
                    else:
                        scraper_cfg[ct] = {'enabled': False}

                scraper_cfg['phrase_search'] = {'enabled': False, 'usernames': [], 'phrases': []}

                count = run_with_temp_config(scraper_key, method_name, scraper_cfg)
                if count and count > 0:
                    self.logger.info(f" {scraper_key}: {count} files for @{username} ({', '.join(cts)})", module="Instagram")
                    total_count += count
            except Exception as e:
                self.logger.error(f"Error running {scraper_key} for @{username}: {e}", module="Instagram")

        if idx < len(active_accounts) - 1 and user_delay > 0:
            time.sleep(user_delay)

    # Run phrase search through the scraper assigned to posts
    if phrase_search.get('enabled') and phrase_search.get('phrases'):
        posts_scraper = scraper_assignment.get('posts', 'imginn_api')
        method_name = scraper_methods.get(posts_scraper)
        if method_name and hasattr(self, method_name):
            try:
                scraper_cfg = dict(self.config.get(posts_scraper) or {})
                scraper_cfg['enabled'] = True
                scraper_cfg['usernames'] = []
                ps_usernames = sorted(set(a['username'] for a in accounts if a.get('username') and a.get('posts')))
                scraper_cfg['phrase_search'] = {
                    'enabled': True,
                    'download_all': phrase_search.get('download_all', True),
                    'usernames': ps_usernames,
                    'phrases': phrase_search.get('phrases', []),
                    'case_sensitive': phrase_search.get('case_sensitive', False),
                    'match_all': phrase_search.get('match_all', False),
                }
                posts_ct = content_types.get('posts', {})
                scraper_cfg['posts'] = {
                    'enabled': posts_ct.get('enabled', True),
                    'days_back': posts_ct.get('days_back', 7),
                    'destination_path': posts_ct.get('destination_path', ''),
                    'temp_dir': f'temp/{posts_scraper}/posts',
                }
                for ct in ['stories', 'reels', 'tagged']:
                    scraper_cfg[ct] = {'enabled': False}

                self.logger.info(f"Instagram: running phrase search via {posts_scraper}", module="Instagram")
                count = run_with_temp_config(posts_scraper, method_name, scraper_cfg)
                if count and count > 0:
                    self.logger.info(f"Phrase search: {count} files via {posts_scraper}", module="Instagram")
                    total_count += count
            except Exception as e:
                self.logger.error(f"Error running phrase search: {e}", module="Instagram")

    self.logger.info(f"Instagram: completed, {total_count} total files", module="Instagram")
    return total_count
|
|
|
|
def download_instagram(self):
    """Download Instagram content using the InstaLoader module.

    Every configured account is processed for each enabled content type
    (posts, stories, reels). Accounts come from ``instagram.accounts`` (new
    format) or ``instagram.usernames`` (old format). FastDL is handled
    separately by ``download_fastdl()``.

    Returns:
        Total number of files moved. Returns 0 in three cases: no Instagram
        module is loaded, InstaLoader is disabled, or the session check fails.
    """
    if 'instagram' not in self.modules and 'fastdl' not in self.modules:
        self.logger.warning("No Instagram module available", module="Instagram")
        return 0

    total_downloaded = 0  # Track total files downloaded
    ig_config = self.config.get('instagram', {})
    # Under the unified config, the unified runner owns scheduler updates
    unified_enabled = self.config.get('instagram_unified', {}).get('enabled')

    # Try InstaLoader first
    if 'instagram' in self.modules and ig_config.get('enabled'):
        downloader = self.modules['instagram']

        # Check if module is ready (session valid if required)
        if not downloader.is_ready():
            self.logger.info("Skipping Instagram downloads - session validation failed", module="Instagram")
            return 0

        # Support both old format (usernames list) and new format (accounts list).
        # Always build a fresh list: shuffling the list held by the config would
        # reorder self.config in place on every run.
        if 'accounts' in ig_config:
            usernames_to_process = [acc.get('username') for acc in ig_config['accounts'] if acc.get('username')]
        elif 'usernames' in ig_config:
            usernames_to_process = list(ig_config['usernames'] or [])
        else:
            usernames_to_process = []

        # Randomize account order to avoid detection patterns
        random.shuffle(usernames_to_process)

        for username in usernames_to_process:
            self.logger.info(f"Processing Instagram user: {username}", module="Instagram")

            # Start batch context for proper source tracking during file moves
            self.move_manager.start_batch(
                platform='instagram',
                source=username,
                content_type='media'
            )
            try:
                for content_type in ('posts', 'stories', 'reels'):
                    # A null section in the config is treated as "disabled"
                    ct_config = ig_config.get(content_type) or {}
                    if ct_config.get('enabled'):
                        count = self._download_instagram_content(
                            downloader, username, content_type, ct_config
                        )
                        total_downloaded += count or 0
            finally:
                # Always close the batch, even if a download raised
                self.move_manager.end_batch()

            # Update scheduler for this manual run (skip under unified config)
            if not unified_enabled:
                self._update_scheduler_for_manual_run('instagram', username)

    # Note: FastDL is now called separately via download_fastdl() method

    # Update scheduler for manual run (platform level if no specific users)
    if total_downloaded > 0 and not unified_enabled:
        self._update_scheduler_for_manual_run('instagram')

    # Trigger Immich scan only if files were downloaded
    if total_downloaded > 0 and self.config.get('immich', {}).get('scan_after_download'):
        self.logger.info(f"Downloaded {total_downloaded} Instagram files, triggering Immich scan", module="Instagram")
        self.trigger_immich_scan()
    elif total_downloaded == 0:
        self.logger.info("No Instagram files downloaded, skipping Immich scan", module="Instagram")

    # Release ML models once after all users are processed
    self.move_manager.release_models()
    return total_downloaded
|
|
|
|
def _download_instagram_content(self, downloader, username: str, content_type: str, config: Dict) -> int:
    """Download one content type for one user via InstaLoader and move it into place.

    Args:
        downloader: InstaLoader module instance exposing ``download(...)``.
        username: Instagram account to download.
        content_type: Content type passed through to the downloader
            (e.g. ``'posts'``, ``'stories'``, ``'reels'``).
        config: Per-content-type settings. ``temp_dir`` and
            ``destination_path`` are required. ``days_back`` defaults to 3
            and ``extensions`` defaults to ``[]``.

    Returns:
        Number of files moved (0 when nothing new was found or on error).
    """
    from modules.monitor_wrapper import log_download_result

    temp_dir = self.base_path / config['temp_dir']
    # Don't create temp_dir here - InstaLoader will create it when needed

    dest_dir = Path(config['destination_path'])
    dest_dir.mkdir(parents=True, exist_ok=True)

    days_back = config.get('days_back', 3)

    try:
        # InstaLoaderModule has a single download method with content_type parameter.
        # A None result is treated as "nothing downloaded" rather than crashing
        # the numeric comparison below and being reported as a failure.
        count = downloader.download(
            username=username,
            output_dir=str(temp_dir),
            content_type=content_type,
            max_downloads=20,
            days_back=days_back
        ) or 0

        self.logger.info(f"Downloaded {count} {content_type} for {username}", module="Core")

        # Move files with timestamp preservation only if files were downloaded
        if count > 0 and temp_dir.exists():
            # Add username subdirectory for organization
            user_dest = dest_dir / username
            user_dest.mkdir(parents=True, exist_ok=True)
            moved = self._move_and_process_files(
                temp_dir, user_dest,
                config.get('extensions', []),
                platform='instagram',
                source=username,
                content_type=content_type
            ) or 0
            # Log to monitor (success)
            log_download_result('instagram', username, moved, error=None)
            return moved

        # Log to monitor (no new content, still successful)
        log_download_result('instagram', username, 0, error=None)
        return 0

    except Exception as e:
        self.logger.error(f"Error downloading {content_type} for {username}: {e}", module="Error")
        # Log to monitor (failure)
        log_download_result('instagram', username, 0, error=str(e))
        return 0
    finally:
        # ALWAYS run aggressive cleanup
        self._cleanup_temp_dir(temp_dir)
|
|
|
|
def _download_fastdl_content(self, downloader, username: str, content_type: str, config: Dict, phrase_config: Dict = None) -> int:
|
|
"""Download content via FastDL using subprocess isolation
|
|
|
|
Returns:
|
|
Number of files moved
|
|
"""
|
|
temp_dir = self.base_path / config['temp_dir']
|
|
# Clean temp directory BEFORE download to avoid mixing files from different users
|
|
self._cleanup_temp_dir(temp_dir)
|
|
|
|
dest_dir = Path(config['destination_path'])
|
|
dest_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Prepare configuration for subprocess
|
|
fastdl_config = self.config.get('fastdl', {})
|
|
subprocess_config = {
|
|
'username': username,
|
|
'content_type': content_type,
|
|
'temp_dir': str(temp_dir),
|
|
'days_back': config.get('days_back', 3),
|
|
'max_downloads': 50,
|
|
'headless': True,
|
|
'db_path': str(self.base_path / 'database' / 'media_downloader.db'),
|
|
'high_res': fastdl_config.get('high_res', False),
|
|
'phrase_config': phrase_config
|
|
}
|
|
|
|
try:
|
|
subprocess_wrapper = str(self.base_path / 'wrappers' / 'fastdl_subprocess_wrapper.py')
|
|
|
|
# Run FastDL download in subprocess with real-time log streaming
|
|
returncode, stdout = self._run_subprocess_with_streaming_logs(
|
|
subprocess_wrapper,
|
|
subprocess_config,
|
|
timeout=300 # 5 minute timeout
|
|
)
|
|
|
|
# Parse JSON result from stdout
|
|
if returncode == 0 and stdout:
|
|
try:
|
|
fastdl_result = json.loads(stdout.strip())
|
|
count = fastdl_result.get('count', 0)
|
|
pending_downloads = fastdl_result.get('pending_downloads', [])
|
|
except json.JSONDecodeError as e:
|
|
self.logger.error(f"Failed to parse FastDL subprocess result for {username}: {e}", module="Instagram")
|
|
self.logger.debug(f"Raw stdout: {stdout[:500] if stdout else 'None'}", module="Instagram")
|
|
return 0
|
|
|
|
if count:
|
|
self.logger.info(f"Downloaded {count} {content_type} for {username} via FastDL (subprocess)", module="Instagram")
|
|
|
|
# Move files with timestamp preservation only if files were downloaded
|
|
if count > 0 and temp_dir.exists():
|
|
# Extract search term from phrase_config if present
|
|
search_term = None
|
|
if phrase_config and phrase_config.get('phrases'):
|
|
search_term = ', '.join(phrase_config.get('phrases', []))
|
|
|
|
# Add username subdirectory for organization
|
|
user_dest = dest_dir / username
|
|
user_dest.mkdir(parents=True, exist_ok=True)
|
|
moved = self._move_and_process_files(
|
|
temp_dir, user_dest,
|
|
['.jpg', '.jpeg', '.png', '.webp', '.heic', '.mp4', '.mov'],
|
|
platform='instagram',
|
|
source=username,
|
|
content_type=content_type,
|
|
search_term=search_term
|
|
)
|
|
|
|
# Record pending downloads after move completes
|
|
# Record regardless of moved count - files may have gone to recycle as duplicates
|
|
if pending_downloads:
|
|
self._record_pending_instagram_downloads(pending_downloads, dest_dir, username, method='fastdl')
|
|
|
|
return moved
|
|
|
|
return 0
|
|
else:
|
|
self.logger.warning(f"FastDL subprocess failed for {username} {content_type} (code {returncode})", module="Instagram")
|
|
# Try to parse error message from stdout
|
|
if stdout:
|
|
try:
|
|
error_result = json.loads(stdout.strip())
|
|
if error_result.get('message'):
|
|
self.logger.warning(f"FastDL error: {error_result['message'][:200]}", module="Instagram")
|
|
except json.JSONDecodeError:
|
|
self.logger.debug(f"Raw subprocess output: {stdout[:300]}", module="Instagram")
|
|
return 0
|
|
|
|
except subprocess.TimeoutExpired:
|
|
self.logger.warning(f"FastDL subprocess timeout for {username} {content_type}", module="Instagram")
|
|
return 0
|
|
except Exception as e:
|
|
self.logger.warning(f"FastDL subprocess error for {username} {content_type}: {e}", module="Instagram")
|
|
import traceback
|
|
self.logger.debug(traceback.format_exc())
|
|
return 0
|
|
finally:
|
|
# ALWAYS run aggressive cleanup
|
|
self._cleanup_temp_dir(temp_dir)
|
|
|
|
def _download_fastdl_content_multi(self, downloader, username: str, content_types: list, configs: dict, phrase_configs: dict = None) -> dict:
|
|
"""Download multiple content types via FastDL in a single subprocess/browser session.
|
|
|
|
Args:
|
|
downloader: FastDL downloader module reference (unused, kept for API consistency)
|
|
username: Instagram username
|
|
content_types: List of content types e.g. ['stories', 'reels', 'posts']
|
|
configs: Dict {content_type: config_dict} with temp_dir, destination_path, etc.
|
|
phrase_configs: Dict {content_type: phrase_config} or None
|
|
|
|
Returns:
|
|
Dict {content_type: moved_count}
|
|
"""
|
|
moved_counts = {}
|
|
temp_dirs = {}
|
|
dest_dirs = {}
|
|
output_dirs = {}
|
|
|
|
# Prepare dirs for each content type
|
|
for ct in content_types:
|
|
config = configs[ct]
|
|
temp_dir = self.base_path / config['temp_dir']
|
|
self._cleanup_temp_dir(temp_dir)
|
|
temp_dirs[ct] = temp_dir
|
|
output_dirs[ct] = str(temp_dir)
|
|
|
|
dest_dir = Path(config['destination_path'])
|
|
dest_dir.mkdir(parents=True, exist_ok=True)
|
|
dest_dirs[ct] = dest_dir
|
|
|
|
# Build subprocess config
|
|
fastdl_config = self.config.get('fastdl', {})
|
|
subprocess_config = {
|
|
'username': username,
|
|
'content_types': content_types,
|
|
'output_dirs': output_dirs,
|
|
'days_back': configs[content_types[0]].get('days_back', 3),
|
|
'max_downloads': 50,
|
|
'headless': True,
|
|
'db_path': str(self.base_path / 'database' / 'media_downloader.db'),
|
|
'high_res': fastdl_config.get('high_res', False),
|
|
'phrase_configs': phrase_configs
|
|
}
|
|
|
|
try:
|
|
subprocess_wrapper = str(self.base_path / 'wrappers' / 'fastdl_subprocess_wrapper.py')
|
|
|
|
# Single subprocess for all content types (longer timeout since it does more work)
|
|
returncode, stdout = self._run_subprocess_with_streaming_logs(
|
|
subprocess_wrapper,
|
|
subprocess_config,
|
|
timeout=420 # 7 minute timeout for multi-content
|
|
)
|
|
|
|
if returncode == 0 and stdout:
|
|
try:
|
|
fastdl_result = json.loads(stdout.strip())
|
|
results = fastdl_result.get('results', {})
|
|
except json.JSONDecodeError as e:
|
|
self.logger.error(f"Failed to parse FastDL multi subprocess result for {username}: {e}", module="Instagram")
|
|
self.logger.debug(f"Raw stdout: {stdout[:500] if stdout else 'None'}", module="Instagram")
|
|
return {ct: 0 for ct in content_types}
|
|
|
|
# Process each content type's results
|
|
for ct in content_types:
|
|
ct_result = results.get(ct, {})
|
|
count = ct_result.get('count', 0)
|
|
pending_downloads = ct_result.get('pending_downloads', [])
|
|
temp_dir = temp_dirs[ct]
|
|
dest_dir = dest_dirs[ct]
|
|
|
|
if count:
|
|
self.logger.info(f"Downloaded {count} {ct} for {username} via FastDL (multi)", module="Instagram")
|
|
|
|
if count > 0 and temp_dir.exists():
|
|
user_dest = dest_dir / username
|
|
user_dest.mkdir(parents=True, exist_ok=True)
|
|
moved = self._move_and_process_files(
|
|
temp_dir, user_dest,
|
|
['.jpg', '.jpeg', '.png', '.webp', '.heic', '.mp4', '.mov'],
|
|
platform='instagram',
|
|
source=username,
|
|
content_type=ct
|
|
)
|
|
|
|
if pending_downloads:
|
|
self._record_pending_instagram_downloads(pending_downloads, dest_dir, username, method='fastdl')
|
|
|
|
moved_counts[ct] = moved
|
|
else:
|
|
moved_counts[ct] = 0
|
|
|
|
return moved_counts
|
|
else:
|
|
self.logger.warning(f"FastDL multi subprocess failed for {username} (code {returncode})", module="Instagram")
|
|
if stdout:
|
|
try:
|
|
error_result = json.loads(stdout.strip())
|
|
if error_result.get('message'):
|
|
self.logger.warning(f"FastDL error: {error_result['message'][:200]}", module="Instagram")
|
|
except json.JSONDecodeError:
|
|
self.logger.debug(f"Raw subprocess output: {stdout[:300]}", module="Instagram")
|
|
return {ct: 0 for ct in content_types}
|
|
|
|
except subprocess.TimeoutExpired:
|
|
self.logger.warning(f"FastDL multi subprocess timeout for {username}", module="Instagram")
|
|
return {ct: 0 for ct in content_types}
|
|
except Exception as e:
|
|
self.logger.warning(f"FastDL multi subprocess error for {username}: {e}", module="Instagram")
|
|
import traceback
|
|
self.logger.debug(traceback.format_exc())
|
|
return {ct: 0 for ct in content_types}
|
|
finally:
|
|
for ct, temp_dir in temp_dirs.items():
|
|
self._cleanup_temp_dir(temp_dir)
|
|
|
|
def _auto_run_toolzu_upgrade(self, username: str):
|
|
"""
|
|
Automatically run Toolzu quality upgrade for a specific user after FastDL completes
|
|
|
|
Args:
|
|
username: Instagram username to upgrade
|
|
"""
|
|
# Check if Toolzu is enabled and configured for this user
|
|
toolzu_config = self.config.get('toolzu', {})
|
|
if not toolzu_config.get('enabled'):
|
|
self.logger.debug(f"Toolzu not enabled, skipping auto-upgrade for {username}", module="Toolzu")
|
|
return
|
|
|
|
if username not in toolzu_config.get('usernames', []):
|
|
self.logger.debug(f"User {username} not in Toolzu config, skipping auto-upgrade", module="Toolzu")
|
|
return
|
|
|
|
if 'toolzu' not in self.modules:
|
|
self.logger.warning(f"Toolzu module not available for auto-upgrade of {username}", module="Toolzu")
|
|
return
|
|
|
|
self.logger.info(f"🔄 Auto-triggering Toolzu quality upgrade for {username}", module="Toolzu")
|
|
|
|
downloader = self.modules['toolzu']
|
|
|
|
# Download posts via Toolzu (only posts need quality upgrade)
|
|
if toolzu_config.get('posts', {}).get('enabled'):
|
|
config = toolzu_config['posts']
|
|
temp_dir = self.base_path / config.get('temp_dir', 'temp/toolzu/posts')
|
|
|
|
try:
|
|
count = downloader.download(
|
|
username=username,
|
|
content_type='posts',
|
|
days_back=config.get('days_back', 3),
|
|
max_downloads=15,
|
|
output_dir=str(temp_dir)
|
|
)
|
|
|
|
if count:
|
|
self.logger.info(f"Downloaded {count} posts for {username} via Toolzu (auto-upgrade)", module="Toolzu")
|
|
|
|
# Run quality upgrade
|
|
if self.config.get('download_settings', {}).get('move_to_destination', True):
|
|
dest_path = Path(config.get('destination_path'))
|
|
moved_count = self._merge_fastdl_toolzu_quality_upgrade(
|
|
username, 'posts', temp_dir, dest_path
|
|
)
|
|
self.logger.info(f"Auto-upgrade processed {moved_count} files", module="Core")
|
|
else:
|
|
self.logger.debug(f"No new Toolzu files for {username}, quality already up-to-date", module="Toolzu")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Toolzu auto-upgrade error for {username}: {e}", module="Toolzu")
|
|
|
|
def download_fastdl(self):
    """Download Instagram content via FastDL (alternative to InstaLoader).

    Regular accounts (``fastdl.usernames``) get every enabled content type
    (posts/stories/reels) in one subprocess session per user. Phrase-search
    accounts (``fastdl.phrase_search.usernames``) are searched in posts only.

    Returns:
        Total number of files moved. Returns 0 when the module is missing or
        FastDL is disabled.

    Raises:
        Exception: any unexpected error is re-raised after it is recorded
        with the health monitor (when configured).
    """
    if 'fastdl' not in self.modules:
        self.logger.warning("FastDL module not available", module="Instagram")
        if self.health_monitor:
            self.health_monitor.record_failure('fastdl', 'module_not_available')
        return 0

    total_downloaded = 0
    fastdl_config = self.config.get('fastdl', {})

    if not fastdl_config.get('enabled'):
        self.logger.debug("FastDL is disabled in config", module="Instagram")
        return 0

    try:
        downloader = self.modules['fastdl']

        # Check if phrase search is enabled
        phrase_config = fastdl_config.get('phrase_search') or {}

        # Work on copies: shuffling the lists stored in the config would reorder
        # self.config in place on every run. Randomized order avoids detection.
        usernames = list(fastdl_config.get('usernames', []))
        random.shuffle(usernames)
        phrase_usernames = list(phrase_config.get('usernames', [])) if phrase_config.get('enabled') else []

        # Progress tracking
        total_accounts = len(usernames) + len(phrase_usernames)
        accounts_processed = 0
        try:
            activity_mgr = get_activity_manager()
        except Exception:
            activity_mgr = None
        # Unified runner manages its own activity progress
        if self.config.get('instagram_unified', {}).get('enabled'):
            activity_mgr = None

        # Get delay between users to avoid Cloudflare rate limiting
        user_delay_min = fastdl_config.get('user_delay_seconds', 20)
        user_delay_max = user_delay_min + 20  # Add 20s variability

        # Enabled content types are identical for every user; resolve them once
        enabled_configs = {
            ct: fastdl_config[ct]
            for ct in ('posts', 'stories', 'reels')
            if (fastdl_config.get(ct) or {}).get('enabled')
        }
        enabled_types = list(enabled_configs)

        for idx, username in enumerate(usernames):
            accounts_processed += 1
            if activity_mgr:
                try:
                    activity_mgr.update_account_progress(accounts_processed, total_accounts)
                    activity_mgr.update_account_name(username)
                    activity_mgr.update_status("Starting")
                except Exception:
                    pass

            # Add delay before each user (including first) to avoid Cloudflare rate limiting
            if idx == 0:
                # Initial delay before first user - give Cloudflare time to settle
                delay = random.uniform(10, 20)
                self.logger.info(f"Cloudflare rate limit: initial wait {delay:.1f}s before first user", module="FastDL")
            else:
                delay = random.uniform(user_delay_min, user_delay_max)
                self.logger.info(f"Cloudflare rate limit: waiting {delay:.1f}s before next user ({idx+1}/{len(usernames)})", module="FastDL")
            time.sleep(delay)

            self.logger.info(f"Processing Instagram user via FastDL: {username}", module="Instagram")

            # One batch per user so all content types produce a single notification
            self.move_manager.start_batch(
                platform='instagram',
                source=username,
                content_type='media'  # Generic since we're combining posts/stories/reels
            )
            user_downloaded = 0
            try:
                if enabled_types:
                    if activity_mgr:
                        try:
                            types_label = ', '.join(ct.capitalize() for ct in enabled_types)
                            activity_mgr.update_status(f"Downloading {types_label}")
                        except Exception:
                            pass

                    moved_counts = self._download_fastdl_content_multi(
                        downloader, username, enabled_types, enabled_configs
                    )
                    user_downloaded = sum(count or 0 for count in moved_counts.values())
                    total_downloaded += user_downloaded

                    if activity_mgr:
                        try:
                            activity_mgr.update_status(f"Downloaded {user_downloaded} files")
                        except Exception:
                            pass
            finally:
                # Always close the batch, even if the download raised
                self.move_manager.end_batch()

            # Auto-import to private gallery if this account is mapped
            if user_downloaded > 0 and not getattr(self, '_skip_gallery_bridge', False):
                try:
                    from modules.scraper_gallery_bridge import get_crypto, on_download_complete
                    crypto = get_crypto()
                    if crypto:
                        on_download_complete(f"fastdl:{username}", user_downloaded, self.unified_db, crypto)
                except Exception as e:
                    self.logger.debug(f"Gallery bridge: {e}", module="Instagram")

            # NOTE: the scheduler is updated once for the "fastdl:all" batch task
            # (below), not per user. Toolzu quality upgrades are scheduled separately
            # (see _auto_run_toolzu_upgrade).
            gc.collect()
            # No extra delay here: the top-of-loop delay already rate-limits users.

        # Process phrase search usernames if enabled
        if phrase_usernames:
            # Randomize phrase search account order to avoid detection
            random.shuffle(phrase_usernames)
            self.logger.info(f"Processing phrase search for users: {phrase_usernames}", module="Core")
            self.logger.info(f"Looking for phrases: {phrase_config.get('phrases', [])}", module="Core")

            posts_config = fastdl_config.get('posts') or {}
            last_idx = len(phrase_usernames) - 1
            for idx, username in enumerate(phrase_usernames):
                accounts_processed += 1
                if activity_mgr:
                    try:
                        activity_mgr.update_account_progress(accounts_processed, total_accounts)
                        activity_mgr.update_account_name(username)
                        activity_mgr.update_status("Phrase search")
                    except Exception:
                        pass

                self.logger.info(f"Searching {username} for phrase matches via FastDL", module="Instagram")

                # Only search in posts (not reels or stories)
                if posts_config.get('enabled'):
                    count = self._download_fastdl_content(downloader, username, 'posts', posts_config, phrase_config=phrase_config)
                    total_downloaded += count or 0

                # Delay between phrase search accounts (compare by position, not
                # by name, so duplicate usernames are handled correctly)
                if idx < last_idx:
                    delay = random.uniform(20, 40)
                    self.logger.debug(f"Waiting {delay:.1f}s before next phrase search account", module="Instagram")
                    time.sleep(delay)

        # Update scheduler for the fastdl:all batch task (always update, even if no downloads)
        # Skip if running under unified config — unified has its own scheduler task
        if not self.config.get('instagram_unified', {}).get('enabled'):
            self._update_scheduler_for_manual_run('fastdl', 'all')

        # Trigger Immich scan only if files were downloaded
        if total_downloaded > 0 and self.config.get('immich', {}).get('scan_after_download'):
            self.logger.info(f"Downloaded {total_downloaded} FastDL files, triggering Immich scan", module="Instagram")
            self.trigger_immich_scan()

        # Record success if monitoring enabled
        if self.health_monitor:
            self.health_monitor.record_success('fastdl')

        return total_downloaded

    except Exception as e:
        self.logger.error(f"FastDL download error: {e}", module="Instagram")
        # Record failure if monitoring enabled
        if self.health_monitor:
            # Determine failure type from exception
            error_str = str(e).lower()
            if 'cloudflare' in error_str or 'cf_clearance' in error_str:
                reason = 'cloudflare'
            elif 'timeout' in error_str:
                reason = 'timeout'
            elif '403' in error_str:
                reason = 'forbidden'
            elif '429' in error_str:
                reason = 'rate_limit'
            else:
                reason = 'unknown'
            self.health_monitor.record_failure('fastdl', reason)
        raise  # Re-raise exception to maintain existing error handling
    finally:
        # Release ML models once after all users are processed, also when the run aborts
        self.move_manager.release_models()
|
|
|
|
def download_imginn_api(self):
    """Download Instagram content via ImgInn API (curl_cffi-based, no Playwright).

    Regular accounts (``imginn_api.usernames``) get every enabled content type
    (posts/stories/reels/tagged) in one call per user. Phrase-search accounts
    (``imginn_api.phrase_search.usernames``) are handled in one of two ways:
    with ``download_all`` they get posts/stories/reels downloaded outright;
    otherwise their posts are searched for the configured phrases.

    Returns:
        Total number of files downloaded. Returns 0 when the module is missing
        or ImgInn API is disabled.

    Raises:
        Exception: any unexpected error is re-raised after it is recorded
        with the health monitor (when configured).
    """
    if 'imginn_api' not in self.modules:
        self.logger.warning("ImgInn API module not available", module="Instagram")
        if self.health_monitor:
            self.health_monitor.record_failure('imginn_api', 'module_not_available')
        return 0

    total_downloaded = 0
    ia_config = self.config.get('imginn_api', {})

    if not ia_config.get('enabled'):
        self.logger.debug("ImgInn API is disabled in config", module="Instagram")
        return 0

    try:
        downloader = self.modules['imginn_api']

        # Check if phrase search is enabled
        phrase_config = ia_config.get('phrase_search') or {}

        # Work on copies: shuffling the lists stored in the config would reorder
        # self.config in place on every run. Randomized order avoids detection.
        usernames = list(ia_config.get('usernames', []))
        random.shuffle(usernames)
        phrase_usernames = list(phrase_config.get('usernames', [])) if phrase_config.get('enabled') else []

        # Progress tracking
        total_accounts = len(usernames) + len(phrase_usernames)
        accounts_processed = 0
        try:
            activity_mgr = get_activity_manager()
        except Exception:
            activity_mgr = None
        # Unified runner manages its own activity progress
        if self.config.get('instagram_unified', {}).get('enabled'):
            activity_mgr = None

        # Get delay between users to avoid Cloudflare rate limiting
        user_delay_min = ia_config.get('user_delay_seconds', 20)
        user_delay_max = user_delay_min + 20

        # Enabled content types are identical for every user; resolve them once
        enabled_configs = {
            ct: ia_config[ct]
            for ct in ('posts', 'stories', 'reels', 'tagged')
            if (ia_config.get(ct) or {}).get('enabled')
        }
        enabled_types = list(enabled_configs)

        for idx, username in enumerate(usernames):
            accounts_processed += 1
            if activity_mgr:
                try:
                    activity_mgr.update_account_progress(accounts_processed, total_accounts)
                    activity_mgr.update_account_name(username)
                    activity_mgr.update_status("Starting")
                except Exception:
                    pass

            # Add delay before each user to avoid Cloudflare rate limiting
            if idx == 0:
                delay = random.uniform(10, 20)
                self.logger.info(f"Cloudflare rate limit: initial wait {delay:.1f}s before first user", module="ImgInnAPI")
            else:
                delay = random.uniform(user_delay_min, user_delay_max)
                self.logger.info(f"Cloudflare rate limit: waiting {delay:.1f}s before next user ({idx+1}/{len(usernames)})", module="ImgInnAPI")
            time.sleep(delay)

            self.logger.info(f"Processing Instagram user via ImgInn API: {username}", module="Instagram")

            # One batch per user so all content types produce a single notification
            self.move_manager.start_batch(
                platform='instagram',
                source=username,
                content_type='media'
            )
            user_downloaded = 0
            try:
                if enabled_types:
                    if activity_mgr:
                        try:
                            types_label = ', '.join(ct.capitalize() for ct in enabled_types)
                            activity_mgr.update_status(f"Downloading {types_label}")
                        except Exception:
                            pass

                    moved_counts = self._download_imginn_api_content_multi(
                        downloader, username, enabled_types, enabled_configs
                    )
                    user_downloaded = sum(count or 0 for count in moved_counts.values())
                    total_downloaded += user_downloaded

                    if activity_mgr:
                        try:
                            activity_mgr.update_status(f"Downloaded {user_downloaded} files")
                        except Exception:
                            pass
            finally:
                # Always close the batch, even if the download raised
                self.move_manager.end_batch()

            # Auto-import to private gallery if this account is mapped
            if user_downloaded > 0 and not getattr(self, '_skip_gallery_bridge', False):
                try:
                    from modules.scraper_gallery_bridge import get_crypto, on_download_complete
                    crypto = get_crypto()
                    if crypto:
                        on_download_complete(f"imginn_api:{username}", user_downloaded, self.unified_db, crypto)
                except Exception as e:
                    self.logger.debug(f"Gallery bridge: {e}", module="Instagram")

            gc.collect()

        # Process phrase search usernames if enabled
        if phrase_usernames:
            download_all = phrase_config.get('download_all', False)
            random.shuffle(phrase_usernames)

            if download_all:
                self.logger.info(f"Processing download-all for users: {phrase_usernames}", module="Core")
            else:
                self.logger.info(f"Processing phrase search for users: {phrase_usernames}", module="Core")
                self.logger.info(f"Looking for phrases: {phrase_config.get('phrases', [])}", module="Core")

            last_idx = len(phrase_usernames) - 1
            for idx, username in enumerate(phrase_usernames):
                accounts_processed += 1
                if activity_mgr:
                    try:
                        activity_mgr.update_account_progress(accounts_processed, total_accounts)
                        activity_mgr.update_account_name(username)
                        activity_mgr.update_status("Phrase search" if not download_all else "Starting")
                    except Exception:
                        pass

                if idx == 0:
                    delay = random.uniform(10, 20)
                    self.logger.info(f"Cloudflare rate limit: initial wait {delay:.1f}s before first user", module="ImgInnAPI")
                else:
                    delay = random.uniform(user_delay_min, user_delay_max)
                    self.logger.info(f"Cloudflare rate limit: waiting {delay:.1f}s before next user ({idx+1}/{len(phrase_usernames)})", module="ImgInnAPI")
                time.sleep(delay)

                # Per-user count, so the gallery bridge below only fires for
                # users that actually downloaded something in this iteration
                user_downloaded = 0
                if download_all:
                    # Download all content types (posts, stories, reels) for download_all users
                    content_type_count = 0
                    for content_type in ('posts', 'stories', 'reels'):
                        if not (ia_config.get(content_type) or {}).get('enabled'):
                            continue
                        if content_type_count > 0:
                            delay = random.uniform(3, 6)
                            self.logger.info(f"Rate limiting: waiting {delay:.1f}s before {content_type}", module="ImgInnAPI")
                            time.sleep(delay)
                        content_type_count += 1

                        if activity_mgr:
                            try:
                                activity_mgr.update_status(f"Downloading {content_type}")
                            except Exception:
                                pass

                        config = ia_config[content_type]
                        try:
                            count = self._download_imginn_api_content(
                                downloader, username, content_type, config, phrase_config=None
                            )
                            user_downloaded += count or 0
                        except Exception as e:
                            self.logger.error(f"ImgInn API download_all error for {username}/{content_type}: {e}", module="Instagram")
                else:
                    self.logger.info(f"Searching {username} for phrase matches via ImgInn API", module="Instagram")

                    # Only search in posts (not reels, stories, or tagged)
                    if (ia_config.get('posts') or {}).get('enabled'):
                        config = ia_config['posts']
                        count = self._download_imginn_api_content(
                            downloader, username, 'posts', config, phrase_config=phrase_config
                        )
                        user_downloaded += count or 0

                total_downloaded += user_downloaded

                # Auto-import to private gallery if this account is mapped (download_all only)
                if download_all and user_downloaded > 0 and not getattr(self, '_skip_gallery_bridge', False):
                    try:
                        from modules.scraper_gallery_bridge import get_crypto, on_download_complete
                        crypto = get_crypto()
                        if crypto:
                            imported = on_download_complete(f"imginn_api:{username}", user_downloaded, self.unified_db, crypto)
                            if imported:
                                self.logger.info(f"Gallery bridge: imported {imported} files for {username}", module="Instagram")
                    except Exception as e:
                        self.logger.debug(f"Gallery bridge: {e}", module="Instagram")

                # Compare by position, not by name, so duplicate usernames are handled correctly
                if not download_all and idx < last_idx:
                    delay = random.uniform(20, 40)
                    self.logger.debug(f"Waiting {delay:.1f}s before next phrase search account", module="Instagram")
                    time.sleep(delay)

        # Update scheduler for the imginn_api:all batch task
        # Skip if running under unified config — unified has its own scheduler task
        if not self.config.get('instagram_unified', {}).get('enabled'):
            self._update_scheduler_for_manual_run('imginn_api', 'all')

        # Trigger Immich scan only if files were downloaded
        if total_downloaded > 0 and self.config.get('immich', {}).get('scan_after_download'):
            self.logger.info(f"Downloaded {total_downloaded} ImgInn API files, triggering Immich scan", module="Instagram")
            self.trigger_immich_scan()

        # Record success if monitoring enabled
        if self.health_monitor:
            self.health_monitor.record_success('imginn_api')

        return total_downloaded

    except Exception as e:
        self.logger.error(f"ImgInn API download error: {e}", module="Instagram")
        if self.health_monitor:
            error_str = str(e).lower()
            if 'cloudflare' in error_str or 'cf_clearance' in error_str:
                reason = 'cloudflare'
            elif 'timeout' in error_str:
                reason = 'timeout'
            elif '403' in error_str:
                reason = 'forbidden'
            elif '429' in error_str:
                reason = 'rate_limit'
            else:
                reason = 'unknown'
            self.health_monitor.record_failure('imginn_api', reason)
        raise
    finally:
        # Release ML models once after all users are processed, also when the run aborts
        self.move_manager.release_models()
|
|
|
|
def _download_imginn_api_content(self, downloader, username: str, content_type: str, config: Dict, phrase_config: Dict = None) -> int:
    """Download content via ImgInn API using subprocess isolation.

    The ImgInn API wrapper runs in a child process. It writes media directly
    into ``temp_dir`` (no per-user subdirectory) and prints a JSON summary with
    ``count`` and ``pending_downloads`` on stdout.

    Downloaded files are moved to ``destination_path`` and the pending
    download metadata is recorded. This happens only when files were
    downloaded and ``download_settings.move_to_destination`` is enabled (the
    default). ``temp_dir`` is cleared before the run and always cleaned up
    afterwards.

    Args:
        downloader: Unused; kept so existing call sites keep working.
        username: Instagram account to fetch.
        content_type: Content type handed to the wrapper (e.g. 'posts',
            'stories', 'tagged').
        config: Per-content-type settings. 'destination_path' is required;
            'temp_dir' and 'days_back' (default 3) are optional.
        phrase_config: Optional phrase-search settings, forwarded as-is.

    Returns:
        Number of files the subprocess reported as downloaded. Returns 0 when
        the subprocess fails, times out, raises, or prints unparsable output.
    """
    import re

    media_exts = ['.jpg', '.jpeg', '.png', '.webp', '.heic', '.mp4', '.mov']

    temp_dir = self.base_path / config.get('temp_dir', f'temp/imginn_api/{content_type}')
    self._cleanup_temp_dir(temp_dir)
    dest_dir = Path(config['destination_path'])
    dest_dir.mkdir(parents=True, exist_ok=True)

    subprocess_config = {
        'username': username,
        'content_type': content_type,
        'temp_dir': str(temp_dir),
        'days_back': config.get('days_back') or 3,
        'max_downloads': 50,
        'headless': True,
        'db_path': str(self.base_path / 'database' / 'media_downloader.db'),
        'phrase_config': phrase_config
    }

    try:
        subprocess_wrapper = str(self.base_path / 'wrappers' / 'imginn_api_subprocess_wrapper.py')

        returncode, stdout = self._run_subprocess_with_streaming_logs(
            subprocess_wrapper,
            subprocess_config,
            timeout=600
        )

        if returncode != 0 or not stdout:
            self.logger.warning(f"ImgInn API subprocess failed for {username}/{content_type} (code {returncode})", module="Instagram")
            return 0

        try:
            result = json.loads(stdout.strip())
            count = result.get('count', 0)
            pending_downloads = result.get('pending_downloads', [])
        except json.JSONDecodeError as e:
            self.logger.error(f"Failed to parse ImgInn API subprocess result for {username}: {e}", module="Instagram")
            return 0

        if count:
            self.logger.info(f"Downloaded {count} {content_type} for {username} via ImgInn API", module="Instagram")

        move_enabled = self.config.get('download_settings', {}).get('move_to_destination', True)
        # imginn_api puts files directly in temp_dir (no username subdirectory)
        if count > 0 and move_enabled and temp_dir.exists():
            if content_type == 'tagged' and pending_downloads:
                filename_to_poster = {
                    dl['filename']: dl['username']
                    for dl in pending_downloads
                    if dl.get('filename') and dl.get('username')
                }

                # Group files by poster so each poster gets a single batched move
                # instead of one move call per file.
                poster_files = {}
                for file in list(temp_dir.iterdir()):
                    if not (file.is_file() and file.suffix.lower() in media_exts):
                        continue
                    poster_username = filename_to_poster.get(file.name)
                    if not poster_username:
                        # Match up to the "_YYYYMMDD_" marker (the same pattern the other
                        # tagged movers parse) so usernames containing '_' are kept
                        # whole. Otherwise fall back to the first '_' token, then to
                        # the searched user.
                        match = re.match(r'^(.+?)_(\d{8})_', file.name)
                        if match:
                            poster_username = match.group(1)
                        elif '_' in file.stem:
                            poster_username = file.stem.split('_')[0] or username
                        else:
                            poster_username = username
                    poster_files.setdefault(poster_username, []).append(file)

                # Clear any outer batch context so each per-poster batch is
                # attributed to the poster, not the searched user. Restore it even
                # if a move raises.
                saved_batch = self.move_manager.batch_context
                self.move_manager.batch_context = None
                moved = 0
                try:
                    for poster_username, files in poster_files.items():
                        poster_dest = dest_dir / poster_username
                        poster_dest.mkdir(parents=True, exist_ok=True)
                        moved += self._move_and_process_files(
                            temp_dir, poster_dest,
                            media_exts,
                            platform='instagram',
                            source=poster_username,
                            content_type='tagged',
                            specific_files=files
                        )
                finally:
                    self.move_manager.batch_context = saved_batch
            else:
                user_dest = dest_dir / username
                user_dest.mkdir(parents=True, exist_ok=True)
                moved = self._move_and_process_files(
                    temp_dir, user_dest,
                    media_exts,
                    platform='instagram',
                    source=username,
                    content_type=content_type
                )

            self.logger.debug(f"Moved {moved} {content_type} files for {username}", module="Instagram")

            # Record after the move so the final file locations can be resolved
            if pending_downloads:
                self._record_pending_instagram_downloads(pending_downloads, dest_dir, username, method='imginn_api')

        return count

    except subprocess.TimeoutExpired:
        self.logger.warning(f"ImgInn API subprocess timeout for {username}/{content_type}", module="Instagram")
        return 0
    except Exception as e:
        self.logger.warning(f"ImgInn API subprocess error for {username}/{content_type}: {e}", module="Instagram")
        return 0
    finally:
        self._cleanup_temp_dir(temp_dir)
|
|
|
|
def _download_imginn_api_content_multi(self, downloader, username: str, content_types: list, configs: dict) -> dict:
    """Download several content types via ImgInn API in one subprocess session.

    Every requested content type gets its own temp directory, which is
    cleared before the run and always cleaned up afterwards, and its own
    destination directory. The wrapper reports per-type results under
    ``results[<content_type>]``.

    Files of each type are moved to ``destination_path/<username>``; tagged
    files go to ``destination_path/<poster>`` instead. Pending metadata is
    recorded for each type.

    Args:
        downloader: Unused; kept so existing call sites keep working.
        username: Instagram account to fetch.
        content_types: Content types to fetch in this session.
        configs: Mapping of content_type to settings. Each entry needs
            'destination_path' and may set 'temp_dir'. 'days_back'
            (default 3) is taken from the first content type's settings.

    Returns:
        Dict {content_type: moved_count}. Every requested type maps to 0 when
        the subprocess fails, times out, raises, or prints unparsable output.
        An empty ``content_types`` yields ``{}``.
    """
    import re

    media_exts = ['.jpg', '.jpeg', '.png', '.webp', '.heic', '.mp4', '.mov']

    # Nothing to fetch; also avoids indexing content_types[0] below
    if not content_types:
        return {}

    moved_counts = {}
    temp_dirs = {}
    dest_dirs = {}
    output_dirs = {}

    for ct in content_types:
        config = configs[ct]
        temp_dir = self.base_path / config.get('temp_dir', f'temp/imginn_api/{ct}')
        self._cleanup_temp_dir(temp_dir)
        temp_dirs[ct] = temp_dir
        output_dirs[ct] = str(temp_dir)

        dest_dir = Path(config['destination_path'])
        dest_dir.mkdir(parents=True, exist_ok=True)
        dest_dirs[ct] = dest_dir

    subprocess_config = {
        'username': username,
        'content_types': content_types,
        'output_dirs': output_dirs,
        # `or 3` so an explicit None/0 behaves like a missing value, matching
        # the single-content-type path
        'days_back': configs[content_types[0]].get('days_back') or 3,
        'max_downloads': 50,
        'headless': True,
        'db_path': str(self.base_path / 'database' / 'media_downloader.db'),
    }

    try:
        subprocess_wrapper = str(self.base_path / 'wrappers' / 'imginn_api_subprocess_wrapper.py')

        returncode, stdout = self._run_subprocess_with_streaming_logs(
            subprocess_wrapper,
            subprocess_config,
            timeout=600
        )

        if returncode != 0 or not stdout:
            self.logger.warning(f"ImgInn API multi subprocess failed for {username} (code {returncode})", module="Instagram")
            return {ct: 0 for ct in content_types}

        try:
            result = json.loads(stdout.strip())
            results = result.get('results', {})
        except json.JSONDecodeError as e:
            self.logger.error(f"Failed to parse ImgInn API multi subprocess result for {username}: {e}", module="Instagram")
            return {ct: 0 for ct in content_types}

        for ct in content_types:
            ct_result = results.get(ct, {})
            count = ct_result.get('count', 0)
            pending_downloads = ct_result.get('pending_downloads', [])
            temp_dir = temp_dirs[ct]
            dest_dir = dest_dirs[ct]

            if count:
                self.logger.info(f"Downloaded {count} {ct} for {username} via ImgInn API (multi)", module="Instagram")

            if not (count > 0 and temp_dir.exists()):
                moved_counts[ct] = 0
                continue

            if ct == 'tagged' and pending_downloads:
                filename_to_poster = {
                    dl['filename']: dl['username']
                    for dl in pending_downloads
                    if dl.get('filename') and dl.get('username')
                }

                # Group files by poster_username for batched notifications
                poster_files = {}
                for file in list(temp_dir.iterdir()):
                    if not (file.is_file() and file.suffix.lower() in media_exts):
                        continue
                    poster_username = filename_to_poster.get(file.name)
                    if not poster_username:
                        # Match up to the "_YYYYMMDD_" marker so usernames containing
                        # '_' are kept whole. Otherwise fall back to the first '_'
                        # token, then to the searched user.
                        match = re.match(r'^(.+?)_(\d{8})_', file.name)
                        if match:
                            poster_username = match.group(1)
                        elif '_' in file.stem:
                            poster_username = file.stem.split('_')[0] or username
                        else:
                            poster_username = username
                    poster_files.setdefault(poster_username, []).append(file)

                # Save and clear outer batch context so per-poster batches
                # can set the correct source (poster username, not searched user).
                # Restore it in `finally` so a failed move cannot leave it cleared.
                saved_batch = self.move_manager.batch_context
                self.move_manager.batch_context = None
                moved = 0
                try:
                    for poster_username, files in poster_files.items():
                        poster_dest = dest_dir / poster_username
                        poster_dest.mkdir(parents=True, exist_ok=True)
                        moved += self._move_and_process_files(
                            temp_dir, poster_dest,
                            media_exts,
                            platform='instagram',
                            source=poster_username,
                            content_type='tagged',
                            specific_files=files
                        )
                finally:
                    self.move_manager.batch_context = saved_batch
            else:
                user_dest = dest_dir / username
                user_dest.mkdir(parents=True, exist_ok=True)
                moved = self._move_and_process_files(
                    temp_dir, user_dest,
                    media_exts,
                    platform='instagram',
                    source=username,
                    content_type=ct
                )

            if pending_downloads:
                self._record_pending_instagram_downloads(pending_downloads, dest_dir, username, method='imginn_api')

            moved_counts[ct] = moved

        return moved_counts

    except subprocess.TimeoutExpired:
        self.logger.warning(f"ImgInn API multi subprocess timeout for {username}", module="Instagram")
        return {ct: 0 for ct in content_types}
    except Exception as e:
        self.logger.warning(f"ImgInn API multi subprocess error for {username}: {e}", module="Instagram")
        return {ct: 0 for ct in content_types}
    finally:
        for temp_dir in temp_dirs.values():
            self._cleanup_temp_dir(temp_dir)
|
|
|
|
def _download_imginn_content(self, username: str, content_type: str, config: Dict, phrase_config: Dict = None) -> int:
    """Download content via ImgInn using subprocess isolation.

    Runs the ImgInn wrapper in a child process (headed browser, 10 minute
    timeout) and parses the JSON summary it prints on stdout.

    If ``download_settings.move_to_destination`` is enabled (the default) and
    the wrapper left files in ``<temp_dir>/<username>``, three things happen:
    the files are moved to their destination, the pending download metadata
    is recorded, and the per-user temp directory is removed.

    Tagged content is routed to ``destination_path/<poster>``, with one move
    batch per poster. Every other content type goes to
    ``destination_path/<username>``.

    Args:
        username: Instagram account to fetch.
        content_type: Content type handed to the wrapper (e.g. 'posts',
            'stories', 'tagged').
        config: Per-content-type settings. 'temp_dir' and 'destination_path'
            are required; 'days_back' is optional (default 14).
        phrase_config: Optional phrase-search settings. Its 'phrases' are also
            passed to the non-tagged move step as the search term.

    Returns:
        Number of files the subprocess reported as downloaded. Returns 0 when
        the subprocess fails, times out, raises, or prints unparsable output.
    """
    import re
    import shutil
    import traceback

    media_exts = ['.jpg', '.jpeg', '.png', '.webp', '.heic', '.mp4', '.mov']

    temp_dir = self.base_path / config['temp_dir']
    dest_dir = Path(config['destination_path'])
    dest_dir.mkdir(parents=True, exist_ok=True)

    # Prepare configuration for subprocess
    imginn_config = self.config.get('imginn', {})
    subprocess_config = {
        'username': username,
        'content_type': content_type,
        'temp_dir': str(temp_dir),
        'days_back': config.get('days_back') or 14,
        'max_downloads': 50,
        'headless': False,  # ImgInn uses headed mode with Xvfb
        'db_path': str(self.base_path / 'database' / 'media_downloader.db'),
        'cookie_file': imginn_config.get('cookie_file'),
        'phrase_config': phrase_config
    }

    try:
        subprocess_wrapper = str(self.base_path / 'wrappers' / 'imginn_subprocess_wrapper.py')

        # Run ImgInn download in subprocess with real-time log streaming
        returncode, stdout = self._run_subprocess_with_streaming_logs(
            subprocess_wrapper,
            subprocess_config,
            timeout=600  # 10 minute timeout (ImgInn can be slow with Cloudflare)
        )

        if returncode != 0 or not stdout:
            self.logger.warning(f"ImgInn subprocess failed for {username} {content_type} (code {returncode})", module="Instagram")
            return 0

        # Parse JSON result from stdout
        try:
            imginn_result = json.loads(stdout.strip())
            count = imginn_result.get('count', 0)
            pending_downloads = imginn_result.get('pending_downloads', [])
        except json.JSONDecodeError as e:
            self.logger.error(f"Failed to parse ImgInn subprocess result for {username}: {e}", module="Instagram")
            self.logger.debug(f"Raw stdout: {stdout[:500] if stdout else 'None'}", module="Instagram")
            return 0

        if count:
            self.logger.info(f"Downloaded {count} {content_type} for {username} via ImgInn (subprocess)", module="Instagram")

        move_enabled = self.config.get('download_settings', {}).get('move_to_destination', True)
        user_temp_dir = temp_dir / username
        if move_enabled and user_temp_dir.exists():
            # Extract search term from phrase_config if present
            search_term = None
            if phrase_config and phrase_config.get('phrases'):
                search_term = ', '.join(phrase_config.get('phrases', []))

            if content_type == 'tagged':
                # Each tagged file may come from a different poster. Use the poster
                # from pending_downloads when known; otherwise parse the
                # "<poster>_<YYYYMMDD>_" filename prefix, else use the searched user.
                # Files go through move_manager (face recognition / dedup), one
                # batch per poster.
                filename_to_poster = {
                    dl['filename']: dl['username']
                    for dl in (pending_downloads or [])
                    if dl.get('filename') and dl.get('username')
                }

                poster_files = {}  # poster_username -> [(file, dest_file), ...]
                for file in list(user_temp_dir.iterdir()):
                    if not (file.is_file() and file.suffix.lower() in media_exts):
                        continue
                    poster_username = filename_to_poster.get(file.name)
                    if not poster_username:
                        match = re.match(r'^(.+?)_(\d{8})_', file.name)
                        poster_username = match.group(1) if match else username
                    poster_dest = dest_dir / poster_username
                    if poster_username not in poster_files:
                        poster_dest.mkdir(parents=True, exist_ok=True)
                    poster_files.setdefault(poster_username, []).append((file, poster_dest / file.name))

                moved = 0
                for poster_username, files in poster_files.items():
                    try:
                        self.move_manager.start_batch(
                            platform='instagram',
                            source=poster_username,
                            content_type='tagged'
                        )
                        for file, dest_file in files:
                            if self.move_manager.move_file(source=file, destination=dest_file, content_type='tagged'):
                                moved += 1
                                self.logger.debug(f"Moved tagged file to {poster_username}/: {file.name}", module="Instagram")
                        self.move_manager.end_batch()
                    except Exception as e:
                        self.logger.error(f"Failed to move tagged files for {poster_username}: {e}", module="Instagram")
                        # Don't leave a half-open batch behind for the next poster
                        if self.move_manager.batch_context:
                            self.move_manager.end_batch()

                label = '' if pending_downloads else ' (fallback)'
                self.logger.info(f"Moved {moved} tagged files{label} to user-specific folders", module="Core")
            else:
                # Add username subdirectory for organization (posts/stories)
                user_dest = dest_dir / username
                user_dest.mkdir(parents=True, exist_ok=True)

                # Use shared move_manager with notification support
                moved = self._move_and_process_files(
                    user_temp_dir, user_dest,
                    media_exts,
                    platform='instagram',
                    source=username,
                    content_type=content_type,
                    search_term=search_term
                )
                self.logger.info(f"Moved {moved} files to {user_dest}", module="Core")

            # Record regardless of moved count: files may have gone to recycle as
            # duplicates, and the media_id must still be recorded to avoid re-downloads.
            if pending_downloads:
                self._record_pending_instagram_downloads(pending_downloads, dest_dir, username)

            # Clean up temp directory after the move
            if user_temp_dir.is_dir():
                try:
                    shutil.rmtree(user_temp_dir)
                    self.logger.debug(f"Cleaned up ImgInn temp dir: {user_temp_dir}", module="Instagram")
                except Exception as e:
                    self.logger.debug(f"Failed to clean up ImgInn temp dir: {e}", module="Instagram")

        return count

    except subprocess.TimeoutExpired:
        self.logger.warning(f"ImgInn subprocess timeout for {username} {content_type}", module="Instagram")
        return 0
    except Exception as e:
        self.logger.warning(f"ImgInn subprocess error for {username} {content_type}: {e}", module="Instagram")
        self.logger.debug(traceback.format_exc(), module="Instagram")
        return 0
|
|
|
|
def _download_instagram_client_content(self, username: str, content_type: str, config: Dict, phrase_config: Dict = None) -> int:
    """Download content via Instagram Client API using subprocess isolation.

    Runs the Instagram Client wrapper in a child process (5 minute timeout)
    and parses the JSON summary it prints on stdout.

    Truthy ``auth_failed``, ``user_id_failed`` and ``invalid_owner`` flags in
    the summary set ``self._ig_client_auth_failed``,
    ``self._ig_client_user_id_failed`` and ``self._ig_client_invalid_owner``
    to True.

    If moving is enabled and files were left in ``<temp_dir>/<username>``,
    the files are moved, the pending metadata is recorded, and the per-user
    temp directory is removed. Tagged content goes to
    ``destination_path/<poster>`` (one batch per poster); other types go to
    ``destination_path/<username>``.

    Args:
        username: Instagram account to fetch.
        content_type: Content type handed to the wrapper ('posts', 'stories',
            'reels', 'tagged', ...).
        config: Per-content-type settings. 'destination_path' is required.
            'temp_dir' and 'days_back' are optional; the days_back default
            depends on content_type.
        phrase_config: Optional phrase-search settings. Its 'phrases' are also
            passed to the non-tagged move step as the search term.

    Returns:
        Number of files the subprocess reported as downloaded. Returns 0 when
        the subprocess fails, times out, raises, or prints unparsable output.
    """
    # `re` is imported locally (as the ImgInn path does). Tagged-filename
    # parsing then never depends on a module-level `re` import; a missing
    # name there would be swallowed by the broad handler as "0 downloaded".
    import re
    import shutil
    import traceback

    media_exts = ['.jpg', '.jpeg', '.png', '.webp', '.heic', '.mp4', '.mov']

    temp_dir = self.base_path / config.get('temp_dir', f'temp/instagram_client/{content_type}')
    dest_dir = Path(config['destination_path'])
    dest_dir.mkdir(parents=True, exist_ok=True)

    # Default days_back per content type matches UI defaults
    default_days = {'posts': 14, 'stories': 1, 'reels': 14, 'tagged': 7}
    subprocess_config = {
        'username': username,
        'content_type': content_type,
        'temp_dir': str(temp_dir),
        'days_back': config.get('days_back') or default_days.get(content_type, 14),
        'max_downloads': 50,
        'db_path': str(self.base_path / 'database' / 'media_downloader.db'),
        'phrase_config': phrase_config
    }

    try:
        subprocess_wrapper = str(self.base_path / 'wrappers' / 'instagram_client_subprocess_wrapper.py')

        # Run download in subprocess with real-time log streaming
        returncode, stdout = self._run_subprocess_with_streaming_logs(
            subprocess_wrapper,
            subprocess_config,
            timeout=300  # 5 minute timeout
        )

        if returncode != 0 or not stdout:
            self.logger.warning(f"Instagram Client subprocess failed for {username} {content_type} (code {returncode})", module="Instagram")
            return 0

        # Parse JSON result from stdout
        try:
            result = json.loads(stdout.strip())
            count = result.get('count', 0)
            pending_downloads = result.get('pending_downloads', [])
            # Latch failure flags reported by the subprocess
            if result.get('auth_failed'):
                self._ig_client_auth_failed = True
            if result.get('user_id_failed'):
                self._ig_client_user_id_failed = True
            if result.get('invalid_owner'):
                self._ig_client_invalid_owner = True
        except json.JSONDecodeError as e:
            self.logger.error(f"Failed to parse Instagram Client subprocess result for {username}: {e}", module="Instagram")
            self.logger.debug(f"Raw stdout: {stdout[:500] if stdout else 'None'}", module="Instagram")
            return 0

        if count:
            self.logger.info(f"Downloaded {count} {content_type} files for {username} via Instagram API", module="Instagram")

        move_enabled = self.config.get('download_settings', {}).get('move_to_destination', True)
        user_temp_dir = temp_dir / username
        if move_enabled and user_temp_dir.exists():
            # Extract search term from phrase_config if present
            search_term = None
            if phrase_config and phrase_config.get('phrases'):
                search_term = ', '.join(phrase_config.get('phrases', []))

            if content_type == 'tagged':
                # Route each file to its poster's folder: the poster from
                # pending_downloads when known, else the "<poster>_<YYYYMMDD>_"
                # filename prefix, else the searched user. One batch per poster.
                filename_to_poster = {
                    dl['filename']: dl['username']
                    for dl in (pending_downloads or [])
                    if dl.get('filename') and dl.get('username')
                }

                poster_files = {}  # poster_username -> [(file, dest_file), ...]
                for file in list(user_temp_dir.iterdir()):
                    if not (file.is_file() and file.suffix.lower() in media_exts):
                        continue
                    poster_username = filename_to_poster.get(file.name)
                    if not poster_username:
                        match = re.match(r'^(.+?)_(\d{8})_', file.name)
                        poster_username = match.group(1) if match else username
                    poster_dest = dest_dir / poster_username
                    if poster_username not in poster_files:
                        poster_dest.mkdir(parents=True, exist_ok=True)
                    poster_files.setdefault(poster_username, []).append((file, poster_dest / file.name))

                moved = 0
                for poster_username, files in poster_files.items():
                    try:
                        self.move_manager.start_batch(
                            platform='instagram',
                            source=poster_username,
                            content_type='tagged'
                        )
                        for file, dest_file in files:
                            if self.move_manager.move_file(source=file, destination=dest_file, content_type='tagged'):
                                moved += 1
                        self.move_manager.end_batch()
                    except Exception as e:
                        self.logger.error(f"Failed to move tagged files for {poster_username}: {e}", module="Instagram")
                        # Don't leave a half-open batch behind for the next poster
                        if self.move_manager.batch_context:
                            self.move_manager.end_batch()

                label = '' if pending_downloads else ' (fallback)'
                self.logger.info(f"Moved {moved} tagged files{label} to user-specific folders", module="Core")
            else:
                # Posts/stories/reels - add username subdirectory
                user_dest = dest_dir / username
                user_dest.mkdir(parents=True, exist_ok=True)

                moved = self._move_and_process_files(
                    user_temp_dir, user_dest,
                    media_exts,
                    platform='instagram',
                    source=username,
                    content_type=content_type,
                    search_term=search_term
                )
                self.logger.info(f"Moved {moved} files to {user_dest}", module="Core")

            # Record pending downloads after move
            if pending_downloads:
                self._record_pending_instagram_downloads(pending_downloads, dest_dir, username, method='instagram_client')

            # Clean up temp directory
            if user_temp_dir.is_dir():
                try:
                    shutil.rmtree(user_temp_dir)
                    self.logger.debug(f"Cleaned up Instagram Client temp dir: {user_temp_dir}", module="Instagram")
                except Exception as e:
                    self.logger.debug(f"Failed to clean up temp dir: {e}", module="Instagram")

        return count

    except subprocess.TimeoutExpired:
        self.logger.warning(f"Instagram Client subprocess timeout for {username} {content_type}", module="Instagram")
        return 0
    except Exception as e:
        self.logger.warning(f"Instagram Client subprocess error for {username} {content_type}: {e}", module="Instagram")
        self.logger.debug(traceback.format_exc(), module="Instagram")
        return 0
|
|
|
|
def _record_pending_instagram_downloads(self, pending_downloads: list, dest_dir: Path, username: str, method: str = 'imginn'):
    """Record pending Instagram downloads to the database after files were moved.

    For every entry with a filename, the file's final location is looked up
    in this order: ``dest_dir/<poster>``, ``dest_dir``,
    ``dest_dir/<username>``, then the same three under the ``/review/``
    counterpart of ``dest_dir``.

    The entry is recorded even when the file is not found, using
    ``dest_dir/<filename>`` as its path. The file may have been sent to
    recycle as a duplicate, and recording keeps the media_id from being
    downloaded again. Per-entry failures are logged at debug level and
    skipped.

    Args:
        pending_downloads: List of download metadata dicts from download module
        dest_dir: Final destination directory where files were moved
        username: Instagram username; also used when an entry has no
            (or an empty/None) poster username
        method: Download method used ('imginn', 'fastdl', 'instaloader', etc.)
    """
    from modules.instagram_utils import record_instagram_download
    from datetime import datetime

    def _parse_post_date(value):
        """Convert a numeric timestamp or ISO string to a datetime.

        Unparsable values become None; falsy and other values pass through.
        """
        if value and isinstance(value, (int, float)):
            try:
                return datetime.fromtimestamp(value)
            except (ValueError, OSError, OverflowError):
                return None
        if value and isinstance(value, str):
            try:
                return datetime.fromisoformat(value)
            except ValueError:
                return None
        return value

    # Files routed to review live under the same layout with /md/ -> /review/
    review_base = Path(str(dest_dir).replace('/md/', '/review/'))

    recorded = 0
    for download in pending_downloads:
        try:
            filename = download.get('filename')
            if not filename:
                continue

            # Tagged files live in dest_dir/<poster>. `or` (not a .get default)
            # so an explicit None/empty username falls back to the searched user;
            # a None here previously made the path join raise and dropped the record.
            poster_username = download.get('username') or username

            candidates = (
                dest_dir / poster_username / filename,
                dest_dir / filename,
                dest_dir / username / filename,
                review_base / poster_username / filename,
                review_base / filename,
                review_base / username / filename,
            )
            # If not found, still record with the expected path so the media_id
            # is not re-downloaded.
            actual_path = next((p for p in candidates if p.exists()), dest_dir / filename)

            result = record_instagram_download(
                db=self.unified_db,
                media_id=download.get('media_id'),
                username=poster_username,
                content_type=download.get('content_type', 'post'),
                filename=filename,
                url=download.get('url'),
                post_date=_parse_post_date(download.get('post_date')),
                file_path=str(actual_path),
                method=method,
                extra_metadata=download.get('metadata')
            )

            if result:
                recorded += 1
                self.logger.debug(f"Recorded download: {filename} -> {actual_path}", module="Instagram")

        except Exception as e:
            self.logger.debug(f"Failed to record pending download: {e}", module="Instagram")

    if recorded > 0:
        self.logger.info(f"Recorded {recorded} downloads to database after move", module="Instagram")
|
|
|
|
def _record_pending_tiktok_downloads(self, pending_downloads: list, dest_dir: Path, username: str):
    """Record pending TikTok downloads to the database after files were moved.

    Each entry's file is looked up in ``dest_dir`` and then in the
    ``/review/`` counterpart of ``dest_dir``. Entries whose file cannot be
    found are skipped.

    Records are written through ``self.modules['tiktok'].db.record_download``.
    Nothing is recorded when the 'tiktok' module is not loaded. Per-entry
    failures are logged at debug level and skipped.

    Args:
        pending_downloads: List of download metadata dicts from tiktok module
        dest_dir: Final destination directory where files were moved
        username: TikTok username; also used when an entry has no (or an
            empty/None) username
    """
    from datetime import datetime

    # Check once up front (as the Snapchat recorder does) rather than per entry
    # after the filesystem lookups.
    if 'tiktok' not in self.modules:
        self.logger.debug("TikTok module not available for recording", module="TikTok")
        return
    tiktok_module = self.modules['tiktok']

    # Files routed to review live under the same layout with /md/ -> /review/
    review_base = Path(str(dest_dir).replace('/md/', '/review/'))

    recorded = 0
    for download in pending_downloads:
        try:
            filename = download.get('filename')
            if not filename:
                continue

            final_path = dest_dir / filename
            review_path = review_base / filename
            if final_path.exists():
                actual_path = final_path
            elif review_path.exists():
                actual_path = review_path
            else:
                self.logger.debug(f"File not found after move, skipping record: {filename}", module="TikTok")
                continue

            # Normalise post_date (numeric timestamp or ISO string) to datetime
            post_date = download.get('post_date')
            if post_date and isinstance(post_date, (int, float)):
                try:
                    post_date = datetime.fromtimestamp(post_date)
                except (ValueError, OSError, OverflowError):
                    post_date = None
            elif post_date and isinstance(post_date, str):
                try:
                    post_date = datetime.fromisoformat(post_date)
                except ValueError:
                    post_date = None

            result = tiktok_module.db.record_download(
                video_id=download.get('video_id'),
                username=download.get('username') or username,
                filename=filename,
                post_date=post_date,
                metadata=download.get('metadata'),
                file_path=str(actual_path)
            )

            if result:
                recorded += 1
                self.logger.debug(f"Recorded download: {filename} -> {actual_path}", module="TikTok")

        except Exception as e:
            self.logger.debug(f"Failed to record pending download: {e}", module="TikTok")

    if recorded > 0:
        self.logger.info(f"Recorded {recorded} downloads to database after move", module="TikTok")
|
|
|
|
def _record_pending_snapchat_downloads(self, pending_downloads: list, dest_dir: Path, username: str):
    """Persist Snapchat download records once their files have been moved.

    Does nothing (beyond a debug log) when the 'snapchat' module is not
    loaded.

    For each entry with a filename, the file is looked for in ``dest_dir``,
    then in the ``/review/`` counterpart of ``dest_dir``; entries with no
    file on disk are skipped. Found entries are written with
    ``db.mark_downloaded`` when the Snapchat module exposes a truthy ``db``.
    Any per-entry error is logged at debug level and the entry is skipped.

    Args:
        pending_downloads: List of download metadata dicts from snapchat module
        dest_dir: Final destination directory where files were moved
        username: Snapchat username, used when an entry carries no username
    """
    from datetime import datetime

    if 'snapchat' not in self.modules:
        self.logger.debug("Snapchat module not available for recording", module="Snapchat")
        return

    review_root = Path(str(dest_dir).replace('/md/', '/review/'))
    saved_count = 0

    for entry in pending_downloads:
        try:
            name = entry.get('filename')
            if not name:
                continue

            # First existing candidate wins; the review copy is only checked
            # when the primary location is missing.
            located = next(
                (candidate for candidate in (dest_dir / name, review_root / name) if candidate.exists()),
                None,
            )
            if located is None:
                self.logger.debug(f"File not found after move, skipping record: {name}", module="Snapchat")
                continue

            when = entry.get('post_date')
            if when and isinstance(when, (int, float)):
                try:
                    when = datetime.fromtimestamp(when)
                except (ValueError, OSError):
                    when = None
            elif when and isinstance(when, str):
                try:
                    when = datetime.fromisoformat(when)
                except ValueError:
                    when = None

            snap = self.modules['snapchat']
            if not (hasattr(snap, 'db') and snap.db):
                continue

            snap.db.mark_downloaded(
                username=entry.get('username', username),
                url=entry.get('url'),
                filename=name,
                post_date=when,
                metadata=entry.get('metadata'),
                file_path=str(located)
            )
            saved_count += 1
            self.logger.debug(f"Recorded download: {name} -> {located}", module="Snapchat")

        except Exception as exc:
            self.logger.debug(f"Failed to record pending download: {exc}", module="Snapchat")

    if saved_count > 0:
        self.logger.info(f"Recorded {saved_count} downloads to database after move", module="Snapchat")
|
|
|
|
def _record_pending_coppermine_downloads(self, pending_downloads: list, dest_dir: Path, downloader):
|
|
"""Record pending Coppermine downloads to database after files have been moved
|
|
|
|
Args:
|
|
pending_downloads: List of download metadata dicts from coppermine module
|
|
dest_dir: Final destination directory where files were moved
|
|
downloader: The CoppermineDownloader instance with database connection
|
|
"""
|
|
from datetime import datetime
|
|
|
|
if not hasattr(downloader, 'db') or not downloader.db:
|
|
self.logger.debug("Coppermine database not available for recording", module="Coppermine")
|
|
return
|
|
|
|
recorded = 0
|
|
for download in pending_downloads:
|
|
try:
|
|
# Get the filename and construct the final path
|
|
filename = download.get('filename')
|
|
if not filename:
|
|
continue
|
|
|
|
# The file should now be in dest_dir
|
|
final_path = dest_dir / filename
|
|
|
|
# Find the actual file location
|
|
actual_path = None
|
|
if final_path.exists():
|
|
actual_path = final_path
|
|
else:
|
|
# File might be in review folder - check there too
|
|
review_base = Path(str(dest_dir).replace('/md/', '/review/'))
|
|
review_path = review_base / filename
|
|
if review_path.exists():
|
|
actual_path = review_path
|
|
|
|
if not actual_path:
|
|
self.logger.debug(f"File not found after move, skipping record: {filename}", module="Coppermine")
|
|
continue
|
|
|
|
# Parse post_date back to datetime if it's a string
|
|
post_date = download.get('post_date')
|
|
if post_date and isinstance(post_date, (int, float)):
|
|
try:
|
|
post_date = datetime.fromtimestamp(post_date)
|
|
except (ValueError, OSError):
|
|
post_date = None
|
|
elif post_date and isinstance(post_date, str):
|
|
try:
|
|
post_date = datetime.fromisoformat(post_date)
|
|
except ValueError:
|
|
post_date = None
|
|
|
|
# Record to database with the actual final path
|
|
downloader.db.add_download(
|
|
url=download.get('url'),
|
|
platform=download.get('platform', 'coppermine'),
|
|
source=download.get('source'),
|
|
content_type=download.get('content_type', 'image'),
|
|
filename=filename,
|
|
file_path=str(actual_path),
|
|
file_size=download.get('file_size'),
|
|
file_hash=download.get('file_hash'),
|
|
post_date=post_date,
|
|
metadata=download.get('metadata')
|
|
)
|
|
recorded += 1
|
|
self.logger.debug(f"Recorded download: {filename} -> {actual_path}", module="Coppermine")
|
|
|
|
except Exception as e:
|
|
self.logger.debug(f"Failed to record pending download: {e}", module="Coppermine")
|
|
|
|
# Clear pending downloads after recording
|
|
downloader.clear_pending_downloads()
|
|
|
|
if recorded > 0:
|
|
self.logger.info(f"Recorded {recorded} downloads to database after move", module="Coppermine")
|
|
|
|
def download_imginn(self):
|
|
"""Download Instagram content via ImgInn"""
|
|
total_downloaded = 0
|
|
imginn_config = self.config.get('imginn', {})
|
|
|
|
if not imginn_config.get('enabled'):
|
|
self.logger.debug("ImgInn is disabled in config", module="Instagram")
|
|
return 0
|
|
|
|
# Check if phrase search is enabled
|
|
phrase_config = imginn_config.get('phrase_search', {})
|
|
|
|
# Process regular usernames (randomize order to avoid detection)
|
|
usernames = imginn_config.get('usernames', [])
|
|
random.shuffle(usernames)
|
|
|
|
# Calculate total accounts for progress tracking
|
|
phrase_usernames_list = phrase_config.get('usernames', []) if phrase_config.get('enabled') else []
|
|
total_accounts = len(usernames) + len(phrase_usernames_list)
|
|
accounts_processed = 0
|
|
try:
|
|
activity_mgr = get_activity_manager()
|
|
except Exception:
|
|
activity_mgr = None
|
|
# Unified runner manages its own activity progress
|
|
if self.config.get('instagram_unified', {}).get('enabled'):
|
|
activity_mgr = None
|
|
|
|
# Get delay between users to avoid Cloudflare rate limiting
|
|
# Default: 20-40 seconds between users (configurable via imginn.user_delay_seconds)
|
|
user_delay_min = imginn_config.get('user_delay_seconds', 20)
|
|
user_delay_max = user_delay_min + 20 # Add 20s variability
|
|
|
|
for idx, username in enumerate(usernames):
|
|
accounts_processed += 1
|
|
if activity_mgr:
|
|
try:
|
|
activity_mgr.update_account_progress(accounts_processed, total_accounts)
|
|
activity_mgr.update_account_name(username)
|
|
activity_mgr.update_status("Starting")
|
|
except Exception:
|
|
pass
|
|
|
|
# Add delay before each user (including first) to avoid Cloudflare rate limiting
|
|
if idx == 0:
|
|
# Initial delay before first user - give Cloudflare time to settle
|
|
delay = random.uniform(10, 20)
|
|
self.logger.info(f"Cloudflare rate limit: initial wait {delay:.1f}s before first user", module="Instagram")
|
|
else:
|
|
delay = random.uniform(user_delay_min, user_delay_max)
|
|
self.logger.info(f"Cloudflare rate limit: waiting {delay:.1f}s before next user ({idx+1}/{len(usernames)})", module="Instagram")
|
|
time.sleep(delay)
|
|
|
|
self.logger.info(f"Processing Instagram user via ImgInn: {username}", module="Instagram")
|
|
|
|
# Start batch for this user (will combine all content types into one notification)
|
|
self.move_manager.start_batch(
|
|
platform='instagram',
|
|
source=username,
|
|
content_type='media' # Generic since we're combining posts/stories
|
|
)
|
|
|
|
# Download each content type (ImgInn supports posts, stories, and tagged)
|
|
content_type_count = 0
|
|
for content_type in ['posts', 'stories', 'tagged']:
|
|
if imginn_config.get(content_type, {}).get('enabled'):
|
|
# Rate limiting between content types to avoid Cloudflare blocks
|
|
if content_type_count > 0:
|
|
delay = random.uniform(15, 25)
|
|
self.logger.info(f"Rate limiting: waiting {delay:.1f}s before {content_type} (Cloudflare avoidance)", module="Instagram")
|
|
time.sleep(delay)
|
|
content_type_count += 1
|
|
|
|
config = imginn_config[content_type]
|
|
count = self._download_imginn_content(username, content_type, config, phrase_config=None)
|
|
if count:
|
|
total_downloaded += count
|
|
|
|
# End batch and send one notification for all content types
|
|
self.move_manager.end_batch()
|
|
|
|
# Process phrase search usernames if enabled
|
|
if phrase_config.get('enabled'):
|
|
phrase_usernames = phrase_config.get('usernames', [])
|
|
download_all = phrase_config.get('download_all', False)
|
|
if phrase_usernames:
|
|
# Randomize phrase search account order to avoid detection
|
|
random.shuffle(phrase_usernames)
|
|
|
|
if download_all:
|
|
self.logger.info(f"Processing download-all for users: {phrase_usernames}", module="Core")
|
|
else:
|
|
self.logger.info(f"Processing phrase search for users: {phrase_usernames}", module="Core")
|
|
self.logger.info(f"Looking for phrases: {phrase_config.get('phrases', [])}", module="Core")
|
|
|
|
# Get delay between users to avoid Cloudflare rate limiting
|
|
# Default: 20-40 seconds between users (configurable via imginn.user_delay_seconds)
|
|
user_delay_min = imginn_config.get('user_delay_seconds', 20)
|
|
user_delay_max = user_delay_min + 20 # Add 20s variability
|
|
|
|
for idx, username in enumerate(phrase_usernames):
|
|
accounts_processed += 1
|
|
if activity_mgr:
|
|
try:
|
|
activity_mgr.update_account_progress(accounts_processed, total_accounts)
|
|
activity_mgr.update_account_name(username)
|
|
activity_mgr.update_status("Phrase search")
|
|
except Exception:
|
|
pass
|
|
|
|
# Add delay before each user (including first) to avoid Cloudflare rate limiting
|
|
if idx == 0:
|
|
# Initial delay before first user - give Cloudflare time to settle
|
|
delay = random.uniform(10, 20)
|
|
self.logger.info(f"Cloudflare rate limit: initial wait {delay:.1f}s before first user", module="Instagram")
|
|
else:
|
|
delay = random.uniform(user_delay_min, user_delay_max)
|
|
self.logger.info(f"Cloudflare rate limit: waiting {delay:.1f}s before next user ({idx+1}/{len(phrase_usernames)})", module="Instagram")
|
|
time.sleep(delay)
|
|
|
|
if download_all:
|
|
self.logger.info(f"Downloading all posts from {username} (no phrase filter)", module="Core")
|
|
else:
|
|
self.logger.info(f"Searching {username} for phrase matches", module="Core")
|
|
|
|
if imginn_config.get('posts', {}).get('enabled'):
|
|
config = imginn_config['posts']
|
|
|
|
try:
|
|
# If download_all is enabled, skip phrase filtering by passing None
|
|
effective_phrase_config = None if download_all else phrase_config
|
|
count = self._download_imginn_content(username, 'posts', config, phrase_config=effective_phrase_config)
|
|
|
|
if count:
|
|
if download_all:
|
|
self.logger.info(f"Downloaded {count} posts from {username}", module="Core")
|
|
else:
|
|
self.logger.info(f"Downloaded {count} phrase-matching posts from {username}", module="Core")
|
|
total_downloaded += count
|
|
# Note: Don't create separate scheduler tasks for phrase search usernames
|
|
# They run as part of the main imginn task
|
|
else:
|
|
if download_all:
|
|
self.logger.info(f"No new posts found for {username}", module="Core")
|
|
else:
|
|
self.logger.info(f"No phrase-matching posts found for {username}", module="Core")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"ImgInn phrase search error for {username}: {e}", module="Instagram")
|
|
|
|
# Trigger Immich scan only if files were downloaded
|
|
if total_downloaded > 0 and self.config.get('immich', {}).get('scan_after_download'):
|
|
self.logger.info(f"Downloaded {total_downloaded} ImgInn files, triggering Immich scan", module="Instagram")
|
|
self.trigger_immich_scan()
|
|
|
|
# Release ML models once after all users are processed
|
|
self.move_manager.release_models()
|
|
return total_downloaded
|
|
|
|
def download_instagram_client(self):
|
|
"""Download Instagram content via direct API (instagram_client)"""
|
|
total_downloaded = 0
|
|
ic_config = self.config.get('instagram_client', {})
|
|
|
|
if not ic_config.get('enabled'):
|
|
self.logger.debug("Instagram Client is disabled in config", module="Instagram")
|
|
return 0
|
|
|
|
# Auth failure tracking — skip stories/tagged once cookies expire
|
|
self._ig_client_auth_failed = False
|
|
self._ig_client_user_id_failed = False
|
|
self._ig_client_invalid_owner = False
|
|
self._ig_auth_alert_sent = False
|
|
|
|
# Check if phrase search is enabled
|
|
phrase_config = ic_config.get('phrase_search', {})
|
|
|
|
# Process regular usernames (randomize order)
|
|
usernames = ic_config.get('usernames', [])
|
|
random.shuffle(usernames)
|
|
|
|
# Calculate total accounts for progress tracking
|
|
phrase_usernames_list = phrase_config.get('usernames', []) if phrase_config.get('enabled') else []
|
|
total_accounts = len(usernames) + len(phrase_usernames_list)
|
|
accounts_processed = 0
|
|
try:
|
|
activity_mgr = get_activity_manager()
|
|
except Exception:
|
|
activity_mgr = None
|
|
# Unified runner manages its own activity progress
|
|
if self.config.get('instagram_unified', {}).get('enabled'):
|
|
activity_mgr = None
|
|
|
|
# Shorter delay than ImgInn (no Cloudflare involved)
|
|
user_delay_min = ic_config.get('user_delay_seconds', 20)
|
|
user_delay_max = user_delay_min + 15
|
|
|
|
for idx, username in enumerate(usernames):
|
|
accounts_processed += 1
|
|
if activity_mgr:
|
|
try:
|
|
activity_mgr.update_account_progress(accounts_processed, total_accounts)
|
|
activity_mgr.update_account_name(username)
|
|
activity_mgr.update_status("Starting")
|
|
except Exception:
|
|
pass
|
|
|
|
# Add delay between users
|
|
if idx > 0:
|
|
delay = random.uniform(user_delay_min, user_delay_max)
|
|
self.logger.info(f"Rate limit: waiting {delay:.1f}s before next user ({idx+1}/{len(usernames)})", module="Instagram")
|
|
time.sleep(delay)
|
|
|
|
self.logger.info(f"Processing Instagram user via API: {username}", module="Instagram")
|
|
|
|
# Start batch for this user
|
|
self.move_manager.start_batch(
|
|
platform='instagram',
|
|
source=username,
|
|
content_type='media'
|
|
)
|
|
|
|
# Download each content type (posts, stories, reels, tagged)
|
|
# Stories and tagged require auth; skip them if cookies have expired
|
|
content_type_count = 0
|
|
for content_type in ['posts', 'stories', 'reels', 'tagged']:
|
|
if ic_config.get(content_type, {}).get('enabled'):
|
|
# Skip auth-required content types if cookies have expired
|
|
if self._ig_client_auth_failed and content_type in ('stories', 'tagged'):
|
|
self.logger.debug(f"Skipping {content_type} for {username} — session expired", module="Instagram")
|
|
continue
|
|
|
|
# Rate limiting between content types
|
|
if content_type_count > 0:
|
|
delay = random.uniform(3, 6)
|
|
self.logger.info(f"Rate limiting: waiting {delay:.1f}s before {content_type}", module="Instagram")
|
|
time.sleep(delay)
|
|
content_type_count += 1
|
|
|
|
if activity_mgr:
|
|
try:
|
|
activity_mgr.update_status(f"Downloading {content_type}")
|
|
except Exception:
|
|
pass
|
|
|
|
config = ic_config[content_type]
|
|
count = self._download_instagram_client_content(username, content_type, config, phrase_config=None)
|
|
if count:
|
|
total_downloaded += count
|
|
|
|
# On first auth failure, log it (no push notification or health banner)
|
|
if self._ig_client_auth_failed and not getattr(self, '_ig_auth_alert_sent', False):
|
|
self._ig_auth_alert_sent = True
|
|
self.logger.warning("Instagram session expired — skipping stories/tagged for remaining users, continuing posts/reels", module="Instagram")
|
|
|
|
# End batch and send one notification for all content types
|
|
self.move_manager.end_batch()
|
|
|
|
# Auto-import to private gallery after each user
|
|
if not getattr(self, '_skip_gallery_bridge', False):
|
|
try:
|
|
from modules.scraper_gallery_bridge import get_crypto, on_download_complete
|
|
crypto = get_crypto()
|
|
if crypto:
|
|
imported = on_download_complete(f"instagram_client:{username}", 1, self.unified_db, crypto)
|
|
if imported:
|
|
self.logger.info(f"Gallery bridge: imported {imported} files for {username}", module="Instagram")
|
|
except Exception as e:
|
|
self.logger.debug(f"Gallery bridge: {e}", module="Instagram")
|
|
|
|
# Process phrase search usernames if enabled
|
|
if phrase_config.get('enabled'):
|
|
phrase_usernames = phrase_config.get('usernames', [])
|
|
download_all = phrase_config.get('download_all', False)
|
|
if phrase_usernames:
|
|
random.shuffle(phrase_usernames)
|
|
|
|
if download_all:
|
|
self.logger.info(f"Processing download-all for users: {phrase_usernames}", module="Core")
|
|
else:
|
|
self.logger.info(f"Processing phrase search for users: {phrase_usernames}", module="Core")
|
|
self.logger.info(f"Looking for phrases: {phrase_config.get('phrases', [])}", module="Core")
|
|
|
|
for idx, username in enumerate(phrase_usernames):
|
|
accounts_processed += 1
|
|
if activity_mgr:
|
|
try:
|
|
activity_mgr.update_account_progress(accounts_processed, total_accounts)
|
|
activity_mgr.update_account_name(username)
|
|
activity_mgr.update_status("Phrase search" if not download_all else "Starting")
|
|
except Exception:
|
|
pass
|
|
|
|
if idx > 0 and idx % 15 == 0:
|
|
batch_pause = random.uniform(45, 90)
|
|
self.logger.info(f"Batch pause: {batch_pause:.0f}s after {idx} users", module="Instagram")
|
|
time.sleep(batch_pause)
|
|
elif idx > 0:
|
|
delay = random.uniform(user_delay_min, user_delay_max)
|
|
self.logger.info(f"Rate limit: waiting {delay:.1f}s before next user ({idx+1}/{len(phrase_usernames)})", module="Instagram")
|
|
time.sleep(delay)
|
|
|
|
# Start batch for phrase search user
|
|
self.move_manager.start_batch(
|
|
platform='instagram',
|
|
source=username,
|
|
content_type='media'
|
|
)
|
|
|
|
if download_all:
|
|
# Download all content types (posts, stories, reels) for download_all users
|
|
content_type_count = 0
|
|
for content_type in ['posts', 'stories', 'reels']:
|
|
if ic_config.get(content_type, {}).get('enabled'):
|
|
# Skip auth-required content types if cookies have expired
|
|
if self._ig_client_auth_failed and content_type == 'stories':
|
|
self.logger.debug(f"Skipping {content_type} for {username} — session expired", module="Instagram")
|
|
continue
|
|
|
|
if content_type_count > 0:
|
|
delay = random.uniform(3, 6)
|
|
self.logger.info(f"Rate limiting: waiting {delay:.1f}s before {content_type}", module="Instagram")
|
|
time.sleep(delay)
|
|
content_type_count += 1
|
|
|
|
if activity_mgr:
|
|
try:
|
|
activity_mgr.update_status(f"Downloading {content_type}")
|
|
except Exception:
|
|
pass
|
|
|
|
config = ic_config[content_type]
|
|
try:
|
|
count = self._download_instagram_client_content(username, content_type, config, phrase_config=None)
|
|
if count:
|
|
total_downloaded += count
|
|
except Exception as e:
|
|
self.logger.error(f"Instagram Client download_all error for {username}/{content_type}: {e}", module="Instagram")
|
|
else:
|
|
# Phrase search: only download posts with phrase filter
|
|
if ic_config.get('posts', {}).get('enabled'):
|
|
if activity_mgr:
|
|
try:
|
|
activity_mgr.update_status("Downloading posts")
|
|
except Exception:
|
|
pass
|
|
config = ic_config['posts']
|
|
try:
|
|
count = self._download_instagram_client_content(username, 'posts', config, phrase_config=phrase_config)
|
|
if count:
|
|
total_downloaded += count
|
|
except Exception as e:
|
|
self.logger.error(f"Instagram Client phrase search error for {username}: {e}", module="Instagram")
|
|
|
|
# End batch for phrase search user
|
|
self.move_manager.end_batch()
|
|
|
|
# Auto-import to private gallery after each phrase search user
|
|
if not getattr(self, '_skip_gallery_bridge', False):
|
|
try:
|
|
from modules.scraper_gallery_bridge import get_crypto, on_download_complete
|
|
crypto = get_crypto()
|
|
if crypto:
|
|
imported = on_download_complete(f"instagram_client:{username}", 1, self.unified_db, crypto)
|
|
if imported:
|
|
self.logger.info(f"Gallery bridge: imported {imported} files for {username}", module="Instagram")
|
|
except Exception as e:
|
|
self.logger.debug(f"Gallery bridge: {e}", module="Instagram")
|
|
|
|
# Import to private gallery via bridge
|
|
if total_downloaded > 0 and not getattr(self, '_skip_gallery_bridge', False):
|
|
try:
|
|
from modules.scraper_gallery_bridge import get_crypto, on_download_complete
|
|
crypto = get_crypto()
|
|
if crypto:
|
|
imported = on_download_complete(f"instagram_client:all", total_downloaded, self.unified_db, crypto)
|
|
if imported:
|
|
self.logger.info(f"Gallery bridge: imported {imported} files from instagram_client run", module="Instagram")
|
|
except Exception as e:
|
|
self.logger.debug(f"Gallery bridge: {e}", module="Instagram")
|
|
|
|
# Trigger Immich scan if files were downloaded
|
|
if total_downloaded > 0 and self.config.get('immich', {}).get('scan_after_download'):
|
|
self.logger.info(f"Downloaded {total_downloaded} Instagram Client files, triggering Immich scan", module="Instagram")
|
|
self.trigger_immich_scan()
|
|
|
|
# Release ML models once after all users are processed
|
|
self.move_manager.release_models()
|
|
return total_downloaded
|
|
|
|
def download_toolzu(self):
|
|
"""Download Instagram content via Toolzu (1920x1440 resolution) using subprocess isolation"""
|
|
if 'toolzu' not in self.modules:
|
|
self.logger.warning("Toolzu module not available", module="Toolzu")
|
|
return 0
|
|
|
|
total_downloaded = 0
|
|
toolzu_config = self.config.get('toolzu', {})
|
|
|
|
if not toolzu_config.get('enabled'):
|
|
self.logger.debug("Toolzu is disabled in config", module="Toolzu")
|
|
return 0
|
|
|
|
# Process regular usernames (randomize order to avoid detection)
|
|
usernames = toolzu_config.get('usernames', [])
|
|
random.shuffle(usernames)
|
|
|
|
try:
|
|
activity_mgr = get_activity_manager()
|
|
except Exception:
|
|
activity_mgr = None
|
|
# Unified runner manages its own activity progress
|
|
if self.config.get('instagram_unified', {}).get('enabled'):
|
|
activity_mgr = None
|
|
|
|
# Get delay between users to avoid Cloudflare rate limiting
|
|
# Default: 20-40 seconds between users (configurable via toolzu.user_delay_seconds)
|
|
user_delay_min = toolzu_config.get('user_delay_seconds', 20)
|
|
user_delay_max = user_delay_min + 20 # Add 20s variability
|
|
|
|
for idx, username in enumerate(usernames):
|
|
if activity_mgr:
|
|
try:
|
|
activity_mgr.update_account_progress(idx + 1, len(usernames))
|
|
activity_mgr.update_account_name(username)
|
|
activity_mgr.update_status("Starting")
|
|
except Exception:
|
|
pass
|
|
|
|
# Add delay before each user (including first) to avoid Cloudflare rate limiting
|
|
if idx == 0:
|
|
# Initial delay before first user - give Cloudflare time to settle
|
|
delay = random.uniform(10, 20)
|
|
self.logger.info(f"Cloudflare rate limit: initial wait {delay:.1f}s before first user", module="Toolzu")
|
|
else:
|
|
delay = random.uniform(user_delay_min, user_delay_max)
|
|
self.logger.info(f"Cloudflare rate limit: waiting {delay:.1f}s before next user ({idx+1}/{len(usernames)})", module="Toolzu")
|
|
time.sleep(delay)
|
|
|
|
self.logger.info(f"Processing Instagram user via Toolzu: {username}", module="Instagram")
|
|
|
|
# Start batch for this user (will combine all content types into one notification)
|
|
self.move_manager.start_batch(
|
|
platform='instagram',
|
|
source=username,
|
|
content_type='media' # Generic since we're combining posts/stories
|
|
)
|
|
|
|
# Download each content type (Toolzu only supports posts and stories)
|
|
content_type_count = 0
|
|
for content_type in ['posts', 'stories']:
|
|
if toolzu_config.get(content_type, {}).get('enabled'):
|
|
# Rate limiting between content types to avoid Cloudflare blocks
|
|
if content_type_count > 0:
|
|
delay = random.uniform(15, 25)
|
|
self.logger.info(f"Rate limiting: waiting {delay:.1f}s before {content_type} (Cloudflare avoidance)", module="Toolzu")
|
|
time.sleep(delay)
|
|
content_type_count += 1
|
|
|
|
config = toolzu_config[content_type]
|
|
temp_dir = self.base_path / config.get('temp_dir', f'temp/toolzu/{content_type}')
|
|
|
|
# Prepare configuration for subprocess
|
|
subprocess_config = {
|
|
'username': username,
|
|
'content_type': content_type,
|
|
'temp_dir': str(temp_dir),
|
|
'days_back': config.get('days_back', 3),
|
|
'max_downloads': 15,
|
|
'headless': False, # Toolzu uses headed mode with Xvfb
|
|
'db_path': str(self.base_path / 'database' / 'media_downloader.db'),
|
|
'cookie_file': toolzu_config.get('cookie_file'),
|
|
'toolzu_email': toolzu_config.get('email'),
|
|
'toolzu_password': toolzu_config.get('password')
|
|
}
|
|
|
|
try:
|
|
subprocess_wrapper = str(self.base_path / 'wrappers' / 'toolzu_subprocess_wrapper.py')
|
|
|
|
# Run Toolzu download in subprocess with real-time log streaming
|
|
returncode, stdout = self._run_subprocess_with_streaming_logs(
|
|
subprocess_wrapper,
|
|
subprocess_config,
|
|
timeout=300 # 5 minute timeout
|
|
)
|
|
|
|
# Parse JSON result from stdout
|
|
if returncode == 0 and stdout:
|
|
try:
|
|
toolzu_result = json.loads(stdout.strip())
|
|
count = toolzu_result.get('count', 0)
|
|
pending_downloads = toolzu_result.get('pending_downloads', [])
|
|
except json.JSONDecodeError as e:
|
|
self.logger.error(f"Failed to parse Toolzu subprocess result for {username}: {e}", module="Toolzu")
|
|
self.logger.debug(f"Raw stdout: {stdout[:500] if stdout else 'None'}", module="Toolzu")
|
|
continue
|
|
|
|
if count:
|
|
self.logger.info(f"Downloaded {count} {content_type} for {username} via Toolzu (subprocess)", module="Toolzu")
|
|
|
|
# Auto-merge with FastDL for quality upgrade (posts images only)
|
|
if self.config.get('download_settings', {}).get('move_to_destination', True):
|
|
dest_path = Path(config.get('destination_path'))
|
|
# Add username subdirectory for organization
|
|
user_dest = dest_path / username
|
|
user_dest.mkdir(parents=True, exist_ok=True)
|
|
|
|
if content_type == 'posts':
|
|
# Only upgrade posts (not stories) and only images (not videos)
|
|
moved_count = self._merge_fastdl_toolzu_quality_upgrade(
|
|
username, content_type, temp_dir, user_dest
|
|
)
|
|
self.logger.info(f"Processed {moved_count} files (upgraded quality where possible)", module="Core")
|
|
else:
|
|
# For stories, just move files without upgrade
|
|
moved = self._move_and_process_files(
|
|
temp_dir, user_dest,
|
|
['.jpg', '.jpeg', '.png', '.webp', '.heic', '.mp4', '.mov'],
|
|
platform='instagram',
|
|
source=username,
|
|
content_type=content_type
|
|
)
|
|
self.logger.info(f"Moved {moved} {content_type} files (no upgrade needed for stories)", module="Core")
|
|
|
|
# Record pending downloads after move completes
|
|
# Record regardless of moved count - files may have gone to recycle as duplicates
|
|
if pending_downloads:
|
|
self._record_pending_instagram_downloads(pending_downloads, user_dest, username, method='toolzu')
|
|
|
|
total_downloaded += count
|
|
else:
|
|
self.logger.warning(f"Toolzu subprocess failed for {username} {content_type} (code {returncode})", module="Toolzu")
|
|
|
|
except subprocess.TimeoutExpired:
|
|
self.logger.warning(f"Toolzu subprocess timeout for {username} {content_type}", module="Toolzu")
|
|
except Exception as e:
|
|
self.logger.warning(f"Toolzu subprocess error for {username} {content_type}: {e}", module="Toolzu")
|
|
|
|
# End batch and send one notification for all content types
|
|
self.move_manager.end_batch()
|
|
|
|
# Update scheduler for this manual run
|
|
if not self.config.get('instagram_unified', {}).get('enabled'):
|
|
self._update_scheduler_for_manual_run('toolzu', username)
|
|
|
|
# Update scheduler for manual run (platform level if no specific users)
|
|
if total_downloaded > 0:
|
|
if not self.config.get('instagram_unified', {}).get('enabled'):
|
|
self._update_scheduler_for_manual_run('toolzu')
|
|
|
|
# Trigger Immich scan only if files were downloaded
|
|
if total_downloaded > 0 and self.config.get('immich', {}).get('scan_after_download'):
|
|
self.logger.info(f"Downloaded {total_downloaded} Toolzu files, triggering Immich scan", module="Toolzu")
|
|
self.trigger_immich_scan()
|
|
|
|
# Release ML models once after all users are processed
|
|
self.move_manager.release_models()
|
|
return total_downloaded
|
|
|
|
def _download_snapchat_content(self, username: str, content_type: str, config: Dict) -> int:
|
|
"""Download content via Snapchat using subprocess isolation
|
|
|
|
Args:
|
|
username: Snapchat username
|
|
content_type: Type of content ('stories')
|
|
config: Content-specific configuration
|
|
|
|
Returns:
|
|
Number of files downloaded
|
|
"""
|
|
snapchat_config = self.config.get('snapchat', {})
|
|
temp_dir = self.base_path / config.get('temp_dir', 'temp/snapchat/stories')
|
|
|
|
# Prepare configuration for subprocess
|
|
subprocess_config = {
|
|
'username': username,
|
|
'content_type': content_type,
|
|
'temp_dir': str(temp_dir),
|
|
'days_back': config.get('days_back', 7),
|
|
'max_downloads': config.get('max_downloads', 50),
|
|
'headless': False, # Snapchat uses headed mode with Xvfb
|
|
'db_path': str(self.base_path / 'database' / 'media_downloader.db')
|
|
}
|
|
|
|
try:
|
|
subprocess_wrapper = str(self.base_path / 'wrappers' / 'snapchat_subprocess_wrapper.py')
|
|
|
|
# Run Snapchat download in subprocess with real-time log streaming
|
|
returncode, stdout = self._run_subprocess_with_streaming_logs(
|
|
subprocess_wrapper,
|
|
subprocess_config,
|
|
timeout=600 # 10 minute timeout
|
|
)
|
|
|
|
# Parse JSON result from stdout
|
|
if returncode == 0 and stdout:
|
|
try:
|
|
snapchat_result = json.loads(stdout.strip())
|
|
count = snapchat_result.get('count', 0)
|
|
pending_downloads = snapchat_result.get('pending_downloads', [])
|
|
except json.JSONDecodeError as e:
|
|
self.logger.error(f"Failed to parse Snapchat subprocess result for {username}: {e}", module="Snapchat")
|
|
self.logger.debug(f"Raw stdout: {stdout[:500] if stdout else 'None'}", module="Snapchat")
|
|
return 0
|
|
|
|
if count:
|
|
self.logger.info(f"Downloaded {count} {content_type} for {username} via Snapchat (subprocess)", module="Snapchat")
|
|
|
|
# Move files to destination if configured
|
|
if self.config.get('download_settings', {}).get('move_to_destination', True):
|
|
dest_path = Path(config.get('destination_path'))
|
|
moved = self._move_and_process_files(
|
|
temp_dir, dest_path,
|
|
['.jpg', '.jpeg', '.png', '.webp', '.heic', '.mp4', '.mov'],
|
|
platform='snapchat',
|
|
source=username,
|
|
content_type=content_type
|
|
)
|
|
self.logger.info(f"Moved {moved} Snapchat {content_type} files", module="Snapchat")
|
|
|
|
# Record pending downloads after successful move
|
|
# This ensures database records only exist for files that made it to final destination
|
|
if pending_downloads and moved > 0:
|
|
self._record_pending_snapchat_downloads(pending_downloads, dest_path, username)
|
|
|
|
# Clean up temp directory after successful move
|
|
user_temp_dir = temp_dir / username.lower()
|
|
if user_temp_dir.exists():
|
|
try:
|
|
import shutil
|
|
shutil.rmtree(user_temp_dir)
|
|
self.logger.debug(f"Cleaned up Snapchat temp dir: {user_temp_dir}", module="Snapchat")
|
|
except Exception as e:
|
|
self.logger.debug(f"Failed to clean up Snapchat temp dir: {e}", module="Snapchat")
|
|
|
|
return count
|
|
else:
|
|
self.logger.warning(f"Snapchat subprocess failed for {username} {content_type} (code {returncode})", module="Snapchat")
|
|
return 0
|
|
|
|
except subprocess.TimeoutExpired:
|
|
self.logger.warning(f"Snapchat subprocess timeout for {username} {content_type}", module="Snapchat")
|
|
return 0
|
|
except Exception as e:
|
|
self.logger.warning(f"Snapchat subprocess error for {username} {content_type}: {e}", module="Snapchat")
|
|
return 0
|
|
|
|
def _download_snapchat_client_content(self, username: str, content_type: str, config: Dict) -> int:
|
|
"""Download content via Snapchat Client API using subprocess isolation
|
|
|
|
Args:
|
|
username: Snapchat username
|
|
content_type: Type of content ('stories')
|
|
config: Content-specific configuration
|
|
|
|
Returns:
|
|
Number of files downloaded
|
|
"""
|
|
sc_config = self.config.get('snapchat_client', {})
|
|
temp_dir = self.base_path / config.get('temp_dir', 'temp/snapchat_client/stories')
|
|
|
|
# Prepare configuration for subprocess
|
|
subprocess_config = {
|
|
'username': username,
|
|
'content_type': content_type,
|
|
'temp_dir': str(temp_dir),
|
|
'days_back': config.get('days_back', 7),
|
|
'max_downloads': config.get('max_downloads', 50),
|
|
'db_path': str(self.base_path / 'database' / 'media_downloader.db')
|
|
}
|
|
|
|
try:
|
|
subprocess_wrapper = str(self.base_path / 'wrappers' / 'snapchat_client_subprocess_wrapper.py')
|
|
|
|
# Run download in subprocess with real-time log streaming
|
|
returncode, stdout = self._run_subprocess_with_streaming_logs(
|
|
subprocess_wrapper,
|
|
subprocess_config,
|
|
timeout=120 # 2 minute timeout (no Cloudflare wait needed)
|
|
)
|
|
|
|
# Parse JSON result from stdout
|
|
if returncode == 0 and stdout:
|
|
try:
|
|
result = json.loads(stdout.strip())
|
|
count = result.get('count', 0)
|
|
pending_downloads = result.get('pending_downloads', [])
|
|
except json.JSONDecodeError as e:
|
|
self.logger.error(f"Failed to parse Snapchat Client subprocess result for {username}: {e}", module="Snapchat")
|
|
self.logger.debug(f"Raw stdout: {stdout[:500] if stdout else 'None'}", module="Snapchat")
|
|
return 0
|
|
|
|
if count:
|
|
self.logger.info(f"Downloaded {count} {content_type} for {username} via Snapchat API", module="Snapchat")
|
|
|
|
# Move files to destination if configured
|
|
if self.config.get('download_settings', {}).get('move_to_destination', True):
|
|
dest_path = Path(config.get('destination_path'))
|
|
moved = self._move_and_process_files(
|
|
temp_dir, dest_path,
|
|
['.jpg', '.jpeg', '.png', '.webp', '.heic', '.mp4', '.mov'],
|
|
platform='snapchat',
|
|
source=username,
|
|
content_type=content_type
|
|
)
|
|
self.logger.info(f"Moved {moved} Snapchat Client {content_type} files", module="Snapchat")
|
|
|
|
# Record pending downloads after successful move
|
|
if pending_downloads and moved > 0:
|
|
self._record_pending_snapchat_downloads(pending_downloads, dest_path, username)
|
|
|
|
# Clean up temp directory after successful move
|
|
user_temp_dir = temp_dir / username.lower()
|
|
if user_temp_dir.exists():
|
|
try:
|
|
import shutil
|
|
shutil.rmtree(user_temp_dir)
|
|
self.logger.debug(f"Cleaned up Snapchat Client temp dir: {user_temp_dir}", module="Snapchat")
|
|
except Exception as e:
|
|
self.logger.debug(f"Failed to clean up Snapchat Client temp dir: {e}", module="Snapchat")
|
|
|
|
return count
|
|
else:
|
|
self.logger.warning(f"Snapchat Client subprocess failed for {username} {content_type} (code {returncode})", module="Snapchat")
|
|
return 0
|
|
|
|
except subprocess.TimeoutExpired:
|
|
self.logger.warning(f"Snapchat Client subprocess timeout for {username} {content_type}", module="Snapchat")
|
|
return 0
|
|
except Exception as e:
|
|
self.logger.warning(f"Snapchat Client subprocess error for {username} {content_type}: {e}", module="Snapchat")
|
|
return 0
|
|
|
|
def download_snapchat(self):
    """Download Snapchat stories for every configured user via the direct scraper.

    Reads ``self.config['snapchat']`` and returns 0 straight away when
    ``enabled`` is falsy. For each entry in ``usernames`` a move-manager batch
    is opened, stories are fetched through ``_download_snapchat_content`` when
    ``stories.enabled`` is set, and the batch is closed again. The close happens
    in a ``finally`` so an exception cannot leave a stale batch context behind
    for later file moves.

    After the loop the platform-level scheduler entry is updated. If anything
    was downloaded and ``immich.scan_after_download`` is enabled, an Immich scan
    is triggered. ML models are released at the end, even when processing
    aborted with an exception.

    Returns:
        Total count reported by ``_download_snapchat_content`` across all users.
    """
    snapchat_config = self.config.get('snapchat', {})

    if not snapchat_config.get('enabled'):
        self.logger.debug("Snapchat is disabled in config", module="Snapchat")
        return 0

    total_downloaded = 0

    try:
        activity_mgr = get_activity_manager()
    except Exception:
        # Progress reporting is optional; continue without it.
        activity_mgr = None

    snap_usernames = snapchat_config.get('usernames', [])
    try:
        for idx, username in enumerate(snap_usernames):
            if activity_mgr:
                try:
                    activity_mgr.update_account_progress(idx + 1, len(snap_usernames))
                    activity_mgr.update_account_name(username)
                    activity_mgr.update_status("Starting")
                except Exception:
                    pass

            self.logger.info(f"Processing Snapchat user: {username}", module="Snapchat")

            # One notification batch per user.
            self.move_manager.start_batch(
                platform='snapchat',
                source=username,
                content_type='stories'
            )
            try:
                # This path only fetches stories (spotlights are a separate
                # option of the snapchat_client downloader).
                if snapchat_config.get('stories', {}).get('enabled'):
                    config = snapchat_config['stories']
                    count = self._download_snapchat_content(username, 'stories', config)
                    total_downloaded += count
            finally:
                # Always end the batch (and send its notification) so its
                # context cannot leak into later moves if the download raised.
                self.move_manager.end_batch()

            # Update scheduler for this manual run
            self._update_scheduler_for_manual_run('snapchat', username)

        # Update scheduler for manual run (platform level)
        if total_downloaded > 0:
            self._update_scheduler_for_manual_run('snapchat')

        # Trigger Immich scan only if files were downloaded
        if total_downloaded > 0 and self.config.get('immich', {}).get('scan_after_download'):
            self.logger.info(f"Downloaded {total_downloaded} Snapchat files, triggering Immich scan", module="Snapchat")
            self.trigger_immich_scan()
    finally:
        # Release ML models once after all users are processed (or on failure).
        self.move_manager.release_models()

    return total_downloaded
|
|
|
|
def download_snapchat_client(self):
    """Fetch Snapchat stories and spotlights through the direct API client.

    Nothing happens (the result is 0) unless ``snapchat_client.enabled`` is set.
    Accounts from ``usernames`` are handled one at a time. Before every account
    except the first, the method sleeps for a random time between
    ``user_delay_seconds`` and ``user_delay_seconds + 5`` seconds (default 10).

    Each account gets a single move-manager batch that covers two passes:
      * stories, when ``stories.enabled`` is truthy;
      * spotlights, unless ``spotlight.enabled`` is explicitly falsy, and only
        when ``spotlight.destination_path`` is configured.

    Returns:
        Sum of the counts returned by ``_download_snapchat_client_content``.
    """
    client_cfg = self.config.get('snapchat_client', {})
    if not client_cfg.get('enabled'):
        self.logger.debug("Snapchat Client is disabled in config", module="Snapchat")
        return 0

    grand_total = 0

    try:
        tracker = get_activity_manager()
    except Exception:
        tracker = None

    accounts = client_cfg.get('usernames', [])
    base_delay = client_cfg.get('user_delay_seconds', 10)
    account_count = len(accounts)

    for position, account in enumerate(accounts, start=1):
        if tracker:
            try:
                tracker.update_account_progress(position, account_count)
                tracker.update_account_name(account)
                tracker.update_status("Starting")
            except Exception:
                pass

        # Pace API calls: no wait before the very first account.
        if position > 1:
            pause = random.uniform(base_delay, base_delay + 5)
            self.logger.info(f"Rate limit: waiting {pause:.1f}s before next user ({position}/{account_count})", module="Snapchat")
            time.sleep(pause)

        self.logger.info(f"Processing Snapchat user via API: {account}", module="Snapchat")

        self.move_manager.start_batch(
            platform='snapchat',
            source=account,
            content_type='stories'
        )

        stories_cfg = client_cfg.get('stories', {})
        if stories_cfg.get('enabled'):
            grand_total += self._download_snapchat_client_content(account, 'stories', stories_cfg)

        spotlight_cfg = client_cfg.get('spotlight', {})
        if spotlight_cfg.get('enabled', True) and spotlight_cfg.get('destination_path'):
            grand_total += self._download_snapchat_client_content(account, 'spotlight', spotlight_cfg)

        # Closing the batch sends the per-account notification.
        self.move_manager.end_batch()
        self._update_scheduler_for_manual_run('snapchat_client', account)

    if grand_total > 0:
        self._update_scheduler_for_manual_run('snapchat_client')

    wants_scan = self.config.get('immich', {}).get('scan_after_download')
    if grand_total > 0 and wants_scan:
        self.logger.info(f"Downloaded {grand_total} Snapchat Client files, triggering Immich scan", module="Snapchat")
        self.trigger_immich_scan()

    # Models are freed once, after every account has been handled.
    self.move_manager.release_models()
    return grand_total
|
|
|
|
def download_coppermine(self):
    """Download images from every configured Coppermine Photo Gallery.

    Reads ``self.config['coppermine']`` and returns 0 when it is disabled. Each
    entry in ``galleries`` needs a ``url``; entries without one are skipped, and
    entries that are not dicts are skipped with a warning.

    For each gallery:
      * images are fetched into ``base_path/temp/coppermine/<name>`` with
        database recording deferred;
      * they are moved to the gallery's (or the global) ``destination_path``,
        falling back to ``base_path/downloads/coppermine/<name>``;
      * the deferred records are written only once at least one file moved;
      * the outcome is reported to the download monitor.

    The gallery name is the configured ``name`` or, failing that, the URL host.
    The same name is used for success and failure reporting so monitor entries
    line up. The per-gallery move batch is always closed, including when the
    gallery fails part-way, so its context cannot leak into later moves made by
    other platforms.

    Returns:
        Total number of images reported downloaded by the Coppermine module.
    """
    coppermine_config = self.config.get('coppermine', {})

    if not coppermine_config.get('enabled'):
        self.logger.debug("Coppermine is disabled in config", module="Coppermine")
        return 0

    total_downloaded = 0

    # Create Coppermine downloader instance
    downloader = CoppermineDownloader(
        show_progress=True,
        use_database=True,
        log_callback=None,  # Coppermine uses its own universal logger now
        unified_db=self.unified_db,
        config=coppermine_config
    )

    try:
        activity_mgr = get_activity_manager()
    except Exception:
        activity_mgr = None

    def _derive_gallery_name(gallery, gallery_url):
        """Return the configured name, else the URL host, else 'unknown'."""
        try:
            url_parts = gallery_url.split('//')
            return gallery.get('name', url_parts[1].split('/')[0] if len(url_parts) > 1 else 'unknown')
        except (IndexError, AttributeError):
            return gallery.get('name', 'unknown')

    galleries = coppermine_config.get('galleries', [])
    for gal_idx, gallery in enumerate(galleries):
        if not isinstance(gallery, dict):
            self.logger.warning(f"Skipping malformed Coppermine gallery entry: {gallery!r}", module="Coppermine")
            continue

        # Provisional name used for error reporting until the URL is inspected.
        gallery_name = gallery.get('name', 'unknown')
        batch_open = False
        try:
            from modules.monitor_wrapper import log_download_result

            gallery_url = gallery.get('url')
            if not gallery_url:
                continue

            gallery_name = _derive_gallery_name(gallery, gallery_url)
            days_back = gallery.get('days_back', coppermine_config.get('days_back', 7))
            max_pages = gallery.get('max_pages', coppermine_config.get('max_pages'))

            if activity_mgr:
                try:
                    activity_mgr.update_account_progress(gal_idx + 1, len(galleries))
                    activity_mgr.update_account_name(gallery_name)
                    activity_mgr.update_status("Starting")
                except Exception:
                    pass

            self.logger.info(f"Processing Coppermine gallery: {gallery_name}", module="Coppermine")

            # Download to temp directory first
            temp_dir = self.base_path / 'temp' / 'coppermine' / gallery_name
            temp_dir.mkdir(parents=True, exist_ok=True)

            # Determine final destination directory
            dest_path = gallery.get('destination_path') or coppermine_config.get('destination_path')
            if not dest_path:
                dest_path = str(self.base_path / 'downloads' / 'coppermine' / gallery_name)
            dest_path = Path(dest_path)
            dest_path.mkdir(parents=True, exist_ok=True)

            # Start batch for this gallery
            self.move_manager.start_batch(
                platform='coppermine',
                source=gallery_name,
                content_type='image'
            )
            batch_open = True

            # Download images to temp directory with deferred database recording
            file_timestamps, count = downloader.download(
                gallery_url=gallery_url,
                output_dir=str(temp_dir),
                days_back=days_back,
                max_pages=max_pages,
                gallery_name=gallery_name,
                defer_database=True  # Defer recording until after file move
            )

            # Get pending downloads for recording after move
            pending_downloads = downloader.get_pending_downloads()

            if count > 0:
                self.logger.info(f"Downloaded {count} images from {gallery_name}", module="Coppermine")

                # Move files from temp to destination with processing
                if self.config.get('download_settings', {}).get('move_to_destination', True):
                    moved = self._move_and_process_files(
                        temp_dir, dest_path,
                        ['.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp', '.heic', '.heif'],
                        provided_timestamps=file_timestamps,
                        platform='coppermine',
                        source=gallery_name,
                        content_type='image'
                    )
                    self.logger.info(f"Moved {moved} images from {gallery_name}", module="Coppermine")

                    # Record pending downloads only after a successful move, so
                    # database records exist only for files that reached the
                    # final destination.
                    if pending_downloads and moved > 0:
                        self._record_pending_coppermine_downloads(pending_downloads, dest_path, downloader)

                total_downloaded += count
                # Log to monitor (success)
                log_download_result('coppermine', gallery_name, count, error=None)
            else:
                # Log to monitor (no new content)
                log_download_result('coppermine', gallery_name, 0, error=None)

            # End batch and send notification
            batch_open = False
            self.move_manager.end_batch()

            # Update scheduler for this manual run
            self._update_scheduler_for_manual_run('coppermine', gallery_name)

        except Exception as e:
            from modules.monitor_wrapper import log_download_result
            if batch_open:
                # Close the half-finished batch so its context does not swallow
                # notifications from later moves.
                try:
                    self.move_manager.end_batch()
                except Exception as end_error:
                    self.logger.debug(f"Failed to close Coppermine batch for {gallery_name}: {end_error}", module="Coppermine")
            self.logger.error(f"Coppermine download error for {gallery.get('url', 'unknown')}: {e}", module="Coppermine")
            # Log to monitor (failure) under the same name used on success.
            log_download_result('coppermine', gallery_name, 0, error=str(e))

    # Cleanup downloader
    try:
        downloader.cleanup()
    except Exception as e:
        self.logger.debug(f"Coppermine downloader cleanup: {e}", module="Coppermine")

    # Update scheduler for manual run (platform level)
    if total_downloaded > 0:
        self._update_scheduler_for_manual_run('coppermine')

    # Trigger Immich scan only if files were downloaded
    if total_downloaded > 0 and self.config.get('immich', {}).get('scan_after_download'):
        self.logger.info(f"Downloaded {total_downloaded} Coppermine files, triggering Immich scan", module="Coppermine")
        self.trigger_immich_scan()

    # Release ML models once after all galleries are processed
    self.move_manager.release_models()

    return total_downloaded
|
|
|
|
def download_tiktok(self):
    """Download recent TikTok posts for every configured account.

    Accounts are read from ``tiktok.accounts`` (entries that carry a
    ``username``) or, for older configs, from the plain ``tiktok.usernames``
    list. Each account goes through four steps:
      1. it is downloaded into the shared ``temp_dir`` (relative to
         ``base_path``) with database recording deferred;
      2. the files are moved into ``destination_path/<username>``;
      3. the deferred records are written once at least one file was moved;
      4. the temp directory is wiped, whatever the outcome.

    Returns:
        Number of files moved to their final destination across all accounts,
        or 0 when the TikTok module is not loaded.
    """
    if 'tiktok' not in self.modules:
        self.logger.warning("TikTok module not available", module="TikTok")
        return 0

    moved_total = 0
    tiktok_cfg = self.config.get('tiktok', {})
    tiktok = self.modules['tiktok']

    # New-style config lists account dicts; old-style lists bare usernames.
    if 'accounts' in tiktok_cfg:
        accounts = [entry.get('username') for entry in tiktok_cfg['accounts'] if entry.get('username')]
    elif 'usernames' in tiktok_cfg:
        accounts = tiktok_cfg['usernames']
    else:
        accounts = []

    try:
        tracker = get_activity_manager()
    except Exception:
        tracker = None

    for position, account in enumerate(accounts, start=1):
        if tracker:
            try:
                tracker.update_account_progress(position, len(accounts))
                tracker.update_account_name(account)
                tracker.update_status("Starting")
            except Exception:
                pass

        self.logger.info(f"Processing TikTok user: {account}", module="TikTok")

        staging = self.base_path / tiktok_cfg['temp_dir']
        target_root = Path(tiktok_cfg['destination_path'])
        target_root.mkdir(parents=True, exist_ok=True)

        try:
            from modules.monitor_wrapper import log_download_result

            lookback_days = tiktok_cfg.get('days_back', 3)

            # The downloader expects its output directory to exist already.
            staging.mkdir(parents=True, exist_ok=True)

            timestamps, fetched = tiktok.download_profile(
                username=account,
                output_dir=staging,
                number_of_days=lookback_days,
                full_profile=False,
                defer_database=True,  # DB rows are written only after the move
            )
            fetched_count = len(fetched)
            deferred = tiktok.get_pending_downloads()

            self.logger.info(f"Downloaded {fetched_count} TikTok files for {account}", module="TikTok")

            if not (fetched_count > 0 and staging.exists()):
                # Nothing new is still a successful run for the monitor.
                log_download_result('tiktok', account, 0, error=None)
                continue

            account_dir = target_root / account
            account_dir.mkdir(parents=True, exist_ok=True)

            # TikTok-provided timestamps keep file dates accurate; photo
            # carousel posts mean both image and video extensions are moved.
            moved = self._move_and_process_files(
                staging, account_dir,
                tiktok_cfg.get('extensions', ['.mp4', '.webm', '.jpg', '.jpeg', '.png', '.heic']),
                provided_timestamps=timestamps,
                platform='tiktok',
                source=account,
                content_type=None,  # auto-detected per file
            )
            moved_total += moved

            if deferred and moved > 0:
                self._record_pending_tiktok_downloads(deferred, target_root, account)

            log_download_result('tiktok', account, moved or 0, error=None)
            self._update_scheduler_for_manual_run('tiktok', account)

        except Exception as exc:
            from modules.monitor_wrapper import log_download_result
            self.logger.error(f"Error downloading TikTok for {account}: {exc}", module="TikTok")
            log_download_result('tiktok', account, 0, error=str(exc))
        finally:
            # Runs on success, "nothing new" (continue) and failure alike.
            self._cleanup_temp_dir(staging)

    if moved_total > 0:
        self._update_scheduler_for_manual_run('tiktok')
        if self.config.get('immich', {}).get('scan_after_download'):
            self.logger.info(f"Downloaded {moved_total} TikTok files, triggering Immich scan", module="TikTok")
            self.trigger_immich_scan()
    elif moved_total == 0:
        self.logger.info("No TikTok files downloaded, skipping Immich scan", module="TikTok")

    # Models are freed once, after every account has been handled.
    self.move_manager.release_models()
    return moved_total
|
|
|
|
def download_forums(self, batch_mode: bool = True, specific_forum: str = None):
    """Run the forum downloader for one or all configured forums.

    Args:
        batch_mode: Accepted for compatibility; every forum is processed
            through ``_batch_forum_download`` regardless of this flag.
        specific_forum: When given, only this key of ``self.forum_configs`` is
            processed. An unknown key logs a warning and returns 0.

    The ``ForumDownloader`` is built lazily. While ``self.modules['forums']``
    still holds the ``'pending'`` placeholder, it is created on top of an
    adapter over the unified database. Once the selected forums have been
    processed, the downloader's ``cleanup()`` is always called.

    Returns:
        Total count reported by ``_batch_forum_download`` for all forums.
    """
    if 'forums' not in self.modules:
        self.logger.warning("No forum modules available", module="Forum")
        return 0

    if self.modules['forums'] == 'pending':
        self.logger.info("Initializing ForumDownloader with unified database...", module="Forum")
        from modules.forum_db_adapter import ForumDatabaseAdapter
        db_adapter = ForumDatabaseAdapter(self.unified_db)
        self.modules['forums'] = ForumDownloader(
            headless=self.config.get('forums', {}).get('headless', True),
            show_progress=True,
            use_database=True,
            db_path=db_adapter,  # an adapter object stands in for a DB path
            download_dir=str(self.base_path / 'downloads'),
            log_callback=self._log_callback,
        )
        self.logger.info("ForumDownloader initialized with unified database", module="Forum")

    forum_module = self.modules['forums']

    if not specific_forum:
        selected = self.forum_configs
    elif specific_forum in self.forum_configs:
        selected = {specific_forum: self.forum_configs[specific_forum]}
    else:
        self.logger.warning(f"Forum {specific_forum} not found in configuration", module="Forum")
        return 0

    grand_total = 0
    try:
        for name, forum_cfg in selected.items():
            self.logger.info(f"Processing forum: {name}", module="Forum")

            # The temp dir is created by the downloader itself when needed.
            staging = self.base_path / forum_cfg['temp_dir']
            target = Path(forum_cfg['destination_path'])
            target.mkdir(parents=True, exist_ok=True)

            try:
                grand_total += self._batch_forum_download(forum_module, name, forum_cfg, staging, target)
                self._update_scheduler_for_manual_run('forum', name)
            except Exception as exc:
                self.logger.error(f"Error downloading from {name}: {exc}", module="Forum")

        # Platform-level scheduler entry only for full (non-targeted) runs.
        if grand_total > 0 and not specific_forum:
            self._update_scheduler_for_manual_run('forum')

        if grand_total > 0 and self.config.get('immich', {}).get('scan_after_download'):
            self.logger.info(f"Downloaded {grand_total} forum files, triggering Immich scan", module="Forum")
            self.trigger_immich_scan()
        elif grand_total == 0:
            self.logger.info("No forum files downloaded, skipping Immich scan", module="Forum")

        return grand_total

    finally:
        # Playwright resources are released no matter how processing ended.
        if forum_module and hasattr(forum_module, 'cleanup'):
            self.logger.info("Cleaning up Playwright resources...", module="Core")
            try:
                forum_module.cleanup()
                self.logger.info("Playwright resources released successfully", module="Core")
            except Exception as exc:
                self.logger.warning(f"Error during Playwright cleanup: {exc}", module="Error")
|
|
|
|
def _batch_forum_download(self, forum_module, forum_name: str, config: Dict, temp_dir: Path, dest_dir: Path) -> int:
|
|
"""Batch download forum content using subprocess isolation to avoid asyncio conflicts
|
|
|
|
Returns:
|
|
Total number of images downloaded
|
|
"""
|
|
self.logger.info(f"Starting forum download for {forum_name} (subprocess mode)", module="Forum")
|
|
|
|
# Prepare configuration for subprocess
|
|
subprocess_config = {
|
|
'forum_name': forum_name,
|
|
'forum_config': config,
|
|
'temp_dir': str(temp_dir),
|
|
'dest_dir': str(dest_dir),
|
|
'download_dir': str(self.base_path / 'downloads'),
|
|
'headless': self.config.get('forums', {}).get('headless', True)
|
|
}
|
|
|
|
try:
|
|
subprocess_wrapper = str(self.base_path / 'wrappers' / 'forum_subprocess_wrapper.py')
|
|
|
|
# Run forum download in subprocess with real-time log streaming
|
|
returncode, stdout = self._run_subprocess_with_streaming_logs(
|
|
subprocess_wrapper,
|
|
subprocess_config,
|
|
timeout=600 # 10 minute timeout
|
|
)
|
|
|
|
# Parse JSON result from stdout
|
|
self.logger.info(f"Subprocess returned: code={returncode}, stdout_len={len(stdout) if stdout else 0}", module="Forum")
|
|
if stdout:
|
|
self.logger.info(f"Subprocess stdout: {repr(stdout[:500])}", module="Forum")
|
|
|
|
if returncode == 0 and stdout:
|
|
try:
|
|
forum_result = json.loads(stdout.strip())
|
|
|
|
if forum_result.get('status') == 'success':
|
|
total_downloaded = forum_result.get('count', 0)
|
|
new_threads = forum_result.get('new_threads', 0)
|
|
total_results = forum_result.get('total_results', 0)
|
|
|
|
if new_threads > 0:
|
|
self.logger.info(f"Forum search results for {forum_name}:", module="Forum")
|
|
self.logger.info(f"Total threads found: {total_results}", module="Core")
|
|
self.logger.info(f"New threads downloaded: {new_threads}", module="Core")
|
|
self.logger.info(f"Total images downloaded: {total_downloaded}", module="Core")
|
|
elif total_results > 0:
|
|
self.logger.info(f"Forum search results for {forum_name}:", module="Forum")
|
|
self.logger.info(f"Total threads found: {total_results}", module="Core")
|
|
self.logger.info(f"All threads already downloaded", module="Core")
|
|
else:
|
|
self.logger.info(f"No threads found from {forum_name} search", module="Forum")
|
|
|
|
# Send notifications if files were downloaded
|
|
# Use file lists directly from subprocess (accurate paths, no race conditions)
|
|
if self.notifier:
|
|
try:
|
|
search_query = config.get('search_query', '')
|
|
final_files = forum_result.get('final_files', [])
|
|
review_files = forum_result.get('review_files', [])
|
|
|
|
# Send notification for files that went to final destination
|
|
if final_files:
|
|
downloads = []
|
|
for f in final_files:
|
|
downloads.append({
|
|
'source': forum_name,
|
|
'content_type': f.get('content_type') or 'image',
|
|
'filename': f.get('filename'),
|
|
'file_path': f.get('file_path')
|
|
})
|
|
|
|
self.notifier.notify_batch_download(
|
|
platform='forums',
|
|
downloads=downloads,
|
|
search_term=search_query
|
|
)
|
|
self.logger.info(f"Sent notification: {len(final_files)} image(s) to final", module="Core")
|
|
|
|
# Send separate notification for review queue files
|
|
if review_files:
|
|
downloads = []
|
|
for f in review_files:
|
|
downloads.append({
|
|
'source': forum_name,
|
|
'content_type': f.get('content_type') or 'image',
|
|
'filename': f.get('filename'),
|
|
'file_path': f.get('file_path')
|
|
})
|
|
|
|
self.notifier.notify_batch_download(
|
|
platform='forums',
|
|
downloads=downloads,
|
|
search_term=search_query,
|
|
is_review_queue=True
|
|
)
|
|
self.logger.info(f"Sent notification: {len(review_files)} image(s) to review queue", module="Core")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Failed to send forum notification: {e}", module="Forum")
|
|
|
|
return total_downloaded
|
|
else:
|
|
self.logger.error(f"Forum search failed: {forum_result.get('message', 'Unknown error')}", module="Forum")
|
|
return 0
|
|
|
|
except json.JSONDecodeError as e:
|
|
self.logger.error(f"Failed to parse forum subprocess result: {e}", module="Forum")
|
|
self.logger.debug(f"Subprocess stdout: {stdout[:500]}", module="Core")
|
|
return 0
|
|
else:
|
|
self.logger.error(f"Forum subprocess failed with code {returncode}", module="Forum")
|
|
if stdout:
|
|
self.logger.debug(f"Subprocess stdout: {stdout[:500]}", module="Core")
|
|
return 0
|
|
|
|
except subprocess.TimeoutExpired:
|
|
self.logger.error(f"Forum download timeout after 10 minutes", module="Forum")
|
|
return 0
|
|
except Exception as e:
|
|
self.logger.error(f"Forum subprocess error: {e}", module="Forum")
|
|
import traceback
|
|
self.logger.debug(f"Traceback: {traceback.format_exc()}", module="Core")
|
|
return 0
|
|
|
|
def _move_and_process_files(self, source_dir: Path, dest_dir: Path, extensions: List[str],
|
|
provided_timestamps: Dict[str, datetime] = None,
|
|
platform: str = None, source: str = None,
|
|
content_type: str = None, search_term: str = None,
|
|
specific_files: List[Path] = None) -> int:
|
|
"""Move files and update timestamps with optional push notifications
|
|
|
|
Args:
|
|
source_dir: Source directory containing files to move
|
|
dest_dir: Destination directory
|
|
extensions: List of file extensions to process
|
|
provided_timestamps: Optional dict of filenames -> datetime objects (e.g., from TikTok module)
|
|
platform: Platform name for notifications (instagram, tiktok, forums, etc.)
|
|
source: Source/username for notifications
|
|
content_type: Content type for notifications (post, story, reel, thread, etc.)
|
|
search_term: Optional search term for notifications (for forum searches)
|
|
specific_files: Optional list of specific file Paths to move (instead of scanning source_dir)
|
|
|
|
Returns:
|
|
Number of files moved
|
|
"""
|
|
if not source_dir.exists():
|
|
return 0
|
|
|
|
# Start batch notification context if metadata provided AND no batch is already active
|
|
start_new_batch = False
|
|
if (platform or source or content_type) and not self.move_manager.batch_context:
|
|
self.move_manager.start_batch(
|
|
platform=platform or 'unknown',
|
|
source=source,
|
|
content_type=content_type,
|
|
search_term=search_term
|
|
)
|
|
start_new_batch = True
|
|
|
|
# Start with provided timestamps if available
|
|
file_timestamps = provided_timestamps.copy() if provided_timestamps else {}
|
|
|
|
# Determine which files to scan for timestamps
|
|
files_to_scan = specific_files if specific_files else [
|
|
f for f in source_dir.rglob('*')
|
|
if f.is_file() and f.suffix.lower() in extensions
|
|
]
|
|
|
|
# For files without provided timestamps, try to extract from filename/directory
|
|
for file in files_to_scan:
|
|
if file.name not in file_timestamps:
|
|
date = extract_date(file.stem) or extract_date(file.parent.name)
|
|
if date:
|
|
file_timestamps[file.name] = date
|
|
|
|
# Log how many timestamps we have
|
|
if provided_timestamps:
|
|
self.logger.debug(f"Using {len(provided_timestamps)} provided timestamps", module="Core")
|
|
|
|
# If specific_files provided, move them individually instead of batch scanning
|
|
if specific_files:
|
|
moved = 0
|
|
for file in specific_files:
|
|
if file.is_file():
|
|
ts = file_timestamps.get(file.name)
|
|
dest_file = dest_dir / file.name
|
|
success = self.move_manager.move_file(
|
|
source=file,
|
|
destination=dest_file,
|
|
timestamp=ts,
|
|
preserve_if_no_timestamp=True
|
|
)
|
|
if success:
|
|
moved += 1
|
|
# End batch and send notification if we started it
|
|
if start_new_batch:
|
|
self.move_manager.end_batch()
|
|
return moved
|
|
|
|
# Move files using MoveManager
|
|
stats = self.move_manager.move_files_batch(
|
|
source_dir=source_dir,
|
|
dest_dir=dest_dir,
|
|
file_timestamps=file_timestamps,
|
|
extensions=extensions,
|
|
preserve_if_no_timestamp=True
|
|
)
|
|
|
|
# End batch and send notification ONLY if we started it
|
|
if start_new_batch:
|
|
self.move_manager.end_batch()
|
|
|
|
# Log move statistics with clarification
|
|
msg_parts = [f"Moved {stats['moved']} files"]
|
|
if stats.get('skipped', 0) > 0:
|
|
msg_parts.append(f"skipped {stats['skipped']} existing")
|
|
if stats.get('duplicates', 0) > 0:
|
|
msg_parts.append(f"blocked {stats['duplicates']} hash duplicates")
|
|
if stats.get('review_queue', 0) > 0:
|
|
msg_parts.append(f"{stats['review_queue']} to review")
|
|
self.logger.info(f"{', '.join(msg_parts)}", module="Core")
|
|
return stats.get('moved', 0)
|
|
|
|
def _merge_fastdl_toolzu_quality_upgrade(self, username: str, content_type: str,
|
|
toolzu_temp_dir: Path, dest_dir: Path) -> int:
|
|
"""
|
|
Auto-merge FastDL and Toolzu downloads for quality upgrade
|
|
|
|
IMPORTANT: Only upgrades files that FastDL has ALREADY downloaded (in database)
|
|
- Checks database for FastDL downloads
|
|
- Only upgrades images (not videos)
|
|
- Uses FastDL timestamp from database
|
|
- Provides clear upgrade logging
|
|
|
|
Args:
|
|
username: Instagram username
|
|
content_type: 'posts' only (stories handled separately)
|
|
toolzu_temp_dir: Toolzu temp directory
|
|
dest_dir: Final destination directory
|
|
|
|
Returns:
|
|
Number of files processed
|
|
"""
|
|
import re
|
|
|
|
toolzu_user_dir = toolzu_temp_dir / username.lower()
|
|
|
|
self.logger.debug(f"Looking for Toolzu files in: {toolzu_user_dir}", module="Toolzu")
|
|
self.logger.debug(f"Directory exists: {toolzu_user_dir.exists()}", module="Toolzu")
|
|
|
|
if toolzu_user_dir.exists():
|
|
files_list = list(toolzu_user_dir.iterdir())
|
|
self.logger.debug(f"Found {len(files_list)} files in Toolzu directory", module="Core")
|
|
for f in files_list[:5]: # Show first 5 files
|
|
self.logger.debug(f"- {f.name}", module="Core")
|
|
|
|
if not toolzu_user_dir.exists():
|
|
self.logger.debug(f"No Toolzu files found at {toolzu_user_dir}", module="Toolzu")
|
|
return 0
|
|
|
|
# Ensure destination exists
|
|
dest_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Process Toolzu files
|
|
moved_count = 0
|
|
upgraded_count = 0
|
|
skipped_count = 0
|
|
# Match both numeric IDs (17-19 digits) and alphanumeric IDs
|
|
# Don't include _ in character class so we stop at underscores
|
|
pattern = r'_([A-Za-z0-9-]{10,})\.(jpg|jpeg|png|webp|mp4|mov)$'
|
|
|
|
for toolzu_file in toolzu_user_dir.iterdir():
|
|
if not toolzu_file.is_file():
|
|
continue
|
|
|
|
# Skip video files - no need to upgrade (already good quality)
|
|
if toolzu_file.suffix.lower() in ['.mp4', '.mov']:
|
|
self.logger.debug(f"Skipping video (no upgrade needed)", module="Core")
|
|
continue
|
|
|
|
# Extract media ID from image files
|
|
match = re.search(pattern, toolzu_file.name)
|
|
if not match:
|
|
self.logger.debug(f"Could not extract media ID from {toolzu_file.name}", module="Toolzu")
|
|
continue
|
|
|
|
media_id = match.group(1)
|
|
|
|
# Check if FastDL has already downloaded this (in database)
|
|
self.logger.debug(f"Checking upgrade for media_id: {media_id}", module="Core")
|
|
fastdl_record = self.unified_db.get_download_by_media_id(media_id, platform='instagram', method='fastdl')
|
|
self.logger.debug(f"FastDL record: {fastdl_record}", module="Instagram")
|
|
|
|
if fastdl_record and fastdl_record.get('post_date'):
|
|
# Check if already upgraded
|
|
metadata = fastdl_record.get('metadata', {})
|
|
if metadata.get('upgraded'):
|
|
self.logger.debug(f"Skipping {toolzu_file.name} (already upgraded)", module="Toolzu")
|
|
skipped_count += 1
|
|
continue
|
|
|
|
# FastDL has this file and it hasn't been upgraded yet - upgrade it!
|
|
import shutil
|
|
|
|
# Get post_date from FastDL database
|
|
post_date = fastdl_record['post_date']
|
|
if isinstance(post_date, str):
|
|
post_date = datetime.fromisoformat(post_date)
|
|
|
|
# Use FastDL's exact original filename (preserve it exactly)
|
|
original_filename = fastdl_record.get('filename')
|
|
if not original_filename:
|
|
# Fallback: extract from file_path if filename field is empty
|
|
file_path = fastdl_record.get('file_path')
|
|
if file_path:
|
|
original_filename = Path(file_path).name
|
|
else:
|
|
# Last resort: generate filename
|
|
date_str = post_date.strftime('%Y%m%d_%H%M%S')
|
|
ext = toolzu_file.suffix
|
|
original_filename = f"{username.lower()}_{date_str}_{media_id}{ext}"
|
|
|
|
dest_file = dest_dir / original_filename
|
|
|
|
# Skip if already exists at destination
|
|
if dest_file.exists():
|
|
self.logger.debug(f"Skipping {original_filename} (already exists at destination)", module="Core")
|
|
# Mark as upgraded even if file exists
|
|
self.unified_db.mark_fastdl_upgraded(media_id)
|
|
skipped_count += 1
|
|
continue
|
|
|
|
# Copy Toolzu's high-res file using FastDL's original filename
|
|
shutil.copy2(toolzu_file, dest_file)
|
|
|
|
# Update timestamp to match FastDL
|
|
ts = post_date.timestamp()
|
|
os.utime(dest_file, (ts, ts))
|
|
|
|
# Mark this FastDL record as upgraded in the database
|
|
self.unified_db.mark_fastdl_upgraded(media_id)
|
|
|
|
self.logger.info(f"⬆️ UPGRADED: {original_filename} → high-res (dated: {post_date.strftime('%Y-%m-%d %H:%M')})", module="Core")
|
|
upgraded_count += 1
|
|
moved_count += 1
|
|
else:
|
|
# FastDL doesn't have this - skip it (don't upgrade)
|
|
self.logger.debug(f"Skipped {toolzu_file.name} (not in FastDL database)", module="Instagram")
|
|
skipped_count += 1
|
|
|
|
# Clean up temp directories
|
|
if toolzu_user_dir.exists():
|
|
import shutil
|
|
shutil.rmtree(toolzu_user_dir, ignore_errors=True)
|
|
self.logger.debug(f"Cleaned up Toolzu temp: {toolzu_user_dir}", module="Toolzu")
|
|
|
|
# Summary logging
|
|
if upgraded_count > 0:
|
|
self.logger.info(f"✨ QUALITY UPGRADE SUMMARY: Upgraded {upgraded_count} images to high-res", module="Toolzu")
|
|
if skipped_count > 0:
|
|
self.logger.debug(f"Skipped {skipped_count} files (not in FastDL database or already exist)", module="Instagram")
|
|
|
|
return moved_count
|
|
|
|
def _cleanup_temp_dir(self, temp_dir: Path):
|
|
"""Clean up temp directory after files have been moved - AGGRESSIVE VERSION"""
|
|
import shutil
|
|
try:
|
|
# Method 1: Try to remove the entire tree if it exists
|
|
if temp_dir.exists():
|
|
shutil.rmtree(temp_dir, ignore_errors=True)
|
|
self.logger.debug(f"Removed temp dir tree: {temp_dir}", module="Core")
|
|
|
|
# Method 2: Clean up parent directories if they're empty
|
|
parents_to_check = []
|
|
current = temp_dir.parent
|
|
while current != self.base_path and current != Path('/'):
|
|
parents_to_check.append(current)
|
|
current = current.parent
|
|
|
|
# Check each parent from deepest to shallowest
|
|
for parent_dir in parents_to_check:
|
|
if parent_dir.exists() and not any(parent_dir.iterdir()):
|
|
try:
|
|
parent_dir.rmdir()
|
|
self.logger.debug(f"Removed empty parent dir: {parent_dir}", module="Core")
|
|
except OSError:
|
|
break # Stop if we can't remove a directory (permission, not empty, etc.)
|
|
|
|
except Exception as e:
|
|
self.logger.warning(f"Error cleaning temp dir: {e}", module="Error")
|
|
|
|
def _cleanup_forum_temp_dir(self, temp_dir: Path):
|
|
"""Clean up forum temp directory more aggressively"""
|
|
import shutil
|
|
try:
|
|
if temp_dir.exists():
|
|
# Force remove the entire directory tree
|
|
shutil.rmtree(temp_dir, ignore_errors=True)
|
|
self.logger.debug(f"Removed forum temp directory: {temp_dir}", module="Forum")
|
|
|
|
# Clean up parent directories if empty
|
|
parent = temp_dir.parent
|
|
base_temp = self.base_path / "temp"
|
|
|
|
while parent and parent != base_temp and parent.exists():
|
|
try:
|
|
# Try to remove if empty
|
|
if not any(parent.iterdir()):
|
|
parent.rmdir()
|
|
self.logger.debug(f"Removed empty parent: {parent}", module="Core")
|
|
parent = parent.parent
|
|
else:
|
|
# Check if it only has empty subdirectories
|
|
subdirs = [d for d in parent.iterdir() if d.is_dir()]
|
|
if subdirs and all(not any(d.iterdir()) for d in subdirs):
|
|
for subdir in subdirs:
|
|
try:
|
|
subdir.rmdir()
|
|
except OSError:
|
|
pass # Directory not empty or permission error
|
|
# Try parent again
|
|
if not any(parent.iterdir()):
|
|
parent.rmdir()
|
|
self.logger.debug(f"Removed parent after cleaning: {parent}", module="Core")
|
|
break
|
|
except Exception as e:
|
|
self.logger.debug(f"Could not remove {parent}: {e}", module="Core")
|
|
break
|
|
except Exception as e:
|
|
self.logger.warning(f"Error cleaning forum temp dir {temp_dir}: {e}", module="Forum")
|
|
|
|
def cleanup_orphaned_data_files(self):
    """Delete stray ``.db`` files from ``<base_path>/data`` at startup.

    Only backup_cache.db and changelog.json are expected in data/. Any other
    ``.db`` file smaller than 50000 bytes is treated as an orphan (typically
    created by running a script from the wrong directory) and removed. Larger
    files are left alone. Failures are logged at debug level, never raised.
    """
    data_dir = self.base_path / 'data'
    if not data_dir.exists():
        return

    # Files that legitimately live in data/
    keep = {'backup_cache.db', 'changelog.json'}
    size_limit = 50000  # bytes; anything below this is assumed empty/orphaned

    try:
        for entry in data_dir.iterdir():
            is_candidate = entry.is_file() and entry.suffix == '.db' and entry.name not in keep
            if not is_candidate:
                continue
            try:
                byte_count = entry.stat().st_size
                if byte_count >= size_limit:
                    continue
                entry.unlink()
                self.logger.info(f"Removed orphaned database file: {entry.name} ({byte_count} bytes)", module="Core")
            except Exception as err:
                self.logger.debug(f"Could not check/remove {entry}: {err}", module="Core")
    except Exception as err:
        self.logger.debug(f"Error during data folder cleanup: {err}", module="Core")
|
|
|
|
def cleanup_all_temp_dirs(self):
    """Clean up all temp directories on startup to prevent accumulation.

    Calls ``self.cleanup_orphaned_data_files()`` first, then empties
    ``<base_path>/temp`` while keeping the directory itself. Directory entries
    are removed even when they contain no files. Previously an all-empty-dirs
    tree was reported as "already clean" and never removed. Errors are logged,
    never raised.
    """
    import shutil

    temp_base = self.base_path / 'temp'

    # Also clean up orphaned data folder files
    self.cleanup_orphaned_data_files()

    if not temp_base.exists():
        return

    try:
        entries = list(temp_base.iterdir())
        if not entries:
            self.logger.debug("Temp directories already clean", module="Core")
            return

        # Count files before cleanup (for the summary log line only)
        total_files = 0
        total_size = 0
        for root, _dirs, files in os.walk(temp_base):
            for name in files:
                try:
                    fpath = Path(root) / name
                    total_files += 1
                    total_size += fpath.stat().st_size
                except OSError:
                    pass

        if total_files > 0:
            self.logger.info(f"Cleaning up {total_files} temp files ({total_size / 1024 / 1024:.1f} MB)", module="Core")

        # Remove all contents but keep the base temp directory
        for item in entries:
            try:
                # rmtree refuses symlinks (silently, with ignore_errors), so unlink
                # those instead; this removes the link, never the link target.
                if item.is_dir() and not item.is_symlink():
                    shutil.rmtree(item, ignore_errors=True)
                else:
                    item.unlink(missing_ok=True)
            except Exception as e:
                self.logger.debug(f"Could not remove {item}: {e}", module="Core")

        self.logger.info("Temp directory cleanup complete", module="Core")

    except Exception as e:
        self.logger.warning(f"Error during temp cleanup: {e}", module="Core")
|
|
|
|
def trigger_immich_scan(self):
    """Ask Immich to rescan the configured library when the integration is enabled.

    Reads ``api_url``, ``api_key`` and ``library_id`` from the ``immich`` config
    section and POSTs to ``{api_url}/libraries/{library_id}/scan`` with a
    30-second timeout. The following are all logged, never raised:

    * missing settings;
    * status codes other than 200/201/204;
    * timeouts;
    * any other error.
    """
    immich = self.config.get('immich', {})
    if not immich.get('enabled'):
        return

    api_url = immich.get('api_url')
    api_key = immich.get('api_key')
    library_id = immich.get('library_id')

    missing_setting = not (api_url and api_key and library_id)
    if missing_setting:
        self.logger.warning("Immich enabled but missing required config (api_url, api_key, or library_id)", module="Immich")
        return

    scan_url = f"{api_url}/libraries/{library_id}/scan"
    ok_statuses = (200, 201, 204)  # 204 No Content also means the scan was accepted

    try:
        resp = requests.post(scan_url, headers={'X-API-KEY': api_key}, timeout=30)
        if resp.status_code not in ok_statuses:
            self.logger.warning(f"Immich scan trigger failed: {resp.status_code}", module="Immich")
        else:
            self.logger.info(f"Successfully triggered Immich scan for library {library_id}", module="Immich")
    except requests.Timeout:
        self.logger.warning("Immich scan request timed out after 30 seconds", module="Immich")
    except Exception as exc:
        self.logger.error(f"Error triggering Immich scan: {exc}", module="Immich")
|
|
|
|
def download_with_gallery_dl(self, url: str, output_dir: Path, timeout: Optional[float] = None):
    """Fallback to gallery-dl for unsupported image hosts.

    Args:
        url: Image/gallery URL. It may come from scraped pages, so it is passed
            after ``--`` and can never be parsed as a gallery-dl option
            (e.g. a link text of ``--exec=...``).
        output_dir: Directory passed to gallery-dl's ``-d`` option.
        timeout: Optional limit in seconds for the gallery-dl process. ``None``
            (the default) waits indefinitely, as before.

    Returns:
        True if gallery-dl exited with status 0. False on a non-zero exit, on a
        timeout, or when gallery-dl is not installed.
    """
    # Options first, then '--' to end option parsing, then the untrusted URL.
    cmd = ['gallery-dl', '-d', str(output_dir), '--', url]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    except FileNotFoundError:
        self.logger.warning("gallery-dl not installed, skipping fallback", module="Coppermine")
        return False
    except subprocess.TimeoutExpired:
        self.logger.error(f"gallery-dl timed out after {timeout}s: {url}", module="Coppermine")
        return False

    if result.returncode == 0:
        self.logger.info(f"Successfully downloaded with gallery-dl: {url}", module="Coppermine")
        return True

    self.logger.error(f"gallery-dl failed: {result.stderr}", module="Coppermine")
    return False
|
|
|
|
def run(self):
    """Main execution loop: run each enabled, non-hidden platform once.

    Platforms run in a fixed order. A platform runs when its config section has
    a truthy ``enabled`` and its key is not listed in ``hidden_modules``. Each
    downloader's return value is added to a running total, with ``None``/falsy
    counting as 0. If ``immich.scan_at_end`` is set, one Immich scan is
    triggered at the end, but only when that total is positive. Any exception
    is logged and re-raised.
    """
    self.logger.info("Starting Media Downloader", module="Core")

    # (config section == hidden-module key, downloader method name, keyword args)
    # The tuple order is the execution order.
    plan = (
        ('instagram', 'download_instagram', {}),              # InstaLoader
        ('fastdl', 'download_fastdl', {}),                    # FastDL - alternative
        ('imginn_api', 'download_imginn_api', {}),            # ImgInn API - alternative
        ('imginn', 'download_imginn', {}),                    # ImgInn - alternative
        ('toolzu', 'download_toolzu', {}),                    # Toolzu - 1920x1440 resolution
        ('snapchat', 'download_snapchat', {}),
        ('snapchat_client', 'download_snapchat_client', {}),  # direct API
        ('tiktok', 'download_tiktok', {}),
        ('forums', 'download_forums', {'batch_mode': True}),
        ('coppermine', 'download_coppermine', {}),
    )

    try:
        grand_total = 0
        hidden = self.config.get('hidden_modules', [])

        for key, method_name, kwargs in plan:
            enabled = self.config.get(key, {}).get('enabled')
            if not enabled or key in hidden:
                continue
            grand_total += getattr(self, method_name)(**kwargs) or 0

        # Final Immich scan only pays off when something new landed on disk
        if self.config.get('immich', {}).get('scan_at_end'):
            if grand_total <= 0:
                self.logger.info("No files downloaded, skipping final Immich scan", module="Immich")
            else:
                self.logger.info(f"Downloaded {grand_total} total files, triggering final Immich scan", module="Immich")
                self.trigger_immich_scan()

        self.logger.info("Media Downloader completed successfully", module="Core")

    except Exception as exc:
        self.logger.error(f"Fatal error in Media Downloader: {exc}", module="Error")
        raise
|
|
|
|
# ============================================================================
|
|
# MAIN ENTRY POINT
|
|
# ============================================================================
|
|
|
|
# Main application database, resolved relative to this script so every CLI mode
# opens the same file regardless of the current working directory.
_MAIN_DB_PATH = Path(__file__).parent / 'database' / 'media_downloader.db'

# Values accepted by --platform before hidden modules are filtered out.
_CLI_PLATFORMS = ['instagram_unified', 'snapchat', 'snapchat_client', 'tiktok', 'forums', 'coppermine', 'all']

# --platform value -> MediaDownloader method for a single-platform run. Keys that are
# not in _CLI_PLATFORMS cannot currently be selected through argparse ``choices``;
# they are kept so the dispatch matches the historical if/elif chain.
_SINGLE_PLATFORM_METHODS = {
    'instagram_unified': 'download_instagram_unified',
    'instagram': 'download_instagram',
    'fastdl': 'download_fastdl',
    'imginn': 'download_imginn',
    'imginn_api': 'download_imginn_api',
    'instagram_client': 'download_instagram_client',
    'toolzu': 'download_toolzu',
    'snapchat': 'download_snapchat',
    'snapchat_client': 'download_snapchat_client',
    'tiktok': 'download_tiktok',
    'forums': 'download_forums',
    'coppermine': 'download_coppermine',
}

# Scheduler task prefix ("service:target") -> config section whose 'enabled' flag
# decides whether --scheduler-status shows the task. Unlisted prefixes are always shown.
_STATUS_SERVICE_CONFIG_KEYS = {
    'instagram': 'instagram',
    'fastdl': 'fastdl',
    'imginn': 'imginn',
    'snapchat': 'snapchat',
    'tiktok': 'tiktok',
    'forum': 'forums',
    'monitor': 'forums',
}

# Display names for task prefixes that str.capitalize() would spell wrong.
_TASK_TYPE_DISPLAY = {'imginn': 'ImgInn', 'fastdl': 'FastDL', 'tiktok': 'TikTok'}


def _silent_log_callback(message, level="info"):
    """No-op log callback for DownloadScheduler (the scheduler has its own universal logger)."""
    pass


def _load_hidden_modules():
    """Return the ``hidden_modules`` setting as a list, or ``[]`` if it cannot be read.

    Runs before argument parsing so hidden platforms can be dropped from the
    ``--platform`` choices shown by ``--help``.
    """
    try:
        import modules.db_bootstrap  # noqa: F401 - ensures pg_adapter is loaded
        from modules.settings_manager import SettingsManager
        hidden = SettingsManager(str(_MAIN_DB_PATH)).get('hidden_modules') or []
    except Exception:
        return []
    return hidden if isinstance(hidden, list) else []


def _build_arg_parser(hidden_modules):
    """Build the CLI parser; platforms in ``hidden_modules`` are not offered ('all' always is)."""
    parser = argparse.ArgumentParser(description='Media Downloader - Unified media downloading system')
    parser.add_argument('--config', type=str, help='Path to configuration file')
    parser.add_argument('--reset', action='store_true', help='Reset database and temp directories')
    parser.add_argument('--test', action='store_true', help='Test mode - download minimal content')

    visible_platforms = [p for p in _CLI_PLATFORMS if p not in hidden_modules]
    if 'all' not in visible_platforms:
        visible_platforms.append('all')
    parser.add_argument('--platform', choices=visible_platforms,
                        default='all', help='Platform to download from')

    parser.add_argument('--scheduler', action='store_true', help='Run with scheduler for random intervals')
    parser.add_argument('--scheduler-status', action='store_true', help='Show scheduler status')
    parser.add_argument('--db', nargs=argparse.REMAINDER, help='Database management commands (e.g., --db delete POST_ID, --db list --limit 10, --db stats)')
    return parser


def _run_db_manager(db_args):
    """Delegate ``--db ...`` to utilities/db_manager.py and exit with its return code."""
    base_path = Path(__file__).parent
    db_manager_path = base_path / 'utilities' / 'db_manager.py'
    result = subprocess.run(
        [sys.executable, str(db_manager_path)] + db_args,
        cwd=base_path
    )
    sys.exit(result.returncode)


def _reset_installation():
    """Delete local databases, temp/download directories and log files (``--reset``)."""
    import shutil

    print("Resetting databases and temp directories...")
    base_path = Path(__file__).parent

    # Remove all databases from database directory
    db_dir = base_path / 'database'
    if db_dir.exists():
        db_path = db_dir / 'media_downloader.db'
        if db_path.exists():
            db_path.unlink()
            print(f"Removed database: {db_path}")

        scheduler_db = db_dir / 'scheduler_state.db'
        if scheduler_db.exists():
            scheduler_db.unlink()
            print(f"Removed scheduler database: {scheduler_db}")

        # Remove any WAL or SHM files
        for pattern in ['*.db-wal', '*.db-shm']:
            for file in db_dir.glob(pattern):
                file.unlink()
                print(f"Removed: {file}")

    # Remove old databases from root (if exist from old versions)
    old_scheduler_db = base_path / 'scheduler_state.db'
    if old_scheduler_db.exists():
        old_scheduler_db.unlink()
        print(f"Removed old scheduler database: {old_scheduler_db}")

    old_unified_db = base_path / 'unified_media.db'
    if old_unified_db.exists():
        old_unified_db.unlink()
        print(f"Removed old unified database: {old_unified_db}")

    # Clear temp directories
    for temp_dir in ['temp', 'downloads']:
        dir_path = base_path / temp_dir
        if dir_path.exists():
            shutil.rmtree(dir_path)
            print(f"Removed directory: {dir_path}")

    # Clear logs
    log_dir = base_path / 'logs'
    if log_dir.exists():
        for log_file in log_dir.glob('*.log'):
            log_file.unlink()
        print("Cleared log files")

    print("Reset complete")


def _format_time_until_next(next_dt):
    """Return "Xh Ym", "Ym" or "Now" for the time remaining until ``next_dt``."""
    remaining = (next_dt - datetime.now()).total_seconds()
    if remaining <= 0:
        return "Now"
    hours = int(remaining // 3600)
    minutes = int((remaining % 3600) // 60)
    return f"{hours}h {minutes}m" if hours > 0 else f"{minutes}m"


def _read_scheduler_state():
    """Read pending and past runs from the ``scheduler_state`` table.

    Returns:
        ``(scheduled_tasks, last_run_status)``:

        * ``task_id -> {'next_run', 'time_until'}`` for rows whose ``next_run``
          parses as an ISO timestamp;
        * ``task_id -> {'timestamp', 'success'}`` for rows with a ``last_run``.

        On a database error, whatever was collected up to that point is returned.
    """
    scheduled_tasks = {}
    last_run_status = {}
    try:
        # NOTE(review): connects to the literal path 'scheduler_state' relative to the
        # CWD; with plain sqlite3 that opens/creates a file of that name. Presumably
        # the db_bootstrap backend resolves it - confirm.
        conn = sqlite3.connect('scheduler_state')
        try:
            cursor = conn.cursor()
            # Get both next run times and last run status
            cursor.execute('''
                SELECT task_id, next_run, last_run, status
                FROM scheduler_state
                WHERE next_run IS NOT NULL
                ORDER BY next_run
            ''')
            for task_id, next_run, last_run, _status in cursor.fetchall():
                if next_run:
                    try:
                        scheduled_tasks[task_id] = {
                            'next_run': next_run,
                            'time_until': _format_time_until_next(datetime.fromisoformat(next_run)),
                        }
                    except (ValueError, TypeError):
                        pass  # Invalid datetime format

                # Every task present in scheduler_state with a last_run is reported as successful
                if last_run:
                    last_run_status[task_id] = {'timestamp': last_run, 'success': True}
        except sqlite3.Error:
            pass  # Database error reading scheduler state
        finally:
            conn.close()
    except Exception:
        pass  # Database connection error
    return scheduled_tasks, last_run_status


def _status_task_enabled(task_name, config):
    """True unless ``task_name``'s "service:" prefix maps to a disabled config section."""
    if ':' not in task_name:
        return True
    service = task_name.split(':')[0].lower()
    config_key = _STATUS_SERVICE_CONFIG_KEYS.get(service)
    if config_key is None:
        return True
    return bool(config.get(config_key, {}).get('enabled', False))


def _split_task_name(task_name):
    """Split "service:target" into a (display service name, target) pair."""
    if ':' not in task_name:
        return "Task", task_name
    task_type, target = task_name.split(':', 1)
    return _TASK_TYPE_DISPLAY.get(task_type.lower(), task_type.capitalize()), target


def _format_status_duration(value, prefix="", suffix=""):
    """Compact a timedelta-style string ("H:MM:SS[.f]" / "N day(s), H:MM:SS").

    Produces e.g. ``"3d 2h"``, ``"2h 5m"``, ``"5m"`` or ``"7s"``, wrapped as
    ``prefix + text + suffix``. Other strings are returned unchanged; other
    types are converted with ``str()``.
    """
    if isinstance(value, timedelta):
        value = str(value)  # same textual format as the string case
    if not isinstance(value, str):
        return str(value)

    if "day" in value:
        # Format: "1 day, 0:30:45" or "2 days, 14:30:45"
        day_parts = value.split(", ")
        days = int(day_parts[0].split()[0])
        if len(day_parts) > 1:
            hours = int(day_parts[1].split(':')[0])
            return f"{prefix}{days}d {hours}h{suffix}"
        return f"{prefix}{days}d{suffix}"

    parts = value.split(':')
    if len(parts) != 3:
        return value
    hours = int(parts[0])
    minutes = int(parts[1])
    seconds = int(float(parts[2].split('.')[0]))
    if hours > 0:
        return f"{prefix}{hours}h {minutes}m{suffix}"
    if minutes > 0:
        return f"{prefix}{minutes}m{suffix}"
    return f"{prefix}{seconds}s{suffix}"


def _print_last_run_times(status, config):
    """Print the most-recent-first list of past runs for enabled services."""
    last_run_times = status.get('last_run_times')
    if not last_run_times:
        print("\n No tasks have been run yet")
        return

    print("\n📊 Last Run Times:")
    print("─" * 66)

    rows = [(name, info) for name, info in last_run_times.items() if _status_task_enabled(name, config)]
    rows.sort(key=lambda row: row[1]['timestamp'], reverse=True)
    run_status = status.get('last_run_status', {})

    for task_name, run_info in rows:
        task_type, target = _split_task_name(task_name)
        time_str = _format_status_duration(run_info['time_ago'], suffix=" ago")
        time_formatted = datetime.fromisoformat(run_info['timestamp']).strftime("%H:%M:%S")

        status_indicator = ""
        if task_name in run_status:
            status_indicator = " ✓" if run_status[task_name]['success'] else " ✗"

        download_count = run_info.get('download_count', 0)
        download_info = f" [{download_count} new]" if download_count > 0 else ""

        print(f" {task_type:<10} {target:<20} {time_formatted} ({time_str}){status_indicator}{download_info}")


def _print_scheduled_runs(status, config):
    """Print upcoming runs for enabled services (only while the service is running)."""
    if not status['running']:
        return

    print("\n⏰ Next Scheduled Runs:")
    print("─" * 66)

    scheduled = status.get('scheduled_tasks')
    if not scheduled:
        print(" No tasks scheduled yet")
        return

    rows = [(name, info) for name, info in scheduled.items() if _status_task_enabled(name, config)]
    rows.sort(key=lambda row: row[1]['next_run'])

    for task_name, schedule_info in rows:
        task_type, target = _split_task_name(task_name)
        time_str = _format_status_duration(schedule_info['time_until'], prefix="in ")
        time_formatted = datetime.fromisoformat(schedule_info['next_run']).strftime("%H:%M:%S")
        print(f" {task_type:<10} {target:<20} {time_formatted} ({time_str})")


def _show_scheduler_status(config_path):
    """Print the scheduler status report (``--scheduler-status``)."""
    from modules.scheduler import DownloadScheduler
    from modules.settings_manager import SettingsManager
    from modules.unified_database import UnifiedDatabase

    # Check if systemd service is running
    service_running = False
    try:
        result = subprocess.run(['systemctl', 'is-active', 'media-downloader'],
                                capture_output=True, text=True)
        service_running = result.stdout.strip() == 'active'
    except (subprocess.SubprocessError, FileNotFoundError):
        pass  # systemctl not available or failed

    unified_db = UnifiedDatabase(str(_MAIN_DB_PATH), use_pool=True, pool_size=3)
    try:
        scheduled_tasks, last_run_status = _read_scheduler_state()

        # Initialize settings manager for database-backed configuration
        settings_manager = SettingsManager(str(_MAIN_DB_PATH))
        scheduler = DownloadScheduler(config_path=config_path, log_callback=_silent_log_callback,
                                      unified_db=unified_db, settings_manager=settings_manager)
        status = scheduler.get_status()
        status['running'] = service_running
        status['scheduled_tasks'] = scheduled_tasks
        status['last_run_status'] = last_run_status

        print("\n╔════════════════════════════════════════════════════════════════╗")
        print("║ SCHEDULER STATUS ║")
        print("╚════════════════════════════════════════════════════════════════╝")

        if status['running']:
            print("\n⚡ Status: RUNNING")
        else:
            print("\n⏸ Status: STOPPED")

        # Load config from database (used to hide tasks of disabled services)
        try:
            config = settings_manager.get_all()
        except Exception as e:
            print(f" Error loading config: {e}")
            config = {}

        _print_last_run_times(status, config)
        _print_scheduled_runs(status, config)

        print("\n" + "─" * 66)
    finally:
        # Release the pool even if building/printing the report failed
        unified_db.close()


def _print_configured_schedules(config):
    """Print each enabled section's accounts/configs with their check interval."""
    print("Configured individual schedules:")

    # (config section, heading, list key, label key, default interval in hours, label prefix)
    sections = (
        ('instagram', "\nInstagram accounts:", 'accounts', 'username', 6, "@"),
        ('tiktok', "\nTikTok accounts:", 'accounts', 'username', 8, "@"),
        ('forums', "\nForum configurations:", 'configs', 'name', 12, ""),
    )
    for section_key, heading, list_key, label_key, default_interval, label_prefix in sections:
        if not config.get(section_key, {}).get('enabled'):
            continue
        print(heading)
        for entry in config[section_key].get(list_key, []):
            label = entry.get(label_key)
            interval = entry.get('check_interval_hours', default_interval)
            start_text = " [runs at startup]" if entry.get('run_at_start', False) else ""
            print(f" {label_prefix}{label}: {interval}h (random {interval/2:.1f}-{interval}h){start_text}")


def _run_scheduler(config_path):
    """Run the random-interval scheduler daemon until SIGTERM/SIGINT (``--scheduler``)."""
    import signal
    from modules.scheduler import DownloadScheduler
    from modules.settings_manager import SettingsManager
    from modules.unified_database import UnifiedDatabase
    from modules.universal_logger import get_logger

    # Track instances for graceful shutdown (read by the signal handler below)
    scheduler_instance = None
    unified_db = None
    shutdown_requested = False

    def graceful_shutdown(signum, frame):
        """Handle SIGTERM/SIGINT for graceful shutdown"""
        nonlocal shutdown_requested
        if shutdown_requested:
            return  # Prevent multiple shutdowns
        shutdown_requested = True

        sig_name = 'SIGTERM' if signum == signal.SIGTERM else 'SIGINT'
        print(f"\nReceived {sig_name}, initiating graceful shutdown...")

        if scheduler_instance:
            print("Stopping scheduler...")
            scheduler_instance.stop()

            # Clean up via the scheduler's downloader (no separate instance)
            dl = getattr(scheduler_instance, 'downloader', None)
            if dl:
                print("Cleaning up temp files...")
                dl.cleanup_all_temp_dirs()

        # Close the shared database pool
        if unified_db:
            print("Closing database connections...")
            unified_db.close()

        print("Graceful shutdown complete")
        sys.exit(0)

    signal.signal(signal.SIGTERM, graceful_shutdown)
    signal.signal(signal.SIGINT, graceful_shutdown)

    print("Starting Media Downloader with random scheduling...")
    print("Downloads will run at random intervals in the second half of configured periods.")
    print("This helps prevent detection by avoiding predictable patterns.")
    print("")

    # Use universal logger for daemon mode
    logger = get_logger('Media_Downloader')

    # Suppress noisy third-party loggers
    logging.getLogger('asyncio').setLevel(logging.WARNING)
    logging.getLogger('selenium').setLevel(logging.WARNING)
    logging.getLogger('urllib3').setLevel(logging.WARNING)

    unified_db = UnifiedDatabase(str(_MAIN_DB_PATH), use_pool=True, pool_size=5)

    logger.info("Media Downloader daemon started with universal logging", module="Core")

    # Initialize settings manager for database-backed configuration
    settings_manager = SettingsManager(str(_MAIN_DB_PATH))

    scheduler = DownloadScheduler(
        config_path=config_path,
        log_callback=_silent_log_callback,
        unified_db=unified_db,
        settings_manager=settings_manager
    )
    scheduler_instance = scheduler  # Track for graceful shutdown

    try:
        # Load config from database to show schedule
        _print_configured_schedules(settings_manager.get_all())

        print("\nPress Ctrl+C to stop the scheduler\n")

        scheduler.start()

        # Dependency updates are handled by the scheduler when idle
        # This ensures updates run safely when no tasks are active
        logger.info("Dependency updates handled by scheduler (runs when idle)", module="Core")

        # Keep running; shutdown happens via the signal handler
        while True:
            time.sleep(60)  # Main loop heartbeat

    except KeyboardInterrupt:
        # Fallback in case the signal handler wasn't triggered
        if not shutdown_requested:
            print("\nStopping scheduler...")
            scheduler.stop()
            dl = getattr(scheduler, 'downloader', None)
            if dl:
                dl.cleanup_all_temp_dirs()
            if unified_db:
                unified_db.close()
            print("Scheduler stopped")


def _run_once(args):
    """Run a single download pass for ``args.platform`` (default, non-daemon mode).

    Exits the process with status 1 on an unexpected error.
    """
    try:
        downloader = MediaDownloader(config_path=args.config)

        if args.test:
            print("Running in test mode - limited downloads")
            # Temporarily reduce limits
            if 'instagram' in downloader.config:
                downloader.config['instagram']['max_downloads_per_user'] = 2
            if 'tiktok' in downloader.config:
                downloader.config['tiktok']['max_downloads'] = 2

        hidden_modules = downloader.config.get('hidden_modules', [])
        if args.platform != 'all' and args.platform in hidden_modules:
            downloader.logger.warning(f"Module '{args.platform}' is disabled. Skipping.", module="Core")
        elif args.platform == 'all':
            downloader.run()
        else:
            method_name = _SINGLE_PLATFORM_METHODS.get(args.platform)
            if method_name:
                getattr(downloader, method_name)()

        if downloader.config.get('immich', {}).get('scan_after_download'):
            downloader.trigger_immich_scan()

    except KeyboardInterrupt:
        print("\nDownload interrupted by user")
    except Exception as e:
        import traceback
        print(f"Error: {e}")
        traceback.print_exc()
        sys.exit(1)


def main():
    """Main entry point: parse the CLI and dispatch to the selected mode.

    Modes are checked in order:

    1. ``--db`` delegates to db_manager.py and exits.
    2. ``--reset`` wipes local state.
    3. ``--scheduler-status`` prints a report.
    4. ``--scheduler`` runs the daemon.
    5. Otherwise, a single download pass runs for ``--platform``.
    """
    parser = _build_arg_parser(_load_hidden_modules())
    args = parser.parse_args()

    if args.db is not None:
        _run_db_manager(args.db)  # never returns: exits with db_manager's status
        return

    if args.reset:
        _reset_installation()
        return

    if args.scheduler_status:
        _show_scheduler_status(args.config)
        return

    if args.scheduler:
        _run_scheduler(args.config)
        return

    _run_once(args)
|
|
|
|
# Script entry point: run the CLI only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()