Files
Todd 0d7b2b1aab Initial commit
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-29 22:42:55 -04:00

3617 lines
160 KiB
Python

"""
Database adapter for Paid Content feature
Follows existing BaseDatabaseAdapter pattern
"""
import json
import re
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any
from modules.base_module import BaseDatabaseAdapter
class PaidContentDBAdapter(BaseDatabaseAdapter):
    """Database operations for paid content tables.

    Thin data-access layer over the shared ``unified_db`` connection factory.
    Dynamically-built SQL (SET clauses, column lists) is guarded by the
    ALLOWED_* column whitelists below via ``_validate_columns()``.
    """

    # Attachment status constants (string values stored in the
    # paid_content_attachments.status column)
    class AttachmentStatus:
        PENDING = 'pending'
        DOWNLOADING = 'downloading'
        COMPLETED = 'completed'
        FAILED = 'failed'
        SKIPPED = 'skipped'
        MISSING = 'missing'
        DUPLICATE = 'duplicate'

    # Embed status constants (subset of the attachment statuses -- embeds
    # have no MISSING/DUPLICATE states)
    class EmbedStatus:
        PENDING = 'pending'
        DOWNLOADING = 'downloading'
        COMPLETED = 'completed'
        FAILED = 'failed'
        SKIPPED = 'skipped'

    # Attachment columns for SELECT queries (excludes the binary
    # thumbnail_data column, which would break JSON serialization of rows)
    ATTACHMENT_COLUMNS = """a.id, a.post_id, a.message_id, a.attachment_index, a.name, a.file_type, a.extension,
        a.server_path, a.download_url, a.file_size, a.width, a.height, a.duration,
        a.status, a.local_path, a.local_filename, a.file_hash, a.perceptual_hash,
        a.error_message, a.download_attempts, a.last_attempt, a.created_at,
        a.downloaded_at, a.auto_requeue,
        a.needs_quality_recheck, a.last_quality_check, a.quality_recheck_count"""

    # Column whitelists for SQL injection prevention. Every update_* method
    # checks caller-supplied column names against the matching set before
    # interpolating them into a SET clause.
    ALLOWED_CONFIG_COLUMNS = {
        'base_download_path', 'organize_by_date', 'organize_by_post',
        'check_interval_hours', 'max_concurrent_downloads', 'download_embeds',
        'embed_quality', 'notifications_enabled', 'push_notifications_enabled',
        'perceptual_duplicate_detection', 'perceptual_threshold',
        'auto_retry_failed', 'retry_max_attempts', 'updated_at'
    }
    ALLOWED_SERVICE_COLUMNS = {
        'name', 'base_url', 'enabled', 'session_cookie', 'session_updated_at',
        'last_health_check', 'health_status', 'supported_services',
        'rate_limit_requests', 'rate_limit_window_seconds', 'updated_at'
    }
    ALLOWED_CREATOR_COLUMNS = {
        'username', 'display_name', 'profile_image_url', 'banner_image_url',
        'bio', 'joined_date', 'location', 'external_links',
        'identity_id', 'enabled', 'last_checked', 'last_post_date', 'post_count',
        'downloaded_count', 'total_size_bytes', 'auto_download', 'download_embeds',
        'updated_at', 'last_coomer_check',
        'sync_posts', 'sync_stories', 'sync_highlights', 'filter_tagged_users',
        'use_authenticated_api'
    }
    ALLOWED_IDENTITY_COLUMNS = {
        'name', 'slug', 'profile_image_url', 'notes', 'updated_at'
    }
    # NOTE: used by both update_post() and the update branch of upsert_post()
    ALLOWED_POST_COLUMNS = {
        'title', 'content', 'published_at', 'added_at', 'edited_at',
        'has_attachments', 'attachment_count', 'downloaded', 'download_date',
        'embed_count', 'embed_downloaded', 'is_favorited', 'is_viewed',
        'view_date', 'local_path', 'metadata', 'is_pinned', 'pinned_at'
    }
    ALLOWED_ATTACHMENT_COLUMNS = {
        'attachment_index', 'name', 'file_type', 'extension', 'server_path',
        'download_url', 'file_size', 'width', 'height', 'duration', 'status',
        'local_path', 'local_filename', 'file_hash', 'perceptual_hash',
        'error_message', 'download_attempts', 'last_attempt', 'downloaded_at',
        'thumbnail_data', 'auto_requeue',
        'needs_quality_recheck', 'last_quality_check', 'quality_recheck_count',
        'message_id'
    }
    ALLOWED_EMBED_COLUMNS = {
        'url', 'platform', 'video_id', 'title', 'status', 'local_path',
        'local_filename', 'file_size', 'duration', 'error_message',
        'download_attempts', 'downloaded_at'
    }
    ALLOWED_MESSAGE_COLUMNS = {
        'message_id', 'text', 'sent_at', 'is_from_creator', 'is_tip',
        'tip_amount', 'price', 'is_free', 'is_purchased', 'has_attachments',
        'attachment_count', 'is_read', 'reply_to_message_id', 'metadata'
    }
def __init__(self, unified_db):
    """Initialize the base adapter with the shared unified_db and the
    'paid_content' platform key."""
    super().__init__(unified_db, platform='paid_content')
def _validate_columns(self, columns: set, allowed: set, table_name: str) -> None:
    """Raise ValueError unless every column name is in the whitelist.

    Guards the dynamically-built SET clauses against SQL injection via
    caller-supplied column names.
    """
    unknown = columns.difference(allowed)
    if not unknown:
        return
    raise ValueError(f"Invalid columns for {table_name}: {unknown}")
# ============== CONFIG ==============

# Default config values, returned by get_config() when the singleton config
# row (id = 1) has not been created yet. Flag-style settings are 0/1 ints.
DEFAULT_CONFIG = {
    'base_download_path': '/paid-content',
    'organize_by_date': 1,
    'organize_by_post': 1,
    'check_interval_hours': 6,
    'max_concurrent_downloads': 3,
    'download_embeds': 1,
    'embed_quality': 'best',
    'notifications_enabled': 1,
    'push_notifications_enabled': 1,
    'perceptual_duplicate_detection': 1,
    'perceptual_threshold': 12,
    'auto_retry_failed': 1,
    'retry_max_attempts': 3,
}
def get_config(self) -> Dict:
    """Return the paid-content configuration row (id = 1).

    Falls back to DEFAULT_CONFIG (plus the id key) when the row has never
    been written.
    """
    with self.unified_db.get_connection() as conn:
        cur = conn.cursor()
        cur.execute("SELECT * FROM paid_content_config WHERE id = 1")
        stored = cur.fetchone()
    if stored is None:
        # Row has never been created -- report the built-in defaults
        return {**self.DEFAULT_CONFIG, 'id': 1}
    return dict(stored)
def update_config(self, updates: Dict) -> bool:
    """Update configuration, creating the singleton row first if needed.

    Column names are whitelist-checked before being interpolated into the
    SET clause. Always returns True.
    """
    self._validate_columns(set(updates), self.ALLOWED_CONFIG_COLUMNS, 'paid_content_config')
    updates['updated_at'] = datetime.now().isoformat()
    assignments = ', '.join(f"{col} = ?" for col in updates)
    with self.unified_db.get_connection(for_write=True) as conn:
        cur = conn.cursor()
        # Make sure the singleton row exists before updating it
        cur.execute("INSERT OR IGNORE INTO paid_content_config (id) VALUES (1)")
        cur.execute(
            f"UPDATE paid_content_config SET {assignments} WHERE id = 1",
            list(updates.values())
        )
        conn.commit()
    return True
# ============== SERVICES ==============
def get_services(self) -> List[Dict]:
    """Return every service row, ordered by name."""
    with self.unified_db.get_connection() as conn:
        cur = conn.cursor()
        cur.execute("SELECT * FROM paid_content_services ORDER BY name")
        return [dict(r) for r in cur.fetchall()]
def get_service(self, service_id: str) -> Optional[Dict]:
    """Fetch one service by id, or None if it does not exist."""
    with self.unified_db.get_connection() as conn:
        cur = conn.cursor()
        cur.execute("SELECT * FROM paid_content_services WHERE id = ?", (service_id,))
        found = cur.fetchone()
    return dict(found) if found else None
def update_service(self, service_id: str, updates: Dict) -> bool:
    """Apply whitelisted column updates to a service row.

    Stamps updated_at; returns True if the row existed and was updated.
    """
    self._validate_columns(set(updates), self.ALLOWED_SERVICE_COLUMNS, 'paid_content_services')
    updates['updated_at'] = datetime.now().isoformat()
    assignments = ', '.join(f"{col} = ?" for col in updates)
    with self.unified_db.get_connection(for_write=True) as conn:
        cur = conn.cursor()
        cur.execute(
            f"UPDATE paid_content_services SET {assignments} WHERE id = ?",
            list(updates.values()) + [service_id]
        )
        conn.commit()
        return cur.rowcount > 0
# ============== CREATORS ==============
def get_creators(self, service_id: str = None, platform: str = None,
                 identity_id: int = None, enabled_only: bool = False,
                 search: str = None, limit: int = 100, offset: int = 0) -> List[Dict]:
    """Return creators matching the given filters, joined with identity info.

    All filters are optional; search matches username or display_name as a
    substring. Results are ordered by username and paginated.
    """
    conditions = []
    values = []
    if service_id:
        conditions.append("c.service_id = ?")
        values.append(service_id)
    if platform:
        conditions.append("c.platform = ?")
        values.append(platform)
    if identity_id:
        conditions.append("c.identity_id = ?")
        values.append(identity_id)
    if enabled_only:
        conditions.append("c.enabled = 1")
    if search:
        like = f"%{search}%"
        conditions.append("(c.username LIKE ? OR c.display_name LIKE ?)")
        values.extend([like, like])
    query = """
        SELECT c.*, i.name as identity_name, i.slug as identity_slug
        FROM paid_content_creators c
        LEFT JOIN paid_content_identities i ON c.identity_id = i.id
        WHERE 1=1
    """
    for cond in conditions:
        query += f" AND {cond}"
    query += " ORDER BY c.username LIMIT ? OFFSET ?"
    values.extend([limit, offset])
    with self.unified_db.get_connection() as conn:
        cur = conn.cursor()
        cur.execute(query, values)
        return [dict(r) for r in cur.fetchall()]
def get_creator(self, creator_id: int) -> Optional[Dict]:
    """Fetch one creator (with its identity name) by primary key."""
    with self.unified_db.get_connection() as conn:
        cur = conn.cursor()
        cur.execute("""
            SELECT c.*, i.name as identity_name
            FROM paid_content_creators c
            LEFT JOIN paid_content_identities i ON c.identity_id = i.id
            WHERE c.id = ?
        """, (creator_id,))
        hit = cur.fetchone()
    return dict(hit) if hit else None
def get_creator_by_api_id(self, service_id: str, platform: str, creator_id: str) -> Optional[Dict]:
    """Look up a creator by its (service, platform, API creator_id) triple."""
    with self.unified_db.get_connection() as conn:
        cur = conn.cursor()
        cur.execute(
            "SELECT * FROM paid_content_creators "
            "WHERE service_id = ? AND platform = ? AND creator_id = ?",
            (service_id, platform, creator_id)
        )
        hit = cur.fetchone()
    return dict(hit) if hit else None
def get_creator_by_username(self, service_id: str, platform: str, username: str) -> Optional[Dict]:
    """Look up a creator by its (service, platform, username) triple."""
    with self.unified_db.get_connection() as conn:
        cur = conn.cursor()
        cur.execute(
            "SELECT * FROM paid_content_creators "
            "WHERE service_id = ? AND platform = ? AND username = ?",
            (service_id, platform, username)
        )
        hit = cur.fetchone()
    return dict(hit) if hit else None
def get_creator_tagged_usernames(self, creator_id: int) -> List[Dict]:
    """Distinct tagged usernames across a creator's live posts, with counts.

    The '__no_tags__' sentinel rows are excluded in SQL.
    """
    sql = """
        SELECT tu.username, COUNT(*) as post_count
        FROM paid_content_post_tagged_users tu
        JOIN paid_content_posts p ON tu.post_id = p.id
        WHERE p.creator_id = ? AND p.deleted_at IS NULL
        AND tu.username != '__no_tags__'
        GROUP BY tu.username
        ORDER BY tu.username ASC
    """
    with self.unified_db.get_connection() as conn:
        cur = conn.cursor()
        cur.execute(sql, (creator_id,))
        return [{'username': r[0], 'post_count': r[1]} for r in cur.fetchall()]
def add_creator(self, data: Dict) -> int:
    """Insert a new creator row and return its rowid.

    Unlike update_creator() there is no fixed column whitelist here (inserts
    carry keys such as service_id/platform/creator_id that the update
    whitelist omits), so column names are checked against a strict
    identifier pattern instead to keep the dynamically-built INSERT
    injection-safe.

    Raises:
        ValueError: if any key is not a plain SQL identifier.
    """
    # Reject any key that could not be a bare column name
    for column in data:
        if not re.fullmatch(r'[A-Za-z_][A-Za-z0-9_]*', column):
            raise ValueError(f"Invalid column name for paid_content_creators: {column!r}")
    # Work on a copy so the caller's dict is not mutated with timestamps
    record = dict(data)
    record['created_at'] = datetime.now().isoformat()
    record['updated_at'] = record['created_at']
    columns = ', '.join(record.keys())
    placeholders = ', '.join(['?'] * len(record))
    with self.unified_db.get_connection(for_write=True) as conn:
        cursor = conn.cursor()
        cursor.execute(
            f"INSERT INTO paid_content_creators ({columns}) VALUES ({placeholders})",
            list(record.values())
        )
        conn.commit()
        return cursor.lastrowid
def update_creator(self, creator_id: int, updates: Dict) -> bool:
    """Apply whitelisted column updates to a creator row.

    Stamps updated_at; returns True if the row existed and was updated.
    """
    self._validate_columns(set(updates), self.ALLOWED_CREATOR_COLUMNS, 'paid_content_creators')
    updates['updated_at'] = datetime.now().isoformat()
    assignments = ', '.join(f"{col} = ?" for col in updates)
    with self.unified_db.get_connection(for_write=True) as conn:
        cur = conn.cursor()
        cur.execute(
            f"UPDATE paid_content_creators SET {assignments} WHERE id = ?",
            list(updates.values()) + [creator_id]
        )
        conn.commit()
        return cur.rowcount > 0
def delete_creator(self, creator_id: int) -> bool:
    """Delete a creator row (posts/attachments are removed via cascade)."""
    with self.unified_db.get_connection(for_write=True) as conn:
        cur = conn.cursor()
        cur.execute("DELETE FROM paid_content_creators WHERE id = ?", (creator_id,))
        conn.commit()
        return cur.rowcount > 0
def get_creator_post_count(self, creator_id: int) -> int:
    """Count non-deleted posts belonging to a creator."""
    sql = "SELECT COUNT(*) FROM paid_content_posts WHERE creator_id = ? AND deleted_at IS NULL"
    with self.unified_db.get_connection() as conn:
        cur = conn.cursor()
        cur.execute(sql, (creator_id,))
        return cur.fetchone()[0]
def get_creator_downloaded_count(self, creator_id: int) -> int:
    """Count non-deleted posts of a creator that are marked downloaded."""
    sql = ("SELECT COUNT(*) FROM paid_content_posts "
           "WHERE creator_id = ? AND downloaded = 1 AND deleted_at IS NULL")
    with self.unified_db.get_connection() as conn:
        cur = conn.cursor()
        cur.execute(sql, (creator_id,))
        return cur.fetchone()[0]
def get_creator_stats(self, creator_id: int) -> Dict:
    """Collect detailed per-creator statistics.

    Returns a dict with total_posts, downloaded_posts, total_attachments,
    downloaded_attachments, total_size_bytes and failed_downloads. NULL
    aggregates are coerced to 0.
    """
    with self.unified_db.get_connection() as conn:
        cur = conn.cursor()

        def scalar(sql: str) -> int:
            # Every stats query takes creator_id as its only parameter
            cur.execute(sql, (creator_id,))
            return cur.fetchone()[0]

        stats = {
            'total_posts': scalar(
                "SELECT COUNT(*) FROM paid_content_posts WHERE creator_id = ? AND deleted_at IS NULL"
            ),
            'downloaded_posts': scalar(
                "SELECT COUNT(*) FROM paid_content_posts WHERE creator_id = ? AND downloaded = 1 AND deleted_at IS NULL"
            ),
        }
        # Single pass over attachments: total, completed count, completed bytes
        cur.execute("""
            SELECT COUNT(*), SUM(CASE WHEN a.status = 'completed' THEN 1 ELSE 0 END),
                SUM(CASE WHEN a.status = 'completed' THEN a.file_size ELSE 0 END)
            FROM paid_content_attachments a
            JOIN paid_content_posts p ON a.post_id = p.id
            WHERE p.creator_id = ? AND p.deleted_at IS NULL
        """, (creator_id,))
        totals = cur.fetchone()
        stats['total_attachments'] = totals[0] or 0
        stats['downloaded_attachments'] = totals[1] or 0
        stats['total_size_bytes'] = totals[2] or 0
        stats['failed_downloads'] = scalar("""
            SELECT COUNT(*)
            FROM paid_content_attachments a
            JOIN paid_content_posts p ON a.post_id = p.id
            WHERE p.creator_id = ? AND a.status = 'failed' AND p.deleted_at IS NULL
        """)
        return stats
