Files
media-downloader/modules/toolzu_module.py
Todd 0d7b2b1aab Initial commit
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-29 22:42:55 -04:00

1117 lines
47 KiB
Python

#!/usr/bin/env python3
"""
Toolzu Instagram Downloader Module
Downloads Instagram content at 1920x1440 resolution
"""
# Allow nested event loops for compatibility with asyncio contexts
try:
import nest_asyncio
nest_asyncio.apply()
except ImportError:
pass
from pathlib import Path
from datetime import datetime, timedelta
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout
import os
import re
import random
import time
import json
import requests
from modules.base_module import LoggingMixin
from modules.cloudflare_handler import (
CloudflareHandler, SiteStatus, get_flaresolverr_user_agent,
get_playwright_context_options, get_playwright_stealth_scripts
)
from modules.instagram_utils import (
extract_instagram_media_id,
scan_existing_files_for_media_ids,
record_instagram_download,
is_instagram_downloaded
)
class ToolzuDownloader(LoggingMixin):
"""
Toolzu Instagram downloader - provides 1920x1440 resolution downloads
Example usage:
from toolzu_module import ToolzuDownloader
downloader = ToolzuDownloader()
count = downloader.download(
username="evalongoria",
content_type="posts",
output_dir="downloads/posts"
)
print(f"Downloaded {count} items")
"""
def __init__(self, headless=True, show_progress=True, use_database=True,
             log_callback=None, unified_db=None,
             cookie_file=None, toolzu_email=None, toolzu_password=None):
    """
    Set up logging, pacing, storage backends and the Cloudflare helper.

    Args:
        headless: Launch the Playwright browser without a visible window
        show_progress: Stored on the instance as ``show_progress``
        use_database: Track downloads in the database; only honoured when
            ``unified_db`` is also supplied
        log_callback: Optional callable forwarded to the logging mixin
        unified_db: Optional UnifiedDatabase instance; when given it supplies
            the scraper configuration (proxy) and cookie storage
        cookie_file: Cookie file path, used only when ``unified_db`` is absent
        toolzu_email: Toolzu account email for auto-login (optional)
        toolzu_password: Toolzu account password for auto-login (optional)
    """
    # Logging comes from LoggingMixin
    self._init_logger('Instagram', log_callback, default_module='Download')

    # Browser behaviour
    self.headless = headless
    self.show_progress = show_progress

    # Toolzu endpoints (single profile page with tabs, plus the login form)
    self.toolzu_url = 'https://toolzu.com/downloader/instagram/profile/'
    self.login_url = 'https://toolzu.com/login'

    # Session state and credentials
    self.downloaded_files = set()
    self.use_database = use_database
    self.toolzu_email = toolzu_email
    self.toolzu_password = toolzu_password
    self.unified_db = unified_db  # kept for scraper config / cookie access
    self.scraper_id = 'toolzu'  # key of this scraper in the database

    # Pacing between downloads (seconds) and between batches of downloads
    self.min_delay = 5
    self.max_delay = 15
    self.batch_size = 10
    self.batch_delay_min = 30
    self.batch_delay_max = 60
    self.download_count = 0
    self.pending_downloads = []  # records queued by deferred recording

    # Download tracking requires both the flag and a database instance
    self.db = None
    if unified_db and use_database:
        from modules.unified_database import ToolzuDatabaseAdapter
        self.db = ToolzuDatabaseAdapter(unified_db)
    else:
        self.use_database = False

    # Activity status manager for real-time updates
    from modules.activity_status import get_activity_manager
    self.activity_manager = get_activity_manager(unified_db)

    # Proxy settings come from the scraper configuration row, if there is one
    self.proxy_url = None
    self.cookie_file = None  # None means "cookies live in the database"
    scraper_config = unified_db.get_scraper(self.scraper_id) if unified_db else None
    if (scraper_config
            and scraper_config.get('proxy_enabled')
            and scraper_config.get('proxy_url')):
        self.proxy_url = scraper_config['proxy_url']
        self.log(f"Using proxy: {self.proxy_url}", "info")

    # Without a database, cookies are persisted to a file instead
    if not unified_db:
        self.cookie_file = (
            Path(cookie_file) if cookie_file
            else Path('/opt/media-downloader/cookies/toolzu_cookies.json')
        )

    # User-Agent fetched from FlareSolverr so both sides present the same one
    self.user_agent = get_flaresolverr_user_agent()

    # Universal Cloudflare handler; cookie_file=None selects database storage
    handler_cookie_path = str(self.cookie_file) if self.cookie_file else None
    self.cf_handler = CloudflareHandler(
        module_name="Toolzu",
        cookie_file=handler_cookie_path,
        user_agent=self.user_agent,
        logger=self.logger,
        aggressive_expiry=True,
        proxy_url=self.proxy_url  # forwarded to FlareSolverr
    )

    # Mirrored attributes kept for backwards compatibility
    self.flaresolverr_url = self.cf_handler.flaresolverr_url
    self.flaresolverr_enabled = self.cf_handler.flaresolverr_enabled

    # Prime the handler with any cookies already stored in the database
    self._load_cookies_from_db()
