major-domo-legacy/cogs/fun.py
Cal Corum 9770e360c3 WIP: uncommitted local changes before archival
- Modified cogs/dice.py, cogs/fun.py, db_calls.py
- Added COMMAND_LIST.md, api_calls/custom_command.py, sba_is_fun.db

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-23 14:11:58 -05:00

556 lines
24 KiB
Python

import copy
import math
from helpers import *
from api_calls.current import get_current
from peewee import *
import discord
from discord import app_commands
from datetime import datetime, timedelta
from discord.ext import commands, tasks
from typing import Literal
from db_calls import (
get_custom_command_by_name,
execute_custom_command,
create_custom_command,
delete_custom_command,
get_commands_by_creator,
get_all_custom_commands,
get_or_create_creator
)
# Module-level logger. NOTE(review): `logging` is not imported explicitly in
# this file; presumably it arrives via `from helpers import *` - confirm.
logger = logging.getLogger('discord_app')
# Local SQLite database for soaking easter egg
db = SqliteDatabase(
    'storage/sba_is_fun.db',
    pragmas={
        # Write-ahead logging journal mode.
        'journal_mode': 'wal',
        # Negative cache_size is expressed in KiB by SQLite, i.e. roughly 64 MB.
        'cache_size': -1 * 64000,
        # synchronous=0 (OFF): SQLite does not wait for data to reach disk.
        'synchronous': 0
    }
)
class Soaks(Model):
    """One row per chat message that mentioned soaking (see ``on_message_listener``)."""

    # Discord ID of the author of the message (written from message.author.id).
    user = IntegerField()
    # Discord ID of the message itself; used by `lastsoak` to fetch it again.
    message_id = IntegerField()
    # When the mention was recorded; written as a naive datetime.now().
    timestamp = DateTimeField()

    class Meta:
        # Bind the model to the local SQLite database defined in this module.
        database = db
class Roles(Model):
    """Named role with an on/off flag.

    The table is created by ``Fun.__init__``; no other use of this model is
    visible in this file.
    """

    # Role name; the unique constraint forbids duplicates.
    name = CharField(unique=True)
    # Whether the role is enabled; new rows default to True.
    enabled = BooleanField(default=True)

    class Meta:
        # Bind the model to the local SQLite database defined in this module.
        database = db
class Fun(commands.Cog):
    """Cog with the custom-command commands, the soaking easter egg and ``/woulditdong``."""

    def __init__(self, bot):
        """Store the bot, make sure the local tables exist and start the cleanup loop.

        Args:
            bot: the bot instance this cog is attached to.
        """
        self.bot = bot
        # Create tables for soaking easter egg (kept separate from custom commands)
        db.create_tables([Soaks, Roles])
        db.close()
        self.daily_check.start()

    def cog_unload(self):
        """Cancel the background cleanup loop when the cog is unloaded.

        Without this, reloading the extension leaves the previous loop running
        next to the one started by the new cog instance.
        """
        self.daily_check.cancel()
