Files
media-downloader/modules/face_recognition_module.py
Todd 0d7b2b1aab Initial commit
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-29 22:42:55 -04:00

1763 lines
73 KiB
Python

#!/usr/bin/env python3
"""
Face Recognition Module
Detects and matches faces in images using InsightFace (ArcFace + RetinaFace)
"""
import numpy as np
import gc
from pathlib import Path
from typing import Optional, List, Dict, Tuple
import pickle
import base64
import os
import shutil
import uuid
import cv2
from modules.universal_logger import get_logger
# Directory for storing reference face images (independent of source files)
FACE_REFERENCES_DIR = Path(__file__).parent.parent / 'data' / 'face_references'
# Suppress TensorFlow warnings (legacy, no longer using TensorFlow)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
# Lazy import flags - these will be set on first use to avoid loading
# heavy ML libraries (torch, onnxruntime, CUDA) at module import time.
# This prevents ~6GB of memory being allocated just by importing this module.
# Tri-state: None = not yet probed, True/False = probe result (cached).
FACE_RECOGNITION_AVAILABLE = None # Will be set on first use
INSIGHTFACE_AVAILABLE = None # Will be set on first use
_FaceAnalysis = None # Cached class reference
face_recognition = None # Cached module reference (used by code as face_recognition.xyz())
def _check_face_recognition_available():
    """Probe for the optional face_recognition library, caching the result.

    The import is deferred until first use so that merely importing this
    module does not pull in heavy ML dependencies.
    """
    global FACE_RECOGNITION_AVAILABLE, face_recognition
    # Already probed — return the cached answer.
    if FACE_RECOGNITION_AVAILABLE is not None:
        return FACE_RECOGNITION_AVAILABLE
    try:
        import face_recognition as fr_module
    except ImportError:
        FACE_RECOGNITION_AVAILABLE = False
    else:
        face_recognition = fr_module
        FACE_RECOGNITION_AVAILABLE = True
    return FACE_RECOGNITION_AVAILABLE
def _check_insightface_available():
    """Probe for the optional InsightFace library, caching the FaceAnalysis class.

    Deferred import keeps heavy ML dependencies out of module import time.
    """
    global INSIGHTFACE_AVAILABLE, _FaceAnalysis
    # Already probed — return the cached answer.
    if INSIGHTFACE_AVAILABLE is not None:
        return INSIGHTFACE_AVAILABLE
    try:
        from insightface.app import FaceAnalysis
    except ImportError:
        INSIGHTFACE_AVAILABLE = False
    else:
        _FaceAnalysis = FaceAnalysis
        INSIGHTFACE_AVAILABLE = True
    return INSIGHTFACE_AVAILABLE
def _get_face_analysis_class():
    """Return the InsightFace FaceAnalysis class, or None when unavailable."""
    # Probe (and cache) availability first; this populates _FaceAnalysis
    # on success and leaves it None on failure.
    _check_insightface_available()
    return _FaceAnalysis
# Module-wide logger instance; _log() also tags each message with the module name.
logger = get_logger('FaceRecognition')
class FaceRecognitionModule:
"""Face recognition for filtering downloaded images"""
def __init__(self, unified_db=None, log_callback=None):
    """
    Initialize face recognition module.
    Args:
        unified_db: Database connection for storing/retrieving encodings;
                    when None, schema setup and encoding load are skipped.
        log_callback: Optional callback for logging, called as callback(message, level)
    """
    self.unified_db = unified_db
    self.log_callback = log_callback
    self.reference_encodings = {} # {person_name: [encoding1, encoding2, ...]} - InsightFace encodings
    self.reference_encodings_fr = {} # {person_name: [encoding1, encoding2, ...]} - face_recognition encodings (fallback)
    self.insightface_app = None # Lazy-loaded InsightFace analyzer
    # Ensure schema has face_recognition encoding column (must run before load)
    self._ensure_fr_encoding_column()
    # Load reference encodings from database
    self._load_reference_encodings()
def _get_insightface_model_name(self) -> str:
"""
Get InsightFace model name from settings
Returns:
Model name (e.g., 'buffalo_l', 'antelopev2')
"""
if self.unified_db:
try:
import json
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT value FROM settings WHERE key = 'face_recognition'")
result = cursor.fetchone()
if result:
settings = json.loads(result[0])
model = settings.get('insightface_model', 'buffalo_l')
return model
except Exception as e:
self._log(f"Failed to read insightface_model from settings: {e}", "debug")
return 'buffalo_l' # Default
def _get_insightface_app(self):
    """
    Get or initialize the InsightFace app (lazy singleton).
    Returns:
        FaceAnalysis app, or None if InsightFace is unavailable or
        initialization fails (failure is not cached; a later call retries).
    """
    if not _check_insightface_available():
        return None
    if self.insightface_app is None:
        try:
            model_name = self._get_insightface_model_name()
            self._log(f"Initializing InsightFace with model: {model_name}", "info")
            FaceAnalysis = _get_face_analysis_class()
            # CPU-only provider: keeps this service off the GPU
            self.insightface_app = FaceAnalysis(name=model_name, providers=['CPUExecutionProvider'])
            # det_size is the detector input resolution (width, height)
            self.insightface_app.prepare(ctx_id=0, det_size=(640, 640))
            self._log("InsightFace initialized successfully", "info")
        except Exception as e:
            self._log(f"Failed to initialize InsightFace: {e}", "error")
            return None
    return self.insightface_app
def release_model(self):
"""
Release the InsightFace model to free memory.
Call this after batch processing to prevent OOM in long-running services.
The model will be lazy-loaded again when needed.
"""
if self.insightface_app is not None:
self._log("Releasing InsightFace model to free memory", "info")
del self.insightface_app
self.insightface_app = None
gc.collect()
self._log("InsightFace model released", "debug")
def _log(self, message: str, level: str = "info"):
    """Emit a log line via the optional callback and the universal logger."""
    if self.log_callback:
        self.log_callback(f"[FaceRecognition] {message}", level)
    # Dispatch to the matching universal-logger method; levels outside this
    # table are silently ignored.
    dispatch = {
        "debug": logger.debug,
        "info": logger.info,
        "warning": logger.warning,
        "error": logger.error,
    }
    emit = dispatch.get(level)
    if emit is not None:
        emit(message, module='FaceRecognition')
def _ensure_fr_encoding_column(self):
"""Ensure the face_recognition encoding column exists in database"""
if not self.unified_db:
return
try:
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("PRAGMA table_info(face_recognition_references)")
columns = [row[1] for row in cursor.fetchall()]
if 'encoding_data_fr' not in columns:
cursor.execute("ALTER TABLE face_recognition_references ADD COLUMN encoding_data_fr TEXT")
conn.commit()
self._log("Added encoding_data_fr column for face_recognition fallback", "info")
except Exception as e:
self._log(f"Error ensuring encoding_data_fr column: {e}", "error")
def _generate_thumbnail(self, image_path: str, max_size: int = 150) -> Optional[str]:
    """
    Generate a base64-encoded JPEG thumbnail from an image.
    Args:
        image_path: Path to the source image
        max_size: Maximum dimension (width or height) in pixels
    Returns:
        Base64-encoded JPEG thumbnail, or None on failure
    """
    try:
        img = cv2.imread(image_path)
        if img is None:
            self._log(f"Failed to load image for thumbnail: {image_path}", "warning")
            return None
        # Scale the longer edge down to max_size, preserving aspect ratio
        height, width = img.shape[:2]
        if width > height:
            new_width = max_size
            new_height = int(height * max_size / width)
        else:
            new_height = max_size
            new_width = int(width * max_size / height)
        thumbnail = cv2.resize(img, (new_width, new_height), interpolation=cv2.INTER_AREA)
        # imencode returns (success, buffer); check the flag instead of
        # discarding it — a failed encode would otherwise yield garbage.
        ok, buffer = cv2.imencode('.jpg', thumbnail, [cv2.IMWRITE_JPEG_QUALITY, 85])
        if not ok:
            self._log(f"Failed to generate thumbnail: JPEG encode failed for {image_path}", "warning")
            return None
        return base64.b64encode(buffer).decode('utf-8')
    except Exception as e:
        self._log(f"Failed to generate thumbnail: {e}", "warning")
        return None
