paper-dynasty-discord/cogs/economy_new/team_setup.py
Cal Corum c0af0c3d32
All checks were successful
Build Docker Image / build (pull_request) Successful in 1m19s
fix: pack rarity targeting, StratGame methods, HR detection (#20 #21 #22)
- Fix pack distribution to use exact rarity targeting (rarity=0 for
  Replacement, rarity=1 for Reserve) instead of max_rarity=1 which
  matched both tiers; applied to cogs/economy.py and
  cogs/economy_new/team_setup.py

- Add get_away_team() and get_home_team() async methods to StratGame
  dataclass, delegating to get_game_team() with the appropriate
  team_id; remove stale TODO comment from Game model

- Standardize home-run detection in complete_play(): set
  batter_final = batter_to_base when not None before the HR check,
  then only check batter_final == 4 (removes redundant batter_to_base
  path and the patch comment)

Closes #20, Closes #21, Closes #22

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-22 23:31:16 -05:00

668 lines
24 KiB
Python

# Economy Team Setup Module
# Contains team creation and Google Sheets integration from the original economy.py
import logging
from discord.ext import commands
from discord import app_commands
import discord
import datetime
import os
from typing import Optional
# Import specific utilities needed by this module
import pygsheets
from api_calls import db_get, db_post, db_patch, db_delete, get_team_by_abbrev
from help_text import SHEET_SHARE_STEPS, HELP_SHEET_SCRIPTS
from helpers.constants import PD_PLAYERS, ALL_MLB_TEAMS
from helpers import (
get_team_by_owner,
share_channel,
get_role,
get_cal_user,
get_or_create_role,
display_cards,
give_packs,
get_all_pos,
get_sheets,
refresh_sheet,
post_ratings_guide,
team_summary_embed,
get_roster_sheet,
Question,
Confirm,
ButtonOptions,
legal_channel,
get_channel,
create_channel,
get_context_user,
)
from api_calls import team_hash
from helpers.discord_utils import get_team_embed, send_to_channel
logger = logging.getLogger("discord_app")
class TeamSetup(commands.Cog):
"""Team creation and Google Sheets integration functionality for Paper Dynasty."""
def __init__(self, bot):
self.bot = bot
@app_commands.command(
name="newteam", description="Get your fresh team for a new season"
)
@app_commands.checks.has_any_role(PD_PLAYERS)
@app_commands.describe(
gm_name="The fictional name of your team's GM",
team_abbrev="2, 3, or 4 character abbreviation (e.g. WV, ATL, MAD)",
team_full_name="City/location and name (e.g. Baltimore Orioles)",
team_short_name="Name of team (e.g. Yankees)",
mlb_anchor_team="2 or 3 character abbreviation of your anchor MLB team (e.g. NYM, MKE)",
team_logo_url="[Optional] URL ending in .png or .jpg for your team logo",
color="[Optional] Hex color code to highlight your team",
)
async def new_team_slash(
self,
interaction: discord.Interaction,
gm_name: str,
team_abbrev: str,
team_full_name: str,
team_short_name: str,
mlb_anchor_team: str,
team_logo_url: str = None,
color: str = None,
):
owner_team = await get_team_by_owner(interaction.user.id)
current = await db_get("current")
# Check for existing team
if owner_team and not os.environ.get("TESTING"):
await interaction.response.send_message(
f"Whoa there, bucko. I already have you down as GM of the {owner_team['sname']}."
)
return
# Check for duplicate team data
dupes = await db_get("teams", params=[("abbrev", team_abbrev)])
if dupes["count"]:
await interaction.response.send_message(
f"Yikes! {team_abbrev.upper()} is a popular abbreviation - it's already in use by the "
f"{dupes['teams'][0]['sname']}. No worries, though, you can run the `/newteam` command again to get "
f"started!"
)
return
# Check for duplicate team data
dupes = await db_get("teams", params=[("lname", team_full_name)])
if dupes["count"]:
await interaction.response.send_message(
f"Yikes! {team_full_name.title()} is a popular name - it's already in use by "
f"{dupes['teams'][0]['abbrev']}. No worries, though, you can run the `/newteam` command again to get "
f"started!"
)
return
# Get personal bot channel
hello_channel = discord.utils.get(
interaction.guild.text_channels,
name=f"hello-{interaction.user.name.lower()}",
)
if hello_channel:
op_ch = hello_channel
else:
op_ch = await create_channel(
interaction,
channel_name=f"hello-{interaction.user.name}",
category_name="Paper Dynasty Team",
everyone_read=False,
read_send_members=[interaction.user],
)
await share_channel(op_ch, interaction.guild.me)
await share_channel(op_ch, interaction.user)
try:
poke_role = get_role(interaction, "Pokétwo")
await share_channel(op_ch, poke_role, read_only=True)
except Exception as e:
logger.error(f"unable to share sheet with Poketwo")
await interaction.response.send_message(
f"Let's head down to your private channel: {op_ch.mention}", ephemeral=True
)
await op_ch.send(
f"Hey there, {interaction.user.mention}! I am Paper Domo - welcome to season "
f"{current['season']} of Paper Dynasty! We've got a lot of special updates in store for this "
f"season including live cards, throwback cards, and special events."
)
# Confirm user is happy with branding
embed = get_team_embed(
f"Branding Check",
{
"logo": team_logo_url if team_logo_url else None,
"color": color if color else "a6ce39",
"season": 4,
},
)
embed.add_field(name="GM Name", value=gm_name, inline=False)
embed.add_field(name="Full Team Name", value=team_full_name)
embed.add_field(name="Short Team Name", value=team_short_name)
embed.add_field(name="Team Abbrev", value=team_abbrev.upper())
view = Confirm(responders=[interaction.user])
question = await op_ch.send(
"Are you happy with this branding? Don't worry - you can update it later!",
embed=embed,
view=view,
)
await view.wait()
if not view.value:
await question.edit(
content="~~Are you happy with this branding?~~\n\nI gotta go, but when you're ready to start again "
"run the `/newteam` command again and we can get rolling! Hint: you can copy and paste the "
"command from last time and make edits.",
view=None,
)
return
await question.edit(
content="Looking good, champ in the making! Let's get you your starter team!",
view=None,
)
team_choice = None
if mlb_anchor_team.title() in ALL_MLB_TEAMS.keys():
team_choice = mlb_anchor_team.title()
else:
for x in ALL_MLB_TEAMS:
if (
mlb_anchor_team.upper() in ALL_MLB_TEAMS[x]
or mlb_anchor_team.title() in ALL_MLB_TEAMS[x]
):
team_choice = x
break
team_string = mlb_anchor_team
logger.debug(f"team_string: {team_string} / team_choice: {team_choice}")
if not team_choice:
# Get MLB anchor team
while True:
prompt = (
f"I don't recognize **{team_string}**. I try to recognize abbreviations (BAL), "
f'short names (Orioles), and long names ("Baltimore Orioles").\n\nWhat MLB club would you '
f"like to use as your anchor team?"
)
this_q = Question(self.bot, op_ch, prompt, "text", 120)
team_string = await this_q.ask([interaction.user])
if not team_string:
await op_ch.send(
f"Tell you hwat. You think on it and come back I gotta go, but when you're ready to start again "
"run the `/newteam` command again and we can get rolling! Hint: you can copy and paste the "
"command from last time and make edits."
)
return
if team_string.title() in ALL_MLB_TEAMS.keys():
team_choice = team_string.title()
break
else:
match = False
for x in ALL_MLB_TEAMS:
if (
team_string.upper() in ALL_MLB_TEAMS[x]
or team_string.title() in ALL_MLB_TEAMS[x]
):
team_choice = x
match = True
break
if not match:
await op_ch.send(f"Got it!")
team = await db_post(
"teams",
payload={
"abbrev": team_abbrev.upper(),
"sname": team_short_name,
"lname": team_full_name,
"gmid": interaction.user.id,
"gmname": gm_name,
"gsheet": "None",
"season": current["season"],
"wallet": 100,
"color": color if color else "a6ce39",
"logo": team_logo_url if team_logo_url else None,
},
)
if not team:
await op_ch.send(
f"Frick. {get_cal_user(interaction).mention}, can you help? I can't find this team."
)
return
t_role = await get_or_create_role(
interaction, f"{team_abbrev} - {team_full_name}"
)
await interaction.user.add_roles(t_role)
anchor_players = []
anchor_all_stars = await db_get(
"players/random",
params=[
("min_rarity", 3),
("max_rarity", 3),
("franchise", team_choice),
("pos_exclude", "RP"),
("limit", 1),
("in_packs", True),
],
)
anchor_starters = await db_get(
"players/random",
params=[
("min_rarity", 2),
("max_rarity", 2),
("franchise", team_choice),
("pos_exclude", "RP"),
("limit", 2),
("in_packs", True),
],
)
if not anchor_all_stars:
await op_ch.send(
f"I am so sorry, but the {team_choice} do not have an All-Star to "
f"provide as your anchor player. Let's start this process over - will you please "
f"run the `/newteam` command again with a new MLB club?\nHint: you can copy and paste the "
"command from last time and make edits."
)
await db_delete("teams", object_id=team["id"])
return
if not anchor_starters or anchor_starters["count"] <= 1:
await op_ch.send(
f"I am so sorry, but the {team_choice} do not have two Starters to "
f"provide as your anchor players. Let's start this process over - will you please "
f"run the `/newteam` command again with a new MLB club?\nHint: you can copy and paste the "
"command from last time and make edits."
)
await db_delete("teams", object_id=team["id"])
return
anchor_players.append(anchor_all_stars["players"][0])
anchor_players.append(anchor_starters["players"][0])
anchor_players.append(anchor_starters["players"][1])
this_pack = await db_post(
"packs/one",
payload={
"team_id": team["id"],
"pack_type_id": 2,
"open_time": datetime.datetime.timestamp(datetime.datetime.now())
* 1000,
},
)
roster_counts = {
"SP": 0,
"RP": 0,
"CP": 0,
"C": 0,
"1B": 0,
"2B": 0,
"3B": 0,
"SS": 0,
"LF": 0,
"CF": 0,
"RF": 0,
"DH": 0,
"All-Star": 0,
"Starter": 0,
"Reserve": 0,
"Replacement": 0,
}
def update_roster_counts(players: list):
for pl in players:
roster_counts[pl["rarity"]["name"]] += 1
for x in get_all_pos(pl):
roster_counts[x] += 1
logger.warning(f"Roster counts for {team['sname']}: {roster_counts}")
# Add anchor position coverage
update_roster_counts(anchor_players)
await db_post(
"cards",
payload={
"cards": [
{
"player_id": x["player_id"],
"team_id": team["id"],
"pack_id": this_pack["id"],
}
for x in anchor_players
]
},
timeout=10,
)
# Get 10 pitchers to seed team
five_sps = await db_get(
"players/random",
params=[("pos_include", "SP"), ("max_rarity", 1), ("limit", 5)],
)
five_rps = await db_get(
"players/random",
params=[("pos_include", "RP"), ("max_rarity", 1), ("limit", 5)],
)
team_sp = [x for x in five_sps["players"]]
team_rp = [x for x in five_rps["players"]]
update_roster_counts([*team_sp, *team_rp])
await db_post(
"cards",
payload={
"cards": [
{
"player_id": x["player_id"],
"team_id": team["id"],
"pack_id": this_pack["id"],
}
for x in [*team_sp, *team_rp]
]
},
timeout=10,
)
# Collect infielders
team_infielders = []
for pos in ["C", "1B", "2B", "3B", "SS"]:
if roster_counts["Replacement"] < roster_counts["Reserve"]:
rarity_param = ("rarity", 0)
else:
rarity_param = ("rarity", 1)
r_draw = await db_get(
"players/random",
params=[("pos_include", pos), rarity_param, ("limit", 2)],
none_okay=False,
)
team_infielders.extend(r_draw["players"])
update_roster_counts(team_infielders)
await db_post(
"cards",
payload={
"cards": [
{
"player_id": x["player_id"],
"team_id": team["id"],
"pack_id": this_pack["id"],
}
for x in team_infielders
]
},
timeout=10,
)
# Collect outfielders
team_outfielders = []
for pos in ["LF", "CF", "RF"]:
if roster_counts["Replacement"] < roster_counts["Reserve"]:
rarity_param = ("rarity", 0)
else:
rarity_param = ("rarity", 1)
r_draw = await db_get(
"players/random",
params=[("pos_include", pos), rarity_param, ("limit", 2)],
none_okay=False,
)
team_outfielders.extend(r_draw["players"])
update_roster_counts(team_outfielders)
await db_post(
"cards",
payload={
"cards": [
{
"player_id": x["player_id"],
"team_id": team["id"],
"pack_id": this_pack["id"],
}
for x in team_outfielders
]
},
timeout=10,
)
async with op_ch.typing():
done_anc = await display_cards(
[{"player": x, "team": team} for x in anchor_players],
team,
op_ch,
interaction.user,
self.bot,
cust_message=f"Let's take a look at your three {team_choice} anchor players.\n"
f"Press `Close Pack` to continue.",
add_roster=False,
)
error_text = f"Yikes - I can't display the rest of your team. {get_cal_user(interaction).mention} plz halp"
if not done_anc:
await op_ch.send(error_text)
async with op_ch.typing():
done_sp = await display_cards(
[{"player": x, "team": team} for x in team_sp],
team,
op_ch,
interaction.user,
self.bot,
cust_message=f"Here are your starting pitchers.\n"
f"Press `Close Pack` to continue.",
add_roster=False,
)
if not done_sp:
await op_ch.send(error_text)
async with op_ch.typing():
done_rp = await display_cards(
[{"player": x, "team": team} for x in team_rp],
team,
op_ch,
interaction.user,
self.bot,
cust_message=f"And now for your bullpen.\n"
f"Press `Close Pack` to continue.",
add_roster=False,
)
if not done_rp:
await op_ch.send(error_text)
async with op_ch.typing():
done_inf = await display_cards(
[{"player": x, "team": team} for x in team_infielders],
team,
op_ch,
interaction.user,
self.bot,
cust_message=f"Next let's take a look at your infielders.\n"
f"Press `Close Pack` to continue.",
add_roster=False,
)
if not done_inf:
await op_ch.send(error_text)
async with op_ch.typing():
done_out = await display_cards(
[{"player": x, "team": team} for x in team_outfielders],
team,
op_ch,
interaction.user,
self.bot,
cust_message=f"Now let's take a look at your outfielders.\n"
f"Press `Close Pack` to continue.",
add_roster=False,
)
if not done_out:
await op_ch.send(error_text)
await give_packs(team, 1)
await op_ch.send(
f"To get you started, I've spotted you 100₼ and a pack of cards. You can rip that with the "
f"`/open` command once your google sheet is set up!"
)
await op_ch.send(
f"{t_role.mention}\n\n"
f"There's your roster! We have one more step and you will be ready to play.\n\n{SHEET_SHARE_STEPS}\n\n"
f"{get_roster_sheet({'gsheet': current['gsheet_template']})}"
)
new_team_embed = await team_summary_embed(
team, interaction, include_roster=False
)
await send_to_channel(
self.bot,
"pd-network-news",
content="A new challenger approaches...",
embed=new_team_embed,
)
@commands.hybrid_command(
name="newsheet", help="Link a new team sheet with your team"
)
@commands.has_any_role(PD_PLAYERS)
async def share_sheet_command(
self,
ctx,
google_sheet_url: str,
team_abbrev: Optional[str],
copy_rosters: Optional[bool] = True,
):
owner_team = await get_team_by_owner(get_context_user(ctx).id)
if not owner_team:
await ctx.send(
f"I don't see a team for you, yet. You can sign up with the `/newteam` command!"
)
return
team = owner_team
if team_abbrev and team_abbrev != owner_team["abbrev"]:
if get_context_user(ctx).id != 258104532423147520:
await ctx.send(
f"You can only update the team sheet for your own team, you goober."
)
return
else:
team = await get_team_by_abbrev(team_abbrev)
current = await db_get("current")
if current["gsheet_template"] in google_sheet_url:
await ctx.send(
f"Ope, looks like that is the template sheet. Would you please make a copy and then share?"
)
return
gauntlet_team = await get_team_by_abbrev(f"Gauntlet-{owner_team['abbrev']}")
if gauntlet_team:
view = ButtonOptions(
[ctx.author],
timeout=30,
labels=["Main Team", "Gauntlet Team", None, None, None],
)
question = await ctx.send(
f"Is this sheet for your main PD team or your active Gauntlet team?",
view=view,
)
await view.wait()
if not view.value:
await question.edit(
content=f"Okay you keep thinking on it and get back to me when you're ready.",
view=None,
)
return
elif view.value == "Gauntlet Team":
await question.delete()
team = gauntlet_team
sheets = get_sheets(self.bot)
response = await ctx.send(f"I'll go grab that sheet...")
try:
new_sheet = sheets.open_by_url(google_sheet_url)
except Exception as e:
logger.error(f"Error accessing {team['abbrev']} sheet: {e}")
current = await db_get("current")
await ctx.send(
f"I wasn't able to access that sheet. Did you remember to share it with my PD email?"
f"\n\nHere's a quick refresher:\n{SHEET_SHARE_STEPS}\n\n"
f"{get_roster_sheet({'gsheet': current['gsheet_template']})}"
)
return
team_data = new_sheet.worksheet_by_title("Team Data")
if not gauntlet_team or owner_team != gauntlet_team:
team_data.update_values(
crange="B1:B2", values=[[f"{team['id']}"], [f"{team_hash(team)}"]]
)
if copy_rosters and team["gsheet"].lower() != "none":
old_sheet = sheets.open_by_key(team["gsheet"])
r_sheet = old_sheet.worksheet_by_title(f"My Rosters")
roster_ids = r_sheet.range("B3:B80")
lineups_data = r_sheet.range("H4:M26")
new_r_data, new_l_data = [], []
for row in roster_ids:
if row[0].value != "":
new_r_data.append([int(row[0].value)])
else:
new_r_data.append([None])
logger.debug(f"new_r_data: {new_r_data}")
for row in lineups_data:
logger.debug(f"row: {row}")
new_l_data.append(
[
row[0].value if row[0].value != "" else None,
int(row[1].value) if row[1].value != "" else None,
row[2].value if row[2].value != "" else None,
int(row[3].value) if row[3].value != "" else None,
row[4].value if row[4].value != "" else None,
int(row[5].value) if row[5].value != "" else None,
]
)
logger.debug(f"new_l_data: {new_l_data}")
new_r_sheet = new_sheet.worksheet_by_title(f"My Rosters")
new_r_sheet.update_values(crange="B3:B80", values=new_r_data)
new_r_sheet.update_values(crange="H4:M26", values=new_l_data)
if team["has_guide"]:
post_ratings_guide(team, self.bot, this_sheet=new_sheet)
team = await db_patch(
"teams", object_id=team["id"], params=[("gsheet", new_sheet.id)]
)
await refresh_sheet(team, self.bot, sheets)
conf_message = f"Alright, your sheet is linked to your team - good luck"
if owner_team == team:
conf_message += " this season!"
else:
conf_message += " on your run!"
conf_message += f"\n\n{HELP_SHEET_SCRIPTS}"
await response.edit(content=f"{conf_message}")
async def setup(bot):
"""Setup function for the TeamSetup cog."""
await bot.add_cog(TeamSetup(bot))