@tasks.loop(hours=20)
async def daily_check(self):
    """Retire custom commands that have gone unused and warn about ageing ones.

    Runs every 20 hours and only acts when the guild named by the ``GUILD_ID``
    environment variable is guild 613880856032968834.

    * Commands last used more than 90 days ago are marked ``active: False``.
    * Commands last used more than 60 days ago that have no ``warning_sent``
      flag are marked ``warning_sent: True``.
    * Creators who are still guild members receive one DM per category listing
      the affected commands; if the full DM cannot be sent, a names-only
      version is tried.

    A command whose ``last_used`` is missing or unparseable is treated as used
    just now, so it is never touched by this run. All failures are logged and
    swallowed so that the loop keeps running.
    """
    # Must be imported before its first use. The original imported it only
    # inside the warning branch, which made the name function-local and caused
    # UnboundLocalError in the deletion branch.
    from db_calls import update_custom_command

    try:
        guild_id = int(os.environ.get('GUILD_ID'))
        guild = self.bot.get_guild(guild_id)
        if not guild:
            # The guild may not be available yet; wait once and retry.
            await asyncio.sleep(15)
            guild = self.bot.get_guild(guild_id)
            if not guild:
                logger.error(f'Fun cog could not access guild')
                return
    except Exception as e:
        logger.error(f'Could not run daily_check: {e}')
        return

    if guild.id != 613880856032968834:
        logger.info(f'Not checking CCs outside of SBA server')
        return

    def queue_notice(notifs, cmd):
        """Record cmd under its creator in notifs if the creator is still a guild member."""
        creator_discord_id = cmd['creator']['discord_id'] if cmd.get('creator') else None
        if not creator_discord_id:
            return
        owner = guild.get_member(creator_discord_id)
        if not owner:
            return
        entry = notifs.setdefault(owner.id, {'member': owner, 'commands': []})
        entry['commands'].append((cmd['name'], cmd['content']))

    async def send_notices(notifs, build_header, kind):
        """DM every queued member; fall back to a names-only message on failure."""
        for entry in notifs.values():
            member = entry['member']
            header = build_header(len(entry['commands']) > 1)
            msg_content = header + ''.join(
                f'`!cc {cc_name}` - {cc_content}\n' for cc_name, cc_content in entry['commands']
            )
            short_msg_content = header + ''.join(
                f'`!cc {cc_name}`\n' for cc_name, _ in entry['commands']
            )
            try:
                await member.send(msg_content)
            except Exception:
                logger.error(f'fun daily_check - could not send {kind} message to {member} '
                             f'/ trying short_msg')
                try:
                    await member.send(short_msg_content)
                except Exception:
                    logger.error(f'fun daily_check - still could not send {kind} message')

    try:
        # Get all commands to check for cleanup
        # NOTE(review): only the first page (up to 1000 commands) is inspected -
        # confirm the API can never hold more than that.
        all_commands_resp = await get_all_custom_commands(page=1, page_size=1000)
        if not all_commands_resp:
            logger.info('No custom commands found for cleanup check')
            return
        all_commands = all_commands_resp.get('custom_commands', [])

        # {discord_id: {'member': <discord member>, 'commands': [(name, content)]}}
        del_notifs = {}
        del_counter = 0
        warn_notifs = {}
        now = datetime.now()

        for cmd in all_commands:
            # Parse last_used datetime; anything missing or malformed counts as "just used"
            try:
                last_used = datetime.fromisoformat(cmd['last_used']) if cmd.get('last_used') else now
            except (TypeError, ValueError):
                last_used = now

            # Naive and aware datetimes cannot be subtracted, so compare against
            # "now" expressed in the timestamp's own timezone when it has one.
            reference = datetime.now(last_used.tzinfo) if last_used.tzinfo else now
            idle_for = reference - last_used

            # Final check / deleted (90+ days)
            if idle_for > timedelta(days=90):
                logger.warning(f'Deleting `!cc {cmd["name"]}`')
                queue_notice(del_notifs, cmd)
                await update_custom_command(cmd['id'], {'active': False})
                del_counter += 1
            # Warning (60+ days, not warned yet)
            elif idle_for > timedelta(days=60) and not cmd.get('warning_sent', False):
                logger.warning(f'Warning for `!cc {cmd["name"]}`')
                queue_notice(warn_notifs, cmd)
                # Mark warning as sent
                await update_custom_command(cmd['id'], {'warning_sent': True})

        logger.info(f'deletions: {del_notifs}\nwarnings: {warn_notifs}')

        # Send deletion notifications
        await send_notices(
            del_notifs,
            lambda plural: f'Yo, it\'s cleanup time. I am deleting the following custom '
                           f'command{"s" if plural else ""}:\n\n',
            'deletion'
        )

        # Send warning notifications
        await send_notices(
            warn_notifs,
            lambda plural: f'Heads up, the following custom '
                           f'command{"s" if plural else ""} will be deleted next month if '
                           f'{"they are" if plural else "it is"} not used:\n\n',
            'warn'
        )

        logger.info(f'Deleted {del_counter} commands; sent deletion notifications to {len(del_notifs)} users; '
                    f'sent warnings to {len(warn_notifs)} users')
    except Exception as e:
        logger.error(f'Error during daily_check: {e}', exc_info=True)
async def cog_command_error(self, ctx, error):
    """Log an error raised by one of this cog's commands and echo it to the channel.

    Args:
        ctx: invocation context of the failing command.
        error: the error that was raised.
    """
    logger.error(msg=error, stack_info=True, exc_info=True)
    reply = f'{error}\n\nRun !help <command_name> to see the command requirements'
    await ctx.send(reply)
