Files
Todd 49e72207bf Encrypt file paths in API URLs using Fernet tokens
Raw filesystem paths were exposed in browser URLs, dev tools, and proxy logs.
Now all file-serving endpoints accept an opaque encrypted token (t= param)
derived from the session secret via HKDF, with a 4-hour TTL.

Backend:
- Add core/path_tokens.py with Fernet encrypt/decrypt (HKDF from .session_secret)
- Add file_token to all list/gallery/feed/search responses across 7 routers
- Accept optional t= param on all file-serving endpoints (backward compatible)

Frontend:
- Update 4 URL helpers in api.ts to prefer token when available
- Add 4 new helpers for paid-content/embedded-metadata URLs
- Update all 14 page/component files to pass file_token to URL builders
- Add file_token to all relevant TypeScript interfaces

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-30 08:25:22 -04:00

1035 lines
40 KiB
Python

"""
Review Router
Handles review queue operations:
- List files in review queue
- Filter options for review
- Keep/delete/batch operations
- Face recognition rescan
- File serving for preview
- Add reference images
"""
import asyncio
import hashlib
import json
import mimetypes
import re
import shutil
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from fastapi import APIRouter, BackgroundTasks, Body, Depends, HTTPException, Query, Request
from fastapi.responses import FileResponse
from pydantic import BaseModel
from slowapi import Limiter
from slowapi.util import get_remote_address
from ..core.dependencies import get_current_user, get_current_user_media, get_app_state
from ..core.config import settings
from ..core.exceptions import (
handle_exceptions,
RecordNotFoundError,
MediaFileNotFoundError as CustomFileNotFoundError,
FileOperationError,
ValidationError
)
from ..core.responses import now_iso8601
from ..core.path_tokens import encode_path, decode_path
from modules.universal_logger import get_logger
from modules.date_utils import DateHandler
from ..core.utils import get_media_dimensions, get_media_dimensions_batch
logger = get_logger('API')
router = APIRouter(prefix="/api/review", tags=["Review Queue"])
limiter = Limiter(key_func=get_remote_address)
# Import shared utilities
from ..core.utils import get_face_module as _get_face_module, update_file_path_in_all_tables
def get_face_module(db):
    """Return the cached FaceRecognitionModule for *db*, tagged as 'Review'."""
    module = _get_face_module(db, module_name="Review")
    return module
# Root of the on-disk review queue; every path served or mutated by this
# router must resolve inside this directory (enforced via relative_to()).
REVIEW_BASE = Path("/opt/immich/review")
MAX_BATCH_SIZE = 500 # Maximum files per batch operation
# Final media library root; kept files are moved here from REVIEW_BASE.
MEDIA_BASE = Path("/opt/immich/md")
# ============================================================================
# PYDANTIC MODELS
# ============================================================================
class ReviewKeepRequest(BaseModel):
    """Body for POST /keep: move a reviewed file out of the review queue."""
    # Absolute path of the file inside the review directory.
    file_path: str
    # Optional explicit destination; when omitted the background task infers
    # one from download metadata or the review-relative path.
    destination: Optional[str] = None
class ReviewAddRequest(BaseModel):
    """Body for POST /add-reference: register a face reference, then keep the file."""
    # Absolute path of the file inside the review directory.
    file_path: str
    # Person the extracted face reference is registered under.
    person_name: str
    # Optional explicit destination for the follow-up move; inferred when omitted.
    destination: Optional[str] = None
# NOTE: update_file_path_in_all_tables is imported from core.utils
# ============================================================================
# LIST AND FILTER ENDPOINTS
# ============================================================================
def _review_filter_sql(platform, source, media_type, date_from, date_to,
                       size_min, size_max, search):
    """Build the WHERE-clause additions shared by the review list/count queries.

    Returns (sql, params): ``sql`` is a string of ``' AND ...'`` fragments to
    append to both queries, ``params`` the matching bind values. Centralizing
    this removes the previously duplicated filter logic so the two queries
    cannot drift out of sync.
    """
    parts = []
    params = []
    if platform:
        parts.append(' AND fi.platform = ?')
        params.append(platform)
    if source:
        parts.append(' AND fi.source = ?')
        params.append(source)
    if media_type != "all":
        parts.append(' AND fi.content_type = ?')
        params.append(media_type)
    if date_from:
        parts.append(' AND DATE(COALESCE(d_max.max_post_date, fi.created_date)) >= ?')
        params.append(date_from)
    if date_to:
        parts.append(' AND DATE(COALESCE(d_max.max_post_date, fi.created_date)) <= ?')
        params.append(date_to)
    if size_min is not None:
        parts.append(' AND fi.file_size >= ?')
        params.append(size_min)
    if size_max is not None:
        parts.append(' AND fi.file_size <= ?')
        params.append(size_max)
    if search:
        search_term = f'%{search}%'
        parts.append(' AND (fi.filename LIKE ? OR fi.platform LIKE ? OR fi.source LIKE ? OR fi.content_type LIKE ?)')
        params.extend([search_term] * 4)
    return ''.join(parts), params


