- Add APITimeoutError exception and retry logic to db_get - Add timeout handling to db_post, db_put, db_patch, db_delete - Fix get_team_by_owner to prefer non-gauntlet team (PostgreSQL migration fix) - Code formatting cleanup (black)
2154 lines
76 KiB
Python
2154 lines
76 KiB
Python
import asyncio
import datetime
import logging
import math
import os
import random
import traceback
from urllib.parse import quote

import discord
import pygsheets
import requests
from discord.ext import commands
from api_calls import *

from bs4 import BeautifulSoup
from difflib import get_close_matches
from dataclasses import dataclass
from typing import Optional, Literal, Union, List

from exceptions import log_exception
from in_game.gameplay_models import Team
from constants import *
from discord_ui import *
from random_content import *
from utils import (
    position_name_to_abbrev,
    user_has_role,
    get_roster_sheet_legacy,
    get_roster_sheet,
    get_player_url,
    owner_only,
    get_cal_user,
    get_context_user,
)
from search_utils import *
from discord_utils import *
|
|
|
|
|
|
async def get_player_photo(player):
    """Look up a player thumbnail on TheSportsDB and store it as the headshot.

    The search uses the player's ``bbref_id`` when it is set and falls back to
    ``p_name`` otherwise. Only a first search hit whose ``strSport`` is
    ``"Baseball"`` is accepted.

    Args:
        player: Mapping providing ``bbref_id``, ``p_name`` and ``player_id``.

    Returns:
        The thumbnail URL that was written to the player's ``headshot`` field,
        or ``None`` when the request fails, nothing matches, or the match has
        no thumbnail.
    """
    search_term = player["bbref_id"] if player["bbref_id"] else player["p_name"]
    # Percent-encode the term so names containing '&', '#' or spaces cannot
    # break the query string.
    req_url = (
        "https://www.thesportsdb.com/api/v1/json/1/searchplayers.php"
        f"?p={quote(str(search_term))}"
    )

    # NOTE(review): requests.get is a blocking call inside a coroutine; the
    # short timeout bounds the stall, but an executor would avoid it entirely.
    try:
        resp = requests.get(req_url, timeout=0.5)
    except Exception:
        return None
    if resp.status_code != 200:
        return None

    # Parse the body once; a non-JSON or unexpectedly shaped body is a miss.
    try:
        matches = resp.json().get("player")
    except (ValueError, AttributeError):
        return None
    if not matches:
        return None

    top_hit = matches[0]
    if top_hit.get("strSport") != "Baseball":
        return None

    thumb = top_hit.get("strThumb")
    if not thumb:
        # Do not overwrite the stored headshot with an empty value.
        return None

    await db_patch(
        "players",
        object_id=player["player_id"],
        params=[("headshot", thumb)],
    )
    return thumb
|
|
|
|
|
|
async def get_player_headshot(player):
    """Find a headshot URL for a player and store it on the player record.

    Baseball-Reference's search page is fetched and scanned for the first
    ``<img>`` whose ``src`` contains ``"headshot"``. When none is found, or the
    lookup fails, the result of ``get_player_photo`` is returned instead.

    Args:
        player: Mapping providing ``bbref_id``, ``p_name`` and ``player_id``.

    Returns:
        The headshot URL, or whatever ``get_player_photo`` returns.
    """
    search_term = player["bbref_id"] if player["bbref_id"] else player["p_name"]
    # Percent-encode the term so player names cannot break the query string.
    req_url = (
        "https://www.baseball-reference.com/search/search.fcgi"
        f"?search={quote(str(search_term))}"
    )

    try:
        # NOTE(review): blocking HTTP call inside a coroutine (bounded by timeout).
        resp = requests.get(req_url, timeout=2).text
        soup = BeautifulSoup(resp, "html.parser")
        for item in soup.find_all("img"):
            # Some <img> tags carry no src attribute; skip them instead of
            # letting a KeyError abort the scan before the headshot is reached.
            src = item.get("src") or ""
            if "headshot" in src:
                await db_patch(
                    "players",
                    object_id=player["player_id"],
                    params=[("headshot", src)],
                )
                return src
    except Exception as e:
        # Best effort: record why the scrape failed, then use the fallback.
        # Catching Exception (not a bare except) lets task cancellation through.
        logging.warning(f"Headshot scrape failed for {search_term}: {e}")
    return await get_player_photo(player)
|
|
|
|
|
|
"""
|
|
NEW FOR SEASON 4
|
|
"""
|
|
|
|
|
|
async def get_team_by_owner(owner_id: int):
    """Return the team managed by ``owner_id``, preferring a non-gauntlet team.

    Returns ``None`` when the teams query reports a zero count. Otherwise the
    first team whose abbreviation does not contain "gauntlet" (case-insensitive)
    is returned, falling back to the first team when every one is a gauntlet team.
    """
    result = await db_get("teams", params=[("gm_id", owner_id)])
    if not result["count"]:
        return None

    owned = result["teams"]
    main_team = next(
        (
            candidate
            for candidate in owned
            if "gauntlet" not in candidate["abbrev"].lower()
        ),
        None,
    )
    if main_team is not None:
        return main_team

    # Only gauntlet teams exist for this owner
    return owned[0]
|
|
|
|
|
|
async def team_role(ctx, team: Team):
    """Get or create the role named "<abbrev> - <lname>" for ``team``."""
    role_name = f"{team.abbrev} - {team.lname}"
    return await get_or_create_role(ctx, role_name)
|
|
|
|
|
|
def get_all_pos(player):
    """Collect the truthy values of ``pos_1`` .. ``pos_7`` in slot order."""
    # NOTE(review): only slots 1-7 are read; confirm no pos_8 slot is expected.
    slot_keys = (f"pos_{slot}" for slot in range(1, 8))
    return [player[key] for key in slot_keys if player[key]]
|
|
|
|
|
|
async def share_channel(channel, user, read_only=False):
    """Grant ``user`` read access to ``channel``; send access unless ``read_only``."""
    may_send = not read_only
    await channel.set_permissions(user, read_messages=True, send_messages=may_send)
|
|
|
|
|
|
async def get_card_embeds(card, include_stats=False) -> list:
    """Build the Discord embed(s) that display a single card.

    Args:
        card: Mapping with a ``"player"`` entry and a ``"team"`` entry. The
            keys read from each are the ones indexed in the body below.
        include_stats: When true, the player's batting and pitching lines for
            ``PD_SEASON`` are fetched and appended as extra fields.

    Returns:
        ``[embed]``, or ``[embed, card_two]`` when the player has an ``image2``.
    """
    player = card["player"]
    team = card["team"]
    rarity_color = int(player["rarity"]["color"], 16)
    footer_text = f"Paper Dynasty Season {team['season']}"

    embed = discord.Embed(title=f"{player['p_name']}", color=rarity_color)
    # embed.description = card['team']['lname']
    embed.description = f"{player['cardset']['name']} / {player['mlbclub']}"
    embed.set_author(name=team["lname"], url=IMAGES["logo"], icon_url=team["logo"])
    embed.set_footer(text=footer_text, icon_url=IMAGES["logo"])

    if include_stats:
        b_query = await db_get(
            "plays/batting",
            params=[("player_id", player["player_id"]), ("season", PD_SEASON)],
        )
        p_query = await db_get(
            "plays/pitching",
            params=[("player_id", player["player_id"]), ("season", PD_SEASON)],
        )

    embed.add_field(name="Player ID", value=f"{player['player_id']}")
    embed.add_field(name="Rarity", value=f"{player['rarity']['name']}")
    embed.add_field(name="Cost", value=f"{player['cost']}₼")

    pos_string = ", ".join(get_all_pos(player))
    embed.add_field(name="Positions", value=pos_string)

    # all_dex = card['player']['paperdex']
    all_dex = await db_get(
        "paperdex", params=[("player_id", player["player_id"]), ("flat", True)]
    )
    count = all_dex["count"]
    if team["lname"] != "Paper Dynasty":
        team_id = team.get("id", None)
        collected_by_team = any(
            elem["team"] == team_id for elem in all_dex["paperdex"]
        )
        if collected_by_team:
            if count == 1:
                coll_string = "Only you"
            else:
                coll_string = (
                    f"You and {count - 1} other{'s' if count - 1 != 1 else ''}"
                )
        elif count:
            coll_string = f"{count} other team{'s' if count != 1 else ''}"
        else:
            coll_string = "0 teams"
        embed.add_field(name="Collected By", value=coll_string)
    else:
        embed.add_field(
            name="Collected By", value=f"{count} team{'s' if count != 1 else ''}"
        )

    # TODO: check for dupes with the included paperdex data
    # if card['team']['lname'] != 'Paper Dynasty':
    #     team_dex = await db_get('cards', params=[("player_id", card["player"]["player_id"]), ('team_id', card['team']['id'])])
    #     count = 1 if not team_dex['count'] else team_dex['count']
    #     embed.add_field(name='# Dupes', value=f'{count - 1} dupe{"s" if count - 1 != 1 else ""}')

    # embed.add_field(name='Team', value=f'{card["player"]["mlbclub"]}')
    bbref_id = player["bbref_id"]
    if player["franchise"] == "Pokemon":
        player_pages = f"[Pkmn]({PKMN_REF_URL}{bbref_id})"
    elif bbref_id:
        player_pages = f"[BBRef](https://www.baseball-reference.com/players/{bbref_id[0]}/{bbref_id}.shtml)"
    else:
        # A missing bbref_id used to crash on bbref_id[0]; keep the field so
        # the embed layout stays stable, but show a placeholder.
        player_pages = "N/A"
    embed.add_field(name="Player Page", value=f"{player_pages}")
    embed.set_image(url=player["image"])

    headshot = (
        player["headshot"]
        if player["headshot"]
        else await get_player_headshot(player)
    )
    if headshot:
        embed.set_thumbnail(url=headshot)
    else:
        embed.set_thumbnail(url=IMAGES["logo"])

    if player["franchise"] == "Pokemon":
        # For Pokemon cards fangr_id / strat_code are used as the ids of the
        # next and previous evolution.
        if player["fangr_id"] is not None:
            try:
                evo_mon = await db_get(
                    "players", object_id=player["fangr_id"], none_okay=True
                )
                if evo_mon is not None:
                    embed.add_field(name="Evolves Into", value=f"{evo_mon['p_name']}")
            except Exception as e:
                # The f-prefix was missing, so the literal "{e}" was logged.
                logging.error(
                    f"could not pull evolution: {e}", exc_info=True, stack_info=True
                )
        strat_code = player["strat_code"]
        # NOTE(review): "420420" appears to mark "no previous evolution" - confirm.
        if strat_code is not None and "420420" not in strat_code:
            try:
                evo_mon = await db_get(
                    "players", object_id=strat_code, none_okay=True
                )
                if evo_mon is not None:
                    embed.add_field(name="Evolves From", value=f"{evo_mon['p_name']}")
            except Exception as e:
                logging.error(
                    f"could not pull evolution: {e}", exc_info=True, stack_info=True
                )

    if include_stats:
        # NOTE(review): header spacing below is reproduced as found in the
        # source; verify column alignment against the rendered embed.
        if b_query["count"] > 0:
            b = b_query["stats"][0]

            re24 = f"{b['re24']:.2f}"
            batting_string = (
                f"```\n"
                f" AVG OBP SLG\n"
                f" {b['avg']:.3f} {b['obp']:.3f} {b['slg']:.3f}\n``````\n"
                f" OPS wOBA RE24\n"
                f" {b['ops']:.3f} {b['woba']:.3f} {re24: ^5}\n``````\n"
                f" PA H RBI 2B 3B HR SB\n"
                f"{b['pa']: >3} {b['hit']: ^3} {b['rbi']: ^3} {b['double']: >2} {b['triple']: >2} "
                f"{b['hr']: >2} {b['sb']: >2}```\n"
            )
            embed.add_field(name="Batting Stats", value=batting_string, inline=False)
        if p_query["count"] > 0:
            p = p_query["stats"][0]

            # Innings pitched in baseball notation: whole innings plus .1/.2
            # for the leftover outs.
            ip_whole = math.floor(p["outs"] / 3)
            ip_denom = p["outs"] % 3
            ips = ip_whole + (ip_denom * 0.1)

            kpbb = f"{p['k/bb']:.1f}"
            era = f"{p['era']:.2f}"
            whip = f"{p['whip']:.2f}"
            re24 = f"{p['re24']:.2f}"

            pitching_string = (
                f"```\n"
                f" W-L SV ERA WHIP\n"
                f"{p['win']: >2}-{p['loss']: <2} {p['save']: >2} {era: >5} {whip: >4}\n``````\n"
                f" IP SO K/BB RE24\n"
                f"{ips: >5} {p['so']: ^3} {kpbb: ^4} {re24: ^5}\n```"
            )
            embed.add_field(name="Pitching Stats", value=pitching_string, inline=False)

    if not player["image2"]:
        return [embed]

    # Two-sided card: the second embed carries only the alternate image.
    card_two = discord.Embed(color=rarity_color)
    card_two.set_footer(text=footer_text, icon_url=IMAGES["logo"])
    card_two.set_image(url=player["image2"])

    return [embed, card_two]
