paper-dynasty-card-creation/creation_helpers.py
Cal Corum 48ec7375a1 Normalize franchise values in card generation
- Add FRANCHISE_NORMALIZE dict and helper function
- Update FRANCHISE_LIST to return city-agnostic values
- Update mlbteam_and_franchise() to normalize franchise

Ensures new cards use normalized franchise format for AI roster matching

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-07 12:01:12 -06:00

1346 lines
40 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import csv
import datetime
import math
from decimal import ROUND_HALF_EVEN, Decimal
from exceptions import logger
from typing import Dict, List, Tuple, Optional
import pandas as pd
import pybaseball as pb
import random
import requests
import time
from db_calls import db_get
from db_calls_card_creation import *
from bs4 import BeautifulSoup
# Card Creation Constants
NEW_PLAYER_COST = 99999  # Sentinel value indicating a new player not yet priced
# Cost per rarity tier, keyed by the integer rarity code named on each line.
RARITY_BASE_COSTS = {
    1: 810,  # Diamond
    2: 270,  # Gold
    3: 90,  # Silver
    4: 30,  # Bronze
    5: 10,  # Common
    99: 2400  # Special/Legend
}
# Rarity Cost Adjustments
# Maps (old_rarity, new_rarity) -> (cost_adjustment, minimum_cost)
# When a player's rarity changes, adjust their cost by the specified amount
# and enforce minimum cost if specified (None = no minimum)
# Same-rarity pairs (e.g. (1, 1)) are intentionally absent from the table.
RARITY_COST_ADJUSTMENTS = {
    # From Diamond (1)
    (1, 2): (-540, 100),
    (1, 3): (-720, 50),
    (1, 4): (-780, 15),
    (1, 5): (-800, 5),
    (1, 99): (1600, None),
    # From Gold (2)
    (2, 1): (540, None),
    (2, 3): (-180, 50),
    (2, 4): (-240, 15),
    (2, 5): (-260, 5),
    (2, 99): (2140, None),
    # From Silver (3)
    (3, 1): (720, None),
    (3, 2): (180, None),
    (3, 4): (-60, 15),
    (3, 5): (-80, 5),
    (3, 99): (2320, None),
    # From Bronze (4)
    (4, 1): (780, None),
    (4, 2): (240, None),
    (4, 3): (60, None),
    (4, 5): (-20, 5),
    (4, 99): (2380, None),
    # From Common (5)
    (5, 1): (800, None),
    (5, 2): (260, None),
    (5, 3): (80, None),
    (5, 4): (20, None),
    (5, 99): (2400, None),
    # From Special/Legend (99)
    (99, 1): (-1600, 800),
    (99, 2): (-2140, 100),
    (99, 3): (-2320, 50),
    (99, 4): (-2380, 15),
    (99, 5): (-2400, 5),
}
# Default OPS Values (fallbacks when actual averages unavailable)
# These are used to calculate player costs when we don't have enough data
# to calculate rarity-specific averages from the cardset
# All three tables are keyed by the integer rarity code.
# Batter default OPS by rarity
# NOTE(review): unlike the pitcher tables, there is no entry for rarity 99 here.
DEFAULT_BATTER_OPS = {
    1: 1.066,  # Diamond
    2: 0.938,  # Gold
    3: 0.844,  # Silver
    4: 0.752,  # Bronze
    5: 0.612,  # Common
}
# Starting Pitcher default OPS-against by rarity
DEFAULT_STARTER_OPS = {
    99: 0.388,  # Special/Legend
    1: 0.445,  # Diamond
    2: 0.504,  # Gold
    3: 0.568,  # Silver
    4: 0.634,  # Bronze
    5: 0.737,  # Common
}
# Relief Pitcher default OPS-against by rarity
DEFAULT_RELIEVER_OPS = {
    99: 0.282,  # Special/Legend
    1: 0.375,  # Diamond
    2: 0.442,  # Gold
    3: 0.516,  # Silver
    4: 0.591,  # Bronze
    5: 0.702,  # Common
}
# Franchise normalization: Convert city+team names to city-agnostic team names
# This enables cross-era player matching (e.g., 'Oakland Athletics' -> 'Athletics')
# Both the 'St Louis' and 'St. Louis' spellings are accepted for the Cardinals.
FRANCHISE_NORMALIZE = {
    'Arizona Diamondbacks': 'Diamondbacks',
    'Atlanta Braves': 'Braves',
    'Baltimore Orioles': 'Orioles',
    'Boston Red Sox': 'Red Sox',
    'Chicago Cubs': 'Cubs',
    'Chicago White Sox': 'White Sox',
    'Cincinnati Reds': 'Reds',
    'Cleveland Guardians': 'Guardians',
    'Colorado Rockies': 'Rockies',
    'Detroit Tigers': 'Tigers',
    'Houston Astros': 'Astros',
    'Kansas City Royals': 'Royals',
    'Los Angeles Angels': 'Angels',
    'Los Angeles Dodgers': 'Dodgers',
    'Miami Marlins': 'Marlins',
    'Milwaukee Brewers': 'Brewers',
    'Minnesota Twins': 'Twins',
    'New York Mets': 'Mets',
    'New York Yankees': 'Yankees',
    'Oakland Athletics': 'Athletics',
    'Philadelphia Phillies': 'Phillies',
    'Pittsburgh Pirates': 'Pirates',
    'San Diego Padres': 'Padres',
    'San Francisco Giants': 'Giants',
    'Seattle Mariners': 'Mariners',
    'St Louis Cardinals': 'Cardinals',
    'St. Louis Cardinals': 'Cardinals',
    'Tampa Bay Rays': 'Rays',
    'Texas Rangers': 'Rangers',
    'Toronto Blue Jays': 'Blue Jays',
    'Washington Nationals': 'Nationals',
}
def normalize_franchise(franchise: str) -> str:
    """Return the city-agnostic team name for a full club name.

    Names with an entry in FRANCHISE_NORMALIZE are translated (e.g.
    'Oakland Athletics' -> 'Athletics'); any other value is returned unchanged.
    """
    if franchise in FRANCHISE_NORMALIZE:
        return FRANCHISE_NORMALIZE[franchise]
    return franchise
