#!/usr/bin/env python3
|
|
"""
|
|
Service Health Monitor - Tracks service failures and sends alerts
|
|
Only active during scheduler mode for unattended operation monitoring
|
|
"""
|
|
|
|
import json
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, Optional
|
|
from modules.universal_logger import get_logger
|
|
|
|
|
|
class ServiceHealthMonitor:
    """Monitor service health and send alerts when services get stuck.

    Tracks per-service consecutive failures in a JSON state file and sends a
    Pushover alert (subject to a cooldown) once a service exceeds the
    configured failure threshold. Monitoring is only active in scheduler mode
    so interactive runs never trigger alerts.
    """

    def __init__(self,
                 state_file: str = "/opt/media-downloader/database/service_health.json",
                 config: dict = None,
                 error_monitoring_config: dict = None,
                 pushover_notifier=None,
                 scheduler_mode: bool = False):
        """
        Initialize health monitor

        Args:
            state_file: Path to JSON file storing health state
            config: Configuration dict from settings.json
            error_monitoring_config: Error monitoring settings (for push alert delay)
            pushover_notifier: Instance of PushoverNotifier for alerts
            scheduler_mode: Only monitor when True (scheduler mode)
        """
        self.state_file = Path(state_file)
        self.state_file.parent.mkdir(parents=True, exist_ok=True)
        self.pushover = pushover_notifier
        self.scheduler_mode = scheduler_mode
        self.error_monitoring_config = error_monitoring_config or {}

        # Default configuration
        self.config = {
            'enabled': True,
            'notification_cooldown_hours': 24,
            'min_consecutive_failures': 2,  # Number of consecutive run failures before alerting
            'services': {
                'fastdl': {'monitor': True, 'notify': True},
                'imginn': {'monitor': True, 'notify': True},
                'snapchat': {'monitor': True, 'notify': True},
                'toolzu': {'monitor': True, 'notify': True},
                'tiktok': {'monitor': True, 'notify': True},
                'forums': {'monitor': True, 'notify': True}
            },
            'pushover': {
                'enabled': True,
                'priority': 0,
                'sound': 'pushover'
            }
        }

        # Merge user config. NOTE: shallow merge — a user-supplied 'services'
        # or 'pushover' key replaces the whole default sub-dict, it is not
        # merged per-key.
        if config:
            self.config.update(config)

        # BUGFIX: the logger must exist before _load_state() runs, because
        # _load_state() calls self.logger.error() when the state file is
        # corrupt. The original order raised AttributeError in that case
        # instead of logging and falling back to an empty state.
        self.logger = get_logger('ServiceHealthMonitor')

        # Load or initialize state
        self.state = self._load_state()

    def _load_state(self) -> Dict:
        """Load health state from file; fall back to empty state on any error."""
        if self.state_file.exists():
            try:
                with open(self.state_file, 'r') as f:
                    return json.load(f)
            except Exception as e:
                # Best-effort load: a corrupt/unreadable file is logged and
                # replaced by a fresh state rather than crashing the monitor.
                self.logger.error(f"Failed to load health state: {e}")

        # Initialize empty state
        return {'service_health': {}}

    def _save_state(self):
        """Save health state to file (best-effort; errors are logged only)."""
        try:
            with open(self.state_file, 'w') as f:
                # default=str stringifies any non-JSON value (e.g. a stray
                # datetime) instead of raising.
                json.dump(self.state, f, indent=2, default=str)
        except Exception as e:
            self.logger.error(f"Failed to save health state: {e}")

    def _get_service_state(self, service: str) -> Dict:
        """Get state for a service, initialize if doesn't exist"""
        if service not in self.state['service_health']:
            self.state['service_health'][service] = {
                'status': 'healthy',
                'consecutive_failures': 0,
                'last_success': None,
                'last_failure': None,
                'last_notification_sent': None,
                'failure_type': None,
                'total_failures': 0,
                'total_successes': 0
            }
        return self.state['service_health'][service]

    def record_success(self, service: str):
        """
        Record successful operation for a service

        Args:
            service: Service name (fastdl, imginn, snapchat, etc.)
        """
        # Only monitor in scheduler mode
        if not self.scheduler_mode:
            return

        # Check if service is monitored
        if not self._is_monitored(service):
            return

        state = self._get_service_state(service)
        now = datetime.now()

        # Was service previously stuck? Send recovery notification
        was_stuck = state['status'] == 'stuck'

        # Update state: a single success fully resets the failure streak.
        state['status'] = 'healthy'
        state['consecutive_failures'] = 0
        state['last_success'] = now.isoformat()
        state['failure_type'] = None
        state['total_successes'] += 1

        self._save_state()

        # Send recovery notification if service was stuck
        if was_stuck and self._should_notify(service):
            self._send_recovery_notification(service, now)

    def record_failure(self, service: str, reason: str = 'unknown'):
        """
        Record failure for a service

        Args:
            service: Service name (fastdl, imginn, snapchat, etc.)
            reason: Reason for failure (cloudflare, rate_limit, timeout, etc.)
        """
        # Only monitor in scheduler mode
        if not self.scheduler_mode:
            return

        # Check if service is monitored
        if not self._is_monitored(service):
            return

        state = self._get_service_state(service)
        now = datetime.now()

        # Update state - increment consecutive failures
        state['consecutive_failures'] += 1
        state['last_failure'] = now.isoformat()
        state['failure_type'] = reason
        state['total_failures'] += 1

        # Check if service should be marked as stuck based on consecutive run failures
        min_failures = self.config.get('min_consecutive_failures', 2)
        if state['consecutive_failures'] >= min_failures:
            state['status'] = 'stuck'

            # Send notification if cooldown period has passed
            if self._should_notify(service) and self._notification_cooldown_expired(service):
                self._send_alert_notification(service, reason, now)
                state['last_notification_sent'] = now.isoformat()

        self._save_state()

    def _is_monitored(self, service: str) -> bool:
        """Check if service should be monitored"""
        if not self.config.get('enabled', True):
            return False

        # Unknown services default to monitored.
        service_config = self.config.get('services', {}).get(service, {})
        return service_config.get('monitor', True)

    def _should_notify(self, service: str) -> bool:
        """Check if notifications are enabled for this service"""
        # No notifier wired up -> never notify.
        if not self.pushover:
            return False

        if not self.config.get('pushover', {}).get('enabled', True):
            return False

        # Unknown services default to notify-enabled.
        service_config = self.config.get('services', {}).get(service, {})
        return service_config.get('notify', True)

    def _notification_cooldown_expired(self, service: str) -> bool:
        """Check if notification cooldown period has expired"""
        state = self._get_service_state(service)
        last_sent = state.get('last_notification_sent')

        if not last_sent:
            return True  # Never sent, can send now

        try:
            last_sent_time = datetime.fromisoformat(last_sent)
            # Use push_alert_delay_hours from error_monitoring config if available,
            # otherwise fall back to notification_cooldown_hours or default 24
            cooldown_hours = self.error_monitoring_config.get('push_alert_delay_hours',
                                                              self.config.get('notification_cooldown_hours', 24))
            cooldown_period = timedelta(hours=cooldown_hours)

            return datetime.now() - last_sent_time > cooldown_period
        except (ValueError, TypeError):
            return True  # Error parsing date, allow notification

    def _send_alert_notification(self, service: str, reason: str, now: datetime):
        """Send Pushover alert notification"""
        state = self._get_service_state(service)

        # Calculate time since last success, rendered human-readably
        # (minutes under 1h, hours under 48h, days otherwise).
        time_stuck = "Unknown"
        if state['last_success']:
            try:
                last_success = datetime.fromisoformat(state['last_success'])
                delta = now - last_success
                hours = int(delta.total_seconds() / 3600)
                if hours < 1:
                    time_stuck = f"{int(delta.total_seconds() / 60)} minutes ago"
                elif hours < 48:
                    time_stuck = f"{hours} hours ago"
                else:
                    days = int(hours / 24)
                    time_stuck = f"{days} days ago"
            except (ValueError, TypeError):
                pass

        # Format service name
        service_name = service.replace('_', ' ').title()

        # Map internal failure codes to human-readable descriptions;
        # unknown codes pass through unchanged.
        reason_map = {
            'cloudflare': 'Cloudflare Challenge',
            'cloudflare_challenge': 'Cloudflare Challenge',
            'rate_limit': 'Rate Limited (429)',
            'forbidden': 'Access Forbidden (403)',
            'timeout': 'Connection Timeout',
            'authentication': 'Authentication Required',
            'captcha': 'CAPTCHA Challenge',
            'blocked': 'IP Blocked',
            'unknown': 'Unknown Error'
        }
        reason_text = reason_map.get(reason.lower(), reason)

        # Build message
        title = f"⚠️ Service Alert: {service_name}"
        message = f"""Status: Stuck/Blocked
Issue: {reason_text}
Failed Since: {now.strftime('%b %d, %I:%M %p')} ({state['consecutive_failures']} consecutive failures)

Last successful download: {time_stuck if state['last_success'] else 'Never'}

Action may be required.
"""

        # Send notification (best-effort; a delivery failure is logged only)
        try:
            priority = self.config.get('pushover', {}).get('priority', 0)
            sound = self.config.get('pushover', {}).get('sound', 'pushover')

            self.pushover.send_notification(
                title=title,
                message=message,
                priority=priority,
                sound=sound
            )

            self.logger.info(f"Sent alert notification for {service}: {reason}")
        except Exception as e:
            self.logger.error(f"Failed to send alert notification: {e}")

    def _send_recovery_notification(self, service: str, now: datetime):
        """Send recovery notification (optional)"""
        # Recovery notifications are optional - can be disabled
        if not self.config.get('send_recovery_notifications', False):
            return

        state = self._get_service_state(service)
        service_name = service.replace('_', ' ').title()

        title = f"✅ Service Recovered: {service_name}"
        message = f"""Status: Healthy
Service is working again.

Recovered at: {now.strftime('%b %d, %I:%M %p')}
"""

        try:
            self.pushover.send_notification(
                title=title,
                message=message,
                priority=-1,  # Low priority for recovery
                sound='magic'
            )

            self.logger.info(f"Sent recovery notification for {service}")
        except Exception as e:
            self.logger.error(f"Failed to send recovery notification: {e}")

    def get_service_status(self, service: str) -> Dict:
        """Get current status for a service"""
        # Shallow copy so callers can't mutate tracked state directly.
        return self._get_service_state(service).copy()

    def get_all_status(self) -> Dict:
        """Get status for all services"""
        return self.state['service_health'].copy()

    def reset_service(self, service: str):
        """Reset state for a service"""
        if service in self.state['service_health']:
            del self.state['service_health'][service]
            self._save_state()