#!/usr/bin/env python3
"""
Extract PDF Form Data and Convert to JSON

This script extracts form data from a PDF file, maps it using the provided
field mappings, and emits a structured JSON payload.

Requires:
- PyPDF2
- pdf_field_mapping.py containing:
    TEXT_MAPPING_COMMON, TEXT_MAPPING_QSM, TEXT_MAPPING_VSM, _PLACEHOLDER_VALUES
"""

from __future__ import annotations

import json
import re
from argparse import ArgumentParser
from dataclasses import dataclass, asdict, field
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple

import PyPDF2

from pdf_field_mapping import (
    TEXT_MAPPING_COMMON,
    TEXT_MAPPING_QSM,
    TEXT_MAPPING_VSM,
    _PLACEHOLDER_VALUES,
)


# =========================
# Types / Data Model
# =========================

@dataclass
class Meta:
    id: Optional[str] = None
    key: Optional[str] = None


@dataclass
class Name:
    first: Optional[str] = None
    last: Optional[str] = None


@dataclass
class Institution:
    name: Optional[str] = None
    type: Optional[str] = None  # enum key


@dataclass
class Contact:
    email: Optional[str] = None
    phone: Optional[str] = None


@dataclass
class Applicant:
    type: Optional[str] = None  # enum key
    institution: Institution = field(default_factory=Institution)
    name: Name = field(default_factory=Name)
    course: Optional[str] = None  # enum key
    role: Optional[str] = None  # enum key
    contact: Contact = field(default_factory=Contact)


@dataclass
class Dates:
    start: Optional[str] = None
    end: Optional[str] = None


@dataclass
class ParticipationFaculties:
    inf: Optional[bool] = None
    esb: Optional[bool] = None
    ls: Optional[bool] = None
    tec: Optional[bool] = None
    tex: Optional[bool] = None
    nxt: Optional[bool] = None
    open: Optional[bool] = None


@dataclass
class Participation:
    faculties: ParticipationFaculties = field(default_factory=ParticipationFaculties)


@dataclass
class Cost:
    name: Optional[str] = None
    amountEur: Optional[float] = None


@dataclass
class VSMFlags:
    aufgaben: Optional[bool] = None
    individuell: Optional[bool] = None


@dataclass
class VSM:
    code: Optional[str] = None  # enum key
    flags: VSMFlags = field(default_factory=VSMFlags)


@dataclass
class QSMFlags:
    stellenfinanzierungen: Optional[bool] = None
    studierende: Optional[bool] = None
    individuell: Optional[bool] = None
    exkursionGenehmigt: Optional[bool] = None
    exkursionBezuschusst: Optional[bool] = None


@dataclass
class QSM:
    code: Optional[str] = None  # enum key
    flags: QSMFlags = field(default_factory=QSMFlags)


@dataclass
class Financing:
    vsm: Optional[VSM] = None
    qsm: Optional[QSM] = None


@dataclass
class Project:
    name: Optional[str] = None
    dates: Dates = field(default_factory=Dates)
    participants: Optional[int] = None
    description: Optional[str] = None
    participation: Participation = field(default_factory=Participation)
    costs: List[Cost] = field(default_factory=list)
    financing: Financing = field(default_factory=Financing)


@dataclass
class Attachments:
    comparativeOffers: Optional[bool] = None
    fakultaet: Optional[bool] = None  # only in QSM variant


@dataclass
class WarningInfo:
    notSupported: Optional[str] = None


@dataclass
class RootPayload:
    pa: Dict[str, Any] = field(default_factory=dict)  # kept for dump compatibility
    applicant: Applicant = field(default_factory=Applicant)
    project: Project = field(default_factory=Project)
    attachments: Attachments = field(default_factory=Attachments)
    meta: Meta = field(default_factory=Meta)
    warning: WarningInfo = field(default_factory=WarningInfo)
    _validation: Dict[str, Any] = field(default_factory=dict)


# =========================
# Mapping helpers
# =========================