async def slash_error(self, ctx, error):
    """Log an error and send its text, capped at 1600 characters, via ``ctx.send``.

    Args:
        ctx: object exposing an awaitable ``send``.
        error: the error to report.
    """
    logger.error(msg=error, stack_info=True, exc_info=True)
    # Exceptions are not sliceable; convert to text before truncating. Fall back
    # to the class name so an exception without a message never yields an empty send.
    text = str(error)[:1600] or type(error).__name__
    await ctx.send(text)
@commands.Cog.listener(name='on_message')
async def on_message_listener(self, message):
    """Soaking easter egg: report how long it has been since soaking was last mentioned.

    Ignores bots, direct messages, messages outside the ``GUILD_ID`` guild and
    messages starting with ``!``. Every remaining message containing "soak"
    (case-insensitive) is recorded in ``Soaks``; if an earlier mention exists,
    the elapsed time is posted to the channel, optionally followed by a gif.

    Args:
        message: the incoming Discord message.
    """
    # Direct messages have no guild; bail out before touching guild attributes.
    if message.author.bot or message.guild is None:
        return
    if message.guild.id != int(os.environ.get('GUILD_ID')) or message.content[:1] == '!':
        return

    # 'soaking' contains 'soak', so a single substring test covers both.
    if 'soak' not in message.content.lower():
        return

    try:
        # Read the previous mention before inserting the new one.
        last_soak = Soaks.select().order_by(-Soaks.id).first()
        Soaks.insert(
            {'user': message.author.id, 'message_id': message.id, 'timestamp': datetime.now()}
        ).execute()
    finally:
        db.close()

    if last_soak is None:
        return

    time_since = datetime.now() - last_soak.timestamp
    # timedelta.seconds excludes whole days, so use the full duration; clamp at
    # zero in case the stored timestamp is ahead of the current clock.
    elapsed = max(0, int(time_since.total_seconds()))

    gif_search = None
    if time_since.days >= 2:
        ts_string = f'{time_since.days} days'
        if time_since.days > 30:
            gif_search = 'elite'
        elif time_since.days > 14:
            gif_search = 'pretty good'
    elif elapsed >= 7200:
        ts_string = f'{elapsed // 3600} hours'
        gif_search = 'whats wrong with you'
    else:
        if elapsed >= 120:
            ts_string = f'{elapsed // 60} minutes'
        else:
            ts_string = f'{elapsed} seconds'
        # NOTE(review): indentation was lost in the reviewed source; this assumes
        # the 'pathetic' gif applies to every gap under two hours - confirm.
        gif_search = 'pathetic'

    await message.channel.send(
        f'It has been {ts_string} since soaking was mentioned.'
    )

    if gif_search is not None:
        # The gif is a bonus; a failure here must not break the listener.
        try:
            await message.channel.send(random_gif(gif_search))
        except Exception as e:
            logger.error(e)
@commands.command(name='cc', help='Run custom command')
async def custom_command(self, ctx, command):
    """Look up a custom command by (lower-cased) name and post its content.

    Sends a fallback gif when the lookup returns nothing or anything fails.

    Args:
        ctx: invocation context.
        command: name of the custom command to run.
    """
    # Kermit lost gif
    lost_gif = 'https://tenor.com/6saQ.gif'

    try:
        # Execute the command (updates usage stats automatically)
        found = await execute_custom_command(command.lower())

        if found:
            # Special easter egg for prettyrainbow command
            is_easter_egg = found['name'] == 'prettyrainbow' and ctx.author.id == 291738770313707521
            reply = random_no_phrase() if is_easter_egg else found['content']
            await ctx.send(reply)
        else:
            await ctx.send(lost_gif)
    except Exception as e:
        logger.error(f'Error executing custom command {command}: {e}')
        await ctx.send(lost_gif)