|
|
|
|
|
|
def image_embed(
    image_url: str,
    title: str = None,
    color: str = None,
    desc: str = None,
    author_name: str = None,
    author_icon: str = None,
):
    """Create an embed whose main content is the image at ``image_url``.

    ``color`` is a hex string and defaults to ``SBA_COLOR``. Title, description
    and author are applied only when given; the author icon falls back to the
    logo image. The footer always names the current ``PD_SEASON``.
    """
    hex_color = SBA_COLOR if color is None else color
    embed = discord.Embed(color=int(hex_color, 16))

    if title is not None:
        embed.title = title
    if desc is not None:
        embed.description = desc
    if author_name is not None:
        embed.set_author(
            name=author_name,
            icon_url=IMAGES["logo"] if author_icon is None else author_icon,
        )

    embed.set_footer(text=f"Paper Dynasty Season {PD_SEASON}", icon_url=IMAGES["logo"])
    embed.set_image(url=image_url)
    return embed
|
|
|
|
|
|
def is_shiny(card):
    """Return True when the card's player has a rarity value of 5 or more."""
    rarity_value = card["player"]["rarity"]["value"]
    return True if rarity_value >= 5 else False
|
|
|
|
|
|
def _card_page_view(user, page_num: int, total: int, l_emoji: str, r_emoji: str):
    """Build the Prev / Close Pack / Next view for the card at ``page_num``."""
    view = Pagination([user], timeout=10)
    view.right_button.label = f"Next: {page_num + 2}/{total}{r_emoji}"
    view.cancel_button.label = "Close Pack"
    view.left_button.label = f"{l_emoji}Prev: {page_num}/{total}"
    if page_num == 0:
        view.left_button.label = f"{l_emoji}Prev: -/{total}"
        view.left_button.disabled = True
    # Independent of the check above: with a single card page 0 is also the
    # last page, and Next must be disabled there too.
    if page_num == total - 1:
        # Give the user longer on the final card before the view times out.
        view.timeout = 600.0
        view.right_button.label = f"Next: -/{total}{r_emoji}"
        view.right_button.disabled = True
    return view


async def _finish_pack_display(msg, follow_up, team, add_roster: bool) -> None:
    """Strip the buttons and, if requested, point the user at the roster sheet."""
    await msg.edit(view=None)
    if add_roster:
        await follow_up.edit(
            content=f"Refresh your cards here: {get_roster_sheet(team)}"
        )


async def _announce_mvp(channel, follow_up) -> None:
    """Ping the MVP role with a throwaway message and update the follow-up."""
    mvp_ping = "<@&1163537676885033010> we've got an MVP!"
    try:
        tmp_msg = await channel.send(content=mvp_ping)
        await follow_up.edit(content=mvp_ping)
        await tmp_msg.delete()
    except discord.errors.NotFound:
        # Role might not exist or message was already deleted
        await follow_up.edit(content="We've got an MVP!")
    except Exception as e:
        # Log error but don't crash the function
        logger.error(f"Error handling MVP notification: {e}")
        await follow_up.edit(content="We've got an MVP!")


async def display_cards(
    cards: list,
    team: dict,
    channel,
    user,
    bot=None,
    pack_cover: str = None,
    cust_message: str = None,
    add_roster: bool = True,
    pack_name: str = None,
) -> bool:
    """Show a pack of cards to ``user`` as a paginated message in ``channel``.

    ``cards`` is sorted in place by rarity value, one embed list is built per
    card, and the user pages through them with a ``Pagination`` view. A timeout
    advances to the next card; a timeout on the last card closes the pack. The
    first card for which ``is_shiny`` is true is preceded by an MVP image that
    stays up until the user presses "Flip!" (or that view times out).

    Args:
        cards: Card mappings as accepted by ``get_card_embeds``.
        team: Team mapping; ``lname`` and ``logo`` are read, and it is passed
            to ``get_roster_sheet``.
        channel: Destination exposing ``send``.
        user: The only user allowed to use the buttons.
        bot: Unused.
        pack_cover: Optional cover image URL shown before the first card.
        cust_message: Optional replacement for the default follow-up message.
        add_roster: When true, the follow-up is edited to the roster sheet
            link once the pack is closed.
        pack_name: Description shown on the cover embed.

    Returns:
        True when the pack was closed normally, False when setup or a message
        update failed (the error is logged).
    """
    logger.info(
        f"display_cards called with {len(cards)} cards for team {team.get('abbrev', 'Unknown')}"
    )
    has_cover = bool(pack_cover)
    try:
        cards.sort(key=lambda x: x["player"]["rarity"]["value"])
        logger.debug("Cards sorted successfully")

        card_embeds = [await get_card_embeds(x) for x in cards]
        logger.debug(f"Created {len(card_embeds)} card embeds")

        # The cover, when present, sits one page before the first card.
        page_num = -1 if has_cover else 0
        seen_shiny = False
        logger.debug(f"Initial page_num: {page_num}, pack_cover: {has_cover}")
    except Exception as e:
        logger.error(f"Error in display_cards initialization: {e}", exc_info=True)
        return False

    total = len(card_embeds)
    # Use simple text arrows instead of emojis to avoid context issues
    l_emoji = "←"
    r_emoji = "→"

    try:
        view = Pagination([user], timeout=10)
        view.left_button.disabled = True
        view.left_button.label = f"{l_emoji}Prev: -/{total}"
        view.cancel_button.label = "Close Pack"
        view.right_button.label = f"Next: {page_num + 2}/{total}{r_emoji}"
        # Only lock Next when the single card is already on screen; on a cover
        # page the user still needs it to reach that card.
        if len(cards) == 1 and not has_cover:
            view.right_button.disabled = True

        logger.debug("Pagination view created successfully")

        if has_cover:
            logger.debug("Sending pack cover message")
            msg = await channel.send(
                content=None,
                embed=image_embed(pack_cover, title=f"{team['lname']}", desc=pack_name),
                view=view,
            )
        else:
            logger.debug(f"Sending card embed message for page {page_num}")
            msg = await channel.send(
                content=None, embeds=card_embeds[page_num], view=view
            )

        logger.debug("Initial message sent successfully")
    except Exception as e:
        logger.error(
            f"Error creating view or sending initial message: {e}", exc_info=True
        )
        return False

    try:
        if cust_message:
            logger.debug(f"Sending custom message: {cust_message[:50]}...")
            follow_up = await channel.send(cust_message)
        else:
            logger.debug(f"Sending default message for {len(cards)} cards")
            follow_up = await channel.send(
                f"{user.mention} you've got {len(cards)} cards here"
            )

        logger.debug("Follow-up message sent successfully")
    except Exception as e:
        logger.error(f"Error sending follow-up message: {e}", exc_info=True)
        return False

    logger.debug("Starting main interaction loop")
    while True:
        try:
            logger.debug(f"Waiting for user interaction on page {page_num}")
            await view.wait()
            logger.debug(f"User interaction received: {view.value}")
        except Exception as e:
            logger.error(f"Error in view.wait(): {e}", exc_info=True)
            await msg.edit(view=None)
            return False

        if view.value:
            if view.value == "cancel":
                await _finish_pack_display(msg, follow_up, team, add_roster)
                return True
            if view.value == "left":
                page_num -= 1 if page_num > 0 else 0
            if view.value == "right":
                page_num += 1 if page_num < total - 1 else 0
        else:
            # Timed out: auto-advance, or close the pack on the last card.
            if page_num == total - 1:
                await _finish_pack_display(msg, follow_up, team, add_roster)
                return True
            page_num += 1

        view.value = None

        try:
            if is_shiny(cards[page_num]) and not seen_shiny:
                logger.info(
                    f"Shiny card detected on page {page_num}: {cards[page_num]['player']['p_name']}"
                )
                seen_shiny = True
                flip_view = Pagination([user], timeout=300)
                flip_view.cancel_button.style = discord.ButtonStyle.success
                flip_view.cancel_button.label = "Flip!"
                flip_view.left_button.label = "-"
                flip_view.right_button.label = "-"
                flip_view.left_button.disabled = True
                flip_view.right_button.disabled = True

                # Get MVP image safely with fallback
                franchise = cards[page_num]["player"]["franchise"]
                logger.debug(f"Getting MVP image for franchise: {franchise}")
                mvp_image = IMAGES["mvp"].get(
                    franchise, IMAGES.get("mvp-hype", IMAGES["logo"])
                )

                await msg.edit(
                    embed=image_embed(
                        mvp_image,
                        color="56f1fa",
                        author_name=team["lname"],
                        author_icon=team["logo"],
                    ),
                    view=flip_view,
                )
                logger.debug("MVP display updated successfully")

                # The announcement and the wait belong to the reveal itself.
                # They previously sat after the error handler, so the hype
                # image was replaced by the card immediately.
                await _announce_mvp(channel, follow_up)
                await flip_view.wait()
        except Exception as e:
            logger.error(
                f"Error processing shiny card on page {page_num}: {e}", exc_info=True
            )
            # Continue with regular flow instead of crashing

        try:
            if page_num >= total:
                logger.error(
                    f"Page number {page_num} exceeds card_embeds length {total}"
                )
                page_num = total - 1

            view = _card_page_view(user, page_num, total, l_emoji, r_emoji)

            logger.debug(f"Updating message to show page {page_num}/{total}")
            await msg.edit(content=None, embeds=card_embeds[page_num], view=view)
            logger.debug(f"Message updated successfully to page {page_num}")
        except Exception as e:
            logger.error(
                f"Error updating message on page {page_num}: {e}", exc_info=True
            )
            # Try to clean up and return
            try:
                await msg.edit(view=None)
            except Exception:
                pass  # If this fails too, just give up
            return False