def _copy_reference_image(self, source_path: str, person_name: str) -> Tuple[Optional[str], Optional[str]]:
    """
    Copy a reference image to the dedicated face references directory with a UUID filename.
    Args:
        source_path: Path to the source image
        person_name: Name of the person (for logging only)
    Returns:
        Tuple of (path to copied image, base64 thumbnail) or (None, None) on failure
    """
    try:
        source = Path(source_path)
        if not source.exists():
            self._log(f"Source image does not exist: {source_path}", "error")
            return None, None
        # Ensure the face references directory exists
        FACE_REFERENCES_DIR.mkdir(parents=True, exist_ok=True)
        # Generate UUID filename, preserving original extension
        file_ext = source.suffix.lower()
        # Normalize extensions
        if file_ext in ['.jpeg']:
            file_ext = '.jpg'
        elif file_ext in ['.webp', '.heic', '.heif']:
            # NOTE(review): only the EXTENSION is renamed to .jpg here — the
            # file bytes are copied verbatim below, so no format conversion
            # actually happens. Readers that trust the extension may fail on
            # heic/webp content in a .jpg file; confirm whether a real
            # re-encode was intended.
            file_ext = '.jpg'
        unique_id = str(uuid.uuid4())
        dest_filename = f"{unique_id}{file_ext}"
        dest_path = FACE_REFERENCES_DIR / dest_filename
        # Copy the file (copy2 also preserves metadata/timestamps)
        shutil.copy2(source_path, dest_path)
        self._log(f"Copied reference image for '{person_name}' to: {dest_filename}", "debug")
        # Generate thumbnail from the copied file
        thumbnail_b64 = self._generate_thumbnail(str(dest_path))
        return str(dest_path), thumbnail_b64
    except Exception as e:
        self._log(f"Failed to copy reference image: {e}", "error")
        return None, None
def _load_reference_encodings(self):
    """
    Load reference face encodings from the database.
    Populates self.reference_encodings (InsightFace embeddings, primary) and
    self.reference_encodings_fr (face_recognition encodings, fallback).
    A corrupt row is skipped with a warning instead of aborting the whole load.
    """
    if not self.unified_db:
        self._log("No database connection, cannot load reference encodings", "warning")
        return
    try:
        with self.unified_db.get_connection() as conn:
            cursor = conn.cursor()
            # Get all active reference faces (plus fallback encodings if present)
            cursor.execute("""
                SELECT person_name, encoding_data, encoding_data_fr, is_active
                FROM face_recognition_references
                WHERE is_active = 1
            """)
            rows = cursor.fetchall()
        loaded = 0
        fr_count = 0
        for person_name, encoding_data, encoding_data_fr, is_active in rows:
            # Decode InsightFace encoding (primary). Guarded per-row so one
            # corrupt entry cannot abort the entire load.
            # (pickle data originates from our own database, not untrusted input)
            try:
                encoding = pickle.loads(base64.b64decode(encoding_data))
                self.reference_encodings.setdefault(person_name, []).append(encoding)
                loaded += 1
            except Exception as e:
                self._log(f"Error loading InsightFace encoding for {person_name}: {e}", "warning")
            # Decode face_recognition encoding (fallback) if available
            if encoding_data_fr:
                try:
                    fr_encoding = pickle.loads(base64.b64decode(encoding_data_fr))
                    self.reference_encodings_fr.setdefault(person_name, []).append(fr_encoding)
                    fr_count += 1
                except Exception as e:
                    self._log(f"Error loading face_recognition encoding: {e}", "debug")
        self._log(f"Loaded {loaded} reference encodings for {len(self.reference_encodings)} people", "info")
        if fr_count > 0:
            self._log(f"Loaded {fr_count} face_recognition fallback encodings", "debug")
    except Exception as e:
        self._log(f"Error loading reference encodings: {e}", "error")
def _extract_video_frame(self, video_path: str) -> Optional[str]:
    """
    Extract a single frame from a video for face detection.
    Args:
        video_path: Path to video file
    Returns:
        Path to extracted frame image (caller must delete it), or None on failure
    """
    import subprocess
    import tempfile
    import os
    # Try multiple timestamps to find a frame with better face visibility
    timestamps = ['1', '2', '3', '0.5'] # Try 1s, 2s, 3s, 0.5s
    for timestamp in timestamps:
        output_path = None
        try:
            # Create temp file for frame
            temp_fd, output_path = tempfile.mkstemp(suffix='.jpg')
            os.close(temp_fd)
            # Extract frame at this timestamp with low CPU priority
            cmd = [
                'nice', '-n', '19',
                'ffmpeg',
                '-ss', timestamp,
                '-i', video_path,
                '-frames:v', '1', # Extract 1 frame
                '-q:v', '2', # High quality
                '-y', # Overwrite
                output_path
            ]
            result = subprocess.run(cmd, capture_output=True, timeout=10)
            # Require a non-trivial size to reject empty/corrupt frames
            output_file = Path(output_path)
            if result.returncode == 0 and output_file.exists() and output_file.stat().st_size > 1000:
                self._log(f"Extracted frame from video at {timestamp}s: {Path(video_path).name}", "debug")
                return output_path
            # Failed extraction: clean up and try the next timestamp
            if output_file.exists():
                os.remove(output_path)
        except Exception as e:
            self._log(f"Error extracting video frame at {timestamp}s: {e}", "debug")
            # Don't leak the temp file when ffmpeg times out or errors out
            if output_path and os.path.exists(output_path):
                try:
                    os.remove(output_path)
                except OSError:
                    pass
    self._log(f"Failed to extract frame from video after trying multiple timestamps: {Path(video_path).name}", "warning")
    return None