def _load_cookies_from_db(self):
"""Load cookies from database if available"""
if not self.unified_db:
return
try:
cookies = self.unified_db.get_scraper_cookies(self.scraper_id)
if cookies:
# Load into CloudflareHandler
self.cf_handler._cookies = cookies
self.log(f"Loaded {len(cookies)} cookies from database", "debug")
except Exception as e:
self.log(f"Error loading cookies from database: {e}", "warning")
def _save_cookies_to_db(self, cookies: list, user_agent: str = None):
"""Save cookies to database
Args:
cookies: List of cookie dictionaries
user_agent: User agent to associate with cookies (important for cf_clearance).
If not provided, uses self.user_agent as fallback.
"""
if not self.unified_db:
return
try:
# Use provided user_agent or fall back to self.user_agent
ua = user_agent or self.user_agent
self.unified_db.save_scraper_cookies(
self.scraper_id,
cookies,
user_agent=ua,
merge=True
)
self.log(f"Saved {len(cookies)} cookies to database (UA: {ua[:50]}...)", "debug")
except Exception as e:
self.log(f"Error saving cookies to database: {e}", "warning")
def _has_valid_cookies(self):
"""Check if we have valid cookies (either in file or database)"""
if self.unified_db:
cookies = self.unified_db.get_scraper_cookies(self.scraper_id)
return cookies and len(cookies) > 0
elif self.cookie_file:
return self.cookie_file.exists()
return False
def _cookies_expired(self):
"""Check if cookies are expired - delegates to CloudflareHandler"""
return self.cf_handler.cookies_expired()
def _get_cookies_for_requests(self):
"""Get cookies in format for requests library - delegates to CloudflareHandler"""
return self.cf_handler.get_cookies_dict()
def _get_cookies_via_flaresolverr(self, url="https://toolzu.com/", max_retries=2):
"""Use FlareSolverr to bypass Cloudflare - delegates to CloudflareHandler
Args:
url: URL to fetch
max_retries: Maximum number of retry attempts (default: 2)
Returns:
True if cookies obtained successfully, False otherwise
"""
success = self.cf_handler.get_cookies_via_flaresolverr(url, max_retries)
# Save cookies to database if successful
if success and self.unified_db:
cookies_list = self.cf_handler.get_cookies_list()
if cookies_list:
# CRITICAL: Get the user_agent from FlareSolverr solution, not self.user_agent
# cf_clearance cookies are fingerprinted to the browser that solved the challenge
flaresolverr_ua = self.cf_handler.get_user_agent()
self._save_cookies_to_db(cookies_list, user_agent=flaresolverr_ua)
return success
def _smart_delay(self):
"""Implement smart delays with randomization"""
self.download_count += 1
if self.download_count % self.batch_size == 0:
delay = random.uniform(self.batch_delay_min, self.batch_delay_max)
self.log(f"Batch delay: waiting {delay:.1f} seconds", "debug")
else:
delay = random.uniform(self.min_delay, self.max_delay)
self.log(f"Waiting {delay:.1f} seconds", "debug")
time.sleep(delay)
def _load_cookies(self, context):
"""Load cookies from database or file into browser context"""
# Try loading from database first
if self.unified_db:
try:
cookies = self.unified_db.get_scraper_cookies(self.scraper_id)
if cookies:
# Clean cookies - remove unsupported properties and convert expiry->expires
cleaned_cookies = []
for cookie in cookies:
cleaned = {k: v for k, v in cookie.items()
if k not in ['partitionKey', '_crHasCrossSiteAncestor']}
# FlareSolverr uses 'expiry' but Playwright uses 'expires'
if 'expiry' in cleaned and 'expires' not in cleaned:
cleaned['expires'] = cleaned.pop('expiry')
cleaned_cookies.append(cleaned)
# CRITICAL: Clear existing cookies first to ensure new cf_clearance takes effect
try:
context.clear_cookies()
except Exception:
pass
context.add_cookies(cleaned_cookies)
self.log(f"Loaded {len(cleaned_cookies)} cookies from database", "info")
return
except Exception as e:
self.log(f"Error loading cookies from database: {e}", "warning")
# Fallback to file-based cookies
if not self.cookie_file or not self.cookie_file.exists():
self.log("No saved cookies found", "debug")
return
try:
import json
with open(self.cookie_file, 'r') as f:
data = json.load(f)
cookies = data.get('cookies', [])
if cookies:
# Convert expiry->expires for Playwright compatibility
cleaned_cookies = []
for cookie in cookies:
cleaned = dict(cookie)
if 'expiry' in cleaned and 'expires' not in cleaned:
cleaned['expires'] = cleaned.pop('expiry')
cleaned_cookies.append(cleaned)
# CRITICAL: Clear existing cookies first
try:
context.clear_cookies()
except Exception:
pass
context.add_cookies(cleaned_cookies)
self.log(f"Loaded {len(cleaned_cookies)} cookies from file", "info")
except Exception as e:
self.log(f"Failed to load cookies: {e}", "warning")
def _save_cookies(self, context):
"""Save cookies to database or file"""
try:
import json
cookies = context.cookies()
# Save to database if available
if self.unified_db:
try:
# CRITICAL: Include user_agent for cf_clearance cookies to work
self.unified_db.save_scraper_cookies(
self.scraper_id,
cookies,
user_agent=self.user_agent,
merge=True
)
self.log(f"Saved {len(cookies)} cookies to database", "debug")
return
except Exception as e:
self.log(f"Error saving cookies to database: {e}", "warning")
# Fallback to file-based storage
if self.cookie_file:
# Ensure directory exists
self.cookie_file.parent.mkdir(parents=True, exist_ok=True)
with open(self.cookie_file, 'w') as f:
json.dump({'cookies': cookies}, f, indent=2)
self.log(f"Saved {len(cookies)} cookies to file", "debug")
except Exception as e:
self.log(f"Failed to save cookies: {e}", "warning")
def login(self, page, context):
    """
    Sign in to Toolzu through the login form using the stored credentials.

    Args:
        page: Playwright page object
        context: Browser context whose cookies are saved after a successful login

    Returns:
        True if login succeeded, False otherwise (including missing
        credentials or any exception, which is logged).
    """
    if not (self.toolzu_email and self.toolzu_password):
        self.log("No Toolzu credentials provided, cannot auto-login", "warning")
        return False

    try:
        self.log("Attempting to log in to Toolzu...")

        # Open the login page and give it a moment to settle
        page.goto(self.login_url, wait_until="domcontentloaded", timeout=30000)
        page.wait_for_timeout(2000)

        # Email field doubles as the "is the form there?" probe
        email_field = page.locator("#loginform-email").first
        if not email_field.is_visible():
            self.log("Login form not found", "error")
            return False

        self.log(f"Filling in email: {self.toolzu_email}")
        email_field.fill(self.toolzu_email)
        page.wait_for_timeout(500)

        password_field = page.locator("#loginform-password").first
        password_field.fill(self.toolzu_password)
        page.wait_for_timeout(500)

        # reCAPTCHA v3: best effort only - give the token time to appear
        try:
            page.wait_for_timeout(2000)
            token_field = page.locator("#loginform-recaptcha").first
            if token_field:
                if token_field.get_attribute("value"):
                    self.log("reCAPTCHA v3 token detected", "debug")
                else:
                    self.log("reCAPTCHA v3 token not populated yet, waiting...", "debug")
                    page.wait_for_timeout(3000)
        except Exception:
            pass

        # Submit via the button when visible, otherwise via Enter in the password field
        submit = page.locator("button[type='submit'], button:has-text('Log in')").first
        if submit.is_visible():
            self.log("Submitting login form...")
            submit.click()
        else:
            password_field.press("Enter")

        # Wait for navigation or an error to show up
        page.wait_for_timeout(5000)

        # Success means we left the login page or a logged-in menu is visible
        left_login_page = "/login" not in page.url
        if left_login_page or page.locator("a:has-text('Log out'), .user-menu, .dropdown-toggle").first.is_visible():
            self.log("Login successful!", "success")
            # Keep the authenticated session for later runs
            self._save_cookies(context)
            return True

        # Still on the login page: report the site's error message if it shows one
        failure = page.locator(".alert-danger, .help-block-error, .invalid-feedback").first
        if failure.is_visible():
            failure_text = failure.inner_text()
            self.log(f"Login failed: {failure_text}", "error")
        else:
            self.log("Login failed (still on login page)", "error")
        return False
    except Exception as e:
        self.log(f"Login error: {e}", "error")
        import traceback
        self.log(traceback.format_exc(), "debug")
        return False