# For each 2d6 row (as a string key): 'chances' is the count stored for that
# row and 'inc' is the size of one split increment on that row.
D20_CHANCES = {
    roll: {'chances': chances, 'inc': inc}
    for roll, chances, inc in (
        ('2', 1, .05),
        ('3', 2, .1),
        ('4', 3, .15),
        ('5', 4, .2),
        ('6', 5, .25),
        ('7', 6, .3),
        ('8', 5, .25),
        ('9', 4, .2),
        ('10', 3, .15),
        ('11', 2, .1),
        ('12', 1, .05),
    )
}
# Empty card template: side ('vL' / 'vR') -> column ('1'-'3') -> 2d6 row
# ('2'-'12') -> blank result cell. Each column also carries a 'splits' counter
# starting at 0. Every cell is a distinct dict object.
BLANK_RESULTS = {
    side: {
        column: {
            **{
                str(roll): {'result': None, 'splits': None, '2d6': None}
                for roll in range(2, 13)
            },
            'splits': 0,
        }
        for column in ('1', '2', '3')
    }
    for side in ('vL', 'vR')
}
# Debug switch: helpers such as result_string() print diagnostics when True.
TESTING = False
# Lower-case strings counted as an affirmative answer
# (presumably matched against user input; the caller is not in this file).
YES = ['y', 'yes', 'yeet', 'please', 'yeah']
# Team abbreviation -> full club name. Historical abbreviations (ANA, FLA,
# MON, TBD) keep their period names. Multi-team markers (TOT, '2 Tms', ...)
# map to the string 'None', not the None object.
CLUB_LIST = {
    'ANA': 'Anaheim Angels',
    'ARI': 'Arizona Diamondbacks',
    'ATH': 'Athletics',
    'ATL': 'Atlanta Braves',
    'BAL': 'Baltimore Orioles',
    'BOS': 'Boston Red Sox',
    'CHC': 'Chicago Cubs',
    'CHW': 'Chicago White Sox',
    'CIN': 'Cincinnati Reds',
    'CLE': 'Cleveland Guardians',
    'COL': 'Colorado Rockies',
    'DET': 'Detroit Tigers',
    'HOU': 'Houston Astros',
    'KCR': 'Kansas City Royals',
    'LAA': 'Los Angeles Angels',
    'LAD': 'Los Angeles Dodgers',
    'FLA': 'Florida Marlins',
    'MIA': 'Miami Marlins',
    'MIL': 'Milwaukee Brewers',
    'MIN': 'Minnesota Twins',
    'MON': 'Montreal Expos',
    'NYM': 'New York Mets',
    'NYY': 'New York Yankees',
    'OAK': 'Oakland Athletics',
    'PHI': 'Philadelphia Phillies',
    'PIT': 'Pittsburgh Pirates',
    'SDP': 'San Diego Padres',
    'SEA': 'Seattle Mariners',
    'SFG': 'San Francisco Giants',
    'STL': 'St Louis Cardinals',
    'TBD': 'Tampa Bay Devil Rays',
    'TBR': 'Tampa Bay Rays',
    'TEX': 'Texas Rangers',
    'TOR': 'Toronto Blue Jays',
    'WSN': 'Washington Nationals',
    'TOT': 'None',
    '2 Tms': 'None',
    '2TM': 'None',
    '3 Tms': 'None',
    '3TM': 'None',
    '4 Tms': 'None',
    '4TM': 'None'
}
# Team abbreviation -> city-agnostic franchise name. Historical abbreviations
# resolve to the same franchise as their current counterpart (e.g. ANA and LAA
# both give 'Angels'). Multi-team markers map to the string 'None'.
FRANCHISE_LIST = {
    'ANA': 'Angels',
    'ARI': 'Diamondbacks',
    'ATH': 'Athletics',
    'ATL': 'Braves',
    'BAL': 'Orioles',
    'BOS': 'Red Sox',
    'CHC': 'Cubs',
    'CHW': 'White Sox',
    'CIN': 'Reds',
    'CLE': 'Guardians',
    'COL': 'Rockies',
    'DET': 'Tigers',
    'FLA': 'Marlins',
    'HOU': 'Astros',
    'KCR': 'Royals',
    'LAA': 'Angels',
    'LAD': 'Dodgers',
    'MIA': 'Marlins',
    'MIL': 'Brewers',
    'MIN': 'Twins',
    'MON': 'Nationals',  # Expos -> Nationals franchise
    'NYM': 'Mets',
    'NYY': 'Yankees',
    'OAK': 'Athletics',
    'PHI': 'Phillies',
    'PIT': 'Pirates',
    'SDP': 'Padres',
    'SEA': 'Mariners',
    'SFG': 'Giants',
    'STL': 'Cardinals',
    'TBD': 'Rays',
    'TBR': 'Rays',
    'TEX': 'Rangers',
    'TOR': 'Blue Jays',
    'WSN': 'Nationals',
    'TOT': 'None',
    '2 Tms': 'None',
    '2TM': 'None',
    '3 Tms': 'None',
    '3TM': 'None',
    '4 Tms': 'None',
    '4TM': 'None'
}
# PLAYER_DB_BACKUP = {
# 684007: {'key_mlbam': 684007, 'key_retro': 'imans001', 'key_fangraphs': 33829, 'key_bbref': 'imanash01'},
#
# }
def get_args(args):
    """Parse ``<key>=<value>`` strings into a dict.

    Args:
        args: Iterable of strings, each of the form ``<key>=<value>``.

    Returns:
        Dict mapping each key to its value (both strings).

    Raises:
        TypeError: If an argument contains no ``=``.
        ValueError: If the same key appears more than once.
    """
    logger.info(f'Process arguments: {args}')
    final_args = {}
    for x in args:
        if "=" not in x:
            raise TypeError(f'Invalid <key>=<value> argument: {x}')
        # Split on the first '=' only, so a value may itself contain '='
        # (e.g. a URL with a query string) instead of failing to unpack.
        key, value = x.split("=", 1)
        logger.info(f'key: {key} / value: {value}')
        if key in final_args:
            raise ValueError(f'Duplicate argument: {key}')
        final_args[key] = value
    return final_args
