Files
media-downloader/modules/paid_content/youtube_client.py
Todd 0d7b2b1aab Initial commit
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-29 22:42:55 -04:00

1088 lines
46 KiB
Python

"""
YouTube Channel Client - Fetches channel info and videos using yt-dlp
"""
import asyncio
import json
import os
import re
import shutil
import subprocess
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

from modules.base_module import LoggingMixin
from .models import Creator, Post, Attachment
class YouTubeClient(LoggingMixin):
    """
    Client for fetching YouTube channel information and videos using yt-dlp

    Supports:
    - Channel URLs (youtube.com/channel/..., youtube.com/@handle, youtube.com/c/...)
    - Fetching channel metadata
    - Listing all videos from a channel
    - Downloading videos
    """

    # Quality presets mapped to yt-dlp --format selector strings.
    # Each entry prefers an mp4 video + m4a audio pair (mergeable into mp4
    # without re-encoding) and falls back to progressively looser selectors
    # when that combination is unavailable for a given video.
    QUALITY_PRESETS = {
        'best': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
        '1080p': 'bestvideo[height<=1080][ext=mp4]+bestaudio[ext=m4a]/best[height<=1080][ext=mp4]/best',
        '720p': 'bestvideo[height<=720][ext=mp4]+bestaudio[ext=m4a]/best[height<=720][ext=mp4]/best',
        '480p': 'bestvideo[height<=480][ext=mp4]+bestaudio[ext=m4a]/best[height<=480][ext=mp4]/best',
        'audio': 'bestaudio[ext=m4a]/bestaudio/best',
    }
def __init__(self, ytdlp_path: Optional[str] = None, unified_db=None, log_callback=None, api_key: Optional[str] = None):
    """
    Args:
        ytdlp_path: Explicit path to the yt-dlp executable; auto-detected
            via _find_ytdlp() when omitted.
        unified_db: Database handle used to look up stored cookies in
            _get_cookies_file(); may be None.
        log_callback: Optional callback forwarded to the logging mixin.
        api_key: Optional YouTube Data API v3 key; when set,
            get_channel_videos() prefers the API over yt-dlp scraping.
    """
    self._init_logger('PaidContent', log_callback, default_module='YouTube')
    # Find yt-dlp executable (an explicit path wins over auto-detection)
    self.ytdlp_path = ytdlp_path or self._find_ytdlp()
    if not self.ytdlp_path:
        self.log("yt-dlp not found, YouTube support will be disabled", 'warning')
    # YouTube Data API v3 key (optional, speeds up channel video listing)
    self.api_key = api_key
    # Store database reference for cookie access
    self.unified_db = unified_db
    # Lazily-created Netscape cookie temp file path; removed by cleanup()
    self._cookies_file = None
def _find_ytdlp(self) -> Optional[str]:
    """
    Locate the yt-dlp executable.

    Checks a list of well-known install locations first (the project venv
    copy is preferred because it is kept up to date), then falls back to a
    PATH lookup.

    Returns:
        Absolute path to an executable yt-dlp, or None if none was found.
    """
    common_paths = [
        '/opt/media-downloader/venv/bin/yt-dlp',  # Prefer venv version (kept up to date)
        '/usr/local/bin/yt-dlp',
        '/usr/bin/yt-dlp',
        '/opt/homebrew/bin/yt-dlp',
        os.path.expanduser('~/.local/bin/yt-dlp'),
    ]
    for path in common_paths:
        if os.path.isfile(path) and os.access(path, os.X_OK):
            return path
    # Fall back to a PATH search. shutil.which is portable (no dependency
    # on a `which` binary), never raises, and avoids spawning a subprocess.
    return shutil.which('yt-dlp')
def is_available(self) -> bool:
"""Check if yt-dlp is available"""
return self.ytdlp_path is not None
def _get_cookies_file(self) -> Optional[str]:
    """
    Return the path to a Netscape-format cookies file for yt-dlp.

    The file is created lazily from the 'ytdlp' row of the scrapers table in
    the unified database and its path is cached on self._cookies_file for
    subsequent calls; cleanup() removes it again. Returns None when no
    database is configured, no cookies are stored, or loading fails.
    """
    # Reuse the previously written temp file if it still exists
    if self._cookies_file and os.path.exists(self._cookies_file):
        return self._cookies_file
    if not self.unified_db:
        return None
    try:
        with self.unified_db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT cookies_json FROM scrapers WHERE id = ?", ('ytdlp',))
            row = cursor.fetchone()
            if row and row[0]:
                data = json.loads(row[0])
                # Support both {"cookies": [...]} and [...] formats
                if isinstance(data, dict) and 'cookies' in data:
                    cookies_list = data['cookies']
                elif isinstance(data, list):
                    cookies_list = data
                else:
                    cookies_list = []
                if cookies_list:
                    # Write cookies to temp file in Netscape format:
                    # domain, subdomain-flag, path, secure-flag, expiry, name, value
                    fd, self._cookies_file = tempfile.mkstemp(suffix='.txt', prefix='ytdlp_cookies_')
                    with os.fdopen(fd, 'w') as f:
                        f.write("# Netscape HTTP Cookie File\n")
                        for cookie in cookies_list:
                            domain = cookie.get('domain', '')
                            # A leading dot on the domain means "include subdomains"
                            include_subdomains = 'TRUE' if domain.startswith('.') else 'FALSE'
                            path = cookie.get('path', '/')
                            secure = 'TRUE' if cookie.get('secure', False) else 'FALSE'
                            expiry = str(int(cookie.get('expirationDate', 0)))
                            name = cookie.get('name', '')
                            value = cookie.get('value', '')
                            f.write(f"{domain}\t{include_subdomains}\t{path}\t{secure}\t{expiry}\t{name}\t{value}\n")
                    self.log(f"Loaded {len(cookies_list)} cookies from ytdlp scraper", 'debug')
                    return self._cookies_file
    except Exception as e:
        # Best-effort: missing cookies only disables members-only features
        self.log(f"Could not load cookies: {e}", 'debug')
    return None