def increment_creator_download_stats(self, creator_id: int, file_count: int = 1, file_size: int = 0) -> bool:
    """Bump downloaded_count / total_size_bytes for live download progress.

    Returns True if the creator row existed and was updated.
    """
    stamp = datetime.now().isoformat()
    with self.unified_db.get_connection(for_write=True) as conn:
        cur = conn.cursor()
        cur.execute("""
            UPDATE paid_content_creators
            SET downloaded_count = downloaded_count + ?,
                total_size_bytes = total_size_bytes + ?,
                updated_at = ?
            WHERE id = ?
        """, (file_count, file_size, stamp, creator_id))
        conn.commit()
        return cur.rowcount > 0
# ============== IDENTITIES ==============
def get_identities(self) -> List[Dict]:
    """List identities together with how many creators link to each."""
    sql = """
        SELECT i.*, COUNT(c.id) as creator_count
        FROM paid_content_identities i
        LEFT JOIN paid_content_creators c ON i.id = c.identity_id
        GROUP BY i.id
        ORDER BY i.name
    """
    with self.unified_db.get_connection() as conn:
        cur = conn.cursor()
        cur.execute(sql)
        return [dict(r) for r in cur.fetchall()]
def get_identity(self, identity_id: int) -> Optional[Dict]:
    """Fetch one identity by primary key, or None if it does not exist."""
    with self.unified_db.get_connection() as conn:
        cur = conn.cursor()
        cur.execute("SELECT * FROM paid_content_identities WHERE id = ?", (identity_id,))
        found = cur.fetchone()
    return dict(found) if found else None