|
|
|
|
|
|
def _embed_page_view(user, page_num: int, total: int, timeout, linger_on_last: bool):
    """Build the Prev / Cancel / Next view for page ``page_num`` of ``total``."""
    view = Pagination([user], timeout=timeout)
    view.right_button.label = f"Next: {page_num + 2}/{total}"
    view.cancel_button.label = "Cancel"
    view.left_button.label = f"Prev: {page_num}/{total}"
    if page_num == 0:
        view.left_button.label = f"Prev: -/{total}"
        view.left_button.disabled = True
    # Not an elif: with a single embed page 0 is also the last page.
    if page_num == total - 1:
        if linger_on_last:
            view.timeout = 600.0
        view.right_button.label = f"Next: -/{total}"
        view.right_button.disabled = True
    return view


async def embed_pagination(
    all_embeds: list,
    channel,
    user: discord.Member,
    custom_message: str = None,
    timeout: int = 10,
    start_page: int = 0,
):
    """Send ``all_embeds`` as one message that ``user`` can page through.

    A button press moves between pages. A timeout advances one page, and a
    timeout on the last page (or pressing Cancel) removes the buttons and ends
    the loop.

    Args:
        all_embeds: Embeds to show, one per page.
        channel: Destination exposing ``send``.
        user: The only member allowed to use the buttons.
        custom_message: Message content sent with the first page. Later page
            updates clear the content.
        timeout: Seconds before a view times out. Pages reached after the
            first interaction use 600 seconds on the last page.
        start_page: Initial page index; out-of-range values fall back to 0.

    Returns:
        True once pagination has finished.
    """
    total = len(all_embeds)
    page_num = start_page if 0 <= start_page <= total - 1 else 0

    view = _embed_page_view(user, page_num, total, timeout, linger_on_last=False)
    msg = await channel.send(
        content=custom_message, embed=all_embeds[page_num], view=view
    )

    while True:
        await view.wait()

        if view.value:
            if view.value == "cancel":
                await msg.edit(view=None)
                return True
            if view.value == "left":
                page_num -= 1 if page_num > 0 else 0
            if view.value == "right":
                # Clamp at the last valid index; the old bound
                # (page_num <= len(all_embeds)) let the index run off the end.
                page_num += 1 if page_num < total - 1 else 0
        else:
            if page_num == total - 1:
                await msg.edit(view=None)
                return True
            page_num += 1

        view.value = None

        view = _embed_page_view(user, page_num, total, timeout, linger_on_last=True)
        await msg.edit(content=None, embed=all_embeds[page_num], view=view)
|
|
|
|
|
|
async def get_test_pack(ctx, team):
    """Create a five-card test pack for ``team`` and return its cards.

    Posts a new pack of type 1, draws three players capped at rarity 1, one in
    rarity 1-3 and one at rarity 5, stores them as cards, and posts a "Rare
    Pull" notification for every drawn player with rarity value 3 or more.

    Returns:
        A list of ``{"player": ..., "team": team}`` mappings, or ``None`` after
        telling ``ctx`` that the cards could not be created.
    """
    this_pack = await db_post(
        "packs/one",
        payload={
            "team_id": team["id"],
            "pack_type_id": 1,
            "open_time": int(
                datetime.datetime.timestamp(datetime.datetime.now()) * 1000
            ),
        },
    )

    ft_query = await db_get("players/random", params=[("max_rarity", 1), ("limit", 3)])
    four_query = await db_get(
        "players/random", params=[("min_rarity", 1), ("max_rarity", 3), ("limit", 1)]
    )
    five_query = await db_get(
        "players/random", params=[("min_rarity", 5), ("max_rarity", 5), ("limit", 1)]
    )
    all_cards = [
        *ft_query["players"],
        *four_query["players"],
        *five_query["players"],
    ]

    new_cards = [
        {
            "player_id": drawn["player_id"],
            "team_id": team["id"],
            "pack_id": this_pack["id"],
        }
        for drawn in all_cards
    ]
    success = await db_post("cards", timeout=10, payload={"cards": new_cards})
    if not success:
        await ctx.send(
            f"I was not able to create these cards {get_emoji(ctx, 'slight_frown')}"
        )
        return

    pull_notifs = [drawn for drawn in all_cards if drawn["rarity"]["value"] >= 3]
    for pull in pull_notifs:
        await db_post(
            "notifs",
            payload={
                "created": int(
                    datetime.datetime.timestamp(datetime.datetime.now()) * 1000
                ),
                "title": "Rare Pull",
                "field_name": f"{player_desc(pull)} ({pull['rarity']['name']})",
                "message": f"Pulled by {team['abbrev']}",
                "about": f"Player-{pull['player_id']}",
            },
        )

    return [{"player": drawn, "team": team} for drawn in all_cards]
|
|
|
|
|
|
# Rarity value requested from the API for each tier, in fetch order.
_PACK_TIER_RARITY = {
    "Rep": 0,
    "Res": 1,
    "Sta": 2,
    "All": 3,
    "MVP": 5,
    "HoF": 8,
}

# Per-card odds on a d1000 roll: each entry is (inclusive upper bound, tier),
# checked in order. One inner tuple per card slot in the pack.
_PACK_CARD_ODDS = {
    "Standard": (
        ((450, "Rep"), (900, "Res"), (1000, "Sta")),
        ((450, "Rep"), (900, "Res"), (1000, "Sta")),
        ((350, "Rep"), (700, "Res"), (950, "Sta"), (1000, "All")),
        ((310, "Rep"), (620, "Res"), (940, "Sta"), (990, "All"), (1000, "MVP")),
        (
            (215, "Rep"),
            (430, "Res"),
            (930, "Sta"),
            (980, "All"),
            (990, "MVP"),
            (1000, "HoF"),
        ),
    ),
    "Premium": (
        ((400, "Rep"), (870, "Res"), (970, "Sta"), (990, "All"), (1000, "MVP")),
        ((300, "Rep"), (770, "Res"), (970, "Sta"), (990, "All"), (1000, "MVP")),
        ((200, "Rep"), (640, "Res"), (940, "Sta"), (990, "All"), (1000, "MVP")),
        (
            (100, "Rep"),
            (530, "Res"),
            (930, "Sta"),
            (980, "All"),
            (990, "MVP"),
            (1000, "HoF"),
        ),
        ((380, "Res"), (880, "Sta"), (980, "All"), (990, "MVP"), (1000, "HoF")),
    ),
}


def _roll_tier(odds) -> str:
    """Roll a d1000 and return the first tier whose upper bound covers it."""
    d_1000 = random.randint(1, 1000)
    for upper_bound, tier in odds:
        if d_1000 <= upper_bound:
            return tier
    return odds[-1][1]


def _roll_check_in_tier(extra_val) -> str:
    """Roll the single Check-In card; an int ``extra_val`` raises the ceiling."""
    mod = extra_val if isinstance(extra_val, int) else 0
    d_1000 = random.randint(1, 1000 + mod)
    if d_1000 >= 1100:
        return "All"
    if d_1000 >= 1000:
        return "Sta"
    if d_1000 >= 500:
        return "Res"
    return "Rep"


