Initial commit

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Todd
2026-03-29 22:42:55 -04:00
commit 0d7b2b1aab
389 changed files with 280296 additions and 0 deletions

117
tests/test_all_notifications.py Executable file
View File

@@ -0,0 +1,117 @@
#!/usr/bin/env python3
"""
Test All Notification Types
Shows examples of all different notification formats
"""
import sys
import json
from pathlib import Path
from datetime import datetime
# Add modules to path
sys.path.insert(0, str(Path(__file__).parent))
sys.path.insert(0, str(Path(__file__).parent / 'modules'))
from modules.pushover_notifier import PushoverNotifier
def load_config():
    """Load configuration from settings.json.

    Returns:
        dict: Parsed contents of ``config/settings.json`` located next to
        this script.

    Raises:
        FileNotFoundError: If the settings file does not exist.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    config_path = Path(__file__).parent / 'config' / 'settings.json'
    # Explicit encoding: JSON is UTF-8 by spec; don't rely on the locale default.
    with open(config_path, 'r', encoding='utf-8') as f:
        return json.load(f)
def main():
    """Send one test notification per supported format and report stats.

    Returns:
        int: 0 on completion (used as the process exit code).
    """
    # Hoisted out of the send loop below — the original re-ran the import
    # statement on every iteration.
    import time

    print("Testing All Pushover Notification Formats...")
    print("=" * 60)
    # Load config
    config = load_config()
    pushover_config = config.get('pushover', {})
    # Create notifier
    notifier = PushoverNotifier(
        user_key=pushover_config.get('user_key'),
        api_token=pushover_config.get('api_token'),
        enabled=True,
        default_priority=0
    )
    # One entry per notification format we want to exercise.
    tests = [
        {
            'name': 'Single Story',
            'params': {
                'platform': 'instagram',
                'source': 'evalongoria',
                'content_type': 'story',
                'count': 1
            }
        },
        {
            'name': 'Multiple Stories',
            'params': {
                'platform': 'instagram',
                'source': 'evalongoria',
                'content_type': 'story',
                'count': 5
            }
        },
        {
            'name': 'Multiple Posts with Search',
            'params': {
                'platform': 'instagram',
                'source': 'beautybyelan',
                'content_type': 'post',
                'count': 3,
                'search_term': '@evalongoria, eva longoria'
            }
        },
        {
            'name': 'Multiple Reels',
            'params': {
                'platform': 'instagram',
                'source': 'evalongoria',
                'content_type': 'reel',
                'count': 7
            }
        },
        {
            'name': 'TikTok Videos',
            'params': {
                'platform': 'tiktok',
                'source': 'evalongoria',
                'content_type': 'video',
                'count': 4
            }
        },
        {
            'name': 'Forum Images',
            'params': {
                'platform': 'forum',
                'source': 'HQCelebCorner',
                'content_type': 'image',
                'count': 42,
                'search_term': 'Eva Longoria'
            }
        }
    ]
    print("\nSending test notifications...\n")
    for i, test in enumerate(tests, 1):
        print(f"{i}. {test['name']}...", end=' ')
        success = notifier.notify_download(**test['params'])
        print("" if success else "")
        # Small delay between notifications
        time.sleep(1)
    print(f"\n{'=' * 60}")
    print(f"Final Stats: {notifier.get_stats()}")
    print(f"\nCheck your Pushover app for {len(tests)} notifications!")
    return 0


if __name__ == '__main__':
    sys.exit(main())

View File

@@ -0,0 +1,80 @@
#!/usr/bin/env python3
"""
Test browser reuse across multiple profile downloads
This verifies that Cloudflare challenge is only solved once
"""
import sys
import os
from pathlib import Path
from datetime import datetime
# Add modules directory to path
sys.path.insert(0, str(Path(__file__).parent))
from modules.imginn_module import ImgInnDownloader
def main():
    """Download from two profiles back-to-back through one browser session.

    Confirms the Chromium instance (and its solved Cloudflare challenge)
    is reused instead of being restarted per profile.
    """
    bar = "=" * 70
    print(bar)
    print("Browser Reuse Test - Multiple Profiles")
    print(bar)
    print(f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(bar)
    print()
    print("This test will download from TWO different profiles sequentially")
    print("to verify the browser is reused and Cloudflare doesn't re-challenge.")
    print()
    # Headless browser; database disabled so repeated runs behave the same.
    downloader = ImgInnDownloader(
        api_key="cf57fdb7577ada64d150431d6589c8f4",
        headless=True,
        show_progress=True,
        use_database=False
    )
    # Two different accounts exercised through a single browser session.
    profiles = ["evalongoria", "kimkardashian"]
    try:
        for idx, account in enumerate(profiles, 1):
            print("\n" + bar)
            print(f"PROFILE {idx}/{len(profiles)}: @{account}")
            print(bar)
            results = downloader.download_posts(
                username=account,
                days_back=14,
                max_posts=1
            )
            if not results:
                print(f"⚠️ No new files from @{account} (may have been downloaded already)")
            else:
                print(f"✅ Downloaded {len(results)} file(s) from @{account}")
                for item in results:
                    print(f" - {Path(item).name}")
        print("\n" + bar)
        print("TEST COMPLETE")
        print(bar)
        print()
        print("✅ Browser reuse successful!")
        print(" - Check logs above: Cloudflare should only be solved ONCE")
        print(" - Second profile should say 'Browser already running, reusing...'")
        print()
    except KeyboardInterrupt:
        print("\n\n⚠️ Interrupted by user")
    except Exception as e:
        print(f"\n\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        # Always shut the shared browser down, even after failure/interrupt.
        print("\nCleaning up browser...")
        downloader._stop_browser()
        print("✅ Browser closed")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,46 @@
#!/usr/bin/env python3
"""Test forum notification"""
import sys
import json
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
sys.path.insert(0, str(Path(__file__).parent / 'modules'))
from modules.pushover_notifier import PushoverNotifier
def load_config():
    """Load configuration from config/settings.json next to this script.

    Returns:
        dict: Parsed settings.

    Raises:
        FileNotFoundError: If the settings file does not exist.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    config_path = Path(__file__).parent / 'config' / 'settings.json'
    # Explicit encoding: JSON is UTF-8 by spec; don't rely on the locale default.
    with open(config_path, 'r', encoding='utf-8') as f:
        return json.load(f)
# Load config
config = load_config()
pushover_config = config.get('pushover', {})

# Create notifier
notifier = PushoverNotifier(
    user_key=pushover_config.get('user_key'),
    api_token=pushover_config.get('api_token'),
    enabled=True
)

# Simulate forum download - 12 images from HQCelebCorner
downloads = [
    {'source': 'HQCelebCorner', 'content_type': 'image', 'filename': None}
    for _ in range(12)
]

# Send notification
success = notifier.notify_batch_download(
    platform='forum',
    downloads=downloads,
    search_term='Eva Longoria'
)
print(f"Forum notification sent: {'' if success else ''}")
print(f"Stats: {notifier.get_stats()}")

114
tests/test_image_setting.py Executable file
View File

