565 lines
23 KiB
Python
565 lines
23 KiB
Python
import copy
|
|
import math
|
|
|
|
from helpers import *
|
|
from peewee import *
|
|
import discord
|
|
from discord import app_commands
|
|
from datetime import datetime, timedelta
|
|
from discord.ext import commands, tasks
|
|
from typing import Literal
|
|
|
|
db = SqliteDatabase(
|
|
'storage/sba_is_fun.db',
|
|
pragmas={
|
|
'journal_mode': 'wal',
|
|
'cache_size': -1 * 64000,
|
|
'synchronous': 0
|
|
}
|
|
)
|
|
|
|
|
|
class Creator(Model):
|
|
name = CharField()
|
|
discordid = IntegerField()
|
|
|
|
class Meta:
|
|
database = db
|
|
|
|
|
|
class Command(Model):
|
|
name = CharField()
|
|
message = CharField()
|
|
creator = ForeignKeyField(Creator)
|
|
createtime = DateTimeField()
|
|
last_used = DateTimeField()
|
|
sent_warns = IntegerField(default=0)
|
|
|
|
class Meta:
|
|
database = db
|
|
|
|
|
|
class Roles(Model):
|
|
name = CharField(unique=True)
|
|
enabled = BooleanField(default=True)
|
|
|
|
class Meta:
|
|
database = db
|
|
|
|
|
|
class Soaks(Model):
|
|
user = IntegerField()
|
|
message_id = IntegerField()
|
|
timestamp = DateTimeField()
|
|
|
|
class Meta:
|
|
database = db
|
|
|
|
|
|
class Fun(commands.Cog):
|
|
def __init__(self, bot):
|
|
self.bot = bot
|
|
|
|
db.create_tables([Creator, Command, Roles, Soaks])
|
|
db.close()
|
|
self.daily_check.start()
|
|
|
|
@tasks.loop(hours=20)
|
|
async def daily_check(self):
|
|
try:
|
|
# logging.info(f'trying to start cc check')
|
|
guild = self.bot.get_guild(int(os.environ.get('GUILD_ID')))
|
|
if not guild:
|
|
# logging.info(f'no guild found for cc check')
|
|
await asyncio.sleep(15)
|
|
guild = self.bot.get_guild(int(os.environ.get('GUILD_ID')))
|
|
if not guild:
|
|
logging.error(f'Fun cog could not access guild')
|
|
return
|
|
except Exception as e:
|
|
logging.error(f'Could not run daily_check: {e}')
|
|
return
|
|
|
|
if guild.id != 613880856032968834:
|
|
logging.info(f'Not checking CCs outside of SBa server')
|
|
return
|
|
# <discordid> = {'member': <discord member>, 'commands': [(<command.name>, <command.message>)]}
|
|
del_notifs = {}
|
|
del_counter = 0
|
|
# <discordid> = {'member': <discord member>, 'commands': [(<command.name>, <command.message>)]}
|
|
warn_notifs = {}
|
|
now = datetime.now()
|
|
for x in Command.select():
|
|
# Final check / deleted
|
|
if x.last_used + timedelta(days=90) < now:
|
|
logging.warning(f'Deleting `!cc {x.name}`')
|
|
owner = guild.get_member(x.creator.discordid)
|
|
if owner:
|
|
if owner.id not in del_notifs:
|
|
del_notifs[owner.id] = {'member': owner, 'commands': [(x.name, x.message)]}
|
|
else:
|
|
del_notifs[owner.id]['commands'].append((x.name, x.message))
|
|
|
|
x.delete_instance()
|
|
del_counter += 1
|
|
|
|
elif x.last_used + timedelta(days=60) < now and (x.sent_warns is None or x.sent_warns == 0):
|
|
logging.warning(f'Warning for `!cc {x.name}`')
|
|
x.sent_warns = 1
|
|
x.save()
|
|
owner = guild.get_member(x.creator.discordid)
|
|
if owner:
|
|
if owner.id not in warn_notifs:
|
|
warn_notifs[owner.id] = {'member': owner, 'commands': [(x.name, x.message)]}
|
|
else:
|
|
warn_notifs[owner.id]['commands'].append((x.name, x.message))
|
|
|
|
# else:
|
|
# logging.warning(
|
|
# f'Command <!cc {x.name}> last used {x.last_used} / delta: {now - x.last_used} \n/>60 days: '
|
|
# f'{x.last_used + timedelta(days=60) < now} / sent_warns: {x.sent_warns}'
|
|
# )
|
|
|
|
db.close()
|
|
logging.info(f'deletions: {del_notifs}\nwarnings: {warn_notifs}')
|
|
|
|
for member in del_notifs:
|
|
plural = len(del_notifs[member]["commands"]) > 1
|
|
msg_content = f'Yo, it\'s cleanup time. I am deleting the following custom ' \
|
|
f'command{"s" if plural else ""}:\n\n'
|
|
short_msg_content = copy.deepcopy(msg_content)
|
|
for x in del_notifs[member]["commands"]:
|
|
msg_content += f'`!cc {x[0]}` - {x[1]}\n'
|
|
short_msg_content += f'`!cc {x[0]}`\n'
|
|
|
|
try:
|
|
await del_notifs[member]['member'].send(msg_content)
|
|
except Exception as e:
|
|
logging.error(f'fun daily_check - could not send deletion message to {del_notifs[member]["member"]} '
|
|
f'/ trying short_msg')
|
|
try:
|
|
await del_notifs[member]['member'].send(short_msg_content)
|
|
except Exception as e:
|
|
logging.error(f'fun daily_check - still could not send deletion message')
|
|
|
|
for member in warn_notifs:
|
|
plural = len(warn_notifs[member]["commands"]) > 1
|
|
msg_content = f'Heads up, the following custom ' \
|
|
f'command{"s" if plural else ""} will be deleted next month if ' \
|
|
f'{"they are" if plural else "it is"} not used:\n\n'
|
|
short_msg_content = copy.deepcopy(msg_content)
|
|
for x in warn_notifs[member]["commands"]:
|
|
msg_content += f'`!cc {x[0]}` - {x[1]}\n'
|
|
short_msg_content += f'`!cc {x[0]}`\n'
|
|
|
|
try:
|
|
await warn_notifs[member]['member'].send(msg_content)
|
|
except Exception as e:
|
|
logging.error(f'fun daily_check - could not send warn message to {warn_notifs[member]["member"]} '
|
|
f'/ trying short_msg')
|
|
try:
|
|
await warn_notifs[member]['member'].send(short_msg_content)
|
|
except Exception as e:
|
|
logging.error(f'fun daily_check - still could not send warn message')
|
|
|
|
logging.info(f'Deleted {del_counter} commands; sent deletion notifications to {len(del_notifs)} users; '
|
|
f'sent warnings to {len(warn_notifs)} users')
|
|
|
|
# async def cog_command_error(self, ctx, error):
|
|
# await ctx.send(f'{error}')
|
|
|
|
@commands.Cog.listener(name='on_message')
|
|
async def on_message_listener(self, message):
|
|
if message.author.bot or message.channel.guild.id != int(os.environ.get('GUILD_ID')) \
|
|
or message.content[:1] == '!':
|
|
return
|
|
|
|
tm = message.content.lower()
|
|
if 'soak' in tm or 'soaking' in tm:
|
|
squery = Soaks.select().order_by(-Soaks.id).limit(1)
|
|
if squery.count() > 0:
|
|
last_soak = squery[0]
|
|
else:
|
|
last_soak = None
|
|
new_soak = Soaks.insert(
|
|
{'user': message.author.id, 'message_id': message.id, 'timestamp': datetime.now()}
|
|
).execute()
|
|
db.close()
|
|
|
|
time_since = datetime.now() - last_soak.timestamp
|
|
# logging.info(f'time_since: {time_since} / seconds: {time_since.seconds} / days: {time_since.days}')
|
|
gif_search = None
|
|
if time_since.days >= 2:
|
|
ts_string = f'{time_since.days} days'
|
|
if time_since.days > 30:
|
|
gif_search = 'elite'
|
|
elif time_since.days > 14:
|
|
gif_search = 'pretty good'
|
|
else:
|
|
if time_since.seconds >= 7200:
|
|
ts_string = f'{time_since.seconds // 3600} hours'
|
|
gif_search = 'whats wrong with you'
|
|
else:
|
|
if time_since.seconds >= 120:
|
|
ts_string = f'{time_since.seconds // 60} minutes'
|
|
else:
|
|
ts_string = f'{time_since.seconds} seconds'
|
|
gif_search = 'pathetic'
|
|
|
|
await message.channel.send(
|
|
f'It has been {ts_string} since soaking was mentioned.'
|
|
)
|
|
|
|
if gif_search is not None:
|
|
try:
|
|
await message.channel.send(random_gif(gif_search))
|
|
except Exception as e:
|
|
logging.error(e)
|
|
|
|
@commands.command(name='lastsoak', aliases=['ls'], help='Get a link to the last mention of soaking')
|
|
async def last_soak_command(self, ctx):
|
|
squery = Soaks.select().order_by(-Soaks.id).limit(1)
|
|
if squery.count() > 0:
|
|
last_soak = squery[0]
|
|
else:
|
|
await ctx.send(f'I could not find the last mention of soaking.')
|
|
return
|
|
|
|
message = await ctx.fetch_message(last_soak.message_id)
|
|
await ctx.send(f'The last mention of soaking was: {message.jump_url}')
|
|
|
|
@commands.command(name='cc', help='Run custom custom command')
|
|
async def custom_command(self, ctx, command):
|
|
chosen = Command.get_or_none(fn.Lower(Command.name) == command.lower())
|
|
if not chosen:
|
|
# Error gif
|
|
# await ctx.send('https://tenor.com/blQnd.gif')
|
|
|
|
# Schitt's Creek 'what's that' gif
|
|
# await ctx.send('https://media.giphy.com/media/l0HUhFZx6q0hsPtHq/giphy.gif')
|
|
|
|
# Kermit lost gif
|
|
await ctx.send('https://tenor.com/6saQ.gif')
|
|
else:
|
|
if chosen.name == 'prettyrainbow' and ctx.author.id == 291738770313707521:
|
|
await ctx.send(random_no_phrase())
|
|
return
|
|
await ctx.send(chosen.message)
|
|
chosen.last_used = datetime.now()
|
|
chosen.sent_warns = 0
|
|
chosen.save()
|
|
|
|
db.close()
|
|
|
|
@commands.command(name='about', help='Who made the custom command')
|
|
async def about_command(self, ctx, command):
|
|
chosen = Command.get_or_none(fn.Lower(Command.name) == command.lower())
|
|
if not chosen:
|
|
await ctx.send('https://tenor.com/blQnd.gif')
|
|
|
|
embed = discord.Embed(title=f'About {chosen.name.title()}', color=0xFFFF00)
|
|
embed.add_field(name=f'Creator', value=f'{chosen.creator.name}', inline=False)
|
|
embed.add_field(name='Creation Date', value=f'{chosen.createtime}', inline=False)
|
|
embed.add_field(name='Message', value=f'{chosen.message}', inline=False)
|
|
|
|
await ctx.send(content=None, embed=embed)
|
|
db.close()
|
|
|
|
@commands.command(name='newcc', help='Create a new custom command')
|
|
@commands.has_any_role(SBA_PLAYERS_ROLE_NAME, 'Paper Dynasty Players')
|
|
async def new_custom_command(self, ctx, name, *, message):
|
|
time = datetime.now()
|
|
command = name
|
|
comm_message = message
|
|
|
|
chosen = Command.get_or_none(fn.Lower(Command.name) == command.lower())
|
|
if chosen:
|
|
await ctx.send('There is already a command with that name!')
|
|
return
|
|
|
|
embed = discord.Embed(title='Is this what you want?', color=0x91329F)
|
|
embed.add_field(name='Command Name', value=command, inline=False)
|
|
embed.add_field(name='Message', value=comm_message, inline=False)
|
|
|
|
await ctx.send(content=None, embed=embed)
|
|
|
|
view = Confirm(responders=[ctx.author])
|
|
question = await ctx.send('Should I create this for you?', view=view)
|
|
await view.wait()
|
|
|
|
if not view.value:
|
|
await question.edit(content='You keep thinking on it.', view=None)
|
|
return
|
|
|
|
this_person = Creator.get_or_none(Creator.discordid == ctx.author.id)
|
|
if not this_person:
|
|
this_person = Creator(name=f'{ctx.author.name}', discordid=f'{ctx.author.id}')
|
|
this_person.save()
|
|
|
|
this_command = Command(name=command, message=comm_message, createtime=time, creator=this_person, last_used=time)
|
|
if this_command.save() == 1:
|
|
await question.edit(content=f'`!cc {this_command.name}` is now a thing!', view=None)
|
|
else:
|
|
await question.edit(content='Hmm...I couldn\'t add that. I might need a grown up to help.', view=None)
|
|
|
|
db.close()
|
|
|
|
@commands.command(name='delcc', help='Delete a custom command')
|
|
@commands.has_any_role(SBA_PLAYERS_ROLE_NAME, 'Paper Dynasty Players')
|
|
async def delete_custom_command(self, ctx, name):
|
|
this_command = Command.get_or_none(fn.Lower(Command.name) == name.lower())
|
|
if not this_command:
|
|
await ctx.send('I couldn\'t find that command, sorry.')
|
|
return
|
|
|
|
if this_command.creator.discordid != ctx.author.id and ctx.author.id != self.bot.owner_id:
|
|
await ctx.send('Looks like this isn\'t your command to delete.')
|
|
return
|
|
|
|
embed = discord.Embed(title='Do you want to delete this command?', color=0x91329F)
|
|
embed.add_field(name='Command Name', value=this_command.name, inline=False)
|
|
embed.add_field(name='Message', value=this_command.message, inline=False)
|
|
|
|
view = Confirm(responders=[ctx.author])
|
|
question = await ctx.send(content=None, embed=embed, view=view)
|
|
await view.wait()
|
|
|
|
if not view.value:
|
|
await question.edit(content='It stays for now.', view=None)
|
|
return
|
|
|
|
if this_command.delete_instance() == 1:
|
|
await question.edit(view=None)
|
|
await ctx.send('He gone!')
|
|
else:
|
|
await ctx.send('Welp. That didn\'t work. Go complain to an adult, I guess.')
|
|
|
|
db.close()
|
|
|
|
@commands.command(name='allcc', help='Show all custom commands')
|
|
async def show_custom_commands(self, ctx, page=1):
|
|
def get_embed(this_page):
|
|
this_embed = discord.Embed(title=f'All Custom Commands', color=0x2F939F)
|
|
column_one = ''
|
|
column_two = ''
|
|
all_commands = Command.select().paginate(this_page, 40).order_by(Command.name)
|
|
for x in range(20):
|
|
try:
|
|
column_one += f'**{all_commands[x].name}** by {all_commands[x].creator.name}\n'
|
|
except Exception as e:
|
|
logging.error(f'Error building !allcc embed: {e}')
|
|
break
|
|
this_embed.add_field(name=f'{(this_page - 1) * 40 + 1}-{this_page * 40 - 20}', value=column_one)
|
|
|
|
for x in range(20, 40):
|
|
try:
|
|
column_two += f'**{all_commands[x].name}** by {all_commands[x].creator.name}\n'
|
|
except Exception as e:
|
|
logging.error(f'Error building !allcc embed: {e}')
|
|
break
|
|
if len(column_two) > 0:
|
|
this_embed.add_field(name=f'{(this_page - 1) * 40 + 21}-{this_page * 40}', value=column_two)
|
|
|
|
return this_embed
|
|
|
|
page_num = page
|
|
total_commands = Command.select(Command.id)
|
|
last_page = math.ceil(total_commands.count()/40)
|
|
|
|
if page_num > last_page:
|
|
await ctx.send(f'The max page number is {last_page}; going there now!')
|
|
page_num = last_page
|
|
|
|
embed = get_embed(page_num)
|
|
embed.description = f'Page {page_num} / {last_page}'
|
|
view = Pagination(responders=[ctx.author])
|
|
resp_message = await ctx.send(content=None, embed=embed, view=view)
|
|
|
|
while True:
|
|
await view.wait()
|
|
|
|
if view.value:
|
|
logging.info(f'got a value: {view.value}')
|
|
if view.value == 'left':
|
|
page_num = page_num - 1 if page_num > 1 else last_page
|
|
elif view.value == 'right':
|
|
page_num = page_num + 1 if page_num <= last_page else 1
|
|
elif view.value == 'cancel':
|
|
await resp_message.edit(content=None, embed=embed, view=None)
|
|
break
|
|
view.value = None
|
|
else:
|
|
await resp_message.edit(content=None, embed=embed, view=None)
|
|
break
|
|
|
|
# await resp_message.edit(content=None, embed=embed, view=None)
|
|
embed = get_embed(page_num)
|
|
embed.description = f'Page {page_num} / {last_page}'
|
|
view = Pagination(responders=[ctx.author])
|
|
await resp_message.edit(content=None, embed=embed, view=view)
|
|
|
|
db.close()
|
|
|
|
@commands.command(name='mycc', aliases=['showcc'], help='Show my commands')
|
|
@commands.has_any_role(SBA_PLAYERS_ROLE_NAME, 'Paper Dynasty Players')
|
|
async def my_custom_commands(self, ctx):
|
|
this_creator = Creator.get_or_none(Creator.discordid == ctx.author.id)
|
|
|
|
if not this_creator:
|
|
await ctx.send('It doesn\'t look like you\'ve created any custom commands. Try it out by running the '
|
|
'!help newcc for the command syntax!')
|
|
return
|
|
|
|
all_commands = Command.select().join(Creator).where(Command.creator == this_creator)
|
|
|
|
if all_commands.count() == 0:
|
|
await ctx.send('It doesn\'t look like you\'ve created any custom commands. Try it out by running the '
|
|
'!help newcc for the command syntax!')
|
|
return
|
|
|
|
comm_message = ''
|
|
for x in all_commands:
|
|
comm_message += f'{x.name}\n'
|
|
|
|
embed = discord.Embed(title=f'{ctx.author.name}\'s Commands', color=0x2F939F)
|
|
embed.add_field(name=f'Command Names', value=comm_message, inline=False)
|
|
|
|
await ctx.send(content=None, embed=embed)
|
|
|
|
db.close()
|
|
|
|
@app_commands.command(name='woulditdong', description='Log a dinger to see would it dong across SBa')
|
|
@app_commands.checks.has_any_role(SBA_PLAYERS_ROLE_NAME)
|
|
async def would_it_dong_slash(
|
|
self, interaction: discord.Interaction, batter_name: str, pitcher_name: str,
|
|
day_or_night: Literal['day', 'night'] = 'night',
|
|
result: Literal['no-doubt', 'bp-homerun', 'bp-flyout'] = 'bp-homerun', d20: int = None):
|
|
await interaction.response.defer()
|
|
current = await db_get('current')
|
|
team = await get_team_by_owner(current['season'], interaction.user.id)
|
|
|
|
result_text = 'Home Run'
|
|
if result == 'bp-flyout':
|
|
result_text = 'Fly Out'
|
|
|
|
season = 'fall'
|
|
if current['week'] < 6:
|
|
season = 'spring'
|
|
elif current['week'] < 17:
|
|
season = 'summer'
|
|
|
|
hr_count = 16
|
|
if result in ['bp-homerun', 'bp-flyout']:
|
|
# Check ballpark table for ballpark count
|
|
hr_count = random.randint(1, 15)
|
|
|
|
proj_distance = 369
|
|
|
|
dong_text = f'Result: {result_text}\n\n' \
|
|
f'Season: {season.title()}\n' \
|
|
f'Time of Day: {day_or_night.title()}\n' \
|
|
f'D20: {d20 if d20 is not None else "N/A"}\n' \
|
|
f'Proj. dist: {proj_distance} ft\n\n' \
|
|
f'This would have been a home run in {hr_count}/16 SBa ballparks.'
|
|
embed = get_team_embed(f'{batter_name.title()} vs {pitcher_name.title()}', team, thumbnail=False)
|
|
embed.set_author(name='Would it Dong?', icon_url=team['thumbnail'])
|
|
embed.add_field(name='** **', value=dong_text)
|
|
await send_to_channel(self.bot, 'news-ticker', content=None, embed=embed)
|
|
await interaction.edit_original_response(content=None, embed=embed)
|
|
|
|
# @commands.command(name='showcc', help='Show one person\'s custom commands')
|
|
# @commands.has_any_role(SBA_PLAYERS_ROLE_NAME, 'Paper Dynasty Players')
|
|
# async def show_cc_command(self, ctx, ):
|
|
|
|
# @commands.command(name='role', help='Toggle role')
|
|
# async def toggle_role_command(self, ctx, *, role_name):
|
|
# all_roles = [x.name for x in Roles.select().where(Roles.enabled)]
|
|
#
|
|
# async def toggle_role(full_role):
|
|
# if full_role in ctx.author.roles:
|
|
# await ctx.author.remove_roles(full_role)
|
|
# else:
|
|
# await ctx.author.add_roles(full_role)
|
|
#
|
|
# if len(role_name) < 4:
|
|
# await ctx.send('https://thumbs.gfycat.com/FrayedUnequaledGnat-size_restricted.gif')
|
|
# await ctx.send(f'What even is **{role_name}**...')
|
|
# db.close()
|
|
# return
|
|
#
|
|
# for name in all_roles:
|
|
# if role_name.lower() in name.lower():
|
|
# try:
|
|
# this_role = discord.utils.get(ctx.guild.roles, name=name)
|
|
# await toggle_role(this_role)
|
|
# await ctx.send(random_conf_gif())
|
|
# return
|
|
# except:
|
|
# await ctx.send(await get_emoji(ctx, 'fforrespect', False))
|
|
# await ctx.send('I was not able to assign that role.')
|
|
# return
|
|
#
|
|
# await ctx.send(f'That doesn\'t sound familiar. **{role_name}**...did you make that shit up?')
|
|
|
|
# @commands.command(name='showroles', help='Show toggleable roles')
|
|
# async def show_roles_command(self, ctx):
|
|
# all_roles = [x.name for x in Roles.select().where(Roles.enabled)]
|
|
# role_string = '\n- '.join(all_roles)
|
|
#
|
|
# embed = get_team_embed('Toggleable Roles', thumbnail=False)
|
|
# embed.description = 'Run !role <role_name> to toggle the role on or off'
|
|
# embed.add_field(name='Role Names', value=f'- {role_string}')
|
|
#
|
|
# await ctx.send(content=None, embed=embed)
|
|
|
|
# @commands.command(name='newrole', aliases=['removerole'], help='Make toggleable role')
|
|
# @commands.is_owner()
|
|
# async def make_toggleable_role_command(self, ctx, *, role_name):
|
|
# this_role = Roles.get_or_none(Roles.name == role_name)
|
|
#
|
|
# if not this_role:
|
|
# # Create the role if it doesn't exist
|
|
#
|
|
# this_role = Roles(name=role_name)
|
|
# this_role.save()
|
|
# if not discord.utils.get(ctx.guild.roles, name=this_role.name):
|
|
# await ctx.guild.create_role(name=f'{role_name}', mentionable=True)
|
|
# else:
|
|
# # Disable the role
|
|
#
|
|
# if this_role.enabled:
|
|
# this_role.enabled = False
|
|
# else:
|
|
# this_role.enabled = True
|
|
# this_role.save()
|
|
# this_role = discord.utils.get(ctx.guild.roles, name=this_role.name)
|
|
#
|
|
# if this_role:
|
|
# await this_role.edit(mentionable=False)
|
|
# else:
|
|
# await ctx.send('That role doesn\'t exist in the server.')
|
|
#
|
|
# await ctx.send(random_conf_gif())
|
|
|
|
# @commands.command(name='bulkrole', hidden=True)
|
|
# @commands.is_owner()
|
|
# async def bulkrole_command(self, ctx, *roles):
|
|
# all_roles = []
|
|
#
|
|
# for x in roles:
|
|
# all_roles.append(discord.utils.get(ctx.guild.roles, name=x))
|
|
#
|
|
# await ctx.send('On it. This could take a bit.')
|
|
# time_start = datetime.now()
|
|
#
|
|
# async for member in ctx.guild.fetch_members():
|
|
# logging.warning(f'member: {member}')
|
|
# await member.add_roles(*all_roles)
|
|
#
|
|
# time_end = datetime.now()
|
|
# await ctx.send(f'All done! That took {time_end - time_start}')
|
|
|
|
|
|
async def setup(bot):
|
|
await bot.add_cog(Fun(bot))
|