A few bug fixes regarding auto detection

This commit is contained in:
Frederik Beimgraben 2025-09-01 15:59:34 +02:00
parent 3ddb091d1e
commit ebc7e2a8ee
6 changed files with 299 additions and 72 deletions

View File

@ -39,6 +39,9 @@ ENV PYTHONDONTWRITEBYTECODE=1 \
RUN apt-get update && apt-get install -y --no-install-recommends \
tzdata ca-certificates \
qpdf \
pdftk-java \
libmupdf-dev \
mupdf-tools \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app

View File

@ -7,6 +7,7 @@ pydantic>=2.6
# PDF handling
PyPDF2>=3.0.1
PyMuPDF>=1.23.0
# DB (MySQL via SQLAlchemy + PyMySQL)
SQLAlchemy>=2.0

View File

@ -6,6 +6,7 @@ import os
import re
import subprocess
import tempfile
import logging
from typing import Any, Dict, Optional
import PyPDF2
@ -18,6 +19,17 @@ from PyPDF2.generic import (
createStringObject,
)
# Optional dependency: PyMuPDF (imported as ``fitz``) backs the raster-based
# flattening helper; HAS_PYMUPDF records whether the import succeeded.
try:
    import fitz  # PyMuPDF
except ImportError:
    HAS_PYMUPDF = False
else:
    HAS_PYMUPDF = True

# Module-level logger.
# NOTE(review): basicConfig() configures the root logger as an import side
# effect — confirm this is intended for a module that is imported by a server.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# your module containing the mapping; adjust the name if necessary:
import pdf_to_struct as core # _merge_mapping
@ -140,9 +152,54 @@ def _collect_btn_widgets(reader: PyPDF2.PdfReader):
# PDF Flattening Helper
# -----------------------------
def _flatten_pdf_with_pymupdf(pdf_bytes: bytes, dpi: int = 150) -> Optional[bytes]:
    """Flatten a PDF by rasterising every page with PyMuPDF.

    Each source page is rendered to a pixmap and inserted as a full-page image
    into a fresh document, so the result carries no form fields (every page is
    a single image).

    Args:
        pdf_bytes: Raw bytes of the PDF to flatten.
        dpi: Resolution used when rendering each page to an image.

    Returns:
        The flattened PDF as bytes, or ``None`` when PyMuPDF is not installed
        or any step fails (the failure is logged, never raised).
    """
    if not HAS_PYMUPDF:
        logger.info("PyMuPDF not available for flattening")
        return None

    source_doc = None
    flat_doc = None
    try:
        logger.info("Attempting to flatten PDF with PyMuPDF")
        source_doc = fitz.open(stream=pdf_bytes, filetype="pdf")

        # Ask widget annotations to refresh their appearance before rendering.
        # NOTE(review): PyMuPDF's docs appear to name the print flag
        # PDF_ANNOT_IS_PRINT and to expose form fields via page.widgets();
        # confirm this loop ever matches a widget. Kept as in the original.
        for page in source_doc:
            for annot in page.annots():
                if annot.type[0] == fitz.PDF_ANNOT_WIDGET:
                    annot.set_flags(fitz.PDF_ANNOT_PRINT)
                    annot.update()

        # Rebuild the document page by page from rendered images.
        flat_doc = fitz.open()
        for page in source_doc:
            new_page = flat_doc.new_page(width=page.rect.width, height=page.rect.height)
            pix = page.get_pixmap(dpi=dpi)
            new_page.insert_image(new_page.rect, pixmap=pix)

        flattened_bytes = flat_doc.tobytes(deflate=True, clean=True)
        logger.info("Successfully flattened PDF with PyMuPDF")
        return flattened_bytes
    except Exception as e:
        logger.error(f"PyMuPDF flattening failed: {e}")
        return None
    finally:
        # Always release both documents, including on the error path; a close
        # failure must not turn a best-effort helper into a crash.
        for doc in (flat_doc, source_doc):
            if doc is None:
                continue
            try:
                doc.close()
            except Exception:
                logger.warning("Failed to close PyMuPDF document", exc_info=True)