def _check_if_login_needed(self, page):
"""
Check if we need to log in (e.g., hit download limit, session expired)
Args:
page: Playwright page object
Returns:
True if login is needed, False otherwise
"""
try:
# Check for download limit message
limit_msg = page.locator("text=EXCEEDED THE LIMIT, text=login to continue, text=sign in").first
if limit_msg.is_visible():
self.log("Download limit detected, login required", "info")
return True
# Check if redirected to login page
if "/login" in page.url:
self.log("Redirected to login page", "info")
return True
return False
except Exception:
return False
def _extract_timestamp_from_url(self, url):
"""
Extract timestamp from Toolzu thumbnail URL query parameter
NOTE: Toolzu does NOT provide actual post dates anywhere on the page.
The 'time=' parameter in thumbnail URLs is the page load time, not post date.
This method returns None - download time will be used as fallback.
Args:
url: Toolzu thumbnail URL with time parameter
Returns:
None (Toolzu doesn't provide reliable post dates)
"""
# Don't extract timestamps from Toolzu - they're page load times, not post dates
return None
def _extract_media_id_from_url(self, url):
"""
Extract media ID from Instagram CDN URL
Args:
url: Instagram CDN URL
Returns:
Media ID string
"""
# Pattern: number_MEDIAID_number_n.jpg
pattern = r'(\d+)_(\d{17,19})_\d+_n\.(jpg|mp4)'
match = re.search(pattern, url)
if match:
return match.group(2) # Return the media ID
# Fallback: extract from filename
try:
filename = url.split('/')[-1].split('?')[0]
return Path(filename).stem
except Exception:
return None
def _is_already_downloaded(self, media_id):
"""Check if media_id was already downloaded by ANY Instagram downloader (uses centralized function)"""
if not self.use_database:
return False
try:
# Use centralized function for consistent cross-module detection
return is_instagram_downloaded(self.db.db if hasattr(self.db, 'db') else self.db, media_id)
except Exception as e:
self.log(f"Error checking database for {media_id}: {e}", "error")
return False # Don't skip on error - try to download
def _record_download(self, media_id, username, content_type, filename,
download_url=None, post_date=None, metadata=None, deferred=False):
"""Record download in database (uses centralized function)
Args:
deferred: If True, don't record to database now - add to pending_downloads list
for later recording after file move is complete
"""
# If deferred, store for later recording instead of recording now
if deferred:
file_path = str(filename) # Full path
filename_only = Path(filename).name # Just the filename
self.pending_downloads.append({
'media_id': media_id,
'username': username,
'filename': filename_only,
'url': download_url,
'post_date': post_date.isoformat() if post_date else None,
'file_path': file_path,
'content_type': content_type,
'metadata': metadata
})
self.log(f"Deferred recording for {media_id}", "debug")
return True
if not self.use_database:
return
# Extract just the filename from the full path for database
file_path = str(filename) # Full path
filename_only = Path(filename).name # Just the filename
try:
# Use centralized function for consistent cross-module storage
result = record_instagram_download(
db=self.db.db if hasattr(self.db, 'db') else self.db,
media_id=media_id,
username=username,
content_type=content_type,
filename=filename_only,
download_url=download_url,
post_date=post_date,
file_path=file_path,
method='toolzu',
extra_metadata=metadata or {}
)
if result:
self.log(f"Recorded download for {media_id}", "debug")
else:
self.log(f"Failed to record download for {media_id} (possibly duplicate)", "debug")
except Exception as e:
self.log(f"Failed to record download: {e}", "warning")
def get_pending_downloads(self):
    """Return a shallow copy of the records queued by deferred recording."""
    queued = list(self.pending_downloads)
    return queued
