"""
Utility functions for Paid Content feature
"""

import re
from typing import List, Optional, Tuple
from urllib.parse import parse_qs, unquote_plus, urlparse

# First path segments on onlyfans.com that are site pages, not creator usernames.
_ONLYFANS_RESERVED_PATHS = frozenset({
    'my', 'api2', 'settings', 'search', 'notifications', 'chats', 'vault',
    'lists', 'bookmarks', 'statements', 'help', 'terms', 'privacy', 'dmca',
    'contact',
})

# First path segments on fansly.com that are site pages, not creator usernames.
_FANSLY_RESERVED_PATHS = frozenset({
    'explore', 'search', 'settings', 'notifications', 'messages', 'live',
})

# First path segments on instagram.com that are site pages, not usernames.
_INSTAGRAM_RESERVED_PATHS = frozenset({
    'explore', 'reels', 'stories', 'p', 'tv', 'accounts', 'direct',
    'about', 'legal', 'developer', 'privacy', 'terms', 'help', 'api',
    'reel', 'tags',
})

# Path fragments / script names that identify a Coppermine gallery install.
_COPPERMINE_PATH_MARKERS = ('/gallery/', '/cpg/', '/coppermine/')
_COPPERMINE_SCRIPT_RE = re.compile(r'(?:index|thumbnails|displayimage)\.php')
_COPPERMINE_SCRIPT_TAIL_RE = re.compile(r'(?:index|thumbnails|displayimage)\.php.*$')

# Each pattern captures the URL type and the creator name in ONE match so the
# two parts always come from the same URL segment.
_PORNHUB_CREATOR_RE = re.compile(
    r'pornhub\.com/(pornstar|channels|users|model)/([a-zA-Z0-9_-]+)'
)
_XHAMSTER_CREATOR_RE = re.compile(
    r'xhamster\d*\.com/(creators|channels)/([a-zA-Z0-9_-]+)'
)

# Extension (lower-case, without the dot) -> content type label.
_EXTENSION_CONTENT_TYPES = {
    ext: content_type
    for content_type, extensions in (
        ('image', ('jpg', 'jpeg', 'png', 'gif', 'webp', 'bmp', 'tiff', 'heic', 'heif', 'avif')),
        ('video', ('mp4', 'mov', 'avi', 'mkv', 'webm', 'm4v', 'wmv', 'flv', 'mpeg', 'mpg', '3gp')),
        ('audio', ('mp3', 'wav', 'flac', 'aac', 'm4a', 'ogg', 'wma')),
        ('archive', ('zip', 'rar', '7z', 'tar', 'gz', 'bz2')),
        ('document', ('pdf', 'doc', 'docx', 'txt', 'rtf', 'odt')),
    )
    for ext in extensions
}

# Platforms served by each service id.
_SERVICE_PLATFORMS = {
    'coomer': ['onlyfans', 'fansly', 'candfans'],
    'kemono': ['patreon', 'fanbox', 'gumroad', 'subscribestar', 'discord'],
    'youtube': ['youtube'],
    'twitch': ['twitch'],
    'fansly_direct': ['fansly'],
    'onlyfans_direct': ['onlyfans'],
    'pornhub': ['pornhub'],
    'xhamster': ['xhamster'],
    'tiktok': ['tiktok'],
    'instagram': ['instagram'],
    'soundgasm': ['soundgasm'],
    'bellazon': ['bellazon'],
    'besteyecandy': ['besteyecandy'],
    'snapchat': ['snapchat'],
    'reddit': ['reddit'],
    'coppermine': ['coppermine'],
    'hqcelebcorner': ['hqcelebcorner'],
    'picturepub': ['picturepub'],
}


def _split_path(path: str) -> List[str]:
    """Return the non-empty '/'-separated segments of a URL path or fragment."""
    return [segment for segment in path.strip('/').split('/') if segment]


def _strip_www(host: str) -> str:
    """Remove a leading 'www.' from a host name (and only a leading one)."""
    return host[4:] if host.startswith('www.') else host


def _coomer_kemono_service(host: str) -> Optional[str]:
    """Return 'coomer' or 'kemono' when the host names that service, else None."""
    if 'coomer' in host:
        return 'coomer'
    if 'kemono' in host:
        return 'kemono'
    return None


def _extract_xenforo_search_query(parsed) -> Optional[str]:
    """
    Extract the 'q' search parameter from a XenForo search URL.

    Args:
        parsed: Result of urlparse() for the search URL

    Returns:
        The decoded search query, or None if there is no non-empty 'q'
    """
    query = parse_qs(parsed.query).get('q', [''])[0]
    if not query:
        # parse_qs misses 'q' when the query string itself contains a '?'
        # (e.g. "search/1/?q=name"), so fall back to a direct scan.
        m = re.search(r'[&?]q=([^&]+)', parsed.query)
        if m:
            query = unquote_plus(m.group(1))
    return query or None


