Files
media-downloader/modules/podchaser_client.py
Todd 0d7b2b1aab Initial commit
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-29 22:42:55 -04:00

446 lines
16 KiB
Python

"""Podchaser GraphQL API client for podcast guest appearances tracking"""
import asyncio
from datetime import date, datetime, timedelta
from typing import Dict, List, Optional

from modules.universal_logger import get_logger
from web.backend.core.http_client import http_client
# Module-level logger used by every method in this file; requested from the
# project's logger factory under the name 'Podchaser'.
logger = get_logger('Podchaser')
class PodchaserClient:
    """Client for interacting with the Podchaser GraphQL API.

    The client looks up creators by name and collects the podcast episodes
    they are credited on (as guest or host) inside a date window around today.

    All GraphQL responses are treated as untrusted JSON: any object field may
    be missing *or* explicitly ``null``, so nested values are always read
    through :meth:`_dig` instead of chained ``.get(key, {})`` calls (whose
    default is ignored when the key is present with a ``null`` value).
    """

    API_URL = "https://api.podchaser.com/graphql"
    # Upper bound on episodeCredits pages fetched per creator (20 credits per
    # page in the query below), to prevent excessive API calls.
    MAX_CREDIT_PAGES = 10
    # Pause between consecutive page requests, in seconds (rate limiting).
    PAGE_DELAY_SECONDS = 0.15

    def __init__(self, api_key: str):
        """Store the access token and build the request headers.

        Args:
            api_key: The Podchaser *access token* (already exchanged from
                client credentials), sent as a Bearer token on every request.
        """
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }

    # ------------------------------------------------------------------
    # Pure helpers (no I/O, no logging)
    # ------------------------------------------------------------------

    @staticmethod
    def _dig(container, *keys, default=None):
        """Walk nested dicts and return the value at ``keys``.

        Returns ``default`` when any level is missing, is not a dict, or the
        final value is ``None``. Falsy non-``None`` values (``False``, ``0``,
        ``""``, ``[]``) are returned unchanged.
        """
        current = container
        for key in keys:
            if not isinstance(current, dict):
                return default
            current = current.get(key)
        return default if current is None else current

    @staticmethod
    def _parse_air_date(air_date_str) -> Optional[date]:
        """Parse a Podchaser ``airDate`` into a ``date``.

        Handles both "YYYY-MM-DD" and "YYYY-MM-DD HH:MM:SS" by looking only at
        the first 10 characters. Returns ``None`` for missing, non-string or
        unparseable values.
        """
        if not air_date_str or not isinstance(air_date_str, str):
            return None
        try:
            # Slicing is already safe for strings shorter than 10 characters.
            return datetime.strptime(air_date_str[:10], "%Y-%m-%d").date()
        except ValueError:
            return None

    @staticmethod
    def _build_appearance(episode: Dict, podcast: Dict, role: Optional[str]) -> Dict:
        """Build the flat appearance record returned by the public methods.

        ``air_date`` keeps the raw string from the API; the episode image
        falls back to the podcast image when the episode has none.
        """
        return {
            "podchaser_episode_id": episode.get("id"),
            "episode_title": episode.get("title"),
            "podcast_name": podcast.get("title"),
            "description": episode.get("description"),
            "air_date": episode.get("airDate"),
            "episode_url": episode.get("url"),
            "audio_url": episode.get("audioUrl"),
            "poster_url": episode.get("imageUrl") or podcast.get("imageUrl"),
            "role": role,
            "podchaser_podcast_id": podcast.get("id"),
        }

    @staticmethod
    def _find_host_role(credits, creator_name: str) -> Optional[Dict]:
        """Return the ``role`` dict of the first host credit matching the name.

        A credit matches when its role code is ``"host"`` and one of the two
        names (case-insensitive, surrounding whitespace ignored) contains the
        other. Returns ``None`` when nothing matches or the name is blank.
        """
        wanted = (creator_name or "").strip().lower()
        if not wanted:
            # An empty needle is a substring of everything; never match on it.
            return None
        for credit in credits or []:
            if not isinstance(credit, dict):
                continue
            role = credit.get("role") or {}
            if role.get("code") != "host":
                continue
            credited = str((credit.get("creator") or {}).get("name") or "").strip().lower()
            if not credited:
                continue
            if wanted in credited or credited in wanted:
                return role
        return None

    @staticmethod
    def _description_length(appearance: Dict) -> int:
        """Length of the description, counting a missing/None one as 0."""
        description = appearance.get("description")
        return len(str(description)) if description else 0

    @staticmethod
    def _merge_appearances(appearances: List[Dict]) -> List[Dict]:
        """Deduplicate by episode ID and sort newest first.

        Records without an episode ID are dropped. When two records share an
        ID, the one with the longer description wins; on a tie the earlier
        record is kept. Sorting compares the raw ``air_date`` strings, which
        order correctly because they start with "YYYY-MM-DD".
        """
        by_episode: Dict = {}
        for appearance in appearances:
            episode_id = appearance.get("podchaser_episode_id")
            if not episode_id:
                continue
            current = by_episode.get(episode_id)
            if current is None or (
                PodchaserClient._description_length(appearance)
                > PodchaserClient._description_length(current)
            ):
                by_episode[episode_id] = appearance
        return sorted(
            by_episode.values(),
            key=lambda item: item.get("air_date") or "",
            reverse=True,
        )

    # ------------------------------------------------------------------
    # Authentication / transport
    # ------------------------------------------------------------------

    @classmethod
    async def from_client_credentials(cls, client_id: str, client_secret: str) -> "PodchaserClient":
        """Create a client by exchanging client credentials for an access token.

        Args:
            client_id: Podchaser client ID
            client_secret: Podchaser client secret

        Returns:
            PodchaserClient instance holding the returned access token.

        Raises:
            Exception: If the request fails, the API reports errors, or the
                response carries no access token. Each failure is logged once
                before being raised.
        """
        mutation = """
            mutation GetToken($client_id: String!, $client_secret: String!) {
                requestAccessToken(
                    input: {
                        grant_type: CLIENT_CREDENTIALS
                        client_id: $client_id
                        client_secret: $client_secret
                    }
                ) {
                    access_token
                }
            }
        """
        variables = {
            "client_id": client_id,
            "client_secret": client_secret,
        }

        # Only the transport/decoding step is wrapped, so the failures raised
        # further down are not caught and logged a second time.
        try:
            response = await http_client.post(
                cls.API_URL,
                json={"query": mutation, "variables": variables},
                headers={"Content-Type": "application/json"},
            )
            data = response.json()
        except Exception as e:
            logger.error(f"Error getting Podchaser access token: {e}")
            raise

        errors = data.get("errors") if isinstance(data, dict) else None
        if errors:
            logger.error(f"Failed to get Podchaser access token: {errors}")
            raise Exception(f"Podchaser authentication failed: {errors}")

        access_token = cls._dig(data, "data", "requestAccessToken", "access_token")
        if not access_token:
            logger.error("Error getting Podchaser access token: No access token returned from Podchaser")
            raise Exception("No access token returned from Podchaser")

        logger.info("Successfully obtained Podchaser access token")
        return cls(access_token)

    async def _execute_query(self, query: str, variables: Optional[Dict] = None) -> Dict:
        """Execute a GraphQL query and return its ``data`` object.

        This is deliberately best-effort: transport errors, undecodable
        responses and GraphQL ``errors`` are logged and reported to the caller
        as an empty dict rather than raised. The result is always a dict, even
        when the server answers with ``"data": null``.
        """
        payload: Dict = {"query": query}
        if variables:
            payload["variables"] = variables

        try:
            response = await http_client.post(
                self.API_URL,
                json=payload,
                headers=self.headers,
            )
            data = response.json()
        except Exception as e:
            logger.error(f"Podchaser API error: {e}")
            return {}

        if not isinstance(data, dict):
            logger.error(f"Podchaser API error: unexpected response type {type(data).__name__}")
            return {}
        if "errors" in data:
            logger.error(f"GraphQL errors: {data['errors']}")
            return {}
        return data.get("data") or {}

    # ------------------------------------------------------------------
    # Creator lookup
    # ------------------------------------------------------------------

    async def search_creator_by_creators_endpoint(self, name: str) -> Optional[Dict]:
        """Search for a creator using the ``creators`` endpoint.

        This is more direct than searching via credits or podcasts. Up to 10
        candidates are requested; an exact case-insensitive name match is
        preferred, otherwise the first candidate is returned.

        Returns:
            The creator dict (pcid, name, informalName, subtitle, imageUrl,
            url, episodeAppearanceCount) or ``None`` when nothing was found.
        """
        query = """
            query FindCreator($term: String!) {
                creators(searchTerm: $term, first: 10) {
                    data {
                        pcid
                        name
                        informalName
                        subtitle
                        imageUrl
                        url
                        episodeAppearanceCount
                    }
                }
            }
        """
        data = await self._execute_query(query, {"term": name})
        found = self._dig(data, "creators", "data", default=[])
        candidates = [entry for entry in found if isinstance(entry, dict)]
        if not candidates:
            return None

        # Prefer exact case-insensitive match
        wanted = name.strip().lower()
        for creator in candidates:
            candidate_name = creator.get("name")
            if candidate_name and candidate_name.strip().lower() == wanted:
                logger.info(f"Found exact creator match: {candidate_name} (pcid: {creator.get('pcid')})")
                return creator

        # Return first result if no exact match
        first = candidates[0]
        logger.info(f"Found creator: {first.get('name')} (pcid: {first.get('pcid')})")
        return first

    async def search_creator(self, name: str) -> Optional[Dict]:
        """Search for a creator by name using the creators endpoint.

        Thin alias of :meth:`search_creator_by_creators_endpoint`; returns the
        best matching creator or ``None``.
        """
        return await self.search_creator_by_creators_endpoint(name)

    # ------------------------------------------------------------------
    # Appearances
    # ------------------------------------------------------------------

    async def get_creator_guest_appearances(self, creator_id: str, days_back: int = 30, days_ahead: int = 365) -> List[Dict]:
        """Get guest AND host appearances (episodeCredits) for a creator.

        Pages through the creator's credits (newest first, 20 per page, at
        most ``MAX_CREDIT_PAGES`` pages) and keeps the episodes whose air date
        falls inside the window. Episodes without a parseable air date are
        skipped.

        Args:
            creator_id: Podchaser creator ID
            days_back: How many days in the past to search
            days_ahead: How many days in the future to search

        Returns:
            List of episode appearances with metadata (both guest and host roles)
        """
        today = datetime.now().date()
        cutoff_past = today - timedelta(days=days_back)
        cutoff_future = today + timedelta(days=days_ahead)

        query = """
            query GetCreatorAppearances($creatorId: String!, $page: Int) {
                creator(identifier: {type: PCID, id: $creatorId}) {
                    pcid
                    name
                    episodeCredits(
                        filters: { role: ["guest", "host"] }
                        first: 20
                        page: $page
                        sort: {sortBy: DATE, direction: DESCENDING}
                    ) {
                        data {
                            role {
                                code
                                title
                            }
                            episode {
                                id
                                title
                                description
                                url
                                imageUrl
                                audioUrl
                                airDate
                                podcast {
                                    id
                                    title
                                    imageUrl
                                    url
                                    categories {
                                        title
                                        slug
                                    }
                                }
                            }
                        }
                        paginatorInfo {
                            currentPage
                            hasMorePages
                            lastPage
                        }
                    }
                }
            }
        """

        appearances: List[Dict] = []
        for page in range(1, self.MAX_CREDIT_PAGES + 1):
            if page > 1:
                # Rate limiting: pause only when another request really follows.
                await asyncio.sleep(self.PAGE_DELAY_SECONDS)

            data = await self._execute_query(query, {"creatorId": str(creator_id), "page": page})
            creator_data = self._dig(data, "creator")
            if not creator_data:
                break

            episode_credits = self._dig(creator_data, "episodeCredits", "data", default=[])
            logger.info(f"Fetched {len(episode_credits)} episodes from Podchaser (page {page})")

            for credit in episode_credits:
                episode = self._dig(credit, "episode")
                if not isinstance(episode, dict):
                    continue
                air_date_str = episode.get("airDate")
                if not air_date_str:
                    continue
                air_date = self._parse_air_date(air_date_str)
                if air_date is None:
                    logger.debug(f"Date parse error for episode: unparseable airDate {air_date_str!r}")
                    continue
                # Only include episodes within our time window
                if not cutoff_past <= air_date <= cutoff_future:
                    continue
                appearances.append(
                    self._build_appearance(
                        episode,
                        self._dig(episode, "podcast", default={}),
                        self._dig(credit, "role", "title"),
                    )
                )

            # Check if there are more pages
            if not self._dig(creator_data, "episodeCredits", "paginatorInfo", "hasMorePages"):
                break

        logger.info(f"Returning {len(appearances)} guest/host appearances for creator {creator_id}")
        return appearances

    async def get_creator_podcast_episodes(self, creator_name: str, days_back: int = 30, days_ahead: int = 365) -> List[Dict]:
        """Get podcast episodes where the creator is a host.

        Searches podcasts by the creator's name (top 5 results), keeps the
        ones that credit a matching host, and returns their recent episodes
        (up to 50 per podcast, newest first) inside the window.

        Args:
            creator_name: Creator's name to search for
            days_back: How many days in the past to search
            days_ahead: How many days in the future to search

        Returns:
            List of podcast episodes with metadata. Empty when the name is
            blank, since a blank name would otherwise match every host.
        """
        if not creator_name or not creator_name.strip():
            logger.warning("Empty creator name provided, skipping podcast host search")
            return []

        today = datetime.now().date()
        cutoff_past = today - timedelta(days=days_back)
        cutoff_future = today + timedelta(days=days_ahead)

        # Search for podcasts by creator name
        query = """
            query SearchPodcastByHost($searchTerm: String!) {
                podcasts(searchTerm: $searchTerm, first: 5) {
                    data {
                        id
                        title
                        imageUrl
                        url
                        credits(first: 20) {
                            data {
                                role {
                                    code
                                    title
                                }
                                creator {
                                    pcid
                                    name
                                }
                            }
                        }
                        episodes(first: 50, sort: {sortBy: AIR_DATE, direction: DESCENDING}) {
                            data {
                                id
                                title
                                description
                                url
                                imageUrl
                                audioUrl
                                airDate
                            }
                        }
                    }
                }
            }
        """
        data = await self._execute_query(query, {"searchTerm": creator_name})

        appearances: List[Dict] = []
        for podcast in self._dig(data, "podcasts", "data", default=[]):
            if not isinstance(podcast, dict):
                continue

            # Check if the creator is a host of this podcast
            host_role = self._find_host_role(
                self._dig(podcast, "credits", "data", default=[]),
                creator_name,
            )
            if host_role is None:
                continue
            role_title = host_role.get("title")

            # Get episodes from this podcast
            for episode in self._dig(podcast, "episodes", "data", default=[]):
                if not isinstance(episode, dict):
                    continue
                air_date = self._parse_air_date(episode.get("airDate"))
                if air_date is None:
                    continue
                # Only include episodes within our time window
                if cutoff_past <= air_date <= cutoff_future:
                    appearances.append(self._build_appearance(episode, podcast, role_title))

        return appearances

    async def find_upcoming_podcast_appearances(
        self,
        creator_id: str,
        creator_name: Optional[str] = None,
        days_back: int = 365,
        days_ahead: int = 365,
    ) -> List[Dict]:
        """Find recent and upcoming podcast appearances for a creator.

        Includes both credited appearances (episodeCredits) and episodes of
        podcasts the creator hosts. By default this covers episodes that aired
        within the last 365 days or air within the next 365 days.

        Args:
            creator_id: Podchaser creator ID (pcid)
            creator_name: Creator's name (required for the hosted-podcast
                search; when omitted that search is skipped with a warning)
            days_back: How many days in the past to include
            days_ahead: How many days in the future to include

        Returns:
            Appearances deduplicated by episode ID, newest air date first.
        """
        # Get both guest appearances and hosted episodes
        guest_appearances = await self.get_creator_guest_appearances(
            creator_id,
            days_back=days_back,
            days_ahead=days_ahead,
        )

        # For hosted episodes, we need the creator name
        hosted_episodes: List[Dict] = []
        if creator_name:
            hosted_episodes = await self.get_creator_podcast_episodes(
                creator_name,
                days_back=days_back,
                days_ahead=days_ahead,
            )
        else:
            logger.warning(f"No creator name provided for {creator_id}, skipping podcast host search")

        # Combine, deduplicate by episode ID and sort by air date
        return self._merge_appearances(guest_appearances + hosted_episodes)