DataGrabber aktualisiert, grabberfunktionen aus dem Prototyp implementiert

This commit is contained in:
2026-02-14 22:14:33 +01:00
parent e6ffab1c10
commit f8be65f6f6
8 changed files with 1324 additions and 42 deletions

View File

@@ -2,8 +2,7 @@
sn_basis/functions/qgiscore_wrapper.py zentrale QGIS-Core-Abstraktion
"""
from typing import Type, Any
from typing import Type, Any, Optional
from sn_basis.functions.qt_wrapper import (
QUrl,
QEventLoop,
@@ -16,9 +15,11 @@ from sn_basis.functions.qt_wrapper import (
QgsProject: Type[Any]
QgsVectorLayer: Type[Any]
QgsRasterLayer: Type[Any]
QgsNetworkAccessManager: Type[Any]
Qgis: Type[Any]
QgsMapLayerProxyModel: Type[Any]
QgsVectorFileWriter: Type[Any] # neu: Schreib-API
QGIS_AVAILABLE = False
@@ -30,16 +31,20 @@ try:
from qgis.core import (
QgsProject as _QgsProject,
QgsVectorLayer as _QgsVectorLayer,
QgsRasterLayer as _QgsRasterLayer,
QgsNetworkAccessManager as _QgsNetworkAccessManager,
Qgis as _Qgis,
QgsMapLayerProxyModel as _QgsMaplLayerProxyModel
QgsMapLayerProxyModel as _QgsMaplLayerProxyModel,
QgsVectorFileWriter as _QgsVectorFileWriter,
)
QgsProject = _QgsProject
QgsVectorLayer = _QgsVectorLayer
QgsRasterLayer = _QgsRasterLayer
QgsNetworkAccessManager = _QgsNetworkAccessManager
Qgis = _Qgis
QgsMapLayerProxyModel = _QgsMaplLayerProxyModel
QgsVectorFileWriter = _QgsVectorFileWriter
QGIS_AVAILABLE = True
@@ -76,6 +81,9 @@ except Exception:
def triggerRepaint(self) -> None:
pass
def dataProvider(self):
return None
QgsVectorLayer = _MockQgsVectorLayer
class _MockQgsNetworkAccessManager:
@@ -86,6 +94,28 @@ except Exception:
def head(self, request: Any):
return None
class _MockQgsRasterLayer:
"""
Minimaler Mock für QgsRasterLayer, ausreichend für Tests und
um im Datenabruf ein Raster-Layer-Objekt im pruef_ergebnis kontext mitzugeben.
"""
def __init__(self, source: str, name: str = "Raster", provider: str = "wms"):
self.source = source
self._name = name
self.provider = provider
self._valid = True
def isValid(self) -> bool:
return self._valid
def name(self) -> str:
return self._name
def dataProvider(self):
return None
QgsRasterLayer = _MockQgsRasterLayer
QgsNetworkAccessManager = _MockQgsNetworkAccessManager
class _MockQgis:
@@ -112,6 +142,63 @@ except Exception:
QgsMapLayerProxyModel = _MockQgsMapLayerProxyModel
# ---------------------------------------------------------
# Mock für QgsVectorFileWriter
# ---------------------------------------------------------
class _MockSaveVectorOptions:
"""
Minimaler Ersatz für QgsVectorFileWriter.SaveVectorOptions.
Felder werden als einfache Attribute bereitgestellt.
"""
def __init__(self):
self.driverName: str = "GPKG"
self.layerName: Optional[str] = None
self.fileEncoding: str = "UTF-8"
# Action-Konstanten werden symbolisch verwendet
self.actionOnExistingFile: Optional[int] = None
class _MockQgsVectorFileWriter:
"""
Minimaler Mock für QgsVectorFileWriter mit der benötigten API:
- SaveVectorOptions (als Klasse)
- writeAsVectorFormatV3(layer, path, transformContext, options) -> error_code
- NoError (Konstante)
- CreateOrOverwriteFile / CreateOrOverwriteLayer (Konstanten)
"""
# Fehlerkonstanten (0 = NoError)
NoError = 0
# Action-Konstanten (Werte nur symbolisch)
CreateOrOverwriteFile = 1
CreateOrOverwriteLayer = 2
# SaveVectorOptions-Klasse
SaveVectorOptions = _MockSaveVectorOptions
@staticmethod
def writeAsVectorFormatV3(layer: Any, path: str, transform_context: Any, options: Any) -> int:
"""
Mock-Schreibfunktion.
Verhalten im Mock:
- Wenn 'layer' None oder options.layerName fehlt, geben wir NoError zurück,
aber schreiben nichts (Tests erwarten nur Rückgabecode).
- Diese Implementierung versucht nicht, echte Dateien zu schreiben.
- Rückgabewert: 0 (NoError) bei Erfolg, sonst eine positive Fehlernummer.
"""
try:
# Sehr einfache Validierung: wenn path leer -> Fehler
if not path:
return 999
# Simuliere Erfolg
return _MockQgsVectorFileWriter.NoError
except Exception:
return 999 # generischer Fehlercode
QgsVectorFileWriter = _MockQgsVectorFileWriter
# ---------------------------------------------------------
# Netzwerk
# ---------------------------------------------------------
@@ -154,3 +241,138 @@ def network_head(url: str) -> NetworkReply | None:
return NetworkReply(error=reply.error())
except Exception:
return None
# ---------------------------------------------------------
# Layer-Geometrie / Extent
# ---------------------------------------------------------
def get_layer_extent(layer: Any) -> Any:
"""
Gibt die Ausdehnung (Extent) eines Layers zurück.
Diese Funktion kapselt den Zugriff auf ``layer.extent()`` und dient als
zentrale Abstraktion für alle Stellen, die die Bounding Box eines Layers
benötigen (z.B. für räumliche Filter im Datenabruf).
Verhalten
---------
- Wenn QGIS verfügbar ist und der Layer eine ``extent()``-Methode besitzt,
wird deren Rückgabewert zurückgegeben.
- Wenn QGIS nicht verfügbar ist oder der Layer keine ``extent()``-Methode
hat, wird ``None`` zurückgegeben.
"""
if not QGIS_AVAILABLE or layer is None:
return None
extent_func = getattr(layer, "extent", None)
if callable(extent_func):
try:
return extent_func()
except Exception:
return None
return None
# ---------------------------------------------------------
# Buffer-Layer erzeugen
# ---------------------------------------------------------
def create_buffer_layer(
source_layer: Any,
distance_m: float,
layer_name: str = "BufferLayer"
) -> Optional[Any]:
"""
Erzeugt einen Pufferlayer um alle Features eines Quelllayers.
Diese Funktion dient als zentrale Abstraktion für die Erzeugung eines
Pufferlayers in QGIS. Sie wird z.B. im Datenabruf verwendet, wenn der
Raumfilter ``"Pufferlayer"`` aktiv ist.
Verhalten
---------
- Wenn QGIS verfügbar ist und der ``source_layer`` gültig ist, wird ein
temporärer Vektorlayer erzeugt, der die gepufferten Geometrien enthält.
- Der Puffer wird in Metern angegeben.
- Der zurückgegebene Layer ist **nicht gespeichert**, sondern ein
temporärer Speicherlayer, der anschließend über den UIWrapper ins
Projekt geladen werden kann.
- Wenn QGIS nicht verfügbar ist oder ein Fehler auftritt, wird ``None``
zurückgegeben.
"""
if not QGIS_AVAILABLE:
return None
if source_layer is None or not hasattr(source_layer, "getFeatures"):
return None
try:
# Geometrien puffern
buffered_geoms = []
for feat in source_layer.getFeatures():
geom = feat.geometry()
if geom is None:
continue
buf = geom.buffer(distance_m, 8)
if buf is not None:
buffered_geoms.append(buf)
if not buffered_geoms:
return None
# Neuen Memory-Layer erzeugen
crs = source_layer.crs().authid() if hasattr(source_layer, "crs") else "EPSG:4326"
mem_layer = QgsVectorLayer(f"Polygon?crs={crs}", layer_name, "memory")
prov = mem_layer.dataProvider()
prov.addAttributes([])
mem_layer.updateFields()
# Features hinzufügen
from qgis.core import QgsFeature
for geom in buffered_geoms:
f = QgsFeature()
f.setGeometry(geom)
prov.addFeature(f)
mem_layer.updateExtents()
return mem_layer
except Exception:
return None
#Hilfsfunktion, keine qgiscore-Entsprechung
def layer_exists_in_gpkg(gpkg_path: str, layer_name: str) -> bool:
"""
Prüft, ob ein Layer mit dem Namen `layer_name` in `gpkg_path` existiert.
- bevorzugt: SQLite-Abfrage auf gpkg_contents
- fallback: kurzer Versuch, mit QgsVectorLayer zu laden (wenn QGIS verfügbar)
"""
import os, sqlite3
if not gpkg_path or not layer_name or not os.path.exists(gpkg_path):
return False
# 1) SQLite-Check (schnell)
try:
conn = sqlite3.connect(gpkg_path)
cur = conn.cursor()
cur.execute("SELECT COUNT(1) FROM gpkg_contents WHERE table_name = ?", (layer_name,))
row = cur.fetchone()
conn.close()
if row and row[0] > 0:
return True
except Exception:
# falls sqlite fehlschlägt, weiter zum QGIS-Fallback
pass
# 2) QGIS-Fallback: versuche kurz, den Layer zu laden
try:
if getattr(QgsVectorLayer, "__call__", None) and QGIS_AVAILABLE:
uri = f"{gpkg_path}|layername={layer_name}"
layer = QgsVectorLayer(uri, layer_name, "ogr")
return bool(layer and getattr(layer, "isValid", lambda: False)())
except Exception:
pass
return False

View File

@@ -8,6 +8,7 @@ from typing import Any, List, Type
from sn_basis.functions.qt_wrapper import QDockWidget
from sn_basis.functions.qgiscore_wrapper import QgsProject, QGIS_AVAILABLE
iface: Any
@@ -199,3 +200,48 @@ def remove_toolbar(toolbar: Any) -> None:
iface.removeToolBar(toolbar)
except Exception:
pass
# ---------------------------------------------------------
# Layer zum Projekt hinzufügen
# ---------------------------------------------------------
def add_layer_to_project(layer: Any) -> bool:
"""
Fügt einen Layer dem aktuellen QGIS-Projekt hinzu.
Diese Funktion kapselt den Zugriff auf ``QgsProject.instance().addMapLayer``
und dient als zentrale Abstraktion für alle Stellen, die Layer dynamisch
ins Projekt einfügen möchten (z.B. Pufferlayer im Datenabruf).
Verhalten
---------
- Wenn QGIS verfügbar ist und der Layer gültig ist, wird er dem Projekt
hinzugefügt und ``True`` zurückgegeben.
- Wenn QGIS nicht verfügbar ist oder der Layer ungültig ist, wird
``False`` zurückgegeben.
- Im Mock-Modus wird kein Layer hinzugefügt, aber ``True`` zurückgegeben,
damit Tests ohne QGIS nicht fehlschlagen.
Parameters
----------
layer:
Ein QGIS-Layer (typischerweise ``QgsVectorLayer``), der dem Projekt
hinzugefügt werden soll.
Returns
-------
bool
``True`` bei Erfolg oder im Mock-Modus, sonst ``False``.
"""
if layer is None:
return False
# Mock-Modus: Erfolg simulieren
if not QGIS_AVAILABLE:
return True
try:
project = QgsProject.instance()
project.addMapLayer(layer)
return True
except Exception:
return False

View File

@@ -32,7 +32,7 @@ QTabWidget: type
QToolButton: Type[Any]
QSizePolicy: Type[Any]
Qt: Type[Any]
ComboBox: Type[Any]
QComboBox: Type[Any]
YES: Optional[Any] = None
NO: Optional[Any] = None

View File

@@ -148,68 +148,168 @@ class DataGrabber:
# ------------------------------------------------------------------ #
# Excel-Verarbeitung
#Es werden alle Werte ohne Prüfung der Links, Pfade oder Stile geladen, da verschiedene Plugins verschiedene xlsx-Strukturen haben können
#Es werden alle Werte mit gültigem Link übernommen. Die restliche Struktur
#wird nicht überprüft, da alle Fachplugins unterschiedliche Strukturen haben können
# ------------------------------------------------------------------ #
def process_excel_source(self, filepath: str) -> Tuple[Optional[Dict[str, List[Mapping[str, Any]]]], pruef_ergebnis]:
def process_excel_source(
self,
filepath: str
) -> Tuple[Optional[Dict[str, List[Mapping[str, Any]]]], Any]:
"""
Liest eine Excel-Datei (.xlsx/.xls) mit dem ExcelImporter und gibt ein Dict
mit den Zeilen zurück sowie das vom Pruefmanager verarbeitete pruef_ergebnis.
Liest eine Excel-Datei ein und übernimmt ausschließlich die Zeilen,
deren Link durch den Linkpruefer als gültig eingestuft wurde.
Rückgabe
Ablauf
------
1. Die Excel-Datei wird mit dem ``ExcelImporter`` eingelesen.
Erwartet wird eine Liste von Mappings (z.B. dicts), die jeweils
die Linkparameter enthalten.
2. Für jede Zeile wird der Wert ``row["Link"]`` extrahiert und durch
``self.link_pruefer.pruefe(...)`` geprüft.
3. Das Prüfergebnis wird durch ``self.pruefmanager.verarbeite(...)``
geleitet, der UI-Interaktion, Logging und finale Entscheidung übernimmt.
4. Nur Zeilen, deren verarbeitete Prüfergebnisse ``ok == True`` liefern,
werden in die Ergebnisliste übernommen.
5. Wenn mindestens eine Zeile gültig ist, wird ein Dict der Form::
{"rows": [row1, row2, ...]}
zurückgegeben.
Wenn keine Zeile gültig ist, wird ``None`` zurückgegeben.
Parameter
---------
filepath:
Pfad zur Excel-Datei, die eingelesen werden soll.
Returns
-------
- (data_dict, processed_pruef_ergebnis)
data_dict: {'rows': [Mapping,...]} oder None bei Fehlern
processed_pruef_ergebnis: das Ergebnis, nachdem der Pruefmanager das
interne pruef_ergebnis verarbeitet hat.
Tuple[Optional[Dict[str, List[Mapping[str, Any]]]], pruef_ergebnis]
- ``data``: ``{"rows": [...]} `` wenn gültige Zeilen existieren,
sonst ``None``.
- ``pruef_ergebnis``: ein zusammenfassendes Prüfergebnis, das
den Lesevorgang beschreibt (nicht die Einzelprüfungen).
Hinweise
--------
- Diese Methode führt **keine Normalisierung** durch.
- Die Verantwortung für die Struktur der Excel-Zeilen liegt beim Fachplugin.
- Der Linkpruefer prüft ausschließlich den Wert ``row["Link"]``.
"""
# 1) Excel einlesen
importer = ExcelImporter(filepath=filepath, pruefmanager=self.pruefmanager)
rows = importer.import_xlsx() # erwartet: List[Mapping[str, Any]]
data = {"rows": rows}
pe_ok = pruef_ergebnis(ok=True, meldung="Excel erfolgreich gelesen", aktion="ok", kontext=filepath)
processed = self.pruefmanager.verarbeite(pe_ok)
return data, processed
valid_rows: List[Mapping[str, Any]] = []
# 2) Jede Zeile einzeln prüfen
for row in rows:
raw_link = row.get("Link")
# 2a) Fachliche Prüfung
pe = self.link_pruefer.pruefe(raw_link)
# 2b) Verarbeitung durch den Pruefmanager
processed = self.pruefmanager.verarbeite(pe)
# 2c) Nur gültige Zeilen übernehmen
if getattr(processed, "ok", False):
valid_rows.append(row)
# 3) Zusammenfassendes Prüfergebnis erzeugen
if valid_rows:
pe_ok = pruef_ergebnis(
ok=True,
meldung=f"{len(valid_rows)} gültige Zeilen aus Excel gelesen",
aktion="ok",
kontext=filepath,
)
processed_summary = self.pruefmanager.verarbeite(pe_ok)
return {"rows": valid_rows}, processed_summary
# Keine gültigen Zeilen
pe_fail = pruef_ergebnis(
ok=False,
meldung="Keine gültigen Links in der Excel-Datei gefunden",
aktion="read_error",
kontext=filepath,
)
processed_summary = self.pruefmanager.verarbeite(pe_fail)
return None, processed_summary
# ------------------------------------------------------------------ #
# Einzellink-Verarbeitung
# ------------------------------------------------------------------ #
def process_single_link(self, link: str) -> Tuple[Optional[Dict[str, List[Mapping[str, Any]]]], pruef_ergebnis]:
def process_single_link(
self,
link: Mapping[str, Any]
) -> Tuple[Optional[Dict[str, List[Mapping[str, Any]]]], Any]:
"""
Verarbeitet einen Einzellink.
Prüft einen einzelnen Link anhand der im Link-Dict enthaltenen Link-URL.
Ablauf
------
1. Führt die fachliche Prüfung über self.link_pruefer.pruefe(link) aus.
2. Übergibt das Ergebnis an den Pruefmanager (self.pruefmanager.verarbeite).
3. Wenn die Prüfung nicht OK ist, wird nur das verarbeitete pruef_ergebnis zurückgegeben.
4. Wenn die Prüfung OK ist, erwartet diese Implementierung, dass der Prüfer
die Link-Parameter im pruef_ergebnis.kontext als Mapping bereitstellt.
Dieses Mapping wird unverändert in ein Dict {'rows': [kontext]} überführt
und zusammen mit dem verarbeiteten pruef_ergebnis zurückgegeben.
1. Erwartet wird ein Mapping (z.B. dict), das die Linkparameter enthält.
Mindestens der Schlüssel ``"Link"`` muss vorhanden sein.
Hinweis
------
Diese Funktion enthält keine Fallbacks, keine normalize-/load-Aufrufe und
keine zusätzlichen Validierungen. Der Linkpruefer ist verantwortlich dafür,
bei OK ein geeignetes Mapping im pruef_ergebnis.kontext bereitzustellen.
2. Der eigentliche Link (z.B. URL) wird aus ``link["Link"]`` extrahiert
und an ``self.link_pruefer.pruefe(...)`` übergeben.
3. Das Prüfergebnis wird anschließend durch ``self.pruefmanager.verarbeite(...)``
geleitet, der UIInteraktion, Logging und finale Entscheidung übernimmt.
4. Wenn das verarbeitete Prüfergebnis **nicht OK** ist, wird
``(None, pruef_ergebnis)`` zurückgegeben.
5. Wenn das Prüfergebnis **OK** ist, wird das unveränderte LinkDict
in der Struktur ``{"rows": [link]}`` zurückgegeben.
Parameter
---------
link:
Ein Mapping mit den Linkparametern (z.B. id, Thema, Gruppe, Link,
Anbieter, Stildatei). Diese Methode verändert das Mapping nicht.
Returns
-------
Tuple[Optional[Dict[str, List[Mapping[str, Any]]]], pruef_ergebnis]
- ``data``: ``{"rows": [link]}`` wenn gültig, sonst ``None``
- ``pruef_ergebnis``: das vom Pruefmanager verarbeitete Ergebnis
Hinweise
--------
- Diese Methode führt **keine Normalisierung** durch.
- Die Verantwortung für die Struktur des Link-Dicts liegt beim Fachplugin.
- Der Linkpruefer prüft ausschließlich den Wert ``link["Link"]``.
"""
# 1) Fachliche Prüfung durch den Linkpruefer
pe = self.link_pruefer.pruefe(link)
# 2) Pruefmanager verarbeiten lassen (Logging / UI / Entscheidung)
processed = self.pruefmanager.verarbeite(pe)
# 1) Link extrahieren (Fachplugin garantiert, dass "Link" existiert)
raw_link = link.get("Link")
# 3) Wenn Prüfung nicht OK -> nur das verarbeitete pruef_ergebnis zurückgeben
# 2) Fachliche Prüfung durch den Linkpruefer
pruef_ergebnis = self.link_pruefer.pruefe(raw_link)
# 3) Verarbeitung durch den Pruefmanager
processed = self.pruefmanager.verarbeite(pruef_ergebnis)
# 4) Wenn Prüfung nicht OK → keine Daten zurückgeben
if not getattr(processed, "ok", False):
return None, processed
# 4) Prüfung OK -> Prüfer liefert die Link-Parameter im pruef_ergebnis.kontext
kontext = getattr(pe, "kontext", None)
data = {"rows": [kontext]}
# Erwartung: kontext ist ein Mapping mit den Link-Parametern.
# Wir übergeben es unverändert in das rows-Format.
# 5) Prüfung OK → unverändertes Link-Dict zurückgeben
data = {"rows": [link]}
return data, processed
# ------------------------------------------------------------------ #
# Datenbank-Verarbeitung
# ------------------------------------------------------------------ #

405
modules/Datenabruf.py Normal file
View File

@@ -0,0 +1,405 @@
# sn_basis/modules/Datenabruf.py
"""
Modul ``datenabruf``
Enthält die Klasse :class:`Datenabruf`, die für eine Menge bereits
validierter Links (aus ``validate_rows``) die Fachdaten abruft und
aggregierte Prüfergebnisse liefert.
Designprinzipien
----------------
- Die BBOX wird serverseitig angewendet: wenn ein Raumfilter aktiv ist,
wird die BBOX in die Abruf-URL eingebettet (außer bei WMS).
- Alle QGIS-Interaktionen laufen über die Wrapper `qgiscore_wrapper` und
`qgisui_wrapper`.
- Fehler werden als kurze Strings zurückgegeben und zentral in `log_fehler`
gesammelt; erfolgreiche Aufrufe werden in `log_geladen` protokolliert.
- Die Methode ist pdoc-kompatibel dokumentiert und bewusst einfach gehalten.
"""
from typing import Any, Dict, List, Mapping, Optional, Tuple
from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse
import json
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
from sn_basis.functions import qgiscore_wrapper as qgiscore
from sn_basis.functions import qgisui_wrapper as qgisui
from sn_basis.functions import qt_wrapper as qt
DataDict = Dict[str, List[Mapping[str, Any]]]
class Datenabruf:
"""
Führt den eigentlichen Fachdatenabruf für eine Menge validierter Links durch.
Erwartet ein ``DataDict`` der Form ``{"rows": [row1, row2, ...]}``.
"""
def __init__(self, pruefmanager: Any) -> None:
"""
Initialisiert eine neue Instanz des Datenabrufs.
Parameters
----------
pruefmanager:
Instanz des Pruefmanagers, der :class:`pruef_ergebnis` verarbeitet.
"""
self.pruefmanager = pruefmanager
# ------------------------------------------------------------------ #
# Öffentliche API
# ------------------------------------------------------------------ #
def datenabruf(
self,
result_dict: DataDict,
raumfilter: str,
verfahrensgebiet_layer: Any,
speicherort: str,
pruef_ergebnisse: Optional[List[Any]] = None,
) -> Tuple[Dict[str, Any], List[Any]]:
"""
Ruft für alle Zeilen in ``result_dict["rows"]`` die Fachdaten ab und
liefert ein DatenDict sowie die Liste verarbeiteter Pruefergebnisse.
Logging / Aggregation
---------------------
Am Ende enthält das zusammenfassende PruefErgebnis im Kontext:
- geladen: dict(dienst -> anzahl geladen)
- fehler: dict(dienst -> fehlermeldung)
- relevant: dict(dienst -> anzahl relevant)
- ausserhalb: dict(dienst -> anzahl geladen, aber ausserhalb)
"""
if pruef_ergebnisse is None:
processed_results: List[Any] = []
else:
processed_results = list(pruef_ergebnisse)
rows = result_dict.get("rows", [])
daten: Dict[str, List[Any]] = {}
# 1) Räumliche Filtergeometrie bestimmen (BBox oder None)
bbox_geom = self._determine_spatial_filter(raumfilter, verfahrensgebiet_layer)
# Globale Logs über alle Dienste hinweg
log_geladen: Dict[str, int] = {}
log_fehler: Dict[str, str] = {}
log_relevant: Dict[str, int] = {}
log_ausserhalb: Dict[str, int] = {}
# 2) Über alle Zeilen iterieren
for row in rows:
ident = row.get("ident")
link = row.get("Link")
provider = row.get("Provider")
if not ident or not link or not provider:
pe = pruef_ergebnis(
ok=False,
meldung="Ungültige Zeile im Datenabruf (fehlende Pflichtfelder)",
aktion="pflichtfelder_fehlen",
kontext=row,
)
processed_results.append(self.pruefmanager.verarbeite(pe))
continue
# Lesbarer Dienstname für Logs
thema = row.get("Inhalt") or row.get("Thema") or row.get("Titel") or str(ident)
# 2a) Provider-spezifische URL zusammenbauen
# Wenn Raumfilter aktiv ist, übergeben wir bbox_geom an _build_provider_url,
# außer bei WMS (WMS bleibt unverändert).
use_bbox = (raumfilter != "ohne") and (str(provider).upper() != "WMS")
url = self._build_provider_url(link=link, provider=str(provider), bbox_geom=bbox_geom if use_bbox else None)
# 2b) Fachdaten abrufen
features, error_msg = self._fetch_features(url=url, provider=str(provider))
# 2c) Logs und Aggregation
if error_msg:
# Fehler beim Abruf
log_fehler[thema] = error_msg
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Abruf von {thema}: {error_msg}",
aktion="url_nicht_erreichbar",
kontext={"ident": ident, "thema": thema, "url": url, "error": error_msg},
)
processed_results.append(self.pruefmanager.verarbeite(pe_err))
# daten[ident] bleibt nicht gesetzt oder leer
daten[str(ident)] = []
continue
# Erfolgreich aufgerufen (auch wenn features == [])
anzahl_geladen = len(features)
log_geladen[thema] = anzahl_geladen
# Da die BBOX serverseitig angewendet wurde:
# - anzahl_geladen > 0 -> relevant
# - anzahl_geladen == 0 -> ausserhalb
if anzahl_geladen > 0:
log_relevant[thema] = anzahl_geladen
daten[str(ident)] = features
else:
log_ausserhalb[thema] = 0
daten[str(ident)] = []
# 2d) Kurzes Prüfergebnis pro Zeile
pe_row = pruef_ergebnis(
ok=True,
meldung=(
f"Datenabruf für ident={ident}: {anzahl_geladen} geladene Objekte"
),
aktion="datenabruf",
kontext={
"ident": ident,
"thema": thema,
"anzahl_gesamt": anzahl_geladen,
"url": url,
},
)
processed_results.append(self.pruefmanager.verarbeite(pe_row))
# 3) Zusammenfassendes Prüfergebnis (wie alter DataGrabber)
summary_kontext = {
"geladen": log_geladen,
"fehler": log_fehler,
"relevant": log_relevant,
"ausserhalb": log_ausserhalb,
}
pe_summary = pruef_ergebnis(
ok=(len(log_fehler) == 0),
meldung=(
f"Datenabruf abgeschlossen: {len(log_geladen)} Dienste geladen, "
f"{len(log_fehler)} Fehler"
),
aktion="datenabruf",
kontext=summary_kontext,
)
processed_results.append(self.pruefmanager.verarbeite(pe_summary))
daten_dict: Dict[str, Any] = {
"speicherort": speicherort,
"daten": daten,
}
return daten_dict, processed_results
# ------------------------------------------------------------------ #
# Hilfsmethoden: räumlicher Filter
# ------------------------------------------------------------------ #
def _determine_spatial_filter(self, raumfilter: str, verfahrensgebiet_layer: Any) -> Optional[Any]:
"""
Bestimmt die räumliche Filtergeometrie (BBox) abhängig vom Raumfilter.
Returns
-------
Optional[Any]
Eine Geometrie/Extent (z. B. QgsRectangle) oder ``None``.
"""
if raumfilter == "ohne":
return None
if verfahrensgebiet_layer is None:
return None
if raumfilter == "Verfahrensgebiet":
return qgiscore.get_layer_extent(verfahrensgebiet_layer)
if raumfilter == "Pufferlayer":
buffer_layer = qgiscore.create_buffer_layer(
source_layer=verfahrensgebiet_layer,
distance_m=1000.0,
layer_name="Verfahrensgebiet_Puffer_1km",
)
if buffer_layer is not None:
qgisui.add_layer_to_project(buffer_layer)
return qgiscore.get_layer_extent(buffer_layer)
return None
# ------------------------------------------------------------------ #
# Hilfsmethoden: Provider-URL und Datenabruf
# ------------------------------------------------------------------ #
def _build_provider_url(self, link: str, provider: str, bbox_geom: Optional[Any]) -> str:
"""
Baut eine Provider-spezifische Abruf-URL. Wenn `bbox_geom` übergeben
wird, wird sie in die URL eingebettet (außer bei WMS).
Erwartet: provider ist gesetzt (z. B. "WFS", "REST", "OGR", "WMS").
"""
provider_norm = (provider or "").upper()
base_link = link or ""
# WMS: niemals BBOX anhängen
if provider_norm == "WMS":
return base_link
if bbox_geom is None:
return base_link
# Versuche bbox-String zu erzeugen (nutzt qgiscore.extent_to_bbox_string wenn vorhanden)
bbox_str: Optional[str] = None
try:
extent_to_bbox = getattr(__import__("sn_basis.functions.qgiscore_wrapper", fromlist=["qgiscore_wrapper"]), "extent_to_bbox_string", None)
if callable(extent_to_bbox):
bbox_str = extent_to_bbox(bbox_geom)
else:
# Fallback: einfache xmin/ymin/xmax/ymax-Extraktion (duck-typing)
if hasattr(bbox_geom, "xmin") and callable(getattr(bbox_geom, "xmin")):
bbox_str = f"{bbox_geom.xmin()},{bbox_geom.ymin()},{bbox_geom.xmax()},{bbox_geom.ymax()}"
elif isinstance(bbox_geom, (tuple, list)) and len(bbox_geom) == 4:
bbox_str = f"{bbox_geom[0]},{bbox_geom[1]},{bbox_geom[2]},{bbox_geom[3]}"
else:
bbox_str = str(bbox_geom)
except Exception:
bbox_str = None
if not bbox_str:
return base_link
parsed = urlparse(base_link)
query_params = dict(parse_qsl(parsed.query, keep_blank_values=True))
if provider_norm == "WFS":
query_params.setdefault("BBOX", bbox_str)
new_query = urlencode(query_params, doseq=True)
rebuilt = parsed._replace(query=new_query)
return urlunparse(rebuilt)
if provider_norm in ("REST", "ARCGIS", "ARCGISFEATURESERVER", "ARCGIS_FEATURESERVER"):
query_params.setdefault("geometry", bbox_str)
query_params.setdefault("geometryType", "esriGeometryEnvelope")
query_params.setdefault("spatialRel", "esriSpatialRelIntersects")
query_params.setdefault("f", query_params.get("f", "json"))
new_query = urlencode(query_params, doseq=True)
rebuilt = parsed._replace(query=new_query)
return urlunparse(rebuilt)
# Default: generischer bbox-Parameter
query_params.setdefault("bbox", bbox_str)
new_query = urlencode(query_params, doseq=True)
rebuilt = parsed._replace(query=new_query)
return urlunparse(rebuilt)
def _fetch_features(self, url: str, provider: str) -> Tuple[List[Any], Optional[str]]:
"""
Führt den eigentlichen Abruf der Fachdaten durch.
Returns
-------
Tuple[List[Any], Optional[str]]
- features: Liste der geladenen Features (ggf. leer)
- error_msg: None bei Erfolg, sonst kurzer Fehlertext
"""
features: List[Any] = []
prov = str(provider).upper()
# WMS: kein Featureabruf; caller behandelt WMS separat (hier defensiv)
if prov == "WMS":
return [], None
# OGR / lokale Dateien: versuche QGIS-Layer (wenn QGIS verfügbar)
if prov in ("OGR", "GPKG", "SHP", "GEOJSON"):
if getattr(qgiscore, "QGIS_AVAILABLE", False):
try:
layer = qgiscore.QgsVectorLayer(url, "tmp", "ogr")
if not layer or not getattr(layer, "isValid", lambda: False)():
return [], "Layer ungültig oder konnte nicht geladen werden"
for feat in layer.getFeatures():
features.append(feat)
return features, None
except FileNotFoundError:
return [], "Lokale Datei nicht gefunden"
except Exception as exc:
return [], f"Fehler beim Laden der OGR-Quelle: {exc}"
else:
# Mock: falls GeoJSON-Datei vorhanden, versuche lokale Datei zu lesen
try:
if url.lower().endswith(".geojson"):
with open(url, "r", encoding="utf-8") as fh:
data = json.load(fh)
if isinstance(data, dict) and data.get("type") == "FeatureCollection":
return data.get("features", []), None
return [], "Keine QGIS-Umgebung und keine lesbare lokale GeoJSON"
except FileNotFoundError:
return [], "Lokale Datei nicht gefunden"
except Exception as exc:
return [], f"Fehler beim Lesen lokaler GeoJSON (Mock): {exc}"
# HTTP-basierte Dienste (WFS, REST/ArcGIS, generisch)
response_text: Optional[str] = None
http_error: Optional[str] = None
# QGIS NetworkAccessManager bevorzugen
if getattr(qgiscore, "QGIS_AVAILABLE", False) and getattr(qgiscore, "QgsNetworkAccessManager", None) is not None:
try:
manager = qgiscore.QgsNetworkAccessManager.instance()
QUrl = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QUrl", None)
QNetworkRequest = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QNetworkRequest", None)
QEventLoop = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QEventLoop", None)
if QUrl is not None and QNetworkRequest is not None:
req = QNetworkRequest(QUrl(url))
reply = manager.get(req)
if QEventLoop is not None:
loop = QEventLoop()
reply.finished.connect(loop.quit)
loop.exec()
try:
raw = reply.readAll()
data_bytes = bytes(raw) if hasattr(raw, "__bytes__") else raw
response_text = data_bytes.decode("utf-8", errors="replace")
except Exception:
try:
response_text = reply.text()
except Exception:
response_text = None
except Exception as exc:
http_error = f"QgsNetworkAccessManager error: {exc}"
response_text = None
# Fallback: requests
if response_text is None:
try:
import requests # lokal import, keine harte Abhängigkeit
r = requests.get(url, timeout=30)
r.raise_for_status()
response_text = r.text
except Exception as exc:
http_error = f"requests error: {exc}"
response_text = None
if response_text is None:
return [], http_error or "keine Antwort vom Server"
# Versuche JSON/GeoJSON zu parsen
try:
parsed = json.loads(response_text)
if isinstance(parsed, dict) and parsed.get("type") == "FeatureCollection":
return parsed.get("features", []), None
if isinstance(parsed, dict) and "features" in parsed:
return parsed.get("features", []), None
# Sonst: gib das gesamte JSON als einzelnes Objekt zurück
return [parsed], None
except json.JSONDecodeError:
# Nicht-JSON-Antwort (z. B. GML). Wenn QGIS verfügbar, versuche GML via temporärer Datei + OGR
if getattr(qgiscore, "QGIS_AVAILABLE", False):
try:
import tempfile
with tempfile.NamedTemporaryFile(suffix=".gml", delete=False, mode="w", encoding="utf-8") as fh:
fh.write(response_text)
tmp_path = fh.name
layer = qgiscore.QgsVectorLayer(tmp_path, "tmp_gml", "ogr")
if layer and getattr(layer, "isValid", lambda: False)():
for feat in layer.getFeatures():
features.append(feat)
return features, None
return [], "GML-Antwort konnte nicht als Layer geladen werden"
except Exception as exc:
return [], f"Fehler beim Parsen von GML: {exc}"
# Wenn alles fehlschlägt:
return [], "Antwort konnte nicht als JSON oder GML geparst werden"

435
modules/Datenschreiber.py Normal file
View File

@@ -0,0 +1,435 @@
# sn_basis/modules/Datenschreiber.py
"""
Modul Datenschreiber
Enthält die Klasse Datenschreiber mit drei Hauptmethoden:
- schreibe_Daten: schreibt die abgerufenen Daten in die Ziel-GPKG/Dateien,
fragt bei vorhandenen Layern nach Überschreiben/Anhängen/Abbrechen und
legt Stile in der Datenbank ab.
- lade_Layer: lädt die erzeugten/aktualisierten Layer ins Projekt und
wendet die Vorgabestile an; sortiert abschließend die Layer.
- schreibe_log: schreibt die verarbeiteten Pruefergebnisse strukturiert in
eine Log-Datei im angegebenen Speicherort.
Die Implementierung verwendet die Wrapper-APIs:
- qgiscore_wrapper als qgiscore
- qgisui_wrapper als qgisui (nur wenn nötig)
- qt_wrapper als qt
Wichtig
------
Alle Nutzerinteraktionen (z. B. Überschreiben / Anhängen / Abbrechen) werden
zentral über den Pruefmanager gebündelt. Die Methode `ask_overwrite_append_cancel`
des Pruefmanagers wird verwendet, damit UI-Interaktionen an einer Stelle
konsolidiert und testbar sind.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
import os
import json
import datetime
from sn_basis.functions import qgiscore_wrapper as qgiscore
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
class Datenschreiber:
"""
Schreibt abgerufene Fachdaten in die Zieldatenbank/Dateien und lädt
die Layer ins Projekt.
Konstruktor
----------
pruefmanager:
Instanz des Pruefmanagers; wird verwendet, um Pruefergebnisse zu
verarbeiten und Nutzerinteraktionen zu zentralisieren.
gpkg_path:
Pfad zur Ziel-GPKG-Datei (oder Verzeichnis). Wenn None, muss der
Aufrufer einen Speicherort übergeben.
"""
def __init__(self, pruefmanager: Any, gpkg_path: Optional[str] = None) -> None:
self.pruefmanager = pruefmanager
self.gpkg_path = gpkg_path
# ------------------------------------------------------------------ #
# Schreibe Daten
# ------------------------------------------------------------------ #
def schreibe_Daten(
self,
daten_dict: Dict[str, Any],
processed_results: List[Any],
speicherort: str,
) -> List[Dict[str, Any]]:
"""
Schreibt die abgerufenen Daten in die Zieldatenbank/Dateien.
Ablauf
------
Für jede Zeile (ident) in ``daten_dict["daten"]``:
1. Bestimme Ziel-Layername (z. B. Thema oder ident).
2. Prüfe, ob ein Layer mit diesem Namen bereits existiert (Wrapper).
3. Falls vorhanden, frage den Benutzer (Überschreiben / Anhängen / Abbrechen)
über die zentrale Pruefmanager-Methode `ask_overwrite_append_cancel`.
4. Führe die gewählte Operation aus oder schreibe den Layer, wenn er noch nicht existiert.
5. Schreibe ggf. den Stil in die GPKG und setze ihn als Vorgabe.
6. Sammle und gib eine Liste der angelegten/geänderten Layer zurück.
Returns
-------
List[Dict[str, Any]]
Liste von Dicts mit Informationen zu jedem angelegten/geänderten Layer.
"""
if not speicherort:
raise ValueError("Ein gültiger Speicherort (speicherort) muss übergeben werden.")
# Setze gpkg_path falls noch nicht vorhanden
if not self.gpkg_path:
self.gpkg_path = speicherort
results: List[Dict[str, Any]] = []
daten_map: Dict[str, List[Any]] = daten_dict.get("daten", {})
# Iteriere über alle Einträge
for ident, features in daten_map.items():
# Thema/Name ableiten (falls vorhanden in processed_results oder ident)
thema = None
for pe in processed_results:
try:
kontext = getattr(pe, "kontext", None) or {}
if kontext and kontext.get("ident") == ident:
thema = kontext.get("thema")
break
except Exception:
continue
if not thema:
thema = str(ident)
layer_name = thema
# Prüfe, ob Layer bereits existiert in der Ziel-GPKG
layer_exists = False
try:
layer_exists_fn = getattr(qgiscore, "layer_exists_in_gpkg", None)
if callable(layer_exists_fn):
layer_exists = layer_exists_fn(self.gpkg_path, layer_name)
else:
# Fallback: QGIS-Fallback-Check via QgsVectorLayer
if getattr(qgiscore, "QgsVectorLayer", None) is not None and qgiscore.QGIS_AVAILABLE:
uri = f"{self.gpkg_path}|layername={layer_name}"
layer = qgiscore.QgsVectorLayer(uri, layer_name, "ogr")
layer_exists = bool(layer and getattr(layer, "isValid", lambda: False)())
except Exception:
layer_exists = False
operation = "created"
if layer_exists:
# Zentrale Nutzerabfrage über Pruefmanager
# Erwartet Rückgabe: "overwrite" | "append" | "cancel"
try:
user_choice = self.pruefmanager.ask_overwrite_append_cancel(layer_name)
except Exception:
# Fallback: overwrite, falls Pruefmanager nicht verfügbar
user_choice = "overwrite"
if user_choice == "cancel":
operation = "skipped"
results.append({
"ident": ident,
"thema": thema,
"operation": operation,
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
"feature_count": 0,
})
continue
if user_choice == "overwrite":
write_err = self._write_layer_to_gpkg(layer_name, features, mode="overwrite")
if write_err:
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Überschreiben von {layer_name}: {write_err}",
aktion="save_exception",
kontext={"ident": ident, "thema": thema, "error": write_err},
)
self.pruefmanager.verarbeite(pe_err)
operation = "skipped"
results.append({
"ident": ident,
"thema": thema,
"operation": operation,
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
"feature_count": 0,
})
continue
else:
operation = "overwritten"
elif user_choice == "append":
write_err = self._write_layer_to_gpkg(layer_name, features, mode="append")
if write_err:
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Anhängen an {layer_name}: {write_err}",
aktion="save_exception",
kontext={"ident": ident, "thema": thema, "error": write_err},
)
self.pruefmanager.verarbeite(pe_err)
operation = "skipped"
results.append({
"ident": ident,
"thema": thema,
"operation": operation,
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
"feature_count": 0,
})
continue
else:
operation = "appended"
else:
# Layer existiert nicht -> neu anlegen
write_err = self._write_layer_to_gpkg(layer_name, features, mode="create")
if write_err:
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Erstellen von {layer_name}: {write_err}",
aktion="save_exception",
kontext={"ident": ident, "thema": thema, "error": write_err},
)
self.pruefmanager.verarbeite(pe_err)
operation = "skipped"
results.append({
"ident": ident,
"thema": thema,
"operation": operation,
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
"feature_count": 0,
})
continue
else:
operation = "created"
# Stilbehandlung (falls in processed_results referenziert)
style_written = False
style_path = None
for pe in processed_results:
try:
kontext = getattr(pe, "kontext", None) or {}
if kontext and kontext.get("ident") == ident:
style_path = kontext.get("stildatei") or kontext.get("Stildatei")
break
except Exception:
continue
if style_path:
if not os.path.isabs(style_path):
base_dir = os.path.dirname(__file__)
style_path = os.path.join(base_dir, style_path)
write_style_fn = getattr(qgiscore, "write_style_to_gpkg", None)
if callable(write_style_fn):
try:
write_style_fn(self.gpkg_path, style_path, layer_name)
style_written = True
except Exception:
style_written = False
feature_count = len(features) if isinstance(features, list) else 0
results.append({
"ident": ident,
"thema": thema,
"operation": operation,
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
"feature_count": feature_count,
"style_written": style_written,
})
return results
# ------------------------------------------------------------------ #
# Lade Layer ins Projekt
# ------------------------------------------------------------------ #
def lade_Layer(self, layer_infos: List[Dict[str, Any]]) -> None:
"""
Lädt die in schreibe_Daten erzeugten/aktualisierten Layer ins Projekt
und wendet die Vorgabestile an.
"""
loaded_layers = []
for info in layer_infos:
layer_path = info.get("layer_path")
thema = info.get("thema")
if not layer_path:
continue
try:
layer = qgiscore.QgsVectorLayer(layer_path, thema, "ogr")
if not layer or not getattr(layer, "isValid", lambda: False)():
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Layer {thema} konnte nicht geladen werden",
aktion="layer_nicht_gefunden",
kontext={"layer_path": layer_path},
)
self.pruefmanager.verarbeite(pe_err)
continue
except Exception as exc:
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Erzeugen des Layers {thema}: {exc}",
aktion="layer_nicht_gefunden",
kontext={"layer_path": layer_path, "error": str(exc)},
)
self.pruefmanager.verarbeite(pe_err)
continue
try:
apply_style_fn = getattr(qgiscore, "apply_default_style_from_gpkg", None)
if callable(apply_style_fn):
apply_style_fn(self.gpkg_path, layer)
except Exception:
pe_warn = pruef_ergebnis(
ok=True,
meldung=f"Style konnte für {thema} nicht automatisch angewendet werden",
aktion="stil_not_implemented",
kontext={"thema": thema},
)
self.pruefmanager.verarbeite(pe_warn)
try:
# qgisui wrapper wird hier nicht direkt für die Abfrage verwendet;
# qgisui.add_layer_to_project sollte aber vorhanden sein.
from sn_basis.functions import qgisui_wrapper as qgisui
add_fn = getattr(qgisui, "add_layer_to_project", None)
if callable(add_fn):
add_fn(layer)
else:
# Fallback: falls wrapper nicht vorhanden, versuche QGIS-API direkt
if getattr(qgiscore, "QgsProject", None) is not None and qgiscore.QGIS_AVAILABLE:
qgiscore.QgsProject.instance().addMapLayer(layer)
loaded_layers.append(layer)
except Exception:
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Layer {thema} konnte nicht ins Projekt geladen werden",
aktion="layer_nicht_gefunden",
kontext={"thema": thema},
)
self.pruefmanager.verarbeite(pe_err)
continue
# Sortiere Layer im Projekt nach ID (Wrapper-Funktion bevorzugt)
sort_fn = getattr(qgiscore, "sort_layers_by_id", None)
if callable(sort_fn):
try:
sort_fn()
except Exception:
pass
# ------------------------------------------------------------------ #
# Schreibe Log
# ------------------------------------------------------------------ #
def schreibe_log(self, processed_results: List[Any], speicherort: str) -> str:
"""
Schreibt die verarbeiteten Pruefergebnisse strukturiert in eine Log-Datei.
"""
if not speicherort:
raise ValueError("Ein gültiger Speicherort muss übergeben werden.")
log_dir = speicherort
os.makedirs(log_dir, exist_ok=True)
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
log_path = os.path.join(log_dir, f"datenabruf_log_{timestamp}.json")
serializable: List[Dict[str, Any]] = []
for pe in processed_results:
try:
entry = {}
entry["ok"] = getattr(pe, "ok", None) if hasattr(pe, "ok") else None
entry["meldung"] = getattr(pe, "meldung", None) if hasattr(pe, "meldung") else None
kontext = getattr(pe, "kontext", None) if hasattr(pe, "kontext") else None
entry["kontext"] = kontext
serializable.append(entry)
except Exception:
serializable.append({"raw": str(pe)})
with open(log_path, "w", encoding="utf-8") as fh:
json.dump(serializable, fh, ensure_ascii=False, indent=2)
pe_log = pruef_ergebnis(
ok=True,
meldung=f"Log geschrieben: {os.path.basename(log_path)}",
aktion="standarddatei_vorschlagen",
kontext={"log_path": log_path},
)
self.pruefmanager.verarbeite(pe_log)
return log_path
# ------------------------------------------------------------------ #
# Hilfsfunktionen intern
# ------------------------------------------------------------------ #
def _write_layer_to_gpkg(self, layer_name: str, features: List[Any], mode: str = "create") -> Optional[str]:
"""
Interne Hilfsfunktion zum Schreiben eines Layers in das GPKG.
Erwartete qgiscore-Funktion:
qgiscore.write_features_to_gpkg(gpkg_path, layer_name, features, mode)
"""
write_fn = getattr(qgiscore, "write_features_to_gpkg", None)
if callable(write_fn):
try:
write_fn(self.gpkg_path, layer_name, features, mode)
return None
except Exception as exc:
return str(exc)
# Fallback: Verwende QgsVectorFileWriter, falls QGIS verfügbar
if getattr(qgiscore, "QGIS_AVAILABLE", False) and getattr(qgiscore, "QgsVectorFileWriter", None) is not None:
try:
# Minimaler Fallback: erwarte, dass 'features' eine Liste von QgsFeature ist
if not features:
# Erstelle leeren Layer-Eintrag (GPKG erlaubt leere Layer)
# Hier vereinfachen wir: writeAsVectorFormatV3 benötigt ein Layer-Objekt.
return None
# Versuche, ein Memory-Layer aus dem ersten Feature zu ermitteln
first = features[0]
mem_layer = None
if hasattr(first, "fields") and hasattr(first, "geometry"):
# Wenn Features QgsFeature sind, versuchen wir, das zugehörige Layer zu nutzen
try:
mem_layer = first.layer() if hasattr(first, "layer") else None
except Exception:
mem_layer = None
if mem_layer is None:
return "Keine Feld-/Geometrie-Informationen zum Schreiben vorhanden"
opts = qgiscore.QgsVectorFileWriter.SaveVectorOptions()
opts.driverName = "GPKG"
opts.layerName = layer_name
opts.fileEncoding = "UTF-8"
if mode == "overwrite":
opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteFile
else:
opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteLayer
err = qgiscore.QgsVectorFileWriter.writeAsVectorFormatV3(
mem_layer,
self.gpkg_path,
qgiscore.QgsProject.instance().transformContext(),
opts
)
if err != qgiscore.QgsVectorFileWriter.NoError:
return f"Fehler beim Schreiben (Code {err})"
return None
except Exception as exc:
return str(exc)
return "Keine Schreib-Funktion verfügbar (Wrapper nicht implementiert)"

