paper-dynasty-discord/helpers.py
Cal Corum 943dcc9b74 CLAUDE: Add get_context_user() helper for hybrid command compatibility
Created get_context_user() helper function to safely extract the user from
either Context or Interaction objects. This prevents AttributeError issues
when hybrid commands are invoked as slash commands.

Hybrid commands receive commands.Context (with .author) when invoked with
prefix commands, but discord.Interaction (with .user) when invoked as slash
commands. The helper function handles both cases transparently.

Updated all affected hybrid commands:
- /branding-pd (cogs/players.py, cogs/players_new/team_management.py)
- /pullroster (cogs/players.py, cogs/players_new/team_management.py)
- /newsheet (cogs/economy_new/team_setup.py)
- /lastpack (cogs/economy_new/packs.py)

This follows the same pattern as the owner_only() fix and provides a
consistent, maintainable solution for all hybrid commands.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-10 09:07:09 -06:00

1920 lines
72 KiB
Python

import asyncio
import datetime
import logging
import math
import os
import random
import traceback
import discord
import pygsheets
import requests
from discord.ext import commands
from api_calls import *
from bs4 import BeautifulSoup
from difflib import get_close_matches
from dataclasses import dataclass
from typing import Optional, Literal, Union, List
from exceptions import log_exception
from in_game.gameplay_models import Team
from constants import *
from discord_ui import *
from random_content import *
from utils import position_name_to_abbrev, user_has_role, get_roster_sheet_legacy, get_roster_sheet, get_player_url, owner_only, get_cal_user, get_context_user
from search_utils import *
from discord_utils import *
async def get_player_photo(player):
    """Look up a player's thumbnail on TheSportsDB and cache it as their headshot.

    The search term is the player's ``bbref_id`` when it is truthy, otherwise
    their ``p_name``. When the first search hit is a baseball player with a
    thumbnail, that URL is stored on the player record through ``db_patch``
    (``headshot`` param) and returned.

    Args:
        player: Player dict; reads ``bbref_id``, ``p_name`` and ``player_id``.

    Returns:
        The thumbnail URL, or ``None`` when the request fails, the response is
        not usable JSON, or the first hit is not a baseball player with a thumbnail.
    """
    search_term = player['bbref_id'] if player['bbref_id'] else player['p_name']
    req_url = 'https://www.thesportsdb.com/api/v1/json/1/searchplayers.php'

    def _fetch():
        # Passing the term via `params` lets requests URL-encode names that
        # contain spaces, accents, '&' or '#'.
        return requests.get(req_url, params={'p': search_term}, timeout=.5)

    try:
        # requests is blocking; run it on the default executor so the event
        # loop is not stalled while waiting on the network.
        resp = await asyncio.get_running_loop().run_in_executor(None, _fetch)
    except Exception:
        # Best-effort lookup: any transport failure simply means "no photo".
        return None

    if resp.status_code != 200:
        return None

    try:
        payload = resp.json()  # parse once instead of once per field access
    except ValueError:
        return None

    matches = payload.get('player') if isinstance(payload, dict) else None
    if not matches:
        return None

    best_match = matches[0]
    if best_match.get('strSport') != 'Baseball':
        return None

    thumb = best_match.get('strThumb')
    if not thumb:
        # Nothing worth caching; avoid overwriting the headshot with an empty value.
        return None

    await db_patch('players', object_id=player['player_id'], params=[('headshot', thumb)])
    return thumb
async def get_player_headshot(player):
    """Find a headshot URL for a player, trying Baseball Reference first.

    Searches baseball-reference.com for the player's ``bbref_id`` (or
    ``p_name`` when the id is falsy) and scans the returned page for the first
    ``<img>`` whose ``src`` contains ``'headshot'``. A hit is stored on the
    player record through ``db_patch`` and returned. On any failure, or when
    no such image exists, the lookup falls back to ``get_player_photo``.

    Args:
        player: Player dict; reads ``bbref_id``, ``p_name`` and ``player_id``.

    Returns:
        A headshot URL, or whatever ``get_player_photo`` returns (possibly ``None``).
    """
    search_term = player['bbref_id'] if player['bbref_id'] else player['p_name']
    req_url = 'https://www.baseball-reference.com/search/search.fcgi'

    def _fetch_html():
        # `params` URL-encodes the search term for us.
        return requests.get(req_url, params={'search': search_term}, timeout=2).text

    try:
        # requests is blocking; keep it off the event loop thread.
        html = await asyncio.get_running_loop().run_in_executor(None, _fetch_html)
        soup = BeautifulSoup(html, 'html.parser')
        for item in soup.find_all('img'):
            # .get() so an <img> without a src attribute is skipped instead of
            # raising and aborting the scan of the remaining images.
            src = item.get('src') or ''
            if 'headshot' in src:
                await db_patch('players', object_id=player['player_id'], params=[('headshot', src)])
                return src
    except Exception:
        # Still best-effort, but no longer a bare `except:` -- task cancellation
        # and interpreter exits propagate, and the failure is visible in the logs.
        logging.warning(f'Baseball Reference headshot lookup failed for {search_term}', exc_info=True)

    return await get_player_photo(player)
"""
NEW FOR SEASON 4
"""
async def get_team_by_owner(owner_id: int):
    """Return the first team whose ``gm_id`` matches ``owner_id``, or ``None``.

    Queries the ``teams`` endpoint through ``db_get`` and inspects the
    ``count`` / ``teams`` keys of the response.
    """
    t_query = await db_get('teams', params=[('gm_id', owner_id)])
    return t_query['teams'][0] if t_query['count'] else None
async def team_role(ctx, team: Team):
    """Get (or create) the role named ``'<abbrev> - <lname>'`` for ``team``."""
    role_name = f'{team.abbrev} - {team.lname}'
    return await get_or_create_role(ctx, role_name)
def get_all_pos(player):
    """Collect the truthy ``pos_1`` .. ``pos_7`` values of ``player``, in order."""
    slots = (player[f'pos_{slot}'] for slot in range(1, 8))
    return [position for position in slots if position]
async def share_channel(channel, user, read_only=False):
    """Grant ``user`` read access to ``channel``; send access unless ``read_only``."""
    may_send = not read_only
    await channel.set_permissions(user, read_messages=True, send_messages=may_send)
async def get_card_embeds(card, include_stats=False) -> list:
    """Build the Discord embed(s) that display a single card.

    The first embed carries the card image, rarity, cost, positions, paperdex
    collection info, a reference link and a thumbnail headshot. Pokemon cards
    also get "Evolves Into" / "Evolves From" fields when those lookups succeed.
    With ``include_stats`` the current-season batting and pitching lines are
    appended. A second, image-only embed is added when the player has an
    ``image2``.

    Args:
        card: Dict with ``player`` and ``team`` sub-dicts (as built by
            ``get_blank_team_card`` or returned alongside card queries).
        include_stats: When True, query ``plays/batting`` and ``plays/pitching``
            for ``PD_SEASON`` and render any results.

    Returns:
        ``[embed]`` or ``[embed, second_image_embed]``.
    """
    player = card['player']
    team = card['team']

    embed = discord.Embed(
        title=f'{player["p_name"]}',
        color=int(player['rarity']['color'], 16)
    )
    embed.description = f'{player["cardset"]["name"]} / {player["mlbclub"]}'
    embed.set_author(name=team['lname'], url=IMAGES['logo'], icon_url=team['logo'])
    embed.set_footer(text=f'Paper Dynasty Season {team["season"]}', icon_url=IMAGES['logo'])

    if include_stats:
        b_query = await db_get(
            'plays/batting', params=[('player_id', player['player_id']), ('season', PD_SEASON)])
        p_query = await db_get(
            'plays/pitching', params=[('player_id', player['player_id']), ('season', PD_SEASON)])

    embed.add_field(name='Player ID', value=f'{player["player_id"]}')
    embed.add_field(name='Rarity', value=f'{player["rarity"]["name"]}')
    embed.add_field(name='Cost', value=f'{player["cost"]}')

    pos_string = ", ".join(get_all_pos(player))
    embed.add_field(name='Positions', value=pos_string)

    all_dex = await db_get('paperdex', params=[("player_id", player["player_id"]), ('flat', True)])
    count = all_dex['count']
    if team['lname'] != 'Paper Dynasty':
        # Does the viewing team appear among the teams that collected this player?
        bool_list = [True for elem in all_dex['paperdex'] if elem['team'] == team.get('id', None)]
        if any(bool_list):
            if count == 1:
                coll_string = f'Only you'
            else:
                coll_string = f'You and {count - 1} other{"s" if count - 1 != 1 else ""}'
        elif count:
            coll_string = f'{count} other team{"s" if count != 1 else ""}'
        else:
            coll_string = f'0 teams'
        embed.add_field(name='Collected By', value=coll_string)
    else:
        embed.add_field(name='Collected By', value=f'{count} team{"s" if count != 1 else ""}')

    # TODO: check for dupes with the included paperdex data

    bbref_id = player['bbref_id']
    if player['franchise'] == 'Pokemon':
        player_pages = f'[Pkmn]({PKMN_REF_URL}{bbref_id})'
    elif bbref_id:
        player_pages = f'[BBRef](https://www.baseball-reference.com/players/{bbref_id[0]}/{bbref_id}.shtml)'
    else:
        # No reference id: indexing bbref_id[0] would raise, so skip the link
        # rather than failing the whole card display.
        player_pages = None
    if player_pages:
        embed.add_field(name='Player Page', value=f'{player_pages}')

    embed.set_image(url=player["image"])

    headshot = player['headshot'] if player['headshot'] else await get_player_headshot(player)
    if headshot:
        embed.set_thumbnail(url=headshot)
    else:
        embed.set_thumbnail(url=IMAGES['logo'])

    if player['franchise'] == 'Pokemon':
        # For Pokemon cards fangr_id / strat_code are used as player ids of the
        # next / previous evolution (they are passed to db_get as object_id).
        if player['fangr_id'] is not None:
            try:
                evo_mon = await db_get('players', object_id=player['fangr_id'], none_okay=True)
                if evo_mon is not None:
                    embed.add_field(
                        name='Evolves Into',
                        value=f'{evo_mon["p_name"]}'
                    )
            except Exception as e:
                # f-string prefix was missing, so the literal text "{e}" was logged
                logging.error(f'could not pull evolution: {e}', exc_info=True, stack_info=True)
        if '420420' not in player['strat_code']:
            try:
                evo_mon = await db_get('players', object_id=player['strat_code'], none_okay=True)
                if evo_mon is not None:
                    embed.add_field(
                        name='Evolves From',
                        value=f'{evo_mon["p_name"]}'
                    )
            except Exception as e:
                logging.error(f'could not pull evolution: {e}', exc_info=True, stack_info=True)

    if include_stats:
        if b_query['count'] > 0:
            b = b_query['stats'][0]
            re24 = f'{b["re24"]:.2f}'
            batting_string = f'```\n' \
                             f' AVG OBP SLG\n' \
                             f' {b["avg"]:.3f} {b["obp"]:.3f} {b["slg"]:.3f}\n``````\n' \
                             f' OPS wOBA RE24\n' \
                             f' {b["ops"]:.3f} {b["woba"]:.3f} {re24: ^5}\n``````\n' \
                             f' PA H RBI 2B 3B HR SB\n' \
                             f'{b["pa"]: >3} {b["hit"]: ^3} {b["rbi"]: ^3} {b["double"]: >2} {b["triple"]: >2} ' \
                             f'{b["hr"]: >2} {b["sb"]: >2}```\n'
            embed.add_field(name='Batting Stats', value=batting_string, inline=False)
        if p_query['count'] > 0:
            p = p_query['stats'][0]
            # Baseball innings notation: whole innings plus .1 / .2 for leftover outs
            ip_whole = math.floor(p['outs'] / 3)
            ip_denom = p['outs'] % 3
            ips = ip_whole + (ip_denom * 0.1)
            kpbb = f'{p["k/bb"]:.1f}'
            era = f'{p["era"]:.2f}'
            whip = f'{p["whip"]:.2f}'
            re24 = f'{p["re24"]:.2f}'
            pitching_string = f'```\n' \
                              f' W-L SV ERA WHIP\n' \
                              f'{p["win"]: >2}-{p["loss"]: <2} {p["save"]: >2} {era: >5} {whip: >4}\n``````\n' \
                              f' IP SO K/BB RE24\n' \
                              f'{ips: >5} {p["so"]: ^3} {kpbb: ^4} {re24: ^5}\n```'
            embed.add_field(name='Pitching Stats', value=pitching_string, inline=False)

    if not player['image2']:
        return [embed]

    card_two = discord.Embed(color=int(player['rarity']['color'], 16))
    card_two.set_footer(text=f'Paper Dynasty Season {team["season"]}', icon_url=IMAGES['logo'])
    card_two.set_image(url=player['image2'])
    return [embed, card_two]
