Files
Todd 49e72207bf Encrypt file paths in API URLs using Fernet tokens
Raw filesystem paths were exposed in browser URLs, dev tools, and proxy logs.
Now all file-serving endpoints accept an opaque encrypted token (t= param)
derived from the session secret via HKDF, with a 4-hour TTL.

Backend:
- Add core/path_tokens.py with Fernet encrypt/decrypt (HKDF from .session_secret)
- Add file_token to all list/gallery/feed/search responses across 7 routers
- Accept optional t= param on all file-serving endpoints (backward compatible)

Frontend:
- Update 4 URL helpers in api.ts to prefer token when available
- Add 4 new helpers for paid-content/embedded-metadata URLs
- Update all 14 page/component files to pass file_token to URL builders
- Add file_token to all relevant TypeScript interfaces

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-30 08:25:22 -04:00

374 lines
12 KiB
Python

"""
Semantic Search Router
Handles CLIP-based semantic search operations:
- Text-based image/video search
- Similar file search
- Embedding generation and management
- Model settings
"""
import asyncio
import time
from typing import Dict, Optional
from fastapi import APIRouter, BackgroundTasks, Body, Depends, Query, Request
from pydantic import BaseModel
from slowapi import Limiter
from slowapi.util import get_remote_address
from ..core.dependencies import get_current_user, require_admin, get_app_state
from ..core.exceptions import handle_exceptions, ValidationError
from ..core.path_tokens import encode_path
from modules.semantic_search import get_semantic_search
from modules.universal_logger import get_logger
logger = get_logger('API')
router = APIRouter(prefix="/api/semantic", tags=["Semantic Search"])
limiter = Limiter(key_func=get_remote_address)
# Batch limit for embedding generation
EMBEDDING_BATCH_LIMIT = 10000
# ============================================================================
# PYDANTIC MODELS
# ============================================================================
class SemanticSearchRequest(BaseModel):
    """Request body for text-based semantic search.

    NOTE(review): appears unused — the /search endpoint declares individual
    Body(...) parameters instead of this model; confirm before removing.
    """
    query: str  # natural-language search text
    limit: int = 50  # maximum number of results
    platform: Optional[str] = None  # optional platform filter
    source: Optional[str] = None  # optional source filter
    threshold: float = 0.2  # minimum similarity score (0.0-1.0)
class GenerateEmbeddingsRequest(BaseModel):
    """Request body for embedding generation.

    NOTE(review): appears unused — /generate declares Body(...) parameters
    directly; confirm before removing.
    """
    limit: int = 100  # maximum files to embed in this batch
    platform: Optional[str] = None  # optional platform filter
class SemanticSettingsUpdate(BaseModel):
    """Partial update of semantic search settings; None means 'leave as-is'.

    NOTE(review): appears unused — POST /settings declares Body(...)
    parameters directly; confirm before removing.
    """
    model: Optional[str] = None  # CLIP model name (see valid_models in update_semantic_settings)
    threshold: Optional[float] = None  # default similarity threshold (0.0-1.0)
# ============================================================================
# SEARCH ENDPOINTS
# ============================================================================
@router.get("/stats")
@limiter.limit("120/minute")
@handle_exceptions
async def get_semantic_stats(
    request: Request,
    current_user: Dict = Depends(get_current_user)
):
    """Return embedding statistics (counts/coverage) for semantic search."""
    state = get_app_state()
    engine = get_semantic_search(state.db)
    return engine.get_embedding_stats()
@router.post("/search")
@limiter.limit("30/minute")
@handle_exceptions
async def semantic_search(
    request: Request,
    current_user: Dict = Depends(get_current_user),
    query: str = Body(..., embed=True),
    limit: int = Body(50, ge=1, le=200),
    platform: Optional[str] = Body(None),
    source: Optional[str] = Body(None),
    threshold: float = Body(0.2, ge=0.0, le=1.0)
):
    """Search indexed images/videos with a natural-language query.

    Each hit is annotated with an opaque 'file_token' (encrypted path) so
    the frontend never has to handle raw filesystem paths.
    """
    state = get_app_state()
    engine = get_semantic_search(state.db)
    hits = engine.search_by_text(
        query=query,
        limit=limit,
        platform=platform,
        source=source,
        threshold=threshold
    )
    for hit in hits:
        path = hit.get('file_path')
        hit['file_token'] = encode_path(path) if path else None
    return {"results": hits, "count": len(hits), "query": query}
