Files
Todd 49e72207bf Encrypt file paths in API URLs using Fernet tokens
Raw filesystem paths were exposed in browser URLs, dev tools, and proxy logs.
Now all file-serving endpoints accept an opaque encrypted token (t= param)
derived from the session secret via HKDF, with a 4-hour TTL.

Backend:
- Add core/path_tokens.py with Fernet encrypt/decrypt (HKDF from .session_secret)
- Add file_token to all list/gallery/feed/search responses across 7 routers
- Accept optional t= param on all file-serving endpoints (backward compatible)

Frontend:
- Update 4 URL helpers in api.ts to prefer token when available
- Add 4 new helpers for paid-content/embedded-metadata URLs
- Update all 14 page/component files to pass file_token to URL builders
- Add file_token to all relevant TypeScript interfaces

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-30 08:25:22 -04:00

952 lines
30 KiB
Python

"""
Discovery Router
Handles discovery, organization and browsing features:
- Tags management (CRUD, file tagging, bulk operations)
- Smart folders (filter-based virtual folders)
- Collections (manual file groupings)
- Timeline and activity views
- Discovery queue management
"""
import json
from datetime import datetime
from typing import Dict, List, Optional
from fastapi import APIRouter, Body, Depends, Query, Request
from pydantic import BaseModel
from slowapi import Limiter
from slowapi.util import get_remote_address
from ..core.dependencies import get_current_user, get_app_state
from ..core.exceptions import handle_exceptions, NotFoundError, ValidationError
from ..core.path_tokens import encode_path
from ..core.responses import message_response, id_response, count_response, offset_paginated
from modules.discovery_system import get_discovery_system
from modules.universal_logger import get_logger
logger = get_logger('API')
router = APIRouter(prefix="/api", tags=["Discovery"])
limiter = Limiter(key_func=get_remote_address)
# ============================================================================
# PYDANTIC MODELS
# ============================================================================
# Request body accepted by the create_tag endpoint (POST /api/tags).
class TagCreate(BaseModel):
    # Tag name; the only required field.
    name: str
    # ID of the parent tag, or None when no parent is given.
    parent_id: Optional[int] = None
    # Colour string stored with the tag; defaults to '#6366f1'.
    color: str = '#6366f1'
    # Optional icon identifier.
    icon: Optional[str] = None
    # Optional free-text description.
    description: Optional[str] = None
# Request body accepted by the update_tag endpoint (PUT /api/tags/{tag_id}).
# Every field is optional and defaults to None; update_tag forwards all of
# them to the discovery system as-is.
# NOTE(review): presumably None means "leave unchanged" - confirm in
# discovery.update_tag.
class TagUpdate(BaseModel):
    name: Optional[str] = None
    parent_id: Optional[int] = None
    color: Optional[str] = None
    icon: Optional[str] = None
    description: Optional[str] = None
# Shape of a bulk-tagging request: the files to tag and the tags to apply.
# NOTE(review): no endpoint visible in this module references this model;
# bulk_tag_files reads the raw JSON body instead.
class BulkTagRequest(BaseModel):
    # IDs of the files to tag.
    file_ids: List[int]
    # IDs of the tags to apply.
    tag_ids: List[int]
# Request body accepted by the create_smart_folder endpoint
# (POST /api/smart-folders).
class SmartFolderCreate(BaseModel):
    # Folder name; the only required field.
    name: str
    # Filter definition for the virtual folder. get_smart_folders_stats reads
    # the keys 'platform', 'media_type', 'source' and 'size_min' from it.
    filters: dict = {}
    # Icon identifier; defaults to 'folder'.
    icon: str = 'folder'
    # Colour string; defaults to '#6366f1'.
    color: str = '#6366f1'
    # Optional free-text description.
    description: Optional[str] = None
    # Sort column and direction stored with the folder.
    sort_by: str = 'post_date'
    sort_order: str = 'desc'
# Request body accepted by the update_smart_folder endpoint
# (PUT /api/smart-folders/{folder_id}).
# Every field is optional and defaults to None; the endpoint forwards all of
# them to the discovery system as-is.
# NOTE(review): presumably None means "leave unchanged" - confirm in
# discovery.update_smart_folder.
class SmartFolderUpdate(BaseModel):
    name: Optional[str] = None
    filters: Optional[dict] = None
    icon: Optional[str] = None
    color: Optional[str] = None
    description: Optional[str] = None
    sort_by: Optional[str] = None
    sort_order: Optional[str] = None
