#!/usr/bin/env python3 """ Passkey Manager for Media Downloader Handles WebAuthn/Passkey operations Based on backup-central's implementation """ import sys import sqlite3 import json from datetime import datetime, timedelta from typing import Optional, Dict, List from pathlib import Path import os # Add parent path to allow imports from modules sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from modules.universal_logger import get_logger logger = get_logger('PasskeyManager') from webauthn import ( generate_registration_options, verify_registration_response, generate_authentication_options, verify_authentication_response, options_to_json ) from webauthn.helpers import base64url_to_bytes, bytes_to_base64url from webauthn.helpers.structs import ( PublicKeyCredentialDescriptor, UserVerificationRequirement, AttestationConveyancePreference, AuthenticatorSelectionCriteria, ResidentKeyRequirement ) class PasskeyManager: def __init__(self, db_path: str = None): if db_path is None: db_path = str(Path(__file__).parent.parent.parent / 'database' / 'auth.db') self.db_path = db_path # WebAuthn Configuration self.rp_name = 'Media Downloader' self.rp_id = self._get_rp_id() self.origin = self._get_origin() # Challenge timeout (5 minutes) self.challenge_timeout = timedelta(minutes=5) # Challenge storage (in-memory for now, could use Redis/DB for production) self.challenges = {} # username → challenge mapping # Initialize database self._init_database() def _get_rp_id(self) -> str: """Get Relying Party ID (domain)""" return os.getenv('WEBAUTHN_RP_ID', 'md.lic.ad') def _get_origin(self) -> str: """Get Origin URL""" return os.getenv('WEBAUTHN_ORIGIN', 'https://md.lic.ad') def _init_database(self): """Initialize passkey-related database tables""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Passkeys credentials table cursor.execute(""" CREATE TABLE IF NOT EXISTS passkey_credentials ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL, credential_id TEXT NOT NULL UNIQUE, public_key TEXT NOT NULL, sign_count INTEGER NOT NULL DEFAULT 0, transports TEXT, device_name TEXT, aaguid TEXT, created_at TEXT NOT NULL, last_used TEXT, FOREIGN KEY (username) REFERENCES users(username) ON DELETE CASCADE ) """) # Passkey audit log cursor.execute(""" CREATE TABLE IF NOT EXISTS passkey_audit_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL, action TEXT NOT NULL, success INTEGER NOT NULL, credential_id TEXT, ip_address TEXT, user_agent TEXT, details TEXT, timestamp TEXT NOT NULL ) """) # Add passkey enabled column to users try: cursor.execute("ALTER TABLE users ADD COLUMN passkey_enabled INTEGER NOT NULL DEFAULT 0") except sqlite3.OperationalError: pass conn.commit() def get_user_credentials(self, username: str) -> List[Dict]: """ Get all credentials for a user Args: username: Username Returns: List of credential dicts """ with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" SELECT credential_id, public_key, sign_count, device_name, created_at, last_used_at FROM passkey_credentials WHERE user_id = ? """, (username,)) rows = cursor.fetchall() credentials = [] for row in rows: credentials.append({ 'credential_id': row[0], 'public_key': row[1], 'sign_count': row[2], 'transports': None, 'device_name': row[3], 'aaguid': None, 'created_at': row[4], 'last_used': row[5] }) return credentials def generate_registration_options(self, username: str, email: Optional[str] = None) -> Dict: """ Generate registration options for a new passkey Args: username: Username email: Optional email for display name Returns: Registration options for client """ try: # Get existing credentials existing_credentials = self.get_user_credentials(username) # Create exclude credentials list exclude_credentials = [] for cred in existing_credentials: exclude_credentials.append( PublicKeyCredentialDescriptor( id=base64url_to_bytes(cred['credential_id']), transports=cred.get('transports') ) ) # Generate registration options options = generate_registration_options( rp_id=self.rp_id, rp_name=self.rp_name, user_id=username.encode('utf-8'), user_name=username, user_display_name=email or username, timeout=60000, # 60 seconds attestation=AttestationConveyancePreference.NONE, exclude_credentials=exclude_credentials, authenticator_selection=AuthenticatorSelectionCriteria( resident_key=ResidentKeyRequirement.PREFERRED, user_verification=UserVerificationRequirement.PREFERRED ), supported_pub_key_algs=[-7, -257] # ES256, RS256 ) # Store challenge challenge_str = bytes_to_base64url(options.challenge) self.challenges[username] = { 'challenge': challenge_str, 'type': 'registration', 'created_at': datetime.now(), 'expires_at': datetime.now() + self.challenge_timeout } self._log_audit(username, 'passkey_registration_options_generated', True, None, None, None, 'Registration options generated') # Convert to JSON-serializable format options_dict = options_to_json(options) return json.loads(options_dict) except Exception as e: self._log_audit(username, 'passkey_registration_options_failed', False, None, None, None, f'Error: {str(e)}') raise def verify_registration(self, username: str, credential_data: Dict, device_name: Optional[str] = None) -> Dict: """ Verify registration response and store credential Args: username: Username credential_data: Registration response from client device_name: Optional device name Returns: Dict with success status """ try: # Get stored challenge if username not in self.challenges: raise Exception("No registration in progress") challenge_data = self.challenges[username] # Check if expired if datetime.now() > challenge_data['expires_at']: del self.challenges[username] raise Exception("Challenge expired") if challenge_data['type'] != 'registration': raise Exception("Invalid challenge type") expected_challenge = base64url_to_bytes(challenge_data['challenge']) # Verify registration response verification = verify_registration_response( credential=credential_data, expected_challenge=expected_challenge, expected_rp_id=self.rp_id, expected_origin=self.origin ) # Store credential credential_id = bytes_to_base64url(verification.credential_id) public_key = bytes_to_base64url(verification.credential_public_key) # Extract transports if available transports = credential_data.get('response', {}).get('transports', []) with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO passkey_credentials (user_id, credential_id, public_key, sign_count, device_name, created_at) VALUES (?, ?, ?, ?, ?, ?) """, ( username, credential_id, public_key, verification.sign_count, device_name, datetime.now().isoformat() )) # Enable passkey for user cursor.execute(""" UPDATE users SET passkey_enabled = 1 WHERE username = ? """, (username,)) conn.commit() # Clean up challenge del self.challenges[username] self._log_audit(username, 'passkey_registered', True, credential_id, None, None, f'Passkey registered: {device_name or "Unknown device"}') return { 'success': True, 'credentialId': credential_id } except Exception as e: logger.error(f"Passkey registration verification failed for {username}: {str(e)}", module="Passkey") logger.debug(f"Passkey registration detailed error:", exc_info=True, module="Passkey") self._log_audit(username, 'passkey_registration_failed', False, None, None, None, f'Error: {str(e)}') raise def generate_authentication_options(self, username: Optional[str] = None) -> Dict: """ Generate authentication options for passkey login Args: username: Optional username (if None, allows usernameless login) Returns: Authentication options for client """ try: allow_credentials = [] # If username provided, get their credentials if username: user_credentials = self.get_user_credentials(username) for cred in user_credentials: allow_credentials.append( PublicKeyCredentialDescriptor( id=base64url_to_bytes(cred['credential_id']), transports=cred.get('transports') ) ) # Generate authentication options options = generate_authentication_options( rp_id=self.rp_id, timeout=60000, # 60 seconds allow_credentials=allow_credentials if allow_credentials else None, user_verification=UserVerificationRequirement.PREFERRED ) # Store challenge challenge_str = bytes_to_base64url(options.challenge) self.challenges[username or '__anonymous__'] = { 'challenge': challenge_str, 'type': 'authentication', 'created_at': datetime.now(), 'expires_at': datetime.now() + self.challenge_timeout } self._log_audit(username or 'anonymous', 'passkey_authentication_options_generated', True, None, None, None, 'Authentication options generated') # Convert to JSON-serializable format options_dict = options_to_json(options) return json.loads(options_dict) except Exception as e: self._log_audit(username or 'anonymous', 'passkey_authentication_options_failed', False, None, None, None, f'Error: {str(e)}') raise def verify_authentication(self, username: str, credential_data: Dict) -> Dict: """ Verify authentication response Args: username: Username credential_data: Authentication response from client Returns: Dict with success status and verified username """ try: # Get stored challenge challenge_key = username or '__anonymous__' if challenge_key not in self.challenges: raise Exception("No authentication in progress") challenge_data = self.challenges[challenge_key] # Check if expired if datetime.now() > challenge_data['expires_at']: del self.challenges[challenge_key] raise Exception("Challenge expired") if challenge_data['type'] != 'authentication': raise Exception("Invalid challenge type") expected_challenge = base64url_to_bytes(challenge_data['challenge']) # Get credential ID from response credential_id = credential_data.get('id') or credential_data.get('rawId') if not credential_id: raise Exception("No credential ID in response") # Find credential in database with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" SELECT user_id, public_key, sign_count FROM passkey_credentials WHERE credential_id = ? """, (credential_id,)) row = cursor.fetchone() if not row: raise Exception("Credential not found") verified_username, public_key_b64, current_sign_count = row # Verify authentication response verification = verify_authentication_response( credential=credential_data, expected_challenge=expected_challenge, expected_rp_id=self.rp_id, expected_origin=self.origin, credential_public_key=base64url_to_bytes(public_key_b64), credential_current_sign_count=current_sign_count ) # Update sign count and last used cursor.execute(""" UPDATE passkey_credentials SET sign_count = ?, last_used_at = ? WHERE credential_id = ? """, (verification.new_sign_count, datetime.now().isoformat(), credential_id)) conn.commit() # Clean up challenge del self.challenges[challenge_key] self._log_audit(verified_username, 'passkey_authentication_success', True, credential_id, None, None, 'Passkey authentication successful') return { 'success': True, 'username': verified_username } except Exception as e: self._log_audit(username or 'anonymous', 'passkey_authentication_failed', False, None, None, None, f'Error: {str(e)}') raise def remove_credential(self, username: str, credential_id: str) -> bool: """ Remove a passkey credential Args: username: Username credential_id: Credential ID to remove Returns: True if successful """ try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Debug: List all credentials for this user cursor.execute("SELECT credential_id FROM passkey_credentials WHERE user_id = ?", (username,)) all_creds = cursor.fetchall() logger.debug(f"All credentials for {username}: {all_creds}", module="Passkey") logger.debug(f"Looking for credential_id: '{credential_id}'", module="Passkey") cursor.execute(""" DELETE FROM passkey_credentials WHERE user_id = ? AND credential_id = ? """, (username, credential_id)) deleted = cursor.rowcount > 0 logger.debug(f"Delete operation rowcount: {deleted}", module="Passkey") # Check if user has any remaining credentials cursor.execute(""" SELECT COUNT(*) FROM passkey_credentials WHERE user_id = ? """, (username,)) remaining = cursor.fetchone()[0] # If no credentials left, disable passkey if remaining == 0: cursor.execute(""" UPDATE users SET passkey_enabled = 0 WHERE username = ? """, (username,)) conn.commit() if deleted: self._log_audit(username, 'passkey_removed', True, credential_id, None, None, 'Passkey credential removed') return deleted except Exception as e: logger.error(f"Error removing passkey for {username}: {e}", module="Passkey") return False def get_passkey_status(self, username: str) -> Dict: """ Get passkey status for a user Args: username: Username Returns: Dict with enabled status and credential count """ with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" SELECT COUNT(*) FROM passkey_credentials WHERE user_id = ? """, (username,)) count = cursor.fetchone()[0] return { 'enabled': count > 0, 'credentialCount': count } def list_credentials(self, username: str) -> List[Dict]: """ List all credentials for a user (without sensitive data) Args: username: Username Returns: List of credential info dicts """ credentials = self.get_user_credentials(username) # Remove sensitive data safe_credentials = [] for cred in credentials: safe_credentials.append({ 'credentialId': cred['credential_id'], 'deviceName': cred.get('device_name', 'Unknown device'), 'createdAt': cred['created_at'], 'lastUsed': cred.get('last_used'), 'transports': cred.get('transports', []) }) return safe_credentials def _log_audit(self, username: str, action: str, success: bool, credential_id: Optional[str], ip_address: Optional[str], user_agent: Optional[str], details: Optional[str] = None): """Log passkey audit event""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO passkey_audit_log (user_id, action, success, credential_id, ip_address, user_agent, details, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, (username, action, int(success), credential_id, ip_address, user_agent, details, datetime.now().isoformat())) conn.commit() except Exception as e: logger.error(f"Error logging passkey audit: {e}", module="Passkey")