@router.get("/list")
@limiter.limit("5000/minute")
@handle_exceptions
async def get_review_queue(
    request: Request,
    current_user: Dict = Depends(get_current_user),
    limit: int = Query(50, ge=1, le=500, description="Max items to return (1-500)"),
    offset: int = Query(0, ge=0, description="Number of items to skip"),
    platform: Optional[str] = None,
    source: Optional[str] = None,
    media_type: str = Query("all", pattern="^(all|image|video)$"),
    sort_by: str = Query("post_date", pattern="^(post_date|created_date|added_date|file_size|filename|source|platform|confidence)$"),
    sort_order: str = Query("desc", pattern="^(asc|desc)$"),
    date_from: Optional[str] = Query(None, pattern="^\\d{4}-\\d{2}-\\d{2}$", description="Filter by post date from (YYYY-MM-DD)"),
    date_to: Optional[str] = Query(None, pattern="^\\d{4}-\\d{2}-\\d{2}$", description="Filter by post date to (YYYY-MM-DD)"),
    size_min: Optional[int] = Query(None, ge=0, description="Minimum file size in bytes"),
    size_max: Optional[int] = Query(None, ge=0, description="Maximum file size in bytes"),
    search: Optional[str] = Query(None, max_length=200, description="Search filename")
):
    """Get list of media files in review queue from file_inventory.

    Joins ``downloads`` (best-known post date per file) and
    ``face_recognition_scans`` (match status), applies the requested filters
    and sort, and returns a paginated page plus total count. Each item carries
    an encrypted ``file_token`` alongside the raw ``file_path`` so the
    frontend can build opaque URLs.
    """
    app_state = get_app_state()
    filter_sql, filter_params = _review_filter_sql(
        platform, source, media_type, date_from, date_to,
        size_min, size_max, search
    )
    with app_state.db.get_connection() as conn:
        cursor = conn.cursor()
        query = '''
            SELECT
                fi.id, fi.file_path, fi.filename, fi.platform, fi.source,
                fi.content_type, fi.file_size, fi.width, fi.height,
                fi.created_date, fi.last_verified,
                COALESCE(d_max.max_post_date, fi.created_date) as post_date,
                fi.video_id,
                COALESCE(frs.confidence, -1) as face_confidence,
                frs.has_match, frs.matched_person, frs.scan_date
            FROM file_inventory fi
            LEFT JOIN (
                SELECT file_path, MAX(post_date) as max_post_date
                FROM downloads
                GROUP BY file_path
            ) d_max ON d_max.file_path = fi.file_path
            LEFT JOIN face_recognition_scans frs ON frs.file_path = fi.file_path
            WHERE fi.location = 'review'
        ''' + filter_sql
        count_query = '''
            SELECT COUNT(*)
            FROM file_inventory fi
            LEFT JOIN (
                SELECT file_path, MAX(post_date) as max_post_date
                FROM downloads
                GROUP BY file_path
            ) d_max ON d_max.file_path = fi.file_path
            WHERE fi.location = 'review'
        ''' + filter_sql
        cursor.execute(count_query, filter_params)
        total = cursor.fetchone()[0]
        # Sorting: the field name is interpolated into SQL, so it MUST come
        # from this whitelist mapping (never from user input directly).
        field_mapping = {
            'post_date': 'post_date',
            # BUGFIX: 'created_date' was missing from this mapping, so
            # sort_by=created_date (allowed by the Query regex) silently fell
            # back to sorting by post_date.
            'created_date': 'fi.created_date',
            'added_date': 'fi.last_verified',
            'file_size': 'fi.file_size',
            'filename': 'fi.filename',
            'source': 'fi.source',
            'platform': 'fi.platform',
            'confidence': 'face_confidence'
        }
        db_sort_field = field_mapping.get(sort_by, 'post_date')
        sort_direction = 'DESC' if sort_order.lower() == 'desc' else 'ASC'
        # Date-ish columns can be NULL; push those rows to the end.
        if db_sort_field in ['post_date', 'fi.last_verified']:
            query += f' ORDER BY {db_sort_field} {sort_direction} NULLS LAST'
        else:
            query += f' ORDER BY {db_sort_field} {sort_direction}'
        query += ' LIMIT ? OFFSET ?'
        params = list(filter_params)
        params.extend([limit, offset])
        cursor.execute(query, params)
        rows = cursor.fetchall()
        # Batch lookup dimensions for items missing width/height in the DB.
        paths_needing_dimensions = [row[1] for row in rows if row[7] is None or row[8] is None]
        dimensions_cache = get_media_dimensions_batch(paths_needing_dimensions) if paths_needing_dimensions else {}
        files = []
        for row in rows:
            # Prefer stored dimensions; fall back to the batch probe result.
            if row[7] is not None and row[8] is not None:
                width, height = row[7], row[8]
            else:
                width, height = dimensions_cache.get(row[1], (row[7], row[8]))
            fp = row[1]
            file_item = {
                "filename": row[2],
                "file_path": fp,
                "file_token": encode_path(fp) if fp else None,
                "file_size": row[6] if row[6] else 0,
                "added_date": row[10] if row[10] else '',
                "post_date": row[11] if row[11] else '',
                "platform": row[3],
                "source": row[4] if row[4] else 'unknown',
                "width": width,
                "height": height,
                "video_id": row[12]  # For YouTube thumbnail support
            }
            # Face recognition columns from the frs JOIN:
            # row[13]=face_confidence, row[14]=has_match, row[15]=matched_person, row[16]=scan_date
            face_confidence = row[13] if row[13] is not None else -1
            has_match = row[14]
            matched_person = row[15]
            scan_date = row[16]
            if face_confidence >= 0:  # confidence >= 0 means a scan record exists
                file_item['face_recognition'] = {
                    'scanned': True,
                    'matched': bool(has_match),
                    'person_name': matched_person,
                    'confidence': face_confidence,
                    'scan_date': scan_date
                }
            else:
                file_item['face_recognition'] = {'scanned': False}
            files.append(file_item)
    return {"files": files, "total": total}
