486 lines
14 KiB
Python
Executable File
486 lines
14 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Extract PDF Form Data and Convert to JSON
|
|
|
|
This script extracts form data from a PDF file, maps it using the provided
|
|
field mappings, and emits a structured JSON payload.
|
|
|
|
Requires:
|
|
- PyPDF2
|
|
- pdf_field_mapping.py containing:
|
|
TEXT_MAPPING_COMMON, TEXT_MAPPING_QSM, TEXT_MAPPING_VSM, _PLACEHOLDER_VALUES
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
from argparse import ArgumentParser
|
|
from dataclasses import dataclass, asdict, field
|
|
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Union
|
|
|
|
import PyPDF2
|
|
from pdf_field_mapping import (
|
|
TEXT_MAPPING_COMMON,
|
|
TEXT_MAPPING_QSM,
|
|
TEXT_MAPPING_VSM,
|
|
_PLACEHOLDER_VALUES,
|
|
)
|
|
|
|
# =========================
|
|
# Types / Data Model
|
|
# =========================
|
|
|
|
@dataclass
class Name:
    """A person's name, split into first and last parts."""

    # Both parts default to None, i.e. "not provided".
    first: Optional[str] = None
    last: Optional[str] = None
|
|
|
|
@dataclass
class Institution:
    """Institution the applicant belongs to."""

    name: Optional[str] = None
    # Key of an enum option rather than free text.
    type: Optional[str] = None
|
|
|
|
@dataclass
class Contact:
    """Contact details: e-mail address and phone number."""

    email: Optional[str] = None
    phone: Optional[str] = None
|
|
|
|
@dataclass
class Applicant:
    """The applicant section of the payload.

    Nested parts (institution, name, contact) default to fresh, empty
    instances so every ``Applicant`` owns its own sub-objects.
    """

    # `type`, `course` and `role` hold enum keys, not display labels.
    type: Optional[str] = None
    institution: Institution = field(default_factory=Institution)
    name: Name = field(default_factory=Name)
    course: Optional[str] = None
    role: Optional[str] = None
    contact: Contact = field(default_factory=Contact)
|
|
|
|
@dataclass
class Dates:
    """Start and end of a project, kept as the strings read from the form."""

    start: Optional[str] = None
    end: Optional[str] = None
|
|
|
|
@dataclass
class ParticipationFaculties:
    """One tri-state flag per participation option.

    Each flag is ``True``/``False`` when known and ``None`` when absent.
    """

    inf: Optional[bool] = None
    esb: Optional[bool] = None
    ls: Optional[bool] = None
    tec: Optional[bool] = None
    tex: Optional[bool] = None
    nxt: Optional[bool] = None
    open: Optional[bool] = None
|
|
|
|
@dataclass
class Participation:
    """Participation section; currently only wraps the faculty flags."""

    faculties: ParticipationFaculties = field(
        default_factory=ParticipationFaculties
    )
|
|
|
|
@dataclass
class Cost:
    """A single cost row: a label and an amount in euros."""

    name: Optional[str] = None
    amountEur: Optional[float] = None
|
|
|
|
@dataclass
class Totals:
    """Aggregate amounts of a project."""

    # Total requested amount in euros; None when not provided.
    requestedAmountEur: Optional[float] = None
|
|
|
|
@dataclass
class VSMFlags:
    """Tri-state flags belonging to the VSM financing variant."""

    aufgaben: Optional[bool] = None
    individuell: Optional[bool] = None
|
|
|
|
@dataclass
class VSM:
    """VSM financing details: a code plus its flags."""

    # Key of an enum option rather than a display label.
    code: Optional[str] = None
    flags: VSMFlags = field(default_factory=VSMFlags)
|
|
|
|
@dataclass
class QSMFlags:
    """Tri-state flags belonging to the QSM financing variant."""

    stellenfinanzierungen: Optional[bool] = None
    studierende: Optional[bool] = None
    individuell: Optional[bool] = None
    exkursionGenehmigt: Optional[bool] = None
    exkursionBezuschusst: Optional[bool] = None