def _flatten_pdf_with_qpdf(pdf_bytes: bytes) -> Optional[bytes]:
"""Try to flatten PDF using qpdf if available."""
try:
logger.info("Attempting to flatten PDF with qpdf")
with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as input_file:
with tempfile.NamedTemporaryFile(suffix='_flattened.pdf', delete=False) as output_file:
input_path = input_file.name
@ -153,20 +210,38 @@ def _flatten_pdf_with_qpdf(pdf_bytes: bytes) -> Optional[bytes]:
f.write(pdf_bytes)
# Try to flatten with qpdf
result = subprocess.run(
['qpdf', '--flatten-annotations=all', '--generate-appearances', input_path, output_path],
# First pass: generate appearances for all form fields
temp_path = input_path + '.temp'
result1 = subprocess.run(
['qpdf', '--generate-appearances', input_path, temp_path],
capture_output=True,
timeout=30
)
if result1.returncode == 0:
# Second pass: flatten all annotations including form fields
result = subprocess.run(
['qpdf', '--flatten-annotations=all', temp_path, output_path],
capture_output=True,
timeout=30
)
try:
os.unlink(temp_path)
except:
pass
else:
result = result1
if result.returncode == 0:
with open(output_path, 'rb') as f:
flattened_bytes = f.read()
# Cleanup
os.unlink(input_path)
os.unlink(output_path)
logger.info("Successfully flattened PDF with qpdf")
return flattened_bytes
except (subprocess.SubprocessError, FileNotFoundError, subprocess.TimeoutExpired):
except (subprocess.SubprocessError, FileNotFoundError, subprocess.TimeoutExpired) as e:
logger.error(f"qpdf flattening failed: {e}")
pass
finally:
# Ensure cleanup
@ -178,6 +253,7 @@ def _flatten_pdf_with_qpdf(pdf_bytes: bytes) -> Optional[bytes]:
def _flatten_pdf_with_pdftk(pdf_bytes: bytes) -> Optional[bytes]:
"""Try to flatten PDF using pdftk if available."""
try:
logger.info("Attempting to flatten PDF with pdftk")
with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as input_file:
with tempfile.NamedTemporaryFile(suffix='_flattened.pdf', delete=False) as output_file:
input_path = input_file.name
@ -188,6 +264,7 @@ def _flatten_pdf_with_pdftk(pdf_bytes: bytes) -> Optional[bytes]:
f.write(pdf_bytes)
# Try to flatten with pdftk
# pdftk's flatten command specifically flattens form fields
result = subprocess.run(
['pdftk', input_path, 'output', output_path, 'flatten'],
capture_output=True,
@ -200,8 +277,10 @@ def _flatten_pdf_with_pdftk(pdf_bytes: bytes) -> Optional[bytes]:
# Cleanup
os.unlink(input_path)
os.unlink(output_path)
logger.info("Successfully flattened PDF with pdftk")
return flattened_bytes
except (subprocess.SubprocessError, FileNotFoundError, subprocess.TimeoutExpired):
except (subprocess.SubprocessError, FileNotFoundError, subprocess.TimeoutExpired) as e:
logger.error(f"pdftk flattening failed: {e}")
pass
finally:
# Ensure cleanup
@ -214,7 +293,7 @@ def _flatten_pdf_with_pdftk(pdf_bytes: bytes) -> Optional[bytes]:
# Kern: PDF füllen (direktes Widget-Update)
# -----------------------------
def fill_pdf(payload: Dict[str, Any], variant: str, out_path: Optional[str] = None, flatten: bool = True) -> bytes:
def fill_pdf(payload: Dict[str, Any], variant: str, out_path: Optional[str] = None, flatten: bool = False) -> bytes:
"""
Payload (asdict(RootPayload) ODER dein payload["pa"]-ähnliches Dict) -> befüllte PDF-Bytes.
@ -222,8 +301,9 @@ def fill_pdf(payload: Dict[str, Any], variant: str, out_path: Optional[str] = No
payload: Dictionary mit den Formulardaten
variant: "QSM" oder "VSM"
out_path: Optionaler Pfad zum Speichern der PDF
flatten: Wenn True, werden Formularfelder in statischen Inhalt umgewandelt
flatten: Wenn True, werden Formularfelder in statischen Inhalt umgewandelt (Standard: False)
"""
logger.info(f"fill_pdf called with variant={variant}, flatten={flatten}")
template_path = _get_template(variant)
if not os.path.isfile(template_path):
raise FileNotFoundError(f"Template not found: {template_path}")
@ -254,14 +334,19 @@ def fill_pdf(payload: Dict[str, Any], variant: str, out_path: Optional[str] = No
if root and "/AcroForm" in root:
acroform = root["/AcroForm"]
writer._root_object.update({NameObject("/AcroForm"): acroform})
# Set NeedAppearances to False when flattening to force appearance generation
try:
if flatten:
# False forces PDF viewers to use existing appearances
writer._root_object["/AcroForm"].update({NameObject("/NeedAppearances"): BooleanObject(False)})
else:
writer._root_object["/AcroForm"].update({NameObject("/NeedAppearances"): BooleanObject(True)})
except Exception:
pass
else:
writer._root_object.update({
NameObject("/AcroForm"): PyPDF2.generic.DictionaryObject({
NameObject("/NeedAppearances"): BooleanObject(True)
NameObject("/NeedAppearances"): BooleanObject(False if flatten else True)
})
})
except Exception:
@ -357,6 +442,12 @@ def fill_pdf(payload: Dict[str, Any], variant: str, out_path: Optional[str] = No
if name in text_updates and ft in (None, "Tx", "Ch"):
value = text_updates[name]
annot.update({NameObject("/V"): createStringObject(value)})
# For flattening: ensure default appearance is set
if flatten and "/DA" not in annot:
# Set a default appearance string (Helvetica 10pt black)
annot.update({NameObject("/DA"): createStringObject("/Helv 10 Tf 0 g")})
parent = annot.get("/Parent")
if isinstance(parent, IndirectObject):
try:
@ -388,6 +479,12 @@ def fill_pdf(payload: Dict[str, Any], variant: str, out_path: Optional[str] = No
if desired != "Off" and widget_on and desired == widget_on:
annot.update({NameObject("/AS"): _to_name(desired)})
annot.update({NameObject("/V"): _to_name(desired)})
# For checkboxes/radio buttons, ensure they're visible when flattened
if flatten:
# Make sure the appearance state matches the value
annot.update({NameObject("/AS"): _to_name(desired)})
parent = annot.get("/Parent")
if isinstance(parent, IndirectObject):
try:
@ -418,16 +515,26 @@ def fill_pdf(payload: Dict[str, Any], variant: str, out_path: Optional[str] = No
# 4) Flatten if requested
if flatten:
# Try external tools first for better flattening
logger.info(f"Starting PDF flattening process (PDF size: {len(data)} bytes)")
# Try qpdf first (most reliable for form fields)
flattened = _flatten_pdf_with_qpdf(data)
if flattened:
logger.info("PDF flattened successfully with qpdf")
data = flattened
else:
# Try pdftk as fallback
# Try PyMuPDF as second option
flattened = _flatten_pdf_with_pymupdf(data)
if flattened:
logger.info("PDF flattened successfully with PyMuPDF")
data = flattened
else:
# Try pdftk as third option
flattened = _flatten_pdf_with_pdftk(data)
if flattened:
logger.info("PDF flattened successfully with pdftk")
data = flattened
else:
logger.warning("All flattening methods failed, using PyPDF2 fallback (content may be lost)")
# Fallback: Remove form fields using PyPDF2 (fields won't be visible)
# This is not ideal but better than nothing
reader = PyPDF2.PdfReader(io.BytesIO(data))
@ -472,11 +579,15 @@ def fill_pdf(payload: Dict[str, Any], variant: str, out_path: Optional[str] = No
bio = io.BytesIO()
writer.write(bio)
data = bio.getvalue()
logger.info("PDF flattened with PyPDF2 fallback")
else:
logger.info("Flattening not requested, returning PDF with editable fields")
if out_path:
with open(out_path, "wb") as out:
out.write(data)
return data
def save_pdf(payload: Dict[str, Any], variant: str, out_path: str, flatten: bool = False) -> None:
    """Fill the template for *variant* and store the result at *out_path*.

    Thin wrapper around ``fill_pdf``: the returned bytes are discarded because
    ``fill_pdf`` writes the file itself when ``out_path`` is given.
    """
    fill_pdf(payload, variant, out_path=out_path, flatten=flatten)

View File

@ -265,14 +265,59 @@ def _merge_mapping(variant: str, form_fields: Mapping[str, Any]) -> Dict[str, Di
_cost_name_pat = re.compile(r"^pa-cost-(\d+)-name$")
_cost_amt_pat = re.compile(r"^pa-cost-(\d+)-amount-euro$")
def detect_variant(form_fields: Mapping[str, Any], pdf_file: Optional[str] = None) -> str:
    """Best-effort detection of the application variant from fields and text.

    Detection strategy (first match wins):
      1. If ``pdf_file`` is given, look for variant markers in the text of its
         first two pages.
      2. Count variant-specific form field names; the variant with more hits
         wins.
      3. Fall back to "VSM".

    Args:
        form_fields: Mapping keyed by raw form field name; only the keys are
            inspected.
        pdf_file: Optional path to the PDF whose text should be checked first.

    Returns:
        "QSM" or "VSM".
    """
    if pdf_file:
        try:
            text_upper = extract_pdf_text(pdf_file, max_pages=2).upper()
        except Exception:
            # Text is only a hint; fall through to the field-based checks.
            text_upper = ""

        # Clear indicators in the title/header.
        if "PROJEKTANTRAG: QSM" in text_upper or "QSM ALLGEMEINER TEIL" in text_upper:
            return "QSM"
        if (
            "PROJEKTANTRAG: VSM" in text_upper
            or "VSM ALLGEMEINER TEIL" in text_upper
            or "VSM KOSTENAUFSTELLUNG" in text_upper
        ):
            return "VSM"
        # Per the original author: QSM shows just "Antragsteller", while VSM
        # additionally shows "Institution (sofern vorhanden)".
        if "ANTRAGSTELLER" in text_upper and "INSTITUTION (SOFERN VORHANDEN)" not in text_upper:
            return "QSM"

    keys = set(form_fields.keys())
    qsm_indicators = {"pa-qsm-financing", "pa-qsm-reason", "pa-applicant-course"}
    vsm_indicators = {"pa-vsm-financing", "pa-institution-type", "pa-institution"}
    qsm_count = len(qsm_indicators & keys)
    vsm_count = len(vsm_indicators & keys)

    if qsm_count > vsm_count:
        return "QSM"
    # A VSM majority, a tie, and the absence of any indicator all resolve to
    # "VSM", which the original treats as the default (more common) variant.
    # The former "pa-cost-*-description" pattern check returned the same value
    # and the trailing `return "COMMON"` was unreachable, so both are gone.
    return "VSM"
def map_form_to_payload(form_json: Dict[str, Dict[str, Any]], variant: str) -> Dict[str, Any]:
"""
@ -289,7 +334,11 @@ def map_form_to_payload(form_json: Dict[str, Dict[str, Any]], variant: str) -> D
# First pass
for field_name, meta in form_json.items():
# Handle both PDF fields (dict with /V) and form_json (direct string values)
if isinstance(meta, dict):
raw_val = meta.get("/V")
else:
raw_val = meta
# Costs pattern
m_name = _cost_name_pat.match(field_name)
@ -487,12 +536,26 @@ def read_pdf_fields(pdf_file: str) -> Dict[str, Dict[str, Any]]:
# ensure dict[str, dict] and keep only keys we care about
return {k: (v or {}) for k, v in fields.items()}
def extract_pdf_text(pdf_file: str, max_pages: int = 2) -> str:
    """Extract text from the first few pages of a PDF for variant detection.

    Args:
        pdf_file: Path of the PDF to read.
        max_pages: Upper bound on the number of leading pages to read.

    Returns:
        The text of each read page followed by a newline, concatenated in page
        order; an empty string if the file cannot be opened or parsed.
    """
    try:
        with open(pdf_file, "rb") as f:
            reader = PyPDF2.PdfReader(f, strict=False)
            page_count = min(max_pages, len(reader.pages))
            # ``or ""`` keeps a page that yields no text from raising on
            # concatenation and thereby discarding the other pages' text.
            chunks = [
                (reader.pages[i].extract_text() or "") + "\n"
                for i in range(page_count)
            ]
        return "".join(chunks)
    except Exception:
        # Best effort: callers treat an empty string as "no text available".
        return ""
def pdf_to_payload(pdf_file: str, variant: Optional[str] = None) -> RootPayload:
    """Read a PDF's form fields, map them, and build the dataclass model.

    When *variant* is falsy, it is detected from the extracted fields and the
    PDF file itself.
    """
    raw_fields = read_pdf_fields(pdf_file)
    if variant:
        chosen_variant = variant
    else:
        chosen_variant = detect_variant(raw_fields, pdf_file)
    return payload_to_model(map_form_to_payload(raw_fields, chosen_variant))

View File

@ -30,7 +30,7 @@ import secrets
import hashlib
import tempfile
from datetime import datetime
from typing import Any, Dict, Optional, List
from typing import Any, Dict, List, Optional, Union, Tuple
from dotenv import load_dotenv
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Depends, Query, Body, Header, Response
@ -400,12 +400,17 @@ def _payload_from_pdf_bytes(tmp_path: str, variant: Optional[str]) -> Dict[str,
except PdfReadError as e:
raise HTTPException(status_code=400, detail=f"PDF parse error: {e}")
def _payload_from_form_json(form_json: Dict[str, Any], variant: Optional[str]) -> Tuple[Dict[str, Any], str]:
    """Map raw form JSON to a payload dict and report the variant used.

    Args:
        form_json: Raw form fields keyed by field name.
        variant: Explicit variant, or ``None`` / "AUTO" (any casing) to have it
            detected from the field names.

    Returns:
        A ``(payload, variant)`` tuple: the model converted back to a plain
        dict via ``asdict``, and the variant that was passed to the mapping.
    """
    from dataclasses import asdict

    # Match the case-insensitive handling in create_application: a lower-case
    # "auto" must trigger detection rather than be used as a literal variant.
    if not variant or variant.upper() == "AUTO":
        detected_variant = core.detect_variant(form_json)
    else:
        detected_variant = variant

    # map_form_to_payload -> dict with 'pa....'; then into the model, then
    # back to a plain dict via asdict.
    mapped = core.map_form_to_payload(form_json, detected_variant)
    model = core.payload_to_model(mapped)
    return asdict(model), detected_variant
def _inject_meta_for_render(payload: Dict[str, Any], pa_id: str, pa_key: Optional[str]) -> Dict[str, Any]:
# Wir injizieren Key/ID NUR für die PDF-Generierung in payload['pa'].*,
@ -467,18 +472,20 @@ def create_application(
# Payload beschaffen
payload: Dict[str, Any]
raw_form: Optional[Dict[str, Any]] = None
detected_variant: Optional[str] = None
with tempfile.NamedTemporaryFile(delete=True, suffix=".pdf") as tf:
if pdf:
tf.write(pdf.file.read())
tf.flush()
payload = _payload_from_pdf_bytes(tf.name, variant)
# For PDF, we'll detect variant from the PDF content/fields
elif form_json_b64:
try:
raw = base64.b64decode(form_json_b64)
raw_form = json.loads(raw.decode("utf-8"))
except Exception as e:
raise HTTPException(status_code=400, detail=f"Invalid form_json_b64: {e}")
payload = _payload_from_form_json(raw_form, variant or "AUTO")
payload, detected_variant = _payload_from_form_json(raw_form, variant or "AUTO")
else:
raise HTTPException(status_code=400, detail="Provide either PDF file or form_json_b64")
@ -494,11 +501,53 @@ def create_application(
pa_key_plain = _gen_pa_key()
salt, key_hash = _hash_key(pa_key_plain)
# Variante bestimmen (falls AUTO)
detected = variant or core.detect_variant(payload.get("pa", {})) or "VSM"
detected = detected.upper()
if detected == "AUTO":
# Variante bestimmen
# If variant was explicitly provided and not AUTO, use it
if variant and variant.upper() not in ["AUTO", "COMMON"]:
detected = variant.upper()
# If detected_variant was set from form_json processing, use it
elif detected_variant:
detected = detected_variant.upper()
# If PDF was uploaded, detect from payload structure
elif pdf:
# Look for variant-specific fields in the payload
pa_data = payload.get("pa", {})
# Check for QSM fields in the correct location
project_data = pa_data.get("project", {})
financing_data = project_data.get("financing", {})
# Check which financing type has actual content (not just empty structure)
qsm_data = financing_data.get("qsm", {})
vsm_data = financing_data.get("vsm", {})
# QSM has 'code' and 'flags' fields when filled
has_qsm_content = bool(qsm_data.get("code") or qsm_data.get("flags"))
# VSM has different structure (check if actually filled)
has_vsm_content = bool(vsm_data and any(vsm_data.values()))
# Also check institution fields (VSM-specific)
# Note: Institution name alone doesn't determine variant, as QSM can also have institution name
institution_data = pa_data.get("applicant", {}).get("institution", {})
has_institution_type = bool(institution_data.get("type")) # Only type is VSM-specific
# Determine variant based on which fields have actual content
# Prioritize financing fields over institution fields
if has_qsm_content and not has_vsm_content:
detected = "QSM"
elif has_vsm_content:
detected = "VSM"
elif has_institution_type:
# Only consider institution type, not name
detected = "VSM"
elif has_qsm_content:
# If only QSM fields are filled, it's QSM
detected = "QSM"
else:
detected = "VSM"
else:
# Default to VSM
detected = "VSM"
# Map COMMON to VSM for backwards compatibility
if detected == "COMMON":
detected = "VSM"
@ -582,7 +631,7 @@ def update_application(
raw_form = json.loads(raw.decode("utf-8"))
except Exception as e:
raise HTTPException(status_code=400, detail=f"Invalid form_json_b64: {e}")
payload = _payload_from_form_json(raw_form, variant or app_row.variant)
payload, _ = _payload_from_form_json(raw_form, variant or app_row.variant)
else:
raise HTTPException(status_code=400, detail="Provide either PDF file or form_json_b64")

View File

@ -582,7 +582,8 @@ const AdminApplicationView: React.FC = () => {
<CheckBoxOutlineBlank sx={{ mr: 1 }} />
)}
<Typography variant="body1">
Es handelt sich um Stellenfinanzierungen
Die Maßnahme beinhaltet keine zeitlich unbefristeten
Stellenfinanzierungen
</Typography>
</Box>
<Box sx={{ display: "flex", alignItems: "center", mb: 1 }}>
@ -592,8 +593,7 @@ const AdminApplicationView: React.FC = () => {
<CheckBoxOutlineBlank sx={{ mr: 1 }} />
)}
<Typography variant="body1">
Die Studierenden werden an der Planung und Durchführung
der Maßnahme beteiligt
Die Maßnahme kommt den Studierenden zugute (vgl. VWV)
</Typography>
</Box>
<Box sx={{ display: "flex", alignItems: "center", mb: 1 }}>
@ -603,7 +603,8 @@ const AdminApplicationView: React.FC = () => {
<CheckBoxOutlineBlank sx={{ mr: 1 }} />
)}
<Typography variant="body1">
Es werden keine Einzelpersonen von der Maßnahme gefördert
Es findet keine individuelle Förderung von Studierenden
statt
</Typography>
</Box>
{formData.qsmFlags.exkursionGenehmigt !== undefined && (
@ -614,8 +615,7 @@ const AdminApplicationView: React.FC = () => {
<CheckBoxOutlineBlank sx={{ mr: 1 }} />
)}
<Typography variant="body1">
Die beantragte Exkursion wurde von den zuständigen
Stellen genehmigt
Die Exkursion wurde von der Fakultät genehmigt
</Typography>
</Box>
)}
@ -627,7 +627,7 @@ const AdminApplicationView: React.FC = () => {
<CheckBoxOutlineBlank sx={{ mr: 1 }} />
)}
<Typography variant="body1">
Die Exkursion wird bereits aus anderen Mitteln
Die Exkursion wird maßgeblich von der Fakultät
bezuschusst
</Typography>
</Box>