# Files
# Todd 0d7b2b1aab Initial commit
# Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
# 2026-03-29 22:42:55 -04:00
#
# 1249 lines
# 45 KiB
# Python

"""
Face Recognition Router
Handles face recognition operations:
- Reference face management (add, delete, retrain)
- Face scanning and matching
- Dashboard statistics
- Batch operations with WebSocket progress
"""
import asyncio
import base64
import io
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from fastapi import APIRouter, BackgroundTasks, Body, Depends, HTTPException, Request
from PIL import Image
from pydantic import BaseModel
from slowapi import Limiter
from slowapi.util import get_remote_address
from ..core.dependencies import get_current_user, get_app_state
from ..core.config import settings
from ..core.exceptions import handle_exceptions, NotFoundError, ValidationError
from ..core.responses import now_iso8601
from ..core.utils import validate_file_path, get_face_module as _get_face_module, ALLOWED_PATHS
from modules.universal_logger import get_logger
# Module-wide logger obtained from the project's universal logger under the 'API' name.
logger = get_logger('API')
# Every endpoint declared below is registered on this router under the /api/face prefix.
router = APIRouter(prefix="/api/face", tags=["Face Recognition"])
# Additional allowed paths for face recognition (includes /opt/immich for media files)
FACE_ALLOWED_PATHS = ALLOWED_PATHS + [Path('/opt/immich')]
def validate_file_path_for_face(file_path: str) -> Path:
    """
    Check that a path lies inside the directories face recognition may read.

    Delegates to the shared validate_file_path helper, passing the
    face-specific allow-list (FACE_ALLOWED_PATHS).

    Raises:
        ValidationError: If the shared validator rejects the path with
            HTTP 403 (outside the allowed directories) or HTTP 400
            (malformed path). Any other HTTPException is re-raised unchanged.
    """
    try:
        return validate_file_path(file_path, allowed_bases=FACE_ALLOWED_PATHS)
    except HTTPException as exc:
        status = exc.status_code
        if status == 403:
            # Rejected as outside the allow-list: log it as a security event.
            logger.warning(f"Path traversal attempt blocked: {file_path}", module="Security")
            raise ValidationError("Access denied: file path not in allowed directory")
        if status == 400:
            logger.error(f"Path validation error for: {file_path}", module="Security")
            raise ValidationError("Invalid file path")
        # Unexpected status codes are passed through to the caller untouched.
        raise
def get_face_module(db):
    """Return the face recognition module for ``db`` via the shared helper in core.utils.

    The lookup is always made under the "FaceRecognition" module name.
    """
    face_module = _get_face_module(db, module_name="FaceRecognition")
    return face_module
# slowapi rate limiter keyed by get_remote_address; endpoints below opt in
# individually through the @limiter.limit("N/minute") decorator.
limiter = Limiter(key_func=get_remote_address)
# ============================================================================
# PYDANTIC MODELS
# ============================================================================
# Request body for POST /references: add a reference face from an existing file.
class AddReferenceRequest(BaseModel):
    # Path of the image to use; validated against FACE_ALLOWED_PATHS by the endpoint.
    file_path: str
    # Name of the person the reference face belongs to (required here).
    person_name: str
# Request body for POST /add-reference.
class AddFaceReferenceRequest(BaseModel):
    # Path of the media file to register as a reference.
    file_path: str
    # Optional; the endpoint falls back to the configured person name, then 'Unknown'.
    person_name: Optional[str] = None
    # When True the endpoint returns immediately and processes the file in a background task.
    background: bool = False
# Request body for POST /batch-add-reference.
class BatchAddFaceReferenceRequest(BaseModel):
    # Files to register; the endpoint rejects batches larger than its MAX_BATCH_SIZE.
    file_paths: List[str]
    # Optional; the endpoint falls back to the configured person name, then 'Unknown'.
    person_name: Optional[str] = None
# Request body for POST /retroactive-scan.
class RetroactiveScanRequest(BaseModel):
    # Optional substring filter: only files whose path contains it are scanned.
    directory: Optional[str] = None
# Request body for POST /retrain-references.
class RetrainRequest(BaseModel):
    # InsightFace model name; the endpoint checks it against its list of valid models.
    model: str
# ============================================================================
# REFERENCE MANAGEMENT ENDPOINTS
# ============================================================================
@router.get("/reference-stats")
@limiter.limit("60/minute")
@handle_exceptions
async def get_face_reference_stats(
    request: Request,
    current_user: Dict = Depends(get_current_user)
):
    """Summarise the stored reference faces: overall total plus per-person counts and date range."""
    face_module = get_face_module(get_app_state().db)
    refs = face_module.get_reference_faces()

    # person name -> {'count', 'first_added', 'last_added'}
    by_person = {}
    for ref in refs:
        name = ref['person_name']
        created = ref['created_at']
        # First sighting of a person seeds both ends of the date range.
        entry = by_person.setdefault(
            name,
            {'count': 0, 'first_added': created, 'last_added': created},
        )
        entry['count'] += 1
        if created < entry['first_added']:
            entry['first_added'] = created
        if created > entry['last_added']:
            entry['last_added'] = created

    return {
        'total': len(refs),
        'by_person': by_person
    }