@router.post("/similar/{file_id}")
@limiter.limit("30/minute")
@handle_exceptions
async def find_similar_files(
    request: Request,
    file_id: int,
    current_user: Dict = Depends(get_current_user),
    limit: int = Query(50, ge=1, le=200),
    platform: Optional[str] = Query(None),
    source: Optional[str] = Query(None),
    threshold: float = Query(0.5, ge=0.0, le=1.0)
):
    """Return files whose embeddings are similar to the given file's.

    Results carry a 'file_token' (opaque encrypted path) for URL building.
    """
    state = get_app_state()
    engine = get_semantic_search(state.db)
    matches = engine.search_by_file_id(
        file_id=file_id,
        limit=limit,
        platform=platform,
        source=source,
        threshold=threshold
    )
    for match in matches:
        path = match.get('file_path')
        match['file_token'] = encode_path(path) if path else None
    return {"results": matches, "count": len(matches), "source_file_id": file_id}
# ============================================================================
# EMBEDDING GENERATION ENDPOINTS
# ============================================================================
@router.post("/generate")
@limiter.limit("10/minute")
@handle_exceptions
async def generate_embeddings(
    request: Request,
    background_tasks: BackgroundTasks,
    current_user: Dict = Depends(get_current_user),
    limit: int = Body(100, ge=1, le=1000),
    platform: Optional[str] = Body(None)
):
    """Start background CLIP embedding generation for un-embedded files.

    Returns immediately; progress is pushed over the websocket manager as
    'embedding_progress' events and a final 'embedding_complete' event.
    Refuses to start when an indexing run is already in flight.
    """
    app_state = get_app_state()
    if app_state.indexing_running:
        return {
            "success": False,
            "message": "Indexing already in progress",
            "already_running": True,
            "status": "already_running"
        }
    search = get_semantic_search(app_state.db)
    app_state.indexing_running = True
    app_state.indexing_start_time = time.time()
    # get_running_loop() is the non-deprecated way to obtain the loop from
    # inside a coroutine (get_event_loop() is deprecated here since 3.10).
    loop = asyncio.get_running_loop()
    manager = getattr(app_state, 'websocket_manager', None)
    def progress_callback(processed: int, total: int, current_file: str):
        # Runs on the worker thread: throttle to every 10th file (plus the
        # final one) and marshal the broadcast back onto the event loop.
        try:
            if processed % 10 == 0 or processed == total:
                stats = search.get_embedding_stats()
                if manager:
                    asyncio.run_coroutine_threadsafe(
                        manager.broadcast({
                            "type": "embedding_progress",
                            "processed": processed,
                            "total": total,
                            "current_file": current_file,
                            "total_embeddings": stats.get('total_embeddings', 0),
                            "coverage_percent": stats.get('coverage_percent', 0)
                        }),
                        loop
                    )
        except Exception:
            # Progress reporting is best-effort; never abort the generation run.
            pass
    def run_generation():
        # Background-task body: generate embeddings, then always clear the
        # running flags so a failed run does not wedge future requests.
        try:
            results = search.generate_embeddings_batch(
                limit=limit,
                platform=platform,
                progress_callback=progress_callback
            )
            logger.info(f"Embedding generation complete: {results}", module="SemanticSearch")
            try:
                stats = search.get_embedding_stats()
                if manager:
                    asyncio.run_coroutine_threadsafe(
                        manager.broadcast({
                            "type": "embedding_complete",
                            "results": results,
                            "total_embeddings": stats.get('total_embeddings', 0),
                            "coverage_percent": stats.get('coverage_percent', 0)
                        }),
                        loop
                    )
            except Exception:
                # Completion notification is best-effort too.
                pass
        except Exception as e:
            logger.error(f"Embedding generation failed: {e}", module="SemanticSearch")
        finally:
            app_state.indexing_running = False
            app_state.indexing_start_time = None
    background_tasks.add_task(run_generation)
    return {
        "message": f"Started embedding generation for up to {limit} files",
        "status": "processing"
    }
@router.get("/status")
@limiter.limit("60/minute")
@handle_exceptions
async def get_semantic_indexing_status(
    request: Request,
    current_user: Dict = Depends(get_current_user)
):
    """Report whether semantic indexing is running and for how long."""
    state = get_app_state()
    running = state.indexing_running
    started = state.indexing_start_time
    # elapsed is only meaningful while a run is active and has a start time
    elapsed = int(time.time() - started) if (running and started) else None
    return {
        "indexing_running": running,
        "elapsed_seconds": elapsed
    }
# ============================================================================
# SETTINGS ENDPOINTS
# ============================================================================
@router.get("/settings")
@limiter.limit("30/minute")
@handle_exceptions
async def get_semantic_settings(
    request: Request,
    current_user: Dict = Depends(get_current_user)
):
    """Return the semantic search model and threshold, with defaults applied."""
    state = get_app_state()
    stored = state.settings.get('semantic_search', {})
    model = stored.get('model', 'clip-ViT-B-32')
    threshold = stored.get('threshold', 0.2)
    return {"model": model, "threshold": threshold}