@commands.command(name='about', help='Who made the custom command')
async def about_command(self, ctx, command):
    """Post an embed with the creator, creation date and content of a custom command.

    Sends a fallback gif when the command is unknown or anything fails.

    Args:
        ctx: invocation context.
        command: name of the custom command to describe.
    """
    fallback_gif = 'https://tenor.com/blQnd.gif'

    try:
        info = await get_custom_command_by_name(command.lower())
        if not info:
            await ctx.send(fallback_gif)
            return

        if info.get('creator'):
            creator_name = info['creator']['username']
        else:
            creator_name = 'Unknown'
        created_at = info.get('created_at', 'Unknown')

        embed = discord.Embed(title=f'About {info["name"].title()}', color=0xFFFF00)
        fields = (
            (f'Creator', creator_name),
            ('Creation Date', created_at),
            ('Message', info['content']),
        )
        for field_name, field_value in fields:
            embed.add_field(name=field_name, value=field_value, inline=False)

        await ctx.send(content=None, embed=embed)
    except Exception as e:
        logger.error(f'Error getting command info: {e}')
        await ctx.send(fallback_gif)
@commands.command(name='newcc', help='Create a new custom command')
@commands.has_any_role(SBA_PLAYERS_ROLE_NAME, 'Paper Dynasty Players')
async def new_custom_command(self, ctx, name, *, message):
    """Create a custom command after showing a preview and asking for confirmation.

    Args:
        ctx: invocation context.
        name: command name; lower-cased and stripped before use.
        message: command content (rest of the invocation); stripped before use.
    """
    try:
        cc_name = name.lower().strip()
        cc_content = message.strip()

        # Check if command already exists
        if await get_custom_command_by_name(cc_name):
            await ctx.send('There is already a command with that name!')
            return

        # Show preview
        preview = discord.Embed(title='Is this what you want?', color=0x91329F)
        preview.add_field(name='Command Name', value=cc_name, inline=False)
        preview.add_field(name='Message', value=cc_content, inline=False)
        await ctx.send(content=None, embed=preview)

        # Only the invoking user may answer the confirmation prompt
        view = Confirm(responders=[ctx.author])
        question = await ctx.send('Should I create this for you?', view=view)
        await view.wait()

        if not view.value:
            await question.edit(content='You keep thinking on it.', view=None)
            return

        # Get or create creator
        author = ctx.author
        creator = await get_or_create_creator(
            discord_id=author.id,
            username=author.name,
            display_name=author.display_name
        )

        # Create command
        created = await create_custom_command({
            'name': cc_name,
            'content': cc_content,
            'creator_id': creator['id']
        })

        if created:
            outcome = f'`!cc {cc_name}` is now a thing!'
        else:
            outcome = "Hmm...I couldn't add that. I might need a grown up to help."
        await question.edit(content=outcome, view=None)

    except Exception as e:
        logger.error(f'Error creating custom command: {e}', exc_info=True)
        await ctx.send('Something went wrong creating that command. Try again or ask an admin for help.')
@commands.command(name='delcc', help='Delete a custom command')
@commands.has_any_role(SBA_PLAYERS_ROLE_NAME, 'Paper Dynasty Players')
async def delete_custom_command_cmd(self, ctx, name):
    """Delete a custom command after an ownership check and a confirmation prompt.

    Only the command's creator or the bot owner may delete it.

    Args:
        ctx: invocation context.
        name: command name; lower-cased and stripped before lookup.
    """
    try:
        target = await get_custom_command_by_name(name.lower().strip())
        if not target:
            await ctx.send("I couldn't find that command, sorry.")
            return

        # Check ownership
        # NOTE(review): assumes the API returns discord_id as an int comparable
        # to ctx.author.id - confirm.
        creator_discord_id = target['creator']['discord_id'] if target.get('creator') else None
        is_creator = creator_discord_id == ctx.author.id
        if not is_creator and ctx.author.id != self.bot.owner_id:
            await ctx.send("Looks like this isn't your command to delete.")
            return

        prompt = discord.Embed(title='Do you want to delete this command?', color=0x91329F)
        for label, key in (('Command Name', 'name'), ('Message', 'content')):
            prompt.add_field(name=label, value=target[key], inline=False)

        view = Confirm(responders=[ctx.author])
        question = await ctx.send(content=None, embed=prompt, view=view)
        await view.wait()

        if not view.value:
            await question.edit(content='It stays for now.', view=None)
            return

        deleted = await delete_custom_command(target['id'])
        if not deleted:
            await ctx.send("Welp. That didn't work. Go complain to an adult, I guess.")
            return

        # Drop the buttons from the prompt before confirming
        await question.edit(view=None)
        await ctx.send('He gone!')

    except Exception as e:
        logger.error(f'Error deleting custom command: {e}', exc_info=True)
        await ctx.send('Something went wrong deleting that command.')