View File

@@ -140,6 +140,7 @@ class Pruefmanager:
"kein_arbeitsblatt",
"read_error",
"open_error",
"pflichtfelder_fehlen",
}
if aktion in informational_actions:
return "abort"
@@ -202,3 +203,74 @@ class Pruefmanager:
# Standard: keine Änderung
return ergebnis
def ask_overwrite_append_cancel(self, layer_name: str, default: str = "overwrite") -> str:
"""
Zeigt dem Nutzer eine Auswahl für einen bereits existierenden Layer an.
Rückgabe
-------
str
Einer der Werte: "overwrite", "append", "cancel".
Verhalten
--------
- Verwendet bevorzugt die UI-Wrapper-Funktion `qt_wrapper` / `qgisui_wrapper`,
falls vorhanden (z. B. ein QMessageBox-Dialog mit drei Buttons).
- Im Mock- oder Headless-Modus (kein Qt/QGIS verfügbar) wird der übergebene
`default`-Wert zurückgegeben.
- Alle Nutzerinteraktionen laufen über diese zentrale Methode, damit das
Plugin an einer Stelle gesteuert und ggf. getested werden kann.
Parameter
---------
layer_name:
Anzeigename des Layers, der bereits existiert (wird im Dialog angezeigt).
default:
Rückgabewert im Headless/Mock-Modus oder wenn der Dialog nicht verfügbar ist.
Gültige Werte: "overwrite", "append", "cancel". Standard: "overwrite".
"""
# Validierung des Defaults
if default not in ("overwrite", "append", "cancel"):
default = "overwrite"
# Versuche, eine UI-Wrapper-Funktion zu verwenden, falls vorhanden
try:
# qgisui_wrapper kann eine spezialisierte Dialogfunktion bereitstellen
from sn_basis.functions import qgisui_wrapper as qgisui
ask_fn = getattr(qgisui, "ask_overwrite_append_cancel", None)
if callable(ask_fn):
# Die Wrapper-Funktion soll genau die drei Strings zurückgeben
choice = ask_fn(layer_name)
if choice in ("overwrite", "append", "cancel"):
return choice
except Exception:
# Falls Import/Wrapper fehlschlägt, weiter zum Qt-Fallback
pass
# Fallback: direkte Qt-Dialoge über qt_wrapper (wenn verfügbar)
try:
from sn_basis.functions import qt_wrapper as qt
QMessageBox = getattr(qt, "QMessageBox", None)
if QMessageBox is not None:
# Erzeuge und konfiguriere Dialog
msg = QMessageBox()
msg.setWindowTitle("Layer bereits vorhanden")
msg.setText(f"Der Layer '{layer_name}' existiert bereits. Was möchten Sie tun?")
overwrite_btn = msg.addButton("Überschreiben", QMessageBox.AcceptRole)
append_btn = msg.addButton("Anhängen", QMessageBox.AcceptRole)
cancel_btn = msg.addButton("Abbrechen", QMessageBox.RejectRole)
msg.setDefaultButton(overwrite_btn)
# Blockierend anzeigen
msg.exec_()
clicked = msg.clickedButton()
if clicked == overwrite_btn:
return "overwrite"
if clicked == append_btn:
return "append"
return "cancel"
except Exception:
# Qt nicht verfügbar oder Fehler beim Dialogaufbau
pass
# Headless / Mock: gib Default zurück
return default

View File

@@ -26,11 +26,13 @@ PruefAktion = Literal[
"datenquelle_unerwartet",
"layer_nicht_editierbar",
"falsche_endung",
"pflichtfelder_fehlen",
# Excel / Import-spezifische Aktionen
"kein_header",
"kein_arbeitsblatt",
"read_error",
"open_error",
"datenabruf",
# Generische Prüf-/Speicher-Aktionen
"pruefe_exception",
"save_exception",