def should_update_player_description(
        cardset_name: str,
        player_cost: int,
        current_description: str,
        new_description: str
) -> bool:
    """Decide whether a player's description should be overwritten.

    Rules:
    - Promo cardsets (name contains "promo", case-insensitive): update only
      when the player is new, i.e. ``player_cost == NEW_PLAYER_COST``.
    - All other cardsets: update when the description differs, unless the
      current description contains "potm" (case-insensitive).

    Args:
        cardset_name: Name of the cardset (e.g., "2024 Promos", "2025 Season")
        player_cost: Current cost of the player
        current_description: Player's current description
        new_description: Proposed new description

    Returns:
        True if the description should be updated, False otherwise.

    Examples:
        ("2024 Promos", 99999, "", "May")            -> True  (new promo card)
        ("2024 Promos", 100, "April", "May")         -> False (existing promo card)
        ("2025 Season", 100, "2024", "2025")         -> True  (outdated description)
        ("2025 Season", 100, "April PotM", "2025")   -> False (PotM card)
    """
    if 'promo' in cardset_name.lower():
        # Promo sets: descriptions are only set for brand-new players
        return player_cost == NEW_PLAYER_COST

    if 'potm' in current_description.lower():
        # PotM descriptions are never replaced
        return False

    return current_description != new_description
def calculate_rarity_cost_adjustment(old_rarity: int, new_rarity: int, old_cost: int) -> int:
    """
    Calculate new cost when a player's rarity changes.

    Uses the RARITY_COST_ADJUSTMENTS lookup table to determine the cost adjustment
    and minimum cost when a player moves between rarity tiers.

    Args:
        old_rarity: Current rarity tier (1-5, 99)
        new_rarity: New rarity tier (1-5, 99)
        old_cost: Current player cost

    Returns:
        New cost after adjustment (with minimum enforced if applicable). The old
        cost is returned unchanged when the rarity is the same or when no
        adjustment is defined for the transition (a warning is logged).

    Examples:
        (1, 2, 1000) -> 460   Diamond to Gold: 1000 - 540 = 460, min 100
        (1, 5, 100)  -> 5     Diamond to Common: 100 - 800 = -700, min 5
        (5, 1, 50)   -> 850   Common to Diamond: 50 + 800 = 850, no min
        (3, 3, 100)  -> 100   Same rarity: cost unchanged
    """
    # No change if rarity stays the same
    if old_rarity == new_rarity:
        return old_cost

    # Look up the adjustment and minimum cost
    adjustment_data = RARITY_COST_ADJUSTMENTS.get((old_rarity, new_rarity))
    if adjustment_data is None:
        # No defined adjustment for this transition - return old cost.
        # The two rarities are separated by ' -> ' so that e.g. 1 and 99 are
        # not logged as the ambiguous "199".
        logger.warning(
            f"creation_helpers.calculate_rarity_cost_adjustment - No cost adjustment defined for "
            f"rarity change {old_rarity} -> {new_rarity}. Keeping cost at {old_cost}."
        )
        return old_cost

    cost_adjustment, min_cost = adjustment_data

    # Calculate new cost
    new_cost = old_cost + cost_adjustment

    # Apply minimum cost if specified
    if min_cost is not None:
        new_cost = max(new_cost, min_cost)
    return new_cost
async def pd_players_df(cardset_id: int):
    """Fetch the players of one cardset via db_get and return them as a DataFrame.

    When the query reports a count of zero, an empty DataFrame carrying the
    expected player columns is returned instead.
    """
    response = await db_get(
        'players',
        params=[('inc_dex', False), ('cardset_id', cardset_id), ('short_output', True)]
    )
    if response['count'] != 0:
        return pd.DataFrame(response['players'])

    # No players yet: hand back an empty frame with the usual columns
    empty_columns = (
        'player_id', 'p_name', 'cost', 'image', 'image2', 'mlbclub', 'franchise',
        'cardset', 'set_num', 'rarity', 'pos_1', 'pos_2', 'pos_3', 'pos_4', 'pos_5',
        'pos_6', 'pos_7', 'pos_8', 'headshot', 'vanity_card', 'strat_code', 'bbref_id',
        'fangr_id', 'description', 'quantity', 'mlbplayer',
    )
    return pd.DataFrame({column: [] for column in empty_columns})
async def pd_positions_df(cardset_id: int):
    """Fetch the position ratings of one cardset via db_get as a DataFrame.

    The 'player' column is renamed to 'player_id'.

    Raises:
        ValueError: If the query reports a count of zero.
    """
    query_params = [('cardset_id', cardset_id), ('short_output', True), ('sort', 'innings-desc')]
    response = await db_get('cardpositions', params=query_params)
    if response['count'] == 0:
        raise ValueError('No position ratings returned from Paper Dynasty API')

    positions = pd.DataFrame(response['positions'])
    return positions.rename(columns={'player': 'player_id'})
