Files
Todd 49e72207bf Encrypt file paths in API URLs using Fernet tokens
Raw filesystem paths were exposed in browser URLs, dev tools, and proxy logs.
Now all file-serving endpoints accept an opaque encrypted token (t= param)
derived from the session secret via HKDF, with a 4-hour TTL.

Backend:
- Add core/path_tokens.py with Fernet encrypt/decrypt (HKDF from .session_secret)
- Add file_token to all list/gallery/feed/search responses across 7 routers
- Accept optional t= param on all file-serving endpoints (backward compatible)

Frontend:
- Update 4 URL helpers in api.ts to prefer token when available
- Add 4 new helpers for paid-content/embedded-metadata URLs
- Update all 14 page/component files to pass file_token to URL builders
- Add file_token to all relevant TypeScript interfaces

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-30 08:25:22 -04:00

1373 lines
50 KiB
Python

"""
Downloads Router
Handles all download-related endpoints:
- List downloads (with filtering and pagination)
- Download statistics
- Advanced search
- Filter options
- Analytics
- Delete downloads
"""
from typing import Dict, List, Optional
from pathlib import Path
from fastapi import APIRouter, Depends, Query, Request
from slowapi import Limiter
from slowapi.util import get_remote_address
from ..core.dependencies import get_current_user, get_app_state, require_admin
from ..core.path_tokens import encode_path
from ..core.exceptions import (
handle_exceptions,
DatabaseError,
RecordNotFoundError,
ValidationError
)
from ..core.responses import now_iso8601
from ..models.api_models import StatsResponse
from modules.universal_logger import get_logger
from ..core.utils import get_media_dimensions_batch, MEDIA_FILTERS
logger = get_logger('API')
router = APIRouter(prefix="/api/downloads", tags=["Downloads"])
limiter = Limiter(key_func=get_remote_address)
# ============================================================================
# ENDPOINTS
# ============================================================================
@router.get("/filesystem")
@limiter.limit("5000/minute")
@handle_exceptions
async def get_downloads_from_filesystem(
    request: Request,
    current_user: Dict = Depends(get_current_user),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
    platform: Optional[str] = None,
    source: Optional[str] = None,
    search: Optional[str] = None,
    media_type: Optional[str] = None,
    face_recognition: Optional[str] = None
):
    """
    Get list of downloads from file_inventory table (database-first approach).

    This endpoint provides fast access to downloaded files by querying the
    file_inventory table instead of scanning the filesystem.

    Query parameters:
        platform/source/media_type: exact-match column filters.
        search: substring match on filename, platform, source, content_type.
        face_recognition: post-query filter: 'matched', 'no_match',
            or 'not_scanned'.
        limit/offset: pagination, applied in Python AFTER all filters so
            that `total` reflects the filtered result set.

    Returns:
        {"results": [...], "total": int, "limit": int, "offset": int}.
        Each result includes `file_token`, an opaque encrypted path token
        (same scheme as the /search endpoint) so the frontend can build
        file-serving URLs without exposing raw filesystem paths.

    Raises:
        DatabaseError: If the database has not been initialized.
    """
    app_state = get_app_state()
    db = app_state.db
    if not db:
        raise DatabaseError("Database not initialized")
    downloads = []
    with db.get_connection() as conn:
        cursor = conn.cursor()
        query = '''
            SELECT
                id, file_path, filename, platform, source,
                content_type, file_size, width, height, created_date, video_id
            FROM file_inventory
            WHERE location = 'final'
        '''
        params = []
        if platform:
            query += ' AND platform = ?'
            params.append(platform)
        if source:
            query += ' AND source = ?'
            params.append(source)
        if media_type:
            query += ' AND content_type = ?'
            params.append(media_type)
        if search:
            search_term = f'%{search}%'
            query += ' AND (filename LIKE ? OR platform LIKE ? OR source LIKE ? OR content_type LIKE ?)'
            params.extend([search_term, search_term, search_term, search_term])
        query += ' ORDER BY file_inventory.created_date DESC'
        cursor.execute(query, params)
        rows = cursor.fetchall()
        # Batch fetch dimensions for items missing width/height (avoids N+1 queries)
        paths_needing_dimensions = [
            row[1] for row in rows
            if row[7] is None or row[8] is None
        ]
        dimensions_cache = get_media_dimensions_batch(paths_needing_dimensions) if paths_needing_dimensions else {}
        for row in rows:
            file_path_str = row[1]
            # Determine a display content_type from hints in the path;
            # fall back to the stored content_type column.
            path_lower = file_path_str.lower()
            if 'post' in path_lower:
                content_type_display = 'posts'
            elif 'stor' in path_lower:
                content_type_display = 'stories'
            elif 'reel' in path_lower:
                content_type_display = 'reels'
            elif 'tagged' in path_lower:
                content_type_display = 'tagged'
            else:
                content_type_display = row[5] if row[5] else 'media'
            # Use cached dimensions or existing values
            if row[7] is not None and row[8] is not None:
                width, height = row[7], row[8]
            else:
                width, height = dimensions_cache.get(file_path_str, (None, None))
            downloads.append({
                "id": row[0],
                "platform": row[3],
                "source": row[4] if row[4] else 'unknown',
                "content_type": content_type_display,
                "media_type": row[5],
                "filename": row[2],
                "file_path": file_path_str,
                # Opaque encrypted path token so raw filesystem paths never
                # appear in URLs (consistent with the /search endpoint).
                "file_token": encode_path(file_path_str) if file_path_str else None,
                "file_size": row[6] if row[6] else 0,
                "download_date": row[9] if row[9] else '',
                "status": "completed",
                "width": width,
                "height": height,
                "video_id": row[10] if len(row) > 10 else None
            })
    # Add face recognition data (batch query)
    file_paths = [item['file_path'] for item in downloads if item.get('file_path')]
    face_results = db.get_face_recognition_results_batch(file_paths) if file_paths else {}
    for item in downloads:
        file_path = item.get('file_path', '')
        face_result = face_results.get(file_path)
        if face_result:
            item['face_recognition'] = {
                'scanned': True,
                'matched': face_result['has_match'],
                'person_name': face_result.get('matched_person'),
                'confidence': face_result.get('confidence'),
                'scan_date': face_result.get('scan_date')
            }
        else:
            item['face_recognition'] = {'scanned': False}
    # Apply face recognition filter (in Python, since results live in a
    # separate table fetched above)
    if face_recognition:
        if face_recognition == 'matched':
            downloads = [d for d in downloads if d.get('face_recognition', {}).get('matched')]
        elif face_recognition == 'no_match':
            downloads = [d for d in downloads if d.get('face_recognition', {}).get('scanned') and not d.get('face_recognition', {}).get('matched')]
        elif face_recognition == 'not_scanned':
            downloads = [d for d in downloads if not d.get('face_recognition', {}).get('scanned')]
    # Apply pagination after filtering so `total` matches the filtered set
    total = len(downloads)
    downloads = downloads[offset:offset + limit]
    return {
        "results": downloads,
        "total": total,
        "limit": limit,
        "offset": offset
    }
