Files
media-downloader/modules/discovery_system.py
Todd 0d7b2b1aab Initial commit
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-29 22:42:55 -04:00

1052 lines
40 KiB
Python

#!/usr/bin/env python3
"""
Smart Content Archive & Discovery System
Provides tagging, smart folders, collections, timeline, and semantic search capabilities
"""
import json
import re
import sqlite3
import threading
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any

from modules.universal_logger import get_logger
logger = get_logger('Discovery')
def slugify(text: str) -> str:
    """Return a lowercase, hyphen-separated, URL-friendly version of *text*."""
    cleaned = text.lower().strip()
    # Strip everything except word characters, whitespace, and hyphens.
    cleaned = re.sub(r'[^\w\s-]', '', cleaned)
    # Collapse runs of whitespace and hyphens into a single hyphen.
    return re.sub(r'[-\s]+', '-', cleaned)
class DiscoverySystem:
    """Smart Content Archive & Discovery System.

    Thin service layer over a UnifiedDatabase connection that manages
    tags, smart folders (saved queries), collections, timeline
    aggregations, and the discovery scan queue.
    """

    def __init__(self, unified_db):
        """
        Initialize Discovery System

        Args:
            unified_db: UnifiedDatabase instance
        """
        self.db = unified_db
        # Per-instance logger; same channel as the module-level `logger`.
        self.logger = get_logger('Discovery')
# =========================================================================
# TAGS MANAGEMENT
# =========================================================================
def create_tag(self, name: str, parent_id: int = None, color: str = '#6366f1',
               icon: str = None, description: str = None) -> Optional[int]:
    """
    Create a new tag.

    Args:
        name: Tag name (basis of the unique slug)
        parent_id: Parent tag ID for hierarchical tags
        color: Hex color code
        icon: Icon name
        description: Tag description

    Returns:
        Tag ID, or None on error
    """
    slug = slugify(name)
    try:
        with self.db.get_connection(for_write=True) as conn:
            cursor = conn.cursor()
            # The slug column is unique; retry with a numeric suffix so
            # name collisions still produce distinct slugs. Only constraint
            # violations are retried -- any other failure (missing table,
            # bad parent FK, ...) aborts immediately instead of burning
            # all five attempts on a hopeless INSERT.
            for attempt in range(5):
                try_slug = slug if attempt == 0 else f'{slug}-{attempt}'
                try:
                    cursor.execute('''
                        INSERT INTO tags (name, slug, parent_id, color, icon, description)
                        VALUES (?, ?, ?, ?, ?, ?)
                    ''', (name, try_slug, parent_id, color, icon, description))
                    slug = try_slug
                    break
                except sqlite3.IntegrityError:
                    if attempt == 4:
                        raise
                    continue
            tag_id = cursor.lastrowid
            self.logger.info(f"Created tag: {name} (id={tag_id})")
            return tag_id
    except Exception as e:
        self.logger.error(f"Failed to create tag '{name}': {e}")
        return None
def get_tags(self, parent_id: int = None, include_counts: bool = True) -> List[Dict]:
    """
    List tags, optionally restricted to one level of the hierarchy.

    Args:
        parent_id: None -> root tags only; -1 -> every tag;
                   otherwise -> direct children of that tag
        include_counts: When False, file_count/child_count come back as None

    Returns:
        List of tag dictionaries ([] on error)
    """
    # Shared SELECT: correlated subqueries attach per-tag usage counts.
    base_query = '''
        SELECT t.*,
               (SELECT COUNT(*) FROM file_tags ft WHERE ft.tag_id = t.id) as file_count,
               (SELECT COUNT(*) FROM tags c WHERE c.parent_id = t.id) as child_count
        FROM tags t
        {where}
        ORDER BY t.name
    '''
    if parent_id == -1:
        where, params = '', ()
    elif parent_id is None:
        where, params = 'WHERE t.parent_id IS NULL', ()
    else:
        where, params = 'WHERE t.parent_id = ?', (parent_id,)
    try:
        with self.db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(base_query.format(where=where), params)
            results = []
            for row in cursor.fetchall():
                results.append({
                    'id': row['id'],
                    'name': row['name'],
                    'slug': row['slug'],
                    'parent_id': row['parent_id'],
                    'color': row['color'],
                    'icon': row['icon'],
                    'description': row['description'],
                    'file_count': row['file_count'] if include_counts else None,
                    'child_count': row['child_count'] if include_counts else None,
                    'created_date': row['created_date'],
                })
            return results
    except Exception as e:
        self.logger.error(f"Failed to get tags: {e}")
        return []