def clear_pending_downloads(self):
    """Reset the deferred-recording queue once its entries have been recorded."""
    # Rebind to a new list rather than emptying the existing one in place
    self.pending_downloads = list()
def _update_file_timestamps(self, filepath, post_date):
"""Update file timestamps to match post date"""
if not post_date:
return
timestamp = post_date.timestamp()
try:
os.utime(filepath, (timestamp, timestamp))
self.log(f"Updated timestamps to {post_date.strftime('%Y-%m-%d %H:%M:%S')}", "debug")
except Exception as e:
self.log(f"Failed to update timestamps: {e}", "debug")
def download(self, username, content_type="posts", output_dir="downloads",
             max_downloads=None, days_back=None, date_from=None, date_to=None,
             defer_database=False):
    """
    Public entry point: fetch one account's posts or stories through Toolzu.

    Args:
        username: Instagram username
        content_type: 'posts' or 'stories' (anything else is rejected)
        output_dir: Directory to save downloads
        max_downloads: Maximum number of items to download
        days_back: Number of days back to download
        date_from: Start date for range
        date_to: End date for range
        defer_database: If True, queue records in pending_downloads instead
            of writing them to the database immediately

    Returns:
        Number of successfully downloaded items (0 when the site is
        unavailable or the content type is unsupported)
    """
    # Start every account with an empty cache so it cannot grow unbounded
    self.downloaded_files.clear()

    # Site health check comes before anything else
    self.log("Checking Toolzu site status...", "debug")
    site_status, error_msg = self.cf_handler.check_site_status("https://toolzu.com/", timeout=10)
    if self.cf_handler.should_skip_download(site_status):
        self.log(f"Skipping download - Toolzu is unavailable: {error_msg}", "warning")
        return 0
    if site_status == SiteStatus.CLOUDFLARE_CHALLENGE:
        self.log("Cloudflare challenge detected, will attempt bypass during download", "info")

    # Only two content types are accepted
    if content_type not in ('posts', 'stories'):
        self.log(f"Toolzu only supports 'posts' and 'stories', not '{content_type}'", "warning")
        return 0

    # Per-run state read by the helper methods
    self.username = username
    self.content_type = content_type
    self.output_dir = Path(output_dir)
    self.max_downloads = max_downloads
    self.profile_name = username.lower()
    self.defer_database = defer_database  # consulted when recording downloads

    self._setup_date_filtering(days_back, date_from, date_to)
    self._scan_existing_files()
    return self._run_download()
