#!/usr/bin/env python3
|
|
"""
|
|
Scheduler Module for Media Downloader
|
|
Handles random scheduling within specified time windows for individual accounts
|
|
"""
|
|
|
|
# CRITICAL: Apply nest_asyncio FIRST before any other imports
|
|
# This allows Playwright sync API to work inside the scheduler
|
|
try:
|
|
import nest_asyncio
|
|
nest_asyncio.apply()
|
|
except ImportError:
|
|
pass
|
|
|
|
import random
|
|
import time
|
|
import sqlite3
|
|
import gc
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, Optional, Callable, List
|
|
import threading
|
|
import json
|
|
from pathlib import Path
|
|
from modules.base_module import LoggingMixin
|
|
from modules.universal_logger import get_logger
|
|
from modules.youtube_channel_monitor import YouTubeChannelMonitor
|
|
from modules.easynews_monitor import EasynewsMonitor
|
|
from modules.dependency_updater import DependencyUpdater
|
|
from modules.reddit_community_monitor import RedditCommunityMonitor
|
|
from modules.task_checkpoint import TaskCheckpoint, STALE_THRESHOLD_HOURS
|
|
|
|
class DownloadScheduler(LoggingMixin):
|
|
"""
|
|
Scheduler that runs individual downloaders at random intervals within the second half of specified windows
|
|
|
|
For example: If check_interval is 6 hours, it will run randomly between 3-6 hours
|
|
This helps prevent detection by avoiding predictable patterns
|
|
"""
|
|
|
|
    def __init__(self, config_path: str = None, log_callback: Callable = None, unified_db=None, settings_manager=None):
        """
        Initialize the scheduler

        Args:
            config_path: Path to configuration file (DEPRECATED - use settings_manager instead)
            log_callback: Callback function for logging
            unified_db: UnifiedDatabase instance for state persistence
            settings_manager: SettingsManager instance for reading configuration from database (PREFERRED)
        """
        self.config_path = Path(config_path) if config_path else None
        self.settings_manager = settings_manager

        # Temporarily set show_debug for early config loading
        # (_load_config may call self.log before the mixin logger is configured)
        self.show_debug = False
        self.config = self._load_config()
        actual_show_debug = self.config.get('download_settings', {}).get('show_debug', False)

        # Initialize logging via mixin.  A bare `print` callback is treated as
        # "no callback" so the mixin falls back to its default sink.
        self._init_logger('Scheduler', log_callback if log_callback != print else None, default_module='Scheduler', show_debug=actual_show_debug)
        self.last_run_times = {}  # Format: "platform:account" -> datetime
        self.scheduled_tasks = {}  # Format: "platform:account" -> next_run_datetime
        self.task_callbacks = {}  # Store callbacks for each task
        self.task_intervals = {}  # Store check intervals for each task
        self.task_daily_times = {}  # Store (hour, minute) for daily-at-time tasks
        self.task_status = {}  # Track success/failure of each task
        self.running = False
        self.unified_db = unified_db
        self.downloader = None  # Will be set when start() is called

        # Initialize fingerprint database for dynamic browser fingerprinting
        if unified_db:
            from modules.cloudflare_handler import set_fingerprint_database
            set_fingerprint_database(unified_db)

        # Use a separate lightweight database for scheduler state
        self.scheduler_db_path = Path(__file__).parent.parent / 'database' / 'scheduler_state.db'
        # Ensure database directory exists
        self.scheduler_db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init_scheduler_db()

        # Track current activity for real-time status
        self.current_activity_file = Path(__file__).parent.parent / 'database' / 'current_activity.json'
        self.current_task = None
        self.current_task_start_time = None

        # Thread lock for state protection (guards current_task and related state).
        # NOTE(review): `threading` is already imported at module level; this
        # local import is redundant but harmless.
        import threading
        self._state_lock = threading.Lock()

        # Background task tracking for parallel dispatch
        self._background_task_ids = set()  # Task IDs that run in background threads
        self._running_background_tasks = set()  # Currently running background task IDs
        self._max_concurrent_background = 0  # 0 = unlimited concurrent background tasks

        # Crash recovery: checkpoint for the currently executing task
        self._current_checkpoint = None

        # Initialize activity status manager for database-backed tracking
        from modules.activity_status import get_activity_manager
        self.activity_manager = get_activity_manager(unified_db)

        # Initialize YouTube channel monitor
        self.youtube_monitor = None  # Will be initialized in start()

        # Initialize Easynews monitor
        self.easynews_monitor = None  # Will be initialized in start()

        # Initialize dependency updater (will be configured in start())
        self.dependency_updater = None
        self.last_dependency_check = None

        # Task ID to display name mapping (used by _display_task_id)
        self._task_display_names = {
            'instagram_unified': 'Instagram',
        }

        # Load state from database or file
        self._load_state()
def _display_task_id(self, task_id: str) -> str:
|
|
"""Convert internal task_id to a human-friendly display name."""
|
|
platform, _, rest = task_id.partition(':')
|
|
name = self._task_display_names.get(platform, platform)
|
|
return f"{name}:{rest}" if rest else name
|
|
|
|
def _init_scheduler_db(self):
|
|
"""Initialize separate scheduler database"""
|
|
import sqlite3
|
|
from contextlib import closing
|
|
|
|
with closing(sqlite3.connect(str(self.scheduler_db_path))) as conn:
|
|
conn.execute("PRAGMA journal_mode=WAL")
|
|
conn.execute("PRAGMA busy_timeout=30000")
|
|
conn.execute("PRAGMA synchronous=NORMAL")
|
|
|
|
cursor = conn.cursor()
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS scheduler_state (
|
|
task_id TEXT PRIMARY KEY,
|
|
last_run TEXT,
|
|
next_run TEXT,
|
|
run_count INTEGER DEFAULT 0,
|
|
status TEXT DEFAULT 'active',
|
|
last_download_count INTEGER DEFAULT 0
|
|
)
|
|
''')
|
|
|
|
# Add column if it doesn't exist (for existing databases)
|
|
cursor.execute("PRAGMA table_info(scheduler_state)")
|
|
columns = [col[1] for col in cursor.fetchall()]
|
|
if 'last_download_count' not in columns:
|
|
cursor.execute('ALTER TABLE scheduler_state ADD COLUMN last_download_count INTEGER DEFAULT 0')
|
|
|
|
# Create checkpoint table for crash recovery
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS scheduler_task_checkpoints (
|
|
task_id TEXT PRIMARY KEY,
|
|
task_type TEXT NOT NULL,
|
|
started_at TEXT NOT NULL,
|
|
completed_items TEXT DEFAULT '[]',
|
|
current_item TEXT,
|
|
total_items INTEGER DEFAULT 0,
|
|
status TEXT DEFAULT 'running',
|
|
updated_at TEXT
|
|
)
|
|
''')
|
|
|
|
# Clean up legacy task IDs that have been replaced with new naming convention
|
|
legacy_task_ids = [
|
|
'appearance_reminders', # Replaced by appearances:reminders
|
|
'tmdb_appearances_sync', # Replaced by appearances:tmdb_sync
|
|
]
|
|
for legacy_id in legacy_task_ids:
|
|
cursor.execute('DELETE FROM scheduler_state WHERE task_id = ?', (legacy_id,))
|
|
|
|
conn.commit()
|
|
|
|
def _load_config(self) -> Dict:
|
|
"""Load configuration from database (settings_manager is required)"""
|
|
# Load from database via settings_manager
|
|
if self.settings_manager:
|
|
try:
|
|
config = self.settings_manager.get_all()
|
|
if config:
|
|
self.log("Loaded configuration from database", "debug")
|
|
return config
|
|
else:
|
|
self.log("No configuration found in database", "warning")
|
|
return {}
|
|
except Exception as e:
|
|
self.log(f"Failed to load config from database: {e}", "error")
|
|
return {}
|
|
|
|
# Legacy fallback to JSON file (deprecated)
|
|
if self.config_path and self.config_path.exists():
|
|
self.log("WARNING: Loading from JSON file is deprecated. Use settings_manager instead.", "warning")
|
|
with open(self.config_path, 'r') as f:
|
|
config = json.load(f)
|
|
self.log("Loaded configuration from JSON file (DEPRECATED)", "debug")
|
|
return config
|
|
|
|
self.log("No configuration source available (settings_manager=None, no JSON file)", "error")
|
|
return {}
|
|
|
|
def reload_config(self):
|
|
"""Reload configuration from database/file"""
|
|
self.config = self._load_config()
|
|
self.show_debug = self.config.get('download_settings', {}).get('show_debug', False)
|
|
self.log("Configuration reloaded", "info")
|
|
|
|
def _load_state(self):
|
|
"""Load last run times and next scheduled runs from separate scheduler database"""
|
|
import sqlite3
|
|
from contextlib import closing
|
|
|
|
try:
|
|
with closing(sqlite3.connect(str(self.scheduler_db_path))) as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute('SELECT task_id, last_run, next_run, status FROM scheduler_state')
|
|
for row in cursor.fetchall():
|
|
task_id = row[0]
|
|
last_run = row[1]
|
|
next_run = row[2]
|
|
status = row[3]
|
|
|
|
if last_run:
|
|
self.last_run_times[task_id] = last_run if isinstance(last_run, datetime) else datetime.fromisoformat(last_run)
|
|
|
|
# Load next run time if it exists
|
|
if next_run:
|
|
next_run_dt = next_run if isinstance(next_run, datetime) else datetime.fromisoformat(next_run)
|
|
# Check if the scheduled time was missed
|
|
if next_run_dt < datetime.now():
|
|
self.log(f"Task {self._display_task_id(task_id)} missed scheduled run at {next_run}, running immediately", "warning")
|
|
# Keep original time so most overdue task runs first
|
|
self.scheduled_tasks[task_id] = next_run_dt
|
|
else:
|
|
# Keep the scheduled time
|
|
self.scheduled_tasks[task_id] = next_run_dt
|
|
self.log(f"Task {task_id} scheduled for {next_run}", "debug")
|
|
|
|
if status:
|
|
self.task_status[task_id] = status
|
|
except Exception as e:
|
|
if "no such table" not in str(e).lower():
|
|
self.log(f"Error loading state: {e}", "debug")
|
|
self.last_run_times = {}
|
|
self.task_status = {}
|
|
self.scheduled_tasks = {}
|
|
|
|
def _save_task_download_count(self, task_id: str, count: int):
|
|
"""Save download count for a task"""
|
|
import sqlite3
|
|
from contextlib import closing
|
|
|
|
try:
|
|
with closing(sqlite3.connect(str(self.scheduler_db_path), timeout=30)) as conn:
|
|
conn.execute("PRAGMA journal_mode=WAL")
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute('''
|
|
UPDATE scheduler_state
|
|
SET last_download_count = ?
|
|
WHERE task_id = ?
|
|
''', (count, task_id))
|
|
|
|
conn.commit()
|
|
except Exception as e:
|
|
self.log(f"Failed to save download count for {task_id}: {e}", "debug")
|
|
|
|
    # Display names for special tasks (without :account suffix).
    # Maps task_id -> (platform label, account label) for the activity status
    # display; consumed by _update_current_activity().
    SPECIAL_TASK_DISPLAY = {
        'appearances:tmdb_sync': ('Appearances', 'TMDb Sync'),
        'appearances:reminders': ('Appearances', 'Daily Reminders'),
        'youtube_channel_monitor': ('YouTube', 'Channel Monitor'),
        'youtube_monitor:paused': ('YouTube', 'Paused Channel Re-check'),
        'easynews_monitor': ('Easynews', 'Media Monitor'),
        'reddit_monitor': ('Private Gallery', 'Reddit Monitor'),
        'press_monitor': ('News', 'Press Monitor'),
        # Legacy names for backwards compatibility
        'tmdb_appearances_sync': ('Appearances', 'TMDb Sync'),
        'appearance_reminders': ('Appearances', 'Daily Reminders'),
    }
def _update_current_activity(self, task_id: str = None, status: str = None):
|
|
"""Update current activity status for real-time monitoring using database"""
|
|
try:
|
|
if task_id is None:
|
|
# Clear activity
|
|
self.activity_manager.stop_activity()
|
|
else:
|
|
# Check for special task display names
|
|
if task_id in self.SPECIAL_TASK_DISPLAY:
|
|
platform, account = self.SPECIAL_TASK_DISPLAY[task_id]
|
|
else:
|
|
# Standard format: platform:account
|
|
PLATFORM_LABELS = {
|
|
'instagram_unified': 'Instagram',
|
|
'imginn_api': 'Instagram',
|
|
'imginn': 'Instagram',
|
|
'fastdl': 'Instagram',
|
|
'instagram_client': 'Instagram',
|
|
'toolzu': 'Instagram',
|
|
'snapchat_client': 'Snapchat',
|
|
}
|
|
parts = task_id.split(':', 1)
|
|
raw_platform = parts[0] if len(parts) > 0 else 'Unknown'
|
|
platform = PLATFORM_LABELS.get(raw_platform, raw_platform.title())
|
|
account = parts[1] if len(parts) > 1 else None
|
|
|
|
self.activity_manager.start_activity(
|
|
task_id=task_id,
|
|
platform=platform,
|
|
account=account,
|
|
status=status or "Running"
|
|
)
|
|
except Exception as e:
|
|
self.log(f"Failed to update current activity: {e}", "debug")
|
|
|
|
def _save_single_task_state(self, task_id: str, next_run: datetime):
|
|
"""Save state for a single task to scheduler database"""
|
|
import sqlite3
|
|
from contextlib import closing
|
|
|
|
try:
|
|
with closing(sqlite3.connect(str(self.scheduler_db_path), timeout=30)) as conn:
|
|
conn.execute("PRAGMA journal_mode=WAL")
|
|
cursor = conn.cursor()
|
|
|
|
last_run = self.last_run_times.get(task_id)
|
|
|
|
cursor.execute('''
|
|
INSERT OR REPLACE INTO scheduler_state
|
|
(task_id, last_run, next_run, run_count, status, last_download_count)
|
|
VALUES (?, ?, ?,
|
|
COALESCE((SELECT run_count FROM scheduler_state WHERE task_id = ?), 0),
|
|
'active',
|
|
COALESCE((SELECT last_download_count FROM scheduler_state WHERE task_id = ?), 0))
|
|
''', (
|
|
task_id,
|
|
last_run.isoformat() if last_run else None,
|
|
next_run.isoformat() if next_run else None,
|
|
task_id,
|
|
task_id
|
|
))
|
|
|
|
conn.commit()
|
|
except Exception as e:
|
|
self.log(f"Failed to save task state for {task_id}: {e}", "debug")
|
|
|
|
    def _save_state(self):
        """Save last run times to separate scheduler database.

        Writes every known task's last_run/next_run in one IMMEDIATE
        transaction, retrying a few times if another process holds the lock.
        """
        import sqlite3
        from contextlib import closing

        # Always use the separate scheduler database
        max_retries = 3
        for attempt in range(max_retries):
            try:
                with closing(sqlite3.connect(str(self.scheduler_db_path), timeout=30)) as conn:
                    conn.execute("PRAGMA journal_mode=WAL")
                    conn.execute("PRAGMA busy_timeout=30000")
                    # BEGIN IMMEDIATE takes the write lock up front so we fail
                    # fast (and retry) instead of deadlocking mid-transaction.
                    conn.execute("BEGIN IMMEDIATE")

                    cursor = conn.cursor()

                    # Save all states
                    for task_id, last_run in self.last_run_times.items():
                        next_run = self.scheduled_tasks.get(task_id)
                        # Always use 'active' for scheduled tasks to ensure they show up
                        # NOTE(review): run_count is incremented on every bulk
                        # save, not per task execution — confirm this is intended.
                        cursor.execute('''
                            INSERT OR REPLACE INTO scheduler_state
                            (task_id, last_run, next_run, run_count, status)
                            VALUES (?, ?, ?,
                                COALESCE((SELECT run_count FROM scheduler_state WHERE task_id = ?), 0) + 1,
                                'active')
                        ''', (
                            task_id,
                            last_run.isoformat() if last_run else None,
                            next_run.isoformat() if next_run else None,
                            task_id
                        ))

                    conn.commit()
                    return  # Success

            except sqlite3.OperationalError as e:
                if "locked" in str(e) and attempt < max_retries - 1:
                    time.sleep(1 * (attempt + 1))  # Linear backoff: 1s, 2s
                    continue
                else:
                    self.log(f"Failed to save state after {attempt + 1} attempts: {e}", "warning")
                    break
            except Exception as e:
                self.log(f"Error saving state: {e}", "warning")
                break