def _render_reference_thumbnail(image_path):
    """Build a base64 JPEG thumbnail (max 150x150) for ``image_path``; None on failure."""
    try:
        with Image.open(image_path) as img:
            img.thumbnail((150, 150), Image.Resampling.LANCZOS)
            # JPEG cannot store alpha or palette modes (RGBA, LA, P, ...); saving
            # them raises OSError, so normalise to RGB before encoding.
            thumb = img if img.mode in ('RGB', 'L') else img.convert('RGB')
            buffer = io.BytesIO()
            thumb.save(buffer, format='JPEG', quality=85)
            return base64.b64encode(buffer.getvalue()).decode('utf-8')
    except Exception as e:
        # Best effort: a broken image must not fail the whole listing.
        logger.warning(f"Failed to generate thumbnail for {image_path}: {e}", module="FaceRecognition")
        return None


@router.get("/references")
@limiter.limit("60/minute")
@handle_exceptions
async def get_face_references(
    request: Request,
    current_user: Dict = Depends(get_current_user)
):
    """Get all reference faces with image thumbnails."""
    app_state = get_app_state()
    face_module = get_face_module(app_state.db)
    refs = face_module.get_reference_faces()

    references_with_images = []
    for ref in refs:
        thumbnail_data = ref.get('thumbnail_data')
        # Fallback: generate thumbnail on-the-fly if not in database
        if not thumbnail_data:
            image_path = Path(ref['reference_image_path'])
            if image_path.exists():
                # Keep whatever falsy value the row had when rendering fails.
                thumbnail_data = _render_reference_thumbnail(image_path) or thumbnail_data
        references_with_images.append({
            'id': ref['id'],
            'person_name': ref['person_name'],
            'reference_image_path': ref['reference_image_path'],
            'created_at': ref['created_at'],
            'thumbnail': thumbnail_data
        })
    return {'references': references_with_images}
@router.post("/references")
@limiter.limit("20/minute")
@handle_exceptions
async def add_face_reference_simple(
    request: Request,
    current_user: Dict = Depends(get_current_user),
    add_data: AddReferenceRequest = Body(...)
):
    """Register an existing file as a reference face for the given person."""
    # Security: the path must resolve inside the allowed directories.
    validated_path = validate_file_path_for_face(add_data.file_path)
    if not validated_path.exists():
        # Security: keep the full path out of the error message.
        raise NotFoundError("File not found")

    face_module = get_face_module(get_app_state().db)
    added = face_module.add_reference_face(add_data.person_name, str(validated_path))
    if not added:
        raise ValidationError('Failed to add reference face - no face detected in image')
    return {'success': True, 'message': f'Reference face added for {add_data.person_name}'}
@router.delete("/references/{reference_id}")
@limiter.limit("20/minute")
@handle_exceptions
async def delete_face_reference(
    request: Request,
    reference_id: int,
    current_user: Dict = Depends(get_current_user)
):
    """Remove the reference face with the given ID; raises NotFoundError if removal reports failure."""
    face_module = get_face_module(get_app_state().db)
    removed = face_module.remove_reference_face(reference_id)
    if not removed:
        raise NotFoundError('Reference face not found')
    return {'success': True, 'message': 'Reference face removed'}
# ============================================================================
# RETRAIN AND MIGRATION ENDPOINTS
# ============================================================================
@router.post("/retrain-references")
@limiter.limit("1/minute")
@handle_exceptions
async def retrain_face_references(
    request: Request,
    current_user: Dict = Depends(get_current_user),
    data: RetrainRequest = Body(...)
):
    """
    Re-train all reference faces with a new InsightFace model.
    Required when switching models because embeddings are incompatible.
    """
    new_model = data.model
    if not new_model:
        raise ValidationError('Model name required')
    valid_models = ['buffalo_l', 'buffalo_m', 'buffalo_s', 'buffalo_sc', 'antelopev2']
    if new_model not in valid_models:
        raise ValidationError(f'Invalid model. Must be one of: {", ".join(valid_models)}')
    logger.info(f"Re-training references with model: {new_model}", module="FaceRecognition")

    app_state = get_app_state()

    # Persist the new model name in the stored face_recognition settings.
    # (Named face_settings so the imported `settings` object is not shadowed.)
    with app_state.db.get_connection(for_write=True) as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT value FROM settings WHERE key = 'face_recognition'")
        row = cursor.fetchone()
        if row:
            face_settings = json.loads(row[0])
            face_settings['insightface_model'] = new_model
            cursor.execute(
                "UPDATE settings SET value = ? WHERE key = 'face_recognition'",
                (json.dumps(face_settings),)
            )
            conn.commit()
        else:
            # Previously this case was silent, leaving the chosen model unsaved
            # with no trace in the logs.
            logger.warning(
                "No 'face_recognition' settings row found; model selection was not persisted",
                module="FaceRecognition"
            )

    face_module = get_face_module(app_state.db)

    # Shared progress dict polled by GET /retrain-progress.
    progress_state = {
        'current': 0,
        'total': 0,
        'person_name': '',
        'in_progress': True
    }

    def progress_callback(current, total, person_name, success):
        # Invoked by the face module from the worker thread; `success` is unused here.
        progress_state['current'] = current
        progress_state['total'] = total
        progress_state['person_name'] = person_name

    # Publish before starting so pollers see in_progress=True immediately,
    # not only after the first callback fires.
    app_state.face_retrain_progress = progress_state
    try:
        # Retraining is blocking work; keep it off the event loop.
        result = await asyncio.to_thread(
            face_module.retrain_all_references_with_model,
            new_model,
            progress_callback=progress_callback
        )
    finally:
        # Always clear, even when retraining raises, so stale progress
        # from a failed run is never reported.
        progress_state['in_progress'] = False
        app_state.face_retrain_progress = None

    succeeded = bool(result['success'])
    if succeeded:
        message = f"Successfully re-trained {result['updated']}/{result['total']} references"
    else:
        message = f"Re-training completed with errors: {result['updated']}/{result['total']} successful"
    return {
        'success': succeeded,
        'message': message,
        'total': result['total'],
        'updated': result['updated'],
        'failed': result['failed'],
        'errors': result['errors']
    }