@router.get("")
@limiter.limit("500/minute")
@handle_exceptions
async def get_downloads(
    request: Request,
    current_user: Dict = Depends(get_current_user),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
    platform: Optional[str] = None,
    source: Optional[str] = None,
    media_type: Optional[str] = Query(None, alias="mediaType")  # Accept both snake_case and camelCase
):
    """
    Get list of downloads with optional filtering.

    Returns paginated results with total count. Filters (platform, source,
    media_type) are applied identically to both the count query and the
    page query so `total` stays consistent with `results`.

    Each result includes `file_token`, an opaque encrypted path token
    (same scheme as the /search endpoint) so the frontend can build
    file-serving URLs without exposing raw filesystem paths.

    Raises:
        DatabaseError: If the database has not been initialized.
    """
    app_state = get_app_state()
    db = app_state.db
    if not db:
        raise DatabaseError("Database not initialized")
    with db.get_connection() as conn:
        cursor = conn.cursor()
        query = '''
            SELECT
                fi.id,
                fi.platform,
                fi.source,
                fi.content_type,
                fi.filename,
                fi.file_path,
                fi.file_size,
                fi.created_date as download_date,
                fi.created_date as post_date,
                NULL as status,
                fi.video_id
            FROM file_inventory fi
            WHERE fi.location = 'final'
        '''
        params = []
        if platform:
            query += ' AND fi.platform = ?'
            params.append(platform)
        if source:
            query += ' AND fi.source = ?'
            params.append(source)
        if media_type and media_type != 'all':
            query += ' AND fi.content_type = ?'
            params.append(media_type)
        # Get total count with the same filter set as the page query
        count_query = '''
            SELECT COUNT(*)
            FROM file_inventory fi
            WHERE fi.location = 'final'
        '''
        count_params = []
        if platform:
            count_query += ' AND fi.platform = ?'
            count_params.append(platform)
        if source:
            count_query += ' AND fi.source = ?'
            count_params.append(source)
        if media_type and media_type != 'all':
            count_query += ' AND fi.content_type = ?'
            count_params.append(media_type)
        cursor.execute(count_query, count_params)
        total_count = cursor.fetchone()[0]
        query += " ORDER BY fi.created_date DESC LIMIT ? OFFSET ?"
        params.extend([limit, offset])
        cursor.execute(query, params)
        rows = cursor.fetchall()
        results = [
            {
                "id": row[0],
                "platform": row[1],
                "source": row[2],
                "content_type": row[3],
                "filename": row[4],
                "file_path": row[5],
                # Opaque encrypted path token so raw filesystem paths never
                # appear in URLs (consistent with the /search endpoint).
                "file_token": encode_path(row[5]) if row[5] else None,
                "file_size": row[6],
                "download_date": row[7],
                "post_date": row[8],
                "status": row[9],
                "video_id": row[10] if len(row) > 10 else None
            }
            for row in rows
        ]
        return {
            "results": results,
            "total": total_count,
            "limit": limit,
            "offset": offset
        }
@router.get("/stats", response_model=StatsResponse)
@limiter.limit("100/minute")
@handle_exceptions
async def get_download_stats(request: Request, current_user: Dict = Depends(get_current_user)):
    """
    Get download statistics.

    Aggregates totals across file_inventory (final + review), recycle_bin,
    downloads, and video_downloads, plus duplicate counts from SHA and
    perceptual hashes. Results are cached for 5 minutes for performance.

    Raises:
        DatabaseError: If the database has not been initialized.
    """
    from cache_manager import cache_manager
    app_state = get_app_state()
    db = app_state.db
    if not db:
        raise DatabaseError("Database not initialized")
    # Try cache first
    cache_key = "stats:download_stats"
    cached_result = cache_manager.get(cache_key)
    if cached_result:
        return StatsResponse(**cached_result)
    with db.get_connection() as conn:
        cursor = conn.cursor()
        # Total downloads - combine file_inventory (final+review) with recycle_bin table
        cursor.execute("""
            SELECT
                (SELECT COUNT(*) FROM file_inventory WHERE location IN ('final', 'review')) +
                (SELECT COUNT(*) FROM recycle_bin)
        """)
        total = cursor.fetchone()[0]
        # By platform - use file_inventory for accurate counts (includes all sources)
        cursor.execute("""
            SELECT platform, COUNT(*) as cnt
            FROM file_inventory
            WHERE location IN ('final', 'review') AND platform IS NOT NULL
            GROUP BY platform
            ORDER BY cnt DESC
        """)
        by_platform = {row[0]: row[1] for row in cursor.fetchall()}
        # Total size from file_inventory (final + review)
        cursor.execute("""
            SELECT COALESCE(SUM(file_size), 0) FROM file_inventory
            WHERE location IN ('final', 'review')
        """)
        total_size = cursor.fetchone()[0]
        # Recent 24h - combine downloads and video_downloads
        cursor.execute(f"""
            SELECT
                (SELECT COUNT(*) FROM downloads WHERE download_date >= datetime('now', '-1 day') AND {MEDIA_FILTERS}) +
                (SELECT COUNT(*) FROM video_downloads WHERE download_date >= datetime('now', '-1 day'))
        """)
        recent_24h = cursor.fetchone()[0]
        # Duplicates prevented (SHA hash + perceptual hash)
        cursor.execute(f"""
            SELECT COUNT(*) FROM (
                SELECT file_hash FROM downloads
                WHERE file_hash IS NOT NULL
                AND {MEDIA_FILTERS}
                GROUP BY file_hash
                HAVING COUNT(*) > 1
            )
        """)
        hash_duplicates = cursor.fetchone()[0]
        # Instagram perceptual hash duplicates (table may not exist on older
        # databases, so failures are logged at debug level and skipped)
        perceptual_duplicates = 0
        try:
            cursor.execute("""
                SELECT COUNT(*) FROM (
                    SELECT perceptual_hash FROM instagram_perceptual_hashes
                    WHERE perceptual_hash IS NOT NULL
                    AND perceptual_hash != ''
                    GROUP BY perceptual_hash
                    HAVING COUNT(*) > 1
                )
            """)
            perceptual_duplicates += cursor.fetchone()[0]
        except Exception as e:
            logger.debug(f"Could not count instagram perceptual duplicates: {e}", module="Downloads")
        # Paid content perceptual hash duplicates (same best-effort handling)
        try:
            cursor.execute("""
                SELECT COUNT(*) FROM (
                    SELECT perceptual_hash FROM paid_content_attachments
                    WHERE perceptual_hash IS NOT NULL
                    AND perceptual_hash != ''
                    AND status = 'completed'
                    GROUP BY perceptual_hash
                    HAVING COUNT(*) > 1
                )
            """)
            perceptual_duplicates += cursor.fetchone()[0]
        except Exception as e:
            logger.debug(f"Could not count paid content perceptual duplicates: {e}", module="Downloads")
        duplicates = hash_duplicates + perceptual_duplicates
        # Review queue count
        cursor.execute("""
            SELECT COUNT(*) FROM file_inventory
            WHERE location = 'review'
        """)
        review_queue_count = cursor.fetchone()[0]
        # Recycle bin count
        cursor.execute("SELECT COUNT(*) FROM recycle_bin")
        recycle_bin_count = cursor.fetchone()[0]
    result = StatsResponse(
        total_downloads=total,
        by_platform=by_platform,
        total_size=total_size,
        recent_24h=recent_24h,
        duplicates_prevented=duplicates,
        review_queue_count=review_queue_count,
        recycle_bin_count=recycle_bin_count
    )
    # Cache for 5 minutes. The TTL is now explicit (ttl=300) to match the
    # documented behavior and the other cached endpoints in this router,
    # instead of relying on the cache manager's default.
    cache_manager.set(cache_key, result.model_dump(), ttl=300)
    return result