async def roll_for_cards(all_packs: list, extra_val=None) -> list:
    """Open packs: roll rarities, draw players, create cards, mark packs opened.

    Pack odds are calculated based on the pack type ("Standard", "Premium" or
    "Check-In Player"). Draw filters (``pack_team`` / ``pack_cardset``) and the
    team credited in notifications are taken from the first pack.

    Parameters
    ----------
    all_packs
        Pack mappings providing ``id``, ``team``, ``pack_type``, ``pack_team``
        and ``pack_cardset``.
    extra_val
        For Check-In packs, an int added to the top of the roll range.

    Returns
    -------
    list
        Ids of the packs that were opened (empty when ``all_packs`` is empty).

    Raises
    ------
    TypeError
        If a pack's type name is not recognized.
    ConnectionError
        If the cards for a pack could not be created.
    """
    if not all_packs:
        return []

    first_pack = all_packs[0]
    team = first_pack["team"]
    pack_ids = []

    for pack in all_packs:
        pack_type_name = pack["pack_type"]["name"]
        tier_counts = dict.fromkeys(_PACK_TIER_RARITY, 0)

        if pack_type_name in _PACK_CARD_ODDS:
            # Going through one odds table per card also fixes Premium card 4,
            # where an `if` in place of `elif` counted a roll <= 100 as both
            # Rep and Res and produced a six-card pack.
            for card_odds in _PACK_CARD_ODDS[pack_type_name]:
                tier_counts[_roll_tier(card_odds)] += 1
        elif pack_type_name == "Check-In Player":
            logger.info(
                f"Building Check-In Pack // extra_val (type): {extra_val} {type(extra_val)}"
            )
            tier_counts[_roll_check_in_tier(extra_val)] += 1
        else:
            raise TypeError(f"Pack type not recognized: {pack_type_name}")

        this_pack_players = []
        pull_notifs = []
        for tier, rarity in _PACK_TIER_RARITY.items():
            wanted = tier_counts[tier]
            if wanted <= 0:
                continue

            params = [
                ("min_rarity", rarity),
                ("max_rarity", rarity),
                ("limit", wanted),
            ]
            if first_pack["pack_team"] is not None:
                params.extend(
                    [
                        ("franchise", first_pack["pack_team"]["lname"]),
                        ("in_packs", True),
                    ]
                )
            elif first_pack["pack_cardset"] is not None:
                params.append(("cardset_id", first_pack["pack_cardset"]["id"]))
            else:
                params.append(("in_packs", True))

            pl = await db_get("players/random", params=params)
            drawn = pl["players"]
            this_pack_players.extend(drawn)
            pull_notifs.extend(x for x in drawn if x["rarity"]["value"] >= 3)

            shortfall = wanted - pl["count"]
            if shortfall <= 0:
                continue

            # pack_cardset is None for team-filtered and unfiltered packs, so
            # it must not be dereferenced unconditionally.
            cardset = first_pack["pack_cardset"]
            cardset_id = cardset["id"] if cardset is not None else None
            logging.info(f"Set mvp flag to {shortfall} / cardset_id: {cardset_id}")

            if cardset_id not in [23]:
                # Make up for the missing cards with MVP-or-better players.
                logging.info(f"Adding {shortfall} MVPs for missing cards")
                fill = await db_get(
                    "players/random", params=[("min_rarity", 5), ("limit", shortfall)]
                )
                this_pack_players.extend(fill["players"])
            elif drawn:
                # Add dupes of Replacement/Reserve cards
                logging.info(f"Adding {shortfall} duplicate pokemon cards")
                dupe = drawn[0]
                for _ in range(shortfall):
                    logging.info(f"Adding {dupe['p_name']} to the pack")
                    this_pack_players.append(dupe)
            else:
                logging.warning(
                    f"No players drawn for tier {tier}; cannot add {shortfall} duplicates"
                )

        success = await db_post(
            "cards",
            payload={
                "cards": [
                    {
                        "player_id": x["player_id"],
                        "team_id": pack["team"]["id"],
                        "pack_id": pack["id"],
                    }
                    for x in this_pack_players
                ]
            },
            timeout=10,
        )
        if not success:
            raise ConnectionError("Failed to create this pack of cards.")

        await db_patch(
            "packs",
            object_id=pack["id"],
            params=[
                (
                    "open_time",
                    int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000),
                )
            ],
        )
        pack_ids.append(pack["id"])

        # Posted per pack: pull_notifs is rebuilt for every pack, so posting
        # once after the loop would drop every pack but the last.
        for pull in pull_notifs:
            logger.info(f"good pull: {pull}")
            await db_post(
                "notifs",
                payload={
                    "created": int(
                        datetime.datetime.timestamp(datetime.datetime.now()) * 1000
                    ),
                    "title": "Rare Pull",
                    "field_name": f"{player_desc(pull)} ({pull['rarity']['name']})",
                    "message": f"Pulled by {team['abbrev']}",
                    "about": f"Player-{pull['player_id']}",
                },
            )

    return pack_ids
|
|
|
|
|
|
async def give_packs(team: dict, num_packs: int, pack_type: dict = None) -> dict:
    """Create ``num_packs`` new packs for ``team`` and return its unopened packs.

    Parameters
    ----------
    team
        Team mapping; ``id`` is read.
    num_packs
        How many packs to create.
    pack_type
        Pack type mapping whose ``id`` is used; pack type 1 when omitted.

    Returns
    -------
    { 'count': int, 'packs': [ all team packs ] }
    """
    pt_id = 1 if pack_type is None else pack_type["id"]
    new_packs = [
        {"team_id": team["id"], "pack_type_id": pt_id} for _ in range(num_packs)
    ]
    await db_post("packs", payload={"packs": new_packs})

    return await db_get(
        "packs", params=[("team_id", team["id"]), ("opened", False)]
    )
|
|
|
|
|
|
def get_sheets(bot):
    """Return the ``sheets`` attribute of the bot's "Gameplay" cog.

    Raises:
        ConnectionError: If the cog or its ``sheets`` attribute cannot be read.
    """
    try:
        gameplay_cog = bot.get_cog("Gameplay")
        return gameplay_cog.sheets
    except Exception as e:
        logger.error(f"Could not grab sheets auth: {e}")
        raise ConnectionError(
            f"Bot has not authenticated with discord; please try again in 1 minute."
        )
|
|
|
|
|
|
def create_team_sheet(team, email: str, current, bot):
    """Copy the roster sheet template for ``team`` and share it with ``email``.

    The copy is named after the team and the template version in ``current``,
    shared with ``email`` as a writer, and stamped with the team id and team
    hash in cells B1:B2 of its "Team Data" worksheet.

    Returns:
        The newly opened spreadsheet.
    """
    sheets = get_sheets(bot)

    sheet_title = f"{team['lname']} Roster Sheet v{current['gsheet_version']}"
    new_sheet = sheets.drive.copy_file(
        f"{current['gsheet_template']}",
        sheet_title,
        "1539D0imTMjlUx2VF3NPMt7Sv85sb2XAJ",
    )
    logger.info(f"new_sheet: {new_sheet}")

    this_sheet = sheets.open_by_key(new_sheet["id"])
    this_sheet.share(email, role="writer")

    # The hash is written with a leading apostrophe.
    identity_rows = [[f"{team['id']}"], [f"'{team_hash(team)}"]]
    this_sheet.worksheet_by_title("Team Data").update_values(
        crange="B1:B2", values=identity_rows
    )
    logger.debug(f"this_sheet: {this_sheet}")
    return this_sheet
|
|
|
|
|
|
async def refresh_sheet(team, bot, sheets=None) -> None:
    """Currently a no-op: the unconditional ``return`` below disables the body.

    The unreachable code toggles cell A2 of the "My Cards" and "All Cards"
    worksheets of the team's sheet to FALSE and back to TRUE, with short sleeps
    in between.
    """
    # Early exit: everything after this line is unreachable.
    return
    if not sheets:
        sheets = get_sheets(bot)

    this_sheet = sheets.open_by_key(team["gsheet"])
    my_cards = this_sheet.worksheet_by_title("My Cards")
    all_cards = this_sheet.worksheet_by_title("All Cards")

    my_cards.update_value("A2", "FALSE")
    all_cards.update_value("A2", "FALSE")
    await asyncio.sleep(1)

    my_cards.update_value("A2", "TRUE")
    await asyncio.sleep(0.5)
    all_cards.update_value("A2", "TRUE")
|
|
|
|
|
|
def delete_sheet(team, bot):
    """Delete the spreadsheet referenced by ``team["gsheet"]``."""
    team_sheet = get_sheets(bot).open_by_key(team["gsheet"])
    team_sheet.delete()
|
|
|
|
|
|
def share_sheet(team, email, bot) -> None:
    """Share the team's spreadsheet with ``email`` as a writer."""
    team_sheet = get_sheets(bot).open_by_key(team["gsheet"])
    team_sheet.share(email, role="writer")
|
|
|
|
|
|
def int_timestamp(datetime_obj: datetime.datetime) -> int:
    """Convert ``datetime_obj`` to whole milliseconds since the Unix epoch."""
    epoch_seconds = datetime_obj.timestamp()
    return int(epoch_seconds * 1000)
|
|
|
|
|
|
def get_pos_abbrev(pos_name):
    """Translate a full position name (e.g. "Shortstop") to its abbreviation.

    Raises:
        KeyError: If ``pos_name`` is not one of the known position names.
    """
    abbrev_by_name = {
        "Catcher": "C",
        "First Base": "1B",
        "Second Base": "2B",
        "Third Base": "3B",
        "Shortstop": "SS",
        "Left Field": "LF",
        "Center Field": "CF",
        "Right Field": "RF",
        "Pitcher": "P",
        "Designated Hitter": "DH",
        "Pinch Hitter": "PH",
    }
    try:
        abbrev = abbrev_by_name.get(pos_name)
    except TypeError:
        # Unhashable input can never be a known name
        abbrev = None

    if abbrev is None:
        raise KeyError(f"{pos_name} is not a recognized position name")
    return abbrev
|
|
|
|
|
|
async def cardset_search(cardset: str, cardset_list: list) -> Optional[dict]:
    """Fuzzy-match ``cardset`` against ``cardset_list`` and fetch that cardset.

    Returns:
        The first cardset returned for the matched name, or ``None`` when there
        is no fuzzy match or the query comes back empty.
    """
    cardset_name = fuzzy_search(cardset, cardset_list)
    if cardset_name:
        c_query = await db_get("cardsets", params=[("name", cardset_name)])
        if c_query["count"] != 0:
            return c_query["cardsets"][0]
    return None
|
|
|
|
|
|
def get_blank_team_card(player):
    """Wrap ``player`` in a card mapping owned by a placeholder "Paper Dynasty" team."""
    placeholder_team = {
        "lname": "Paper Dynasty",
        "logo": IMAGES["logo"],
        "season": PD_SEASON,
        "id": None,
    }
    return {"player": player, "team": placeholder_team}
|
|
|
|
|
|
def get_rosters(team, bot, roster_num: Optional[int] = None) -> list:
    """Read saved rosters from the "My Rosters" worksheet of the team's sheet.

    Args:
        team: Team mapping; ``gsheet`` and ``id`` are read.
        bot: Bot used to obtain the sheets client.
        roster_num: 1, 2 or 3 to read only that roster; all three when falsy.

    Returns:
        A three-item list. Each slot is ``None`` (roster not requested, or its
        first cell is blank) or a mapping with ``name``, ``roster_num``,
        ``team_id`` and ``cards`` (card ids as ints).
    """
    sheets = get_sheets(bot)
    this_sheet = sheets.open_by_key(team["gsheet"])
    r_sheet = this_sheet.worksheet_by_title(f"My Rosters")
    logger.debug(f"this_sheet: {this_sheet} / r_sheet = {r_sheet}")

    # (roster number, card id cells, roster name cell)
    roster_layout = (
        (1, "B3:B28", "F30"),
        (2, "B29:B54", "F31"),
        (3, "B55:B80", "F32"),
    )
    all_rosters = [None, None, None]

    for number, card_cells, name_cell in roster_layout:
        if roster_num and roster_num != number:
            continue

        id_rows = r_sheet.range(card_cells)
        roster_name = r_sheet.cell(name_cell).value
        logger.info(f"roster_{number}: {id_rows}")

        # A blank first cell means this roster was never saved
        if id_rows[0][0].value == "":
            continue

        all_rosters[number - 1] = {
            "name": roster_name,
            "roster_num": number,
            "team_id": team["id"],
            "cards": [int(row[0].value) for row in id_rows],
        }

    return all_rosters
