"""Refractor rating boost service (Phase 2). Pure functions for computing boosted card ratings when a player reaches a new Refractor tier. The module-level 'db' variable is used by apply_tier_boost() for atomic writes; tests patch this reference to redirect writes to a shared-memory SQLite database. Batter boost: fixed +0.5 to four offensive columns per tier. Pitcher boost: 1.5 TB-budget priority algorithm per tier. """ from decimal import Decimal, ROUND_HALF_UP import hashlib import json import logging # Module-level db reference imported lazily so that this module can be # imported before app.db_engine is fully initialised (e.g. in tests that # patch DATABASE_TYPE before importing db_engine). # Tests that need to redirect DB writes should patch this attribute at module # level: `import app.services.refractor_boost as m; m.db = test_db`. db = None def _get_db(): """Return the module-level db, importing lazily on first use.""" global db if db is None: from app.db_engine import db as _db # noqa: PLC0415 db = _db return db logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Batter constants # --------------------------------------------------------------------------- BATTER_POSITIVE_DELTAS: dict[str, Decimal] = { "homerun": Decimal("0.50"), "double_pull": Decimal("0.50"), "single_one": Decimal("0.50"), "walk": Decimal("0.50"), } BATTER_NEGATIVE_DELTAS: dict[str, Decimal] = { "strikeout": Decimal("-1.50"), "groundout_a": Decimal("-0.50"), } # All 22 outcome columns that must sum to 108. BATTER_OUTCOME_COLUMNS: list[str] = [ "homerun", "bp_homerun", "triple", "double_three", "double_two", "double_pull", "single_two", "single_one", "single_center", "bp_single", "hbp", "walk", "strikeout", "lineout", "popout", "flyout_a", "flyout_bq", "flyout_lf_b", "flyout_rf_b", "groundout_a", "groundout_b", "groundout_c", ] # --------------------------------------------------------------------------- # Pitcher constants # --------------------------------------------------------------------------- # (column, tb_cost) pairs in priority order. PITCHER_PRIORITY: list[tuple[str, int]] = [ ("double_cf", 2), ("double_three", 2), ("double_two", 2), ("single_center", 1), ("single_two", 1), ("single_one", 1), ("bp_single", 1), ("walk", 1), ("homerun", 4), ("bp_homerun", 4), ("triple", 3), ("hbp", 1), ] # All 18 variable outcome columns (sum to 79; x-checks add 29 for 108 total). PITCHER_OUTCOME_COLUMNS: list[str] = [ "homerun", "bp_homerun", "triple", "double_three", "double_two", "double_cf", "single_two", "single_one", "single_center", "bp_single", "hbp", "walk", "strikeout", "flyout_lf_b", "flyout_cf_b", "flyout_rf_b", "groundout_a", "groundout_b", ] # Cross-check columns that are NEVER modified by the boost algorithm. PITCHER_XCHECK_COLUMNS: list[str] = [ "xcheck_p", "xcheck_c", "xcheck_1b", "xcheck_2b", "xcheck_3b", "xcheck_ss", "xcheck_lf", "xcheck_cf", "xcheck_rf", ] PITCHER_TB_BUDGET = Decimal("1.5") # --------------------------------------------------------------------------- # Batter boost # --------------------------------------------------------------------------- def apply_batter_boost(ratings_dict: dict) -> dict: """Apply one Refractor tier boost to a batter's outcome ratings. Adds fixed positive deltas to four offensive columns (homerun, double_pull, single_one, walk) while funding that increase by reducing strikeout and groundout_a. A 0-floor is enforced on negative columns: if the full reduction cannot be taken, positive deltas are scaled proportionally so that the invariant (22 columns sum to 108.0) is always preserved. Args: ratings_dict: Dict containing at minimum all 22 BATTER_OUTCOME_COLUMNS as numeric (int or float) values. Returns: New dict with the same keys as ratings_dict, with boosted outcome column values as floats. All other keys are passed through unchanged. Raises: KeyError: If any BATTER_OUTCOME_COLUMNS key is missing from ratings_dict. """ result = dict(ratings_dict) # Step 1 — convert the 22 outcome columns to Decimal for precise arithmetic. ratings: dict[str, Decimal] = { col: Decimal(str(result[col])) for col in BATTER_OUTCOME_COLUMNS } # Step 2 — apply negative deltas with 0-floor, tracking how much was # actually removed versus how much was requested. total_requested_reduction = Decimal("0") total_actually_reduced = Decimal("0") for col, delta in BATTER_NEGATIVE_DELTAS.items(): requested = abs(delta) total_requested_reduction += requested actual = min(requested, ratings[col]) ratings[col] -= actual total_actually_reduced += actual # Step 3 — check whether any truncation occurred. total_truncated = total_requested_reduction - total_actually_reduced # Step 4 — scale positive deltas if we couldn't take the full reduction. if total_truncated > Decimal("0"): # Positive additions must equal what was actually reduced so the # 108-sum is preserved. total_requested_addition = sum(BATTER_POSITIVE_DELTAS.values()) if total_requested_addition > Decimal("0"): scale = total_actually_reduced / total_requested_addition else: scale = Decimal("0") logger.warning( "refractor_boost: batter truncation occurred — " "requested_reduction=%.4f actually_reduced=%.4f scale=%.6f", float(total_requested_reduction), float(total_actually_reduced), float(scale), ) # Quantize the first N-1 deltas independently, then assign the last # delta as the remainder so the total addition equals # total_actually_reduced exactly (no quantize drift across 4 ops). pos_cols = list(BATTER_POSITIVE_DELTAS.keys()) positive_deltas = {} running_sum = Decimal("0") for col in pos_cols[:-1]: scaled = (BATTER_POSITIVE_DELTAS[col] * scale).quantize( Decimal("0.000001"), rounding=ROUND_HALF_UP ) positive_deltas[col] = scaled running_sum += scaled last_delta = total_actually_reduced - running_sum positive_deltas[pos_cols[-1]] = max(last_delta, Decimal("0")) else: positive_deltas = BATTER_POSITIVE_DELTAS # Step 5 — apply (possibly scaled) positive deltas. for col, delta in positive_deltas.items(): ratings[col] += delta # Write boosted values back as floats. for col in BATTER_OUTCOME_COLUMNS: result[col] = float(ratings[col]) return result # --------------------------------------------------------------------------- # Pitcher boost # --------------------------------------------------------------------------- def apply_pitcher_boost(ratings_dict: dict, tb_budget: float = 1.5) -> dict: """Apply one Refractor tier boost to a pitcher's outcome ratings. Iterates through PITCHER_PRIORITY in order, converting as many outcome chances as the TB budget allows into strikeouts. The TB cost per chance varies by outcome type (e.g. a double costs 2 TB budget units, a single costs 1). The strikeout column absorbs all converted chances. X-check columns (xcheck_p through xcheck_rf) are never touched. Args: ratings_dict: Dict containing at minimum all 18 PITCHER_OUTCOME_COLUMNS as numeric (int or float) values. tb_budget: Total base budget available for this boost tier. Defaults to 1.5 (PITCHER_TB_BUDGET). Returns: New dict with the same keys as ratings_dict, with boosted outcome column values as floats. All other keys are passed through unchanged. Raises: KeyError: If any PITCHER_OUTCOME_COLUMNS key is missing from ratings_dict. """ result = dict(ratings_dict) # Step 1 — convert outcome columns to Decimal, set remaining budget. ratings: dict[str, Decimal] = { col: Decimal(str(result[col])) for col in PITCHER_OUTCOME_COLUMNS } remaining = Decimal(str(tb_budget)) # Step 2 — iterate priority list, draining budget. for col, tb_cost in PITCHER_PRIORITY: if ratings[col] <= Decimal("0"): continue tb_cost_d = Decimal(str(tb_cost)) max_chances = remaining / tb_cost_d chances_to_take = min(ratings[col], max_chances) ratings[col] -= chances_to_take ratings["strikeout"] += chances_to_take remaining -= chances_to_take * tb_cost_d if remaining <= Decimal("0"): break # Step 3 — warn if budget was not fully spent (rare, indicates all priority # columns were already at zero). if remaining > Decimal("0"): logger.warning( "refractor_boost: pitcher TB budget not fully spent — " "remaining=%.4f of tb_budget=%.4f", float(remaining), tb_budget, ) # Write boosted values back as floats. for col in PITCHER_OUTCOME_COLUMNS: result[col] = float(ratings[col]) return result # --------------------------------------------------------------------------- # Variant hash # --------------------------------------------------------------------------- def compute_variant_hash( player_id: int, refractor_tier: int, cosmetics: list[str] | None = None, ) -> int: """Compute a stable, deterministic variant identifier for a boosted card. Hashes the combination of player_id, refractor_tier, and an optional sorted list of cosmetic identifiers to produce a compact integer suitable for use as a database variant key. The result is derived from the first 8 hex characters of a SHA-256 digest, so collisions are extremely unlikely in practice. variant=0 is reserved and will never be returned; any hash that resolves to 0 is remapped to 1. Args: player_id: Player primary key. refractor_tier: Refractor tier (0–4) the card has reached. cosmetics: Optional list of cosmetic tag strings (e.g. special art identifiers). Order is normalised — callers need not sort. Returns: A positive integer in the range [1, 2^32 - 1]. """ inputs = { "player_id": player_id, "refractor_tier": refractor_tier, "cosmetics": sorted(cosmetics or []), } raw = hashlib.sha256(json.dumps(inputs, sort_keys=True).encode()).hexdigest() result = int(raw[:8], 16) return result if result != 0 else 1 # variant=0 is reserved # --------------------------------------------------------------------------- # Display stat helpers # --------------------------------------------------------------------------- def compute_batter_display_stats(ratings: dict) -> dict: """Compute avg/obp/slg from batter outcome columns. Uses the same formulas as the BattingCardRatingsModel Pydantic validator so that variant card display stats are always consistent with the boosted chance values. All denominators are 108 (the full card chance total). Args: ratings: Dict containing at minimum all BATTER_OUTCOME_COLUMNS as numeric (int or float) values. Returns: Dict with keys 'avg', 'obp', 'slg' as floats. """ avg = ( ratings["homerun"] + ratings["bp_homerun"] / 2 + ratings["triple"] + ratings["double_three"] + ratings["double_two"] + ratings["double_pull"] + ratings["single_two"] + ratings["single_one"] + ratings["single_center"] + ratings["bp_single"] / 2 ) / 108 obp = (ratings["hbp"] + ratings["walk"]) / 108 + avg slg = ( ratings["homerun"] * 4 + ratings["bp_homerun"] * 2 + ratings["triple"] * 3 + ratings["double_three"] * 2 + ratings["double_two"] * 2 + ratings["double_pull"] * 2 + ratings["single_two"] + ratings["single_one"] + ratings["single_center"] + ratings["bp_single"] / 2 ) / 108 return {"avg": avg, "obp": obp, "slg": slg} def compute_pitcher_display_stats(ratings: dict) -> dict: """Compute avg/obp/slg from pitcher outcome columns. Uses the same formulas as the PitchingCardRatingsModel Pydantic validator so that variant card display stats are always consistent with the boosted chance values. All denominators are 108 (the full card chance total). Args: ratings: Dict containing at minimum all PITCHER_OUTCOME_COLUMNS as numeric (int or float) values. Returns: Dict with keys 'avg', 'obp', 'slg' as floats. """ avg = ( ratings["homerun"] + ratings["bp_homerun"] / 2 + ratings["triple"] + ratings["double_three"] + ratings["double_two"] + ratings["double_cf"] + ratings["single_two"] + ratings["single_one"] + ratings["single_center"] + ratings["bp_single"] / 2 ) / 108 obp = (ratings["hbp"] + ratings["walk"]) / 108 + avg slg = ( ratings["homerun"] * 4 + ratings["bp_homerun"] * 2 + ratings["triple"] * 3 + ratings["double_three"] * 2 + ratings["double_two"] * 2 + ratings["double_cf"] * 2 + ratings["single_two"] + ratings["single_one"] + ratings["single_center"] + ratings["bp_single"] / 2 ) / 108 return {"avg": avg, "obp": obp, "slg": slg} # --------------------------------------------------------------------------- # Orchestration: apply_tier_boost # --------------------------------------------------------------------------- def apply_tier_boost( player_id: int, team_id: int, new_tier: int, card_type: str, _batting_card_model=None, _batting_ratings_model=None, _pitching_card_model=None, _pitching_ratings_model=None, _card_model=None, _state_model=None, _audit_model=None, ) -> dict: """Create a boosted variant card for a tier-up. IMPORTANT: This function is the SOLE writer of current_tier on RefractorCardState when a tier-up occurs. The evaluator computes the new tier but does NOT write it — this function writes tier + variant + audit atomically inside a single db.atomic() block. If this function fails, the tier stays at its old value and will be retried on the next game evaluation. Orchestrates the full flow (card creation outside atomic; state mutations inside db.atomic()): 1. Determine source variant (variant=0 for T1, previous tier's hash for T2+) 2. Fetch source card and ratings rows 3. Apply boost formula (batter or pitcher) per vs_hand split 4. Assert 108-sum after boost for both batters and pitchers 5. Compute new variant hash 6. Create new card row with new variant (idempotency: skip if exists) 7. Create new ratings rows for both vs_hand splits (idempotency: skip if exists) 8. Inside db.atomic(): a. Write RefractorBoostAudit record b. Update RefractorCardState: current_tier, variant, fully_evolved c. Propagate variant to all Card rows for (player_id, team_id) Args: player_id: Player primary key. team_id: Team primary key. new_tier: The tier being reached (1-4). card_type: One of 'batter', 'sp', 'rp'. _batting_card_model: Injectable stub for BattingCard (used in tests). _batting_ratings_model: Injectable stub for BattingCardRatings. _pitching_card_model: Injectable stub for PitchingCard. _pitching_ratings_model: Injectable stub for PitchingCardRatings. _card_model: Injectable stub for Card. _state_model: Injectable stub for RefractorCardState. _audit_model: Injectable stub for RefractorBoostAudit. Returns: Dict with 'variant_created' (int) and 'boost_deltas' (per-split dict). Raises: ValueError: If the source card or ratings are missing, or if RefractorCardState is not found for (player_id, team_id). """ # Lazy model imports — same pattern as refractor_evaluator.py. if _batting_card_model is None: from app.db_engine import BattingCard as _batting_card_model # noqa: PLC0415 if _batting_ratings_model is None: from app.db_engine import BattingCardRatings as _batting_ratings_model # noqa: PLC0415 if _pitching_card_model is None: from app.db_engine import PitchingCard as _pitching_card_model # noqa: PLC0415 if _pitching_ratings_model is None: from app.db_engine import PitchingCardRatings as _pitching_ratings_model # noqa: PLC0415 if _card_model is None: from app.db_engine import Card as _card_model # noqa: PLC0415 if _state_model is None: from app.db_engine import RefractorCardState as _state_model # noqa: PLC0415 if _audit_model is None: from app.db_engine import RefractorBoostAudit as _audit_model # noqa: PLC0415 _db = _get_db() if card_type not in ("batter", "sp", "rp"): raise ValueError( f"Invalid card_type={card_type!r}; expected one of 'batter', 'sp', 'rp'" ) is_batter = card_type == "batter" CardModel = _batting_card_model if is_batter else _pitching_card_model RatingsModel = _batting_ratings_model if is_batter else _pitching_ratings_model fk_field = "battingcard" if is_batter else "pitchingcard" # 1. Determine source variant. if new_tier == 1: source_variant = 0 else: source_variant = compute_variant_hash(player_id, new_tier - 1) # 2. Fetch source card and ratings rows. source_card = CardModel.get_or_none( (CardModel.player == player_id) & (CardModel.variant == source_variant) ) if source_card is None: raise ValueError( f"No {'batting' if is_batter else 'pitching'}card for " f"player={player_id} variant={source_variant}" ) ratings_rows = list( RatingsModel.select().where(getattr(RatingsModel, fk_field) == source_card.id) ) if not ratings_rows: raise ValueError(f"No ratings rows for card_id={source_card.id}") # 3. Apply boost to each vs_hand split. boost_fn = apply_batter_boost if is_batter else apply_pitcher_boost outcome_cols = BATTER_OUTCOME_COLUMNS if is_batter else PITCHER_OUTCOME_COLUMNS boosted_splits: dict[str, dict] = {} for row in ratings_rows: # Build the ratings dict: outcome columns + (pitcher) x-check columns. ratings_dict: dict = {col: getattr(row, col) for col in outcome_cols} if not is_batter: for col in PITCHER_XCHECK_COLUMNS: ratings_dict[col] = getattr(row, col) boosted = boost_fn(ratings_dict) # 4. Assert 108-sum invariant after boost (Peewee bypasses Pydantic validators). if is_batter: boosted_sum = sum(boosted[col] for col in BATTER_OUTCOME_COLUMNS) else: boosted_sum = sum(boosted[col] for col in PITCHER_OUTCOME_COLUMNS) + sum( boosted[col] for col in PITCHER_XCHECK_COLUMNS ) if abs(boosted_sum - 108.0) >= 0.01: raise ValueError( f"108-sum invariant violated after boost for player={player_id} " f"vs_hand={row.vs_hand}: sum={boosted_sum:.6f}" ) boosted_splits[row.vs_hand] = boosted # 5. Compute new variant hash. new_variant = compute_variant_hash(player_id, new_tier) # 6. Create new card row (idempotency: skip if exists). existing_card = CardModel.get_or_none( (CardModel.player == player_id) & (CardModel.variant == new_variant) ) if existing_card is not None: new_card = existing_card else: if is_batter: clone_fields = [ "steal_low", "steal_high", "steal_auto", "steal_jump", "bunting", "hit_and_run", "running", "offense_col", "hand", ] else: clone_fields = [ "balk", "wild_pitch", "hold", "starter_rating", "relief_rating", "closer_rating", "batting", "offense_col", "hand", ] card_data: dict = { "player": player_id, "variant": new_variant, "image_url": None, # No rendered image for variant cards yet. } for fname in clone_fields: card_data[fname] = getattr(source_card, fname) new_card = CardModel.create(**card_data) # 7. Create new ratings rows for each split (idempotency: skip if exists). display_stats_fn = ( compute_batter_display_stats if is_batter else compute_pitcher_display_stats ) for vs_hand, boosted_ratings in boosted_splits.items(): existing_ratings = RatingsModel.get_or_none( (getattr(RatingsModel, fk_field) == new_card.id) & (RatingsModel.vs_hand == vs_hand) ) if existing_ratings is not None: continue # Idempotency: already written. ratings_data: dict = { fk_field: new_card.id, "vs_hand": vs_hand, } # Outcome columns (boosted values). ratings_data.update({col: boosted_ratings[col] for col in outcome_cols}) # X-check columns for pitchers (unchanged by boost, copy from boosted dict). if not is_batter: for col in PITCHER_XCHECK_COLUMNS: ratings_data[col] = boosted_ratings[col] # Direction rates for batters: copy from source row. if is_batter: source_row = next(r for r in ratings_rows if r.vs_hand == vs_hand) for rate_col in ("pull_rate", "center_rate", "slap_rate"): ratings_data[rate_col] = getattr(source_row, rate_col) # Compute fresh display stats from boosted chance columns. display_stats = display_stats_fn(boosted_ratings) ratings_data.update(display_stats) RatingsModel.create(**ratings_data) # 8. Load card state — needed for atomic state mutations. card_state = _state_model.get_or_none( (_state_model.player == player_id) & (_state_model.team == team_id) ) if card_state is None: raise ValueError( f"No refractor_card_state for player={player_id} team={team_id}" ) # All state mutations in a single atomic block. with _db.atomic(): # 8a. Write audit record. # boost_delta_json stores per-split boosted values including x-check columns # for pitchers so the full card can be reconstructed from the audit. audit_data: dict = { "card_state": card_state.id, "tier": new_tier, "variant_created": new_variant, "boost_delta_json": json.dumps(boosted_splits, default=str), } if is_batter: audit_data["battingcard"] = new_card.id else: audit_data["pitchingcard"] = new_card.id existing_audit = _audit_model.get_or_none( (_audit_model.card_state == card_state.id) & (_audit_model.tier == new_tier) ) if existing_audit is None: _audit_model.create(**audit_data) # 8b. Update RefractorCardState — this is the SOLE tier write on tier-up. card_state.current_tier = new_tier card_state.fully_evolved = new_tier >= 4 card_state.variant = new_variant card_state.save() # 8c. Propagate variant to all Card rows for (player_id, team_id). _card_model.update(variant=new_variant).where( (_card_model.player == player_id) & (_card_model.team == team_id) ).execute() logger.debug( "refractor_boost: applied T%s boost for player=%s team=%s variant=%s", new_tier, player_id, team_id, new_variant, ) return { "variant_created": new_variant, "boost_deltas": dict(boosted_splits), }