def _is_placeholder(v: Any) -> bool:
    """Return True when *v* is one of the values in ``_PLACEHOLDER_VALUES``."""
    try:
        return v in _PLACEHOLDER_VALUES
    except TypeError:
        # Raised for unhashable values (e.g. lists) when the placeholder
        # container is hash-based; such a value cannot be a placeholder.
        return False


def _to_bool(v: Any) -> bool:
    """Convert a raw field value to bool.

    Strings are true unless they are ``"/Off"`` or empty; every other value
    uses normal Python truthiness.
    """
    if isinstance(v, str):
        return v not in ("/Off", "")
    return bool(v)


def _to_int(v: Any) -> Optional[int]:
    """Convert a raw field value to int, or None if it cannot be parsed.

    Tries a plain ``int()`` first, then falls back to parsing a float (with
    ``,`` treated as the decimal separator and spaces removed) and truncating.
    """
    if _is_placeholder(v):
        return None
    text = str(v).strip()
    try:
        return int(text)
    except ValueError:
        pass
    try:
        return int(float(text.replace(",", ".").replace(" ", "")))
    except (ValueError, OverflowError):
        # ValueError: not numeric (or NaN); OverflowError: infinity.
        return None


def _to_float_de(v: Any) -> Optional[float]:
    """Convert a raw field value to float, accepting German number notation.

    When the text contains a comma, dots are dropped as thousands separators
    and the comma becomes the decimal point (``"1.234,56"`` -> ``1234.56``).
    Without a comma the text is parsed as-is, so a dot stays a decimal point.
    Returns None for None, placeholders and unparsable text instead of raising.
    """
    if v is None or _is_placeholder(v):
        return None
    text = str(v).replace(" ", "")
    if "," in text:
        text = text.replace(".", "").replace(",", ".")
    try:
        return float(text)
    except ValueError:
        return None


def _to_str(v: Any) -> Optional[str]:
    """Convert a raw field value to str; None and placeholders become None.

    The placeholder test is done on the stripped text, but the returned
    string is not stripped.
    """
    if v is None:
        return None
    s = str(v)
    if s.strip() in _PLACEHOLDER_VALUES:
        return None
    return s


def _from_enum(v: Any, pairs: Iterable[Tuple[str, str]]) -> Optional[str]:
    """Resolve *v* to an enum key from ``(key, label)`` pairs.

    A match on any key wins over a match on a label. Returns None for
    placeholders and for values matching neither.
    """
    if _is_placeholder(v):
        return None
    s = str(v)
    # Materialise once: the pairs are scanned twice, which would silently
    # skip the label pass if a one-shot iterator were passed in.
    pair_list = list(pairs)
    for key, _ in pair_list:
        if s == key:
            return key
    for key, label in pair_list:
        if s == label:
            return key
    return None


def _coerce(v: Any, spec: Mapping[str, Any]) -> Any:
    """Coerce *v* according to ``spec["type"]``.

    Recognised types are ``bool``, ``int``, ``float``, ``str`` and the string
    ``"enum"`` (which uses ``spec["values"]``). Any other type passes the value
    through unchanged, except that placeholders become None.
    """
    t = spec.get("type")
    if t is bool:
        return _to_bool(v)
    if t is int:
        return _to_int(v)
    if t is float:
        return _to_float_de(v)
    if t is str:
        return _to_str(v)
    if t == "enum":
        return _from_enum(v, spec.get("values", []))
    return None if _is_placeholder(v) else v


# Matches one path segment of the form ``name[3]``.
_key_index_re = re.compile(r"([^\[\]]+)\[(\d+)\]")


def _set_nested(root: Dict[str, Any], dotted: str, value: Any) -> None:
    """Assign *value* at the dotted path *dotted* inside *root*, in place.

    Segments are separated by ``.``; a segment like ``costs[2]`` addresses a
    list element. Missing dicts and lists are created on the way, lists are
    padded with empty dicts up to the requested index, and any existing
    intermediate value of the wrong type is replaced.
    """
    parts = dotted.split(".")
    curr: Any = root
    for i, part in enumerate(parts):
        m = _key_index_re.fullmatch(part)
        is_last = i == len(parts) - 1
        if m:
            k, idx = m.group(1), int(m.group(2))
            if k not in curr or not isinstance(curr.get(k), list):
                curr[k] = []
            lst = curr[k]
            while len(lst) <= idx:
                lst.append({})
            if is_last:
                lst[idx] = value
            else:
                if not isinstance(lst[idx], dict):
                    lst[idx] = {}
                curr = lst[idx]
        else:
            if is_last:
                curr[part] = value
            else:
                if part not in curr or not isinstance(curr[part], dict):
                    curr[part] = {}
                curr = curr[part]


