paper-dynasty-discord/cogs/economy_new/marketplace.py
2025-10-08 14:45:41 -05:00

425 lines
17 KiB
Python

# Economy Marketplace Module
# Contains buy/sell functionality from the original economy.py
import logging
from discord.ext import commands
from discord import app_commands
import discord
from typing import Optional
# Import specific utilities needed by this module
from api_calls import db_get, db_post, db_patch
from helpers.constants import PD_PLAYERS, IMAGES, LIVE_CARDSET_ID
from helpers import (
get_team_by_owner, display_cards, give_packs, legal_channel, get_channel,
get_blank_team_card, get_card_embeds, confirm_pack_purchase, get_cal_user,
Question, image_embed
)
from helpers.discord_utils import get_team_embed, send_to_channel, get_emoji
from helpers.search_utils import fuzzy_search, cardset_search
from api_calls import team_hash
from discord_ui import Confirm, ButtonOptions, SelectView, SelectBuyPacksCardset, SelectBuyPacksTeam
logger = logging.getLogger('discord_app')
class Marketplace(commands.Cog):
    """Marketplace functionality for buying and selling cards and packs.

    Exposes the `/buy card-by-id`, `/buy card-by-name`, `/buy pack` and
    `/selldupes` slash commands. All purchases and sales are executed by the
    backing API through `db_get`; this cog only gathers input, shows the
    wallet math and asks for confirmation.
    """

    # Channels in which the /buy commands refuse to run and redirect the user.
    _RESTRICTED_CHANNELS = ('paper-dynasty-chat', 'pd-news-ticker', 'pd-network-news')

    # Longest a single dupe-listing chunk may grow before a new chunk is started.
    _DUPE_CHUNK_LIMIT = 1500

    def __init__(self, bot):
        self.bot = bot

    async def _redirected_from_channel(self, interaction: discord.Interaction) -> bool:
        """Send a redirect notice when the command was run in a restricted channel.

        Returns:
            True when the notice was sent (the caller must stop), False otherwise.
        """
        if interaction.channel.name not in self._RESTRICTED_CHANNELS:
            return False

        await interaction.response.send_message(
            f'Please head to down to {get_channel(interaction, "pd-bot-hole")} to run this command.',
            ephemeral=True
        )
        return True

    async def _team_or_signup_prompt(
            self, interaction: discord.Interaction, *, ephemeral: bool = False) -> Optional[dict]:
        """Look up the invoking user's team, prompting them to sign up when there is none.

        Args:
            interaction: The interaction being handled; used for the user id and the reply.
            ephemeral: Whether the sign-up prompt should only be visible to the user.

        Returns:
            The team returned by `get_team_by_owner`, or None after the prompt has
            been sent (the caller must stop, the interaction is already answered).
        """
        team = await get_team_by_owner(interaction.user.id)
        if team:
            return team

        await interaction.response.send_message(
            'I don\'t see a team for you, yet. You can sign up with the `/newteam` command!',
            ephemeral=ephemeral
        )
        return None

    async def buy_card(self, interaction: discord.Interaction, this_player: dict, owner_team: dict):
        """Helper method for purchasing individual cards.

        Shows the card, the wallet math and a confirmation prompt, then asks the
        API to perform the purchase. Answers `interaction` itself, so callers
        must not have responded to it yet.

        Args:
            interaction: The slash-command interaction to respond to.
            this_player: Player record; reads `player_id`, `cost` and
                `cardset` (`name`, `for_purchase`).
            owner_team: Buying team record; reads `id` and `wallet`.
        """
        c_query = await db_get(
            'cards',
            params=[('player_id', this_player['player_id']), ('team_id', owner_team['id'])]
        )
        num_copies = c_query['count'] if c_query else 0

        if not this_player['cardset']['for_purchase']:
            await interaction.response.send_message(
                content=f'Ope - looks like singles from the {this_player["cardset"]["name"]} cardset are not available '
                        f'for purchase.'
            )
            return

        wallet = owner_team['wallet']
        cost = this_player['cost']
        # Shared opening of both the "too expensive" and the "confirm" messages.
        summary = (
            f'You currently have {num_copies} cop{"ies" if num_copies != 1 else "y"} of this card.\n\n'
            f'Your Wallet: {wallet}\n'
            f'Card Price: {cost}\n'
        )

        await interaction.response.send_message(
            content=None,
            embeds=await get_card_embeds(get_blank_team_card(this_player))
        )

        if cost > wallet:
            await interaction.channel.send(
                content=f'{summary}'
                        f'After Purchase: {await get_emoji(interaction.guild, "dead", False)}\n\n'
                        f'You will have to save up a little more.'
            )
            return

        view = Confirm(responders=[interaction.user])
        question = await interaction.channel.send(
            content=f'{summary}'
                    f'After Purchase: {wallet - cost}\n\n'
                    f'Would you like to make this purchase?',
            view=view
        )
        await view.wait()

        if not view.value:
            await question.edit(content='Saving that money. Smart.', view=None)
            return

        purchase = await db_get(
            f'teams/{owner_team["id"]}/buy/players',
            params=[('ts', team_hash(owner_team)), ('ids', f'{this_player["player_id"]}')],
            timeout=10
        )
        if not purchase:
            await question.edit(
                content='That didn\'t go through for some reason. If this happens again, go ping the shit out of Cal.',
                view=None
            )
            return

        await question.edit(content='It\'s all yours!', view=None)

    group_buy = app_commands.Group(name='buy', description='Make a purchase from the marketplace')

    # NOTE: docstrings on the command callbacks below deliberately avoid
    # parameter sections so they cannot influence the slash-command metadata.

    @group_buy.command(name='card-by-id', description='Buy a player card from the marketplace')
    @app_commands.checks.has_any_role(PD_PLAYERS)
    async def buy_card_id_slash(self, interaction: discord.Interaction, player_id: int):
        """Buy a single card, identified by its numeric player id."""
        if await self._redirected_from_channel(interaction):
            return

        owner_team = await self._team_or_signup_prompt(interaction)
        if not owner_team:
            # The sign-up prompt already answered the interaction; continuing
            # would subscript a missing team and try to respond a second time.
            return

        p_query = await db_get('players', object_id=player_id, none_okay=False)
        logger.debug(f'this_player: {p_query}')

        await self.buy_card(interaction, p_query, owner_team)

    @group_buy.command(name='card-by-name', description='Buy a player card from the marketplace')
    @app_commands.checks.has_any_role(PD_PLAYERS)
    @app_commands.describe(
        player_name='Name of the player you want to purchase',
        player_cardset='Optional: Name of the cardset the player is from'
    )
    async def buy_card_slash(
            self, interaction: discord.Interaction, player_name: str, player_cardset: Optional[str] = None):
        """Buy a single card, found by fuzzy player name and optional cardset name."""
        if await self._redirected_from_channel(interaction):
            return

        owner_team = await self._team_or_signup_prompt(interaction)
        if not owner_team:
            return

        player_cog = self.bot.get_cog('PlayerLookup')
        proper_name = fuzzy_search(player_name, player_cog.player_list)
        if not proper_name:
            await interaction.response.send_message('No clue who that is.')
            return

        all_params = [('name', proper_name)]
        if player_cardset:
            this_cardset = await cardset_search(player_cardset, player_cog.cardset_list)
            # NOTE(review): assumes cardset_search returns a falsy value when
            # nothing matches - confirm against the helper.
            if not this_cardset:
                await interaction.response.send_message(
                    f'I couldn\'t find a cardset matching {player_cardset}.'
                )
                return
            all_params.append(('cardset_id', this_cardset['id']))

        p_query = await db_get('players', params=all_params)
        if not p_query or p_query['count'] == 0:
            await interaction.response.send_message(
                f'I didn\'t find any cards for {proper_name}'
            )
            return
        if p_query['count'] > 1:
            await interaction.response.send_message(
                f'I found {p_query["count"]} different cards for {proper_name}. Would you please run this again '
                f'with the cardset specified?'
            )
            return

        this_player = p_query['players'][0]
        logger.debug(f'this_player: {this_player}')

        await self.buy_card(interaction, this_player, owner_team)

    @group_buy.command(name='pack', description='Buy a pack or 7 from the marketplace')
    @app_commands.checks.has_any_role(PD_PLAYERS)
    @app_commands.describe()
    async def buy_pack_slash(self, interaction: discord.Interaction):
        """Walk the user through choosing, sizing, customizing and buying packs."""
        if await self._redirected_from_channel(interaction):
            return

        owner_team = await self._team_or_signup_prompt(interaction)
        if not owner_team:
            return

        p_query = await db_get('packtypes', params=[('available', True)])
        # Covers a missing response, a response without 'count', and zero results.
        if not p_query or not p_query.get('count'):
            await interaction.response.send_message(
                f'Welp, I couldn\'t find any packs in my database. Should probably go ping '
                f'{get_cal_user(interaction).mention} about that.'
            )
            return

        embed = get_team_embed('Packs for Purchase')
        for pack in p_query['packtypes']:
            embed.add_field(name=f'{pack["name"]} - {pack["cost"]}', value=f'{pack["description"]}')

        # The button view is always given exactly five labels; unused slots are 'na'.
        pack_options = [
            pack['name'] for pack in p_query['packtypes'][:5] if pack['available'] and pack['cost']
        ]
        pack_options.extend(['na'] * (5 - len(pack_options)))
        view = ButtonOptions(
            responders=[interaction.user], timeout=60,
            labels=pack_options
        )

        await interaction.response.send_message(content=None, embed=embed)
        question = await interaction.channel.send(
            'Which pack would you like to purchase?', view=view
        )
        await view.wait()
        await question.delete()

        if not view.value:
            await interaction.channel.send('Hm. Another window shopper. I\'ll be here when you\'re serious.')
            return

        pack_name = view.value
        this_q = Question(self.bot, interaction.channel, 'How many would you like?', 'int', 60)
        num_packs = await this_q.ask([interaction.user])

        # A missing answer would crash the cost math below, and a zero or
        # negative quantity would produce a non-positive total that passes the
        # wallet check. NOTE(review): assumes ask() yields None when unanswered.
        if not num_packs or num_packs < 1:
            await interaction.channel.send(
                'I need a quantity of at least 1. Run the command again when you have a number in mind.'
            )
            return

        p_query = await db_get(
            'packtypes', params=[('name', pack_name.lower().replace('pack', '')), ('available', True)]
        )
        if not p_query or not p_query.get('count'):
            # Channel messages cannot be ephemeral, so this is a normal message.
            await interaction.channel.send(
                f'Hmm...I don\'t recognize {pack_name.title()} as a pack type. Check on that and get back to me.'
            )
            return
        pack_type = p_query['packtypes'][0]

        cover_keys = {'Standard': 'pack-sta', 'Premium': 'pack-pre', 'Promo Choice': 'mvp-hype'}
        pack_cover = IMAGES[cover_keys.get(pack_type['name'], 'logo')]

        total_cost = pack_type['cost'] * num_packs
        quantity_prefix = f'{num_packs}x ' if num_packs > 1 else ''
        plural = 's' if num_packs > 1 else ''
        pack_embed = image_embed(
            pack_cover,
            title=f'{owner_team["lname"]}',
            desc=f'{quantity_prefix}{pack_type["name"]} Pack{plural}',
        )

        if total_cost > owner_team['wallet']:
            await interaction.channel.send(content=None, embed=pack_embed)
            await interaction.channel.send(
                content=f'Your Wallet: {owner_team["wallet"]}\n'
                        f'Pack{plural} Price: {total_cost}\n'
                        f'After Purchase: {await get_emoji(interaction.guild, "dead", False)}\n\n'
                        f'You will have to save up a little more.'
            )
            return

        # Get Customization and make purchase
        if pack_name in ['Standard', 'Premium']:
            view = ButtonOptions(
                [interaction.user],
                timeout=15,
                labels=['No Customization', 'Cardset', 'Franchise', None, None]
            )
            view.option1.style = discord.ButtonStyle.danger
            await interaction.channel.send(
                content='Would you like to apply a pack customization?',
                embed=pack_embed,
                view=view
            )
            await view.wait()

            if not view.value:
                await interaction.channel.send('You think on it and get back to me.')
                return

            if view.value == 'Cardset':
                # The select component is handed everything it needs for the purchase.
                view = SelectView(
                    [SelectBuyPacksCardset(owner_team, num_packs, pack_type['id'], pack_embed, total_cost)]
                )
                await interaction.channel.send(content=None, view=view)
                return

            if view.value == 'Franchise':
                view = SelectView(
                    [
                        SelectBuyPacksTeam('AL', owner_team, num_packs, pack_type['id'], pack_embed, total_cost),
                        SelectBuyPacksTeam('NL', owner_team, num_packs, pack_type['id'], pack_embed, total_cost)
                    ],
                    timeout=30
                )
                await interaction.channel.send(content=None, view=view)
                return
            # Any other choice ('No Customization') falls through to a plain purchase.

        question = await confirm_pack_purchase(interaction, owner_team, num_packs, total_cost, pack_embed)
        if question is None:
            return

        purchase = await db_get(
            f'teams/{owner_team["id"]}/buy/pack/{pack_type["id"]}',
            params=[('ts', team_hash(owner_team)), ('quantity', num_packs)]
        )
        if not purchase:
            # Message.edit only accepts keyword arguments.
            await question.edit(
                content='That didn\'t go through for some reason. If this happens again, go ping the shit out of Cal.',
                view=None
            )
            return

        await question.edit(
            content=f'{"They are" if num_packs > 1 else "It is"} all yours! Go rip \'em with `/open-packs`',
            view=None
        )

    @app_commands.command(name='selldupes', description='Sell all of your duplicate cards')
    @app_commands.checks.has_any_role(PD_PLAYERS)
    # NOTE(review): commands.check targets ext.commands commands; it does not
    # look like app commands consult it, so legal_channel may never run here.
    # Confirm, and switch to app_commands.check if legal_channel supports it.
    @commands.check(legal_channel)
    @app_commands.describe(
        immediately='Skip all prompts and sell dupes immediately; default False',
        skip_live='Skip all live series cards; default True'
    )
    async def sell_dupes_command(
            self, interaction: discord.Interaction, skip_live: bool = True, immediately: bool = False):
        """List the team's duplicate cards and, once confirmed, sell them all.

        The first card seen for each player is kept; every later card for the
        same player counts as a duplicate. Live-series cards are ignored
        entirely when skip_live is set.
        """
        team = await self._team_or_signup_prompt(interaction, ephemeral=True)
        if not team:
            return

        await interaction.response.send_message(
            'Let me flip through your cards. This could take a while if you have a ton of cards...'
        )

        try:
            c_query = await db_get('cards', params=[('team_id', team['id']), ('dupes', True)], timeout=15)
        except Exception as e:
            # Command boundary: surface whatever went wrong to the user.
            await interaction.edit_original_response(
                content=f'{e}\n\nSounds like a {get_cal_user(interaction).mention} problem tbh'
            )
            return

        cards = c_query['cards'] if c_query else []

        seen_player_ids = set()
        dupe_cards = []
        # The listing is split into chunks so no single message gets too long;
        # the list grows as needed instead of being capped at a fixed size.
        dupe_strings = ['']

        for card in cards:
            if len(dupe_strings[-1]) > self._DUPE_CHUNK_LIMIT:
                dupe_strings.append('')
            logger.debug(f'card: {card}')

            player = card['player']
            if skip_live and (player['cardset']['id'] == LIVE_CARDSET_ID):
                logger.debug('live series card - skipping')
            elif player['player_id'] not in seen_player_ids:
                logger.debug('not a dupe')
                seen_player_ids.add(player['player_id'])
            else:
                logger.info(f'{team["abbrev"]} duplicate card: {card["id"]}')
                dupe_cards.append(card)
                dupe_strings[-1] += (
                    f'{player["rarity"]["name"]} {player["p_name"]} - '
                    f'{player["cardset"]["name"]}\n'
                )

        if not dupe_cards:
            await interaction.edit_original_response(content='You currently have 0 duplicate cards!')
            return

        first_chunk, *overflow_chunks = dupe_strings
        logger.info(f'sending first message / length {len(first_chunk)}')
        await interaction.edit_original_response(
            content=f'You currently have {len(dupe_cards)} duplicate cards:\n\n{first_chunk}'
        )
        for chunk in overflow_chunks:
            logger.info(f'checking string: {len(chunk)}')
            # Only the trailing chunk can be empty; never send an empty message.
            if chunk:
                await interaction.channel.send(chunk)

        if not immediately:
            view = Confirm(responders=[interaction.user])
            question = await interaction.channel.send('Would you like to sell all of them?', view=view)
            await view.wait()

            if not view.value:
                await question.edit(content='We can leave them be for now.', view=None)
                return
            await question.edit(content='The sale is going through...', view=None)

        # Every id is followed by a comma (including the last one); this is the
        # format the sell endpoint has always been sent, so it is kept as is.
        dupe_ids = ''.join(f'{card["id"]},' for card in dupe_cards)
        sale = await db_get(
            f'teams/{team["id"]}/sell/cards',
            params=[('ts', team_hash(team)), ('ids', dupe_ids)],
            timeout=10
        )
        if not sale:
            await interaction.channel.send(
                f'That didn\'t go through for some reason. Go ping the shit out of {get_cal_user(interaction).mention}.'
            )
            return

        # Re-read the team so the wallet shown reflects the completed sale.
        team = await db_get('teams', object_id=team['id'])
        await interaction.channel.send(f'Your Wallet: {team["wallet"]}')
async def setup(bot):
    """Extension entry point: build the Marketplace cog and attach it to `bot`."""
    marketplace_cog = Marketplace(bot)
    await bot.add_cog(marketplace_cog)