def get_pitching_peripherals(season: int):
    """Scrape the Baseball Reference standard-pitching table for one season.

    Args:
        season: Season year inserted into the Baseball Reference URL.

    Returns:
        DataFrame with one row per player, indexed by the first value of each
        scraped row. Column names come from the cells' ``data-stat`` attributes,
        with an extra ``key_bbref`` column placed before any cell that carries a
        ``data-append-csv`` attribute. Rows without a ``key_bbref`` are removed,
        and only the first row per ``key_bbref`` is kept.
    """
    url = f'https://www.baseball-reference.com/leagues/majors/{season}-standard-pitching.shtml'
    soup = BeautifulSoup(requests.get(url).text, 'html.parser')
    time.sleep(3)  # fixed pause after every request
    table = soup.find('table', {'id': 'players_standard_pitching'})
    headers = []
    data = []
    indeces = []
    for row in table.find_all('tr'):
        row_data = []
        col_names = []
        for cell in row.find_all('td'):
            # Only some cells carry the player id. Check for the attribute
            # explicitly rather than swallowing every exception raised while
            # reading it.
            player_id = cell.get('data-append-csv')
            if player_id is not None:
                row_data.append(player_id)
                if len(headers) == 0:
                    col_names.append('key_bbref')
            row_data.append(cell.text)
            if len(headers) == 0:
                col_names.append(cell['data-stat'])
        if len(row_data) > 0:
            data.append(row_data)
            indeces.append(row_data[0])
            # Column names are taken from the first row that has data cells
            if len(headers) == 0:
                headers.extend(col_names)
    # 'key_bbref == key_bbref' is False only for NaN, so this drops rows without an id
    pit_frame = pd.DataFrame(data, index=indeces, columns=headers).query('key_bbref == key_bbref')
    return pit_frame.drop_duplicates(subset=['key_bbref'], keep='first')
def mround(x, prec=2, base=.05):
    """Round ``x`` to the nearest multiple of ``base`` and return a float.

    The arithmetic is done with Decimal built from the string form of the
    inputs; ties go to the even multiple (Decimal's ``round`` behavior).
    ``prec`` is accepted but not used.
    """
    value = Decimal(str(x))
    step = Decimal(str(base))
    whole_steps = round(value / step)
    return float(whole_steps * step)
def chances_from_row(row_num):
    """Return the chance count for a 2d6 row given as a string ('2'-'12').

    Raises:
        ValueError: If ``row_num`` is not one of the eleven row strings.
    """
    paired_rows = (
        (('2', '12'), 1),
        (('3', '11'), 2),
        (('4', '10'), 3),
        (('5', '9'), 4),
        (('6', '8'), 5),
        (('7',), 6),
    )
    for rows, chances in paired_rows:
        if row_num in rows:
            return chances
    raise ValueError(f'No chance count found for row_num {row_num}')
def legal_splits(tot_chances):
    """List the 2d6 rows on which ``tot_chances`` can be expressed as a d20 split.

    ``tot_chances`` is first rounded with mround(). A row qualifies when the
    rounded value is a whole number of that row's increments
    (``D20_CHANCES[row]['inc']``) and at least one of the 20 increments is left
    over.

    Args:
        tot_chances: Number of chances to place.

    Returns:
        List of dicts in random order, each with:
            '2d6': the row number as an int
            'incs': increments used by ``tot_chances``
            'bad_chances': chances left on the row, rounded with mround()
            'bad_incs': increments left on the row (20 - incs)
    """
    # Divide exactly in Decimal. Float division such as 0.15 / .05 gives
    # 2.9999999999999996, which made valid splits look non-integral and
    # silently dropped them.
    rounded_chances = Decimal(str(mround(tot_chances)))
    legal_2d6 = []
    for roll, odds in D20_CHANCES.items():
        num_incs = rounded_chances / Decimal(str(odds['inc']))
        if num_incs != num_incs.to_integral_value():
            continue
        good_incs = int(num_incs)
        bad_incs = 20 - good_incs
        if bad_incs <= 0:
            continue
        legal_2d6.append({
            '2d6': int(roll),
            'incs': good_incs,
            'bad_chances': mround(odds['chances'] * (bad_incs / 20)),
            'bad_incs': bad_incs
        })
    random.shuffle(legal_2d6)
    return legal_2d6