|
|
|
|
@dataclass
class QSM:
    """QSM financing details: a code plus its flags."""

    # Key of an enum option rather than a display label.
    code: Optional[str] = None
    flags: QSMFlags = field(default_factory=QSMFlags)
|
|
|
|
@dataclass
class Financing:
    """Financing section; each variant is ``None`` unless it was supplied."""

    vsm: Optional[VSM] = None
    qsm: Optional[QSM] = None
|
|
|
|
@dataclass
class Project:
    """The project section of the payload.

    Scalar fields default to ``None``; nested sections and the cost list
    default to fresh, empty instances per ``Project``.
    """

    name: Optional[str] = None
    dates: Dates = field(default_factory=Dates)
    participants: Optional[int] = None
    description: Optional[str] = None
    participation: Participation = field(default_factory=Participation)
    costs: List[Cost] = field(default_factory=list)
    totals: Totals = field(default_factory=Totals)
    financing: Financing = field(default_factory=Financing)
|
|
|
|
@dataclass
class Attachments:
    """Tri-state flags describing which attachments were indicated."""

    comparativeOffers: Optional[bool] = None
    # Only present in the QSM variant of the form.
    fakultaet: Optional[bool] = None
|
|
|
|
@dataclass
class WarningInfo:
    """Warning section of the payload."""

    # Optional "not supported" message; None when there is none.
    notSupported: Optional[str] = None
|
|
|
|
@dataclass
class RootPayload:
    """Top-level container for the structured result."""

    # Will hold applicant + project + attachments.
    pa: Any = field(default_factory=dict)
    warning: WarningInfo = field(default_factory=WarningInfo)
    # Validation results carried over from the mapping step.
    _validation: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
# =========================
|
|
# Mapping helpers
|
|
# =========================
|
|
|
|
def _to_bool(v: Any) -> bool:
    """Convert a raw field value to a boolean.

    Strings are true unless they are empty or exactly ``"/Off"``; every
    other value is converted with ``bool()``.
    """
    if not isinstance(v, str):
        return bool(v)
    return not (v == "/Off" or v == "")
|
|
|
|
def _to_int(v: Any) -> Optional[int]:
    """Convert a raw field value to an int, or ``None`` if that fails.

    Placeholder values yield ``None``. Otherwise the stripped text is parsed
    as an int; if that fails, commas are turned into dots, spaces are
    removed and the result is parsed as a float and truncated to an int.
    """
    if v in _PLACEHOLDER_VALUES:
        return None

    # First attempt: the text is already a plain integer.
    try:
        return int(str(v).strip())
    except Exception:
        pass

    # Second attempt: decimal notation, possibly with a comma separator.
    try:
        normalized = str(v).strip().replace(",", ".").replace(" ", "")
        return int(float(normalized))
    except Exception:
        return None
|
|
|
|
def _to_float_de(v: Any) -> Optional[float]:
    """Parse a German-formatted number into a float, or ``None`` on failure.

    Dots and spaces are dropped and the comma becomes the decimal point, so
    ``"1.234,56"`` parses as ``1234.56``. Placeholder values yield ``None``.
    """
    if v in _PLACEHOLDER_VALUES:
        return None
    try:
        # Single pass: delete "." and " ", map "," to ".".
        table = str.maketrans({".": None, " ": None, ",": "."})
        return float(str(v).translate(table))
    except Exception:
        return None
|
|
|
|
def _to_str(v: Any) -> Optional[str]:
    """Return ``str(v)``, or ``None`` for ``None`` and placeholder values.

    The placeholder check is done on the stripped text, but the returned
    string keeps its original whitespace.
    """
    if v is None:
        return None
    text = str(v)
    return None if text.strip() in _PLACEHOLDER_VALUES else text