def _save_state_to_file(self):
|
|
"""Save state to JSON file as fallback"""
|
|
try:
|
|
state_file = Path(__file__).parent.parent / 'config' / 'scheduler_state.json'
|
|
state_file.parent.mkdir(exist_ok=True)
|
|
state = {
|
|
'last_run_times': {
|
|
task_id: timestamp.isoformat()
|
|
for task_id, timestamp in self.last_run_times.items()
|
|
}
|
|
}
|
|
with open(state_file, 'w') as f:
|
|
json.dump(state, f, indent=2)
|
|
except Exception as e:
|
|
self.log(f"Error saving state to file: {e}", "warning")
|
|
|
|
def calculate_next_run(self, task_id: str, check_interval_hours: float) -> datetime:
|
|
"""
|
|
Calculate next run time in the second half of the interval
|
|
|
|
Args:
|
|
task_id: Unique task identifier (e.g., "instagram:username")
|
|
check_interval_hours: Interval in hours (e.g., 6 hours)
|
|
|
|
Returns:
|
|
datetime: Next scheduled run time
|
|
"""
|
|
now = datetime.now()
|
|
last_run = self.last_run_times.get(task_id, now)
|
|
|
|
# Calculate the window
|
|
# First half: 0 to check_interval_hours/2
|
|
# Second half: check_interval_hours/2 to check_interval_hours
|
|
half_interval = check_interval_hours / 2
|
|
|
|
# Random time in second half of interval
|
|
random_offset = random.uniform(half_interval, check_interval_hours)
|
|
|
|
# Calculate next run time
|
|
next_run = last_run + timedelta(hours=random_offset)
|
|
|
|
# If the calculated time is in the past, schedule for the near future
|
|
if next_run <= now:
|
|
# Add a small random delay (1-30 minutes)
|
|
next_run = now + timedelta(minutes=random.randint(1, 30))
|
|
|
|
self.log(f"Scheduled {self._display_task_id(task_id)} for {next_run.strftime('%Y-%m-%d %H:%M:%S')} "
|
|
f"(in {(next_run - now).total_seconds() / 3600:.1f} hours)", "info")
|
|
|
|
return next_run
|
|
|
|
def schedule_task(self, task_id: str, check_interval_hours: float, callback: Callable):
|
|
"""
|
|
Schedule an individual task to run at random intervals
|
|
|
|
Args:
|
|
task_id: Unique task identifier (e.g., "instagram:evalongoria")
|
|
check_interval_hours: Check interval in hours
|
|
callback: Function to call when scheduled
|
|
"""
|
|
# Only calculate new run time if task doesn't already have a scheduled time
|
|
# (This preserves schedules loaded from database after restart)
|
|
with self._state_lock:
|
|
if task_id not in self.scheduled_tasks:
|
|
next_run = self.calculate_next_run(task_id, check_interval_hours)
|
|
self.scheduled_tasks[task_id] = next_run
|
|
else:
|
|
next_run = self.scheduled_tasks[task_id]
|
|
self.log(f"Task {task_id} already scheduled for {next_run}", "debug")
|
|
|
|
# Store callback and interval for sequential execution
|
|
self.task_callbacks[task_id] = callback
|
|
self.task_intervals[task_id] = check_interval_hours
|
|
|
|
# Capture next_run while holding lock for save
|
|
next_run_for_save = next_run
|
|
|
|
# Save to scheduler database immediately (using captured value)
|
|
self._save_single_task_state(task_id, next_run_for_save)
|
|
|
|
def schedule_daily_at_time(self, task_id: str, hour: int, minute: int, callback: Callable):
|
|
"""
|
|
Schedule a task to run daily at a specific time.
|
|
|
|
Args:
|
|
task_id: Unique task identifier
|
|
hour: Hour to run (0-23)
|
|
minute: Minute to run (0-59)
|
|
callback: Function to call when scheduled
|
|
"""
|
|
from datetime import datetime, timedelta
|
|
|
|
now = datetime.now()
|
|
target_time = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
|
|
|
|
# If target time already passed today, schedule for tomorrow
|
|
if target_time <= now:
|
|
target_time += timedelta(days=1)
|
|
|
|
with self._state_lock:
|
|
# Check if task already has a valid scheduled time for today/tomorrow at target hour
|
|
if task_id in self.scheduled_tasks:
|
|
existing = self.scheduled_tasks[task_id]
|
|
if existing > now and existing.hour == hour and existing.minute == minute:
|
|
self.log(f"Task {task_id} already scheduled for {existing}", "debug")
|
|
target_time = existing
|
|
else:
|
|
self.scheduled_tasks[task_id] = target_time
|
|
else:
|
|
self.scheduled_tasks[task_id] = target_time
|
|
|
|
self.task_callbacks[task_id] = callback
|
|
self.task_intervals[task_id] = 24.0 # Daily = 24 hours
|
|
self.task_daily_times[task_id] = (hour, minute) # Store the target time
|
|
|
|
self._save_single_task_state(task_id, target_time)
|
|
self.log(f"Scheduled {self._display_task_id(task_id)} to run daily at {hour:02d}:{minute:02d} (next: {target_time})", "info")
|
|
|
|
def _execute_background_task(self, task_id: str):
|
|
"""Execute a background task with overlap protection.
|
|
|
|
Background tasks are dispatched immediately without Cloudflare throttling,
|
|
gc.collect, or model release overhead. They run in daemon threads and
|
|
return immediately. Overlap protection prevents the same task from running
|
|
twice concurrently.
|
|
"""
|
|
with self._state_lock:
|
|
if task_id in self._running_background_tasks:
|
|
self.log(f"Skipping {task_id} - already running in background", "debug")
|
|
# Still reschedule so it doesn't keep firing every loop iteration
|
|
check_interval_hours = self.task_intervals.get(task_id)
|
|
if check_interval_hours:
|
|
daily_time = self.task_daily_times.get(task_id)
|
|
if daily_time:
|
|
hour, minute = daily_time
|
|
next_run = datetime.now().replace(hour=hour, minute=minute, second=0, microsecond=0) + timedelta(days=1)
|
|
else:
|
|
next_run = self.calculate_next_run(task_id, check_interval_hours)
|
|
self.scheduled_tasks[task_id] = next_run
|
|
self._save_state()
|
|
return
|
|
self._running_background_tasks.add(task_id)
|
|
|
|
callback = self.task_callbacks.get(task_id)
|
|
check_interval_hours = self.task_intervals.get(task_id)
|
|
if not callback or not check_interval_hours:
|
|
with self._state_lock:
|
|
self._running_background_tasks.discard(task_id)
|
|
return
|
|
|
|
self.log(f"Running background task: {self._display_task_id(task_id)}", "info")
|
|
self.last_run_times[task_id] = datetime.now()
|
|
|
|
try:
|
|
callback()
|
|
except Exception as e:
|
|
self.log(f"Error in background task {self._display_task_id(task_id)}: {e}", "error")
|
|
|
|
# Save download count (0 for background tasks)
|
|
self._save_task_download_count(task_id, 0)
|
|
|
|
# Reschedule
|
|
with self._state_lock:
|
|
daily_time = self.task_daily_times.get(task_id)
|
|
if daily_time:
|
|
hour, minute = daily_time
|
|
next_run = datetime.now().replace(hour=hour, minute=minute, second=0, microsecond=0) + timedelta(days=1)
|
|
else:
|
|
next_run = self.calculate_next_run(task_id, check_interval_hours)
|
|
with self._state_lock:
|
|
self.scheduled_tasks[task_id] = next_run
|
|
self._save_state()
|
|
|
|
    def _execute_task(self, task_id: str):
        """
        Execute a single task and reschedule it.

        Blocks until the callback returns.  Handles Cloudflare throttling,
        crash-recovery checkpoints, gallery auto-import, memory release,
        and scheduling of the next run.

        Args:
            task_id: Unique task identifier
        """
        # Get callback and interval for this task (protected by lock to prevent race)
        with self._state_lock:
            callback = self.task_callbacks.get(task_id)
            check_interval_hours = self.task_intervals.get(task_id)

        if not callback or not check_interval_hours:
            self.log(f"Task {self._display_task_id(task_id)} has no callback or interval", "warning")
            return

        # Track current task for real-time status (protected by lock)
        with self._state_lock:
            self.current_task = task_id
            self.current_task_start_time = datetime.now()
            self.last_run_times[task_id] = datetime.now()
            self._update_current_activity(task_id, "running")

        # Add throttling for Cloudflare-protected modules to avoid rate limiting
        # Extract module name from task_id (e.g., "imginn:username" -> "imginn")
        cloudflare_modules = {'imginn', 'toolzu', 'snapchat', 'fastdl'}
        current_module = task_id.split(':')[0] if ':' in task_id else task_id

        if current_module in cloudflare_modules:
            # Check if we recently ran a task from the same module
            last_cf_task = getattr(self, '_last_cloudflare_task', None)
            last_cf_time = getattr(self, '_last_cloudflare_time', None)

            if last_cf_task and last_cf_time:
                last_module = last_cf_task.split(':')[0] if ':' in last_cf_task else last_cf_task
                if last_module == current_module:
                    # Same Cloudflare module - add random delay
                    elapsed = (datetime.now() - last_cf_time).total_seconds()
                    min_delay = 20  # Minimum 20 seconds between same-module tasks
                    if elapsed < min_delay:
                        # Random jitter on top of the floor keeps timing unpredictable.
                        delay = min_delay - elapsed + random.uniform(5, 20)
                        self.log(f"Cloudflare throttle: waiting {delay:.1f}s before {self._display_task_id(task_id)}", "info")
                        time.sleep(delay)

            # Update last Cloudflare task tracking
            self._last_cloudflare_task = task_id
            self._last_cloudflare_time = datetime.now()

        # Run the task
        self.log(f"Running task: {self._display_task_id(task_id)}", "info")

        # Create a checkpoint for crash recovery (scrapers can use it via self._current_checkpoint)
        checkpoint = TaskCheckpoint(task_id)
        self._current_checkpoint = checkpoint

        download_count = 0
        try:
            # Run the callback and try to capture download count
            result = callback()

            # If callback returns a number, use it as download count
            if isinstance(result, int):
                download_count = result

            self.log(f"Completed task: {self._display_task_id(task_id)}", "success")
            with self._state_lock:
                self.task_status[task_id] = 'active'  # Keep as active so it remains schedulable
        except Exception as e:
            self.log(f"Error in task {self._display_task_id(task_id)}: {e}", "error")
            import traceback
            self.log(f"Traceback: {traceback.format_exc()}", "debug")
            with self._state_lock:
                self.task_status[task_id] = 'failed'
        finally:
            # Clean up checkpoint (deletes record on success, no-op if not started)
            checkpoint.finish_if_started()
            self._current_checkpoint = None
            # Always clear current task tracking (protected by lock)
            with self._state_lock:
                self.current_task = None
                self.current_task_start_time = None
                self._update_current_activity(None)

        # Auto-import to private gallery if account is mapped
        if download_count > 0:
            try:
                from modules.scraper_gallery_bridge import get_crypto, on_download_complete
                crypto = get_crypto()
                if crypto:
                    on_download_complete(task_id, download_count, self.unified_db, crypto)
            except Exception as e:
                # Best-effort: gallery import failure must not fail the task.
                self.log(f"Gallery bridge error for {self._display_task_id(task_id)}: {e}", "warning")

        # Save download count to database
        self._save_task_download_count(task_id, download_count)

        # Release the download module that was just used to free memory
        # Modules are lazy-loaded and will be re-created on next task
        if self.downloader and hasattr(self.downloader, 'modules') and hasattr(self.downloader.modules, 'release'):
            module_key = task_id.split(':')[0] if ':' in task_id else task_id
            # Skip special tasks that don't map to a module key
            if module_key not in ('health_check', 'dependency_update', 'cleanup'):
                try:
                    self.downloader.modules.release(module_key)
                    self.log(f"Released module '{module_key}' to free memory", "debug")
                except Exception:
                    pass

        # Force garbage collection after each task to prevent memory accumulation
        # This is important for ML models (InsightFace, face_recognition) that can leak memory
        gc.collect()

        # Release ML models to free memory (they will be lazy-loaded again when needed)
        # This is critical for preventing OOM in long-running scheduler processes
        if self.downloader and hasattr(self.downloader, 'move_manager') and self.downloader.move_manager:
            self.downloader.move_manager.release_models()

        # Schedule next run
        # Check if this is a daily-at-time task
        with self._state_lock:
            daily_time = self.task_daily_times.get(task_id)

        if daily_time:
            # Schedule for same time tomorrow
            hour, minute = daily_time
            # NOTE(review): redundant — timedelta is already imported at module level.
            from datetime import timedelta
            next_run = datetime.now().replace(hour=hour, minute=minute, second=0, microsecond=0) + timedelta(days=1)
        else:
            next_run = self.calculate_next_run(task_id, check_interval_hours)

        with self._state_lock:
            self.scheduled_tasks[task_id] = next_run

        # Save state after task completion and next run scheduling
        self._save_state()
    def _check_dependency_updates_if_idle(self):
        """
        Check and run dependency updates when scheduler is idle.
        Only runs if:
        - Dependency updater is initialized and enabled
        - No tasks are currently running
        - Enough time has passed since last check (based on config interval)

        For safety, this triggers an external script that:
        1. Stops the scheduler
        2. Runs all updates
        3. Restarts API and scheduler
        """
        if not self.dependency_updater:
            return

        # Check if updates are enabled (config re-read each time so toggling
        # the setting takes effect without a restart)
        config = self._load_config()
        update_config = config.get('dependency_updater', {}) or config.get('dependency_updates', {})
        if not update_config.get('enabled', True):
            return

        # Check interval (default 24 hours)
        check_interval_hours = update_config.get('check_interval_hours', 24)

        # Check if enough time has passed since last check
        now = datetime.now()
        if self.last_dependency_check:
            hours_since_last = (now - self.last_dependency_check).total_seconds() / 3600
            if hours_since_last < check_interval_hours:
                return  # Not time yet

        # Double-check we're truly idle (no current task)
        with self._state_lock:
            if self.current_task is not None:
                return  # A task started while we were checking

        # Run dependency updates via external script for safe service handling
        self.log("Scheduler idle - triggering dependency updates", "info")
        self._update_current_activity("dependency_updates", "running")

        try:
            with self._state_lock:
                self.current_task = "dependency_updates"
                self.current_task_start_time = now

            # Use subprocess to run the update script
            # The script will stop this scheduler, run updates, and restart everything
            import subprocess
            script_path = Path(__file__).parent.parent / 'scripts' / 'run-dependency-updates.sh'

            if script_path.exists():
                self.log("Starting dependency update script (scheduler will be restarted)", "info")
                # Record the check time BEFORE launching, so we don't re-trigger
                # if the script takes a while to kill this process.
                self.last_dependency_check = now

                # Run the script - it will stop and restart this scheduler
                # Use Popen so we don't block waiting for it
                subprocess.Popen(
                    [str(script_path)],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                    start_new_session=True  # Detach from this process
                )

                # Give the script a moment to start before we continue
                time.sleep(2)
            else:
                self.log(f"Update script not found: {script_path}", "warning")

        except Exception as e:
            self.log(f"Dependency update trigger failed: {e}", "error")
        finally:
            with self._state_lock:
                self.current_task = None
                self.current_task_start_time = None
                self._update_current_activity(None)
