Merge branch 'main' into ai/paper-dynasty-database-79
All checks were successful
Build Docker Image / build (pull_request) Successful in 1m11s
All checks were successful
Build Docker Image / build (pull_request) Successful in 1m11s
This commit is contained in:
commit
d98f8ea8ab
1808
cogs/gameplay.py
1808
cogs/gameplay.py
File diff suppressed because it is too large
Load Diff
1353
cogs/players.py
1353
cogs/players.py
File diff suppressed because it is too large
Load Diff
@ -12,65 +12,89 @@ import datetime
|
|||||||
from sqlmodel import Session
|
from sqlmodel import Session
|
||||||
from api_calls import db_get, db_post, db_patch, db_delete, get_team_by_abbrev
|
from api_calls import db_get, db_post, db_patch, db_delete, get_team_by_abbrev
|
||||||
from helpers import (
|
from helpers import (
|
||||||
ACTIVE_EVENT_LITERAL, PD_PLAYERS_ROLE_NAME, get_team_embed, get_team_by_owner,
|
ACTIVE_EVENT_LITERAL,
|
||||||
legal_channel, Confirm, send_to_channel
|
PD_PLAYERS_ROLE_NAME,
|
||||||
|
get_team_embed,
|
||||||
|
get_team_by_owner,
|
||||||
|
legal_channel,
|
||||||
|
Confirm,
|
||||||
|
send_to_channel,
|
||||||
)
|
)
|
||||||
from helpers.utils import get_roster_sheet, get_cal_user
|
from helpers.utils import get_roster_sheet, get_cal_user
|
||||||
from utilities.buttons import ask_with_buttons
|
from utilities.buttons import ask_with_buttons
|
||||||
from in_game.gameplay_models import engine
|
from in_game.gameplay_models import engine
|
||||||
from in_game.gameplay_queries import get_team_or_none
|
from in_game.gameplay_queries import get_team_or_none
|
||||||
|
|
||||||
logger = logging.getLogger('discord_app')
|
logger = logging.getLogger("discord_app")
|
||||||
|
|
||||||
# Try to import gauntlets module, provide fallback if not available
|
# Try to import gauntlets module, provide fallback if not available
|
||||||
try:
|
try:
|
||||||
import gauntlets
|
import gauntlets
|
||||||
|
|
||||||
GAUNTLETS_AVAILABLE = True
|
GAUNTLETS_AVAILABLE = True
|
||||||
except ImportError:
|
except ImportError:
|
||||||
logger.warning("Gauntlets module not available - gauntlet commands will have limited functionality")
|
logger.warning(
|
||||||
|
"Gauntlets module not available - gauntlet commands will have limited functionality"
|
||||||
|
)
|
||||||
GAUNTLETS_AVAILABLE = False
|
GAUNTLETS_AVAILABLE = False
|
||||||
gauntlets = None
|
gauntlets = None
|
||||||
|
|
||||||
|
|
||||||
class Gauntlet(commands.Cog):
|
class Gauntlet(commands.Cog):
|
||||||
"""Gauntlet game mode functionality for Paper Dynasty."""
|
"""Gauntlet game mode functionality for Paper Dynasty."""
|
||||||
|
|
||||||
def __init__(self, bot):
|
def __init__(self, bot):
|
||||||
self.bot = bot
|
self.bot = bot
|
||||||
|
|
||||||
group_gauntlet = app_commands.Group(name='gauntlets', description='Check your progress or start a new Gauntlet')
|
group_gauntlet = app_commands.Group(
|
||||||
|
name="gauntlets", description="Check your progress or start a new Gauntlet"
|
||||||
|
)
|
||||||
|
|
||||||
@group_gauntlet.command(name='status', description='View status of current Gauntlet run')
|
@group_gauntlet.command(
|
||||||
|
name="status", description="View status of current Gauntlet run"
|
||||||
|
)
|
||||||
@app_commands.describe(
|
@app_commands.describe(
|
||||||
team_abbrev='To check the status of a team\'s active run, enter their abbreviation'
|
team_abbrev="To check the status of a team's active run, enter their abbreviation"
|
||||||
)
|
)
|
||||||
@app_commands.checks.has_any_role(PD_PLAYERS_ROLE_NAME)
|
@app_commands.checks.has_any_role(PD_PLAYERS_ROLE_NAME)
|
||||||
async def gauntlet_run_command(
|
async def gauntlet_run_command(
|
||||||
self, interaction: discord.Interaction, event_name: ACTIVE_EVENT_LITERAL, # type: ignore
|
self,
|
||||||
team_abbrev: Optional[str] = None):
|
interaction: discord.Interaction,
|
||||||
|
event_name: ACTIVE_EVENT_LITERAL, # type: ignore
|
||||||
|
team_abbrev: Optional[str] = None,
|
||||||
|
):
|
||||||
"""View status of current gauntlet run - corrected to match original business logic."""
|
"""View status of current gauntlet run - corrected to match original business logic."""
|
||||||
await interaction.response.defer()
|
await interaction.response.defer()
|
||||||
|
|
||||||
e_query = await db_get('events', params=[("name", event_name), ("active", True)])
|
e_query = await db_get(
|
||||||
if not e_query or e_query.get('count', 0) == 0:
|
"events", params=[("name", event_name), ("active", True)]
|
||||||
await interaction.edit_original_response(content=f'Hmm...looks like that event is inactive.')
|
)
|
||||||
|
if not e_query or e_query.get("count", 0) == 0:
|
||||||
|
await interaction.edit_original_response(
|
||||||
|
content=f"Hmm...looks like that event is inactive."
|
||||||
|
)
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
this_event = e_query['events'][0]
|
this_event = e_query["events"][0]
|
||||||
|
|
||||||
this_run, this_team = None, None
|
this_run, this_team = None, None
|
||||||
if team_abbrev:
|
if team_abbrev:
|
||||||
if 'Gauntlet-' not in team_abbrev:
|
if "Gauntlet-" not in team_abbrev:
|
||||||
team_abbrev = f'Gauntlet-{team_abbrev}'
|
team_abbrev = f"Gauntlet-{team_abbrev}"
|
||||||
t_query = await db_get('teams', params=[('abbrev', team_abbrev)])
|
t_query = await db_get("teams", params=[("abbrev", team_abbrev)])
|
||||||
if t_query and t_query.get('count', 0) != 0:
|
if t_query and t_query.get("count", 0) != 0:
|
||||||
this_team = t_query['teams'][0]
|
this_team = t_query["teams"][0]
|
||||||
r_query = await db_get('gauntletruns', params=[
|
r_query = await db_get(
|
||||||
('team_id', this_team['id']), ('is_active', True), ('gauntlet_id', this_event['id'])
|
"gauntletruns",
|
||||||
])
|
params=[
|
||||||
|
("team_id", this_team["id"]),
|
||||||
|
("is_active", True),
|
||||||
|
("gauntlet_id", this_event["id"]),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
if r_query and r_query.get('count', 0) != 0:
|
if r_query and r_query.get("count", 0) != 0:
|
||||||
this_run = r_query['runs'][0]
|
this_run = r_query["runs"][0]
|
||||||
else:
|
else:
|
||||||
await interaction.edit_original_response(
|
await interaction.edit_original_response(
|
||||||
content=f'I do not see an active run for the {this_team["lname"]}.'
|
content=f'I do not see an active run for the {this_team["lname"]}.'
|
||||||
@ -78,7 +102,7 @@ class Gauntlet(commands.Cog):
|
|||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
await interaction.edit_original_response(
|
await interaction.edit_original_response(
|
||||||
content=f'I do not see an active run for {team_abbrev.upper()}.'
|
content=f"I do not see an active run for {team_abbrev.upper()}."
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
@ -86,127 +110,168 @@ class Gauntlet(commands.Cog):
|
|||||||
if GAUNTLETS_AVAILABLE and gauntlets:
|
if GAUNTLETS_AVAILABLE and gauntlets:
|
||||||
await interaction.edit_original_response(
|
await interaction.edit_original_response(
|
||||||
content=None,
|
content=None,
|
||||||
embed=await gauntlets.get_embed(this_run, this_event, this_team) # type: ignore
|
embed=await gauntlets.get_embed(this_run, this_event, this_team), # type: ignore
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
await interaction.edit_original_response(
|
await interaction.edit_original_response(
|
||||||
content='Gauntlet status unavailable - gauntlets module not loaded.'
|
content="Gauntlet status unavailable - gauntlets module not loaded."
|
||||||
)
|
)
|
||||||
|
|
||||||
@group_gauntlet.command(name='start', description='Start a new Gauntlet run')
|
@group_gauntlet.command(name="start", description="Start a new Gauntlet run")
|
||||||
@app_commands.checks.has_any_role(PD_PLAYERS_ROLE_NAME)
|
@app_commands.checks.has_any_role(PD_PLAYERS_ROLE_NAME)
|
||||||
async def gauntlet_start_command(self, interaction: discord.Interaction):
|
async def gauntlet_start_command(self, interaction: discord.Interaction):
|
||||||
"""Start a new gauntlet run."""
|
"""Start a new gauntlet run."""
|
||||||
|
|
||||||
# Channel restriction - must be in a 'hello' channel (private channel)
|
# Channel restriction - must be in a 'hello' channel (private channel)
|
||||||
if interaction.channel and hasattr(interaction.channel, 'name') and 'hello' not in str(interaction.channel.name):
|
if (
|
||||||
|
interaction.channel
|
||||||
|
and hasattr(interaction.channel, "name")
|
||||||
|
and "hello" not in str(interaction.channel.name)
|
||||||
|
):
|
||||||
await interaction.response.send_message(
|
await interaction.response.send_message(
|
||||||
content='The draft will probably take you about 15 minutes. Why don\'t you head to your private '
|
content="The draft will probably take you about 15 minutes. Why don't you head to your private "
|
||||||
'channel to run the draft?',
|
"channel to run the draft?",
|
||||||
ephemeral=True
|
ephemeral=True,
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.info(f'Starting a gauntlet run for user {interaction.user.name}')
|
logger.info(f"Starting a gauntlet run for user {interaction.user.name}")
|
||||||
await interaction.response.defer()
|
await interaction.response.defer()
|
||||||
|
|
||||||
with Session(engine) as session:
|
with Session(engine) as session:
|
||||||
main_team = await get_team_or_none(session, gm_id=interaction.user.id, main_team=True)
|
main_team = await get_team_or_none(
|
||||||
draft_team = await get_team_or_none(session, gm_id=interaction.user.id, gauntlet_team=True)
|
session, gm_id=interaction.user.id, main_team=True
|
||||||
|
)
|
||||||
|
draft_team = await get_team_or_none(
|
||||||
|
session, gm_id=interaction.user.id, gauntlet_team=True
|
||||||
|
)
|
||||||
|
|
||||||
# Get active events
|
# Get active events
|
||||||
e_query = await db_get('events', params=[("active", True)])
|
e_query = await db_get("events", params=[("active", True)])
|
||||||
if not e_query or e_query.get('count', 0) == 0:
|
if not e_query or e_query.get("count", 0) == 0:
|
||||||
await interaction.edit_original_response(content='Hmm...I don\'t see any active events.')
|
await interaction.edit_original_response(
|
||||||
|
content="Hmm...I don't see any active events."
|
||||||
|
)
|
||||||
return
|
return
|
||||||
elif e_query.get('count', 0) == 1:
|
elif e_query.get("count", 0) == 1:
|
||||||
this_event = e_query['events'][0]
|
this_event = e_query["events"][0]
|
||||||
else:
|
else:
|
||||||
event_choice = await ask_with_buttons(
|
event_choice = await ask_with_buttons(
|
||||||
interaction,
|
interaction,
|
||||||
button_options=[x['name'] for x in e_query['events']],
|
button_options=[x["name"] for x in e_query["events"]],
|
||||||
question='Which event would you like to take on?',
|
question="Which event would you like to take on?",
|
||||||
timeout=3,
|
timeout=3,
|
||||||
delete_question=False
|
delete_question=False,
|
||||||
)
|
)
|
||||||
this_event = [event for event in e_query['events'] if event['name'] == event_choice][0]
|
this_event = [
|
||||||
|
event
|
||||||
logger.info(f'this_event: {this_event}')
|
for event in e_query["events"]
|
||||||
|
if event["name"] == event_choice
|
||||||
|
][0]
|
||||||
|
|
||||||
|
logger.info(f"this_event: {this_event}")
|
||||||
|
|
||||||
first_flag = draft_team is None
|
first_flag = draft_team is None
|
||||||
if draft_team is not None:
|
if draft_team is not None:
|
||||||
r_query = await db_get(
|
r_query = await db_get(
|
||||||
'gauntletruns',
|
"gauntletruns",
|
||||||
params=[('team_id', draft_team.id), ('gauntlet_id', this_event['id']), ('is_active', True)]
|
params=[
|
||||||
|
("team_id", draft_team.id),
|
||||||
|
("gauntlet_id", this_event["id"]),
|
||||||
|
("is_active", True),
|
||||||
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
if r_query and r_query.get('count', 0) != 0:
|
if r_query and r_query.get("count", 0) != 0:
|
||||||
await interaction.edit_original_response(
|
await interaction.edit_original_response(
|
||||||
content=f'Looks like you already have a {r_query["runs"][0]["gauntlet"]["name"]} run active! '
|
content=f'Looks like you already have a {r_query["runs"][0]["gauntlet"]["name"]} run active! '
|
||||||
f'You can check it out with the `/gauntlets status` command.'
|
f"You can check it out with the `/gauntlets status` command."
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
draft_embed = await gauntlets.run_draft(interaction, main_team, this_event, draft_team) # type: ignore
|
draft_embed = await gauntlets.run_draft(interaction, main_team, this_event, draft_team) # type: ignore
|
||||||
except ZeroDivisionError as e:
|
except ZeroDivisionError as e:
|
||||||
|
logger.error(
|
||||||
|
f'ZeroDivisionError in {this_event["name"]} draft for the {main_team.sname if main_team else "unknown"}: {e}'
|
||||||
|
)
|
||||||
|
await gauntlets.wipe_team(draft_team, interaction) # type: ignore
|
||||||
|
await interaction.followup.send(
|
||||||
|
content=f"Shoot - it looks like we ran into an issue running the draft. I had to clear it all out "
|
||||||
|
f"for now. I let {get_cal_user(interaction).mention} know what happened so he better "
|
||||||
|
f"fix it quick."
|
||||||
|
)
|
||||||
return
|
return
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f'Failed to run {this_event["name"]} draft for the {main_team.sname if main_team else "unknown"}: {e}')
|
logger.error(
|
||||||
await gauntlets.wipe_team(draft_team, interaction) # type: ignore
|
f'Failed to run {this_event["name"]} draft for the {main_team.sname if main_team else "unknown"}: {e}'
|
||||||
|
)
|
||||||
|
await gauntlets.wipe_team(draft_team, interaction) # type: ignore
|
||||||
await interaction.followup.send(
|
await interaction.followup.send(
|
||||||
content=f'Shoot - it looks like we ran into an issue running the draft. I had to clear it all out '
|
content=f"Shoot - it looks like we ran into an issue running the draft. I had to clear it all out "
|
||||||
f'for now. I let {get_cal_user(interaction).mention} know what happened so he better '
|
f"for now. I let {get_cal_user(interaction).mention} know what happened so he better "
|
||||||
f'fix it quick.'
|
f"fix it quick."
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
if first_flag:
|
if first_flag:
|
||||||
await interaction.followup.send(
|
await interaction.followup.send(
|
||||||
f'Good luck, champ in the making! To start playing, follow these steps:\n\n'
|
f"Good luck, champ in the making! To start playing, follow these steps:\n\n"
|
||||||
f'1) Make a copy of the Team Sheet Template found in `/help-pd links`\n'
|
f"1) Make a copy of the Team Sheet Template found in `/help-pd links`\n"
|
||||||
f'2) Run `/newsheet` to link it to your Gauntlet team\n'
|
f"2) Run `/newsheet` to link it to your Gauntlet team\n"
|
||||||
f'3) Go play your first game with `/new-game gauntlet {this_event["name"]}`'
|
f'3) Go play your first game with `/new-game gauntlet {this_event["name"]}`'
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
await interaction.followup.send(
|
await interaction.followup.send(
|
||||||
f'Good luck, champ in the making! In your team sheet, sync your cards with **Paper Dynasty** -> '
|
f"Good luck, champ in the making! In your team sheet, sync your cards with **Paper Dynasty** -> "
|
||||||
f'**Data Imports** -> **My Cards** then you can set your lineup here and you\'ll be ready to go!\n\n'
|
f"**Data Imports** -> **My Cards** then you can set your lineup here and you'll be ready to go!\n\n"
|
||||||
f'{get_roster_sheet(draft_team)}'
|
f"{get_roster_sheet(draft_team)}"
|
||||||
)
|
)
|
||||||
|
|
||||||
await send_to_channel(
|
await send_to_channel(
|
||||||
bot=self.bot,
|
bot=self.bot,
|
||||||
channel_name='pd-news-ticker',
|
channel_name="pd-news-ticker",
|
||||||
content=f'The {main_team.lname if main_team else "Unknown Team"} have entered the {this_event["name"]} Gauntlet!',
|
content=f'The {main_team.lname if main_team else "Unknown Team"} have entered the {this_event["name"]} Gauntlet!',
|
||||||
embed=draft_embed
|
embed=draft_embed,
|
||||||
)
|
)
|
||||||
|
|
||||||
@group_gauntlet.command(name='reset', description='Wipe your current team so you can re-draft')
|
@group_gauntlet.command(
|
||||||
|
name="reset", description="Wipe your current team so you can re-draft"
|
||||||
|
)
|
||||||
@app_commands.checks.has_any_role(PD_PLAYERS_ROLE_NAME)
|
@app_commands.checks.has_any_role(PD_PLAYERS_ROLE_NAME)
|
||||||
async def gauntlet_reset_command(self, interaction: discord.Interaction, event_name: ACTIVE_EVENT_LITERAL): # type: ignore
|
async def gauntlet_reset_command(self, interaction: discord.Interaction, event_name: ACTIVE_EVENT_LITERAL): # type: ignore
|
||||||
"""Reset current gauntlet run."""
|
"""Reset current gauntlet run."""
|
||||||
await interaction.response.defer()
|
await interaction.response.defer()
|
||||||
main_team = await get_team_by_owner(interaction.user.id)
|
main_team = await get_team_by_owner(interaction.user.id)
|
||||||
draft_team = await get_team_by_abbrev(f'Gauntlet-{main_team["abbrev"]}')
|
draft_team = await get_team_by_abbrev(f'Gauntlet-{main_team["abbrev"]}')
|
||||||
if draft_team is None:
|
if draft_team is None:
|
||||||
await interaction.edit_original_response(
|
await interaction.edit_original_response(
|
||||||
content='Hmm, I can\'t find a gauntlet team for you. Have you signed up already?')
|
content="Hmm, I can't find a gauntlet team for you. Have you signed up already?"
|
||||||
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
e_query = await db_get('events', params=[("name", event_name), ("active", True)])
|
e_query = await db_get(
|
||||||
if e_query['count'] == 0:
|
"events", params=[("name", event_name), ("active", True)]
|
||||||
await interaction.edit_original_response(content='Hmm...looks like that event is inactive.')
|
)
|
||||||
|
if e_query["count"] == 0:
|
||||||
|
await interaction.edit_original_response(
|
||||||
|
content="Hmm...looks like that event is inactive."
|
||||||
|
)
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
this_event = e_query['events'][0]
|
this_event = e_query["events"][0]
|
||||||
|
|
||||||
r_query = await db_get('gauntletruns', params=[
|
r_query = await db_get(
|
||||||
('team_id', draft_team['id']), ('is_active', True), ('gauntlet_id', this_event['id'])
|
"gauntletruns",
|
||||||
])
|
params=[
|
||||||
|
("team_id", draft_team["id"]),
|
||||||
|
("is_active", True),
|
||||||
|
("gauntlet_id", this_event["id"]),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
if r_query and r_query.get('count', 0) != 0:
|
if r_query and r_query.get("count", 0) != 0:
|
||||||
this_run = r_query['runs'][0]
|
this_run = r_query["runs"][0]
|
||||||
else:
|
else:
|
||||||
await interaction.edit_original_response(
|
await interaction.edit_original_response(
|
||||||
content=f'I do not see an active run for the {draft_team["lname"]}.'
|
content=f'I do not see an active run for the {draft_team["lname"]}.'
|
||||||
@ -214,27 +279,24 @@ class Gauntlet(commands.Cog):
|
|||||||
return
|
return
|
||||||
|
|
||||||
view = Confirm(responders=[interaction.user], timeout=60)
|
view = Confirm(responders=[interaction.user], timeout=60)
|
||||||
conf_string = f'Are you sure you want to wipe your active run?'
|
conf_string = f"Are you sure you want to wipe your active run?"
|
||||||
await interaction.edit_original_response(
|
await interaction.edit_original_response(content=conf_string, view=view)
|
||||||
content=conf_string,
|
|
||||||
view=view
|
|
||||||
)
|
|
||||||
await view.wait()
|
await view.wait()
|
||||||
|
|
||||||
if view.value:
|
if view.value:
|
||||||
await gauntlets.end_run(this_run, this_event, draft_team, force_end=True) # type: ignore
|
await gauntlets.end_run(this_run, this_event, draft_team, force_end=True) # type: ignore
|
||||||
await interaction.edit_original_response(
|
await interaction.edit_original_response(
|
||||||
content=f'Your {event_name} run has been reset. Run `/gauntlets start` to redraft!',
|
content=f"Your {event_name} run has been reset. Run `/gauntlets start` to redraft!",
|
||||||
view=None
|
view=None,
|
||||||
)
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
await interaction.edit_original_response(
|
await interaction.edit_original_response(
|
||||||
content=f'~~{conf_string}~~\n\nNo worries, I will leave it active.',
|
content=f"~~{conf_string}~~\n\nNo worries, I will leave it active.",
|
||||||
view=None
|
view=None,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def setup(bot):
|
async def setup(bot):
|
||||||
"""Setup function for the Gauntlet cog."""
|
"""Setup function for the Gauntlet cog."""
|
||||||
await bot.add_cog(Gauntlet(bot))
|
await bot.add_cog(Gauntlet(bot))
|
||||||
|
|||||||
@ -4295,12 +4295,14 @@ async def complete_game(
|
|||||||
else this_game.away_team
|
else this_game.away_team
|
||||||
)
|
)
|
||||||
|
|
||||||
|
db_game = None
|
||||||
try:
|
try:
|
||||||
db_game = await db_post("games", payload=game_data)
|
db_game = await db_post("games", payload=game_data)
|
||||||
db_ready_plays = get_db_ready_plays(session, this_game, db_game["id"])
|
db_ready_plays = get_db_ready_plays(session, this_game, db_game["id"])
|
||||||
db_ready_decisions = get_db_ready_decisions(session, this_game, db_game["id"])
|
db_ready_decisions = get_db_ready_decisions(session, this_game, db_game["id"])
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
await roll_back(db_game["id"])
|
if db_game is not None:
|
||||||
|
await roll_back(db_game["id"])
|
||||||
log_exception(e, msg="Unable to post game to API, rolling back")
|
log_exception(e, msg="Unable to post game to API, rolling back")
|
||||||
|
|
||||||
# Post game stats to API
|
# Post game stats to API
|
||||||
|
|||||||
171
discord_utils.py
171
discord_utils.py
@ -4,6 +4,7 @@ Discord Utilities
|
|||||||
This module contains Discord helper functions for channels, roles, embeds,
|
This module contains Discord helper functions for channels, roles, embeds,
|
||||||
and other Discord-specific operations.
|
and other Discord-specific operations.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import asyncio
|
import asyncio
|
||||||
@ -13,19 +14,21 @@ import discord
|
|||||||
from discord.ext import commands
|
from discord.ext import commands
|
||||||
from helpers.constants import SBA_COLOR, PD_SEASON, IMAGES
|
from helpers.constants import SBA_COLOR, PD_SEASON, IMAGES
|
||||||
|
|
||||||
logger = logging.getLogger('discord_app')
|
logger = logging.getLogger("discord_app")
|
||||||
|
|
||||||
|
|
||||||
async def send_to_bothole(ctx, content, embed):
|
async def send_to_bothole(ctx, content, embed):
|
||||||
"""Send a message to the pd-bot-hole channel."""
|
"""Send a message to the pd-bot-hole channel."""
|
||||||
await discord.utils.get(ctx.guild.text_channels, name='pd-bot-hole') \
|
await discord.utils.get(ctx.guild.text_channels, name="pd-bot-hole").send(
|
||||||
.send(content=content, embed=embed)
|
content=content, embed=embed
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def send_to_news(ctx, content, embed):
|
async def send_to_news(ctx, content, embed):
|
||||||
"""Send a message to the pd-news-ticker channel."""
|
"""Send a message to the pd-news-ticker channel."""
|
||||||
await discord.utils.get(ctx.guild.text_channels, name='pd-news-ticker') \
|
await discord.utils.get(ctx.guild.text_channels, name="pd-news-ticker").send(
|
||||||
.send(content=content, embed=embed)
|
content=content, embed=embed
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def typing_pause(ctx, seconds=1):
|
async def typing_pause(ctx, seconds=1):
|
||||||
@ -43,23 +46,20 @@ async def pause_then_type(ctx, message):
|
|||||||
|
|
||||||
async def check_if_pdhole(ctx):
|
async def check_if_pdhole(ctx):
|
||||||
"""Check if the current channel is pd-bot-hole."""
|
"""Check if the current channel is pd-bot-hole."""
|
||||||
if ctx.message.channel.name != 'pd-bot-hole':
|
if ctx.message.channel.name != "pd-bot-hole":
|
||||||
await ctx.send('Slide on down to my bot-hole for running commands.')
|
await ctx.send("Slide on down to my bot-hole for running commands.")
|
||||||
await ctx.message.add_reaction('❌')
|
await ctx.message.add_reaction("❌")
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
async def bad_channel(ctx):
|
async def bad_channel(ctx):
|
||||||
"""Check if current channel is in the list of bad channels for commands."""
|
"""Check if current channel is in the list of bad channels for commands."""
|
||||||
bad_channels = ['paper-dynasty-chat', 'pd-news-ticker']
|
bad_channels = ["paper-dynasty-chat", "pd-news-ticker"]
|
||||||
if ctx.message.channel.name in bad_channels:
|
if ctx.message.channel.name in bad_channels:
|
||||||
await ctx.message.add_reaction('❌')
|
await ctx.message.add_reaction("❌")
|
||||||
bot_hole = discord.utils.get(
|
bot_hole = discord.utils.get(ctx.guild.text_channels, name=f"pd-bot-hole")
|
||||||
ctx.guild.text_channels,
|
await ctx.send(f"Slide on down to the {bot_hole.mention} ;)")
|
||||||
name=f'pd-bot-hole'
|
|
||||||
)
|
|
||||||
await ctx.send(f'Slide on down to the {bot_hole.mention} ;)')
|
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
@ -68,14 +68,11 @@ async def bad_channel(ctx):
|
|||||||
def get_channel(ctx, name) -> Optional[discord.TextChannel]:
|
def get_channel(ctx, name) -> Optional[discord.TextChannel]:
|
||||||
"""Get a text channel by name."""
|
"""Get a text channel by name."""
|
||||||
# Handle both Context and Interaction objects
|
# Handle both Context and Interaction objects
|
||||||
guild = ctx.guild if hasattr(ctx, 'guild') else None
|
guild = ctx.guild if hasattr(ctx, "guild") else None
|
||||||
if not guild:
|
if not guild:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
channel = discord.utils.get(
|
channel = discord.utils.get(guild.text_channels, name=name)
|
||||||
guild.text_channels,
|
|
||||||
name=name
|
|
||||||
)
|
|
||||||
if channel:
|
if channel:
|
||||||
return channel
|
return channel
|
||||||
return None
|
return None
|
||||||
@ -87,7 +84,7 @@ async def get_emoji(ctx, name, return_empty=True):
|
|||||||
emoji = await commands.converter.EmojiConverter().convert(ctx, name)
|
emoji = await commands.converter.EmojiConverter().convert(ctx, name)
|
||||||
except:
|
except:
|
||||||
if return_empty:
|
if return_empty:
|
||||||
emoji = ''
|
emoji = ""
|
||||||
else:
|
else:
|
||||||
return name
|
return name
|
||||||
return emoji
|
return emoji
|
||||||
@ -101,9 +98,13 @@ async def react_and_reply(ctx, reaction, message):
|
|||||||
|
|
||||||
async def send_to_channel(bot, channel_name, content=None, embed=None):
|
async def send_to_channel(bot, channel_name, content=None, embed=None):
|
||||||
"""Send a message to a specific channel by name or ID."""
|
"""Send a message to a specific channel by name or ID."""
|
||||||
guild = bot.get_guild(int(os.environ.get('GUILD_ID')))
|
guild_id = os.environ.get("GUILD_ID")
|
||||||
|
if not guild_id:
|
||||||
|
logger.error("GUILD_ID env var is not set")
|
||||||
|
return
|
||||||
|
guild = bot.get_guild(int(guild_id))
|
||||||
if not guild:
|
if not guild:
|
||||||
logger.error('Cannot send to channel - bot not logged in')
|
logger.error("Cannot send to channel - bot not logged in")
|
||||||
return
|
return
|
||||||
|
|
||||||
this_channel = discord.utils.get(guild.text_channels, name=channel_name)
|
this_channel = discord.utils.get(guild.text_channels, name=channel_name)
|
||||||
@ -111,7 +112,7 @@ async def send_to_channel(bot, channel_name, content=None, embed=None):
|
|||||||
if not this_channel:
|
if not this_channel:
|
||||||
this_channel = discord.utils.get(guild.text_channels, id=channel_name)
|
this_channel = discord.utils.get(guild.text_channels, id=channel_name)
|
||||||
if not this_channel:
|
if not this_channel:
|
||||||
raise NameError(f'**{channel_name}** channel not found')
|
raise NameError(f"**{channel_name}** channel not found")
|
||||||
|
|
||||||
return await this_channel.send(content=content, embed=embed)
|
return await this_channel.send(content=content, embed=embed)
|
||||||
|
|
||||||
@ -128,14 +129,16 @@ async def get_or_create_role(ctx, role_name, mentionable=True):
|
|||||||
|
|
||||||
def get_special_embed(special):
|
def get_special_embed(special):
|
||||||
"""Create an embed for a special item."""
|
"""Create an embed for a special item."""
|
||||||
embed = discord.Embed(title=f'{special.name} - Special #{special.get_id()}',
|
embed = discord.Embed(
|
||||||
color=discord.Color.random(),
|
title=f"{special.name} - Special #{special.get_id()}",
|
||||||
description=f'{special.short_desc}')
|
color=discord.Color.random(),
|
||||||
embed.add_field(name='Description', value=f'{special.long_desc}', inline=False)
|
description=f"{special.short_desc}",
|
||||||
if special.thumbnail.lower() != 'none':
|
)
|
||||||
embed.set_thumbnail(url=f'{special.thumbnail}')
|
embed.add_field(name="Description", value=f"{special.long_desc}", inline=False)
|
||||||
if special.url.lower() != 'none':
|
if special.thumbnail.lower() != "none":
|
||||||
embed.set_image(url=f'{special.url}')
|
embed.set_thumbnail(url=f"{special.thumbnail}")
|
||||||
|
if special.url.lower() != "none":
|
||||||
|
embed.set_image(url=f"{special.url}")
|
||||||
|
|
||||||
return embed
|
return embed
|
||||||
|
|
||||||
@ -154,99 +157,125 @@ def get_team_embed(title, team=None, thumbnail: bool = True):
|
|||||||
if team:
|
if team:
|
||||||
embed = discord.Embed(
|
embed = discord.Embed(
|
||||||
title=title,
|
title=title,
|
||||||
color=int(team["color"], 16) if team["color"] else int(SBA_COLOR, 16)
|
color=int(team["color"], 16) if team["color"] else int(SBA_COLOR, 16),
|
||||||
|
)
|
||||||
|
embed.set_footer(
|
||||||
|
text=f'Paper Dynasty Season {team["season"]}', icon_url=IMAGES["logo"]
|
||||||
)
|
)
|
||||||
embed.set_footer(text=f'Paper Dynasty Season {team["season"]}', icon_url=IMAGES['logo'])
|
|
||||||
if thumbnail:
|
if thumbnail:
|
||||||
embed.set_thumbnail(url=team["logo"] if team["logo"] else IMAGES['logo'])
|
embed.set_thumbnail(url=team["logo"] if team["logo"] else IMAGES["logo"])
|
||||||
else:
|
else:
|
||||||
embed = discord.Embed(
|
embed = discord.Embed(title=title, color=int(SBA_COLOR, 16))
|
||||||
title=title,
|
embed.set_footer(
|
||||||
color=int(SBA_COLOR, 16)
|
text=f"Paper Dynasty Season {PD_SEASON}", icon_url=IMAGES["logo"]
|
||||||
)
|
)
|
||||||
embed.set_footer(text=f'Paper Dynasty Season {PD_SEASON}', icon_url=IMAGES['logo'])
|
|
||||||
if thumbnail:
|
if thumbnail:
|
||||||
embed.set_thumbnail(url=IMAGES['logo'])
|
embed.set_thumbnail(url=IMAGES["logo"])
|
||||||
|
|
||||||
return embed
|
return embed
|
||||||
|
|
||||||
|
|
||||||
async def create_channel_old(
|
async def create_channel_old(
|
||||||
ctx, channel_name: str, category_name: str, everyone_send=False, everyone_read=True, allowed_members=None,
|
ctx,
|
||||||
allowed_roles=None):
|
channel_name: str,
|
||||||
|
category_name: str,
|
||||||
|
everyone_send=False,
|
||||||
|
everyone_read=True,
|
||||||
|
allowed_members=None,
|
||||||
|
allowed_roles=None,
|
||||||
|
):
|
||||||
"""Create a text channel with specified permissions (legacy version)."""
|
"""Create a text channel with specified permissions (legacy version)."""
|
||||||
this_category = discord.utils.get(ctx.guild.categories, name=category_name)
|
this_category = discord.utils.get(ctx.guild.categories, name=category_name)
|
||||||
if not this_category:
|
if not this_category:
|
||||||
raise ValueError(f'I couldn\'t find a category named **{category_name}**')
|
raise ValueError(f"I couldn't find a category named **{category_name}**")
|
||||||
|
|
||||||
overwrites = {
|
overwrites = {
|
||||||
ctx.guild.me: discord.PermissionOverwrite(read_messages=True, send_messages=True),
|
ctx.guild.me: discord.PermissionOverwrite(
|
||||||
ctx.guild.default_role: discord.PermissionOverwrite(read_messages=everyone_read, send_messages=everyone_send)
|
read_messages=True, send_messages=True
|
||||||
|
),
|
||||||
|
ctx.guild.default_role: discord.PermissionOverwrite(
|
||||||
|
read_messages=everyone_read, send_messages=everyone_send
|
||||||
|
),
|
||||||
}
|
}
|
||||||
if allowed_members:
|
if allowed_members:
|
||||||
if isinstance(allowed_members, list):
|
if isinstance(allowed_members, list):
|
||||||
for member in allowed_members:
|
for member in allowed_members:
|
||||||
overwrites[member] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
|
overwrites[member] = discord.PermissionOverwrite(
|
||||||
|
read_messages=True, send_messages=True
|
||||||
|
)
|
||||||
if allowed_roles:
|
if allowed_roles:
|
||||||
if isinstance(allowed_roles, list):
|
if isinstance(allowed_roles, list):
|
||||||
for role in allowed_roles:
|
for role in allowed_roles:
|
||||||
overwrites[role] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
|
overwrites[role] = discord.PermissionOverwrite(
|
||||||
|
read_messages=True, send_messages=True
|
||||||
|
)
|
||||||
|
|
||||||
this_channel = await ctx.guild.create_text_channel(
|
this_channel = await ctx.guild.create_text_channel(
|
||||||
channel_name,
|
channel_name, overwrites=overwrites, category=this_category
|
||||||
overwrites=overwrites,
|
|
||||||
category=this_category
|
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(f'Creating channel ({channel_name}) in ({category_name})')
|
logger.info(f"Creating channel ({channel_name}) in ({category_name})")
|
||||||
|
|
||||||
return this_channel
|
return this_channel
|
||||||
|
|
||||||
|
|
||||||
async def create_channel(
|
async def create_channel(
|
||||||
ctx, channel_name: str, category_name: str, everyone_send=False, everyone_read=True,
|
ctx,
|
||||||
read_send_members: list = None, read_send_roles: list = None, read_only_roles: list = None):
|
channel_name: str,
|
||||||
|
category_name: str,
|
||||||
|
everyone_send=False,
|
||||||
|
everyone_read=True,
|
||||||
|
read_send_members: list = None,
|
||||||
|
read_send_roles: list = None,
|
||||||
|
read_only_roles: list = None,
|
||||||
|
):
|
||||||
"""Create a text channel with specified permissions."""
|
"""Create a text channel with specified permissions."""
|
||||||
# Handle both Context and Interaction objects
|
# Handle both Context and Interaction objects
|
||||||
guild = ctx.guild if hasattr(ctx, 'guild') else None
|
guild = ctx.guild if hasattr(ctx, "guild") else None
|
||||||
if not guild:
|
if not guild:
|
||||||
raise ValueError(f'Unable to access guild from context object')
|
raise ValueError(f"Unable to access guild from context object")
|
||||||
|
|
||||||
# Get bot member - different for Context vs Interaction
|
# Get bot member - different for Context vs Interaction
|
||||||
if hasattr(ctx, 'me'): # Context object
|
if hasattr(ctx, "me"): # Context object
|
||||||
bot_member = ctx.me
|
bot_member = ctx.me
|
||||||
elif hasattr(ctx, 'client'): # Interaction object
|
elif hasattr(ctx, "client"): # Interaction object
|
||||||
bot_member = guild.get_member(ctx.client.user.id)
|
bot_member = guild.get_member(ctx.client.user.id)
|
||||||
else:
|
else:
|
||||||
# Fallback - try to find bot member by getting the first member with bot=True
|
# Fallback - try to find bot member by getting the first member with bot=True
|
||||||
bot_member = next((m for m in guild.members if m.bot), None)
|
bot_member = next((m for m in guild.members if m.bot), None)
|
||||||
if not bot_member:
|
if not bot_member:
|
||||||
raise ValueError(f'Unable to find bot member in guild')
|
raise ValueError(f"Unable to find bot member in guild")
|
||||||
|
|
||||||
this_category = discord.utils.get(guild.categories, name=category_name)
|
this_category = discord.utils.get(guild.categories, name=category_name)
|
||||||
if not this_category:
|
if not this_category:
|
||||||
raise ValueError(f'I couldn\'t find a category named **{category_name}**')
|
raise ValueError(f"I couldn't find a category named **{category_name}**")
|
||||||
|
|
||||||
overwrites = {
|
overwrites = {
|
||||||
bot_member: discord.PermissionOverwrite(read_messages=True, send_messages=True),
|
bot_member: discord.PermissionOverwrite(read_messages=True, send_messages=True),
|
||||||
guild.default_role: discord.PermissionOverwrite(read_messages=everyone_read, send_messages=everyone_send)
|
guild.default_role: discord.PermissionOverwrite(
|
||||||
|
read_messages=everyone_read, send_messages=everyone_send
|
||||||
|
),
|
||||||
}
|
}
|
||||||
if read_send_members:
|
if read_send_members:
|
||||||
for member in read_send_members:
|
for member in read_send_members:
|
||||||
overwrites[member] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
|
overwrites[member] = discord.PermissionOverwrite(
|
||||||
|
read_messages=True, send_messages=True
|
||||||
|
)
|
||||||
if read_send_roles:
|
if read_send_roles:
|
||||||
for role in read_send_roles:
|
for role in read_send_roles:
|
||||||
overwrites[role] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
|
overwrites[role] = discord.PermissionOverwrite(
|
||||||
|
read_messages=True, send_messages=True
|
||||||
|
)
|
||||||
if read_only_roles:
|
if read_only_roles:
|
||||||
for role in read_only_roles:
|
for role in read_only_roles:
|
||||||
overwrites[role] = discord.PermissionOverwrite(read_messages=True, send_messages=False)
|
overwrites[role] = discord.PermissionOverwrite(
|
||||||
|
read_messages=True, send_messages=False
|
||||||
|
)
|
||||||
|
|
||||||
this_channel = await guild.create_text_channel(
|
this_channel = await guild.create_text_channel(
|
||||||
channel_name,
|
channel_name, overwrites=overwrites, category=this_category
|
||||||
overwrites=overwrites,
|
|
||||||
category=this_category
|
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(f'Creating channel ({channel_name}) in ({category_name})')
|
logger.info(f"Creating channel ({channel_name}) in ({category_name})")
|
||||||
|
|
||||||
return this_channel
|
return this_channel
|
||||||
|
|||||||
@ -4,6 +4,7 @@ Discord Utilities
|
|||||||
This module contains Discord helper functions for channels, roles, embeds,
|
This module contains Discord helper functions for channels, roles, embeds,
|
||||||
and other Discord-specific operations.
|
and other Discord-specific operations.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import asyncio
|
import asyncio
|
||||||
@ -13,19 +14,21 @@ import discord
|
|||||||
from discord.ext import commands
|
from discord.ext import commands
|
||||||
from helpers.constants import SBA_COLOR, PD_SEASON, IMAGES
|
from helpers.constants import SBA_COLOR, PD_SEASON, IMAGES
|
||||||
|
|
||||||
logger = logging.getLogger('discord_app')
|
logger = logging.getLogger("discord_app")
|
||||||
|
|
||||||
|
|
||||||
async def send_to_bothole(ctx, content, embed):
|
async def send_to_bothole(ctx, content, embed):
|
||||||
"""Send a message to the pd-bot-hole channel."""
|
"""Send a message to the pd-bot-hole channel."""
|
||||||
await discord.utils.get(ctx.guild.text_channels, name='pd-bot-hole') \
|
await discord.utils.get(ctx.guild.text_channels, name="pd-bot-hole").send(
|
||||||
.send(content=content, embed=embed)
|
content=content, embed=embed
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def send_to_news(ctx, content, embed):
|
async def send_to_news(ctx, content, embed):
|
||||||
"""Send a message to the pd-news-ticker channel."""
|
"""Send a message to the pd-news-ticker channel."""
|
||||||
await discord.utils.get(ctx.guild.text_channels, name='pd-news-ticker') \
|
await discord.utils.get(ctx.guild.text_channels, name="pd-news-ticker").send(
|
||||||
.send(content=content, embed=embed)
|
content=content, embed=embed
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def typing_pause(ctx, seconds=1):
|
async def typing_pause(ctx, seconds=1):
|
||||||
@ -43,23 +46,20 @@ async def pause_then_type(ctx, message):
|
|||||||
|
|
||||||
async def check_if_pdhole(ctx):
|
async def check_if_pdhole(ctx):
|
||||||
"""Check if the current channel is pd-bot-hole."""
|
"""Check if the current channel is pd-bot-hole."""
|
||||||
if ctx.message.channel.name != 'pd-bot-hole':
|
if ctx.message.channel.name != "pd-bot-hole":
|
||||||
await ctx.send('Slide on down to my bot-hole for running commands.')
|
await ctx.send("Slide on down to my bot-hole for running commands.")
|
||||||
await ctx.message.add_reaction('❌')
|
await ctx.message.add_reaction("❌")
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
async def bad_channel(ctx):
|
async def bad_channel(ctx):
|
||||||
"""Check if current channel is in the list of bad channels for commands."""
|
"""Check if current channel is in the list of bad channels for commands."""
|
||||||
bad_channels = ['paper-dynasty-chat', 'pd-news-ticker']
|
bad_channels = ["paper-dynasty-chat", "pd-news-ticker"]
|
||||||
if ctx.message.channel.name in bad_channels:
|
if ctx.message.channel.name in bad_channels:
|
||||||
await ctx.message.add_reaction('❌')
|
await ctx.message.add_reaction("❌")
|
||||||
bot_hole = discord.utils.get(
|
bot_hole = discord.utils.get(ctx.guild.text_channels, name=f"pd-bot-hole")
|
||||||
ctx.guild.text_channels,
|
await ctx.send(f"Slide on down to the {bot_hole.mention} ;)")
|
||||||
name=f'pd-bot-hole'
|
|
||||||
)
|
|
||||||
await ctx.send(f'Slide on down to the {bot_hole.mention} ;)')
|
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
@ -68,14 +68,11 @@ async def bad_channel(ctx):
|
|||||||
def get_channel(ctx, name) -> Optional[discord.TextChannel]:
|
def get_channel(ctx, name) -> Optional[discord.TextChannel]:
|
||||||
"""Get a text channel by name."""
|
"""Get a text channel by name."""
|
||||||
# Handle both Context and Interaction objects
|
# Handle both Context and Interaction objects
|
||||||
guild = ctx.guild if hasattr(ctx, 'guild') else None
|
guild = ctx.guild if hasattr(ctx, "guild") else None
|
||||||
if not guild:
|
if not guild:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
channel = discord.utils.get(
|
channel = discord.utils.get(guild.text_channels, name=name)
|
||||||
guild.text_channels,
|
|
||||||
name=name
|
|
||||||
)
|
|
||||||
if channel:
|
if channel:
|
||||||
return channel
|
return channel
|
||||||
return None
|
return None
|
||||||
@ -87,7 +84,7 @@ async def get_emoji(ctx, name, return_empty=True):
|
|||||||
emoji = await commands.converter.EmojiConverter().convert(ctx, name)
|
emoji = await commands.converter.EmojiConverter().convert(ctx, name)
|
||||||
except:
|
except:
|
||||||
if return_empty:
|
if return_empty:
|
||||||
emoji = ''
|
emoji = ""
|
||||||
else:
|
else:
|
||||||
return name
|
return name
|
||||||
return emoji
|
return emoji
|
||||||
@ -101,9 +98,13 @@ async def react_and_reply(ctx, reaction, message):
|
|||||||
|
|
||||||
async def send_to_channel(bot, channel_name, content=None, embed=None):
|
async def send_to_channel(bot, channel_name, content=None, embed=None):
|
||||||
"""Send a message to a specific channel by name or ID."""
|
"""Send a message to a specific channel by name or ID."""
|
||||||
guild = bot.get_guild(int(os.environ.get('GUILD_ID')))
|
guild_id = os.environ.get("GUILD_ID")
|
||||||
|
if not guild_id:
|
||||||
|
logger.error("GUILD_ID env var is not set")
|
||||||
|
return
|
||||||
|
guild = bot.get_guild(int(guild_id))
|
||||||
if not guild:
|
if not guild:
|
||||||
logger.error('Cannot send to channel - bot not logged in')
|
logger.error("Cannot send to channel - bot not logged in")
|
||||||
return
|
return
|
||||||
|
|
||||||
this_channel = discord.utils.get(guild.text_channels, name=channel_name)
|
this_channel = discord.utils.get(guild.text_channels, name=channel_name)
|
||||||
@ -111,7 +112,7 @@ async def send_to_channel(bot, channel_name, content=None, embed=None):
|
|||||||
if not this_channel:
|
if not this_channel:
|
||||||
this_channel = discord.utils.get(guild.text_channels, id=channel_name)
|
this_channel = discord.utils.get(guild.text_channels, id=channel_name)
|
||||||
if not this_channel:
|
if not this_channel:
|
||||||
raise NameError(f'**{channel_name}** channel not found')
|
raise NameError(f"**{channel_name}** channel not found")
|
||||||
|
|
||||||
return await this_channel.send(content=content, embed=embed)
|
return await this_channel.send(content=content, embed=embed)
|
||||||
|
|
||||||
@ -128,14 +129,16 @@ async def get_or_create_role(ctx, role_name, mentionable=True):
|
|||||||
|
|
||||||
def get_special_embed(special):
|
def get_special_embed(special):
|
||||||
"""Create an embed for a special item."""
|
"""Create an embed for a special item."""
|
||||||
embed = discord.Embed(title=f'{special.name} - Special #{special.get_id()}',
|
embed = discord.Embed(
|
||||||
color=discord.Color.random(),
|
title=f"{special.name} - Special #{special.get_id()}",
|
||||||
description=f'{special.short_desc}')
|
color=discord.Color.random(),
|
||||||
embed.add_field(name='Description', value=f'{special.long_desc}', inline=False)
|
description=f"{special.short_desc}",
|
||||||
if special.thumbnail.lower() != 'none':
|
)
|
||||||
embed.set_thumbnail(url=f'{special.thumbnail}')
|
embed.add_field(name="Description", value=f"{special.long_desc}", inline=False)
|
||||||
if special.url.lower() != 'none':
|
if special.thumbnail.lower() != "none":
|
||||||
embed.set_image(url=f'{special.url}')
|
embed.set_thumbnail(url=f"{special.thumbnail}")
|
||||||
|
if special.url.lower() != "none":
|
||||||
|
embed.set_image(url=f"{special.url}")
|
||||||
|
|
||||||
return embed
|
return embed
|
||||||
|
|
||||||
@ -154,99 +157,125 @@ def get_team_embed(title, team=None, thumbnail: bool = True):
|
|||||||
if team:
|
if team:
|
||||||
embed = discord.Embed(
|
embed = discord.Embed(
|
||||||
title=title,
|
title=title,
|
||||||
color=int(team["color"], 16) if team["color"] else int(SBA_COLOR, 16)
|
color=int(team["color"], 16) if team["color"] else int(SBA_COLOR, 16),
|
||||||
|
)
|
||||||
|
embed.set_footer(
|
||||||
|
text=f'Paper Dynasty Season {team["season"]}', icon_url=IMAGES["logo"]
|
||||||
)
|
)
|
||||||
embed.set_footer(text=f'Paper Dynasty Season {team["season"]}', icon_url=IMAGES['logo'])
|
|
||||||
if thumbnail:
|
if thumbnail:
|
||||||
embed.set_thumbnail(url=team["logo"] if team["logo"] else IMAGES['logo'])
|
embed.set_thumbnail(url=team["logo"] if team["logo"] else IMAGES["logo"])
|
||||||
else:
|
else:
|
||||||
embed = discord.Embed(
|
embed = discord.Embed(title=title, color=int(SBA_COLOR, 16))
|
||||||
title=title,
|
embed.set_footer(
|
||||||
color=int(SBA_COLOR, 16)
|
text=f"Paper Dynasty Season {PD_SEASON}", icon_url=IMAGES["logo"]
|
||||||
)
|
)
|
||||||
embed.set_footer(text=f'Paper Dynasty Season {PD_SEASON}', icon_url=IMAGES['logo'])
|
|
||||||
if thumbnail:
|
if thumbnail:
|
||||||
embed.set_thumbnail(url=IMAGES['logo'])
|
embed.set_thumbnail(url=IMAGES["logo"])
|
||||||
|
|
||||||
return embed
|
return embed
|
||||||
|
|
||||||
|
|
||||||
async def create_channel_old(
|
async def create_channel_old(
|
||||||
ctx, channel_name: str, category_name: str, everyone_send=False, everyone_read=True, allowed_members=None,
|
ctx,
|
||||||
allowed_roles=None):
|
channel_name: str,
|
||||||
|
category_name: str,
|
||||||
|
everyone_send=False,
|
||||||
|
everyone_read=True,
|
||||||
|
allowed_members=None,
|
||||||
|
allowed_roles=None,
|
||||||
|
):
|
||||||
"""Create a text channel with specified permissions (legacy version)."""
|
"""Create a text channel with specified permissions (legacy version)."""
|
||||||
this_category = discord.utils.get(ctx.guild.categories, name=category_name)
|
this_category = discord.utils.get(ctx.guild.categories, name=category_name)
|
||||||
if not this_category:
|
if not this_category:
|
||||||
raise ValueError(f'I couldn\'t find a category named **{category_name}**')
|
raise ValueError(f"I couldn't find a category named **{category_name}**")
|
||||||
|
|
||||||
overwrites = {
|
overwrites = {
|
||||||
ctx.guild.me: discord.PermissionOverwrite(read_messages=True, send_messages=True),
|
ctx.guild.me: discord.PermissionOverwrite(
|
||||||
ctx.guild.default_role: discord.PermissionOverwrite(read_messages=everyone_read, send_messages=everyone_send)
|
read_messages=True, send_messages=True
|
||||||
|
),
|
||||||
|
ctx.guild.default_role: discord.PermissionOverwrite(
|
||||||
|
read_messages=everyone_read, send_messages=everyone_send
|
||||||
|
),
|
||||||
}
|
}
|
||||||
if allowed_members:
|
if allowed_members:
|
||||||
if isinstance(allowed_members, list):
|
if isinstance(allowed_members, list):
|
||||||
for member in allowed_members:
|
for member in allowed_members:
|
||||||
overwrites[member] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
|
overwrites[member] = discord.PermissionOverwrite(
|
||||||
|
read_messages=True, send_messages=True
|
||||||
|
)
|
||||||
if allowed_roles:
|
if allowed_roles:
|
||||||
if isinstance(allowed_roles, list):
|
if isinstance(allowed_roles, list):
|
||||||
for role in allowed_roles:
|
for role in allowed_roles:
|
||||||
overwrites[role] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
|
overwrites[role] = discord.PermissionOverwrite(
|
||||||
|
read_messages=True, send_messages=True
|
||||||
|
)
|
||||||
|
|
||||||
this_channel = await ctx.guild.create_text_channel(
|
this_channel = await ctx.guild.create_text_channel(
|
||||||
channel_name,
|
channel_name, overwrites=overwrites, category=this_category
|
||||||
overwrites=overwrites,
|
|
||||||
category=this_category
|
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(f'Creating channel ({channel_name}) in ({category_name})')
|
logger.info(f"Creating channel ({channel_name}) in ({category_name})")
|
||||||
|
|
||||||
return this_channel
|
return this_channel
|
||||||
|
|
||||||
|
|
||||||
async def create_channel(
|
async def create_channel(
|
||||||
ctx, channel_name: str, category_name: str, everyone_send=False, everyone_read=True,
|
ctx,
|
||||||
read_send_members: list = None, read_send_roles: list = None, read_only_roles: list = None):
|
channel_name: str,
|
||||||
|
category_name: str,
|
||||||
|
everyone_send=False,
|
||||||
|
everyone_read=True,
|
||||||
|
read_send_members: list = None,
|
||||||
|
read_send_roles: list = None,
|
||||||
|
read_only_roles: list = None,
|
||||||
|
):
|
||||||
"""Create a text channel with specified permissions."""
|
"""Create a text channel with specified permissions."""
|
||||||
# Handle both Context and Interaction objects
|
# Handle both Context and Interaction objects
|
||||||
guild = ctx.guild if hasattr(ctx, 'guild') else None
|
guild = ctx.guild if hasattr(ctx, "guild") else None
|
||||||
if not guild:
|
if not guild:
|
||||||
raise ValueError(f'Unable to access guild from context object')
|
raise ValueError(f"Unable to access guild from context object")
|
||||||
|
|
||||||
# Get bot member - different for Context vs Interaction
|
# Get bot member - different for Context vs Interaction
|
||||||
if hasattr(ctx, 'me'): # Context object
|
if hasattr(ctx, "me"): # Context object
|
||||||
bot_member = ctx.me
|
bot_member = ctx.me
|
||||||
elif hasattr(ctx, 'client'): # Interaction object
|
elif hasattr(ctx, "client"): # Interaction object
|
||||||
bot_member = guild.get_member(ctx.client.user.id)
|
bot_member = guild.get_member(ctx.client.user.id)
|
||||||
else:
|
else:
|
||||||
# Fallback - try to find bot member by getting the first member with bot=True
|
# Fallback - try to find bot member by getting the first member with bot=True
|
||||||
bot_member = next((m for m in guild.members if m.bot), None)
|
bot_member = next((m for m in guild.members if m.bot), None)
|
||||||
if not bot_member:
|
if not bot_member:
|
||||||
raise ValueError(f'Unable to find bot member in guild')
|
raise ValueError(f"Unable to find bot member in guild")
|
||||||
|
|
||||||
this_category = discord.utils.get(guild.categories, name=category_name)
|
this_category = discord.utils.get(guild.categories, name=category_name)
|
||||||
if not this_category:
|
if not this_category:
|
||||||
raise ValueError(f'I couldn\'t find a category named **{category_name}**')
|
raise ValueError(f"I couldn't find a category named **{category_name}**")
|
||||||
|
|
||||||
overwrites = {
|
overwrites = {
|
||||||
bot_member: discord.PermissionOverwrite(read_messages=True, send_messages=True),
|
bot_member: discord.PermissionOverwrite(read_messages=True, send_messages=True),
|
||||||
guild.default_role: discord.PermissionOverwrite(read_messages=everyone_read, send_messages=everyone_send)
|
guild.default_role: discord.PermissionOverwrite(
|
||||||
|
read_messages=everyone_read, send_messages=everyone_send
|
||||||
|
),
|
||||||
}
|
}
|
||||||
if read_send_members:
|
if read_send_members:
|
||||||
for member in read_send_members:
|
for member in read_send_members:
|
||||||
overwrites[member] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
|
overwrites[member] = discord.PermissionOverwrite(
|
||||||
|
read_messages=True, send_messages=True
|
||||||
|
)
|
||||||
if read_send_roles:
|
if read_send_roles:
|
||||||
for role in read_send_roles:
|
for role in read_send_roles:
|
||||||
overwrites[role] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
|
overwrites[role] = discord.PermissionOverwrite(
|
||||||
|
read_messages=True, send_messages=True
|
||||||
|
)
|
||||||
if read_only_roles:
|
if read_only_roles:
|
||||||
for role in read_only_roles:
|
for role in read_only_roles:
|
||||||
overwrites[role] = discord.PermissionOverwrite(read_messages=True, send_messages=False)
|
overwrites[role] = discord.PermissionOverwrite(
|
||||||
|
read_messages=True, send_messages=False
|
||||||
|
)
|
||||||
|
|
||||||
this_channel = await guild.create_text_channel(
|
this_channel = await guild.create_text_channel(
|
||||||
channel_name,
|
channel_name, overwrites=overwrites, category=this_category
|
||||||
overwrites=overwrites,
|
|
||||||
category=this_category
|
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(f'Creating channel ({channel_name}) in ({category_name})')
|
logger.info(f"Creating channel ({channel_name}) in ({category_name})")
|
||||||
|
|
||||||
return this_channel
|
return this_channel
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user