|
|
|
|
def _from_enum(v: Any, pairs: Iterable[Tuple[str, str]]) -> Optional[str]:
    """Resolve a raw value to the key of an enum option.

    Args:
        v: Raw field value; compared as ``str(v)``.
        pairs: ``(key, label)`` options. Any iterable is accepted, including
            one-shot iterators such as generators.

    Returns:
        The matching key, or ``None`` when *v* is a placeholder value or
        matches no option. A match on a key takes precedence over a match
        on a label, regardless of the order of the options.
    """
    if v in _PLACEHOLDER_VALUES:
        return None
    text = str(v)

    # Materialise once: the options are scanned twice below, and a one-shot
    # iterator would otherwise be exhausted after the first scan.
    options = tuple(pairs)

    for key, _ in options:
        if text == key:
            return key
    for key, label in options:
        if text == label:
            return key
    return None
|
|
|
|
def _coerce(v: Any, spec: Mapping[str, Any]) -> Any:
    """Convert a raw value according to ``spec["type"]``.

    ``bool``, ``int``, ``float`` and ``str`` are routed to the matching
    ``_to_*`` helper and ``"enum"`` to ``_from_enum`` with ``spec["values"]``.
    For any other type the value is returned unchanged, except that
    placeholder values become ``None``.
    """
    kind = spec.get("type")

    converters = (
        (bool, _to_bool),
        (int, _to_int),
        (float, _to_float_de),
        (str, _to_str),
    )
    for py_type, convert in converters:
        if kind is py_type:
            return convert(v)

    if kind == "enum":
        return _from_enum(v, spec.get("values", []))

    if v in _PLACEHOLDER_VALUES:
        return None
    return v
|
|
|
|
# Matches a path segment of the form ``name[3]``: group 1 is the key,
# group 2 the list index.
_key_index_re = re.compile(r"([^\[\]]+)\[(\d+)\]")


def _set_nested(root: Dict[str, Any], dotted: str, value: Any) -> None:
    """Store *value* in *root* at the location named by a dotted path.

    Segments are separated by ``.``; a segment such as ``costs[2]`` addresses
    element 2 of the list stored under ``costs``. Missing dicts and lists are
    created on the way, lists are padded with empty dicts up to the requested
    index, and an intermediate value of the wrong type is replaced.
    """
    segments = dotted.split(".")
    final_pos = len(segments) - 1
    node: Any = root

    for pos, segment in enumerate(segments):
        at_leaf = pos == final_pos
        indexed = _key_index_re.fullmatch(segment)

        if indexed is None:
            # Plain key: assign at the leaf, otherwise descend into a dict.
            if at_leaf:
                node[segment] = value
                continue
            child = node.get(segment)
            if not isinstance(child, dict):
                child = node[segment] = {}
            node = child
            continue

        # Indexed key: make sure a list exists and is long enough.
        key, index = indexed.group(1), int(indexed.group(2))
        items = node.get(key)
        if not isinstance(items, list):
            items = node[key] = []
        items.extend({} for _ in range(index + 1 - len(items)))

        if at_leaf:
            items[index] = value
            continue
        if not isinstance(items[index], dict):
            items[index] = {}
        node = items[index]
|
|
|
|
def _merge_mapping(variant: str) -> Dict[str, Dict[str, Any]]:
    """Return the field mapping that applies to *variant*.

    The common mapping is always included. ``"QSM"`` and ``"VSM"`` (matched
    case-insensitively, ignoring surrounding whitespace) add their own
    entries, which override common ones on key clashes. Any other value
    yields a copy of the common mapping only.
    """
    normalized = (variant or "").strip().upper()
    merged = dict(TEXT_MAPPING_COMMON)
    if normalized == "QSM":
        merged.update(TEXT_MAPPING_QSM)
    elif normalized == "VSM":
        merged.update(TEXT_MAPPING_VSM)
    return merged