@router.get("/filters")
@limiter.limit("5000/minute")
@handle_exceptions
async def get_review_filters(
    request: Request,
    platform: Optional[str] = Query(None, description="Filter sources by platform"),
    current_user: Dict = Depends(get_current_user)
):
    """Return the distinct platforms and sources present in the review queue.

    Results are served from cache_manager when available; the source list can
    be narrowed to a single platform via the ``platform`` query param.
    """
    from cache_manager import cache_manager
    cache_key = f"filters:review_filters:{platform or 'all'}"
    cached = cache_manager.get(cache_key)
    if cached:
        return cached
    app_state = get_app_state()
    with app_state.db.get_connection() as conn:
        cur = conn.cursor()
        cur.execute("""
            SELECT DISTINCT platform
            FROM file_inventory
            WHERE location = 'review'
            ORDER BY platform
        """)
        platforms = sorted(r[0] for r in cur.fetchall())
        if platform:
            cur.execute("""
                SELECT DISTINCT source
                FROM file_inventory
                WHERE location = 'review'
                AND source IS NOT NULL
                AND source != 'unknown'
                AND platform = ?
                ORDER BY source
            """, (platform.lower(),))
        else:
            cur.execute("""
                SELECT DISTINCT source
                FROM file_inventory
                WHERE location = 'review' AND source IS NOT NULL AND source != 'unknown'
                ORDER BY source
            """)
        sources = [r[0] for r in cur.fetchall()]
    result = {
        "platforms": platforms,
        "sources": sources
    }
    cache_manager.set(cache_key, result)
    return result
@router.get("/all-paths")
@limiter.limit("5000/minute")
@handle_exceptions
async def get_all_review_paths(
    request: Request,
    current_user: Dict = Depends(get_current_user)
):
    """Return every review-queue file path, newest first, without pagination."""
    app_state = get_app_state()
    with app_state.db.get_connection() as conn:
        cur = conn.cursor()
        cur.execute("""
            SELECT file_path
            FROM file_inventory
            WHERE location = 'review'
            ORDER BY created_date DESC
        """)
        paths = [r[0] for r in cur.fetchall()]
    return {"file_paths": paths}