def _extract_coppermine_creator_id(parsed, host: str) -> Optional[str]:
    """
    Build a Coppermine creator id ("domain/path") from a gallery URL.

    Recognises paths containing /gallery/, /cpg/ or /coppermine/, as well as
    direct index.php / thumbnails.php / displayimage.php pages.

    Args:
        parsed: Result of urlparse() for the URL
        host: Lower-cased network location of the URL

    Returns:
        Host (without a leading 'www.') plus the gallery root path, e.g.
        "kylie-jenner.org/gallery", or None if the URL does not look like a
        Coppermine gallery or the gallery root would be the site root.
    """
    path = parsed.path
    lowered = path.lower()
    looks_like_gallery = (
        any(marker in lowered for marker in _COPPERMINE_PATH_MARKERS)
        or _COPPERMINE_SCRIPT_RE.search(path) is not None
    )
    if not looks_like_gallery:
        return None

    # Normalize to gallery root by dropping the script name and anything after it
    base_path = _COPPERMINE_SCRIPT_TAIL_RE.sub('', path).rstrip('/')
    if not base_path:
        return None
    return _strip_www(host) + base_path


def parse_creator_url(url: str) -> Optional[Tuple[str, str, str]]:
    """
    Parse a creator URL for any supported service

    Host-specific handlers are tried first. The generic, path-based Coppermine
    heuristic runs only after them so that it cannot hijack URLs belonging to
    a known host (e.g. a forum served from /forum/index.php), and the
    Coomer/Kemono handler is the final fallback.

    Args:
        url: URL like https://coomer.party/onlyfans/user/creatorid
             or https://www.youtube.com/@channelhandle
             or https://www.youtube.com/channel/UCxxxxx
             or https://www.twitch.tv/username/clips
             or https://fansly.com/username

    Returns:
        Tuple of (service_id, platform, creator_id) or None if invalid
    """
    try:
        parsed = urlparse(url)
        host = parsed.netloc.lower()

        # Handle YouTube URLs
        if 'youtube.com' in host or 'youtu.be' in host:
            channel_id = _extract_youtube_channel_id(url)
            if channel_id:
                return ('youtube', 'youtube', channel_id)
            return None

        # Handle Twitch URLs
        if 'twitch.tv' in host:
            channel_name = _extract_twitch_channel_name(url)
            if channel_name:
                return ('twitch', 'twitch', channel_name)
            return None

        # Handle Fansly URLs (direct API)
        if 'fansly.com' in host:
            username = _extract_fansly_username(url)
            if username:
                return ('fansly_direct', 'fansly', username)
            return None

        # Handle OnlyFans URLs (direct API)
        if 'onlyfans.com' in host:
            path_parts = _split_path(parsed.path)
            if path_parts:
                username = path_parts[0]
                if username.lower() not in _ONLYFANS_RESERVED_PATHS:
                    return ('onlyfans_direct', 'onlyfans', username)
            return None

        # Handle Pornhub URLs
        if 'pornhub.com' in host:
            creator_id = _extract_pornhub_creator_id(url)
            if creator_id:
                return ('pornhub', 'pornhub', creator_id)
            return None

        # Handle XHamster URLs
        if 'xhamster' in host:
            creator_id = _extract_xhamster_creator_id(url)
            if creator_id:
                return ('xhamster', 'xhamster', creator_id)
            return None

        # Handle TikTok URLs
        if 'tiktok.com' in host:
            username = _extract_tiktok_username(url)
            if username:
                return ('tiktok', 'tiktok', username)
            return None

        # Handle Instagram URLs
        if 'instagram.com' in host:
            username = _extract_instagram_username(url)
            if username:
                return ('instagram', 'instagram', username)
            return None

        # Handle BestEyeCandy URLs
        if 'besteyecandy.com' in host:
            cid_match = re.search(r'cid-(\d+)', parsed.path)
            slug_match = re.search(r'/([^/]+)\.html$', parsed.path)
            if cid_match and slug_match:
                slug = slug_match.group(1)
                return ('besteyecandy', 'besteyecandy', f"{cid_match.group(1)}/{slug}")
            if cid_match:
                return ('besteyecandy', 'besteyecandy', cid_match.group(1))
            return None

        # Handle Bellazon URLs (forum threads as creators)
        if 'bellazon' in host:
            match = re.search(r'/topic/(\d+)-([^/]+)', parsed.path)
            if match:
                return ('bellazon', 'bellazon', match.group(1))
            return None

        # Handle Reddit URLs
        if 'reddit.com' in host:
            # Handle reddit.com/r/subreddit, old.reddit.com/r/subreddit, etc.
            path_parts = _split_path(parsed.path)
            if len(path_parts) >= 2 and path_parts[0] == 'r':
                return ('reddit', 'reddit', path_parts[1].lower())
            return None

        # Handle Snapchat URLs
        if 'snapchat.com' in host:
            # Handle snapchat.com/@username and story.snapchat.com/@username
            path_parts = _split_path(parsed.path)
            if path_parts:
                username = path_parts[0].lstrip('@')
                if username:
                    return ('snapchat', 'snapchat', username)
            return None

        # Handle HQCelebCorner URLs
        if 'hqcelebcorner' in host:
            query = _extract_xenforo_search_query(parsed)
            if query:
                return ('hqcelebcorner', 'hqcelebcorner', query)
            return None

        # Handle PicturePub URLs
        if 'picturepub' in host:
            query = _extract_xenforo_search_query(parsed)
            if query:
                return ('picturepub', 'picturepub', query)
            return None

        # Handle Soundgasm URLs
        if 'soundgasm.net' in host:
            path_parts = _split_path(parsed.path)
            if len(path_parts) >= 2 and path_parts[0] in ('u', 'user'):
                return ('soundgasm', 'soundgasm', path_parts[1])
            return None

        # Handle Liltsome URLs (archive, maps to soundgasm platform)
        if 'liltsome.yerf.org' in host:
            # Hash-based routing: /#/artist/{name}
            fragment = parsed.fragment  # e.g. "/artist/kinkyshibby"
            if fragment:
                parts = _split_path(fragment)
                if len(parts) >= 2 and parts[0] == 'artist':
                    return ('soundgasm', 'soundgasm', parts[1])
            return None

        # Handle Coppermine gallery URLs (generic: matched by path, not host).
        # Falls through to the Coomer/Kemono check when no gallery root is found.
        coppermine_id = _extract_coppermine_creator_id(parsed, host)
        if coppermine_id:
            return ('coppermine', 'coppermine', coppermine_id)

        # Determine service (Coomer/Kemono)
        service_id = _coomer_kemono_service(host)
        if service_id is None:
            return None

        # Parse path: /platform/user/creatorid
        path_parts = _split_path(parsed.path)
        if len(path_parts) >= 3 and path_parts[1] == 'user':
            return (service_id, path_parts[0], path_parts[2])

        return None

    except Exception:
        # Best-effort parser: any malformed input is reported as "not a creator URL"
        return None