def _main_loop(self):
|
|
"""Main scheduler loop - dispatches background tasks immediately, runs scraping tasks sequentially."""
|
|
self.log("Starting scheduler loop with parallel background dispatch", "info")
|
|
|
|
while self.running:
|
|
try:
|
|
now = datetime.now()
|
|
due_background = []
|
|
due_scraping = []
|
|
next_upcoming_time = None
|
|
|
|
for task_id, scheduled_time in self.scheduled_tasks.items():
|
|
if task_id not in self.task_callbacks:
|
|
continue
|
|
if scheduled_time <= now:
|
|
if task_id in self._background_task_ids:
|
|
due_background.append((task_id, scheduled_time))
|
|
else:
|
|
due_scraping.append((task_id, scheduled_time))
|
|
else:
|
|
if next_upcoming_time is None or scheduled_time < next_upcoming_time:
|
|
next_upcoming_time = scheduled_time
|
|
|
|
# 1. Dispatch ALL due background tasks immediately (they run in threads)
|
|
due_background.sort(key=lambda x: x[1]) # Most overdue first
|
|
for task_id, _ in due_background:
|
|
self._execute_background_task(task_id)
|
|
|
|
# 2. Run ONE most-overdue scraping task (blocks until complete)
|
|
if due_scraping:
|
|
due_scraping.sort(key=lambda x: x[1]) # Most overdue first
|
|
self._execute_task(due_scraping[0][0])
|
|
elif not due_background:
|
|
# No tasks at all - idle time
|
|
self._check_dependency_updates_if_idle()
|
|
if next_upcoming_time:
|
|
sleep_seconds = min((next_upcoming_time - now).total_seconds(), 60)
|
|
if sleep_seconds > 0:
|
|
time.sleep(sleep_seconds)
|
|
else:
|
|
time.sleep(60)
|
|
|
|
except Exception as e:
|
|
self.log(f"Error in scheduler main loop: {e}", "error")
|
|
import traceback
|
|
self.log(f"Traceback: {traceback.format_exc()}", "debug")
|
|
time.sleep(60)
|
|
|
|
def _build_task_registry_from_config(self) -> Dict:
    """Build a dict of {task_id: task_info} from the current config.

    Returns:
        Dict mapping task_id -> task_info, where task_info contains:
            interval (float): check interval in hours
            callback (callable): function to run
            background (bool): run in a background thread instead of the
                sequential scraping queue
            run_at_start (bool): run immediately on startup
            module (str): module group name for ordering
            daily_time (tuple|None): (hour, minute) for daily tasks

    Note:
        The legacy instagram / imginn / instagram_client / toolzu scrapers
        are only registered when the unified Instagram module is disabled,
        mirroring the scheduling logic in start(); fastdl / imginn_api /
        tiktok are registered regardless, also mirroring start().
    """
    config = self._load_config()
    downloader = self.downloader
    tasks: Dict = {}
    hidden_modules = config.get('hidden_modules', [])

    def _add(task_id, interval, callback, module, *,
             run_at_start=False, background=False, daily_time=None):
        # Central registration point; tasks with a non-positive interval
        # are skipped (an interval of 0 means "disabled").
        if interval > 0:
            tasks[task_id] = {
                'interval': interval,
                'callback': callback,
                'background': background,
                'run_at_start': run_at_start,
                'module': module,
                'daily_time': daily_time,
            }

    def _section_enabled(name):
        # A module section must be enabled AND not hidden by the UI.
        return bool(config.get(name, {}).get('enabled')) and name not in hidden_modules

    # --- Instagram unified (supersedes the legacy Instagram scrapers) ---
    unified_enabled = bool(config.get('instagram_unified', {}).get('enabled'))
    if unified_enabled:
        section = config['instagram_unified']
        _add(
            'instagram_unified:all',
            section.get('check_interval_hours', 8),
            lambda: self._run_instagram_unified(downloader),
            'instagram_unified',
            run_at_start=section.get('run_at_start', False),
        )

    # --- Legacy Instagram scrapers (only when unified is NOT enabled) ---
    if not unified_enabled:
        if _section_enabled('instagram'):
            accounts = config['instagram'].get('accounts', [])
            # Backwards compatibility with the old flat 'usernames' list
            if not accounts and 'usernames' in config['instagram']:
                for username in config['instagram']['usernames']:
                    accounts.append({'username': username,
                                     'check_interval_hours': 6,
                                     'run_at_start': False})
            for account in accounts:
                username = account.get('username')
                if not username:
                    continue
                _add(
                    f"instagram:{username}",
                    account.get('check_interval_hours', 6),
                    lambda u=username: self._run_instagram_for_user(downloader, u),
                    'instagram',
                    run_at_start=account.get('run_at_start', False),
                )

        if _section_enabled('imginn'):
            section = config['imginn']
            interval = section.get('check_interval_hours', 6)
            run_at_start = section.get('run_at_start', False)
            for username in section.get('usernames', []):
                _add(
                    f"imginn:{username}",
                    interval,
                    lambda u=username: self._run_imginn_for_user(downloader, u),
                    'imginn',
                    run_at_start=run_at_start,
                )

        if _section_enabled('instagram_client'):
            section = config['instagram_client']
            interval = section.get('check_interval_hours', 6)
            run_at_start = section.get('run_at_start', False)
            for username in section.get('usernames', []):
                _add(
                    f"instagram_client:{username}",
                    interval,
                    lambda u=username: self._run_instagram_client_for_user(downloader, u),
                    'instagram_client',
                    run_at_start=run_at_start,
                )

        if _section_enabled('toolzu'):
            section = config['toolzu']
            interval = section.get('check_interval_hours', 4)
            run_at_start = section.get('run_at_start', False)
            for username in section.get('usernames', []):
                _add(
                    f"toolzu:{username}",
                    interval,
                    lambda u=username: self._run_toolzu_for_user(downloader, u),
                    'toolzu',
                    run_at_start=run_at_start,
                )

    # --- FastDL (single aggregate task over all usernames) ---
    if _section_enabled('fastdl'):
        section = config['fastdl']
        if section.get('usernames', []):
            _add(
                'fastdl:all',
                section.get('check_interval_hours', 6),
                lambda: self._run_fastdl_all(downloader),
                'fastdl',
                run_at_start=section.get('run_at_start', False),
            )

    # --- ImgInn API (single aggregate task over all usernames) ---
    if _section_enabled('imginn_api'):
        section = config['imginn_api']
        if section.get('usernames', []):
            _add(
                'imginn_api:all',
                section.get('check_interval_hours', 6),
                lambda: self._run_imginn_api_all(downloader),
                'imginn_api',
                run_at_start=section.get('run_at_start', False),
            )

    # --- TikTok ---
    if _section_enabled('tiktok'):
        accounts = config['tiktok'].get('accounts', [])
        global_run_at_start = config['tiktok'].get('run_at_start', False)
        # Backwards compatibility with the old flat 'usernames' list
        if not accounts and 'usernames' in config['tiktok']:
            for username in config['tiktok']['usernames']:
                accounts.append({'username': username,
                                 'check_interval_hours': 8,
                                 'run_at_start': global_run_at_start})
        for account in accounts:
            username = account.get('username')
            if not username:
                continue
            _add(
                f"tiktok:{username}",
                account.get('check_interval_hours', 8),
                lambda u=username: self._run_tiktok_for_user(downloader, u),
                'tiktok',
                run_at_start=account.get('run_at_start', global_run_at_start),
            )

    # --- Snapchat ---
    if _section_enabled('snapchat'):
        section = config['snapchat']
        interval = section.get('check_interval_hours', 6)
        run_at_start = section.get('run_at_start', False)
        for username in section.get('usernames', []):
            _add(
                f"snapchat:{username}",
                interval,
                lambda u=username: self._run_snapchat_for_user(downloader, u),
                'snapchat',
                run_at_start=run_at_start,
            )

    # --- Snapchat Client ---
    if _section_enabled('snapchat_client'):
        section = config['snapchat_client']
        interval = section.get('check_interval_hours', 6)
        run_at_start = section.get('run_at_start', False)
        for username in section.get('usernames', []):
            _add(
                f"snapchat_client:{username}",
                interval,
                lambda u=username: self._run_snapchat_client_for_user(downloader, u),
                'snapchat_client',
                run_at_start=run_at_start,
            )

    # --- Forums (search task + optional monitored-threads task) ---
    if _section_enabled('forums'):
        for forum_config in config['forums'].get('configs', []):
            forum_name = forum_config.get('name')
            if not forum_name or not forum_config.get('enabled'):
                continue
            _add(
                f"forum:{forum_name}",
                forum_config.get('check_interval_hours', 12),
                lambda f=forum_config: self._run_forum(downloader, f),
                'forums',
                run_at_start=forum_config.get('run_at_start', False),
            )
            if forum_config.get('update_monitored'):
                _add(
                    f"monitor:{forum_name}",
                    forum_config.get('monitor_threads_check_hours', 12),
                    lambda f=forum_config: self._check_monitored_threads(downloader, f),
                    'forums',
                    run_at_start=forum_config.get('monitor_threads_run_at_start', False),
                )

    # --- Coppermine galleries ---
    if _section_enabled('coppermine'):
        global_check_interval = config['coppermine'].get('check_interval_hours', 12)
        global_run_at_start = config['coppermine'].get('run_at_start', False)
        for gallery in config['coppermine'].get('galleries', []):
            gallery_name = gallery.get('name')
            if not gallery_name or not gallery.get('url'):
                continue
            _add(
                f"coppermine:{gallery_name}",
                gallery.get('check_interval_hours', global_check_interval),
                lambda g=gallery: self._run_coppermine_for_gallery(downloader, g),
                'coppermine',
                run_at_start=gallery.get('run_at_start', global_run_at_start),
            )

    # --- YouTube channel monitor (+ paused channel re-checks) ---
    try:
        if self.youtube_monitor:
            yt_settings = self.youtube_monitor.get_global_settings()
            _add(
                'youtube_channel_monitor',
                float(yt_settings.get('check_interval_hours', 6)),
                lambda: self._run_youtube_monitor(),
                'youtube',
                background=True,
            )
            paused_check_interval_days = yt_settings.get('paused_check_interval_days', 14)
            _add(
                'youtube_monitor:paused',
                float(paused_check_interval_days * 24),  # days -> hours
                lambda: self._run_youtube_paused_check(),
                'youtube',
            )
    except Exception as e:
        self.log(f"Failed to build YouTube monitor tasks: {e}", "error")

    # --- TMDb appearances sync ---
    try:
        with self.unified_db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('SELECT tmdb_enabled, tmdb_check_interval_hours FROM appearance_config WHERE id = 1')
            row = cursor.fetchone()
            if row and row[0]:
                _add(
                    'appearances:tmdb_sync',
                    float(row[1] or 12),
                    lambda: self._run_tmdb_sync(),
                    'appearances',
                    background=True,
                )
    except Exception as e:
        self.log(f"Failed to build TMDb sync task: {e}", "error")

    # --- Easynews monitor ---
    try:
        if self.easynews_monitor:
            easynews_config = self.easynews_monitor.get_config()
            if easynews_config.get('enabled') and easynews_config.get('has_credentials'):
                _add(
                    'easynews_monitor',
                    float(easynews_config.get('check_interval_hours', 12)),
                    lambda: self._run_easynews_check(),
                    'easynews',
                    background=True,
                )
    except Exception as e:
        self.log(f"Failed to build Easynews monitor task: {e}", "error")

    # --- Reddit community monitor ---
    try:
        if hasattr(self, 'reddit_monitor') and self.reddit_monitor:
            reddit_settings = self.reddit_monitor.get_settings()
            if reddit_settings.get('enabled'):
                _add(
                    'reddit_monitor',
                    float(reddit_settings.get('check_interval_hours', 4)),
                    lambda: self._run_reddit_monitor(),
                    'reddit',
                    background=True,
                )
    except Exception as e:
        self.log(f"Failed to build Reddit monitor task: {e}", "error")

    # --- Press monitor ---
    try:
        with self.unified_db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('SELECT enabled, check_interval_hours FROM press_config WHERE id = 1')
            press_row = cursor.fetchone()
            # NOTE: press_config rows are accessed by column name here,
            # which assumes the connection's row factory supports it.
            if press_row and press_row['enabled']:
                _add(
                    'press_monitor',
                    float(press_row['check_interval_hours'] or 6),
                    lambda: self._run_press_monitor(),
                    'press',
                    background=True,
                )
    except Exception as e:
        self.log(f"Failed to build Press monitor task: {e}", "error")

    # --- Appearance reminders (daily at midnight) ---
    try:
        with self.unified_db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('SELECT notify_new_appearances FROM appearance_config WHERE id = 1')
            row = cursor.fetchone()
            if row and row[0]:
                _add(
                    'appearances:reminders',
                    24.0,
                    lambda: self._run_appearance_reminders(),
                    'appearances',
                    daily_time=(0, 0),
                )
    except Exception as e:
        self.log(f"Failed to build appearance reminders task: {e}", "error")

    # --- Paid content sync ---
    try:
        from modules.paid_content import PaidContentDBAdapter
        pc_db = PaidContentDBAdapter(self.unified_db)
        pc_config = pc_db.get_config()
        # Only schedule the sync when at least one creator is enabled.
        if pc_db.get_creators(enabled_only=True):
            _add(
                'paid_content:sync',
                pc_config.get('check_interval_hours', 6),
                lambda: self._run_paid_content_sync(),
                'paid_content',
                background=True,
            )
    except Exception as e:
        self.log(f"Failed to build Paid Content sync task: {e}", "error")

    return tasks
|
|
|
|
def reload_scheduled_tasks(self) -> dict:
    """Reload config and sync task registrations without a restart.

    Builds the desired task set from the current configuration and
    reconciles it with the live registrations:
      * new tasks are scheduled (daily-time tasks via
        schedule_daily_at_time, everything else via schedule_task)
      * existing tasks always get their callback refreshed so config
        changes captured in the closure (edited forum configs, new
        username lists, ...) take effect without a restart; interval
        changes are additionally applied and reported
      * tasks that disappeared from config are unregistered, except
        'maintenance:*' tasks which are never removed here

    Returns:
        dict with 'added', 'removed', 'modified' lists of task IDs.
    """
    if not self.downloader:
        # Scheduler has not started yet - nothing to reconcile.
        return {'added': [], 'removed': [], 'modified': []}

    desired_tasks = self._build_task_registry_from_config()
    current_tasks = set(self.task_callbacks.keys())

    added, removed, modified = [], [], []

    with self._state_lock:
        # Add new tasks / refresh existing ones
        for task_id, task_info in desired_tasks.items():
            if task_id not in current_tasks:
                # New task — register it
                if task_info.get('daily_time'):
                    hour, minute = task_info['daily_time']
                    self.schedule_daily_at_time(task_id, hour, minute, task_info['callback'])
                else:
                    self.schedule_task(task_id, task_info['interval'], task_info['callback'])
                if task_info.get('background'):
                    self._background_task_ids.add(task_id)
                added.append(task_id)
                self.log(f"Hot reload: added task {self._display_task_id(task_id)}", "info")
            else:
                # Existing task — always install the freshly built callback
                # (the old one closes over stale config objects).
                self.task_callbacks[task_id] = task_info['callback']
                # Keep background classification in sync with config.
                # (Maintenance tasks never appear in desired_tasks, so
                # their background membership is untouched here.)
                if task_info.get('background'):
                    self._background_task_ids.add(task_id)
                else:
                    self._background_task_ids.discard(task_id)
                if self.task_intervals.get(task_id) != task_info['interval']:
                    # Interval changed — next run recalculated on next execution
                    self.task_intervals[task_id] = task_info['interval']
                    modified.append(task_id)
                    self.log(f"Hot reload: updated interval for {self._display_task_id(task_id)}", "info")

        # Remove tasks no longer in config (skip maintenance tasks)
        for task_id in current_tasks:
            if task_id not in desired_tasks and not task_id.startswith('maintenance:'):
                self.scheduled_tasks.pop(task_id, None)
                self.task_callbacks.pop(task_id, None)
                self.task_intervals.pop(task_id, None)
                self.task_daily_times.pop(task_id, None)
                self._background_task_ids.discard(task_id)
                removed.append(task_id)
                self.log(f"Hot reload: removed task {self._display_task_id(task_id)}", "info")

    self._save_state()

    return {'added': added, 'removed': removed, 'modified': modified}