def image_embed(image_url: str, title: str = None, color: str = None, desc: str = None, author_name: str = None,
                author_icon: str = None):
    """Create an image-only embed with the standard Paper Dynasty footer.

    Args:
        image_url: Image shown in the embed body.
        title: Optional embed title.
        color: Optional hex colour string (parsed base 16); defaults to ``SBA_COLOR``.
        desc: Optional embed description.
        author_name: Optional author line; without it no author is set.
        author_icon: Icon for the author line; defaults to the PD logo.

    Returns:
        The configured ``discord.Embed``.
    """
    hex_color = SBA_COLOR if color is None else color
    embed = discord.Embed(color=int(hex_color, 16))

    if title is not None:
        embed.title = title
    if desc is not None:
        embed.description = desc
    if author_name is not None:
        embed.set_author(
            name=author_name,
            icon_url=IMAGES['logo'] if author_icon is None else author_icon
        )

    embed.set_footer(text=f'Paper Dynasty Season {PD_SEASON}', icon_url=IMAGES['logo'])
    embed.set_image(url=image_url)
    return embed
def is_shiny(card):
    """Return True when the card's player rarity value is 5 or higher."""
    rarity_value = card['player']['rarity']['value']
    return True if rarity_value >= 5 else False
async def display_cards(
        cards: list, team: dict, channel, user, bot=None, pack_cover: str = None, cust_message: str = None,
        add_roster: bool = True, pack_name: str = None) -> bool:
    """Show a set of cards in ``channel`` as a paginated "pack opening".

    Cards are sorted in place by rarity value (lowest first) and shown one page
    at a time with Prev / Close Pack / Next buttons restricted to ``user``.
    When a pack cover is supplied it is shown first (page -1). The first time a
    shiny card (see ``is_shiny``) is reached, an MVP teaser image with a
    "Flip!" button is shown, the MVP role is pinged, and the card is revealed
    once the user flips or the teaser view finishes.

    A view that finishes with no button value (timeout) auto-advances to the
    next page; on the last page it closes the pack.

    Args:
        cards: Card dicts (``player`` / ``team``); sorted in place.
        team: Team dict; reads ``abbrev``, ``lname`` and ``logo``.
        channel: Messageable the pack is posted to.
        user: Member allowed to use the buttons; mentioned in the follow-up.
        bot: Unused; kept for caller compatibility.
        pack_cover: Optional cover image URL shown before the first card.
        cust_message: Optional replacement for the default follow-up message.
        add_roster: When True, the follow-up is edited to the roster sheet link on close.
        pack_name: Optional description shown on the pack cover.

    Returns:
        True when the pack was closed normally, False when any step failed.
    """
    logger.info(f'display_cards called with {len(cards)} cards for team {team.get("abbrev", "Unknown")}')
    try:
        cards.sort(key=lambda x: x['player']['rarity']['value'])
        logger.debug(f'Cards sorted successfully')
        card_embeds = [await get_card_embeds(x) for x in cards]
        logger.debug(f'Created {len(card_embeds)} card embeds')
        page_num = 0 if pack_cover is None else -1
        seen_shiny = False
        logger.debug(f'Initial page_num: {page_num}, pack_cover: {pack_cover is not None}')
    except Exception as e:
        logger.error(f'Error in display_cards initialization: {e}', exc_info=True)
        return False

    try:
        view = Pagination([user], timeout=10)
        # Use simple text arrows instead of emojis to avoid context issues
        l_emoji = ''
        r_emoji = ''
        view.left_button.disabled = True
        view.left_button.label = f'{l_emoji}Prev: -/{len(card_embeds)}'
        view.cancel_button.label = f'Close Pack'
        view.right_button.label = f'Next: {page_num + 2}/{len(card_embeds)}{r_emoji}'
        if len(cards) == 1:
            view.right_button.disabled = True
        logger.debug(f'Pagination view created successfully')

        if pack_cover:
            logger.debug(f'Sending pack cover message')
            msg = await channel.send(
                content=None,
                embed=image_embed(pack_cover, title=f'{team["lname"]}', desc=pack_name),
                view=view
            )
        else:
            logger.debug(f'Sending card embed message for page {page_num}')
            msg = await channel.send(content=None, embeds=card_embeds[page_num], view=view)
        logger.debug(f'Initial message sent successfully')
    except Exception as e:
        logger.error(f'Error creating view or sending initial message: {e}', exc_info=True)
        return False

    try:
        if cust_message:
            logger.debug(f'Sending custom message: {cust_message[:50]}...')
            follow_up = await channel.send(cust_message)
        else:
            logger.debug(f'Sending default message for {len(cards)} cards')
            follow_up = await channel.send(f'{user.mention} you\'ve got {len(cards)} cards here')
        logger.debug(f'Follow-up message sent successfully')
    except Exception as e:
        logger.error(f'Error sending follow-up message: {e}', exc_info=True)
        return False

    logger.debug(f'Starting main interaction loop')
    while True:
        try:
            logger.debug(f'Waiting for user interaction on page {page_num}')
            await view.wait()
            logger.debug(f'User interaction received: {view.value}')
        except Exception as e:
            logger.error(f'Error in view.wait(): {e}', exc_info=True)
            await msg.edit(view=None)
            return False

        if view.value:
            if view.value == 'cancel':
                await msg.edit(view=None)
                if add_roster:
                    await follow_up.edit(content=f'Refresh your cards here: {get_roster_sheet(team)}')
                return True
            if view.value == 'left':
                page_num -= 1 if page_num > 0 else 0
            if view.value == 'right':
                page_num += 1 if page_num < len(card_embeds) - 1 else 0
        else:
            # No button pressed before the view finished: auto-advance, or
            # close the pack when already on the last card.
            if page_num == len(card_embeds) - 1:
                await msg.edit(view=None)
                if add_roster:
                    await follow_up.edit(content=f'Refresh your cards here: {get_roster_sheet(team)}')
                return True
            else:
                page_num += 1
        view.value = None

        # --- Shiny reveal -------------------------------------------------
        # flip_view stays None unless the MVP teaser was actually posted, so
        # the role ping and the wait-for-flip only happen for a real reveal
        # (not on every page turn, and not only when the teaser failed).
        flip_view = None
        try:
            if is_shiny(cards[page_num]) and not seen_shiny:
                logger.info(f'Shiny card detected on page {page_num}: {cards[page_num]["player"]["p_name"]}')
                seen_shiny = True
                teaser = Pagination([user], timeout=300)
                teaser.cancel_button.style = discord.ButtonStyle.success
                teaser.cancel_button.label = 'Flip!'
                teaser.left_button.label = '-'
                teaser.right_button.label = '-'
                teaser.left_button.disabled = True
                teaser.right_button.disabled = True

                # Get MVP image safely with fallback
                franchise = cards[page_num]["player"]["franchise"]
                logger.debug(f'Getting MVP image for franchise: {franchise}')
                mvp_image = IMAGES['mvp'].get(franchise, IMAGES.get('mvp-hype', IMAGES['logo']))

                await msg.edit(
                    embed=image_embed(
                        mvp_image,
                        color='56f1fa',
                        author_name=team['lname'],
                        author_icon=team['logo']
                    ),
                    view=teaser)
                flip_view = teaser
                logger.debug(f'MVP display updated successfully')
        except Exception as e:
            logger.error(f'Error processing shiny card on page {page_num}: {e}', exc_info=True)
            # Continue with regular flow instead of crashing

        if flip_view is not None:
            try:
                # Temporary message delivers the role ping; the follow-up keeps the text.
                tmp_msg = await channel.send(content=f'<@&1163537676885033010> we\'ve got an MVP!')
                await follow_up.edit(content=f'<@&1163537676885033010> we\'ve got an MVP!')
                await tmp_msg.delete()
            except discord.errors.NotFound:
                # Role might not exist or message was already deleted
                await follow_up.edit(content=f'We\'ve got an MVP!')
            except Exception as e:
                # Log error but don't crash the function
                logger.error(f'Error handling MVP notification: {e}')
                await follow_up.edit(content=f'We\'ve got an MVP!')
            # Hold the teaser until the user flips (or the teaser view finishes)
            await flip_view.wait()

        # A finished view cannot be reused; build a fresh one for this page.
        view = Pagination([user], timeout=10)
        try:
            view.right_button.label = f'Next: {page_num + 2}/{len(card_embeds)}{r_emoji}'
            view.cancel_button.label = f'Close Pack'
            view.left_button.label = f'{l_emoji}Prev: {page_num}/{len(card_embeds)}'
            if page_num == 0:
                view.left_button.label = f'{l_emoji}Prev: -/{len(card_embeds)}'
                view.left_button.disabled = True
            elif page_num == len(card_embeds) - 1:
                # Give the user longer on the final card before auto-closing
                view.timeout = 600.0
                view.right_button.label = f'Next: -/{len(card_embeds)}{r_emoji}'
                view.right_button.disabled = True

            logger.debug(f'Updating message to show page {page_num}/{len(card_embeds)}')
            if page_num >= len(card_embeds):
                logger.error(f'Page number {page_num} exceeds card_embeds length {len(card_embeds)}')
                page_num = len(card_embeds) - 1
            await msg.edit(content=None, embeds=card_embeds[page_num], view=view)
            logger.debug(f'Message updated successfully to page {page_num}')
        except Exception as e:
            logger.error(f'Error updating message on page {page_num}: {e}', exc_info=True)
            # Try to clean up and return
            try:
                await msg.edit(view=None)
            except Exception:
                pass  # If this fails too, just give up
            return False
