paper-dynasty-discord/cogs/economy/notifications.py
2025-08-17 08:46:55 -05:00

171 lines
6.7 KiB
Python

# Economy Notifications Module
# Handles automated notification processing for market changes and rare pulls
import copy
import logging
from discord.ext import commands, tasks
from typing import Dict, List, Any
# Import specific utilities needed by this module
import discord
from api_calls import db_get, db_patch
from helpers.discord_utils import send_to_channel, get_team_embed
logger = logging.getLogger('discord_app')
class Notifications(commands.Cog):
    """Cog that periodically relays pending Paper Dynasty notifications.

    Every 10 minutes the cog fetches unacknowledged notifications via
    ``db_get``, sorts them into known topics, posts them to each topic's
    channel as embeds, and marks the delivered notifications as acknowledged
    via ``db_patch``.
    """

    # Maximum number of fields Discord accepts on a single embed.
    MAX_EMBED_FIELDS = 25

    def __init__(self, bot):
        """Store the bot, install the app-command error handler, start the loop."""
        self.bot = bot

        # Set up app command error handler
        bot.tree.on_error = self.on_app_command_error

        # Start the notification checking loop
        self.notif_check.start()

    async def cog_unload(self):
        """Clean up when the cog is unloaded by cancelling the polling loop."""
        self.notif_check.cancel()

    async def on_app_command_error(self, interaction: discord.Interaction,
                                   error: discord.app_commands.AppCommandError):
        """Report an app command error to the channel the command was used in.

        The error is always logged with its traceback, because replacing the
        tree's ``on_error`` means nothing else will record it. If the
        interaction has no channel there is nowhere to post the message, so
        the handler only logs.
        """
        logger.error(f'App command error: {error}', exc_info=error)

        channel = interaction.channel
        if channel is None:
            logger.error('App command error could not be reported: interaction has no channel')
            return

        await channel.send(f'{error}')

    @tasks.loop(minutes=10)
    async def notif_check(self):
        """Check for and process pending notifications every 10 minutes.

        Any exception raised while fetching or processing is logged and a
        short error message is sent to the ``commissioners-office`` channel;
        the exception is not re-raised, so the loop keeps running.
        """
        try:
            # Check for notifications that have not been acknowledged yet
            all_notifs = await db_get('notifs', params=[('ack', False)])
            if not all_notifs:
                logger.debug('No notifications found')
                return

            # Define notification topics and their configurations. Built fresh
            # on every run because each topic accumulates its own 'notifs' list.
            topics = {
                'Price Change': {
                    'channel_name': 'pd-market-watch',
                    'desc': 'Modified by buying and selling',
                    'notifs': []
                },
                'Rare Pull': {
                    'channel_name': 'pd-network-news',
                    'desc': 'MVP and All-Star cards pulled from packs',
                    'notifs': []
                }
            }

            # Categorize notifications by topic. Notifications whose title is
            # not a known topic are skipped and left unacknowledged.
            for line in all_notifs['notifs']:
                if line['title'] in topics:
                    topics[line['title']]['notifs'].append(line)

            logger.info(f'Processing notification topics: {topics}')

            # Process each topic
            for topic_name, topic_data in topics.items():
                await self._process_notification_topic(topic_name, topic_data)

        except Exception as e:
            # exc_info keeps the traceback in the log; the channel message
            # below only carries the short error text.
            logger.error(f'Error in notif_check: {e}', exc_info=True)

            # Send error to commissioners-office for debugging
            try:
                await send_to_channel(
                    self.bot,
                    'commissioners-office',
                    f'Error in notification processing: {e}'
                )
            except Exception as channel_error:
                logger.error(f'Failed to send error notification: {channel_error}')

    @notif_check.before_loop
    async def before_notif_check(self):
        """Wait for the bot to be ready before starting the notification loop."""
        await self.bot.wait_until_ready()
        logger.info('Notification checking loop started')

    async def _process_notification_topic(self, topic_name: str, topic_data: Dict[str, Any]):
        """Build and send the embeds for a single topic.

        Args:
            topic_name: Topic title; pluralised with a trailing "s" in the
                embed title when the topic has more than one notification.
            topic_data: Topic configuration with 'channel_name', 'desc' and
                the 'notifs' list collected for this run.
        """
        if not topic_data['notifs']:
            return

        # Create base embed for this topic
        plural_suffix = "s" if len(topic_data["notifs"]) > 1 else ""
        embed = get_team_embed(title=f'{topic_name}{plural_suffix}')
        embed.description = topic_data['desc']

        # Group notifications by field_name to avoid duplicates
        notification_groups = self._group_notifications(topic_data['notifs'])

        # Send notifications in batches (Discord embed limit is 25 fields)
        await self._send_notification_batches(
            topic_data['channel_name'],
            embed,
            notification_groups
        )

    def _group_notifications(self, notifications: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
        """Group notifications by field_name to consolidate duplicates.

        Args:
            notifications: Notification dicts, each with 'field_name',
                'message' and 'id' keys.

        Returns:
            A dict keyed by field_name, in first-seen order. Each value holds
            the 'field_name', the 'message' of the latest notification in the
            group, the 'count' of notifications merged, and the 'ids' of all
            of them.
        """
        notification_groups: Dict[str, Dict[str, Any]] = {}

        for notification in notifications:
            field_name = notification['field_name']
            group = notification_groups.get(field_name)

            if group is None:
                notification_groups[field_name] = {
                    'field_name': field_name,
                    'message': notification['message'],
                    'count': 1,
                    'ids': [notification['id']]
                }
            else:
                # Update message (use latest) and increment count
                group['message'] = notification['message']
                group['count'] += 1
                group['ids'].append(notification['id'])

        return notification_groups

    async def _send_notification_batches(self, channel_name: str, base_embed: discord.Embed,
                                         notification_groups: Dict[str, Dict[str, Any]]):
        """Send notifications in batches, respecting Discord's 25 field limit per embed.

        Each batch is acknowledged only after its embed has been sent. If
        ``send_to_channel`` raises, the notifications in that batch (and any
        later batch) stay unacknowledged and are picked up again on the next
        run instead of being silently dropped.

        Args:
            channel_name: Name of the channel to post the embeds to.
            base_embed: Template embed; every batch gets its own deep copy.
            notification_groups: Output of ``_group_notifications``.
        """
        if not notification_groups:
            return

        groups = list(notification_groups.values())
        total_sent = 0

        for start in range(0, len(groups), self.MAX_EMBED_FIELDS):
            batch = groups[start:start + self.MAX_EMBED_FIELDS]

            # Copy the template so fields from one batch never leak into the next
            current_embed = copy.deepcopy(base_embed)
            for group_data in batch:
                current_embed.add_field(
                    name=group_data['field_name'],
                    value=group_data['message'],
                    inline=False
                )

            await send_to_channel(self.bot, channel_name, embed=current_embed)

            # Only now is it safe to mark this batch's notifications as delivered
            delivered_ids = [notif_id for group_data in batch for notif_id in group_data['ids']]
            await self._acknowledge_notifications(delivered_ids)
            total_sent += len(batch)

        logger.info(f'Sent {total_sent} notifications to {channel_name}')

    async def _acknowledge_notifications(self, notif_ids: List[Any]):
        """Mark each notification id as acknowledged, logging individual failures.

        A failure to acknowledge one notification is logged and does not stop
        the remaining ids from being acknowledged.
        """
        for notif_id in notif_ids:
            try:
                await db_patch('notifs', object_id=notif_id, params=[('ack', True)])
                logger.debug(f'Acknowledged notification {notif_id}')
            except Exception as e:
                logger.error(f'Failed to acknowledge notification {notif_id}: {e}')
async def setup(bot):
    """Extension entry point: create the Notifications cog and add it to the bot."""
    notifications_cog = Notifications(bot)
    await bot.add_cog(notifications_cog)