paper-dynasty-database/app/services/refractor_boost.py
Cal Corum 7f17c9b9f2 fix: address PR #177 review — move import os to top-level, add audit idempotency guard
- Move `import os` from inside evaluate_game() to module top-level imports
  (lazy imports are only for circular dependency avoidance)
- Add get_or_none idempotency guard before RefractorBoostAudit.create()
  inside db.atomic() to prevent IntegrityError on UNIQUE(card_state, tier)
  constraint in PostgreSQL when apply_tier_boost is called twice for the
  same tier
- Update atomicity test stub to provide card_state/tier attributes for
  the new Peewee expression in the idempotency guard

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 13:16:27 -05:00

699 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Refractor rating boost service (Phase 2).
Pure functions for computing boosted card ratings when a player
reaches a new Refractor tier. The module-level 'db' variable is used by
apply_tier_boost() for atomic writes; tests patch this reference to redirect
writes to a shared-memory SQLite database.
Batter boost: fixed +0.5 to four offensive columns per tier.
Pitcher boost: 1.5 TB-budget priority algorithm per tier.
"""
from decimal import Decimal, ROUND_HALF_UP
import hashlib
import json
import logging
# Module-level db reference imported lazily so that this module can be
# imported before app.db_engine is fully initialised (e.g. in tests that
# patch DATABASE_TYPE before importing db_engine).
# Tests that need to redirect DB writes should patch this attribute at module
# level: `import app.services.refractor_boost as m; m.db = test_db`.
db = None
def _get_db():
"""Return the module-level db, importing lazily on first use."""
global db
if db is None:
from app.db_engine import db as _db # noqa: PLC0415
db = _db
return db
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Batter constants
# ---------------------------------------------------------------------------
BATTER_POSITIVE_DELTAS: dict[str, Decimal] = {
"homerun": Decimal("0.50"),
"double_pull": Decimal("0.50"),
"single_one": Decimal("0.50"),
"walk": Decimal("0.50"),
}
BATTER_NEGATIVE_DELTAS: dict[str, Decimal] = {
"strikeout": Decimal("-1.50"),
"groundout_a": Decimal("-0.50"),
}
# All 22 outcome columns that must sum to 108.
BATTER_OUTCOME_COLUMNS: list[str] = [
"homerun",
"bp_homerun",
"triple",
"double_three",
"double_two",
"double_pull",
"single_two",
"single_one",
"single_center",
"bp_single",
"hbp",
"walk",
"strikeout",
"lineout",
"popout",
"flyout_a",
"flyout_bq",
"flyout_lf_b",
"flyout_rf_b",
"groundout_a",
"groundout_b",
"groundout_c",
]
# ---------------------------------------------------------------------------
# Pitcher constants
# ---------------------------------------------------------------------------
# (column, tb_cost) pairs in priority order.
PITCHER_PRIORITY: list[tuple[str, int]] = [
("double_cf", 2),
("double_three", 2),
("double_two", 2),
("single_center", 1),
("single_two", 1),
("single_one", 1),
("bp_single", 1),
("walk", 1),
("homerun", 4),
("bp_homerun", 4),
("triple", 3),
("hbp", 1),
]
# All 18 variable outcome columns (sum to 79; x-checks add 29 for 108 total).
PITCHER_OUTCOME_COLUMNS: list[str] = [
"homerun",
"bp_homerun",
"triple",
"double_three",
"double_two",
"double_cf",
"single_two",
"single_one",
"single_center",
"bp_single",
"hbp",
"walk",
"strikeout",
"flyout_lf_b",
"flyout_cf_b",
"flyout_rf_b",
"groundout_a",
"groundout_b",
]
# Cross-check columns that are NEVER modified by the boost algorithm.
PITCHER_XCHECK_COLUMNS: list[str] = [
"xcheck_p",
"xcheck_c",
"xcheck_1b",
"xcheck_2b",
"xcheck_3b",
"xcheck_ss",
"xcheck_lf",
"xcheck_cf",
"xcheck_rf",
]
PITCHER_TB_BUDGET = Decimal("1.5")
# ---------------------------------------------------------------------------
# Batter boost
# ---------------------------------------------------------------------------
def apply_batter_boost(ratings_dict: dict) -> dict:
"""Apply one Refractor tier boost to a batter's outcome ratings.
Adds fixed positive deltas to four offensive columns (homerun, double_pull,
single_one, walk) while funding that increase by reducing strikeout and
groundout_a. A 0-floor is enforced on negative columns: if the full
reduction cannot be taken, positive deltas are scaled proportionally so that
the invariant (22 columns sum to 108.0) is always preserved.
Args:
ratings_dict: Dict containing at minimum all 22 BATTER_OUTCOME_COLUMNS
as numeric (int or float) values.
Returns:
New dict with the same keys as ratings_dict, with boosted outcome column
values as floats. All other keys are passed through unchanged.
Raises:
KeyError: If any BATTER_OUTCOME_COLUMNS key is missing from ratings_dict.
"""
result = dict(ratings_dict)
# Step 1 — convert the 22 outcome columns to Decimal for precise arithmetic.
ratings: dict[str, Decimal] = {
col: Decimal(str(result[col])) for col in BATTER_OUTCOME_COLUMNS
}
# Step 2 — apply negative deltas with 0-floor, tracking how much was
# actually removed versus how much was requested.
total_requested_reduction = Decimal("0")
total_actually_reduced = Decimal("0")
for col, delta in BATTER_NEGATIVE_DELTAS.items():
requested = abs(delta)
total_requested_reduction += requested
actual = min(requested, ratings[col])
ratings[col] -= actual
total_actually_reduced += actual
# Step 3 — check whether any truncation occurred.
total_truncated = total_requested_reduction - total_actually_reduced
# Step 4 — scale positive deltas if we couldn't take the full reduction.
if total_truncated > Decimal("0"):
# Positive additions must equal what was actually reduced so the
# 108-sum is preserved.
total_requested_addition = sum(BATTER_POSITIVE_DELTAS.values())
if total_requested_addition > Decimal("0"):
scale = total_actually_reduced / total_requested_addition
else:
scale = Decimal("0")
logger.warning(
"refractor_boost: batter truncation occurred — "
"requested_reduction=%.4f actually_reduced=%.4f scale=%.6f",
float(total_requested_reduction),
float(total_actually_reduced),
float(scale),
)
# Quantize the first N-1 deltas independently, then assign the last
# delta as the remainder so the total addition equals
# total_actually_reduced exactly (no quantize drift across 4 ops).
pos_cols = list(BATTER_POSITIVE_DELTAS.keys())
positive_deltas = {}
running_sum = Decimal("0")
for col in pos_cols[:-1]:
scaled = (BATTER_POSITIVE_DELTAS[col] * scale).quantize(
Decimal("0.000001"), rounding=ROUND_HALF_UP
)
positive_deltas[col] = scaled
running_sum += scaled
last_delta = total_actually_reduced - running_sum
positive_deltas[pos_cols[-1]] = max(last_delta, Decimal("0"))
else:
positive_deltas = BATTER_POSITIVE_DELTAS
# Step 5 — apply (possibly scaled) positive deltas.
for col, delta in positive_deltas.items():
ratings[col] += delta
# Write boosted values back as floats.
for col in BATTER_OUTCOME_COLUMNS:
result[col] = float(ratings[col])
return result
# ---------------------------------------------------------------------------
# Pitcher boost
# ---------------------------------------------------------------------------
def apply_pitcher_boost(ratings_dict: dict, tb_budget: float = 1.5) -> dict:
"""Apply one Refractor tier boost to a pitcher's outcome ratings.
Iterates through PITCHER_PRIORITY in order, converting as many outcome
chances as the TB budget allows into strikeouts. The TB cost per chance
varies by outcome type (e.g. a double costs 2 TB budget units, a single
costs 1). The strikeout column absorbs all converted chances.
X-check columns (xcheck_p through xcheck_rf) are never touched.
Args:
ratings_dict: Dict containing at minimum all 18 PITCHER_OUTCOME_COLUMNS
as numeric (int or float) values.
tb_budget: Total base budget available for this boost tier. Defaults
to 1.5 (PITCHER_TB_BUDGET).
Returns:
New dict with the same keys as ratings_dict, with boosted outcome column
values as floats. All other keys are passed through unchanged.
Raises:
KeyError: If any PITCHER_OUTCOME_COLUMNS key is missing from ratings_dict.
"""
result = dict(ratings_dict)
# Step 1 — convert outcome columns to Decimal, set remaining budget.
ratings: dict[str, Decimal] = {
col: Decimal(str(result[col])) for col in PITCHER_OUTCOME_COLUMNS
}
remaining = Decimal(str(tb_budget))
# Step 2 — iterate priority list, draining budget.
for col, tb_cost in PITCHER_PRIORITY:
if ratings[col] <= Decimal("0"):
continue
tb_cost_d = Decimal(str(tb_cost))
max_chances = remaining / tb_cost_d
chances_to_take = min(ratings[col], max_chances)
ratings[col] -= chances_to_take
ratings["strikeout"] += chances_to_take
remaining -= chances_to_take * tb_cost_d
if remaining <= Decimal("0"):
break
# Step 3 — warn if budget was not fully spent (rare, indicates all priority
# columns were already at zero).
if remaining > Decimal("0"):
logger.warning(
"refractor_boost: pitcher TB budget not fully spent — "
"remaining=%.4f of tb_budget=%.4f",
float(remaining),
tb_budget,
)
# Write boosted values back as floats.
for col in PITCHER_OUTCOME_COLUMNS:
result[col] = float(ratings[col])
return result
# ---------------------------------------------------------------------------
# Variant hash
# ---------------------------------------------------------------------------
def compute_variant_hash(
player_id: int,
refractor_tier: int,
cosmetics: list[str] | None = None,
) -> int:
"""Compute a stable, deterministic variant identifier for a boosted card.
Hashes the combination of player_id, refractor_tier, and an optional sorted
list of cosmetic identifiers to produce a compact integer suitable for use
as a database variant key. The result is derived from the first 8 hex
characters of a SHA-256 digest, so collisions are extremely unlikely in
practice.
variant=0 is reserved and will never be returned; any hash that resolves to
0 is remapped to 1.
Args:
player_id: Player primary key.
refractor_tier: Refractor tier (04) the card has reached.
cosmetics: Optional list of cosmetic tag strings (e.g. special art
identifiers). Order is normalised — callers need not sort.
Returns:
A positive integer in the range [1, 2^32 - 1].
"""
inputs = {
"player_id": player_id,
"refractor_tier": refractor_tier,
"cosmetics": sorted(cosmetics or []),
}
raw = hashlib.sha256(json.dumps(inputs, sort_keys=True).encode()).hexdigest()
result = int(raw[:8], 16)
return result if result != 0 else 1 # variant=0 is reserved
# ---------------------------------------------------------------------------
# Display stat helpers
# ---------------------------------------------------------------------------
def compute_batter_display_stats(ratings: dict) -> dict:
"""Compute avg/obp/slg from batter outcome columns.
Uses the same formulas as the BattingCardRatingsModel Pydantic validator
so that variant card display stats are always consistent with the boosted
chance values. All denominators are 108 (the full card chance total).
Args:
ratings: Dict containing at minimum all BATTER_OUTCOME_COLUMNS as
numeric (int or float) values.
Returns:
Dict with keys 'avg', 'obp', 'slg' as floats.
"""
avg = (
ratings["homerun"]
+ ratings["bp_homerun"] / 2
+ ratings["triple"]
+ ratings["double_three"]
+ ratings["double_two"]
+ ratings["double_pull"]
+ ratings["single_two"]
+ ratings["single_one"]
+ ratings["single_center"]
+ ratings["bp_single"] / 2
) / 108
obp = (ratings["hbp"] + ratings["walk"]) / 108 + avg
slg = (
ratings["homerun"] * 4
+ ratings["bp_homerun"] * 2
+ ratings["triple"] * 3
+ ratings["double_three"] * 2
+ ratings["double_two"] * 2
+ ratings["double_pull"] * 2
+ ratings["single_two"]
+ ratings["single_one"]
+ ratings["single_center"]
+ ratings["bp_single"] / 2
) / 108
return {"avg": avg, "obp": obp, "slg": slg}
def compute_pitcher_display_stats(ratings: dict) -> dict:
"""Compute avg/obp/slg from pitcher outcome columns.
Uses the same formulas as the PitchingCardRatingsModel Pydantic validator
so that variant card display stats are always consistent with the boosted
chance values. All denominators are 108 (the full card chance total).
Args:
ratings: Dict containing at minimum all PITCHER_OUTCOME_COLUMNS as
numeric (int or float) values.
Returns:
Dict with keys 'avg', 'obp', 'slg' as floats.
"""
avg = (
ratings["homerun"]
+ ratings["bp_homerun"] / 2
+ ratings["triple"]
+ ratings["double_three"]
+ ratings["double_two"]
+ ratings["double_cf"]
+ ratings["single_two"]
+ ratings["single_one"]
+ ratings["single_center"]
+ ratings["bp_single"] / 2
) / 108
obp = (ratings["hbp"] + ratings["walk"]) / 108 + avg
slg = (
ratings["homerun"] * 4
+ ratings["bp_homerun"] * 2
+ ratings["triple"] * 3
+ ratings["double_three"] * 2
+ ratings["double_two"] * 2
+ ratings["double_cf"] * 2
+ ratings["single_two"]
+ ratings["single_one"]
+ ratings["single_center"]
+ ratings["bp_single"] / 2
) / 108
return {"avg": avg, "obp": obp, "slg": slg}
# ---------------------------------------------------------------------------
# Orchestration: apply_tier_boost
# ---------------------------------------------------------------------------
def apply_tier_boost(
player_id: int,
team_id: int,
new_tier: int,
card_type: str,
_batting_card_model=None,
_batting_ratings_model=None,
_pitching_card_model=None,
_pitching_ratings_model=None,
_card_model=None,
_state_model=None,
_audit_model=None,
) -> dict:
"""Create a boosted variant card for a tier-up.
IMPORTANT: This function is the SOLE writer of current_tier on
RefractorCardState when a tier-up occurs. The evaluator computes
the new tier but does NOT write it — this function writes tier +
variant + audit atomically inside a single db.atomic() block.
If this function fails, the tier stays at its old value and will
be retried on the next game evaluation.
Orchestrates the full flow (card creation outside atomic; state
mutations inside db.atomic()):
1. Determine source variant (variant=0 for T1, previous tier's hash for T2+)
2. Fetch source card and ratings rows
3. Apply boost formula (batter or pitcher) per vs_hand split
4. Assert 108-sum after boost for both batters and pitchers
5. Compute new variant hash
6. Create new card row with new variant (idempotency: skip if exists)
7. Create new ratings rows for both vs_hand splits (idempotency: skip if exists)
8. Inside db.atomic():
a. Write RefractorBoostAudit record
b. Update RefractorCardState: current_tier, variant, fully_evolved
c. Propagate variant to all Card rows for (player_id, team_id)
Args:
player_id: Player primary key.
team_id: Team primary key.
new_tier: The tier being reached (1-4).
card_type: One of 'batter', 'sp', 'rp'.
_batting_card_model: Injectable stub for BattingCard (used in tests).
_batting_ratings_model: Injectable stub for BattingCardRatings.
_pitching_card_model: Injectable stub for PitchingCard.
_pitching_ratings_model: Injectable stub for PitchingCardRatings.
_card_model: Injectable stub for Card.
_state_model: Injectable stub for RefractorCardState.
_audit_model: Injectable stub for RefractorBoostAudit.
Returns:
Dict with 'variant_created' (int) and 'boost_deltas' (per-split dict).
Raises:
ValueError: If the source card or ratings are missing, or if
RefractorCardState is not found for (player_id, team_id).
"""
# Lazy model imports — same pattern as refractor_evaluator.py.
if _batting_card_model is None:
from app.db_engine import BattingCard as _batting_card_model # noqa: PLC0415
if _batting_ratings_model is None:
from app.db_engine import BattingCardRatings as _batting_ratings_model # noqa: PLC0415
if _pitching_card_model is None:
from app.db_engine import PitchingCard as _pitching_card_model # noqa: PLC0415
if _pitching_ratings_model is None:
from app.db_engine import PitchingCardRatings as _pitching_ratings_model # noqa: PLC0415
if _card_model is None:
from app.db_engine import Card as _card_model # noqa: PLC0415
if _state_model is None:
from app.db_engine import RefractorCardState as _state_model # noqa: PLC0415
if _audit_model is None:
from app.db_engine import RefractorBoostAudit as _audit_model # noqa: PLC0415
_db = _get_db()
if card_type not in ("batter", "sp", "rp"):
raise ValueError(
f"Invalid card_type={card_type!r}; expected one of 'batter', 'sp', 'rp'"
)
is_batter = card_type == "batter"
CardModel = _batting_card_model if is_batter else _pitching_card_model
RatingsModel = _batting_ratings_model if is_batter else _pitching_ratings_model
fk_field = "battingcard" if is_batter else "pitchingcard"
# 1. Determine source variant.
if new_tier == 1:
source_variant = 0
else:
source_variant = compute_variant_hash(player_id, new_tier - 1)
# 2. Fetch source card and ratings rows.
source_card = CardModel.get_or_none(
(CardModel.player == player_id) & (CardModel.variant == source_variant)
)
if source_card is None:
raise ValueError(
f"No {'batting' if is_batter else 'pitching'}card for "
f"player={player_id} variant={source_variant}"
)
ratings_rows = list(
RatingsModel.select().where(getattr(RatingsModel, fk_field) == source_card.id)
)
if not ratings_rows:
raise ValueError(f"No ratings rows for card_id={source_card.id}")
# 3. Apply boost to each vs_hand split.
boost_fn = apply_batter_boost if is_batter else apply_pitcher_boost
outcome_cols = BATTER_OUTCOME_COLUMNS if is_batter else PITCHER_OUTCOME_COLUMNS
boosted_splits: dict[str, dict] = {}
for row in ratings_rows:
# Build the ratings dict: outcome columns + (pitcher) x-check columns.
ratings_dict: dict = {col: getattr(row, col) for col in outcome_cols}
if not is_batter:
for col in PITCHER_XCHECK_COLUMNS:
ratings_dict[col] = getattr(row, col)
boosted = boost_fn(ratings_dict)
# 4. Assert 108-sum invariant after boost (Peewee bypasses Pydantic validators).
if is_batter:
boosted_sum = sum(boosted[col] for col in BATTER_OUTCOME_COLUMNS)
else:
boosted_sum = sum(boosted[col] for col in PITCHER_OUTCOME_COLUMNS) + sum(
boosted[col] for col in PITCHER_XCHECK_COLUMNS
)
if abs(boosted_sum - 108.0) >= 0.01:
raise ValueError(
f"108-sum invariant violated after boost for player={player_id} "
f"vs_hand={row.vs_hand}: sum={boosted_sum:.6f}"
)
boosted_splits[row.vs_hand] = boosted
# 5. Compute new variant hash.
new_variant = compute_variant_hash(player_id, new_tier)
# 6. Create new card row (idempotency: skip if exists).
existing_card = CardModel.get_or_none(
(CardModel.player == player_id) & (CardModel.variant == new_variant)
)
if existing_card is not None:
new_card = existing_card
else:
if is_batter:
clone_fields = [
"steal_low",
"steal_high",
"steal_auto",
"steal_jump",
"bunting",
"hit_and_run",
"running",
"offense_col",
"hand",
]
else:
clone_fields = [
"balk",
"wild_pitch",
"hold",
"starter_rating",
"relief_rating",
"closer_rating",
"batting",
"offense_col",
"hand",
]
card_data: dict = {
"player": player_id,
"variant": new_variant,
"image_url": None, # No rendered image for variant cards yet.
}
for fname in clone_fields:
card_data[fname] = getattr(source_card, fname)
new_card = CardModel.create(**card_data)
# 7. Create new ratings rows for each split (idempotency: skip if exists).
display_stats_fn = (
compute_batter_display_stats if is_batter else compute_pitcher_display_stats
)
for vs_hand, boosted_ratings in boosted_splits.items():
existing_ratings = RatingsModel.get_or_none(
(getattr(RatingsModel, fk_field) == new_card.id)
& (RatingsModel.vs_hand == vs_hand)
)
if existing_ratings is not None:
continue # Idempotency: already written.
ratings_data: dict = {
fk_field: new_card.id,
"vs_hand": vs_hand,
}
# Outcome columns (boosted values).
ratings_data.update({col: boosted_ratings[col] for col in outcome_cols})
# X-check columns for pitchers (unchanged by boost, copy from boosted dict).
if not is_batter:
for col in PITCHER_XCHECK_COLUMNS:
ratings_data[col] = boosted_ratings[col]
# Direction rates for batters: copy from source row.
if is_batter:
source_row = next(r for r in ratings_rows if r.vs_hand == vs_hand)
for rate_col in ("pull_rate", "center_rate", "slap_rate"):
ratings_data[rate_col] = getattr(source_row, rate_col)
# Compute fresh display stats from boosted chance columns.
display_stats = display_stats_fn(boosted_ratings)
ratings_data.update(display_stats)
RatingsModel.create(**ratings_data)
# 8. Load card state — needed for atomic state mutations.
card_state = _state_model.get_or_none(
(_state_model.player == player_id) & (_state_model.team == team_id)
)
if card_state is None:
raise ValueError(
f"No refractor_card_state for player={player_id} team={team_id}"
)
# All state mutations in a single atomic block.
with _db.atomic():
# 8a. Write audit record.
# boost_delta_json stores per-split boosted values including x-check columns
# for pitchers so the full card can be reconstructed from the audit.
audit_data: dict = {
"card_state": card_state.id,
"tier": new_tier,
"variant_created": new_variant,
"boost_delta_json": json.dumps(boosted_splits, default=str),
}
if is_batter:
audit_data["battingcard"] = new_card.id
else:
audit_data["pitchingcard"] = new_card.id
existing_audit = _audit_model.get_or_none(
(_audit_model.card_state == card_state.id) & (_audit_model.tier == new_tier)
)
if existing_audit is None:
_audit_model.create(**audit_data)
# 8b. Update RefractorCardState — this is the SOLE tier write on tier-up.
card_state.current_tier = new_tier
card_state.fully_evolved = new_tier >= 4
card_state.variant = new_variant
card_state.save()
# 8c. Propagate variant to all Card rows for (player_id, team_id).
_card_model.update(variant=new_variant).where(
(_card_model.player == player_id) & (_card_model.team == team_id)
).execute()
logger.debug(
"refractor_boost: applied T%s boost for player=%s team=%s variant=%s",
new_tier,
player_id,
team_id,
new_variant,
)
return {
"variant_created": new_variant,
"boost_deltas": dict(boosted_splits),
}