def result_string(tba_data, row_num, split_min=None, split_max=None):
    """Build the display string for one card row, with or without a d20 split.

    Args:
        tba_data: Dict read for 'bold' and 'string', and for 'sm-string' when
            present (used instead of 'string' on split lines).
        row_num: Row number; rows below 10 get a bold-wrapped pad character in front.
        split_min: Low end of the split range; a falsy value means "no split".
        split_max: High end of the split range.

    Returns:
        Without a split: '<row>-<string>', wrapped in <b> tags when tba_data['bold'].
        With a split: the result text, padding spaces and the split range, with
        the row prefix shown only when split_min is 1.
    """
    bold1 = f'{"<b>" if tba_data["bold"] else ""}'
    bold2 = f'{"</b>" if tba_data["bold"] else ""}'
    row_string = f'{"<b> </b>" if int(row_num) < 10 else ""}{row_num}'
    if TESTING: print(f'adding {tba_data["string"]} to row {row_num} / '
                      f'split_min: {split_min} / split_max: {split_max}')
    # No splits; standard result
    if not split_min:
        return f'{bold1}{row_string}-{tba_data["string"]}{bold2}'
    # With splits
    # A split starting at 20 is printed as just the upper bound
    split_nums = f'{split_min if split_min != 20 else ""}{"-" if split_min != 20 else ""}{split_max}'
    data_string = tba_data["sm-string"] if "sm-string" in tba_data.keys() else tba_data["string"]
    # Base padding targets a width of 18; the per-result tweaks below are
    # presumably layout corrections for specific result strings.
    spaces = 18 - len(data_string) - len(split_nums)
    if 'WALK' in data_string:
        spaces -= 3
    elif 'SI**' in data_string:
        spaces += 1
    elif 'DO*' in data_string:
        spaces -= 1
    # NOTE(review): this condition repeats the previous one, so the branch is
    # unreachable. One of the two was probably meant to test a different
    # string (e.g. 'DO**') - confirm the intended layout before changing it.
    elif 'DO*' in data_string:
        spaces -= 2
    elif 'so' in data_string:
        spaces += 3
    elif 'gb' in data_string:
        spaces -= 3
    if TESTING: print(f'len(tba_data["string"]): {len(data_string)} / len(split_nums): {len(split_nums)} '
                      f'spaces: {spaces}')
    # split_min cannot be None here (handled by the early return above)
    if split_min == 1 or split_min is None:
        row_output = f'{row_string}-'
    else:
        row_output = '<b> </b>'
    if TESTING: print(f'row_output: {row_output}')
    return f'{bold1}{row_output}{data_string}{" " * spaces}{split_nums}{bold2}'
def result_data(tba_data, row_num, tba_data_bottom=None, top_split_max=None, fatigue=False):
    """Build the '2d6', 'splits' and 'result' cell strings for one card row.

    Args:
        tba_data: Dict for the (top) result; read for 'bold', 'string' and,
            on split rows, 'sm-string' when present.
        row_num: Row number, rendered through int().
        tba_data_bottom: Dict for the bottom result of a split row, or None
            for a single-result row.
        top_split_max: Highest d20 value of the top result on a split row.
        fatigue: Only read on single-result rows (see note in the body).

    Returns:
        Dict with keys '2d6', 'splits' and 'result'. On split rows the top and
        bottom parts are joined by a newline, each wrapped in <b> tags when its
        own 'bold' flag is set.
    """
    def _short_label(entry):
        return entry["sm-string"] if "sm-string" in entry.keys() else entry["string"]

    top_open = "<b>" if tba_data["bold"] else ""
    top_close = "</b>" if tba_data["bold"] else ""
    bottom_open = bottom_close = None
    if tba_data_bottom:
        bottom_open = "<b>" if tba_data_bottom["bold"] else ""
        bottom_close = "</b>" if tba_data_bottom["bold"] else ""

    roll_cell = f'{top_open}{int(row_num)}-{top_close}'

    if tba_data_bottom is None:
        # NOTE(review): both outcomes are the empty string as visible here, so
        # `fatigue` currently has no effect - confirm no marker was lost.
        fatigue_mark = "" if fatigue else ""
        return {
            '2d6': roll_cell,
            'splits': f'{top_open}{top_close}',
            'result': f'{top_open}{tba_data["string"]}{fatigue_mark}{top_close}',
        }

    # Top range collapses to "1" when it covers a single value; the bottom
    # range collapses to "20" when the top covers 1-19.
    top_range = '1' if top_split_max == 1 else f'1-{top_split_max}'
    bottom_range = f'{top_split_max + 1}' + ('' if top_split_max == 19 else '-20')
    return {
        '2d6': f'{roll_cell}\n',
        'splits': f'{top_open}{top_range}{top_close}\n{bottom_open}{bottom_range}{bottom_close}',
        'result': (
            f'{top_open}{_short_label(tba_data)}{top_close}\n'
            f'{bottom_open}{_short_label(tba_data_bottom)}{bottom_close}'
        ),
    }
def get_of(batter_hand, pitcher_hand, pull_side=True):
    """Pick the corner outfield spot ('lf' or 'rf') for a batted ball.

    Right-handed batters pull to 'lf' and left-handed batters to 'rf';
    ``pull_side=False`` gives the opposite field. Returns None for any other
    ``batter_hand`` value.
    """
    if batter_hand == 'S':
        # NOTE(review): for switch hitters the result depends only on the
        # pitcher's hand - pull_side is ignored ('rf' vs L, 'lf' otherwise).
        # Kept exactly as is; confirm this is intended.
        return 'rf' if pitcher_hand == 'L' else 'lf'

    if batter_hand == 'R':
        pull, opposite = 'lf', 'rf'
    elif batter_hand == 'L':
        pull, opposite = 'rf', 'lf'
    else:
        return None
    return pull if pull_side else opposite
def get_col(col_num):
    """Spell out a column number given as the string '1', '2' or '3'.

    Returns 'one', 'two' or 'three'; any other value yields None.
    """
    for digit, word in (('1', 'one'), ('2', 'two'), ('3', 'three')):
        if col_num == digit:
            return word
    return None
def write_to_csv(output_path, file_name: str, row_data: list):
    """Write ``row_data`` as CSV rows to ``<output_path>/<file_name>.csv``.

    The target gets a '.csv' suffix via ``with_suffix``, so any suffix already
    present in ``file_name`` is replaced. The file is written as UTF-8 and
    overwritten if it exists.
    """
    target = (output_path / f'{file_name}').with_suffix('.csv')
    with target.open(mode='w+', newline='', encoding='utf-8') as handle:
        csv.writer(handle).writerows(row_data)