# Request body accepted by the create_collection endpoint
# (POST /api/collections).
class CollectionCreate(BaseModel):
    # Collection name; the only required field.
    name: str
    # Optional free-text description.
    description: Optional[str] = None
    # Colour string; defaults to '#6366f1'.
    color: str = '#6366f1'
# Request body accepted by the update_collection endpoint
# (PUT /api/collections/{collection_id}).
# Every field is optional and defaults to None; the endpoint forwards all of
# them to the discovery system as-is.
class CollectionUpdate(BaseModel):
    name: Optional[str] = None
    description: Optional[str] = None
    color: Optional[str] = None
    # ID of the file to use as the collection's cover.
    cover_file_id: Optional[int] = None
# Shape of a bulk "add files to collection" request.
# NOTE(review): no endpoint visible in this module references this model;
# bulk_add_to_collection reads the raw JSON body instead.
class BulkCollectionAdd(BaseModel):
    # IDs of the files to add.
    file_ids: List[int]
# Shape of a bulk "add files to the discovery queue" request.
# NOTE(review): no endpoint visible in this module references this model;
# bulk_add_to_queue reads the raw JSON body instead.
class DiscoveryQueueAdd(BaseModel):
    # IDs of the files to enqueue.
    file_ids: List[int]
    # Queue priority applied to the files; defaults to 0.
    priority: int = 0
# ============================================================================
# TAGS ENDPOINTS
# ============================================================================
@router.get("/tags")
@limiter.limit("60/minute")
@handle_exceptions
async def get_tags(
    request: Request,
    current_user: Dict = Depends(get_current_user),
    parent_id: Optional[int] = Query(None, description="Parent tag ID (null for root, -1 for all)"),
    include_counts: bool = Query(True, description="Include file counts")
):
    """List tags, optionally restricted to one parent tag.

    Returns ``{"tags": [...]}`` with whatever the discovery system yields for
    the given ``parent_id`` / ``include_counts`` combination.
    """
    # `request` is not read here; slowapi's limiter needs it in the signature.
    system = get_discovery_system(get_app_state().db)
    return {
        "tags": system.get_tags(parent_id=parent_id, include_counts=include_counts),
    }
@router.get("/tags/{tag_id}")
@limiter.limit("60/minute")
@handle_exceptions
async def get_tag(
    request: Request,
    tag_id: int,
    current_user: Dict = Depends(get_current_user)
):
    """Fetch one tag by its ID.

    Raises ``NotFoundError`` when the discovery system returns nothing for
    ``tag_id``.
    """
    system = get_discovery_system(get_app_state().db)
    found = system.get_tag(tag_id)
    if found:
        return found
    raise NotFoundError("Tag not found")
@router.post("/tags")
@limiter.limit("30/minute")
@handle_exceptions
async def create_tag(
    request: Request,
    tag_data: TagCreate,
    current_user: Dict = Depends(get_current_user)
):
    """Create a tag from the posted fields.

    Responds with the new tag's ID, or raises ``ValidationError`` when the
    discovery system reports no ID.
    """
    system = get_discovery_system(get_app_state().db)
    new_tag_id = system.create_tag(
        name=tag_data.name,
        parent_id=tag_data.parent_id,
        color=tag_data.color,
        icon=tag_data.icon,
        description=tag_data.description,
    )
    # Only None signals failure; any other ID value is reported as success.
    if new_tag_id is not None:
        return id_response(new_tag_id, "Tag created successfully")
    raise ValidationError("Failed to create tag")
@router.put("/tags/{tag_id}")
@limiter.limit("30/minute")
@handle_exceptions
async def update_tag(
    request: Request,
    tag_id: int,
    tag_data: TagUpdate,
    current_user: Dict = Depends(get_current_user)
):
    """Update an existing tag with the posted fields.

    Raises ``NotFoundError`` when the discovery system reports the update as
    unsuccessful.
    """
    system = get_discovery_system(get_app_state().db)
    updated = system.update_tag(
        tag_id=tag_id,
        name=tag_data.name,
        color=tag_data.color,
        icon=tag_data.icon,
        description=tag_data.description,
        parent_id=tag_data.parent_id,
    )
    if updated:
        return message_response("Tag updated successfully")
    raise NotFoundError("Tag not found or update failed")
@router.delete("/tags/{tag_id}")
@limiter.limit("30/minute")
@handle_exceptions
async def delete_tag(
    request: Request,
    tag_id: int,
    current_user: Dict = Depends(get_current_user)
):
    """Delete the tag with the given ID.

    Raises ``NotFoundError`` when the discovery system reports that nothing
    was deleted.
    """
    system = get_discovery_system(get_app_state().db)
    if system.delete_tag(tag_id):
        return message_response("Tag deleted successfully")
    raise NotFoundError("Tag not found")
