stupa-pdf-api/src/service_api.py
2025-09-01 03:05:36 +02:00

979 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# service_api.py
from __future__ import annotations
"""
FastAPI-Service für STUPA-PDF-Workflows.
Voraussetzung: vorhandene Module
- pdf_to_struct (stellt u.a. bereit: pdf_to_payload, map_form_to_payload, payload_to_model)
- pdf_filler (stellt u.a. bereit: fill_pdf)
.env (Beispiel):
MYSQL_HOST=127.0.0.1
MYSQL_PORT=3306
MYSQL_DB=stupa
MYSQL_USER=stupa
MYSQL_PASSWORD=secret
MASTER_KEY=supersecret_master
RATE_IP_PER_MIN=60
RATE_KEY_PER_MIN=30
QSM_TEMPLATE=assets/qsm.pdf # optional (falls abweichend)
VSM_TEMPLATE=assets/vsm.pdf
"""
import base64
import hashlib
import io
import json
import os
import secrets
import tempfile
import time
from datetime import datetime
from typing import Any, Dict, Optional, List
from urllib.parse import quote

import PyPDF2
from dotenv import load_dotenv
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Depends, Query, Body, Header, Response
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel, Field
from PyPDF2.errors import PdfReadError
from sqlalchemy import (
    create_engine, Column, Integer, String, Text, DateTime, JSON as SAJSON,
    select, func, UniqueConstraint
)
from sqlalchemy import text as sql_text
from sqlalchemy.dialects.mysql import LONGTEXT
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import declarative_base, sessionmaker, Session

# Project-local modules:
import pdf_to_struct as core  # uses: pdf_to_payload, map_form_to_payload, payload_to_model, detect_variant
from pdf_filler import fill_pdf
# -------------------------------------------------------------
# ENV & DB
# -------------------------------------------------------------
load_dotenv()

# Connection settings and limits; every value can be overridden through the
# process environment or the .env file loaded above.
MYSQL_HOST = os.getenv("MYSQL_HOST", "127.0.0.1")
MYSQL_PORT = int(os.getenv("MYSQL_PORT", "3306"))
MYSQL_DB = os.getenv("MYSQL_DB", "stupa")
MYSQL_USER = os.getenv("MYSQL_USER", "stupa")
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD", "secret")
MASTER_KEY = os.getenv("MASTER_KEY", "")
RATE_IP_PER_MIN = int(os.getenv("RATE_IP_PER_MIN", "60"))
RATE_KEY_PER_MIN = int(os.getenv("RATE_KEY_PER_MIN", "30"))

# User name and password are percent-encoded so that characters such as
# '@', ':', '/' or '%' inside a credential cannot corrupt the URL.
# For the default values the resulting string is unchanged.
DB_DSN = (
    f"mysql+pymysql://{quote(MYSQL_USER, safe='')}:{quote(MYSQL_PASSWORD, safe='')}"
    f"@{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DB}?charset=utf8mb4"
)
engine = create_engine(DB_DSN, pool_pre_ping=True, future=True)
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, future=True)
Base = declarative_base()
# -------------------------------------------------------------
# DB-Modelle
# -------------------------------------------------------------
class Counter(Base):
    """Per-year sequence counter read and incremented by _alloc_next_id."""
    __tablename__ = "counters"
    # Year in full form (e.g. 2025)
    year = Column(Integer, primary_key=True)
    # Last sequence number handed out for that year
    seq = Column(Integer, nullable=False, default=0)
class Application(Base):
    """A stored application: public ID, hashed access key, variant, status and payload."""
    __tablename__ = "applications"
    id = Column(Integer, primary_key=True, autoincrement=True)
    pa_id = Column(String(16), unique=True, index=True, nullable=False)  # YY-NNNN
    # Hex salt and PBKDF2 digest as produced by _hash_key; checked by _verify_key
    pa_key_salt = Column(String(64), nullable=False)
    pa_key_hash = Column(String(128), nullable=False)
    variant = Column(String(8), nullable=False)  # QSM/VSM
    status = Column(String(64), nullable=False, default="new")
    # Stored payload (without the plain-text key)
    payload_json = Column(SAJSON, nullable=False)
    # Optional: raw form JSON (kept for traceability)
    raw_form_json = Column(SAJSON, nullable=True)
    created_at = Column(DateTime, nullable=False, default=datetime.utcnow)
    # No onupdate hook is configured; the endpoints set updated_at explicitly
    updated_at = Column(DateTime, nullable=False, default=datetime.utcnow)
    __table_args__ = (
        UniqueConstraint("pa_id", name="uq_pa_id"),
    )
class Attachment(Base):
    """An uploaded file whose content is stored base64-encoded in the database."""
    __tablename__ = "attachments"
    id = Column(Integer, primary_key=True, autoincrement=True)
    filename = Column(String(255), nullable=False)
    content_type = Column(String(100), nullable=False)
    # Length of the raw (decoded) file content as set by upload_attachment
    size = Column(Integer, nullable=False)
    data = Column(LONGTEXT, nullable=False)  # Base64 encoded blob
    created_at = Column(DateTime, nullable=False, default=datetime.utcnow)
class ApplicationAttachment(Base):
    """Link row connecting an Application.id with an Attachment.id."""
    __tablename__ = "application_attachments"
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Plain integer columns: no ForeignKey is declared, so nothing cascades on delete
    application_id = Column(Integer, nullable=False, index=True)
    attachment_id = Column(Integer, nullable=False, index=True)
    created_at = Column(DateTime, nullable=False, default=datetime.utcnow)
    __table_args__ = (
        UniqueConstraint("application_id", "attachment_id", name="uq_app_attachment"),
    )