# ============================================================================
# FILE OPERATIONS
# ============================================================================
def process_review_keep_background(file_path: str, destination_hint: str, filename: str, app_state):
    """Background task to move review file to final destination.

    Resolution order for the destination:
      1. ``destination_hint`` passed by the caller, if non-empty;
      2. the ``intended_path`` stored in the downloads record's metadata JSON;
      3. the file's path relative to REVIEW_BASE mirrored under MEDIA_BASE
         (with a per-source subfolder for instagram/tiktok files).
    After the move it syncs timestamps, file_inventory, cross-table path
    references, and (if missing) a downloads record; progress is reported
    over WebSocket. All failures are reported via broadcast, never raised.
    """
    try:
        source_path = Path(file_path)
        destination = destination_hint
        # If no destination provided, try to load from database metadata
        if not destination:
            try:
                with app_state.db.get_connection() as conn:
                    cursor = conn.cursor()
                    cursor.execute('''
                        SELECT metadata FROM downloads
                        WHERE file_path = ?
                    ''', (str(source_path),))
                    row = cursor.fetchone()
                    # NOTE(review): dict-style row access — presumably the
                    # connection uses a mapping row factory; confirm.
                    if row and row['metadata']:
                        metadata = json.loads(row['metadata'])
                        intended_path = metadata.get('intended_path')
                        if intended_path:
                            destination = intended_path
            except Exception as e:
                logger.warning(f"Could not load intended path from database: {e}", module="Database")
        # Fallback: infer destination from current path structure
        if not destination:
            try:
                # Use relative_to() for safe path validation (prevents symlink bypass)
                source_path.resolve().relative_to(REVIEW_BASE.resolve())
                relative_path = source_path.relative_to(REVIEW_BASE)
                # For Instagram/TikTok, add username subdirectory from database
                dest_base = MEDIA_BASE / relative_path.parent
                # Get username (source) from database
                try:
                    with app_state.db.get_connection() as conn:
                        cursor = conn.cursor()
                        cursor.execute('SELECT source, platform FROM file_inventory WHERE file_path = ?', (str(source_path),))
                        row = cursor.fetchone()
                        if row and row[0] and row[0] != 'unknown':
                            username = row[0]
                            platform = row[1] if row[1] else ''
                            # Platform column wins; path-substring check is the legacy fallback.
                            if platform in ('instagram', 'tiktok') or 'instagram' in str(relative_path).lower() or 'tiktok' in str(relative_path).lower():
                                dest_base = dest_base / username
                except Exception as e:
                    logger.warning(f"Could not get source from database: {e}", module="Database")
                destination = str(dest_base / source_path.name)
            except ValueError:
                pass # Path not in REVIEW_BASE
            except Exception as e:
                logger.warning(f"Could not infer destination: {e}", module="Database")
        # No destination could be determined: report failure and bail out.
        if not destination:
            try:
                if hasattr(app_state, 'websocket_manager') and app_state.websocket_manager:
                    app_state.websocket_manager.broadcast_sync({
                        "type": "batch_move_progress",
                        "filename": filename,
                        "success": False,
                        "error": "Destination required (no original path found)"
                    })
            except Exception as e:
                logger.debug(f"Failed to broadcast error status: {e}", module="Review")
            return
        # Normalize destination: anchor relative paths under MEDIA_BASE and
        # ensure the final component is the original filename.
        dest_path = Path(destination)
        if not dest_path.is_absolute():
            dest_path = MEDIA_BASE / destination
        if dest_path.name != source_path.name:
            dest_path = dest_path / source_path.name
        dest_path.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(str(source_path), str(dest_path))
        # Update file timestamps and EXIF to match post date from filename
        try:
            # Filenames encode the post date as YYYYMMDD_HHMMSS_...
            date_match = re.match(r'^(\d{4})(\d{2})(\d{2})_(\d{2})(\d{2})(\d{2})_', filename)
            if date_match:
                file_date = datetime(
                    int(date_match.group(1)), # year
                    int(date_match.group(2)), # month
                    int(date_match.group(3)), # day
                    int(date_match.group(4)), # hour
                    int(date_match.group(5)), # minute
                    int(date_match.group(6)) # second
                )
                DateHandler.update_file_timestamps(dest_path, file_date)
                logger.debug(f"Updated file timestamps for (unknown) to {file_date}", module="API")
        except Exception as e:
            logger.debug(f"Could not update file timestamps for (unknown): {e}", module="API")
        # Update file_inventory
        try:
            success = app_state.db.update_file_inventory_location(
                file_path=str(source_path),
                new_location='final',
                new_file_path=str(dest_path)
            )
            if success:
                logger.info(f"Updated file_inventory: (unknown) location changed from 'review' to 'final'", module="API")
                # Queue the moved file for embedding discovery.
                app_state.db.queue_file_for_discovery(str(dest_path), ['embedding'], priority=5)
        except Exception as e:
            logger.warning(f"Failed to update file_inventory for (unknown): {e}", module="API")
        # Rewrite old -> new path in every table that references it.
        update_file_path_in_all_tables(app_state.db, str(source_path), str(dest_path))
        # Create downloads record if none exists (for files that came from review queue)
        try:
            with app_state.db.get_connection(for_write=True) as conn:
                cursor = conn.cursor()
                # Check if downloads record exists
                cursor.execute('SELECT id FROM downloads WHERE file_path = ?', (str(dest_path),))
                if not cursor.fetchone():
                    # Get file_inventory info for platform/source and created_date
                    cursor.execute('SELECT platform, source, created_date FROM file_inventory WHERE file_path = ?', (str(dest_path),))
                    fi_row = cursor.fetchone()
                    platform = fi_row[0] if fi_row else 'unknown'
                    source = fi_row[1] if fi_row else 'unknown'
                    original_created_date = fi_row[2] if fi_row else None
                    # Extract post_date from filename (format: YYYYMMDD_HHMMSS_...)
                    post_date = None
                    date_match = re.match(r'^(\d{4})(\d{2})(\d{2})_(\d{2})(\d{2})(\d{2})_', filename)
                    if date_match:
                        try:
                            # Use ISO format with T separator for consistent timezone handling in frontend
                            post_date = f"{date_match.group(1)}-{date_match.group(2)}-{date_match.group(3)}T{date_match.group(4)}:{date_match.group(5)}:{date_match.group(6)}"
                        except Exception as e:
                            logger.debug(f"Could not parse date from filename (unknown): {e}", module="Review")
                    # Fallback to file modification time
                    if not post_date:
                        try:
                            mtime = dest_path.stat().st_mtime
                            # Use ISO format with T separator for consistent timezone handling
                            post_date = datetime.fromtimestamp(mtime).strftime('%Y-%m-%dT%H:%M:%S')
                        except Exception as e:
                            logger.debug(f"Could not get mtime for {dest_path}: {e}", module="Review")
                    # Insert downloads record, preserving original download date from file_inventory
                    # url_hash doubles as a dedupe key; hash of the path since no real URL exists.
                    url_hash = hashlib.sha256(str(dest_path).encode()).hexdigest()
                    # Use original created_date from file_inventory as download_date (when file was first added)
                    if original_created_date:
                        if isinstance(original_created_date, datetime):
                            download_date = original_created_date.strftime('%Y-%m-%dT%H:%M:%S')
                        else:
                            download_date = str(original_created_date).replace(' ', 'T')
                    else:
                        download_date = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
                    cursor.execute('''
                        INSERT INTO downloads (url_hash, url, platform, source, filename, file_path, post_date, download_date, status)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'completed')
                    ''', (url_hash, str(dest_path), platform, source, filename, str(dest_path), post_date, download_date))
                    conn.commit()
                    logger.info(f"Created downloads record for (unknown) with post_date={post_date}, download_date={download_date}", module="API")
        except Exception as e:
            logger.warning(f"Failed to create downloads record for (unknown): {e}", module="API")
        # Success notification (best-effort).
        try:
            if hasattr(app_state, 'websocket_manager') and app_state.websocket_manager:
                app_state.websocket_manager.broadcast_sync({
                    "type": "batch_move_progress",
                    "filename": filename,
                    "success": True,
                    "destination": str(dest_path)
                })
        except Exception:
            pass
    except Exception as e:
        # Any unexpected failure: report over WebSocket instead of raising
        # (this runs as a background task with no caller to catch it).
        try:
            if hasattr(app_state, 'websocket_manager') and app_state.websocket_manager:
                app_state.websocket_manager.broadcast_sync({
                    "type": "batch_move_progress",
                    "filename": filename,
                    "success": False,
                    "error": str(e)
                })
        except Exception:
            pass