async def embed_pagination(
        all_embeds: list, channel, user: discord.Member, custom_message: str = None,
        timeout: int = 10, start_page: int = 0):
    """Post ``all_embeds`` to ``channel`` as a Prev / Cancel / Next paginated message.

    Only ``user`` may operate the buttons. A view that finishes without a
    button value (timeout) advances one page; on the last page it removes the
    buttons and returns.

    Args:
        all_embeds: Embeds to page through. An empty list raises ``IndexError``
            when the first page is sent.
        channel: Messageable to post into.
        user: Member allowed to use the buttons.
        custom_message: Optional content for the initial message (cleared on
            the first page change).
        timeout: Seconds each page's view waits for input.
        start_page: Zero-based first page; out-of-range values fall back to 0.

    Returns:
        True once the user cancels or the last page times out.
    """
    total = len(all_embeds)
    last_page = total - 1
    page_num = start_page if 0 <= start_page <= last_page else 0
    l_emoji = ''
    r_emoji = ''

    def build_view(page: int, linger_on_last: bool):
        """Create a fresh view labelled for ``page``."""
        nav = Pagination([user], timeout=timeout)
        nav.right_button.label = f'Next: {page + 2}/{total}{r_emoji}'
        nav.cancel_button.label = f'Cancel'
        nav.left_button.label = f'{l_emoji}Prev: {page}/{total}'
        if page == 0:
            nav.left_button.label = f'{l_emoji}Prev: -/{total}'
            nav.left_button.disabled = True
        # Independent check (not elif): with a single embed, page 0 is also the
        # last page and "Next" must be disabled as well.
        if page >= last_page:
            if linger_on_last and page != 0:
                # Longer window on the final page, as before, for multi-page lists
                nav.timeout = 600.0
            nav.right_button.label = f'Next: -/{total}{r_emoji}'
            nav.right_button.disabled = True
        return nav

    view = build_view(page_num, linger_on_last=False)
    msg = await channel.send(content=custom_message, embed=all_embeds[page_num], view=view)

    while True:
        await view.wait()

        if view.value:
            if view.value == 'cancel':
                await msg.edit(view=None)
                return True
            if view.value == 'left':
                page_num = max(page_num - 1, 0)
            elif view.value == 'right':
                # Clamp to the last valid index; the old bound allowed
                # page_num to run past the end of all_embeds.
                page_num = min(page_num + 1, last_page)
        elif page_num >= last_page:
            await msg.edit(view=None)
            return True
        else:
            page_num += 1

        # A finished view cannot be reused; attach a new one for this page.
        view = build_view(page_num, linger_on_last=True)
        await msg.edit(content=None, embed=all_embeds[page_num], view=view)
async def get_test_pack(ctx, team):
    """Create a five-card test pack for ``team`` and return it in card form.

    Creates a pack of type 1, draws three players with rarity <= 1, one with
    rarity 1-3 and one with rarity 5, stores them as cards, and posts a
    "Rare Pull" notification for each player with rarity value >= 3.

    Returns:
        A list of ``{'player': ..., 'team': team}`` dicts, or ``None`` (after
        telling the user) when the cards could not be created.
    """
    this_pack = await db_post('packs/one', payload={
        'team_id': team['id'], 'pack_type_id': 1,
        'open_time': int(datetime.datetime.now().timestamp() * 1000)
    })

    ft_query = await db_get('players/random', params=[('max_rarity', 1), ('limit', 3)])
    four_query = await db_get('players/random', params=[('min_rarity', 1), ('max_rarity', 3), ('limit', 1)])
    five_query = await db_get('players/random', params=[('min_rarity', 5), ('max_rarity', 5), ('limit', 1)])

    all_cards = []
    for draw in (ft_query, four_query, five_query):
        all_cards.extend(draw['players'])

    new_cards = [
        {'player_id': pulled['player_id'], 'team_id': team['id'], 'pack_id': this_pack['id']}
        for pulled in all_cards
    ]
    success = await db_post('cards', timeout=10, payload={'cards': new_cards})
    if not success:
        await ctx.send(f'I was not able to create these cards {get_emoji(ctx, "slight_frown")}')
        return

    pull_notifs = [pulled for pulled in all_cards if pulled['rarity']['value'] >= 3]
    for pull in pull_notifs:
        await db_post('notifs', payload={
            'created': int(datetime.datetime.now().timestamp() * 1000),
            'title': 'Rare Pull',
            'field_name': f'{player_desc(pull)} ({pull["rarity"]["name"]})',
            'message': f'Pulled by {team["abbrev"]}',
            'about': f'Player-{pull["player_id"]}'
        })

    return [{'player': pulled, 'team': team} for pulled in all_cards]
# Rarity tiers in the order they are requested from the API: (tier key, rarity value).
_PACK_TIERS = (('Rep', 0), ('Res', 1), ('Sta', 2), ('All', 3), ('MVP', 5), ('HoF', 8))

# d1000 odds for every card slot of the fixed-size pack types. Each slot is a
# sequence of (inclusive upper bound, tier); the first bound >= the roll wins.
_PACK_SLOT_ODDS = {
    'Standard': (
        ((450, 'Rep'), (900, 'Res'), (1000, 'Sta')),
        ((450, 'Rep'), (900, 'Res'), (1000, 'Sta')),
        ((350, 'Rep'), (700, 'Res'), (950, 'Sta'), (1000, 'All')),
        ((310, 'Rep'), (620, 'Res'), (940, 'Sta'), (990, 'All'), (1000, 'MVP')),
        ((215, 'Rep'), (430, 'Res'), (930, 'Sta'), (980, 'All'), (990, 'MVP'), (1000, 'HoF')),
    ),
    'Premium': (
        ((400, 'Rep'), (870, 'Res'), (970, 'Sta'), (990, 'All'), (1000, 'MVP')),
        ((300, 'Rep'), (770, 'Res'), (970, 'Sta'), (990, 'All'), (1000, 'MVP')),
        ((200, 'Rep'), (640, 'Res'), (940, 'Sta'), (990, 'All'), (1000, 'MVP')),
        ((100, 'Rep'), (530, 'Res'), (930, 'Sta'), (980, 'All'), (990, 'MVP'), (1000, 'HoF')),
        ((380, 'Res'), (880, 'Sta'), (980, 'All'), (990, 'MVP'), (1000, 'HoF')),
    ),
}

# Cardsets whose short pulls are filled with duplicates instead of MVPs.
# NOTE(review): 23 appears to be the Pokemon cardset (per the log message below) -- confirm.
_DUPLICATE_FILL_CARDSETS = [23]


def _roll_pack_counts(pack_type_name: str, extra_val=None) -> dict:
    """Roll every card slot of one pack and return ``{tier: number of cards}``."""
    counts = {tier: 0 for tier, _ in _PACK_TIERS}

    if pack_type_name in _PACK_SLOT_ODDS:
        for slot_odds in _PACK_SLOT_ODDS[pack_type_name]:
            d_1000 = random.randint(1, 1000)
            # Exactly one tier per slot: stop at the first matching bound.
            for upper_bound, tier in slot_odds:
                if d_1000 <= upper_bound:
                    counts[tier] += 1
                    break
    elif pack_type_name == 'Check-In Player':
        # Single card; an integer extra_val widens the roll so the higher
        # thresholds (1000 / 1100) become reachable.
        mod = extra_val if isinstance(extra_val, int) else 0
        d_1000 = random.randint(1, 1000 + mod)
        if d_1000 >= 1100:
            counts['All'] += 1
        elif d_1000 >= 1000:
            counts['Sta'] += 1
        elif d_1000 >= 500:
            counts['Res'] += 1
        else:
            counts['Rep'] += 1
    else:
        raise TypeError(f'Pack type not recognized: {pack_type_name}')

    return counts


