paper-dynasty-discord/helpers/main.py
cal 3ce5aebc57
All checks were successful
Ruff Lint / lint (pull_request) Successful in 16s
Merge branch 'main' into ai/paper-dynasty-database#77
2026-03-23 20:05:29 +00:00

2185 lines
75 KiB
Python

import asyncio
import datetime
import logging
import math
import random
import discord
import aiohttp
from discord.ext import commands
from api_calls import *
from bs4 import BeautifulSoup
from typing import Optional, Union, List
from in_game.gameplay_models import Team
from constants import *
from discord_ui import *
from random_content import *
from utils import (
get_roster_sheet,
get_cal_user,
)
from search_utils import *
from .discord_utils import *
# Refractor tier badge prefixes for card embeds (T0 = no badge)
TIER_BADGES = {1: "BC", 2: "R", 3: "GR", 4: "SF"}
async def get_player_photo(player):
search_term = player["bbref_id"] if player["bbref_id"] else player["p_name"]
req_url = (
f"https://www.thesportsdb.com/api/v1/json/1/searchplayers.php?p={search_term}"
)
try:
async with aiohttp.ClientSession() as session:
async with session.get(
req_url, timeout=aiohttp.ClientTimeout(total=0.5)
) as resp:
if resp.status == 200:
data = await resp.json()
if data["player"] and data["player"][0]["strSport"] == "Baseball":
await db_patch(
"players",
object_id=player["player_id"],
params=[("headshot", data["player"][0]["strThumb"])],
)
return data["player"][0]["strThumb"]
except Exception:
return None
return None
async def get_player_headshot(player):
search_term = player["bbref_id"] if player["bbref_id"] else player["p_name"]
req_url = (
f"https://www.baseball-reference.com/search/search.fcgi?search={search_term}"
)
try:
resp = requests.get(req_url, timeout=2).text
soup = BeautifulSoup(resp, "html.parser")
for item in soup.find_all("img"):
if "headshot" in item["src"]:
await db_patch(
"players",
object_id=player["player_id"],
params=[("headshot", item["src"])],
)
return item["src"]
except:
pass
return await get_player_photo(player)
"""
NEW FOR SEASON 4
"""
async def get_team_by_owner(owner_id: int):
team = await db_get("teams", params=[("gm_id", owner_id)])
if not team["count"]:
return None
# Prefer the non-gauntlet team (main team) if multiple teams exist
for t in team["teams"]:
if "gauntlet" not in t["abbrev"].lower():
return t
# Fallback to first team if all are gauntlet teams
return team["teams"][0]
async def team_role(ctx, team: Team):
return await get_or_create_role(ctx, f"{team.abbrev} - {team.lname}")
def get_all_pos(player):
all_pos = []
for x in range(1, 8):
if player[f"pos_{x}"]:
all_pos.append(player[f"pos_{x}"])
return all_pos
async def share_channel(channel, user, read_only=False):
await channel.set_permissions(user, read_messages=True, send_messages=not read_only)
async def get_card_embeds(card, include_stats=False) -> list:
tier_badge = ""
try:
evo_state = await db_get(f"evolution/cards/{card['id']}")
if evo_state and evo_state.get("current_tier", 0) > 0:
tier = evo_state["current_tier"]
badge = TIER_BADGES.get(tier)
tier_badge = f"[{badge}] " if badge else ""
except Exception:
pass
embed = discord.Embed(
title=f"{tier_badge}{card['player']['p_name']}",
color=int(card["player"]["rarity"]["color"], 16),
)
# embed.description = card['team']['lname']
embed.description = (
f"{card['player']['cardset']['name']} / {card['player']['mlbclub']}"
)
embed.set_author(
name=card["team"]["lname"], url=IMAGES["logo"], icon_url=card["team"]["logo"]
)
embed.set_footer(
text=f"Paper Dynasty Season {card['team']['season']}", icon_url=IMAGES["logo"]
)
if include_stats:
b_query = await db_get(
"plays/batting",
params=[("player_id", card["player"]["player_id"]), ("season", PD_SEASON)],
)
p_query = await db_get(
"plays/pitching",
params=[("player_id", card["player"]["player_id"]), ("season", PD_SEASON)],
)
embed.add_field(name="Player ID", value=f"{card['player']['player_id']}")
embed.add_field(name="Rarity", value=f"{card['player']['rarity']['name']}")
embed.add_field(name="Cost", value=f"{card['player']['cost']}")
pos_string = ", ".join(get_all_pos(card["player"]))
embed.add_field(name="Positions", value=pos_string)
# all_dex = card['player']['paperdex']
all_dex = await db_get(
"paperdex", params=[("player_id", card["player"]["player_id"]), ("flat", True)]
)
count = all_dex["count"]
if card["team"]["lname"] != "Paper Dynasty":
bool_list = [
True
for elem in all_dex["paperdex"]
if elem["team"] == card["team"].get("id", None)
]
if any(bool_list):
if count == 1:
coll_string = "Only you"
else:
coll_string = (
f"You and {count - 1} other{'s' if count - 1 != 1 else ''}"
)
elif count:
coll_string = f"{count} other team{'s' if count != 1 else ''}"
else:
coll_string = "0 teams"
embed.add_field(name="Collected By", value=coll_string)
else:
embed.add_field(
name="Collected By", value=f"{count} team{'s' if count != 1 else ''}"
)
if card["team"]["lname"] != "Paper Dynasty":
team_dex = await db_get(
"cards",
params=[
("player_id", card["player"]["player_id"]),
("team_id", card["team"]["id"]),
],
)
if team_dex is not None:
dupe_count = max(0, team_dex["count"] - 1)
embed.add_field(
name="Dupes", value=f"{dupe_count} dupe{'s' if dupe_count != 1 else ''}"
)
# embed.add_field(name='Team', value=f'{card["player"]["mlbclub"]}')
if card["player"]["franchise"] != "Pokemon":
player_pages = f"[BBRef](https://www.baseball-reference.com/players/{card['player']['bbref_id'][0]}/{card['player']['bbref_id']}.shtml)"
else:
player_pages = f"[Pkmn]({PKMN_REF_URL}{card['player']['bbref_id']})"
embed.add_field(name="Player Page", value=f"{player_pages}")
embed.set_image(url=card["player"]["image"])
headshot = (
card["player"]["headshot"]
if card["player"]["headshot"]
else await get_player_headshot(card["player"])
)
if headshot:
embed.set_thumbnail(url=headshot)
else:
embed.set_thumbnail(url=IMAGES["logo"])
if card["player"]["franchise"] == "Pokemon":
if card["player"]["fangr_id"] is not None:
try:
evo_mon = await db_get(
"players", object_id=card["player"]["fangr_id"], none_okay=True
)
if evo_mon is not None:
embed.add_field(name="Evolves Into", value=f"{evo_mon['p_name']}")
except Exception as e:
logging.error(
f"could not pull evolution: {e}", exc_info=True, stack_info=True
)
if "420420" not in card["player"]["strat_code"]:
try:
evo_mon = await db_get(
"players", object_id=card["player"]["strat_code"], none_okay=True
)
if evo_mon is not None:
embed.add_field(name="Evolves From", value=f"{evo_mon['p_name']}")
except Exception as e:
logging.error(
f"could not pull evolution: {e}", exc_info=True, stack_info=True
)
if include_stats:
if b_query["count"] > 0:
b = b_query["stats"][0]
re24 = f"{b['re24']:.2f}"
batting_string = (
f"```\n"
f" AVG OBP SLG\n"
f" {b['avg']:.3f} {b['obp']:.3f} {b['slg']:.3f}\n``````\n"
f" OPS wOBA RE24\n"
f" {b['ops']:.3f} {b['woba']:.3f} {re24: ^5}\n``````\n"
f" PA H RBI 2B 3B HR SB\n"
f"{b['pa']: >3} {b['hit']: ^3} {b['rbi']: ^3} {b['double']: >2} {b['triple']: >2} "
f"{b['hr']: >2} {b['sb']: >2}```\n"
)
embed.add_field(name="Batting Stats", value=batting_string, inline=False)
if p_query["count"] > 0:
p = p_query["stats"][0]
ip_whole = math.floor(p["outs"] / 3)
ip_denom = p["outs"] % 3
ips = ip_whole + (ip_denom * 0.1)
kpbb = f"{p['k/bb']:.1f}"
era = f"{p['era']:.2f}"
whip = f"{p['whip']:.2f}"
re24 = f"{p['re24']:.2f}"
pitching_string = (
f"```\n"
f" W-L SV ERA WHIP\n"
f"{p['win']: >2}-{p['loss']: <2} {p['save']: >2} {era: >5} {whip: >4}\n``````\n"
f" IP SO K/BB RE24\n"
f"{ips: >5} {p['so']: ^3} {kpbb: ^4} {re24: ^5}\n```"
)
embed.add_field(name="Pitching Stats", value=pitching_string, inline=False)
if not card["player"]["image2"]:
return [embed]
card_two = discord.Embed(color=int(card["player"]["rarity"]["color"], 16))
card_two.set_footer(
text=f"Paper Dynasty Season {card['team']['season']}", icon_url=IMAGES["logo"]
)
card_two.set_image(url=card["player"]["image2"])
return [embed, card_two]
def image_embed(
image_url: str,
title: str = None,
color: str = None,
desc: str = None,
author_name: str = None,
author_icon: str = None,
):
embed_color = int(SBA_COLOR, 16)
if color is not None:
embed_color = int(color, 16)
embed = discord.Embed(color=embed_color)
if title is not None:
embed.title = title
if desc is not None:
embed.description = desc
if author_name is not None:
icon = author_icon if author_icon is not None else IMAGES["logo"]
embed.set_author(name=author_name, icon_url=icon)
embed.set_footer(text=f"Paper Dynasty Season {PD_SEASON}", icon_url=IMAGES["logo"])
embed.set_image(url=image_url)
return embed
def is_shiny(card):
if card["player"]["rarity"]["value"] >= 5:
return True
return False
async def display_cards(
cards: list,
team: dict,
channel,
user,
bot=None,
pack_cover: str = None,
cust_message: str = None,
add_roster: bool = True,
pack_name: str = None,
) -> bool:
logger.info(
f"display_cards called with {len(cards)} cards for team {team.get('abbrev', 'Unknown')}"
)
try:
cards.sort(key=lambda x: x["player"]["rarity"]["value"])
logger.debug("Cards sorted successfully")
card_embeds = [await get_card_embeds(x) for x in cards]
logger.debug(f"Created {len(card_embeds)} card embeds")
page_num = 0 if pack_cover is None else -1
seen_shiny = False
logger.debug(
f"Initial page_num: {page_num}, pack_cover: {pack_cover is not None}"
)
except Exception as e:
logger.error(f"Error in display_cards initialization: {e}", exc_info=True)
return False
try:
view = Pagination([user], timeout=10)
# Use simple text arrows instead of emojis to avoid context issues
l_emoji = ""
r_emoji = ""
view.left_button.disabled = True
view.left_button.label = f"{l_emoji}Prev: -/{len(card_embeds)}"
view.cancel_button.label = "Close Pack"
view.right_button.label = f"Next: {page_num + 2}/{len(card_embeds)}{r_emoji}"
if len(cards) == 1:
view.right_button.disabled = True
logger.debug("Pagination view created successfully")
if pack_cover:
logger.debug("Sending pack cover message")
msg = await channel.send(
content=None,
embed=image_embed(pack_cover, title=f"{team['lname']}", desc=pack_name),
view=view,
)
else:
logger.debug(f"Sending card embed message for page {page_num}")
msg = await channel.send(
content=None, embeds=card_embeds[page_num], view=view
)
logger.debug("Initial message sent successfully")
except Exception as e:
logger.error(
f"Error creating view or sending initial message: {e}", exc_info=True
)
return False
try:
if cust_message:
logger.debug(f"Sending custom message: {cust_message[:50]}...")
follow_up = await channel.send(cust_message)
else:
logger.debug(f"Sending default message for {len(cards)} cards")
follow_up = await channel.send(
f"{user.mention} you've got {len(cards)} cards here"
)
logger.debug("Follow-up message sent successfully")
except Exception as e:
logger.error(f"Error sending follow-up message: {e}", exc_info=True)
return False
logger.debug("Starting main interaction loop")
while True:
try:
logger.debug(f"Waiting for user interaction on page {page_num}")
await view.wait()
logger.debug(f"User interaction received: {view.value}")
except Exception as e:
logger.error(f"Error in view.wait(): {e}", exc_info=True)
await msg.edit(view=None)
return False
if view.value:
if view.value == "cancel":
await msg.edit(view=None)
if add_roster:
await follow_up.edit(
content=f"Refresh your cards here: {get_roster_sheet(team)}"
)
return True
if view.value == "left":
page_num -= 1 if page_num > 0 else 0
if view.value == "right":
page_num += 1 if page_num < len(card_embeds) - 1 else 0
else:
if page_num == len(card_embeds) - 1:
await msg.edit(view=None)
if add_roster:
await follow_up.edit(
content=f"Refresh your cards here: {get_roster_sheet(team)}"
)
return True
else:
page_num += 1
view.value = None
try:
if is_shiny(cards[page_num]) and not seen_shiny:
logger.info(
f"Shiny card detected on page {page_num}: {cards[page_num]['player']['p_name']}"
)
seen_shiny = True
view = Pagination([user], timeout=300)
view.cancel_button.style = discord.ButtonStyle.success
view.cancel_button.label = "Flip!"
view.left_button.label = "-"
view.right_button.label = "-"
view.left_button.disabled = True
view.right_button.disabled = True
# Get MVP image safely with fallback
franchise = cards[page_num]["player"]["franchise"]
logger.debug(f"Getting MVP image for franchise: {franchise}")
mvp_image = IMAGES["mvp"].get(
franchise, IMAGES.get("mvp-hype", IMAGES["logo"])
)
await msg.edit(
embed=image_embed(
mvp_image,
color="56f1fa",
author_name=team["lname"],
author_icon=team["logo"],
),
view=view,
)
logger.debug("MVP display updated successfully")
except Exception as e:
logger.error(
f"Error processing shiny card on page {page_num}: {e}", exc_info=True
)
# Continue with regular flow instead of crashing
try:
tmp_msg = await channel.send(
content="<@&1163537676885033010> we've got an MVP!"
)
await follow_up.edit(
content="<@&1163537676885033010> we've got an MVP!"
)
await tmp_msg.delete()
except discord.errors.NotFound:
# Role might not exist or message was already deleted
await follow_up.edit(content="We've got an MVP!")
except Exception as e:
# Log error but don't crash the function
logger.error(f"Error handling MVP notification: {e}")
await follow_up.edit(content="We've got an MVP!")
await view.wait()
view = Pagination([user], timeout=10)
try:
view.right_button.label = (
f"Next: {page_num + 2}/{len(card_embeds)}{r_emoji}"
)
view.cancel_button.label = "Close Pack"
view.left_button.label = f"{l_emoji}Prev: {page_num}/{len(card_embeds)}"
if page_num == 0:
view.left_button.label = f"{l_emoji}Prev: -/{len(card_embeds)}"
view.left_button.disabled = True
elif page_num == len(card_embeds) - 1:
view.timeout = 600.0
view.right_button.label = f"Next: -/{len(card_embeds)}{r_emoji}"
view.right_button.disabled = True
logger.debug(f"Updating message to show page {page_num}/{len(card_embeds)}")
if page_num >= len(card_embeds):
logger.error(
f"Page number {page_num} exceeds card_embeds length {len(card_embeds)}"
)
page_num = len(card_embeds) - 1
await msg.edit(content=None, embeds=card_embeds[page_num], view=view)
logger.debug(f"Message updated successfully to page {page_num}")
except Exception as e:
logger.error(
f"Error updating message on page {page_num}: {e}", exc_info=True
)
# Try to clean up and return
try:
await msg.edit(view=None)
except:
pass # If this fails too, just give up
return False
async def embed_pagination(
all_embeds: list,
channel,
user: discord.Member,
custom_message: str = None,
timeout: int = 10,
start_page: int = 0,
):
if start_page > len(all_embeds) - 1 or start_page < 0:
page_num = 0
else:
page_num = start_page
view = Pagination([user], timeout=timeout)
l_emoji = ""
r_emoji = ""
view.right_button.label = f"Next: {page_num + 2}/{len(all_embeds)}{r_emoji}"
view.cancel_button.label = "Cancel"
view.left_button.label = f"{l_emoji}Prev: {page_num}/{len(all_embeds)}"
if page_num == 0:
view.left_button.label = f"{l_emoji}Prev: -/{len(all_embeds)}"
view.left_button.disabled = True
elif page_num == len(all_embeds) - 1:
view.right_button.label = f"Next: -/{len(all_embeds)}{r_emoji}"
view.right_button.disabled = True
msg = await channel.send(
content=custom_message, embed=all_embeds[page_num], view=view
)
while True:
await view.wait()
if view.value:
if view.value == "cancel":
await msg.edit(view=None)
return True
if view.value == "left":
page_num -= 1 if page_num > 0 else 0
if view.value == "right":
page_num += 1 if page_num <= len(all_embeds) else len(all_embeds)
else:
if page_num == len(all_embeds) - 1:
await msg.edit(view=None)
return True
else:
page_num += 1
view.value = None
view = Pagination([user], timeout=timeout)
view.right_button.label = f"Next: {page_num + 2}/{len(all_embeds)}{r_emoji}"
view.cancel_button.label = "Cancel"
view.left_button.label = f"{l_emoji}Prev: {page_num}/{len(all_embeds)}"
if page_num == 0:
view.left_button.label = f"{l_emoji}Prev: -/{len(all_embeds)}"
view.left_button.disabled = True
elif page_num == len(all_embeds) - 1:
view.timeout = 600.0
view.right_button.label = f"Next: -/{len(all_embeds)}{r_emoji}"
view.right_button.disabled = True
await msg.edit(content=None, embed=all_embeds[page_num], view=view)
async def get_test_pack(ctx, team):
pull_notifs = []
this_pack = await db_post(
"packs/one",
payload={
"team_id": team["id"],
"pack_type_id": 1,
"open_time": int(
datetime.datetime.timestamp(datetime.datetime.now()) * 1000
),
},
)
ft_query = await db_get("players/random", params=[("max_rarity", 1), ("limit", 3)])
four_query = await db_get(
"players/random", params=[("min_rarity", 1), ("max_rarity", 3), ("limit", 1)]
)
five_query = await db_get(
"players/random", params=[("min_rarity", 5), ("max_rarity", 5), ("limit", 1)]
)
first_three = ft_query["players"]
fourth = four_query["players"]
fifth = five_query["players"]
all_cards = [*first_three, *fourth, *fifth]
success = await db_post(
"cards",
timeout=10,
payload={
"cards": [
{
"player_id": x["player_id"],
"team_id": team["id"],
"pack_id": this_pack["id"],
}
for x in all_cards
]
},
)
if not success:
await ctx.send(
f"I was not able to create these cards {get_emoji(ctx, 'slight_frown')}"
)
return
for x in all_cards:
if x["rarity"]["value"] >= 3:
pull_notifs.append(x)
for pull in pull_notifs:
await db_post(
"notifs",
payload={
"created": int(
datetime.datetime.timestamp(datetime.datetime.now()) * 1000
),
"title": "Rare Pull",
"field_name": f"{player_desc(pull)} ({pull['rarity']['name']})",
"message": f"Pulled by {team['abbrev']}",
"about": f"Player-{pull['player_id']}",
},
)
return [{"player": x, "team": team} for x in all_cards]
async def roll_for_cards(all_packs: list, extra_val=None) -> list:
"""
Pack odds are calculated based on the pack type
Parameters
----------
extra_val
all_packs
Returns
-------
"""
all_players = []
team = all_packs[0]["team"]
pack_ids = []
for pack in all_packs:
counts = {
"Rep": {"count": 0, "rarity": 0},
"Res": {"count": 0, "rarity": 1},
"Sta": {"count": 0, "rarity": 2},
"All": {"count": 0, "rarity": 3},
"MVP": {"count": 0, "rarity": 5},
"HoF": {"count": 0, "rarity": 8},
}
this_pack_players = []
if pack["pack_type"]["name"] == "Standard":
# Cards 1 - 2
for x in range(2):
d_1000 = random.randint(1, 1000)
if d_1000 <= 450:
counts["Rep"]["count"] += 1
elif d_1000 <= 900:
counts["Res"]["count"] += 1
else:
counts["Sta"]["count"] += 1
# Card 3
d_1000 = random.randint(1, 1000)
if d_1000 <= 350:
counts["Rep"]["count"] += 1
elif d_1000 <= 700:
counts["Res"]["count"] += 1
elif d_1000 <= 950:
counts["Sta"]["count"] += 1
else:
counts["All"]["count"] += 1
# Card 4
d_1000 = random.randint(1, 1000)
if d_1000 <= 310:
counts["Rep"]["count"] += 1
elif d_1000 <= 620:
counts["Res"]["count"] += 1
elif d_1000 <= 940:
counts["Sta"]["count"] += 1
elif d_1000 <= 990:
counts["All"]["count"] += 1
else:
counts["MVP"]["count"] += 1
# Card 5
d_1000 = random.randint(1, 1000)
if d_1000 <= 215:
counts["Rep"]["count"] += 1
elif d_1000 <= 430:
counts["Res"]["count"] += 1
elif d_1000 <= 930:
counts["Sta"]["count"] += 1
elif d_1000 <= 980:
counts["All"]["count"] += 1
elif d_1000 <= 990:
counts["MVP"]["count"] += 1
else:
counts["HoF"]["count"] += 1
elif pack["pack_type"]["name"] == "Premium":
# Card 1
d_1000 = random.randint(1, 1000)
if d_1000 <= 400:
counts["Rep"]["count"] += 1
elif d_1000 <= 870:
counts["Res"]["count"] += 1
elif d_1000 <= 970:
counts["Sta"]["count"] += 1
elif d_1000 <= 990:
counts["All"]["count"] += 1
else:
counts["MVP"]["count"] += 1
# Card 2
d_1000 = random.randint(1, 1000)
if d_1000 <= 300:
counts["Rep"]["count"] += 1
elif d_1000 <= 770:
counts["Res"]["count"] += 1
elif d_1000 <= 970:
counts["Sta"]["count"] += 1
elif d_1000 <= 990:
counts["All"]["count"] += 1
else:
counts["MVP"]["count"] += 1
# Card 3
d_1000 = random.randint(1, 1000)
if d_1000 <= 200:
counts["Rep"]["count"] += 1
elif d_1000 <= 640:
counts["Res"]["count"] += 1
elif d_1000 <= 940:
counts["Sta"]["count"] += 1
elif d_1000 <= 990:
counts["All"]["count"] += 1
else:
counts["MVP"]["count"] += 1
# Card 4
d_1000 = random.randint(1, 1000)
if d_1000 <= 100:
counts["Rep"]["count"] += 1
if d_1000 <= 530:
counts["Res"]["count"] += 1
elif d_1000 <= 930:
counts["Sta"]["count"] += 1
elif d_1000 <= 980:
counts["All"]["count"] += 1
elif d_1000 <= 990:
counts["MVP"]["count"] += 1
else:
counts["HoF"]["count"] += 1
# Card 5
d_1000 = random.randint(1, 1000)
if d_1000 <= 380:
counts["Res"]["count"] += 1
elif d_1000 <= 880:
counts["Sta"]["count"] += 1
elif d_1000 <= 980:
counts["All"]["count"] += 1
elif d_1000 <= 990:
counts["MVP"]["count"] += 1
else:
counts["HoF"]["count"] += 1
elif pack["pack_type"]["name"] == "Check-In Player":
logger.info(
f"Building Check-In Pack // extra_val (type): {extra_val} {type(extra_val)}"
)
# Single Card
mod = 0
if isinstance(extra_val, int):
mod = extra_val
d_1000 = random.randint(1, 1000 + mod)
if d_1000 >= 1100:
counts["All"]["count"] += 1
elif d_1000 >= 1000:
counts["Sta"]["count"] += 1
elif d_1000 >= 500:
counts["Res"]["count"] += 1
else:
counts["Rep"]["count"] += 1
else:
raise TypeError(f"Pack type not recognized: {pack['pack_type']['name']}")
pull_notifs = []
for key in counts:
mvp_flag = None
if counts[key]["count"] > 0:
params = [
("min_rarity", counts[key]["rarity"]),
("max_rarity", counts[key]["rarity"]),
("limit", counts[key]["count"]),
]
if all_packs[0]["pack_team"] is not None:
params.extend(
[
("franchise", all_packs[0]["pack_team"]["sname"]),
("in_packs", True),
]
)
elif all_packs[0]["pack_cardset"] is not None:
params.append(("cardset_id", all_packs[0]["pack_cardset"]["id"]))
else:
params.append(("in_packs", True))
pl = await db_get("players/random", params=params)
if pl["count"] != counts[key]["count"]:
mvp_flag = counts[key]["count"] - pl["count"]
logging.info(
f"Set mvp flag to {mvp_flag} / cardset_id: {all_packs[0]['pack_cardset']['id']}"
)
for x in pl["players"]:
this_pack_players.append(x)
all_players.append(x)
if x["rarity"]["value"] >= 3:
pull_notifs.append(x)
if mvp_flag and all_packs[0]["pack_cardset"]["id"] not in [23]:
logging.info(f"Adding {mvp_flag} MVPs for missing cards")
pl = await db_get(
"players/random", params=[("min_rarity", 5), ("limit", mvp_flag)]
)
for x in pl["players"]:
this_pack_players.append(x)
all_players.append(x)
# Add dupes of Replacement/Reserve cards
elif mvp_flag:
logging.info(f"Adding {mvp_flag} duplicate pokemon cards")
for count in range(mvp_flag):
logging.info(f"Adding {pl['players'][0]['p_name']} to the pack")
this_pack_players.append(x)
all_players.append(pl["players"][0])
success = await db_post(
"cards",
payload={
"cards": [
{
"player_id": x["player_id"],
"team_id": pack["team"]["id"],
"pack_id": pack["id"],
}
for x in this_pack_players
]
},
timeout=10,
)
if not success:
raise ConnectionError("Failed to create this pack of cards.")
await db_patch(
"packs",
object_id=pack["id"],
params=[
(
"open_time",
int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000),
)
],
)
pack_ids.append(pack["id"])
for pull in pull_notifs:
logger.info(f"good pull: {pull}")
await db_post(
"notifs",
payload={
"created": int(
datetime.datetime.timestamp(datetime.datetime.now()) * 1000
),
"title": "Rare Pull",
"field_name": f"{player_desc(pull)} ({pull['rarity']['name']})",
"message": f"Pulled by {team['abbrev']}",
"about": f"Player-{pull['player_id']}",
},
)
return pack_ids
async def give_packs(team: dict, num_packs: int, pack_type: dict = None) -> dict:
"""
Parameters
----------
pack_type
team
num_packs
Returns
-------
{ 'count': int, 'packs': [ all team packs ] }
"""
pt_id = pack_type["id"] if pack_type is not None else 1
await db_post(
"packs",
payload={
"packs": [
{"team_id": team["id"], "pack_type_id": pt_id} for x in range(num_packs)
]
},
)
total_packs = await db_get(
"packs", params=[("team_id", team["id"]), ("opened", False)]
)
return total_packs
def get_sheets(bot):
try:
return bot.get_cog("Gameplay").sheets
except Exception as e:
logger.error(f"Could not grab sheets auth: {e}")
raise ConnectionError(
"Bot has not authenticated with discord; please try again in 1 minute."
)
def create_team_sheet(team, email: str, current, bot):
sheets = get_sheets(bot)
new_sheet = sheets.drive.copy_file(
f"{current['gsheet_template']}",
f"{team['lname']} Roster Sheet v{current['gsheet_version']}",
"1539D0imTMjlUx2VF3NPMt7Sv85sb2XAJ",
)
logger.info(f"new_sheet: {new_sheet}")
this_sheet = sheets.open_by_key(new_sheet["id"])
this_sheet.share(email, role="writer")
team_data = this_sheet.worksheet_by_title("Team Data")
team_data.update_values(
crange="B1:B2", values=[[f"{team['id']}"], [f"'{team_hash(team)}"]]
)
logger.debug(f"this_sheet: {this_sheet}")
return this_sheet
async def refresh_sheet(team, bot, sheets=None) -> None:
return
if not sheets:
sheets = get_sheets(bot)
this_sheet = sheets.open_by_key(team["gsheet"])
my_cards = this_sheet.worksheet_by_title("My Cards")
all_cards = this_sheet.worksheet_by_title("All Cards")
my_cards.update_value("A2", "FALSE")
all_cards.update_value("A2", "FALSE")
await asyncio.sleep(1)
my_cards.update_value("A2", "TRUE")
await asyncio.sleep(0.5)
all_cards.update_value("A2", "TRUE")
def delete_sheet(team, bot):
sheets = get_sheets(bot)
this_sheet = sheets.open_by_key(team["gsheet"])
this_sheet.delete()
def share_sheet(team, email, bot) -> None:
sheets = get_sheets(bot)
this_sheet = sheets.open_by_key(team["gsheet"])
this_sheet.share(email, role="writer")
def int_timestamp(datetime_obj: datetime.datetime) -> int:
return int(datetime.datetime.timestamp(datetime_obj) * 1000)
def get_pos_abbrev(pos_name):
if pos_name == "Catcher":
return "C"
elif pos_name == "First Base":
return "1B"
elif pos_name == "Second Base":
return "2B"
elif pos_name == "Third Base":
return "3B"
elif pos_name == "Shortstop":
return "SS"
elif pos_name == "Left Field":
return "LF"
elif pos_name == "Center Field":
return "CF"
elif pos_name == "Right Field":
return "RF"
elif pos_name == "Pitcher":
return "P"
elif pos_name == "Designated Hitter":
return "DH"
elif pos_name == "Pinch Hitter":
return "PH"
else:
raise KeyError(f"{pos_name} is not a recognized position name")
async def cardset_search(cardset: str, cardset_list: list) -> Optional[dict]:
cardset_name = fuzzy_search(cardset, cardset_list)
if not cardset_name:
return None
c_query = await db_get("cardsets", params=[("name", cardset_name)])
if c_query["count"] == 0:
return None
return c_query["cardsets"][0]
def get_blank_team_card(player):
return {
"player": player,
"team": {
"lname": "Paper Dynasty",
"logo": IMAGES["logo"],
"season": PD_SEASON,
"id": None,
},
}
def get_rosters(team, bot, roster_num: Optional[int] = None) -> list:
sheets = get_sheets(bot)
this_sheet = sheets.open_by_key(team["gsheet"])
r_sheet = this_sheet.worksheet_by_title("My Rosters")
logger.debug(f"this_sheet: {this_sheet} / r_sheet = {r_sheet}")
all_rosters = [None, None, None]
# Pull roster 1
if not roster_num or roster_num == 1:
roster_1 = r_sheet.range("B3:B28")
roster_name = r_sheet.cell("F30").value
logger.info(f"roster_1: {roster_1}")
if not roster_1[0][0].value == "":
all_rosters[0] = {
"name": roster_name,
"roster_num": 1,
"team_id": team["id"],
"cards": None,
}
all_rosters[0]["cards"] = [int(x[0].value) for x in roster_1]
# Pull roster 2
if not roster_num or roster_num == 2:
roster_2 = r_sheet.range("B29:B54")
roster_name = r_sheet.cell("F31").value
logger.info(f"roster_2: {roster_2}")
if not roster_2[0][0].value == "":
all_rosters[1] = {
"name": roster_name,
"roster_num": 2,
"team_id": team["id"],
"cards": None,
}
all_rosters[1]["cards"] = [int(x[0].value) for x in roster_2]
# Pull roster 3
if not roster_num or roster_num == 3:
roster_3 = r_sheet.range("B55:B80")
roster_name = r_sheet.cell("F32").value
logger.info(f"roster_3: {roster_3}")
if not roster_3[0][0].value == "":
all_rosters[2] = {
"name": roster_name,
"roster_num": 3,
"team_id": team["id"],
"cards": None,
}
all_rosters[2]["cards"] = [int(x[0].value) for x in roster_3]
return all_rosters
def get_roster_lineups(team, bot, roster_num, lineup_num) -> list:
sheets = get_sheets(bot)
logger.debug(f"sheets: {sheets}")
this_sheet = sheets.open_by_key(team["gsheet"])
logger.debug(f"this_sheet: {this_sheet}")
r_sheet = this_sheet.worksheet_by_title("My Rosters")
logger.debug(f"r_sheet: {r_sheet}")
if lineup_num == 1:
row_start = 9
row_end = 17
else:
row_start = 18
row_end = 26
if roster_num == 1:
l_range = f"H{row_start}:I{row_end}"
elif roster_num == 2:
l_range = f"J{row_start}:K{row_end}"
else:
l_range = f"L{row_start}:M{row_end}"
logger.debug(f"l_range: {l_range}")
raw_cells = r_sheet.range(l_range)
logger.debug(f"raw_cells: {raw_cells}")
try:
lineup_cells = [(row[0].value, int(row[1].value)) for row in raw_cells]
except ValueError:
logger.error(f"Could not pull roster for {team['abbrev']} due to a ValueError")
raise ValueError(
"Uh oh. Looks like your roster might not be saved. I am reading blanks when I try to "
"get the card IDs"
)
logger.debug(f"lineup_cells: {lineup_cells}")
return lineup_cells
def post_ratings_guide(team, bot, this_sheet=None):
if not this_sheet:
sheets = get_sheets(bot)
this_sheet = sheets.open_by_key(team["gsheet"])
p_guide = this_sheet.worksheet_by_title("Full Guide - Pitchers")
b_guide = this_sheet.worksheet_by_title("Full Guide - Batters")
p_guide.update_value("A1", RATINGS_PITCHER_FORMULA)
b_guide.update_value("A1", RATINGS_BATTER_FORMULA)
async def legal_channel(ctx):
bad_channels = ["paper-dynasty-chat", "pd-news-ticker", "pd-network-news"]
if isinstance(ctx, commands.Context):
if ctx.channel.name in bad_channels:
raise commands.CheckFailure(
f"Slide on down to the {get_channel(ctx, 'pd-bot-hole').mention} ;)"
)
else:
return True
elif ctx.channel.name in bad_channels:
# await ctx.message.add_reaction('❌')
# await ctx.send(f'Slide on down to the {get_channel(ctx, "pd-bot-hole").mention} ;)')
# logger.warning(f'{ctx.author.name} posted in illegal channel.')
# return False
raise discord.app_commands.AppCommandError(
f"Slide on down to the {get_channel(ctx, 'pd-bot-hole').mention} ;)"
)
else:
return True
def app_legal_channel():
"""Check for slash commands (app_commands). Use as @app_legal_channel()"""
async def predicate(interaction: discord.Interaction) -> bool:
bad_channels = ["paper-dynasty-chat", "pd-news-ticker", "pd-network-news"]
if interaction.channel.name in bad_channels:
raise discord.app_commands.CheckFailure(
f"Slide on down to the {get_channel(interaction, 'pd-bot-hole').mention} ;)"
)
return True
return discord.app_commands.check(predicate)
def is_ephemeral_channel(channel) -> bool:
"""Check if channel requires ephemeral responses (chat channels)."""
if not channel or not hasattr(channel, "name"):
return False
return channel.name in ["paper-dynasty-chat", "pd-news-ticker"]
def is_restricted_channel(channel) -> bool:
"""Check if channel is restricted for certain commands (chat/ticker channels)."""
if not channel or not hasattr(channel, "name"):
return False
return channel.name in ["paper-dynasty-chat", "pd-news-ticker"]
def can_send_message(channel) -> bool:
"""Check if channel supports sending messages."""
return channel and hasattr(channel, "send")
async def send_safe_message(
source: Union[discord.Interaction, commands.Context],
content: str = None,
*,
embeds: List[discord.Embed] = None,
view: discord.ui.View = None,
ephemeral: bool = False,
delete_after: float = None,
) -> discord.Message:
"""
Safely send a message using the most appropriate method based on context.
For Interactions:
1. Try edit_original_response() if deferred
2. Try followup.send() if response is done
3. Try channel.send() if channel supports it
For Context:
1. Try ctx.send()
2. Try DM to user with context info if channel send fails
Args:
source: Discord Interaction or Context object
content: Message content
embeds: List of embeds to send
view: UI view to attach
ephemeral: Whether message should be ephemeral (Interaction only)
delete_after: Seconds after which to delete message
Returns:
The sent message object
Raises:
Exception: If all send methods fail
"""
logger = logging.getLogger("discord_app")
# Prepare message kwargs
kwargs = {}
if content is not None:
kwargs["content"] = content
if embeds is not None:
kwargs["embeds"] = embeds
if view is not None:
kwargs["view"] = view
if delete_after is not None:
kwargs["delete_after"] = delete_after
# Handle Interaction objects
if isinstance(source, discord.Interaction):
# Add ephemeral parameter for interactions
if ephemeral:
kwargs["ephemeral"] = ephemeral
# Strategy 1: Try edit_original_response if already deferred
if source.response.is_done():
try:
# For edit_original_response, we need to handle embeds differently
edit_kwargs = kwargs.copy()
if "embeds" in edit_kwargs:
# edit_original_response expects 'embeds' parameter
pass # Already correct
if "ephemeral" in edit_kwargs:
# Can't change ephemeral status on edit
del edit_kwargs["ephemeral"]
await source.edit_original_response(**edit_kwargs)
# edit_original_response doesn't return a message object in the same way
# We'll use followup as backup to get a returnable message
if (
"delete_after" not in kwargs
): # Don't create extra messages if auto-deleting
return await source.followup.send(
"Message sent", ephemeral=True, delete_after=0.1
)
return None # Can't return meaningful message object from edit
except Exception as e:
logger.debug(f"Failed to edit original response: {e}")
# Strategy 2: Try followup.send()
try:
return await source.followup.send(**kwargs)
except Exception as e:
logger.debug(f"Failed to send followup message: {e}")
# Strategy 3: Try channel.send() if possible
if can_send_message(source.channel):
try:
# Remove ephemeral for channel send (not supported)
channel_kwargs = kwargs.copy()
if "ephemeral" in channel_kwargs:
del channel_kwargs["ephemeral"]
return await source.channel.send(**channel_kwargs)
except Exception as e:
logger.debug(f"Failed to send channel message: {e}")
# All interaction methods failed
logger.error(
f"All interaction message send methods failed for user {source.user.id}"
)
raise RuntimeError(
"Unable to send interaction message through any available method"
)
# Handle Context objects
elif isinstance(source, commands.Context):
# Strategy 1: Try ctx.send() directly
try:
# Remove ephemeral (not supported in Context)
ctx_kwargs = kwargs.copy()
if "ephemeral" in ctx_kwargs:
del ctx_kwargs["ephemeral"]
return await source.send(**ctx_kwargs)
except Exception as e:
logger.debug(f"Failed to send context message to channel: {e}")
# Strategy 2: Try DM to user with context info
try:
# Prepare DM with context information
channel_name = getattr(source.channel, "name", "Unknown Channel")
guild_name = (
getattr(source.guild, "name", "Unknown Server")
if source.guild
else "DM"
)
dm_content = f"[Bot Response from #{channel_name} in {guild_name}]\n\n"
if content:
dm_content += content
# Send DM with modified content
dm_kwargs = kwargs.copy()
dm_kwargs["content"] = dm_content
if "ephemeral" in dm_kwargs:
del dm_kwargs["ephemeral"]
return await source.author.send(**dm_kwargs)
except Exception as dm_error:
logger.error(
f"Failed to send DM fallback to user {source.author.id}: {dm_error}"
)
# Both ctx.send() and DM failed - let the exception bubble up
raise dm_error
else:
raise TypeError(
f"Source must be discord.Interaction or commands.Context, got {type(source)}"
)
def get_role(ctx, role_name):
return discord.utils.get(ctx.guild.roles, name=role_name)
async def team_summary_embed(team, ctx, include_roster: bool = True):
embed = get_team_embed(f"{team['lname']} Overview", team)
embed.add_field(name="General Manager", value=team["gmname"], inline=False)
embed.add_field(name="Wallet", value=f"{team['wallet']}")
# embed.add_field(name='Collection Value', value=team['collection_value'])
p_query = await db_get("packs", params=[("team_id", team["id"]), ("opened", False)])
if p_query["count"] > 0:
all_packs = {}
for x in p_query["packs"]:
if x["pack_type"]["name"] not in all_packs:
all_packs[x["pack_type"]["name"]] = 1
else:
all_packs[x["pack_type"]["name"]] += 1
pack_string = ""
for pack_type in all_packs:
pack_string += f"{pack_type.title()}: {all_packs[pack_type]}\n"
else:
pack_string = "None"
embed.add_field(name="Unopened Packs", value=pack_string)
embed.add_field(name="Team Rating", value=f"{team['ranking']}")
r_query = await db_get(f"results/team/{team['id']}?season={PD_SEASON}")
if r_query:
embed.add_field(
name="Record",
value=f"Ranked: {r_query['ranked_wins']}-{r_query['ranked_losses']}\n"
f"Unlimited: {r_query['casual_wins']}-{r_query['casual_losses']}",
)
# try:
# r_query = await db_get('rosters', params=[('team_id', team['id'])])
# if r_query['count']:
# embed.add_field(name=f'Rosters', value=f'** **', inline=False)
# for roster in r_query['rosters']:
# roster_string = ''
# for i in range(1, 27):
# card = roster[f'card_{i}']
# roster_string += f'{card["player"]["description"]} ({card["player"]["pos_1"]})\n'
# embed.add_field(
# name=f'{roster["name"]} Roster',
# value=roster_string if len(roster_string) else "Unknown"
# )
# else:
# embed.add_field(
# name='Rosters',
# value='You can set up to three rosters for quick switching from your team sheet.',
# inline=False
# )
# except Exception as e:
# logger.error(f'Could not pull rosters for {team["abbrev"]}')
# embed.add_field(
# name='Rosters',
# value='Unable to pull current rosters. `/pullroster` to sync.',
# inline=False
# )
if include_roster:
embed.add_field(name="Team Sheet", value=get_roster_sheet(team), inline=False)
embed.add_field(
name="For Help",
value=f"`/help-pd` has FAQs; feel free to post questions in "
f"{get_channel(ctx, 'paper-dynasty-chat').mention}.",
inline=False,
)
return embed
async def give_cards_to_team(
team, players: list = None, player_ids: list = None, pack_id=None
):
if not pack_id:
p_query = await db_post(
"packs/one",
payload={
"team_id": team["id"],
"pack_type_id": 4,
"open_time": datetime.datetime.timestamp(datetime.datetime.now())
* 1000,
},
)
pack_id = p_query["id"]
if not players and not player_ids:
raise ValueError(
"One of players or player_ids must be provided to distribute cards"
)
if players:
await db_post(
"cards",
payload={
"cards": [
{
"player_id": x["player_id"],
"team_id": team["id"],
"pack_id": pack_id,
}
for x in players
]
},
timeout=10,
)
elif player_ids:
await db_post(
"cards",
payload={
"cards": [
{"player_id": x, "team_id": team["id"], "pack_id": pack_id}
for x in player_ids
]
},
timeout=10,
)
def get_ratings_guide(sheets):
this_sheet = sheets.open_by_key(RATINGS_SHEET_KEY)
b_sheet = this_sheet.worksheet_by_title("ratings_Batters")
p_sheet = this_sheet.worksheet_by_title("ratings_Pitchers")
b_data = b_sheet.range("A2:N")
p_data = p_sheet.range("A2:N")
try:
batters = [
{
"player_id": int(x[0].value),
"p_name": x[1].value,
"rating": int(x[2].value),
"contact-r": int(x[3].value),
"contact-l": int(x[4].value),
"power-r": int(x[5].value),
"power-l": int(x[6].value),
"vision": int(x[7].value),
"speed": int(x[8].value),
"stealing": int(x[9].value),
"reaction": int(x[10].value),
"arm": int(x[11].value),
"fielding": int(x[12].value),
"hand": int(x[13].value),
}
for x in b_data
]
pitchers = [
{
"player_id": int(x[0].value),
"p_name": x[1].value,
"rating": int(x[2].value),
"control-r": int(x[3].value),
"control-l": int(x[4].value),
"stuff-r": int(x[5].value),
"stuff-l": int(x[6].value),
"stamina": int(x[7].value),
"fielding": int(x[8].value),
"hit-9": int(x[9].value),
"k-9": int(x[10].value),
"bb-9": int(x[11].value),
"hr-9": int(x[12].value),
"hand": int(x[13].value),
}
for x in p_data
]
except Exception:
return {"valid": False}
return {"valid": True, "batter_ratings": batters, "pitcher_ratings": pitchers}
async def paperdex_cardset_embed(team: dict, this_cardset: dict) -> list[discord.Embed]:
all_dex = await db_get(
"paperdex",
params=[
("team_id", team["id"]),
("cardset_id", this_cardset["id"]),
("flat", True),
],
)
dex_player_list = [x["player"] for x in all_dex["paperdex"]]
hof_embed = get_team_embed(f"{team['lname']} Collection", team=team)
mvp_embed = get_team_embed(f"{team['lname']} Collection", team=team)
as_embed = get_team_embed(f"{team['lname']} Collection", team=team)
sta_embed = get_team_embed(f"{team['lname']} Collection", team=team)
res_embed = get_team_embed(f"{team['lname']} Collection", team=team)
rep_embed = get_team_embed(f"{team['lname']} Collection", team=team)
coll_data = {
99: {"name": "Hall of Fame", "owned": 0, "players": [], "embeds": [hof_embed]},
1: {"name": "MVP", "owned": 0, "players": [], "embeds": [mvp_embed]},
2: {"name": "All-Star", "owned": 0, "players": [], "embeds": [as_embed]},
3: {"name": "Starter", "owned": 0, "players": [], "embeds": [sta_embed]},
4: {"name": "Reserve", "owned": 0, "players": [], "embeds": [res_embed]},
5: {"name": "Replacement", "owned": 0, "players": [], "embeds": [rep_embed]},
"total_owned": 0,
}
set_players = await db_get(
"players",
params=[("cardset_id", this_cardset["id"]), ("flat", True), ("inc_dex", False)],
timeout=5,
)
for player in set_players["players"]:
if player["player_id"] in dex_player_list:
coll_data[player["rarity"]]["owned"] += 1
coll_data["total_owned"] += 1
player["owned"] = True
else:
player["owned"] = False
logger.debug(f"player: {player} / type: {type(player)}")
coll_data[player["rarity"]]["players"].append(player)
cover_embed = get_team_embed(f"{team['lname']} Collection", team=team)
cover_embed.description = this_cardset["name"]
cover_embed.add_field(name="# Total Cards", value=f"{set_players['count']}")
cover_embed.add_field(name="# Collected", value=f"{coll_data['total_owned']}")
display_embeds = [cover_embed]
for rarity_id in coll_data:
if rarity_id != "total_owned":
if coll_data[rarity_id]["players"]:
coll_data[rarity_id]["embeds"][
0
].description = f"Rarity: {coll_data[rarity_id]['name']}"
coll_data[rarity_id]["embeds"][0].add_field(
name="# Collected / # Total Cards",
value=f"{coll_data[rarity_id]['owned']} / {len(coll_data[rarity_id]['players'])}",
inline=False,
)
chunk_string = ""
for index, this_player in enumerate(coll_data[rarity_id]["players"]):
logger.debug(f"this_player: {this_player}")
chunk_string += "" if this_player["owned"] else ""
chunk_string += f"{this_player['p_name']}\n"
if (index + 1) == len(coll_data[rarity_id]["players"]):
coll_data[rarity_id]["embeds"][0].add_field(
name=f"Group {math.ceil((index + 1) / 20)} / "
f"{math.ceil(len(coll_data[rarity_id]['players']) / 20)}",
value=chunk_string,
)
elif (index + 1) % 20 == 0:
coll_data[rarity_id]["embeds"][0].add_field(
name=f"Group {math.floor((index + 1) / 20)} / "
f"{math.ceil(len(coll_data[rarity_id]['players']) / 20)}",
value=chunk_string,
)
chunk_string = ""
display_embeds.append(coll_data[rarity_id]["embeds"][0])
return display_embeds
async def paperdex_team_embed(team: dict, mlb_team: dict) -> list[discord.Embed]:
all_dex = await db_get(
"paperdex",
params=[
("team_id", team["id"]),
("franchise", mlb_team["sname"]),
("flat", True),
],
)
dex_player_list = [x["player"] for x in all_dex["paperdex"]]
c_query = await db_get("cardsets")
coll_data = {"total_owned": 0}
total_players = 0
for x in c_query["cardsets"]:
set_players = await db_get(
"players",
params=[
("cardset_id", x["id"]),
("franchise", mlb_team["sname"]),
("flat", True),
("inc_dex", False),
],
)
if set_players is not None:
coll_data[x["id"]] = {
"name": x["name"],
"owned": 0,
"players": [],
"embeds": [get_team_embed(f"{team['lname']} Collection", team=team)],
}
total_players += set_players["count"]
for player in set_players["players"]:
if player["player_id"] in dex_player_list:
coll_data[x["id"]]["owned"] += 1
coll_data["total_owned"] += 1
player["owned"] = True
else:
player["owned"] = False
logger.debug(f"player: {player} / type: {type(player)}")
coll_data[x["id"]]["players"].append(player)
cover_embed = get_team_embed(f"{team['lname']} Collection", team=team)
cover_embed.description = mlb_team["lname"]
cover_embed.add_field(name="# Total Cards", value=f"{total_players}")
cover_embed.add_field(name="# Collected", value=f"{coll_data['total_owned']}")
display_embeds = [cover_embed]
for cardset_id in coll_data:
if cardset_id != "total_owned":
if coll_data[cardset_id]["players"]:
coll_data[cardset_id]["embeds"][
0
].description = f"{mlb_team['lname']} / {coll_data[cardset_id]['name']}"
coll_data[cardset_id]["embeds"][0].add_field(
name="# Collected / # Total Cards",
value=f"{coll_data[cardset_id]['owned']} / {len(coll_data[cardset_id]['players'])}",
inline=False,
)
chunk_string = ""
for index, this_player in enumerate(coll_data[cardset_id]["players"]):
logger.debug(f"this_player: {this_player}")
chunk_string += "" if this_player["owned"] else ""
chunk_string += f"{this_player['p_name']}\n"
if (index + 1) == len(coll_data[cardset_id]["players"]):
coll_data[cardset_id]["embeds"][0].add_field(
name=f"Group {math.ceil((index + 1) / 20)} / "
f"{math.ceil(len(coll_data[cardset_id]['players']) / 20)}",
value=chunk_string,
)
elif (index + 1) % 20 == 0:
coll_data[cardset_id]["embeds"][0].add_field(
name=f"Group {math.floor((index + 1) / 20)} / "
f"{math.ceil(len(coll_data[cardset_id]['players']) / 20)}",
value=chunk_string,
)
chunk_string = ""
display_embeds.append(coll_data[cardset_id]["embeds"][0])
return display_embeds
def get_pack_cover(pack):
if pack["pack_cardset"] is not None and pack["pack_cardset"] == 23:
return IMAGES["pack-pkmnbs"]
elif pack["pack_type"]["name"] in ["Premium", "MVP"]:
return IMAGES["pack-pre"]
elif pack["pack_type"]["name"] == "Standard":
return IMAGES["pack-sta"]
elif pack["pack_type"]["name"] == "Mario":
return IMAGES["pack-mar"]
else:
return None
async def open_st_pr_packs(all_packs: list, team: dict, context):
pack_channel = get_channel(context, "pack-openings")
pack_cover = get_pack_cover(all_packs[0])
if pack_cover is None:
pack_channel = context.channel
if not pack_channel:
raise ValueError(
f"I cannot find the pack-openings channel. {get_cal_user(context).mention} - halp?"
)
pack_ids = await roll_for_cards(all_packs)
if not pack_ids:
logger.error(f"open_packs - unable to roll_for_cards for packs: {all_packs}")
raise ValueError("I was not able to unpack these cards")
all_cards = []
for p_id in pack_ids:
new_cards = await db_get("cards", params=[("pack_id", p_id)])
for card in new_cards["cards"]:
card.setdefault("pack_id", p_id)
all_cards.extend(new_cards["cards"])
if not all_cards:
logger.error(f"open_packs - unable to get cards for packs: {pack_ids}")
raise ValueError("I was not able to display these cards")
# Present cards to opening channel
if type(context) == commands.Context:
author = context.author
else:
author = context.user
await context.channel.send(content=f"Let's head down to {pack_channel.mention}!")
await display_cards(all_cards, team, pack_channel, author, pack_cover=pack_cover)
# Create scout opportunities for each pack (Standard/Premium only)
from helpers.scouting import create_scout_opportunity, SCOUTABLE_PACK_TYPES
pack_type_name = all_packs[0].get("pack_type", {}).get("name")
if pack_type_name in SCOUTABLE_PACK_TYPES:
for p_id in pack_ids:
pack_cards = [c for c in all_cards if c.get("pack_id") == p_id]
if pack_cards:
await create_scout_opportunity(
pack_cards, team, pack_channel, author, context
)
if len(pack_ids) > 1:
await asyncio.sleep(2)
async def get_choice_from_cards(
interaction: discord.Interaction,
all_players: list = None,
cover_title: str = None,
cover_desc: str = None,
cover_image_url: str = None,
callback=None,
temp_message: str = None,
conf_message: str = None,
delete_message: bool = False,
):
# Display them with pagination, prev/next/select
card_embeds = [
await get_card_embeds(
{
"player": x,
"team": {
"lname": "Paper Dynasty",
"season": PD_SEASON,
"logo": IMAGES["logo"],
},
}
)
for x in all_players
]
logger.debug(f"card embeds: {card_embeds}")
if cover_title is not None and cover_image_url is not None:
page_num = 0
view = Pagination([interaction.user], timeout=30)
view.left_button.disabled = True
view.left_button.label = f"Prev: -/{len(card_embeds)}"
view.cancel_button.label = "Take This Card"
view.cancel_button.style = discord.ButtonStyle.success
view.cancel_button.disabled = True
view.right_button.label = f"Next: 1/{len(card_embeds)}"
msg = await interaction.channel.send(
content=None,
embed=image_embed(
image_url=cover_image_url, title=cover_title, desc=cover_desc
),
view=view,
)
else:
page_num = 1
view = Pagination([interaction.user], timeout=30)
view.left_button.label = f"Prev: -/{len(card_embeds)}"
view.left_button.disabled = True
view.cancel_button.label = "Take This Card"
view.cancel_button.style = discord.ButtonStyle.success
view.right_button.label = f"Next: {page_num + 1}/{len(card_embeds)}"
msg = await interaction.channel.send(
content=None, embeds=card_embeds[page_num - 1], view=view
)
if temp_message is not None:
temp_msg = await interaction.channel.send(content=temp_message)
else:
temp_msg = None
while True:
await view.wait()
if view.value:
if view.value == "cancel":
await msg.edit(view=None)
if callback is not None:
callback(all_players[page_num - 1])
if conf_message is not None:
if temp_msg is not None:
await temp_msg.edit(content=conf_message)
else:
await interaction.channel.send(content=conf_message)
break
if view.value == "left":
page_num -= 1 if page_num > 1 else len(card_embeds)
if view.value == "right":
page_num += 1 if page_num < len(card_embeds) else 1
else:
if page_num == len(card_embeds):
page_num = 1
else:
page_num += 1
view.value = None
view = Pagination([interaction.user], timeout=30)
view.left_button.label = f"Prev: {page_num - 1}/{len(card_embeds)}"
view.cancel_button.label = "Take This Card"
view.cancel_button.style = discord.ButtonStyle.success
view.right_button.label = f"Next: {page_num + 1}/{len(card_embeds)}"
if page_num == 1:
view.left_button.label = f"Prev: -/{len(card_embeds)}"
view.left_button.disabled = True
elif page_num == len(card_embeds):
view.right_button.label = f"Next: -/{len(card_embeds)}"
view.right_button.disabled = True
await msg.edit(content=None, embeds=card_embeds[page_num - 1], view=view)
if delete_message:
await msg.delete()
return all_players[page_num - 1]
async def open_choice_pack(
this_pack, team: dict, context, cardset_id: Optional[int] = None
):
pack_channel = get_channel(context, "pack-openings")
pack_cover = get_pack_cover(this_pack)
pack_type = this_pack["pack_type"]["name"]
players = []
if pack_type == "Mario":
d1000 = random.randint(1, 1000)
if d1000 > 800:
rarity_id = 5
elif d1000 > 550:
rarity_id = 3
else:
rarity_id = 2
pl = await db_get(
"players/random",
params=[
("cardset_id", 8),
("min_rarity", rarity_id),
("max_rarity", rarity_id),
("limit", 4),
],
)
players = pl["players"]
elif pack_type == "Team Choice":
if this_pack["pack_team"] is None:
raise KeyError("Team not listed for Team Choice pack")
d1000 = random.randint(1, 1000)
pack_cover = this_pack["pack_team"]["logo"]
if d1000 > 800:
rarity_id = 5
pack_cover = IMAGES["mvp"][this_pack["pack_team"]["lname"]]
elif d1000 > 550:
rarity_id = 3
else:
rarity_id = 2
# # HAX FOR SOCC TO GET HIS MVP PACK
# if (team['abbrev'] in ['KSK', 'NJY']) and (datetime.datetime.today().day == 24):
# rarity_id = 5
min_rarity = rarity_id
while len(players) < 4 and rarity_id < 10:
params = [
("min_rarity", min_rarity),
("max_rarity", rarity_id),
("limit", 4 - len(players)),
("franchise", this_pack["pack_team"]["sname"]),
]
# Only apply in_packs filter if no specific cardset is provided
if this_pack["pack_team"]["abbrev"] not in ["MSS"] and cardset_id is None:
params.append(("in_packs", True))
if cardset_id is not None:
params.append(("cardset_id", cardset_id))
pl = await db_get("players/random", params=params)
if pl["count"] >= 0:
for x in pl["players"]:
if x not in players:
players.append(x)
if len(players) < 4:
min_rarity += 1
rarity_id += 1
elif pack_type == "Promo Choice":
if this_pack["pack_cardset"] is None:
raise KeyError("Cardset not listed for Promo Choice pack")
d1000 = random.randint(1, 1000)
pack_cover = IMAGES["mvp-hype"]
cardset_id = this_pack["pack_cardset"]["id"]
rarity_id = 5
if d1000 > 800:
rarity_id = 8
while len(players) < 4 and rarity_id < 10:
pl = await db_get(
"players/random",
params=[
("cardset_id", cardset_id),
("min_rarity", rarity_id),
("max_rarity", rarity_id),
("limit", 8),
],
)
if pl["count"] >= 0:
for x in pl["players"]:
if len(players) >= 4:
break
if x not in players:
players.append(x)
if len(players) < 4:
cardset_id = LIVE_CARDSET_ID
else:
# Get 4 MVP cards
rarity_id = 5
if pack_type == "HoF":
rarity_id = 8
elif pack_type == "All Star":
rarity_id = 3
min_rarity = rarity_id
while len(players) < 4 and rarity_id < 10:
params = [
("min_rarity", min_rarity),
("max_rarity", rarity_id),
("limit", 4),
]
# Only apply in_packs filter if no specific cardset is provided
if cardset_id is None:
params.append(("in_packs", True))
if this_pack["pack_team"] is not None:
params.append(("franchise", this_pack["pack_team"]["sname"]))
if cardset_id is not None:
params.append(("cardset_id", cardset_id))
pl = await db_get("players/random", params=params)
if pl["count"] > 0:
players.extend(pl["players"])
if len(players) < 4:
rarity_id += 3
if len(players) == 0:
logger.error("Could not create choice pack")
raise ConnectionError("Could not create choice pack")
if type(context) == commands.Context:
author = context.author
else:
author = context.user
logger.info(f"helpers - open_choice_pack - players: {players}")
# Display them with pagination, prev/next/select
card_embeds = [
await get_card_embeds(
# {'player': x, 'team': {'lname': 'Paper Dynasty', 'season': PD_SEASON, 'logo': IMAGES['logo']}}
{"player": x, "team": team} # Show team and dupe info
)
for x in players
]
logger.debug(f"card embeds: {card_embeds}")
page_num = 0
view = Pagination([author], timeout=30)
view.left_button.disabled = True
view.left_button.label = f"Prev: -/{len(card_embeds)}"
view.cancel_button.label = "Take This Card"
view.cancel_button.style = discord.ButtonStyle.success
view.cancel_button.disabled = True
view.right_button.label = f"Next: 1/{len(card_embeds)}"
# React to selection
await context.channel.send(f"Let's head down to {pack_channel.mention}!")
msg = await pack_channel.send(
content=None,
embed=image_embed(
pack_cover,
title=f"{team['lname']}",
desc=f"{pack_type} Pack - Choose 1 of 4 {pack_type}s!",
),
view=view,
)
if rarity_id >= 5:
tmp_msg = await pack_channel.send(
content="<@&1163537676885033010> we've got an MVP!"
)
else:
tmp_msg = await pack_channel.send(content="We've got a choice pack here!")
while True:
await view.wait()
if view.value:
if view.value == "cancel":
await msg.edit(view=None)
try:
await give_cards_to_team(
team, players=[players[page_num - 1]], pack_id=this_pack["id"]
)
except Exception as e:
logger.error(f"failed to create cards: {e}")
raise ConnectionError("Failed to distribute these cards.")
await db_patch(
"packs",
object_id=this_pack["id"],
params=[
(
"open_time",
int(
datetime.datetime.timestamp(datetime.datetime.now())
* 1000
),
)
],
)
await tmp_msg.edit(
content=f"{players[page_num - 1]['p_name']} has been added to the "
f"**{team['sname']}** binder!"
)
break
if view.value == "left":
page_num -= 1 if page_num > 1 else len(card_embeds)
if view.value == "right":
page_num += 1 if page_num < len(card_embeds) else 1
else:
if page_num == len(card_embeds):
page_num = 1
else:
page_num += 1
view.value = None
view = Pagination([author], timeout=30)
view.left_button.label = f"Prev: {page_num - 1}/{len(card_embeds)}"
view.cancel_button.label = "Take This Card"
view.cancel_button.style = discord.ButtonStyle.success
view.right_button.label = f"Next: {page_num + 1}/{len(card_embeds)}"
if page_num == 1:
view.left_button.label = f"Prev: -/{len(card_embeds)}"
view.left_button.disabled = True
elif page_num == len(card_embeds):
view.right_button.label = f"Next: -/{len(card_embeds)}"
view.right_button.disabled = True
await msg.edit(content=None, embeds=card_embeds[page_num - 1], view=view)
async def confirm_pack_purchase(
interaction, owner_team, num_packs, total_cost, pack_embed
):
view = Confirm(responders=[interaction.user], timeout=30)
await interaction.channel.send(content=None, embed=pack_embed)
question = await interaction.channel.send(
content=f"Your Wallet: {owner_team['wallet']}\n"
f"Pack{'s' if num_packs > 1 else ''} Price: {total_cost}\n"
f"After Purchase: {owner_team['wallet'] - total_cost}\n\n"
f"Would you like to make this purchase?",
view=view,
)
await view.wait()
if not view.value:
await question.edit(content="Saving that money. Smart.", view=None)
return None
else:
return question
def player_desc(this_player) -> str:
if this_player["p_name"] in this_player["description"]:
return this_player["description"]
return f"{this_player['description']} {this_player['p_name']}"
def player_pcard(this_player):
if this_player["image"] is not None and "pitching" in this_player["image"]:
return this_player["image"]
elif this_player["image2"] is not None and "pitching" in this_player["image2"]:
return this_player["image2"]
else:
return this_player["image"]
def player_bcard(this_player):
if this_player["image"] is not None and "batting" in this_player["image"]:
return this_player["image"]
elif this_player["image2"] is not None and "batting" in this_player["image2"]:
return this_player["image2"]
# elif this_player['image'] is not None and 'pitching' in this_player['image']:
# return PITCHER_BATTING_CARD
else:
return this_player["image"]