def _get_base_cmd(self) -> List[str]:
"""Get base yt-dlp command with cookies if available"""
cmd = [self.ytdlp_path]
cookies_file = self._get_cookies_file()
if cookies_file:
cmd.extend(['--cookies', cookies_file])
return cmd
def cleanup(self):
"""Clean up temporary files"""
if self._cookies_file and os.path.exists(self._cookies_file):
try:
os.unlink(self._cookies_file)
except Exception:
pass
self._cookies_file = None
@staticmethod
def extract_channel_id(url: str) -> Optional[str]:
"""
Extract channel identifier from various YouTube URL formats
Supports:
- youtube.com/channel/UC...
- youtube.com/@handle
- youtube.com/c/channelname
- youtube.com/user/username
"""
patterns = [
r'youtube\.com/channel/([a-zA-Z0-9_-]+)',
r'youtube\.com/@([a-zA-Z0-9_.-]+)',
r'youtube\.com/c/([a-zA-Z0-9_-]+)',
r'youtube\.com/user/([a-zA-Z0-9_-]+)',
]
for pattern in patterns:
match = re.search(pattern, url)
if match:
return match.group(1)
return None
@staticmethod
def normalize_channel_url(channel_id: str) -> str:
"""Convert channel ID/handle to a consistent URL format"""
# Already a full URL - return as-is
if channel_id.startswith('http://') or channel_id.startswith('https://'):
return channel_id
if channel_id.startswith('@'):
return f"https://www.youtube.com/{channel_id}"
elif channel_id.startswith('UC'):
return f"https://www.youtube.com/channel/{channel_id}"
else:
# Assume it's a handle without @
return f"https://www.youtube.com/@{channel_id}"
async def get_channel_info(self, channel_url: str) -> Optional[Dict]:
    """
    Get channel information via yt-dlp.

    Returns a dict with channel_id, channel_name, channel_url, description,
    subscriber_count and thumbnail, or None when the channel cannot be
    resolved or yt-dlp is unavailable.
    """
    if not self.is_available():
        return None
    try:
        # Use yt-dlp to get channel info from the videos tab
        cmd = self._get_base_cmd() + [
            '--no-warnings',
            '--flat-playlist',
            '-j',
            '--playlist-items', '1',  # Just get first item to extract channel info
            f"{channel_url}/videos"
        ]
        result = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await result.communicate()
        if result.returncode != 0:
            # Try alternative: get channel page directly without enumerating
            # playlist entries (--playlist-items 0 fetches metadata only)
            cmd = self._get_base_cmd() + [
                '--no-warnings',
                '-j',
                '--no-download',
                '--playlist-items', '0',
                channel_url
            ]
            result = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, stderr = await result.communicate()
            if result.returncode != 0:
                self.log(f"Failed to get channel info: {stderr.decode()}", 'warning')
                return None
        # Parse the JSON-lines output; first entry with channel info wins
        for line in stdout.decode('utf-8', errors='replace').strip().split('\n'):
            if not line:
                continue
            try:
                data = json.loads(line)
                # Extract channel info from playlist entry or video
                channel_id = data.get('channel_id') or data.get('uploader_id')
                channel_name = data.get('channel') or data.get('uploader') or data.get('playlist_title', '').replace(' - Videos', '')
                if channel_id or channel_name:
                    return {
                        'channel_id': channel_id,
                        'channel_name': channel_name,
                        'channel_url': data.get('channel_url') or data.get('uploader_url') or channel_url,
                        'description': data.get('description', ''),
                        'subscriber_count': data.get('channel_follower_count'),
                        'thumbnail': data.get('channel_thumbnail') or data.get('thumbnail'),
                    }
            except json.JSONDecodeError:
                continue
        return None
    except Exception as e:
        self.log(f"Error getting channel info: {e}", 'error')
        return None