|
|
|
|
|
|
def get_roster_lineups(team, bot, roster_num, lineup_num) -> list:
    """Read one saved lineup from the team's "My Rosters" worksheet.

    Lineup 1 occupies rows 9-17 and any other lineup number rows 18-26.
    Roster 1 uses columns H:I, roster 2 columns J:K and any other roster
    number columns L:M.

    Returns:
        A list of ``(first_column_value, int(second_column_value))`` tuples,
        one per row of the selected range.

    Raises:
        ValueError: If a second-column cell cannot be converted to ``int``
            (for example, because it is blank).
    """
    sheets = get_sheets(bot)
    logger.debug(f"sheets: {sheets}")
    this_sheet = sheets.open_by_key(team["gsheet"])
    logger.debug(f"this_sheet: {this_sheet}")
    r_sheet = this_sheet.worksheet_by_title("My Rosters")
    logger.debug(f"r_sheet: {r_sheet}")

    row_start, row_end = (9, 17) if lineup_num == 1 else (18, 26)

    if roster_num == 1:
        first_col, last_col = "H", "I"
    elif roster_num == 2:
        first_col, last_col = "J", "K"
    else:
        first_col, last_col = "L", "M"

    l_range = f"{first_col}{row_start}:{last_col}{row_end}"
    logger.debug(f"l_range: {l_range}")
    raw_cells = r_sheet.range(l_range)
    logger.debug(f"raw_cells: {raw_cells}")

    lineup_cells = []
    try:
        for row in raw_cells:
            lineup_cells.append((row[0].value, int(row[1].value)))
    except ValueError:
        logger.error(f"Could not pull roster for {team['abbrev']} due to a ValueError")
        raise ValueError(
            "Uh oh. Looks like your roster might not be saved. I am reading blanks when I try to "
            "get the card IDs"
        )
    logger.debug(f"lineup_cells: {lineup_cells}")

    return lineup_cells
|
|
|
|
|
|
def post_ratings_guide(team, bot, this_sheet=None):
    """Write the ratings formulas into cell A1 of both "Full Guide" worksheets.

    Args:
        team: Team dict; ``team["gsheet"]`` is used to open the spreadsheet
            when ``this_sheet`` is not supplied.
        bot: Handed to ``get_sheets`` when the spreadsheet has to be opened.
        this_sheet: Optional already-opened spreadsheet to write to.
    """
    workbook = this_sheet or get_sheets(bot).open_by_key(team["gsheet"])

    # Resolve both worksheets before writing so a missing tab fails up front.
    pitcher_tab = workbook.worksheet_by_title("Full Guide - Pitchers")
    batter_tab = workbook.worksheet_by_title("Full Guide - Batters")

    pitcher_tab.update_value("A1", RATINGS_PITCHER_FORMULA)
    batter_tab.update_value("A1", RATINGS_BATTER_FORMULA)
|
|
|
|
|
|
async def legal_channel(ctx):
    """Command check that rejects invocations from the chat and news channels.

    Returns ``True`` when the invoking channel is allowed. Otherwise raises
    ``commands.CheckFailure`` when ``ctx`` is a ``commands.Context`` and
    ``discord.app_commands.AppCommandError`` for anything else; both carry a
    message pointing the user at the ``pd-bot-hole`` channel.
    """
    bad_channels = ["paper-dynasty-chat", "pd-news-ticker", "pd-network-news"]
    from_prefix_command = isinstance(ctx, commands.Context)

    if ctx.channel.name not in bad_channels:
        return True

    redirect = f"Slide on down to the {get_channel(ctx, 'pd-bot-hole').mention} ;)"
    if from_prefix_command:
        raise commands.CheckFailure(redirect)
    raise discord.app_commands.AppCommandError(redirect)
|
|
|
|
|
|
def app_legal_channel():
    """Build a slash-command check that blocks the chat and news channels.

    Apply as ``@app_legal_channel()``. The wrapped predicate returns ``True``
    for allowed channels and raises ``discord.app_commands.CheckFailure``,
    pointing the user at ``pd-bot-hole``, for restricted ones.
    """
    bad_channels = ("paper-dynasty-chat", "pd-news-ticker", "pd-network-news")

    async def predicate(interaction: discord.Interaction) -> bool:
        if interaction.channel.name not in bad_channels:
            return True
        redirect_channel = get_channel(interaction, "pd-bot-hole")
        raise discord.app_commands.CheckFailure(
            f"Slide on down to the {redirect_channel.mention} ;)"
        )

    return discord.app_commands.check(predicate)
|
|
|
|
|
|
def is_ephemeral_channel(channel) -> bool:
    """Return True when ``channel`` is one of the chat/ticker channels.

    A falsy channel, or one without a ``name`` attribute, is never ephemeral.
    """
    if channel and hasattr(channel, "name"):
        return channel.name in ("paper-dynasty-chat", "pd-news-ticker")
    return False
|
|
|
|
|
|
def is_restricted_channel(channel) -> bool:
    """Tell whether ``channel`` is a chat/ticker channel closed to some commands.

    Anything falsy or lacking a ``name`` attribute counts as unrestricted.
    """
    restricted_names = ("paper-dynasty-chat", "pd-news-ticker")
    has_name = bool(channel) and hasattr(channel, "name")
    return channel.name in restricted_names if has_name else False
|
|
|
|
|
|
def can_send_message(channel) -> bool:
    """Return True when ``channel`` is truthy and exposes a ``send`` attribute.

    The result is always a real ``bool``: a falsy channel such as ``None``
    yields ``False`` instead of being handed back to the caller unchanged.
    """
    return bool(channel) and hasattr(channel, "send")
|
|
|
|
|
|
async def _delete_message_later(message, delay, logger) -> None:
    """Schedule ``message`` for deletion after ``delay`` seconds, logging any failure."""
    if message is None or delay is None:
        return
    try:
        await message.delete(delay=delay)
    except Exception as e:
        logger.debug(f"Failed to schedule message deletion: {e}")


async def send_safe_message(
    source: Union[discord.Interaction, commands.Context],
    content: Optional[str] = None,
    *,
    embeds: Optional[List[discord.Embed]] = None,
    view: Optional[discord.ui.View] = None,
    ephemeral: bool = False,
    delete_after: Optional[float] = None,
) -> discord.Message:
    """
    Safely send a message using the most appropriate method based on context.

    For Interactions:
    1. Try edit_original_response() if the response is already done
    2. Try followup.send()
    3. Try channel.send() if the channel supports it

    For Context:
    1. Try ctx.send()
    2. Try DM to user with context info if channel send fails

    Args:
        source: Discord Interaction or Context object
        content: Message content
        embeds: List of embeds to send
        view: UI view to attach
        ephemeral: Whether message should be ephemeral (Interaction only)
        delete_after: Seconds after which to delete message

    Returns:
        The message object produced by whichever strategy succeeded

    Raises:
        RuntimeError: If every Interaction strategy fails
        TypeError: If source is neither an Interaction nor a Context
        Exception: For a Context, the DM error if both ctx.send() and the DM fail
    """
    logger = logging.getLogger("discord_app")

    # Arguments every send/edit method accepts.
    kwargs = {}
    if content is not None:
        kwargs["content"] = content
    if embeds is not None:
        kwargs["embeds"] = embeds
    if view is not None:
        kwargs["view"] = view

    # channel.send(), ctx.send() and user.send() accept delete_after directly.
    timed_kwargs = dict(kwargs)
    if delete_after is not None:
        timed_kwargs["delete_after"] = delete_after

    # Handle Interaction objects
    if isinstance(source, discord.Interaction):
        # Strategy 1: edit the original response if one already exists.
        # edit_original_response() takes neither ephemeral nor delete_after,
        # so timed deletion is scheduled on the returned message instead.
        if source.response.is_done():
            try:
                message = await source.edit_original_response(**kwargs)
            except Exception as e:
                logger.debug(f"Failed to edit original response: {e}")
            else:
                await _delete_message_later(message, delete_after, logger)
                return message

        # Strategy 2: followup.send(). It accepts ephemeral but not
        # delete_after, so deletion is again scheduled on the result.
        followup_kwargs = dict(kwargs)
        if ephemeral:
            followup_kwargs["ephemeral"] = ephemeral
        try:
            message = await source.followup.send(**followup_kwargs)
        except Exception as e:
            logger.debug(f"Failed to send followup message: {e}")
        else:
            await _delete_message_later(message, delete_after, logger)
            return message

        # Strategy 3: plain channel.send() (ephemeral is not supported there).
        if can_send_message(source.channel):
            try:
                return await source.channel.send(**timed_kwargs)
            except Exception as e:
                logger.debug(f"Failed to send channel message: {e}")

        # All interaction methods failed
        logger.error(
            f"All interaction message send methods failed for user {source.user.id}"
        )
        raise RuntimeError(
            "Unable to send interaction message through any available method"
        )

    # Handle Context objects
    elif isinstance(source, commands.Context):
        # Strategy 1: ctx.send() directly (ephemeral is never forwarded).
        try:
            return await source.send(**timed_kwargs)
        except Exception as e:
            logger.debug(f"Failed to send context message to channel: {e}")

        # Strategy 2: DM the user, prefixed with where the response came from.
        try:
            channel_name = getattr(source.channel, "name", "Unknown Channel")
            guild_name = (
                getattr(source.guild, "name", "Unknown Server")
                if source.guild
                else "DM"
            )

            dm_content = f"[Bot Response from #{channel_name} in {guild_name}]\n\n"
            if content:
                dm_content += content

            dm_kwargs = dict(timed_kwargs)
            dm_kwargs["content"] = dm_content

            return await source.author.send(**dm_kwargs)
        except Exception as dm_error:
            logger.error(
                f"Failed to send DM fallback to user {source.author.id}: {dm_error}"
            )
            # Both ctx.send() and DM failed - let the exception bubble up
            raise

    else:
        raise TypeError(
            f"Source must be discord.Interaction or commands.Context, got {type(source)}"
        )