def _extract_youtube_channel_id(url: str) -> Optional[str]:
    """
    Extract channel identifier from various YouTube URL formats

    Supports:
    - youtube.com/channel/UC...
    - youtube.com/@handle
    - youtube.com/c/channelname
    - youtube.com/user/username
    """
    patterns = [
        r'youtube\.com/channel/([a-zA-Z0-9_-]+)',
        r'youtube\.com/@([a-zA-Z0-9_.-]+)',
        r'youtube\.com/c/([a-zA-Z0-9_-]+)',
        r'youtube\.com/user/([a-zA-Z0-9_-]+)',
    ]

    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)

    return None


def _extract_twitch_channel_name(url: str) -> Optional[str]:
    """
    Extract channel name from Twitch URL

    Supports:
    - twitch.tv/username
    - twitch.tv/username/clips
    - m.twitch.tv/username/clips

    Returns:
        Lower-cased channel name, or None if the URL has no channel segment
    """
    match = re.search(r'twitch\.tv/([a-zA-Z0-9_]+)(?:/clips)?', url)
    if match:
        return match.group(1).lower()
    return None


def _extract_fansly_username(url: str) -> Optional[str]:
    """
    Extract username from Fansly URL

    Supports:
    - fansly.com/username
    - fansly.com/username/posts
    - fansly.com/username/media

    Returns:
        The username, or None for known non-username paths (explore, search, ...)
    """
    match = re.search(r'fansly\.com/([a-zA-Z0-9_.-]+)(?:/(?:posts|media))?', url)
    if match:
        username = match.group(1)
        # Filter out known non-username paths
        if username.lower() not in _FANSLY_RESERVED_PATHS:
            return username
    return None


def _extract_pornhub_creator_id(url: str) -> Optional[str]:
    """Extract creator identifier from Pornhub URL, returns 'type/name' format"""
    match = _PORNHUB_CREATOR_RE.search(url)
    if match:
        # Store as "type/name" to preserve the URL type
        return f"{match.group(1)}/{match.group(2)}"
    return None


def _extract_xhamster_creator_id(url: str) -> Optional[str]:
    """Extract creator identifier from XHamster URL, returns 'type/name' format"""
    match = _XHAMSTER_CREATOR_RE.search(url)
    if match:
        return f"{match.group(1)}/{match.group(2)}"
    return None


