# stupa-pdf-api/src/pdf_filler.py
#
# 346 lines
# 13 KiB
# Python

# pdf_filler.py
from __future__ import annotations
import io
import os
import re
from typing import Any, Dict, Optional
import PyPDF2
from PyPDF2.generic import (
NameObject,
BooleanObject,
IndirectObject,
ArrayObject,
DictionaryObject,
createStringObject,
)
# Project module that provides the field mapping; adjust the name if needed:
import pdf_to_struct as core # _merge_mapping
# -----------------------------
# Assets (relativ zum Modul)
# -----------------------------
_THIS_DIR = os.path.dirname(os.path.abspath(__file__))
ASSET_QSM_DEFAULT = os.path.join(_THIS_DIR, "assets", "qsm.pdf")
ASSET_VSM_DEFAULT = os.path.join(_THIS_DIR, "assets", "vsm.pdf")
ASSET_QSM = os.environ.get("QSM_TEMPLATE", ASSET_QSM_DEFAULT)
ASSET_VSM = os.environ.get("VSM_TEMPLATE", ASSET_VSM_DEFAULT)
def _get_template(variant: str) -> str:
    """Return the template path configured for *variant*.

    The comparison is case-insensitive; a falsy *variant* is treated as an
    empty string.

    Raises:
        ValueError: if *variant* is neither ``"QSM"`` nor ``"VSM"``.
    """
    normalized = variant.upper() if variant else ""
    if normalized not in ("QSM", "VSM"):
        raise ValueError("variant must be 'QSM' or 'VSM'")
    # The module constants are looked up only after validation succeeded.
    return ASSET_QSM if normalized == "QSM" else ASSET_VSM
# -----------------------------
# Helpers
# -----------------------------
# Form keys with a numeric range placeholder have the shape
#   <prefix>{<single letter>;<start>:<end>}<suffix>
# where <start> and <end> are decimal integers.
_WILDCARD_RE = re.compile(
    r"^(?P<prefix>.*)\{(?P<var>[a-zA-Z]);(?P<start>\d+):(?P<end>\d+)\}(?P<suffix>.*)$"
)


def _expand_wildcard_key(form_key: str) -> Optional[tuple]:
    """Split a wildcard form key into its components.

    Returns ``(prefix, var, start, end, suffix)`` with ``start`` and ``end``
    converted to ``int``, or ``None`` when *form_key* contains no range
    placeholder.
    """
    match = _WILDCARD_RE.match(form_key)
    if match is None:
        return None
    parts = match.groupdict()
    return (
        parts["prefix"],
        parts["var"],
        int(parts["start"]),
        int(parts["end"]),
        parts["suffix"],
    )
def _fmt_de_amount(v: Any) -> str:
    """Format *v* as an amount with two decimals and a decimal comma.

    Values that cannot be converted with ``float()`` are returned as
    ``str(v)``; ``None`` becomes the empty string.
    """
    try:
        number = float(v)
    except Exception:
        # Not numeric: pass the value through unchanged (as text).
        if v is None:
            return ""
        return str(v)
    return format(number, ".2f").replace(".", ",")
def _to_name(value: str) -> NameObject:
    """Wrap *value* in a PDF name object by prepending the leading slash."""
    pdf_name = "/" + value
    return NameObject(pdf_name)
def _to_str(obj) -> Optional[str]:
    """Return ``str(obj)`` without a single leading ``/``; ``None`` stays ``None``."""
    if obj is None:
        return None
    text = str(obj)
    if text.startswith("/"):
        return text[1:]
    return text
