187 lines
7.8 KiB
Python
187 lines
7.8 KiB
Python
# Fixed Standings and Records Module
|
|
# Corrected to match original players.py business requirements
|
|
|
|
from discord.ext import commands
|
|
from discord import app_commands
|
|
import discord
|
|
import math
|
|
from typing import Optional, Literal
|
|
from datetime import datetime, timedelta
|
|
|
|
# Import specific utilities needed by this module
|
|
import logging
|
|
from sqlmodel import Session
|
|
from in_game.gameplay_queries import get_team_or_none
|
|
from in_game.gameplay_models import Play, engine
|
|
from api_calls import db_get, get_team_by_abbrev
|
|
from helpers import (
|
|
PD_PLAYERS_ROLE_NAME, get_team_embed, get_team_by_owner, legal_channel, embed_pagination
|
|
)
|
|
# Import shared utility functions
|
|
from .shared_utils import get_ai_records, get_record_embed, get_record_embed_legacy
|
|
|
|
logger = logging.getLogger('discord_app')
|
|
|
|
|
|
# Channels in which /record replies are only shown to the invoking user.
_EPHEMERAL_CHANNELS = ('paper-dynasty-chat', 'pd-news-ticker')

# (season-record key, embed label) pairs, in the order the pages are shown.
_RECORD_PAGES = (
    ('minor-league', 'Minor League'),
    ('major-league', 'Major League'),
    ('flashback', 'Flashback'),
    ('hall-of-fame', 'Hall of Fame'),
)

# League choice -> index of the page opened first. 'All' opens on the first
# page, which is the Minor League embed, so both map to 0.
_RECORD_START_PAGES = {
    'All': 0,
    'Minor League': 0,
    'Major League': 1,
    'Flashback': 2,
    'Hall of Fame': 3,
}

# Number of standings lines placed in a single embed field.
_STANDINGS_GROUP_SIZE = 20


def _tally_ranked_results(results):
    """Fold game results into per-team totals keyed by team id.

    Each game adds one win and 2 points to the winner, and one loss and
    1 point to the loser. The home team wins only when its score is strictly
    greater than the away score. Teams are inserted away-first, then home,
    in the order games appear, which keeps later stable sorts deterministic.
    """
    records = {}
    for game in results:
        home_win = game['home_score'] > game['away_score']
        for side, won in (('away_team', not home_win), ('home_team', home_win)):
            totals = records.setdefault(
                game[side]['id'], {'wins': 0, 'losses': 0, 'points': 0}
            )
            if won:
                totals['wins'] += 1
                totals['points'] += 2
            else:
                totals['losses'] += 1
                totals['points'] += 1
    return records


def _add_standings_fields(embed, lines):
    """Add ``lines`` to ``embed`` as 'Group i / N' fields of fixed size.

    N is always the ceiling of ``len(lines) / _STANDINGS_GROUP_SIZE`` so every
    header agrees on the total. Nothing is added when ``lines`` is empty, which
    avoids creating a field with an empty value.
    """
    total_groups = math.ceil(len(lines) / _STANDINGS_GROUP_SIZE)
    for group_number in range(1, total_groups + 1):
        first = (group_number - 1) * _STANDINGS_GROUP_SIZE
        embed.add_field(
            name=f'Group {group_number} / {total_groups}',
            value=''.join(lines[first:first + _STANDINGS_GROUP_SIZE]),
        )


class StandingsRecords(commands.Cog):
    """Standings and game records functionality for Paper Dynasty."""

    def __init__(self, bot):
        self.bot = bot

    # NOTE: the command docstrings below are deliberately kept as one-line
    # summaries. discord.py may parse 'Args:'-style docstring sections into
    # user-visible parameter descriptions, so parameter notes live in comments.

    @app_commands.command(name='record', description='Display team record against AI teams')
    @app_commands.checks.has_any_role(PD_PLAYERS_ROLE_NAME)
    async def record_slash_command(
            self, interaction: discord.Interaction,
            league: Literal['All', 'Minor League', 'Major League', 'Flashback', 'Hall of Fame'],
            team_abbrev: Optional[str] = None):
        """Display team record against AI teams with proper pagination and data formatting."""
        # league:      selects which of the four record pages is opened first.
        # team_abbrev: team to look up; when omitted, the caller's own team
        #              (looked up by their user id) is used.

        # Replies in the busy public channels are only shown to the caller.
        channel_name = getattr(interaction.channel, 'name', None)
        ephemeral = channel_name in _EPHEMERAL_CHANNELS

        if team_abbrev:
            team = await get_team_by_abbrev(team_abbrev)
        else:
            team = await get_team_by_owner(interaction.user.id)

        if not team:
            await interaction.response.send_message(
                'Hmm...I can\'t find the team you looking for.', ephemeral=ephemeral
            )
            return

        # Acknowledge the interaction before doing any further API work so the
        # initial response goes out as early as possible.
        await interaction.response.send_message(
            f'I\'m tallying the {team["lname"]} results now...', ephemeral=ephemeral
        )

        current = await db_get('current')
        st_query = await db_get(f'teams/{team["id"]}/season-record', object_id=current["season"])

        # One embed per league, in pagination order.
        pages = [get_record_embed(team, st_query[key], label) for key, label in _RECORD_PAGES]

        # Unknown values fall back to the last page, as the original else-branch did.
        start_page = _RECORD_START_PAGES.get(league, 3)

        await interaction.edit_original_response(content=f'Here are the {team["lname"]} campaign records')

        await embed_pagination(
            pages,
            interaction.channel,
            interaction.user,
            timeout=20,
            start_page=start_page
        )

    @commands.hybrid_command(name='standings', help='Check weekly or season-long standings')
    @commands.has_any_role(PD_PLAYERS_ROLE_NAME)
    @commands.check(legal_channel)
    async def standings_command(self, ctx: commands.Context, which: Literal['week', 'season']):
        """Display league standings with proper data source and formatting."""
        # which: 'week' restricts the ranked results to the current week;
        #        'season' uses every ranked result of the current season.

        current = await db_get('current')
        params = [('season', current['season']), ('ranked', True)]
        if which == 'week':
            params.append(('week', current['week']))

        r_query = await db_get('results', params=params)
        if not r_query['count']:
            await ctx.send(f'There are no Ranked games on record this {"week" if which == "week" else "season"}.')
            return

        all_records = _tally_ranked_results(r_query['results'])

        # Highest points first; sorted() is stable, so ties keep tally order.
        sorted_records = sorted(all_records.items(), key=lambda k_v: k_v[1]['points'], reverse=True)

        if which == 'season':
            title = f'Season {current["season"]} Standings'
        else:
            title = f'Week {current["week"]} Standings'
        embed = get_team_embed(title=title)

        # Each team is fetched individually to get its short name and ranking.
        # Teams that cannot be found are logged and left out of the display.
        lines = []
        for team_id, record in sorted_records:
            team = await db_get('teams', object_id=team_id)
            if not team:
                logger.error(f'Could not find team {team_id} when running standings.')
                continue
            points = record['points']
            lines.append(
                f'{points} pt{"s" if points != 1 else ""} '
                f'({record["wins"]}-{record["losses"]}) - {team["sname"]} [{team["ranking"]}]\n'
            )

        # Group by lines actually rendered so every field is non-empty and the
        # 'Group i / N' headers all report the same total.
        _add_standings_fields(embed, lines)

        await ctx.send(content=None, embed=embed)
|
|
|
|
|
|
async def setup(bot):
    """Attach a StandingsRecords cog instance to ``bot``."""
    cog = StandingsRecords(bot)
    await bot.add_cog(cog)