def _extract_video_frames_at_positions(self, video_path: str, positions: List[float] = None) -> List[str]:
    """
    Extract multiple frames from a video at specific relative positions.
    Args:
        video_path: Path to video file
        positions: List of positions (0.0 to 1.0) in the video to extract frames,
                   e.g., [0.1, 0.5, 0.9] for 10%, 50%, 90% through the video
    Returns:
        List of paths to extracted frame images (caller must delete them)
    """
    import subprocess
    import tempfile
    import os
    if positions is None:
        positions = [0.1, 0.3, 0.5, 0.7, 0.9] # Sample 5 frames for better coverage
    extracted_frames = []
    try:
        # First, get video duration so positions can be mapped to timestamps
        duration_cmd = [
            'ffprobe',
            '-v', 'error',
            '-show_entries', 'format=duration',
            '-of', 'default=noprint_wrappers=1:nokey=1',
            video_path
        ]
        duration_result = subprocess.run(duration_cmd, capture_output=True, timeout=5, text=True)
        if duration_result.returncode != 0:
            self._log(f"Failed to get video duration: {Path(video_path).name}", "warning")
            return []
        try:
            duration = float(duration_result.stdout.strip())
        except ValueError:
            self._log(f"Invalid duration value for video: {Path(video_path).name}", "warning")
            return []
        # Extract frame at each position
        for pos in positions:
            output_path = None
            try:
                timestamp = duration * pos
                # Create temp file for frame
                temp_fd, output_path = tempfile.mkstemp(suffix='.jpg')
                os.close(temp_fd)
                # Extract frame at this timestamp with low CPU priority
                cmd = [
                    'nice', '-n', '19',
                    'ffmpeg',
                    '-ss', str(timestamp),
                    '-i', video_path,
                    '-frames:v', '1',
                    '-q:v', '2',
                    '-y',
                    output_path
                ]
                result = subprocess.run(cmd, capture_output=True, timeout=10)
                output_file = Path(output_path)
                # Require a non-trivial size to reject empty/corrupt frames
                if result.returncode == 0 and output_file.exists() and output_file.stat().st_size > 1000:
                    self._log(f"Extracted frame at {pos*100:.0f}% ({timestamp:.1f}s): {Path(video_path).name}", "debug")
                    extracted_frames.append(output_path)
                elif output_file.exists():
                    size = output_file.stat().st_size
                    os.remove(output_path)
                    if size <= 1000:
                        self._log(f"Frame at position {pos} too small ({size} bytes), skipping", "debug")
                    else:
                        self._log(f"Failed to extract frame at position {pos}", "debug")
                else:
                    # ffmpeg produced no output file at all
                    self._log(f"Failed to extract frame at position {pos}", "debug")
            except Exception as e:
                self._log(f"Error extracting frame at position {pos}: {e}", "debug")
                # Don't leak the temp file when ffmpeg times out or errors out
                if output_path and os.path.exists(output_path):
                    try:
                        os.remove(output_path)
                    except OSError:
                        pass
                continue
        if not extracted_frames:
            self._log(f"Failed to extract any frames from video: {Path(video_path).name}", "warning")
        else:
            self._log(f"Extracted {len(extracted_frames)} frames from video: {Path(video_path).name}", "debug")
        return extracted_frames
    except Exception as e:
        self._log(f"Error extracting multiple frames from video: {e}", "error")
        return []
def detect_faces(self, image_path: str, is_video: bool = False) -> List[np.ndarray]:
    """
    Detect all faces in an image or video using the face_recognition library
    (HOG first, CNN fallback) and return their encodings.
    Args:
        image_path: Path to image/video file
        is_video: If True, extract a frame from the video first
    Returns:
        List of face encodings (numpy arrays); empty if the library is
        unavailable or no faces are found
    """
    # The module-level `face_recognition` reference is populated lazily;
    # probe it first so we fail cleanly instead of AttributeError on None.
    if not _check_face_recognition_available():
        self._log("face_recognition library not available", "warning")
        return []
    temp_frame_path = None
    # Pre-bind so the cleanup in `finally` cannot raise NameError when an
    # exception fires before these are assigned.
    image = None
    face_locations = None
    try:
        # If video, extract a frame first
        if is_video:
            temp_frame_path = self._extract_video_frame(image_path)
            if not temp_frame_path:
                return []
            image_path = temp_frame_path
        # Load image
        image = face_recognition.load_image_file(image_path)
        # Try HOG model first (faster)
        face_locations = face_recognition.face_locations(image, model="hog", number_of_times_to_upsample=2)
        # If no faces found with HOG, try CNN model (more accurate but slower)
        if not face_locations:
            self._log(f"No faces found with HOG model, trying CNN model for {Path(image_path).name}", "debug")
            try:
                face_locations = face_recognition.face_locations(image, model="cnn", number_of_times_to_upsample=1)
                if face_locations:
                    self._log(f"CNN model found {len(face_locations)} face(s)", "debug")
            except Exception as cnn_error:
                self._log(f"CNN model failed: {cnn_error}, no faces detected", "debug")
        if not face_locations:
            self._log(f"No faces detected in {Path(image_path).name}", "debug")
            return []
        # Get face encodings
        face_encodings = face_recognition.face_encodings(image, face_locations)
        self._log(f"Detected {len(face_encodings)} face(s) in {Path(image_path).name}", "debug")
        return face_encodings
    except Exception as e:
        self._log(f"Error detecting faces in {image_path}: {e}", "error")
        return []
    finally:
        # Clean up the temp frame extracted from the video
        if temp_frame_path:
            try:
                os.unlink(temp_frame_path)
            except Exception:
                pass
        # Explicitly release large image buffers to prevent OOM in
        # long-running processes
        del image
        del face_locations
        gc.collect()
def detect_faces_insightface(self, image_path: str) -> List[np.ndarray]:
    """
    Detect faces using InsightFace (ArcFace + RetinaFace).
    More accurate and 6x faster than DeepFace.
    Args:
        image_path: Path to image file
    Returns:
        List of face embeddings (numpy arrays)
    """
    app = self._get_insightface_app()
    if app is None:
        self._log("InsightFace not available, falling back to face_recognition", "warning")
        return self.detect_faces(image_path, is_video=False)
    # Pre-bind so the `del` statements in `finally` can never raise
    # NameError (which previously masked the `return []` in `except`
    # when app.get() threw before `faces` was assigned).
    img = None
    faces = None
    try:
        # Load image using cv2
        img = cv2.imread(image_path)
        if img is None:
            self._log(f"Failed to load image: {Path(image_path).name}", "error")
            return []
        # Detect faces and get embeddings
        faces = app.get(img)
        # Extract embeddings and filter out low-quality detections
        face_encodings = []
        total_detections = len(faces)
        for face in faces:
            # Get face bounding box
            bbox = face.bbox.astype(int)
            x1, y1, x2, y2 = bbox
            face_width = x2 - x1
            face_height = y2 - y1
            face_area = face_width * face_height
            # Reject faces smaller than 80x80 pixels (text/logos/distant faces)
            if face_area > 0 and face_area < 6400: # 80*80 = 6400
                self._log(f"Skipping small face detection ({face_width}x{face_height}px) - likely false positive", "debug")
                continue
            # Reject detections with unusual aspect ratios (text patterns)
            if face_width > 0 and face_height > 0:
                aspect_ratio = face_width / face_height
                if aspect_ratio < 0.5 or aspect_ratio > 2.0:
                    self._log(f"Skipping face with unusual aspect ratio {aspect_ratio:.2f} - likely text/logo", "debug")
                    continue
            face_encodings.append(face.embedding)
        # If too many faces detected (>10), likely detecting text as faces
        if total_detections > 10:
            self._log(f"WARNING: Detected {total_detections} faces, {len(face_encodings)} passed filters - possible text/graphics", "warning")
        self._log(f"InsightFace detected {len(face_encodings)} valid face(s) in {Path(image_path).name}", "debug")
        return face_encodings
    except Exception as e:
        self._log(f"Error detecting faces with InsightFace: {e}", "debug")
        return []
    finally:
        # Explicitly release image buffers to prevent OOM in long-running processes
        del img
        del faces
        gc.collect()
def match_face_insightface(self, face_encoding: np.ndarray, tolerance: float = 0.20) -> Tuple[Optional[str], float, Optional[str]]:
    """
    Check if a face encoding matches any reference person using InsightFace cosine distance
    Args:
        face_encoding: Face embedding from InsightFace (numpy array)
        tolerance: Match tolerance for cosine distance (0.0-1.0, default 0.20 for ArcFace)
            Lower = stricter matching
    Returns:
        Tuple of (person_name, confidence, best_candidate) where:
        - person_name: matched person if above threshold, None otherwise
        - confidence: best confidence found (even if below threshold)
        - best_candidate: best matching person name (even if below threshold)
    """
    # Import once per call instead of inside the innermost loop
    from scipy.spatial.distance import cosine
    if not self.reference_encodings:
        self._log("No reference encodings loaded", "warning")
        return None, 0.0, None
    best_match_person = None
    best_match_distance = float('inf')
    # Check against each reference person
    for person_name, reference_list in self.reference_encodings.items():
        # Calculate cosine distance to each reference encoding
        for ref_encoding in reference_list:
            # Cosine distance = 1 - cosine_similarity
            distance = cosine(face_encoding, ref_encoding)
            if distance < best_match_distance:
                best_match_distance = distance
                best_match_person = person_name
    # Calculate confidence even if below threshold
    confidence = 1.0 - best_match_distance if best_match_distance != float('inf') else 0.0
    # Check if best match is within tolerance
    if best_match_distance <= tolerance:
        self._log(f"Match found: {best_match_person} (confidence: {confidence:.2%}, distance: {best_match_distance:.3f})", "debug")
        return best_match_person, confidence, best_match_person
    self._log(f"No match found (best: {best_match_person} at {confidence:.2%}, distance: {best_match_distance:.3f} > tolerance: {tolerance})", "debug")
    return None, confidence, best_match_person
