#!/usr/bin/env python3
"""
Two-Factor Authentication API Routes for Media Downloader

Handles TOTP, Passkey, and Duo 2FA endpoints
Based on backup-central's implementation
"""

import sys
from pathlib import Path
from fastapi import APIRouter, Depends, Request, Body
from fastapi.responses import JSONResponse
from typing import Optional, Dict, List
from pydantic import BaseModel
from core.config import settings

# Add parent path to allow imports from modules
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from modules.universal_logger import get_logger

logger = get_logger('TwoFactorAuth')

# Import managers
from web.backend.totp_manager import TOTPManager
from web.backend.duo_manager import DuoManager

# Passkey manager - optional (requires compatible webauthn library).
# When the import fails the module still loads; PASSKEY_AVAILABLE is the flag
# the passkey routes check before touching the manager.
try:
    from web.backend.passkey_manager import PasskeyManager
    PASSKEY_AVAILABLE = True
    logger.info("Passkey support enabled", module="2FA")
except ImportError as e:
    logger.warning(f"Passkey support disabled: {e}", module="2FA")
    PasskeyManager = None
    PASSKEY_AVAILABLE = False


# Pydantic models for request/response

# Standard response models
class StandardResponse(BaseModel):
    """Standard API response with success flag and message"""
    success: bool
    message: str


class TOTPSetupResponse(BaseModel):
    """Response for ``POST /totp/setup``."""
    success: bool
    # The next three fields are copied from totp_manager.generate_secret()
    # on success and left as None on failure.
    secret: Optional[str] = None
    qrCodeDataURL: Optional[str] = None
    manualEntryKey: Optional[str] = None
    message: Optional[str] = None


class TOTPVerifyRequest(BaseModel):
    """Request body for ``POST /totp/verify``."""
    # The route rejects anything that is not exactly six digits.
    code: str


class TOTPVerifyResponse(BaseModel):
    """Response for ``POST /totp/verify``."""
    success: bool
    # Freshly generated backup codes; only populated when 2FA was enabled.
    backupCodes: Optional[List[str]] = None
    message: str


class TOTPLoginVerifyRequest(BaseModel):
    """Request body for ``POST /totp/login/verify``."""
    username: str
    code: str
    # When truthy, ``code`` is checked as a backup code instead of a TOTP token.
    useBackupCode: Optional[bool] = False
    # Forwarded to session creation and used to pick the cookie max_age.
    rememberMe: bool = False


class TOTPDisableRequest(BaseModel):
    """Request body for ``POST /totp/disable``."""
    # The account password, checked against the stored hash.
    password: str
    # Current TOTP code, checked when a TOTP secret is stored for the user.
    code: str


class PasskeyRegistrationOptionsResponse(BaseModel):
    """Response for ``POST /passkey/registration-options``."""
    success: bool
    # Output of passkey_manager.generate_registration_options() on success.
    options: Optional[Dict] = None
    message: Optional[str] = None


class PasskeyVerifyRegistrationRequest(BaseModel):
    """Request body for ``POST /passkey/verify-registration``."""
    # Both fields are passed straight to passkey_manager.verify_registration().
    credential: Dict
    deviceName: Optional[str] = None
class PasskeyAuthenticationOptionsResponse(BaseModel):
    """Response for ``POST /passkey/authentication-options``."""
    success: bool
    # Output of passkey_manager.generate_authentication_options() on success.
    options: Optional[Dict] = None
    message: Optional[str] = None


class PasskeyVerifyAuthenticationRequest(BaseModel):
    """Request body for ``POST /passkey/verify-authentication``."""
    credential: Dict
    # Optional; forwarded as-is to passkey_manager.verify_authentication().
    username: Optional[str] = None
    # Forwarded to session creation and used to pick the cookie max_age.
    rememberMe: bool = False