@router.post("/keep")
@limiter.limit("1000/minute")
@handle_exceptions
async def keep_review_file(
    request: Request,
    background_tasks: BackgroundTasks,
    current_user: Dict = Depends(get_current_user),
    keep_data: ReviewKeepRequest = Body(...)
):
    """Kick off a background move of a review-queue file to its destination."""
    state = get_app_state()
    src = Path(keep_data.file_path)
    # Containment check via relative_to() (resists symlink escapes).
    try:
        src.resolve().relative_to(REVIEW_BASE.resolve())
    except ValueError:
        raise ValidationError("Source must be in review directory")
    if not src.exists():
        raise CustomFileNotFoundError("File not found", {"path": str(src)})
    background_tasks.add_task(
        process_review_keep_background,
        keep_data.file_path,
        keep_data.destination or '',
        src.name,
        state
    )
    return {
        "success": True,
        "processing": True,
        "message": "File move started, processing in background"
    }
@router.post("/batch-delete")
@limiter.limit("100/minute")
@handle_exceptions
async def batch_delete_review_files(
    request: Request,
    current_user: Dict = Depends(get_current_user),
    file_paths: List[str] = Body(..., embed=True)
):
    """Batch delete files from review queue. Maximum 500 files per request.

    Each path is validated to live under REVIEW_BASE, then moved to the
    recycle bin. Missing files have their stale file_inventory row purged
    and are counted as failures. Returns success/failure counts and the
    first 10 error messages.
    """
    # Security: Limit batch size to prevent DoS
    if len(file_paths) > MAX_BATCH_SIZE:
        raise ValidationError(f"Batch size exceeds maximum of {MAX_BATCH_SIZE} files")
    app_state = get_app_state()
    succeeded = 0
    failed = 0
    errors = []
    for file_path in file_paths:
        try:
            file_to_delete = Path(file_path)
            # Use relative_to() for safe path validation (prevents symlink bypass)
            try:
                file_to_delete.resolve().relative_to(REVIEW_BASE.resolve())
            except ValueError:
                failed += 1
                errors.append(f"{file_path}: Not in review directory")
                continue
            if not file_to_delete.exists():
                # File already gone: purge the stale inventory row.
                with app_state.db.get_connection(for_write=True) as conn:
                    cursor = conn.cursor()
                    cursor.execute("DELETE FROM file_inventory WHERE file_path = ?", (str(file_to_delete),))
                    # BUGFIX: commit the cleanup, matching delete_review_file's
                    # explicit commit for the same operation.
                    conn.commit()
                failed += 1
                # BUGFIX: previously counted as failed with no error entry,
                # leaving the caller unable to tell why the item failed.
                errors.append(f"{file_path}: File not found (database record removed)")
                continue
            recycle_id = app_state.db.move_to_recycle_bin(
                file_path=str(file_to_delete),
                deleted_from='review',
                deleted_by=current_user.get('sub'),
                metadata={}
            )
            if recycle_id:
                succeeded += 1
            else:
                failed += 1
                errors.append(f"{file_path}: Failed to move to recycle bin")
        except Exception as e:
            failed += 1
            errors.append(f"{file_path}: {str(e)}")
    # Broadcast completion (best-effort)
    try:
        if hasattr(app_state, 'websocket_manager') and app_state.websocket_manager:
            await app_state.websocket_manager.broadcast({
                "type": "review_batch_delete_completed",
                "succeeded": succeeded,
                "failed": failed
            })
    except Exception:
        pass
    return {
        "success": True,
        "succeeded": succeeded,
        "failed": failed,
        "errors": errors[:10]
    }
