stupa-pdf-api/backend/src/service_api.py

1736 lines
68 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# service_api.py
from __future__ import annotations
"""
FastAPI-Service für STUPA-PDF-Workflows.
Voraussetzung: vorhandene Module
- pdf_to_struct (stellt u.a. bereit: pdf_to_payload, map_form_to_payload, payload_to_model)
- pdf_filler (stellt u.a. bereit: fill_pdf)
.env (Beispiel):
MYSQL_HOST=127.0.0.1
MYSQL_PORT=3306
MYSQL_DB=stupa
MYSQL_USER=stupa
MYSQL_PASSWORD=secret
MASTER_KEY=supersecret_master
RATE_IP_PER_MIN=60
RATE_KEY_PER_MIN=30
QSM_TEMPLATE=assets/qsm.pdf # optional (falls abweichend)
VSM_TEMPLATE=assets/vsm.pdf
"""
import io
import os
import time
import json
import base64
import secrets
import hashlib
import tempfile
import ipaddress
from datetime import datetime
from typing import Any, Dict, List, Optional, Union, Tuple
from urllib.parse import quote_plus
from dotenv import load_dotenv
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Depends, Query, Body, Header, Response
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel, Field
from sqlalchemy import (
    create_engine, Column, Integer, String, Text, DateTime, JSON as SAJSON,
    select, func, UniqueConstraint, Boolean, Index
)
from sqlalchemy.dialects.mysql import LONGTEXT
from sqlalchemy.orm import declarative_base, sessionmaker, Session
from sqlalchemy.exc import IntegrityError
from sqlalchemy import text as sql_text, text
import PyPDF2
from PyPDF2.errors import PdfReadError
# Eigene Module (aus deinem Projekt):
import pdf_to_struct as core # nutzt: pdf_to_payload, map_form_to_payload, payload_to_model, detect_variant
from pdf_filler import fill_pdf
# -------------------------------------------------------------
# ENV & DB
# -------------------------------------------------------------
# Load variables from a local .env file (if one exists) before reading settings.
load_dotenv()

# MySQL connection settings, each overridable through the environment.
MYSQL_HOST = os.getenv("MYSQL_HOST", "127.0.0.1")
MYSQL_PORT = int(os.getenv("MYSQL_PORT", "3306"))
MYSQL_DB = os.getenv("MYSQL_DB", "stupa")
MYSQL_USER = os.getenv("MYSQL_USER", "stupa")
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD", "secret")

# Master key for privileged requests; an empty value disables master access
# (see the `not MASTER_KEY` check in _auth_from_request).
MASTER_KEY = os.getenv("MASTER_KEY", "")

# Requests allowed per minute, per client IP and per key respectively.
RATE_IP_PER_MIN = int(os.getenv("RATE_IP_PER_MIN", "60"))
RATE_KEY_PER_MIN = int(os.getenv("RATE_KEY_PER_MIN", "30"))

# Percent-encode the credentials so that characters such as "@", ":" or "/"
# inside the user name or password cannot corrupt the connection URL.
DB_DSN = (
    f"mysql+pymysql://{quote_plus(MYSQL_USER)}:{quote_plus(MYSQL_PASSWORD)}"
    f"@{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DB}?charset=utf8mb4"
)
engine = create_engine(DB_DSN, pool_pre_ping=True, future=True)
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, future=True)
Base = declarative_base()
# -------------------------------------------------------------
# DB-Modelle
# -------------------------------------------------------------
class Counter(Base):
    """Per-year sequence counter (table ``counters``).

    ``_alloc_next_id`` locks and increments the row of the current year to
    derive the next application ID.
    """
    __tablename__ = "counters"
    # Year in full form (e.g. 2025)
    year = Column(Integer, primary_key=True)
    # Last sequence number handed out for that year
    seq = Column(Integer, nullable=False, default=0)
class Application(Base):
    """A stored application (table ``applications``).

    The application key is never stored in clear text; only its salt and
    PBKDF2 hash are kept (see ``_hash_key`` / ``_verify_key``).
    """
    __tablename__ = "applications"
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Public identifier in the form YY-NNNN
    pa_id = Column(String(16), unique=True, index=True, nullable=False) # YY-NNNN
    # Hex-encoded salt and hash of the application key
    pa_key_salt = Column(String(64), nullable=False)
    pa_key_hash = Column(String(128), nullable=False)
    variant = Column(String(8), nullable=False) # QSM/VSM
    status = Column(String(64), nullable=False, default="new")
    # Stored payload (without the clear-text key)
    payload_json = Column(SAJSON, nullable=False)
    # Optional: raw form JSON (kept for traceability)
    raw_form_json = Column(SAJSON, nullable=True)
    created_at = Column(DateTime, nullable=False, default=datetime.utcnow)
    # NOTE(review): no onupdate here, unlike ComparisonOffer; writers must set
    # updated_at themselves (update_application does).
    updated_at = Column(DateTime, nullable=False, default=datetime.utcnow)
    __table_args__ = (
        UniqueConstraint("pa_id", name="uq_pa_id"),
    )
class Attachment(Base):
    """An uploaded file stored in the database (table ``attachments``)."""
    __tablename__ = "attachments"
    id = Column(Integer, primary_key=True, autoincrement=True)
    filename = Column(String(255), nullable=False)
    content_type = Column(String(100), nullable=False)
    # NOTE(review): presumably the size of the decoded file in bytes; confirm with the upload code
    size = Column(Integer, nullable=False)
    data = Column(LONGTEXT, nullable=False) # Base64 encoded blob
    created_at = Column(DateTime, nullable=False, default=datetime.utcnow)
class ApplicationAttachment(Base):
    """Link row between an application and an attachment (table ``application_attachments``).

    The two ID columns are plain integers without declared foreign keys; each
    (application_id, attachment_id) pair may occur only once.
    """
    __tablename__ = "application_attachments"
    id = Column(Integer, primary_key=True, autoincrement=True)
    application_id = Column(Integer, nullable=False, index=True)
    attachment_id = Column(Integer, nullable=False, index=True)
    created_at = Column(DateTime, nullable=False, default=datetime.utcnow)
    __table_args__ = (
        UniqueConstraint("application_id", "attachment_id", name="uq_app_attachment"),
    )
class ComparisonOffer(Base):
    """A supplier offer recorded for one cost position of an application
    (table ``comparison_offers``).

    A supplier name may appear only once per (application, cost position).
    """
    __tablename__ = "comparison_offers"
    id = Column(Integer, primary_key=True, autoincrement=True)
    application_id = Column(Integer, nullable=False, index=True)
    cost_position_index = Column(Integer, nullable=False) # Index of the cost position in the array
    supplier_name = Column(String(255), nullable=False)
    amount = Column(Integer, nullable=False) # Amount in cents
    description = Column(Text, nullable=True)
    url = Column(String(500), nullable=True) # Optional URL to offer
    attachment_id = Column(Integer, nullable=True) # Link to attachment
    is_preferred = Column(Boolean, default=False, nullable=False) # Whether this is the preferred offer
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    # Uniqueness rule plus a composite index for lookups by application and cost position
    __table_args__ = (
        UniqueConstraint('application_id', 'cost_position_index', 'supplier_name', name='uq_offer'),
        Index('idx_app_cost', 'application_id', 'cost_position_index'),
    )
class CostPositionJustification(Base):
    """Per cost position flag/justification for not providing offers
    (table ``cost_position_justifications``).

    At most one row exists per (application, cost position).
    """
    __tablename__ = "cost_position_justifications"
    id = Column(Integer, primary_key=True, autoincrement=True)
    application_id = Column(Integer, nullable=False, index=True)
    cost_position_index = Column(Integer, nullable=False)
    no_offers_required = Column(Integer, nullable=False, default=0) # Boolean as INT
    justification = Column(Text, nullable=True) # Required when no_offers_required is True
    created_at = Column(DateTime, nullable=False, default=datetime.utcnow)
    updated_at = Column(DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow)
    __table_args__ = (
        UniqueConstraint("application_id", "cost_position_index", name="uq_position_justification"),
    )
def init_db():
    """Create all tables declared on ``Base`` that do not exist yet."""
    Base.metadata.create_all(bind=engine)
# -------------------------------------------------------------
# Utils: Key-Hashing, ID-Vergabe, Rate-Limiting
# -------------------------------------------------------------
def _gen_pa_key() -> str:
# URL-sicher, ~32 Zeichen
return secrets.token_urlsafe(24)
def _hash_key(key: str, salt: Optional[str] = None) -> (str, str):
if not salt:
salt = secrets.token_hex(16) # 32 hex chars
# PBKDF2-HMAC-SHA256
dk = hashlib.pbkdf2_hmac("sha256", key.encode("utf-8"), bytes.fromhex(salt), 310000)
return salt, dk.hex()
def _verify_key(key: str, salt_hex: str, hash_hex: str) -> bool:
test = hashlib.pbkdf2_hmac("sha256", key.encode("utf-8"), bytes.fromhex(salt_hex), 310000).hex()
# timing-safe compare
return secrets.compare_digest(test, hash_hex)
def _alloc_next_id(db: Session) -> str:
    """Allocate the next application ID of the current (UTC) year.

    The year's ``Counter`` row is selected FOR UPDATE (and created on first
    use), its sequence is incremented, and the ID is returned as ``YY-NNNN``.
    The caller is expected to run this inside its own transaction.
    """
    current_year = datetime.utcnow().year

    # Lock the counter row for this year, creating it if necessary.
    lookup = select(Counter).where(Counter.year == current_year).with_for_update()
    counter = db.execute(lookup).scalar_one_or_none()
    if not counter:
        counter = Counter(year=current_year, seq=0)
        db.add(counter)
        db.flush()
        db.refresh(counter)

    counter.seq += 1
    db.flush()
    db.refresh(counter)

    short_year = current_year % 100
    return "{:02d}-{:04d}".format(short_year, counter.seq)