@router.get("/files/{file_id}/tags")
@limiter.limit("60/minute")
@handle_exceptions
async def get_file_tags(
    request: Request,
    file_id: int,
    current_user: Dict = Depends(get_current_user)
):
    """List the tags attached to one file.

    Returns ``{"tags": [...]}``.
    """
    system = get_discovery_system(get_app_state().db)
    return {"tags": system.get_file_tags(file_id)}
@router.post("/files/{file_id}/tags/{tag_id}")
@limiter.limit("60/minute")
@handle_exceptions
async def tag_file(
    request: Request,
    file_id: int,
    tag_id: int,
    current_user: Dict = Depends(get_current_user)
):
    """Attach a tag to a file.

    Raises ``ValidationError`` when the discovery system reports failure.
    """
    system = get_discovery_system(get_app_state().db)
    # The 'sub' entry of the authenticated user is recorded as the creator.
    creator = current_user.get('sub')
    if system.tag_file(file_id, tag_id, created_by=creator):
        return message_response("Tag added to file")
    raise ValidationError("Failed to tag file")
@router.delete("/files/{file_id}/tags/{tag_id}")
@limiter.limit("60/minute")
@handle_exceptions
async def untag_file(
    request: Request,
    file_id: int,
    tag_id: int,
    current_user: Dict = Depends(get_current_user)
):
    """Detach a tag from a file.

    Raises ``NotFoundError`` when the discovery system reports that nothing
    was removed.
    """
    system = get_discovery_system(get_app_state().db)
    if system.untag_file(file_id, tag_id):
        return message_response("Tag removed from file")
    raise NotFoundError("Tag not found on file")
@router.get("/tags/{tag_id}/files")
@limiter.limit("60/minute")
@handle_exceptions
async def get_files_by_tag(
    request: Request,
    tag_id: int,
    current_user: Dict = Depends(get_current_user),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0)
):
    """Page through the files carrying a given tag.

    The response is built by ``offset_paginated`` under the ``files`` key.
    """
    system = get_discovery_system(get_app_state().db)
    # The discovery system returns the page of rows together with the total.
    page_rows, total_count = system.get_files_by_tag(tag_id, limit=limit, offset=offset)
    return offset_paginated(page_rows, total_count, limit, offset, key="files")
@router.post("/tags/bulk")
@limiter.limit("30/minute")
@handle_exceptions
async def bulk_tag_files(
    request: Request,
    current_user: Dict = Depends(get_current_user)
):
    """Tag multiple files with multiple tags.

    Expects a JSON object with non-empty ``file_ids`` and ``tag_ids`` arrays.
    Raises ``ValidationError`` when the body is not valid JSON, is not an
    object, or either array is missing, empty or not an array.
    """
    try:
        data = await request.json()
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors;
        # report a malformed body as a validation problem, not a crash.
        raise ValidationError("Request body must be valid JSON") from exc

    # A JSON array/string/number has no .get(); reject it explicitly.
    if not isinstance(data, dict):
        raise ValidationError("file_ids and tag_ids required")

    file_ids = data.get('file_ids', [])
    tag_ids = data.get('tag_ids', [])
    if not file_ids or not tag_ids:
        raise ValidationError("file_ids and tag_ids required")
    if not isinstance(file_ids, list) or not isinstance(tag_ids, list):
        raise ValidationError("file_ids and tag_ids must be lists")

    app_state = get_app_state()
    discovery = get_discovery_system(app_state.db)
    # The 'sub' entry of the authenticated user is recorded as the creator.
    count = discovery.bulk_tag_files(file_ids, tag_ids, created_by=current_user.get('sub'))
    return count_response(f"Tagged {count} file-tag pairs", count)
# ============================================================================
# SMART FOLDERS ENDPOINTS
# ============================================================================
@router.get("/smart-folders")
@limiter.limit("60/minute")
@handle_exceptions
async def get_smart_folders(
    request: Request,
    current_user: Dict = Depends(get_current_user),
    include_system: bool = Query(True, description="Include system smart folders")
):
    """List smart folders, optionally leaving out the system ones.

    Returns ``{"smart_folders": [...]}``.
    """
    system = get_discovery_system(get_app_state().db)
    return {"smart_folders": system.get_smart_folders(include_system=include_system)}