|
|
|
|
def start(self):
|
|
"""Start the scheduler"""
|
|
self.running = True
|
|
self.log("Scheduler started", "info")
|
|
|
|
# Clear any stale activity status from a previous scheduler process
|
|
# (e.g., if the scheduler was killed mid-task, active=1 would be stuck)
|
|
try:
|
|
self.activity_manager.stop_activity()
|
|
self.activity_manager.stop_all_background_tasks()
|
|
except Exception:
|
|
pass
|
|
|
|
# Check for interrupted tasks from a previous crash
|
|
interrupted = TaskCheckpoint.get_interrupted()
|
|
if interrupted:
|
|
from datetime import datetime, timedelta
|
|
stale_cutoff = datetime.now() - timedelta(hours=STALE_THRESHOLD_HOURS)
|
|
for cp in interrupted:
|
|
started = datetime.fromisoformat(cp['started_at']) if cp['started_at'] else None
|
|
if started and started < stale_cutoff:
|
|
self.log(
|
|
f"Abandoning stale checkpoint: {cp['task_id']} "
|
|
f"(started {cp['started_at']}, {cp['completed_count']}/{cp['total_items']} done)",
|
|
"warning",
|
|
)
|
|
TaskCheckpoint.abandon(cp['task_id'])
|
|
else:
|
|
self.log(
|
|
f"Found interrupted task: {cp['task_id']} "
|
|
f"({cp['completed_count']}/{cp['total_items']} items completed, "
|
|
f"was processing: {cp['current_item']})",
|
|
"warning",
|
|
)
|
|
|
|
# Import MediaDownloader once
|
|
import sys
|
|
import importlib.util
|
|
from pathlib import Path
|
|
|
|
# Add parent directory to path
|
|
parent_dir = Path(__file__).parent.parent
|
|
sys.path.insert(0, str(parent_dir))
|
|
|
|
# Import media-downloader.py (with hyphen)
|
|
spec = importlib.util.spec_from_file_location(
|
|
"media_downloader",
|
|
parent_dir / "media-downloader.py"
|
|
)
|
|
media_downloader = importlib.util.module_from_spec(spec)
|
|
spec.loader.exec_module(media_downloader)
|
|
MediaDownloader = media_downloader.MediaDownloader
|
|
|
|
# Load configuration
|
|
config = self._load_config()
|
|
|
|
# Create downloader instance with notifications enabled for scheduler mode
|
|
# Share unified_db to avoid duplicate database connection pools
|
|
downloader = MediaDownloader(enable_notifications=True, unified_db=self.unified_db)
|
|
self.downloader = downloader # Store reference for memory management
|
|
|
|
# Initialize dependency updater for idle-time updates
|
|
update_config = config.get('dependency_updater', {}) or config.get('dependency_updates', {})
|
|
if update_config.get('enabled', True):
|
|
self.dependency_updater = DependencyUpdater(
|
|
config=update_config,
|
|
pushover_notifier=downloader.notifier if hasattr(downloader, 'notifier') else None,
|
|
scheduler_mode=True
|
|
)
|
|
# Load last check time from the updater's persisted state
|
|
status = self.dependency_updater.get_update_status()
|
|
if status and status.get('last_check'):
|
|
try:
|
|
self.last_dependency_check = datetime.fromisoformat(status['last_check'])
|
|
self.log(f"Last dependency check: {self.last_dependency_check}", "debug")
|
|
except (ValueError, TypeError):
|
|
pass
|
|
self.log("Dependency updater initialized (runs when idle)", "info")
|
|
else:
|
|
self.log("Dependency updater disabled in config", "debug")
|
|
|
|
# Track tasks to run at start - organized by module type
|
|
startup_tasks_by_module = {
|
|
'instagram_unified': [],
|
|
'instagram': [],
|
|
'fastdl': [],
|
|
'tiktok': [],
|
|
'imginn': [],
|
|
'imginn_api': [],
|
|
'instagram_client': [],
|
|
'toolzu': [],
|
|
'snapchat': [],
|
|
'snapchat_client': [],
|
|
'forums': [],
|
|
'coppermine': []
|
|
}
|
|
|
|
# Get hidden modules list - skip scheduling for hidden modules
|
|
hidden_modules = config.get('hidden_modules', [])
|
|
|
|
# Schedule unified Instagram task (replaces 6 individual Instagram scraper blocks)
|
|
_instagram_unified_enabled = False
|
|
if config.get('instagram_unified', {}).get('enabled'):
|
|
_instagram_unified_enabled = True
|
|
interval = config['instagram_unified'].get('check_interval_hours', 8)
|
|
run_at_start = config['instagram_unified'].get('run_at_start', False)
|
|
task_id = 'instagram_unified:all'
|
|
|
|
if run_at_start:
|
|
startup_tasks_by_module['instagram_unified'].append(
|
|
(task_id, lambda: self._run_instagram_unified(downloader))
|
|
)
|
|
|
|
if interval > 0:
|
|
self.schedule_task(
|
|
task_id,
|
|
interval,
|
|
lambda: self._run_instagram_unified(downloader)
|
|
)
|
|
|
|
# Legacy individual Instagram scheduling (only if unified is NOT enabled)
|
|
if not _instagram_unified_enabled:
|
|
# Schedule Instagram accounts (InstaLoader)
|
|
if config.get('instagram', {}).get('enabled') and 'instagram' not in hidden_modules:
|
|
accounts = config['instagram'].get('accounts', [])
|
|
if not accounts and 'usernames' in config['instagram']:
|
|
for username in config['instagram']['usernames']:
|
|
accounts.append({
|
|
'username': username,
|
|
'check_interval_hours': 6,
|
|
'run_at_start': False
|
|
})
|
|
|
|
for account in accounts:
|
|
username = account.get('username')
|
|
check_interval = account.get('check_interval_hours', 6)
|
|
run_at_start = account.get('run_at_start', False)
|
|
|
|
if not username:
|
|
continue
|
|
|
|
task_id = f"instagram:{username}"
|
|
|
|
if run_at_start:
|
|
startup_tasks_by_module['instagram'].append((task_id, lambda u=username: self._run_instagram_for_user(downloader, u)))
|
|
|
|
if check_interval > 0:
|
|
self.schedule_task(
|
|
task_id,
|
|
check_interval,
|
|
lambda u=username: self._run_instagram_for_user(downloader, u)
|
|
)
|
|
|
|
# Schedule FastDL accounts
|
|
if config.get('fastdl', {}).get('enabled') and 'fastdl' not in hidden_modules:
|
|
usernames = config['fastdl'].get('usernames', [])
|
|
check_interval = config['fastdl'].get('check_interval_hours', 6)
|
|
run_at_start = config['fastdl'].get('run_at_start', False)
|
|
|
|
if len(usernames) > 0:
|
|
task_id = "fastdl:all"
|
|
|
|
if run_at_start:
|
|
startup_tasks_by_module['fastdl'].append((task_id, lambda: self._run_fastdl_all(downloader)))
|
|
|
|
if check_interval > 0:
|
|
self.schedule_task(
|
|
task_id,
|
|
check_interval,
|
|
lambda: self._run_fastdl_all(downloader)
|
|
)
|
|
|
|
# Schedule ImgInn API accounts
|
|
if config.get('imginn_api', {}).get('enabled') and 'imginn_api' not in hidden_modules:
|
|
usernames = config['imginn_api'].get('usernames', [])
|
|
check_interval = config['imginn_api'].get('check_interval_hours', 6)
|
|
run_at_start = config['imginn_api'].get('run_at_start', False)
|
|
|
|
if len(usernames) > 0:
|
|
task_id = "imginn_api:all"
|
|
|
|
if run_at_start:
|
|
startup_tasks_by_module['imginn_api'].append((task_id, lambda: self._run_imginn_api_all(downloader)))
|
|
|
|
if check_interval > 0:
|
|
self.schedule_task(
|
|
task_id,
|
|
check_interval,
|
|
lambda: self._run_imginn_api_all(downloader)
|
|
)
|
|
|
|
# Schedule TikTok accounts
|
|
if config.get('tiktok', {}).get('enabled') and 'tiktok' not in hidden_modules:
|
|
accounts = config['tiktok'].get('accounts', [])
|
|
# Get global run_at_start setting (fallback to False)
|
|
global_run_at_start = config['tiktok'].get('run_at_start', False)
|
|
|
|
# Support old format
|
|
if not accounts and 'usernames' in config['tiktok']:
|
|
for username in config['tiktok']['usernames']:
|
|
accounts.append({
|
|
'username': username,
|
|
'check_interval_hours': 8,
|
|
'run_at_start': global_run_at_start
|
|
})
|
|
|
|
for account in accounts:
|
|
username = account.get('username')
|
|
check_interval = account.get('check_interval_hours', 8)
|
|
# Check per-account run_at_start first, fallback to global setting
|
|
run_at_start = account.get('run_at_start', global_run_at_start)
|
|
|
|
if not username:
|
|
continue
|
|
|
|
task_id = f"tiktok:{username}"
|
|
|
|
if run_at_start:
|
|
startup_tasks_by_module['tiktok'].append((task_id, lambda u=username: self._run_tiktok_for_user(downloader, u)))
|
|
|
|
if check_interval > 0:
|
|
self.schedule_task(
|
|
task_id,
|
|
check_interval,
|
|
lambda u=username: self._run_tiktok_for_user(downloader, u)
|
|
)
|
|
|
|
# Legacy ImgInn, Instagram Client, Toolzu scheduling (only if unified is NOT enabled)
|
|
if not _instagram_unified_enabled:
|
|
# Schedule ImgInn accounts (alternative Instagram downloader)
|
|
if config.get('imginn', {}).get('enabled') and 'imginn' not in hidden_modules:
|
|
usernames = config['imginn'].get('usernames', [])
|
|
check_interval = config['imginn'].get('check_interval_hours', 6)
|
|
run_at_start = config['imginn'].get('run_at_start', False)
|
|
|
|
for username in usernames:
|
|
task_id = f"imginn:{username}"
|
|
|
|
if run_at_start:
|
|
startup_tasks_by_module['imginn'].append((task_id, lambda u=username: self._run_imginn_for_user(downloader, u)))
|
|
|
|
if check_interval > 0:
|
|
self.schedule_task(
|
|
task_id,
|
|
check_interval,
|
|
lambda u=username: self._run_imginn_for_user(downloader, u)
|
|
)
|
|
|
|
# Schedule Instagram Client accounts (direct API downloader)
|
|
if config.get('instagram_client', {}).get('enabled') and 'instagram_client' not in hidden_modules:
|
|
usernames = config['instagram_client'].get('usernames', [])
|
|
check_interval = config['instagram_client'].get('check_interval_hours', 6)
|
|
run_at_start = config['instagram_client'].get('run_at_start', False)
|
|
|
|
for username in usernames:
|
|
task_id = f"instagram_client:{username}"
|
|
|
|
if run_at_start:
|
|
startup_tasks_by_module['instagram_client'].append((task_id, lambda u=username: self._run_instagram_client_for_user(downloader, u)))
|
|
|
|
if check_interval > 0:
|
|
self.schedule_task(
|
|
task_id,
|
|
check_interval,
|
|
lambda u=username: self._run_instagram_client_for_user(downloader, u)
|
|
)
|
|
|
|
# Schedule Toolzu accounts (1920x1440 resolution Instagram downloader)
|
|
if config.get('toolzu', {}).get('enabled') and 'toolzu' not in hidden_modules:
|
|
usernames = config['toolzu'].get('usernames', [])
|
|
check_interval = config['toolzu'].get('check_interval_hours', 4)
|
|
run_at_start = config['toolzu'].get('run_at_start', False)
|
|
|
|
for username in usernames:
|
|
task_id = f"toolzu:{username}"
|
|
|
|
if run_at_start:
|
|
startup_tasks_by_module['toolzu'].append((task_id, lambda u=username: self._run_toolzu_for_user(downloader, u)))
|
|
|
|
if check_interval > 0:
|
|
self.schedule_task(
|
|
task_id,
|
|
check_interval,
|
|
lambda u=username: self._run_toolzu_for_user(downloader, u)
|
|
)
|
|
|
|
# Schedule Snapchat accounts
|
|
if config.get('snapchat', {}).get('enabled') and 'snapchat' not in hidden_modules:
|
|
# Snapchat uses a usernames list
|
|
usernames = config['snapchat'].get('usernames', [])
|
|
check_interval = config['snapchat'].get('check_interval_hours', 6)
|
|
run_at_start = config['snapchat'].get('run_at_start', False)
|
|
|
|
for username in usernames:
|
|
task_id = f"snapchat:{username}"
|
|
|
|
if run_at_start:
|
|
startup_tasks_by_module['snapchat'].append((task_id, lambda u=username: self._run_snapchat_for_user(downloader, u)))
|
|
|
|
if check_interval > 0:
|
|
self.schedule_task(
|
|
task_id,
|
|
check_interval,
|
|
lambda u=username: self._run_snapchat_for_user(downloader, u)
|
|
)
|
|
|
|
# Schedule Snapchat Client accounts (direct API, no Playwright)
|
|
if config.get('snapchat_client', {}).get('enabled') and 'snapchat_client' not in hidden_modules:
|
|
usernames = config['snapchat_client'].get('usernames', [])
|
|
check_interval = config['snapchat_client'].get('check_interval_hours', 6)
|
|
run_at_start = config['snapchat_client'].get('run_at_start', False)
|
|
|
|
for username in usernames:
|
|
task_id = f"snapchat_client:{username}"
|
|
|
|
if run_at_start:
|
|
startup_tasks_by_module['snapchat_client'].append((task_id, lambda u=username: self._run_snapchat_client_for_user(downloader, u)))
|
|
|
|
if check_interval > 0:
|
|
self.schedule_task(
|
|
task_id,
|
|
check_interval,
|
|
lambda u=username: self._run_snapchat_client_for_user(downloader, u)
|
|
)
|
|
|
|
# Schedule Forum configurations
|
|
if config.get('forums', {}).get('enabled') and 'forums' not in hidden_modules:
|
|
forum_configs = config['forums'].get('configs', [])
|
|
|
|
for forum_config in forum_configs:
|
|
forum_name = forum_config.get('name')
|
|
if not forum_name or not forum_config.get('enabled'):
|
|
continue
|
|
|
|
# Schedule regular forum search
|
|
check_interval = forum_config.get('check_interval_hours', 12)
|
|
run_at_start = forum_config.get('run_at_start', False)
|
|
|
|
task_id = f"forum:{forum_name}"
|
|
|
|
if run_at_start:
|
|
startup_tasks_by_module['forums'].append((task_id, lambda f=forum_config: self._run_forum(downloader, f)))
|
|
|
|
if check_interval > 0:
|
|
self.schedule_task(
|
|
task_id,
|
|
check_interval,
|
|
lambda f=forum_config: self._run_forum(downloader, f)
|
|
)
|
|
|
|
# Schedule monitored thread checks (separate task)
|
|
if forum_config.get('update_monitored'):
|
|
monitor_interval = forum_config.get('monitor_threads_check_hours', 12)
|
|
monitor_at_start = forum_config.get('monitor_threads_run_at_start', False)
|
|
|
|
monitor_task_id = f"monitor:{forum_name}"
|
|
|
|
if monitor_at_start:
|
|
startup_tasks_by_module['forums'].append((monitor_task_id,
|
|
lambda f=forum_config: self._check_monitored_threads(downloader, f)))
|
|
|
|
if monitor_interval > 0:
|
|
self.schedule_task(
|
|
monitor_task_id,
|
|
monitor_interval,
|
|
lambda f=forum_config: self._check_monitored_threads(downloader, f)
|
|
)
|
|
|
|
# Schedule Coppermine galleries
|
|
if config.get('coppermine', {}).get('enabled') and 'coppermine' not in hidden_modules:
|
|
galleries = config['coppermine'].get('galleries', [])
|
|
global_check_interval = config['coppermine'].get('check_interval_hours', 12)
|
|
global_run_at_start = config['coppermine'].get('run_at_start', False)
|
|
|
|
for gallery in galleries:
|
|
gallery_name = gallery.get('name')
|
|
gallery_url = gallery.get('url')
|
|
|
|
if not gallery_name or not gallery_url:
|
|
continue
|
|
|
|
check_interval = gallery.get('check_interval_hours', global_check_interval)
|
|
run_at_start = gallery.get('run_at_start', global_run_at_start)
|
|
|
|
task_id = f"coppermine:{gallery_name}"
|
|
|
|
if run_at_start:
|
|
startup_tasks_by_module['coppermine'].append((task_id, lambda g=gallery: self._run_coppermine_for_gallery(downloader, g)))
|
|
|
|
if check_interval > 0:
|
|
self.schedule_task(
|
|
task_id,
|
|
check_interval,
|
|
lambda g=gallery: self._run_coppermine_for_gallery(downloader, g)
|
|
)
|
|
|
|
# Schedule maintenance tasks
|
|
# Error monitoring - check hourly for unreviewed errors and send push alerts
|
|
self.schedule_task(
|
|
"maintenance:error_alerts",
|
|
1.0, # Check every hour
|
|
lambda: self._check_error_alerts(downloader)
|
|
)
|
|
self._background_task_ids.add("maintenance:error_alerts")
|
|
self.log("Scheduled maintenance task: error_alerts (hourly)", "info")
|
|
|
|
# Error cleanup - run daily to clean up old error records
|
|
self.schedule_task(
|
|
"maintenance:error_cleanup",
|
|
24.0, # Once per day
|
|
lambda: self._cleanup_old_errors(downloader)
|
|
)
|
|
self._background_task_ids.add("maintenance:error_cleanup")
|
|
self.log("Scheduled maintenance task: error_cleanup (daily)", "info")
|
|
|
|
# YouTube channel monitor - check channels for matching videos
|
|
try:
|
|
db_path = str(Path(__file__).parent.parent / 'database' / 'media_downloader.db')
|
|
self.youtube_monitor = YouTubeChannelMonitor(db_path, self.activity_manager)
|
|
|
|
# Get configured interval from settings (default 6 hours)
|
|
yt_settings = self.youtube_monitor.get_global_settings()
|
|
yt_interval = yt_settings.get('check_interval_hours', 6)
|
|
|
|
self.schedule_task(
|
|
"youtube_channel_monitor",
|
|
float(yt_interval),
|
|
lambda: self._run_youtube_monitor()
|
|
)
|
|
self._background_task_ids.add("youtube_channel_monitor")
|
|
self.log(f"Scheduled YouTube channel monitor (every {yt_interval} hours)", "info")
|
|
|
|
# Schedule paused channel re-checks (v11.20.0)
|
|
paused_check_interval_days = yt_settings.get('paused_check_interval_days', 14)
|
|
self.schedule_task(
|
|
"youtube_monitor:paused",
|
|
float(paused_check_interval_days * 24), # Convert days to hours
|
|
lambda: self._run_youtube_paused_check()
|
|
)
|
|
self.log(f"Scheduled YouTube paused channel re-checks (every {paused_check_interval_days} days)", "info")
|
|
except Exception as e:
|
|
self.log(f"Failed to initialize YouTube channel monitor: {e}", "error")
|
|
|
|
# Schedule TMDb celebrity appearances sync (v11.23.0)
|
|
try:
|
|
# Check if TMDb sync is enabled
|
|
with self.unified_db.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute('SELECT tmdb_enabled, tmdb_check_interval_hours FROM appearance_config WHERE id = 1')
|
|
row = cursor.fetchone()
|
|
|
|
if row and row[0]: # TMDb enabled
|
|
tmdb_interval = row[1] or 12 # Default 12 hours
|
|
self.schedule_task(
|
|
"appearances:tmdb_sync",
|
|
float(tmdb_interval),
|
|
lambda: self._run_tmdb_sync()
|
|
)
|
|
self._background_task_ids.add("appearances:tmdb_sync")
|
|
self.log(f"Scheduled TMDb celebrity appearances sync (every {tmdb_interval} hours)", "info")
|
|
else:
|
|
self.log("TMDb sync disabled in appearance config", "info")
|
|
except Exception as e:
|
|
self.log(f"Failed to schedule TMDb appearances sync: {e}", "error")
|
|
|
|
# Schedule Easynews monitor (v11.30.0)
|
|
try:
|
|
db_path = str(Path(__file__).parent.parent / 'database' / 'media_downloader.db')
|
|
self.easynews_monitor = EasynewsMonitor(db_path, self.activity_manager)
|
|
|
|
# Get configured interval from settings (default 12 hours)
|
|
easynews_config = self.easynews_monitor.get_config()
|
|
if easynews_config.get('enabled') and easynews_config.get('has_credentials'):
|
|
easynews_interval = easynews_config.get('check_interval_hours', 12)
|
|
|
|
self.schedule_task(
|
|
"easynews_monitor",
|
|
float(easynews_interval),
|
|
lambda: self._run_easynews_check()
|
|
)
|
|
self._background_task_ids.add("easynews_monitor")
|
|
self.log(f"Scheduled Easynews monitor (every {easynews_interval} hours)", "info")
|
|
else:
|
|
self.log("Easynews monitor disabled or no credentials configured", "info")
|
|
except Exception as e:
|
|
self.log(f"Failed to initialize Easynews monitor: {e}", "error")
|
|
|
|
# Schedule Reddit community monitor for private gallery
|
|
try:
|
|
db_path = str(Path(__file__).parent.parent / 'database' / 'media_downloader.db')
|
|
self.reddit_monitor = RedditCommunityMonitor(db_path, self.activity_manager)
|
|
|
|
reddit_settings = self.reddit_monitor.get_settings()
|
|
reddit_interval = float(reddit_settings.get('check_interval_hours', 4))
|
|
if reddit_settings.get('enabled'):
|
|
self.schedule_task(
|
|
"reddit_monitor",
|
|
reddit_interval,
|
|
lambda: self._run_reddit_monitor()
|
|
)
|
|
self._background_task_ids.add("reddit_monitor")
|
|
self.log(f"Scheduled Reddit community monitor (every {reddit_interval} hours)", "info")
|
|
else:
|
|
self.log("Reddit community monitor disabled", "info")
|
|
except Exception as e:
|
|
self.log(f"Failed to initialize Reddit community monitor: {e}", "error")
|
|
|
|
# Schedule Press monitor (GDELT news articles)
|
|
try:
|
|
with self.unified_db.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute('SELECT enabled, check_interval_hours FROM press_config WHERE id = 1')
|
|
press_row = cursor.fetchone()
|
|
|
|
if press_row and press_row['enabled']:
|
|
press_interval = float(press_row['check_interval_hours'] or 6)
|
|
self.schedule_task(
|
|
"press_monitor",
|
|
press_interval,
|
|
lambda: self._run_press_monitor()
|
|
)
|
|
self._background_task_ids.add("press_monitor")
|
|
self.log(f"Scheduled Press monitor (every {press_interval} hours)", "info")
|
|
else:
|
|
self.log("Press monitor disabled", "info")
|
|
except Exception as e:
|
|
self.log(f"Failed to initialize Press monitor: {e}", "error")
|
|
|
|
# Schedule daily appearance reminder notifications (runs at midnight)
|
|
try:
|
|
# Check if appearance notifications are enabled
|
|
with self.unified_db.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute('SELECT notify_new_appearances FROM appearance_config WHERE id = 1')
|
|
row = cursor.fetchone()
|
|
|
|
if row and row[0]: # Notifications enabled
|
|
self.schedule_daily_at_time(
|
|
"appearances:reminders",
|
|
0, # Hour: midnight
|
|
0, # Minute: 0
|
|
lambda: self._run_appearance_reminders()
|
|
)
|
|
else:
|
|
self.log("Appearance notifications disabled in config", "info")
|
|
except Exception as e:
|
|
self.log(f"Failed to schedule appearance reminders: {e}", "error")
|
|
|
|
# Schedule paid content sync (Coomer/Kemono)
|
|
try:
|
|
from modules.paid_content import PaidContentDBAdapter
|
|
pc_db = PaidContentDBAdapter(self.unified_db)
|
|
pc_config = pc_db.get_config()
|
|
|
|
# Only schedule if there are enabled creators
|
|
enabled_creators = pc_db.get_creators(enabled_only=True)
|
|
if enabled_creators:
|
|
check_interval = pc_config.get('check_interval_hours', 6)
|
|
if check_interval > 0:
|
|
self.schedule_task(
|
|
"paid_content:sync",
|
|
check_interval,
|
|
lambda: self._run_paid_content_sync()
|
|
)
|
|
self._background_task_ids.add("paid_content:sync")
|
|
self.log(f"Paid Content sync scheduled (every {check_interval}h, {len(enabled_creators)} creators)", "info")
|
|
else:
|
|
self.log("Paid Content sync disabled (check_interval_hours = 0)", "info")
|
|
else:
|
|
self.log("Paid Content sync: no enabled creators configured", "debug")
|
|
|
|
except Exception as e:
|
|
self.log(f"Failed to schedule Paid Content sync: {e}", "error")
|
|
import traceback
|
|
self.log(f"Traceback: {traceback.format_exc()}", "debug")
|
|
|
|
# Get configured module run order, or use default
|
|
default_order = ['fastdl', 'imginn_api', 'tiktok', 'imginn', 'instagram_client', 'toolzu', 'snapchat_client', 'forums', 'coppermine']
|
|
module_run_order = config.get('scheduler', {}).get('module_run_order', default_order)
|
|
|
|
# Build startup_tasks list in the configured order
|
|
startup_tasks = []
|
|
for module_name in module_run_order:
|
|
if module_name in startup_tasks_by_module:
|
|
startup_tasks.extend(startup_tasks_by_module[module_name])
|
|
|
|
# Add any modules not in the configured order (for backwards compatibility)
|
|
for module_name, tasks in startup_tasks_by_module.items():
|
|
if module_name not in module_run_order:
|
|
startup_tasks.extend(tasks)
|
|
|
|
if module_run_order != default_order:
|
|
self.log(f"Using custom module run order: {', '.join(module_run_order)}", "info")
|
|
|
|
# Clear any stale activity from previous scheduler instance
|
|
self._update_current_activity(None)
|
|
|
|
# Run interrupted tasks first (crash recovery)
|
|
if interrupted:
|
|
recovery_tasks = []
|
|
for cp in interrupted:
|
|
cp_task_id = cp['task_id']
|
|
started = datetime.fromisoformat(cp['started_at']) if cp['started_at'] else None
|
|
stale_cutoff = datetime.now() - timedelta(hours=STALE_THRESHOLD_HOURS)
|
|
if started and started < stale_cutoff:
|
|
continue # Already abandoned above
|
|
if cp_task_id in self.task_callbacks:
|
|
recovery_tasks.append(cp_task_id)
|
|
else:
|
|
self.log(f"Interrupted task {cp_task_id} is no longer registered — abandoning checkpoint", "warning")
|
|
TaskCheckpoint.abandon(cp_task_id)
|
|
|
|
if recovery_tasks:
|
|
self.log(f"Resuming {len(recovery_tasks)} interrupted task(s) from previous crash...", "info")
|
|
for task_id in recovery_tasks:
|
|
# Use the correct dispatch method based on task type
|
|
if task_id in self._background_task_ids:
|
|
self._execute_background_task(task_id)
|
|
else:
|
|
self._execute_task(task_id)
|
|
|
|
# Run startup tasks sequentially
|
|
if startup_tasks:
|
|
self.log(f"Running {len(startup_tasks)} tasks configured with run_at_start...", "info")
|
|
# Modules that use Cloudflare-protected sites need longer delays between accounts
|
|
cloudflare_modules = {'imginn', 'imginn_api', 'toolzu', 'snapchat', 'fastdl'}
|
|
prev_module = None
|
|
|
|
for task_id, func in startup_tasks:
|
|
try:
|
|
# Extract module name from task_id (e.g., "imginn:username" -> "imginn")
|
|
current_module = task_id.split(':')[0] if ':' in task_id else task_id
|
|
|
|
# Add longer random delay between accounts of the same Cloudflare-protected module
|
|
if prev_module and prev_module == current_module and current_module in cloudflare_modules:
|
|
delay = random.uniform(15, 45)
|
|
self.log(f"Waiting {delay:.1f}s between {current_module} accounts to avoid rate limiting", "info")
|
|
time.sleep(delay)
|
|
|
|
# Update activity tracking (protected by lock)
|
|
with self._state_lock:
|
|
self.current_task = task_id
|
|
self.current_task_start_time = datetime.now()
|
|
self._update_current_activity(task_id, "running")
|
|
|
|
self.log(f"Running {self._display_task_id(task_id)} at startup", "info")
|
|
func()
|
|
self.log(f"Completed {self._display_task_id(task_id)} startup run", "success")
|
|
|
|
# Clear activity tracking and update last run time (protected by lock)
|
|
with self._state_lock:
|
|
self.last_run_times[task_id] = datetime.now()
|
|
self.current_task = None
|
|
self.current_task_start_time = None
|
|
self._update_current_activity(None)
|
|
|
|
prev_module = current_module
|
|
|
|
# Add small delay between tasks to avoid database contention
|
|
time.sleep(0.5)
|
|
except Exception as e:
|
|
self.log(f"Error running {self._display_task_id(task_id)} at startup: {e}", "error")
|
|
# Clear activity on error too (protected by lock)
|
|
with self._state_lock:
|
|
self.current_task = None
|
|
self.current_task_start_time = None
|
|
self._update_current_activity(None)
|
|
prev_module = current_module
|
|
|
|
# Save all states at once after all startup tasks complete
|
|
self._save_state()
|
|
|
|
# Start the main sequential loop
|
|
self._main_loop()
|
|
|
|
def _run_instagram_for_user(self, downloader, username: str):
|
|
"""Run Instagram download for specific user"""
|
|
from modules.instagram_rate_limiter import rate_limiter as ig_rate_limiter
|
|
|
|
self.log(f"Downloading Instagram for @{username}", "info")
|
|
# Temporarily override config to only download this user
|
|
original_accounts = downloader.config.get('instagram', {}).get('accounts', [])
|
|
original_usernames = downloader.config.get('instagram', {}).get('usernames', [])
|
|
|
|
try:
|
|
# Acquire operation lock so paid content sync can't run simultaneously
|
|
with ig_rate_limiter.operation_lock:
|
|
# Set to only this username
|
|
if 'accounts' in downloader.config.get('instagram', {}):
|
|
downloader.config['instagram']['accounts'] = [
|
|
acc for acc in original_accounts if acc.get('username') == username
|
|
]
|
|
else:
|
|
downloader.config['instagram']['usernames'] = [username]
|
|
|
|
count = downloader.download_instagram()
|
|
if count and count > 0:
|
|
self.log(f"Downloaded {count} files for @{username} via Instagram", "info")
|
|
result = count if count else 0
|
|
finally:
|
|
# Restore original config
|
|
if 'accounts' in downloader.config.get('instagram', {}):
|
|
downloader.config['instagram']['accounts'] = original_accounts
|
|
else:
|
|
downloader.config['instagram']['usernames'] = original_usernames
|
|
|
|
# Brief cooldown between accounts to avoid rapid-fire API calls
|
|
time.sleep(random.uniform(3.0, 6.0))
|
|
return result
|
|
|
|
def _run_fastdl_all(self, downloader):
|
|
"""Run FastDL download for all configured users"""
|
|
usernames = downloader.config.get('fastdl', {}).get('usernames', [])
|
|
if not usernames:
|
|
return 0
|
|
|
|
self.log(f"Downloading Instagram via FastDL for {len(usernames)} accounts", "info")
|
|
count = downloader.download_fastdl()
|
|
if count and count > 0:
|
|
self.log(f"Downloaded {count} files total via FastDL", "info")
|
|
return count if count else 0
|
|
|
|
    def _run_instagram_unified(self, downloader):
        """
        Unified Instagram runner. Processes accounts one at a time,
        running each content type through its assigned scraper.

        For each account, the enabled content types (posts/stories/reels/tagged)
        are grouped by their assigned scraper, each scraper's config is
        temporarily rewritten to target only that account, and the scraper's
        download method is invoked. After all accounts, an optional phrase
        search runs through the scraper assigned to 'posts'.

        Returns:
            int: Total number of files downloaded across all accounts and
                 the phrase-search pass.
        """
        # Reload config from settings to pick up latest saves
        config = self._load_config()
        unified = config.get('instagram_unified', {})
        if not unified.get('enabled'):
            return 0

        scraper_assignment = unified.get('scraper_assignment', {})  # content type -> scraper key
        content_types = unified.get('content_types', {})            # per-type global settings
        accounts = unified.get('accounts', [])
        phrase_search = unified.get('phrase_search', {})
        user_delay = unified.get('user_delay_seconds', 20)

        # Map scraper keys to download methods
        scraper_methods = {
            'fastdl': 'download_fastdl',
            'imginn_api': 'download_imginn_api',
            'imginn': 'download_imginn',
            'toolzu': 'download_toolzu',
            'instagram_client': 'download_instagram_client',
            'instagram': 'download_instagram',
        }

        # Content type keys in each scraper's config
        content_type_keys = ['posts', 'stories', 'reels', 'tagged']

        total_count = 0
        # Only accounts with at least one content type flag set are processed.
        active_accounts = [a for a in accounts if any(a.get(ct) for ct in content_type_keys)]
        # Shuffle to avoid a predictable per-run account order.
        random.shuffle(active_accounts)
        self.log(f"Instagram: processing {len(active_accounts)} accounts", "info")

        # Crash recovery checkpoint
        checkpoint = self._current_checkpoint
        if checkpoint:
            checkpoint.start(total_items=len(active_accounts))
            if checkpoint.is_recovering():
                self.log(f"Instagram: recovering — skipping already-completed accounts", "info")

        # Suppress per-method gallery bridge calls — the unified runner handles it
        # once at the end via _execute_task's on_download_complete
        downloader._skip_gallery_bridge = True

        for idx, account in enumerate(active_accounts):
            username = account.get('username', '')
            if not username:
                continue

            # Skip accounts already finished before a crash/restart.
            if checkpoint and checkpoint.is_completed(username):
                self.log(f"Instagram: skipping @{username} (already completed in previous run)", "debug")
                continue

            if checkpoint:
                checkpoint.set_current(username)

            # Group enabled content types by assigned scraper
            scraper_groups = {}
            for ct in content_type_keys:
                if not account.get(ct):
                    continue
                ct_config = content_types.get(ct, {})
                if not ct_config.get('enabled'):
                    continue
                scraper_key = scraper_assignment.get(ct)
                if not scraper_key:
                    continue
                if scraper_key not in scraper_groups:
                    scraper_groups[scraper_key] = []
                scraper_groups[scraper_key].append(ct)

            if not scraper_groups:
                continue

            self.log(f"Instagram: @{username} ({idx+1}/{len(active_accounts)})", "info")

            # Update dashboard activity with overall progress (best-effort).
            try:
                self.activity_manager.update_account_name(username)
                self.activity_manager.update_account_progress(idx + 1, len(active_accounts))
            except Exception:
                pass

            user_count = 0
            for scraper_key, cts in scraper_groups.items():
                method_name = scraper_methods.get(scraper_key)
                if not method_name or not hasattr(downloader, method_name):
                    self.log(f"Unknown scraper method: {scraper_key}", "warning")
                    continue

                # Save original config for this scraper (shallow copy is enough
                # because the whole key is swapped back in the finally block).
                original_config = downloader.config.get(scraper_key, {}).copy() if downloader.config.get(scraper_key) else {}

                try:
                    # Build temporary scraper config
                    scraper_cfg = dict(original_config)
                    scraper_cfg['enabled'] = True

                    # Set usernames for this single user. The 'instagram' scraper
                    # uses an 'accounts' list of dicts; all others use 'usernames'.
                    if scraper_key == 'instagram':
                        scraper_cfg['accounts'] = [{'username': username, 'check_interval_hours': 8, 'run_at_start': False}]
                    else:
                        scraper_cfg['usernames'] = [username]

                    # Enable only the content types assigned to this scraper for this account
                    for ct in content_type_keys:
                        if ct in cts:
                            ct_global = content_types.get(ct, {})
                            scraper_cfg[ct] = {
                                'enabled': True,
                                'days_back': ct_global.get('days_back', 7),
                                'destination_path': ct_global.get('destination_path', ''),
                                'temp_dir': f'temp/{scraper_key}/{ct}',
                            }
                        else:
                            scraper_cfg[ct] = {'enabled': False}

                    # Disable phrase search for per-user runs
                    scraper_cfg['phrase_search'] = {'enabled': False, 'usernames': [], 'phrases': []}

                    # Apply temporary config
                    downloader.config[scraper_key] = scraper_cfg

                    # Call the download method
                    method = getattr(downloader, method_name)
                    count = method()
                    if count and count > 0:
                        self.log(f" {scraper_key}: {count} files for @{username} ({', '.join(cts)})", "info")
                        user_count += count
                        total_count += count

                except Exception as e:
                    # One failing scraper must not abort the rest of the account's types.
                    self.log(f"Error running {scraper_key} for @{username}: {e}", "error")
                    import traceback
                    self.log(f"Traceback: {traceback.format_exc()}", "debug")
                finally:
                    # Restore original config (or remove the key if it didn't exist).
                    if original_config:
                        downloader.config[scraper_key] = original_config
                    elif scraper_key in downloader.config:
                        del downloader.config[scraper_key]

            # Auto-import to private gallery after all content types for this user
            if user_count > 0:
                try:
                    from modules.scraper_gallery_bridge import get_crypto, on_download_complete
                    crypto = get_crypto()
                    if crypto:
                        imported = on_download_complete(f"instagram_unified:{username}", 1, self.unified_db, crypto)
                        if imported:
                            self.log(f" Gallery bridge: imported {imported} files for @{username}", "info")
                except Exception as e:
                    self.log(f"Gallery bridge error for @{username}: {e}", "warning")

            # Mark account as completed in checkpoint
            if checkpoint:
                checkpoint.mark_completed(username)

            # Delay between accounts (skipped after the last one).
            if idx < len(active_accounts) - 1 and user_delay > 0:
                time.sleep(user_delay)

        # Run phrase search on the scraper assigned to posts
        if phrase_search.get('enabled') and phrase_search.get('phrases'):
            posts_scraper = scraper_assignment.get('posts', 'imginn_api')
            method_name = scraper_methods.get(posts_scraper)
            if method_name and hasattr(downloader, method_name):
                original_config = downloader.config.get(posts_scraper, {}).copy() if downloader.config.get(posts_scraper) else {}
                try:
                    # Build config with phrase search enabled, no regular usernames
                    scraper_cfg = dict(original_config)
                    scraper_cfg['enabled'] = True
                    scraper_cfg['usernames'] = []

                    # Collect all usernames with posts enabled for phrase search
                    ps_usernames = sorted(set(
                        a['username'] for a in accounts
                        if a.get('username') and a.get('posts')
                    ))

                    scraper_cfg['phrase_search'] = {
                        'enabled': True,
                        'download_all': phrase_search.get('download_all', True),
                        'usernames': ps_usernames,
                        'phrases': phrase_search.get('phrases', []),
                        'case_sensitive': phrase_search.get('case_sensitive', False),
                        'match_all': phrase_search.get('match_all', False),
                    }

                    # Enable posts content type for phrase search
                    posts_ct = content_types.get('posts', {})
                    scraper_cfg['posts'] = {
                        'enabled': posts_ct.get('enabled', True),
                        'days_back': posts_ct.get('days_back', 7),
                        'destination_path': posts_ct.get('destination_path', ''),
                        'temp_dir': f'temp/{posts_scraper}/posts',
                    }
                    # Phrase search only scans posts; all other types are off.
                    for ct in ['stories', 'reels', 'tagged']:
                        scraper_cfg[ct] = {'enabled': False}

                    downloader.config[posts_scraper] = scraper_cfg

                    self.log(f"Instagram: running phrase search via {posts_scraper}", "info")
                    method = getattr(downloader, method_name)
                    count = method()
                    if count and count > 0:
                        self.log(f"Phrase search: {count} files via {posts_scraper}", "info")
                        total_count += count
                except Exception as e:
                    self.log(f"Error running phrase search via {posts_scraper}: {e}", "error")
                finally:
                    if original_config:
                        downloader.config[posts_scraper] = original_config

        # Re-enable per-method gallery bridge calls
        downloader._skip_gallery_bridge = False

        self.log(f"Instagram: completed, {total_count} total files", "info")
        return total_count