def get_tag(self, tag_id: int) -> Optional[Dict]:
    """Fetch a single tag by id, with its file count; None if absent or on error."""
    sql = '''
        SELECT t.*,
               (SELECT COUNT(*) FROM file_tags ft WHERE ft.tag_id = t.id) as file_count
        FROM tags t
        WHERE t.id = ?
    '''
    try:
        with self.db.get_connection() as conn:
            row = conn.cursor().execute(sql, (tag_id,)).fetchone()
        if not row:
            return None
        # Project the row onto the documented key set.
        fields = ('id', 'name', 'slug', 'parent_id', 'color', 'icon',
                  'description', 'file_count', 'created_date')
        return {key: row[key] for key in fields}
    except Exception as e:
        self.logger.error(f"Failed to get tag {tag_id}: {e}")
        return None
def update_tag(self, tag_id: int, name: str = None, color: str = None,
               icon: str = None, description: str = None, parent_id: int = None) -> bool:
    """Update an existing tag.

    Only non-None keyword arguments are written; renaming also regenerates
    the slug. Returns True when a row was updated (or when no fields were
    requested at all), False when the tag is missing or on error.
    """
    try:
        with self.db.get_connection(for_write=True) as conn:
            cursor = conn.cursor()
            # Build the SET clause from the provided fields only. Column
            # names are hard-coded and values are bound via ? placeholders,
            # so the f-string composition below is injection-safe.
            updates = []
            params = []
            if name is not None:
                updates.append('name = ?')
                params.append(name)
                # Keep the slug in sync with the new name.
                updates.append('slug = ?')
                params.append(slugify(name))
            if color is not None:
                updates.append('color = ?')
                params.append(color)
            if icon is not None:
                updates.append('icon = ?')
                params.append(icon)
            if description is not None:
                updates.append('description = ?')
                params.append(description)
            if parent_id is not None:
                # Prevent circular reference
                # NOTE(review): only a direct self-reference is rejected;
                # longer cycles (A -> B -> A) are not detected here.
                if parent_id != tag_id:
                    updates.append('parent_id = ?')
                    # parent_id <= 0 is the sentinel for "move to root".
                    params.append(parent_id if parent_id > 0 else None)
            if not updates:
                # Nothing requested -- treat as a successful no-op.
                return True
            updates.append('updated_date = CURRENT_TIMESTAMP')
            params.append(tag_id)
            cursor.execute(f'''
                UPDATE tags SET {', '.join(updates)}
                WHERE id = ?
            ''', params)
            return cursor.rowcount > 0
    except Exception as e:
        self.logger.error(f"Failed to update tag {tag_id}: {e}")
        return False
def delete_tag(self, tag_id: int) -> bool:
    """Delete a tag by id; True when a row was removed.

    Rows in file_tags are expected to disappear via the schema's cascade
    rule (presumably ON DELETE CASCADE -- defined in the schema, not here).
    """
    try:
        with self.db.get_connection(for_write=True) as conn:
            cur = conn.cursor()
            cur.execute('DELETE FROM tags WHERE id = ?', (tag_id,))
            removed = cur.rowcount > 0
            if removed:
                self.logger.info(f"Deleted tag {tag_id}")
            return removed
    except Exception as e:
        self.logger.error(f"Failed to delete tag {tag_id}: {e}")
        return False
def tag_file(self, file_id: int, tag_id: int, created_by: str = None) -> bool:
    """Attach a tag to a file.

    Duplicate (file, tag) pairs are silently skipped by INSERT OR IGNORE;
    the call still returns True. False only when the statement fails.
    """
    try:
        with self.db.get_connection(for_write=True) as conn:
            conn.cursor().execute(
                '''
                INSERT OR IGNORE INTO file_tags (file_id, tag_id, created_by)
                VALUES (?, ?, ?)
                ''',
                (file_id, tag_id, created_by),
            )
        return True
    except Exception as e:
        self.logger.error(f"Failed to tag file {file_id} with tag {tag_id}: {e}")
        return False
def untag_file(self, file_id: int, tag_id: int) -> bool:
    """Detach a tag from a file; True only when a row was actually deleted."""
    try:
        with self.db.get_connection(for_write=True) as conn:
            cur = conn.cursor()
            cur.execute(
                'DELETE FROM file_tags WHERE file_id = ? AND tag_id = ?',
                (file_id, tag_id),
            )
            return cur.rowcount > 0
    except Exception as e:
        self.logger.error(f"Failed to untag file {file_id}: {e}")
        return False
def get_file_tags(self, file_id: int) -> List[Dict]:
    """Return every tag attached to *file_id*, ordered by name ([] on error)."""
    sql = '''
        SELECT t.* FROM tags t
        JOIN file_tags ft ON ft.tag_id = t.id
        WHERE ft.file_id = ?
        ORDER BY t.name
    '''
    try:
        with self.db.get_connection() as conn:
            rows = conn.cursor().execute(sql, (file_id,)).fetchall()
        return [dict(r) for r in rows]
    except Exception as e:
        self.logger.error(f"Failed to get tags for file {file_id}: {e}")
        return []