async def get_channel_avatar(self, channel_url: str) -> Optional[str]:
    """
    Fetch the channel avatar URL by scraping the YouTube channel page.

    yt-dlp doesn't provide channel avatars, so the page HTML is fetched
    directly and the avatar URL is pulled out of the embedded page data.
    Returns the image URL, or None on any failure (best-effort).
    """
    try:
        import aiohttp
        import re
        # Normalize URL to channel page
        normalized_url = self.normalize_channel_url(channel_url)
        if not normalized_url:
            normalized_url = channel_url
        async with aiohttp.ClientSession() as session:
            # Browser-like headers so YouTube serves the regular desktop page
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
            }
            async with session.get(
                normalized_url,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=15)
            ) as resp:
                if resp.status == 200:
                    text = await resp.text()
                    # YouTube embeds channel avatar in multiple places
                    # Format 1: "avatar":{"thumbnails":[{"url":"..."}]}
                    avatar_match = re.search(r'"avatar"\s*:\s*\{\s*"thumbnails"\s*:\s*\[\s*\{\s*"url"\s*:\s*"([^"]+)"', text)
                    if avatar_match:
                        avatar_url = avatar_match.group(1).replace('\\u0026', '&')
                        # Get highest resolution by replacing size params
                        avatar_url = re.sub(r'=s\d+-', '=s800-', avatar_url)
                        self.log(f"Found YouTube channel avatar", 'debug')
                        return avatar_url
                    # Format 2: "avatar":{"avatarViewModel":{"image":{"sources":[{"url":"..."}]}}}
                    avatar_match = re.search(r'"avatar"\s*:\s*\{\s*"avatarViewModel"\s*:\s*\{\s*"image"\s*:\s*\{\s*"sources"\s*:\s*\[\s*\{\s*"url"\s*:\s*"([^"]+)"', text)
                    if avatar_match:
                        avatar_url = avatar_match.group(1).replace('\\u0026', '&')
                        avatar_url = re.sub(r'=s\d+-', '=s800-', avatar_url)
                        self.log(f"Found YouTube channel avatar (viewModel)", 'debug')
                        return avatar_url
                    # Fallback: look for og:image meta tag (usually channel avatar)
                    og_match = re.search(r'<meta\s+property="og:image"\s+content="([^"]+)"', text)
                    if not og_match:
                        # Attribute order can vary in the served HTML
                        og_match = re.search(r'<meta\s+content="([^"]+)"\s+property="og:image"', text)
                    if og_match:
                        return og_match.group(1)
    except Exception as e:
        self.log(f"Could not fetch YouTube channel avatar: {e}", 'debug')
    return None
async def get_channel_banner(self, channel_url: str) -> Optional[str]:
    """
    Fetch the channel banner URL by scraping the YouTube channel page.

    Returns the banner image URL, or None on any failure (best-effort).
    """
    try:
        import aiohttp
        import re
        normalized_url = self.normalize_channel_url(channel_url)
        if not normalized_url:
            normalized_url = channel_url
        async with aiohttp.ClientSession() as session:
            # Browser-like headers so YouTube serves the regular desktop page
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
            }
            async with session.get(
                normalized_url,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=15)
            ) as resp:
                if resp.status == 200:
                    text = await resp.text()
                    # Look for banner image in page data
                    # Format: "banner":{"imageBannerViewModel":{"image":{"sources":[{"url":"..."}]}}}
                    banner_match = re.search(r'"banner"\s*:\s*\{\s*"imageBannerViewModel"\s*:\s*\{\s*"image"\s*:\s*\{\s*"sources"\s*:\s*\[\s*\{\s*"url"\s*:\s*"([^"]+)"', text)
                    if banner_match:
                        banner_url = banner_match.group(1).replace('\\u0026', '&')
                        self.log(f"Found YouTube channel banner", 'debug')
                        return banner_url
                    # Fallback: older format with thumbnails
                    banner_match = re.search(r'"banner"\s*:\s*\{\s*"thumbnails"\s*:\s*\[\s*\{\s*"url"\s*:\s*"([^"]+)"', text)
                    if banner_match:
                        banner_url = banner_match.group(1).replace('\\u0026', '&')
                        self.log(f"Found YouTube channel banner (fallback)", 'debug')
                        return banner_url
    except Exception as e:
        self.log(f"Could not fetch YouTube channel banner: {e}", 'debug')
    return None
async def get_channel_bio(self, channel_url: str) -> Optional[str]:
    """Convenience wrapper: return just the 'bio' field from get_channel_metadata()."""
    metadata = await self.get_channel_metadata(channel_url)
    if not metadata:
        return None
    return metadata.get('bio')
