448 lines
18 KiB
Python
448 lines
18 KiB
Python
# sn_basis/modules/Datenschreiber.py
|
|
"""
|
|
Modul Datenschreiber
|
|
|
|
Enthält die Klasse Datenschreiber mit drei Hauptmethoden:
|
|
|
|
- schreibe_Daten: schreibt die abgerufenen Daten in die Ziel-GPKG/Dateien,
|
|
fragt bei vorhandenen Layern nach Überschreiben/Anhängen/Abbrechen und
|
|
legt Stile in der Datenbank ab.
|
|
- lade_Layer: lädt die erzeugten/aktualisierten Layer ins Projekt und
|
|
wendet die Vorgabestile an; sortiert abschließend die Layer.
|
|
- schreibe_log: schreibt die verarbeiteten Pruefergebnisse strukturiert in
|
|
eine Log-Datei im angegebenen Speicherort.
|
|
|
|
Die Implementierung verwendet die Wrapper-APIs:
|
|
- qgiscore_wrapper als qgiscore
|
|
- qgisui_wrapper als qgisui (nur wenn nötig)
|
|
- qt_wrapper als qt
|
|
|
|
Wichtig
|
|
------
|
|
Alle Nutzerinteraktionen (z. B. Überschreiben / Anhängen / Abbrechen) werden
|
|
zentral über den Pruefmanager gebündelt. Die Methode `ask_overwrite_append_cancel`
|
|
des Pruefmanagers wird verwendet, damit UI-Interaktionen an einer Stelle
|
|
konsolidiert und testbar sind.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Any, Dict, List, Optional
|
|
import os
|
|
import json
|
|
import re
|
|
import datetime
|
|
import sqlite3
|
|
|
|
from sn_basis.functions import qgiscore_wrapper as qgiscore
|
|
from sn_basis.functions.os_wrapper import normalize_path, is_absolute_path
|
|
from sn_basis.functions.sys_wrapper import get_plugin_root, join_path, file_exists
|
|
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
|
|
|
|
|
|
class Datenschreiber:
|
|
"""
|
|
Schreibt abgerufene Fachdaten in die Zieldatenbank/Dateien und lädt
|
|
die Layer ins Projekt.
|
|
|
|
Konstruktor
|
|
----------
|
|
pruefmanager:
|
|
Instanz des Pruefmanagers; wird verwendet, um Pruefergebnisse zu
|
|
verarbeiten und Nutzerinteraktionen zu zentralisieren.
|
|
gpkg_path:
|
|
Pfad zur Ziel-GPKG-Datei (oder Verzeichnis). Wenn None, muss der
|
|
Aufrufer einen Speicherort übergeben.
|
|
"""
|
|
|
|
def __init__(self, pruefmanager: Any, gpkg_path: Optional[str] = None) -> None:
|
|
self.pruefmanager = pruefmanager
|
|
self.gpkg_path = str(gpkg_path) if gpkg_path else None
|
|
|
|
def _resolve_style_path(self, style_path: Optional[str]) -> Optional[str]:
|
|
if not style_path:
|
|
return None
|
|
|
|
style_path_str = str(style_path).strip()
|
|
if not style_path_str:
|
|
return None
|
|
|
|
if not is_absolute_path(style_path_str):
|
|
plugin_root = get_plugin_root()
|
|
style_path_str = str(join_path(plugin_root, "sn_plan41", "assets", style_path_str))
|
|
|
|
style_path_str = str(normalize_path(style_path_str))
|
|
return style_path_str if file_exists(style_path_str) else None
|
|
|
|
def _store_style_in_gpkg(self, layer_name: str, style_path: str, layer: Optional[Any] = None) -> None:
|
|
"""Stellt sicher, dass der Stil in der layer_styles-Tabelle der GPKG gespeichert wird."""
|
|
try:
|
|
with open(style_path, "r", encoding="utf-8") as fh:
|
|
style_qml = fh.read()
|
|
|
|
f_geometry_column = ''
|
|
if layer is not None:
|
|
try:
|
|
if hasattr(layer, 'geometryColumn'):
|
|
f_geometry_column = str(layer.geometryColumn())
|
|
elif hasattr(layer, 'dataProvider') and hasattr(layer.dataProvider(), 'geometryColumnName'):
|
|
f_geometry_column = str(layer.dataProvider().geometryColumnName())
|
|
except Exception:
|
|
f_geometry_column = ''
|
|
|
|
with sqlite3.connect(self.gpkg_path) as conn:
|
|
cur = conn.cursor()
|
|
cur.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS layer_styles (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
f_table_catalog TEXT,
|
|
f_table_schema TEXT,
|
|
f_table_name TEXT NOT NULL,
|
|
f_geometry_column TEXT,
|
|
styleName TEXT,
|
|
styleQML TEXT,
|
|
styleSLD TEXT,
|
|
useAsDefault BOOLEAN,
|
|
description TEXT,
|
|
owner TEXT,
|
|
ui TEXT,
|
|
update_time DATETIME DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""
|
|
)
|
|
|
|
# Das aktuelle QGIS-Style-Verhalten: bestehenden Style für denselben Layer nicht löschen (nur appenden)
|
|
# Wir wollen aber Default-Style setzen: alte Default-Styles entfernen.
|
|
cur.execute(
|
|
"UPDATE layer_styles SET useAsDefault = 0 WHERE f_table_name = ?",
|
|
(layer_name,),
|
|
)
|
|
|
|
# Fülle die bekannten QGIS-Kolonnen
|
|
style_name = os.path.basename(style_path)
|
|
|
|
cur.execute(
|
|
"INSERT INTO layer_styles (f_table_catalog, f_table_schema, f_table_name, f_geometry_column, styleName, styleQML, styleSLD, useAsDefault, description, owner, ui) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
|
(
|
|
'',
|
|
'',
|
|
layer_name,
|
|
f_geometry_column,
|
|
style_name,
|
|
style_qml,
|
|
None,
|
|
1,
|
|
'',
|
|
'',
|
|
'',
|
|
),
|
|
)
|
|
conn.commit()
|
|
except Exception as exc:
|
|
self.pruefmanager.verarbeite(
|
|
pruef_ergebnis(
|
|
ok=False,
|
|
meldung=f"Fehler beim Speichern des Layer-Stils in GPKG: {exc}",
|
|
aktion="style_gpkg_speichern_fehlgeschlagen",
|
|
kontext={"layer_name": layer_name, "style_path": style_path},
|
|
)
|
|
)
|
|
# ------------------------------------------------------------------ #
|
|
def schreibe_Daten(
|
|
self,
|
|
daten_dict: Dict[str, Any],
|
|
processed_results: List[Any],
|
|
speicherort: str,
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Schreibt die übergebenen Layer in die Ziel-GPKG.
|
|
|
|
Erwartung:
|
|
- daten_dict["daten"] enthält Einträge der Form:
|
|
ident -> {"layer": QgsVectorLayer}
|
|
- self.gpkg_path ist ein str
|
|
"""
|
|
|
|
if not speicherort:
|
|
raise ValueError("Ein gültiger Speicherort (speicherort) muss übergeben werden.")
|
|
|
|
# gpkg_path einmalig setzen / normalisieren
|
|
if not self.gpkg_path:
|
|
self.gpkg_path = str(speicherort)
|
|
|
|
results: List[Dict[str, Any]] = []
|
|
daten_map: Dict[str, Any] = daten_dict.get("daten", {})
|
|
|
|
for ident, entry in daten_map.items():
|
|
layer = None
|
|
style_path = None
|
|
|
|
# -----------------------------
|
|
# Layer extrahieren
|
|
# -----------------------------
|
|
if isinstance(entry, dict):
|
|
layer = entry.get("layer")
|
|
style_path = self._resolve_style_path(entry.get("style_path"))
|
|
|
|
if layer is None or not hasattr(layer, "isValid") or not layer.isValid():
|
|
pe_err = pruef_ergebnis(
|
|
ok=False,
|
|
meldung=f"Ungültiger Layer für {ident}",
|
|
aktion="save_exception",
|
|
kontext={"ident": ident},
|
|
)
|
|
self.pruefmanager.verarbeite(pe_err)
|
|
continue
|
|
|
|
# -----------------------------
|
|
# Layername bestimmen
|
|
# -----------------------------
|
|
thema = None
|
|
for pe in processed_results:
|
|
try:
|
|
kontext = getattr(pe, "kontext", None) or {}
|
|
if kontext.get("ident") == ident:
|
|
thema = kontext.get("thema")
|
|
break
|
|
except Exception:
|
|
continue
|
|
|
|
layer_name_raw = thema or str(ident)
|
|
layer_name = re.sub(r"[^A-Za-z0-9_]+", "_", layer_name_raw).strip("_")
|
|
if not layer_name:
|
|
layer_name = f"layer_{ident}"
|
|
|
|
# Layer in GPKG schreiben
|
|
err_msg = self._write_layer_to_gpkg(layer_name=layer_name, layer=layer)
|
|
if err_msg is not None:
|
|
pe_err = pruef_ergebnis(
|
|
ok=False,
|
|
meldung=f"Fehler beim Schreiben des Layers {layer_name}: {err_msg}",
|
|
aktion="save_exception",
|
|
kontext={"ident": ident, "layer_name": layer_name},
|
|
)
|
|
self.pruefmanager.verarbeite(pe_err)
|
|
continue
|
|
|
|
# Wenn der Stil vorhanden und valide ist, als Default in GPKG-Style-Tabelle ablegen
|
|
if style_path:
|
|
self._store_style_in_gpkg(layer_name, style_path, layer)
|
|
|
|
# Erfolgsfall: Info für lade_Layer sammeln
|
|
layer_path = f"{self.gpkg_path}|layername={layer_name}"
|
|
results.append({
|
|
"layer_path": layer_path,
|
|
"thema": layer_name,
|
|
"ident": ident,
|
|
"style_path": style_path,
|
|
})
|
|
|
|
return results
|
|
|
|
|
|
# -----------------------------
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Lade Layer ins Projekt
|
|
# ------------------------------------------------------------------ #
|
|
def lade_Layer(self, layer_infos: List[Dict[str, Any]]) -> None:
|
|
"""
|
|
Lädt die in schreibe_Daten erzeugten/aktualisierten Layer ins Projekt
|
|
und wendet die Vorgabestile an.
|
|
"""
|
|
loaded_layers = []
|
|
|
|
for info in layer_infos:
|
|
layer_path = info.get("layer_path")
|
|
thema = info.get("thema")
|
|
if not layer_path:
|
|
continue
|
|
|
|
try:
|
|
layer = qgiscore.QgsVectorLayer(layer_path, thema, "ogr")
|
|
if not layer or not getattr(layer, "isValid", lambda: False)():
|
|
pe_err = pruef_ergebnis(
|
|
ok=False,
|
|
meldung=f"Layer {thema} konnte nicht geladen werden",
|
|
aktion="layer_nicht_gefunden",
|
|
kontext={"layer_path": layer_path},
|
|
)
|
|
self.pruefmanager.verarbeite(pe_err)
|
|
continue
|
|
except Exception as exc:
|
|
pe_err = pruef_ergebnis(
|
|
ok=False,
|
|
meldung=f"Fehler beim Erzeugen des Layers {thema}: {exc}",
|
|
aktion="layer_nicht_gefunden",
|
|
kontext={"layer_path": layer_path, "error": str(exc)},
|
|
)
|
|
self.pruefmanager.verarbeite(pe_err)
|
|
continue
|
|
|
|
style_path = info.get("style_path")
|
|
resolved_style_path = self._resolve_style_path(style_path)
|
|
if resolved_style_path:
|
|
try:
|
|
layer.loadNamedStyle(resolved_style_path)
|
|
layer.triggerRepaint()
|
|
except Exception as exc:
|
|
pe_warn = pruef_ergebnis(
|
|
ok=True,
|
|
meldung=f"Style konnte für {thema} nicht geladen werden: {exc}",
|
|
aktion="stil_laden_fehlgeschlagen",
|
|
kontext={"thema": thema, "style_path": resolved_style_path},
|
|
)
|
|
self.pruefmanager.verarbeite(pe_warn)
|
|
else:
|
|
try:
|
|
apply_style_fn = getattr(qgiscore, "apply_default_style_from_gpkg", None)
|
|
if callable(apply_style_fn):
|
|
apply_style_fn(self.gpkg_path, layer)
|
|
except Exception:
|
|
pe_warn = pruef_ergebnis(
|
|
ok=True,
|
|
meldung=f"Style konnte für {thema} nicht automatisch angewendet werden",
|
|
aktion="stil_not_implemented",
|
|
kontext={"thema": thema},
|
|
)
|
|
self.pruefmanager.verarbeite(pe_warn)
|
|
|
|
try:
|
|
# qgisui wrapper wird hier nicht direkt für die Abfrage verwendet;
|
|
# qgisui.add_layer_to_project sollte aber vorhanden sein.
|
|
from sn_basis.functions import qgisui_wrapper as qgisui
|
|
add_fn = getattr(qgisui, "add_layer_to_project", None)
|
|
if callable(add_fn):
|
|
add_fn(layer)
|
|
else:
|
|
# Fallback: falls wrapper nicht vorhanden, versuche QGIS-API direkt
|
|
if getattr(qgiscore, "QgsProject", None) is not None and qgiscore.QGIS_AVAILABLE:
|
|
qgiscore.QgsProject.instance().addMapLayer(layer)
|
|
loaded_layers.append(layer)
|
|
except Exception:
|
|
pe_err = pruef_ergebnis(
|
|
ok=False,
|
|
meldung=f"Layer {thema} konnte nicht ins Projekt geladen werden",
|
|
aktion="layer_nicht_gefunden",
|
|
kontext={"thema": thema},
|
|
)
|
|
self.pruefmanager.verarbeite(pe_err)
|
|
continue
|
|
|
|
# Sortiere Layer im Projekt nach ID (Wrapper-Funktion bevorzugt)
|
|
sort_fn = getattr(qgiscore, "sort_layers_by_id", None)
|
|
if callable(sort_fn):
|
|
try:
|
|
sort_fn()
|
|
except Exception:
|
|
pass
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Schreibe Log
|
|
# ------------------------------------------------------------------ #
|
|
def schreibe_log(self, processed_results: List[Any], speicherort: str) -> str:
|
|
"""
|
|
Schreibt die verarbeiteten Pruefergebnisse strukturiert in eine Log-Datei.
|
|
"""
|
|
if not speicherort:
|
|
raise ValueError("Ein gültiger Speicherort muss übergeben werden.")
|
|
|
|
log_dir = speicherort
|
|
os.makedirs(log_dir, exist_ok=True)
|
|
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
log_path = os.path.join(log_dir, f"datenabruf_log_{timestamp}.json")
|
|
|
|
serializable: List[Dict[str, Any]] = []
|
|
for pe in processed_results:
|
|
try:
|
|
entry = {}
|
|
entry["ok"] = getattr(pe, "ok", None) if hasattr(pe, "ok") else None
|
|
entry["meldung"] = getattr(pe, "meldung", None) if hasattr(pe, "meldung") else None
|
|
kontext = getattr(pe, "kontext", None) if hasattr(pe, "kontext") else None
|
|
entry["kontext"] = kontext
|
|
serializable.append(entry)
|
|
except Exception:
|
|
serializable.append({"raw": str(pe)})
|
|
|
|
with open(log_path, "w", encoding="utf-8") as fh:
|
|
json.dump(serializable, fh, ensure_ascii=False, indent=2)
|
|
|
|
pe_log = pruef_ergebnis(
|
|
ok=True,
|
|
meldung=f"Log geschrieben: {os.path.basename(log_path)}",
|
|
aktion="standarddatei_vorschlagen",
|
|
kontext={"log_path": log_path},
|
|
)
|
|
self.pruefmanager.verarbeite(pe_log)
|
|
|
|
return log_path
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Hilfsfunktionen intern
|
|
# ------------------------------------------------------------------ #
|
|
def _write_layer_to_gpkg(
|
|
self,
|
|
layer_name: str,
|
|
layer: Any,
|
|
) -> Optional[str]:
|
|
"""
|
|
Schreibt einen QgsVectorLayer in die Ziel-GPKG.
|
|
|
|
Voraussetzungen:
|
|
- self.gpkg_path ist ein str
|
|
- layer ist ein gültiger QgsVectorLayer
|
|
"""
|
|
|
|
if layer is None or not hasattr(layer, "isValid") or not layer.isValid():
|
|
return "Ungültiger Layer zum Schreiben übergeben"
|
|
|
|
try:
|
|
opts = qgiscore.QgsVectorFileWriter.SaveVectorOptions()
|
|
opts.driverName = "GPKG"
|
|
opts.layerName = layer_name
|
|
opts.fileEncoding = "UTF-8"
|
|
|
|
# Style in der GPKG speichern, wenn möglich
|
|
if hasattr(opts, "symbologyExport"):
|
|
try:
|
|
# QGIS: SymbologyExport-Wert z.B. QgsVectorFileWriter.SaveVectorOptions.Symbology
|
|
saveOpts = qgiscore.QgsVectorFileWriter.SaveVectorOptions
|
|
sym_val = getattr(saveOpts, "Symbology", None)
|
|
if sym_val is None:
|
|
sym_val = getattr(saveOpts, "SymbologyExport", None)
|
|
if sym_val is not None:
|
|
opts.symbologyExport = sym_val
|
|
except Exception:
|
|
pass
|
|
|
|
# Datei existiert → Layer überschreiben
|
|
# Datei existiert nicht → neue GPKG anlegen
|
|
if not os.path.exists(self.gpkg_path):
|
|
opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteFile
|
|
else:
|
|
opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteLayer
|
|
|
|
err = qgiscore.QgsVectorFileWriter.writeAsVectorFormatV3(
|
|
layer,
|
|
self.gpkg_path,
|
|
qgiscore.QgsProject.instance().transformContext(),
|
|
opts,
|
|
)
|
|
|
|
# QGIS ≥3 liefert ein Tupel: (error_code, error_message, new_filename, new_layer_name)
|
|
if isinstance(err, tuple):
|
|
error_code = err[0]
|
|
error_msg = err[1] if len(err) > 1 else ""
|
|
else:
|
|
error_code = err
|
|
error_msg = ""
|
|
|
|
if error_code != qgiscore.QgsVectorFileWriter.NoError:
|
|
return f"Fehler beim Schreiben (Code {error_code}, msg='{error_msg}')"
|
|
|
|
return None
|
|
|
|
except Exception as exc:
|
|
return str(exc)
|