def _has_path(root: Any, dotted: str) -> bool:
    """Return True when the dotted path *dotted* exists inside *root*.

    Uses the same segment syntax as ``_set_nested``. Walking into a value
    that is not a mapping (or not a long-enough list) yields False.
    """
    cursor = root
    for part in dotted.split("."):
        if not isinstance(cursor, Mapping):
            return False
        m = _key_index_re.fullmatch(part)
        if m:
            key, idx = m.group(1), int(m.group(2))
            items = cursor.get(key)
            if not isinstance(items, list) or len(items) <= idx:
                return False
            cursor = items[idx]
        else:
            if part not in cursor:
                return False
            cursor = cursor[part]
    return True


def _detect_best_mapping(form_fields: Mapping[str, Any]) -> str:
    """Return "QSM", "VSM" or "COMMON" for the given field names.

    Delegates to ``detect_variant`` so both entry points share one rule.
    """
    return detect_variant(form_fields)


def _merge_mapping(variant: str, form_fields: Mapping[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Build the field mapping for *variant*.

    "QSM" and "VSM" layer their specific mapping over the common one; "AUTO"
    detects the variant from *form_fields* first. Anything else (including an
    empty or None variant) yields a copy of the common mapping.
    """
    v = (variant or "").strip().upper()
    if v == "AUTO":
        # Find best mapping based on form fields
        return _merge_mapping(_detect_best_mapping(form_fields), form_fields)
    if v == "QSM":
        return {**TEXT_MAPPING_COMMON, **TEXT_MAPPING_QSM}
    if v == "VSM":
        return {**TEXT_MAPPING_COMMON, **TEXT_MAPPING_VSM}
    return dict(TEXT_MAPPING_COMMON)


_cost_name_pat = re.compile(r"^pa-cost-(\d+)-name$")
_cost_amt_pat = re.compile(r"^pa-cost-(\d+)-amount-euro$")


def detect_variant(form_fields: Mapping[str, Any]) -> str:
    """Best-effort variant detection from raw PDF fields.

    The presence of ``pa-qsm-financing`` selects "QSM" (checked first), the
    presence of ``pa-vsm-financing`` selects "VSM", otherwise "COMMON".
    """
    if "pa-qsm-financing" in form_fields:
        return "QSM"
    if "pa-vsm-financing" in form_fields:
        return "VSM"
    return "COMMON"


def map_form_to_payload(form_json: Dict[str, Dict[str, Any]], variant: str) -> Dict[str, Any]:
    """
    Map PDF-like form JSON (fieldName -> dict with '/V', etc.) to nested payload
    using TEXT_MAPPING_* dicts. Unknown/empty fields are skipped.

    Cost fields (``pa-cost-<n>-name`` / ``pa-cost-<n>-amount-euro``) are
    collected per row and written to ``pa.project.costs``. Mapped fields flagged
    ``required`` whose target is absent are reported under
    ``_validation.missingRequired`` as ``(field name, target key)`` pairs.
    """
    mapping = _merge_mapping(variant, form_json.keys())
    out: Dict[str, Any] = {}

    # Pre-collect costs, keyed by zero-based row index.
    costs_tmp: Dict[int, Dict[str, Any]] = {}

    # First pass
    for field_name, meta in form_json.items():
        raw_val = meta.get("/V")

        # Costs pattern: the row number in the field name is 1-based.
        m_name = _cost_name_pat.match(field_name)
        if m_name:
            row_no = int(m_name.group(1))
            # Row 0 has no zero-based slot (it would become index -1), so it
            # is ignored rather than written under a malformed key.
            if row_no >= 1:
                costs_tmp.setdefault(row_no - 1, {})["name"] = _to_str(raw_val)
            continue
        m_amt = _cost_amt_pat.match(field_name)
        if m_amt:
            row_no = int(m_amt.group(1))
            if row_no >= 1:
                costs_tmp.setdefault(row_no - 1, {})["amountEur"] = _to_float_de(raw_val)
            continue

        spec = mapping.get(field_name)
        if not spec:
            continue

        coerced = _coerce(raw_val, spec)
        # Booleans are always kept (False is meaningful); other values are
        # dropped when they are None, an empty string or an empty list.
        include = isinstance(coerced, bool) or coerced not in (None, "", [])
        if not include:
            continue

        _set_nested(out, spec["target-key"], coerced)

    # Costs into payload (skip empty rows). Rows keep their form position, so
    # a skipped row before a filled one is left as an empty dict in the list.
    for idx, row in sorted(costs_tmp.items()):
        if row.get("name") is None and row.get("amountEur") is None:
            continue
        _set_nested(out, f"pa.project.costs[{idx}]", row)

    # Required check
    missing = [
        (fname, spec["target-key"])
        for fname, spec in mapping.items()
        if spec.get("required") and not _has_path(out, spec["target-key"])
    ]
    if missing:
        out.setdefault("_validation", {})["missingRequired"] = missing

    return out


# =========================
# Builders to dataclasses
# =========================

def _get(d: Mapping[str, Any], path: str, default=None):
    """Return the value at the dotted *path* in *d*, or *default* if absent.

    Only plain keys are supported (no ``name[i]`` segments); hitting a
    non-mapping before the path is exhausted also yields *default*.
    """
    curr = d
    for part in path.split("."):
        if not isinstance(curr, Mapping) or part not in curr:
            return default
        curr = curr[part]
    return curr


def payload_to_model(payload: Dict[str, Any]) -> RootPayload:
    """Build a ``RootPayload`` dataclass tree from a mapped payload dict.

    Reads everything below ``pa.*`` plus the top-level ``warning`` and
    ``_validation`` keys. Missing values stay None, except the applicant type,
    which defaults to ``"person"``. The ``vsm`` / ``qsm`` financing entries are
    only created when the payload has a non-empty dict for them.
    """
    # Meta
    meta_dict = _get(payload, "pa.meta", {}) or {}
    meta = Meta(
        id=meta_dict.get("id"),
        key=meta_dict.get("key"),
    )

    # Build Applicant
    applicant_dict = _get(payload, "pa.applicant", {}) or {}
    applicant = Applicant(
        type=applicant_dict.get("type") or "person",
        institution=Institution(
            name=_get(applicant_dict, "institution.name"),
            type=_get(applicant_dict, "institution.type"),
        ),
        name=Name(
            first=_get(applicant_dict, "name.first"),
            last=_get(applicant_dict, "name.last"),
        ),
        course=applicant_dict.get("course"),
        role=applicant_dict.get("role"),
        contact=Contact(
            email=_get(applicant_dict, "contact.email"),
            phone=_get(applicant_dict, "contact.phone"),
        ),
    )

    # Project
    project_dict = _get(payload, "pa.project", {}) or {}
    costs = []
    for c in project_dict.get("costs", []) or []:
        if not isinstance(c, Mapping):
            continue
        costs.append(Cost(name=c.get("name"), amountEur=c.get("amountEur")))

    # Financing
    vsm_dict = _get(project_dict, "financing.vsm", {}) or {}
    vsm = None
    if vsm_dict:
        vsm = VSM(
            code=vsm_dict.get("code"),
            flags=VSMFlags(
                aufgaben=_get(vsm_dict, "flags.aufgaben"),
                individuell=_get(vsm_dict, "flags.individuell"),
            ),
        )

    qsm_dict = _get(project_dict, "financing.qsm", {}) or {}
    qsm = None
    if qsm_dict:
        qsm = QSM(
            code=qsm_dict.get("code"),
            flags=QSMFlags(
                stellenfinanzierungen=_get(qsm_dict, "flags.stellenfinanzierungen"),
                studierende=_get(qsm_dict, "flags.studierende"),
                individuell=_get(qsm_dict, "flags.individuell"),
                exkursionGenehmigt=_get(qsm_dict, "flags.exkursionGenehmigt"),
                exkursionBezuschusst=_get(qsm_dict, "flags.exkursionBezuschusst"),
            ),
        )

    project = Project(
        name=project_dict.get("name"),
        dates=Dates(
            start=_get(project_dict, "dates.start"),
            end=_get(project_dict, "dates.end"),
        ),
        participants=project_dict.get("participants"),
        description=project_dict.get("description"),
        participation=Participation(
            faculties=ParticipationFaculties(
                inf=_get(project_dict, "participation.faculties.inf"),
                esb=_get(project_dict, "participation.faculties.esb"),
                ls=_get(project_dict, "participation.faculties.ls"),
                tec=_get(project_dict, "participation.faculties.tec"),
                tex=_get(project_dict, "participation.faculties.tex"),
                nxt=_get(project_dict, "participation.faculties.nxt"),
                open=_get(project_dict, "participation.faculties.open"),
            )
        ),
        costs=costs,
        financing=Financing(vsm=vsm, qsm=qsm),
    )

    # Attachments
    attachments_dict = _get(payload, "pa.attachments", {}) or {}
    attachments = Attachments(
        comparativeOffers=attachments_dict.get("comparativeOffers"),
        fakultaet=attachments_dict.get("fakultaet"),
    )

    # Warning
    warning_dict = payload.get("warning", {}) or {}
    warning = WarningInfo(notSupported=warning_dict.get("notSupported"))

    root = RootPayload(
        applicant=applicant,
        project=project,
        attachments=attachments,
        meta=meta,
        warning=warning,
        _validation=payload.get("_validation", {}),
    )

    # Also assemble `pa` for JSON compatibility
    root.pa = {
        "meta": asdict(meta),
        "applicant": asdict(applicant),
        "project": asdict(project),
        "attachments": asdict(attachments),
    }
    return root


# =========================
# PDF reading + end-to-end
# =========================

def read_pdf_fields(pdf_file: str) -> Dict[str, Dict[str, Any]]:
    """Read the form fields of *pdf_file* as ``field name -> field dict``.

    The PDF is parsed in PyPDF2 strict mode. A PDF without form fields yields
    an empty dict, and falsy field entries are replaced by empty dicts.
    """
    with open(pdf_file, "rb") as f:
        reader = PyPDF2.PdfReader(f, strict=True)
        fields = reader.get_fields() or {}
        # ensure dict[str, dict]: every field name is kept, falsy entries
        # are normalised to {}
        return {k: (v or {}) for k, v in fields.items()}


def pdf_to_payload(pdf_file: str, variant: Optional[str] = None) -> RootPayload:
    """
    Extract, map, and convert to dataclass model.

    When *variant* is None or empty, it is detected from the PDF's fields.
    """
    form_fields = read_pdf_fields(pdf_file)
    v = variant or detect_variant(form_fields)
    mapped = map_form_to_payload(form_fields, v)
    return payload_to_model(mapped)


def pdf_to_json(pdf_file: str, variant: Optional[str] = None) -> str:
    """
    Convenience: return the structured JSON string of the mapped payload.
    """
    model = pdf_to_payload(pdf_file, variant=variant)
    return json.dumps(asdict(model), ensure_ascii=False, indent=2)


# =========================
# CLI
# =========================

def main() -> None:
    """Parse the command line and print the JSON for the given PDF."""
    parser = ArgumentParser(description="Extract PDF Form Data and Convert to structured JSON")
    parser.add_argument("pdf_file", help="Path to the PDF file")
    parser.add_argument(
        "--variant",
        # Upper-case before the choices check so any casing is accepted.
        type=str.upper,
        choices=["QSM", "VSM", "COMMON", "AUTO"],
        default="AUTO",
        help="Form variant (default: AUTO)",
    )
    args = parser.parse_args()

    v = None if args.variant == "AUTO" else args.variant
    print(pdf_to_json(args.pdf_file, variant=v))


if __name__ == "__main__":
    main()