@@ -0,0 +1,114 @@
#!/usr/bin/env python3
"""Test image attachment setting (enable/disable)"""
import sys
import json
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
sys.path.insert(0, str(Path(__file__).parent / 'modules'))
from modules.pushover_notifier import PushoverNotifier
def load_config():
    """Load configuration from config/settings.json next to this script.

    Returns:
        dict: Parsed settings.

    Raises:
        FileNotFoundError: If the settings file does not exist.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    config_path = Path(__file__).parent / 'config' / 'settings.json'
    # Explicit encoding: JSON is UTF-8 by spec; don't rely on the locale default.
    with open(config_path, 'r', encoding='utf-8') as f:
        return json.load(f)
# Load config
config = load_config()
pushover_config = config.get('pushover', {})

# Test images
test_images = [
    "/opt/immich/md/forums/PicturePub/Teri Hatcher & Eva Longoria - Desperate Housewives S08E14_ Get Out of My Life 2012, 80x/638100294_rere-12.jpg",
    "/opt/immich/md/forums/PicturePub/Teri Hatcher & Eva Longoria - Desperate Housewives S08E14_ Get Out of My Life 2012, 80x/638100582_rere-194.jpg"
]

# Prepare test downloads: three posts cycling through the sample images
downloads = [
    {
        'source': 'evalongoria',
        'content_type': 'post',
        'filename': f'post_{idx}.jpg',
        'file_path': test_images[idx % len(test_images)]
    }
    for idx in range(3)
]

banner = "=" * 60

print(banner)
print("Test 1: Image Thumbnails ENABLED")
print(banner)
# Create notifier with images enabled
notifier1 = PushoverNotifier(
    user_key=pushover_config.get('user_key'),
    api_token=pushover_config.get('api_token'),
    enabled=True,
    include_image=True  # ENABLED
)
print("Sending notification WITH image attachment...")
success1 = notifier1.notify_batch_download(
    platform='instagram',
    downloads=downloads,
    search_term=None
)
print(f"{'' if success1 else ''} Notification sent (with image): {success1}")
print()

print(banner)
print("Test 2: Image Thumbnails DISABLED")
print(banner)
# Create notifier with images disabled
notifier2 = PushoverNotifier(
    user_key=pushover_config.get('user_key'),
    api_token=pushover_config.get('api_token'),
    enabled=True,
    include_image=False  # DISABLED
)
print("Sending notification WITHOUT image attachment...")
success2 = notifier2.notify_batch_download(
    platform='instagram',
    downloads=downloads,
    search_term=None
)
print(f"{'' if success2 else ''} Notification sent (without image): {success2}")
print()

print(banner)
print("Test 3: Loading from Config File")
print(banner)
from modules.pushover_notifier import create_notifier_from_config
notifier3 = create_notifier_from_config(config)
if notifier3:
    print(f"Notifier created from config")
    print(f" - include_image setting: {notifier3.include_image}")
    print(f" - Current config value: {pushover_config.get('include_image', True)}")
    print()
    print("Sending notification using config setting...")
    success3 = notifier3.notify_batch_download(
        platform='instagram',
        downloads=downloads,
        search_term=None
    )
    print(f"{'' if success3 else ''} Notification sent: {success3}")
else:
    print("❌ Failed to create notifier from config")
print()

print(banner)
print("Check your Pushover app!")
print("You should see:")
print(" 1. First notification WITH thumbnail")
print(" 2. Second notification WITHOUT thumbnail")
print(" 3. Third notification based on config setting")
print(banner)

75
tests/test_imginn_headless.py Executable file
View File

@@ -0,0 +1,75 @@
#!/usr/bin/env python3
"""
Test ImgInn with headless Chromium (no display)
"""
import sys
import os
from pathlib import Path
from datetime import datetime
# Add modules directory to path
sys.path.insert(0, str(Path(__file__).parent))
from modules.imginn_module import ImgInnDownloader
def main():
    """Run a one-post download with headless Chromium (no display needed)."""
    sep = "=" * 60
    print(sep)
    print("ImgInn Headless Test - Chromium")
    print(sep)
    print(f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(sep)
    print()
    print("Testing with HEADLESS Chromium (no display needed)")
    print()
    # Get username from command line or use default
    username = sys.argv[1] if len(sys.argv) > 1 else "evalongoria"
    print(f"Testing with username: {username}")
    print()
    # Create downloader with HEADLESS browser
    downloader = ImgInnDownloader(
        api_key="cf57fdb7577ada64d150431d6589c8f4",
        headless=True,  # HEADLESS MODE
        show_progress=True,
        use_database=False
    )
    print(f"Starting headless download test for @{username}...")
    print()
    try:
        # A single post is enough to exercise the whole pipeline.
        downloaded = downloader.download_posts(
            username=username,
            days_back=14,
            max_posts=1
        )
        print("\n" + sep)
        print("RESULTS")
        print(sep)
        if not downloaded:
            print("⚠️ No files downloaded")
            print("This might be normal if posts were already downloaded")
        else:
            print(f"✅ Successfully downloaded {len(downloaded)} file(s)")
            for item in downloaded:
                print(f" - {Path(item).name}")
            print("\n✅ Headless Chromium works!")
    except KeyboardInterrupt:
        print("\n\n⚠️ Interrupted by user")
    except Exception as e:
        print(f"\n\n❌ Error: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()

75
tests/test_imginn_visible.py Executable file
View File

@@ -0,0 +1,75 @@
#!/usr/bin/env python3
"""
Test ImgInn with visible browser to debug Cloudflare issues
"""
import sys
import os
from pathlib import Path
from datetime import datetime
# Add modules directory to path
sys.path.insert(0, str(Path(__file__).parent))
from modules.imginn_module import ImgInnDownloader
def main():
    """Run a one-post download in a visible browser to observe Cloudflare."""
    sep = "=" * 60
    print(sep)
    print("ImgInn Browser Test - Visible Mode (CHROMIUM)")
    print(sep)
    print(f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(sep)
    print()
    print("This will open a visible Chromium browser so you can watch")
    print("the Cloudflare interaction and see how it handles the challenge.")
    print()
    # Get username from command line or use default
    username = sys.argv[1] if len(sys.argv) > 1 else "evalongoria"
    print(f"Testing with username: {username}")
    print()
    # Create downloader with VISIBLE browser
    downloader = ImgInnDownloader(
        api_key="cf57fdb7577ada64d150431d6589c8f4",
        headless=False,  # THIS MAKES THE BROWSER VISIBLE
        show_progress=True,
        use_database=False  # Skip database for testing
    )
    print(f"Starting download test for @{username}...")
    print("Watch the browser window to see Cloudflare behavior!")
    print()
    try:
        # A single post is enough to observe the challenge handling.
        downloaded = downloader.download_posts(
            username=username,
            days_back=14,
            max_posts=1  # Just test with 1 post
        )
        print("\n" + sep)
        print("RESULTS")
        print(sep)
        if not downloaded:
            print("❌ No files downloaded - check what happened in the browser")
        else:
            print(f"✅ Successfully downloaded {len(downloaded)} file(s)")
            for item in downloaded:
                print(f" - {Path(item).name}")
    except KeyboardInterrupt:
        print("\n\n⚠️ Interrupted by user")
    except Exception as e:
        print(f"\n\n❌ Error: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,71 @@
#!/usr/bin/env python3
"""Test Instagram notification with mixed content types"""
import sys
import json
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
sys.path.insert(0, str(Path(__file__).parent / 'modules'))
from modules.pushover_notifier import PushoverNotifier
def load_config():
    """Load configuration from config/settings.json next to this script.

    Returns:
        dict: Parsed settings.

    Raises:
        FileNotFoundError: If the settings file does not exist.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    config_path = Path(__file__).parent / 'config' / 'settings.json'
    # Explicit encoding: JSON is UTF-8 by spec; don't rely on the locale default.
    with open(config_path, 'r', encoding='utf-8') as f:
        return json.load(f)
# Load config
config = load_config()
pushover_config = config.get('pushover', {})

# Create notifier
notifier = PushoverNotifier(
    user_key=pushover_config.get('user_key'),
    api_token=pushover_config.get('api_token'),
    enabled=True
)

# Simulate Instagram download - mixed content from evalongoria
# 5 posts + 3 stories + 2 reels = 10 total
downloads = [
    {'source': 'evalongoria', 'content_type': kind, 'filename': None}
    for kind, how_many in (('post', 5), ('story', 3), ('reel', 2))
    for _ in range(how_many)
]

# Send notification
print("Sending Instagram notification for evalongoria...")
print(f" 5 posts + 3 stories + 2 reels = 10 items")
print()
success = notifier.notify_batch_download(
    platform='instagram',
    downloads=downloads,
    search_term=None
)
print(f"✅ Instagram notification sent: {success}")
print(f"Stats: {notifier.get_stats()}")
print()
print("Check your Pushover app!")

View File

