#!/usr/bin/env python3
"""
Comprehensive Perceptual Duplicate Detection Scan

Scans ALL Instagram files from the last 3 days:
- Files in database (even if moved)
- Files in recycle bin
- Files in all locations

Reports what would be considered duplicates WITHOUT actually moving anything.
"""
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent.parent))

from modules.unified_database import UnifiedDatabase
from modules.instagram_perceptual_duplicate_detector import InstagramPerceptualDuplicateDetector

import json
import os
import re
from collections import defaultdict
from datetime import datetime, timedelta

# Media extensions considered when scanning directories on disk.
MEDIA_EXTENSIONS = ('*.mp4', '*.jpg', '*.jpeg', '*.webp', '*.png', '*.heic')

# Instagram filename convention: <source>_<YYYYMMDD>_... — used to recover the
# source account from files found only on disk (compiled once, used per file).
_SOURCE_PATTERN = re.compile(r'^([a-z0-9._]+)_\d{8}')


class DryRunLogger:
    """Logger that captures all messages, echoing only the important levels."""

    def __init__(self):
        self.messages = []  # (level, msg) tuples in arrival order

    def __call__(self, msg, level):
        self.messages.append((level, msg))
        # Only print important messages to reduce clutter
        if level in ['info', 'success', 'warning', 'error']:
            print(f"[{level.upper()}] {msg}")


def _source_from_filename(filename):
    """Best-effort extraction of the source account from an Instagram filename.

    Returns 'unknown' when the name does not match the source_YYYYMMDD pattern.
    """
    match = _SOURCE_PATTERN.match(filename.lower())
    return match.group(1) if match else 'unknown'


def _scan_directory(base, cutoff_time, all_files, location, require_instagram=False):
    """Recursively scan *base* for recent media files, adding them to *all_files*.

    Args:
        base: Path to scan (silently skipped if it does not exist).
        cutoff_time: POSIX timestamp; only files modified strictly after it count.
        all_files: dict keyed by absolute path; existing entries are never
            overwritten, so database records take precedence over disk scans.
        location: label stored in each new record (e.g. 'recycle_bin', 'immich').
        require_instagram: if True, only paths containing 'instagram' count.

    Returns:
        Number of matching files seen (counted even if already in all_files,
        matching the original per-section tallies).
    """
    found = 0
    if not base.exists():
        return found
    for ext in MEDIA_EXTENSIONS:
        for file_path in base.rglob(ext):
            stat = file_path.stat()  # stat once; reused for filter and timestamp
            if stat.st_mtime <= cutoff_time:
                continue
            if require_instagram and 'instagram' not in str(file_path).lower():
                continue
            found += 1
            file_path_str = str(file_path)
            if file_path_str not in all_files:
                all_files[file_path_str] = {
                    'filename': file_path.name,
                    'source': _source_from_filename(file_path.name),
                    'file_hash': None,  # unknown for files discovered on disk
                    'file_path': file_path_str,
                    'download_date': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
                    'content_type': 'unknown',
                    'location': location,
                }
    return found