@router.get("/filter-options")
@router.get("/filters")  # Alias for backwards compatibility
@limiter.limit("100/minute")
@handle_exceptions
async def get_filter_options(
    request: Request,
    platform: Optional[str] = Query(None, description="Filter sources by platform"),
    current_user: Dict = Depends(get_current_user)
):
    """
    Return the filter options available for the downloads views.

    Distinct platforms, sources, and content types are read from the
    file_inventory table (final location only); the source list can be
    narrowed to a single platform. Each response is cached for five
    minutes under a per-platform cache key.
    """
    from cache_manager import cache_manager
    # Serve from cache when possible (5 minute TTL, keyed by platform)
    cache_key = f"filter_options:{platform or 'all'}"
    hit = cache_manager.get(cache_key)
    if hit:
        return hit
    app_state = get_app_state()
    db = app_state.db
    if not db:
        raise DatabaseError("Database not initialized")
    with db.get_connection() as conn:
        cursor = conn.cursor()
        # Distinct platforms present in the final library
        cursor.execute("""
            SELECT DISTINCT platform FROM file_inventory
            WHERE location = 'final' AND platform IS NOT NULL
            ORDER BY platform
        """)
        platforms = [value for (value,) in cursor.fetchall()]
        # Distinct sources, optionally narrowed to one platform
        if platform:
            cursor.execute("""
                SELECT DISTINCT source FROM file_inventory
                WHERE location = 'final' AND source IS NOT NULL AND platform = ?
                ORDER BY source
            """, (platform,))
        else:
            cursor.execute("""
                SELECT DISTINCT source FROM file_inventory
                WHERE location = 'final' AND source IS NOT NULL
                ORDER BY source
            """)
        sources = [value for (value,) in cursor.fetchall()]
        # Distinct content types
        cursor.execute("""
            SELECT DISTINCT content_type FROM file_inventory
            WHERE location = 'final' AND content_type IS NOT NULL
            ORDER BY content_type
        """)
        content_types = [value for (value,) in cursor.fetchall()]
    options = {
        "platforms": platforms,
        "sources": sources,
        "content_types": content_types,
        "media_types": ["image", "video"],  # Static list of valid media types
        "sort_fields": ["download_date", "post_date", "file_size", "filename"],
        "sort_orders": ["asc", "desc"]
    }
    # Cache for 5 minutes
    cache_manager.set(cache_key, options, ttl=300)
    return options