|
|
|
|
|
|
def get_role(ctx, role_name):
    """Look up a role by name among the roles of ``ctx.guild``.

    Returns whatever ``discord.utils.get`` yields for that name.
    """
    guild_roles = ctx.guild.roles
    return discord.utils.get(guild_roles, name=role_name)
|
|
|
|
|
|
async def team_summary_embed(team, ctx, include_roster: bool = True):
    """Build the overview embed for ``team``.

    Fields, in order: general manager, wallet, unopened packs (counted per
    pack type), team rating, the season record when the results query
    returns something truthy, optionally the team sheet link, and a help
    pointer.

    Args:
        team: Team dict supplying the values shown in the embed.
        ctx: Used to resolve the ``paper-dynasty-chat`` channel mention.
        include_roster: When True, add the "Team Sheet" field.

    Returns:
        The populated embed.
    """
    embed = get_team_embed(f"{team['lname']} Overview", team)

    embed.add_field(name="General Manager", value=team["gmname"], inline=False)
    embed.add_field(name="Wallet", value=f"{team['wallet']}₼")

    p_query = await db_get("packs", params=[("team_id", team["id"]), ("opened", False)])
    if p_query["count"] > 0:
        # Tally unopened packs per pack-type name, keeping first-seen order.
        pack_counts = {}
        for pack in p_query["packs"]:
            type_name = pack["pack_type"]["name"]
            pack_counts[type_name] = pack_counts.get(type_name, 0) + 1
        pack_string = "".join(
            f"{type_name.title()}: {count}\n"
            for type_name, count in pack_counts.items()
        )
    else:
        pack_string = "None"
    embed.add_field(name="Unopened Packs", value=pack_string)
    embed.add_field(name="Team Rating", value=f"{team['ranking']}")

    r_query = await db_get(f"results/team/{team['id']}?season={PD_SEASON}")
    if r_query:
        ranked_line = f"Ranked: {r_query['ranked_wins']}-{r_query['ranked_losses']}\n"
        casual_line = f"Unlimited: {r_query['casual_wins']}-{r_query['casual_losses']}"
        embed.add_field(name="Record", value=ranked_line + casual_line)

    if include_roster:
        embed.add_field(name="Team Sheet", value=get_roster_sheet(team), inline=False)

    help_text = (
        f"`/help-pd` has FAQs; feel free to post questions in "
        f"{get_channel(ctx, 'paper-dynasty-chat').mention}."
    )
    embed.add_field(name="For Help", value=help_text, inline=False)

    return embed
|
|
|
|
|
|
async def give_cards_to_team(
    team, players: list = None, player_ids: list = None, pack_id=None
):
    """Create cards for ``team`` from player dicts or raw player ids.

    Args:
        team: Team dict; ``team["id"]`` becomes the owner of the pack and cards.
        players: Player dicts, each with a ``player_id`` key. Takes precedence
            over ``player_ids`` when both are given.
        player_ids: Player ids, used when ``players`` is empty or omitted.
        pack_id: Pack to attach the cards to. When falsy, a new pack is
            posted first and its id is used.

    Raises:
        ValueError: If neither ``players`` nor ``player_ids`` is provided.
    """
    # Validate before touching the API so a bad call cannot leave behind an
    # empty pack that was created only to hold cards that never arrive.
    if not players and not player_ids:
        raise ValueError(
            "One of players or player_ids must be provided to distribute cards"
        )

    if not pack_id:
        # Whole milliseconds, matching the integer open_time that
        # open_choice_pack sends when it patches a pack.
        opened_at = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
        p_query = await db_post(
            "packs/one",
            payload={
                "team_id": team["id"],
                "pack_type_id": 4,
                "open_time": opened_at,
            },
        )
        pack_id = p_query["id"]

    if players:
        new_player_ids = [x["player_id"] for x in players]
    else:
        new_player_ids = player_ids

    await db_post(
        "cards",
        payload={
            "cards": [
                {"player_id": x, "team_id": team["id"], "pack_id": pack_id}
                for x in new_player_ids
            ]
        },
        timeout=10,
    )
|
|
|
|
|
|
def get_ratings_guide(sheets):
    """Load batter and pitcher ratings from the ratings spreadsheet.

    Reads range ``A2:N`` from the ``ratings_Batters`` and ``ratings_Pitchers``
    worksheets. In every row, column 0 is the player id, column 1 the player
    name, column 2 the overall rating and columns 3-13 the individual stats;
    everything except the name is converted to ``int``.

    Args:
        sheets: Sheets client used to open the spreadsheet by
            ``RATINGS_SHEET_KEY``.

    Returns:
        ``{"valid": True, "batter_ratings": [...], "pitcher_ratings": [...]}``
        on success, or ``{"valid": False}`` when any row fails to parse.
    """
    this_sheet = sheets.open_by_key(RATINGS_SHEET_KEY)
    b_sheet = this_sheet.worksheet_by_title("ratings_Batters")
    p_sheet = this_sheet.worksheet_by_title("ratings_Pitchers")

    b_data = b_sheet.range("A2:N")
    p_data = p_sheet.range("A2:N")

    # Stat names for columns 3..13, in sheet order.
    batter_stats = (
        "contact-r",
        "contact-l",
        "power-r",
        "power-l",
        "vision",
        "speed",
        "stealing",
        "reaction",
        "arm",
        "fielding",
        "hand",
    )
    pitcher_stats = (
        "control-r",
        "control-l",
        "stuff-r",
        "stuff-l",
        "stamina",
        "fielding",
        "hit-9",
        "k-9",
        "bb-9",
        "hr-9",
        "hand",
    )

    def parse_row(row, stat_names):
        """Convert one worksheet row into a ratings dict."""
        parsed = {
            "player_id": int(row[0].value),
            "p_name": row[1].value,
            "rating": int(row[2].value),
        }
        for column, stat in enumerate(stat_names, start=3):
            parsed[stat] = int(row[column].value)
        return parsed

    try:
        batters = [parse_row(x, batter_stats) for x in b_data]
        pitchers = [parse_row(x, pitcher_stats) for x in p_data]
    except Exception as e:
        # Still best-effort (callers get valid=False), but record why the
        # sheet was rejected instead of dropping the error silently.
        logger.error(f"get_ratings_guide - could not parse ratings sheet: {e}")
        return {"valid": False}

    return {"valid": True, "batter_ratings": batters, "pitcher_ratings": pitchers}
|
|
|
|
|
|
async def paperdex_cardset_embed(team: dict, this_cardset: dict) -> list[discord.Embed]:
    """Build the collection embeds for one cardset, grouped by rarity.

    Fetches the team's paperdex entries and the cardset's players, marks each
    player as owned or not, and returns a cover embed followed by one embed
    per rarity that has at least one player. Each rarity embed lists its
    players in fields of up to 20 names.

    Args:
        team: Team dict (``id`` and ``lname`` are read here).
        this_cardset: Cardset dict (``id`` and ``name`` are read here).

    Returns:
        The cover embed followed by the non-empty rarity embeds.
    """
    all_dex = await db_get(
        "paperdex",
        params=[
            ("team_id", team["id"]),
            ("cardset_id", this_cardset["id"]),
            ("flat", True),
        ],
    )
    dex_player_list = [entry["player"] for entry in all_dex["paperdex"]]

    rarity_names = (
        (99, "Hall of Fame"),
        (1, "MVP"),
        (2, "All-Star"),
        (3, "Starter"),
        (4, "Reserve"),
        (5, "Replacement"),
    )
    coll_data = {
        rarity: {
            "name": label,
            "owned": 0,
            "players": [],
            "embeds": [get_team_embed(f"{team['lname']} Collection", team=team)],
        }
        for rarity, label in rarity_names
    }
    coll_data["total_owned"] = 0

    set_players = await db_get(
        "players",
        params=[("cardset_id", this_cardset["id"]), ("flat", True), ("inc_dex", False)],
        timeout=5,
    )

    for player in set_players["players"]:
        is_owned = player["player_id"] in dex_player_list
        if is_owned:
            coll_data[player["rarity"]]["owned"] += 1
            coll_data["total_owned"] += 1
        player["owned"] = is_owned

        logger.debug(f"player: {player} / type: {type(player)}")
        coll_data[player["rarity"]]["players"].append(player)

    cover_embed = get_team_embed(f"{team['lname']} Collection", team=team)
    cover_embed.description = this_cardset["name"]
    cover_embed.add_field(name="# Total Cards", value=f"{set_players['count']}")
    cover_embed.add_field(name="# Collected", value=f"{coll_data['total_owned']}")
    display_embeds = [cover_embed]

    for rarity_id, bucket in coll_data.items():
        # Skip the running total and any rarity without players.
        if rarity_id == "total_owned" or not bucket["players"]:
            continue

        rarity_embed = bucket["embeds"][0]
        rarity_players = bucket["players"]
        total = len(rarity_players)
        group_count = math.ceil(total / 20)

        rarity_embed.description = f"Rarity: {bucket['name']}"
        rarity_embed.add_field(
            name="# Collected / # Total Cards",
            value=f"{bucket['owned']} / {total}",
            inline=False,
        )

        # One field per block of 20 players; the last block may be shorter.
        for start in range(0, total, 20):
            lines = []
            for this_player in rarity_players[start : start + 20]:
                logger.debug(f"this_player: {this_player}")
                marker = "☑ " if this_player["owned"] else "⬜ "
                lines.append(marker + f"{this_player['p_name']}\n")
            rarity_embed.add_field(
                name=f"Group {start // 20 + 1} / {group_count}",
                value="".join(lines),
            )

        display_embeds.append(rarity_embed)

    return display_embeds
