paper-dynasty-discord/cogs/economy.py
Cal Corum 94f3b1dc97 fix: apply open-packs hotfix to cogs/economy.py
Port the Check-In Player pack fix from the hotfix branch to the legacy
economy.py cog (which production currently loads instead of economy_new).

- Filter auto-open pack types from the manual open-packs menu
- Add pretty_name fallback for hyphenated pack type names
- Add ruff noqa for pre-existing star import warnings

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 13:04:19 -05:00

2096 lines
79 KiB
Python

# ruff: noqa: F403, F405
import copy
import helpers
from helpers import *
import logging
import os
import random
import re
import discord
import asyncio
from discord.ext import commands, tasks
from discord.ext.commands import Greedy
from discord import app_commands, Member
from typing import Optional, Literal
from discord.app_commands import Choice
import datetime
import pygsheets
from api_calls import db_get, db_post, db_patch, db_delete, get_team_by_abbrev
from help_text import *
# date = f'{datetime.datetime.now().year}-{datetime.datetime.now().month}-{datetime.datetime.now().day}'
# logger.basicConfig(
# filename=f'logs/{date}.log',
# format='%(asctime)s - %(levelname)s - %(message)s',
# level=logger.WARNING
# )
# async def legal_channel(ctx):
# bad_channels = ['paper-dynasty-chat', 'pd-news-ticker']
# if ctx.message.channel.name in bad_channels:
# raise discord.ext.commands.CheckFailure(f'Slide on down to the {get_channel(ctx, "pd-bot-hole").mention} ;)')
# else:
# return True
class Economy(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.daily_message_sent = True
self.weekly_update = False
bot.tree.on_error = self.on_app_command_error
# self.pd_ticker.start()
self.notif_check.start()
# def cog_unload(self):
# self.pd_ticker.cancel()
async def cog_command_error(self, ctx, error):
await ctx.send(
f"{error}\n\nRun .help <command_name> to see the command requirements"
)
async def on_app_command_error(
self,
interaction: discord.Interaction,
error: discord.app_commands.AppCommandError,
):
await interaction.channel.send(f"{error}")
async def buy_card(
self, interaction: discord.Interaction, this_player: dict, owner_team: dict
):
c_query = await db_get(
"cards",
params=[
("player_id", this_player["player_id"]),
("team_id", owner_team["id"]),
],
)
num_copies = c_query["count"] if c_query else 0
if not this_player["cardset"]["for_purchase"]:
await interaction.response.send_message(
content=f"Ope - looks like singles from the {this_player['cardset']['name']} cardset are not available "
f"for purchase."
)
return
if this_player["cost"] > owner_team["wallet"]:
await interaction.response.send_message(
content=None,
embeds=await get_card_embeds(get_blank_team_card(this_player)),
)
await interaction.channel.send(
content=f"You currently have {num_copies} cop{'ies' if num_copies != 1 else 'y'} of this card.\n\n"
f"Your Wallet: {owner_team['wallet']}\n"
f"Card Price: {this_player['cost']}\n"
f"After Purchase: {await get_emoji(interaction.guild, 'dead', False)}\n\n"
f"You will have to save up a little more."
)
return
view = Confirm(responders=[interaction.user])
await interaction.response.send_message(
content=None, embeds=await get_card_embeds(get_blank_team_card(this_player))
)
question = await interaction.channel.send(
content=f"You currently have {num_copies} cop{'ies' if num_copies != 1 else 'y'} of this card.\n\n"
f"Your Wallet: {owner_team['wallet']}\n"
f"Card Price: {this_player['cost']}\n"
f"After Purchase: {owner_team['wallet'] - this_player['cost']}\n\n"
f"Would you like to make this purchase?",
view=view,
)
await view.wait()
if not view.value:
await question.edit(content="Saving that money. Smart.", view=None)
return
purchase = await db_get(
f"teams/{owner_team['id']}/buy/players",
params=[
("ts", team_hash(owner_team)),
("ids", f"{this_player['player_id']}"),
],
timeout=10,
)
if not purchase:
await question.edit(
content=f"That didn't go through for some reason. If this happens again, go ping the shit out of Cal.",
view=None,
)
return
await question.edit(content=f"It's all yours!", view=None)
# async def slash_error(self, ctx, error):
# await ctx.send(f'{error}')
# @tasks.loop(minutes=10)
# async def pd_ticker(self):
# now = datetime.datetime.now()
# logger.info(f'Datetime: {now} / weekday: {now.weekday()}')
# guild = self.bot.get_guild(int(os.environ.get('GUILD_ID')))
# if not guild:
# return
#
# # Daily Specials Message
# if now.hour == 11 and not self.daily_message_sent:
# try:
# await helpers.send_to_channel(self.bot, 'pd-bot-hole', 'Here are the current specials:')
# await self.display_specials('pd-bot-hole')
# self.daily_message_sent = True
# except:
# await helpers.send_to_channel(self.bot, 'commissioners-office',
# 'Just tried and failed to send specials to news.')
# elif now.hour == 12 and self.daily_message_sent:
# self.daily_message_sent = False
#
# # Weekly Standings Message
# # if now.hour == 20 and not self.weekly_update:
# if now.weekday() == 0 and now.hour == 0 and not self.weekly_update:
# current = Current.get()
#
# # Send standings to Cal
# standings_embeds = self.bot.get_cog('Players').get_standings_embeds(
# current, 'week', f'Week {current.week} Standings'
# )
# await helpers.send_to_channel(
# self.bot, 'pd-news-ticker',
# content=f'Here are the final standings for week {current.week}! Cal will hand out packs in the morning.'
# )
# for x in standings_embeds:
# await helpers.send_to_channel(self.bot, 'commissioners-office', content=None, embed=x)
# await helpers.send_to_channel(self.bot, 'pd-news-ticker', content=None, embed=x)
#
# # Increment Week
# current.week += 1
# current.save()
#
# all_teams = Team.select()
# for x in all_teams:
# x.weeklyclaim = False
# x.dailyclaim = False
# x.weeklypacks = 0
# x.save()
#
# await helpers.send_to_channel(
# self.bot,
# 'commissioners-office',
# f'Flipped the week to {current.week} and updated {all_teams.count()} teams for their weekly.'
# )
# self.weekly_update = True
# elif now.weekday() != 0 and self.weekly_update:
# self.weekly_update = False
#
# db.close()
@tasks.loop(minutes=10)
async def notif_check(self):
# Check for notifications
all_notifs = await db_get("notifs", params=[("ack", False)])
if not all_notifs:
logger.debug(f"No notifications")
return
topics = {
"Price Change": {
"channel_name": "pd-market-watch",
"desc": "Modified by buying and selling",
"notifs": [],
},
"Rare Pull": {
"channel_name": "pd-network-news",
"desc": "MVP and All-Star cards pulled from packs",
"notifs": [],
},
}
for line in all_notifs["notifs"]:
if line["title"] in topics:
topics[line["title"]]["notifs"].append(line)
logger.info(f"topics:\n{topics}")
for topic in topics:
embed = get_team_embed(
title=f"{topic}{'s' if len(topics[topic]['notifs']) > 1 else ''}"
)
embed.description = topics[topic]["desc"]
p_list = {}
if topics[topic]["notifs"]:
for x in topics[topic]["notifs"]:
if x["field_name"] not in p_list:
p_list[x["field_name"]] = {
"field_name": x["field_name"],
"message": f"{x['message']}",
"count": 1,
}
else:
p_list[x["field_name"]]["message"] = f"{x['message']}"
p_list[x["field_name"]]["count"] += 1
await db_patch("notifs", object_id=x["id"], params=[("ack", True)])
logger.debug(f"p_list: {p_list}")
this_embed = copy.deepcopy(embed)
counter = 1
for player in p_list:
if counter % 25 == 0:
counter = 1
await send_to_channel(
self.bot, topics[topic]["channel_name"], embed=this_embed
)
this_embed = copy.deepcopy(embed)
this_embed.add_field(
name=p_list[player]["field_name"],
value=p_list[player]["message"],
inline=False,
)
if len(p_list) > 0:
await send_to_channel(
self.bot, topics[topic]["channel_name"], embed=this_embed
)
@notif_check.before_loop
async def before_notif_check(self):
await self.bot.wait_until_ready()
@commands.hybrid_group(
name="help-pd", help="FAQ for Paper Dynasty and the bot", aliases=["helppd"]
)
@commands.check(legal_channel)
async def pd_help_command(self, ctx: commands.Context):
if ctx.invoked_subcommand is None:
embed = get_team_embed(f"Paper Dynasty Help")
embed.description = "Frequently Asked Questions"
embed.add_field(
name="What the Heck is Paper Dynasty",
value=f"Well, whipper snapper, have a seat and I'll tell you. We're running a diamond dynasty / "
f"perfect team style game with electronic card and dice baseball!\n\nGet a starter pack, play "
f"games at your leisure either solo or against another player, and collect cards from the "
f"custom 2021 player set.",
inline=False,
)
embed.add_field(
name="How Do I Get Started",
value=f"Run the `.in` command - that's a period followed by the word \"in\". That'll get you the "
f"Paper Dynasty Players role so you can run all of the other PD commands!\n\nOnce you get your "
f"role, run `/newteam` and follow the prompts to get your starter team.",
inline=False,
)
embed.add_field(
name="How Do I Play",
value="A step-by-step of how to play was written by Riles [starting here](https://discord.com/channels"
"/613880856032968834/633456305830625303/985968300272001054). "
"In addition, you can find the Rules Reference [right here](https://docs.google.com/document/d/"
"1yGZcHy9zN2MUi4hnce12dAzlFpIApbn7zR24vCkPl1o).\n\nThere are three key differences from league "
'play:\n1) Injuries: there are no injuries in Paper Dynasty!\n2) sWAR: there is no sWAR "salary '
'cap" for your team like in league play. Some events will have roster construction rules to '
"follow, though!\n3) The Universal DH is in effect; teams may forfeit the DH at their "
"discretion.",
inline=False,
)
await ctx.send(content=None, embed=embed)
@pd_help_command.command(
name="start", help="FAQ for Paper Dynasty and the bot", aliases=["faq"]
)
@commands.check(legal_channel)
async def help_faq(self, ctx: commands.Context):
embed = get_team_embed(f"Paper Dynasty Help")
embed.description = "Frequently Asked Questions"
embed.add_field(
name="What the Heck is Paper Dynasty", value=HELP_START_WHAT, inline=False
)
embed.add_field(name="How Do I Get Started", value=HELP_START_HOW, inline=False)
embed.add_field(name="How Do I Play", value=HELP_START_PLAY, inline=False)
embed.add_field(
name="Other Questions?",
value=f"Feel free to ask any questions down in {get_channel(ctx, 'paper-dynasty-chat')} or check out "
f"the other `/help-pd` commands for the FAQs!",
)
await ctx.send(content=None, embed=embed)
@pd_help_command.command(name="links", help="Helpful links for Paper Dynasty")
@commands.check(legal_channel)
async def help_links(self, ctx: commands.Context):
current = await db_get("current")
embed = get_team_embed(f"Paper Dynasty Help")
embed.description = "Resources & Links"
embed.add_field(
name="Team Sheet Template",
value=f"{get_roster_sheet({'gsheet': current['gsheet_template']})}",
)
embed.add_field(
name="Paper Dynasty Guidelines",
value="https://docs.google.com/document/d/1ngsjbz8wYv7heSiPMJ21oKPa6JLStTsw6wNdLDnt-k4/edit?usp=sharing",
inline=False,
)
embed.add_field(
name="Rules Reference",
value="https://docs.google.com/document/d/1wu63XSgfQE2wadiegWaaDda11QvqkN0liRurKm0vcTs/edit?usp=sharing",
inline=False,
)
await ctx.send(content=None, embed=embed)
@pd_help_command.command(
name="rewards", help="How to Earn Rewards in Paper Dynasty"
)
@commands.check(legal_channel)
async def help_rewards(self, ctx: commands.Context):
embed = get_team_embed(f"Paper Dynasty Help")
embed.description = "How to Earn Rewards"
embed.add_field(name="Premium Pack", value=HELP_REWARDS_PREMIUM, inline=False)
embed.add_field(name="Standard Pack", value=HELP_REWARDS_STANDARD, inline=False)
embed.add_field(name="MantiBucks ₼", value=HELP_REWARDS_MONEY, inline=False)
embed.add_field(name="Ko-fi Shop", value=HELP_REWARDS_SHOP, inline=False)
await ctx.send(content=None, embed=embed)
@pd_help_command.command(name="team-sheet", help="How to Use Your Team Sheet")
@commands.check(legal_channel)
async def help_team_sheet(self, ctx: commands.Context):
embed = get_team_embed(f"Paper Dynasty Help")
embed.description = "How to Use Your Team Sheet"
embed.add_field(name="Your Dashboard", value=HELP_TS_DASH, inline=False)
embed.add_field(name="Roster Management", value=HELP_TS_ROSTER, inline=False)
embed.add_field(name="Marketplace", value=HELP_TS_MARKET, inline=False)
embed.add_field(name="Paper Dynasty Menu", value=HELP_TS_MENU, inline=False)
embed.set_footer(text="More details to come", icon_url=IMAGES["logo"])
await ctx.send(content=None, embed=embed)
@pd_help_command.command(name="gameplay", help="How to Play Paper Dynasty")
@commands.check(legal_channel)
async def help_gameplay(self, ctx: commands.Context):
embed = get_team_embed(f"Paper Dynasty Help")
embed.description = "How to Play Paper Dynasty"
embed.add_field(name="Game Modes", value=HELP_GAMEMODES, inline=False)
embed.add_field(
name="Start a New Game",
value=HELP_NEWGAME,
inline=False,
)
embed.add_field(name="Playing the Game", value=HELP_PLAYGAME, inline=False)
embed.add_field(
name="Ending the Game",
value=f"{HELP_ENDGAME}\n"
f"- Go post highlights in {get_channel(ctx, 'pd-news-ticker').mention}",
inline=False,
)
await ctx.send(content=None, embed=embed)
@pd_help_command.command(name="cardsets", help="Show Cardset Requirements")
@commands.check(legal_channel)
async def help_cardsets(self, ctx: commands.Context):
embed = get_team_embed(f"Paper Dynasty Help")
embed.description = "Cardset Requirements"
embed.add_field(
name="Ranked Legal", value="2005, 2025 Seasons + Promos", inline=False
)
embed.add_field(
name="Minor League",
value="Humans: Unlimited\nAI: 2005 Season / 2025 Season as backup",
inline=False,
)
embed.add_field(
name="Major League",
value="Humans: Ranked Legal\nAI: 2005, 2025, 2018, 2012 Seasons + Promos",
inline=False,
)
embed.add_field(
name="Flashback", value="2018, 2019, 2021, 2022 Seasons", inline=False
)
embed.add_field(
name="Hall of Fame",
value="Humans: Ranked Legal\nAI: Unlimited",
inline=False,
)
await ctx.send(content=None, embed=embed)
@commands.hybrid_group(name="donation", help="Mod: Give packs for PD donations")
@commands.has_any_role(PD_PLAYERS_ROLE_NAME)
async def donation(self, ctx: commands.Context):
if ctx.invoked_subcommand is None:
await ctx.send(
"To buy packs, visit https://ko-fi.com/manticorum/shop and include your discord username!"
)
@donation.command(
name="premium", help="Mod: Give premium packs", aliases=["p", "prem"]
)
async def donation_premium(self, ctx: commands.Context, num_packs: int, gm: Member):
if ctx.author.id != self.bot.owner_id:
await ctx.send("Wait a second. You're not in charge here!")
return
team = await get_team_by_owner(gm.id)
p_query = await db_get("packtypes", params=[("name", "Premium")])
if p_query["count"] == 0:
await ctx.send("Oof. I couldn't find a Premium Pack")
return
total_packs = await give_packs(
team, num_packs, pack_type=p_query["packtypes"][0]
)
await ctx.send(
f"The {team['lname']} now have {total_packs['count']} total packs!"
)
@donation.command(
name="standard", help="Mod: Give standard packs", aliases=["s", "sta"]
)
async def donation_standard(
self, ctx: commands.Context, num_packs: int, gm: Member
):
if ctx.author.id != self.bot.owner_id:
await ctx.send("Wait a second. You're not in charge here!")
return
team = await get_team_by_owner(gm.id)
p_query = await db_get("packtypes", params=[("name", "Standard")])
if p_query["count"] == 0:
await ctx.send("Oof. I couldn't find a Standard Pack")
return
total_packs = await give_packs(
team, num_packs, pack_type=p_query["packtypes"][0]
)
await ctx.send(
f"The {team['lname']} now have {total_packs['count']} total packs!"
)
@commands.hybrid_command(name="lastpack", help="Replay your last pack")
@commands.check(legal_channel)
@commands.has_any_role(PD_PLAYERS_ROLE_NAME)
async def last_pack_command(self, ctx: commands.Context):
team = await get_team_by_owner(ctx.author.id)
if not team:
await ctx.send(
f"I don't see a team for you, yet. You can sign up with the `/newteam` command!"
)
return
p_query = await db_get(
"packs",
params=[
("opened", True),
("team_id", team["id"]),
("new_to_old", True),
("limit", 1),
],
)
if not p_query["count"]:
await ctx.send(f"I do not see any packs for you, bub.")
return
pack_name = p_query["packs"][0]["pack_type"]["name"]
if pack_name == "Standard":
pack_cover = IMAGES["pack-sta"]
elif pack_name == "Premium":
pack_cover = IMAGES["pack-pre"]
else:
pack_cover = None
c_query = await db_get("cards", params=[("pack_id", p_query["packs"][0]["id"])])
if not c_query["count"]:
await ctx.send(f"Hmm...I didn't see any cards in that pack.")
return
await display_cards(
c_query["cards"],
team,
ctx.channel,
ctx.author,
self.bot,
pack_cover=pack_cover,
)
@app_commands.command(
name="comeonmanineedthis",
description="Daily check-in for cards, currency, and packs",
)
@app_commands.checks.has_any_role(PD_PLAYERS)
@app_legal_channel()
async def daily_checkin(self, interaction: discord.Interaction):
await interaction.response.defer()
team = await get_team_by_owner(interaction.user.id)
if not team:
await interaction.edit_original_response(
content=f"I don't see a team for you, yet. You can sign up with the `/newteam` command!"
)
return
current = await db_get("current")
now = datetime.datetime.now()
midnight = int_timestamp(
datetime.datetime(now.year, now.month, now.day, 0, 0, 0)
)
daily = await db_get(
"rewards",
params=[
("name", "Daily Check-in"),
("team_id", team["id"]),
("created_after", midnight),
],
)
logger.debug(f"midnight: {midnight} / now: {int_timestamp(now)}")
logger.debug(f"daily_return: {daily}")
if daily:
await interaction.edit_original_response(
content=f"Looks like you already checked in today - come back at midnight Central!"
)
return
await db_post(
"rewards",
payload={
"name": "Daily Check-in",
"team_id": team["id"],
"season": current["season"],
"week": current["week"],
"created": int_timestamp(now),
},
)
current = await db_get("current")
check_ins = await db_get(
"rewards",
params=[
("name", "Daily Check-in"),
("team_id", team["id"]),
("season", current["season"]),
],
)
check_count = check_ins["count"] % 5
# 2nd, 4th, and 5th check-ins
if check_count == 0 or check_count % 2 == 0:
# Every fifth check-in
if check_count == 0:
await interaction.edit_original_response(
content=f"Hey, you just earned a Standard pack of cards!"
)
pack_channel = get_channel(interaction, "pack-openings")
p_query = await db_get("packtypes", params=[("name", "Standard")])
if not p_query:
await interaction.edit_original_response(
content=f"I was not able to pull this pack for you. "
f"Maybe ping {get_cal_user(interaction).mention}?"
)
return
# Every second and fourth check-in
else:
await interaction.edit_original_response(
content=f"Hey, you just earned a player card!"
)
pack_channel = interaction.channel
p_query = await db_get(
"packtypes", params=[("name", "Check-In Player")]
)
if not p_query:
await interaction.edit_original_response(
content=f"I was not able to pull this card for you. "
f"Maybe ping {get_cal_user(interaction).mention}?"
)
return
await give_packs(team, 1, p_query["packtypes"][0])
p_query = await db_get(
"packs",
params=[
("opened", False),
("team_id", team["id"]),
("new_to_old", True),
("limit", 1),
],
)
if not p_query["count"]:
await interaction.edit_original_response(
content=f"I do not see any packs in here. {await get_emoji(interaction, 'ConfusedPsyduck')}"
)
return
pack_ids = await roll_for_cards(
p_query["packs"], extra_val=check_ins["count"]
)
if not pack_ids:
await interaction.edit_original_response(
content=f"I was not able to create these cards {await get_emoji(interaction, 'slight_frown')}"
)
return
all_cards = []
for p_id in pack_ids:
new_cards = await db_get("cards", params=[("pack_id", p_id)])
all_cards.extend(new_cards["cards"])
if not all_cards:
await interaction.edit_original_response(
content=f"I was not able to pull these cards {await get_emoji(interaction, 'slight_frown')}"
)
return
await display_cards(
all_cards, team, pack_channel, interaction.user, self.bot
)
await refresh_sheet(team, self.bot)
return
# 1st, 3rd check-ins
else:
d_1000 = random.randint(1, 1000)
m_reward = 0
if d_1000 < 500:
m_reward = 10
elif d_1000 < 900:
m_reward = 15
elif d_1000 < 990:
m_reward = 20
else:
m_reward = 25
team = await db_post(f"teams/{team['id']}/money/{m_reward}")
await interaction.edit_original_response(
content=f"You just earned {m_reward}₼! That brings your wallet to {team['wallet']}₼!"
)
@app_commands.command(
name="open-packs", description="Open packs from your inventory"
)
@app_commands.checks.has_any_role(PD_PLAYERS)
async def open_packs_slash(self, interaction: discord.Interaction):
if interaction.channel.name in [
"paper-dynasty-chat",
"pd-news-ticker",
"pd-network-news",
]:
await interaction.response.send_message(
f"Please head to down to {get_channel(interaction, 'pd-bot-hole')} to run this command.",
ephemeral=True,
)
return
owner_team = await get_team_by_owner(interaction.user.id)
if not owner_team:
await interaction.response.send_message(
f"I don't see a team for you, yet. You can sign up with the `/newteam` command!"
)
return
p_query = await db_get(
"packs", params=[("team_id", owner_team["id"]), ("opened", False)]
)
if p_query["count"] == 0:
await interaction.response.send_message(
f"Looks like you are clean out of packs, friendo. You can earn them by playing PD games or by "
f"donating to the league."
)
return
# Pack types that are auto-opened and should not appear in the manual open menu
AUTO_OPEN_TYPES = {"Check-In Player"}
# Group packs by type and customization (e.g. Standard, Standard-Orioles, Standard-2012, Premium)
p_count = 0
p_data = {
"Standard": [],
"Premium": [],
"Daily": [],
"MVP": [],
"All Star": [],
"Mario": [],
"Team Choice": [],
}
logger.debug(f"Parsing packs...")
for pack in p_query["packs"]:
p_group = None
logger.debug(f"pack: {pack}")
logger.debug(f"pack cardset: {pack['pack_cardset']}")
if pack["pack_type"]["name"] in AUTO_OPEN_TYPES:
logger.debug(
f"Skipping auto-open pack type: {pack['pack_type']['name']}"
)
continue
if pack["pack_team"] is None and pack["pack_cardset"] is None:
p_group = pack["pack_type"]["name"]
# Add to p_data if this is a new pack type
if p_group not in p_data:
p_data[p_group] = []
elif pack["pack_team"] is not None:
if pack["pack_type"]["name"] == "Standard":
p_group = f"Standard-Team-{pack['pack_team']['id']}-{pack['pack_team']['sname']}"
elif pack["pack_type"]["name"] == "Premium":
p_group = f"Premium-Team-{pack['pack_team']['id']}-{pack['pack_team']['sname']}"
elif pack["pack_type"]["name"] == "Team Choice":
p_group = f"Team Choice-Team-{pack['pack_team']['id']}-{pack['pack_team']['sname']}"
elif pack["pack_type"]["name"] == "MVP":
p_group = f"MVP-Team-{pack['pack_team']['id']}-{pack['pack_team']['sname']}"
if pack["pack_cardset"] is not None:
p_group += f"-Cardset-{pack['pack_cardset']['id']}"
elif pack["pack_cardset"] is not None:
if pack["pack_type"]["name"] == "Standard":
p_group = f"Standard-Cardset-{pack['pack_cardset']['id']}-{pack['pack_cardset']['name']}"
elif pack["pack_type"]["name"] == "Premium":
p_group = f"Premium-Cardset-{pack['pack_cardset']['id']}-{pack['pack_cardset']['name']}"
elif pack["pack_type"]["name"] == "Team Choice":
p_group = f"Team Choice-Cardset-{pack['pack_cardset']['id']}-{pack['pack_cardset']['name']}"
elif pack["pack_type"]["name"] == "All Star":
p_group = f"All Star-Cardset-{pack['pack_cardset']['id']}-{pack['pack_cardset']['name']}"
elif pack["pack_type"]["name"] == "MVP":
p_group = f"MVP-Cardset-{pack['pack_cardset']['id']}-{pack['pack_cardset']['name']}"
elif pack["pack_type"]["name"] == "Promo Choice":
p_group = f"Promo Choice-Cardset-{pack['pack_cardset']['id']}-{pack['pack_cardset']['name']}"
logger.info(f"p_group: {p_group}")
if p_group is not None:
p_count += 1
if p_group not in p_data:
p_data[p_group] = [pack]
else:
p_data[p_group].append(pack)
if p_count == 0:
await interaction.response.send_message(
f"Looks like you are clean out of packs, friendo. You can earn them by playing PD games or by "
f"donating to the league."
)
return
# Display options and ask which group to open
embed = get_team_embed(f"Unopened Packs", team=owner_team)
embed.description = owner_team["lname"]
select_options = []
for key in p_data:
if len(p_data[key]) > 0:
pretty_name = None
# Not a specific pack
if "-" not in key:
pretty_name = key
elif "Team" in key:
pretty_name = f"{key.split('-')[0]} - {key.split('-')[3]}"
elif "Cardset" in key:
pretty_name = f"{key.split('-')[0]} - {key.split('-')[3]}"
else:
# Pack type name contains a hyphen (e.g. "Check-In Player")
pretty_name = key
if pretty_name is not None:
embed.add_field(name=pretty_name, value=f"Qty: {len(p_data[key])}")
select_options.append(
discord.SelectOption(label=pretty_name, value=key)
)
view = SelectView(
select_objects=[SelectOpenPack(select_options, owner_team)], timeout=15
)
await interaction.response.send_message(embed=embed, view=view)
group_buy = app_commands.Group(
name="buy", description="Make a purchase from the marketplace"
)
@group_buy.command(
name="card-by-id", description="Buy a player card from the marketplace"
)
@app_commands.checks.has_any_role(PD_PLAYERS)
async def buy_card_id_slash(self, interaction: discord.Interaction, player_id: int):
if interaction.channel.name in [
"paper-dynasty-chat",
"pd-news-ticker",
"pd-network-news",
]:
await interaction.response.send_message(
f"Please head to down to {get_channel(interaction, 'pd-bot-hole')} to run this command.",
ephemeral=True,
)
return
owner_team = await get_team_by_owner(interaction.user.id)
if not owner_team:
await interaction.response.send_message(
f"I don't see a team for you, yet. You can sign up with the `/newteam` command!"
)
p_query = await db_get("players", object_id=player_id, none_okay=False)
logger.debug(f"this_player: {p_query}")
await self.buy_card(interaction, p_query, owner_team)
@group_buy.command(
name="card-by-name", description="Buy a player card from the marketplace"
)
@app_commands.checks.has_any_role(PD_PLAYERS)
@app_commands.describe(
player_name="Name of the player you want to purchase",
player_cardset="Optional: Name of the cardset the player is from",
)
async def buy_card_slash(
self,
interaction: discord.Interaction,
player_name: str,
player_cardset: Optional[str] = None,
):
if interaction.channel.name in [
"paper-dynasty-chat",
"pd-news-ticker",
"pd-network-news",
]:
await interaction.response.send_message(
f"Please head to down to {get_channel(interaction, 'pd-bot-hole')} to run this command.",
ephemeral=True,
)
return
owner_team = await get_team_by_owner(interaction.user.id)
if not owner_team:
await interaction.response.send_message(
f"I don't see a team for you, yet. You can sign up with the `/newteam` command!"
)
player_cog = self.bot.get_cog("Players")
proper_name = fuzzy_search(player_name, player_cog.player_list)
if not proper_name:
await interaction.response.send_message(f"No clue who that is.")
return
all_params = [("name", proper_name)]
if player_cardset:
this_cardset = await cardset_search(player_cardset, player_cog.cardset_list)
all_params.append(("cardset_id", this_cardset["id"]))
p_query = await db_get("players", params=all_params)
if p_query["count"] == 0:
await interaction.response.send_message(
f"I didn't find any cards for {proper_name}"
)
return
if p_query["count"] > 1:
await interaction.response.send_message(
f"I found {p_query['count']} different cards for {proper_name}. Would you please run this again "
f"with the cardset specified?"
)
return
this_player = p_query["players"][0]
logger.debug(f"this_player: {this_player}")
await self.buy_card(interaction, this_player, owner_team)
@group_buy.command(name="pack", description="Buy a pack or 7 from the marketplace")
@app_commands.checks.has_any_role(PD_PLAYERS)
@app_commands.describe()
async def buy_pack_slash(self, interaction: discord.Interaction):
if interaction.channel.name in [
"paper-dynasty-chat",
"pd-news-ticker",
"pd-network-news",
]:
await interaction.response.send_message(
f"Please head to down to {get_channel(interaction, 'pd-bot-hole')} to run this command.",
ephemeral=True,
)
return
owner_team = await get_team_by_owner(interaction.user.id)
if not owner_team:
await interaction.response.send_message(
f"I don't see a team for you, yet. You can sign up with the `/newteam` command!"
)
p_query = await db_get("packtypes", params=[("available", True)])
if "count" not in p_query:
await interaction.response.send_message(
f"Welp, I couldn't find any packs in my database. Should probably go ping "
f"{get_cal_user(interaction).mention} about that."
)
return
embed = get_team_embed("Packs for Purchase")
# embed.description = 'Run `/buy pack <pack_name>`'
for x in p_query["packtypes"]:
embed.add_field(
name=f"{x['name']} - {x['cost']}", value=f"{x['description']}"
)
pack_options = [
x["name"] for x in p_query["packtypes"][:5] if x["available"] and x["cost"]
]
if len(pack_options) < 5:
pack_options.extend(["na" for x in range(5 - len(pack_options))])
view = ButtonOptions(
responders=[interaction.user], timeout=60, labels=pack_options
)
await interaction.response.send_message(content=None, embed=embed)
question = await interaction.channel.send(
f"Which pack would you like to purchase?", view=view
)
await view.wait()
if view.value:
pack_name = view.value
await question.delete()
this_q = Question(
self.bot, interaction.channel, "How many would you like?", "int", 60
)
num_packs = await this_q.ask([interaction.user])
else:
await question.delete()
await interaction.channel.send(
"Hm. Another window shopper. I'll be here when you're serious."
)
return
p_query = await db_get(
"packtypes",
params=[
("name", pack_name.lower().replace("pack", "")),
("available", True),
],
)
if "count" not in p_query:
await interaction.channel.send(
f"Hmm...I don't recognize {pack_name.title()} as a pack type. Check on that and get back to me.",
ephemeral=True,
)
return
pack_type = p_query["packtypes"][0]
pack_cover = IMAGES["logo"]
if pack_type["name"] == "Standard":
pack_cover = IMAGES["pack-sta"]
elif pack_type["name"] == "Premium":
pack_cover = IMAGES["pack-pre"]
elif pack_type["name"] == "Promo Choice":
pack_cover = IMAGES["mvp-hype"]
total_cost = pack_type["cost"] * num_packs
pack_embed = image_embed(
pack_cover,
title=f"{owner_team['lname']}",
desc=f"{num_packs if num_packs > 1 else ''}{'x ' if num_packs > 1 else ''}"
f"{pack_type['name']} Pack{'s' if num_packs != 1 else ''}",
)
if total_cost > owner_team["wallet"]:
await interaction.channel.send(content=None, embed=pack_embed)
await interaction.channel.send(
content=f"Your Wallet: {owner_team['wallet']}\n"
f"Pack{'s' if num_packs > 1 else ''} Price: {total_cost}\n"
f"After Purchase: {await get_emoji(interaction.guild, 'dead', False)}\n\n"
f"You will have to save up a little more."
)
return
# Get Customization and make purchase
if pack_name in ["Standard", "Premium"]:
view = ButtonOptions(
[interaction.user],
timeout=15,
labels=["No Customization", "Cardset", "Franchise", None, None],
)
view.option1.style = discord.ButtonStyle.danger
await interaction.channel.send(
content="Would you like to apply a pack customization?",
embed=pack_embed,
view=view,
)
await view.wait()
if not view.value:
await interaction.channel.send(f"You think on it and get back to me.")
return
elif view.value == "Cardset":
# await interaction.delete_original_response()
view = SelectView(
[
SelectBuyPacksCardset(
owner_team,
num_packs,
pack_type["id"],
pack_embed,
total_cost,
)
]
)
await interaction.channel.send(content=None, view=view)
return
elif view.value == "Franchise":
# await interaction.delete_original_response()
view = SelectView(
[
SelectBuyPacksTeam(
"AL",
owner_team,
num_packs,
pack_type["id"],
pack_embed,
total_cost,
),
SelectBuyPacksTeam(
"NL",
owner_team,
num_packs,
pack_type["id"],
pack_embed,
total_cost,
),
],
timeout=30,
)
await interaction.channel.send(content=None, view=view)
return
question = await confirm_pack_purchase(
interaction, owner_team, num_packs, total_cost, pack_embed
)
if question is None:
return
purchase = await db_get(
f"teams/{owner_team['id']}/buy/pack/{pack_type['id']}",
params=[("ts", team_hash(owner_team)), ("quantity", num_packs)],
)
if not purchase:
await question.edit(
f"That didn't go through for some reason. If this happens again, go ping the shit out of Cal.",
view=None,
)
return
await question.edit(
content=f"{'They are' if num_packs > 1 else 'It is'} all yours! Go rip 'em with `/open-packs`",
view=None,
)
return
@app_commands.command(
name="selldupes", description="Sell all of your duplicate cards"
)
@app_commands.checks.has_any_role(PD_PLAYERS)
@app_legal_channel()
@app_commands.describe(
immediately="Skip all prompts and sell dupes immediately; default False",
skip_live="Skip all live series cards; default True",
)
async def sell_dupes_command(
self,
interaction: discord.Interaction,
skip_live: bool = True,
immediately: bool = False,
):
team = await get_team_by_owner(interaction.user.id)
if not team:
await interaction.response.send_message(
f"I don't see a team for you, yet. You can sign up with the `/newteam` command!",
ephemeral=True,
)
return
await interaction.response.send_message(
f"Let me flip through your cards. This could take a while if you have a ton of cards..."
)
try:
c_query = await db_get(
"cards", params=[("team_id", team["id"]), ("dupes", True)], timeout=15
)
except Exception as e:
await interaction.edit_original_response(
content=f"{e}\n\nSounds like a {get_cal_user(interaction).mention} problem tbh"
)
return
player_ids = []
dupe_ids = ""
dupe_cards = []
dupe_strings = ["" for x in range(20)]
str_count = 0
for card in c_query["cards"]:
if len(dupe_strings[str_count]) > 1500:
str_count += 1
logger.debug(f"card: {card}")
if skip_live and (card["player"]["cardset"]["id"] == LIVE_CARDSET_ID):
logger.debug(f"live series card - skipping")
elif card["player"]["player_id"] not in player_ids:
logger.debug(f"not a dupe")
player_ids.append(card["player"]["player_id"])
else:
logger.info(f"{team['abbrev']} duplicate card: {card['id']}")
dupe_cards.append(card)
dupe_ids += f"{card['id']},"
dupe_strings[str_count] += (
f"{card['player']['rarity']['name']} {card['player']['p_name']} - "
f"{card['player']['cardset']['name']}\n"
)
if len(dupe_cards) == 0:
await interaction.edit_original_response(
content=f"You currently have 0 duplicate cards!"
)
return
logger.info(f"sending first message / length {len(dupe_strings[0])}")
await interaction.edit_original_response(
content=f"You currently have {len(dupe_cards)} duplicate cards:\n\n{dupe_strings[0]}"
)
for x in dupe_strings[1:]:
logger.info(f"checking string: {len(x)}")
if len(x) > 0:
await interaction.channel.send(x)
else:
break
if not immediately:
view = Confirm(responders=[interaction.user])
question = await interaction.channel.send(
"Would you like to sell all of them?", view=view
)
await view.wait()
if not view.value:
await question.edit(content="We can leave them be for now.", view=None)
return
await question.edit(content=f"The sale is going through...", view=None)
# for card in dupe_cards:
sale = await db_get(
f"teams/{team['id']}/sell/cards",
params=[("ts", team_hash(team)), ("ids", dupe_ids)],
timeout=10,
)
if not sale:
await interaction.channel.send(
f"That didn't go through for some reason. Go ping the shit out of {get_cal_user(interaction).mention}."
)
return
team = await db_get("teams", object_id=team["id"])
await interaction.channel.send(f"Your Wallet: {team['wallet']}")
@app_commands.command(
name="newteam", description="Get your fresh team for a new season"
)
@app_commands.checks.has_any_role(PD_PLAYERS)
@app_commands.describe(
gm_name="The fictional name of your team's GM",
team_abbrev="2, 3, or 4 character abbreviation (e.g. WV, ATL, MAD)",
team_full_name="City/location and name (e.g. Baltimore Orioles)",
team_short_name="Name of team (e.g. Yankees)",
mlb_anchor_team="2 or 3 character abbreviation of your anchor MLB team (e.g. NYM, MKE)",
team_logo_url="[Optional] URL ending in .png or .jpg for your team logo",
color="[Optional] Hex color code to highlight your team",
)
async def new_team_slash(
self,
interaction: discord.Interaction,
gm_name: str,
team_abbrev: str,
team_full_name: str,
team_short_name: str,
mlb_anchor_team: str,
team_logo_url: str = None,
color: str = None,
):
owner_team = await get_team_by_owner(interaction.user.id)
current = await db_get("current")
# Check for existing team
if owner_team and not os.environ.get("TESTING"):
await interaction.response.send_message(
f"Whoa there, bucko. I already have you down as GM of the {owner_team['sname']}."
)
return
# Check for duplicate team data
dupes = await db_get("teams", params=[("abbrev", team_abbrev)])
if dupes["count"]:
await interaction.response.send_message(
f"Yikes! {team_abbrev.upper()} is a popular abbreviation - it's already in use by the "
f"{dupes['teams'][0]['sname']}. No worries, though, you can run the `/newteam` command again to get "
f"started!"
)
return
# Check for duplicate team data
dupes = await db_get("teams", params=[("lname", team_full_name)])
if dupes["count"]:
await interaction.response.send_message(
f"Yikes! {team_full_name.title()} is a popular name - it's already in use by "
f"{dupes['teams'][0]['abbrev']}. No worries, though, you can run the `/newteam` command again to get "
f"started!"
)
return
# Get personal bot channel
hello_channel = discord.utils.get(
interaction.guild.text_channels,
name=f"hello-{interaction.user.name.lower()}",
)
if hello_channel:
op_ch = hello_channel
else:
op_ch = await helpers.create_channel(
interaction,
channel_name=f"hello-{interaction.user.name}",
category_name="Paper Dynasty Team",
everyone_read=False,
read_send_members=[interaction.user],
)
await share_channel(op_ch, interaction.guild.me)
await share_channel(op_ch, interaction.user)
try:
poke_role = get_role(interaction, "Pokétwo")
await share_channel(op_ch, poke_role, read_only=True)
except Exception as e:
logger.error(f"unable to share sheet with Poketwo")
await interaction.response.send_message(
f"Let's head down to your private channel: {op_ch.mention}", ephemeral=True
)
await op_ch.send(
f"Hey there, {interaction.user.mention}! I am Paper Domo - welcome to season "
f"{current['season']} of Paper Dynasty! We've got a lot of special updates in store for this "
f"season including live cards, throwback cards, and special events."
)
# Confirm user is happy with branding
embed = get_team_embed(
f"Branding Check",
{
"logo": team_logo_url if team_logo_url else None,
"color": color if color else "a6ce39",
"season": 4,
},
)
embed.add_field(name="GM Name", value=gm_name, inline=False)
embed.add_field(name="Full Team Name", value=team_full_name)
embed.add_field(name="Short Team Name", value=team_short_name)
embed.add_field(name="Team Abbrev", value=team_abbrev.upper())
view = Confirm(responders=[interaction.user])
question = await op_ch.send(
"Are you happy with this branding? Don't worry - you can update it later!",
embed=embed,
view=view,
)
await view.wait()
if not view.value:
await question.edit(
content="~~Are you happy with this branding?~~\n\nI gotta go, but when you're ready to start again "
"run the `/newteam` command again and we can get rolling! Hint: you can copy and paste the "
"command from last time and make edits.",
view=None,
)
return
await question.edit(
content="Looking good, champ in the making! Let's get you your starter team!",
view=None,
)
team_choice = None
if mlb_anchor_team.title() in ALL_MLB_TEAMS.keys():
team_choice = mlb_anchor_team.title()
else:
for x in ALL_MLB_TEAMS:
if (
mlb_anchor_team.upper() in ALL_MLB_TEAMS[x]
or mlb_anchor_team.title() in ALL_MLB_TEAMS[x]
):
team_choice = x
break
team_string = mlb_anchor_team
logger.debug(f"team_string: {team_string} / team_choice: {team_choice}")
if not team_choice:
# Get MLB anchor team
while True:
prompt = (
f"I don't recognize **{team_string}**. I try to recognize abbreviations (BAL), "
f'short names (Orioles), and long names ("Baltimore Orioles").\n\nWhat MLB club would you '
f"like to use as your anchor team?"
)
this_q = Question(self.bot, op_ch, prompt, "text", 120)
team_string = await this_q.ask([interaction.user])
if not team_string:
await op_ch.send(
f"Tell you hwat. You think on it and come back I gotta go, but when you're ready to start again "
"run the `/newteam` command again and we can get rolling! Hint: you can copy and paste the "
"command from last time and make edits."
)
return
if team_string.title() in ALL_MLB_TEAMS.keys():
team_choice = team_string.title()
break
else:
match = False
for x in ALL_MLB_TEAMS:
if (
team_string.upper() in ALL_MLB_TEAMS[x]
or team_string.title() in ALL_MLB_TEAMS[x]
):
team_choice = x
match = True
break
if not match:
await op_ch.send(f"Got it!")
team = await db_post(
"teams",
payload={
"abbrev": team_abbrev.upper(),
"sname": team_short_name,
"lname": team_full_name,
"gmid": interaction.user.id,
"gmname": gm_name,
"gsheet": "None",
"season": current["season"],
"wallet": 100,
"color": color if color else "a6ce39",
"logo": team_logo_url if team_logo_url else None,
},
)
if not team:
await op_ch.send(
f"Frick. {get_cal_user(interaction).mention}, can you help? I can't find this team."
)
return
t_role = await get_or_create_role(
interaction, f"{team_abbrev} - {team_full_name}"
)
await interaction.user.add_roles(t_role)
anchor_players = []
anchor_all_stars = await db_get(
"players/random",
params=[
("min_rarity", 3),
("max_rarity", 3),
("franchise", normalize_franchise(team_choice)),
("pos_exclude", "RP"),
("limit", 1),
("in_packs", True),
],
)
anchor_starters = await db_get(
"players/random",
params=[
("min_rarity", 2),
("max_rarity", 2),
("franchise", normalize_franchise(team_choice)),
("pos_exclude", "RP"),
("limit", 2),
("in_packs", True),
],
)
if not anchor_all_stars:
await op_ch.send(
f"I am so sorry, but the {team_choice} do not have an All-Star to "
f"provide as your anchor player. Let's start this process over - will you please "
f"run the `/newteam` command again with a new MLB club?\nHint: you can copy and paste the "
"command from last time and make edits."
)
await db_delete("teams", object_id=team["id"])
return
if not anchor_starters or anchor_starters["count"] <= 1:
await op_ch.send(
f"I am so sorry, but the {team_choice} do not have two Starters to "
f"provide as your anchor players. Let's start this process over - will you please "
f"run the `/newteam` command again with a new MLB club?\nHint: you can copy and paste the "
"command from last time and make edits."
)
await db_delete("teams", object_id=team["id"])
return
anchor_players.append(anchor_all_stars["players"][0])
anchor_players.append(anchor_starters["players"][0])
anchor_players.append(anchor_starters["players"][1])
this_pack = await db_post(
"packs/one",
payload={
"team_id": team["id"],
"pack_type_id": 2,
"open_time": datetime.datetime.timestamp(datetime.datetime.now())
* 1000,
},
)
roster_counts = {
"SP": 0,
"RP": 0,
"CP": 0,
"C": 0,
"1B": 0,
"2B": 0,
"3B": 0,
"SS": 0,
"LF": 0,
"CF": 0,
"RF": 0,
"DH": 0,
"All-Star": 0,
"Starter": 0,
"Reserve": 0,
"Replacement": 0,
}
def update_roster_counts(players: list):
for pl in players:
roster_counts[pl["rarity"]["name"]] += 1
for x in get_all_pos(pl):
roster_counts[x] += 1
logger.warning(f"Roster counts for {team['sname']}: {roster_counts}")
# Add anchor position coverage
update_roster_counts(anchor_players)
await db_post(
"cards",
payload={
"cards": [
{
"player_id": x["player_id"],
"team_id": team["id"],
"pack_id": this_pack["id"],
}
for x in anchor_players
]
},
timeout=10,
)
# Get 10 pitchers to seed team
five_sps = await db_get(
"players/random",
params=[("pos_include", "SP"), ("max_rarity", 1), ("limit", 5)],
)
five_rps = await db_get(
"players/random",
params=[("pos_include", "RP"), ("max_rarity", 1), ("limit", 5)],
)
team_sp = [x for x in five_sps["players"]]
team_rp = [x for x in five_rps["players"]]
update_roster_counts([*team_sp, *team_rp])
await db_post(
"cards",
payload={
"cards": [
{
"player_id": x["player_id"],
"team_id": team["id"],
"pack_id": this_pack["id"],
}
for x in [*team_sp, *team_rp]
]
},
timeout=10,
)
# Collect infielders
team_infielders = []
for pos in ["C", "1B", "2B", "3B", "SS"]:
if roster_counts["Replacement"] < roster_counts["Reserve"]:
rarity_params = [("min_rarity", 0), ("max_rarity", 0)]
else:
rarity_params = [("min_rarity", 1), ("max_rarity", 1)]
r_draw = await db_get(
"players/random",
params=[("pos_include", pos), *rarity_params, ("limit", 2)],
none_okay=False,
)
team_infielders.extend(r_draw["players"])
update_roster_counts(team_infielders)
await db_post(
"cards",
payload={
"cards": [
{
"player_id": x["player_id"],
"team_id": team["id"],
"pack_id": this_pack["id"],
}
for x in team_infielders
]
},
timeout=10,
)
# Collect outfielders
team_outfielders = []
for pos in ["LF", "CF", "RF"]:
if roster_counts["Replacement"] < roster_counts["Reserve"]:
rarity_params = [("min_rarity", 0), ("max_rarity", 0)]
else:
rarity_params = [("min_rarity", 1), ("max_rarity", 1)]
r_draw = await db_get(
"players/random",
params=[("pos_include", pos), *rarity_params, ("limit", 2)],
none_okay=False,
)
team_outfielders.extend(r_draw["players"])
update_roster_counts(team_outfielders)
await db_post(
"cards",
payload={
"cards": [
{
"player_id": x["player_id"],
"team_id": team["id"],
"pack_id": this_pack["id"],
}
for x in team_outfielders
]
},
timeout=10,
)
async with op_ch.typing():
done_anc = await display_cards(
[{"player": x, "team": team} for x in anchor_players],
team,
op_ch,
interaction.user,
self.bot,
cust_message=f"Let's take a look at your three {team_choice} anchor players.\n"
f"Press `Close Pack` to continue.",
add_roster=False,
)
error_text = f"Yikes - I can't display the rest of your team. {get_cal_user(interaction).mention} plz halp"
if not done_anc:
await op_ch.send(error_text)
async with op_ch.typing():
done_sp = await display_cards(
[{"player": x, "team": team} for x in team_sp],
team,
op_ch,
interaction.user,
self.bot,
cust_message=f"Here are your starting pitchers.\n"
f"Press `Close Pack` to continue.",
add_roster=False,
)
if not done_sp:
await op_ch.send(error_text)
async with op_ch.typing():
done_rp = await display_cards(
[{"player": x, "team": team} for x in team_rp],
team,
op_ch,
interaction.user,
self.bot,
cust_message=f"And now for your bullpen.\n"
f"Press `Close Pack` to continue.",
add_roster=False,
)
if not done_rp:
await op_ch.send(error_text)
async with op_ch.typing():
done_inf = await display_cards(
[{"player": x, "team": team} for x in team_infielders],
team,
op_ch,
interaction.user,
self.bot,
cust_message=f"Next let's take a look at your infielders.\n"
f"Press `Close Pack` to continue.",
add_roster=False,
)
if not done_inf:
await op_ch.send(error_text)
async with op_ch.typing():
done_out = await display_cards(
[{"player": x, "team": team} for x in team_outfielders],
team,
op_ch,
interaction.user,
self.bot,
cust_message=f"Now let's take a look at your outfielders.\n"
f"Press `Close Pack` to continue.",
add_roster=False,
)
if not done_out:
await op_ch.send(error_text)
await give_packs(team, 1)
await op_ch.send(
f"To get you started, I've spotted you 100₼ and a pack of cards. You can rip that with the "
f"`/open` command once your google sheet is set up!"
)
await op_ch.send(
f"{t_role.mention}\n\n"
f"There's your roster! We have one more step and you will be ready to play.\n\n{SHEET_SHARE_STEPS}\n\n"
f"{get_roster_sheet({'gsheet': current['gsheet_template']})}"
)
new_team_embed = await team_summary_embed(
team, interaction, include_roster=False
)
await send_to_channel(
self.bot,
"pd-network-news",
content="A new challenger approaches...",
embed=new_team_embed,
)
@commands.command(name="mlbteam", help="Mod: Load MLB team data")
@commands.is_owner()
async def mlb_team_command(
self,
ctx: commands.Context,
abbrev: str,
sname: str,
lname: str,
gmid: int,
gmname: str,
gsheet: str,
logo: str,
color: str,
ranking: int,
):
# Check for duplicate team data
dupes = await db_get("teams", params=[("abbrev", abbrev)])
if dupes["count"]:
await ctx.send(
f"Yikes! {abbrev.upper()} is a popular abbreviation - it's already in use by the "
f"{dupes['teams'][0]['sname']}. No worries, though, you can run the `/newteam` command again to get "
f"started!"
)
return
# Check for duplicate team data
dupes = await db_get("teams", params=[("lname", lname)])
if dupes["count"]:
await ctx.send(
f"Yikes! {lname.title()} is a popular name - it's already in use by "
f"{dupes['teams'][0]['abbrev']}. No worries, though, you can run the `/newteam` command again to get "
f"started!"
)
return
current = await db_get("current")
team = await db_post(
"teams",
payload={
"abbrev": abbrev.upper(),
"sname": sname,
"lname": lname,
"gmid": gmid,
"gmname": gmname,
"gsheet": gsheet,
"season": current["season"],
"wallet": 100,
"ranking": ranking,
"color": color if color else "a6ce39",
"logo": logo if logo else None,
"is_ai": True,
},
)
p_query = await db_get("players", params=[("franchise", sname)])
this_pack = await db_post(
"packs/one",
payload={
"team_id": team["id"],
"pack_type_id": 2,
"open_time": datetime.datetime.timestamp(datetime.datetime.now())
* 1000,
},
)
team_players = p_query["players"] + p_query["players"]
await db_post(
"cards",
payload={
"cards": [
{
"player_id": x["player_id"],
"team_id": team["id"],
"pack_id": this_pack["id"],
}
for x in team_players
]
},
timeout=10,
)
embed = get_team_embed(f"{team['lname']}", team)
await ctx.send(content=None, embed=embed)
@commands.hybrid_command(name="mlb-update", help="Distribute MLB cards to AI teams")
@commands.is_owner()
async def mlb_update_command(self, ctx: commands.Context):
ai_teams = await db_get("teams", params=[("is_ai", True)])
if ai_teams["count"] == 0:
await ctx.send(f"I could not find any AI teams.")
return
total_cards = 0
total_teams = 0
for team in ai_teams["teams"]:
all_players = await db_get("players", params=[("franchise", team["sname"])])
new_players = []
if all_players:
for player in all_players["players"]:
owned_by_team_ids = [
entry["team"] for entry in player["paperdex"]["paperdex"]
]
if team["id"] not in owned_by_team_ids:
new_players.append(player)
if new_players:
await ctx.send(
f"Posting {len(new_players)} new cards for {team['gmname']}'s {team['sname']}..."
)
total_cards += len(new_players)
total_teams += 1
this_pack = await db_post(
"packs/one",
payload={
"team_id": team["id"],
"pack_type_id": 2,
"open_time": datetime.datetime.timestamp(
datetime.datetime.now()
)
* 1000,
},
)
await db_post(
"cards",
payload={
"cards": [
{
"player_id": x["player_id"],
"team_id": team["id"],
"pack_id": this_pack["id"],
}
for x in new_players
]
},
timeout=10,
)
await refresh_sheet(team, self.bot)
await ctx.send(f"All done! I added {total_cards} across {total_teams} teams.")
@commands.hybrid_command(
name="newsheet", help="Link a new team sheet with your team"
)
@commands.has_any_role(PD_PLAYERS)
async def share_sheet_command(
self,
ctx,
google_sheet_url: str,
team_abbrev: Optional[str],
copy_rosters: Optional[bool] = True,
):
owner_team = await get_team_by_owner(ctx.author.id)
if not owner_team:
await ctx.send(
f"I don't see a team for you, yet. You can sign up with the `/newteam` command!"
)
return
team = owner_team
if team_abbrev and team_abbrev != owner_team["abbrev"]:
if ctx.author.id != 258104532423147520:
await ctx.send(
f"You can only update the team sheet for your own team, you goober."
)
return
else:
team = await get_team_by_abbrev(team_abbrev)
current = await db_get("current")
if current["gsheet_template"] in google_sheet_url:
await ctx.send(
f"Ope, looks like that is the template sheet. Would you please make a copy and then share?"
)
return
gauntlet_team = await get_team_by_abbrev(f"Gauntlet-{owner_team['abbrev']}")
if gauntlet_team:
view = ButtonOptions(
[ctx.author],
timeout=30,
labels=["Main Team", "Gauntlet Team", None, None, None],
)
question = await ctx.send(
f"Is this sheet for your main PD team or your active Gauntlet team?",
view=view,
)
await view.wait()
if not view.value:
await question.edit(
content=f"Okay you keep thinking on it and get back to me when you're ready.",
view=None,
)
return
elif view.value == "Gauntlet Team":
await question.delete()
team = gauntlet_team
sheets = get_sheets(self.bot)
response = await ctx.send(f"I'll go grab that sheet...")
try:
new_sheet = sheets.open_by_url(google_sheet_url)
except Exception as e:
logger.error(f"Error accessing {team['abbrev']} sheet: {e}")
current = await db_get("current")
await ctx.send(
f"I wasn't able to access that sheet. Did you remember to share it with my PD email?"
f"\n\nHere's a quick refresher:\n{SHEET_SHARE_STEPS}\n\n"
f"{get_roster_sheet({'gsheet': current['gsheet_template']})}"
)
return
team_data = new_sheet.worksheet_by_title("Team Data")
if not gauntlet_team or owner_team != gauntlet_team:
team_data.update_values(
crange="B1:B2", values=[[f"{team['id']}"], [f"'{team_hash(team)}"]]
)
if copy_rosters and team["gsheet"].lower() != "none":
old_sheet = sheets.open_by_key(team["gsheet"])
r_sheet = old_sheet.worksheet_by_title(f"My Rosters")
roster_ids = r_sheet.range("B3:B80")
lineups_data = r_sheet.range("H4:M26")
new_r_data, new_l_data = [], []
for row in roster_ids:
if row[0].value != "":
new_r_data.append([int(row[0].value)])
else:
new_r_data.append([None])
logger.debug(f"new_r_data: {new_r_data}")
for row in lineups_data:
logger.debug(f"row: {row}")
new_l_data.append(
[
row[0].value if row[0].value != "" else None,
int(row[1].value) if row[1].value != "" else None,
row[2].value if row[2].value != "" else None,
int(row[3].value) if row[3].value != "" else None,
row[4].value if row[4].value != "" else None,
int(row[5].value) if row[5].value != "" else None,
]
)
logger.debug(f"new_l_data: {new_l_data}")
new_r_sheet = new_sheet.worksheet_by_title(f"My Rosters")
new_r_sheet.update_values(crange="B3:B80", values=new_r_data)
new_r_sheet.update_values(crange="H4:M26", values=new_l_data)
if team["has_guide"]:
post_ratings_guide(team, self.bot, this_sheet=new_sheet)
team = await db_patch(
"teams", object_id=team["id"], params=[("gsheet", new_sheet.id)]
)
await refresh_sheet(team, self.bot, sheets)
conf_message = f"Alright, your sheet is linked to your team - good luck"
if owner_team == team:
conf_message += " this season!"
else:
conf_message += " on your run!"
conf_message += f"\n\n{HELP_SHEET_SCRIPTS}"
await response.edit(content=f"{conf_message}")
# @commands.hybrid_command(name='refresh', help='Refresh team data in Sheets')
# @commands.has_any_role(PD_PLAYERS)
# async def update_team(self, ctx):
# if not await legal_channel(ctx):
# await ctx.send(f'Slide on down to the {get_channel(ctx, "pd-bot-hole").mention} ;)')
# return
#
# team = await get_team_by_owner(ctx.author.id)
# if not team:
# await ctx.send(
# f'I don\'t see a team for you, yet. You can sign up with the `/newteam` command!'
# )
# return
#
# await refresh_sheet(team, self.bot)
# await ctx.send(random_conf_gif())
#
# # if abbrev and self.bot.is_owner(ctx.author):
# # team = Team.get_season(abbrev[0])
# # else:
# # team = Team.get_by_owner(ctx.author.id)
# # if not team:
# # await ctx.send('Now you wait just a second. You don\'t have a team!')
# # return
# #
# # # Get data from Sheets
# # roster_data = await self.get_collection(ctx, team)
# #
# # # Cut any marked players
# # comp_trade = True
# # if len(roster_data['cut']) > 0:
# # comp_trade = await self.cut_players(ctx, team, roster_data['cut'])
# #
# # if not comp_trade:
# # return
# #
# # # Set new rostered list
# # self.set_rostered_players(team, roster_data['rostered'])
# #
# # # Send current data to Sheets
# # if not await self.write_collection(ctx, team, extra=len(roster_data['cut'])):
# # logger.error(f'There was an issue trying to update the {team.sname} roster.')
# # await helpers.pause_then_type(ctx, 'Yikes. I had an issue with Sheets. Send help.')
# # else:
# # await helpers.pause_then_type(ctx, 'Alrighty, your sheet is all up to date!')
# # if team.logo:
# # thumb = team.logo
# # else:
# # thumb = self.bot.get_user(team.gmid).avatar_url
# # await ctx.send(content=None, embed=helpers.get_active_roster(team, thumb))
# #
# # db.close()
@commands.hybrid_command(name="give-card", help="Mod: Give free card to team")
# @commands.is_owner()
@commands.has_any_role("PD Gift Players")
async def give_card_command(self, ctx, player_ids: str, team_abbrev: str):
if ctx.channel.name in [
"paper-dynasty-chat",
"pd-news-ticker",
"pd-network-news",
]:
await ctx.send(
f"Please head to down to {get_channel(ctx, 'pd-bot-hole')} to run this command."
)
return
question = await ctx.send(f"I'll go put that card on their roster...")
all_player_ids = player_ids.split(" ")
t_query = await db_get("teams", params=[("abbrev", team_abbrev)])
if not t_query["count"]:
await ctx.send(f"I could not find {team_abbrev}")
return
team = t_query["teams"][0]
this_pack = await db_post(
"packs/one",
payload={
"team_id": team["id"],
"pack_type_id": 4,
"open_time": datetime.datetime.timestamp(datetime.datetime.now())
* 1000,
},
)
try:
await give_cards_to_team(
team, player_ids=all_player_ids, pack_id=this_pack["id"]
)
except Exception as e:
logger.error(f"failed to create cards: {e}")
raise ConnectionError(f"Failed to distribute these cards.")
await question.edit(content=f"Alrighty, now I'll refresh their sheet...")
await refresh_sheet(team, self.bot)
await question.edit(content=f"All done!")
await send_to_channel(
self.bot,
channel_name="commissioners-office",
content=f"Just sent {len(all_player_ids)} players to {ctx.message.author.mention}:\n{all_player_ids}",
)
@commands.command(name="cleartest", hidden=True)
@commands.is_owner()
async def clear_test_command(self, ctx):
team = await get_team_by_owner(ctx.author.id)
msg = await ctx.send("Alright, let's go find your cards...")
all_cards = await db_get("cards", params=[("team_id", team["id"])])
if all_cards:
await msg.edit(
content=f"I found {len(all_cards['cards'])} cards; deleting now..."
)
for x in all_cards["cards"]:
await db_delete("cards", object_id=x["id"])
await msg.edit(content=f"All done with cards. Now I'll wipe out your packs...")
p_query = await db_get("packs", params=[("team_id", team["id"])])
if p_query["count"]:
for x in p_query["packs"]:
await db_delete("packs", object_id=x["id"])
await msg.edit(
content=f"All done with packs. Now I'll wipe out your paperdex..."
)
p_query = await db_get("paperdex", params=[("team_id", team["id"])])
if p_query["count"]:
for x in p_query["paperdex"]:
await db_delete("paperdex", object_id=x["id"])
await msg.edit(
content=f"All done with paperdex. Now I'll wipe out your team..."
)
if db_delete("teams", object_id=team["id"]):
await msg.edit(content=f"All done!")
@commands.command(name="packtest", hidden=True)
@commands.is_owner()
async def pack_test_command(self, ctx):
team = await get_team_by_owner(ctx.author.id)
await display_cards(
await get_test_pack(ctx, team),
team,
ctx.channel,
ctx.author,
self.bot,
pack_cover=IMAGES["pack-sta"],
pack_name="Standard Pack",
)
async def setup(bot):
await bot.add_cog(Economy(bot))