async def roll_for_cards(all_packs: list, extra_val=None) -> list:
    """Open every pack in ``all_packs``: roll rarities, draw players, create cards.

    Pack odds are calculated based on the pack type (``Standard``, ``Premium``
    or ``Check-In Player``). For each pack the rolled rarity counts are drawn
    from ``players/random``, the cards are created, the pack is stamped with an
    ``open_time``, and a "Rare Pull" notification is posted for each drawn
    player with rarity value >= 3.

    The franchise / cardset filter and the team named in notifications are
    taken from the first pack.

    Parameters
    ----------
    all_packs
        Pack dicts; reads ``id``, ``team``, ``pack_type``, ``pack_team`` and
        ``pack_cardset``.
    extra_val
        Optional integer added to the roll ceiling for Check-In packs.

    Returns
    -------
    list
        Ids of the packs that were opened (empty when ``all_packs`` is empty).

    Raises
    ------
    TypeError
        When a pack's type name is not recognized.
    ConnectionError
        When the cards for a pack could not be created.
    """
    if not all_packs:
        return []

    first_pack = all_packs[0]
    team = first_pack['team']
    pack_team = first_pack['pack_team']
    pack_cardset = first_pack['pack_cardset']
    # pack_cardset may be None; resolve the id once instead of subscripting None later
    cardset_id = pack_cardset['id'] if pack_cardset is not None else None

    pack_ids = []
    for pack in all_packs:
        pack_type_name = pack['pack_type']['name']
        if pack_type_name == 'Check-In Player':
            logger.info(f'Building Check-In Pack // extra_val (type): {extra_val} {type(extra_val)}')
        counts = _roll_pack_counts(pack_type_name, extra_val)

        this_pack_players = []
        pull_notifs = []
        for tier, rarity in _PACK_TIERS:
            wanted = counts[tier]
            if wanted <= 0:
                continue

            params = [('min_rarity', rarity), ('max_rarity', rarity), ('limit', wanted)]
            if pack_team is not None:
                params.extend([('franchise', pack_team['lname']), ('in_packs', True)])
            elif pack_cardset is not None:
                params.append(('cardset_id', cardset_id))
            else:
                params.append(('in_packs', True))

            pl = await db_get('players/random', params=params)
            for x in pl['players']:
                this_pack_players.append(x)
                if x['rarity']['value'] >= 3:
                    pull_notifs.append(x)

            # The API can return fewer players than requested for this rarity
            missing = wanted - pl['count']
            if missing <= 0:
                continue
            logging.info(f'Set mvp flag to {missing} / cardset_id: {cardset_id}')

            if cardset_id not in _DUPLICATE_FILL_CARDSETS:
                logging.info(f'Adding {missing} MVPs for missing cards')
                filler = await db_get('players/random', params=[('min_rarity', 5), ('limit', missing)])
                this_pack_players.extend(filler['players'])
            else:
                # Add dupes of Replacement/Reserve cards
                logging.info(f'Adding {missing} duplicate pokemon cards')
                dupe = pl['players'][0]
                for _ in range(missing):
                    logging.info(f'Adding {dupe["p_name"]} to the pack')
                    # Append the player that is logged (previously a stale loop variable)
                    this_pack_players.append(dupe)

        success = await db_post(
            'cards',
            payload={'cards': [{
                'player_id': x['player_id'], 'team_id': pack['team']['id'], 'pack_id': pack['id']}
                for x in this_pack_players]
            },
            timeout=10
        )
        if not success:
            raise ConnectionError(f'Failed to create this pack of cards.')

        await db_patch('packs', object_id=pack['id'], params=[
            ('open_time', int(datetime.datetime.timestamp(datetime.datetime.now())*1000))
        ])
        pack_ids.append(pack['id'])

        # Notify per pack: pull_notifs is reset for every pack above
        for pull in pull_notifs:
            logger.info(f'good pull: {pull}')
            await db_post('notifs', payload={
                'created': int(datetime.datetime.timestamp(datetime.datetime.now())*1000),
                'title': 'Rare Pull',
                'field_name': f'{player_desc(pull)} ({pull["rarity"]["name"]})',
                'message': f'Pulled by {team["abbrev"]}',
                'about': f'Player-{pull["player_id"]}'
            })

    return pack_ids
async def give_packs(team: dict, num_packs: int, pack_type: dict = None) -> dict:
    """Create ``num_packs`` new packs for ``team`` and return its unopened packs.

    Parameters
    ----------
    team
        Team dict; reads ``id``.
    num_packs
        How many packs to create.
    pack_type
        Optional pack type dict (reads ``id``); pack type 1 is used when omitted.

    Returns
    -------
    { 'count': int, 'packs': [ all team packs ] }
    """
    pt_id = 1 if pack_type is None else pack_type['id']
    new_packs = [{'team_id': team['id'], 'pack_type_id': pt_id} for _ in range(num_packs)]
    await db_post('packs', payload={'packs': new_packs})

    return await db_get('packs', params=[
        ('team_id', team['id']), ('opened', False)
    ])
def get_sheets(bot):
    """Return the ``sheets`` attribute of the bot's ``Gameplay`` cog.

    Raises:
        ConnectionError: When the cog or its ``sheets`` attribute cannot be read.
    """
    try:
        gameplay_cog = bot.get_cog('Gameplay')
        sheets_client = gameplay_cog.sheets
    except Exception as e:
        logger.error(f'Could not grab sheets auth: {e}')
        raise ConnectionError(f'Bot has not authenticated with discord; please try again in 1 minute.')
    return sheets_client
def create_team_sheet(team, email: str, current, bot):
    """Copy the roster sheet template for ``team`` and share it with ``email``.

    The copy is named after the team and the current sheet version, shared
    with ``email`` as a writer, and its 'Team Data' tab gets the team id and
    team hash written to B1:B2.

    Args:
        team: Team dict; reads ``lname`` and ``id`` (and is passed to ``team_hash``).
        email: Address given write access to the new sheet.
        current: Dict providing ``gsheet_template`` and ``gsheet_version``.
        bot: Bot instance used to obtain the sheets client.

    Returns:
        The opened spreadsheet object for the new copy.
    """
    sheets = get_sheets(bot)

    template_id = f'{current["gsheet_template"]}'
    sheet_title = f'{team["lname"]} Roster Sheet v{current["gsheet_version"]}'
    new_sheet = sheets.drive.copy_file(template_id, sheet_title, '1539D0imTMjlUx2VF3NPMt7Sv85sb2XAJ')
    logger.info(f'new_sheet: {new_sheet}')

    this_sheet = sheets.open_by_key(new_sheet['id'])
    this_sheet.share(email, role='writer')

    # Leading apostrophe is written ahead of the hash value in the cell
    id_and_hash = [[f'{team["id"]}'], [f'\'{team_hash(team)}']]
    this_sheet.worksheet_by_title('Team Data').update_values(crange='B1:B2', values=id_and_hash)

    logger.debug(f'this_sheet: {this_sheet}')
    return this_sheet
async def refresh_sheet(team, bot, sheets=None) -> None:
    """Toggle the refresh flags on a team's roster sheet (currently disabled).

    NOTE: the unconditional ``return`` on the first line short-circuits the
    function, so it is a no-op and everything after it is unreachable. The
    remaining code is kept so the behavior can be restored by removing that
    line.

    When enabled, the code below sets cell A2 on the 'My Cards' and
    'All Cards' tabs to 'FALSE' and then, after short sleeps, back to 'TRUE'.

    Args:
        team: Team dict; the disabled code reads ``gsheet``.
        bot: Bot instance used to obtain the sheets client when ``sheets`` is falsy.
        sheets: Optional already-authenticated sheets client.
    """
    return
    if not sheets:
        sheets = get_sheets(bot)
    this_sheet = sheets.open_by_key(team['gsheet'])
    my_cards = this_sheet.worksheet_by_title('My Cards')
    all_cards = this_sheet.worksheet_by_title('All Cards')
    # Flip both flags off, then back on with pauses in between
    my_cards.update_value('A2', 'FALSE')
    all_cards.update_value('A2', 'FALSE')
    await asyncio.sleep(1)
    my_cards.update_value('A2', 'TRUE')
    await asyncio.sleep(0.5)
    all_cards.update_value('A2', 'TRUE')
def delete_sheet(team, bot):
    """Delete the spreadsheet referenced by ``team['gsheet']``."""
    get_sheets(bot).open_by_key(team['gsheet']).delete()
def share_sheet(team, email, bot) -> None:
    """Give ``email`` writer access to the spreadsheet in ``team['gsheet']``."""
    team_sheet = get_sheets(bot).open_by_key(team['gsheet'])
    team_sheet.share(email, role='writer')
def int_timestamp(datetime_obj: datetime.datetime) -> int:
    """Convert a datetime to an integer POSIX timestamp in milliseconds (truncated)."""
    seconds = datetime_obj.timestamp()
    return int(seconds * 1000)
def get_pos_abbrev(pos_name):
    """Translate a full position name (e.g. 'First Base') to its abbreviation.

    Raises:
        KeyError: When ``pos_name`` is not one of the recognized position names.
    """
    abbrev_by_name = {
        'Catcher': 'C',
        'First Base': '1B',
        'Second Base': '2B',
        'Third Base': '3B',
        'Shortstop': 'SS',
        'Left Field': 'LF',
        'Center Field': 'CF',
        'Right Field': 'RF',
        'Pitcher': 'P',
        'Designated Hitter': 'DH',
        'Pinch Hitter': 'PH',
    }
    try:
        return abbrev_by_name[pos_name]
    except (KeyError, TypeError):
        # TypeError covers unhashable input, which also counts as "not recognized"
        raise KeyError(f'{pos_name} is not a recognized position name') from None
async def cardset_search(cardset: str, cardset_list: list) -> Optional[dict]:
    """Fuzzy-match ``cardset`` against ``cardset_list`` and fetch that cardset.

    Returns:
        The first cardset returned by the ``cardsets`` query for the matched
        name, or ``None`` when nothing matches or the query is empty.
    """
    cardset_name = fuzzy_search(cardset, cardset_list)
    if cardset_name:
        c_query = await db_get('cardsets', params=[('name', cardset_name)])
        if c_query['count'] != 0:
            return c_query['cardsets'][0]
    return None
