Files
media-downloader/modules/instagram_utils.py
Todd 0d7b2b1aab Initial commit
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-29 22:42:55 -04:00

462 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Instagram Utilities Module
Shared utility functions for Instagram downloaders (imginn, fastdl, toolzu, instaloader).
Centralizes common functionality like media ID extraction to avoid code duplication.
"""
import re
from datetime import datetime
from pathlib import Path
from typing import Optional, Set, Dict, Any
def extract_instagram_media_id(filename_or_id: str) -> str:
    """Extract the actual Instagram media ID from a filename or ID string.

    Instagram image filenames follow the pattern:
        {user_id}_{media_id}_{post_id}_n.ext
    where media_id is a 17-19 digit number starting with "18".

    For video stories with AQ... format (story keys) and for short post
    codes (e.g. 'DRkaDSFD-U2'), the whole string is used as the media ID.

    Args:
        filename_or_id: A filename like
            '591164014_18551181784006538_2284814566270897032_n'
            or just a media ID string.

    Returns:
        The extracted Instagram media ID (the "18..." numeric segment) or the
        original string if no pattern matches.

    Examples:
        >>> extract_instagram_media_id('591164014_18551181784006538_2284814566270897032_n')
        '18551181784006538'
        >>> extract_instagram_media_id('18551181784006538')
        '18551181784006538'
    """
    if not filename_or_id:
        return filename_or_id
    # Numeric Instagram media IDs start with "18" and are delimited by
    # underscores or the start/end of the string.  \b is deliberately not
    # used because it does not treat '_' as a word boundary.
    match = re.search(r'(?:^|_)(18\d{15,17})(?:_|$)', filename_or_id)
    if match:
        return match.group(1)
    # A bare numeric ID starting with "18" is already covered above (the
    # ^/$ alternatives act as boundaries), so no separate full-match check
    # is needed.  Every other recognized format -- AQ... story keys and
    # 10-15 character shortcodes -- as well as any unrecognized string is
    # returned unchanged.
    return filename_or_id
def extract_media_id_from_url(url: str) -> Optional[str]:
    """Extract an Instagram media ID from a CDN URL.

    CDN URLs embed the media ID as the second number of a triple, e.g.
    561378837_18538674661006538_479694548187839800_n.jpg -- the
    17-19 digit middle number is the Instagram media ID.

    Args:
        url: Instagram CDN URL string.

    Returns:
        The media ID string, or None when the URL is empty or does not
        contain the expected pattern.
    """
    if not url:
        return None
    # Layout: {number}_{media_id}_{number}_n.{ext}
    found = re.search(r'(\d+)_(\d{17,19})_\d+_n\.(jpg|mp4|jpeg|png)', url)
    return found.group(2) if found else None
def extract_media_ids_from_url(url: str) -> list:
    """Extract every Instagram media ID present in a URL.

    Like extract_media_id_from_url, but collects all occurrences instead of
    only the first.

    Args:
        url: URL string that may contain Instagram media IDs.

    Returns:
        List of media ID strings, in order of appearance (empty when none).
    """
    if not url:
        return []
    # Each findall tuple is (leading number, media ID, extension);
    # index 1 is the media ID we care about.
    hits = re.findall(r'(\d+)_(\d{17,19})_\d+_n\.(jpg|mp4|jpeg|png)', url)
    return [hit[1] for hit in hits]
def extract_post_shortcode(url: str) -> Optional[str]:
    """Extract an Instagram post shortcode from a URL.

    Args:
        url: Instagram URL like https://www.instagram.com/p/ABC123/

    Returns:
        Shortcode string or None if not found.
    """
    if not url:
        return None
    # Shortcodes use the URL-safe base64 alphabet.  Matching that alphabet
    # explicitly (instead of the previous greedy [^/]+) prevents query
    # strings/fragments from leaking into the result for URLs without a
    # trailing slash, e.g. .../p/ABC123?igsh=xyz -> 'ABC123'.
    match = re.search(r'/p/([A-Za-z0-9_-]+)', url)
    if match:
        return match.group(1)
    return None
def media_id_to_shortcode(media_id: str) -> str:
    """Convert a numeric Instagram media ID to its shortcode form.

    The shortcode is the base-64 representation of the media ID using
    Instagram's URL-safe alphabet (A-Z, a-z, 0-9, '-', '_').

    Args:
        media_id: Numeric media ID string.

    Returns:
        The shortcode string; non-numeric input is returned unchanged, and
        values <= 0 yield 'A' (the zero digit).
    """
    alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'
    try:
        value = int(media_id)
    except (ValueError, TypeError):
        # Not a number at all -- pass it through untouched.
        return media_id
    digits = []
    while value > 0:
        value, rem = divmod(value, 64)
        digits.append(alphabet[rem])
    # digits were collected least-significant first; reverse for the result.
    return ''.join(reversed(digits)) if digits else 'A'
def scan_existing_files_for_media_ids(output_dir: Path, profile_name: str = None,
                                      min_file_size: int = 0, recursive: bool = True) -> Set[str]:
    """Scan existing files and extract media IDs for duplicate detection.

    Walks image and video files under the output directory and collects both
    the full media-ID string from each filename and its normalized Instagram
    media ID (the "18..." numeric segment), so either form can be matched.

    Args:
        output_dir: Directory to scan for existing files.
        profile_name: Optional profile name; files whose first filename
            segment differs are skipped (only applied to the 4-part format).
        min_file_size: Minimum file size in bytes; smaller files are skipped
            as likely corrupted/incomplete.
        recursive: If True, search subdirectories (rglob), otherwise only the
            top level (glob).

    Returns:
        Set of media IDs (both full and normalized) found in existing files.
    """
    found: Set[str] = set()
    if not output_dir.exists():
        return found
    globber = output_dir.rglob if recursive else output_dir.glob
    for ext_pattern in ("*.jpg", "*.jpeg", "*.png", "*.heic", "*.mp4", "*.mov"):
        for path in globber(ext_pattern):
            if min_file_size > 0:
                # Unstat-able files are skipped the same as undersized ones.
                try:
                    size = path.stat().st_size
                except OSError:
                    continue
                if size < min_file_size:
                    continue
            stem = path.stem
            # Expected layout: profile_YYYYMMDD_HHMMSS_mediaid
            # (maxsplit=3 keeps any underscores inside the media ID intact).
            pieces = stem.split('_', 3)
            if len(pieces) >= 4:
                if profile_name and pieces[0] != profile_name:
                    continue
                full_id = pieces[3]
            elif len(pieces) > 1:
                full_id = pieces[-1]
            else:
                full_id = stem
            if not full_id:
                continue
            found.add(full_id)
            # Also record the normalized numeric ID when it differs, so
            # cross-module duplicate checks match either form.
            normalized = extract_instagram_media_id(full_id)
            if normalized and normalized != full_id:
                found.add(normalized)
    return found
def parse_instagram_filename(filename: str) -> dict:
    """Parse an Instagram filename into its components.

    Args:
        filename: Filename like
            'evalongoria_20251205_120406_591164014_18551181784006538_2284814566270897032_n_story1.jpg'

    Returns:
        Dictionary with parsed components:
        - username: str or None
        - date: str or None (YYYYMMDD format)
        - time: str or None (HHMMSS format)
        - media_id_full: str or None (full ID after date/time)
        - media_id: str or None (normalized "18..." Instagram media ID)
        - suffix: str or None (e.g., 'story1')
        - extension: str or None
    """
    parsed = {
        'username': None,
        'date': None,
        'time': None,
        'media_id_full': None,
        'media_id': None,
        'suffix': None,
        'extension': None,
    }
    if not filename:
        return parsed
    path = Path(filename)
    if path.suffix:
        parsed['extension'] = path.suffix.lower()
    segments = path.stem.split('_')
    # Anything shorter than profile_date_time_id cannot be decomposed.
    if len(segments) < 4:
        return parsed
    parsed['username'] = segments[0]
    # Date/time segments are only accepted when they look like YYYYMMDD/HHMMSS.
    if len(segments[1]) == 8 and segments[1].isdigit():
        parsed['date'] = segments[1]
    if len(segments[2]) == 6 and segments[2].isdigit():
        parsed['time'] = segments[2]
    # Everything after the date/time slots is the media ID (maybe + suffix).
    full_id = '_'.join(segments[3:])
    parsed['media_id_full'] = full_id
    if '_story' in full_id:
        id_part, story_num = full_id.rsplit('_story', 1)
        parsed['media_id_full'] = id_part
        parsed['suffix'] = f'story{story_num}'
    parsed['media_id'] = extract_instagram_media_id(parsed['media_id_full'])
    return parsed
def record_instagram_download(db, media_id: str, username: str, content_type: str,
                              filename: str, url: str = None, download_url: str = None,
                              post_date: datetime = None, file_path: str = None,
                              method: str = None, extra_metadata: Dict[str, Any] = None) -> bool:
    """Record an Instagram download in the database with a normalized media_id.

    Central recording helper shared by all Instagram downloader modules
    (imginn, fastdl, toolzu, instaloader); it always normalizes the media_id
    so duplicate detection works across modules.

    Args:
        db: Database instance (UnifiedDatabase or adapter with a
            record_download / mark_downloaded method).
        media_id: The media ID (normalized automatically).
        username: Instagram username.
        content_type: Type of content (posts, stories, reels, highlights).
        filename: Filename of the downloaded file.
        url: Original Instagram URL (e.g., https://instagram.com/p/ABC123/).
        download_url: Direct download (CDN) URL.
        post_date: Post date/time.
        file_path: Full file path on disk.
        method: Download method (imginn, fastdl, toolzu, instaloader).
        extra_metadata: Additional metadata to merge in.

    Returns:
        True if successfully recorded, False otherwise.
    """
    if not db:
        return False
    # Normalize for consistent cross-module duplicate detection.
    normalized = extract_instagram_media_id(media_id) if media_id else media_id
    meta = {'media_id': normalized}
    if media_id != normalized:
        # Keep the raw ID around when normalization changed it.
        meta['original_media_id'] = media_id
    if extra_metadata:
        meta.update(extra_metadata)
    # Drop None values so the stored metadata stays compact.
    meta = {key: val for key, val in meta.items() if val is not None}
    # Prefer the page URL, then the CDN URL, then a synthetic placeholder.
    record_url = url or download_url or f"instagram://{normalized}"
    # Hash the file when we have a path; failures are best-effort only.
    file_hash = None
    if file_path:
        try:
            from modules.unified_database import UnifiedDatabase
            file_hash = UnifiedDatabase.get_file_hash(file_path)
        except Exception:
            pass
    try:
        if hasattr(db, 'record_download'):
            # UnifiedDatabase-style API.
            return db.record_download(
                url=record_url,
                platform='instagram',
                source=username,
                content_type=content_type,
                filename=filename,
                file_path=file_path,
                file_hash=file_hash,
                post_date=post_date,
                metadata=meta,
                method=method
            )
        if hasattr(db, 'mark_downloaded'):
            # Adapter-style fallback API.
            return db.mark_downloaded(
                username=username,
                url=record_url,
                filename=filename,
                post_date=post_date,
                metadata=meta,
                file_path=file_path,
                content_type=content_type
            )
        return False
    except Exception:
        # Recording is best-effort; a DB error is reported as failure.
        return False
def is_instagram_downloaded(db, media_id: str, username: str = None) -> bool:
    """Check if Instagram content is already downloaded by media_id.

    Checks for both the original and normalized media_id to ensure
    cross-module duplicate detection works correctly.

    Args:
        db: Database instance (UnifiedDatabase or adapter)
        media_id: The media ID to check (will check both original and normalized)
        username: Optional username to scope the check.
            NOTE(review): currently unused by both lookup paths below; kept
            for interface compatibility.

    Returns:
        True if already downloaded, False otherwise.
        NOTE(review): when db has neither get_connection nor
        is_already_downloaded, the function falls through and implicitly
        returns None (falsy) rather than False -- confirm callers only test
        truthiness.
    """
    if not db or not media_id:
        return False
    # Normalize the media_id (extracts the "18..." numeric segment when
    # present; otherwise returns the input unchanged).
    normalized_media_id = extract_instagram_media_id(media_id)
    # Check if this looks like a shortcode (10-15 alphanumeric chars, no 18xxx
    # pattern).  Shortcodes pass through normalization unchanged, hence the
    # equality test.
    is_shortcode = (normalized_media_id == media_id and
                    re.match(r'^[A-Za-z0-9_-]{10,15}$', media_id) and
                    not re.match(r'^18\d{15,17}$', media_id))
    try:
        # Check if db has get_connection (UnifiedDatabase) - query directly
        if hasattr(db, 'get_connection'):
            with db.get_connection() as conn:
                cursor = conn.cursor()
                # Check both normalized and original media_id.
                # Also verify file_path is set (download was actually completed).
                if normalized_media_id != media_id:
                    cursor.execute('''
                        SELECT 1 FROM downloads
                        WHERE platform = 'instagram'
                        AND (media_id = ? OR media_id = ?)
                        AND file_path IS NOT NULL AND file_path != ''
                        LIMIT 1
                    ''', (normalized_media_id, media_id))
                else:
                    cursor.execute('''
                        SELECT 1 FROM downloads
                        WHERE platform = 'instagram'
                        AND media_id = ?
                        AND file_path IS NOT NULL AND file_path != ''
                        LIMIT 1
                    ''', (normalized_media_id,))
                if cursor.fetchone() is not None:
                    return True
                # For shortcodes, also check the metadata JSON column.
                # NOTE(review): this LIKE assumes the serialized metadata
                # contains exactly '"shortcode": "<code>"' (space after the
                # colon) -- verify against the writer's JSON formatting.
                if is_shortcode:
                    cursor.execute('''
                        SELECT 1 FROM downloads
                        WHERE platform = 'instagram'
                        AND metadata LIKE ?
                        AND file_path IS NOT NULL AND file_path != ''
                        LIMIT 1
                    ''', (f'%"shortcode": "{media_id}"%',))
                    if cursor.fetchone() is not None:
                        return True
                # Check recycle bin — files previously downloaded then deleted
                # should not be re-downloaded
                cursor.execute('''
                    SELECT 1 FROM recycle_bin
                    WHERE original_filename LIKE ?
                    LIMIT 1
                ''', (f'%{normalized_media_id}%',))
                if cursor.fetchone() is not None:
                    return True
                return False
        # Fallback for adapters with is_already_downloaded method
        elif hasattr(db, 'is_already_downloaded'):
            if db.is_already_downloaded(normalized_media_id):
                return True
            # Also check original if different
            if normalized_media_id != media_id and db.is_already_downloaded(media_id):
                return True
            return False
    except Exception:
        # Best-effort check: any DB error is treated as "not downloaded".
        return False