def get_files_by_tag(self, tag_id: int, limit: int = 100, offset: int = 0) -> Tuple[List[Dict], int]:
    """Return one page of files carrying a tag, newest first.

    Returns:
        (files, total) where *total* is the overall match count for
        pagination, independent of limit/offset. ([], 0) on error.
    """
    try:
        with self.db.get_connection() as conn:
            cursor = conn.cursor()
            total = cursor.execute(
                'SELECT COUNT(*) FROM file_tags WHERE tag_id = ?', (tag_id,)
            ).fetchone()[0]
            rows = cursor.execute('''
                SELECT fi.* FROM file_inventory fi
                JOIN file_tags ft ON ft.file_id = fi.id
                WHERE ft.tag_id = ?
                ORDER BY fi.created_date DESC
                LIMIT ? OFFSET ?
            ''', (tag_id, limit, offset)).fetchall()
            return [dict(r) for r in rows], total
    except Exception as e:
        self.logger.error(f"Failed to get files by tag {tag_id}: {e}")
        return [], 0
def bulk_tag_files(self, file_ids: List[int], tag_ids: List[int], created_by: str = None) -> int:
    """Apply every tag in *tag_ids* to every file in *file_ids*.

    Returns the number of (file, tag) pairs actually inserted; pairs that
    already existed are skipped by INSERT OR IGNORE and not counted. On
    error, returns however many pairs made it in before the failure.
    """
    inserted = 0
    try:
        with self.db.get_connection(for_write=True) as conn:
            cursor = conn.cursor()
            pairs = ((f, t) for f in file_ids for t in tag_ids)
            for fid, tid in pairs:
                cursor.execute('''
                    INSERT OR IGNORE INTO file_tags (file_id, tag_id, created_by)
                    VALUES (?, ?, ?)
                ''', (fid, tid, created_by))
                inserted += cursor.rowcount
            self.logger.info(f"Bulk tagged {inserted} file-tag pairs")
            return inserted
    except Exception as e:
        self.logger.error(f"Failed to bulk tag files: {e}")
        return inserted
# =========================================================================
# SMART FOLDERS
# =========================================================================
def create_smart_folder(self, name: str, filters: Dict, icon: str = 'folder',
                        color: str = '#6366f1', description: str = None,
                        sort_by: str = 'post_date', sort_order: str = 'desc') -> Optional[int]:
    """
    Create a smart folder (a saved query).

    Args:
        name: Display name; also the basis of the unique slug.
        filters: Filter dict, stored as a JSON blob.
        icon, color, description: Presentation metadata.
        sort_by, sort_order: Default sort for the folder's results.

    Returns:
        New folder ID, or None on error.
    """
    slug = slugify(name)
    try:
        with self.db.get_connection(for_write=True) as conn:
            cursor = conn.cursor()
            # Slug is unique; retry with a numeric suffix, but only on
            # constraint violations -- any other failure aborts immediately
            # instead of being retried five times pointlessly.
            for attempt in range(5):
                try_slug = slug if attempt == 0 else f'{slug}-{attempt}'
                try:
                    cursor.execute('''
                        INSERT INTO smart_folders (name, slug, icon, color, description, filters, sort_by, sort_order)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                    ''', (name, try_slug, icon, color, description, json.dumps(filters), sort_by, sort_order))
                    slug = try_slug
                    break
                except sqlite3.IntegrityError:
                    if attempt == 4:
                        raise
                    continue
            folder_id = cursor.lastrowid
            self.logger.info(f"Created smart folder: {name} (id={folder_id})")
            return folder_id
    except Exception as e:
        self.logger.error(f"Failed to create smart folder '{name}': {e}")
        return None
def get_smart_folders(self, include_system: bool = True) -> List[Dict]:
    """Return all smart folders in display order ([] on error).

    Args:
        include_system: When False, built-in (is_system) folders are omitted.
    """
    sql = 'SELECT * FROM smart_folders'
    if not include_system:
        sql += ' WHERE is_system = 0'
    sql += ' ORDER BY display_order, name'
    try:
        with self.db.get_connection() as conn:
            rows = conn.cursor().execute(sql).fetchall()
        result = []
        for row in rows:
            result.append({
                'id': row['id'],
                'name': row['name'],
                'slug': row['slug'],
                'icon': row['icon'],
                'color': row['color'],
                'description': row['description'],
                # Filters are persisted as a JSON blob of the saved query.
                'filters': json.loads(row['filters']) if row['filters'] else {},
                'sort_by': row['sort_by'],
                'sort_order': row['sort_order'],
                'is_system': bool(row['is_system']),
                'display_order': row['display_order'],
            })
        return result
    except Exception as e:
        self.logger.error(f"Failed to get smart folders: {e}")
        return []