# sehr einfacher In-Memory-Rate-Limiter (pro Prozess)
# production: besser Redis verwenden
_RATE_BUCKETS: dict[str, List[float]] = {}
def _rate_limit(key: str, limit: int, window_sec: int = 60):
now = time.time()
bucket = _RATE_BUCKETS.setdefault(key, [])
# alte Einträge entfernen
while bucket and bucket[0] <= now - window_sec:
bucket.pop(0)
if len(bucket) >= limit:
raise HTTPException(status_code=429, detail="Rate limit exceeded")
bucket.append(now)
# -------------------------------------------------------------
# Schemas (Pydantic)
# -------------------------------------------------------------
class CreateResponse(BaseModel):
    """JSON body returned by ``create_application`` (``return_format=json``).

    This is the only response that carries the clear-text ``pa_key``.
    """
    pa_id: str
    pa_key: str
    variant: str
    status: str = "new"
class UpdateResponse(BaseModel):
    """JSON body returned by ``update_application`` (``return_format=json``)."""
    pa_id: str
    variant: str
    status: str
class SetStatusRequest(BaseModel):
    """Request body carrying a new status value (1 to 64 characters).

    NOTE(review): the consuming endpoint is not part of this section.
    """
    status: str = Field(..., min_length=1, max_length=64)
class SearchQuery(BaseModel):
    """Search filter and paging parameters.

    NOTE(review): ``search_applications`` declares its filters as individual
    query parameters; confirm whether this model is still in use.
    """
    q: Optional[str] = None
    status: Optional[str] = None
    variant: Optional[str] = None
    limit: int = 50
    offset: int = 0
class AttachmentInfo(BaseModel):
    """Attachment metadata; mirrors the ``Attachment`` columns except ``data``."""
    id: int
    filename: str
    content_type: str
    size: int
    created_at: datetime
class AttachmentUploadResponse(BaseModel):
    """Identifier, file name and size of an attachment.

    NOTE(review): presumably returned after an upload; the endpoint is not
    part of this section.
    """
    attachment_id: int
    filename: str
    size: int
class ComparisonOfferCreate(BaseModel):
    """Input model for a comparison offer.

    ``amount`` is given in EUR and must be positive (the ``ComparisonOffer``
    table stores cents). ``url``, when present, must start with ``http://``
    or ``https://``.
    """
    supplier_name: str = Field(..., min_length=1, max_length=255)
    amount: float = Field(..., gt=0) # Amount in EUR
    description: Optional[str] = None
    url: Optional[str] = Field(None, max_length=500, pattern="^https?://")
    attachment_id: Optional[int] = None
    is_preferred: Optional[bool] = False
class ComparisonOfferResponse(BaseModel):
    """Output model for a comparison offer; ``amount`` is expressed in EUR."""
    id: int
    supplier_name: str
    amount: float # Amount in EUR
    description: Optional[str]
    url: Optional[str]
    attachment_id: Optional[int]
    is_preferred: bool
    created_at: datetime
class CostPositionOffersResponse(BaseModel):
    """Offers of one cost position together with its no-offers flag and justification."""
    cost_position_index: int
    offers: List[ComparisonOfferResponse]
    no_offers_required: bool
    justification: Optional[str]
class CostPositionJustificationRequest(BaseModel):
    """Input model for the no-offers flag of a cost position.

    A supplied ``justification`` must be at least 10 characters long; the
    model itself does not require one when ``no_offers_required`` is true.
    """
    no_offers_required: bool
    justification: Optional[str] = Field(None, min_length=10)
# -------------------------------------------------------------
# Auth-Helpers
# -------------------------------------------------------------
def _auth_from_request(
    db: Session,
    pa_id: Optional[str],
    key_header: Optional[str],
    key_query: Optional[str],
    master_header: Optional[str],
    x_forwarded_for: Optional[str] = None,
) -> dict:
    """Authenticate a request by master key or by application key.

    Args:
        db: Open database session.
        pa_id: Application ID the request targets, or None.
        key_header: Application key taken from the header.
        key_query: Application key taken from the query string (fallback).
        master_header: Supplied master key, if any.
        x_forwarded_for: Client IP; used only to decide whether the per-key
            rate limit is skipped.

    Returns:
        ``{"scope": "master"}``, ``{"scope": "public"}`` (no master key and no
        ``pa_id``) or ``{"scope": "app", "app": <Application>}``. Callers that
        need privileged access must check the scope themselves.

    Raises:
        HTTPException: 401 missing key, 403 wrong key, 404 unknown
            application, 429 rate limit exceeded.
    """
    if master_header:
        # NOTE(review): the bucket is keyed by the supplied value, so every
        # distinct guess gets its own bucket.
        if not should_skip_rate_limit(x_forwarded_for):
            _rate_limit(f"MASTER:{master_header}", RATE_KEY_PER_MIN)
        # Constant-time comparison so response timing does not leak how much of
        # the key matched. Bytes are compared because compare_digest rejects
        # non-ASCII str arguments.
        if not MASTER_KEY or not secrets.compare_digest(
            master_header.encode("utf-8"), MASTER_KEY.encode("utf-8")
        ):
            raise HTTPException(status_code=403, detail="Invalid master key")
        return {"scope": "master"}

    supplied = key_header or key_query
    if pa_id is None:
        # Public endpoints (e.g. create without an ID) need no key.
        return {"scope": "public"}
    if not supplied:
        raise HTTPException(status_code=401, detail="Missing key")

    if not should_skip_rate_limit(x_forwarded_for):
        _rate_limit(f"APPKEY:{pa_id}", RATE_KEY_PER_MIN)

    app = db.execute(select(Application).where(Application.pa_id == pa_id)).scalar_one_or_none()
    if not app:
        raise HTTPException(status_code=404, detail="Application not found")
    if not _verify_key(supplied, app.pa_key_salt, app.pa_key_hash):
        raise HTTPException(status_code=403, detail="Invalid application key")
    return {"scope": "app", "app": app}
# -------------------------------------------------------------
# FastAPI Setup
# -------------------------------------------------------------
# Application object on which the route decorators in this module register their handlers.
app = FastAPI(title="STUPA PDF API", version="1.0.0")
@app.on_event("startup")
def _startup():
    # Make sure the tables exist before the first request is served.
    init_db()
@app.get("/")
def root():
    """Root endpoint for health checks"""
    # Static body; no database access and no rate limiting on this route.
    return {"message": "STUPA PDF API"}
# Helper to decide whether rate limiting is skipped for a client address.
# Only the private 172.16.0.0/12 block is exempt; the rest of 172.0.0.0/8 is
# publicly routable address space.
_PRIVATE_172_NET = ipaddress.ip_network("172.16.0.0/12")


def should_skip_rate_limit(ip: str = "") -> bool:
    """Return True when rate limiting should be skipped for ``ip``.

    Exempt are the literal values ``127.0.0.1``, ``localhost``, ``172.26.0.1``
    and ``unknown`` plus every IPv4 address inside 172.16.0.0/12. An empty or
    missing value, or anything that is not a valid IP address, is not exempt.

    NOTE(review): callers pass the X-Forwarded-For header here, which a client
    can set freely unless a trusted proxy overwrites it.
    """
    if not ip:
        return False
    if ip in ("127.0.0.1", "localhost", "172.26.0.1", "unknown"):
        return True
    try:
        address = ipaddress.ip_address(ip.strip())
    except ValueError:
        return False
    return address.version == 4 and address in _PRIVATE_172_NET
# Global (very simple) per-request rate limit by client IP.
def rate_limit_ip(ip: str):
    """Apply the per-IP rate limit unless the address is exempt.

    An empty ``ip`` is treated as ``"unknown"``. Exemption is decided by
    ``should_skip_rate_limit``; otherwise the hit is counted in bucket
    ``IP:<ip>`` with a limit of ``RATE_IP_PER_MIN``.
    """
    client = ip or "unknown"
    if not should_skip_rate_limit(client):
        _rate_limit(f"IP:{client}", RATE_IP_PER_MIN)
def get_db():
    """Yield a new database session and close it once the caller is done.

    Used with ``Depends(get_db)`` by the endpoints of this module.
    """
    # Leaving the session's context manager closes it, even on errors.
    with SessionLocal() as session:
        yield session
