Source code for modules.statistics_manager

#!/usr/bin/env python3
"""
MIT License

Copyright (c) 2024 Mycelian

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""

import asyncio
import json
import logging
import os
import sqlite3
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

from .database_manager import get_data, set_data, update_data
from .path_utils import get_data_path

logger = logging.getLogger(__name__)


@dataclass
class IndividualAlertTypeStatistics:
    """Trigger counter and timestamps kept for a single alert type.

    Both timestamps default to the moment the instance is created.
    """

    alert_type: str = ""
    alert_name: str = ""
    trigger_count: int = 0
    last_triggered: float = field(default_factory=time.time)
    first_triggered: float = field(default_factory=time.time)
@dataclass
class UserAlertStatistics:
    """Alert counters kept for one user.

    All counters start at zero; ``first_seen`` and ``last_seen`` default to
    the moment the instance is created.
    """

    bit_alerts_played: int = 0
    resubs_played: int = 0
    new_subs_played: int = 0
    gift_subs_played: int = 0
    follow_alerts_played: int = 0
    raids: int = 0
    point_alerts_redeemed: int = 0
    # Total channel points redeemed by this user
    channel_points_redeemed: int = 0
    donations: int = 0
    total_alerts: int = 0
    first_seen: float = field(default_factory=time.time)
    last_seen: float = field(default_factory=time.time)
@dataclass
class AlertStatistics:
    """Aggregate alert counters plus per-user and per-alert-type breakdowns."""

    bit_alerts_played: int = 0
    # Total amount of bits given across all alerts
    total_bits: int = 0
    resubs_played: int = 0
    new_subs_played: int = 0
    gift_subs_played: int = 0
    # Total number of gift subs given across all alerts
    total_gift_subs: int = 0
    follow_alerts_played: int = 0
    raids: int = 0
    point_alerts_redeemed: int = 0
    # Total channel points redeemed across all users
    total_channel_points_redeemed: int = 0
    donations: int = 0

    # Per-user tracking
    user_stats: Dict[str, UserAlertStatistics] = field(default_factory=dict)

    # Individual alert type tracking
    alert_type_stats: Dict[str, IndividualAlertTypeStatistics] = field(
        default_factory=dict
    )
@dataclass
class HypeTrainStatistics:
    """Hype train completion counters, broken down by level."""

    # Dynamic dictionary for unlimited levels
    level_completions: Dict[int, int] = field(default_factory=dict)
    total_completions: int = 0
@dataclass
class UserConnectorStatistics:
    """Connector counters kept for one user.

    ``first_seen`` and ``last_seen`` default to the moment the instance is
    created.
    """

    connectors_triggered: int = 0
    total_triggers: int = 0
    # connector_name -> trigger_count
    connector_usage: Dict[str, int] = field(default_factory=dict)
    first_seen: float = field(default_factory=time.time)
    last_seen: float = field(default_factory=time.time)
@dataclass
class IndividualConnectorStatistics:
    """Trigger counter and timestamps kept for a single connector.

    Both timestamps default to the moment the instance is created.
    """

    connector_name: str = ""
    trigger_count: int = 0
    last_triggered: float = field(default_factory=time.time)
    first_triggered: float = field(default_factory=time.time)
@dataclass
class ConnectorStatistics:
    """Aggregate connector counters plus per-user and per-connector breakdowns."""

    # connectors_created is not stored here since we get it dynamically
    # from connector_manager
    connectors_triggered: int = 0
    # Total number of trigger events across all connectors
    total_triggers: int = 0

    # Per-user tracking
    user_stats: Dict[str, UserConnectorStatistics] = field(default_factory=dict)

    # Per-user individual connector tracking
    user_connector_stats: Dict[str, UserConnectorStatistics] = field(
        default_factory=dict
    )

    # Individual connector tracking
    connector_stats: Dict[str, IndividualConnectorStatistics] = field(
        default_factory=dict
    )
@dataclass
class UserChatbotStatistics:
    """Chatbot counters kept for one user.

    ``first_seen`` and ``last_seen`` default to the moment the instance is
    created.
    """

    commands_triggered: int = 0
    events_triggered: int = 0
    total_interactions: int = 0
    first_seen: float = field(default_factory=time.time)
    last_seen: float = field(default_factory=time.time)
@dataclass
class IndividualCommandStatistics:
    """Usage counter and timestamps kept for a single command.

    Both timestamps default to the moment the instance is created.
    """

    command_name: str = ""
    usage_count: int = 0
    last_used: float = field(default_factory=time.time)
    first_used: float = field(default_factory=time.time)
@dataclass
class IndividualEventStatistics:
    """Trigger counter and timestamps kept for a single event.

    Both timestamps default to the moment the instance is created.
    """

    event_name: str = ""
    trigger_count: int = 0
    last_triggered: float = field(default_factory=time.time)
    first_triggered: float = field(default_factory=time.time)
@dataclass
class UserCommandStatistics:
    """Per-command usage counts kept for one user.

    ``first_seen`` and ``last_seen`` default to the moment the instance is
    created.
    """

    # command_name -> usage_count
    command_usage: Dict[str, int] = field(default_factory=dict)
    first_seen: float = field(default_factory=time.time)
    last_seen: float = field(default_factory=time.time)
@dataclass
class UserEventStatistics:
    """Per-event trigger counts kept for one user.

    ``first_seen`` and ``last_seen`` default to the moment the instance is
    created.
    """

    # event_name -> trigger_count
    event_usage: Dict[str, int] = field(default_factory=dict)
    first_seen: float = field(default_factory=time.time)
    last_seen: float = field(default_factory=time.time)
@dataclass
class ChatbotStatistics:
    """Aggregate chatbot counters plus per-user, per-command and per-event breakdowns."""

    # commands_created and events_created are not stored here since we get
    # them dynamically
    commands_triggered: int = 0
    events_triggered: int = 0
    # Total commands + events triggered
    total_interactions: int = 0

    # Per-user tracking
    user_stats: Dict[str, UserChatbotStatistics] = field(default_factory=dict)

    # Per-user individual item tracking
    user_command_stats: Dict[str, UserCommandStatistics] = field(default_factory=dict)
    user_event_stats: Dict[str, UserEventStatistics] = field(default_factory=dict)

    # Individual command and event tracking
    command_stats: Dict[str, IndividualCommandStatistics] = field(default_factory=dict)
    event_stats: Dict[str, IndividualEventStatistics] = field(default_factory=dict)
@dataclass
class UserQuoteStatistics:
    """Quote redemption counters kept for one user.

    ``first_seen`` and ``last_seen`` default to the moment the instance is
    created.
    """

    total_quotes_redeemed: int = 0
    # quote_id -> usage_count
    individual_quote_usage: Dict[str, int] = field(default_factory=dict)
    first_seen: float = field(default_factory=time.time)
    last_seen: float = field(default_factory=time.time)
@dataclass
class QuoteStatistics:
    """Aggregate quote redemption counters plus a per-user breakdown."""

    # quotes_created is not stored here since we get it dynamically
    total_quotes_redeemed: int = 0
    # quote_id -> usage_count
    individual_quote_usage: Dict[str, int] = field(default_factory=dict)

    # Per-user tracking
    user_stats: Dict[str, UserQuoteStatistics] = field(default_factory=dict)
@dataclass
class UserChatStatistics:
    """Chat message counters kept for one user.

    ``first_seen`` and ``last_seen`` default to the moment the instance is
    created.
    """

    twitch_messages_received: int = 0
    total_messages: int = 0
    first_seen: float = field(default_factory=time.time)
    last_seen: float = field(default_factory=time.time)
@dataclass
class ChatStatistics:
    """Aggregate chat message counters plus a per-user breakdown."""

    twitch_messages_received: int = 0
    total_messages: int = 0

    # Per-user tracking
    user_stats: Dict[str, UserChatStatistics] = field(default_factory=dict)
@dataclass
class TemplateStatistics:
    """Custom statistics kept for a single template.

    ``last_updated`` defaults to the moment the instance is created.
    """

    template_name: str = ""
    # Custom stats submitted by templates
    custom_stats: Dict[str, Any] = field(default_factory=dict)
    last_updated: float = field(default_factory=time.time)
@dataclass
class SessionStatistics:
    """Timing information for the current session.

    ``start_time`` and ``last_save_time`` default to the moment the instance
    is created; ``session_duration`` starts at zero.
    """

    start_time: float = field(default_factory=time.time)
    last_save_time: float = field(default_factory=time.time)
    session_duration: float = 0.0
@dataclass
class StatisticsData:
    """Top-level container holding one statistics object per category.

    Every field is built through a default factory, so each instance gets
    its own fresh set of category objects.
    """

    alerts: AlertStatistics = field(default_factory=AlertStatistics)
    hype_trains: HypeTrainStatistics = field(default_factory=HypeTrainStatistics)
    connectors: ConnectorStatistics = field(default_factory=ConnectorStatistics)
    chatbot: ChatbotStatistics = field(default_factory=ChatbotStatistics)
    quotes: QuoteStatistics = field(default_factory=QuoteStatistics)
    chat: ChatStatistics = field(default_factory=ChatStatistics)
    # template_name -> TemplateStatistics
    templates: Dict[str, TemplateStatistics] = field(default_factory=dict)
    session: SessionStatistics = field(default_factory=SessionStatistics)
class StatisticsManager:
    """Comprehensive statistics manager for Mycelian.

    Holds the in-memory ``StatisticsData`` and owns a separate SQLite
    database (``data/statistics.db``) that stores per-user timestamped
    events and the serialized lifetime totals. Connections to that database
    are recycled through a small pool guarded by ``_stats_db_pool_lock``.
    """

    def __init__(self):
        """Create the manager, open the statistics database and load stored data."""
        self.data = StatisticsData()
        self.save_interval = 300  # Save every 5 minutes by default
        self.database_path = "statistics"
        self.save_task = None
        self.is_running = False
        self._needs_periodic_start = False  # Flag for deferred periodic saving
        self._periodic_thread = None  # Thread for periodic saving

        # Separate statistics database attributes
        self._stats_db_path: Optional[str] = None
        # NOTE(review): this lock is created but not used by the methods
        # visible in this part of the file.
        self._stats_db_lock = threading.RLock()
        self._stats_db_pool: List[sqlite3.Connection] = []
        self._stats_db_pool_size = 3
        self._stats_db_pool_lock = threading.Lock()
        self._stats_db_initialized = False

        # Initialize the separate statistics database
        self._init_statistics_db()

        # Load existing statistics from database
        self._load_from_database()
        logger.info("StatisticsManager initialized")

    # ---- Separate Statistics Database Methods ----

    def _init_statistics_db(self):
        """Initialize the separate statistics SQLite database.

        This database is managed solely by the StatisticsManager and stores
        per-user timestamped events and lifetime totals independently of the
        main application database_manager.

        On any failure the error is logged and ``_stats_db_initialized`` is
        left ``False``; no exception is propagated.
        """
        try:
            db_path = get_data_path(os.path.join("data", "statistics.db"))
            os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True)
            self._stats_db_path = db_path

            # Create initial connection and tables. The connection is only
            # needed for the schema, so close it even if creation fails.
            conn = sqlite3.connect(db_path, check_same_thread=False)
            try:
                conn.row_factory = sqlite3.Row
                self._create_stats_tables(conn)
            finally:
                conn.close()

            self._stats_db_initialized = True
            logger.info(f"Statistics database initialized at {db_path}")
        except Exception as e:
            logger.error(f"Failed to initialize statistics database: {e}", exc_info=True)
            self._stats_db_initialized = False

    def _create_stats_tables(self, conn: sqlite3.Connection):
        """Create tables for the separate statistics database.

        Every statement uses ``IF NOT EXISTS``, so calling this against an
        already-populated database leaves existing tables untouched.

        Args:
            conn: Open connection on which the schema is created and committed.
        """
        cursor = conn.cursor()

        # Per-user timestamped events table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS user_events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT NOT NULL,
                event_type TEXT NOT NULL,
                amount REAL DEFAULT 0,
                alert_name TEXT DEFAULT '',
                timestamp REAL NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Indexes for efficient date-range and per-user queries
        cursor.execute(
            "CREATE INDEX IF NOT EXISTS idx_ue_username ON user_events(username)"
        )
        cursor.execute(
            "CREATE INDEX IF NOT EXISTS idx_ue_event_type ON user_events(event_type)"
        )
        cursor.execute(
            "CREATE INDEX IF NOT EXISTS idx_ue_timestamp ON user_events(timestamp)"
        )
        cursor.execute(
            "CREATE INDEX IF NOT EXISTS idx_ue_user_type_ts "
            "ON user_events(username, event_type, timestamp)"
        )

        # Lifetime totals table (replaces database_manager storage)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS lifetime_totals (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                category TEXT NOT NULL UNIQUE,
                data_json TEXT NOT NULL,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Migration tracking table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS migrations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                migration_name TEXT NOT NULL UNIQUE,
                completed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        conn.commit()

    def _get_stats_db_connection(self) -> Optional[sqlite3.Connection]:
        """Get a connection from the statistics DB pool or create a new one.

        Returns:
            A pooled or freshly opened connection, or ``None`` when the
            statistics database has not been initialized.
        """
        if not self._stats_db_initialized or not self._stats_db_path:
            return None

        with self._stats_db_pool_lock:
            if self._stats_db_pool:
                return self._stats_db_pool.pop()

        conn = sqlite3.connect(self._stats_db_path, check_same_thread=False, timeout=30.0)
        conn.row_factory = sqlite3.Row
        return conn

    def _return_stats_db_connection(self, conn: Optional[sqlite3.Connection]):
        """Return a connection to the statistics DB pool.

        Any transaction still open on the connection is rolled back first.
        Callers commit on success, so an open transaction here means the
        caller failed part-way through a write; pooling it as-is would let
        the next borrower's ``commit()`` persist those partial changes.

        Args:
            conn: Connection to hand back; ``None`` is ignored.
        """
        if conn is None:
            return

        try:
            if conn.in_transaction:
                conn.rollback()
        except sqlite3.Error as e:
            # The connection is unusable (e.g. already closed): drop it
            # instead of putting it back into the pool.
            logger.error(f"Error rolling back statistics database connection: {e}")
            try:
                conn.close()
            except sqlite3.Error:
                pass
            return

        with self._stats_db_pool_lock:
            if len(self._stats_db_pool) < self._stats_db_pool_size:
                self._stats_db_pool.append(conn)
            else:
                conn.close()

    def _close_statistics_db(self):
        """Close all connections in the statistics database pool.

        Closing is best-effort: a failure on one connection is logged at
        debug level and does not prevent the remaining ones from closing.
        """
        with self._stats_db_pool_lock:
            for conn in self._stats_db_pool:
                try:
                    conn.close()
                except Exception as e:
                    logger.debug(f"Error closing statistics database connection: {e}")
            self._stats_db_pool.clear()
        logger.info("Statistics database connections closed")

    def _record_event(
        self,
        username: str,
        event_type: str,
        amount: float = 0,
        alert_name: str = "",
    ):
        """Record a timestamped per-user event in the statistics database.

        Does nothing when the database is not initialized or ``username`` is
        empty. Errors are logged, never raised.

        Args:
            username: The username associated with the event.
            event_type: Type of event (bit, sub, giftsub, donation, point_redeem, follow, raid).
            amount: Numeric amount (bits, gift quantity, points, etc.).
            alert_name: Name of the alert that was triggered.
        """
        if not self._stats_db_initialized or not username:
            return

        conn = None
        try:
            conn = self._get_stats_db_connection()
            if conn is None:
                return
            cursor = conn.cursor()
            cursor.execute(
                "INSERT INTO user_events (username, event_type, amount, alert_name, timestamp) "
                "VALUES (?, ?, ?, ?, ?)",
                (username, event_type, amount, alert_name, time.time()),
            )
            conn.commit()
        except Exception as e:
            logger.error(f"Error recording event: {e}")
        finally:
            self._return_stats_db_connection(conn)

    def _load_lifetime_totals_from_stats_db(self) -> Optional[Dict[str, Any]]:
        """Load lifetime totals from the statistics database.

        Returns:
            Data in the same ``{"data": {...}, "version": "1.0"}`` format used
            by the legacy database_manager path, or ``None`` if no data exists
            (or the database is unavailable or the read fails).
        """
        conn = None
        try:
            conn = self._get_stats_db_connection()
            if conn is None:
                return None
            cursor = conn.cursor()
            cursor.execute("SELECT category, data_json FROM lifetime_totals")
            rows = cursor.fetchall()
            if not rows:
                return None

            data_dict: Dict[str, Any] = {row[0]: json.loads(row[1]) for row in rows}
            return {"data": data_dict, "version": "1.0"}
        except Exception as e:
            logger.error(f"Error loading from statistics database: {e}", exc_info=True)
            return None
        finally:
            self._return_stats_db_connection(conn)

    def _save_lifetime_totals_to_stats_db(self, data_dict: Dict[str, Any]):
        """Save lifetime totals to the statistics database.

        All categories are written in one transaction. If any category fails
        (for example because it is not JSON-serialisable) the error is logged
        and nothing is committed.

        Args:
            data_dict: Dictionary keyed by category name with JSON-serialisable values.
        """
        conn = None
        try:
            conn = self._get_stats_db_connection()
            if conn is None:
                return
            cursor = conn.cursor()
            for category, category_data in data_dict.items():
                cursor.execute(
                    "INSERT OR REPLACE INTO lifetime_totals (category, data_json, updated_at) "
                    "VALUES (?, ?, CURRENT_TIMESTAMP)",
                    (category, json.dumps(category_data)),
                )
            conn.commit()
            logger.debug("Statistics saved to statistics database successfully")
        except Exception as e:
            logger.error(f"Error saving to statistics database: {e}", exc_info=True)
        finally:
            # Rolls back the uncommitted rows if the loop above failed.
            self._return_stats_db_connection(conn)

    def _migrate_from_database_manager(self) -> bool:
        """One-time migration of lifetime totals from database_manager to the statistics database.

        The ``migrations`` table records completion under the name
        ``initial_migration``; once that row exists this method is a no-op.

        Returns:
            True if migration was performed, False otherwise.
        """
        if not self._stats_db_initialized:
            return False

        conn = None
        try:
            conn = self._get_stats_db_connection()
            if conn is None:
                return False
            cursor = conn.cursor()

            # Check if migration already completed
            cursor.execute(
                "SELECT 1 FROM migrations WHERE migration_name = 'initial_migration'"
            )
            if cursor.fetchone():
                return False  # Already migrated

            # Load data from old database_manager
            old_data = get_data(self.database_path)
            if not old_data or "data" not in old_data:
                # No old data to migrate – mark as complete anyway
                cursor.execute(
                    "INSERT INTO migrations (migration_name) VALUES ('initial_migration')"
                )
                conn.commit()
                return False

            # Store each category in lifetime_totals
            data_dict = old_data["data"]
            for category, category_data in data_dict.items():
                cursor.execute(
                    "INSERT OR REPLACE INTO lifetime_totals (category, data_json) VALUES (?, ?)",
                    (category, json.dumps(category_data)),
                )

            # Mark migration as complete
            cursor.execute(
                "INSERT INTO migrations (migration_name) VALUES ('initial_migration')"
            )
            conn.commit()
            logger.info(
                "Successfully migrated statistics from database_manager to statistics database"
            )
            return True
        except Exception as e:
            logger.error(f"Error during statistics migration: {e}", exc_info=True)
            return False
        finally:
            # Rolls back a half-finished migration so it can be retried cleanly.
            self._return_stats_db_connection(conn)

    # ---- Per-User Event Query Methods ----
def get_user_events(
    self,
    username: str,
    start_time: Optional[float] = None,
    end_time: Optional[float] = None,
    event_type: Optional[str] = None,
    limit: int = 100,
) -> List[Dict[str, Any]]:
    """Return the stored events of one user, newest first.

    Optional filters are applied only when they are not ``None``. If the
    statistics database is unavailable or the query fails, an empty list is
    returned (failures are logged).

    Args:
        username: Username to query.
        start_time: Inclusive lower bound (unix timestamp), or None.
        end_time: Inclusive upper bound (unix timestamp), or None.
        event_type: Restrict to this event type, or None for all types.
        limit: Maximum number of events to return.

    Returns:
        List of event dictionaries ordered by timestamp descending.
    """
    if not self._stats_db_initialized:
        return []

    conn = None
    try:
        conn = self._get_stats_db_connection()
        if conn is None:
            return []

        query = "SELECT * FROM user_events WHERE username = ?"
        params: list = [username]

        # Each optional filter contributes one SQL fragment and one bound value.
        optional_filters = (
            (start_time, " AND timestamp >= ?"),
            (end_time, " AND timestamp <= ?"),
            (event_type, " AND event_type = ?"),
        )
        for value, fragment in optional_filters:
            if value is not None:
                query += fragment
                params.append(value)

        query += " ORDER BY timestamp DESC LIMIT ?"
        params.append(limit)

        cursor = conn.cursor()
        cursor.execute(query, params)
        events = cursor.fetchall()
        return [dict(event) for event in events]
    except Exception as e:
        logger.error(f"Error querying user events: {e}")
        return []
    finally:
        self._return_stats_db_connection(conn)
def get_date_range_summary(
    self, start_time: float, end_time: float
) -> Dict[str, Any]:
    """Aggregate all users' events inside an inclusive time window.

    Args:
        start_time: Start of range (unix timestamp).
        end_time: End of range (unix timestamp).

    Returns:
        Dictionary with one ``{"count", "total_amount"}`` entry per event
        type found, plus ``"unique_users"`` and ``"total_events"``. An empty
        dictionary is returned when the statistics database is unavailable
        or the query fails (failures are logged).
    """
    if not self._stats_db_initialized:
        return {}

    conn = None
    try:
        conn = self._get_stats_db_connection()
        if conn is None:
            return {}

        window = (start_time, end_time)
        cursor = conn.cursor()

        # Counts and totals grouped by event type
        cursor.execute(
            "SELECT event_type, COUNT(*) as count, SUM(amount) as total_amount "
            "FROM user_events WHERE timestamp >= ? AND timestamp <= ? "
            "GROUP BY event_type",
            window,
        )
        summary: Dict[str, Any] = {
            row[0]: {"count": row[1], "total_amount": row[2] or 0}
            for row in cursor.fetchall()
        }

        # Unique users
        cursor.execute(
            "SELECT COUNT(DISTINCT username) FROM user_events "
            "WHERE timestamp >= ? AND timestamp <= ?",
            window,
        )
        unique_row = cursor.fetchone()
        summary["unique_users"] = unique_row[0]

        # Total events
        cursor.execute(
            "SELECT COUNT(*) FROM user_events WHERE timestamp >= ? AND timestamp <= ?",
            window,
        )
        total_row = cursor.fetchone()
        summary["total_events"] = total_row[0]

        return summary
    except Exception as e:
        logger.error(f"Error getting date range summary: {e}")
        return {}
    finally:
        self._return_stats_db_connection(conn)
def get_top_users_in_range(
    self,
    start_time: float,
    end_time: float,
    event_type: Optional[str] = None,
    limit: int = 10,
) -> List[Dict[str, Any]]:
    """Rank users by their activity inside an inclusive time window.

    With a truthy ``event_type`` only that type is considered and users are
    ranked by summed amount, then by event count. Otherwise all event types
    are considered and users are ranked by event count only.

    Args:
        start_time: Start of range (unix timestamp).
        end_time: End of range (unix timestamp).
        event_type: Filter by event type. None for all types.
        limit: Maximum number of users to return.

    Returns:
        List of dicts with username, event_count, and total_amount; empty
        when the statistics database is unavailable or the query fails
        (failures are logged).
    """
    if not self._stats_db_initialized:
        return []

    conn = None
    try:
        conn = self._get_stats_db_connection()
        if conn is None:
            return []

        # Pick the statement and its bound values first, then run it once.
        if event_type:
            sql = (
                "SELECT username, COUNT(*) as event_count, SUM(amount) as total_amount "
                "FROM user_events "
                "WHERE timestamp >= ? AND timestamp <= ? AND event_type = ? "
                "GROUP BY username ORDER BY total_amount DESC, event_count DESC LIMIT ?"
            )
            bound = (start_time, end_time, event_type, limit)
        else:
            sql = (
                "SELECT username, COUNT(*) as event_count, SUM(amount) as total_amount "
                "FROM user_events "
                "WHERE timestamp >= ? AND timestamp <= ? "
                "GROUP BY username ORDER BY event_count DESC LIMIT ?"
            )
            bound = (start_time, end_time, limit)

        cursor = conn.cursor()
        cursor.execute(sql, bound)

        ranking = []
        for record in cursor.fetchall():
            ranking.append(
                {
                    "username": record[0],
                    "event_count": record[1],
                    "total_amount": record[2] or 0,
                }
            )
        return ranking
    except Exception as e:
        logger.error(f"Error getting top users in range: {e}")
        return []
    finally:
        self._return_stats_db_connection(conn)
def get_date_range_highlights(
    self, start_time: float, end_time: float
) -> Dict[str, Any]:
    """Get highlight data for the export card.

    All queries use the inclusive window ``start_time <= timestamp <=
    end_time``. Every amount in the result is reported as an ``int``.

    Args:
        start_time: Start of range (unix timestamp).
        end_time: End of range (unix timestamp).

    Returns:
        Dictionary with the keys ``top_bit_donor``, ``total_bits``,
        ``biggest_bit_donation``, ``total_subs``, ``total_gift_subs``,
        ``top_gifter``, ``total_donations``, ``total_channel_points``,
        ``total_follows``, ``total_raids``, ``most_active_user``,
        ``unique_users`` and ``total_events``. "Top"/"biggest"/"most active"
        entries are ``None`` when no matching event exists. An empty
        dictionary is returned when the statistics database is unavailable
        or a query fails (failures are logged).
    """
    if not self._stats_db_initialized:
        return {}

    conn = None
    try:
        conn = self._get_stats_db_connection()
        if conn is None:
            return {}

        cursor = conn.cursor()
        ts_params = (start_time, end_time)

        def fetch_one(sql, params=ts_params):
            """Run one statement and return its first row (or None)."""
            cursor.execute(sql, params)
            return cursor.fetchone()

        def count_events(event_type):
            """Number of events of one type inside the window."""
            row = fetch_one(
                "SELECT COUNT(*) FROM user_events "
                "WHERE timestamp >= ? AND timestamp <= ? AND event_type = ?",
                ts_params + (event_type,),
            )
            return row[0]

        def sum_amount(event_type):
            """Summed amount of one event type; SUM() yields NULL on no rows."""
            row = fetch_one(
                "SELECT SUM(amount) FROM user_events "
                "WHERE timestamp >= ? AND timestamp <= ? AND event_type = ?",
                ts_params + (event_type,),
            )
            return int(row[0] or 0) if row else 0

        def top_contributor(event_type):
            """User with the largest summed amount for one event type."""
            row = fetch_one(
                "SELECT username, SUM(amount) as total FROM user_events "
                "WHERE timestamp >= ? AND timestamp <= ? AND event_type = ? "
                "GROUP BY username ORDER BY total DESC LIMIT 1",
                ts_params + (event_type,),
            )
            # ``or 0`` guards against a NULL sum so one odd row cannot make
            # the whole highlights call fail.
            return {"username": row[0], "total": int(row[1] or 0)} if row else None

        highlights: Dict[str, Any] = {}

        # Total bits and top bit donor
        highlights["top_bit_donor"] = top_contributor("bit")
        highlights["total_bits"] = sum_amount("bit")

        # Biggest single bit donation
        row = fetch_one(
            "SELECT username, amount FROM user_events "
            "WHERE timestamp >= ? AND timestamp <= ? AND event_type = 'bit' "
            "ORDER BY amount DESC LIMIT 1"
        )
        highlights["biggest_bit_donation"] = (
            {"username": row[0], "amount": int(row[1] or 0)} if row else None
        )

        # Total subs (new + resub)
        highlights["total_subs"] = count_events("sub")

        # Total gift subs and top gifter
        highlights["total_gift_subs"] = sum_amount("giftsub")
        highlights["top_gifter"] = top_contributor("giftsub")

        # Total donations
        highlights["total_donations"] = count_events("donation")

        # Total channel points
        highlights["total_channel_points"] = sum_amount("point_redeem")

        # Total follows
        highlights["total_follows"] = count_events("follow")

        # Total raids
        highlights["total_raids"] = count_events("raid")

        # Most active user overall
        row = fetch_one(
            "SELECT username, COUNT(*) as event_count FROM user_events "
            "WHERE timestamp >= ? AND timestamp <= ? "
            "GROUP BY username ORDER BY event_count DESC LIMIT 1"
        )
        highlights["most_active_user"] = (
            {"username": row[0], "event_count": row[1]} if row else None
        )

        # Unique users
        highlights["unique_users"] = fetch_one(
            "SELECT COUNT(DISTINCT username) FROM user_events "
            "WHERE timestamp >= ? AND timestamp <= ?"
        )[0]

        # Total events
        highlights["total_events"] = fetch_one(
            "SELECT COUNT(*) FROM user_events "
            "WHERE timestamp >= ? AND timestamp <= ?"
        )[0]

        return highlights
    except Exception as e:
        logger.error(f"Error getting date range highlights: {e}")
        return {}
    finally:
        self._return_stats_db_connection(conn)
def get_all_tracked_usernames(self) -> List[str]:
    """List every username known to the statistics system.

    When the statistics database is initialized the names come from the
    ``user_events`` table. Otherwise they are collected from the in-memory
    per-user statistics of the alerts, connectors, chatbot, quotes and chat
    categories.

    Returns:
        Sorted list of unique usernames; empty if the database query fails
        (failures are logged).
    """
    if not self._stats_db_initialized:
        # Fall back to in-memory per-user stats
        stats = self.data
        categories = (
            stats.alerts,
            stats.connectors,
            stats.chatbot,
            stats.quotes,
            stats.chat,
        )
        return sorted(
            {name for category in categories for name in category.user_stats.keys()}
        )

    conn = None
    try:
        conn = self._get_stats_db_connection()
        if conn is None:
            return []
        cursor = conn.cursor()
        cursor.execute("SELECT DISTINCT username FROM user_events ORDER BY username")
        names = []
        for record in cursor.fetchall():
            names.append(record[0])
        return names
    except Exception as e:
        logger.error(f"Error getting tracked usernames: {e}")
        return []
    finally:
        self._return_stats_db_connection(conn)
# ---- End Separate Statistics Database Methods ----
def initialize_with_data(self, all_data: Dict[str, Any]):
    """Initialize the statistics manager with pre-loaded data.

    Reads the entry stored under the ``"Statistics"`` key and, when it
    contains a ``"data"`` mapping, rebuilds the matching statistics objects
    on ``self.data``. ``all_data`` itself is never modified. Any error is
    logged and leaves the categories loaded so far in place.

    Args:
        all_data: Dictionary mapping database paths to their data
    """
    try:
        stored_data = all_data.get("Statistics", {})
        if not (stored_data and "data" in stored_data):
            logger.info(
                "No statistics data found in pre-loaded data, using defaults"
            )
            return

        # Load each statistics category
        data_dict = stored_data["data"]

        if "alerts" in data_dict:
            # Shallow copy: the nested maps are popped below and must stay
            # available in the caller's dictionary.
            alerts_dict = dict(data_dict["alerts"])

            # Handle user statistics separately
            user_stats = {
                username: UserAlertStatistics(**user_data)
                for username, user_data in alerts_dict.pop("user_stats", {}).items()
            }

            # Handle individual alert type stats
            alert_type_stats = {
                alert_type: IndividualAlertTypeStatistics(**alert_data)
                for alert_type, alert_data in alerts_dict.pop(
                    "alert_type_stats", {}
                ).items()
            }

            self.data.alerts = AlertStatistics(**alerts_dict)
            self.data.alerts.user_stats = user_stats
            self.data.alerts.alert_type_stats = alert_type_stats

        if "hype_trains" in data_dict:
            hype_dict = data_dict["hype_trains"].copy()
            logger.debug(f"Loading hype_trains data: {hype_dict}")
            # JSON serialization converts int keys to strings; restore the
            # integer levels declared by HypeTrainStatistics.
            level_completions = hype_dict.get("level_completions")
            if isinstance(level_completions, dict):
                hype_dict["level_completions"] = {
                    int(level): count for level, count in level_completions.items()
                }
            self.data.hype_trains = HypeTrainStatistics(**hype_dict)

        # NOTE(review): the four classes used below are not defined in the
        # visible part of this module, and StatisticsData declares no
        # matching fields - confirm they exist before relying on these
        # branches.
        if "stream_sessions" in data_dict:
            stream_sessions_dict = data_dict["stream_sessions"].copy()
            self.data.stream_sessions = StreamSessionStatistics(
                **stream_sessions_dict
            )

        if "viewer_engagement" in data_dict:
            viewer_engagement_dict = data_dict["viewer_engagement"].copy()
            self.data.viewer_engagement = ViewerEngagementStatistics(
                **viewer_engagement_dict
            )

        if "performance" in data_dict:
            performance_dict = data_dict["performance"].copy()
            self.data.performance = PerformanceStatistics(**performance_dict)

        if "settings" in data_dict:
            settings_dict = data_dict["settings"].copy()
            self.data.settings = StatisticsSettings(**settings_dict)

        logger.info("Statistics loaded from pre-loaded data")

    except Exception as e:
        logger.error(
            f"Error loading statistics from pre-loaded data: {str(e)}",
            exc_info=True,
        )
def _load_from_database(self):
    """Load statistics from database.

    Tries the separate statistics database first (when
    ``self._stats_db_initialized`` is set) and falls back to the old
    ``database_manager`` storage, triggering a migration when that fallback
    yields data. Each category found under ``"data"`` is rebuilt into
    ``self.data``; the session start time is then reset to the current time.
    Any exception is logged with its traceback and swallowed.

    The loaded dictionaries are shallow-copied before keys are popped from
    them, so the objects returned by the storage layer are not modified.
    """
    try:
        stored_data = None

        # Try loading from the new statistics database first
        if self._stats_db_initialized:
            stored_data = self._load_lifetime_totals_from_stats_db()

        # If no data in new DB, try migrating from old database_manager
        if not stored_data:
            old_data = get_data(self.database_path)
            if old_data and "data" in old_data:
                stored_data = old_data
                # Migrate to new database if available
                if self._stats_db_initialized:
                    self._migrate_from_database_manager()

        if stored_data and "data" in stored_data:
            # Load each statistics category
            data_dict = stored_data["data"]

            if "alerts" in data_dict:
                # Copy so the pops below do not mutate the loaded data.
                alerts_dict = data_dict["alerts"].copy()

                # Handle user statistics separately
                user_stats = {}
                if "user_stats" in alerts_dict:
                    user_stats_dict = alerts_dict.pop("user_stats")
                    for username, user_data in user_stats_dict.items():
                        user_stats[username] = UserAlertStatistics(**user_data)

                # Handle individual alert type stats
                alert_type_stats = {}
                if "alert_type_stats" in alerts_dict:
                    alert_type_stats_dict = alerts_dict.pop("alert_type_stats")
                    for alert_type, alert_data in alert_type_stats_dict.items():
                        alert_type_stats[alert_type] = (
                            IndividualAlertTypeStatistics(**alert_data)
                        )

                self.data.alerts = AlertStatistics(**alerts_dict)
                self.data.alerts.user_stats = user_stats
                self.data.alerts.alert_type_stats = alert_type_stats

            if "hype_trains" in data_dict:
                hype_dict = data_dict["hype_trains"].copy()
                logger.debug(f"Loading hype_trains data: {hype_dict}")

                # Load level_completions dictionary
                level_completions = {}
                if "level_completions" in hype_dict:
                    level_completions_dict = hype_dict.pop("level_completions")
                    logger.debug(
                        f"Found level_completions in hype_dict: {level_completions_dict}"
                    )
                    if isinstance(level_completions_dict, dict):
                        # Convert string keys to integers (JSON serialization converts int keys to strings)
                        level_completions = {
                            int(k): v for k, v in level_completions_dict.items()
                        }
                    elif isinstance(level_completions_dict, list):
                        # Handle corrupted data - if it's a list, try to convert to dict with indices as keys
                        logger.warning(
                            f"level_completions is corrupted as list instead of dict: {level_completions_dict}. "
                            "Converting to dict with indices as keys."
                        )
                        level_completions = {
                            i: v
                            for i, v in enumerate(level_completions_dict)
                            if v is not None
                        }
                    elif level_completions_dict is not None:
                        logger.warning(
                            f"level_completions is not a dict or list: {type(level_completions_dict)} = {level_completions_dict}. "
                            "Initializing as empty dict."
                        )
                else:
                    logger.debug(
                        "No level_completions found in hype_dict - initializing as empty dict"
                    )

                hype_stats = HypeTrainStatistics(**hype_dict)
                hype_stats.level_completions = level_completions
                self.data.hype_trains = hype_stats
                logger.info(
                    f"Hype train statistics loaded - level_completions: {level_completions}, total: {hype_stats.total_completions}"
                )

                # Check and repair data integrity after loading
                integrity_issues = self.check_data_integrity()
                if integrity_issues:
                    logger.warning(
                        f"Data integrity issues found: {integrity_issues}"
                    )
                    if self.repair_data_integrity():
                        logger.info("Data integrity automatically repaired")
                    else:
                        logger.error("Failed to repair data integrity issues")

            if "connectors" in data_dict:
                # Only load connectors_triggered and total_triggers, not connectors_created
                # since we get that dynamically from connector_manager
                conn_dict = data_dict["connectors"]
                self.data.connectors.connectors_triggered = conn_dict.get(
                    "connectors_triggered", 0
                )
                self.data.connectors.total_triggers = conn_dict.get(
                    "total_triggers", 0
                )

                # Handle user statistics separately
                user_stats = {}
                if "user_stats" in conn_dict:
                    user_stats_dict = conn_dict["user_stats"]
                    for username, user_data in user_stats_dict.items():
                        user_stats[username] = UserConnectorStatistics(**user_data)
                self.data.connectors.user_stats = user_stats

                # Handle per-user individual connector stats
                user_connector_stats = {}
                if "user_connector_stats" in conn_dict:
                    user_connector_stats_dict = conn_dict["user_connector_stats"]
                    for username, user_data in user_connector_stats_dict.items():
                        user_connector_stats[username] = UserConnectorStatistics(
                            **user_data
                        )
                self.data.connectors.user_connector_stats = user_connector_stats

                # Handle individual connector stats
                connector_stats = {}
                if "connector_stats" in conn_dict:
                    connector_stats_dict = conn_dict["connector_stats"]
                    for (
                        connector_name,
                        connector_data,
                    ) in connector_stats_dict.items():
                        connector_stats[connector_name] = (
                            IndividualConnectorStatistics(**connector_data)
                        )
                self.data.connectors.connector_stats = connector_stats

                logger.info(
                    f"Loaded connectors statistics from database: {self.data.connectors.connectors_triggered} triggered, {self.data.connectors.total_triggers} total triggers"
                )

            if "chatbot" in data_dict:
                bot_dict = data_dict["chatbot"]
                # Only load triggered counts and total interactions, not created counts
                self.data.chatbot.commands_triggered = bot_dict.get(
                    "commands_triggered", 0
                )
                self.data.chatbot.events_triggered = bot_dict.get(
                    "events_triggered", 0
                )
                self.data.chatbot.total_interactions = bot_dict.get(
                    "total_interactions", 0
                )

                # Handle user statistics separately
                user_stats = {}
                if "user_stats" in bot_dict:
                    user_stats_dict = bot_dict["user_stats"]
                    for username, user_data in user_stats_dict.items():
                        user_stats[username] = UserChatbotStatistics(**user_data)
                self.data.chatbot.user_stats = user_stats

                # Handle per-user individual command and event stats
                user_command_stats = {}
                user_event_stats = {}
                if "user_command_stats" in bot_dict:
                    user_command_stats_dict = bot_dict["user_command_stats"]
                    for username, user_data in user_command_stats_dict.items():
                        user_command_stats[username] = UserCommandStatistics(
                            **user_data
                        )
                if "user_event_stats" in bot_dict:
                    user_event_stats_dict = bot_dict["user_event_stats"]
                    for username, user_data in user_event_stats_dict.items():
                        user_event_stats[username] = UserEventStatistics(
                            **user_data
                        )
                self.data.chatbot.user_command_stats = user_command_stats
                self.data.chatbot.user_event_stats = user_event_stats

                # Handle individual command and event stats
                command_stats = {}
                event_stats = {}
                if "command_stats" in bot_dict:
                    command_stats_dict = bot_dict["command_stats"]
                    for command_name, command_data in command_stats_dict.items():
                        command_stats[command_name] = IndividualCommandStatistics(
                            **command_data
                        )
                if "event_stats" in bot_dict:
                    event_stats_dict = bot_dict["event_stats"]
                    for event_name, event_data in event_stats_dict.items():
                        event_stats[event_name] = IndividualEventStatistics(
                            **event_data
                        )
                self.data.chatbot.command_stats = command_stats
                self.data.chatbot.event_stats = event_stats

            if "quotes" in data_dict:
                quotes_dict = data_dict["quotes"]
                # Only load redeemed count and usage, not created count
                self.data.quotes.total_quotes_redeemed = quotes_dict.get(
                    "total_quotes_redeemed", 0
                )
                self.data.quotes.individual_quote_usage = quotes_dict.get(
                    "individual_quote_usage", {}
                )

                # Handle user statistics separately
                user_stats = {}
                if "user_stats" in quotes_dict:
                    user_stats_dict = quotes_dict["user_stats"]
                    for username, user_data in user_stats_dict.items():
                        user_stats[username] = UserQuoteStatistics(**user_data)
                self.data.quotes.user_stats = user_stats

            if "chat" in data_dict:
                # Copy so the pop below does not mutate the loaded data.
                chat_dict = data_dict["chat"].copy()

                # Handle user statistics separately
                user_stats = {}
                if "user_stats" in chat_dict:
                    user_stats_dict = chat_dict.pop("user_stats")
                    for username, user_data in user_stats_dict.items():
                        user_stats[username] = UserChatStatistics(**user_data)

                self.data.chat = ChatStatistics(**chat_dict)
                self.data.chat.user_stats = user_stats

            if "templates" in data_dict:
                templates_dict = data_dict["templates"]
                for template_name, template_data in templates_dict.items():
                    template_stats = TemplateStatistics(**template_data)
                    self.data.templates[template_name] = template_stats

            # Always update session start time to current session, not loaded value
            self.data.session.start_time = time.time()

            logger.info("Statistics loaded from database successfully")
        else:
            logger.info("No existing statistics found in database, starting fresh")

    except Exception as e:
        # Keep the traceback: a bare message makes malformed stored data
        # very hard to diagnose.
        logger.error(f"Error loading statistics from database: {e}", exc_info=True)


def _save_to_database(self):
    """Save current statistics to database.

    Refreshes the session duration and last-save time, flattens
    ``self.data`` into a plain dictionary and writes it either to the
    separate statistics database (when ``self._stats_db_initialized`` is
    set) or, as a fallback, through ``set_data``. Any exception is logged
    with its traceback and swallowed.
    """
    try:
        logger.debug("Starting statistics save to database")

        # Update session duration
        self.data.session.session_duration = (
            time.time() - self.data.session.start_time
        )
        self.data.session.last_save_time = time.time()

        # Convert to dictionary for storage
        data_dict = {
            "data": {
                "alerts": {
                    "bit_alerts_played": self.data.alerts.bit_alerts_played,
                    "total_bits": self.data.alerts.total_bits,
                    "resubs_played": self.data.alerts.resubs_played,
                    "new_subs_played": self.data.alerts.new_subs_played,
                    "gift_subs_played": self.data.alerts.gift_subs_played,
                    "total_gift_subs": self.data.alerts.total_gift_subs,
                    "follow_alerts_played": self.data.alerts.follow_alerts_played,
                    "raids": self.data.alerts.raids,
                    "point_alerts_redeemed": self.data.alerts.point_alerts_redeemed,
                    "total_channel_points_redeemed": self.data.alerts.total_channel_points_redeemed,
                    "donations": self.data.alerts.donations,
                    "user_stats": {
                        username: {
                            "bit_alerts_played": stats.bit_alerts_played,
                            "resubs_played": stats.resubs_played,
                            "new_subs_played": stats.new_subs_played,
                            "gift_subs_played": stats.gift_subs_played,
                            "follow_alerts_played": stats.follow_alerts_played,
                            "raids": stats.raids,
                            "point_alerts_redeemed": stats.point_alerts_redeemed,
                            "channel_points_redeemed": stats.channel_points_redeemed,
                            "donations": stats.donations,
                            "total_alerts": stats.total_alerts,
                            "first_seen": stats.first_seen,
                            "last_seen": stats.last_seen,
                        }
                        for username, stats in self.data.alerts.user_stats.items()
                    },
                    "alert_type_stats": {
                        alert_type: {
                            "alert_type": stats.alert_type,
                            "alert_name": stats.alert_name,
                            "trigger_count": stats.trigger_count,
                            "first_triggered": stats.first_triggered,
                            "last_triggered": stats.last_triggered,
                        }
                        for alert_type, stats in self.data.alerts.alert_type_stats.items()
                    },
                },
                "hype_trains": {
                    # Guard against a corrupted non-dict value so the save
                    # always writes a mapping.
                    "level_completions": dict(
                        self.data.hype_trains.level_completions
                    )
                    if isinstance(self.data.hype_trains.level_completions, dict)
                    else {},
                    "total_completions": self.data.hype_trains.total_completions,
                },
                "connectors": {
                    # connectors_created is not stored since we get it dynamically from connector_manager
                    "connectors_triggered": self.data.connectors.connectors_triggered,
                    "total_triggers": self.data.connectors.total_triggers,
                    "user_stats": {
                        username: {
                            "connectors_triggered": stats.connectors_triggered,
                            "total_triggers": stats.total_triggers,
                            "connector_usage": stats.connector_usage,
                            "first_seen": stats.first_seen,
                            "last_seen": stats.last_seen,
                        }
                        for username, stats in self.data.connectors.user_stats.items()
                    },
                    "user_connector_stats": {
                        username: {
                            "connectors_triggered": stats.connectors_triggered,
                            "total_triggers": stats.total_triggers,
                            "connector_usage": stats.connector_usage,
                            "first_seen": stats.first_seen,
                            "last_seen": stats.last_seen,
                        }
                        for username, stats in self.data.connectors.user_connector_stats.items()
                    },
                    "connector_stats": {
                        connector_name: {
                            "connector_name": stats.connector_name,
                            "trigger_count": stats.trigger_count,
                            "first_triggered": stats.first_triggered,
                            "last_triggered": stats.last_triggered,
                        }
                        for connector_name, stats in self.data.connectors.connector_stats.items()
                    },
                },
                "chatbot": {
                    # commands_created and events_created are not stored since we get them dynamically
                    "commands_triggered": self.data.chatbot.commands_triggered,
                    "events_triggered": self.data.chatbot.events_triggered,
                    "total_interactions": self.data.chatbot.total_interactions,
                    "user_stats": {
                        username: {
                            "commands_triggered": stats.commands_triggered,
                            "events_triggered": stats.events_triggered,
                            "total_interactions": stats.total_interactions,
                            "first_seen": stats.first_seen,
                            "last_seen": stats.last_seen,
                        }
                        for username, stats in self.data.chatbot.user_stats.items()
                    },
                    "user_command_stats": {
                        username: {
                            "command_usage": stats.command_usage,
                            "first_seen": stats.first_seen,
                            "last_seen": stats.last_seen,
                        }
                        for username, stats in self.data.chatbot.user_command_stats.items()
                    },
                    "user_event_stats": {
                        username: {
                            "event_usage": stats.event_usage,
                            "first_seen": stats.first_seen,
                            "last_seen": stats.last_seen,
                        }
                        for username, stats in self.data.chatbot.user_event_stats.items()
                    },
                    "command_stats": {
                        command_name: {
                            "command_name": stats.command_name,
                            "usage_count": stats.usage_count,
                            "first_used": stats.first_used,
                            "last_used": stats.last_used,
                        }
                        for command_name, stats in self.data.chatbot.command_stats.items()
                    },
                    "event_stats": {
                        event_name: {
                            "event_name": stats.event_name,
                            "trigger_count": stats.trigger_count,
                            "first_triggered": stats.first_triggered,
                            "last_triggered": stats.last_triggered,
                        }
                        for event_name, stats in self.data.chatbot.event_stats.items()
                    },
                },
                "quotes": {
                    # quotes_created is not stored since we get it dynamically
                    "total_quotes_redeemed": self.data.quotes.total_quotes_redeemed,
                    "individual_quote_usage": self.data.quotes.individual_quote_usage,
                    "user_stats": {
                        username: {
                            "total_quotes_redeemed": stats.total_quotes_redeemed,
                            "individual_quote_usage": stats.individual_quote_usage,
                            "first_seen": stats.first_seen,
                            "last_seen": stats.last_seen,
                        }
                        for username, stats in self.data.quotes.user_stats.items()
                    },
                },
                "chat": {
                    "twitch_messages_received": self.data.chat.twitch_messages_received,
                    "total_messages": self.data.chat.total_messages,
                    "user_stats": {
                        username: {
                            "twitch_messages_received": stats.twitch_messages_received,
                            "total_messages": stats.total_messages,
                            "first_seen": stats.first_seen,
                            "last_seen": stats.last_seen,
                        }
                        for username, stats in self.data.chat.user_stats.items()
                    },
                },
                "templates": {
                    template_name: {
                        "template_name": stats.template_name,
                        "custom_stats": stats.custom_stats,
                        "last_updated": stats.last_updated,
                    }
                    for template_name, stats in self.data.templates.items()
                },
                "session": {
                    "start_time": self.data.session.start_time,
                    "last_save_time": self.data.session.last_save_time,
                    "session_duration": self.data.session.session_duration,
                },
            },
            "last_updated": time.time(),
            "version": "1.0",
        }

        logger.debug(
            f"Saving hype_trains data: {data_dict.get('data', {}).get('hype_trains', 'NOT FOUND')}"
        )

        # Save to the separate statistics database if available
        if self._stats_db_initialized:
            self._save_lifetime_totals_to_stats_db(data_dict["data"])
        else:
            # Fallback to old database_manager
            success = set_data(self.database_path, data_dict)
            if success:
                logger.debug("Statistics saved to database successfully")
            else:
                logger.error("Failed to save statistics to database")

    except Exception as e:
        logger.error(f"Error saving statistics to database: {e}", exc_info=True)


def _track_user_alert(self, username: str, alert_type: str):
    """Helper method to track per-user alert statistics.

    Args:
        username: Key into ``self.data.alerts.user_stats``; an entry is
            created on first use.
        alert_type: Name of the counter attribute to bump on the user's
            entry. Names the entry does not have are ignored, but the
            total and timestamps are still updated.
    """
    if username not in self.data.alerts.user_stats:
        self.data.alerts.user_stats[username] = UserAlertStatistics()

    user_stats = self.data.alerts.user_stats[username]
    current_time = time.time()

    # Update the specific alert type
    if hasattr(user_stats, alert_type):
        setattr(user_stats, alert_type, getattr(user_stats, alert_type) + 1)

    # Update total alerts and timestamps
    user_stats.total_alerts += 1
    user_stats.last_seen = current_time

    # Set first_seen if this is the first time
    if user_stats.total_alerts == 1:
        user_stats.first_seen = current_time


def _track_alert_type(self, alert_type: str, alert_name: str = ""):
    """Helper method to track individual alert type statistics.

    Args:
        alert_type: Key into ``self.data.alerts.alert_type_stats``; an
            entry is created on first use.
        alert_name: Optional display name, stored only while the entry's
            own ``alert_name`` is still empty.
    """
    if alert_type not in self.data.alerts.alert_type_stats:
        self.data.alerts.alert_type_stats[alert_type] = (
            IndividualAlertTypeStatistics(
                alert_type=alert_type, alert_name=alert_name
            )
        )

    alert_stats = self.data.alerts.alert_type_stats[alert_type]
    current_time = time.time()

    # Update trigger count and timestamps
    alert_stats.trigger_count += 1
    alert_stats.last_triggered = current_time

    # Set first_triggered if this is the first time
    if alert_stats.trigger_count == 1:
        alert_stats.first_triggered = current_time

    # Update alert_name if provided and not set
    if alert_name and not alert_stats.alert_name:
        alert_stats.alert_name = alert_name

# Alert Statistics Methods
def increment_bit_alerts(
    self, bit_amount: int = 0, username: Optional[str] = None, alert_name: str = ""
):
    """Count one played bit alert and add ``bit_amount`` to the bits total.

    Args:
        bit_amount: Number of bits added to ``total_bits``.
        username: When truthy, the user's ``bit_alerts_played`` counter is
            bumped and a ``"bit"`` event is passed to ``self._record_event``.
        alert_name: Forwarded to the event record and the alert-type tracker.
    """
    alerts = self.data.alerts
    alerts.bit_alerts_played += 1
    alerts.total_bits += bit_amount

    # Per-user bookkeeping is skipped for anonymous alerts.
    if username:
        self._track_user_alert(username, "bit_alerts_played")
        self._record_event(username, "bit", bit_amount, alert_name)

    # The alert type itself is tracked whether or not a user is known.
    self._track_alert_type("bit", alert_name)

    logger.info(
        f"Bit alerts incremented to: {self.data.alerts.bit_alerts_played}, total bits: {self.data.alerts.total_bits}"
    )
def increment_resubs(self, username: Optional[str] = None, alert_name: str = ""):
    """Count one played resub alert.

    Args:
        username: When truthy, the user's ``resubs_played`` counter is bumped
            and a ``"sub"`` event is passed to ``self._record_event``.
        alert_name: Forwarded to the event record and the alert-type tracker.
    """
    alerts = self.data.alerts
    alerts.resubs_played += 1

    # Per-user bookkeeping is skipped for anonymous alerts.
    if username:
        self._track_user_alert(username, "resubs_played")
        self._record_event(username, "sub", 0, alert_name)

    # Tracked under the "resub" alert type, separate from new subs.
    self._track_alert_type("resub", alert_name)

    logger.debug(f"Resubs incremented to: {self.data.alerts.resubs_played}")
def increment_new_subs(self, username: Optional[str] = None, alert_name: str = ""):
    """Count one played new-subscription alert.

    Args:
        username: When truthy, the user's ``new_subs_played`` counter is
            bumped and a ``"sub"`` event is passed to ``self._record_event``.
        alert_name: Forwarded to the event record and the alert-type tracker.
    """
    alerts = self.data.alerts
    alerts.new_subs_played += 1

    # Per-user bookkeeping is skipped for anonymous alerts.
    if username:
        self._track_user_alert(username, "new_subs_played")
        self._record_event(username, "sub", 0, alert_name)

    self._track_alert_type("sub", alert_name)

    logger.debug(f"New subs incremented to: {self.data.alerts.new_subs_played}")
def increment_gift_subs(
    self,
    gift_quantity: int = 1,
    username: Optional[str] = None,
    alert_name: str = "",
):
    """Count one played gift-sub alert and add its quantity to the total.

    Args:
        gift_quantity: Number of gifted subs added to ``total_gift_subs``.
        username: When truthy, the user's ``gift_subs_played`` counter is
            bumped and a ``"giftsub"`` event carrying ``gift_quantity`` is
            passed to ``self._record_event``.
        alert_name: Forwarded to the event record and the alert-type tracker.
    """
    alerts = self.data.alerts
    # One alert played, regardless of how many subs it gifted.
    alerts.gift_subs_played += 1
    alerts.total_gift_subs += gift_quantity

    if username:
        self._track_user_alert(username, "gift_subs_played")
        self._record_event(username, "giftsub", gift_quantity, alert_name)

    self._track_alert_type("giftsub", alert_name)

    logger.debug(
        f"Gift subs incremented to: {self.data.alerts.gift_subs_played}, total gift subs: {self.data.alerts.total_gift_subs}"
    )
def increment_follow_alerts(
    self, username: Optional[str] = None, alert_name: str = ""
):
    """Count one played follow alert.

    Args:
        username: When truthy, the user's ``follow_alerts_played`` counter is
            bumped and a ``"follow"`` event is passed to
            ``self._record_event``.
        alert_name: Forwarded to the event record and the alert-type tracker.
    """
    alerts = self.data.alerts
    alerts.follow_alerts_played += 1

    # Per-user bookkeeping is skipped for anonymous alerts.
    if username:
        self._track_user_alert(username, "follow_alerts_played")
        self._record_event(username, "follow", 0, alert_name)

    self._track_alert_type("follow", alert_name)

    logger.info(
        f"Follow alerts incremented to: {self.data.alerts.follow_alerts_played}"
    )
def increment_raids(self, username: Optional[str] = None, alert_name: str = ""):
    """Count one raid.

    Args:
        username: When truthy, the user's ``raids`` counter is bumped and a
            ``"raid"`` event is passed to ``self._record_event``.
        alert_name: Forwarded to the event record and the alert-type tracker.
    """
    alerts = self.data.alerts
    alerts.raids += 1

    # Per-user bookkeeping is skipped for anonymous alerts.
    if username:
        self._track_user_alert(username, "raids")
        self._record_event(username, "raid", 0, alert_name)

    self._track_alert_type("raid", alert_name)

    logger.info(f"Raids incremented to: {self.data.alerts.raids}")
def increment_point_alerts(
    self, username: Optional[str] = None, alert_name: str = ""
):
    """Count one redeemed channel-point alert.

    Args:
        username: When truthy, the user's ``point_alerts_redeemed`` counter
            is bumped and a ``"point_redeem"`` event is passed to
            ``self._record_event``.
        alert_name: Forwarded to the event record and the alert-type tracker.
    """
    alerts = self.data.alerts
    alerts.point_alerts_redeemed += 1

    # Per-user bookkeeping is skipped for anonymous alerts.
    if username:
        self._track_user_alert(username, "point_alerts_redeemed")
        self._record_event(username, "point_redeem", 0, alert_name)

    self._track_alert_type("point", alert_name)

    logger.info(
        f"Point alerts incremented to: {self.data.alerts.point_alerts_redeemed}"
    )
def increment_channel_points_redeemed(
    self, points_amount: int, username: Optional[str] = None
):
    """Increment total channel points redeemed and track per-user statistics.

    Invalid amounts (``None``, non-numeric values, NaN or infinite floats)
    are logged and ignored; non-positive amounts are skipped. Floats are
    truncated to ``int`` before being added.

    Args:
        points_amount: Number of channel points redeemed.
        username: When truthy, the amount is also added to that user's
            ``channel_points_redeemed`` and their ``last_seen`` is updated;
            an entry is created on first use.
    """
    # Validate points_amount
    if points_amount is None or not isinstance(points_amount, (int, float)):
        logger.warning(
            f"Invalid points_amount received: {points_amount} (type: {type(points_amount)})"
        )
        return

    # Convert to int if it's a float. int() raises ValueError for NaN and
    # OverflowError for +/-inf; treat both as invalid input, not a crash.
    try:
        points_amount = int(points_amount)
    except (ValueError, OverflowError):
        logger.warning(
            f"Invalid points_amount received: {points_amount} (type: {type(points_amount)})"
        )
        return

    if points_amount <= 0:
        logger.debug(
            f"Skipping increment with non-positive points_amount: {points_amount}"
        )
        return

    self.data.alerts.total_channel_points_redeemed += points_amount

    # Track per-user statistics if username provided
    if username:
        if username not in self.data.alerts.user_stats:
            self.data.alerts.user_stats[username] = UserAlertStatistics()

        user_stats = self.data.alerts.user_stats[username]
        user_stats.channel_points_redeemed += points_amount
        user_stats.last_seen = time.time()

    logger.info(
        f"Channel points redeemed incremented by {points_amount} to: {self.data.alerts.total_channel_points_redeemed}"
    )
def increment_donations(self, username: Optional[str] = None, alert_name: str = ""):
    """Count one donation.

    Args:
        username: When truthy, the user's ``donations`` counter is bumped and
            a ``"donation"`` event is passed to ``self._record_event``.
        alert_name: Forwarded to the event record and the alert-type tracker.
    """
    alerts = self.data.alerts
    alerts.donations += 1

    # Per-user bookkeeping is skipped for anonymous alerts.
    if username:
        self._track_user_alert(username, "donations")
        self._record_event(username, "donation", 0, alert_name)

    self._track_alert_type("donation", alert_name)

    logger.info(f"Donations incremented to: {self.data.alerts.donations}")
# Hype Train Statistics Methods
def increment_hype_train_completion(self, level: int):
    """Record one completed hype train at ``level``.

    Levels are stored in a dictionary keyed by integer level, so no upper
    bound is imposed. A ``level`` that cannot be converted to ``int`` is
    logged and ignored.

    Args:
        level: Hype train level that was completed.
    """
    # Normalise the level first so the dictionary always gets int keys.
    try:
        level = int(level)
    except (ValueError, TypeError):
        logger.error(
            f"Invalid hype train level received: {level} (type: {type(level)})"
        )
        return

    hype = self.data.hype_trains

    # Defensive: recover from a corrupted (non-dict) container before use.
    if not isinstance(hype.level_completions, dict):
        logger.error(
            f"level_completions is not a dict: {type(self.data.hype_trains.level_completions)} = {self.data.hype_trains.level_completions}. "
            "Resetting to empty dict."
        )
        hype.level_completions = {}

    completed_before = hype.level_completions.get(level, 0)
    hype.level_completions[level] = completed_before + 1
    hype.total_completions += 1

    logger.info(
        f"Hype train level {level} completion recorded. Current level_completions: {self.data.hype_trains.level_completions}. Total: {self.data.hype_trains.total_completions}"
    )
# Connector Statistics Methods
def increment_connectors_triggered(
    self, username: Optional[str] = None, connector_name: str = ""
):
    """Count one triggered connector.

    Args:
        username: When truthy, per-user tracking is invoked through
            ``self._track_user_connector``.
        connector_name: When non-empty, the connector's own usage is tracked
            through ``self._track_individual_connector``.
    """
    connectors = self.data.connectors
    connectors.connectors_triggered += 1

    if username:
        # NOTE(review): the second argument is the literal 1, while the
        # _track_user_connector helper in this file takes a connector
        # name. Confirm whether a count-based helper was intended here.
        self._track_user_connector(username, 1)

    if connector_name:
        self._track_individual_connector(connector_name)

    logger.debug(
        f"Connectors triggered incremented to: {self.data.connectors.connectors_triggered}"
    )
def increment_connector_triggers(
    self, count: int = 1, username: Optional[str] = None, connector_name: str = ""
):
    """Add ``count`` to the total number of connector triggers.

    Args:
        count: Number of triggers to add; also the number of times the
            per-name trackers are invoked.
        username: When truthy, per-user tracking is invoked.
        connector_name: When non-empty, the connector is tracked ``count``
            times globally and, if ``username`` is also given, per user.
    """
    connectors = self.data.connectors
    connectors.total_triggers += count

    if username:
        # NOTE(review): `count` is passed where the _track_user_connector
        # helper in this file takes a connector name. Confirm whether a
        # count-based helper was intended here.
        self._track_user_connector(username, count)

    # Per-user usage of this specific connector needs both identifiers.
    if username and connector_name:
        for _ in range(count):
            self._track_user_connector(username, connector_name)

    # Global usage of this connector, independent of any user.
    if connector_name:
        for _ in range(count):
            self._track_individual_connector(connector_name)

    logger.debug(
        f"Total connector triggers incremented to: {self.data.connectors.total_triggers}"
    )
def _track_user_chatbot(self, username: str, stat_type: str):
    """Update one user's chatbot counters.

    Args:
        username: Key into ``self.data.chatbot.user_stats``; an entry is
            created on first use.
        stat_type: Name of the counter attribute to bump. Names the entry
            does not have are ignored, but the interaction total and
            timestamps are still updated.
    """
    per_user = self.data.chatbot.user_stats
    if username not in per_user:
        per_user[username] = UserChatbotStatistics()

    entry = per_user[username]
    now = time.time()

    if hasattr(entry, stat_type):
        setattr(entry, stat_type, getattr(entry, stat_type) + 1)

    entry.total_interactions += 1
    entry.last_seen = now

    # The very first interaction also defines first_seen.
    if entry.total_interactions == 1:
        entry.first_seen = now


def _track_command(self, command_name: str):
    """Update the global usage record of one chatbot command.

    Args:
        command_name: Key into ``self.data.chatbot.command_stats``; an entry
            is created on first use.
    """
    registry = self.data.chatbot.command_stats
    if command_name not in registry:
        registry[command_name] = IndividualCommandStatistics(
            command_name=command_name
        )

    entry = registry[command_name]
    now = time.time()

    entry.usage_count += 1
    entry.last_used = now

    # The very first use also defines first_used.
    if entry.usage_count == 1:
        entry.first_used = now


def _track_event(self, event_name: str):
    """Update the global trigger record of one chatbot event.

    Args:
        event_name: Key into ``self.data.chatbot.event_stats``; an entry is
            created on first use.
    """
    registry = self.data.chatbot.event_stats
    if event_name not in registry:
        registry[event_name] = IndividualEventStatistics(event_name=event_name)

    entry = registry[event_name]
    now = time.time()

    entry.trigger_count += 1
    entry.last_triggered = now

    # The very first trigger also defines first_triggered.
    if entry.trigger_count == 1:
        entry.first_triggered = now


def _track_user_command(self, username: str, command_name: str):
    """Count one use of ``command_name`` by ``username``.

    Args:
        username: Key into ``self.data.chatbot.user_command_stats``; an
            entry is created on first use.
        command_name: Key into that entry's ``command_usage`` mapping.
    """
    per_user = self.data.chatbot.user_command_stats
    if username not in per_user:
        per_user[username] = UserCommandStatistics()

    entry = per_user[username]
    now = time.time()

    usage = entry.command_usage
    usage[command_name] = usage.get(command_name, 0) + 1
    entry.last_seen = now

    # first_seen is set only while the user has exactly one recorded
    # command and it has been used exactly once.
    if len(usage) == 1 and usage[command_name] == 1:
        entry.first_seen = now


def _track_user_event(self, username: str, event_name: str):
    """Count one trigger of ``event_name`` by ``username``.

    Args:
        username: Key into ``self.data.chatbot.user_event_stats``; an entry
            is created on first use.
        event_name: Key into that entry's ``event_usage`` mapping.
    """
    per_user = self.data.chatbot.user_event_stats
    if username not in per_user:
        per_user[username] = UserEventStatistics()

    entry = per_user[username]
    now = time.time()

    usage = entry.event_usage
    usage[event_name] = usage.get(event_name, 0) + 1
    entry.last_seen = now

    # first_seen is set only while the user has exactly one recorded
    # event and it has been triggered exactly once.
    if len(usage) == 1 and usage[event_name] == 1:
        entry.first_seen = now


def _track_user_connector(self, username: str, connector_name: str):
    """Count one trigger of ``connector_name`` by ``username``.

    Args:
        username: Key into ``self.data.connectors.user_connector_stats``;
            an entry is created on first use.
        connector_name: Key into that entry's ``connector_usage`` mapping.
    """
    per_user = self.data.connectors.user_connector_stats
    if username not in per_user:
        per_user[username] = UserConnectorStatistics()

    entry = per_user[username]
    now = time.time()

    usage = entry.connector_usage
    usage[connector_name] = usage.get(connector_name, 0) + 1

    # Both aggregate counters advance by one on every call.
    entry.connectors_triggered += 1
    entry.total_triggers += 1
    entry.last_seen = now

    # The very first trigger also defines first_seen.
    if entry.total_triggers == 1:
        entry.first_seen = now


def _track_individual_connector(self, connector_name: str):
    """Count one trigger of a connector globally (not per user).

    Args:
        connector_name: Key into ``self.data.connectors.connector_stats``;
            an entry starting at zero triggers is created on first use.
    """
    registry = self.data.connectors.connector_stats
    if connector_name not in registry:
        registry[connector_name] = IndividualConnectorStatistics(
            connector_name=connector_name,
            trigger_count=0,
            last_triggered=time.time(),
            first_triggered=time.time(),
        )

    connector_stat = registry[connector_name]
    connector_stat.trigger_count += 1
    connector_stat.last_triggered = time.time()

    logger.debug(
        f"Tracked connector '{connector_name}' usage: {connector_stat.trigger_count}"
    )

# Chatbot Statistics Methods
def increment_commands_triggered(
    self, username: Optional[str] = None, command_name: str = ""
):
    """Count one triggered chatbot command.

    The global ``commands_triggered`` and ``total_interactions`` counters
    are always bumped.

    Args:
        username: When truthy, the user's chatbot counters are updated.
        command_name: When non-empty, the command's global usage is tracked
            and, if ``username`` is also given, the user's usage of it.
    """
    chatbot = self.data.chatbot
    chatbot.commands_triggered += 1
    chatbot.total_interactions += 1

    if username:
        self._track_user_chatbot(username, "commands_triggered")

    # Per-user usage of this specific command needs both identifiers.
    if username and command_name:
        self._track_user_command(username, command_name)

    # Global usage of this command, independent of any user.
    if command_name:
        self._track_command(command_name)

    logger.debug(
        f"Commands triggered incremented to: {self.data.chatbot.commands_triggered}"
    )
def increment_events_triggered(
    self, username: Optional[str] = None, event_name: str = ""
):
    """Count one triggered chatbot event.

    The global ``events_triggered`` and ``total_interactions`` counters are
    always bumped.

    Args:
        username: When truthy, the user's chatbot counters are updated.
        event_name: When non-empty, the event's global usage is tracked and,
            if ``username`` is also given, the user's usage of it.
    """
    chatbot = self.data.chatbot
    chatbot.events_triggered += 1
    chatbot.total_interactions += 1

    if username:
        self._track_user_chatbot(username, "events_triggered")

    # Per-user usage of this specific event needs both identifiers.
    if username and event_name:
        self._track_user_event(username, event_name)

    # Global usage of this event, independent of any user.
    if event_name:
        self._track_event(event_name)

    logger.debug(
        f"Events triggered incremented to: {self.data.chatbot.events_triggered}"
    )
def _track_user_quote(self, username: str, quote_id: str):
    """Count one redemption of ``quote_id`` by ``username``.

    Args:
        username: Key into ``self.data.quotes.user_stats``; an entry is
            created on first use.
        quote_id: Key into that entry's ``individual_quote_usage`` mapping.
    """
    per_user = self.data.quotes.user_stats
    if username not in per_user:
        per_user[username] = UserQuoteStatistics()

    entry = per_user[username]
    now = time.time()

    usage = entry.individual_quote_usage
    usage[quote_id] = usage.get(quote_id, 0) + 1

    entry.total_quotes_redeemed += 1
    entry.last_seen = now

    # The very first redemption also defines first_seen.
    if entry.total_quotes_redeemed == 1:
        entry.first_seen = now

# Quote Statistics Methods
def increment_quote_redeemed(self, quote_id: str, username: Optional[str] = None):
    """Count one redemption of the quote identified by ``quote_id``.

    Args:
        quote_id: Key into ``self.data.quotes.individual_quote_usage``; the
            counter starts at zero on first use.
        username: When truthy, the redemption is also recorded for that user.
    """
    quotes = self.data.quotes
    usage = quotes.individual_quote_usage
    usage[quote_id] = usage.get(quote_id, 0) + 1
    quotes.total_quotes_redeemed += 1

    # Per-user bookkeeping is skipped for anonymous redemptions.
    if username:
        self._track_user_quote(username, quote_id)

    logger.debug(
        f"Quote {quote_id} redeemed. Total: {self.data.quotes.total_quotes_redeemed}"
    )
def _track_user_chat(self, username: str):
    """Update the per-user chat statistics for one received message.

    Creates the user's entry on first use, bumps both message counters and
    refreshes the user's timestamps.

    Args:
        username: User who sent the message.
    """
    per_user = self.data.chat.user_stats
    if username not in per_user:
        per_user[username] = UserChatStatistics()
    entry = per_user[username]

    now = time.time()

    entry.twitch_messages_received += 1
    entry.total_messages += 1
    entry.last_seen = now

    # The very first message also defines when the user was first seen.
    if entry.total_messages == 1:
        entry.first_seen = now


# Chat Statistics Methods
def increment_twitch_messages(self, username: Optional[str] = None):
    """Record one received Twitch chat message.

    Bumps the global Twitch and total message counters and, when a username
    is supplied, the per-user chat statistics as well.

    Args:
        username: User who sent the message. Per-user tracking is skipped
            when this is empty or None.
    """
    chat = self.data.chat
    chat.twitch_messages_received += 1
    chat.total_messages += 1

    if username:
        self._track_user_chat(username)

    # This runs once per chat message, so it logs at DEBUG like every other
    # increment_* counter instead of flooding the INFO log.
    logger.debug(
        f"Twitch messages incremented to: {self.data.chat.twitch_messages_received} (total: {self.data.chat.total_messages})"
    )
# Template Statistics Methods
def submit_template_stat(
    self,
    template_name: str,
    stat_name: str,
    stat_value: Any,
    increment: bool = False,
):
    """
    Store a custom statistic reported by a template.

    Args:
        template_name: Name of the template submitting the stat
        stat_name: Name of the statistic
        stat_value: Value of the statistic
        increment: If True, add stat_value to the existing value (number +
            number, or list + list); any other combination, or a stat that
            does not exist yet, is simply set to stat_value

    Returns:
        True when the stat was stored, False when an error was logged instead.
    """
    try:
        templates = self.data.templates
        if template_name not in templates:
            templates[template_name] = TemplateStatistics(
                template_name=template_name
            )
        entry = templates[template_name]
        stats = entry.custom_stats

        # Default is a plain overwrite; only compatible existing values are
        # combined with the submitted one.
        new_value = stat_value
        if increment and stat_name in stats:
            previous = stats[stat_name]
            both_numbers = isinstance(previous, (int, float)) and isinstance(
                stat_value, (int, float)
            )
            both_lists = isinstance(previous, list) and isinstance(stat_value, list)
            if both_numbers or both_lists:
                new_value = previous + stat_value

        stats[stat_name] = new_value
        entry.last_updated = time.time()

        logger.debug(
            f"Template '{template_name}' stat '{stat_name}' updated to: {entry.custom_stats[stat_name]}"
        )
        return True

    except Exception as e:
        logger.error(f"Error submitting template stat: {e}", exc_info=True)
        return False
def get_template_stats(self, template_name: Optional[str] = None) -> Dict[str, Any]:
    """
    Get template statistics

    Args:
        template_name: Specific template name, or None (or empty) for all templates

    Returns:
        For a single template: its name, a shallow copy of its custom stats
        and its last-updated time, or an empty dict when the template is
        unknown. For all templates: a dict of such entries keyed by
        template name.
    """

    def snapshot(entry):
        # custom_stats is copied so callers cannot mutate the stored dict.
        return {
            "template_name": entry.template_name,
            "custom_stats": entry.custom_stats.copy(),
            "last_updated": entry.last_updated,
        }

    templates = self.data.templates

    if not template_name:
        return {name: snapshot(entry) for name, entry in templates.items()}

    if template_name not in templates:
        return {}

    return snapshot(templates[template_name])
def increment_template_counter(
    self, template_name: str, counter_name: str, increment: int = 1
):
    """
    Convenience wrapper that adds to a template counter

    Args:
        template_name: Name of the template
        counter_name: Name of the counter to increment
        increment: Amount to increment by (default: 1)

    Returns:
        Whatever submit_template_stat() returns for the increment request.
    """
    amount = increment
    outcome = self.submit_template_stat(
        template_name, counter_name, amount, increment=True
    )
    return outcome
def reset_template_stats(
    self, template_name: Optional[str] = None, stat_name: Optional[str] = None
):
    """
    Reset template statistics

    Args:
        template_name: Specific template name, or None for all templates
        stat_name: Specific stat name, or None for all stats in the template

    Unknown template or stat names are ignored without logging.
    """
    templates = self.data.templates

    # No template given: wipe everything.
    if not template_name:
        templates.clear()
        logger.info("Reset all template statistics")
        return

    if template_name not in templates:
        return

    custom_stats = templates[template_name].custom_stats

    # No stat given: wipe every stat of this one template.
    if not stat_name:
        custom_stats.clear()
        logger.info(f"Reset all stats for template '{template_name}'")
        return

    if stat_name in custom_stats:
        del custom_stats[stat_name]
        logger.info(f"Reset template '{template_name}' stat '{stat_name}'")
def _get_connector_count(self) -> int:
    """Return how many connectors connector_manager currently holds.

    Returns 0 when the module-level manager instance does not exist yet or
    when the lookup fails for any reason (the failure is logged at DEBUG).
    """
    try:
        from . import connector_manager

        # The global instance only exists once connector_manager is set up.
        if not hasattr(connector_manager, "connector_manager"):
            return 0
        return len(connector_manager.connector_manager.connectors)
    except Exception as e:
        logger.debug(f"Could not get connector count: {e}")
        return 0


def _get_commands_count(self) -> int:
    """Return how many commands chatbot_manager currently holds.

    Returns 0 when the manager has no loaded ``commands`` yet or when the
    lookup fails (the failure is logged with a traceback).
    """
    try:
        from . import chatbot_manager

        manager = chatbot_manager.get_manager()

        # A missing or None attribute means the manager has not loaded data.
        if getattr(manager, "commands", None) is None:
            logger.debug(" Commands count: 0 (chatbot manager not initialized)")
            return 0

        count = len(manager.get_all_commands())
        logger.debug(f" Commands count: {count}")
        return count
    except Exception as e:
        logger.error(f"Could not get commands count: {e}", exc_info=True)
        return 0


def _get_events_count(self) -> int:
    """Return how many events chatbot_manager currently holds.

    Returns 0 when the manager has no loaded ``events`` yet or when the
    lookup fails (the failure is logged with a traceback).
    """
    try:
        from . import chatbot_manager

        manager = chatbot_manager.get_manager()

        # A missing or None attribute means the manager has not loaded data.
        if getattr(manager, "events", None) is None:
            logger.debug(" Events count: 0 (chatbot manager not initialized)")
            return 0

        count = len(manager.get_all_events())
        logger.debug(f" Events count: {count}")
        return count
    except Exception as e:
        logger.error(f"Could not get events count: {e}", exc_info=True)
        return 0


def _get_quotes_count(self) -> int:
    """Return how many quotes chatbot_manager currently holds.

    Returns 0 when the manager has no loaded ``quotes`` yet or when the
    lookup fails (the failure is logged with a traceback).
    """
    try:
        from . import chatbot_manager

        manager = chatbot_manager.get_manager()

        # A missing or None attribute means the manager has not loaded data.
        if getattr(manager, "quotes", None) is None:
            logger.debug(" Quotes count: 0 (chatbot manager not initialized)")
            return 0

        count = len(manager.get_all_quotes())
        logger.debug(f" Quotes count: {count}")
        return count
    except Exception as e:
        logger.error(f"Could not get quotes count: {e}", exc_info=True)
        return 0


# General Statistics Methods
def get_all_statistics(self) -> Dict[str, Any]:
    """Get all statistics as a nested dictionary.

    The "created" counts for connectors, commands, events and quotes are
    queried from their managers on every call; everything else comes from
    the in-memory statistics data.

    Returns:
        Dictionary with the sections "alerts", "hype_trains", "connectors",
        "chatbot", "quotes", "chat", "templates" and "session". Dict-valued
        entries are copies, so mutating the result does not touch the
        stored statistics.
    """
    commands_count = self._get_commands_count()
    events_count = self._get_events_count()
    quotes_count = self._get_quotes_count()

    logger.debug(
        f"get_all_statistics: commands={commands_count}, events={events_count}, quotes={quotes_count}"
    )

    return {
        "alerts": {
            "bit_alerts_played": self.data.alerts.bit_alerts_played,
            "total_bits": self.data.alerts.total_bits,
            "resubs_played": self.data.alerts.resubs_played,
            "new_subs_played": self.data.alerts.new_subs_played,
            "gift_subs_played": self.data.alerts.gift_subs_played,
            "total_gift_subs": self.data.alerts.total_gift_subs,
            "follow_alerts_played": self.data.alerts.follow_alerts_played,
            "raids": self.data.alerts.raids,
            "point_alerts_redeemed": self.data.alerts.point_alerts_redeemed,
            "total_channel_points_redeemed": self.data.alerts.total_channel_points_redeemed,
        },
        "hype_trains": {
            "level_completions": self.data.hype_trains.level_completions.copy(),
            "total_completions": self.data.hype_trains.total_completions,
        },
        "connectors": {
            # Get connector count dynamically from connector_manager
            "connectors_created": self._get_connector_count(),
            "connectors_triggered": self.data.connectors.connectors_triggered,
            "total_triggers": self.data.connectors.total_triggers,
        },
        "chatbot": {
            # Get counts dynamically from chatbot_manager
            "commands_created": commands_count,
            "commands_triggered": self.data.chatbot.commands_triggered,
            "events_created": events_count,
            "events_triggered": self.data.chatbot.events_triggered,
            "total_interactions": self.data.chatbot.total_interactions,
        },
        "quotes": {
            # Get count dynamically from chatbot_manager
            "quotes_created": quotes_count,
            "total_quotes_redeemed": self.data.quotes.total_quotes_redeemed,
            # Copied like level_completions above so callers cannot mutate
            # the live usage counters through the returned snapshot.
            "individual_quote_usage": self.data.quotes.individual_quote_usage.copy(),
        },
        "chat": {
            "twitch_messages_received": self.data.chat.twitch_messages_received,
            "total_messages": self.data.chat.total_messages,
        },
        "templates": self.get_template_stats(),
        "session": {
            "start_time": self.data.session.start_time,
            "last_save_time": self.data.session.last_save_time,
            "session_duration": time.time() - self.data.session.start_time,
        },
    }
def get_statistics_summary(self) -> Dict[str, Any]:
    """Get a flat summary of the headline statistics.

    Returns:
        Dictionary of totals across alerts, hype trains, connectors,
        chatbot, quotes and chat, plus the session duration in hours.
    """
    commands_count = self._get_commands_count()
    events_count = self._get_events_count()
    quotes_count = self._get_quotes_count()

    data = self.data
    alerts = data.alerts

    return {
        "total_alerts": sum(
            (
                alerts.bit_alerts_played,
                alerts.resubs_played,
                alerts.new_subs_played,
                alerts.gift_subs_played,
                alerts.follow_alerts_played,
                alerts.raids,
                alerts.point_alerts_redeemed,
            )
        ),
        "total_hype_trains": data.hype_trains.total_completions,
        "total_connectors": self._get_connector_count(),
        "total_connector_triggers": data.connectors.total_triggers,
        "total_chatbot_items": commands_count + events_count,
        "total_chatbot_interactions": data.chatbot.total_interactions,
        "total_quotes": quotes_count,
        "total_quote_redeems": data.quotes.total_quotes_redeemed,
        "total_chat_messages": data.chat.total_messages,
        # Seconds since session start, expressed in hours.
        "session_duration_hours": (time.time() - data.session.start_time) / 3600,
    }
def reset_statistics(self, category: Optional[str] = None):
    """Reset statistics, optionally for a single category only.

    Args:
        category: None to reset everything, or one of "alerts",
            "hype_trains", "connectors", "chatbot", "quotes", "chat" or
            "templates". Any other value is rejected with a warning and
            nothing is reset or saved.

    After a successful reset the new state is written to the database.
    """
    if category is None:
        # Reset all statistics
        self.data = StatisticsData()
        logger.info("All statistics reset")
    elif category == "alerts":
        self.data.alerts = AlertStatistics()
        logger.info("Alert statistics reset")
    elif category == "hype_trains":
        self.data.hype_trains = HypeTrainStatistics()
        logger.info("Hype train statistics reset")
    elif category == "connectors":
        self.data.connectors = ConnectorStatistics()
        logger.info("Connector statistics reset")
    elif category == "chatbot":
        self.data.chatbot = ChatbotStatistics()
        logger.info("Chatbot statistics reset")
    elif category == "quotes":
        self.data.quotes = QuoteStatistics()
        logger.info("Quote statistics reset")
    elif category == "chat":
        self.data.chat = ChatStatistics()
        logger.info("Chat statistics reset")
    elif category == "templates":
        # "templates" is a category in get_all_statistics(); clear it the
        # same way reset_template_stats() does for "all templates".
        self.data.templates.clear()
        logger.info("Template statistics reset")
    else:
        # Previously a typo in the category silently did nothing yet still
        # rewrote the database; make the mistake visible instead.
        logger.warning(
            f"Unknown statistics category '{category}'; nothing was reset"
        )
        return

    # Save the reset data
    self._save_to_database()
# Periodic Saving Methods
def _periodic_save_thread(self):
    """Body of the background thread that saves statistics periodically.

    Loops while ``self.is_running`` is true: sleeps for
    ``self.save_interval`` seconds, then saves to the database unless
    saving was stopped during the sleep. Errors are logged and the loop
    keeps going.
    """
    # Pause applied after a failed iteration. Without it, an error raised by
    # the sleep itself (e.g. a negative save_interval) would make the loop
    # spin at full speed while flooding the log.
    error_backoff_seconds = 1.0

    logger.debug("Periodic save thread started")

    while self.is_running:
        try:
            time.sleep(self.save_interval)

            # Check again in case we were stopped during sleep
            if not self.is_running:
                continue

            logger.debug("Periodic save triggered")
            self._save_to_database()
            logger.debug("Periodic statistics save completed")
        except Exception as e:
            logger.error(f"Error in periodic save thread: {e}")
            time.sleep(error_backoff_seconds)
def start_periodic_saving_safe(self):
    """Start periodic saving on a daemon background thread.

    Does nothing except log a warning when saving is already marked as
    running. If the thread cannot be started, ``is_running`` is reset and
    ``_needs_periodic_start`` is set so a later call can try again.
    """
    if self.is_running:
        logger.warning("Statistics manager periodic saving is already running")
        return

    self.is_running = True
    logger.info("Starting statistics manager periodic saving")

    try:
        worker = threading.Thread(
            name="StatisticsPeriodicSave",
            target=self._periodic_save_thread,
            daemon=True,
        )
        # Stored before start() so the handle is visible as soon as it runs.
        self._periodic_thread = worker
        worker.start()
        logger.info("Started periodic statistics saving thread")
        self._needs_periodic_start = False
    except Exception as e:
        logger.error(f"Failed to start periodic saving thread: {e}")
        self.is_running = False
        self._needs_periodic_start = True
def ensure_periodic_saving(self):
    """Ensure the periodic saving thread is running.

    Starts the thread when a start is pending (``_needs_periodic_start``)
    or when saving is flagged as running but the thread is missing or no
    longer alive. Does nothing when a live thread already exists.
    """
    worker = self._periodic_thread
    worker_alive = worker is not None and worker.is_alive()

    # A live thread with the running flag set needs no help.
    if self.is_running and worker_alive:
        return

    if self._needs_periodic_start or self.is_running:
        logger.info("Ensuring periodic saving is running...")
        # start_periodic_saving_safe() returns early while is_running is
        # set, so a stale flag has to be cleared or the restart is a no-op.
        self.is_running = False
        self.start_periodic_saving_safe()
def start_periodic_saving(self):
    """Deprecated alias kept for older callers.

    Delegates to start_periodic_saving_safe() and passes its result through.
    """
    outcome = self.start_periodic_saving_safe()
    return outcome
def stop_periodic_saving(self):
    """Stop the periodic saving thread.

    Clears the running flag and drops the thread handle. The thread is not
    joined; its loop re-checks ``is_running`` and ends by itself. Calling
    this while saving is not running does nothing.
    """
    if not self.is_running:
        return

    self.is_running = False

    worker = self._periodic_thread
    if worker and worker.is_alive():
        # Thread will stop on next iteration since self.is_running is now False
        logger.info("Stopping periodic statistics saving thread")

    self._periodic_thread = None
    logger.info("Stopped periodic statistics saving")
def save_on_close(self):
    """Save statistics when the application closes.

    Tries a normal save first. If that raises, waits briefly and tries one
    more time. Failures are logged, never raised. The statistics database
    connections are closed afterwards in every case.
    """
    try:
        logger.info(" Saving statistics on application close...")
        self._save_to_database()
        logger.info(" Statistics successfully saved on application close")
    except Exception as first_error:
        logger.error(
            f"Failed to save statistics on close: {first_error}", exc_info=True
        )

        # Second and last attempt after a short pause.
        try:
            logger.info(" Attempting emergency save...")
            time.sleep(0.1)  # Brief pause
            self._save_to_database()
            logger.info(" Emergency statistics save succeeded")
        except Exception as second_error:
            logger.error(
                f"Emergency statistics save also failed: {second_error}",
                exc_info=True,
            )
    finally:
        # Close statistics database connections
        self._close_statistics_db()
def force_save(self):
    """Save statistics immediately.

    Returns:
        The result of get_all_statistics(), taken after the save.
    """
    logger.info("Force save requested")
    self._save_to_database()
    snapshot = self.get_all_statistics()
    return snapshot
def get_user_statistics(self, username: str) -> Dict[str, Any]:
    """Get all statistics for a specific user.

    Args:
        username: User to look up.

    Returns:
        Dictionary with "username" plus one section each for "alerts",
        "connectors", "chatbot", "quotes" and "chat". A section is an empty
        dict when the user has no entry in that category.
    """

    def export(stats, field_names):
        # Snapshot the named attributes of one per-user stats object.
        return {name: getattr(stats, name) for name in field_names}

    data = self.data
    user_stats = {
        "username": username,
        "alerts": {},
        "connectors": {},
        "chatbot": {},
        "quotes": {},
        "chat": {},
    }

    # Get user alert statistics
    if username in data.alerts.user_stats:
        user_stats["alerts"] = export(
            data.alerts.user_stats[username],
            (
                "bit_alerts_played",
                "resubs_played",
                "new_subs_played",
                "gift_subs_played",
                "follow_alerts_played",
                "raids",
                "point_alerts_redeemed",
                # Declared on UserAlertStatistics but previously missing
                # from this export.
                "channel_points_redeemed",
                "donations",
                "total_alerts",
                "first_seen",
                "last_seen",
            ),
        )

    # Get user connector statistics
    if username in data.connectors.user_stats:
        user_stats["connectors"] = export(
            data.connectors.user_stats[username],
            ("connectors_triggered", "total_triggers", "first_seen", "last_seen"),
        )

    # Get user chatbot statistics
    if username in data.chatbot.user_stats:
        user_stats["chatbot"] = export(
            data.chatbot.user_stats[username],
            (
                "commands_triggered",
                "events_triggered",
                "total_interactions",
                "first_seen",
                "last_seen",
            ),
        )

    # Get user quote statistics
    if username in data.quotes.user_stats:
        quote_stats = data.quotes.user_stats[username]
        user_stats["quotes"] = {
            "total_quotes_redeemed": quote_stats.total_quotes_redeemed,
            # Copied so callers cannot mutate the stored usage counters.
            "individual_quote_usage": quote_stats.individual_quote_usage.copy(),
            "first_seen": quote_stats.first_seen,
            "last_seen": quote_stats.last_seen,
        }

    # Get user chat statistics
    if username in data.chat.user_stats:
        user_stats["chat"] = export(
            data.chat.user_stats[username],
            ("twitch_messages_received", "total_messages", "first_seen", "last_seen"),
        )

    return user_stats
def get_top_users_by_statistic(
    self, stat_type: str, limit: int = 10
) -> List[Dict[str, Any]]:
    """Get top users ranked by a specific statistic

    Args:
        stat_type: One of 'total_alerts', 'bits_donated', 'subs_gifted',
            'donations', 'points_redeemed', 'quotes_redeemed',
            'chat_messages', 'connector_triggers' or 'chatbot_interactions'.
            Any other value yields an empty list.
        limit: Maximum number of users to return

    Returns:
        List of dictionaries with username, value, first_seen and last_seen,
        sorted by value descending
    """
    # stat_type -> (statistics category, per-user attribute used as value)
    sources = {
        "total_alerts": ("alerts", "total_alerts"),
        "bits_donated": ("alerts", "bit_alerts_played"),
        "subs_gifted": ("alerts", "gift_subs_played"),
        "donations": ("alerts", "donations"),
        "points_redeemed": ("alerts", "point_alerts_redeemed"),
        "quotes_redeemed": ("quotes", "total_quotes_redeemed"),
        "chat_messages": ("chat", "total_messages"),
        "connector_triggers": ("connectors", "total_triggers"),
        "chatbot_interactions": ("chatbot", "total_interactions"),
    }

    source = sources.get(stat_type)
    if source is None:
        return []

    category, value_attr = source
    per_user = getattr(self.data, category).user_stats

    rows = [
        {
            "username": name,
            "value": getattr(entry, value_attr),
            "first_seen": entry.first_seen,
            "last_seen": entry.last_seen,
        }
        for name, entry in per_user.items()
    ]

    ranked = sorted(rows, key=lambda row: row["value"], reverse=True)
    return ranked[:limit]
def get_recent_users(self, stat_type: str, limit: int = 10) -> List[Dict[str, Any]]:
    """Get most recent users for a specific statistic type

    Args:
        stat_type: One of 'alerts', 'chat', 'connectors', 'quotes' or
            'chatbot'. Any other value yields an empty list.
        limit: Maximum number of users to return

    Returns:
        List of dictionaries with username, last_seen and first_seen,
        sorted by last_seen descending (most recent first)
    """
    known_categories = ("alerts", "chat", "connectors", "quotes", "chatbot")
    if stat_type not in known_categories:
        return []

    per_user = getattr(self.data, stat_type).user_stats

    rows = [
        {
            "username": name,
            "last_seen": entry.last_seen,
            "first_seen": entry.first_seen,
        }
        for name, entry in per_user.items()
    ]

    newest_first = sorted(rows, key=lambda row: row["last_seen"], reverse=True)
    return newest_first[:limit]
def get_user_activity_summary(self, username: str) -> Dict[str, Any]:
    """Get a summary of user activity across all categories.

    The activity score is a weighted sum of the user's per-category totals:
    alerts x10, quotes x5, connector triggers x3, chatbot interactions x2
    and chat messages x1.

    Args:
        username: User to summarise.

    Returns:
        Dictionary with username, total_activity_score, categories_active,
        most_recent_activity, first_seen and last_seen. The timestamps are
        0 when the user has no recorded activity.
    """
    # category -> (counter field inside that category's stats, weight)
    scoring = {
        "alerts": ("total_alerts", 10),
        "chat": ("total_messages", 1),
        "quotes": ("total_quotes_redeemed", 5),
        "connectors": ("total_triggers", 3),
        "chatbot": ("total_interactions", 2),
    }

    user_stats = self.get_user_statistics(username)

    score = 0
    active_categories = 0
    newest = 0
    oldest = float("inf")

    for category, stats in user_stats.items():
        # Skip the username entry and categories without any data.
        if category == "username" or not isinstance(stats, dict) or not stats:
            continue

        active_categories += 1

        if category in scoring:
            field_name, weight = scoring[category]
            score += stats.get(field_name, 0) * weight

        # Track timestamps
        if "last_seen" in stats:
            newest = max(newest, stats["last_seen"])
        if "first_seen" in stats:
            oldest = min(oldest, stats["first_seen"])

    return {
        "username": username,
        "total_activity_score": score,
        "categories_active": active_categories,
        "most_recent_activity": newest,
        # Handle case where no activity found
        "first_seen": 0 if oldest == float("inf") else oldest,
        # Previously initialised to 0 and never updated; it now carries the
        # latest last_seen across all categories.
        "last_seen": newest,
    }
def get_top_commands(self, limit: int = 10) -> List[Dict[str, Any]]:
    """Get the most used commands.

    Args:
        limit: Maximum number of commands to return.

    Returns:
        List of dictionaries with command_name, usage_count, first_used and
        last_used, sorted by usage_count descending.
    """
    rows = [
        {
            "command_name": name,
            "usage_count": entry.usage_count,
            "first_used": entry.first_used,
            "last_used": entry.last_used,
        }
        for name, entry in self.data.chatbot.command_stats.items()
    ]
    ranked = sorted(rows, key=lambda row: row["usage_count"], reverse=True)
    return ranked[:limit]
def get_top_events(self, limit: int = 10) -> List[Dict[str, Any]]:
    """Get the most triggered events.

    Args:
        limit: Maximum number of events to return.

    Returns:
        List of dictionaries with event_name, trigger_count, first_triggered
        and last_triggered, sorted by trigger_count descending.
    """
    rows = [
        {
            "event_name": name,
            "trigger_count": entry.trigger_count,
            "first_triggered": entry.first_triggered,
            "last_triggered": entry.last_triggered,
        }
        for name, entry in self.data.chatbot.event_stats.items()
    ]
    ranked = sorted(rows, key=lambda row: row["trigger_count"], reverse=True)
    return ranked[:limit]
def get_top_connectors(self, limit: int = 10) -> List[Dict[str, Any]]:
    """Get the most triggered connectors.

    Args:
        limit: Maximum number of connectors to return.

    Returns:
        List of dictionaries with connector_name, trigger_count,
        first_triggered and last_triggered, sorted by trigger_count
        descending.
    """
    rows = [
        {
            "connector_name": name,
            "trigger_count": entry.trigger_count,
            "first_triggered": entry.first_triggered,
            "last_triggered": entry.last_triggered,
        }
        for name, entry in self.data.connectors.connector_stats.items()
    ]
    ranked = sorted(rows, key=lambda row: row["trigger_count"], reverse=True)
    return ranked[:limit]
def get_top_alert_types(self, limit: int = 10) -> List[Dict[str, Any]]:
    """Get the most triggered alert types.

    Args:
        limit: Maximum number of alert types to return.

    Returns:
        List of dictionaries with alert_type, alert_name, trigger_count,
        first_triggered and last_triggered, sorted by trigger_count
        descending.
    """
    rows = [
        {
            "alert_type": kind,
            "alert_name": entry.alert_name,
            "trigger_count": entry.trigger_count,
            "first_triggered": entry.first_triggered,
            "last_triggered": entry.last_triggered,
        }
        for kind, entry in self.data.alerts.alert_type_stats.items()
    ]
    ranked = sorted(rows, key=lambda row: row["trigger_count"], reverse=True)
    return ranked[:limit]
def get_top_users_for_command(
    self, command_name: str, limit: int = 10
) -> List[Dict[str, Any]]:
    """Get the users who used a specific command the most.

    Args:
        command_name: Command to rank users for.
        limit: Maximum number of users to return.

    Returns:
        List of dictionaries with username, usage_count, first_used and
        last_used (the user's first_seen / last_seen), sorted by usage_count
        descending. Users who never used the command are left out.
    """
    rows = [
        {
            "username": name,
            "usage_count": entry.command_usage[command_name],
            "first_used": entry.first_seen,
            "last_used": entry.last_seen,
        }
        for name, entry in self.data.chatbot.user_command_stats.items()
        if command_name in entry.command_usage
    ]
    ranked = sorted(rows, key=lambda row: row["usage_count"], reverse=True)
    return ranked[:limit]
def get_top_users_for_event(
    self, event_name: str, limit: int = 10
) -> List[Dict[str, Any]]:
    """Get the users who triggered a specific event the most.

    Args:
        event_name: Event to rank users for.
        limit: Maximum number of users to return.

    Returns:
        List of dictionaries with username, trigger_count, first_triggered
        and last_triggered (the user's first_seen / last_seen), sorted by
        trigger_count descending. Users who never triggered the event are
        left out.
    """
    rows = [
        {
            "username": name,
            "trigger_count": entry.event_usage[event_name],
            "first_triggered": entry.first_seen,
            "last_triggered": entry.last_seen,
        }
        for name, entry in self.data.chatbot.user_event_stats.items()
        if event_name in entry.event_usage
    ]
    ranked = sorted(rows, key=lambda row: row["trigger_count"], reverse=True)
    return ranked[:limit]
def get_top_users_for_connector(
    self, connector_name: str, limit: int = 10
) -> List[Dict[str, Any]]:
    """Get the users who triggered a specific connector the most.

    Args:
        connector_name: Connector to rank users for.
        limit: Maximum number of users to return.

    Returns:
        List of dictionaries with username, trigger_count, first_triggered
        and last_triggered (the user's first_seen / last_seen), sorted by
        trigger_count descending. Users who never triggered the connector
        are left out.
    """
    rows = [
        {
            "username": name,
            "trigger_count": entry.connector_usage[connector_name],
            "first_triggered": entry.first_seen,
            "last_triggered": entry.last_seen,
        }
        for name, entry in self.data.connectors.user_connector_stats.items()
        if connector_name in entry.connector_usage
    ]
    ranked = sorted(rows, key=lambda row: row["trigger_count"], reverse=True)
    return ranked[:limit]
def get_top_users_for_quote(
    self, quote_id: str, limit: int = 10
) -> List[Dict[str, Any]]:
    """Get the users who redeemed a specific quote the most.

    Args:
        quote_id: Quote to rank users for.
        limit: Maximum number of users to return.

    Returns:
        List of dictionaries with username, usage_count, first_used and
        last_used (the user's first_seen / last_seen), sorted by usage_count
        descending. Users who never redeemed the quote are left out.
    """
    rows = [
        {
            "username": name,
            "usage_count": entry.individual_quote_usage[quote_id],
            "first_used": entry.first_seen,
            "last_used": entry.last_seen,
        }
        for name, entry in self.data.quotes.user_stats.items()
        if quote_id in entry.individual_quote_usage
    ]
    ranked = sorted(rows, key=lambda row: row["usage_count"], reverse=True)
    return ranked[:limit]
def get_user_individual_usage(self, username: str) -> Dict[str, Any]:
    """Get detailed per-item usage counts for a specific user.

    Args:
        username: User to look up.

    Returns:
        Dictionary with "username" and one usage mapping each for
        "commands", "events", "connectors" and "quotes". Each mapping is a
        copy of the stored counters, or an empty dict when the user has no
        entry in that category.
    """
    chatbot = self.data.chatbot
    connectors = self.data.connectors
    quotes = self.data.quotes

    usage = {
        "username": username,
        "commands": {},
        "events": {},
        "connectors": {},
        "quotes": {},
    }

    if username in chatbot.user_command_stats:
        entry = chatbot.user_command_stats[username]
        usage["commands"] = entry.command_usage.copy()

    if username in chatbot.user_event_stats:
        entry = chatbot.user_event_stats[username]
        usage["events"] = entry.event_usage.copy()

    if username in connectors.user_connector_stats:
        entry = connectors.user_connector_stats[username]
        usage["connectors"] = entry.connector_usage.copy()

    if username in quotes.user_stats:
        entry = quotes.user_stats[username]
        usage["quotes"] = entry.individual_quote_usage.copy()

    return usage
def get_saving_status(self):
    """Get the current status of periodic saving.

    Returns:
        Dictionary with is_running, has_thread (True only when a thread
        handle exists and is alive), needs_start, save_interval and
        last_save_time (None when the session has no such attribute).
    """
    worker = self._periodic_thread
    session = self.data.session

    return {
        "is_running": self.is_running,
        "has_thread": worker is not None and worker.is_alive(),
        "needs_start": self._needs_periodic_start,
        "save_interval": self.save_interval,
        "last_save_time": getattr(session, "last_save_time", None),
    }
def check_data_integrity(self) -> List[str]:
    """Check data integrity and return a list of issues found.

    Only ``hype_trains.level_completions`` is inspected: it must be a dict
    whose keys and values are all ints.

    Returns:
        Human-readable descriptions of every problem found; empty when the
        data is consistent.
    """
    level_completions = self.data.hype_trains.level_completions

    # A non-dict container is reported on its own; there is nothing to iterate.
    if not isinstance(level_completions, dict):
        return [
            f"hype_trains.level_completions is {type(level_completions)} "
            f"instead of dict: {level_completions}"
        ]

    issues = []
    for k, v in level_completions.items():
        if not isinstance(k, int):
            issues.append(
                f"hype_trains.level_completions key {k} is {type(k)} instead of int"
            )
        if not isinstance(v, int):
            issues.append(
                f"hype_trains.level_completions[{k}] value {v} is {type(v)} instead of int"
            )
    return issues
def repair_data_integrity(self) -> bool:
    """Attempt to repair data integrity issues.

    Normalises ``hype_trains.level_completions`` to a dict with int keys
    and int values: a non-dict value is replaced by an empty dict, entries
    that can be converted with ``int()`` are converted, and entries that
    cannot are dropped. When anything changed, the repaired data is saved
    via ``save_statistics()``.

    Returns:
        True if repairs were made, False if the data was already clean.
    """
    repairs_made = False
    hype_trains = self.data.hype_trains

    # Fix hype train level_completions
    if not isinstance(hype_trains.level_completions, dict):
        logger.warning(
            f"Repairing hype_trains.level_completions: was {type(hype_trains.level_completions)}, "
            f"setting to empty dict"
        )
        hype_trains.level_completions = {}
        repairs_made = True

    # Clean up non-integer keys/values
    original = hype_trains.level_completions
    cleaned = {}
    needs_rewrite = False
    for k, v in original.items():
        try:
            cleaned[int(k)] = int(v)
        except (ValueError, TypeError):
            logger.warning(
                f"Removing invalid hype_trains.level_completions entry: {k}={v}"
            )
            needs_rewrite = True
            continue
        # Converted entries (e.g. "1" -> 1) keep the dict the same length, so
        # comparing lengths alone would leave them unrepaired.
        if not isinstance(k, int) or not isinstance(v, int):
            needs_rewrite = True

    # The length check still catches keys that collapse onto each other
    # after conversion.
    if needs_rewrite or len(cleaned) != len(original):
        hype_trains.level_completions = cleaned
        repairs_made = True

    if repairs_made:
        logger.info("Data integrity repairs completed")
        # Save the repaired data
        self.save_statistics()

    return repairs_made
def set_save_interval(self, interval_seconds: int):
    """Set the periodic save interval.

    Args:
        interval_seconds: Seconds between periodic saves. Must be a
            positive number; any other value is rejected with an error log
            and the current interval is kept.
    """
    is_number = isinstance(interval_seconds, (int, float)) and not isinstance(
        interval_seconds, bool
    )
    # "not > 0" also rejects NaN. A zero interval would make the save thread
    # write continuously; a negative or non-numeric one fails in time.sleep().
    if not is_number or not interval_seconds > 0:
        logger.error(
            f"Ignoring invalid statistics save interval: {interval_seconds!r} "
            f"(must be a positive number of seconds)"
        )
        return

    self.save_interval = interval_seconds
    logger.info(f"Statistics save interval set to {interval_seconds} seconds")
# Global instance _statistics_manager = None
def get_statistics_manager() -> StatisticsManager:
    """Return the global statistics manager, creating it on first use."""
    global _statistics_manager

    if _statistics_manager is not None:
        return _statistics_manager

    _statistics_manager = StatisticsManager()
    return _statistics_manager
def initialize_statistics():
    """Initialize the statistics manager and return it.

    Periodic saving is deliberately not started here; it is started later,
    once the event loop is available.
    """
    # Don't start periodic saving here - it will be started when event loop is available
    return get_statistics_manager()
def initialize_statistics_with_data(all_data: Dict[str, Any]):
    """Initialize the statistics manager from pre-loaded data and return it.

    Args:
        all_data: Pre-loaded data handed to the manager's
            initialize_with_data().

    Periodic saving is deliberately not started here; it is started later,
    once the event loop is available.
    """
    stats_manager = get_statistics_manager()
    stats_manager.initialize_with_data(all_data)
    # Don't start periodic saving here - it will be started when event loop is available
    return stats_manager
def start_statistics_saving():
    """Start periodic saving for the statistics manager (call after event loop is running).

    Returns:
        The statistics manager on success, or None when starting failed
        (the error is logged, not raised).
    """
    try:
        stats_manager = get_statistics_manager()
        stats_manager.start_periodic_saving_safe()
        logger.info("Statistics periodic saving started successfully")
    except Exception as e:
        logger.error(f"Failed to start statistics saving: {e}")
        return None
    return stats_manager
def shutdown_statistics():
    """Shut down the global statistics manager, if one exists.

    Stops periodic saving, performs the final save and then clears the
    global instance. Progress and errors are reported with print();
    exceptions are not propagated.
    """
    global _statistics_manager
    print(" shutdown_statistics() called")

    if not _statistics_manager:
        print(" No statistics manager to shutdown")
        return

    print(" Statistics manager found, shutting down...")
    try:
        _statistics_manager.stop_periodic_saving()
        _statistics_manager.save_on_close()
        # Only cleared after a clean stop and save.
        _statistics_manager = None
        print(" Statistics manager shutdown complete")
    except Exception as e:
        print(f" Error during statistics manager shutdown: {e}")