@router.get("/retrain-progress")
@limiter.limit("300/minute")
@handle_exceptions
async def get_retrain_progress(
    request: Request,
    current_user: Dict = Depends(get_current_user)
):
    """Report the progress dict of a running retrain, or an idle placeholder when none is stored."""
    progress = getattr(get_app_state(), 'face_retrain_progress', None)
    if progress:
        return progress
    # Nothing stored on app state: answer with an idle snapshot.
    return {
        'in_progress': False,
        'current': 0,
        'total': 0,
        'person_name': ''
    }
@router.post("/migrate-references")
@limiter.limit("5/minute")
@handle_exceptions
async def migrate_face_references(
    request: Request,
    current_user: Dict = Depends(get_current_user)
):
    """
    Move existing face references into the dedicated storage directory.
    Reference images are copied to /opt/media-downloader/data/face_references/
    so that moving or deleting the original files no longer breaks them.
    """
    face_module = get_face_module(get_app_state().db)
    outcome = face_module.migrate_references_to_storage()

    summary = (
        f"Face reference migration: {outcome['migrated']} migrated, "
        f"{outcome['deactivated']} deactivated, {outcome['skipped']} skipped"
    )
    logger.info(summary, module="FaceRecognition")

    # The call counts as successful only when no per-reference errors were reported.
    response = {"success": len(outcome['errors']) == 0}
    for key in ('total', 'migrated', 'deactivated', 'skipped', 'errors'):
        response[key] = outcome[key]
    return response
# ============================================================================
# DASHBOARD STATISTICS
# ============================================================================
def _confidence_or_zero(raw):
    """Coerce a confidence column to float; NULL or unparseable values become 0."""
    if raw is None:
        return 0
    try:
        return float(raw)
    except (ValueError, TypeError):
        return 0


def _guess_source_from_path(file_path):
    """Infer a source label from a file path: a known platform directory, else 'forum:<name>'."""
    if not file_path:
        return None
    known_sources = ['instagram', 'tiktok', 'youtube', 'twitter', 'reddit', 'imgur', 'toolzu', 'fastdl']
    # First path segment (case-insensitively) matching a known platform wins.
    guessed = next(
        (segment.lower() for segment in file_path.split('/') if segment.lower() in known_sources),
        None,
    )
    if not guessed and '/forums/' in file_path:
        # The directory directly below /forums/ is taken as the forum name.
        forum_name = file_path.split('/forums/')[1].split('/')[0]
        guessed = f'forum:{forum_name}'
    return guessed