# -------------------------------------------------------------
# Hilfen: Payload-Erzeugung aus Upload
# -------------------------------------------------------------
def _payload_from_pdf_bytes(tmp_path: str, variant: Optional[str]) -> Dict[str, Any]:
    """Parse the PDF at ``tmp_path`` into a nested payload dict.

    ``core.pdf_to_payload`` returns a dataclass; it is converted with
    ``dataclasses.asdict`` so the result contains the nested ``pa`` object.

    Raises:
        HTTPException: 400 when the PDF cannot be read.
    """
    from dataclasses import asdict

    try:
        parsed = core.pdf_to_payload(tmp_path, variant=variant)
        as_dict = asdict(parsed)
    except PdfReadError as e:
        raise HTTPException(status_code=400, detail=f"PDF parse error: {e}")
    return as_dict
def _payload_from_form_json(form_json: Dict[str, Any], variant: Optional[str]) -> Tuple[Dict[str, Any], str]:
    """Convert raw form JSON into a payload dict.

    The variant is auto-detected through ``core.detect_variant`` when it is
    None or exactly ``"AUTO"``; any other value is used unchanged.

    Returns:
        ``(payload_dict, variant_used)``.
    """
    from dataclasses import asdict

    needs_detection = variant is None or variant == "AUTO"
    if needs_detection:
        detected_variant = core.detect_variant(form_json)
        print(f"DEBUG - Variant auto-detected as: {detected_variant}")
    else:
        detected_variant = variant
        print(f"DEBUG - Variant explicitly set to: {detected_variant}")

    # Raw form -> mapped dict -> model dataclass -> plain nested dict.
    model = core.payload_to_model(core.map_form_to_payload(form_json, detected_variant))
    return asdict(model), detected_variant
def _inject_meta_for_render(payload: Dict[str, Any], pa_id: str, pa_key: Optional[str]) -> Dict[str, Any]:
# Wir injizieren Key/ID NUR für die PDF-Generierung in payload['pa'].*,
# speichern aber den Key nicht im DB-Payload.
p2 = json.loads(json.dumps(payload)) # deep copy
p2.setdefault("pa", {}).setdefault("meta", {})
p2["pa"]["meta"]["id"] = pa_id
if pa_key is not None:
p2["pa"]["meta"]["key"] = pa_key
# Calculate total amount from costs dynamically
project = p2.get("pa", {}).get("project", {}) or {}
costs = project.get("costs", [])
total_amount = 0.0
for cost in costs:
if isinstance(cost, dict) and "amountEur" in cost:
amount = cost.get("amountEur")
if amount is not None and isinstance(amount, (int, float)):
total_amount += float(amount)
# Set the calculated total
project.setdefault("totals", {})
project["totals"]["requestedAmountEur"] = total_amount
return p2
def _sanitize_payload_for_db(payload: Dict[str, Any]) -> Dict[str, Any]:
# Key aus persistentem Payload entfernen/neutralisieren
p2 = json.loads(json.dumps(payload))
meta = p2.setdefault("pa", {}).setdefault("meta", {})
if "key" in meta:
meta["key"] = None
# Remove calculated total from database storage
project = p2.get("pa", {}).get("project", {}) or {}
if "totals" in project and "requestedAmountEur" in project["totals"]:
del project["totals"]["requestedAmountEur"]
return p2
# -------------------------------------------------------------
# Endpunkte
# -------------------------------------------------------------
def _detect_variant_from_payload(payload: Dict[str, Any]) -> str:
    """Guess the variant of a PDF-derived payload from its financing section.

    Returns "QSM" only when the QSM section has content (``code`` or ``flags``)
    and the VSM section has none; every other combination yields "VSM".
    """
    pa_data = payload.get("pa") or {}
    project_data = pa_data.get("project") or {}
    financing_data = project_data.get("financing") or {}
    qsm_data = financing_data.get("qsm") or {}
    vsm_data = financing_data.get("vsm") or {}
    has_qsm_content = bool(qsm_data.get("code") or qsm_data.get("flags"))
    has_vsm_content = bool(vsm_data and any(vsm_data.values()))
    return "QSM" if has_qsm_content and not has_vsm_content else "VSM"