async def get_channel_metadata(self, channel_url: str) -> Optional[Dict]:
    """
    Fetch channel metadata by scraping the channel's /about page.

    Extracts the description ('bio'), joined date, location, and external
    links from the embedded page data. Returns a dict containing whichever
    of the keys 'bio', 'joined_date', 'location', 'external_links' (a JSON
    string) were found, or None when nothing was found or the request failed.
    """
    try:
        import aiohttp
        import re
        # Navigate to the about page for better description access
        normalized_url = self.normalize_channel_url(channel_url)
        if not normalized_url:
            normalized_url = channel_url
        about_url = f"{normalized_url}/about"
        async with aiohttp.ClientSession() as session:
            # Browser-like headers so YouTube serves the regular desktop page
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
            }
            async with session.get(
                about_url,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=15)
            ) as resp:
                if resp.status == 200:
                    text = await resp.text()
                    result = {}
                    # Look for description in page data
                    # Format: "description":{"simpleText":"..."}
                    desc_match = re.search(r'"description"\s*:\s*\{\s*"simpleText"\s*:\s*"((?:[^"\\]|\\.)*)"', text)
                    if desc_match and desc_match.group(1):
                        try:
                            # Let the JSON parser decode escape sequences (\n, \uXXXX, ...)
                            bio = json.loads(f'"{desc_match.group(1)}"')
                            if bio and bio.strip():
                                result['bio'] = bio
                                self.log(f"Found YouTube channel bio", 'debug')
                        except (json.JSONDecodeError, ValueError):
                            # Fallback: manually unescape the most common sequences
                            bio = desc_match.group(1).replace('\\n', '\n').replace('\\u0026', '&').replace('\\"', '"')
                            if bio and bio.strip():
                                result['bio'] = bio
                    # Alternative format for bio
                    if 'bio' not in result:
                        desc_match = re.search(r'"channelMetadataRenderer"[^}]*"description"\s*:\s*"((?:[^"\\]|\\.)*)"', text)
                        if desc_match and desc_match.group(1):
                            try:
                                bio = json.loads(f'"{desc_match.group(1)}"')
                                if bio and bio.strip():
                                    result['bio'] = bio
                            except (json.JSONDecodeError, ValueError):
                                pass
                    # Extract joined date - "Joined Jan 25, 2018" -> "Jan 25, 2018"
                    joined_match = re.search(r'"joinedDateText"\s*:\s*\{[^}]*"content"\s*:\s*"([^"]+)"', text)
                    if joined_match:
                        joined_text = joined_match.group(1)
                        # Strip "Joined " prefix if present
                        if joined_text.startswith('Joined '):
                            joined_text = joined_text[7:]
                        result['joined_date'] = joined_text
                        self.log(f"Found YouTube joined date: {result['joined_date']}", 'debug')
                    # Extract country/location
                    country_match = re.search(r'"country"\s*:\s*\{[^}]*"simpleText"\s*:\s*"([^"]+)"', text)
                    if country_match:
                        result['location'] = country_match.group(1)
                        self.log(f"Found YouTube location: {result['location']}", 'debug')
                    # Extract external links
                    # Format: "channelExternalLinkViewModel":{"title":{"content":"Twitter"},"link":{"content":"twitter.com/..."}}
                    links = []
                    link_pattern = r'"channelExternalLinkViewModel"\s*:\s*\{[^}]*"title"\s*:\s*\{[^}]*"content"\s*:\s*"([^"]+)"[^}]*\}[^}]*"link"\s*:\s*\{[^}]*"content"\s*:\s*"([^"]+)"'
                    for match in re.finditer(link_pattern, text):
                        links.append({'title': match.group(1), 'url': match.group(2)})
                    if links:
                        result['external_links'] = json.dumps(links)
                        self.log(f"Found {len(links)} YouTube external links", 'debug')
                    return result if result else None
    except Exception as e:
        self.log(f"Could not fetch YouTube channel metadata: {e}", 'debug')
    return None