@router.get("/dashboard-stats")
@limiter.limit("60/minute")
@handle_exceptions
async def get_face_dashboard_stats(
    request: Request,
    current_user: Dict = Depends(get_current_user)
):
    """Collect the face recognition figures shown on the dashboard.

    Returns overall totals and match rate, per-person confidence aggregates,
    a confidence histogram, the 20 most recent matches and daily scan counts
    for the last 30 days.
    """
    app_state = get_app_state()
    with app_state.db.get_connection() as conn:
        cursor = conn.cursor()

        # --- Overall totals -------------------------------------------------
        cursor.execute("""
SELECT
COUNT(*) as total_scans,
SUM(CASE WHEN has_match = 1 THEN 1 ELSE 0 END) as total_matches,
SUM(CASE WHEN has_match = 0 THEN 1 ELSE 0 END) as total_no_matches
FROM face_recognition_scans
""")
        totals = cursor.fetchone()

        # --- Aggregates per matched person ----------------------------------
        cursor.execute("""
SELECT
matched_person,
COUNT(*) as count,
AVG(confidence) as avg_confidence,
MIN(confidence) as min_confidence,
MAX(confidence) as max_confidence
FROM face_recognition_scans
WHERE has_match = 1 AND matched_person IS NOT NULL
GROUP BY matched_person
ORDER BY count DESC
""")
        by_person = [
            {
                'person': person,
                'count': count,
                'avg_confidence': round(_confidence_or_zero(avg_raw), 2),
                'min_confidence': round(_confidence_or_zero(min_raw), 2),
                'max_confidence': round(_confidence_or_zero(max_raw), 2)
            }
            for person, count, avg_raw, min_raw, max_raw in (
                (r[0], r[1], r[2], r[3], r[4]) for r in cursor.fetchall()
            )
        ]

        # --- Confidence histogram (20% buckets) -----------------------------
        cursor.execute("""
SELECT
CASE
WHEN confidence < 0.2 THEN '0-20%'
WHEN confidence < 0.4 THEN '20-40%'
WHEN confidence < 0.6 THEN '40-60%'
WHEN confidence < 0.8 THEN '60-80%'
ELSE '80-100%'
END as confidence_range,
COUNT(*) as count
FROM face_recognition_scans
WHERE has_match = 1 AND confidence IS NOT NULL
GROUP BY confidence_range
ORDER BY confidence_range
""")
        confidence_distribution = {}
        for bucket_row in cursor.fetchall():
            confidence_distribution[bucket_row[0]] = bucket_row[1]

        # --- 20 most recent matches -----------------------------------------
        cursor.execute("""
SELECT
f.file_path,
f.matched_person,
f.confidence,
f.scan_date,
d.source
FROM face_recognition_scans f
LEFT JOIN downloads d ON f.download_id = d.id
WHERE f.has_match = 1
ORDER BY f.scan_date DESC
LIMIT 20
""")
        recent_matches = []
        for match_row in cursor.fetchall():
            confidence = _confidence_or_zero(match_row[2])
            # Prefer the source recorded on the download; otherwise guess from the path.
            source = match_row[4] or _guess_source_from_path(match_row[0])
            recent_matches.append({
                'file_path': match_row[0],
                'person': match_row[1],
                'confidence': round(confidence, 2),
                'scan_date': match_row[3],
                'source': source if source else 'unknown'
            })

        # --- Daily scan counts, last 30 days --------------------------------
        cursor.execute("""
SELECT
DATE(scan_date) as scan_day,
COUNT(*) as total_scans,
SUM(CASE WHEN has_match = 1 THEN 1 ELSE 0 END) as matches
FROM face_recognition_scans
WHERE scan_date >= DATE('now', '-30 days')
GROUP BY scan_day
ORDER BY scan_day
""")
        scans_over_time = [
            {
                'date': day_row[0],
                'total_scans': day_row[1],
                'matches': day_row[2],
                'match_rate': round((day_row[2] / day_row[1] * 100) if day_row[1] > 0 else 0, 1)
            }
            for day_row in cursor.fetchall()
        ]

    # Overall match rate as a percentage; stays 0 when nothing has been scanned.
    match_rate = round((totals[1] / totals[0]) * 100, 1) if totals[0] > 0 else 0
    return {
        'total_scans': totals[0] or 0,
        'total_matches': totals[1] or 0,
        'total_no_matches': totals[2] or 0,
        'match_rate': match_rate,
        'by_person': by_person,
        'confidence_distribution': confidence_distribution,
        'recent_matches': recent_matches,
        'scans_over_time': scans_over_time
    }
# ============================================================================
# ADD REFERENCE FROM MEDIA (WITH BACKGROUND PROCESSING)
# ============================================================================
# Strong references to in-flight background tasks. The event loop only keeps a
# weak reference to tasks, so a fire-and-forget task with no other reference
# can be garbage-collected before it finishes.
_face_reference_tasks = set()

# Seconds the helper script may run before it is killed.
_ADD_REFERENCE_TIMEOUT = 120


