566 lines
21 KiB
Python
566 lines
21 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Authentication Manager for Media Downloader
|
|
Handles user authentication and sessions
|
|
"""
|
|
|
|
import contextlib
import hashlib
import math
import os
import secrets
import sqlite3
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional, Dict, List, Tuple

from jose import jwt, JWTError
from passlib.context import CryptContext
|
|
|
|
# Password hashing
|
|
# Module-wide passlib context used by the hashing and verification helpers
# below; bcrypt is the only scheme configured.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
|
|
|
# JWT Configuration
|
|
def _load_jwt_secret():
    """Return the JWT signing secret, generating and persisting one if needed.

    Lookup order:

    1. The ``.jwt_secret`` file three directories above this module.
    2. The ``JWT_SECRET_KEY`` environment variable.
    3. A freshly generated secret, which is then written to ``.jwt_secret``.

    A value from the file or the environment is only accepted when it is at
    least 32 characters long; anything shorter is ignored.

    Returns:
        str: The secret used to sign and verify tokens.
    """
    import logging

    logger = logging.getLogger(__name__)
    secret_file = Path(__file__).parent.parent.parent / '.jwt_secret'

    # Try the file first. A missing file is the normal first-run case; an
    # unreadable or undecodable one is treated the same way instead of
    # aborting the module import.
    stored_secret = ""
    try:
        with open(secret_file, 'r') as f:
            stored_secret = f.read().strip()
    except FileNotFoundError:
        pass
    except (OSError, UnicodeDecodeError):
        logger.warning("Could not read JWT secret file. Falling back to environment or a new secret.")
    if len(stored_secret) >= 32:
        return stored_secret

    # Fallback to environment variable
    env_secret = os.environ.get("JWT_SECRET_KEY", "")
    if len(env_secret) >= 32:
        return env_secret

    # Generate a cryptographically strong secret (384 bits / 48 bytes)
    new_secret = secrets.token_urlsafe(48)
    try:
        # Create the file with owner-only permissions from the start so the
        # secret is never briefly readable by other users.
        fd = os.open(secret_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w') as f:
            f.write(new_secret)
        # A pre-existing file keeps its old mode on open(); tighten it too.
        os.chmod(secret_file, 0o600)
    except OSError:
        # Continue with the in-memory secret; it will not survive a restart.
        logger.warning(
            "Could not save JWT secret to file. Tokens will be invalidated on restart."
        )

    return new_secret
|
|
|
|
# Key and algorithm used to sign and verify every JWT handled by this module.
SECRET_KEY = _load_jwt_secret()
ALGORITHM = "HS256"

# Token lifetimes, expressed in minutes.
ACCESS_TOKEN_EXPIRE_MINUTES = 24 * 60  # 24 hours (default)
ACCESS_TOKEN_REMEMBER_MINUTES = 30 * 24 * 60  # 30 days (remember me)
|
|
|
|
class AuthManager:
|
|
def __init__(self, db_path: str = None):
    """Configure the manager and make sure its database is usable.

    Args:
        db_path: Path of the SQLite authentication database. When omitted,
            ``database/auth.db`` three directories above this module is
            used.
    """
    fallback_path = Path(__file__).parent.parent.parent / 'database' / 'auth.db'
    self.db_path = str(fallback_path) if db_path is None else db_path

    # Rate limiting settings
    self.login_attempts = {}
    self.max_attempts = 5
    self.lockout_duration = timedelta(minutes=15)

    # Schema must exist before the bootstrap account can be inserted.
    self._init_database()
    self._create_default_user()
|
|
|
|
def _init_database(self):
    """Create the authentication schema if it does not exist yet.

    Ensures the parent directory of ``self.db_path`` exists, creates the
    ``users``, ``backup_codes``, ``sessions``, ``login_attempts`` and
    ``auth_audit`` tables, and adds the ``duo_enabled`` / ``duo_username``
    columns to a ``users`` table created before those columns existed.

    Raises:
        sqlite3.OperationalError: If a schema statement fails for any reason
            other than a migration column already being present.
    """
    Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

    # closing() releases the connection; the inner ``conn`` context commits
    # on success and rolls back on error.
    with contextlib.closing(sqlite3.connect(self.db_path, timeout=30.0)) as conn, conn:
        cursor = conn.cursor()

        # Users table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS users (
                username TEXT PRIMARY KEY,
                password_hash TEXT NOT NULL,
                role TEXT NOT NULL DEFAULT 'viewer',
                email TEXT,
                is_active INTEGER NOT NULL DEFAULT 1,
                totp_secret TEXT,
                totp_enabled INTEGER NOT NULL DEFAULT 0,
                duo_enabled INTEGER NOT NULL DEFAULT 0,
                duo_username TEXT,
                created_at TEXT NOT NULL,
                last_login TEXT,
                preferences TEXT
            )
        """)

        # Add the Duo columns to tables created before they existed. Only
        # the "column already there" failure is expected; anything else
        # (locked database, corrupt file, ...) must surface.
        column_migrations = (
            "ALTER TABLE users ADD COLUMN duo_enabled INTEGER NOT NULL DEFAULT 0",
            "ALTER TABLE users ADD COLUMN duo_username TEXT",
        )
        for statement in column_migrations:
            try:
                cursor.execute(statement)
            except sqlite3.OperationalError as exc:
                if "duplicate column name" not in str(exc).lower():
                    raise

        # Backup codes table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS backup_codes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT NOT NULL,
                code_hash TEXT NOT NULL,
                used INTEGER NOT NULL DEFAULT 0,
                used_at TEXT,
                created_at TEXT NOT NULL,
                FOREIGN KEY (username) REFERENCES users(username) ON DELETE CASCADE
            )
        """)

        # Sessions table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS sessions (
                session_token TEXT PRIMARY KEY,
                username TEXT NOT NULL,
                created_at TEXT NOT NULL,
                expires_at TEXT NOT NULL,
                ip_address TEXT,
                user_agent TEXT,
                FOREIGN KEY (username) REFERENCES users(username) ON DELETE CASCADE
            )
        """)

        # Login attempts table (for rate limiting)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS login_attempts (
                username TEXT PRIMARY KEY,
                attempts INTEGER NOT NULL DEFAULT 0,
                last_attempt TEXT NOT NULL
            )
        """)

        # Audit log
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS auth_audit (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT NOT NULL,
                action TEXT NOT NULL,
                success INTEGER NOT NULL,
                ip_address TEXT,
                details TEXT,
                timestamp TEXT NOT NULL
            )
        """)

        conn.commit()