def create_identity(self, name: str, notes: str = None) -> int:
    """Create a new identity with a unique slug derived from the name.

    The slug is the lowercased name with spaces replaced by hyphens; a
    numeric suffix is appended until it is unique. Returns the new row id.

    NOTE: the uniqueness probe and the INSERT run on separate connections,
    so a concurrent writer could still race this window; a UNIQUE constraint
    on slug is the real guarantee.
    """
    base_slug = name.lower().replace(' ', '-')
    slug = base_slug
    # Probe for a free slug on ONE read connection instead of reopening a
    # connection on every loop iteration (the original re-entered
    # get_connection() per candidate slug).
    with self.unified_db.get_connection() as conn:
        cursor = conn.cursor()
        counter = 1
        while True:
            cursor.execute("SELECT id FROM paid_content_identities WHERE slug = ?", (slug,))
            if cursor.fetchone() is None:
                break
            slug = f"{base_slug}-{counter}"
            counter += 1
    now = datetime.now().isoformat()
    with self.unified_db.get_connection(for_write=True) as conn:
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO paid_content_identities (name, slug, notes, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?)
        """, (name, slug, notes, now, now))
        conn.commit()
        return cursor.lastrowid
def update_identity(self, identity_id: int, updates: Dict) -> bool:
    """Apply whitelisted column updates to an identity row.

    Stamps updated_at; returns True if the row existed and was updated.
    """
    self._validate_columns(set(updates), self.ALLOWED_IDENTITY_COLUMNS, 'paid_content_identities')
    updates['updated_at'] = datetime.now().isoformat()
    assignments = ', '.join(f"{col} = ?" for col in updates)
    with self.unified_db.get_connection(for_write=True) as conn:
        cur = conn.cursor()
        cur.execute(
            f"UPDATE paid_content_identities SET {assignments} WHERE id = ?",
            list(updates.values()) + [identity_id]
        )
        conn.commit()
        return cur.rowcount > 0
def delete_identity(self, identity_id: int) -> bool:
    """Delete an identity; linked creators are detached, not deleted.

    Returns True if the identity row existed.
    """
    with self.unified_db.get_connection(for_write=True) as conn:
        cur = conn.cursor()
        # Detach creators first so they survive the identity's removal
        cur.execute(
            "UPDATE paid_content_creators SET identity_id = NULL WHERE identity_id = ?",
            (identity_id,)
        )
        cur.execute("DELETE FROM paid_content_identities WHERE id = ?", (identity_id,))
        conn.commit()
        return cur.rowcount > 0
def link_creator_to_identity(self, creator_id: int, identity_id: int) -> bool:
    """Link a creator to an identity (delegates to update_creator)."""
    return self.update_creator(creator_id, {'identity_id': identity_id})
def unlink_creator_from_identity(self, creator_id: int) -> bool:
    """Unlink a creator from its identity by nulling identity_id."""
    return self.update_creator(creator_id, {'identity_id': None})
# ============== POSTS ==============
def upsert_post(self, creator_id: int, post_data: Dict) -> tuple:
    """Insert or update a post; returns (post_id, is_new).

    Behavior:
    - Soft-deleted posts are never resurrected or modified.
    - On update, None values are skipped so previously populated data is not
      clobbered (exception: 'title' may be cleared to None, e.g. the OF
      direct client clears corrupted titles inherited from Coomer imports).
    - 'published_at', 'downloaded' and 'download_date' are never overwritten
      once they hold a value (e.g. published_at set by yt-dlp must survive a
      flat-playlist resync that has no dates).

    Raises:
        ValueError: on a column name that fails validation.
    """
    with self.unified_db.get_connection(for_write=True) as conn:
        cursor = conn.cursor()
        # Check if exists (including soft-deleted posts)
        cursor.execute("""
            SELECT id, deleted_at FROM paid_content_posts
            WHERE creator_id = ? AND post_id = ?
        """, (creator_id, post_data['post_id']))
        existing = cursor.fetchone()
        if existing:
            # If soft-deleted, do NOT re-add or update -- preserve the deletion
            if existing[1] is not None:  # deleted_at IS NOT NULL
                return (existing[0], False)
            post_id = existing[0]
            clearable_fields = ('title',)
            # Skip None values except for explicitly clearable fields
            update_data = {k: v for k, v in post_data.items()
                           if k != 'post_id' and (v is not None or k in clearable_fields)}
            # Fields that must never be overwritten once set in the DB
            protected_fields = ('published_at', 'downloaded', 'download_date')
            if update_data:
                cursor.execute(
                    f"SELECT {', '.join(protected_fields)} FROM paid_content_posts WHERE id = ?",
                    (post_id,)
                )
                existing_values = cursor.fetchone()
                if existing_values:
                    for i, field in enumerate(protected_fields):
                        if existing_values[i] is not None and field in update_data:
                            del update_data[field]
            if update_data:
                # Validate columns against whitelist
                self._validate_columns(set(update_data.keys()), self.ALLOWED_POST_COLUMNS, 'paid_content_posts')
                set_clause = ', '.join(f"{k} = ?" for k in update_data.keys())
                cursor.execute(
                    f"UPDATE paid_content_posts SET {set_clause} WHERE id = ?",
                    [*update_data.values(), post_id]
                )
                conn.commit()
            return (post_id, False)  # Not new
        else:
            # Insert new post. The update path above is whitelist-validated,
            # but inserts carry extra keys (post_id, creator_id, ...) the
            # whitelist omits -- guard the dynamically-built column list
            # with a strict identifier check instead.
            for column in post_data:
                if not re.fullmatch(r'[A-Za-z_][A-Za-z0-9_]*', column):
                    raise ValueError(f"Invalid column name for paid_content_posts: {column!r}")
            # Copy so the caller's dict is not mutated with creator_id
            record = {**post_data, 'creator_id': creator_id}
            columns = ', '.join(record.keys())
            placeholders = ', '.join(['?'] * len(record))
            cursor.execute(
                f"INSERT INTO paid_content_posts ({columns}) VALUES ({placeholders})",
                list(record.values())
            )
            conn.commit()
            return (cursor.lastrowid, True)  # Is new
def get_post(self, post_id: int) -> Optional[Dict]:
    """Fetch one non-deleted post plus its attachments, embeds and tags.

    Returns None when the post does not exist or is soft-deleted.
    """
    with self.unified_db.get_connection() as conn:
        cur = conn.cursor()
        cur.execute("""
            SELECT p.*, c.username, c.platform, c.service_id, c.display_name,
                c.profile_image_url, c.identity_id
            FROM paid_content_posts p
            JOIN paid_content_creators c ON p.creator_id = c.id
            WHERE p.id = ? AND p.deleted_at IS NULL
        """, (post_id,))
        found = cur.fetchone()
        if found is None:
            return None
        post = dict(found)
        # Attachments -- binary thumbnail_data deliberately excluded so the
        # result stays JSON-serializable
        cur.execute("""
            SELECT id, post_id, attachment_index, name, file_type, extension,
                server_path, download_url, file_size, width, height, duration,
                status, local_path, local_filename, file_hash, perceptual_hash,
                error_message, download_attempts, last_attempt, created_at, downloaded_at
            FROM paid_content_attachments
            WHERE post_id = ? ORDER BY attachment_index
        """, (post_id,))
        post['attachments'] = [dict(r) for r in cur.fetchall()]
        # Embeds
        cur.execute("""
            SELECT * FROM paid_content_embeds
            WHERE post_id = ? ORDER BY id
        """, (post_id,))
        post['embeds'] = [dict(r) for r in cur.fetchall()]
        # Tags
        cur.execute("""
            SELECT t.id, t.name, t.slug, t.color
            FROM paid_content_tags t
            JOIN paid_content_post_tags pt ON t.id = pt.tag_id
            WHERE pt.post_id = ?
            ORDER BY t.name
        """, (post_id,))
        post['tags'] = [dict(r) for r in cur.fetchall()]
        return post
def update_post(self, post_id: int, updates: Dict) -> bool:
    """Update whitelisted post fields (title, content, ...).

    Returns False on an empty update dict; otherwise True if a row changed.
    """
    if not updates:
        return False
    self._validate_columns(set(updates), self.ALLOWED_POST_COLUMNS, 'paid_content_posts')
    assignments = ', '.join(f"{col} = ?" for col in updates)
    with self.unified_db.get_connection(for_write=True) as conn:
        cur = conn.cursor()
        cur.execute(
            f"UPDATE paid_content_posts SET {assignments} WHERE id = ?",
            list(updates.values()) + [post_id]
        )
        conn.commit()
        return cur.rowcount > 0
def _build_group_member_filter_clause(self, creator_group_id: int, conn) -> tuple:
    """Build per-member filter conditions for a creator group.

    Returns (sql_clause, params) to append to the main query; the clause
    references aliases p (posts) and c (creators) from the caller's query.
    The clause replaces both the simple group subquery AND the global
    filter_tagged_users block.
    """
    cursor = conn.cursor()
    cursor.execute("""
        SELECT creator_id, filter_tagged_users, filter_tag_ids
        FROM paid_content_creator_group_members
        WHERE group_id = ?
    """, (creator_group_id,))
    members = cursor.fetchall()
    if not members:
        return " AND 1=0", []  # No members = no posts
    unfiltered_ids = []    # members with no per-member filter at all
    filtered_clauses = []  # one SQL fragment per filtered member
    params = []
    for row in members:
        cid = row[0]
        raw_tagged = row[1]
        raw_tags = row[2]
        # Parse the filter values (stored as JSON arrays; bad/empty data
        # silently falls back to "no filter")
        tagged_users = []
        tag_ids = []
        try:
            if raw_tagged:
                tagged_users = json.loads(raw_tagged)
        except (json.JSONDecodeError, TypeError):
            pass
        try:
            if raw_tags:
                tag_ids = json.loads(raw_tags)
        except (json.JSONDecodeError, TypeError):
            pass
        has_member_filter = bool(tagged_users) or bool(tag_ids)
        if not has_member_filter:
            unfiltered_ids.append(cid)
        else:
            # Build per-member condition: a post matches if tagged with any
            # of the member's usernames OR carrying any of the member's tags
            sub_conditions = []
            sub_params = []
            if tagged_users:
                tu_placeholders = ','.join(['?'] * len(tagged_users))
                sub_conditions.append(
                    f"EXISTS (SELECT 1 FROM paid_content_post_tagged_users tu WHERE tu.post_id = p.id AND tu.username IN ({tu_placeholders}))"
                )
                sub_params.extend(tagged_users)
            if tag_ids:
                ti_placeholders = ','.join(['?'] * len(tag_ids))
                sub_conditions.append(
                    f"EXISTS (SELECT 1 FROM paid_content_post_tags pt WHERE pt.post_id = p.id AND pt.tag_id IN ({ti_placeholders}))"
                )
                sub_params.extend(tag_ids)
            combined = ' OR '.join(sub_conditions)
            filtered_clauses.append(f"(p.creator_id = ? AND ({combined}))")
            params.append(cid)
            params.extend(sub_params)
    # Build the combined clause
    or_parts = []
    if unfiltered_ids:
        # For unfiltered members, apply the global creator-level
        # filter_tagged_users setting (same predicate get_posts uses when no
        # group filter is active)
        uf_placeholders = ','.join(['?'] * len(unfiltered_ids))
        or_parts.append(f"""(p.creator_id IN ({uf_placeholders}) AND (
            c.filter_tagged_users IS NULL
            OR TRIM(c.filter_tagged_users) = ''
            OR TRIM(c.filter_tagged_users) = '[]'
            OR EXISTS (
                SELECT 1 FROM paid_content_post_tagged_users tu
                WHERE tu.post_id = p.id
                AND c.filter_tagged_users LIKE '%\"' || tu.username || '\"%'
            )
        ))""")
        # The unfiltered part is FIRST in or_parts, so its params must be
        # PREPENDED to keep placeholder order aligned with the SQL
        params = list(unfiltered_ids) + params
    or_parts.extend(filtered_clauses)
    if not or_parts:
        return " AND 1=0", []
    clause = " AND (" + " OR ".join(or_parts) + ")"
    return clause, params
def get_posts(self, creator_id: int = None, creator_ids: list = None,
              creator_group_id: int = None,
              identity_id: int = None,
              service: str = None, platform: str = None,
              content_type: str = None, min_resolution: str = None,
              tag_ids: List[int] = None,
              tagged_user: str = None,
              favorites_only: bool = False, unviewed_only: bool = False,
              downloaded_only: bool = False, has_missing: bool = False,
              missing_description: bool = False,
              hide_empty: bool = False,
              shuffle: bool = False, shuffle_seed: int = None,
              date_from: str = None, date_to: str = None,
              search: str = None,
              sort_by: str = 'published_at', sort_order: str = 'desc',
              limit: int = 50, offset: int = 0,
              pinned_first: bool = True,
              skip_pinned: bool = False) -> List[Dict]:
    """Get posts with filters (for the feed).

    Builds one SQL query from the optional filters, then enriches each
    returned post with its attachments, tags and tagged users. When
    ``shuffle`` is set, all matching IDs are fetched, deterministically
    shuffled with ``shuffle_seed``, and paginated in Python instead of SQL.
    """
    query = """
        SELECT p.*, c.username, c.platform, c.service_id, c.display_name,
            c.profile_image_url, c.identity_id,
            i.name as identity_name
        FROM paid_content_posts p
        JOIN paid_content_creators c ON p.creator_id = c.id
        LEFT JOIN paid_content_identities i ON c.identity_id = i.id
        WHERE p.deleted_at IS NULL
    """
    params = []
    if skip_pinned:
        query += " AND (p.is_pinned = 0 OR p.is_pinned IS NULL)"
    if creator_id:
        query += " AND p.creator_id = ?"
        params.append(creator_id)
    if creator_ids:
        placeholders = ','.join(['?'] * len(creator_ids))
        query += f" AND p.creator_id IN ({placeholders})"
        params.extend(creator_ids)
    _has_group_filter = False
    if creator_group_id:
        # Placeholder -- actual clause built inside connection block
        # (member lookup needs a live connection)
        _has_group_filter = True
    if identity_id:
        query += " AND c.identity_id = ?"
        params.append(identity_id)
    if service:
        query += " AND c.service_id = ?"
        params.append(service)
    if platform:
        query += " AND c.platform = ?"
        params.append(platform)
    if content_type:
        # Filter posts that have at least one attachment of the specified type
        query += " AND EXISTS (SELECT 1 FROM paid_content_attachments a WHERE a.post_id = p.id AND a.file_type = ?)"
        params.append(content_type)
    if min_resolution:
        # Filter posts that have at least one attachment at or above the resolution
        # Uses minimum dimension threshold to handle portrait/landscape and non-standard aspect ratios
        # e.g., 1080p includes 1920x1080, 1080x1920, 1080x1350, etc.
        resolution_thresholds = {
            '720p': 720,   # Either dimension >= 720
            '1080p': 1080, # Either dimension >= 1080
            '1440p': 1440, # Either dimension >= 1440
            '4k': 2160     # Either dimension >= 2160
        }
        if min_resolution in resolution_thresholds:
            min_dim = resolution_thresholds[min_resolution]
            # Check if either dimension meets the threshold
            query += " AND EXISTS (SELECT 1 FROM paid_content_attachments a WHERE a.post_id = p.id AND a.width IS NOT NULL AND a.height IS NOT NULL AND (a.width >= ? OR a.height >= ?))"
            params.extend([min_dim, min_dim])
    if favorites_only:
        query += " AND p.is_favorited = 1"
    if unviewed_only:
        query += " AND p.is_viewed = 0"
    if downloaded_only:
        query += " AND p.downloaded = 1"
    if has_missing:
        # Show posts that have at least one attachment with status 'failed' or 'pending'
        query += " AND EXISTS (SELECT 1 FROM paid_content_attachments a WHERE a.post_id = p.id AND a.status IN ('failed', 'pending', 'missing'))"
    if missing_description:
        # Show posts with missing or empty description/content (exclude posts marked as no_description)
        query += " AND (p.content IS NULL OR p.content = '' OR TRIM(p.content) = '') AND (p.no_description IS NULL OR p.no_description = 0)"
    if hide_empty:
        query += " AND EXISTS (SELECT 1 FROM paid_content_attachments a WHERE a.post_id = p.id)"
    if date_from:
        query += " AND p.published_at >= ?"
        params.append(date_from)
    if date_to:
        # Add time component to include the full day (published_at is stored as 2022-06-21T12:00:00)
        query += " AND p.published_at <= ?"
        params.append(f"{date_to}T23:59:59")
    if search:
        # Split into words so "sweater video" matches posts containing both words anywhere
        # NOTE(review): ILIKE is PostgreSQL/DuckDB syntax; sqlite3 rejects it,
        # and get_creators() uses plain LIKE -- confirm the backing engine
        # actually supports ILIKE.
        words = search.strip().split()
        if words:
            for word in words:
                w = f"%{word}%"
                query += """ AND (p.title ILIKE ? OR p.content ILIKE ?
                    OR c.username ILIKE ? OR c.display_name ILIKE ?
                    OR i.name ILIKE ?
                    OR EXISTS (SELECT 1 FROM paid_content_attachments a2 WHERE a2.post_id = p.id AND a2.name ILIKE ?)
                    OR EXISTS (SELECT 1 FROM paid_content_post_tags pt2 JOIN paid_content_tags t2 ON pt2.tag_id = t2.id WHERE pt2.post_id = p.id AND t2.name ILIKE ?))"""
                params.extend([w, w, w, w, w, w, w])
    if tag_ids:
        # Filter posts that have ALL specified tags
        placeholders = ','.join(['?'] * len(tag_ids))
        query += f" AND EXISTS (SELECT 1 FROM paid_content_post_tags pt WHERE pt.post_id = p.id AND pt.tag_id IN ({placeholders}))"
        params.extend(tag_ids)
    if tagged_user:
        if tagged_user == '__none__':
            # Sentinel: only posts with NO tagged users at all
            query += " AND NOT EXISTS (SELECT 1 FROM paid_content_post_tagged_users tu WHERE tu.post_id = p.id)"
        else:
            query += " AND EXISTS (SELECT 1 FROM paid_content_post_tagged_users tu WHERE tu.post_id = p.id AND tu.username = ?)"
            params.append(tagged_user)
    # Per-creator tagged user filter (auto-filter based on creator settings)
    # When using group filter, this is handled by _build_group_member_filter_clause
    if not _has_group_filter:
        query += """
            AND (
                c.filter_tagged_users IS NULL
                OR TRIM(c.filter_tagged_users) = ''
                OR TRIM(c.filter_tagged_users) = '[]'
                OR EXISTS (
                    SELECT 1 FROM paid_content_post_tagged_users tu
                    WHERE tu.post_id = p.id
                    AND c.filter_tagged_users LIKE '%"' || tu.username || '"%'
                )
            )
        """
    # Shuffle mode: fetch all matching IDs, shuffle with seed, paginate in Python
    if shuffle:
        # Add group filter if needed, then get all matching IDs
        with self.unified_db.get_connection() as conn:
            # Rewrite the SELECT list to IDs only; the replacement string
            # must match the query literal above byte-for-byte
            id_query = query.replace(
                "SELECT p.*, c.username, c.platform, c.service_id, c.display_name,\n"
                "            c.profile_image_url, c.identity_id,\n"
                "            i.name as identity_name",
                "SELECT p.id"
            )
            id_params = list(params)
            if _has_group_filter:
                group_clause, group_params = self._build_group_member_filter_clause(creator_group_id, conn)
                id_query += group_clause
                id_params.extend(group_params)
            cursor = conn.cursor()
            cursor.execute(id_query, id_params)
            all_ids = [row[0] for row in cursor.fetchall()]
            # Deterministic shuffle with seed
            import random as _random
            rng = _random.Random(shuffle_seed if shuffle_seed is not None else 42)
            rng.shuffle(all_ids)
            # Paginate the shuffled IDs
            page_ids = all_ids[offset:offset + limit]
            if not page_ids:
                return []
            # Fetch full post data for this page, maintaining shuffled order
            placeholders = ','.join(['?'] * len(page_ids))
            full_query = f"""
                SELECT p.*, c.username, c.platform, c.service_id, c.display_name,
                    c.profile_image_url, c.identity_id,
                    i.name as identity_name
                FROM paid_content_posts p
                JOIN paid_content_creators c ON p.creator_id = c.id
                LEFT JOIN paid_content_identities i ON c.identity_id = i.id
                WHERE p.id IN ({placeholders})
            """
            cursor.execute(full_query, page_ids)
            rows_by_id = {dict(r)['id']: r for r in cursor.fetchall()}
            posts = [dict(rows_by_id[pid]) for pid in page_ids if pid in rows_by_id]
    else:
        # Normal sorting (sort column whitelisted to prevent injection)
        sort_col = sort_by if sort_by in ['published_at', 'added_at', 'title', 'download_date'] else 'published_at'
        sort_dir = 'DESC' if sort_order.lower() == 'desc' else 'ASC'
        if pinned_first:
            query += f" ORDER BY p.is_pinned DESC, p.{sort_col} {sort_dir}, p.id {sort_dir}"
        else:
            query += f" ORDER BY p.{sort_col} {sort_dir}, p.id {sort_dir}"
        query += " LIMIT ? OFFSET ?"
        params.extend([limit, offset])
        with self.unified_db.get_connection() as conn:
            # Build group member filter clause (needs connection for member lookup)
            if _has_group_filter:
                group_clause, group_params = self._build_group_member_filter_clause(creator_group_id, conn)
                # Insert group clause before ORDER BY
                order_idx = query.index(' ORDER BY')
                query = query[:order_idx] + group_clause + query[order_idx:]
                # Insert group params before the LIMIT/OFFSET params (last 2)
                params = params[:-2] + group_params + params[-2:]
            cursor = conn.cursor()
            cursor.execute(query, params)
            posts = [dict(row) for row in cursor.fetchall()]
    # Fetch attachments, tags, tagged users for each post
    with self.unified_db.get_connection() as conn:
        cursor = conn.cursor()
        for post in posts:
            # Attachments (binary thumbnail_data excluded)
            cursor.execute("""
                SELECT id, post_id, attachment_index, name, file_type, extension,
                    server_path, download_url, file_size, width, height, duration,
                    status, local_path, local_filename, file_hash, perceptual_hash,
                    error_message, download_attempts, last_attempt, created_at, downloaded_at
                FROM paid_content_attachments
                WHERE post_id = ? ORDER BY attachment_index
            """, (post['id'],))
            post['attachments'] = [dict(row) for row in cursor.fetchall()]
            # Tags
            cursor.execute("""
                SELECT t.id, t.name, t.slug, t.color
                FROM paid_content_tags t
                JOIN paid_content_post_tags pt ON t.id = pt.tag_id
                WHERE pt.post_id = ?
                ORDER BY t.name
            """, (post['id'],))
            post['tags'] = [dict(row) for row in cursor.fetchall()]
            # Tagged users (flat username list)
            cursor.execute("""
                SELECT username FROM paid_content_post_tagged_users
                WHERE post_id = ? ORDER BY username
            """, (post['id'],))
            post['tagged_users'] = [row[0] for row in cursor.fetchall()]
    return posts
    def get_posts_count(self, **filters) -> int:
        """Count posts matching the same filter kwargs accepted by get_posts().

        Soft-deleted posts (deleted_at set) are always excluded. Supported
        filters: creator_id, creator_ids, creator_group_id, identity_id,
        service, platform, content_type, min_resolution, favorites_only,
        unviewed_only, downloaded_only, has_missing, missing_description,
        hide_empty, search, date_from, date_to, tag_ids, tagged_user,
        skip_pinned.
        """
        query = """
            SELECT COUNT(*)
            FROM paid_content_posts p
            JOIN paid_content_creators c ON p.creator_id = c.id
            LEFT JOIN paid_content_identities i ON c.identity_id = i.id
            WHERE p.deleted_at IS NULL
        """
        params = []
        if filters.get('creator_id'):
            query += " AND p.creator_id = ?"
            params.append(filters['creator_id'])
        if filters.get('creator_ids'):
            creator_ids = filters['creator_ids']
            placeholders = ','.join(['?'] * len(creator_ids))
            query += f" AND p.creator_id IN ({placeholders})"
            params.extend(creator_ids)
        # Group filtering needs a live connection to resolve member creators,
        # so its clause is appended later, once the connection is open.
        _has_group_filter = False
        if filters.get('creator_group_id'):
            _has_group_filter = True
        if filters.get('identity_id'):
            query += " AND c.identity_id = ?"
            params.append(filters['identity_id'])
        if filters.get('service'):
            query += " AND c.service_id = ?"
            params.append(filters['service'])
        if filters.get('platform'):
            query += " AND c.platform = ?"
            params.append(filters['platform'])
        if filters.get('content_type'):
            query += " AND EXISTS (SELECT 1 FROM paid_content_attachments a WHERE a.post_id = p.id AND a.file_type = ?)"
            params.append(filters['content_type'])
        if filters.get('min_resolution'):
            # Map UI labels to a minimum pixel dimension; either axis may
            # satisfy it, so portrait and landscape media are treated alike.
            resolution_thresholds = {
                '720p': 720,
                '1080p': 1080,
                '1440p': 1440,
                '4k': 2160
            }
            min_resolution = filters['min_resolution']
            if min_resolution in resolution_thresholds:
                min_dim = resolution_thresholds[min_resolution]
                query += " AND EXISTS (SELECT 1 FROM paid_content_attachments a WHERE a.post_id = p.id AND a.width IS NOT NULL AND a.height IS NOT NULL AND (a.width >= ? OR a.height >= ?))"
                params.extend([min_dim, min_dim])
        if filters.get('favorites_only'):
            query += " AND p.is_favorited = 1"
        if filters.get('unviewed_only'):
            query += " AND p.is_viewed = 0"
        if filters.get('downloaded_only'):
            query += " AND p.downloaded = 1"
        if filters.get('has_missing'):
            query += " AND EXISTS (SELECT 1 FROM paid_content_attachments a WHERE a.post_id = p.id AND a.status IN ('failed', 'pending', 'missing'))"
        if filters.get('missing_description'):
            # no_description = 1 means "confirmed to have no description",
            # which is not the same as a missing one.
            query += " AND (p.content IS NULL OR p.content = '' OR TRIM(p.content) = '') AND (p.no_description IS NULL OR p.no_description = 0)"
        if filters.get('hide_empty'):
            query += " AND EXISTS (SELECT 1 FROM paid_content_attachments a WHERE a.post_id = p.id)"
        if filters.get('search'):
            # Every whitespace-separated word must match somewhere (AND of ORs).
            # NOTE(review): ILIKE is not SQLite syntax — presumably the
            # unified_db engine (e.g. DuckDB/Postgres) supports it; confirm.
            words = filters['search'].strip().split()
            if words:
                for word in words:
                    w = f"%{word}%"
                    query += """ AND (p.title ILIKE ? OR p.content ILIKE ?
                        OR c.username ILIKE ? OR c.display_name ILIKE ?
                        OR i.name ILIKE ?
                        OR EXISTS (SELECT 1 FROM paid_content_attachments a2 WHERE a2.post_id = p.id AND a2.name ILIKE ?)
                        OR EXISTS (SELECT 1 FROM paid_content_post_tags pt2 JOIN paid_content_tags t2 ON pt2.tag_id = t2.id WHERE pt2.post_id = p.id AND t2.name ILIKE ?))"""
                    params.extend([w, w, w, w, w, w, w])
        if filters.get('date_from'):
            query += " AND p.published_at >= ?"
            params.append(filters['date_from'])
        if filters.get('date_to'):
            # Make the upper bound inclusive for the whole calendar day.
            query += " AND p.published_at <= ?"
            params.append(f"{filters['date_to']}T23:59:59")
        if filters.get('tag_ids'):
            tag_ids = filters['tag_ids']
            placeholders = ','.join(['?'] * len(tag_ids))
            query += f" AND EXISTS (SELECT 1 FROM paid_content_post_tags pt WHERE pt.post_id = p.id AND pt.tag_id IN ({placeholders}))"
            params.extend(tag_ids)
        if filters.get('tagged_user'):
            # '__none__' is a sentinel meaning "posts with no tagged users".
            if filters['tagged_user'] == '__none__':
                query += " AND NOT EXISTS (SELECT 1 FROM paid_content_post_tagged_users tu WHERE tu.post_id = p.id)"
            else:
                query += " AND EXISTS (SELECT 1 FROM paid_content_post_tagged_users tu WHERE tu.post_id = p.id AND tu.username = ?)"
                params.append(filters['tagged_user'])
        if filters.get('skip_pinned'):
            query += " AND (p.is_pinned = 0 OR p.is_pinned IS NULL)"
        # Per-creator tagged user filter (auto-filter based on creator settings).
        # filter_tagged_users stores a JSON array of usernames; LIKE '%"name"%'
        # matches a quoted username inside that JSON without JSON functions.
        if not _has_group_filter:
            query += """
                AND (
                    c.filter_tagged_users IS NULL
                    OR TRIM(c.filter_tagged_users) = ''
                    OR TRIM(c.filter_tagged_users) = '[]'
                    OR EXISTS (
                        SELECT 1 FROM paid_content_post_tagged_users tu
                        WHERE tu.post_id = p.id
                        AND c.filter_tagged_users LIKE '%"' || tu.username || '"%'
                    )
                )
            """
        with self.unified_db.get_connection() as conn:
            if _has_group_filter:
                group_clause, group_params = self._build_group_member_filter_clause(filters['creator_group_id'], conn)
                query += group_clause
                params.extend(group_params)
            cursor = conn.cursor()
            cursor.execute(query, params)
            return cursor.fetchone()[0]
    def get_media_count(self, **filters) -> int:
        """Count completed image/video attachments matching post-level filters.

        Unlike get_posts_count(), the COUNT is over attachments (one row per
        downloaded media file), restricted to status='completed' and
        file_type image/video; post-level filters then narrow the parents.
        Soft-deleted posts are always excluded.
        """
        query = """
            SELECT COUNT(*)
            FROM paid_content_attachments a
            JOIN paid_content_posts p ON a.post_id = p.id
            JOIN paid_content_creators c ON p.creator_id = c.id
            LEFT JOIN paid_content_identities i ON c.identity_id = i.id
            WHERE a.status = 'completed' AND a.file_type IN ('image', 'video')
            AND p.deleted_at IS NULL
        """
        params = []
        if filters.get('creator_id'):
            query += " AND p.creator_id = ?"
            params.append(filters['creator_id'])
        if filters.get('creator_ids'):
            creator_ids = filters['creator_ids']
            placeholders = ','.join(['?'] * len(creator_ids))
            query += f" AND p.creator_id IN ({placeholders})"
            params.extend(creator_ids)
        # Group filtering needs a live connection; clause appended later.
        _has_group_filter = False
        if filters.get('creator_group_id'):
            _has_group_filter = True
        if filters.get('identity_id'):
            query += " AND c.identity_id = ?"
            params.append(filters['identity_id'])
        if filters.get('service'):
            query += " AND c.service_id = ?"
            params.append(filters['service'])
        if filters.get('platform'):
            query += " AND c.platform = ?"
            params.append(filters['platform'])
        if filters.get('content_type'):
            # Filter applies directly to the counted attachment row here,
            # not via EXISTS as in get_posts_count().
            query += " AND a.file_type = ?"
            params.append(filters['content_type'])
        if filters.get('min_resolution'):
            # Either axis may satisfy the threshold (portrait or landscape).
            resolution_thresholds = {
                '720p': 720,
                '1080p': 1080,
                '1440p': 1440,
                '4k': 2160
            }
            min_resolution = filters['min_resolution']
            if min_resolution in resolution_thresholds:
                min_dim = resolution_thresholds[min_resolution]
                query += " AND a.width IS NOT NULL AND a.height IS NOT NULL AND (a.width >= ? OR a.height >= ?)"
                params.extend([min_dim, min_dim])
        if filters.get('favorites_only'):
            query += " AND p.is_favorited = 1"
        if filters.get('unviewed_only'):
            query += " AND p.is_viewed = 0"
        if filters.get('downloaded_only'):
            query += " AND p.downloaded = 1"
        if filters.get('has_missing'):
            query += " AND EXISTS (SELECT 1 FROM paid_content_attachments a2 WHERE a2.post_id = p.id AND a2.status IN ('failed', 'pending', 'missing'))"
        if filters.get('missing_description'):
            query += " AND (p.content IS NULL OR p.content = '' OR TRIM(p.content) = '') AND (p.no_description IS NULL OR p.no_description = 0)"
        if filters.get('hide_empty'):
            # For media count, this is inherently true (only counting completed attachments)
            # but still filter the posts to be consistent
            pass
        if filters.get('search'):
            # AND of per-word OR blocks; see get_posts_count for the
            # NOTE(review) about ILIKE portability.
            words = filters['search'].strip().split()
            if words:
                for word in words:
                    w = f"%{word}%"
                    query += """ AND (p.title ILIKE ? OR p.content ILIKE ?
                        OR c.username ILIKE ? OR c.display_name ILIKE ?
                        OR i.name ILIKE ?
                        OR EXISTS (SELECT 1 FROM paid_content_attachments a2 WHERE a2.post_id = p.id AND a2.name ILIKE ?)
                        OR EXISTS (SELECT 1 FROM paid_content_post_tags pt2 JOIN paid_content_tags t2 ON pt2.tag_id = t2.id WHERE pt2.post_id = p.id AND t2.name ILIKE ?))"""
                    params.extend([w, w, w, w, w, w, w])
        if filters.get('date_from'):
            query += " AND p.published_at >= ?"
            params.append(filters['date_from'])
        if filters.get('date_to'):
            # Inclusive upper bound for the whole calendar day.
            query += " AND p.published_at <= ?"
            params.append(f"{filters['date_to']}T23:59:59")
        if filters.get('tag_ids'):
            tag_ids = filters['tag_ids']
            placeholders = ','.join(['?'] * len(tag_ids))
            query += f" AND EXISTS (SELECT 1 FROM paid_content_post_tags pt WHERE pt.post_id = p.id AND pt.tag_id IN ({placeholders}))"
            params.extend(tag_ids)
        if filters.get('tagged_user'):
            # '__none__' sentinel selects posts with no tagged users at all.
            if filters['tagged_user'] == '__none__':
                query += " AND NOT EXISTS (SELECT 1 FROM paid_content_post_tagged_users tu WHERE tu.post_id = p.id)"
            else:
                query += " AND EXISTS (SELECT 1 FROM paid_content_post_tagged_users tu WHERE tu.post_id = p.id AND tu.username = ?)"
                params.append(filters['tagged_user'])
        if filters.get('skip_pinned'):
            query += " AND (p.is_pinned = 0 OR p.is_pinned IS NULL)"
        # Per-creator tagged user filter (auto-filter based on creator settings)
        # filter_tagged_users is a JSON array; LIKE '%"name"%' matches a
        # quoted username inside it.
        if not _has_group_filter:
            query += """
                AND (
                    c.filter_tagged_users IS NULL
                    OR TRIM(c.filter_tagged_users) = ''
                    OR TRIM(c.filter_tagged_users) = '[]'
                    OR EXISTS (
                        SELECT 1 FROM paid_content_post_tagged_users tu
                        WHERE tu.post_id = p.id
                        AND c.filter_tagged_users LIKE '%"' || tu.username || '"%'
                    )
                )
            """
        with self.unified_db.get_connection() as conn:
            if _has_group_filter:
                group_clause, group_params = self._build_group_member_filter_clause(filters['creator_group_id'], conn)
                query += group_clause
                params.extend(group_params)
            cursor = conn.cursor()
            cursor.execute(query, params)
            return cursor.fetchone()[0]
def mark_post_viewed(self, post_id: int) -> bool:
"""Mark post as viewed"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute("""
UPDATE paid_content_posts
SET is_viewed = 1, view_date = ?
WHERE id = ? AND is_viewed = 0
""", (datetime.now().isoformat(), post_id))
conn.commit()
return cursor.rowcount > 0
def toggle_post_viewed(self, post_id: int) -> bool:
"""Toggle viewed status, returns new is_viewed state."""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute("SELECT is_viewed FROM paid_content_posts WHERE id = ? AND deleted_at IS NULL", (post_id,))
row = cursor.fetchone()
if not row:
return False
new_val = 0 if row[0] else 1
view_date = datetime.now().isoformat() if new_val == 1 else None
cursor.execute(
"UPDATE paid_content_posts SET is_viewed = ?, view_date = ? WHERE id = ?",
(new_val, view_date, post_id)
)
conn.commit()
return bool(new_val)
    def get_unviewed_posts_count(self) -> int:
        """Get count of unviewed posts (respects per-creator filter_tagged_users).

        Only posts that actually have at least one attachment are counted,
        and soft-deleted posts are excluded.
        """
        with self.unified_db.get_connection() as conn:
            cursor = conn.cursor()
            # filter_tagged_users stores a JSON array of usernames; the
            # LIKE '%"name"%' pattern matches a quoted username inside that
            # JSON without needing JSON functions. Empty/NULL/'[]' values
            # mean "no filter" and pass every post through.
            cursor.execute("""
                SELECT COUNT(*) as cnt
                FROM paid_content_posts p
                JOIN paid_content_creators c ON p.creator_id = c.id
                WHERE p.is_viewed = 0 AND p.deleted_at IS NULL
                AND EXISTS (SELECT 1 FROM paid_content_attachments a WHERE a.post_id = p.id)
                AND (
                    c.filter_tagged_users IS NULL
                    OR TRIM(c.filter_tagged_users) = ''
                    OR TRIM(c.filter_tagged_users) = '[]'
                    OR EXISTS (
                        SELECT 1 FROM paid_content_post_tagged_users tu
                        WHERE tu.post_id = p.id
                        AND c.filter_tagged_users LIKE '%"' || tu.username || '"%'
                    )
                )
            """)
            return cursor.fetchone()['cnt']
def mark_all_posts_viewed(self) -> int:
"""Mark all unviewed posts as viewed."""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute(
'UPDATE paid_content_posts SET is_viewed = 1, view_date = ? WHERE is_viewed = 0 AND deleted_at IS NULL',
(datetime.now().isoformat(),)
)
conn.commit()
return cursor.rowcount
def toggle_post_favorite(self, post_id: int) -> bool:
"""Toggle favorite status, returns new status"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute("SELECT is_favorited FROM paid_content_posts WHERE id = ? AND deleted_at IS NULL", (post_id,))
row = cursor.fetchone()
if not row:
return False
new_status = 0 if row[0] else 1
cursor.execute(
"UPDATE paid_content_posts SET is_favorited = ? WHERE id = ?",
(new_status, post_id)
)
conn.commit()
return bool(new_status)
def mark_post_downloaded(self, post_id: int) -> bool:
"""Mark post as downloaded"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute("""
UPDATE paid_content_posts
SET downloaded = 1, download_date = ?
WHERE id = ? AND deleted_at IS NULL
""", (datetime.now().isoformat(), post_id))
conn.commit()
return cursor.rowcount > 0
# ============== ATTACHMENTS ==============
# Filename patterns for placeholder/missing-photo images to skip
_PLACEHOLDER_PATTERNS = [
re.compile(r'^th_', re.IGNORECASE), # th_* forum thumbnails
re.compile(r'\.th\.(jpe?g|png|gif)$', re.IGNORECASE), # *.th.jpg forum thumbnail suffix
re.compile(r'^[a-zA-Z0-9_-]{6,12}_o\.jpe?g$', re.IGNORECASE), # gSFDFmSe_o.jpg imagevenue placeholders
re.compile(r'^\d+_\d+_[a-f0-9]+_o\.jpg$', re.IGNORECASE), # 38565340_41955754042_ed011b4e87_o.jpg
re.compile(r'^post-\d+-0-\d+-\d+\.gif$', re.IGNORECASE), # post-42358-0-1446015779-35437.gif forum icons
re.compile(r'^158089130_', re.IGNORECASE), # duck-squad signature image
re.compile(r'^fc50da521418916\.jpg$', re.IGNORECASE), # hqcelebcorner repeated tiny image
]
    def upsert_attachment(self, post_id: int, data: Dict) -> Optional[int]:
        """Insert or update an attachment for a post.

        Uniqueness is (post_id, server_path). When a row already exists,
        only a small set of fields is refreshed (download URL, status
        transitions, missing dimensions, quality-recheck flag); otherwise a
        new row is inserted with columns whitelisted against
        ALLOWED_ATTACHMENT_COLUMNS.

        Returns:
            The attachment row id, or None when the filename matches a
            known placeholder pattern and the attachment is skipped.
        """
        # Skip placeholder/missing-photo images
        filename = data.get('name', '')
        if any(p.match(filename) for p in self._PLACEHOLDER_PATTERNS):
            return None
        with self.unified_db.get_connection(for_write=True) as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, status, width, height, duration FROM paid_content_attachments
                WHERE post_id = ? AND server_path = ?
            """, (post_id, data.get('server_path')))
            existing = cursor.fetchone()
            if existing:
                att_id, status, existing_width, existing_height, existing_duration = existing
                updates = []
                params = []
                # Update download_url for pending/failed/skipped/unavailable attachments (Fansly URLs expire)
                if status in ('pending', 'failed', 'skipped', 'unavailable') and data.get('download_url'):
                    updates.append('download_url = ?')
                    params.append(data['download_url'])
                # Reset failed/skipped/unavailable attachments to pending so they retry with fresh URL
                # This handles PPV content that becomes available after purchase
                if status in ('failed', 'skipped', 'unavailable'):
                    updates.append('status = ?')
                    params.append('pending')
                    updates.append('error_message = ?')
                    params.append(None)
                    updates.append('download_attempts = ?')
                    params.append(0)
                # Mark failed/pending attachments as unavailable when no download URL (PPV/locked)
                # (elif: only reachable for 'pending' rows, since 'failed' is
                # already handled by the reset branch above)
                elif status in ('pending', 'failed') and not data.get('download_url') and data.get('status') == 'unavailable':
                    updates.append('status = ?')
                    params.append('unavailable')
                # Update dimensions if not already set and provided in data
                if existing_width is None and data.get('width'):
                    updates.append('width = ?')
                    params.append(data['width'])
                if existing_height is None and data.get('height'):
                    updates.append('height = ?')
                    params.append(data['height'])
                if existing_duration is None and data.get('duration'):
                    updates.append('duration = ?')
                    params.append(data['duration'])
                # Set needs_quality_recheck flag if provided and not already set
                if data.get('needs_quality_recheck') and status == 'completed':
                    updates.append('needs_quality_recheck = ?')
                    params.append(1)
                if updates:
                    params.append(att_id)
                    cursor.execute(f"""
                        UPDATE paid_content_attachments
                        SET {', '.join(updates)}
                        WHERE id = ?
                    """, params)
                    conn.commit()
                return att_id
            # Add internal fields
            # NOTE(review): this mutates the caller's dict in place — confirm
            # no caller reuses `data` after this call.
            data['post_id'] = post_id
            data['created_at'] = datetime.now().isoformat()
            # Validate and filter columns (allow internal columns: post_id, created_at)
            # Column names cannot be bound as parameters, so whitelist them.
            allowed_insert_columns = self.ALLOWED_ATTACHMENT_COLUMNS | {'post_id', 'created_at'}
            filtered_data = {k: v for k, v in data.items() if k in allowed_insert_columns}
            columns = ', '.join(filtered_data.keys())
            placeholders = ', '.join(['?'] * len(filtered_data))
            cursor.execute(
                f"INSERT INTO paid_content_attachments ({columns}) VALUES ({placeholders})",
                list(filtered_data.values())
            )
            conn.commit()
            return cursor.lastrowid