def init_db():
    """Issue CREATE TABLE for the models registered on Base, using the module engine."""
    Base.metadata.create_all(bind=engine)
# -------------------------------------------------------------
# Utils: Key-Hashing, ID-Vergabe, Rate-Limiting
# -------------------------------------------------------------
def _gen_pa_key() -> str:
    """Return a fresh random access key.

    24 random bytes are encoded URL-safe, which yields a 32-character string.
    """
    key_bytes = 24
    return secrets.token_urlsafe(key_bytes)
def _hash_key(key: str, salt: Optional[str] = None) -> tuple[str, str]:
    """Derive a PBKDF2-HMAC-SHA256 digest for an access key.

    Args:
        key: Plain-text access key.
        salt: Hex-encoded salt. When missing or empty, a new random
            16-byte salt (32 hex characters) is generated.

    Returns:
        ``(salt_hex, digest_hex)``.

    Raises:
        ValueError: If ``salt`` is not valid hexadecimal.
    """
    if not salt:
        salt = secrets.token_hex(16)  # 32 hex chars
    # The iteration count must stay in sync with _verify_key.
    digest = hashlib.pbkdf2_hmac("sha256", key.encode("utf-8"), bytes.fromhex(salt), 310000)
    return salt, digest.hex()
def _verify_key(key: str, salt_hex: str, hash_hex: str) -> bool:
    """Check a plain-text key against a stored hex salt and hex digest.

    The candidate digest is derived with the same PBKDF2 parameters that
    _hash_key uses and compared in constant time.
    """
    salt_bytes = bytes.fromhex(salt_hex)
    candidate = hashlib.pbkdf2_hmac("sha256", key.encode("utf-8"), salt_bytes, 310000)
    return secrets.compare_digest(candidate.hex(), hash_hex)
def _alloc_next_id(db: Session) -> str:
    """Allocate the next application ID in the form ``YY-NNNN``.

    The counter row for the current UTC year is selected FOR UPDATE (and
    created with seq=0 when absent), incremented and flushed. Committing is
    left to the caller's transaction.
    """
    current_year = datetime.utcnow().year

    # Lock the counter row, or create it
    locked_lookup = select(Counter).where(Counter.year == current_year).with_for_update()
    counter = db.execute(locked_lookup).scalar_one_or_none()
    if counter is None:
        counter = Counter(year=current_year, seq=0)
        db.add(counter)
        db.flush()
        db.refresh(counter)

    counter.seq = counter.seq + 1
    db.flush()
    db.refresh(counter)

    short_year = current_year % 100
    return f"{short_year:02d}-{counter.seq:04d}"
# Very simple in-memory rate limiter (per process).
# Production: better use Redis.
_RATE_BUCKETS: dict[str, List[float]] = {}
# Once this many keys are tracked, idle buckets are purged on the next call.
# Without this the dict grows forever, because keys derive from client input.
_RATE_MAX_BUCKETS = 10_000


def _rate_limit(key: str, limit: int, window_sec: int = 60):
    """Record one hit for ``key`` and reject it when the window is full.

    Args:
        key: Bucket name (e.g. ``"IP:..."`` or ``"APPKEY:..."``).
        limit: Maximum number of hits allowed inside the window.
        window_sec: Length of the sliding window in seconds.

    Raises:
        HTTPException: 429 when ``limit`` hits are already inside the window.
    """
    now = time.time()
    cutoff = now - window_sec

    if len(_RATE_BUCKETS) >= _RATE_MAX_BUCKETS:
        # Snapshot the items so the sweep never iterates a dict that is
        # being resized; drop buckets whose newest hit has expired.
        for tracked_key, hits in list(_RATE_BUCKETS.items()):
            if not hits or hits[-1] <= cutoff:
                _RATE_BUCKETS.pop(tracked_key, None)

    bucket = _RATE_BUCKETS.setdefault(key, [])

    # Remove expired entries with a single slice delete instead of repeated pop(0)
    expired = 0
    for stamp in bucket:
        if stamp > cutoff:
            break
        expired += 1
    if expired:
        del bucket[:expired]

    if len(bucket) >= limit:
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    bucket.append(now)
# -------------------------------------------------------------
# Schemas (Pydantic)
# -------------------------------------------------------------
class CreateResponse(BaseModel):
    """JSON body returned by create_application when return_format is "json"."""
    pa_id: str
    # Plain-text access key; the database keeps only its salt and hash
    pa_key: str
    variant: str
    status: str = "new"
class UpdateResponse(BaseModel):
    """JSON body returned by update_application when return_format is "json"."""
    pa_id: str
    variant: str
    status: str
class SetStatusRequest(BaseModel):
    """Request body for set_status; the length bound matches the status column size."""
    status: str = Field(..., min_length=1, max_length=64)
class SearchQuery(BaseModel):
    """Search filter schema.

    NOTE(review): not referenced by any endpoint in the visible part of this
    file; search_applications declares the same fields as query parameters.
    """
    q: Optional[str] = None
    status: Optional[str] = None
    variant: Optional[str] = None
    limit: int = 50
    offset: int = 0
class AttachmentInfo(BaseModel):
    """Attachment metadata (no file content) as returned by list_attachments."""
    id: int
    filename: str
    content_type: str
    size: int
    created_at: datetime
class AttachmentUploadResponse(BaseModel):
    """Body returned by upload_attachment after the file has been stored."""
    attachment_id: int
    filename: str
    size: int
