392 lines
16 KiB
Python
392 lines
16 KiB
Python
"""Taddy Podcast API client for finding podcast appearances"""
|
|
import asyncio
|
|
import re
|
|
from html import unescape
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional
|
|
from web.backend.core.http_client import http_client
|
|
from modules.universal_logger import get_logger
|
|
|
|
# Module-level logger shared by everything in this file, registered under the name 'Taddy'.
logger = get_logger('Taddy')
|
|
|
|
|
|
def strip_html(text: str) -> str:
    """Return ``text`` as plain text.

    Tags are replaced by a space, HTML entities are decoded, and every run of
    whitespace is collapsed to a single space with the ends trimmed.
    Falsy input (``None`` or an empty string) is handed back unchanged.
    """
    if text:
        # Tags go first so that entity-escaped markup (e.g. "&lt;b&gt;")
        # survives as literal text after decoding.
        without_tags = re.sub(r'<[^>]+>', ' ', text)
        decoded = unescape(without_tags)
        return re.sub(r'\s+', ' ', decoded).strip()
    return text
|
|
|
|
|
|
class TaddyClient:
    """Client for interacting with the Taddy Podcast API (GraphQL).

    Supports primary and fallback accounts for quota management.
    When the primary account fails (500 error / quota exceeded),
    automatically switches to the fallback account. Once switched, the
    client keeps using the fallback account for all later requests.
    """

    BASE_URL = "https://api.taddy.org"

    # Lower-case substrings of a podcast series name that mark the show as
    # news/gossip (or another known false-positive source). Only consulted
    # when an episode has no usable persons metadata.
    _GARBAGE_PODCAST_NAMES = (
        'news', 'gossip', 'rumor', 'daily', 'trending', 'tmz', 'variety',
        'march madness', 'cruz show', 'aesthetic arrest', 'devious maids',
    )

    def __init__(self, user_id: str, api_key: str,
                 user_id_2: Optional[str] = None, api_key_2: Optional[str] = None):
        """Store credentials and build the initial request headers.

        Args:
            user_id: User id of the primary account (sent as ``X-USER-ID``).
            api_key: API key of the primary account (sent as ``X-API-KEY``).
            user_id_2: Optional user id of the fallback account.
            api_key_2: Optional API key of the fallback account.
        """
        # Primary account
        self.user_id = user_id
        self.api_key = api_key

        # Fallback account (optional) - usable only when both values are set
        self.user_id_2 = user_id_2
        self.api_key_2 = api_key_2
        self.has_fallback = bool(user_id_2 and api_key_2)

        # Track which account is active
        self.using_fallback = False

        self._update_headers()

    def _update_headers(self):
        """Rebuild ``self.headers`` for the currently active account."""
        use_fallback = self.using_fallback and self.has_fallback
        self.headers = {
            "Content-Type": "application/json",
            "X-USER-ID": self.user_id_2 if use_fallback else self.user_id,
            "X-API-KEY": self.api_key_2 if use_fallback else self.api_key,
        }

    def _switch_to_fallback(self) -> bool:
        """Switch to fallback account if available. Returns True if switched."""
        if self.has_fallback and not self.using_fallback:
            self.using_fallback = True
            self._update_headers()
            logger.info("Switched to fallback Taddy account")
            return True
        return False

    async def _graphql_query(self, query: str, variables: Optional[Dict] = None,
                             retry_on_fallback: bool = True) -> Optional[Dict]:
        """Execute a GraphQL query against the Taddy API.

        If the request fails with what looks like a 500 error (treated as
        quota exceeded), it is retried once with the fallback account when
        one is configured.

        Args:
            query: GraphQL document to send.
            variables: Optional GraphQL variables; omitted from the payload
                when empty.
            retry_on_fallback: Allow one retry on the fallback account.

        Returns:
            The ``data`` member of the response, or ``None`` when the
            response carries ``errors`` or the request fails.
        """
        try:
            payload = {"query": query}
            if variables:
                payload["variables"] = variables

            response = await http_client.post(
                self.BASE_URL,
                json=payload,
                headers=self.headers
            )

            data = response.json()

            if "errors" in data:
                logger.error(f"Taddy API error: {data['errors']}")
                return None

            return data.get("data")

        except Exception as e:
            # Deliberately broad: this is the network boundary and callers
            # expect None rather than an exception.
            error_str = str(e).lower()
            # Check for 500 error (quota exceeded) - http_client raises ServiceError
            if "500" in error_str or "server error" in error_str:
                account_type = "fallback" if self.using_fallback else "primary"
                logger.warning(f"Taddy API returned 500 on {account_type} account (likely quota exceeded)")

                # Try fallback if available and we haven't already
                if retry_on_fallback and self._switch_to_fallback():
                    logger.info("Retrying with fallback Taddy account...")
                    return await self._graphql_query(query, variables, retry_on_fallback=False)

            logger.error(f"Taddy API request failed: {e}")
            return None

    @staticmethod
    def _credit_type_from_role(role: Optional[str]) -> str:
        """Map a Taddy person role onto one of our credit types.

        Any role containing "host" becomes ``"host"``, any role containing
        "guest" becomes ``"guest"``, any other non-empty role is returned
        lower-cased, and a missing role defaults to ``"guest"``.
        """
        role_lower = (role or "").lower()
        if "host" in role_lower:
            return "host"
        if "guest" in role_lower:
            return "guest"
        return role_lower or "guest"

    @staticmethod
    def _find_person_credit(persons: List, name_lower: str, name_parts: List[str]) -> Optional[str]:
        """Return the credit type of the first person entry matching the celebrity.

        Args:
            persons: The episode's ``persons`` list (entries may be malformed).
            name_lower: Lower-cased celebrity name.
            name_parts: ``name_lower`` split on whitespace.

        Returns:
            The credit type of the first match, or ``None`` when nobody matches.
        """
        for person in persons:
            if not isinstance(person, dict):
                continue  # tolerate null / malformed entries instead of dropping the episode
            person_name = (person.get("name") or "").lower().strip()
            if not person_name:
                # An empty name is a substring of every string, so without
                # this guard a nameless entry would match any celebrity.
                continue

            # Match full name (either string may contain the other)
            matched = name_lower in person_name or person_name in name_lower

            # Also check by last/first name for partial matches.
            # NOTE(review): a first-name-only hit (e.g. "Chris ..." vs another
            # "Chris") is accepted here as in the original - confirm intended.
            if not matched and len(name_parts) >= 2:
                last_name = name_parts[-1]
                first_name = name_parts[0]
                matched = len(last_name) >= 4 and (
                    last_name in person_name or first_name in person_name
                )

            if matched:
                return TaddyClient._credit_type_from_role(person.get("role"))
        return None

    @staticmethod
    def _is_host_by_series_name(podcast_name_lower: str, name_lower: str, name_parts: List[str]) -> bool:
        """Guess from the series title whether the celebrity hosts the podcast."""
        if name_lower in podcast_name_lower:
            return True
        if len(name_parts) >= 2:
            last_name = name_parts[-1]
            first_name = name_parts[0]
            if len(last_name) >= 4:
                return (f"with {last_name}" in podcast_name_lower or
                        f"with {first_name}" in podcast_name_lower or
                        f"{first_name} {last_name}" in podcast_name_lower)
        return False

    @staticmethod
    def _build_interview_patterns(name_lower: str) -> List:
        """Compile the whitelist of title patterns that indicate an interview.

        Patterns are meant to be searched against a lower-cased episode title.
        """
        name_re = re.escape(name_lower)
        raw_patterns = [
            # Direct interview indicators
            rf'(interview|interviews|interviewing)\s+(with\s+)?{name_re}',
            rf'{name_re}\s+(interview|interviewed)',
            # Guest indicators
            rf'(guest|featuring|feat\.?|ft\.?|with guest|special guest)[:\s]+{name_re}',
            rf'{name_re}\s+(joins|joined|stops by|sits down|talks|speaks|discusses|shares|reveals|opens up|gets real|gets honest)',
            # "Name on Topic" format (common interview title)
            rf'^{name_re}\s+on\s+',
            # Episode number + name format ("Ep 123: Name...")
            rf'^(ep\.?|episode|#)\s*\d+[:\s]+{name_re}',
            # Name at start followed by colon or dash (interview format)
            rf'^{name_re}\s*[:\-–—]\s*',
            # "Conversation with Name"
            rf'(conversation|chat|talk|talking|speaking)\s+with\s+{name_re}',
            # "Name Returns" / "Name is Back"
            rf'{name_re}\s+(returns|is back|comes back)',
            # Q&A format
            rf'(q&a|q\s*&\s*a|ama)\s+(with\s+)?{name_re}',
            # Podcast-specific patterns
            rf'{name_re}\s+(live|in studio|in the studio|on the show|on the pod)',
        ]
        return [re.compile(pattern) for pattern in raw_patterns]

    async def search_podcast_appearances(
        self,
        celebrity_name: str,
        lookback_days: int = 730,  # 2 years
        lookahead_days: int = 30,
        limit: int = 25,
        max_pages: int = 10
    ) -> List[Dict]:
        """
        Search for podcast episodes featuring a celebrity.

        Episodes are accepted, in order of preference, when the celebrity is
        listed in the episode's persons metadata, when the series title
        suggests they host the show, or when the episode title matches a
        whitelist of interview-style patterns.

        Args:
            celebrity_name: Name of the celebrity to search for
            lookback_days: How many days back to search
            lookahead_days: How many days forward to search (for scheduled
                releases). Currently not used by the query; kept for
                interface compatibility.
            limit: Maximum results per page
            max_pages: Maximum number of result pages to request

        Returns:
            List of podcast appearance dicts (empty when the name is blank
            or nothing matches)
        """
        appearances = []

        name_lower = (celebrity_name or "").lower()
        name_parts = name_lower.split()
        if not name_parts:
            # A blank name is a substring of everything and would match
            # every person and title, so there is nothing meaningful to search.
            return appearances

        # Calculate date range
        now = datetime.now()
        start_date = now - timedelta(days=lookback_days)
        # Convert to Unix timestamp (seconds)
        start_timestamp = int(start_date.timestamp())

        query = """
        query SearchPodcastEpisodes($term: String!, $limitPerPage: Int, $page: Int, $filterForPublishedAfter: Int) {
            search(
                term: $term,
                filterForTypes: PODCASTEPISODE,
                matchBy: EXACT_PHRASE,
                limitPerPage: $limitPerPage,
                page: $page,
                filterForPublishedAfter: $filterForPublishedAfter
            ) {
                searchId
                podcastEpisodes {
                    uuid
                    name
                    description
                    datePublished
                    audioUrl
                    persons {
                        uuid
                        name
                        role
                    }
                    podcastSeries {
                        uuid
                        name
                        imageUrl
                    }
                    websiteUrl
                }
            }
        }
        """

        # Paginate through results (max 20 pages API limit, 25 per page = 500 max)
        # max_pages passed as parameter from config
        all_episodes = []

        for page in range(1, max_pages + 1):
            variables = {
                "term": celebrity_name,
                "limitPerPage": limit,
                "page": page,
                "filterForPublishedAfter": start_timestamp
            }

            data = await self._graphql_query(query, variables)

            if not data or not data.get("search"):
                break

            episodes = data["search"].get("podcastEpisodes", [])
            if not episodes:
                break  # No more results

            all_episodes.extend(episodes)

            # If we got fewer than limit, we've reached the end
            if len(episodes) < limit:
                break

            # Small delay between pages
            await asyncio.sleep(0.2)

        # Loop-invariant: compile the title whitelist once, not per episode
        interview_patterns = self._build_interview_patterns(name_lower)

        for ep in all_episodes:
            try:
                # Parse the episode data ("or {}" also covers an explicit null)
                podcast_series = ep.get("podcastSeries") or {}
                ep_name = ep.get("name") or ""
                podcast_name = podcast_series.get("name") or ""

                # ===== USE PERSONS METADATA FOR ACCURATE FILTERING =====
                # Check if celebrity is listed in the persons array with a role
                credit_type = self._find_person_credit(
                    ep.get("persons") or [], name_lower, name_parts
                )

                if credit_type:
                    # If person is in the persons list, include the episode
                    logger.debug(f"Accepting '{ep_name}' - {celebrity_name} listed as {credit_type} in persons metadata")
                else:
                    podcast_name_lower = podcast_name.lower()

                    # Fallback: check if they're the host via podcast series name
                    if self._is_host_by_series_name(podcast_name_lower, name_lower, name_parts):
                        credit_type = "host"
                        logger.debug(f"Accepting '{ep_name}' - host podcast (name in series title)")
                    else:
                        # No persons metadata - use WHITELIST approach
                        # Only accept if title clearly indicates an interview/guest appearance
                        ep_name_lower = ep_name.lower()
                        if name_lower not in ep_name_lower:
                            logger.debug(f"Skipping '{ep_name}' - name not in title")
                            continue

                        # Check podcast name for news/gossip shows first
                        if any(word in podcast_name_lower for word in self._GARBAGE_PODCAST_NAMES):
                            logger.debug(f"Skipping '{ep_name}' - podcast name suggests news/gossip")
                            continue

                        # Reject listicles (multiple comma-separated topics)
                        comma_count = ep_name_lower.count(',')
                        if comma_count >= 3:
                            logger.debug(f"Skipping '{ep_name}' - listicle format ({comma_count} commas)")
                            continue

                        # WHITELIST: Only accept if title matches clear interview patterns
                        if not any(pattern.search(ep_name_lower) for pattern in interview_patterns):
                            logger.debug(f"Skipping '{ep_name}' - no interview pattern match (name just mentioned)")
                            continue

                        logger.debug(f"Accepting '{ep_name}' - matches interview pattern")
                        credit_type = "guest"

                # Parse date; anything unparseable is reported as undated and aired
                appearance_date = None
                status = "aired"
                date_published = ep.get("datePublished")
                if date_published:
                    # Taddy returns Unix timestamp in seconds
                    try:
                        pub_date = datetime.fromtimestamp(date_published)
                        appearance_date = pub_date.strftime("%Y-%m-%d")
                        status = "upcoming" if pub_date.date() > now.date() else "aired"
                    except (ValueError, TypeError, OverflowError, OSError):
                        # fromtimestamp raises OverflowError/OSError for
                        # out-of-range values; keep the episode without a date
                        appearance_date = None
                        status = "aired"

                appearance = {
                    "appearance_type": "Podcast",
                    "show_name": podcast_series.get("name") or "Unknown Podcast",
                    "episode_title": ep.get("name"),
                    "appearance_date": appearance_date,
                    "status": status,
                    "description": strip_html(ep.get("description")),
                    # Artwork comes from the podcast series, not the episode
                    "poster_url": podcast_series.get("imageUrl"),
                    "audio_url": ep.get("audioUrl"),
                    "url": ep.get("websiteUrl"),
                    "credit_type": credit_type,
                    "character_name": "Self",
                    "taddy_episode_uuid": ep.get("uuid"),
                    "taddy_podcast_uuid": podcast_series.get("uuid"),
                    "duration_seconds": None,  # Duration removed from query to reduce complexity
                }

                appearances.append(appearance)
                logger.info(f"Found podcast appearance: {celebrity_name} on '{podcast_series.get('name')}' - {ep.get('name')}")

            except Exception as e:
                # Best-effort parsing: one malformed episode must not abort the batch
                logger.error(f"Error parsing Taddy episode: {e}")
                continue

        return appearances

    async def test_connection(self) -> bool:
        """Test if the API credentials are valid.

        Runs a minimal search query and reports whether it returned data.
        """
        query = """
        query TestConnection {
            search(term: "test", filterForTypes: PODCASTSERIES, limitPerPage: 1) {
                searchId
            }
        }
        """

        data = await self._graphql_query(query)
        return data is not None
|