def get_attachment(self, attachment_id: int) -> Optional[Dict]:
"""Get single attachment with creator info"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
# Try post attachment first (post_id > 0)
cursor.execute(f"""
SELECT {self.ATTACHMENT_COLUMNS}, p.post_id as post_api_id, p.title as post_title,
c.id as creator_db_id, c.username, c.platform, c.service_id
FROM paid_content_attachments a
JOIN paid_content_posts p ON a.post_id = p.id
JOIN paid_content_creators c ON p.creator_id = c.id
WHERE a.id = ?
""", (attachment_id,))
row = cursor.fetchone()
if row:
return dict(row)
# Try message attachment (post_id = 0, message_id set)
cursor.execute(f"""
SELECT {self.ATTACHMENT_COLUMNS}, NULL as post_api_id, NULL as post_title,
c.id as creator_db_id, c.username, c.platform, c.service_id
FROM paid_content_attachments a
JOIN paid_content_messages m ON a.message_id = m.id
JOIN paid_content_creators c ON m.creator_id = c.id
WHERE a.id = ?
""", (attachment_id,))
row = cursor.fetchone()
return dict(row) if row else None
def find_attachment_by_hash(self, file_hash: str, exclude_id: int = None) -> Optional[Dict]:
"""Find a completed attachment by file hash (for duplicate resolution)"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
query = """
SELECT id, local_path, file_type, status, thumbnail_data IS NOT NULL as has_thumbnail
FROM paid_content_attachments
WHERE file_hash = ? AND status = 'completed' AND local_path IS NOT NULL
"""
params = [file_hash]
if exclude_id:
query += " AND id != ?"
params.append(exclude_id)
query += " LIMIT 1"
cursor.execute(query, params)
row = cursor.fetchone()
return dict(row) if row else None
def get_attachment_by_path(self, local_path: str) -> Optional[Dict]:
"""Get attachment by its local file path"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(f"""
SELECT {self.ATTACHMENT_COLUMNS}, p.post_id as post_api_id, p.title as post_title,
p.content as post_content, p.published_at as post_date,
c.id as creator_db_id, c.username, c.platform, c.service_id
FROM paid_content_attachments a
JOIN paid_content_posts p ON a.post_id = p.id
JOIN paid_content_creators c ON p.creator_id = c.id
WHERE a.local_path = ?
""", (local_path,))
row = cursor.fetchone()
return dict(row) if row else None
def get_pending_attachments(self, creator_id: int = None, limit: int = None) -> List[Dict]:
"""Get attachments pending download
Args:
creator_id: Filter by creator (optional)
limit: Maximum number to return (None = unlimited, fetches all)
Ordering:
- New items (no last_attempt) first, ordered by post date DESC
- Re-queued items (have last_attempt) last, ordered by oldest attempt first
"""
query = f"""
SELECT {self.ATTACHMENT_COLUMNS}, p.post_id as post_api_id, p.title as post_title,
c.id as creator_db_id, c.username, c.platform, c.service_id
FROM paid_content_attachments a
JOIN paid_content_posts p ON a.post_id = p.id
JOIN paid_content_creators c ON p.creator_id = c.id
WHERE a.status = 'pending'
"""
params = []
if creator_id:
query += " AND c.id = ?"
params.append(creator_id)
# New items first (NULL last_attempt), then re-queued by oldest attempt
query += " ORDER BY (CASE WHEN a.last_attempt IS NULL THEN 0 ELSE 1 END), a.last_attempt ASC, p.published_at DESC"
if limit:
query += " LIMIT ?"
params.append(limit)
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
def get_pending_attachments_for_post(self, post_id: int) -> List[Dict]:
"""Get pending attachments for a specific post"""
query = f"""
SELECT {self.ATTACHMENT_COLUMNS}, p.post_id as post_api_id, p.title as post_title,
c.id as creator_db_id, c.username, c.platform, c.service_id
FROM paid_content_attachments a
JOIN paid_content_posts p ON a.post_id = p.id
JOIN paid_content_creators c ON p.creator_id = c.id
WHERE a.status = 'pending' AND a.post_id = ?
"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(query, (post_id,))
return [dict(row) for row in cursor.fetchall()]
def get_pending_attachment_count(self, creator_id: int = None) -> int:
"""Get count of attachments pending download"""
query = """
SELECT COUNT(*) FROM paid_content_attachments a
JOIN paid_content_posts p ON a.post_id = p.id
WHERE a.status = 'pending'
"""
params = []
if creator_id:
query += " AND p.creator_id = ?"
params.append(creator_id)
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(query, params)
return cursor.fetchone()[0]
def get_downloading_attachment_count(self, creator_id: int = None) -> int:
"""Get count of attachments currently downloading"""
query = """
SELECT COUNT(*) FROM paid_content_attachments a
JOIN paid_content_posts p ON a.post_id = p.id
WHERE a.status = 'downloading'
"""
params = []
if creator_id:
query += " AND p.creator_id = ?"
params.append(creator_id)
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(query, params)
return cursor.fetchone()[0]
def get_failed_attachment_count(self) -> int:
"""Get count of failed attachments"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM paid_content_attachments WHERE status = 'failed'")
return cursor.fetchone()[0]
def update_attachment_status(self, attachment_id: int, status: str, **kwargs) -> bool:
"""Update attachment status and optional fields"""
updates = {'status': status, **kwargs}
# Validate columns against whitelist
self._validate_columns(set(updates.keys()), self.ALLOWED_ATTACHMENT_COLUMNS, 'paid_content_attachments')
set_clause = ', '.join(f"{k} = ?" for k in updates.keys())
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute(
f"UPDATE paid_content_attachments SET {set_clause} WHERE id = ?",
[*updates.values(), attachment_id]
)
conn.commit()
return cursor.rowcount > 0
def update_attachment(self, attachment_id: int, updates: Dict) -> bool:
"""Update attachment fields"""
if not updates:
return False
# Validate columns against whitelist
self._validate_columns(set(updates.keys()), self.ALLOWED_ATTACHMENT_COLUMNS, 'paid_content_attachments')
set_clause = ', '.join(f"{k} = ?" for k in updates.keys())
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute(
f"UPDATE paid_content_attachments SET {set_clause} WHERE id = ?",
[*updates.values(), attachment_id]
)
conn.commit()
return cursor.rowcount > 0
def delete_attachment(self, attachment_id: int) -> bool:
"""Delete an attachment record from the database"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM paid_content_attachments WHERE id = ?", [attachment_id])
conn.commit()
return cursor.rowcount > 0
def get_downloading_attachments(self, creator_id: int = None, limit: int = None) -> List[Dict]:
"""Get attachments currently being downloaded"""
query = f"""
SELECT {self.ATTACHMENT_COLUMNS}, p.post_id as post_api_id, p.title as post_title,
c.id as creator_db_id, c.username, c.platform, c.service_id
FROM paid_content_attachments a
JOIN paid_content_posts p ON a.post_id = p.id
JOIN paid_content_creators c ON p.creator_id = c.id
WHERE a.status = 'downloading'
"""
params = []
if creator_id:
query += " AND c.id = ?"
params.append(creator_id)
query += " ORDER BY a.last_attempt DESC"
if limit:
query += " LIMIT ?"
params.append(limit)
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
def reset_downloading_to_pending(self) -> int:
"""Reset all downloading items back to pending (for stop all)"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute("""
UPDATE paid_content_attachments
SET status = 'pending'
WHERE status = 'downloading'
""")
conn.commit()
return cursor.rowcount
def clear_failed_downloads(self) -> int:
"""Mark all failed downloads as skipped"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute("""
UPDATE paid_content_attachments
SET status = 'skipped', error_message = 'Cleared by user'
WHERE status = 'failed'
""")
conn.commit()
return cursor.rowcount
def get_attachment_thumbnail(self, attachment_id: int) -> Optional[bytes]:
"""Get thumbnail data for an attachment"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT thumbnail_data FROM paid_content_attachments WHERE id = ?",
(attachment_id,)
)
row = cursor.fetchone()
if row and row['thumbnail_data']:
return row['thumbnail_data']
return None
def get_attachments_missing_thumbnails(self, limit: int = 100) -> List[Dict]:
"""Get completed attachments that don't have thumbnails yet"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT id, local_path, file_type, name
FROM paid_content_attachments
WHERE status = 'completed'
AND local_path IS NOT NULL
AND thumbnail_data IS NULL
AND file_type IN ('image', 'video')
LIMIT ?
""", (limit,))
return [dict(row) for row in cursor.fetchall()]
def count_attachments_missing_thumbnails(self) -> int:
"""Count how many completed attachments are missing thumbnails"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT COUNT(*) FROM paid_content_attachments
WHERE status = 'completed'
AND local_path IS NOT NULL
AND thumbnail_data IS NULL
AND file_type IN ('image', 'video')
""")
return cursor.fetchone()[0]
def count_attachments_missing_dimensions(self) -> int:
"""Count how many completed attachments are missing width/height dimensions"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT COUNT(*) FROM paid_content_attachments
WHERE status = 'completed'
AND local_path IS NOT NULL
AND (width IS NULL OR height IS NULL)
AND file_type IN ('image', 'video')
""")
return cursor.fetchone()[0]
def get_attachments_missing_dimensions(self, limit: int = 100) -> List[Dict]:
"""Get attachments that need dimension extraction"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT id, local_path, file_type, status
FROM paid_content_attachments
WHERE status = 'completed'
AND local_path IS NOT NULL
AND (width IS NULL OR height IS NULL)
AND file_type IN ('image', 'video')
LIMIT ?
""", (limit,))
return [dict(row) for row in cursor.fetchall()]
def get_failed_downloads(self, max_attempts: int = 3, hours: int = 72, limit: int = None) -> List[Dict]:
"""Get all failed downloads for dashboard display"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
query = f"""
SELECT {self.ATTACHMENT_COLUMNS}, p.title as post_title, p.post_id as post_api_id,
c.id as creator_db_id, c.username, c.platform, c.service_id,
CASE WHEN a.download_attempts < ? THEN 1 ELSE 0 END as can_retry
FROM paid_content_attachments a
JOIN paid_content_posts p ON a.post_id = p.id
JOIN paid_content_creators c ON p.creator_id = c.id
WHERE a.status = 'failed'
ORDER BY a.last_attempt DESC
"""
params: list = [max_attempts]
if limit:
query += " LIMIT ?"
params.append(limit)
cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
def get_quality_recheck_candidates(self, max_attempts: int = 24) -> List[Dict]:
"""Get attachments that need quality rechecking.
Returns fansly_direct video attachments where needs_quality_recheck=1,
quality_recheck_count < max_attempts, and last_quality_check is NULL
or older than 1 hour.
"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(f"""
SELECT {self.ATTACHMENT_COLUMNS}, p.post_id as post_api_id, p.title as post_title,
c.id as creator_db_id, c.username, c.platform, c.service_id, c.creator_id as account_id
FROM paid_content_attachments a
JOIN paid_content_posts p ON a.post_id = p.id
JOIN paid_content_creators c ON p.creator_id = c.id
WHERE c.service_id = 'fansly_direct'
AND a.file_type = 'video'
AND a.needs_quality_recheck = 1
AND a.quality_recheck_count < ?
AND (a.last_quality_check IS NULL
OR CAST(a.last_quality_check AS TEXT) < CAST(datetime('now', '-1 hour') AS TEXT))
ORDER BY a.quality_recheck_count ASC, a.id ASC
""", (max_attempts,))
return [dict(row) for row in cursor.fetchall()]
def check_duplicate_hash(self, file_hash: str) -> Optional[Dict]:
"""Check if file hash already exists"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(f"""
SELECT {self.ATTACHMENT_COLUMNS}, p.title as post_title, c.username
FROM paid_content_attachments a
JOIN paid_content_posts p ON a.post_id = p.id
JOIN paid_content_creators c ON p.creator_id = c.id
WHERE a.file_hash = ? AND a.status = 'completed'
LIMIT 1
""", (file_hash,))
row = cursor.fetchone()
return dict(row) if row else None
def get_attachments_with_phash(self) -> List[Dict]:
"""Get all completed attachments with perceptual hashes for duplicate detection"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT a.id, a.perceptual_hash, a.local_path, a.name,
p.title as post_title, c.username
FROM paid_content_attachments a
JOIN paid_content_posts p ON a.post_id = p.id
JOIN paid_content_creators c ON p.creator_id = c.id
WHERE a.status = 'completed'
AND a.perceptual_hash IS NOT NULL
AND a.perceptual_hash != ''
""")
return [dict(row) for row in cursor.fetchall()]
# ============== EMBEDS ==============
def upsert_embed(self, post_id: int, data: Dict) -> int:
"""Insert or update embed"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT id FROM paid_content_embeds
WHERE post_id = ? AND url = ?
""", (post_id, data.get('url')))
existing = cursor.fetchone()
if existing:
return existing[0]
data['post_id'] = post_id
data['created_at'] = datetime.now().isoformat()
columns = ', '.join(data.keys())
placeholders = ', '.join(['?'] * len(data))
cursor.execute(
f"INSERT INTO paid_content_embeds ({columns}) VALUES ({placeholders})",
list(data.values())
)
conn.commit()
return cursor.lastrowid
def get_pending_embeds(self, creator_id: int = None, limit: int = 50) -> List[Dict]:
"""Get embeds pending download"""
query = """
SELECT e.*, p.post_id as post_api_id, p.title as post_title,
c.id as creator_db_id, c.username, c.platform
FROM paid_content_embeds e
JOIN paid_content_posts p ON e.post_id = p.id
JOIN paid_content_creators c ON p.creator_id = c.id
WHERE e.status = 'pending'
"""
params = []
if creator_id:
query += " AND c.id = ?"
params.append(creator_id)
query += " ORDER BY p.published_at DESC LIMIT ?"
params.append(limit)
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
def update_embed_status(self, embed_id: int, status: str, **kwargs) -> bool:
"""Update embed status"""
updates = {'status': status, **kwargs}
# Validate columns against whitelist
self._validate_columns(set(updates.keys()), self.ALLOWED_EMBED_COLUMNS, 'paid_content_embeds')
set_clause = ', '.join(f"{k} = ?" for k in updates.keys())
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute(
f"UPDATE paid_content_embeds SET {set_clause} WHERE id = ?",
[*updates.values(), embed_id]
)
conn.commit()
return cursor.rowcount > 0
# ============== NOTIFICATIONS ==============
def create_notification(self, notification_type: str, title: str, message: str,
creator_id: int = None, post_id: int = None,
download_count: int = 0, file_count: int = 0,
media_files: List[Dict] = None) -> int:
"""Create notification record with optional media file metadata"""
import json
metadata = None
if media_files:
metadata = json.dumps({'media_files': media_files})
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO paid_content_notifications
(notification_type, creator_id, post_id, title, message, download_count, file_count, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (notification_type, creator_id, post_id, title, message, download_count, file_count, metadata))
conn.commit()
return cursor.lastrowid
def get_notifications(self, unread_only: bool = False, limit: int = 50, offset: int = 0) -> List[Dict]:
"""Get notifications with parsed metadata"""
import json
query = """
SELECT n.*, c.username, c.platform
FROM paid_content_notifications n
LEFT JOIN paid_content_creators c ON n.creator_id = c.id
WHERE 1=1
"""
params = []
if unread_only:
query += " AND n.is_read = 0"
query += " ORDER BY n.created_at DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(query, params)
notifications = []
for row in cursor.fetchall():
notif = dict(row)
# Parse metadata JSON if present
if notif.get('metadata'):
try:
notif['metadata'] = json.loads(notif['metadata'])
except (json.JSONDecodeError, TypeError, ValueError):
notif['metadata'] = {}
else:
notif['metadata'] = {}
notifications.append(notif)
return notifications
def get_unread_notification_count(self) -> int:
"""Get count of unread notifications"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM paid_content_notifications WHERE is_read = 0")
return cursor.fetchone()[0]
def mark_notifications_read(self, notification_ids: List[int]) -> int:
"""Mark specific notifications as read"""
if not notification_ids:
return 0
placeholders = ','.join(['?'] * len(notification_ids))
now = datetime.now().isoformat()
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute(f"""
UPDATE paid_content_notifications
SET is_read = 1, read_at = ?
WHERE id IN ({placeholders})
""", [now] + notification_ids)
conn.commit()
return cursor.rowcount
def mark_all_notifications_read(self) -> int:
"""Mark all notifications as read"""
now = datetime.now().isoformat()
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute("""
UPDATE paid_content_notifications
SET is_read = 1, read_at = ?
WHERE is_read = 0
""", (now,))
conn.commit()
return cursor.rowcount
def delete_notification(self, notification_id: int) -> bool:
"""Delete a notification"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute(
"DELETE FROM paid_content_notifications WHERE id = ?",
(notification_id,)
)
conn.commit()
return cursor.rowcount > 0
# ============== RECYCLE BIN ==============
def soft_delete_post(self, post_id: int, deleted_by: str = None) -> bool:
"""Soft delete a post by setting deleted_at timestamp"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
# Check post exists and is not already deleted
cursor.execute("SELECT id FROM paid_content_posts WHERE id = ? AND deleted_at IS NULL", (post_id,))
if not cursor.fetchone():
return False
# Set deleted_at timestamp (keeps record for sync dedup)
cursor.execute(
"UPDATE paid_content_posts SET deleted_at = ? WHERE id = ?",
(datetime.now().isoformat(), post_id)
)
# Also store in recycle bin for UI restore capability
cursor.execute("SELECT * FROM paid_content_posts WHERE id = ?", (post_id,))
post = cursor.fetchone()
if post:
cursor.execute("""
INSERT INTO paid_content_recycle_bin (item_type, original_id, original_data, deleted_by)
VALUES ('post', ?, ?, ?)
""", (post_id, json.dumps(dict(post)), deleted_by))
conn.commit()
return True
def get_recycle_bin_items(self, item_type: str = None, search: str = None,
platform: str = None, creator: str = None,
content_type: str = None,
limit: int = 50, offset: int = 0) -> List[Dict]:
"""Get items from recycle bin"""
query = "SELECT * FROM paid_content_recycle_bin WHERE 1=1"
params = []
if item_type:
query += " AND item_type = ?"
params.append(item_type)
if search:
query += " AND original_data LIKE ?"
params.append(f"%{search}%")
# Filter by platform (stored in original_data JSON)
if platform:
query += " AND json_extract(original_data, '$.platform') = ?"
params.append(platform)
# Filter by creator username (stored in original_data JSON)
if creator:
query += " AND json_extract(original_data, '$.creator_username') = ?"
params.append(creator)
# Filter by content/file type (for attachments)
if content_type:
query += " AND json_extract(original_data, '$.file_type') = ?"
params.append(content_type)
query += " ORDER BY deleted_at DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(query, params)
items = []
for row in cursor.fetchall():
item = dict(row)
item['original_data'] = json.loads(item['original_data'])
items.append(item)
return items
def restore_from_recycle_bin(self, recycle_id: int) -> bool:
"""Restore item from recycle bin"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
# Get recycled item
cursor.execute("SELECT * FROM paid_content_recycle_bin WHERE id = ?", (recycle_id,))
item = cursor.fetchone()
if not item:
return False
item = dict(item)
original_data = json.loads(item['original_data'])
item_type = item['item_type']
# Restore based on type
if item_type == 'post':
columns = ', '.join(original_data.keys())
placeholders = ', '.join(['?'] * len(original_data))
cursor.execute(
f"INSERT OR REPLACE INTO paid_content_posts ({columns}) VALUES ({placeholders})",
list(original_data.values())
)
# Remove from recycle bin
cursor.execute("DELETE FROM paid_content_recycle_bin WHERE id = ?", (recycle_id,))
conn.commit()
return True
def empty_recycle_bin(self, item_type: str = None) -> int:
"""Permanently delete items from recycle bin"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
if item_type:
cursor.execute("DELETE FROM paid_content_recycle_bin WHERE item_type = ?", (item_type,))
else:
cursor.execute("DELETE FROM paid_content_recycle_bin")
conn.commit()
return cursor.rowcount
def permanently_delete_recycle_item(self, recycle_id: int) -> bool:
"""Permanently delete a single item from recycle bin"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM paid_content_recycle_bin WHERE id = ?", (recycle_id,))
conn.commit()
return cursor.rowcount > 0
# ============== STATS ==============
    def get_dashboard_stats(self) -> Dict:
        """Collect aggregate statistics for the dashboard view.

        Returns a dict with:
        - total_creators / all_creators (enabled vs. every creator)
        - total_posts / downloaded_posts (soft-deleted rows excluded)
        - total_files / total_size_bytes (completed attachments only)
        - failed_downloads / pending_downloads / unread_notifications
        - storage_by_creator: top-10 creators by completed-attachment bytes
        - recent_activity: per-day completed-download counts, last 7 days
        - services: one row per service with health info and creator count

        All queries run on a single read connection, so the numbers are
        close to (but not strictly guaranteed to be) a consistent snapshot.
        """
        with self.unified_db.get_connection() as conn:
            cursor = conn.cursor()
            stats = {}
            # Creator counts
            cursor.execute("SELECT COUNT(*) FROM paid_content_creators WHERE enabled = 1")
            stats['total_creators'] = cursor.fetchone()[0]
            cursor.execute("SELECT COUNT(*) FROM paid_content_creators")
            stats['all_creators'] = cursor.fetchone()[0]
            # Post counts (deleted_at IS NULL filters out soft-deleted posts)
            cursor.execute("SELECT COUNT(*) FROM paid_content_posts WHERE deleted_at IS NULL")
            stats['total_posts'] = cursor.fetchone()[0]
            cursor.execute("SELECT COUNT(*) FROM paid_content_posts WHERE downloaded = 1 AND deleted_at IS NULL")
            stats['downloaded_posts'] = cursor.fetchone()[0]
            # Attachment counts; SUM() is NULL when no rows match, hence `or 0`
            cursor.execute("SELECT COUNT(*), SUM(file_size) FROM paid_content_attachments WHERE status = 'completed'")
            row = cursor.fetchone()
            stats['total_files'] = row[0] or 0
            stats['total_size_bytes'] = row[1] or 0
            # Failed downloads
            cursor.execute("SELECT COUNT(*) FROM paid_content_attachments WHERE status = 'failed'")
            stats['failed_downloads'] = cursor.fetchone()[0]
            # Pending downloads
            cursor.execute("SELECT COUNT(*) FROM paid_content_attachments WHERE status = 'pending'")
            stats['pending_downloads'] = cursor.fetchone()[0]
            # Unread notifications
            cursor.execute("SELECT COUNT(*) FROM paid_content_notifications WHERE is_read = 0")
            stats['unread_notifications'] = cursor.fetchone()[0]
            # Storage by creator (top 10); inner JOINs mean creators with no
            # completed attachments are omitted entirely
            cursor.execute("""
                SELECT c.id, c.username, c.platform, c.service_id, c.profile_image_url,
                       SUM(a.file_size) as total_size, COUNT(a.id) as file_count
                FROM paid_content_creators c
                JOIN paid_content_posts p ON c.id = p.creator_id AND p.deleted_at IS NULL
                JOIN paid_content_attachments a ON p.id = a.post_id
                WHERE a.status = 'completed'
                GROUP BY c.id
                ORDER BY total_size DESC
                LIMIT 10
            """)
            stats['storage_by_creator'] = [dict(row) for row in cursor.fetchall()]
            # Recent activity (last 7 days), bucketed by the YYYY-MM-DD prefix
            # of the ISO downloaded_at string
            cursor.execute("""
                SELECT SUBSTR(downloaded_at, 1, 10) as date, COUNT(*) as count
                FROM paid_content_attachments
                WHERE status = 'completed'
                AND downloaded_at >= CAST(datetime('now', '-7 days') AS TEXT)
                GROUP BY SUBSTR(downloaded_at, 1, 10)
                ORDER BY date DESC
            """)
            stats['recent_activity'] = [dict(row) for row in cursor.fetchall()]
            # Service stats (LEFT JOIN keeps services with zero creators)
            cursor.execute("""
                SELECT s.id, s.name, s.health_status, s.last_health_check,
                       COUNT(c.id) as creator_count
                FROM paid_content_services s
                LEFT JOIN paid_content_creators c ON s.id = c.service_id
                GROUP BY s.id
            """)
            stats['services'] = [dict(row) for row in cursor.fetchall()]
            return stats
# ============== DOWNLOAD HISTORY ==============
def record_download_attempt(self, attachment_id: int = None, embed_id: int = None,
url: str = None, status: str = 'success',
error_message: str = None, response_code: int = None,
duration_seconds: float = None) -> int:
"""Record a download attempt in history"""
try:
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO paid_content_download_history
(attachment_id, embed_id, url, status, error_message, response_code, duration_seconds)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (attachment_id, embed_id, url, status, error_message, response_code, duration_seconds))
conn.commit()
return cursor.lastrowid
except Exception:
return 0
def get_download_history(self, attachment_id: int = None, status: str = None,
limit: int = 100) -> List[Dict]:
"""Get download history"""
query = "SELECT * FROM paid_content_download_history WHERE 1=1"
params = []
if attachment_id:
query += " AND attachment_id = ?"
params.append(attachment_id)
if status:
query += " AND status = ?"
params.append(status)
query += " ORDER BY attempt_date DESC LIMIT ?"
params.append(limit)
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
# ============== FILE INTEGRITY ==============
    def scan_missing_files(self) -> Dict:
        """
        Scan all completed attachments and check if files exist on disk.
        Updates status to 'missing' for files that don't exist.
        Returns stats about what was found.

        Returns a dict with 'scanned'/'missing'/'found' counters and a
        'missing_files' list of {id, name, path, post_title, creator}.

        NOTE(review): the for_write connection below is opened while the
        read connection may still be held open — presumably unified_db
        tolerates concurrent read+write handles (e.g. SQLite WAL); confirm
        before restructuring the connection scopes.
        """
        stats = {
            'scanned': 0,
            'missing': 0,
            'found': 0,
            'missing_files': []
        }
        with self.unified_db.get_connection() as conn:
            cursor = conn.cursor()
            # Get all completed attachments with local paths
            cursor.execute("""
                SELECT a.id, a.name, a.local_path, p.title as post_title,
                       c.username, c.platform
                FROM paid_content_attachments a
                JOIN paid_content_posts p ON a.post_id = p.id
                JOIN paid_content_creators c ON p.creator_id = c.id
                WHERE a.status = 'completed' AND a.local_path IS NOT NULL
            """)
            attachments = cursor.fetchall()
            missing_ids = []
            for att in attachments:
                stats['scanned'] += 1
                local_path = att['local_path']
                # Path.exists() is also False for broken symlinks — counted as missing
                if local_path and not Path(local_path).exists():
                    stats['missing'] += 1
                    missing_ids.append(att['id'])
                    stats['missing_files'].append({
                        'id': att['id'],
                        'name': att['name'],
                        'path': local_path,
                        'post_title': att['post_title'],
                        'creator': f"{att['username']} ({att['platform']})"
                    })
                else:
                    # NULL/empty local_path rows also land here and count as found
                    stats['found'] += 1
        # Update status for missing files (single batched UPDATE)
        if missing_ids:
            with self.unified_db.get_connection(for_write=True) as conn:
                cursor = conn.cursor()
                placeholders = ','.join(['?'] * len(missing_ids))
                cursor.execute(f"""
                    UPDATE paid_content_attachments
                    SET status = 'missing',
                        error_message = 'File not found on disk during scan'
                    WHERE id IN ({placeholders})
                """, missing_ids)
                conn.commit()
        return stats
def get_missing_attachments(self, limit: int = 100) -> List[Dict]:
"""Get attachments with missing files"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(f"""
SELECT {self.ATTACHMENT_COLUMNS}, p.title as post_title, p.post_id as api_post_id,
c.username, c.platform, c.service_id
FROM paid_content_attachments a
JOIN paid_content_posts p ON a.post_id = p.id
JOIN paid_content_creators c ON p.creator_id = c.id
WHERE a.status = 'missing'
ORDER BY a.id DESC
LIMIT ?
""", (limit,))
return [dict(row) for row in cursor.fetchall()]
def reset_missing_to_pending(self, attachment_ids: List[int] = None) -> int:
"""Reset missing attachments to pending for re-download"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
if attachment_ids:
placeholders = ','.join(['?'] * len(attachment_ids))
cursor.execute(f"""
UPDATE paid_content_attachments
SET status = 'pending',
local_path = NULL,
local_filename = NULL,
file_hash = NULL,
error_message = NULL,
download_attempts = 0
WHERE id IN ({placeholders}) AND status = 'missing'
""", attachment_ids)
else:
# Reset all missing
cursor.execute("""
UPDATE paid_content_attachments
SET status = 'pending',
local_path = NULL,
local_filename = NULL,
file_hash = NULL,
error_message = NULL,
download_attempts = 0
WHERE status = 'missing'
""")
conn.commit()
return cursor.rowcount
# ============== TAGS ==============
def get_tags(self, creator_ids: List[int] = None) -> List[Dict]:
"""Get all tags with post counts, optionally filtered by creator IDs"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
if creator_ids:
placeholders = ','.join('?' for _ in creator_ids)
cursor.execute(f"""
SELECT t.*, COUNT(DISTINCT pt.post_id) as post_count
FROM paid_content_tags t
JOIN paid_content_post_tags pt ON t.id = pt.tag_id
JOIN paid_content_posts p ON pt.post_id = p.id
WHERE p.creator_id IN ({placeholders})
GROUP BY t.id
ORDER BY t.name COLLATE NOCASE ASC
""", creator_ids)
else:
cursor.execute("""
SELECT t.*, COUNT(pt.post_id) as post_count
FROM paid_content_tags t
LEFT JOIN paid_content_post_tags pt ON t.id = pt.tag_id
GROUP BY t.id
ORDER BY t.name COLLATE NOCASE ASC
""")
return [dict(row) for row in cursor.fetchall()]
def get_tag(self, tag_id: int) -> Optional[Dict]:
"""Get single tag"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM paid_content_tags WHERE id = ?", (tag_id,))
row = cursor.fetchone()
return dict(row) if row else None
def get_tag_by_slug(self, slug: str) -> Optional[Dict]:
"""Get tag by slug"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM paid_content_tags WHERE slug = ?", (slug,))
row = cursor.fetchone()
return dict(row) if row else None
def create_tag(self, name: str, color: str = "#6b7280", description: str = None) -> Optional[int]:
"""Create a new tag"""
slug = name.lower().replace(' ', '-').replace('/', '-')
now = datetime.now().isoformat()
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
try:
cursor.execute("""
INSERT INTO paid_content_tags (name, slug, color, description, created_at)
VALUES (?, ?, ?, ?, ?)
""", (name, slug, color, description, now))
conn.commit()
return cursor.lastrowid
except Exception:
return None
def update_tag(self, tag_id: int, updates: Dict) -> bool:
"""Update a tag"""
allowed = {'name', 'color', 'description'}
filtered = {k: v for k, v in updates.items() if k in allowed}
if not filtered:
return False
# Update slug if name changed
if 'name' in filtered:
filtered['slug'] = filtered['name'].lower().replace(' ', '-').replace('/', '-')
set_clause = ', '.join(f"{k} = ?" for k in filtered.keys())
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute(
f"UPDATE paid_content_tags SET {set_clause} WHERE id = ?",
[*filtered.values(), tag_id]
)
conn.commit()
return cursor.rowcount > 0
def delete_tag(self, tag_id: int) -> bool:
"""Delete a tag"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM paid_content_tags WHERE id = ?", (tag_id,))
conn.commit()
return cursor.rowcount > 0
def get_post_tags(self, post_id: int) -> List[Dict]:
"""Get tags for a specific post"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT t.*
FROM paid_content_tags t
JOIN paid_content_post_tags pt ON t.id = pt.tag_id
WHERE pt.post_id = ?
ORDER BY t.name
""", (post_id,))
return [dict(row) for row in cursor.fetchall()]
def add_tag_to_post(self, post_id: int, tag_id: int) -> bool:
"""Add a tag to a post"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
try:
cursor.execute("""
INSERT OR IGNORE INTO paid_content_post_tags (post_id, tag_id, created_at)
VALUES (?, ?, ?)
""", (post_id, tag_id, datetime.now().isoformat()))
conn.commit()
return cursor.rowcount > 0
except Exception:
return False
def remove_tag_from_post(self, post_id: int, tag_id: int) -> bool:
"""Remove a tag from a post"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute("""
DELETE FROM paid_content_post_tags WHERE post_id = ? AND tag_id = ?
""", (post_id, tag_id))
conn.commit()
return cursor.rowcount > 0
def set_post_tags(self, post_id: int, tag_ids: List[int]) -> None:
"""Set tags for a post (replaces existing tags)"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
# Remove existing tags
cursor.execute("DELETE FROM paid_content_post_tags WHERE post_id = ?", (post_id,))
# Add new tags
now = datetime.now().isoformat()
for tag_id in tag_ids:
cursor.execute("""
INSERT OR IGNORE INTO paid_content_post_tags (post_id, tag_id, created_at)
VALUES (?, ?, ?)
""", (post_id, tag_id, now))
conn.commit()
# ============== TAGGED USERS ==============
def set_post_tagged_users(self, post_id: int, usernames: List[str]) -> None:
"""Set tagged users for a post (replaces existing)"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM paid_content_post_tagged_users WHERE post_id = ?", (post_id,))
now = datetime.now().isoformat()
for username in usernames:
cursor.execute("""
INSERT OR IGNORE INTO paid_content_post_tagged_users (post_id, username, created_at)
VALUES (?, ?, ?)
""", (post_id, username, now))
conn.commit()
def add_tagged_user(self, post_id: int, username: str) -> None:
"""Add a single tagged user to a post (without removing existing ones)"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
now = datetime.now().isoformat()
cursor.execute("""
INSERT OR IGNORE INTO paid_content_post_tagged_users (post_id, username, created_at)
VALUES (?, ?, ?)
""", (post_id, username, now))
conn.commit()
def get_post_tagged_users(self, post_id: int) -> List[str]:
"""Get tagged usernames for a post"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT username FROM paid_content_post_tagged_users
WHERE post_id = ? ORDER BY username
""", (post_id,))
return [row[0] for row in cursor.fetchall()]
def get_all_tagged_usernames(self, creator_ids: List[int] = None) -> List[Dict]:
"""Get all distinct tagged usernames with post counts, optionally filtered by creator IDs"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
if creator_ids:
placeholders = ','.join('?' for _ in creator_ids)
cursor.execute(f"""
SELECT username, COUNT(*) as post_count
FROM paid_content_post_tagged_users tu
JOIN paid_content_posts p ON tu.post_id = p.id
WHERE p.deleted_at IS NULL AND p.creator_id IN ({placeholders})
GROUP BY username
ORDER BY username ASC
""", creator_ids)
else:
cursor.execute("""
SELECT username, COUNT(*) as post_count
FROM paid_content_post_tagged_users tu
JOIN paid_content_posts p ON tu.post_id = p.id
WHERE p.deleted_at IS NULL
GROUP BY username
ORDER BY username ASC
""")
return [{'username': row[0], 'post_count': row[1]} for row in cursor.fetchall()]
def get_content_types(self, creator_ids: List[int] = None) -> List[str]:
"""Get distinct content types (from attachments), optionally filtered by creator IDs"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
if creator_ids:
placeholders = ','.join('?' for _ in creator_ids)
cursor.execute(f"""
SELECT DISTINCT a.file_type
FROM paid_content_attachments a
JOIN paid_content_posts p ON a.post_id = p.id
WHERE p.deleted_at IS NULL AND a.file_type IS NOT NULL
AND p.creator_id IN ({placeholders})
ORDER BY a.file_type ASC
""", creator_ids)
else:
cursor.execute("""
SELECT DISTINCT a.file_type
FROM paid_content_attachments a
JOIN paid_content_posts p ON a.post_id = p.id
WHERE p.deleted_at IS NULL AND a.file_type IS NOT NULL
ORDER BY a.file_type ASC
""")
return [row[0] for row in cursor.fetchall()]
# ============== MESSAGES ==============
def upsert_message(self, creator_id: int, msg_data: Dict) -> tuple:
"""Insert or update message, returns (message_db_id, is_new)"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT id FROM paid_content_messages
WHERE creator_id = ? AND message_id = ?
""", (creator_id, msg_data['message_id']))
existing = cursor.fetchone()
if existing:
msg_db_id = existing[0]
update_data = {k: v for k, v in msg_data.items() if k != 'message_id' and v is not None}
if update_data:
self._validate_columns(set(update_data.keys()), self.ALLOWED_MESSAGE_COLUMNS, 'paid_content_messages')
set_clause = ', '.join(f"{k} = ?" for k in update_data.keys())
cursor.execute(
f"UPDATE paid_content_messages SET {set_clause} WHERE id = ?",
[*update_data.values(), msg_db_id]
)
conn.commit()
return (msg_db_id, False)
else:
msg_data['creator_id'] = creator_id
msg_data['created_at'] = datetime.now().isoformat()
# Filter to allowed columns + internal columns
allowed_insert = self.ALLOWED_MESSAGE_COLUMNS | {'creator_id', 'created_at'}
filtered = {k: v for k, v in msg_data.items() if k in allowed_insert}
columns = ', '.join(filtered.keys())
placeholders = ', '.join(['?'] * len(filtered))
cursor.execute(
f"INSERT INTO paid_content_messages ({columns}) VALUES ({placeholders})",
list(filtered.values())
)
conn.commit()
return (cursor.lastrowid, True)
def upsert_message_attachment(self, message_id: int, data: Dict) -> int:
"""Insert or update attachment for a message (dedup on message_id + server_path).
Message attachments have post_id = NULL (they don't belong to a post).
"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT id, status FROM paid_content_attachments
WHERE message_id = ? AND server_path = ?
""", (message_id, data.get('server_path')))
existing = cursor.fetchone()
if existing:
att_id, status = existing['id'], existing['status']
# Update download_url for pending/failed attachments (URLs expire)
if status in ('pending', 'failed', 'skipped') and data.get('download_url'):
updates = ['download_url = ?']
params = [data['download_url']]
if status in ('failed', 'skipped'):
updates.extend(['status = ?', 'error_message = ?', 'download_attempts = ?'])
params.extend(['pending', None, 0])
params.append(att_id)
cursor.execute(f"""
UPDATE paid_content_attachments
SET {', '.join(updates)}
WHERE id = ?
""", params)
conn.commit()
return att_id
# Insert new — post_id is NULL for message attachments
data['message_id'] = message_id
data['created_at'] = datetime.now().isoformat()
allowed_insert = self.ALLOWED_ATTACHMENT_COLUMNS | {'message_id', 'created_at'}
filtered = {k: v for k, v in data.items() if k in allowed_insert and k != 'post_id'}
columns = ', '.join(filtered.keys())
placeholders = ', '.join(['?'] * len(filtered))
try:
cursor.execute(
f"INSERT INTO paid_content_attachments ({columns}) VALUES ({placeholders})",
list(filtered.values())
)
conn.commit()
return cursor.lastrowid
except Exception as e:
logger.error(f"Error inserting message attachment: {e}")
return 0
def get_conversations(self, creator_id: int = None, service_id: str = None,
search: str = None, limit: int = 50, offset: int = 0) -> List[Dict]:
"""Get conversation list grouped by creator with message stats"""
query = """
SELECT c.id as creator_id, c.username, c.display_name, c.platform,
c.service_id, c.profile_image_url,
COUNT(m.id) as message_count,
SUM(CASE WHEN m.is_read = 0 THEN 1 ELSE 0 END) as unread_count,
MAX(m.sent_at) as last_message_at,
(SELECT m2.text FROM paid_content_messages m2
WHERE m2.creator_id = c.id
ORDER BY m2.sent_at DESC LIMIT 1) as last_message_text,
(SELECT m2.is_from_creator FROM paid_content_messages m2
WHERE m2.creator_id = c.id
ORDER BY m2.sent_at DESC LIMIT 1) as last_message_from_creator
FROM paid_content_creators c
INNER JOIN paid_content_messages m ON c.id = m.creator_id
WHERE 1=1
"""
params = []
if creator_id:
query += " AND c.id = ?"
params.append(creator_id)
if service_id:
query += " AND c.service_id = ?"
params.append(service_id)
if search:
query += " AND (c.username LIKE ? OR c.display_name LIKE ?)"
params.extend([f"%{search}%", f"%{search}%"])
query += " GROUP BY c.id ORDER BY last_message_at DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
def get_messages(self, creator_id: int, before: str = None, limit: int = 50) -> List[Dict]:
"""Get paginated messages for a conversation, newest first, with attachments"""
query = """
SELECT m.* FROM paid_content_messages m
WHERE m.creator_id = ?
"""
params = [creator_id]
if before:
query += " AND m.sent_at < ?"
params.append(before)
query += " ORDER BY m.sent_at DESC LIMIT ?"
params.append(limit)
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(query, params)
messages = [dict(row) for row in cursor.fetchall()]
# Fetch attachments for each message
for msg in messages:
cursor.execute("""
SELECT id, message_id, attachment_index, name, file_type, extension,
server_path, download_url, file_size, width, height, duration,
status, local_path, local_filename, file_hash,
error_message, download_attempts, created_at, downloaded_at
FROM paid_content_attachments
WHERE message_id = ? ORDER BY attachment_index
""", (msg['id'],))
atts = [dict(row) for row in cursor.fetchall()]
# Resolve local_path for attachments that are copies of post media
for att in atts:
if not att.get('local_path') and att.get('file_hash') and att.get('status') in ('completed', 'duplicate'):
cursor.execute("""
SELECT local_path FROM paid_content_attachments
WHERE file_hash = ? AND local_path IS NOT NULL AND local_path != '' AND id != ?
LIMIT 1
""", (att['file_hash'], att['id']))
orig = cursor.fetchone()
if orig:
att['local_path'] = orig['local_path']
msg['attachments'] = atts
return messages
def get_message_count(self, creator_id: int) -> int:
"""Get total message count for a creator"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT COUNT(*) FROM paid_content_messages WHERE creator_id = ?",
(creator_id,)
)
return cursor.fetchone()[0]
def get_total_unread_messages_count(self) -> int:
"""Get total count of unread messages across all creators."""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM paid_content_messages WHERE is_read = 0 AND is_from_creator = 1')
return cursor.fetchone()[0]
def mark_all_messages_read(self) -> int:
"""Mark all unread messages as read across all creators."""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute('UPDATE paid_content_messages SET is_read = 1 WHERE is_read = 0')
conn.commit()
return cursor.rowcount
def mark_messages_read(self, creator_id: int, message_ids: list = None) -> int:
"""Mark messages as read. If message_ids provided, marks only those; otherwise marks all."""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
if message_ids:
placeholders = ','.join('?' for _ in message_ids)
cursor.execute(
f"UPDATE paid_content_messages SET is_read = 1 WHERE creator_id = ? AND id IN ({placeholders}) AND is_read = 0",
[creator_id] + list(message_ids)
)
else:
cursor.execute(
"UPDATE paid_content_messages SET is_read = 1 WHERE creator_id = ? AND is_read = 0",
(creator_id,)
)
conn.commit()
return cursor.rowcount
def get_pending_message_attachments(self, creator_id: int = None) -> List[Dict]:
"""Get pending message attachments for download"""
query = f"""
SELECT {self.ATTACHMENT_COLUMNS}, m.message_id as msg_api_id,
c.id as creator_db_id, c.username, c.platform, c.service_id
FROM paid_content_attachments a
JOIN paid_content_messages m ON a.message_id = m.id
JOIN paid_content_creators c ON m.creator_id = c.id
WHERE a.status = 'pending' AND a.message_id IS NOT NULL
"""
params = []
if creator_id:
query += " AND c.id = ?"
params.append(creator_id)
query += " ORDER BY m.sent_at DESC"
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
# ========================================================================
# CREATOR GROUPS
# ========================================================================
def get_creator_groups(self) -> List[Dict]:
"""List all creator groups with member count"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT g.*,
COUNT(m.creator_id) as member_count
FROM paid_content_creator_groups g
LEFT JOIN paid_content_creator_group_members m ON g.id = m.group_id
GROUP BY g.id
ORDER BY g.name
""")
return [dict(row) for row in cursor.fetchall()]
def get_creator_group(self, group_id: int) -> Optional[Dict]:
"""Get a single group with full member details"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM paid_content_creator_groups WHERE id = ?", (group_id,))
group = cursor.fetchone()
if not group:
return None
group = dict(group)
cursor.execute("""
SELECT c.id, c.username, c.display_name, c.platform, c.service_id,
c.profile_image_url, c.post_count, c.enabled,
m.added_at, m.filter_tagged_users, m.filter_tag_ids
FROM paid_content_creator_group_members m
JOIN paid_content_creators c ON m.creator_id = c.id
WHERE m.group_id = ?
ORDER BY c.username
""", (group_id,))
group['members'] = [dict(row) for row in cursor.fetchall()]
group['member_count'] = len(group['members'])
return group
def create_creator_group(self, name: str, description: str = None) -> Dict:
"""Create a new creator group"""
now = datetime.now().isoformat()
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute(
"INSERT INTO paid_content_creator_groups (name, description, created_at, updated_at) VALUES (?, ?, ?, ?)",
(name, description, now, now)
)
conn.commit()
group_id = cursor.lastrowid
return {'id': group_id, 'name': name, 'description': description, 'member_count': 0, 'created_at': now, 'updated_at': now}
def update_creator_group(self, group_id: int, name: str = None, description: str = None) -> bool:
"""Update a creator group"""
updates = {}
if name is not None:
updates['name'] = name
if description is not None:
updates['description'] = description
if not updates:
return False
updates['updated_at'] = datetime.now().isoformat()
set_clause = ', '.join(f"{k} = ?" for k in updates.keys())
values = list(updates.values()) + [group_id]
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute(
f"UPDATE paid_content_creator_groups SET {set_clause} WHERE id = ?",
values
)
conn.commit()
return cursor.rowcount > 0
def delete_creator_group(self, group_id: int) -> bool:
"""Delete a creator group (CASCADE removes members)"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM paid_content_creator_groups WHERE id = ?", (group_id,))
conn.commit()
return cursor.rowcount > 0
def add_creator_to_group(self, group_id: int, creator_id: int) -> bool:
"""Add a creator to a group"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute(
"INSERT OR IGNORE INTO paid_content_creator_group_members (group_id, creator_id, added_at) VALUES (?, ?, ?)",
(group_id, creator_id, datetime.now().isoformat())
)
conn.commit()
return cursor.rowcount > 0
def update_pinned_posts(self, creator_id: int, pinned_post_ids: dict):
"""Update is_pinned and pinned_at for posts by their platform post_id.
pinned_post_ids: dict mapping post_id (str) -> {'pinned_at': str or None}
"""
if not pinned_post_ids:
return
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
# First, clear pinned status for any posts no longer pinned
placeholders = ', '.join(['?'] * len(pinned_post_ids))
cursor.execute(
f"UPDATE paid_content_posts SET is_pinned = 0, pinned_at = NULL "
f"WHERE creator_id = ? AND is_pinned = 1 AND post_id NOT IN ({placeholders})",
[creator_id] + list(pinned_post_ids.keys())
)
# Then set pinned status for current pinned posts
for post_id_str, info in pinned_post_ids.items():
cursor.execute(
"UPDATE paid_content_posts SET is_pinned = 1, pinned_at = ? "
"WHERE creator_id = ? AND post_id = ? AND is_pinned = 0",
(info.get('pinned_at'), creator_id, post_id_str)
)
conn.commit()
def remove_creator_from_group(self, group_id: int, creator_id: int) -> bool:
"""Remove a creator from a group"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute(
"DELETE FROM paid_content_creator_group_members WHERE group_id = ? AND creator_id = ?",
(group_id, creator_id)
)
conn.commit()
return cursor.rowcount > 0
def update_group_member_filters(self, group_id: int, creator_id: int,
filter_tagged_users: str = None, filter_tag_ids: str = None) -> bool:
"""Update per-member filter overrides for a group member"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute(
"""UPDATE paid_content_creator_group_members
SET filter_tagged_users = ?, filter_tag_ids = ?
WHERE group_id = ? AND creator_id = ?""",
(filter_tagged_users, filter_tag_ids, group_id, creator_id)
)
conn.commit()
return cursor.rowcount > 0
# =========================================================================
# AUTO-TAG RULES
# =========================================================================
def get_auto_tag_rules(self) -> List[Dict]:
"""Get all auto-tag rules ordered by priority"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT * FROM paid_content_auto_tag_rules ORDER BY priority DESC, id ASC"
)
return [dict(row) for row in cursor.fetchall()]
def get_auto_tag_rule(self, rule_id: int) -> Optional[Dict]:
"""Get a single auto-tag rule by ID"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM paid_content_auto_tag_rules WHERE id = ?", (rule_id,))
row = cursor.fetchone()
return dict(row) if row else None
def create_auto_tag_rule(self, name: str, conditions: Dict, tag_ids: List[int], priority: int = 0) -> int:
"""Create a new auto-tag rule"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
now = datetime.utcnow().isoformat()
cursor.execute(
"""INSERT INTO paid_content_auto_tag_rules (name, conditions, tag_ids, priority, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)""",
(name, json.dumps(conditions), json.dumps(tag_ids), priority, now, now)
)
conn.commit()
return cursor.lastrowid
def update_auto_tag_rule(self, rule_id: int, updates: Dict) -> bool:
"""Update an auto-tag rule"""
allowed = {'name', 'enabled', 'conditions', 'tag_ids', 'priority'}
parts = []
values = []
for key, value in updates.items():
if key in allowed:
if key == 'conditions':
value = json.dumps(value) if isinstance(value, dict) else value
elif key == 'tag_ids':
value = json.dumps(value) if isinstance(value, list) else value
parts.append(f"{key} = ?")
values.append(value)
if not parts:
return False
parts.append("updated_at = ?")
values.append(datetime.utcnow().isoformat())
values.append(rule_id)
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute(
f"UPDATE paid_content_auto_tag_rules SET {', '.join(parts)} WHERE id = ?",
values
)
conn.commit()
return cursor.rowcount > 0
def delete_auto_tag_rule(self, rule_id: int) -> bool:
"""Delete an auto-tag rule"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM paid_content_auto_tag_rules WHERE id = ?", (rule_id,))
conn.commit()
return cursor.rowcount > 0
    def evaluate_rules_for_post(self, post_id: int) -> List[int]:
        """Evaluate all enabled auto-tag rules against a post.

        Loads the post joined with its creator, the post's non-duplicate
        attachments, and every enabled rule, then delegates the per-rule
        check to _rule_matches. Returns the de-duplicated tag IDs from all
        matching rules (order not guaranteed); an unknown post_id yields [].
        """
        with self.unified_db.get_connection() as conn:
            cursor = conn.cursor()
            # Get post data with creator info
            cursor.execute("""
                SELECT p.id, p.title, p.content, p.creator_id,
                       c.platform, c.username
                FROM paid_content_posts p
                JOIN paid_content_creators c ON p.creator_id = c.id
                WHERE p.id = ?
            """, (post_id,))
            post = cursor.fetchone()
            if not post:
                return []
            post = dict(post)
            # Get attachment info for content_type and resolution checks
            # (duplicate rows are excluded so they cannot satisfy a rule)
            cursor.execute("""
                SELECT file_type, width, height FROM paid_content_attachments
                WHERE post_id = ? AND status != 'duplicate'
            """, (post_id,))
            attachments = [dict(r) for r in cursor.fetchall()]
            # Get enabled rules
            cursor.execute(
                "SELECT * FROM paid_content_auto_tag_rules WHERE enabled = 1 ORDER BY priority DESC"
            )
            rules = [dict(r) for r in cursor.fetchall()]
            matching_tag_ids = set()
            for rule in rules:
                # Columns may hold JSON strings or already-decoded values
                conditions = json.loads(rule['conditions']) if isinstance(rule['conditions'], str) else rule['conditions']
                tag_ids = json.loads(rule['tag_ids']) if isinstance(rule['tag_ids'], str) else rule['tag_ids']
                if self._rule_matches(conditions, post, attachments):
                    matching_tag_ids.update(tag_ids)
            return list(matching_tag_ids)