def match_face(self, face_encoding: np.ndarray, tolerance: float = 0.20) -> Tuple[Optional[str], float, Optional[str]]:
    """
    Check if a face encoding matches any reference person.
    Uses InsightFace if available, falls back to face_recognition.
    Args:
        face_encoding: Face encoding to check (numpy array)
        tolerance: Match tolerance (0.15 for InsightFace, 0.6 for face_recognition)
    Returns:
        Tuple of (person_name, confidence, best_candidate) where:
        - person_name: matched person if above threshold, None otherwise
        - confidence: best confidence found (even if below threshold)
        - best_candidate: best matching person name (even if below threshold)
    """
    if _check_insightface_available():
        return self.match_face_insightface(face_encoding, tolerance)
    # Fallback path needs the lazily-loaded face_recognition module; bail
    # out cleanly instead of AttributeError on the None placeholder.
    if not _check_face_recognition_available():
        self._log("No face matching backend available", "warning")
        return None, 0.0, None
    if not self.reference_encodings:
        self._log("No reference encodings loaded", "warning")
        return None, 0.0, None
    best_match_person = None
    best_match_distance = float('inf')
    # Check against each reference person
    for person_name, reference_list in self.reference_encodings.items():
        # Compare with all reference encodings for this person
        distances = face_recognition.face_distance(reference_list, face_encoding)
        # Get the best (minimum) distance
        min_distance = float(np.min(distances))
        if min_distance < best_match_distance:
            best_match_distance = min_distance
            best_match_person = person_name
    # Calculate confidence even if below threshold
    confidence = 1.0 - best_match_distance if best_match_distance != float('inf') else 0.0
    # Check if best match is within tolerance
    if best_match_distance <= tolerance:
        self._log(f"Match found: {best_match_person} (confidence: {confidence:.2%})", "debug")
        return best_match_person, confidence, best_match_person
    self._log(f"No match found (best: {best_match_person} at {confidence:.2%}, distance: {best_match_distance:.3f} > tolerance: {tolerance})", "debug")
    return None, confidence, best_match_person
def _match_face_fr(self, face_encoding: np.ndarray, tolerance: float = 0.6) -> Tuple[Optional[str], float, Optional[str]]:
    """
    Match a face encoding using face_recognition library encodings (128-dim).
    Used as fallback when InsightFace fails to detect faces.
    Args:
        face_encoding: Face encoding from face_recognition library (128-dim numpy array)
        tolerance: Match tolerance (default 0.6 for face_recognition)
    Returns:
        Tuple of (person_name, confidence, best_candidate) where:
        - person_name: matched person if above threshold, None otherwise
        - confidence: best confidence found (even if below threshold)
        - best_candidate: best matching person name (even if below threshold)
    """
    # The face_recognition module reference is populated lazily; probe it
    # first so we fail cleanly instead of AttributeError on None.
    if not _check_face_recognition_available():
        self._log("face_recognition library not available for fallback matching", "warning")
        return None, 0.0, None
    if not self.reference_encodings_fr:
        self._log("No face_recognition reference encodings loaded", "warning")
        return None, 0.0, None
    best_match_person = None
    best_match_distance = float('inf')
    # Check against each reference person's face_recognition encodings
    for person_name, reference_list in self.reference_encodings_fr.items():
        if not reference_list:
            continue
        # Compare with all reference encodings for this person
        distances = face_recognition.face_distance(reference_list, face_encoding)
        # Get the best (minimum) distance
        min_distance = float(np.min(distances))
        if min_distance < best_match_distance:
            best_match_distance = min_distance
            best_match_person = person_name
    # Calculate confidence even if below threshold
    confidence = 1.0 - best_match_distance if best_match_distance != float('inf') else 0.0
    # Check if best match is within tolerance
    if best_match_distance <= tolerance:
        self._log(f"FR fallback match: {best_match_person} (confidence: {confidence:.2%}, distance: {best_match_distance:.3f})", "info")
        return best_match_person, confidence, best_match_person
    self._log(f"FR fallback no match (best: {best_match_person} at {confidence:.2%}, distance: {best_match_distance:.3f} > tolerance: {tolerance})", "debug")
    return None, confidence, best_match_person