def _smart_folder_filter_sql(filters):
    """Turn a smart folder's filter dict into extra SQL conditions and bind parameters."""
    # Filter key -> condition appended to a query over `file_inventory fi`.
    # Falsy filter values (missing, None, '', 0) add no condition.
    conditions = (
        ('platform', ' AND fi.platform = ?'),
        ('media_type', ' AND fi.content_type = ?'),
        ('source', ' AND fi.source = ?'),
        ('size_min', ' AND fi.file_size >= ?'),
    )
    sql_parts = []
    params = []
    for key, condition in conditions:
        value = filters.get(key)
        if value:
            sql_parts.append(condition)
            params.append(value)
    return ''.join(sql_parts), params


@router.get("/smart-folders/stats")
@limiter.limit("30/minute")
@handle_exceptions
async def get_smart_folders_stats(
    request: Request,
    current_user: Dict = Depends(get_current_user)
):
    """Get file counts and preview thumbnails for all smart folders.

    For every smart folder (system folders included) this counts the matching
    ``file_inventory`` rows in location ``'final'`` and collects up to four of
    the newest matches as previews. Returns ``{"stats": {folder_id: {"count":
    int, "previews": [...]}}}``; each preview carries ``file_path``,
    ``file_token`` and ``content_type``.
    """
    count_sql = '''
        SELECT COUNT(*) as count
        FROM file_inventory fi
        WHERE fi.location = 'final'
    '''
    preview_sql = '''
        SELECT fi.file_path, fi.content_type
        FROM file_inventory fi
        WHERE fi.location = 'final'
    '''

    app_state = get_app_state()
    discovery = get_discovery_system(app_state.db)
    folders = discovery.get_smart_folders(include_system=True)
    stats = {}
    with app_state.db.get_connection() as conn:
        cursor = conn.cursor()
        for folder in folders:
            # `or {}` also covers a folder whose 'filters' key is present but None.
            filters = folder.get('filters') or {}
            folder_id = folder['id']

            # Built once so the count and the previews always use the same filter.
            extra_sql, params = _smart_folder_filter_sql(filters)

            cursor.execute(count_sql + extra_sql, params)
            count = cursor.fetchone()[0]

            cursor.execute(
                preview_sql + extra_sql + ' ORDER BY fi.created_date DESC LIMIT 4',
                params,
            )
            previews = []
            for row in cursor.fetchall():
                fp = row['file_path']
                previews.append({
                    'file_path': fp,
                    # Rows without a stored path get no token.
                    'file_token': encode_path(fp) if fp else None,
                    'content_type': row['content_type']
                })

            stats[folder_id] = {
                'count': count,
                'previews': previews
            }
    return {"stats": stats}
@router.get("/smart-folders/{folder_id}")
@limiter.limit("60/minute")
@handle_exceptions
async def get_smart_folder(
    request: Request,
    folder_id: int,
    current_user: Dict = Depends(get_current_user)
):
    """Fetch one smart folder by its ID.

    Raises ``NotFoundError`` when the discovery system returns nothing for
    ``folder_id``.
    """
    system = get_discovery_system(get_app_state().db)
    found = system.get_smart_folder(folder_id=folder_id)
    if found:
        return found
    raise NotFoundError("Smart folder not found")
@router.post("/smart-folders")
@limiter.limit("30/minute")
@handle_exceptions
async def create_smart_folder(
    request: Request,
    folder_data: SmartFolderCreate,
    current_user: Dict = Depends(get_current_user)
):
    """Create a smart folder from the posted fields.

    Responds with the new folder's ID, or raises ``ValidationError`` when the
    discovery system reports no ID.
    """
    system = get_discovery_system(get_app_state().db)
    new_folder_id = system.create_smart_folder(
        name=folder_data.name,
        filters=folder_data.filters,
        icon=folder_data.icon,
        color=folder_data.color,
        description=folder_data.description,
        sort_by=folder_data.sort_by,
        sort_order=folder_data.sort_order,
    )
    # Only None signals failure; any other ID value is reported as success.
    if new_folder_id is not None:
        return {"id": new_folder_id, "message": "Smart folder created successfully"}
    raise ValidationError("Failed to create smart folder")
@router.put("/smart-folders/{folder_id}")
@limiter.limit("30/minute")
@handle_exceptions
async def update_smart_folder(
    request: Request,
    folder_id: int,
    folder_data: SmartFolderUpdate,
    current_user: Dict = Depends(get_current_user)
):
    """Update a smart folder (system folders cannot be updated).

    Raises ``ValidationError`` when the discovery system reports the update as
    unsuccessful.
    """
    system = get_discovery_system(get_app_state().db)
    updated = system.update_smart_folder(
        folder_id=folder_id,
        name=folder_data.name,
        filters=folder_data.filters,
        icon=folder_data.icon,
        color=folder_data.color,
        description=folder_data.description,
        sort_by=folder_data.sort_by,
        sort_order=folder_data.sort_order,
    )
    if updated:
        return {"message": "Smart folder updated successfully"}
    raise ValidationError("Failed to update smart folder (may be a system folder)")