def get_blank_team_card(player):
    """Wrap ``player`` in a card dict owned by a placeholder 'Paper Dynasty' team."""
    placeholder_team = {
        'lname': 'Paper Dynasty',
        'logo': IMAGES['logo'],
        'season': PD_SEASON,
        'id': None,
    }
    return {'player': player, 'team': placeholder_team}
def get_rosters(team, bot, roster_num: Optional[int] = None) -> list:
    """Read saved rosters from the 'My Rosters' tab of the team's sheet.

    Args:
        team: Team dict; reads ``gsheet`` and ``id``.
        bot: Bot instance used to obtain the sheets client.
        roster_num: 1, 2 or 3 to read only that roster; falsy reads all three.

    Returns:
        A three-element list. Each slot is ``None`` (not requested, or the
        first cell of its range is blank) or a dict with ``name``,
        ``roster_num``, ``team_id`` and ``cards`` (list of int card ids).
    """
    sheets = get_sheets(bot)
    this_sheet = sheets.open_by_key(team['gsheet'])
    r_sheet = this_sheet.worksheet_by_title(f'My Rosters')
    logger.debug(f'this_sheet: {this_sheet} / r_sheet = {r_sheet}')

    # (roster number, card id range, cell holding the roster name)
    roster_layout = (
        (1, 'B3:B28', 'F30'),
        (2, 'B29:B54', 'F31'),
        (3, 'B55:B80', 'F32'),
    )

    all_rosters = [None, None, None]
    for number, card_range, name_cell in roster_layout:
        if roster_num and roster_num != number:
            continue

        id_cells = r_sheet.range(card_range)
        roster_name = r_sheet.cell(name_cell).value
        logger.info(f'roster_{number}: {id_cells}')

        if id_cells[0][0].value == '':
            continue  # blank first cell: this roster slot stays None

        all_rosters[number - 1] = {
            'name': roster_name,
            'roster_num': number,
            'team_id': team['id'],
            'cards': [int(row[0].value) for row in id_cells],
        }

    return all_rosters
def get_roster_lineups(team, bot, roster_num, lineup_num) -> list:
    """Read one saved lineup from the 'My Rosters' tab of the team's sheet.

    Args:
        team: Team dict; reads ``gsheet`` (and ``abbrev`` for error logging).
        bot: Bot instance used to obtain the sheets client.
        roster_num: 1 or 2 select columns H:I / J:K; any other value uses L:M.
        lineup_num: 1 selects rows 9-17; any other value uses rows 18-26.

    Returns:
        A list of ``(first column value, int(second column value))`` tuples,
        one per row of the range.

    Raises:
        ValueError: When a second-column cell cannot be converted to int.
    """
    sheets = get_sheets(bot)
    logger.debug(f'sheets: {sheets}')
    this_sheet = sheets.open_by_key(team['gsheet'])
    logger.debug(f'this_sheet: {this_sheet}')
    r_sheet = this_sheet.worksheet_by_title('My Rosters')
    logger.debug(f'r_sheet: {r_sheet}')

    row_start, row_end = (9, 17) if lineup_num == 1 else (18, 26)
    columns = 'HI' if roster_num == 1 else 'JK' if roster_num == 2 else 'LM'
    l_range = f'{columns[0]}{row_start}:{columns[1]}{row_end}'
    logger.debug(f'l_range: {l_range}')

    raw_cells = r_sheet.range(l_range)
    logger.debug(f'raw_cells: {raw_cells}')

    lineup_cells = []
    try:
        for row in raw_cells:
            lineup_cells.append((row[0].value, int(row[1].value)))
    except ValueError:
        logger.error(f'Could not pull roster for {team["abbrev"]} due to a ValueError')
        raise ValueError(f'Uh oh. Looks like your roster might not be saved. I am reading blanks when I try to '
                         f'get the card IDs')

    logger.debug(f'lineup_cells: {lineup_cells}')
    return lineup_cells
def post_ratings_guide(team, bot, this_sheet=None):
    """Write the ratings formulas into cell A1 of both 'Full Guide' tabs.

    Args:
        team: Team dict; ``gsheet`` is read when ``this_sheet`` is not given.
        bot: Bot instance used to obtain the sheets client when needed.
        this_sheet: Optional already-opened spreadsheet.
    """
    if not this_sheet:
        this_sheet = get_sheets(bot).open_by_key(team['gsheet'])

    # Look up both tabs before writing to either one
    pitcher_tab, batter_tab = [
        this_sheet.worksheet_by_title(tab_title)
        for tab_title in ('Full Guide - Pitchers', 'Full Guide - Batters')
    ]
    pitcher_tab.update_value('A1', RATINGS_PITCHER_FORMULA)
    batter_tab.update_value('A1', RATINGS_BATTER_FORMULA)
async def legal_channel(ctx):
    """Check that a command is not being run in a chat / news channel.

    Returns:
        True when ``ctx.channel`` is not one of the blocked channels.

    Raises:
        commands.CheckFailure: Blocked channel, and ``ctx`` is a ``commands.Context``.
        discord.app_commands.AppCommandError: Blocked channel, any other ``ctx`` type.
    """
    bad_channels = ['paper-dynasty-chat', 'pd-news-ticker', 'pd-network-news']
    from_prefix_command = isinstance(ctx, commands.Context)

    if ctx.channel.name not in bad_channels:
        return True

    redirect = f'Slide on down to the {get_channel(ctx, "pd-bot-hole").mention} ;)'
    if from_prefix_command:
        raise commands.CheckFailure(redirect)
    raise discord.app_commands.AppCommandError(redirect)
def is_ephemeral_channel(channel) -> bool:
    """Check if channel requires ephemeral responses (chat channels)."""
    chat_channels = ['paper-dynasty-chat', 'pd-news-ticker']
    if channel and hasattr(channel, 'name'):
        return channel.name in chat_channels
    return False
def is_restricted_channel(channel) -> bool:
    """Check if channel is restricted for certain commands (chat/ticker channels)."""
    restricted_names = ['paper-dynasty-chat', 'pd-news-ticker']
    usable = bool(channel) and hasattr(channel, 'name')
    return usable and channel.name in restricted_names
def can_send_message(channel) -> bool:
    """Check if channel supports sending messages.

    Returns:
        True when ``channel`` is truthy and has a ``send`` attribute, else
        False. The result is always a real ``bool``; previously a falsy
        ``channel`` (e.g. ``None``) was returned as-is, contradicting the
        annotated return type.
    """
    return bool(channel) and hasattr(channel, 'send')
async def _schedule_message_delete(message, delay, logger) -> None:
    """Best-effort delayed delete for send paths that take no ``delete_after`` argument."""
    if message is None or delay is None:
        return
    try:
        await message.delete(delay=delay)
    except Exception as e:
        logger.debug(f"Failed to schedule message deletion: {e}")


async def send_safe_message(
    source: Union[discord.Interaction, commands.Context],
    content: str = None,
    *,
    embeds: List[discord.Embed] = None,
    view: discord.ui.View = None,
    ephemeral: bool = False,
    delete_after: float = None
) -> discord.Message:
    """
    Safely send a message using the most appropriate method based on context.

    For Interactions:
    1. Try edit_original_response() if the interaction response is already done
    2. Try followup.send()
    3. Try channel.send() if the channel supports it

    For Context:
    1. Try ctx.send()
    2. Try DM to user with context info if channel send fails

    Args:
        source: Discord Interaction or Context object
        content: Message content
        embeds: List of embeds to send
        view: UI view to attach
        ephemeral: Whether message should be ephemeral (Interaction followup only)
        delete_after: Seconds after which to delete message

    Returns:
        The message object produced by whichever send/edit call succeeded

    Raises:
        RuntimeError: If every Interaction send method fails
        TypeError: If source is neither an Interaction nor a Context
        Exception: For Context, whatever the DM fallback raised when both
            ctx.send() and the DM fail
    """
    logger = logging.getLogger('discord_app')

    # Arguments shared by every send path. ``ephemeral`` and ``delete_after``
    # are added per path below because not every target accepts them.
    kwargs = {}
    if content is not None:
        kwargs['content'] = content
    if embeds is not None:
        kwargs['embeds'] = embeds
    if view is not None:
        kwargs['view'] = view

    # Handle Interaction objects
    if isinstance(source, discord.Interaction):
        # Strategy 1: edit the original response if one already exists.
        if source.response.is_done():
            try:
                edited = await source.edit_original_response(**kwargs)
            except Exception as e:
                logger.debug(f"Failed to edit original response: {e}")
            else:
                # Return the edited message itself. Only the edit is inside the
                # try, so a later failure can no longer fall through and
                # re-send the same content as a second message.
                await _schedule_message_delete(edited, delete_after, logger)
                return edited

        # Strategy 2: Try followup.send()
        followup_kwargs = dict(kwargs)
        if ephemeral:
            followup_kwargs['ephemeral'] = ephemeral
        try:
            followup_message = await source.followup.send(**followup_kwargs)
        except Exception as e:
            logger.debug(f"Failed to send followup message: {e}")
        else:
            await _schedule_message_delete(followup_message, delete_after, logger)
            return followup_message

        # Strategy 3: Try channel.send() if possible. Ephemeral is not
        # supported here, so this fallback is visible to the whole channel.
        if can_send_message(source.channel):
            channel_kwargs = dict(kwargs)
            if delete_after is not None:
                channel_kwargs['delete_after'] = delete_after
            try:
                return await source.channel.send(**channel_kwargs)
            except Exception as e:
                logger.debug(f"Failed to send channel message: {e}")

        # All interaction methods failed
        logger.error(f"All interaction message send methods failed for user {source.user.id}")
        raise RuntimeError("Unable to send interaction message through any available method")

    # Handle Context objects
    elif isinstance(source, commands.Context):
        # ``ephemeral`` is never forwarded on the Context paths.
        ctx_kwargs = dict(kwargs)
        if delete_after is not None:
            ctx_kwargs['delete_after'] = delete_after

        # Strategy 1: Try ctx.send() directly
        try:
            return await source.send(**ctx_kwargs)
        except Exception as e:
            logger.debug(f"Failed to send context message to channel: {e}")

        # Strategy 2: Try DM to user with context info
        try:
            channel_name = getattr(source.channel, 'name', 'Unknown Channel')
            guild_name = getattr(source.guild, 'name', 'Unknown Server') if source.guild else 'DM'

            dm_content = f"[Bot Response from #{channel_name} in {guild_name}]\n\n"
            if content:
                dm_content += content

            dm_kwargs = dict(ctx_kwargs)
            dm_kwargs['content'] = dm_content
            return await source.author.send(**dm_kwargs)
        except Exception as dm_error:
            logger.error(f"Failed to send DM fallback to user {source.author.id}: {dm_error}")
            # Both ctx.send() and DM failed - let the exception bubble up
            raise

    else:
        raise TypeError(f"Source must be discord.Interaction or commands.Context, got {type(source)}")