def _get_target_person_name(self) -> str:
"""
Get the configured target person name from face_recognition settings.
Returns:
The target person name (e.g., 'Eva Longoria')
"""
if self.unified_db:
try:
import json
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT value FROM settings WHERE key = 'face_recognition'")
result = cursor.fetchone()
if result:
settings = json.loads(result[0])
return settings.get('person_name', 'Eva Longoria')
except Exception as e:
self._log(f"Failed to read person_name from settings: {e}", "debug")
return 'Eva Longoria' # Default
def _get_immich_tolerance(self) -> float:
"""
Get the Immich face matching tolerance from settings.
Returns:
Cosine distance threshold (lower = stricter, default 0.35)
"""
if self.unified_db:
try:
import json
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT value FROM settings WHERE key = 'face_recognition'")
result = cursor.fetchone()
if result:
settings = json.loads(result[0])
return settings.get('immich_tolerance', 0.35)
except Exception as e:
self._log(f"Failed to read immich_tolerance from settings: {e}", "debug")
return 0.35 # Default - stricter than 0.5
def _check_immich_faces(self, image_path: str) -> Optional[Dict]:
    """
    Match faces against Immich's face database using embedding comparison.
    Detects faces locally with InsightFace, then compares embeddings
    against Immich's PostgreSQL database of named faces. This works
    for ANY file, not just files already in Immich.
    IMPORTANT: Only returns has_match=True if the matched person is
    the configured target person (e.g., 'Eva Longoria'), not just any person.
    Args:
        image_path: Path to image file
    Returns:
        Dict with face results if Immich matching succeeds, None to fall back
        to local reference matching
    """
    import subprocess
    try:
        # Check if Immich integration is configured
        if not self.unified_db:
            return None
        # Get Immich settings from database
        with self.unified_db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT value FROM settings WHERE key = 'immich'")
            row = cursor.fetchone()
        if not row:
            return None
        import json
        immich_settings = json.loads(row[0])
        if not immich_settings.get('enabled'):
            return None
        # Get target person name and tolerance
        target_person = self._get_target_person_name()
        tolerance = self._get_immich_tolerance()
        # Detect faces locally using InsightFace
        if not _check_insightface_available():
            return None
        face_encodings = self.detect_faces_insightface(image_path)
        if not face_encodings:
            # No faces detected - return result (don't fall back)
            return {
                'has_match': False,
                'person_name': None,
                'confidence': 0.0,
                'best_candidate': None,
                'face_count': 0,
                'faces': [],
                'source': 'immich_embedding'
            }
        # Query Immich's PostgreSQL for nearest matches
        faces = []
        best_person = None
        best_distance = float('inf')
        has_match = False
        for i, encoding in enumerate(face_encodings):
            # Convert numpy array to PostgreSQL vector format
            embedding_str = '[' + ','.join(str(x) for x in encoding.flatten()) + ']'
            # Query Immich database for nearest named face
            # Using cosine distance (<=>), lower = more similar
            # NOTE(review): the embedding is interpolated into the SQL text;
            # the values come from our own numpy embedding (floats only), so
            # they are not attacker-controlled, but a parameterized query
            # would still be safer — confirm if psql invocation permits it.
            query = f"""
                SELECT
                    p.name,
                    fs.embedding <=> '{embedding_str}' as distance
                FROM face_search fs
                JOIN asset_face af ON af.id = fs."faceId"
                JOIN person p ON af."personId" = p.id
                WHERE p.name IS NOT NULL AND p.name != ''
                ORDER BY fs.embedding <=> '{embedding_str}'
                LIMIT 1;
            """
            # Runs psql inside the Immich postgres container; -t gives
            # tuples-only output ("name | distance")
            result = subprocess.run(
                ['docker', 'exec', 'immich_postgres', 'psql', '-U', 'postgres', '-d', 'immich', '-t', '-c', query],
                capture_output=True, text=True, timeout=10
            )
            person_name = None
            distance = float('inf')
            confidence = 0.0
            if result.returncode == 0 and result.stdout.strip():
                parts = result.stdout.strip().split('|')
                if len(parts) >= 2:
                    person_name = parts[0].strip()
                    try:
                        distance = float(parts[1].strip())
                        # Convert cosine distance to confidence (0-1)
                        # Cosine distance of 0 = identical, 2 = opposite
                        confidence = max(0, 1 - distance)
                    except ValueError:
                        pass
            # Check if within tolerance AND matches target person
            # Only consider it a match if it's the configured target person
            is_within_tolerance = distance < tolerance and person_name
            is_target_person = person_name and person_name.lower() == target_person.lower()
            matched = is_within_tolerance and is_target_person
            face_result = {
                'face_index': i,
                'person_name': person_name if matched else None,
                'confidence': confidence,
                'best_candidate': person_name,
                'matched': bool(matched),
                'distance': distance,
                'is_target': is_target_person
            }
            faces.append(face_result)
            # Track the closest matching target-person face across all faces
            if matched and distance < best_distance:
                best_distance = distance
                best_person = person_name
                has_match = True
        self._log(f"Immich embedding: {len(face_encodings)} faces in {Path(image_path).name}", "info")
        if best_person:
            self._log(f"Immich match: {best_person} (distance: {best_distance:.3f}, tolerance: {tolerance})", "info")
        elif faces and faces[0].get('best_candidate'):
            # Log when a face was found but didn't match target person
            candidate = faces[0].get('best_candidate')
            dist = faces[0].get('distance', 999)
            self._log(f"Immich no match: best candidate was {candidate} (distance: {dist:.3f}), target is {target_person}", "debug")
        return {
            'has_match': has_match,
            'person_name': best_person,
            'confidence': max(0, 1 - best_distance) if has_match else 0.0,
            'best_candidate': best_person,
            'face_count': len(face_encodings),
            'faces': faces,
            'source': 'immich_embedding'
        }
    except subprocess.TimeoutExpired:
        self._log("Immich database query timed out", "warning")
        return None
    except Exception as e:
        # Any error, fall back to local reference matching
        self._log(f"Immich embedding check failed: {e}", "debug")
        return None
def check_image(self, image_path: str, tolerance: float = 0.20, is_video: bool = False) -> Dict:
    """
    Complete face check: detect faces and match against references.
    Args:
        image_path: Path to image/video file
        tolerance: Match tolerance (0.0 - 1.0)
        is_video: If True, extract frame from video first
    Returns:
        Dict with:
        - has_match: bool
        - person_name: str or None (matched person if above threshold)
        - confidence: float (best confidence found, even if below threshold)
        - best_candidate: str or None (best matching person, even if below threshold)
        - face_count: int
        - faces: List of match results for each face
        - source: str ('immich_embedding' or 'insightface')
    """
    result = {
        'has_match': False,
        'person_name': None,
        'confidence': 0.0,
        'best_candidate': None,
        'face_count': 0,
        'faces': [],
        'source': 'insightface'
    }
    # Try Immich first if available (faster, uses existing clustering).
    # A non-None return means Immich fully handled the file (including the
    # zero-faces case), so local matching is skipped entirely.
    immich_result = self._check_immich_faces(image_path)
    if immich_result:
        return immich_result
    # Detect all faces - use InsightFace if available (even for videos)
    if _check_insightface_available():
        if is_video:
            # Extract frame from video first
            temp_frame = self._extract_video_frame(image_path)
            if temp_frame:
                try:
                    face_encodings = self.detect_faces_insightface(temp_frame)
                finally:
                    import os
                    # Always remove the temporary frame file
                    try:
                        os.unlink(temp_frame)
                    except Exception:
                        pass
            else:
                face_encodings = []
        else:
            face_encodings = self.detect_faces_insightface(image_path)
    else:
        # face_recognition fallback handles its own video frame extraction
        face_encodings = self.detect_faces(image_path, is_video=is_video)
    result['face_count'] = len(face_encodings)
    if not face_encodings:
        self._log(f"No faces detected in {Path(image_path).name}", "info")
        return result
    # Check each detected face
    for i, face_encoding in enumerate(face_encodings):
        person_name, confidence, best_candidate = self.match_face(face_encoding, tolerance)
        face_result = {
            'face_index': i,
            'person_name': person_name,
            'confidence': confidence,
            'best_candidate': best_candidate,
            'matched': person_name is not None
        }
        result['faces'].append(face_result)
        # Track the highest confidence across all faces (matched or not)
        if confidence > result['confidence']:
            result['confidence'] = confidence
            result['best_candidate'] = best_candidate
        # Only set person_name if this is an actual match
        if person_name:
            result['person_name'] = person_name
            result['has_match'] = True
    # Force garbage collection after face processing to free memory
    gc.collect()
    return result
