257 lines
11 KiB
Python
257 lines
11 KiB
Python
import copy
|
|
|
|
from helpers import *
|
|
from db_calls import db_get, db_post, patch_player, get_player_by_name
|
|
|
|
import csv
|
|
import math
|
|
import pygsheets
|
|
from typing import Optional
|
|
|
|
from discord.ext import commands, tasks
|
|
from discord import app_commands
|
|
|
|
|
|
class Admins(commands.Cog):
    # Administrative commands, restricted to the bot owner (prefix commands)
    # or to members holding the 'Da Commish' role (slash commands).
    #
    # NOTE: documented with comments rather than a class docstring because
    # discord.py uses a cog's docstring as its user-visible description.

    def __init__(self, bot):
        """Keep a reference to the bot this cog is registered on."""
        self.bot = bot

    @commands.command(name='current', help='Current db info')
    @commands.is_owner()
    async def current_command(self, ctx):
        """Send every ``key: value`` pair of the 'current' db record to the invoking channel.

        Args:
            ctx: Invocation context; the reply is sent through ``ctx.send``.
        """
        current = await db_get('current')
        # One line per key, each terminated by a newline.
        current_string = ''.join(f'{key}: {current[key]}\n' for key in current)

        await ctx.send(current_string)

    @commands.command(name='blast', help='Megaphone')
    @commands.is_owner()
    async def blast_command(self, ctx, channel_name, *, message):
        """Relay ``message`` to the channel called ``channel_name`` and confirm with a gif.

        Args:
            ctx: Invocation context; the confirmation is sent through ``ctx.send``.
            channel_name: Name handed to ``send_to_channel`` to pick the target channel.
            message: Remainder of the command text, forwarded as-is.
        """
        await send_to_channel(self.bot, channel_name, message)
        await ctx.send(random_conf_gif())

    @app_commands.command(name='blast', description='Send a message')
    @app_commands.guilds(discord.Object(id=os.environ.get('GUILD_ID')))
    @app_commands.checks.has_any_role('Da Commish')
    async def blast_slash(
            self, interaction: discord.Interaction, channel: discord.TextChannel, msg_content: str = None,
            embed_title: str = None, embed_field_name: str = None, image_url: str = None, color_hex: str = None):
        # Post either an embed or a plain message to `channel`, then acknowledge
        # the interaction with a confirmation gif.
        #
        # * With `embed_title`: an embed is built (colour from `color_hex`, parsed
        #   as base-16, falling back to 0xa5fffc), given a season footer and
        #   `image_url`; `msg_content` becomes the value of a field named
        #   `embed_field_name` when that name is supplied.
        # * Without `embed_title`: `msg_content` is sent as plain text.
        #
        # Any failure is logged and reported back on the interaction.
        #
        # NOTE: kept as comments (not a docstring with a parameter section) so
        # discord.py does not pick up new slash-command parameter descriptions.
        current = await db_get('current')

        try:
            if embed_title:
                embed = discord.Embed(
                    title=embed_title,
                    color=int(color_hex, 16) if color_hex is not None else int('0xa5fffc', 16)
                )
                embed.set_footer(text=f'SBa Season {current["season"]}', icon_url=LOGO)
                embed.set_image(url=image_url)
                if embed_field_name:
                    # The name is known to be truthy here, so no placeholder fallback is needed.
                    embed.add_field(name=embed_field_name, value=msg_content)
                await channel.send(content=None, embed=embed)
            else:
                await channel.send(content=msg_content)

            # Acknowledge on BOTH paths; previously the plain-text path never
            # responded, leaving the interaction unanswered.
            await interaction.response.send_message(content=random_conf_gif())
        except Exception as e:
            logging.error(f'Error blasting a message: {type(e)}: {e}')
            error_text = f'Uh oh\n\n{type(e)}: {e}'
            # An interaction can only be responded to once; fall back to a
            # follow-up if the failure happened after the response was sent.
            if interaction.response.is_done():
                await interaction.followup.send(content=error_text)
            else:
                await interaction.response.send_message(content=error_text)

    # @commands.command(name='sendstats', help='all, batting, pitching')
    # @commands.is_owner()
    # async def send_stats_command(self, ctx, which='all'):
    #     trans_cog = self.bot.get_cog('Transactions')
    #     await trans_cog.send_stats_to_sheets(which=which)

    @commands.command(name='test', hidden=True)
    @commands.is_owner()
    async def test_command(self, ctx):
        # One-off migration: for every season 7 player not on the 'FA' team,
        # find the same-named season 8 player and patch it onto the season 8
        # team whose abbrev matches the old one (with a trailing 'MiL' or 'IL'
        # stripped). Players with no season 8 record are reported at the end.
        #
        # NOTE: comments instead of a docstring because this command has no
        # explicit help=, so a docstring would become its help text.

        # errors = []
        # with open('storage/s89-injuries.csv') as csv_file:
        #     csv_reader = csv.reader(csv_file, delimiter=';')
        #     for row in csv_reader:
        #         strat_code = row[0]
        #         inj_rating = row[1]
        #         logging.info(f'strat_code: {strat_code} / inj_rating: {inj_rating}')
        #         p_query = await db_get('players', params=[
        #             ('season', 8), ('strat_code', strat_code)
        #         ])
        #         if p_query['count'] == 0:
        #             logging.error(f'Could not find strat_code {strat_code}')
        #             errors.append(f'**{strat_code}** - {inj_rating} not found')
        #         else:
        #             player = p_query['players'][0]
        #             player['injury_rating'] = inj_rating
        #             logging.info(f'patching {player["name"]} ({player["strat_code"]}) with injury {inj_rating}')
        #             await patch_player(player)

        lost = []
        p_query = await db_get('players', params=[('season', 7)])
        for player in p_query['players']:
            old_team = player['team']
            if old_team['abbrev'] == 'FA':
                continue

            pl_query = await db_get('players', params=[('season', 8), ('name', player['name'])])
            if pl_query['count'] <= 0:
                logging.info(f'No season 8 version of {player["name"]} was found')
                lost.append(f'- {player["name"]} from {player["team"]["abbrev"]}\n')
                continue

            # Strip the 'MiL' / 'IL' suffix so the lookup targets the base abbrev.
            team_abbrev = old_team['abbrev']
            if team_abbrev.endswith('MiL'):
                team_abbrev = team_abbrev[:-3]
            elif team_abbrev.endswith('IL'):
                team_abbrev = team_abbrev[:-2]
            t_query = await db_get('teams', params=[
                ('season', 8), ('team_abbrev', team_abbrev)
            ])

            # Work on a copy so the query result itself is left untouched.
            new_player = copy.deepcopy(pl_query['players'][0])
            new_player['team'] = t_query['teams'][0]
            logging.info(f'Placing {new_player["name"]} on {new_player["team"]["sname"]}')
            await patch_player(new_player)

        await ctx.send('All done!')
        if lost:
            l_string = "".join(lost)
            await ctx.send(f'Here are the lost players:\n\n{l_string}')

    # @commands.command(name='sendmoves', help='Send moves to sheets')
    # @commands.is_owner()
    # async def send_moves_command(self, ctx):
    #     current = await db_get('current')
    #     await ctx.send('Authenticating with sheets...')
    #     sheets = pygsheets.authorize(service_file='storage/major-domo-service-creds.json')
    #     trans_tab = sheets.open_by_key(SBA_ROSTER_KEY).worksheet_by_title('Transactions')
    #     await ctx.send('Collecting transactions...')
    #     all_vals = []
    #     all_moves = await get_transactions(
    #         season=current['season'],
    #         timeout=30
    #     )
    #     await ctx.send(f'Processing transactions ({len(all_moves)} found)...')
    #     total_moves = len(all_moves)
    #
    #     counter = 0
    #     for move in [*all_moves.values()]:
    #         all_vals.insert(
    #             0,
    #             [
    #                 move['player']['name'],
    #                 move['oldteam']['sname'],
    #                 move['newteam']['sname'],
    #                 move['week'],
    #                 total_moves + 416 - counter
    #             ]
    #         )
    #         counter += 1
    #     logging.warning(f'all_vals samples:\n0: {all_vals[0]}\n100: {all_vals[100]}\n1000: {all_vals[1000]}\n'
    #                     f'2000: {all_vals[2000]}')
    #
    #     await ctx.send('Sending transactions to sheets...')
    #     try:
    #         trans_tab.update_values(
    #             crange=f'A420',
    #             values=all_vals
    #         )
    #         await ctx.send('All done!')
    #     except Exception as e:
    #         await ctx.send('Failed sending to sheets')

    # @commands.command(name='xpick', help='Expansion pick')
    # @commands.is_owner()
    # async def expansion_pick_command(self, ctx, team_abbrev, *, name):
    #     current = await db_get('current')
    #     player_cog = self.bot.get_cog('Players')
    #     player_name = await fuzzy_player_search(ctx, ctx.channel, self.bot, name, player_cog.player_list.keys())
    #     player = await get_one_player(player_name)
    #     team = await get_one_team(team_abbrev)
    #     old_team = copy.deepcopy(player["team"])
    #
    #     if not team:
    #         await ctx.send(f'Who the fuck is **{team_abbrev}**? Get your shit together - it\'s DRAFT TIME!!!')
    #         return
    #
    #     if old_team['id'] == 99:
    #         await ctx.send(f'Tell that bastard they\'re an idiot. {player["name"]} is a Free Agent.')
    #         return
    #
    #     await patch_player(player['id'], team_id=team['id'])
    #     await ctx.send(content=None, embed=await get_player_embed(await get_one_player(player['id']), current))
    #     await send_to_channel(
    #         self.bot,
    #         's4-draft-picks',
    #         f'Expansion Draft: {await get_emoji(ctx, team["sname"])}{team["sname"]} select **{player["name"]}** from '
    #         f'{await get_emoji(ctx, old_team["sname"])}{old_team["abbrev"]}'
    #     )

    # @commands.command(name='injimport')
    # @commands.is_owner()
    # async def injury_import_command(self, ctx):
    #     sheets = pygsheets.authorize(service_file='storage/major-domo-service-creds.json')
    #     inj_tab = sheets.open_by_key('1uKRf7YwTcEfp8D7gUutQRwnOHW37hl6XcBbtqw3PbN4').worksheet_by_title('Sheet1')
    #     raw_data = inj_tab.get_values('A1', 'B545')
    #
    #     for line in raw_data:
    #         player = await get_one_player(line[0])
    #         await patch_player(player['id'], pitcher_injury=line[1])

    @commands.command(name='setdemweek', help='Set player\'s demotion week')
    @commands.is_owner()
    async def set_dem_week_command(self, ctx, week_num, *, player_name):
        """Set ``demotion_week`` on the current-season player matching ``player_name``.

        Args:
            ctx: Invocation context; replies are sent through ``ctx.send``.
            week_num: Value stored in the player's ``demotion_week`` exactly as received.
            player_name: Free-text name, resolved through ``fuzzy_player_search``
                against the Players cog's ``player_list`` keys.
        """
        current = await db_get('current')
        player_cog = self.bot.get_cog('Players')
        player_name = await fuzzy_player_search(ctx, ctx.channel, self.bot, player_name, player_cog.player_list.keys())
        p_query = await db_get('players', params=[('season', current['season']), ('name', player_name)])

        # Guard against an empty result instead of failing with IndexError.
        if not p_query['players']:
            await ctx.send(f'Player {player_name} not found')
            return

        player = p_query['players'][0]
        player['demotion_week'] = week_num

        await patch_player(player)
        await ctx.send(random_conf_gif())

    @app_commands.command(name='set-keepers', description='Post final keepers')
    @app_commands.guilds(discord.Object(id=os.environ.get('GUILD_ID')))
    @app_commands.checks.has_any_role('Da Commish')
    async def keeper_slash(
            self, interaction: discord.Interaction, team_abbrev: str, keeper_1_name: str,
            keeper_2_name: Optional[str] = None, keeper_3_name: Optional[str] = None,
            keeper_4_name: Optional[str] = None, keeper_5_name: Optional[str] = None,
            keeper_6_name: Optional[str] = None, keeper_7_name: Optional[str] = None):
        # Post up to seven keepers for the team identified by `team_abbrev`.
        #
        # Each supplied keeper name is looked up for the current season and
        # turned into a {season, team_id, player_id} record; the records are
        # posted to 'keepers' in one payload and the db response is echoed back
        # on the (deferred) interaction.
        #
        # NOTE: kept as comments (not a docstring with a parameter section) so
        # discord.py does not pick up new slash-command parameter descriptions.
        await interaction.response.defer()
        current = await db_get('current')
        this_team = await get_team_by_abbrev(team_abbrev, current['season'])
        if this_team is None:
            await interaction.edit_original_response(content=f'Team {team_abbrev} not found')
            # Stop here: continuing would subscript None when building the payload.
            return

        keeper_names = [
            keeper_1_name, keeper_2_name, keeper_3_name, keeper_4_name,
            keeper_5_name, keeper_6_name, keeper_7_name,
        ]

        k_list = []
        for keeper_name in keeper_names:
            if keeper_name is None:
                continue

            this_player = await get_player_by_name(current['season'], keeper_name)
            # NOTE(review): assumes get_player_by_name returns None on a miss - confirm.
            # Abort before posting so a partial keeper list is never saved.
            if this_player is None:
                await interaction.edit_original_response(content=f'Player {keeper_name} not found')
                return

            k_list.append({
                'season': current['season'],
                'team_id': this_team['id'],
                'player_id': this_player['id']
            })

        resp = await db_post('keepers', payload={
            'count': len(k_list),
            'keepers': k_list
        })
        await interaction.edit_original_response(content=resp)
|
|
|
|
|
|
async def setup(bot):
    """Extension entry point: build the ``Admins`` cog and add it to ``bot``."""
    admins_cog = Admins(bot)
    await bot.add_cog(admins_cog)
|