@router.delete("/smart-folders/{folder_id}")
@limiter.limit("30/minute")
@handle_exceptions
async def delete_smart_folder(
    request: Request,
    folder_id: int,
    current_user: Dict = Depends(get_current_user)
):
    """Delete a smart folder (system folders cannot be deleted).

    Raises ``ValidationError`` when the discovery system reports that nothing
    was deleted.
    """
    system = get_discovery_system(get_app_state().db)
    if system.delete_smart_folder(folder_id):
        return {"message": "Smart folder deleted successfully"}
    raise ValidationError("Failed to delete smart folder (may be a system folder)")
# ============================================================================
# COLLECTIONS ENDPOINTS
# ============================================================================
@router.get("/collections")
@limiter.limit("60/minute")
@handle_exceptions
async def get_collections(
    request: Request,
    current_user: Dict = Depends(get_current_user)
):
    """List every collection.

    Returns ``{"collections": [...]}``.
    """
    system = get_discovery_system(get_app_state().db)
    return {"collections": system.get_collections()}
@router.get("/collections/{collection_id}")
@limiter.limit("60/minute")
@handle_exceptions
async def get_collection(
    request: Request,
    collection_id: int,
    current_user: Dict = Depends(get_current_user)
):
    """Fetch one collection by its ID.

    Raises ``NotFoundError`` when the discovery system returns nothing for
    ``collection_id``.
    """
    system = get_discovery_system(get_app_state().db)
    found = system.get_collection(collection_id=collection_id)
    if found:
        return found
    raise NotFoundError("Collection not found")
@router.post("/collections")
@limiter.limit("30/minute")
@handle_exceptions
async def create_collection(
    request: Request,
    collection_data: CollectionCreate,
    current_user: Dict = Depends(get_current_user)
):
    """Create a collection from the posted fields.

    Responds with the new collection's ID, or raises ``ValidationError`` when
    the discovery system reports no ID.
    """
    system = get_discovery_system(get_app_state().db)
    new_collection_id = system.create_collection(
        name=collection_data.name,
        description=collection_data.description,
        color=collection_data.color,
    )
    # Only None signals failure; any other ID value is reported as success.
    if new_collection_id is not None:
        return {"id": new_collection_id, "message": "Collection created successfully"}
    raise ValidationError("Failed to create collection")
@router.put("/collections/{collection_id}")
@limiter.limit("30/minute")
@handle_exceptions
async def update_collection(
    request: Request,
    collection_id: int,
    collection_data: CollectionUpdate,
    current_user: Dict = Depends(get_current_user)
):
    """Update an existing collection with the posted fields.

    Raises ``NotFoundError`` when the discovery system reports the update as
    unsuccessful.
    """
    system = get_discovery_system(get_app_state().db)
    updated = system.update_collection(
        collection_id=collection_id,
        name=collection_data.name,
        description=collection_data.description,
        color=collection_data.color,
        cover_file_id=collection_data.cover_file_id,
    )
    if updated:
        return {"message": "Collection updated successfully"}
    raise NotFoundError("Collection not found")
@router.delete("/collections/{collection_id}")
@limiter.limit("30/minute")
@handle_exceptions
async def delete_collection(
    request: Request,
    collection_id: int,
    current_user: Dict = Depends(get_current_user)
):
    """Delete the collection with the given ID.

    Raises ``NotFoundError`` when the discovery system reports that nothing
    was deleted.
    """
    system = get_discovery_system(get_app_state().db)
    if system.delete_collection(collection_id):
        return {"message": "Collection deleted successfully"}
    raise NotFoundError("Collection not found")
@router.get("/collections/{collection_id}/files")
@limiter.limit("60/minute")
@handle_exceptions
async def get_collection_files(
    request: Request,
    collection_id: int,
    current_user: Dict = Depends(get_current_user),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0)
):
    """Page through the files of a collection.

    Returns the page under ``files`` together with ``total`` and the echoed
    ``limit`` / ``offset``.
    """
    system = get_discovery_system(get_app_state().db)
    # The discovery system returns the page of rows together with the total.
    page_rows, total_count = system.get_collection_files(collection_id, limit=limit, offset=offset)
    return {
        "files": page_rows,
        "total": total_count,
        "limit": limit,
        "offset": offset,
    }