async def _fetch_videos_via_api(self, channel_url: str, since_date: str = None,
                                max_videos: int = None, progress_callback=None) -> List[Dict]:
    """
    Fetch channel videos using YouTube Data API v3 (much faster than yt-dlp).

    Uses the playlistItems endpoint to paginate through the channel's uploads
    playlist, then batches video IDs to get duration/stats via the videos
    endpoint.

    Args:
        channel_url: Channel URL/identifier (resolved via extract_channel_id)
        since_date: ISO date string; paging stops at the first older video
        max_videos: Cap on the number of videos collected
        progress_callback: Callback function(count) invoked per video

    Returns:
        List of video metadata dicts in the same shape as the yt-dlp path.

    Raises:
        ValueError: When the channel can't be resolved or an API call fails.
    """
    import aiohttp
    api_key = self.api_key
    base = 'https://www.googleapis.com/youtube/v3'
    # Step 1: Resolve channel handle/URL to channel ID
    channel_id_raw = self.extract_channel_id(channel_url)
    if not channel_id_raw:
        raise ValueError(f"Could not extract channel identifier from {channel_url}")
    async with aiohttp.ClientSession() as session:
        # Determine the uploads playlist ID
        uploads_playlist_id = None
        if channel_id_raw.startswith('UC'):
            # Already a channel ID — uploads playlist is UC -> UU
            uploads_playlist_id = 'UU' + channel_id_raw[2:]
        else:
            # It's a handle — resolve via channels endpoint
            params = {'part': 'contentDetails', 'forHandle': channel_id_raw, 'key': api_key}
            async with session.get(f'{base}/channels', params=params, timeout=aiohttp.ClientTimeout(total=15)) as resp:
                if resp.status != 200:
                    body = await resp.text()
                    raise ValueError(f"YouTube API channels lookup failed ({resp.status}): {body[:200]}")
                data = await resp.json()
                items = data.get('items', [])
                if not items:
                    raise ValueError(f"YouTube API: no channel found for handle '{channel_id_raw}'")
                uploads_playlist_id = items[0]['contentDetails']['relatedPlaylists']['uploads']
        self.log(f"Fetching videos via YouTube Data API (uploads playlist: {uploads_playlist_id})", 'info')
        # Step 2: Paginate through playlistItems
        video_snippets = []
        page_token = None
        since_dt = None
        if since_date:
            try:
                # Compare as naive datetimes; API timestamps are UTC ('Z' suffix)
                since_dt = datetime.fromisoformat(since_date.replace('Z', '+00:00')).replace(tzinfo=None)
            except (ValueError, AttributeError):
                pass
        while True:
            params = {
                'part': 'snippet',
                'playlistId': uploads_playlist_id,
                'maxResults': '50',  # API maximum per page
                'key': api_key,
            }
            if page_token:
                params['pageToken'] = page_token
            async with session.get(f'{base}/playlistItems', params=params, timeout=aiohttp.ClientTimeout(total=15)) as resp:
                if resp.status != 200:
                    body = await resp.text()
                    raise ValueError(f"YouTube API playlistItems failed ({resp.status}): {body[:200]}")
                data = await resp.json()
            stop_paging = False
            for item in data.get('items', []):
                snippet = item.get('snippet', {})
                published_at = snippet.get('publishedAt', '')
                # Date filter: stop when we reach videos older than since_date
                if since_dt and published_at:
                    try:
                        video_dt = datetime.fromisoformat(published_at.replace('Z', '+00:00')).replace(tzinfo=None)
                        if video_dt < since_dt:
                            stop_paging = True
                            break
                    except (ValueError, AttributeError):
                        pass
                video_id = snippet.get('resourceId', {}).get('videoId')
                if not video_id:
                    continue
                video_snippets.append({
                    'video_id': video_id,
                    'title': snippet.get('title', f'Video {video_id}'),
                    'description': snippet.get('description', ''),
                    'published_at': published_at,
                    'channel_id': snippet.get('channelId', ''),
                    'channel': snippet.get('channelTitle', ''),
                    # Prefer the highest-resolution thumbnail the API lists
                    'thumbnail': (snippet.get('thumbnails', {}).get('maxres', {}).get('url')
                                  or snippet.get('thumbnails', {}).get('high', {}).get('url')
                                  or f"https://i.ytimg.com/vi/{video_id}/maxresdefault.jpg"),
                })
                if progress_callback:
                    progress_callback(len(video_snippets))
                if max_videos and len(video_snippets) >= max_videos:
                    stop_paging = True
                    break
            page_token = data.get('nextPageToken')
            if stop_paging or not page_token:
                break
        # Step 3: Batch-fetch duration and stats for all videos (50 at a time)
        duration_map = {}
        stats_map = {}
        video_ids = [v['video_id'] for v in video_snippets]
        for i in range(0, len(video_ids), 50):
            batch = video_ids[i:i + 50]
            params = {
                'part': 'contentDetails,statistics',
                'id': ','.join(batch),
                'key': api_key,
            }
            async with session.get(f'{base}/videos', params=params, timeout=aiohttp.ClientTimeout(total=15)) as resp:
                if resp.status == 200:
                    vdata = await resp.json()
                    for vitem in vdata.get('items', []):
                        vid = vitem['id']
                        # Parse ISO 8601 duration (PT#H#M#S) to seconds
                        dur_str = vitem.get('contentDetails', {}).get('duration', '')
                        duration_map[vid] = self._parse_iso_duration(dur_str)
                        stats = vitem.get('statistics', {})
                        stats_map[vid] = {
                            'view_count': int(stats['viewCount']) if 'viewCount' in stats else None,
                            'like_count': int(stats['likeCount']) if 'likeCount' in stats else None,
                        }
        # Step 4: Assemble final video list in same format as yt-dlp method
        videos = []
        for v in video_snippets:
            vid = v['video_id']
            upload_date = None
            if v['published_at']:
                try:
                    upload_date = datetime.fromisoformat(v['published_at'].replace('Z', '+00:00')).strftime('%Y-%m-%dT%H:%M:%S')
                except (ValueError, AttributeError):
                    # Keep the raw API timestamp if it can't be reformatted
                    upload_date = v['published_at']
            stats = stats_map.get(vid, {})
            duration = duration_map.get(vid)
            video_entry = {
                'video_id': vid,
                'title': v['title'],
                'description': v['description'],
                'upload_date': upload_date,
                'duration': duration,
                'view_count': stats.get('view_count'),
                'like_count': stats.get('like_count'),
                'thumbnail': v['thumbnail'],
                'url': f"https://www.youtube.com/watch?v={vid}",
                'channel_id': v['channel_id'],
                'channel': v['channel'],
            }
            # Tag YouTube Shorts (≤ 3 minutes)
            if duration is not None and duration <= 180:
                video_entry['is_short'] = True
            videos.append(video_entry)
        self.log(f"YouTube Data API returned {len(videos)} videos", 'info')
        return videos