def get_position_string(all_pos: list, inc_p: bool):
    """Build the comma-separated defensive ratings string for a player.

    Args:
        all_pos: Position records. Each is read for ``position`` and
            ``innings`` plus, depending on the position, ``range``, ``error``,
            ``arm``, ``overthrow``, ``pb`` and ``player_id``.
        inc_p: When False, records whose position contains 'P' are left out.

    Returns:
        'dh' for an empty list; otherwise the rating strings ordered by innings
        (most first) and joined with ', '. LF/CF/RF records are merged into one
        outfield entry that uses the arm and error of the 'OF' record and is
        only emitted when an 'OF' record is present.
    """
    if len(all_pos) == 0:
        return 'dh'

    of_arm = of_error = of_innings = None
    spot_range = {'lf': None, 'cf': None, 'rf': None}
    spot_innings = {'lf': 0, 'cf': 0, 'rf': 0}
    rated = []

    for pos in all_pos:
        if pos.position == 'OF':
            of_arm = f'{"+" if "-" not in pos.arm else ""}{pos.arm}'
            of_error = pos.error
            of_innings = pos.innings
        elif pos.position in ('CF', 'LF', 'RF'):
            spot = pos.position.lower()
            spot_range[spot] = pos.range
            spot_innings[spot] = pos.innings
        elif pos.position == 'C':
            arm_sign = '+' if int(pos.arm) >= 0 else ''
            rated.append((
                f'c-{pos.range}({arm_sign}{pos.arm}) e{pos.error} T-{pos.overthrow}(pb-{pos.pb})',
                pos.innings
            ))
        elif 'P' in pos.position and not inc_p:
            continue
        else:
            rated.append((f'{pos.position.lower()}-{pos.range}e{pos.error}', pos.innings))

    if of_arm is not None:
        logger.info(
            f'\n\nProcessing OF player ID {all_pos[0].player_id}\n'
            f'lf-{spot_range["lf"]} / cf-{spot_range["cf"]} / rf-{spot_range["rf"]}'
        )
        all_of = [
            (spot_range[spot], spot_innings[spot], spot)
            for spot in ('lf', 'cf', 'rf')
            if spot_innings[spot] > 0
        ]
        logger.info(f'all_of: {all_of}')
        if all_of:
            all_of.sort(key=lambda entry: entry[1], reverse=True)
            logger.info(f'sorted of: {all_of}')
            # Only the most-played spot shows the arm; all share the OF error
            primary, *backups = all_of
            pieces = [f'{primary[2]}-{primary[0]}({of_arm})e{of_error}']
            pieces.extend(f'{spare[2]}-{spare[0]}e{of_error}' for spare in backups)
            out_string = ', '.join(pieces)
            logger.info(f'of string: {out_string}')
            rated.append((out_string, of_innings))

    rated.sort(key=lambda entry: entry[1], reverse=True)
    return ', '.join(text for text, _ in rated)
def ordered_positions(all_pos: list) -> list:
    """Return a player's fielding positions ordered by innings, most first.

    'OF' and the pitcher positions ('P', 'SP', 'RP', 'CP') are left out.
    An empty input yields ['DH'].
    """
    if len(all_pos) == 0:
        return ['DH']

    excluded = ('OF', 'P', 'SP', 'RP', 'CP')
    fielded = [
        (entry.innings, entry.position)
        for entry in all_pos
        if entry.position not in excluded
    ]
    ranked = sorted(fielded, key=lambda pair: pair[0], reverse=True)
    return [position for _, position in ranked]
def ordered_pitching_positions(all_pos: list) -> list:
    """Return a player's 'SP' / 'RP' / 'CP' positions ordered by innings, most first."""
    pitching_roles = ('SP', 'RP', 'CP')
    pitched = [
        (entry.innings, entry.position)
        for entry in all_pos
        if entry.position in pitching_roles
    ]
    ranked = sorted(pitched, key=lambda pair: pair[0], reverse=True)
    return [position for _, position in ranked]
def defense_rg(all_pos: list) -> list:
    """Flatten a player's position records into a fixed 24-slot list.

    Slot layout (None where no record sets a value):
        0-8    range for P, C, 1B, 2B, 3B, SS, LF, CF, RF
        9-14   error for P, C, 1B, 2B, 3B, SS
        15-17  the 'OF' record's error, repeated three times
        18     the 'OF' record's arm
        19-21  catcher arm, overthrow, pb
        22-23  wild_pitch and balk from the matching PitcherData row

    The 'P' record is only used when every record's position contains 'P'
    and a PitcherData row exists for that player and cardset.
    """
    rg_data = [None] * 24
    all_pitcher = all('P' in line.position for line in all_pos)

    # position -> (range slot, error slot)
    infield_slots = {'1B': (2, 11), '2B': (3, 12), '3B': (4, 13), 'SS': (5, 14)}
    # position -> range slot
    outfield_slots = {'LF': 6, 'CF': 7, 'RF': 8}

    for line in all_pos:
        spot = line.position
        if spot == 'P' and all_pitcher:
            this_pit = PitcherData.get_or_none(PitcherData.player == line.player, PitcherData.cardset == line.cardset)
            if this_pit:
                rg_data[0] = line.range
                rg_data[9] = line.error
                rg_data[22] = this_pit.wild_pitch
                rg_data[23] = this_pit.balk
        elif spot == 'C':
            rg_data[1] = line.range
            rg_data[10] = line.error
            rg_data[19] = line.arm
            rg_data[20] = line.overthrow
            rg_data[21] = line.pb
        elif spot in infield_slots:
            range_slot, error_slot = infield_slots[spot]
            rg_data[range_slot] = line.range
            rg_data[error_slot] = line.error
        elif spot in outfield_slots:
            rg_data[outfield_slots[spot]] = line.range
        elif spot == 'OF':
            for error_slot in (15, 16, 17):
                rg_data[error_slot] = line.error
            rg_data[18] = line.arm
    return rg_data