@router.post("/collections/{collection_id}/files/{file_id}")
@limiter.limit("60/minute")
@handle_exceptions
async def add_to_collection(
    request: Request,
    collection_id: int,
    file_id: int,
    current_user: Dict = Depends(get_current_user)
):
    """Add one file to a collection.

    Raises ``ValidationError`` when the discovery system reports failure.
    """
    system = get_discovery_system(get_app_state().db)
    # The 'sub' entry of the authenticated user is recorded as the adder.
    adder = current_user.get('sub')
    if system.add_to_collection(collection_id, file_id, added_by=adder):
        return {"message": "File added to collection"}
    raise ValidationError("Failed to add file to collection")
@router.delete("/collections/{collection_id}/files/{file_id}")
@limiter.limit("60/minute")
@handle_exceptions
async def remove_from_collection(
    request: Request,
    collection_id: int,
    file_id: int,
    current_user: Dict = Depends(get_current_user)
):
    """Remove one file from a collection.

    Raises ``NotFoundError`` when the discovery system reports that nothing
    was removed.
    """
    system = get_discovery_system(get_app_state().db)
    if system.remove_from_collection(collection_id, file_id):
        return {"message": "File removed from collection"}
    raise NotFoundError("File not found in collection")
@router.post("/collections/{collection_id}/files/bulk")
@limiter.limit("30/minute")
@handle_exceptions
async def bulk_add_to_collection(
    request: Request,
    collection_id: int,
    current_user: Dict = Depends(get_current_user)
):
    """Add multiple files to a collection.

    Expects a JSON object with a non-empty ``file_ids`` array. Raises
    ``ValidationError`` when the body is not valid JSON, is not an object, or
    ``file_ids`` is missing, empty or not an array.
    """
    try:
        data = await request.json()
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors;
        # report a malformed body as a validation problem, not a crash.
        raise ValidationError("Request body must be valid JSON") from exc

    # A JSON array/string/number has no .get(); reject it explicitly.
    if not isinstance(data, dict):
        raise ValidationError("file_ids required")

    file_ids = data.get('file_ids', [])
    if not file_ids:
        raise ValidationError("file_ids required")
    if not isinstance(file_ids, list):
        raise ValidationError("file_ids must be a list")

    app_state = get_app_state()
    discovery = get_discovery_system(app_state.db)
    # The 'sub' entry of the authenticated user is recorded as the adder.
    count = discovery.bulk_add_to_collection(collection_id, file_ids, added_by=current_user.get('sub'))
    return {"message": f"Added {count} files to collection", "count": count}
# ============================================================================
# TIMELINE ENDPOINTS
# ============================================================================
@router.get("/timeline")
@limiter.limit("60/minute")
@handle_exceptions
async def get_timeline(
    request: Request,
    current_user: Dict = Depends(get_current_user),
    granularity: str = Query('day', pattern='^(day|week|month|year)$'),
    date_from: Optional[str] = Query(None, pattern=r'^\d{4}-\d{2}-\d{2}$'),
    date_to: Optional[str] = Query(None, pattern=r'^\d{4}-\d{2}-\d{2}$'),
    platform: Optional[str] = Query(None),
    source: Optional[str] = Query(None)
):
    """Return timeline aggregation data.

    ``date_from`` / ``date_to`` must look like ``YYYY-MM-DD``. The response
    holds the aggregated ``timeline`` and echoes the ``granularity`` used.
    """
    system = get_discovery_system(get_app_state().db)
    buckets = system.get_timeline_data(
        granularity=granularity,
        date_from=date_from,
        date_to=date_to,
        platform=platform,
        source=source,
    )
    return {"timeline": buckets, "granularity": granularity}
@router.get("/timeline/heatmap")
@limiter.limit("60/minute")
@handle_exceptions
async def get_timeline_heatmap(
    request: Request,
    current_user: Dict = Depends(get_current_user),
    year: Optional[int] = Query(None, ge=2000, le=2100),
    platform: Optional[str] = Query(None)
):
    """Return activity heatmap data (file counts per day for a year).

    The response reports the requested ``year``, or the current year when
    none was given.
    """
    system = get_discovery_system(get_app_state().db)
    # `year` is forwarded unchanged (possibly None).
    day_counts = system.get_activity_heatmap(year=year, platform=platform)
    reported_year = year if year else datetime.now().year
    return {"heatmap": day_counts, "year": reported_year}