def get_smart_folder(self, folder_id: int = None, slug: str = None) -> Optional[Dict]:
    """Fetch one smart folder by id (preferred) or slug.

    Returns None when neither key is given (truthy check, as before),
    when nothing matches, or on error.
    """
    if folder_id:
        where, param = 'id = ?', folder_id
    elif slug:
        where, param = 'slug = ?', slug
    else:
        return None
    try:
        with self.db.get_connection() as conn:
            row = conn.cursor().execute(
                f'SELECT * FROM smart_folders WHERE {where}', (param,)
            ).fetchone()
        if not row:
            return None
        return {
            'id': row['id'],
            'name': row['name'],
            'slug': row['slug'],
            'icon': row['icon'],
            'color': row['color'],
            'description': row['description'],
            'filters': json.loads(row['filters']) if row['filters'] else {},
            'sort_by': row['sort_by'],
            'sort_order': row['sort_order'],
            'is_system': bool(row['is_system']),
        }
    except Exception as e:
        self.logger.error(f"Failed to get smart folder: {e}")
        return None
def update_smart_folder(self, folder_id: int, name: str = None, filters: Dict = None,
                        icon: str = None, color: str = None, description: str = None,
                        sort_by: str = None, sort_order: str = None) -> bool:
    """Update a smart folder (cannot update system folders).

    Only non-None keyword arguments are written. Returns True when the row
    was updated (or nothing was requested), False for system folders,
    missing ids, or errors.

    NOTE(review): unlike update_tag, renaming does NOT regenerate the
    slug -- confirm that is intended.
    """
    try:
        with self.db.get_connection(for_write=True) as conn:
            cursor = conn.cursor()
            # Check if it's a system folder -- those are read-only.
            cursor.execute('SELECT is_system FROM smart_folders WHERE id = ?', (folder_id,))
            row = cursor.fetchone()
            if row and row['is_system']:
                self.logger.warning(f"Cannot update system smart folder {folder_id}")
                return False
            # Build the SET clause from the provided fields only. Column
            # names are hard-coded and values bound via ?, so the f-string
            # composition below is injection-safe.
            updates = []
            params = []
            if name is not None:
                updates.append('name = ?')
                params.append(name)
            if filters is not None:
                updates.append('filters = ?')
                params.append(json.dumps(filters))
            if icon is not None:
                updates.append('icon = ?')
                params.append(icon)
            if color is not None:
                updates.append('color = ?')
                params.append(color)
            if description is not None:
                updates.append('description = ?')
                params.append(description)
            if sort_by is not None:
                updates.append('sort_by = ?')
                params.append(sort_by)
            if sort_order is not None:
                updates.append('sort_order = ?')
                params.append(sort_order)
            if not updates:
                # Nothing requested -- treat as a successful no-op.
                return True
            updates.append('updated_date = CURRENT_TIMESTAMP')
            params.append(folder_id)
            cursor.execute(f'''
                UPDATE smart_folders SET {', '.join(updates)}
                WHERE id = ?
            ''', params)
            return cursor.rowcount > 0
    except Exception as e:
        self.logger.error(f"Failed to update smart folder {folder_id}: {e}")
        return False
def delete_smart_folder(self, folder_id: int) -> bool:
    """Delete a user-created smart folder.

    System folders are protected by the `is_system = 0` predicate, so the
    call returns False for them.
    """
    try:
        with self.db.get_connection(for_write=True) as conn:
            cur = conn.cursor()
            cur.execute(
                'DELETE FROM smart_folders WHERE id = ? AND is_system = 0',
                (folder_id,),
            )
            if cur.rowcount > 0:
                self.logger.info(f"Deleted smart folder {folder_id}")
                return True
            return False
    except Exception as e:
        self.logger.error(f"Failed to delete smart folder {folder_id}: {e}")
        return False