def get_all_instagram_files(db, days=3):
    """Collect Instagram files from the database, recycle bin and immich dirs.

    Args:
        db: UnifiedDatabase instance providing get_connection().
        days: look-back window in days.

    Returns:
        List of file-record dicts, deduplicated by file path.
    """
    print("Collecting all Instagram files from multiple sources...")
    print("-" * 80)

    all_files = {}  # path -> record; dict deduplicates across the three sources

    # 1. Database records from the last *days* days.
    print("\n1. Scanning database records...")
    with db.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT filename, source, file_path, file_hash, download_date, content_type
            FROM downloads
            WHERE platform = 'instagram'
            AND download_date > datetime('now', ?)
            AND file_path IS NOT NULL
            AND file_path NOT LIKE '%_phrase_checked_%'
            AND file_path NOT LIKE '%_old_post_%'
            AND file_path NOT LIKE '%_skipped%'
            ORDER BY source, download_date
        """, (f'-{days} days',))

        db_files = 0
        existing_db_files = 0
        for row in cursor.fetchall():
            db_files += 1
            file_path = row[2]
            # Only keep records whose file still exists on disk.
            if file_path and Path(file_path).exists():
                existing_db_files += 1
                all_files[file_path] = {
                    'filename': row[0],
                    'source': row[1],
                    'file_path': file_path,
                    'file_hash': row[3],
                    'download_date': row[4],
                    'content_type': row[5] or 'unknown',
                    'location': 'database',
                }
    print(f"   Found {db_files} database records, {existing_db_files} files still exist")

    # BUGFIX: cutoff_time was previously assigned inside the recycle-bin
    # existence branch; if /opt/immich/recycle was missing, the immich scan
    # below raised NameError. Compute it unconditionally here.
    cutoff_time = datetime.now().timestamp() - (days * 24 * 60 * 60)

    # 2. Recycle bin directory.
    print("\n2. Scanning recycle bin directory...")
    recycle_files = _scan_directory(
        Path('/opt/immich/recycle'), cutoff_time, all_files, 'recycle_bin')
    print(f"   Found {recycle_files} media files in recycle bin")

    # 3. Immich upload/review directories (Instagram-looking paths only).
    print("\n3. Scanning immich directories...")
    immich_files = 0
    for base_path in ['/opt/immich/upload', '/opt/immich/review']:
        immich_files += _scan_directory(
            Path(base_path), cutoff_time, all_files, 'immich', require_instagram=True)
    print(f"   Found {immich_files} Instagram files in immich directories")

    print()
    print(f"TOTAL UNIQUE FILES TO ANALYZE: {len(all_files)}")
    print("=" * 80)
    print()

    return list(all_files.values())


def main():
    """Run the dry-run duplicate scan and print a full report (no file moves)."""
    print("=" * 80)
    print("COMPREHENSIVE INSTAGRAM PERCEPTUAL DUPLICATE DETECTION - DRY RUN")
    print("=" * 80)
    print()

    # Initialize database
    db_path = Path(__file__).parent.parent / 'database' / 'media_downloader.db'
    db = UnifiedDatabase(str(db_path))

    # Get all files from all sources
    files = get_all_instagram_files(db, days=3)
    if len(files) == 0:
        print("No files to analyze!")
        return

    # Initialize detector
    logger = DryRunLogger()
    detector = InstagramPerceptualDuplicateDetector(
        unified_db=db,
        log_callback=logger
    )

    # Settings (detector actions disabled: this is a dry run)
    settings = {
        'enabled': False,
        'perceptual_hash_threshold': 12,
        'text_detection_enabled': True,
        'clean_score_weight': 3,
        'quality_score_weight': 1,
        'min_text_difference': 5,
    }
    print("Settings:")
    print(f"  - Perceptual hash threshold: {settings['perceptual_hash_threshold']}")
    print(f"  - Clean score weight: {settings['clean_score_weight']}")
    print(f"  - Quality score weight: {settings['quality_score_weight']}")
    print()

    # Per-file analysis: perceptual hash, text overlays, quality metrics.
    print("Analyzing files (this may take a while)...")
    print("-" * 80)
    file_data = []
    processed = 0
    skipped = 0
    for i, file_info in enumerate(files, 1):
        file_path = file_info['file_path']

        # Progress indicator every 50 files
        if i % 50 == 0:
            print(f"Progress: {i}/{len(files)} files processed...")

        # Files we cannot hash (unreadable/unsupported) are skipped entirely.
        phash = detector._calculate_perceptual_hash(file_path)
        if not phash:
            skipped += 1
            continue

        # Detect text overlays (counts toward the "clean" score).
        if settings['text_detection_enabled']:
            text_count, text_chars = detector._detect_text_overlays(file_path)
        else:
            text_count, text_chars = 0, 0

        quality_metrics = detector._get_quality_metrics(file_path)
        clean_score = detector._calculate_clean_score(text_count, text_chars)
        quality_score = detector._calculate_quality_score(quality_metrics)

        file_data.append({
            'file_info': file_info,
            'phash': phash,
            'text_count': text_count,
            'text_chars': text_chars,
            'clean_score': clean_score,
            'quality_score': quality_score,
            'quality_metrics': quality_metrics,
            # Weighted total: clean (no text overlays) dominates over quality.
            'total_score': (clean_score * settings['clean_score_weight'])
                           + (quality_score * settings['quality_score_weight']),
        })
        processed += 1

    print()
    print(f"Analyzed {processed} files successfully, skipped {skipped} files")
    print()
    print("=" * 80)
    print("DUPLICATE DETECTION ANALYSIS")
    print("=" * 80)
    print()

    # Group near-duplicates by pairwise Hamming distance on perceptual hashes.
    # O(n^2) comparisons, restricted to files from the same source account.
    duplicates = []
    processed_indices = set()
    for i, data1 in enumerate(file_data):
        if i in processed_indices:
            continue
        group = [data1]
        for j, data2 in enumerate(file_data[i + 1:], start=i + 1):
            if j in processed_indices:
                continue
            # Same source only
            if data1['file_info']['source'] != data2['file_info']['source']:
                continue
            distance = detector._hamming_distance(data1['phash'], data2['phash'])
            if distance <= settings['perceptual_hash_threshold']:
                group.append(data2)
                processed_indices.add(j)
        if len(group) > 1:
            # Sort by total score (highest first) — the winner is group[0].
            group.sort(key=lambda x: x['total_score'], reverse=True)
            duplicates.append(group)
        processed_indices.add(i)

    if len(duplicates) == 0:
        print("✅ No perceptual duplicates found!")
        print()
        print("All files are unique or sufficiently different.")
        return

    print(f"Found {len(duplicates)} duplicate group(s):")
    print()
    total_would_remove = 0
    total_size_would_free = 0

    for group_num, group in enumerate(duplicates, 1):
        print(f"\n{'=' * 80}")
        print(f"DUPLICATE GROUP #{group_num}")
        print(f"{'=' * 80}")
        print(f"Source: {group[0]['file_info']['source']}")
        print(f"Files in group: {len(group)}")
        print()

        # Best-scoring file is the keeper; everything else would be removed.
        best = group[0]
        print(f"✅ WOULD KEEP:")
        print(f"   File: {Path(best['file_info']['file_path']).name}")
        print(f"   Location: {best['file_info']['location']}")
        print(f"   Path: {best['file_info']['file_path']}")
        print(f"   Clean score: {best['clean_score']:.1f}/100 ({best['text_count']} text regions)")
        print(f"   Quality score: {best['quality_score']:.1f}/100 ({best['quality_metrics']['width']}x{best['quality_metrics']['height']}, {best['quality_metrics']['file_size']/1024/1024:.1f}MB)")
        print(f"   Total score: {best['total_score']:.1f}")
        print()
        print(f"❌ WOULD REMOVE ({len(group)-1} file(s)):")

        for data in group[1:]:
            total_would_remove += 1
            total_size_would_free += data['quality_metrics']['file_size']
            print(f"\n   File: {Path(data['file_info']['file_path']).name}")
            print(f"   Location: {data['file_info']['location']}")
            print(f"   Path: {data['file_info']['file_path']}")
            print(f"   Clean score: {data['clean_score']:.1f}/100 ({data['text_count']} text regions)")
            print(f"   Quality score: {data['quality_score']:.1f}/100 ({data['quality_metrics']['width']}x{data['quality_metrics']['height']}, {data['quality_metrics']['file_size']/1024/1024:.1f}MB)")
            print(f"   Total score: {data['total_score']:.1f}")

            distance = detector._hamming_distance(best['phash'], data['phash'])
            print(f"   Hash distance from best: {distance}")

            # Explain why this file loses to the keeper.
            reasons = []
            if data['clean_score'] < best['clean_score'] - settings['min_text_difference']:
                reasons.append(f"More text overlays ({data['text_count']} vs {best['text_count']})")
            if data['quality_score'] < best['quality_score']:
                reasons.append(f"Lower quality ({data['quality_metrics']['width']}x{data['quality_metrics']['height']} vs {best['quality_metrics']['width']}x{best['quality_metrics']['height']})")
            if data['total_score'] < best['total_score']:
                reasons.append(f"Lower total score ({data['total_score']:.1f} vs {best['total_score']:.1f})")
            if reasons:
                print(f"   Reason(s): {'; '.join(reasons)}")

    print()
    print("=" * 80)
    print("SUMMARY")
    print("=" * 80)
    print(f"Total files analyzed: {processed}")
    print(f"Duplicate groups found: {len(duplicates)}")
    print(f"Files that would be kept: {len(duplicates)}")
    print(f"Files that would be removed: {total_would_remove}")
    print(f"Storage that would be freed: {total_size_would_free / 1024 / 1024:.1f} MB")
    print()
    print("⚠️  NOTE: This is a DRY RUN - no files were actually moved or deleted!")
    print()


if __name__ == '__main__':
    main()