async def _fetch_members_only_videos(self, channel_url: str, since_date: str = None,
                                     progress_callback=None, base_count: int = 0,
                                     known_public_ids: set = None) -> List[Dict]:
    """
    Fetch members-only videos from a channel using yt-dlp with cookies.

    Strategy: Use --flat-playlist (fast) to get all video IDs visible with
    cookies. Any IDs not in the API results (known_public_ids) are likely
    members-only. Then fetch full metadata only for those specific videos.

    Args:
        channel_url: YouTube channel URL
        since_date: ISO date string, translated to yt-dlp's --dateafter
        progress_callback: Callback function(count); offset by base_count
        base_count: Number of videos the caller has already collected
        known_public_ids: Video IDs to exclude (already-known public videos)

    Returns:
        List of video metadata dicts, each tagged 'members_only': True.
        Empty list when no cookies are available or the scan fails.
    """
    if not self._get_cookies_file():
        self.log("No cookies available, skipping members-only check", 'debug')
        return []
    # Step 1: Fast flat-playlist scan to get all video IDs (includes members-only with cookies)
    cmd = self._get_base_cmd() + [
        '--no-warnings',
        '--flat-playlist',
        '-j',
        f"{channel_url}/videos"
    ]
    if since_date:
        try:
            # yt-dlp expects YYYYMMDD for --dateafter
            date_obj = datetime.fromisoformat(since_date.replace('Z', '+00:00'))
            dateafter = date_obj.strftime('%Y%m%d')
            cmd.extend(['--dateafter', dateafter])
        except (ValueError, AttributeError):
            pass
    self.log("Checking for members-only videos via yt-dlp (flat scan)...", 'info')
    result = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await result.communicate()
    if result.returncode != 0:
        error = stderr.decode('utf-8', errors='replace')
        self.log(f"Flat playlist scan failed: {error[:200]}", 'debug')
        return []
    # Find video IDs not in the API results
    all_ids = []
    for line in stdout.decode('utf-8', errors='replace').strip().split('\n'):
        if not line:
            continue
        try:
            data = json.loads(line)
            vid = data.get('id') or data.get('url')
            if vid and (not known_public_ids or vid not in known_public_ids):
                all_ids.append(vid)
        except json.JSONDecodeError:
            continue
    if not all_ids:
        self.log("No additional videos found beyond API results", 'debug')
        return []
    self.log(f"Found {len(all_ids)} videos not in API results, fetching metadata...", 'info')
    # Step 2: Fetch full metadata only for the unknown videos
    videos = []
    for vid in all_ids:
        video_url = f"https://www.youtube.com/watch?v={vid}"
        cmd = self._get_base_cmd() + [
            '--no-warnings',
            '--skip-download',
            '--no-write-thumbnail',
            '-j',
            # Skip format extraction we don't need; speeds up metadata fetch
            '--extractor-args', 'youtube:skip=hls,dash',
            video_url
        ]
        result = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await result.communicate()
        if result.returncode != 0:
            # Video not accessible with these cookies — skip it
            continue
        for line in stdout.decode('utf-8', errors='replace').strip().split('\n'):
            if not line:
                continue
            try:
                data = json.loads(line)
                video_id = data.get('id')
                if not video_id:
                    continue
                upload_date = data.get('upload_date')
                if upload_date:
                    try:
                        # yt-dlp reports YYYYMMDD; convert to ISO format
                        upload_date = datetime.strptime(upload_date, '%Y%m%d').isoformat()
                    except ValueError:
                        pass
                videos.append({
                    'video_id': video_id,
                    'title': data.get('title', f'Video {video_id}'),
                    'description': data.get('description', ''),
                    'upload_date': upload_date,
                    'duration': data.get('duration'),
                    'view_count': data.get('view_count'),
                    'like_count': data.get('like_count'),
                    'thumbnail': data.get('thumbnail') or f"https://i.ytimg.com/vi/{video_id}/maxresdefault.jpg",
                    'url': f"https://www.youtube.com/watch?v={video_id}",
                    'channel_id': data.get('channel_id'),
                    'channel': data.get('channel') or data.get('uploader'),
                    'members_only': True,
                })
                if progress_callback:
                    progress_callback(base_count + len(videos))
            except json.JSONDecodeError:
                continue
    self.log(f"Found {len(videos)} members-only videos", 'info' if videos else 'debug')
    return videos