|
|
|
|
|
|
async def paperdex_team_embed(team: dict, mlb_team: dict) -> list[discord.Embed]:
    """Build the collection embeds for one MLB franchise, grouped by cardset.

    Fetches the team's paperdex entries for the franchise, then queries the
    franchise's players cardset by cardset, marking each as owned or not.
    Returns a cover embed followed by one embed per cardset that has at least
    one player, each listing players in fields of up to 20 names.

    Args:
        team: Team dict (``id`` and ``lname`` are read here).
        mlb_team: Franchise dict (``lname`` is read here).

    Returns:
        The cover embed followed by the non-empty cardset embeds.
    """
    franchise = mlb_team["lname"]
    all_dex = await db_get(
        "paperdex",
        params=[
            ("team_id", team["id"]),
            ("franchise", franchise),
            ("flat", True),
        ],
    )
    dex_player_list = [entry["player"] for entry in all_dex["paperdex"]]

    c_query = await db_get("cardsets")
    coll_data = {"total_owned": 0}

    total_players = 0
    for cardset in c_query["cardsets"]:
        set_players = await db_get(
            "players",
            params=[
                ("cardset_id", cardset["id"]),
                ("franchise", mlb_team["lname"]),
                ("flat", True),
                ("inc_dex", False),
            ],
        )
        if set_players is None:
            continue

        bucket = {
            "name": cardset["name"],
            "owned": 0,
            "players": [],
            "embeds": [get_team_embed(f"{team['lname']} Collection", team=team)],
        }
        coll_data[cardset["id"]] = bucket
        total_players += set_players["count"]

        for player in set_players["players"]:
            is_owned = player["player_id"] in dex_player_list
            if is_owned:
                bucket["owned"] += 1
                coll_data["total_owned"] += 1
            player["owned"] = is_owned

            logger.debug(f"player: {player} / type: {type(player)}")
            bucket["players"].append(player)

    cover_embed = get_team_embed(f"{team['lname']} Collection", team=team)
    cover_embed.description = mlb_team["lname"]
    cover_embed.add_field(name="# Total Cards", value=f"{total_players}")
    cover_embed.add_field(name="# Collected", value=f"{coll_data['total_owned']}")
    display_embeds = [cover_embed]

    for cardset_id, bucket in coll_data.items():
        # Skip the running total and any cardset without players.
        if cardset_id == "total_owned" or not bucket["players"]:
            continue

        cardset_embed = bucket["embeds"][0]
        cardset_players = bucket["players"]
        total = len(cardset_players)

        cardset_embed.description = f"{mlb_team['lname']} / {bucket['name']}"
        cardset_embed.add_field(
            name="# Collected / # Total Cards",
            value=f"{bucket['owned']} / {total}",
            inline=False,
        )

        chunk_string = ""
        for position, this_player in enumerate(cardset_players, start=1):
            logger.debug(f"this_player: {this_player}")
            chunk_string += "☑ " if this_player["owned"] else "⬜ "
            chunk_string += f"{this_player['p_name']}\n"

            # Flush a field after every 20th player and after the last one.
            if position == total or position % 20 == 0:
                cardset_embed.add_field(
                    name=f"Group {math.ceil(position / 20)} / "
                    f"{math.ceil(total / 20)}",
                    value=chunk_string,
                )
                chunk_string = ""

        display_embeds.append(cardset_embed)

    return display_embeds
|
|
|
|
|
|
def get_pack_cover(pack):
    """Pick the cover image for ``pack``.

    Cardset 23 always gets the ``pack-pkmnbs`` cover. Otherwise the cover is
    chosen by pack-type name: "Premium" and "MVP" share one image, "Standard"
    and "Mario" have their own.

    ``pack["pack_cardset"]`` may be ``None``, a bare cardset id, or a cardset
    dict carrying an ``id`` key (the shape ``open_choice_pack`` reads), so the
    id is extracted before comparing; comparing a dict straight to 23 could
    never match.

    Returns:
        The image reference from ``IMAGES``, or ``None`` when the pack has no
        dedicated cover.
    """
    cardset = pack["pack_cardset"]
    cardset_id = cardset.get("id") if isinstance(cardset, dict) else cardset
    if cardset_id is not None and cardset_id == 23:
        return IMAGES["pack-pkmnbs"]

    pack_type_name = pack["pack_type"]["name"]
    if pack_type_name in ["Premium", "MVP"]:
        return IMAGES["pack-pre"]
    elif pack_type_name == "Standard":
        return IMAGES["pack-sta"]
    elif pack_type_name == "Mario":
        return IMAGES["pack-mar"]
    else:
        return None
|
|
|
|
|
|
async def open_st_pr_packs(all_packs: list, team: dict, context):
    """Open a batch of packs and display the resulting cards.

    Cards are shown in the ``pack-openings`` channel, or in the invoking
    channel when the first pack has no cover image.

    Args:
        all_packs: Packs to open; the first one decides the cover image.
        team: Team dict passed through to ``display_cards``.
        context: A ``commands.Context`` (author read from ``.author``) or an
            interaction-like object (author read from ``.user``).

    Raises:
        ValueError: If no destination channel can be found, the packs cannot
            be rolled, or no cards come back for the rolled packs.
    """
    pack_channel = get_channel(context, "pack-openings")
    pack_cover = get_pack_cover(all_packs[0])

    if pack_cover is None:
        pack_channel = context.channel

    if not pack_channel:
        raise ValueError(
            f"I cannot find the pack-openings channel. {get_cal_user(context).mention} - halp?"
        )

    pack_ids = await roll_for_cards(all_packs)
    if not pack_ids:
        logger.error(f"open_packs - unable to roll_for_cards for packs: {all_packs}")
        raise ValueError("I was not able to unpack these cards")

    all_cards = []
    for p_id in pack_ids:
        new_cards = await db_get("cards", params=[("pack_id", p_id)])
        all_cards.extend(new_cards["cards"])

    if not all_cards:
        logger.error(f"open_packs - unable to get cards for packs: {pack_ids}")
        raise ValueError("I was not able to display these cards")

    # isinstance (not an exact type match) so Context subclasses are also
    # read through .author instead of falling through to .user.
    if isinstance(context, commands.Context):
        author = context.author
    else:
        author = context.user

    # Present cards to opening channel
    await context.channel.send(content=f"Let's head down to {pack_channel.mention}!")
    await display_cards(all_cards, team, pack_channel, author, pack_cover=pack_cover)
|
|
|
|
|
|
async def get_choice_from_cards(
    interaction: discord.Interaction,
    all_players: list = None,
    cover_title: str = None,
    cover_desc: str = None,
    cover_image_url: str = None,
    callback=None,
    temp_message: str = None,
    conf_message: str = None,
    delete_message: bool = False,
):
    """Let the interacting user page through player cards and take one.

    The cards are posted in the interaction's channel with prev / take /
    next buttons. When both ``cover_title`` and ``cover_image_url`` are given
    a cover image is shown first; otherwise the first card is shown straight
    away. If a view finishes without a button value, the display advances to
    the next card, wrapping to the first after the last.

    Args:
        interaction: Source of the channel and of the user allowed to click.
        all_players: Player dicts to choose from.
        cover_title: Title of the optional cover embed.
        cover_desc: Description of the optional cover embed.
        cover_image_url: Image of the optional cover embed.
        callback: Optional callable invoked with the chosen player.
        temp_message: Optional extra message posted under the cards.
        conf_message: Optional confirmation text; replaces ``temp_message``
            when that was posted, otherwise it is sent as a new message.
        delete_message: When True, delete the card message after the choice.

    Returns:
        The player dict that was on screen when the take button was pressed.
    """
    # Render every candidate as if owned by a placeholder league team.
    card_embeds = []
    for candidate in all_players:
        blank_card = {
            "player": candidate,
            "team": {
                "lname": "Paper Dynasty",
                "season": PD_SEASON,
                "logo": IMAGES["logo"],
            },
        }
        card_embeds.append(await get_card_embeds(blank_card))
    logger.debug(f"card embeds: {card_embeds}")

    total = len(card_embeds)

    def build_view(page, cover=False):
        """Create a fresh pagination view labelled for ``page``."""
        pager = Pagination([interaction.user], timeout=30)
        pager.cancel_button.label = "Take This Card"
        pager.cancel_button.style = discord.ButtonStyle.success

        if cover:
            # Nothing can be taken or paged back from the cover.
            pager.left_button.disabled = True
            pager.left_button.label = f"Prev: -/{total}"
            pager.cancel_button.disabled = True
            pager.right_button.label = f"Next: 1/{total}"
            return pager

        pager.left_button.label = f"Prev: {page - 1}/{total}"
        pager.right_button.label = f"Next: {page + 1}/{total}"
        if page == 1:
            pager.left_button.label = f"Prev: -/{total}"
            pager.left_button.disabled = True
        elif page == total:
            pager.right_button.label = f"Next: -/{total}"
            pager.right_button.disabled = True
        return pager

    if cover_title is not None and cover_image_url is not None:
        page_num = 0
        view = build_view(page_num, cover=True)
        msg = await interaction.channel.send(
            content=None,
            embed=image_embed(
                image_url=cover_image_url, title=cover_title, desc=cover_desc
            ),
            view=view,
        )
    else:
        page_num = 1
        view = build_view(page_num)
        msg = await interaction.channel.send(
            content=None, embeds=card_embeds[page_num - 1], view=view
        )

    temp_msg = None
    if temp_message is not None:
        temp_msg = await interaction.channel.send(content=temp_message)

    while True:
        await view.wait()
        pressed = view.value

        if pressed:
            if pressed == "cancel":
                # "cancel" is the relabelled "Take This Card" button.
                await msg.edit(view=None)

                if callback is not None:
                    callback(all_players[page_num - 1])

                if conf_message is not None:
                    if temp_msg is None:
                        await interaction.channel.send(content=conf_message)
                    else:
                        await temp_msg.edit(content=conf_message)
                break

            if pressed == "left":
                step_back = 1 if page_num > 1 else total
                page_num -= step_back
            elif pressed == "right":
                # Both arms of the original conditional step forward by one.
                page_num += 1
        elif page_num == total:
            # No button value: auto-advance, wrapping past the last card.
            page_num = 1
        else:
            page_num += 1

        view.value = None

        view = build_view(page_num)
        await msg.edit(content=None, embeds=card_embeds[page_num - 1], view=view)

    if delete_message:
        await msg.delete()
    return all_players[page_num - 1]