def check_video_multiframe(self, video_path: str, tolerance: float = 0.20,
                           positions: Optional[List[float]] = None) -> Dict:
    """
    Check video using multiple frames for better face detection.

    Extracts frames at several relative positions in the video, runs face
    detection on each frame, and keeps the best match/candidate seen across
    all frames. Temp frames are deleted as soon as each one is processed.

    Args:
        video_path: Path to video file
        tolerance: Match tolerance (0.0 - 1.0). Values <= 0.20 are treated
            as "strict" and enable the crowd-rejection rule below.
        positions: List of relative positions (0.0 - 1.0) to extract frames
            from; defaults to 5 evenly spread samples.

    Returns:
        Dict with best match across all frames:
        - has_match: bool
        - person_name: str or None (matched person if above threshold)
        - confidence: float (best confidence across all frames, even if below threshold)
        - best_candidate: str or None (best matching person, even if below threshold)
        - face_count: int (total faces found across all frames)
        - frames_checked: int (number of frames successfully extracted)
        - best_frame_index: int (which frame had the best match, -1 if none)
    """
    result = {
        'has_match': False,
        'person_name': None,
        'confidence': 0.0,
        'best_candidate': None,
        'face_count': 0,
        'frames_checked': 0,
        'best_frame_index': -1
    }
    if positions is None:
        positions = [0.1, 0.3, 0.5, 0.7, 0.9]  # Sample 5 frames for better coverage
    # Extract multiple frames from video
    frame_paths = self._extract_video_frames_at_positions(video_path, positions)
    if not frame_paths:
        self._log(f"No frames extracted from video: {Path(video_path).name}", "warning")
        return result
    result['frames_checked'] = len(frame_paths)
    try:
        best_confidence = 0.0
        best_person_name = None  # Matched person (above threshold)
        best_candidate = None    # Best candidate (even if below threshold)
        best_frame_idx = -1
        total_faces = 0
        # Check each frame
        for idx, frame_path in enumerate(frame_paths):
            try:
                # Use InsightFace if available for better accuracy
                if _check_insightface_available():
                    face_encodings = self.detect_faces_insightface(frame_path)
                else:
                    face_encodings = self.detect_faces(frame_path, is_video=False)
                total_faces += len(face_encodings)
                # Check each face in this frame
                for face_encoding in face_encodings:
                    person_name, confidence, candidate = self.match_face(face_encoding, tolerance)
                    # Keep track of best confidence (whether matched or not)
                    if confidence > best_confidence:
                        best_confidence = confidence
                        best_candidate = candidate
                        best_frame_idx = idx
                        # Only set best_person_name if above threshold
                        if person_name:
                            best_person_name = person_name
                self._log(f"Frame {idx+1}/{len(frame_paths)}: Found {len(face_encodings)} faces", "debug")
            except Exception as e:
                self._log(f"Error checking frame {idx+1}: {e}", "debug")
                continue
            finally:
                # Clean up temp frame immediately (os comes from module-level import)
                if os.path.exists(frame_path):
                    os.remove(frame_path)
        # Update result with best match
        result['face_count'] = total_faces
        # Apply stricter rules for crowd/group situations:
        # if many faces detected, require higher confidence to avoid false positives.
        # ONLY applies when using strict/default tolerance - if user specified relaxed
        # tolerance, respect it.
        if best_person_name:
            avg_faces_per_frame = total_faces / len(frame_paths) if frame_paths else 0
            # If 3+ faces per frame on average, it's likely a crowd/group/performance.
            # Require 95%+ confidence ONLY when using strict tolerance (0.20 or less);
            # if tolerance > 0.20 (source-based or relaxed), skip crowd detection.
            if tolerance <= 0.20 and avg_faces_per_frame >= 3 and best_confidence < 0.95:
                self._log(f"Rejecting match in crowd situation ({total_faces} faces, {avg_faces_per_frame:.1f} avg/frame): {best_person_name} at {best_confidence:.2%} < 95% required", "warning")
                best_person_name = None
                # Keep best_confidence and best_candidate for review display
            elif tolerance > 0.20 and avg_faces_per_frame >= 3:
                self._log(f"Crowd situation detected ({total_faces} faces, {avg_faces_per_frame:.1f} avg/frame) but skipping strict check due to relaxed tolerance ({tolerance})", "debug")
        # Always store best confidence and candidate (even for non-matches)
        result['confidence'] = best_confidence
        result['best_candidate'] = best_candidate
        result['best_frame_index'] = best_frame_idx
        if best_person_name:
            result['has_match'] = True
            result['person_name'] = best_person_name
            self._log(f"Best match: {best_person_name} with {best_confidence:.2%} confidence (frame {best_frame_idx+1})", "info")
        else:
            self._log(f"No match found in any of {len(frame_paths)} frames (total {total_faces} faces, best: {best_candidate} at {best_confidence:.2%})", "info")
    except Exception as e:
        self._log(f"Error in multi-frame video check: {e}", "error")
    finally:
        # Force garbage collection after video processing to free memory from ML models
        gc.collect()
    return result
def add_reference_face(self, person_name: str, image_path: str) -> bool:
    """
    Add a reference face encoding for a person using InsightFace.

    The reference image is copied to a dedicated directory
    (FACE_REFERENCES_DIR) to prevent issues if the original file is moved
    or deleted. Video inputs are handled by extracting a single frame first.

    Args:
        person_name: Name of the person
        image_path: Path to reference image or video

    Returns:
        True if successful, False otherwise
    """
    temp_frame = None          # frame extracted from a video input, if any (deleted in finally)
    stored_image_path = None   # our managed copy; removed again on failure paths
    try:
        # Check if input is a video file (by extension)
        video_extensions = ['.mp4', '.mov', '.avi', '.mkv', '.webm', '.flv', '.m4v']
        is_video = any(image_path.lower().endswith(ext) for ext in video_extensions)
        # Detect face in reference image/video - use InsightFace if available
        if _check_insightface_available():
            if is_video:
                # Extract frame from video first
                temp_frame = self._extract_video_frame(image_path)
                if temp_frame:
                    face_encodings = self.detect_faces_insightface(temp_frame)
                else:
                    # Frame extraction failed -> nothing to encode
                    face_encodings = []
            else:
                face_encodings = self.detect_faces_insightface(image_path)
        else:
            # Fallback to old method (face_recognition-based pipeline)
            face_encodings = self.detect_faces(image_path, is_video=is_video)
        if not face_encodings:
            self._log(f"No face detected in reference image: {image_path}", "error")
            return False
        if len(face_encodings) > 1:
            self._log(f"Multiple faces detected in reference image, using first one", "warning")
        # Use first detected face (InsightFace encoding)
        encoding = face_encodings[0]
        # Serialize InsightFace encoding (pickle -> base64) so it fits a TEXT column
        encoding_bytes = pickle.dumps(encoding)
        encoding_b64 = base64.b64encode(encoding_bytes).decode('utf-8')
        # Also generate face_recognition encoding for fallback matching
        encoding_fr_b64 = None
        fr_image = None
        if _check_face_recognition_available():
            try:
                # For videos, encode from the extracted frame rather than the video file
                source_for_fr = temp_frame if (is_video and temp_frame) else image_path
                fr_image = face_recognition.load_image_file(source_for_fr)
                fr_encodings = face_recognition.face_encodings(fr_image)
                if fr_encodings:
                    fr_encoding_bytes = pickle.dumps(fr_encodings[0])
                    encoding_fr_b64 = base64.b64encode(fr_encoding_bytes).decode('utf-8')
                    self._log(f"Generated face_recognition fallback encoding", "debug")
            except Exception as e:
                # Fallback encoding is best-effort; failure here is not fatal
                self._log(f"Could not generate face_recognition encoding: {e}", "debug")
            finally:
                # Clean up fr_image to prevent memory leak (decoded images can be large)
                if fr_image is not None:
                    del fr_image
                    gc.collect()
        # Copy reference image to dedicated directory with UUID filename.
        # For videos, save the extracted frame; for images, copy the original.
        if is_video and temp_frame:
            # For videos, copy the extracted frame (which is a jpg)
            stored_image_path, thumbnail_b64 = self._copy_reference_image(temp_frame, person_name)
        else:
            # For images, copy the original file
            stored_image_path, thumbnail_b64 = self._copy_reference_image(image_path, person_name)
        if not stored_image_path:
            self._log(f"Failed to copy reference image to storage directory", "error")
            return False
        # Store in database with the copied image path and thumbnail
        if self.unified_db:
            with self.unified_db.get_connection() as conn:
                cursor = conn.cursor()
                # Check if thumbnail_data column exists, add if not (lazy schema migration)
                cursor.execute("PRAGMA table_info(face_recognition_references)")
                columns = [row[1] for row in cursor.fetchall()]
                if 'thumbnail_data' not in columns:
                    cursor.execute("ALTER TABLE face_recognition_references ADD COLUMN thumbnail_data TEXT")
                cursor.execute("""
                    INSERT INTO face_recognition_references
                    (person_name, encoding_data, encoding_data_fr, reference_image_path, thumbnail_data, is_active, created_at)
                    VALUES (?, ?, ?, ?, ?, 1, datetime('now'))
                """, (person_name, encoding_b64, encoding_fr_b64, stored_image_path, thumbnail_b64))
                conn.commit()
            # Reload encodings so the new reference is used immediately
            self._load_reference_encodings()
            self._log(f"Added reference face for '{person_name}' from {Path(image_path).name}", "info")
            return True
        else:
            self._log("No database connection, cannot save reference face", "error")
            # Clean up copied file since we couldn't save to database
            if stored_image_path:
                try:
                    Path(stored_image_path).unlink(missing_ok=True)
                except OSError:
                    pass
            return False
    except Exception as e:
        self._log(f"Error adding reference face: {e}", "error")
        # Clean up copied file on error so storage dir doesn't accumulate orphans
        if stored_image_path:
            try:
                Path(stored_image_path).unlink(missing_ok=True)
            except OSError:
                pass
        return False
    finally:
        # Clean up temp frame from video extraction (runs on every exit path)
        if temp_frame:
            try:
                os.unlink(temp_frame)
            except OSError:
                pass
