import asyncio import json import re import shutil import secrets from collections import Counter from contextlib import asynccontextmanager from dataclasses import dataclass, field from pathlib import Path from typing import Literal from urllib.parse import quote import httpx import yaml from fastapi import FastAPI, Depends, HTTPException, Request from fastapi.responses import StreamingResponse, FileResponse from fastapi.security import HTTPBasic, HTTPBasicCredentials cfg = yaml.safe_load(Path("config.yaml").read_text()) USERNAME: str = cfg["username"] PASSWORD: str = cfg["password"] BASE_URL: str = cfg["base_url"].rstrip("/") PORT: int = cfg.get("port", 8000) OUTPUTS = Path("outputs") MTG_BACK = "https://upload.wikimedia.org/wikipedia/en/a/aa/Magic_the_Gathering_card_back.jpg" security = HTTPBasic() def auth(creds: HTTPBasicCredentials = Depends(security)): ok = ( secrets.compare_digest(creds.username.encode(), USERNAME.encode()) and secrets.compare_digest(creds.password.encode(), PASSWORD.encode()) ) if not ok: raise HTTPException(401, headers={"WWW-Authenticate": "Basic"}) @dataclass class Job: deck_name: str slug: str cards: list[str] status: Literal["queued", "running", "complete", "failed"] = "queued" log: list[str] = field(default_factory=list) done: int = 0 total: int = 0 jobs: dict[str, Job] = {} queue: asyncio.Queue[str] = asyncio.Queue() def slugify(name: str) -> str: return re.sub(r"[^a-z0-9]+", "_", name.lower().strip()).strip("_") def valid_slug(slug: str) -> str: if not re.match(r"^[a-z0-9_]+$", slug): raise HTTPException(400, "invalid slug") return slug def safe_filename(name: str) -> str: return re.sub(r"[^a-z0-9_]", "", name.lower().replace(" ", "_")) def card_dest(deck_dir: Path, name: str, occ: int, total: int) -> Path: base = safe_filename(name) suffix = f"_{occ}" if total > 1 else "" return deck_dir / f"{base}{suffix}.png" async def scryfall_get(client: httpx.AsyncClient, url: str, job: "Job") -> httpx.Response: for attempt in range(4): r = await client.get(url) if r.status_code != 429: return r wait = float(r.headers.get("Retry-After", 2 ** attempt)) job.log.append(f" 429 — waiting {wait:.0f}s") await asyncio.sleep(wait) r.raise_for_status() return r async def fetch_image(client: httpx.AsyncClient, name: str, job: "Job") -> bytes: for param in ("exact", "fuzzy"): url = f"https://api.scryfall.com/cards/named?{param}={quote(name.strip())}" r = await scryfall_get(client, url, job) if r.status_code == 200: break if param == "fuzzy": r.raise_for_status() data = r.json() if "image_uris" in data: img_url = data["image_uris"]["normal"] elif "card_faces" in data and data["card_faces"][0].get("image_uris"): img_url = data["card_faces"][0]["image_uris"]["normal"] else: raise ValueError("no image_uris in scryfall response") job.log.append(f" img: {img_url}") img_r = await scryfall_get(client, img_url, job) img_r.raise_for_status() return img_r.content def build_tts(job: Job) -> dict: deck_dir = OUTPUTS / job.slug counts = Counter(job.cards) seen: Counter = Counter() all_entries: list[tuple[str, Path]] = [] for card in job.cards: seen[card] += 1 all_entries.append((card, card_dest(deck_dir, card, seen[card], counts[card]))) commander_name, commander_path = all_entries[0] rest = all_entries[1:] def card_url(p: Path) -> str: return f"{BASE_URL}/cards/{job.slug}/{p.name}" def custom_entry(url: str) -> dict: return { "FaceURL": url, "BackURL": MTG_BACK, "NumWidth": 1, "NumHeight": 1, "BackIsHidden": True, "UniqueBack": False, "Type": 0, } def transform(x=0.0, y=1.0, z=0.0, rx=0.0, ry=180.0, rz=180.0) -> dict: return { "posX": x, "posY": y, "posZ": z, "rotX": rx, "rotY": ry, "rotZ": rz, "scaleX": 1.0, "scaleY": 1.0, "scaleZ": 1.0, } card_base = { "Description": "", "Locked": False, "Grid": True, "Snap": True, "Autoraise": True, "Sticky": True, "Tooltip": True, "HideWhenFaceDown": True, "Hands": True, "SidewaysCard": False, "LuaScript": "", "LuaScriptState": "", "XmlUI": "", } # CustomDeck key 1 → CardID 100 → commander commander_entry = custom_entry(card_url(commander_path)) commander_obj = { "Name": "Card", "Transform": transform(x=-5.0, rz=0.0), "Nickname": commander_name, **card_base, "CardID": 100, "CustomDeck": {"1": commander_entry}, } deck_ids = [] custom_deck = {} contained = [] for i, (name, path) in enumerate(rest): key = i + 2 # keys 2..N, CardIDs 200..N*100 cid = key * 100 entry = custom_entry(card_url(path)) custom_deck[str(key)] = entry deck_ids.append(cid) contained.append({ "Name": "Card", "Transform": transform(), "Nickname": name, **card_base, "CardID": cid, "CustomDeck": {str(key): entry}, }) deck_obj = { "Name": "Deck", "Transform": transform(), "Nickname": job.deck_name, "Description": "", "Locked": False, "Grid": True, "Snap": True, "Autoraise": True, "Sticky": True, "Tooltip": True, "GridProjection": False, "HideWhenFaceDown": True, "Hands": False, "SidewaysCard": False, "DeckIDs": deck_ids, "CustomDeck": custom_deck, "ContainedObjects": contained, "LuaScript": "", "LuaScriptState": "", "XmlUI": "", } return { "SaveName": job.deck_name, "Date": "", "VersionNumber": "", "GameMode": "Tabletop Simulator", "Gravity": 0.5, "PlayArea": 0.5, "Table": "", "Sky": "", "Note": "", "TabStates": {}, "LuaScript": "", "LuaScriptState": "", "XmlUI": "", "ObjectStates": [commander_obj, deck_obj], } async def worker(): async with httpx.AsyncClient(timeout=30, headers={"User-Agent": "card-server/1.0"}) as client: while True: slug = await queue.get() job = jobs.get(slug) if not job: queue.task_done() continue job.status = "running" deck_dir = OUTPUTS / slug deck_dir.mkdir(exist_ok=True) counts = Counter(job.cards) seen: Counter = Counter() job.total = len(job.cards) try: for i, card in enumerate(job.cards, 1): seen[card] += 1 occ = seen[card] total = counts[card] dest = card_dest(deck_dir, card, occ, total) label = f"[{i}/{job.total}] {card}" + (f" ({occ}/{total})" if total > 1 else "") if dest.exists(): job.log.append(f"{label} — SKIP") elif occ > 1: src = card_dest(deck_dir, card, 1, total) if src.exists(): shutil.copy(src, dest) job.log.append(f"{label} — COPY") else: job.log.append(f"{label} — FAIL: source missing") else: try: data = await fetch_image(client, card, job) dest.write_bytes(data) job.log.append(f"{label} — OK") except Exception as e: job.log.append(f"{label} — FAIL: {e}") job.done = i if i < job.total and occ == 1: await asyncio.sleep(0.5) # stay well under scryfall rate limit tts = build_tts(job) tts_json = json.dumps(tts, indent=2) (deck_dir / "deck.tts.json").write_text(tts_json) # Saved Object: same structure, TTS spawns it into a running game # Place in: Saves/Saved Objects/ and use Objects → Saved Objects in-game (deck_dir / "deck.tts.object.json").write_text(tts_json) (deck_dir / "cards.txt").write_text("\n".join(job.cards)) job.status = "complete" job.log.append("Complete.") except Exception as e: job.status = "failed" job.log.append(f"Worker error: {e}") finally: queue.task_done() @asynccontextmanager async def lifespan(app: FastAPI): OUTPUTS.mkdir(exist_ok=True) for d in OUTPUTS.iterdir(): if not d.is_dir() or not (d / "cards.txt").exists(): continue slug = d.name if slug in jobs: continue cards = [l for l in (d / "cards.txt").read_text().splitlines() if l] status: Literal["complete", "failed"] = "failed" deck_name = slug.replace("_", " ").title() if (d / "deck.tts.json").exists(): status = "complete" try: deck_name = json.loads((d / "deck.tts.json").read_text()).get("SaveName", deck_name) except Exception: pass jobs[slug] = Job( deck_name=deck_name, slug=slug, cards=cards, status=status, done=len(cards), total=len(cards), log=["Loaded from disk."], ) asyncio.create_task(worker()) yield app = FastAPI(lifespan=lifespan) @app.get("/") async def index(_=Depends(auth)): return FileResponse("static/index.html") @app.get("/decks") async def list_decks(_=Depends(auth)): return [ {"slug": j.slug, "deck_name": j.deck_name, "status": j.status, "done": j.done, "total": j.total} for j in jobs.values() ] @app.get("/decks/{slug}") async def get_deck(slug: str, _=Depends(auth)): slug = valid_slug(slug) job = jobs.get(slug) if not job: raise HTTPException(404) return { "slug": job.slug, "deck_name": job.deck_name, "status": job.status, "done": job.done, "total": job.total, "log": job.log, } @app.post("/decks") async def submit_deck(request: Request, _=Depends(auth)): body = await request.json() name = body.get("deck_name", "").strip() card_text = body.get("cards", "").strip() if not name or not card_text: raise HTTPException(400, "deck_name and cards required") slug = slugify(name) cards = [l.strip() for l in card_text.splitlines() if l.strip()] if not cards: raise HTTPException(400, "no cards") if slug in jobs and jobs[slug].status in ("queued", "running"): raise HTTPException(409, "deck already processing") jobs[slug] = Job(deck_name=name, slug=slug, cards=cards, total=len(cards)) await queue.put(slug) return {"slug": slug} @app.delete("/decks/{slug}") async def delete_deck(slug: str, _=Depends(auth)): slug = valid_slug(slug) job = jobs.get(slug) if not job: raise HTTPException(404) if job.status in ("queued", "running"): raise HTTPException(409, "can't delete while processing") deck_dir = OUTPUTS / slug if deck_dir.exists(): shutil.rmtree(deck_dir) del jobs[slug] return {"deleted": slug} @app.get("/decks/{slug}/stream") async def stream(slug: str, _=Depends(auth)): slug = valid_slug(slug) async def gen(): sent = 0 while True: job = jobs.get(slug) if not job: yield "data: not found\n\n" return while sent < len(job.log): yield f"data: {job.log[sent]}\n\n" sent += 1 if job.status in ("complete", "failed"): yield "data: __DONE__\n\n" return await asyncio.sleep(0.5) return StreamingResponse(gen(), media_type="text/event-stream") @app.get("/decks/{slug}/tts") async def download_tts(slug: str, _=Depends(auth)): slug = valid_slug(slug) p = OUTPUTS / slug / "deck.tts.json" if not p.exists(): raise HTTPException(404) return FileResponse(p, filename=f"{slug}.json", media_type="application/json") @app.get("/decks/{slug}/object") async def download_object(slug: str, _=Depends(auth)): slug = valid_slug(slug) p = OUTPUTS / slug / "deck.tts.object.json" if not p.exists(): raise HTTPException(404) return FileResponse(p, filename=f"{slug}.object.json", media_type="application/json") # No auth — TTS desktop app fetches these URLs directly @app.get("/cards/{slug}/{filename}") async def serve_card(slug: str, filename: str): slug = Path(slug).name filename = Path(filename).name p = OUTPUTS / slug / filename if not p.exists(): raise HTTPException(404) media_type = "image/png" if filename.endswith(".png") else "image/webp" return FileResponse(p, media_type=media_type) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=PORT)