@@ -0,0 +1,327 @@
#!/usr/bin/env python3
"""
Unit tests for Instagram Repost Detector
Run with: python3 -m pytest tests/test_instagram_repost_detector.py -v
"""
import os
import sys
import tempfile
import shutil
from pathlib import Path
from unittest.mock import Mock, MagicMock, patch
# Add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
try:
import pytest
PYTEST_AVAILABLE = True
except ImportError:
PYTEST_AVAILABLE = False
print("pytest not installed - run: pip3 install pytest")
from modules.instagram_repost_detector import InstagramRepostDetector
class TestInstagramRepostDetector:
    """Test suite for InstagramRepostDetector (unit level, database fully mocked)."""

    @pytest.fixture
    def mock_db(self):
        """Create mock database"""
        db = Mock()
        # get_connection is replaced per-test with a context-manager mock.
        db.get_connection = MagicMock()
        return db

    @pytest.fixture
    def detector(self, mock_db):
        """Create detector instance with mock DB"""
        return InstagramRepostDetector(unified_db=mock_db)

    @pytest.fixture
    def temp_dir(self):
        """Create temporary directory for testing"""
        temp = tempfile.mkdtemp()
        yield Path(temp)
        # Teardown: best-effort removal of whatever the test created.
        shutil.rmtree(temp, ignore_errors=True)

    def test_detector_initialization(self, detector):
        """Test detector initializes correctly"""
        assert detector is not None
        assert detector.db is not None
        # Detector is expected to create its temp download dir on init.
        assert detector.temp_download_path.exists()

    def test_extract_username_from_text(self, detector):
        """Test username extraction from text"""
        # Mock pytesseract if available
        try:
            import pytesseract
            with patch('pytesseract.image_to_string') as mock_ocr:
                mock_ocr.return_value = "Check out @testuser's post!"
                # Create dummy image
                from PIL import Image
                with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
                    img = Image.new('RGB', (100, 100), color='white')
                    img.save(f.name)
                    temp_file = f.name
                result = detector._extract_username_from_repost(temp_file)
                os.unlink(temp_file)
                # The trailing "'s" must not be included in the handle.
                assert result == "testuser"
        except ImportError:
            pytest.skip("pytesseract not available")

    def test_extract_multiple_usernames(self, detector):
        """Test extraction when multiple @usernames present"""
        try:
            import pytesseract
            with patch('pytesseract.image_to_string') as mock_ocr:
                mock_ocr.return_value = "@firstuser and @seconduser posted this"
                from PIL import Image
                with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
                    img = Image.new('RGB', (100, 100), color='white')
                    img.save(f.name)
                    temp_file = f.name
                result = detector._extract_username_from_repost(temp_file)
                os.unlink(temp_file)
                # Should return first username found
                assert result == "firstuser"
        except ImportError:
            pytest.skip("pytesseract not available")

    def test_no_username_found(self, detector):
        """Test when no @username is found"""
        try:
            import pytesseract
            with patch('pytesseract.image_to_string') as mock_ocr:
                mock_ocr.return_value = "No username here"
                from PIL import Image
                with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
                    img = Image.new('RGB', (100, 100), color='white')
                    img.save(f.name)
                    temp_file = f.name
                result = detector._extract_username_from_repost(temp_file)
                os.unlink(temp_file)
                assert result is None
        except ImportError:
            pytest.skip("pytesseract not available")

    def test_is_monitored_account(self, detector, mock_db):
        """Test monitored account checking"""
        # Mock database response - user is monitored
        mock_cursor = Mock()
        mock_cursor.fetchone.return_value = (1,)
        mock_conn = Mock()
        mock_conn.cursor.return_value = mock_cursor
        # Make the mock usable as a context manager ("with ... as conn").
        mock_conn.__enter__ = Mock(return_value=mock_conn)
        mock_conn.__exit__ = Mock(return_value=False)
        mock_db.get_connection.return_value = mock_conn
        result = detector._is_monitored_account("testuser")
        assert result is True

    def test_is_not_monitored_account(self, detector, mock_db):
        """Test non-monitored account checking"""
        # Mock database response - user not monitored
        mock_cursor = Mock()
        mock_cursor.fetchone.return_value = None
        mock_conn = Mock()
        mock_conn.cursor.return_value = mock_cursor
        mock_conn.__enter__ = Mock(return_value=mock_conn)
        mock_conn.__exit__ = Mock(return_value=False)
        mock_db.get_connection.return_value = mock_conn
        result = detector._is_monitored_account("randomuser")
        assert result is False

    def test_already_fetched_today(self, detector, mock_db):
        """Test fetch cache checking"""
        # Mock database response - already fetched
        mock_cursor = Mock()
        mock_cursor.fetchone.return_value = ("2025-11-09T10:00:00",)
        mock_conn = Mock()
        mock_conn.cursor.return_value = mock_cursor
        mock_conn.__enter__ = Mock(return_value=mock_conn)
        mock_conn.__exit__ = Mock(return_value=False)
        mock_db.get_connection.return_value = mock_conn
        result = detector._already_fetched_today("testuser")
        assert result is True

    def test_not_fetched_today(self, detector, mock_db):
        """Test fetch cache when not fetched"""
        # Mock database response - not fetched
        mock_cursor = Mock()
        mock_cursor.fetchone.return_value = None
        mock_conn = Mock()
        mock_conn.cursor.return_value = mock_cursor
        mock_conn.__enter__ = Mock(return_value=mock_conn)
        mock_conn.__exit__ = Mock(return_value=False)
        mock_db.get_connection.return_value = mock_conn
        result = detector._already_fetched_today("testuser")
        assert result is False

    def test_perceptual_hash_calculation(self, detector):
        """Test perceptual hash calculation for images"""
        try:
            from PIL import Image
            import imagehash
            # Create test image
            with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
                img = Image.new('RGB', (100, 100), color='red')
                img.save(f.name)
                temp_file = f.name
            hash_result = detector._get_perceptual_hash(temp_file)
            os.unlink(temp_file)
            assert hash_result is not None
            assert isinstance(hash_result, imagehash.ImageHash)
        except ImportError:
            pytest.skip("PIL or imagehash not available")

    def test_perceptual_hash_similarity(self, detector):
        """Test that similar images produce similar hashes"""
        try:
            from PIL import Image, ImageDraw
            import imagehash
            # Create two similar images
            with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f1:
                img1 = Image.new('RGB', (100, 100), color='blue')
                draw = ImageDraw.Draw(img1)
                draw.rectangle([25, 25, 75, 75], fill='white')
                img1.save(f1.name)
                temp_file1 = f1.name
            with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f2:
                img2 = Image.new('RGB', (100, 100), color='blue')
                draw = ImageDraw.Draw(img2)
                draw.rectangle([25, 25, 75, 75], fill='white')
                img2.save(f2.name)
                temp_file2 = f2.name
            hash1 = detector._get_perceptual_hash(temp_file1)
            hash2 = detector._get_perceptual_hash(temp_file2)
            os.unlink(temp_file1)
            os.unlink(temp_file2)
            # Identical images should have distance 0
            distance = hash1 - hash2
            assert distance == 0
        except ImportError:
            pytest.skip("PIL or imagehash not available")

    def test_cleanup_temp_downloads(self, detector, temp_dir):
        """Test cleanup of temporary files"""
        # Create test files
        test_dir = temp_dir / "testuser"
        stories_dir = test_dir / "stories"
        posts_dir = test_dir / "posts"
        stories_dir.mkdir(parents=True)
        posts_dir.mkdir(parents=True)
        # Create dummy files
        file1 = stories_dir / "story1.jpg"
        file2 = stories_dir / "story2.jpg"
        file3 = posts_dir / "post1.jpg"
        file1.touch()
        file2.touch()
        file3.touch()
        # Cleanup, keeping file1
        detector._cleanup_temp_downloads(test_dir, keep_file=str(file1))
        # Check results
        assert file1.exists()  # Should be kept
        assert not file2.exists()  # Should be deleted
        assert not file3.exists()  # Should be deleted

    def test_file_not_found(self, detector):
        """Test handling of non-existent files"""
        # Missing input must be signalled by a None return, not an exception.
        result = detector.check_and_replace_repost("/nonexistent/file.jpg", "testuser")
        assert result is None

    def test_mark_fetched(self, detector, mock_db):
        """Test marking content as fetched"""
        mock_cursor = Mock()
        mock_conn = Mock()
        mock_conn.cursor.return_value = mock_cursor
        mock_conn.__enter__ = Mock(return_value=mock_conn)
        mock_conn.__exit__ = Mock(return_value=False)
        mock_db.get_connection.return_value = mock_conn
        # Should not raise exception
        detector._mark_fetched("testuser", content_count=10)
        # Verify INSERT was called
        assert mock_cursor.execute.called

    def test_record_replacement(self, detector, mock_db):
        """Test recording repost replacement"""
        mock_cursor = Mock()
        mock_conn = Mock()
        mock_conn.cursor.return_value = mock_cursor
        mock_conn.__enter__ = Mock(return_value=mock_conn)
        mock_conn.__exit__ = Mock(return_value=False)
        mock_db.get_connection.return_value = mock_conn
        # Recording reads the last OCR'd username off the detector instance.
        detector.last_original_username = "originaluser"
        # Should not raise exception
        detector._record_repost_replacement(
            repost_path="/path/to/repost.jpg",
            original_path="/path/to/original.jpg",
            replacement_path="/path/to/replacement.jpg"
        )
        # Verify INSERT was called
        assert mock_cursor.execute.called
# Integration-style tests
class TestInstagramRepostDetectorIntegration:
    """Integration tests with real file operations"""

    @pytest.fixture
    def temp_dir(self):
        """Create temporary directory"""
        temp = tempfile.mkdtemp()
        yield Path(temp)
        # Teardown: remove the tree even if the test left files behind.
        shutil.rmtree(temp, ignore_errors=True)

    def test_full_workflow_mock(self, temp_dir):
        """Test full workflow with mocked downloads"""
        # This would test the complete flow:
        # 1. Create repost file with @username
        # 2. OCR extracts username
        # 3. Check if monitored
        # 4. Download content (mocked)
        # 5. Find match via hash
        # 6. Replace file
        # 7. Cleanup temp files
        # Skipped for now - would require extensive mocking
        pytest.skip("Full integration test requires complete setup")
if __name__ == "__main__":
    # Allow running this file directly as well as through pytest.
    if not PYTEST_AVAILABLE:
        print("Please install pytest: pip3 install pytest")
    else:
        pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,257 @@