def _setup_date_filtering(self, days_back, date_from, date_to):
"""Setup date range for filtering"""
self.date_from = None
self.date_to = None
if date_from:
if isinstance(date_from, str):
self.date_from = datetime.strptime(date_from, "%Y-%m-%d")
else:
self.date_from = date_from
if date_to:
if isinstance(date_to, str):
self.date_to = datetime.strptime(date_to, "%Y-%m-%d")
else:
self.date_to = date_to
if days_back and not self.date_from:
now = datetime.now()
self.date_to = datetime(now.year, now.month, now.day, 23, 59, 59)
self.date_from = (now - timedelta(days=days_back-1)).replace(hour=0, minute=0, second=0)
self.log(f"Downloading content from last {days_back} days ({self.date_from.strftime('%Y-%m-%d')} to {self.date_to.strftime('%Y-%m-%d')})")
def _scan_existing_files(self):
    """Seed downloaded_files with media IDs found for this profile in output_dir."""
    found = scan_existing_files_for_media_ids(self.output_dir, self.profile_name)
    self.downloaded_files = found
    if found:
        self.log(f"Found {len(found)} existing media IDs, will skip duplicates")
def _run_download(self):
    """Run the actual download process

    Refreshes Cloudflare cookies when needed, launches Firefox through
    Playwright, submits the username on the Toolzu profile page, selects the
    Stories tab when stories were requested, and hands the loaded page to
    _download_content().

    Returns:
        Number of items saved; 0 when the form, results page or stories tab
        could not be reached.
    """
    success_count = 0

    # Update activity status
    self.activity_manager.update_status(f"Checking {self.content_type}")

    # Try to get fresh cookies via FlareSolverr if we don't have them or
    # they're old. _has_valid_cookies() covers both storage backends;
    # self.cookie_file is None whenever a unified database is configured,
    # so it must not be dereferenced here.
    try:
        have_cookies = self._has_valid_cookies()
    except Exception as e:
        self.log(f"Could not check stored cookies: {e}", "debug")
        have_cookies = False
    if not have_cookies or self._cookies_expired():
        self.log("Cookies missing or expired, attempting FlareSolverr bypass...", "info")
        if self._get_cookies_via_flaresolverr():
            self.log("Successfully got fresh cookies from FlareSolverr", "info")
        else:
            self.log("FlareSolverr unavailable, will try with Playwright", "warning")

    # Set Playwright browser path and display.
    # Use environment variable if set, otherwise use standard location.
    os.environ.setdefault('PLAYWRIGHT_BROWSERS_PATH', '/root/.cache/ms-playwright')
    os.environ['DISPLAY'] = ':100'  # Use Xvfb virtual display
    os.environ['HOME'] = '/root'  # Fix Firefox launch as root
    os.environ.pop('XAUTHORITY', None)  # Remove user's XAUTHORITY

    with sync_playwright() as p:
        browser = p.firefox.launch(
            headless=self.headless,
            firefox_user_prefs={
                # Disable automation indicators
                'dom.webdriver.enabled': False,
                'useAutomationExtension': False,
                'general.platform.override': 'Win32',
                'general.appversion.override': '5.0 (Windows)',
                'general.oscpu.override': 'Windows NT 10.0; Win64; x64'
            }
        )

        # CRITICAL: Browser fingerprint must match FlareSolverr for cookies to work
        # Get dynamic fingerprint settings (Firefox doesn't use Sec-Ch-Ua headers)
        context_options = get_playwright_context_options()

        # Firefox-specific: remove Chrome-specific headers, keep Accept-Language
        if 'extra_http_headers' in context_options:
            context_options['extra_http_headers'] = {
                'Accept-Language': context_options['extra_http_headers'].get('Accept-Language', 'en-US,en;q=0.9')
            }
        context_options['ignore_https_errors'] = True

        # IMPORTANT: If cookies have a stored user_agent, use THAT user_agent.
        # Cloudflare cf_clearance cookies are fingerprinted to the browser
        # that solved the challenge.
        try:
            stored_user_agent = (
                self.unified_db.get_scraper_cookies_user_agent(self.scraper_id)
                if self.unified_db else None
            )
            if stored_user_agent:
                self.log(f"Using stored cookie user_agent: {stored_user_agent[:50]}...", "debug")
                context_options['user_agent'] = stored_user_agent
            else:
                self.log(f"Using fingerprint: UA={context_options['user_agent'][:50]}...", "debug")
        except Exception as e:
            self.log(f"Error getting stored user_agent, using default: {e}", "debug")

        context = browser.new_context(**context_options)

        # Load cookies for session persistence
        self._load_cookies(context)
        page = context.new_page()

        # Add comprehensive anti-detection scripts
        page.add_init_script(get_playwright_stealth_scripts())

        try:
            # Navigate to Toolzu profile page
            self.log(f"Navigating to Toolzu profile downloader")
            page.goto(self.toolzu_url, wait_until="domcontentloaded", timeout=30000)
            page.wait_for_timeout(2000)

            # Fill in the download box with username
            try:
                input_selector = "input[name='profile'], input[type='text'], input.form-control"
                input_box = page.locator(input_selector).first
                if not input_box.is_visible():
                    self.log("Input box not found", "error")
                    return 0

                self.log(f"Filling in username: @{self.username}")
                input_box.fill(f"@{self.username}")
                page.wait_for_timeout(500)

                # Submit form (button when visible, otherwise Enter)
                submit_button = page.locator("button[type='submit'], button:has-text('Download'), .btn-primary").first
                if submit_button.is_visible():
                    self.log("Submitting form...")
                    submit_button.click()
                else:
                    input_box.press("Enter")
                page.wait_for_timeout(5000)  # Wait for page to load
            except Exception as e:
                self.log(f"Form submission error: {e}", "error")
                return 0

            # Wait for page to stabilize after form submission
            page.wait_for_timeout(3000)

            # Check if page loaded results (should have nav tabs or download cards)
            try:
                page.wait_for_selector("#pills-tab, .download-card, #photo-tab", timeout=15000)
                self.log("Results page loaded", "debug")
            except Exception:
                self.log("Results page didn't load - may be blocked by reCAPTCHA", "warning")
                # Take screenshot for debugging
                try:
                    page.screenshot(path="/tmp/toolzu_blocked.png")
                    self.log("Screenshot saved to /tmp/toolzu_blocked.png", "debug")
                except Exception:
                    pass
                return 0

            if self.content_type == 'stories':
                # Stories live behind their own tab on the results page
                self.log("Clicking Stories tab...")
                try:
                    # Wait for the nav tabs to load first
                    page.wait_for_selector("#stories-tab", timeout=30000)
                    stories_tab = page.locator("#stories-tab").first
                    if not stories_tab.is_visible():
                        self.log("Stories tab not found", "error")
                        return 0

                    # Click and wait for AJAX navigation
                    stories_tab.click()
                    self.log("Waiting for Stories AJAX content to load...")
                    # Wait for the stories tab to become active
                    page.wait_for_selector("#stories-tab.active", timeout=10000)
                    # Wait for the stories content div to be visible
                    page.wait_for_selector("#stories.active", timeout=10000)
                    # Wait a bit more for AJAX to populate content
                    page.wait_for_timeout(3000)

                    # Verify stories cards loaded
                    try:
                        page.wait_for_selector("#stories .download-card", timeout=30000)  # 30 seconds for AJAX
                        download_cards_count = len(page.locator("#stories .download-card").all())
                        self.log(f"Found {download_cards_count} download cards in Stories tab", "debug")
                        self.log("Stories tab loaded successfully")
                    except PlaywrightTimeout:
                        # Check if we hit Toolzu's download limit
                        if not self._check_if_login_needed(page):
                            self.log("No stories found in Stories tab (or loading timed out)", "warning")
                            return 0
                        self.log("Download limit reached, attempting auto-login...", "info")
                        if not self.login(page, context):
                            self.log("Auto-login failed, cannot continue", "error")
                            return 0
                        # Login successful: go back to the profile page and
                        # fall through to the download step.
                        # NOTE(review): the username form is not resubmitted
                        # after this navigation, so the download step below
                        # will probably find no cards - confirm and complete
                        # the retry.
                        self.log("Retrying download after login...")
                        page.goto(self.toolzu_url, wait_until="domcontentloaded", timeout=30000)
                except Exception as e:
                    self.log(f"Failed to click Stories tab: {e}", "error")
                    return 0
            else:
                # For posts, wait for content to load
                try:
                    page.wait_for_selector(".download-card", timeout=120000)  # 2 minutes for reCAPTCHA
                    self.log("Content loaded successfully")
                except PlaywrightTimeout:
                    self.log("Timeout waiting for content (reCAPTCHA may have failed)", "warning")
                    # Check if there's an actual error message
                    error_msg = page.locator(".alert-danger, .error-message, .alert-warning").first
                    if error_msg.is_visible():
                        error_text = error_msg.inner_text()
                        self.log(f"Error on page: {error_text}", "error")

            # Download whatever the page (or the selected tab) now shows
            success_count = self._download_content(page, context)
        except Exception as e:
            self.log(f"Error: {e}", "error")
        finally:
            # Save cookies on success and on error alike (to preserve session)
            try:
                self._save_cookies(context)
            except Exception as e:
                self.log(f"Failed to save cookies during cleanup: {e}", "debug")
            # Close context and browser independently so that a failure in
            # one does not prevent the other from being attempted
            for closer in (context.close, browser.close):
                try:
                    closer()
                except Exception as e:
                    self.log(f"Error while closing browser resources: {e}", "debug")
            self.log("Browser closed", "debug")

    return success_count