def get_role(ctx, role_name):
    """Look up a role on ``ctx.guild`` by name via ``discord.utils.get``."""
    guild_roles = ctx.guild.roles
    return discord.utils.get(guild_roles, name=role_name)
async def team_summary_embed(team, ctx, include_roster: bool = True):
    """Build the overview embed for a team.

    Fields added, in order: General Manager, Wallet, Unopened Packs (a per-type
    count, or 'None'), Team Rating, Record (only when the results query returns
    something truthy), Team Sheet (only when ``include_roster``), and For Help.

    Args:
        team: Team record; reads ``lname``, ``gmname``, ``wallet``, ``id`` and
            ``ranking``.
        ctx: Passed to ``get_channel`` to find the 'paper-dynasty-chat' channel.
        include_roster: Whether to add the 'Team Sheet' field.

    Returns:
        The populated embed.
    """
    embed = get_team_embed(f'{team["lname"]} Overview', team)
    embed.add_field(name='General Manager', value=team['gmname'], inline=False)
    embed.add_field(name='Wallet', value=f'{team["wallet"]}')

    unopened = await db_get('packs', params=[('team_id', team['id']), ('opened', False)])
    if unopened['count'] > 0:
        # Count unopened packs per pack-type name, in first-seen order.
        tally = {}
        for pack in unopened['packs']:
            type_name = pack['pack_type']['name']
            tally[type_name] = tally.get(type_name, 0) + 1
        pack_string = ''.join(
            f'{type_name.title()}: {amount}\n' for type_name, amount in tally.items()
        )
    else:
        pack_string = 'None'
    embed.add_field(name='Unopened Packs', value=pack_string)
    embed.add_field(name='Team Rating', value=f'{team["ranking"]}')

    record = await db_get(f'results/team/{team["id"]}?season={PD_SEASON}')
    if record:
        ranked_line = f'Ranked: {record["ranked_wins"]}-{record["ranked_losses"]}\n'
        casual_line = f'Unlimited: {record["casual_wins"]}-{record["casual_losses"]}'
        embed.add_field(name='Record', value=ranked_line + casual_line)

    if include_roster:
        embed.add_field(name='Team Sheet', value=get_roster_sheet(team), inline=False)

    help_channel = get_channel(ctx, "paper-dynasty-chat")
    embed.add_field(
        name='For Help',
        value=f'`/help-pd` has FAQs; feel free to post questions in {help_channel.mention}.',
        inline=False
    )
    return embed
async def give_cards_to_team(team, players: list = None, player_ids: list = None, pack_id=None):
    """Create card records for a team, creating a pack first when none is given.

    Args:
        team: Team record; ``team['id']`` is used as the owner of pack and cards.
        players: Player dicts; each entry's ``player_id`` is used. Takes
            precedence over ``player_ids`` when both are given.
        player_ids: Bare player ids, used only when ``players`` is empty.
        pack_id: Existing pack to attach the cards to. When falsy, a new pack is
            posted to ``packs/one`` and its ``id`` is used.

    Raises:
        ValueError: Neither ``players`` nor ``player_ids`` was provided.
    """
    # Validate before touching the API so bad input cannot leave behind a
    # freshly created pack with no cards in it.
    if not players and not player_ids:
        raise ValueError('One of players or player_ids must be provided to distribute cards')

    if not pack_id:
        p_query = await db_post(
            'packs/one',
            payload={
                'team_id': team['id'],
                'pack_type_id': 4,
                'open_time': datetime.datetime.timestamp(datetime.datetime.now()) * 1000}
        )
        pack_id = p_query['id']

    if players:
        new_player_ids = [x['player_id'] for x in players]
    else:
        new_player_ids = player_ids

    await db_post('cards', payload={'cards': [
        {'player_id': x, 'team_id': team['id'], 'pack_id': pack_id} for x in new_player_ids
    ]}, timeout=10)
def get_ratings_guide(sheets):
    """Read batter and pitcher ratings from the ratings spreadsheet.

    Reads range 'A2:N' of the 'ratings_Batters' and 'ratings_Pitchers'
    worksheets. Each row becomes a dict keyed by the column names below; every
    column is converted with ``int`` except ``p_name``, which is kept as read.

    Args:
        sheets: Object providing ``open_by_key``.

    Returns:
        ``{'valid': True, 'batter_ratings': [...], 'pitcher_ratings': [...]}``,
        or ``{'valid': False}`` when parsing any row raises.
    """
    batter_columns = (
        'player_id', 'p_name', 'rating', 'contact-r', 'contact-l', 'power-r', 'power-l',
        'vision', 'speed', 'stealing', 'reaction', 'arm', 'fielding', 'hand',
    )
    pitcher_columns = (
        'player_id', 'p_name', 'rating', 'control-r', 'control-l', 'stuff-r', 'stuff-l',
        'stamina', 'fielding', 'hit-9', 'k-9', 'bb-9', 'hr-9', 'hand',
    )

    def parse_row(row, columns):
        # Index each cell explicitly so a short row still raises (and is
        # reported as invalid) instead of being silently truncated.
        return {
            key: row[position].value if key == 'p_name' else int(row[position].value)
            for position, key in enumerate(columns)
        }

    workbook = sheets.open_by_key(RATINGS_SHEET_KEY)
    batter_tab = workbook.worksheet_by_title('ratings_Batters')
    pitcher_tab = workbook.worksheet_by_title('ratings_Pitchers')

    batter_rows = batter_tab.range('A2:N')
    pitcher_rows = pitcher_tab.range('A2:N')

    try:
        batters = [parse_row(row, batter_columns) for row in batter_rows]
        pitchers = [parse_row(row, pitcher_columns) for row in pitcher_rows]
    except Exception:
        return {'valid': False}

    return {
        'valid': True,
        'batter_ratings': batters,
        'pitcher_ratings': pitchers
    }
async def paperdex_cardset_embed(team: dict, this_cardset: dict) -> list[discord.Embed]:
    """Build collection embeds for one cardset, grouped by rarity.

    The first embed is a cover with total and collected counts. One further
    embed is added per rarity that has at least one player, listing player
    names in fields of up to 20 names each.

    Args:
        team: Team record; reads ``id`` and ``lname``.
        this_cardset: Cardset record; reads ``id`` and ``name``.

    Returns:
        The cover embed followed by the non-empty rarity embeds.
    """
    dex_query = await db_get(
        'paperdex',
        params=[('team_id', team['id']), ('cardset_id', this_cardset['id']), ('flat', True)]
    )
    dex_player_list = [entry['player'] for entry in dex_query['paperdex']]

    # One bucket (with its own embed) per rarity id, in display order.
    rarity_labels = (
        (99, 'Hall of Fame'),
        (1, 'MVP'),
        (2, 'All-Star'),
        (3, 'Starter'),
        (4, 'Reserve'),
        (5, 'Replacement'),
    )
    coll_data = {
        rarity: {
            'name': label,
            'owned': 0,
            'players': [],
            'embeds': [get_team_embed(f'{team["lname"]} Collection', team=team)]
        }
        for rarity, label in rarity_labels
    }
    coll_data['total_owned'] = 0

    set_players = await db_get(
        'players',
        params=[('cardset_id', this_cardset['id']), ('flat', True), ('inc_dex', False)],
        timeout=5
    )

    for player in set_players['players']:
        is_owned = player['player_id'] in dex_player_list
        if is_owned:
            coll_data[player['rarity']]['owned'] += 1
            coll_data['total_owned'] += 1
        player['owned'] = is_owned

        logger.debug(f'player: {player} / type: {type(player)}')
        coll_data[player['rarity']]['players'].append(player)

    cover_embed = get_team_embed(f'{team["lname"]} Collection', team=team)
    cover_embed.description = this_cardset['name']
    cover_embed.add_field(name='# Total Cards', value=f'{set_players["count"]}')
    cover_embed.add_field(name='# Collected', value=f'{coll_data["total_owned"]}')
    display_embeds = [cover_embed]

    for rarity_id, bucket in coll_data.items():
        # Skip the running-total entry and rarities with nobody in them.
        if rarity_id == 'total_owned' or not bucket['players']:
            continue

        rarity_embed = bucket['embeds'][0]
        roster = bucket['players']
        rarity_embed.description = f'Rarity: {bucket["name"]}'
        rarity_embed.add_field(
            name='# Collected / # Total Cards',
            value=f'{bucket["owned"]} / {len(roster)}',
            inline=False
        )

        group_total = math.ceil(len(roster) / 20)
        for group_num, start in enumerate(range(0, len(roster), 20), start=1):
            chunk_string = ''
            for this_player in roster[start:start + 20]:
                logger.debug(f'this_player: {this_player}')
                # NOTE(review): both branches append an empty string here; an
                # owned/unowned marker may have been intended - confirm.
                chunk_string += '' if this_player['owned'] else ''
                chunk_string += f'{this_player["p_name"]}\n'
            rarity_embed.add_field(name=f'Group {group_num} / {group_total}', value=chunk_string)

        display_embeds.append(rarity_embed)

    return display_embeds
