"""
|
|
Celebrity Discovery Router
|
|
|
|
Handles celebrity content discovery features:
|
|
- Celebrity profiles CRUD
|
|
- Search presets management
|
|
- YouTube/RSS search execution
|
|
- Discovered videos management
|
|
- Integration with video downloader
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import re
|
|
import subprocess
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional
|
|
|
|
import feedparser
|
|
import httpx
|
|
from fastapi import APIRouter, BackgroundTasks, Body, Depends, Query, Request
|
|
from fastapi.responses import Response
|
|
from pydantic import BaseModel
|
|
from slowapi import Limiter
|
|
from slowapi.util import get_remote_address
|
|
|
|
from ..core.dependencies import get_current_user, get_app_state
|
|
from ..core.exceptions import handle_exceptions, RecordNotFoundError, ValidationError
|
|
from modules.universal_logger import get_logger
|
|
|
|
logger = get_logger('API')
|
|
|
|
router = APIRouter(prefix="/api/celebrity", tags=["Celebrity Discovery"])
|
|
limiter = Limiter(key_func=get_remote_address)
|
|
|
|
|
|
# ============================================================================
|
|
# HELPER FUNCTIONS
|
|
# ============================================================================
|
|
|
|
async def cache_thumbnail_async(video_id: str, thumbnail_url: str, db) -> None:
    """
    Pre-cache thumbnail by fetching from URL and storing in database.

    This speeds up Internet Discovery page loading. Best-effort: any failure
    is logged at debug level and never raised to the caller.
    """
    request_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(thumbnail_url, headers=request_headers)

        # Only store a real image payload; skip redirects/errors/empty bodies.
        if response.status_code != 200 or not response.content:
            return

        with db.get_connection(for_write=True) as conn:
            conn.cursor().execute('''
                UPDATE celebrity_discovered_videos
                SET thumbnail_data = ?
                WHERE video_id = ?
            ''', (response.content, video_id))
            conn.commit()
        logger.debug(f"Cached thumbnail for {video_id}")
    except Exception as e:
        logger.debug(f"Failed to cache thumbnail for {video_id}: {e}")
|
|
|
|
|
|
# ============================================================================
|
|
# PYDANTIC MODELS
|
|
# ============================================================================
|
|
|
|
class CelebrityCreate(BaseModel):
    """Request body for creating a celebrity profile."""
    name: str  # Display name; also used to derive the unique slug
    image_url: Optional[str] = None  # Optional portrait/thumbnail URL
    notes: Optional[str] = None  # Free-form operator notes
|
|
|
|
|
|
class CelebrityUpdate(BaseModel):
    """Partial-update body for a celebrity profile; None means 'leave unchanged'."""
    name: Optional[str] = None  # Renaming also regenerates the slug
    image_url: Optional[str] = None
    notes: Optional[str] = None
    enabled: Optional[bool] = None  # Toggles discovery/tracking for this profile
|
|
|
|
|
|
class SearchPresetCreate(BaseModel):
    """Request body for creating a search preset attached to a celebrity."""
    celebrity_id: int  # FK into celebrity_profiles
    name: str
    source_type: str  # 'youtube_channel', 'youtube_search', 'youtube_rss'
    source_value: str  # channel_id, search query, or RSS URL
    keywords: Optional[List[str]] = None  # Stored as JSON in the DB
    content_type: Optional[str] = 'all'  # 'interview', 'red_carpet', 'photoshoot', 'bts', 'premiere', 'all'
    category: Optional[str] = 'other'
    platform: Optional[str] = 'youtube'  # 'youtube', 'dailymotion'
|
|
|
|
|
|
class SearchPresetUpdate(BaseModel):
    """Partial-update body for a search preset; None means 'leave unchanged'."""
    name: Optional[str] = None
    source_type: Optional[str] = None
    source_value: Optional[str] = None
    keywords: Optional[List[str]] = None  # Serialized to JSON on write
    content_type: Optional[str] = None
    platform: Optional[str] = None
    category: Optional[str] = None
    enabled: Optional[bool] = None
    check_frequency_hours: Optional[int] = None  # Scheduler re-check interval
|
|
|
|
|
|
class VideoStatusUpdate(BaseModel):
    """Request body for changing a single discovered video's status."""
    status: str  # 'new', 'queued', 'downloaded', 'ignored', 'watched'
|
|
|
|
|
|
class BulkVideoStatusUpdate(BaseModel):
    """Request body for changing the status of many discovered videos at once."""
    video_ids: List[int]  # Row ids in celebrity_discovered_videos
    status: str  # Same status vocabulary as VideoStatusUpdate
|
|
|
|
|
|
# ============================================================================
|
|
# HELPER FUNCTIONS
|
|
# ============================================================================
|
|
|
|
def slugify(text: str) -> str:
    """Convert text to a URL-safe slug (lowercase, hyphen-separated)."""
    # Drop anything that is not a word char, whitespace, or hyphen,
    # then collapse runs of whitespace/underscores/hyphens into one hyphen.
    cleaned = re.sub(r'[^\w\s-]', '', text.strip().lower())
    return re.sub(r'[\s_-]+', '-', cleaned)
|
|
|
|
|
|
def detect_content_type(title: str, description: str = '') -> str:
    """Classify a video into a content-type bucket from its title/description.

    Keyword groups are checked in priority order; the first group with a hit
    wins. Returns 'other' when nothing matches.
    """
    haystack = f"{title} {description}".lower()

    rules = (
        ('interview', ('interview', 'talks', 'discusses', 'sits down', 'chats')),
        ('red_carpet', ('red carpet', 'arrives', 'arrival', 'gala', 'awards')),
        ('premiere', ('premiere', 'screening', 'opening night')),
        ('bts', ('behind the scenes', 'bts', 'making of', 'on set')),
        ('photoshoot', ('photoshoot', 'photo shoot', 'magazine', 'cover shoot')),
    )
    for label, needles in rules:
        if any(needle in haystack for needle in needles):
            return label
    return 'other'
|
|
|
|
|
|
async def fetch_youtube_rss(channel_id: str) -> List[Dict]:
    """Fetch recent videos from a YouTube channel's RSS feed.

    Args:
        channel_id: YouTube channel ID (the feed URL is derived from it).

    Returns:
        List of video dicts (video_id, platform, url, title, channel info,
        thumbnail, upload_date, description, view_count). Empty list on any
        fetch/parse failure (errors are logged, never raised).
    """
    rss_url = f"https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}"

    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            response = await client.get(rss_url)
            if response.status_code != 200:
                logger.warning(f"RSS fetch failed for {channel_id}: {response.status_code}", module="Celebrity")
                return []

            feed = feedparser.parse(response.text)
            videos = []

            for entry in feed.entries:
                video_id = entry.get('yt_videoid', '')
                if not video_id and 'link' in entry:
                    # Fall back to extracting the ID from the entry's watch URL
                    match = re.search(r'v=([^&]+)', entry.link)
                    if match:
                        video_id = match.group(1)

                # FIX: skip entries whose video ID cannot be determined --
                # previously these produced rows with an empty video_id and a
                # broken "watch?v=" URL.
                if not video_id:
                    continue

                videos.append({
                    'video_id': video_id,
                    'platform': 'youtube',
                    'url': f"https://www.youtube.com/watch?v={video_id}",
                    'title': entry.get('title', ''),
                    'channel_name': feed.feed.get('title', ''),
                    'channel_id': channel_id,
                    'thumbnail': entry.get('media_thumbnail', [{}])[0].get('url', '') if entry.get('media_thumbnail') else f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg",
                    'upload_date': entry.get('published', ''),
                    'description': entry.get('summary', ''),
                    'view_count': int(entry.get('media_statistics', {}).get('views', 0)) if entry.get('media_statistics') else 0
                })

            return videos
        except Exception as e:
            logger.error(f"RSS fetch error for {channel_id}: {e}", module="Celebrity")
            return []
|
|
|
|
|
|
async def search_youtube_ytdlp(
    query: str,
    max_results: int = 20,
    *,
    ytdlp_path: str = '/opt/media-downloader/venv/bin/yt-dlp',
) -> List[Dict]:
    """Search YouTube using yt-dlp's ytsearch extractor.

    Args:
        query: Free-text search query.
        max_results: Maximum number of search results to request.
        ytdlp_path: Path to the yt-dlp binary (generalized from the previous
            hard-coded venv path; the default preserves old behavior).

    Returns:
        List of video dicts; empty list on timeout or any error.
    """
    try:
        cmd = [
            ytdlp_path,
            f'ytsearch{max_results}:{query}',
            '--dump-json',
            '--flat-playlist',
            '--no-warnings',
            '--ignore-errors'
        ]

        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=60)

        videos = []
        # One JSON object per line with --dump-json
        for line in stdout.decode().strip().split('\n'):
            if not line:
                continue
            try:
                data = json.loads(line)
                video_id = data.get('id', '')
                # Generate YouTube thumbnail URL from video ID
                # (--flat-playlist output doesn't include one)
                thumbnail = data.get('thumbnail', '')
                if not thumbnail and video_id:
                    thumbnail = f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg"
                videos.append({
                    'video_id': video_id,
                    'platform': 'youtube',
                    'url': data.get('url', f"https://www.youtube.com/watch?v={video_id}"),
                    'title': data.get('title', ''),
                    'channel_name': data.get('channel', data.get('uploader', '')),
                    'channel_id': data.get('channel_id', ''),
                    'thumbnail': thumbnail,
                    'duration': data.get('duration', 0),
                    'upload_date': data.get('upload_date', ''),
                    'view_count': data.get('view_count', 0),
                    'description': data.get('description', '')[:500] if data.get('description') else ''
                })
            except json.JSONDecodeError:
                continue

        return videos
    except asyncio.TimeoutError:
        logger.warning(f"YouTube search timeout for: {query}", module="Celebrity")
        return []
    except Exception as e:
        logger.error(f"YouTube search error: {e}", module="Celebrity")
        return []
|
|
|
|
|
|
async def fetch_video_metadata(video_id: str) -> Dict:
    """Fetch full metadata for a single video including upload date and resolution.

    Returns:
        Dict with video metadata, or empty dict if unavailable.
        Special keys:
        - '_error': Error type if fetch failed ('age_restricted', 'unavailable', 'private', 'removed', 'unknown')
        - '_error_message': Full error message from yt-dlp (truncated to 200 chars)
    """
    try:
        cmd = [
            '/opt/media-downloader/venv/bin/yt-dlp',
            f'https://www.youtube.com/watch?v={video_id}',
            '--dump-json',
            '--no-download',
            '--no-warnings'
        ]

        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=30)

        if stdout:
            data = json.loads(stdout.decode().strip())

            # Extract max resolution and width from video (not audio-only) formats.
            # Width is recorded alongside the tallest format found.
            max_resolution = 0
            max_width = 0
            for fmt in data.get('formats', []):
                height = fmt.get('height')
                if height and isinstance(height, int) and height > max_resolution:
                    if fmt.get('vcodec', 'none') != 'none':
                        max_resolution = height
                        width = fmt.get('width')
                        if width and isinstance(width, int):
                            max_width = width

            return {
                'video_id': video_id,
                'upload_date': data.get('upload_date', ''),
                'view_count': data.get('view_count', 0),
                'duration': data.get('duration', 0),
                'description': data.get('description', '')[:500] if data.get('description') else '',
                'max_resolution': max_resolution if max_resolution > 0 else None,
                'max_width': max_width if max_width > 0 else None
            }

        # No stdout - classify the failure from stderr
        error_msg = stderr.decode().lower() if stderr else ''

        if 'age' in error_msg or 'sign in to confirm' in error_msg or 'age-restricted' in error_msg:
            return {'_error': 'age_restricted', '_error_message': error_msg[:200]}
        elif 'private' in error_msg:
            return {'_error': 'private', '_error_message': error_msg[:200]}
        elif 'unavailable' in error_msg or 'not available' in error_msg:
            return {'_error': 'unavailable', '_error_message': error_msg[:200]}
        elif 'removed' in error_msg or 'deleted' in error_msg or 'terminated' in error_msg:
            return {'_error': 'removed', '_error_message': error_msg[:200]}
        elif error_msg:
            return {'_error': 'unknown', '_error_message': error_msg[:200]}

    except Exception as e:
        logger.warning(f"Failed to fetch metadata for {video_id}: {e}", module="Celebrity")

    # FIX: guarantee a dict on every path. Previously the "no stdout, empty
    # stderr" branch could fall through without a return, yielding None and
    # violating the documented contract.
    return {}