@router.get("/search")
@limiter.limit("5000/minute")
@handle_exceptions
async def advanced_search_downloads(
    request: Request,
    current_user: Dict = Depends(get_current_user),
    limit: int = Query(100, ge=1, le=1000, description="Max items to return"),
    offset: int = Query(0, ge=0, description="Number of items to skip"),
    query: Optional[str] = Query(None, max_length=500, description="Search term"),
    platforms: Optional[str] = None,
    sources: Optional[str] = None,
    content_types: Optional[str] = None,
    media_type: Optional[str] = None,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
    size_min: Optional[int] = Query(None, ge=0),
    size_max: Optional[int] = Query(None, ge=0),
    sort_by: str = Query("download_date", pattern="^(download_date|file_size|filename|source|post_date)$"),
    sort_order: str = Query("desc", pattern="^(asc|desc)$")
):
    """
    Advanced search with comprehensive filtering and sorting.

    Supports:
    - Text search in filename and source
    - Multiple platform/source filters (comma-separated)
    - Date range filtering
    - File size range filtering
    - Configurable sorting

    Results include `file_token`, an opaque encrypted path token used by
    the frontend to build file-serving URLs without exposing raw paths.
    """
    app_state = get_app_state()
    db = app_state.db
    if not db:
        raise DatabaseError("Database not initialized")
    with db.get_connection() as conn:
        cursor = conn.cursor()
        # Only show files from final location
        location_filter = "fi.location = 'final'"
        # The LEFT JOIN aggregates the `downloads` table by filename so the
        # most recent download/post dates can be attached to each inventory
        # row; COALESCE falls back to the inventory created_date.
        base_query = f"""
            SELECT fi.id, fi.platform, fi.source, fi.content_type, fi.filename, fi.file_path,
                fi.file_size,
                COALESCE(d_agg.max_download_date, fi.created_date) as download_date,
                d_agg.max_post_date as post_date,
                NULL as status,
                fi.width, fi.height
            FROM file_inventory fi
            LEFT JOIN (
                SELECT filename, MAX(download_date) as max_download_date, MAX(post_date) as max_post_date
                FROM downloads
                GROUP BY filename
            ) d_agg ON d_agg.filename = fi.filename
            WHERE {location_filter}
        """
        conditions = []
        params = []
        # Text search (substring match on filename OR source)
        if query:
            conditions.append("(fi.filename LIKE ? OR fi.source LIKE ?)")
            search_term = f"%{query}%"
            params.extend([search_term, search_term])
        # Platform filter (comma-separated list -> IN clause)
        if platforms:
            platform_list = [p.strip() for p in platforms.split(',')]
            placeholders = ','.join(['?'] * len(platform_list))
            conditions.append(f"fi.platform IN ({placeholders})")
            params.extend(platform_list)
        # Source filter (comma-separated list -> IN clause)
        if sources:
            source_list = [s.strip() for s in sources.split(',')]
            placeholders = ','.join(['?'] * len(source_list))
            conditions.append(f"fi.source IN ({placeholders})")
            params.extend(source_list)
        # Content type filter (comma-separated list -> IN clause)
        if content_types:
            ct_list = [c.strip() for c in content_types.split(',')]
            placeholders = ','.join(['?'] * len(ct_list))
            conditions.append(f"fi.content_type IN ({placeholders})")
            params.extend(ct_list)
        # Media type filter (single exact match on content_type)
        if media_type:
            conditions.append("fi.content_type = ?")
            params.append(media_type)
        # Date range (compared against inventory created_date)
        if date_from:
            conditions.append("fi.created_date >= ?")
            params.append(date_from)
        if date_to:
            conditions.append("fi.created_date <= ?")
            params.append(date_to)
        # Size range (bytes)
        if size_min is not None:
            conditions.append("fi.file_size >= ?")
            params.append(size_min)
        if size_max is not None:
            conditions.append("fi.file_size <= ?")
            params.append(size_max)
        # Build full query
        if conditions:
            base_query += " AND " + " AND ".join(conditions)
        # Get total count. NOTE: the count query intentionally reuses `params`
        # as-is; it is executed BEFORE the LIMIT/OFFSET values are appended
        # below, so the parameter lists line up — do not reorder these steps.
        count_query = f"SELECT COUNT(*) FROM file_inventory fi WHERE {location_filter}"
        if conditions:
            count_query += " AND " + " AND ".join(conditions)
        cursor.execute(count_query, params)
        total = cursor.fetchone()[0]
        # Add sorting. Sort columns are taken from a fixed whitelist map
        # (never from user input directly) to avoid SQL injection.
        sort_column_map = {
            'download_date': 'download_date',
            'file_size': 'fi.file_size',
            'filename': 'fi.filename',
            'source': 'fi.source',
            'post_date': 'post_date'
        }
        sort_col = sort_column_map.get(sort_by, 'download_date')
        # Defense-in-depth: sort_order is already constrained by the Query
        # pattern above, but re-validate before interpolating into SQL.
        if sort_order.upper() not in ('ASC', 'DESC'):
            sort_order = 'desc'
        base_query += f" ORDER BY {sort_col} {sort_order.upper()}"
        # Add pagination (params appended only now — see count-query note)
        base_query += " LIMIT ? OFFSET ?"
        params.extend([limit, offset])
        cursor.execute(base_query, params)
        rows = cursor.fetchall()
        results = [
            {
                "id": row[0],
                "platform": row[1],
                "source": row[2],
                "content_type": row[3],
                "filename": row[4],
                "file_path": row[5],
                # Opaque encrypted path token for file-serving URLs
                "file_token": encode_path(row[5]) if row[5] else None,
                "file_size": row[6],
                "download_date": row[7],
                "post_date": row[8],
                "status": row[9],
                "width": row[10],
                "height": row[11]
            }
            for row in rows
        ]
        return {
            "results": results,
            "total": total,
            "limit": limit,
            "offset": offset,
            "has_more": offset + limit < total
        }
@router.delete("/{download_id}")
@limiter.limit("100/minute")
@handle_exceptions
async def delete_download(
    download_id: int,
    request: Request,
    current_user: Dict = Depends(require_admin)  # Security: Require admin for delete operations
):
    """
    Delete a download record (admin only).

    Note: This only removes the database record, not the actual file.
    Use the media/batch-delete endpoint to delete files.

    Raises:
        DatabaseError: If the database has not been initialized.
        RecordNotFoundError: If no downloads row has the given id.
    """
    app_state = get_app_state()
    db = app_state.db
    if not db:
        raise DatabaseError("Database not initialized")
    with db.get_connection(for_write=True) as conn:
        cursor = conn.cursor()
        # Check existence first so the client gets an explicit not-found
        # error instead of a silent no-op delete.
        cursor.execute("SELECT id FROM downloads WHERE id = ?", (download_id,))
        if not cursor.fetchone():
            # Plain string literal (was an f-string with no placeholders)
            raise RecordNotFoundError(
                "Download not found",
                {"id": download_id}
            )
        # Delete the record
        cursor.execute("DELETE FROM downloads WHERE id = ?", (download_id,))
        conn.commit()
    logger.info(f"Deleted download record {download_id}", module="Downloads")
    return {
        "success": True,
        "message": f"Download {download_id} deleted",
        "timestamp": now_iso8601()
    }
