#!/usr/bin/env python3
"""
MIT License
Copyright (c) 2024 Mycelian
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import asyncio
import json
import logging
import threading
import time
from typing import Any, Dict, List, Optional
from . import database_manager
from .connector_actions import create_action
from .connector_core import ActionType, Connector, EventData, TriggerType
from .connector_triggers import create_trigger
from .hotkey_listener import get_listener
from .statistics_manager import get_statistics_manager
logger = logging.getLogger(__name__)
[docs]
class ConnectorManager:
"""Manages connectors and processes events through the trigger-action system"""
def __init__(self):
self.connectors: Dict[str, Connector] = {}
self.event_queue = asyncio.Queue()
self.processing_task = None
self.connector_thread = None
self.is_running = False
self._lock = threading.RLock()
self.hotkey_listener = get_listener()
# Statistics
self.stats = {
"events_processed": 0,
"triggers_fired": 0,
"actions_executed": 0,
"errors": 0,
"started_at": time.time(),
}
[docs]
def start(self):
"""Start the connector manager"""
if self.is_running:
logger.warning("Connector manager is already running")
return
self.is_running = True
logger.info("Starting connector manager")
# Load connectors from database
self.load_connectors()
# Start event processing task only if we have a running event loop
try:
loop = asyncio.get_running_loop()
self.processing_task = asyncio.create_task(self._process_events())
logger.info("Started async event processing task")
except RuntimeError:
# No event loop running, defer task creation
logger.info("No event loop running, will start processing task later")
self.processing_task = None
[docs]
def start_connector_thread(self):
"""Start connector processing in a separate thread with its own event loop"""
import threading
def run_connector_loop():
"""Run the connector event processing in a separate event loop"""
try:
# Create a new event loop for this thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Start the processing task
self.processing_task = loop.create_task(self._process_events())
# Run the event loop
loop.run_forever()
except Exception as e:
logger.error(f"Error in connector thread: {e}", exc_info=True)
# Start the connector processing in a background thread
self.connector_thread = threading.Thread(
target=run_connector_loop, daemon=True, name="ConnectorProcessor"
)
self.connector_thread.start()
logger.info("Started connector processing thread")
[docs]
def ensure_processing_task(self):
"""Ensure the processing task is running if we have an event loop"""
if self.is_running and self.processing_task is None:
try:
loop = asyncio.get_running_loop()
self.processing_task = asyncio.create_task(self._process_events())
logger.info("Started deferred async event processing task")
except RuntimeError:
# Try to get or create an event loop
try:
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.processing_task = loop.create_task(self._process_events())
# Try to run the event loop if it's not running
if not loop.is_running():
try:
loop.run_until_complete(asyncio.sleep(0))
except Exception:
pass
logger.info(
"Started deferred async event processing task (fallback)"
)
except Exception:
pass
[docs]
async def process_event(self, event_data: Dict[str, Any]) -> bool:
"""Process an event through all enabled connectors"""
if not self.is_running:
return False
# Ensure processing task is running
self.ensure_processing_task()
# Add event to queue for processing
await self.event_queue.put(event_data)
return True
[docs]
async def stop(self):
"""Stop the connector manager"""
if not self.is_running:
return
self.is_running = False
logger.info("Stopping connector manager")
if self.processing_task:
self.processing_task.cancel()
try:
await self.processing_task
except asyncio.CancelledError:
pass
# Cancel any active audio restoration tasks and clean up registry
try:
from .connector_actions import AudioControlAction
AudioControlAction.cancel_all_restoration_tasks()
AudioControlAction.cleanup_audio_change_registry()
except Exception as e:
logger.error(f"Error cancelling audio restoration tasks: {e}")
[docs]
async def add_event(self, event_data: Dict[str, Any]):
"""Add an event to the processing queue"""
if not self.is_running:
logger.warning("Connector manager not running, dropping event")
return
self.ensure_processing_task()
await self.event_queue.put(event_data)
logger.debug(f"Added event to queue: {event_data.get('event_type', 'unknown')}")
async def _process_events(self):
"""Process events from the queue"""
logger.info("Event processing started")
while self.is_running:
try:
# Wait for an event with timeout
try:
event_data = await asyncio.wait_for(
self.event_queue.get(), timeout=1.0
)
except asyncio.TimeoutError:
continue
# Process the event
await self._process_single_event(event_data)
self.stats["events_processed"] += 1
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error processing event: {e}", exc_info=True)
self.stats["errors"] += 1
logger.info("Event processing stopped")
async def _process_single_event(self, event_data: Dict[str, Any]):
"""Process a single event through all connectors"""
event_type = event_data.get("event_type", "unknown")
logger.debug(f"Processing event: {event_type}")
triggered_connectors = 0
triggered_connector_names = []
# Process through all enabled connectors
for connector_id, connector in self.connectors.items():
# Check if connector is enabled
if not connector.enabled:
continue
try:
triggered = await connector.process_event(event_data)
if triggered:
self.stats["triggers_fired"] += 1
triggered_connectors += 1
triggered_connector_names.append(connector.name)
# Note: action execution count is tracked in the connector
except Exception as e:
logger.error(
f"Error processing event in connector '{connector.name}': {e}",
exc_info=True,
)
self.stats["errors"] += 1
# Track statistics for triggered connectors
if triggered_connectors > 0:
try:
stats_manager = get_statistics_manager()
username = event_data.get("username")
# Track each individual connector that was triggered
for connector_name in triggered_connector_names:
stats_manager.increment_connector_triggers(
1, # One trigger per connector
username=username,
connector_name=connector_name,
)
logger.debug(
f"Tracked {triggered_connectors} connector trigger(s) for event: {event_type}"
)
except Exception as e:
logger.error(f"Error tracking connector trigger statistics: {e}")
[docs]
def load_connectors(self):
"""Load connectors from database"""
try:
with self._lock:
# Get connectors from database
connectors_data = database_manager.get_data("connectors", {})
self.connectors.clear()
for connector_id, connector_data in connectors_data.items():
try:
connector = self._deserialize_connector(connector_data)
if connector:
self.connectors[connector_id] = connector
# Register hotkey if it's a hotkey trigger and enabled
if connector.enabled:
self._register_connector_hotkey(connector)
logger.debug(f"Loaded connector: {connector.name}")
except Exception as e:
logger.error(
f"Error loading connector {connector_id}: {e}",
exc_info=True,
)
logger.info(
f"🔌 Loaded {len(self.connectors)} connectors from database"
)
# Log connector names for debugging
if self.connectors:
connector_names = list(self.connectors.keys())
logger.debug(f" Connector names: {connector_names}")
except Exception as e:
logger.error(f"Error loading connectors: {e}", exc_info=True)
[docs]
def save_connectors(self):
"""Save connectors to database"""
try:
with self._lock:
# Serialize all connectors
connectors_data = {}
for connector_id, connector in self.connectors.items():
try:
connectors_data[connector_id] = self._serialize_connector(
connector
)
except Exception as e:
logger.error(
f"Error serializing connector {connector_id}: {e}",
exc_info=True,
)
# Save to database
database_manager.set_data("connectors", connectors_data)
logger.info(f"Saved {len(connectors_data)} connectors")
except Exception as e:
logger.error(f"Error saving connectors: {e}", exc_info=True)
[docs]
def add_connector(self, connector: Connector) -> bool:
"""Add a new connector"""
try:
with self._lock:
if connector.connector_id in self.connectors:
logger.warning(f"Connector {connector.connector_id} already exists")
return False
# Validate connector
if not self._validate_connector(connector):
return False
self.connectors[connector.connector_id] = connector
# Register hotkey if it's a hotkey trigger and enabled
if connector.enabled:
self._register_connector_hotkey(connector)
logger.info(f"Added connector: {connector.name}")
# Note: We don't track connector creation count in statistics anymore
# since we get it dynamically from the connector_manager itself
# Save to database
self.save_connectors()
return True
except Exception as e:
logger.error(f"Error adding connector: {e}", exc_info=True)
return False
[docs]
def update_connector(self, connector_id: str, connector: Connector) -> bool:
"""Update an existing connector"""
try:
with self._lock:
if connector_id not in self.connectors:
logger.warning(f"Connector {connector_id} does not exist")
return False
# Validate connector
if not self._validate_connector(connector):
return False
self.connectors[connector_id] = connector
logger.info(f"Updated connector: {connector.name}")
# Save to database
self.save_connectors()
return True
except Exception as e:
logger.error(f"Error updating connector: {e}", exc_info=True)
return False
[docs]
def remove_connector(self, connector_id: str) -> bool:
"""Remove a connector"""
try:
with self._lock:
if connector_id not in self.connectors:
logger.warning(f"Connector {connector_id} does not exist")
return False
connector = self.connectors.pop(connector_id)
# Unregister hotkey if it's a hotkey trigger
self._unregister_connector_hotkey(connector)
logger.info(f"Removed connector: {connector.name}")
# Save to database
self.save_connectors()
return True
except Exception as e:
logger.error(f"Error removing connector: {e}", exc_info=True)
return False
[docs]
def get_connector(self, connector_id: str) -> Optional[Connector]:
"""Get a connector by ID"""
with self._lock:
return self.connectors.get(connector_id)
[docs]
def get_all_connectors(self) -> Dict[str, Connector]:
"""Get all connectors"""
with self._lock:
return self.connectors.copy()
[docs]
def get_connectors_by_trigger_type(
self, trigger_type: TriggerType
) -> List[Connector]:
"""Get connectors with a specific trigger type"""
with self._lock:
return [
connector
for connector in self.connectors.values()
if connector.trigger and connector.trigger.trigger_type == trigger_type
]
[docs]
def toggle_connector(self, connector_id: str) -> bool:
"""Toggle a connector's enabled state"""
try:
with self._lock:
if connector_id not in self.connectors:
return False
connector = self.connectors[connector_id]
connector.enabled = not connector.enabled
# Register/unregister hotkey if it's a hotkey trigger
if (
connector.trigger
and connector.trigger.trigger_type == TriggerType.HOTKEY
):
if connector.enabled:
self._register_connector_hotkey(connector)
else:
self._unregister_connector_hotkey(connector)
logger.info(
f"Connector '{connector.name}' {'enabled' if connector.enabled else 'disabled'}"
)
# Save to database
self.save_connectors()
return True
except Exception as e:
logger.error(f"Error toggling connector: {e}", exc_info=True)
return False
[docs]
def get_statistics(self) -> Dict[str, Any]:
"""Get connector manager statistics"""
uptime = time.time() - self.stats["started_at"]
return {
**self.stats,
"uptime_seconds": uptime,
"connector_count": len(self.connectors),
"enabled_connectors": sum(1 for c in self.connectors.values() if c.enabled),
}
def _validate_connector(self, connector: Connector) -> bool:
"""Validate a connector configuration"""
if not connector.trigger:
logger.error("Connector missing trigger")
return False
if not connector.actions:
logger.error("Connector missing actions")
return False
# Validate trigger
if not hasattr(connector.trigger, "should_trigger"):
logger.error("Invalid trigger type")
return False
# Validate actions
for action in connector.actions:
if not hasattr(action, "execute"):
logger.error("Invalid action type")
return False
if not action.validate_parameters():
logger.error(f"Action '{action.name}' has invalid parameters")
return False
return True
def _serialize_connector(self, connector: Connector) -> Dict[str, Any]:
"""Serialize a connector to dictionary"""
return {
"connector_id": connector.connector_id,
"name": connector.name,
"description": connector.description,
"enabled": connector.enabled,
"created_at": connector.created_at,
"last_triggered": connector.last_triggered,
"trigger_count": connector.trigger_count,
"metadata": connector.metadata,
"trigger": self._serialize_trigger(connector.trigger)
if connector.trigger
else None,
"actions": [self._serialize_action(action) for action in connector.actions],
}
def _deserialize_connector(self, data: Dict[str, Any]) -> Optional[Connector]:
"""Deserialize a connector from dictionary"""
try:
# Deserialize trigger
trigger = None
if data.get("trigger"):
trigger = self._deserialize_trigger(data["trigger"])
# Deserialize actions
actions = []
for action_data in data.get("actions", []):
action = self._deserialize_action(action_data)
if action:
actions.append(action)
return Connector(
connector_id=data["connector_id"],
name=data["name"],
description=data.get("description", ""),
enabled=data.get("enabled", True),
trigger=trigger,
actions=actions,
created_at=data.get("created_at", time.time()),
last_triggered=data.get("last_triggered", 0),
trigger_count=data.get("trigger_count", 0),
metadata=data.get("metadata", {}),
)
except Exception as e:
logger.error(f"Error deserializing connector: {e}", exc_info=True)
return None
def _serialize_trigger(self, trigger) -> Dict[str, Any]:
"""Serialize a trigger to dictionary"""
return {
"trigger_id": trigger.trigger_id,
"trigger_type": trigger.trigger_type.value,
"name": trigger.name,
"description": trigger.description,
"enabled": trigger.enabled,
"conditions": [
{
"field": cond.field,
"operator": cond.operator.value,
"value": cond.value,
"case_sensitive": cond.case_sensitive,
}
for cond in trigger.conditions
],
"cooldown_seconds": trigger.cooldown_seconds,
"last_triggered": trigger.last_triggered,
"metadata": trigger.metadata,
# Add trigger-specific fields
**{
key: value
for key, value in trigger.__dict__.items()
if key
not in [
"trigger_id",
"trigger_type",
"name",
"description",
"enabled",
"conditions",
"cooldown_seconds",
"last_triggered",
"metadata",
]
},
}
def _deserialize_trigger(self, data: Dict[str, Any]):
"""Deserialize a trigger from dictionary"""
try:
from .connector_core import ComparisonOperator, TriggerCondition
trigger_type = TriggerType(data["trigger_type"])
# Deserialize conditions
conditions = []
for cond_data in data.get("conditions", []):
condition = TriggerCondition(
field=cond_data["field"],
operator=ComparisonOperator(cond_data["operator"]),
value=cond_data["value"],
case_sensitive=cond_data.get("case_sensitive", True),
)
conditions.append(condition)
# Extract trigger-specific parameters
trigger_params = {
key: value
for key, value in data.items()
if key
not in [
"trigger_id",
"trigger_type",
"name",
"description",
"enabled",
"conditions",
"cooldown_seconds",
"last_triggered",
"metadata",
]
}
# Create trigger
trigger = create_trigger(
trigger_type=trigger_type,
trigger_id=data["trigger_id"],
name=data["name"],
description=data.get("description", ""),
enabled=data.get("enabled", True),
conditions=conditions,
cooldown_seconds=data.get("cooldown_seconds", 0),
metadata=data.get("metadata", {}),
**trigger_params,
)
trigger.last_triggered = data.get("last_triggered", 0)
return trigger
except Exception as e:
logger.error(f"Error deserializing trigger: {e}", exc_info=True)
return None
def _serialize_action(self, action) -> Dict[str, Any]:
"""Serialize an action to dictionary"""
return {
"action_id": action.action_id,
"action_type": action.action_type.value,
"name": action.name,
"description": action.description,
"enabled": action.enabled,
"parameters": action.parameters,
"metadata": action.metadata,
# Add action-specific fields
**{
key: value
for key, value in action.__dict__.items()
if key
not in [
"action_id",
"action_type",
"name",
"description",
"enabled",
"parameters",
"metadata",
]
},
}
def _deserialize_action(self, data: Dict[str, Any]):
"""Deserialize an action from dictionary"""
try:
action_type = ActionType(data["action_type"])
# Extract action-specific parameters
action_params = {
key: value
for key, value in data.items()
if key
not in [
"action_id",
"action_type",
"name",
"description",
"enabled",
"parameters",
"metadata",
]
}
# Create action
action = create_action(
action_type=action_type,
action_id=data["action_id"],
name=data["name"],
description=data.get("description", ""),
enabled=data.get("enabled", True),
parameters=data.get("parameters", {}),
metadata=data.get("metadata", {}),
**action_params,
)
return action
except Exception as e:
logger.error(f"Error deserializing action: {e}", exc_info=True)
return None
def _register_connector_hotkey(self, connector: Connector):
"""Register hotkey for a connector"""
try:
if (
not connector.trigger
or connector.trigger.trigger_type != TriggerType.HOTKEY
):
return
key_combination = getattr(connector.trigger, "key_combination", "")
if not key_combination:
return
is_global = getattr(connector.trigger, "is_global", True)
self.hotkey_listener.register_hotkey(
connector.connector_id, key_combination, is_global
)
except Exception as e:
logger.error(
f"Error registering hotkey for connector {connector.connector_id}: {e}",
exc_info=True,
)
def _unregister_connector_hotkey(self, connector: Connector):
"""Unregister hotkey for a connector"""
try:
if (
not connector.trigger
or connector.trigger.trigger_type != TriggerType.HOTKEY
):
return
self.hotkey_listener.unregister_hotkey(connector.connector_id)
except Exception as e:
logger.error(
f"Error unregistering hotkey for connector {connector.connector_id}: {e}",
exc_info=True,
)
[docs]
async def test_connector(
self, connector_id: str, test_event_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Test a connector with provided event data"""
connector = self.get_connector(connector_id)
if not connector:
return {"success": False, "error": "Connector not found"}
try:
# Process test event
result = await connector.process_event(test_event_data)
return {
"success": True,
"triggered": result,
"connector_name": connector.name,
"trigger_type": connector.trigger.trigger_type.value
if connector.trigger
else None,
"action_count": len(connector.actions),
}
except Exception as e:
logger.error(f"Error testing connector: {e}", exc_info=True)
return {"success": False, "error": str(e)}
# Global connector manager instance
connector_manager = ConnectorManager()
[docs]
def initialize():
"""Initialize the connector system"""
global connector_manager
try:
logger.info("Initializing connector system")
# Initialize hotkey listener
from .hotkey_listener import initialize as init_hotkey_listener
init_hotkey_listener()
connector_manager.start()
logger.info("Connector system initialized successfully")
except Exception as e:
logger.error(f"Error initializing connector system: {e}", exc_info=True)
[docs]
async def cleanup():
"""Cleanup the connector system"""
global connector_manager
try:
logger.info("Cleaning up connector system")
await connector_manager.stop()
# Cleanup hotkey listener
from .hotkey_listener import cleanup as cleanup_hotkey_listener
cleanup_hotkey_listener()
logger.info("Connector system cleaned up successfully")
except Exception as e:
logger.error(f"Error cleaning up connector system: {e}", exc_info=True)
[docs]
def get_manager() -> ConnectorManager:
"""Get the global connector manager instance"""
return connector_manager