diff --git a/cogs/compare.py b/cogs/compare.py new file mode 100644 index 0000000..e663443 --- /dev/null +++ b/cogs/compare.py @@ -0,0 +1,425 @@ +""" +Compare cog — /compare slash command. + +Displays a side-by-side stat embed for two cards of the same type (batter +vs batter, pitcher vs pitcher) with directional delta arrows. + +Card stats are derived from the battingcards / pitchingcards API endpoints +which carry the actual card data (running, steal range, pitcher ratings, etc.) +alongside the player's rarity and cost. + +Batter stats shown: + Cost (Overall proxy), Rarity, Running, Steal Low, Steal High, Bunting, + Hit & Run + +Pitcher stats shown: + Cost (Overall proxy), Rarity, Starter Rating, Relief Rating, + Closer Rating, Balk, Wild Pitch + +Arrow semantics: + ▲ card2 is higher (better for ↑-better stats) + ▼ card1 is higher (better for ↑-better stats) + ═ tied +""" + +import logging +from typing import List, Optional, Tuple + +import discord +from discord import app_commands +from discord.ext import commands + +from api_calls import db_get +from constants import PD_PLAYERS_ROLE_NAME +from utilities.autocomplete import player_autocomplete + +logger = logging.getLogger("discord_app") + +# ----- helpers ---------------------------------------------------------------- + +GRADE_ORDER = ["A", "B", "C", "D", "E", "F"] + + +def _grade_to_int(grade: Optional[str]) -> Optional[int]: + """Convert a letter grade (A-F) to a numeric rank for comparison. + + Lower rank = better grade. Returns None when grade is None/empty. + """ + if grade is None: + return None + upper = grade.upper().strip() + try: + return GRADE_ORDER.index(upper) + except ValueError: + return None + + +def _delta_arrow( + val1, + val2, + higher_is_better: bool = True, + grade_field: bool = False, +) -> str: + """Return a directional arrow showing which card has the better value. + + Args: + val1: stat value for card1 + val2: stat value for card2 + higher_is_better: when True, a larger numeric value is preferred. + When False (e.g. balk, wild_pitch), a smaller value is preferred. + grade_field: when True, val1/val2 are letter grades (A-F) where A > B. + + Returns: + '▲' if card2 wins, '▼' if card1 wins, '═' if tied / not comparable. + """ + if val1 is None or val2 is None: + return "═" + + if grade_field: + n1 = _grade_to_int(val1) + n2 = _grade_to_int(val2) + if n1 is None or n2 is None or n1 == n2: + return "═" + # Lower index = better grade; card2 wins when n2 < n1 + return "▲" if n2 < n1 else "▼" + + try: + n1 = float(val1) + n2 = float(val2) + except (TypeError, ValueError): + return "═" + + if n1 == n2: + return "═" + + if higher_is_better: + return "▲" if n2 > n1 else "▼" + else: + # lower is better (e.g. balk count) + return "▲" if n2 < n1 else "▼" + + +def _fmt(val) -> str: + """Format a stat value for display. Falls back to '—' when None.""" + if val is None: + return "—" + return str(val) + + +def _is_pitcher(player: dict) -> bool: + """Return True if the player is a pitcher (pos_1 in SP, RP).""" + return player.get("pos_1", "").upper() in ("SP", "RP") + + +def _card_type_label(player: dict) -> str: + return "pitcher" if _is_pitcher(player) else "batter" + + +# ----- embed builder (pure function, testable without Discord state) --------- + +_BATTER_STATS: List[Tuple[str, str, str, bool]] = [ + # (label, key_in_card, key_in_player, higher_is_better) + ("Cost (Overall)", "cost", "player", True), + ("Rarity", "rarity_value", "player", True), + ("Running", "running", "battingcard", True), + ("Steal Low", "steal_low", "battingcard", True), + ("Steal High", "steal_high", "battingcard", True), + ("Bunting", "bunting", "battingcard", False), # grade: A>B>C... + ("Hit & Run", "hit_and_run", "battingcard", False), # grade +] + +_PITCHER_STATS: List[Tuple[str, str, str, bool]] = [ + ("Cost (Overall)", "cost", "player", True), + ("Rarity", "rarity_value", "player", True), + ("Starter Rating", "starter_rating", "pitchingcard", True), + ("Relief Rating", "relief_rating", "pitchingcard", True), + ("Closer Rating", "closer_rating", "pitchingcard", True), + ("Balk", "balk", "pitchingcard", False), # lower is better + ("Wild Pitch", "wild_pitch", "pitchingcard", False), # lower is better +] + +_GRADE_FIELDS = {"bunting", "hit_and_run"} + + +class CompareMismatchError(ValueError): + """Raised when two cards are not of the same type.""" + + +def build_compare_embed( + card1: dict, + card2: dict, + card1_name: str, + card2_name: str, +) -> discord.Embed: + """Build a side-by-side comparison embed for two cards. + + Args: + card1: card data dict (player + battingcard OR pitchingcard). + Expects 'player', 'battingcard' or 'pitchingcard' keys. + card2: same shape as card1 + card1_name: display name override (falls back to player p_name) + card2_name: display name override + + Returns: + discord.Embed with inline stat rows + + Raises: + CompareMismatchError: if card types differ (batter vs pitcher) + """ + p1 = card1.get("player", {}) + p2 = card2.get("player", {}) + + type1 = _card_type_label(p1) + type2 = _card_type_label(p2) + + if type1 != type2: + raise CompareMismatchError( + f"Card types differ: '{card1_name}' is a {type1}, " + f"'{card2_name}' is a {type2}." + ) + + color_hex = p1.get("rarity", {}).get("color", "3498DB") + try: + color = int(color_hex, 16) + except (TypeError, ValueError): + color = 0x3498DB + + # Embed header + embed = discord.Embed( + title="Card Comparison", + description=( + f"**{card1_name}** vs **{card2_name}** — " + f"{'Pitchers' if type1 == 'pitcher' else 'Batters'}" + ), + color=color, + ) + + # Thumbnail from card1 headshot if available + thumbnail = p1.get("headshot") or p2.get("headshot") + if thumbnail: + embed.set_thumbnail(url=thumbnail) + + # Card name headers (inline, side-by-side feel) + embed.add_field(name="Card 1", value=f"**{card1_name}**", inline=True) + embed.add_field(name="Stat", value="\u200b", inline=True) + embed.add_field(name="Card 2", value=f"**{card2_name}**", inline=True) + + # Choose stat spec + stats = _PITCHER_STATS if type1 == "pitcher" else _BATTER_STATS + + for label, key, source, higher_is_better in stats: + # Extract values + if source == "player": + if key == "rarity_value": + v1 = p1.get("rarity", {}).get("value") + v2 = p2.get("rarity", {}).get("value") + # Display as rarity name + value + display1 = p1.get("rarity", {}).get("name", _fmt(v1)) + display2 = p2.get("rarity", {}).get("name", _fmt(v2)) + else: + v1 = p1.get(key) + v2 = p2.get(key) + display1 = _fmt(v1) + display2 = _fmt(v2) + elif source == "battingcard": + bc1 = card1.get("battingcard", {}) or {} + bc2 = card2.get("battingcard", {}) or {} + v1 = bc1.get(key) + v2 = bc2.get(key) + display1 = _fmt(v1) + display2 = _fmt(v2) + elif source == "pitchingcard": + pc1 = card1.get("pitchingcard", {}) or {} + pc2 = card2.get("pitchingcard", {}) or {} + v1 = pc1.get(key) + v2 = pc2.get(key) + display1 = _fmt(v1) + display2 = _fmt(v2) + else: + continue + + is_grade = key in _GRADE_FIELDS + arrow = _delta_arrow( + v1, + v2, + higher_is_better=higher_is_better, + grade_field=is_grade, + ) + + embed.add_field(name="\u200b", value=display1, inline=True) + embed.add_field(name=label, value=arrow, inline=True) + embed.add_field(name="\u200b", value=display2, inline=True) + + embed.set_footer(text="Paper Dynasty — /compare") + return embed + + +# ----- card fetch helpers ----------------------------------------------------- + + +async def _fetch_player_by_name(name: str) -> Optional[dict]: + """Search for a player by name and return the first match.""" + result = await db_get( + "players/search", + params=[("q", name), ("limit", 1)], + timeout=5, + ) + if not result or not result.get("players"): + return None + return result["players"][0] + + +async def _fetch_batting_card(player_id: int) -> Optional[dict]: + """Fetch the variant-0 batting card for a player.""" + result = await db_get( + "battingcards", + params=[("player_id", player_id), ("variant", 0)], + timeout=5, + ) + if not result or not result.get("cards"): + # Fall back to any variant + result = await db_get( + "battingcards", + params=[("player_id", player_id)], + timeout=5, + ) + if not result or not result.get("cards"): + return None + return result["cards"][0] + + +async def _fetch_pitching_card(player_id: int) -> Optional[dict]: + """Fetch the variant-0 pitching card for a player.""" + result = await db_get( + "pitchingcards", + params=[("player_id", player_id), ("variant", 0)], + timeout=5, + ) + if not result or not result.get("cards"): + result = await db_get( + "pitchingcards", + params=[("player_id", player_id)], + timeout=5, + ) + if not result or not result.get("cards"): + return None + return result["cards"][0] + + +async def _build_card_data(player: dict) -> dict: + """Build a unified card data dict for use with build_compare_embed. + + Returns a dict with 'player', 'battingcard', and 'pitchingcard' keys. + """ + pid = player.get("player_id") or player.get("id") + batting_card = None + pitching_card = None + + if _is_pitcher(player): + pitching_card = await _fetch_pitching_card(pid) + else: + batting_card = await _fetch_batting_card(pid) + + return { + "player": player, + "battingcard": batting_card, + "pitchingcard": pitching_card, + } + + +# ----- Cog -------------------------------------------------------------------- + + +class CompareCog(commands.Cog, name="Compare"): + """Slash command cog providing /compare for side-by-side card comparison.""" + + def __init__(self, bot: commands.Bot): + self.bot = bot + + @app_commands.command( + name="compare", + description="Side-by-side stat comparison for two cards", + ) + @app_commands.checks.has_any_role(PD_PLAYERS_ROLE_NAME) + @app_commands.describe( + card1="First player's card (type a name to search)", + card2="Second player's card (type a name to search)", + ) + @app_commands.autocomplete(card1=player_autocomplete, card2=player_autocomplete) + async def compare_command( + self, + interaction: discord.Interaction, + card1: str, + card2: str, + ): + """Compare two cards side-by-side. + + Fetches both players by name, validates that they are the same card + type (both batters or both pitchers), then builds and sends the + comparison embed. + """ + await interaction.response.defer() + + # --- fetch player 1 --------------------------------------------------- + player1 = await _fetch_player_by_name(card1) + if not player1: + await interaction.edit_original_response( + content=f"Could not find a card for **{card1}**." + ) + return + + # --- fetch player 2 --------------------------------------------------- + player2 = await _fetch_player_by_name(card2) + if not player2: + await interaction.edit_original_response( + content=f"Could not find a card for **{card2}**." + ) + return + + # --- type-gate -------------------------------------------------------- + type1 = _card_type_label(player1) + type2 = _card_type_label(player2) + if type1 != type2: + await interaction.edit_original_response( + content=( + "Can only compare cards of the same type " + "(batter vs batter, pitcher vs pitcher)." + ), + ) + return + + # --- build card data -------------------------------------------------- + card_data1 = await _build_card_data(player1) + card_data2 = await _build_card_data(player2) + + name1 = player1.get("p_name", card1) + name2 = player2.get("p_name", card2) + + # --- build embed ------------------------------------------------------ + try: + embed = build_compare_embed(card_data1, card_data2, name1, name2) + except CompareMismatchError as exc: + logger.warning("CompareMismatchError (should not reach here): %s", exc) + await interaction.edit_original_response( + content=( + "Can only compare cards of the same type " + "(batter vs batter, pitcher vs pitcher)." + ), + ) + return + except Exception as exc: + logger.error( + "compare_command build_compare_embed error: %s", exc, exc_info=True + ) + await interaction.edit_original_response( + content="Something went wrong building the comparison. Please contact Cal." + ) + return + + # Send publicly so players can share the result + await interaction.edit_original_response(embed=embed) + + +async def setup(bot: commands.Bot) -> None: + """Discord.py cog loader entry point.""" + await bot.add_cog(CompareCog(bot)) diff --git a/gauntlets.py b/gauntlets.py index d617bd6..4ceeeb1 100644 --- a/gauntlets.py +++ b/gauntlets.py @@ -2393,6 +2393,156 @@ async def evolve_pokemon(this_team: Team, channel, responders): await channel.send("All of your Pokemon are fully evolved!") +def build_gauntlet_recap_embed( + this_run: dict, + this_event: dict, + main_team: dict, + rewards: list[dict], +) -> discord.Embed: + """Build a Discord embed summarising a completed gauntlet run. + + Called when a player finishes a gauntlet (10 wins). This is a pure + synchronous builder so it can be unit-tested without a Discord connection. + + Args: + this_run: gauntletruns API dict (must have wins/losses/team keys). + this_event: events API dict (name, url, short_desc). + main_team: teams API dict for the player's real team (gmid, lname, logo). + rewards: list of gauntletrewards API dicts for the event. + + Returns: + A discord.Embed with champion highlight, run record, and prize table. + """ + # Gold/champion accent colour + GOLD = 0xFFD700 + + team_name = main_team.get("lname", "Unknown Team") + gmid = main_team.get("gmid") + wins = this_run.get("wins", 0) + losses = this_run.get("losses", 0) + event_name = this_event.get("name", "Gauntlet") + + embed = discord.Embed( + title=f"Gauntlet Complete: {event_name}", + color=GOLD, + ) + + # Champion highlight + champion_value = f"**{team_name}**" + if gmid: + champion_value += f"\n<@{gmid}>" + if main_team.get("logo"): + embed.set_thumbnail(url=main_team["logo"]) + embed.add_field(name="Champion", value=champion_value, inline=True) + + # Run record + embed.add_field( + name="Final Record", + value=f"**{wins}-{losses}**", + inline=True, + ) + + # Bracket / progression — for the solo gauntlet format this is a + # milestone ladder (win-by-win), not a bracket tree. + if wins > 0: + bracket_lines = [] + milestone_wins = sorted( + {r["win_num"] for r in rewards if r.get("win_num") is not None} + ) + for mw in milestone_wins: + marker = "✅" if wins >= mw else "⬜" + bracket_lines.append(f"{marker} Win {mw}") + if bracket_lines: + embed.add_field( + name="Progression", + value="\n".join(bracket_lines), + inline=False, + ) + + # Prize distribution table + if rewards: + prize_lines = [] + for r in sorted(rewards, key=lambda x: x.get("win_num", 0)): + win_num = r.get("win_num", "?") + loss_max = r.get("loss_max") + label = f"{win_num}-0" if loss_max == 0 else f"{win_num} Wins" + + earned = ( + wins >= win_num and losses <= loss_max + if loss_max is not None + else wins >= win_num + ) + marker = ( + "✅" + if earned + else "❌" + if losses > (loss_max if loss_max is not None else 99) + else "⬜" + ) + + reward = r.get("reward", {}) + if reward.get("money"): + prize_desc = f"{reward['money']}₼" + elif reward.get("player"): + prize_desc = reward["player"].get("description", "Special Card") + elif reward.get("pack_type"): + prize_desc = f"1x {reward['pack_type']['name']} Pack" + else: + prize_desc = "Reward" + + prize_lines.append(f"{marker} {label}: {prize_desc}") + + if prize_lines: + embed.add_field( + name="Prize Distribution", + value="\n".join(prize_lines), + inline=False, + ) + + embed.set_footer(text="Paper Dynasty Gauntlet") + return embed + + +async def post_gauntlet_recap( + this_run: dict, + this_event: dict, + main_team: dict, + channel, +) -> None: + """Send a gauntlet completion recap embed to the given channel. + + Fetches all rewards for the event so the prize table is complete, then + builds and posts the recap embed. Gracefully handles a missing or + unavailable channel by logging and returning without raising so the + gauntlet completion flow is never interrupted. + + Args: + this_run: gauntletruns API dict. + this_event: events API dict. + main_team: player's main teams API dict. + channel: discord.TextChannel to post to, or None. + """ + if channel is None: + logger.warning( + "post_gauntlet_recap: no channel available — recap skipped " + f"(run_id={this_run.get('id')})" + ) + return + try: + all_rewards_query = await db_get( + "gauntletrewards", params=[("gauntlet_id", this_event["id"])] + ) + all_rewards = all_rewards_query.get("rewards", []) + embed = build_gauntlet_recap_embed(this_run, this_event, main_team, all_rewards) + await channel.send(embed=embed) + except Exception: + logger.warning( + "post_gauntlet_recap: failed to send recap embed " + f"(run_id={this_run.get('id')})", + exc_info=True, + ) + + async def post_result( run_id: int, is_win: bool, @@ -2544,6 +2694,10 @@ async def post_result( final_message += f"\n\nGo share the highlights in {get_channel(channel, 'pd-news-ticker').mention}!" await channel.send(content=final_message, embed=await get_embed(this_run)) + + # Post gauntlet completion recap embed when the run is finished (10 wins) + if this_run["wins"] == 10: + await post_gauntlet_recap(this_run, this_event, main_team, channel) else: # this_run = await db_patch( # 'gauntletruns', diff --git a/in_game/gameplay_queries.py b/in_game/gameplay_queries.py index 789aebf..0836f26 100644 --- a/in_game/gameplay_queries.py +++ b/in_game/gameplay_queries.py @@ -5,13 +5,51 @@ from typing import Literal import pydantic from sqlalchemy import func +from sqlalchemy.exc import NoResultFound from api_calls import db_get, db_post from sqlmodel import col, update -from in_game.gameplay_models import CACHE_LIMIT, BatterScouting, BatterScoutingBase, BattingCard, BattingCardBase, BattingRatings, BattingRatingsBase, Card, CardBase, Cardset, CardsetBase, GameCardsetLink, Lineup, PitcherScouting, PitchingCard, PitchingCardBase, PitchingRatings, PitchingRatingsBase, Player, PlayerBase, PositionRating, PositionRatingBase, RosterLink, Session, Team, TeamBase, select, or_, Game, Play -from exceptions import DatabaseError, PositionNotFoundException, log_errors, log_exception, PlayNotFoundException +from in_game.gameplay_models import ( + CACHE_LIMIT, + BatterScouting, + BatterScoutingBase, + BattingCard, + BattingCardBase, + BattingRatings, + BattingRatingsBase, + Card, + CardBase, + Cardset, + CardsetBase, + GameCardsetLink, + Lineup, + PitcherScouting, + PitchingCard, + PitchingCardBase, + PitchingRatings, + PitchingRatingsBase, + Player, + PlayerBase, + PositionRating, + PositionRatingBase, + RosterLink, + Session, + Team, + TeamBase, + select, + or_, + Game, + Play, +) +from exceptions import ( + DatabaseError, + PositionNotFoundException, + log_errors, + log_exception, + PlayNotFoundException, +) -logger = logging.getLogger('discord_app') +logger = logging.getLogger("discord_app") class DecisionModel(pydantic.BaseModel): @@ -34,20 +72,28 @@ class DecisionModel(pydantic.BaseModel): def get_batting_team(session: Session, this_play: Play) -> Team: - return this_play.game.away_team if this_play.inning_half == 'top' else this_play.game.home_team + return ( + this_play.game.away_team + if this_play.inning_half == "top" + else this_play.game.home_team + ) def get_games_by_channel(session: Session, channel_id: int) -> list[Game]: - logger.info(f'Getting games in channel {channel_id}') - return session.exec(select(Game).where(Game.channel_id == channel_id, Game.active)).all() + logger.info(f"Getting games in channel {channel_id}") + return session.exec( + select(Game).where(Game.channel_id == channel_id, Game.active) + ).all() def get_channel_game_or_none(session: Session, channel_id: int) -> Game | None: - logger.info(f'Getting one game from channel {channel_id}') + logger.info(f"Getting one game from channel {channel_id}") all_games = get_games_by_channel(session, channel_id) if len(all_games) > 1: - err = 'Too many games found in get_channel_game_or_none' - logger.error(f'cogs.gameplay - get_channel_game_or_none - channel_id: {channel_id} / {err}') + err = "Too many games found in get_channel_game_or_none" + logger.error( + f"cogs.gameplay - get_channel_game_or_none - channel_id: {channel_id} / {err}" + ) raise Exception(err) elif len(all_games) == 0: return None @@ -55,224 +101,298 @@ def get_channel_game_or_none(session: Session, channel_id: int) -> Game | None: def get_active_games_by_team(session: Session, team: Team) -> list[Game]: - logger.info(f'Getting game for team {team.lname}') - return session.exec(select(Game).where(Game.active, or_(Game.away_team_id == team.id, Game.home_team_id == team.id))).all() + logger.info(f"Getting game for team {team.lname}") + return session.exec( + select(Game).where( + Game.active, or_(Game.away_team_id == team.id, Game.home_team_id == team.id) + ) + ).all() async def get_team_or_none( - session: Session, team_id: int | None = None, gm_id: int | None = None, team_abbrev: str | None = None, skip_cache: bool = False, main_team: bool = None, gauntlet_team: bool = None, include_packs: bool = False) -> Team | None: - logger.info(f'Getting team or none / team_id: {team_id} / gm_id: {gm_id} / team_abbrev: {team_abbrev} / skip_cache: {skip_cache} / main_team: {main_team} / gauntlet_team: {gauntlet_team}') - + session: Session, + team_id: int | None = None, + gm_id: int | None = None, + team_abbrev: str | None = None, + skip_cache: bool = False, + main_team: bool = None, + gauntlet_team: bool = None, + include_packs: bool = False, +) -> Team | None: + logger.info( + f"Getting team or none / team_id: {team_id} / gm_id: {gm_id} / team_abbrev: {team_abbrev} / skip_cache: {skip_cache} / main_team: {main_team} / gauntlet_team: {gauntlet_team}" + ) + this_team = None if gm_id is not None: if main_team is None and gauntlet_team is None: main_team = True gauntlet_team = False elif main_team == gauntlet_team: - log_exception(KeyError, 'Must select either main_team or gauntlet_team') - - logger.info(f'main_team: {main_team} / gauntlet_team: {gauntlet_team}') + log_exception(KeyError, "Must select either main_team or gauntlet_team") + + logger.info(f"main_team: {main_team} / gauntlet_team: {gauntlet_team}") if team_id is None and gm_id is None and team_abbrev is None: - log_exception(KeyError, 'One of "team_id", "gm_id", or "team_abbrev" must be included in search') - + log_exception( + KeyError, + 'One of "team_id", "gm_id", or "team_abbrev" must be included in search', + ) + if not skip_cache: if team_id is not None: - logger.info(f'Getting team by team_id: {team_id}') + logger.info(f"Getting team by team_id: {team_id}") this_team = session.get(Team, team_id) else: if gm_id is not None: - logger.info(f'Getting team by gm_id: {gm_id}') + logger.info(f"Getting team by gm_id: {gm_id}") for team in session.exec(select(Team).where(Team.gmid == gm_id)).all(): - if ('gauntlet' in team.abbrev.lower() and gauntlet_team) or ('gauntlet' not in team.abbrev.lower() and main_team): - logger.info(f'Found the team: {team}') + if ("gauntlet" in team.abbrev.lower() and gauntlet_team) or ( + "gauntlet" not in team.abbrev.lower() and main_team + ): + logger.info(f"Found the team: {team}") this_team = team break - logger.info(f'post loop, this_team: {this_team}') + logger.info(f"post loop, this_team: {this_team}") else: - logger.info(f'Getting team by abbrev: {team_abbrev}') - this_team = session.exec(select(Team).where(func.lower(Team.abbrev) == team_abbrev.lower())).one_or_none() - + logger.info(f"Getting team by abbrev: {team_abbrev}") + this_team = session.exec( + select(Team).where(func.lower(Team.abbrev) == team_abbrev.lower()) + ).one_or_none() + if this_team is not None: - logger.info(f'we found a team: {this_team} / created: {this_team.created}') + logger.info(f"we found a team: {this_team} / created: {this_team.created}") tdelta = datetime.datetime.now() - this_team.created - logger.info(f'tdelta: {tdelta}') + logger.info(f"tdelta: {tdelta}") if tdelta.total_seconds() < CACHE_LIMIT: return this_team # else: # session.delete(this_team) # session.commit() - + def cache_team(json_data: dict) -> Team: - logger.info(f'gameplay_queries - cache_team - writing a team to cache: {json_data}') + logger.info( + f"gameplay_queries - cache_team - writing a team to cache: {json_data}" + ) valid_team = TeamBase.model_validate(json_data, from_attributes=True) - logger.info(f'gameplay_queries - cache_team - valid_team: {valid_team}') + logger.info(f"gameplay_queries - cache_team - valid_team: {valid_team}") db_team = Team.model_validate(valid_team) - logger.info(f'gameplay_queries - cache_team - db_team: {db_team}') - logger.info(f'Checking for existing team ID: {db_team.id}') + logger.info(f"gameplay_queries - cache_team - db_team: {db_team}") + logger.info(f"Checking for existing team ID: {db_team.id}") try: this_team = session.exec(select(Team).where(Team.id == db_team.id)).one() - logger.info(f'Found team: {this_team}\nUpdating with db team: {db_team}') - + logger.info(f"Found team: {this_team}\nUpdating with db team: {db_team}") + for key, value in db_team.model_dump(exclude_unset=True).items(): - logger.info(f'Setting key ({key}) to value ({value})') + logger.info(f"Setting key ({key}) to value ({value})") setattr(this_team, key, value) - logger.info(f'Set this_team to db_team') + logger.info(f"Set this_team to db_team") session.add(this_team) session.commit() - logger.info(f'Refreshing this_team') + logger.info(f"Refreshing this_team") session.refresh(this_team) return this_team - except Exception: - logger.info(f'Team not found, adding to db') + except NoResultFound: + logger.info(f"Team not found, adding to db") session.add(db_team) session.commit() session.refresh(db_team) return db_team - + if team_id is not None: - t_query = await db_get('teams', object_id=team_id, params=[('inc_packs', include_packs)]) + t_query = await db_get( + "teams", object_id=team_id, params=[("inc_packs", include_packs)] + ) if t_query is not None: return cache_team(t_query) - + elif gm_id is not None: - t_query = await db_get('teams', params=[('gm_id', gm_id), ('inc_packs', include_packs)]) - if t_query['count'] != 0: - for team in t_query['teams']: - logger.info(f'in t_query loop / team: {team} / gauntlet_team: {gauntlet_team} / main_team: {main_team}') - if (gauntlet_team and 'gauntlet' in team['abbrev'].lower()) or (main_team and 'gauntlet' not in team['abbrev'].lower()): + t_query = await db_get( + "teams", params=[("gm_id", gm_id), ("inc_packs", include_packs)] + ) + if t_query["count"] != 0: + for team in t_query["teams"]: + logger.info( + f"in t_query loop / team: {team} / gauntlet_team: {gauntlet_team} / main_team: {main_team}" + ) + if (gauntlet_team and "gauntlet" in team["abbrev"].lower()) or ( + main_team and "gauntlet" not in team["abbrev"].lower() + ): return cache_team(team) - + elif team_abbrev is not None: - t_query = await db_get('teams', params=[('abbrev', team_abbrev), ('inc_packs', include_packs)]) - if t_query['count'] != 0: - if 'gauntlet' in team_abbrev.lower(): - return cache_team(t_query['teams'][0]) - - for team in [x for x in t_query['teams'] if 'gauntlet' not in x['abbrev'].lower()]: + t_query = await db_get( + "teams", params=[("abbrev", team_abbrev), ("inc_packs", include_packs)] + ) + if t_query["count"] != 0: + if "gauntlet" in team_abbrev.lower(): + return cache_team(t_query["teams"][0]) + + for team in [ + x for x in t_query["teams"] if "gauntlet" not in x["abbrev"].lower() + ]: return cache_team(team) - - logger.warning(f'No team found') + + logger.warning(f"No team found") return None -async def get_cardset_or_none(session: Session, cardset_id: int = None, cardset_name: str = None): - logger.info(f'Getting cardset or none / cardset_id: {cardset_id} / cardset_name: {cardset_name}') +async def get_cardset_or_none( + session: Session, cardset_id: int = None, cardset_name: str = None +): + logger.info( + f"Getting cardset or none / cardset_id: {cardset_id} / cardset_name: {cardset_name}" + ) if cardset_id is None and cardset_name is None: - log_exception(KeyError, 'One of "cardset_id" or "cardset_name" must be included in search') - + log_exception( + KeyError, 'One of "cardset_id" or "cardset_name" must be included in search' + ) + if cardset_id is not None: - logger.info(f'Getting cardset by id: {cardset_id}') + logger.info(f"Getting cardset by id: {cardset_id}") this_cardset = session.get(Cardset, cardset_id) else: - logger.info(f'Getting cardset by name: {cardset_name}') - this_cardset = session.exec(select(Cardset).where(func.lower(Cardset.name) == cardset_name.lower())).one_or_none() - + logger.info(f"Getting cardset by name: {cardset_name}") + this_cardset = session.exec( + select(Cardset).where(func.lower(Cardset.name) == cardset_name.lower()) + ).one_or_none() + if this_cardset is not None: - logger.info(f'we found a cardset: {this_cardset}') + logger.info(f"we found a cardset: {this_cardset}") return this_cardset - + def cache_cardset(json_data: dict) -> Cardset: - logger.info(f'gameplay_queries - cache_team - writing a team to cache: {json_data}') + logger.info( + f"gameplay_queries - cache_team - writing a team to cache: {json_data}" + ) valid_cardset = CardsetBase.model_validate(json_data, from_attributes=True) - logger.info(f'gameplay_queries - cache_team - valid_cardset: {valid_cardset}') + logger.info(f"gameplay_queries - cache_team - valid_cardset: {valid_cardset}") db_cardset = Cardset.model_validate(valid_cardset) - logger.info(f'gameplay_queries - cache_team - db_cardset: {db_cardset}') + logger.info(f"gameplay_queries - cache_team - db_cardset: {db_cardset}") session.add(db_cardset) session.commit() session.refresh(db_cardset) return db_cardset - + if cardset_id is not None: - c_query = await db_get('cardsets', object_id=cardset_id) + c_query = await db_get("cardsets", object_id=cardset_id) if c_query is not None: return cache_cardset(c_query) - + elif cardset_name is not None: - c_query = await db_get('cardsets', params=[('name', cardset_name)]) - if c_query['count'] != 0: - return cache_cardset(c_query['cardsets'][0]) - - logger.warning(f'No cardset found') + c_query = await db_get("cardsets", params=[("name", cardset_name)]) + if c_query["count"] != 0: + return cache_cardset(c_query["cardsets"][0]) + + logger.warning(f"No cardset found") return None -async def get_player_or_none(session: Session, player_id: int, skip_cache: bool = False) -> Player | None: - logger.info(f'gameplay_models - get_player_or_none - player_id: {player_id} / skip_cache: {skip_cache}') +async def get_player_or_none( + session: Session, player_id: int, skip_cache: bool = False +) -> Player | None: + logger.info( + f"gameplay_models - get_player_or_none - player_id: {player_id} / skip_cache: {skip_cache}" + ) if not skip_cache: this_player = session.get(Player, player_id) if this_player is not None: - logger.info(f'we found a cached player: {this_player}\ncreated: {this_player.created}') + logger.info( + f"we found a cached player: {this_player}\ncreated: {this_player.created}" + ) tdelta = datetime.datetime.now() - this_player.created - logger.info(f'tdelta: {tdelta}') + logger.info(f"tdelta: {tdelta}") if tdelta.total_seconds() < CACHE_LIMIT: - logger.info(f'returning this player') + logger.info(f"returning this player") return this_player # else: # logger.warning('Deleting old player record') # session.delete(this_player) # session.commit() - + def cache_player(json_data: dict) -> Player: - logger.info(f'gameplay_models - get_player_or_none - cache_player - caching player data: {json_data}') + logger.info( + f"gameplay_models - get_player_or_none - cache_player - caching player data: {json_data}" + ) valid_player = PlayerBase.model_validate(json_data, from_attributes=True) db_player = Player.model_validate(valid_player) try: this_player = session.get(Player, player_id) - logger.info(f'Found player: {this_player}\nUpdating with db_player: {db_player}') - + logger.info( + f"Found player: {this_player}\nUpdating with db_player: {db_player}" + ) + for key, value in db_player.model_dump(exclude_unset=True).items(): - logger.info(f'Setting key ({key}) to value ({value})') + logger.info(f"Setting key ({key}) to value ({value})") setattr(this_player, key, value) - logger.info(f'Set this_player to db_player') + logger.info(f"Set this_player to db_player") session.add(this_player) session.commit() - logger.info(f'Refreshing this_player') + logger.info(f"Refreshing this_player") session.refresh(this_player) return this_player - except Exception: + except NoResultFound: session.add(db_player) session.commit() session.refresh(db_player) return db_player - - p_query = await db_get('players', object_id=player_id, params=[('inc_dex', False)]) + + p_query = await db_get("players", object_id=player_id, params=[("inc_dex", False)]) if p_query is not None: - if 'id' not in p_query: - p_query['id'] = p_query['player_id'] - if 'name' not in p_query: - p_query['name'] = p_query['p_name'] + if "id" not in p_query: + p_query["id"] = p_query["player_id"] + if "name" not in p_query: + p_query["name"] = p_query["p_name"] return cache_player(p_query) - + return None -async def get_batter_scouting_or_none(session: Session, card: Card, skip_cache: bool = False) -> BatterScouting | None: - logger.info(f'Getting batting scouting for card ID #{card.id}: {card.player.name_with_desc} / skip_cache: {skip_cache}') +async def get_batter_scouting_or_none( + session: Session, card: Card, skip_cache: bool = False +) -> BatterScouting | None: + logger.info( + f"Getting batting scouting for card ID #{card.id}: {card.player.name_with_desc} / skip_cache: {skip_cache}" + ) - s_query = await db_get(f'battingcardratings/player/{card.player.id}?variant={card.variant}', none_okay=False) - if s_query['count'] != 2: - log_exception(DatabaseError, f'Scouting for {card.player.name_with_desc} was not found.') + s_query = await db_get( + f"battingcardratings/player/{card.player.id}?variant={card.variant}", + none_okay=False, + ) + if s_query["count"] != 2: + log_exception( + DatabaseError, f"Scouting for {card.player.name_with_desc} was not found." + ) - bs_query = session.exec(select(BatterScouting).where(BatterScouting.battingcard_id == s_query['ratings'][0]['battingcard']['id'])).all() - logger.info(f'bs_query: {bs_query}') + bs_query = session.exec( + select(BatterScouting).where( + BatterScouting.battingcard_id == s_query["ratings"][0]["battingcard"]["id"] + ) + ).all() + logger.info(f"bs_query: {bs_query}") if len(bs_query) > 0: this_scouting = bs_query[0] - logger.info(f'this_scouting: {this_scouting}') - logger.info(f'we found a cached scouting: {this_scouting} / created {this_scouting.created}') + logger.info(f"this_scouting: {this_scouting}") + logger.info( + f"we found a cached scouting: {this_scouting} / created {this_scouting.created}" + ) tdelta = datetime.datetime.now() - this_scouting.created - logger.debug(f'tdelta: {tdelta}') - if tdelta.total_seconds() < CACHE_LIMIT and None not in [this_scouting.battingcard, this_scouting.ratings_vl, this_scouting.ratings_vr]: - logger.info(f'returning cached scouting') + logger.debug(f"tdelta: {tdelta}") + if tdelta.total_seconds() < CACHE_LIMIT and None not in [ + this_scouting.battingcard, + this_scouting.ratings_vl, + this_scouting.ratings_vr, + ]: + logger.info(f"returning cached scouting") return this_scouting else: - logger.info(f'Refreshing cache') + logger.info(f"Refreshing cache") # logger.info(f'deleting cached scouting') # session.delete(this_scouting.battingcard) # session.delete(this_scouting.ratings_vl) @@ -280,8 +400,10 @@ async def get_batter_scouting_or_none(session: Session, card: Card, skip_cache: # session.delete(this_scouting) # session.commit() - def cache_scouting(batting_card: dict, ratings_vr: dict, ratings_vl: dict) -> BatterScouting: - logger.info(f'Beginning batter scouting cache process') + def cache_scouting( + batting_card: dict, ratings_vr: dict, ratings_vl: dict + ) -> BatterScouting: + logger.info(f"Beginning batter scouting cache process") valid_bc = BattingCardBase.model_validate(batting_card, from_attributes=True) db_bc = BattingCard.model_validate(valid_bc) @@ -290,71 +412,81 @@ async def get_batter_scouting_or_none(session: Session, card: Card, skip_cache: valid_vr = BattingRatingsBase.model_validate(ratings_vr, from_attributes=True) db_vr = BattingRatings.model_validate(valid_vr) - logger.info(f'db_bc: {db_bc}\n\ndb_vl: {db_vl}\n\ndb_vr: {db_vr}') + logger.info(f"db_bc: {db_bc}\n\ndb_vl: {db_vl}\n\ndb_vr: {db_vr}") - logger.info(f'Checking for existing battingcard ID: {db_bc.id}') + logger.info(f"Checking for existing battingcard ID: {db_bc.id}") try: - this_card = session.exec(select(BattingCard).where(BattingCard.id == db_bc.id)).one() - logger.info(f'Found card: {this_card}\nUpdating with db card: {db_bc}') - + this_card = session.exec( + select(BattingCard).where(BattingCard.id == db_bc.id) + ).one() + logger.info(f"Found card: {this_card}\nUpdating with db card: {db_bc}") + for key, value in db_bc.model_dump(exclude_unset=True).items(): - logger.info(f'Setting key ({key}) to value ({value})') + logger.info(f"Setting key ({key}) to value ({value})") setattr(this_card, key, value) - logger.info(f'Set this_card to db_bc') + logger.info(f"Set this_card to db_bc") session.add(this_card) # session.commit() # logger.info(f'Refreshing this_card') # session.refresh(this_card) # return this_card - except Exception: - logger.info(f'Card not found, adding to db') + except NoResultFound: + logger.info(f"Card not found, adding to db") this_card = db_bc session.add(this_card) # session.commit() # session.refresh(db_card) # return db_card - logger.info(f'Checking for existing vl ratings ID: {db_vl.id}') + logger.info(f"Checking for existing vl ratings ID: {db_vl.id}") try: - this_vl_rating = session.exec(select(BattingRatings).where(BattingRatings.id == db_vl.id)).one() - logger.info(f'Found ratings: {this_vl_rating}\nUpdating with db ratings: {db_vl}') - + this_vl_rating = session.exec( + select(BattingRatings).where(BattingRatings.id == db_vl.id) + ).one() + logger.info( + f"Found ratings: {this_vl_rating}\nUpdating with db ratings: {db_vl}" + ) + for key, value in db_vl.model_dump(exclude_unset=True).items(): - logger.info(f'Setting key ({key}) to value ({value})') + logger.info(f"Setting key ({key}) to value ({value})") setattr(this_vl_rating, key, value) - logger.info(f'Set this_vr_rating to db_vl') + logger.info(f"Set this_vr_rating to db_vl") session.add(this_vl_rating) # session.commit() # logger.info(f'Refreshing this_card') # session.refresh(this_card) # return this_card - except Exception: - logger.info(f'Card not found, adding to db') + except NoResultFound: + logger.info(f"Card not found, adding to db") this_vl_rating = db_vl session.add(this_vl_rating) # session.commit() # session.refresh(db_card) # return db_card - logger.info(f'Checking for existing vr ratings ID: {db_vr.id}') + logger.info(f"Checking for existing vr ratings ID: {db_vr.id}") try: - this_vr_rating = session.exec(select(BattingRatings).where(BattingRatings.id == db_vr.id)).one() - logger.info(f'Found ratings: {this_vr_rating}\nUpdating with db ratings: {db_vr}') - + this_vr_rating = session.exec( + select(BattingRatings).where(BattingRatings.id == db_vr.id) + ).one() + logger.info( + f"Found ratings: {this_vr_rating}\nUpdating with db ratings: {db_vr}" + ) + for key, value in db_vr.model_dump(exclude_unset=True).items(): - logger.info(f'Setting key ({key}) to value ({value})') + logger.info(f"Setting key ({key}) to value ({value})") setattr(this_vr_rating, key, value) - logger.info(f'Set this_vr_rating to db_vl') + logger.info(f"Set this_vr_rating to db_vl") session.add(this_vr_rating) # session.commit() # logger.info(f'Refreshing this_card') # session.refresh(this_card) # return this_card - except Exception: - logger.info(f'Card not found, adding to db') + except NoResultFound: + logger.info(f"Card not found, adding to db") this_vr_rating = db_vr session.add(this_vr_rating) # session.commit() @@ -362,9 +494,7 @@ async def get_batter_scouting_or_none(session: Session, card: Card, skip_cache: # return db_card db_scouting = BatterScouting( - battingcard=this_card, - ratings_vl=this_vl_rating, - ratings_vr=this_vr_rating + battingcard=this_card, ratings_vl=this_vl_rating, ratings_vr=this_vr_rating ) # db_scouting = BatterScouting( @@ -372,53 +502,81 @@ async def get_batter_scouting_or_none(session: Session, card: Card, skip_cache: # ratings_vl=db_vl, # ratings_vr=db_vr # ) - + session.add(db_scouting) - logger.info(f'caching scouting') + logger.info(f"caching scouting") session.commit() session.refresh(db_scouting) - logger.info(f'scouting id: {db_scouting.id} / battingcard: {db_scouting.battingcard.id} / vL: {db_scouting.ratings_vl.id} / vR: {db_scouting.ratings_vr.id}') + logger.info( + f"scouting id: {db_scouting.id} / battingcard: {db_scouting.battingcard.id} / vL: {db_scouting.ratings_vl.id} / vR: {db_scouting.ratings_vr.id}" + ) return db_scouting - + return cache_scouting( - batting_card=s_query['ratings'][0]['battingcard'], - ratings_vr=s_query['ratings'][0] if s_query['ratings'][0]['vs_hand'] == 'R' else s_query['ratings'][1], - ratings_vl=s_query['ratings'][0] if s_query['ratings'][0]['vs_hand'] == 'L' else s_query['ratings'][1] + batting_card=s_query["ratings"][0]["battingcard"], + ratings_vr=s_query["ratings"][0] + if s_query["ratings"][0]["vs_hand"] == "R" + else s_query["ratings"][1], + ratings_vl=s_query["ratings"][0] + if s_query["ratings"][0]["vs_hand"] == "L" + else s_query["ratings"][1], ) -async def get_pitcher_scouting_or_none(session: Session, card: Card, skip_cache: bool = False) -> PitcherScouting | None: - logger.info(f'Getting pitching scouting for card ID #{card.id}: {card.player.name_with_desc}') - - s_query = await db_get(f'pitchingcardratings/player/{card.player.id}?variant={card.variant}', none_okay=False) - if s_query['count'] != 2: - log_exception(DatabaseError, f'Scouting for {card.player.name_with_desc} was not found.') +async def get_pitcher_scouting_or_none( + session: Session, card: Card, skip_cache: bool = False +) -> PitcherScouting | None: + logger.info( + f"Getting pitching scouting for card ID #{card.id}: {card.player.name_with_desc}" + ) - bs_query = session.exec(select(PitcherScouting).where(PitcherScouting.pitchingcard_id == s_query['ratings'][0]['pitchingcard']['id'])).all() - logger.info(f'bs_query: {bs_query}') + s_query = await db_get( + f"pitchingcardratings/player/{card.player.id}?variant={card.variant}", + none_okay=False, + ) + if s_query["count"] != 2: + log_exception( + DatabaseError, f"Scouting for {card.player.name_with_desc} was not found." + ) + + bs_query = session.exec( + select(PitcherScouting).where( + PitcherScouting.pitchingcard_id + == s_query["ratings"][0]["pitchingcard"]["id"] + ) + ).all() + logger.info(f"bs_query: {bs_query}") # this_scouting = session.get(PitcherScouting, s_query['ratings'][0]['pitchingcard']['id']) if len(bs_query) > 0: this_scouting = bs_query[0] - logger.info(f'we found a cached scouting: {this_scouting} / created {this_scouting.created}') + logger.info( + f"we found a cached scouting: {this_scouting} / created {this_scouting.created}" + ) tdelta = datetime.datetime.now() - this_scouting.created - logger.debug(f'tdelta: {tdelta}') + logger.debug(f"tdelta: {tdelta}") - if tdelta.total_seconds() < CACHE_LIMIT and None not in [this_scouting.pitchingcard, this_scouting.ratings_vl, this_scouting.ratings_vr]: - logger.info(f'returning cached scouting') + if tdelta.total_seconds() < CACHE_LIMIT and None not in [ + this_scouting.pitchingcard, + this_scouting.ratings_vl, + this_scouting.ratings_vr, + ]: + logger.info(f"returning cached scouting") return this_scouting - + else: - logger.info(f'Refreshing cache') + logger.info(f"Refreshing cache") # logger.info(f'deleting cached scouting') # session.delete(this_scouting.pitchingcard) # session.delete(this_scouting.ratings_vl) # session.delete(this_scouting.ratings_vr) # session.delete(this_scouting) # session.commit() - - def cache_scouting(pitching_card: dict, ratings_vr: dict, ratings_vl: dict) -> PitcherScouting: - logger.info(f'Beginning pitcher scouting cache process') + + def cache_scouting( + pitching_card: dict, ratings_vr: dict, ratings_vl: dict + ) -> PitcherScouting: + logger.info(f"Beginning pitcher scouting cache process") valid_bc = PitchingCardBase.model_validate(pitching_card, from_attributes=True) db_bc = PitchingCard.model_validate(valid_bc) @@ -427,71 +585,81 @@ async def get_pitcher_scouting_or_none(session: Session, card: Card, skip_cache: valid_vr = PitchingRatingsBase.model_validate(ratings_vr, from_attributes=True) db_vr = PitchingRatings.model_validate(valid_vr) - logger.info(f'db_bc: {db_bc}\n\ndb_vl: {db_vl}\n\ndb_vr: {db_vr}') + logger.info(f"db_bc: {db_bc}\n\ndb_vl: {db_vl}\n\ndb_vr: {db_vr}") - logger.info(f'Checking for existing battingcard ID: {db_bc.id}') + logger.info(f"Checking for existing battingcard ID: {db_bc.id}") try: - this_card = session.exec(select(PitchingCard).where(PitchingCard.id == db_bc.id)).one() - logger.info(f'Found card: {this_card}\nUpdating with db card: {db_bc}') - + this_card = session.exec( + select(PitchingCard).where(PitchingCard.id == db_bc.id) + ).one() + logger.info(f"Found card: {this_card}\nUpdating with db card: {db_bc}") + for key, value in db_bc.model_dump(exclude_unset=True).items(): - logger.info(f'Setting key ({key}) to value ({value})') + logger.info(f"Setting key ({key}) to value ({value})") setattr(this_card, key, value) - logger.info(f'Set this_card to db_bc') + logger.info(f"Set this_card to db_bc") session.add(this_card) # session.commit() # logger.info(f'Refreshing this_card') # session.refresh(this_card) # return this_card - except Exception: - logger.info(f'Card not found, adding to db') + except NoResultFound: + logger.info(f"Card not found, adding to db") this_card = db_bc session.add(this_card) # session.commit() # session.refresh(db_card) # return db_card - logger.info(f'Checking for existing vl ratings ID: {db_vl.id}') + logger.info(f"Checking for existing vl ratings ID: {db_vl.id}") try: - this_vl_rating = session.exec(select(PitchingRatings).where(PitchingRatings.id == db_vl.id)).one() - logger.info(f'Found ratings: {this_vl_rating}\nUpdating with db ratings: {db_vl}') - + this_vl_rating = session.exec( + select(PitchingRatings).where(PitchingRatings.id == db_vl.id) + ).one() + logger.info( + f"Found ratings: {this_vl_rating}\nUpdating with db ratings: {db_vl}" + ) + for key, value in db_vl.model_dump(exclude_unset=True).items(): - logger.info(f'Setting key ({key}) to value ({value})') + logger.info(f"Setting key ({key}) to value ({value})") setattr(this_vl_rating, key, value) - logger.info(f'Set this_vr_rating to db_vl') + logger.info(f"Set this_vr_rating to db_vl") session.add(this_vl_rating) # session.commit() # logger.info(f'Refreshing this_card') # session.refresh(this_card) # return this_card - except Exception: - logger.info(f'Card not found, adding to db') + except NoResultFound: + logger.info(f"Card not found, adding to db") this_vl_rating = db_vl session.add(this_vl_rating) # session.commit() # session.refresh(db_card) # return db_card - logger.info(f'Checking for existing vr ratings ID: {db_vr.id}') + logger.info(f"Checking for existing vr ratings ID: {db_vr.id}") try: - this_vr_rating = session.exec(select(PitchingRatings).where(PitchingRatings.id == db_vr.id)).one() - logger.info(f'Found ratings: {this_vr_rating}\nUpdating with db ratings: {db_vr}') - + this_vr_rating = session.exec( + select(PitchingRatings).where(PitchingRatings.id == db_vr.id) + ).one() + logger.info( + f"Found ratings: {this_vr_rating}\nUpdating with db ratings: {db_vr}" + ) + for key, value in db_vr.model_dump(exclude_unset=True).items(): - logger.info(f'Setting key ({key}) to value ({value})') + logger.info(f"Setting key ({key}) to value ({value})") setattr(this_vr_rating, key, value) - logger.info(f'Set this_vr_rating to db_vl') + logger.info(f"Set this_vr_rating to db_vl") session.add(this_vr_rating) # session.commit() # logger.info(f'Refreshing this_card') # session.refresh(this_card) # return this_card - except Exception: - logger.info(f'Card not found, adding to db') + except NoResultFound: + logger.info(f"Card not found, adding to db") this_vr_rating = db_vr session.add(this_vr_rating) # session.commit() @@ -499,87 +667,111 @@ async def get_pitcher_scouting_or_none(session: Session, card: Card, skip_cache: # return db_card db_scouting = PitcherScouting( - pitchingcard=this_card, - ratings_vl=this_vl_rating, - ratings_vr=this_vr_rating + pitchingcard=this_card, ratings_vl=this_vl_rating, ratings_vr=this_vr_rating ) - + session.add(db_scouting) - logger.info(f'caching scouting') + logger.info(f"caching scouting") session.commit() session.refresh(db_scouting) - logger.info(f'scouting id: {db_scouting.id} / pitching: {db_scouting.pitchingcard.id} / vL: {db_scouting.ratings_vl.id} / vR: {db_scouting.ratings_vr.id}') + logger.info( + f"scouting id: {db_scouting.id} / pitching: {db_scouting.pitchingcard.id} / vL: {db_scouting.ratings_vl.id} / vR: {db_scouting.ratings_vr.id}" + ) return db_scouting scouting = cache_scouting( - pitching_card=s_query['ratings'][0]['pitchingcard'], - ratings_vr=s_query['ratings'][0] if s_query['ratings'][0]['vs_hand'] == 'R' else s_query['ratings'][1], - ratings_vl=s_query['ratings'][0] if s_query['ratings'][0]['vs_hand'] == 'L' else s_query['ratings'][1] + pitching_card=s_query["ratings"][0]["pitchingcard"], + ratings_vr=s_query["ratings"][0] + if s_query["ratings"][0]["vs_hand"] == "R" + else s_query["ratings"][1], + ratings_vl=s_query["ratings"][0] + if s_query["ratings"][0]["vs_hand"] == "L" + else s_query["ratings"][1], ) - pos_rating = await get_position(session, card, 'P') + pos_rating = await get_position(session, card, "P") return scouting def get_player_id_from_dict(json_data: dict) -> int: - logger.info(f'Getting player from dict {json_data}') - if 'player_id' in json_data: - return json_data['player_id'] - elif 'id' in json_data: - return json_data['id'] - log_exception(KeyError, 'Player ID could not be extracted from json data') + logger.info(f"Getting player from dict {json_data}") + if "player_id" in json_data: + return json_data["player_id"] + elif "id" in json_data: + return json_data["id"] + log_exception(KeyError, "Player ID could not be extracted from json data") def get_player_name_from_dict(json_data: dict) -> str: - logger.info(f'Getting player from dict {json_data}') - if 'name' in json_data: - return json_data['name'] - elif 'p_name' in json_data: - return json_data['p_name'] - log_exception(KeyError, 'Player name could not be extracted from json data') + logger.info(f"Getting player from dict {json_data}") + if "name" in json_data: + return json_data["name"] + elif "p_name" in json_data: + return json_data["p_name"] + log_exception(KeyError, "Player name could not be extracted from json data") -async def shared_get_scouting(session: Session, this_card: Card, which: Literal['batter', 'pitcher']): - if which == 'batter': - logger.info(f'Pulling batter scouting for {this_card.player.name_with_desc}') +async def shared_get_scouting( + session: Session, this_card: Card, which: Literal["batter", "pitcher"] +): + if which == "batter": + logger.info(f"Pulling batter scouting for {this_card.player.name_with_desc}") this_scouting = await get_batter_scouting_or_none(session, this_card) else: - logger.info(f'Pulling pitcher scouting for {this_card.player.name_with_desc}') + logger.info(f"Pulling pitcher scouting for {this_card.player.name_with_desc}") this_scouting = await get_pitcher_scouting_or_none(session, this_card) - logger.info(f'this_scouting: {this_scouting}') + logger.info(f"this_scouting: {this_scouting}") return this_scouting -async def get_position(session: Session, this_card: Card, position: Literal['P', 'C', '1B', '2B', '3B', 'SS', 'LF', 'CF', 'RF'], skip_cache: bool = False) -> PositionRating: - logger.info(f'Pulling position rating for {this_card.player.name_with_desc} at {position} / skip_cache: {skip_cache}') +async def get_position( + session: Session, + this_card: Card, + position: Literal["P", "C", "1B", "2B", "3B", "SS", "LF", "CF", "RF"], + skip_cache: bool = False, +) -> PositionRating: + logger.info( + f"Pulling position rating for {this_card.player.name_with_desc} at {position} / skip_cache: {skip_cache}" + ) if not skip_cache: - this_pos = session.exec(select(PositionRating).where(PositionRating.player_id == this_card.player.id, PositionRating.position == position, PositionRating.variant == this_card.variant)).all() - logger.info(f'Ratings found: {len(this_pos)}') + this_pos = session.exec( + select(PositionRating).where( + PositionRating.player_id == this_card.player.id, + PositionRating.position == position, + PositionRating.variant == this_card.variant, + ) + ).all() + logger.info(f"Ratings found: {len(this_pos)}") if len(this_pos) > 0: - logger.info(f'we found a cached position rating: {this_pos[0]} / created: {this_pos[0].created}') + logger.info( + f"we found a cached position rating: {this_pos[0]} / created: {this_pos[0].created}" + ) tdelta = datetime.datetime.now() - this_pos[0].created - logger.debug(f'tdelta: {tdelta}') + logger.debug(f"tdelta: {tdelta}") if tdelta.total_seconds() < CACHE_LIMIT: return this_pos[0] else: session.delete(this_pos[0]) session.commit() - + def cache_pos(json_data: dict) -> PositionRating: - if 'id' in json_data: - del json_data['id'] + if "id" in json_data: + del json_data["id"] valid_pos = PositionRatingBase.model_validate(json_data, from_attributes=True) db_pos = PositionRating.model_validate(valid_pos) session.add(db_pos) session.commit() session.refresh(db_pos) return db_pos - - p_query = await db_get('cardpositions', params=[('player_id', this_card.player.id), ('position', position)]) - if p_query['count'] > 0: - json_data = p_query['positions'][0] - json_data['player_id'] = get_player_id_from_dict(json_data['player']) + + p_query = await db_get( + "cardpositions", + params=[("player_id", this_card.player.id), ("position", position)], + ) + if p_query["count"] > 0: + json_data = p_query["positions"][0] + json_data["player_id"] = get_player_id_from_dict(json_data["player"]) this_pos = cache_pos(json_data) session.add(this_pos) @@ -587,86 +779,119 @@ async def get_position(session: Session, this_card: Card, position: Literal['P', session.refresh(this_pos) return this_pos - - log_exception(PositionNotFoundException, f'{position} ratings not found for {this_card.player.name_with_desc}') + + log_exception( + PositionNotFoundException, + f"{position} ratings not found for {this_card.player.name_with_desc}", + ) -async def get_all_positions(session: Session, this_card: Card, skip_cache: bool = False) -> int: - logger.info(f'Pulling all position ratings for {this_card.player.name_with_desc} / skip_cache: {skip_cache}') +async def get_all_positions( + session: Session, this_card: Card, skip_cache: bool = False +) -> int: + logger.info( + f"Pulling all position ratings for {this_card.player.name_with_desc} / skip_cache: {skip_cache}" + ) if not skip_cache: - all_pos = session.exec(select(PositionRating).where(PositionRating.player_id == this_card.player.id, PositionRating.variant == this_card.variant)).all() - logger.info(f'Ratings found: {len(all_pos)}') + all_pos = session.exec( + select(PositionRating).where( + PositionRating.player_id == this_card.player.id, + PositionRating.variant == this_card.variant, + ) + ).all() + logger.info(f"Ratings found: {len(all_pos)}") should_repull = False for position in all_pos: - logger.info(f'we found a cached position rating: {position} / created: {position.created}') + logger.info( + f"we found a cached position rating: {position} / created: {position.created}" + ) tdelta = datetime.datetime.now() - position.created - logger.debug(f'tdelta: {tdelta}') + logger.debug(f"tdelta: {tdelta}") if tdelta.total_seconds() >= CACHE_LIMIT or datetime.datetime.now().day < 5: session.delete(position) session.commit() should_repull = True - + if not should_repull and len(all_pos) > 0: - logger.info(f'Returning {len(all_pos)}') + logger.info(f"Returning {len(all_pos)}") return len(all_pos) - p_query = await db_get('cardpositions', params=[('player_id', this_card.player.id)]) + p_query = await db_get("cardpositions", params=[("player_id", this_card.player.id)]) - if not p_query or p_query['count'] == 0: - logger.info(f'No positions received, returning 0') + if not p_query or p_query["count"] == 0: + logger.info(f"No positions received, returning 0") return 0 - - old_pos = session.exec(select(PositionRating).where(PositionRating.player_id == this_card.player_id)).all() + + old_pos = session.exec( + select(PositionRating).where(PositionRating.player_id == this_card.player_id) + ).all() for position in old_pos: - logger.info(f'Deleting orphaned position rating: {position}') + logger.info(f"Deleting orphaned position rating: {position}") session.delete(position) - + session.commit() - + def cache_pos(json_data: dict) -> PositionRating: - if 'id' in json_data: - del json_data['id'] + if "id" in json_data: + del json_data["id"] valid_pos = PositionRatingBase.model_validate(json_data, from_attributes=True) db_pos = PositionRating.model_validate(valid_pos) session.add(db_pos) session.commit() session.refresh(db_pos) return db_pos - + added_count = 0 - for json_data in p_query['positions']: - logger.info(f'Processing: {json_data}') - json_data['player_id'] = get_player_id_from_dict(json_data['player']) + for json_data in p_query["positions"]: + logger.info(f"Processing: {json_data}") + json_data["player_id"] = get_player_id_from_dict(json_data["player"]) this_pos = cache_pos(json_data) session.add(this_pos) added_count += 1 - + return added_count -async def get_or_create_ai_card(session: Session, player: Player, team: Team, skip_cache: bool = False, dev_mode: bool = False) -> Card: - logger.info(f'Getting or creating card for {player.name_with_desc} on the {team.sname} / skip_cache: {skip_cache}') +async def get_or_create_ai_card( + session: Session, + player: Player, + team: Team, + skip_cache: bool = False, + dev_mode: bool = False, +) -> Card: + logger.info( + f"Getting or creating card for {player.name_with_desc} on the {team.sname} / skip_cache: {skip_cache}" + ) if not team.is_ai: - err = f'Cannot create AI cards for human teams' - logger.error(f'gameplay_models - get_or_create_ai_card: {err}') + err = f"Cannot create AI cards for human teams" + logger.error(f"gameplay_models - get_or_create_ai_card: {err}") raise TypeError(err) - - logger.info(f'gameplay_models - get_or_create_ai_card - player.id: {player.id} / team.id: {team.id}') + + logger.info( + f"gameplay_models - get_or_create_ai_card - player.id: {player.id} / team.id: {team.id}" + ) if not skip_cache: - c_query = session.exec(select(Card).where(Card.player == player, Card.team == team)).all() + c_query = session.exec( + select(Card).where(Card.player == player, Card.team == team) + ).all() if len(c_query) > 0: this_card = c_query[0] - logger.info(f'we found a cached card: {this_card} / created: {this_card.created}') + logger.info( + f"we found a cached card: {this_card} / created: {this_card.created}" + ) tdelta = datetime.datetime.now() - this_card.created - logger.debug(f'tdelta: {tdelta}') - if tdelta.total_seconds() < CACHE_LIMIT and (this_card.pitcherscouting is not None or this_card.batterscouting is not None): - logger.info(f'returning this_card') + logger.debug(f"tdelta: {tdelta}") + if tdelta.total_seconds() < CACHE_LIMIT and ( + this_card.pitcherscouting is not None + or this_card.batterscouting is not None + ): + logger.info(f"returning this_card") return this_card # else: # logger.info(f'deleting card record') @@ -674,58 +899,73 @@ async def get_or_create_ai_card(session: Session, player: Player, team: Team, sk # session.commit() async def pull_card(p: Player, t: Team): - c_query = await db_get('cards', params=[('team_id', t.id), ('player_id', p.id)]) - if c_query['count'] > 0: - json_data = c_query['cards'][0] - logger.info(f'gameplay_models - get_or_create_ai_card - pull_card - caching json_data: {json_data}') - json_data['team_id'] = json_data['team']['id'] - json_data['player_id'] = get_player_id_from_dict(json_data['player']) - valid_card = CardBase.model_validate(c_query['cards'][0], from_attributes=True) + c_query = await db_get("cards", params=[("team_id", t.id), ("player_id", p.id)]) + if c_query["count"] > 0: + json_data = c_query["cards"][0] + logger.info( + f"gameplay_models - get_or_create_ai_card - pull_card - caching json_data: {json_data}" + ) + json_data["team_id"] = json_data["team"]["id"] + json_data["player_id"] = get_player_id_from_dict(json_data["player"]) + valid_card = CardBase.model_validate( + c_query["cards"][0], from_attributes=True + ) db_card = Card.model_validate(valid_card) - logger.info(f'gameplay_queries - cache_team - db_card: {db_card}') - logger.info(f'Checking for existing card ID: {db_card.id}') + logger.info(f"gameplay_queries - cache_team - db_card: {db_card}") + logger.info(f"Checking for existing card ID: {db_card.id}") try: - this_card = session.exec(select(Card).where(Card.id == db_card.id)).one() - logger.info(f'Found card: {this_card}\nUpdating with db card: {db_card}') - + this_card = session.exec( + select(Card).where(Card.id == db_card.id) + ).one() + logger.info( + f"Found card: {this_card}\nUpdating with db card: {db_card}" + ) + for key, value in db_card.model_dump(exclude_unset=True).items(): - logger.info(f'Setting key ({key}) to value ({value})') + logger.info(f"Setting key ({key}) to value ({value})") setattr(this_card, key, value) - logger.info(f'Set this_card to db_card') + logger.info(f"Set this_card to db_card") session.add(this_card) session.commit() - logger.info(f'Refreshing this_card') + logger.info(f"Refreshing this_card") session.refresh(this_card) return this_card - except Exception: - logger.info(f'Card not found, adding to db') + except NoResultFound: + logger.info(f"Card not found, adding to db") session.add(db_card) session.commit() session.refresh(db_card) return db_card else: return None - + this_card = await pull_card(player, team) if this_card is not None: - if player.pos_1 not in ['SP', 'RP']: - this_card.batterscouting = await shared_get_scouting(session, this_card, 'batter') + if player.pos_1 not in ["SP", "RP"]: + this_card.batterscouting = await shared_get_scouting( + session, this_card, "batter" + ) else: - this_card.pitcherscouting = await shared_get_scouting(session, this_card, 'pitcher') - + this_card.pitcherscouting = await shared_get_scouting( + session, this_card, "pitcher" + ) + session.add(this_card) session.commit() session.refresh(this_card) return this_card - logger.info(f'gameplay_models - get_or_create_ai_card: creating {player.description} {player.name} card for {team.abbrev}') + logger.info( + f"gameplay_models - get_or_create_ai_card: creating {player.description} {player.name} card for {team.abbrev}" + ) if dev_mode: # Find next available ID since Card model has autoincrement=False from sqlmodel import func + max_id = session.exec(select(func.max(Card.id))).one() next_id = (max_id or 0) + 1 this_card = Card(id=next_id, player=player, team=team) @@ -735,42 +975,51 @@ async def get_or_create_ai_card(session: Session, player: Player, team: Team, sk return this_card await db_post( - 'cards', - payload={'cards': [ - {'player_id': player.id, 'team_id': team.id, 'pack_id': 1} - ]} + "cards", + payload={"cards": [{"player_id": player.id, "team_id": team.id, "pack_id": 1}]}, ) - + this_card = await pull_card(player, team) if this_card is not None: - if player.pos_1 not in ['SP', 'RP']: - this_card.batterscouting = await shared_get_scouting(session, this_card, 'batter') + if player.pos_1 not in ["SP", "RP"]: + this_card.batterscouting = await shared_get_scouting( + session, this_card, "batter" + ) else: - this_card.pitcherscouting = await shared_get_scouting(session, this_card, 'pitcher') - + this_card.pitcherscouting = await shared_get_scouting( + session, this_card, "pitcher" + ) + session.add(this_card) session.commit() session.refresh(this_card) return this_card - - err = f'Could not create {player.name} card for {team.abbrev}' - logger.error(f'gameplay_models - get_or_create_ai_card - {err}') + + err = f"Could not create {player.name} card for {team.abbrev}" + logger.error(f"gameplay_models - get_or_create_ai_card - {err}") raise LookupError(err) @log_errors -async def get_card_or_none(session: Session, card_id: int, skip_cache: bool = False) -> Card | None: - logger.info(f'Getting card {card_id} / skip_cache: {skip_cache}') +async def get_card_or_none( + session: Session, card_id: int, skip_cache: bool = False +) -> Card | None: + logger.info(f"Getting card {card_id} / skip_cache: {skip_cache}") if not skip_cache: this_card = session.get(Card, card_id) if this_card is not None: - logger.info(f'we found a cached card: {this_card} / created: {this_card.created}') + logger.info( + f"we found a cached card: {this_card} / created: {this_card.created}" + ) tdelta = datetime.datetime.now() - this_card.created - logger.debug(f'tdelta: {tdelta}') - if tdelta.total_seconds() < CACHE_LIMIT and (this_card.pitcherscouting is not None or this_card.batterscouting is not None): - logger.info(f'returning this_card') + logger.debug(f"tdelta: {tdelta}") + if tdelta.total_seconds() < CACHE_LIMIT and ( + this_card.pitcherscouting is not None + or this_card.batterscouting is not None + ): + logger.info(f"returning this_card") return this_card # else: # logger.info(f'deleting this_card') @@ -778,7 +1027,7 @@ async def get_card_or_none(session: Session, card_id: int, skip_cache: bool = Fa # session.delete(this_card.batterscouting) # except Exception as e: # logger.error(f'Could not delete batter scouting: {e}') - + # try: # session.delete(this_card.pitcherscouting) # except Exception as e: @@ -786,120 +1035,183 @@ async def get_card_or_none(session: Session, card_id: int, skip_cache: bool = Fa # session.delete(this_card) # session.commit() - + def cache_card(json_data: dict) -> Card: valid_card = CardBase.model_validate(json_data, from_attributes=True) db_card = Card.model_validate(valid_card) - logger.info(f'gameplay_queries - cache_team - db_card: {db_card}') - logger.info(f'Checking for existing card ID: {db_card.id}') + logger.info(f"gameplay_queries - cache_team - db_card: {db_card}") + logger.info(f"Checking for existing card ID: {db_card.id}") try: this_card = session.exec(select(Card).where(Card.id == db_card.id)).one() - logger.info(f'Found card: {this_card}\nUpdating with db card: {db_card}') - + logger.info(f"Found card: {this_card}\nUpdating with db card: {db_card}") + # this_team = db_team for key, value in db_card.model_dump(exclude_unset=True).items(): - logger.info(f'Setting key ({key}) to value ({value})') + logger.info(f"Setting key ({key}) to value ({value})") setattr(this_card, key, value) - logger.info(f'Set this_card to db_card') + logger.info(f"Set this_card to db_card") session.add(this_card) session.commit() - logger.info(f'Refreshing this_card') + logger.info(f"Refreshing this_card") session.refresh(this_card) return this_card - except Exception: - logger.info(f'Card not found, adding to db') + except NoResultFound: + logger.info(f"Card not found, adding to db") session.add(db_card) session.commit() session.refresh(db_card) return db_card - - c_query = await db_get('cards', object_id=card_id) + + c_query = await db_get("cards", object_id=card_id) if c_query is not None: - c_query['team_id'] = c_query['team']['id'] - c_query['player_id'] = get_player_id_from_dict(c_query['player']) - - this_player = await get_player_or_none(session, player_id=c_query['player_id']) - this_team = await get_team_or_none(session, team_id=c_query['team_id']) + c_query["team_id"] = c_query["team"]["id"] + c_query["player_id"] = get_player_id_from_dict(c_query["player"]) + + this_player = await get_player_or_none(session, player_id=c_query["player_id"]) + this_team = await get_team_or_none(session, team_id=c_query["team_id"]) if this_player is None: - raise LookupError(f'Player ID {c_query["player_id"]} not found during card check') + raise LookupError( + f"Player ID {c_query['player_id']} not found during card check" + ) if this_team is None: - raise LookupError(f'Team ID {c_query["team_id"]} not found during card check') + raise LookupError( + f"Team ID {c_query['team_id']} not found during card check" + ) - logger.info(f'Caching card ID {card_id} now') + logger.info(f"Caching card ID {card_id} now") this_card = cache_card(c_query) - - logger.info(f'Card is cached, checking for scouting') - all_pos = [x for x in [this_player.pos_1, this_player.pos_2, this_player.pos_3, this_player.pos_3, this_player.pos_4, this_player.pos_5, this_player.pos_6, this_player.pos_7, this_player.pos_8] if x is not None] - logger.info(f'All positions: {all_pos}') - if 'SP' in all_pos or 'RP' in all_pos: - logger.info(f'Pulling pitcher scouting') - this_card.pitcherscouting = await shared_get_scouting(session, this_card, 'pitcher') - if any(item in all_pos for item in ['DH', 'C', '1B', '2B', '3B', 'SS', 'LF', 'CF', 'RF']): - logger.info(f'Pulling batter scouting') - this_card.batterscouting = await shared_get_scouting(session, this_card, 'batter') - - logger.info(f'Updating this_card') + + logger.info(f"Card is cached, checking for scouting") + all_pos = [ + x + for x in [ + this_player.pos_1, + this_player.pos_2, + this_player.pos_3, + this_player.pos_3, + this_player.pos_4, + this_player.pos_5, + this_player.pos_6, + this_player.pos_7, + this_player.pos_8, + ] + if x is not None + ] + logger.info(f"All positions: {all_pos}") + if "SP" in all_pos or "RP" in all_pos: + logger.info(f"Pulling pitcher scouting") + this_card.pitcherscouting = await shared_get_scouting( + session, this_card, "pitcher" + ) + if any( + item in all_pos + for item in ["DH", "C", "1B", "2B", "3B", "SS", "LF", "CF", "RF"] + ): + logger.info(f"Pulling batter scouting") + this_card.batterscouting = await shared_get_scouting( + session, this_card, "batter" + ) + + logger.info(f"Updating this_card") session.add(this_card) session.commit() - logger.info(f'Refreshing this_card') + logger.info(f"Refreshing this_card") session.refresh(this_card) - logger.info(f'this_card: {this_card}') + logger.info(f"this_card: {this_card}") return this_card return None -def get_game_lineups(session: Session, this_game: Game, specific_team: Team = None, is_active: bool = None) -> list[Lineup]: - logger.info(f'Getting lineups for game {this_game.id} / specific_team: {specific_team} / is_active: {is_active}') +def get_game_lineups( + session: Session, + this_game: Game, + specific_team: Team = None, + is_active: bool = None, +) -> list[Lineup]: + logger.info( + f"Getting lineups for game {this_game.id} / specific_team: {specific_team} / is_active: {is_active}" + ) st = select(Lineup).where(Lineup.game == this_game) - + if specific_team is not None: st = st.where(Lineup.team == specific_team) if is_active is not None: st = st.where(Lineup.active == is_active) - + return session.exec(st).all() -def get_players_last_pa(session: Session, lineup_member: Lineup, none_okay: bool = False): - logger.info(f'Getting last AB for {lineup_member.player.name_with_desc} on the {lineup_member.team.lname}') - last_pa = session.exec(select(Play).where(Play.game == lineup_member.game, Play.batter == lineup_member).order_by(Play.play_num.desc()).limit(1)).all() +def get_players_last_pa( + session: Session, lineup_member: Lineup, none_okay: bool = False +): + logger.info( + f"Getting last AB for {lineup_member.player.name_with_desc} on the {lineup_member.team.lname}" + ) + last_pa = session.exec( + select(Play) + .where(Play.game == lineup_member.game, Play.batter == lineup_member) + .order_by(Play.play_num.desc()) + .limit(1) + ).all() if len(last_pa) == 1: return last_pa[0] else: if none_okay: return None else: - log_exception(PlayNotFoundException, f'No play found for {lineup_member.player.name_with_desc}\'s last AB') + log_exception( + PlayNotFoundException, + f"No play found for {lineup_member.player.name_with_desc}'s last AB", + ) -def get_one_lineup(session: Session, this_game: Game, this_team: Team, active: bool = True, position: str = None, batting_order: int = None) -> Lineup: - logger.info(f'Getting one lineup / this_game: {this_game.id} / this_team: {this_team.lname} / active: {active}, position: {position}, batting_order: {batting_order}') +def get_one_lineup( + session: Session, + this_game: Game, + this_team: Team, + active: bool = True, + position: str = None, + batting_order: int = None, +) -> Lineup: + logger.info( + f"Getting one lineup / this_game: {this_game.id} / this_team: {this_team.lname} / active: {active}, position: {position}, batting_order: {batting_order}" + ) if position is None and batting_order is None: - raise KeyError('Position or batting order must be provided for get_one_lineup') - - st = select(Lineup).where(Lineup.game == this_game, Lineup.team == this_team, Lineup.active == active) + raise KeyError("Position or batting order must be provided for get_one_lineup") + + st = select(Lineup).where( + Lineup.game == this_game, Lineup.team == this_team, Lineup.active == active + ) if position is not None: st = st.where(Lineup.position == position) else: st = st.where(Lineup.batting_order == batting_order) - - logger.info(f'get_one_lineup query: {st}') + + logger.info(f"get_one_lineup query: {st}") compiled = st.compile(compile_kwargs={"literal_binds": True}) - logger.info(f'get_one_lineup literal SQL: {compiled}') + logger.info(f"get_one_lineup literal SQL: {compiled}") this_lineup = session.exec(st).one() - logger.info(f'Found lineup: {this_lineup}') + logger.info(f"Found lineup: {this_lineup}") return this_lineup -def get_last_team_play(session: Session, this_game: Game, this_team: Team, none_okay: bool = False): - logger.info(f'Getting last play for the {this_team.lname} in game {this_game.id}') - last_play = session.exec(select(Play).join(Lineup, onclause=Lineup.id == Play.batter_id).where(Play.game == this_game, Lineup.team == this_team).order_by(Play.play_num.desc()).limit(1)).all() +def get_last_team_play( + session: Session, this_game: Game, this_team: Team, none_okay: bool = False +): + logger.info(f"Getting last play for the {this_team.lname} in game {this_game.id}") + last_play = session.exec( + select(Play) + .join(Lineup, onclause=Lineup.id == Play.batter_id) + .where(Play.game == this_game, Lineup.team == this_team) + .order_by(Play.play_num.desc()) + .limit(1) + ).all() if len(last_play) == 1: return last_play[0] @@ -907,50 +1219,78 @@ def get_last_team_play(session: Session, this_game: Game, this_team: Team, none_ if none_okay: return None else: - log_exception(PlayNotFoundException, f'No last play found for the {this_team.sname}') + log_exception( + PlayNotFoundException, f"No last play found for the {this_team.sname}" + ) -def get_sorted_lineups(session: Session, this_game: Game, this_team: Team) -> list[Lineup]: - logger.info(f'Getting sorted lineups for the {this_team.lname} in game {this_game.id}') - custom_order = {'P': 1, 'C': 2, '1B': 3, '2B': 4, '3B': 5, 'SS': 6, 'LF': 7, 'CF': 8, 'RF': 9} +def get_sorted_lineups( + session: Session, this_game: Game, this_team: Team +) -> list[Lineup]: + logger.info( + f"Getting sorted lineups for the {this_team.lname} in game {this_game.id}" + ) + custom_order = { + "P": 1, + "C": 2, + "1B": 3, + "2B": 4, + "3B": 5, + "SS": 6, + "LF": 7, + "CF": 8, + "RF": 9, + } - all_lineups = session.exec(select(Lineup).where(Lineup.game == this_game, Lineup.active == True, Lineup.team == this_team)).all() + all_lineups = session.exec( + select(Lineup).where( + Lineup.game == this_game, Lineup.active == True, Lineup.team == this_team + ) + ).all() - sorted_lineups = sorted(all_lineups, key=lambda x: custom_order.get(x.position, float('inf'))) + sorted_lineups = sorted( + all_lineups, key=lambda x: custom_order.get(x.position, float("inf")) + ) return sorted_lineups def get_db_ready_plays(session: Session, this_game: Game, db_game_id: int): - logger.info(f'Getting db ready plays for game {this_game.id}') - all_plays = session.exec(select(Play).where(Play.game == this_game).order_by(Play.play_num.desc())).all() + logger.info(f"Getting db ready plays for game {this_game.id}") + all_plays = session.exec( + select(Play).where(Play.game == this_game).order_by(Play.play_num.desc()) + ).all() - obc_list = ['000', '001', '010', '100', '011', '101', '110', '111'] + obc_list = ["000", "001", "010", "100", "011", "101", "110", "111"] return_plays = [] for play in all_plays: dump = play.model_dump() - dump['game_id'] = db_game_id - dump['on_base_code'] = obc_list[play.on_base_code] - dump['batter_id'] = play.batter.player.id - dump['pitcher_id'] = play.pitcher.player.id - dump['catcher_id'] = play.catcher.player.id - if 'runner_id' in dump and dump['runner_id'] is not None: - dump['runner_id'] = play.runner.player.id - if 'defender_id' in dump and dump['defender_id'] is not None: - dump['defender_id'] = play.defender.player.id - if 'on_first_id' in dump and dump['on_first_id'] is not None: - dump['on_first_id'] = play.on_first.player.id - if 'on_second_id' in dump and dump['on_second_id'] is not None: - dump['on_second_id'] = play.on_second.player.id - if 'on_third_id' in dump and dump['on_third_id'] is not None: - dump['on_third_id'] = play.on_third.player.id + dump["game_id"] = db_game_id + dump["on_base_code"] = obc_list[play.on_base_code] + dump["batter_id"] = play.batter.player.id + dump["pitcher_id"] = play.pitcher.player.id + dump["catcher_id"] = play.catcher.player.id + if "runner_id" in dump and dump["runner_id"] is not None: + dump["runner_id"] = play.runner.player.id + if "defender_id" in dump and dump["defender_id"] is not None: + dump["defender_id"] = play.defender.player.id + if "on_first_id" in dump and dump["on_first_id"] is not None: + dump["on_first_id"] = play.on_first.player.id + if "on_second_id" in dump and dump["on_second_id"] is not None: + dump["on_second_id"] = play.on_second.player.id + if "on_third_id" in dump and dump["on_third_id"] is not None: + dump["on_third_id"] = play.on_third.player.id return_plays.append(dump) - - return {'plays': return_plays} + + return {"plays": return_plays} -def get_db_ready_decisions(session: Session, this_game: Game, db_game_id: int) -> list[DecisionModel]: - logger.info(f'Game {this_game.id} | Getting db ready decisions for game {this_game.id}') +def get_db_ready_decisions( + session: Session, this_game: Game, db_game_id: int +) -> list[DecisionModel]: + logger.info( + f"Game {this_game.id} | Getting db ready decisions for game {this_game.id}" + ) save = None away_starter = None home_starter = None @@ -968,146 +1308,219 @@ def get_db_ready_decisions(session: Session, this_game: Game, db_game_id: int) - # { : DecisionModel } } - final_inning = session.exec(select(func.max(Play.inning_num)).where(Play.game == this_game)).one() - away_starter = session.exec(select(Lineup).where(Lineup.game == this_game, Lineup.team == this_game.away_team, Lineup.position == 'P', Lineup.after_play == 0)).one() + final_inning = session.exec( + select(func.max(Play.inning_num)).where(Play.game == this_game) + ).one() + away_starter = session.exec( + select(Lineup).where( + Lineup.game == this_game, + Lineup.team == this_game.away_team, + Lineup.position == "P", + Lineup.after_play == 0, + ) + ).one() away_pitcher = away_starter last_winner = None last_loser = None # Get starting pitchers and update this as a pointer for the play crawl - for play in session.exec(select(Play).where(Play.game == this_game).order_by(Play.play_num)).all(): - logger.info(f'Game {this_game.id} | Crawling play #{play.play_num}') + for play in session.exec( + select(Play).where(Play.game == this_game).order_by(Play.play_num) + ).all(): + logger.info(f"Game {this_game.id} | Crawling play #{play.play_num}") runs_scored = 0 - if play.inning_half == 'top': + if play.inning_half == "top": if home_starter is None: - logger.info(f'Game {this_game.id} | Setting home starter to {play.pitcher.player.name_with_desc} on play #{play.play_num}') + logger.info( + f"Game {this_game.id} | Setting home starter to {play.pitcher.player.name_with_desc} on play #{play.play_num}" + ) home_starter = play.pitcher if home_finisher is None: - logger.info(f'Game {this_game.id} | Setting home finisher to {play.pitcher.player.name_with_desc} on play #{play.play_num}') + logger.info( + f"Game {this_game.id} | Setting home finisher to {play.pitcher.player.name_with_desc} on play #{play.play_num}" + ) home_finisher = play.pitcher - + if home_pitcher != play.pitcher: - logger.info(f'Game {this_game.id} | Setting home pitcher to {play.pitcher.player.name_with_desc} on play #{play.play_num}') + logger.info( + f"Game {this_game.id} | Setting home pitcher to {play.pitcher.player.name_with_desc} on play #{play.play_num}" + ) home_pitcher = play.pitcher if save == play.pitcher: if play.home_score > play.away_score: if play.pitcher not in holds: - logger.info(f'Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to holds on play #{play.play_num}') + logger.info( + f"Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to holds on play #{play.play_num}" + ) holds.append(play.pitcher) else: if play.pitcher not in b_save: - logger.info(f'Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to blown saves on play #{play.play_num}') + logger.info( + f"Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to blown saves on play #{play.play_num}" + ) b_save.append(play.pitcher) - - elif play.home_score > play.away_score and play.home_score - play.away_score <= 3 and home_pitcher != home_starter and play.inning_num >= final_inning - 2: - logger.info(f'Game {this_game.id} | Setting {play.pitcher.player.name_with_desc} to save on play #{play.play_num}') + + elif ( + play.home_score > play.away_score + and play.home_score - play.away_score <= 3 + and home_pitcher != home_starter + and play.inning_num >= final_inning - 2 + ): + logger.info( + f"Game {this_game.id} | Setting {play.pitcher.player.name_with_desc} to save on play #{play.play_num}" + ) save = home_pitcher - - elif play.inning_half == 'bot': + + elif play.inning_half == "bot": if away_finisher is None: - logger.info(f'Game {this_game.id} | Setting away finisher to {play.pitcher.player.name_with_desc} on play #{play.play_num}') + logger.info( + f"Game {this_game.id} | Setting away finisher to {play.pitcher.player.name_with_desc} on play #{play.play_num}" + ) away_finisher = play.pitcher - + if away_pitcher != play.pitcher: - logger.info(f'Game {this_game.id} | Setting away pitcher to {play.pitcher.player.name_with_desc} on play #{play.play_num}') + logger.info( + f"Game {this_game.id} | Setting away pitcher to {play.pitcher.player.name_with_desc} on play #{play.play_num}" + ) away_pitcher = play.pitcher if save == play.pitcher: if play.away_score > play.home_score: if play.pitcher not in holds: - logger.info(f'Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to holds on play #{play.play_num}') + logger.info( + f"Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to holds on play #{play.play_num}" + ) holds.append(play.pitcher) else: if play.pitcher not in b_save: - logger.info(f'Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to blown saves on play #{play.play_num}') + logger.info( + f"Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to blown saves on play #{play.play_num}" + ) b_save.append(play.pitcher) - - elif play.away_score > play.home_score and play.away_score - play.home_score <= 3 and away_pitcher != away_starter and play.inning_num >= final_inning - 2: - logger.info(f'Game {this_game.id} | Setting {play.pitcher.player.name_with_desc} to save on play #{play.play_num}') + + elif ( + play.away_score > play.home_score + and play.away_score - play.home_score <= 3 + and away_pitcher != away_starter + and play.inning_num >= final_inning - 2 + ): + logger.info( + f"Game {this_game.id} | Setting {play.pitcher.player.name_with_desc} to save on play #{play.play_num}" + ) save = away_pitcher - + if play.is_go_ahead: run_diff = play.home_score - play.away_score - for x in [play.on_first_final, play.on_second_final, play.on_third_final, play.batter_final]: + for x in [ + play.on_first_final, + play.on_second_final, + play.on_third_final, + play.batter_final, + ]: runs_scored += 1 if x == 4 else 0 - if play.inning_half == 'top': + if play.inning_half == "top": run_diff -= runs_scored else: run_diff += runs_scored - logger.info(f'run_diff for go-ahead: {run_diff}') - logger.info(f'go-ahead play: {play}') + logger.info(f"run_diff for go-ahead: {run_diff}") + logger.info(f"go-ahead play: {play}") count = 1 - for runner, dest in [(play.on_third, play.on_third_final), (play.on_second, play.on_second_final), (play.on_first, play.on_first_final), (play.batter, play.batter_final)]: - logger.info(f'Game {this_game.id} | Looking for go-ahead runner / runner, dest: {runner}, {dest} / count: {count}') + for runner, dest in [ + (play.on_third, play.on_third_final), + (play.on_second, play.on_second_final), + (play.on_first, play.on_first_final), + (play.batter, play.batter_final), + ]: + logger.info( + f"Game {this_game.id} | Looking for go-ahead runner / runner, dest: {runner}, {dest} / count: {count}" + ) if dest == 4 and count == abs(run_diff): winning_play = get_players_last_pa(session, runner) loser = winning_play.pitcher - logger.info(f'Game {this_game.id} | Setting loser to {loser} on play #{play.play_num}') - + logger.info( + f"Game {this_game.id} | Setting loser to {loser} on play #{play.play_num}" + ) + if save == loser: - logger.info(f'Game {this_game.id} | Appending {loser} to blown saves on play #{play.play_num}') + logger.info( + f"Game {this_game.id} | Appending {loser} to blown saves on play #{play.play_num}" + ) b_save.append(loser) - - winner = home_pitcher if play.inning_half == 'bot' else away_pitcher - logger.info(f'Game {this_game.id} | Setting winner to {winner} on play #{play.play_num}') + + winner = home_pitcher if play.inning_half == "bot" else away_pitcher + logger.info( + f"Game {this_game.id} | Setting winner to {winner} on play #{play.play_num}" + ) break count += 1 - + if winner is None: - winner = home_pitcher if play.inning_half == 'bot' else away_pitcher - logger.info(f'Game {this_game.id} | Setting winner to {winner} by default on play #{play.play_num}') - + winner = home_pitcher if play.inning_half == "bot" else away_pitcher + logger.info( + f"Game {this_game.id} | Setting winner to {winner} by default on play #{play.play_num}" + ) + if loser is None: - logger.info(f'Game {this_game.id} | Setting loser to {play.pitcher} by default on play #{play.play_num}') + logger.info( + f"Game {this_game.id} | Setting loser to {play.pitcher} by default on play #{play.play_num}" + ) loser = play.pitcher - + if play.is_tied and runs_scored == 0: - logger.info(f'Game {this_game.id} | Clearing winner and loser on play #{play.play_num}') + logger.info( + f"Game {this_game.id} | Clearing winner and loser on play #{play.play_num}" + ) last_winner = winner last_loser = loser winner, loser = None, None if save is not None: - logger.info(f'Game {this_game.id} | Appending current save pitcher {save.player.name_with_desc} to blown saves and clearing save on play #{play.play_num}') + logger.info( + f"Game {this_game.id} | Appending current save pitcher {save.player.name_with_desc} to blown saves and clearing save on play #{play.play_num}" + ) b_save.append(save) save = None - + if play.pitcher.player_id not in decisions: - logger.info(f'Game {this_game.id} | Adding {play.pitcher.player.name} to decisions dict on play #{play.play_num}') + logger.info( + f"Game {this_game.id} | Adding {play.pitcher.player.name} to decisions dict on play #{play.play_num}" + ) decisions[play.pitcher.player_id] = DecisionModel( game_id=db_game_id, season=this_game.season, week=0 if this_game.week is None else this_game.week, pitcher_id=play.pitcher.player_id, - pitcher_team_id=play.pitcher.team_id + pitcher_team_id=play.pitcher.team_id, ) - + # After the play loop, determine winner/loser from final game state if still None - final_play = session.exec(select(Play).where(Play.game == this_game).order_by(Play.play_num.desc())).first() + final_play = session.exec( + select(Play).where(Play.game == this_game).order_by(Play.play_num.desc()) + ).first() if winner is None: if final_play.home_score > final_play.away_score: winner = home_finisher - logger.info(f'Setting winner to home_finisher: {winner}') + logger.info(f"Setting winner to home_finisher: {winner}") else: winner = away_finisher - logger.info(f'Setting winner to away_finisher: {winner}') + logger.info(f"Setting winner to away_finisher: {winner}") if loser is None: if final_play.home_score > final_play.away_score: loser = away_finisher - logger.info(f'Setting loser to away_finisher: {loser}') + logger.info(f"Setting loser to away_finisher: {loser}") else: loser = home_finisher - logger.info(f'Setting loser to home_finisher: {loser}') - - logger.info(f'winner: {winner} / loser: {loser}') + logger.info(f"Setting loser to home_finisher: {loser}") + + logger.info(f"winner: {winner} / loser: {loser}") if winner is not None: decisions[winner.player_id].win = 1 if loser is not None: @@ -1120,301 +1533,453 @@ def get_db_ready_decisions(session: Session, this_game: Game, db_game_id: int) - decisions[away_finisher.player_id].game_finished = 1 if home_finisher is not None: decisions[home_finisher.player_id].game_finished = 1 - + for lineup in holds: decisions[lineup.player_id].hold = 1 - + if save is not None: decisions[save.player_id].is_save = 1 decisions[save.player_id].hold = 0 - + for lineup in b_save: decisions[lineup.player_id].b_save = 1 decisions[lineup.player_id].is_save = 0 - + return [x.model_dump() for x in decisions.values()] -async def post_game_rewards(session: Session, winning_team: Team, losing_team: Team, this_game: Game): +async def post_game_rewards( + session: Session, winning_team: Team, losing_team: Team, this_game: Game +): wr_query = await db_get( - 'gamerewards', params=[('name', f'{"Short" if this_game.short_game else "Full"} Game Win')]) + "gamerewards", + params=[("name", f"{'Short' if this_game.short_game else 'Full'} Game Win")], + ) lr_query = await db_get( - 'gamerewards', params=[('name', f'{"Short" if this_game.short_game else "Full"} Game Loss')]) - if not wr_query['count'] or not lr_query['count']: - raise DatabaseError(f'Game rewards were not found. Leaving this game active.') - - win_reward = wr_query['gamerewards'][0] - loss_reward = lr_query['gamerewards'][0] - win_string = f'1x {win_reward["pack_type"]["name"]} Pack\n' + "gamerewards", + params=[("name", f"{'Short' if this_game.short_game else 'Full'} Game Loss")], + ) + if not wr_query["count"] or not lr_query["count"]: + raise DatabaseError(f"Game rewards were not found. Leaving this game active.") + + win_reward = wr_query["gamerewards"][0] + loss_reward = lr_query["gamerewards"][0] + win_string = f"1x {win_reward['pack_type']['name']} Pack\n" # Post Campaign Team Choice packs - if this_game.ai_team is not None and losing_team.is_ai and 'gauntlet' not in this_game.game_type and not this_game.short_game: - g_query = await db_get('games', params=[('team1_id', winning_team.id), ('season', this_game.season), ('forfeit', False)]) + if ( + this_game.ai_team is not None + and losing_team.is_ai + and "gauntlet" not in this_game.game_type + and not this_game.short_game + ): + g_query = await db_get( + "games", + params=[ + ("team1_id", winning_team.id), + ("season", this_game.season), + ("forfeit", False), + ], + ) win_points = 0 - for x in g_query['games']: - if (x['away_score'] > x['home_score'] and x['away_team']['id'] == winning_team.id) or (x['home_score'] > x['away_score'] and x['home_team']['id'] == winning_team.id): - if x['game_type'] == 'minor-league': + for x in g_query["games"]: + if ( + x["away_score"] > x["home_score"] + and x["away_team"]["id"] == winning_team.id + ) or ( + x["home_score"] > x["away_score"] + and x["home_team"]["id"] == winning_team.id + ): + if x["game_type"] == "minor-league": win_points += 1 - elif x['game_type'] in ['major-league', 'flashback']: + elif x["game_type"] in ["major-league", "flashback"]: win_points += 2 - - elif x['game_type'] == 'hall-of-fame': + + elif x["game_type"] == "hall-of-fame": win_points += 3 - - if this_game.game_type == 'minor-league': + + if this_game.game_type == "minor-league": this_game_points = 1 - elif this_game.game_type in ['major-league', 'flashback']: + elif this_game.game_type in ["major-league", "flashback"]: this_game_points = 2 else: this_game_points = 3 pre_game_points = win_points - this_game_points if math.floor(win_points / 6) > math.floor(pre_game_points / 6): - await db_post('packs/one', payload={ - 'team_id': winning_team.id, - 'pack_type_id': 8, - 'pack_team_id': losing_team.id - }) - win_string += f'1x {losing_team.abbrev} Team Choice pack\n' + await db_post( + "packs/one", + payload={ + "team_id": winning_team.id, + "pack_type_id": 8, + "pack_team_id": losing_team.id, + }, + ) + win_string += f"1x {losing_team.abbrev} Team Choice pack\n" - win_string += f'{win_reward["money"]}₼\n' - loss_string = f'{loss_reward["money"]}₼\n' + win_string += f"{win_reward['money']}₼\n" + loss_string = f"{loss_reward['money']}₼\n" - if 'gauntlet' in this_game.game_type: + if "gauntlet" in this_game.game_type: winning_abbrev = winning_team.abbrev.lower() - if 'gauntlet' in winning_abbrev: - winning_team = await get_team_or_none(session, team_abbrev=winning_abbrev.split('-')[1]) + if "gauntlet" in winning_abbrev: + winning_team = await get_team_or_none( + session, team_abbrev=winning_abbrev.split("-")[1] + ) if winning_team is None: - raise DatabaseError(f'Main team not found for {winning_abbrev}') - - losing_abbrev = losing_team.abbrev.lower() - if 'gauntlet' in losing_abbrev: - losing_team = await get_team_or_none(session, team_abbrev=losing_abbrev.split('-')[1]) - if losing_team is None: - raise DatabaseError(f'Main team not found for {losing_abbrev}') + raise DatabaseError(f"Main team not found for {winning_abbrev}") - await db_post('packs/one', payload={'team_id': winning_team.id, 'pack_type_id': win_reward['pack_type']['id']}) - await db_post(f'teams/{winning_team.id}/money/{win_reward["money"]}') - await db_post(f'teams/{losing_team.id}/money/{loss_reward["money"]}') + losing_abbrev = losing_team.abbrev.lower() + if "gauntlet" in losing_abbrev: + losing_team = await get_team_or_none( + session, team_abbrev=losing_abbrev.split("-")[1] + ) + if losing_team is None: + raise DatabaseError(f"Main team not found for {losing_abbrev}") + + await db_post( + "packs/one", + payload={ + "team_id": winning_team.id, + "pack_type_id": win_reward["pack_type"]["id"], + }, + ) + await db_post(f"teams/{winning_team.id}/money/{win_reward['money']}") + await db_post(f"teams/{losing_team.id}/money/{loss_reward['money']}") return win_string, loss_string -def get_available_subs(session: Session, this_game: Game, this_team: Team) -> list[Card]: - logger.info(f'Getting all available subs') - team_lineups = session.exec(select(Lineup).where(Lineup.game == this_game, Lineup.team == this_team)).all() +def get_available_subs( + session: Session, this_game: Game, this_team: Team +) -> list[Card]: + logger.info(f"Getting all available subs") + team_lineups = session.exec( + select(Lineup).where(Lineup.game == this_game, Lineup.team == this_team) + ).all() used_card_ids = [x.card.id for x in team_lineups] - logger.info(f'USED CARD IDS: {used_card_ids}') + logger.info(f"USED CARD IDS: {used_card_ids}") - all_roster_links = session.exec(select(RosterLink).where(RosterLink.game == this_game, RosterLink.team == this_team)).all() + all_roster_links = session.exec( + select(RosterLink).where( + RosterLink.game == this_game, RosterLink.team == this_team + ) + ).all() return [x.card for x in all_roster_links if x.card_id not in used_card_ids] -def get_available_pitchers(session: Session, this_game: Game, this_team: Team, sort: Literal['starter-desc', 'closer-desc'] = 'closer-desc') -> list[Card]: - logger.info(f'getting available pitchers for team {this_team.id} in game {this_game.id}') +def get_available_pitchers( + session: Session, + this_game: Game, + this_team: Team, + sort: Literal["starter-desc", "closer-desc"] = "closer-desc", +) -> list[Card]: + logger.info( + f"getting available pitchers for team {this_team.id} in game {this_game.id}" + ) all_subs = get_available_subs(session, this_game, this_team) - logger.info(f'all_subs: {all_subs}') + logger.info(f"all_subs: {all_subs}") pitchers = [x for x in all_subs if x.pitcherscouting is not None] - logger.info(f'pitchers: {pitchers}') - + logger.info(f"pitchers: {pitchers}") + def sort_by_pow(this_card: Card): s_pow = this_card.pitcherscouting.pitchingcard.starter_rating r_pow = this_card.pitcherscouting.pitchingcard.relief_rating - c_pow = this_card.pitcherscouting.pitchingcard.closer_rating if this_card.pitcherscouting.pitchingcard.closer_rating is not None else 0 - - if sort == 'starter-desc': + c_pow = ( + this_card.pitcherscouting.pitchingcard.closer_rating + if this_card.pitcherscouting.pitchingcard.closer_rating is not None + else 0 + ) + + if sort == "starter-desc": r_val = (s_pow * 3) + r_pow else: r_val = (c_pow * 10) - (r_pow * 5) - (s_pow * 3) - + return r_val - + pitchers.sort(key=sort_by_pow, reverse=True) return pitchers -def get_available_batters(session: Session, this_game: Game, this_team: Team) -> list[Card]: - logger.info(f'getting available batters for team {this_team.id} in game {this_game.id}') +def get_available_batters( + session: Session, this_game: Game, this_team: Team +) -> list[Card]: + logger.info( + f"getting available batters for team {this_team.id} in game {this_game.id}" + ) all_subs = get_available_subs(session, this_game, this_team) - logger.info(f'all_subs: {all_subs}') + logger.info(f"all_subs: {all_subs}") batters = [x for x in all_subs if x.batterscouting is not None] - logger.info(f'batters: {batters}') + logger.info(f"batters: {batters}") return batters def get_batter_card(this_card: Card = None, this_lineup: Lineup = None) -> BattingCard: if this_card is not None: - logger.info(f'Getting batter card for {this_card.player.name}') + logger.info(f"Getting batter card for {this_card.player.name}") return this_card.batterscouting.battingcard if this_lineup is not None: - logger.info(f'Getting batter card for {this_lineup.player.name}') + logger.info(f"Getting batter card for {this_lineup.player.name}") return this_lineup.card.batterscouting.battingcard - log_exception(KeyError, 'Either a Card or Lineup must be provided to get_batter_card') + log_exception( + KeyError, "Either a Card or Lineup must be provided to get_batter_card" + ) def get_batting_statline(session: Session, this_lineup: Lineup) -> str: - logger.info(f'Getting batting statline for {this_lineup.player.name} in Game {this_lineup.game.id}') + logger.info( + f"Getting batting statline for {this_lineup.player.name} in Game {this_lineup.game.id}" + ) - at_bats = session.exec(select(func.count(Play.id)).where( - Play.game == this_lineup.game, Play.batter == this_lineup, Play.ab == 1, Play.complete == True - )).one() - hits = session.exec(select(func.count(Play.id)).where( + at_bats = session.exec( + select(func.count(Play.id)).where( + Play.game == this_lineup.game, + Play.batter == this_lineup, + Play.ab == 1, + Play.complete == True, + ) + ).one() + hits = session.exec( + select(func.count(Play.id)).where( Play.game == this_lineup.game, Play.batter == this_lineup, Play.hit == 1 - )).one() + ) + ).one() - bat_string = f'{hits}-{at_bats}' - logger.info(f'at-bat bat_string: {bat_string}') + bat_string = f"{hits}-{at_bats}" + logger.info(f"at-bat bat_string: {bat_string}") - homeruns = session.exec(select(func.count(Play.id)).where( + homeruns = session.exec( + select(func.count(Play.id)).where( Play.game == this_lineup.game, Play.batter == this_lineup, Play.homerun == 1 - )).one() + ) + ).one() if homeruns > 0: - number_string = f'{homeruns} ' if homeruns > 1 else "" - bat_string += f', {number_string}HR' + number_string = f"{homeruns} " if homeruns > 1 else "" + bat_string += f", {number_string}HR" - triples = session.exec(select(func.count(Play.id)).where( + triples = session.exec( + select(func.count(Play.id)).where( Play.game == this_lineup.game, Play.batter == this_lineup, Play.triple == 1 - )).one() + ) + ).one() if triples > 0: - number_string = f'{triples} ' if triples > 1 else "" - bat_string += f', {number_string}3B' - - doubles = session.exec(select(func.count(Play.id)).where( - Play.game == this_lineup.game, Play.batter == this_lineup, Play.double == 1 - )).one() - if doubles > 0: - number_string = f'{doubles} ' if doubles > 1 else "" - bat_string += f', {number_string}2B' - - stolenbases = session.exec(select(func.count(Play.id)).where( - Play.game == this_lineup.game, Play.runner == this_lineup, Play.sb == 1 - )).one() - if stolenbases > 0: - number_string = f'{stolenbases} ' if stolenbases > 1 else "" - bat_string += f', {number_string}SB' - - walks = session.exec(select(func.count(Play.id)).where( - Play.game == this_lineup.game, Play.batter == this_lineup, Play.bb == 1 - )).one() - if walks > 0: - number_string = f'{walks} ' if walks > 1 else "" - bat_string += f', {number_string}BB' - - strikeouts = session.exec(select(func.count(Play.id)).where( - Play.game == this_lineup.game, Play.batter == this_lineup, Play.so == 1 - )).one() - if strikeouts > 0: - number_string = f'{strikeouts} ' if strikeouts > 1 else "" - bat_string += f', {number_string}K' - - logger.info(f'bat_string: {bat_string}') + number_string = f"{triples} " if triples > 1 else "" + bat_string += f", {number_string}3B" - if bat_string == '0-0': - return '1st AB' + doubles = session.exec( + select(func.count(Play.id)).where( + Play.game == this_lineup.game, Play.batter == this_lineup, Play.double == 1 + ) + ).one() + if doubles > 0: + number_string = f"{doubles} " if doubles > 1 else "" + bat_string += f", {number_string}2B" + + stolenbases = session.exec( + select(func.count(Play.id)).where( + Play.game == this_lineup.game, Play.runner == this_lineup, Play.sb == 1 + ) + ).one() + if stolenbases > 0: + number_string = f"{stolenbases} " if stolenbases > 1 else "" + bat_string += f", {number_string}SB" + + walks = session.exec( + select(func.count(Play.id)).where( + Play.game == this_lineup.game, Play.batter == this_lineup, Play.bb == 1 + ) + ).one() + if walks > 0: + number_string = f"{walks} " if walks > 1 else "" + bat_string += f", {number_string}BB" + + strikeouts = session.exec( + select(func.count(Play.id)).where( + Play.game == this_lineup.game, Play.batter == this_lineup, Play.so == 1 + ) + ).one() + if strikeouts > 0: + number_string = f"{strikeouts} " if strikeouts > 1 else "" + bat_string += f", {number_string}K" + + logger.info(f"bat_string: {bat_string}") + + if bat_string == "0-0": + return "1st AB" else: return bat_string def get_pitching_statline(session: Session, this_lineup: Lineup) -> str: - logger.info(f'Getting pitching statline for {this_lineup.player.name} in Game {this_lineup.game.id}') + logger.info( + f"Getting pitching statline for {this_lineup.player.name} in Game {this_lineup.game.id}" + ) - outs = session.exec(select(func.sum(Play.outs)).where( - Play.game == this_lineup.game, Play.pitcher == this_lineup, Play.complete == True - )).one() + outs = session.exec( + select(func.sum(Play.outs)).where( + Play.game == this_lineup.game, + Play.pitcher == this_lineup, + Play.complete == True, + ) + ).one() if outs is None: - return '***N E W P I T C H E R***' + return "***N E W P I T C H E R***" whole_innings = math.floor(outs / 3) rem_outs = outs % 3 - pit_string = f'{whole_innings}.{rem_outs} IP' - logger.info(f'IP pit_string: {pit_string}') + pit_string = f"{whole_innings}.{rem_outs} IP" + logger.info(f"IP pit_string: {pit_string}") - runs = session.exec(select(func.count(Play.id)).where( + runs = session.exec( + select(func.count(Play.id)).where( Play.game == this_lineup.game, Play.pitcher == this_lineup, Play.run == 1 - )).one() + ) + ).one() if runs > 0: - number_string = f'{runs} ' if runs > 1 else "" - pit_string += f', {number_string}R' + number_string = f"{runs} " if runs > 1 else "" + pit_string += f", {number_string}R" - e_runs = session.exec(select(func.count(Play.id)).where( + e_runs = session.exec( + select(func.count(Play.id)).where( Play.game == this_lineup.game, Play.pitcher == this_lineup, Play.e_run == 1 - )).one() + ) + ).one() if e_runs != runs: - pit_string += f' ({e_runs} ER)' + pit_string += f" ({e_runs} ER)" - hits = session.exec(select(func.count(Play.id)).where( + hits = session.exec( + select(func.count(Play.id)).where( Play.game == this_lineup.game, Play.pitcher == this_lineup, Play.hit == 1 - )).one() + ) + ).one() if hits > 0: - pit_string += f', {hits} H' + pit_string += f", {hits} H" - walks = session.exec(select(func.count(Play.id)).where( + walks = session.exec( + select(func.count(Play.id)).where( Play.game == this_lineup.game, Play.pitcher == this_lineup, Play.bb == 1 - )).one() + ) + ).one() if walks > 0: - number_string = f'{walks} ' if walks > 1 else "" - pit_string += f', {number_string}BB' + number_string = f"{walks} " if walks > 1 else "" + pit_string += f", {number_string}BB" - strikeouts = session.exec(select(func.count(Play.id)).where( + strikeouts = session.exec( + select(func.count(Play.id)).where( Play.game == this_lineup.game, Play.pitcher == this_lineup, Play.so == 1 - )).one() + ) + ).one() if strikeouts > 0: - number_string = f'{strikeouts} ' if strikeouts > 1 else "" - pit_string += f', {number_string}K' + number_string = f"{strikeouts} " if strikeouts > 1 else "" + pit_string += f", {number_string}K" return pit_string def get_game_cardset_links(session: Session, this_game: Game) -> list[GameCardsetLink]: - logger.info(f'Getting game cardset links for game: {this_game}') - cardset_links = session.exec(select(GameCardsetLink).where(GameCardsetLink.game == this_game)).all() - logger.info(f'links: {cardset_links}') + logger.info(f"Getting game cardset links for game: {this_game}") + cardset_links = session.exec( + select(GameCardsetLink).where(GameCardsetLink.game == this_game) + ).all() + logger.info(f"links: {cardset_links}") return cardset_links -def get_plays_by_pitcher(session: Session, this_game: Game, this_lineup: Lineup, reversed: bool = False) -> list[Play]: - logger.info(f'Getting all pitching plays for {this_lineup.card.player.name_with_desc}') +def get_plays_by_pitcher( + session: Session, this_game: Game, this_lineup: Lineup, reversed: bool = False +) -> list[Play]: + logger.info( + f"Getting all pitching plays for {this_lineup.card.player.name_with_desc}" + ) statement = select(Play).where(Play.game == this_game, Play.pitcher == this_lineup) if reversed: statement = statement.order_by(Play.play_num.desc()) all_plays = session.exec(statement).all() - - logger.info(f'all_plays: {all_plays}') + + logger.info(f"all_plays: {all_plays}") return all_plays -def get_pitcher_runs_by_innings(session: Session, this_game: Game, this_pitcher: Lineup, innings: list[int]) -> int: - logger.info(f'Checking runs for {this_pitcher.player.name_with_desc} in innings: {innings}') - runs = session.exec(select(func.count(Play.id)).where( - Play.game == this_game, Play.pitcher == this_pitcher, Play.run == 1, Play.inning_num.in_(innings) - )).one() - logger.info(f'runs: {runs}') +def get_pitcher_runs_by_innings( + session: Session, this_game: Game, this_pitcher: Lineup, innings: list[int] +) -> int: + logger.info( + f"Checking runs for {this_pitcher.player.name_with_desc} in innings: {innings}" + ) + runs = session.exec( + select(func.count(Play.id)).where( + Play.game == this_game, + Play.pitcher == this_pitcher, + Play.run == 1, + Play.inning_num.in_(innings), + ) + ).one() + logger.info(f"runs: {runs}") return runs - -def reset_cache(session: Session, players: bool = True, scouting: bool = True, team: bool = True): + +def reset_cache( + session: Session, players: bool = True, scouting: bool = True, team: bool = True +): if players: - logger.warning(f'Resetting created date for Players') - session.exec(update(Player).values(created=datetime.datetime.now() - datetime.timedelta(days=365))) + logger.warning(f"Resetting created date for Players") + session.exec( + update(Player).values( + created=datetime.datetime.now() - datetime.timedelta(days=365) + ) + ) if scouting: - logger.warning(f'Resetting created date for scouting objects') - session.exec(update(BattingCard).values(created=datetime.datetime.now() - datetime.timedelta(days=365))) - session.exec(update(BatterScouting).values(created=datetime.datetime.now() - datetime.timedelta(days=365))) - session.exec(update(PitchingCard).values(created=datetime.datetime.now() - datetime.timedelta(days=365))) - session.exec(update(PitcherScouting).values(created=datetime.datetime.now() - datetime.timedelta(days=365))) - session.exec(update(PositionRating).values(created=datetime.datetime.now() - datetime.timedelta(days=365))) - session.exec(update(PitchingRatings).values(created=datetime.datetime.now() - datetime.timedelta(days=365))) - session.exec(update(BattingRatings).values(created=datetime.datetime.now() - datetime.timedelta(days=365))) + logger.warning(f"Resetting created date for scouting objects") + session.exec( + update(BattingCard).values( + created=datetime.datetime.now() - datetime.timedelta(days=365) + ) + ) + session.exec( + update(BatterScouting).values( + created=datetime.datetime.now() - datetime.timedelta(days=365) + ) + ) + session.exec( + update(PitchingCard).values( + created=datetime.datetime.now() - datetime.timedelta(days=365) + ) + ) + session.exec( + update(PitcherScouting).values( + created=datetime.datetime.now() - datetime.timedelta(days=365) + ) + ) + session.exec( + update(PositionRating).values( + created=datetime.datetime.now() - datetime.timedelta(days=365) + ) + ) + session.exec( + update(PitchingRatings).values( + created=datetime.datetime.now() - datetime.timedelta(days=365) + ) + ) + session.exec( + update(BattingRatings).values( + created=datetime.datetime.now() - datetime.timedelta(days=365) + ) + ) if team: - logger.warning(f'Resetting created date for Teams') - session.exec(update(Team).values(created=datetime.datetime.now() - datetime.timedelta(days=365))) - + logger.warning(f"Resetting created date for Teams") + session.exec( + update(Team).values( + created=datetime.datetime.now() - datetime.timedelta(days=365) + ) + ) + session.commit() diff --git a/paperdynasty.py b/paperdynasty.py index 65f3845..20beac3 100644 --- a/paperdynasty.py +++ b/paperdynasty.py @@ -54,6 +54,7 @@ COGS = [ "cogs.gameplay", "cogs.economy_new.scouting", "cogs.refractor", + "cogs.compare", ] intents = discord.Intents.default() diff --git a/tests/test_compare_command.py b/tests/test_compare_command.py new file mode 100644 index 0000000..1a408e9 --- /dev/null +++ b/tests/test_compare_command.py @@ -0,0 +1,395 @@ +""" +Tests for the /compare slash command embed builder (cogs/compare.py). + +What: + - build_compare_embed() is a pure function that takes two card-data dicts + and returns a discord.Embed. + - Tests verify field count, arrow directions, type-mismatch raises, and + tied stats. + +Why: + - The embed builder has no Discord I/O so it can be tested synchronously + without a bot or API calls. + - Correct arrow direction is critical for usability: wrong arrows would + mislead players making trade/lineup decisions. +""" + +import sys +import os + +import pytest +import discord + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from cogs.compare import ( + build_compare_embed, + CompareMismatchError, + _delta_arrow, + _is_pitcher, +) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +def _make_player( + name: str = "Test Player", + pos_1: str = "CF", + rarity_value: int = 3, + rarity_name: str = "All-Star", + cost: int = 300, + headshot: str = None, +) -> dict: + """Build a minimal player dict that mirrors the API response shape.""" + return { + "player_id": 1, + "p_name": name, + "pos_1": pos_1, + "cost": cost, + "rarity": { + "value": rarity_value, + "name": rarity_name, + "color": "FFD700", + }, + "headshot": headshot, + } + + +def _make_batting_card( + running: int = 12, + steal_low: int = 7, + steal_high: int = 12, + bunting: str = "C", + hit_and_run: str = "B", +) -> dict: + """Build a minimal batting card dict.""" + return { + "running": running, + "steal_low": steal_low, + "steal_high": steal_high, + "steal_auto": False, + "steal_jump": 0.2, + "bunting": bunting, + "hit_and_run": hit_and_run, + "hand": "R", + "offense_col": 1, + } + + +def _make_pitching_card( + starter_rating: int = 7, + relief_rating: int = 4, + closer_rating: int = None, + balk: int = 2, + wild_pitch: int = 3, +) -> dict: + """Build a minimal pitching card dict.""" + return { + "starter_rating": starter_rating, + "relief_rating": relief_rating, + "closer_rating": closer_rating, + "balk": balk, + "wild_pitch": wild_pitch, + "hand": "R", + } + + +def _batter_card_data(player: dict, batting_card: dict) -> dict: + return { + "player": player, + "battingcard": batting_card, + "pitchingcard": None, + } + + +def _pitcher_card_data(player: dict, pitching_card: dict) -> dict: + return { + "player": player, + "battingcard": None, + "pitchingcard": pitching_card, + } + + +# --------------------------------------------------------------------------- +# _delta_arrow unit tests +# --------------------------------------------------------------------------- + + +class TestDeltaArrow: + """_delta_arrow correctly indicates which side wins.""" + + def test_higher_wins_when_card2_greater(self): + """▲ when card2 value is higher and higher_is_better=True.""" + assert _delta_arrow(10, 15) == "▲" + + def test_higher_wins_when_card1_greater(self): + """▼ when card1 value is higher and higher_is_better=True.""" + assert _delta_arrow(20, 10) == "▼" + + def test_tied_returns_equals(self): + """═ when both values are equal.""" + assert _delta_arrow(10, 10) == "═" + + def test_lower_is_better_arrow_flipped(self): + """▲ when card2 is lower and lower_is_better (e.g. balk count).""" + assert _delta_arrow(5, 2, higher_is_better=False) == "▲" + + def test_lower_is_better_card1_wins(self): + """▼ when card1 is lower and lower_is_better.""" + assert _delta_arrow(2, 5, higher_is_better=False) == "▼" + + def test_grade_field_a_beats_b(self): + """▲ when card2 has grade A and card1 has grade B (grade_field=True).""" + assert _delta_arrow("B", "A", grade_field=True) == "▲" + + def test_grade_field_b_beats_c(self): + """▼ when card1 has grade B and card2 has grade C.""" + assert _delta_arrow("B", "C", grade_field=True) == "▼" + + def test_grade_field_tie(self): + """═ when both cards have the same grade.""" + assert _delta_arrow("C", "C", grade_field=True) == "═" + + def test_none_returns_equals(self): + """═ when either value is None (missing stat).""" + assert _delta_arrow(None, 10) == "═" + assert _delta_arrow(10, None) == "═" + + +# --------------------------------------------------------------------------- +# _is_pitcher +# --------------------------------------------------------------------------- + + +class TestIsPitcher: + """_is_pitcher correctly classifies positions.""" + + def test_sp_is_pitcher(self): + assert _is_pitcher({"pos_1": "SP"}) is True + + def test_rp_is_pitcher(self): + assert _is_pitcher({"pos_1": "RP"}) is True + + def test_cf_is_batter(self): + assert _is_pitcher({"pos_1": "CF"}) is False + + def test_dh_is_batter(self): + assert _is_pitcher({"pos_1": "DH"}) is False + + def test_lowercase_sp(self): + """pos_1 comparison is case-insensitive.""" + assert _is_pitcher({"pos_1": "sp"}) is True + + +# --------------------------------------------------------------------------- +# build_compare_embed — batter path +# --------------------------------------------------------------------------- + + +class TestBuildCompareEmbedBatters: + """build_compare_embed works correctly for two batter cards.""" + + def setup_method(self): + """Create two distinct batter cards for comparison.""" + self.player1 = _make_player("Mike Trout", pos_1="CF", cost=500, rarity_value=5) + self.player2 = _make_player("Joe Batter", pos_1="LF", cost=200, rarity_value=2) + + self.bc1 = _make_batting_card( + running=15, steal_low=9, steal_high=14, bunting="B", hit_and_run="A" + ) + self.bc2 = _make_batting_card( + running=10, steal_low=5, steal_high=10, bunting="C", hit_and_run="C" + ) + + self.card1 = _batter_card_data(self.player1, self.bc1) + self.card2 = _batter_card_data(self.player2, self.bc2) + + def test_embed_is_discord_embed(self): + """Return value is a discord.Embed instance.""" + embed = build_compare_embed(self.card1, self.card2, "Mike Trout", "Joe Batter") + assert isinstance(embed, discord.Embed) + + def test_embed_title_contains_comparison(self): + """Embed title identifies this as a card comparison.""" + embed = build_compare_embed(self.card1, self.card2, "Mike Trout", "Joe Batter") + assert "Comparison" in embed.title + + def test_embed_field_count(self): + """ + Batter embed has header row (3 fields) + 7 stat rows × 3 fields = 24 + total inline fields. + """ + embed = build_compare_embed(self.card1, self.card2, "Mike Trout", "Joe Batter") + assert len(embed.fields) == 3 + 7 * 3 # 24 + + def test_higher_cost_gets_down_arrow_in_center_column(self): + """ + card1 has higher cost (500 vs 200). The center arrow field for + 'Cost (Overall)' should be '▼' (card1 wins when higher_is_better). + """ + embed = build_compare_embed(self.card1, self.card2, "Mike Trout", "Joe Batter") + # First stat row starts at field index 3; center field of each row is idx+1 + cost_arrow_field = embed.fields[4] # index 3=left, 4=center, 5=right + assert cost_arrow_field.value == "▼" + + def test_higher_running_gets_down_arrow(self): + """ + card1 running=15 > card2 running=10 → center arrow for Running is ▼. + Running is the 3rd stat (header row at 0..2, cost at 3..5, rarity at + 6..8, running at 9..11 → center = index 10). + """ + embed = build_compare_embed(self.card1, self.card2, "Mike Trout", "Joe Batter") + running_arrow = embed.fields[10] + assert running_arrow.value == "▼" + + def test_better_grade_card1_bunting_b_beats_c(self): + """ + card1 bunting='B' beats card2 bunting='C'. + Bunting is the 6th stat, center field at index 3 + 5*3 + 1 = 19. + Arrow should be ▼ (card1 wins). + """ + embed = build_compare_embed(self.card1, self.card2, "Mike Trout", "Joe Batter") + bunt_arrow = embed.fields[19] + assert bunt_arrow.value == "▼" + + def test_type_mismatch_raises(self): + """CompareMismatchError raised when one card is batter and other pitcher.""" + pitcher_player = _make_player("Max Scherzer", pos_1="SP", cost=400) + pitcher_card = _make_pitching_card(starter_rating=9) + card_p = _pitcher_card_data(pitcher_player, pitcher_card) + + with pytest.raises(CompareMismatchError, match="Card types differ"): + build_compare_embed(self.card1, card_p, "Mike Trout", "Max Scherzer") + + +# --------------------------------------------------------------------------- +# build_compare_embed — pitcher path +# --------------------------------------------------------------------------- + + +class TestBuildCompareEmbedPitchers: + """build_compare_embed works correctly for two pitcher cards.""" + + def setup_method(self): + self.player1 = _make_player( + "Max Scherzer", pos_1="SP", cost=450, rarity_value=4 + ) + self.player2 = _make_player("Bullpen Bob", pos_1="RP", cost=150, rarity_value=2) + + self.pc1 = _make_pitching_card( + starter_rating=9, relief_rating=5, closer_rating=None, balk=1, wild_pitch=2 + ) + self.pc2 = _make_pitching_card( + starter_rating=3, relief_rating=8, closer_rating=6, balk=3, wild_pitch=5 + ) + + self.card1 = _pitcher_card_data(self.player1, self.pc1) + self.card2 = _pitcher_card_data(self.player2, self.pc2) + + def test_embed_field_count_pitchers(self): + """ + Pitcher embed has header row (3 fields) + 7 stat rows × 3 fields = 24. + """ + embed = build_compare_embed( + self.card1, self.card2, "Max Scherzer", "Bullpen Bob" + ) + assert len(embed.fields) == 3 + 7 * 3 # 24 + + def test_description_labels_pitchers(self): + """Embed description identifies card type as Pitchers.""" + embed = build_compare_embed( + self.card1, self.card2, "Max Scherzer", "Bullpen Bob" + ) + assert "Pitchers" in embed.description + + def test_starter_rating_card1_wins(self): + """ + card1 starter_rating=9 > card2 starter_rating=3 → arrow ▼ (card1 wins). + Starter Rating is 3rd stat, center field at index 3 + 2*3 + 1 = 10. + """ + embed = build_compare_embed( + self.card1, self.card2, "Max Scherzer", "Bullpen Bob" + ) + starter_arrow = embed.fields[10] + assert starter_arrow.value == "▼" + + def test_relief_rating_card2_wins(self): + """ + card2 relief_rating=8 > card1 relief_rating=5 → arrow ▲ (card2 wins). + Relief Rating is 4th stat, center field at index 3 + 3*3 + 1 = 13. + """ + embed = build_compare_embed( + self.card1, self.card2, "Max Scherzer", "Bullpen Bob" + ) + relief_arrow = embed.fields[13] + assert relief_arrow.value == "▲" + + def test_lower_balk_is_better(self): + """ + card1 balk=1 < card2 balk=3 → lower is better → arrow ▼ (card1 wins). + Balk is 6th stat, center field at index 3 + 5*3 + 1 = 19. + """ + embed = build_compare_embed( + self.card1, self.card2, "Max Scherzer", "Bullpen Bob" + ) + balk_arrow = embed.fields[19] + assert balk_arrow.value == "▼" + + def test_tied_stat_shows_equals(self): + """═ when both pitchers have the same starter_rating.""" + pc_tied = _make_pitching_card(starter_rating=9) + card_tied = _pitcher_card_data(self.player2, pc_tied) + embed = build_compare_embed( + self.card1, card_tied, "Max Scherzer", "Bullpen Bob" + ) + starter_arrow = embed.fields[10] + assert starter_arrow.value == "═" + + def test_type_mismatch_pitcher_vs_batter_raises(self): + """CompareMismatchError raised when pitcher compared to batter.""" + batter_player = _make_player("Speedy Guy", pos_1="CF") + batter_card_data = _batter_card_data(batter_player, _make_batting_card()) + + with pytest.raises(CompareMismatchError): + build_compare_embed( + self.card1, batter_card_data, "Max Scherzer", "Speedy Guy" + ) + + +# --------------------------------------------------------------------------- +# build_compare_embed — edge cases +# --------------------------------------------------------------------------- + + +class TestBuildCompareEmbedEdgeCases: + """Edge cases: missing data, None stats, same card compared to itself.""" + + def test_missing_batting_card_graceful(self): + """ + When battingcard is None, stat values display as '—' and arrows show ═. + No exception should be raised. + """ + player = _make_player("Player A", pos_1="1B") + card1 = {"player": player, "battingcard": None, "pitchingcard": None} + card2 = {"player": player, "battingcard": None, "pitchingcard": None} + # Should not raise + embed = build_compare_embed(card1, card2, "Player A", "Player A") + assert isinstance(embed, discord.Embed) + + def test_same_card_all_tied(self): + """Comparing a card against itself should show ═ on every arrow field.""" + player = _make_player("Clone", pos_1="CF", cost=300) + bc = _make_batting_card(running=12) + card_data = _batter_card_data(player, bc) + + embed = build_compare_embed(card_data, card_data, "Clone", "Clone") + # Arrow fields are at positions 4, 7, 10, 13, 16, 19, 22 (center of each row) + arrow_indices = [4, 7, 10, 13, 16, 19, 22] + for idx in arrow_indices: + assert embed.fields[idx].value == "═", ( + f"Expected ═ at field index {idx}, " + f"got {embed.fields[idx].value!r} (name={embed.fields[idx].name!r})" + ) diff --git a/tests/test_gauntlet_recap.py b/tests/test_gauntlet_recap.py new file mode 100644 index 0000000..0324440 --- /dev/null +++ b/tests/test_gauntlet_recap.py @@ -0,0 +1,315 @@ +""" +Tests for the gauntlet completion recap embed (Roadmap 2.4a). + +These tests verify: + 1. build_gauntlet_recap_embed produces an embed with the required fields + (title, champion, record, prize distribution) for a completed run. + 2. The embed title always starts with "Gauntlet Complete:". + 3. The champion field contains the team name and, when gmid is present, + a Discord user mention (<@gmid>). + 4. The prize distribution marks earned rewards with ✅ and unearned with ❌ + or ⬜ depending on whether losses were exceeded. + 5. post_gauntlet_recap is a no-op when channel is None (graceful fallback). + 6. post_gauntlet_recap calls channel.send exactly once on success. + 7. post_gauntlet_recap does not raise when channel.send raises — gauntlet + completion must never be interrupted by a recap failure. + +The builder is a pure synchronous function so tests do not require an async +event loop; only the async sender tests use pytest-asyncio. +""" + +import pytest +from unittest.mock import AsyncMock, MagicMock, patch + +import discord + +from gauntlets import build_gauntlet_recap_embed, post_gauntlet_recap + + +# --------------------------------------------------------------------------- +# Test data helpers +# --------------------------------------------------------------------------- + + +def make_run(wins: int = 10, losses: int = 1, run_id: int = 42) -> dict: + """Return a minimal gauntletruns API dict for a completed run.""" + return { + "id": run_id, + "wins": wins, + "losses": losses, + "gauntlet": {"id": 9, "name": "2005 Live"}, + "team": {"lname": "Gauntlet-NCB", "logo": None}, + } + + +def make_event(event_id: int = 9, name: str = "2005 Live") -> dict: + """Return a minimal events API dict.""" + return { + "id": event_id, + "name": name, + "url": "https://example.com/gauntlet.png", + "short_desc": "Go 10-0!", + } + + +def make_main_team(gmid: int = 123456789, lname: str = "Normal CornBelters") -> dict: + """Return a minimal teams API dict for the player's real team.""" + return { + "id": 31, + "lname": lname, + "gmid": gmid, + "logo": "https://example.com/logo.png", + } + + +def make_rewards() -> list[dict]: + """Return a representative gauntletrewards list for a gauntlet.""" + return [ + { + "win_num": 3, + "loss_max": 2, + "reward": {"money": 500, "player": None, "pack_type": None}, + }, + { + "win_num": 7, + "loss_max": 2, + "reward": { + "money": None, + "player": None, + "pack_type": {"id": 1, "name": "Standard"}, + }, + }, + { + "win_num": 10, + "loss_max": 0, + "reward": { + "money": None, + "player": { + "player_id": 99, + "description": "Babe Ruth HoF", + }, + "pack_type": None, + }, + }, + ] + + +# --------------------------------------------------------------------------- +# Unit: build_gauntlet_recap_embed +# --------------------------------------------------------------------------- + + +class TestBuildGauntletRecapEmbed: + """Verify the embed produced by the synchronous builder function.""" + + def test_title_starts_with_gauntlet_complete(self): + """Title must start with 'Gauntlet Complete:' followed by the event name. + + This is the canonical format expected by the PO spec and makes the + embed immediately recognisable in the channel feed. + """ + embed = build_gauntlet_recap_embed( + make_run(), make_event(), make_main_team(), make_rewards() + ) + assert embed.title.startswith("Gauntlet Complete:") + assert "2005 Live" in embed.title + + def test_embed_colour_is_gold(self): + """Embed colour must be the gold/champion accent (0xFFD700). + + Gold is the PO-specified accent for champion-level events. + """ + embed = build_gauntlet_recap_embed( + make_run(), make_event(), make_main_team(), make_rewards() + ) + assert embed.color.value == 0xFFD700 + + def test_champion_field_contains_team_name(self): + """The Champion field must display the team's long name. + + Players identify with their team name, not the gauntlet draft copy. + """ + embed = build_gauntlet_recap_embed( + make_run(), + make_event(), + make_main_team(lname="Normal CornBelters"), + make_rewards(), + ) + champion_field = next((f for f in embed.fields if f.name == "Champion"), None) + assert champion_field is not None, "Expected a 'Champion' embed field" + assert "Normal CornBelters" in champion_field.value + + def test_champion_field_contains_user_mention_when_gmid_present(self): + """When gmid is set, the Champion field must include a Discord user mention. + + The mention (<@gmid>) creates social validation — the winner is pinged + in the channel where they completed the gauntlet. + """ + embed = build_gauntlet_recap_embed( + make_run(), make_event(), make_main_team(gmid=987654321), make_rewards() + ) + champion_field = next(f for f in embed.fields if f.name == "Champion") + assert "<@987654321>" in champion_field.value + + def test_champion_field_omits_mention_when_gmid_is_none(self): + """When gmid is None the embed must not include any mention syntax. + + Some legacy records or AI teams may not have a Discord user ID. + The embed must still be valid without one. + """ + team = make_main_team() + team["gmid"] = None + embed = build_gauntlet_recap_embed( + make_run(), make_event(), team, make_rewards() + ) + champion_field = next(f for f in embed.fields if f.name == "Champion") + assert "<@" not in champion_field.value + + def test_final_record_field_present(self): + """Final Record field must show wins-losses for the completed run.""" + embed = build_gauntlet_recap_embed( + make_run(wins=10, losses=1), make_event(), make_main_team(), make_rewards() + ) + record_field = next((f for f in embed.fields if f.name == "Final Record"), None) + assert record_field is not None, "Expected a 'Final Record' field" + assert "10" in record_field.value + assert "1" in record_field.value + + def test_prize_distribution_field_present(self): + """Prize Distribution field must be present when rewards are provided.""" + embed = build_gauntlet_recap_embed( + make_run(), make_event(), make_main_team(), make_rewards() + ) + prize_field = next( + (f for f in embed.fields if f.name == "Prize Distribution"), None + ) + assert prize_field is not None, "Expected a 'Prize Distribution' field" + + def test_earned_rewards_marked_with_checkmark(self): + """Rewards that were earned (wins >= threshold and losses within limit) + must be marked with ✅. + + A 10-1 run earns the 3-win and 7-win milestones but not the 10-0 bonus. + """ + embed = build_gauntlet_recap_embed( + make_run(wins=10, losses=1), make_event(), make_main_team(), make_rewards() + ) + prize_field = next(f for f in embed.fields if f.name == "Prize Distribution") + # 3-win (loss_max=2, losses=1) → earned + assert "✅" in prize_field.value + + def test_unearned_perfect_bonus_marked_correctly(self): + """The 10-0 bonus reward must NOT be marked earned when losses > 0.""" + embed = build_gauntlet_recap_embed( + make_run(wins=10, losses=1), make_event(), make_main_team(), make_rewards() + ) + prize_field = next(f for f in embed.fields if f.name == "Prize Distribution") + # The 10-0 bonus line must be marked ❌ — ineligible, not pending (⬜) + lines = prize_field.value.split("\n") + bonus_line = next((line for line in lines if "10-0" in line), None) + assert bonus_line is not None, "Expected a '10-0' line in prizes" + assert "❌" in bonus_line + + def test_empty_rewards_list_omits_prize_field(self): + """When rewards is an empty list the Prize Distribution field must be omitted. + + Some event types may not have configured rewards; the embed must + still be valid and informative without a prize table. + """ + embed = build_gauntlet_recap_embed( + make_run(), make_event(), make_main_team(), [] + ) + prize_field = next( + (f for f in embed.fields if f.name == "Prize Distribution"), None + ) + assert prize_field is None + + +# --------------------------------------------------------------------------- +# Async: post_gauntlet_recap +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_post_gauntlet_recap_sends_embed_on_success(): + """When all inputs are valid post_gauntlet_recap sends exactly one embed. + + This confirms the function wires build_gauntlet_recap_embed → channel.send + correctly and doesn't double-post. + """ + channel = AsyncMock() + all_rewards_response = {"rewards": make_rewards()} + + with patch( + "gauntlets.db_get", new_callable=AsyncMock, return_value=all_rewards_response + ): + await post_gauntlet_recap(make_run(), make_event(), make_main_team(), channel) + + channel.send.assert_called_once() + # Verify an embed was passed (not just plain text) + kwargs = channel.send.call_args.kwargs + assert "embed" in kwargs + assert isinstance(kwargs["embed"], discord.Embed) + + +@pytest.mark.asyncio +async def test_post_gauntlet_recap_noop_when_channel_is_none(): + """When channel is None post_gauntlet_recap must return without raising. + + The gauntlet channel may be unavailable (deleted, bot lost permissions, + or not set in the record). The completion flow must never fail due to + a missing recap channel. + """ + # No channel.send to assert on — just ensure no exception is raised + with patch( + "gauntlets.db_get", new_callable=AsyncMock, return_value={"rewards": []} + ): + try: + await post_gauntlet_recap(make_run(), make_event(), make_main_team(), None) + except Exception as exc: + pytest.fail(f"post_gauntlet_recap raised with None channel: {exc}") + + +@pytest.mark.asyncio +async def test_post_gauntlet_recap_nonfatal_when_channel_send_raises(): + """A channel.send failure must not propagate out of post_gauntlet_recap. + + The gauntlet run is already complete when the recap fires; a Discord API + error (rate limit, permissions revoked) must not corrupt the game state. + """ + channel = AsyncMock() + channel.send.side_effect = Exception("Discord API error") + all_rewards_response = {"rewards": make_rewards()} + + with patch( + "gauntlets.db_get", new_callable=AsyncMock, return_value=all_rewards_response + ): + try: + await post_gauntlet_recap( + make_run(), make_event(), make_main_team(), channel + ) + except Exception as exc: + pytest.fail(f"post_gauntlet_recap raised when channel.send failed: {exc}") + + +@pytest.mark.asyncio +async def test_post_gauntlet_recap_nonfatal_when_db_get_raises(): + """A db_get failure inside post_gauntlet_recap must not propagate. + + If the rewards endpoint is unavailable the recap silently skips rather + than crashing the completion flow. + """ + channel = AsyncMock() + + with patch( + "gauntlets.db_get", new_callable=AsyncMock, side_effect=Exception("API down") + ): + try: + await post_gauntlet_recap( + make_run(), make_event(), make_main_team(), channel + ) + except Exception as exc: + pytest.fail(f"post_gauntlet_recap raised when db_get failed: {exc}") + + # channel.send should NOT have been called since the error happened before it + channel.send.assert_not_called()