async def _run_add_reference_script(person_name, source_path):
    """Run the add_reference_face.py helper script and return (returncode, stdout, stderr).

    Raises asyncio.TimeoutError if the script exceeds _ADD_REFERENCE_TIMEOUT;
    in that case the child process is killed and reaped first so it cannot
    keep running unattended.
    """
    process = await asyncio.create_subprocess_exec(
        '/opt/media-downloader/venv/bin/python3',
        '/opt/media-downloader/scripts/add_reference_face.py',
        person_name,
        str(source_path),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    try:
        stdout, stderr = await asyncio.wait_for(
            process.communicate(), timeout=_ADD_REFERENCE_TIMEOUT
        )
    except asyncio.TimeoutError:
        try:
            process.kill()
        except ProcessLookupError:
            # The process exited between the timeout and the kill.
            pass
        await process.wait()
        raise
    return process.returncode, stdout, stderr


def _record_manual_reference_scan(db, file_path, person_name, success):
    """Upsert the face_recognition_scans row for a manually added reference.

    On success the row is marked as a match for ``person_name``; on failure it
    is marked as a non-match with no person.
    """
    has_match = 1 if success else 0
    matched_person = person_name if success else None
    with db.get_connection(for_write=True) as conn:
        cursor = conn.cursor()
        cursor.execute('SELECT id FROM face_recognition_scans WHERE file_path = ?', (file_path,))
        if cursor.fetchone():
            cursor.execute('''
                UPDATE face_recognition_scans
                SET has_match = ?, matched_person = ?, confidence = NULL, scan_date = CURRENT_TIMESTAMP
                WHERE file_path = ?
            ''', (has_match, matched_person, file_path))
        else:
            cursor.execute('''
                INSERT INTO face_recognition_scans
                (file_path, has_match, matched_person, confidence, face_count, scan_type, scan_date)
                VALUES (?, ?, ?, NULL, ?, ?, CURRENT_TIMESTAMP)
            ''', (
                file_path,
                has_match,
                matched_person,
                1 if success else 0,
                'manual_reference' if success else 'manual_reference_failed',
            ))


def _decode_process_output(raw):
    """Decode subprocess output; undecodable bytes are replaced instead of raising."""
    return raw.decode(errors='replace') if raw else ''


def _short_script_error(stdout, stderr):
    """Reduce the script output to a short message for the UI.

    Uses the text after the first '[FaceRecognition]' marker, cut at the first
    colon; falls back to "Processing failed" when no marker is present.
    """
    full_error = (
        _decode_process_output(stderr).strip()
        or _decode_process_output(stdout).strip()
        or "Failed"
    )
    for line in full_error.split('\n'):
        if '[FaceRecognition]' in line:
            message = line.split('[FaceRecognition]')[1].strip()
            if ':' in message:
                message = message.split(':')[0].strip()
            return message
    return "Processing failed"


async def _broadcast_reference_result(app_state, source_path, person_name, success, error=None):
    """Send a face_reference_added WebSocket event, if a manager is configured."""
    manager = getattr(app_state, 'websocket_manager', None)
    if not manager:
        return
    payload = {
        "type": "face_reference_added",
        "file_path": str(source_path),
        "filename": source_path.name,
        "person_name": person_name,
        "success": success
    }
    if error is not None:
        payload["error"] = error
    await manager.broadcast(payload)


@router.post("/add-reference")
@limiter.limit("20/minute")
@handle_exceptions
async def add_face_reference(
    request: Request,
    background_tasks: BackgroundTasks,
    current_user: Dict = Depends(get_current_user),
    add_data: AddFaceReferenceRequest = Body(...)
):
    """Add any file as a face recognition reference."""
    logger.info(f"API: add_face_reference called for {add_data.file_path}, background={add_data.background}", module="FaceRecognition")

    # Security: Validate path is within allowed directories (prevent path traversal)
    source_path = validate_file_path_for_face(add_data.file_path)

    # Normalize path - handle non-breaking spaces in filesystem
    if not source_path.exists():
        normalized_path = Path(str(source_path).replace(' ', '\u00a0'))
        if not normalized_path.exists():
            # Security: Don't expose full path in error message
            raise NotFoundError("File not found")
        # Re-validate the normalized path too
        source_path = validate_file_path_for_face(str(normalized_path))

    app_state = get_app_state()

    # Get person name from settings if not provided
    person_name = add_data.person_name
    if not person_name:
        face_settings = app_state.settings.get('face_recognition', {})
        person_name = face_settings.get('person_name', 'Unknown')

    if add_data.background:
        async def background_task():
            try:
                logger.info(f"BACKGROUND_TASK: Waiting for semaphore for {source_path}", module="FaceRecognition")
                async with app_state.face_recognition_semaphore:
                    logger.info(f"BACKGROUND_TASK: Acquired semaphore, starting for {source_path}", module="FaceRecognition")
                    returncode, stdout, stderr = await _run_add_reference_script(person_name, source_path)
                logger.info(f"BACKGROUND_TASK: Process completed with code {returncode}", module="FaceRecognition")

                succeeded = returncode == 0
                error_msg = None if succeeded else _short_script_error(stdout, stderr)
                _record_manual_reference_scan(app_state.db, str(source_path), person_name, succeeded)
                await _broadcast_reference_result(
                    app_state, source_path, person_name, succeeded, error=error_msg
                )
            except Exception as e:
                logger.error(f"BACKGROUND_TASK: Exception for {source_path}: {e}", module="FaceRecognition")
                # str() of a timeout is empty; fall back to the exception type name.
                await _broadcast_reference_result(
                    app_state, source_path, person_name, False,
                    error=str(e) or type(e).__name__
                )

        # Keep a strong reference until the task finishes (see _face_reference_tasks).
        task = asyncio.create_task(background_task())
        _face_reference_tasks.add(task)
        task.add_done_callback(_face_reference_tasks.discard)
        return {
            "success": True,
            "message": f"Processing {source_path.name}...",
            "person_name": person_name,
            "file_path": str(source_path)
        }

    # Run synchronously
    # NOTE(review): the purpose of this fixed one-second delay is not evident
    # from this file; preserved as-is.
    await asyncio.sleep(1.0)
    returncode, stdout, stderr = await _run_add_reference_script(person_name, source_path)
    if returncode != 0:
        error_msg = (
            _decode_process_output(stderr).strip()
            or _decode_process_output(stdout).strip()
            or "Failed to add face reference"
        )
        raise HTTPException(status_code=500, detail=error_msg)

    _record_manual_reference_scan(app_state.db, str(source_path), person_name, True)
    return {
        "success": True,
        "message": f"Added {source_path.name} as face reference for {person_name}",
        "person_name": person_name,
        "file_path": str(source_path)
    }
# ============================================================================
# BATCH ADD REFERENCE
# ============================================================================
@router.post("/batch-add-reference")
@limiter.limit("10/minute")
@handle_exceptions
async def batch_add_face_reference(
    request: Request,
    background_tasks: BackgroundTasks,
    current_user: Dict = Depends(get_current_user),
    batch_data: BatchAddFaceReferenceRequest = Body(...)
):
    """Batch add files as face recognition references - runs in background with WebSocket progress."""
    file_paths = batch_data.file_paths

    # Security: Limit batch size to prevent DoS
    MAX_BATCH_SIZE = 100  # Lower limit for face recognition (CPU intensive)
    if len(file_paths) > MAX_BATCH_SIZE:
        raise ValidationError(f"Batch size exceeds maximum of {MAX_BATCH_SIZE} files")

    # Security: Pre-validate all paths before starting batch (prevent path traversal)
    validated_paths = []
    for fp in file_paths:
        try:
            validated_paths.append(str(validate_file_path_for_face(fp)))
        except ValidationError:
            logger.warning(f"Batch add: Path validation failed for {fp}", module="Security")
            continue
    if not validated_paths:
        raise ValidationError("No valid file paths provided")
    file_paths = validated_paths

    app_state = get_app_state()
    person_name = batch_data.person_name
    if not person_name:
        face_settings = app_state.settings.get('face_recognition', {})
        person_name = face_settings.get('person_name', 'Unknown')

    async def batch_task():
        logger.info(f"BATCH_TASK_STARTED: Processing {len(file_paths)} files", module="FaceRecognition")
        total = len(file_paths)
        succeeded = 0
        failed = 0
        manager = getattr(app_state, 'websocket_manager', None)

        async def report(filename, file_path_str, idx, status, error=None):
            # One progress event per state change of a file.
            if not manager:
                return
            payload = {
                "type": "batch_face_reference_progress",
                "filename": filename,
                "file_path": file_path_str,
                "current": idx,
                "total": total,
                "status": status
            }
            if error is not None:
                payload["error"] = error
            await manager.broadcast(payload)

        if manager:
            await manager.broadcast({
                "type": "batch_face_reference_started",
                "total": total
            })

        for idx, file_path_str in enumerate(file_paths, 1):
            source_path = Path(file_path_str)
            if not source_path.exists():
                # Filesystem names may contain non-breaking spaces where the request has plain ones.
                normalized_path = Path(str(source_path).replace(' ', '\u00a0'))
                if normalized_path.exists():
                    try:
                        # Re-validate the rewritten path, as the single-file
                        # endpoint does, before handing it to the script.
                        source_path = validate_file_path_for_face(str(normalized_path))
                    except (ValidationError, HTTPException):
                        # Keep the original (missing) path; it is reported as
                        # "File not found" below.
                        logger.warning(
                            f"Batch add: Path validation failed for {normalized_path}",
                            module="Security"
                        )
            filename = source_path.name
            await report(filename, file_path_str, idx, "processing")

            try:
                if not source_path.exists():
                    raise FileNotFoundError("File not found")
                # `nice -n 19` runs the script at the lowest CPU priority.
                process = await asyncio.create_subprocess_exec(
                    'nice', '-n', '19',
                    '/opt/media-downloader/venv/bin/python3',
                    '/opt/media-downloader/scripts/add_reference_face.py',
                    person_name,
                    str(source_path),
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE
                )
                try:
                    stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=120)
                except asyncio.TimeoutError:
                    # Do not leave a timed-out child running in the background.
                    try:
                        process.kill()
                    except ProcessLookupError:
                        pass
                    await process.wait()
                    raise
                returncode = process.returncode

                if returncode == 0:
                    with app_state.db.get_connection(for_write=True) as conn:
                        cursor = conn.cursor()
                        cursor.execute('SELECT id FROM face_recognition_scans WHERE file_path = ?', (str(source_path),))
                        if cursor.fetchone():
                            cursor.execute('''
                                UPDATE face_recognition_scans
                                SET has_match = 1, matched_person = ?, confidence = NULL, scan_date = CURRENT_TIMESTAMP
                                WHERE file_path = ?
                            ''', (person_name, str(source_path)))
                        else:
                            cursor.execute('''
                                INSERT INTO face_recognition_scans
                                (file_path, has_match, matched_person, confidence, face_count, scan_type, scan_date)
                                VALUES (?, 1, ?, NULL, 1, 'manual_reference', CURRENT_TIMESTAMP)
                            ''', (str(source_path), person_name))
                    succeeded += 1
                    await report(filename, file_path_str, idx, "success")
                else:
                    failed += 1
                    stderr_str = stderr.decode(errors='replace') if stderr else ''
                    stdout_str = stdout.decode(errors='replace') if stdout else ''
                    error_msg = stderr_str.strip() or stdout_str.strip() or "Failed"
                    logger.warning(
                        f"BATCH_TASK: {filename} failed with code {returncode}",
                        module="FaceRecognition"
                    )
                    await report(filename, file_path_str, idx, "error", error=error_msg)
            except Exception as e:
                failed += 1
                # str() of a timeout is empty; fall back to the exception type name.
                error_text = str(e) or type(e).__name__
                logger.warning(f"BATCH_TASK: {filename} failed: {error_text}", module="FaceRecognition")
                await report(filename, file_path_str, idx, "error", error=error_text)

        logger.info(f"BATCH_TASK: COMPLETE - {succeeded} succeeded, {failed} failed", module="FaceRecognition")
        if manager:
            await manager.broadcast({
                "type": "batch_face_reference_complete",
                "succeeded": succeeded,
                "failed": failed,
                "total": total
            })

    background_tasks.add_task(batch_task)
    return {
        "success": True,
        "message": f"Processing {len(file_paths)} files in background...",
        "total": len(file_paths)
    }