|
|
|
|
# Field names of the cost table rows; group 1 is the row number as written
# in the field name.
_cost_name_pat = re.compile(r"^pa-cost-(\d+)-name$")
_cost_amt_pat = re.compile(r"^pa-cost-(\d+)-amount-euro$")
|
|
|
|
def detect_variant(form_fields: Mapping[str, Any]) -> str:
    """Guess the form variant from the names of the raw PDF fields.

    Returns ``"QSM"`` if the QSM financing field is present, otherwise
    ``"VSM"`` if the VSM financing field is present, otherwise ``"COMMON"``.
    """
    # Order matters: QSM wins when both marker fields exist.
    markers = (
        ("pa-qsm-financing", "QSM"),
        ("pa-vsm-financing", "VSM"),
    )
    for field_name, variant in markers:
        if field_name in form_fields:
            return variant
    return "COMMON"
|
|
|
|
|
|
def _has_target(payload: Mapping[str, Any], target_key: str) -> bool:
    """Tell whether the dotted *target_key* resolves to a value in *payload*."""
    cursor: Any = payload
    for part in target_key.split("."):
        # A scalar in the middle of the path means the target is absent;
        # guarding here avoids a TypeError from `part in <scalar>`.
        if not isinstance(cursor, Mapping):
            return False
        indexed = _key_index_re.fullmatch(part)
        if indexed:
            key, idx = indexed.group(1), int(indexed.group(2))
            items = cursor.get(key)
            if not isinstance(items, list) or len(items) <= idx:
                return False
            cursor = items[idx]
        elif part in cursor:
            cursor = cursor[part]
        else:
            return False
    return True


def map_form_to_payload(form_json: Dict[str, Dict[str, Any]], variant: str) -> Dict[str, Any]:
    """Map raw PDF form fields to the nested payload dict.

    Args:
        form_json: Field name -> field metadata; the value is read from the
            ``"/V"`` entry of each metadata dict.
        variant: Form variant, passed to ``_merge_mapping`` to select the
            field mapping.

    Returns:
        The nested payload. Fields without a mapping entry and fields whose
        coerced value is empty are skipped (booleans are always kept). Cost
        rows are collected under ``pa.project.costs`` in row order, with
        empty rows left out. If required fields are missing, their
        ``(field name, target key)`` pairs are listed under
        ``_validation.missingRequired``.
    """
    mapping = _merge_mapping(variant)
    out: Dict[str, Any] = {}

    # Cost rows arrive as separate name/amount fields; gather them per row.
    costs_tmp: Dict[int, Dict[str, Any]] = {}

    for field_name, meta in form_json.items():
        raw_val = meta.get("/V") if meta else None

        m_name = _cost_name_pat.match(field_name)
        if m_name:
            # Row numbers in field names are 1-based.
            row_idx = int(m_name.group(1)) - 1
            costs_tmp.setdefault(row_idx, {})["name"] = _to_str(raw_val)
            continue

        m_amt = _cost_amt_pat.match(field_name)
        if m_amt:
            row_idx = int(m_amt.group(1)) - 1
            costs_tmp.setdefault(row_idx, {})["amountEur"] = _to_float_de(raw_val)
            continue

        spec = mapping.get(field_name)
        if not spec:
            continue
        coerced = _coerce(raw_val, spec)
        # False is a meaningful value for a checkbox, so booleans always pass.
        if not isinstance(coerced, bool) and coerced in (None, "", []):
            continue
        _set_nested(out, spec["target-key"], coerced)

    # Write the non-empty rows to consecutive list slots. Writing them at
    # their original row index would make `_set_nested` pad the gaps left by
    # skipped rows with empty dicts, so empty rows would not really be skipped.
    next_slot = 0
    for row_idx in sorted(costs_tmp):
        row = costs_tmp[row_idx]
        if row.get("name") is None and row.get("amountEur") is None:
            continue
        _set_nested(out, f"pa.project.costs[{next_slot}]", row)
        next_slot += 1

    # Required check: report every required mapping whose target is absent.
    missing = [
        (fname, spec["target-key"])
        for fname, spec in mapping.items()
        if spec.get("required") and not _has_target(out, spec["target-key"])
    ]
    if missing:
        out.setdefault("_validation", {})["missingRequired"] = missing

    return out
