536 lines
17 KiB
Python
536 lines
17 KiB
Python
"""
|
|
Media Identifier Module
|
|
|
|
Parses media filenames using guessit and matches them against TMDB for metadata enrichment.
|
|
Generates organized file paths for TV Shows and Movies.
|
|
"""
|
|
|
|
import re
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
import requests
|
|
|
|
from modules.universal_logger import get_logger
|
|
|
|
logger = get_logger('MediaIdentifier')

# guessit is an optional dependency. GUESSIT_AVAILABLE records whether the
# import succeeded so callers can choose between guessit and the regex
# fallback parser; a warning is logged once at import time when it is missing.
try:
    import guessit
except ImportError:
    GUESSIT_AVAILABLE = False
    logger.warning("guessit not installed - filename parsing will be limited")
else:
    GUESSIT_AVAILABLE = True
|
|
|
|
|
|
@dataclass
class ParsedMedia:
    """Metadata extracted from a single media filename.

    Only ``title`` and ``media_type`` are required. Every other field
    defaults to ``None``, except ``original_filename`` which defaults to
    the empty string.
    """

    title: str
    media_type: str  # 'movie' or 'episode' (TV)
    year: Optional[int] = None
    season: Optional[int] = None
    episode: Optional[int] = None
    quality: Optional[str] = None
    source: Optional[str] = None
    codec: Optional[str] = None
    release_group: Optional[str] = None
    original_filename: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dict keyed by field name.

        Keys appear in field-declaration order.
        """
        field_names = (
            'title',
            'media_type',
            'year',
            'season',
            'episode',
            'quality',
            'source',
            'codec',
            'release_group',
            'original_filename',
        )
        return {field_name: getattr(self, field_name) for field_name in field_names}
|
|
|
|
|
|
@dataclass
class TMDBMatch:
    """A TMDB search hit paired with the media item it was matched to.

    ``tmdb_id``, ``title``, ``original_title`` and ``media_type`` are
    required. The remaining fields default to ``None``; the season/episode
    fields are only meaningful for TV matches.
    """

    tmdb_id: int
    title: str
    original_title: Optional[str]
    media_type: str  # 'movie' or 'tv'
    year: Optional[int] = None
    poster_path: Optional[str] = None
    overview: Optional[str] = None
    # For TV episodes
    season_number: Optional[int] = None
    episode_number: Optional[int] = None
    episode_title: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dict keyed by field name.

        Keys appear in field-declaration order.
        """
        field_names = (
            'tmdb_id',
            'title',
            'original_title',
            'media_type',
            'year',
            'poster_path',
            'overview',
            'season_number',
            'episode_number',
            'episode_title',
        )
        return {field_name: getattr(self, field_name) for field_name in field_names}