@staticmethod
def _parse_iso_duration(duration: str) -> Optional[int]:
"""Parse ISO 8601 duration (e.g. PT1H2M3S) to seconds"""
if not duration:
return None
match = re.match(r'PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?', duration)
if not match:
return None
hours = int(match.group(1) or 0)
minutes = int(match.group(2) or 0)
seconds = int(match.group(3) or 0)
return hours * 3600 + minutes * 60 + seconds
async def get_channel_videos(self, channel_url: str, since_date: str = None,
                             max_videos: int = None, progress_callback=None,
                             known_video_ids: set = None) -> List[Dict]:
    """
    Get all videos from a channel.

    Prefers the YouTube Data API (when an API key is configured), with a
    supplementary yt-dlp pass for members-only content; otherwise falls back
    to a full yt-dlp metadata crawl.

    Args:
        channel_url: YouTube channel URL
        since_date: Only fetch videos published after this date (ISO format)
        max_videos: Maximum number of videos to fetch
        progress_callback: Callback function(count) for progress updates
        known_video_ids: Video IDs already in the database (avoids re-fetching on resync)

    Returns:
        List of video metadata dicts (sorted newest-first on the API path).
    """
    # Prefer YouTube Data API v3 if an API key is configured
    if self.api_key:
        try:
            videos = await self._fetch_videos_via_api(channel_url, since_date, max_videos, progress_callback)
            # API only returns public videos — do a targeted yt-dlp pass
            # for members-only content (requires cookies from a member account)
            if self.is_available():
                try:
                    # Combine current API results + DB IDs so the flat scan
                    # doesn't re-fetch videos we already know about
                    all_known_ids = {v['video_id'] for v in videos}
                    if known_video_ids:
                        all_known_ids |= known_video_ids
                    members_videos = await self._fetch_members_only_videos(
                        channel_url, since_date, progress_callback, len(videos),
                        known_public_ids=all_known_ids
                    )
                    if members_videos:
                        new_members = [v for v in members_videos if v['video_id'] not in all_known_ids]
                        if new_members:
                            self.log(f"Found {len(new_members)} members-only videos via yt-dlp", 'info')
                            videos.extend(new_members)
                            # Re-sort by upload date descending
                            videos.sort(key=lambda v: v.get('upload_date') or '', reverse=True)
                except Exception as e:
                    # Members-only discovery is an optional extra; never let
                    # it break the main API result set
                    self.log(f"Members-only video fetch failed (non-fatal): {e}", 'debug')
            return videos
        except Exception as e:
            # Fall through to the yt-dlp crawl below
            self.log(f"YouTube Data API failed, falling back to yt-dlp: {e}", 'warning')
    if not self.is_available():
        return []
    try:
        # Build command to list all videos
        # Note: We don't use --flat-playlist because it truncates descriptions
        # This is slower but provides full video metadata including complete descriptions
        cmd = self._get_base_cmd() + [
            '--no-warnings',
            '--skip-download',
            '--no-write-thumbnail',
            '-j',
            '--extractor-args', 'youtube:skip=hls,dash',
            f"{channel_url}/videos"
        ]
        # Add date filter at yt-dlp level for efficiency
        if since_date:
            try:
                # Convert ISO date to YYYYMMDD format for yt-dlp
                date_obj = datetime.fromisoformat(since_date.replace('Z', '+00:00'))
                dateafter = date_obj.strftime('%Y%m%d')
                cmd.extend(['--dateafter', dateafter])
                self.log(f"Filtering videos after {dateafter}", 'debug')
            except (ValueError, AttributeError):
                pass
        if max_videos:
            cmd.extend(['--playlist-items', f'1:{max_videos}'])
        self.log(f"Fetching videos from channel: {channel_url}", 'info')
        result = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await result.communicate()
        if result.returncode != 0:
            error = stderr.decode('utf-8', errors='replace')
            self.log(f"Failed to get channel videos: {error}", 'warning')
            return []
        videos = []
        for line in stdout.decode('utf-8', errors='replace').strip().split('\n'):
            if not line:
                continue
            try:
                data = json.loads(line)
                # Skip non-video entries (like playlists)
                if data.get('_type') == 'playlist':
                    continue
                video_id = data.get('id')
                if not video_id:
                    continue
                # Parse upload date
                upload_date = data.get('upload_date')
                if upload_date:
                    # Convert YYYYMMDD to ISO format
                    try:
                        upload_date = datetime.strptime(upload_date, '%Y%m%d').isoformat()
                    except ValueError:
                        pass
                # Check if video is newer than since_date; the channel tab is
                # newest-first, so the first older video ends the scan
                if since_date and upload_date and upload_date <= since_date:
                    self.log(f"Reached video from {upload_date}, stopping", 'debug')
                    break
                video_entry = {
                    'video_id': video_id,
                    'title': data.get('title', f'Video {video_id}'),
                    'description': data.get('description', ''),
                    'upload_date': upload_date,
                    'duration': data.get('duration'),
                    'view_count': data.get('view_count'),
                    'like_count': data.get('like_count'),
                    'thumbnail': data.get('thumbnail') or f"https://i.ytimg.com/vi/{video_id}/maxresdefault.jpg",
                    'url': f"https://www.youtube.com/watch?v={video_id}",
                    'channel_id': data.get('channel_id'),
                    'channel': data.get('channel') or data.get('uploader'),
                }
                # Tag members-only videos
                if data.get('availability') == 'subscriber_only':
                    video_entry['members_only'] = True
                # Tag YouTube Shorts (≤ 3 minutes)
                duration = data.get('duration')
                if duration is not None and duration <= 180:
                    video_entry['is_short'] = True
                videos.append(video_entry)
                if progress_callback:
                    progress_callback(len(videos))
                if max_videos and len(videos) >= max_videos:
                    break
            except json.JSONDecodeError:
                continue
        self.log(f"Found {len(videos)} videos", 'info')
        return videos
    except Exception as e:
        self.log(f"Error getting channel videos: {e}", 'error')
        return []
