1238 lines
48 KiB
Python
1238 lines
48 KiB
Python
|
||
"""
|
||
sn_plan41/ui/tab_a_logic.py – Fachlogik für Tab A (Daten)
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
from sn_basis.functions.sys_wrapper import get_plugin_root, join_path, file_exists
|
||
|
||
from typing import Any, Dict, List, Optional
|
||
from collections.abc import Mapping as _Mapping
|
||
import os
|
||
import datetime
|
||
import json
|
||
import tempfile
|
||
import html
|
||
|
||
from sn_basis.functions.qgiscore_wrapper import (
|
||
QgsVectorFileWriter,
|
||
QgsVectorLayer,
|
||
QgsProject,
|
||
QgsGeometry,
|
||
QgsFeature,
|
||
QgsField,
|
||
QgsFeatureRequest,
|
||
QgsCoordinateReferenceSystem,
|
||
|
||
)
|
||
|
||
from sn_basis.functions.variable_wrapper import (
|
||
get_variable,
|
||
set_variable,
|
||
)
|
||
from sn_basis.functions.ly_existence_wrapper import layer_exists
|
||
from sn_basis.functions.ly_metadata_wrapper import get_layer_type
|
||
from sn_basis.functions.qt_wrapper import QVariant
|
||
from sn_basis.functions.dialog_wrapper import create_progress_dialog
|
||
from sn_basis.functions.message_wrapper import info, warning, error
|
||
|
||
|
||
# Prüfer-Typen
|
||
from sn_basis.modules.Pruefmanager import Pruefmanager
|
||
from sn_basis.modules.linkpruefer import Linkpruefer
|
||
from sn_basis.modules.stilpruefer import Stilpruefer
|
||
from sn_basis.modules.Dateipruefer import Dateipruefer
|
||
from sn_basis.modules.layerpruefer import Layerpruefer
|
||
from sn_basis.modules.LayerLoader import LayerLoader
|
||
from sn_basis.modules.Datenschreiber import Datenschreiber
|
||
|
||
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
|
||
from sn_basis.modules.DataGrabber import DataGrabber, SourceType, SourceDict
|
||
from sn_basis.modules.Datenabruf import Datenabruf
|
||
|
||
Row = Dict[str, Any]
|
||
DataDict = Dict[str, List[Row]]
|
||
|
||
class TabALogic:
|
||
"""
|
||
Kapselt die Fachlogik von Tab A. Verfahrens-DB wird **nicht** bei Pfad-Auswahl,
|
||
sondern erst beim ersten Layer-Schreiben angelegt.
|
||
"""
|
||
|
||
def __init__(self, pruefmanager: Pruefmanager, link_pruefer: Linkpruefer, stil_pruefer: Stilpruefer) -> None:
|
||
self.pruefmanager = pruefmanager
|
||
self.link_pruefer = link_pruefer
|
||
self.stil_pruefer = stil_pruefer
|
||
self.data_grabber: Optional[DataGrabber] = None
|
||
|
||
class _SilentPruefmanagerProxy:
|
||
"""Proxy, der Prüfergebnisse ohne UI-Meldungen verarbeitet."""
|
||
|
||
def __init__(self, outer: "TabALogic", fallback: Optional[Any]) -> None:
|
||
self._outer = outer
|
||
self._fallback = fallback
|
||
self.results: List[Any] = []
|
||
|
||
def verarbeite(self, ergebnis: Any) -> Any:
|
||
self.results.append(ergebnis)
|
||
ok = bool(getattr(ergebnis, "ok", False))
|
||
if not ok:
|
||
aktion = getattr(ergebnis, "aktion", "unbekannt")
|
||
meldung = getattr(ergebnis, "meldung", "")
|
||
self._outer._log(f"[SILENT-PRUEFMANAGER] {aktion}: {meldung}")
|
||
return ergebnis
|
||
|
||
def __getattr__(self, name: str) -> Any:
|
||
if self._fallback is not None:
|
||
return getattr(self._fallback, name)
|
||
raise AttributeError(name)
|
||
|
||
def _log(self, msg: str) -> None:
|
||
print(f"[TabALogic] {msg}")
|
||
|
||
def _create_silent_pruefmanager(self) -> "TabALogic._SilentPruefmanagerProxy":
|
||
return TabALogic._SilentPruefmanagerProxy(self, self.pruefmanager)
|
||
|
||
def _extract_datenabruf_error_maps(self, datenabruf_results: List[Any]) -> Dict[str, Dict[str, str]]:
|
||
by_ident: Dict[str, str] = {}
|
||
by_thema: Dict[str, str] = {}
|
||
|
||
for result in datenabruf_results or []:
|
||
if not result:
|
||
continue
|
||
context = getattr(result, "kontext", None)
|
||
if not isinstance(context, dict):
|
||
continue
|
||
|
||
if getattr(result, "aktion", None) == "url_nicht_erreichbar":
|
||
ident = str(context.get("ident") or "").strip()
|
||
thema = str(context.get("thema") or "").strip()
|
||
reason = str(context.get("error") or getattr(result, "meldung", "") or "unbekannter Fehler").strip()
|
||
if ident and reason:
|
||
by_ident[ident] = reason
|
||
if thema and reason:
|
||
by_thema[thema] = reason
|
||
|
||
if getattr(result, "aktion", None) == "datenabruf":
|
||
fehler_map = context.get("fehler")
|
||
if isinstance(fehler_map, dict):
|
||
for thema_key, value in fehler_map.items():
|
||
thema = str(thema_key or "").strip()
|
||
reason = str(value or "").strip()
|
||
if thema and reason and thema not in by_thema:
|
||
by_thema[thema] = reason
|
||
|
||
return {"by_ident": by_ident, "by_thema": by_thema}
|
||
|
||
def _zeige_verfahrensgebiet_hinweis(self) -> None:
|
||
if not self.pruefmanager:
|
||
return
|
||
show_hint = getattr(self.pruefmanager, "zeige_hinweis", None)
|
||
if callable(show_hint):
|
||
show_hint(
|
||
"Verfahrensgebiet fehlt",
|
||
"Kein gueltiger Verfahrensgebiet-Layer vorhanden. "
|
||
"Bitte zuerst in sn_verfahrensgebiet den Layer ueber 'Aus ALKIS laden' laden "
|
||
"oder im Dropdown einen gueltigen Layer auswaehlen.",
|
||
)
|
||
|
||
def _close_progress_dialog(self, progress: Optional[Any]) -> None:
|
||
if progress is None:
|
||
return
|
||
close = getattr(progress, "close", None)
|
||
if callable(close):
|
||
try:
|
||
close()
|
||
except Exception:
|
||
pass
|
||
|
||
# -------------------------------
|
||
# Verfahrens-Datenbank (Pfad-Management)
|
||
# -------------------------------
|
||
|
||
def load_verfahrens_db(self) -> Optional[str]:
|
||
"""Lädt den gespeicherten Verfahrens-DB-Pfad (Datei muss nicht existieren)."""
|
||
path = get_variable("verfahrens_db", scope="project")
|
||
return path or None
|
||
|
||
def set_verfahrens_db(self, path: Optional[str]) -> None:
|
||
"""Speichert den Verfahrens-DB-Pfad (Datei wird später angelegt)."""
|
||
if path:
|
||
set_variable("verfahrens_db", path, scope="project")
|
||
else:
|
||
set_variable("verfahrens_db", "", scope="project")
|
||
|
||
# -------------------------------
|
||
# Layer → Verfahrens-DB schreiben (alte Logik!)
|
||
# -------------------------------
|
||
|
||
def write_layer_to_verfahrens_db(
|
||
self,
|
||
source_layer: QgsVectorLayer,
|
||
zielpfad: str,
|
||
layer_name: str,
|
||
) -> bool:
|
||
"""
|
||
Schreibt einen Layer in die Verfahrens-DB.
|
||
Legt GPKG **bei Bedarf neu an** (wie puffer_setzen im alten Code).
|
||
|
||
Args:
|
||
source_layer: Layer zum Exportieren (z.B. aus DataGrabber)
|
||
zielpfad: Vom Dateiprüfer geprüfter Ziel-GPKG-Pfad
|
||
layer_name: Name des Layers in der GPKG
|
||
|
||
Returns:
|
||
True wenn erfolgreich
|
||
"""
|
||
if not zielpfad or not source_layer or not source_layer.isValid():
|
||
return False
|
||
|
||
# Optionen wie im alten puffer_setzen
|
||
opts = QgsVectorFileWriter.SaveVectorOptions()
|
||
opts.driverName = "GPKG"
|
||
opts.fileEncoding = "UTF-8"
|
||
opts.layerName = layer_name
|
||
|
||
# Alte Logik: bei neuem Pfad komplett neue GPKG, sonst Layer überschreiben
|
||
if not os.path.exists(zielpfad):
|
||
opts.actionOnExistingFile = QgsVectorFileWriter.CreateOrOverwriteFile
|
||
else:
|
||
opts.actionOnExistingFile = QgsVectorFileWriter.CreateOrOverwriteLayer
|
||
|
||
transform_context = QgsProject.instance().transformContext()
|
||
|
||
error = QgsVectorFileWriter.writeAsVectorFormatV3(
|
||
source_layer,
|
||
zielpfad,
|
||
transform_context,
|
||
opts,
|
||
)
|
||
|
||
if error != QgsVectorFileWriter.NoError:
|
||
print(f"Fehler beim Schreiben nach {zielpfad}: {error}")
|
||
return False
|
||
|
||
# Pfad jetzt auch als "Verfahrens-DB" merken
|
||
self.set_verfahrens_db(zielpfad)
|
||
return True
|
||
|
||
# -------------------------------
|
||
# Lokale Linkliste
|
||
# -------------------------------
|
||
|
||
def load_linkliste(self) -> Optional[str]:
|
||
path = get_variable("linkliste", scope="project")
|
||
if path and file_exists(path):
|
||
return path
|
||
return None
|
||
|
||
def set_linkliste(self, path: Optional[str]) -> None:
|
||
if path:
|
||
set_variable("linkliste", path, scope="project")
|
||
else:
|
||
set_variable("linkliste", "", scope="project")
|
||
|
||
# -------------------------------
|
||
# Verfahrensgebiet-Layer
|
||
# -------------------------------
|
||
def load_verfahrensgebiet_layer_id(self) -> Optional[str]:
|
||
value = get_variable("verfahrensgebiet_layer", scope="project")
|
||
return value or None
|
||
|
||
def is_valid_verfahrensgebiet_layer(self, layer) -> bool:
|
||
if not layer_exists(layer):
|
||
return False
|
||
|
||
layer_type = get_layer_type(layer)
|
||
return layer_type == "vector"
|
||
|
||
# === PIPELINE ===
|
||
def _on_run_pipeline(
|
||
self,
|
||
source: str,
|
||
linkliste: str | None,
|
||
raumfilter: str,
|
||
progress: Optional[Any] = None,
|
||
) -> Optional[Dict[str, Any]]:
|
||
"""Pipeline starten; Linkliste wird ausgelesen und geprüft, dann Datenabruf ausgeführt."""
|
||
self._log("Pipeline startet")
|
||
|
||
if not self.pruefmanager or not self.data_grabber:
|
||
self._log("Fehler: Pruefmanager oder DataGrabber fehlt")
|
||
return None
|
||
|
||
# 1) Verfahrens-DB prüfen und als aktive DB setzen
|
||
datei_ergebnis = Dateipruefer(
|
||
source,
|
||
basis_pfad="",
|
||
leereingabe_erlaubt=False,
|
||
standarddatei=None,
|
||
temporaer_erlaubt=True,
|
||
verfahrens_db_modus=True,
|
||
).pruefe()
|
||
|
||
datei_ergebnis = self.pruefmanager.verarbeite(datei_ergebnis)
|
||
if not datei_ergebnis.ok:
|
||
self._log("Verfahrens-DB-Pruefung fehlgeschlagen")
|
||
return None
|
||
|
||
final_pfad = str(datei_ergebnis.kontext or source)
|
||
self.set_verfahrens_db(final_pfad)
|
||
|
||
# Nach bestätigter Entscheidung: sofort Fortschrittsdialog zeigen
|
||
if progress is None:
|
||
progress = create_progress_dialog(1, "Fachdaten laden", "Prüfe Eingaben...")
|
||
else:
|
||
progress.set_total(1)
|
||
progress.set_value(0)
|
||
progress.set_label("Prüfe Eingaben...")
|
||
|
||
# 2) Linkliste auflösen, falls leer Standardlinkliste verwenden
|
||
linkliste_final = self._resolve_linkliste(linkliste)
|
||
if linkliste_final is None:
|
||
self._log("Linkliste kann nicht aufgelöst werden")
|
||
self._close_progress_dialog(progress)
|
||
return None
|
||
else:
|
||
self._log(f"Linkliste final: '{linkliste_final}'")
|
||
# 3) Raumfilter prüfen
|
||
raumfilter_layer = self._resolve_raumfilter(raumfilter, final_pfad)
|
||
if raumfilter in ("Verfahrensgebiet", "Pufferlayer") and raumfilter_layer is None:
|
||
self._log(f"Raumfilter '{raumfilter}' nicht verfügbar")
|
||
self._close_progress_dialog(progress)
|
||
return None
|
||
|
||
# 4) Lade-Status initialisieren (funktioniert ab Bestätigung überschreiben/anhängen)
|
||
if progress is None:
|
||
# placeholder mit 1; tatsächliche Gesamtzahl kennt DataGrabber später
|
||
progress = create_progress_dialog(1, "Fachdaten laden", "Prüfe Eingaben...")
|
||
else:
|
||
progress.set_total(1)
|
||
progress.set_value(0)
|
||
progress.set_label("Prüfe Eingaben...")
|
||
|
||
# 5) Daten aus Linkliste laden und prüfen
|
||
silent_pruefmanager = self._create_silent_pruefmanager()
|
||
local_data_grabber = DataGrabber(
|
||
silent_pruefmanager,
|
||
datei_pruefer_cls=self.data_grabber._datei_pruefer_cls,
|
||
link_pruefer=self.data_grabber.link_pruefer,
|
||
layer_pruefer=self.data_grabber.layer_pruefer,
|
||
stil_pruefer=self.data_grabber.stil_pruefer,
|
||
excel_importer_cls=self.data_grabber._excel_importer_cls,
|
||
)
|
||
source_dict, grabber_summary = local_data_grabber.run(linkliste_final)
|
||
self._log(f"DataGrabber: {grabber_summary.meldung} [{grabber_summary.aktion}]")
|
||
|
||
# DEBUG: detaillierter Status
|
||
print("[TabALogic] ... Debug: source_dict keys:", list(source_dict.keys()))
|
||
print("[TabALogic] ... Debug: rows count:", len(source_dict.get("rows", [])))
|
||
if source_dict.get("rows"):
|
||
for i, row in enumerate(source_dict.get("rows", []), start=1):
|
||
print(f"[TabALogic] ... Debug: row {i}: {row}")
|
||
|
||
if not source_dict.get("rows"):
|
||
self._log("Keine validen Linkliste-Einträge für Datenabruf")
|
||
print("[TabALogic] ... STOP: rows:", len(source_dict.get("rows", [])))
|
||
self._close_progress_dialog(progress)
|
||
return None
|
||
|
||
total_rows = len(source_dict.get("rows", []))
|
||
if progress is not None:
|
||
if hasattr(progress, "set_total"):
|
||
progress.set_total(max(total_rows, 1))
|
||
elif hasattr(progress, "setMaximum"):
|
||
progress.setMaximum(max(total_rows, 1))
|
||
else:
|
||
progress.total = max(total_rows, 1)
|
||
progress.set_value(0)
|
||
progress.set_label("Lade Daten...")
|
||
|
||
if not grabber_summary.ok:
|
||
self._log("Warnung: DataGrabber meldet fehlerhafte Zeilen, fahre mit Validierungsdaten fort")
|
||
|
||
# 5) Datenabruf (aus validierten Zeilen)
|
||
silent_pruefmanager = self._create_silent_pruefmanager()
|
||
datenabruf = Datenabruf(silent_pruefmanager)
|
||
result_dict, datenabruf_results = datenabruf.datenabruf(
|
||
result_dict=source_dict,
|
||
raumfilter=raumfilter,
|
||
verfahrensgebiet_layer=raumfilter_layer,
|
||
speicherort=final_pfad,
|
||
pruef_ergebnisse=[grabber_summary],
|
||
progress=progress,
|
||
)
|
||
|
||
self._log("Datenabruf abgeschlossen")
|
||
|
||
pipeline_context = {
|
||
"source": final_pfad,
|
||
"linkliste": linkliste_final,
|
||
"raumfilter": raumfilter_layer,
|
||
"raumfilter_name": raumfilter,
|
||
"source_dict": source_dict,
|
||
"result_dict": result_dict,
|
||
"datenabruf_results": datenabruf_results,
|
||
}
|
||
|
||
# 6) Lade Dienste in das Projekt aus result_dict
|
||
load_summary = self._load_dienste_aus_result_dict(source_dict, pipeline_context, progress=progress)
|
||
|
||
if progress is not None:
|
||
progress.set_value(total_rows)
|
||
progress.set_label("Fachdaten laden abgeschlossen. Bitte OK klicken, um den Dialog zu schließen.")
|
||
|
||
# 7) Log-Datei schreiben
|
||
self._write_html_log(final_pfad, source_dict, pipeline_context, load_summary)
|
||
|
||
print("=" * 60 + "\n")
|
||
return pipeline_context
|
||
|
||
def _load_dienste_aus_result_dict(self, source_dict: DataDict, pipeline_context: Dict[str, Any], progress: Optional[Any] = None) -> Dict[str, Any]:
|
||
"""Lädt Dienste (aus Linkliste) ins Projekt und persistiert optional mit Datenschreiber."""
|
||
rows = source_dict.get("rows", [])
|
||
total = len(rows)
|
||
loaded_count = 0
|
||
skipped_outside = 0
|
||
aborted = False
|
||
if not rows:
|
||
self._log("Keine Dienste zum Laden")
|
||
return {
|
||
"row_count": 0,
|
||
"loaded_count": 0,
|
||
"skipped_outside": 0,
|
||
"aborted": False,
|
||
"row_stats": [],
|
||
"layer_call_status": {},
|
||
"raumfilter_name": pipeline_context.get("raumfilter_name", "unbekannt"),
|
||
}
|
||
|
||
final_pfad = pipeline_context.get("source") or ""
|
||
use_datenschreiber = bool(final_pfad)
|
||
|
||
datenschreiber = None
|
||
if use_datenschreiber:
|
||
datenschreiber = Datenschreiber(self.pruefmanager, gpkg_path=final_pfad)
|
||
|
||
daten_dict: Dict[str, Any] = {"daten": {}}
|
||
|
||
raumfilter_layer = pipeline_context.get("raumfilter")
|
||
raumfilter_name = pipeline_context.get("raumfilter_name", "unbekannt")
|
||
raumfilter_crs_authid = None
|
||
if raumfilter_layer is not None and hasattr(raumfilter_layer, "crs") and callable(getattr(raumfilter_layer, "crs")):
|
||
try:
|
||
crs = raumfilter_layer.crs()
|
||
if crs is not None and hasattr(crs, "authid") and callable(getattr(crs, "authid")):
|
||
raumfilter_crs_authid = crs.authid()
|
||
except Exception:
|
||
raumfilter_crs_authid = None
|
||
# Für den späteren Filter benötigen wir entweder die reine Ausdehnung
|
||
# (Pufferlayer) oder – im Falle eines echten Verfahrensgebiets – die
|
||
# vollständige Geometrie. Die Filtermethode wird im Schleifenrumpf
|
||
# ausgewählt.
|
||
raumfilter_extent = None
|
||
if raumfilter_layer is not None and getattr(raumfilter_layer, 'extent', None) is not None:
|
||
raumfilter_extent = raumfilter_layer.extent()
|
||
|
||
temp_layers: List[Any] = []
|
||
silent_pruefmanager = self._create_silent_pruefmanager()
|
||
layer_loader = LayerLoader(silent_pruefmanager, stil_pruefer=self.stil_pruefer, layer_pruefer=self.link_pruefer)
|
||
|
||
error_maps = self._extract_datenabruf_error_maps(pipeline_context.get("datenabruf_results", []))
|
||
error_by_ident = error_maps.get("by_ident", {})
|
||
error_by_thema = error_maps.get("by_thema", {})
|
||
|
||
# Statistiken für Log: Raumfilter-Info pro Dienst
|
||
row_stats: List[Dict[str, Any]] = []
|
||
layer_call_status: Dict[str, str] = {}
|
||
|
||
|
||
for idx, row in enumerate(rows, start=1):
|
||
ident = str(row.get("ident") or "")
|
||
provider = str(row.get("Provider", "")).lower()
|
||
link = str(row.get("Link", ""))
|
||
thema = str(row.get("Inhalt") or row.get("ident") or "Dienst")
|
||
style = row.get("stildatei")
|
||
|
||
daten_map = (pipeline_context.get("result_dict") or {}).get("daten", {})
|
||
fetched_features = daten_map.get(ident, []) if isinstance(daten_map, dict) else []
|
||
fetched_count = len(fetched_features) if isinstance(fetched_features, list) else None
|
||
base_reason = error_by_ident.get(ident) or error_by_thema.get(thema)
|
||
|
||
if progress is not None:
|
||
progress.set_label(f"Lade Dienst {idx}/{total}: {thema}")
|
||
progress.set_value(idx)
|
||
if progress.is_canceled():
|
||
aborted = True
|
||
layer_call_status[thema] = "abbruch_vor_layeraufruf"
|
||
self._log("Nutzerabbruch: Pipeline gestoppt")
|
||
self.pruefmanager.verarbeite(
|
||
pruef_ergebnis(
|
||
ok=False,
|
||
meldung="Pipeline durch Benutzer abgebrochen",
|
||
aktion="abbruch",
|
||
kontext={"dienst": thema, "schritt": idx},
|
||
)
|
||
)
|
||
break
|
||
|
||
|
||
self._log(f"Lade Dienst '{thema}' ({provider})")
|
||
self._log(f"[DEBUG] Layeraufruf startet: thema='{thema}', provider='{provider}', link='{link}'")
|
||
layer_call_status[thema] = "layeraufruf_start"
|
||
|
||
layer = layer_loader.create_layer(provider, link, thema)
|
||
|
||
if not layer:
|
||
layer_call_status[thema] = "layer_nicht_ladbar"
|
||
self._log(f"[DEBUG] Layeraufruf fehlgeschlagen: thema='{thema}'")
|
||
reason = base_reason or "Layer konnte nicht geladen werden (Dienst nicht erreichbar oder Link fehlerhaft)"
|
||
self._log(f"[DEBUG] Dienst nicht geladen: {thema} | Grund: {reason}")
|
||
row_stats.append({
|
||
"dienst": thema,
|
||
"provider": provider,
|
||
"link": link,
|
||
"style": style or "",
|
||
"datenabruf_features": fetched_count,
|
||
"total_features": None,
|
||
"filtered_features": None,
|
||
"status": "layer_nicht_ladbar",
|
||
"reason": reason,
|
||
})
|
||
continue
|
||
|
||
layer_call_status[thema] = "layeraufruf_ok"
|
||
self._log(f"[DEBUG] Layeraufruf erfolgreich: thema='{thema}'")
|
||
|
||
if progress is not None and progress.is_canceled():
|
||
aborted = True
|
||
layer_call_status[thema] = "abbruch_nach_layeraufruf"
|
||
self._log("Nutzerabbruch nach Layer-Erzeugung")
|
||
break
|
||
|
||
# Je nach Typ des Filters einen geeigneten Filter anwenden.
|
||
if raumfilter_layer and raumfilter_name == "Verfahrensgebiet":
|
||
# echte Geometrie-Schnittmenge, nicht nur BBox
|
||
layer_for_write = layer_loader.filter_by_layer(
|
||
layer,
|
||
raumfilter_layer,
|
||
cancel_callback=(progress.is_canceled if progress is not None else None),
|
||
)
|
||
else:
|
||
layer_for_write = layer_loader.filter_by_extent(
|
||
layer,
|
||
raumfilter_extent,
|
||
cancel_callback=(progress.is_canceled if progress is not None else None),
|
||
source_layer=raumfilter_layer,
|
||
)
|
||
|
||
if progress is not None and progress.is_canceled():
|
||
aborted = True
|
||
layer_call_status[thema] = "abbruch_nach_raumfilter"
|
||
self._log("Nutzerabbruch nach Raumfilter")
|
||
break
|
||
|
||
# Zähle Features vor/nach Raumfilter
|
||
total_features = None
|
||
filtered_features = None
|
||
try:
|
||
if layer is not None and hasattr(layer, "featureCount"):
|
||
total_features = int(layer.featureCount())
|
||
except Exception:
|
||
total_features = None
|
||
|
||
if layer_for_write is not None and hasattr(layer_for_write, "featureCount"):
|
||
try:
|
||
filtered_features = int(layer_for_write.featureCount())
|
||
except Exception:
|
||
filtered_features = None
|
||
|
||
if not layer_for_write:
|
||
layer_call_status[thema] = "raumfilter_ausserhalb"
|
||
self._log(f"Dienst {thema} ist außerhalb des Raumfilters")
|
||
skipped_outside += 1
|
||
reason = "Keine Objekte innerhalb des Raumfilters"
|
||
row_stats.append({
|
||
"dienst": thema,
|
||
"provider": provider,
|
||
"link": link,
|
||
"style": style or "",
|
||
"datenabruf_features": fetched_count,
|
||
"total_features": total_features,
|
||
"filtered_features": 0,
|
||
"status": "außerhalb",
|
||
"reason": reason,
|
||
})
|
||
continue
|
||
|
||
if style:
|
||
layer_loader.apply_style(layer_for_write, style)
|
||
|
||
row_stats.append({
|
||
"dienst": thema,
|
||
"provider": provider,
|
||
"link": link,
|
||
"style": style or "",
|
||
"datenabruf_features": fetched_count,
|
||
"total_features": total_features,
|
||
"filtered_features": filtered_features,
|
||
"status": "geladen",
|
||
"reason": "geladen",
|
||
})
|
||
layer_call_status[thema] = "geladen"
|
||
self._log(f"[DEBUG] Dienst geladen: thema='{thema}', provider='{provider}', filtered_features={filtered_features}")
|
||
|
||
|
||
if provider == "wms":
|
||
# WMS ist Raster und wird nicht in GPKG geschrieben.
|
||
# Im temporären Modus wird er trotzdem direkt geladen.
|
||
loaded_count += 1
|
||
if use_datenschreiber:
|
||
self._log(f"WMS-Layer {thema} wird nicht in GPKG gespeichert, nur in Projekt (temporär)")
|
||
# Während Datenbankmodus: wir speichern nicht in daten_dict,
|
||
# aber für gute Sichtbarkeit laden wir nach erfolgreichem Schreibprozess.
|
||
temp_layers.append(layer)
|
||
else:
|
||
temp_layers.append(layer)
|
||
continue
|
||
|
||
if use_datenschreiber and datenschreiber:
|
||
daten_dict["daten"][thema] = {
|
||
"layer": layer_for_write,
|
||
"style_path": style,
|
||
}
|
||
else:
|
||
temp_layers.append(layer_for_write)
|
||
|
||
loaded_count += 1
|
||
|
||
if use_datenschreiber and datenschreiber and daten_dict["daten"]:
|
||
self._log(f"Schreibe {len(daten_dict['daten'])} Layer in {final_pfad}")
|
||
results = datenschreiber.schreibe_Daten(
|
||
daten_dict,
|
||
processed_results=pipeline_context.get("datenabruf_results", []),
|
||
speicherort=final_pfad,
|
||
)
|
||
datenschreiber.lade_Layer(results)
|
||
self._log("Datenschreiber abgeschlossen")
|
||
elif temp_layers:
|
||
self._log(f"Temporärmodus: Lade {len(temp_layers)} Layer ins Projekt")
|
||
for layer in temp_layers:
|
||
QgsProject.instance().addMapLayer(layer)
|
||
self._log("Temporärmodus: Layer im Projekt geladen")
|
||
else:
|
||
self._log("Keine Layer zum Laden (kein persistierter GPkg-Write).")
|
||
|
||
self._log(f"Dienst-Laden fertig ({len(rows)} Zeilen)")
|
||
|
||
return {
|
||
"row_count": len(rows),
|
||
"loaded_count": loaded_count,
|
||
"skipped_outside": skipped_outside,
|
||
"aborted": aborted,
|
||
"row_stats": row_stats,
|
||
"layer_call_status": layer_call_status,
|
||
"raumfilter_name": raumfilter_name,
|
||
}
|
||
|
||
def _create_local_layer_from_fetched_features(
|
||
self,
|
||
thema: str,
|
||
features: List[Any],
|
||
crs_authid: Optional[str] = None,
|
||
) -> Optional[QgsVectorLayer]:
|
||
"""Erzeugt aus bereits geholten GeoJSON-Features einen lokalen OGR-Layer.
|
||
|
||
Verhindert einen zweiten potentiell blockierenden Remote-Aufruf (WFS/REST).
|
||
"""
|
||
if not features:
|
||
return None
|
||
|
||
normalized_features: List[Dict[str, Any]] = []
|
||
detected_crs_authid: Optional[str] = None
|
||
for feature in features:
|
||
if not isinstance(feature, dict):
|
||
continue
|
||
|
||
# Fall 1: bereits GeoJSON-Feature
|
||
if feature.get("type") == "Feature" and isinstance(feature.get("geometry"), dict):
|
||
normalized_features.append(feature)
|
||
continue
|
||
|
||
# Fall 2: ArcGIS Feature-JSON -> GeoJSON konvertieren
|
||
attributes = feature.get("attributes")
|
||
geometry = feature.get("geometry")
|
||
if not isinstance(attributes, dict) or not isinstance(geometry, dict):
|
||
continue
|
||
|
||
if detected_crs_authid is None:
|
||
try:
|
||
sr = geometry.get("spatialReference")
|
||
if isinstance(sr, dict):
|
||
wkid = sr.get("latestWkid") or sr.get("wkid")
|
||
if wkid:
|
||
detected_crs_authid = f"EPSG:{int(wkid)}"
|
||
except Exception:
|
||
detected_crs_authid = None
|
||
|
||
geojson_geometry: Dict[str, Any] | None = None
|
||
|
||
# Point
|
||
if "x" in geometry and "y" in geometry:
|
||
geojson_geometry = {
|
||
"type": "Point",
|
||
"coordinates": [geometry.get("x"), geometry.get("y")],
|
||
}
|
||
# MultiPoint
|
||
elif isinstance(geometry.get("points"), list):
|
||
geojson_geometry = {
|
||
"type": "MultiPoint",
|
||
"coordinates": geometry.get("points", []),
|
||
}
|
||
# LineString / MultiLineString
|
||
elif isinstance(geometry.get("paths"), list):
|
||
paths = geometry.get("paths", [])
|
||
if len(paths) == 1:
|
||
geojson_geometry = {
|
||
"type": "LineString",
|
||
"coordinates": paths[0],
|
||
}
|
||
else:
|
||
geojson_geometry = {
|
||
"type": "MultiLineString",
|
||
"coordinates": paths,
|
||
}
|
||
# Polygon / MultiPolygon
|
||
elif isinstance(geometry.get("rings"), list):
|
||
rings = geometry.get("rings", [])
|
||
cleaned_rings = [
|
||
ring for ring in rings
|
||
if isinstance(ring, list) and len(ring) >= 4
|
||
]
|
||
if len(cleaned_rings) == 1:
|
||
geojson_geometry = {
|
||
"type": "Polygon",
|
||
"coordinates": cleaned_rings,
|
||
}
|
||
elif len(cleaned_rings) > 1:
|
||
# Robuster Fallback für ArcGIS-Ringe:
|
||
# Mehrere Ringe werden als MultiPolygon behandelt,
|
||
# damit nicht versehentlich alle Ringe als Löcher eines
|
||
# einzigen Polygons interpretiert werden.
|
||
geojson_geometry = {
|
||
"type": "MultiPolygon",
|
||
"coordinates": [[ring] for ring in cleaned_rings],
|
||
}
|
||
|
||
if geojson_geometry is None:
|
||
continue
|
||
|
||
normalized_features.append(
|
||
{
|
||
"type": "Feature",
|
||
"geometry": geojson_geometry,
|
||
"properties": attributes,
|
||
}
|
||
)
|
||
|
||
if not normalized_features:
|
||
self._log(f"[DEBUG] Keine konvertierbaren Features für lokalen Layer: thema='{thema}'")
|
||
return None
|
||
|
||
try:
|
||
payload = {
|
||
"type": "FeatureCollection",
|
||
"features": normalized_features,
|
||
}
|
||
|
||
with tempfile.NamedTemporaryFile(
|
||
suffix=".geojson",
|
||
delete=False,
|
||
mode="w",
|
||
encoding="utf-8",
|
||
) as fh:
|
||
json.dump(payload, fh, ensure_ascii=False)
|
||
tmp_path = fh.name
|
||
|
||
layer = QgsVectorLayer(tmp_path, thema, "ogr")
|
||
if layer and layer.isValid():
|
||
target_crs = detected_crs_authid or crs_authid
|
||
if target_crs and QgsCoordinateReferenceSystem is not None and hasattr(layer, "setCrs"):
|
||
try:
|
||
layer.setCrs(QgsCoordinateReferenceSystem(target_crs))
|
||
except Exception:
|
||
pass
|
||
self._log(
|
||
f"[DEBUG] Lokaler Layer gültig: thema='{thema}', "
|
||
f"input_features={len(features)}, geojson_features={len(normalized_features)}, "
|
||
f"layer_features={layer.featureCount()}, crs='{target_crs or 'unbekannt'}'"
|
||
)
|
||
return layer
|
||
|
||
self._log(f"[DEBUG] Lokaler Layer aus Datenabruf ungültig: thema='{thema}', pfad='{tmp_path}'")
|
||
return None
|
||
except Exception as exc:
|
||
self._log(f"[DEBUG] Fehler beim Erzeugen lokaler Featureschicht für {thema}: {exc}")
|
||
return None
|
||
|
||
def _write_html_log(
|
||
self,
|
||
final_pfad: str,
|
||
source_dict: DataDict,
|
||
pipeline_context: Dict[str, Any],
|
||
load_summary: Dict[str, Any],
|
||
) -> None:
|
||
"""Schreibt den Pipeline-Log als HTML-Datei."""
|
||
lines = [
|
||
"<!doctype html>",
|
||
"<html lang='de'>",
|
||
"<head>",
|
||
" <meta charset='utf-8'>",
|
||
" <title>Plan41 Fachdaten-Ladevorgang</title>",
|
||
" <style>",
|
||
" body { font-family: Arial, sans-serif; margin: 20px; color: #222; }",
|
||
" h1, h2 { margin-bottom: 8px; }",
|
||
" .meta p { margin: 4px 0; }",
|
||
" table { border-collapse: collapse; width: 100%; margin: 12px 0 24px; }",
|
||
" th, td { border: 1px solid #ccc; padding: 6px 8px; text-align: left; vertical-align: top; }",
|
||
" th { background: #f2f2f2; }",
|
||
" .ok { color: #1b5e20; font-weight: 600; }",
|
||
" .warn { color: #8d6e00; font-weight: 600; }",
|
||
" .err { color: #b71c1c; font-weight: 600; }",
|
||
" </style>",
|
||
"</head>",
|
||
"<body>",
|
||
"<h1>Plan41 Fachdaten-Ladevorgang</h1>",
|
||
"<div class='meta'>",
|
||
f" <p><strong>Datum:</strong> {datetime.datetime.now().isoformat()}</p>",
|
||
f" <p><strong>Verfahrens-DB:</strong> {final_pfad or 'temporär'}</p>",
|
||
f" <p><strong>Linkliste:</strong> {pipeline_context.get('linkliste')}</p>",
|
||
f" <p><strong>Raumfilter:</strong> {load_summary.get('raumfilter_name', 'unbekannt')}</p>",
|
||
"</div>",
|
||
"<h2>Zusammenfassung</h2>",
|
||
"<ul>",
|
||
f" <li>Zeilen gesamt: {load_summary.get('row_count', 0)}</li>",
|
||
f" <li>Geladene Dienste: {load_summary.get('loaded_count', 0)}</li>",
|
||
f" <li>Außerhalb Raumfilter: {load_summary.get('skipped_outside', 0)}</li>",
|
||
f" <li>Abgebrochen: {load_summary.get('aborted', False)}</li>",
|
||
"</ul>",
|
||
"<h2>Dienstliste</h2>",
|
||
"<table>",
|
||
"<thead><tr><th>Dienst</th><th>Provider</th><th>Linkadresse</th><th>Aufrufstatus</th></tr></thead>",
|
||
"<tbody>",
|
||
]
|
||
|
||
aufrufstatus_by_dienst = {
|
||
str(key): str(value)
|
||
for key, value in (load_summary.get("layer_call_status", {}) or {}).items()
|
||
}
|
||
aufrufstatus_labels = {
|
||
"layeraufruf_start": "Layer-Aufruf gestartet",
|
||
"layeraufruf_ok": "Layer-Aufruf erfolgreich",
|
||
"geladen": "Geladen",
|
||
"layer_nicht_ladbar": "Nicht ladbar",
|
||
"raumfilter_ausserhalb": "Außerhalb Raumfilter",
|
||
"abbruch_vor_layeraufruf": "Abbruch vor Layer-Aufruf",
|
||
"abbruch_nach_layeraufruf": "Abbruch nach Layer-Aufruf",
|
||
"abbruch_nach_raumfilter": "Abbruch nach Raumfilter",
|
||
"nicht_aufgerufen": "Nicht aufgerufen",
|
||
}
|
||
status_labels = {
|
||
"geladen": "Geladen",
|
||
"außerhalb": "Außerhalb Raumfilter",
|
||
"layer_nicht_ladbar": "Nicht ladbar",
|
||
}
|
||
|
||
for row in source_dict.get('rows', []):
|
||
dienst = row.get('Inhalt') or row.get('ident') or ''
|
||
provider = row.get('Provider') or ''
|
||
link = row.get('Link') or ''
|
||
aufrufstatus = aufrufstatus_by_dienst.get(str(dienst), "nicht_aufgerufen")
|
||
aufrufstatus_text = aufrufstatus_labels.get(str(aufrufstatus), str(aufrufstatus))
|
||
lines.append(
|
||
f"<tr><td>{html.escape(str(dienst))}</td><td>{html.escape(str(provider))}</td><td>{html.escape(str(link))}</td><td>{html.escape(str(aufrufstatus_text))}</td></tr>"
|
||
)
|
||
|
||
lines.extend([
|
||
"</tbody>",
|
||
"</table>",
|
||
"<h2>Raumfilter-Statistik</h2>",
|
||
"<table>",
|
||
"<thead><tr><th>Dienst</th><th>Datenabruf-Objekte</th><th>Gesamt-Objekte</th><th>Gefilterte Objekte</th><th>Status</th></tr></thead>",
|
||
"<tbody>",
|
||
])
|
||
|
||
for stat in load_summary.get('row_stats', []):
|
||
status = str(stat.get('status', 'n/a'))
|
||
status_label = status_labels.get(status, status)
|
||
reason = str(stat.get('reason', '') or '')
|
||
status_text = status_label if not reason else f"{status_label}: {reason}"
|
||
lines.append(
|
||
f"<tr><td>{html.escape(str(stat.get('dienst', '')))}</td><td>{html.escape(str(stat.get('datenabruf_features', 'n/a')))}</td><td>{html.escape(str(stat.get('total_features', 'n/a')))}</td><td>{html.escape(str(stat.get('filtered_features', 'n/a')))}</td><td>{html.escape(status_text)}</td></tr>"
|
||
)
|
||
|
||
lines.extend([
|
||
"</tbody>",
|
||
"</table>",
|
||
"</body>",
|
||
"</html>",
|
||
])
|
||
|
||
html_text = "\n".join(lines)
|
||
|
||
if final_pfad:
|
||
log_dir = os.path.dirname(final_pfad)
|
||
os.makedirs(log_dir, exist_ok=True)
|
||
log_file = os.path.join(log_dir, "plan41_lade_log.html")
|
||
try:
|
||
with open(log_file, "w", encoding="utf-8") as fh:
|
||
fh.write(html_text)
|
||
|
||
self.pruefmanager.verarbeite(
|
||
pruef_ergebnis(
|
||
ok=True,
|
||
meldung=f"Lade-Log gespeichert: {log_file}",
|
||
aktion="log_geschrieben",
|
||
kontext={"log_file": log_file},
|
||
)
|
||
)
|
||
info("Lade-Log", f"Lade-Protokoll gespeichert: {log_file}", duration=10)
|
||
except Exception as exc:
|
||
self.pruefmanager.verarbeite(
|
||
pruef_ergebnis(
|
||
ok=False,
|
||
meldung=f"Fehler beim Schreiben des Logs (html): {exc}",
|
||
aktion="log_schreiben_fehlgeschlagen",
|
||
kontext={"error": str(exc)},
|
||
)
|
||
)
|
||
warning("Lade-Log", f"Konnte Datei nicht schreiben: {exc}", duration=10)
|
||
else:
|
||
try:
|
||
tmp_dir = os.path.join(tempfile.gettempdir(), "sn_plan41")
|
||
os.makedirs(tmp_dir, exist_ok=True)
|
||
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
log_file = os.path.join(tmp_dir, f"plan41_lade_log_{timestamp}.html")
|
||
|
||
with open(log_file, "w", encoding="utf-8") as fh:
|
||
fh.write(html_text)
|
||
|
||
self._log(f"Lade-Log (temporär) gespeichert: {log_file}")
|
||
|
||
if hasattr(os, "startfile"):
|
||
try:
|
||
os.startfile(log_file)
|
||
except Exception as exc:
|
||
self._log(f"Konnte temporäre Log-Datei nicht öffnen: {exc}")
|
||
|
||
info("Lade-Log (temporär)", f"Lade-Protokoll gespeichert: {log_file}", duration=20)
|
||
except Exception as exc:
|
||
warning("Lade-Log", f"Konnte temporäre Log-Datei nicht schreiben: {exc}", duration=10)
|
||
|
||
|
||
def _clone_layer_with_extent(self, layer: QgsVectorLayer, extent, thema: str) -> QgsVectorLayer | None:
|
||
"""Erstellt eine Memory-Kopie von <layer> mit Geometrien im BBOX-Raumfilter."""
|
||
try:
|
||
request = QgsFeatureRequest().setFilterRect(extent)
|
||
features = list(layer.getFeatures(request))
|
||
if not features:
|
||
return None
|
||
|
||
geom_type_map = {0: "Point", 1: "LineString", 2: "Polygon"}
|
||
geom_type = geom_type_map.get(layer.geometryType(), "Polygon")
|
||
uri = f"{geom_type}?crs={layer.crs().authid()}"
|
||
|
||
filtered_layer = QgsVectorLayer(uri, f"{thema}_BBOX", "memory")
|
||
if not filtered_layer or not filtered_layer.isValid():
|
||
self._log(f"Fehler beim Erzeugen des temporären Filterlayers für {thema}")
|
||
return None
|
||
|
||
provider = filtered_layer.dataProvider()
|
||
provider.addAttributes(layer.fields())
|
||
filtered_layer.updateFields()
|
||
provider.addFeatures(features)
|
||
filtered_layer.updateExtents()
|
||
|
||
return filtered_layer
|
||
except Exception as e:
|
||
self._log(f"Fehler beim Filtern von {thema} nach Raumfilter: {e}")
|
||
return None
|
||
|
||
|
||
def _resolve_linkliste(self, linkliste: str | None) -> str | None:
|
||
"""
|
||
Prüft und normalisiert den Linklisten-Pfad.
|
||
|
||
Rückgabe:
|
||
- gültiger Pfad zur Linkliste (str)
|
||
- None → Pipeline abbrechen
|
||
"""
|
||
|
||
# --------------------------------------------------
|
||
# Standard-Linkliste (plattformneutral)
|
||
# --------------------------------------------------
|
||
plugin_root = get_plugin_root()
|
||
standard_linkliste = join_path(plugin_root, "sn_plan41","assets", "Linkliste.xlsx")
|
||
|
||
# --------------------------------------------------
|
||
# 🔹 LEERE EINGABE → AUTOMATISCH STANDARDDATEI
|
||
# --------------------------------------------------
|
||
if not linkliste:
|
||
linkliste_final = str(standard_linkliste)
|
||
self.set_linkliste(linkliste_final)
|
||
return linkliste_final
|
||
|
||
# --------------------------------------------------
|
||
# Dateiprüfung nur bei expliziter Eingabe
|
||
# --------------------------------------------------
|
||
|
||
pruefer = Dateipruefer(
|
||
pfad=linkliste,
|
||
leereingabe_erlaubt=True,
|
||
standarddatei=str(standard_linkliste),
|
||
)
|
||
|
||
ergebnis = pruefer.pruefe()
|
||
|
||
# --------------------------------------------------
|
||
# Entscheidung über Pruefmanager
|
||
# --------------------------------------------------
|
||
ergebnis = self.pruefmanager.verarbeite(ergebnis)
|
||
|
||
if not ergebnis.ok:
|
||
# Nutzer hat abgebrochen oder Fehler nicht bestätigt
|
||
return None
|
||
|
||
# --------------------------------------------------
|
||
# Erfolgsfall → geprüften Pfad übernehmen
|
||
# --------------------------------------------------
|
||
linkliste_final = str(ergebnis.kontext)
|
||
|
||
# Optional: Projektvariable aktualisieren
|
||
self.set_linkliste(linkliste_final)
|
||
|
||
return linkliste_final
|
||
|
||
def _resolve_raumfilter(self, raumfilter: str, source: str) -> QgsVectorLayer | None:
|
||
self._log(f"Raumfilter-Auswahl: '{raumfilter}'")
|
||
self._log(f"Source: '{source}'")
|
||
|
||
if raumfilter == "Verfahrensgebiet":
|
||
layer = self._get_verfahrensgebiet_layer()
|
||
self._log(
|
||
"Verfahrensgebiet gefunden"
|
||
if layer else
|
||
"❌ Kein Verfahrensgebiet im Projekt"
|
||
)
|
||
return layer
|
||
|
||
if raumfilter == "Pufferlayer":
|
||
self._log("Pufferlayer-Modus aktiv")
|
||
return self._handle_pufferlayer(source)
|
||
|
||
self._log("Kein Raumfilter gewählt")
|
||
return None
|
||
|
||
|
||
def _get_verfahrensgebiet_layer(self) -> QgsVectorLayer | None:
|
||
layer_id = self.load_verfahrensgebiet_layer_id()
|
||
self._log(f"Verfahrensgebiet-Layer-ID: {layer_id}")
|
||
|
||
if not layer_id:
|
||
self._log("❌ Keine Layer-ID gespeichert")
|
||
self._zeige_verfahrensgebiet_hinweis()
|
||
return None
|
||
|
||
layer = QgsProject.instance().mapLayer(layer_id)
|
||
if not layer:
|
||
self._log("❌ Layer-ID existiert nicht im Projekt")
|
||
self._zeige_verfahrensgebiet_hinweis()
|
||
return None
|
||
|
||
if not self.is_valid_verfahrensgebiet_layer(layer):
|
||
self._log("❌ Layer ist kein gültiger Vektorlayer")
|
||
self._zeige_verfahrensgebiet_hinweis()
|
||
return None
|
||
|
||
self._log(f"Verfahrensgebiet-Layer OK: '{layer.name()}'")
|
||
return layer
|
||
|
||
|
||
def _handle_pufferlayer(self, source: str) -> QgsVectorLayer | None:
|
||
self._log("Prüfe vorhandenen Pufferlayer im Projekt")
|
||
|
||
existing = self._load_existing_pufferlayer()
|
||
if existing:
|
||
self._log("✔ Pufferlayer bereits im Projekt vorhanden")
|
||
return existing
|
||
|
||
self._log("Kein Pufferlayer im Projekt")
|
||
|
||
if source:
|
||
self._log("Prüfe Pufferlayer im Source")
|
||
exists = self._pufferlayer_exists_in_source(source)
|
||
self._log(f"Pufferlayer im Source vorhanden: {exists}")
|
||
|
||
if exists:
|
||
return self._load_existing_pufferlayer() or self._create_pufferlayer()
|
||
|
||
self._log("Erzeuge neuen Pufferlayer")
|
||
return self._create_pufferlayer()
|
||
|
||
|
||
def _load_existing_pufferlayer(self) -> QgsVectorLayer | None:
|
||
"""
|
||
Liefert einen vorhandenen Pufferlayer aus dem Projekt.
|
||
"""
|
||
layers = QgsProject.instance().mapLayersByName("Pufferlayer")
|
||
return layers[0] if layers else None
|
||
|
||
|
||
def _create_pufferlayer(self) -> QgsVectorLayer | None:
|
||
self._log("Starte Pufferlayer-Erstellung")
|
||
|
||
basis_layer = self._get_verfahrensgebiet_layer()
|
||
if not basis_layer:
|
||
self._log("❌ Kein Verfahrensgebiet → kein Puffer möglich")
|
||
return None
|
||
source = self.load_verfahrens_db()
|
||
self._log(f"Basislayer: '{basis_layer.name()}'")
|
||
|
||
layer = self.Pufferlayer_erstellen(
|
||
basis_layer=basis_layer,
|
||
distance=1000,
|
||
name="Pufferlayer",
|
||
source=source,
|
||
)
|
||
|
||
self._log(
|
||
"✔ Pufferlayer erfolgreich erzeugt"
|
||
if layer else
|
||
"❌ Pufferlayer-Erstellung fehlgeschlagen"
|
||
)
|
||
return layer
|
||
|
||
|
||
from sn_basis.functions.qgiscore_wrapper import QgsVectorLayer
|
||
|
||
def _pufferlayer_exists_in_source(self, source: str) -> bool:
|
||
"""
|
||
Prüft, ob im Source (z.B. GPKG) ein Layer namens 'Pufferlayer' existiert.
|
||
"""
|
||
if not source:
|
||
return False
|
||
|
||
uri = f"{source}|layername=Pufferlayer"
|
||
layer = QgsVectorLayer(uri, "Pufferlayer", "ogr")
|
||
|
||
return layer.isValid()
|
||
|
||
|
||
|
||
def Pufferlayer_erstellen(
|
||
self,
|
||
basis_layer: QgsVectorLayer,
|
||
distance: float,
|
||
name: str,
|
||
source: str | None = None,
|
||
) -> QgsVectorLayer | None:
|
||
"""
|
||
Erzeugt einen rechteckigen Pufferlayer (BBOX + Abstand)
|
||
um das Verfahrensgebiet.
|
||
|
||
- Ohne Source → temporärer Memory-Layer
|
||
- Mit Source → Schreiben über Datenschreiber
|
||
|
||
Parameters
|
||
----------
|
||
basis_layer : QgsVectorLayer
|
||
Verfahrensgebiet-Layer.
|
||
distance : float
|
||
Pufferabstand in Metern.
|
||
name : str
|
||
Name des Ziel-Layers.
|
||
source : str | None
|
||
Zielquelle (z.B. Verfahrens-DB) oder None für temporär.
|
||
|
||
Returns
|
||
-------
|
||
QgsVectorLayer | None
|
||
Neuer Pufferlayer oder None bei Fehler.
|
||
"""
|
||
if not basis_layer or not basis_layer.isValid():
|
||
self._log("❌ Basislayer ungültig – kein Puffer möglich")
|
||
return None
|
||
|
||
# --------------------------------------------------
|
||
# 1. Rechteck-Geometrie (Extent + Puffer)
|
||
# --------------------------------------------------
|
||
extent = basis_layer.extent().buffered(distance)
|
||
bbox_geom = QgsGeometry.fromRect(extent)
|
||
|
||
# --------------------------------------------------
|
||
# 2. CRS übernehmen
|
||
# --------------------------------------------------
|
||
crs_auth = basis_layer.crs().authid()
|
||
uri = f"Polygon?crs={crs_auth}"
|
||
|
||
mem_layer = QgsVectorLayer(uri, name, "memory")
|
||
provider = mem_layer.dataProvider()
|
||
provider.addAttributes([
|
||
QgsField("id", QVariant.Int)
|
||
])
|
||
mem_layer.updateFields()
|
||
|
||
|
||
|
||
# --------------------------------------------------
|
||
# 4. Feature erzeugen
|
||
# --------------------------------------------------
|
||
feat = QgsFeature(mem_layer.fields())
|
||
feat.setGeometry(bbox_geom)
|
||
feat["id"] = 1
|
||
provider.addFeature(feat)
|
||
mem_layer.updateExtents()
|
||
|
||
# --------------------------------------------------
|
||
# 5. Temporärer Modus → direkt ins Projekt
|
||
# --------------------------------------------------
|
||
if not source:
|
||
QgsProject.instance().addMapLayer(mem_layer)
|
||
self._log("✔ Temporärer rechteckiger Pufferlayer erzeugt")
|
||
return mem_layer
|
||
|
||
# --------------------------------------------------
|
||
# 6. Persistenter Modus → Datenschreiber
|
||
# --------------------------------------------------
|
||
writer = Datenschreiber(
|
||
pruefmanager=self.pruefmanager,
|
||
gpkg_path=source,
|
||
)
|
||
|
||
daten_dict = {
|
||
"daten": {
|
||
name: {
|
||
"layer": mem_layer
|
||
}
|
||
}
|
||
}
|
||
|
||
|
||
results = writer.schreibe_Daten(
|
||
daten_dict=daten_dict,
|
||
processed_results=[],
|
||
speicherort=source,
|
||
)
|
||
|
||
if not results:
|
||
self._log("❌ Schreiben des Pufferlayers fehlgeschlagen")
|
||
return None
|
||
|
||
writer.lade_Layer(results)
|
||
|
||
layers = QgsProject.instance().mapLayersByName(name)
|
||
if layers:
|
||
self._log("✔ Persistenter rechteckiger Pufferlayer geladen")
|
||
return layers[0]
|
||
|
||
return None |