@commands.command(name='allcc', help='Show all custom commands')
async def show_custom_commands(self, ctx, page=1):
    """Show all custom commands, 40 per page in two columns, with pagination buttons.

    Args:
        ctx: invocation context.
        page: page to open first; values past the last page jump to the last page.
    """
    try:
        def build_column(entries):
            """Render '**name** by creator' lines; stop at the first malformed entry."""
            text = ''
            for entry in entries:
                try:
                    author = entry['creator']['username'] if entry.get('creator') else 'Unknown'
                    text += f'**{entry["name"]}** by {author}\n'
                except Exception as e:
                    logger.error(f'Error building !allcc embed: {e}')
                    break
            return text

        def get_embed(this_page, result_data):
            """Build the two-column embed for one page of results."""
            this_embed = discord.Embed(title=f'All Custom Commands', color=0x2F939F)
            commands_list = result_data.get('custom_commands', [])

            # First 20 commands in first column
            column_one = build_column(commands_list[:20])
            if column_one:
                this_embed.add_field(
                    name=f'{(this_page - 1) * 40 + 1}-{min(this_page * 40 - 20, result_data["total_count"])}',
                    value=column_one
                )

            # Next 20 commands in second column
            column_two = build_column(commands_list[20:40])
            if column_two:
                this_embed.add_field(
                    name=f'{(this_page - 1) * 40 + 21}-{min(this_page * 40, result_data["total_count"])}',
                    value=column_two
                )

            return this_embed

        page_num = page
        result = await get_all_custom_commands(page=page_num, page_size=40)
        if not result:
            await ctx.send('No custom commands found!')
            return

        last_page = result.get('total_pages', 1)
        if page_num > last_page:
            await ctx.send(f'The max page number is {last_page}; going there now!')
            page_num = last_page
            result = await get_all_custom_commands(page=page_num, page_size=40)

        embed = get_embed(page_num, result)
        embed.description = f'Page {page_num} / {last_page}'
        view = Pagination(responders=[ctx.author])
        resp_message = await ctx.send(content=None, embed=embed, view=view)

        while True:
            await view.wait()
            choice = view.value

            if choice:
                logger.info(f'got a value: {choice}')

            # No answer or an explicit cancel: strip the buttons and stop
            if not choice or choice == 'cancel':
                await resp_message.edit(content=None, embed=embed, view=None)
                break

            # Page navigation wraps around at both ends
            if choice == 'left':
                page_num = page_num - 1 if page_num > 1 else last_page
            elif choice == 'right':
                page_num = page_num + 1 if page_num < last_page else 1
            view.value = None

            # Get new page data
            result = await get_all_custom_commands(page=page_num, page_size=40)
            embed = get_embed(page_num, result)
            embed.description = f'Page {page_num} / {last_page}'
            view = Pagination(responders=[ctx.author])
            await resp_message.edit(content=None, embed=embed, view=view)

    except Exception as e:
        logger.error(f'Error showing all custom commands: {e}', exc_info=True)
        await ctx.send('Something went wrong fetching the command list.')
@commands.command(name='mycc', aliases=['showcc'], help='Show my commands')
@commands.has_any_role(SBA_PLAYERS_ROLE_NAME, 'Paper Dynasty Players')
async def my_custom_commands(self, ctx):
    """List the names of the custom commands created by the invoking user.

    Requests the first page of up to 100 commands for the author.

    Args:
        ctx: invocation context.
    """
    try:
        result = await get_commands_by_creator(discord_id=ctx.author.id, page=1, page_size=100)

        has_commands = bool(result) and result.get('total_count', 0) != 0
        if not has_commands:
            await ctx.send('It doesn\'t look like you\'ve created any custom commands. Try it out by running '
                           '!help newcc for the command syntax!')
            return

        # One command name per line
        comm_message = ''.join(f'{cmd["name"]}\n' for cmd in result.get('custom_commands', []))

        embed = discord.Embed(title=f'{ctx.author.name}\'s Commands', color=0x2F939F)
        embed.add_field(name=f'Command Names', value=comm_message or 'None', inline=False)
        await ctx.send(content=None, embed=embed)

    except Exception as e:
        logger.error(f'Error showing user commands: {e}', exc_info=True)
        await ctx.send('Something went wrong fetching your commands.')
