Finished Docker API

This commit is contained in:
Frederik Beimgraben 2025-08-31 19:08:33 +02:00
parent f63fbeeffd
commit 2c94e8c1f9
11 changed files with 1157 additions and 1 deletions

46
Dockerfile Normal file
View File

@ -0,0 +1,46 @@
# ---------- Base ----------
FROM python:3.11-slim AS base

ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1

# System deps: tzdata so the TZ variable is honoured, ca-certificates for outbound TLS.
RUN apt-get update && apt-get install -y --no-install-recommends \
        ca-certificates \
        tzdata \
    && rm -rf /var/lib/apt/lists/*

# Unprivileged runtime account with a stable numeric UID/GID so orchestrators
# can verify that the container does not run as root.
RUN groupadd --system --gid 10001 app \
    && useradd --system --uid 10001 --gid app --no-create-home --home-dir /app \
        --shell /usr/sbin/nologin app

WORKDIR /app

# ---------- Dependencies ----------
# Copied on its own so the dependency layer stays cached until requirements.txt changes.
COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r /app/requirements.txt

# ---------- App ----------
# Expected build-context layout:
#   - src/    (all .py modules, incl. service_api.py, pdf_to_struct.py, pdf_filler.py, ...)
#   - assets/ (qsm.pdf, vsm.pdf)
COPY --chown=app:app src/ /app/src/
COPY --chown=app:app assets/ /app/assets/

# The modules import each other by bare name, so src/ must be on the import path.
ENV PYTHONPATH=/app/src

# pdf_filler.py looks for assets next to the module by default; the PDFs live
# in /app/assets, so the template paths are overridden here.
ENV QSM_TEMPLATE=/app/assets/qsm.pdf \
    VSM_TEMPLATE=/app/assets/vsm.pdf

# Master key and DB settings are supplied at runtime (-e / --env-file), never baked in:
#   MASTER_KEY, MYSQL_HOST, MYSQL_PORT, MYSQL_DB, MYSQL_USER, MYSQL_PASSWORD

EXPOSE 8000

# Drop privileges only after every step that needs root has run.
USER app

# The slim image ships neither curl nor wget, so probe with the interpreter itself.
# /openapi.json is served by FastAPI by default; urlopen raises (non-zero exit) on HTTP errors.
HEALTHCHECK --interval=30s --timeout=5s --start-period=15s --retries=3 \
    CMD ["python", "-c", "import urllib.request; urllib.request.urlopen('http://127.0.0.1:8000/openapi.json', timeout=3)"]

# ---------- Run ----------
# Start the FastAPI service. service_api.py must live in src/ and expose the
# application object as "app".
CMD ["uvicorn", "service_api:app", "--host", "0.0.0.0", "--port", "8000"]

BIN
assets/qsm.pdf Normal file

Binary file not shown.

BIN
assets/vgl.pdf Normal file

Binary file not shown.

BIN
assets/vsm.pdf Normal file

Binary file not shown.

81
docker-compose.yml Normal file
View File

@ -0,0 +1,81 @@
version: "3.9"

services:
  db:
    image: mysql:8.0
    container_name: stupa_db
    restart: unless-stopped
    command:
      [
        "mysqld",
        "--character-set-server=utf8mb4",
        "--collation-server=utf8mb4_unicode_ci",
        "--default-authentication-plugin=mysql_native_password",
      ]
    environment:
      MYSQL_DATABASE: ${MYSQL_DB:-stupa}
      MYSQL_USER: ${MYSQL_USER:-stupa}
      MYSQL_PASSWORD: ${MYSQL_PASSWORD:-secret}
      MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD:-rootsecret}
    healthcheck:
      # "$$" defers expansion to the shell inside the container, so the password
      # is read from the container environment and may contain shell-special
      # characters without breaking the command.
      test:
        [
          "CMD-SHELL",
          'mysqladmin ping -h 127.0.0.1 -uroot -p"$$MYSQL_ROOT_PASSWORD" --silent',
        ]
      interval: 10s
      timeout: 5s
      retries: 6
    # NOTE(review): publishes MySQL on every host interface; consider
    # "127.0.0.1:3306:3306" when only local access is needed.
    ports:
      - "3306:3306"
    volumes:
      - db_data:/var/lib/mysql

  api:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: stupa_api
    restart: unless-stopped
    depends_on:
      db:
        condition: service_healthy
    environment:
      # DB
      MYSQL_HOST: db
      MYSQL_PORT: "3306"
      MYSQL_DB: ${MYSQL_DB:-stupa}
      MYSQL_USER: ${MYSQL_USER:-stupa}
      MYSQL_PASSWORD: ${MYSQL_PASSWORD:-secret}
      # Auth / limits
      MASTER_KEY: ${MASTER_KEY:-change_me}
      RATE_IP_PER_MIN: ${RATE_IP_PER_MIN:-60}
      RATE_KEY_PER_MIN: ${RATE_KEY_PER_MIN:-30}
      # PDF templates (shipped inside the image under /app/assets)
      QSM_TEMPLATE: /app/assets/qsm.pdf
      VSM_TEMPLATE: /app/assets/vsm.pdf
      # Optional: time zone
      TZ: ${TZ:-Europe/Berlin}
    ports:
      - "8000:8000"
    # python:3.11-slim ships neither wget nor curl and the service defines no
    # "/" route, so probe FastAPI's built-in /openapi.json with the interpreter.
    healthcheck:
      test:
        - "CMD"
        - "python"
        - "-c"
        - "import urllib.request; urllib.request.urlopen('http://127.0.0.1:8000/openapi.json', timeout=3)"
      interval: 10s
      timeout: 5s
      retries: 6
      start_period: 15s

  adminer:
    image: adminer:4
    container_name: stupa_adminer
    restart: unless-stopped
    depends_on:
      db:
        condition: service_healthy
    environment:
      ADMINER_DEFAULT_SERVER: db
    ports:
      - "8080:8080"

volumes:
  db_data:

19
requirements.txt Normal file
View File

@ -0,0 +1,19 @@
# Core API & HTTP
fastapi>=0.110
uvicorn[standard]>=0.27
# Data parsing / validation
pydantic>=2.6
# PDF handling
PyPDF2>=3.0.1
# DB (MySQL via SQLAlchemy + PyMySQL)
SQLAlchemy>=2.0
PyMySQL>=1.1
# Env handling
python-dotenv>=1.0
# File uploads (FastAPI Form/File)
python-multipart>=0.0.9

68
src/parser_api.py Normal file
View File

@ -0,0 +1,68 @@
# parser_api.py
from __future__ import annotations
import json
from io import BytesIO
from dataclasses import asdict
from typing import Any, Dict, Optional, Union
import PyPDF2
# Importiere DEIN vorhandenes Modul (der Dateiname darunter ist nur ein Beispiel!)
# Passe ggf. den Modulnamen an, in dem dein großes Skript liegt:
import pdf_to_struct as core # enthält: read_pdf_fields, map_form_to_payload, payload_to_model, detect_variant, pdf_to_payload
JsonDict = Dict[str, Any]
def parse_from_pdf_path(pdf_path: str, variant: Optional[str] = None) -> JsonDict:
    """
    Read the PDF at ``pdf_path`` through ``core.pdf_to_payload`` and return the
    resulting dataclass converted to a plain dict.

    ``variant`` is forwarded unchanged to ``core.pdf_to_payload``.
    """
    return asdict(core.pdf_to_payload(pdf_path, variant=variant))
def parse_from_pdf_bytes(pdf_bytes: Union[bytes, bytearray, memoryview],
                         variant: Optional[str] = None) -> JsonDict:
    """
    Parse a PDF held in memory and return the mapped payload as a plain dict.

    The form fields are read with PyPDF2; a field whose metadata is falsy is
    replaced by an empty dict. When ``variant`` is falsy it is determined by
    ``core.detect_variant`` from the extracted fields.
    """
    with BytesIO(pdf_bytes) as stream:
        raw_fields = PyPDF2.PdfReader(stream).get_fields() or {}
        fields: Dict[str, Dict[str, Any]] = {
            name: (meta or {}) for name, meta in raw_fields.items()
        }
    chosen_variant = variant if variant else core.detect_variant(fields)
    payload = core.map_form_to_payload(fields, chosen_variant)
    return asdict(core.payload_to_model(payload))
def parse_from_form_data(form_data: Dict[str, Dict[str, Any]],
                         variant: Optional[str] = None) -> JsonDict:
    """
    Map an already-extracted form-field structure to the payload dict.

    Expected shape (as produced by PyPDF2's ``get_fields()``):
    ``{ field_name: { "/V": <value>, ... }, ... }``

    Raises:
        ValueError: if ``form_data`` is not a dict.
    """
    if isinstance(form_data, dict):
        chosen_variant = variant if variant else core.detect_variant(form_data)
        payload = core.map_form_to_payload(form_data, chosen_variant)
        return asdict(core.payload_to_model(payload))
    raise ValueError("form_data must be a dict in the shape {name: field_meta_dict}.")
# Convenience wrappers: same parsers as above, serialised to pretty-printed JSON text
def parse_from_pdf_path_json(pdf_path: str, variant: Optional[str] = None) -> str:
    """Return ``parse_from_pdf_path`` output as an indented, non-ASCII-preserving JSON string."""
    parsed = parse_from_pdf_path(pdf_path, variant)
    return json.dumps(parsed, ensure_ascii=False, indent=2)
def parse_from_pdf_bytes_json(pdf_bytes: Union[bytes, bytearray, memoryview],
                              variant: Optional[str] = None) -> str:
    """Return ``parse_from_pdf_bytes`` output as an indented, non-ASCII-preserving JSON string."""
    parsed = parse_from_pdf_bytes(pdf_bytes, variant)
    return json.dumps(parsed, ensure_ascii=False, indent=2)
def parse_from_form_data_json(form_data: Dict[str, Dict[str, Any]],
                              variant: Optional[str] = None) -> str:
    """Return ``parse_from_form_data`` output as an indented, non-ASCII-preserving JSON string."""
    parsed = parse_from_form_data(form_data, variant)
    return json.dumps(parsed, ensure_ascii=False, indent=2)

331
src/pdf_filler.py Normal file
View File

@ -0,0 +1,331 @@
# pdf_filler.py
from __future__ import annotations
import io
import os
import re
from typing import Any, Dict, Optional
import PyPDF2
from PyPDF2.generic import (
NameObject,
BooleanObject,
IndirectObject,
ArrayObject,
DictionaryObject,
createStringObject,
)
# dein Modul mit Mapping; ggf. Namen anpassen:
import pdf_to_struct as core # _merge_mapping
# -----------------------------
# Assets (relativ zum Modul)
# -----------------------------
_THIS_DIR = os.path.dirname(os.path.abspath(__file__))
ASSET_QSM_DEFAULT = os.path.join(_THIS_DIR, "assets", "qsm.pdf")
ASSET_VSM_DEFAULT = os.path.join(_THIS_DIR, "assets", "vsm.pdf")
ASSET_QSM = os.environ.get("QSM_TEMPLATE", ASSET_QSM_DEFAULT)
ASSET_VSM = os.environ.get("VSM_TEMPLATE", ASSET_VSM_DEFAULT)
def _get_template(variant: str) -> str:
v = (variant or "").upper()
if v == "QSM":
return ASSET_QSM
if v == "VSM":
return ASSET_VSM
raise ValueError("variant must be 'QSM' or 'VSM'")
# -----------------------------
# Helfer
# -----------------------------
_WILDCARD_RE = re.compile(
r"^(?P<prefix>.*)\{(?P<var>[a-zA-Z]);(?P<start>\d+):(?P<end>\d+)\}(?P<suffix>.*)$"
)
def _expand_wildcard_key(form_key: str):
m = _WILDCARD_RE.match(form_key)
if not m:
return None
return (
m.group("prefix"),
m.group("var"),
int(m.group("start")),
int(m.group("end")),
m.group("suffix"),
)
def _fmt_de_amount(v: Any) -> str:
try:
f = float(v)
except Exception:
return "" if v is None else str(v)
return f"{f:.2f}".replace(".", ",")
def _to_name(value: str) -> NameObject:
    """Wrap ``value`` as a PDF name object by prefixing it with a slash."""
    pdf_name = "/" + value
    return NameObject(pdf_name)
def _to_str(obj) -> Optional[str]:
if obj is None:
return None
s = str(obj)
return s[1:] if s.startswith("/") else s
def _flatten(payload: Dict[str, Any]) -> Dict[str, Any]:
flat: Dict[str, Any] = {}
def rec(prefix: str, obj: Any):
if isinstance(obj, dict):
for k, v in obj.items():
rec(f"{prefix}.{k}" if prefix else k, v)
elif isinstance(obj, list):
for i, v in enumerate(obj):
rec(f"{prefix}[{i}]", v)
else:
flat[prefix] = obj
rec("", payload)
return flat
def _collect_btn_widgets(reader: PyPDF2.PdfReader):
    """
    Walk every page annotation and index the /Btn widgets by field name.

    Returns a pair ``(widgets, exports)``: ``widgets`` maps a field name to the
    list of its widget dictionaries; ``exports`` maps a field name to the set of
    non-Off state names found under the widgets' /AP /N dictionaries.
    Annotations that raise while being inspected are skipped.
    """
    widgets: dict[str, list[DictionaryObject]] = {}
    exports: dict[str, set[str]] = {}

    for page in reader.pages:
        page_annots = page.get("/Annots")
        if isinstance(page_annots, IndirectObject):
            try:
                page_annots = page_annots.get_object()
            except Exception:
                page_annots = None
        if not (page_annots and isinstance(page_annots, (list, ArrayObject))):
            continue

        for entry in page_annots:
            try:
                widget = entry.get_object() if isinstance(entry, IndirectObject) else entry
                if not isinstance(widget, DictionaryObject):
                    continue
                if _to_str(widget.get("/FT")) != "Btn":
                    continue
                field_name = _to_str(widget.get("/T"))
                if not field_name:
                    continue
                # Register the widget before looking at its appearance states.
                widgets.setdefault(field_name, []).append(widget)

                appearance = widget.get("/AP")
                if not (appearance and isinstance(appearance, DictionaryObject)):
                    continue
                normal = appearance.get("/N")
                if isinstance(normal, IndirectObject):
                    normal = normal.get_object()
                if not (normal and isinstance(normal, DictionaryObject)):
                    continue
                for state_key in normal.keys():
                    state = str(state_key)
                    if state != "/Off":
                        exports.setdefault(field_name, set()).add(
                            state[1:] if state.startswith("/") else state
                        )
            except Exception:
                continue
    return widgets, exports
# -----------------------------
# Kern: PDF füllen (direktes Widget-Update)
# -----------------------------
def _page_annotations(page) -> list:
    """Return the page's /Annots array as a list, or [] when absent or unreadable."""
    annots = page.get("/Annots")
    if isinstance(annots, IndirectObject):
        try:
            annots = annots.get_object()
        except Exception:
            return []
    if not annots or not isinstance(annots, (list, ArrayObject)):
        return []
    return annots


def _widget_on_state(annot: DictionaryObject) -> Optional[str]:
    """Return the first non-Off state name of the widget's /AP /N dictionary (without '/')."""
    ap = annot.get("/AP")
    if isinstance(ap, IndirectObject):
        ap = ap.get_object()
    if not ap or not isinstance(ap, DictionaryObject):
        return None
    normal = ap.get("/N")
    if isinstance(normal, IndirectObject):
        normal = normal.get_object()
    if not normal or not isinstance(normal, DictionaryObject):
        return None
    for key in normal.keys():
        state = str(key)
        if state != "/Off":
            return state[1:] if state.startswith("/") else state
    return None


def _set_parent_value(annot: DictionaryObject, value) -> None:
    """Mirror ``value`` into /V of the widget's /Parent field, if there is one (best effort)."""
    parent = annot.get("/Parent")
    if not isinstance(parent, IndirectObject):
        return
    try:
        target = parent.get_object()
        if isinstance(target, DictionaryObject):
            target.update({NameObject("/V"): value})
    except Exception:
        # A broken parent reference must not abort filling the remaining fields.
        pass


def _render_text_value(spec: Dict[str, Any], target_key: str, val: Any) -> str:
    """Format a non-None payload value for a text/choice field (amounts get a decimal comma)."""
    is_amount = "amountEur" in target_key or "requestedAmountEur" in target_key
    if spec.get("type") is float and is_amount:
        return _fmt_de_amount(val)
    return str(val)


def fill_pdf(payload: Dict[str, Any], variant: str, out_path: Optional[str] = None) -> bytes:
    """
    Fill the template of ``variant`` with ``payload`` and return the PDF bytes.

    Args:
        payload: nested dict; it is flattened with ``_flatten`` and looked up via
            the "target-key" entries of ``core._merge_mapping(variant)``.
        variant: 'QSM' or 'VSM' (see ``_get_template``).
        out_path: when given, the bytes are additionally written to this path.

    Raises:
        ValueError: unknown variant.
        FileNotFoundError: the template file does not exist.
    """
    template_path = _get_template(variant)
    if not os.path.isfile(template_path):
        raise FileNotFoundError(f"Template not found: {template_path}")

    # Load the template fully into memory so that objects the reader resolves
    # lazily never depend on a file handle that has already been closed.
    with open(template_path, "rb") as f:
        template_bytes = f.read()
    reader = PyPDF2.PdfReader(io.BytesIO(template_bytes))
    writer = PyPDF2.PdfWriter()

    for page in reader.pages:
        writer.add_page(page)

    # Carry the AcroForm over and ask viewers to regenerate field appearances.
    try:
        # Item access dereferences indirect objects; dict.get() may hand back the
        # raw reference, on which the membership test below would fail.
        root = reader.trailer["/Root"]
        if root and "/AcroForm" in root:
            writer._root_object.update({NameObject("/AcroForm"): root["/AcroForm"]})
            try:
                writer._root_object["/AcroForm"].update(
                    {NameObject("/NeedAppearances"): BooleanObject(True)}
                )
            except Exception:
                pass
        else:
            writer._root_object.update({
                NameObject("/AcroForm"): PyPDF2.generic.DictionaryObject({
                    NameObject("/NeedAppearances"): BooleanObject(True)
                })
            })
    except Exception:
        # Best effort: a template without a usable catalog is still written out.
        pass

    mapping = core._merge_mapping(variant)
    flat = _flatten(payload)

    # 1) Resolve the mapping into concrete field-name -> value updates.
    text_updates: Dict[str, str] = {}   # text/choice fields (Tx/Ch)
    btn_updates: Dict[str, str] = {}    # button fields: 'Off' or state name without '/'
    bool_on_names: set = set()          # boolean buttons that must be switched on

    btn_widgets_by_name, export_values_by_name = _collect_btn_widgets(reader)
    btn_names = set(btn_widgets_by_name)

    for form_key, spec in mapping.items():
        wildcard = _expand_wildcard_key(form_key)
        if not wildcard:
            tkey = spec["target-key"]
            val = flat.get(tkey)
            if val is None:
                continue
            if form_key not in btn_names:
                text_updates[form_key] = _render_text_value(spec, tkey, val)
            elif spec.get("type") is bool:
                if bool(val):
                    btn_updates[form_key] = "Yes"
                    bool_on_names.add(form_key)
                else:
                    btn_updates[form_key] = "Off"
            elif spec.get("type") == "enum":
                choice = str(val)
                known = export_values_by_name.get(form_key)
                # Accept the value only if the template offers it (when states are known).
                btn_updates[form_key] = choice if (not known or choice in known) else "Off"
            else:
                btn_updates[form_key] = "Off"
            continue

        # Wildcard keys: PDF field numbers are 1-based, payload list indices 0-based.
        prefix, var, start, end, suffix = wildcard
        tkey_tpl: str = spec["target-key"]
        for number in range(start, end + 1):
            pdf_key = f"{prefix}{number}{suffix}"
            idx0 = number - 1
            tkey_concrete = (
                tkey_tpl.replace(f"[{{{var}}}]", f"[{idx0}]")
                        .replace(f"{{{var}}}", str(idx0))
            )
            val = flat.get(tkey_concrete)
            if val is None:
                continue
            text_updates[pdf_key] = _render_text_value(spec, tkey_concrete, val)

    # 2) Apply the updates directly to the widget annotations.
    for page in writer.pages:
        for annot_ref in _page_annotations(page):
            try:
                annot = annot_ref.get_object() if isinstance(annot_ref, IndirectObject) else annot_ref
                if not isinstance(annot, DictionaryObject):
                    continue
                name = _to_str(annot.get("/T"))
                if not name:
                    continue
                ft = _to_str(annot.get("/FT"))

                if name in text_updates and ft in (None, "Tx", "Ch"):
                    value = text_updates[name]
                    annot.update({NameObject("/V"): createStringObject(value)})
                    _set_parent_value(annot, createStringObject(value))
                    continue

                if name in btn_updates and ft == "Btn":
                    desired = btn_updates[name]
                    widget_on = _widget_on_state(annot)
                    # The on-state name of a checkbox is template-defined ("Yes" is
                    # only a convention), so a true boolean selects whatever
                    # on-state this widget declares.
                    if name in bool_on_names and widget_on:
                        desired = widget_on

                    if desired != "Off" and widget_on and desired == widget_on:
                        annot.update({NameObject("/AS"): _to_name(desired)})
                        annot.update({NameObject("/V"): _to_name(desired)})
                        _set_parent_value(annot, _to_name(desired))
                    else:
                        annot.update({NameObject("/AS"): NameObject("/Off")})
                        annot.update({NameObject("/V"): NameObject("/Off")})
                        # Reset the shared parent value only when the whole field is
                        # meant to be off; otherwise a non-matching sibling widget
                        # would overwrite the value set by the matching one.
                        if desired == "Off":
                            _set_parent_value(annot, NameObject("/Off"))
            except Exception:
                # Best effort per widget: one malformed annotation must not
                # prevent the rest of the form from being filled.
                continue

    # 3) Serialise.
    buffer = io.BytesIO()
    writer.write(buffer)
    data = buffer.getvalue()
    if out_path:
        with open(out_path, "wb") as out:
            out.write(data)
    return data
def save_pdf(payload: Dict[str, Any], variant: str, out_path: str) -> None:
    """Fill the template for ``variant`` and write the result to ``out_path``; returns nothing."""
    fill_pdf(payload, variant, out_path=out_path)

View File

@ -467,7 +467,8 @@ def payload_to_model(payload: Dict[str, Any]) -> RootPayload:
def read_pdf_fields(pdf_file: str) -> Dict[str, Dict[str, Any]]:
with open(pdf_file, "rb") as f:
reader = PyPDF2.PdfReader(f)
reader = PyPDF2.PdfReader(f, strict=True)
fields = reader.get_fields() or {}
# ensure dict[str, dict] and keep only keys we care about
return {k: (v or {}) for k, v in fields.items()}

610
src/service_api.py Normal file
View File

@ -0,0 +1,610 @@
# service_api.py
from __future__ import annotations
"""
FastAPI-Service für STUPA-PDF-Workflows.
Voraussetzung: vorhandene Module
- pdf_to_struct (stellt u.a. bereit: pdf_to_payload, map_form_to_payload, payload_to_model)
- pdf_filler (stellt u.a. bereit: fill_pdf)
.env (Beispiel):
MYSQL_HOST=127.0.0.1
MYSQL_PORT=3306
MYSQL_DB=stupa
MYSQL_USER=stupa
MYSQL_PASSWORD=secret
MASTER_KEY=supersecret_master
RATE_IP_PER_MIN=60
RATE_KEY_PER_MIN=30
QSM_TEMPLATE=assets/qsm.pdf # optional (falls abweichend)
VSM_TEMPLATE=assets/vsm.pdf
"""
import io
import os
import time
import json
import base64
import secrets
import hashlib
import tempfile
from datetime import datetime
from typing import Any, Dict, Optional, List
from dotenv import load_dotenv
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Depends, Query, Body, Header, Response
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel, Field
from sqlalchemy import (
create_engine, Column, Integer, String, Text, DateTime, JSON as SAJSON,
select, func, UniqueConstraint
)
from sqlalchemy.orm import declarative_base, sessionmaker, Session
from sqlalchemy.exc import IntegrityError
from sqlalchemy import text as sql_text
import PyPDF2
from PyPDF2.errors import PdfReadError
# Eigene Module (aus deinem Projekt):
import pdf_to_struct as core # nutzt: pdf_to_payload, map_form_to_payload, payload_to_model, detect_variant
from pdf_filler import fill_pdf
# -------------------------------------------------------------
# ENV & DB
# -------------------------------------------------------------
load_dotenv()

MYSQL_HOST = os.getenv("MYSQL_HOST", "127.0.0.1")
MYSQL_PORT = int(os.getenv("MYSQL_PORT", "3306"))
MYSQL_DB = os.getenv("MYSQL_DB", "stupa")
MYSQL_USER = os.getenv("MYSQL_USER", "stupa")
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD", "secret")

# An empty MASTER_KEY disables master access (see _auth_from_request).
MASTER_KEY = os.getenv("MASTER_KEY", "")
RATE_IP_PER_MIN = int(os.getenv("RATE_IP_PER_MIN", "60"))
RATE_KEY_PER_MIN = int(os.getenv("RATE_KEY_PER_MIN", "30"))

# User name and password are percent-encoded so that characters such as
# "@", "/" or ":" in the credentials cannot corrupt the connection URL.
from urllib.parse import quote_plus as _quote_plus

DB_DSN = (
    f"mysql+pymysql://{_quote_plus(MYSQL_USER)}:{_quote_plus(MYSQL_PASSWORD)}"
    f"@{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DB}?charset=utf8mb4"
)

engine = create_engine(DB_DSN, pool_pre_ping=True, future=True)
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, future=True)
Base = declarative_base()
# -------------------------------------------------------------
# DB-Modelle
# -------------------------------------------------------------
class Counter(Base):
    """Per-year running number used by ``_alloc_next_id`` to build application IDs."""

    __tablename__ = "counters"

    # Full four-digit year (e.g. 2025); one row per year.
    year = Column(Integer, primary_key=True)
    # Last sequence number handed out for that year.
    seq = Column(Integer, nullable=False, default=0)
class Application(Base):
    """A stored application together with its hashed access key."""

    __tablename__ = "applications"
    __table_args__ = (
        UniqueConstraint("pa_id", name="uq_pa_id"),
    )

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Public identifier in the form YY-NNNN.
    pa_id = Column(String(16), unique=True, index=True, nullable=False)
    # Salt (hex) and PBKDF2 hash (hex) of the application key; the key itself is never stored.
    pa_key_salt = Column(String(64), nullable=False)
    pa_key_hash = Column(String(128), nullable=False)
    # QSM / VSM / COMMON
    variant = Column(String(8), nullable=False)
    status = Column(String(64), nullable=False, default="new")
    # Persisted payload (without the plaintext key).
    payload_json = Column(SAJSON, nullable=False)
    # Optional raw form JSON, kept for traceability.
    raw_form_json = Column(SAJSON, nullable=True)
    created_at = Column(DateTime, nullable=False, default=datetime.utcnow)
    updated_at = Column(DateTime, nullable=False, default=datetime.utcnow)
def init_db():
    """Create every table declared on ``Base`` that does not exist yet."""
    metadata = Base.metadata
    metadata.create_all(bind=engine)
# -------------------------------------------------------------
# Utils: Key-Hashing, ID-Vergabe, Rate-Limiting
# -------------------------------------------------------------
def _gen_pa_key() -> str:
# URL-sicher, ~32 Zeichen
return secrets.token_urlsafe(24)
def _hash_key(key: str, salt: Optional[str] = None) -> (str, str):
if not salt:
salt = secrets.token_hex(16) # 32 hex chars
# PBKDF2-HMAC-SHA256
dk = hashlib.pbkdf2_hmac("sha256", key.encode("utf-8"), bytes.fromhex(salt), 310000)
return salt, dk.hex()
def _verify_key(key: str, salt_hex: str, hash_hex: str) -> bool:
test = hashlib.pbkdf2_hmac("sha256", key.encode("utf-8"), bytes.fromhex(salt_hex), 310000).hex()
# timing-safe compare
return secrets.compare_digest(test, hash_hex)
def _alloc_next_id(db: Session) -> str:
    """
    Reserve the next application ID for the current UTC year.

    The year's counter row is selected FOR UPDATE (and created with seq=0 when
    missing), incremented and flushed. Returns "YY-NNNN", e.g. "25-0001".
    """
    current_year = datetime.utcnow().year

    locked_counter = select(Counter).where(Counter.year == current_year).with_for_update()
    counter = db.execute(locked_counter).scalar_one_or_none()
    if not counter:
        counter = Counter(year=current_year, seq=0)
        db.add(counter)
        db.flush()
        db.refresh(counter)

    counter.seq += 1
    db.flush()
    db.refresh(counter)

    short_year = current_year % 100
    return f"{short_year:02d}-{counter.seq:04d}"
# sehr einfacher In-Memory-Rate-Limiter (pro Prozess)
# production: besser Redis verwenden
_RATE_BUCKETS: dict[str, List[float]] = {}
def _rate_limit(key: str, limit: int, window_sec: int = 60):
now = time.time()
bucket = _RATE_BUCKETS.setdefault(key, [])
# alte Einträge entfernen
while bucket and bucket[0] <= now - window_sec:
bucket.pop(0)
if len(bucket) >= limit:
raise HTTPException(status_code=429, detail="Rate limit exceeded")
bucket.append(now)
# -------------------------------------------------------------
# Schemas (Pydantic)
# -------------------------------------------------------------
class CreateResponse(BaseModel):
    """JSON answer of the create endpoint; the only place the plaintext key is returned."""

    pa_id: str
    pa_key: str
    variant: str
    status: str = "new"


class UpdateResponse(BaseModel):
    """JSON answer of the update endpoint."""

    pa_id: str
    variant: str
    status: str


class SetStatusRequest(BaseModel):
    """Request body for changing an application's status (1 to 64 characters)."""

    status: str = Field(..., min_length=1, max_length=64)


class SearchQuery(BaseModel):
    """Search filter with paging defaults."""

    q: Optional[str] = None
    status: Optional[str] = None
    variant: Optional[str] = None
    limit: int = 50
    offset: int = 0
# -------------------------------------------------------------
# Auth-Helpers
# -------------------------------------------------------------
def _auth_from_request(
    db: Session,
    pa_id: Optional[str],
    key_header: Optional[str],
    key_query: Optional[str],
    master_header: Optional[str],
) -> dict:
    """
    Authenticate a request and report the scope it was granted.

    Returns:
        ``{"scope": "master"}`` for a valid master key,
        ``{"scope": "public"}`` when neither a master key nor ``pa_id`` is given,
        ``{"scope": "app", "app": <Application>}`` for a valid application key.
        Callers that require a particular scope must check the result.

    Raises:
        HTTPException: 401 missing key, 403 wrong key, 404 unknown ``pa_id``,
            429 when the attempt budget is exhausted.
    """
    if master_header:
        # Compare as bytes in constant time; str arguments to compare_digest
        # must be ASCII, which a header value is not guaranteed to be.
        master_ok = bool(MASTER_KEY) and secrets.compare_digest(
            master_header.encode("utf-8"), MASTER_KEY.encode("utf-8")
        )
        # Fixed bucket names: keying the bucket on the supplied value would hand
        # every new guess a fresh budget and never throttle brute forcing.
        _rate_limit("MASTER:ok" if master_ok else "MASTER:invalid", RATE_KEY_PER_MIN)
        if not master_ok:
            raise HTTPException(status_code=403, detail="Invalid master key")
        return {"scope": "master"}

    supplied = key_header or key_query
    if pa_id is None:
        # Nothing to authenticate against (e.g. create without an ID).
        return {"scope": "public"}
    if not supplied:
        raise HTTPException(status_code=401, detail="Missing key")

    _rate_limit(f"APPKEY:{pa_id}", RATE_KEY_PER_MIN)
    row = db.execute(select(Application).where(Application.pa_id == pa_id)).scalar_one_or_none()
    if not row:
        raise HTTPException(status_code=404, detail="Application not found")
    if not _verify_key(supplied, row.pa_key_salt, row.pa_key_hash):
        raise HTTPException(status_code=403, detail="Invalid application key")
    return {"scope": "app", "app": row}
# -------------------------------------------------------------
# FastAPI Setup
# -------------------------------------------------------------
app = FastAPI(title="STUPA PDF API", version="1.0.0")


@app.on_event("startup")
def _startup():
    """Startup hook: make sure the database tables exist before serving requests."""
    init_db()
# Global (very simple) per-request IP rate limit
def rate_limit_ip(ip: str):
    """
    Apply the per-IP rate limit.

    ``ip`` is the raw X-Forwarded-For header value. The header may carry a
    comma-separated proxy chain ("client, proxy1, proxy2"); only the first entry
    is used as bucket key so that one client does not get a separate budget per
    chain. An empty value is counted under "unknown".

    NOTE(review): the header is client-controlled unless a trusted proxy
    overwrites it; confirm the deployment does.
    """
    client = (ip or "").split(",", 1)[0].strip() or "unknown"
    _rate_limit(f"IP:{client}", RATE_IP_PER_MIN)
def get_db():
    """FastAPI dependency: yield a new session and always close it afterwards."""
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
# -------------------------------------------------------------
# Hilfen: Payload-Erzeugung aus Upload
# -------------------------------------------------------------
def _payload_from_pdf_bytes(tmp_path: str, variant: Optional[str]) -> Dict[str, Any]:
    """
    Parse the PDF stored at ``tmp_path`` and return the payload as a nested dict.

    ``core.pdf_to_payload`` returns a dataclass; it is converted with ``asdict``.

    Raises:
        HTTPException: 400 when PyPDF2 reports a read error.
    """
    from dataclasses import asdict

    try:
        parsed = core.pdf_to_payload(tmp_path, variant=variant)
        return asdict(parsed)
    except PdfReadError as e:
        raise HTTPException(status_code=400, detail=f"PDF parse error: {e}")
def _payload_from_form_json(form_json: Dict[str, Any], variant: Optional[str]) -> Dict[str, Any]:
    """
    Map raw form JSON to the payload dict.

    The form is mapped with ``core.map_form_to_payload`` (variant "AUTO" when
    none is given), turned into the model and converted back with ``asdict``.
    """
    from dataclasses import asdict

    effective_variant = variant if variant else "AUTO"
    model = core.payload_to_model(core.map_form_to_payload(form_json, effective_variant))
    return asdict(model)
def _inject_meta_for_render(payload: Dict[str, Any], pa_id: str, pa_key: Optional[str]) -> Dict[str, Any]:
# Wir injizieren Key/ID NUR für die PDF-Generierung in payload['pa'].*,
# speichern aber den Key nicht im DB-Payload.
p2 = json.loads(json.dumps(payload)) # deep copy
p2.setdefault("pa", {}).setdefault("meta", {})
p2["pa"]["meta"]["id"] = pa_id
if pa_key is not None:
p2["pa"]["meta"]["key"] = pa_key
return p2
def _sanitize_payload_for_db(payload: Dict[str, Any]) -> Dict[str, Any]:
# Key aus persistentem Payload entfernen/neutralisieren
p2 = json.loads(json.dumps(payload))
meta = p2.setdefault("pa", {}).setdefault("meta", {})
if "key" in meta:
meta["key"] = None
return p2
# -------------------------------------------------------------
# Endpunkte
# -------------------------------------------------------------
@app.post("/applications", response_model=CreateResponse, responses={200: {"content": {"application/pdf": {}}}})
def create_application(
    response: Response,
    variant: Optional[str] = Query(None, description="QSM|VSM|COMMON|AUTO"),
    return_format: str = Query("pdf", pattern="^(pdf|json)$"),
    pdf: Optional[UploadFile] = File(None, description="PDF Upload (Alternative zu form_json)"),
    form_json_b64: Optional[str] = Form(None, description="Base64-kodiertes Roh-Form-JSON (Alternative zu Datei)"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """
    Create a new application from an uploaded PDF or base64-encoded form JSON.

    Allocates an ID and a fresh key, stores the payload without the key and
    returns either the filled PDF (ID and key in the X-PA-ID / X-PA-KEY response
    headers) or a JSON ``CreateResponse``.

    Raises:
        HTTPException: 400 for missing/invalid input or a payload that already
            carries an ID, 409 on an ID allocation conflict, 429 when rate limited.
    """
    rate_limit_ip(x_forwarded_for or "")

    # Obtain the payload from exactly one of the two inputs.
    payload: Dict[str, Any]
    raw_form: Optional[Dict[str, Any]] = None
    if pdf:
        # The parser works on a path, so the upload is spooled to a temp file.
        with tempfile.NamedTemporaryFile(delete=True, suffix=".pdf") as tf:
            tf.write(pdf.file.read())
            tf.flush()
            payload = _payload_from_pdf_bytes(tf.name, variant)
    elif form_json_b64:
        try:
            raw = base64.b64decode(form_json_b64)
            raw_form = json.loads(raw.decode("utf-8"))
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Invalid form_json_b64: {e}")
        if not isinstance(raw_form, dict):
            raise HTTPException(status_code=400, detail="Invalid form_json_b64: expected a JSON object")
        payload = _payload_from_form_json(raw_form, variant or "AUTO")
    else:
        raise HTTPException(status_code=400, detail="Provide either PDF file or form_json_b64")

    # Create is only allowed for forms that do not carry an ID yet.
    pa_section = payload.get("pa") or {}
    pa_meta = pa_section.get("meta") or {}
    if pa_meta.get("id"):
        raise HTTPException(status_code=400, detail="pa-id already set; use update endpoint")

    try:
        with db.begin():
            pa_id = _alloc_next_id(db)
            pa_key_plain = _gen_pa_key()
            salt, key_hash = _hash_key(pa_key_plain)

            # Determine the variant; "AUTO" is stored as COMMON.
            detected = variant or core.detect_variant(pa_section) or "COMMON"
            detected = detected.upper()
            if detected == "AUTO":
                detected = "COMMON"

            # Only the rendered PDF contains the plaintext key.
            render = _inject_meta_for_render(payload, pa_id, pa_key_plain)
            # COMMON has no template of its own and is rendered with the VSM one.
            pdf_bytes = fill_pdf(render, "QSM" if detected == "QSM" else "VSM")

            db.add(Application(
                pa_id=pa_id,
                pa_key_salt=salt,
                pa_key_hash=key_hash,
                variant=detected,
                status="new",
                payload_json=_sanitize_payload_for_db(payload),
                raw_form_json=raw_form,
            ))
    except IntegrityError:
        # Rare race on ID allocation; the client may simply retry.
        raise HTTPException(status_code=409, detail="ID allocation conflict; retry")

    if return_format == "json":
        return CreateResponse(pa_id=pa_id, pa_key=pa_key_plain, variant=detected, status="new")

    # Headers set on the injected ``response`` are discarded when a Response
    # object is returned directly, so they are attached to the returned one.
    return StreamingResponse(
        io.BytesIO(pdf_bytes),
        media_type="application/pdf",
        headers={"X-PA-ID": pa_id, "X-PA-KEY": pa_key_plain},
    )
@app.put("/applications/{pa_id}", response_model=UpdateResponse, responses={200: {"content": {"application/pdf": {}}}})
def update_application(
    pa_id: str,
    response: Response,
    return_format: str = Query("pdf", pattern="^(pdf|json)$"),
    variant: Optional[str] = Query(None),
    pdf: Optional[UploadFile] = File(None),
    form_json_b64: Optional[str] = Form(None),
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """
    Replace the stored payload of an existing application.

    Authorised by the application key or the master key. Returns the re-rendered
    PDF (without the key) or a JSON ``UpdateResponse``.

    Raises:
        HTTPException: 400 for missing/invalid input, 401/403 on auth failure,
            404 for an unknown ``pa_id``, 429 when rate limited.
    """
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key, None, x_master_key)

    # Master scope carries no row, so it has to be loaded here.
    app_row: Optional[Application] = auth.get("app")
    if app_row is None and auth["scope"] == "master":
        app_row = db.execute(select(Application).where(Application.pa_id == pa_id)).scalar_one_or_none()
    if app_row is None:
        raise HTTPException(status_code=404, detail="Application not found")

    # Obtain the payload from exactly one of the two inputs.
    payload: Dict[str, Any]
    raw_form: Optional[Dict[str, Any]] = None
    if pdf:
        with tempfile.NamedTemporaryFile(delete=True, suffix=".pdf") as tf:
            tf.write(pdf.file.read())
            tf.flush()
            payload = _payload_from_pdf_bytes(tf.name, variant or app_row.variant)
    elif form_json_b64:
        try:
            raw = base64.b64decode(form_json_b64)
            raw_form = json.loads(raw.decode("utf-8"))
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Invalid form_json_b64: {e}")
        if not isinstance(raw_form, dict):
            raise HTTPException(status_code=400, detail="Invalid form_json_b64: expected a JSON object")
        payload = _payload_from_form_json(raw_form, variant or app_row.variant)
    else:
        raise HTTPException(status_code=400, detail="Provide either PDF file or form_json_b64")

    # Always render with the existing ID; the key is neither re-issued nor stored.
    render = _inject_meta_for_render(payload, app_row.pa_id, None)
    store_payload = _sanitize_payload_for_db(payload)

    chosen_variant = (variant or app_row.variant).upper()
    pdf_bytes = fill_pdf(render, "QSM" if chosen_variant == "QSM" else "VSM")

    # The session already began a transaction with the lookup above, so an
    # explicit db.begin() would raise; mutate and commit instead.
    app_row.variant = chosen_variant
    app_row.updated_at = datetime.utcnow()
    app_row.payload_json = store_payload
    if raw_form is not None:
        app_row.raw_form_json = raw_form
    db.add(app_row)
    # Read the values for the answer before commit expires the instance.
    result_id, result_variant, result_status = app_row.pa_id, app_row.variant, app_row.status
    try:
        db.commit()
    except Exception:
        db.rollback()
        raise

    if return_format == "json":
        return UpdateResponse(pa_id=result_id, variant=result_variant, status=result_status)

    # Headers must be attached to the returned Response object; values set on
    # the injected ``response`` are dropped in that case.
    return StreamingResponse(
        io.BytesIO(pdf_bytes),
        media_type="application/pdf",
        headers={"X-PA-ID": result_id},
    )
def _resolve_app_row(db: Session, pa_id: str, auth: dict) -> Application:
    """Return the row granted by ``auth``, loading it by ``pa_id`` for master scope; 404 if absent."""
    app_row = auth.get("app")
    if app_row is None and auth["scope"] == "master":
        app_row = db.execute(select(Application).where(Application.pa_id == pa_id)).scalar_one_or_none()
    if app_row is None:
        raise HTTPException(status_code=404, detail="Application not found")
    return app_row


def _commit_or_rollback(db: Session) -> None:
    """Commit the session's current transaction, rolling back before re-raising on failure."""
    try:
        db.commit()
    except Exception:
        db.rollback()
        raise


# Registered before "/applications/{pa_id}": routes are matched in registration
# order, so the parameterised route would otherwise capture "/applications/search".
@app.get("/applications/search")
def search_applications(
    q: Optional[str] = Query(None, description="Volltext über payload_json (einfach)"),
    status: Optional[str] = Query(None),
    variant: Optional[str] = Query(None),
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """
    Search applications (master key required), newest first.

    ``q`` is matched as a substring against string values inside
    ``payload_json``; ``status`` and ``variant`` are exact filters.

    Raises:
        HTTPException: 401 without a master key, 403 for a wrong one,
            429 when rate limited.
    """
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, None, None, None, x_master_key)
    # Without a master header the helper reports "public"; that must not grant access.
    if auth["scope"] != "master":
        raise HTTPException(status_code=401, detail="Master key required")

    # Very simple search via MySQL JSON_SEARCH; migrate to full-text search for production.
    base_sql = "SELECT pa_id, variant, status, created_at, updated_at FROM applications WHERE 1=1"
    params: Dict[str, Any] = {}
    if status:
        base_sql += " AND status=:status"
        params["status"] = status
    if variant:
        base_sql += " AND variant=:variant"
        params["variant"] = variant.upper()
    if q:
        base_sql += " AND JSON_SEARCH(JSON_EXTRACT(payload_json, '$'), 'all', :q) IS NOT NULL"
        params["q"] = f"%{q}%"
    base_sql += " ORDER BY created_at DESC LIMIT :limit OFFSET :offset"
    params["limit"] = limit
    params["offset"] = offset

    rows = db.execute(sql_text(base_sql), params).all()
    return [
        {"pa_id": r[0], "variant": r[1], "status": r[2],
         "created_at": r[3].isoformat(), "updated_at": r[4].isoformat()}
        for r in rows
    ]


@app.get("/applications/{pa_id}")
def get_application(
    pa_id: str,
    format: str = Query("json", pattern="^(json|pdf)$"),
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    key: Optional[str] = Query(None, description="Alternative zum Header für den App-Key"),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """
    Return one application as JSON or as a freshly rendered PDF (without the key).

    Authorised by the application key (header or ``key`` query parameter) or the
    master key.
    """
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key or key, None, x_master_key)
    app_row = _resolve_app_row(db, pa_id, auth)

    if format == "pdf":
        render = _inject_meta_for_render(app_row.payload_json, app_row.pa_id, None)
        pdf_bytes = fill_pdf(render, "QSM" if app_row.variant == "QSM" else "VSM")
        return StreamingResponse(io.BytesIO(pdf_bytes), media_type="application/pdf")

    return {
        "pa_id": app_row.pa_id,
        "variant": app_row.variant,
        "status": app_row.status,
        "payload": app_row.payload_json,
        "created_at": app_row.created_at.isoformat(),
        "updated_at": app_row.updated_at.isoformat(),
    }


@app.get("/applications")
def list_applications(
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
    status: Optional[str] = Query(None),
    variant: Optional[str] = Query(None),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    pa_id: Optional[str] = Query(None, description="Mit Key: nur diesen Antrag anzeigen"),
    key: Optional[str] = Query(None),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """
    List applications.

    With the master key: all applications, optionally filtered by ``status`` /
    ``variant`` and paged. Without it: only the application identified by
    ``pa_id`` plus its key.
    """
    rate_limit_ip(x_forwarded_for or "")

    if x_master_key:
        _auth_from_request(db, None, None, None, x_master_key)
        query = select(Application).order_by(Application.created_at.desc())
        if status:
            query = query.where(Application.status == status)
        if variant:
            query = query.where(Application.variant == variant.upper())
        query = query.limit(limit).offset(offset)
        rows = db.execute(query).scalars().all()
        return [
            {"pa_id": r.pa_id, "variant": r.variant, "status": r.status,
             "created_at": r.created_at.isoformat(), "updated_at": r.updated_at.isoformat()}
            for r in rows
        ]

    # Without master key: own application only (pa_id + key required).
    if not pa_id:
        raise HTTPException(status_code=400, detail="pa_id required without master key")
    auth = _auth_from_request(db, pa_id, x_pa_key or key, None, None)
    app_row: Application = auth.get("app")
    if not app_row:
        raise HTTPException(status_code=404, detail="Application not found")
    return [{
        "pa_id": app_row.pa_id, "variant": app_row.variant, "status": app_row.status,
        "created_at": app_row.created_at.isoformat(), "updated_at": app_row.updated_at.isoformat()
    }]


@app.post("/applications/{pa_id}/status")
def set_status(
    pa_id: str,
    req: SetStatusRequest,
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    key: Optional[str] = Query(None),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """
    Set the status of an application.

    NOTE(review): the application key is sufficient here, so an applicant can
    change the status of their own application; confirm this is intended.
    """
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key or key, None, x_master_key)
    app_row = _resolve_app_row(db, pa_id, auth)

    # The lookup above already began the session's transaction, so an explicit
    # db.begin() would raise; mutate and commit instead.
    app_row.status = req.status
    app_row.updated_at = datetime.utcnow()
    db.add(app_row)
    result = {"pa_id": app_row.pa_id, "status": app_row.status}
    _commit_or_rollback(db)
    return result


@app.delete("/applications/{pa_id}")
def delete_application(
    pa_id: str,
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    key: Optional[str] = Query(None),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Delete an application; authorised by its key or the master key."""
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key or key, None, x_master_key)
    app_row = _resolve_app_row(db, pa_id, auth)

    # Same transaction as the lookup; see set_status for why db.begin() is not used.
    db.delete(app_row)
    _commit_or_rollback(db)
    return {"deleted": True, "pa_id": pa_id}