793 lines
29 KiB
Python
793 lines
29 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Two-Factor Authentication API Routes for Media Downloader
|
|
Handles TOTP, Passkey, and Duo 2FA endpoints
|
|
|
|
Based on backup-central's implementation
|
|
"""
|
|
|
|
import sys
|
|
from pathlib import Path
|
|
from fastapi import APIRouter, Depends, Request, Body
|
|
from fastapi.responses import JSONResponse
|
|
from typing import Optional, Dict, List
|
|
from pydantic import BaseModel
|
|
|
|
from core.config import settings
|
|
|
|
# Add parent path to allow imports from modules
|
|
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
|
from modules.universal_logger import get_logger
|
|
|
|
logger = get_logger('TwoFactorAuth')
|
|
|
|
# Import managers
|
|
from web.backend.totp_manager import TOTPManager
|
|
from web.backend.duo_manager import DuoManager
|
|
|
|
# Passkey manager - optional (requires compatible webauthn library)
|
|
try:
|
|
from web.backend.passkey_manager import PasskeyManager
|
|
PASSKEY_AVAILABLE = True
|
|
logger.info("Passkey support enabled", module="2FA")
|
|
except ImportError as e:
|
|
logger.warning(f"Passkey support disabled: {e}", module="2FA")
|
|
PasskeyManager = None
|
|
PASSKEY_AVAILABLE = False
|
|
|
|
|
|
# Pydantic models for request/response
|
|
|
|
# Standard response models
|
|
class StandardResponse(BaseModel):
|
|
"""Standard API response with success flag and message"""
|
|
success: bool
|
|
message: str
|
|
|
|
|
|
class TOTPSetupResponse(BaseModel):
|
|
success: bool
|
|
secret: Optional[str] = None
|
|
qrCodeDataURL: Optional[str] = None
|
|
manualEntryKey: Optional[str] = None
|
|
message: Optional[str] = None
|
|
|
|
|
|
class TOTPVerifyRequest(BaseModel):
|
|
code: str
|
|
|
|
|
|
class TOTPVerifyResponse(BaseModel):
|
|
success: bool
|
|
backupCodes: Optional[List[str]] = None
|
|
message: str
|
|
|
|
|
|
class TOTPLoginVerifyRequest(BaseModel):
|
|
username: str
|
|
code: str
|
|
useBackupCode: Optional[bool] = False
|
|
rememberMe: bool = False
|
|
|
|
|
|
class TOTPDisableRequest(BaseModel):
|
|
password: str
|
|
code: str
|
|
|
|
|
|
class PasskeyRegistrationOptionsResponse(BaseModel):
|
|
success: bool
|
|
options: Optional[Dict] = None
|
|
message: Optional[str] = None
|
|
|
|
|
|
class PasskeyVerifyRegistrationRequest(BaseModel):
|
|
credential: Dict
|
|
deviceName: Optional[str] = None
|
|
|
|
|
|
class PasskeyAuthenticationOptionsResponse(BaseModel):
|
|
success: bool
|
|
options: Optional[Dict] = None
|
|
message: Optional[str] = None
|
|
|
|
|
|
class PasskeyVerifyAuthenticationRequest(BaseModel):
|
|
credential: Dict
|
|
username: Optional[str] = None
|
|
rememberMe: bool = False
|
|
|
|
|
|
class DuoAuthRequest(BaseModel):
|
|
username: str
|
|
rememberMe: bool = False
|
|
|
|
|
|
class DuoAuthResponse(BaseModel):
|
|
success: bool
|
|
authUrl: Optional[str] = None
|
|
state: Optional[str] = None
|
|
message: Optional[str] = None
|
|
|
|
|
|
class DuoCallbackRequest(BaseModel):
|
|
state: str
|
|
duo_code: Optional[str] = None
|
|
|
|
|
|
def create_2fa_router(auth_manager, get_current_user_dependency):
|
|
"""
|
|
Create and configure the 2FA router
|
|
|
|
Args:
|
|
auth_manager: AuthManager instance
|
|
get_current_user_dependency: FastAPI dependency for authentication
|
|
|
|
Returns:
|
|
Configured APIRouter
|
|
"""
|
|
router = APIRouter()
|
|
|
|
# Initialize managers
|
|
totp_manager = TOTPManager()
|
|
duo_manager = DuoManager()
|
|
passkey_manager = PasskeyManager() if PASSKEY_AVAILABLE else None
|
|
|
|
# ============================================================================
|
|
# TOTP Endpoints
|
|
# ============================================================================
|
|
|
|
@router.post("/totp/setup", response_model=TOTPSetupResponse)
|
|
async def totp_setup(request: Request, current_user: Dict = Depends(get_current_user_dependency)):
|
|
"""Generate TOTP secret and QR code"""
|
|
try:
|
|
username = current_user.get('sub')
|
|
|
|
# Check if already enabled
|
|
status = totp_manager.get_totp_status(username)
|
|
if status['enabled']:
|
|
return TOTPSetupResponse(
|
|
success=False,
|
|
message='2FA is already enabled for this account'
|
|
)
|
|
|
|
# Generate secret and QR code
|
|
result = totp_manager.generate_secret(username)
|
|
|
|
# Store in session temporarily (until verified)
|
|
# Note: In production, use proper session management
|
|
request.session['totp_setup'] = {
|
|
'secret': result['secret'],
|
|
'username': username
|
|
}
|
|
|
|
return TOTPSetupResponse(
|
|
success=True,
|
|
secret=result['secret'],
|
|
qrCodeDataURL=result['qrCodeDataURL'],
|
|
manualEntryKey=result['manualEntryKey'],
|
|
message='Scan QR code with your authenticator app'
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"TOTP setup error: {e}", module="2FA")
|
|
return TOTPSetupResponse(success=False, message='Failed to generate 2FA setup')
|
|
|
|
@router.post("/totp/verify", response_model=TOTPVerifyResponse)
|
|
async def totp_verify(
|
|
verify_data: TOTPVerifyRequest,
|
|
request: Request,
|
|
current_user: Dict = Depends(get_current_user_dependency)
|
|
):
|
|
"""Verify TOTP code and enable 2FA"""
|
|
try:
|
|
username = current_user.get('sub')
|
|
code = verify_data.code
|
|
|
|
# Validate code format
|
|
if not code or len(code) != 6 or not code.isdigit():
|
|
return TOTPVerifyResponse(
|
|
success=False,
|
|
message='Invalid code format. Must be 6 digits.'
|
|
)
|
|
|
|
# Get temporary secret from session
|
|
totp_setup = request.session.get('totp_setup')
|
|
if not totp_setup or totp_setup.get('username') != username:
|
|
return TOTPVerifyResponse(
|
|
success=False,
|
|
message='No setup in progress. Please start 2FA setup again.'
|
|
)
|
|
|
|
secret = totp_setup['secret']
|
|
|
|
# Verify the code
|
|
if not totp_manager.verify_token(secret, code):
|
|
return TOTPVerifyResponse(
|
|
success=False,
|
|
message='Invalid verification code. Please try again.'
|
|
)
|
|
|
|
# Generate backup codes
|
|
backup_codes = totp_manager.generate_backup_codes(10)
|
|
|
|
# Enable TOTP
|
|
totp_manager.enable_totp(username, secret, backup_codes)
|
|
|
|
# Clear session
|
|
request.session.pop('totp_setup', None)
|
|
|
|
return TOTPVerifyResponse(
|
|
success=True,
|
|
backupCodes=backup_codes,
|
|
message='2FA enabled successfully! Save your backup codes.'
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"TOTP verify error: {e}", module="2FA")
|
|
return TOTPVerifyResponse(success=False, message='Failed to verify code')
|
|
|
|
@router.post("/totp/disable")
|
|
async def totp_disable(
|
|
disable_data: TOTPDisableRequest,
|
|
request: Request,
|
|
current_user: Dict = Depends(get_current_user_dependency)
|
|
):
|
|
"""Disable TOTP 2FA"""
|
|
try:
|
|
username = current_user.get('sub')
|
|
password = disable_data.password
|
|
code = disable_data.code
|
|
|
|
# Verify password
|
|
user_info = auth_manager.get_user(username)
|
|
if not user_info:
|
|
return {'success': False, 'message': 'User not found'}
|
|
|
|
# Get user password hash from database
|
|
import sqlite3
|
|
with sqlite3.connect(auth_manager.db_path) as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT password_hash, totp_secret FROM users WHERE username = ?", (username,))
|
|
row = cursor.fetchone()
|
|
|
|
if not row:
|
|
return {'success': False, 'message': 'User not found'}
|
|
|
|
password_hash, encrypted_secret = row
|
|
|
|
if not auth_manager.verify_password(password, password_hash):
|
|
return {'success': False, 'message': 'Invalid password'}
|
|
|
|
# Verify current TOTP code
|
|
if encrypted_secret:
|
|
decrypted_secret = totp_manager.decrypt_secret(encrypted_secret)
|
|
if not totp_manager.verify_token(decrypted_secret, code):
|
|
return {'success': False, 'message': 'Invalid verification code'}
|
|
|
|
# Disable TOTP
|
|
totp_manager.disable_totp(username)
|
|
|
|
return {'success': True, 'message': '2FA has been disabled'}
|
|
|
|
except Exception as e:
|
|
logger.error(f"TOTP disable error: {e}", module="2FA")
|
|
return {'success': False, 'message': 'Failed to disable 2FA'}
|
|
|
|
@router.post("/totp/regenerate-backup-codes")
|
|
async def totp_regenerate_backup_codes(
|
|
code: str = Body(..., embed=True),
|
|
current_user: Dict = Depends(get_current_user_dependency)
|
|
):
|
|
"""Regenerate backup codes"""
|
|
try:
|
|
username = current_user.get('sub')
|
|
|
|
# Get user
|
|
import sqlite3
|
|
with sqlite3.connect(auth_manager.db_path) as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT totp_enabled, totp_secret FROM users WHERE username = ?", (username,))
|
|
row = cursor.fetchone()
|
|
|
|
if not row or not row[0]:
|
|
return {'success': False, 'message': '2FA is not enabled'}
|
|
|
|
# Verify current code
|
|
encrypted_secret = row[1]
|
|
if encrypted_secret:
|
|
decrypted_secret = totp_manager.decrypt_secret(encrypted_secret)
|
|
if not totp_manager.verify_token(decrypted_secret, code):
|
|
return {'success': False, 'message': 'Invalid verification code'}
|
|
|
|
# Generate new backup codes
|
|
backup_codes = totp_manager.regenerate_backup_codes(username)
|
|
|
|
return {
|
|
'success': True,
|
|
'backupCodes': backup_codes,
|
|
'message': 'New backup codes generated'
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Backup codes regeneration error: {e}", module="2FA")
|
|
return {'success': False, 'message': 'Failed to regenerate backup codes'}
|
|
|
|
@router.get("/totp/status")
|
|
async def totp_status(current_user: Dict = Depends(get_current_user_dependency)):
|
|
"""Get TOTP status for current user"""
|
|
try:
|
|
username = current_user.get('sub')
|
|
status = totp_manager.get_totp_status(username)
|
|
|
|
return {'success': True, **status}
|
|
|
|
except Exception as e:
|
|
logger.error(f"TOTP status error: {e}", module="2FA")
|
|
return {'success': False, 'message': 'Failed to get 2FA status'}
|
|
|
|
@router.post("/totp/login/verify")
|
|
async def totp_login_verify(verify_data: TOTPLoginVerifyRequest, request: Request):
|
|
"""Verify TOTP code during login"""
|
|
try:
|
|
username = verify_data.username
|
|
code = verify_data.code
|
|
use_backup_code = verify_data.useBackupCode
|
|
|
|
# Get client IP
|
|
ip_address = request.client.host if request.client else None
|
|
|
|
# Check rate limit
|
|
allowed, attempts_remaining, locked_until = totp_manager.check_rate_limit(username, ip_address)
|
|
|
|
if not allowed:
|
|
return {
|
|
'success': False,
|
|
'message': f'Too many failed attempts. Locked until {locked_until}',
|
|
'lockedUntil': locked_until
|
|
}
|
|
|
|
# Get user
|
|
import sqlite3
|
|
with sqlite3.connect(auth_manager.db_path) as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT password_hash, totp_enabled, totp_secret, role, email
|
|
FROM users WHERE username = ?
|
|
""", (username,))
|
|
row = cursor.fetchone()
|
|
|
|
if not row or not row[1]: # Not TOTP enabled
|
|
return {'success': False, 'message': 'Invalid request'}
|
|
|
|
password_hash, totp_enabled, encrypted_secret, role, email = row
|
|
|
|
is_valid = False
|
|
|
|
# Verify backup code or TOTP
|
|
if use_backup_code:
|
|
is_valid, remaining = totp_manager.verify_backup_code(code, username)
|
|
else:
|
|
if encrypted_secret:
|
|
decrypted_secret = totp_manager.decrypt_secret(encrypted_secret)
|
|
is_valid = totp_manager.verify_token(decrypted_secret, code)
|
|
|
|
if not is_valid:
|
|
return {
|
|
'success': False,
|
|
'message': 'Invalid verification code',
|
|
'attemptsRemaining': attempts_remaining - 1
|
|
}
|
|
|
|
# Success - reset rate limit and create session
|
|
totp_manager.reset_rate_limit(username, ip_address)
|
|
|
|
# Create session with remember_me setting
|
|
session_result = auth_manager._create_session(username, role, ip_address, verify_data.rememberMe)
|
|
|
|
# Create response with cookie
|
|
response = JSONResponse(content=session_result)
|
|
max_age = 30 * 24 * 60 * 60 if verify_data.rememberMe else None
|
|
response.set_cookie(
|
|
key="auth_token",
|
|
value=session_result.get('token'),
|
|
max_age=max_age,
|
|
httponly=True,
|
|
secure=settings.SECURE_COOKIES,
|
|
samesite="lax",
|
|
path="/"
|
|
)
|
|
|
|
return response
|
|
|
|
except Exception as e:
|
|
logger.error(f"TOTP login verification error: {e}", module="2FA")
|
|
return {'success': False, 'message': 'Verification failed'}
|
|
|
|
# ============================================================================
|
|
# Passkey Endpoints
|
|
# ============================================================================
|
|
|
|
@router.post("/passkey/registration-options", response_model=PasskeyRegistrationOptionsResponse)
|
|
async def passkey_registration_options(current_user: Dict = Depends(get_current_user_dependency)):
|
|
"""Generate passkey registration options"""
|
|
if not PASSKEY_AVAILABLE:
|
|
return PasskeyRegistrationOptionsResponse(
|
|
success=False,
|
|
message='Passkey support is not available on this server'
|
|
)
|
|
|
|
try:
|
|
username = current_user.get('sub')
|
|
email = current_user.get('email')
|
|
|
|
options = passkey_manager.generate_registration_options(username, email)
|
|
|
|
return PasskeyRegistrationOptionsResponse(
|
|
success=True,
|
|
options=options
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Passkey registration options error: {e}", module="2FA")
|
|
return PasskeyRegistrationOptionsResponse(
|
|
success=False,
|
|
message='Failed to generate registration options. Please try again.'
|
|
)
|
|
|
|
@router.post("/passkey/verify-registration")
|
|
async def passkey_verify_registration(
|
|
verify_data: PasskeyVerifyRegistrationRequest,
|
|
current_user: Dict = Depends(get_current_user_dependency)
|
|
):
|
|
"""Verify passkey registration"""
|
|
if not PASSKEY_AVAILABLE:
|
|
return {'success': False, 'message': 'Passkey support is not available on this server'}
|
|
|
|
try:
|
|
username = current_user.get('sub')
|
|
|
|
result = passkey_manager.verify_registration(
|
|
username,
|
|
verify_data.credential,
|
|
verify_data.deviceName
|
|
)
|
|
|
|
return {'success': True, **result}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Passkey registration verification error: {e}", module="2FA")
|
|
return {'success': False, 'message': 'Failed to verify passkey registration. Please try again.'}
|
|
|
|
@router.post("/passkey/authentication-options", response_model=PasskeyAuthenticationOptionsResponse)
|
|
async def passkey_authentication_options(username: Optional[str] = Body(None, embed=True)):
|
|
"""Generate passkey authentication options"""
|
|
if not PASSKEY_AVAILABLE:
|
|
return PasskeyAuthenticationOptionsResponse(success=False, message='Passkey support is not available on this server')
|
|
|
|
try:
|
|
options = passkey_manager.generate_authentication_options(username)
|
|
|
|
return PasskeyAuthenticationOptionsResponse(
|
|
success=True,
|
|
options=options
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Passkey authentication options error: {e}", module="2FA")
|
|
return PasskeyAuthenticationOptionsResponse(
|
|
success=False,
|
|
message='Failed to generate authentication options. Please try again.'
|
|
)
|
|
|
|
@router.post("/passkey/verify-authentication")
|
|
async def passkey_verify_authentication(
|
|
verify_data: PasskeyVerifyAuthenticationRequest,
|
|
request: Request
|
|
):
|
|
"""Verify passkey authentication"""
|
|
if not PASSKEY_AVAILABLE:
|
|
return {'success': False, 'message': 'Passkey support is not available on this server'}
|
|
|
|
try:
|
|
# Get client IP
|
|
ip_address = request.client.host if request.client else None
|
|
|
|
# Verify authentication - use username from request if provided
|
|
result = passkey_manager.verify_authentication(verify_data.username, verify_data.credential)
|
|
|
|
if result['success']:
|
|
username = result['username']
|
|
|
|
# Get user info
|
|
import sqlite3
|
|
with sqlite3.connect(auth_manager.db_path) as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT role, email FROM users WHERE username = ?", (username,))
|
|
row = cursor.fetchone()
|
|
|
|
if row:
|
|
role, email = row
|
|
|
|
# Create session with remember_me setting
|
|
session_result = auth_manager._create_session(username, role, ip_address, verify_data.rememberMe)
|
|
|
|
# Create response with cookie
|
|
response = JSONResponse(content=session_result)
|
|
max_age = 30 * 24 * 60 * 60 if verify_data.rememberMe else None
|
|
response.set_cookie(
|
|
key="auth_token",
|
|
value=session_result.get('token'),
|
|
max_age=max_age,
|
|
httponly=True,
|
|
secure=settings.SECURE_COOKIES,
|
|
samesite="lax",
|
|
path="/"
|
|
)
|
|
|
|
return response
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Passkey authentication verification error: {e}", module="2FA")
|
|
return {'success': False, 'message': 'Failed to verify passkey authentication. Please try again.'}
|
|
|
|
@router.get("/passkey/list")
|
|
async def passkey_list(current_user: Dict = Depends(get_current_user_dependency)):
|
|
"""List user's passkeys"""
|
|
if not PASSKEY_AVAILABLE:
|
|
return {'success': False, 'message': 'Passkey support is not available on this server'}
|
|
|
|
try:
|
|
username = current_user.get('sub')
|
|
credentials = passkey_manager.list_credentials(username)
|
|
|
|
return {'success': True, 'credentials': credentials}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Passkey list error: {e}", module="2FA")
|
|
return {'success': False, 'message': 'Failed to retrieve passkeys. Please try again.'}
|
|
|
|
@router.delete("/passkey/{credential_id}")
|
|
async def passkey_remove(
|
|
credential_id: str,
|
|
current_user: Dict = Depends(get_current_user_dependency)
|
|
):
|
|
"""Remove a passkey"""
|
|
if not PASSKEY_AVAILABLE:
|
|
return {'success': False, 'message': 'Passkey support is not available on this server'}
|
|
|
|
try:
|
|
username = current_user.get('sub')
|
|
logger.debug(f"Removing passkey - Username: {username}, Credential ID: {credential_id}", module="2FA")
|
|
logger.debug(f"Credential ID length: {len(credential_id)}, type: {type(credential_id)}", module="2FA")
|
|
success = passkey_manager.remove_credential(username, credential_id)
|
|
|
|
if success:
|
|
return {'success': True, 'message': 'Passkey removed'}
|
|
else:
|
|
return {'success': False, 'message': 'Passkey not found'}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Passkey remove error: {e}", exc_info=True, module="2FA")
|
|
return {'success': False, 'message': 'Failed to remove passkey. Please try again.'}
|
|
|
|
@router.get("/passkey/status")
|
|
async def passkey_status(current_user: Dict = Depends(get_current_user_dependency)):
|
|
"""Get passkey status"""
|
|
if not PASSKEY_AVAILABLE:
|
|
return {'success': False, 'enabled': False, 'message': 'Passkey support is not available on this server'}
|
|
|
|
try:
|
|
username = current_user.get('sub')
|
|
status = passkey_manager.get_passkey_status(username)
|
|
|
|
return {'success': True, **status}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Passkey status error: {e}", module="2FA")
|
|
return {'success': False, 'message': 'Failed to retrieve passkey status. Please try again.'}
|
|
|
|
# ============================================================================
|
|
# Duo Endpoints
|
|
# ============================================================================
|
|
|
|
@router.post("/duo/auth", response_model=DuoAuthResponse)
|
|
async def duo_auth(auth_data: DuoAuthRequest):
|
|
"""Initiate Duo authentication"""
|
|
try:
|
|
if not duo_manager.is_duo_configured():
|
|
return DuoAuthResponse(
|
|
success=False,
|
|
message='Duo is not configured'
|
|
)
|
|
|
|
username = auth_data.username
|
|
|
|
# Create auth URL (pass rememberMe to preserve for callback)
|
|
result = duo_manager.create_auth_url(username, auth_data.rememberMe)
|
|
|
|
return DuoAuthResponse(
|
|
success=True,
|
|
authUrl=result['authUrl'],
|
|
state=result['state']
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Duo auth error: {e}", module="2FA")
|
|
return DuoAuthResponse(success=False, message='Failed to initiate Duo authentication. Please try again.')
|
|
|
|
@router.get("/duo/callback")
|
|
async def duo_callback(
|
|
state: str,
|
|
duo_code: Optional[str] = None,
|
|
code: Optional[str] = None,
|
|
request: Request = None
|
|
):
|
|
"""Handle Duo callback (OAuth 2.0)"""
|
|
try:
|
|
# Duo sends 'code' parameter, normalize to duo_code
|
|
if not duo_code and code:
|
|
duo_code = code
|
|
|
|
if not duo_code:
|
|
return {'success': False, 'message': 'Missing authorization code'}
|
|
|
|
# Verify state and get username and remember_me
|
|
result = duo_manager.verify_state(state)
|
|
if not result:
|
|
return {'success': False, 'message': 'Invalid or expired state'}
|
|
|
|
username, remember_me = result
|
|
|
|
# Verify Duo response
|
|
if duo_manager.verify_duo_response(duo_code, username):
|
|
# Get client IP
|
|
ip_address = request.client.host if request.client else None
|
|
|
|
# Get user info
|
|
import sqlite3
|
|
with sqlite3.connect(auth_manager.db_path) as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT role, email FROM users WHERE username = ?", (username,))
|
|
row = cursor.fetchone()
|
|
|
|
if row:
|
|
role, email = row
|
|
|
|
# Create session with remember_me setting
|
|
session_result = auth_manager._create_session(username, role, ip_address, remember_me)
|
|
|
|
# Redirect to frontend with success (token in URL and cookie)
|
|
from fastapi.responses import RedirectResponse
|
|
token = session_result.get('token')
|
|
session_id = session_result.get('sessionId')
|
|
|
|
# Create redirect response with token in URL (frontend reads and stores it)
|
|
redirect_url = f"/?duo_auth=success&token={token}&sessionId={session_id}&username={username}"
|
|
response = RedirectResponse(url=redirect_url, status_code=302)
|
|
|
|
# Set auth cookie with proper max_age
|
|
max_age = 30 * 24 * 60 * 60 if remember_me else None
|
|
response.set_cookie(
|
|
key="auth_token",
|
|
value=token,
|
|
max_age=max_age,
|
|
httponly=True,
|
|
secure=settings.SECURE_COOKIES,
|
|
samesite="lax",
|
|
path="/"
|
|
)
|
|
|
|
return response
|
|
|
|
# Verification failed - redirect with error
|
|
from fastapi.responses import RedirectResponse
|
|
return RedirectResponse(url="/login?duo_auth=failed&error=Duo+verification+failed", status_code=302)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Duo callback error: {e}", module="2FA")
|
|
return {'success': False, 'message': 'Duo authentication failed. Please try again.'}
|
|
|
|
@router.post("/duo/enroll")
|
|
async def duo_enroll(
|
|
duo_username: Optional[str] = Body(None, embed=True),
|
|
current_user: Dict = Depends(get_current_user_dependency)
|
|
):
|
|
"""Enroll user in Duo"""
|
|
try:
|
|
username = current_user.get('sub')
|
|
|
|
success = duo_manager.enroll_user(username, duo_username)
|
|
|
|
if success:
|
|
return {'success': True, 'message': 'Duo enrollment successful'}
|
|
else:
|
|
return {'success': False, 'message': 'Duo enrollment failed'}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Duo enroll error: {e}", module="2FA")
|
|
return {'success': False, 'message': 'Failed to enroll in Duo. Please try again.'}
|
|
|
|
@router.post("/duo/unenroll")
|
|
async def duo_unenroll(current_user: Dict = Depends(get_current_user_dependency)):
|
|
"""Unenroll user from Duo"""
|
|
try:
|
|
username = current_user.get('sub')
|
|
|
|
success = duo_manager.unenroll_user(username)
|
|
|
|
if success:
|
|
return {'success': True, 'message': 'Duo unenrollment successful'}
|
|
else:
|
|
return {'success': False, 'message': 'Duo unenrollment failed'}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Duo unenroll error: {e}", module="2FA")
|
|
return {'success': False, 'message': 'Failed to unenroll from Duo. Please try again.'}
|
|
|
|
@router.get("/duo/status")
|
|
async def duo_status(current_user: Dict = Depends(get_current_user_dependency)):
|
|
"""Get Duo status"""
|
|
try:
|
|
username = current_user.get('sub')
|
|
status = duo_manager.get_duo_status(username)
|
|
config_status = duo_manager.get_configuration_status()
|
|
|
|
return {
|
|
'success': True,
|
|
**status,
|
|
'duoConfigured': config_status['configured']
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Duo status error: {e}", module="2FA")
|
|
return {'success': False, 'message': 'Failed to retrieve Duo status. Please try again.'}
|
|
|
|
# ============================================================================
|
|
# Combined 2FA Status Endpoint
|
|
# ============================================================================
|
|
|
|
@router.get("/status")
|
|
async def twofa_status(current_user: Dict = Depends(get_current_user_dependency)):
|
|
"""Get complete 2FA status for user"""
|
|
try:
|
|
username = current_user.get('sub')
|
|
|
|
totp_status = totp_manager.get_totp_status(username)
|
|
|
|
# Get passkey status and add availability flag
|
|
if PASSKEY_AVAILABLE:
|
|
passkey_status = passkey_manager.get_passkey_status(username)
|
|
passkey_status['available'] = True
|
|
else:
|
|
passkey_status = {'enabled': False, 'available': False, 'credentialCount': 0}
|
|
|
|
duo_status = duo_manager.get_duo_status(username)
|
|
duo_config_status = duo_manager.get_configuration_status()
|
|
|
|
# Determine available methods
|
|
available_methods = []
|
|
if totp_status['enabled']:
|
|
available_methods.append('totp')
|
|
if PASSKEY_AVAILABLE and passkey_status['enabled']:
|
|
available_methods.append('passkey')
|
|
if duo_status['enabled'] and duo_config_status['configured']:
|
|
available_methods.append('duo')
|
|
|
|
return {
|
|
'success': True,
|
|
'totp': totp_status,
|
|
'passkey': passkey_status,
|
|
'duo': {**duo_status, 'duoConfigured': duo_config_status['configured']},
|
|
'availableMethods': available_methods,
|
|
'anyEnabled': len(available_methods) > 0
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"2FA status error: {e}", module="2FA")
|
|
return {'success': False, 'message': str(e)}
|
|
|
|
return router
|