|
|
|
|
|
|
class MediaIdentifier:
    """
    Identifies media from filenames and matches against TMDB.

    Uses guessit for filename parsing (with a regex fallback when guessit is
    unavailable or fails) and the TMDB API for metadata enrichment.

    The instance owns a ``requests.Session``; call ``close()`` or use the
    instance as a context manager to release it.
    """

    TMDB_BASE_URL = "https://api.themoviedb.org/3"
    TMDB_IMAGE_BASE = "https://image.tmdb.org/t/p/w500"

    # Quality normalization patterns. Iteration order is the match priority
    # used by _extract_quality, so more specific labels come first.
    QUALITY_MAP = {
        '2160p': '2160p',
        '4k': '2160p',
        'uhd': '2160p',
        '1080p': '1080p',
        'fullhd': '1080p',
        'fhd': '1080p',
        '720p': '720p',
        'hd': '720p',
        '480p': '480p',
        'sd': '480p',
        '360p': '360p',
    }

    # Timeout (seconds) applied to every TMDB HTTP request.
    REQUEST_TIMEOUT = 30

    # Alphanumeric "words" of a release name; quality labels are matched
    # against whole words so that e.g. "Tuesday" does not match "sd".
    _WORD_RE = re.compile(r'[A-Za-z0-9]+')
    # Show Name S01E02 (episode numbers up to three digits).
    _TV_SXXEYY_RE = re.compile(r'^(.+?)[\s\.]+[Ss](\d{1,2})[Ee](\d{1,3})')
    # Show Name 1x02
    _TV_NXNN_RE = re.compile(r'^(.+?)[\s\.]+(\d{1,2})x(\d{1,3})')
    # Movie Title (2023) / Movie Title 2023. The year must look like a real
    # year (18xx-20xx) and must not run into more letters/digits, so that
    # "1080p" or "1920x1080" is never mistaken for a release year.
    _MOVIE_YEAR_RE = re.compile(
        r'^(.+?)[\s\.]+\(?((?:18|19|20)\d{2})\)?(?![A-Za-z0-9])'
    )

    def __init__(self, tmdb_api_key: str):
        """
        Initialize the MediaIdentifier.

        Args:
            tmdb_api_key: TMDB API key for lookups
        """
        self.api_key = tmdb_api_key
        self.session = requests.Session()

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.session.close()

    def __enter__(self) -> "MediaIdentifier":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------

    def _redact(self, message: Any) -> str:
        """Return ``str(message)`` with the TMDB API key masked.

        The key is sent as a query parameter, and ``requests`` error messages
        embed the full request URL, so exception text must be scrubbed before
        it is logged or returned to callers.
        """
        text = str(message)
        key = getattr(self, 'api_key', None)
        return text.replace(str(key), '***') if key else text

    @staticmethod
    def _first(value: Any) -> Any:
        """Collapse a multi-valued guessit property to its first value.

        guessit returns a list when a property occurs more than once (for
        example a multi-episode file); scalars are returned unchanged and an
        empty list becomes ``None``.
        """
        if isinstance(value, (list, tuple)):
            return value[0] if value else None
        return value

    @staticmethod
    def _year_from_date(date_text: Optional[str]) -> Optional[int]:
        """Return the leading four-digit year of a ``YYYY-MM-DD`` string, or None."""
        if not date_text or len(date_text) < 4:
            return None
        try:
            return int(date_text[:4])
        except ValueError:
            return None

    # ------------------------------------------------------------------
    # Filename parsing
    # ------------------------------------------------------------------

    def parse_filename(self, filename: str) -> Optional[ParsedMedia]:
        """
        Parse a media filename to extract metadata.

        Args:
            filename: The filename to parse (a leading path is stripped)

        Returns:
            ParsedMedia object with extracted information, or None if parsing fails
        """
        if not filename:
            return None

        # Strip path if present
        filename = Path(filename).name
        if not filename:
            return None

        if GUESSIT_AVAILABLE:
            return self._parse_with_guessit(filename)
        return self._parse_fallback(filename)

    def _parse_with_guessit(self, filename: str) -> Optional[ParsedMedia]:
        """Parse filename using the guessit library.

        Returns None when guessit finds no title; falls back to the regex
        parser when guessit itself raises.
        """
        try:
            result = guessit.guessit(filename)

            title = self._first(result.get('title'))
            if not title:
                return None

            # Anything guessit does not classify as an episode is a movie.
            media_type = 'episode' if result.get('type') == 'episode' else 'movie'

            quality = None
            screen_size = self._first(result.get('screen_size'))
            if screen_size:
                size_text = str(screen_size)
                quality = self.QUALITY_MAP.get(size_text.lower(), size_text)

            # Every property may be a list when it occurs more than once in
            # the filename; keep the first occurrence so downstream code
            # (e.g. ``{episode:02d}`` formatting) always sees a scalar.
            return ParsedMedia(
                title=str(title),
                media_type=media_type,
                year=self._first(result.get('year')),
                season=self._first(result.get('season')),
                episode=self._first(result.get('episode')),
                quality=quality,
                source=self._first(result.get('source')),
                codec=self._first(result.get('video_codec')),
                release_group=self._first(result.get('release_group')),
                original_filename=filename,
            )
        except Exception as e:
            logger.error(f"guessit parsing failed for '{filename}': {e}")
            return self._parse_fallback(filename)

    def _parse_fallback(self, filename: str) -> Optional[ParsedMedia]:
        """
        Fallback parser when guessit is not available.

        Uses regex patterns to extract common media info. Returns None when
        no usable title can be extracted or parsing raises.
        """
        try:
            fields = self._fallback_fields(filename)
            return ParsedMedia(**fields) if fields else None
        except Exception as e:
            logger.error(f"Fallback parsing failed for '{filename}': {e}")
            return None

    def _fallback_fields(self, filename: str) -> Optional[Dict[str, Any]]:
        """Regex-parse *filename* into ``ParsedMedia`` keyword arguments.

        Tries ``S01E02``, then ``1x02``, then ``Title (Year)``; otherwise the
        file is treated as a movie without a year. Returns None when the
        title would be empty.
        """
        # Remove extension, then turn dot/underscore separators into spaces.
        name = re.sub(r'[._]', ' ', Path(filename).stem)
        quality = self._extract_quality(name)

        for tv_regex in (self._TV_SXXEYY_RE, self._TV_NXNN_RE):
            tv_match = tv_regex.match(name)
            if tv_match:
                title = tv_match.group(1).strip()
                if not title:
                    return None
                return {
                    'title': title,
                    'media_type': 'episode',
                    'season': int(tv_match.group(2)),
                    'episode': int(tv_match.group(3)),
                    'quality': quality,
                    'original_filename': filename,
                }

        # Assume movie - extract title and year
        movie_match = self._MOVIE_YEAR_RE.match(name)
        if movie_match:
            title = movie_match.group(1).strip()
            year = int(movie_match.group(2))
        else:
            # No year: use the name as title, minus trailing release tags.
            title = self._title_before_quality(name)
            year = None

        if not title:
            return None
        return {
            'title': title,
            'media_type': 'movie',
            'year': year,
            'quality': quality,
            'original_filename': filename,
        }

    def _title_before_quality(self, name: str) -> str:
        """Return *name* truncated at its first quality label, if any.

        Release names usually put the title first and tags (resolution,
        source, codec) after it, so everything before the first recognized
        quality word is taken as the title. If that would leave nothing, the
        whole cleaned name is returned.
        """
        for word in self._WORD_RE.finditer(name):
            if word.group().lower() in self.QUALITY_MAP:
                head = name[:word.start()].strip(' -([')
                if head:
                    return head
                break
        return name.strip()

    def _extract_quality(self, text: str) -> Optional[str]:
        """Extract a normalized quality label from text.

        Labels are matched against whole alphanumeric words (case-insensitive)
        in ``QUALITY_MAP`` priority order; returns None when none is present.
        """
        if not text:
            return None
        words = {word.lower() for word in self._WORD_RE.findall(text)}
        for pattern, quality in self.QUALITY_MAP.items():
            if pattern in words:
                return quality
        return None

    # ------------------------------------------------------------------
    # TMDB matching
    # ------------------------------------------------------------------

    def match_tmdb(self, parsed: ParsedMedia) -> Optional[TMDBMatch]:
        """
        Match parsed media against TMDB.

        Args:
            parsed: ParsedMedia object from parse_filename

        Returns:
            TMDBMatch object if found, None otherwise
        """
        if not parsed:
            return None

        try:
            if parsed.media_type == 'episode':
                return self._match_tv_show(parsed)
            return self._match_movie(parsed)
        except Exception as e:
            logger.error(f"TMDB matching failed for '{parsed.title}': {self._redact(e)}")
            return None

    def _search_first(
        self,
        endpoint: str,
        title: str,
        year_param: str,
        year: Optional[int],
    ) -> Optional[Dict[str, Any]]:
        """Run a TMDB ``/search/{endpoint}`` query and return the first hit.

        Args:
            endpoint: ``'tv'`` or ``'movie'``
            title: Search query text
            year_param: Name of the endpoint's year filter parameter
            year: Year filter value; omitted from the request when falsy

        Returns:
            The first result dict, or None when there are no results.

        Raises:
            requests.RequestException: on transport errors or HTTP error status.
        """
        params: Dict[str, Any] = {
            'api_key': self.api_key,
            'query': title,
            'page': 1,
        }
        if year:
            params[year_param] = year

        response = self.session.get(
            f"{self.TMDB_BASE_URL}/search/{endpoint}",
            params=params,
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        results = response.json().get('results') or []
        return results[0] if results else None

    def _fetch_episode_title(self, show_id: Any, season: int, episode: int) -> Optional[str]:
        """Best-effort lookup of an episode name; returns None on any failure."""
        episode_url = f"{self.TMDB_BASE_URL}/tv/{show_id}/season/{season}/episode/{episode}"
        try:
            ep_response = self.session.get(
                episode_url,
                params={'api_key': self.api_key},
                timeout=self.REQUEST_TIMEOUT,
            )
            if ep_response.status_code == 200:
                return ep_response.json().get('name')
        except Exception as e:
            # The episode title is optional enrichment: a failure here must
            # not discard the show match, so it is logged and ignored.
            logger.debug(f"TMDB episode lookup failed: {self._redact(e)}")
        return None

    def _match_tv_show(self, parsed: ParsedMedia) -> Optional[TMDBMatch]:
        """Match a TV show episode against TMDB."""
        try:
            # Use the first (best) result
            show = self._search_first('tv', parsed.title, 'first_air_date_year', parsed.year)
            if show is None:
                logger.debug(f"No TMDB results for TV show: {parsed.title}")
                return None

            show_id = show['id']

            # Compare against None so season 0 (specials) is still looked up.
            episode_title = None
            if parsed.season is not None and parsed.episode is not None:
                episode_title = self._fetch_episode_title(show_id, parsed.season, parsed.episode)

            return TMDBMatch(
                tmdb_id=show_id,
                title=show.get('name', parsed.title),
                original_title=show.get('original_name'),
                media_type='tv',
                year=self._year_from_date(show.get('first_air_date', '')),
                poster_path=show.get('poster_path'),
                overview=show.get('overview'),
                season_number=parsed.season,
                episode_number=parsed.episode,
                episode_title=episode_title,
            )
        except Exception as e:
            logger.error(f"TMDB TV show matching failed: {self._redact(e)}")
            return None

    def _match_movie(self, parsed: ParsedMedia) -> Optional[TMDBMatch]:
        """Match a movie against TMDB."""
        try:
            # Use the first (best) result
            movie = self._search_first('movie', parsed.title, 'year', parsed.year)
            if movie is None:
                logger.debug(f"No TMDB results for movie: {parsed.title}")
                return None

            return TMDBMatch(
                tmdb_id=movie['id'],
                title=movie.get('title', parsed.title),
                original_title=movie.get('original_title'),
                media_type='movie',
                year=self._year_from_date(movie.get('release_date', '')),
                poster_path=movie.get('poster_path'),
                overview=movie.get('overview'),
            )
        except Exception as e:
            logger.error(f"TMDB movie matching failed: {self._redact(e)}")
            return None

    # ------------------------------------------------------------------
    # Path generation
    # ------------------------------------------------------------------

    def get_organized_path(
        self,
        match: TMDBMatch,
        base_path: str,
        original_filename: str,
    ) -> str:
        """
        Generate an organized file path for the matched media.

        TV:    {base}/TV Shows/{Show}/Season {XX}/{Show} - S{XX}E{XX}[ - {Episode Title}]{ext}
        Movie: {base}/Movies/{Title} ({Year})/{Title} ({Year}){ext}

        A missing season or episode number defaults to 1 independently, so a
        known episode number is never replaced and the ``SxxEyy`` tag always
        agrees with the season directory.

        Args:
            match: TMDBMatch object with TMDB metadata
            base_path: Base directory for media storage
            original_filename: Original filename (for extension)

        Returns:
            Full organized path for the file
        """
        base = Path(base_path)

        # Get extension from original filename
        ext = Path(original_filename).suffix

        # Sanitize title for filesystem
        safe_title = self._sanitize_filename(match.title)

        if match.media_type == 'tv':
            season = match.season_number if match.season_number is not None else 1
            episode = match.episode_number if match.episode_number is not None else 1

            season_dir = base / "TV Shows" / safe_title / f"Season {season:02d}"
            ep_part = f"S{season:02d}E{episode:02d}"

            if match.episode_title:
                safe_ep_title = self._sanitize_filename(match.episode_title)
                filename = f"{safe_title} - {ep_part} - {safe_ep_title}{ext}"
            else:
                filename = f"{safe_title} - {ep_part}{ext}"

            return str(season_dir / filename)

        movie_folder = f"{safe_title} ({match.year})" if match.year else safe_title
        return str(base / "Movies" / movie_folder / f"{movie_folder}{ext}")

    def _sanitize_filename(self, name: str) -> str:
        """
        Sanitize a string for use as a filename.

        Removes characters that are invalid in filenames, collapses
        whitespace and limits the length to 100 characters. Returns
        "Unknown" for empty names and for names made only of dots/spaces
        ("." and ".." would otherwise act as directory references).
        """
        if not name:
            return "Unknown"

        # Replace problematic characters
        name = re.sub(r'[<>:"/\\|?*]', '', name)
        name = re.sub(r'\s+', ' ', name).strip()

        # Limit length
        if len(name) > 100:
            name = name[:100].strip()

        if not name.strip('. '):
            return "Unknown"
        return name

    # ------------------------------------------------------------------
    # Convenience entry point
    # ------------------------------------------------------------------

    def identify_and_match(
        self,
        filename: str,
        base_path: str = "/media",
    ) -> Dict[str, Any]:
        """
        Convenience method to parse, match, and get organized path in one call.

        Args:
            filename: The media filename to process
            base_path: Base directory for organized media

        Returns:
            Dict with keys 'success', 'filename', 'parsed', 'match',
            'organized_path' and 'error'. 'success' is True only when a TMDB
            match was found and a path was generated.
        """
        result: Dict[str, Any] = {
            'success': False,
            'filename': filename,
            'parsed': None,
            'match': None,
            'organized_path': None,
            'error': None,
        }

        try:
            # Parse filename
            parsed = self.parse_filename(filename)
            if not parsed:
                result['error'] = 'Failed to parse filename'
                return result

            result['parsed'] = parsed.to_dict()

            # Match against TMDB
            match = self.match_tmdb(parsed)
            if not match:
                result['error'] = 'No TMDB match found'
                return result

            result['match'] = match.to_dict()

            # Get organized path
            result['organized_path'] = self.get_organized_path(match, base_path, filename)
            result['success'] = True
            return result

        except Exception as e:
            result['error'] = self._redact(e)
            logger.error(f"identify_and_match failed for '{filename}': {self._redact(e)}")
            return result
|