#!/usr/bin/env python3
"""
Dry-run test of Instagram Perceptual Duplicate Detection
Scans last 3 days of downloads and reports what would be considered duplicates
WITHOUT actually moving or deleting anything.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from modules.unified_database import UnifiedDatabase
from modules.instagram_perceptual_duplicate_detector import InstagramPerceptualDuplicateDetector
import json
from datetime import datetime, timedelta
from collections import defaultdict
class DryRunLogger:
    """Callable log sink that records every message it receives.

    Each call is stored as a ``(level, msg)`` tuple in ``messages`` and
    echoed to stdout with an upper-cased level prefix.
    """

    def __init__(self):
        # Chronological record of (level, msg) tuples.
        self.messages = []

    def __call__(self, msg, level):
        entry = (level, msg)
        self.messages.append(entry)
        print(f"[{level.upper()}] {msg}")
def main():
    """Dry-run scan: report which Instagram files downloaded in the last 3
    days would be treated as perceptual duplicates, without moving or
    deleting anything."""
    print("=" * 80)
    print("INSTAGRAM PERCEPTUAL DUPLICATE DETECTION - DRY RUN")
    print("=" * 80)
    print()
    # Initialize database
    db_path = Path(__file__).parent.parent / 'database' / 'media_downloader.db'
    db = UnifiedDatabase(str(db_path))
    # Get all Instagram files from last 3 days
    with db.get_connection() as conn:
        cursor = conn.cursor()
        # Exclude files already renamed by other pipelines
        # (phrase-check / old-post suffixes).
        cursor.execute("""
            SELECT
                filename,
                source,
                file_path,
                file_hash,
                download_date,
                content_type
            FROM downloads
            WHERE platform = 'instagram'
            AND download_date > datetime('now', '-3 days')
            AND file_path IS NOT NULL
            AND file_path NOT LIKE '%_phrase_checked_%'
            AND file_path NOT LIKE '%_old_post_%'
            ORDER BY source, download_date
        """)
        files = []
        for row in cursor.fetchall():
            if row[2] and Path(row[2]).exists():  # Only include files that exist
                files.append({
                    'filename': row[0],
                    'source': row[1],
                    'file_path': row[2],
                    'file_hash': row[3],
                    'download_date': row[4],
                    'content_type': row[5] or 'unknown'
                })
    print(f"Found {len(files)} Instagram files from last 3 days that exist on disk")
    print()
    if len(files) == 0:
        print("No files to analyze!")
        return
    # Initialize detector
    logger = DryRunLogger()
    detector = InstagramPerceptualDuplicateDetector(
        unified_db=db,
        log_callback=logger
    )
    # Get settings (will use defaults since feature is disabled)
    settings = {
        'enabled': False,
        'perceptual_hash_threshold': 12,
        'text_detection_enabled': True,
        'clean_score_weight': 3,
        'quality_score_weight': 1,
        'min_text_difference': 5
    }
    print(f"Using settings:")
    print(f" - Perceptual hash threshold: {settings['perceptual_hash_threshold']}")
    print(f" - Clean score weight: {settings['clean_score_weight']}")
    print(f" - Quality score weight: {settings['quality_score_weight']}")
    print(f" - Text detection: {'Enabled' if settings['text_detection_enabled'] else 'Disabled'}")
    print()
    # Process each file and collect data
    print("Analyzing files...")
    print("-" * 80)
    file_data = []
    for i, file_info in enumerate(files, 1):
        file_path = file_info['file_path']
        source = file_info['source']
        print(f"\n[{i}/{len(files)}] Processing: {Path(file_path).name}")
        # Calculate perceptual hash
        phash = detector._calculate_perceptual_hash(file_path)
        if not phash:
            print(f" ⚠️ Could not calculate perceptual hash - skipping")
            continue
        # Detect text overlays
        if settings['text_detection_enabled']:
            text_count, text_chars = detector._detect_text_overlays(file_path)
        else:
            text_count, text_chars = 0, 0
        # Get quality metrics
        quality_metrics = detector._get_quality_metrics(file_path)
        # Calculate scores
        clean_score = detector._calculate_clean_score(text_count, text_chars)
        quality_score = detector._calculate_quality_score(quality_metrics)
        # phash appears to be a sliceable hex digest here — TODO confirm
        # against InstagramPerceptualDuplicateDetector._calculate_perceptual_hash.
        print(f" Hash: {phash[:16]}...")
        print(f" Text overlays: {text_count} regions, {text_chars} chars")
        print(f" Resolution: {quality_metrics['width']}x{quality_metrics['height']}")
        print(f" File size: {quality_metrics['file_size'] / 1024 / 1024:.1f} MB")
        print(f" Clean score: {clean_score:.1f}/100")
        print(f" Quality score: {quality_score:.1f}/100")
        print(f" Total score: {(clean_score * settings['clean_score_weight']) + (quality_score * settings['quality_score_weight']):.1f}")
        file_data.append({
            'file_info': file_info,
            'phash': phash,
            'text_count': text_count,
            'text_chars': text_chars,
            'clean_score': clean_score,
            'quality_score': quality_score,
            'quality_metrics': quality_metrics,
            'total_score': (clean_score * settings['clean_score_weight']) + (quality_score * settings['quality_score_weight'])
        })
    print()
    print("=" * 80)
    print("DUPLICATE DETECTION ANALYSIS")
    print("=" * 80)
    print()
    # Find duplicates by comparing hashes
    # Greedy pairwise grouping: each unprocessed file seeds a group and
    # absorbs every later same-source file within the hash threshold.
    duplicates = []
    processed = set()
    for i, data1 in enumerate(file_data):
        if i in processed:
            continue
        group = [data1]
        for j, data2 in enumerate(file_data[i+1:], start=i+1):
            if j in processed:
                continue
            # Same source only
            if data1['file_info']['source'] != data2['file_info']['source']:
                continue
            # Calculate Hamming distance
            distance = detector._hamming_distance(data1['phash'], data2['phash'])
            if distance <= settings['perceptual_hash_threshold']:
                group.append(data2)
                processed.add(j)
        if len(group) > 1:
            # Sort by total score (highest first)
            group.sort(key=lambda x: x['total_score'], reverse=True)
            duplicates.append(group)
            processed.add(i)
    if len(duplicates) == 0:
        print("✅ No perceptual duplicates found!")
        print()
        print("All files are unique or sufficiently different.")
        return
    print(f"Found {len(duplicates)} duplicate group(s):")
    print()
    total_would_remove = 0
    for group_num, group in enumerate(duplicates, 1):
        print(f"\n{'=' * 80}")
        print(f"DUPLICATE GROUP #{group_num}")
        print(f"{'=' * 80}")
        print(f"Source: {group[0]['file_info']['source']}")
        print(f"Files in group: {len(group)}")
        print()
        # Highest total score wins: that file would be kept.
        best = group[0]
        print(f"✅ WOULD KEEP:")
        print(f" File: {Path(best['file_info']['file_path']).name}")
        print(f" Path: {best['file_info']['file_path']}")
        print(f" Clean score: {best['clean_score']:.1f}/100 ({best['text_count']} text regions)")
        print(f" Quality score: {best['quality_score']:.1f}/100 ({best['quality_metrics']['width']}x{best['quality_metrics']['height']}, {best['quality_metrics']['file_size']/1024/1024:.1f}MB)")
        print(f" Total score: {best['total_score']:.1f}")
        print(f" Download date: {best['file_info']['download_date']}")
        print()
        print(f"❌ WOULD REMOVE ({len(group)-1} file(s)):")
        for data in group[1:]:
            total_would_remove += 1
            print(f"\n File: {Path(data['file_info']['file_path']).name}")
            print(f" Path: {data['file_info']['file_path']}")
            print(f" Clean score: {data['clean_score']:.1f}/100 ({data['text_count']} text regions)")
            print(f" Quality score: {data['quality_score']:.1f}/100 ({data['quality_metrics']['width']}x{data['quality_metrics']['height']}, {data['quality_metrics']['file_size']/1024/1024:.1f}MB)")
            print(f" Total score: {data['total_score']:.1f}")
            print(f" Download date: {data['file_info']['download_date']}")
            # Calculate hash distance
            distance = detector._hamming_distance(best['phash'], data['phash'])
            print(f" Hash distance from best: {distance}")
            # Explain why it would be removed
            reasons = []
            if data['clean_score'] < best['clean_score'] - settings['min_text_difference']:
                reasons.append(f"Has more text overlays ({data['text_count']} vs {best['text_count']})")
            if data['quality_score'] < best['quality_score']:
                reasons.append(f"Lower quality ({data['quality_metrics']['width']}x{data['quality_metrics']['height']} vs {best['quality_metrics']['width']}x{best['quality_metrics']['height']})")
            if data['total_score'] < best['total_score']:
                reasons.append(f"Lower total score ({data['total_score']:.1f} vs {best['total_score']:.1f})")
            if reasons:
                print(f" Reason(s): {'; '.join(reasons)}")
    print()
    print("=" * 80)
    print("SUMMARY")
    print("=" * 80)
    print(f"Total files analyzed: {len(file_data)}")
    print(f"Duplicate groups found: {len(duplicates)}")
    print(f"Files that would be kept: {len(duplicates)}")
    print(f"Files that would be removed: {total_would_remove}")
    print()
    print("⚠️ NOTE: This is a DRY RUN - no files were actually moved or deleted!")
    print(" To enable this feature, set 'enabled: true' in Configuration > Instagram Perceptual Duplicate Detection")
    print()


if __name__ == '__main__':
    main()

View File

@@ -0,0 +1,367 @@
#!/usr/bin/env python3
"""
Comprehensive Perceptual Duplicate Detection Scan
Scans ALL Instagram files from last 3 days:
- Files in database (even if moved)
- Files in recycle bin
- Files in all locations
Reports what would be considered duplicates WITHOUT actually moving anything.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from modules.unified_database import UnifiedDatabase
from modules.instagram_perceptual_duplicate_detector import InstagramPerceptualDuplicateDetector
import json
from datetime import datetime, timedelta
from collections import defaultdict
import os
class DryRunLogger:
    """Callable log sink that records everything but only echoes key levels."""

    def __init__(self):
        # Chronological record of (level, msg) tuples.
        self.messages = []

    def __call__(self, msg, level):
        self.messages.append((level, msg))
        # Only print important messages to reduce clutter
        important = level in ('info', 'success', 'warning', 'error')
        if important:
            print(f"[{level.upper()}] {msg}")