|
|
|
|
|
|
# =========================
|
|
# Builders to dataclasses
|
|
# =========================
|
|
|
|
def _get(d: Mapping[str, Any], path: str, default=None):
    """Look up a dotted *path* in nested mappings.

    Returns the value found at the end of the path, or *default* as soon as
    a segment is missing or an intermediate value is not a mapping.
    """
    node = d
    for key in path.split("."):
        if isinstance(node, Mapping) and key in node:
            node = node[key]
        else:
            return default
    return node
|
|
|
|
def payload_to_model(payload: Dict[str, Any]) -> RootPayload:
    """Build a ``RootPayload`` from the nested payload dict.

    The applicant, project and attachments sections are read from
    ``payload["pa"]``, passed through their dataclasses so every field is
    present (missing values become ``None``), and stored back as plain dicts
    in ``RootPayload.pa``. ``warning`` and ``_validation`` are taken from the
    top level of *payload*.
    """
    applicant_src = _get(payload, "pa.applicant", {}) or {}
    project_src = _get(payload, "pa.project", {}) or {}
    attachments_src = _get(payload, "pa.attachments", {}) or {}
    warning_src = payload.get("warning", {}) or {}

    # --- Applicant ---
    institution = Institution(
        name=_get(applicant_src, "institution.name"),
        type=_get(applicant_src, "institution.type"),
    )
    person = Name(
        first=_get(applicant_src, "name.first"),
        last=_get(applicant_src, "name.last"),
    )
    contact = Contact(
        email=_get(applicant_src, "contact.email"),
        phone=_get(applicant_src, "contact.phone"),
    )
    applicant = Applicant(
        type=applicant_src.get("type"),
        institution=institution,
        name=person,
        course=applicant_src.get("course"),
        role=applicant_src.get("role"),
        contact=contact,
    )

    # --- Costs: entries that are not mappings are ignored ---
    cost_rows = [
        Cost(name=row.get("name"), amountEur=row.get("amountEur"))
        for row in (project_src.get("costs", []) or [])
        if isinstance(row, Mapping)
    ]

    # --- Financing: a variant stays None unless it has any data ---
    vsm_src = _get(project_src, "financing.vsm", {}) or {}
    vsm = None
    if vsm_src:
        vsm_flags = VSMFlags(
            aufgaben=_get(vsm_src, "flags.aufgaben"),
            individuell=_get(vsm_src, "flags.individuell"),
        )
        vsm = VSM(code=vsm_src.get("code"), flags=vsm_flags)

    qsm_src = _get(project_src, "financing.qsm", {}) or {}
    qsm = None
    if qsm_src:
        qsm_flags = QSMFlags(
            stellenfinanzierungen=_get(qsm_src, "flags.stellenfinanzierungen"),
            studierende=_get(qsm_src, "flags.studierende"),
            individuell=_get(qsm_src, "flags.individuell"),
            exkursionGenehmigt=_get(qsm_src, "flags.exkursionGenehmigt"),
            exkursionBezuschusst=_get(qsm_src, "flags.exkursionBezuschusst"),
        )
        qsm = QSM(code=qsm_src.get("code"), flags=qsm_flags)

    # --- Project ---
    faculty_names = ("inf", "esb", "ls", "tec", "tex", "nxt", "open")
    faculties = ParticipationFaculties(
        **{
            faculty: _get(project_src, f"participation.faculties.{faculty}")
            for faculty in faculty_names
        }
    )
    project = Project(
        name=project_src.get("name"),
        dates=Dates(
            start=_get(project_src, "dates.start"),
            end=_get(project_src, "dates.end"),
        ),
        participants=project_src.get("participants"),
        description=project_src.get("description"),
        participation=Participation(faculties=faculties),
        costs=cost_rows,
        totals=Totals(
            requestedAmountEur=_get(project_src, "totals.requestedAmountEur")
        ),
        financing=Financing(vsm=vsm, qsm=qsm),
    )

    # --- Attachments and warning ---
    attachments = Attachments(
        comparativeOffers=attachments_src.get("comparativeOffers"),
        fakultaet=attachments_src.get("fakultaet"),
    )
    warning = WarningInfo(notSupported=warning_src.get("notSupported"))

    return RootPayload(
        pa={
            "applicant": asdict(applicant),
            "project": asdict(project),
            "attachments": asdict(attachments),
        },
        warning=warning,
        _validation=payload.get("_validation", {}),
    )
