# claude-home/media-tools/scripts/pokeflix_scraper.py
# Cal Corum ceb4dd36a0 Add docker scripts, media-tools, VM management, and n8n workflow docs
# Add CONTEXT.md for docker and VM management script directories.
# Add media-tools documentation with Playwright scraping patterns.
# Add Tdarr GPU monitor n8n workflow definition.
#
# Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
# 2026-02-07 22:26:10 -06:00
#
# (file-view metadata: 778 lines, 27 KiB, Python, executable)

#!/usr/bin/env python3
"""
Pokeflix Scraper - Download Pokemon episodes from pokeflix.tv
Pokeflix hosts videos directly on their CDN (v1.pkflx.com). This scraper:
1. Extracts the episode list from a season browse page
2. Visits each episode page to detect its CDN episode number
3. Downloads videos directly from the CDN via yt-dlp
Usage:
# Download entire season
python pokeflix_scraper.py --url "https://www.pokeflix.tv/browse/pokemon-indigo-league" --output ~/Pokemon/
# Download specific episode range
python pokeflix_scraper.py --url "..." --start 1 --end 10 --output ~/Pokemon/
# Resume interrupted download
python pokeflix_scraper.py --url "..." --output ~/Pokemon/ --resume
# Dry run (extract URLs only)
python pokeflix_scraper.py --url "..." --dry-run
# Choose quality
python pokeflix_scraper.py --url "..." --quality 720p --output ~/Pokemon/
Dependencies:
pip install playwright
playwright install chromium
# yt-dlp must be installed: pip install yt-dlp
Author: Cal Corum (with Jarvis assistance)
"""
import argparse
import asyncio
import json
import logging
import random
import re
import subprocess
import sys
from dataclasses import dataclass, field, asdict
from datetime import datetime
from pathlib import Path
from typing import Optional
try:
from playwright.async_api import async_playwright, Page, Browser
except ImportError:
print("ERROR: playwright not installed. Run: pip install playwright && playwright install chromium")
sys.exit(1)
# ============================================================================
# Data Classes
# ============================================================================
@dataclass
class Episode:
    """A single episode: CDN metadata plus download bookkeeping."""
    cdn_number: int  # episode number as used in CDN URLs
    title: str
    page_url: str
    slug: str  # URL slug e.g., "01-pokemon-i-choose-you"
    video_url: Optional[str] = None
    downloaded: bool = False
    error: Optional[str] = None

    @property
    def filename(self) -> str:
        """Filesystem-safe output filename, e.g. 'E01 - Title.mp4'."""
        cleaned = re.sub(r'[<>:"/\\|?*]', '', self.title).strip()
        return f"E{self.cdn_number:02d} - {cleaned}.mp4"
@dataclass
class Season:
    """A season/series together with its episode list."""
    name: str
    url: str
    cdn_slug: str  # e.g., "01-indigo-league" - used for CDN URLs
    episodes: list[Episode] = field(default_factory=list)

    @property
    def safe_name(self) -> str:
        """Directory-safe version of the season name."""
        return re.sub(r'[<>:"/\\|?*]', '', self.name).strip()
@dataclass
class DownloadState:
    """Persistent state for resumable downloads, serialized as JSON.

    `save()` stamps `last_updated` and writes the whole dataclass;
    `load()` is the inverse and returns None when no usable state exists.
    """
    season_url: str
    season_name: str
    cdn_slug: str
    # Keyed by the CDN episode number *as a string*: callers store entries
    # via str(cdn_num), and JSON round-trips object keys as strings anyway.
    episodes: dict[str, dict] = field(default_factory=dict)
    # All episode page URLs discovered on the browse page.
    episode_urls: list[str] = field(default_factory=list)
    last_updated: str = ""

    def save(self, path: Path) -> None:
        """Save state to JSON file, refreshing the last_updated timestamp."""
        self.last_updated = datetime.now().isoformat()
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(asdict(self), f, indent=2)

    @classmethod
    def load(cls, path: Path) -> Optional['DownloadState']:
        """Load state from JSON file.

        Returns:
            The saved state, or None if the file is missing, truncated,
            or does not match the expected field layout — a corrupt state
            file should not abort the run; it is treated as "no state".
        """
        if not path.exists():
            return None
        try:
            with open(path, encoding='utf-8') as f:
                data = json.load(f)
            return cls(**data)
        except (json.JSONDecodeError, TypeError):
            return None