def remove_reference_face(self, reference_id: int, hard_delete: bool = False) -> bool:
    """
    Remove a reference face encoding.

    Args:
        reference_id: Database ID of reference to remove
        hard_delete: If True, permanently delete from database. If False, soft delete (is_active=0)

    Returns:
        True if successful
    """
    try:
        if not self.unified_db:
            self._log("No database connection", "error")
            return False
        with self.unified_db.get_connection(for_write=True) as conn:
            cur = conn.cursor()
            # Look up the stored file path before touching the row
            cur.execute(
                "SELECT reference_image_path FROM face_recognition_references WHERE id = ?",
                (reference_id,)
            )
            record = cur.fetchone()
            if record is None:
                self._log(f"Reference ID {reference_id} not found", "warning")
                return False
            stored_path = record[0]
            if hard_delete:
                # Permanently delete from database
                cur.execute("DELETE FROM face_recognition_references WHERE id = ?", (reference_id,))
            else:
                # Soft delete (set is_active = 0)
                cur.execute("""
                    UPDATE face_recognition_references
                    SET is_active = 0, updated_at = datetime('now')
                    WHERE id = ?
                """, (reference_id,))
            conn.commit()
        # Only remove files we manage; never touch paths outside our storage dir
        if stored_path and str(FACE_REFERENCES_DIR) in stored_path:
            try:
                Path(stored_path).unlink(missing_ok=True)
                self._log(f"Deleted reference file: {Path(stored_path).name}", "debug")
            except OSError as e:
                self._log(f"Failed to delete file {stored_path}: {e}", "warning")
        # Refresh the in-memory encodings to drop the removed reference
        self._load_reference_encodings()
        self._log(f"Removed reference face ID {reference_id}", "info")
        return True
    except Exception as e:
        self._log(f"Error removing reference face: {e}", "error")
        return False
def purge_inactive_references(self) -> Dict:
    """
    Permanently delete all inactive references and their files.

    Returns:
        Dict with count of purged references and any errors
    """
    result = {'purged': 0, 'errors': []}
    if not self.unified_db:
        result['errors'].append("No database connection")
        return result
    try:
        with self.unified_db.get_connection(for_write=True) as conn:
            cur = conn.cursor()
            # Collect every soft-deleted reference up front
            cur.execute("""
                SELECT id, reference_image_path
                FROM face_recognition_references
                WHERE is_active = 0
            """)
            for ref_id, stored_path in cur.fetchall():
                try:
                    # Remove the managed on-disk copy, if any
                    if stored_path and str(FACE_REFERENCES_DIR) in stored_path:
                        Path(stored_path).unlink(missing_ok=True)
                    # Then drop the row itself
                    cur.execute("DELETE FROM face_recognition_references WHERE id = ?", (ref_id,))
                    result['purged'] += 1
                except Exception as e:
                    # One bad reference must not abort the whole purge
                    result['errors'].append(f"Failed to purge ID {ref_id}: {str(e)}")
            conn.commit()
        self._log(f"Purged {result['purged']} inactive references", "info")
    except Exception as e:
        result['errors'].append(f"Purge failed: {str(e)}")
    return result
def get_reference_faces(self) -> List[Dict]:
    """
    Get all active reference faces from the database.

    Returns:
        List of dicts with keys: id, person_name, reference_image_path,
        thumbnail_data (None when the column does not exist yet), created_at.
        Empty list when there is no database connection or on error.
    """
    if not self.unified_db:
        return []
    try:
        with self.unified_db.get_connection() as conn:
            cursor = conn.cursor()
            # thumbnail_data is added lazily elsewhere; older databases may
            # not have the column yet, so probe the schema first.
            cursor.execute("PRAGMA table_info(face_recognition_references)")
            columns = [row[1] for row in cursor.fetchall()]
            # Select a literal NULL in place of the missing column so a single
            # query and row-mapping covers both schema versions (previously the
            # whole query + mapping was duplicated per branch). The substituted
            # token is from a fixed two-value set, never user input.
            thumb_col = "thumbnail_data" if 'thumbnail_data' in columns else "NULL"
            cursor.execute(f"""
                SELECT id, person_name, reference_image_path, {thumb_col}, created_at
                FROM face_recognition_references
                WHERE is_active = 1
                ORDER BY person_name, created_at
            """)
            return [
                {
                    'id': row[0],
                    'person_name': row[1],
                    'reference_image_path': row[2],
                    'thumbnail_data': row[3],
                    'created_at': row[4]
                }
                for row in cursor.fetchall()
            ]
    except Exception as e:
        self._log(f"Error getting reference faces: {e}", "error")
        return []