# -------------------------------------------------------------
# Auth-Helpers
# -------------------------------------------------------------
def _auth_from_request(
    db: Session,
    pa_id: Optional[str],
    key_header: Optional[str],
    key_query: Optional[str],
    master_header: Optional[str],
) -> dict:
    """Authenticate a request by master key or by application key.

    Args:
        db: Session used to look up the application.
        pa_id: Application ID, or None for endpoints without one.
        key_header: Application key taken from the header.
        key_query: Application key taken from the query string (fallback).
        master_header: Value of the master-key header, if any.

    Returns:
        ``{"scope": "master"}`` for a valid master key,
        ``{"scope": "public"}`` when neither a master key nor ``pa_id`` is
        given, or ``{"scope": "app", "app": <Application>}`` for a valid
        application key.

    Raises:
        HTTPException: 401 (missing key), 403 (invalid key), 404 (unknown
            application) or 429 (rate limit exceeded).
    """
    if master_header:
        # Compare as bytes in constant time; compare_digest rejects non-ASCII str.
        is_master = bool(MASTER_KEY) and secrets.compare_digest(
            master_header.encode("utf-8"), MASTER_KEY.encode("utf-8")
        )
        if not is_master:
            # All failed attempts share one bucket. Keying the bucket by the
            # supplied value would give every guess a fresh allowance and
            # keep the guesses in memory as dict keys.
            _rate_limit("MASTER:invalid", RATE_KEY_PER_MIN)
            raise HTTPException(status_code=403, detail="Invalid master key")
        _rate_limit("MASTER", RATE_KEY_PER_MIN)
        return {"scope": "master"}

    supplied = key_header or key_query
    if pa_id is None:
        # Not needed for public endpoints (e.g. create without an ID)
        return {"scope": "public"}
    if not supplied:
        raise HTTPException(status_code=401, detail="Missing key")

    _rate_limit(f"APPKEY:{pa_id}", RATE_KEY_PER_MIN)
    app = db.execute(select(Application).where(Application.pa_id == pa_id)).scalar_one_or_none()
    if not app:
        raise HTTPException(status_code=404, detail="Application not found")
    if not _verify_key(supplied, app.pa_key_salt, app.pa_key_hash):
        raise HTTPException(status_code=403, detail="Invalid application key")
    return {"scope": "app", "app": app}
# -------------------------------------------------------------
# FastAPI Setup
# -------------------------------------------------------------
# Application instance on which the route decorators below register handlers.
app = FastAPI(title="STUPA PDF API", version="1.0.0")


@app.on_event("startup")
def _startup():
    """Run init_db() when the application starts."""
    init_db()
# Global IP rate limit (very simple), applied per request
def rate_limit_ip(ip: str):
    """Count one request against the bucket for ``ip``.

    An empty value is counted under the shared name "unknown".
    NOTE(review): callers pass the raw X-Forwarded-For header, which is
    client-supplied and may hold a comma-separated list - confirm that a
    trusted proxy sets it.
    """
    _rate_limit(f"IP:{ip or 'unknown'}", RATE_IP_PER_MIN)
def get_db():
    """FastAPI dependency: yield a session and close it when the request is done."""
    # Leaving the context manager closes the session, also when the
    # request handler raised.
    with SessionLocal() as session:
        yield session
# -------------------------------------------------------------
# Hilfen: Payload-Erzeugung aus Upload
# -------------------------------------------------------------
def _payload_from_pdf_bytes(tmp_path: str, variant: Optional[str]) -> Dict[str, Any]:
    """Parse the PDF at ``tmp_path`` and return its payload as a nested dict.

    Raises:
        HTTPException: 400 when the PDF cannot be read.
    """
    from dataclasses import asdict

    try:
        # pdf_to_payload returns a dataclass; we need the nested dict that
        # contains the 'pa' object.
        parsed = core.pdf_to_payload(tmp_path, variant=variant)
    except PdfReadError as exc:
        raise HTTPException(status_code=400, detail=f"PDF parse error: {exc}")
    return asdict(parsed)
def _payload_from_form_json(form_json: Dict[str, Any], variant: Optional[str]) -> Dict[str, Any]:
    """Map raw form JSON to the payload structure and return it as a nested dict.

    A missing or empty ``variant`` is passed on as "AUTO".
    """
    from dataclasses import asdict

    effective_variant = variant if variant else "AUTO"
    # form JSON -> mapped dict -> model -> plain dict again
    model = core.payload_to_model(core.map_form_to_payload(form_json, effective_variant))
    return asdict(model)
def _inject_meta_for_render(payload: Dict[str, Any], pa_id: str, pa_key: Optional[str]) -> Dict[str, Any]:
    """Return a copy of ``payload`` prepared for PDF rendering.

    The copy gets ``pa.meta.id`` (and ``pa.meta.key`` when ``pa_key`` is
    given) and ``pa.project.totals.requestedAmountEur`` set to the sum of
    all numeric ``amountEur`` values in ``pa.project.costs``. The key is
    injected for rendering only; ``payload`` itself is not modified.

    Sections that are missing or ``None`` (``pa``, ``meta``, ``project``,
    ``totals``, ``costs``) are treated as empty instead of raising.
    """
    rendered = json.loads(json.dumps(payload))  # deep copy

    def _section(parent: Dict[str, Any], name: str) -> Dict[str, Any]:
        # setdefault() would keep an existing None, so replace non-dicts explicitly.
        node = parent.get(name)
        if not isinstance(node, dict):
            node = parent[name] = {}
        return node

    pa = _section(rendered, "pa")
    meta = _section(pa, "meta")
    meta["id"] = pa_id
    if pa_key is not None:
        meta["key"] = pa_key

    # Calculate the total amount from the cost items. The project section is
    # attached to the copy so the total is not lost when it was absent.
    project = _section(pa, "project")
    costs = project.get("costs") or []
    total_amount = sum(
        (
            float(cost["amountEur"])
            for cost in costs
            if isinstance(cost, dict) and isinstance(cost.get("amountEur"), (int, float))
        ),
        0.0,
    )
    _section(project, "totals")["requestedAmountEur"] = total_amount
    return rendered