|
|
|
|
|
|
async def enrich_videos_with_resolution(video_ids: Optional[List[str]] = None, limit: int = 50, delete_unavailable: bool = True) -> None:
    """Background task to fetch resolution for videos that don't have it.

    Args:
        video_ids: Optional list of specific video IDs to enrich. If None, fetches oldest videos without resolution.
        limit: Maximum number of videos to process in one batch.
        delete_unavailable: If True, delete videos that appear to be unavailable (no metadata returned).
    """
    app_state = get_app_state()

    try:
        # Select the batch of candidate rows (those still missing resolution).
        with app_state.db.get_connection() as conn:
            cursor = conn.cursor()

            if video_ids:
                # Fetch specific videos
                placeholders = ','.join('?' * len(video_ids))
                cursor.execute(f'''
                    SELECT id, video_id, title FROM celebrity_discovered_videos
                    WHERE video_id IN ({placeholders}) AND max_resolution IS NULL
                    LIMIT ?
                ''', (*video_ids, limit))
            else:
                # Fetch oldest videos without resolution
                cursor.execute('''
                    SELECT id, video_id, title FROM celebrity_discovered_videos
                    WHERE max_resolution IS NULL
                    ORDER BY discovered_at ASC
                    LIMIT ?
                ''', (limit,))

            videos = [dict(row) for row in cursor.fetchall()]

        if not videos:
            return

        logger.info(f"Enriching {len(videos)} videos with resolution data", module="Celebrity")

        updated = 0
        deleted = 0
        age_restricted = 0
        for video in videos:
            try:
                # fetch_video_metadata returns resolution data, an '_error'
                # marker dict, or {} when nothing could be determined.
                metadata = await fetch_video_metadata(video['video_id'])

                if metadata.get('max_resolution'):
                    # Got real format data: persist resolution/width.
                    with app_state.db.get_connection(for_write=True) as conn:
                        cursor = conn.cursor()
                        cursor.execute('''
                            UPDATE celebrity_discovered_videos
                            SET max_resolution = ?, max_width = ?
                            WHERE id = ?
                        ''', (metadata['max_resolution'], metadata.get('max_width'), video['id']))
                        conn.commit()
                    updated += 1
                elif metadata.get('_error') == 'age_restricted':
                    # Age-restricted video - keep it, just log
                    age_restricted += 1
                    logger.debug(f"Age-restricted video (keeping): {video['title'][:50]}... ({video['video_id']})", module="Celebrity")
                elif metadata.get('_error') in ('unavailable', 'private', 'removed') and delete_unavailable:
                    # Truly unavailable - delete it
                    with app_state.db.get_connection(for_write=True) as conn:
                        cursor = conn.cursor()
                        cursor.execute('DELETE FROM celebrity_discovered_videos WHERE id = ?', (video['id'],))
                        conn.commit()
                    deleted += 1
                    logger.info(f"Deleted {metadata.get('_error')} video: {video['title'][:50]}... ({video['video_id']})", module="Celebrity")
                elif not metadata and delete_unavailable:
                    # Empty response with no error info - likely unavailable
                    with app_state.db.get_connection(for_write=True) as conn:
                        cursor = conn.cursor()
                        cursor.execute('DELETE FROM celebrity_discovered_videos WHERE id = ?', (video['id'],))
                        conn.commit()
                    deleted += 1
                    logger.info(f"Deleted unavailable video: {video['title'][:50]}... ({video['video_id']})", module="Celebrity")

                # Rate limit to avoid API issues
                await asyncio.sleep(0.5)

            except Exception as e:
                # Per-video failure must not abort the whole batch.
                logger.warning(f"Failed to enrich video {video['video_id']}: {e}", module="Celebrity")
                continue

        log_msg = f"Enriched {updated}/{len(videos)} videos with resolution"
        if deleted > 0:
            log_msg += f", deleted {deleted} unavailable"
        if age_restricted > 0:
            log_msg += f", {age_restricted} age-restricted (kept)"
        logger.info(log_msg, module="Celebrity")

    except Exception as e:
        logger.error(f"Resolution enrichment failed: {e}", module="Celebrity")
