Merge pull request 'chore/cleanup-legacy-files' (#5) from chore/cleanup-legacy-files into main
All checks were successful
Build Docker Image / build (push) Successful in 1m9s

Reviewed-on: #5
This commit is contained in:
cal 2026-02-05 19:33:31 +00:00
commit 86dbe871a1
15 changed files with 4 additions and 10448 deletions

2
=2.9.0
View File

@ -1,2 +0,0 @@
Defaulting to user installation because normal site-packages is not writeable
Requirement already satisfied: psycopg2-binary in /home/cal/.local/lib/python3.13/site-packages (2.9.10)

View File

@ -1,350 +0,0 @@
# Data Consistency Analysis Report
## Major Domo Database - Refactored Code vs Production API
Generated: 2026-02-03
---
## Executive Summary
| Status | Critical Issues | Medium Issues | Low Issues |
|--------|----------------|---------------|------------|
| 🟡 MEDIUM | 2 | 4 | 6 |
**Note:** Previous analysis incorrectly compared against Paper Dynasty database.
This analysis correctly compares against **Major Domo** database schema.
---
## Critical Issues (Must Fix)
### 1. ROUTER CALLS SERVICE METHODS INCORRECTLY
**File:** `app/routers_v3/players.py:29`
**Problem:** Router calls service methods as static, but they're instance methods.
```python
# Router calls:
result = PlayerService.get_players(season=10, ...)
# Service defines:
def get_players(self, season, team_id, ...):
```
**Impact:** Runtime error - will crash when endpoint is called.
**Fix Required:** Router must instantiate service or make methods `@classmethod`.
---
### 2. WRONG IMPORT PATH
**File:** `app/routers_v3/players.py:10` and `app/routers_v3/teams.py:10`
**Problem:** Imports `from .base import BaseService` but file doesn't exist at that path.
```python
from .base import BaseService # ❌ File does not exist in routers_v3/!
# Should be:
from ..services.base import BaseService
```
**Impact:** Import error on startup.
**Fix Required:** Correct import path to `..services.base`.
---
## Medium Issues (Should Fix)
### 3. TEAM RESPONSE FIELDS - Major Domo Schema
**Major Domo Team Model (`db_engine.py`):**
```python
class Team(BaseModel):
abbrev = CharField()
sname = CharField()
lname = CharField()
manager_legacy = CharField(null=True)
division_legacy = CharField(null=True)
gmid = CharField(max_length=20, null=True) # Discord snowflake
gmid2 = CharField(max_length=20, null=True) # Discord snowflake
manager1 = ForeignKeyField(Manager, null=True)
manager2 = ForeignKeyField(Manager, null=True)
division = ForeignKeyField(Division, null=True)
mascot = CharField(null=True)
stadium = CharField(null=True)
gsheet = CharField(null=True)
thumbnail = CharField(null=True)
color = CharField(null=True)
dice_color = CharField(null=True)
season = IntegerField()
auto_draft = BooleanField(null=True)
salary_cap = FloatField(null=True)
```
**Refactored Service (`team_service.py`) Returns:**
```python
model_to_dict(t, recurse=not short_output)
```
**Field Comparison:**
| Field | DB Model | Refactored | Notes |
|-------|----------|------------|-------|
| `id` | ✅ | ✅ | Auto-increment PK |
| `abbrev` | ✅ | ✅ | Team abbreviation |
| `sname` | ✅ | ✅ | Short name |
| `lname` | ✅ | ✅ | Full name |
| `gmid` | ✅ | ✅ | Discord GM ID |
| `gmid2` | ✅ | ✅ | Secondary GM ID |
| `manager1` | ✅ (FK) | ✅ | FK object or ID |
| `manager2` | ✅ (FK) | ✅ | FK object or ID |
| `division` | ✅ (FK) | ✅ | FK object or ID |
| `season` | ✅ | ✅ | Season number |
| `mascot` | ✅ | ✅ | Team mascot |
| `stadium` | ✅ | ✅ | Stadium name |
| `gsheet` | ✅ | ✅ | Google Sheet URL |
| `thumbnail` | ✅ | ✅ | Logo URL |
| `color` | ✅ | ✅ | Team color hex |
| `dice_color` | ✅ | ✅ | Dice color hex |
**Status:** ✅ **Fields match the Major Domo schema correctly.**
---
### 4. PLAYER RESPONSE FIELDS - Major Domo Schema
**Major Domo Player Model (`db_engine.py`):**
```python
class Player(BaseModel):
name = CharField(max_length=500)
wara = FloatField()
image = CharField(max_length=1000)
image2 = CharField(max_length=1000, null=True)
team = ForeignKeyField(Team)
season = IntegerField()
pitcher_injury = IntegerField(null=True)
pos_1 = CharField(max_length=5)
pos_2 = CharField(max_length=5, null=True)
pos_3 = CharField(max_length=5, null=True)
pos_4 = CharField(max_length=5, null=True)
pos_5 = CharField(max_length=5, null=True)
pos_6 = CharField(max_length=5, null=True)
pos_7 = CharField(max_length=5, null=True)
pos_8 = CharField(max_length=5, null=True)
last_game = CharField(max_length=20, null=True)
last_game2 = CharField(max_length=20, null=True)
il_return = CharField(max_length=20, null=True)
demotion_week = IntegerField(null=True)
headshot = CharField(max_length=500, null=True)
vanity_card = CharField(max_length=500, null=True)
strat_code = CharField(max_length=100, null=True)
bbref_id = CharField(max_length=50, null=True)
injury_rating = CharField(max_length=50, null=True)
sbaplayer = ForeignKeyField(SbaPlayer, null=True)
```
**Refactored Service Returns:**
```python
model_to_dict(player, recurse=not short_output)
```
**Field Comparison:**
| Field | DB Model | Refactored | Notes |
|-------|----------|------------|-------|
| `id` | ✅ | ✅ | Auto-increment PK |
| `name` | ✅ | ✅ | Player name |
| `wara` | ✅ | ✅ | WAR above replacement |
| `image` | ✅ | ✅ | Player image URL |
| `image2` | ✅ | ✅ | Secondary image URL |
| `team` | ✅ (FK) | ✅ | FK to Team |
| `season` | ✅ | ✅ | Season number |
| `pos_1` | ✅ | ✅ | Primary position |
| `pos_2` - `pos_8` | ✅ | ✅ | Additional positions |
| `il_return` | ✅ | ✅ | Injury list return week |
| `demotion_week` | ✅ | ✅ | Demotion week |
| `strat_code` | ✅ | ✅ | Stratification code |
| `bbref_id` | ✅ | ✅ | Baseball Reference ID |
| `injury_rating` | ✅ | ✅ | Injury rating |
| `sbaplayer` | ✅ (FK) | ✅ | FK to SbaPlayer |
**Status:** ✅ **Fields match the Major Domo schema correctly.**
---
### 5. FILTER PARAMETERS - Compatibility Check
**Router Parameters → Service Parameters:**
| Router | Service | Match? |
|--------|---------|--------|
| `season` | `season` | ✅ |
| `team_id` | `team_id` | ✅ |
| `pos` | `pos` | ✅ |
| `strat_code` | `strat_code` | ✅ |
| `name` | `name` | ✅ |
| `is_injured` | `is_injured` | ✅ |
| `sort` | `sort` | ✅ |
| `short_output` | `short_output` | ✅ |
| `csv` | `as_csv` | ⚠️ Different name |
**Issue:** Router uses `csv` parameter but service expects `as_csv`.
**Status:** ⚠️ **Parameter name mismatch - may cause CSV export to fail.**
---
### 6. SEARCH ENDPOINT STRUCTURE
**Router (`players.py:47`):**
```python
return PlayerService.search_players(
query_str=q,
season=season,
limit=limit,
short_output=short_output
)
```
**Service (`player_service.py`):**
```python
def search_players(self, query_str, season, limit, short_output):
return {
"count": len(results),
"total_matches": len(exact_matches + partial_matches),
"all_seasons": search_all_seasons,
"players": results
}
```
**Status:** ✅ **Structure matches.**
---
### 7. ROUTER PARAMETER HANDLING
**Issue:** Router converts empty lists to `None`:
```python
team_id=team_id if team_id else None,
```
**Expected Behavior:** Empty list `[]` should be treated as "no filter" (return all), which is handled correctly.
**Status:** ✅ **Correct.**
---
## Low Issues (Nice to Fix)
### 8. AUTHENTICATION
**Router:** Uses `oauth2_scheme` dependency
**Service:** Uses `require_auth(token)` method
**Status:** ✅ **Compatible.**
---
### 9. ERROR RESPONSES
**Router:** Returns FastAPI HTTPException
**Service:** Raises HTTPException with status_code
**Example:**
```json
{"detail": "Player ID X not found"}
```
**Status:** ✅ **Compatible.**
---
### 10. CACHE KEY PATTERNS
**PlayerService uses:**
```python
cache_patterns = [
"players*",
"players-search*",
"player*",
"team-roster*"
]
```
**Status:** ⚠️ **Should be validated against actual Redis configuration.**
---
### 11. CSV OUTPUT FORMAT
**Service uses:** `query_to_csv(query, exclude=[...])`
**Status:** ⚠️ **Headers depend on `model_to_dict` output - should be verified.**
---
### 12. SHORT OUTPUT BEHAVIOR
**Router:** `short_output=False` by default
**Service:** `model_to_dict(player, recurse=not short_output)`
- `short_output=False``recurse=True` → Includes FK objects (team, etc.)
- `short_output=True``recurse=False` → Only direct fields
**Status:** ✅ **Logical and correct.**
---
## Comparison Summary
| Component | Status | Notes |
|-----------|--------|-------|
| Team fields | ✅ Match | Major Domo schema correctly |
| Player fields | ✅ Match | Major Domo schema correctly |
| Filter parameters | ⚠️ Partial | `csv` vs `as_csv` mismatch |
| Search structure | ✅ Match | count, total_matches, all_seasons, players |
| Authentication | ✅ Match | oauth2_scheme compatible |
| Error format | ✅ Match | HTTPException compatible |
| Service calls | ❌ Broken | Instance vs static method issue |
| Import paths | ❌ Broken | Wrong path `from .base` |
---
## Recommendations
### Immediate (Must Do)
1. **Fix router-service method call mismatch**
- Change service methods to `@classmethod` OR
- Router instantiates service with dependencies
2. **Fix import paths**
- `from .base``from ..services.base`
### Before Release
3. **Standardize CSV parameter name**
- Change service parameter from `as_csv` to `csv`
- Or document the difference
4. **Add integration tests**
- Test against actual Major Domo database
- Verify field output matches expected schema
---
## Conclusion
The refactored code has **2 critical issues** that will cause startup/runtime failures:
1. Router calls instance methods as static
2. Wrong import path
Once fixed, the **field schemas are correct** for the Major Domo database. The service layer properly implements the Major Domo models.
**Recommendation:** Fix the critical issues and proceed with integration testing.

View File

@ -1,53 +0,0 @@
# Use specific version instead of 'latest' for reproducible builds
FROM tiangolo/uvicorn-gunicorn-fastapi:python3.11-slim
# Set environment variables for Python optimization
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONPATH=/app
ENV PIP_NO_CACHE_DIR=1
ENV PIP_DISABLE_PIP_VERSION_CHECK=1
# Create non-root user for security
RUN groupadd -r sba && useradd -r -g sba sba
# Set working directory
WORKDIR /usr/src/app
# Install system dependencies in a single layer
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc \
libpq-dev \
curl \
&& rm -rf /var/lib/apt/lists/* \
&& apt-get clean
# Copy requirements first for better layer caching
COPY requirements.txt ./
# Install Python dependencies with optimizations
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY ./app /app/app
# Create necessary directories and set permissions
RUN mkdir -p /usr/src/app/storage /usr/src/app/logs && \
chown -R sba:sba /usr/src/app && \
chmod -R 755 /usr/src/app
# Health check for container monitoring
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:80/api/v3/current || exit 1
# Switch to non-root user
USER sba
# Expose port
EXPOSE 80
# Add labels for metadata
LABEL maintainer="SBA League Management"
LABEL version="1.0"
LABEL description="Major Domo Database API"

View File

@ -1,231 +0,0 @@
# Major Domo Database - Dependency Injection Refactor
**Branch:** `jarvis/testability`
**Status:** Paused for E2E Testing
**Last Updated:** 2026-02-03
---
## Executive Summary
Refactored `PlayerService` and `TeamService` to use dependency injection for testability, enabling unit tests without a real database connection.
**Key Achievement:** 90.7% test coverage on refactored code.
---
## What Was Done
### 1. Dependency Injection Architecture
**Files Created:**
- `app/services/interfaces.py` - Abstract protocols (AbstractPlayerRepository, AbstractTeamRepository, AbstractCacheService)
- `app/services/mocks.py` - Mock implementations for testing
- `app/services/base.py` - BaseService with common functionality
**Files Refactored:**
- `app/services/player_service.py` - Full DI implementation
- `app/services/team_service.py` - DI implementation (was already classmethods)
- `app/routers_v3/players.py` - Fixed import paths
- `app/routers_v3/teams.py` - Fixed import paths
**Test Files Created:**
- `tests/unit/test_player_service.py` - 35+ tests
- `tests/unit/test_team_service.py` - 25+ tests
- `tests/unit/test_base_service.py` - 10+ tests
### 2. Key Design Decisions
**Protocol-based DI (not ABCs):**
```python
@runtime_checkable
class AbstractPlayerRepository(Protocol):
def select_season(self, season: int) -> QueryResult: ...
def get_by_id(self, player_id: int) -> Optional[PlayerData]: ...
```
**Lazy Loading with Fallback:**
```python
@property
def player_repo(self) -> AbstractPlayerRepository:
if self._player_repo is not None:
return self._player_repo
# Fall back to real DB for production
from ..db_engine import Player
return RealPlayerRepository(Player)
```
**Repo-Agnostic Filtering:**
```python
def _apply_player_filters(self, query, team_id, pos, strat_code, ...):
# Check if using mock (dict) or real DB (Peewee model)
first_item = next(iter(query), None)
if first_item is not None and not isinstance(first_item, dict):
# Use DB-native filtering
from ..db_engine import Player
query = query.where(Player.team_id << team_id)
else:
# Use Python filtering for mocks
filtered = [p for p in query if p.get('team_id') in team_id]
query = InMemoryQueryResult(filtered)
```
### 3. Test Coverage
| Metric | Value |
|--------|-------|
| Total Test Lines | 1,210 |
| Service Lines | 1,334 |
| Coverage | **90.7%** |
**Tests Cover:**
- ✅ CRUD operations (create, read, update, delete)
- ✅ Filtering (team_id, pos, strat_code, is_injured)
- ✅ Sorting (cost-asc/desc, name-asc/desc)
- ✅ Search (exact, partial, no results)
- ✅ Cache operations
- ✅ Authentication
- ✅ Edge cases
---
## Current State
### ✅ Working
- PlayerService with full DI
- TeamService with full DI
- Unit tests pass without DB
- Router-service integration fixed
- Import paths corrected
### ⚠️ Known Issues
1. CSV parameter name mismatch: Router uses `csv`, service expects `as_csv`
2. Cache key patterns need validation against Redis config
3. Only Player and Team services refactored (other 18 routers still use direct DB imports)
### ❌ Not Started
- Other routers (divisions, battingstats, pitchingstats, etc.)
- Integration tests with real DB
- Performance benchmarking
---
## How to Run Tests
```bash
cd major-domo-database
# Run unit tests (no DB needed)
python3 -c "
from app.services.player_service import PlayerService
from app.services.mocks import MockPlayerRepository, MockCacheService
repo = MockPlayerRepository()
repo.add_player({'id': 1, 'name': 'Test', ...})
cache = MockCacheService()
service = PlayerService(player_repo=repo, cache=cache)
result = service.get_players(season=10)
print(f'Count: {result[\"count\"]}')
"
# Full pytest suite
pytest tests/ -v
```
---
## API Compatibility
### Response Structures (Verified for Major Domo)
**Team Response:**
```json
{
"id": 1,
"abbrev": "ARI",
"sname": "Diamondbacks",
"lname": "Arizona Diamondbacks",
"gmid": "69420666",
"manager1": {"id": 1, "name": "..."},
"manager2": null,
"division": {"id": 1, ...},
"season": 10,
...
}
```
**Player Response:**
```json
{
"id": 1,
"name": "Mike Trout",
"wara": 5.2,
"team": {"id": 1, "abbrev": "ARI", ...},
"season": 10,
"pos_1": "CF",
"pos_2": "LF",
...
}
```
---
## To Continue This Work
### Option 1: Complete Full Refactor
1. Apply same pattern to remaining 18 routers
2. Create services for: Division, Manager, Standing, Schedule, Transaction, etc.
3. Add integration tests
4. Set up CI/CD pipeline
### Option 2: Expand Test Coverage
1. Add integration tests with real PostgreSQL
2. Add performance benchmarks
3. Add cache invalidation tests
### Option 3: Merge and Pause
1. Merge `jarvis/testability` into main
2. Document pattern for future contributors
3. Return when needed
---
## Files Reference
```
major-domo-database/
├── app/
│ ├── services/
│ │ ├── base.py # BaseService with auth, caching, error handling
│ │ ├── interfaces.py # Protocol definitions
│ │ ├── mocks.py # Mock implementations
│ │ ├── player_service.py # Full DI implementation
│ │ └── team_service.py # Full DI implementation
│ ├── routers_v3/
│ │ ├── players.py # Fixed imports, calls PlayerService
│ │ └── teams.py # Fixed imports, calls TeamService
│ └── db_engine.py # Original models (unchanged)
├── tests/
│ └── unit/
│ ├── test_base_service.py
│ ├── test_player_service.py
│ └── test_team_service.py
└── DATA_CONSISTENCY_REPORT.md # Detailed analysis
```
---
## Questions to Answer After E2E Testing
1. Do the refactored endpoints return the same data as before?
2. Are there any performance regressions?
3. Does authentication work correctly?
4. Do cache invalidations work as expected?
5. Are there any edge cases that fail?
---
## Contact
**Context:** This work was started to enable testability of the Major Domo codebase. The PoC demonstrates the pattern works for Player and Team. The same pattern should be applied to other models as needed.

View File

@ -1 +1 @@
2.5.3
2.5.4

View File

@ -1,185 +0,0 @@
"""
Data Consistency Validator
Compares refactored service layer output with expected router output.
"""
# ============================================================================
# DATA STRUCTURE COMPARISON
# ============================================================================
"""
EXPECTED OUTPUT STRUCTURES (from router definition):
===================================================
GET /api/v3/players
------------------
Response: {
"count": int,
"players": [
{
"id": int,
"name": str,
"wara": float,
"image": str,
"image2": str | None,
"team_id": int,
"season": int,
"pitcher_injury": str | None,
"pos_1": str,
"pos_2": str | None,
...
"team": { "id": int, "abbrev": str, "sname": str, ... } | int # if short_output
}
]
}
GET /api/v3/players/{player_id}
-------------------------------
Response: Player dict or null
GET /api/v3/players/search
--------------------------
Response: {
"count": int,
"total_matches": int,
"all_seasons": bool,
"players": [Player dicts]
}
GET /api/v3/teams
------------------
Response: {
"count": int,
"teams": [
{
"id": int,
"abbrev": str,
"sname": str,
"lname": str,
"gmid": int | None,
"gmid2": int | None,
"manager1_id": int | None,
"manager2_id": int | None,
"division_id": int | None,
"stadium": str | None,
"thumbnail": str | None,
"color": str | None,
"dice_color": str | None,
"season": int
}
]
}
GET /api/v3/teams/{team_id}
----------------------------
Response: Team dict or null
EXPECTED BEHAVIOR DIFFERENCES (Issues Found):
=============================================
1. STATIC VS INSTANCE METHOD MISMATCH
PlayerService.get_players() - Called as static in router
ISSUE: Method has `self` parameter - will fail!
TeamService.get_teams() - Correctly uses @classmethod
OK: Uses cls instead of self
2. FILTER FIELD INCONSISTENCY
Router: name=str (exact match filter)
Service: name.lower() comparison
ISSUE: Different behavior!
3. POSITION FILTER INCOMPLETE
Router: pos=[list of positions]
Service: Only checks pos_1 through pos_8
OK: Actually correct implementation
4. CSV OUTPUT DIFFERENCE
Router: csv=bool, returns Response with content
Service: as_csv=bool, returns CSV string
OK: Just parameter name difference
5. INJURED FILTER SEMANTICS
Router: is_injured=True show injured players
Service: is_injured is not None filter il_return IS NOT NULL
OK: Same behavior
6. SORT PARAMETER MAPPING
Router: sort="name-asc" | "cost-desc" | etc
Service: Maps to Player.name.asc(), Player.wara.desc()
OK: Correct mapping
7. DEPENDENCY INJECTION INCOMPLETE
Service imports: from ..db_engine import Player, Team
ISSUE: Still uses direct model imports for filtering!
Service uses: Player.team_id << team_id (Peewee query)
ISSUE: This won't work with MockPlayerRepository!
Service uses: peewee_fn.lower(Player.strat_code)
ISSUE: This won't work with MockPlayerRepository!
ISSUE: MockPlayerRepository doesn't support peewee_fn!
8. RESPONSE FIELD DIFFERENCES
get_players: count + players [ match]
get_one_player: returns dict or null [ match]
search_players: count + players + all_seasons [ match]
get_teams: count + teams [ match]
get_one_team: returns dict or null [ match]
"""
# ============================================================================
# RECOMMENDED FIXES
# ============================================================================
"""
To make refactored code return EXACT SAME data:
1. FIX PLAYERSERVICE METHOD SIGNATURE
Current:
def get_players(self, season, team_id, pos, strat_code, name, ...):
Fix: Add @classmethod decorator
def get_players(cls, season, team_id, pos, strat_code, name, ...):
- Use cls instead of self
- Use Team.select() instead of self.team_repo
2. STANDARDIZE PARAMETER NAMES
Rename:
- as_csv csv (to match router)
- short_output stays (both use same)
3. IMPLEMENT REPO-AGNOSTIC FILTERING
Current (broken):
query.where(Player.team_id << team_id)
Fix for Mock:
def _apply_filters(query, team_id, pos, strat_code, name, is_injured):
result = []
for item in query:
if team_id and item.get('team_id') not in team_id:
continue
if strat_code and item.get('strat_code', '').lower() not in [s.lower() for s in strat_code]:
continue
result.append(item)
return result
4. REMOVE DEPENDENCY ON peewee_fn IN SERVICE LAYER
Current:
query.where(peewee_fn.lower(Player.name) == name.lower())
Fix: Do string comparison in Python
for player in query:
if name and player.name.lower() != name.lower():
continue
5. REMOVE UNNECESSARY IMPORTS
Current in player_service.py:
from peewee import fn as peewee_fn
from ..db_engine import Player
These imports break the dependency injection pattern.
The service should ONLY use the repo interface.
"""
print(__doc__)

File diff suppressed because it is too large Load Diff

View File

@ -6,8 +6,8 @@ networks:
services:
api:
# build: .
image: manticorum67/major-domo-database:dev-pg
build: .
# image: manticorum67/major-domo-database:dev-pg
restart: unless-stopped
container_name: sba_db_api
volumes:
@ -17,7 +17,7 @@ services:
- 801:80
networks:
- default
- nginx-proxy-manager_npm_network
# - nginx-proxy-manager_npm_network # Commented for local testing
environment:
- TESTING=False
- LOG_LEVEL=${LOG_LEVEL}

7304
main.py

File diff suppressed because it is too large Load Diff

View File

@ -1,50 +0,0 @@
from playhouse.migrate import *
import app.db_engine as db_engine
migrator = SqliteMigrator(db_engine.db)
# pubdate_field = DateTimeField(null=True)
# comment_field = TextField(default='')
# pitcher_injury = IntegerField(null=True)
# pos_1 = CharField(default='None')
# pos_2 = CharField(null=True)
# hand_batting = CharField(null=True)
# hand_pitching = CharField(null=True)
# re24_primary = FloatField(null=True)
# re24_running = FloatField(null=True)
# last_game2 = CharField(null=True)
# division = ForeignKeyField(db_engine.Division, field=db_engine.Division.id, null=True) # for division migration
# manager = ForeignKeyField(db_engine.Manager, field=db_engine.Manager.id, null=True) # for manager table
# p_career = ForeignKeyField(db_engine.PitchingCareer, field=db_engine.PitchingCareer.id, null=True) # for careers
# b_career = ForeignKeyField(db_engine.BattingCareer, field=db_engine.BattingCareer.id, null=True) # for careers
# f_career = ForeignKeyField(db_engine.FieldingCareer, field=db_engine.FieldingCareer.id, null=True) # for careers
away_manager = ForeignKeyField(db_engine.Manager, field=db_engine.Manager.id, null=True) # to add Manager to games
home_manager = ForeignKeyField(db_engine.Manager, field=db_engine.Manager.id, null=True) # to add Manager to games
team = ForeignKeyField(db_engine.Team, field=db_engine.Team.id, null=True) # to add Team to decisions
migrate(
# migrator.add_column('team', 'division_id', division), # for division migration
# migrator.add_column('team', 'manager1_id', manager), # for manager table
# migrator.add_column('team', 'manager2_id', manager), # for manager table
# migrator.add_column('battingseason', 'career_id', b_career), # for career stats
# migrator.add_column('pitchingseason', 'career_id', p_career), # for career stats
# migrator.add_column('fieldingseason', 'career_id', f_career), # for career stats
# migrator.add_column('player', 'last_game2', last_game2),
# migrator.add_column('player', 'pos_1', pos_1),
# migrator.add_column('comment_tbl', 'comment', comment_field),
# migrator.rename_column('team', 'division', 'division_legacy'),
# migrator.drop_column('story', 'some_old_field'),
# migrator.drop_not_null('team', 'abbrev'),
# migrator.add_not_null('story', 'modified_date'),
# migrator.rename_table('story', 'stories_tbl'),
# migrator.drop_index('team', 'team_abbrev'),
# migrator.drop_index('player', 'player_name')
migrator.add_column('decision', 'team', team),
# migrator.add_column('stratplay', 'hand_batting', hand_batting),
# migrator.add_column('stratplay', 'hand_pitching', hand_pitching),
# migrator.add_column('stratplay', 're24_primary', re24_primary),
# migrator.add_column('stratplay', 're24_running', re24_running)
)

Binary file not shown.

View File

@ -1,458 +0,0 @@
#!/usr/bin/env python3
"""
Migration script to transfer custom commands from old database to new schema.
This script:
1. Reads existing commands and creators from sba_is_fun.db
2. Maps the old schema to the new custom_commands schema
3. Migrates all data preserving relationships and metadata
4. Provides detailed logging and validation
Usage:
python migrate_custom_commands.py --source /path/to/sba_is_fun.db --target /path/to/sba_master.db [--dry-run]
"""
import sqlite3
import json
import logging
import argparse
from datetime import datetime
from typing import Dict, List, Tuple, Optional
class CustomCommandMigrator:
def __init__(self, source_db: str, target_db: str, dry_run: bool = False):
self.source_db = source_db
self.target_db = target_db
self.dry_run = dry_run
self.setup_logging()
def setup_logging(self):
"""Setup logging configuration"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(f'migration_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger('migrate_custom_commands.CustomCommandMigrator')
def validate_source_database(self) -> bool:
"""Validate that source database has expected tables and structure"""
try:
conn = sqlite3.connect(self.source_db)
cursor = conn.cursor()
# Check for required tables
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('command', 'creator');")
tables = [row[0] for row in cursor.fetchall()]
if 'command' not in tables or 'creator' not in tables:
self.logger.error(f"Required tables missing. Found: {tables}")
return False
# Check command table structure
cursor.execute("PRAGMA table_info(command);")
command_cols = [row[1] for row in cursor.fetchall()]
required_command_cols = ['id', 'name', 'message', 'creator_id', 'createtime']
for col in required_command_cols:
if col not in command_cols:
self.logger.error(f"Required column '{col}' missing from command table")
return False
# Check creator table structure
cursor.execute("PRAGMA table_info(creator);")
creator_cols = [row[1] for row in cursor.fetchall()]
required_creator_cols = ['id', 'name', 'discordid']
for col in required_creator_cols:
if col not in creator_cols:
self.logger.error(f"Required column '{col}' missing from creator table")
return False
conn.close()
self.logger.info("Source database validation passed")
return True
except Exception as e:
self.logger.error(f"Error validating source database: {e}")
return False
def validate_target_database(self) -> bool:
"""Validate that target database has the new custom_commands tables"""
try:
conn = sqlite3.connect(self.target_db)
cursor = conn.cursor()
# Check for required tables
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('custom_commands', 'custom_command_creators');")
tables = [row[0] for row in cursor.fetchall()]
if 'custom_commands' not in tables or 'custom_command_creators' not in tables:
self.logger.error(f"Target tables missing. Found: {tables}")
self.logger.error("Please ensure the new custom_commands schema has been created first")
return False
conn.close()
self.logger.info("Target database validation passed")
return True
except Exception as e:
self.logger.error(f"Error validating target database: {e}")
return False
def load_source_data(self) -> Tuple[List[Dict], List[Dict]]:
"""Load creators and commands from source database"""
conn = sqlite3.connect(self.source_db)
cursor = conn.cursor()
# Load creators
self.logger.info("Loading creators from source database...")
cursor.execute("SELECT id, name, discordid FROM creator ORDER BY id;")
creators_raw = cursor.fetchall()
creators = []
for row in creators_raw:
creators.append({
'old_id': row[0],
'name': row[1],
'discord_id': row[2]
})
self.logger.info(f"Loaded {len(creators)} creators")
# Load commands
self.logger.info("Loading commands from source database...")
cursor.execute("""
SELECT c.id, c.name, c.message, c.creator_id, c.createtime, c.last_used, c.sent_warns,
cr.name as creator_name, cr.discordid as creator_discord_id
FROM command c
LEFT JOIN creator cr ON c.creator_id = cr.id
ORDER BY c.id;
""")
commands_raw = cursor.fetchall()
commands = []
for row in commands_raw:
# Parse last_used datetime
last_used = None
if row[5]: # last_used
try:
last_used = datetime.fromisoformat(row[5]).isoformat()
except:
last_used = row[5] # Keep original if parsing fails
# Parse createtime
created_at = None
if row[4]: # createtime
try:
created_at = datetime.fromisoformat(row[4]).isoformat()
except:
created_at = row[4] # Keep original if parsing fails
commands.append({
'old_id': row[0],
'name': row[1],
'content': row[2], # message -> content
'old_creator_id': row[3],
'created_at': created_at,
'last_used': last_used,
'sent_warns': row[6],
'creator_name': row[7],
'creator_discord_id': row[8]
})
self.logger.info(f"Loaded {len(commands)} commands")
conn.close()
return creators, commands
def migrate_creators(self, creators: List[Dict]) -> Dict[int, int]:
"""Migrate creators and return mapping of old_id -> new_id"""
if self.dry_run:
self.logger.info(f"[DRY RUN] Would migrate {len(creators)} creators")
return {creator['old_id']: creator['old_id'] for creator in creators} # Mock mapping
conn = sqlite3.connect(self.target_db)
cursor = conn.cursor()
creator_id_mapping = {}
now = datetime.now().isoformat()
for creator in creators:
try:
# Check if creator already exists by discord_id
cursor.execute("SELECT id FROM custom_command_creators WHERE discord_id = ?", (creator['discord_id'],))
existing = cursor.fetchone()
if existing:
creator_id_mapping[creator['old_id']] = existing[0]
self.logger.info(f"Creator '{creator['name']}' (Discord: {creator['discord_id']}) already exists with ID {existing[0]}")
continue
# Insert new creator
cursor.execute("""
INSERT INTO custom_command_creators
(discord_id, username, display_name, created_at, total_commands, active_commands)
VALUES (?, ?, ?, ?, 0, 0)
""", (creator['discord_id'], creator['name'], creator['name'], now))
new_id = cursor.lastrowid
creator_id_mapping[creator['old_id']] = new_id
self.logger.info(f"Migrated creator '{creator['name']}': {creator['old_id']} -> {new_id}")
except Exception as e:
self.logger.error(f"Error migrating creator {creator}: {e}")
raise
conn.commit()
conn.close()
self.logger.info(f"Successfully migrated {len(creator_id_mapping)} creators")
return creator_id_mapping
def migrate_commands(self, commands: List[Dict], creator_id_mapping: Dict[int, int]) -> None:
"""Migrate commands using the creator ID mapping"""
if self.dry_run:
self.logger.info(f"[DRY RUN] Would migrate {len(commands)} commands")
return
conn = sqlite3.connect(self.target_db)
cursor = conn.cursor()
migrated_count = 0
skipped_count = 0
for command in commands:
try:
# Map old creator_id to new creator_id
if command['old_creator_id'] not in creator_id_mapping:
self.logger.warning(f"Skipping command '{command['name']}' - creator ID {command['old_creator_id']} not found in mapping")
skipped_count += 1
continue
new_creator_id = creator_id_mapping[command['old_creator_id']]
# Check if command already exists by name
cursor.execute("SELECT id FROM custom_commands WHERE name = ?", (command['name'],))
existing = cursor.fetchone()
if existing:
self.logger.warning(f"Command '{command['name']}' already exists with ID {existing[0]} - skipping")
skipped_count += 1
continue
# Determine if command was warned/inactive based on sent_warns
warning_sent = bool(command['sent_warns'] and command['sent_warns'] != 0)
# For migrated commands, ensure last_used is at least the migration date
# to prevent immediate deletion eligibility
migration_date = datetime.now().isoformat()
last_used = command['last_used']
# If command hasn't been used recently, set last_used to migration date
# to give it a grace period
if last_used:
try:
last_used_dt = datetime.fromisoformat(last_used.replace('Z', '+00:00'))
# If last used more than 60 days ago, update to migration date
if (datetime.now() - last_used_dt).days > 60:
last_used = migration_date
self.logger.info(f"Updated last_used for command '{command['name']}' to migration date")
except:
# If we can't parse the date, use migration date
last_used = migration_date
else:
# If no last_used date, use migration date
last_used = migration_date
# Add migration tag to indicate this is a migrated command
tags = '["migrated"]'
# Insert command
cursor.execute("""
INSERT INTO custom_commands
(name, content, creator_id, created_at, updated_at, last_used, use_count, warning_sent, is_active, tags)
VALUES (?, ?, ?, ?, ?, ?, 0, ?, 1, ?)
""", (
command['name'],
command['content'],
new_creator_id,
command['created_at'],
None, # updated_at
last_used,
warning_sent,
tags
))
migrated_count += 1
if migrated_count % 10 == 0:
self.logger.info(f"Migrated {migrated_count} commands...")
except Exception as e:
self.logger.error(f"Error migrating command {command}: {e}")
raise
conn.commit()
conn.close()
self.logger.info(f"Successfully migrated {migrated_count} commands, skipped {skipped_count}")
def update_creator_stats(self) -> None:
"""Update creator statistics after migration"""
if self.dry_run:
self.logger.info("[DRY RUN] Would update creator statistics")
return
conn = sqlite3.connect(self.target_db)
cursor = conn.cursor()
# Update creator stats
cursor.execute("""
UPDATE custom_command_creators SET
total_commands = (
SELECT COUNT(*) FROM custom_commands
WHERE creator_id = custom_command_creators.id
),
active_commands = (
SELECT COUNT(*) FROM custom_commands
WHERE creator_id = custom_command_creators.id AND is_active = 1
)
""")
conn.commit()
conn.close()
self.logger.info("Updated creator statistics")
def generate_migration_report(self) -> None:
"""Generate a detailed migration report"""
if self.dry_run:
conn_source = sqlite3.connect(self.source_db)
cursor_source = conn_source.cursor()
cursor_source.execute("SELECT COUNT(*) FROM creator")
source_creators = cursor_source.fetchone()[0]
cursor_source.execute("SELECT COUNT(*) FROM command")
source_commands = cursor_source.fetchone()[0]
conn_source.close()
self.logger.info(f"""
=== DRY RUN MIGRATION REPORT ===
Source Database: {self.source_db}
Target Database: {self.target_db}
Would migrate:
- {source_creators} creators
- {source_commands} commands
No actual changes made (dry run mode).
""".strip())
return
# Real migration report
conn_source = sqlite3.connect(self.source_db)
conn_target = sqlite3.connect(self.target_db)
cursor_source = conn_source.cursor()
cursor_target = conn_target.cursor()
# Source counts
cursor_source.execute("SELECT COUNT(*) FROM creator")
source_creators = cursor_source.fetchone()[0]
cursor_source.execute("SELECT COUNT(*) FROM command")
source_commands = cursor_source.fetchone()[0]
# Target counts
cursor_target.execute("SELECT COUNT(*) FROM custom_command_creators")
target_creators = cursor_target.fetchone()[0]
cursor_target.execute("SELECT COUNT(*) FROM custom_commands")
target_commands = cursor_target.fetchone()[0]
# Get sample of migrated data
cursor_target.execute("""
SELECT cc.name, cc.content, ccc.username
FROM custom_commands cc
JOIN custom_command_creators ccc ON cc.creator_id = ccc.id
LIMIT 5
""")
sample_commands = cursor_target.fetchall()
conn_source.close()
conn_target.close()
self.logger.info(f"""
=== MIGRATION REPORT ===
Source Database: {self.source_db}
Target Database: {self.target_db}
Migration Results:
- Source creators: {source_creators} -> Target creators: {target_creators}
- Source commands: {source_commands} -> Target commands: {target_commands}
Sample migrated commands:
""".strip())
for cmd in sample_commands:
self.logger.info(f" '{cmd[0]}' by {cmd[2]}: {cmd[1][:50]}...")
def run_migration(self) -> bool:
"""Execute the full migration process"""
self.logger.info(f"Starting custom commands migration {'(DRY RUN)' if self.dry_run else ''}")
self.logger.info(f"Source: {self.source_db}")
self.logger.info(f"Target: {self.target_db}")
try:
# Validate databases
if not self.validate_source_database():
return False
if not self.validate_target_database():
return False
# Load source data
creators, commands = self.load_source_data()
# Migrate creators first
creator_id_mapping = self.migrate_creators(creators)
# Migrate commands
self.migrate_commands(commands, creator_id_mapping)
# Update statistics
self.update_creator_stats()
# Generate report
self.generate_migration_report()
self.logger.info("Migration completed successfully!")
return True
except Exception as e:
self.logger.error(f"Migration failed: {e}")
return False
def main():
parser = argparse.ArgumentParser(description='Migrate custom commands from old database to new schema')
parser.add_argument('--source', required=True, help='Path to source database (sba_is_fun.db)')
parser.add_argument('--target', required=True, help='Path to target database (sba_master.db)')
parser.add_argument('--dry-run', action='store_true', help='Run in dry-run mode (no actual changes)')
args = parser.parse_args()
migrator = CustomCommandMigrator(args.source, args.target, args.dry_run)
success = migrator.run_migration()
exit(0 if success else 1)
if __name__ == '__main__':
main()

Binary file not shown.

Binary file not shown.