def _rule_matches(self, conditions: Dict, post: Dict, attachments: List[Dict]) -> bool:
"""Check if a rule's conditions match a post"""
if not conditions:
return False
if 'creator_id' in conditions and conditions['creator_id']:
if post['creator_id'] != conditions['creator_id']:
return False
if 'platform' in conditions and conditions['platform']:
if post.get('platform', '').lower() != conditions['platform'].lower():
return False
if 'content_type' in conditions and conditions['content_type']:
target = conditions['content_type'].lower()
has_type = any(
(a.get('file_type') or '').lower() == target for a in attachments
)
if not has_type:
return False
if 'min_resolution' in conditions and conditions['min_resolution']:
res_map = {'720p': 720, '1080p': 1080, '1440p': 1440, '4k': 2160}
min_h = res_map.get(conditions['min_resolution'], 0)
has_res = any(
(a.get('height') or 0) >= min_h for a in attachments
)
if not has_res:
return False
if 'title_contains' in conditions and conditions['title_contains']:
needle = conditions['title_contains'].lower()
title = (post.get('title') or '').lower()
if needle not in title:
return False
if 'description_contains' in conditions and conditions['description_contains']:
needle = conditions['description_contains'].lower()
content = (post.get('content') or '').lower()
if needle not in content:
return False
return True
    def apply_auto_tags_to_post(self, post_id: int) -> int:
        """Evaluate rules and apply matching tags to a post.

        Inserts a post/tag link for every tag returned by
        evaluate_rules_for_post, then (only if at least one new link was
        inserted) re-evaluates all enabled rules to bump their match_count.
        Returns the number of newly inserted tag links (existing links do
        not count).
        """
        tag_ids = self.evaluate_rules_for_post(post_id)
        if not tag_ids:
            return 0
        applied = 0
        with self.unified_db.get_connection(for_write=True) as conn:
            cursor = conn.cursor()
            for tag_id in tag_ids:
                try:
                    # OR IGNORE already makes duplicates a no-op; rowcount
                    # distinguishes a real insert from an ignored one.
                    cursor.execute(
                        "INSERT OR IGNORE INTO paid_content_post_tags (post_id, tag_id) VALUES (?, ?)",
                        (post_id, tag_id)
                    )
                    if cursor.rowcount > 0:
                        applied += 1
                except Exception:
                    # NOTE(review): broad best-effort swallow — any failure
                    # (FK violation, locked DB) is silently skipped; consider
                    # narrowing to the driver's IntegrityError.
                    pass
            # Increment match counts for matching rules
            if applied > 0:
                cursor.execute(
                    "SELECT id, conditions, tag_ids FROM paid_content_auto_tag_rules WHERE enabled = 1"
                )
                rules = [dict(r) for r in cursor.fetchall()]
                # Get post data for re-evaluation
                cursor.execute("""
                    SELECT p.id, p.title, p.content, p.creator_id, c.platform, c.username
                    FROM paid_content_posts p JOIN paid_content_creators c ON p.creator_id = c.id
                    WHERE p.id = ?
                """, (post_id,))
                post = cursor.fetchone()
                if post:
                    post = dict(post)
                    cursor.execute(
                        "SELECT file_type, width, height FROM paid_content_attachments WHERE post_id = ? AND status != 'duplicate'",
                        (post_id,)
                    )
                    atts = [dict(r) for r in cursor.fetchall()]
                    for rule in rules:
                        # conditions may be stored as JSON text or decoded already
                        conds = json.loads(rule['conditions']) if isinstance(rule['conditions'], str) else rule['conditions']
                        if self._rule_matches(conds, post, atts):
                            cursor.execute(
                                "UPDATE paid_content_auto_tag_rules SET match_count = match_count + 1 WHERE id = ?",
                                (rule['id'],)
                            )
            conn.commit()
        return applied