# ============================================================================
# RETROACTIVE SCAN
# ============================================================================
@router.post("/retroactive-scan")
@limiter.limit("1/minute")
@handle_exceptions
async def retroactive_face_scan(
    request: Request,
    background_tasks: BackgroundTasks,
    current_user: Dict = Depends(get_current_user),
    scan_data: RetroactiveScanRequest = Body(...)
):
    """Start a retroactive face recognition scan with live progress updates."""
    app_state = get_app_state()

    async def run_scan():
        import gc

        # Resolved before the try block so the outer error handler can always
        # use it, even when the database query itself fails.
        manager = getattr(app_state, 'websocket_manager', None)

        try:
            # Candidates: media files never scanned, or scanned without a match.
            with app_state.db.get_connection() as conn:
                cursor = conn.cursor()
                query = '''
                    SELECT DISTINCT dm.file_path, dm.filename
                    FROM download_metadata dm
                    LEFT JOIN face_recognition_scans frs ON dm.file_path = frs.file_path
                    WHERE dm.media_type IN ('image', 'video')
                    AND (frs.id IS NULL OR frs.has_match = 0)
                '''
                if scan_data.directory:
                    # Substring match on the stored path.
                    query += ' AND dm.file_path LIKE ?'
                    cursor.execute(query, (f'%{scan_data.directory}%',))
                else:
                    cursor.execute(query)
                files_to_scan = cursor.fetchall()

            total_files = len(files_to_scan)

            async def report(filename, file_path, idx, status, **extra):
                # One face_scan_progress event per state change of a file.
                if not manager:
                    return
                payload = {
                    "type": "face_scan_progress",
                    "file": filename,
                    "file_path": file_path,
                    "current": idx,
                    "total": total_files,
                    "status": status,
                }
                payload.update(extra)
                payload["timestamp"] = now_iso8601()
                await manager.broadcast(payload)

            if manager:
                await manager.broadcast({
                    "type": "face_scan_started",
                    "total_files": total_files,
                    "timestamp": now_iso8601()
                })

            face_module = get_face_module(app_state.db)
            stats = {
                'total': total_files,
                'matched': 0,
                'unmatched': 0,
                'errors': 0,
                'processed': 0
            }

            for idx, file_row in enumerate(files_to_scan, 1):
                file_path = file_row[0]
                filename = file_row[1]
                try:
                    await report(filename, file_path, idx, "processing")

                    if not Path(file_path).exists():
                        stats['errors'] += 1
                        await report(filename, file_path, idx, "error", error="File not found")
                        continue

                    # NOTE(review): scan_file runs synchronously on the event loop;
                    # confirm whether it is safe to move to asyncio.to_thread as
                    # the retrain endpoint does for its face-module call.
                    result = face_module.scan_file(file_path)
                    if result.get('has_match'):
                        stats['matched'] += 1
                        await report(
                            filename, file_path, idx, "matched",
                            person=result.get('person_name', 'Unknown'),
                            confidence=result.get('confidence', 0)
                        )
                    else:
                        stats['unmatched'] += 1
                        await report(filename, file_path, idx, "no_match")
                    stats['processed'] += 1

                    # Periodic memory cleanup to prevent OOM during long scans
                    if stats['processed'] % 50 == 0:
                        gc.collect()
                except Exception as e:
                    # A failure on one file must not abort the whole scan.
                    stats['errors'] += 1
                    logger.warning(f"Face scan failed for {file_path}: {e}", module="FaceRecognition")
                    await report(filename, file_path, idx, "error", error=str(e))

            if manager:
                await manager.broadcast({
                    "type": "face_scan_complete",
                    "stats": stats,
                    "timestamp": now_iso8601()
                })
        except Exception as e:
            logger.error(f"Retroactive face scan aborted: {e}", module="FaceRecognition")
            if manager:
                await manager.broadcast({
                    "type": "face_scan_error",
                    "error": str(e),
                    "timestamp": now_iso8601()
                })

    background_tasks.add_task(run_scan)
    return {
        "success": True,
        "message": "Face recognition scan started"
    }