|
|
|
|
def _run_platform_for_user(self, downloader, platform: str, username: str, download_method: str, display_name: str = None):
|
|
"""
|
|
Generic method to run a platform download for a specific user.
|
|
Reduces code duplication across _run_*_for_user methods.
|
|
|
|
Args:
|
|
downloader: The downloader instance
|
|
platform: Platform config key (e.g., 'fastdl', 'imginn', 'snapchat')
|
|
username: Username to download for
|
|
download_method: Name of the download method to call (e.g., 'download_fastdl')
|
|
display_name: Optional display name for logging (defaults to platform)
|
|
"""
|
|
display = display_name or platform.title()
|
|
self.log(f"Downloading {display} for @{username}", "info")
|
|
|
|
# Save original usernames
|
|
original_usernames = downloader.config.get(platform, {}).get('usernames', [])
|
|
|
|
try:
|
|
# Set to only this username
|
|
downloader.config.setdefault(platform, {})['usernames'] = [username]
|
|
# Call the download method dynamically
|
|
method = getattr(downloader, download_method)
|
|
count = method()
|
|
if count and count > 0:
|
|
self.log(f"Downloaded {count} files for @{username} via {display}", "info")
|
|
return count if count else 0
|
|
finally:
|
|
# Restore original config
|
|
downloader.config[platform]['usernames'] = original_usernames
|
|
|
|
def _run_fastdl_for_user(self, downloader, username: str):
|
|
"""Run FastDL download for specific user"""
|
|
return self._run_platform_for_user(
|
|
downloader, 'fastdl', username, 'download_fastdl', 'Instagram via FastDL'
|
|
)
|
|
|
|
def _run_imginn_api_all(self, downloader):
|
|
"""Run ImgInn API download for all configured users"""
|
|
usernames = downloader.config.get('imginn_api', {}).get('usernames', [])
|
|
if not usernames:
|
|
return 0
|
|
|
|
self.log(f"Downloading Instagram via ImgInn API for {len(usernames)} accounts", "info")
|
|
count = downloader.download_imginn_api()
|
|
if count and count > 0:
|
|
self.log(f"Downloaded {count} files total via ImgInn API", "info")
|
|
return count if count else 0
|
|
|
|
def _run_imginn_api_for_user(self, downloader, username: str):
|
|
"""Run ImgInn API download for specific user"""
|
|
return self._run_platform_for_user(
|
|
downloader, 'imginn_api', username, 'download_imginn_api', 'Instagram via ImgInn API'
|
|
)
|
|
|
|
def _run_imginn_for_user(self, downloader, username: str):
|
|
"""Run ImgInn download for specific user"""
|
|
return self._run_platform_for_user(
|
|
downloader, 'imginn', username, 'download_imginn', 'Instagram via ImgInn'
|
|
)
|
|
|
|
def _run_instagram_client_for_user(self, downloader, username: str):
|
|
"""Run Instagram Client (API) download for specific user"""
|
|
return self._run_platform_for_user(
|
|
downloader, 'instagram_client', username, 'download_instagram_client', 'Instagram via API'
|
|
)
|
|
|
|
def _run_toolzu_for_user(self, downloader, username: str):
|
|
"""Run Toolzu download for specific user"""
|
|
return self._run_platform_for_user(
|
|
downloader, 'toolzu', username, 'download_toolzu', 'Instagram via Toolzu (1920x1440)'
|
|
)
|
|
|
|
def _run_snapchat_for_user(self, downloader, username: str):
|
|
"""Run Snapchat download for specific user"""
|
|
return self._run_platform_for_user(
|
|
downloader, 'snapchat', username, 'download_snapchat', 'Snapchat stories'
|
|
)
|
|
|
|
def _run_snapchat_client_for_user(self, downloader, username: str):
|
|
"""Run Snapchat Client (API) download for specific user"""
|
|
return self._run_platform_for_user(
|
|
downloader, 'snapchat_client', username, 'download_snapchat_client', 'Snapchat via API'
|
|
)
|
|
|
|
def _run_tiktok_for_user(self, downloader, username: str):
|
|
"""Run TikTok download for specific user"""
|
|
self.log(f"Downloading TikTok for @{username}", "info")
|
|
# Temporarily override config
|
|
original_accounts = downloader.config.get('tiktok', {}).get('accounts', [])
|
|
original_usernames = downloader.config.get('tiktok', {}).get('usernames', [])
|
|
|
|
try:
|
|
if 'accounts' in downloader.config.get('tiktok', {}):
|
|
downloader.config['tiktok']['accounts'] = [
|
|
acc for acc in original_accounts if acc.get('username') == username
|
|
]
|
|
else:
|
|
downloader.config['tiktok']['usernames'] = [username]
|
|
|
|
count = downloader.download_tiktok()
|
|
if count and count > 0:
|
|
self.log(f"Downloaded {count} files for @{username} via TikTok", "info")
|
|
return count if count else 0
|
|
finally:
|
|
if 'accounts' in downloader.config.get('tiktok', {}):
|
|
downloader.config['tiktok']['accounts'] = original_accounts
|
|
else:
|
|
downloader.config['tiktok']['usernames'] = original_usernames
|
|
|
|
def _run_forum(self, downloader, forum_config: Dict):
|
|
"""Run forum download for specific configuration"""
|
|
forum_name = forum_config.get('name')
|
|
self.log(f"Searching for new threads in forum: {forum_name}", "info")
|
|
|
|
# Check if we've recently searched this forum
|
|
task_id = f"forum:{forum_name}"
|
|
check_interval = forum_config.get('check_interval_hours', 12)
|
|
min_hours_between_searches = check_interval / 2
|
|
|
|
if hasattr(downloader, 'unified_db'):
|
|
# Get last search time from scheduler state
|
|
state = downloader.unified_db.get_scheduler_state(task_id)
|
|
if state and state.get('last_run'):
|
|
last_run = state['last_run'] if isinstance(state['last_run'], datetime) else datetime.fromisoformat(state['last_run'])
|
|
hours_since_last = (datetime.now() - last_run).total_seconds() / 3600
|
|
|
|
if hours_since_last < min_hours_between_searches:
|
|
self.log(f"Skipping forum search (searched {hours_since_last:.1f} hours ago, minimum is {min_hours_between_searches:.1f} hours)", "info")
|
|
return
|
|
|
|
# Run forum download for this specific forum only
|
|
# Run in separate thread to avoid Playwright async conflict
|
|
self.log(f"Executing forum search for {forum_name}", "info")
|
|
|
|
import concurrent.futures
|
|
|
|
def _forum_search_in_thread():
|
|
return downloader.download_forums(batch_mode=True, specific_forum=forum_name)
|
|
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
|
|
future = executor.submit(_forum_search_in_thread)
|
|
count = future.result()
|
|
|
|
if count and count > 0:
|
|
self.log(f"Downloaded {count} files from {forum_name}", "info")
|
|
return count if count else 0
|
|
|
|
def _run_coppermine_for_gallery(self, downloader, gallery_config: Dict):
|
|
"""Run Coppermine download for specific gallery"""
|
|
gallery_name = gallery_config.get('name')
|
|
gallery_url = gallery_config.get('url')
|
|
|
|
self.log(f"Downloading from Coppermine gallery: {gallery_name}", "info")
|
|
|
|
# Temporarily override config to download only this gallery
|
|
original_galleries = downloader.config.get('coppermine', {}).get('galleries', [])
|
|
|
|
try:
|
|
downloader.config.setdefault('coppermine', {})['galleries'] = [gallery_config]
|
|
count = downloader.download_coppermine()
|
|
|
|
if count and count > 0:
|
|
self.log(f"Downloaded {count} images from {gallery_name}", "info")
|
|
return count if count else 0
|
|
finally:
|
|
downloader.config['coppermine']['galleries'] = original_galleries
|
|
|
|
def _check_monitored_threads(self, downloader, forum_config: Dict):
|
|
"""Check monitored threads for updates"""
|
|
forum_name = forum_config.get('name')
|
|
self.log(f"Checking monitored threads for forum: {forum_name}", "info")
|
|
|
|
try:
|
|
# First, expire old monitored threads based on configured auto_track_days
|
|
auto_track_days = forum_config.get('auto_track_days', 30)
|
|
if hasattr(downloader, 'unified_db'):
|
|
try:
|
|
from modules.forum_db_adapter import ForumDatabaseAdapter
|
|
forum_adapter = ForumDatabaseAdapter(downloader.unified_db)
|
|
forum_adapter.cleanup_old_data(days=auto_track_days)
|
|
self.log(f"Expired old forum thread monitors ({auto_track_days}+ days)", "debug")
|
|
except Exception as e:
|
|
self.log(f"Error expiring old monitors: {e}", "debug")
|
|
|
|
# Clean up any existing browser contexts before starting
|
|
# Run in separate thread to avoid Playwright async conflict
|
|
if hasattr(downloader, 'modules') and 'forums' in downloader.modules:
|
|
forum_module = downloader.modules['forums']
|
|
if forum_module != 'pending' and hasattr(forum_module, 'cleanup'):
|
|
try:
|
|
import concurrent.futures
|
|
|
|
def _cleanup_in_thread():
|
|
return forum_module.cleanup()
|
|
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
|
|
future = executor.submit(_cleanup_in_thread)
|
|
future.result()
|
|
|
|
self.log("Cleaned up forum browser context", "debug")
|
|
except Exception as e:
|
|
self.log(f"Error cleaning browser context: {e}", "debug")
|
|
|
|
# Get monitored threads from database
|
|
if hasattr(downloader, 'unified_db'):
|
|
with downloader.unified_db.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute('''
|
|
SELECT ft.thread_id, ft.thread_url, ft.thread_title, ft.last_post_date, ft.last_checked
|
|
FROM forum_threads ft
|
|
INNER JOIN (
|
|
SELECT thread_url, MAX(thread_id) as max_id
|
|
FROM forum_threads
|
|
WHERE forum_name = ? AND status = 'active'
|
|
AND (monitor_until IS NULL OR monitor_until > datetime('now'))
|
|
GROUP BY thread_url
|
|
) latest ON ft.thread_id = latest.max_id
|
|
''', (forum_name,))
|
|
|
|
threads = cursor.fetchall()
|
|
|
|
if threads:
|
|
self.log(f"Found {len(threads)} monitored threads for {forum_name} (auto-monitoring for {auto_track_days} days)", "info")
|
|
for thread in threads[:5]: # Show first 5 threads
|
|
# Handle Row object
|
|
if hasattr(thread, 'keys'):
|
|
title = dict(thread).get('thread_title', 'Unknown')
|
|
else:
|
|
title = thread[2] if len(thread) > 2 else 'Unknown'
|
|
title = title[:50] if title else 'Unknown'
|
|
self.log(f" - {title}...", "debug")
|
|
if len(threads) > 5:
|
|
self.log(f" ... and {len(threads) - 5} more", "debug")
|
|
|
|
# Check how many threads were recently checked
|
|
threads_to_check = []
|
|
skipped_count = 0
|
|
|
|
# Determine check interval - don't recheck if checked within half the interval
|
|
check_interval = forum_config.get('monitor_threads_check_hours', 12)
|
|
min_hours_between_checks = check_interval / 2
|
|
|
|
# Also skip threads that were just checked in this session (within 1 hour)
|
|
min_hours_for_immediate_skip = 1.0
|
|
|
|
# Get search query for filtering
|
|
search_query = forum_config.get('search_query', '')
|
|
filtered_count = 0
|
|
|
|
# Build set of base thread URLs to detect duplicates (ADDS, X\d+ variants)
|
|
import re
|
|
base_thread_urls = set()
|
|
thread_url_map = {} # Map thread_url to thread data
|
|
|
|
for thread in threads:
|
|
if hasattr(thread, 'keys'):
|
|
thread_url = dict(thread).get('thread_url', '')
|
|
else:
|
|
thread_url = thread[1] if len(thread) > 1 else ''
|
|
|
|
# Normalize URL to base form (remove -adds, -x\d+, etc.)
|
|
base_url = re.sub(r'-(adds?|x\d+)(?=\.|/|$)', '', thread_url, flags=re.IGNORECASE)
|
|
base_thread_urls.add(base_url)
|
|
thread_url_map[thread_url] = base_url
|
|
|
|
for thread in threads:
|
|
# Convert Row to dict for easier access
|
|
if hasattr(thread, 'keys'):
|
|
thread_dict = dict(thread)
|
|
else:
|
|
# Fallback for tuple access
|
|
thread_dict = {
|
|
'thread_id': thread[0],
|
|
'thread_url': thread[1],
|
|
'thread_title': thread[2],
|
|
'last_post_date': thread[3] if len(thread) > 3 else None,
|
|
'last_checked': thread[4] if len(thread) > 4 else None
|
|
}
|
|
|
|
thread_id = thread_dict['thread_id']
|
|
thread_title = thread_dict['thread_title'][:50] if thread_dict['thread_title'] else 'Unknown'
|
|
full_thread_title = thread_dict['thread_title'] or ''
|
|
thread_url = thread_dict.get('thread_url', '')
|
|
|
|
# Filter: Skip duplicate variant threads (ADDS, X\d+, etc.)
|
|
# If this is a variant thread and the base thread exists, skip it
|
|
base_url = thread_url_map.get(thread_url, thread_url)
|
|
if thread_url != base_url and base_url in base_thread_urls:
|
|
# This is a variant (ADDS, X13, etc.) and the base thread exists
|
|
# Check if any other thread in the list has this base URL
|
|
has_base_thread = any(
|
|
thread_url_map.get(dict(t).get('thread_url') if hasattr(t, 'keys') else t[1], '') == base_url
|
|
and (dict(t).get('thread_url') if hasattr(t, 'keys') else t[1]) != thread_url
|
|
for t in threads
|
|
)
|
|
if has_base_thread:
|
|
self.log(f"Skipping variant thread (base exists): {thread_title}...", "debug")
|
|
filtered_count += 1
|
|
continue
|
|
|
|
# Filter: Skip threads where search query is NOT in the title
|
|
if search_query:
|
|
search_terms = search_query.lower().split()
|
|
title_lower = full_thread_title.lower()
|
|
if not all(term in title_lower for term in search_terms):
|
|
self.log(f"Skipping thread (search query not in title): {thread_title}...", "debug")
|
|
filtered_count += 1
|
|
continue
|
|
|
|
# Check if thread was JUST checked (within 1 hour - same session)
|
|
last_checked = thread_dict.get('last_checked')
|
|
if last_checked:
|
|
from datetime import datetime
|
|
if isinstance(last_checked, str):
|
|
last_checked_dt = datetime.fromisoformat(last_checked)
|
|
else:
|
|
last_checked_dt = last_checked
|
|
hours_since_checked = (datetime.now() - last_checked_dt).total_seconds() / 3600
|
|
|
|
if hours_since_checked < min_hours_for_immediate_skip:
|
|
self.log(f"Skipping thread (just checked {hours_since_checked:.1f}h ago): {thread_title}...", "debug")
|
|
skipped_count += 1
|
|
continue
|
|
|
|
# Check if this thread was recently checked (within half interval)
|
|
if downloader.unified_db.was_thread_checked_recently(
|
|
thread_id, hours=min_hours_between_checks
|
|
):
|
|
self.log(f"Skipping thread (recently checked): {thread_title}", "debug")
|
|
skipped_count += 1
|
|
else:
|
|
threads_to_check.append(thread_dict)
|
|
|
|
if threads_to_check:
|
|
filter_msg = f", {filtered_count} filtered" if filtered_count > 0 else ""
|
|
self.log(f"Checking {len(threads_to_check)} threads ({skipped_count} skipped as recently checked{filter_msg})", "info")
|
|
|
|
# Process each thread that needs checking
|
|
total_new_posts = 0
|
|
total_new_images = 0
|
|
all_downloaded_file_paths = [] # Collect file paths for notification
|
|
|
|
for i, thread in enumerate(threads_to_check, 1):
|
|
try:
|
|
thread_url = thread.get('thread_url') if isinstance(thread, dict) else thread['thread_url']
|
|
thread_title = (thread.get('thread_title') if isinstance(thread, dict) else thread['thread_title'])
|
|
thread_title = thread_title[:60] if thread_title else 'Unknown'
|
|
|
|
self.log(f"[{i}/{len(threads_to_check)}] Checking monitored thread: {thread_title}...", "info")
|
|
|
|
# Actually download the thread to check for updates
|
|
if hasattr(downloader, 'modules') and 'forums' in downloader.modules:
|
|
forum_module = downloader.modules['forums']
|
|
|
|
# If forums module not initialized, initialize it
|
|
if forum_module == 'pending':
|
|
from modules.forum_db_adapter import ForumDatabaseAdapter
|
|
forum_db_adapter = ForumDatabaseAdapter(downloader.unified_db)
|
|
from modules.forum_downloader import ForumDownloader
|
|
|
|
forum_module = ForumDownloader(
|
|
headless=downloader.config.get('forums', {}).get('headless', True),
|
|
show_progress=True,
|
|
use_database=True,
|
|
db_path=forum_db_adapter,
|
|
download_dir=str(downloader.base_path / 'downloads'),
|
|
log_callback=downloader._log_callback
|
|
)
|
|
downloader.modules['forums'] = forum_module
|
|
|
|
# Set up paths
|
|
temp_dir = downloader.base_path / forum_config['temp_dir']
|
|
temp_dir.mkdir(parents=True, exist_ok=True)
|
|
dest_dir = Path(forum_config['destination_path'])
|
|
dest_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Download the thread in a separate thread to avoid Playwright async conflict
|
|
# Playwright sync API doesn't work well inside asyncio loops
|
|
import concurrent.futures
|
|
|
|
def _download_in_thread():
|
|
return forum_module.download_thread(
|
|
thread_url=thread_url,
|
|
forum_name=forum_name,
|
|
base_download_path=str(temp_dir),
|
|
destination_path=str(dest_dir),
|
|
username=forum_config.get('username'),
|
|
password=forum_config.get('password'),
|
|
external_only=forum_config.get('external_only', True),
|
|
cloudflare_enabled=forum_config.get('cloudflare_enabled', False),
|
|
auto_track_days=forum_config.get('auto_track_days', 30)
|
|
)
|
|
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
|
|
future = executor.submit(_download_in_thread)
|
|
result = future.result()
|
|
|
|
if result.get('status') == 'success':
|
|
images = result.get('images_downloaded', 0)
|
|
posts = result.get('posts_processed', 0)
|
|
file_paths = result.get('downloaded_file_paths', [])
|
|
if images > 0:
|
|
self.log(f"Found {images} new images in {posts} new posts", "info")
|
|
total_new_images += images
|
|
total_new_posts += posts
|
|
all_downloaded_file_paths.extend(file_paths)
|
|
else:
|
|
self.log(f"No new content in thread", "debug")
|
|
elif result.get('status') == 'skipped':
|
|
self.log(f"Thread already up to date", "debug")
|
|
else:
|
|
self.log(f"Failed to check thread: {result.get('message', 'Unknown error')}", "warning")
|
|
|
|
# Record that we checked it
|
|
downloader.unified_db.record_thread_check(
|
|
thread_id=thread.get('thread_id') if isinstance(thread, dict) else thread['thread_id'],
|
|
forum_name=forum_name,
|
|
last_post_date=thread.get('last_post_date') if isinstance(thread, dict) else thread['last_post_date'],
|
|
status='completed'
|
|
)
|
|
|
|
except Exception as e:
|
|
self.log(f"Error checking thread {thread_title}: {e}", "error")
|
|
downloader.unified_db.record_thread_check(
|
|
thread_id=thread.get('thread_id') if isinstance(thread, dict) else thread['thread_id'],
|
|
forum_name=forum_name,
|
|
status='failed'
|
|
)
|
|
|
|
if total_new_images > 0:
|
|
self.log(f"Monitored threads check complete: {total_new_images} new images from {total_new_posts} new posts", "success")
|
|
|
|
# Send push notification if enabled
|
|
if downloader.notifier:
|
|
try:
|
|
search_query = forum_config.get('search_query', '')
|
|
|
|
# Create downloads list for notification with actual file paths
|
|
downloads = []
|
|
for file_path in all_downloaded_file_paths:
|
|
downloads.append({
|
|
'source': forum_name,
|
|
'content_type': 'image',
|
|
'filename': Path(file_path).name,
|
|
'file_path': file_path
|
|
})
|
|
|
|
# DEBUG: Log notification details
|
|
self.log(f"Preparing forum notification: {len(downloads)} files", "debug")
|
|
for dl in downloads[:5]: # Log first 5
|
|
exists = Path(dl['file_path']).exists()
|
|
self.log(f" - {dl['filename']} (exists: {exists})", "debug")
|
|
|
|
# Send batch notification
|
|
downloader.notifier.notify_batch_download(
|
|
platform='forums',
|
|
downloads=downloads,
|
|
search_term=search_query
|
|
)
|
|
|
|
self.log(f"Sent notification: {len(downloads)} image(s) from {forum_name}", "info")
|
|
except Exception as e:
|
|
self.log(f"Failed to send forum notification: {e}", "error")
|
|
|
|
# Trigger Immich scan if configured
|
|
if downloader.config.get('immich', {}).get('scan_after_download'):
|
|
self.log("Triggering Immich scan after monitored thread updates", "info")
|
|
downloader.trigger_immich_scan()
|
|
else:
|
|
self.log(f"Monitored threads check complete: No new content found", "info")
|
|
else:
|
|
filter_msg = f" ({filtered_count} filtered out)" if filtered_count > 0 else ""
|
|
self.log(f"All {len(threads)} threads were recently checked, skipping{filter_msg}", "info")
|
|
else:
|
|
self.log(f"No monitored threads found for {forum_name}", "info")
|
|
|
|
except Exception as e:
|
|
self.log(f"Error checking monitored threads for {forum_name}: {e}", "error")
|
|
return 0
|
|
|
|
return total_new_images if 'total_new_images' in locals() else 0
|
|
|
|
    def _check_error_alerts(self, downloader) -> None:
        """Check for unreviewed errors and send push alerts if needed.

        Flow: verify error monitoring + push alerts are enabled in settings,
        scan logs for fresh errors, then query for errors that have sat
        unreviewed for at least ``push_alert_delay_hours`` without an alert.
        If any exist and a notifier is configured, send one summary
        notification and mark those errors as alerted.

        Args:
            downloader: Object expected to expose ``unified_db`` (error
                store) and ``notifier`` (push notification client).
                # NOTE(review): exact type not visible from this block.
        """
        try:
            if not hasattr(downloader, 'unified_db') or not downloader.unified_db:
                self.log("No unified_db available for error checking", "debug")
                return

            # Check error monitoring settings
            error_settings = {}
            if self.settings_manager:
                error_settings = self.settings_manager.get('error_monitoring', {})

            # If error monitoring is disabled, skip
            # (default True: monitoring is on unless explicitly disabled)
            if not error_settings.get('enabled', True):
                self.log("Error monitoring is disabled", "debug")
                return

            # If push alerts are disabled, skip
            if not error_settings.get('push_alert_enabled', True):
                self.log("Error push alerts are disabled", "debug")
                return

            # Get the delay hours from settings (default 24)
            delay_hours = error_settings.get('push_alert_delay_hours', 24)

            # Scan logs for new errors first, so the query below sees
            # anything logged since the last scheduler pass.
            self.log("Scanning logs for new errors...", "debug")
            downloader.unified_db.scan_logs_for_errors()

            # Check for errors needing push alerts (unreviewed for delay_hours+, no alert sent yet)
            errors = downloader.unified_db.get_errors_needing_push_alert(delay_hours=delay_hours)

            if not errors:
                self.log("No unreviewed errors requiring push alerts", "debug")
                return

            # Count errors by module
            error_count = len(errors)
            modules = set(e.get('module', 'Unknown') for e in errors)

            self.log(f"Found {error_count} errors needing push alert from: {', '.join(modules)}", "info")

            # Send push notification if notifier is available
            if downloader.notifier:
                # Build error summary (HTML-formatted bullet list)
                error_summary = []
                for error in errors[:5]:  # Limit to first 5 errors
                    module = error.get('module', 'Unknown')
                    message = error.get('message', '')[:100]  # Truncate long messages
                    count = error.get('occurrence_count', 1)
                    if count > 1:
                        error_summary.append(f"• [{module}] {message} (x{count})")
                    else:
                        error_summary.append(f"• [{module}] {message}")

                if error_count > 5:
                    error_summary.append(f"... and {error_count - 5} more errors")

                message = f"<b>{error_count} unreviewed error(s)</b> detected in logs:\n\n" + "\n".join(error_summary)
                message += "\n\n<i>Check the dashboard for details.</i>"

                # Set proper context for error notifications (prevents stale download data)
                downloader.notifier._current_notification_context = {
                    'platform': 'system',
                    'source': 'Log Errors',
                    'content_type': 'error',
                    'download_count': 0,
                    'metadata': None
                }

                # Send notification
                success = downloader.notifier.send_notification(
                    title="⚠️ Log Errors Require Attention",
                    message=message,
                    priority=0,  # Normal priority
                    sound="intermission",
                    html=True
                )

                if success:
                    self.log(f"Sent push alert for {error_count} unreviewed errors", "info")

                    # Mark all errors as having had push alert sent, so they
                    # are not re-alerted on the next scheduler pass.
                    error_ids = [e.get('id') for e in errors if e.get('id')]
                    if error_ids:
                        downloader.unified_db.mark_push_alert_sent(error_ids)
                else:
                    self.log("Failed to send push alert for errors", "warning")
            else:
                self.log("No notifier available for push alerts", "debug")

        except Exception as e:
            self.log(f"Error checking for error alerts: {e}", "error")
            import traceback
            self.log(f"Traceback: {traceback.format_exc()}", "debug")