class DuoAuthRequest(BaseModel):
    """Request body for ``POST /duo/auth``."""
    username: str
    # Passed to duo_manager.create_auth_url() so the callback can recover it.
    rememberMe: bool = False


class DuoAuthResponse(BaseModel):
    """Response for ``POST /duo/auth``."""
    success: bool
    # ``authUrl`` and ``state`` come from duo_manager.create_auth_url().
    authUrl: Optional[str] = None
    state: Optional[str] = None
    message: Optional[str] = None


class DuoCallbackRequest(BaseModel):
    """Duo callback payload.

    The ``GET /duo/callback`` route defined in this module reads ``state`` and
    the authorization code from query parameters rather than from this model.
    """
    state: str
    duo_code: Optional[str] = None


# Lifetime of the auth cookie when "remember me" is requested (30 days).
_REMEMBER_ME_MAX_AGE = 30 * 24 * 60 * 60


def _query_one(db_path, sql, params=()):
    """Run a single parameterized query and return its first row (or None).

    ``sqlite3.Connection`` used as a context manager only commits or rolls
    back; it does not close the connection. ``closing`` guarantees the handle
    is released, and releasing it before the caller invokes any manager method
    means no read handle is left open while other code talks to the database.
    """
    import sqlite3
    from contextlib import closing

    with closing(sqlite3.connect(db_path)) as conn:
        return conn.execute(sql, params).fetchone()


def _set_auth_cookie(response, token, remember_me):
    """Attach the ``auth_token`` cookie used by all three login flows.

    Without "remember me" the cookie has no max_age (session cookie);
    with it the cookie lives for ``_REMEMBER_ME_MAX_AGE`` seconds.
    """
    response.set_cookie(
        key="auth_token",
        value=token,
        max_age=_REMEMBER_ME_MAX_AGE if remember_me else None,
        httponly=True,
        secure=settings.SECURE_COOKIES,
        samesite="lax",
        path="/"
    )


