#!/usr/bin/env python3 """ Authentication Manager for Media Downloader Handles user authentication and sessions """ import os import sqlite3 import secrets import hashlib from datetime import datetime, timedelta, timezone from typing import Optional, Dict, List, Tuple from passlib.context import CryptContext from jose import jwt, JWTError from pathlib import Path # Password hashing pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # JWT Configuration def _load_jwt_secret(): """Load JWT secret from file, environment, or generate new one""" # Try to load from file first secret_file = Path(__file__).parent.parent.parent / '.jwt_secret' if secret_file.exists(): with open(secret_file, 'r') as f: secret = f.read().strip() # Validate secret has minimum length (at least 32 chars) if len(secret) >= 32: return secret # If too short, regenerate # Fallback to environment variable if "JWT_SECRET_KEY" in os.environ: secret = os.environ["JWT_SECRET_KEY"] if len(secret) >= 32: return secret # Generate a cryptographically strong secret (384 bits / 48 bytes) new_secret = secrets.token_urlsafe(48) try: with open(secret_file, 'w') as f: f.write(new_secret) os.chmod(secret_file, 0o600) except Exception: # Log warning but continue - in-memory secret will invalidate on restart import logging logging.getLogger(__name__).warning( "Could not save JWT secret to file. Tokens will be invalidated on restart." ) return new_secret SECRET_KEY = _load_jwt_secret() ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 24 hours (default) ACCESS_TOKEN_REMEMBER_MINUTES = 60 * 24 * 30 # 30 days (remember me) class AuthManager: def __init__(self, db_path: str = None): if db_path is None: db_path = str(Path(__file__).parent.parent.parent / 'database' / 'auth.db') self.db_path = db_path # Rate limiting self.login_attempts = {} self.max_attempts = 5 self.lockout_duration = timedelta(minutes=15) self._init_database() self._create_default_user() def _init_database(self): """Initialize the authentication database schema""" Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) with sqlite3.connect(self.db_path, timeout=30.0) as conn: cursor = conn.cursor() # Users table cursor.execute(""" CREATE TABLE IF NOT EXISTS users ( username TEXT PRIMARY KEY, password_hash TEXT NOT NULL, role TEXT NOT NULL DEFAULT 'viewer', email TEXT, is_active INTEGER NOT NULL DEFAULT 1, totp_secret TEXT, totp_enabled INTEGER NOT NULL DEFAULT 0, duo_enabled INTEGER NOT NULL DEFAULT 0, duo_username TEXT, created_at TEXT NOT NULL, last_login TEXT, preferences TEXT ) """) # Add duo_enabled column to existing table if it doesn't exist try: cursor.execute("ALTER TABLE users ADD COLUMN duo_enabled INTEGER NOT NULL DEFAULT 0") except sqlite3.OperationalError: pass # Column already exists try: cursor.execute("ALTER TABLE users ADD COLUMN duo_username TEXT") except sqlite3.OperationalError: pass # Column already exists # Backup codes table cursor.execute(""" CREATE TABLE IF NOT EXISTS backup_codes ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL, code_hash TEXT NOT NULL, used INTEGER NOT NULL DEFAULT 0, used_at TEXT, created_at TEXT NOT NULL, FOREIGN KEY (username) REFERENCES users(username) ON DELETE CASCADE ) """) # Sessions table cursor.execute(""" CREATE TABLE IF NOT EXISTS sessions ( session_token TEXT PRIMARY KEY, username TEXT NOT NULL, created_at TEXT NOT NULL, expires_at TEXT NOT NULL, ip_address TEXT, user_agent TEXT, FOREIGN KEY (username) REFERENCES users(username) ON DELETE CASCADE ) """) # Login attempts table (for rate limiting) cursor.execute(""" CREATE TABLE IF NOT EXISTS login_attempts ( username TEXT PRIMARY KEY, attempts INTEGER NOT NULL DEFAULT 0, last_attempt TEXT NOT NULL ) """) # Audit log cursor.execute(""" CREATE TABLE IF NOT EXISTS auth_audit ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL, action TEXT NOT NULL, success INTEGER NOT NULL, ip_address TEXT, details TEXT, timestamp TEXT NOT NULL ) """) conn.commit() def _create_default_user(self): """Create default admin user if no users exist""" import logging _logger = logging.getLogger(__name__) with sqlite3.connect(self.db_path, timeout=30.0) as conn: cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM users") if cursor.fetchone()[0] == 0: # Get password from environment or generate secure random one default_password = os.environ.get("ADMIN_PASSWORD") password_generated = False if not default_password: # Generate a secure random password (16 chars, URL-safe) default_password = secrets.token_urlsafe(16) password_generated = True password_hash = pwd_context.hash(default_password) cursor.execute(""" INSERT INTO users (username, password_hash, role, email, created_at, preferences) VALUES (?, ?, ?, ?, ?, ?) """, ( 'admin', password_hash, 'admin', 'admin@localhost', datetime.now().isoformat(), '{"theme": "dark"}' )) conn.commit() # Security: Never log the password - write to secure file instead if password_generated: password_file = Path(__file__).parent.parent.parent / '.admin_password' try: with open(password_file, 'w') as f: f.write(f"Admin password (delete after first login):\n{default_password}\n") os.chmod(password_file, 0o600) _logger.info("Created default admin user. Password saved to .admin_password") except Exception: # If we can't write file, show masked hint only _logger.info("Created default admin user. Set ADMIN_PASSWORD env var to control password.") else: _logger.info("Created default admin user with password from ADMIN_PASSWORD") def verify_password(self, plain_password: str, hashed_password: str) -> bool: """Verify a password against its hash""" return pwd_context.verify(plain_password, hashed_password) def get_password_hash(self, password: str) -> str: """Hash a password""" return pwd_context.hash(password) def create_access_token(self, data: dict, expires_delta: Optional[timedelta] = None) -> str: """Create a JWT access token""" to_encode = data.copy() if expires_delta: expire = datetime.now(timezone.utc) + expires_delta else: expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def verify_token(self, token: str) -> Optional[Dict]: """Verify and decode a JWT token""" try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) return payload except JWTError: return None def check_rate_limit(self, username: str) -> Tuple[bool, Optional[str]]: """Check if user is rate limited""" with sqlite3.connect(self.db_path, timeout=30.0) as conn: cursor = conn.cursor() cursor.execute(""" SELECT attempts, last_attempt FROM login_attempts WHERE username = ? """, (username,)) row = cursor.fetchone() if not row: return True, None attempts, last_attempt_str = row last_attempt = datetime.fromisoformat(last_attempt_str) if attempts >= self.max_attempts: time_since = datetime.now() - last_attempt if time_since < self.lockout_duration: remaining = self.lockout_duration - time_since return False, f"Account locked. Try again in {int(remaining.total_seconds() / 60)} minutes" else: # Reset after lockout period self._reset_login_attempts(username) return True, None return True, None def _reset_login_attempts(self, username: str): """Reset login attempts for a user""" with sqlite3.connect(self.db_path, timeout=30.0) as conn: cursor = conn.cursor() cursor.execute("DELETE FROM login_attempts WHERE username = ?", (username,)) conn.commit() def _record_login_attempt(self, username: str, success: bool): """Record a login attempt""" with sqlite3.connect(self.db_path, timeout=30.0) as conn: cursor = conn.cursor() if success: # Reset attempts on successful login cursor.execute("DELETE FROM login_attempts WHERE username = ?", (username,)) else: # Increment attempts now = datetime.now().isoformat() cursor.execute(""" INSERT INTO login_attempts (username, attempts, last_attempt) VALUES (?, 1, ?) ON CONFLICT(username) DO UPDATE SET attempts = login_attempts.attempts + 1, last_attempt = EXCLUDED.last_attempt """, (username, now)) conn.commit() def authenticate(self, username: str, password: str, ip_address: str = None, remember_me: bool = False) -> Dict: """Authenticate a user with username and password""" # Check rate limiting allowed, error_msg = self.check_rate_limit(username) if not allowed: return {'success': False, 'error': error_msg, 'requires_2fa': False} with sqlite3.connect(self.db_path, timeout=30.0) as conn: cursor = conn.cursor() cursor.execute(""" SELECT password_hash, role, is_active FROM users WHERE username = ? """, (username,)) row = cursor.fetchone() if not row: self._record_login_attempt(username, False) self._log_audit(username, 'login_failed', False, ip_address, 'Invalid username') return {'success': False, 'error': 'Invalid credentials'} password_hash, role, is_active = row if not is_active: self._log_audit(username, 'login_failed', False, ip_address, 'Account inactive') return {'success': False, 'error': 'Account is inactive'} if not self.verify_password(password, password_hash): self._record_login_attempt(username, False) self._log_audit(username, 'login_failed', False, ip_address, 'Invalid password') return {'success': False, 'error': 'Invalid credentials'} # Password is correct, create session (2FA removed) return self._create_session(username, role, ip_address, remember_me) def _create_session(self, username: str, role: str, ip_address: str = None, remember_me: bool = False) -> Dict: """Create a new session and return token - matches backup-central format""" import json import uuid # Create JWT token with sqlite3.connect(self.db_path, timeout=30.0) as conn: cursor = conn.cursor() # Get user email and preferences cursor.execute("SELECT email, preferences FROM users WHERE username = ?", (username,)) row = cursor.fetchone() email = row[0] if row and row[0] else None # Parse preferences JSON preferences = {} if row and row[1]: try: preferences = json.loads(row[1]) except Exception: pass # Use longer expiration if remember_me is enabled expire_minutes = ACCESS_TOKEN_REMEMBER_MINUTES if remember_me else ACCESS_TOKEN_EXPIRE_MINUTES token_data = {"sub": username, "role": role, "email": email} token = self.create_access_token(token_data, expires_delta=timedelta(minutes=expire_minutes)) # Generate session ID sessionId = str(uuid.uuid4()) now = datetime.now().isoformat() expires_at = (datetime.now() + timedelta(minutes=expire_minutes)).isoformat() cursor.execute(""" INSERT INTO sessions (session_token, username, created_at, expires_at, ip_address) VALUES (?, ?, ?, ?, ?) """, (token, username, now, expires_at, ip_address)) # Update last login cursor.execute(""" UPDATE users SET last_login = ? WHERE username = ? """, (now, username)) conn.commit() self._log_audit(username, 'login_success', True, ip_address) # Return structure matching backup-central (with preferences) return { 'success': True, 'token': token, 'sessionId': sessionId, 'user': { 'username': username, 'role': role, 'email': email, 'preferences': preferences } } def _log_audit(self, username: str, action: str, success: bool, ip_address: str = None, details: str = None): """Log an audit event""" with sqlite3.connect(self.db_path, timeout=30.0) as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO auth_audit (username, action, success, ip_address, details, timestamp) VALUES (?, ?, ?, ?, ?, ?) """, (username, action, int(success), ip_address, details, datetime.now().isoformat())) conn.commit() def get_user(self, username: str) -> Optional[Dict]: """Get user information""" import json with sqlite3.connect(self.db_path, timeout=30.0) as conn: cursor = conn.cursor() cursor.execute(""" SELECT username, role, email, is_active, totp_enabled, duo_enabled, duo_username, created_at, last_login, preferences FROM users WHERE username = ? """, (username,)) row = cursor.fetchone() if not row: return None # Parse preferences JSON preferences = {} if row[9]: try: preferences = json.loads(row[9]) except Exception: pass return { 'username': row[0], 'role': row[1], 'email': row[2], 'is_active': bool(row[3]), 'totp_enabled': bool(row[4]), 'duo_enabled': bool(row[5]), 'duo_username': row[6], 'created_at': row[7], 'last_login': row[8], 'preferences': preferences } def verify_session(self, token: str) -> Optional[Dict]: """Verify a session token""" payload = self.verify_token(token) if not payload: return None username = payload.get("sub") if not username: return None # Check if session exists and is valid with sqlite3.connect(self.db_path, timeout=30.0) as conn: cursor = conn.cursor() cursor.execute(""" SELECT expires_at FROM sessions WHERE session_token = ? AND username = ? """, (token, username)) row = cursor.fetchone() if not row: return None expires_at = datetime.fromisoformat(row[0]) if expires_at < datetime.now(): # Session expired return None return payload def logout(self, token: str): """Logout and invalidate session""" with sqlite3.connect(self.db_path, timeout=30.0) as conn: cursor = conn.cursor() cursor.execute("DELETE FROM sessions WHERE session_token = ?", (token,)) conn.commit() # ============================================================================ # DUO 2FA METHODS (Duo Universal Prompt) # ============================================================================ # Duo authentication is now handled via DuoManager using Universal Prompt # The flow is: login → redirect to Duo → callback → complete login # All Duo operations are delegated to self.duo_manager def change_password(self, username: str, current_password: str, new_password: str, ip_address: str = None) -> Dict: """Change user password""" with sqlite3.connect(self.db_path, timeout=30.0) as conn: cursor = conn.cursor() # Get current password hash cursor.execute("SELECT password_hash FROM users WHERE username = ?", (username,)) row = cursor.fetchone() if not row: return {'success': False, 'error': 'User not found'} current_hash = row[0] # Verify current password if not self.verify_password(current_password, current_hash): self._log_audit(username, 'password_change_failed', False, ip_address, 'Invalid current password') return {'success': False, 'error': 'Current password is incorrect'} # Validate new password (minimum 8 characters) if len(new_password) < 8: return {'success': False, 'error': 'New password must be at least 8 characters'} # Hash new password new_hash = self.get_password_hash(new_password) # Update password cursor.execute(""" UPDATE users SET password_hash = ? WHERE username = ? """, (new_hash, username)) # Security: Invalidate ALL existing sessions for this user # This ensures compromised sessions are revoked when password changes cursor.execute(""" DELETE FROM sessions WHERE username = ? """, (username,)) sessions_invalidated = cursor.rowcount conn.commit() self._log_audit(username, 'password_changed', True, ip_address, f'Invalidated {sessions_invalidated} sessions') return {'success': True, 'sessions_invalidated': sessions_invalidated} def update_preferences(self, username: str, preferences: dict) -> Dict: """Update user preferences (theme, etc.)""" import json with sqlite3.connect(self.db_path, timeout=30.0) as conn: cursor = conn.cursor() # Get current preferences cursor.execute("SELECT preferences FROM users WHERE username = ?", (username,)) row = cursor.fetchone() if not row: return {'success': False, 'error': 'User not found'} # Merge with existing preferences current_prefs = {} if row[0]: try: current_prefs = json.loads(row[0]) except Exception: pass current_prefs.update(preferences) # Update preferences cursor.execute(""" UPDATE users SET preferences = ? WHERE username = ? """, (json.dumps(current_prefs), username)) conn.commit() return {'success': True, 'preferences': current_prefs}