@router.delete("/delete")
@limiter.limit("1000/minute")
@handle_exceptions
async def delete_review_file(
    request: Request,
    current_user: Dict = Depends(get_current_user),
    file_path: str = Body(..., embed=True)
):
    """Move a single review-queue file to the recycle bin."""
    state = get_app_state()
    target = Path(file_path)
    # Containment check via relative_to() (resists symlink escapes).
    try:
        target.resolve().relative_to(REVIEW_BASE.resolve())
    except ValueError:
        raise ValidationError("File must be in review directory")
    if not target.exists():
        # File already gone: purge the stale inventory row before reporting.
        with state.db.get_connection(for_write=True) as conn:
            cur = conn.cursor()
            cur.execute("DELETE FROM file_inventory WHERE file_path = ?", (str(target),))
            conn.commit()
        raise CustomFileNotFoundError("File not found (database record removed)")
    recycle_id = state.db.move_to_recycle_bin(
        file_path=str(target),
        deleted_from='review',
        deleted_by=current_user.get('sub'),
        metadata={}
    )
    if not recycle_id:
        raise FileOperationError("Failed to move to recycle bin")
    # Best-effort WebSocket notification.
    try:
        if hasattr(state, 'websocket_manager') and state.websocket_manager:
            await state.websocket_manager.broadcast({
                "type": "review_delete_completed",
                "file": file_path
            })
    except Exception:
        pass
    return {"success": True}
@router.get("/file")
@limiter.limit("5000/minute")
@handle_exceptions
async def get_review_file(
    request: Request,
    file_path: str = None,
    token: str = None,
    t: str = None,
    current_user: Dict = Depends(get_current_user_media)
):
    """Serve a file from the review queue.

    Accepts either an encrypted path token via ``t=`` (preferred; ``token=``
    kept as an alias) or a raw ``file_path`` (legacy). The resolved path must
    live under REVIEW_BASE.
    """
    # BUGFIX: the declared 'token' parameter was accepted but ignored; treat
    # it as an alias for 't' so neither form silently breaks.
    opaque = t or token
    if opaque:
        file_path = decode_path(opaque)
    # BUGFIX: with no token and no path, Path(None) raised TypeError -> 500;
    # reject with an explicit validation error instead.
    if not file_path:
        raise ValidationError("Either t (token) or file_path is required")
    requested_file = Path(file_path)
    try:
        resolved_file = requested_file.resolve()
        # Use relative_to() for safe path validation (prevents symlink bypass)
        resolved_file.relative_to(REVIEW_BASE.resolve())
    except ValueError:
        raise ValidationError("File must be in review directory")
    except Exception:
        raise ValidationError("Invalid file path")
    if not resolved_file.exists() or not resolved_file.is_file():
        raise CustomFileNotFoundError("File not found")
    # Guess the Content-Type from the extension; fall back to a binary blob.
    mime_type, _ = mimetypes.guess_type(str(resolved_file))
    if not mime_type:
        mime_type = "application/octet-stream"
    return FileResponse(
        path=str(resolved_file),
        media_type=mime_type,
        filename=resolved_file.name
    )