# Note: _navigate_to_content_type() was removed - it is no longer needed.
# Toolzu serves posts and stories from one profile page with tabs;
# _run_download() clicks the Stories tab inline when content_type is 'stories'.
def _download_content(self, page, context):
    """Download content from the page

    Collects the download links from all cards first, then fetches each file
    through the browser context's request API, skipping anything already
    known from this session, the database, or the file-hash blacklist.

    Args:
        page: Playwright page showing the Toolzu results
        context: Browser context used for the HTTP downloads

    Returns:
        Number of files saved and recorded
    """
    success_count = 0

    # Determine the correct selector based on content type
    if self.content_type == 'stories':
        # Only look in the Stories tab content
        card_selector = "#stories .download-card"
        self.log("Looking for stories in #stories tab...")
    else:
        # Look in the default Photos & videos tab
        card_selector = ".download-card"

    # Scroll to load all content
    self.log("Scrolling to load all content...")
    self._scroll_to_load_content(page, card_selector)

    # Find all download cards
    download_cards = page.locator(card_selector).all()
    if not download_cards:
        self.log("No download cards found")
        return 0
    self.log(f"Found {len(download_cards)} items to download")

    # Extract all download info BEFORE starting downloads
    # (downloading can change page state and invalidate element references)
    download_items = []
    for card_number, card in enumerate(download_cards, 1):
        try:
            download_link = card.locator("a[download]").first
            if not download_link.is_visible():
                continue
            download_url = download_link.get_attribute("href")
            if not download_url:
                continue
            media_id = self._extract_media_id_from_url(download_url)
            if not media_id:
                continue
            download_items.append({
                'download_url': download_url,
                'media_id': media_id,
                'index': card_number
            })
        except Exception as e:
            self.log(f"Error extracting info from card {card_number}: {e}", "debug")
            continue

    if not download_items:
        self.log("No valid download links found")
        return 0
    self.log(f"Extracted {len(download_items)} valid download links")

    # Limit downloads (default 15 for daily checks)
    if self.max_downloads:
        download_items = download_items[:self.max_downloads]
        self.log(f"Limited to {len(download_items)} items")
    elif len(download_items) > 15:
        # Default limit: only check 15 most recent posts
        download_items = download_items[:15]
        self.log(f"Limited to {len(download_items)} items (default for frequent checks)")

    total = len(download_items)

    # Set initial progress so dashboard shows 0/N immediately
    self.activity_manager.update_status(
        f"Downloading {self.content_type}",
        progress_current=0,
        progress_total=total
    )

    # Now download each item. `position` is the 1-based place in the final
    # queue, so log labels and the "last item" check stay correct even when
    # cards were skipped during extraction.
    for position, item in enumerate(download_items, 1):
        download_url = item['download_url']
        media_id = item['media_id']
        label = f"[{position}/{total}]"

        # Update progress at start of each iteration (fires even on skips)
        self.activity_manager.update_status(
            f"Downloading {self.content_type}",
            progress_current=position,
            progress_total=total
        )

        try:
            # Check for duplicates - check both original and normalized media ID
            normalized_media_id = extract_instagram_media_id(media_id)
            if media_id in self.downloaded_files or normalized_media_id in self.downloaded_files:
                self.log(f"{label} Skipping duplicate (session): {media_id}")
                continue
            if self._is_already_downloaded(media_id) or (
                    normalized_media_id != media_id
                    and self._is_already_downloaded(normalized_media_id)):
                self.log(f"{label} Skipping duplicate (database): {media_id}")
                self.downloaded_files.add(media_id)
                self.downloaded_files.add(normalized_media_id)
                continue

            # Determine file extension
            ext = ".jpg" if ".jpg" in download_url else ".mp4" if ".mp4" in download_url else ".jpg"

            # Create filename (no post_date from Toolzu)
            date_str = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = f"{self.profile_name}_{date_str}_{media_id}{ext}"

            # Create username subdirectory for organization
            user_output_dir = self.output_dir / self.profile_name
            user_output_dir.mkdir(parents=True, exist_ok=True)
            filepath = user_output_dir / filename

            # Download file using context.request (avoids stale element and navigation issues)
            try:
                downloaded = False
                max_retries = 2
                for attempt in range(max_retries):
                    last_attempt = attempt == max_retries - 1
                    try:
                        response = context.request.get(download_url, timeout=60000)  # 60 second timeout
                        if response.ok:
                            # Save the downloaded content
                            with open(filepath, 'wb') as f:
                                f.write(response.body())
                            downloaded = True
                            break
                        if last_attempt:
                            self.log(f"{label} Download failed: HTTP {response.status}", "error")
                        else:
                            self.log(f"{label} HTTP {response.status}, retrying...", "warning")
                            time.sleep(3)
                    except Exception as retry_error:
                        if last_attempt:
                            raise
                        self.log(f"{label} Download error, retrying: {retry_error}", "warning")
                        time.sleep(3)

                # Nothing was written: skip this item entirely so it is not
                # recorded or counted as a success
                if not downloaded:
                    continue

                # Check for duplicate hash before recording (hash blacklist persists even if original deleted)
                file_hash = self.db.get_file_hash(str(filepath)) if self.db else None
                if file_hash:
                    existing = self.db.get_download_by_file_hash(file_hash)
                    if existing and existing.get('file_path') and str(filepath) != existing.get('file_path'):
                        # Duplicate hash found - content was already downloaded (prevents redownload of deleted content)
                        self.log(f"⚠ Duplicate content detected (hash match): {filename} matches {existing.get('filename')} from {existing.get('platform')}/{existing.get('source')}", "warning")
                        # Delete the duplicate regardless of whether original file still exists
                        try:
                            filepath.unlink()
                            self.log(f"Deleted duplicate (hash blacklist): {filename}", "debug")
                            continue
                        except Exception as e:
                            self.log(f"Failed to delete duplicate {filename}: {e}", "warning")

                # Record in database with normalized media_id for cross-module detection
                self._record_download(
                    media_id=normalized_media_id,
                    username=self.profile_name,
                    content_type=self.content_type,
                    filename=str(filepath),
                    download_url=download_url,
                    post_date=None,
                    metadata={'resolution': '1920x1440'},
                    deferred=self.defer_database
                )
                self.downloaded_files.add(media_id)
                self.downloaded_files.add(normalized_media_id)
                success_count += 1
                self.log(f"✓ {label} Saved: {filename}", "success")

                # Smart delay between downloads (not after the last one)
                if position < total:
                    self._smart_delay()
            except PlaywrightTimeout:
                self.log(f"{label} Download timeout", "error")
                continue
            except Exception as e:
                self.log(f"{label} Download error: {e}", "error")
                continue
        except Exception as e:
            self.log(f"{label} Error processing item: {e}", "error")
            continue

    return success_count