@router.get("/timeline/on-this-day")
@limiter.limit("60/minute")
@handle_exceptions
async def get_on_this_day(
    request: Request,
    current_user: Dict = Depends(get_current_user),
    month: Optional[int] = Query(None, ge=1, le=12),
    day: Optional[int] = Query(None, ge=1, le=31),
    limit: int = Query(50, ge=1, le=200)
):
    """Return content from the same day in previous years ('On This Day' feature).

    The response holds the matching ``files`` and their ``count``.
    """
    system = get_discovery_system(get_app_state().db)
    # month/day are forwarded unchanged (possibly None).
    matches = system.get_on_this_day(month=month, day=day, limit=limit)
    return {"files": matches, "count": len(matches)}
# ============================================================================
# RECENT ACTIVITY ENDPOINT
# ============================================================================
@router.get("/discovery/recent-activity")
@limiter.limit("60/minute")
@handle_exceptions
async def get_recent_activity(
    request: Request,
    current_user: Dict = Depends(get_current_user),
    limit: int = Query(10, ge=1, le=50)
):
    """Get recent activity across downloads, deletions, and moves to review.

    Returns up to ``limit`` newest entries per category plus 24-hour / 7-day
    counters under ``summary``. Each entry carries the raw ``file_path`` and a
    ``file_token`` produced by ``encode_path`` (None when no path is stored).
    """
    def token_for(path):
        # Rows without a stored path get no token.
        return encode_path(path) if path else None

    def parse_metadata(raw):
        # Best effort: recycle-bin metadata is optional JSON. Anything missing,
        # unparsable or not a JSON object is treated as "no metadata", so the
        # .get() lookups below cannot fail on a list/string/number payload.
        if not raw:
            return {}
        try:
            parsed = json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            return {}
        return parsed if isinstance(parsed, dict) else {}

    app_state = get_app_state()
    activity = {
        'recent_downloads': [],
        'recent_deleted': [],
        # NOTE(review): nothing below fills 'recent_restored'; it is always
        # returned as an empty list.
        'recent_restored': [],
        'recent_moved_to_review': [],
        'summary': {
            'downloads_24h': 0,
            'downloads_7d': 0,
            'deleted_24h': 0,
            'deleted_7d': 0
        }
    }

    with app_state.db.get_connection() as conn:
        cursor = conn.cursor()

        # Recent downloads
        # NOTE(review): if `downloads` can hold several rows per file_path the
        # LEFT JOIN repeats that file in the result - confirm against the schema.
        cursor.execute('''
            SELECT
                fi.id, fi.file_path, fi.filename, fi.platform, fi.source,
                fi.content_type, fi.file_size, fi.created_date,
                d.download_date, d.post_date
            FROM file_inventory fi
            LEFT JOIN downloads d ON d.file_path = fi.file_path
            WHERE fi.location = 'final'
            ORDER BY fi.created_date DESC
            LIMIT ?
        ''', (limit,))
        for row in cursor.fetchall():
            fp = row['file_path']
            activity['recent_downloads'].append({
                'id': row['id'],
                'file_path': fp,
                'file_token': token_for(fp),
                'filename': row['filename'],
                'platform': row['platform'],
                'source': row['source'],
                'content_type': row['content_type'],
                'file_size': row['file_size'],
                # Fall back to the inventory date when no download row matched.
                'timestamp': row['download_date'] or row['created_date'],
                'action': 'download'
            })

        # Recent deleted
        cursor.execute('''
            SELECT
                id, original_path, original_filename, recycle_path,
                file_size, deleted_at, deleted_from, metadata
            FROM recycle_bin
            ORDER BY deleted_at DESC
            LIMIT ?
        ''', (limit,))
        for row in cursor.fetchall():
            metadata = parse_metadata(row['metadata'])
            # The path served for a deleted file is its recycle-bin location.
            fp = row['recycle_path']
            activity['recent_deleted'].append({
                'id': row['id'],
                'file_path': fp,
                'file_token': token_for(fp),
                'original_path': row['original_path'],
                'filename': row['original_filename'],
                'platform': metadata.get('platform', 'unknown'),
                'source': metadata.get('source', ''),
                'content_type': metadata.get('content_type', 'image'),
                'file_size': row['file_size'] or 0,
                'timestamp': row['deleted_at'],
                'deleted_from': row['deleted_from'],
                'action': 'delete'
            })

        # Recent moved to review
        cursor.execute('''
            SELECT
                id, file_path, filename, platform, source,
                content_type, file_size, created_date
            FROM file_inventory
            WHERE location = 'review'
            ORDER BY created_date DESC
            LIMIT ?
        ''', (limit,))
        for row in cursor.fetchall():
            fp = row['file_path']
            activity['recent_moved_to_review'].append({
                'id': row['id'],
                'file_path': fp,
                'file_token': token_for(fp),
                'filename': row['filename'],
                'platform': row['platform'],
                'source': row['source'],
                'content_type': row['content_type'],
                'file_size': row['file_size'],
                'timestamp': row['created_date'],
                'action': 'review'
            })

        # Summary stats: one COUNT(*) per counter, in the original order.
        summary_queries = (
            ('downloads_24h', '''
                SELECT COUNT(*) FROM file_inventory
                WHERE location = 'final'
                AND created_date >= datetime('now', '-1 day')
            '''),
            ('downloads_7d', '''
                SELECT COUNT(*) FROM file_inventory
                WHERE location = 'final'
                AND created_date >= datetime('now', '-7 days')
            '''),
            ('deleted_24h', '''
                SELECT COUNT(*) FROM recycle_bin
                WHERE deleted_at >= datetime('now', '-1 day')
            '''),
            ('deleted_7d', '''
                SELECT COUNT(*) FROM recycle_bin
                WHERE deleted_at >= datetime('now', '-7 days')
            '''),
        )
        for summary_key, sql in summary_queries:
            cursor.execute(sql)
            activity['summary'][summary_key] = cursor.fetchone()[0]

    return activity