# ============================================================================
# Logging Setup
# ============================================================================
def setup_logging(verbose: bool = False) -> logging.Logger:
    """Return the scraper's logger, attaching a console handler exactly once."""
    level = logging.DEBUG if verbose else logging.INFO
    logger = logging.getLogger('pokeflix_scraper')
    logger.setLevel(level)
    # Guard against duplicate handlers when called more than once.
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setLevel(level)
        fmt = logging.Formatter(
            '%(asctime)s [%(levelname)s] %(message)s',
            datefmt='%H:%M:%S'
        )
        handler.setFormatter(fmt)
        logger.addHandler(handler)
    return logger
# ============================================================================
# Scraper Class
# ============================================================================
class PokeflixScraper:
    """
    Scrapes pokeflix.tv for video URLs and downloads them.

    Pokeflix hosts videos on their CDN with URLs like:
        https://v1.pkflx.com/hls/{season-slug}/{ep-num}/{ep-num}_{quality}.mp4

    The episode number must be detected by visiting each episode page,
    as the browse page URL slugs don't contain episode numbers.

    Use as an async context manager: the browser is launched in
    ``__aenter__`` and torn down in ``__aexit__``.
    """

    BASE_URL = "https://www.pokeflix.tv"
    CDN_URL = "https://v1.pkflx.com/hls"

    # Map browse page URL slugs to CDN slugs
    SEASON_SLUG_MAP = {
        'pokemon-indigo-league': '01-indigo-league',
        'pokemon-adventures-in-the-orange-islands': '02-orange-islands',
        'pokemon-the-johto-journeys': '03-johto-journeys',
        'pokemon-johto-league-champions': '04-johto-league-champions',
        'pokemon-master-quest': '05-master-quest',
        'pokemon-advanced': '06-advanced',
        'pokemon-advanced-challenge': '07-advanced-challenge',
        'pokemon-advanced-battle': '08-advanced-battle',
        'pokemon-battle-frontier': '09-battle-frontier',
        'pokemon-diamond-and-pearl': '10-diamond-and-pearl',
        'pokemon-dp-battle-dimension': '11-battle-dimension',
        'pokemon-dp-galactic-battles': '12-galactic-battles',
        'pokemon-dp-sinnoh-league-victors': '13-sinnoh-league-victors',
        'pokemon-black-white': '14-black-and-white',
        'pokemon-bw-rival-destinies': '15-rival-destinies',
        'pokemon-bw-adventures-in-unova': '16-adventures-in-unova',
        'pokemon-xy': '17-xy',
        'pokemon-xy-kalos-quest': '18-kalos-quest',
        'pokemon-xyz': '19-xyz',
        'pokemon-sun-moon': '20-sun-and-moon',
        'pokemon-sun-moon-ultra-adventures': '21-ultra-adventures',
        'pokemon-sun-moon-ultra-legends': '22-ultra-legends',
        'pokemon-journeys': '23-journeys',
        'pokemon-master-journeys': '24-master-journeys',
        'pokemon-ultimate-journeys': '25-ultimate-journeys',
        'pokemon-horizons': '26-horizons',
    }

    def __init__(
        self,
        output_dir: Path,
        headless: bool = False,
        dry_run: bool = False,
        verbose: bool = False,
        quality: str = "1080p"
    ):
        """Store configuration; the browser is not launched until __aenter__.

        Args:
            output_dir: Root directory for downloaded files.
            headless: Run Chromium without a window (may trigger anti-bot).
            dry_run: Log URLs instead of downloading.
            verbose: Enable DEBUG-level logging.
            quality: Quality suffix used in CDN URLs (e.g. "1080p").
        """
        self.output_dir = output_dir
        self.headless = headless
        self.dry_run = dry_run
        self.quality = quality
        self.logger = setup_logging(verbose)
        self.browser: Optional[Browser] = None
        self._context = None  # shared BrowserContext, created in __aenter__

    async def __aenter__(self):
        """Async context manager entry - launch browser."""
        playwright = await async_playwright().start()
        self.browser = await playwright.chromium.launch(
            headless=self.headless,
            # Hides the navigator.webdriver automation flag.
            args=['--disable-blink-features=AutomationControlled']
        )
        self._playwright = playwright
        # Create a persistent context to reuse across all pages (shares
        # cookies, so a solved Cloudflare challenge carries over).
        self._context = await self.browser.new_context(
            viewport={'width': 1920, 'height': 1080},
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit - close context, browser, then Playwright."""
        if self._context:
            await self._context.close()
        if self.browser:
            await self.browser.close()
        await self._playwright.stop()

    async def _new_page(self) -> Page:
        """Create a new page using the shared context."""
        return await self._context.new_page()

    async def _random_delay(self, min_sec: float = 1.0, max_sec: float = 3.0) -> None:
        """Sleep a random interval between min_sec and max_sec to avoid detection."""
        delay = random.uniform(min_sec, max_sec)
        await asyncio.sleep(delay)

    async def _wait_for_cloudflare(self, page: Page, timeout: int = 60) -> bool:
        """Wait for a Cloudflare challenge to be solved by the user.

        Polls once per second for up to `timeout` seconds. Returns True when
        no challenge is present (or it was solved), False on timeout.
        NOTE(review): the log message hardcodes "60 seconds" regardless of
        the `timeout` argument.
        """
        try:
            # Check if we're on a Cloudflare challenge page
            is_cf = await page.query_selector('#challenge-running, .cf-browser-verification, [id*="challenge"]')
            if is_cf:
                self.logger.warning("Cloudflare challenge detected - please solve it in the browser window")
                self.logger.info("Waiting up to 60 seconds for challenge completion...")
                # Wait for the challenge to be solved (URL changes or challenge element disappears)
                for _ in range(timeout):
                    await asyncio.sleep(1)
                    is_cf = await page.query_selector('#challenge-running, .cf-browser-verification, [id*="challenge"]')
                    if not is_cf:
                        self.logger.info("Cloudflare challenge completed!")
                        await asyncio.sleep(2)  # Wait for page to fully load
                        return True
                self.logger.error("Cloudflare challenge timeout - please try again")
                return False
        except Exception:
            # Best-effort: selector errors (e.g. page navigated away) are
            # treated as "no challenge present".
            pass
        return True

    def _get_cdn_slug(self, browse_url: str) -> Optional[str]:
        """Extract CDN slug from a browse page URL via SEASON_SLUG_MAP.

        Returns None for unknown seasons; callers fall back to detection
        from an episode page's network traffic.
        """
        match = re.search(r'/browse/([^/]+)', browse_url)
        if match:
            page_slug = match.group(1)
            if page_slug in self.SEASON_SLUG_MAP:
                return self.SEASON_SLUG_MAP[page_slug]
            self.logger.warning(f"Unknown season slug: {page_slug}, will try to detect from page")
        return None

    def _construct_video_url(self, cdn_slug: str, ep_num: int) -> str:
        """Construct direct CDN video URL for an episode at self.quality."""
        return f"{self.CDN_URL}/{cdn_slug}/{ep_num:02d}/{ep_num:02d}_{self.quality}.mp4"

    def _slug_to_title(self, slug: str) -> str:
        """Convert a URL slug to a human-readable title."""
        # Remove season prefix like "01-"
        title_slug = re.sub(r'^\d+-', '', slug)
        # Convert to title case
        title = title_slug.replace('-', ' ').title()
        # Clean up common words
        title = re.sub(r'\bPokemon\b', 'Pokémon', title)
        return title

    async def get_episode_list(self, season_url: str) -> tuple[str, str, list[tuple[str, str]]]:
        """
        Get the list of episode URLs from a season browse page.

        Returns:
            Tuple of (season_name, cdn_slug, list of (page_url, slug) tuples).
            cdn_slug is "unknown" and the list empty when detection fails.
        """
        self.logger.info(f"Fetching season page: {season_url}")
        cdn_slug = self._get_cdn_slug(season_url)
        page = await self._new_page()
        try:
            await page.goto(season_url, wait_until='networkidle', timeout=60000)
            await self._wait_for_cloudflare(page)
            await self._random_delay(2, 4)
            # Extract season title; fall back to the <title> tag.
            title_elem = await page.query_selector('h1, .season-title, .series-title')
            if not title_elem:
                title_elem = await page.query_selector('title')
            season_name = await title_elem.inner_text() if title_elem else "Unknown Season"
            # Strip the site's <title> boilerplate if present.
            season_name = season_name.replace('Pokéflix - Watch ', '').replace(' for free online!', '').strip()
            self.logger.info(f"Season: {season_name}")
            # Find all episode links with /v/ pattern
            links = await page.query_selector_all('a[href^="/v/"]')
            self.logger.info(f"Found {len(links)} episode links")
            # If we don't have CDN slug, detect it from first episode
            if not cdn_slug and links:
                first_href = await links[0].get_attribute('href')
                cdn_slug = await self._detect_cdn_slug(first_href)
            if not cdn_slug:
                self.logger.error("Could not determine CDN slug for this season")
                return season_name, "unknown", []
            self.logger.info(f"CDN slug: {cdn_slug}")
            # Collect all episode URLs, deduplicating by href.
            episode_data = []
            seen_urls = set()
            for link in links:
                href = await link.get_attribute('href')
                if not href or href in seen_urls:
                    continue
                seen_urls.add(href)
                # Extract slug from URL
                slug_match = re.search(r'/v/(.+)', href)
                if slug_match:
                    slug = slug_match.group(1)
                    full_url = self.BASE_URL + href
                    episode_data.append((full_url, slug))
            return season_name, cdn_slug, episode_data
        finally:
            await page.close()

    async def _detect_cdn_slug(self, episode_href: str) -> Optional[str]:
        """Visit an episode page to detect the CDN slug from network requests.

        Listens for requests to v1.pkflx.com/hls/ and captures the first
        path segment after /hls/. Returns None if nothing was observed
        within the fixed 5-second wait.
        """
        self.logger.info("Detecting CDN slug from episode page...")
        detected_slug = None

        async def capture_request(request):
            nonlocal detected_slug
            if 'v1.pkflx.com/hls/' in request.url:
                match = re.search(r'hls/([^/]+)/', request.url)
                if match:
                    detected_slug = match.group(1)

        page = await self._new_page()
        page.on('request', capture_request)
        try:
            await page.goto(self.BASE_URL + episode_href, timeout=60000)
            await self._wait_for_cloudflare(page)
            await asyncio.sleep(5)  # give the player time to request media
            return detected_slug
        finally:
            await page.close()

    async def get_episode_cdn_number(self, page_url: str) -> Optional[int]:
        """
        Visit an episode page and detect its CDN episode number.

        Strategy: listen for CDN media requests, try clicking likely play
        controls to trigger playback, then fall back to scanning the page
        source for a CDN URL.

        Returns:
            The episode number used in CDN URLs, or None if not detected.
        """
        detected_num = None

        async def capture_request(request):
            nonlocal detected_num
            if 'v1.pkflx.com/hls/' in request.url:
                # CDN URLs look like .../hls/{slug}/{NN}/{NN}_{quality}.mp4
                match = re.search(r'/(\d+)/\d+_', request.url)
                if match:
                    detected_num = int(match.group(1))

        page = await self._new_page()
        page.on('request', capture_request)
        try:
            await page.goto(page_url, timeout=60000)
            await self._wait_for_cloudflare(page)
            # Wait for initial load
            await asyncio.sleep(2)
            # Try to trigger video playback by clicking play button or video area
            play_selectors = [
                'button[aria-label*="play" i]',
                '.play-button',
                '[class*="play"]',
                'video',
                '.video-player',
                '.player',
                '#player',
            ]
            for selector in play_selectors:
                try:
                    elem = await page.query_selector(selector)
                    if elem:
                        await elem.click()
                        await asyncio.sleep(0.5)
                        if detected_num:
                            break
                except Exception:
                    # Click can fail on hidden/detached elements; just try
                    # the next selector.
                    pass
            # Wait for video requests after click attempts
            for _ in range(10):  # Wait up to 5 seconds
                if detected_num:
                    break
                await asyncio.sleep(0.5)
            # If still not detected, try looking in page source
            if not detected_num:
                content = await page.content()
                match = re.search(r'v1\.pkflx\.com/hls/[^/]+/(\d+)/', content)
                if match:
                    detected_num = int(match.group(1))
            return detected_num
        finally:
            await page.close()

    def download_video(self, video_url: str, output_path: Path) -> bool:
        """
        Download video using yt-dlp.

        NOTE(review): this is a blocking subprocess call made from async
        code paths; the event loop stalls for the duration of the download.

        Args:
            video_url: Direct CDN URL to the video
            output_path: Full path for output file

        Returns:
            True if download succeeded (or in dry-run mode).
        """
        if self.dry_run:
            self.logger.info(f" [DRY RUN] Would download: {video_url}")
            self.logger.info(f" To: {output_path}")
            return True
        self.logger.info(f" Downloading: {output_path.name}")
        cmd = [
            'yt-dlp',
            '--no-warnings',
            '-o', str(output_path),
            '--no-part',  # write directly, no .part temp file
            video_url
        ]
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=1800  # 30-minute ceiling per episode
            )
            if result.returncode == 0:
                self.logger.info(f" Download complete!")
                return True
            else:
                self.logger.error(f" yt-dlp error: {result.stderr}")
                return False
        except subprocess.TimeoutExpired:
            self.logger.error(" Download timed out after 30 minutes")
            return False
        except FileNotFoundError:
            self.logger.error(" yt-dlp not found. Install with: pip install yt-dlp")
            return False

    async def download_direct(
        self,
        season_url: str,
        start_ep: int,
        end_ep: int,
        resume: bool = False
    ) -> None:
        """
        Direct download mode - download episodes by number without visiting pages.

        This is faster and more reliable when you know the episode range,
        but only works for seasons present in SEASON_SLUG_MAP.
        """
        # Get CDN slug from URL
        cdn_slug = self._get_cdn_slug(season_url)
        if not cdn_slug:
            self.logger.error("Unknown season - direct mode requires a known season URL")
            self.logger.info("Known seasons: " + ", ".join(self.SEASON_SLUG_MAP.keys()))
            return
        # In direct mode, output_dir is the final destination (no subfolder created)
        season_dir = self.output_dir
        season_dir.mkdir(parents=True, exist_ok=True)
        self.logger.info(f"Direct download mode: Episodes {start_ep}-{end_ep}")
        self.logger.info(f"CDN slug: {cdn_slug}")
        self.logger.info(f"Quality: {self.quality}")
        self.logger.info(f"Output: {season_dir}")
        downloaded = 0
        skipped = 0
        failed = 0
        for ep_num in range(start_ep, end_ep + 1):
            output_path = season_dir / f"E{ep_num:02d}.mp4"
            # Check if already exists (only honored with --resume)
            if output_path.exists() and resume:
                self.logger.info(f"E{ep_num:02d}: Skipping (file exists)")
                skipped += 1
                continue
            video_url = self._construct_video_url(cdn_slug, ep_num)
            self.logger.info(f"E{ep_num:02d}: Downloading...")
            success = self.download_video(video_url, output_path)
            if success:
                downloaded += 1
            else:
                failed += 1
            # Small delay between episodes (skipped in dry-run).
            if not self.dry_run:
                await self._random_delay(0.5, 1.5)
        self.logger.info(f"\nComplete! Downloaded: {downloaded}, Skipped: {skipped}, Failed: {failed}")

    async def download_season(
        self,
        season_url: str,
        start_ep: Optional[int] = None,
        end_ep: Optional[int] = None,
        resume: bool = False,
        direct: bool = False
    ) -> None:
        """
        Download all episodes from a season.

        Args:
            season_url: URL to the season browse page
            start_ep: First episode number to download (inclusive)
            end_ep: Last episode number to download (inclusive)
            resume: Whether to resume from previous state
            direct: If True, skip page visits and download by episode number
        """
        # Direct mode - just download by episode number
        if direct:
            if start_ep is None or end_ep is None:
                self.logger.error("Direct mode requires --start and --end episode numbers")
                return
            await self.download_direct(season_url, start_ep, end_ep, resume)
            return
        # Get episode list
        season_name, cdn_slug, episode_data = await self.get_episode_list(season_url)
        if not episode_data:
            self.logger.error("No episodes found!")
            return
        # Create output directory (season name sanitized for the filesystem)
        season_dir = self.output_dir / re.sub(r'[<>:"/\\|?*]', '', season_name).strip()
        season_dir.mkdir(parents=True, exist_ok=True)
        # State file for resume
        state_path = season_dir / 'download_state.json'
        state = None
        if resume:
            state = DownloadState.load(state_path)
            if state:
                self.logger.info(f"Resuming from previous state ({state.last_updated})")
        if not state:
            state = DownloadState(
                season_url=season_url,
                season_name=season_name,
                cdn_slug=cdn_slug,
                episode_urls=[url for url, _ in episode_data]
            )
        self.logger.info(f"Processing {len(episode_data)} episodes (quality: {self.quality})")
        # Process each episode
        downloaded_count = 0
        skipped_count = 0
        failed_count = 0
        for page_url, slug in episode_data:
            title = self._slug_to_title(slug)
            # Check if we already have this episode in state (by URL)
            existing_ep = None
            for ep_data in state.episodes.values():
                if ep_data.get('page_url') == page_url:
                    existing_ep = ep_data
                    break
            if existing_ep and existing_ep.get('downloaded') and resume:
                self.logger.info(f"Skipping: {title} (already downloaded)")
                skipped_count += 1
                continue
            # Get the CDN episode number by visiting the page
            self.logger.info(f"Checking: {title}")
            cdn_num = await self.get_episode_cdn_number(page_url)
            if cdn_num is None:
                self.logger.error(f" Could not detect episode number, skipping")
                failed_count += 1
                continue
            # Check if within requested range
            if start_ep is not None and cdn_num < start_ep:
                self.logger.info(f" Episode {cdn_num} before start range, skipping")
                continue
            if end_ep is not None and cdn_num > end_ep:
                self.logger.info(f" Episode {cdn_num} after end range, skipping")
                continue
            # Check if file already exists on disk; record it in state so a
            # later resume can skip without re-visiting the page.
            output_path = season_dir / f"E{cdn_num:02d} - {title}.mp4"
            if output_path.exists() and resume:
                self.logger.info(f" File exists, skipping")
                state.episodes[str(cdn_num)] = {
                    'cdn_number': cdn_num,
                    'title': title,
                    'page_url': page_url,
                    'slug': slug,
                    'video_url': self._construct_video_url(cdn_slug, cdn_num),
                    'downloaded': True,
                    'error': None
                }
                state.save(state_path)
                skipped_count += 1
                continue
            # Construct video URL and download
            video_url = self._construct_video_url(cdn_slug, cdn_num)
            self.logger.info(f" Episode {cdn_num}: {title}")
            success = self.download_video(video_url, output_path)
            # Save state after every episode so interrupts lose nothing.
            state.episodes[str(cdn_num)] = {
                'cdn_number': cdn_num,
                'title': title,
                'page_url': page_url,
                'slug': slug,
                'video_url': video_url,
                'downloaded': success,
                'error': None if success else "Download failed"
            }
            state.save(state_path)
            if success:
                downloaded_count += 1
            else:
                failed_count += 1
            # Delay between episodes
            if not self.dry_run:
                await self._random_delay(1, 2)
        # Summary
        self.logger.info(f"\nComplete!")
        self.logger.info(f" Downloaded: {downloaded_count}")
        self.logger.info(f" Skipped: {skipped_count}")
        self.logger.info(f" Failed: {failed_count}")
        self.logger.info(f" Output: {season_dir}")
# ============================================================================
# CLI
# ============================================================================
def _build_parser() -> argparse.ArgumentParser:
    """Construct the command-line interface for the scraper."""
    parser = argparse.ArgumentParser(
        description='Download Pokemon episodes from pokeflix.tv',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --url "https://www.pokeflix.tv/browse/pokemon-indigo-league" --output ~/Pokemon/
  %(prog)s --url "..." --start 1 --end 10 --output ~/Pokemon/
  %(prog)s --url "..." --output ~/Pokemon/ --resume
  %(prog)s --url "..." --quality 720p --output ~/Pokemon/
  %(prog)s --url "..." --dry-run
"""
    )
    parser.add_argument('--url', '-u', required=True,
                        help='URL of the season/series page')
    parser.add_argument('--output', '-o', type=Path,
                        default=Path.home() / 'Downloads' / 'Pokemon',
                        help='Output directory (default: ~/Downloads/Pokemon)')
    parser.add_argument('--start', '-s', type=int,
                        help='Start episode number (CDN number)')
    parser.add_argument('--end', '-e', type=int,
                        help='End episode number (CDN number)')
    parser.add_argument('--quality', '-q', choices=['1080p', '720p', '360p'],
                        default='1080p',
                        help='Video quality (default: 1080p)')
    parser.add_argument('--resume', '-r', action='store_true',
                        help='Resume from previous download state')
    parser.add_argument('--dry-run', '-n', action='store_true',
                        help='Extract URLs only, do not download')
    parser.add_argument('--headless', action='store_true',
                        help='Run browser in headless mode (may trigger anti-bot)')
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='Enable verbose logging')
    parser.add_argument('--direct', action='store_true',
                        help='Direct download mode - skip page visits, just download episode range by number')
    return parser


def main():
    """CLI entry point: parse arguments and run the scraper to completion."""
    args = _build_parser().parse_args()

    async def run():
        async with PokeflixScraper(
            output_dir=args.output,
            headless=args.headless,
            dry_run=args.dry_run,
            verbose=args.verbose,
            quality=args.quality
        ) as scraper:
            await scraper.download_season(
                season_url=args.url,
                start_ep=args.start,
                end_ep=args.end,
                resume=args.resume,
                direct=args.direct
            )

    asyncio.run(run())


if __name__ == '__main__':
    main()