3776 lines
173 KiB
Python
Executable File
3776 lines
173 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
FastDL Instagram Downloader Module
|
|
Can be imported and used in other scripts
|
|
"""
|
|
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta
|
|
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout
|
|
import os
|
|
import re
|
|
import urllib.parse
|
|
import json
|
|
import random
|
|
import time
|
|
import platform
|
|
import subprocess
|
|
import requests
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from modules.base_module import LoggingMixin
|
|
from modules.cloudflare_handler import (
|
|
CloudflareHandler, SiteStatus, get_flaresolverr_user_agent,
|
|
get_playwright_context_options, get_playwright_stealth_scripts
|
|
)
|
|
from modules.instagram_utils import (
|
|
extract_instagram_media_id,
|
|
scan_existing_files_for_media_ids,
|
|
record_instagram_download,
|
|
is_instagram_downloaded
|
|
)
|
|
|
|
|
|
class FastDLDownloader(LoggingMixin):
|
|
"""
|
|
FastDL Instagram downloader that can be used as a module
|
|
|
|
Example usage:
|
|
from fastdl_module import FastDLDownloader
|
|
|
|
# Download stories for a user
|
|
downloader = FastDLDownloader()
|
|
count = downloader.download(
|
|
username="evalongoria",
|
|
content_type="stories",
|
|
output_dir="downloads/stories"
|
|
)
|
|
print(f"Downloaded {count} items")
|
|
"""
|
|
|
|
def __init__(self, headless=True, show_progress=True, use_database=True, log_callback=None, unified_db=None, high_res=False):
|
|
"""
|
|
Initialize the downloader
|
|
|
|
Args:
|
|
headless: Run browser in headless mode
|
|
show_progress: Print progress messages
|
|
use_database: Use SQLite database to track downloads (set False to re-download)
|
|
db_path: Path to SQLite database file (ignored if unified_db provided)
|
|
log_callback: Optional callback function for logging (tag, level, message)
|
|
unified_db: Optional UnifiedDatabase instance for centralized tracking
|
|
high_res: Use high-resolution download mode (searches individual Instagram URLs)
|
|
"""
|
|
# Initialize logging via mixin
|
|
self._init_logger('Instagram', log_callback, default_module='Download')
|
|
|
|
self.headless = headless
|
|
self.show_progress = show_progress
|
|
self.fastdl_url = "https://fastdl.app/en2"
|
|
self.downloaded_files = set()
|
|
self.use_database = use_database
|
|
self.high_res = high_res
|
|
self.unified_db = unified_db # Store for scraper config access
|
|
self.scraper_id = 'fastdl' # Scraper ID in database
|
|
self.pending_downloads = [] # Track downloads for deferred database recording
|
|
self._cdn_to_pk_map = {} # CDN filename -> Instagram pk map (for browser fallback)
|
|
|
|
# Rate limiting settings (matching InstaLoader improvements)
|
|
self.min_delay = 1 # Minimum delay between downloads (seconds)
|
|
self.max_delay = 3 # Maximum delay between downloads (seconds)
|
|
self.batch_size = 10 # Downloads before longer break
|
|
self.batch_delay_min = 30 # Minimum batch delay (seconds)
|
|
self.batch_delay_max = 60 # Maximum batch delay (seconds)
|
|
self.download_count = 0 # Track downloads for batch delays
|
|
|
|
# Use unified database only
|
|
if unified_db and use_database:
|
|
from modules.unified_database import FastDLDatabaseAdapter
|
|
self.db = FastDLDatabaseAdapter(unified_db)
|
|
else:
|
|
self.db = None
|
|
self.use_database = False
|
|
|
|
# Initialize activity status manager for real-time updates
|
|
from modules.activity_status import get_activity_manager
|
|
self.activity_manager = get_activity_manager(unified_db)
|
|
|
|
# Load scraper configuration from database if available
|
|
self.proxy_url = None
|
|
self.cookie_file = None # Default to None (use database)
|
|
|
|
if unified_db:
|
|
scraper_config = unified_db.get_scraper(self.scraper_id)
|
|
if scraper_config:
|
|
# Get proxy configuration
|
|
if scraper_config.get('proxy_enabled') and scraper_config.get('proxy_url'):
|
|
self.proxy_url = scraper_config['proxy_url']
|
|
self.log(f"Using proxy: {self.proxy_url}", "info")
|
|
|
|
# Fall back to cookie file if no database
|
|
if not unified_db:
|
|
self.cookie_file = Path("cookies/fastdl_cookies.json")
|
|
self.cookie_file.parent.mkdir(exist_ok=True)
|
|
|
|
# User-Agent to match FlareSolverr (dynamically fetched for consistency)
|
|
self.user_agent = get_flaresolverr_user_agent()
|
|
|
|
# Initialize universal Cloudflare handler
|
|
# Pass proxy_url if configured, and cookie_file=None for database storage
|
|
self.cf_handler = CloudflareHandler(
|
|
module_name="FastDL",
|
|
cookie_file=str(self.cookie_file) if self.cookie_file else None,
|
|
user_agent=self.user_agent,
|
|
logger=self.logger,
|
|
aggressive_expiry=True,
|
|
proxy_url=self.proxy_url # Pass proxy to FlareSolverr
|
|
)
|
|
|
|
# Keep for backwards compatibility
|
|
self.flaresolverr_url = self.cf_handler.flaresolverr_url
|
|
|
|
# Load cookies from database if available
|
|
self._load_cookies_from_db()
|
|
self.flaresolverr_enabled = self.cf_handler.flaresolverr_enabled
|
|
|
|
def _load_cookies_from_db(self):
|
|
"""Load cookies from database if available"""
|
|
if not self.unified_db:
|
|
return
|
|
|
|
try:
|
|
cookies = self.unified_db.get_scraper_cookies(self.scraper_id)
|
|
if cookies:
|
|
# Load into CloudflareHandler
|
|
self.cf_handler._cookies = cookies
|
|
self.log(f"Loaded {len(cookies)} cookies from database", "debug")
|
|
except Exception as e:
|
|
self.log(f"Error loading cookies from database: {e}", "warning")
|
|
|
|
def _save_cookies_to_db(self, cookies: list, user_agent: str = None):
|
|
"""Save cookies to database
|
|
|
|
Args:
|
|
cookies: List of cookie dictionaries
|
|
user_agent: User agent to associate with cookies (important for cf_clearance).
|
|
If not provided, uses self.user_agent as fallback.
|
|
"""
|
|
if not self.unified_db:
|
|
return
|
|
|
|
try:
|
|
# Use provided user_agent or fall back to self.user_agent
|
|
ua = user_agent or self.user_agent
|
|
self.unified_db.save_scraper_cookies(
|
|
self.scraper_id,
|
|
cookies,
|
|
user_agent=ua,
|
|
merge=True
|
|
)
|
|
self.log(f"Saved {len(cookies)} cookies to database (UA: {ua[:50]}...)", "debug")
|
|
except Exception as e:
|
|
self.log(f"Error saving cookies to database: {e}", "warning")
|
|
|
|
def _has_valid_cookies(self):
|
|
"""Check if we have valid cookies (either in file or database)"""
|
|
if self.unified_db:
|
|
cookies = self.unified_db.get_scraper_cookies(self.scraper_id)
|
|
return cookies and len(cookies) > 0
|
|
elif self.cookie_file:
|
|
return self.cookie_file.exists()
|
|
return False
|
|
|
|
def _cookies_expired(self):
|
|
"""Check if cookies are expired - delegates to CloudflareHandler"""
|
|
return self.cf_handler.cookies_expired()
|
|
|
|
def _get_cookies_for_requests(self):
|
|
"""Get cookies in format for requests library - delegates to CloudflareHandler"""
|
|
return self.cf_handler.get_cookies_dict()
|
|
|
|
def _get_cookies_via_flaresolverr(self, url="https://fastdl.app/", max_retries=2):
|
|
"""Use FlareSolverr to bypass Cloudflare - delegates to CloudflareHandler
|
|
|
|
Args:
|
|
url: URL to fetch
|
|
max_retries: Maximum number of retry attempts (default: 2)
|
|
|
|
Returns:
|
|
True if cookies obtained successfully, False otherwise
|
|
"""
|
|
success = self.cf_handler.get_cookies_via_flaresolverr(url, max_retries)
|
|
|
|
# Save cookies to database if successful
|
|
if success and self.unified_db:
|
|
cookies_list = self.cf_handler.get_cookies_list()
|
|
if cookies_list:
|
|
# CRITICAL: Get the user_agent from FlareSolverr solution, not self.user_agent
|
|
# cf_clearance cookies are fingerprinted to the browser that solved the challenge
|
|
flaresolverr_ua = self.cf_handler.get_user_agent()
|
|
self._save_cookies_to_db(cookies_list, user_agent=flaresolverr_ua)
|
|
|
|
return success
|
|
|
|
def _media_id_to_shortcode(self, media_id):
|
|
"""Convert Instagram media ID to shortcode
|
|
|
|
Instagram uses a custom base64 alphabet:
|
|
ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_
|
|
|
|
Args:
|
|
media_id: Instagram media ID (string or int)
|
|
|
|
Returns:
|
|
Instagram shortcode string
|
|
"""
|
|
alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'
|
|
|
|
# Convert string ID to integer
|
|
media_id = int(media_id)
|
|
|
|
# Convert to base64 shortcode
|
|
shortcode = ''
|
|
while media_id > 0:
|
|
remainder = media_id % 64
|
|
media_id = media_id // 64
|
|
shortcode = alphabet[remainder] + shortcode
|
|
|
|
return shortcode or 'A'
|
|
|
|
def _extract_media_ids_from_fastdl_url(self, url):
|
|
"""Extract Instagram media IDs from FastDL proxied URLs
|
|
|
|
FastDL URLs contain Instagram CDN URLs with media IDs like:
|
|
561378837_18538674661006538_479694548187839800_n.jpg
|
|
|
|
The second number (18538674661006538) is the Instagram media ID
|
|
|
|
Args:
|
|
url: FastDL URL string
|
|
|
|
Returns:
|
|
List of media IDs found in the URL
|
|
"""
|
|
# Pattern: number_MEDIAID_number_n.jpg
|
|
pattern = r'(\d+)_(\d{17,19})_\d+_n\.(jpg|mp4)'
|
|
matches = re.findall(pattern, url)
|
|
|
|
if matches:
|
|
# Return the media ID (second capture group)
|
|
return [match[1] for match in matches]
|
|
|
|
return []
|
|
|
|
def _search_instagram_url_on_fastdl(self, page, instagram_url):
|
|
"""Search for a specific Instagram URL on FastDL to get high-res download links
|
|
|
|
Args:
|
|
page: Playwright page object
|
|
instagram_url: Instagram post URL (e.g., https://www.instagram.com/p/BB3NONxpzK/)
|
|
|
|
Returns:
|
|
List of tuples: [(download_link, file_extension, is_high_res), ...]
|
|
Empty list if search fails
|
|
"""
|
|
try:
|
|
self.log(f"Searching FastDL for: {instagram_url}", "debug")
|
|
|
|
# Navigate to FastDL homepage
|
|
page.goto(self.fastdl_url, wait_until="domcontentloaded", timeout=60000)
|
|
page.wait_for_timeout(2000)
|
|
|
|
# Enter Instagram URL
|
|
input_field = page.locator("input[type='text']").first
|
|
if not input_field or not input_field.is_visible():
|
|
self.log("Could not find FastDL input field", "error")
|
|
return []
|
|
|
|
input_field.fill(instagram_url)
|
|
page.wait_for_timeout(500)
|
|
|
|
# Click download button
|
|
download_button = page.locator("button:has-text('Download')").first
|
|
if not download_button or not download_button.is_visible():
|
|
self.log("Could not find Download button", "error")
|
|
return []
|
|
|
|
download_button.click(force=True)
|
|
self.log("Loading post from Instagram URL...", "debug")
|
|
|
|
# Wait for content to load - Instagram URL searches take longer
|
|
try:
|
|
page.wait_for_selector(".loader-component", timeout=60000, state="detached")
|
|
self.log("Loader dismissed", "debug")
|
|
except Exception:
|
|
self.log("Loader still visible after 60s...", "warning")
|
|
|
|
# Wait additional time for content to render
|
|
page.wait_for_timeout(5000)
|
|
|
|
# Check for errors first
|
|
error_elem = page.locator(".error-message__text").first
|
|
if error_elem and error_elem.is_visible():
|
|
error_text = error_elem.text_content() or "Unknown error"
|
|
self.log(f"FastDL returned error: {error_text}", "error")
|
|
return []
|
|
|
|
# Try waiting for actual content elements
|
|
try:
|
|
page.wait_for_selector(".button__download, a[href*='media.fastdl.app']", timeout=10000)
|
|
self.log("Post content loaded successfully", "debug")
|
|
except Exception:
|
|
self.log("Post content did not load as expected", "warning")
|
|
# Check for error message in HTML
|
|
html = page.content()
|
|
if "Something went wrong" in html or "error-message" in html:
|
|
self.log("FastDL encountered an error fetching this post (may be deleted/unavailable)", "error")
|
|
return []
|
|
|
|
# Extract download links - try multiple selectors
|
|
# FastDL uses button elements with specific classes for download links
|
|
download_links = page.locator("a.button__download, a[href*='media.fastdl.app'], a[href*='.jpg'], a[href*='.mp4']").all()
|
|
|
|
if not download_links:
|
|
self.log("No download links found for this Instagram URL", "warning")
|
|
return []
|
|
|
|
# Analyze links to find high-res versions
|
|
results = []
|
|
for link in download_links:
|
|
href = link.get_attribute("href")
|
|
if not href:
|
|
continue
|
|
|
|
# Determine file type
|
|
ext = ".jpg" if ".jpg" in href else ".mp4"
|
|
|
|
# Check if it's high-res by looking for resolution indicators
|
|
is_high_res = False
|
|
if 'p1080x1080' in href or 'p1440x1440' in href or 'p2048x2048' in href:
|
|
is_high_res = True
|
|
elif 'p640x640' in href:
|
|
is_high_res = False
|
|
else:
|
|
# No resolution indicator, assume it might be high-res
|
|
is_high_res = True
|
|
|
|
results.append((href, ext, is_high_res))
|
|
|
|
# Filter to only high-res links if available
|
|
high_res_only = [r for r in results if r[2]]
|
|
if high_res_only:
|
|
self.log(f"Found {len(high_res_only)} high-res download link(s)", "info")
|
|
return high_res_only
|
|
else:
|
|
self.log(f"Found {len(results)} download link(s) (resolution unknown)", "info")
|
|
return results
|
|
|
|
except Exception as e:
|
|
self.log(f"Error searching Instagram URL on FastDL: {e}", "error")
|
|
return []
|
|
|
|
def _fetch_highres_via_api_convert(self, page, instagram_url):
|
|
"""Trigger FastDL to process an Instagram URL and intercept the /api/convert response.
|
|
|
|
Navigates to FastDL, enters the Instagram URL, clicks Download, and captures
|
|
the POST /api/convert response that FastDL makes internally.
|
|
|
|
Args:
|
|
page: Playwright page object
|
|
instagram_url: Instagram post URL (e.g. https://instagram.com/p/SHORTCODE/)
|
|
|
|
Returns:
|
|
Parsed JSON list from /api/convert response, or None on failure.
|
|
"""
|
|
convert_response = [None] # mutable container for closure
|
|
|
|
def _intercept_convert(response):
|
|
try:
|
|
if '/api/convert' in response.url and response.status == 200:
|
|
content_type = response.headers.get('content-type', '')
|
|
if 'json' in content_type:
|
|
convert_response[0] = response.json()
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
page.on("response", _intercept_convert)
|
|
|
|
# Navigate to FastDL homepage
|
|
page.goto(self.fastdl_url, wait_until="domcontentloaded", timeout=60000)
|
|
page.wait_for_timeout(2000)
|
|
|
|
# Dismiss any consent overlay
|
|
self._dismiss_consent_dialog(page)
|
|
|
|
# Enter Instagram URL
|
|
input_field = page.locator("input[type='text']").first
|
|
if not input_field or not input_field.is_visible():
|
|
self.log("Could not find FastDL input field for /api/convert", "error")
|
|
return None
|
|
|
|
input_field.fill(instagram_url)
|
|
page.wait_for_timeout(500)
|
|
|
|
# Click download button
|
|
download_button = page.locator("button:has-text('Download')").first
|
|
if not download_button or not download_button.is_visible():
|
|
self.log("Could not find Download button for /api/convert", "error")
|
|
return None
|
|
|
|
download_button.click(force=True)
|
|
self.log(f"Waiting for /api/convert response for {instagram_url}...", "debug")
|
|
|
|
# Poll until response captured or timeout (30s)
|
|
for _ in range(60):
|
|
if convert_response[0] is not None:
|
|
break
|
|
page.wait_for_timeout(500)
|
|
|
|
if convert_response[0] is None:
|
|
self.log(f"Timeout waiting for /api/convert response for {instagram_url}", "warning")
|
|
return None
|
|
|
|
self.log(f"Captured /api/convert response with {len(convert_response[0])} item(s)", "debug")
|
|
return convert_response[0]
|
|
|
|
except Exception as e:
|
|
self.log(f"Error fetching /api/convert for {instagram_url}: {e}", "error")
|
|
return None
|
|
finally:
|
|
try:
|
|
page.remove_listener("response", _intercept_convert)
|
|
except Exception:
|
|
pass
|
|
|
|
def _extract_highres_items_from_convert_response(self, convert_data, shortcode, fallback_date=None):
|
|
"""Parse /api/convert response into download items suitable for _download_items_parallel().
|
|
|
|
Args:
|
|
convert_data: JSON list from /api/convert response
|
|
shortcode: Instagram shortcode for this post
|
|
fallback_date: Fallback datetime if meta.taken_at is missing
|
|
|
|
Returns:
|
|
List of dicts with keys: download_url, filename, media_id, normalized_media_id,
|
|
post_date, ext, metadata
|
|
"""
|
|
items = []
|
|
profile = self.profile_name or "unknown"
|
|
|
|
if not isinstance(convert_data, list):
|
|
convert_data = [convert_data]
|
|
|
|
for idx, entry in enumerate(convert_data):
|
|
try:
|
|
# Extract download URL — first url entry has highest res
|
|
url_list = entry.get('url', [])
|
|
if not url_list:
|
|
continue
|
|
best_url = url_list[0]
|
|
download_url = best_url.get('url', '')
|
|
if not download_url:
|
|
continue
|
|
|
|
ext_raw = best_url.get('ext', 'jpg')
|
|
ext = f".{ext_raw}" if not ext_raw.startswith('.') else ext_raw
|
|
|
|
# Extract metadata
|
|
meta = entry.get('meta', {})
|
|
taken_at = meta.get('taken_at', 0)
|
|
post_date = datetime.fromtimestamp(taken_at) if taken_at else fallback_date
|
|
caption = meta.get('title', '')
|
|
post_shortcode = meta.get('shortcode', shortcode)
|
|
|
|
# Extract media_id from thumb URL's filename= param or uri= param
|
|
media_id = None
|
|
thumb_url = entry.get('thumb', '')
|
|
|
|
if thumb_url and 'filename=' in thumb_url:
|
|
try:
|
|
parsed = urllib.parse.urlparse(thumb_url)
|
|
params = urllib.parse.parse_qs(parsed.query)
|
|
fn = params.get('filename', [''])[0]
|
|
if fn:
|
|
media_id = Path(fn).stem
|
|
except Exception:
|
|
pass
|
|
|
|
if not media_id and thumb_url and 'uri=' in thumb_url:
|
|
try:
|
|
parsed = urllib.parse.urlparse(thumb_url)
|
|
params = urllib.parse.parse_qs(parsed.query)
|
|
uri = params.get('uri', [''])[0]
|
|
if uri:
|
|
media_id = self._extract_media_id_from_cdn_url(uri)
|
|
except Exception:
|
|
pass
|
|
|
|
if not media_id and download_url and 'uri=' in download_url:
|
|
try:
|
|
parsed = urllib.parse.urlparse(download_url)
|
|
params = urllib.parse.parse_qs(parsed.query)
|
|
uri = params.get('uri', [''])[0]
|
|
if uri:
|
|
media_id = self._extract_media_id_from_cdn_url(uri)
|
|
except Exception:
|
|
pass
|
|
|
|
if not media_id:
|
|
# Final fallback: shortcode + index
|
|
media_id = f"{post_shortcode}_{idx}" if len(convert_data) > 1 else post_shortcode
|
|
|
|
normalized = extract_instagram_media_id(media_id) if media_id else media_id
|
|
|
|
date_str = post_date.strftime('%Y%m%d_%H%M%S') if post_date else datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
filename = f"{profile}_{date_str}_{media_id}{ext}"
|
|
|
|
items.append({
|
|
'media_id': media_id,
|
|
'normalized_media_id': normalized,
|
|
'download_url': download_url,
|
|
'filename': filename,
|
|
'post_date': post_date,
|
|
'ext': ext,
|
|
'shortcode': post_shortcode,
|
|
'caption': caption,
|
|
'metadata': {'high_res': True, 'instagram_url': f"https://www.instagram.com/p/{post_shortcode}/"},
|
|
})
|
|
|
|
except Exception as e:
|
|
self.log(f"Error parsing /api/convert entry {idx}: {e}", "debug")
|
|
continue
|
|
|
|
return items
|
|
|
|
def _check_post_phrases(self, page, phrase_config):
|
|
"""
|
|
Check if post contains required phrases
|
|
|
|
Args:
|
|
page: Playwright page object
|
|
phrase_config: Phrase search configuration
|
|
|
|
Returns:
|
|
True if post matches phrase criteria, False otherwise
|
|
"""
|
|
try:
|
|
# Get post caption/text from FastDL detail page
|
|
# The caption is typically in p.media-content__caption on the detail page
|
|
caption_selectors = [
|
|
'p.media-content__caption', # Primary caption selector on detail page
|
|
'.media-content__caption',
|
|
'.caption',
|
|
'.post-caption',
|
|
'div[class*="caption"]',
|
|
'p[class*="caption"]',
|
|
'.media-content__description',
|
|
'div.content',
|
|
'p.content'
|
|
]
|
|
|
|
post_text = ""
|
|
for selector in caption_selectors:
|
|
try:
|
|
elements = page.locator(selector).all()
|
|
for element in elements:
|
|
if element.is_visible():
|
|
text = element.text_content() or ""
|
|
if text:
|
|
post_text += " " + text
|
|
except Exception:
|
|
continue
|
|
|
|
# Also check any visible text in media content area
|
|
try:
|
|
media_content = page.locator('.media-content, .post-content').first
|
|
if media_content.count() > 0:
|
|
post_text += " " + (media_content.text_content() or "")
|
|
except Exception:
|
|
pass
|
|
|
|
if not post_text:
|
|
self.log("Could not extract post text for phrase matching", "debug")
|
|
# If we can't get text, default to downloading (avoid false negatives)
|
|
return True
|
|
|
|
# Clean up text
|
|
post_text = ' '.join(post_text.split()) # Normalize whitespace
|
|
|
|
phrases = phrase_config.get('phrases', [])
|
|
if not phrases:
|
|
return True # No phrases to match = match all
|
|
|
|
case_sensitive = phrase_config.get('case_sensitive', False)
|
|
match_all = phrase_config.get('match_all', False)
|
|
|
|
if not case_sensitive:
|
|
post_text = post_text.lower()
|
|
phrases = [p.lower() for p in phrases]
|
|
|
|
# Check phrase matching
|
|
matches = []
|
|
for phrase in phrases:
|
|
if phrase in post_text:
|
|
matches.append(phrase)
|
|
self.log(f"Found phrase match: '{phrase}'", "debug")
|
|
|
|
if match_all:
|
|
# All phrases must be found
|
|
result = len(matches) == len(phrases)
|
|
else:
|
|
# At least one phrase must be found
|
|
result = len(matches) > 0
|
|
|
|
if result:
|
|
self.log(f"Post matches phrase criteria ({len(matches)}/{len(phrases)} phrases found)", "info")
|
|
else:
|
|
self.log(f"Post does not match phrase criteria ({len(matches)}/{len(phrases)} phrases found)", "info")
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
self.log(f"Error checking phrases: {e}", "error")
|
|
# On error, default to downloading (avoid false negatives)
|
|
return True
|
|
|
|
def _dismiss_consent_dialog(self, page):
|
|
"""Dismiss cookie consent / GDPR overlay if present (Google FundingChoices)."""
|
|
try:
|
|
consent_btn = page.locator(
|
|
'button.fc-cta-consent, '
|
|
'button.fc-cta-do-not-consent, '
|
|
'button[aria-label="Consent"], '
|
|
'button.fc-dismiss-button, '
|
|
'.fc-dialog button.fc-primary-button'
|
|
).first
|
|
if consent_btn.count() > 0 and consent_btn.is_visible():
|
|
consent_btn.click(force=True)
|
|
self.log("Dismissed consent dialog", "debug")
|
|
import time
|
|
time.sleep(0.5)
|
|
return
|
|
overlay = page.locator('.fc-consent-root, .fc-dialog-overlay').first
|
|
if overlay.count() > 0:
|
|
page.evaluate("document.querySelectorAll('.fc-consent-root, .fc-dialog-overlay, .fc-dialog-container').forEach(el => el.remove())")
|
|
self.log("Removed consent overlay via JS", "debug")
|
|
except Exception:
|
|
pass
|
|
|
|
def _smart_delay(self):
|
|
"""Implement smart delays with randomization to avoid detection"""
|
|
self.download_count += 1
|
|
|
|
# Check if we need a batch delay
|
|
if self.download_count % self.batch_size == 0:
|
|
delay = random.uniform(self.batch_delay_min, self.batch_delay_max)
|
|
self.log(f"Batch delay: waiting {delay:.1f} seconds after {self.download_count} downloads", "debug")
|
|
else:
|
|
# Regular delay with randomization
|
|
delay = random.uniform(self.min_delay, self.max_delay)
|
|
self.log(f"Waiting {delay:.1f} seconds before next download", "debug")
|
|
|
|
time.sleep(delay)
|
|
|
|
def _update_all_timestamps(self, filepath, post_date):
|
|
"""Update all timestamps for a file: filesystem and EXIF
|
|
|
|
Args:
|
|
filepath: Path to the file
|
|
post_date: datetime object with the target date/time
|
|
"""
|
|
if not post_date:
|
|
return
|
|
|
|
timestamp = post_date.timestamp()
|
|
|
|
# 1. Update file system timestamps (access time and modification time)
|
|
try:
|
|
os.utime(filepath, (timestamp, timestamp))
|
|
self.log(f"Updated file timestamps to {post_date.strftime('%Y-%m-%d %H:%M:%S')}")
|
|
except Exception as e:
|
|
self.log(f"Failed to update file timestamps: {e}", "error")
|
|
|
|
# 2. Update creation time (platform-specific)
|
|
try:
|
|
if platform.system() == 'Darwin': # macOS
|
|
# Use SetFile command on macOS to set creation date
|
|
date_str = post_date.strftime('%m/%d/%Y %H:%M:%S')
|
|
subprocess.run(
|
|
['SetFile', '-d', date_str, str(filepath)],
|
|
capture_output=True,
|
|
text=True
|
|
)
|
|
elif platform.system() == 'Windows':
|
|
# On Windows, we can use PowerShell to set creation time
|
|
# Escape special characters to prevent command injection
|
|
filepath_escaped = str(filepath).replace("'", "''") # PowerShell single-quote escape
|
|
# isoformat() produces safe strings like "2024-01-15T10:30:00" but escape anyway
|
|
date_escaped = post_date.isoformat().replace("'", "''")
|
|
ps_command = f"(Get-Item -LiteralPath '{filepath_escaped}').CreationTime = Get-Date '{date_escaped}'"
|
|
subprocess.run(
|
|
['powershell', '-Command', ps_command],
|
|
capture_output=True,
|
|
text=True
|
|
)
|
|
# Linux doesn't support changing creation time
|
|
except Exception as e:
|
|
# SetFile might not be available on newer macOS versions
|
|
pass
|
|
|
|
# 3. Update EXIF data for images
|
|
if str(filepath).lower().endswith(('.jpg', '.jpeg', '.png')):
|
|
self._update_exif_timestamp(filepath, post_date)
|
|
|
|
# 4. Update MP4 metadata for videos
|
|
if str(filepath).lower().endswith(('.mp4', '.mov')):
|
|
self._update_video_metadata(filepath, post_date)
|
|
|
|
def _update_exif_timestamp(self, filepath, post_date):
|
|
"""Update EXIF timestamps in image files
|
|
|
|
Requires exiftool to be installed: brew install exiftool (macOS) or apt-get install exiftool (Linux)
|
|
"""
|
|
try:
|
|
# Check if exiftool is available
|
|
result = subprocess.run(['which', 'exiftool'], capture_output=True, text=True)
|
|
if result.returncode != 0:
|
|
# Try to use piexif as fallback if available
|
|
try:
|
|
import piexif
|
|
self._update_exif_with_piexif(filepath, post_date)
|
|
except ImportError:
|
|
pass # Silently skip if no EXIF tools available
|
|
return
|
|
|
|
# Format date for EXIF
|
|
exif_date = post_date.strftime('%Y:%m:%d %H:%M:%S')
|
|
|
|
# Update all date fields in EXIF including MetadataDate for Immich
|
|
cmd = [
|
|
'exiftool', '-overwrite_original', '-quiet',
|
|
f'-AllDates={exif_date}',
|
|
f'-MetadataDate={exif_date}',
|
|
'-HistoryWhen=',
|
|
f'-FileModifyDate={exif_date}',
|
|
str(filepath)
|
|
]
|
|
|
|
subprocess.run(cmd, capture_output=True, text=True)
|
|
self.log(f"Updated EXIF timestamps to {post_date.strftime('%Y-%m-%d %H:%M:%S')}")
|
|
|
|
except Exception as e:
|
|
pass # Silently skip EXIF updates if tools not available
|
|
|
|
def _update_exif_with_piexif(self, filepath, post_date):
|
|
"""Update EXIF using piexif library as fallback"""
|
|
try:
|
|
import piexif
|
|
from PIL import Image
|
|
|
|
# Format date for EXIF
|
|
exif_date = post_date.strftime('%Y:%m:%d %H:%M:%S').encode('utf-8')
|
|
|
|
# Load existing EXIF or create new
|
|
exif_dict = {'0th': {}, 'Exif': {}, 'GPS': {}, 'Interop': {}, '1st': {}, 'thumbnail': None}
|
|
try:
|
|
with Image.open(filepath) as img:
|
|
exif_dict = piexif.load(img.info.get('exif', b''))
|
|
except Exception:
|
|
pass # Use default empty dict
|
|
|
|
# Update date fields
|
|
exif_dict['0th'][piexif.ImageIFD.DateTime] = exif_date
|
|
exif_dict['Exif'][piexif.ExifIFD.DateTimeOriginal] = exif_date
|
|
exif_dict['Exif'][piexif.ExifIFD.DateTimeDigitized] = exif_date
|
|
|
|
# Save with updated EXIF
|
|
exif_bytes = piexif.dump(exif_dict)
|
|
with Image.open(filepath) as img:
|
|
img.save(filepath, exif=exif_bytes)
|
|
|
|
self.log(f"Updated EXIF with piexif to {post_date.strftime('%Y-%m-%d %H:%M:%S')}")
|
|
|
|
except Exception as e:
|
|
pass # Silently skip if piexif not available
|
|
|
|
def _update_video_metadata(self, filepath, post_date):
|
|
"""Update MP4/MOV video metadata timestamps
|
|
|
|
Uses ffmpeg if available to update video metadata
|
|
"""
|
|
try:
|
|
# Check if ffmpeg is available
|
|
result = subprocess.run(['which', 'ffmpeg'], capture_output=True, text=True)
|
|
if result.returncode != 0:
|
|
return # ffmpeg not available
|
|
|
|
# Format date for video metadata
|
|
meta_date = post_date.strftime('%Y-%m-%d %H:%M:%S')
|
|
|
|
# Create temp file
|
|
temp_file = str(filepath) + '.temp.mp4'
|
|
|
|
# Update metadata using ffmpeg
|
|
cmd = [
|
|
'ffmpeg', '-i', str(filepath),
|
|
'-metadata', f'creation_time={post_date.isoformat()}Z',
|
|
'-metadata', f'date={meta_date}',
|
|
'-c', 'copy', # Copy streams without re-encoding
|
|
'-y', # Overwrite
|
|
temp_file
|
|
]
|
|
|
|
result = subprocess.run(cmd, capture_output=True, text=True)
|
|
|
|
if result.returncode == 0:
|
|
# Replace original with temp file
|
|
os.replace(temp_file, filepath)
|
|
# Re-apply file timestamps (os.replace creates a new file with current mtime)
|
|
timestamp = post_date.timestamp()
|
|
os.utime(str(filepath), (timestamp, timestamp))
|
|
self.log(f"Updated video metadata to {post_date.strftime('%Y-%m-%d %H:%M:%S')}")
|
|
else:
|
|
# Clean up temp file if it exists
|
|
if os.path.exists(temp_file):
|
|
os.remove(temp_file)
|
|
|
|
except Exception as e:
|
|
pass # Silently skip video metadata updates
|
|
|
|
|
|
def _is_already_downloaded(self, media_id):
|
|
"""Check if media_id has already been downloaded (uses centralized function)"""
|
|
if not self.use_database:
|
|
return False
|
|
|
|
# Use centralized function for consistent cross-module detection
|
|
return is_instagram_downloaded(self.db.db if hasattr(self.db, 'db') else self.db, media_id)
|
|
|
|
def _record_download(self, media_id, username, content_type, filename,
|
|
download_url=None, post_date=None, metadata=None, deferred=False):
|
|
"""Record a successful download in the database (uses centralized function)
|
|
|
|
Args:
|
|
deferred: If True, don't record to database now - add to pending_downloads list
|
|
for later recording after file move is complete
|
|
"""
|
|
# If deferred, store for later recording instead of recording now
|
|
if deferred:
|
|
file_path = str(filename) # Full path
|
|
filename_only = Path(filename).name # Just the filename
|
|
self.pending_downloads.append({
|
|
'media_id': media_id,
|
|
'username': username,
|
|
'filename': filename_only,
|
|
'url': download_url,
|
|
'post_date': post_date.isoformat() if post_date else None,
|
|
'file_path': file_path,
|
|
'content_type': content_type,
|
|
'metadata': metadata
|
|
})
|
|
self.log(f"Deferred recording for {media_id}", "debug")
|
|
return True
|
|
|
|
if not self.use_database:
|
|
self.log(f"Database recording disabled (use_database=False)", "debug")
|
|
return
|
|
|
|
# Extract just the filename from the full path for database
|
|
file_path = str(filename) # Full path
|
|
filename_only = Path(filename).name # Just the filename
|
|
|
|
self.log(f"Recording download in database: filename={filename_only}, media_id={media_id}, user={username}", "debug")
|
|
|
|
# Use centralized function for consistent cross-module storage
|
|
result = record_instagram_download(
|
|
db=self.db.db if hasattr(self.db, 'db') else self.db,
|
|
media_id=media_id,
|
|
username=username,
|
|
content_type=content_type,
|
|
filename=filename_only,
|
|
download_url=download_url,
|
|
post_date=post_date,
|
|
file_path=file_path,
|
|
method='fastdl',
|
|
extra_metadata=metadata
|
|
)
|
|
|
|
if result:
|
|
self.log(f"Successfully recorded download for {filename_only}", "debug")
|
|
else:
|
|
self.log(f"Failed to record download for {filename_only} (possibly duplicate)", "debug")
|
|
|
|
return result
|
|
|
|
def get_pending_downloads(self):
|
|
"""Get list of downloads that were deferred for later recording"""
|
|
return self.pending_downloads.copy()
|
|
|
|
def clear_pending_downloads(self):
|
|
"""Clear the pending downloads list after they've been recorded"""
|
|
self.pending_downloads = []
|
|
|
|
def _record_checked(self, media_id, username, content_type, reason="checked", post_date=None):
|
|
"""Record that a post was checked but not downloaded
|
|
|
|
Args:
|
|
media_id: The media ID that was checked
|
|
username: Instagram username
|
|
content_type: Type of content
|
|
reason: Reason for skipping ('old_post', 'phrase_checked', 'checked')
|
|
post_date: Optional post date
|
|
"""
|
|
if not self.use_database:
|
|
return
|
|
|
|
# Create a marker filename similar to ImgInn
|
|
marker_filename = f"_{reason}_{media_id}"
|
|
|
|
# Use centralized function for consistent cross-module storage
|
|
return record_instagram_download(
|
|
db=self.db.db if hasattr(self.db, 'db') else self.db,
|
|
media_id=media_id,
|
|
username=username,
|
|
content_type=content_type,
|
|
filename=marker_filename,
|
|
post_date=post_date,
|
|
method='fastdl',
|
|
extra_metadata={'marker': True, 'reason': reason}
|
|
)
|
|
|
|
def reset_database(self, username=None, content_type=None):
|
|
"""Reset database by removing tracking records
|
|
|
|
Args:
|
|
username: If specified, only reset records for this user
|
|
content_type: If specified, only reset records for this content type
|
|
|
|
Returns:
|
|
Number of records deleted
|
|
"""
|
|
if not self.use_database or not self.db:
|
|
self.log("Database is disabled")
|
|
return 0
|
|
|
|
# Use unified database
|
|
return self.db.reset_database(username, content_type)
|
|
|
|
def remove_tracking(self, media_ids):
|
|
"""Remove specific media IDs from tracking
|
|
|
|
Args:
|
|
media_ids: Single media_id string or list of media_ids to remove
|
|
|
|
Returns:
|
|
Number of records deleted
|
|
"""
|
|
if not self.use_database or not self.db:
|
|
return 0
|
|
|
|
# Use unified database
|
|
return self.db.remove_tracking(media_ids)
|
|
|
|
def get_tracked_items(self, username=None, content_type=None):
|
|
"""Get list of tracked items from database
|
|
|
|
Args:
|
|
username: Filter by username
|
|
content_type: Filter by content type
|
|
|
|
Returns:
|
|
List of dictionaries with tracking info
|
|
"""
|
|
if not self.use_database or not self.db:
|
|
return []
|
|
|
|
# Use unified database
|
|
return self.db.get_tracked_items(username, content_type)
|
|
|
|
def get_database_stats(self):
|
|
"""Get statistics about the database
|
|
|
|
Returns:
|
|
Dictionary with database statistics
|
|
"""
|
|
if not self.use_database or not self.db:
|
|
return {'enabled': False}
|
|
|
|
# Use unified database
|
|
return self.db.get_database_stats()
|
|
|
|
def download(self, username, content_type="all", output_dir="downloads",
|
|
max_downloads=None, days_back=None, date_from=None, date_to=None,
|
|
phrase_config=None, defer_database=False):
|
|
"""
|
|
Download content from Instagram via FastDL
|
|
|
|
Args:
|
|
username: Instagram username or URL
|
|
content_type: Type of content ('posts', 'stories', 'reels', 'highlights', 'all')
|
|
output_dir: Directory to save downloads
|
|
max_downloads: Maximum number of items to download
|
|
days_back: Number of days back to download posts/reels
|
|
date_from: Start date for range (datetime object or YYYY-MM-DD string)
|
|
date_to: End date for range (datetime object or YYYY-MM-DD string)
|
|
phrase_config: Optional phrase search configuration for posts/reels
|
|
{
|
|
'enabled': bool,
|
|
'phrases': list of phrases to search for,
|
|
'case_sensitive': bool,
|
|
'match_all': bool (True = all phrases must match, False = any phrase)
|
|
}
|
|
defer_database: If True, don't record to database immediately - store in
|
|
pending_downloads for later recording after file move is complete
|
|
|
|
Returns:
|
|
Number of successfully downloaded items
|
|
"""
|
|
# Clear downloaded_files cache between accounts to prevent memory growth
|
|
self.downloaded_files.clear()
|
|
|
|
# Check site status before doing anything else
|
|
self.log("Checking FastDL site status...", "debug")
|
|
site_status, error_msg = self.cf_handler.check_site_status("https://fastdl.app/", timeout=10)
|
|
|
|
if self.cf_handler.should_skip_download(site_status):
|
|
self.log(f"Skipping download - FastDL is unavailable: {error_msg}", "warning")
|
|
return 0
|
|
elif site_status == SiteStatus.CLOUDFLARE_CHALLENGE:
|
|
self.log("Cloudflare challenge detected, will attempt bypass during download", "info")
|
|
|
|
# Setup
|
|
self.username = username
|
|
self.content_type = content_type
|
|
self.output_dir = Path(output_dir)
|
|
# Don't create output_dir here - only create when we have files to download
|
|
self.max_downloads = max_downloads
|
|
self.phrase_config = phrase_config
|
|
self.defer_database = defer_database # Store for deferred recording
|
|
|
|
# Extract profile name
|
|
self.profile_name = self._extract_profile_name(username)
|
|
|
|
# Setup date filtering
|
|
self._setup_date_filtering(days_back, date_from, date_to)
|
|
|
|
# Scan existing files
|
|
self._scan_existing_files()
|
|
|
|
# Run the download
|
|
return self._run_download()
|
|
|
|
def download_multi(self, username, content_types, output_dirs,
|
|
max_downloads=None, days_back=None, date_from=None, date_to=None,
|
|
phrase_configs=None, defer_database=False):
|
|
"""Download multiple content types in a single browser session.
|
|
|
|
Args:
|
|
username: Instagram username
|
|
content_types: List like ['stories', 'reels', 'posts']
|
|
output_dirs: Dict {content_type: output_dir_path}
|
|
phrase_configs: Dict {content_type: phrase_config} or None
|
|
(other args same as download())
|
|
|
|
Returns:
|
|
Dict: {content_type: {'count': N, 'pending_downloads': [...]}}
|
|
"""
|
|
# Clear downloaded_files cache between accounts to prevent memory growth
|
|
self.downloaded_files.clear()
|
|
|
|
# Check site status before doing anything else
|
|
self.log("Checking FastDL site status...", "debug")
|
|
site_status, error_msg = self.cf_handler.check_site_status("https://fastdl.app/", timeout=10)
|
|
|
|
if self.cf_handler.should_skip_download(site_status):
|
|
self.log(f"Skipping download - FastDL is unavailable: {error_msg}", "warning")
|
|
return {ct: {'count': 0, 'pending_downloads': []} for ct in content_types}
|
|
elif site_status == SiteStatus.CLOUDFLARE_CHALLENGE:
|
|
self.log("Cloudflare challenge detected, will attempt bypass during download", "info")
|
|
|
|
# Setup
|
|
self.username = username
|
|
self.profile_name = self._extract_profile_name(username)
|
|
self.max_downloads = max_downloads
|
|
self.defer_database = defer_database
|
|
|
|
# Setup date filtering
|
|
self._setup_date_filtering(days_back, date_from, date_to)
|
|
|
|
# Do NOT call _scan_existing_files() here — done per content type inside _run_download_multi()
|
|
|
|
# Run the multi-content download
|
|
return self._run_download_multi(content_types, output_dirs, phrase_configs or {})
|
|
|
|
def _run_download_multi(self, content_types, output_dirs, phrase_configs):
|
|
"""Single browser session for all content types.
|
|
|
|
Args:
|
|
content_types: List of content types to download
|
|
output_dirs: Dict {content_type: output_dir_path}
|
|
phrase_configs: Dict {content_type: phrase_config}
|
|
|
|
Returns:
|
|
Dict: {content_type: {'count': N, 'pending_downloads': [...]}}
|
|
"""
|
|
results = {}
|
|
|
|
# Try to get fresh cookies via FlareSolverr if we don't have them or they're old
|
|
if not self._has_valid_cookies() or self._cookies_expired():
|
|
self.log("Cookies missing or expired, attempting FlareSolverr bypass...", "info")
|
|
if self._get_cookies_via_flaresolverr():
|
|
self.log("Successfully got fresh cookies from FlareSolverr", "info")
|
|
else:
|
|
self.log("FlareSolverr unavailable, will try with Playwright", "warning")
|
|
|
|
with sync_playwright() as p:
|
|
browser = p.chromium.launch(
|
|
headless=self.headless,
|
|
args=[
|
|
'--disable-blink-features=AutomationControlled',
|
|
'--disable-infobars',
|
|
'--disable-background-timer-throttling',
|
|
'--disable-backgrounding-occluded-windows',
|
|
'--disable-renderer-backgrounding'
|
|
]
|
|
)
|
|
|
|
# CRITICAL: Browser fingerprint must match FlareSolverr for cookies to work
|
|
context_options = get_playwright_context_options()
|
|
context_options['accept_downloads'] = True
|
|
context_options['ignore_https_errors'] = True
|
|
|
|
# Use stored cookie user_agent if available
|
|
try:
|
|
if self.unified_db:
|
|
stored_user_agent = self.unified_db.get_scraper_cookies_user_agent(self.scraper_id)
|
|
if stored_user_agent:
|
|
self.log(f"Using stored cookie user_agent: {stored_user_agent[:50]}...", "debug")
|
|
context_options['user_agent'] = stored_user_agent
|
|
else:
|
|
self.log(f"Using fingerprint: Chrome {context_options.get('extra_http_headers', {}).get('Sec-Ch-Ua', 'unknown')[:30]}...", "debug")
|
|
else:
|
|
self.log(f"Using fingerprint: Chrome {context_options.get('extra_http_headers', {}).get('Sec-Ch-Ua', 'unknown')[:30]}...", "debug")
|
|
except Exception as e:
|
|
self.log(f"Error getting stored user_agent, using default: {e}", "debug")
|
|
|
|
context = browser.new_context(**context_options)
|
|
|
|
# Load cookies from database or file
|
|
cookies_loaded = False
|
|
if self.unified_db:
|
|
try:
|
|
cookies = self.unified_db.get_scraper_cookies(self.scraper_id)
|
|
if cookies:
|
|
cleaned_cookies = []
|
|
for cookie in cookies:
|
|
cleaned = {k: v for k, v in cookie.items()
|
|
if k not in ['partitionKey', '_crHasCrossSiteAncestor']}
|
|
if 'expiry' in cleaned and 'expires' not in cleaned:
|
|
cleaned['expires'] = cleaned.pop('expiry')
|
|
cleaned_cookies.append(cleaned)
|
|
|
|
try:
|
|
context.clear_cookies()
|
|
except Exception:
|
|
pass
|
|
|
|
context.add_cookies(cleaned_cookies)
|
|
self.log(f"Loaded {len(cleaned_cookies)} cookies from database", "debug")
|
|
cookies_loaded = True
|
|
except Exception as e:
|
|
self.log(f"Error loading cookies from database: {e}", "warning")
|
|
|
|
# Fallback to file-based cookies
|
|
if not cookies_loaded and self.cookie_file and self.cookie_file.exists():
|
|
try:
|
|
with open(self.cookie_file, 'r') as f:
|
|
data = json.load(f)
|
|
|
|
cookies = data.get('cookies', [])
|
|
if cookies:
|
|
cleaned_cookies = []
|
|
for cookie in cookies:
|
|
cleaned = dict(cookie)
|
|
if 'expiry' in cleaned and 'expires' not in cleaned:
|
|
cleaned['expires'] = cleaned.pop('expiry')
|
|
cleaned_cookies.append(cleaned)
|
|
|
|
try:
|
|
context.clear_cookies()
|
|
except Exception:
|
|
pass
|
|
|
|
context.add_cookies(cleaned_cookies)
|
|
self.log(f"Loaded {len(cleaned_cookies)} cookies from file", "debug")
|
|
except Exception as e:
|
|
self.log(f"Failed to load cookies: {e}", "warning")
|
|
|
|
# Handle popups
|
|
def handle_popup(page):
|
|
if len(context.pages) > 1:
|
|
self.log("Blocking popup")
|
|
page.close()
|
|
|
|
context.on("page", handle_popup)
|
|
|
|
page = context.new_page()
|
|
page.on("popup", lambda popup: popup.close())
|
|
|
|
# Add anti-detection scripts
|
|
page.add_init_script(get_playwright_stealth_scripts())
|
|
|
|
try:
|
|
# Intercept all API responses to discover FastDL's backend endpoints
|
|
api_responses = []
|
|
|
|
def _capture_api_response(response):
|
|
try:
|
|
url = response.url
|
|
if 'fastdl.app' in url and response.status == 200:
|
|
content_type_header = response.headers.get('content-type', '')
|
|
if 'json' in content_type_header:
|
|
try:
|
|
body = response.json()
|
|
api_responses.append({
|
|
'url': url,
|
|
'body': body,
|
|
'size': len(str(body)),
|
|
})
|
|
endpoint = url.split('/')[-1].split('?')[0]
|
|
if isinstance(body, dict) and 'result' in body:
|
|
result = body['result']
|
|
if isinstance(result, list):
|
|
self.log(f"[API] Captured {endpoint}: {len(result)} items", "info")
|
|
elif isinstance(result, dict) and 'edges' in result:
|
|
self.log(f"[API] Captured {endpoint}: {len(result['edges'])} edges (count: {result.get('count', '?')})", "info")
|
|
else:
|
|
self.log(f"[API] Captured {endpoint}", "info")
|
|
except Exception:
|
|
pass
|
|
except Exception:
|
|
pass
|
|
|
|
page.on("response", _capture_api_response)
|
|
|
|
# Navigate to FastDL
|
|
self.log(f"Navigating to FastDL...")
|
|
page.goto(self.fastdl_url, wait_until="domcontentloaded", timeout=60000)
|
|
page.wait_for_timeout(2000)
|
|
|
|
# Enter username
|
|
input_field = page.locator("input[type='text']").first
|
|
if not input_field or not input_field.is_visible():
|
|
self.log("Could not find input field", "error")
|
|
return {ct: {'count': 0, 'pending_downloads': []} for ct in content_types}
|
|
|
|
self.log(f"Entering username: {self.username}")
|
|
input_field.fill(self.username)
|
|
page.wait_for_timeout(500)
|
|
|
|
# Click download button
|
|
download_button = page.locator("button:has-text('Download')").first
|
|
if download_button and download_button.is_visible():
|
|
download_button.click(force=True)
|
|
self.log("Loading profile...")
|
|
|
|
try:
|
|
self.log("Waiting for profile to load...")
|
|
page.wait_for_selector(".loader-component", timeout=30000, state="detached")
|
|
self.log("Profile loading complete")
|
|
except PlaywrightTimeout:
|
|
self.log("Profile still loading after 30s, continuing anyway...", "warning")
|
|
except Exception as e:
|
|
self.log(f"Error waiting for loader: {e}", "debug")
|
|
|
|
page.wait_for_timeout(2000)
|
|
|
|
try:
|
|
page.wait_for_selector("ul.tabs-component", timeout=5000, state="attached")
|
|
tabs_count = page.locator("button.tabs-component__button").count()
|
|
if tabs_count > 0:
|
|
self.log(f"Profile loaded successfully - found {tabs_count} tabs")
|
|
else:
|
|
self.log("Tabs container found but no buttons, waiting...", "warning")
|
|
page.wait_for_timeout(5000)
|
|
except PlaywrightTimeout:
|
|
self.log("Tabs container not found after 5s, continuing anyway...", "warning")
|
|
except Exception as e:
|
|
self.log(f"Error checking tabs: {e}", "warning")
|
|
|
|
# Dismiss consent dialog
|
|
self._dismiss_consent_dialog(page)
|
|
|
|
# Scroll to load all paginated posts within date range
|
|
# Only scroll when posts content type is requested — reels uses
|
|
# the initial postsV2 capture (first page) without extra scrolling
|
|
if 'posts' in content_types:
|
|
self._scroll_to_load_api_posts(page, api_responses)
|
|
|
|
# Track API responses per content type by recording list boundaries
|
|
# Initial profile load + scrolling captures posts/postsV2 — used by both posts and reels
|
|
# (postsV2 contains all timeline content; _extract_posts_from_api filters by type)
|
|
api_responses_for = {}
|
|
initial_responses = list(api_responses) # snapshot after scrolling
|
|
if 'posts' in content_types:
|
|
api_responses_for['posts'] = initial_responses
|
|
if 'reels' in content_types:
|
|
api_responses_for['reels'] = initial_responses # same postsV2, filtered in extraction
|
|
|
|
# Click non-default tabs and capture their API responses separately
|
|
for ct in content_types:
|
|
if ct != 'posts':
|
|
start_idx = len(api_responses)
|
|
self.content_type = ct
|
|
self._navigate_to_content_tab(page)
|
|
# Stories has its own /stories endpoint — use only responses from its tab click
|
|
if ct == 'stories':
|
|
api_responses_for[ct] = api_responses[start_idx:]
|
|
|
|
# Process each content type in order: stories -> reels -> posts
|
|
# Posts go last because _download_highres_via_api_convert navigates away from profile
|
|
ordered = sorted(content_types, key=lambda ct: {'stories': 0, 'reels': 1, 'posts': 2}.get(ct, 9))
|
|
|
|
for ct in ordered:
|
|
try:
|
|
prev_pending = len(self.pending_downloads)
|
|
self.content_type = ct
|
|
self.output_dir = Path(output_dirs[ct])
|
|
self.phrase_config = phrase_configs.get(ct)
|
|
|
|
# Scan existing files for THIS content type's dir (accumulate, don't replace)
|
|
ct_existing = scan_existing_files_for_media_ids(self.output_dir, self.profile_name)
|
|
self.downloaded_files.update(ct_existing)
|
|
|
|
# Use only API responses relevant to this content type
|
|
ct_api_responses = api_responses_for.get(ct, [])
|
|
|
|
# Same download decision tree as _run_download
|
|
api_result = -1
|
|
use_api = ct in ('stories', 'posts', 'reels') and ct_api_responses
|
|
if use_api and self.high_res and ct == 'posts':
|
|
self.log("High-res mode enabled for posts, trying /api/convert approach", "info")
|
|
api_convert_result = self._download_highres_via_api_convert(page, ct_api_responses)
|
|
if api_convert_result < 0:
|
|
self.log("Falling back to browser-based high-res download", "info")
|
|
else:
|
|
api_result = api_convert_result
|
|
use_api = False
|
|
|
|
if use_api:
|
|
api_result = self._download_from_api(ct_api_responses)
|
|
|
|
if api_result >= 0:
|
|
self.log(f"API-based download complete for {ct}: {api_result} items")
|
|
count = api_result
|
|
else:
|
|
self.log(f"No API data available for {ct}, skipping", "debug")
|
|
count = 0
|
|
|
|
results[ct] = {
|
|
'count': count,
|
|
'pending_downloads': self.pending_downloads[prev_pending:]
|
|
}
|
|
except Exception as e:
|
|
self.log(f"Error downloading {ct}: {e}", "error")
|
|
import traceback
|
|
self.log(traceback.format_exc(), "debug")
|
|
results[ct] = {'count': 0, 'pending_downloads': []}
|
|
|
|
# Stop API interception
|
|
page.remove_listener("response", _capture_api_response)
|
|
|
|
except Exception as e:
|
|
self.log(f"Error: {e}", "error")
|
|
finally:
|
|
try:
|
|
context.close()
|
|
self.log("Browser context closed", "debug")
|
|
except Exception:
|
|
pass
|
|
try:
|
|
browser.close()
|
|
self.log("Browser closed", "debug")
|
|
except Exception:
|
|
pass
|
|
|
|
# Fill in any missing content types with empty results
|
|
for ct in content_types:
|
|
if ct not in results:
|
|
results[ct] = {'count': 0, 'pending_downloads': []}
|
|
|
|
return results
|
|
|
|
def _extract_profile_name(self, input_value):
|
|
"""Extract profile name from username or URL"""
|
|
if "/" in input_value:
|
|
# It's a URL, extract username
|
|
parts = input_value.rstrip('/').split('/')
|
|
for i, part in enumerate(parts):
|
|
if part == "p" and i + 1 < len(parts):
|
|
# It's a post URL, get username from different position
|
|
return None
|
|
elif part in ["stories", "highlights", "reels"] and i > 0:
|
|
# Username is before the content type
|
|
return parts[i-1]
|
|
# Default to last part for profile URLs
|
|
return parts[-1]
|
|
else:
|
|
# Direct username
|
|
return input_value.lower()
|
|
|
|
def _setup_date_filtering(self, days_back, date_from, date_to):
|
|
"""Setup date range for filtering"""
|
|
self.date_from = None
|
|
self.date_to = None
|
|
|
|
if date_from:
|
|
if isinstance(date_from, str):
|
|
self.date_from = datetime.strptime(date_from, "%Y-%m-%d")
|
|
else:
|
|
self.date_from = date_from
|
|
|
|
if date_to:
|
|
if isinstance(date_to, str):
|
|
self.date_to = datetime.strptime(date_to, "%Y-%m-%d")
|
|
else:
|
|
self.date_to = date_to
|
|
|
|
if days_back and not self.date_from:
|
|
# Set date range to include full days
|
|
now = datetime.now()
|
|
self.date_to = datetime(now.year, now.month, now.day, 23, 59, 59) # End of today
|
|
self.date_from = (now - timedelta(days=days_back-1)).replace(hour=0, minute=0, second=0) # Start of N days ago
|
|
self.log(f"Downloading content from last {days_back} days ({self.date_from.strftime('%Y-%m-%d')} to {self.date_to.strftime('%Y-%m-%d')})")
|
|
|
|
def _scan_existing_files(self):
|
|
"""Scan existing files to avoid re-downloading"""
|
|
self.downloaded_files = scan_existing_files_for_media_ids(self.output_dir, self.profile_name)
|
|
if self.downloaded_files:
|
|
self.log(f"Found {len(self.downloaded_files)} existing media IDs, will skip duplicates")
|
|
|
|
def _extract_media_id_from_filename(self, filename):
|
|
"""Extract media ID from filename"""
|
|
name_without_ext = Path(filename).stem
|
|
|
|
if self.profile_name and name_without_ext.startswith(self.profile_name):
|
|
remaining = name_without_ext[len(self.profile_name):].lstrip('_')
|
|
else:
|
|
remaining = name_without_ext
|
|
|
|
return remaining if remaining else name_without_ext
|
|
|
|
def _run_download(self):
|
|
"""Run the actual download process"""
|
|
success_count = 0
|
|
|
|
# Try to get fresh cookies via FlareSolverr if we don't have them or they're old
|
|
if not self._has_valid_cookies() or self._cookies_expired():
|
|
self.log("Cookies missing or expired, attempting FlareSolverr bypass...", "info")
|
|
if self._get_cookies_via_flaresolverr():
|
|
self.log("Successfully got fresh cookies from FlareSolverr", "info")
|
|
else:
|
|
self.log("FlareSolverr unavailable, will try with Playwright", "warning")
|
|
|
|
with sync_playwright() as p:
|
|
browser = p.chromium.launch(
|
|
headless=self.headless,
|
|
args=[
|
|
'--disable-blink-features=AutomationControlled',
|
|
'--disable-infobars',
|
|
'--disable-background-timer-throttling',
|
|
'--disable-backgrounding-occluded-windows',
|
|
'--disable-renderer-backgrounding'
|
|
]
|
|
)
|
|
|
|
# CRITICAL: Browser fingerprint must match FlareSolverr for cookies to work
|
|
# Get dynamic fingerprint settings from FlareSolverr
|
|
context_options = get_playwright_context_options()
|
|
context_options['accept_downloads'] = True
|
|
context_options['ignore_https_errors'] = True
|
|
|
|
# IMPORTANT: If cookies have a stored user_agent, use THAT user_agent
|
|
# Cloudflare cf_clearance cookies are fingerprinted to the browser that solved the challenge
|
|
try:
|
|
if self.unified_db:
|
|
stored_user_agent = self.unified_db.get_scraper_cookies_user_agent(self.scraper_id)
|
|
if stored_user_agent:
|
|
self.log(f"Using stored cookie user_agent: {stored_user_agent[:50]}...", "debug")
|
|
context_options['user_agent'] = stored_user_agent
|
|
else:
|
|
self.log(f"Using fingerprint: Chrome {context_options.get('extra_http_headers', {}).get('Sec-Ch-Ua', 'unknown')[:30]}...", "debug")
|
|
else:
|
|
self.log(f"Using fingerprint: Chrome {context_options.get('extra_http_headers', {}).get('Sec-Ch-Ua', 'unknown')[:30]}...", "debug")
|
|
except Exception as e:
|
|
self.log(f"Error getting stored user_agent, using default: {e}", "debug")
|
|
|
|
context = browser.new_context(**context_options)
|
|
|
|
# Load cookies from database or file
|
|
cookies_loaded = False
|
|
if self.unified_db:
|
|
try:
|
|
cookies = self.unified_db.get_scraper_cookies(self.scraper_id)
|
|
if cookies:
|
|
# Clean cookies - remove unsupported properties and convert expiry->expires
|
|
cleaned_cookies = []
|
|
for cookie in cookies:
|
|
cleaned = {k: v for k, v in cookie.items()
|
|
if k not in ['partitionKey', '_crHasCrossSiteAncestor']}
|
|
# FlareSolverr uses 'expiry' but Playwright uses 'expires'
|
|
if 'expiry' in cleaned and 'expires' not in cleaned:
|
|
cleaned['expires'] = cleaned.pop('expiry')
|
|
cleaned_cookies.append(cleaned)
|
|
|
|
# CRITICAL: Clear existing cookies first to ensure new cf_clearance takes effect
|
|
try:
|
|
context.clear_cookies()
|
|
except Exception:
|
|
pass
|
|
|
|
context.add_cookies(cleaned_cookies)
|
|
self.log(f"Loaded {len(cleaned_cookies)} cookies from database", "debug")
|
|
cookies_loaded = True
|
|
except Exception as e:
|
|
self.log(f"Error loading cookies from database: {e}", "warning")
|
|
|
|
# Fallback to file-based cookies
|
|
if not cookies_loaded and self.cookie_file and self.cookie_file.exists():
|
|
try:
|
|
with open(self.cookie_file, 'r') as f:
|
|
data = json.load(f)
|
|
|
|
cookies = data.get('cookies', [])
|
|
if cookies:
|
|
# Convert expiry->expires for Playwright compatibility
|
|
cleaned_cookies = []
|
|
for cookie in cookies:
|
|
cleaned = dict(cookie)
|
|
if 'expiry' in cleaned and 'expires' not in cleaned:
|
|
cleaned['expires'] = cleaned.pop('expiry')
|
|
cleaned_cookies.append(cleaned)
|
|
|
|
# CRITICAL: Clear existing cookies first
|
|
try:
|
|
context.clear_cookies()
|
|
except Exception:
|
|
pass
|
|
|
|
context.add_cookies(cleaned_cookies)
|
|
self.log(f"Loaded {len(cleaned_cookies)} cookies from file", "debug")
|
|
except Exception as e:
|
|
self.log(f"Failed to load cookies: {e}", "warning")
|
|
|
|
# Handle popups
|
|
def handle_popup(page):
|
|
if len(context.pages) > 1:
|
|
self.log("Blocking popup")
|
|
page.close()
|
|
|
|
context.on("page", handle_popup)
|
|
|
|
page = context.new_page()
|
|
page.on("popup", lambda popup: popup.close())
|
|
|
|
# Add anti-detection scripts
|
|
page.add_init_script(get_playwright_stealth_scripts())
|
|
|
|
try:
|
|
# Intercept all API responses to discover FastDL's backend endpoints
|
|
api_responses = []
|
|
|
|
def _capture_api_response(response):
|
|
try:
|
|
url = response.url
|
|
if 'fastdl.app' in url and response.status == 200:
|
|
content_type = response.headers.get('content-type', '')
|
|
if 'json' in content_type:
|
|
try:
|
|
body = response.json()
|
|
api_responses.append({
|
|
'url': url,
|
|
'body': body,
|
|
'size': len(str(body)),
|
|
})
|
|
# Extract endpoint name for logging
|
|
endpoint = url.split('/')[-1].split('?')[0]
|
|
if isinstance(body, dict) and 'result' in body:
|
|
result = body['result']
|
|
if isinstance(result, list):
|
|
self.log(f"[API] Captured {endpoint}: {len(result)} items", "info")
|
|
elif isinstance(result, dict) and 'edges' in result:
|
|
self.log(f"[API] Captured {endpoint}: {len(result['edges'])} edges (count: {result.get('count', '?')})", "info")
|
|
else:
|
|
self.log(f"[API] Captured {endpoint}", "info")
|
|
except Exception:
|
|
pass
|
|
except Exception:
|
|
pass
|
|
|
|
page.on("response", _capture_api_response)
|
|
|
|
# Navigate to FastDL
|
|
self.log(f"Navigating to FastDL...")
|
|
page.goto(self.fastdl_url, wait_until="domcontentloaded", timeout=60000)
|
|
page.wait_for_timeout(2000)
|
|
|
|
# Enter username
|
|
input_field = page.locator("input[type='text']").first
|
|
if not input_field or not input_field.is_visible():
|
|
self.log("Could not find input field", "error")
|
|
return 0
|
|
|
|
self.log(f"Entering username: {self.username}")
|
|
input_field.fill(self.username)
|
|
page.wait_for_timeout(500)
|
|
|
|
# Click download button
|
|
download_button = page.locator("button:has-text('Download')").first
|
|
if download_button and download_button.is_visible():
|
|
download_button.click(force=True)
|
|
self.log("Loading profile...")
|
|
|
|
# Wait for the profile loading message to disappear
|
|
try:
|
|
self.log("Waiting for profile to load...")
|
|
# Wait for the loader component to disappear (max 30 seconds)
|
|
page.wait_for_selector(".loader-component", timeout=30000, state="detached")
|
|
self.log("Profile loading complete")
|
|
except PlaywrightTimeout:
|
|
self.log("Profile still loading after 30s, continuing anyway...", "warning")
|
|
except Exception as e:
|
|
self.log(f"Error waiting for loader: {e}", "debug")
|
|
|
|
# Additional wait for tabs to render
|
|
page.wait_for_timeout(2000)
|
|
|
|
# Wait for tabs to exist in DOM (they'll become actionable when clicked)
|
|
try:
|
|
page.wait_for_selector("ul.tabs-component", timeout=5000, state="attached")
|
|
tabs_count = page.locator("button.tabs-component__button").count()
|
|
if tabs_count > 0:
|
|
self.log(f"Profile loaded successfully - found {tabs_count} tabs")
|
|
else:
|
|
self.log("Tabs container found but no buttons, waiting...", "warning")
|
|
page.wait_for_timeout(5000)
|
|
except PlaywrightTimeout:
|
|
self.log("Tabs container not found after 5s, continuing anyway...", "warning")
|
|
except Exception as e:
|
|
self.log(f"Error checking tabs: {e}", "warning")
|
|
|
|
# Dismiss consent dialog
|
|
self._dismiss_consent_dialog(page)
|
|
|
|
# Navigate to content tab (this also triggers the API call for that content type)
|
|
if self.content_type != "all":
|
|
self._navigate_to_content_tab(page)
|
|
|
|
# Scroll to load all paginated posts/reels within date range
|
|
if self.content_type in ('posts', 'reels'):
|
|
self._scroll_to_load_api_posts(page, api_responses)
|
|
|
|
# Try API-based download first (much faster — no scrolling/DOM needed)
|
|
# postsV2 is already captured from initial profile load + scrolling
|
|
# stories/reels are captured when we click their tab above
|
|
# Skip API for posts with high_res — need browser to access Instagram directly
|
|
api_result = -1
|
|
use_api = self.content_type in ('stories', 'posts', 'reels') and api_responses
|
|
if use_api and self.high_res and self.content_type == 'posts':
|
|
self.log("High-res mode enabled for posts, trying /api/convert approach", "info")
|
|
api_convert_result = self._download_highres_via_api_convert(page, api_responses)
|
|
if api_convert_result < 0:
|
|
# postsV2 data missing, fall back to browser-based high-res
|
|
self.log("Falling back to browser-based high-res download", "info")
|
|
else:
|
|
api_result = api_convert_result
|
|
use_api = False # Don't also run normal API download for posts
|
|
if use_api:
|
|
api_result = self._download_from_api(api_responses)
|
|
|
|
if api_result >= 0:
|
|
self.log(f"API-based download complete: {api_result} items")
|
|
success_count = api_result
|
|
else:
|
|
if api_responses and self.content_type in ('stories', 'posts', 'reels'):
|
|
self.log("API data not usable, falling back to browser-based download", "info")
|
|
# Build a pk lookup map from API responses so the browser
|
|
# fallback can still tag downloads with the Instagram pk.
|
|
self._cdn_to_pk_map = {}
|
|
if self.content_type == 'stories' and api_responses:
|
|
self._build_pk_map_from_api(api_responses)
|
|
success_count = self._download_content(page)
|
|
|
|
# Stop API interception
|
|
page.remove_listener("response", _capture_api_response)
|
|
|
|
except Exception as e:
|
|
self.log(f"Error: {e}", "error")
|
|
finally:
|
|
try:
|
|
context.close()
|
|
self.log("Browser context closed", "debug")
|
|
except Exception:
|
|
pass
|
|
try:
|
|
browser.close()
|
|
self.log("Browser closed", "debug")
|
|
except Exception:
|
|
pass
|
|
|
|
return success_count
|
|
|
|
def _navigate_to_content_tab(self, page):
|
|
"""Navigate to specific content type tab"""
|
|
# All tabs are lowercase on FastDL
|
|
tab_map = {
|
|
"stories": "stories",
|
|
"posts": "posts",
|
|
"reels": "reels",
|
|
"highlights": "highlights"
|
|
}
|
|
|
|
if self.content_type in tab_map:
|
|
tab_name = tab_map[self.content_type]
|
|
# Use the tabs-component__button selector
|
|
tab_selector = f"button.tabs-component__button:has-text('{tab_name}')"
|
|
|
|
try:
|
|
# Wait for the specific tab to exist in DOM
|
|
page.wait_for_selector(tab_selector, timeout=5000, state="attached")
|
|
|
|
# Get the tab element
|
|
tab = page.locator(tab_selector).first
|
|
|
|
# Dismiss consent overlay before clicking tab
|
|
self._dismiss_consent_dialog(page)
|
|
|
|
# Use dispatch_event to fire a DOM click event directly on the element
|
|
# force=True only dispatches mouse events at coordinates which Vue.js doesn't register
|
|
self.log(f"Clicking {tab_name} tab")
|
|
tab.dispatch_event('click')
|
|
page.wait_for_timeout(2000)
|
|
|
|
# Verify tab switched by checking for active class
|
|
is_active = tab.evaluate("el => el.classList.contains('tabs-component__button--active')")
|
|
if not is_active:
|
|
self.log(f"dispatch_event didn't activate tab, trying JS click", "debug")
|
|
tab.evaluate("el => el.click()")
|
|
page.wait_for_timeout(2000)
|
|
is_active = tab.evaluate("el => el.classList.contains('tabs-component__button--active')")
|
|
if not is_active:
|
|
self.log(f"JS click also failed to activate {tab_name} tab", "warning")
|
|
else:
|
|
self.log(f"JS click activated {tab_name} tab", "debug")
|
|
else:
|
|
self.log(f"{tab_name} tab is now active", "debug")
|
|
|
|
# Wait for tab content to load
|
|
page.wait_for_timeout(3000)
|
|
|
|
except PlaywrightTimeout:
|
|
self.log(f"Timeout waiting for {tab_name} tab to become clickable", "warning")
|
|
except Exception as e:
|
|
self.log(f"Could not click {tab_name} tab: {e}", "warning")
|
|
|
|
def _extract_shortcodes_from_json(self, data, shortcodes_list):
|
|
"""Recursively extract Instagram shortcodes from JSON data
|
|
|
|
Args:
|
|
data: JSON data (dict, list, or primitive)
|
|
shortcodes_list: List to append found shortcodes to
|
|
"""
|
|
if isinstance(data, dict):
|
|
# Check for common keys that might contain shortcodes
|
|
for key in ['shortcode', 'code', 'post_id', 'media_id', 'id', 'pk', 'shortCode']:
|
|
if key in data:
|
|
value = data[key]
|
|
if isinstance(value, str) and len(value) == 11:
|
|
# Validate it looks like a shortcode
|
|
instagram_alphabet = set('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_')
|
|
if set(value).issubset(instagram_alphabet):
|
|
shortcodes_list.append(value)
|
|
|
|
# Recursively check all values
|
|
for value in data.values():
|
|
self._extract_shortcodes_from_json(value, shortcodes_list)
|
|
|
|
elif isinstance(data, list):
|
|
# Recursively check all items
|
|
for item in data:
|
|
self._extract_shortcodes_from_json(item, shortcodes_list)
|
|
|
|
def _extract_instagram_url_from_item(self, item, page):
|
|
"""Extract Instagram post URL from a profile item by clicking on it
|
|
|
|
FastDL displays Instagram shortcodes when you click on a post thumbnail.
|
|
We'll click the item, extract the Instagram URL from the detail view,
|
|
then go back to the grid.
|
|
|
|
Args:
|
|
item: Profile media list item element
|
|
page: Playwright page object
|
|
|
|
Returns:
|
|
Instagram post URL string or None
|
|
"""
|
|
try:
|
|
# Method 1: Check for data attributes first (fast)
|
|
for attr in ['data-url', 'data-post-url', 'data-instagram-url', 'data-shortcode']:
|
|
value = item.get_attribute(attr)
|
|
if value:
|
|
if 'instagram.com/p/' in value:
|
|
return value
|
|
# Check if it's just a shortcode
|
|
elif len(value) == 11 and value.replace('_', '').replace('-', '').isalnum():
|
|
return f"https://www.instagram.com/p/{value}/"
|
|
|
|
# Method 2: Check for Instagram links in the HTML
|
|
all_links = item.locator("a").all()
|
|
for link in all_links:
|
|
href = link.get_attribute("href")
|
|
if href and 'instagram.com/p/' in href:
|
|
return href
|
|
|
|
# Method 3: Click on the item to open detail view
|
|
# Find the clickable image or container
|
|
clickable = item.locator("img.media-content__image").first
|
|
if not clickable or not clickable.is_visible():
|
|
# Try finding any clickable element in the item
|
|
clickable = item.locator("a, button, .media-content__image").first
|
|
|
|
if clickable and clickable.is_visible():
|
|
self.log("Clicking item to extract Instagram URL...", "debug")
|
|
|
|
# Store current URL to know if we navigated
|
|
current_url = page.url
|
|
|
|
# Click the item
|
|
clickable.click(force=True)
|
|
page.wait_for_timeout(2000) # Wait for detail view to load
|
|
|
|
# Look for Instagram URL in the detail view
|
|
# Check page source for Instagram URLs
|
|
page_content = page.content()
|
|
|
|
# Look for instagram.com/p/ URLs in the HTML
|
|
import re
|
|
instagram_pattern = r'https?://(?:www\.)?instagram\.com/p/([A-Za-z0-9_-]{11})'
|
|
matches = re.findall(instagram_pattern, page_content)
|
|
|
|
if matches:
|
|
instagram_url = f"https://www.instagram.com/p/{matches[0]}/"
|
|
self.log(f"Found Instagram URL in detail view: {instagram_url}", "debug")
|
|
|
|
# Go back to grid view
|
|
page.go_back()
|
|
page.wait_for_timeout(1000)
|
|
|
|
return instagram_url
|
|
|
|
# If we didn't find anything, go back
|
|
if page.url != current_url:
|
|
page.go_back()
|
|
page.wait_for_timeout(1000)
|
|
|
|
except Exception as e:
|
|
self.log(f"Error extracting Instagram URL: {e}", "debug")
|
|
# Try to go back if we're stuck
|
|
try:
|
|
page.go_back()
|
|
page.wait_for_timeout(500)
|
|
except Exception:
|
|
pass
|
|
|
|
return None
|
|
|
|
def _download_content_highres(self, page):
|
|
"""Download content in high-resolution mode by searching individual Instagram URLs"""
|
|
success_count = 0
|
|
|
|
# STEP 0: Try to intercept API responses to find shortcodes
|
|
api_shortcodes = []
|
|
|
|
def handle_response(response):
|
|
"""Intercept API responses to extract shortcodes"""
|
|
try:
|
|
# Check if this is a FastDL API response
|
|
if 'fastdl.app' in response.url and response.status == 200:
|
|
content_type = response.headers.get('content-type', '')
|
|
if 'json' in content_type:
|
|
try:
|
|
data = response.json()
|
|
# Look for shortcodes in the JSON response
|
|
self._extract_shortcodes_from_json(data, api_shortcodes)
|
|
except Exception:
|
|
pass
|
|
except Exception as e:
|
|
self.log(f"Error intercepting response: {e}", "debug")
|
|
|
|
# Start listening to responses
|
|
page.on("response", handle_response)
|
|
|
|
# STEP 1: Scroll to load ALL content from the profile
|
|
self.log(f"Loading all {self.content_type} from profile...")
|
|
self._scroll_to_load_content(page)
|
|
|
|
# Stop listening
|
|
page.remove_listener("response", handle_response)
|
|
|
|
if api_shortcodes:
|
|
self.log(f"Extracted {len(api_shortcodes)} shortcodes from API responses!")
|
|
else:
|
|
self.log("No shortcodes found in API responses", "debug")
|
|
|
|
# STEP 1.5: Try to extract all Instagram shortcodes from page source first (faster)
|
|
self.log("Checking page source for Instagram URLs and shortcodes...")
|
|
page_content = page.content()
|
|
|
|
# Method 1: Look for full Instagram URLs (most reliable)
|
|
instagram_pattern = r'https?://(?:www\.)?instagram\.com/p/([A-Za-z0-9_-]{11})'
|
|
instagram_urls_found = re.findall(instagram_pattern, page_content)
|
|
|
|
# Method 2: Look for shortcodes in specific contexts only
|
|
# Look in data attributes that explicitly mention shortcode/post/media
|
|
data_attr_pattern = r'data-(?:shortcode|post-id|media-id|code)=["\']([A-Za-z0-9_-]{11})["\']'
|
|
data_attr_shortcodes = re.findall(data_attr_pattern, page_content, re.IGNORECASE)
|
|
|
|
# Method 3: Look in JavaScript objects with explicit keys
|
|
js_pattern = r'["\']?(?:shortcode|code|post_id|media_id)["\']?\s*[:=]\s*["\']([A-Za-z0-9_-]{11})["\']'
|
|
js_shortcodes = re.findall(js_pattern, page_content, re.IGNORECASE)
|
|
|
|
# Combine initial findings
|
|
potential_shortcodes = list(set(instagram_urls_found + data_attr_shortcodes + js_shortcodes))
|
|
|
|
# Filter out common false positives (HTML attributes, common words)
|
|
blacklist = {
|
|
'crossorigin', 'placeholder', 'description', 'attribution',
|
|
'information', 'application', 'xsrfcookie', 'performance',
|
|
'credentials', 'stylesheets', 'stylesheet_', 'javascript',
|
|
'touchstart', 'touchcancel', 'transparent', 'comfortable'
|
|
}
|
|
|
|
# Additional validation: Instagram shortcodes typically have mixed case
|
|
# and often contain numbers, underscores, or hyphens
|
|
def is_valid_shortcode(sc):
|
|
sc_lower = sc.lower()
|
|
# Reject if in blacklist
|
|
if sc_lower in blacklist:
|
|
return False
|
|
# Reject if all lowercase letters (likely a word)
|
|
if sc.islower() and sc.isalpha():
|
|
return False
|
|
# Reject if starts with common prefixes
|
|
if sc_lower.startswith(('data', 'http', 'www', 'src', 'href')):
|
|
return False
|
|
# Must use Instagram's alphabet only
|
|
instagram_alphabet = set('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_')
|
|
if not set(sc).issubset(instagram_alphabet):
|
|
return False
|
|
# Should have at least one uppercase OR number OR special char
|
|
if not any(c.isupper() or c.isdigit() or c in '-_' for c in sc):
|
|
return False
|
|
return True
|
|
|
|
valid_shortcodes = [sc for sc in potential_shortcodes if is_valid_shortcode(sc)]
|
|
|
|
# Also validate API shortcodes
|
|
api_shortcodes = [sc for sc in api_shortcodes if is_valid_shortcode(sc)]
|
|
|
|
# Combine all found shortcodes (from API responses, page source, JS)
|
|
all_shortcodes = set(api_shortcodes + valid_shortcodes)
|
|
|
|
if all_shortcodes:
|
|
self.log(f"Found {len(all_shortcodes)} valid Instagram shortcodes")
|
|
if api_shortcodes:
|
|
self.log(f" - {len(api_shortcodes)} from API responses")
|
|
if valid_shortcodes:
|
|
self.log(f" - {len(valid_shortcodes)} from page source/HTML")
|
|
|
|
instagram_urls_set = set(f"https://www.instagram.com/p/{shortcode}/" for shortcode in all_shortcodes)
|
|
|
|
# Log a few examples for verification
|
|
examples = list(all_shortcodes)[:5]
|
|
self.log(f"Example shortcodes: {', '.join(examples)}")
|
|
else:
|
|
instagram_urls_set = set()
|
|
self.log("No Instagram shortcodes found - high-res mode will not work", "warning")
|
|
|
|
# STEP 2: Collect all items and look for Instagram URLs or shortcodes
|
|
self.log("Extracting Instagram post URLs from profile content...")
|
|
all_media_items = page.locator("li.profile-media-list__item").all()
|
|
|
|
# Build mapping of Instagram URLs to their metadata
|
|
media_items_data = []
|
|
|
|
# If we found Instagram URLs in page source and count matches items,
|
|
# assume they're in order (much faster than clicking each item)
|
|
use_page_source_urls = len(instagram_urls_set) > 0 and len(instagram_urls_set) >= len(all_media_items) * 0.8
|
|
|
|
if use_page_source_urls:
|
|
self.log(f"Using Instagram URLs from page source (found {len(instagram_urls_set)}, items {len(all_media_items)})")
|
|
instagram_urls_list = list(instagram_urls_set)
|
|
|
|
for idx, item in enumerate(all_media_items):
|
|
# Get the date for this item
|
|
post_date = None
|
|
date_str = None
|
|
time_elem = item.locator("p.media-content__meta-time").first
|
|
if time_elem and time_elem.is_visible():
|
|
date_str = time_elem.get_attribute("title")
|
|
if date_str:
|
|
try:
|
|
post_date = datetime.strptime(date_str, "%m/%d/%Y, %I:%M:%S %p")
|
|
except Exception:
|
|
pass
|
|
|
|
# Try to get Instagram URL
|
|
instagram_url = None
|
|
|
|
# First, try to use pre-extracted URLs from page source (if available)
|
|
if use_page_source_urls and idx < len(instagram_urls_list):
|
|
instagram_url = instagram_urls_list[idx]
|
|
self.log(f"Item {idx+1}: Using URL from page source: {instagram_url}", "debug")
|
|
else:
|
|
# Otherwise, extract from the item itself (may click on it)
|
|
instagram_url = self._extract_instagram_url_from_item(item, page)
|
|
|
|
if instagram_url:
|
|
media_items_data.append({
|
|
'instagram_url': instagram_url,
|
|
'post_date': post_date,
|
|
'date_str': date_str
|
|
})
|
|
else:
|
|
# Fallback: Try to extract media ID and convert (unreliable)
|
|
self.log(f"Item {idx+1}: No Instagram URL found, trying CDN media ID (unreliable)", "warning")
|
|
item_links = item.locator("a[href*='.jpg'], a[href*='.mp4']").all()
|
|
for link in item_links:
|
|
href = link.get_attribute("href")
|
|
if href:
|
|
# Extract media IDs from this URL
|
|
media_ids = self._extract_media_ids_from_fastdl_url(href)
|
|
for media_id in media_ids:
|
|
media_items_data.append({
|
|
'media_id': media_id,
|
|
'instagram_url': None,
|
|
'post_date': post_date,
|
|
'date_str': date_str
|
|
})
|
|
break # Only process first link per item
|
|
|
|
if not media_items_data:
|
|
self.log("No Instagram post URLs or media IDs found in profile content", "warning")
|
|
self.log("", "info")
|
|
self.log("╔═══════════════════════════════════════════════════════════════════════╗", "warning")
|
|
self.log("║ HIGH-RES MODE FAILED: FastDL doesn't expose Instagram shortcodes ║", "warning")
|
|
self.log("║ ║", "warning")
|
|
self.log("║ Recommendation: Disable high_res mode in settings.json for FastDL ║", "warning")
|
|
self.log("║ Regular FastDL downloads are already good quality (640x640 or better)║", "warning")
|
|
self.log("╚═══════════════════════════════════════════════════════════════════════╝", "warning")
|
|
return 0
|
|
|
|
self.log(f"Found {len(media_items_data)} media items to download in high-res")
|
|
|
|
# Apply max_downloads limit
|
|
if self.max_downloads:
|
|
media_items_data = media_items_data[:self.max_downloads]
|
|
self.log(f"Limited to {len(media_items_data)} items")
|
|
|
|
# Set initial progress so dashboard shows 0/N immediately
|
|
self.activity_manager.update_status(
|
|
f"Downloading {self.content_type}",
|
|
progress_current=0,
|
|
progress_total=len(media_items_data)
|
|
)
|
|
|
|
# STEP 3: For each item, get Instagram URL and search on FastDL
|
|
consecutive_old_posts = 0
|
|
|
|
for i, item_data in enumerate(media_items_data, 1):
|
|
# Update progress at start of each iteration (fires even on skips)
|
|
self.activity_manager.update_status(
|
|
f"Downloading {self.content_type}",
|
|
progress_current=i,
|
|
progress_total=len(media_items_data)
|
|
)
|
|
|
|
instagram_url = item_data.get('instagram_url')
|
|
media_id = item_data.get('media_id')
|
|
post_date = item_data['post_date']
|
|
|
|
# Extract media ID for tracking
|
|
if instagram_url:
|
|
# Extract shortcode from Instagram URL for tracking
|
|
# URL format: https://www.instagram.com/p/SHORTCODE/
|
|
shortcode_match = re.search(r'/p/([A-Za-z0-9_-]+)', instagram_url)
|
|
if shortcode_match:
|
|
tracking_id = shortcode_match.group(1)
|
|
else:
|
|
tracking_id = instagram_url # Use full URL as fallback
|
|
elif media_id:
|
|
tracking_id = media_id
|
|
else:
|
|
self.log(f"[{i}/{len(media_items_data)}] No Instagram URL or media ID found, skipping")
|
|
continue
|
|
|
|
# Check if already downloaded - check both original and normalized media ID
|
|
normalized_tracking_id = extract_instagram_media_id(tracking_id)
|
|
if tracking_id in self.downloaded_files or normalized_tracking_id in self.downloaded_files:
|
|
self.log(f"[{i}/{len(media_items_data)}] Skipping duplicate (session): {tracking_id}")
|
|
continue
|
|
|
|
if self._is_already_downloaded(tracking_id) or (normalized_tracking_id != tracking_id and self._is_already_downloaded(normalized_tracking_id)):
|
|
self.log(f"[{i}/{len(media_items_data)}] Skipping duplicate (database): {tracking_id}")
|
|
self.downloaded_files.add(tracking_id)
|
|
self.downloaded_files.add(normalized_tracking_id)
|
|
continue
|
|
|
|
# Check date filtering
|
|
if post_date and (self.date_from or self.date_to):
|
|
if self.date_from and post_date < self.date_from:
|
|
self.log(f"[{i}/{len(media_items_data)}] Skipping - too old: {post_date.strftime('%Y-%m-%d')}")
|
|
# Record as checked so we don't check again
|
|
self._record_checked(tracking_id, self.profile_name, self.content_type,
|
|
reason="old_post", post_date=post_date)
|
|
consecutive_old_posts += 1
|
|
if consecutive_old_posts >= 5:
|
|
self.log("Reached old posts, stopping...")
|
|
break
|
|
continue
|
|
|
|
if self.date_to and post_date > self.date_to:
|
|
self.log(f"[{i}/{len(media_items_data)}] Skipping - too new: {post_date.strftime('%Y-%m-%d')}")
|
|
# Record as checked so we don't check again
|
|
self._record_checked(tracking_id, self.profile_name, self.content_type,
|
|
reason="too_new", post_date=post_date)
|
|
continue
|
|
|
|
consecutive_old_posts = 0
|
|
|
|
# Get Instagram URL - either directly or by converting media ID
|
|
if not instagram_url:
|
|
# Fallback: Try to convert media ID to Instagram shortcode
|
|
try:
|
|
shortcode = self._media_id_to_shortcode(media_id)
|
|
instagram_url = f"https://www.instagram.com/p/{shortcode}/"
|
|
self.log(f"[{i}/{len(media_items_data)}] Converting media ID {media_id} → {shortcode}", "warning")
|
|
self.log(f"[{i}/{len(media_items_data)}] NOTE: This conversion may not be accurate", "warning")
|
|
except Exception as e:
|
|
self.log(f"[{i}/{len(media_items_data)}] Error converting media ID {media_id}: {e}", "error")
|
|
continue
|
|
else:
|
|
self.log(f"[{i}/{len(media_items_data)}] Using Instagram URL: {instagram_url}")
|
|
|
|
# Search for this Instagram URL on FastDL to get high-res links
|
|
high_res_links = self._search_instagram_url_on_fastdl(page, instagram_url)
|
|
|
|
if not high_res_links:
|
|
self.log(f"[{i}/{len(media_items_data)}] No high-res links found for {instagram_url}", "warning")
|
|
continue
|
|
|
|
# Check for phrase matching on high-res page (if configured)
|
|
if self.phrase_config and self.phrase_config.get('enabled'):
|
|
# Extract caption from the high-res detail page
|
|
caption_text = ""
|
|
try:
|
|
# Try multiple caption selectors on the high-res page
|
|
caption_selectors = [
|
|
'div.output-list__caption p',
|
|
'.output-list__caption',
|
|
'div.output-list__caption',
|
|
'.media-content__caption',
|
|
'p.media-content__caption'
|
|
]
|
|
|
|
for selector in caption_selectors:
|
|
try:
|
|
caption_elem = page.locator(selector).first
|
|
if caption_elem and caption_elem.is_visible():
|
|
text = caption_elem.text_content() or ""
|
|
if text:
|
|
caption_text = text
|
|
break
|
|
except Exception:
|
|
continue
|
|
|
|
if caption_text:
|
|
# Clean up text
|
|
caption_text = ' '.join(caption_text.split())
|
|
|
|
phrases = self.phrase_config.get('phrases', [])
|
|
if phrases:
|
|
case_sensitive = self.phrase_config.get('case_sensitive', False)
|
|
match_all = self.phrase_config.get('match_all', False)
|
|
|
|
if not case_sensitive:
|
|
caption_text = caption_text.lower()
|
|
phrases = [p.lower() for p in phrases]
|
|
|
|
matches = []
|
|
for phrase in phrases:
|
|
if phrase in caption_text:
|
|
matches.append(phrase)
|
|
|
|
if match_all:
|
|
result = len(matches) == len(phrases)
|
|
else:
|
|
result = len(matches) > 0
|
|
|
|
if not result:
|
|
self.log(f"[{i}/{len(media_items_data)}] Post doesn't match phrase criteria, skipping", "info")
|
|
# Record as checked so we don't check again
|
|
self._record_checked(tracking_id, self.profile_name, self.content_type,
|
|
reason="phrase_checked", post_date=post_date)
|
|
continue
|
|
else:
|
|
self.log(f"[{i}/{len(media_items_data)}] Post matches phrase criteria ({len(matches)}/{len(phrases)} phrases found)", "info")
|
|
else:
|
|
self.log(f"[{i}/{len(media_items_data)}] No caption found on high-res page, downloading anyway", "debug")
|
|
|
|
except Exception as e:
|
|
self.log(f"Error checking phrases on high-res page: {e}", "warning")
|
|
# On error, proceed with download to avoid false negatives
|
|
|
|
# Download each high-res link
|
|
for link_idx, (download_url, ext, is_high_res) in enumerate(high_res_links):
|
|
try:
|
|
# Create clickable element or use direct download
|
|
# For now, we'll try to find and click the download link
|
|
download_link = page.locator(f"a[href='{download_url}']").first
|
|
|
|
if not download_link or not download_link.is_visible():
|
|
self.log(f"Could not find clickable link for high-res download", "debug")
|
|
continue
|
|
|
|
# Download the file
|
|
profile = self.profile_name or "unknown"
|
|
if post_date:
|
|
date_str_formatted = post_date.strftime('%Y%m%d_%H%M%S')
|
|
else:
|
|
date_str_formatted = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
|
|
if len(high_res_links) > 1:
|
|
new_filename = f"{profile}_{date_str_formatted}_{tracking_id}_{link_idx+1}{ext}"
|
|
else:
|
|
new_filename = f"{profile}_{date_str_formatted}_{tracking_id}{ext}"
|
|
|
|
filepath = self.output_dir / new_filename
|
|
self.output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
try:
|
|
with page.expect_download(timeout=30000) as download_info:
|
|
download_link.click(force=True)
|
|
download = download_info.value
|
|
download.save_as(filepath)
|
|
except Exception:
|
|
self.log(f"Browser download failed, trying direct HTTP download", "debug")
|
|
resp = requests.get(download_url, timeout=60, stream=True)
|
|
resp.raise_for_status()
|
|
with open(filepath, 'wb') as f:
|
|
for chunk in resp.iter_content(chunk_size=8192):
|
|
f.write(chunk)
|
|
|
|
# Check for duplicate hash before recording (hash blacklist persists even if original deleted)
|
|
file_hash = self.db.get_file_hash(str(filepath)) if self.db else None
|
|
if file_hash:
|
|
existing = self.db.get_download_by_file_hash(file_hash)
|
|
if existing and existing.get('file_path') and str(filepath) != existing.get('file_path'):
|
|
# Duplicate hash found - content was already downloaded (prevents redownload of deleted content)
|
|
self.log(f"⚠ Duplicate content detected (hash match): {filepath.name} matches {existing['filename']} from {existing['platform']}/{existing['source']}", "warning")
|
|
# Delete the duplicate regardless of whether original file still exists
|
|
try:
|
|
filepath.unlink()
|
|
self.log(f"Deleted duplicate (hash blacklist): {filepath.name}", "debug")
|
|
continue
|
|
except Exception as e:
|
|
self.log(f"Failed to delete duplicate {filepath.name}: {e}", "warning")
|
|
|
|
# Update timestamps
|
|
if post_date:
|
|
self._update_all_timestamps(filepath, post_date)
|
|
self.log(f"✓ [{i}/{len(media_items_data)}] Saved high-res: {filepath.name} (dated: {post_date.strftime('%Y-%m-%d %H:%M')})", "success")
|
|
else:
|
|
self.log(f"✓ [{i}/{len(media_items_data)}] Saved high-res: {filepath.name}", "success")
|
|
|
|
# Record in database with normalized media_id for cross-module detection
|
|
self._record_download(
|
|
media_id=normalized_tracking_id,
|
|
username=self.profile_name,
|
|
content_type=self.content_type,
|
|
filename=str(filepath),
|
|
download_url=download_url,
|
|
post_date=post_date,
|
|
metadata={'high_res': True, 'instagram_url': instagram_url},
|
|
deferred=self.defer_database
|
|
)
|
|
|
|
self.downloaded_files.add(tracking_id)
|
|
self.downloaded_files.add(normalized_tracking_id)
|
|
success_count += 1
|
|
|
|
# Smart delay between downloads
|
|
self._smart_delay()
|
|
|
|
except Exception as e:
|
|
self.log(f"Error downloading high-res file: {e}", "error")
|
|
continue
|
|
|
|
return success_count
|
|
|
|
def _download_highres_via_api_convert(self, page, api_responses):
|
|
"""Download high-res posts using /api/convert endpoint instead of browser scraping.
|
|
|
|
Uses postsV2 data (already captured from profile load) to get shortcodes,
|
|
then triggers /api/convert for each post to get high-res download URLs.
|
|
|
|
Args:
|
|
page: Playwright page object
|
|
api_responses: List of captured API responses from profile load
|
|
|
|
Returns:
|
|
Number of files downloaded, or -1 if postsV2 data not available (triggers fallback).
|
|
"""
|
|
# Find postsV2 data from captured API responses (merge all pages)
|
|
posts_data = None
|
|
for resp in api_responses:
|
|
url = resp.get('url', '')
|
|
body = resp.get('body', {})
|
|
if not isinstance(body, dict) or 'result' not in body:
|
|
continue
|
|
if '/postsV2' in url:
|
|
result = body['result']
|
|
if isinstance(result, dict) and 'edges' in result:
|
|
if posts_data is None:
|
|
posts_data = result
|
|
else:
|
|
more_edges = result.get('edges', [])
|
|
if more_edges:
|
|
posts_data['edges'].extend(more_edges)
|
|
|
|
if posts_data is None:
|
|
self.log("No postsV2 data found in API responses, cannot use /api/convert", "warning")
|
|
return -1 # Signal caller to fall back to browser-based high-res
|
|
|
|
edges = posts_data.get('edges', [])
|
|
if not edges:
|
|
self.log("postsV2 has no edges")
|
|
return 0
|
|
|
|
# Extract shortcodes + dates from postsV2
|
|
post_entries = []
|
|
for edge in edges:
|
|
node = edge.get('node', edge)
|
|
shortcode = node.get('shortcode', '')
|
|
if not shortcode:
|
|
continue
|
|
taken_at = node.get('taken_at_timestamp') or node.get('taken_at', 0)
|
|
post_date = datetime.fromtimestamp(taken_at) if taken_at else None
|
|
post_entries.append({
|
|
'shortcode': shortcode,
|
|
'post_date': post_date,
|
|
'post_id': str(node.get('id', '')),
|
|
})
|
|
|
|
self.log(f"Found {len(post_entries)} posts from postsV2 for high-res /api/convert download")
|
|
|
|
# Filter: dedup (session + DB), date range, max_downloads
|
|
filtered_entries = []
|
|
consecutive_old = 0
|
|
for entry in post_entries:
|
|
shortcode = entry['shortcode']
|
|
post_date = entry['post_date']
|
|
|
|
# Session dedup
|
|
if shortcode in self.downloaded_files:
|
|
continue
|
|
|
|
# Database dedup
|
|
if self._is_already_downloaded(shortcode):
|
|
self.downloaded_files.add(shortcode)
|
|
continue
|
|
|
|
# Date filtering
|
|
if post_date and (self.date_from or self.date_to):
|
|
if self.date_from and post_date < self.date_from:
|
|
self.log(f"Skipping old post: {shortcode} ({post_date.strftime('%Y-%m-%d')})")
|
|
self._record_checked(shortcode, self.profile_name, self.content_type,
|
|
reason="old_post", post_date=post_date)
|
|
consecutive_old += 1
|
|
if consecutive_old >= 5:
|
|
self.log("Reached old posts, stopping")
|
|
break
|
|
continue
|
|
if self.date_to and post_date > self.date_to:
|
|
self.log(f"Skipping future post: {shortcode} ({post_date.strftime('%Y-%m-%d')})")
|
|
continue
|
|
consecutive_old = 0
|
|
|
|
filtered_entries.append(entry)
|
|
|
|
# Apply max_downloads limit
|
|
if self.max_downloads and len(filtered_entries) > self.max_downloads:
|
|
filtered_entries = filtered_entries[:self.max_downloads]
|
|
self.log(f"Limiting to {self.max_downloads} posts")
|
|
|
|
if not filtered_entries:
|
|
self.log("No new posts to download after filtering")
|
|
return 0
|
|
|
|
self.log(f"Processing {len(filtered_entries)} posts via /api/convert for high-res download...")
|
|
|
|
# Set initial progress
|
|
self.activity_manager.update_status(
|
|
f"Downloading {self.content_type} (high-res)",
|
|
progress_current=0,
|
|
progress_total=len(filtered_entries)
|
|
)
|
|
|
|
# For each post: fetch via /api/convert, extract items, apply phrase matching
|
|
all_items = []
|
|
for i, entry in enumerate(filtered_entries, 1):
|
|
shortcode = entry['shortcode']
|
|
fallback_date = entry['post_date']
|
|
instagram_url = f"https://instagram.com/p/{shortcode}/"
|
|
|
|
self.activity_manager.update_status(
|
|
f"Fetching high-res post {i}/{len(filtered_entries)}",
|
|
progress_current=i,
|
|
progress_total=len(filtered_entries)
|
|
)
|
|
|
|
self.log(f"[{i}/{len(filtered_entries)}] Fetching /api/convert for {shortcode}...")
|
|
|
|
convert_data = self._fetch_highres_via_api_convert(page, instagram_url)
|
|
if not convert_data:
|
|
self.log(f"[{i}/{len(filtered_entries)}] No /api/convert data for {shortcode}, skipping", "warning")
|
|
continue
|
|
|
|
items = self._extract_highres_items_from_convert_response(convert_data, shortcode, fallback_date)
|
|
if not items:
|
|
self.log(f"[{i}/{len(filtered_entries)}] No downloadable items from /api/convert for {shortcode}", "warning")
|
|
continue
|
|
|
|
# Phrase matching using caption from /api/convert response (meta.title)
|
|
if self.phrase_config and self.phrase_config.get('enabled'):
|
|
caption = items[0].get('caption', '') if items else ''
|
|
if caption:
|
|
phrases = self.phrase_config.get('phrases', [])
|
|
if phrases:
|
|
case_sensitive = self.phrase_config.get('case_sensitive', False)
|
|
match_all = self.phrase_config.get('match_all', False)
|
|
|
|
check_text = caption if case_sensitive else caption.lower()
|
|
check_phrases = phrases if case_sensitive else [p.lower() for p in phrases]
|
|
|
|
matches = [p for p in check_phrases if p in check_text]
|
|
|
|
if match_all:
|
|
passed = len(matches) == len(check_phrases)
|
|
else:
|
|
passed = len(matches) > 0
|
|
|
|
if not passed:
|
|
self.log(f"[{i}/{len(filtered_entries)}] Post {shortcode} doesn't match phrase criteria, skipping")
|
|
self._record_checked(shortcode, self.profile_name, self.content_type,
|
|
reason="phrase_checked", post_date=fallback_date)
|
|
continue
|
|
else:
|
|
self.log(f"[{i}/{len(filtered_entries)}] Post matches phrases ({len(matches)}/{len(phrases)})")
|
|
else:
|
|
self.log(f"[{i}/{len(filtered_entries)}] No caption from /api/convert, downloading anyway", "debug")
|
|
|
|
# Dedup individual carousel items
|
|
new_for_post = 0
|
|
for item in items:
|
|
mid = item['media_id']
|
|
norm = item.get('normalized_media_id', mid)
|
|
if mid in self.downloaded_files or norm in self.downloaded_files:
|
|
continue
|
|
if self._is_already_downloaded(mid) or (norm != mid and self._is_already_downloaded(norm)):
|
|
self.downloaded_files.add(mid)
|
|
self.downloaded_files.add(norm)
|
|
continue
|
|
all_items.append(item)
|
|
new_for_post += 1
|
|
|
|
# Record shortcode as processed so next run skips the /api/convert fetch
|
|
if new_for_post == 0:
|
|
# All items already downloaded — record shortcode to avoid re-fetching
|
|
self.downloaded_files.add(shortcode)
|
|
self._record_checked(shortcode, self.profile_name, self.content_type,
|
|
reason="downloaded", post_date=fallback_date)
|
|
|
|
# Smart delay between posts (not between carousel items)
|
|
if i < len(filtered_entries):
|
|
self._smart_delay()
|
|
|
|
if not all_items:
|
|
self.log("No new high-res items to download after processing")
|
|
return 0
|
|
|
|
self.log(f"Downloading {len(all_items)} high-res items via parallel HTTP...")
|
|
count = self._download_items_parallel(all_items)
|
|
|
|
# Record all processed shortcodes so next run skips the /api/convert fetch
|
|
for entry in filtered_entries:
|
|
sc = entry['shortcode']
|
|
self.downloaded_files.add(sc)
|
|
self._record_checked(sc, self.profile_name, self.content_type,
|
|
reason="downloaded", post_date=entry.get('post_date'))
|
|
|
|
return count
|
|
|
|
def _download_from_api(self, api_responses):
|
|
"""Download content directly from intercepted API responses (no browser needed).
|
|
|
|
Returns:
|
|
Number of files downloaded, or -1 if API data not available for this content type.
|
|
"""
|
|
# Find the relevant API response(s) for our content type
|
|
api_data = None
|
|
for resp in api_responses:
|
|
url = resp.get('url', '')
|
|
body = resp.get('body', {})
|
|
if not isinstance(body, dict) or 'result' not in body:
|
|
continue
|
|
|
|
if self.content_type == 'stories' and '/stories' in url:
|
|
api_data = body['result']
|
|
break
|
|
elif self.content_type in ('posts', 'reels') and '/postsV2' in url:
|
|
result = body['result']
|
|
if api_data is None:
|
|
api_data = result
|
|
elif isinstance(api_data, dict) and 'edges' in api_data and isinstance(result, dict):
|
|
# Merge edges from additional paginated responses
|
|
more_edges = result.get('edges', [])
|
|
if more_edges:
|
|
api_data['edges'].extend(more_edges)
|
|
|
|
if api_data is None:
|
|
return -1 # No API data for this content type
|
|
|
|
# Extract download items based on content type
|
|
items = []
|
|
if self.content_type == 'stories':
|
|
if not isinstance(api_data, list):
|
|
return -1
|
|
items = self._extract_stories_from_api(api_data)
|
|
elif self.content_type in ('posts', 'reels'):
|
|
if not isinstance(api_data, dict) or 'edges' not in api_data:
|
|
return -1
|
|
items = self._extract_posts_from_api(api_data)
|
|
|
|
if not items:
|
|
self.log("No downloadable items found in API response")
|
|
return 0
|
|
|
|
self.log(f"Found {len(items)} items from API response")
|
|
|
|
# Filter out already-downloaded items
|
|
new_items = []
|
|
for item in items:
|
|
media_id = item['media_id']
|
|
normalized = item.get('normalized_media_id', media_id)
|
|
if media_id in self.downloaded_files or normalized in self.downloaded_files:
|
|
continue
|
|
if self._is_already_downloaded(media_id) or (normalized and normalized != media_id and self._is_already_downloaded(normalized)):
|
|
self.downloaded_files.add(media_id)
|
|
if normalized:
|
|
self.downloaded_files.add(normalized)
|
|
continue
|
|
new_items.append(item)
|
|
|
|
if not new_items:
|
|
self.log("All items already downloaded")
|
|
return 0
|
|
|
|
# Apply date filtering
|
|
filtered_items = []
|
|
consecutive_old = 0
|
|
for item in new_items:
|
|
post_date = item.get('post_date')
|
|
if post_date and (self.date_from or self.date_to):
|
|
if self.date_from and post_date < self.date_from:
|
|
self.log(f"Skipping old item: {post_date.strftime('%Y-%m-%d')}")
|
|
self._record_checked(item['media_id'], self.profile_name, self.content_type,
|
|
reason="old_post", post_date=post_date)
|
|
# Track shortcode so other content types don't re-check the same post
|
|
if item.get('shortcode'):
|
|
self.downloaded_files.add(item['shortcode'])
|
|
consecutive_old += 1
|
|
if self.content_type != 'stories' and consecutive_old >= 5:
|
|
self.log("Reached old posts, stopping")
|
|
break
|
|
continue
|
|
if self.date_to and post_date > self.date_to:
|
|
self.log(f"Skipping future item: {post_date.strftime('%Y-%m-%d')}")
|
|
continue
|
|
consecutive_old = 0
|
|
self.log(f"Item within date range: {post_date.strftime('%Y-%m-%d')}")
|
|
filtered_items.append(item)
|
|
|
|
# Apply max_downloads limit
|
|
if self.max_downloads and len(filtered_items) > self.max_downloads:
|
|
filtered_items = filtered_items[:self.max_downloads]
|
|
self.log(f"Limiting to {self.max_downloads} items")
|
|
|
|
if not filtered_items:
|
|
self.log("No items passed filtering")
|
|
return 0
|
|
|
|
self.log(f"Downloading {len(filtered_items)} items via API (parallel HTTP)...")
|
|
return self._download_items_parallel(filtered_items)
|
|
|
|
def _download_items_parallel(self, filtered_items):
|
|
"""Download items in parallel via HTTP with post-processing.
|
|
|
|
Items need: download_url, filename, media_id, normalized_media_id, post_date, ext
|
|
|
|
Returns: number of successfully downloaded files.
|
|
"""
|
|
if not filtered_items:
|
|
return 0
|
|
|
|
# Set initial progress
|
|
self.activity_manager.update_status(
|
|
f"Downloading {self.content_type}",
|
|
progress_current=0,
|
|
progress_total=len(filtered_items)
|
|
)
|
|
|
|
# Download all items in parallel via HTTP
|
|
self.output_dir.mkdir(parents=True, exist_ok=True)
|
|
success_count = 0
|
|
results = []
|
|
|
|
def _download_single(item):
|
|
"""Download a single file via HTTP with retry on server errors. Thread-safe."""
|
|
last_error = None
|
|
for attempt in range(3):
|
|
try:
|
|
resp = requests.get(item['download_url'], timeout=60, stream=True)
|
|
resp.raise_for_status()
|
|
filepath = self.output_dir / item['filename']
|
|
with open(filepath, 'wb') as f:
|
|
for chunk in resp.iter_content(chunk_size=8192):
|
|
f.write(chunk)
|
|
return {**item, 'filepath': filepath, 'success': True}
|
|
except requests.exceptions.HTTPError as e:
|
|
last_error = e
|
|
if resp.status_code >= 500 and attempt < 2:
|
|
time.sleep(2 * (attempt + 1))
|
|
continue
|
|
break
|
|
except Exception as e:
|
|
last_error = e
|
|
break
|
|
self.log(f"Download failed for {item['media_id']}: {last_error}", "warning")
|
|
return {**item, 'success': False, 'error': str(last_error)}
|
|
|
|
max_workers = min(4, len(filtered_items))
|
|
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
|
futures = {}
|
|
for idx, item in enumerate(filtered_items):
|
|
future = executor.submit(_download_single, item)
|
|
futures[future] = item
|
|
if idx < len(filtered_items) - 1:
|
|
time.sleep(0.2)
|
|
|
|
for future in as_completed(futures):
|
|
result = future.result()
|
|
if result.get('success'):
|
|
results.append(result)
|
|
self.activity_manager.update_status(
|
|
f"Downloading {self.content_type}",
|
|
progress_current=len(results),
|
|
progress_total=len(filtered_items)
|
|
)
|
|
|
|
# Post-process: timestamps, hash check, DB recording (sequential)
|
|
for result in results:
|
|
filepath = result['filepath']
|
|
media_id = result['media_id']
|
|
normalized = result.get('normalized_media_id', media_id)
|
|
post_date = result.get('post_date')
|
|
download_url = result.get('download_url', '')
|
|
|
|
# Hash duplicate check
|
|
file_hash = self.db.get_file_hash(str(filepath)) if self.db else None
|
|
if file_hash:
|
|
existing = self.db.get_download_by_file_hash(file_hash)
|
|
if existing and existing.get('file_path') and str(filepath) != existing.get('file_path'):
|
|
self.log(f"Duplicate detected: {filepath.name}", "warning")
|
|
try:
|
|
filepath.unlink()
|
|
continue
|
|
except Exception:
|
|
pass
|
|
|
|
# Update timestamps
|
|
if post_date:
|
|
self._update_all_timestamps(filepath, post_date)
|
|
self.log(f"Saved: {filepath.name} (dated: {post_date.strftime('%Y-%m-%d %H:%M')})")
|
|
else:
|
|
self.log(f"Saved: {filepath.name}")
|
|
|
|
# Record in database — include pk for stories so callers
|
|
# can use the stable Instagram primary key as story ID
|
|
meta = result.get('metadata') or {}
|
|
if result.get('pk'):
|
|
meta['pk'] = result['pk']
|
|
self._record_download(
|
|
media_id=normalized or media_id,
|
|
username=self.profile_name,
|
|
content_type=self.content_type,
|
|
filename=str(filepath),
|
|
download_url=download_url,
|
|
post_date=post_date,
|
|
metadata=meta or None,
|
|
deferred=self.defer_database
|
|
)
|
|
|
|
self.downloaded_files.add(media_id)
|
|
if normalized:
|
|
self.downloaded_files.add(normalized)
|
|
success_count += 1
|
|
|
|
return success_count
|
|
|
|
def _extract_media_id_from_cdn_url(self, url):
|
|
"""Extract Instagram media ID from a CDN URL path.
|
|
|
|
Instagram CDN URLs look like:
|
|
https://scontent-xxx.cdninstagram.com/.../643551919_18095277650490921_7199803193185481374_n.jpg?...
|
|
|
|
Returns the filename stem like '643551919_18095277650490921_7199803193185481374_n'
|
|
"""
|
|
if not url:
|
|
return None
|
|
try:
|
|
# Parse the URL path, get the last segment before query params
|
|
path = urllib.parse.urlparse(url).path
|
|
filename = Path(path).stem # filename without extension
|
|
# Validate it looks like an Instagram media filename (contains underscores and digits)
|
|
if filename and '_' in filename and any(c.isdigit() for c in filename):
|
|
return filename
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
def _build_pk_map_from_api(self, api_responses):
|
|
"""Build a CDN-filename-to-pk map from captured API responses.
|
|
|
|
When the API-based download fails and we fall back to browser scraping,
|
|
we still need the pk for each story so callers can use stable IDs.
|
|
This extracts pk from the raw API data and maps it by CDN filename.
|
|
"""
|
|
for resp in api_responses:
|
|
url = resp.get('url', '')
|
|
body = resp.get('body', {})
|
|
if not isinstance(body, dict) or 'result' not in body:
|
|
continue
|
|
if '/stories' not in url:
|
|
continue
|
|
result = body['result']
|
|
if not isinstance(result, list):
|
|
continue
|
|
for story in result:
|
|
pk = str(story.get('pk', ''))
|
|
if not pk:
|
|
continue
|
|
# Map CDN filenames from all video/image versions to this pk
|
|
for vv in story.get('video_versions', []):
|
|
cdn_url = vv.get('url', '')
|
|
fname = self._extract_media_id_from_cdn_url(cdn_url)
|
|
if fname:
|
|
self._cdn_to_pk_map[fname] = pk
|
|
for cand in story.get('image_versions2', {}).get('candidates', []):
|
|
cdn_url = cand.get('url', '')
|
|
fname = self._extract_media_id_from_cdn_url(cdn_url)
|
|
if fname:
|
|
self._cdn_to_pk_map[fname] = pk
|
|
if self._cdn_to_pk_map:
|
|
self.log(f"Built pk map for {len(self._cdn_to_pk_map)} CDN filenames from API data", "debug")
|
|
|
|
def _extract_stories_from_api(self, stories_data):
|
|
"""Extract download items from stories API response."""
|
|
items = []
|
|
profile = self.profile_name or "unknown"
|
|
|
|
for story in stories_data:
|
|
try:
|
|
pk = str(story.get('pk', ''))
|
|
taken_at = story.get('taken_at', 0)
|
|
post_date = datetime.fromtimestamp(taken_at) if taken_at else None
|
|
|
|
# Determine if video or image
|
|
video_versions = story.get('video_versions', [])
|
|
if video_versions:
|
|
# Video — get highest resolution
|
|
best = max(video_versions, key=lambda v: v.get('height', 0) * v.get('width', 0))
|
|
cdn_url = best.get('url', '')
|
|
download_url = best.get('url_downloadable') or cdn_url
|
|
ext = '.mp4'
|
|
else:
|
|
# Image — get highest resolution candidate
|
|
candidates = story.get('image_versions2', {}).get('candidates', [])
|
|
if not candidates:
|
|
continue
|
|
best = max(candidates, key=lambda c: c.get('height', 0) * c.get('width', 0))
|
|
cdn_url = best.get('url', '')
|
|
download_url = best.get('url_downloadable') or cdn_url
|
|
ext = '.jpg'
|
|
|
|
if not download_url:
|
|
continue
|
|
|
|
# Extract media_id from the CDN URL (has Instagram filename)
|
|
# url_downloadable is a FastDL proxy URL, cdn url has the real filename
|
|
media_id = self._extract_media_id_from_cdn_url(cdn_url)
|
|
if not media_id:
|
|
# Fallback: try extracting from url_downloadable's filename param
|
|
if 'filename=' in download_url:
|
|
parsed = urllib.parse.urlparse(download_url)
|
|
params = urllib.parse.parse_qs(parsed.query)
|
|
fn = params.get('filename', [''])[0]
|
|
if fn:
|
|
media_id = Path(fn).stem
|
|
if not media_id:
|
|
media_id = pk
|
|
|
|
normalized = extract_instagram_media_id(media_id) if media_id else pk
|
|
|
|
date_str = post_date.strftime('%Y%m%d_%H%M%S') if post_date else datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
filename = f"{profile}_{date_str}_{media_id}{ext}"
|
|
|
|
items.append({
|
|
'media_id': media_id,
|
|
'normalized_media_id': normalized,
|
|
'download_url': download_url,
|
|
'filename': filename,
|
|
'post_date': post_date,
|
|
'ext': ext,
|
|
'pk': pk,
|
|
})
|
|
except Exception as e:
|
|
self.log(f"Error parsing story item: {e}", "debug")
|
|
continue
|
|
|
|
return items
|
|
|
|
def _extract_posts_from_api(self, posts_data):
|
|
"""Extract download items from postsV2 API response."""
|
|
items = []
|
|
profile = self.profile_name or "unknown"
|
|
edges = posts_data.get('edges', [])
|
|
|
|
for edge in edges:
|
|
try:
|
|
node = edge.get('node', edge) # Some formats wrap in 'node'
|
|
post_id = str(node.get('id', ''))
|
|
shortcode = node.get('shortcode', '')
|
|
is_video = node.get('is_video', False)
|
|
taken_at = node.get('taken_at_timestamp') or node.get('taken_at', 0)
|
|
post_date = datetime.fromtimestamp(taken_at) if taken_at else None
|
|
|
|
# Filter by content type: reels are always videos
|
|
# product_type "clips" = reels (if available in API data)
|
|
if self.content_type == 'reels':
|
|
product_type = node.get('product_type', '')
|
|
if product_type:
|
|
# If product_type is available, use it for precise filtering
|
|
if product_type != 'clips':
|
|
continue
|
|
elif not is_video:
|
|
# Fallback: at minimum, reels must be videos
|
|
continue
|
|
|
|
cdn_url = ''
|
|
download_url = ''
|
|
|
|
if is_video:
|
|
download_url = node.get('video_url', '')
|
|
cdn_url = download_url
|
|
if not download_url:
|
|
resources = node.get('display_resources', [])
|
|
if resources:
|
|
best = max(resources, key=lambda r: r.get('config_width', 0) * r.get('config_height', 0))
|
|
cdn_url = best.get('src', '')
|
|
download_url = best.get('url_downloadable') or cdn_url
|
|
ext = '.mp4'
|
|
else:
|
|
resources = node.get('display_resources', [])
|
|
if resources:
|
|
best = max(resources, key=lambda r: r.get('config_width', 0) * r.get('config_height', 0))
|
|
cdn_url = best.get('src', '')
|
|
download_url = best.get('url_downloadable') or cdn_url
|
|
else:
|
|
cdn_url = node.get('display_url', '')
|
|
download_url = cdn_url
|
|
ext = '.jpg'
|
|
|
|
if not download_url:
|
|
continue
|
|
|
|
# Extract media_id from CDN URL (has Instagram filename)
|
|
media_id = self._extract_media_id_from_cdn_url(cdn_url)
|
|
if not media_id:
|
|
# Fallback: try url_downloadable's filename param
|
|
if 'filename=' in download_url:
|
|
parsed = urllib.parse.urlparse(download_url)
|
|
params = urllib.parse.parse_qs(parsed.query)
|
|
fn = params.get('filename', [''])[0]
|
|
if fn:
|
|
media_id = Path(fn).stem
|
|
if not media_id:
|
|
media_id = shortcode or post_id
|
|
|
|
normalized = extract_instagram_media_id(media_id) if media_id else post_id
|
|
|
|
date_str = post_date.strftime('%Y%m%d_%H%M%S') if post_date else datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
filename = f"{profile}_{date_str}_{media_id}{ext}"
|
|
|
|
items.append({
|
|
'media_id': media_id,
|
|
'normalized_media_id': normalized,
|
|
'download_url': download_url,
|
|
'filename': filename,
|
|
'post_date': post_date,
|
|
'ext': ext,
|
|
'shortcode': shortcode,
|
|
'post_id': post_id,
|
|
})
|
|
except Exception as e:
|
|
self.log(f"Error parsing post edge: {e}", "debug")
|
|
continue
|
|
|
|
return items
|
|
|
|
def _download_content(self, page):
|
|
"""Download content from the page"""
|
|
# Special handling for highlights
|
|
if self.content_type == "highlights":
|
|
return self._download_highlights(page)
|
|
|
|
# Use high-res mode ONLY for posts (stories/reels already at best quality)
|
|
if self.high_res and self.content_type == "posts":
|
|
self.log("Using high-resolution download mode for posts", "info")
|
|
return self._download_content_highres(page)
|
|
|
|
success_count = 0
|
|
|
|
# Update activity status
|
|
self.activity_manager.update_status(f"Checking {self.content_type}")
|
|
|
|
# STEP 1: Scroll to load ALL content first
|
|
self.log(f"Scrolling to load all {self.content_type} content...")
|
|
self._scroll_to_load_content(page)
|
|
|
|
# STEP 2: After scrolling, collect all items and their dates
|
|
self.log("Collecting all items and dates after scrolling...")
|
|
all_media_items = page.locator("li.profile-media-list__item").all()
|
|
|
|
# Build a mapping of media items to dates
|
|
item_dates = {}
|
|
for item in all_media_items:
|
|
time_elem = item.locator("p.media-content__meta-time").first
|
|
if time_elem and time_elem.is_visible():
|
|
date_str = time_elem.get_attribute("title")
|
|
if date_str:
|
|
try:
|
|
# Parse date - use m/d/Y format
|
|
date_obj = datetime.strptime(date_str, "%m/%d/%Y, %I:%M:%S %p")
|
|
# Map all download links in this item to this date
|
|
item_links = item.locator("a[href*='.jpg'], a[href*='.mp4']").all()
|
|
for link in item_links:
|
|
href = link.get_attribute("href")
|
|
if href:
|
|
item_dates[href] = (date_str, date_obj)
|
|
except Exception:
|
|
pass
|
|
|
|
# STEP 3: Get all download links after everything is loaded
|
|
all_download_links = page.locator("a[href*='.jpg'], a[href*='.mp4']").all()
|
|
|
|
if not all_download_links:
|
|
self.log("No downloadable items found")
|
|
return 0
|
|
|
|
# STEP 3.5: Filter out duplicates BEFORE counting
|
|
download_links = []
|
|
skipped_duplicates = 0
|
|
|
|
for element in all_download_links:
|
|
if not element.is_visible():
|
|
continue
|
|
|
|
# Check for duplicates during collection
|
|
href = element.get_attribute("href") or ""
|
|
if "filename=" in href:
|
|
parsed = urllib.parse.urlparse(href)
|
|
params = urllib.parse.parse_qs(parsed.query)
|
|
if 'filename' in params:
|
|
url_filename = params['filename'][0]
|
|
media_id = self._extract_media_id_from_filename(url_filename)
|
|
normalized_media_id = extract_instagram_media_id(media_id) if media_id else None
|
|
|
|
# Check in-memory cache first (both original and normalized)
|
|
if media_id in self.downloaded_files or (normalized_media_id and normalized_media_id in self.downloaded_files):
|
|
skipped_duplicates += 1
|
|
continue
|
|
|
|
# Check database (both original and normalized)
|
|
if self._is_already_downloaded(media_id) or (normalized_media_id and normalized_media_id != media_id and self._is_already_downloaded(normalized_media_id)):
|
|
self.downloaded_files.add(media_id) # Add to cache
|
|
if normalized_media_id:
|
|
self.downloaded_files.add(normalized_media_id)
|
|
skipped_duplicates += 1
|
|
continue
|
|
|
|
# Not a duplicate, add to download list
|
|
download_links.append(element)
|
|
|
|
if skipped_duplicates > 0:
|
|
self.log(f"Filtered out {skipped_duplicates} already-downloaded items")
|
|
|
|
if not download_links:
|
|
self.log("No new items to download (all are duplicates)")
|
|
return 0
|
|
|
|
self.log(f"Found {len(download_links)} new items to download")
|
|
|
|
# Limit downloads if specified
|
|
limit = len(download_links)
|
|
if self.max_downloads and self.max_downloads < limit:
|
|
limit = self.max_downloads
|
|
self.log(f"Limiting to {limit} items (max_downloads setting)")
|
|
|
|
# Set initial progress so dashboard shows 0/N immediately
|
|
self.activity_manager.update_status(
|
|
f"Downloading {self.content_type}",
|
|
progress_current=0,
|
|
progress_total=limit
|
|
)
|
|
|
|
# Dismiss any cookie consent overlay before clicking download links
|
|
self._dismiss_consent_dialog(page)
|
|
|
|
# STEP 4: Download all items in batch
|
|
consecutive_old_posts = 0 # Track posts outside date range
|
|
|
|
for i in range(limit):
|
|
if i >= len(download_links):
|
|
break
|
|
|
|
# Update progress at start of each iteration (fires even on skips)
|
|
self.activity_manager.update_status(
|
|
f"Downloading {self.content_type}",
|
|
progress_current=i + 1,
|
|
progress_total=limit
|
|
)
|
|
|
|
element = download_links[i]
|
|
if not element.is_visible():
|
|
continue
|
|
|
|
# Find the date for this specific item
|
|
post_date = None
|
|
try:
|
|
# Get the href of this link to look up its date
|
|
href = element.get_attribute("href")
|
|
if href and href in item_dates:
|
|
date_str, post_date = item_dates[href]
|
|
self.log(f"Found date for item {i+1}: {date_str}")
|
|
|
|
# Fallback: Try to find the parent li and get its date
|
|
if not post_date:
|
|
parent_li = element.locator("xpath=ancestor::li[@class='profile-media-list__item']").first
|
|
if parent_li and parent_li.is_visible():
|
|
time_elem = parent_li.locator("p.media-content__meta-time").first
|
|
if time_elem and time_elem.is_visible():
|
|
date_str = time_elem.get_attribute("title")
|
|
if date_str:
|
|
# Parse date - use m/d/Y format
|
|
post_date = datetime.strptime(date_str, "%m/%d/%Y, %I:%M:%S %p")
|
|
self.log(f"Found date via parent li: {date_str}")
|
|
|
|
except Exception as e:
|
|
self.log(f"Could not extract date: {e}")
|
|
|
|
# Check date filtering for all content types when date range is specified
|
|
if post_date and (self.date_from or self.date_to):
|
|
# Extract media_id for tracking
|
|
href = element.get_attribute("href") or ""
|
|
media_id_for_tracking = None
|
|
if "filename=" in href:
|
|
parsed = urllib.parse.urlparse(href)
|
|
params = urllib.parse.parse_qs(parsed.query)
|
|
if 'filename' in params:
|
|
url_filename = params['filename'][0]
|
|
media_id_for_tracking = self._extract_media_id_from_filename(url_filename)
|
|
|
|
# Apply date filtering
|
|
if self.date_from and post_date < self.date_from:
|
|
self.log(f"Skipping item - too old: {post_date.strftime('%Y-%m-%d')}")
|
|
# Record as checked if we have media_id
|
|
if media_id_for_tracking:
|
|
self._record_checked(media_id_for_tracking, self.profile_name, self.content_type,
|
|
reason="old_post", post_date=post_date)
|
|
consecutive_old_posts += 1
|
|
|
|
# If we've seen 5 consecutive old posts, stop checking
|
|
# (posts are usually in chronological order)
|
|
# For highlights, don't stop early as they may have mixed dates
|
|
if self.content_type != "highlights" and consecutive_old_posts >= 5:
|
|
self.log("Reached old posts, stopping...")
|
|
break
|
|
continue
|
|
|
|
if self.date_to and post_date > self.date_to:
|
|
self.log(f"Skipping item - too new: {post_date.strftime('%Y-%m-%d')}")
|
|
# Record as checked if we have media_id
|
|
if media_id_for_tracking:
|
|
self._record_checked(media_id_for_tracking, self.profile_name, self.content_type,
|
|
reason="too_new", post_date=post_date)
|
|
continue
|
|
|
|
# Post is within range
|
|
consecutive_old_posts = 0 # Reset counter
|
|
self.log(f"Item within date range: {post_date.strftime('%Y-%m-%d')}")
|
|
|
|
# Check for phrase matching if configured (only for posts, not reels or stories)
|
|
if self.phrase_config and self.phrase_config.get('enabled'):
|
|
if self.content_type == 'posts':
|
|
# The caption is visible on the profile page itself
|
|
# Find the parent li element that contains this download link
|
|
parent_item = element.locator("xpath=ancestor::li[@class='profile-media-list__item']").first
|
|
if parent_item and parent_item.is_visible():
|
|
# Get the caption from this specific post item
|
|
caption_elem = parent_item.locator("p.media-content__caption").first
|
|
|
|
if caption_elem and caption_elem.is_visible():
|
|
caption_text = caption_elem.text_content() or ""
|
|
|
|
# Check if caption matches phrases
|
|
phrases = self.phrase_config.get('phrases', [])
|
|
if phrases:
|
|
case_sensitive = self.phrase_config.get('case_sensitive', False)
|
|
match_all = self.phrase_config.get('match_all', False)
|
|
|
|
if not case_sensitive:
|
|
caption_text = caption_text.lower()
|
|
phrases = [p.lower() for p in phrases]
|
|
|
|
matches = []
|
|
for phrase in phrases:
|
|
if phrase in caption_text:
|
|
matches.append(phrase)
|
|
|
|
if match_all:
|
|
result = len(matches) == len(phrases)
|
|
else:
|
|
result = len(matches) > 0
|
|
|
|
if not result:
|
|
self.log(f"Post {i+1} caption doesn't match phrases, skipping")
|
|
# Extract media_id for tracking
|
|
href = element.get_attribute("href") or ""
|
|
if "filename=" in href:
|
|
parsed = urllib.parse.urlparse(href)
|
|
params = urllib.parse.parse_qs(parsed.query)
|
|
if 'filename' in params:
|
|
url_filename = params['filename'][0]
|
|
media_id_for_phrase = self._extract_media_id_from_filename(url_filename)
|
|
# Record as checked so we don't check again
|
|
self._record_checked(media_id_for_phrase, self.profile_name, self.content_type,
|
|
reason="phrase_checked", post_date=post_date)
|
|
continue
|
|
else:
|
|
self.log(f"Post {i+1} matches phrase criteria ({len(matches)}/{len(phrases)} phrases found)")
|
|
else:
|
|
# No caption found, skip phrase check for this item
|
|
self.log(f"No caption found for post {i+1}, skipping phrase check", "debug")
|
|
|
|
# Download the file
|
|
try:
|
|
href = element.get_attribute("href") or ""
|
|
download_timeout = 30000 # 30 seconds for videos
|
|
|
|
# Try browser download first, fall back to direct HTTP download
|
|
filepath = None
|
|
try:
|
|
with page.expect_download(timeout=download_timeout) as download_info:
|
|
element.click(force=True)
|
|
|
|
download = download_info.value
|
|
original_filename = download.suggested_filename
|
|
|
|
media_id = self._extract_media_id_from_filename(original_filename)
|
|
normalized_media_id = extract_instagram_media_id(media_id) if media_id else media_id
|
|
ext = Path(original_filename).suffix
|
|
profile = self.profile_name or "unknown"
|
|
|
|
if post_date:
|
|
date_str = post_date.strftime('%Y%m%d_%H%M%S')
|
|
else:
|
|
date_str = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
|
|
new_filename = f"{profile}_{date_str}_{media_id}{ext}"
|
|
filepath = self.output_dir / new_filename
|
|
|
|
self.output_dir.mkdir(parents=True, exist_ok=True)
|
|
download.save_as(filepath)
|
|
|
|
except Exception as dl_err:
|
|
if not href:
|
|
raise dl_err
|
|
self.log(f"Browser download failed ({dl_err}), trying direct HTTP download", "debug")
|
|
|
|
# Direct HTTP download fallback using the href URL
|
|
url_filename = ""
|
|
if "filename=" in href:
|
|
parsed = urllib.parse.urlparse(href)
|
|
params = urllib.parse.parse_qs(parsed.query)
|
|
url_filename = params.get('filename', [''])[0]
|
|
|
|
if not url_filename:
|
|
url_filename = Path(urllib.parse.urlparse(href).path).name
|
|
|
|
media_id = self._extract_media_id_from_filename(url_filename)
|
|
normalized_media_id = extract_instagram_media_id(media_id) if media_id else media_id
|
|
ext = Path(url_filename).suffix if url_filename else '.jpg'
|
|
profile = self.profile_name or "unknown"
|
|
|
|
if post_date:
|
|
date_str = post_date.strftime('%Y%m%d_%H%M%S')
|
|
else:
|
|
date_str = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
|
|
new_filename = f"{profile}_{date_str}_{media_id}{ext}"
|
|
filepath = self.output_dir / new_filename
|
|
|
|
self.output_dir.mkdir(parents=True, exist_ok=True)
|
|
resp = requests.get(href, timeout=60, stream=True)
|
|
resp.raise_for_status()
|
|
with open(filepath, 'wb') as f:
|
|
for chunk in resp.iter_content(chunk_size=8192):
|
|
f.write(chunk)
|
|
|
|
# Check for duplicate hash before recording
|
|
file_hash = self.db.get_file_hash(str(filepath)) if self.db else None
|
|
if file_hash:
|
|
existing = self.db.get_download_by_file_hash(file_hash)
|
|
if existing and existing.get('file_path') and str(filepath) != existing.get('file_path'):
|
|
# Duplicate file with same hash exists
|
|
existing_path = Path(existing['file_path'])
|
|
if existing_path.exists():
|
|
self.log(f"⚠ Duplicate file detected: {filepath.name} matches {existing['filename']} from {existing['platform']}/{existing['source']}", "warning")
|
|
# Delete the duplicate and skip to next
|
|
try:
|
|
filepath.unlink()
|
|
self.log(f"Deleted duplicate: {filepath.name}", "debug")
|
|
continue
|
|
except Exception as e:
|
|
self.log(f"Failed to delete duplicate {filepath.name}: {e}", "warning")
|
|
|
|
# Update all timestamps if we have the post date
|
|
if post_date:
|
|
self._update_all_timestamps(filepath, post_date)
|
|
self.log(f"Saved: {filepath.name} (dated: {post_date.strftime('%Y-%m-%d %H:%M')})")
|
|
else:
|
|
self.log(f"Saved: {filepath.name}")
|
|
|
|
# Record in database with normalized media_id for cross-module detection
|
|
# Include pk in metadata if available from API capture (for stories)
|
|
dl_metadata = None
|
|
pk_map = getattr(self, '_cdn_to_pk_map', {})
|
|
if pk_map:
|
|
pk = None
|
|
# Try matching media_id directly (works if it's _n format)
|
|
if media_id:
|
|
pk = pk_map.get(media_id) or pk_map.get(normalized_media_id)
|
|
# Try extracting CDN filename from download URL path
|
|
if not pk and href:
|
|
cdn_filename = self._extract_media_id_from_cdn_url(href)
|
|
if cdn_filename:
|
|
pk = pk_map.get(cdn_filename)
|
|
# Fallback: check if href has a url= param with embedded CDN URL
|
|
if not pk and href and 'url=' in href:
|
|
try:
|
|
href_params = urllib.parse.parse_qs(urllib.parse.urlparse(href).query)
|
|
inner_url = href_params.get('url', [''])[0]
|
|
if inner_url:
|
|
cdn_filename = self._extract_media_id_from_cdn_url(inner_url)
|
|
if cdn_filename:
|
|
pk = pk_map.get(cdn_filename)
|
|
except Exception:
|
|
pass
|
|
if pk:
|
|
dl_metadata = {'pk': pk}
|
|
self.log(f"Mapped browser download {media_id} -> pk {pk}", "debug")
|
|
elif pk_map:
|
|
self.log(f"Could not map browser download {media_id} to pk (map has {len(pk_map)} entries)", "warning")
|
|
self._record_download(
|
|
media_id=normalized_media_id,
|
|
username=self.profile_name,
|
|
content_type=self.content_type,
|
|
filename=str(filepath),
|
|
download_url=href if 'href' in locals() else None,
|
|
post_date=post_date,
|
|
metadata=dl_metadata,
|
|
deferred=self.defer_database
|
|
)
|
|
|
|
self.downloaded_files.add(media_id)
|
|
self.downloaded_files.add(normalized_media_id)
|
|
success_count += 1
|
|
|
|
# Add smart delay between downloads
|
|
if i < len(download_links) - 1: # Don't delay after last item
|
|
self._smart_delay()
|
|
|
|
except Exception as e:
|
|
self.log(f"Error downloading item {i+1}: {e}")
|
|
continue
|
|
|
|
return success_count
|
|
|
|
def _download_highlights(self, page):
|
|
"""Download highlights - each highlight category is clicked and downloaded"""
|
|
total_downloaded = 0
|
|
|
|
# Find all highlight categories
|
|
highlight_buttons = page.locator("li.highlight button.highlight__button").all()
|
|
|
|
if not highlight_buttons:
|
|
self.log("No highlight categories found")
|
|
return 0
|
|
|
|
self.log(f"Found {len(highlight_buttons)} highlight categories")
|
|
|
|
# Get all category names first
|
|
categories = []
|
|
for button in highlight_buttons:
|
|
title_elem = button.locator("p.highlight__title").first
|
|
if title_elem and title_elem.is_visible():
|
|
name = title_elem.text_content().strip()
|
|
categories.append(name)
|
|
|
|
# Process each highlight category
|
|
for i, highlight_name in enumerate(categories):
|
|
try:
|
|
|
|
self.log(f"\nProcessing highlight {i+1}/{len(categories)}: {highlight_name}")
|
|
self.log("="*50)
|
|
|
|
# Create folder for this highlight only when needed
|
|
highlight_folder = self.output_dir / highlight_name
|
|
|
|
# Re-find and click the highlight button (page may have changed)
|
|
# Use filter instead of CSS selector to handle special characters
|
|
all_buttons = page.locator("button.highlight__button").all()
|
|
button = None
|
|
for btn in all_buttons:
|
|
title = btn.locator("p.highlight__title").first
|
|
if title and title.is_visible():
|
|
if title.text_content().strip() == highlight_name:
|
|
button = btn
|
|
break
|
|
if not button or not button.is_visible():
|
|
self.log(f"Could not find button for {highlight_name}")
|
|
continue
|
|
|
|
self.log(f"Clicking on {highlight_name}...")
|
|
button.click(force=True)
|
|
page.wait_for_timeout(5000) # Wait for content to load (increased for reliability)
|
|
|
|
# FIRST: Scroll to load ALL content
|
|
self.log("Scrolling to load all content...")
|
|
self._scroll_to_load_content(page)
|
|
|
|
# SECOND: Collect all items and their dates after scrolling is complete
|
|
self.log("Collecting all items after scrolling...")
|
|
all_media_items = page.locator("li.profile-media-list__item").all()
|
|
item_dates = {}
|
|
for item in all_media_items:
|
|
time_elem = item.locator("p.media-content__meta-time").first
|
|
if time_elem and time_elem.is_visible():
|
|
date_str = time_elem.get_attribute("title")
|
|
if date_str:
|
|
try:
|
|
date_obj = datetime.strptime(date_str, "%m/%d/%Y, %I:%M:%S %p")
|
|
# Map all download links in this item to this date
|
|
item_links = item.locator("a[href*='.jpg'], a[href*='.mp4']").all()
|
|
for link in item_links:
|
|
href = link.get_attribute("href")
|
|
if href:
|
|
item_dates[href] = (date_str, date_obj)
|
|
except Exception:
|
|
pass
|
|
|
|
# THIRD: Get all download links after everything is loaded
|
|
download_links = page.locator("a[href*='.jpg'], a[href*='.mp4']").all()
|
|
|
|
if not download_links:
|
|
self.log(f"No items found in highlight: {highlight_name}")
|
|
# Go back to highlights list
|
|
highlights_tab = page.locator("button.tabs-component__button:has-text('highlights')").first
|
|
if highlights_tab and highlights_tab.is_visible():
|
|
highlights_tab.click(force=True)
|
|
page.wait_for_timeout(2000)
|
|
continue
|
|
|
|
self.log(f"Found {len(download_links)} items in {highlight_name}")
|
|
self._dismiss_consent_dialog(page)
|
|
|
|
# Download each item in the highlight
|
|
for j, element in enumerate(download_links):
|
|
if not element.is_visible():
|
|
continue
|
|
|
|
# Check for duplicates before downloading
|
|
href = element.get_attribute("href") or ""
|
|
media_id = None
|
|
if "filename=" in href:
|
|
parsed = urllib.parse.urlparse(href)
|
|
params = urllib.parse.parse_qs(parsed.query)
|
|
if 'filename' in params:
|
|
url_filename = params['filename'][0]
|
|
media_id = self._extract_media_id_from_filename(url_filename)
|
|
normalized_media_id = extract_instagram_media_id(media_id) if media_id else None
|
|
|
|
# Check duplicates (both original and normalized)
|
|
if media_id in self.downloaded_files or (normalized_media_id and normalized_media_id in self.downloaded_files):
|
|
self.log(f"Skipping duplicate (session): {url_filename}")
|
|
continue
|
|
|
|
# Check database (both original and normalized)
|
|
if self._is_already_downloaded(media_id) or (normalized_media_id and normalized_media_id != media_id and self._is_already_downloaded(normalized_media_id)):
|
|
self.log(f"Skipping duplicate (database): {url_filename}", "info")
|
|
self.downloaded_files.add(media_id)
|
|
if normalized_media_id:
|
|
self.downloaded_files.add(normalized_media_id)
|
|
continue
|
|
|
|
try:
|
|
# Extract info for filename
|
|
if not media_id:
|
|
# Will be set from download filename below
|
|
pass
|
|
if not normalized_media_id:
|
|
normalized_media_id = extract_instagram_media_id(media_id) if media_id else media_id
|
|
profile = self.profile_name or "unknown"
|
|
|
|
# Try to get the date for this item
|
|
post_date = None
|
|
dl_href = element.get_attribute("href") or ""
|
|
if dl_href and dl_href in item_dates:
|
|
date_str_found, post_date = item_dates[dl_href]
|
|
date_str = post_date.strftime('%Y%m%d_%H%M%S')
|
|
self.log(f"Found date for highlight item: {date_str_found}")
|
|
else:
|
|
date_str = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
|
|
highlight_folder.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Try browser download, fall back to direct HTTP
|
|
try:
|
|
with page.expect_download(timeout=30000) as download_info:
|
|
element.click(force=True)
|
|
|
|
download = download_info.value
|
|
original_filename = download.suggested_filename
|
|
|
|
if not media_id:
|
|
media_id = self._extract_media_id_from_filename(original_filename)
|
|
normalized_media_id = extract_instagram_media_id(media_id) if media_id else media_id
|
|
ext = Path(original_filename).suffix
|
|
|
|
new_filename = f"{profile}_{date_str}_{media_id}{ext}"
|
|
filepath = highlight_folder / new_filename
|
|
download.save_as(filepath)
|
|
except Exception:
|
|
if not dl_href:
|
|
raise
|
|
self.log(f"Browser download failed, trying direct HTTP download", "debug")
|
|
|
|
if not media_id:
|
|
url_fn = ""
|
|
if "filename=" in dl_href:
|
|
parsed_url = urllib.parse.urlparse(dl_href)
|
|
url_params = urllib.parse.parse_qs(parsed_url.query)
|
|
url_fn = url_params.get('filename', [''])[0]
|
|
if not url_fn:
|
|
url_fn = Path(urllib.parse.urlparse(dl_href).path).name
|
|
media_id = self._extract_media_id_from_filename(url_fn)
|
|
normalized_media_id = extract_instagram_media_id(media_id) if media_id else media_id
|
|
ext = Path(url_fn).suffix if url_fn else '.jpg'
|
|
else:
|
|
ext = '.mp4' if '.mp4' in dl_href else '.jpg'
|
|
|
|
new_filename = f"{profile}_{date_str}_{media_id}{ext}"
|
|
filepath = highlight_folder / new_filename
|
|
resp = requests.get(dl_href, timeout=60, stream=True)
|
|
resp.raise_for_status()
|
|
with open(filepath, 'wb') as f:
|
|
for chunk in resp.iter_content(chunk_size=8192):
|
|
f.write(chunk)
|
|
|
|
# Check for duplicate hash before recording
|
|
file_hash = self.db.get_file_hash(str(filepath)) if self.db else None
|
|
if file_hash:
|
|
existing = self.db.get_download_by_file_hash(file_hash)
|
|
if existing and existing.get('file_path') and str(filepath) != existing.get('file_path'):
|
|
# Duplicate file with same hash exists
|
|
existing_path = Path(existing['file_path'])
|
|
if existing_path.exists():
|
|
self.log(f"⚠ Duplicate file detected: {filepath.name} matches {existing['filename']} from {existing['platform']}/{existing['source']}", "warning")
|
|
# Delete the duplicate and skip to next
|
|
try:
|
|
filepath.unlink()
|
|
self.log(f"Deleted duplicate: {filepath.name}", "debug")
|
|
continue
|
|
except Exception as e:
|
|
self.log(f"Failed to delete duplicate {filepath.name}: {e}", "warning")
|
|
|
|
# Update all timestamps if we have the post date
|
|
if post_date:
|
|
self._update_all_timestamps(filepath, post_date)
|
|
self.log(f"Saved: {highlight_name}/{new_filename} (dated: {post_date.strftime('%Y-%m-%d %H:%M')})")
|
|
else:
|
|
self.log(f"Saved: {highlight_name}/{new_filename}")
|
|
|
|
# Record in database with normalized media_id for cross-module detection
|
|
self._record_download(
|
|
media_id=normalized_media_id or media_id,
|
|
username=self.profile_name,
|
|
content_type="highlights",
|
|
filename=str(filepath),
|
|
download_url=href if href else None,
|
|
post_date=post_date,
|
|
metadata={"highlight_name": highlight_name},
|
|
deferred=self.defer_database
|
|
)
|
|
|
|
# Track downloaded file (both original and normalized)
|
|
self.downloaded_files.add(media_id)
|
|
if normalized_media_id:
|
|
self.downloaded_files.add(normalized_media_id)
|
|
total_downloaded += 1
|
|
|
|
# Use smart delay instead of fixed delay
|
|
self._smart_delay()
|
|
|
|
except Exception as e:
|
|
self.log(f"Error downloading item {j+1} from {highlight_name}: {e}")
|
|
continue
|
|
|
|
# Go back to highlights list for next category
|
|
self.log(f"Finished {highlight_name}, returning to highlights list...")
|
|
|
|
# Try multiple methods to return to highlights list
|
|
returned = False
|
|
|
|
# Method 1: Click highlights tab
|
|
highlights_tab = page.locator("button.tabs-component__button:has-text('highlights')").first
|
|
if highlights_tab and highlights_tab.is_visible():
|
|
self.log("Clicking highlights tab to return to list")
|
|
highlights_tab.click(force=True)
|
|
page.wait_for_timeout(3000)
|
|
|
|
# Check if it worked
|
|
highlight_buttons_check = page.locator("li.highlight button.highlight__button").all()
|
|
if highlight_buttons_check:
|
|
self.log(f"Successfully returned via tab ({len(highlight_buttons_check)} categories)")
|
|
returned = True
|
|
|
|
# Method 2: If tab didn't work, try clicking a different tab then back
|
|
if not returned:
|
|
self.log("Tab click didn't work, trying tab switch...")
|
|
posts_tab = page.locator("button.tabs-component__button:has-text('posts')").first
|
|
if posts_tab and posts_tab.is_visible():
|
|
posts_tab.click(force=True)
|
|
page.wait_for_timeout(2000)
|
|
|
|
highlights_tab = page.locator("button.tabs-component__button:has-text('highlights')").first
|
|
if highlights_tab and highlights_tab.is_visible():
|
|
highlights_tab.click(force=True)
|
|
page.wait_for_timeout(3000)
|
|
|
|
highlight_buttons_check = page.locator("li.highlight button.highlight__button").all()
|
|
if highlight_buttons_check:
|
|
self.log(f"Successfully returned via tab switch ({len(highlight_buttons_check)} categories)")
|
|
returned = True
|
|
|
|
if not returned:
|
|
self.log("ERROR: Could not return to highlights list, stopping")
|
|
break
|
|
|
|
except Exception as e:
|
|
self.log(f"Error processing highlight category {i+1}: {e}")
|
|
continue
|
|
|
|
return total_downloaded
|
|
|
|
def _scroll_to_load_api_posts(self, page, api_responses):
|
|
"""Scroll slowly to trigger paginated /postsV2 API calls.
|
|
|
|
FastDL lazy-loads posts as the user scrolls. The API response listener
|
|
captures each /postsV2 response automatically — we just need to scroll
|
|
to trigger the pagination requests. Stops when no new API responses
|
|
arrive after several scroll attempts, or when posts are older than
|
|
the configured date_from.
|
|
"""
|
|
self.log("Scrolling to load all posts within date range...")
|
|
initial_count = len(api_responses)
|
|
no_new_responses = 0
|
|
scroll_set = 0
|
|
|
|
while no_new_responses < 5:
|
|
old_count = len(api_responses)
|
|
|
|
# Slow, gradual scrolling — 200px at a time, 500ms between
|
|
for _ in range(10):
|
|
page.evaluate("window.scrollBy(0, 200)")
|
|
page.wait_for_timeout(500)
|
|
|
|
# Wait for API response to arrive
|
|
page.wait_for_timeout(3000)
|
|
|
|
new_count = len(api_responses)
|
|
if new_count > old_count:
|
|
self.log(f"Scroll {scroll_set + 1}: captured {new_count - old_count} new API response(s) (total: {new_count})")
|
|
no_new_responses = 0
|
|
scroll_set += 1
|
|
|
|
# Check if the latest postsV2 response has posts older than date_from
|
|
if self.date_from:
|
|
for resp in reversed(api_responses):
|
|
if '/postsV2' not in resp.get('url', ''):
|
|
continue
|
|
body = resp.get('body', {})
|
|
if not isinstance(body, dict) or 'result' not in body:
|
|
continue
|
|
result = body['result']
|
|
if not isinstance(result, dict) or 'edges' not in result:
|
|
continue
|
|
edges = result['edges']
|
|
if not edges:
|
|
continue
|
|
last_edge = edges[-1]
|
|
node = last_edge.get('node', last_edge)
|
|
taken_at = node.get('taken_at_timestamp') or node.get('taken_at', 0)
|
|
if taken_at:
|
|
post_date = datetime.fromtimestamp(taken_at)
|
|
if post_date < self.date_from:
|
|
self.log(f"Reached posts older than date range ({post_date.strftime('%Y-%m-%d')}), stopping scroll")
|
|
total_new = len(api_responses) - initial_count
|
|
self.log(f"Scrolling complete: captured {total_new} additional API response(s)")
|
|
return
|
|
break # Only check the latest postsV2 response
|
|
else:
|
|
no_new_responses += 1
|
|
scroll_set += 1
|
|
|
|
total_new = len(api_responses) - initial_count
|
|
self.log(f"Scrolling complete: captured {total_new} additional API response(s)")
|
|
|
|
def _scroll_to_load_content(self, page):
|
|
"""Scroll to load all lazy-loaded content"""
|
|
self.log("Scrolling to load content...")
|
|
|
|
# Count downloadable items
|
|
initial_count = len(page.locator("a[href*='.jpg'], a[href*='.mp4']").all())
|
|
|
|
no_change_count = 0
|
|
consecutive_old_items = 0
|
|
|
|
# Scroll slowly like you requested - human-like scrolling
|
|
# Highlights may have many items (80+), so increase scrolls
|
|
max_scrolls = 50 if self.content_type == "highlights" else 15
|
|
|
|
for scroll_set in range(max_scrolls):
|
|
old_height = page.evaluate("document.body.scrollHeight")
|
|
|
|
old_count = len(page.locator("a[href*='.jpg'], a[href*='.mp4']").all())
|
|
|
|
# Slow, gradual scrolling - 200px at a time
|
|
for small_scroll in range(10):
|
|
page.evaluate("window.scrollBy(0, 200)")
|
|
page.wait_for_timeout(500) # 0.5 second between small scrolls
|
|
|
|
# Wait for content to load after scrolling
|
|
page.wait_for_timeout(3000) # 3 seconds for new content
|
|
|
|
# Check for new content
|
|
new_height = page.evaluate("document.body.scrollHeight")
|
|
|
|
new_count = len(page.locator("a[href*='.jpg'], a[href*='.mp4']").all())
|
|
|
|
if new_count > old_count:
|
|
self.log(f"Loaded more items: {old_count} → {new_count}")
|
|
no_change_count = 0
|
|
|
|
# Check if we should stop based on dates (for posts/reels with date filtering)
|
|
if self.content_type in ["posts", "reels"] and self.date_from:
|
|
# Check the dates of the last few items
|
|
all_items = page.locator("li.profile-media-list__item").all()
|
|
if len(all_items) >= 10:
|
|
# Check last 10 items for dates
|
|
old_dates_found = 0
|
|
for item in all_items[-10:]:
|
|
time_elem = item.locator("p.media-content__meta-time").first
|
|
if time_elem and time_elem.is_visible():
|
|
date_str = time_elem.get_attribute("title")
|
|
if date_str:
|
|
try:
|
|
date_obj = datetime.strptime(date_str, "%m/%d/%Y, %I:%M:%S %p")
|
|
if date_obj < self.date_from:
|
|
old_dates_found += 1
|
|
except Exception:
|
|
pass
|
|
|
|
# If ALL of the last items are too old, stop scrolling
|
|
# This ensures we don't miss content at the boundary
|
|
if old_dates_found >= 10:
|
|
self.log(f"All {old_dates_found} items in last batch are too old, stopping scroll")
|
|
break
|
|
else:
|
|
no_change_count += 1
|
|
|
|
# If nothing changed for 5 scrolls, stop
|
|
if no_change_count >= 5:
|
|
self.log("No more content loading, stopping scroll")
|
|
break
|
|
|
|
|
|
# Example usage function
|
|
def download_instagram_content(username, content_type="all", output_dir="downloads",
|
|
use_database=True, db_path="fastdl_downloads.db", **kwargs):
|
|
"""
|
|
Simple function to download Instagram content
|
|
|
|
Args:
|
|
username: Instagram username
|
|
content_type: 'posts', 'stories', 'reels', 'highlights', or 'all'
|
|
output_dir: Where to save files
|
|
use_database: Use SQLite database to track downloads (set False to re-download)
|
|
db_path: Path to SQLite database file
|
|
**kwargs: Additional options (max_downloads, days_back, phrase_config, etc.)
|
|
|
|
Returns:
|
|
Number of downloaded items
|
|
"""
|
|
downloader = FastDLDownloader(headless=True, use_database=use_database, db_path=db_path)
|
|
return downloader.download(username, content_type, output_dir, **kwargs)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Example: Download stories for a user
|
|
count = download_instagram_content(
|
|
username="evalongoria",
|
|
content_type="stories",
|
|
output_dir="test_downloads"
|
|
)
|
|
print(f"\nTotal downloaded: {count} items") |