def run_rules_on_existing_posts(self, rule_id: Optional[int] = None) -> Dict:
"""Run auto-tag rules on all existing posts. Returns stats."""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
if rule_id:
cursor.execute("SELECT * FROM paid_content_auto_tag_rules WHERE id = ? AND enabled = 1", (rule_id,))
else:
cursor.execute("SELECT * FROM paid_content_auto_tag_rules WHERE enabled = 1 ORDER BY priority DESC")
rules = [dict(r) for r in cursor.fetchall()]
if not rules:
return {'posts_checked': 0, 'tags_applied': 0, 'rules_matched': 0}
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT id FROM paid_content_posts WHERE deleted_at IS NULL")
post_ids = [row['id'] for row in cursor.fetchall()]
posts_checked = 0
tags_applied = 0
rules_matched = set()
for pid in post_ids:
posts_checked += 1
count = self.apply_auto_tags_to_post(pid)
if count > 0:
tags_applied += count
return {
'posts_checked': posts_checked,
'tags_applied': tags_applied,
'rules_count': len(rules)
}
# =========================================================================
# ANALYTICS
# =========================================================================
def get_storage_growth_over_time(self, days: int = 30) -> List[Dict]:
"""Get daily cumulative storage from downloaded_at"""
days = int(days) # Sanitize
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(f"""
SELECT DATE(datetime(downloaded_at)) as date,
SUM(file_size) as daily_bytes,
COUNT(*) as daily_count
FROM paid_content_attachments
WHERE downloaded_at IS NOT NULL
AND file_size IS NOT NULL
AND datetime(downloaded_at) >= datetime('now', '-{days} days')
GROUP BY DATE(datetime(downloaded_at))
ORDER BY date ASC
""")
rows = [dict(r) for r in cursor.fetchall()]
# Calculate cumulative totals
# First get total bytes before the window
cursor.execute(f"""
SELECT COALESCE(SUM(file_size), 0) as prior_bytes
FROM paid_content_attachments
WHERE downloaded_at IS NOT NULL AND file_size IS NOT NULL
AND datetime(downloaded_at) < datetime('now', '-{days} days')
""")
prior = cursor.fetchone()
cumulative = prior['prior_bytes'] if prior else 0
result = []
for row in rows:
cumulative += row['daily_bytes'] or 0
result.append({
'date': row['date'],
'daily_bytes': row['daily_bytes'] or 0,
'daily_count': row['daily_count'],
'cumulative_bytes': cumulative
})
return result
def get_downloads_per_period(self, period: str = 'day', days: int = 30) -> List[Dict]:
"""Get download counts per day or week"""
days = int(days) # Sanitize
if period == 'week':
date_expr = "strftime('%Y-W%W', downloaded_at)"
else:
date_expr = "DATE(datetime(downloaded_at))"
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(f"""
SELECT {date_expr} as period,
COUNT(*) as count,
COALESCE(SUM(file_size), 0) as bytes
FROM paid_content_attachments
WHERE downloaded_at IS NOT NULL
AND datetime(downloaded_at) >= datetime('now', '-{days} days')
GROUP BY {date_expr}
ORDER BY period ASC
""")
return [dict(r) for r in cursor.fetchall()]
    def get_storage_by_creator(self, limit: int = 20) -> List[Dict]:
        """Get top creators by storage usage.

        Sums attachment file_size per creator across non-deleted posts;
        creators with zero stored bytes are filtered out by the HAVING
        clause. Returns up to `limit` rows ordered by total bytes.
        """
        with self.unified_db.get_connection() as conn:
            cursor = conn.cursor()
            # NOTE: file_count only counts rows with status='completed',
            # while total_bytes sums any attachment with a file_size value.
            cursor.execute("""
                SELECT c.id, c.username, c.display_name, c.platform, c.profile_image_url,
                       COALESCE(SUM(a.file_size), 0) as total_bytes,
                       COUNT(CASE WHEN a.status = 'completed' THEN 1 END) as file_count
                FROM paid_content_creators c
                LEFT JOIN paid_content_posts p ON p.creator_id = c.id AND p.deleted_at IS NULL
                LEFT JOIN paid_content_attachments a ON a.post_id = p.id AND a.file_size IS NOT NULL
                GROUP BY c.id, c.username, c.display_name, c.platform, c.profile_image_url
                HAVING COALESCE(SUM(a.file_size), 0) > 0
                ORDER BY total_bytes DESC
                LIMIT ?
            """, (limit,))
            return [dict(r) for r in cursor.fetchall()]
    def get_platform_distribution(self) -> List[Dict]:
        """Get content count and storage by platform.

        Uses an INNER JOIN to posts, so platforms whose creators have no
        non-deleted posts do not appear. file_count counts completed
        attachments; total_bytes sums any attachment with a file_size.
        """
        with self.unified_db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT c.platform,
                       COUNT(DISTINCT p.id) as post_count,
                       COUNT(CASE WHEN a.status = 'completed' THEN 1 END) as file_count,
                       COALESCE(SUM(a.file_size), 0) as total_bytes
                FROM paid_content_creators c
                JOIN paid_content_posts p ON p.creator_id = c.id AND p.deleted_at IS NULL
                LEFT JOIN paid_content_attachments a ON a.post_id = p.id AND a.file_size IS NOT NULL
                GROUP BY c.platform
                ORDER BY total_bytes DESC
            """)
            return [dict(r) for r in cursor.fetchall()]
def get_content_type_distribution(self) -> List[Dict]:
"""Get content count and storage by file type"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT COALESCE(file_type, 'unknown') as content_type,
COUNT(*) as count,
COALESCE(SUM(file_size), 0) as total_bytes
FROM paid_content_attachments
WHERE status = 'completed'
GROUP BY file_type
ORDER BY total_bytes DESC
""")
return [dict(r) for r in cursor.fetchall()]
    def get_creator_scorecards(self, sort_by: str = 'total_bytes', limit: int = 50) -> List[Dict]:
        """Get per-creator stats table.

        Returns one row per creator that has at least one completed
        attachment: post/file/image/video counts, total bytes and the most
        recent download timestamp. sort_by is validated against a whitelist
        before being interpolated into ORDER BY, so the f-string is safe.
        """
        allowed_sorts = {'total_bytes', 'file_count', 'post_count', 'username', 'latest_download'}
        if sort_by not in allowed_sorts:
            sort_by = 'total_bytes'
        with self.unified_db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(f"""
                SELECT c.id, c.username, c.display_name, c.platform, c.profile_image_url,
                       COUNT(DISTINCT p.id) as post_count,
                       COUNT(CASE WHEN a.status = 'completed' THEN 1 END) as file_count,
                       COALESCE(SUM(a.file_size), 0) as total_bytes,
                       MAX(a.downloaded_at) as latest_download,
                       COUNT(CASE WHEN a.file_type = 'image' AND a.status = 'completed' THEN 1 END) as image_count,
                       COUNT(CASE WHEN a.file_type = 'video' AND a.status = 'completed' THEN 1 END) as video_count
                FROM paid_content_creators c
                LEFT JOIN paid_content_posts p ON p.creator_id = c.id AND p.deleted_at IS NULL
                LEFT JOIN paid_content_attachments a ON a.post_id = p.id
                GROUP BY c.id, c.username, c.display_name, c.platform, c.profile_image_url
                HAVING COUNT(CASE WHEN a.status = 'completed' THEN 1 END) > 0
                ORDER BY {sort_by} DESC
                LIMIT ?
            """, (limit,))
            return [dict(r) for r in cursor.fetchall()]
# =========================================================================
# WATCH LATER
# =========================================================================
    def get_watch_later(self) -> List[Dict]:
        """Get all watch later items with full metadata.

        Joins the queue rows to their attachment, post and creator so the
        UI can render each entry without extra lookups. INNER JOINs mean
        entries whose attachment/post/creator row is gone are dropped.
        Ordered by the user-defined position, then insertion time.
        """
        with self.unified_db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT wl.id, wl.attachment_id, wl.post_id, wl.creator_id, wl.position, wl.added_at,
                       a.name as filename, a.file_type, a.extension, a.file_size,
                       a.width, a.height, a.duration, a.status, a.local_path, a.local_filename,
                       p.title as post_title, p.post_id as external_post_id,
                       c.username, c.display_name, c.platform, c.profile_image_url
                FROM paid_content_watch_later wl
                JOIN paid_content_attachments a ON wl.attachment_id = a.id
                JOIN paid_content_posts p ON wl.post_id = p.id
                JOIN paid_content_creators c ON wl.creator_id = c.id
                ORDER BY wl.position ASC, wl.added_at ASC
            """)
            return [dict(r) for r in cursor.fetchall()]
    def add_to_watch_later(self, attachment_id: int) -> Optional[int]:
        """Add an attachment to watch later. Returns ID or None if already exists.

        Denormalises post_id/creator_id from the attachment row and appends
        the entry at the end of the queue (max position + 1). Returns None
        when the attachment does not exist or the insert fails.
        """
        with self.unified_db.get_connection(for_write=True) as conn:
            cursor = conn.cursor()
            # Get post_id and creator_id from attachment
            cursor.execute("""
                SELECT a.id, a.post_id, p.creator_id
                FROM paid_content_attachments a
                JOIN paid_content_posts p ON a.post_id = p.id
                WHERE a.id = ?
            """, (attachment_id,))
            att = cursor.fetchone()
            if not att:
                return None
            # Get next position (queue is append-only at the tail)
            cursor.execute("SELECT COALESCE(MAX(position), -1) + 1 as next_pos FROM paid_content_watch_later")
            next_pos = cursor.fetchone()['next_pos']
            try:
                cursor.execute(
                    """INSERT INTO paid_content_watch_later (attachment_id, post_id, creator_id, position)
                       VALUES (?, ?, ?, ?)""",
                    (attachment_id, att['post_id'], att['creator_id'], next_pos)
                )
                conn.commit()
                return cursor.lastrowid
            except Exception:
                # Already exists (UNIQUE constraint)
                # NOTE(review): any failure — not only the UNIQUE race — is
                # treated as "already present"; consider narrowing this to
                # the driver's IntegrityError.
                return None