# =========================================================================
# COLLECTIONS
# =========================================================================
def create_collection(self, name: str, description: str = None,
                      color: str = '#6366f1') -> Optional[int]:
    """
    Create a new collection.

    Args:
        name: Collection name (basis of the unique slug)
        description: Optional description
        color: Hex color code

    Returns:
        Collection ID, or None on error
    """
    slug = slugify(name)
    try:
        with self.db.get_connection(for_write=True) as conn:
            cursor = conn.cursor()
            # Slug is unique; retry with a numeric suffix on constraint
            # violations only -- any other failure aborts immediately
            # instead of consuming all five attempts.
            for attempt in range(5):
                try_slug = slug if attempt == 0 else f'{slug}-{attempt}'
                try:
                    cursor.execute('''
                        INSERT INTO collections (name, slug, description, color)
                        VALUES (?, ?, ?, ?)
                    ''', (name, try_slug, description, color))
                    slug = try_slug
                    break
                except sqlite3.IntegrityError:
                    if attempt == 4:
                        raise
                    continue
            collection_id = cursor.lastrowid
            self.logger.info(f"Created collection: {name} (id={collection_id})")
            return collection_id
    except Exception as e:
        self.logger.error(f"Failed to create collection '{name}': {e}")
        return None
def get_collections(self) -> List[Dict]:
    """Return every collection with its file count and cover path ([] on error)."""
    sql = '''
        SELECT c.*,
               (SELECT COUNT(*) FROM collection_files cf WHERE cf.collection_id = c.id) as file_count,
               (SELECT fi.file_path FROM file_inventory fi WHERE fi.id = c.cover_file_id) as cover_path
        FROM collections c
        ORDER BY c.name
    '''
    try:
        with self.db.get_connection() as conn:
            rows = conn.cursor().execute(sql).fetchall()
        return [{
            'id': r['id'],
            'name': r['name'],
            'slug': r['slug'],
            'description': r['description'],
            'color': r['color'],
            'cover_file_id': r['cover_file_id'],
            'cover_path': r['cover_path'],
            'file_count': r['file_count'],
            'is_public': bool(r['is_public']),
            'created_date': r['created_date'],
        } for r in rows]
    except Exception as e:
        self.logger.error(f"Failed to get collections: {e}")
        return []
def get_collection(self, collection_id: int = None, slug: str = None) -> Optional[Dict]:
    """Fetch one collection (with file_count) by id or slug.

    Returns None when neither key is given (truthy check, as before),
    when nothing matches, or on error.
    """
    if collection_id:
        where, param = 'c.id = ?', collection_id
    elif slug:
        where, param = 'c.slug = ?', slug
    else:
        return None
    sql = f'''
        SELECT c.*,
               (SELECT COUNT(*) FROM collection_files cf WHERE cf.collection_id = c.id) as file_count
        FROM collections c
        WHERE {where}
    '''
    try:
        with self.db.get_connection() as conn:
            row = conn.cursor().execute(sql, (param,)).fetchone()
        if not row:
            return None
        return {
            'id': row['id'],
            'name': row['name'],
            'slug': row['slug'],
            'description': row['description'],
            'color': row['color'],
            'cover_file_id': row['cover_file_id'],
            'file_count': row['file_count'],
            'is_public': bool(row['is_public']),
            'created_date': row['created_date'],
        }
    except Exception as e:
        self.logger.error(f"Failed to get collection: {e}")
        return None
def update_collection(self, collection_id: int, name: str = None,
                      description: str = None, color: str = None,
                      cover_file_id: int = None) -> bool:
    """Update a collection's metadata.

    Only non-None keyword arguments are written; cover_file_id <= 0 clears
    the cover. Returns True on update (or when no fields were given),
    False when the row is missing or on error.

    NOTE(review): unlike update_tag, renaming does NOT regenerate the
    slug -- confirm that is intended.
    """
    try:
        with self.db.get_connection(for_write=True) as conn:
            cursor = conn.cursor()
            # Build the SET clause from the provided fields only; column
            # names are hard-coded and values bound via ?, so the f-string
            # composition below is injection-safe.
            updates = []
            params = []
            if name is not None:
                updates.append('name = ?')
                params.append(name)
            if description is not None:
                updates.append('description = ?')
                params.append(description)
            if color is not None:
                updates.append('color = ?')
                params.append(color)
            if cover_file_id is not None:
                updates.append('cover_file_id = ?')
                # <= 0 is the sentinel for "remove the cover image".
                params.append(cover_file_id if cover_file_id > 0 else None)
            if not updates:
                # Nothing requested -- treat as a successful no-op.
                return True
            updates.append('updated_date = CURRENT_TIMESTAMP')
            params.append(collection_id)
            cursor.execute(f'''
                UPDATE collections SET {', '.join(updates)}
                WHERE id = ?
            ''', params)
            return cursor.rowcount > 0
    except Exception as e:
        self.logger.error(f"Failed to update collection {collection_id}: {e}")
        return False
