""" Snapchat Client for Paid Content - Wraps SnapchatClientDownloader for paid content system. Maps spotlights and highlights to the Post/Attachment model used by the paid content scraper. """ from datetime import datetime from typing import Dict, List, Optional from modules.base_module import LoggingMixin from .models import Creator, Post, Attachment class SnapchatPaidContentClient(LoggingMixin): """ Client for fetching Snapchat creator content via the existing SnapchatClientDownloader. Each spotlight/highlight collection maps to one Post with snaps as Attachments. """ SERVICE_ID = 'snapchat' PLATFORM = 'snapchat' def __init__(self, unified_db=None, log_callback=None): self._init_logger('PaidContent', log_callback, default_module='Snapchat') self.unified_db = unified_db self._downloader = None def _get_downloader(self): """Lazy-init the underlying SnapchatClientDownloader.""" if self._downloader is None: from modules.snapchat_client_module import SnapchatClientDownloader self._downloader = SnapchatClientDownloader( show_progress=False, use_database=False, log_callback=self.log_callback, unified_db=self.unified_db, ) return self._downloader def get_creator_info(self, username: str) -> Optional[Dict]: """Get creator information from profile page __NEXT_DATA__. Returns dict with display_name and avatar_url if found. """ downloader = self._get_downloader() profile_url = f"https://story.snapchat.com/@{username}" html = downloader._fetch_page(profile_url) if not html: return {'creator_id': username, 'creator_name': username} data = downloader._extract_next_data(html) display_name = username avatar_url = None if data: props = data.get('props', {}).get('pageProps', {}) # userProfile uses a $case/userInfo wrapper user_profile = props.get('userProfile', {}) user_info = user_profile.get('userInfo', {}) if user_info: name = user_info.get('displayName', '').strip() if name: display_name = name # Bitmoji 3D avatar URL (best quality) bitmoji = user_info.get('bitmoji3d') or {} if isinstance(bitmoji, dict): avatar_url = bitmoji.get('avatarUrl') or bitmoji.get('url') # linkPreview OG images as avatar (preview/square.jpeg — good quality) if not avatar_url: link_preview = props.get('linkPreview', {}) for img_key in ('facebookImage', 'twitterImage'): img = link_preview.get(img_key, {}) if isinstance(img, dict) and img.get('url'): avatar_url = img['url'] break # pageMetadata.pageTitle sometimes has the display name if display_name == username: page_meta = props.get('pageMetadata', {}) page_title = page_meta.get('pageTitle', '') # Format: "DisplayName (@username) | Snapchat..." if page_title and '(@' in page_title: name_part = page_title.split('(@')[0].strip() if name_part: display_name = name_part return { 'creator_id': username, 'creator_name': display_name, 'profile_image_url': avatar_url, } def get_creator(self, username: str) -> Optional[Creator]: """Get Creator model for a Snapchat user.""" info = self.get_creator_info(username) if not info: return None return Creator( creator_id=username, service_id=self.SERVICE_ID, platform=self.PLATFORM, username=info.get('creator_name', username), display_name=info.get('creator_name'), profile_image_url=info.get('profile_image_url'), ) def get_posts(self, username: str, since_date: str = None) -> List[Post]: """Fetch spotlights and highlights as Post objects. Args: username: Snapchat username (without @) since_date: ISO date string; skip snaps older than this Returns: List of Post objects (one per spotlight/highlight collection) """ downloader = self._get_downloader() # Parse cutoff date cutoff_dt = None if since_date: try: if 'T' in since_date: cutoff_dt = datetime.fromisoformat(since_date.replace('Z', '+00:00').replace('+00:00', '')) else: cutoff_dt = datetime.strptime(since_date[:10], '%Y-%m-%d') except (ValueError, IndexError): pass # Discover content from profile (spotlights, highlights, stories) profile_content = downloader.get_profile_content(username) self.log(f"Found {len(profile_content.get('spotlights', []))} spotlights, " f"{len(profile_content.get('highlight_collections', []))} highlights, " f"{'stories' if profile_content.get('story_collection') else 'no stories'} " f"for @{username}", 'info') posts = [] # Process story snaps (inline from profile page — no extra HTTP requests) story_collection = profile_content.get('story_collection') if story_collection and story_collection.snaps: post = self._collection_to_post(story_collection, username, cutoff_dt) if post and post.attachments: posts.append(post) # Process highlights (inline from profile page — no extra HTTP requests) for collection in profile_content.get('highlight_collections', []): post = self._collection_to_post(collection, username, cutoff_dt) if post and post.attachments: posts.append(post) # Process spotlights (still requires per-URL fetch for full metadata) for url in profile_content.get('spotlights', []): collection = downloader.get_spotlight_metadata(url) if not collection: continue post = self._collection_to_post(collection, username, cutoff_dt) if post and post.attachments: posts.append(post) self.log(f"Mapped {len(posts)} posts with attachments for @{username}", 'info') return posts def _collection_to_post(self, collection, username: str, cutoff_dt=None) -> Optional[Post]: """Convert a SnapCollection to a Post with Attachments.""" if not collection.snaps: return None # Use the earliest snap timestamp as the post date timestamps = [s.timestamp for s in collection.snaps if s.timestamp] if timestamps: earliest = min(timestamps) published_at = earliest.strftime('%Y-%m-%d') else: published_at = None # Skip if all snaps are older than cutoff if cutoff_dt and timestamps: latest = max(timestamps) if latest < cutoff_dt: return None attachments = [] for snap in collection.snaps: if not snap.media_url: continue # Determine extension from media type ext = '.mp4' if snap.media_type == 'video' else '.jpg' name = f"{snap.media_id}{ext}" if snap.media_id else f"snap_{snap.index}{ext}" attachment = Attachment( name=name, file_type=snap.media_type, extension=ext, server_path=snap.media_url, download_url=snap.media_url, width=snap.width if snap.width else None, height=snap.height if snap.height else None, duration=snap.duration_ms // 1000 if snap.duration_ms else None, ) attachments.append(attachment) if not attachments: return None # Build content/title from collection metadata title = collection.title or None content = collection.title if collection.title else None # Tag as spotlight or highlight tag_name = collection.collection_type.title() # "Spotlight" or "Highlight" return Post( post_id=collection.collection_id, service_id=self.SERVICE_ID, platform=self.PLATFORM, creator_id=username, title=title, content=content, published_at=published_at, attachments=attachments, auto_tags=[tag_name], ) def download_snap(self, media_url: str, output_path: str) -> bool: """Download a single snap file via curl_cffi. Args: media_url: Direct URL to the media file output_path: Local path to save the file Returns: True if download succeeded """ import os downloader = self._get_downloader() session = downloader._get_session() try: url = media_url.replace('&', '&') resp = session.get(url, timeout=60) if resp.status_code == 200 and len(resp.content) > 0: os.makedirs(os.path.dirname(output_path), exist_ok=True) with open(output_path, 'wb') as f: f.write(resp.content) return True else: self.log(f"Download failed: HTTP {resp.status_code}, size={len(resp.content)}", 'warning') return False except Exception as e: self.log(f"Download error: {e}", 'error') return False