@app.post("/applications", response_model=CreateResponse, responses={200: {"content": {"application/pdf": {}}}})
def create_application(
    response: Response,
    variant: Optional[str] = Query(None, description="QSM|VSM|AUTO"),
    return_format: str = Query("pdf", pattern="^(pdf|json)$"),
    pdf: Optional[UploadFile] = File(None, description="PDF Upload (Alternative zu form_json)"),
    form_json_b64: Optional[str] = Form(None, description="Base64-kodiertes Roh-Form-JSON (Alternative zu Datei)"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    # Create a new application from an uploaded PDF or base64 encoded form
    # JSON. A new ID and key are allocated, the filled PDF is rendered, and
    # the payload is stored without the key. The reply is either a
    # CreateResponse (return_format=json) or the PDF with the ID and key in
    # the X-PA-ID / X-PA-KEY response headers.
    # Errors: 400 invalid input or payload already carrying an ID, 409 ID
    # allocation conflict, 429 rate limit, 500 PDF generation failure.

    # Rate limit by client IP
    rate_limit_ip(x_forwarded_for or "")
    print(f"DEBUG CREATE - Received variant parameter: '{variant}'")

    # Obtain the payload
    payload: Dict[str, Any]
    raw_form: Optional[Dict[str, Any]] = None
    detected_variant: Optional[str] = None
    if pdf:
        # The parser needs a file path, so spool the upload to a temp file.
        with tempfile.NamedTemporaryFile(delete=True, suffix=".pdf") as tf:
            tf.write(pdf.file.read())
            tf.flush()
            payload = _payload_from_pdf_bytes(tf.name, variant)
    elif form_json_b64:
        try:
            raw = base64.b64decode(form_json_b64)
            raw_form = json.loads(raw.decode("utf-8"))
            financing_fields = {k: v for k, v in raw_form.items() if 'financing' in k.lower() or 'qsm' in k.lower() or 'vsm' in k.lower()}
            print(f"DEBUG CREATE - Raw form financing fields: {json.dumps(financing_fields, indent=2, default=str)}")
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Invalid form_json_b64: {e}") from e
        payload, detected_variant = _payload_from_form_json(raw_form, variant or "AUTO")
    else:
        raise HTTPException(status_code=400, detail="Provide either PDF file or form_json_b64")

    # Create is only allowed when the payload carries no pa.meta.id yet.
    pa_data = payload.get("pa") or {}
    pa_meta = pa_data.get("meta") or {}
    if pa_meta.get("id"):
        raise HTTPException(status_code=400, detail="pa-id already set; use update endpoint")

    financing_data = (pa_data.get("project") or {}).get("financing", {})
    print(f"DEBUG CREATE - Financing data: {json.dumps(financing_data, indent=2, default=str)}")

    # Create inside one transaction
    try:
        with db.begin():
            pa_id = _alloc_next_id(db)
            pa_key_plain = _gen_pa_key()
            salt, key_hash = _hash_key(pa_key_plain)

            # Variant precedence: explicit parameter, then form detection,
            # then PDF content, then the VSM default.
            if variant and variant.upper() not in ["AUTO", "COMMON"]:
                detected = variant.upper()
            elif detected_variant:
                detected = detected_variant.upper()
            elif pdf:
                detected = _detect_variant_from_payload(payload)
            else:
                detected = "VSM"
            # Map COMMON to VSM for backwards compatibility
            if detected == "COMMON":
                detected = "VSM"

            # Render payload including ID and key, then produce the PDF
            render = _inject_meta_for_render(payload, pa_id, pa_key_plain)
            pdf_bytes = fill_pdf(render, "QSM" if detected == "QSM" else "VSM")
            if not pdf_bytes:
                raise HTTPException(status_code=500, detail="Failed to generate PDF - empty result")
            if len(pdf_bytes) < 1000:  # PDF should be at least 1KB
                raise HTTPException(status_code=500, detail="PDF generation resulted in suspiciously small file")

            # The stored payload never contains the key
            store_payload = _sanitize_payload_for_db(payload)
            app_row = Application(
                pa_id=pa_id,
                pa_key_salt=salt,
                pa_key_hash=key_hash,
                variant=detected,
                status="new",
                payload_json=store_payload,
                raw_form_json=raw_form,
            )
            db.add(app_row)
    except IntegrityError as exc:
        # Rare race during ID allocation; the client should retry.
        raise HTTPException(status_code=409, detail="ID allocation conflict; retry") from exc

    if return_format == "json":
        return CreateResponse(pa_id=pa_id, pa_key=pa_key_plain, variant=detected, status="new")

    # Headers must be set on the returned response object itself: values put
    # on the injected `response` parameter are dropped when a Response
    # instance is returned directly.
    headers = {
        "Content-Disposition": f"attachment; filename=antrag-{pa_id}.pdf",
        "X-PA-ID": pa_id,
        "X-PA-KEY": pa_key_plain,
    }
    return StreamingResponse(io.BytesIO(pdf_bytes), media_type="application/pdf", headers=headers)
@app.put("/applications/{pa_id}", response_model=UpdateResponse, responses={200: {"content": {"application/pdf": {}}}})
def update_application(
    pa_id: str,
    response: Response,
    return_format: str = Query("pdf", pattern="^(pdf|json)$"),
    variant: Optional[str] = Query(None),
    pdf: Optional[UploadFile] = File(None),
    form_json_b64: Optional[str] = Form(None),
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    # Replace the payload of an existing application from a PDF or form JSON.
    # Access needs the application key or the master key. Applications in
    # status in-review/approved/rejected can only be changed with the master
    # key. The reply is an UpdateResponse (return_format=json) or the
    # re-rendered PDF with the ID in the X-PA-ID header.
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key, None, x_master_key, x_forwarded_for)

    # With the master key the auth helper does not load the row.
    app_row: Application = auth.get("app")
    if app_row is None and auth["scope"] == "master":
        app_row = db.execute(select(Application).where(Application.pa_id == pa_id)).scalar_one_or_none()
    if app_row is None:
        raise HTTPException(status_code=404, detail="Application not found")

    # Final states are locked for everyone but the master key holder.
    if app_row.status in ["in-review", "approved", "rejected"] and auth["scope"] != "master":
        status_messages = {
            "in-review": "Cannot update application while in review",
            "approved": "Cannot update approved application",
            "rejected": "Cannot update rejected application"
        }
        raise HTTPException(status_code=403, detail=status_messages[app_row.status])

    # Obtain the payload
    requested_variant = variant or app_row.variant
    payload: Dict[str, Any]
    raw_form: Optional[Dict[str, Any]] = None
    detected_variant: Optional[str] = None
    if pdf:
        # The parser needs a file path, so spool the upload to a temp file.
        with tempfile.NamedTemporaryFile(delete=True, suffix=".pdf") as tf:
            tf.write(pdf.file.read())
            tf.flush()
            payload = _payload_from_pdf_bytes(tf.name, requested_variant)
    elif form_json_b64:
        try:
            raw = base64.b64decode(form_json_b64)
            raw_form = json.loads(raw.decode("utf-8"))
            financing_fields = {k: v for k, v in raw_form.items() if 'financing' in k.lower() or 'qsm' in k.lower() or 'vsm' in k.lower()}
            print(f"DEBUG UPDATE - Raw form financing fields: {json.dumps(financing_fields, indent=2, default=str)}")
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Invalid form_json_b64: {e}") from e
        payload, detected_variant = _payload_from_form_json(raw_form, requested_variant)
    else:
        raise HTTPException(status_code=400, detail="Provide either PDF file or form_json_b64")

    financing_data = ((payload.get("pa") or {}).get("project") or {}).get("financing", {})
    print(f"DEBUG UPDATE - Financing data: {json.dumps(financing_data, indent=2, default=str)}")

    # Always render with the existing ID; the key is neither re-issued nor stored.
    render = _inject_meta_for_render(payload, app_row.pa_id, None)
    store_payload = _sanitize_payload_for_db(payload)

    # AUTO/COMMON are request placeholders, not storable variants: resolve
    # them to the detected or existing variant, falling back to VSM as the
    # create endpoint does.
    chosen_variant = requested_variant.upper()
    if chosen_variant in ("AUTO", "COMMON"):
        chosen_variant = (detected_variant or app_row.variant).upper()
    if chosen_variant in ("AUTO", "COMMON"):
        chosen_variant = "VSM"

    pdf_bytes = fill_pdf(render, "QSM" if chosen_variant == "QSM" else "VSM")
    if not pdf_bytes:
        raise HTTPException(status_code=500, detail="Failed to generate PDF - empty result")
    if len(pdf_bytes) < 1000:  # PDF should be at least 1KB
        raise HTTPException(status_code=500, detail="PDF generation resulted in suspiciously small file")

    try:
        app_row.variant = chosen_variant
        app_row.updated_at = datetime.utcnow()
        app_row.payload_json = store_payload
        if raw_form is not None:
            app_row.raw_form_json = raw_form
        db.add(app_row)
        db.commit()
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=f"Failed to update application: {str(e)}") from e

    if return_format == "json":
        return UpdateResponse(pa_id=app_row.pa_id, variant=app_row.variant, status=app_row.status)

    # Headers must be set on the returned response object itself: values put
    # on the injected `response` parameter are dropped when a Response
    # instance is returned directly.
    headers = {
        "Content-Disposition": f"attachment; filename=antrag-{app_row.pa_id}.pdf",
        "X-PA-ID": app_row.pa_id,
    }
    return StreamingResponse(io.BytesIO(pdf_bytes), media_type="application/pdf", headers=headers)
def _search_total_amount(raw_payload: Any) -> float:
    """Sum the numeric ``amountEur`` values in ``pa.project.costs``.

    Accepts the payload as a dict or as a JSON string; returns 0.0 when the
    payload is missing, malformed or has no costs.
    """
    try:
        payload = json.loads(raw_payload) if isinstance(raw_payload, (str, bytes, bytearray)) else raw_payload
        if not isinstance(payload, dict):
            return 0.0
        project = (payload.get("pa") or {}).get("project") or {}
        total = 0.0
        for cost in project.get("costs") or []:
            if isinstance(cost, dict):
                amount = cost.get("amountEur")
                if isinstance(amount, (int, float)):
                    total += float(amount)
        return total
    except (ValueError, TypeError, AttributeError):
        return 0.0


@app.get("/applications/search")
def search_applications(
    q: Optional[str] = Query(None, description="Volltext über payload_json (einfach)"),
    status: Optional[str] = Query(None),
    variant: Optional[str] = Query(None),
    amount_min: Optional[float] = Query(None, description="Mindestbetrag"),
    amount_max: Optional[float] = Query(None, description="Höchstbetrag"),
    date_from: Optional[str] = Query(None, description="Erstellungsdatum ab (ISO format)"),
    date_to: Optional[str] = Query(None, description="Erstellungsdatum bis (ISO format)"),
    created_by: Optional[str] = Query(None, description="Ersteller (E-Mail)"),
    has_attachments: Optional[bool] = Query(None, description="Mit/ohne Anhänge"),
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
    order_by: Optional[str] = Query("created_at", description="Sort by: pa_id, project_name, variant, status, total_amount, created_at, updated_at"),
    order: Optional[str] = Query("desc", description="Sort order: asc, desc"),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    # Search across all applications; requires the master key.
    # NOTE: the amount filters and the total_amount ordering are applied after
    # LIMIT/OFFSET, so they only act on the rows of the fetched page.
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, None, None, None, x_master_key, x_forwarded_for)
    # Without a master key the helper reports scope "public"; this endpoint
    # exposes every application, so anything but master access is rejected.
    if auth.get("scope") != "master":
        raise HTTPException(status_code=401, detail="Master key required")

    # Very simple search (MySQL JSON_EXTRACT/LIKE); migrate to full-text search
    # for production. payload_json is selected here so the totals can be
    # computed without one extra query per row.
    base_sql = """
        SELECT a.pa_id, a.variant, a.status, a.created_at, a.updated_at,
               JSON_UNQUOTE(JSON_EXTRACT(a.payload_json, '$.pa.project.name')) as project_name,
               COALESCE(att_count.attachment_count, 0) as attachment_count,
               a.payload_json
        FROM applications a
        LEFT JOIN (
            SELECT aa.application_id, COUNT(*) as attachment_count
            FROM application_attachments aa
            GROUP BY aa.application_id
        ) att_count ON a.id = att_count.application_id
        WHERE 1=1"""
    params: Dict[str, Any] = {}
    if status:
        base_sql += " AND a.status=:status"
        params["status"] = status
    if variant:
        base_sql += " AND a.variant=:variant"
        params["variant"] = variant.upper()
    if q:
        # Naive search over the whole JSON document
        base_sql += " AND JSON_SEARCH(JSON_EXTRACT(a.payload_json, '$'), 'all', :q) IS NOT NULL"
        params["q"] = f"%{q}%"

    # Date range filters; a plain YYYY-MM-DD value is widened to the whole day.
    if date_from:
        lower_bound = date_from
        if len(date_from) == 10 and '-' in date_from:
            try:
                lower_bound = datetime.strptime(date_from, "%Y-%m-%d").strftime("%Y-%m-%d 00:00:00")
            except ValueError:
                lower_bound = date_from  # unparseable: pass the raw value through
        base_sql += " AND a.created_at >= :date_from"
        params["date_from"] = lower_bound
    if date_to:
        try:
            if len(date_to) == 10 and '-' in date_to:
                upper_bound = datetime.strptime(date_to, "%Y-%m-%d").strftime("%Y-%m-%d 23:59:59")
            else:
                # Full ISO timestamp: still extended to the end of that day
                end_date = datetime.fromisoformat(date_to.replace('Z', '+00:00'))
                end_date = end_date.replace(hour=23, minute=59, second=59, microsecond=999999)
                upper_bound = end_date.isoformat()
        except ValueError:
            upper_bound = date_to  # unparseable: pass the raw value through
        base_sql += " AND a.created_at <= :date_to"
        params["date_to"] = upper_bound

    # Created-by filter: fuzzy match on the applicant e-mail
    if created_by:
        base_sql += " AND LOWER(JSON_UNQUOTE(JSON_EXTRACT(a.payload_json, '$.pa.applicant.contact.email'))) LIKE LOWER(:created_by)"
        params["created_by"] = f"%{created_by}%"

    if has_attachments is not None:
        if has_attachments:
            base_sql += " AND att_count.attachment_count > 0"
        else:
            base_sql += " AND (att_count.attachment_count IS NULL OR att_count.attachment_count = 0)"

    # Sorting: only whitelisted column names are interpolated into the SQL.
    valid_db_sort_fields = {
        "pa_id": "pa_id",
        "variant": "variant",
        "status": "status",
        "created_at": "created_at",
        "updated_at": "updated_at",
        "project_name": "project_name"
    }
    db_sort_field = valid_db_sort_fields.get(order_by, "created_at")
    sort_order = order.upper() if order and order.upper() in ['ASC', 'DESC'] else 'DESC'
    base_sql += f" ORDER BY {db_sort_field} {sort_order} LIMIT :limit OFFSET :offset"
    params["limit"] = limit
    params["offset"] = offset

    rows = db.execute(sql_text(base_sql), params).all()

    result = []
    for row in rows:
        total_amount = _search_total_amount(row[7])
        if amount_min is not None and total_amount < amount_min:
            continue
        if amount_max is not None and total_amount > amount_max:
            continue
        result.append({
            "pa_id": row[0],
            "variant": row[1],
            "status": row[2],
            "created_at": row[3].isoformat(),
            "updated_at": row[4].isoformat(),
            "project_name": row[5] if row[5] else None,
            "total_amount": total_amount
        })

    # total_amount is not a database column, so sort it here.
    if order_by == "total_amount":
        reverse_order = (order.lower() == 'desc') if order else True
        result.sort(key=lambda item: item["total_amount"] or 0, reverse=reverse_order)
    return result
@app.get("/applications/{pa_id}")
def get_application(
    pa_id: str,
    format: str = Query("json", pattern="^(json|pdf)$"),
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    key: Optional[str] = Query(None, description="Alternative zum Header für den App-Key"),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    # Return one application as JSON or as a freshly rendered PDF. Access
    # needs the application key (header or `key` query parameter) or the
    # master key.
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key or key, None, x_master_key, x_forwarded_for)
    is_master = auth["scope"] == "master"

    app_row: Application = auth.get("app")
    if not app_row and not is_master:
        raise HTTPException(status_code=404, detail="Application not found")
    if is_master and not app_row:
        # With the master key the auth helper does not load the row.
        lookup = select(Application).where(Application.pa_id == pa_id)
        app_row = db.execute(lookup).scalar_one_or_none()
    if not app_row:
        raise HTTPException(status_code=404, detail="Application not found")

    if format != "pdf":
        stored = app_row.payload_json
        if stored:
            financing_data = stored.get("pa", {}).get("project", {}).get("financing", {})
            print(f"DEBUG GET - Financing data from DB: {json.dumps(financing_data, indent=2, default=str)}")
        # Legacy variant name COMMON is reported as VSM.
        shown_variant = "VSM" if app_row.variant == "COMMON" else app_row.variant
        return {
            "pa_id": app_row.pa_id,
            "variant": shown_variant,
            "status": app_row.status,
            "payload": app_row.payload_json,
            "created_at": app_row.created_at.isoformat(),
            "updated_at": app_row.updated_at.isoformat()
        }

    # Re-render the PDF for display; the key is not part of it.
    render = _inject_meta_for_render(app_row.payload_json, app_row.pa_id, None)
    template_variant = "QSM" if app_row.variant == "QSM" else "VSM"
    pdf_bytes = fill_pdf(render, template_variant)
    if not pdf_bytes:
        raise HTTPException(status_code=500, detail="Failed to generate PDF - empty result")
    if len(pdf_bytes) < 1000:  # PDF should be at least 1KB
        raise HTTPException(status_code=500, detail="PDF generation resulted in suspiciously small file")
    disposition = f"attachment; filename=antrag-{app_row.pa_id}.pdf"
    return StreamingResponse(
        io.BytesIO(pdf_bytes),
        media_type="application/pdf",
        headers={"Content-Disposition": disposition},
    )
def _summarize_payload(payload_json: Any) -> Tuple[str, float]:
    """Extract ``(project_name, total_amount)`` from a stored application payload.

    The payload may be a JSON string or an already-decoded dict. Extraction is
    best effort: a malformed payload yields whatever could be read before the
    problem was hit (at worst ``("", 0.0)``) instead of failing the listing.
    """
    project_name = ""
    total_amount = 0.0
    if not payload_json:
        return project_name, total_amount
    try:
        payload = json.loads(payload_json) if isinstance(payload_json, str) else payload_json
        project = payload.get("pa", {}).get("project", {})
        project_name = project.get("name", "")
        # Sum only numeric "amountEur" entries; anything else is ignored.
        for cost in project.get("costs", []):
            if isinstance(cost, dict) and "amountEur" in cost:
                amount = cost.get("amountEur")
                if amount is not None and isinstance(amount, (int, float)):
                    total_amount += float(amount)
    except (ValueError, TypeError, AttributeError, OverflowError):
        # Deliberately tolerant of broken payload shapes, but (unlike a bare
        # ``except``) this no longer swallows KeyboardInterrupt/SystemExit.
        pass
    return project_name, total_amount


def _application_list_item(row: Application) -> Dict[str, Any]:
    """Build the list-view dict for one application row."""
    project_name, total_amount = _summarize_payload(row.payload_json)
    return {
        "pa_id": row.pa_id,
        # "COMMON" is reported to clients as "VSM".
        "variant": "VSM" if row.variant == "COMMON" else row.variant,
        "status": row.status,
        "project_name": project_name,
        "total_amount": total_amount,
        "created_at": row.created_at.isoformat(),
        "updated_at": row.updated_at.isoformat(),
    }


@app.get("/applications")
def list_applications(
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
    status: Optional[str] = Query(None),
    variant: Optional[str] = Query(None),
    order_by: Optional[str] = Query("created_at", description="Sort by: pa_id, project_name, variant, status, total_amount, created_at, updated_at"),
    order: Optional[str] = Query("desc", description="Sort order: asc, desc"),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    pa_id: Optional[str] = Query(None, description="Mit Key: nur diesen Antrag anzeigen"),
    key: Optional[str] = Query(None),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """List applications.

    With a master key, returns a filtered, sorted and paginated list of all
    applications. Without one, ``pa_id`` plus the application key are required
    and the result is a single-element list containing that application.

    Raises:
        HTTPException 400: no master key and no ``pa_id`` supplied.
        HTTPException 404: the key-authenticated application was not resolved.
    """
    rate_limit_ip(x_forwarded_for or "")

    # Without master key: only the caller's own application (pa_id + key).
    # Sorting/paging parameters are accepted for API consistency but have no
    # effect on a single-element result.
    if not x_master_key:
        if not pa_id:
            raise HTTPException(status_code=400, detail="pa_id required without master key")
        auth = _auth_from_request(db, pa_id, x_pa_key or key, None, None)
        app_row: Application = auth.get("app")
        if not app_row:
            raise HTTPException(status_code=404, detail="Application not found")
        return [_application_list_item(app_row)]

    # With master key: list/filter everything.
    _ = _auth_from_request(db, None, None, None, x_master_key)

    # Columns that can be sorted in SQL; unknown names fall back to created_at.
    valid_sort_fields = {
        "pa_id": Application.pa_id,
        "variant": Application.variant,
        "status": Application.status,
        "created_at": Application.created_at,
        "updated_at": Application.updated_at,
    }
    sort_field = valid_sort_fields.get(order_by, Application.created_at)
    sort_order = order.lower() if order and order.lower() in ("asc", "desc") else "desc"
    descending = sort_order == "desc"

    q = select(Application).order_by(sort_field.desc() if descending else sort_field.asc())
    if status:
        q = q.where(Application.status == status)
    if variant:
        q = q.where(Application.variant == variant.upper())
    q = q.limit(limit).offset(offset)

    result = [_application_list_item(r) for r in db.execute(q).scalars().all()]

    # project_name / total_amount live inside the JSON payload, so they are
    # sorted in Python. NOTE: this only orders the rows of the current page.
    # The same validated direction as the SQL sort is used, so an invalid
    # ``order`` value consistently means "desc".
    if order_by == "project_name":
        result.sort(key=lambda item: str(item["project_name"] or "").lower(), reverse=descending)
    elif order_by == "total_amount":
        result.sort(key=lambda item: item["total_amount"] or 0, reverse=descending)
    return result
@app.post("/applications/{pa_id}/status")
def set_status(
    pa_id: str,
    req: SetStatusRequest,
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    key: Optional[str] = Query(None),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Set the status of one application and bump its ``updated_at``.

    Returns ``{"pa_id": ..., "status": ...}``. Responds 404 when the
    application cannot be resolved and 500 when the database write fails.
    """
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key or key, None, x_master_key, x_forwarded_for)

    is_master = auth["scope"] == "master"
    app_row: Application = auth.get("app")
    if not app_row:
        if not is_master:
            raise HTTPException(status_code=404, detail="Application not found")
        # Master scope: the auth result carried no row, so look it up by pa_id.
        lookup = select(Application).where(Application.pa_id == pa_id)
        app_row = db.execute(lookup).scalar_one_or_none()
        if not app_row:
            raise HTTPException(status_code=404, detail="Application not found")

    try:
        app_row.status = req.status
        app_row.updated_at = datetime.utcnow()
        db.add(app_row)
        db.commit()
    except Exception as exc:
        db.rollback()
        failure = f"Failed to update status: {str(exc)}"
        raise HTTPException(status_code=500, detail=failure)

    return {"pa_id": app_row.pa_id, "status": app_row.status}
@app.delete("/applications/{pa_id}")
def delete_application(
    pa_id: str,
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    key: Optional[str] = Query(None),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Delete an application together with its dependent rows.

    Returns ``{"deleted": True, "pa_id": ...}``. Responds 404 when the
    application cannot be resolved and 500 when the database write fails.
    """
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key or key, None, x_master_key, x_forwarded_for)
    app_row: Application = auth.get("app")
    if not app_row and auth["scope"] != "master":
        raise HTTPException(status_code=404, detail="Application not found")
    if auth["scope"] == "master" and not app_row:
        app_row = db.execute(select(Application).where(Application.pa_id == pa_id)).scalar_one_or_none()
        if not app_row:
            raise HTTPException(status_code=404, detail="Application not found")
    try:
        # Remove dependent rows first, exactly as the bulk delete endpoint
        # does, so both delete paths behave the same and no rows referencing
        # the application are left behind. Everything happens in one
        # transaction and is rolled back together on failure.
        params = {"app_id": app_row.id}
        db.execute(text("DELETE FROM application_attachments WHERE application_id = :app_id"), params)
        db.execute(text("DELETE FROM comparison_offers WHERE application_id = :app_id"), params)
        db.execute(text("DELETE FROM cost_position_justifications WHERE application_id = :app_id"), params)
        db.delete(app_row)
        db.commit()
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=f"Failed to delete application: {str(e)}")
    return {"deleted": True, "pa_id": pa_id}
@app.post("/applications/{pa_id}/reset-credentials")
def reset_credentials(
    pa_id: str,
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Reset access credentials for an application (Admin only)

    Generates a fresh application key, stores its salt and hash, and returns
    the plaintext key in the response.

    Raises:
        HTTPException 403: master key missing or wrong.
        HTTPException 404: no application with this ``pa_id``.
        HTTPException 500: the database write failed.
    """
    rate_limit_ip(x_forwarded_for or "")

    # Verify the master key with a constant-time comparison so the response
    # time does not leak how much of the key matched. Comparing bytes avoids
    # the TypeError compare_digest raises for non-ASCII str input.
    if (
        not x_master_key
        or not MASTER_KEY
        or not secrets.compare_digest(x_master_key.encode("utf-8"), MASTER_KEY.encode("utf-8"))
    ):
        raise HTTPException(status_code=403, detail="Invalid or missing master key")

    # Look up the application.
    app_row = db.query(Application).filter(Application.pa_id == pa_id).first()
    if not app_row:
        raise HTTPException(status_code=404, detail="Application not found")

    # Generate new credentials; only salt and hash are persisted.
    pa_key_plain = _gen_pa_key()
    salt, key_hash = _hash_key(pa_key_plain)
    app_row.pa_key_salt = salt
    app_row.pa_key_hash = key_hash
    try:
        db.commit()
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=f"Failed to reset credentials: {str(e)}")
    return {
        "pa_id": pa_id,
        "pa_key": pa_key_plain,
        "message": "Credentials reset successfully",
    }
# -------------------------------------------------------------
# Bulk Operations Endpoints
# -------------------------------------------------------------
class BulkOperationRequest(BaseModel):
    """Request body of the admin bulk-operation endpoint."""

    # Public application identifiers (pa_id) the operation is applied to.
    pa_ids: List[str]
    # One of: "delete", "approve", "reject", "set_in_review", "set_new"
    operation: str
@app.post("/admin/applications/bulk")
def bulk_operation(
    request: BulkOperationRequest,
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Perform bulk operations on applications (Admin only)

    Applies ``request.operation`` to every ID in ``request.pa_ids`` and
    returns ``{"success": [pa_id, ...], "failed": [{"pa_id", "error"}, ...]}``.

    Each application is processed inside its own SAVEPOINT, so a database
    error on one item is rolled back in isolation and cannot poison the
    transaction for the remaining items.

    Raises:
        HTTPException 400: empty ID list or unknown operation.
        HTTPException 500: the final commit failed (nothing was persisted).
    """
    rate_limit_ip(x_forwarded_for or "")
    _ = _auth_from_request(db, None, None, None, x_master_key, x_forwarded_for)

    if not request.pa_ids:
        raise HTTPException(status_code=400, detail="No application IDs provided")

    # Status-changing operations and the status value each one sets.
    status_by_operation = {
        "approve": "approved",
        "reject": "rejected",
        "set_in_review": "in-review",
        "set_new": "new",
    }
    if request.operation != "delete" and request.operation not in status_by_operation:
        raise HTTPException(status_code=400, detail="Invalid operation")

    # Dependent tables that must be emptied before an application is deleted.
    related_deletes = (
        "DELETE FROM application_attachments WHERE application_id = :app_id",
        "DELETE FROM comparison_offers WHERE application_id = :app_id",
        "DELETE FROM cost_position_justifications WHERE application_id = :app_id",
    )

    results = {"success": [], "failed": []}
    for pa_id in request.pa_ids:
        try:
            with db.begin_nested():
                application = db.query(Application).filter(Application.pa_id == pa_id).first()
                if not application:
                    results["failed"].append({"pa_id": pa_id, "error": "Application not found"})
                    continue
                if request.operation == "delete":
                    params = {"app_id": application.id}
                    for statement in related_deletes:
                        db.execute(text(statement), params)
                    db.delete(application)
                else:
                    application.status = status_by_operation[request.operation]
                    # Keep the timestamp in step with the single-item
                    # status endpoint.
                    application.updated_at = datetime.utcnow()
                # Flush inside the savepoint so errors surface for this item
                # instead of at the final commit.
                db.flush()
        except Exception as e:
            results["failed"].append({"pa_id": pa_id, "error": str(e)})
            continue
        results["success"].append(pa_id)

    try:
        db.commit()
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=f"Failed to commit bulk operation: {str(e)}")
    return results
# -------------------------------------------------------------
# Attachment Endpoints
# -------------------------------------------------------------
@app.post("/applications/{pa_id}/attachments", response_model=AttachmentUploadResponse)
async def upload_attachment(
    pa_id: str,
    file: UploadFile = File(...),
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    key: Optional[str] = Query(None),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Upload an attachment for an application

    The file is stored base64-encoded and linked to the application.

    Raises:
        HTTPException 401: the auth helper returned a falsy result.
        HTTPException 403: application is locked (in review / approved /
            rejected) and the caller is not using master scope.
        HTTPException 404: no application with this ``pa_id``.
        HTTPException 400: attachment count or total size limit exceeded.
    """
    max_attachments = 30
    max_total_bytes = 100 * 1024 * 1024  # 100MB

    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key or key, None, x_master_key, x_forwarded_for)
    if not auth:
        raise HTTPException(status_code=401, detail="Unauthorized")

    # Check if application exists
    application = db.query(Application).filter(Application.pa_id == pa_id).first()
    if not application:
        raise HTTPException(status_code=404, detail="Application not found")

    # Locked applications may only be modified with master scope.
    if application.status in ["in-review", "approved", "rejected"] and auth["scope"] != "master":
        status_messages = {
            "in-review": "Cannot upload attachments while application is in review",
            "approved": "Cannot upload attachments to approved application",
            "rejected": "Cannot upload attachments to rejected application"
        }
        raise HTTPException(status_code=403, detail=status_messages[application.status])

    # Check attachment count limit (30 attachments max)
    attachment_count = db.query(ApplicationAttachment).filter(
        ApplicationAttachment.application_id == application.id
    ).count()
    if attachment_count >= max_attachments:
        raise HTTPException(status_code=400, detail="Maximum number of attachments (30) reached")

    # Check total size limit (100MB). The sum is computed in SQL so the
    # stored file contents of existing attachments are not loaded into memory
    # just to add up their sizes. COALESCE covers the no-attachments case;
    # int() normalises the Decimal that some drivers return for SUM.
    existing_total = (
        db.query(func.coalesce(func.sum(Attachment.size), 0))
        .select_from(Attachment)
        .join(ApplicationAttachment, Attachment.id == ApplicationAttachment.attachment_id)
        .filter(ApplicationAttachment.application_id == application.id)
        .scalar()
    )
    total_size = int(existing_total or 0)

    file_content = await file.read()
    file_size = len(file_content)
    if total_size + file_size > max_total_bytes:
        raise HTTPException(status_code=400, detail="Total attachment size would exceed 100MB limit")

    # Create attachment
    attachment = Attachment(
        filename=file.filename,
        content_type=file.content_type or "application/octet-stream",
        size=file_size,
        data=base64.b64encode(file_content).decode('utf-8')
    )
    db.add(attachment)
    db.flush()  # assigns attachment.id, needed for the link row below

    # Link to application
    app_attachment = ApplicationAttachment(
        application_id=application.id,
        attachment_id=attachment.id
    )
    db.add(app_attachment)
    db.commit()

    return AttachmentUploadResponse(
        attachment_id=attachment.id,
        filename=attachment.filename,
        size=attachment.size
    )
@app.get("/applications/{pa_id}/attachments", response_model=List[AttachmentInfo])
def list_attachments(
    pa_id: str,
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    key: Optional[str] = Query(None),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """List all attachments for an application

    Returns metadata only (id, filename, content type, size, creation time).

    Raises:
        HTTPException 401: the auth helper returned a falsy result.
        HTTPException 404: no application with this ``pa_id``.
    """
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key or key, None, x_master_key, x_forwarded_for)
    if not auth:
        raise HTTPException(status_code=401, detail="Unauthorized")

    # Get application
    application = db.query(Application).filter(Application.pa_id == pa_id).first()
    if not application:
        raise HTTPException(status_code=404, detail="Application not found")

    # Select only the metadata columns: the listing never needs the stored
    # file contents, so they are not fetched from the database.
    rows = (
        db.query(
            Attachment.id,
            Attachment.filename,
            Attachment.content_type,
            Attachment.size,
            Attachment.created_at,
        )
        .select_from(Attachment)
        .join(ApplicationAttachment, Attachment.id == ApplicationAttachment.attachment_id)
        .filter(ApplicationAttachment.application_id == application.id)
        .all()
    )
    return [
        AttachmentInfo(
            id=row.id,
            filename=row.filename,
            content_type=row.content_type,
            size=row.size,
            created_at=row.created_at,
        )
        for row in rows
    ]
def _attachment_content_disposition(filename: Optional[str]) -> str:
    """Build a safe ``Content-Disposition: attachment`` header value.

    Emits an ASCII-only quoted ``filename`` fallback plus an RFC 5987
    ``filename*`` parameter carrying the full UTF-8 name, so names containing
    quotes, separators, control characters or non-ASCII text can neither
    break the header nor make it unencodable.
    """
    from urllib.parse import quote

    name = filename or "attachment"
    ascii_name = name.encode("ascii", "ignore").decode("ascii")
    # Neutralise characters that would terminate or corrupt the quoted string.
    ascii_name = "".join(
        ch if ch.isprintable() and ch not in '"\\' else "_" for ch in ascii_name
    ) or "attachment"
    encoded_name = quote(name, safe="")
    return f'attachment; filename="{ascii_name}"; filename*=UTF-8\'\'{encoded_name}'


@app.get("/applications/{pa_id}/attachments/{attachment_id}")
def download_attachment(
    pa_id: str,
    attachment_id: int,
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    key: Optional[str] = Query(None),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Download a specific attachment

    Streams the decoded file with its stored content type.

    Raises:
        HTTPException 401: the auth helper returned a falsy result.
        HTTPException 404: application unknown, attachment not linked to it,
            or attachment row missing.
    """
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key or key, None, x_master_key, x_forwarded_for)
    if not auth:
        raise HTTPException(status_code=401, detail="Unauthorized")

    # Get application
    application = db.query(Application).filter(Application.pa_id == pa_id).first()
    if not application:
        raise HTTPException(status_code=404, detail="Application not found")

    # The link row proves the attachment belongs to this application.
    app_attachment = db.query(ApplicationAttachment).filter(
        ApplicationAttachment.application_id == application.id,
        ApplicationAttachment.attachment_id == attachment_id
    ).first()
    if not app_attachment:
        raise HTTPException(status_code=404, detail="Attachment not found for this application")

    # Get attachment
    attachment = db.query(Attachment).filter(Attachment.id == attachment_id).first()
    if not attachment:
        raise HTTPException(status_code=404, detail="Attachment not found")

    # Decode and return file. The filename was supplied by the uploader, so
    # it is escaped before being placed into a response header.
    file_data = base64.b64decode(attachment.data)
    return StreamingResponse(
        io.BytesIO(file_data),
        media_type=attachment.content_type,
        headers={"Content-Disposition": _attachment_content_disposition(attachment.filename)}
    )
@app.delete("/applications/{pa_id}/attachments/{attachment_id}")
def delete_attachment(
    pa_id: str,
    attachment_id: int,
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    key: Optional[str] = Query(None),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Remove one attachment (link row and stored file) from an application.

    Responds 401 on a falsy auth result, 404 when the application or the
    link is missing, and 403 when the application is locked and the caller
    is not using master scope.
    """
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key or key, None, x_master_key, x_forwarded_for)
    if not auth:
        raise HTTPException(status_code=401, detail="Unauthorized")

    application = db.query(Application).filter(Application.pa_id == pa_id).first()
    if not application:
        raise HTTPException(status_code=404, detail="Application not found")

    # Applications in these states are read-only for non-master callers.
    locked_messages = {
        "in-review": "Cannot delete attachments while application is in review",
        "approved": "Cannot delete attachments from approved application",
        "rejected": "Cannot delete attachments from rejected application",
    }
    if application.status in locked_messages and auth["scope"] != "master":
        raise HTTPException(status_code=403, detail=locked_messages[application.status])

    # The link row is what ties the attachment to this application.
    link_query = db.query(ApplicationAttachment).filter(
        ApplicationAttachment.application_id == application.id,
        ApplicationAttachment.attachment_id == attachment_id,
    )
    link = link_query.first()
    if not link:
        raise HTTPException(status_code=404, detail="Attachment not found for this application")

    # Delete the link first, then the attachment row itself if it exists.
    db.delete(link)
    stored = db.query(Attachment).filter(Attachment.id == attachment_id).first()
    if stored:
        db.delete(stored)
    db.commit()

    return {"detail": "Attachment deleted successfully"}
# -------------------------------------------------------------
# Comparison Offers Endpoints
# -------------------------------------------------------------
@app.post("/applications/{pa_id}/costs/{cost_position_index}/offers", response_model=ComparisonOfferResponse)
async def create_comparison_offer(
    pa_id: str,
    cost_position_index: int,
    offer: ComparisonOfferCreate,
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    key: Optional[str] = Query(None),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Create a comparison offer for a cost position

    The amount is received in EUR and stored as integer cents.

    Raises:
        HTTPException 404: application unknown or cost position out of range.
        HTTPException 403: application is locked and caller is not master.
        HTTPException 400: neither URL nor attachment given, malformed URL,
            attachment not linked to this application, or another integrity
            error while saving.
        HTTPException 409: an offer from this supplier already exists for
            the cost position.
    """
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key or key, None, x_master_key, x_forwarded_for)

    # Check if application exists
    application = db.query(Application).filter(Application.pa_id == pa_id).first()
    if not application:
        raise HTTPException(status_code=404, detail="Application not found")

    # Check if application is in final states and user is not admin
    if application.status in ["in-review", "approved", "rejected"] and auth["scope"] != "master":
        status_messages = {
            "in-review": "Cannot create comparison offers while application is in review",
            "approved": "Cannot create comparison offers for approved application",
            "rejected": "Cannot create comparison offers for rejected application"
        }
        raise HTTPException(status_code=403, detail=status_messages[application.status])

    # Validate cost position index. A missing payload or a null "costs"
    # entry is treated as "no cost positions" rather than crashing.
    payload = application.payload_json or {}
    costs = payload.get("pa", {}).get("project", {}).get("costs") or []
    if cost_position_index < 0 or cost_position_index >= len(costs):
        raise HTTPException(status_code=404, detail=f"Cost position not found: index {cost_position_index} out of range (0-{len(costs)-1})")

    # Validate that either URL or attachment is provided
    if not offer.url and not offer.attachment_id:
        raise HTTPException(status_code=400, detail="Either URL or attachment is required")

    # Validate URL format if provided
    if offer.url and not offer.url.startswith(("http://", "https://")):
        raise HTTPException(status_code=400, detail="URL must start with http:// or https://")

    # Check if attachment exists and belongs to this application
    if offer.attachment_id:
        app_attachment = db.query(ApplicationAttachment).filter(
            ApplicationAttachment.application_id == application.id,
            ApplicationAttachment.attachment_id == offer.attachment_id
        ).first()
        if not app_attachment:
            raise HTTPException(status_code=400, detail="Attachment not found or doesn't belong to this application")

    # Convert EUR to cents. round() is required: plain int() truncates the
    # binary floating point error (19.99 * 100 == 1998.9999... -> 1998).
    amount_cents = int(round(offer.amount * 100))

    comparison_offer = ComparisonOffer(
        application_id=application.id,
        cost_position_index=cost_position_index,
        supplier_name=offer.supplier_name,
        amount=amount_cents,
        description=offer.description,
        url=offer.url,
        attachment_id=offer.attachment_id,
        is_preferred=offer.is_preferred or False
    )
    try:
        db.add(comparison_offer)
        db.commit()
        db.refresh(comparison_offer)
    except IntegrityError as e:
        db.rollback()
        # The unique constraint name identifies a duplicate supplier.
        if "uq_offer_supplier" in str(e):
            raise HTTPException(
                status_code=409,
                detail=f"Ein Angebot von '{offer.supplier_name}' existiert bereits für diese Kostenposition"
            )
        raise HTTPException(status_code=400, detail="Datenbankfehler beim Speichern des Angebots")

    return ComparisonOfferResponse(
        id=comparison_offer.id,
        supplier_name=comparison_offer.supplier_name,
        amount=comparison_offer.amount / 100,  # Convert cents to EUR
        description=comparison_offer.description,
        url=comparison_offer.url,
        attachment_id=comparison_offer.attachment_id,
        # Report the stored flag, as the GET endpoint does.
        is_preferred=comparison_offer.is_preferred,
        created_at=comparison_offer.created_at
    )
@app.get("/applications/{pa_id}/costs/{cost_position_index}/offers", response_model=CostPositionOffersResponse)
def get_cost_position_offers(
    pa_id: str,
    cost_position_index: int,
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    key: Optional[str] = Query(None),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Return the comparison offers and justification of one cost position.

    Responds 404 when the application is unknown or the index does not refer
    to an entry of the payload's cost list.
    """
    rate_limit_ip(x_forwarded_for or "")
    # Called for its authentication check; the result itself is not used.
    _auth_from_request(db, pa_id, x_pa_key or key, None, x_master_key, x_forwarded_for)

    application = db.query(Application).filter(Application.pa_id == pa_id).first()
    if not application:
        raise HTTPException(status_code=404, detail="Application not found")

    # The index must point into the payload's cost list.
    costs = application.payload_json.get("pa", {}).get("project", {}).get("costs", [])
    if not (0 <= cost_position_index < len(costs)):
        raise HTTPException(status_code=404, detail=f"Cost position not found: index {cost_position_index} out of range (0-{len(costs)-1})")

    stored_offers = db.query(ComparisonOffer).filter(
        ComparisonOffer.application_id == application.id,
        ComparisonOffer.cost_position_index == cost_position_index,
    ).all()

    justification = db.query(CostPositionJustification).filter(
        CostPositionJustification.application_id == application.id,
        CostPositionJustification.cost_position_index == cost_position_index,
    ).first()

    offer_models = []
    for stored in stored_offers:
        offer_models.append(
            ComparisonOfferResponse(
                id=stored.id,
                supplier_name=stored.supplier_name,
                amount=stored.amount / 100,  # stored in cents, reported in EUR
                description=stored.description,
                url=stored.url,
                attachment_id=stored.attachment_id,
                is_preferred=stored.is_preferred,
                created_at=stored.created_at,
            )
        )

    if justification:
        no_offers_required = bool(justification.no_offers_required)
        justification_text = justification.justification
    else:
        no_offers_required = False
        justification_text = None

    return CostPositionOffersResponse(
        cost_position_index=cost_position_index,
        offers=offer_models,
        no_offers_required=no_offers_required,
        justification=justification_text,
    )
@app.delete("/applications/{pa_id}/costs/{cost_position_index}/offers/{offer_id}")
def delete_comparison_offer(
    pa_id: str,
    cost_position_index: int,
    offer_id: int,
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    key: Optional[str] = Query(None),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Remove one comparison offer from a cost position.

    Responds 404 when the application or the offer is missing and 403 when
    the application is locked and the caller is not using master scope.
    """
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key or key, None, x_master_key, x_forwarded_for)

    application = db.query(Application).filter(Application.pa_id == pa_id).first()
    if not application:
        raise HTTPException(status_code=404, detail="Application not found")

    # Applications in these states are read-only for non-master callers.
    locked_messages = {
        "in-review": "Cannot delete comparison offers while application is in review",
        "approved": "Cannot delete comparison offers from approved application",
        "rejected": "Cannot delete comparison offers from rejected application",
    }
    if application.status in locked_messages and auth["scope"] != "master":
        raise HTTPException(status_code=403, detail=locked_messages[application.status])

    # The offer must match the ID, the application and the cost position.
    target = db.query(ComparisonOffer).filter(
        ComparisonOffer.id == offer_id,
        ComparisonOffer.application_id == application.id,
        ComparisonOffer.cost_position_index == cost_position_index,
    ).first()
    if not target:
        raise HTTPException(status_code=404, detail="Offer not found")

    db.delete(target)
    db.commit()
    return {"detail": "Offer deleted successfully"}
@app.put("/applications/{pa_id}/costs/{cost_position_index}/justification")
def update_cost_position_justification(
    pa_id: str,
    cost_position_index: int,
    request: CostPositionJustificationRequest,
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    key: Optional[str] = Query(None),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Create or update the "no offers required" justification of a cost position.

    Responds 404 for an unknown application or cost position, 403 when the
    application is locked and the caller is not using master scope, and 400
    when ``no_offers_required`` is set without a justification text.
    """
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key or key, None, x_master_key, x_forwarded_for)

    application = db.query(Application).filter(Application.pa_id == pa_id).first()
    if not application:
        raise HTTPException(status_code=404, detail="Application not found")

    # Applications in these states are read-only for non-master callers.
    locked_messages = {
        "in-review": "Cannot update cost position justification while application is in review",
        "approved": "Cannot update cost position justification for approved application",
        "rejected": "Cannot update cost position justification for rejected application",
    }
    if application.status in locked_messages and auth["scope"] != "master":
        raise HTTPException(status_code=403, detail=locked_messages[application.status])

    # The index must point into the payload's cost list.
    costs = application.payload_json.get("pa", {}).get("project", {}).get("costs", [])
    if not (0 <= cost_position_index < len(costs)):
        raise HTTPException(status_code=404, detail="Cost position not found")

    if request.no_offers_required and not request.justification:
        raise HTTPException(status_code=400, detail="Justification is required when no offers are needed")

    # The flag is stored as an integer 0/1.
    flag_value = 1 if request.no_offers_required else 0

    existing = db.query(CostPositionJustification).filter(
        CostPositionJustification.application_id == application.id,
        CostPositionJustification.cost_position_index == cost_position_index,
    ).first()
    if not existing:
        db.add(
            CostPositionJustification(
                application_id=application.id,
                cost_position_index=cost_position_index,
                no_offers_required=flag_value,
                justification=request.justification,
            )
        )
    else:
        existing.no_offers_required = flag_value
        existing.justification = request.justification
        existing.updated_at = datetime.utcnow()

    db.commit()
    return {"detail": "Justification updated successfully"}
@app.put("/applications/{pa_id}/costs/{cost_position_index}/offers/{offer_id}/preferred")
def set_preferred_offer(
    pa_id: str,
    cost_position_index: int,
    offer_id: int,
    x_pa_key: Optional[str] = Header(None, alias="X-PA-KEY"),
    key: Optional[str] = Query(None),
    x_master_key: Optional[str] = Header(None, alias="X-MASTER-KEY"),
    x_forwarded_for: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """Mark one offer as the preferred offer of its cost position.

    Any offer previously flagged as preferred for the same cost position is
    unflagged first. Responds 404 for an unknown application, cost position
    or offer, and 403 when the application is locked and the caller is not
    using master scope.
    """
    rate_limit_ip(x_forwarded_for or "")
    auth = _auth_from_request(db, pa_id, x_pa_key or key, None, x_master_key, x_forwarded_for)

    application = db.query(Application).filter(Application.pa_id == pa_id).first()
    if not application:
        raise HTTPException(status_code=404, detail="Application not found")

    # Applications in these states are read-only for non-master callers.
    locked_messages = {
        "in-review": "Cannot set preferred offer while application is in review",
        "approved": "Cannot set preferred offer for approved application",
        "rejected": "Cannot set preferred offer for rejected application",
    }
    if application.status in locked_messages and auth["scope"] != "master":
        raise HTTPException(status_code=403, detail=locked_messages[application.status])

    # The index must point into the payload's cost list.
    costs = application.payload_json.get("pa", {}).get("project", {}).get("costs", [])
    if not (0 <= cost_position_index < len(costs)):
        raise HTTPException(status_code=404, detail="Cost position not found")

    # The offer must match the ID, the application and the cost position.
    chosen = db.query(ComparisonOffer).filter(
        ComparisonOffer.id == offer_id,
        ComparisonOffer.application_id == application.id,
        ComparisonOffer.cost_position_index == cost_position_index,
    ).first()
    if not chosen:
        raise HTTPException(status_code=404, detail="Offer not found")

    # Unflag whatever is currently preferred for this cost position ...
    currently_preferred = db.query(ComparisonOffer).filter(
        ComparisonOffer.application_id == application.id,
        ComparisonOffer.cost_position_index == cost_position_index,
        ComparisonOffer.is_preferred == True,  # noqa: E712 - SQL expression, not a Python comparison
    )
    currently_preferred.update({"is_preferred": False})

    # ... then flag the requested offer.
    chosen.is_preferred = True
    db.commit()
    return {"detail": "Preferred offer updated successfully"}