@commands.command(name='lastsoak', aliases=['ls'], help='Get a link to the last mention of soaking')
async def last_soak_command(self, ctx):
    """Post a jump link to the most recently recorded soaking mention.

    The message is fetched through ``ctx.fetch_message``; only the message ID
    is stored in ``Soaks``, so the lookup can fail even though a record
    exists. In that case the same "could not find" reply is sent instead of
    raising.

    Args:
        ctx: invocation context.
    """
    not_found_reply = f'I could not find the last mention of soaking.'

    # Finish with the database before awaiting Discord so the connection is
    # closed on every path, including when the fetch below fails.
    try:
        last_soak = Soaks.select().order_by(-Soaks.id).first()
    finally:
        db.close()

    if last_soak is None:
        await ctx.send(not_found_reply)
        return

    try:
        message = await ctx.fetch_message(last_soak.message_id)
    except discord.HTTPException as e:
        # NotFound and Forbidden are both HTTPException subclasses.
        logger.warning(f'lastsoak could not fetch message {last_soak.message_id}: {e}')
        await ctx.send(not_found_reply)
        return

    await ctx.send(f'The last mention of soaking was: {message.jump_url}')
@app_commands.command(name='woulditdong', description='Log a dinger to see would it dong across SBa')
@app_commands.checks.has_any_role(SBA_PLAYERS_ROLE_NAME)
async def would_it_dong_slash(
        self, interaction: discord.Interaction, batter_name: str, pitcher_name: str,
        day_or_night: Literal['day', 'night'] = 'night',
        result: Literal['no-doubt', 'bp-homerun', 'bp-flyout'] = 'bp-homerun', d20: int = None):
    """Build a "Would it Dong?" embed and post it to news-ticker and the interaction.

    The ballpark count is 16 for a no-doubt result and a random 1-15 otherwise;
    the projected distance is a fixed 369 ft.

    Args:
        interaction: the slash-command interaction.
        batter_name: batter shown in the embed title.
        pitcher_name: pitcher shown in the embed title.
        day_or_night: time of day shown in the embed.
        result: outcome category of the batted ball.
        d20: optional d20 roll; shown as "N/A" when omitted.
    """
    await interaction.response.defer()
    current = await get_current()
    # NOTE(review): team is used below without a None check - confirm that
    # get_team_by_owner always returns a team for users passing the role check.
    team = await get_team_by_owner(current.season, interaction.user.id)

    result_text = 'Fly Out' if result == 'bp-flyout' else 'Home Run'

    # Weeks 1-5 are spring, 6-16 summer, everything later fall
    if current.week < 6:
        season = 'spring'
    elif current.week < 17:
        season = 'summer'
    else:
        season = 'fall'

    # Check ballpark table for ballpark count
    hr_count = random.randint(1, 15) if result in ('bp-homerun', 'bp-flyout') else 16

    proj_distance = 369
    d20_text = d20 if d20 is not None else "N/A"

    dong_text = '\n'.join([
        f'Result: {result_text}',
        '',
        f'Season: {season.title()}',
        f'Time of Day: {day_or_night.title()}',
        f'D20: {d20_text}',
        f'Proj. dist: {proj_distance} ft',
        '',
        f'This would have been a home run in {hr_count}/16 SBa ballparks.',
    ])

    matchup = f'{batter_name.title()} vs {pitcher_name.title()}'
    embed = get_team_embed(matchup, team, thumbnail=False)
    embed.set_author(name='Would it Dong?', icon_url=team['thumbnail'])
    embed.add_field(name='** **', value=dong_text)

    await send_to_channel(self.bot, 'news-ticker', content=None, embed=embed)
    await interaction.edit_original_response(content=None, embed=embed)
async def setup(bot):
    """Extension entry point: create the Fun cog and add it to *bot*."""
    fun_cog = Fun(bot)
    await bot.add_cog(fun_cog)