def remove_from_watch_later(self, attachment_id: int) -> bool:
"""Remove an attachment from watch later"""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM paid_content_watch_later WHERE attachment_id = ?", (attachment_id,))
conn.commit()
return cursor.rowcount > 0
def clear_watch_later(self) -> int:
"""Clear all watch later items. Returns count removed."""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM paid_content_watch_later")
conn.commit()
return cursor.rowcount
def reorder_watch_later(self, ordered_ids: List[int]) -> bool:
"""Reorder watch later items by setting positions based on list order.
ordered_ids is a list of watch_later IDs in desired order."""
with self.unified_db.get_connection(for_write=True) as conn:
cursor = conn.cursor()
for pos, wl_id in enumerate(ordered_ids):
cursor.execute(
"UPDATE paid_content_watch_later SET position = ? WHERE id = ?",
(pos, wl_id)
)
conn.commit()
return True
def is_in_watch_later(self, attachment_id: int) -> bool:
"""Check if an attachment is in watch later"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT 1 FROM paid_content_watch_later WHERE attachment_id = ?", (attachment_id,))
return cursor.fetchone() is not None
def get_watch_later_attachment_ids(self) -> List[int]:
"""Get set of attachment IDs in watch later (for toggle state)"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT attachment_id FROM paid_content_watch_later")
return [row['attachment_id'] for row in cursor.fetchall()]
def get_watch_later_count(self) -> int:
"""Get count of watch later items"""
with self.unified_db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) as count FROM paid_content_watch_later")
return cursor.fetchone()['count']
# ========================================================================
# GALLERY (flat media timeline)
# ========================================================================
    def _build_gallery_base_query(self, select_clause: str,
                                  creator_group_id: int = None,
                                  creator_id: int = None,
                                  content_type: str = None,
                                  min_resolution: str = None,
                                  date_from: str = None,
                                  date_to: str = None,
                                  search: str = None) -> tuple:
        """Build the shared FROM/WHERE for gallery queries.

        Returns (query_string, params, needs_group_filter). The group
        filter itself is NOT applied here — callers append the clause from
        _build_group_member_filter_clause when needs_group_filter is True.
        All user-supplied values go into `params`; only fixed SQL fragments
        are concatenated into the text.

        NOTE(review): ILIKE and the tagged-user JSON LIKE trick read as
        PostgreSQL-oriented, while other methods in this adapter use SQLite
        idioms (INSERT OR IGNORE, datetime('now')) — confirm which backend
        serves these queries.
        """
        query = f"""
            {select_clause}
            FROM paid_content_attachments a
            JOIN paid_content_posts p ON a.post_id = p.id
            JOIN paid_content_creators c ON p.creator_id = c.id
            LEFT JOIN paid_content_identities i ON c.identity_id = i.id
            WHERE a.status = 'completed'
            AND a.file_type IN ('image', 'video')
            AND p.deleted_at IS NULL
        """
        params = []
        _has_group_filter = False
        if creator_id:
            query += " AND p.creator_id = ?"
            params.append(creator_id)
        if creator_group_id and creator_group_id > 0:
            # Flag only; the caller splices in the membership clause.
            _has_group_filter = True
        if content_type:
            query += " AND a.file_type = ?"
            params.append(content_type)
        if min_resolution:
            resolution_thresholds = {
                '720p': 720, '1080p': 1080, '1440p': 1440, '4k': 2160
            }
            if min_resolution in resolution_thresholds:
                min_dim = resolution_thresholds[min_resolution]
                # Either dimension may satisfy the floor (handles portrait media)
                query += " AND a.width IS NOT NULL AND a.height IS NOT NULL AND (a.width >= ? OR a.height >= ?)"
                params.extend([min_dim, min_dim])
        if date_from:
            query += " AND COALESCE(p.published_at, a.downloaded_at) >= ?"
            params.append(date_from)
        if date_to:
            # Make the upper bound inclusive of the whole calendar day
            query += " AND COALESCE(p.published_at, a.downloaded_at) <= ?"
            params.append(f"{date_to}T23:59:59")
        if search:
            # Every word must match at least one of the searchable fields (AND of ORs)
            words = search.strip().split()
            for word in words:
                w = f"%{word}%"
                query += """ AND (p.title ILIKE ? OR p.content ILIKE ?
                             OR c.username ILIKE ? OR c.display_name ILIKE ?
                             OR a.name ILIKE ?)"""
                params.extend([w, w, w, w, w])
        # Per-creator tagged user filter (when not using group filter)
        if not _has_group_filter:
            # A creator with no filter passes; otherwise the post must have a
            # tagged user whose quoted name appears in the JSON filter string.
            query += """
                AND (
                    c.filter_tagged_users IS NULL
                    OR TRIM(c.filter_tagged_users) = ''
                    OR TRIM(c.filter_tagged_users) = '[]'
                    OR EXISTS (
                        SELECT 1 FROM paid_content_post_tagged_users tu
                        WHERE tu.post_id = p.id
                        AND c.filter_tagged_users LIKE '%"' || tu.username || '"%'
                    )
                )
            """
        return query, params, _has_group_filter
    def get_gallery_media(self, creator_group_id: int = None,
                          creator_id: int = None,
                          content_type: str = None,
                          min_resolution: str = None,
                          date_from: str = None, date_to: str = None,
                          search: str = None,
                          shuffle: bool = False, shuffle_seed: int = None,
                          limit: int = 200, offset: int = 0) -> List[Dict]:
        """Get flat list of media items for the gallery timeline.

        Filters are those of _build_gallery_base_query; when
        creator_group_id is set, the group-member clause is appended before
        ordering. shuffle=True orders by md5(id + seed), giving a
        deterministic pseudo-random permutation that is stable for a given
        seed (default 42) and pages correctly with limit/offset; otherwise
        results are newest-first by published/download date.
        """
        select = """SELECT a.id, a.post_id, a.name, a.file_type, a.extension,
                    a.width, a.height, a.duration, a.file_hash,
                    a.local_path, a.local_filename, a.file_size,
                    a.downloaded_at,
                    COALESCE(p.published_at, a.downloaded_at) as media_date,
                    p.title as post_title, p.is_favorited,
                    c.username, c.display_name, c.platform, c.service_id,
                    c.profile_image_url, c.id as creator_id,
                    i.name as identity_name"""
        query, params, _has_group_filter = self._build_gallery_base_query(
            select, creator_group_id=creator_group_id,
            creator_id=creator_id, content_type=content_type,
            min_resolution=min_resolution, date_from=date_from,
            date_to=date_to, search=search
        )
        with self.unified_db.get_connection() as conn:
            if _has_group_filter:
                group_clause, group_params = self._build_group_member_filter_clause(creator_group_id, conn)
                query += group_clause
                params.extend(group_params)
            if shuffle:
                # Deterministic shuffle using PostgreSQL md5 hash — no Python memory needed
                seed = shuffle_seed if shuffle_seed is not None else 42
                query += " ORDER BY md5(a.id::text || ?::text), a.id"
                params.append(str(seed))
            else:
                query += " ORDER BY COALESCE(p.published_at, a.downloaded_at) DESC, a.id DESC"
            query += " LIMIT ? OFFSET ?"
            params.extend([limit, offset])
            cursor = conn.cursor()
            cursor.execute(query, params)
            return [dict(row) for row in cursor.fetchall()]
    def get_gallery_media_count(self, creator_group_id: int = None,
                                creator_id: int = None,
                                content_type: str = None,
                                min_resolution: str = None,
                                date_from: str = None, date_to: str = None,
                                search: str = None) -> int:
        """Get total count of gallery media items.

        Mirrors get_gallery_media's filtering exactly (same base query and
        optional group-member clause) so pagination totals stay consistent
        with the listing.
        """
        query, params, _has_group_filter = self._build_gallery_base_query(
            "SELECT COUNT(*)", creator_group_id=creator_group_id,
            creator_id=creator_id, content_type=content_type,
            min_resolution=min_resolution, date_from=date_from,
            date_to=date_to, search=search
        )
        with self.unified_db.get_connection() as conn:
            if _has_group_filter:
                group_clause, group_params = self._build_group_member_filter_clause(creator_group_id, conn)
                query += group_clause
                params.extend(group_params)
            cursor = conn.cursor()
            cursor.execute(query, params)
            # Single-row, single-column result: the count
            return cursor.fetchone()[0]
    def get_gallery_date_range(self, creator_group_id: int = None,
                               creator_id: int = None,
                               content_type: str = None) -> List[Dict]:
        """Get year/month distribution for timeline scrubber.

        Returns one row per (year, month) with a media count, newest first.
        NOTE(review): EXTRACT(... )::int is PostgreSQL syntax — confirm the
        backend, as other methods here use SQLite date functions.
        """
        select = """SELECT EXTRACT(YEAR FROM COALESCE(p.published_at::timestamp, a.downloaded_at::timestamp))::int as year,
                    EXTRACT(MONTH FROM COALESCE(p.published_at::timestamp, a.downloaded_at::timestamp))::int as month,
                    COUNT(*) as count"""
        query, params, _has_group_filter = self._build_gallery_base_query(
            select, creator_group_id=creator_group_id,
            creator_id=creator_id, content_type=content_type
        )
        query += " GROUP BY year, month ORDER BY year DESC, month DESC"
        with self.unified_db.get_connection() as conn:
            if _has_group_filter:
                group_clause, group_params = self._build_group_member_filter_clause(creator_group_id, conn)
                # Insert before GROUP BY (the clause is a WHERE fragment, so it
                # must precede the grouping, not be appended at the end)
                group_idx = query.index(' GROUP BY')
                query = query[:group_idx] + group_clause + query[group_idx:]
                params.extend(group_params)
            cursor = conn.cursor()
            cursor.execute(query, params)
            return [dict(row) for row in cursor.fetchall()]
    def get_gallery_group_stats(self) -> List[Dict]:
        """Get per-group media counts and representative thumbnail for gallery landing.

        For each creator group returns member_count, total completed
        image/video count, the newest media date, and the attachment id of
        the most recent item (used as the cover thumbnail), or None for
        empty groups.

        NOTE(review): this issues up to two extra queries per group (N+1);
        acceptable for a small number of groups.
        """
        with self.unified_db.get_connection() as conn:
            cursor = conn.cursor()
            # Get all groups with member counts
            cursor.execute("""
                SELECT g.id, g.name, g.description,
                       COUNT(DISTINCT gm.creator_id) as member_count
                FROM paid_content_creator_groups g
                LEFT JOIN paid_content_creator_group_members gm ON g.id = gm.group_id
                GROUP BY g.id, g.name, g.description
                ORDER BY g.name
            """)
            groups = [dict(row) for row in cursor.fetchall()]
            for group in groups:
                # Single query per group: count + latest attachment + latest date
                group_clause, group_params = self._build_group_member_filter_clause(group['id'], conn)
                stats_query = """
                    SELECT COUNT(*) as total_media,
                           MAX(COALESCE(p.published_at, a.downloaded_at)) as latest_media_date
                    FROM paid_content_attachments a
                    JOIN paid_content_posts p ON a.post_id = p.id
                    JOIN paid_content_creators c ON p.creator_id = c.id
                    LEFT JOIN paid_content_identities i ON c.identity_id = i.id
                    WHERE a.status = 'completed'
                    AND a.file_type IN ('image', 'video')
                    AND p.deleted_at IS NULL
                """ + group_clause
                cursor.execute(stats_query, group_params)
                row = cursor.fetchone()
                group['total_media'] = row['total_media'] if row else 0
                group['latest_media_date'] = row['latest_media_date'] if row else None
                # Get representative thumbnail (latest image)
                if group['total_media'] > 0:
                    thumb_query = """
                        SELECT a.id as attachment_id
                        FROM paid_content_attachments a
                        JOIN paid_content_posts p ON a.post_id = p.id
                        JOIN paid_content_creators c ON p.creator_id = c.id
                        LEFT JOIN paid_content_identities i ON c.identity_id = i.id
                        WHERE a.status = 'completed'
                        AND a.file_type IN ('image', 'video')
                        AND p.deleted_at IS NULL
                    """ + group_clause + """
                        ORDER BY COALESCE(p.published_at, a.downloaded_at) DESC
                        LIMIT 1
                    """
                    cursor.execute(thumb_query, group_params)
                    latest = cursor.fetchone()
                    group['representative_attachment_id'] = latest['attachment_id'] if latest else None
                else:
                    group['representative_attachment_id'] = None
            return groups