def sanitize_chance_output(total_chances, min_chances=1.0, rounding=0.05):
    """Snap a chance total to a value that can be printed on a card.

    Args:
        total_chances: Raw number of chances.
        min_chances: Totals below this return 0.
        rounding: Base passed to mround(). With 1.0 the rounded value is
            returned directly.

    Returns:
        0 when below ``min_chances``; otherwise the rounded value moved up to
        the next entry of the exact-chances table (unchanged when it already is
        an entry, or when it exceeds the largest entry).

    Raises:
        ArithmeticError: If no table entry matches (not expected to happen).
    """
    if total_chances < min_chances:
        logger.debug(f'sanitize: {total_chances} is less than min_chances ({min_chances}); returning 0')
        return 0
    rounded_decimal = mround(total_chances, base=rounding)
    if rounding == 1.0:
        return rounded_decimal
    exact_chances = [
        Decimal('1.05'), Decimal('1.1'), Decimal('1.2'), Decimal('1.25'), Decimal('1.3'), Decimal('1.35'),
        Decimal('1.4'), Decimal('1.5'), Decimal('1.6'), Decimal('1.65'), Decimal('1.7'), Decimal('1.75'),
        Decimal('1.8'), Decimal('1.9'), Decimal('1.95'), Decimal('2.1'), Decimal('2.2'), Decimal('2.25'),
        Decimal('2.4'), Decimal('2.5'), Decimal('2.55'), Decimal('2.6'), Decimal('2.7'), Decimal('2.75'),
        Decimal('2.8'), Decimal('2.85'), Decimal('3.2'), Decimal('3.25'), Decimal('3.3'), Decimal('3.4'),
        Decimal('3.5'), Decimal('3.6'), Decimal('3.75'), Decimal('3.8'), Decimal('3.9'), Decimal('4.2'),
        Decimal('4.25'), Decimal('4.5'), Decimal('4.75'), Decimal('4.8'), Decimal('5.1'), Decimal('5.4'),
        Decimal('5.7')
    ]
    # Compare in Decimal, not float. Float-to-Decimal comparison is exact, and
    # the float 1.05 is slightly above Decimal('1.05'), so an already-exact
    # value was bumped to the next entry (1.05 came back as 1.1).
    rounded_exact = Decimal(str(rounded_decimal))
    if rounded_exact > exact_chances[-1]:
        return rounded_decimal
    for x in exact_chances:
        if rounded_exact <= x:
            return float(x)
    raise ArithmeticError(f'Attempt to sanitize {total_chances} rounded to {rounded_decimal} and could not be matched to an exact result')
def legacy_sanitize_chance_output(total_chances: float, min_chances: float = 1.0, rounding: float = 0.05):
    """Older chance-sanitizing routine; returns a Decimal rather than a float.

    Totals below ``min_chances`` are treated as 0. The value is floored to a
    multiple of ``rounding`` and quantized to two decimal places. Whole numbers
    and values above the largest table entry are returned as is; anything else
    is moved up to the next entry of the exact-chances table.
    """
    if total_chances >= min_chances:
        r_val = Decimal(total_chances)
    else:
        r_val = Decimal(0)
    logger.debug(f'r_val: {r_val}')

    # Floor to a whole number of rounding steps, then go back through float
    # before quantizing (kept exactly as the legacy computation does it)
    whole_steps = math.floor(r_val / Decimal(rounding))
    floored = whole_steps * Decimal(rounding)
    rounded_val = Decimal(float(floored)).quantize(Decimal("0.05"), ROUND_HALF_EVEN)
    if math.floor(rounded_val) == rounded_val:
        return rounded_val

    exact_chances = [
        Decimal(text) for text in (
            '1.05', '1.1', '1.2', '1.25', '1.3', '1.35',
            '1.4', '1.5', '1.6', '1.65', '1.7', '1.75',
            '1.8', '1.9', '1.95', '2.1', '2.2', '2.25',
            '2.4', '2.5', '2.55', '2.6', '2.7', '2.75',
            '2.8', '2.85', '3.2', '3.25', '3.3', '3.4',
            '3.5', '3.6', '3.75', '3.8', '3.9', '4.2',
            '4.25', '4.5', '4.75', '4.8', '5.1', '5.4',
            '5.7',
        )
    ]
    if rounded_val > exact_chances[-1]:
        return rounded_val
    return next((entry for entry in exact_chances if rounded_val <= entry), None)
def mlbteam_and_franchise(mlbam_playerid):
    """Look up a player's current MLB club and franchise from the MLB Stats API.

    Args:
        mlbam_playerid: MLB Advanced Media player id inserted into the request URL.

    Returns:
        Dict with keys 'mlbclub' (team name as returned by the API) and
        'franchise' (that name passed through normalize_franchise()). Both stay
        None when the request times out, the response is not HTTP 200, no
        player or current team is returned, or the team name is not one of the
        CLUB_LIST values. Each of those cases is logged as an error.
    """
    api_url = f'https://statsapi.mlb.com/api/v1/people/{mlbam_playerid}?hydrate=currentTeam'
    logger.info(f'Calling {api_url}')
    p_data = {'mlbclub': None, 'franchise': None}

    try:
        resp = requests.get(api_url, timeout=2)
    except requests.Timeout as e:
        # requests.Timeout covers connect timeouts as well as read timeouts;
        # catching only ReadTimeout let a slow connection crash the run.
        logger.error(
            f'mlbteam_and_franchise - {type(e).__name__} pull MLB team for MLB AM player ID {mlbam_playerid}'
        )
        return p_data

    if resp.status_code != 200:
        logger.error(f'mlbteam_and_franchise - Bad response from mlbstatsapi: {resp.status_code}')
        return p_data

    people = resp.json().get('people') or []
    if not people:
        logger.error(f'Could not set team for {mlbam_playerid}; no player record returned')
        return p_data

    data = people[0]
    logger.debug(f'data: {data}')
    # A player without a current team has no 'currentTeam' entry; treat that
    # like an unknown team instead of raising KeyError.
    team_name = (data.get('currentTeam') or {}).get('name')
    if team_name in CLUB_LIST.values():
        p_data['mlbclub'] = team_name
        p_data['franchise'] = normalize_franchise(team_name)
    else:
        logger.error(f'Could not set team for {mlbam_playerid}; received {team_name}')
    return p_data