|
|
|
|
|
|
# =========================
|
|
# PDF reading + end-to-end
|
|
# =========================
|
|
|
|
def read_pdf_fields(pdf_file: str) -> Dict[str, Dict[str, Any]]:
    """Read the form fields of a PDF file.

    Returns a dict of field name -> field metadata as reported by
    ``PdfReader.get_fields()``. All fields are kept; a falsy metadata entry
    is replaced by an empty dict, and a PDF without form fields yields an
    empty dict.
    """
    with open(pdf_file, "rb") as handle:
        raw_fields = PyPDF2.PdfReader(handle).get_fields()
        if not raw_fields:
            return {}
        result = {}
        for name, meta in raw_fields.items():
            # Guarantee a dict per field so callers can use .get() safely.
            result[name] = meta if meta else {}
        return result
|
|
|
|
def pdf_to_payload(pdf_file: str, variant: Optional[str] = None) -> RootPayload:
    """Read a PDF form and return it as a ``RootPayload`` model.

    When *variant* is ``None`` or empty, the variant is detected from the
    fields found in the PDF.
    """
    fields = read_pdf_fields(pdf_file)
    chosen = variant if variant else detect_variant(fields)
    return payload_to_model(map_form_to_payload(fields, chosen))
|
|
|
|
def pdf_to_json(pdf_file: str, variant: Optional[str] = None) -> str:
    """Return the mapped payload of a PDF form as a JSON string.

    The output is indented by two spaces and non-ASCII characters are
    written as-is instead of being escaped.
    """
    payload = asdict(pdf_to_payload(pdf_file, variant=variant))
    return json.dumps(payload, ensure_ascii=False, indent=2)
|
|
|
|
|
|
# =========================
|
|
# CLI
|
|
# =========================
|
|
|
|
def build_arg_parser() -> ArgumentParser:
    """Create the command-line parser.

    ``--variant`` is upper-cased before it is checked against the allowed
    choices, so every variant name is accepted in any letter case.
    """
    parser = ArgumentParser(description="Extract PDF Form Data and Convert to structured JSON")
    parser.add_argument("pdf_file", help="Path to the PDF file")
    parser.add_argument(
        "--variant",
        # argparse applies `type` before validating `choices`.
        type=str.upper,
        choices=["QSM", "VSM", "COMMON", "AUTO"],
        default="AUTO",
        help="Form variant, case-insensitive (default: AUTO)",
    )
    return parser


def main(argv: Optional[List[str]] = None) -> None:
    """Parse *argv* (default: ``sys.argv[1:]``) and print the JSON payload."""
    args = build_arg_parser().parse_args(argv)
    # AUTO means "detect from the PDF", which pdf_to_json expects as None.
    v = None if args.variant == "AUTO" else args.variant
    print(pdf_to_json(args.pdf_file, variant=v))


if __name__ == "__main__":
    main()
|