def _sanitize_payload_for_db(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of ``payload`` that is safe to persist.

    ``pa.meta.key`` is set to None when present, and the calculated
    ``pa.project.totals.requestedAmountEur`` is removed. ``payload`` itself
    is not modified. Sections that are ``None`` are tolerated instead of
    raising.
    """
    clean = json.loads(json.dumps(payload))  # deep copy

    pa = clean.get("pa")
    if not isinstance(pa, dict):
        pa = clean["pa"] = {}
    meta = pa.get("meta")
    if not isinstance(meta, dict):
        meta = pa["meta"] = {}

    # Neutralise the key in the persistent payload
    if "key" in meta:
        meta["key"] = None

    # Remove the calculated total from database storage
    project = pa.get("project")
    totals = project.get("totals") if isinstance(project, dict) else None
    if isinstance(totals, dict):
        totals.pop("requestedAmountEur", None)
    return clean
# -------------------------------------------------------------
# Endpunkte
# -------------------------------------------------------------
@app.post("/applications", response_model=CreateResponse, responses={200: {"content": {"application/pdf": {}}}})
def create_application(
    response: Response,
    variant: Optional[str] = Query(None, description="QSM|VSM|AUTO"),
    return_format: str = Query("pdf", regex="^(pdf|json)$"),
    pdf: Optional[UploadFile] = File(None, description="PDF Upload (Alternative zu form_json)"),
    form_json_b64: Optional[str] = Form(None, description="Base64-kodiertes Roh-Form-JSON (Alternative zu Datei)"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Create a new application from an uploaded PDF or base64-encoded form JSON.

    A new ID and access key are allocated, the PDF is rendered with both
    embedded, and the payload is stored without the key.

    Returns:
        A CreateResponse when ``return_format`` is "json"; otherwise the
        rendered PDF with the ID and key in the ``X-PA-ID`` / ``X-PA-KEY``
        response headers.

    Raises:
        HTTPException: 400 for missing or invalid input or a payload that
            already has an ID, 409 on an ID allocation conflict, 429 when
            rate limited, 500 when PDF generation fails.
    """
    # Rate limit by IP
    rate_limit_ip(x_forwarded_for or "")

    # Obtain the payload
    payload: Dict[str, Any]
    raw_form: Optional[Dict[str, Any]] = None
    if pdf:
        with tempfile.NamedTemporaryFile(delete=True, suffix=".pdf") as tf:
            tf.write(pdf.file.read())
            tf.flush()
            payload = _payload_from_pdf_bytes(tf.name, variant)
    elif form_json_b64:
        try:
            raw = base64.b64decode(form_json_b64)
            raw_form = json.loads(raw.decode("utf-8"))
        except ValueError as e:
            # binascii.Error, UnicodeDecodeError and JSONDecodeError are all ValueErrors
            raise HTTPException(status_code=400, detail=f"Invalid form_json_b64: {e}")
        payload = _payload_from_form_json(raw_form, variant or "AUTO")
    else:
        raise HTTPException(status_code=400, detail="Provide either PDF file or form_json_b64")

    # Create only works without an ID: reject payloads that already carry pa.meta.id
    pa_meta = (payload.get("pa") or {}).get("meta") or {}
    if pa_meta.get("id"):
        raise HTTPException(status_code=400, detail="pa-id already set; use update endpoint")

    # Determine the variant. An explicit "AUTO" triggers detection just like
    # an omitted parameter; anything that is not QSM (including the legacy
    # COMMON) is stored as VSM.
    requested = (variant or "AUTO").upper()
    if requested == "AUTO":
        requested = (core.detect_variant(payload.get("pa", {})) or "VSM").upper()
    detected = "QSM" if requested == "QSM" else "VSM"

    # Key generation and the deliberately slow PBKDF2 hash happen before the
    # transaction so the counter row lock is not held while hashing.
    pa_key_plain = _gen_pa_key()
    salt, key_hash = _hash_key(pa_key_plain)

    # Create inside one transaction
    try:
        with db.begin():
            pa_id = _alloc_next_id(db)

            # Render payload with ID/key
            render = _inject_meta_for_render(payload, pa_id, pa_key_plain)

            # Generate the PDF
            pdf_bytes = fill_pdf(render, detected)

            # Validate PDF generation
            if not pdf_bytes:
                raise HTTPException(status_code=500, detail="Failed to generate PDF - empty result")
            if len(pdf_bytes) < 1000:  # PDF should be at least 1KB
                raise HTTPException(status_code=500, detail="PDF generation resulted in suspiciously small file")

            # DB payload without the key
            store_payload = _sanitize_payload_for_db(payload)

            app_row = Application(
                pa_id=pa_id,
                pa_key_salt=salt,
                pa_key_hash=key_hash,
                variant=detected,
                status="new",
                payload_json=store_payload,
                raw_form_json=raw_form,
            )
            db.add(app_row)
    except IntegrityError:
        # Very rare race condition during ID allocation; the client should retry
        raise HTTPException(status_code=409, detail="ID allocation conflict; retry")

    # Response
    if return_format == "json":
        return CreateResponse(pa_id=pa_id, pa_key=pa_key_plain, variant=detected, status="new")

    # Return the PDF with ID and key in the headers. They must be set on the
    # returned response itself: headers placed on the injected ``response``
    # parameter are not merged into a Response object returned directly.
    headers = {
        "Content-Disposition": f"attachment; filename=antrag-{pa_id}.pdf",
        "X-PA-ID": pa_id,
        "X-PA-KEY": pa_key_plain,
    }
    return StreamingResponse(io.BytesIO(pdf_bytes), media_type="application/pdf", headers=headers)
@app.put("/applications/{pa_id}", response_model=UpdateResponse, responses={200: {"content": {"application/pdf": {}}}})
def update_application(
    pa_id: str,
    response: Response,
    return_format: str = Query("pdf", regex="^(pdf|json)$"),
    variant: Optional[str] = Query(None),
    pdf: Optional[UploadFile] = File(None),
    form_json_b64: Optional[str] = Form(None),
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Replace the payload of an existing application and re-render its PDF.

    Requires the application key or the master key. The access key is not
    re-issued and is not written into the rendered PDF.

    Returns:
        An UpdateResponse when ``return_format`` is "json"; otherwise the
        rendered PDF with the ID in the ``X-PA-ID`` response header.

    Raises:
        HTTPException: 400 for missing or invalid input, 401/403 for auth
            failures, 404 for an unknown application, 429 when rate limited,
            500 when PDF generation or the database update fails.
    """
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key, None, x_master_key)
    app_row: Application = auth.get("app")
    if app_row is None and auth["scope"] == "master":
        # Master scope does not load the row during authentication
        app_row = db.execute(select(Application).where(Application.pa_id == pa_id)).scalar_one_or_none()
    if app_row is None:
        raise HTTPException(status_code=404, detail="Application not found")

    # Obtain the payload
    payload: Dict[str, Any]
    raw_form: Optional[Dict[str, Any]] = None
    if pdf:
        with tempfile.NamedTemporaryFile(delete=True, suffix=".pdf") as tf:
            tf.write(pdf.file.read())
            tf.flush()
            payload = _payload_from_pdf_bytes(tf.name, variant or app_row.variant)
    elif form_json_b64:
        try:
            raw = base64.b64decode(form_json_b64)
            raw_form = json.loads(raw.decode("utf-8"))
        except ValueError as e:
            # binascii.Error, UnicodeDecodeError and JSONDecodeError are all ValueErrors
            raise HTTPException(status_code=400, detail=f"Invalid form_json_b64: {e}")
        payload = _payload_from_form_json(raw_form, variant or app_row.variant)
    else:
        raise HTTPException(status_code=400, detail="Provide either PDF file or form_json_b64")

    # Always render with the existing ID; never store the key in the DB payload
    render = _inject_meta_for_render(payload, app_row.pa_id, None)  # key is not re-issued
    store_payload = _sanitize_payload_for_db(payload)

    # Normalise the variant before it is persisted: "AUTO" or no parameter
    # keeps the stored variant, and anything that is not QSM (including the
    # legacy COMMON) becomes VSM, as on creation.
    requested = (variant or "").upper()
    if requested in ("", "AUTO"):
        requested = (app_row.variant or "").upper()
    chosen_variant = "QSM" if requested == "QSM" else "VSM"
    pdf_bytes = fill_pdf(render, chosen_variant)

    # Validate PDF generation
    if not pdf_bytes:
        raise HTTPException(status_code=500, detail="Failed to generate PDF - empty result")
    if len(pdf_bytes) < 1000:  # PDF should be at least 1KB
        raise HTTPException(status_code=500, detail="PDF generation resulted in suspiciously small file")

    try:
        app_row.variant = chosen_variant
        app_row.updated_at = datetime.utcnow()
        app_row.payload_json = store_payload
        if raw_form is not None:
            app_row.raw_form_json = raw_form
        db.add(app_row)
        db.commit()
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=f"Failed to update application: {str(e)}")

    if return_format == "json":
        return UpdateResponse(pa_id=app_row.pa_id, variant=app_row.variant, status=app_row.status)

    # The ID header must be set on the returned response itself: headers
    # placed on the injected ``response`` parameter are not merged into a
    # Response object returned directly.
    headers = {
        "Content-Disposition": f"attachment; filename=antrag-{app_row.pa_id}.pdf",
        "X-PA-ID": app_row.pa_id,
    }
    return StreamingResponse(io.BytesIO(pdf_bytes), media_type="application/pdf", headers=headers)
@app.get("/applications/{pa_id}")
def get_application(
    pa_id: str,
    format: str = Query("json", regex="^(json|pdf)$"),
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    key: Optional[str] = Query(None, description="Alternative zum Header für den App-Key"),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Return one application as JSON or as a freshly rendered PDF.

    Requires the application key (header or ``key`` query parameter) or the
    master key. The PDF is rendered without the access key.
    """
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key or key, None, x_master_key)

    record: Application = auth.get("app")
    if record is None and auth["scope"] == "master":
        # Master scope does not load the row during authentication
        lookup = select(Application).where(Application.pa_id == pa_id)
        record = db.execute(lookup).scalar_one_or_none()
    if record is None:
        raise HTTPException(status_code=404, detail="Application not found")

    if format != "pdf":
        # JSON view; the legacy COMMON variant is reported as VSM
        return {
            "pa_id": record.pa_id,
            "variant": "VSM" if record.variant == "COMMON" else record.variant,
            "status": record.status,
            "payload": record.payload_json,
            "created_at": record.created_at.isoformat(),
            "updated_at": record.updated_at.isoformat()
        }

    # Re-render the PDF for display (without the key)
    render_payload = _inject_meta_for_render(record.payload_json, record.pa_id, None)
    template = "QSM" if record.variant == "QSM" else "VSM"
    rendered_pdf = fill_pdf(render_payload, template)

    # Validate PDF generation
    if not rendered_pdf or len(rendered_pdf) == 0:
        raise HTTPException(status_code=500, detail="Failed to generate PDF - empty result")
    if len(rendered_pdf) < 1000:  # PDF should be at least 1KB
        raise HTTPException(status_code=500, detail="PDF generation resulted in suspiciously small file")

    return StreamingResponse(
        io.BytesIO(rendered_pdf),
        media_type="application/pdf",
        headers={"Content-Disposition": f"attachment; filename=antrag-{record.pa_id}.pdf"},
    )
def _project_name_from_payload(payload_json: Any) -> Any:
    """Best-effort lookup of ``pa.project.name`` in a stored payload ("" if unavailable)."""
    if not payload_json:
        return ""
    try:
        payload = json.loads(payload_json) if isinstance(payload_json, str) else payload_json
        return payload.get("pa", {}).get("project", {}).get("name", "")
    except (AttributeError, TypeError, ValueError):
        # Malformed JSON text or an unexpected structure: report no name
        return ""


def _application_summary(row: Application) -> Dict[str, Any]:
    """Build the list entry for one application row."""
    return {
        "pa_id": row.pa_id,
        # The legacy COMMON variant is reported as VSM
        "variant": "VSM" if row.variant == "COMMON" else row.variant,
        "status": row.status,
        "project_name": _project_name_from_payload(row.payload_json),
        "created_at": row.created_at.isoformat(),
        "updated_at": row.updated_at.isoformat()
    }


@app.get("/applications")
def list_applications(
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
    status: Optional[str] = Query(None),
    variant: Optional[str] = Query(None),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    pa_id: Optional[str] = Query(None, description="Mit Key: nur diesen Antrag anzeigen"),
    key: Optional[str] = Query(None),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """List applications.

    With a master key, all applications are listed (newest first), optionally
    filtered by ``status`` and ``variant`` and paged by ``limit``/``offset``.
    Without it, ``pa_id`` plus the application key are required and only
    that application is returned, as a one-element list.

    Raises:
        HTTPException: 400 when neither a master key nor ``pa_id`` is given,
            401/403 for auth failures, 404 for an unknown application,
            429 when rate limited.
    """
    rate_limit_ip(x_forwarded_for or "")

    # With master key: list/filter everything
    if x_master_key:
        _auth_from_request(db, None, None, None, x_master_key)
        q = select(Application).order_by(Application.created_at.desc())
        if status:
            q = q.where(Application.status == status)
        if variant:
            q = q.where(Application.variant == variant.upper())
        q = q.limit(limit).offset(offset)
        rows = db.execute(q).scalars().all()
        return [_application_summary(r) for r in rows]

    # Without master: only the caller's own application (pa_id + key required)
    if not pa_id:
        raise HTTPException(status_code=400, detail="pa_id required without master key")
    auth = _auth_from_request(db, pa_id, x_pa_key or key, None, None)
    app_row: Application = auth.get("app")
    if not app_row:
        raise HTTPException(status_code=404, detail="Application not found")
    return [_application_summary(app_row)]
@app.post("/applications/{pa_id}/status")
def set_status(
    pa_id: str,
    req: SetStatusRequest,
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    key: Optional[str] = Query(None),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Set the status of an application and return its ID and new status.

    NOTE(review): the application key is accepted here as well as the master
    key, so an applicant can set any status on their own application -
    confirm this is intended.
    """
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key or key, None, x_master_key)

    record: Application = auth.get("app")
    if record is None and auth["scope"] == "master":
        # Master scope does not load the row during authentication
        lookup = select(Application).where(Application.pa_id == pa_id)
        record = db.execute(lookup).scalar_one_or_none()
    if record is None:
        raise HTTPException(status_code=404, detail="Application not found")

    try:
        record.status = req.status
        record.updated_at = datetime.utcnow()
        db.add(record)
        db.commit()
    except Exception as exc:
        db.rollback()
        raise HTTPException(status_code=500, detail=f"Failed to update status: {str(exc)}")

    return {"pa_id": record.pa_id, "status": record.status}
@app.delete("/applications/{pa_id}")
def delete_application(
    pa_id: str,
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    key: Optional[str] = Query(None),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Delete an application together with its attachments.

    Requires the application key or the master key. The attachment link rows
    and the attachment blobs are removed in the same transaction; without
    that they would stay behind, since no foreign-key cascade is declared.

    Raises:
        HTTPException: 401/403 for auth failures, 404 for an unknown
            application, 429 when rate limited, 500 when the delete fails.
    """
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key or key, None, x_master_key)
    app_row: Application = auth.get("app")
    if app_row is None and auth["scope"] == "master":
        # Master scope does not load the row during authentication
        app_row = db.execute(select(Application).where(Application.pa_id == pa_id)).scalar_one_or_none()
    if app_row is None:
        raise HTTPException(status_code=404, detail="Application not found")

    try:
        # Only the ID column is fetched, so no base64 blob is loaded
        attachment_ids = [
            row[0]
            for row in db.query(ApplicationAttachment.attachment_id)
            .filter(ApplicationAttachment.application_id == app_row.id)
            .all()
        ]
        if attachment_ids:
            db.query(Attachment).filter(Attachment.id.in_(attachment_ids)).delete(synchronize_session=False)
            db.query(ApplicationAttachment).filter(
                ApplicationAttachment.application_id == app_row.id
            ).delete(synchronize_session=False)
        db.delete(app_row)
        db.commit()
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=f"Failed to delete application: {str(e)}")
    return {"deleted": True, "pa_id": pa_id}
@app.post("/applications/{pa_id}/reset-credentials")
def reset_credentials(
    pa_id: str,
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Reset access credentials for an application (Admin only).

    A new access key is generated, its salt and hash replace the stored
    ones, and the plain key is returned once in the response.

    Raises:
        HTTPException: 403 for a missing or invalid master key, 404 for an
            unknown application, 429 when rate limited, 500 when the update
            fails.
    """
    rate_limit_ip(x_forwarded_for or "")

    # Check the master key through the shared helper so this endpoint gets
    # the same validation and rate limiting as the other admin endpoints.
    # Without a master header the helper reports scope "public".
    auth = _auth_from_request(db, None, None, None, x_master_key)
    if auth["scope"] != "master":
        raise HTTPException(status_code=403, detail="Invalid or missing master key")

    # Look up the application
    app_row = db.query(Application).filter(Application.pa_id == pa_id).first()
    if not app_row:
        raise HTTPException(status_code=404, detail="Application not found")

    # Generate new credentials
    pa_key_plain = _gen_pa_key()
    salt, key_hash = _hash_key(pa_key_plain)

    try:
        # Update the application
        app_row.pa_key_salt = salt
        app_row.pa_key_hash = key_hash
        db.commit()
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=f"Failed to reset credentials: {str(e)}")

    return {
        "pa_id": pa_id,
        "pa_key": pa_key_plain,
        "message": "Credentials reset successfully"
    }
# NOTE(review): this route is registered after GET /applications/{pa_id}.
# Routes are matched in registration order, so a request for
# /applications/search is handled by that route with pa_id="search" unless
# this definition is moved above it.
@app.get("/applications/search")
def search_applications(
    q: Optional[str] = Query(None, description="Volltext über payload_json (einfach)"),
    status: Optional[str] = Query(None),
    variant: Optional[str] = Query(None),
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Search applications by status, variant and payload text (master key only).

    Raises:
        HTTPException: 403 for a missing or invalid master key, 429 when
            rate limited.
    """
    rate_limit_ip(x_forwarded_for or "")

    # Without a master header the helper returns scope "public" instead of
    # raising, so the scope has to be checked explicitly.
    auth = _auth_from_request(db, None, None, None, x_master_key)
    if auth["scope"] != "master":
        raise HTTPException(status_code=403, detail="Invalid or missing master key")

    # Very simple search (MySQL JSON_SEARCH); migrate to full-text search for production
    base_sql = "SELECT pa_id, variant, status, created_at, updated_at FROM applications WHERE 1=1"
    params: Dict[str, Any] = {}
    if status:
        base_sql += " AND status=:status"
        params["status"] = status
    if variant:
        base_sql += " AND variant=:variant"
        params["variant"] = variant.upper()
    if q:
        # Naive substring search in the JSON; '%' and '_' inside q act as wildcards
        base_sql += " AND JSON_SEARCH(JSON_EXTRACT(payload_json, '$'), 'all', :q) IS NOT NULL"
        params["q"] = f"%{q}%"
    base_sql += " ORDER BY created_at DESC LIMIT :limit OFFSET :offset"
    params["limit"] = limit
    params["offset"] = offset

    rows = db.execute(sql_text(base_sql), params).all()
    return [
        {"pa_id": r[0], "variant": r[1], "status": r[2],
         "created_at": r[3].isoformat(), "updated_at": r[4].isoformat()}
        for r in rows
    ]
# -------------------------------------------------------------
# Attachment Endpoints
# -------------------------------------------------------------
@app.post("/applications/{pa_id}/attachments", response_model=AttachmentUploadResponse)
async def upload_attachment(
    pa_id: str,
    file: UploadFile = File(...),
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    key: Optional[str] = Query(None),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Upload an attachment for an application.

    At most 30 attachments and 100MB in total are accepted per application.

    Raises:
        HTTPException: 400 when a limit would be exceeded, 401/403 for auth
            failures, 404 for an unknown application, 429 when rate limited,
            500 when storing fails.
    """
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key or key, None, x_master_key)
    if not auth:
        raise HTTPException(status_code=401, detail="Unauthorized")

    # Check if application exists
    app_row = db.query(Application).filter(Application.pa_id == pa_id).first()
    if not app_row:
        raise HTTPException(status_code=404, detail="Application not found")

    # Check attachment count limit (30 attachments max)
    attachment_count = db.query(ApplicationAttachment).filter(
        ApplicationAttachment.application_id == app_row.id
    ).count()
    if attachment_count >= 30:
        raise HTTPException(status_code=400, detail="Maximum number of attachments (30) reached")

    # Check total size limit (100MB). The sum is computed in the database so
    # that the stored base64 blobs are not loaded just to add up their sizes.
    total_size = int(
        db.query(func.coalesce(func.sum(Attachment.size), 0))
        .select_from(Attachment)
        .join(ApplicationAttachment, Attachment.id == ApplicationAttachment.attachment_id)
        .filter(ApplicationAttachment.application_id == app_row.id)
        .scalar()
        or 0
    )
    file_content = await file.read()
    file_size = len(file_content)
    if total_size + file_size > 100 * 1024 * 1024:  # 100MB
        raise HTTPException(status_code=400, detail="Total attachment size would exceed 100MB limit")

    try:
        # Create attachment; name and type are clamped to the column sizes,
        # and a missing filename gets a placeholder (the column is NOT NULL).
        attachment = Attachment(
            filename=(file.filename or "attachment")[:255],
            content_type=(file.content_type or "application/octet-stream")[:100],
            size=file_size,
            data=base64.b64encode(file_content).decode('utf-8')
        )
        db.add(attachment)
        db.flush()  # assigns attachment.id

        # Link to application
        app_attachment = ApplicationAttachment(
            application_id=app_row.id,
            attachment_id=attachment.id
        )
        db.add(app_attachment)
        db.commit()
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=f"Failed to store attachment: {str(e)}")

    return AttachmentUploadResponse(
        attachment_id=attachment.id,
        filename=attachment.filename,
        size=attachment.size
    )
@app.get("/applications/{pa_id}/attachments", response_model=List[AttachmentInfo])
def list_attachments(
    pa_id: str,
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    key: Optional[str] = Query(None),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """List all attachments for an application (metadata only).

    Raises:
        HTTPException: 401/403 for auth failures, 404 for an unknown
            application, 429 when rate limited.
    """
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key or key, None, x_master_key)
    if not auth:
        raise HTTPException(status_code=401, detail="Unauthorized")

    # Get application
    app_row = db.query(Application).filter(Application.pa_id == pa_id).first()
    if not app_row:
        raise HTTPException(status_code=404, detail="Application not found")

    # Select only the metadata columns; the base64 ``data`` column can be
    # very large and is not needed for a listing.
    rows = (
        db.query(
            Attachment.id,
            Attachment.filename,
            Attachment.content_type,
            Attachment.size,
            Attachment.created_at,
        )
        .join(ApplicationAttachment, Attachment.id == ApplicationAttachment.attachment_id)
        .filter(ApplicationAttachment.application_id == app_row.id)
        .all()
    )

    return [
        AttachmentInfo(
            id=row.id,
            filename=row.filename,
            content_type=row.content_type,
            size=row.size,
            created_at=row.created_at
        )
        for row in rows
    ]
def _attachment_disposition(filename: Optional[str]) -> str:
    """Build a Content-Disposition value that is safe for any stored filename."""
    name = filename or "attachment"
    # ASCII fallback: drop non-ASCII and control characters (CR/LF would
    # break the header) as well as quotes and backslashes.
    fallback = "".join(
        ch
        for ch in name.encode("ascii", "ignore").decode("ascii")
        if ch.isprintable() and ch not in '"\\'
    ) or "attachment"
    # filename* carries the exact name percent-encoded as UTF-8 (RFC 6266 / RFC 5987)
    return f"attachment; filename=\"{fallback}\"; filename*=UTF-8''{quote(name, safe='')}"


@app.get("/applications/{pa_id}/attachments/{attachment_id}")
def download_attachment(
    pa_id: str,
    attachment_id: int,
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    key: Optional[str] = Query(None),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Download a specific attachment.

    The filename was supplied by the uploader, so it is quoted and encoded
    before it goes into the Content-Disposition header.

    Raises:
        HTTPException: 401/403 for auth failures, 404 when the application
            or attachment is unknown or they do not belong together,
            429 when rate limited.
    """
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key or key, None, x_master_key)
    if not auth:
        raise HTTPException(status_code=401, detail="Unauthorized")

    # Get application
    app_row = db.query(Application).filter(Application.pa_id == pa_id).first()
    if not app_row:
        raise HTTPException(status_code=404, detail="Application not found")

    # Check if attachment belongs to this application
    app_attachment = db.query(ApplicationAttachment).filter(
        ApplicationAttachment.application_id == app_row.id,
        ApplicationAttachment.attachment_id == attachment_id
    ).first()
    if not app_attachment:
        raise HTTPException(status_code=404, detail="Attachment not found for this application")

    # Get attachment
    attachment = db.query(Attachment).filter(Attachment.id == attachment_id).first()
    if not attachment:
        raise HTTPException(status_code=404, detail="Attachment not found")

    # Decode and return file
    file_data = base64.b64decode(attachment.data)
    return StreamingResponse(
        io.BytesIO(file_data),
        media_type=attachment.content_type,
        headers={"Content-Disposition": _attachment_disposition(attachment.filename)}
    )
@app.delete("/applications/{pa_id}/attachments/{attachment_id}")
def delete_attachment(
    pa_id: str,
    attachment_id: int,
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    key: Optional[str] = Query(None),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Delete a specific attachment and its link to the application."""
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key or key, None, x_master_key)
    if not auth:
        raise HTTPException(status_code=401, detail="Unauthorized")

    # Resolve the application
    owner = db.query(Application).filter(Application.pa_id == pa_id).first()
    if not owner:
        raise HTTPException(status_code=404, detail="Application not found")

    # The attachment must be linked to this application
    link_query = db.query(ApplicationAttachment).filter(
        ApplicationAttachment.application_id == owner.id,
        ApplicationAttachment.attachment_id == attachment_id
    )
    link = link_query.first()
    if not link:
        raise HTTPException(status_code=404, detail="Attachment not found for this application")

    # Remove the link first, then the attachment row itself (if it still exists)
    db.delete(link)
    stored = db.query(Attachment).filter(Attachment.id == attachment_id).first()
    if stored:
        db.delete(stored)
    db.commit()

    return {"detail": "Attachment deleted successfully"}