|
|
|
|
|
|
async def get_channel_videos_ytdlp(
    channel_id: str,
    keyword_filter: Optional[str] = None,
    max_results: int = 50,
) -> List[Dict]:
    """Get videos from a YouTube channel using yt-dlp.

    Args:
        channel_id: YouTube channel ID (the /videos URL is derived from it).
        keyword_filter: Optional title filter passed to yt-dlp's --match-title.
            (Annotation fixed: was `str = None`.)
        max_results: Maximum number of playlist entries to fetch.

    Returns:
        List of video dicts; empty list on timeout or any error.
    """
    try:
        channel_url = f"https://www.youtube.com/channel/{channel_id}/videos"

        cmd = [
            '/opt/media-downloader/venv/bin/yt-dlp',
            channel_url,
            '--dump-json',
            '--flat-playlist',
            '--no-warnings',
            '--ignore-errors',
            '--playlist-end', str(max_results)
        ]

        if keyword_filter:
            cmd.extend(['--match-title', keyword_filter])

        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=120)

        videos = []
        # One JSON object per line with --dump-json
        for line in stdout.decode().strip().split('\n'):
            if not line:
                continue
            try:
                data = json.loads(line)
                video_id = data.get('id', '')
                videos.append({
                    'video_id': video_id,
                    'platform': 'youtube',
                    'url': data.get('url', f"https://www.youtube.com/watch?v={video_id}"),
                    'title': data.get('title', ''),
                    'channel_name': data.get('channel', data.get('uploader', '')),
                    'channel_id': channel_id,
                    'thumbnail': data.get('thumbnail', f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg"),
                    'duration': data.get('duration', 0),
                    'upload_date': data.get('upload_date', ''),
                    'view_count': data.get('view_count', 0),
                    'description': data.get('description', '')[:500] if data.get('description') else ''
                })
            except json.JSONDecodeError:
                continue

        return videos
    except asyncio.TimeoutError:
        logger.warning(f"Channel fetch timeout for: {channel_id}", module="Celebrity")
        return []
    except Exception as e:
        logger.error(f"Channel fetch error: {e}", module="Celebrity")
        return []
|
|
|
|
|
|
# ============================================================================
|
|
# CELEBRITY PROFILE ENDPOINTS
|
|
# ============================================================================
|
|
|
|
@router.get("/profiles")
|
|
@limiter.limit("60/minute")
|
|
@handle_exceptions
|
|
async def get_celebrity_profiles(
|
|
request: Request,
|
|
enabled_only: bool = Query(False),
|
|
tracked_only: bool = Query(False),
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Get all celebrity profiles."""
|
|
app_state = get_app_state()
|
|
|
|
with app_state.db.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
|
|
# For tracked_only, return simplified list for appearances config
|
|
if tracked_only:
|
|
cursor.execute('''
|
|
SELECT id, name, tmdb_person_id, enabled, tmdb_last_sync
|
|
FROM celebrity_profiles
|
|
WHERE enabled = 1
|
|
ORDER BY name
|
|
''')
|
|
profiles = []
|
|
for row in cursor.fetchall():
|
|
profiles.append({
|
|
'id': row[0],
|
|
'name': row[1],
|
|
'tmdb_person_id': row[2],
|
|
'enabled': bool(row[3]),
|
|
'tmdb_last_sync': row[4]
|
|
})
|
|
return profiles
|
|
|
|
# Normal profiles list
|
|
if enabled_only:
|
|
cursor.execute('''
|
|
SELECT cp.*,
|
|
(SELECT COUNT(*) FROM celebrity_search_presets WHERE celebrity_id = cp.id) as preset_count,
|
|
(SELECT COUNT(*) FROM celebrity_discovered_videos WHERE celebrity_id = cp.id AND status = 'new') as new_videos
|
|
FROM celebrity_profiles cp
|
|
WHERE cp.enabled = 1
|
|
ORDER BY cp.name
|
|
''')
|
|
else:
|
|
cursor.execute('''
|
|
SELECT cp.*,
|
|
(SELECT COUNT(*) FROM celebrity_search_presets WHERE celebrity_id = cp.id) as preset_count,
|
|
(SELECT COUNT(*) FROM celebrity_discovered_videos WHERE celebrity_id = cp.id AND status = 'new') as new_videos
|
|
FROM celebrity_profiles cp
|
|
ORDER BY cp.name
|
|
''')
|
|
|
|
profiles = []
|
|
for row in cursor.fetchall():
|
|
profiles.append({
|
|
'id': row['id'],
|
|
'name': row['name'],
|
|
'slug': row['slug'],
|
|
'image_url': row['image_url'],
|
|
'notes': row['notes'],
|
|
'enabled': bool(row['enabled']),
|
|
'preset_count': row['preset_count'],
|
|
'new_videos': row['new_videos'],
|
|
'created_at': row['created_at'],
|
|
'updated_at': row['updated_at']
|
|
})
|
|
|
|
return {"success": True, "profiles": profiles}
|
|
|
|
|
|
@router.get("/profiles/{profile_id}")
|
|
@limiter.limit("60/minute")
|
|
@handle_exceptions
|
|
async def get_celebrity_profile(
|
|
request: Request,
|
|
profile_id: int,
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Get a single celebrity profile with presets."""
|
|
app_state = get_app_state()
|
|
|
|
with app_state.db.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute('SELECT * FROM celebrity_profiles WHERE id = ?', (profile_id,))
|
|
row = cursor.fetchone()
|
|
|
|
if not row:
|
|
raise RecordNotFoundError("Celebrity profile not found")
|
|
|
|
# Get presets
|
|
cursor.execute('''
|
|
SELECT * FROM celebrity_search_presets
|
|
WHERE celebrity_id = ?
|
|
ORDER BY name
|
|
''', (profile_id,))
|
|
|
|
presets = []
|
|
for preset_row in cursor.fetchall():
|
|
presets.append({
|
|
'id': preset_row['id'],
|
|
'name': preset_row['name'],
|
|
'source_type': preset_row['source_type'],
|
|
'source_value': preset_row['source_value'],
|
|
'keywords': json.loads(preset_row['keywords']) if preset_row['keywords'] else [],
|
|
'content_type': preset_row['content_type'],
|
|
'enabled': bool(preset_row['enabled']),
|
|
'last_checked': preset_row['last_checked'],
|
|
'results_count': preset_row['results_count'],
|
|
'created_at': preset_row['created_at']
|
|
})
|
|
|
|
return {
|
|
"success": True,
|
|
"profile": {
|
|
'id': row['id'],
|
|
'name': row['name'],
|
|
'slug': row['slug'],
|
|
'image_url': row['image_url'],
|
|
'notes': row['notes'],
|
|
'enabled': bool(row['enabled']),
|
|
'created_at': row['created_at'],
|
|
'updated_at': row['updated_at']
|
|
},
|
|
"presets": presets
|
|
}
|
|
|
|
|
|
@router.post("/profiles")
|
|
@limiter.limit("20/minute")
|
|
@handle_exceptions
|
|
async def create_celebrity_profile(
|
|
request: Request,
|
|
body: CelebrityCreate,
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Create a new celebrity profile."""
|
|
app_state = get_app_state()
|
|
|
|
slug = slugify(body.name)
|
|
|
|
with app_state.db.get_connection(for_write=True) as conn:
|
|
cursor = conn.cursor()
|
|
|
|
# Check for duplicate slug
|
|
cursor.execute('SELECT id FROM celebrity_profiles WHERE slug = ?', (slug,))
|
|
if cursor.fetchone():
|
|
raise ValidationError("A celebrity with this name already exists")
|
|
|
|
cursor.execute('''
|
|
INSERT INTO celebrity_profiles (name, slug, image_url, notes)
|
|
VALUES (?, ?, ?, ?)
|
|
''', (body.name, slug, body.image_url, body.notes))
|
|
|
|
profile_id = cursor.lastrowid
|
|
conn.commit()
|
|
|
|
logger.info(f"Created celebrity profile: {body.name}", module="Celebrity")
|
|
|
|
return {
|
|
"success": True,
|
|
"profile_id": profile_id,
|
|
"slug": slug
|
|
}
|
|
|
|
|
|
@router.put("/profiles/{profile_id}")
|
|
@limiter.limit("30/minute")
|
|
@handle_exceptions
|
|
async def update_celebrity_profile(
|
|
request: Request,
|
|
profile_id: int,
|
|
body: CelebrityUpdate,
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Update a celebrity profile."""
|
|
app_state = get_app_state()
|
|
|
|
with app_state.db.get_connection(for_write=True) as conn:
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute('SELECT * FROM celebrity_profiles WHERE id = ?', (profile_id,))
|
|
if not cursor.fetchone():
|
|
raise RecordNotFoundError("Celebrity profile not found")
|
|
|
|
updates = []
|
|
params = []
|
|
|
|
if body.name is not None:
|
|
updates.append("name = ?")
|
|
params.append(body.name)
|
|
updates.append("slug = ?")
|
|
params.append(slugify(body.name))
|
|
if body.image_url is not None:
|
|
updates.append("image_url = ?")
|
|
params.append(body.image_url)
|
|
if body.notes is not None:
|
|
updates.append("notes = ?")
|
|
params.append(body.notes)
|
|
if body.enabled is not None:
|
|
updates.append("enabled = ?")
|
|
params.append(1 if body.enabled else 0)
|
|
|
|
if updates:
|
|
updates.append("updated_at = CURRENT_TIMESTAMP")
|
|
params.append(profile_id)
|
|
|
|
cursor.execute(f'''
|
|
UPDATE celebrity_profiles
|
|
SET {", ".join(updates)}
|
|
WHERE id = ?
|
|
''', params)
|
|
conn.commit()
|
|
|
|
return {"success": True}
|
|
|
|
|
|
@router.delete("/profiles/{profile_id}")
|
|
@limiter.limit("10/minute")
|
|
@handle_exceptions
|
|
async def delete_celebrity_profile(
|
|
request: Request,
|
|
profile_id: int,
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Delete a celebrity profile and all associated data."""
|
|
app_state = get_app_state()
|
|
|
|
with app_state.db.get_connection(for_write=True) as conn:
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute('SELECT name FROM celebrity_profiles WHERE id = ?', (profile_id,))
|
|
row = cursor.fetchone()
|
|
if not row:
|
|
raise RecordNotFoundError("Celebrity profile not found")
|
|
|
|
# Cascade deletes will handle presets and discovered videos
|
|
cursor.execute('DELETE FROM celebrity_profiles WHERE id = ?', (profile_id,))
|
|
conn.commit()
|
|
|
|
logger.info(f"Deleted celebrity profile: {row['name']}", module="Celebrity")
|
|
|
|
return {"success": True}
|
|
|
|
|
|
@router.post("/profiles/{profile_id}/toggle")
|
|
@limiter.limit("30/minute")
|
|
@handle_exceptions
|
|
async def toggle_celebrity_tracking(
|
|
request: Request,
|
|
profile_id: int,
|
|
body: Dict = Body(...),
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Toggle celebrity tracking on/off for appearances monitoring."""
|
|
app_state = get_app_state()
|
|
enabled = body.get('enabled', True)
|
|
|
|
with app_state.db.get_connection(for_write=True) as conn:
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute('SELECT name FROM celebrity_profiles WHERE id = ?', (profile_id,))
|
|
row = cursor.fetchone()
|
|
if not row:
|
|
raise RecordNotFoundError("Celebrity profile not found")
|
|
|
|
cursor.execute('''
|
|
UPDATE celebrity_profiles
|
|
SET enabled = ?, updated_at = CURRENT_TIMESTAMP
|
|
WHERE id = ?
|
|
''', (1 if enabled else 0, profile_id))
|
|
conn.commit()
|
|
|
|
logger.info(
|
|
f"{'Enabled' if enabled else 'Disabled'} tracking for celebrity: {row['name']}",
|
|
"SUCCESS",
|
|
module="Celebrity"
|
|
)
|
|
|
|
return {"success": True, "enabled": enabled}
|
|
|
|
|
|
# ============================================================================
|
|
# SEARCH PRESET ENDPOINTS
|
|
# ============================================================================
|
|
|
|
@router.get("/presets/categories")
|
|
@limiter.limit("60/minute")
|
|
@handle_exceptions
|
|
async def get_preset_categories(
|
|
request: Request,
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Get all preset categories with counts."""
|
|
app_state = get_app_state()
|
|
|
|
with app_state.db.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute('''
|
|
SELECT
|
|
COALESCE(category, 'other') as category,
|
|
COUNT(*) as preset_count,
|
|
SUM(CASE WHEN enabled = 1 THEN 1 ELSE 0 END) as enabled_count,
|
|
SUM(results_count) as total_results
|
|
FROM celebrity_search_presets
|
|
GROUP BY COALESCE(category, 'other')
|
|
ORDER BY category
|
|
''')
|
|
|
|
categories = []
|
|
for row in cursor.fetchall():
|
|
categories.append({
|
|
'name': row['category'],
|
|
'preset_count': row['preset_count'],
|
|
'enabled_count': row['enabled_count'],
|
|
'total_results': row['total_results'] or 0
|
|
})
|
|
|
|
return {"success": True, "categories": categories}
|
|
|
|
|
|
@router.get("/presets")
|
|
@limiter.limit("60/minute")
|
|
@handle_exceptions
|
|
async def get_search_presets(
|
|
request: Request,
|
|
celebrity_id: Optional[int] = None,
|
|
category: Optional[str] = None,
|
|
platform: Optional[str] = None, # Filter by platform
|
|
exclude_source_type: Optional[str] = None, # Exclude specific source type (e.g. youtube_monitor)
|
|
search: Optional[str] = None, # Search in name, celebrity_name, source_value
|
|
enabled_only: bool = Query(False),
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Get search presets, optionally filtered by celebrity, category, platform, or search query."""
|
|
app_state = get_app_state()
|
|
|
|
with app_state.db.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
|
|
query = '''
|
|
SELECT sp.*, cp.name as celebrity_name
|
|
FROM celebrity_search_presets sp
|
|
JOIN celebrity_profiles cp ON sp.celebrity_id = cp.id
|
|
WHERE 1=1
|
|
'''
|
|
params = []
|
|
|
|
if celebrity_id:
|
|
query += ' AND sp.celebrity_id = ?'
|
|
params.append(celebrity_id)
|
|
|
|
if category:
|
|
query += ' AND sp.category = ?'
|
|
params.append(category)
|
|
|
|
if platform:
|
|
query += ' AND sp.platform = ?'
|
|
params.append(platform)
|
|
|
|
if exclude_source_type:
|
|
query += ' AND sp.source_type != ?'
|
|
params.append(exclude_source_type)
|
|
|
|
if search:
|
|
search_term = f'%{search}%'
|
|
query += ' AND (sp.name LIKE ? OR cp.name LIKE ? OR sp.source_value LIKE ?)'
|
|
params.extend([search_term, search_term, search_term])
|
|
|
|
if enabled_only:
|
|
query += ' AND sp.enabled = 1'
|
|
|
|
query += ' ORDER BY cp.name, sp.name'
|
|
|
|
cursor.execute(query, params)
|
|
|
|
presets = []
|
|
for row in cursor.fetchall():
|
|
presets.append({
|
|
'id': row['id'],
|
|
'celebrity_id': row['celebrity_id'],
|
|
'celebrity_name': row['celebrity_name'],
|
|
'name': row['name'],
|
|
'source_type': row['source_type'],
|
|
'source_value': row['source_value'],
|
|
'keywords': json.loads(row['keywords']) if row['keywords'] else [],
|
|
'content_type': row['content_type'],
|
|
'category': row['category'] if 'category' in row.keys() else 'other',
|
|
'platform': row['platform'] if 'platform' in row.keys() else 'youtube',
|
|
'enabled': bool(row['enabled']),
|
|
'last_checked': row['last_checked'],
|
|
'check_frequency_hours': row['check_frequency_hours'],
|
|
'results_count': row['results_count'],
|
|
'created_at': row['created_at']
|
|
})
|
|
|
|
return {"success": True, "presets": presets}
|
|
|
|
|
|
@router.post("/presets")
|
|
@limiter.limit("30/minute")
|
|
@handle_exceptions
|
|
async def create_search_preset(
|
|
request: Request,
|
|
body: SearchPresetCreate,
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Create a new search preset."""
|
|
app_state = get_app_state()
|
|
|
|
valid_source_types = ['youtube_channel', 'youtube_search', 'youtube_rss', 'dailymotion_channel']
|
|
if body.source_type not in valid_source_types:
|
|
raise ValidationError(f"Invalid source_type. Must be one of: {', '.join(valid_source_types)}")
|
|
|
|
with app_state.db.get_connection(for_write=True) as conn:
|
|
cursor = conn.cursor()
|
|
|
|
# Verify celebrity exists
|
|
cursor.execute('SELECT id FROM celebrity_profiles WHERE id = ?', (body.celebrity_id,))
|
|
if not cursor.fetchone():
|
|
raise RecordNotFoundError("Celebrity profile not found")
|
|
|
|
cursor.execute('''
|
|
INSERT INTO celebrity_search_presets
|
|
(celebrity_id, name, source_type, source_value, keywords, content_type, category, platform)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
''', (
|
|
body.celebrity_id,
|
|
body.name,
|
|
body.source_type,
|
|
body.source_value,
|
|
json.dumps(body.keywords) if body.keywords else None,
|
|
body.content_type,
|
|
body.category or 'other',
|
|
body.platform or 'youtube'
|
|
))
|
|
|
|
preset_id = cursor.lastrowid
|
|
conn.commit()
|
|
|
|
logger.info(f"Created search preset: {body.name}", module="Celebrity")
|
|
|
|
return {"success": True, "preset_id": preset_id}
|
|
|
|
|
|
@router.post("/presets/bulk")
|
|
@limiter.limit("10/minute")
|
|
@handle_exceptions
|
|
async def create_bulk_presets(
|
|
request: Request,
|
|
presets: List[SearchPresetCreate] = Body(...),
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Create multiple search presets at once."""
|
|
app_state = get_app_state()
|
|
|
|
created_ids = []
|
|
|
|
with app_state.db.get_connection(for_write=True) as conn:
|
|
cursor = conn.cursor()
|
|
|
|
for preset in presets:
|
|
if preset.source_type not in ['youtube_channel', 'youtube_search', 'youtube_rss']:
|
|
continue
|
|
|
|
cursor.execute('''
|
|
INSERT OR IGNORE INTO celebrity_search_presets
|
|
(celebrity_id, name, source_type, source_value, keywords, content_type)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
''', (
|
|
preset.celebrity_id,
|
|
preset.name,
|
|
preset.source_type,
|
|
preset.source_value,
|
|
json.dumps(preset.keywords) if preset.keywords else None,
|
|
preset.content_type
|
|
))
|
|
|
|
if cursor.lastrowid:
|
|
created_ids.append(cursor.lastrowid)
|
|
|
|
conn.commit()
|
|
|
|
logger.info(f"Created {len(created_ids)} bulk presets", module="Celebrity")
|
|
|
|
return {"success": True, "created_count": len(created_ids), "preset_ids": created_ids}
|
|
|
|
|
|
@router.put("/presets/{preset_id}")
|
|
@limiter.limit("30/minute")
|
|
@handle_exceptions
|
|
async def update_search_preset(
|
|
request: Request,
|
|
preset_id: int,
|
|
body: SearchPresetUpdate,
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Update a search preset."""
|
|
app_state = get_app_state()
|
|
|
|
with app_state.db.get_connection(for_write=True) as conn:
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute('SELECT * FROM celebrity_search_presets WHERE id = ?', (preset_id,))
|
|
if not cursor.fetchone():
|
|
raise RecordNotFoundError("Search preset not found")
|
|
|
|
updates = []
|
|
params = []
|
|
|
|
if body.name is not None:
|
|
updates.append("name = ?")
|
|
params.append(body.name)
|
|
if body.source_type is not None:
|
|
updates.append("source_type = ?")
|
|
params.append(body.source_type)
|
|
if body.source_value is not None:
|
|
updates.append("source_value = ?")
|
|
params.append(body.source_value)
|
|
if body.keywords is not None:
|
|
updates.append("keywords = ?")
|
|
params.append(json.dumps(body.keywords))
|
|
if body.content_type is not None:
|
|
updates.append("content_type = ?")
|
|
params.append(body.content_type)
|
|
if body.category is not None:
|
|
updates.append("category = ?")
|
|
params.append(body.category)
|
|
if body.enabled is not None:
|
|
updates.append("enabled = ?")
|
|
params.append(1 if body.enabled else 0)
|
|
if body.check_frequency_hours is not None:
|
|
updates.append("check_frequency_hours = ?")
|
|
params.append(body.check_frequency_hours)
|
|
if body.platform is not None:
|
|
updates.append("platform = ?")
|
|
params.append(body.platform)
|
|
|
|
if updates:
|
|
updates.append("updated_at = CURRENT_TIMESTAMP")
|
|
params.append(preset_id)
|
|
|
|
cursor.execute(f'''
|
|
UPDATE celebrity_search_presets
|
|
SET {", ".join(updates)}
|
|
WHERE id = ?
|
|
''', params)
|
|
conn.commit()
|
|
|
|
return {"success": True}
|
|
|
|
|
|
@router.delete("/presets/{preset_id}")
|
|
@limiter.limit("20/minute")
|
|
@handle_exceptions
|
|
async def delete_search_preset(
|
|
request: Request,
|
|
preset_id: int,
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Delete a search preset."""
|
|
app_state = get_app_state()
|
|
|
|
with app_state.db.get_connection(for_write=True) as conn:
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute('DELETE FROM celebrity_search_presets WHERE id = ?', (preset_id,))
|
|
if cursor.rowcount == 0:
|
|
raise RecordNotFoundError("Search preset not found")
|
|
|
|
conn.commit()
|
|
|
|
return {"success": True}
|
|
|
|
|
|
# ============================================================================
|
|
# SEARCH EXECUTION ENDPOINTS
|
|
# ============================================================================
|
|
|
|
@router.post("/presets/{preset_id}/search")
|
|
@limiter.limit("10/minute")
|
|
@handle_exceptions
|
|
async def execute_preset_search(
|
|
request: Request,
|
|
preset_id: int,
|
|
background_tasks: BackgroundTasks,
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Execute a search preset and discover videos."""
|
|
app_state = get_app_state()
|
|
|
|
with app_state.db.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute('''
|
|
SELECT sp.*, cp.name as celebrity_name, cp.id as celebrity_id
|
|
FROM celebrity_search_presets sp
|
|
JOIN celebrity_profiles cp ON sp.celebrity_id = cp.id
|
|
WHERE sp.id = ?
|
|
''', (preset_id,))
|
|
|
|
preset = cursor.fetchone()
|
|
if not preset:
|
|
raise RecordNotFoundError("Search preset not found")
|
|
|
|
# Execute search based on source type
|
|
videos = []
|
|
|
|
if preset['source_type'] == 'youtube_rss':
|
|
videos = await fetch_youtube_rss(preset['source_value'])
|
|
elif preset['source_type'] == 'youtube_search':
|
|
videos = await search_youtube_ytdlp(preset['source_value'], max_results=30)
|
|
elif preset['source_type'] == 'youtube_channel':
|
|
# Build keyword filter from preset keywords
|
|
keyword_filter = None
|
|
if preset['keywords']:
|
|
keywords = json.loads(preset['keywords'])
|
|
if keywords:
|
|
keyword_filter = '|'.join(keywords)
|
|
videos = await get_channel_videos_ytdlp(preset['source_value'], keyword_filter, max_results=50)
|
|
|
|
# Filter by keywords if present
|
|
if preset['keywords']:
|
|
keywords = json.loads(preset['keywords'])
|
|
if keywords:
|
|
filtered = []
|
|
for video in videos:
|
|
title_lower = video.get('title', '').lower()
|
|
desc_lower = video.get('description', '').lower()
|
|
if any(kw.lower() in title_lower or kw.lower() in desc_lower for kw in keywords):
|
|
filtered.append(video)
|
|
videos = filtered
|
|
|
|
# Filter to only include videos with celebrity name in the title
|
|
celebrity_name = preset['celebrity_name'].lower()
|
|
name_parts = celebrity_name.split()
|
|
filtered_videos = []
|
|
for video in videos:
|
|
title_lower = video.get('title', '').lower()
|
|
# Stricter matching: require BOTH first AND last name, or the full name
|
|
# This prevents "Eva Mendes" or "Eva Green" from matching "Eva Longoria"
|
|
if len(name_parts) >= 2:
|
|
first_name = name_parts[0]
|
|
last_name = name_parts[-1]
|
|
# Must have full name OR (first name AND last name separately)
|
|
if celebrity_name in title_lower or (first_name in title_lower and last_name in title_lower):
|
|
filtered_videos.append(video)
|
|
else:
|
|
# Single name - require exact match
|
|
if celebrity_name in title_lower:
|
|
filtered_videos.append(video)
|
|
videos = filtered_videos
|
|
|
|
# Filter out blocked YouTube channels
|
|
celebrity_discovery_settings = app_state.settings.get('celebrity_discovery') or {}
|
|
blocked_channels_str = celebrity_discovery_settings.get('blocked_youtube_channels', '')
|
|
if blocked_channels_str:
|
|
blocked_channels = set(
|
|
name.strip().lower()
|
|
for name in blocked_channels_str.split('\n')
|
|
if name.strip()
|
|
)
|
|
if blocked_channels:
|
|
videos = [
|
|
v for v in videos
|
|
if (v.get('channel_name') or '').lower() not in blocked_channels
|
|
]
|
|
|
|
# Store discovered videos
|
|
new_count = 0
|
|
with app_state.db.get_connection(for_write=True) as conn:
|
|
cursor = conn.cursor()
|
|
|
|
for video in videos:
|
|
if not video.get('video_id'):
|
|
continue
|
|
|
|
# Detect content type
|
|
content_type = detect_content_type(
|
|
video.get('title', ''),
|
|
video.get('description', '')
|
|
)
|
|
|
|
# Parse upload date
|
|
upload_date = None
|
|
if video.get('upload_date'):
|
|
try:
|
|
if len(video['upload_date']) == 8: # YYYYMMDD format
|
|
upload_date = datetime.strptime(video['upload_date'], '%Y%m%d').isoformat()
|
|
else:
|
|
upload_date = video['upload_date']
|
|
except (ValueError, TypeError) as e:
|
|
logger.debug(f"Failed to parse upload_date '{video['upload_date']}': {e}")
|
|
|
|
try:
|
|
cursor.execute('''
|
|
INSERT OR IGNORE INTO celebrity_discovered_videos
|
|
(preset_id, celebrity_id, video_id, platform, url, title, channel_name,
|
|
channel_id, thumbnail, duration, upload_date, view_count, description, content_type)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
''', (
|
|
preset_id,
|
|
preset['celebrity_id'],
|
|
video['video_id'],
|
|
video.get('platform', 'youtube'),
|
|
video.get('url', ''),
|
|
video.get('title', ''),
|
|
video.get('channel_name', ''),
|
|
video.get('channel_id', ''),
|
|
video.get('thumbnail', ''),
|
|
video.get('duration', 0),
|
|
upload_date,
|
|
video.get('view_count', 0),
|
|
video.get('description', ''),
|
|
content_type
|
|
))
|
|
if cursor.rowcount > 0:
|
|
new_count += 1
|
|
# Pre-cache thumbnail for faster page loading
|
|
thumbnail_url = video.get('thumbnail', '')
|
|
if thumbnail_url:
|
|
await cache_thumbnail_async(video['video_id'], thumbnail_url, app_state.db)
|
|
except Exception as e:
|
|
logger.warning(f"Error storing video: {e}", module="Celebrity")
|
|
|
|
# Update preset stats
|
|
cursor.execute('''
|
|
UPDATE celebrity_search_presets
|
|
SET last_checked = CURRENT_TIMESTAMP,
|
|
results_count = (SELECT COUNT(*) FROM celebrity_discovered_videos WHERE preset_id = ?),
|
|
updated_at = CURRENT_TIMESTAMP
|
|
WHERE id = ?
|
|
''', (preset_id, preset_id))
|
|
|
|
conn.commit()
|
|
|
|
logger.info(f"Preset search '{preset['name']}' found {len(videos)} videos, {new_count} new",
|
|
"SUCCESS", module="Celebrity")
|
|
|
|
return {
|
|
"success": True,
|
|
"videos_found": len(videos),
|
|
"new_videos": new_count,
|
|
"preset_name": preset['name']
|
|
}
|
|
|
|
|
|
# In-memory storage for search job progress.
# NOTE(review): module-level and process-local — jobs are lost on restart and
# not visible across multiple worker processes; entries persist until the
# client calls DELETE /search-jobs/{job_id}.
_search_jobs: Dict[str, Dict] = {}
|
|
|
|
|
|
@router.post("/search-all")
|
|
@limiter.limit("5/minute")
|
|
@handle_exceptions
|
|
async def search_all_presets(
|
|
request: Request,
|
|
celebrity_id: Optional[int] = Query(None),
|
|
background: bool = Query(False),
|
|
background_tasks: BackgroundTasks = None,
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Execute all enabled search presets (or for a specific celebrity).
|
|
|
|
If background=True, starts the search in background and returns a job_id for tracking.
|
|
"""
|
|
app_state = get_app_state()
|
|
|
|
with app_state.db.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
|
|
if celebrity_id:
|
|
cursor.execute('''
|
|
SELECT id, name FROM celebrity_search_presets
|
|
WHERE celebrity_id = ? AND enabled = 1
|
|
''', (celebrity_id,))
|
|
else:
|
|
cursor.execute('''
|
|
SELECT id, name FROM celebrity_search_presets
|
|
WHERE enabled = 1
|
|
''')
|
|
|
|
presets = [{'id': row['id'], 'name': row['name']} for row in cursor.fetchall()]
|
|
|
|
if not background:
|
|
# Synchronous execution (original behavior)
|
|
results = []
|
|
for preset in presets:
|
|
try:
|
|
result = await execute_preset_search(request, preset['id'], BackgroundTasks(), current_user)
|
|
results.append(result)
|
|
except Exception as e:
|
|
logger.warning(f"Preset {preset['id']} search failed: {e}", module="Celebrity")
|
|
results.append({"preset_id": preset['id'], "error": str(e)})
|
|
|
|
total_new = sum(r.get('new_videos', 0) for r in results if 'new_videos' in r)
|
|
|
|
# Enrich newly discovered videos with resolution data in background
|
|
if total_new > 0:
|
|
asyncio.create_task(enrich_videos_with_resolution(limit=min(total_new, 100)))
|
|
|
|
return {
|
|
"success": True,
|
|
"presets_searched": len(presets),
|
|
"total_new_videos": total_new,
|
|
"results": results
|
|
}
|
|
else:
|
|
# Background execution with progress tracking
|
|
import uuid
|
|
job_id = str(uuid.uuid4())[:8]
|
|
|
|
# Initialize job progress
|
|
_search_jobs[job_id] = {
|
|
'status': 'running',
|
|
'started_at': datetime.now().isoformat(),
|
|
'total_presets': len(presets),
|
|
'completed_presets': 0,
|
|
'current_preset': presets[0]['name'] if presets else '',
|
|
'total_new_videos': 0,
|
|
'results': [],
|
|
'errors': []
|
|
}
|
|
|
|
# Start background task
|
|
async def run_search():
|
|
job = _search_jobs[job_id]
|
|
for i, preset in enumerate(presets):
|
|
try:
|
|
job['current_preset'] = preset['name']
|
|
result = await execute_preset_search(request, preset['id'], BackgroundTasks(), current_user)
|
|
job['results'].append(result)
|
|
job['total_new_videos'] += result.get('new_videos', 0)
|
|
except Exception as e:
|
|
logger.warning(f"Preset {preset['id']} search failed: {e}", module="Celebrity")
|
|
job['errors'].append({'preset_id': preset['id'], 'preset_name': preset['name'], 'error': str(e)})
|
|
|
|
job['completed_presets'] = i + 1
|
|
|
|
job['status'] = 'completed'
|
|
job['completed_at'] = datetime.now().isoformat()
|
|
job['current_preset'] = ''
|
|
|
|
# Enrich newly discovered videos with resolution data in background
|
|
if job['total_new_videos'] > 0:
|
|
asyncio.create_task(enrich_videos_with_resolution(limit=min(job['total_new_videos'], 100)))
|
|
|
|
# Schedule background task
|
|
asyncio.create_task(run_search())
|
|
|
|
return {
|
|
"success": True,
|
|
"job_id": job_id,
|
|
"total_presets": len(presets),
|
|
"message": "Search started in background"
|
|
}
|
|
|
|
|
|
@router.get("/search-jobs/{job_id}")
|
|
@limiter.limit("60/minute")
|
|
@handle_exceptions
|
|
async def get_search_job_status(
|
|
request: Request,
|
|
job_id: str,
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Get the status of a background search job."""
|
|
if job_id not in _search_jobs:
|
|
raise RecordNotFoundError("Search job not found")
|
|
|
|
return {
|
|
"success": True,
|
|
"job": _search_jobs[job_id]
|
|
}
|
|
|
|
|
|
@router.delete("/search-jobs/{job_id}")
|
|
@limiter.limit("30/minute")
|
|
@handle_exceptions
|
|
async def delete_search_job(
|
|
request: Request,
|
|
job_id: str,
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Delete a completed search job from memory."""
|
|
if job_id not in _search_jobs:
|
|
raise RecordNotFoundError("Search job not found")
|
|
|
|
del _search_jobs[job_id]
|
|
|
|
return {"success": True}
|
|
|
|
|
|
@router.post("/presets/all/discover-enrich")
|
|
@limiter.limit("1/minute")
|
|
@handle_exceptions
|
|
async def run_discover_enrich_all(
|
|
request: Request,
|
|
background_tasks: BackgroundTasks,
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Run discovery search and enrichment for ALL enabled presets."""
|
|
import subprocess
|
|
|
|
app_state = get_app_state()
|
|
|
|
# Count enabled presets
|
|
with app_state.db.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute('SELECT COUNT(*) as count FROM celebrity_search_presets WHERE enabled = 1')
|
|
result = cursor.fetchone()
|
|
if not result or result['count'] == 0:
|
|
raise RecordNotFoundError("No enabled presets found")
|
|
preset_count = result['count']
|
|
|
|
# Run the script in background (no args = all enabled presets)
|
|
def run_script():
|
|
try:
|
|
result = subprocess.run(
|
|
['/opt/media-downloader/venv/bin/python3', '/opt/media-downloader/scripts/discover_and_enrich.py'],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=7200 # 2 hour timeout for full scan
|
|
)
|
|
logger.info(f"Full Discover & Enrich completed: {result.stdout[-500:] if result.stdout else 'No output'}", module="Celebrity")
|
|
except subprocess.TimeoutExpired:
|
|
logger.warning("Full Discover & Enrich timed out", module="Celebrity")
|
|
except Exception as e:
|
|
logger.error(f"Full Discover & Enrich failed: {e}", module="Celebrity")
|
|
|
|
background_tasks.add_task(run_script)
|
|
|
|
logger.info(f"Started Full Discover & Enrich ({preset_count} presets)", module="Celebrity")
|
|
|
|
return {
|
|
"success": True,
|
|
"message": f"Full discovery and enrichment started ({preset_count} presets)",
|
|
"preset_count": preset_count
|
|
}
|
|
|
|
|
|
class BatchPresetRequest(BaseModel):
    """Request body for batch discover-and-enrich: the preset IDs to process."""
    # IDs of celebrity_search_presets rows to run; must all exist.
    preset_ids: List[int]
|
|
|
|
|
|
@router.post("/presets/batch/discover-enrich")
|
|
@limiter.limit("2/minute")
|
|
@handle_exceptions
|
|
async def run_discover_enrich_batch(
|
|
request: Request,
|
|
body: BatchPresetRequest,
|
|
background_tasks: BackgroundTasks,
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Run discovery search and enrichment for a batch of selected presets."""
|
|
import subprocess
|
|
|
|
preset_ids = body.preset_ids
|
|
if not preset_ids:
|
|
raise ValueError("No preset IDs provided")
|
|
|
|
app_state = get_app_state()
|
|
|
|
# Verify presets exist
|
|
with app_state.db.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
placeholders = ','.join('?' * len(preset_ids))
|
|
cursor.execute(f'SELECT id, name FROM celebrity_search_presets WHERE id IN ({placeholders})', preset_ids)
|
|
found_presets = cursor.fetchall()
|
|
if len(found_presets) != len(preset_ids):
|
|
raise RecordNotFoundError(f"Some presets not found")
|
|
|
|
# Run the script in background with specific preset IDs
|
|
def run_script():
|
|
try:
|
|
result = subprocess.run(
|
|
['/opt/media-downloader/venv/bin/python3', '/opt/media-downloader/scripts/discover_and_enrich.py',
|
|
'--preset-ids', ','.join(str(p) for p in preset_ids)],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=3600 # 1 hour timeout for batch
|
|
)
|
|
logger.info(f"Batch Discover & Enrich completed ({len(preset_ids)} presets): {result.stdout[-500:] if result.stdout else 'No output'}", module="Celebrity")
|
|
except subprocess.TimeoutExpired:
|
|
logger.warning(f"Batch Discover & Enrich timed out ({len(preset_ids)} presets)", module="Celebrity")
|
|
except Exception as e:
|
|
logger.error(f"Batch Discover & Enrich failed: {e}", module="Celebrity")
|
|
|
|
background_tasks.add_task(run_script)
|
|
|
|
logger.info(f"Started Batch Discover & Enrich ({len(preset_ids)} presets)", module="Celebrity")
|
|
|
|
return {
|
|
"success": True,
|
|
"message": f"Batch discovery started ({len(preset_ids)} presets)",
|
|
"preset_count": len(preset_ids)
|
|
}
|
|
|
|
|
|
@router.post("/presets/{preset_id}/discover-enrich")
|
|
@limiter.limit("5/minute")
|
|
@handle_exceptions
|
|
async def run_discover_and_enrich(
|
|
request: Request,
|
|
preset_id: int,
|
|
background_tasks: BackgroundTasks,
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Run discovery search and enrichment for a specific preset using the discover_and_enrich.py script."""
|
|
import subprocess
|
|
import uuid
|
|
import re as regex
|
|
|
|
app_state = get_app_state()
|
|
|
|
# Verify preset exists
|
|
with app_state.db.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute('SELECT id, name FROM celebrity_search_presets WHERE id = ?', (preset_id,))
|
|
preset = cursor.fetchone()
|
|
if not preset:
|
|
raise RecordNotFoundError("Search preset not found")
|
|
|
|
# Create task ID
|
|
task_id = str(uuid.uuid4())[:8]
|
|
|
|
# Create task record
|
|
with app_state.db.get_connection(for_write=True) as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute('''
|
|
INSERT INTO background_tasks (id, task_type, status, metadata)
|
|
VALUES (?, 'discover_enrich', 'running', ?)
|
|
''', (task_id, json.dumps({'preset_id': preset_id, 'preset_name': preset['name']})))
|
|
conn.commit()
|
|
|
|
# Run the script in background
|
|
def run_script():
|
|
try:
|
|
result = subprocess.run(
|
|
['/opt/media-downloader/venv/bin/python3', '/opt/media-downloader/scripts/discover_and_enrich.py', '--preset', str(preset_id)],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=600 # 10 minute timeout
|
|
)
|
|
# Parse results from output
|
|
output = result.stdout or ''
|
|
new_count = 0
|
|
match_count = 0
|
|
results_count = 0
|
|
|
|
# Look for "RESULTS: X results, Y match, Z new" format
|
|
results_match = regex.search(r'RESULTS:\s*(\d+)\s+results,\s*(\d+)\s+match,\s*(\d+)\s+new', output)
|
|
if results_match:
|
|
results_count = int(results_match.group(1))
|
|
match_count = int(results_match.group(2))
|
|
new_count = int(results_match.group(3))
|
|
else:
|
|
# Fallback to old patterns
|
|
new_match = regex.search(r'(\d+)\s+new', output)
|
|
if new_match:
|
|
new_count = int(new_match.group(1))
|
|
|
|
match_match = regex.search(r'(\d+)\s+match', output)
|
|
if match_match:
|
|
match_count = int(match_match.group(1))
|
|
|
|
# Update task as completed
|
|
with app_state.db.get_connection(for_write=True) as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute('''
|
|
UPDATE background_tasks
|
|
SET status = 'completed', completed_at = CURRENT_TIMESTAMP,
|
|
result = ?
|
|
WHERE id = ?
|
|
''', (json.dumps({'results_count': results_count, 'new_count': new_count, 'match_count': match_count, 'output': output[-500:]}), task_id))
|
|
conn.commit()
|
|
|
|
logger.info(f"Discover & Enrich completed for preset {preset_id}: {results_count} results, {match_count} match, {new_count} new", module="Celebrity")
|
|
except subprocess.TimeoutExpired:
|
|
with app_state.db.get_connection(for_write=True) as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute('''
|
|
UPDATE background_tasks
|
|
SET status = 'failed', completed_at = CURRENT_TIMESTAMP, result = ?
|
|
WHERE id = ?
|
|
''', (json.dumps({'error': 'Timeout after 10 minutes'}), task_id))
|
|
conn.commit()
|
|
logger.warning(f"Discover & Enrich timed out for preset {preset_id}", module="Celebrity")
|
|
except Exception as e:
|
|
with app_state.db.get_connection(for_write=True) as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute('''
|
|
UPDATE background_tasks
|
|
SET status = 'failed', completed_at = CURRENT_TIMESTAMP, result = ?
|
|
WHERE id = ?
|
|
''', (json.dumps({'error': str(e)}), task_id))
|
|
conn.commit()
|
|
logger.error(f"Discover & Enrich failed for preset {preset_id}: {e}", module="Celebrity")
|
|
|
|
background_tasks.add_task(run_script)
|
|
|
|
logger.info(f"Started Discover & Enrich for preset: {preset['name']}", module="Celebrity")
|
|
|
|
return {
|
|
"success": True,
|
|
"message": f"Discovery and enrichment started for preset '{preset['name']}'",
|
|
"preset_id": preset_id,
|
|
"task_id": task_id
|
|
}
|
|
|
|
|
|
@router.get("/tasks/{task_id}")
|
|
@handle_exceptions
|
|
async def get_task_status(
|
|
request: Request,
|
|
task_id: str,
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Get background task status."""
|
|
app_state = get_app_state()
|
|
|
|
with app_state.db.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute('SELECT * FROM background_tasks WHERE id = ?', (task_id,))
|
|
task = cursor.fetchone()
|
|
|
|
if not task:
|
|
raise RecordNotFoundError("Task not found")
|
|
|
|
return {
|
|
"success": True,
|
|
"task": {
|
|
"id": task['id'],
|
|
"task_type": task['task_type'],
|
|
"status": task['status'],
|
|
"started_at": task['started_at'],
|
|
"completed_at": task['completed_at'],
|
|
"result": json.loads(task['result']) if task['result'] else None,
|
|
"metadata": json.loads(task['metadata']) if task['metadata'] else None
|
|
}
|
|
}
|
|
|
|
|
|
@router.post("/presets/category/{category}/discover-enrich")
|
|
@limiter.limit("2/minute")
|
|
@handle_exceptions
|
|
async def run_discover_enrich_category(
|
|
request: Request,
|
|
category: str,
|
|
background_tasks: BackgroundTasks,
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Run discovery search and enrichment for all presets in a category."""
|
|
import subprocess
|
|
|
|
app_state = get_app_state()
|
|
|
|
# Verify category has presets
|
|
with app_state.db.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute('SELECT COUNT(*) as count FROM celebrity_search_presets WHERE category = ? AND enabled = 1', (category,))
|
|
result = cursor.fetchone()
|
|
if not result or result['count'] == 0:
|
|
raise RecordNotFoundError(f"No enabled presets found in category '{category}'")
|
|
preset_count = result['count']
|
|
|
|
# Run the script in background
|
|
def run_script():
|
|
try:
|
|
result = subprocess.run(
|
|
['/opt/media-downloader/venv/bin/python3', '/opt/media-downloader/scripts/discover_and_enrich.py', '--category', category],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=1800 # 30 minute timeout for category
|
|
)
|
|
logger.info(f"Discover & Enrich completed for category {category}: {result.stdout[-500:] if result.stdout else 'No output'}", module="Celebrity")
|
|
except subprocess.TimeoutExpired:
|
|
logger.warning(f"Discover & Enrich timed out for category {category}", module="Celebrity")
|
|
except Exception as e:
|
|
logger.error(f"Discover & Enrich failed for category {category}: {e}", module="Celebrity")
|
|
|
|
background_tasks.add_task(run_script)
|
|
|
|
logger.info(f"Started Discover & Enrich for category: {category} ({preset_count} presets)", module="Celebrity")
|
|
|
|
return {
|
|
"success": True,
|
|
"message": f"Discovery and enrichment started for category '{category}' ({preset_count} presets)",
|
|
"category": category,
|
|
"preset_count": preset_count
|
|
}
|
|
|
|
|
|
# ============================================================================
|
|
# CATEGORY ENDPOINTS
|
|
# ============================================================================
|
|
|
|
@router.get("/categories")
|
|
@limiter.limit("60/minute")
|
|
@handle_exceptions
|
|
async def get_categories_with_video_counts(
|
|
request: Request,
|
|
celebrity_id: Optional[int] = None,
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Get preset categories with video counts."""
|
|
app_state = get_app_state()
|
|
|
|
with app_state.db.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
|
|
# Build query based on filters
|
|
if celebrity_id:
|
|
cursor.execute('''
|
|
SELECT
|
|
p.category,
|
|
COUNT(DISTINCT p.id) as preset_count,
|
|
COUNT(v.id) as video_count,
|
|
SUM(CASE WHEN v.status = 'new' THEN 1 ELSE 0 END) as new_count
|
|
FROM celebrity_search_presets p
|
|
LEFT JOIN celebrity_discovered_videos v ON p.id = v.preset_id
|
|
WHERE p.celebrity_id = ?
|
|
GROUP BY p.category
|
|
ORDER BY video_count DESC
|
|
''', (celebrity_id,))
|
|
else:
|
|
cursor.execute('''
|
|
SELECT
|
|
p.category,
|
|
COUNT(DISTINCT p.id) as preset_count,
|
|
COUNT(v.id) as video_count,
|
|
SUM(CASE WHEN v.status = 'new' THEN 1 ELSE 0 END) as new_count
|
|
FROM celebrity_search_presets p
|
|
LEFT JOIN celebrity_discovered_videos v ON p.id = v.preset_id
|
|
GROUP BY p.category
|
|
ORDER BY video_count DESC
|
|
''')
|
|
|
|
categories = []
|
|
for row in cursor.fetchall():
|
|
categories.append({
|
|
'category': row['category'] or 'other',
|
|
'preset_count': row['preset_count'],
|
|
'video_count': row['video_count'],
|
|
'new_count': row['new_count'] or 0
|
|
})
|
|
|
|
return {"success": True, "categories": categories}
|
|
|
|
|
|
# ============================================================================
|
|
# DISCOVERED VIDEOS ENDPOINTS
|
|
# ============================================================================
|
|
|
|
@router.get("/sources")
|
|
@limiter.limit("60/minute")
|
|
@handle_exceptions
|
|
async def get_unique_sources(
|
|
request: Request,
|
|
celebrity_id: Optional[int] = None,
|
|
category: Optional[str] = None,
|
|
preset_ids: Optional[str] = None,
|
|
status: Optional[str] = None,
|
|
watched: Optional[str] = None,
|
|
platform: Optional[str] = None,
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Get unique channel names (sources) for filter dropdown, filtered by current criteria."""
|
|
app_state = get_app_state()
|
|
|
|
with app_state.db.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
|
|
query = '''
|
|
SELECT DISTINCT v.channel_name
|
|
FROM celebrity_discovered_videos v
|
|
JOIN celebrity_search_presets sp ON v.preset_id = sp.id
|
|
WHERE v.channel_name IS NOT NULL AND v.channel_name != ''
|
|
'''
|
|
params = []
|
|
|
|
if celebrity_id:
|
|
query += ' AND v.celebrity_id = ?'
|
|
params.append(celebrity_id)
|
|
|
|
if category:
|
|
query += ' AND sp.category = ?'
|
|
params.append(category)
|
|
|
|
if preset_ids:
|
|
try:
|
|
id_list = [int(x.strip()) for x in preset_ids.split(',') if x.strip()]
|
|
if id_list:
|
|
placeholders = ','.join(['?' for _ in id_list])
|
|
query += f' AND v.preset_id IN ({placeholders})'
|
|
params.extend(id_list)
|
|
except ValueError:
|
|
pass
|
|
|
|
if status:
|
|
if status == 'not_queued':
|
|
query += " AND v.status NOT IN ('queued', 'downloaded')"
|
|
elif status == 'not_downloaded':
|
|
query += " AND v.status != 'downloaded'"
|
|
else:
|
|
query += ' AND v.status = ?'
|
|
params.append(status)
|
|
|
|
if watched:
|
|
if watched == 'watched':
|
|
query += " AND v.status = 'watched'"
|
|
elif watched == 'unwatched':
|
|
query += " AND v.status != 'watched'"
|
|
|
|
if platform:
|
|
query += ' AND v.platform = ?'
|
|
params.append(platform)
|
|
|
|
query += ' ORDER BY v.channel_name'
|
|
|
|
cursor.execute(query, params)
|
|
sources = [row['channel_name'] for row in cursor.fetchall()]
|
|
|
|
return {"success": True, "sources": sources}
|
|
|
|
|
|
@router.get("/videos")
|
|
@limiter.limit("60/minute")
|
|
@handle_exceptions
|
|
async def get_discovered_videos(
|
|
request: Request,
|
|
celebrity_id: Optional[int] = None,
|
|
preset_id: Optional[int] = None,
|
|
preset_ids: Optional[str] = None, # Comma-separated list of preset IDs for grouped presets
|
|
category: Optional[str] = None,
|
|
status: Optional[str] = None,
|
|
watched: Optional[str] = None, # 'watched' or 'unwatched'
|
|
channel_name: Optional[str] = None, # Filter by YouTube channel/source
|
|
platform: Optional[str] = None, # Filter by platform (youtube, dailymotion)
|
|
content_type: Optional[str] = None,
|
|
min_resolution: Optional[int] = None,
|
|
resolution_sort: Optional[str] = None, # 'highest' or 'lowest'
|
|
date_sort: Optional[str] = None, # 'discovered_newest', 'discovered_oldest', 'uploaded_newest', 'uploaded_oldest'
|
|
name_sort: Optional[str] = None, # 'name_asc' or 'name_desc'
|
|
search: Optional[str] = None, # Search in title and channel_name
|
|
limit: int = Query(50, ge=1, le=200),
|
|
offset: int = Query(0, ge=0),
|
|
current_user: Dict = Depends(get_current_user)
|
|
):
|
|
"""Get discovered videos with filters."""
|
|
app_state = get_app_state()
|
|
|
|
with app_state.db.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
|
|
query = '''
|
|
SELECT v.*, cp.name as celebrity_name, sp.name as preset_name, sp.category as preset_category
|
|
FROM celebrity_discovered_videos v
|
|
JOIN celebrity_profiles cp ON v.celebrity_id = cp.id
|
|
JOIN celebrity_search_presets sp ON v.preset_id = sp.id
|
|
WHERE 1=1
|
|
'''
|
|
count_query = '''
|
|
SELECT COUNT(*) FROM celebrity_discovered_videos v
|
|
JOIN celebrity_search_presets sp ON v.preset_id = sp.id
|
|
WHERE 1=1
|
|
'''
|
|
params = []
|
|
|
|
if celebrity_id:
|
|
query += ' AND v.celebrity_id = ?'
|
|
count_query += ' AND v.celebrity_id = ?'
|
|
params.append(celebrity_id)
|
|
|
|
if preset_id:
|
|
query += ' AND v.preset_id = ?'
|
|
count_query += ' AND v.preset_id = ?'
|
|
params.append(preset_id)
|
|
|
|
# Support multiple preset IDs (comma-separated) for grouped presets
|
|
if preset_ids:
|
|
try:
|
|
id_list = [int(x.strip()) for x in preset_ids.split(',') if x.strip()]
|
|
if id_list:
|
|
placeholders = ','.join(['?' for _ in id_list])
|
|
query += f' AND v.preset_id IN ({placeholders})'
|
|
count_query += f' AND v.preset_id IN ({placeholders})'
|
|
params.extend(id_list)
|
|
except ValueError:
|
|
pass # Invalid preset_ids format, ignore
|
|
|
|
if category:
|
|
query += ' AND sp.category = ?'
|
|
count_query += ' AND sp.category = ?'
|
|
params.append(category)
|
|
|
|
if status:
|
|
if status == 'not_queued':
|
|
# Show videos that are not queued or downloaded
|
|
query += " AND v.status NOT IN ('queued', 'downloaded')"
|
|
count_query += " AND v.status NOT IN ('queued', 'downloaded')"
|
|
elif status == 'not_downloaded':
|
|
# Show videos that are not downloaded (includes queued, new, ignored)
|
|
query += " AND v.status != 'downloaded'"
|
|
count_query += " AND v.status != 'downloaded'"
|
|
else:
|
|
query += ' AND v.status = ?'
|
|
count_query += ' AND v.status = ?'
|
|
params.append(status)
|
|
|
|
if content_type:
|
|
query += ' AND v.content_type = ?'
|
|
count_query += ' AND v.content_type = ?'
|
|
params.append(content_type)
|
|
|
|
if min_resolution:
|
|
query += ' AND v.max_resolution >= ?'
|
|
count_query += ' AND v.max_resolution >= ?'
|
|
params.append(min_resolution)
|
|
|
|
if watched:
|
|
if watched == 'watched':
|
|
query += " AND v.status = 'watched'"
|
|
count_query += " AND v.status = 'watched'"
|
|
elif watched == 'unwatched':
|
|
query += " AND v.status != 'watched'"
|
|
count_query += " AND v.status != 'watched'"
|
|
|
|
if channel_name:
|
|
# Support pattern-based filtering for Easynews TV/Movies
|
|
channel_name_stripped = channel_name.strip()
|
|
if channel_name_stripped == 'Easynews - TV':
|
|
query += " AND v.channel_name LIKE 'Easynews - tv:%'"
|
|
count_query += " AND v.channel_name LIKE 'Easynews - tv:%'"
|
|
elif channel_name_stripped == 'Easynews - Movies':
|
|
query += " AND v.channel_name LIKE 'Easynews - movie:%'"
|
|
count_query += " AND v.channel_name LIKE 'Easynews - movie:%'"
|
|
else:
|
|
query += ' AND v.channel_name = ?'
|
|
count_query += ' AND v.channel_name = ?'
|
|
params.append(channel_name)
|
|
|
|
if platform:
|
|
query += ' AND v.platform = ?'
|
|
count_query += ' AND v.platform = ?'
|
|
params.append(platform)
|
|
|
|
if search:
|
|
search_term = f'%{search}%'
|
|
query += ' AND (v.title LIKE ? OR v.channel_name LIKE ? OR v.url LIKE ? OR v.description LIKE ?)'
|
|
count_query += ' AND (v.title LIKE ? OR v.channel_name LIKE ? OR v.url LIKE ? OR v.description LIKE ?)'
|
|
params.extend([search_term, search_term, search_term, search_term])
|
|
|
|
# Get total count
|
|
cursor.execute(count_query, params)
|
|
total = cursor.fetchone()[0]
|
|
|
|
# Get videos with ordering
|
|
# Date sort takes precedence if specified
|
|
if date_sort == 'discovered_newest':
|
|
query += ' ORDER BY v.discovered_at DESC LIMIT ? OFFSET ?'
|
|
elif date_sort == 'discovered_oldest':
|
|
query += ' ORDER BY v.discovered_at ASC LIMIT ? OFFSET ?'
|
|
elif date_sort == 'uploaded_newest':
|
|
query += ' ORDER BY COALESCE(v.upload_date, "1970-01-01") DESC, v.discovered_at DESC LIMIT ? OFFSET ?'
|
|
elif date_sort == 'uploaded_oldest':
|
|
query += ' ORDER BY CASE WHEN v.upload_date IS NULL THEN 1 ELSE 0 END, v.upload_date ASC, v.discovered_at DESC LIMIT ? OFFSET ?'
|
|
elif date_sort == 'watched_newest':
|
|
query += ' ORDER BY COALESCE(v.status_updated_at, v.discovered_at) DESC LIMIT ? OFFSET ?'
|
|
elif resolution_sort == 'highest':
|
|
query += ' ORDER BY COALESCE(v.max_resolution, 0) DESC, v.discovered_at DESC LIMIT ? OFFSET ?'
|
|
elif resolution_sort == 'lowest':
|
|
query += ' ORDER BY CASE WHEN v.max_resolution IS NULL THEN 1 ELSE 0 END, v.max_resolution ASC, v.discovered_at DESC LIMIT ? OFFSET ?'
|
|
elif name_sort == 'name_asc':
|
|
query += ' ORDER BY v.title ASC, v.discovered_at DESC LIMIT ? OFFSET ?'
|
|
elif name_sort == 'name_desc':
|
|
query += ' ORDER BY v.title DESC, v.discovered_at DESC LIMIT ? OFFSET ?'
|
|
else:
|
|
query += ' ORDER BY v.discovered_at DESC LIMIT ? OFFSET ?'
|
|
params.extend([limit, offset])
|
|
|
|
cursor.execute(query, params)
|
|
|
|
videos = []
|
|
for row in cursor.fetchall():
|
|
videos.append({
|
|
'id': row['id'],
|
|
'preset_id': row['preset_id'],
|
|
'preset_name': row['preset_name'],
|
|
'preset_category': row['preset_category'] if 'preset_category' in row.keys() else 'other',
|
|
'celebrity_id': row['celebrity_id'],
|
|
'celebrity_name': row['celebrity_name'],
|
|
'video_id': row['video_id'],
|
|
'platform': row['platform'],
|
|
'url': row['url'],
|
|
'title': row['title'],
|
|
'channel_name': row['channel_name'],
|
|
'channel_id': row['channel_id'],
|
|
'thumbnail': row['thumbnail'],
|
|
'duration': row['duration'],
|
|
'upload_date': row['upload_date'],
|
|
'view_count': row['view_count'],
|
|
'description': row['description'],
|
|
'content_type': row['content_type'],
|
|
'status': row['status'],
|
|
'discovered_at': row['discovered_at'],
|
|
'downloaded_path': row['downloaded_path'],
|
|
'max_resolution': row['max_resolution'] if 'max_resolution' in row.keys() else None,
|
|
'max_width': row['max_width'] if 'max_width' in row.keys() else None,
|
|
'metadata': row['metadata'] if 'metadata' in row.keys() else None
|
|
})
|
|
|
|
# Filter out blocked YouTube channels
|
|
celebrity_discovery_settings = app_state.settings.get('celebrity_discovery') or {}
|
|
blocked_channels_str = celebrity_discovery_settings.get('blocked_youtube_channels', '')
|
|
if blocked_channels_str:
|
|
# Parse newline-separated list of blocked channel names (case-insensitive)
|
|
blocked_channels = set(
|
|
name.strip().lower()
|
|
for name in blocked_channels_str.split('\n')
|
|
if name.strip()
|
|
)
|
|
if blocked_channels:
|
|
original_count = len(videos)
|
|
videos = [
|
|
v for v in videos
|
|
if (v.get('channel_name') or '').lower() not in blocked_channels
|
|
]
|
|
filtered_count = original_count - len(videos)
|
|
if filtered_count > 0:
|
|
# Adjust total count for filtered results
|
|
total = max(0, total - filtered_count)
|
|
|
|
return {
|
|
"success": True,
|
|
"videos": videos,
|
|
"total": total,
|
|
"limit": limit,
|
|
"offset": offset
|
|
}
|
|
|
|
|
|
@router.put("/videos/{video_id}/status")
@limiter.limit("60/minute")
@handle_exceptions
async def update_video_status(
    request: Request,
    video_id: int,
    body: VideoStatusUpdate,
    current_user: Dict = Depends(get_current_user)
):
    """Update the status of a discovered video.

    Validates the requested status against the allowed set, then stamps
    ``status_updated_at`` alongside the new status.

    Raises:
        ValidationError: if the status is not one of the allowed values.
        RecordNotFoundError: if no video row matches ``video_id``.
    """
    app_state = get_app_state()

    valid_statuses = ['new', 'queued', 'downloaded', 'ignored', 'watched']
    if body.status not in valid_statuses:
        raise ValidationError(f"Invalid status. Must be one of: {', '.join(valid_statuses)}")

    with app_state.db.get_connection(for_write=True) as conn:
        cur = conn.cursor()
        cur.execute(
            '''
            UPDATE celebrity_discovered_videos
            SET status = ?, status_updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        ''',
            (body.status, video_id),
        )

        # rowcount of zero means the WHERE clause matched nothing.
        if not cur.rowcount:
            raise RecordNotFoundError("Video not found")

        conn.commit()

    return {"success": True}
|
|
|
|
|
|
@router.get("/videos/{video_id}/stream")
@handle_exceptions
async def stream_discovered_video(
    request: Request,
    video_id: int,
    current_user: Dict = Depends(get_current_user)
):
    """
    Stream a downloaded discovered video file.

    Returns the video file with proper Range support for seeking.
    Only works for videos that have been downloaded (have downloaded_path).

    Raises:
        RecordNotFoundError: if the video row or the file on disk is missing.
    """
    import os
    from starlette.responses import StreamingResponse

    app_state = get_app_state()

    with app_state.db.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('SELECT downloaded_path FROM celebrity_discovered_videos WHERE id = ?', (video_id,))
        row = cursor.fetchone()

    if not row:
        raise RecordNotFoundError("Video not found")

    file_path = row['downloaded_path']

    if not file_path or not os.path.exists(file_path):
        raise RecordNotFoundError("Downloaded file not found")

    file_size = os.path.getsize(file_path)

    # Determine content type from the file extension; unknown containers
    # fall back to mp4, which browsers will at least attempt to play.
    ext = os.path.splitext(file_path)[1].lower()
    content_type = {
        '.mp4': 'video/mp4',
        '.webm': 'video/webm',
        '.mkv': 'video/x-matroska',
        '.mov': 'video/quicktime',
        '.avi': 'video/x-msvideo',
    }.get(ext, 'video/mp4')

    # Handle Range requests for seeking ("bytes=start-end", either side optional).
    range_header = request.headers.get("Range")
    start = 0
    end = file_size - 1

    if range_header:
        range_match = range_header.replace("bytes=", "").split("-")
        if range_match[0]:
            # FIX: clamp start to the last valid byte. Previously a client
            # requesting "bytes=<past-EOF>-" produced a negative
            # content_length and a broken 206 response.
            start = min(int(range_match[0]), file_size - 1)
        if len(range_match) > 1 and range_match[1]:
            end = min(int(range_match[1]), file_size - 1)
        if end < start:
            # Inverted range after clamping: serve the remainder of the file.
            end = file_size - 1

    content_length = end - start + 1

    def file_stream_generator():
        # Yield the requested byte range in 64 KiB chunks so large files
        # are never fully loaded into memory.
        with open(file_path, 'rb') as f:
            f.seek(start)
            remaining = content_length
            while remaining > 0:
                chunk = f.read(min(65536, remaining))
                if not chunk:
                    break
                remaining -= len(chunk)
                yield chunk

    headers = {
        "Accept-Ranges": "bytes",
        "Content-Length": str(content_length),
        "Cache-Control": "private, max-age=3600",
    }

    if range_header:
        # Partial content: advertise exactly which slice is being returned.
        headers["Content-Range"] = f"bytes {start}-{end}/{file_size}"
        return StreamingResponse(
            file_stream_generator(),
            status_code=206,
            media_type=content_type,
            headers=headers
        )

    return StreamingResponse(
        file_stream_generator(),
        media_type=content_type,
        headers=headers
    )
|
|
|
|
|
|
@router.put("/videos/bulk-status")
@limiter.limit("30/minute")
@handle_exceptions
async def update_bulk_video_status(
    request: Request,
    body: BulkVideoStatusUpdate,
    current_user: Dict = Depends(get_current_user)
):
    """Update status for multiple videos.

    Raises:
        ValidationError: if the status is invalid or ``video_ids`` is empty.

    NOTE(review): this route is registered after ``PUT /videos/{video_id}/status``;
    verify FastAPI resolves the literal "bulk-status" path correctly and does
    not attempt to coerce it into the ``video_id`` int path parameter.
    """
    app_state = get_app_state()

    valid_statuses = ['new', 'queued', 'downloaded', 'ignored', 'watched']
    if body.status not in valid_statuses:
        raise ValidationError(f"Invalid status. Must be one of: {', '.join(valid_statuses)}")

    # FIX: an empty id list would render "WHERE id IN ()" which is a SQLite
    # syntax error; reject it up front with a clear message instead.
    if not body.video_ids:
        raise ValidationError("video_ids must not be empty")

    with app_state.db.get_connection(for_write=True) as conn:
        cursor = conn.cursor()

        # Dynamic SQL is safe here: only "?" placeholders are interpolated;
        # the ids themselves are passed as bound parameters.
        placeholders = ','.join('?' * len(body.video_ids))
        cursor.execute(f'''
            UPDATE celebrity_discovered_videos
            SET status = ?, status_updated_at = CURRENT_TIMESTAMP
            WHERE id IN ({placeholders})
        ''', [body.status] + body.video_ids)

        updated = cursor.rowcount
        conn.commit()

    return {"success": True, "updated_count": updated}
|
|
|
|
|
|
@router.delete("/videos/{video_id}")
@limiter.limit("30/minute")
@handle_exceptions
async def delete_discovered_video(
    request: Request,
    video_id: int,
    current_user: Dict = Depends(get_current_user)
):
    """Delete a discovered video row by its primary key.

    Raises:
        RecordNotFoundError: if no row matches ``video_id``.
    """
    app_state = get_app_state()

    with app_state.db.get_connection(for_write=True) as conn:
        cur = conn.cursor()
        cur.execute('DELETE FROM celebrity_discovered_videos WHERE id = ?', (video_id,))

        # Nothing deleted means the id did not exist.
        if not cur.rowcount:
            raise RecordNotFoundError("Video not found")

        conn.commit()

    return {"success": True}
|
|
|
|
|
|
@router.post("/cleanup-unavailable")
@limiter.limit("5/minute")
@handle_exceptions
async def cleanup_unavailable_videos(
    request: Request,
    limit: int = 100,
    check_all: bool = False,
    current_user: Dict = Depends(get_current_user)
):
    """Check and delete videos that are no longer available on YouTube.

    Schedules the check as a fire-and-forget background task and returns
    immediately.

    Args:
        limit: Maximum number of videos to check
        check_all: If True, check ALL videos (not just those without resolution)
    """
    if check_all:
        # Full sweep: probe every video regardless of resolution status.
        coro = _cleanup_all_videos(limit=limit)
    else:
        # Fast path: only probe videos missing resolution info, which
        # catches most unavailable entries.
        coro = enrich_videos_with_resolution(limit=limit, delete_unavailable=True)
    asyncio.create_task(coro)

    return {
        "success": True,
        "message": f"Cleanup started - checking up to {limit} videos for availability"
    }
|
|
|
|
|
|
async def _cleanup_all_videos(limit: int = 100):
    """Check all videos for availability and delete unavailable ones.

    Intended to run as a background task. Oldest discoveries are probed
    first (more likely to have gone offline). Age-restricted videos are
    kept; videos reported unavailable/private/removed (or returning an
    empty metadata response) are deleted.

    Args:
        limit: Maximum number of videos to probe in this run.
    """
    app_state = get_app_state()

    def _delete_row(row_id: int) -> None:
        # One short-lived write connection per delete keeps the write lock brief.
        with app_state.db.get_connection(for_write=True) as conn:
            cursor = conn.cursor()
            cursor.execute('DELETE FROM celebrity_discovered_videos WHERE id = ?', (row_id,))
            conn.commit()

    try:
        with app_state.db.get_connection() as conn:
            cursor = conn.cursor()
            # Get oldest videos first (more likely to be unavailable)
            cursor.execute('''
                SELECT id, video_id, title, platform FROM celebrity_discovered_videos
                ORDER BY discovered_at ASC
                LIMIT ?
            ''', (limit,))
            videos = [dict(row) for row in cursor.fetchall()]

        if not videos:
            return

        logger.info(f"Checking {len(videos)} videos for availability", module="Celebrity")

        deleted = 0
        checked = 0
        age_restricted = 0
        for video in videos:
            try:
                # Only check YouTube videos (dailymotion would need different handling)
                if video.get('platform', 'youtube') != 'youtube':
                    continue

                metadata = await fetch_video_metadata(video['video_id'])
                checked += 1

                # FIX: title may be NULL in the DB; guard before slicing
                # (previously video['title'][:50] raised TypeError on None).
                title = (video.get('title') or '')[:50]

                if metadata.get('_error') == 'age_restricted':
                    # Age-restricted video - keep it
                    age_restricted += 1
                    logger.debug(f"Age-restricted video (keeping): {title}... ({video['video_id']})", module="Celebrity")
                elif metadata.get('_error') in ('unavailable', 'private', 'removed'):
                    # Truly unavailable - delete it
                    _delete_row(video['id'])
                    deleted += 1
                    logger.info(f"Deleted {metadata.get('_error')} video: {title}... ({video['video_id']})", module="Celebrity")
                elif not metadata or (not metadata.get('max_resolution') and not metadata.get('_error')):
                    # Empty response - likely unavailable
                    _delete_row(video['id'])
                    deleted += 1
                    logger.info(f"Deleted unavailable video: {title}... ({video['video_id']})", module="Celebrity")

                # Rate limit to avoid API issues
                await asyncio.sleep(0.5)

            except Exception as e:
                # Per-video errors are logged and skipped so one bad entry
                # doesn't abort the whole sweep.
                logger.warning(f"Error checking video {video['video_id']}: {e}", module="Celebrity")
                continue

        log_msg = f"Availability check complete: checked {checked}, deleted {deleted} unavailable"
        if age_restricted > 0:
            log_msg += f", {age_restricted} age-restricted (kept)"
        logger.info(log_msg, module="Celebrity")

    except Exception as e:
        logger.error(f"Availability cleanup failed: {e}", module="Celebrity")
|
|
|
|
|
|
# ============================================================================
|
|
# STATS ENDPOINTS
|
|
# ============================================================================
|
|
|
|
@router.get("/stats")
@limiter.limit("60/minute")
@handle_exceptions
async def get_celebrity_stats(
    request: Request,
    current_user: Dict = Depends(get_current_user)
):
    """Get overall celebrity discovery statistics."""
    app_state = get_app_state()

    with app_state.db.get_connection() as conn:
        cursor = conn.cursor()

        def scalar(sql: str) -> int:
            # Run a single-value COUNT query and return the number.
            cursor.execute(sql)
            return cursor.fetchone()[0]

        # Headline totals.
        total_celebrities = scalar('SELECT COUNT(*) FROM celebrity_profiles')
        total_presets = scalar('SELECT COUNT(*) FROM celebrity_search_presets')
        # Count active monitored YouTube channels
        monitored_channels = scalar("SELECT COUNT(*) FROM youtube_channel_monitors WHERE status = 'active'")
        total_videos = scalar('SELECT COUNT(*) FROM celebrity_discovered_videos')
        new_videos = scalar("SELECT COUNT(*) FROM celebrity_discovered_videos WHERE status = 'new'")
        downloaded_videos = scalar("SELECT COUNT(*) FROM celebrity_discovered_videos WHERE status = 'downloaded'")

        # Videos by content type
        cursor.execute('''
            SELECT content_type, COUNT(*) as count
            FROM celebrity_discovered_videos
            GROUP BY content_type
        ''')
        by_content_type = {row['content_type']: row['count'] for row in cursor.fetchall()}

        # Videos by status
        cursor.execute('''
            SELECT status, COUNT(*) as count
            FROM celebrity_discovered_videos
            GROUP BY status
        ''')
        by_status = {row['status']: row['count'] for row in cursor.fetchall()}

        # Ten most recent discoveries, joined to their celebrity names.
        cursor.execute('''
            SELECT v.*, cp.name as celebrity_name
            FROM celebrity_discovered_videos v
            JOIN celebrity_profiles cp ON v.celebrity_id = cp.id
            ORDER BY v.discovered_at DESC
            LIMIT 10
        ''')
        recent = [
            {
                'id': row['id'],
                'title': row['title'],
                'celebrity_name': row['celebrity_name'],
                'channel_name': row['channel_name'],
                'thumbnail': row['thumbnail'],
                'discovered_at': row['discovered_at']
            }
            for row in cursor.fetchall()
        ]

    return {
        "success": True,
        "stats": {
            "total_celebrities": total_celebrities,
            "total_presets": total_presets,
            "monitored_channels": monitored_channels,
            "total_videos": total_videos,
            "new_videos": new_videos,
            "downloaded_videos": downloaded_videos,
            "by_content_type": by_content_type,
            "by_status": by_status
        },
        "recent_discoveries": recent
    }
|
|
|
|
|
|
@router.post("/fetch-dates")
@limiter.limit("2/minute")
@handle_exceptions
async def fetch_missing_dates(
    request: Request,
    limit: int = Query(50, ge=1, le=200),
    current_user: Dict = Depends(get_current_user)
):
    """Fetch upload dates for videos that don't have them.

    Probes up to ``limit`` of the most recently discovered videos with a
    NULL upload_date, then backfills the date (plus view count / duration
    when available) from the fetched metadata.
    """
    app_state = get_app_state()

    # Collect candidates: newest discoveries lacking an upload date.
    with app_state.db.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT id, video_id FROM celebrity_discovered_videos
            WHERE upload_date IS NULL
            ORDER BY discovered_at DESC
            LIMIT ?
        ''', (limit,))
        videos = cursor.fetchall()

    if not videos:
        return {"success": True, "message": "All videos have dates", "updated": 0}

    updated = 0
    for video in videos:
        try:
            metadata = await fetch_video_metadata(video['video_id'])
            upload_date = metadata.get('upload_date')
            if upload_date:
                # Dates arriving as YYYYMMDD are normalised to ISO-8601.
                if len(upload_date) == 8:
                    upload_date = datetime.strptime(upload_date, '%Y%m%d').isoformat()

                with app_state.db.get_connection(for_write=True) as conn:
                    cursor = conn.cursor()
                    cursor.execute('''
                        UPDATE celebrity_discovered_videos
                        SET upload_date = ?,
                            view_count = COALESCE(?, view_count),
                            duration = COALESCE(?, duration)
                        WHERE id = ?
                    ''', (upload_date, metadata.get('view_count'), metadata.get('duration'), video['id']))
                    conn.commit()
                updated += 1

            # Brief pause between lookups to avoid rate limiting.
            await asyncio.sleep(0.5)
        except Exception as e:
            # Skip failures individually; one bad video shouldn't stop the batch.
            logger.warning(f"Failed to fetch date for video {video['id']}: {e}", module="Celebrity")
            continue

    return {
        "success": True,
        "message": f"Updated {updated} of {len(videos)} videos",
        "updated": updated,
        "total_checked": len(videos)
    }
|
|
|
|
|
|
# ============================================================================
|
|
# THUMBNAIL CACHING ENDPOINTS
|
|
# ============================================================================
|
|
|
|
async def download_and_cache_thumbnail(thumbnail_url: str) -> Optional[bytes]:
    """Download a thumbnail and return the binary data for caching.

    Always returns JPEG format. Converts webp to jpg if needed.
    For YouTube, prefers jpg URL over webp.

    Returns:
        The image bytes, or ``None`` when the URL is empty, the fetch
        fails, or the server does not answer 200.
    """
    if not thumbnail_url:
        return None

    # For YouTube, convert webp URLs to jpg URLs
    url_to_fetch = thumbnail_url
    if 'ytimg.com' in thumbnail_url:
        # Convert vi_webp to vi for jpg format
        url_to_fetch = thumbnail_url.replace('/vi_webp/', '/vi/')
        # Also try to get higher quality by using hqdefault if we have sddefault
        if 'sddefault' in url_to_fetch:
            url_to_fetch = url_to_fetch.replace('sddefault', 'hqdefault')

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(
                url_to_fetch,
                headers={
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
                }
            )

        if response.status_code != 200:
            return None

        content = response.content
        content_type = response.headers.get('content-type', '')

        # Convert webp payloads to JPEG so callers can always serve image/jpeg.
        if 'webp' in content_type or url_to_fetch.endswith('.webp'):
            try:
                import io
                from PIL import Image
                image = Image.open(io.BytesIO(content))
                # JPEG has no alpha/palette; flatten to RGB first.
                if image.mode in ('RGBA', 'P'):
                    image = image.convert('RGB')
                buffer = io.BytesIO()
                image.save(buffer, format='JPEG', quality=85)
                content = buffer.getvalue()
            except Exception as e:
                # Best-effort: fall back to the raw bytes on conversion failure.
                logger.warning(f"Failed to convert webp to jpg: {e}", module="Celebrity")

        return content
    except Exception as e:
        logger.warning(f"Failed to cache thumbnail: {e}", module="Celebrity")

    return None
|
|
|
|
|
|
@router.get("/thumbnail/{video_id}")
@limiter.limit("500/minute")
@handle_exceptions
async def get_celebrity_video_thumbnail(
    request: Request,
    video_id: str,
    current_user: Dict = Depends(get_current_user)
):
    """
    Serve cached video thumbnail from database.

    Falls back to fetching and caching if not available.

    Raises:
        RecordNotFoundError: if the video row is missing, or no thumbnail
            could be obtained.
    """
    app_state = get_app_state()

    def _jpeg_response(data: bytes) -> Response:
        # All cached thumbnails are stored as JPEG (download_and_cache_thumbnail
        # converts webp). Shared here to avoid duplicating header blocks.
        return Response(
            content=data,
            media_type='image/jpeg',
            headers={
                'Cache-Control': 'public, max-age=86400',
                'Access-Control-Allow-Origin': '*'
            }
        )

    with app_state.db.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT thumbnail_data, thumbnail FROM celebrity_discovered_videos
            WHERE video_id = ?
        ''', (video_id,))
        row = cursor.fetchone()

    if not row:
        raise RecordNotFoundError("Video not found", {"video_id": video_id})

    thumbnail_data = row['thumbnail_data']
    thumbnail_url = row['thumbnail']

    # Serve cached data if available
    if thumbnail_data:
        return _jpeg_response(thumbnail_data)

    # Fetch and cache if not available
    if thumbnail_url:
        thumbnail_data = await download_and_cache_thumbnail(thumbnail_url)

        if thumbnail_data:
            # Persist so subsequent requests hit the cached branch above.
            with app_state.db.get_connection(for_write=True) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    UPDATE celebrity_discovered_videos
                    SET thumbnail_data = ?
                    WHERE video_id = ?
                ''', (thumbnail_data, video_id))
                conn.commit()

            return _jpeg_response(thumbnail_data)

    # If all else fails, return 404
    raise RecordNotFoundError("Thumbnail not available", {"video_id": video_id})
|
|
|
|
|
|
# ============================================================================
|
|
# DOWNLOAD QUEUE ENDPOINTS
|
|
# ============================================================================
|
|
|
|
@router.post("/queue/add")
@limiter.limit("60/minute")
@handle_exceptions
async def add_video_to_download_queue(
    request: Request,
    current_user: Dict = Depends(get_current_user)
):
    """
    Add a celebrity discovered video to the main download queue.

    This updates the video status to 'queued' and adds it to the video_download_queue table.

    Expects a JSON body with at least ``video_id`` and ``url``. Optional
    keys (platform, title, channel_name, thumbnail, duration, upload_date,
    view_count, max_resolution, description, source_id, source_name) are
    copied into the queue row with defaults when absent.

    Raises:
        ValidationError: if video_id/url are missing, or the video already
            exists in the queue for the same platform.
    """
    # NOTE(review): body is read as raw JSON rather than a Pydantic model,
    # so malformed payloads won't produce FastAPI's usual 422 responses.
    data = await request.json()
    app_state = get_app_state()

    video_id = data.get('video_id')
    url = data.get('url')

    if not video_id or not url:
        raise ValidationError("video_id and url are required")

    with app_state.db.get_connection(for_write=True) as conn:
        cursor = conn.cursor()

        # Check if already in queue (dedupe key: platform + video_id)
        cursor.execute('''
            SELECT id FROM video_download_queue WHERE video_id = ? AND platform = ?
        ''', (video_id, data.get('platform', 'youtube')))
        if cursor.fetchone():
            raise ValidationError("Video already exists in download queue")

        # Build metadata JSON with extra info not covered by dedicated columns
        metadata_json = json.dumps({
            'thumbnail': data.get('thumbnail', ''),
            'view_count': data.get('view_count', 0)
        })

        # Add to video_download_queue table.
        # Column order below must stay in lockstep with the VALUES tuple.
        cursor.execute('''
            INSERT INTO video_download_queue (
                platform, video_id, url, title, channel_name, thumbnail,
                duration, upload_date, view_count, max_resolution, description,
                source_type, source_id, source_name, priority, metadata
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            data.get('platform', 'youtube'),
            video_id,
            url,
            data.get('title', ''),
            data.get('channel_name', ''),
            data.get('thumbnail', ''),
            data.get('duration', 0),
            data.get('upload_date'),
            data.get('view_count', 0),
            data.get('max_resolution'),
            data.get('description', ''),
            'celebrity',  # source_type: marks the row as celebrity-discovery origin
            data.get('source_id'),
            data.get('source_name', ''),
            5,  # priority — presumably mid-priority; confirm against queue consumer
            metadata_json
        ))

        # Update celebrity_discovered_videos status so the UI shows it as queued
        cursor.execute('''
            UPDATE celebrity_discovered_videos
            SET status = 'queued', status_updated_at = CURRENT_TIMESTAMP
            WHERE video_id = ?
        ''', (video_id,))

        conn.commit()

    return {"success": True, "message": "Video added to download queue"}
|
|
|
|
|
|
@router.post("/queue/bulk-add")
@limiter.limit("30/minute")
@handle_exceptions
async def bulk_add_videos_to_download_queue(
    request: Request,
    current_user: Dict = Depends(get_current_user)
):
    """
    Add multiple celebrity discovered videos to the main download queue.

    Expects a JSON body of the form ``{"items": [...]}`` where each item
    carries the same fields accepted by the single-add endpoint (at minimum
    ``video_id`` and ``url``). Items that are invalid, already queued, or
    fail to insert are counted as skipped rather than failing the request.

    Raises:
        ValidationError: if the ``items`` array is missing or empty.
    """
    data = await request.json()
    items = data.get('items', [])

    if not items:
        raise ValidationError("items array is required")

    app_state = get_app_state()
    added_count = 0
    skipped_count = 0

    # All inserts share one write connection and a single commit at the end,
    # so either the whole accepted batch lands or none of it does.
    with app_state.db.get_connection(for_write=True) as conn:
        cursor = conn.cursor()

        for item in items:
            video_id = item.get('video_id')
            url = item.get('url')

            # Skip malformed items silently; the counts report the outcome.
            if not video_id or not url:
                skipped_count += 1
                continue

            # Check if already in queue (dedupe key: platform + video_id)
            cursor.execute('''
                SELECT id FROM video_download_queue WHERE video_id = ? AND platform = ?
            ''', (video_id, item.get('platform', 'youtube')))
            if cursor.fetchone():
                skipped_count += 1
                continue

            try:
                # Build metadata JSON with extra info not covered by dedicated columns
                metadata_json = json.dumps({
                    'thumbnail': item.get('thumbnail', ''),
                    'view_count': item.get('view_count', 0)
                })

                # Add to video_download_queue table.
                # Column order below must stay in lockstep with the VALUES tuple.
                cursor.execute('''
                    INSERT INTO video_download_queue (
                        platform, video_id, url, title, channel_name, thumbnail,
                        duration, upload_date, view_count, max_resolution, description,
                        source_type, source_id, source_name, priority, metadata
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    item.get('platform', 'youtube'),
                    video_id,
                    url,
                    item.get('title', ''),
                    item.get('channel_name', ''),
                    item.get('thumbnail', ''),
                    item.get('duration', 0),
                    item.get('upload_date'),
                    item.get('view_count', 0),
                    item.get('max_resolution'),
                    item.get('description', ''),
                    'celebrity',  # source_type: marks the row as celebrity-discovery origin
                    item.get('source_id'),
                    item.get('source_name', ''),
                    5,  # priority — presumably mid-priority; confirm against queue consumer
                    metadata_json
                ))

                # Update celebrity_discovered_videos status so the UI shows it as queued
                cursor.execute('''
                    UPDATE celebrity_discovered_videos
                    SET status = 'queued', status_updated_at = CURRENT_TIMESTAMP
                    WHERE video_id = ?
                ''', (video_id,))

                added_count += 1
            except Exception as e:
                # Per-item failures are logged and skipped; the batch continues.
                logger.warning(f"Error adding video {video_id} to queue: {e}", module="Celebrity")
                skipped_count += 1

        conn.commit()

    return {
        "success": True,
        "added_count": added_count,
        "skipped_count": skipped_count,
        "message": f"Added {added_count} videos to download queue"
    }
|