# ============================================================================
# DISCOVERY QUEUE ENDPOINTS
# ============================================================================
@router.get("/discovery/queue/stats")
@limiter.limit("60/minute")
@handle_exceptions
async def get_queue_stats(
    request: Request,
    current_user: Dict = Depends(get_current_user)
):
    """Return discovery queue statistics.

    The discovery system's result is passed through unchanged.
    """
    system = get_discovery_system(get_app_state().db)
    return system.get_queue_stats()
@router.get("/discovery/queue/pending")
@limiter.limit("60/minute")
@handle_exceptions
async def get_pending_queue(
    request: Request,
    current_user: Dict = Depends(get_current_user),
    limit: int = Query(100, ge=1, le=1000)
):
    """Return pending items in the discovery queue.

    The response holds up to ``limit`` ``items`` and their ``count``.
    """
    system = get_discovery_system(get_app_state().db)
    pending = system.get_pending_queue(limit=limit)
    return {"items": pending, "count": len(pending)}
@router.post("/discovery/queue/add")
@limiter.limit("30/minute")
@handle_exceptions
async def add_to_queue(
    request: Request,
    current_user: Dict = Depends(get_current_user),
    file_id: int = Body(...),
    priority: int = Body(0)
):
    """Add one file to the discovery queue.

    ``file_id`` (required) and ``priority`` (default 0) are read from the
    request body. Raises ``ValidationError`` when the discovery system reports
    failure.
    """
    system = get_discovery_system(get_app_state().db)
    if system.add_to_queue(file_id, priority=priority):
        return {"message": "File added to queue"}
    raise ValidationError("Failed to add file to queue")
@router.post("/discovery/queue/bulk-add")
@limiter.limit("10/minute")
@handle_exceptions
async def bulk_add_to_queue(
    request: Request,
    current_user: Dict = Depends(get_current_user)
):
    """Add multiple files to the discovery queue.

    Expects a JSON object with a non-empty ``file_ids`` array and an optional
    ``priority`` (default 0). Raises ``ValidationError`` when the body is not
    valid JSON, is not an object, or ``file_ids`` is missing, empty or not an
    array.
    """
    try:
        data = await request.json()
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors;
        # report a malformed body as a validation problem, not a crash.
        raise ValidationError("Request body must be valid JSON") from exc

    # A JSON array/string/number has no .get(); reject it explicitly.
    if not isinstance(data, dict):
        raise ValidationError("file_ids required")

    file_ids = data.get('file_ids', [])
    priority = data.get('priority', 0)
    if not file_ids:
        raise ValidationError("file_ids required")
    if not isinstance(file_ids, list):
        raise ValidationError("file_ids must be a list")

    app_state = get_app_state()
    discovery = get_discovery_system(app_state.db)
    count = discovery.bulk_add_to_queue(file_ids, priority=priority)
    return {"message": f"Added {count} files to queue", "count": count}
@router.delete("/discovery/queue/clear")
@limiter.limit("10/minute")
@handle_exceptions
async def clear_queue(
    request: Request,
    current_user: Dict = Depends(get_current_user)
):
    """Clear the discovery queue.

    The response reports how many items the discovery system removed.
    """
    system = get_discovery_system(get_app_state().db)
    removed = system.clear_queue()
    return {"message": f"Cleared {removed} items from queue", "count": removed}