#!/usr/bin/env python3
"""
MIT License
Copyright (c) 2024 Mycelian
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
"""
Alert Database Migration Module for Mycelian
This module provides functionality to import alerts from external JSON files,
such as those exported from other streaming tools like Streamlabs OBS, OBS WebSocket alerts, etc.
Supports common alert formats and provides preview and migration capabilities.
"""
import json
import logging
from dataclasses import dataclass
from typing import Dict, Any, List, Optional
from pathlib import Path
from .alertutils import AlertObj
from .database_manager import database_manager
logger = logging.getLogger(__name__)
[docs]
@dataclass
class MigrationResult:
"""Result of an alert migration operation"""
total_alerts: int = 0
successful_migrations: int = 0
failed_migrations: int = 0
skipped_alerts: int = 0
alert_type_counts: Dict[str, int] = None
errors: List[Dict[str, str]] = None
def __post_init__(self):
if self.alert_type_counts is None:
self.alert_type_counts = {}
if self.errors is None:
self.errors = []
[docs]
def preview_migration(file_path: str) -> tuple[int, Dict[str, int]]:
"""
Preview what would be migrated from a file without actually migrating.
Args:
file_path: Path to the JSON file to preview
Returns:
tuple: (total_count, type_counts) where type_counts is a dict of alert_type: count
"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
alerts = _parse_alert_data(data)
total_count = len(alerts)
# Count by type
type_counts = {}
for alert in alerts:
alert_type = alert.get('type', 'unknown')
type_counts[alert_type] = type_counts.get(alert_type, 0) + 1
logger.info(f"Preview: Found {total_count} alerts in {file_path}")
return total_count, type_counts
except Exception as e:
logger.error(f"Error previewing migration from {file_path}: {str(e)}")
return 0, {}
[docs]
def migrate_alerts_from_file(file_path: str, overwrite_existing: bool = False) -> MigrationResult:
"""
Migrate alerts from an external JSON file.
Args:
file_path: Path to the JSON file containing alerts
overwrite_existing: Whether to overwrite existing alerts with the same ID
Returns:
MigrationResult: Detailed results of the migration
"""
result = MigrationResult()
try:
# Load and parse the file
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
alerts = _parse_alert_data(data)
result.total_alerts = len(alerts)
logger.info(f"Starting migration of {result.total_alerts} alerts from {file_path}")
# Process each alert
for alert_data in alerts:
try:
success = _migrate_single_alert(alert_data, overwrite_existing, result)
if success:
result.successful_migrations += 1
alert_type = alert_data.get('type', 'unknown')
result.alert_type_counts[alert_type] = result.alert_type_counts.get(alert_type, 0) + 1
else:
result.failed_migrations += 1
except Exception as e:
result.failed_migrations += 1
result.errors.append({
'type': alert_data.get('type', 'unknown'),
'id': alert_data.get('id', 'unknown'),
'error': str(e)
})
logger.error(f"Error migrating alert {alert_data.get('id', 'unknown')}: {str(e)}")
logger.info(f"Migration completed: {result.successful_migrations} successful, {result.failed_migrations} failed")
return result
except Exception as e:
logger.error(f"Error during migration from {file_path}: {str(e)}")
result.errors.append({
'type': 'file',
'id': file_path,
'error': str(e)
})
return result
def _parse_alert_data(data: Any) -> List[Dict[str, Any]]:
"""
Parse alert data from various JSON formats.
Supports multiple common formats from different streaming tools.
"""
alerts = []
try:
# Handle different file formats
if isinstance(data, list):
# Direct list of alerts
alerts = data
elif isinstance(data, dict):
# Check for common structure patterns
if 'alerts' in data:
alerts = data['alerts']
elif 'notifications' in data:
alerts = data['notifications']
elif 'events' in data:
alerts = data['events']
elif 'sounds' in data:
# Streamlabs OBS format
alerts = _parse_streamlabs_format(data['sounds'])
else:
# Try to parse as individual alert
alerts = [data]
# Normalize alert format
normalized_alerts = []
for alert in alerts:
normalized = _normalize_alert_format(alert)
if normalized:
normalized_alerts.append(normalized)
return normalized_alerts
except Exception as e:
logger.error(f"Error parsing alert data: {str(e)}")
return []
def _parse_streamlabs_format(sounds_data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Parse Streamlabs OBS sound format"""
alerts = []
# Streamlabs format maps event types to sound configurations
event_type_map = {
'donation': 'donations',
'follow': 'follows',
'subscription': 'subs',
'bits': 'bits',
'raid': 'raids',
'giftsub': 'giftsubs'
}
for event_type, mycelian_type in event_type_map.items():
if event_type in sounds_data:
event_data = sounds_data[event_type]
# Handle both single alerts and arrays
if isinstance(event_data, list):
for i, item in enumerate(event_data):
alert = _convert_streamlabs_alert(item, mycelian_type, f"{event_type}_{i}")
if alert:
alerts.append(alert)
else:
alert = _convert_streamlabs_alert(event_data, mycelian_type, event_type)
if alert:
alerts.append(alert)
return alerts
def _convert_streamlabs_alert(streamlabs_alert: Dict[str, Any], alert_type: str, alert_id: str) -> Optional[Dict[str, Any]]:
"""Convert a Streamlabs alert to Mycelian format"""
try:
# Extract common fields
alert = {
'id': alert_id,
'type': alert_type,
'enabled': streamlabs_alert.get('enabled', True),
'sound_file': streamlabs_alert.get('file', ''),
'volume': streamlabs_alert.get('volume', 50) / 100.0, # Convert to 0-1 range
'delay': streamlabs_alert.get('delay', 0),
'message': streamlabs_alert.get('message', ''),
'duration': streamlabs_alert.get('duration', 5000),
'conditions': {}
}
# Handle amount-based conditions
if 'amount' in streamlabs_alert:
amount = streamlabs_alert['amount']
if isinstance(amount, dict):
min_amount = amount.get('min', 0)
max_amount = amount.get('max', 999999)
if min_amount == max_amount:
alert['id'] = f"{alert_type}{min_amount}"
else:
alert['id'] = f"{alert_type}{min_amount}-{max_amount}"
alert['conditions']['amount_range'] = [min_amount, max_amount]
elif isinstance(amount, (int, float)):
alert['id'] = f"{alert_type}{amount}"
alert['conditions']['amount'] = amount
return alert
except Exception as e:
logger.error(f"Error converting Streamlabs alert: {str(e)}")
return None
def _normalize_alert_format(alert_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Normalize an alert to Mycelian's expected format"""
try:
# Required fields
if 'type' not in alert_data and 'event_type' not in alert_data:
return None
alert_type = alert_data.get('type') or alert_data.get('event_type')
alert_id = alert_data.get('id') or alert_data.get('name') or f"{alert_type}_import"
# Normalize type names
type_map = {
'donation': 'donations',
'follow': 'follows',
'subscription': 'subs',
'subscriber': 'subs',
'bit': 'bits',
'cheer': 'bits',
'raid': 'raids',
'host': 'raids', # Treat hosts as raids
'giftsub': 'giftsubs',
'gift_subscription': 'giftsubs'
}
normalized_type = type_map.get(alert_type.lower(), alert_type.lower())
# Build normalized alert
normalized = {
'id': alert_id,
'type': normalized_type,
'enabled': alert_data.get('enabled', True),
'sound_file': alert_data.get('sound_file') or alert_data.get('audio_file') or alert_data.get('file', ''),
'volume': float(alert_data.get('volume', 0.5)),
'delay': int(alert_data.get('delay', 0)),
'message': alert_data.get('message') or alert_data.get('text', ''),
'duration': int(alert_data.get('duration', 5000)),
'conditions': alert_data.get('conditions', {})
}
# Handle amount/value conditions
if 'amount' in alert_data:
normalized['conditions']['amount'] = alert_data['amount']
if 'min_amount' in alert_data and 'max_amount' in alert_data:
normalized['conditions']['amount_range'] = [alert_data['min_amount'], alert_data['max_amount']]
return normalized
except Exception as e:
logger.error(f"Error normalizing alert format: {str(e)}")
return None
def _migrate_single_alert(alert_data: Dict[str, Any], overwrite_existing: bool, result: MigrationResult) -> bool:
"""Migrate a single alert to the database"""
try:
alert_type = alert_data['type']
alert_id = alert_data['id']
# Determine the database path
path_map = {
'bits': 'Alerts/BitAlerts',
'subs': 'Alerts/SubAlerts',
'donations': 'Alerts/DonationAlerts',
'follows': 'Alerts/FollowAlerts',
'raids': 'Alerts/RaidAlerts',
'giftsubs': 'Alerts/GiftsubAlerts',
'points': 'Alerts/PointAlerts'
}
# Handle range alerts (if amount_range is specified)
if 'amount_range' in alert_data.get('conditions', {}):
range_path_map = {
'bits': 'Alerts/BitRangeAlerts',
'donations': 'Alerts/DonationRangeAlerts',
'raids': 'Alerts/RaidRangeAlerts',
'giftsubs': 'Alerts/GiftsubRangeAlerts'
}
db_path = range_path_map.get(alert_type)
if not db_path:
logger.warning(f"Range alerts not supported for type: {alert_type}")
return False
else:
db_path = path_map.get(alert_type)
if not db_path:
logger.warning(f"Unknown alert type: {alert_type}")
return False
# Check if alert already exists
full_path = f"{db_path}/{alert_id}"
existing_alert = database_manager.get_data(full_path)
if existing_alert and not overwrite_existing:
result.skipped_alerts += 1
logger.info(f"Skipped existing alert: {alert_id}")
return False
# Create AlertObj
alert_obj = AlertObj(
alert_id=alert_id,
enabled=alert_data.get('enabled', True),
sound_file=alert_data.get('sound_file', ''),
volume=alert_data.get('volume', 0.5),
delay=alert_data.get('delay', 0),
message=alert_data.get('message', ''),
duration=alert_data.get('duration', 5000)
)
# Save to database
alert_dict = {
'alert_id': alert_obj.alert_id,
'enabled': alert_obj.enabled,
'sound_file': alert_obj.sound_file,
'volume': alert_obj.volume,
'delay': alert_obj.delay,
'message': alert_obj.message,
'duration': alert_obj.duration
}
success = database_manager.set_data(full_path, alert_dict)
if success:
logger.info(f"Successfully migrated alert: {alert_id} to {db_path}")
else:
logger.error(f"Failed to save alert: {alert_id}")
return success
except Exception as e:
logger.error(f"Error migrating single alert: {str(e)}")
return False