Compare commits
2 Commits
f1d022b19b
...
ebc7e2a8ee
| Author | SHA1 | Date | |
|---|---|---|---|
| ebc7e2a8ee | |||
| 3ddb091d1e |
@ -38,6 +38,10 @@ ENV PYTHONDONTWRITEBYTECODE=1 \
|
|||||||
# System deps
|
# System deps
|
||||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||||
tzdata ca-certificates \
|
tzdata ca-certificates \
|
||||||
|
qpdf \
|
||||||
|
pdftk-java \
|
||||||
|
libmupdf-dev \
|
||||||
|
mupdf-tools \
|
||||||
&& rm -rf /var/lib/apt/lists/*
|
&& rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
|
|||||||
@ -7,6 +7,7 @@ pydantic>=2.6
|
|||||||
|
|
||||||
# PDF handling
|
# PDF handling
|
||||||
PyPDF2>=3.0.1
|
PyPDF2>=3.0.1
|
||||||
|
PyMuPDF>=1.23.0
|
||||||
|
|
||||||
# DB (MySQL via SQLAlchemy + PyMySQL)
|
# DB (MySQL via SQLAlchemy + PyMySQL)
|
||||||
SQLAlchemy>=2.0
|
SQLAlchemy>=2.0
|
||||||
|
|||||||
@ -4,6 +4,9 @@ from __future__ import annotations
|
|||||||
import io
|
import io
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
|
import subprocess
|
||||||
|
import tempfile
|
||||||
|
import logging
|
||||||
from typing import Any, Dict, Optional
|
from typing import Any, Dict, Optional
|
||||||
|
|
||||||
import PyPDF2
|
import PyPDF2
|
||||||
@ -16,6 +19,17 @@ from PyPDF2.generic import (
|
|||||||
createStringObject,
|
createStringObject,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Try to import PyMuPDF for better flattening
|
||||||
|
try:
|
||||||
|
import fitz # PyMuPDF
|
||||||
|
HAS_PYMUPDF = True
|
||||||
|
except ImportError:
|
||||||
|
HAS_PYMUPDF = False
|
||||||
|
|
||||||
|
# Set up logging
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# dein Modul mit Mapping; ggf. Namen anpassen:
|
# dein Modul mit Mapping; ggf. Namen anpassen:
|
||||||
import pdf_to_struct as core # _merge_mapping
|
import pdf_to_struct as core # _merge_mapping
|
||||||
|
|
||||||
@ -134,14 +148,162 @@ def _collect_btn_widgets(reader: PyPDF2.PdfReader):
|
|||||||
|
|
||||||
return btn_widgets_by_name, export_values_by_name
|
return btn_widgets_by_name, export_values_by_name
|
||||||
|
|
||||||
|
# -----------------------------
|
||||||
|
# PDF Flattening Helper
|
||||||
|
# -----------------------------
|
||||||
|
|
||||||
|
def _flatten_pdf_with_pymupdf(pdf_bytes: bytes) -> Optional[bytes]:
    """Flatten a PDF by rasterizing every page with PyMuPDF.

    Each page of the input is rendered to a 150 dpi pixmap and inserted as a
    full-page image on a same-sized page of a fresh document. The result
    therefore consists of page images only: it has no form fields, and page
    text is not carried over as text.

    Args:
        pdf_bytes: Serialized PDF to flatten.

    Returns:
        The bytes of the rasterized PDF, or ``None`` when PyMuPDF is not
        installed or any step of the conversion fails.
    """
    if not HAS_PYMUPDF:
        logger.info("PyMuPDF not available for flattening")
        return None

    source_doc = None
    flat_doc = None
    try:
        logger.info("Attempting to flatten PDF with PyMuPDF")
        source_doc = fitz.open(stream=pdf_bytes, filetype="pdf")

        # NOTE(review): an earlier revision looped over page.annots() and
        # called set_flags(fitz.PDF_ANNOT_PRINT) on widget annotations.
        # PyMuPDF names the print flag PDF_ANNOT_IS_PRINT, so that call
        # would raise AttributeError and abort flattening whenever it was
        # reached; form fields are exposed via page.widgets() anyway.
        # The loop was dropped because the page is rasterized below.

        flat_doc = fitz.open()
        for page in source_doc:
            # Same page size as the source so the image fills the page 1:1.
            new_page = flat_doc.new_page(
                width=page.rect.width, height=page.rect.height
            )
            pix = page.get_pixmap(dpi=150)
            new_page.insert_image(new_page.rect, pixmap=pix)

        flattened_bytes = flat_doc.tobytes(deflate=True, clean=True)
        logger.info("Successfully flattened PDF with PyMuPDF")
        return flattened_bytes
    except Exception as e:
        # Best-effort helper: the caller falls back to other tools on None.
        logger.error(f"PyMuPDF flattening failed: {e}")
        return None
    finally:
        # Close both documents on every path, including failures, so the
        # native handles are not leaked.
        for doc in (flat_doc, source_doc):
            if doc is not None:
                try:
                    doc.close()
                except Exception:
                    logger.debug("Closing PyMuPDF document failed", exc_info=True)
|
||||||
|
|
||||||
|
def _flatten_pdf_with_qpdf(pdf_bytes: bytes) -> Optional[bytes]:
    """Flatten a PDF by running the ``qpdf`` command-line tool twice.

    The first pass runs ``qpdf --generate-appearances`` and the second runs
    ``qpdf --flatten-annotations=all`` on the first pass's output. All
    intermediate files live in a private temporary directory that is removed
    on every exit path.

    Args:
        pdf_bytes: Serialized PDF to flatten.

    Returns:
        The bytes written by the second pass, or ``None`` when ``qpdf`` is
        missing, times out (30 s per pass), reports an error, or a file
        operation fails.
    """
    # qpdf exits with 0 on success and 3 when it only emitted warnings; in
    # both cases the output file has been written.
    ok_exit_codes = (0, 3)

    logger.info("Attempting to flatten PDF with qpdf")
    try:
        with tempfile.TemporaryDirectory(prefix="qpdf_flatten_") as workdir:
            input_path = os.path.join(workdir, "input.pdf")
            staged_path = os.path.join(workdir, "appearances.pdf")
            output_path = os.path.join(workdir, "flattened.pdf")

            with open(input_path, "wb") as f:
                f.write(pdf_bytes)

            passes = (
                # First pass: generate appearances for all form fields.
                ["qpdf", "--generate-appearances", input_path, staged_path],
                # Second pass: flatten all annotations including form fields.
                ["qpdf", "--flatten-annotations=all", staged_path, output_path],
            )
            for command in passes:
                result = subprocess.run(command, capture_output=True, timeout=30)
                stderr_text = result.stderr.decode("utf-8", errors="replace").strip()
                if result.returncode not in ok_exit_codes:
                    logger.error(
                        f"qpdf flattening failed: {command[1]} exited with "
                        f"{result.returncode}: {stderr_text}"
                    )
                    return None
                if result.returncode != 0:
                    logger.warning(f"qpdf {command[1]} reported warnings: {stderr_text}")

            with open(output_path, "rb") as f:
                flattened_bytes = f.read()
    except (subprocess.SubprocessError, OSError) as e:
        # SubprocessError covers TimeoutExpired; OSError covers a missing
        # qpdf binary (FileNotFoundError) and temp-file I/O failures.
        logger.error(f"qpdf flattening failed: {e}")
        return None

    logger.info("Successfully flattened PDF with qpdf")
    return flattened_bytes
|
||||||
|
|
||||||
|
def _flatten_pdf_with_pdftk(pdf_bytes: bytes) -> Optional[bytes]:
    """Flatten a PDF by running ``pdftk <in> output <out> flatten``.

    The input and output files live in a private temporary directory that is
    removed on every exit path.

    Args:
        pdf_bytes: Serialized PDF to flatten.

    Returns:
        The bytes written by ``pdftk``, or ``None`` when ``pdftk`` is
        missing, times out (30 s), exits with a non-zero status, or a file
        operation fails.
    """
    logger.info("Attempting to flatten PDF with pdftk")
    try:
        with tempfile.TemporaryDirectory(prefix="pdftk_flatten_") as workdir:
            input_path = os.path.join(workdir, "input.pdf")
            output_path = os.path.join(workdir, "flattened.pdf")

            with open(input_path, "wb") as f:
                f.write(pdf_bytes)

            # pdftk's flatten command specifically flattens form fields.
            result = subprocess.run(
                ["pdftk", input_path, "output", output_path, "flatten"],
                capture_output=True,
                timeout=30,
            )
            if result.returncode != 0:
                stderr_text = result.stderr.decode("utf-8", errors="replace").strip()
                logger.error(
                    f"pdftk flattening failed: exited with "
                    f"{result.returncode}: {stderr_text}"
                )
                return None

            with open(output_path, "rb") as f:
                flattened_bytes = f.read()
    except (subprocess.SubprocessError, OSError) as e:
        # SubprocessError covers TimeoutExpired; OSError covers a missing
        # pdftk binary (FileNotFoundError) and temp-file I/O failures.
        logger.error(f"pdftk flattening failed: {e}")
        return None

    logger.info("Successfully flattened PDF with pdftk")
    return flattened_bytes
|
||||||
|
|
||||||
# -----------------------------
|
# -----------------------------
|
||||||
# Kern: PDF füllen (direktes Widget-Update)
|
# Kern: PDF füllen (direktes Widget-Update)
|
||||||
# -----------------------------
|
# -----------------------------
|
||||||
|
|
||||||
def fill_pdf(payload: Dict[str, Any], variant: str, out_path: Optional[str] = None) -> bytes:
|
def fill_pdf(payload: Dict[str, Any], variant: str, out_path: Optional[str] = None, flatten: bool = False) -> bytes:
|
||||||
"""
|
"""
|
||||||
Payload (asdict(RootPayload) ODER dein payload["pa"]-ähnliches Dict) -> befüllte PDF-Bytes.
|
Payload (asdict(RootPayload) ODER dein payload["pa"]-ähnliches Dict) -> befüllte PDF-Bytes.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
payload: Dictionary mit den Formulardaten
|
||||||
|
variant: "QSM" oder "VSM"
|
||||||
|
out_path: Optionaler Pfad zum Speichern der PDF
|
||||||
|
flatten: Wenn True, werden Formularfelder in statischen Inhalt umgewandelt (Standard: False)
|
||||||
"""
|
"""
|
||||||
|
logger.info(f"fill_pdf called with variant={variant}, flatten={flatten}")
|
||||||
template_path = _get_template(variant)
|
template_path = _get_template(variant)
|
||||||
if not os.path.isfile(template_path):
|
if not os.path.isfile(template_path):
|
||||||
raise FileNotFoundError(f"Template not found: {template_path}")
|
raise FileNotFoundError(f"Template not found: {template_path}")
|
||||||
@ -172,14 +334,19 @@ def fill_pdf(payload: Dict[str, Any], variant: str, out_path: Optional[str] = No
|
|||||||
if root and "/AcroForm" in root:
|
if root and "/AcroForm" in root:
|
||||||
acroform = root["/AcroForm"]
|
acroform = root["/AcroForm"]
|
||||||
writer._root_object.update({NameObject("/AcroForm"): acroform})
|
writer._root_object.update({NameObject("/AcroForm"): acroform})
|
||||||
|
# When flattening, set NeedAppearances to False so viewers use the stored appearance streams; otherwise set it to True so viewers regenerate them
|
||||||
try:
|
try:
|
||||||
writer._root_object["/AcroForm"].update({NameObject("/NeedAppearances"): BooleanObject(True)})
|
if flatten:
|
||||||
|
# False forces PDF viewers to use existing appearances
|
||||||
|
writer._root_object["/AcroForm"].update({NameObject("/NeedAppearances"): BooleanObject(False)})
|
||||||
|
else:
|
||||||
|
writer._root_object["/AcroForm"].update({NameObject("/NeedAppearances"): BooleanObject(True)})
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
writer._root_object.update({
|
writer._root_object.update({
|
||||||
NameObject("/AcroForm"): PyPDF2.generic.DictionaryObject({
|
NameObject("/AcroForm"): PyPDF2.generic.DictionaryObject({
|
||||||
NameObject("/NeedAppearances"): BooleanObject(True)
|
NameObject("/NeedAppearances"): BooleanObject(False if flatten else True)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
except Exception:
|
except Exception:
|
||||||
@ -275,6 +442,12 @@ def fill_pdf(payload: Dict[str, Any], variant: str, out_path: Optional[str] = No
|
|||||||
if name in text_updates and ft in (None, "Tx", "Ch"):
|
if name in text_updates and ft in (None, "Tx", "Ch"):
|
||||||
value = text_updates[name]
|
value = text_updates[name]
|
||||||
annot.update({NameObject("/V"): createStringObject(value)})
|
annot.update({NameObject("/V"): createStringObject(value)})
|
||||||
|
|
||||||
|
# For flattening: ensure default appearance is set
|
||||||
|
if flatten and "/DA" not in annot:
|
||||||
|
# Set a default appearance string (Helvetica 10pt black)
|
||||||
|
annot.update({NameObject("/DA"): createStringObject("/Helv 10 Tf 0 g")})
|
||||||
|
|
||||||
parent = annot.get("/Parent")
|
parent = annot.get("/Parent")
|
||||||
if isinstance(parent, IndirectObject):
|
if isinstance(parent, IndirectObject):
|
||||||
try:
|
try:
|
||||||
@ -306,6 +479,12 @@ def fill_pdf(payload: Dict[str, Any], variant: str, out_path: Optional[str] = No
|
|||||||
if desired != "Off" and widget_on and desired == widget_on:
|
if desired != "Off" and widget_on and desired == widget_on:
|
||||||
annot.update({NameObject("/AS"): _to_name(desired)})
|
annot.update({NameObject("/AS"): _to_name(desired)})
|
||||||
annot.update({NameObject("/V"): _to_name(desired)})
|
annot.update({NameObject("/V"): _to_name(desired)})
|
||||||
|
|
||||||
|
# For checkboxes/radio buttons, ensure they're visible when flattened
|
||||||
|
if flatten:
|
||||||
|
# Make sure the appearance state matches the value
|
||||||
|
annot.update({NameObject("/AS"): _to_name(desired)})
|
||||||
|
|
||||||
parent = annot.get("/Parent")
|
parent = annot.get("/Parent")
|
||||||
if isinstance(parent, IndirectObject):
|
if isinstance(parent, IndirectObject):
|
||||||
try:
|
try:
|
||||||
@ -329,15 +508,86 @@ def fill_pdf(payload: Dict[str, Any], variant: str, out_path: Optional[str] = No
|
|||||||
except Exception:
|
except Exception:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# 3) Schreiben
|
# 3) Write the PDF with filled forms
|
||||||
bio = io.BytesIO()
|
bio = io.BytesIO()
|
||||||
writer.write(bio)
|
writer.write(bio)
|
||||||
data = bio.getvalue()
|
data = bio.getvalue()
|
||||||
|
|
||||||
|
# 4) Flatten if requested
|
||||||
|
if flatten:
|
||||||
|
logger.info(f"Starting PDF flattening process (PDF size: {len(data)} bytes)")
|
||||||
|
# Try qpdf first (most reliable for form fields)
|
||||||
|
flattened = _flatten_pdf_with_qpdf(data)
|
||||||
|
if flattened:
|
||||||
|
logger.info("PDF flattened successfully with qpdf")
|
||||||
|
data = flattened
|
||||||
|
else:
|
||||||
|
# Try PyMuPDF as second option
|
||||||
|
flattened = _flatten_pdf_with_pymupdf(data)
|
||||||
|
if flattened:
|
||||||
|
logger.info("PDF flattened successfully with PyMuPDF")
|
||||||
|
data = flattened
|
||||||
|
else:
|
||||||
|
# Try pdftk as third option
|
||||||
|
flattened = _flatten_pdf_with_pdftk(data)
|
||||||
|
if flattened:
|
||||||
|
logger.info("PDF flattened successfully with pdftk")
|
||||||
|
data = flattened
|
||||||
|
else:
|
||||||
|
logger.warning("All flattening methods failed, using PyPDF2 fallback (content may be lost)")
|
||||||
|
# Fallback: Remove form fields using PyPDF2 (fields won't be visible)
|
||||||
|
# This is not ideal but better than nothing
|
||||||
|
reader = PyPDF2.PdfReader(io.BytesIO(data))
|
||||||
|
writer = PyPDF2.PdfWriter()
|
||||||
|
|
||||||
|
# Copy all pages
|
||||||
|
for page in reader.pages:
|
||||||
|
writer.add_page(page)
|
||||||
|
|
||||||
|
# Remove AcroForm to make fields non-interactive
|
||||||
|
if "/AcroForm" in writer._root_object:
|
||||||
|
del writer._root_object["/AcroForm"]
|
||||||
|
|
||||||
|
# Remove Widget annotations
|
||||||
|
for page in writer.pages:
|
||||||
|
if "/Annots" in page:
|
||||||
|
annots = page["/Annots"]
|
||||||
|
if isinstance(annots, IndirectObject):
|
||||||
|
try:
|
||||||
|
annots = annots.get_object()
|
||||||
|
except:
|
||||||
|
continue
|
||||||
|
|
||||||
|
new_annots = ArrayObject()
|
||||||
|
if isinstance(annots, (list, ArrayObject)):
|
||||||
|
for annot_ref in annots:
|
||||||
|
try:
|
||||||
|
annot = annot_ref.get_object() if isinstance(annot_ref, IndirectObject) else annot_ref
|
||||||
|
if isinstance(annot, DictionaryObject):
|
||||||
|
subtype = _to_str(annot.get("/Subtype"))
|
||||||
|
if subtype and subtype != "Widget":
|
||||||
|
new_annots.append(annot_ref)
|
||||||
|
except:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if len(new_annots) > 0:
|
||||||
|
page[NameObject("/Annots")] = new_annots
|
||||||
|
else:
|
||||||
|
if "/Annots" in page:
|
||||||
|
del page["/Annots"]
|
||||||
|
|
||||||
|
bio = io.BytesIO()
|
||||||
|
writer.write(bio)
|
||||||
|
data = bio.getvalue()
|
||||||
|
logger.info("PDF flattened with PyPDF2 fallback")
|
||||||
|
else:
|
||||||
|
logger.info("Flattening not requested, returning PDF with editable fields")
|
||||||
|
|
||||||
if out_path:
|
if out_path:
|
||||||
with open(out_path, "wb") as out:
|
with open(out_path, "wb") as out:
|
||||||
out.write(data)
|
out.write(data)
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
def save_pdf(payload: Dict[str, Any], variant: str, out_path: str, flatten: bool = False) -> None:
    """Fill the template for ``variant`` and write the result to ``out_path``.

    Thin wrapper around ``fill_pdf`` for callers that only need the file on
    disk; the returned bytes are discarded.

    Args:
        payload: Form data, passed through to ``fill_pdf`` unchanged.
        variant: Template variant, passed through to ``fill_pdf``.
        out_path: Destination path for the generated PDF.
        flatten: Passed through to ``fill_pdf``; defaults to ``False``.
    """
    fill_pdf(payload, variant, out_path=out_path, flatten=flatten)
|
||||||
|
|||||||
@ -265,14 +265,59 @@ def _merge_mapping(variant: str, form_fields: Mapping[str, Any]) -> Dict[str, Di
|
|||||||
_cost_name_pat = re.compile(r"^pa-cost-(\d+)-name$")
|
_cost_name_pat = re.compile(r"^pa-cost-(\d+)-name$")
|
||||||
_cost_amt_pat = re.compile(r"^pa-cost-(\d+)-amount-euro$")
|
_cost_amt_pat = re.compile(r"^pa-cost-(\d+)-amount-euro$")
|
||||||
|
|
||||||
def detect_variant(form_fields: Mapping[str, Any], pdf_file: Optional[str] = None) -> str:
    """Best-effort detection of the form variant ("QSM" or "VSM").

    Detection strategy, in order:
      1. If ``pdf_file`` is given, look for title/header markers in the text
         of its first two pages.
      2. Count variant-specific form field names and pick the majority.
      3. Default to "VSM".

    Args:
        form_fields: Mapping whose keys are the raw PDF form field names.
        pdf_file: Optional path to the PDF, used for the text heuristics.

    Returns:
        "QSM" or "VSM".
    """
    if pdf_file:
        try:
            text = extract_pdf_text(pdf_file, max_pages=2) or ""
        except Exception:
            # Text heuristics are optional; fall through to the field checks.
            text = ""

        # Text extraction may emit any of several dash glyphs for the
        # separator in the headers, so fold them all into "-" first.
        dash_map = {ord(ch): "-" for ch in "\u2010\u2011\u2012\u2013\u2014\u2212"}
        text_upper = text.upper().translate(dash_map)

        # QSM has "Projektantrag: QSM" or "QSM – Allgemeiner Teil".
        qsm_markers = ("PROJEKTANTRAG: QSM", "QSM - ALLGEMEINER TEIL")
        # VSM has "Projektantrag: VSM", "VSM – Allgemeiner Teil" or
        # "VSM – Kostenaufstellung".
        vsm_markers = (
            "PROJEKTANTRAG: VSM",
            "VSM - ALLGEMEINER TEIL",
            "VSM - KOSTENAUFSTELLUNG",
        )
        if any(marker in text_upper for marker in qsm_markers):
            return "QSM"
        if any(marker in text_upper for marker in vsm_markers):
            return "VSM"

        # QSM has just "Antragsteller", VSM has "Institution (sofern vorhanden)".
        if "ANTRAGSTELLER" in text_upper and "INSTITUTION (SOFERN VORHANDEN)" not in text_upper:
            return "QSM"

    # Field-name check: whichever variant has more of its indicator fields wins.
    keys = set(form_fields.keys())
    qsm_indicators = {"pa-qsm-financing", "pa-qsm-reason", "pa-applicant-course"}
    vsm_indicators = {"pa-vsm-financing", "pa-institution-type", "pa-institution"}
    qsm_count = len(qsm_indicators & keys)
    vsm_count = len(vsm_indicators & keys)

    if qsm_count > vsm_count:
        return "QSM"

    # A VSM majority, a tie, and the former "pa-cost-*-description" pattern
    # check all resolved to "VSM", so they collapse into the default.
    return "VSM"
|
||||||
|
|
||||||
def map_form_to_payload(form_json: Dict[str, Dict[str, Any]], variant: str) -> Dict[str, Any]:
|
def map_form_to_payload(form_json: Dict[str, Dict[str, Any]], variant: str) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
@ -289,7 +334,11 @@ def map_form_to_payload(form_json: Dict[str, Dict[str, Any]], variant: str) -> D
|
|||||||
|
|
||||||
# First pass
|
# First pass
|
||||||
for field_name, meta in form_json.items():
|
for field_name, meta in form_json.items():
|
||||||
raw_val = meta.get("/V")
|
# Handle both PDF fields (dict with /V) and form_json (direct string values)
|
||||||
|
if isinstance(meta, dict):
|
||||||
|
raw_val = meta.get("/V")
|
||||||
|
else:
|
||||||
|
raw_val = meta
|
||||||
|
|
||||||
# Costs pattern
|
# Costs pattern
|
||||||
m_name = _cost_name_pat.match(field_name)
|
m_name = _cost_name_pat.match(field_name)
|
||||||
@ -487,12 +536,26 @@ def read_pdf_fields(pdf_file: str) -> Dict[str, Dict[str, Any]]:
|
|||||||
# ensure dict[str, dict] and keep only keys we care about
|
# ensure dict[str, dict] and keep only keys we care about
|
||||||
return {k: (v or {}) for k, v in fields.items()}
|
return {k: (v or {}) for k, v in fields.items()}
|
||||||
|
|
||||||
|
|
||||||
|
def extract_pdf_text(pdf_file: str, max_pages: int = 2) -> str:
    """Extract text from the first few pages of a PDF for variant detection.

    Args:
        pdf_file: Path to the PDF file.
        max_pages: Upper bound on the number of leading pages to read.

    Returns:
        The extracted text of each page read, each followed by a newline.
        An empty string is returned if the file cannot be opened or parsed.
    """
    try:
        with open(pdf_file, "rb") as f:
            reader = PyPDF2.PdfReader(f, strict=False)
            page_count = min(max_pages, len(reader.pages))
            # "or ''" keeps one page without extractable text from raising
            # TypeError and discarding the text of every other page.
            chunks = [
                (reader.pages[i].extract_text() or "") + "\n"
                for i in range(page_count)
            ]
        return "".join(chunks)
    except Exception:
        # Best-effort: callers treat "" as "no text available".
        return ""
|
||||||
|
|
||||||
def pdf_to_payload(pdf_file: str, variant: Optional[str] = None) -> RootPayload:
    """Read a filled PDF form and convert it into the dataclass model.

    Args:
        pdf_file: Path to the PDF whose form fields are read.
        variant: Variant to map with; when empty or ``None`` it is detected
            from the form fields and the PDF itself.

    Returns:
        The model produced by ``payload_to_model`` from the mapped fields.
    """
    raw_fields = read_pdf_fields(pdf_file)
    if variant:
        chosen_variant = variant
    else:
        chosen_variant = detect_variant(raw_fields, pdf_file)
    return payload_to_model(map_form_to_payload(raw_fields, chosen_variant))
|
||||||
|
|
||||||
|
|||||||
@ -30,7 +30,7 @@ import secrets
|
|||||||
import hashlib
|
import hashlib
|
||||||
import tempfile
|
import tempfile
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import Any, Dict, Optional, List
|
from typing import Any, Dict, List, Optional, Union, Tuple
|
||||||
|
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Depends, Query, Body, Header, Response
|
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Depends, Query, Body, Header, Response
|
||||||
@ -400,12 +400,17 @@ def _payload_from_pdf_bytes(tmp_path: str, variant: Optional[str]) -> Dict[str,
|
|||||||
except PdfReadError as e:
|
except PdfReadError as e:
|
||||||
raise HTTPException(status_code=400, detail=f"PDF parse error: {e}")
|
raise HTTPException(status_code=400, detail=f"PDF parse error: {e}")
|
||||||
|
|
||||||
def _payload_from_form_json(form_json: Dict[str, Any], variant: Optional[str]) -> Tuple[Dict[str, Any], str]:
    """Map raw form JSON to a payload dict and report the variant used.

    The form data is mapped via ``core.map_form_to_payload``, converted to
    the model via ``core.payload_to_model`` and turned back into a plain
    dict with ``dataclasses.asdict``.

    Args:
        form_json: Raw form field data.
        variant: Explicit variant, or ``None`` / "AUTO" (any letter case)
            to have it detected from ``form_json``.

    Returns:
        A ``(payload, variant)`` tuple, where ``variant`` is the explicit
        value passed in or the detected one.
    """
    from dataclasses import asdict

    # Callers compare the variant case-insensitively, so the "AUTO"
    # sentinel is accepted in any letter case here as well.
    if variant is None or variant.strip().upper() == "AUTO":
        detected_variant = core.detect_variant(form_json)
    else:
        detected_variant = variant

    mapped = core.map_form_to_payload(form_json, detected_variant)
    model = core.payload_to_model(mapped)
    return asdict(model), detected_variant
|
||||||
|
|
||||||
def _inject_meta_for_render(payload: Dict[str, Any], pa_id: str, pa_key: Optional[str]) -> Dict[str, Any]:
|
def _inject_meta_for_render(payload: Dict[str, Any], pa_id: str, pa_key: Optional[str]) -> Dict[str, Any]:
|
||||||
# Wir injizieren Key/ID NUR für die PDF-Generierung in payload['pa'].*,
|
# Wir injizieren Key/ID NUR für die PDF-Generierung in payload['pa'].*,
|
||||||
@ -467,18 +472,20 @@ def create_application(
|
|||||||
# Payload beschaffen
|
# Payload beschaffen
|
||||||
payload: Dict[str, Any]
|
payload: Dict[str, Any]
|
||||||
raw_form: Optional[Dict[str, Any]] = None
|
raw_form: Optional[Dict[str, Any]] = None
|
||||||
|
detected_variant: Optional[str] = None
|
||||||
with tempfile.NamedTemporaryFile(delete=True, suffix=".pdf") as tf:
|
with tempfile.NamedTemporaryFile(delete=True, suffix=".pdf") as tf:
|
||||||
if pdf:
|
if pdf:
|
||||||
tf.write(pdf.file.read())
|
tf.write(pdf.file.read())
|
||||||
tf.flush()
|
tf.flush()
|
||||||
payload = _payload_from_pdf_bytes(tf.name, variant)
|
payload = _payload_from_pdf_bytes(tf.name, variant)
|
||||||
|
# For PDF, we'll detect variant from the PDF content/fields
|
||||||
elif form_json_b64:
|
elif form_json_b64:
|
||||||
try:
|
try:
|
||||||
raw = base64.b64decode(form_json_b64)
|
raw = base64.b64decode(form_json_b64)
|
||||||
raw_form = json.loads(raw.decode("utf-8"))
|
raw_form = json.loads(raw.decode("utf-8"))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise HTTPException(status_code=400, detail=f"Invalid form_json_b64: {e}")
|
raise HTTPException(status_code=400, detail=f"Invalid form_json_b64: {e}")
|
||||||
payload = _payload_from_form_json(raw_form, variant or "AUTO")
|
payload, detected_variant = _payload_from_form_json(raw_form, variant or "AUTO")
|
||||||
else:
|
else:
|
||||||
raise HTTPException(status_code=400, detail="Provide either PDF file or form_json_b64")
|
raise HTTPException(status_code=400, detail="Provide either PDF file or form_json_b64")
|
||||||
|
|
||||||
@ -494,11 +501,53 @@ def create_application(
|
|||||||
pa_key_plain = _gen_pa_key()
|
pa_key_plain = _gen_pa_key()
|
||||||
salt, key_hash = _hash_key(pa_key_plain)
|
salt, key_hash = _hash_key(pa_key_plain)
|
||||||
|
|
||||||
# Variante bestimmen (falls AUTO)
|
# Variante bestimmen
|
||||||
detected = variant or core.detect_variant(payload.get("pa", {})) or "VSM"
|
# If variant was explicitly provided and not AUTO, use it
|
||||||
detected = detected.upper()
|
if variant and variant.upper() not in ["AUTO", "COMMON"]:
|
||||||
if detected == "AUTO":
|
detected = variant.upper()
|
||||||
|
# If detected_variant was set from form_json processing, use it
|
||||||
|
elif detected_variant:
|
||||||
|
detected = detected_variant.upper()
|
||||||
|
# If PDF was uploaded, detect from payload structure
|
||||||
|
elif pdf:
|
||||||
|
# Look for variant-specific fields in the payload
|
||||||
|
pa_data = payload.get("pa", {})
|
||||||
|
# Check for QSM fields in the correct location
|
||||||
|
project_data = pa_data.get("project", {})
|
||||||
|
financing_data = project_data.get("financing", {})
|
||||||
|
|
||||||
|
# Check which financing type has actual content (not just empty structure)
|
||||||
|
qsm_data = financing_data.get("qsm", {})
|
||||||
|
vsm_data = financing_data.get("vsm", {})
|
||||||
|
|
||||||
|
# QSM has 'code' and 'flags' fields when filled
|
||||||
|
has_qsm_content = bool(qsm_data.get("code") or qsm_data.get("flags"))
|
||||||
|
# VSM has different structure (check if actually filled)
|
||||||
|
has_vsm_content = bool(vsm_data and any(vsm_data.values()))
|
||||||
|
|
||||||
|
# Also check institution fields (VSM-specific)
|
||||||
|
# Note: Institution name alone doesn't determine variant, as QSM can also have institution name
|
||||||
|
institution_data = pa_data.get("applicant", {}).get("institution", {})
|
||||||
|
has_institution_type = bool(institution_data.get("type")) # Only type is VSM-specific
|
||||||
|
|
||||||
|
# Determine variant based on which fields have actual content
|
||||||
|
# Prioritize financing fields over institution fields
|
||||||
|
if has_qsm_content and not has_vsm_content:
|
||||||
|
detected = "QSM"
|
||||||
|
elif has_vsm_content:
|
||||||
|
detected = "VSM"
|
||||||
|
elif has_institution_type:
|
||||||
|
# Only consider institution type, not name
|
||||||
|
detected = "VSM"
|
||||||
|
elif has_qsm_content:
|
||||||
|
# If only QSM fields are filled, it's QSM
|
||||||
|
detected = "QSM"
|
||||||
|
else:
|
||||||
|
detected = "VSM"
|
||||||
|
else:
|
||||||
|
# Default to VSM
|
||||||
detected = "VSM"
|
detected = "VSM"
|
||||||
|
|
||||||
# Map COMMON to VSM for backwards compatibility
|
# Map COMMON to VSM for backwards compatibility
|
||||||
if detected == "COMMON":
|
if detected == "COMMON":
|
||||||
detected = "VSM"
|
detected = "VSM"
|
||||||
@ -582,7 +631,7 @@ def update_application(
|
|||||||
raw_form = json.loads(raw.decode("utf-8"))
|
raw_form = json.loads(raw.decode("utf-8"))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise HTTPException(status_code=400, detail=f"Invalid form_json_b64: {e}")
|
raise HTTPException(status_code=400, detail=f"Invalid form_json_b64: {e}")
|
||||||
payload = _payload_from_form_json(raw_form, variant or app_row.variant)
|
payload, _ = _payload_from_form_json(raw_form, variant or app_row.variant)
|
||||||
else:
|
else:
|
||||||
raise HTTPException(status_code=400, detail="Provide either PDF file or form_json_b64")
|
raise HTTPException(status_code=400, detail="Provide either PDF file or form_json_b64")
|
||||||
|
|
||||||
|
|||||||
141
backend/test_flattening.py
Normal file
141
backend/test_flattening.py
Normal file
@ -0,0 +1,141 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Test script to verify PDF flattening functionality.
|
||||||
|
Tests that form fields are properly removed after filling.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
# Add src to path
|
||||||
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
|
||||||
|
|
||||||
|
import PyPDF2
|
||||||
|
from pdf_filler import fill_pdf
|
||||||
|
|
||||||
|
def check_pdf_has_forms(pdf_path):
    """Check whether the PDF at ``pdf_path`` has form fields.

    The document catalog's ``/AcroForm`` ``/Fields`` array is checked first;
    if it is missing or empty, every page's ``/Annots`` array is scanned for
    annotations whose ``/Subtype`` is ``/Widget``.

    Args:
        pdf_path: Path to the PDF file to inspect.

    Returns:
        A ``(has_forms, message)`` tuple: a bool and a human-readable summary.

    Raises:
        OSError: If the file cannot be opened (e.g. ``FileNotFoundError``).
    """
    def _resolve(obj):
        # Plain dict.get() may hand back an unresolved indirect reference
        # (presumably depends on the PyPDF2 version); dereference defensively.
        return obj.get_object() if hasattr(obj, 'get_object') else obj

    with open(pdf_path, 'rb') as f:
        reader = PyPDF2.PdfReader(f)

        # Check for AcroForm
        root = _resolve(reader.trailer.get('/Root', {}))
        if '/AcroForm' in root:
            acroform = _resolve(root['/AcroForm'])
            if '/Fields' in acroform:
                fields = _resolve(acroform['/Fields'])
                if fields and len(fields) > 0:
                    return True, f"Found {len(fields)} form fields"

        # Check for widget annotations
        widget_count = 0
        for page in reader.pages:
            if '/Annots' not in page:
                continue
            annots = _resolve(page['/Annots'])
            if not isinstance(annots, (list, PyPDF2.generic.ArrayObject)):
                continue
            for annot_ref in annots:
                try:
                    annot = _resolve(annot_ref)
                    if isinstance(annot, dict):
                        subtype = _resolve(annot.get('/Subtype'))
                        if subtype and str(subtype) == '/Widget':
                            widget_count += 1
                except Exception:
                    # Skip annotations that cannot be dereferenced; a bare
                    # "except:" would also trap KeyboardInterrupt/SystemExit.
                    continue

        if widget_count > 0:
            return True, f"Found {widget_count} widget annotations"

        return False, "No form fields or widgets found"
|
||||||
|
|
||||||
|
def _generate_and_inspect(payload, variant, flatten):
    """Render ``payload`` into a temporary PDF and inspect it for form fields.

    Args:
        payload: Data handed to ``fill_pdf`` as its first argument.
        variant: Template variant name, passed to ``fill_pdf`` unchanged and
            embedded in the temporary file's suffix.
        flatten: Forwarded as ``fill_pdf(..., flatten=...)``.

    Returns:
        The result of ``check_pdf_has_forms`` for the generated file, which
        the caller unpacks as ``(has_forms, msg)``.
    """
    label = "flattened" if flatten else "not_flattened"
    # Only the path is needed: the handle is closed immediately and the file
    # is removed in the ``finally`` below, whatever happens in between.
    with tempfile.NamedTemporaryFile(suffix=f"_{variant}_{label}.pdf", delete=False) as tf:
        out_path = tf.name
    try:
        fill_pdf(payload, variant, out_path=out_path, flatten=flatten)
        return check_pdf_has_forms(out_path)
    finally:
        if os.path.exists(out_path):
            os.unlink(out_path)


def test_flattening():
    """Test PDF flattening functionality.

    For each template variant ("VSM", "QSM") found under ``src/assets`` next
    to this file, the template is inspected, then a PDF is generated once with
    ``flatten=True`` and once with ``flatten=False``; each output is checked
    for remaining form fields. Results are printed, nothing is returned.
    Variants whose template file is missing are skipped with a warning.
    """
    # Test payload
    test_payload = {
        "pa": {
            "meta": {
                "id": "TEST-001",
                "key": "test-key-123",
            },
            "applicant": {
                "name": "Test Applicant",
                "email": "test@example.com",
            },
            "project": {
                "title": "Test Project",
                "description": "This is a test project",
                "costs": [
                    {"description": "Item 1", "amountEur": 100.50},
                    {"description": "Item 2", "amountEur": 200.75},
                ],
            },
        }
    }

    print("Testing PDF Flattening...")
    print("-" * 50)

    # Test both variants
    for variant in ["VSM", "QSM"]:
        print(f"\nTesting {variant} variant:")

        # Check if template exists
        template_path = os.path.join(
            os.path.dirname(__file__), "src", "assets", f"{variant.lower()}.pdf"
        )
        if not os.path.exists(template_path):
            print(f" ⚠️ Template not found at {template_path}, skipping...")
            continue

        # Check template has forms (only the message is reported)
        _, msg = check_pdf_has_forms(template_path)
        print(f" Template: {msg}")

        # Generate PDF with flattening (default)
        try:
            has_forms, msg = _generate_and_inspect(test_payload, variant, flatten=True)
            print(f" Flattened PDF: {msg}")
            if has_forms:
                print(" ❌ FAILED: Flattened PDF still has form fields!")
            else:
                print(" ✅ SUCCESS: Form fields removed after flattening")
        except Exception as e:
            # Best-effort manual check: report the problem and keep going.
            print(f" ❌ ERROR generating flattened PDF: {e}")

        # Generate PDF without flattening for comparison
        try:
            has_forms, msg = _generate_and_inspect(test_payload, variant, flatten=False)
            print(f" Non-flattened PDF: {msg}")
            if not has_forms:
                print(" ⚠️ WARNING: Non-flattened PDF has no form fields (unexpected)")
            else:
                print(" ✅ Non-flattened PDF keeps form fields as expected")
        except Exception as e:
            print(f" ❌ ERROR generating non-flattened PDF: {e}")

    print("\n" + "-" * 50)
    print("Test complete!")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Run the manual flattening check when this file is executed as a script.
    test_flattening()
|
||||||
@ -582,7 +582,8 @@ const AdminApplicationView: React.FC = () => {
|
|||||||
<CheckBoxOutlineBlank sx={{ mr: 1 }} />
|
<CheckBoxOutlineBlank sx={{ mr: 1 }} />
|
||||||
)}
|
)}
|
||||||
<Typography variant="body1">
|
<Typography variant="body1">
|
||||||
Es handelt sich um Stellenfinanzierungen
|
Die Maßnahme beinhaltet keine zeitlich unbefristeten
|
||||||
|
Stellenfinanzierungen
|
||||||
</Typography>
|
</Typography>
|
||||||
</Box>
|
</Box>
|
||||||
<Box sx={{ display: "flex", alignItems: "center", mb: 1 }}>
|
<Box sx={{ display: "flex", alignItems: "center", mb: 1 }}>
|
||||||
@ -592,8 +593,7 @@ const AdminApplicationView: React.FC = () => {
|
|||||||
<CheckBoxOutlineBlank sx={{ mr: 1 }} />
|
<CheckBoxOutlineBlank sx={{ mr: 1 }} />
|
||||||
)}
|
)}
|
||||||
<Typography variant="body1">
|
<Typography variant="body1">
|
||||||
Die Studierenden werden an der Planung und Durchführung
|
Die Maßnahme kommt den Studierenden zugute (vgl. VWV)
|
||||||
der Maßnahme beteiligt
|
|
||||||
</Typography>
|
</Typography>
|
||||||
</Box>
|
</Box>
|
||||||
<Box sx={{ display: "flex", alignItems: "center", mb: 1 }}>
|
<Box sx={{ display: "flex", alignItems: "center", mb: 1 }}>
|
||||||
@ -603,7 +603,8 @@ const AdminApplicationView: React.FC = () => {
|
|||||||
<CheckBoxOutlineBlank sx={{ mr: 1 }} />
|
<CheckBoxOutlineBlank sx={{ mr: 1 }} />
|
||||||
)}
|
)}
|
||||||
<Typography variant="body1">
|
<Typography variant="body1">
|
||||||
Es werden keine Einzelpersonen von der Maßnahme gefördert
|
Es findet keine individuelle Förderung von Studierenden
|
||||||
|
statt
|
||||||
</Typography>
|
</Typography>
|
||||||
</Box>
|
</Box>
|
||||||
{formData.qsmFlags.exkursionGenehmigt !== undefined && (
|
{formData.qsmFlags.exkursionGenehmigt !== undefined && (
|
||||||
@ -614,8 +615,7 @@ const AdminApplicationView: React.FC = () => {
|
|||||||
<CheckBoxOutlineBlank sx={{ mr: 1 }} />
|
<CheckBoxOutlineBlank sx={{ mr: 1 }} />
|
||||||
)}
|
)}
|
||||||
<Typography variant="body1">
|
<Typography variant="body1">
|
||||||
Die beantragte Exkursion wurde von den zuständigen
|
Die Exkursion wurde von der Fakultät genehmigt
|
||||||
Stellen genehmigt
|
|
||||||
</Typography>
|
</Typography>
|
||||||
</Box>
|
</Box>
|
||||||
)}
|
)}
|
||||||
@ -627,7 +627,7 @@ const AdminApplicationView: React.FC = () => {
|
|||||||
<CheckBoxOutlineBlank sx={{ mr: 1 }} />
|
<CheckBoxOutlineBlank sx={{ mr: 1 }} />
|
||||||
)}
|
)}
|
||||||
<Typography variant="body1">
|
<Typography variant="body1">
|
||||||
Die Exkursion wird bereits aus anderen Mitteln
|
Die Exkursion wird maßgeblich von der Fakultät
|
||||||
bezuschusst
|
bezuschusst
|
||||||
</Typography>
|
</Typography>
|
||||||
</Box>
|
</Box>
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user