|
|
|
|
|
|
async def open_choice_pack(
    this_pack, team: dict, context, cardset_id: Optional[int] = None
):
    """Open a choice pack: offer up to four players and award the one picked.

    Candidate players are drawn according to the pack type ("Mario",
    "Team Choice", "Promo Choice", or anything else as a rarity-based pack),
    shown in the ``pack-openings`` channel with prev / take / next buttons,
    and the chosen player is given to ``team`` as a card in this pack. If a
    view finishes without a button value, the display advances to the next
    card, wrapping to the first after the last.

    Args:
        this_pack: Pack dict (``id``, ``pack_type``, ``pack_team`` and
            ``pack_cardset`` are read here).
        team: Team dict receiving the card.
        context: A ``commands.Context`` (author read from ``.author``) or an
            interaction-like object (author read from ``.user``).
        cardset_id: Optional cardset filter for the player queries; a
            "Promo Choice" pack overrides it with its own cardset.

    Raises:
        ValueError: If the pack-openings channel cannot be found.
        KeyError: If a "Team Choice" pack has no team, or a "Promo Choice"
            pack has no cardset.
        ConnectionError: If no candidate players could be found, or the
            chosen card could not be created.
    """
    pack_channel = get_channel(context, "pack-openings")
    if not pack_channel:
        # Same guard as open_st_pr_packs; otherwise this surfaces later as an
        # AttributeError on pack_channel.mention.
        raise ValueError(
            f"I cannot find the pack-openings channel. {get_cal_user(context).mention} - halp?"
        )
    pack_cover = get_pack_cover(this_pack)
    pack_type = this_pack["pack_type"]["name"]

    players = []

    if pack_type == "Mario":
        d1000 = random.randint(1, 1000)
        if d1000 > 800:
            rarity_id = 5
        elif d1000 > 550:
            rarity_id = 3
        else:
            rarity_id = 2
        pl = await db_get(
            "players/random",
            params=[
                ("cardset_id", 8),
                ("min_rarity", rarity_id),
                ("max_rarity", rarity_id),
                ("limit", 4),
            ],
        )
        players = pl["players"]
    elif pack_type == "Team Choice":
        if this_pack["pack_team"] is None:
            raise KeyError("Team not listed for Team Choice pack")

        d1000 = random.randint(1, 1000)
        pack_cover = this_pack["pack_team"]["logo"]
        if d1000 > 800:
            rarity_id = 5
            pack_cover = IMAGES["mvp"][this_pack["pack_team"]["lname"]]
        elif d1000 > 550:
            rarity_id = 3
        else:
            rarity_id = 2

        # Shift the rarity window up one step at a time until four distinct
        # players are found or the rarity ceiling is reached.
        min_rarity = rarity_id
        while len(players) < 4 and rarity_id < 10:
            params = [
                ("min_rarity", min_rarity),
                ("max_rarity", rarity_id),
                ("limit", 4 - len(players)),
                ("franchise", this_pack["pack_team"]["lname"]),
            ]
            if this_pack["pack_team"]["abbrev"] not in ["MSS"]:
                params.append(("in_packs", True))
            if cardset_id is not None:
                params.append(("cardset_id", cardset_id))
            pl = await db_get("players/random", params=params)
            if pl["count"] >= 0:
                for x in pl["players"]:
                    if x not in players:
                        players.append(x)
            if len(players) < 4:
                min_rarity += 1
                rarity_id += 1
    elif pack_type == "Promo Choice":
        if this_pack["pack_cardset"] is None:
            raise KeyError("Cardset not listed for Promo Choice pack")

        d1000 = random.randint(1, 1000)
        pack_cover = IMAGES["mvp-hype"]
        cardset_id = this_pack["pack_cardset"]["id"]
        rarity_id = 5
        if d1000 > 800:
            rarity_id = 8

        while len(players) < 4 and rarity_id < 10:
            count_before = len(players)
            searched_live = cardset_id == LIVE_CARDSET_ID
            pl = await db_get(
                "players/random",
                params=[
                    ("cardset_id", cardset_id),
                    ("min_rarity", rarity_id),
                    ("max_rarity", rarity_id),
                    ("limit", 8),
                ],
            )
            if pl["count"] >= 0:
                for x in pl["players"]:
                    if len(players) >= 4:
                        break
                    if x not in players:
                        players.append(x)
            if len(players) < 4:
                # rarity_id never changes in this branch, so once the live
                # cardset itself stops yielding new players, repeating the
                # query would loop forever. Stop and use what we have.
                if searched_live and len(players) == count_before:
                    break
                cardset_id = LIVE_CARDSET_ID
    else:
        # Rarity-based packs; the starting rarity depends on the pack type.
        rarity_id = 5
        if pack_type == "HoF":
            rarity_id = 8
        elif pack_type == "All Star":
            rarity_id = 3

        min_rarity = rarity_id
        while len(players) < 4 and rarity_id < 10:
            params = [
                ("min_rarity", min_rarity),
                ("max_rarity", rarity_id),
                ("limit", 4),
                ("in_packs", True),
            ]
            if this_pack["pack_team"] is not None:
                params.append(("franchise", this_pack["pack_team"]["lname"]))
            if cardset_id is not None:
                params.append(("cardset_id", cardset_id))
            pl = await db_get("players/random", params=params)

            if pl["count"] > 0:
                players.extend(pl["players"])
            if len(players) < 4:
                rarity_id += 3

    if len(players) == 0:
        logger.error("Could not create choice pack")
        raise ConnectionError("Could not create choice pack")

    # isinstance (not an exact type match) so Context subclasses are also
    # read through .author instead of falling through to .user.
    if isinstance(context, commands.Context):
        author = context.author
    else:
        author = context.user

    logger.info(f"helpers - open_choice_pack - players: {players}")

    # Display them with pagination, prev/next/select
    card_embeds = [
        await get_card_embeds(
            {"player": x, "team": team}  # Show team and dupe info
        )
        for x in players
    ]
    logger.debug(f"card embeds: {card_embeds}")
    page_num = 0

    # Cover page: nothing to take and nothing to page back to yet.
    view = Pagination([author], timeout=30)
    view.left_button.disabled = True
    view.left_button.label = f"Prev: -/{len(card_embeds)}"
    view.cancel_button.label = "Take This Card"
    view.cancel_button.style = discord.ButtonStyle.success
    view.cancel_button.disabled = True
    view.right_button.label = f"Next: 1/{len(card_embeds)}"

    # React to selection
    await context.channel.send(f"Let's head down to {pack_channel.mention}!")
    msg = await pack_channel.send(
        content=None,
        embed=image_embed(
            pack_cover,
            title=f"{team['lname']}",
            desc=f"{pack_type} Pack - Choose 1 of 4 {pack_type}s!",
        ),
        view=view,
    )
    if rarity_id >= 5:
        tmp_msg = await pack_channel.send(
            content="<@&1163537676885033010> we've got an MVP!"
        )
    else:
        tmp_msg = await pack_channel.send(content="We've got a choice pack here!")

    while True:
        await view.wait()

        if view.value:
            if view.value == "cancel":
                # "cancel" is the relabelled "Take This Card" button.
                await msg.edit(view=None)

                try:
                    await give_cards_to_team(
                        team, players=[players[page_num - 1]], pack_id=this_pack["id"]
                    )
                except Exception as e:
                    logger.error(f"failed to create cards: {e}")
                    raise ConnectionError("Failed to distribute these cards.") from e

                await db_patch(
                    "packs",
                    object_id=this_pack["id"],
                    params=[
                        (
                            "open_time",
                            int(
                                datetime.datetime.timestamp(datetime.datetime.now())
                                * 1000
                            ),
                        )
                    ],
                )
                await tmp_msg.edit(
                    content=f"{players[page_num - 1]['p_name']} has been added to the "
                    f"**{team['sname']}** binder!"
                )
                break
            if view.value == "left":
                page_num -= 1 if page_num > 1 else len(card_embeds)
            if view.value == "right":
                page_num += 1 if page_num < len(card_embeds) else 1
        else:
            # No button value: auto-advance, wrapping past the last card.
            if page_num == len(card_embeds):
                page_num = 1
            else:
                page_num += 1

        view.value = None

        # A finished view cannot be reused, so build a new one for this page.
        view = Pagination([author], timeout=30)
        view.left_button.label = f"Prev: {page_num - 1}/{len(card_embeds)}"
        view.cancel_button.label = "Take This Card"
        view.cancel_button.style = discord.ButtonStyle.success
        view.right_button.label = f"Next: {page_num + 1}/{len(card_embeds)}"
        if page_num == 1:
            view.left_button.label = f"Prev: -/{len(card_embeds)}"
            view.left_button.disabled = True
        elif page_num == len(card_embeds):
            view.right_button.label = f"Next: -/{len(card_embeds)}"
            view.right_button.disabled = True

        await msg.edit(content=None, embeds=card_embeds[page_num - 1], view=view)
|
|
|
|
|
|
async def confirm_pack_purchase(
    interaction, owner_team, num_packs, total_cost, pack_embed
):
    """Show a pack purchase summary and ask the user to confirm it.

    Posts ``pack_embed`` followed by a wallet / price / balance breakdown
    with a confirmation view that only ``interaction.user`` may answer.

    Returns:
        The question message when the view ends with a truthy value;
        otherwise ``None``, after the question is replaced by a decline note.
    """
    view = Confirm(responders=[interaction.user], timeout=30)
    channel = interaction.channel
    await channel.send(content=None, embed=pack_embed)

    plural = "s" if num_packs > 1 else ""
    summary = (
        f"Your Wallet: {owner_team['wallet']}₼\n"
        f"Pack{plural} Price: {total_cost}₼\n"
        f"After Purchase: {owner_team['wallet'] - total_cost}₼\n\n"
        f"Would you like to make this purchase?"
    )
    question = await channel.send(content=summary, view=view)
    await view.wait()

    if view.value:
        return question

    await question.edit(content="Saving that money. Smart.", view=None)
    return None
|
|
|
|
|
|
def player_desc(this_player) -> str:
    """Return a display description that always includes the player's name.

    The stored description is returned unchanged when it already contains
    the name; otherwise the name is appended after a space.
    """
    name = this_player["p_name"]
    description = this_player["description"]
    return description if name in description else f"{description} {name}"
|
|
|
|
|
|
def player_pcard(this_player):
    """Return the player's pitching card image.

    Checks ``image`` and then ``image2`` for one containing "pitching";
    falls back to ``image`` when neither matches.
    """
    for key in ("image", "image2"):
        candidate = this_player[key]
        if candidate is not None and "pitching" in candidate:
            return candidate
    return this_player["image"]
|
|
|
|
|
|
def player_bcard(this_player):
    """Return the player's batting card image.

    Prefers ``image`` when it contains "batting", then ``image2``; falls
    back to ``image`` when neither matches.
    """
    primary = this_player["image"]
    if primary is not None and "batting" in primary:
        return primary

    secondary = this_player["image2"]
    if secondary is not None and "batting" in secondary:
        return secondary

    return primary
|