@router.post("/settings")
@limiter.limit("10/minute")
@handle_exceptions
async def update_semantic_settings(
    request: Request,
    background_tasks: BackgroundTasks,
    current_user: Dict = Depends(require_admin),
    model: Optional[str] = Body(None, embed=True),
    threshold: Optional[float] = Body(None, embed=True)
):
    """Update semantic search settings (admin only).

    Changing the model invalidates every stored embedding: the embeddings
    table is cleared and a full re-index is started in the background.

    Raises:
        ValidationError: unknown model name, or threshold outside [0, 1].
    """
    app_state = get_app_state()
    current_settings = app_state.settings.get('semantic_search', {}) or {}
    # The settings store may hand back a non-dict value; guard both reads.
    old_model = current_settings.get('model', 'clip-ViT-B-32') if isinstance(current_settings, dict) else 'clip-ViT-B-32'
    model_changed = False
    new_settings = dict(current_settings) if isinstance(current_settings, dict) else {}
    if model:
        valid_models = ['clip-ViT-B-32', 'clip-ViT-B-16', 'clip-ViT-L-14']
        if model not in valid_models:
            raise ValidationError(f"Invalid model. Must be one of: {valid_models}")
        if model != old_model:
            model_changed = True
        new_settings['model'] = model
    if threshold is not None:
        if threshold < 0 or threshold > 1:
            raise ValidationError("Threshold must be between 0 and 1")
        new_settings['threshold'] = threshold
    app_state.settings.set('semantic_search', new_settings, category='ai')
    if model_changed:
        logger.info(f"Model changed from {old_model} to {model}, clearing embeddings", module="SemanticSearch")
        with app_state.db.get_connection(for_write=True) as conn:
            cursor = conn.cursor()
            cursor.execute('DELETE FROM content_embeddings')
            deleted = cursor.rowcount
            logger.info(f"Cleared {deleted} embeddings for model change", module="SemanticSearch")
        # Mark indexing as running so /status reports the re-index and
        # /generate and /reindex refuse to start a concurrent run
        # (matches the flag handling in those endpoints).
        app_state.indexing_running = True
        app_state.indexing_start_time = time.time()
        def run_reindex_for_model_change():
            # Background-task body: reload the new model and rebuild all
            # embeddings; always clear the running flags afterwards.
            try:
                search = get_semantic_search(app_state.db, force_reload=True)
                results = search.generate_embeddings_batch(limit=EMBEDDING_BATCH_LIMIT)
                logger.info(f"Model change re-index complete: {results}", module="SemanticSearch")
            except Exception as e:
                logger.error(f"Model change re-index failed: {e}", module="SemanticSearch")
            finally:
                app_state.indexing_running = False
                app_state.indexing_start_time = None
        background_tasks.add_task(run_reindex_for_model_change)
        logger.info(f"Semantic search settings updated: {new_settings} (re-indexing started)", module="SemanticSearch")
        return {"success": True, "settings": new_settings, "reindexing": True, "message": f"Model changed to {model}, re-indexing started"}
    logger.info(f"Semantic search settings updated: {new_settings}", module="SemanticSearch")
    return {"success": True, "settings": new_settings, "reindexing": False}
@router.post("/reindex")
@limiter.limit("2/minute")
@handle_exceptions
async def reindex_embeddings(
    request: Request,
    background_tasks: BackgroundTasks,
    current_user: Dict = Depends(require_admin)
):
    """Clear and regenerate all embeddings (admin only).

    The table is wiped synchronously; regeneration runs as a background
    task. Refuses to start while another indexing run is active.
    """
    state = get_app_state()
    if state.indexing_running:
        return {"success": False, "message": "Indexing already in progress", "already_running": True}
    engine = get_semantic_search(state.db)
    with state.db.get_connection(for_write=True) as conn:
        cur = conn.cursor()
        cur.execute('DELETE FROM content_embeddings')
        removed = cur.rowcount
        logger.info(f"Cleared {removed} embeddings for reindexing", module="SemanticSearch")
    state.indexing_running = True
    state.indexing_start_time = time.time()
    def run_reindex():
        # Always clear the running flags, even on failure.
        try:
            outcome = engine.generate_embeddings_batch(limit=EMBEDDING_BATCH_LIMIT)
            logger.info(f"Reindex complete: {outcome}", module="SemanticSearch")
        except Exception as e:
            logger.error(f"Reindex failed: {e}", module="SemanticSearch")
        finally:
            state.indexing_running = False
            state.indexing_start_time = None
    background_tasks.add_task(run_reindex)
    return {"success": True, "message": "Re-indexing started in background"}
@router.post("/clear")
@limiter.limit("2/minute")
@handle_exceptions
async def clear_embeddings(
    request: Request,
    current_user: Dict = Depends(require_admin)
):
    """Delete every stored embedding and report how many were removed (admin only)."""
    state = get_app_state()
    with state.db.get_connection(for_write=True) as conn:
        cur = conn.cursor()
        cur.execute('DELETE FROM content_embeddings')
        removed = cur.rowcount
        logger.info(f"Cleared {removed} embeddings", module="SemanticSearch")
    return {"success": True, "deleted": removed}