Source code for modules.connector_integration

#!/usr/bin/env python3
"""
MIT License

Copyright (c) 2024 Mycelian

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""

import asyncio
import logging
import time
from typing import Any, Dict, List

from .connector_core import EventData
from .connector_manager import get_manager

logger = logging.getLogger(__name__)


[docs] class ConnectorIntegration: """Integration layer between the connector system and existing Mycelian modules""" def __init__(self): self.manager = get_manager() self._twitch_integration_setup = False self._streamlabs_integration_setup = False
[docs] def setup_twitch_integration(self): """Set up integration with Twitch events""" if self._twitch_integration_setup: return try: # Import here to avoid circular imports from . import twitch # Store original event handlers self._original_twitch_handlers = {} # Wrap existing event handlers to also send to connector system if hasattr(twitch, "twitch_api") and twitch.twitch_api: self._wrap_twitch_handler( twitch.twitch_api, "on_bits_use", self._handle_twitch_bits ) self._wrap_twitch_handler( twitch.twitch_api, "on_sub", self._handle_twitch_sub ) self._wrap_twitch_handler( twitch.twitch_api, "on_resub", self._handle_twitch_resub ) self._wrap_twitch_handler( twitch.twitch_api, "on_sub_gift", self._handle_twitch_giftsub ) self._wrap_twitch_handler( twitch.twitch_api, "on_follow", self._handle_twitch_follow ) self._wrap_twitch_handler( twitch.twitch_api, "on_raid", self._handle_twitch_raid ) self._wrap_twitch_handler( twitch.twitch_api, "on_points", self._handle_twitch_points ) self._wrap_twitch_handler( twitch.twitch_api, "on_chat_message", self._handle_twitch_chat ) self._wrap_twitch_handler( twitch.twitch_api, "on_hype_train_start", self._handle_twitch_hype_train_start, ) self._wrap_twitch_handler( twitch.twitch_api, "on_hype_train_end", self._handle_twitch_hype_train_end, ) logger.info("Twitch integration set up for connector system") self._twitch_integration_setup = True else: logger.warning("Twitch API not available for connector integration") except Exception as e: logger.error(f"Error setting up Twitch integration: {e}", exc_info=True)
[docs] def setup_streamlabs_integration(self): """Set up integration with StreamLabs events""" if self._streamlabs_integration_setup: return try: from . import streamlabs # Wrap StreamLabs donation handler if hasattr(streamlabs, "streamlabs_integration"): original_handler = getattr( streamlabs.streamlabs_integration, "handle_donation_event", None ) if original_handler: def wrapped_handler(data): # Call original handler result = original_handler(data) # Send to connector system asyncio.create_task(self._handle_streamlabs_donation(data)) return result streamlabs.streamlabs_integration.handle_donation_event = ( wrapped_handler ) logger.info("StreamLabs integration set up for connector system") self._streamlabs_integration_setup = True else: logger.warning("StreamLabs donation handler not found") else: logger.warning("StreamLabs integration not available") except Exception as e: logger.error(f"Error setting up StreamLabs integration: {e}", exc_info=True)
def _wrap_twitch_handler(self, api_instance, handler_name: str, connector_handler): """Wrap a Twitch API handler to also send events to connector system""" try: original_handler = getattr(api_instance, handler_name, None) if not original_handler: logger.warning(f"Twitch handler {handler_name} not found") return # Store original handler self._original_twitch_handlers[handler_name] = original_handler # Create wrapped handler async def wrapped_handler(*args, **kwargs): # Call original handler result = await original_handler(*args, **kwargs) # Send to connector system try: await connector_handler(*args, **kwargs) except Exception as e: logger.error( f"Error in connector handler for {handler_name}: {e}", exc_info=True, ) return result # Replace handler setattr(api_instance, handler_name, wrapped_handler) logger.debug(f"Wrapped Twitch handler: {handler_name}") except Exception as e: logger.error( f"Error wrapping Twitch handler {handler_name}: {e}", exc_info=True ) async def _handle_twitch_bits(self, data): """Handle Twitch bits event for connector system""" try: print(f"[DEBUG] ConnectorIntegration._handle_twitch_bits: received bits event - bits: {data.event.bits}, user: {data.event.user_name}") event_data = EventData.from_twitch_bits( bits_amount=data.event.bits, username=data.event.user_name, message=getattr(data.event, "message", ""), user_id=data.event.user_id, is_anonymous=getattr(data.event, "is_anonymous", False), ) print(f"[DEBUG] ConnectorIntegration._handle_twitch_bits: created event_data: {event_data}") await self.manager.add_event(event_data) print(f"[DEBUG] ConnectorIntegration._handle_twitch_bits: event sent to manager") except Exception as e: print(f"[DEBUG] ConnectorIntegration._handle_twitch_bits: ERROR - {e}") logger.error( f"Error handling Twitch bits for connector: {e}", exc_info=True ) async def _handle_twitch_sub(self, data): """Handle Twitch subscription event for connector system""" try: event_data = EventData.from_twitch_sub( tier=data.event.tier, username=data.event.user_name, message=getattr(data.event.message, "text", "") if hasattr(data.event, "message") and data.event.message else "", months=1, # New sub is 1 month user_id=data.event.user_id, is_gift=getattr(data.event, "is_gift", False), ) await self.manager.add_event(event_data) except Exception as e: logger.error(f"Error handling Twitch sub for connector: {e}", exc_info=True) async def _handle_twitch_resub(self, data): """Handle Twitch resubscription event for connector system""" try: event_data = { "event_type": "twitch_resub", "tier": data.event.tier, "username": data.event.user_name, "message": getattr(data.event.message, "text", "") if hasattr(data.event, "message") and data.event.message else "", "months": getattr(data.event, "cumulative_months", 1), "streak_months": getattr(data.event, "streak_months", 0), "user_id": data.event.user_id, "timestamp": data.event.timestamp if hasattr(data.event, "timestamp") else None, "source": "twitch", } await self.manager.add_event(event_data) except Exception as e: logger.error( f"Error handling Twitch resub for connector: {e}", exc_info=True ) async def _handle_twitch_giftsub(self, data): """Handle Twitch gift subscription event for connector system""" try: event_data = { "event_type": "twitch_giftsub", "tier": data.event.tier, "username": data.event.user_name, "recipient": getattr(data.event, "recipient_user_name", ""), "gift_count": getattr(data.event, "total", 1), "is_anonymous": getattr(data.event, "is_anonymous", False), "user_id": data.event.user_id, "timestamp": data.event.timestamp if hasattr(data.event, "timestamp") else None, "source": "twitch", } await self.manager.add_event(event_data) except Exception as e: logger.error( f"Error handling Twitch gift sub for connector: {e}", exc_info=True ) async def _handle_twitch_follow(self, data): """Handle Twitch follow event for connector system""" try: event_data = EventData.from_twitch_follow( username=data.event.user_name, user_id=data.event.user_id, followed_at=data.event.followed_at, ) await self.manager.add_event(event_data) except Exception as e: logger.error( f"Error handling Twitch follow for connector: {e}", exc_info=True ) async def _handle_twitch_raid(self, data): """Handle Twitch raid event for connector system""" try: event_data = EventData.from_twitch_raid( username=data.event.from_broadcaster_user_name, viewer_count=data.event.viewers, from_user_id=data.event.from_broadcaster_user_id, to_user_id=data.event.to_broadcaster_user_id, ) await self.manager.add_event(event_data) except Exception as e: logger.error( f"Error handling Twitch raid for connector: {e}", exc_info=True ) async def _handle_twitch_points(self, data): """Handle Twitch channel points event for connector system""" try: event_data = { "event_type": "twitch_points", "username": data.event.user_name, "reward_id": data.event.reward.id, "reward_title": data.event.reward.title, "reward_name": data.event.reward.title, # Add both fields for compatibility "reward_cost": data.event.reward.cost, "user_input": getattr(data.event, "user_input", ""), "user_id": data.event.user_id, "timestamp": data.event.redeemed_at, "source": "twitch", } await self.manager.add_event(event_data) except Exception as e: logger.error( f"Error handling Twitch points for connector: {e}", exc_info=True ) async def _handle_twitch_chat(self, data): """Handle Twitch chat message event for connector system""" try: message = data.event.message.text is_command = message.startswith("!") command = message[1:].split()[0] if is_command and len(message) > 1 else "" event_data = EventData.from_twitch_chat( username=data.event.chatter_user_name, message=message, is_command=is_command, command=command, user_id=data.event.chatter_user_id, badges=getattr(data.event, "badges", []), emotes=getattr(data.event.message, "emotes", []), ) await self.manager.add_event(event_data) except Exception as e: logger.error( f"Error handling Twitch chat for connector: {e}", exc_info=True ) async def _handle_twitch_hype_train_start(self, data): """Handle Twitch hype train start event for connector system""" try: event_data = { "event_type": "twitch_hype_train_start", "level": data.event.level, "total": data.event.total, "goal": data.event.goal, "top_contributors": getattr(data.event, "top_contributions", []), "started_at": data.event.started_at, "expires_at": data.event.expires_at, "timestamp": data.event.started_at, "source": "twitch", } await self.manager.add_event(event_data) except Exception as e: logger.error( f"Error handling Twitch hype train start for connector: {e}", exc_info=True, ) async def _handle_twitch_hype_train_end(self, data): """Handle Twitch hype train end event for connector system""" try: event_data = { "event_type": "twitch_hype_train_end", "level": data.event.level, "total": data.event.total, "top_contributors": getattr(data.event, "top_contributions", []), "started_at": data.event.started_at, "ended_at": data.event.ended_at, "cooldown_ends_at": data.event.cooldown_ends_at, "timestamp": data.event.ended_at, "source": "twitch", } await self.manager.add_event(event_data) except Exception as e: logger.error( f"Error handling Twitch hype train end for connector: {e}", exc_info=True, ) async def _handle_streamlabs_donation(self, data): """Handle StreamLabs donation event for connector system""" try: message_data = data.get("message", []) if message_data: donation_info = message_data[0] # First donation in the batch event_data = EventData.from_donation( amount=float(donation_info.get("amount", 0)), username=donation_info.get("from", "Anonymous"), message=donation_info.get("message", ""), currency=donation_info.get("currency", "USD"), source="streamlabs", ) await self.manager.add_event(event_data) except Exception as e: logger.error( f"Error handling StreamLabs donation for connector: {e}", exc_info=True )
[docs] async def send_hotkey_event( self, key_code: str, modifiers: List[str] = None, is_global: bool = True ): """Send a hotkey event to the connector system""" try: if modifiers is None: modifiers = [] event_data = { "event_type": "hotkey", "key_code": key_code, "modifiers": modifiers, "is_global": is_global, "timestamp": time.time(), "source": "hotkey", } await self.manager.add_event(event_data) logger.debug(f"Hotkey event sent: {key_code} with modifiers {modifiers}") except Exception as e: logger.error(f"Error sending hotkey event: {e}", exc_info=True)
[docs] async def send_timer_event(self, timer_id: str, interval_seconds: int): """Send a timer event to the connector system""" try: import time event_data = { "event_type": "timer", "timer_id": timer_id, "interval_seconds": interval_seconds, "timestamp": time.time(), "source": "timer", } await self.manager.add_event(event_data) logger.debug(f"Timer event sent: {timer_id}") except Exception as e: logger.error(f"Error sending timer event: {e}", exc_info=True)
[docs] async def send_webhook_event(self, webhook_data: Dict[str, Any]): """Send a webhook event to the connector system""" try: event_data = { "event_type": "webhook", "webhook_data": webhook_data, "timestamp": webhook_data.get("timestamp") or time.time(), "source": "webhook", **webhook_data, } await self.manager.add_event(event_data) logger.info("Webhook event sent") except Exception as e: logger.error(f"Error sending webhook event: {e}", exc_info=True)
[docs] def get_available_template_actions(self) -> Dict[str, Dict[str, Any]]: """Get available template actions from template configs Returns: Dict[str, Dict[str, Any]]: Template name -> {action_id: action_config} mapping """ try: from . import template_config_parser config_parser = template_config_parser.TemplateConfigParser() config_files = config_parser.get_config_files() template_actions = {} for template_name in config_files: try: config = config_parser.load_config( template_name, include_dynamic_controls=True ) actions = {} # Look for connector_actions section if isinstance(config, dict) and "connector_actions" in config: connector_actions = config["connector_actions"] # connector_actions is a dict where each key is an action ID # and the value contains the full action configuration including elements for action_id, action_config in connector_actions.items(): if isinstance(action_config, dict): # Return the full action config, not just the name actions[action_id] = action_config if actions: template_actions[template_name] = actions except Exception as e: logger.warning( f"Error processing template {template_name} for actions: {e}" ) logger.debug( f"Found template actions for {len(template_actions)} templates" ) return template_actions except Exception as e: logger.error(f"Error getting template actions: {e}", exc_info=True) return {}
# Global connector integration instance connector_integration = ConnectorIntegration()
[docs] def initialize_integration(): """Initialize the connector integration system""" global connector_integration try: logger.info("Initializing connector integration") # Set up integrations connector_integration.setup_twitch_integration() connector_integration.setup_streamlabs_integration() logger.info("Connector integration initialized successfully") except Exception as e: logger.error(f"Error initializing connector integration: {e}", exc_info=True)
[docs] def get_integration() -> ConnectorIntegration: """Get the global connector integration instance""" return connector_integration