# ============================================================================
# DAY-GROUPED ENDPOINT FOR NEW DOWNLOADS PAGE
# ============================================================================
@router.get("/by-day")
@limiter.limit("5000/minute")
@handle_exceptions
async def get_downloads_by_day(
    request: Request,
    current_user: Dict = Depends(get_current_user),
    location: str = Query('media', description="Location filter: all, media, review, or recycle"),
    platform: Optional[str] = None,
    source: Optional[str] = None,
    limit_days: int = Query(7, ge=1, le=90, description="Max days to return"),
    offset_date: Optional[str] = None,
    items_per_day: int = Query(100, ge=1, le=500, description="Max items per day"),
    date_from: Optional[str] = Query(None, pattern="^\\d{4}-\\d{2}-\\d{2}$", description="Filter by date from (YYYY-MM-DD)"),
    date_to: Optional[str] = Query(None, pattern="^\\d{4}-\\d{2}-\\d{2}$", description="Filter by date to (YYYY-MM-DD)"),
    size_min: Optional[int] = Query(None, ge=0, description="Minimum file size in bytes"),
    size_max: Optional[int] = Query(None, ge=0, description="Maximum file size in bytes"),
    search: Optional[str] = Query(None, max_length=200, description="Search filename, platform, or source")
):
    """
    Get downloads grouped by day for the new Downloads page UI.

    Returns data organized by day with thumbnails for each day's downloads.
    Supports all, media (final), review, and recycle bin locations.

    Each item includes `file_token`, an opaque encrypted path token (same
    scheme as the /search endpoint) so the frontend can build file-serving
    URLs without exposing raw filesystem paths. For recycle items the token
    encodes the original (pre-delete) path stored in recycle_bin.

    Raises:
        DatabaseError: If the database has not been initialized.
        ValidationError: If `location` is not one of the allowed values.
    """
    app_state = get_app_state()
    db = app_state.db
    if not db:
        raise DatabaseError("Database not initialized")
    if location not in ('all', 'media', 'review', 'recycle'):
        raise ValidationError("Location must be 'all', 'media', 'review', or 'recycle'")
    days_data = []
    with db.get_connection() as conn:
        cursor = conn.cursor()
        # Build platform/source filter conditions
        # Use fi. prefix for file_inventory queries (aliased as fi)
        platform_filter_inventory = ''
        platform_filter_recycle = ''
        source_filter_inventory = ''
        source_filter_recycle = ''
        size_filter_inventory = ''
        size_filter_recycle = ''
        search_filter_inventory = ''
        search_filter_recycle = ''
        params = []
        if platform:
            platform_filter_inventory = ' AND fi.platform = ?'
            platform_filter_recycle = " AND json_extract(metadata, '$.platform') = ?"
            params.append(platform)
        if source:
            source_filter_inventory = ' AND fi.source = ?'
            source_filter_recycle = " AND json_extract(metadata, '$.source') = ?"
            params.append(source)
        if size_min is not None:
            size_filter_inventory += ' AND fi.file_size >= ?'
            size_filter_recycle += ' AND file_size >= ?'
            params.append(size_min)
        if size_max is not None:
            size_filter_inventory += ' AND fi.file_size <= ?'
            size_filter_recycle += ' AND file_size <= ?'
            params.append(size_max)
        if search:
            search_filter_inventory = ' AND (fi.filename LIKE ? OR fi.platform LIKE ? OR fi.source LIKE ?)'
            search_filter_recycle = ' AND (original_filename LIKE ? OR json_extract(metadata, \'$.platform\') LIKE ? OR json_extract(metadata, \'$.source\') LIKE ?)'
            params.extend([f'%{search}%', f'%{search}%', f'%{search}%'])
        if location == 'all':
            # UNION query combining all three sources with location indicator
            base_query = f'''
                SELECT
                    'media_' || CAST(fi.id AS TEXT) as id, fi.file_path, fi.filename, fi.platform, fi.source,
                    fi.content_type, fi.file_size, fi.width, fi.height,
                    fi.created_date as item_date,
                    NULL as deleted_from, 'media' as location_type,
                    fi.created_date as post_date,
                    fi.video_id
                FROM file_inventory fi
                WHERE fi.location = 'final' {platform_filter_inventory} {source_filter_inventory} {size_filter_inventory} {search_filter_inventory}
                UNION ALL
                SELECT
                    'review_' || CAST(fi.id AS TEXT) as id, fi.file_path, fi.filename, fi.platform, fi.source,
                    fi.content_type, fi.file_size, fi.width, fi.height,
                    fi.created_date as item_date,
                    NULL as deleted_from, 'review' as location_type,
                    fi.created_date as post_date,
                    fi.video_id
                FROM file_inventory fi
                WHERE fi.location = 'review' {platform_filter_inventory} {source_filter_inventory} {size_filter_inventory} {search_filter_inventory}
                UNION ALL
                SELECT
                    'recycle_' || CAST(id AS TEXT) as id, original_path, original_filename,
                    json_extract(metadata, '$.platform') as platform,
                    json_extract(metadata, '$.source') as source,
                    json_extract(metadata, '$.content_type') as content_type,
                    file_size,
                    json_extract(metadata, '$.width') as width,
                    json_extract(metadata, '$.height') as height,
                    deleted_at as item_date, deleted_from, 'recycle' as location_type,
                    json_extract(metadata, '$.post_date') as post_date,
                    json_extract(metadata, '$.video_id') as video_id
                FROM recycle_bin
                WHERE 1=1 {platform_filter_recycle} {source_filter_recycle} {size_filter_recycle} {search_filter_recycle}
            '''
            # Duplicate params for each part of the UNION that uses them.
            # The order here MUST mirror the order in which the filter
            # snippets were appended above, once per UNION branch.
            all_params = []
            if platform:
                all_params.append(platform)  # media
            if source:
                all_params.append(source)  # media
            if size_min is not None:
                all_params.append(size_min)  # media
            if size_max is not None:
                all_params.append(size_max)  # media
            if search:
                all_params.extend([f'%{search}%', f'%{search}%', f'%{search}%'])  # media (3 params for OR)
            if platform:
                all_params.append(platform)  # review
            if source:
                all_params.append(source)  # review
            if size_min is not None:
                all_params.append(size_min)  # review
            if size_max is not None:
                all_params.append(size_max)  # review
            if search:
                all_params.extend([f'%{search}%', f'%{search}%', f'%{search}%'])  # review (3 params for OR)
            if platform:
                all_params.append(platform)  # recycle
            if source:
                all_params.append(source)  # recycle
            if size_min is not None:
                all_params.append(size_min)  # recycle
            if size_max is not None:
                all_params.append(size_max)  # recycle
            if search:
                all_params.extend([f'%{search}%', f'%{search}%', f'%{search}%'])  # recycle (3 params for OR)
            params = all_params
            date_field = 'item_date'
        elif location == 'recycle':
            # Query recycle_bin table - platform/source are in JSON metadata column
            base_query = f'''
                SELECT
                    id, original_path, original_filename,
                    json_extract(metadata, '$.platform') as platform,
                    json_extract(metadata, '$.source') as source,
                    json_extract(metadata, '$.content_type') as content_type,
                    file_size,
                    json_extract(metadata, '$.width') as width,
                    json_extract(metadata, '$.height') as height,
                    deleted_at, deleted_from, 'recycle' as location_type,
                    json_extract(metadata, '$.post_date') as post_date,
                    json_extract(metadata, '$.video_id') as video_id
                FROM recycle_bin
                WHERE 1=1 {platform_filter_recycle} {source_filter_recycle} {size_filter_recycle} {search_filter_recycle}
            '''
            date_field = 'deleted_at'
        elif location == 'review':
            # Query file_inventory with location='review'
            base_query = f'''
                SELECT
                    fi.id, fi.file_path, fi.filename, fi.platform, fi.source,
                    fi.content_type, fi.file_size, fi.width, fi.height,
                    fi.created_date as item_date,
                    NULL as deleted_from, 'review' as location_type,
                    fi.created_date as post_date,
                    fi.video_id
                FROM file_inventory fi
                WHERE fi.location = 'review' {platform_filter_inventory} {source_filter_inventory} {size_filter_inventory} {search_filter_inventory}
            '''
            date_field = 'item_date'
        else:
            # Query file_inventory with location='final' (media)
            base_query = f'''
                SELECT
                    fi.id, fi.file_path, fi.filename, fi.platform, fi.source,
                    fi.content_type, fi.file_size, fi.width, fi.height,
                    fi.created_date as item_date,
                    NULL as deleted_from, 'media' as location_type,
                    fi.created_date as post_date,
                    fi.video_id
                FROM file_inventory fi
                WHERE fi.location = 'final' {platform_filter_inventory} {source_filter_inventory} {size_filter_inventory} {search_filter_inventory}
            '''
            date_field = 'item_date'
        # Wrap base query for easier date extraction
        if location == 'all':
            # For UNION, wrap the whole query
            wrapped_query = f'SELECT * FROM ({base_query}) AS combined'
            date_col = 'item_date'
        else:
            wrapped_query = base_query
            date_col = date_field
        # Build offset date filter and date range filters
        date_filter = ''
        date_filter_params = []
        if offset_date:
            date_filter += f' AND DATE({date_col}) < DATE(?)'
            date_filter_params.append(offset_date)
        if date_from:
            date_filter += f' AND DATE({date_col}) >= DATE(?)'
            date_filter_params.append(date_from)
        if date_to:
            date_filter += f' AND DATE({date_col}) <= DATE(?)'
            date_filter_params.append(date_to)
        # Get distinct dates first (newest day first, capped at limit_days)
        date_query = f'''
            SELECT DISTINCT DATE({date_col}) as day
            FROM ({wrapped_query}) AS q
            WHERE {date_col} IS NOT NULL {date_filter}
            ORDER BY day DESC
            LIMIT ?
        '''
        params_for_dates = params.copy() + date_filter_params + [limit_days]
        cursor.execute(date_query, params_for_dates)
        distinct_dates = [str(row[0]) for row in cursor.fetchall()]
        # For each date, get the items
        for day_str in distinct_dates:
            if not day_str:
                continue
            day_query = f'''
                SELECT * FROM ({wrapped_query}) AS q
                WHERE DATE({date_col}) = DATE(?)
                ORDER BY {date_col} DESC
                LIMIT ?
            '''
            day_params = params.copy() + [day_str, items_per_day]
            cursor.execute(day_query, day_params)
            rows = cursor.fetchall()
            # Count total for this day (may exceed the items_per_day cap)
            count_query = f'''
                SELECT COUNT(*) FROM ({wrapped_query}) AS q WHERE DATE({date_col}) = DATE(?)
            '''
            cursor.execute(count_query, params + [day_str])
            day_count = cursor.fetchone()[0]
            items = []
            for row in rows:
                file_path = row[1]
                # Determine media type from filename extension
                filename_lower = (row[2] or '').lower()
                if any(filename_lower.endswith(ext) for ext in ['.mp4', '.mov', '.webm', '.avi', '.mkv', '.flv', '.m4v']):
                    media_type = 'video'
                else:
                    media_type = 'image'
                # Get location_type from row[11] (added to all queries)
                item_location = row[11] if len(row) > 11 else location
                # Get post_date from row[12] (added to all queries)
                post_date = row[12] if len(row) > 12 else None
                # Get video_id from row[13] (added to all queries)
                video_id = row[13] if len(row) > 13 else None
                items.append({
                    "id": row[0],
                    "file_path": file_path,
                    # Opaque encrypted path token for file-serving URLs
                    # (consistent with the /search endpoint)
                    "file_token": encode_path(file_path) if file_path else None,
                    "filename": row[2],
                    "platform": row[3],
                    "source": row[4] or 'unknown',
                    "content_type": row[5],
                    "media_type": media_type,
                    "file_size": row[6] or 0,
                    "width": row[7],
                    "height": row[8],
                    "download_date": row[9],
                    "post_date": post_date,
                    "deleted_from": row[10] if item_location == 'recycle' else None,
                    "location_type": item_location,
                    "video_id": video_id
                })
            # Calculate per-day summary from items
            day_by_location = {"media": 0, "review": 0, "recycle": 0}
            day_by_platform = {}
            for item in items:
                loc = item.get("location_type", "media")
                day_by_location[loc] = day_by_location.get(loc, 0) + 1
                plat = item.get("platform")
                if plat:
                    day_by_platform[plat] = day_by_platform.get(plat, 0) + 1
            # Sort platforms by count
            day_by_platform = dict(sorted(day_by_platform.items(), key=lambda x: -x[1]))
            days_data.append({
                "date": day_str,
                "count": day_count,
                "items": items,
                "summary": {
                    "by_location": day_by_location,
                    "by_platform": day_by_platform
                }
            })
    # Calculate totals
    total_days = len(days_data)
    total_items = sum(d['count'] for d in days_data)
    # Get summary statistics (total counts across all data, not just loaded days)
    summary = {"by_location": {}, "by_platform": {}}
    with db.get_connection() as conn:
        cursor = conn.cursor()
        # Get counts by location
        cursor.execute("SELECT COUNT(*) FROM file_inventory WHERE location = 'final'")
        summary["by_location"]["media"] = cursor.fetchone()[0]
        cursor.execute("SELECT COUNT(*) FROM file_inventory WHERE location = 'review'")
        summary["by_location"]["review"] = cursor.fetchone()[0]
        cursor.execute("SELECT COUNT(*) FROM recycle_bin")
        summary["by_location"]["recycle"] = cursor.fetchone()[0]
        # Get platform breakdown based on current location filter
        if location == 'all':
            # Combine all platforms
            cursor.execute("""
                SELECT platform, COUNT(*) as count FROM file_inventory
                WHERE platform IS NOT NULL
                GROUP BY platform
            """)
            platform_counts = {row[0]: row[1] for row in cursor.fetchall()}
            cursor.execute("""
                SELECT json_extract(metadata, '$.platform') as platform, COUNT(*) as count
                FROM recycle_bin
                WHERE json_extract(metadata, '$.platform') IS NOT NULL
                GROUP BY platform
            """)
            for row in cursor.fetchall():
                if row[0]:
                    platform_counts[row[0]] = platform_counts.get(row[0], 0) + row[1]
            summary["by_platform"] = dict(sorted(platform_counts.items(), key=lambda x: -x[1]))
        elif location == 'recycle':
            cursor.execute("""
                SELECT json_extract(metadata, '$.platform') as platform, COUNT(*) as count
                FROM recycle_bin
                WHERE json_extract(metadata, '$.platform') IS NOT NULL
                GROUP BY platform
                ORDER BY count DESC
            """)
            summary["by_platform"] = {row[0]: row[1] for row in cursor.fetchall() if row[0]}
        else:
            loc = 'review' if location == 'review' else 'final'
            cursor.execute("""
                SELECT platform, COUNT(*) as count FROM file_inventory
                WHERE location = ? AND platform IS NOT NULL
                GROUP BY platform
                ORDER BY count DESC
            """, (loc,))
            summary["by_platform"] = {row[0]: row[1] for row in cursor.fetchall()}
    return {
        "days": days_data,
        "total_days": total_days,
        "total_items": total_items,
        "location": location,
        # A full page of distinct dates implies more days may exist
        "has_more": len(distinct_dates) == limit_days,
        "summary": summary
    }