|
|
|
|
def _create_default_user(self):
    """Create the bootstrap ``admin`` account when the users table is empty.

    The password comes from the ``ADMIN_PASSWORD`` environment variable. If
    that is unset or empty, a random password is generated and written to
    the ``.admin_password`` file three directories above this module.

    The generated password is saved *before* the account is inserted. If it
    cannot be saved, no account is created, so a later start with
    ``ADMIN_PASSWORD`` set can still bootstrap the admin instead of leaving
    an account whose password nobody knows. The password itself is never
    logged.
    """
    import logging
    _logger = logging.getLogger(__name__)

    with contextlib.closing(sqlite3.connect(self.db_path, timeout=30.0)) as conn, conn:
        cursor = conn.cursor()

        cursor.execute("SELECT COUNT(*) FROM users")
        if cursor.fetchone()[0] != 0:
            return

        # Get password from environment or generate secure random one
        default_password = os.environ.get("ADMIN_PASSWORD")
        password_generated = not default_password

        if password_generated:
            # 16 random bytes, URL-safe encoded
            default_password = secrets.token_urlsafe(16)
            password_file = Path(__file__).parent.parent.parent / '.admin_password'
            try:
                # Owner-only permissions from creation, so the password is
                # never briefly readable by other users.
                fd = os.open(password_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
                with os.fdopen(fd, 'w') as f:
                    f.write(f"Admin password (delete after first login):\n{default_password}\n")
                # A pre-existing file keeps its old mode on open(); tighten it too.
                os.chmod(password_file, 0o600)
            except OSError:
                _logger.error(
                    "Could not save generated admin password; default admin user was NOT created. "
                    "Set ADMIN_PASSWORD env var to control password."
                )
                return

        password_hash = pwd_context.hash(default_password)

        cursor.execute("""
            INSERT INTO users (username, password_hash, role, email, created_at, preferences)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (
            'admin',
            password_hash,
            'admin',
            'admin@localhost',
            datetime.now().isoformat(),
            '{"theme": "dark"}'
        ))

        conn.commit()

    if password_generated:
        _logger.info("Created default admin user. Password saved to .admin_password")
    else:
        _logger.info("Created default admin user with password from ADMIN_PASSWORD")
|
|
|
|
def verify_password(self, plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored hash.

    Args:
        plain_password: Password supplied by the user.
        hashed_password: Hash stored for the account.

    Returns:
        True when the password matches the hash. False when it does not, or
        when the stored hash is malformed or of an unrecognised format, so a
        corrupt row fails the check instead of raising into the caller.
    """
    try:
        return pwd_context.verify(plain_password, hashed_password)
    except (ValueError, TypeError):
        # passlib signals unidentifiable hashes and non-string arguments
        # this way; neither can ever be a successful verification.
        return False
|
|
|
|
def get_password_hash(self, password: str) -> str:
    """Return the hash of *password* produced by the module's password context."""
    hashed = pwd_context.hash(password)
    return hashed
|
|
|
|
def create_access_token(self, data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The dict is not modified.
        expires_delta: Lifetime of the token. ``None`` selects the default
            of ``ACCESS_TOKEN_EXPIRE_MINUTES``. A zero or negative delta is
            honoured as given, producing an already-expired token.

    Returns:
        The encoded token, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Compare against None explicitly: timedelta(0) is falsy and would
    # otherwise silently be replaced by the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = data.copy()
    to_encode.update({"exp": datetime.now(timezone.utc) + expires_delta})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
def verify_token(self, token: str) -> Optional[Dict]:
    """Decode *token* and return its claims, or ``None`` if decoding fails.

    The token is decoded with ``SECRET_KEY`` and only ``ALGORITHM`` is
    accepted; any ``JWTError`` raised while decoding yields ``None``.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        claims = None
    return claims
|
|
|
|
def check_rate_limit(self, username: str) -> Tuple[bool, Optional[str]]:
    """Check whether a login attempt for *username* is currently allowed.

    Args:
        username: Account name the attempt is for.

    Returns:
        ``(True, None)`` when the attempt may proceed. ``(False, message)``
        while the account is locked, i.e. it has at least
        ``self.max_attempts`` recorded failures and the last one is more
        recent than ``self.lockout_duration``. Once the lockout has elapsed
        the stored failures are cleared and the attempt is allowed.
    """
    # Read the counter, then release the connection before anything else
    # (the reset helper opens its own connection).
    with contextlib.closing(sqlite3.connect(self.db_path, timeout=30.0)) as conn:
        row = conn.execute("""
            SELECT attempts, last_attempt FROM login_attempts
            WHERE username = ?
        """, (username,)).fetchone()

    if not row:
        return True, None

    attempts, last_attempt_str = row
    if attempts < self.max_attempts:
        return True, None

    time_since = datetime.now() - datetime.fromisoformat(last_attempt_str)
    if time_since < self.lockout_duration:
        remaining = self.lockout_duration - time_since
        # Round up so the message never says "0 minutes" while still locked.
        minutes_left = max(1, math.ceil(remaining.total_seconds() / 60))
        return False, f"Account locked. Try again in {minutes_left} minutes"

    # Reset after lockout period
    self._reset_login_attempts(username)
    return True, None
|
|
|
|
def _reset_login_attempts(self, username: str):
    """Delete the stored failed-login counter for *username*.

    Args:
        username: Account whose ``login_attempts`` row is removed. A
            missing row is not an error.
    """
    # closing() releases the connection; the inner ``conn`` context commits
    # the delete (or rolls back on error).
    with contextlib.closing(sqlite3.connect(self.db_path, timeout=30.0)) as conn, conn:
        conn.execute("DELETE FROM login_attempts WHERE username = ?", (username,))
|
|
|
|
def _record_login_attempt(self, username: str, success: bool):
    """Record the outcome of a login attempt for rate limiting.

    Args:
        username: Account the attempt was made against.
        success: ``True`` clears the stored failure counter; ``False``
            creates the counter at 1 or increments it, and stamps it with
            the current local time.
    """
    # closing() releases the connection; the inner ``conn`` context commits
    # on success and rolls back on error.
    with contextlib.closing(sqlite3.connect(self.db_path, timeout=30.0)) as conn, conn:
        if success:
            # Reset attempts on successful login
            conn.execute("DELETE FROM login_attempts WHERE username = ?", (username,))
        else:
            # Insert the first failure or bump the existing counter in a
            # single statement.
            conn.execute("""
                INSERT INTO login_attempts (username, attempts, last_attempt)
                VALUES (?, 1, ?)
                ON CONFLICT(username) DO UPDATE SET
                    attempts = login_attempts.attempts + 1,
                    last_attempt = EXCLUDED.last_attempt
            """, (username, datetime.now().isoformat()))
|
|
|
|
def authenticate(self, username: str, password: str, ip_address: str = None, remember_me: bool = False) -> Dict:
    """Authenticate a user with username and password.

    Args:
        username: Account name.
        password: Plaintext password supplied by the user.
        ip_address: Client address, stored with audit and session records.
        remember_me: Passed on to session creation to select the longer
            token lifetime.

    Returns:
        On success, the dict produced by ``_create_session``. On failure, a
        dict with ``'success': False`` and an ``'error'`` message; the
        rate-limit failure additionally carries ``'requires_2fa': False``.

    Notes:
        * The password is checked before the account's active flag, so
          "Account is inactive" is only disclosed to a caller who knows the
          correct password.
        * A successful login clears the failed-attempt counter.
    """
    # Check rate limiting
    allowed, error_msg = self.check_rate_limit(username)
    if not allowed:
        return {'success': False, 'error': error_msg, 'requires_2fa': False}

    # Read the account row and release the connection before calling
    # helpers, each of which opens its own connection to write.
    with contextlib.closing(sqlite3.connect(self.db_path, timeout=30.0)) as conn:
        row = conn.execute("""
            SELECT password_hash, role, is_active
            FROM users WHERE username = ?
        """, (username,)).fetchone()

    if not row:
        self._record_login_attempt(username, False)
        self._log_audit(username, 'login_failed', False, ip_address, 'Invalid username')
        return {'success': False, 'error': 'Invalid credentials'}

    password_hash, role, is_active = row

    if not self.verify_password(password, password_hash):
        self._record_login_attempt(username, False)
        self._log_audit(username, 'login_failed', False, ip_address, 'Invalid password')
        return {'success': False, 'error': 'Invalid credentials'}

    if not is_active:
        self._log_audit(username, 'login_failed', False, ip_address, 'Account inactive')
        return {'success': False, 'error': 'Account is inactive'}

    # Password is correct: forget earlier failures so they cannot add up to
    # a lockout later, then create the session (2FA removed).
    self._record_login_attempt(username, True)
    return self._create_session(username, role, ip_address, remember_me)
|
|
|
|
def _create_session(self, username: str, role: str, ip_address: str = None, remember_me: bool = False) -> Dict:
    """Create a new session and return token - matches backup-central format.

    Issues a JWT for the user, stores it in the ``sessions`` table, updates
    the user's ``last_login`` and writes a ``login_success`` audit entry.

    Args:
        username: Account the session belongs to.
        role: Role embedded in the token and echoed in the result.
        ip_address: Client address stored with the session and audit entry.
        remember_me: Use ``ACCESS_TOKEN_REMEMBER_MINUTES`` instead of
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` for the lifetime.

    Returns:
        A dict with ``success``, ``token``, ``sessionId`` and a ``user``
        dict (``username``, ``role``, ``email``, ``preferences``).
        ``preferences`` is always a dict; stored data that is not a valid
        JSON object is reported as ``{}``.
    """
    import json
    import uuid

    # Use longer expiration if remember_me is enabled
    expire_minutes = ACCESS_TOKEN_REMEMBER_MINUTES if remember_me else ACCESS_TOKEN_EXPIRE_MINUTES
    lifetime = timedelta(minutes=expire_minutes)

    with contextlib.closing(sqlite3.connect(self.db_path, timeout=30.0)) as conn, conn:
        cursor = conn.cursor()

        # Get user email and preferences
        cursor.execute("SELECT email, preferences FROM users WHERE username = ?", (username,))
        row = cursor.fetchone()
        email = row[0] if row and row[0] else None

        # Parse preferences JSON; anything that is not a JSON object is
        # ignored rather than handed to the client.
        preferences = {}
        if row and row[1]:
            try:
                parsed = json.loads(row[1])
            except (TypeError, ValueError):
                parsed = None
            if isinstance(parsed, dict):
                preferences = parsed

        token_data = {"sub": username, "role": role, "email": email}
        token = self.create_access_token(token_data, expires_delta=lifetime)

        # Generate session ID
        session_id = str(uuid.uuid4())

        # Derive both timestamps from one clock reading so that
        # expires_at - created_at is exactly the session lifetime.
        created = datetime.now()
        now = created.isoformat()
        expires_at = (created + lifetime).isoformat()

        cursor.execute("""
            INSERT INTO sessions (session_token, username, created_at, expires_at, ip_address)
            VALUES (?, ?, ?, ?, ?)
        """, (token, username, now, expires_at, ip_address))

        # Update last login
        cursor.execute("""
            UPDATE users SET last_login = ? WHERE username = ?
        """, (now, username))

        conn.commit()

    # Written after the session transaction is committed and the connection
    # released; the audit helper uses its own connection.
    self._log_audit(username, 'login_success', True, ip_address)

    # Return structure matching backup-central (with preferences)
    return {
        'success': True,
        'token': token,
        'sessionId': session_id,
        'user': {
            'username': username,
            'role': role,
            'email': email,
            'preferences': preferences
        }
    }
|
|
|
|
def _log_audit(self, username: str, action: str, success: bool, ip_address: str = None, details: str = None):
    """Append one entry to the ``auth_audit`` table.

    Args:
        username: Account the event concerns.
        action: Event name, e.g. ``'login_failed'``.
        success: Outcome of the event; stored as 0 or 1.
        ip_address: Client address, if known.
        details: Optional free-text detail.
    """
    # closing() releases the connection; the inner ``conn`` context commits
    # the insert (or rolls back on error).
    with contextlib.closing(sqlite3.connect(self.db_path, timeout=30.0)) as conn, conn:
        conn.execute("""
            INSERT INTO auth_audit (username, action, success, ip_address, details, timestamp)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (username, action, int(success), ip_address, details, datetime.now().isoformat()))
|
|
|
|
def get_user(self, username: str) -> Optional[Dict]:
    """Get user information.

    Args:
        username: Account to look up.

    Returns:
        ``None`` when the user does not exist, otherwise a dict with
        ``username``, ``role``, ``email``, ``is_active``, ``totp_enabled``,
        ``duo_enabled``, ``duo_username``, ``created_at``, ``last_login``
        and ``preferences``. The three flags are booleans. ``preferences``
        is always a dict; stored data that is not a valid JSON object is
        reported as ``{}``.
    """
    import json

    with contextlib.closing(sqlite3.connect(self.db_path, timeout=30.0)) as conn:
        row = conn.execute("""
            SELECT username, role, email, is_active, totp_enabled, duo_enabled, duo_username, created_at, last_login, preferences
            FROM users WHERE username = ?
        """, (username,)).fetchone()

    if not row:
        return None

    (name, role, email, is_active, totp_enabled, duo_enabled,
     duo_username, created_at, last_login, raw_preferences) = row

    # Parse preferences JSON; anything that is not a JSON object is ignored
    # so callers can rely on getting a dict.
    preferences = {}
    if raw_preferences:
        try:
            parsed = json.loads(raw_preferences)
        except (TypeError, ValueError):
            parsed = None
        if isinstance(parsed, dict):
            preferences = parsed

    return {
        'username': name,
        'role': role,
        'email': email,
        'is_active': bool(is_active),
        'totp_enabled': bool(totp_enabled),
        'duo_enabled': bool(duo_enabled),
        'duo_username': duo_username,
        'created_at': created_at,
        'last_login': last_login,
        'preferences': preferences
    }
|
|
|
|
def verify_session(self, token: str) -> Optional[Dict]:
    """Verify a session token.

    A token is accepted only when all of the following hold:

    * it decodes successfully and carries a ``sub`` claim,
    * a ``sessions`` row exists for this token and that user,
    * the user still exists and is active,
    * the stored ``expires_at`` is a valid timestamp that has not passed.

    Args:
        token: JWT presented by the client.

    Returns:
        The decoded token payload, or ``None`` when the session is not
        valid.
    """
    payload = self.verify_token(token)
    if not payload:
        return None

    username = payload.get("sub")
    if not username:
        return None

    # LEFT JOIN so that a session whose user row is gone still yields a row
    # (with is_active NULL) and is rejected below.
    with contextlib.closing(sqlite3.connect(self.db_path, timeout=30.0)) as conn:
        row = conn.execute("""
            SELECT s.expires_at, u.is_active
            FROM sessions s
            LEFT JOIN users u ON u.username = s.username
            WHERE s.session_token = ? AND s.username = ?
        """, (token, username)).fetchone()

    if not row:
        return None

    expires_at_str, is_active = row

    # Deactivated or deleted accounts must not keep working sessions.
    if not is_active:
        return None

    try:
        expires_at = datetime.fromisoformat(expires_at_str)
    except (TypeError, ValueError):
        # An unreadable expiry cannot be trusted; treat as invalid.
        return None

    if expires_at < datetime.now():
        # Session expired
        return None

    return payload
|
|
|
|
def logout(self, token: str):
    """Logout and invalidate session.

    Args:
        token: Session token to remove from the ``sessions`` table. An
            unknown token is not an error.
    """
    # closing() releases the connection; the inner ``conn`` context commits
    # the delete (or rolls back on error).
    with contextlib.closing(sqlite3.connect(self.db_path, timeout=30.0)) as conn, conn:
        conn.execute("DELETE FROM sessions WHERE session_token = ?", (token,))
|
|
|
|
# ============================================================================
|
|
# DUO 2FA METHODS (Duo Universal Prompt)
|
|
# ============================================================================
|
|
# Duo authentication is now handled via DuoManager using Universal Prompt
|
|
# The flow is: login → redirect to Duo → callback → complete login
|
|
# All Duo operations are delegated to self.duo_manager
|
|
|
|
def change_password(self, username: str, current_password: str, new_password: str, ip_address: str = None) -> Dict:
    """Change user password.

    The current password must verify and the new one must be at least 8
    characters long. On success the stored hash is replaced and every
    existing session of the user is deleted in the same transaction, so
    any previously issued session stops working.

    Args:
        username: Account whose password is changed.
        current_password: The user's present password, for confirmation.
        new_password: The replacement password.
        ip_address: Client address recorded in the audit log.

    Returns:
        ``{'success': True, 'sessions_invalidated': n}`` on success,
        otherwise ``{'success': False, 'error': message}``.
    """
    with contextlib.closing(sqlite3.connect(self.db_path, timeout=30.0)) as conn, conn:
        cursor = conn.cursor()

        # Get current password hash
        cursor.execute("SELECT password_hash FROM users WHERE username = ?", (username,))
        row = cursor.fetchone()

        if not row:
            return {'success': False, 'error': 'User not found'}

        # Verify current password
        if not self.verify_password(current_password, row[0]):
            self._log_audit(username, 'password_change_failed', False, ip_address, 'Invalid current password')
            return {'success': False, 'error': 'Current password is incorrect'}

        # Validate new password (minimum 8 characters)
        if len(new_password) < 8:
            return {'success': False, 'error': 'New password must be at least 8 characters'}

        # Update password
        cursor.execute("""
            UPDATE users SET password_hash = ?
            WHERE username = ?
        """, (self.get_password_hash(new_password), username))

        # Security: Invalidate ALL existing sessions for this user
        # This ensures compromised sessions are revoked when password changes
        cursor.execute("""
            DELETE FROM sessions WHERE username = ?
        """, (username,))
        sessions_invalidated = cursor.rowcount

        conn.commit()

    # Written after the change is committed and the connection released;
    # the audit helper uses its own connection.
    self._log_audit(username, 'password_changed', True, ip_address,
                    f'Invalidated {sessions_invalidated} sessions')

    return {'success': True, 'sessions_invalidated': sessions_invalidated}
|
|
|
|
def update_preferences(self, username: str, preferences: dict) -> Dict:
    """Update user preferences (theme, etc.).

    The given keys are merged over the preferences already stored for the
    user; keys that are not mentioned keep their stored values.

    Args:
        username: Account to update.
        preferences: Keys and values to set. Must be JSON-serialisable.

    Returns:
        ``{'success': True, 'preferences': merged}`` with the full merged
        preferences, or ``{'success': False, 'error': 'User not found'}``.
    """
    import json

    with contextlib.closing(sqlite3.connect(self.db_path, timeout=30.0)) as conn, conn:
        cursor = conn.cursor()

        # Get current preferences
        cursor.execute("SELECT preferences FROM users WHERE username = ?", (username,))
        row = cursor.fetchone()

        if not row:
            return {'success': False, 'error': 'User not found'}

        # Start from the stored preferences. Stored data that is not a
        # valid JSON object (corrupt text, a list, null, ...) is discarded
        # so the merge below always operates on a dict.
        current_prefs = {}
        if row[0]:
            try:
                parsed = json.loads(row[0])
            except (TypeError, ValueError):
                parsed = None
            if isinstance(parsed, dict):
                current_prefs = parsed

        # Merge with existing preferences
        current_prefs.update(preferences)

        # Update preferences
        cursor.execute("""
            UPDATE users SET preferences = ?
            WHERE username = ?
        """, (json.dumps(current_prefs), username))

        conn.commit()

    return {'success': True, 'preferences': current_prefs}
|