def create_2fa_router(auth_manager, get_current_user_dependency):
    """
    Create and configure the 2FA router

    Args:
        auth_manager: AuthManager instance. This function uses its
            ``db_path``, ``get_user``, ``verify_password`` and
            ``_create_session`` members.
        get_current_user_dependency: FastAPI dependency for authentication.
            Its result is treated as a dict whose ``'sub'`` entry is the
            username (and ``'email'`` for passkey registration).

    Returns:
        Configured APIRouter
    """
    router = APIRouter()

    # Initialize managers
    totp_manager = TOTPManager()
    duo_manager = DuoManager()
    passkey_manager = PasskeyManager() if PASSKEY_AVAILABLE else None

    # ============================================================================
    # TOTP Endpoints
    # ============================================================================

    @router.post("/totp/setup", response_model=TOTPSetupResponse)
    async def totp_setup(request: Request, current_user: Dict = Depends(get_current_user_dependency)):
        """Generate TOTP secret and QR code"""
        try:
            username = current_user.get('sub')

            # Check if already enabled
            status = totp_manager.get_totp_status(username)
            if status['enabled']:
                return TOTPSetupResponse(
                    success=False,
                    message='2FA is already enabled for this account'
                )

            # Generate secret and QR code
            result = totp_manager.generate_secret(username)

            # Store in session temporarily (until verified)
            # Note: In production, use proper session management
            request.session['totp_setup'] = {
                'secret': result['secret'],
                'username': username
            }

            return TOTPSetupResponse(
                success=True,
                secret=result['secret'],
                qrCodeDataURL=result['qrCodeDataURL'],
                manualEntryKey=result['manualEntryKey'],
                message='Scan QR code with your authenticator app'
            )
        except Exception as e:
            logger.error(f"TOTP setup error: {e}", module="2FA")
            return TOTPSetupResponse(success=False, message='Failed to generate 2FA setup')

    @router.post("/totp/verify", response_model=TOTPVerifyResponse)
    async def totp_verify(
        verify_data: TOTPVerifyRequest,
        request: Request,
        current_user: Dict = Depends(get_current_user_dependency)
    ):
        """Verify TOTP code and enable 2FA"""
        try:
            username = current_user.get('sub')
            code = verify_data.code

            # Validate code format
            if not code or len(code) != 6 or not code.isdigit():
                return TOTPVerifyResponse(
                    success=False,
                    message='Invalid code format. Must be 6 digits.'
                )

            # Get temporary secret from session; it must belong to this user
            totp_setup = request.session.get('totp_setup')
            if not totp_setup or totp_setup.get('username') != username:
                return TOTPVerifyResponse(
                    success=False,
                    message='No setup in progress. Please start 2FA setup again.'
                )

            secret = totp_setup['secret']

            # Verify the code
            if not totp_manager.verify_token(secret, code):
                return TOTPVerifyResponse(
                    success=False,
                    message='Invalid verification code. Please try again.'
                )

            # Generate backup codes
            backup_codes = totp_manager.generate_backup_codes(10)

            # Enable TOTP
            totp_manager.enable_totp(username, secret, backup_codes)

            # Clear session
            request.session.pop('totp_setup', None)

            return TOTPVerifyResponse(
                success=True,
                backupCodes=backup_codes,
                message='2FA enabled successfully! Save your backup codes.'
            )
        except Exception as e:
            logger.error(f"TOTP verify error: {e}", module="2FA")
            return TOTPVerifyResponse(success=False, message='Failed to verify code')

    @router.post("/totp/disable")
    async def totp_disable(
        disable_data: TOTPDisableRequest,
        request: Request,
        current_user: Dict = Depends(get_current_user_dependency)
    ):
        """Disable TOTP 2FA after checking the password and current code"""
        try:
            username = current_user.get('sub')
            password = disable_data.password
            code = disable_data.code

            # Verify password
            user_info = auth_manager.get_user(username)
            if not user_info:
                return {'success': False, 'message': 'User not found'}

            # Get user password hash from database
            row = _query_one(
                auth_manager.db_path,
                "SELECT password_hash, totp_secret FROM users WHERE username = ?",
                (username,)
            )
            if not row:
                return {'success': False, 'message': 'User not found'}

            password_hash, encrypted_secret = row

            if not auth_manager.verify_password(password, password_hash):
                return {'success': False, 'message': 'Invalid password'}

            # Verify current TOTP code (only possible when a secret is stored)
            if encrypted_secret:
                decrypted_secret = totp_manager.decrypt_secret(encrypted_secret)
                if not totp_manager.verify_token(decrypted_secret, code):
                    return {'success': False, 'message': 'Invalid verification code'}

            # Disable TOTP
            totp_manager.disable_totp(username)

            return {'success': True, 'message': '2FA has been disabled'}
        except Exception as e:
            logger.error(f"TOTP disable error: {e}", module="2FA")
            return {'success': False, 'message': 'Failed to disable 2FA'}

    @router.post("/totp/regenerate-backup-codes")
    async def totp_regenerate_backup_codes(
        code: str = Body(..., embed=True),
        current_user: Dict = Depends(get_current_user_dependency)
    ):
        """Regenerate backup codes"""
        try:
            username = current_user.get('sub')

            # Get user
            row = _query_one(
                auth_manager.db_path,
                "SELECT totp_enabled, totp_secret FROM users WHERE username = ?",
                (username,)
            )
            if not row or not row[0]:
                return {'success': False, 'message': '2FA is not enabled'}

            # Verify current code
            # NOTE(review): when totp_enabled is set but no secret is stored,
            # the code check is skipped and codes are still regenerated -
            # confirm this is intended rather than failing closed.
            encrypted_secret = row[1]
            if encrypted_secret:
                decrypted_secret = totp_manager.decrypt_secret(encrypted_secret)
                if not totp_manager.verify_token(decrypted_secret, code):
                    return {'success': False, 'message': 'Invalid verification code'}

            # Generate new backup codes
            backup_codes = totp_manager.regenerate_backup_codes(username)

            return {
                'success': True,
                'backupCodes': backup_codes,
                'message': 'New backup codes generated'
            }
        except Exception as e:
            logger.error(f"Backup codes regeneration error: {e}", module="2FA")
            return {'success': False, 'message': 'Failed to regenerate backup codes'}

    @router.get("/totp/status")
    async def totp_status(current_user: Dict = Depends(get_current_user_dependency)):
        """Get TOTP status for current user"""
        try:
            username = current_user.get('sub')
            status = totp_manager.get_totp_status(username)
            return {'success': True, **status}
        except Exception as e:
            logger.error(f"TOTP status error: {e}", module="2FA")
            return {'success': False, 'message': 'Failed to get 2FA status'}

    @router.post("/totp/login/verify")
    async def totp_login_verify(verify_data: TOTPLoginVerifyRequest, request: Request):
        """Verify TOTP code during login and, on success, start a session"""
        # NOTE(review): this route is not behind get_current_user_dependency
        # and issues a session from username + code alone - confirm that the
        # password step is enforced before a client can reach it.
        try:
            username = verify_data.username
            code = verify_data.code
            use_backup_code = verify_data.useBackupCode

            # Get client IP
            ip_address = request.client.host if request.client else None

            # Check rate limit
            allowed, attempts_remaining, locked_until = totp_manager.check_rate_limit(username, ip_address)
            if not allowed:
                return {
                    'success': False,
                    'message': f'Too many failed attempts. Locked until {locked_until}',
                    'lockedUntil': locked_until
                }

            # Get user
            row = _query_one(
                auth_manager.db_path,
                "SELECT password_hash, totp_enabled, totp_secret, role, email "
                "FROM users WHERE username = ?",
                (username,)
            )
            if not row or not row[1]:  # Not TOTP enabled
                return {'success': False, 'message': 'Invalid request'}

            # Only the secret and role are needed past this point.
            _, _, encrypted_secret, role, _ = row

            is_valid = False

            # Verify backup code or TOTP
            if use_backup_code:
                is_valid, _remaining = totp_manager.verify_backup_code(code, username)
            elif encrypted_secret:
                decrypted_secret = totp_manager.decrypt_secret(encrypted_secret)
                is_valid = totp_manager.verify_token(decrypted_secret, code)

            if not is_valid:
                return {
                    'success': False,
                    'message': 'Invalid verification code',
                    'attemptsRemaining': attempts_remaining - 1
                }

            # Success - reset rate limit and create session
            totp_manager.reset_rate_limit(username, ip_address)

            # Create session with remember_me setting
            session_result = auth_manager._create_session(username, role, ip_address, verify_data.rememberMe)

            # Create response with cookie
            response = JSONResponse(content=session_result)
            _set_auth_cookie(response, session_result.get('token'), verify_data.rememberMe)
            return response
        except Exception as e:
            logger.error(f"TOTP login verification error: {e}", module="2FA")
            return {'success': False, 'message': 'Verification failed'}

    # ============================================================================
    # Passkey Endpoints
    # ============================================================================

    @router.post("/passkey/registration-options", response_model=PasskeyRegistrationOptionsResponse)
    async def passkey_registration_options(current_user: Dict = Depends(get_current_user_dependency)):
        """Generate passkey registration options"""
        if not PASSKEY_AVAILABLE:
            return PasskeyRegistrationOptionsResponse(
                success=False,
                message='Passkey support is not available on this server'
            )

        try:
            username = current_user.get('sub')
            email = current_user.get('email')

            options = passkey_manager.generate_registration_options(username, email)

            return PasskeyRegistrationOptionsResponse(
                success=True,
                options=options
            )
        except Exception as e:
            logger.error(f"Passkey registration options error: {e}", module="2FA")
            return PasskeyRegistrationOptionsResponse(
                success=False,
                message='Failed to generate registration options. Please try again.'
            )

    @router.post("/passkey/verify-registration")
    async def passkey_verify_registration(
        verify_data: PasskeyVerifyRegistrationRequest,
        current_user: Dict = Depends(get_current_user_dependency)
    ):
        """Verify passkey registration"""
        if not PASSKEY_AVAILABLE:
            return {'success': False, 'message': 'Passkey support is not available on this server'}

        try:
            username = current_user.get('sub')

            result = passkey_manager.verify_registration(
                username,
                verify_data.credential,
                verify_data.deviceName
            )

            return {'success': True, **result}
        except Exception as e:
            logger.error(f"Passkey registration verification error: {e}", module="2FA")
            return {'success': False, 'message': 'Failed to verify passkey registration. Please try again.'}

    @router.post("/passkey/authentication-options", response_model=PasskeyAuthenticationOptionsResponse)
    async def passkey_authentication_options(username: Optional[str] = Body(None, embed=True)):
        """Generate passkey authentication options"""
        if not PASSKEY_AVAILABLE:
            return PasskeyAuthenticationOptionsResponse(success=False, message='Passkey support is not available on this server')

        try:
            options = passkey_manager.generate_authentication_options(username)

            return PasskeyAuthenticationOptionsResponse(
                success=True,
                options=options
            )
        except Exception as e:
            logger.error(f"Passkey authentication options error: {e}", module="2FA")
            return PasskeyAuthenticationOptionsResponse(
                success=False,
                message='Failed to generate authentication options. Please try again.'
            )

    @router.post("/passkey/verify-authentication")
    async def passkey_verify_authentication(
        verify_data: PasskeyVerifyAuthenticationRequest,
        request: Request
    ):
        """Verify passkey authentication and, on success, start a session"""
        if not PASSKEY_AVAILABLE:
            return {'success': False, 'message': 'Passkey support is not available on this server'}

        try:
            # Get client IP
            ip_address = request.client.host if request.client else None

            # Verify authentication - use username from request if provided
            result = passkey_manager.verify_authentication(verify_data.username, verify_data.credential)

            if not result['success']:
                return result

            username = result['username']

            # Get user info
            row = _query_one(
                auth_manager.db_path,
                "SELECT role, email FROM users WHERE username = ?",
                (username,)
            )
            if not row:
                # The passkey verified but the account no longer exists.
                # Returning the manager's success result here would report a
                # login that has no session or token behind it.
                return {'success': False, 'message': 'Invalid request'}

            role = row[0]

            # Create session with remember_me setting
            session_result = auth_manager._create_session(username, role, ip_address, verify_data.rememberMe)

            # Create response with cookie
            response = JSONResponse(content=session_result)
            _set_auth_cookie(response, session_result.get('token'), verify_data.rememberMe)
            return response
        except Exception as e:
            logger.error(f"Passkey authentication verification error: {e}", module="2FA")
            return {'success': False, 'message': 'Failed to verify passkey authentication. Please try again.'}

    @router.get("/passkey/list")
    async def passkey_list(current_user: Dict = Depends(get_current_user_dependency)):
        """List user's passkeys"""
        if not PASSKEY_AVAILABLE:
            return {'success': False, 'message': 'Passkey support is not available on this server'}

        try:
            username = current_user.get('sub')
            credentials = passkey_manager.list_credentials(username)
            return {'success': True, 'credentials': credentials}
        except Exception as e:
            logger.error(f"Passkey list error: {e}", module="2FA")
            return {'success': False, 'message': 'Failed to retrieve passkeys. Please try again.'}

    @router.delete("/passkey/{credential_id}")
    async def passkey_remove(
        credential_id: str,
        current_user: Dict = Depends(get_current_user_dependency)
    ):
        """Remove a passkey"""
        if not PASSKEY_AVAILABLE:
            return {'success': False, 'message': 'Passkey support is not available on this server'}

        try:
            username = current_user.get('sub')
            logger.debug(f"Removing passkey - Username: {username}, Credential ID: {credential_id}", module="2FA")
            logger.debug(f"Credential ID length: {len(credential_id)}, type: {type(credential_id)}", module="2FA")

            success = passkey_manager.remove_credential(username, credential_id)

            if success:
                return {'success': True, 'message': 'Passkey removed'}
            else:
                return {'success': False, 'message': 'Passkey not found'}
        except Exception as e:
            logger.error(f"Passkey remove error: {e}", exc_info=True, module="2FA")
            return {'success': False, 'message': 'Failed to remove passkey. Please try again.'}

    @router.get("/passkey/status")
    async def passkey_status(current_user: Dict = Depends(get_current_user_dependency)):
        """Get passkey status"""
        if not PASSKEY_AVAILABLE:
            return {'success': False, 'enabled': False, 'message': 'Passkey support is not available on this server'}

        try:
            username = current_user.get('sub')
            status = passkey_manager.get_passkey_status(username)
            return {'success': True, **status}
        except Exception as e:
            logger.error(f"Passkey status error: {e}", module="2FA")
            return {'success': False, 'message': 'Failed to retrieve passkey status. Please try again.'}

    # ============================================================================
    # Duo Endpoints
    # ============================================================================

    @router.post("/duo/auth", response_model=DuoAuthResponse)
    async def duo_auth(auth_data: DuoAuthRequest):
        """Initiate Duo authentication"""
        try:
            if not duo_manager.is_duo_configured():
                return DuoAuthResponse(
                    success=False,
                    message='Duo is not configured'
                )

            username = auth_data.username

            # Create auth URL (pass rememberMe to preserve for callback)
            result = duo_manager.create_auth_url(username, auth_data.rememberMe)

            return DuoAuthResponse(
                success=True,
                authUrl=result['authUrl'],
                state=result['state']
            )
        except Exception as e:
            logger.error(f"Duo auth error: {e}", module="2FA")
            return DuoAuthResponse(success=False, message='Failed to initiate Duo authentication. Please try again.')

    @router.get("/duo/callback")
    async def duo_callback(
        state: str,
        duo_code: Optional[str] = None,
        code: Optional[str] = None,
        request: Request = None
    ):
        """Handle Duo callback (OAuth 2.0)"""
        try:
            from urllib.parse import urlencode
            from fastapi.responses import RedirectResponse

            # Duo sends 'code' parameter, normalize to duo_code
            if not duo_code and code:
                duo_code = code

            if not duo_code:
                return {'success': False, 'message': 'Missing authorization code'}

            # Verify state and get username and remember_me
            result = duo_manager.verify_state(state)
            if not result:
                return {'success': False, 'message': 'Invalid or expired state'}

            username, remember_me = result

            # Verify Duo response
            if duo_manager.verify_duo_response(duo_code, username):
                # Get client IP
                ip_address = request.client.host if request.client else None

                # Get user info
                row = _query_one(
                    auth_manager.db_path,
                    "SELECT role, email FROM users WHERE username = ?",
                    (username,)
                )

                if row:
                    role = row[0]

                    # Create session with remember_me setting
                    session_result = auth_manager._create_session(username, role, ip_address, remember_me)

                    # Redirect to frontend with success (token in URL and cookie)
                    token = session_result.get('token')
                    session_id = session_result.get('sessionId')

                    # Build the query string with urlencode so a username (or
                    # token) containing '&', '#', '=' or spaces cannot break or
                    # inject query parameters.
                    # NOTE(review): a session token in the URL can end up in
                    # browser history and proxy logs - the frontend reads it
                    # from here, so it is kept, but consider relying on the
                    # cookie alone.
                    query = urlencode({
                        'duo_auth': 'success',
                        'token': token,
                        'sessionId': session_id,
                        'username': username
                    })
                    response = RedirectResponse(url=f"/?{query}", status_code=302)

                    # Set auth cookie with proper max_age
                    _set_auth_cookie(response, token, remember_me)
                    return response

            # Verification failed (or user row missing) - redirect with error
            return RedirectResponse(url="/login?duo_auth=failed&error=Duo+verification+failed", status_code=302)
        except Exception as e:
            logger.error(f"Duo callback error: {e}", module="2FA")
            return {'success': False, 'message': 'Duo authentication failed. Please try again.'}

    @router.post("/duo/enroll")
    async def duo_enroll(
        duo_username: Optional[str] = Body(None, embed=True),
        current_user: Dict = Depends(get_current_user_dependency)
    ):
        """Enroll user in Duo"""
        try:
            username = current_user.get('sub')

            success = duo_manager.enroll_user(username, duo_username)

            if success:
                return {'success': True, 'message': 'Duo enrollment successful'}
            else:
                return {'success': False, 'message': 'Duo enrollment failed'}
        except Exception as e:
            logger.error(f"Duo enroll error: {e}", module="2FA")
            return {'success': False, 'message': 'Failed to enroll in Duo. Please try again.'}

    @router.post("/duo/unenroll")
    async def duo_unenroll(current_user: Dict = Depends(get_current_user_dependency)):
        """Unenroll user from Duo"""
        try:
            username = current_user.get('sub')

            success = duo_manager.unenroll_user(username)

            if success:
                return {'success': True, 'message': 'Duo unenrollment successful'}
            else:
                return {'success': False, 'message': 'Duo unenrollment failed'}
        except Exception as e:
            logger.error(f"Duo unenroll error: {e}", module="2FA")
            return {'success': False, 'message': 'Failed to unenroll from Duo. Please try again.'}

    @router.get("/duo/status")
    async def duo_status(current_user: Dict = Depends(get_current_user_dependency)):
        """Get Duo status"""
        try:
            username = current_user.get('sub')
            status = duo_manager.get_duo_status(username)
            config_status = duo_manager.get_configuration_status()

            return {
                'success': True,
                **status,
                'duoConfigured': config_status['configured']
            }
        except Exception as e:
            logger.error(f"Duo status error: {e}", module="2FA")
            return {'success': False, 'message': 'Failed to retrieve Duo status. Please try again.'}

    # ============================================================================
    # Combined 2FA Status Endpoint
    # ============================================================================

    @router.get("/status")
    async def twofa_status(current_user: Dict = Depends(get_current_user_dependency)):
        """Get complete 2FA status for user"""
        try:
            username = current_user.get('sub')

            totp_status = totp_manager.get_totp_status(username)

            # Get passkey status and add availability flag
            if PASSKEY_AVAILABLE:
                passkey_status = passkey_manager.get_passkey_status(username)
                passkey_status['available'] = True
            else:
                passkey_status = {'enabled': False, 'available': False, 'credentialCount': 0}

            duo_status = duo_manager.get_duo_status(username)
            duo_config_status = duo_manager.get_configuration_status()

            # Determine available methods
            available_methods = []
            if totp_status['enabled']:
                available_methods.append('totp')
            if PASSKEY_AVAILABLE and passkey_status['enabled']:
                available_methods.append('passkey')
            if duo_status['enabled'] and duo_config_status['configured']:
                available_methods.append('duo')

            return {
                'success': True,
                'totp': totp_status,
                'passkey': passkey_status,
                'duo': {**duo_status, 'duoConfigured': duo_config_status['configured']},
                'availableMethods': available_methods,
                'anyEnabled': len(available_methods) > 0
            }
        except Exception as e:
            logger.error(f"2FA status error: {e}", module="2FA")
            # Return a generic message like every other handler; the raw
            # exception text stays in the server log only.
            return {'success': False, 'message': 'Failed to retrieve 2FA status. Please try again.'}

    return router