@router.get("/by-day/filters")
@limiter.limit("100/minute")
@handle_exceptions
async def get_by_day_filters(
    request: Request,
    location: str = Query('media', description="Location filter: all, media, review, or recycle"),
    platform: Optional[str] = None,
    current_user: Dict = Depends(get_current_user)
):
    """
    Return the distinct filter choices (platforms and sources) available
    for the day-grouped downloads view at the requested location.

    When a platform is supplied, the source list is narrowed to entries
    belonging to that platform. Raises ValidationError for an unknown
    location and DatabaseError when the database is unavailable.
    """
    state = get_app_state()
    database = state.db
    if not database:
        raise DatabaseError("Database not initialized")
    if location not in ('all', 'media', 'review', 'recycle'):
        raise ValidationError("Location must be 'all', 'media', 'review', or 'recycle'")

    with database.get_connection() as conn:
        cur = conn.cursor()

        def distinct_values(sql, args=(), skip_empty=False):
            # Run a single-column query and collect the values; optionally
            # drop NULL/empty entries (used where JSON extraction can yield
            # empty strings).
            cur.execute(sql, args)
            values = [r[0] for r in cur.fetchall()]
            return [v for v in values if v] if skip_empty else values

        if location == 'all':
            # Merge options from the live inventory and the recycle bin.
            platforms = distinct_values("""
                SELECT DISTINCT platform FROM file_inventory
                WHERE platform IS NOT NULL
                UNION
                SELECT DISTINCT json_extract(metadata, '$.platform') as platform
                FROM recycle_bin
                WHERE json_extract(metadata, '$.platform') IS NOT NULL
                ORDER BY platform
            """, skip_empty=True)
            if platform:
                sources = distinct_values("""
                    SELECT DISTINCT source FROM file_inventory
                    WHERE source IS NOT NULL AND platform = ?
                    UNION
                    SELECT DISTINCT json_extract(metadata, '$.source') as source
                    FROM recycle_bin
                    WHERE json_extract(metadata, '$.source') IS NOT NULL
                    AND json_extract(metadata, '$.platform') = ?
                    ORDER BY source
                """, (platform, platform), skip_empty=True)
            else:
                sources = distinct_values("""
                    SELECT DISTINCT source FROM file_inventory
                    WHERE source IS NOT NULL
                    UNION
                    SELECT DISTINCT json_extract(metadata, '$.source') as source
                    FROM recycle_bin
                    WHERE json_extract(metadata, '$.source') IS NOT NULL
                    ORDER BY source
                """, skip_empty=True)
        elif location == 'recycle':
            # Recycle bin keeps platform/source inside the JSON metadata column.
            platforms = distinct_values("""
                SELECT DISTINCT json_extract(metadata, '$.platform') as platform
                FROM recycle_bin
                WHERE json_extract(metadata, '$.platform') IS NOT NULL
                ORDER BY platform
            """, skip_empty=True)
            if platform:
                sources = distinct_values("""
                    SELECT DISTINCT json_extract(metadata, '$.source') as source
                    FROM recycle_bin
                    WHERE json_extract(metadata, '$.source') IS NOT NULL
                    AND json_extract(metadata, '$.platform') = ?
                    ORDER BY source
                """, (platform,), skip_empty=True)
            else:
                sources = distinct_values("""
                    SELECT DISTINCT json_extract(metadata, '$.source') as source
                    FROM recycle_bin
                    WHERE json_extract(metadata, '$.source') IS NOT NULL
                    ORDER BY source
                """, skip_empty=True)
        else:
            # 'media' and 'review' both live in file_inventory; they differ
            # only by the stored location value.
            inv_location = 'review' if location == 'review' else 'final'
            platforms = distinct_values("""
                SELECT DISTINCT platform FROM file_inventory
                WHERE location = ? AND platform IS NOT NULL
                ORDER BY platform
            """, (inv_location,))
            if platform:
                sources = distinct_values("""
                    SELECT DISTINCT source FROM file_inventory
                    WHERE location = ? AND source IS NOT NULL AND platform = ?
                    ORDER BY source
                """, (inv_location, platform))
            else:
                sources = distinct_values("""
                    SELECT DISTINCT source FROM file_inventory
                    WHERE location = ? AND source IS NOT NULL
                    ORDER BY source
                """, (inv_location,))

    return dict(platforms=platforms, sources=sources, location=location)