def delete_collection(self, collection_id: int) -> bool:
    """Delete a collection by id; True when a row was removed.

    Membership rows in collection_files are expected to disappear via the
    schema's cascade rule (presumably ON DELETE CASCADE -- defined in the
    schema, not here).
    """
    try:
        with self.db.get_connection(for_write=True) as conn:
            cur = conn.cursor()
            cur.execute('DELETE FROM collections WHERE id = ?', (collection_id,))
            removed = cur.rowcount > 0
            if removed:
                self.logger.info(f"Deleted collection {collection_id}")
            return removed
    except Exception as e:
        self.logger.error(f"Failed to delete collection {collection_id}: {e}")
        return False
def add_to_collection(self, collection_id: int, file_id: int, added_by: str = None) -> bool:
    """Append a file to the end of a collection.

    Returns True on success; a duplicate (collection, file) pair is
    skipped by INSERT OR IGNORE but still reported as True.
    """
    try:
        with self.db.get_connection(for_write=True) as conn:
            cursor = conn.cursor()
            # Next display_order = current max + 1 (1 for an empty collection).
            # NOTE(review): SELECT MAX + INSERT is not atomic across
            # connections; concurrent adds could race to the same slot --
            # confirm writes are serialized by get_connection(for_write=True).
            cursor.execute('''
                SELECT COALESCE(MAX(display_order), 0) + 1
                FROM collection_files
                WHERE collection_id = ?
            ''', (collection_id,))
            next_order = cursor.fetchone()[0]
            cursor.execute('''
                INSERT OR IGNORE INTO collection_files (collection_id, file_id, display_order, added_by)
                VALUES (?, ?, ?, ?)
            ''', (collection_id, file_id, next_order, added_by))
            return True
    except Exception as e:
        self.logger.error(f"Failed to add file {file_id} to collection {collection_id}: {e}")
        return False
def remove_from_collection(self, collection_id: int, file_id: int) -> bool:
    """Remove one file from a collection; True when a row was deleted."""
    try:
        with self.db.get_connection(for_write=True) as conn:
            cur = conn.cursor()
            cur.execute(
                '''
                DELETE FROM collection_files
                WHERE collection_id = ? AND file_id = ?
                ''',
                (collection_id, file_id),
            )
            return cur.rowcount > 0
    except Exception as e:
        self.logger.error(f"Failed to remove file from collection: {e}")
        return False
def get_collection_files(self, collection_id: int, limit: int = 100, offset: int = 0) -> Tuple[List[Dict], int]:
    """Return one page of a collection's files in display order.

    Returns:
        (files, total) where *total* is the overall membership count for
        pagination, independent of limit/offset. ([], 0) on error.
    """
    try:
        with self.db.get_connection() as conn:
            cursor = conn.cursor()
            total = cursor.execute(
                'SELECT COUNT(*) FROM collection_files WHERE collection_id = ?',
                (collection_id,),
            ).fetchone()[0]
            rows = cursor.execute('''
                SELECT fi.*, cf.display_order, cf.added_date
                FROM file_inventory fi
                JOIN collection_files cf ON cf.file_id = fi.id
                WHERE cf.collection_id = ?
                ORDER BY cf.display_order
                LIMIT ? OFFSET ?
            ''', (collection_id, limit, offset)).fetchall()
            return [dict(r) for r in rows], total
    except Exception as e:
        self.logger.error(f"Failed to get collection files: {e}")
        return [], 0