# ============================================================================
# FACE RECOGNITION RESCAN
# ============================================================================
@router.post("/rescan")
@limiter.limit("10/minute")
@handle_exceptions
async def rescan_review_queue(
    request: Request,
    background_tasks: BackgroundTasks,
    current_user: Dict = Depends(get_current_user)
):
    """Rescan all files in review queue for face recognition.

    Collects every media file under REVIEW_BASE, skips files that already
    have a face_recognition_scans record, and schedules a background task
    that scans the remainder, persisting results and streaming progress
    over WebSocket. Returns immediately with the number of files queued.
    """
    app_state = get_app_state()
    # Get all files in review queue (both lower- and upper-case extensions)
    all_files = []
    for ext in ['*.jpg', '*.jpeg', '*.png', '*.gif', '*.mp4', '*.mov', '*.webm', '*.avi', '*.mkv']:
        all_files.extend(REVIEW_BASE.rglob(ext))
        all_files.extend(REVIEW_BASE.rglob(ext.upper()))
    if not all_files:
        return {"success": True, "message": "No files to scan", "total": 0}
    # Filter out already scanned files (include those with confidence OR has_match set)
    with app_state.db.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('SELECT file_path FROM face_recognition_scans WHERE confidence IS NOT NULL OR has_match IS NOT NULL')
        already_scanned = {row[0] for row in cursor.fetchall()}
    files_to_scan = [f for f in all_files if str(f) not in already_scanned]
    skipped_count = len(all_files) - len(files_to_scan)
    total_files = len(files_to_scan)
    if total_files == 0:
        return {"success": True, "message": f"All {skipped_count} files already scanned", "total": 0, "skipped": skipped_count}
    # Tolerance settings: per-source overrides, looser default for video.
    fr_settings = app_state.settings.get('face_recognition', {})
    base_tolerance = float(fr_settings.get('tolerance', 0.20))
    video_tolerance = float(fr_settings.get('video_tolerance', 0.45))
    source_tolerances = fr_settings.get('source_tolerances', {})

    async def run_rescan():
        """Background coroutine: scan each file, upsert results, broadcast progress."""
        try:
            # NOTE(review): imported but not referenced directly — presumably
            # an eager import so a missing dependency fails fast; confirm.
            from modules.face_recognition_module import FaceRecognitionModule
            app_state.review_rescan_running = True
            app_state.review_rescan_progress = {"current": 0, "total": total_files, "current_file": ""}
            try:
                if hasattr(app_state, 'websocket_manager') and app_state.websocket_manager:
                    await app_state.websocket_manager.broadcast({
                        "type": "review_rescan_started",
                        "total_files": total_files,
                        "timestamp": now_iso8601()
                    })
            except Exception:
                pass
            face_module = get_face_module(app_state.db)
            stats = {'total': total_files, 'updated': 0, 'errors': 0, 'processed': 0}
            video_extensions = {'.mp4', '.mov', '.avi', '.mkv', '.webm', '.flv', '.m4v', '.gif'}
            # BUGFIX: use get_running_loop() — asyncio.get_event_loop() is
            # deprecated inside coroutines — and hoist it out of the per-file
            # loop instead of fetching it every iteration.
            loop = asyncio.get_running_loop()
            for idx, file_path in enumerate(files_to_scan, 1):
                filename = file_path.name
                app_state.review_rescan_progress = {"current": idx, "total": total_files, "current_file": filename}
                try:
                    # Filenames are conventionally '<source>_...'
                    source = filename.split('_')[0] if '_' in filename else None
                    is_video = file_path.suffix.lower() in video_extensions
                    # Per-source tolerance, never tighter than the video floor for videos.
                    if source and source in source_tolerances:
                        tolerance = max(source_tolerances[source], video_tolerance) if is_video else source_tolerances[source]
                    else:
                        tolerance = video_tolerance if is_video else base_tolerance
                    # Broadcast progress
                    try:
                        if hasattr(app_state, 'websocket_manager') and app_state.websocket_manager:
                            await app_state.websocket_manager.broadcast({
                                "type": "review_rescan_progress",
                                "file": filename,
                                "file_path": str(file_path),
                                "current": idx,
                                "total": total_files,
                                "status": "processing",
                                "timestamp": now_iso8601()
                            })
                    except Exception:
                        pass
                    # Run the blocking scan off the event loop.
                    if is_video:
                        result = await loop.run_in_executor(None, lambda: face_module.check_video_multiframe(str(file_path), tolerance=tolerance))
                    else:
                        result = await loop.run_in_executor(None, lambda: face_module.check_image(str(file_path), tolerance=tolerance))
                    # Update database (upsert the scan record)
                    with app_state.db.get_connection(for_write=True) as conn:
                        cursor = conn.cursor()
                        cursor.execute('SELECT id FROM face_recognition_scans WHERE file_path = ?', (str(file_path),))
                        existing = cursor.fetchone()
                        matched_person = result.get('person_name') or result.get('best_candidate')
                        confidence = float(result.get('confidence', 0.0))
                        if existing:
                            cursor.execute('''
                                UPDATE face_recognition_scans
                                SET has_match = ?, matched_person = ?, confidence = ?, face_count = ?, scan_date = CURRENT_TIMESTAMP, scan_type = 'rescan'
                                WHERE file_path = ?
                            ''', (result['has_match'], matched_person, confidence, result.get('face_count', 0), str(file_path)))
                        else:
                            cursor.execute('''
                                INSERT INTO face_recognition_scans (file_path, has_match, matched_person, confidence, face_count, scan_type)
                                VALUES (?, ?, ?, ?, ?, 'rescan')
                            ''', (str(file_path), result['has_match'], matched_person, confidence, result.get('face_count', 0)))
                        conn.commit()
                    stats['updated'] += 1
                    stats['processed'] += 1
                    # Broadcast result
                    try:
                        if hasattr(app_state, 'websocket_manager') and app_state.websocket_manager:
                            await app_state.websocket_manager.broadcast({
                                "type": "review_rescan_progress",
                                "file": filename,
                                "file_path": str(file_path),
                                "current": idx,
                                "total": total_files,
                                "status": "matched" if result.get('has_match') else "updated",
                                "person": matched_person,
                                "confidence": confidence,
                                "timestamp": now_iso8601()
                            })
                    except Exception:
                        pass
                except Exception as e:
                    stats['errors'] += 1
                    stats['processed'] += 1
                    try:
                        if hasattr(app_state, 'websocket_manager') and app_state.websocket_manager:
                            await app_state.websocket_manager.broadcast({
                                "type": "review_rescan_progress",
                                "file": filename,
                                "file_path": str(file_path),
                                "current": idx,
                                "total": total_files,
                                "status": "error",
                                "error": str(e),
                                "timestamp": now_iso8601()
                            })
                    except Exception:
                        pass
            # Completion
            app_state.review_rescan_running = False
            app_state.review_rescan_progress = {"current": 0, "total": 0, "current_file": "", "complete": True, "stats": stats}
            try:
                if hasattr(app_state, 'websocket_manager') and app_state.websocket_manager:
                    await app_state.websocket_manager.broadcast({
                        "type": "review_rescan_complete",
                        "stats": stats,
                        "timestamp": now_iso8601()
                    })
            except Exception:
                pass
        except Exception as e:
            # Fatal setup/loop failure: clear the running flag and report.
            app_state.review_rescan_running = False
            app_state.review_rescan_progress = {"current": 0, "total": 0, "current_file": "", "error": str(e)}
            try:
                if hasattr(app_state, 'websocket_manager') and app_state.websocket_manager:
                    await app_state.websocket_manager.broadcast({
                        "type": "review_rescan_error",
                        "error": str(e),
                        "timestamp": now_iso8601()
                    })
            except Exception:
                pass

    background_tasks.add_task(run_rescan)
    return {
        "success": True,
        "message": f"Rescan started for {total_files} files",
        "total": total_files
    }
