""" Manual Import Router Handles manual file import operations: - Service configuration - File upload to temp directory - Filename parsing - Processing and moving to final destination (async background processing) """ import asyncio import shutil import uuid from datetime import datetime from pathlib import Path from threading import Lock from typing import Dict, List, Optional from fastapi import APIRouter, BackgroundTasks, Depends, File, Form, Request, UploadFile from pydantic import BaseModel from slowapi import Limiter from slowapi.util import get_remote_address from ..core.dependencies import get_current_user, get_app_state from ..core.exceptions import handle_exceptions, NotFoundError, ValidationError from modules.filename_parser import FilenameParser, get_preset_patterns, parse_with_fallbacks, INSTAGRAM_PATTERNS from modules.universal_logger import get_logger logger = get_logger('API') router = APIRouter(prefix="/api/manual-import", tags=["Manual Import"]) limiter = Limiter(key_func=get_remote_address) # ============================================================================ # JOB TRACKING FOR BACKGROUND PROCESSING # ============================================================================ # In-memory job tracking (jobs are transient - cleared on restart) _import_jobs: Dict[str, Dict] = {} _jobs_lock = Lock() def get_job_status(job_id: str) -> Optional[Dict]: """Get the current status of an import job.""" with _jobs_lock: return _import_jobs.get(job_id) def update_job_status(job_id: str, updates: Dict): """Update an import job's status.""" with _jobs_lock: if job_id in _import_jobs: _import_jobs[job_id].update(updates) def create_job(job_id: str, total_files: int, service_name: str): """Create a new import job.""" with _jobs_lock: _import_jobs[job_id] = { 'id': job_id, 'status': 'processing', 'service_name': service_name, 'total_files': total_files, 'processed_files': 0, 'success_count': 0, 'failed_count': 0, 'results': [], 'current_file': None, 'started_at': datetime.now().isoformat(), 'completed_at': None } def cleanup_old_jobs(): """Remove jobs older than 1 hour.""" with _jobs_lock: now = datetime.now() to_remove = [] for job_id, job in _import_jobs.items(): if job.get('completed_at'): try: completed = datetime.fromisoformat(job['completed_at']) if (now - completed).total_seconds() > 3600: # 1 hour to_remove.append(job_id) except (ValueError, TypeError): pass for job_id in to_remove: del _import_jobs[job_id] # ============================================================================ # PYDANTIC MODELS # ============================================================================ class ParseFilenameRequest(BaseModel): filename: str pattern: str class FileInfo(BaseModel): filename: str temp_path: str manual_datetime: Optional[str] = None manual_username: Optional[str] = None class ProcessFilesRequest(BaseModel): service_name: str files: List[dict] # ============================================================================ # HELPER FUNCTIONS # ============================================================================ def extract_youtube_metadata(video_id: str) -> Optional[Dict]: """Extract metadata from YouTube video using yt-dlp.""" import subprocess import json try: result = subprocess.run( [ '/opt/media-downloader/venv/bin/yt-dlp', '--dump-json', '--no-download', '--no-warnings', f'https://www.youtube.com/watch?v={video_id}' ], capture_output=True, text=True, timeout=30 ) if result.returncode != 0: return None metadata = json.loads(result.stdout) upload_date = None 
def extract_video_id_from_filename(filename: str) -> Optional[str]:
    """Try to extract a YouTube video ID from a filename."""
    import re

    name = Path(filename).stem

    # Pattern 1: ID in brackets [ID]
    bracket_match = re.search(r'\[([A-Za-z0-9_-]{11})\]', name)
    if bracket_match:
        return bracket_match.group(1)

    # Pattern 2: ID at end after underscore
    underscore_match = re.search(r'_([A-Za-z0-9_-]{11})$', name)
    if underscore_match:
        return underscore_match.group(1)

    # Pattern 3: Just the ID (filename is exactly 11 chars)
    if re.match(r'^[A-Za-z0-9_-]{11}$', name):
        return name

    # Pattern 4: ID somewhere in the filename
    id_match = re.search(r'(?:^|[_\-\s])([A-Za-z0-9_-]{11})(?:[_\-\s.]|$)', name)
    if id_match:
        return id_match.group(1)

    return None


# ============================================================================
# ENDPOINTS
# ============================================================================

@router.get("/services")
@limiter.limit("60/minute")
@handle_exceptions
async def get_manual_import_services(
    request: Request,
    current_user: Dict = Depends(get_current_user)
):
    """Get configured manual import services."""
    app_state = get_app_state()
    config = app_state.settings.get('manual_import')

    if not config:
        return {
            "enabled": False,
            "temp_dir": "/opt/media-downloader/temp/manual_import",
            "services": [],
            "preset_patterns": get_preset_patterns()
        }

    # Return a copy so the preset patterns are not written back into settings
    return {**config, 'preset_patterns': get_preset_patterns()}


@router.post("/parse")
@limiter.limit("100/minute")
@handle_exceptions
async def parse_filename(
    request: Request,
    body: ParseFilenameRequest,
    current_user: Dict = Depends(get_current_user)
):
    """Parse a filename using a pattern and return extracted metadata."""
    try:
        parser = FilenameParser(body.pattern)
        result = parser.parse(body.filename)
        if result['datetime']:
            result['datetime'] = result['datetime'].isoformat()
        return result
    except Exception as e:
        logger.error(f"Error parsing filename: {e}", module="ManualImport")
        return {
            "valid": False,
            "error": str(e),
            "username": None,
            "datetime": None,
            "media_id": None
        }


# File upload constants
MAX_FILE_SIZE = 5 * 1024 * 1024 * 1024  # 5GB max file size
MAX_FILENAME_LENGTH = 255
ALLOWED_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.gif', '.mp4', '.mov', '.avi',
                      '.mkv', '.webm', '.webp', '.heic', '.heif'}
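
# Sketch of how a service's filename_pattern maps onto an uploaded file.
# The pattern tokens come from modules.filename_parser; the filename and
# parsed values below are illustrative assumptions:
#
#   pattern:  '{username}_{YYYYMMDD}_{HHMMSS}_{id}'
#   filename: 'alice_20240115_093000_3141592653.jpg'
#   parses to:
#   {'valid': True, 'username': 'alice',
#    'datetime': datetime(2024, 1, 15, 9, 30, 0),
#    'media_id': '3141592653', 'error': None}
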
@router.post("/upload")
@limiter.limit("30/minute")
@handle_exceptions
async def upload_files_for_import(
    request: Request,
    files: List[UploadFile] = File(...),
    service_name: str = Form(...),
    current_user: Dict = Depends(get_current_user)
):
    """Upload files to temp directory for manual import."""
    app_state = get_app_state()
    config = app_state.settings.get('manual_import')

    if not config or not config.get('enabled'):
        raise ValidationError("Manual import is not enabled")

    services = config.get('services', [])
    service = next((s for s in services if s['name'] == service_name and s.get('enabled', True)), None)
    if not service:
        raise NotFoundError(f"Service '{service_name}' not found or disabled")

    session_id = str(uuid.uuid4())[:8]
    temp_base = Path(config.get('temp_dir', '/opt/media-downloader/temp/manual_import'))
    temp_dir = temp_base / session_id
    temp_dir.mkdir(parents=True, exist_ok=True)

    pattern = service.get('filename_pattern', '{username}_{YYYYMMDD}_{HHMMSS}_{id}')
    platform = service.get('platform', 'unknown')

    # Use fallback patterns for Instagram (handles both underscore and dash formats)
    use_fallbacks = platform == 'instagram'
    parser = FilenameParser(pattern) if not use_fallbacks else None

    uploaded_files = []
    for file in files:
        # Sanitize filename - use only the basename to prevent path traversal
        safe_filename = Path(file.filename).name

        # Validate filename length
        if len(safe_filename) > MAX_FILENAME_LENGTH:
            raise ValidationError(f"Filename too long: {safe_filename[:50]}... (max {MAX_FILENAME_LENGTH} chars)")

        # Validate file extension
        file_ext = Path(safe_filename).suffix.lower()
        if file_ext not in ALLOWED_EXTENSIONS:
            raise ValidationError(f"File type not allowed: {file_ext}. Allowed: {', '.join(sorted(ALLOWED_EXTENSIONS))}")

        file_path = temp_dir / safe_filename
        content = await file.read()

        # Validate file size
        if len(content) > MAX_FILE_SIZE:
            raise ValidationError(
                f"File too large: {safe_filename} "
                f"({len(content) / (1024*1024*1024):.2f}GB, max {MAX_FILE_SIZE / (1024*1024*1024)}GB)"
            )

        with open(file_path, 'wb') as f:
            f.write(content)

        # Parse filename - use fallback patterns for Instagram
        if use_fallbacks:
            parse_result = parse_with_fallbacks(file.filename, INSTAGRAM_PATTERNS)
        else:
            parse_result = parser.parse(file.filename)

        parsed_datetime = None
        if parse_result['datetime']:
            parsed_datetime = parse_result['datetime'].isoformat()

        uploaded_files.append({
            "filename": file.filename,
            "temp_path": str(file_path),
            "size": len(content),
            "parsed": {
                "valid": parse_result['valid'],
                "username": parse_result['username'],
                "datetime": parsed_datetime,
                "media_id": parse_result['media_id'],
                "error": parse_result['error']
            }
        })

    logger.info(f"Uploaded {len(uploaded_files)} files for manual import (service: {service_name})",
                module="ManualImport")

    return {
        "session_id": session_id,
        "service_name": service_name,
        "files": uploaded_files,
        "temp_dir": str(temp_dir)
    }
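
# Example client call for the upload endpoint above (a sketch; the host,
# port, service name, and auth scheme are deployment assumptions):
#
#   curl -X POST http://localhost:8000/api/manual-import/upload \
#        -H "Authorization: Bearer <token>" \
#        -F "service_name=instagram_archive" \
#        -F "files=@alice_20240115_093000_3141592653.jpg"
#
# The response echoes a session_id, the temp paths, and per-file parse
# results, which the client feeds back into POST /api/manual-import/process.
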
def process_files_background(
    job_id: str,
    service_name: str,
    files: List[dict],
    service: dict,
    app_state
):
    """Background task to process imported files."""
    import hashlib
    from modules.move_module import MoveManager

    destination = Path(service['destination'])
    destination.mkdir(parents=True, exist_ok=True)

    pattern = service.get('filename_pattern', '{username}_{YYYYMMDD}_{HHMMSS}_{id}')
    platform = service.get('platform', 'unknown')
    content_type = service.get('content_type', 'videos')
    use_ytdlp = service.get('use_ytdlp', False)
    parse_filename = service.get('parse_filename', True)

    # Use fallback patterns for Instagram
    use_fallbacks = platform == 'instagram'
    parser = FilenameParser(pattern) if not use_fallbacks else None

    # Generate session ID for real-time monitoring
    session_id = f"manual_import_{service_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    # Emit scraper_started event
    if app_state.scraper_event_emitter:
        app_state.scraper_event_emitter.emit_scraper_started(
            session_id=session_id,
            platform=platform,
            account=service_name,
            content_type=content_type,
            estimated_count=len(files)
        )

    move_manager = MoveManager(
        unified_db=app_state.db,
        face_recognition_enabled=False,
        notifier=None,
        event_emitter=app_state.scraper_event_emitter
    )
    move_manager.set_session_context(
        platform=platform,
        account=service_name,
        session_id=session_id
    )

    results = []
    success_count = 0
    failed_count = 0

    for idx, file_info in enumerate(files):
        temp_path = Path(file_info['temp_path'])
        filename = file_info['filename']
        manual_datetime_str = file_info.get('manual_datetime')
        manual_username = file_info.get('manual_username')

        # Update job status with current file
        update_job_status(job_id, {
            'current_file': filename,
            'processed_files': idx
        })

        if not temp_path.exists():
            result = {"filename": filename, "status": "error", "error": "File not found in temp directory"}
            results.append(result)
            failed_count += 1
            update_job_status(job_id, {'results': results.copy(), 'failed_count': failed_count})
            continue

        username = 'unknown'
        parsed_datetime = None
        final_filename = filename

        # Use manual values if provided
        if not parse_filename or manual_datetime_str or manual_username:
            if manual_username:
                username = manual_username.strip().lower()
            if manual_datetime_str:
                try:
                    parsed_datetime = datetime.strptime(manual_datetime_str, '%Y-%m-%dT%H:%M')
                except ValueError:
                    try:
                        parsed_datetime = datetime.fromisoformat(manual_datetime_str)
                    except ValueError:
                        logger.warning(f"Could not parse manual datetime: {manual_datetime_str}",
                                       module="ManualImport")

        # Try yt-dlp for YouTube videos
        if use_ytdlp and platform == 'youtube':
            video_id = extract_video_id_from_filename(filename)
            if video_id:
                logger.info(f"Extracting YouTube metadata for video ID: {video_id}", module="ManualImport")
                yt_metadata = extract_youtube_metadata(video_id)
                if yt_metadata:
                    username = yt_metadata.get('channel') or yt_metadata.get('uploader') or 'unknown'
                    username = "".join(c for c in username if c.isalnum() or c in ' _-').strip().replace(' ', '_')
                    parsed_datetime = yt_metadata.get('upload_date')
                    if yt_metadata.get('title'):
                        title = yt_metadata['title'][:50]
                        title = "".join(c for c in title if c.isalnum() or c in ' _-').strip().replace(' ', '_')
                        ext = Path(filename).suffix
                        final_filename = (
                            f"{username}_"
                            f"{parsed_datetime.strftime('%Y%m%d') if parsed_datetime else 'unknown'}_"
                            f"{title}_{video_id}{ext}"
                        )

        # Fall back to filename parsing
        if parse_filename and username == 'unknown':
            if use_fallbacks:
                parse_result = parse_with_fallbacks(filename, INSTAGRAM_PATTERNS)
            else:
                parse_result = parser.parse(filename)

            if parse_result['valid']:
                username = parse_result['username'] or 'unknown'
                parsed_datetime = parse_result['datetime']
            elif not use_ytdlp:
                result = {"filename": filename, "status": "error",
                          "error": parse_result['error'] or "Failed to parse filename"}
                results.append(result)
                failed_count += 1
                update_job_status(job_id, {'results': results.copy(), 'failed_count': failed_count})
                continue

        dest_subdir = destination / username
        dest_subdir.mkdir(parents=True, exist_ok=True)
        dest_path = dest_subdir / final_filename

        move_manager.start_batch(
            platform=platform,
            source=username,
            content_type=content_type
        )

        file_size = temp_path.stat().st_size if temp_path.exists() else 0

        try:
            success = move_manager.move_file(
                source=temp_path,
                destination=dest_path,
                timestamp=parsed_datetime,
                preserve_if_no_timestamp=True,
                content_type=content_type
            )
            move_manager.end_batch()

            if success:
                url_hash = hashlib.sha256(f"manual_import:{final_filename}".encode()).hexdigest()
                with app_state.db.get_connection(for_write=True) as conn:
                    cursor = conn.cursor()
                    cursor.execute("""
                        INSERT OR REPLACE INTO downloads
                        (url_hash, url, platform, source, content_type, filename, file_path,
                         file_size, file_hash, post_date, download_date, status, media_id)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'completed', ?)
                    """, (
                        url_hash,
                        f"manual_import://{final_filename}",
                        platform,
                        username,
                        content_type,
                        final_filename,
                        str(dest_path),
                        file_size,
                        None,
                        parsed_datetime.isoformat() if parsed_datetime else None,
                        datetime.now().isoformat(),
                        None
                    ))
                    conn.commit()

                result = {
                    "filename": filename,
                    "status": "success",
                    "destination": str(dest_path),
                    "username": username,
                    "datetime": parsed_datetime.isoformat() if parsed_datetime else None
                }
                results.append(result)
                success_count += 1
            else:
                result = {"filename": filename, "status": "error",
                          "error": "Failed to move file (possibly duplicate)"}
                results.append(result)
                failed_count += 1
        except Exception as e:
            move_manager.end_batch()
            result = {"filename": filename, "status": "error", "error": str(e)}
            results.append(result)
            failed_count += 1

        # Update job status after each file
        update_job_status(job_id, {
            'results': results.copy(),
            'success_count': success_count,
            'failed_count': failed_count,
            'processed_files': idx + 1
        })

    # Clean up temp directory
    try:
        temp_parent = Path(files[0]['temp_path']).parent if files else None
        if temp_parent and temp_parent.exists():
            for f in temp_parent.iterdir():
                f.unlink()
            temp_parent.rmdir()
    except Exception:
        pass

    # Emit scraper_completed event
    if app_state.scraper_event_emitter:
        app_state.scraper_event_emitter.emit_scraper_completed(
            session_id=session_id,
            stats={
                'total_downloaded': len(files),
                'moved': success_count,
                'review': 0,
                'duplicates': 0,
                'failed': failed_count
            }
        )

    # Mark job as complete
    update_job_status(job_id, {
        'status': 'completed',
        'completed_at': datetime.now().isoformat(),
        'current_file': None
    })

    logger.info(f"Manual import complete: {success_count} succeeded, {failed_count} failed",
                module="ManualImport")

    # Cleanup old jobs
    cleanup_old_jobs()
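
# Example request body for POST /process below. The per-file keys mirror
# what process_files_background() reads from each file dict; the values
# here are illustrative:
#
#   {
#       "service_name": "instagram_archive",
#       "files": [
#           {
#               "filename": "alice_20240115_093000_3141592653.jpg",
#               "temp_path": "/opt/media-downloader/temp/manual_import/ab12cd34/alice_20240115_093000_3141592653.jpg",
#               "manual_datetime": "2024-01-15T09:30",   # optional override
#               "manual_username": "alice"               # optional override
#           }
#       ]
#   }
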
@router.post("/process")
@limiter.limit("10/minute")
@handle_exceptions
async def process_imported_files(
    request: Request,
    background_tasks: BackgroundTasks,
    body: ProcessFilesRequest,
    current_user: Dict = Depends(get_current_user)
):
    """Process uploaded files in the background - returns immediately with a job ID."""
    app_state = get_app_state()
    config = app_state.settings.get('manual_import')

    if not config or not config.get('enabled'):
        raise ValidationError("Manual import is not enabled")

    services = config.get('services', [])
    service = next((s for s in services if s['name'] == body.service_name and s.get('enabled', True)), None)
    if not service:
        raise NotFoundError(f"Service '{body.service_name}' not found or disabled")

    # Generate unique job ID
    job_id = f"import_{uuid.uuid4().hex[:12]}"

    # Create job tracking entry
    create_job(job_id, len(body.files), body.service_name)

    # Queue background processing
    background_tasks.add_task(
        process_files_background,
        job_id,
        body.service_name,
        body.files,
        service,
        app_state
    )

    logger.info(f"Manual import job {job_id} queued: {len(body.files)} files for {body.service_name}",
                module="ManualImport")

    return {
        "job_id": job_id,
        "status": "processing",
        "total_files": len(body.files),
        "message": "Processing started in background"
    }


@router.get("/status/{job_id}")
@limiter.limit("120/minute")
@handle_exceptions
async def get_import_job_status(
    request: Request,
    job_id: str,
    current_user: Dict = Depends(get_current_user)
):
    """Get the status of a manual import job."""
    job = get_job_status(job_id)
    if not job:
        raise NotFoundError(f"Job '{job_id}' not found")
    return job
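
# Polling sketch for the status endpoint above (assumes the `requests`
# package and placeholder API_BASE/TOKEN values; not part of this module):
#
#   import time
#   import requests
#
#   def wait_for_import(job_id: str) -> dict:
#       """Poll /status/{job_id} until the background job completes."""
#       while True:
#           resp = requests.get(
#               f"{API_BASE}/api/manual-import/status/{job_id}",
#               headers={"Authorization": f"Bearer {TOKEN}"},
#           )
#           job = resp.json()
#           if job["status"] == "completed":
#               return job
#           time.sleep(2)
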
"""Clear all files from manual import temp directory.""" app_state = get_app_state() config = app_state.settings.get('manual_import') temp_dir = Path(config.get('temp_dir', '/opt/media-downloader/temp/manual_import')) if config else Path('/opt/media-downloader/temp/manual_import') if temp_dir.exists(): shutil.rmtree(temp_dir) temp_dir.mkdir(parents=True, exist_ok=True) logger.info("Cleared manual import temp directory", module="ManualImport") return {"status": "success", "message": "Temp directory cleared"} @router.get("/preset-patterns") @limiter.limit("60/minute") @handle_exceptions async def get_preset_filename_patterns( request: Request, current_user: Dict = Depends(get_current_user) ): """Get available preset filename patterns.""" return {"patterns": get_preset_patterns()}