import asyncio import datetime import logging import math import os import random import traceback import discord import pygsheets import requests from discord.ext import commands from api_calls import * from bs4 import BeautifulSoup from difflib import get_close_matches from dataclasses import dataclass from typing import Optional, Literal, Union, List from exceptions import log_exception from in_game.gameplay_models import Team from constants import * from discord_ui import * from random_content import * from utils import position_name_to_abbrev, user_has_role, get_roster_sheet_legacy, get_roster_sheet, get_player_url, owner_only, get_cal_user, get_context_user from search_utils import * from discord_utils import * async def get_player_photo(player): search_term = player['bbref_id'] if player['bbref_id'] else player['p_name'] req_url = f'https://www.thesportsdb.com/api/v1/json/1/searchplayers.php?p={search_term}' try: resp = requests.get(req_url, timeout=.5) except Exception as e: return None if resp.status_code == 200 and resp.json()['player']: if resp.json()['player'][0]['strSport'] == 'Baseball': await db_patch('players', object_id=player['player_id'], params=[('headshot', resp.json()['player'][0]['strThumb'])]) return resp.json()['player'][0]['strThumb'] return None async def get_player_headshot(player): search_term = player['bbref_id'] if player['bbref_id'] else player['p_name'] req_url = f'https://www.baseball-reference.com/search/search.fcgi?search={search_term}' try: resp = requests.get(req_url, timeout=2).text soup = BeautifulSoup(resp, 'html.parser') for item in soup.find_all('img'): if 'headshot' in item['src']: await db_patch('players', object_id=player['player_id'], params=[('headshot', item['src'])]) return item['src'] except: pass return await get_player_photo(player) """ NEW FOR SEASON 4 """ async def get_team_by_owner(owner_id: int): team = await db_get('teams', params=[('gm_id', owner_id)]) if not team['count']: return None return team['teams'][0] async def team_role(ctx, team: Team): return await get_or_create_role(ctx, f'{team.abbrev} - {team.lname}') def get_all_pos(player): all_pos = [] for x in range(1, 8): if player[f'pos_{x}']: all_pos.append(player[f'pos_{x}']) return all_pos async def share_channel(channel, user, read_only=False): await channel.set_permissions(user, read_messages=True, send_messages=not read_only) async def get_card_embeds(card, include_stats=False) -> list: embed = discord.Embed( title=f'{card["player"]["p_name"]}', color=int(card['player']['rarity']['color'], 16) ) # embed.description = card['team']['lname'] embed.description = f'{card["player"]["cardset"]["name"]} / {card["player"]["mlbclub"]}' embed.set_author(name=card['team']['lname'], url=IMAGES['logo'], icon_url=card['team']['logo']) embed.set_footer(text=f'Paper Dynasty Season {card["team"]["season"]}', icon_url=IMAGES['logo']) if include_stats: b_query = await db_get( 'plays/batting', params=[('player_id', card['player']['player_id']), ('season', PD_SEASON)]) p_query = await db_get( 'plays/pitching', params=[('player_id', card['player']['player_id']), ('season', PD_SEASON)]) embed.add_field(name='Player ID', value=f'{card["player"]["player_id"]}') embed.add_field(name='Rarity', value=f'{card["player"]["rarity"]["name"]}') embed.add_field(name='Cost', value=f'{card["player"]["cost"]}₼') pos_string = ", ".join(get_all_pos(card['player'])) embed.add_field(name='Positions', value=pos_string) # all_dex = card['player']['paperdex'] all_dex = await db_get('paperdex', params=[("player_id", card["player"]["player_id"]), ('flat', True)]) count = all_dex['count'] if card['team']['lname'] != 'Paper Dynasty': bool_list = [True for elem in all_dex['paperdex'] if elem['team'] == card['team'].get('id', None)] if any(bool_list): if count == 1: coll_string = f'Only you' else: coll_string = f'You and {count - 1} other{"s" if count - 1 != 1 else ""}' elif count: coll_string = f'{count} other team{"s" if count != 1 else ""}' else: coll_string = f'0 teams' embed.add_field(name='Collected By', value=coll_string) else: embed.add_field(name='Collected By', value=f'{count} team{"s" if count != 1 else ""}') # TODO: check for dupes with the included paperdex data # if card['team']['lname'] != 'Paper Dynasty': # team_dex = await db_get('cards', params=[("player_id", card["player"]["player_id"]), ('team_id', card['team']['id'])]) # count = 1 if not team_dex['count'] else team_dex['count'] # embed.add_field(name='# Dupes', value=f'{count - 1} dupe{"s" if count - 1 != 1 else ""}') # embed.add_field(name='Team', value=f'{card["player"]["mlbclub"]}') if card['player']['franchise'] != 'Pokemon': player_pages = f'[BBRef](https://www.baseball-reference.com/players/{card["player"]["bbref_id"][0]}/{card["player"]["bbref_id"]}.shtml)' else: player_pages = f'[Pkmn]({PKMN_REF_URL}{card["player"]["bbref_id"]})' embed.add_field(name='Player Page', value=f'{player_pages}') embed.set_image(url=card["player"]["image"]) headshot = card['player']['headshot'] if card['player']['headshot'] else await get_player_headshot(card['player']) if headshot: embed.set_thumbnail(url=headshot) else: embed.set_thumbnail(url=IMAGES['logo']) if card['player']['franchise'] == 'Pokemon': if card['player']['fangr_id'] is not None: try: evo_mon = await db_get('players', object_id=card['player']['fangr_id'], none_okay=True) if evo_mon is not None: embed.add_field( name='Evolves Into', value=f'{evo_mon["p_name"]}' ) except Exception as e: logging.error('could not pull evolution: {e}', exc_info=True, stack_info=True) if '420420' not in card['player']['strat_code']: try: evo_mon = await db_get('players', object_id=card['player']['strat_code'], none_okay=True) if evo_mon is not None: embed.add_field( name='Evolves From', value=f'{evo_mon["p_name"]}' ) except Exception as e: logging.error('could not pull evolution: {e}', exc_info=True, stack_info=True) if include_stats: if b_query['count'] > 0: b = b_query['stats'][0] re24 = f'{b["re24"]:.2f}' batting_string = f'```\n' \ f' AVG OBP SLG\n' \ f' {b["avg"]:.3f} {b["obp"]:.3f} {b["slg"]:.3f}\n``````\n' \ f' OPS wOBA RE24\n' \ f' {b["ops"]:.3f} {b["woba"]:.3f} {re24: ^5}\n``````\n' \ f' PA H RBI 2B 3B HR SB\n' \ f'{b["pa"]: >3} {b["hit"]: ^3} {b["rbi"]: ^3} {b["double"]: >2} {b["triple"]: >2} ' \ f'{b["hr"]: >2} {b["sb"]: >2}```\n' embed.add_field(name='Batting Stats', value=batting_string, inline=False) if p_query['count'] > 0: p = p_query['stats'][0] ip_whole = math.floor(p['outs'] / 3) ip_denom = p['outs'] % 3 ips = ip_whole + (ip_denom * 0.1) kpbb = f'{p["k/bb"]:.1f}' era = f'{p["era"]:.2f}' whip = f'{p["whip"]:.2f}' re24 = f'{p["re24"]:.2f}' pitching_string = f'```\n' \ f' W-L SV ERA WHIP\n' \ f'{p["win"]: >2}-{p["loss"]: <2} {p["save"]: >2} {era: >5} {whip: >4}\n``````\n' \ f' IP SO K/BB RE24\n' \ f'{ips: >5} {p["so"]: ^3} {kpbb: ^4} {re24: ^5}\n```' embed.add_field(name='Pitching Stats', value=pitching_string, inline=False) if not card['player']['image2']: return [embed] card_two = discord.Embed(color=int(card['player']['rarity']['color'], 16)) card_two.set_footer(text=f'Paper Dynasty Season {card["team"]["season"]}', icon_url=IMAGES['logo']) card_two.set_image(url=card['player']['image2']) return [embed, card_two] def image_embed(image_url: str, title: str = None, color: str = None, desc: str = None, author_name: str = None, author_icon: str = None): embed_color = int(SBA_COLOR, 16) if color is not None: embed_color = int(color, 16) embed = discord.Embed(color=embed_color) if title is not None: embed.title = title if desc is not None: embed.description = desc if author_name is not None: icon = author_icon if author_icon is not None else IMAGES['logo'] embed.set_author(name=author_name, icon_url=icon) embed.set_footer(text=f'Paper Dynasty Season {PD_SEASON}', icon_url=IMAGES['logo']) embed.set_image(url=image_url) return embed def is_shiny(card): if card['player']['rarity']['value'] >= 5: return True return False async def display_cards( cards: list, team: dict, channel, user, bot=None, pack_cover: str = None, cust_message: str = None, add_roster: bool = True, pack_name: str = None) -> bool: logger.info(f'display_cards called with {len(cards)} cards for team {team.get("abbrev", "Unknown")}') try: cards.sort(key=lambda x: x['player']['rarity']['value']) logger.debug(f'Cards sorted successfully') card_embeds = [await get_card_embeds(x) for x in cards] logger.debug(f'Created {len(card_embeds)} card embeds') page_num = 0 if pack_cover is None else -1 seen_shiny = False logger.debug(f'Initial page_num: {page_num}, pack_cover: {pack_cover is not None}') except Exception as e: logger.error(f'Error in display_cards initialization: {e}', exc_info=True) return False try: view = Pagination([user], timeout=10) # Use simple text arrows instead of emojis to avoid context issues l_emoji = '←' r_emoji = '→' view.left_button.disabled = True view.left_button.label = f'{l_emoji}Prev: -/{len(card_embeds)}' view.cancel_button.label = f'Close Pack' view.right_button.label = f'Next: {page_num + 2}/{len(card_embeds)}{r_emoji}' if len(cards) == 1: view.right_button.disabled = True logger.debug(f'Pagination view created successfully') if pack_cover: logger.debug(f'Sending pack cover message') msg = await channel.send( content=None, embed=image_embed(pack_cover, title=f'{team["lname"]}', desc=pack_name), view=view ) else: logger.debug(f'Sending card embed message for page {page_num}') msg = await channel.send(content=None, embeds=card_embeds[page_num], view=view) logger.debug(f'Initial message sent successfully') except Exception as e: logger.error(f'Error creating view or sending initial message: {e}', exc_info=True) return False try: if cust_message: logger.debug(f'Sending custom message: {cust_message[:50]}...') follow_up = await channel.send(cust_message) else: logger.debug(f'Sending default message for {len(cards)} cards') follow_up = await channel.send(f'{user.mention} you\'ve got {len(cards)} cards here') logger.debug(f'Follow-up message sent successfully') except Exception as e: logger.error(f'Error sending follow-up message: {e}', exc_info=True) return False logger.debug(f'Starting main interaction loop') while True: try: logger.debug(f'Waiting for user interaction on page {page_num}') await view.wait() logger.debug(f'User interaction received: {view.value}') except Exception as e: logger.error(f'Error in view.wait(): {e}', exc_info=True) await msg.edit(view=None) return False if view.value: if view.value == 'cancel': await msg.edit(view=None) if add_roster: await follow_up.edit(content=f'Refresh your cards here: {get_roster_sheet(team)}') return True if view.value == 'left': page_num -= 1 if page_num > 0 else 0 if view.value == 'right': page_num += 1 if page_num < len(card_embeds) - 1 else 0 else: if page_num == len(card_embeds) - 1: await msg.edit(view=None) if add_roster: await follow_up.edit(content=f'Refresh your cards here: {get_roster_sheet(team)}') return True else: page_num += 1 view.value = None try: if is_shiny(cards[page_num]) and not seen_shiny: logger.info(f'Shiny card detected on page {page_num}: {cards[page_num]["player"]["p_name"]}') seen_shiny = True view = Pagination([user], timeout=300) view.cancel_button.style = discord.ButtonStyle.success view.cancel_button.label = 'Flip!' view.left_button.label = '-' view.right_button.label = '-' view.left_button.disabled = True view.right_button.disabled = True # Get MVP image safely with fallback franchise = cards[page_num]["player"]["franchise"] logger.debug(f'Getting MVP image for franchise: {franchise}') mvp_image = IMAGES['mvp'].get(franchise, IMAGES.get('mvp-hype', IMAGES['logo'])) await msg.edit( embed=image_embed( mvp_image, color='56f1fa', author_name=team['lname'], author_icon=team['logo'] ), view=view) logger.debug(f'MVP display updated successfully') except Exception as e: logger.error(f'Error processing shiny card on page {page_num}: {e}', exc_info=True) # Continue with regular flow instead of crashing try: tmp_msg = await channel.send(content=f'<@&1163537676885033010> we\'ve got an MVP!') await follow_up.edit(content=f'<@&1163537676885033010> we\'ve got an MVP!') await tmp_msg.delete() except discord.errors.NotFound: # Role might not exist or message was already deleted await follow_up.edit(content=f'We\'ve got an MVP!') except Exception as e: # Log error but don't crash the function logger.error(f'Error handling MVP notification: {e}') await follow_up.edit(content=f'We\'ve got an MVP!') await view.wait() view = Pagination([user], timeout=10) try: view.right_button.label = f'Next: {page_num + 2}/{len(card_embeds)}{r_emoji}' view.cancel_button.label = f'Close Pack' view.left_button.label = f'{l_emoji}Prev: {page_num}/{len(card_embeds)}' if page_num == 0: view.left_button.label = f'{l_emoji}Prev: -/{len(card_embeds)}' view.left_button.disabled = True elif page_num == len(card_embeds) - 1: view.timeout = 600.0 view.right_button.label = f'Next: -/{len(card_embeds)}{r_emoji}' view.right_button.disabled = True logger.debug(f'Updating message to show page {page_num}/{len(card_embeds)}') if page_num >= len(card_embeds): logger.error(f'Page number {page_num} exceeds card_embeds length {len(card_embeds)}') page_num = len(card_embeds) - 1 await msg.edit(content=None, embeds=card_embeds[page_num], view=view) logger.debug(f'Message updated successfully to page {page_num}') except Exception as e: logger.error(f'Error updating message on page {page_num}: {e}', exc_info=True) # Try to clean up and return try: await msg.edit(view=None) except: pass # If this fails too, just give up return False async def embed_pagination( all_embeds: list, channel, user: discord.Member, custom_message: str = None, timeout: int = 10, start_page: int = 0): if start_page > len(all_embeds) - 1 or start_page < 0: page_num = 0 else: page_num = start_page view = Pagination([user], timeout=timeout) l_emoji = '' r_emoji = '' view.right_button.label = f'Next: {page_num + 2}/{len(all_embeds)}{r_emoji}' view.cancel_button.label = f'Cancel' view.left_button.label = f'{l_emoji}Prev: {page_num}/{len(all_embeds)}' if page_num == 0: view.left_button.label = f'{l_emoji}Prev: -/{len(all_embeds)}' view.left_button.disabled = True elif page_num == len(all_embeds) - 1: view.right_button.label = f'Next: -/{len(all_embeds)}{r_emoji}' view.right_button.disabled = True msg = await channel.send(content=custom_message, embed=all_embeds[page_num], view=view) while True: await view.wait() if view.value: if view.value == 'cancel': await msg.edit(view=None) return True if view.value == 'left': page_num -= 1 if page_num > 0 else 0 if view.value == 'right': page_num += 1 if page_num <= len(all_embeds) else len(all_embeds) else: if page_num == len(all_embeds) - 1: await msg.edit(view=None) return True else: page_num += 1 view.value = None view = Pagination([user], timeout=timeout) view.right_button.label = f'Next: {page_num + 2}/{len(all_embeds)}{r_emoji}' view.cancel_button.label = f'Cancel' view.left_button.label = f'{l_emoji}Prev: {page_num}/{len(all_embeds)}' if page_num == 0: view.left_button.label = f'{l_emoji}Prev: -/{len(all_embeds)}' view.left_button.disabled = True elif page_num == len(all_embeds) - 1: view.timeout = 600.0 view.right_button.label = f'Next: -/{len(all_embeds)}{r_emoji}' view.right_button.disabled = True await msg.edit(content=None, embed=all_embeds[page_num], view=view) async def get_test_pack(ctx, team): pull_notifs = [] this_pack = await db_post('packs/one', payload={ 'team_id': team['id'], 'pack_type_id': 1, 'open_time': int(datetime.datetime.timestamp(datetime.datetime.now())*1000) }) ft_query = await db_get('players/random', params=[('max_rarity', 1), ('limit', 3)]) four_query = await db_get('players/random', params=[('min_rarity', 1), ('max_rarity', 3), ('limit', 1)]) five_query = await db_get('players/random', params=[('min_rarity', 5), ('max_rarity', 5), ('limit', 1)]) first_three = ft_query['players'] fourth = four_query['players'] fifth = five_query['players'] all_cards = [*first_three, *fourth, *fifth] success = await db_post('cards', timeout=10, payload={'cards': [{ 'player_id': x['player_id'], 'team_id': team['id'], 'pack_id': this_pack['id']} for x in all_cards] }) if not success: await ctx.send(f'I was not able to create these cards {get_emoji(ctx, "slight_frown")}') return for x in all_cards: if x['rarity']['value'] >= 3: pull_notifs.append(x) for pull in pull_notifs: await db_post('notifs', payload={ 'created': int(datetime.datetime.timestamp(datetime.datetime.now())*1000), 'title': 'Rare Pull', 'field_name': f'{player_desc(pull)} ({pull["rarity"]["name"]})', 'message': f'Pulled by {team["abbrev"]}', 'about': f'Player-{pull["player_id"]}' }) return [{'player': x, 'team': team} for x in all_cards] async def roll_for_cards(all_packs: list, extra_val=None) -> list: """ Pack odds are calculated based on the pack type Parameters ---------- extra_val all_packs Returns ------- """ all_players = [] team = all_packs[0]['team'] pack_ids = [] for pack in all_packs: counts = { 'Rep': { 'count': 0, 'rarity': 0 }, 'Res': { 'count': 0, 'rarity': 1 }, 'Sta': { 'count': 0, 'rarity': 2 }, 'All': { 'count': 0, 'rarity': 3 }, 'MVP': { 'count': 0, 'rarity': 5 }, 'HoF': { 'count': 0, 'rarity': 8 }, } this_pack_players = [] if pack['pack_type']['name'] == 'Standard': # Cards 1 - 2 for x in range(2): d_1000 = random.randint(1, 1000) if d_1000 <= 450: counts['Rep']['count'] += 1 elif d_1000 <= 900: counts['Res']['count'] += 1 else: counts['Sta']['count'] += 1 # Card 3 d_1000 = random.randint(1, 1000) if d_1000 <= 350: counts['Rep']['count'] += 1 elif d_1000 <= 700: counts['Res']['count'] += 1 elif d_1000 <= 950: counts['Sta']['count'] += 1 else: counts['All']['count'] += 1 # Card 4 d_1000 = random.randint(1, 1000) if d_1000 <= 310: counts['Rep']['count'] += 1 elif d_1000 <= 620: counts['Res']['count'] += 1 elif d_1000 <= 940: counts['Sta']['count'] += 1 elif d_1000 <= 990: counts['All']['count'] += 1 else: counts['MVP']['count'] += 1 # Card 5 d_1000 = random.randint(1, 1000) if d_1000 <= 215: counts['Rep']['count'] += 1 elif d_1000 <= 430: counts['Res']['count'] += 1 elif d_1000 <= 930: counts['Sta']['count'] += 1 elif d_1000 <= 980: counts['All']['count'] += 1 elif d_1000 <= 990: counts['MVP']['count'] += 1 else: counts['HoF']['count'] += 1 elif pack['pack_type']['name'] == 'Premium': # Card 1 d_1000 = random.randint(1, 1000) if d_1000 <= 400: counts['Rep']['count'] += 1 elif d_1000 <= 870: counts['Res']['count'] += 1 elif d_1000 <= 970: counts['Sta']['count'] += 1 elif d_1000 <= 990: counts['All']['count'] += 1 else: counts['MVP']['count'] += 1 # Card 2 d_1000 = random.randint(1, 1000) if d_1000 <= 300: counts['Rep']['count'] += 1 elif d_1000 <= 770: counts['Res']['count'] += 1 elif d_1000 <= 970: counts['Sta']['count'] += 1 elif d_1000 <= 990: counts['All']['count'] += 1 else: counts['MVP']['count'] += 1 # Card 3 d_1000 = random.randint(1, 1000) if d_1000 <= 200: counts['Rep']['count'] += 1 elif d_1000 <= 640: counts['Res']['count'] += 1 elif d_1000 <= 940: counts['Sta']['count'] += 1 elif d_1000 <= 990: counts['All']['count'] += 1 else: counts['MVP']['count'] += 1 # Card 4 d_1000 = random.randint(1, 1000) if d_1000 <= 100: counts['Rep']['count'] += 1 if d_1000 <= 530: counts['Res']['count'] += 1 elif d_1000 <= 930: counts['Sta']['count'] += 1 elif d_1000 <= 980: counts['All']['count'] += 1 elif d_1000 <= 990: counts['MVP']['count'] += 1 else: counts['HoF']['count'] += 1 # Card 5 d_1000 = random.randint(1, 1000) if d_1000 <= 380: counts['Res']['count'] += 1 elif d_1000 <= 880: counts['Sta']['count'] += 1 elif d_1000 <= 980: counts['All']['count'] += 1 elif d_1000 <= 990: counts['MVP']['count'] += 1 else: counts['HoF']['count'] += 1 elif pack['pack_type']['name'] == 'Check-In Player': logger.info(f'Building Check-In Pack // extra_val (type): {extra_val} {type(extra_val)}') # Single Card mod = 0 if isinstance(extra_val, int): mod = extra_val d_1000 = random.randint(1, 1000 + mod) if d_1000 >= 1100: counts['All']['count'] += 1 elif d_1000 >= 1000: counts['Sta']['count'] += 1 elif d_1000 >= 500: counts['Res']['count'] += 1 else: counts['Rep']['count'] += 1 else: raise TypeError(f'Pack type not recognized: {pack["pack_type"]["name"]}') pull_notifs = [] for key in counts: mvp_flag = None if counts[key]['count'] > 0: params = [ ('min_rarity', counts[key]['rarity']), ('max_rarity', counts[key]['rarity']), ('limit', counts[key]['count']) ] if all_packs[0]['pack_team'] is not None: params.extend([('franchise', all_packs[0]['pack_team']['lname']), ('in_packs', True)]) elif all_packs[0]['pack_cardset'] is not None: params.append(('cardset_id', all_packs[0]['pack_cardset']['id'])) else: params.append(('in_packs', True)) pl = await db_get('players/random', params=params) if pl['count'] != counts[key]['count']: mvp_flag = counts[key]['count'] - pl['count'] logging.info(f'Set mvp flag to {mvp_flag} / cardset_id: {all_packs[0]["pack_cardset"]["id"]}') for x in pl['players']: this_pack_players.append(x) all_players.append(x) if x['rarity']['value'] >= 3: pull_notifs.append(x) if mvp_flag and all_packs[0]['pack_cardset']['id'] not in [23]: logging.info(f'Adding {mvp_flag} MVPs for missing cards') pl = await db_get('players/random', params=[('min_rarity', 5), ('limit', mvp_flag)]) for x in pl['players']: this_pack_players.append(x) all_players.append(x) # Add dupes of Replacement/Reserve cards elif mvp_flag: logging.info(f'Adding {mvp_flag} duplicate pokemon cards') for count in range(mvp_flag): logging.info(f'Adding {pl["players"][0]["p_name"]} to the pack') this_pack_players.append(x) all_players.append(pl['players'][0]) success = await db_post( 'cards', payload={'cards': [{ 'player_id': x['player_id'], 'team_id': pack['team']['id'], 'pack_id': pack['id']} for x in this_pack_players] }, timeout=10 ) if not success: raise ConnectionError(f'Failed to create this pack of cards.') await db_patch('packs', object_id=pack['id'], params=[ ('open_time', int(datetime.datetime.timestamp(datetime.datetime.now())*1000)) ]) pack_ids.append(pack['id']) for pull in pull_notifs: logger.info(f'good pull: {pull}') await db_post('notifs', payload={ 'created': int(datetime.datetime.timestamp(datetime.datetime.now())*1000), 'title': 'Rare Pull', 'field_name': f'{player_desc(pull)} ({pull["rarity"]["name"]})', 'message': f'Pulled by {team["abbrev"]}', 'about': f'Player-{pull["player_id"]}' }) return pack_ids async def give_packs(team: dict, num_packs: int, pack_type: dict = None) -> dict: """ Parameters ---------- pack_type team num_packs Returns ------- { 'count': int, 'packs': [ all team packs ] } """ pt_id = pack_type['id'] if pack_type is not None else 1 await db_post( 'packs', payload={'packs': [{'team_id': team['id'], 'pack_type_id': pt_id} for x in range(num_packs)]} ) total_packs = await db_get('packs', params=[ ('team_id', team['id']), ('opened', False) ]) return total_packs def get_sheets(bot): try: return bot.get_cog('Gameplay').sheets except Exception as e: logger.error(f'Could not grab sheets auth: {e}') raise ConnectionError(f'Bot has not authenticated with discord; please try again in 1 minute.') def create_team_sheet(team, email: str, current, bot): sheets = get_sheets(bot) new_sheet = sheets.drive.copy_file( f'{current["gsheet_template"]}', f'{team["lname"]} Roster Sheet v{current["gsheet_version"]}', '1539D0imTMjlUx2VF3NPMt7Sv85sb2XAJ' ) logger.info(f'new_sheet: {new_sheet}') this_sheet = sheets.open_by_key(new_sheet['id']) this_sheet.share(email, role='writer') team_data = this_sheet.worksheet_by_title('Team Data') team_data.update_values( crange='B1:B2', values=[[f'{team["id"]}'], [f'\'{team_hash(team)}']] ) logger.debug(f'this_sheet: {this_sheet}') return this_sheet async def refresh_sheet(team, bot, sheets=None) -> None: return if not sheets: sheets = get_sheets(bot) this_sheet = sheets.open_by_key(team['gsheet']) my_cards = this_sheet.worksheet_by_title('My Cards') all_cards = this_sheet.worksheet_by_title('All Cards') my_cards.update_value('A2', 'FALSE') all_cards.update_value('A2', 'FALSE') await asyncio.sleep(1) my_cards.update_value('A2', 'TRUE') await asyncio.sleep(0.5) all_cards.update_value('A2', 'TRUE') def delete_sheet(team, bot): sheets = get_sheets(bot) this_sheet = sheets.open_by_key(team['gsheet']) this_sheet.delete() def share_sheet(team, email, bot) -> None: sheets = get_sheets(bot) this_sheet = sheets.open_by_key(team['gsheet']) this_sheet.share(email, role='writer') def int_timestamp(datetime_obj: datetime.datetime) -> int: return int(datetime.datetime.timestamp(datetime_obj) * 1000) def get_pos_abbrev(pos_name): if pos_name == 'Catcher': return 'C' elif pos_name == 'First Base': return '1B' elif pos_name == 'Second Base': return '2B' elif pos_name == 'Third Base': return '3B' elif pos_name == 'Shortstop': return 'SS' elif pos_name == 'Left Field': return 'LF' elif pos_name == 'Center Field': return 'CF' elif pos_name == 'Right Field': return 'RF' elif pos_name == 'Pitcher': return 'P' elif pos_name == 'Designated Hitter': return 'DH' elif pos_name == 'Pinch Hitter': return 'PH' else: raise KeyError(f'{pos_name} is not a recognized position name') async def cardset_search(cardset: str, cardset_list: list) -> Optional[dict]: cardset_name = fuzzy_search(cardset, cardset_list) if not cardset_name: return None c_query = await db_get('cardsets', params=[('name', cardset_name)]) if c_query['count'] == 0: return None return c_query['cardsets'][0] def get_blank_team_card(player): return {'player': player, 'team': {'lname': 'Paper Dynasty', 'logo': IMAGES['logo'], 'season': PD_SEASON, 'id': None}} def get_rosters(team, bot, roster_num: Optional[int] = None) -> list: sheets = get_sheets(bot) this_sheet = sheets.open_by_key(team['gsheet']) r_sheet = this_sheet.worksheet_by_title(f'My Rosters') logger.debug(f'this_sheet: {this_sheet} / r_sheet = {r_sheet}') all_rosters = [None, None, None] # Pull roster 1 if not roster_num or roster_num == 1: roster_1 = r_sheet.range('B3:B28') roster_name = r_sheet.cell('F30').value logger.info(f'roster_1: {roster_1}') if not roster_1[0][0].value == '': all_rosters[0] = {'name': roster_name, 'roster_num': 1, 'team_id': team['id'], 'cards': None} all_rosters[0]['cards'] = [int(x[0].value) for x in roster_1] # Pull roster 2 if not roster_num or roster_num == 2: roster_2 = r_sheet.range('B29:B54') roster_name = r_sheet.cell('F31').value logger.info(f'roster_2: {roster_2}') if not roster_2[0][0].value == '': all_rosters[1] = {'name': roster_name, 'roster_num': 2, 'team_id': team['id'], 'cards': None} all_rosters[1]['cards'] = [int(x[0].value) for x in roster_2] # Pull roster 3 if not roster_num or roster_num == 3: roster_3 = r_sheet.range('B55:B80') roster_name = r_sheet.cell('F32').value logger.info(f'roster_3: {roster_3}') if not roster_3[0][0].value == '': all_rosters[2] = {'name': roster_name, 'roster_num': 3, 'team_id': team['id'], 'cards': None} all_rosters[2]['cards'] = [int(x[0].value) for x in roster_3] return all_rosters def get_roster_lineups(team, bot, roster_num, lineup_num) -> list: sheets = get_sheets(bot) logger.debug(f'sheets: {sheets}') this_sheet = sheets.open_by_key(team['gsheet']) logger.debug(f'this_sheet: {this_sheet}') r_sheet = this_sheet.worksheet_by_title('My Rosters') logger.debug(f'r_sheet: {r_sheet}') if lineup_num == 1: row_start = 9 row_end = 17 else: row_start = 18 row_end = 26 if roster_num == 1: l_range = f'H{row_start}:I{row_end}' elif roster_num == 2: l_range = f'J{row_start}:K{row_end}' else: l_range = f'L{row_start}:M{row_end}' logger.debug(f'l_range: {l_range}') raw_cells = r_sheet.range(l_range) logger.debug(f'raw_cells: {raw_cells}') try: lineup_cells = [(row[0].value, int(row[1].value)) for row in raw_cells] except ValueError as e: logger.error(f'Could not pull roster for {team["abbrev"]} due to a ValueError') raise ValueError(f'Uh oh. Looks like your roster might not be saved. I am reading blanks when I try to ' f'get the card IDs') logger.debug(f'lineup_cells: {lineup_cells}') return lineup_cells def post_ratings_guide(team, bot, this_sheet=None): if not this_sheet: sheets = get_sheets(bot) this_sheet = sheets.open_by_key(team['gsheet']) p_guide = this_sheet.worksheet_by_title('Full Guide - Pitchers') b_guide = this_sheet.worksheet_by_title('Full Guide - Batters') p_guide.update_value('A1', RATINGS_PITCHER_FORMULA) b_guide.update_value('A1', RATINGS_BATTER_FORMULA) async def legal_channel(ctx): bad_channels = ['paper-dynasty-chat', 'pd-news-ticker', 'pd-network-news'] if isinstance(ctx, commands.Context): if ctx.channel.name in bad_channels: raise commands.CheckFailure(f'Slide on down to the {get_channel(ctx, "pd-bot-hole").mention} ;)') else: return True elif ctx.channel.name in bad_channels: # await ctx.message.add_reaction('❌') # await ctx.send(f'Slide on down to the {get_channel(ctx, "pd-bot-hole").mention} ;)') # logger.warning(f'{ctx.author.name} posted in illegal channel.') # return False raise discord.app_commands.AppCommandError(f'Slide on down to the {get_channel(ctx, "pd-bot-hole").mention} ;)') else: return True def app_legal_channel(): """Check for slash commands (app_commands). Use as @app_legal_channel()""" async def predicate(interaction: discord.Interaction) -> bool: bad_channels = ['paper-dynasty-chat', 'pd-news-ticker', 'pd-network-news'] if interaction.channel.name in bad_channels: raise discord.app_commands.CheckFailure( f'Slide on down to the {get_channel(interaction, "pd-bot-hole").mention} ;)' ) return True return discord.app_commands.check(predicate) def is_ephemeral_channel(channel) -> bool: """Check if channel requires ephemeral responses (chat channels).""" if not channel or not hasattr(channel, 'name'): return False return channel.name in ['paper-dynasty-chat', 'pd-news-ticker'] def is_restricted_channel(channel) -> bool: """Check if channel is restricted for certain commands (chat/ticker channels).""" if not channel or not hasattr(channel, 'name'): return False return channel.name in ['paper-dynasty-chat', 'pd-news-ticker'] def can_send_message(channel) -> bool: """Check if channel supports sending messages.""" return channel and hasattr(channel, 'send') async def send_safe_message( source: Union[discord.Interaction, commands.Context], content: str = None, *, embeds: List[discord.Embed] = None, view: discord.ui.View = None, ephemeral: bool = False, delete_after: float = None ) -> discord.Message: """ Safely send a message using the most appropriate method based on context. For Interactions: 1. Try edit_original_response() if deferred 2. Try followup.send() if response is done 3. Try channel.send() if channel supports it For Context: 1. Try ctx.send() 2. Try DM to user with context info if channel send fails Args: source: Discord Interaction or Context object content: Message content embeds: List of embeds to send view: UI view to attach ephemeral: Whether message should be ephemeral (Interaction only) delete_after: Seconds after which to delete message Returns: The sent message object Raises: Exception: If all send methods fail """ logger = logging.getLogger('discord_app') # Prepare message kwargs kwargs = {} if content is not None: kwargs['content'] = content if embeds is not None: kwargs['embeds'] = embeds if view is not None: kwargs['view'] = view if delete_after is not None: kwargs['delete_after'] = delete_after # Handle Interaction objects if isinstance(source, discord.Interaction): # Add ephemeral parameter for interactions if ephemeral: kwargs['ephemeral'] = ephemeral # Strategy 1: Try edit_original_response if already deferred if source.response.is_done(): try: # For edit_original_response, we need to handle embeds differently edit_kwargs = kwargs.copy() if 'embeds' in edit_kwargs: # edit_original_response expects 'embeds' parameter pass # Already correct if 'ephemeral' in edit_kwargs: # Can't change ephemeral status on edit del edit_kwargs['ephemeral'] await source.edit_original_response(**edit_kwargs) # edit_original_response doesn't return a message object in the same way # We'll use followup as backup to get a returnable message if 'delete_after' not in kwargs: # Don't create extra messages if auto-deleting return await source.followup.send("Message sent", ephemeral=True, delete_after=0.1) return None # Can't return meaningful message object from edit except Exception as e: logger.debug(f"Failed to edit original response: {e}") # Strategy 2: Try followup.send() try: return await source.followup.send(**kwargs) except Exception as e: logger.debug(f"Failed to send followup message: {e}") # Strategy 3: Try channel.send() if possible if can_send_message(source.channel): try: # Remove ephemeral for channel send (not supported) channel_kwargs = kwargs.copy() if 'ephemeral' in channel_kwargs: del channel_kwargs['ephemeral'] return await source.channel.send(**channel_kwargs) except Exception as e: logger.debug(f"Failed to send channel message: {e}") # All interaction methods failed logger.error(f"All interaction message send methods failed for user {source.user.id}") raise RuntimeError("Unable to send interaction message through any available method") # Handle Context objects elif isinstance(source, commands.Context): # Strategy 1: Try ctx.send() directly try: # Remove ephemeral (not supported in Context) ctx_kwargs = kwargs.copy() if 'ephemeral' in ctx_kwargs: del ctx_kwargs['ephemeral'] return await source.send(**ctx_kwargs) except Exception as e: logger.debug(f"Failed to send context message to channel: {e}") # Strategy 2: Try DM to user with context info try: # Prepare DM with context information channel_name = getattr(source.channel, 'name', 'Unknown Channel') guild_name = getattr(source.guild, 'name', 'Unknown Server') if source.guild else 'DM' dm_content = f"[Bot Response from #{channel_name} in {guild_name}]\n\n" if content: dm_content += content # Send DM with modified content dm_kwargs = kwargs.copy() dm_kwargs['content'] = dm_content if 'ephemeral' in dm_kwargs: del dm_kwargs['ephemeral'] return await source.author.send(**dm_kwargs) except Exception as dm_error: logger.error(f"Failed to send DM fallback to user {source.author.id}: {dm_error}") # Both ctx.send() and DM failed - let the exception bubble up raise dm_error else: raise TypeError(f"Source must be discord.Interaction or commands.Context, got {type(source)}") def get_role(ctx, role_name): return discord.utils.get(ctx.guild.roles, name=role_name) async def team_summary_embed(team, ctx, include_roster: bool = True): embed = get_team_embed(f'{team["lname"]} Overview', team) embed.add_field(name='General Manager', value=team['gmname'], inline=False) embed.add_field(name='Wallet', value=f'{team["wallet"]}₼') # embed.add_field(name='Collection Value', value=team['collection_value']) p_query = await db_get('packs', params=[('team_id', team['id']), ('opened', False)]) if p_query['count'] > 0: all_packs = {} for x in p_query['packs']: if x['pack_type']['name'] not in all_packs: all_packs[x['pack_type']['name']] = 1 else: all_packs[x['pack_type']['name']] += 1 pack_string = '' for pack_type in all_packs: pack_string += f'{pack_type.title()}: {all_packs[pack_type]}\n' else: pack_string = 'None' embed.add_field(name='Unopened Packs', value=pack_string) embed.add_field(name='Team Rating', value=f'{team["ranking"]}') r_query = await db_get(f'results/team/{team["id"]}?season={PD_SEASON}') if r_query: embed.add_field( name='Record', value=f'Ranked: {r_query["ranked_wins"]}-{r_query["ranked_losses"]}\n' f'Unlimited: {r_query["casual_wins"]}-{r_query["casual_losses"]}' ) # try: # r_query = await db_get('rosters', params=[('team_id', team['id'])]) # if r_query['count']: # embed.add_field(name=f'Rosters', value=f'** **', inline=False) # for roster in r_query['rosters']: # roster_string = '' # for i in range(1, 27): # card = roster[f'card_{i}'] # roster_string += f'{card["player"]["description"]} ({card["player"]["pos_1"]})\n' # embed.add_field( # name=f'{roster["name"]} Roster', # value=roster_string if len(roster_string) else "Unknown" # ) # else: # embed.add_field( # name='Rosters', # value='You can set up to three rosters for quick switching from your team sheet.', # inline=False # ) # except Exception as e: # logger.error(f'Could not pull rosters for {team["abbrev"]}') # embed.add_field( # name='Rosters', # value='Unable to pull current rosters. `/pullroster` to sync.', # inline=False # ) if include_roster: embed.add_field(name='Team Sheet', value=get_roster_sheet(team), inline=False) embed.add_field( name='For Help', value=f'`/help-pd` has FAQs; feel free to post questions in ' f'{get_channel(ctx, "paper-dynasty-chat").mention}.', inline=False ) return embed async def give_cards_to_team(team, players: list = None, player_ids: list = None, pack_id=None): if not pack_id: p_query = await db_post( 'packs/one', payload={ 'team_id': team['id'], 'pack_type_id': 4, 'open_time': datetime.datetime.timestamp(datetime.datetime.now()) * 1000} ) pack_id = p_query['id'] if not players and not player_ids: raise ValueError('One of players or player_ids must be provided to distribute cards') if players: await db_post('cards', payload={'cards': [ {'player_id': x['player_id'], 'team_id': team['id'], 'pack_id': pack_id} for x in players ]}, timeout=10) elif player_ids: await db_post('cards', payload={'cards': [ {'player_id': x, 'team_id': team['id'], 'pack_id': pack_id} for x in player_ids ]}, timeout=10) def get_ratings_guide(sheets): this_sheet = sheets.open_by_key(RATINGS_SHEET_KEY) b_sheet = this_sheet.worksheet_by_title('ratings_Batters') p_sheet = this_sheet.worksheet_by_title('ratings_Pitchers') b_data = b_sheet.range('A2:N') p_data = p_sheet.range('A2:N') try: batters = [ { 'player_id': int(x[0].value), 'p_name': x[1].value, 'rating': int(x[2].value), 'contact-r': int(x[3].value), 'contact-l': int(x[4].value), 'power-r': int(x[5].value), 'power-l': int(x[6].value), 'vision': int(x[7].value), 'speed': int(x[8].value), 'stealing': int(x[9].value), 'reaction': int(x[10].value), 'arm': int(x[11].value), 'fielding': int(x[12].value), 'hand': int(x[13].value), } for x in b_data ] pitchers = [ { 'player_id': int(x[0].value), 'p_name': x[1].value, 'rating': int(x[2].value), 'control-r': int(x[3].value), 'control-l': int(x[4].value), 'stuff-r': int(x[5].value), 'stuff-l': int(x[6].value), 'stamina': int(x[7].value), 'fielding': int(x[8].value), 'hit-9': int(x[9].value), 'k-9': int(x[10].value), 'bb-9': int(x[11].value), 'hr-9': int(x[12].value), 'hand': int(x[13].value), } for x in p_data ] except Exception as e: return {'valid': False} return { 'valid': True, 'batter_ratings': batters, 'pitcher_ratings': pitchers } async def paperdex_cardset_embed(team: dict, this_cardset: dict) -> list[discord.Embed]: all_dex = await db_get( 'paperdex', params=[('team_id', team['id']), ('cardset_id', this_cardset['id']), ('flat', True)] ) dex_player_list = [x['player'] for x in all_dex['paperdex']] hof_embed = get_team_embed(f'{team["lname"]} Collection', team=team) mvp_embed = get_team_embed(f'{team["lname"]} Collection', team=team) as_embed = get_team_embed(f'{team["lname"]} Collection', team=team) sta_embed = get_team_embed(f'{team["lname"]} Collection', team=team) res_embed = get_team_embed(f'{team["lname"]} Collection', team=team) rep_embed = get_team_embed(f'{team["lname"]} Collection', team=team) coll_data = { 99: { 'name': 'Hall of Fame', 'owned': 0, 'players': [], 'embeds': [hof_embed] }, 1: { 'name': 'MVP', 'owned': 0, 'players': [], 'embeds': [mvp_embed] }, 2: { 'name': 'All-Star', 'owned': 0, 'players': [], 'embeds': [as_embed] }, 3: { 'name': 'Starter', 'owned': 0, 'players': [], 'embeds': [sta_embed] }, 4: { 'name': 'Reserve', 'owned': 0, 'players': [], 'embeds': [res_embed] }, 5: { 'name': 'Replacement', 'owned': 0, 'players': [], 'embeds': [rep_embed] }, 'total_owned': 0 } set_players = await db_get( 'players', params=[('cardset_id', this_cardset['id']), ('flat', True), ('inc_dex', False)], timeout=5 ) for player in set_players['players']: if player['player_id'] in dex_player_list: coll_data[player['rarity']]['owned'] += 1 coll_data['total_owned'] += 1 player['owned'] = True else: player['owned'] = False logger.debug(f'player: {player} / type: {type(player)}') coll_data[player['rarity']]['players'].append(player) cover_embed = get_team_embed(f'{team["lname"]} Collection', team=team) cover_embed.description = this_cardset['name'] cover_embed.add_field(name='# Total Cards', value=f'{set_players["count"]}') cover_embed.add_field(name='# Collected', value=f'{coll_data["total_owned"]}') display_embeds = [cover_embed] for rarity_id in coll_data: if rarity_id != 'total_owned': if coll_data[rarity_id]['players']: coll_data[rarity_id]['embeds'][0].description = f'Rarity: {coll_data[rarity_id]["name"]}' coll_data[rarity_id]['embeds'][0].add_field( name='# Collected / # Total Cards', value=f'{coll_data[rarity_id]["owned"]} / {len(coll_data[rarity_id]["players"])}', inline=False ) chunk_string = '' for index, this_player in enumerate(coll_data[rarity_id]['players']): logger.debug(f'this_player: {this_player}') chunk_string += '☑ ' if this_player['owned'] else '⬜ ' chunk_string += f'{this_player["p_name"]}\n' if (index + 1) == len(coll_data[rarity_id]["players"]): coll_data[rarity_id]['embeds'][0].add_field( name=f'Group {math.ceil((index + 1) / 20)} / ' f'{math.ceil(len(coll_data[rarity_id]["players"]) / 20)}', value=chunk_string ) elif (index + 1) % 20 == 0: coll_data[rarity_id]['embeds'][0].add_field( name=f'Group {math.floor((index + 1) / 20)} / ' f'{math.ceil(len(coll_data[rarity_id]["players"]) / 20)}', value=chunk_string ) chunk_string = '' display_embeds.append(coll_data[rarity_id]['embeds'][0]) return display_embeds async def paperdex_team_embed(team: dict, mlb_team: dict) -> list[discord.Embed]: all_dex = await db_get( 'paperdex', params=[('team_id', team['id']), ('franchise', mlb_team['lname']), ('flat', True)] ) dex_player_list = [x['player'] for x in all_dex['paperdex']] c_query = await db_get('cardsets') coll_data = {'total_owned': 0} total_players = 0 for x in c_query['cardsets']: set_players = await db_get( 'players', params=[('cardset_id', x['id']), ('franchise', mlb_team['lname']), ('flat', True), ('inc_dex', False)] ) if set_players is not None: coll_data[x['id']] = { 'name': x['name'], 'owned': 0, 'players': [], 'embeds': [get_team_embed(f'{team["lname"]} Collection', team=team)] } total_players += set_players['count'] for player in set_players['players']: if player['player_id'] in dex_player_list: coll_data[x['id']]['owned'] += 1 coll_data['total_owned'] += 1 player['owned'] = True else: player['owned'] = False logger.debug(f'player: {player} / type: {type(player)}') coll_data[x['id']]['players'].append(player) cover_embed = get_team_embed(f'{team["lname"]} Collection', team=team) cover_embed.description = mlb_team['lname'] cover_embed.add_field(name='# Total Cards', value=f'{total_players}') cover_embed.add_field(name='# Collected', value=f'{coll_data["total_owned"]}') display_embeds = [cover_embed] for cardset_id in coll_data: if cardset_id != 'total_owned': if coll_data[cardset_id]['players']: coll_data[cardset_id]['embeds'][0].description = f'{mlb_team["lname"]} / ' \ f'{coll_data[cardset_id]["name"]}' coll_data[cardset_id]['embeds'][0].add_field( name='# Collected / # Total Cards', value=f'{coll_data[cardset_id]["owned"]} / {len(coll_data[cardset_id]["players"])}', inline=False ) chunk_string = '' for index, this_player in enumerate(coll_data[cardset_id]['players']): logger.debug(f'this_player: {this_player}') chunk_string += '☑ ' if this_player['owned'] else '⬜ ' chunk_string += f'{this_player["p_name"]}\n' if (index + 1) == len(coll_data[cardset_id]["players"]): coll_data[cardset_id]['embeds'][0].add_field( name=f'Group {math.ceil((index + 1) / 20)} / ' f'{math.ceil(len(coll_data[cardset_id]["players"]) / 20)}', value=chunk_string ) elif (index + 1) % 20 == 0: coll_data[cardset_id]['embeds'][0].add_field( name=f'Group {math.floor((index + 1) / 20)} / ' f'{math.ceil(len(coll_data[cardset_id]["players"]) / 20)}', value=chunk_string ) chunk_string = '' display_embeds.append(coll_data[cardset_id]['embeds'][0]) return display_embeds def get_pack_cover(pack): if pack['pack_cardset'] is not None and pack['pack_cardset'] == 23: return IMAGES['pack-pkmnbs'] elif pack['pack_type']['name'] in ['Premium', 'MVP']: return IMAGES['pack-pre'] elif pack['pack_type']['name'] == 'Standard': return IMAGES['pack-sta'] elif pack['pack_type']['name'] == 'Mario': return IMAGES['pack-mar'] else: return None async def open_st_pr_packs(all_packs: list, team: dict, context): pack_channel = get_channel(context, 'pack-openings') pack_cover = get_pack_cover(all_packs[0]) if pack_cover is None: pack_channel = context.channel if not pack_channel: raise ValueError(f'I cannot find the pack-openings channel. {get_cal_user(context).mention} - halp?') pack_ids = await roll_for_cards(all_packs) if not pack_ids: logger.error(f'open_packs - unable to roll_for_cards for packs: {all_packs}') raise ValueError(f'I was not able to unpack these cards') all_cards = [] for p_id in pack_ids: new_cards = await db_get('cards', params=[('pack_id', p_id)]) all_cards.extend(new_cards['cards']) if not all_cards: logger.error(f'open_packs - unable to get cards for packs: {pack_ids}') raise ValueError(f'I was not able to display these cards') # Present cards to opening channel if type(context) == commands.Context: author = context.author else: author = context.user await context.channel.send(content=f'Let\'s head down to {pack_channel.mention}!') await display_cards(all_cards, team, pack_channel, author, pack_cover=pack_cover) async def get_choice_from_cards( interaction: discord.Interaction, all_players: list = None, cover_title: str = None, cover_desc: str = None, cover_image_url: str = None, callback=None, temp_message: str = None, conf_message: str = None, delete_message: bool = False): # Display them with pagination, prev/next/select card_embeds = [ await get_card_embeds( {'player': x, 'team': {'lname': 'Paper Dynasty', 'season': PD_SEASON, 'logo': IMAGES['logo']}} ) for x in all_players ] logger.debug(f'card embeds: {card_embeds}') if cover_title is not None and cover_image_url is not None: page_num = 0 view = Pagination([interaction.user], timeout=30) view.left_button.disabled = True view.left_button.label = f'Prev: -/{len(card_embeds)}' view.cancel_button.label = f'Take This Card' view.cancel_button.style = discord.ButtonStyle.success view.cancel_button.disabled = True view.right_button.label = f'Next: 1/{len(card_embeds)}' msg = await interaction.channel.send( content=None, embed=image_embed( image_url=cover_image_url, title=cover_title, desc=cover_desc ), view=view ) else: page_num = 1 view = Pagination([interaction.user], timeout=30) view.left_button.label = f'Prev: -/{len(card_embeds)}' view.left_button.disabled = True view.cancel_button.label = f'Take This Card' view.cancel_button.style = discord.ButtonStyle.success view.right_button.label = f'Next: {page_num + 1}/{len(card_embeds)}' msg = await interaction.channel.send(content=None, embeds=card_embeds[page_num - 1], view=view) if temp_message is not None: temp_msg = await interaction.channel.send(content=temp_message) else: temp_msg = None while True: await view.wait() if view.value: if view.value == 'cancel': await msg.edit(view=None) if callback is not None: callback(all_players[page_num - 1]) if conf_message is not None: if temp_msg is not None: await temp_msg.edit(content=conf_message) else: await interaction.channel.send(content=conf_message) break if view.value == 'left': page_num -= 1 if page_num > 1 else len(card_embeds) if view.value == 'right': page_num += 1 if page_num < len(card_embeds) else 1 else: if page_num == len(card_embeds): page_num = 1 else: page_num += 1 view.value = None view = Pagination([interaction.user], timeout=30) view.left_button.label = f'Prev: {page_num - 1}/{len(card_embeds)}' view.cancel_button.label = f'Take This Card' view.cancel_button.style = discord.ButtonStyle.success view.right_button.label = f'Next: {page_num + 1}/{len(card_embeds)}' if page_num == 1: view.left_button.label = f'Prev: -/{len(card_embeds)}' view.left_button.disabled = True elif page_num == len(card_embeds): view.right_button.label = f'Next: -/{len(card_embeds)}' view.right_button.disabled = True await msg.edit(content=None, embeds=card_embeds[page_num - 1], view=view) if delete_message: await msg.delete() return all_players[page_num - 1] async def open_choice_pack(this_pack, team: dict, context, cardset_id: Optional[int] = None): pack_channel = get_channel(context, 'pack-openings') pack_cover = get_pack_cover(this_pack) pack_type = this_pack['pack_type']['name'] players = [] if pack_type == 'Mario': d1000 = random.randint(1, 1000) if d1000 > 800: rarity_id = 5 elif d1000 > 550: rarity_id = 3 else: rarity_id = 2 pl = await db_get( 'players/random', params=[ ('cardset_id', 8), ('min_rarity', rarity_id), ('max_rarity', rarity_id), ('limit', 4) ] ) players = pl['players'] elif pack_type == 'Team Choice': if this_pack['pack_team'] is None: raise KeyError(f'Team not listed for Team Choice pack') d1000 = random.randint(1, 1000) pack_cover = this_pack['pack_team']['logo'] if d1000 > 800: rarity_id = 5 pack_cover = IMAGES['mvp'][this_pack['pack_team']['lname']] elif d1000 > 550: rarity_id = 3 else: rarity_id = 2 # # HAX FOR SOCC TO GET HIS MVP PACK # if (team['abbrev'] in ['KSK', 'NJY']) and (datetime.datetime.today().day == 24): # rarity_id = 5 min_rarity = rarity_id while len(players) < 4 and rarity_id < 10: params = [ ('min_rarity', min_rarity), ('max_rarity', rarity_id), ('limit', 4 - len(players)), ('franchise', this_pack['pack_team']['lname']) ] # Only apply in_packs filter if no specific cardset is provided if this_pack['pack_team']['abbrev'] not in ['MSS'] and cardset_id is None: params.append(('in_packs', True)) if cardset_id is not None: params.append(('cardset_id', cardset_id)) pl = await db_get( 'players/random', params=params ) if pl['count'] >= 0: for x in pl['players']: if x not in players: players.append(x) if len(players) < 4: min_rarity += 1 rarity_id += 1 elif pack_type == 'Promo Choice': if this_pack['pack_cardset'] is None: raise KeyError(f'Cardset not listed for Promo Choice pack') d1000 = random.randint(1, 1000) pack_cover = IMAGES['mvp-hype'] cardset_id = this_pack['pack_cardset']['id'] rarity_id = 5 if d1000 > 800: rarity_id = 8 while len(players) < 4 and rarity_id < 10: pl = await db_get( 'players/random', params=[('cardset_id', cardset_id), ('min_rarity', rarity_id), ('max_rarity', rarity_id), ('limit', 8)] ) if pl['count'] >= 0: for x in pl['players']: if len(players) >= 4: break if x not in players: players.append(x) if len(players) < 4: cardset_id = LIVE_CARDSET_ID else: # Get 4 MVP cards rarity_id = 5 if pack_type == 'HoF': rarity_id = 8 elif pack_type == 'All Star': rarity_id = 3 min_rarity = rarity_id while len(players) < 4 and rarity_id < 10: params = [ ('min_rarity', min_rarity), ('max_rarity', rarity_id), ('limit', 4) ] # Only apply in_packs filter if no specific cardset is provided if cardset_id is None: params.append(('in_packs', True)) if this_pack['pack_team'] is not None: params.append(('franchise', this_pack['pack_team']['lname'])) if cardset_id is not None: params.append(('cardset_id', cardset_id)) pl = await db_get('players/random', params=params) if pl['count'] > 0: players.extend(pl['players']) if len(players) < 4: rarity_id += 3 if len(players) == 0: logger.error(f'Could not create choice pack') raise ConnectionError(f'Could not create choice pack') if type(context) == commands.Context: author = context.author else: author = context.user logger.info(f'helpers - open_choice_pack - players: {players}') # Display them with pagination, prev/next/select card_embeds = [ await get_card_embeds( # {'player': x, 'team': {'lname': 'Paper Dynasty', 'season': PD_SEASON, 'logo': IMAGES['logo']}} {'player': x, 'team': team} # Show team and dupe info ) for x in players ] logger.debug(f'card embeds: {card_embeds}') page_num = 0 view = Pagination([author], timeout=30) view.left_button.disabled = True view.left_button.label = f'Prev: -/{len(card_embeds)}' view.cancel_button.label = f'Take This Card' view.cancel_button.style = discord.ButtonStyle.success view.cancel_button.disabled = True view.right_button.label = f'Next: 1/{len(card_embeds)}' # React to selection await context.channel.send(f'Let\'s head down to {pack_channel.mention}!') msg = await pack_channel.send( content=None, embed=image_embed(pack_cover, title=f'{team["lname"]}', desc=f'{pack_type} Pack - Choose 1 of 4 {pack_type}s!'), view=view ) if rarity_id >= 5: tmp_msg = await pack_channel.send(content=f'<@&1163537676885033010> we\'ve got an MVP!') else: tmp_msg = await pack_channel.send(content=f'We\'ve got a choice pack here!') while True: await view.wait() if view.value: if view.value == 'cancel': await msg.edit(view=None) try: await give_cards_to_team(team, players=[players[page_num - 1]], pack_id=this_pack['id']) except Exception as e: logger.error(f'failed to create cards: {e}') raise ConnectionError(f'Failed to distribute these cards.') await db_patch('packs', object_id=this_pack['id'], params=[ ('open_time', int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)) ]) await tmp_msg.edit( content=f'{players[page_num - 1]["p_name"]} has been added to the ' f'**{team["sname"]}** binder!' ) break if view.value == 'left': page_num -= 1 if page_num > 1 else len(card_embeds) if view.value == 'right': page_num += 1 if page_num < len(card_embeds) else 1 else: if page_num == len(card_embeds): page_num = 1 else: page_num += 1 view.value = None view = Pagination([author], timeout=30) view.left_button.label = f'Prev: {page_num - 1}/{len(card_embeds)}' view.cancel_button.label = f'Take This Card' view.cancel_button.style = discord.ButtonStyle.success view.right_button.label = f'Next: {page_num + 1}/{len(card_embeds)}' if page_num == 1: view.left_button.label = f'Prev: -/{len(card_embeds)}' view.left_button.disabled = True elif page_num == len(card_embeds): view.right_button.label = f'Next: -/{len(card_embeds)}' view.right_button.disabled = True await msg.edit(content=None, embeds=card_embeds[page_num - 1], view=view) async def confirm_pack_purchase(interaction, owner_team, num_packs, total_cost, pack_embed): view = Confirm(responders=[interaction.user], timeout=30) await interaction.channel.send( content=None, embed=pack_embed ) question = await interaction.channel.send( content=f'Your Wallet: {owner_team["wallet"]}₼\n' f'Pack{"s" if num_packs > 1 else ""} Price: {total_cost}₼\n' f'After Purchase: {owner_team["wallet"] - total_cost}₼\n\n' f'Would you like to make this purchase?', view=view ) await view.wait() if not view.value: await question.edit( content='Saving that money. Smart.', view=None ) return None else: return question def player_desc(this_player) -> str: if this_player['p_name'] in this_player['description']: return this_player['description'] return f'{this_player["description"]} {this_player["p_name"]}' def player_pcard(this_player): if this_player['image'] is not None and 'pitching' in this_player['image']: return this_player['image'] elif this_player['image2'] is not None and 'pitching' in this_player['image2']: return this_player['image2'] else: return this_player['image'] def player_bcard(this_player): if this_player['image'] is not None and 'batting' in this_player['image']: return this_player['image'] elif this_player['image2'] is not None and 'batting' in this_player['image2']: return this_player['image2'] # elif this_player['image'] is not None and 'pitching' in this_player['image']: # return PITCHER_BATTING_CARD else: return this_player['image']