@router.get("/rescan/status")
@limiter.limit("100/minute")
@handle_exceptions
async def get_rescan_status(
    request: Request,
    current_user: Dict = Depends(get_current_user)
):
    """Report whether a review rescan is running and its current progress."""
    state = get_app_state()
    running = getattr(state, 'review_rescan_running', False)
    progress = getattr(state, 'review_rescan_progress', {})
    return {"running": running, "progress": progress}
# ============================================================================
# ADD REFERENCE ENDPOINTS
# ============================================================================
@router.post("/add-reference")
@limiter.limit("100/minute")
@handle_exceptions
async def add_reference_from_review(
    request: Request,
    background_tasks: BackgroundTasks,
    current_user: Dict = Depends(get_current_user),
    add_data: ReviewAddRequest = Body(...)
):
    """Register a face reference from a review-queue file, then keep the file.

    Combined operation: extracts the face from the file, records it as a
    reference image for the named person, and schedules the usual background
    move to the final destination.
    """
    state = get_app_state()
    src = Path(add_data.file_path)
    # Containment check via relative_to() (resists symlink escapes).
    try:
        src.resolve().relative_to(REVIEW_BASE.resolve())
    except ValueError:
        raise ValidationError("Source must be in review directory")
    if not src.exists():
        raise CustomFileNotFoundError("File not found")
    person = add_data.person_name
    if not person or not person.strip():
        raise ValidationError("Person name is required")
    # Register the face reference via the cached module.
    face_module = get_face_module(state.db)
    result = face_module.add_reference_image(
        str(src),
        person.strip(),
        source_info="review_queue"
    )
    if not result.get('success'):
        raise FileOperationError(
            result.get('error', 'Failed to add reference'),
            {"file": str(src)}
        )
    # Queue the move to the final destination in the background.
    background_tasks.add_task(
        process_review_keep_background,
        add_data.file_path,
        add_data.destination or '',
        src.name,
        state
    )
    return {
        "success": True,
        "message": f"Reference added for {add_data.person_name}",
        "reference_count": result.get('reference_count', 1),
        "processing": True
    }