def bulk_add_to_collection(self, collection_id: int, file_ids: List[int], added_by: str = None) -> int:
    """Append several files to a collection, preserving *file_ids* order.

    Display order continues after the collection's current maximum.
    Returns the number of files actually inserted; already-present files
    are skipped by INSERT OR IGNORE (they still consume an order slot).
    On error, returns however many were inserted before the failure.
    """
    added = 0
    try:
        with self.db.get_connection(for_write=True) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT COALESCE(MAX(display_order), 0)
                FROM collection_files
                WHERE collection_id = ?
            ''', (collection_id,))
            base = cursor.fetchone()[0]
            for slot, fid in enumerate(file_ids, start=base + 1):
                cursor.execute('''
                    INSERT OR IGNORE INTO collection_files (collection_id, file_id, display_order, added_by)
                    VALUES (?, ?, ?, ?)
                ''', (collection_id, fid, slot, added_by))
                added += cursor.rowcount
            self.logger.info(f"Added {added} files to collection {collection_id}")
            return added
    except Exception as e:
        self.logger.error(f"Failed to bulk add to collection: {e}")
        return added
# =========================================================================
# TIMELINE
# =========================================================================
def get_timeline_data(self, granularity: str = 'day', date_from: str = None,
                      date_to: str = None, platform: str = None,
                      source: str = None) -> List[Dict]:
    """
    Get timeline aggregation data.

    Args:
        granularity: 'day', 'week', 'month', 'year' (unknown values fall
            back to 'day')
        date_from: Start date (YYYY-MM-DD), inclusive
        date_to: End date (YYYY-MM-DD), inclusive ('23:59:59' is appended
            so the whole end day is covered)
        platform: Filter by platform
        source: Filter by source

    Returns:
        List of timeline data points, newest bucket first ([] on error)
    """
    try:
        with self.db.get_connection() as conn:
            cursor = conn.cursor()
            # Date format based on granularity. This dict also acts as a
            # whitelist, so the f-string interpolation below cannot inject
            # arbitrary SQL.
            date_formats = {
                'day': '%Y-%m-%d',
                'week': '%Y-W%W',
                'month': '%Y-%m',
                'year': '%Y'
            }
            date_format = date_formats.get(granularity, '%Y-%m-%d')
            # Build query. The bucket key prefers the download's post_date
            # when one exists, falling back to the inventory created_date.
            query = f'''
                SELECT
                    strftime('{date_format}', COALESCE(
                        (SELECT MAX(d.post_date) FROM downloads d WHERE d.file_path = fi.file_path),
                        fi.created_date
                    )) as date_key,
                    COUNT(*) as file_count,
                    COALESCE(SUM(fi.file_size), 0) as total_size,
                    SUM(CASE WHEN fi.content_type IN ('image', 'photo', 'posts', 'stories') THEN 1 ELSE 0 END) as image_count,
                    SUM(CASE WHEN fi.content_type IN ('video', 'reel', 'reels') THEN 1 ELSE 0 END) as video_count
                FROM file_inventory fi
                WHERE fi.location = 'final'
            '''
            params = []
            # NOTE(review): the range filters below use fi.created_date
            # while the bucket key above may use a download post_date, so
            # rows near the range edges can land in buckets outside
            # [date_from, date_to] -- confirm this is intended.
            if date_from:
                query += ' AND fi.created_date >= ?'
                params.append(date_from)
            if date_to:
                query += ' AND fi.created_date <= ?'
                params.append(date_to + ' 23:59:59')
            if platform:
                query += ' AND fi.platform = ?'
                params.append(platform)
            if source:
                query += ' AND fi.source = ?'
                params.append(source)
            query += f'''
                GROUP BY date_key
                ORDER BY date_key DESC
            '''
            cursor.execute(query, params)
            timeline = []
            for row in cursor.fetchall():
                # Rows with a NULL bucket key (no usable date) are dropped.
                if row['date_key']:
                    timeline.append({
                        'date': row['date_key'],
                        'file_count': row['file_count'],
                        'total_size': row['total_size'],
                        'image_count': row['image_count'],
                        'video_count': row['video_count'],
                    })
            return timeline
    except Exception as e:
        self.logger.error(f"Failed to get timeline data: {e}")
        return []
def get_activity_heatmap(self, year: int = None, platform: str = None) -> Dict[str, int]:
    """
    Daily file counts for one calendar year (heatmap data).

    Args:
        year: Calendar year; defaults to the current year.
        platform: Optional platform filter.

    Returns:
        Mapping of 'YYYY-MM-DD' -> file count ({} on error).
    """
    target_year = datetime.now().year if year is None else year
    sql = '''
        SELECT
            strftime('%Y-%m-%d', fi.created_date) as date_key,
            COUNT(*) as count
        FROM file_inventory fi
        WHERE fi.location = 'final'
        AND strftime('%Y', fi.created_date) = ?
    '''
    params = [str(target_year)]
    if platform:
        sql += ' AND fi.platform = ?'
        params.append(platform)
    sql += ' GROUP BY date_key ORDER BY date_key'
    try:
        with self.db.get_connection() as conn:
            rows = conn.cursor().execute(sql, params).fetchall()
        # Skip NULL keys (rows without a parsable created_date).
        return {r['date_key']: r['count'] for r in rows if r['date_key']}
    except Exception as e:
        self.logger.error(f"Failed to get activity heatmap: {e}")
        return {}
def get_on_this_day(self, month: int = None, day: int = None, limit: int = 50) -> List[Dict]:
    """
    Get content from the same calendar day in previous years ("On This Day").

    Args:
        month: Month (1-12), defaults to today
        day: Day (1-31), defaults to today
        limit: Maximum total results across ALL years (the SQL LIMIT is
            applied to the whole result set, not per year)

    Returns:
        List of file dicts, each with 'year' and computed 'years_ago'
        added, newest first ([] on error)
    """
    if month is None:
        month = datetime.now().month
    if day is None:
        day = datetime.now().day
    try:
        with self.db.get_connection() as conn:
            cursor = conn.cursor()
            # Zero-padded month/day strings compare against strftime
            # output; the final predicate excludes the current year.
            cursor.execute('''
                SELECT fi.*, strftime('%Y', fi.created_date) as year
                FROM file_inventory fi
                WHERE fi.location = 'final'
                AND strftime('%m', fi.created_date) = ?
                AND strftime('%d', fi.created_date) = ?
                AND strftime('%Y', fi.created_date) < strftime('%Y', 'now')
                ORDER BY fi.created_date DESC
                LIMIT ?
            ''', (f'{month:02d}', f'{day:02d}', limit))
            files = []
            for row in cursor.fetchall():
                file_dict = dict(row)
                # NOTE(review): Python's datetime.now() is local time while
                # SQLite's 'now' is UTC -- around New Year these can
                # disagree by one year; confirm acceptable.
                file_dict['years_ago'] = datetime.now().year - int(row['year'])
                files.append(file_dict)
            return files
    except Exception as e:
        self.logger.error(f"Failed to get 'on this day' content: {e}")
        return []
# =========================================================================
# DISCOVERY QUEUE (delegates to unified_database)
# =========================================================================
def get_queue_stats(self) -> Dict[str, Any]:
    """Get discovery queue statistics (delegates to UnifiedDatabase)."""
    return self.db.get_discovery_queue_stats()
def get_pending_queue(self, limit: int = 100) -> List[Dict]:
    """Return up to *limit* pending discovery-queue rows.

    Ordered by ascending priority value, then oldest first ([] on error).
    """
    sql = '''
        SELECT id, file_id, file_path, scan_type, priority, status,
               attempts, created_at, updated_at
        FROM discovery_scan_queue
        WHERE status = 'pending'
        ORDER BY priority ASC, created_at ASC
        LIMIT ?
    '''
    try:
        with self.db.get_connection() as conn:
            rows = conn.cursor().execute(sql, (limit,)).fetchall()
        return [dict(r) for r in rows]
    except Exception as e:
        self.logger.error(f"Failed to get pending queue: {e}")
        return []
def add_to_queue(self, file_id: int, priority: int = 0) -> bool:
    """Add a file to the discovery scan queue (scan_type 'full').

    Looks up the file's path first; returns False when the file id is
    unknown. Re-queuing an existing (file_id, scan_type) entry keeps the
    lower (more urgent) priority value and resets status to 'pending'.
    """
    try:
        # Read the path on a plain connection first...
        with self.db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('SELECT file_path FROM file_inventory WHERE id = ?', (file_id,))
            row = cursor.fetchone()
            if not row:
                return False
            # NOTE(review): sqlite3.Row is not a dict, so Row objects take
            # the row[0] branch here; both branches yield file_path.
            file_path = row['file_path'] if isinstance(row, dict) else row[0]
        # ...then upsert on a separate write connection.
        with self.db.get_connection(for_write=True) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO discovery_scan_queue (file_id, file_path, scan_type, priority, status)
                VALUES (?, ?, 'full', ?, 'pending')
                ON CONFLICT (file_id, scan_type) DO UPDATE SET
                    priority = MIN(excluded.priority, discovery_scan_queue.priority),
                    status = 'pending'
            ''', (file_id, file_path, priority))
            # NOTE(review): explicit commit here, while every other writer
            # in this module relies on the context manager -- presumably
            # equivalent; verify against UnifiedDatabase.get_connection.
            conn.commit()
        return True
    except Exception as e:
        self.logger.error(f"Failed to add to queue: {e}")
        return False