def _flatten(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten nested dicts/lists into a single dict keyed by path.

    Dict keys are joined with ``.`` and list positions are appended as
    ``[i]``. Only leaf values (anything that is neither dict nor list) are
    stored, so empty dicts and empty lists produce no entry.
    """

    def walk(path: str, node: Any):
        # Yield (path, leaf) pairs in depth-first, insertion order.
        if isinstance(node, dict):
            for key, child in node.items():
                yield from walk(f"{path}.{key}" if path else key, child)
        elif isinstance(node, list):
            for index, child in enumerate(node):
                yield from walk(f"{path}[{index}]", child)
        else:
            yield path, node

    return dict(walk("", payload))
def _collect_btn_widgets(reader: PyPDF2.PdfReader):
    """Collect all button (``/Btn``) widgets of a document, grouped by name.

    Args:
        reader: the opened template; every page's ``/Annots`` is scanned.

    Returns:
        A 2-tuple ``(btn_widgets_by_name, export_values_by_name)``:

        * ``btn_widgets_by_name`` maps a field name (``/T``) to the list of
          widget annotation dictionaries carrying that name.
        * ``export_values_by_name`` maps a field name to the set of state
          names found under ``/AP`` ``/N`` (without the leading slash),
          excluding ``Off``.

    Only ``/FT`` and ``/T`` stored on the widget itself are inspected.
    Annotations that raise while being inspected are skipped (best effort).
    """

    def resolve(obj):
        # Follow an indirect reference; direct objects are returned as-is.
        return obj.get_object() if isinstance(obj, IndirectObject) else obj

    btn_widgets_by_name: dict[str, list[DictionaryObject]] = {}
    export_values_by_name: dict[str, set[str]] = {}

    for page in reader.pages:
        try:
            annots = resolve(page.get("/Annots"))
        except Exception:
            annots = None
        if not annots or not isinstance(annots, (list, ArrayObject)):
            continue

        for annot_ref in annots:
            try:
                annot = resolve(annot_ref)
                if not isinstance(annot, DictionaryObject):
                    continue
                if _to_str(annot.get("/FT")) != "Btn":
                    continue
                name = _to_str(annot.get("/T"))
                if not name:
                    continue
                btn_widgets_by_name.setdefault(name, []).append(annot)

                # /AP may be stored as an indirect reference just like /N,
                # so it is resolved before the type check.
                ap = resolve(annot.get("/AP"))
                if not isinstance(ap, DictionaryObject):
                    continue
                normal = resolve(ap.get("/N"))
                if not isinstance(normal, DictionaryObject):
                    continue
                for key in normal.keys():
                    state = str(key)
                    if state == "/Off":
                        continue
                    export_values_by_name.setdefault(name, set()).add(
                        state[1:] if state.startswith("/") else state
                    )
            except Exception:
                # Best effort: a broken annotation must not abort the scan.
                continue

    return btn_widgets_by_name, export_values_by_name
# -----------------------------
# Core: fill the PDF (direct widget update)
# -----------------------------
def _sum_cost_amounts(payload: Any) -> float:
    """Sum the numeric ``amountEur`` values of ``payload["pa"]["project"]["costs"]``.

    Returns ``0.0`` when any level of that path is missing or has an
    unexpected type. Non-dict cost entries and non-numeric amounts are
    ignored.
    """
    node = payload
    for key in ("pa", "project", "costs"):
        if not isinstance(node, dict):
            return 0.0
        node = node.get(key)
    if not isinstance(node, list):
        return 0.0

    total = 0.0
    for cost in node:
        if not isinstance(cost, dict):
            continue
        amount = cost.get("amountEur")
        if isinstance(amount, (int, float)):
            total += float(amount)
    return total


def _widget_on_state(annot) -> Optional[str]:
    """Return the first non-``Off`` state name under ``/AP`` ``/N`` of a widget.

    The name is returned without the leading slash; ``None`` is returned when
    the widget has no usable appearance dictionary.
    """
    ap = annot.get("/AP")
    if isinstance(ap, IndirectObject):
        ap = ap.get_object()
    if not isinstance(ap, DictionaryObject):
        return None
    normal = ap.get("/N")
    if isinstance(normal, IndirectObject):
        normal = normal.get_object()
    if not isinstance(normal, DictionaryObject):
        return None
    for key in normal.keys():
        state = str(key)
        if state != "/Off":
            return state[1:] if state.startswith("/") else state
    return None


def _set_field_value(annot, value) -> None:
    """Set ``/V`` on the widget and, best effort, on its ``/Parent`` field."""
    annot.update({NameObject("/V"): value})
    parent = annot.get("/Parent")
    if isinstance(parent, IndirectObject):
        try:
            parent_obj = parent.get_object()
            if isinstance(parent_obj, DictionaryObject):
                parent_obj.update({NameObject("/V"): value})
        except Exception:
            # A parent that cannot be resolved must not prevent the widget
            # itself from being filled.
            pass


def fill_pdf(payload: Dict[str, Any], variant: str, out_path: Optional[str] = None) -> bytes:
    """Fill the form template for *variant* with *payload* and return the PDF bytes.

    Args:
        payload: nested dict (e.g. ``asdict(RootPayload)``); it is flattened
            with ``_flatten`` and looked up through the mapping returned by
            ``core._merge_mapping(variant, {})``.
        variant: ``"QSM"`` or ``"VSM"`` (case-insensitive).
        out_path: if truthy, the resulting bytes are additionally written to
            this path.

    Returns:
        The filled PDF document as bytes.

    Raises:
        ValueError: if *variant* is not supported (raised by ``_get_template``).
        FileNotFoundError: if the template file does not exist.
    """
    template_path = _get_template(variant)
    if not os.path.isfile(template_path):
        raise FileNotFoundError(f"Template not found: {template_path}")

    # Read the template into memory so the reader stays usable independently
    # of the file handle's lifetime.
    with open(template_path, "rb") as f:
        template_bytes = f.read()
    reader = PyPDF2.PdfReader(io.BytesIO(template_bytes))
    writer = PyPDF2.PdfWriter()

    # Copy pages
    for p in reader.pages:
        writer.add_page(p)

    mapping = core._merge_mapping(variant, {})
    flat = _flatten(payload)

    # Total of all cost amounts. This must be computed from the nested
    # payload: the flattened dict only contains leaf values, never the
    # "pa.project.costs" list itself.
    total_amount = _sum_cost_amounts(payload)

    # Carry over the AcroForm and request appearance regeneration
    # (/NeedAppearances). Failures here are deliberately non-fatal.
    try:
        root = reader.trailer.get("/Root")
        if isinstance(root, IndirectObject):
            root = root.get_object()
        if root and "/AcroForm" in root:
            acroform = root["/AcroForm"]
            writer._root_object.update({NameObject("/AcroForm"): acroform})
            try:
                writer._root_object["/AcroForm"].update(
                    {NameObject("/NeedAppearances"): BooleanObject(True)}
                )
            except Exception:
                pass
        else:
            writer._root_object.update({
                NameObject("/AcroForm"): DictionaryObject({
                    NameObject("/NeedAppearances"): BooleanObject(True)
                })
            })
    except Exception:
        pass

    # 1) Prepare all concrete updates (concrete field name -> value)
    text_updates: Dict[str, str] = {}  # regular fields (Tx/Ch)
    btn_updates: Dict[str, str] = {}   # Btn fields: 'Off' or export name (without '/')

    # Calculated total; a mapping entry for the same field may override it below.
    text_updates["pa-requested-amount-euro-sum"] = _fmt_de_amount(total_amount)

    # Detect the /Btn field names present in the document
    btn_widgets_by_name, export_values_by_name = _collect_btn_widgets(reader)
    btn_names = set(btn_widgets_by_name.keys())

    for form_key, spec in mapping.items():
        w = _expand_wildcard_key(form_key)
        if not w:
            tkey = spec["target-key"]
            if tkey not in flat:
                continue
            val = flat[tkey]
            if val is None:
                continue
            if form_key in btn_names:
                # /Btn
                if spec.get("type") is bool:
                    # NOTE(review): a checkbox is only ticked in step 2 when
                    # its on-state is literally named "Yes".
                    btn_updates[form_key] = "Yes" if bool(val) else "Off"
                elif spec.get("type") == "enum":
                    v = str(val)
                    # Accept the export value only if the template offers it
                    # (when the template's values are known at all).
                    known = export_values_by_name.get(form_key)
                    if not known or v in known:
                        btn_updates[form_key] = v
                    else:
                        btn_updates[form_key] = "Off"
                else:
                    btn_updates[form_key] = "Off"
            else:
                # Text/Choice
                if spec.get("type") is float and ("amountEur" in tkey or "requestedAmountEur" in tkey):
                    text_updates[form_key] = _fmt_de_amount(val)
                else:
                    text_updates[form_key] = str(val)
            continue

        # Wildcard (costs)
        prefix, var, start, end, suffix = w
        tkey_tpl: str = spec["target-key"]
        for a in range(start, end + 1):
            pdf_key = f"{prefix}{a}{suffix}"  # PDF field numbering as given by the range
            idx0 = a - 1                      # payload lists are 0-based
            tkey_concrete = (
                tkey_tpl.replace(f"[{{{var}}}]", f"[{idx0}]")
                .replace(f"{{{var}}}", str(idx0))
            )
            if tkey_concrete not in flat:
                continue
            val = flat[tkey_concrete]
            if val is None:
                continue
            # Cost fields are text fields
            if spec.get("type") is float and (
                "amountEur" in tkey_concrete or "requestedAmountEur" in tkey_concrete
            ):
                text_updates[pdf_key] = _fmt_de_amount(val)
            else:
                text_updates[pdf_key] = str(val)

    # 2) Iterate all pages/widgets and set the values directly
    for page in writer.pages:
        annots = page.get("/Annots")
        if isinstance(annots, IndirectObject):
            try:
                annots = annots.get_object()
            except Exception:
                annots = None
        if not annots or not isinstance(annots, (list, ArrayObject)):
            continue

        for annot_ref in annots:
            try:
                annot = annot_ref.get_object() if isinstance(annot_ref, IndirectObject) else annot_ref
                if not isinstance(annot, DictionaryObject):
                    continue
                name = _to_str(annot.get("/T"))
                if not name:
                    continue
                ft = _to_str(annot.get("/FT"))

                # Text/Choice
                if name in text_updates and ft in (None, "Tx", "Ch"):
                    _set_field_value(annot, createStringObject(text_updates[name]))
                    continue

                # Button (/Btn)
                if name in btn_updates and ft == "Btn":
                    desired = btn_updates[name]  # 'Off' or export name
                    # Switch this widget on only if its own on-state matches.
                    widget_on = _widget_on_state(annot)
                    if desired != "Off" and widget_on and desired == widget_on:
                        state = _to_name(desired)
                    else:
                        state = NameObject("/Off")
                    annot.update({NameObject("/AS"): state})
                    _set_field_value(annot, state)
            except Exception:
                # Best effort: a broken widget must not abort the whole fill.
                continue

    # 3) Write
    bio = io.BytesIO()
    writer.write(bio)
    data = bio.getvalue()
    if out_path:
        with open(out_path, "wb") as out:
            out.write(data)
    return data
def save_pdf(payload: Dict[str, Any], variant: str, out_path: str) -> None:
    """Fill the template for *variant* with *payload* and write it to *out_path*.

    Thin wrapper around ``fill_pdf``; the returned bytes are discarded.
    """
    fill_pdf(payload, variant, out_path=out_path)