async def paperdex_team_embed(team: dict, mlb_team: dict) -> list[discord.Embed]:
    """Build collection embeds for one franchise, grouped by cardset.

    The first embed is a cover with total and collected counts across all
    cardsets. One further embed is added per cardset that has at least one
    player for the franchise, listing names in fields of up to 20 each.

    Args:
        team: Team record; reads ``id`` and ``lname``.
        mlb_team: Franchise record; ``lname`` is used as the franchise filter.

    Returns:
        The cover embed followed by the non-empty cardset embeds.
    """
    franchise = mlb_team['lname']
    dex_query = await db_get(
        'paperdex',
        params=[('team_id', team['id']), ('franchise', franchise), ('flat', True)]
    )
    dex_player_list = [entry['player'] for entry in dex_query['paperdex']]

    c_query = await db_get('cardsets')
    coll_data = {'total_owned': 0}
    total_players = 0

    for cardset in c_query['cardsets']:
        set_players = await db_get(
            'players',
            params=[('cardset_id', cardset['id']), ('franchise', mlb_team['lname']), ('flat', True),
                    ('inc_dex', False)]
        )
        if set_players is None:
            continue

        bucket = {
            'name': cardset['name'],
            'owned': 0,
            'players': [],
            'embeds': [get_team_embed(f'{team["lname"]} Collection', team=team)]
        }
        coll_data[cardset['id']] = bucket
        total_players += set_players['count']

        for player in set_players['players']:
            is_owned = player['player_id'] in dex_player_list
            if is_owned:
                bucket['owned'] += 1
                coll_data['total_owned'] += 1
            player['owned'] = is_owned

            logger.debug(f'player: {player} / type: {type(player)}')
            bucket['players'].append(player)

    cover_embed = get_team_embed(f'{team["lname"]} Collection', team=team)
    cover_embed.description = mlb_team['lname']
    cover_embed.add_field(name='# Total Cards', value=f'{total_players}')
    cover_embed.add_field(name='# Collected', value=f'{coll_data["total_owned"]}')
    display_embeds = [cover_embed]

    for cardset_id, bucket in coll_data.items():
        # Skip the running-total entry and cardsets with nobody in them.
        if cardset_id == 'total_owned' or not bucket['players']:
            continue

        cardset_embed = bucket['embeds'][0]
        roster = bucket['players']
        cardset_embed.description = f'{mlb_team["lname"]} / {bucket["name"]}'
        cardset_embed.add_field(
            name='# Collected / # Total Cards',
            value=f'{bucket["owned"]} / {len(roster)}',
            inline=False
        )

        group_total = math.ceil(len(roster) / 20)
        for group_num, start in enumerate(range(0, len(roster), 20), start=1):
            chunk_string = ''
            for this_player in roster[start:start + 20]:
                logger.debug(f'this_player: {this_player}')
                # NOTE(review): both branches append an empty string here; an
                # owned/unowned marker may have been intended - confirm.
                chunk_string += '' if this_player['owned'] else ''
                chunk_string += f'{this_player["p_name"]}\n'
            cardset_embed.add_field(name=f'Group {group_num} / {group_total}', value=chunk_string)

        display_embeds.append(cardset_embed)

    return display_embeds
def get_pack_cover(pack):
    """Pick the cover image for a pack, or None when no cover applies.

    Args:
        pack: Pack record; reads ``pack_cardset`` and ``pack_type['name']``.

    Returns:
        An entry from ``IMAGES``, or None for pack types without a cover.
    """
    # NOTE(review): this compares pack_cardset itself to 23, while
    # open_choice_pack reads this_pack['pack_cardset']['id'] - confirm which
    # shape is expected here.
    cardset = pack['pack_cardset']
    if cardset is not None and cardset == 23:
        return IMAGES['pack-pkmnbs']

    cover_key_by_type = {
        'Premium': 'pack-pre',
        'MVP': 'pack-pre',
        'Standard': 'pack-sta',
        'Mario': 'pack-mar',
    }
    cover_key = cover_key_by_type.get(pack['pack_type']['name'])
    if cover_key is None:
        return None
    return IMAGES[cover_key]
async def open_st_pr_packs(all_packs: list, team: dict, context):
    """Roll the given packs and present the resulting cards.

    Cards are shown in the 'pack-openings' channel, or in the invoking channel
    when the first pack has no cover image.

    Args:
        all_packs: Packs to open; the first entry decides the cover image.
        team: Team record forwarded to ``display_cards``.
        context: A ``commands.Context`` (or subclass) or an interaction-like
            object exposing ``.user``.

    Raises:
        ValueError: No destination channel was found, the packs could not be
            rolled, or no cards came back for the rolled packs.
    """
    pack_channel = get_channel(context, 'pack-openings')
    pack_cover = get_pack_cover(all_packs[0])

    if pack_cover is None:
        pack_channel = context.channel

    if not pack_channel:
        raise ValueError(f'I cannot find the pack-openings channel. {get_cal_user(context).mention} - halp?')

    pack_ids = await roll_for_cards(all_packs)
    if not pack_ids:
        logger.error(f'open_packs - unable to roll_for_cards for packs: {all_packs}')
        raise ValueError(f'I was not able to unpack these cards')

    all_cards = []
    for p_id in pack_ids:
        new_cards = await db_get('cards', params=[('pack_id', p_id)])
        all_cards.extend(new_cards['cards'])

    if not all_cards:
        logger.error(f'open_packs - unable to get cards for packs: {pack_ids}')
        raise ValueError(f'I was not able to display these cards')

    # isinstance (not an exact type match) so Context subclasses are still
    # treated as prefix-command contexts and read through .author.
    if isinstance(context, commands.Context):
        author = context.author
    else:
        author = context.user

    # Present cards to opening channel
    await context.channel.send(content=f'Let\'s head down to {pack_channel.mention}!')
    await display_cards(all_cards, team, pack_channel, author, pack_cover=pack_cover)
async def get_choice_from_cards(
        interaction: discord.Interaction, all_players: list = None, cover_title: str = None,
        cover_desc: str = None, cover_image_url: str = None, callback=None, temp_message: str = None,
        conf_message: str = None, delete_message: bool = False):
    """Show players as paginated cards and return the one the user takes.

    A cover page is shown first when both ``cover_title`` and
    ``cover_image_url`` are given; otherwise the first card is shown straight
    away. Each 30-second view timeout without a click advances to the next
    card, wrapping to the first.

    Args:
        interaction: Source interaction; its user controls the buttons and its
            channel receives the messages.
        all_players: Player dicts to choose from.
        cover_title: Title of the optional cover page.
        cover_desc: Description of the optional cover page.
        cover_image_url: Image of the optional cover page.
        callback: Optional callable invoked (not awaited) with the chosen player.
        temp_message: Optional text posted under the cards while choosing.
        conf_message: Optional confirmation text; it replaces ``temp_message``
            when that was posted, otherwise it is sent as a new message.
        delete_message: Delete the card message after the choice is made.

    Returns:
        The chosen entry of ``all_players``.
    """
    # Display them with pagination, prev/next/select
    card_embeds = [
        await get_card_embeds(
            {'player': x, 'team': {'lname': 'Paper Dynasty', 'season': PD_SEASON, 'logo': IMAGES['logo']}}
        ) for x in all_players
    ]
    logger.debug(f'card embeds: {card_embeds}')

    if cover_title is not None and cover_image_url is not None:
        # page_num 0 is the cover page; nothing can be taken from it.
        page_num = 0

        view = Pagination([interaction.user], timeout=30)
        view.left_button.disabled = True
        view.left_button.label = f'Prev: -/{len(card_embeds)}'
        view.cancel_button.label = f'Take This Card'
        view.cancel_button.style = discord.ButtonStyle.success
        view.cancel_button.disabled = True
        view.right_button.label = f'Next: 1/{len(card_embeds)}'

        msg = await interaction.channel.send(
            content=None,
            embed=image_embed(
                image_url=cover_image_url,
                title=cover_title,
                desc=cover_desc
            ),
            view=view
        )
    else:
        page_num = 1

        view = Pagination([interaction.user], timeout=30)
        view.left_button.label = f'Prev: -/{len(card_embeds)}'
        view.left_button.disabled = True
        view.cancel_button.label = f'Take This Card'
        view.cancel_button.style = discord.ButtonStyle.success
        view.right_button.label = f'Next: {page_num + 1}/{len(card_embeds)}'

        msg = await interaction.channel.send(content=None, embeds=card_embeds[page_num - 1], view=view)

    if temp_message is not None:
        temp_msg = await interaction.channel.send(content=temp_message)
    else:
        temp_msg = None

    while True:
        await view.wait()

        if view.value:
            if view.value == 'cancel':
                await msg.edit(view=None)

                if callback is not None:
                    callback(all_players[page_num - 1])

                if conf_message is not None:
                    if temp_msg is not None:
                        await temp_msg.edit(content=conf_message)
                    else:
                        await interaction.channel.send(content=conf_message)
                break
            if view.value == 'left':
                # Plain assignment: with `-=` the conditional bound to the
                # whole right-hand side and subtracted len() instead of wrapping.
                page_num = page_num - 1 if page_num > 1 else len(card_embeds)
            elif view.value == 'right':
                page_num = page_num + 1 if page_num < len(card_embeds) else 1
        else:
            # Timed out without a click: advance, wrapping after the last card.
            if page_num == len(card_embeds):
                page_num = 1
            else:
                page_num += 1

        view.value = None

        view = Pagination([interaction.user], timeout=30)
        view.left_button.label = f'Prev: {page_num - 1}/{len(card_embeds)}'
        view.cancel_button.label = f'Take This Card'
        view.cancel_button.style = discord.ButtonStyle.success
        view.right_button.label = f'Next: {page_num + 1}/{len(card_embeds)}'
        if page_num == 1:
            view.left_button.label = f'Prev: -/{len(card_embeds)}'
            view.left_button.disabled = True
        # Separate `if` so a single-card list disables both directions.
        if page_num == len(card_embeds):
            view.right_button.label = f'Next: -/{len(card_embeds)}'
            view.right_button.disabled = True

        await msg.edit(content=None, embeds=card_embeds[page_num - 1], view=view)

    if delete_message:
        await msg.delete()
    return all_players[page_num - 1]
