Filter and tokens update
This commit is contained in:
@@ -2,7 +2,7 @@ import asyncio
|
||||
import json
|
||||
import re
|
||||
import shutil
|
||||
from urllib.parse import quote
|
||||
from urllib.parse import quote, unquote, urlparse
|
||||
|
||||
import httpx
|
||||
|
||||
@@ -16,6 +16,19 @@ def safe_filename(name: str) -> str:
|
||||
return re.sub(r"[^a-z0-9_]", "", name.lower().replace(" ", "_"))
|
||||
|
||||
|
||||
def is_scry_url(s: str) -> bool:
|
||||
s = s.strip().lower()
|
||||
return s.startswith("http") and "scryfall.com/card/" in s
|
||||
|
||||
|
||||
def parse_scry_url(url: str) -> tuple[str, str] | None:
|
||||
# https://scryfall.com/card/{set}/{number}[/{lang}]/{name-slug}
|
||||
parts = [p for p in urlparse(url.strip()).path.split("/") if p]
|
||||
if len(parts) >= 3 and parts[0] == "card":
|
||||
return parts[1], unquote(parts[2])
|
||||
return None
|
||||
|
||||
|
||||
async def _scryfall_get(client: httpx.AsyncClient, url: str, slug: str) -> httpx.Response:
|
||||
for attempt in range(4):
|
||||
r = await client.get(url)
|
||||
@@ -28,48 +41,68 @@ async def _scryfall_get(client: httpx.AsyncClient, url: str, slug: str) -> httpx
|
||||
return r
|
||||
|
||||
|
||||
async def fetch_card_data(client: httpx.AsyncClient, name: str, slug: str) -> tuple[bytes, float]:
|
||||
async def _fetch_bytes(client: httpx.AsyncClient, url: str, slug: str) -> bytes:
|
||||
r = await _scryfall_get(client, url, slug)
|
||||
r.raise_for_status()
|
||||
return r.content
|
||||
|
||||
|
||||
async def resolve_card(client: httpx.AsyncClient, slug: str, name: str, scry_url: str | None) -> dict:
|
||||
if scry_url:
|
||||
parsed = parse_scry_url(scry_url)
|
||||
if not parsed:
|
||||
raise ValueError("unrecognized scryfall url")
|
||||
setcode, num = parsed
|
||||
url = f"https://api.scryfall.com/cards/{quote(setcode)}/{quote(num)}"
|
||||
r = await _scryfall_get(client, url, slug)
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
for param in ("exact", "fuzzy"):
|
||||
url = f"https://api.scryfall.com/cards/named?{param}={quote(name.strip())}"
|
||||
r = await _scryfall_get(client, url, slug)
|
||||
if r.status_code == 200:
|
||||
break
|
||||
return r.json()
|
||||
if param == "fuzzy":
|
||||
r.raise_for_status()
|
||||
raise ValueError("not found")
|
||||
|
||||
data = r.json()
|
||||
price = float(data.get("prices", {}).get("usd") or 0)
|
||||
|
||||
def extract_images(data: dict) -> tuple[str, str | None]:
|
||||
# Single image when the root carries image_uris (normal, split, flip, meld parts).
|
||||
# Two distinct faces (transform / modal_dfc / double_faced_token / reversible)
|
||||
# carry per-face image_uris and no root image_uris.
|
||||
if "image_uris" in data:
|
||||
img_url = data["image_uris"]["normal"]
|
||||
elif data.get("card_faces") and data["card_faces"][0].get("image_uris"):
|
||||
img_url = data["card_faces"][0]["image_uris"]["normal"]
|
||||
else:
|
||||
return data["image_uris"]["normal"], None
|
||||
faces = data.get("card_faces") or []
|
||||
front = faces[0]["image_uris"]["normal"] if faces and faces[0].get("image_uris") else None
|
||||
back = faces[1]["image_uris"]["normal"] if len(faces) >= 2 and faces[1].get("image_uris") else None
|
||||
if not front:
|
||||
raise ValueError("no image_uris in response")
|
||||
|
||||
img_r = await _scryfall_get(client, img_url, slug)
|
||||
img_r.raise_for_status()
|
||||
return img_r.content, price
|
||||
return front, back
|
||||
|
||||
|
||||
def build_tts(slug: str) -> dict:
|
||||
deck = db.get_deck(slug)
|
||||
cards = db.get_cards(slug)
|
||||
commander_name = deck["commander"]
|
||||
commander_name = (deck["commander"] or "").strip()
|
||||
|
||||
done_cards = [c for c in cards if c["fetch_status"] == "done" and c["filename"]]
|
||||
if not done_cards:
|
||||
done = [c for c in cards if c["fetch_status"] == "done" and c["filename"]]
|
||||
if not done:
|
||||
raise ValueError("no successfully fetched cards")
|
||||
|
||||
commander = next((c for c in done_cards if c["name"] == commander_name), done_cards[0])
|
||||
rest = [c for c in done_cards if c["id"] != commander["id"]]
|
||||
def card_url(fn: str) -> str:
|
||||
return f"{BASE_URL}/cards/{slug}/{fn}"
|
||||
|
||||
def card_url(filename: str) -> str:
|
||||
return f"{BASE_URL}/cards/{slug}/{filename}"
|
||||
|
||||
def custom_entry(url: str) -> dict:
|
||||
def entry(c) -> dict:
|
||||
if c["back_filename"]:
|
||||
return {
|
||||
"FaceURL": card_url(c["filename"]), "BackURL": card_url(c["back_filename"]),
|
||||
"NumWidth": 1, "NumHeight": 1,
|
||||
"BackIsHidden": False, "UniqueBack": True, "Type": 0,
|
||||
}
|
||||
return {
|
||||
"FaceURL": url, "BackURL": MTG_BACK,
|
||||
"FaceURL": card_url(c["filename"]), "BackURL": MTG_BACK,
|
||||
"NumWidth": 1, "NumHeight": 1,
|
||||
"BackIsHidden": True, "UniqueBack": False, "Type": 0,
|
||||
}
|
||||
@@ -85,40 +118,56 @@ def build_tts(slug: str) -> dict:
|
||||
"LuaScript": "", "LuaScriptState": "", "XmlUI": "",
|
||||
}
|
||||
|
||||
commander_obj = {
|
||||
"Name": "Card", "Transform": transform(x=-5.0, rz=0.0),
|
||||
"Nickname": commander["name"], **base,
|
||||
"CardID": 100,
|
||||
"CustomDeck": {"1": custom_entry(card_url(commander["filename"]))},
|
||||
}
|
||||
k = 0
|
||||
|
||||
deck_ids, custom_deck, contained = [], {}, []
|
||||
for i, card in enumerate(rest):
|
||||
key = str(i + 2)
|
||||
cid = (i + 2) * 100
|
||||
entry = custom_entry(card_url(card["filename"]))
|
||||
custom_deck[key] = entry
|
||||
deck_ids.append(cid)
|
||||
contained.append({
|
||||
"Name": "Card", "Transform": transform(), "Nickname": card["name"],
|
||||
**base, "CardID": cid, "CustomDeck": {key: entry},
|
||||
def next_k() -> int:
|
||||
nonlocal k
|
||||
k += 1
|
||||
return k
|
||||
|
||||
def card_object(c, x=0.0, rz=180.0) -> dict:
|
||||
n = next_k()
|
||||
return {
|
||||
"Name": "Card", "Transform": transform(x=x, rz=rz),
|
||||
"Nickname": c["name"], **base, "CardID": n * 100,
|
||||
"CustomDeck": {str(n): entry(c)},
|
||||
}
|
||||
|
||||
commander = next((c for c in done if c["name"] == commander_name), None) if commander_name else None
|
||||
rest = [c for c in done if commander is None or c["id"] != commander["id"]]
|
||||
|
||||
objects = []
|
||||
if commander:
|
||||
objects.append(card_object(commander, x=-5.0, rz=0.0))
|
||||
|
||||
if len(rest) >= 2:
|
||||
deck_ids, custom_deck, contained = [], {}, []
|
||||
for c in rest:
|
||||
n = next_k()
|
||||
e = entry(c)
|
||||
custom_deck[str(n)] = e
|
||||
deck_ids.append(n * 100)
|
||||
contained.append({
|
||||
"Name": "Card", "Transform": transform(), "Nickname": c["name"],
|
||||
**base, "CardID": n * 100, "CustomDeck": {str(n): e},
|
||||
})
|
||||
objects.append({
|
||||
"Name": "Deck", "Transform": transform(), "Nickname": deck["deck_name"],
|
||||
"Description": "", "Locked": False, "Grid": True, "Snap": True,
|
||||
"Autoraise": True, "Sticky": True, "Tooltip": True, "GridProjection": False,
|
||||
"HideWhenFaceDown": True, "Hands": False, "SidewaysCard": False,
|
||||
"DeckIDs": deck_ids, "CustomDeck": custom_deck, "ContainedObjects": contained,
|
||||
"LuaScript": "", "LuaScriptState": "", "XmlUI": "",
|
||||
})
|
||||
|
||||
deck_obj = {
|
||||
"Name": "Deck", "Transform": transform(), "Nickname": deck["deck_name"],
|
||||
"Description": "", "Locked": False, "Grid": True, "Snap": True,
|
||||
"Autoraise": True, "Sticky": True, "Tooltip": True, "GridProjection": False,
|
||||
"HideWhenFaceDown": True, "Hands": False, "SidewaysCard": False,
|
||||
"DeckIDs": deck_ids, "CustomDeck": custom_deck, "ContainedObjects": contained,
|
||||
"LuaScript": "", "LuaScriptState": "", "XmlUI": "",
|
||||
}
|
||||
elif rest:
|
||||
objects.append(card_object(rest[0], rz=0.0))
|
||||
|
||||
return {
|
||||
"SaveName": deck["deck_name"], "Date": "", "VersionNumber": "",
|
||||
"GameMode": "Tabletop Simulator", "Gravity": 0.5, "PlayArea": 0.5,
|
||||
"Table": "", "Sky": "", "Note": "", "TabStates": {},
|
||||
"LuaScript": "", "LuaScriptState": "", "XmlUI": "",
|
||||
"ObjectStates": [commander_obj, deck_obj],
|
||||
"ObjectStates": objects,
|
||||
}
|
||||
|
||||
|
||||
@@ -150,60 +199,88 @@ async def _process(client: httpx.AsyncClient, slug: str):
|
||||
for card in pending:
|
||||
name = card["name"]
|
||||
card_id = card["id"]
|
||||
filename = f"{safe_filename(name)}_{card_id}.png"
|
||||
scry_url = card["scry_url"]
|
||||
base = safe_filename(name) if name else f"card_{card_id}"
|
||||
filename = f"{base}_{card_id}.png"
|
||||
back_filename = f"{base}_{card_id}_back.png"
|
||||
dest = deck_dir / filename
|
||||
label = f"[{card['position']}] {name}"
|
||||
|
||||
with db.conn() as c:
|
||||
existing_row = c.execute(
|
||||
"SELECT filename FROM cards WHERE deck_slug=? AND name=? AND fetch_status='done' AND id!=? LIMIT 1",
|
||||
(slug, name, card_id)
|
||||
).fetchone()
|
||||
existing_filename = existing_row["filename"] if existing_row else None
|
||||
label = f"[{card['position']}] {name or scry_url}"
|
||||
|
||||
if dest.exists():
|
||||
db.add_log(slug, f"{label} — SKIP")
|
||||
bf = back_filename if (deck_dir / back_filename).exists() else None
|
||||
with db.conn() as c:
|
||||
c.execute("UPDATE cards SET fetch_status='done', filename=? WHERE id=?", (filename, card_id))
|
||||
elif existing_filename:
|
||||
src = deck_dir / existing_filename
|
||||
if src.exists():
|
||||
shutil.copy(src, dest)
|
||||
price_row = c.execute(
|
||||
"SELECT price_usd FROM cards WHERE deck_slug=? AND name=? AND fetch_status='done' AND id!=? LIMIT 1",
|
||||
(slug, name, card_id)
|
||||
).fetchone()
|
||||
price = price_row["price_usd"] if price_row else None
|
||||
if price is not None:
|
||||
c.execute("UPDATE cards SET fetch_status='done', filename=?, back_filename=?, price_usd=? WHERE id=?",
|
||||
(filename, bf, price, card_id))
|
||||
else:
|
||||
c.execute("UPDATE cards SET fetch_status='done', filename=?, back_filename=? WHERE id=?",
|
||||
(filename, bf, card_id))
|
||||
db.add_log(slug, f"{label} — SKIP")
|
||||
db.recalc_done(slug)
|
||||
continue
|
||||
|
||||
# Name-based dedup only — URL adds are precise prints and must never be aliased
|
||||
if not scry_url:
|
||||
with db.conn() as c:
|
||||
row = c.execute(
|
||||
"SELECT filename, back_filename, price_usd FROM cards "
|
||||
"WHERE deck_slug=? AND name=? AND fetch_status='done' AND id!=? LIMIT 1",
|
||||
(slug, name, card_id)
|
||||
).fetchone()
|
||||
if row and row["filename"] and (deck_dir / row["filename"]).exists():
|
||||
shutil.copy(deck_dir / row["filename"], dest)
|
||||
bf = None
|
||||
if row["back_filename"] and (deck_dir / row["back_filename"]).exists():
|
||||
shutil.copy(deck_dir / row["back_filename"], deck_dir / back_filename)
|
||||
bf = back_filename
|
||||
with db.conn() as c:
|
||||
price_row = c.execute(
|
||||
"SELECT price_usd FROM cards WHERE deck_slug=? AND name=? AND fetch_status='done' LIMIT 1",
|
||||
(slug, name)
|
||||
).fetchone()
|
||||
price = price_row["price_usd"] if price_row else 0.0
|
||||
c.execute("UPDATE cards SET fetch_status='done', filename=?, price_usd=? WHERE id=?",
|
||||
(filename, price, card_id))
|
||||
c.execute("UPDATE cards SET fetch_status='done', filename=?, back_filename=?, price_usd=? WHERE id=?",
|
||||
(filename, bf, row["price_usd"], card_id))
|
||||
db.add_log(slug, f"{label} — COPY")
|
||||
else:
|
||||
db.add_log(slug, f"{label} — FAIL: source missing")
|
||||
with db.conn() as c:
|
||||
c.execute("UPDATE cards SET fetch_status='failed' WHERE id=?", (card_id,))
|
||||
else:
|
||||
try:
|
||||
data, price = await fetch_card_data(client, name, slug)
|
||||
dest.write_bytes(data)
|
||||
with db.conn() as c:
|
||||
c.execute("UPDATE cards SET fetch_status='done', filename=?, price_usd=? WHERE id=?",
|
||||
(filename, price, card_id))
|
||||
db.add_log(slug, f"{label} — OK (${price:.2f})")
|
||||
await asyncio.sleep(0.5)
|
||||
except Exception as e:
|
||||
with db.conn() as c:
|
||||
c.execute("UPDATE cards SET fetch_status='failed' WHERE id=?", (card_id,))
|
||||
db.add_log(slug, f"{label} — FAIL: {e}")
|
||||
db.recalc_done(slug)
|
||||
continue
|
||||
|
||||
try:
|
||||
data = await resolve_card(client, slug, name, scry_url)
|
||||
front_url, back_url = extract_images(data)
|
||||
price = float(data.get("prices", {}).get("usd") or 0)
|
||||
resolved_name = name or data.get("name", "")
|
||||
|
||||
dest.write_bytes(await _fetch_bytes(client, front_url, slug))
|
||||
bf = None
|
||||
if back_url:
|
||||
(deck_dir / back_filename).write_bytes(await _fetch_bytes(client, back_url, slug))
|
||||
bf = back_filename
|
||||
|
||||
with db.conn() as c:
|
||||
c.execute("UPDATE cards SET fetch_status='done', name=?, filename=?, back_filename=?, price_usd=? WHERE id=?",
|
||||
(resolved_name, filename, bf, price, card_id))
|
||||
db.add_log(slug, f"[{card['position']}] {resolved_name} — OK (${price:.2f}){' 2-sided' if back_url else ''}")
|
||||
await asyncio.sleep(0.5)
|
||||
except Exception as e:
|
||||
with db.conn() as c:
|
||||
c.execute("UPDATE cards SET fetch_status='failed' WHERE id=?", (card_id,))
|
||||
db.add_log(slug, f"{label} — FAIL: {e}")
|
||||
|
||||
db.recalc_done(slug)
|
||||
|
||||
all_cards = db.get_cards(slug)
|
||||
if not all_cards:
|
||||
db.update_deck(slug, status="complete", price_usd=0)
|
||||
db.add_log(slug, "Empty bundle — add cards to export.")
|
||||
return
|
||||
|
||||
try:
|
||||
tts = build_tts(slug)
|
||||
(deck_dir / "deck.tts.json").write_text(json.dumps(tts, indent=2))
|
||||
total_price = sum(c["price_usd"] for c in db.get_cards(slug))
|
||||
total_price = sum(c["price_usd"] for c in all_cards)
|
||||
db.update_deck(slug, status="complete", price_usd=round(total_price, 2))
|
||||
db.add_log(slug, f"Complete. Deck value: ${total_price:.2f}")
|
||||
except Exception as e:
|
||||
db.update_deck(slug, status="failed")
|
||||
db.add_log(slug, f"TTS build failed: {e}")
|
||||
db.add_log(slug, f"TTS build failed: {e}")
|
||||
Reference in New Issue
Block a user