"""
OnlyFans Direct API Client

Downloads content directly from the OnlyFans API using browser-extracted
credentials and dynamic request signing.
"""

import asyncio
import html
import re
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Dict, Any, Callable
from urllib.parse import urlparse, urlencode

import aiohttp

from modules.base_module import LoggingMixin, RateLimitMixin
from .models import Post, Attachment, Message
from .onlyfans_signing import OnlyFansSigner


# Compiled once at import time; used by OnlyFansClient._strip_html / _parse_post.
_BR_TAG_RE = re.compile(r'<br\s*/?>', re.IGNORECASE)
_HTML_TAG_RE = re.compile(r'<[^>]+>')
_EMBED_URL_RE = re.compile(
    r'https?://(?:www\.)?(?:youtube\.com/watch\?v=|youtu\.be/|vimeo\.com/|dailymotion\.com/video/)\S+'
)


class OnlyFansClient(LoggingMixin, RateLimitMixin):
    """
    API client for downloading content directly from OnlyFans.

    API Endpoints:
    - Base URL: https://onlyfans.com/api2/v2
    - Auth: Requires browser-extracted credentials (sess, auth_id, x-bc, User-Agent)
    - Signing: Every request needs dynamic sign/time/app-token headers
    - GET /users/me - Verify auth
    - GET /users/{username} - Get user profile
    - GET /users/{user_id}/posts?limit=50&offset={offset} - Get posts (paginated)
    - GET /posts/{post_id} - Get a single post
    - GET /chats/{user_id}/messages - Get messages (cursor-paginated)

    The client can be used as an async context manager; the underlying
    aiohttp session is created lazily and closed on exit.
    """

    BASE_URL = "https://onlyfans.com/api2/v2"
    SERVICE_ID = "onlyfans_direct"
    PLATFORM = "onlyfans"

    # Maps OF media types to our file types
    _MEDIA_TYPE_MAP = {
        'photo': 'image',
        'video': 'video',
        'audio': 'audio',
        'gif': 'image',
    }
    # Extensions that force file_type='video' (OF sometimes misreports type)
    _VIDEO_EXTS = frozenset(
        {'mp4', 'mov', 'webm', 'avi', 'mkv', 'flv', 'm4v', 'wmv', 'mpg', 'mpeg'}
    )

    def __init__(
        self,
        auth_config: Dict[str, str],
        signing_url: Optional[str] = None,
        log_callback: Optional[Callable] = None,
    ):
        """
        Args:
            auth_config: Dict with keys: sess, auth_id, auth_uid (optional), x_bc, user_agent
            signing_url: Optional custom URL for signing rules
            log_callback: Optional logging callback
        """
        self._init_logger('PaidContent', log_callback, default_module='OnlyFansDirect')
        # More conservative rate limiting than Fansly (OF is stricter)
        self._init_rate_limiter(
            min_delay=1.5, max_delay=3.0, batch_delay_min=3, batch_delay_max=6
        )
        self.auth_config = auth_config
        self._session: Optional[aiohttp.ClientSession] = None
        self._signer = OnlyFansSigner(rules_url=signing_url)

    # ==================== SESSION / TRANSPORT ====================

    async def _get_session(self) -> aiohttp.ClientSession:
        """
        Get or create the aiohttp session with OnlyFans headers.

        A new session is built when none exists or the previous one was closed.

        Returns:
            The shared aiohttp.ClientSession

        Raises:
            KeyError: If auth_config lacks 'sess' or 'auth_id'
        """
        if self._session is None or self._session.closed:
            auth_id = self.auth_config['auth_id']
            # Build cookie string
            cookies = f"sess={self.auth_config['sess']}; auth_id={auth_id}"
            auth_uid = self.auth_config.get('auth_uid')
            if auth_uid:
                cookies += f"; auth_uid_{auth_id}={auth_uid}"

            headers = {
                'Accept': 'application/json, text/plain, */*',
                'User-Agent': self.auth_config.get('user_agent', ''),
                'x-bc': self.auth_config.get('x_bc', ''),
                'Cookie': cookies,
                'Origin': 'https://onlyfans.com',
                'Referer': 'https://onlyfans.com/',
            }
            timeout = aiohttp.ClientTimeout(total=60)
            self._session = aiohttp.ClientSession(headers=headers, timeout=timeout)
        return self._session

    async def _sign_request(self, endpoint: str) -> Dict[str, str]:
        """
        Compute signing headers for an API request.

        Args:
            endpoint: API path (e.g. "/users/me") - will be prefixed with /api2/v2

        Returns:
            The headers produced by the signer, plus a 'user-id' header
        """
        user_id = self.auth_config.get('auth_id', '0')
        # Sign with full URL path (matching OF-Scraper)
        full_path = f"/api2/v2{endpoint}"
        sign_headers = await self._signer.sign(full_path, user_id)
        sign_headers['user-id'] = user_id
        return sign_headers

    @staticmethod
    def _parse_retry_after(value: Any, default: int = 30) -> int:
        """
        Convert a Retry-After header value into whole seconds.

        The header may legally carry an HTTP-date instead of a number; in that
        case (or when it is missing/garbled) `default` is returned instead of
        raising.
        """
        try:
            seconds = int(float(value))
        except (TypeError, ValueError, OverflowError):
            return default
        return max(seconds, 0)

    async def _api_request(self, endpoint: str, params: Optional[Dict] = None) -> Optional[Dict]:
        """
        Make a signed API request to OnlyFans.

        Handles 401 (auth failure), 404, 429 (rate limit), timeouts and general
        errors. Retries on 429 and on timeout, up to 3 attempts in total; signing
        headers are recomputed before each retry.

        Args:
            endpoint: API path (e.g. "/users/me")
            params: Optional query parameters

        Returns:
            Parsed JSON response or None on failure
        """
        session = await self._get_session()

        # Include query params in the signing path (OF-Scraper does this)
        sign_endpoint = f"{endpoint}?{urlencode(params)}" if params else endpoint
        sign_headers = await self._sign_request(sign_endpoint)
        url = f"{self.BASE_URL}{endpoint}"

        max_retries = 3
        for attempt in range(max_retries):
            try:
                async with session.get(url, params=params, headers=sign_headers) as resp:
                    status = resp.status
                    if status == 200:
                        return await resp.json()
                    if status == 401:
                        self.log("OnlyFans auth failed (401) - credentials may be expired", 'error')
                        return None
                    if status == 404:
                        self.log(f"Not found (404): {endpoint}", 'debug')
                        return None
                    if status != 429:
                        text = await resp.text()
                        self.log(f"API error: HTTP {status} for {endpoint}: {text[:200]}", 'warning')
                        return None
                    # 429: back-off grows with the attempt number, capped at 120s
                    retry_after = self._parse_retry_after(resp.headers.get('Retry-After'))
                    wait = min(retry_after * (attempt + 1), 120)

                # Sleep after leaving the response context so the connection is
                # released while we wait.
                self.log(
                    f"Rate limited (429), waiting {wait}s (attempt {attempt + 1}/{max_retries})",
                    'warning',
                )
                await asyncio.sleep(wait)
                # Refresh signing headers for retry (timestamp changes)
                sign_headers = await self._sign_request(sign_endpoint)
            except asyncio.TimeoutError:
                self.log(f"Request timeout for {endpoint} (attempt {attempt + 1})", 'warning')
                if attempt >= max_retries - 1:
                    return None
                await asyncio.sleep(5 * (attempt + 1))
                sign_headers = await self._sign_request(sign_endpoint)
            except Exception as e:
                # Boundary handler: callers rely on None rather than exceptions.
                self.log(f"Request error for {endpoint}: {e}", 'error')
                return None
        return None

    async def close(self):
        """Close the aiohttp session (safe to call when no session is open)"""
        if self._session and not self._session.closed:
            await self._session.close()
        self._session = None

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    # ==================== HELPERS ====================

    @staticmethod
    def _strip_html(text: str) -> str:
        """
        Strip HTML tags and convert entities to plain text.

        <br> tags become newlines, every other tag is removed, and HTML
        entities (&amp;, &lt;, &gt;, &#39;, &quot;, ...) are decoded in a single
        pass so already-escaped text is not unescaped twice.
        """
        if not text:
            return ''
        text = _BR_TAG_RE.sub('\n', text)
        text = _HTML_TAG_RE.sub('', text)
        return html.unescape(text).strip()

    @staticmethod
    def _to_naive_utc(value: str) -> datetime:
        """
        Parse an ISO-8601 string into a naive datetime expressed in UTC.

        Offset-aware values are converted to UTC before the tzinfo is dropped,
        so values carrying different offsets compare correctly. Values without
        an offset are returned unchanged.

        Raises:
            ValueError, TypeError, AttributeError: If `value` is not a valid ISO string
        """
        dt = datetime.fromisoformat(value.replace('Z', '+00:00'))
        if dt.tzinfo is not None:
            dt = dt.astimezone(timezone.utc)
        return dt.replace(tzinfo=None)

    def _parse_filter_date(self, value: Optional[str], label: str) -> Optional[datetime]:
        """Parse a user-supplied date filter; log and return None when it is invalid."""
        if not value:
            return None
        try:
            return self._to_naive_utc(value)
        except (ValueError, TypeError, AttributeError) as e:
            self.log(f"Ignoring invalid {label} '{value}': {e}", 'warning')
            return None

    @staticmethod
    def _extract_items(data: Any) -> List[Dict]:
        """Return the item list from a response that is either a bare list or {'list': [...]}."""
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            return data.get('list') or []
        return []

    # ==================== AUTH / PROFILE ====================

    async def check_auth(self) -> Dict[str, Any]:
        """
        Verify credentials by calling /users/me.

        Returns:
            Dict with 'valid' bool and optionally 'user_id', 'username', 'name'
            (or 'error' when invalid)
        """
        # NOTE(review): the rate-limit helpers come from RateLimitMixin and are
        # called without await throughout this class - confirm they are
        # synchronous and not coroutines.
        self._delay_between_items()
        try:
            data = await self._api_request("/users/me")
            if data and data.get('id'):
                return {
                    'valid': True,
                    'user_id': str(data['id']),
                    'username': data.get('username', ''),
                    'name': data.get('name', ''),
                }
            return {'valid': False, 'error': 'Invalid credentials or unexpected response'}
        except Exception as e:
            self.log(f"Error checking auth: {e}", 'error')
            return {'valid': False, 'error': str(e)}

    async def get_user_info(self, username: str) -> Optional[Dict[str, Any]]:
        """
        Get user profile info.

        Args:
            username: The OnlyFans username

        Returns:
            Normalized user info dict or None
        """
        self._delay_between_items()
        try:
            data = await self._api_request(f"/users/{username}")
            if not data or not data.get('id'):
                self.log(f"User not found: {username}", 'warning')
                return None
            return {
                'user_id': str(data['id']),
                'username': data.get('username', username),
                'display_name': data.get('name', ''),
                'avatar_url': data.get('avatar'),
                'banner_url': data.get('header'),
                'bio': self._strip_html(data.get('rawAbout') or data.get('about') or ''),
                # Keep only the YYYY-MM-DD part of the join timestamp
                'join_date': (data.get('joinDate') or '')[:10] or None,
                'posts_count': data.get('postsCount', 0),
            }
        except Exception as e:
            self.log(f"Error getting user info for {username}: {e}", 'error')
            return None

    # ==================== POSTS ====================

    async def get_single_post(self, post_id: str) -> Optional[Post]:
        """
        Fetch a single post by its OnlyFans post ID.

        Args:
            post_id: The OnlyFans post ID

        Returns:
            Post object or None
        """
        self._delay_between_items()
        data = await self._api_request(f"/posts/{post_id}")
        if not data:
            self.log(f"Post {post_id} not found", 'warning')
            return None
        # 'author' may be absent or null; fall back to the flat authorId field
        author = data.get('author') or {}
        user_id = str(author.get('id', data.get('authorId', '')))
        return self._parse_post(data, user_id)

    async def get_posts(
        self,
        user_id: str,
        username: str,
        since_date: Optional[str] = None,
        until_date: Optional[str] = None,
        days_back: Optional[int] = None,
        max_posts: Optional[int] = None,
        progress_callback: Optional[Callable[[int, int], None]] = None,
    ) -> List[Post]:
        """
        Fetch posts from a creator's timeline using offset-based pagination.

        Posts are requested newest-first. Fetching stops early - without the
        pinned-post pass - as soon as a post older than the since filter is seen
        or max_posts is reached. Otherwise pinned posts are fetched afterwards
        and appended if not already present (they are not date-filtered).

        Args:
            user_id: The OnlyFans numeric user ID
            username: The username (for logging/reference)
            since_date: Only fetch posts after this date (ISO format)
            until_date: Only fetch posts before this date (ISO format)
            days_back: Fetch posts from the last N days (overrides since_date)
            max_posts: Maximum number of posts to fetch
            progress_callback: Called with (page, total_posts) during fetching

        Returns:
            List of Post objects
        """
        self.log(f"Fetching posts for {username} (user_id: {user_id})", 'info')

        # Date filters are normalized to naive UTC so they compare correctly
        # against post timestamps regardless of the local timezone.
        if days_back:
            since_date = (datetime.now(timezone.utc) - timedelta(days=days_back)).isoformat()
        since_dt = self._parse_filter_date(since_date, 'since_date')
        until_dt = self._parse_filter_date(until_date, 'until_date')
        if since_dt is not None:
            self.log(f"Date filter: since_date={since_dt.isoformat()}", 'debug')

        all_posts: List[Post] = []
        offset = 0
        page_size = 50
        page = 0

        while True:
            self._delay_between_items()
            params = {
                'limit': str(page_size),
                'offset': str(offset),
                'order': 'publish_date_desc',
            }
            data = await self._api_request(f"/users/{user_id}/posts", params=params)
            if not data:
                break

            # OF returns a list of posts directly (or wrapped under 'list')
            posts_list = self._extract_items(data)
            if not posts_list:
                break

            for post_data in posts_list:
                post = self._parse_post(post_data, user_id)
                if not post:
                    continue

                # Check date filters using published_at
                post_dt = None
                if post.published_at and (since_dt is not None or until_dt is not None):
                    try:
                        post_dt = self._to_naive_utc(post.published_at)
                    except (ValueError, TypeError, AttributeError) as e:
                        # Unparsable dates are kept rather than dropped
                        self.log(
                            f"Date comparison error: {e} (post_date={post.published_at})",
                            'warning',
                        )
                if post_dt is not None:
                    if since_dt is not None and post_dt < since_dt:
                        self.log(
                            f"Reached posts older than since_date ({post.published_at}), stopping",
                            'debug',
                        )
                        return all_posts
                    if until_dt is not None and post_dt > until_dt:
                        continue

                all_posts.append(post)
                if max_posts and len(all_posts) >= max_posts:
                    self.log(f"Reached max_posts limit: {max_posts}", 'debug')
                    return all_posts

            page += 1
            if progress_callback:
                progress_callback(page, len(all_posts))

            # If we got fewer results than page_size, we've reached the end
            if len(posts_list) < page_size:
                break
            offset += page_size
            self._delay_between_batches()

        # Also fetch pinned posts (they may not appear in the timeline)
        self._delay_between_items()
        pinned_data = await self._api_request(
            f"/users/{user_id}/posts",
            params={'limit': '50', 'offset': '0', 'order': 'publish_date_desc', 'pinned': '1'},
        )
        if pinned_data:
            existing_ids = {p.post_id for p in all_posts}
            for post_data in self._extract_items(pinned_data):
                post = self._parse_post(post_data, user_id)
                if post and post.post_id not in existing_ids:
                    all_posts.append(post)
                    existing_ids.add(post.post_id)

        self.log(f"Fetched {len(all_posts)} posts for {username}", 'info')
        return all_posts

    def _parse_post(self, post_data: Dict, user_id: str) -> Optional[Post]:
        """
        Parse an OnlyFans post into a Post model.

        Args:
            post_data: Raw post data from API
            user_id: Creator's user ID

        Returns:
            Post object or None if the post has no id or parsing fails
        """
        try:
            raw_id = post_data.get('id')
            # str(None) would yield the truthy string 'None', so test the raw value
            if raw_id is None or raw_id == '':
                return None
            post_id = str(raw_id)

            # Parse timestamp - OF uses ISO format strings; epoch numbers are
            # converted to an offset-aware UTC ISO string.
            published_at = None
            raw_date = post_data.get('postedAt') or post_data.get('createdAt')
            if isinstance(raw_date, str):
                published_at = raw_date
            elif isinstance(raw_date, (int, float)):
                try:
                    published_at = datetime.fromtimestamp(raw_date, tz=timezone.utc).isoformat()
                except (ValueError, TypeError, OSError, OverflowError):
                    published_at = None

            # Content text
            content = self._strip_html(post_data.get('rawText') or post_data.get('text') or '')

            # Parse media attachments
            attachments = []
            for media_item in post_data.get('media') or []:
                attachment = self._parse_attachment(media_item)
                if attachment:
                    attachments.append(attachment)

            # Extract embed URLs from content text
            embed_urls = _EMBED_URL_RE.findall(content) if content else []

            return Post(
                post_id=post_id,
                service_id=self.SERVICE_ID,
                platform=self.PLATFORM,
                creator_id=user_id,
                title=None,
                content=content,
                published_at=published_at,
                added_at=datetime.now().isoformat(),
                attachments=attachments,
                embed_urls=embed_urls,
                is_pinned=bool(post_data.get('isPinned')),
                pinned_at=post_data.get('pinnedAt'),
            )
        except Exception as e:
            self.log(f"Error parsing post: {e}", 'error')
            return None

    def _parse_attachment(self, media_item: Dict) -> Optional[Attachment]:
        """
        Parse an OnlyFans media item into an Attachment.

        OF media structure:
        {
            id, type,
            source: {source: url, width, height, duration},
            full: {source: url, ...},
            preview: {source: url, ...}
        }
        (the variants may also be nested under a 'files' key and expose the
        link as 'url' instead of 'source').

        URL preference: 'full', then 'source', then 'preview', then the
        top-level 'src'. The attachment is flagged is_preview when canView is
        false, or when the only URL found came from 'preview'.

        Args:
            media_item: Raw media dict from API

        Returns:
            Attachment object or None if parsing fails
        """
        try:
            media_id = str(media_item.get('id', ''))
            media_type = (media_item.get('type') or '').lower()
            file_type = self._MEDIA_TYPE_MAP.get(media_type, 'unknown')

            # Current OF API nests media under 'files' key
            files = media_item.get('files') or media_item

            def variant(name: str) -> Dict:
                """Return the named quality variant, or {} when absent/not a dict."""
                value = files.get(name)
                return value if isinstance(value, dict) else {}

            def variant_url(data: Dict) -> Optional[str]:
                return data.get('url') or data.get('source')

            full_data = variant('full')
            source_data = variant('source')
            full_url = variant_url(full_data)
            source_url = variant_url(source_data)

            # Try 'full' first (higher quality)
            download_url = full_url
            width = full_data.get('width')
            height = full_data.get('height')
            duration = full_data.get('duration')

            # Fallback to 'source'
            if not download_url:
                download_url = source_url
                width = width or source_data.get('width')
                height = height or source_data.get('height')
                duration = duration or source_data.get('duration')

            # OF DRM videos use FairPlay SAMPLE-AES encryption - cannot be downloaded.
            # Keep the duration for metadata, then fall through to the preview frame.
            if not download_url and media_type == 'video':
                duration = duration or media_item.get('duration')

            # Fallback to 'preview' for any content type
            # For DRM videos (canView=true), downloads the preview frame image
            # For PPV videos (canView=false), there's no preview - marked unavailable
            if not download_url:
                preview_data = variant('preview')
                download_url = variant_url(preview_data)
                width = width or preview_data.get('width')
                height = height or preview_data.get('height')

            # Some OF responses have src directly
            if not download_url:
                download_url = media_item.get('src')

            # Determine extension from URL; default by media type when there is no URL
            ext = ''
            if download_url:
                # Only the last path segment may contribute an extension
                last_segment = urlparse(download_url).path.rsplit('/', 1)[-1]
                if '.' in last_segment:
                    ext = last_segment.rsplit('.', 1)[-1].lower()
                    if ext == 'jpeg':
                        ext = 'jpg'
            elif media_type == 'photo':
                ext = 'jpg'
            elif media_type == 'video':
                ext = 'mp4'

            filename = f"{media_id}.{ext}" if ext else str(media_id)

            # Override file_type based on actual extension (OF sometimes misreports type)
            if ext in self._VIDEO_EXTS:
                file_type = 'video'

            # Duration may be in seconds (float or int)
            if duration is not None:
                try:
                    duration = int(float(duration))
                except (ValueError, TypeError, OverflowError):
                    duration = None

            # Check if content is actually locked (canView=false) vs just missing URL
            can_view = media_item.get('canView', True)
            is_preview = not can_view
            if not download_url and not can_view:
                self.log(f"PPV/locked content: {filename}", 'debug')

            # Detect preview-only: no full/source/src URL, so the URL we hold
            # must have come from the preview fallback
            if not is_preview and download_url:
                has_full = bool(full_url or source_url)
                if not has_full and not media_item.get('src'):
                    is_preview = True

            return Attachment(
                name=filename,
                server_path=f"/onlyfans/{media_id}",
                file_type=file_type,
                extension=ext if ext else None,
                download_url=download_url,
                file_size=None,
                width=width,
                height=height,
                duration=duration,
                is_preview=is_preview,
            )
        except Exception as e:
            self.log(f"Error parsing attachment: {e}", 'error')
            return None

    # ==================== MESSAGES ====================

    async def get_messages(self, user_id: str, max_messages: int = 500) -> List[Message]:
        """
        Fetch messages from a conversation with a creator.

        Uses GET /chats/{user_id}/messages with cursor-based pagination.
        The 'id' param is used as cursor for older messages.

        Args:
            user_id: OnlyFans numeric user ID of the creator
            max_messages: Maximum number of messages to fetch

        Returns:
            List of at most max_messages Message objects
        """
        messages: List[Message] = []
        cursor_id = None
        page = 0
        page_size = 50

        while len(messages) < max_messages:
            page += 1
            params = {'limit': page_size, 'order': 'desc'}
            if cursor_id:
                params['id'] = cursor_id

            data = await self._api_request(f"/chats/{user_id}/messages", params=params)
            if not data:
                break

            # Response is a dict with 'list' key containing messages
            msg_list = self._extract_items(data)
            if not msg_list:
                break

            for msg_data in msg_list:
                msg = self._parse_message(msg_data, user_id)
                if msg:
                    messages.append(msg)

            self.log(
                f"Fetched page {page}: {len(msg_list)} messages (total: {len(messages)})",
                'debug',
            )

            if len(msg_list) < page_size:
                break  # Last page

            # Use the last message's id as cursor for next page; stop if the
            # cursor does not advance, to avoid looping on the same page
            last_id = msg_list[-1].get('id')
            if not last_id or str(last_id) == str(cursor_id):
                break
            cursor_id = last_id

        # The final page may overshoot the limit; trim to honor max_messages
        messages = messages[:max(max_messages, 0)]
        self.log(f"Fetched {len(messages)} messages for user {user_id}", 'info')
        return messages

    def _parse_message(self, msg_data: Dict, creator_user_id: str) -> Optional[Message]:
        """
        Parse an OnlyFans message into a Message model.

        Args:
            msg_data: Raw message dict from API
            creator_user_id: Numeric user ID of the creator (to determine direction)

        Returns:
            Message object or None if the message has no id or parsing fails
        """
        try:
            raw_id = msg_data.get('id')
            # str(None) would yield the truthy string 'None', so test the raw value
            if raw_id is None or raw_id == '':
                return None
            msg_id = str(raw_id)

            # Determine if message is from creator ('fromUser' may be null)
            from_user = msg_data.get('fromUser') or {}
            from_user_id = str(from_user.get('id', ''))
            is_from_creator = from_user_id == str(creator_user_id)

            # Parse text
            text = self._strip_html(msg_data.get('text') or '')

            # Parse timestamp; keep the raw value when it is not valid ISO
            created_at = msg_data.get('createdAt')
            sent_at = None
            if created_at:
                try:
                    sent_at = datetime.fromisoformat(created_at.replace('Z', '+00:00')).isoformat()
                except (ValueError, TypeError):
                    sent_at = created_at

            # PPV/price info
            price = msg_data.get('price')
            is_free = msg_data.get('isFree', True)
            # Purchased when already opened, or when canPurchase is explicitly False
            is_purchased = msg_data.get('isOpened', False) or (msg_data.get('canPurchase') is False)
            is_tip = msg_data.get('isTip', False)
            tip_amount = msg_data.get('tipAmount')

            # Parse media attachments (same structure as posts)
            attachments = []
            for media_item in msg_data.get('media') or []:
                att = self._parse_attachment(media_item)
                if att:
                    attachments.append(att)

            return Message(
                message_id=msg_id,
                platform=self.PLATFORM,
                service_id=self.SERVICE_ID,
                creator_id=str(creator_user_id),
                text=text if text else None,
                sent_at=sent_at,
                is_from_creator=is_from_creator,
                is_tip=bool(is_tip),
                tip_amount=float(tip_amount) if tip_amount else None,
                price=float(price) if price else None,
                is_free=bool(is_free),
                is_purchased=bool(is_purchased),
                attachments=attachments,
            )
        except Exception as e:
            self.log(f"Error parsing message: {e}", 'error')
            return None