# ============================================================================
# IMMICH INTEGRATION ENDPOINTS
# ============================================================================
def _get_immich_integration():
    """Build an ImmichFaceIntegration from the 'immich' settings, or None when it is not enabled."""
    from modules.immich_face_integration import ImmichFaceIntegration

    immich_settings = get_app_state().settings.get('immich', {})
    if immich_settings.get('enabled'):
        url = immich_settings.get('api_url')
        key = immich_settings.get('api_key')
        return ImmichFaceIntegration(api_url=url, api_key=key)
    return None
@router.get("/immich/status")
@limiter.limit("60/minute")
@handle_exceptions
async def get_immich_status(
    request: Request,
    current_user: Dict = Depends(get_current_user)
):
    """Report whether the Immich integration is enabled, configured and reachable."""
    immich_settings = get_app_state().settings.get('immich', {})
    if not immich_settings.get('enabled'):
        return {
            'enabled': False,
            'connected': False,
            'message': 'Immich integration is disabled'
        }

    immich = _get_immich_integration()
    if immich and immich.is_configured:
        # Enabled and configured: perform a live connection test.
        probe = immich.test_connection()
        return {
            'enabled': True,
            'connected': probe['success'],
            'message': probe['message'],
            'server_info': probe.get('server_info'),
            'api_url': immich_settings.get('api_url')
        }

    return {
        'enabled': True,
        'connected': False,
        'message': 'Immich API key not configured'
    }