def bulk_add_to_queue(self, file_ids: List[int], priority: int = 0) -> int:
    """Queue many files for discovery; returns how many adds succeeded."""
    return sum(
        1 for fid in file_ids if self.add_to_queue(fid, priority=priority)
    )
def clear_queue(self) -> int:
    """Clear the discovery queue (delegates to UnifiedDatabase; returns its int result)."""
    return self.db.clear_discovery_queue()
# Module-level singleton, created lazily under a lock.
_discovery_system = None
_discovery_system_lock = threading.Lock()


def get_discovery_system(unified_db=None):
    """Get or create the global DiscoverySystem instance (thread-safe).

    Args:
        unified_db: Optional UnifiedDatabase to use when the singleton is
            first created; ignored on later calls. When omitted, a default
            UnifiedDatabase is constructed lazily on first use.

    Returns:
        The shared DiscoverySystem instance.
    """
    global _discovery_system
    # Fast path skips the lock once initialized; the re-check inside the
    # lock (double-checked locking) closes the first-call race.
    if _discovery_system is None:
        with _discovery_system_lock:
            if _discovery_system is None:
                if unified_db is None:
                    from modules.unified_database import UnifiedDatabase
                    unified_db = UnifiedDatabase()
                _discovery_system = DiscoverySystem(unified_db)
    return _discovery_system