def get_all_instagram_files(db, days=3):
    """Collect recent Instagram media files from every known location.

    Merges three sources, deduplicated by absolute path:
      1. rows in the downloads table whose file still exists on disk,
      2. media files in the recycle bin (/opt/immich/recycle),
      3. Instagram-looking files under the immich upload/review directories.

    Args:
        db: database object exposing get_connection() (UnifiedDatabase).
        days: how many days back to look (default 3).

    Returns:
        list of dicts with keys filename/source/file_path/file_hash/
        download_date/content_type/location.
    """
    import re  # local import: only needed for filename parsing below

    print("Collecting all Instagram files from multiple sources...")
    print("-" * 80)
    all_files = {}  # keyed by path to deduplicate across the three scans

    # Shared by sections 2 and 3.  Previously this was only assigned inside
    # the recycle-bin branch, so section 3 raised NameError whenever
    # /opt/immich/recycle was absent.
    cutoff_time = datetime.now().timestamp() - (days * 24 * 60 * 60)
    media_patterns = ['*.mp4', '*.jpg', '*.jpeg', '*.webp', '*.png', '*.heic']

    def _source_from_name(filename):
        """Extract the account name from a 'source_YYYYMMDD_...' filename."""
        match = re.match(r'^([a-z0-9._]+)_\d{8}', filename.lower())
        return match.group(1) if match else 'unknown'

    # 1. Get files from database
    print("\n1. Scanning database records...")
    with db.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT
                filename,
                source,
                file_path,
                file_hash,
                download_date,
                content_type
            FROM downloads
            WHERE platform = 'instagram'
            AND download_date > datetime('now', ?)
            AND file_path IS NOT NULL
            AND file_path NOT LIKE '%_phrase_checked_%'
            AND file_path NOT LIKE '%_old_post_%'
            AND file_path NOT LIKE '%_skipped%'
            ORDER BY source, download_date
        """, (f'-{days} days',))
        db_files = 0
        existing_db_files = 0
        for row in cursor.fetchall():
            db_files += 1
            file_path = row[2]
            # Only keep records whose file is still on disk
            if file_path and Path(file_path).exists():
                existing_db_files += 1
                all_files[file_path] = {
                    'filename': row[0],
                    'source': row[1],
                    'file_path': file_path,
                    'file_hash': row[3],
                    'download_date': row[4],
                    'content_type': row[5] or 'unknown',
                    'location': 'database'
                }
    print(f" Found {db_files} database records, {existing_db_files} files still exist")

    # 2. Scan recycle bin directory
    print("\n2. Scanning recycle bin directory...")
    recycle_path = Path('/opt/immich/recycle')
    recycle_files = 0
    if recycle_path.exists():
        for ext in media_patterns:
            for file_path in recycle_path.rglob(ext):
                # Skip files older than the look-back window
                if file_path.stat().st_mtime <= cutoff_time:
                    continue
                recycle_files += 1
                file_path_str = str(file_path)
                if file_path_str not in all_files:
                    all_files[file_path_str] = {
                        'filename': file_path.name,
                        'source': _source_from_name(file_path.name),
                        'file_path': file_path_str,
                        'file_hash': None,
                        'download_date': datetime.fromtimestamp(file_path.stat().st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
                        'content_type': 'unknown',
                        'location': 'recycle_bin'
                    }
    print(f" Found {recycle_files} media files in recycle bin")

    # 3. Scan immich upload/review directories
    print("\n3. Scanning immich directories...")
    immich_files = 0
    for base_path in ['/opt/immich/upload', '/opt/immich/review']:
        base = Path(base_path)
        if not base.exists():
            continue
        for ext in media_patterns:
            for file_path in base.rglob(ext):
                if file_path.stat().st_mtime <= cutoff_time:
                    continue
                # Only files that look like Instagram content
                if 'instagram' not in str(file_path).lower():
                    continue
                immich_files += 1
                file_path_str = str(file_path)
                if file_path_str not in all_files:
                    all_files[file_path_str] = {
                        'filename': file_path.name,
                        'source': _source_from_name(file_path.name),
                        'file_path': file_path_str,
                        'file_hash': None,
                        'download_date': datetime.fromtimestamp(file_path.stat().st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
                        'content_type': 'unknown',
                        'location': 'immich'
                    }
    print(f" Found {immich_files} Instagram files in immich directories")

    print()
    print(f"TOTAL UNIQUE FILES TO ANALYZE: {len(all_files)}")
    print("=" * 80)
    print()
    return list(all_files.values())
def main():
    """Dry-run driver: scan, score and group recent Instagram files.

    Nothing is moved or deleted; the script only prints what the perceptual
    duplicate detector WOULD keep and remove with the settings below.
    """
    print("=" * 80)
    print("COMPREHENSIVE INSTAGRAM PERCEPTUAL DUPLICATE DETECTION - DRY RUN")
    print("=" * 80)
    print()
    # Initialize database
    db_path = Path(__file__).parent.parent / 'database' / 'media_downloader.db'
    db = UnifiedDatabase(str(db_path))
    # Get all files from all sources
    files = get_all_instagram_files(db, days=3)
    if len(files) == 0:
        print("No files to analyze!")
        return
    # Initialize detector
    logger = DryRunLogger()
    detector = InstagramPerceptualDuplicateDetector(
        unified_db=db,
        log_callback=logger
    )
    # Settings ('enabled': False documents that the production feature is
    # off; this dry run performs the analysis regardless)
    settings = {
        'enabled': False,
        'perceptual_hash_threshold': 12,
        'text_detection_enabled': True,
        'clean_score_weight': 3,
        'quality_score_weight': 1,
        'min_text_difference': 5
    }
    print(f"Settings:")
    print(f" - Perceptual hash threshold: {settings['perceptual_hash_threshold']}")
    print(f" - Clean score weight: {settings['clean_score_weight']}")
    print(f" - Quality score weight: {settings['quality_score_weight']}")
    print()
    # Process each file: hash it, OCR for text overlays, measure quality
    print("Analyzing files (this may take a while)...")
    print("-" * 80)
    file_data = []
    processed = 0
    skipped = 0
    for i, file_info in enumerate(files, 1):
        file_path = file_info['file_path']
        source = file_info['source']  # NOTE(review): currently unused here
        # Progress indicator every 50 files
        if i % 50 == 0:
            print(f"Progress: {i}/{len(files)} files processed...")
        # Calculate perceptual hash; unreadable/unhashable files are skipped
        phash = detector._calculate_perceptual_hash(file_path)
        if not phash:
            skipped += 1
            continue
        # Detect text overlays
        if settings['text_detection_enabled']:
            text_count, text_chars = detector._detect_text_overlays(file_path)
        else:
            text_count, text_chars = 0, 0
        # Get quality metrics
        quality_metrics = detector._get_quality_metrics(file_path)
        # Calculate scores
        clean_score = detector._calculate_clean_score(text_count, text_chars)
        quality_score = detector._calculate_quality_score(quality_metrics)
        file_data.append({
            'file_info': file_info,
            'phash': phash,
            'text_count': text_count,
            'text_chars': text_chars,
            'clean_score': clean_score,
            'quality_score': quality_score,
            'quality_metrics': quality_metrics,
            # Weighted composite used to pick the "best" file in a group
            'total_score': (clean_score * settings['clean_score_weight']) + (quality_score * settings['quality_score_weight'])
        })
        processed += 1
    print()
    print(f"Analyzed {processed} files successfully, skipped {skipped} files")
    print()
    print("=" * 80)
    print("DUPLICATE DETECTION ANALYSIS")
    print("=" * 80)
    print()
    # Find duplicates by comparing hashes (O(n^2) pairwise scan; only files
    # from the same source account are compared)
    duplicates = []
    processed_indices = set()
    for i, data1 in enumerate(file_data):
        if i in processed_indices:
            continue
        group = [data1]
        for j, data2 in enumerate(file_data[i+1:], start=i+1):
            if j in processed_indices:
                continue
            # Same source only
            if data1['file_info']['source'] != data2['file_info']['source']:
                continue
            # Calculate Hamming distance
            distance = detector._hamming_distance(data1['phash'], data2['phash'])
            if distance <= settings['perceptual_hash_threshold']:
                group.append(data2)
                processed_indices.add(j)
        if len(group) > 1:
            # Sort by total score (highest first)
            group.sort(key=lambda x: x['total_score'], reverse=True)
            duplicates.append(group)
        processed_indices.add(i)
    if len(duplicates) == 0:
        print("✅ No perceptual duplicates found!")
        print()
        print("All files are unique or sufficiently different.")
        return
    print(f"Found {len(duplicates)} duplicate group(s):")
    print()
    total_would_remove = 0
    total_size_would_free = 0
    for group_num, group in enumerate(duplicates, 1):
        print(f"\n{'=' * 80}")
        print(f"DUPLICATE GROUP #{group_num}")
        print(f"{'=' * 80}")
        print(f"Source: {group[0]['file_info']['source']}")
        print(f"Files in group: {len(group)}")
        print()
        # group[0] has the highest total score, so it is the keeper
        best = group[0]
        print(f"✅ WOULD KEEP:")
        print(f" File: {Path(best['file_info']['file_path']).name}")
        print(f" Location: {best['file_info']['location']}")
        print(f" Path: {best['file_info']['file_path']}")
        print(f" Clean score: {best['clean_score']:.1f}/100 ({best['text_count']} text regions)")
        print(f" Quality score: {best['quality_score']:.1f}/100 ({best['quality_metrics']['width']}x{best['quality_metrics']['height']}, {best['quality_metrics']['file_size']/1024/1024:.1f}MB)")
        print(f" Total score: {best['total_score']:.1f}")
        print()
        print(f"❌ WOULD REMOVE ({len(group)-1} file(s)):")
        for data in group[1:]:
            total_would_remove += 1
            total_size_would_free += data['quality_metrics']['file_size']
            print(f"\n File: {Path(data['file_info']['file_path']).name}")
            print(f" Location: {data['file_info']['location']}")
            print(f" Path: {data['file_info']['file_path']}")
            print(f" Clean score: {data['clean_score']:.1f}/100 ({data['text_count']} text regions)")
            print(f" Quality score: {data['quality_score']:.1f}/100 ({data['quality_metrics']['width']}x{data['quality_metrics']['height']}, {data['quality_metrics']['file_size']/1024/1024:.1f}MB)")
            print(f" Total score: {data['total_score']:.1f}")
            # Calculate hash distance
            distance = detector._hamming_distance(best['phash'], data['phash'])
            print(f" Hash distance from best: {distance}")
            # Explain why this file lost to the keeper
            reasons = []
            if data['clean_score'] < best['clean_score'] - settings['min_text_difference']:
                reasons.append(f"More text overlays ({data['text_count']} vs {best['text_count']})")
            if data['quality_score'] < best['quality_score']:
                reasons.append(f"Lower quality ({data['quality_metrics']['width']}x{data['quality_metrics']['height']} vs {best['quality_metrics']['width']}x{best['quality_metrics']['height']})")
            if data['total_score'] < best['total_score']:
                reasons.append(f"Lower total score ({data['total_score']:.1f} vs {best['total_score']:.1f})")
            if reasons:
                print(f" Reason(s): {'; '.join(reasons)}")
        print()
    print("=" * 80)
    print("SUMMARY")
    print("=" * 80)
    print(f"Total files analyzed: {processed}")
    print(f"Duplicate groups found: {len(duplicates)}")
    print(f"Files that would be kept: {len(duplicates)}")
    print(f"Files that would be removed: {total_would_remove}")
    print(f"Storage that would be freed: {total_size_would_free / 1024 / 1024:.1f} MB")
    print()
    print("⚠️ NOTE: This is a DRY RUN - no files were actually moved or deleted!")
    print()
# Script entry point: run the comprehensive dry-run scan.
if __name__ == '__main__':
    main()

View File

@@ -0,0 +1,285 @@
#!/usr/bin/env python3
"""
Perceptual Duplicate Detection with Proper Source Mapping
Maps UUID filenames from recycle bin back to original Instagram sources
using the media-downloader's recycle_bin database.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from modules.unified_database import UnifiedDatabase
from modules.instagram_perceptual_duplicate_detector import InstagramPerceptualDuplicateDetector
import sqlite3
import json
from datetime import datetime
from collections import defaultdict
import re
def get_source_mapping(backup_db_path, days=3):
    """Map UUID recycle-bin paths back to their original Instagram sources.

    Reads the media-downloader recycle_bin table and parses each original
    filename ('<source>_YYYYMMDD_...') to recover the account name.

    Args:
        backup_db_path: path to the backup cache sqlite database.
        days: look-back window on deleted_at (default 3, matching the
            previously hard-coded '-3 days').

    Returns:
        dict mapping recycle_path -> {'source', 'original_filename',
        'original_path', 'deleted_at'}.
    """
    print("Loading recycle bin source mappings...")
    mapping = {}
    conn = sqlite3.connect(backup_db_path)
    try:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        # Parameterized interval instead of the previous hard-coded literal
        cursor.execute("""
            SELECT
                recycle_path,
                original_filename,
                original_path,
                deleted_at
            FROM recycle_bin
            WHERE deleted_at > datetime('now', ?)
        """, (f'-{days} days',))
        for row in cursor.fetchall():
            original_filename = row['original_filename']
            # Extract source from Instagram filename pattern: source_date_id.ext
            match = re.match(r'^([a-z0-9._]+)_\d{8}', original_filename.lower())
            source = match.group(1) if match else 'unknown'
            mapping[row['recycle_path']] = {
                'source': source,
                'original_filename': original_filename,
                'original_path': row['original_path'],
                'deleted_at': row['deleted_at']
            }
    finally:
        # Always release the connection, even if a query fails
        # (the original leaked it on error).
        conn.close()
    print(f" Mapped {len(mapping)} recycled files to original sources")
    return mapping