async def open_choice_pack(this_pack, team: dict, context, cardset_id: Optional[int] = None):
    """Open a choice pack: draw up to four players and let the user keep one.

    The candidate pool depends on ``this_pack['pack_type']['name']`` ('Mario',
    'Team Choice', 'Promo Choice', or anything else as a plain rarity pack).
    Candidates are shown in the 'pack-openings' channel behind a cover page;
    the taken card is given to the team and the pack's ``open_time`` is patched.

    Args:
        this_pack: Pack record; reads ``id``, ``pack_type``, ``pack_team`` and
            ``pack_cardset``.
        team: Team receiving the card; reads ``lname`` and ``sname``.
        context: A ``commands.Context`` (or subclass) or an interaction-like
            object exposing ``.user``.
        cardset_id: Optional cardset filter for the random player queries
            (overwritten for 'Promo Choice' packs).

    Raises:
        KeyError: A Team Choice pack has no team, or a Promo Choice pack has no
            cardset.
        ConnectionError: No players could be drawn, or the chosen card could
            not be created.
    """
    pack_channel = get_channel(context, 'pack-openings')
    pack_cover = get_pack_cover(this_pack)
    pack_type = this_pack['pack_type']['name']

    players = []

    if pack_type == 'Mario':
        d1000 = random.randint(1, 1000)
        if d1000 > 800:
            rarity_id = 5
        elif d1000 > 550:
            rarity_id = 3
        else:
            rarity_id = 2
        pl = await db_get(
            'players/random',
            params=[
                ('cardset_id', 8), ('min_rarity', rarity_id), ('max_rarity', rarity_id), ('limit', 4)
            ]
        )
        players = pl['players']
    elif pack_type == 'Team Choice':
        if this_pack['pack_team'] is None:
            raise KeyError(f'Team not listed for Team Choice pack')

        d1000 = random.randint(1, 1000)
        pack_cover = this_pack['pack_team']['logo']
        if d1000 > 800:
            rarity_id = 5
            pack_cover = IMAGES['mvp'][this_pack['pack_team']['lname']]
        elif d1000 > 550:
            rarity_id = 3
        else:
            rarity_id = 2

        # Both rarity bounds move up by one per pass until four distinct
        # players have been collected.
        min_rarity = rarity_id
        while len(players) < 4 and rarity_id < 10:
            params = [
                ('min_rarity', min_rarity), ('max_rarity', rarity_id), ('limit', 4 - len(players)),
                ('franchise', this_pack['pack_team']['lname'])
            ]
            if this_pack['pack_team']['abbrev'] not in ['MSS']:
                params.append(('in_packs', True))
            if cardset_id is not None:
                params.append(('cardset_id', cardset_id))
            pl = await db_get(
                'players/random',
                params=params
            )
            if pl['count'] >= 0:
                for x in pl['players']:
                    if x not in players:
                        players.append(x)
            if len(players) < 4:
                min_rarity += 1
                rarity_id += 1
    elif pack_type == 'Promo Choice':
        if this_pack['pack_cardset'] is None:
            raise KeyError(f'Cardset not listed for Promo Choice pack')

        d1000 = random.randint(1, 1000)
        pack_cover = IMAGES['mvp-hype']
        cardset_id = this_pack['pack_cardset']['id']
        rarity_id = 5
        if d1000 > 800:
            rarity_id = 8

        # NOTE(review): rarity_id never changes inside this loop, so it only
        # ends once four distinct players are found; if the fallback cardset
        # cannot supply them either this may never terminate - confirm.
        while len(players) < 4 and rarity_id < 10:
            pl = await db_get(
                'players/random',
                params=[('cardset_id', cardset_id), ('min_rarity', rarity_id), ('max_rarity', rarity_id),
                        ('limit', 8)]
            )
            if pl['count'] >= 0:
                for x in pl['players']:
                    if len(players) >= 4:
                        break
                    if x not in players:
                        players.append(x)
            if len(players) < 4:
                cardset_id = LIVE_CARDSET_ID
    else:
        # Get 4 MVP cards
        rarity_id = 5
        if pack_type == 'HoF':
            rarity_id = 8
        elif pack_type == 'All Star':
            rarity_id = 3

        min_rarity = rarity_id
        while len(players) < 4 and rarity_id < 10:
            params = [
                ('min_rarity', min_rarity), ('max_rarity', rarity_id), ('limit', 4), ('in_packs', True)
            ]
            if this_pack['pack_team'] is not None:
                params.append(('franchise', this_pack['pack_team']['lname']))
            if cardset_id is not None:
                params.append(('cardset_id', cardset_id))
            pl = await db_get('players/random', params=params)

            if pl['count'] > 0:
                players.extend(pl['players'])

            if len(players) < 4:
                rarity_id += 3

    if len(players) == 0:
        logger.error(f'Could not create choice pack')
        raise ConnectionError(f'Could not create choice pack')

    # isinstance (not an exact type match) so Context subclasses are still
    # treated as prefix-command contexts and read through .author.
    if isinstance(context, commands.Context):
        author = context.author
    else:
        author = context.user

    logger.info(f'helpers - open_choice_pack - players: {players}')

    # Display them with pagination, prev/next/select
    card_embeds = [
        await get_card_embeds(
            {'player': x, 'team': team}  # Show team and dupe info
        ) for x in players
    ]
    logger.debug(f'card embeds: {card_embeds}')

    # page_num 0 is the cover page; nothing can be taken from it.
    page_num = 0

    view = Pagination([author], timeout=30)
    view.left_button.disabled = True
    view.left_button.label = f'Prev: -/{len(card_embeds)}'
    view.cancel_button.label = f'Take This Card'
    view.cancel_button.style = discord.ButtonStyle.success
    view.cancel_button.disabled = True
    view.right_button.label = f'Next: 1/{len(card_embeds)}'

    # React to selection
    await context.channel.send(f'Let\'s head down to {pack_channel.mention}!')
    msg = await pack_channel.send(
        content=None,
        embed=image_embed(pack_cover, title=f'{team["lname"]}', desc=f'{pack_type} Pack - Choose 1 of 4 {pack_type}s!'),
        view=view
    )
    if rarity_id >= 5:
        tmp_msg = await pack_channel.send(content=f'<@&1163537676885033010> we\'ve got an MVP!')
    else:
        tmp_msg = await pack_channel.send(content=f'We\'ve got a choice pack here!')

    while True:
        await view.wait()

        if view.value:
            if view.value == 'cancel':
                await msg.edit(view=None)

                try:
                    await give_cards_to_team(team, players=[players[page_num - 1]], pack_id=this_pack['id'])
                except Exception as e:
                    logger.error(f'failed to create cards: {e}')
                    raise ConnectionError(f'Failed to distribute these cards.') from e

                await db_patch('packs', object_id=this_pack['id'], params=[
                    ('open_time', int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000))
                ])
                await tmp_msg.edit(
                    content=f'{players[page_num - 1]["p_name"]} has been added to the '
                            f'**{team["sname"]}** binder!'
                )
                break
            if view.value == 'left':
                # Plain assignment: with `-=` the conditional bound to the
                # whole right-hand side and subtracted len() instead of wrapping.
                page_num = page_num - 1 if page_num > 1 else len(card_embeds)
            elif view.value == 'right':
                page_num = page_num + 1 if page_num < len(card_embeds) else 1
        else:
            # Timed out without a click: advance, wrapping after the last card.
            if page_num == len(card_embeds):
                page_num = 1
            else:
                page_num += 1

        view.value = None

        view = Pagination([author], timeout=30)
        view.left_button.label = f'Prev: {page_num - 1}/{len(card_embeds)}'
        view.cancel_button.label = f'Take This Card'
        view.cancel_button.style = discord.ButtonStyle.success
        view.right_button.label = f'Next: {page_num + 1}/{len(card_embeds)}'
        if page_num == 1:
            view.left_button.label = f'Prev: -/{len(card_embeds)}'
            view.left_button.disabled = True
        # Separate `if` so a single-card list disables both directions.
        if page_num == len(card_embeds):
            view.right_button.label = f'Next: -/{len(card_embeds)}'
            view.right_button.disabled = True

        await msg.edit(content=None, embeds=card_embeds[page_num - 1], view=view)
async def confirm_pack_purchase(interaction, owner_team, num_packs, total_cost, pack_embed):
    """Show a pack embed and ask the user to confirm the purchase.

    Args:
        interaction: Source interaction; its user may answer and its channel
            receives the messages.
        owner_team: Buying team; ``owner_team['wallet']`` is shown before and
            after the purchase.
        num_packs: Number of packs, used only to pluralise the price label.
        total_cost: Total price shown to the user.
        pack_embed: Embed sent ahead of the question.

    Returns:
        The question message when the view's value is truthy; otherwise None,
        after the question has been edited to a decline note.
    """
    view = Confirm(responders=[interaction.user], timeout=30)
    channel = interaction.channel
    await channel.send(content=None, embed=pack_embed)

    wallet = owner_team["wallet"]
    plural = "s" if num_packs > 1 else ""
    prompt = '\n'.join([
        f'Your Wallet: {wallet}',
        f'Pack{plural} Price: {total_cost}',
        f'After Purchase: {wallet - total_cost}',
        '',
        f'Would you like to make this purchase?',
    ])
    question = await channel.send(content=prompt, view=view)

    await view.wait()
    if view.value:
        return question

    await question.edit(content='Saving that money. Smart.', view=None)
    return None
def player_desc(this_player) -> str:
    """Return the player's description, with the name appended when it is not already in it."""
    name = this_player['p_name']
    description = this_player['description']
    return description if name in description else f'{description} {name}'
def player_pcard(this_player):
    """Return the player's pitching card image.

    ``image`` is checked first, then ``image2``, for a value containing
    'pitching'; when neither matches, ``image`` is returned as-is.
    """
    for key in ('image', 'image2'):
        candidate = this_player[key]
        if candidate is not None and 'pitching' in candidate:
            return candidate
    return this_player['image']
def player_bcard(this_player):
    """Return the player's batting card image.

    ``image`` is checked first, then ``image2``, for a value containing
    'batting'; when neither matches, ``image`` is returned as-is.
    """
    for key in ('image', 'image2'):
        candidate = this_player[key]
        if candidate is not None and 'batting' in candidate:
            return candidate
    return this_player['image']