"""
Unified API client for Coomer.party and Kemono.party.

Both services share the same API structure (Kemono fork).
"""

import asyncio
import time
from typing import List, Optional, Dict, Any
from urllib.parse import urlparse

import aiohttp

from modules.base_module import LoggingMixin, RateLimitMixin
from .models import Creator, Post, Attachment


class PaidContentAPIClient(LoggingMixin, RateLimitMixin):
    """
    API client for Coomer and Kemono archival services.

    Endpoints used by this client (all relative to ``{base_url}/api/v1``):
    - GET /creators - List all creators
    - GET /{platform}/user/{creator_id}/profile - Get creator info
    - GET /{platform}/user/{creator_id}/posts - Get creator's posts (paginated with ?o=offset)
    - GET /{platform}/user/{creator_id}/post/{post_id} - Get single post

    The client can be used as an async context manager; leaving the context
    closes the underlying aiohttp session.

    NOTE(review): the rate-limit helpers (``_delay_between_items`` /
    ``_delay_between_batches``) are invoked synchronously from coroutines. If
    the mixin implements them with a blocking sleep they will stall the event
    loop - confirm against RateLimitMixin.
    """

    # Fallback URLs if database doesn't have them configured
    DEFAULT_SERVICE_URLS = {
        'coomer': 'https://coomer.party',
        'kemono': 'https://kemono.party'
    }

    SUPPORTED_PLATFORMS = {
        'coomer': ['onlyfans', 'fansly', 'candfans'],
        'kemono': ['patreon', 'fanbox', 'gumroad', 'subscribestar', 'discord']
    }

    # Offset step used when walking through a creator's paginated post list
    POSTS_PER_PAGE = 50

    def __init__(self, service_id: str, session_cookie: Optional[str] = None,
                 base_url: Optional[str] = None, log_callback=None):
        """
        Configure the client for one service.

        Args:
            service_id: Service key, e.g. 'coomer' or 'kemono'.
            session_cookie: Optional value sent as the 'session' cookie.
            base_url: Optional site root. A value that contains '/api/v1' is
                reduced to the site root. When omitted, the entry for
                ``service_id`` in DEFAULT_SERVICE_URLS is used.
            log_callback: Forwarded to the logging mixin.
        """
        self._init_logger('PaidContent', log_callback, default_module='API')
        self._init_rate_limiter(min_delay=0.5, max_delay=2.0, batch_delay_min=1, batch_delay_max=3)

        self.service_id = service_id

        if base_url:
            # Callers may hand in the API root; keep only the site root.
            # str.replace is a no-op when '/api/v1' is absent.
            self.base_url = base_url.replace('/api/v1', '').rstrip('/')
        else:
            self.base_url = self.DEFAULT_SERVICE_URLS.get(service_id)
            if self.base_url is None:
                # Construction still succeeds (as before), but make the
                # misconfiguration visible instead of failing silently later.
                self.log(
                    f"No base URL configured for unknown service '{service_id}'; API requests will fail",
                    'warning'
                )

        self.api_url = f"{self.base_url}/api/v1"
        self.session_cookie = session_cookie
        self._session: Optional[aiohttp.ClientSession] = None

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the cached aiohttp session, creating a new one if missing or closed."""
        if self._session is None or self._session.closed:
            # Note: Coomer/Kemono require 'Accept: text/css' header as anti-scraping measure
            # Despite this, they still return JSON responses
            # NOTE(review): advertising 'br' presumably requires a Brotli decoder
            # to be installed for aiohttp to decode such responses - confirm.
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                'Accept': 'text/css',
                'Accept-Encoding': 'gzip, deflate, br',
                'Referer': self.base_url
            }
            cookies = {}
            if self.session_cookie:
                cookies['session'] = self.session_cookie

            timeout = aiohttp.ClientTimeout(total=30)
            self._session = aiohttp.ClientSession(headers=headers, cookies=cookies, timeout=timeout)
        return self._session

    async def close(self):
        """Close the aiohttp session (if open) and drop the cached reference."""
        if self._session and not self._session.closed:
            await self._session.close()
        self._session = None

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def check_health(self) -> Dict[str, Any]:
        """
        Probe the /creators endpoint with a 10 second timeout.

        Returns:
            A dict whose 'status' is one of 'healthy' (with 'response_time' in
            seconds), 'rate_limited' or 'degraded' (with 'response_code'),
            'timeout' or 'down' (with 'error'). Exceptions are never
            propagated; they are reported through the returned dict.
        """
        try:
            session = await self._get_session()
            # perf_counter is monotonic, so the measurement is immune to
            # wall-clock adjustments.
            started = time.perf_counter()
            async with session.get(f"{self.api_url}/creators", timeout=aiohttp.ClientTimeout(total=10)) as resp:
                elapsed = time.perf_counter() - started
                if resp.status == 200:
                    # content_type=None allows parsing JSON regardless of response content-type
                    await resp.json(content_type=None)
                    return {'status': 'healthy', 'response_time': round(elapsed, 3)}
                if resp.status == 429:
                    return {'status': 'rate_limited', 'response_code': 429}
                return {'status': 'degraded', 'response_code': resp.status}
        except asyncio.TimeoutError:
            return {'status': 'timeout', 'error': 'Request timed out'}
        except Exception as e:
            return {'status': 'down', 'error': str(e)}

    async def get_all_creators(self) -> List[Dict]:
        """
        Get list of all available creators (for search).

        Returns the decoded JSON body on HTTP 200; on any other status or on
        an exception the problem is logged and an empty list is returned.
        """
        self._delay_between_items()
        try:
            session = await self._get_session()
            async with session.get(f"{self.api_url}/creators") as resp:
                if resp.status == 200:
                    return await resp.json(content_type=None)
                self.log(f"Failed to get creators list: HTTP {resp.status}", 'warning')
                return []
        except Exception as e:
            self.log(f"Error getting creators list: {e}", 'error')
            return []

    async def get_creator(self, platform: str, creator_id: str) -> Optional[Creator]:
        """
        Get creator info.

        Tries the profile endpoint first. If that does not answer with HTTP
        200, falls back to the posts endpoint and builds a minimal Creator
        from the first post. Returns None (after logging) when neither request
        yields a creator or when an exception occurs.
        """
        self._delay_between_items()
        try:
            session = await self._get_session()

            # First try to get creator profile
            url = f"{self.api_url}/{platform}/user/{creator_id}/profile"
            async with session.get(url) as resp:
                if resp.status == 200:
                    data = await resp.json(content_type=None)
                    return Creator.from_api(data, self.service_id, platform, self.base_url)

            # Fallback: get first post to extract creator info
            url = f"{self.api_url}/{platform}/user/{creator_id}/posts"
            async with session.get(url) as resp:
                if resp.status == 200:
                    posts = await resp.json(content_type=None)
                    if posts:
                        # Extract creator info from first post
                        first_post = posts[0]
                        # Construct image URLs - use .st instead of .party
                        # (coomer.party/kemono.party images are at .st)
                        netloc = urlparse(self.base_url).netloc.replace('.party', '.st')
                        img_domain = f"img.{netloc}"
                        name = first_post.get('user', creator_id)
                        return Creator(
                            creator_id=creator_id,
                            service_id=self.service_id,
                            platform=platform,
                            username=name,
                            display_name=name,
                            profile_image_url=f"https://{img_domain}/icons/{platform}/{creator_id}",
                            banner_image_url=f"https://{img_domain}/banners/{platform}/{creator_id}"
                        )

            self.log(f"Creator not found: {platform}/{creator_id}", 'warning')
            return None

        except Exception as e:
            self.log(f"Error getting creator {platform}/{creator_id}: {e}", 'error')
            return None

    async def get_creator_posts(self, platform: str, creator_id: str, offset: int = 0) -> List[Post]:
        """
        Get one page of a creator's posts.

        Args:
            platform: Platform segment of the URL (e.g. 'patreon').
            creator_id: Creator identifier on that platform.
            offset: Pagination offset; sent as the 'o' query parameter only
                when greater than zero.

        Returns:
            Parsed posts on HTTP 200, otherwise an empty list (the failure is
            logged).
        """
        self._delay_between_items()
        try:
            session = await self._get_session()

            url = f"{self.api_url}/{platform}/user/{creator_id}/posts"
            params = {'o': offset} if offset > 0 else {}

            async with session.get(url, params=params) as resp:
                if resp.status == 200:
                    data = await resp.json(content_type=None)
                    return [Post.from_api(p, self.service_id, platform, creator_id, self.base_url) for p in data]
                if resp.status == 404:
                    self.log(f"Creator not found: {platform}/{creator_id}", 'warning')
                else:
                    self.log(f"Failed to get posts: HTTP {resp.status}", 'warning')
                return []

        except Exception as e:
            self.log(f"Error getting posts for {platform}/{creator_id}: {e}", 'error')
            return []

    async def get_all_creator_posts(self, platform: str, creator_id: str,
                                    since_date: Optional[str] = None, max_posts: Optional[int] = None,
                                    progress_callback=None) -> List[Post]:
        """
        Fetch all posts with pagination.

        Pages are requested until an empty page comes back. Collection stops
        early when a post's ``published_at`` is less than or equal to
        ``since_date`` (that post is not included) or when ``max_posts`` posts
        have been gathered.

        Args:
            platform: Platform segment of the URL.
            creator_id: Creator identifier on that platform.
            since_date: Optional cut-off compared directly against
                ``post.published_at``.
            max_posts: Optional upper bound on the number of posts returned.
            progress_callback: Optional callable invoked after each page as
                ``progress_callback(page_number, posts_so_far)``.
        """
        all_posts = []
        offset = 0
        page = 0

        self.log(f"Fetching posts for {platform}/{creator_id}", 'info')

        while True:
            posts = await self.get_creator_posts(platform, creator_id, offset)
            if not posts:
                break

            for post in posts:
                # Stop if we've reached posts we've already seen
                if since_date and post.published_at and post.published_at <= since_date:
                    self.log(f"Reached already-seen post date: {post.published_at}", 'debug')
                    return all_posts

                all_posts.append(post)

                if max_posts and len(all_posts) >= max_posts:
                    self.log(f"Reached max posts limit: {max_posts}", 'debug')
                    return all_posts

            page += 1
            offset += self.POSTS_PER_PAGE

            if progress_callback:
                progress_callback(page, len(all_posts))

            self._delay_between_batches()

        self.log(f"Fetched {len(all_posts)} posts for {platform}/{creator_id}", 'info')
        return all_posts

    async def get_post(self, platform: str, creator_id: str, post_id: str) -> Optional[Post]:
        """
        Get single post by ID.

        Returns the parsed Post on HTTP 200; otherwise logs the reason and
        returns None. Exceptions are logged and also result in None.
        """
        self._delay_between_items()
        try:
            session = await self._get_session()

            url = f"{self.api_url}/{platform}/user/{creator_id}/post/{post_id}"
            async with session.get(url) as resp:
                if resp.status == 200:
                    data = await resp.json(content_type=None)
                    # Single post endpoint wraps response in {"post": {...}}
                    if isinstance(data, dict) and 'post' in data:
                        data = data['post']
                    return Post.from_api(data, self.service_id, platform, creator_id, self.base_url)
                # Log failures like the other fetch methods do, so a missing
                # post can be told apart from a server-side problem.
                if resp.status == 404:
                    self.log(f"Post not found: {platform}/{creator_id}/{post_id}", 'warning')
                else:
                    self.log(f"Failed to get post {post_id}: HTTP {resp.status}", 'warning')
                return None

        except Exception as e:
            self.log(f"Error getting post {post_id}: {e}", 'error')
            return None

    async def search_creators(self, query: str, platform: str = None) -> List[Dict]:
        """
        Search for creators by name.

        Performs a case-insensitive substring match of ``query`` against each
        creator's 'name'. When ``platform`` is given, only creators whose
        'service' field equals it are considered.

        Returns:
            At most 50 result dicts (id, name, service, indexed, updated,
            favorited), ordered by 'favorited' descending. An empty list is
            returned if an exception occurs.
        """
        self._delay_between_items()
        try:
            # Get all creators and filter locally (API doesn't have search endpoint)
            all_creators = await self.get_all_creators()

            query_lower = query.lower()
            results = []

            for creator in all_creators:
                if platform and creator.get('service') != platform:
                    continue

                name = (creator.get('name') or '').lower()
                if query_lower in name:
                    results.append({
                        'id': creator.get('id'),
                        'name': creator.get('name'),
                        'service': creator.get('service'),
                        'indexed': creator.get('indexed'),
                        'updated': creator.get('updated'),
                        # 'or 0' also covers an explicit null, which would
                        # otherwise make the sort below raise TypeError.
                        'favorited': creator.get('favorited') or 0
                    })

            # Sort by favorited count (popularity)
            results.sort(key=lambda entry: entry['favorited'], reverse=True)
            return results[:50]  # Limit results

        except Exception as e:
            self.log(f"Error searching creators: {e}", 'error')
            return []

    def get_attachment_url(self, server_path: str) -> str:
        """
        Convert server path to full download URL.

        Empty input yields ''; a value that already starts with 'http' is
        returned unchanged; anything else is appended to ``{base_url}/data``.
        """
        if not server_path:
            return ''
        if server_path.startswith('http'):
            return server_path
        return f"{self.base_url}/data{server_path}"

    def get_thumbnail_url(self, server_path: str) -> str:
        """
        Get thumbnail URL for an attachment.

        Empty input yields ''; a value that already starts with 'http' is
        returned unchanged; anything else is appended to
        ``{base_url}/thumbnail/data``.
        """
        if not server_path:
            return ''
        if server_path.startswith('http'):
            return server_path
        return f"{self.base_url}/thumbnail/data{server_path}"

    @classmethod
    def get_supported_platforms(cls, service_id: str) -> List[str]:
        """Get list of supported platforms for a service (empty list if unknown)."""
        return cls.SUPPORTED_PLATFORMS.get(service_id, [])

    @classmethod
    def is_valid_service(cls, service_id: str) -> bool:
        """Check if service ID is one of the keys of DEFAULT_SERVICE_URLS."""
        return service_id in cls.DEFAULT_SERVICE_URLS

    @classmethod
    def get_service_ids(cls) -> List[str]:
        """Get list of all service IDs (the keys of DEFAULT_SERVICE_URLS)."""
        return list(cls.DEFAULT_SERVICE_URLS)