Merge branch 'main' into ai/paper-dynasty-database#77
All checks were successful
Ruff Lint / lint (pull_request) Successful in 16s

This commit is contained in:
cal 2026-03-23 20:05:29 +00:00
commit 3ce5aebc57
20 changed files with 5062 additions and 5426 deletions

View File

@ -1,21 +1,18 @@
# Gitea Actions: Docker Build, Push, and Notify
#
# CI/CD pipeline for Paper Dynasty Discord Bot:
# - Builds Docker images on every push/PR
# - Auto-generates CalVer version (YYYY.MM.BUILD) on main branch merges
# - Pushes to Docker Hub and creates git tag on main
# - Triggered by pushing a CalVer tag (e.g., 2026.3.11)
# - Builds Docker image and pushes to Docker Hub with version + production tags
# - Sends Discord notifications on success/failure
#
# To release: git tag 2026.3.11 && git push origin 2026.3.11
name: Build Docker Image
on:
push:
branches:
- main
- next-release
pull_request:
branches:
- main
tags:
- '20*' # matches CalVer tags like 2026.3.11
jobs:
build:
@ -25,7 +22,16 @@ jobs:
- name: Checkout code
uses: https://github.com/actions/checkout@v4
with:
fetch-depth: 0 # Full history for tag counting
fetch-depth: 0
- name: Extract version from tag
id: version
run: |
VERSION=${GITHUB_REF#refs/tags/}
SHA_SHORT=$(git rev-parse --short HEAD)
echo "version=$VERSION" >> $GITHUB_OUTPUT
echo "sha_short=$SHA_SHORT" >> $GITHUB_OUTPUT
echo "timestamp=$(date -u +%Y-%m-%dT%H:%M:%SZ)" >> $GITHUB_OUTPUT
- name: Set up Docker Buildx
uses: https://github.com/docker/setup-buildx-action@v3
@ -36,67 +42,47 @@ jobs:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Generate CalVer version
id: calver
uses: cal/gitea-actions/calver@main
- name: Resolve Docker tags
id: tags
uses: cal/gitea-actions/docker-tags@main
with:
image: manticorum67/paper-dynasty-discordapp
version: ${{ steps.calver.outputs.version }}
sha_short: ${{ steps.calver.outputs.sha_short }}
- name: Build and push Docker image
uses: https://github.com/docker/build-push-action@v5
with:
context: .
push: true
tags: ${{ steps.tags.outputs.tags }}
tags: |
manticorum67/paper-dynasty-discordapp:${{ steps.version.outputs.version }}
manticorum67/paper-dynasty-discordapp:production
cache-from: type=registry,ref=manticorum67/paper-dynasty-discordapp:buildcache
cache-to: type=registry,ref=manticorum67/paper-dynasty-discordapp:buildcache,mode=max
- name: Tag release
if: success() && steps.tags.outputs.channel == 'stable'
uses: cal/gitea-actions/gitea-tag@main
with:
version: ${{ steps.calver.outputs.version }}
token: ${{ github.token }}
- name: Build Summary
run: |
echo "## Docker Build Successful" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "**Channel:** \`${{ steps.tags.outputs.channel }}\`" >> $GITHUB_STEP_SUMMARY
echo "**Version:** \`${{ steps.version.outputs.version }}\`" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "**Image Tags:**" >> $GITHUB_STEP_SUMMARY
IFS=',' read -ra TAG_ARRAY <<< "${{ steps.tags.outputs.tags }}"
for tag in "${TAG_ARRAY[@]}"; do
echo "- \`${tag}\`" >> $GITHUB_STEP_SUMMARY
done
echo "- \`manticorum67/paper-dynasty-discordapp:${{ steps.version.outputs.version }}\`" >> $GITHUB_STEP_SUMMARY
echo "- \`manticorum67/paper-dynasty-discordapp:production\`" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "**Build Details:**" >> $GITHUB_STEP_SUMMARY
echo "- Branch: \`${{ steps.calver.outputs.branch }}\`" >> $GITHUB_STEP_SUMMARY
echo "- Commit: \`${{ github.sha }}\`" >> $GITHUB_STEP_SUMMARY
echo "- Timestamp: \`${{ steps.calver.outputs.timestamp }}\`" >> $GITHUB_STEP_SUMMARY
echo "- Commit: \`${{ steps.version.outputs.sha_short }}\`" >> $GITHUB_STEP_SUMMARY
echo "- Timestamp: \`${{ steps.version.outputs.timestamp }}\`" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "Pull with: \`docker pull manticorum67/paper-dynasty-discordapp:${{ steps.tags.outputs.primary_tag }}\`" >> $GITHUB_STEP_SUMMARY
echo "Pull with: \`docker pull manticorum67/paper-dynasty-discordapp:${{ steps.version.outputs.version }}\`" >> $GITHUB_STEP_SUMMARY
- name: Discord Notification - Success
if: success() && steps.tags.outputs.channel != 'dev'
if: success()
uses: cal/gitea-actions/discord-notify@main
with:
webhook_url: ${{ secrets.DISCORD_WEBHOOK }}
title: "Paper Dynasty Bot"
status: success
version: ${{ steps.calver.outputs.version }}
image_tag: ${{ steps.calver.outputs.version_sha }}
commit_sha: ${{ steps.calver.outputs.sha_short }}
timestamp: ${{ steps.calver.outputs.timestamp }}
version: ${{ steps.version.outputs.version }}
image_tag: ${{ steps.version.outputs.version }}
commit_sha: ${{ steps.version.outputs.sha_short }}
timestamp: ${{ steps.version.outputs.timestamp }}
- name: Discord Notification - Failure
if: failure() && steps.tags.outputs.channel != 'dev'
if: failure()
uses: cal/gitea-actions/discord-notify@main
with:
webhook_url: ${{ secrets.DISCORD_WEBHOOK }}

View File

@ -0,0 +1,31 @@
# Gitea Actions: Ruff Lint Check
#
# Runs ruff on every PR to main to catch violations before merge.
# Complements the local pre-commit hook — violations blocked here even if
# the developer bypassed the hook with --no-verify.
name: Ruff Lint
on:
pull_request:
branches:
- main
jobs:
lint:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: https://github.com/actions/checkout@v4
- name: Set up Python
uses: https://github.com/actions/setup-python@v5
with:
python-version: "3.12"
- name: Install ruff
run: pip install ruff
- name: Run ruff check
run: ruff check .

View File

@ -32,7 +32,7 @@ pip install -r requirements.txt # Install dependencies
- **Container**: `paper-dynasty_discord-app_1`
- **Image**: `manticorum67/paper-dynasty-discordapp`
- **Health**: `GET http://localhost:8080/health` (HTTP server in `health_server.py`)
- **Versioning**: CalVer (`YYYY.MM.BUILD`) — auto-generated on merge to `main`
- **Versioning**: CalVer (`YYYY.M.BUILD`) — manually tagged when ready to release
### Logs
- **Container logs**: `ssh sba-bots "docker logs --since 1h paper-dynasty_discord-app_1"`
@ -49,8 +49,9 @@ pip install -r requirements.txt # Install dependencies
- Health endpoint not responding → `health_server.py` runs on port 8080 inside the container
### CI/CD
Gitea Actions on PR to `main` — builds Docker image, auto-generates CalVer version on merge.
Ruff lint on PRs. Docker image built on CalVer tag push only.
```bash
# Release: git tag YYYY.M.BUILD && git push origin YYYY.M.BUILD
tea pulls create --repo cal/paper-dynasty --head <branch> --base main --title "title" --description "description"
```

View File

@ -17,7 +17,6 @@ DB_URL = (
if "prod" in ENV_DATABASE
else "https://pddev.manticorum.com/api"
)
PLAYER_CACHE = {}
logger = logging.getLogger("discord_app")

File diff suppressed because it is too large Load Diff

View File

@ -15,131 +15,161 @@ from api_calls import db_get, db_post, db_patch, db_delete, get_team_by_abbrev
from help_text import SHEET_SHARE_STEPS, HELP_SHEET_SCRIPTS
from helpers.constants import PD_PLAYERS, ALL_MLB_TEAMS
from helpers import (
get_team_by_owner, share_channel, get_role, get_cal_user, get_or_create_role,
display_cards, give_packs, get_all_pos, get_sheets, refresh_sheet,
post_ratings_guide, team_summary_embed, get_roster_sheet, Question, Confirm,
ButtonOptions, legal_channel, get_channel, create_channel, get_context_user
get_team_by_owner,
share_channel,
get_role,
get_cal_user,
get_or_create_role,
display_cards,
give_packs,
get_all_pos,
get_sheets,
refresh_sheet,
post_ratings_guide,
team_summary_embed,
get_roster_sheet,
Question,
Confirm,
ButtonOptions,
legal_channel,
get_channel,
create_channel,
get_context_user,
)
from api_calls import team_hash
from helpers.discord_utils import get_team_embed, send_to_channel
logger = logging.getLogger('discord_app')
logger = logging.getLogger("discord_app")
class TeamSetup(commands.Cog):
"""Team creation and Google Sheets integration functionality for Paper Dynasty."""
def __init__(self, bot):
self.bot = bot
@app_commands.command(name='newteam', description='Get your fresh team for a new season')
@app_commands.command(
name="newteam", description="Get your fresh team for a new season"
)
@app_commands.checks.has_any_role(PD_PLAYERS)
@app_commands.describe(
gm_name='The fictional name of your team\'s GM',
team_abbrev='2, 3, or 4 character abbreviation (e.g. WV, ATL, MAD)',
team_full_name='City/location and name (e.g. Baltimore Orioles)',
team_short_name='Name of team (e.g. Yankees)',
mlb_anchor_team='2 or 3 character abbreviation of your anchor MLB team (e.g. NYM, MKE)',
team_logo_url='[Optional] URL ending in .png or .jpg for your team logo',
color='[Optional] Hex color code to highlight your team'
gm_name="The fictional name of your team's GM",
team_abbrev="2, 3, or 4 character abbreviation (e.g. WV, ATL, MAD)",
team_full_name="City/location and name (e.g. Baltimore Orioles)",
team_short_name="Name of team (e.g. Yankees)",
mlb_anchor_team="2 or 3 character abbreviation of your anchor MLB team (e.g. NYM, MKE)",
team_logo_url="[Optional] URL ending in .png or .jpg for your team logo",
color="[Optional] Hex color code to highlight your team",
)
async def new_team_slash(
self, interaction: discord.Interaction, gm_name: str, team_abbrev: str, team_full_name: str,
team_short_name: str, mlb_anchor_team: str, team_logo_url: str = None, color: str = None):
self,
interaction: discord.Interaction,
gm_name: str,
team_abbrev: str,
team_full_name: str,
team_short_name: str,
mlb_anchor_team: str,
team_logo_url: str = None,
color: str = None,
):
owner_team = await get_team_by_owner(interaction.user.id)
current = await db_get('current')
current = await db_get("current")
# Check for existing team
if owner_team and not os.environ.get('TESTING'):
if owner_team and not os.environ.get("TESTING"):
await interaction.response.send_message(
f'Whoa there, bucko. I already have you down as GM of the {owner_team["sname"]}.'
f"Whoa there, bucko. I already have you down as GM of the {owner_team['sname']}."
)
return
# Check for duplicate team data
dupes = await db_get('teams', params=[('abbrev', team_abbrev)])
if dupes['count']:
dupes = await db_get("teams", params=[("abbrev", team_abbrev)])
if dupes["count"]:
await interaction.response.send_message(
f'Yikes! {team_abbrev.upper()} is a popular abbreviation - it\'s already in use by the '
f'{dupes["teams"][0]["sname"]}. No worries, though, you can run the `/newteam` command again to get '
f'started!'
f"Yikes! {team_abbrev.upper()} is a popular abbreviation - it's already in use by the "
f"{dupes['teams'][0]['sname']}. No worries, though, you can run the `/newteam` command again to get "
f"started!"
)
return
# Check for duplicate team data
dupes = await db_get('teams', params=[('lname', team_full_name)])
if dupes['count']:
dupes = await db_get("teams", params=[("lname", team_full_name)])
if dupes["count"]:
await interaction.response.send_message(
f'Yikes! {team_full_name.title()} is a popular name - it\'s already in use by '
f'{dupes["teams"][0]["abbrev"]}. No worries, though, you can run the `/newteam` command again to get '
f'started!'
f"Yikes! {team_full_name.title()} is a popular name - it's already in use by "
f"{dupes['teams'][0]['abbrev']}. No worries, though, you can run the `/newteam` command again to get "
f"started!"
)
return
# Get personal bot channel
hello_channel = discord.utils.get(
interaction.guild.text_channels,
name=f'hello-{interaction.user.name.lower()}'
name=f"hello-{interaction.user.name.lower()}",
)
if hello_channel:
op_ch = hello_channel
else:
op_ch = await create_channel(
interaction,
channel_name=f'hello-{interaction.user.name}',
category_name='Paper Dynasty Team',
channel_name=f"hello-{interaction.user.name}",
category_name="Paper Dynasty Team",
everyone_read=False,
read_send_members=[interaction.user]
read_send_members=[interaction.user],
)
await share_channel(op_ch, interaction.guild.me)
await share_channel(op_ch, interaction.user)
try:
poke_role = get_role(interaction, 'Pokétwo')
poke_role = get_role(interaction, "Pokétwo")
await share_channel(op_ch, poke_role, read_only=True)
except Exception as e:
logger.error(f'unable to share sheet with Poketwo')
logger.error(f"unable to share sheet with Poketwo")
await interaction.response.send_message(
f'Let\'s head down to your private channel: {op_ch.mention}',
ephemeral=True
f"Let's head down to your private channel: {op_ch.mention}", ephemeral=True
)
await op_ch.send(
f"Hey there, {interaction.user.mention}! I am Paper Domo - welcome to season "
f"{current['season']} of Paper Dynasty! We've got a lot of special updates in store for this "
f"season including live cards, throwback cards, and special events."
)
await op_ch.send(f'Hey there, {interaction.user.mention}! I am Paper Domo - welcome to season '
f'{current["season"]} of Paper Dynasty! We\'ve got a lot of special updates in store for this '
f'season including live cards, throwback cards, and special events.')
# Confirm user is happy with branding
embed = get_team_embed(
f'Branding Check',
f"Branding Check",
{
'logo': team_logo_url if team_logo_url else None,
'color': color if color else 'a6ce39',
'season': 4
}
"logo": team_logo_url if team_logo_url else None,
"color": color if color else "a6ce39",
"season": 4,
},
)
embed.add_field(name='GM Name', value=gm_name, inline=False)
embed.add_field(name='Full Team Name', value=team_full_name)
embed.add_field(name='Short Team Name', value=team_short_name)
embed.add_field(name='Team Abbrev', value=team_abbrev.upper())
embed.add_field(name="GM Name", value=gm_name, inline=False)
embed.add_field(name="Full Team Name", value=team_full_name)
embed.add_field(name="Short Team Name", value=team_short_name)
embed.add_field(name="Team Abbrev", value=team_abbrev.upper())
view = Confirm(responders=[interaction.user])
question = await op_ch.send('Are you happy with this branding? Don\'t worry - you can update it later!',
embed=embed, view=view)
question = await op_ch.send(
"Are you happy with this branding? Don't worry - you can update it later!",
embed=embed,
view=view,
)
await view.wait()
if not view.value:
await question.edit(
content='~~Are you happy with this branding?~~\n\nI gotta go, but when you\'re ready to start again '
'run the `/newteam` command again and we can get rolling! Hint: you can copy and paste the '
'command from last time and make edits.',
view=None
content="~~Are you happy with this branding?~~\n\nI gotta go, but when you're ready to start again "
"run the `/newteam` command again and we can get rolling! Hint: you can copy and paste the "
"command from last time and make edits.",
view=None,
)
return
await question.edit(
content='Looking good, champ in the making! Let\'s get you your starter team!',
view=None
content="Looking good, champ in the making! Let's get you your starter team!",
view=None,
)
team_choice = None
@ -147,26 +177,31 @@ class TeamSetup(commands.Cog):
team_choice = mlb_anchor_team.title()
else:
for x in ALL_MLB_TEAMS:
if mlb_anchor_team.upper() in ALL_MLB_TEAMS[x] or mlb_anchor_team.title() in ALL_MLB_TEAMS[x]:
if (
mlb_anchor_team.upper() in ALL_MLB_TEAMS[x]
or mlb_anchor_team.title() in ALL_MLB_TEAMS[x]
):
team_choice = x
break
team_string = mlb_anchor_team
logger.debug(f'team_string: {team_string} / team_choice: {team_choice}')
logger.debug(f"team_string: {team_string} / team_choice: {team_choice}")
if not team_choice:
# Get MLB anchor team
while True:
prompt = f'I don\'t recognize **{team_string}**. I try to recognize abbreviations (BAL), ' \
f'short names (Orioles), and long names ("Baltimore Orioles").\n\nWhat MLB club would you ' \
f'like to use as your anchor team?'
this_q = Question(self.bot, op_ch, prompt, 'text', 120)
prompt = (
f"I don't recognize **{team_string}**. I try to recognize abbreviations (BAL), "
f'short names (Orioles), and long names ("Baltimore Orioles").\n\nWhat MLB club would you '
f"like to use as your anchor team?"
)
this_q = Question(self.bot, op_ch, prompt, "text", 120)
team_string = await this_q.ask([interaction.user])
if not team_string:
await op_ch.send(
f'Tell you hwat. You think on it and come back I gotta go, but when you\'re ready to start again '
'run the `/newteam` command again and we can get rolling! Hint: you can copy and paste the '
'command from last time and make edits.'
f"Tell you hwat. You think on it and come back I gotta go, but when you're ready to start again "
"run the `/newteam` command again and we can get rolling! Hint: you can copy and paste the "
"command from last time and make edits."
)
return
@ -176,166 +211,257 @@ class TeamSetup(commands.Cog):
else:
match = False
for x in ALL_MLB_TEAMS:
if team_string.upper() in ALL_MLB_TEAMS[x] or team_string.title() in ALL_MLB_TEAMS[x]:
if (
team_string.upper() in ALL_MLB_TEAMS[x]
or team_string.title() in ALL_MLB_TEAMS[x]
):
team_choice = x
match = True
break
if not match:
await op_ch.send(f'Got it!')
await op_ch.send(f"Got it!")
team = await db_post('teams', payload={
'abbrev': team_abbrev.upper(),
'sname': team_short_name,
'lname': team_full_name,
'gmid': interaction.user.id,
'gmname': gm_name,
'gsheet': 'None',
'season': current['season'],
'wallet': 100,
'color': color if color else 'a6ce39',
'logo': team_logo_url if team_logo_url else None
})
team = await db_post(
"teams",
payload={
"abbrev": team_abbrev.upper(),
"sname": team_short_name,
"lname": team_full_name,
"gmid": interaction.user.id,
"gmname": gm_name,
"gsheet": "None",
"season": current["season"],
"wallet": 100,
"color": color if color else "a6ce39",
"logo": team_logo_url if team_logo_url else None,
},
)
if not team:
await op_ch.send(f'Frick. {get_cal_user(interaction).mention}, can you help? I can\'t find this team.')
await op_ch.send(
f"Frick. {get_cal_user(interaction).mention}, can you help? I can't find this team."
)
return
t_role = await get_or_create_role(interaction, f'{team_abbrev} - {team_full_name}')
t_role = await get_or_create_role(
interaction, f"{team_abbrev} - {team_full_name}"
)
await interaction.user.add_roles(t_role)
anchor_players = []
anchor_all_stars = await db_get(
'players/random',
"players/random",
params=[
('min_rarity', 3), ('max_rarity', 3), ('franchise', team_choice), ('pos_exclude', 'RP'), ('limit', 1),
('in_packs', True)
]
("min_rarity", 3),
("max_rarity", 3),
("franchise", team_choice),
("pos_exclude", "RP"),
("limit", 1),
("in_packs", True),
],
)
anchor_starters = await db_get(
'players/random',
"players/random",
params=[
('min_rarity', 2), ('max_rarity', 2), ('franchise', team_choice), ('pos_exclude', 'RP'), ('limit', 2),
('in_packs', True)
]
("min_rarity", 2),
("max_rarity", 2),
("franchise", team_choice),
("pos_exclude", "RP"),
("limit", 2),
("in_packs", True),
],
)
if not anchor_all_stars:
await op_ch.send(f'I am so sorry, but the {team_choice} do not have an All-Star to '
f'provide as your anchor player. Let\'s start this process over - will you please '
f'run the `/newteam` command again with a new MLB club?\nHint: you can copy and paste the '
'command from last time and make edits.')
await db_delete('teams', object_id=team['id'])
await op_ch.send(
f"I am so sorry, but the {team_choice} do not have an All-Star to "
f"provide as your anchor player. Let's start this process over - will you please "
f"run the `/newteam` command again with a new MLB club?\nHint: you can copy and paste the "
"command from last time and make edits."
)
await db_delete("teams", object_id=team["id"])
return
if not anchor_starters or anchor_starters['count'] <= 1:
await op_ch.send(f'I am so sorry, but the {team_choice} do not have two Starters to '
f'provide as your anchor players. Let\'s start this process over - will you please '
f'run the `/newteam` command again with a new MLB club?\nHint: you can copy and paste the '
'command from last time and make edits.')
await db_delete('teams', object_id=team['id'])
if not anchor_starters or anchor_starters["count"] <= 1:
await op_ch.send(
f"I am so sorry, but the {team_choice} do not have two Starters to "
f"provide as your anchor players. Let's start this process over - will you please "
f"run the `/newteam` command again with a new MLB club?\nHint: you can copy and paste the "
"command from last time and make edits."
)
await db_delete("teams", object_id=team["id"])
return
anchor_players.append(anchor_all_stars['players'][0])
anchor_players.append(anchor_starters['players'][0])
anchor_players.append(anchor_starters['players'][1])
anchor_players.append(anchor_all_stars["players"][0])
anchor_players.append(anchor_starters["players"][0])
anchor_players.append(anchor_starters["players"][1])
this_pack = await db_post('packs/one',
payload={'team_id': team['id'], 'pack_type_id': 2,
'open_time': datetime.datetime.timestamp(datetime.datetime.now())*1000})
this_pack = await db_post(
"packs/one",
payload={
"team_id": team["id"],
"pack_type_id": 2,
"open_time": datetime.datetime.timestamp(datetime.datetime.now())
* 1000,
},
)
roster_counts = {
'SP': 0,
'RP': 0,
'CP': 0,
'C': 0,
'1B': 0,
'2B': 0,
'3B': 0,
'SS': 0,
'LF': 0,
'CF': 0,
'RF': 0,
'DH': 0,
'All-Star': 0,
'Starter': 0,
'Reserve': 0,
'Replacement': 0,
"SP": 0,
"RP": 0,
"CP": 0,
"C": 0,
"1B": 0,
"2B": 0,
"3B": 0,
"SS": 0,
"LF": 0,
"CF": 0,
"RF": 0,
"DH": 0,
"All-Star": 0,
"Starter": 0,
"Reserve": 0,
"Replacement": 0,
}
def update_roster_counts(players: list):
for pl in players:
roster_counts[pl['rarity']['name']] += 1
roster_counts[pl["rarity"]["name"]] += 1
for x in get_all_pos(pl):
roster_counts[x] += 1
logger.warning(f'Roster counts for {team["sname"]}: {roster_counts}')
logger.warning(f"Roster counts for {team['sname']}: {roster_counts}")
# Add anchor position coverage
update_roster_counts(anchor_players)
await db_post('cards', payload={'cards': [
{'player_id': x['player_id'], 'team_id': team['id'], 'pack_id': this_pack['id']} for x in anchor_players]
}, timeout=10)
await db_post(
"cards",
payload={
"cards": [
{
"player_id": x["player_id"],
"team_id": team["id"],
"pack_id": this_pack["id"],
}
for x in anchor_players
]
},
timeout=10,
)
# Get 10 pitchers to seed team
five_sps = await db_get('players/random', params=[('pos_include', 'SP'), ('max_rarity', 1), ('limit', 5)])
five_rps = await db_get('players/random', params=[('pos_include', 'RP'), ('max_rarity', 1), ('limit', 5)])
team_sp = [x for x in five_sps['players']]
team_rp = [x for x in five_rps['players']]
five_sps = await db_get(
"players/random",
params=[("pos_include", "SP"), ("max_rarity", 1), ("limit", 5)],
)
five_rps = await db_get(
"players/random",
params=[("pos_include", "RP"), ("max_rarity", 1), ("limit", 5)],
)
team_sp = [x for x in five_sps["players"]]
team_rp = [x for x in five_rps["players"]]
update_roster_counts([*team_sp, *team_rp])
await db_post('cards', payload={'cards': [
{'player_id': x['player_id'], 'team_id': team['id'], 'pack_id': this_pack['id']} for x in [*team_sp, *team_rp]]
}, timeout=10)
await db_post(
"cards",
payload={
"cards": [
{
"player_id": x["player_id"],
"team_id": team["id"],
"pack_id": this_pack["id"],
}
for x in [*team_sp, *team_rp]
]
},
timeout=10,
)
# TODO: track reserve vs replacement and if rep < res, get rep, else get res
# Collect infielders
team_infielders = []
for pos in ['C', '1B', '2B', '3B', 'SS']:
max_rar = 1
if roster_counts['Replacement'] < roster_counts['Reserve']:
max_rar = 0
for pos in ["C", "1B", "2B", "3B", "SS"]:
if roster_counts["Replacement"] < roster_counts["Reserve"]:
rarity_params = [("min_rarity", 0), ("max_rarity", 0)]
else:
rarity_params = [("min_rarity", 1), ("max_rarity", 1)]
r_draw = await db_get(
'players/random', params=[('pos_include', pos), ('max_rarity', max_rar), ('limit', 2)], none_okay=False
"players/random",
params=[("pos_include", pos), *rarity_params, ("limit", 2)],
none_okay=False,
)
team_infielders.extend(r_draw['players'])
team_infielders.extend(r_draw["players"])
update_roster_counts(team_infielders)
await db_post('cards', payload={'cards': [
{'player_id': x['player_id'], 'team_id': team['id'], 'pack_id': this_pack['id']} for x in team_infielders]
}, timeout=10)
await db_post(
"cards",
payload={
"cards": [
{
"player_id": x["player_id"],
"team_id": team["id"],
"pack_id": this_pack["id"],
}
for x in team_infielders
]
},
timeout=10,
)
# Collect outfielders
team_outfielders = []
for pos in ['LF', 'CF', 'RF']:
max_rar = 1
if roster_counts['Replacement'] < roster_counts['Reserve']:
max_rar = 0
for pos in ["LF", "CF", "RF"]:
if roster_counts["Replacement"] < roster_counts["Reserve"]:
rarity_params = [("min_rarity", 0), ("max_rarity", 0)]
else:
rarity_params = [("min_rarity", 1), ("max_rarity", 1)]
r_draw = await db_get(
'players/random', params=[('pos_include', pos), ('max_rarity', max_rar), ('limit', 2)], none_okay=False
"players/random",
params=[("pos_include", pos), *rarity_params, ("limit", 2)],
none_okay=False,
)
team_outfielders.extend(r_draw['players'])
team_outfielders.extend(r_draw["players"])
update_roster_counts(team_outfielders)
await db_post('cards', payload={'cards': [
{'player_id': x['player_id'], 'team_id': team['id'], 'pack_id': this_pack['id']} for x in team_outfielders]
}, timeout=10)
await db_post(
"cards",
payload={
"cards": [
{
"player_id": x["player_id"],
"team_id": team["id"],
"pack_id": this_pack["id"],
}
for x in team_outfielders
]
},
timeout=10,
)
async with op_ch.typing():
done_anc = await display_cards(
[{'player': x, 'team': team} for x in anchor_players], team, op_ch, interaction.user, self.bot,
cust_message=f'Let\'s take a look at your three {team_choice} anchor players.\n'
f'Press `Close Pack` to continue.',
add_roster=False
[{"player": x, "team": team} for x in anchor_players],
team,
op_ch,
interaction.user,
self.bot,
cust_message=f"Let's take a look at your three {team_choice} anchor players.\n"
f"Press `Close Pack` to continue.",
add_roster=False,
)
error_text = f'Yikes - I can\'t display the rest of your team. {get_cal_user(interaction).mention} plz halp'
error_text = f"Yikes - I can't display the rest of your team. {get_cal_user(interaction).mention} plz halp"
if not done_anc:
await op_ch.send(error_text)
async with op_ch.typing():
done_sp = await display_cards(
[{'player': x, 'team': team} for x in team_sp], team, op_ch, interaction.user, self.bot,
cust_message=f'Here are your starting pitchers.\n'
f'Press `Close Pack` to continue.',
add_roster=False
[{"player": x, "team": team} for x in team_sp],
team,
op_ch,
interaction.user,
self.bot,
cust_message=f"Here are your starting pitchers.\n"
f"Press `Close Pack` to continue.",
add_roster=False,
)
if not done_sp:
@ -343,10 +469,14 @@ class TeamSetup(commands.Cog):
async with op_ch.typing():
done_rp = await display_cards(
[{'player': x, 'team': team} for x in team_rp], team, op_ch, interaction.user, self.bot,
cust_message=f'And now for your bullpen.\n'
f'Press `Close Pack` to continue.',
add_roster=False
[{"player": x, "team": team} for x in team_rp],
team,
op_ch,
interaction.user,
self.bot,
cust_message=f"And now for your bullpen.\n"
f"Press `Close Pack` to continue.",
add_roster=False,
)
if not done_rp:
@ -354,10 +484,14 @@ class TeamSetup(commands.Cog):
async with op_ch.typing():
done_inf = await display_cards(
[{'player': x, 'team': team} for x in team_infielders], team, op_ch, interaction.user, self.bot,
cust_message=f'Next let\'s take a look at your infielders.\n'
f'Press `Close Pack` to continue.',
add_roster=False
[{"player": x, "team": team} for x in team_infielders],
team,
op_ch,
interaction.user,
self.bot,
cust_message=f"Next let's take a look at your infielders.\n"
f"Press `Close Pack` to continue.",
add_roster=False,
)
if not done_inf:
@ -365,10 +499,14 @@ class TeamSetup(commands.Cog):
async with op_ch.typing():
done_out = await display_cards(
[{'player': x, 'team': team} for x in team_outfielders], team, op_ch, interaction.user, self.bot,
cust_message=f'Now let\'s take a look at your outfielders.\n'
f'Press `Close Pack` to continue.',
add_roster=False
[{"player": x, "team": team} for x in team_outfielders],
team,
op_ch,
interaction.user,
self.bot,
cust_message=f"Now let's take a look at your outfielders.\n"
f"Press `Close Pack` to continue.",
add_roster=False,
)
if not done_out:
@ -376,129 +514,154 @@ class TeamSetup(commands.Cog):
await give_packs(team, 1)
await op_ch.send(
f'To get you started, I\'ve spotted you 100₼ and a pack of cards. You can rip that with the '
f'`/open` command once your google sheet is set up!'
f"To get you started, I've spotted you 100₼ and a pack of cards. You can rip that with the "
f"`/open` command once your google sheet is set up!"
)
await op_ch.send(
f'{t_role.mention}\n\n'
f'There\'s your roster! We have one more step and you will be ready to play.\n\n{SHEET_SHARE_STEPS}\n\n'
f'{get_roster_sheet({"gsheet": current["gsheet_template"]})}'
f"{t_role.mention}\n\n"
f"There's your roster! We have one more step and you will be ready to play.\n\n{SHEET_SHARE_STEPS}\n\n"
f"{get_roster_sheet({'gsheet': current['gsheet_template']})}"
)
new_team_embed = await team_summary_embed(team, interaction, include_roster=False)
new_team_embed = await team_summary_embed(
team, interaction, include_roster=False
)
await send_to_channel(
self.bot, "pd-network-news", content='A new challenger approaches...', embed=new_team_embed
self.bot,
"pd-network-news",
content="A new challenger approaches...",
embed=new_team_embed,
)
@commands.hybrid_command(name='newsheet', help='Link a new team sheet with your team')
@commands.hybrid_command(
name="newsheet", help="Link a new team sheet with your team"
)
@commands.has_any_role(PD_PLAYERS)
async def share_sheet_command(
self, ctx, google_sheet_url: str, team_abbrev: Optional[str], copy_rosters: Optional[bool] = True):
self,
ctx,
google_sheet_url: str,
team_abbrev: Optional[str],
copy_rosters: Optional[bool] = True,
):
owner_team = await get_team_by_owner(get_context_user(ctx).id)
if not owner_team:
await ctx.send(f'I don\'t see a team for you, yet. You can sign up with the `/newteam` command!')
await ctx.send(
f"I don't see a team for you, yet. You can sign up with the `/newteam` command!"
)
return
team = owner_team
if team_abbrev and team_abbrev != owner_team['abbrev']:
if team_abbrev and team_abbrev != owner_team["abbrev"]:
if get_context_user(ctx).id != 258104532423147520:
await ctx.send(f'You can only update the team sheet for your own team, you goober.')
await ctx.send(
f"You can only update the team sheet for your own team, you goober."
)
return
else:
team = await get_team_by_abbrev(team_abbrev)
current = await db_get('current')
if current['gsheet_template'] in google_sheet_url:
await ctx.send(f'Ope, looks like that is the template sheet. Would you please make a copy and then share?')
current = await db_get("current")
if current["gsheet_template"] in google_sheet_url:
await ctx.send(
f"Ope, looks like that is the template sheet. Would you please make a copy and then share?"
)
return
gauntlet_team = await get_team_by_abbrev(f'Gauntlet-{owner_team["abbrev"]}')
gauntlet_team = await get_team_by_abbrev(f"Gauntlet-{owner_team['abbrev']}")
if gauntlet_team:
view = ButtonOptions([ctx.author], timeout=30, labels=['Main Team', 'Gauntlet Team', None, None, None])
question = await ctx.send(f'Is this sheet for your main PD team or your active Gauntlet team?', view=view)
view = ButtonOptions(
[ctx.author],
timeout=30,
labels=["Main Team", "Gauntlet Team", None, None, None],
)
question = await ctx.send(
f"Is this sheet for your main PD team or your active Gauntlet team?",
view=view,
)
await view.wait()
if not view.value:
await question.edit(
content=f'Okay you keep thinking on it and get back to me when you\'re ready.', view=None
content=f"Okay you keep thinking on it and get back to me when you're ready.",
view=None,
)
return
elif view.value == 'Gauntlet Team':
elif view.value == "Gauntlet Team":
await question.delete()
team = gauntlet_team
sheets = get_sheets(self.bot)
response = await ctx.send(f'I\'ll go grab that sheet...')
response = await ctx.send(f"I'll go grab that sheet...")
try:
new_sheet = sheets.open_by_url(google_sheet_url)
except Exception as e:
logger.error(f'Error accessing {team["abbrev"]} sheet: {e}')
current = await db_get('current')
await ctx.send(f'I wasn\'t able to access that sheet. Did you remember to share it with my PD email?'
f'\n\nHere\'s a quick refresher:\n{SHEET_SHARE_STEPS}\n\n'
f'{get_roster_sheet({"gsheet": current["gsheet_template"]})}')
logger.error(f"Error accessing {team['abbrev']} sheet: {e}")
current = await db_get("current")
await ctx.send(
f"I wasn't able to access that sheet. Did you remember to share it with my PD email?"
f"\n\nHere's a quick refresher:\n{SHEET_SHARE_STEPS}\n\n"
f"{get_roster_sheet({'gsheet': current['gsheet_template']})}"
)
return
team_data = new_sheet.worksheet_by_title('Team Data')
team_data = new_sheet.worksheet_by_title("Team Data")
if not gauntlet_team or owner_team != gauntlet_team:
team_data.update_values(
crange='B1:B2',
values=[[f'{team["id"]}'], [f'{team_hash(team)}']]
crange="B1:B2", values=[[f"{team['id']}"], [f"{team_hash(team)}"]]
)
if copy_rosters and team['gsheet'].lower() != 'none':
old_sheet = sheets.open_by_key(team['gsheet'])
r_sheet = old_sheet.worksheet_by_title(f'My Rosters')
roster_ids = r_sheet.range('B3:B80')
lineups_data = r_sheet.range('H4:M26')
if copy_rosters and team["gsheet"].lower() != "none":
old_sheet = sheets.open_by_key(team["gsheet"])
r_sheet = old_sheet.worksheet_by_title(f"My Rosters")
roster_ids = r_sheet.range("B3:B80")
lineups_data = r_sheet.range("H4:M26")
new_r_data, new_l_data = [], []
for row in roster_ids:
if row[0].value != '':
if row[0].value != "":
new_r_data.append([int(row[0].value)])
else:
new_r_data.append([None])
logger.debug(f'new_r_data: {new_r_data}')
logger.debug(f"new_r_data: {new_r_data}")
for row in lineups_data:
logger.debug(f'row: {row}')
new_l_data.append([
row[0].value if row[0].value != '' else None,
int(row[1].value) if row[1].value != '' else None,
row[2].value if row[2].value != '' else None,
int(row[3].value) if row[3].value != '' else None,
row[4].value if row[4].value != '' else None,
int(row[5].value) if row[5].value != '' else None
])
logger.debug(f'new_l_data: {new_l_data}')
logger.debug(f"row: {row}")
new_l_data.append(
[
row[0].value if row[0].value != "" else None,
int(row[1].value) if row[1].value != "" else None,
row[2].value if row[2].value != "" else None,
int(row[3].value) if row[3].value != "" else None,
row[4].value if row[4].value != "" else None,
int(row[5].value) if row[5].value != "" else None,
]
)
logger.debug(f"new_l_data: {new_l_data}")
new_r_sheet = new_sheet.worksheet_by_title(f'My Rosters')
new_r_sheet.update_values(
crange='B3:B80',
values=new_r_data
)
new_r_sheet.update_values(
crange='H4:M26',
values=new_l_data
)
new_r_sheet = new_sheet.worksheet_by_title(f"My Rosters")
new_r_sheet.update_values(crange="B3:B80", values=new_r_data)
new_r_sheet.update_values(crange="H4:M26", values=new_l_data)
if team['has_guide']:
if team["has_guide"]:
post_ratings_guide(team, self.bot, this_sheet=new_sheet)
team = await db_patch('teams', object_id=team['id'], params=[('gsheet', new_sheet.id)])
team = await db_patch(
"teams", object_id=team["id"], params=[("gsheet", new_sheet.id)]
)
await refresh_sheet(team, self.bot, sheets)
conf_message = f'Alright, your sheet is linked to your team - good luck'
conf_message = f"Alright, your sheet is linked to your team - good luck"
if owner_team == team:
conf_message += ' this season!'
conf_message += " this season!"
else:
conf_message += ' on your run!'
conf_message += f'\n\n{HELP_SHEET_SCRIPTS}'
await response.edit(content=f'{conf_message}')
conf_message += " on your run!"
conf_message += f"\n\n{HELP_SHEET_SCRIPTS}"
await response.edit(content=f"{conf_message}")
async def setup(bot):
"""Setup function for the TeamSetup cog."""
await bot.add_cog(TeamSetup(bot))
await bot.add_cog(TeamSetup(bot))

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -12,65 +12,89 @@ import datetime
from sqlmodel import Session
from api_calls import db_get, db_post, db_patch, db_delete, get_team_by_abbrev
from helpers import (
ACTIVE_EVENT_LITERAL, PD_PLAYERS_ROLE_NAME, get_team_embed, get_team_by_owner,
legal_channel, Confirm, send_to_channel
ACTIVE_EVENT_LITERAL,
PD_PLAYERS_ROLE_NAME,
get_team_embed,
get_team_by_owner,
legal_channel,
Confirm,
send_to_channel,
)
from helpers.utils import get_roster_sheet, get_cal_user
from utilities.buttons import ask_with_buttons
from in_game.gameplay_models import engine
from in_game.gameplay_queries import get_team_or_none
logger = logging.getLogger('discord_app')
logger = logging.getLogger("discord_app")
# Try to import gauntlets module, provide fallback if not available
try:
import gauntlets
GAUNTLETS_AVAILABLE = True
except ImportError:
logger.warning("Gauntlets module not available - gauntlet commands will have limited functionality")
logger.warning(
"Gauntlets module not available - gauntlet commands will have limited functionality"
)
GAUNTLETS_AVAILABLE = False
gauntlets = None
class Gauntlet(commands.Cog):
"""Gauntlet game mode functionality for Paper Dynasty."""
def __init__(self, bot):
self.bot = bot
group_gauntlet = app_commands.Group(name='gauntlets', description='Check your progress or start a new Gauntlet')
group_gauntlet = app_commands.Group(
name="gauntlets", description="Check your progress or start a new Gauntlet"
)
@group_gauntlet.command(name='status', description='View status of current Gauntlet run')
@group_gauntlet.command(
name="status", description="View status of current Gauntlet run"
)
@app_commands.describe(
team_abbrev='To check the status of a team\'s active run, enter their abbreviation'
team_abbrev="To check the status of a team's active run, enter their abbreviation"
)
@app_commands.checks.has_any_role(PD_PLAYERS_ROLE_NAME)
async def gauntlet_run_command(
self, interaction: discord.Interaction, event_name: ACTIVE_EVENT_LITERAL, # type: ignore
team_abbrev: Optional[str] = None):
self,
interaction: discord.Interaction,
event_name: ACTIVE_EVENT_LITERAL, # type: ignore
team_abbrev: Optional[str] = None,
):
"""View status of current gauntlet run - corrected to match original business logic."""
await interaction.response.defer()
e_query = await db_get('events', params=[("name", event_name), ("active", True)])
if not e_query or e_query.get('count', 0) == 0:
await interaction.edit_original_response(content=f'Hmm...looks like that event is inactive.')
e_query = await db_get(
"events", params=[("name", event_name), ("active", True)]
)
if not e_query or e_query.get("count", 0) == 0:
await interaction.edit_original_response(
content=f"Hmm...looks like that event is inactive."
)
return
else:
this_event = e_query['events'][0]
this_event = e_query["events"][0]
this_run, this_team = None, None
if team_abbrev:
if 'Gauntlet-' not in team_abbrev:
team_abbrev = f'Gauntlet-{team_abbrev}'
t_query = await db_get('teams', params=[('abbrev', team_abbrev)])
if t_query and t_query.get('count', 0) != 0:
this_team = t_query['teams'][0]
r_query = await db_get('gauntletruns', params=[
('team_id', this_team['id']), ('is_active', True), ('gauntlet_id', this_event['id'])
])
if "Gauntlet-" not in team_abbrev:
team_abbrev = f"Gauntlet-{team_abbrev}"
t_query = await db_get("teams", params=[("abbrev", team_abbrev)])
if t_query and t_query.get("count", 0) != 0:
this_team = t_query["teams"][0]
r_query = await db_get(
"gauntletruns",
params=[
("team_id", this_team["id"]),
("is_active", True),
("gauntlet_id", this_event["id"]),
],
)
if r_query and r_query.get('count', 0) != 0:
this_run = r_query['runs'][0]
if r_query and r_query.get("count", 0) != 0:
this_run = r_query["runs"][0]
else:
await interaction.edit_original_response(
content=f'I do not see an active run for the {this_team["lname"]}.'
@ -78,7 +102,7 @@ class Gauntlet(commands.Cog):
return
else:
await interaction.edit_original_response(
content=f'I do not see an active run for {team_abbrev.upper()}.'
content=f"I do not see an active run for {team_abbrev.upper()}."
)
return
@ -86,127 +110,168 @@ class Gauntlet(commands.Cog):
if GAUNTLETS_AVAILABLE and gauntlets:
await interaction.edit_original_response(
content=None,
embed=await gauntlets.get_embed(this_run, this_event, this_team) # type: ignore
embed=await gauntlets.get_embed(this_run, this_event, this_team), # type: ignore
)
else:
await interaction.edit_original_response(
content='Gauntlet status unavailable - gauntlets module not loaded.'
content="Gauntlet status unavailable - gauntlets module not loaded."
)
@group_gauntlet.command(name='start', description='Start a new Gauntlet run')
@group_gauntlet.command(name="start", description="Start a new Gauntlet run")
@app_commands.checks.has_any_role(PD_PLAYERS_ROLE_NAME)
async def gauntlet_start_command(self, interaction: discord.Interaction):
"""Start a new gauntlet run."""
# Channel restriction - must be in a 'hello' channel (private channel)
if interaction.channel and hasattr(interaction.channel, 'name') and 'hello' not in str(interaction.channel.name):
if (
interaction.channel
and hasattr(interaction.channel, "name")
and "hello" not in str(interaction.channel.name)
):
await interaction.response.send_message(
content='The draft will probably take you about 15 minutes. Why don\'t you head to your private '
'channel to run the draft?',
ephemeral=True
content="The draft will probably take you about 15 minutes. Why don't you head to your private "
"channel to run the draft?",
ephemeral=True,
)
return
logger.info(f'Starting a gauntlet run for user {interaction.user.name}')
logger.info(f"Starting a gauntlet run for user {interaction.user.name}")
await interaction.response.defer()
with Session(engine) as session:
main_team = await get_team_or_none(session, gm_id=interaction.user.id, main_team=True)
draft_team = await get_team_or_none(session, gm_id=interaction.user.id, gauntlet_team=True)
main_team = await get_team_or_none(
session, gm_id=interaction.user.id, main_team=True
)
draft_team = await get_team_or_none(
session, gm_id=interaction.user.id, gauntlet_team=True
)
# Get active events
e_query = await db_get('events', params=[("active", True)])
if not e_query or e_query.get('count', 0) == 0:
await interaction.edit_original_response(content='Hmm...I don\'t see any active events.')
e_query = await db_get("events", params=[("active", True)])
if not e_query or e_query.get("count", 0) == 0:
await interaction.edit_original_response(
content="Hmm...I don't see any active events."
)
return
elif e_query.get('count', 0) == 1:
this_event = e_query['events'][0]
elif e_query.get("count", 0) == 1:
this_event = e_query["events"][0]
else:
event_choice = await ask_with_buttons(
interaction,
button_options=[x['name'] for x in e_query['events']],
question='Which event would you like to take on?',
button_options=[x["name"] for x in e_query["events"]],
question="Which event would you like to take on?",
timeout=3,
delete_question=False
delete_question=False,
)
this_event = [event for event in e_query['events'] if event['name'] == event_choice][0]
logger.info(f'this_event: {this_event}')
this_event = [
event
for event in e_query["events"]
if event["name"] == event_choice
][0]
logger.info(f"this_event: {this_event}")
first_flag = draft_team is None
if draft_team is not None:
r_query = await db_get(
'gauntletruns',
params=[('team_id', draft_team.id), ('gauntlet_id', this_event['id']), ('is_active', True)]
"gauntletruns",
params=[
("team_id", draft_team.id),
("gauntlet_id", this_event["id"]),
("is_active", True),
],
)
if r_query and r_query.get('count', 0) != 0:
if r_query and r_query.get("count", 0) != 0:
await interaction.edit_original_response(
content=f'Looks like you already have a {r_query["runs"][0]["gauntlet"]["name"]} run active! '
f'You can check it out with the `/gauntlets status` command.'
f"You can check it out with the `/gauntlets status` command."
)
return
try:
draft_embed = await gauntlets.run_draft(interaction, main_team, this_event, draft_team) # type: ignore
draft_embed = await gauntlets.run_draft(interaction, main_team, this_event, draft_team) # type: ignore
except ZeroDivisionError as e:
logger.error(
f'ZeroDivisionError in {this_event["name"]} draft for the {main_team.sname if main_team else "unknown"}: {e}'
)
await gauntlets.wipe_team(draft_team, interaction) # type: ignore
await interaction.followup.send(
content=f"Shoot - it looks like we ran into an issue running the draft. I had to clear it all out "
f"for now. I let {get_cal_user(interaction).mention} know what happened so he better "
f"fix it quick."
)
return
except Exception as e:
logger.error(f'Failed to run {this_event["name"]} draft for the {main_team.sname if main_team else "unknown"}: {e}')
await gauntlets.wipe_team(draft_team, interaction) # type: ignore
logger.error(
f'Failed to run {this_event["name"]} draft for the {main_team.sname if main_team else "unknown"}: {e}'
)
await gauntlets.wipe_team(draft_team, interaction) # type: ignore
await interaction.followup.send(
content=f'Shoot - it looks like we ran into an issue running the draft. I had to clear it all out '
f'for now. I let {get_cal_user(interaction).mention} know what happened so he better '
f'fix it quick.'
content=f"Shoot - it looks like we ran into an issue running the draft. I had to clear it all out "
f"for now. I let {get_cal_user(interaction).mention} know what happened so he better "
f"fix it quick."
)
return
if first_flag:
await interaction.followup.send(
f'Good luck, champ in the making! To start playing, follow these steps:\n\n'
f'1) Make a copy of the Team Sheet Template found in `/help-pd links`\n'
f'2) Run `/newsheet` to link it to your Gauntlet team\n'
f"Good luck, champ in the making! To start playing, follow these steps:\n\n"
f"1) Make a copy of the Team Sheet Template found in `/help-pd links`\n"
f"2) Run `/newsheet` to link it to your Gauntlet team\n"
f'3) Go play your first game with `/new-game gauntlet {this_event["name"]}`'
)
else:
await interaction.followup.send(
f'Good luck, champ in the making! In your team sheet, sync your cards with **Paper Dynasty** -> '
f'**Data Imports** -> **My Cards** then you can set your lineup here and you\'ll be ready to go!\n\n'
f'{get_roster_sheet(draft_team)}'
f"Good luck, champ in the making! In your team sheet, sync your cards with **Paper Dynasty** -> "
f"**Data Imports** -> **My Cards** then you can set your lineup here and you'll be ready to go!\n\n"
f"{get_roster_sheet(draft_team)}"
)
await send_to_channel(
bot=self.bot,
channel_name='pd-news-ticker',
channel_name="pd-news-ticker",
content=f'The {main_team.lname if main_team else "Unknown Team"} have entered the {this_event["name"]} Gauntlet!',
embed=draft_embed
embed=draft_embed,
)
@group_gauntlet.command(name='reset', description='Wipe your current team so you can re-draft')
@group_gauntlet.command(
name="reset", description="Wipe your current team so you can re-draft"
)
@app_commands.checks.has_any_role(PD_PLAYERS_ROLE_NAME)
async def gauntlet_reset_command(self, interaction: discord.Interaction, event_name: ACTIVE_EVENT_LITERAL): # type: ignore
async def gauntlet_reset_command(self, interaction: discord.Interaction, event_name: ACTIVE_EVENT_LITERAL): # type: ignore
"""Reset current gauntlet run."""
await interaction.response.defer()
main_team = await get_team_by_owner(interaction.user.id)
draft_team = await get_team_by_abbrev(f'Gauntlet-{main_team["abbrev"]}')
if draft_team is None:
await interaction.edit_original_response(
content='Hmm, I can\'t find a gauntlet team for you. Have you signed up already?')
content="Hmm, I can't find a gauntlet team for you. Have you signed up already?"
)
return
e_query = await db_get('events', params=[("name", event_name), ("active", True)])
if e_query['count'] == 0:
await interaction.edit_original_response(content='Hmm...looks like that event is inactive.')
e_query = await db_get(
"events", params=[("name", event_name), ("active", True)]
)
if e_query["count"] == 0:
await interaction.edit_original_response(
content="Hmm...looks like that event is inactive."
)
return
else:
this_event = e_query['events'][0]
this_event = e_query["events"][0]
r_query = await db_get('gauntletruns', params=[
('team_id', draft_team['id']), ('is_active', True), ('gauntlet_id', this_event['id'])
])
r_query = await db_get(
"gauntletruns",
params=[
("team_id", draft_team["id"]),
("is_active", True),
("gauntlet_id", this_event["id"]),
],
)
if r_query and r_query.get('count', 0) != 0:
this_run = r_query['runs'][0]
if r_query and r_query.get("count", 0) != 0:
this_run = r_query["runs"][0]
else:
await interaction.edit_original_response(
content=f'I do not see an active run for the {draft_team["lname"]}.'
@ -214,27 +279,24 @@ class Gauntlet(commands.Cog):
return
view = Confirm(responders=[interaction.user], timeout=60)
conf_string = f'Are you sure you want to wipe your active run?'
await interaction.edit_original_response(
content=conf_string,
view=view
)
conf_string = f"Are you sure you want to wipe your active run?"
await interaction.edit_original_response(content=conf_string, view=view)
await view.wait()
if view.value:
await gauntlets.end_run(this_run, this_event, draft_team, force_end=True) # type: ignore
await gauntlets.end_run(this_run, this_event, draft_team, force_end=True) # type: ignore
await interaction.edit_original_response(
content=f'Your {event_name} run has been reset. Run `/gauntlets start` to redraft!',
view=None
content=f"Your {event_name} run has been reset. Run `/gauntlets start` to redraft!",
view=None,
)
else:
await interaction.edit_original_response(
content=f'~~{conf_string}~~\n\nNo worries, I will leave it active.',
view=None
content=f"~~{conf_string}~~\n\nNo worries, I will leave it active.",
view=None,
)
async def setup(bot):
"""Setup function for the Gauntlet cog."""
await bot.add_cog(Gauntlet(bot))
await bot.add_cog(Gauntlet(bot))

View File

@ -1266,7 +1266,7 @@ async def checks_log_interaction(
f"Hm, I was not able to find a gauntlet team for you."
)
if not owner_team.id in [this_game.away_team_id, this_game.home_team_id]:
if owner_team.id not in [this_game.away_team_id, this_game.home_team_id]:
if interaction.user.id != 258104532423147520:
logger.exception(
f"{interaction.user.display_name} tried to run a command in Game {this_game.id} when they aren't a GM in the game."
@ -4295,32 +4295,29 @@ async def complete_game(
else this_game.away_team
)
db_game = None
try:
db_game = await db_post("games", payload=game_data)
db_ready_plays = get_db_ready_plays(session, this_game, db_game["id"])
db_ready_decisions = get_db_ready_decisions(session, this_game, db_game["id"])
except Exception as e:
await roll_back(db_game["id"])
if db_game is not None:
await roll_back(db_game["id"])
log_exception(e, msg="Unable to post game to API, rolling back")
# Post game stats to API
try:
resp = await db_post("plays", payload=db_ready_plays)
await db_post("plays", payload=db_ready_plays)
except Exception as e:
await roll_back(db_game["id"], plays=True)
log_exception(e, msg="Unable to post plays to API, rolling back")
if len(resp) > 0:
pass
try:
resp = await db_post("decisions", payload={"decisions": db_ready_decisions})
await db_post("decisions", payload={"decisions": db_ready_decisions})
except Exception as e:
await roll_back(db_game["id"], plays=True, decisions=True)
log_exception(e, msg="Unable to post decisions to API, rolling back")
if len(resp) > 0:
pass
# Post game rewards (gauntlet and main team)
try:

File diff suppressed because it is too large Load Diff

View File

@ -11,7 +11,7 @@ import logging
import discord
from api_calls import db_get, db_patch, db_post
from api_calls import db_get, db_post
from helpers.main import get_team_by_owner, get_card_embeds
from helpers.scouting import (
SCOUT_TOKEN_COST,
@ -340,9 +340,7 @@ class BuyScoutTokenView(discord.ui.View):
# Deduct currency
new_wallet = team["wallet"] - SCOUT_TOKEN_COST
try:
await db_patch(
"teams", object_id=team["id"], params=[("wallet", new_wallet)]
)
await db_post(f'teams/{team["id"]}/money/-{SCOUT_TOKEN_COST}')
except Exception as e:
logger.error(f"Failed to deduct scout token cost: {e}")
await interaction.response.edit_message(

View File

@ -1,252 +0,0 @@
"""
Discord Utilities
This module contains Discord helper functions for channels, roles, embeds,
and other Discord-specific operations.
"""
import logging
import os
import asyncio
from typing import Optional
import discord
from discord.ext import commands
from helpers.constants import SBA_COLOR, PD_SEASON, IMAGES
logger = logging.getLogger('discord_app')
async def send_to_bothole(ctx, content, embed):
"""Send a message to the pd-bot-hole channel."""
await discord.utils.get(ctx.guild.text_channels, name='pd-bot-hole') \
.send(content=content, embed=embed)
async def send_to_news(ctx, content, embed):
"""Send a message to the pd-news-ticker channel."""
await discord.utils.get(ctx.guild.text_channels, name='pd-news-ticker') \
.send(content=content, embed=embed)
async def typing_pause(ctx, seconds=1):
"""Show typing indicator for specified seconds."""
async with ctx.typing():
await asyncio.sleep(seconds)
async def pause_then_type(ctx, message):
"""Show typing indicator based on message length, then send message."""
async with ctx.typing():
await asyncio.sleep(len(message) / 100)
await ctx.send(message)
async def check_if_pdhole(ctx):
"""Check if the current channel is pd-bot-hole."""
if ctx.message.channel.name != 'pd-bot-hole':
await ctx.send('Slide on down to my bot-hole for running commands.')
await ctx.message.add_reaction('')
return False
return True
async def bad_channel(ctx):
"""Check if current channel is in the list of bad channels for commands."""
bad_channels = ['paper-dynasty-chat', 'pd-news-ticker']
if ctx.message.channel.name in bad_channels:
await ctx.message.add_reaction('')
bot_hole = discord.utils.get(
ctx.guild.text_channels,
name=f'pd-bot-hole'
)
await ctx.send(f'Slide on down to the {bot_hole.mention} ;)')
return True
else:
return False
def get_channel(ctx, name) -> Optional[discord.TextChannel]:
"""Get a text channel by name."""
# Handle both Context and Interaction objects
guild = ctx.guild if hasattr(ctx, 'guild') else None
if not guild:
return None
channel = discord.utils.get(
guild.text_channels,
name=name
)
if channel:
return channel
return None
async def get_emoji(ctx, name, return_empty=True):
"""Get an emoji by name, with fallback options."""
try:
emoji = await commands.converter.EmojiConverter().convert(ctx, name)
except:
if return_empty:
emoji = ''
else:
return name
return emoji
async def react_and_reply(ctx, reaction, message):
"""Add a reaction to the message and send a reply."""
await ctx.message.add_reaction(reaction)
await ctx.send(message)
async def send_to_channel(bot, channel_name, content=None, embed=None):
"""Send a message to a specific channel by name or ID."""
guild = bot.get_guild(int(os.environ.get('GUILD_ID')))
if not guild:
logger.error('Cannot send to channel - bot not logged in')
return
this_channel = discord.utils.get(guild.text_channels, name=channel_name)
if not this_channel:
this_channel = discord.utils.get(guild.text_channels, id=channel_name)
if not this_channel:
raise NameError(f'**{channel_name}** channel not found')
return await this_channel.send(content=content, embed=embed)
async def get_or_create_role(ctx, role_name, mentionable=True):
"""Get an existing role or create it if it doesn't exist."""
this_role = discord.utils.get(ctx.guild.roles, name=role_name)
if not this_role:
this_role = await ctx.guild.create_role(name=role_name, mentionable=mentionable)
return this_role
def get_special_embed(special):
"""Create an embed for a special item."""
embed = discord.Embed(title=f'{special.name} - Special #{special.get_id()}',
color=discord.Color.random(),
description=f'{special.short_desc}')
embed.add_field(name='Description', value=f'{special.long_desc}', inline=False)
if special.thumbnail.lower() != 'none':
embed.set_thumbnail(url=f'{special.thumbnail}')
if special.url.lower() != 'none':
embed.set_image(url=f'{special.url}')
return embed
def get_random_embed(title, thumb=None):
"""Create a basic embed with random color."""
embed = discord.Embed(title=title, color=discord.Color.random())
if thumb:
embed.set_thumbnail(url=thumb)
return embed
def get_team_embed(title, team=None, thumbnail: bool = True):
"""Create a team-branded embed."""
if team:
embed = discord.Embed(
title=title,
color=int(team["color"], 16) if team["color"] else int(SBA_COLOR, 16)
)
embed.set_footer(text=f'Paper Dynasty Season {team["season"]}', icon_url=IMAGES['logo'])
if thumbnail:
embed.set_thumbnail(url=team["logo"] if team["logo"] else IMAGES['logo'])
else:
embed = discord.Embed(
title=title,
color=int(SBA_COLOR, 16)
)
embed.set_footer(text=f'Paper Dynasty Season {PD_SEASON}', icon_url=IMAGES['logo'])
if thumbnail:
embed.set_thumbnail(url=IMAGES['logo'])
return embed
async def create_channel_old(
ctx, channel_name: str, category_name: str, everyone_send=False, everyone_read=True, allowed_members=None,
allowed_roles=None):
"""Create a text channel with specified permissions (legacy version)."""
this_category = discord.utils.get(ctx.guild.categories, name=category_name)
if not this_category:
raise ValueError(f'I couldn\'t find a category named **{category_name}**')
overwrites = {
ctx.guild.me: discord.PermissionOverwrite(read_messages=True, send_messages=True),
ctx.guild.default_role: discord.PermissionOverwrite(read_messages=everyone_read, send_messages=everyone_send)
}
if allowed_members:
if isinstance(allowed_members, list):
for member in allowed_members:
overwrites[member] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
if allowed_roles:
if isinstance(allowed_roles, list):
for role in allowed_roles:
overwrites[role] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
this_channel = await ctx.guild.create_text_channel(
channel_name,
overwrites=overwrites,
category=this_category
)
logger.info(f'Creating channel ({channel_name}) in ({category_name})')
return this_channel
async def create_channel(
ctx, channel_name: str, category_name: str, everyone_send=False, everyone_read=True,
read_send_members: list = None, read_send_roles: list = None, read_only_roles: list = None):
"""Create a text channel with specified permissions."""
# Handle both Context and Interaction objects
guild = ctx.guild if hasattr(ctx, 'guild') else None
if not guild:
raise ValueError(f'Unable to access guild from context object')
# Get bot member - different for Context vs Interaction
if hasattr(ctx, 'me'): # Context object
bot_member = ctx.me
elif hasattr(ctx, 'client'): # Interaction object
bot_member = guild.get_member(ctx.client.user.id)
else:
# Fallback - try to find bot member by getting the first member with bot=True
bot_member = next((m for m in guild.members if m.bot), None)
if not bot_member:
raise ValueError(f'Unable to find bot member in guild')
this_category = discord.utils.get(guild.categories, name=category_name)
if not this_category:
raise ValueError(f'I couldn\'t find a category named **{category_name}**')
overwrites = {
bot_member: discord.PermissionOverwrite(read_messages=True, send_messages=True),
guild.default_role: discord.PermissionOverwrite(read_messages=everyone_read, send_messages=everyone_send)
}
if read_send_members:
for member in read_send_members:
overwrites[member] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
if read_send_roles:
for role in read_send_roles:
overwrites[role] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
if read_only_roles:
for role in read_only_roles:
overwrites[role] = discord.PermissionOverwrite(read_messages=True, send_messages=False)
this_channel = await guild.create_text_channel(
channel_name,
overwrites=overwrites,
category=this_category
)
logger.info(f'Creating channel ({channel_name}) in ({category_name})')
return this_channel

2153
helpers.py

File diff suppressed because it is too large Load Diff

View File

@ -4,6 +4,7 @@ Discord Utilities
This module contains Discord helper functions for channels, roles, embeds,
and other Discord-specific operations.
"""
import logging
import os
import asyncio
@ -13,19 +14,21 @@ import discord
from discord.ext import commands
from helpers.constants import SBA_COLOR, PD_SEASON, IMAGES
logger = logging.getLogger('discord_app')
logger = logging.getLogger("discord_app")
async def send_to_bothole(ctx, content, embed):
"""Send a message to the pd-bot-hole channel."""
await discord.utils.get(ctx.guild.text_channels, name='pd-bot-hole') \
.send(content=content, embed=embed)
await discord.utils.get(ctx.guild.text_channels, name="pd-bot-hole").send(
content=content, embed=embed
)
async def send_to_news(ctx, content, embed):
"""Send a message to the pd-news-ticker channel."""
await discord.utils.get(ctx.guild.text_channels, name='pd-news-ticker') \
.send(content=content, embed=embed)
await discord.utils.get(ctx.guild.text_channels, name="pd-news-ticker").send(
content=content, embed=embed
)
async def typing_pause(ctx, seconds=1):
@ -43,23 +46,20 @@ async def pause_then_type(ctx, message):
async def check_if_pdhole(ctx):
"""Check if the current channel is pd-bot-hole."""
if ctx.message.channel.name != 'pd-bot-hole':
await ctx.send('Slide on down to my bot-hole for running commands.')
await ctx.message.add_reaction('')
if ctx.message.channel.name != "pd-bot-hole":
await ctx.send("Slide on down to my bot-hole for running commands.")
await ctx.message.add_reaction("")
return False
return True
async def bad_channel(ctx):
"""Check if current channel is in the list of bad channels for commands."""
bad_channels = ['paper-dynasty-chat', 'pd-news-ticker']
bad_channels = ["paper-dynasty-chat", "pd-news-ticker"]
if ctx.message.channel.name in bad_channels:
await ctx.message.add_reaction('')
bot_hole = discord.utils.get(
ctx.guild.text_channels,
name=f'pd-bot-hole'
)
await ctx.send(f'Slide on down to the {bot_hole.mention} ;)')
await ctx.message.add_reaction("")
bot_hole = discord.utils.get(ctx.guild.text_channels, name=f"pd-bot-hole")
await ctx.send(f"Slide on down to the {bot_hole.mention} ;)")
return True
else:
return False
@ -68,14 +68,11 @@ async def bad_channel(ctx):
def get_channel(ctx, name) -> Optional[discord.TextChannel]:
"""Get a text channel by name."""
# Handle both Context and Interaction objects
guild = ctx.guild if hasattr(ctx, 'guild') else None
guild = ctx.guild if hasattr(ctx, "guild") else None
if not guild:
return None
channel = discord.utils.get(
guild.text_channels,
name=name
)
channel = discord.utils.get(guild.text_channels, name=name)
if channel:
return channel
return None
@ -87,7 +84,7 @@ async def get_emoji(ctx, name, return_empty=True):
emoji = await commands.converter.EmojiConverter().convert(ctx, name)
except:
if return_empty:
emoji = ''
emoji = ""
else:
return name
return emoji
@ -101,9 +98,13 @@ async def react_and_reply(ctx, reaction, message):
async def send_to_channel(bot, channel_name, content=None, embed=None):
"""Send a message to a specific channel by name or ID."""
guild = bot.get_guild(int(os.environ.get('GUILD_ID')))
guild_id = os.environ.get("GUILD_ID")
if not guild_id:
logger.error("GUILD_ID env var is not set")
return
guild = bot.get_guild(int(guild_id))
if not guild:
logger.error('Cannot send to channel - bot not logged in')
logger.error("Cannot send to channel - bot not logged in")
return
this_channel = discord.utils.get(guild.text_channels, name=channel_name)
@ -111,7 +112,7 @@ async def send_to_channel(bot, channel_name, content=None, embed=None):
if not this_channel:
this_channel = discord.utils.get(guild.text_channels, id=channel_name)
if not this_channel:
raise NameError(f'**{channel_name}** channel not found')
raise NameError(f"**{channel_name}** channel not found")
return await this_channel.send(content=content, embed=embed)
@ -128,14 +129,16 @@ async def get_or_create_role(ctx, role_name, mentionable=True):
def get_special_embed(special):
"""Create an embed for a special item."""
embed = discord.Embed(title=f'{special.name} - Special #{special.get_id()}',
color=discord.Color.random(),
description=f'{special.short_desc}')
embed.add_field(name='Description', value=f'{special.long_desc}', inline=False)
if special.thumbnail.lower() != 'none':
embed.set_thumbnail(url=f'{special.thumbnail}')
if special.url.lower() != 'none':
embed.set_image(url=f'{special.url}')
embed = discord.Embed(
title=f"{special.name} - Special #{special.get_id()}",
color=discord.Color.random(),
description=f"{special.short_desc}",
)
embed.add_field(name="Description", value=f"{special.long_desc}", inline=False)
if special.thumbnail.lower() != "none":
embed.set_thumbnail(url=f"{special.thumbnail}")
if special.url.lower() != "none":
embed.set_image(url=f"{special.url}")
return embed
@ -154,99 +157,125 @@ def get_team_embed(title, team=None, thumbnail: bool = True):
if team:
embed = discord.Embed(
title=title,
color=int(team["color"], 16) if team["color"] else int(SBA_COLOR, 16)
color=int(team["color"], 16) if team["color"] else int(SBA_COLOR, 16),
)
embed.set_footer(
text=f'Paper Dynasty Season {team["season"]}', icon_url=IMAGES["logo"]
)
embed.set_footer(text=f'Paper Dynasty Season {team["season"]}', icon_url=IMAGES['logo'])
if thumbnail:
embed.set_thumbnail(url=team["logo"] if team["logo"] else IMAGES['logo'])
embed.set_thumbnail(url=team["logo"] if team["logo"] else IMAGES["logo"])
else:
embed = discord.Embed(
title=title,
color=int(SBA_COLOR, 16)
embed = discord.Embed(title=title, color=int(SBA_COLOR, 16))
embed.set_footer(
text=f"Paper Dynasty Season {PD_SEASON}", icon_url=IMAGES["logo"]
)
embed.set_footer(text=f'Paper Dynasty Season {PD_SEASON}', icon_url=IMAGES['logo'])
if thumbnail:
embed.set_thumbnail(url=IMAGES['logo'])
embed.set_thumbnail(url=IMAGES["logo"])
return embed
async def create_channel_old(
ctx, channel_name: str, category_name: str, everyone_send=False, everyone_read=True, allowed_members=None,
allowed_roles=None):
ctx,
channel_name: str,
category_name: str,
everyone_send=False,
everyone_read=True,
allowed_members=None,
allowed_roles=None,
):
"""Create a text channel with specified permissions (legacy version)."""
this_category = discord.utils.get(ctx.guild.categories, name=category_name)
if not this_category:
raise ValueError(f'I couldn\'t find a category named **{category_name}**')
raise ValueError(f"I couldn't find a category named **{category_name}**")
overwrites = {
ctx.guild.me: discord.PermissionOverwrite(read_messages=True, send_messages=True),
ctx.guild.default_role: discord.PermissionOverwrite(read_messages=everyone_read, send_messages=everyone_send)
ctx.guild.me: discord.PermissionOverwrite(
read_messages=True, send_messages=True
),
ctx.guild.default_role: discord.PermissionOverwrite(
read_messages=everyone_read, send_messages=everyone_send
),
}
if allowed_members:
if isinstance(allowed_members, list):
for member in allowed_members:
overwrites[member] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
overwrites[member] = discord.PermissionOverwrite(
read_messages=True, send_messages=True
)
if allowed_roles:
if isinstance(allowed_roles, list):
for role in allowed_roles:
overwrites[role] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
overwrites[role] = discord.PermissionOverwrite(
read_messages=True, send_messages=True
)
this_channel = await ctx.guild.create_text_channel(
channel_name,
overwrites=overwrites,
category=this_category
channel_name, overwrites=overwrites, category=this_category
)
logger.info(f'Creating channel ({channel_name}) in ({category_name})')
logger.info(f"Creating channel ({channel_name}) in ({category_name})")
return this_channel
async def create_channel(
ctx, channel_name: str, category_name: str, everyone_send=False, everyone_read=True,
read_send_members: list = None, read_send_roles: list = None, read_only_roles: list = None):
ctx,
channel_name: str,
category_name: str,
everyone_send=False,
everyone_read=True,
read_send_members: list = None,
read_send_roles: list = None,
read_only_roles: list = None,
):
"""Create a text channel with specified permissions."""
# Handle both Context and Interaction objects
guild = ctx.guild if hasattr(ctx, 'guild') else None
guild = ctx.guild if hasattr(ctx, "guild") else None
if not guild:
raise ValueError(f'Unable to access guild from context object')
raise ValueError(f"Unable to access guild from context object")
# Get bot member - different for Context vs Interaction
if hasattr(ctx, 'me'): # Context object
if hasattr(ctx, "me"): # Context object
bot_member = ctx.me
elif hasattr(ctx, 'client'): # Interaction object
elif hasattr(ctx, "client"): # Interaction object
bot_member = guild.get_member(ctx.client.user.id)
else:
# Fallback - try to find bot member by getting the first member with bot=True
bot_member = next((m for m in guild.members if m.bot), None)
if not bot_member:
raise ValueError(f'Unable to find bot member in guild')
raise ValueError(f"Unable to find bot member in guild")
this_category = discord.utils.get(guild.categories, name=category_name)
if not this_category:
raise ValueError(f'I couldn\'t find a category named **{category_name}**')
raise ValueError(f"I couldn't find a category named **{category_name}**")
overwrites = {
bot_member: discord.PermissionOverwrite(read_messages=True, send_messages=True),
guild.default_role: discord.PermissionOverwrite(read_messages=everyone_read, send_messages=everyone_send)
guild.default_role: discord.PermissionOverwrite(
read_messages=everyone_read, send_messages=everyone_send
),
}
if read_send_members:
for member in read_send_members:
overwrites[member] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
overwrites[member] = discord.PermissionOverwrite(
read_messages=True, send_messages=True
)
if read_send_roles:
for role in read_send_roles:
overwrites[role] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
overwrites[role] = discord.PermissionOverwrite(
read_messages=True, send_messages=True
)
if read_only_roles:
for role in read_only_roles:
overwrites[role] = discord.PermissionOverwrite(read_messages=True, send_messages=False)
overwrites[role] = discord.PermissionOverwrite(
read_messages=True, send_messages=False
)
this_channel = await guild.create_text_channel(
channel_name,
overwrites=overwrites,
category=this_category
channel_name, overwrites=overwrites, category=this_category
)
logger.info(f'Creating channel ({channel_name}) in ({category_name})')
logger.info(f"Creating channel ({channel_name}) in ({category_name})")
return this_channel
return this_channel

View File

@ -21,7 +21,7 @@ from utils import (
get_cal_user,
)
from search_utils import *
from discord_utils import *
from .discord_utils import *
# Refractor tier badge prefixes for card embeds (T0 = no badge)
TIER_BADGES = {1: "BC", 2: "R", 3: "GR", 4: "SF"}
@ -182,11 +182,19 @@ async def get_card_embeds(card, include_stats=False) -> list:
name="Collected By", value=f"{count} team{'s' if count != 1 else ''}"
)
# TODO: check for dupes with the included paperdex data
# if card['team']['lname'] != 'Paper Dynasty':
# team_dex = await db_get('cards', params=[("player_id", card["player"]["player_id"]), ('team_id', card['team']['id'])])
# count = 1 if not team_dex['count'] else team_dex['count']
# embed.add_field(name='# Dupes', value=f'{count - 1} dupe{"s" if count - 1 != 1 else ""}')
if card["team"]["lname"] != "Paper Dynasty":
team_dex = await db_get(
"cards",
params=[
("player_id", card["player"]["player_id"]),
("team_id", card["team"]["id"]),
],
)
if team_dex is not None:
dupe_count = max(0, team_dex["count"] - 1)
embed.add_field(
name="Dupes", value=f"{dupe_count} dupe{'s' if dupe_count != 1 else ''}"
)
# embed.add_field(name='Team', value=f'{card["player"]["mlbclub"]}')
if card["player"]["franchise"] != "Pokemon":

View File

@ -1315,47 +1315,9 @@ def create_test_games():
session.commit()
def select_speed_testing():
with Session(engine) as session:
game_1 = session.exec(select(Game).where(Game.id == 1)).one()
ss_search_start = datetime.datetime.now()
man_ss = [x for x in game_1.lineups if x.position == 'SS' and x.active]
ss_search_end = datetime.datetime.now()
ss_query_start = datetime.datetime.now()
query_ss = session.exec(select(Lineup).where(Lineup.game == game_1, Lineup.position == 'SS', Lineup.active == True)).all()
ss_query_end = datetime.datetime.now()
manual_time = ss_search_end - ss_search_start
query_time = ss_query_end - ss_query_start
print(f'Manual Shortstops: time: {manual_time.microseconds} ms / {man_ss}')
print(f'Query Shortstops: time: {query_time.microseconds} ms / {query_ss}')
print(f'Game: {game_1}')
games = session.exec(select(Game).where(Game.active == True)).all()
print(f'len(games): {len(games)}')
def select_all_testing():
with Session(engine) as session:
game_search = session.exec(select(Team)).all()
for game in game_search:
print(f'Game: {game}')
# def select_specic_fields():
# with Session(engine) as session:
# games = session.exec(select(Game.id, Game.away_team, Game.home_team))
# print(f'Games: {games}')
# print(f'.all(): {games.all()}')
def main():
create_db_and_tables()
create_test_games()
# select_speed_testing()
# select_all_testing()
if __name__ == "__main__":

View File

@ -124,7 +124,7 @@ async def get_team_or_none(
logger.info(f'Refreshing this_team')
session.refresh(this_team)
return this_team
except:
except Exception:
logger.info(f'Team not found, adding to db')
session.add(db_team)
session.commit()
@ -235,7 +235,7 @@ async def get_player_or_none(session: Session, player_id: int, skip_cache: bool
logger.info(f'Refreshing this_player')
session.refresh(this_player)
return this_player
except:
except Exception:
session.add(db_player)
session.commit()
session.refresh(db_player)
@ -307,7 +307,7 @@ async def get_batter_scouting_or_none(session: Session, card: Card, skip_cache:
# logger.info(f'Refreshing this_card')
# session.refresh(this_card)
# return this_card
except:
except Exception:
logger.info(f'Card not found, adding to db')
this_card = db_bc
session.add(this_card)
@ -330,7 +330,7 @@ async def get_batter_scouting_or_none(session: Session, card: Card, skip_cache:
# logger.info(f'Refreshing this_card')
# session.refresh(this_card)
# return this_card
except:
except Exception:
logger.info(f'Card not found, adding to db')
this_vl_rating = db_vl
session.add(this_vl_rating)
@ -353,7 +353,7 @@ async def get_batter_scouting_or_none(session: Session, card: Card, skip_cache:
# logger.info(f'Refreshing this_card')
# session.refresh(this_card)
# return this_card
except:
except Exception:
logger.info(f'Card not found, adding to db')
this_vr_rating = db_vr
session.add(this_vr_rating)
@ -444,7 +444,7 @@ async def get_pitcher_scouting_or_none(session: Session, card: Card, skip_cache:
# logger.info(f'Refreshing this_card')
# session.refresh(this_card)
# return this_card
except:
except Exception:
logger.info(f'Card not found, adding to db')
this_card = db_bc
session.add(this_card)
@ -467,7 +467,7 @@ async def get_pitcher_scouting_or_none(session: Session, card: Card, skip_cache:
# logger.info(f'Refreshing this_card')
# session.refresh(this_card)
# return this_card
except:
except Exception:
logger.info(f'Card not found, adding to db')
this_vl_rating = db_vl
session.add(this_vl_rating)
@ -490,7 +490,7 @@ async def get_pitcher_scouting_or_none(session: Session, card: Card, skip_cache:
# logger.info(f'Refreshing this_card')
# session.refresh(this_card)
# return this_card
except:
except Exception:
logger.info(f'Card not found, adding to db')
this_vr_rating = db_vr
session.add(this_vr_rating)
@ -699,7 +699,7 @@ async def get_or_create_ai_card(session: Session, player: Player, team: Team, sk
logger.info(f'Refreshing this_card')
session.refresh(this_card)
return this_card
except:
except Exception:
logger.info(f'Card not found, adding to db')
session.add(db_card)
session.commit()
@ -808,7 +808,7 @@ async def get_card_or_none(session: Session, card_id: int, skip_cache: bool = Fa
logger.info(f'Refreshing this_card')
session.refresh(this_card)
return this_card
except:
except Exception:
logger.info(f'Card not found, adding to db')
session.add(db_card)
session.commit()

View File

@ -0,0 +1,154 @@
"""
Tests for evolution tier completion notification embeds (WP-14).
These are pure unit tests no database or Discord bot connection required.
Each test constructs embeds and asserts on title, description, color, and
footer to verify the notification design spec is met.
"""
import discord
from utilities.evolution_notifications import (
TIER_COLORS,
build_tier_embeds,
tier_up_embed,
)
class TestTierUpEmbed:
"""Unit tests for tier_up_embed() — standard (T1T3) and fully-evolved (T4) paths."""
def test_tier_up_title(self):
"""Standard tier-up embeds must use the 'Evolution Tier Up!' title."""
embed = tier_up_embed(
"Mike Trout", tier=2, tier_name="Rising", track_name="Batter"
)
assert embed.title == "Evolution Tier Up!"
def test_tier_up_description_format(self):
"""Description must include player name, tier number, tier name, and track name."""
embed = tier_up_embed(
"Mike Trout", tier=2, tier_name="Rising", track_name="Batter"
)
assert (
embed.description
== "Mike Trout reached Tier 2 (Rising) on the Batter track"
)
def test_tier_up_color_matches_tier(self):
"""Each tier must map to its specified embed color."""
for tier, expected_color in TIER_COLORS.items():
if tier == 4:
continue # T4 handled in fully-evolved tests
embed = tier_up_embed(
"Test Player", tier=tier, tier_name="Name", track_name="Batter"
)
assert embed.color.value == expected_color, f"Tier {tier} color mismatch"
def test_tier_up_no_footer_for_standard_tiers(self):
"""Standard tier-up embeds (T1T3) must not have a footer."""
for tier in (1, 2, 3):
embed = tier_up_embed(
"Test Player", tier=tier, tier_name="Name", track_name="Batter"
)
assert embed.footer.text is None
class TestFullyEvolvedEmbed:
"""Unit tests for the fully-evolved (T4) embed — distinct title, description, and footer."""
def test_fully_evolved_title(self):
"""T4 embeds must use the 'FULLY EVOLVED!' title."""
embed = tier_up_embed(
"Mike Trout", tier=4, tier_name="Legendary", track_name="Batter"
)
assert embed.title == "FULLY EVOLVED!"
def test_fully_evolved_description(self):
"""T4 description must indicate maximum evolution without mentioning tier number."""
embed = tier_up_embed(
"Mike Trout", tier=4, tier_name="Legendary", track_name="Batter"
)
assert (
embed.description
== "Mike Trout has reached maximum evolution on the Batter track"
)
def test_fully_evolved_footer(self):
"""T4 embeds must include the Phase 2 teaser footer."""
embed = tier_up_embed(
"Mike Trout", tier=4, tier_name="Legendary", track_name="Batter"
)
assert embed.footer.text == "Rating boosts coming in a future update!"
def test_fully_evolved_color(self):
"""T4 embed color must be teal."""
embed = tier_up_embed(
"Mike Trout", tier=4, tier_name="Legendary", track_name="Batter"
)
assert embed.color.value == TIER_COLORS[4]
class TestBuildTierEmbeds:
"""Unit tests for build_tier_embeds() — list construction and edge cases."""
def test_no_tier_ups_returns_empty_list(self):
"""When no tier-ups occurred, build_tier_embeds must return an empty list."""
result = build_tier_embeds([])
assert result == []
def test_single_tier_up_returns_one_embed(self):
"""A single tier-up event must produce exactly one embed."""
tier_ups = [
{
"player_name": "Mike Trout",
"tier": 2,
"tier_name": "Rising",
"track_name": "Batter",
}
]
result = build_tier_embeds(tier_ups)
assert len(result) == 1
assert isinstance(result[0], discord.Embed)
def test_multiple_tier_ups_return_separate_embeds(self):
"""Multiple tier-up events in one game must produce one embed per event."""
tier_ups = [
{
"player_name": "Mike Trout",
"tier": 2,
"tier_name": "Rising",
"track_name": "Batter",
},
{
"player_name": "Sandy Koufax",
"tier": 3,
"tier_name": "Elite",
"track_name": "Starter",
},
]
result = build_tier_embeds(tier_ups)
assert len(result) == 2
assert (
result[0].description
== "Mike Trout reached Tier 2 (Rising) on the Batter track"
)
assert (
result[1].description
== "Sandy Koufax reached Tier 3 (Elite) on the Starter track"
)
def test_fully_evolved_in_batch(self):
"""A T4 event in a batch must produce a fully-evolved embed, not a standard one."""
tier_ups = [
{
"player_name": "Babe Ruth",
"tier": 4,
"tier_name": "Legendary",
"track_name": "Batter",
}
]
result = build_tier_embeds(tier_ups)
assert len(result) == 1
assert result[0].title == "FULLY EVOLVED!"
assert result[0].footer.text == "Rating boosts coming in a future update!"

View File

@ -0,0 +1,59 @@
import discord
# Tier colors as Discord embed color integers
TIER_COLORS = {
1: 0x57F287, # green
2: 0xF1C40F, # gold
3: 0x9B59B6, # purple
4: 0x1ABC9C, # teal
}
MAX_TIER = 4
def tier_up_embed(
player_name: str, tier: int, tier_name: str, track_name: str
) -> discord.Embed:
"""
Build a Discord embed for a single evolution tier-up event.
For tier 4 (fully evolved), uses a distinct title, description, and footer.
For tiers 13, uses the standard tier-up format.
"""
color = TIER_COLORS.get(tier, 0xFFFFFF)
if tier == MAX_TIER:
embed = discord.Embed(
title="FULLY EVOLVED!",
description=f"{player_name} has reached maximum evolution on the {track_name} track",
color=color,
)
embed.set_footer(text="Rating boosts coming in a future update!")
else:
embed = discord.Embed(
title="Evolution Tier Up!",
description=f"{player_name} reached Tier {tier} ({tier_name}) on the {track_name} track",
color=color,
)
return embed
def build_tier_embeds(tier_ups: list) -> list:
"""
Build a list of Discord embeds for all tier-up events in a game.
Each item in tier_ups should be a dict with keys:
player_name (str), tier (int), tier_name (str), track_name (str)
Returns an empty list if there are no tier-ups.
"""
return [
tier_up_embed(
player_name=t["player_name"],
tier=t["tier"],
tier_name=t["tier_name"],
track_name=t["track_name"],
)
for t in tier_ups
]