def _scroll_to_load_content(self, page, card_selector=".download-card"):
"""Scroll to load all lazy-loaded content"""
no_change_count = 0
max_scrolls = 15
for scroll_set in range(max_scrolls):
old_count = len(page.locator(card_selector).all())
# Slow, gradual scrolling
for small_scroll in range(5):
page.evaluate("window.scrollBy(0, 200)")
page.wait_for_timeout(500)
page.wait_for_timeout(2000)
new_count = len(page.locator(card_selector).all())
if new_count > old_count:
self.log(f"Loaded more items: {old_count}{new_count}", "debug")
no_change_count = 0
else:
no_change_count += 1
if no_change_count >= 3:
self.log("No more content loading", "debug")
break
def download_instagram_content(username, content_type="posts", output_dir="downloads",
                               use_database=True, **kwargs):
    """
    Convenience wrapper: build a headless ToolzuDownloader and run one download.

    Args:
        username: Instagram username
        content_type: 'posts' or 'stories' - ToolzuDownloader.download()
            rejects any other value and returns 0
        output_dir: Where to save files
        use_database: Forwarded to ToolzuDownloader; no database instance is
            passed here, so the downloader turns tracking off
        **kwargs: Extra keyword arguments forwarded to ToolzuDownloader.download()

    Returns:
        Number of downloaded items
    """
    toolzu = ToolzuDownloader(headless=True, use_database=use_database)
    item_count = toolzu.download(username, content_type, output_dir, **kwargs)
    return item_count
if __name__ == "__main__":
    # Example run: download recent posts for a single user
    example_options = {
        "username": "evalongoria",
        "content_type": "posts",
        "output_dir": "test_downloads",
        "days_back": 3,
        "max_downloads": 15,  # Only check 15 most recent (runs every 4 hours)
    }
    count = download_instagram_content(**example_options)
    print(f"\nTotal downloaded: {count} items")