async def download_video(self, video_url: str, output_dir: Path, quality: str = 'best',
                         progress_callback=None) -> Dict:
    """
    Download a video with yt-dlp.

    Args:
        video_url: YouTube video URL
        output_dir: Directory to save the video (created if missing)
        quality: Quality preset key from QUALITY_PRESETS (defaults to 'best')
        progress_callback: Callback for download progress
            (accepted for interface symmetry; not used by this implementation)

    Returns:
        Dict with 'success': True plus file/video info on success, or
        'success': False plus a short 'error' message on failure.
    """
    if not self.is_available():
        return {'success': False, 'error': 'yt-dlp not available'}
    try:
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        # Output template preserves original title (truncated to 100 chars)
        output_template = str(output_dir / '%(title).100s_%(id)s.%(ext)s')
        format_str = self.QUALITY_PRESETS.get(quality, self.QUALITY_PRESETS['best'])
        cmd = self._get_base_cmd() + [
            '--no-warnings',
            '-f', format_str,
            '--merge-output-format', 'mp4',
            '-o', output_template,
            '--print-json',
            '--no-playlist',
            video_url
        ]
        self.log(f"Downloading video: {video_url}", 'debug')
        result = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await result.communicate()
        if result.returncode != 0:
            # Normalize common yt-dlp failures into short, user-facing errors
            error_msg = stderr.decode('utf-8', errors='replace').strip()
            if 'Video unavailable' in error_msg:
                error_msg = 'Video unavailable or private'
            elif 'age-restricted' in error_msg.lower():
                error_msg = 'Video is age-restricted'
            elif 'members only' in error_msg.lower():
                error_msg = 'Video is members-only'
            elif len(error_msg) > 200:
                error_msg = error_msg[:200] + '...'
            return {'success': False, 'error': error_msg}
        # Parse output JSON (--print-json emits the full video info dict)
        video_info = None
        for line in stdout.decode('utf-8', errors='replace').strip().split('\n'):
            try:
                video_info = json.loads(line)
                break
            except json.JSONDecodeError:
                continue
        if not video_info:
            # Try to find downloaded file (newest .mp4 in the output dir)
            files = list(output_dir.glob('*.mp4'))
            if files:
                file_path = max(files, key=lambda f: f.stat().st_mtime)
                return {
                    'success': True,
                    'file_path': str(file_path),
                    'filename': file_path.name,
                    'file_size': file_path.stat().st_size
                }
            return {'success': False, 'error': 'Could not find downloaded file'}
        file_path = video_info.get('_filename') or video_info.get('filename')
        if file_path:
            file_path = Path(file_path)
        return {
            'success': True,
            'file_path': str(file_path) if file_path else None,
            'filename': file_path.name if file_path else None,
            # Fall back to yt-dlp's reported size when the file isn't on disk
            'file_size': file_path.stat().st_size if file_path and file_path.exists() else video_info.get('filesize'),
            'title': video_info.get('title'),
            'duration': video_info.get('duration'),
            'video_id': video_info.get('id'),
            'upload_date': video_info.get('upload_date'),
            'thumbnail': video_info.get('thumbnail'),
        }
    except Exception as e:
        self.log(f"Error downloading video: {e}", 'error')
        return {'success': False, 'error': str(e)}
async def get_creator(self, channel_url: str) -> Optional[Creator]:
    """Build a Creator model for the given channel URL, or None when lookup fails."""
    info = await self.get_channel_info(channel_url)
    if not info:
        return None
    # Prefer the resolved channel ID; fall back to parsing the URL itself
    creator_id = info.get('channel_id') or self.extract_channel_id(channel_url)
    return Creator(
        creator_id=creator_id,
        service_id='youtube',
        platform='youtube',
        username=info.get('channel_name', 'Unknown'),
        display_name=info.get('channel_name'),
        profile_image_url=info.get('thumbnail'),
    )
async def get_posts(self, channel_url: str, since_date: str = None,
                    max_videos: int = None, progress_callback=None,
                    known_video_ids: set = None) -> List[Post]:
    """
    Fetch channel videos and wrap each one as a Post carrying a single
    video Attachment (the watch URL doubles as the download URL).
    """
    videos = await self.get_channel_videos(channel_url, since_date, max_videos, progress_callback,
                                           known_video_ids=known_video_ids)
    posts = []
    for video in videos:
        tags = []
        if video.get('members_only'):
            tags.append('Members Only')
        if video.get('is_short'):
            tags.append('Short')
        video_attachment = Attachment(
            name=f"{video['title']}.mp4",
            file_type='video',
            extension='.mp4',
            server_path=video['url'],  # Use URL as server_path
            download_url=video['url'],
            duration=video.get('duration'),
        )
        posts.append(Post(
            post_id=video['video_id'],
            service_id='youtube',
            platform='youtube',
            creator_id=video.get('channel_id', ''),
            title=video['title'],
            content=video.get('description', ''),
            published_at=video.get('upload_date'),
            attachments=[video_attachment],
            auto_tags=tags,
        ))
    return posts