# ============================================================================
# ANALYTICS ENDPOINT
# ============================================================================
@router.get("/analytics")
@limiter.limit("5000/minute")
@handle_exceptions
async def get_download_analytics(
    request: Request,
    current_user: Dict = Depends(get_current_user)
):
    """
    Get advanced analytics and statistics - cached for 5 minutes.

    Returns a dict with:
      - downloads_per_day: daily counts for the last 30 days
      - file_types: count / total size grouped into image/video/audio/other
      - storage_by_platform: per-platform count, total and average size
        from the 'final' file inventory (empty list on query failure)
      - top_sources: 10 most-downloaded (source, platform) pairs
      - hourly_distribution: per-hour counts over the last 7 days
      - weekly_comparison: this_week / last_week counts and growth_rate (%)

    The assembled payload is stored in cache_manager under
    "stats:analytics" and served from cache while the entry is valid.
    """
    from cache_manager import cache_manager
    app_state = get_app_state()

    # Serve a cached payload when available to keep this endpoint cheap.
    cache_key = "stats:analytics"
    cached_result = cache_manager.get(cache_key)
    if cached_result:
        return cached_result

    with app_state.db.get_connection() as conn:
        cursor = conn.cursor()

        # Downloads per day (last 30 days) - use shared MEDIA_FILTERS constant
        cursor.execute(f"""
            SELECT DATE(download_date) as date, COUNT(*) as count
            FROM downloads
            WHERE download_date >= datetime('now', '-30 days')
            AND {MEDIA_FILTERS}
            GROUP BY DATE(download_date)
            ORDER BY date DESC
        """)
        downloads_per_day = [{'date': str(row[0]), 'count': row[1]} for row in cursor.fetchall()]

        # File type breakdown, classified by filename extension.
        cursor.execute(f"""
            SELECT
                CASE
                    WHEN filename LIKE '%.jpg' OR filename LIKE '%.jpeg' OR
                         filename LIKE '%.png' OR filename LIKE '%.gif' OR
                         filename LIKE '%.heic' OR filename LIKE '%.heif' THEN 'image'
                    WHEN filename LIKE '%.mp4' OR filename LIKE '%.mov' OR
                         filename LIKE '%.webm' OR filename LIKE '%.avi' OR
                         filename LIKE '%.mkv' OR filename LIKE '%.flv' THEN 'video'
                    WHEN filename LIKE '%.m4a' OR filename LIKE '%.mp3' THEN 'audio'
                    ELSE 'other'
                END as type,
                COUNT(*) as count,
                SUM(file_size) as total_size
            FROM downloads
            WHERE {MEDIA_FILTERS}
            GROUP BY type
        """)
        # SUM(file_size) is NULL when all sizes are NULL; coerce to 0.
        file_types = [{'type': row[0], 'count': row[1], 'size': row[2] or 0} for row in cursor.fetchall()]

        # Storage by platform (from file_inventory database). Best-effort:
        # a failure here degrades to an empty list rather than failing the
        # whole analytics response.
        storage_by_platform = []
        try:
            cursor.execute("""
                SELECT
                    platform,
                    COUNT(*) as count,
                    COALESCE(SUM(file_size), 0) as total_size
                FROM file_inventory
                WHERE location = 'final'
                GROUP BY platform
                ORDER BY total_size DESC
            """)
            storage_by_platform = [
                {
                    'platform': row[0],
                    'count': row[1],
                    'total_size': row[2],
                    'avg_size': row[2] / row[1] if row[1] > 0 else 0
                }
                for row in cursor.fetchall()
            ]
        except Exception as e:
            logger.error(f"Error calculating storage by platform: {e}", module="Error")
            storage_by_platform = []

        # Top sources (most downloads)
        cursor.execute(f"""
            SELECT source, COUNT(*) as count, platform
            FROM downloads
            WHERE {MEDIA_FILTERS}
            GROUP BY source, platform
            ORDER BY count DESC
            LIMIT 10
        """)
        top_sources = [
            {'source': row[0], 'count': row[1], 'platform': row[2]}
            for row in cursor.fetchall()
        ]

        # Download trends (hourly distribution over the last 7 days)
        cursor.execute(f"""
            SELECT strftime('%H', download_date) as hour, COUNT(*) as count
            FROM downloads
            WHERE download_date >= datetime('now', '-7 days')
            AND {MEDIA_FILTERS}
            GROUP BY hour
            ORDER BY hour
        """)
        hourly_distribution = [{'hour': int(row[0]), 'count': row[1]} for row in cursor.fetchall()]

        # Weekly comparison: bucket the last 14 days into this/last week.
        # (The 'older' CASE arm is unreachable given the WHERE clause but
        # kept as a defensive default.)
        cursor.execute(f"""
            SELECT
                CASE
                    WHEN download_date >= datetime('now', '-7 days') THEN 'this_week'
                    WHEN download_date >= datetime('now', '-14 days') THEN 'last_week'
                    ELSE 'older'
                END as period,
                COUNT(*) as count
            FROM downloads
            WHERE download_date >= datetime('now', '-14 days')
            AND {MEDIA_FILTERS}
            GROUP BY period
        """)
        weekly_data = {row[0]: row[1] for row in cursor.fetchall()}

        # Growth rate vs. the previous week.
        # FIX: this previously defaulted the missing 'last_week' bucket to 1
        # "to avoid division by zero", which defeated the `last_week > 0`
        # guard below and reported a misleading (this_week - 1) * 100%
        # growth whenever last week had no downloads. Default to 0; the
        # guard already yields 0% growth when there is nothing to compare.
        this_week = weekly_data.get('this_week', 0)
        last_week = weekly_data.get('last_week', 0)
        growth_rate = ((this_week - last_week) / last_week * 100) if last_week > 0 else 0

        result = {
            'downloads_per_day': downloads_per_day,
            'file_types': file_types,
            'storage_by_platform': storage_by_platform,
            'top_sources': top_sources,
            'hourly_distribution': hourly_distribution,
            'weekly_comparison': {
                'this_week': this_week,
                'last_week': last_week,
                'growth_rate': growth_rate
            }
        }

        # Cache the result for 5 minutes
        cache_manager.set(cache_key, result)
        return result