def get_all_pybaseball_ids(player_id: list, key_type: str, is_custom: bool = False, full_name: str = None):
    """Resolve a player id to a row of cross-reference ids.

    Args:
        player_id: List whose first element is the id to look up.
        key_type: Id type passed to pybaseball ('fangraphs', 'bbref', ...).
        is_custom: When True, ids of 999942001 and above are resolved from the
            built-in custom player list instead of pybaseball.
        full_name: Optional "First Last" name used for a fuzzy name lookup when
            the id lookup finds nothing.

    Returns:
        A pandas Series of ids (custom players also get 'bat_hand'), or None
        when the id is banned or no match is found.
    """
    if is_custom:
        try:
            long_player_id = int(player_id[0])
            if long_player_id >= 999942001:
                backyard_players = [
                    'akhan',
                    'amkhan',
                    'adelvecchio',
                    'afrazier',
                    'awebber',
                    'bblackwood',
                    'drobinson',
                    'dpetrovich',
                    'esteele',
                    'ghasselhoff',
                    'jsmith',
                    'jgarcia',
                    'kkawaguchi',
                    'kphillips',
                    'keckman',
                    'lcrocket',
                    'llui',
                    'mluna',
                    'mdubois',
                    'mthomas',
                    'psanchez',
                    'pwheeler',
                    'rworthington',
                    'rjohnson',
                    'rdobbs',
                    'sdobbs',
                    'swebber',
                    'smorgan',
                    'tdelvecchio',
                    'vkawaguchi'
                ]
                return pd.Series(
                    {'key_bbref': backyard_players[long_player_id - 999942001],
                     'key_fangraphs': player_id[0],
                     'key_mlbam': player_id[0],
                     'bat_hand': 'L' if long_player_id in [
                         999942004, 999942007, 999942010, 999942018, 999942019, 999942020, 999942022
                     ] else 'R'
                     },
                )
        except Exception as e:
            # Best effort: on any failure (non-numeric id, id beyond the list)
            # fall through to the regular pybaseball lookup below
            logger.warning(e)

    banned_ids = {
        'fangraphs': [24816, 25782, 26548],
        'bbref': ['pagesan01', 'pagespe01', 'pagespe02', 'garcian02']
    }
    # Key types without a ban list (e.g. 'mlbam') have nothing banned; indexing
    # the dict directly raised KeyError for them before any lookup happened.
    if player_id[0] in banned_ids.get(key_type, ()):
        logger.info(f'Player ID {player_id[0]} is banned in the {key_type} list.')
        return None

    q = pb.playerid_reverse_lookup(player_id, key_type=key_type)
    if len(q.values) > 0:
        return_val = q.loc[0]
    # Check manual players
    elif full_name is not None:
        names = full_name.split(' ')
        q = pb.playerid_lookup(last=names[-1], first=' '.join(names[:-1]), fuzzy=True)
        if len(q.values) == 0:
            logger.error(f'get_all_pybaseball_ids - Could not find id {player_id} / {key_type} or '
                         f'{full_name} / full name in pybaseball')
            return None
        elif len(q.values) > 1:
            q = q.drop(q[q['mlb_played_last'] == ''].index)
            q = q.sort_values(by=['mlb_played_last'], ascending=False)
        # NOTE(review): .loc[0] selects the row LABELLED 0, not the first row
        # after sorting, so the sort above does not change which row is picked
        # (and a dropped label 0 raises KeyError). Left unchanged because
        # switching to .iloc[0] would change which player is matched - confirm
        # the intent.
        return_val = q.loc[0]
        return_val['key_fangraphs'] = player_id[0]
    else:
        logger.error(f'get_all_pybaseball_ids - Could not find id {player_id} / {key_type} in pybaseball')
        return_val = None
    return return_val
def sanitize_name(start_name: str) -> str:
    """Simplify a player name for matching.

    Replaces the listed accented letters with their plain form, removes
    periods and apostrophes, and turns hyphens into spaces.
    """
    substitutions = (
        ("é", "e"),
        ("á", "a"),
        (".", ""),
        ("Á", "A"),
        ("ñ", "n"),
        ("ó", "o"),
        ("í", "i"),
        ("ú", "u"),
        ("'", ""),
        ('-', ' '),
    )
    cleaned = start_name
    for old, new in substitutions:
        cleaned = cleaned.replace(old, new)
    return cleaned
def get_hand(df_data):
    """Derive a player's handedness from the marker at the end of their name.

    Args:
        df_data: Mapping or row with a 'Name' entry.

    Returns:
        'L' when the name ends with '*', 'S' when it ends with '#', and 'R'
        otherwise. 'R' is also returned (after logging an error) when the name
        is missing, empty or not indexable.
    """
    name = None
    try:
        name = df_data['Name']
        marker = name[-1]
        if marker == '*':
            return 'L'
        elif marker == '#':
            return 'S'
        else:
            return 'R'
    except Exception:
        # Log the name captured above rather than indexing df_data again:
        # when 'Name' itself is missing, a second lookup would raise from
        # inside this handler instead of falling back to 'R'.
        logger.error(f'Error in get_hand for {name}')
        return 'R'