def _cleanup_old_errors(self, downloader):
|
|
"""Clean up old error records (older than 7 days) and orphaned embeddings"""
|
|
try:
|
|
if not hasattr(downloader, 'unified_db') or not downloader.unified_db:
|
|
return
|
|
|
|
# Clean up old error records
|
|
deleted_count = downloader.unified_db.cleanup_old_errors(days=7)
|
|
if deleted_count > 0:
|
|
self.log(f"Cleaned up {deleted_count} old error records", "info")
|
|
else:
|
|
self.log("No old error records to clean up", "debug")
|
|
|
|
# Clean up orphaned embeddings (files not in 'final' location)
|
|
self._cleanup_orphaned_embeddings(downloader)
|
|
|
|
except Exception as e:
|
|
self.log(f"Error cleaning up old errors: {e}", "error")
|
|
|
|
def _cleanup_orphaned_embeddings(self, downloader):
|
|
"""Clean up embeddings for files no longer in 'final' location"""
|
|
try:
|
|
if not hasattr(downloader, 'unified_db') or not downloader.unified_db:
|
|
return
|
|
|
|
with downloader.unified_db.get_connection(for_write=True) as conn:
|
|
cursor = conn.cursor()
|
|
|
|
# Delete embeddings for files that are in recycle or review (not final)
|
|
cursor.execute('''
|
|
DELETE FROM content_embeddings
|
|
WHERE file_id IN (
|
|
SELECT ce.file_id FROM content_embeddings ce
|
|
JOIN file_inventory fi ON ce.file_id = fi.id
|
|
WHERE fi.location != 'final'
|
|
)
|
|
''')
|
|
orphaned_count = cursor.rowcount
|
|
|
|
# Also delete embeddings where file_id no longer exists in file_inventory
|
|
cursor.execute('''
|
|
DELETE FROM content_embeddings
|
|
WHERE file_id NOT IN (SELECT id FROM file_inventory)
|
|
''')
|
|
missing_count = cursor.rowcount
|
|
|
|
total_deleted = orphaned_count + missing_count
|
|
if total_deleted > 0:
|
|
self.log(f"Cleaned up {total_deleted} orphaned embeddings ({orphaned_count} non-final, {missing_count} missing files)", "info")
|
|
else:
|
|
self.log("No orphaned embeddings to clean up", "debug")
|
|
|
|
except Exception as e:
|
|
self.log(f"Error cleaning up orphaned embeddings: {e}", "error")
|
|
|
|
def _run_youtube_monitor(self) -> int:
|
|
"""Run the YouTube channel monitor to check for matching videos.
|
|
|
|
Runs asynchronously in background thread so scheduler can continue.
|
|
Uses background task tracking (shows in Background Tasks panel, not Current Activity).
|
|
"""
|
|
try:
|
|
if not self.youtube_monitor:
|
|
self.log("YouTube monitor not initialized", "warning")
|
|
return 0
|
|
|
|
# Run in background thread using check_all_now() which has its own
|
|
# background task tracking (shows in Background Tasks panel)
|
|
def run_in_background():
|
|
try:
|
|
import asyncio
|
|
# Create new event loop for this thread
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
try:
|
|
videos_added = loop.run_until_complete(self.youtube_monitor.check_all_now(from_scheduler=True))
|
|
if videos_added > 0:
|
|
self.log(f"YouTube monitor: added {videos_added} videos to queue", "success")
|
|
else:
|
|
self.log("YouTube monitor: no new matching videos", "debug")
|
|
finally:
|
|
loop.close()
|
|
except Exception as e:
|
|
self.log(f"Error in YouTube monitor background task: {e}", "error")
|
|
import traceback
|
|
self.log(f"Traceback: {traceback.format_exc()}", "debug")
|
|
finally:
|
|
with self._state_lock:
|
|
self._running_background_tasks.discard("youtube_channel_monitor")
|
|
|
|
# Start background thread
|
|
bg_thread = threading.Thread(target=run_in_background, daemon=True)
|
|
bg_thread.start()
|
|
|
|
self.log("YouTube monitor started in background", "info")
|
|
return 0 # Return immediately, actual count tracked by background task
|
|
|
|
except Exception as e:
|
|
self.log(f"Error starting YouTube monitor: {e}", "error")
|
|
import traceback
|
|
self.log(f"Traceback: {traceback.format_exc()}", "debug")
|
|
return 0
|
|
|
|
def _run_reddit_monitor(self) -> int:
|
|
"""Run the Reddit community monitor to check for new posts.
|
|
|
|
Runs asynchronously in background thread so scheduler can continue.
|
|
Uses background task tracking (shows in Background Tasks panel).
|
|
"""
|
|
try:
|
|
if not hasattr(self, 'reddit_monitor') or not self.reddit_monitor:
|
|
self.log("Reddit monitor not initialized", "warning")
|
|
return 0
|
|
|
|
def run_in_background():
|
|
try:
|
|
import asyncio
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
try:
|
|
media_count = loop.run_until_complete(self.reddit_monitor.check_all_now(from_scheduler=True))
|
|
if media_count > 0:
|
|
self.log(f"Reddit monitor: imported {media_count} new media items", "success")
|
|
else:
|
|
self.log("Reddit monitor: no new media found", "debug")
|
|
finally:
|
|
loop.close()
|
|
except Exception as e:
|
|
self.log(f"Error in Reddit monitor background task: {e}", "error")
|
|
import traceback
|
|
self.log(f"Traceback: {traceback.format_exc()}", "debug")
|
|
finally:
|
|
with self._state_lock:
|
|
self._running_background_tasks.discard("reddit_monitor")
|
|
|
|
bg_thread = threading.Thread(target=run_in_background, daemon=True)
|
|
bg_thread.start()
|
|
|
|
self.log("Reddit monitor started in background", "info")
|
|
return 0
|
|
|
|
except Exception as e:
|
|
self.log(f"Error starting Reddit monitor: {e}", "error")
|
|
import traceback
|
|
self.log(f"Traceback: {traceback.format_exc()}", "debug")
|
|
return 0
|
|
|
|
def _run_tmdb_sync(self) -> int:
|
|
"""Run TMDb celebrity appearances sync.
|
|
|
|
Runs asynchronously in background thread so scheduler can continue.
|
|
Uses background task tracking (shows in Background Tasks panel).
|
|
"""
|
|
try:
|
|
# Run in background thread
|
|
def run_in_background():
|
|
try:
|
|
import asyncio
|
|
from web.backend.routers.appearances import sync_tmdb_appearances
|
|
|
|
# Create new event loop for this thread
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
try:
|
|
loop.run_until_complete(sync_tmdb_appearances(from_scheduler=True, db=self.unified_db))
|
|
self.log("TMDb appearances sync completed", "success")
|
|
finally:
|
|
loop.close()
|
|
except Exception as e:
|
|
self.log(f"Error in TMDb sync background task: {e}", "error")
|
|
import traceback
|
|
self.log(f"Traceback: {traceback.format_exc()}", "debug")
|
|
finally:
|
|
with self._state_lock:
|
|
self._running_background_tasks.discard("appearances:tmdb_sync")
|
|
|
|
# Start background thread
|
|
bg_thread = threading.Thread(target=run_in_background, daemon=True)
|
|
bg_thread.start()
|
|
|
|
self.log("TMDb appearances sync started in background", "info")
|
|
return 0 # Return immediately, actual progress tracked by background task
|
|
|
|
except Exception as e:
|
|
self.log(f"Error starting TMDb sync: {e}", "error")
|
|
import traceback
|
|
self.log(f"Traceback: {traceback.format_exc()}", "debug")
|
|
return 0
|
|
|
|
def _run_appearance_reminders(self) -> int:
|
|
"""Run daily appearance reminder notifications"""
|
|
try:
|
|
import asyncio
|
|
from web.backend.routers.appearances import send_appearance_reminders
|
|
|
|
self.log("Running appearance reminder check...", "info")
|
|
|
|
# Create a new event loop for the async function
|
|
# (using asyncio.run() can fail with "Event loop is closed" in threaded contexts)
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
try:
|
|
result = loop.run_until_complete(send_appearance_reminders(self.unified_db))
|
|
finally:
|
|
loop.close()
|
|
|
|
if result.get('success'):
|
|
sent = result.get('sent', 0)
|
|
if sent > 0:
|
|
self.log(f"Sent {sent} appearance reminders", "success")
|
|
else:
|
|
self.log("No appearance reminders to send", "info")
|
|
return sent
|
|
else:
|
|
self.log(f"Appearance reminders failed: {result.get('error', 'Unknown error')}", "error")
|
|
return 0
|
|
except Exception as e:
|
|
self.log(f"Error running appearance reminders: {e}", "error")
|
|
import traceback
|
|
self.log(f"Traceback: {traceback.format_exc()}", "debug")
|
|
return 0
|
|
|
|
def _run_youtube_paused_check(self) -> int:
|
|
"""Check paused YouTube channels for activity"""
|
|
try:
|
|
if not self.youtube_monitor:
|
|
self.log("YouTube monitor not initialized", "warning")
|
|
return 0
|
|
|
|
# Run the paused channel check synchronously
|
|
resumed = self.youtube_monitor.check_paused_channels_sync()
|
|
|
|
if resumed > 0:
|
|
self.log(f"YouTube paused check: {resumed} channels auto-resumed", "success")
|
|
else:
|
|
self.log("YouTube paused check: no channels resumed", "debug")
|
|
|
|
return resumed
|
|
except Exception as e:
|
|
self.log(f"Error checking paused YouTube channels: {e}", "error")
|
|
import traceback
|
|
self.log(f"Traceback: {traceback.format_exc()}", "debug")
|
|
return 0
|
|
|
|
def _run_easynews_check(self) -> int:
|
|
"""Run the Easynews monitor to check for new media.
|
|
|
|
Runs in background thread so scheduler can continue.
|
|
"""
|
|
try:
|
|
if not self.easynews_monitor:
|
|
self.log("Easynews monitor not initialized", "warning")
|
|
return 0
|
|
|
|
# Run in background thread
|
|
def run_in_background():
|
|
try:
|
|
result = self.easynews_monitor.check_all_celebrities(from_scheduler=True)
|
|
results_found = result.get('results_found', 0)
|
|
if results_found > 0:
|
|
self.log(f"Easynews monitor: found {results_found} new results", "success")
|
|
else:
|
|
self.log("Easynews monitor: no new results", "debug")
|
|
except Exception as e:
|
|
self.log(f"Error in Easynews monitor background task: {e}", "error")
|
|
import traceback
|
|
self.log(f"Traceback: {traceback.format_exc()}", "debug")
|
|
finally:
|
|
with self._state_lock:
|
|
self._running_background_tasks.discard("easynews_monitor")
|
|
|
|
# Start background thread
|
|
bg_thread = threading.Thread(target=run_in_background, daemon=True)
|
|
bg_thread.start()
|
|
|
|
self.log("Easynews monitor started in background", "info")
|
|
return 0 # Return immediately, actual count tracked by monitor
|
|
|
|
except Exception as e:
|
|
self.log(f"Error starting Easynews monitor: {e}", "error")
|
|
import traceback
|
|
self.log(f"Traceback: {traceback.format_exc()}", "debug")
|
|
return 0
|
|
|
|
def _run_press_monitor(self) -> int:
|
|
"""Run the Press monitor to fetch GDELT news articles.
|
|
|
|
Runs in background thread so scheduler can continue.
|
|
"""
|
|
try:
|
|
def run_in_background():
|
|
try:
|
|
from web.backend.routers.press import process_press_articles
|
|
result = process_press_articles(self.unified_db)
|
|
total_new = result.get('total_new', 0)
|
|
if total_new > 0:
|
|
self.log(f"Press monitor: found {total_new} new articles", "success")
|
|
else:
|
|
self.log("Press monitor: no new articles", "debug")
|
|
except Exception as e:
|
|
self.log(f"Error in Press monitor background task: {e}", "error")
|
|
import traceback
|
|
self.log(f"Traceback: {traceback.format_exc()}", "debug")
|
|
finally:
|
|
with self._state_lock:
|
|
self._running_background_tasks.discard("press_monitor")
|
|
|
|
bg_thread = threading.Thread(target=run_in_background, daemon=True)
|
|
bg_thread.start()
|
|
|
|
self.log("Press monitor started in background", "info")
|
|
return 0
|
|
|
|
except Exception as e:
|
|
self.log(f"Error starting Press monitor: {e}", "error")
|
|
import traceback
|
|
self.log(f"Traceback: {traceback.format_exc()}", "debug")
|
|
return 0
|
|
|
|
def _run_paid_content_sync(self) -> int:
|
|
"""Run paid content sync for all enabled creators.
|
|
|
|
Runs asynchronously in background thread so scheduler can continue.
|
|
Uses the standalone sync_paid_content_all function which works across processes
|
|
and uses activity_manager for database-backed progress tracking (shows in GUI).
|
|
"""
|
|
try:
|
|
# Run in background thread (like TMDb sync)
|
|
def run_in_background():
|
|
try:
|
|
import asyncio
|
|
from web.backend.routers.paid_content import sync_paid_content_all
|
|
|
|
# Create new event loop for this thread
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
try:
|
|
result = loop.run_until_complete(sync_paid_content_all(from_scheduler=True, download=True))
|
|
if result.get('success'):
|
|
new_posts = result.get('new_posts', 0)
|
|
new_files = result.get('new_files', 0)
|
|
failed = result.get('failed_creators', 0)
|
|
if new_posts > 0 or new_files > 0:
|
|
self.log(f"Paid Content sync: {new_posts} new posts, {new_files} new files", "success")
|
|
else:
|
|
self.log("Paid Content sync: no new content", "debug")
|
|
if failed > 0:
|
|
self.log(f"Paid Content sync: {failed} creators failed", "warning")
|
|
else:
|
|
self.log(f"Paid Content sync failed: {result.get('error', 'Unknown error')}", "error")
|
|
finally:
|
|
loop.close()
|
|
except Exception as e:
|
|
self.log(f"Error in Paid Content sync background task: {e}", "error")
|
|
import traceback
|
|
self.log(f"Traceback: {traceback.format_exc()}", "debug")
|
|
finally:
|
|
with self._state_lock:
|
|
self._running_background_tasks.discard("paid_content:sync")
|
|
|
|
# Start background thread
|
|
import threading
|
|
bg_thread = threading.Thread(target=run_in_background, daemon=True)
|
|
bg_thread.start()
|
|
|
|
self.log("Paid Content sync started in background", "info")
|
|
return 0 # Return immediately, actual progress tracked via activity_manager
|
|
|
|
except Exception as e:
|
|
self.log(f"Error starting Paid Content sync: {e}", "error")
|
|
import traceback
|
|
self.log(f"Traceback: {traceback.format_exc()}", "debug")
|
|
return 0
|
|
|
|
def stop(self):
|
|
"""Stop the scheduler"""
|
|
self.running = False
|
|
self.log("Scheduler stopping...", "info")
|
|
# Sequential mode - main loop will exit on next iteration
|
|
self.log("Scheduler stopped", "info")
|
|
|
|
def get_status(self) -> Dict:
|
|
"""Get scheduler status"""
|
|
import sqlite3
|
|
|
|
status = {
|
|
'running': self.running,
|
|
'scheduled_tasks': {},
|
|
'last_run_times': {}
|
|
}
|
|
|
|
# Get download counts from database
|
|
download_counts = {}
|
|
try:
|
|
from contextlib import closing
|
|
with closing(sqlite3.connect(str(self.scheduler_db_path), timeout=5)) as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute('SELECT task_id, last_download_count FROM scheduler_state')
|
|
for row in cursor.fetchall():
|
|
download_counts[row[0]] = row[1] or 0
|
|
except (sqlite3.Error, OSError):
|
|
pass
|
|
|
|
for task_id, next_run in self.scheduled_tasks.items():
|
|
status['scheduled_tasks'][task_id] = {
|
|
'next_run': next_run.isoformat(),
|
|
'time_until': str(next_run - datetime.now()) if next_run > datetime.now() else 'Now'
|
|
}
|
|
|
|
for task_id, last_run in self.last_run_times.items():
|
|
status['last_run_times'][task_id] = {
|
|
'timestamp': last_run.isoformat(),
|
|
'time_ago': str(datetime.now() - last_run),
|
|
'download_count': download_counts.get(task_id, 0)
|
|
}
|
|
|
|
return status
|
|
|
|
|
|
def main():
    """Exercise the scheduler from the command line.

    Prints the configured per-account schedules, starts the scheduler, and
    polls its status every 30 seconds until interrupted.
    """
    scheduler = DownloadScheduler()

    # Show configuration
    config = scheduler._load_config()

    def show_account_platform(key, label, default_interval):
        # Platforms storing a list of account dicts, each with its own interval.
        if not config.get(key, {}).get('enabled'):
            return
        print(f"\n{label}")
        for account in config[key].get('accounts', []):
            username = account.get('username')
            interval = account.get('check_interval_hours', default_interval)
            run_start = account.get('run_at_start', False)
            print(f" @{username}: {interval}h interval (random {interval/2:.1f}-{interval}h), run_at_start={run_start}")

    def show_username_platform(key, label, default_interval):
        # Platforms storing a flat username list sharing one interval setting.
        if not config.get(key, {}).get('enabled'):
            return
        print(f"\n{label}")
        interval = config[key].get('check_interval_hours', default_interval)
        run_start = config[key].get('run_at_start', False)
        for username in config[key].get('usernames', []):
            print(f" @{username}: {interval}h interval (random {interval/2:.1f}-{interval}h), run_at_start={run_start}")

    print("Individual downloader schedules:")
    show_account_platform('instagram', 'Instagram accounts:', 6)
    show_username_platform('fastdl', 'FastDL (Instagram) accounts:', 6)
    show_username_platform('imginn_api', 'ImgInn API (Instagram) accounts:', 6)
    show_username_platform('imginn', 'ImgInn (Instagram) accounts:', 6)
    show_username_platform('toolzu', 'Toolzu (Instagram 1920x1440) accounts:', 4)
    show_username_platform('snapchat', 'Snapchat accounts:', 6)
    show_account_platform('tiktok', 'TikTok accounts:', 8)

    # Forums use named configs rather than accounts/usernames.
    if config.get('forums', {}).get('enabled'):
        print("\nForum configurations:")
        for forum in config['forums'].get('configs', []):
            name = forum.get('name')
            interval = forum.get('check_interval_hours', 12)
            run_start = forum.get('run_at_start', False)
            print(f" {name}: {interval}h interval (random {interval/2:.1f}-{interval}h), run_at_start={run_start}")

    print("\nStarting scheduler...")
    scheduler.start()

    try:
        # Poll status until Ctrl-C.
        while True:
            time.sleep(30)
            status = scheduler.get_status()
            print(f"\nScheduler status at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}:")
            print(json.dumps(status, indent=2, default=str))
    except KeyboardInterrupt:
        scheduler.stop()
# Manual entry point: running this module directly exercises the scheduler.
if __name__ == "__main__":
    main()