def retrain_all_references_with_model(self, new_model_name: str, progress_callback=None) -> Dict:
    """
    Re-train all reference faces with a new InsightFace model.

    This is required when switching models because different models produce
    incompatible embeddings. This method re-extracts embeddings from the
    original reference images using the new model.

    Args:
        new_model_name: Name of new model (e.g., 'buffalo_l', 'antelopev2')
        progress_callback: Optional callback function called after each reference
                           is processed, with signature:
                           callback(current, total, person_name, success)

    Returns:
        Dict with status information:
        - success: bool (True only when every reference updated)
        - total: int (total references)
        - updated: int (successfully updated)
        - failed: int (failed to update)
        - errors: list of error messages
    """
    # Check database connection
    if not self.unified_db:
        error_msg = 'Database connection not available'
        self._log(error_msg, "error")
        return {
            'success': False,
            'total': 0,
            'updated': 0,
            'failed': 0,
            'errors': [error_msg]
        }
    self._log(f"Re-training all references with model: {new_model_name}", "info")
    result = {
        'success': True,
        'total': 0,
        'updated': 0,
        'failed': 0,
        'errors': []
    }
    try:
        # Import InsightFace here to avoid module-level import issues
        try:
            from insightface.app import FaceAnalysis
        except ImportError as ie:
            error_msg = f"InsightFace not available: {str(ie)}"
            self._log(error_msg, "error")
            return {
                'success': False,
                'total': 0,
                'updated': 0,
                'failed': 0,
                'errors': [error_msg]
            }
        # Initialize new model on CPU
        app = FaceAnalysis(name=new_model_name, providers=['CPUExecutionProvider'])
        app.prepare(ctx_id=0, det_size=(640, 640))
        self._log(f"Loaded model {new_model_name} for re-training", "info")
        # Get all active references
        with self.unified_db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, person_name, reference_image_path
                FROM face_recognition_references
                WHERE is_active = 1
            """)
            references = cursor.fetchall()
        result['total'] = len(references)
        self._log(f"Found {len(references)} references to re-train", "info")
        # Re-train each reference. Path and cv2 come from the module-level
        # imports (previously they were redundantly re-imported on every
        # loop iteration).
        for idx, (ref_id, person_name, image_path) in enumerate(references, 1):
            img = None
            faces = None
            try:
                # Check if file exists
                if not Path(image_path).exists():
                    error_msg = f"Reference image not found: {image_path}"
                    self._log(error_msg, "warning")
                    result['errors'].append(error_msg)
                    result['failed'] += 1
                    if progress_callback:
                        progress_callback(idx, result['total'], person_name, False)
                    continue
                # Extract face using new model
                img = cv2.imread(image_path)
                faces = app.get(img)
                if not faces or len(faces) == 0:
                    error_msg = f"No face detected in {Path(image_path).name}"
                    self._log(error_msg, "warning")
                    result['errors'].append(error_msg)
                    result['failed'] += 1
                    if progress_callback:
                        progress_callback(idx, result['total'], person_name, False)
                    continue
                # Use first detected face
                face_encoding = faces[0].embedding
                # Encode (pickle -> base64) and store in database
                encoding_bytes = pickle.dumps(face_encoding)
                encoding_b64 = base64.b64encode(encoding_bytes).decode('utf-8')
                with self.unified_db.get_connection(for_write=True) as conn:
                    cursor = conn.cursor()
                    cursor.execute("""
                        UPDATE face_recognition_references
                        SET encoding_data = ?,
                            updated_at = CURRENT_TIMESTAMP
                        WHERE id = ?
                    """, (encoding_b64, ref_id))
                    conn.commit()
                result['updated'] += 1
                self._log(f"Re-trained {person_name} reference #{ref_id}", "debug")
                # Call progress callback on success
                if progress_callback:
                    progress_callback(idx, result['total'], person_name, True)
            except Exception as e:
                error_msg = f"Failed to re-train reference {ref_id}: {str(e)}"
                self._log(error_msg, "error")
                result['errors'].append(error_msg)
                result['failed'] += 1
                if progress_callback:
                    progress_callback(idx, result['total'], person_name, False)
            finally:
                # Clean up decoded image / detection results after each reference
                if img is not None:
                    del img
                if faces is not None:
                    del faces
        # Reload encodings from database
        self._load_reference_encodings()
        # Force reload of InsightFace app with new model on next use
        self.insightface_app = None
        self._log(f"Re-training complete: {result['updated']}/{result['total']} successful", "info")
        result['success'] = result['failed'] == 0
    except Exception as e:
        error_msg = f"Fatal error during re-training: {str(e)}"
        self._log(error_msg, "error")
        result['success'] = False
        result['errors'].append(error_msg)
    return result
def migrate_references_to_storage(self, progress_callback=None) -> Dict:
    """
    Migrate existing reference images to the dedicated storage directory.

    This copies reference images that still exist to the face_references
    directory and updates the database paths. References with missing source
    files are deactivated. Per-reference failures are collected in the
    result's error list and do not stop the migration.

    Args:
        progress_callback: Optional callback function called after each reference
                           with signature: callback(current, total, person_name, status)
                           where status is 'migrated', 'deactivated', or 'skipped'

    Returns:
        Dict with:
        - total: int (total references checked)
        - migrated: int (successfully migrated)
        - deactivated: int (deactivated due to missing files)
        - skipped: int (already in storage directory)
        - errors: list of error messages
    """
    result = {
        'total': 0,
        'migrated': 0,
        'deactivated': 0,
        'skipped': 0,
        'errors': []
    }
    if not self.unified_db:
        result['errors'].append("No database connection")
        return result
    try:
        # Ensure storage directory exists
        FACE_REFERENCES_DIR.mkdir(parents=True, exist_ok=True)
        storage_dir_str = str(FACE_REFERENCES_DIR)
        # Get all active references
        with self.unified_db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, person_name, reference_image_path
                FROM face_recognition_references
                WHERE is_active = 1
            """)
            references = cursor.fetchall()
        result['total'] = len(references)
        self._log(f"Checking {len(references)} references for migration", "info")
        # Ensure thumbnail_data column exists (lazy schema migration)
        with self.unified_db.get_connection(for_write=True) as conn:
            cursor = conn.cursor()
            cursor.execute("PRAGMA table_info(face_recognition_references)")
            columns = [row[1] for row in cursor.fetchall()]
            if 'thumbnail_data' not in columns:
                cursor.execute("ALTER TABLE face_recognition_references ADD COLUMN thumbnail_data TEXT")
            conn.commit()
        for idx, (ref_id, person_name, image_path) in enumerate(references, 1):
            try:
                # Check if file exists; a missing source can never be copied,
                # so the reference is deactivated rather than left dangling
                if not Path(image_path).exists():
                    self._log(f"Reference {ref_id} ({person_name}): file missing, deactivating", "warning")
                    with self.unified_db.get_connection(for_write=True) as conn:
                        cursor = conn.cursor()
                        cursor.execute("""
                            UPDATE face_recognition_references
                            SET is_active = 0, updated_at = CURRENT_TIMESTAMP
                            WHERE id = ?
                        """, (ref_id,))
                        conn.commit()
                    result['deactivated'] += 1
                    if progress_callback:
                        progress_callback(idx, result['total'], person_name, 'deactivated')
                    continue
                # Check if already has UUID filename (36 char UUID + extension)
                current_filename = Path(image_path).stem
                is_uuid = len(current_filename) == 36 and current_filename.count('-') == 4
                if image_path.startswith(storage_dir_str) and is_uuid:
                    # Already migrated with UUID, just generate thumbnail if missing
                    # (the WHERE clause below guards against overwriting an existing one)
                    thumbnail_b64 = self._generate_thumbnail(image_path)
                    with self.unified_db.get_connection(for_write=True) as conn:
                        cursor = conn.cursor()
                        cursor.execute("""
                            UPDATE face_recognition_references
                            SET thumbnail_data = ?, updated_at = CURRENT_TIMESTAMP
                            WHERE id = ? AND (thumbnail_data IS NULL OR thumbnail_data = '')
                        """, (thumbnail_b64, ref_id))
                        conn.commit()
                    result['skipped'] += 1
                    if progress_callback:
                        progress_callback(idx, result['total'], person_name, 'skipped')
                    continue
                # Need to migrate: copy to storage with UUID filename
                new_path, thumbnail_b64 = self._copy_reference_image(image_path, person_name)
                if not new_path:
                    result['errors'].append(f"Failed to copy reference {ref_id}")
                    continue
                # Update database with new path and thumbnail
                with self.unified_db.get_connection(for_write=True) as conn:
                    cursor = conn.cursor()
                    cursor.execute("""
                        UPDATE face_recognition_references
                        SET reference_image_path = ?, thumbnail_data = ?, updated_at = CURRENT_TIMESTAMP
                        WHERE id = ?
                    """, (new_path, thumbnail_b64, ref_id))
                    conn.commit()
                # Delete old file if it was in storage directory (not the original source)
                if image_path.startswith(storage_dir_str):
                    try:
                        Path(image_path).unlink(missing_ok=True)
                    except OSError:
                        pass
                result['migrated'] += 1
                self._log(f"Migrated reference {ref_id} ({person_name}) to UUID filename", "debug")
                if progress_callback:
                    progress_callback(idx, result['total'], person_name, 'migrated')
            except Exception as e:
                # One failed reference must not stop the whole migration
                error_msg = f"Error migrating reference {ref_id}: {str(e)}"
                self._log(error_msg, "error")
                result['errors'].append(error_msg)
        self._log(f"Migration complete: {result['migrated']} migrated, {result['deactivated']} deactivated, {result['skipped']} already in storage", "info")
    except Exception as e:
        error_msg = f"Migration failed: {str(e)}"
        self._log(error_msg, "error")
        result['errors'].append(error_msg)
    return result