def main():
    """Dry-run duplicate analysis with recycle-bin UUIDs mapped to sources.

    Unlike the plain comprehensive scan, recycled files are attributed to
    their original Instagram account via the recycle_bin database, so
    same-source grouping also works for files that were already recycled.
    """
    print("=" * 80)
    print("INSTAGRAM PERCEPTUAL DUPLICATES - WITH SOURCE MAPPING")
    print("=" * 80)
    print()
    # Load source mapping from recycle bin database
    backup_db = Path(__file__).parent.parent / 'data' / 'backup_cache.db'
    source_mapping = get_source_mapping(str(backup_db))
    # Load comprehensive scan results
    db_path = Path(__file__).parent.parent / 'database' / 'media_downloader.db'
    db = UnifiedDatabase(str(db_path))
    # Get all files
    print("\nCollecting Instagram files...")
    all_files = {}  # keyed by path to deduplicate
    # Database files (only those still present on disk)
    with db.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT filename, source, file_path, download_date
            FROM downloads
            WHERE platform = 'instagram'
            AND download_date > datetime('now', '-3 days')
            AND file_path IS NOT NULL
            AND file_path NOT LIKE '%_phrase_checked_%'
            AND file_path NOT LIKE '%_old_post_%'
        """)
        for row in cursor.fetchall():
            if Path(row[2]).exists():
                all_files[row[2]] = {
                    'source': row[1],
                    'filename': row[0],
                    'file_path': row[2],
                    'location': 'active'
                }
    # Recycle bin files with proper source mapping
    recycle_path = Path('/opt/immich/recycle')
    if recycle_path.exists():
        # 3-day modification-time cutoff, mirroring the SQL window above
        cutoff = datetime.now().timestamp() - (3 * 24 * 60 * 60)
        for ext in ['*.mp4', '*.jpg', '*.jpeg', '*.webp', '*.png', '*.heic']:
            for file_path in recycle_path.rglob(ext):
                if file_path.stat().st_mtime > cutoff:
                    file_path_str = str(file_path)
                    # Look up source from mapping (falls back to 'unknown')
                    source_info = source_mapping.get(file_path_str, {})
                    source = source_info.get('source', 'unknown')
                    original_filename = source_info.get('original_filename', file_path.name)
                    all_files[file_path_str] = {
                        'source': source,
                        'filename': original_filename,
                        'file_path': file_path_str,
                        'location': 'recycle_bin'
                    }
    print(f"Total files to analyze: {len(all_files)}")
    print()
    # Initialize detector
    detector = InstagramPerceptualDuplicateDetector(
        unified_db=db,
        log_callback=lambda msg, lvl: None  # Suppress logs
    )
    # Analyze files: hash, OCR text overlays, quality metrics, scores
    print("Analyzing files (this may take a while)...")
    file_data = []
    for i, (path, info) in enumerate(all_files.items(), 1):
        if i % 50 == 0:
            print(f" Progress: {i}/{len(all_files)}...")
        phash = detector._calculate_perceptual_hash(path)
        if not phash:
            continue
        text_count, text_chars = detector._detect_text_overlays(path)
        quality_metrics = detector._get_quality_metrics(path)
        clean_score = detector._calculate_clean_score(text_count, text_chars)
        quality_score = detector._calculate_quality_score(quality_metrics)
        file_data.append({
            'info': info,
            'phash': phash,
            'text_count': text_count,
            'text_chars': text_chars,
            'clean_score': clean_score,
            'quality_score': quality_score,
            'quality_metrics': quality_metrics,
            # Weighted composite: clean (x3) dominates quality (x1)
            'total_score': (clean_score * 3) + (quality_score * 1)
        })
    print(f"Analyzed {len(file_data)} files")
    print()
    # Find duplicates by source
    print("=" * 80)
    print("DUPLICATE DETECTION BY SOURCE")
    print("=" * 80)
    print()
    # Group by source first
    by_source = defaultdict(list)
    for data in file_data:
        by_source[data['info']['source']].append(data)
    # Find duplicates within each source (pairwise Hamming-distance scan)
    duplicate_groups = []
    for source, files in by_source.items():
        # 'unknown' sources are excluded: cross-account grouping is unsafe
        if source == 'unknown' or len(files) < 2:
            continue
        processed = set()
        for i, data1 in enumerate(files):
            if i in processed:
                continue
            group = [data1]
            for j, data2 in enumerate(files[i+1:], start=i+1):
                if j in processed:
                    continue
                distance = detector._hamming_distance(data1['phash'], data2['phash'])
                if distance <= 12:  # threshold
                    group.append(data2)
                    processed.add(j)
            if len(group) > 1:
                # Best candidate first
                group.sort(key=lambda x: x['total_score'], reverse=True)
                duplicate_groups.append((source, group))
            processed.add(i)
    if len(duplicate_groups) == 0:
        print("✅ No duplicates found (excluding 'unknown' sources)")
        return
    # Report by source
    print(f"Found {len(duplicate_groups)} duplicate group(s) across {len(set(s for s, _ in duplicate_groups))} sources")
    print()
    # Group by source for reporting
    by_source_report = defaultdict(list)
    for source, group in duplicate_groups:
        by_source_report[source].append(group)
    total_would_remove = 0
    total_size_freed = 0
    for source in sorted(by_source_report.keys()):
        groups = by_source_report[source]
        print(f"\n{'=' * 80}")
        print(f"SOURCE: @{source}")
        print(f"{'=' * 80}")
        print(f"Duplicate groups: {len(groups)}")
        print()
        for group_num, group in enumerate(groups, 1):
            print(f"\n Group {group_num} ({len(group)} files):")
            print(f" {'-' * 76}")
            best = group[0]
            print(f" ✅ KEEP: {best['info']['filename'][:60]}")
            print(f" Location: {best['info']['location']}")
            print(f" Clean: {best['clean_score']:.0f}/100 ({best['text_count']} text), Quality: {best['quality_score']:.0f}/100")
            print(f" Resolution: {best['quality_metrics']['width']}x{best['quality_metrics']['height']}, Size: {best['quality_metrics']['file_size']/1024/1024:.1f}MB")
            print()
            for data in group[1:]:
                total_would_remove += 1
                total_size_freed += data['quality_metrics']['file_size']
                distance = detector._hamming_distance(best['phash'], data['phash'])
                print(f" ❌ REMOVE: {data['info']['filename'][:60]}")
                print(f" Location: {data['info']['location']}")
                print(f" Clean: {data['clean_score']:.0f}/100 ({data['text_count']} text), Quality: {data['quality_score']:.0f}/100")
                print(f" Hash distance: {distance}")
                # Explain why this file lost to the keeper
                reasons = []
                if data['clean_score'] < best['clean_score'] - 5:
                    reasons.append(f"More text ({data['text_count']} vs {best['text_count']})")
                if data['quality_score'] < best['quality_score']:
                    reasons.append("Lower quality")
                if reasons:
                    print(f" Reason: {', '.join(reasons)}")
                print()
    print()
    print("=" * 80)
    print("SUMMARY BY SOURCE")
    print("=" * 80)
    # Aggregate per-source statistics for the closing tables
    source_stats = defaultdict(lambda: {'groups': 0, 'would_remove': 0})
    for source, group in duplicate_groups:
        source_stats[source]['groups'] += 1
        source_stats[source]['would_remove'] += len(group) - 1
    print()
    for source in sorted(source_stats.keys(), key=lambda s: source_stats[s]['would_remove'], reverse=True):
        stats = source_stats[source]
        print(f" @{source:30s} : {stats['groups']:2d} groups, {stats['would_remove']:3d} files to remove")
    print()
    print("=" * 80)
    print("OVERALL SUMMARY")
    print("=" * 80)
    print(f"Sources with duplicates: {len(source_stats)}")
    print(f"Total duplicate groups: {len(duplicate_groups)}")
    print(f"Files that would be removed: {total_would_remove}")
    print(f"Storage that would be freed: {total_size_freed / 1024 / 1024:.1f} MB")
    print()
# Script entry point: run the source-mapped dry-run analysis.
if __name__ == '__main__':
    main()

118
tests/test_push_with_thumbnail.py Executable file
View File

@@ -0,0 +1,118 @@
#!/usr/bin/env python3
"""Test push notification with image thumbnail attachment"""
import sys
import json
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
sys.path.insert(0, str(Path(__file__).parent / 'modules'))
from modules.pushover_notifier import PushoverNotifier
def load_config():
    """Read and parse config/settings.json located next to this script."""
    settings_file = Path(__file__).parent / 'config' / 'settings.json'
    with settings_file.open('r') as handle:
        return json.load(handle)
# Load config
config = load_config()
pushover_config = config.get('pushover', {})
# Create notifier
# NOTE(review): running this script sends REAL push notifications via the
# Pushover API using the credentials in settings.json.
notifier = PushoverNotifier(
    user_key=pushover_config.get('user_key'),
    api_token=pushover_config.get('api_token'),
    enabled=True
)
# Test 1: Instagram with multiple images
print("=" * 60)
print("Test 1: Instagram Notification with Thumbnail")
print("=" * 60)
# Find some actual images for testing
# NOTE(review): these absolute paths are machine-specific; a thumbnail is
# only attached when they exist on the host running the test.
instagram_images = [
    "/opt/immich/md/forums/PicturePub/Teri Hatcher & Eva Longoria - Desperate Housewives S08E14_ Get Out of My Life 2012, 80x/638100294_rere-12.jpg",
    "/opt/immich/md/forums/PicturePub/Teri Hatcher & Eva Longoria - Desperate Housewives S08E14_ Get Out of My Life 2012, 80x/638100582_rere-194.jpg",
    "/opt/immich/md/forums/PicturePub/Teri Hatcher & Eva Longoria - Desperate Housewives S08E14_ Get Out of My Life 2012, 80x/638100577_rere-191.jpg"
]
# Simulate Instagram download - mixed content
downloads = []
# Add 5 posts
for i in range(5):
    downloads.append({
        'source': 'evalongoria',
        'content_type': 'post',
        'filename': f'post_{i}.jpg',
        'file_path': instagram_images[i % len(instagram_images)]  # Use actual images
    })
# Add 3 stories
for i in range(3):
    downloads.append({
        'source': 'evalongoria',
        'content_type': 'story',
        'filename': f'story_{i}.jpg',
        'file_path': instagram_images[i % len(instagram_images)]
    })
# Add 2 reels
for i in range(2):
    downloads.append({
        'source': 'evalongoria',
        'content_type': 'reel',
        'filename': f'reel_{i}.mp4',
        'file_path': None  # Videos won't be selected as thumbnails
    })
print(f"Sending Instagram notification with thumbnail...")
print(f" - 5 posts + 3 stories + 2 reels = 10 items")
print(f" - Random thumbnail will be selected from images")
print()
success = notifier.notify_batch_download(
    platform='instagram',
    downloads=downloads,
    search_term=None
)
print(f"{'' if success else ''} Instagram notification sent: {success}")
print()
# Test 2: Forum notification with thumbnail
print("=" * 60)
print("Test 2: Forum Notification with Thumbnail")
print("=" * 60)
forum_downloads = []
for i, img_path in enumerate(instagram_images):
    forum_downloads.append({
        'source': 'HQCelebCorner',
        'content_type': 'image',
        'filename': Path(img_path).name,
        'file_path': img_path
    })
print(f"Sending forum notification with thumbnail...")
print(f" - {len(forum_downloads)} images")
print(f" - Search term: Eva Longoria")
print()
success = notifier.notify_batch_download(
    platform='forum',
    downloads=forum_downloads,
    search_term='Eva Longoria'
)
print(f"{'' if success else ''} Forum notification sent: {success}")
print()
# Show stats
print("=" * 60)
print(f"Notification Stats: {notifier.get_stats()}")
print("=" * 60)
print()
print("Check your Pushover app for notifications with thumbnails!")

92
tests/test_pushover.py Executable file
View File

@@ -0,0 +1,92 @@
#!/usr/bin/env python3
"""
Test Pushover Notifications
Sends a test notification to verify credentials and setup
"""
import sys
import json
from pathlib import Path
from datetime import datetime
# Add modules to path
sys.path.insert(0, str(Path(__file__).parent))
sys.path.insert(0, str(Path(__file__).parent / 'modules'))
from modules.pushover_notifier import PushoverNotifier
def load_config():
    """Load configuration from config/settings.json beside this script."""
    settings_file = Path(__file__).parent / 'config' / 'settings.json'
    with settings_file.open('r') as handle:
        return json.load(handle)
def main():
    """Send one real Pushover test notification.

    Returns a shell exit code: 0 on success, 1 when disabled, credentials
    are missing, or the send fails.
    """
    print("Testing Pushover Notifications...")
    print("-" * 50)
    # Load config
    config = load_config()
    pushover_config = config.get('pushover', {})
    # Check if enabled
    if not pushover_config.get('enabled'):
        print("❌ Pushover is disabled in config")
        print(" Set 'enabled': true in config/settings.json")
        return 1
    # Check credentials
    user_key = pushover_config.get('user_key')
    api_token = pushover_config.get('api_token')
    if not user_key or not api_token:
        print("❌ Missing Pushover credentials")
        print(" Add 'user_key' and 'api_token' to config/settings.json")
        return 1
    # Only the first 10 characters are echoed to avoid leaking full secrets
    print(f"✓ Pushover enabled")
    print(f"✓ User key: {user_key[:10]}...")
    print(f"✓ API token: {api_token[:10]}...")
    print()
    # Create notifier
    notifier = PushoverNotifier(
        user_key=user_key,
        api_token=api_token,
        enabled=True,
        default_priority=pushover_config.get('priority', 0),
        device=pushover_config.get('device')
    )
    # Send test notification (a fake 3-story Instagram download)
    print("Sending test notification...")
    success = notifier.notify_download(
        platform='instagram',
        source='evalongoria',
        content_type='story',
        filename='test_story_20251019.mp4',
        count=3,
        metadata={'post_date': datetime.now()}
    )
    if success:
        print("✅ Test notification sent successfully!")
        print()
        print("Check your Pushover app for the notification.")
        print()
        print("Stats:", notifier.get_stats())
        return 0
    else:
        print("❌ Failed to send notification")
        print()
        print("Stats:", notifier.get_stats())
        print()
        print("Possible issues:")
        print(" - Invalid user_key or api_token")
        print(" - No internet connection")
        print(" - Pushover service down")
        return 1
# Propagate main()'s return value as the process exit code so shells/CI
# can detect failures.
if __name__ == '__main__':
    sys.exit(main())

View File

@@ -0,0 +1,309 @@
#!/usr/bin/env python3
"""
Manual Integration Test for Instagram Repost Detector
This script tests the repost detector with real files and can be run manually
to validate the implementation before integrating into the main system.
Usage:
python3 tests/test_repost_detection_manual.py [test_file_path] [source_username]
Example:
python3 tests/test_repost_detection_manual.py \
"/media/.../evalongoria_20251109_154548_story6.mp4" \
"evalongoria"
"""
import sys
import os
from pathlib import Path
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from modules.instagram_repost_detector import InstagramRepostDetector
from modules.unified_database import UnifiedDatabase
def test_dependencies():
    """Verify every third-party dependency of the repost detector is present.

    Probes the Python packages (pytesseract/PIL, opencv-python, imagehash)
    and the tesseract system binary, printing install hints for anything
    missing.  Returns True only when everything is available.
    """
    print("=" * 70)
    print("CHECKING DEPENDENCIES")
    print("=" * 70)

    import importlib

    missing = []
    # (modules to import, success label, failure label, install cmd, tag)
    probes = [
        (("pytesseract", "PIL.Image"), "pytesseract and PIL",
         "pytesseract or PIL", "pip3 install pytesseract pillow",
         "pytesseract/PIL"),
        (("cv2",), "opencv-python", "opencv-python",
         "pip3 install opencv-python", "opencv-python"),
        (("imagehash",), "imagehash", "imagehash",
         "pip3 install imagehash", "imagehash"),
    ]
    for modules, ok_label, fail_label, install_cmd, tag in probes:
        try:
            for module_name in modules:
                importlib.import_module(module_name)
            print(f"✓ {ok_label} installed")
        except ImportError:
            print(f"✗ {fail_label} not installed")
            print(f" Install: {install_cmd}")
            missing.append(tag)

    # The tesseract OCR engine is a system binary, not a Python package,
    # so asking for its version is the only reliable probe.
    try:
        import pytesseract
        pytesseract.get_tesseract_version()
        print("✓ tesseract-ocr binary installed")
    except Exception:
        print("✗ tesseract-ocr binary not installed")
        print(" Install: sudo apt-get install tesseract-ocr tesseract-ocr-eng")
        missing.append("tesseract-ocr")

    print()
    if missing:
        print(f"❌ Missing dependencies: {', '.join(missing)}")
        print("\nPlease install missing dependencies before running tests.")
        return False
    print("✅ All dependencies installed")
    return True
def test_ocr_extraction(file_path: str):
    """Run OCR username extraction against *file_path* and report the result.

    Returns the extracted username string, or None when nothing was found.
    """
    print("\n" + "=" * 70)
    print("TEST 1: OCR USERNAME EXTRACTION")
    print("=" * 70)
    print(f"File: {file_path}")
    # The detector requires a database handle even for pure OCR work.
    detector = InstagramRepostDetector(unified_db=UnifiedDatabase())
    extracted = detector._extract_username_from_repost(file_path)
    if not extracted:
        print("❌ FAILED: No username found")
        return None
    print(f"✅ SUCCESS: Extracted username: @{extracted}")
    return extracted
def test_monitored_check(username: str):
    """Report whether *username* belongs to the monitored accounts list."""
    print("\n" + "=" * 70)
    print("TEST 2: MONITORED ACCOUNT CHECK")
    print("=" * 70)
    print(f"Username: @{username}")
    detector = InstagramRepostDetector(unified_db=UnifiedDatabase())
    monitored = detector._is_monitored_account(username)
    if monitored:
        message = f"✅ @{username} IS monitored (will use normal download path)"
    else:
        message = f" @{username} NOT monitored (will use temp queue)"
    print(message)
    return monitored
def test_perceptual_hash(file_path: str):
    """Compute and report the perceptual hash of *file_path*.

    Returns the hash value, or None when it could not be calculated.
    """
    print("\n" + "=" * 70)
    print("TEST 3: PERCEPTUAL HASH CALCULATION")
    print("=" * 70)
    print(f"File: {file_path}")
    detector = InstagramRepostDetector(unified_db=UnifiedDatabase())
    digest = detector._get_perceptual_hash(file_path)
    if not digest:
        print("❌ FAILED: Could not calculate hash")
        return None
    print(f"✅ SUCCESS: Hash = {digest}")
    return digest
def test_full_detection(file_path: str, source_username: str, dry_run: bool = True):
    """Exercise the complete repost detection / replacement pipeline.

    In dry-run mode (the default) nothing is downloaded; otherwise the
    detector may fetch replacement content from ImgInn.  Returns the
    replacement file path on success, None otherwise.
    """
    print("\n" + "=" * 70)
    print("TEST 4: FULL REPOST DETECTION WORKFLOW")
    print("=" * 70)
    print(f"File: {file_path}")
    print(f"Source: @{source_username}")
    mode_label = 'DRY RUN (no downloads)' if dry_run else 'LIVE (will download)'
    print(f"Mode: {mode_label}")
    if dry_run:
        # Never touch the network in dry-run mode.
        print("\n⚠️ DRY RUN MODE - Will not download content from ImgInn")
        print("To test with actual downloads, run with --live flag")
        return None
    detector = InstagramRepostDetector(unified_db=UnifiedDatabase())
    print("\nStarting detection...")
    replacement = detector.check_and_replace_repost(file_path, source_username)
    if replacement:
        print(f"\n✅ SUCCESS: Repost replaced!")
        print(f"Original file: {file_path}")
        print(f"Replacement file: {replacement}")
        return replacement
    print("\n❌ FAILED: No replacement found")
    print("Possible reasons:")
    print(" - No @username detected in the file")
    print(" - Original content not available")
    print(" - No matching content found via perceptual hash")
    return None
def test_database_tracking():
    """Inspect the repost tracking tables and print their current contents.

    Checks for the repost_fetch_cache and repost_replacements tables in the
    sqlite schema catalog and prints row counts plus the five most recent
    replacements.  Uses a single connection/cursor throughout (the original
    re-opened nested connections for each count query, which was redundant).
    """
    print("\n" + "=" * 70)
    print("TEST 5: DATABASE TRACKING")
    print("=" * 70)
    db = UnifiedDatabase()
    with db.get_connection() as conn:
        cursor = conn.cursor()
        # Existence checks against sqlite's schema catalog
        cursor.execute("""
            SELECT name FROM sqlite_master
            WHERE type='table' AND name='repost_fetch_cache'
        """)
        has_cache = cursor.fetchone() is not None
        cursor.execute("""
            SELECT name FROM sqlite_master
            WHERE type='table' AND name='repost_replacements'
        """)
        has_replacements = cursor.fetchone() is not None
        if has_cache:
            print("✓ repost_fetch_cache table exists")
            cursor.execute("SELECT COUNT(*) FROM repost_fetch_cache")
            count = cursor.fetchone()[0]
            print(f" {count} usernames in cache")
        else:
            print(" repost_fetch_cache table will be created on first use")
        if has_replacements:
            print("✓ repost_replacements table exists")
            cursor.execute("SELECT COUNT(*) FROM repost_replacements")
            count = cursor.fetchone()[0]
            print(f" {count} replacements tracked")
            if count > 0:
                print("\nRecent replacements:")
                cursor.execute("""
                    SELECT repost_source, original_username, repost_filename, detected_at
                    FROM repost_replacements
                    ORDER BY detected_at DESC
                    LIMIT 5
                """)
                for row in cursor.fetchall():
                    print(f" - @{row[0]} reposted from @{row[1]}: {row[2]} ({row[3]})")
        else:
            print(" repost_replacements table will be created on first use")
def main():
    """Manual test runner: run the checks above against a user-supplied file.

    With no arguments, only the dependency and database checks run; with a
    file path (and optional source username) the OCR/hash tests run too.
    The --live flag additionally enables the full download workflow.
    """
    print("\n" + "=" * 70)
    print("INSTAGRAM REPOST DETECTOR - MANUAL TEST SUITE")
    print("=" * 70)
    # Check if test file provided
    if len(sys.argv) < 2:
        print("\nUsage:")
        print(" python3 tests/test_repost_detection_manual.py [file_path] [source_username] [--live]")
        print("\nExamples:")
        print(" # Test with real example file (dry run)")
        print(' python3 tests/test_repost_detection_manual.py \\')
        print(' "/media/.../evalongoria_20251109_154548_story6.mp4" \\')
        print(' "evalongoria"')
        print()
        print(" # Test with actual downloads")
        print(' python3 tests/test_repost_detection_manual.py \\')
        print(' "/media/.../evalongoria_20251109_154548_story6.mp4" \\')
        print(' "evalongoria" \\')
        print(' --live')
        print()
        # Run dependency check and database check only
        deps_ok = test_dependencies()
        if deps_ok:
            test_database_tracking()
        return
    file_path = sys.argv[1]
    source_username = sys.argv[2] if len(sys.argv) >= 3 else "unknown"
    # Any occurrence of --live switches off dry-run mode
    dry_run = "--live" not in sys.argv
    # Validate file exists
    if not os.path.exists(file_path):
        print(f"\n❌ ERROR: File not found: {file_path}")
        return
    # Test 1: Dependencies (abort early if anything is missing)
    deps_ok = test_dependencies()
    if not deps_ok:
        print("\n⚠️ Cannot proceed with tests - missing dependencies")
        return
    # Test 2: OCR extraction
    username = test_ocr_extraction(file_path)
    # Test 3: Monitored check (if username found)
    if username:
        test_monitored_check(username)
    # Test 4: Perceptual hash
    test_perceptual_hash(file_path)
    # Test 5: Database tracking
    test_database_tracking()
    # Test 6: Full detection (only in --live mode; it may download content)
    if not dry_run:
        test_full_detection(file_path, source_username, dry_run=False)
    else:
        print("\n" + "=" * 70)
        print("SKIPPING FULL WORKFLOW TEST (DRY RUN)")
        print("=" * 70)
        print("To test full workflow with actual downloads, add --live flag")
    print("\n" + "=" * 70)
    print("TEST SUITE COMPLETE")
    print("=" * 70)
# Script entry point.
if __name__ == "__main__":
    main()

30
tests/test_toolzu_captcha.sh Executable file
View File

@@ -0,0 +1,30 @@
#!/bin/bash
# Quick test of Toolzu with 2captcha
#
# Writes a throwaway JSON config to /tmp and runs the Toolzu subprocess
# wrapper once with a 3-minute timeout.  "headless": false means a display
# is required unless that flag is changed.
echo "Testing Toolzu Stories download with 2captcha support..."
echo ""
cd /opt/media-downloader
# Create test config
# SECURITY NOTE(review): the 2captcha API key below is committed in plain
# text.  It should be rotated and injected via an environment variable or
# an untracked secrets file instead of living in the repository.
# (The heredoc delimiter is quoted, so nothing inside is shell-expanded.)
cat > /tmp/toolzu_test_config.json << 'INNER_EOF'
{
    "username": "evalongoria",
    "content_type": "stories",
    "temp_dir": "/tmp/toolzu_test",
    "days_back": 3,
    "max_downloads": 5,
    "headless": false,
    "db_path": "/opt/media-downloader/database/media_downloader.db",
    "twocaptcha_api_key": "cf57fdb7577ada64d150431d6589c8f4",
    "cookie_file": "/opt/media-downloader/cookies/toolzu_cookies.json",
    "toolzu_email": "",
    "toolzu_password": ""
}
INNER_EOF
# Run test
timeout 180 ./venv/bin/python ./toolzu_subprocess_wrapper.py < /tmp/toolzu_test_config.json
echo ""
echo "Test complete!"