def _extract_tiktok_username(url: str) -> Optional[str]:
    """Extract username (without the leading '@') from TikTok URL"""
    match = re.search(r'tiktok\.com/@([a-zA-Z0-9_.]+)', url)
    if match:
        return match.group(1)
    return None


def _extract_instagram_username(url: str) -> Optional[str]:
    """Extract lower-cased username from Instagram URL, skipping site pages"""
    match = re.search(r'instagram\.com/([a-zA-Z0-9_.]+)/?', url)
    if match:
        username = match.group(1).lower()
        if username not in _INSTAGRAM_RESERVED_PATHS:
            return username
    return None


def parse_post_url(url: str) -> Optional[Tuple[str, str, str, str]]:
    """
    Parse a Coomer/Kemono post URL

    Args:
        url: URL like https://coomer.party/onlyfans/user/creatorid/post/postid

    Returns:
        Tuple of (service_id, platform, creator_id, post_id) or None if invalid
    """
    try:
        parsed = urlparse(url)
        host = parsed.netloc.lower()

        # Determine service
        service_id = _coomer_kemono_service(host)
        if service_id is None:
            return None

        # Parse path: /platform/user/creatorid/post/postid
        path_parts = _split_path(parsed.path)
        if len(path_parts) >= 5 and path_parts[1] == 'user' and path_parts[3] == 'post':
            return (service_id, path_parts[0], path_parts[2], path_parts[4])

        return None

    except Exception:
        # Best-effort parser: any malformed input is reported as "not a post URL"
        return None


def format_file_size(size_bytes: Optional[int]) -> str:
    """
    Format file size in human-readable format

    Args:
        size_bytes: Size in bytes, or None when the size is not known

    Returns:
        String like "1.5 KB" (1024-based units, one decimal), or 'Unknown'
        when size_bytes is None
    """
    if size_bytes is None:
        return 'Unknown'

    size = size_bytes
    for unit in ('B', 'KB', 'MB', 'GB', 'TB'):
        if abs(size) < 1024.0:
            return f"{size:.1f} {unit}"
        size /= 1024.0

    return f"{size:.1f} PB"


def sanitize_filename(name: str, max_length: int = 200) -> str:
    """
    Sanitize a string for use in a filename

    Args:
        name: String to sanitize
        max_length: Maximum length of result

    Returns:
        Sanitized filename, or 'unnamed' if nothing usable remains
    """
    if not name:
        return 'unnamed'

    # Remove invalid characters (control characters, including tabs and
    # newlines, are removed here and so never become '-')
    name = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '', name)
    name = re.sub(r'\s+', '-', name.strip())
    name = name.strip('.-')

    if len(name) > max_length:
        # Truncation can expose a new trailing '.' or '-'; strip it again so
        # the result never ends with a separator.
        name = name[:max_length].rstrip('.-')

    return name or 'unnamed'


def extract_platform_from_domain(domain: str) -> Optional[str]:
    """
    Extract platform name from domain

    Args:
        domain: Bare domain such as "www.onlyfans.com" (case-insensitive)

    Returns:
        Platform name, or None if the domain is empty or not recognised
    """
    if not domain:
        return None

    # Only a leading "www." is dropped; removing it anywhere in the string
    # would turn e.g. "onlywww.fans.com" into "onlyfans.com".
    domain = _strip_www(domain.lower())

    platform_domains = {
        'onlyfans.com': 'onlyfans',
        'fansly.com': 'fansly',
        'patreon.com': 'patreon',
        'fanbox.cc': 'fanbox',
        'gumroad.com': 'gumroad',
        'subscribestar.com': 'subscribestar',
        'subscribestar.adult': 'subscribestar',
        'discord.com': 'discord',
        'discord.gg': 'discord',
        'candfans.jp': 'candfans',
    }

    return platform_domains.get(domain)


def detect_content_type(filename: str) -> str:
    """
    Detect content type from filename extension

    Args:
        filename: File name or path; only the text after the last '.' is used

    Returns:
        One of 'image', 'video', 'audio', 'archive', 'document' or 'unknown'
    """
    if not filename or '.' not in filename:
        return 'unknown'

    ext = filename.rsplit('.', 1)[-1].lower()
    return _EXTENSION_CONTENT_TYPES.get(ext, 'unknown')


def get_service_platforms(service_id: str) -> list:
    """
    Get supported platforms for a service

    Returns:
        A new list of platform names (empty for an unknown service); callers
        may mutate it without affecting later calls.
    """
    return list(_SERVICE_PLATFORMS.get(service_id, []))


def get_service_base_url(service_id: str) -> Optional[str]:
    """
    Get base URL for a service.

    Note: For dynamic URLs, use the database (paid_content_services table).
    These are fallback defaults only.
    """
    # Import here to avoid circular dependency
    from .api_client import PaidContentAPIClient
    return PaidContentAPIClient.DEFAULT_SERVICE_URLS.get(service_id)