@router.put("/immich/settings")
@limiter.limit("30/minute")
@handle_exceptions
async def update_immich_settings(
    request: Request,
    current_user: Dict = Depends(get_current_user),
    enabled: bool = Body(None, embed=True)
):
    """Persist Immich integration settings; currently only the ``enabled`` flag can be changed."""
    settings_store = get_app_state().settings
    immich_settings = settings_store.get('immich', {})

    # Omitting ``enabled`` leaves the stored flag untouched.
    if enabled is not None:
        immich_settings['enabled'] = enabled

    # Save using settings manager
    editor = current_user.get('username', 'user')
    settings_store.set(
        key='immich',
        value=immich_settings,
        category='face_recognition',
        description='Immich face recognition integration',
        updated_by=editor
    )
    return {
        'success': True,
        'enabled': immich_settings.get('enabled', False)
    }
@router.get("/immich/statistics")
@limiter.limit("30/minute")
@handle_exceptions
async def get_immich_statistics(
    request: Request,
    current_user: Dict = Depends(get_current_user)
):
    """Return the statistics reported by the Immich integration.

    Raises ValidationError when the integration is disabled or has no API key.
    """
    immich = _get_immich_integration()
    if not immich:
        raise ValidationError('Immich integration is not enabled')
    if not immich.is_configured:
        raise ValidationError('Immich API key not configured')
    return immich.get_statistics()
@router.get("/immich/people")
@limiter.limit("30/minute")
@handle_exceptions
async def get_immich_people(
    request: Request,
    current_user: Dict = Depends(get_current_user),
    named_only: bool = True
):
    """List people known to Immich; by default only those that have a name.

    Raises ValidationError when the integration is disabled or has no API key.
    """
    immich = _get_immich_integration()
    if not immich:
        raise ValidationError('Immich integration is not enabled')
    if not immich.is_configured:
        raise ValidationError('Immich API key not configured')

    people = immich.get_named_people() if named_only else immich.get_all_people()
    return {
        'people': people,
        'count': len(people)
    }
# Request body for POST /immich/faces.
class ImmichFaceLookupRequest(BaseModel):
    # Local file to look up in Immich; validated against FACE_ALLOWED_PATHS by the endpoint.
    file_path: str
@router.post("/immich/faces")
@limiter.limit("60/minute")
@handle_exceptions
async def get_immich_faces_for_file(
    request: Request,
    current_user: Dict = Depends(get_current_user),
    lookup: ImmichFaceLookupRequest = Body(...)
):
    """
    Look up Immich's face recognition data for one file.
    The path is validated first, then Immich is asked for the faces it has
    detected and identified in that file.
    """
    # Path validation happens before the integration is touched.
    validated_path = validate_file_path_for_face(lookup.file_path)

    immich = _get_immich_integration()
    if not immich:
        raise ValidationError('Immich integration is not enabled')
    if not immich.is_configured:
        raise ValidationError('Immich API key not configured')

    return immich.get_faces_for_file(str(validated_path))
@router.get("/immich/person/{person_name}/assets")
@limiter.limit("30/minute")
@handle_exceptions
async def get_immich_person_assets(
    request: Request,
    person_name: str,
    current_user: Dict = Depends(get_current_user),
    limit: int = 100
):
    """Get assets containing a specific person from Immich.

    At most ``limit`` assets are returned; ``total`` always reports how many
    Immich knows about. Raises ValidationError when the integration is
    disabled or unconfigured, and NotFoundError when the person is unknown.
    """
    immich = _get_immich_integration()
    if not immich:
        raise ValidationError('Immich integration is not enabled')
    if not immich.is_configured:
        raise ValidationError('Immich API key not configured')

    # Find person by name
    person = immich.get_person_by_name(person_name)
    if not person:
        raise NotFoundError(f'Person "{person_name}" not found in Immich')

    # Get assets for person
    assets = immich.get_person_assets(person['id'])

    # A negative limit would make the slice drop items from the END of the
    # list instead of limiting it; treat it as "return nothing".
    effective_limit = max(limit, 0)

    # Convert Immich paths to local paths and limit results
    local_assets = []
    for asset in assets[:effective_limit]:
        original_path = asset.get('originalPath', '')
        local_assets.append({
            'asset_id': asset.get('id'),
            'local_path': immich._immich_to_local_path(original_path),
            'immich_path': original_path,
            'type': asset.get('type'),
            'created_at': asset.get('fileCreatedAt')
        })

    return {
        'person_id': person['id'],
        'person_name': person['name'],
        'assets': local_assets,
        'total': len(assets),
        'returned': len(local_assets)
    }