Files
Plugin_SN_Plan41/ui/tab_a_logic.py

1070 lines
40 KiB
Python
Raw Normal View History

2026-01-08 17:13:43 +01:00
"""
sn_plan41/ui/tab_a_logic.py Fachlogik für Tab A (Daten)
"""
from __future__ import annotations
2026-03-12 16:14:24 +01:00
from sn_basis.functions.sys_wrapper import get_plugin_root, join_path, file_exists
2026-01-08 17:13:43 +01:00
from typing import Any, Dict, List, Optional
from collections.abc import Mapping as _Mapping
import os
2026-03-12 16:14:24 +01:00
import datetime
import json
import tempfile
from sn_basis.functions.qgiscore_wrapper import (
QgsVectorFileWriter,
QgsVectorLayer,
QgsProject,
QgsGeometry,
QgsFeature,
QgsField,
2026-03-12 16:14:24 +01:00
QgsFeatureRequest,
QgsCoordinateReferenceSystem,
)
from sn_basis.functions.variable_wrapper import (
2026-01-08 17:13:43 +01:00
get_variable,
set_variable,
)
from sn_basis.functions.ly_existence_wrapper import layer_exists
from sn_basis.functions.ly_metadata_wrapper import get_layer_type
from sn_basis.functions.qt_wrapper import QVariant
2026-03-12 16:14:24 +01:00
from sn_basis.functions.dialog_wrapper import create_progress_dialog
from sn_basis.functions.message_wrapper import info, warning, error
# Prüfer-Typen
from sn_basis.modules.Pruefmanager import Pruefmanager
from sn_basis.modules.linkpruefer import Linkpruefer
from sn_basis.modules.stilpruefer import Stilpruefer
from sn_basis.modules.Dateipruefer import Dateipruefer
from sn_basis.modules.layerpruefer import Layerpruefer
2026-03-12 16:14:24 +01:00
from sn_basis.modules.LayerLoader import LayerLoader
from sn_basis.modules.Datenschreiber import Datenschreiber
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
from sn_basis.modules.DataGrabber import DataGrabber, SourceType, SourceDict
2026-03-12 16:14:24 +01:00
from sn_basis.modules.Datenabruf import Datenabruf
Row = Dict[str, Any]
DataDict = Dict[str, List[Row]]
2026-01-08 17:13:43 +01:00
class TabALogic:
"""
Kapselt die Fachlogik von Tab A. Verfahrens-DB wird **nicht** bei Pfad-Auswahl,
sondern erst beim ersten Layer-Schreiben angelegt.
2026-01-08 17:13:43 +01:00
"""
def __init__(self, pruefmanager: Pruefmanager, link_pruefer: Linkpruefer, stil_pruefer: Stilpruefer) -> None:
self.pruefmanager = pruefmanager
self.link_pruefer = link_pruefer
self.stil_pruefer = stil_pruefer
self.data_grabber: Optional[DataGrabber] = None
def _log(self, msg: str) -> None:
print(f"[TabALogic] {msg}")
2026-01-08 17:13:43 +01:00
# -------------------------------
# Verfahrens-Datenbank (Pfad-Management)
2026-01-08 17:13:43 +01:00
# -------------------------------
def load_verfahrens_db(self) -> Optional[str]:
"""Lädt den gespeicherten Verfahrens-DB-Pfad (Datei muss nicht existieren)."""
2026-01-08 17:13:43 +01:00
path = get_variable("verfahrens_db", scope="project")
return path or None
2026-01-08 17:13:43 +01:00
def set_verfahrens_db(self, path: Optional[str]) -> None:
"""Speichert den Verfahrens-DB-Pfad (Datei wird später angelegt)."""
2026-01-08 17:13:43 +01:00
if path:
set_variable("verfahrens_db", path, scope="project")
else:
set_variable("verfahrens_db", "", scope="project")
# -------------------------------
# Layer → Verfahrens-DB schreiben (alte Logik!)
# -------------------------------
def write_layer_to_verfahrens_db(
self,
source_layer: QgsVectorLayer,
zielpfad: str,
layer_name: str,
) -> bool:
"""
Schreibt einen Layer in die Verfahrens-DB.
Legt GPKG **bei Bedarf neu an** (wie puffer_setzen im alten Code).
Args:
source_layer: Layer zum Exportieren (z.B. aus DataGrabber)
zielpfad: Vom Dateiprüfer geprüfter Ziel-GPKG-Pfad
layer_name: Name des Layers in der GPKG
Returns:
True wenn erfolgreich
"""
if not zielpfad or not source_layer or not source_layer.isValid():
2026-01-08 17:13:43 +01:00
return False
# Optionen wie im alten puffer_setzen
opts = QgsVectorFileWriter.SaveVectorOptions()
opts.driverName = "GPKG"
opts.fileEncoding = "UTF-8"
opts.layerName = layer_name
# Alte Logik: bei neuem Pfad komplett neue GPKG, sonst Layer überschreiben
if not os.path.exists(zielpfad):
opts.actionOnExistingFile = QgsVectorFileWriter.CreateOrOverwriteFile
else:
opts.actionOnExistingFile = QgsVectorFileWriter.CreateOrOverwriteLayer
transform_context = QgsProject.instance().transformContext()
error = QgsVectorFileWriter.writeAsVectorFormatV3(
source_layer,
zielpfad,
transform_context,
opts,
)
if error != QgsVectorFileWriter.NoError:
print(f"Fehler beim Schreiben nach {zielpfad}: {error}")
2026-01-08 17:13:43 +01:00
return False
# Pfad jetzt auch als "Verfahrens-DB" merken
self.set_verfahrens_db(zielpfad)
2026-01-08 17:13:43 +01:00
return True
# -------------------------------
# Lokale Linkliste
# -------------------------------
def load_linkliste(self) -> Optional[str]:
path = get_variable("linkliste", scope="project")
if path and file_exists(path):
return path
return None
def set_linkliste(self, path: Optional[str]) -> None:
if path:
set_variable("linkliste", path, scope="project")
else:
set_variable("linkliste", "", scope="project")
# -------------------------------
# Verfahrensgebiet-Layer
# -------------------------------
def save_verfahrensgebiet_layer(self, layer: QgsVectorLayer) -> None:
"""Speichert die Verfahrensgebiet-Layer-ID, unter Annahme, dass der Layer prevalidiert ist."""
layer_id = layer.id() if layer is not None else ""
set_variable("verfahrensgebiet_layer", layer_id or "", scope="project")
2026-01-08 17:13:43 +01:00
def load_verfahrensgebiet_layer_id(self) -> Optional[str]:
value = get_variable("verfahrensgebiet_layer", scope="project")
return value or None
def is_valid_verfahrensgebiet_layer(self, layer) -> bool:
if not layer_exists(layer):
return False
layer_type = get_layer_type(layer)
return layer_type == "vector"
# === PIPELINE ===
def _on_run_pipeline(
self,
source: str,
linkliste: str | None,
raumfilter: str,
2026-03-12 16:14:24 +01:00
progress: Optional[Any] = None,
) -> Optional[Dict[str, Any]]:
"""Pipeline starten; Linkliste wird ausgelesen und geprüft, dann Datenabruf ausgeführt."""
self._log("Pipeline startet")
if not self.pruefmanager or not self.data_grabber:
2026-03-12 16:14:24 +01:00
self._log("Fehler: Pruefmanager oder DataGrabber fehlt")
return None
2026-03-12 16:14:24 +01:00
# 1) Verfahrens-DB prüfen und als aktive DB setzen
datei_ergebnis = Dateipruefer(
source,
basis_pfad="",
leereingabe_erlaubt=False,
standarddatei=None,
temporaer_erlaubt=True,
verfahrens_db_modus=True,
).pruefe()
2026-03-12 16:14:24 +01:00
datei_ergebnis = self.pruefmanager.verarbeite(datei_ergebnis)
if not datei_ergebnis.ok:
self._log("Verfahrens-DB-Pruefung fehlgeschlagen")
return None
2026-03-12 16:14:24 +01:00
final_pfad = str(datei_ergebnis.kontext or source)
self.set_verfahrens_db(final_pfad)
2026-03-12 16:14:24 +01:00
# Nach bestätigter Entscheidung: sofort Fortschrittsdialog zeigen
if progress is None:
progress = create_progress_dialog(1, "Fachdaten laden", "Prüfe Eingaben...")
else:
progress.set_total(1)
progress.set_value(0)
progress.set_label("Prüfe Eingaben...")
2026-03-12 16:14:24 +01:00
# 2) Linkliste auflösen, falls leer Standardlinkliste verwenden
linkliste_final = self._resolve_linkliste(linkliste)
if linkliste_final is None:
self._log("Linkliste kann nicht aufgelöst werden")
return None
else:
self._log(f"Linkliste final: '{linkliste_final}'")
# 3) Raumfilter prüfen
raumfilter_layer = self._resolve_raumfilter(raumfilter, final_pfad)
if raumfilter in ("Verfahrensgebiet", "Pufferlayer") and raumfilter_layer is None:
2026-03-12 16:14:24 +01:00
self._log(f"Raumfilter '{raumfilter}' nicht verfügbar")
return None
# 4) Lade-Status initialisieren (funktioniert ab Bestätigung überschreiben/anhängen)
if progress is None:
# placeholder mit 1; tatsächliche Gesamtzahl kennt DataGrabber später
progress = create_progress_dialog(1, "Fachdaten laden", "Prüfe Eingaben...")
else:
progress.set_total(1)
progress.set_value(0)
progress.set_label("Prüfe Eingaben...")
# 5) Daten aus Linkliste laden und prüfen
source_dict, grabber_summary = self.data_grabber.run(linkliste_final)
self._log(f"DataGrabber: {grabber_summary.meldung} [{grabber_summary.aktion}]")
# DEBUG: detaillierter Status
print("[TabALogic] ... Debug: source_dict keys:", list(source_dict.keys()))
print("[TabALogic] ... Debug: rows count:", len(source_dict.get("rows", [])))
if source_dict.get("rows"):
for i, row in enumerate(source_dict.get("rows", []), start=1):
print(f"[TabALogic] ... Debug: row {i}: {row}")
if not source_dict.get("rows"):
self._log("Keine validen Linkliste-Einträge für Datenabruf")
print("[TabALogic] ... STOP: rows:", len(source_dict.get("rows", [])))
return None
total_rows = len(source_dict.get("rows", []))
if progress is not None:
if hasattr(progress, "set_total"):
progress.set_total(max(total_rows, 1))
elif hasattr(progress, "setMaximum"):
progress.setMaximum(max(total_rows, 1))
else:
progress.total = max(total_rows, 1)
progress.set_value(0)
progress.set_label("Lade Daten...")
if not grabber_summary.ok:
self._log("Warnung: DataGrabber meldet fehlerhafte Zeilen, fahre mit Validierungsdaten fort")
# 5) Datenabruf (aus validierten Zeilen)
datenabruf = Datenabruf(self.pruefmanager)
result_dict, datenabruf_results = datenabruf.datenabruf(
result_dict=source_dict,
raumfilter=raumfilter,
verfahrensgebiet_layer=raumfilter_layer,
speicherort=final_pfad,
pruef_ergebnisse=[grabber_summary],
progress=progress,
)
self._log("Datenabruf abgeschlossen")
pipeline_context = {
"source": final_pfad,
"linkliste": linkliste_final,
"raumfilter": raumfilter_layer,
2026-03-12 16:14:24 +01:00
"raumfilter_name": raumfilter,
"source_dict": source_dict,
"result_dict": result_dict,
"datenabruf_results": datenabruf_results,
}
2026-03-12 16:14:24 +01:00
# 6) Lade Dienste in das Projekt aus result_dict
load_summary = self._load_dienste_aus_result_dict(source_dict, pipeline_context, progress=progress)
2026-03-12 16:14:24 +01:00
if progress is not None:
progress.set_value(total_rows)
2026-03-19 15:45:06 +01:00
progress.set_label("Fachdaten laden abgeschlossen. Bitte OK klicken, um den Dialog zu schließen.")
2026-03-12 16:14:24 +01:00
# 7) Log-Datei schreiben
self._write_markdown_log(final_pfad, source_dict, pipeline_context, load_summary)
print("=" * 60 + "\n")
return pipeline_context
2026-03-12 16:14:24 +01:00
def _load_dienste_aus_result_dict(self, source_dict: DataDict, pipeline_context: Dict[str, Any], progress: Optional[Any] = None) -> Dict[str, Any]:
"""Lädt Dienste (aus Linkliste) ins Projekt und persistiert optional mit Datenschreiber."""
rows = source_dict.get("rows", [])
total = len(rows)
loaded_count = 0
skipped_outside = 0
aborted = False
if not rows:
self._log("Keine Dienste zum Laden")
return
2026-03-12 16:14:24 +01:00
final_pfad = pipeline_context.get("source") or ""
use_datenschreiber = bool(final_pfad)
datenschreiber = None
if use_datenschreiber:
datenschreiber = Datenschreiber(self.pruefmanager, gpkg_path=final_pfad)
daten_dict: Dict[str, Any] = {"daten": {}}
raumfilter_layer = pipeline_context.get("raumfilter")
raumfilter_name = pipeline_context.get("raumfilter_name", "unbekannt")
raumfilter_crs_authid = None
if raumfilter_layer is not None and hasattr(raumfilter_layer, "crs") and callable(getattr(raumfilter_layer, "crs")):
try:
crs = raumfilter_layer.crs()
if crs is not None and hasattr(crs, "authid") and callable(getattr(crs, "authid")):
raumfilter_crs_authid = crs.authid()
except Exception:
raumfilter_crs_authid = None
# Für den späteren Filter benötigen wir entweder die reine Ausdehnung
# (Pufferlayer) oder im Falle eines echten Verfahrensgebiets die
# vollständige Geometrie. Die Filtermethode wird im Schleifenrumpf
# ausgewählt.
raumfilter_extent = None
if raumfilter_layer is not None and getattr(raumfilter_layer, 'extent', None) is not None:
raumfilter_extent = raumfilter_layer.extent()
temp_layers: List[Any] = []
layer_loader = LayerLoader(self.pruefmanager, stil_pruefer=self.stil_pruefer, layer_pruefer=self.link_pruefer)
# Statistiken für Log: Raumfilter-Info pro Dienst
row_stats: List[Dict[str, Any]] = []
layer_call_status: Dict[str, str] = {}
for idx, row in enumerate(rows, start=1):
ident = str(row.get("ident") or "")
provider = str(row.get("Provider", "")).lower()
link = str(row.get("Link", ""))
thema = str(row.get("Inhalt") or row.get("ident") or "Dienst")
style = row.get("stildatei")
daten_map = (pipeline_context.get("result_dict") or {}).get("daten", {})
fetched_features = daten_map.get(ident, []) if isinstance(daten_map, dict) else []
fetched_count = len(fetched_features) if isinstance(fetched_features, list) else None
if progress is not None:
progress.set_label(f"Lade Dienst {idx}/{total}: {thema}")
progress.set_value(idx)
if progress.is_canceled():
aborted = True
layer_call_status[thema] = "abbruch_vor_layeraufruf"
self._log("Nutzerabbruch: Pipeline gestoppt")
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung="Pipeline durch Benutzer abgebrochen",
aktion="abbruch",
kontext={"dienst": thema, "schritt": idx},
)
)
break
self._log(f"Lade Dienst '{thema}' ({provider})")
self._log(f"[DEBUG] Layeraufruf startet: thema='{thema}', provider='{provider}', link='{link}'")
layer_call_status[thema] = "layeraufruf_start"
layer = layer_loader.create_layer(provider, link, thema)
if not layer:
layer_call_status[thema] = "layer_nicht_ladbar"
self._log(f"[DEBUG] Layeraufruf fehlgeschlagen: thema='{thema}'")
row_stats.append({
"dienst": thema,
"provider": provider,
"link": link,
"style": style or "",
"datenabruf_features": fetched_count,
"total_features": None,
"filtered_features": None,
"status": "layer_nicht_ladbar",
"raumfilter": raumfilter_name,
})
continue
layer_call_status[thema] = "layeraufruf_ok"
self._log(f"[DEBUG] Layeraufruf erfolgreich: thema='{thema}'")
if progress is not None and progress.is_canceled():
aborted = True
layer_call_status[thema] = "abbruch_nach_layeraufruf"
self._log("Nutzerabbruch nach Layer-Erzeugung")
break
# Je nach Typ des Filters einen geeigneten Filter anwenden.
if raumfilter_layer and raumfilter_name == "Verfahrensgebiet":
# echte Geometrie-Schnittmenge, nicht nur BBox
layer_for_write = layer_loader.filter_by_layer(
layer,
raumfilter_layer,
cancel_callback=(progress.is_canceled if progress is not None else None),
)
else:
layer_for_write = layer_loader.filter_by_extent(
layer,
raumfilter_extent,
cancel_callback=(progress.is_canceled if progress is not None else None),
source_layer=raumfilter_layer,
)
if progress is not None and progress.is_canceled():
aborted = True
layer_call_status[thema] = "abbruch_nach_raumfilter"
self._log("Nutzerabbruch nach Raumfilter")
break
# Zähle Features vor/nach Raumfilter
total_features = None
filtered_features = None
try:
if layer is not None and hasattr(layer, "featureCount"):
total_features = int(layer.featureCount())
except Exception:
total_features = None
if layer_for_write is not None and hasattr(layer_for_write, "featureCount"):
try:
filtered_features = int(layer_for_write.featureCount())
except Exception:
filtered_features = None
if not layer_for_write:
layer_call_status[thema] = "raumfilter_ausserhalb"
self._log(f"Dienst {thema} ist außerhalb des Raumfilters")
skipped_outside += 1
row_stats.append({
"dienst": thema,
"provider": provider,
"link": link,
"style": style or "",
"datenabruf_features": fetched_count,
"total_features": total_features,
"filtered_features": 0,
"status": "außerhalb",
"raumfilter": raumfilter_name,
})
continue
if style:
layer_loader.apply_style(layer_for_write, style)
row_stats.append({
"dienst": thema,
"provider": provider,
"link": link,
"style": style or "",
"datenabruf_features": fetched_count,
"total_features": total_features,
"filtered_features": filtered_features,
"status": "geladen",
"raumfilter": raumfilter_name,
})
layer_call_status[thema] = "geladen"
self._log(f"[DEBUG] Dienst geladen: thema='{thema}', provider='{provider}', filtered_features={filtered_features}")
if provider == "wms":
# WMS ist Raster und wird nicht in GPKG geschrieben.
# Im temporären Modus wird er trotzdem direkt geladen.
loaded_count += 1
if use_datenschreiber:
self._log(f"WMS-Layer {thema} wird nicht in GPKG gespeichert, nur in Projekt (temporär)")
# Während Datenbankmodus: wir speichern nicht in daten_dict,
# aber für gute Sichtbarkeit laden wir nach erfolgreichem Schreibprozess.
temp_layers.append(layer)
else:
temp_layers.append(layer)
continue
if use_datenschreiber and datenschreiber:
daten_dict["daten"][thema] = {
"layer": layer_for_write,
"style_path": style,
}
else:
temp_layers.append(layer_for_write)
loaded_count += 1
if use_datenschreiber and datenschreiber and daten_dict["daten"]:
self._log(f"Schreibe {len(daten_dict['daten'])} Layer in {final_pfad}")
results = datenschreiber.schreibe_Daten(
daten_dict,
processed_results=pipeline_context.get("datenabruf_results", []),
speicherort=final_pfad,
)
datenschreiber.lade_Layer(results)
self._log("Datenschreiber abgeschlossen")
elif temp_layers:
self._log(f"Temporärmodus: Lade {len(temp_layers)} Layer ins Projekt")
for layer in temp_layers:
QgsProject.instance().addMapLayer(layer)
self._log("Temporärmodus: Layer im Projekt geladen")
else:
self._log("Keine Layer zum Laden (kein persistierter GPkg-Write).")
self._log(f"Dienst-Laden fertig ({len(rows)} Zeilen)")
return {
"row_count": len(rows),
"loaded_count": loaded_count,
"skipped_outside": skipped_outside,
"aborted": aborted,
"row_stats": row_stats,
"layer_call_status": layer_call_status,
"raumfilter_name": raumfilter_name,
}
2026-03-12 16:14:24 +01:00
def _create_local_layer_from_fetched_features(
self,
thema: str,
features: List[Any],
crs_authid: Optional[str] = None,
) -> Optional[QgsVectorLayer]:
"""Erzeugt aus bereits geholten GeoJSON-Features einen lokalen OGR-Layer.
Verhindert einen zweiten potentiell blockierenden Remote-Aufruf (WFS/REST).
"""
if not features:
return None
normalized_features: List[Dict[str, Any]] = []
detected_crs_authid: Optional[str] = None
for feature in features:
if not isinstance(feature, dict):
continue
# Fall 1: bereits GeoJSON-Feature
if feature.get("type") == "Feature" and isinstance(feature.get("geometry"), dict):
normalized_features.append(feature)
continue
# Fall 2: ArcGIS Feature-JSON -> GeoJSON konvertieren
attributes = feature.get("attributes")
geometry = feature.get("geometry")
if not isinstance(attributes, dict) or not isinstance(geometry, dict):
continue
if detected_crs_authid is None:
try:
sr = geometry.get("spatialReference")
if isinstance(sr, dict):
wkid = sr.get("latestWkid") or sr.get("wkid")
if wkid:
detected_crs_authid = f"EPSG:{int(wkid)}"
except Exception:
detected_crs_authid = None
geojson_geometry: Dict[str, Any] | None = None
# Point
if "x" in geometry and "y" in geometry:
geojson_geometry = {
"type": "Point",
"coordinates": [geometry.get("x"), geometry.get("y")],
}
# MultiPoint
elif isinstance(geometry.get("points"), list):
geojson_geometry = {
"type": "MultiPoint",
"coordinates": geometry.get("points", []),
}
# LineString / MultiLineString
elif isinstance(geometry.get("paths"), list):
paths = geometry.get("paths", [])
if len(paths) == 1:
geojson_geometry = {
"type": "LineString",
"coordinates": paths[0],
}
else:
geojson_geometry = {
"type": "MultiLineString",
"coordinates": paths,
}
# Polygon / MultiPolygon
elif isinstance(geometry.get("rings"), list):
rings = geometry.get("rings", [])
cleaned_rings = [
ring for ring in rings
if isinstance(ring, list) and len(ring) >= 4
]
if len(cleaned_rings) == 1:
geojson_geometry = {
"type": "Polygon",
"coordinates": cleaned_rings,
}
elif len(cleaned_rings) > 1:
# Robuster Fallback für ArcGIS-Ringe:
# Mehrere Ringe werden als MultiPolygon behandelt,
# damit nicht versehentlich alle Ringe als Löcher eines
# einzigen Polygons interpretiert werden.
geojson_geometry = {
"type": "MultiPolygon",
"coordinates": [[ring] for ring in cleaned_rings],
}
if geojson_geometry is None:
continue
normalized_features.append(
{
"type": "Feature",
"geometry": geojson_geometry,
"properties": attributes,
}
)
if not normalized_features:
self._log(f"[DEBUG] Keine konvertierbaren Features für lokalen Layer: thema='{thema}'")
return None
try:
payload = {
"type": "FeatureCollection",
"features": normalized_features,
}
with tempfile.NamedTemporaryFile(
suffix=".geojson",
delete=False,
mode="w",
encoding="utf-8",
) as fh:
json.dump(payload, fh, ensure_ascii=False)
tmp_path = fh.name
layer = QgsVectorLayer(tmp_path, thema, "ogr")
if layer and layer.isValid():
target_crs = detected_crs_authid or crs_authid
if target_crs and QgsCoordinateReferenceSystem is not None and hasattr(layer, "setCrs"):
try:
layer.setCrs(QgsCoordinateReferenceSystem(target_crs))
except Exception:
pass
self._log(
f"[DEBUG] Lokaler Layer gültig: thema='{thema}', "
f"input_features={len(features)}, geojson_features={len(normalized_features)}, "
f"layer_features={layer.featureCount()}, crs='{target_crs or 'unbekannt'}'"
)
return layer
self._log(f"[DEBUG] Lokaler Layer aus Datenabruf ungültig: thema='{thema}', pfad='{tmp_path}'")
return None
except Exception as exc:
self._log(f"[DEBUG] Fehler beim Erzeugen lokaler Featureschicht für {thema}: {exc}")
return None
def _write_markdown_log(
self,
final_pfad: str,
source_dict: DataDict,
pipeline_context: Dict[str, Any],
load_summary: Dict[str, Any],
) -> None:
"""Schreibt den Pipeline-Log (Markdown)."""
lines = [
"# Plan41 Fachdaten-Ladevorgang",
"",
f"**Datum**: {datetime.datetime.now().isoformat()}",
f"**Verfahrens-DB**: {final_pfad or 'temporär'}",
f"**Linkliste**: {pipeline_context.get('linkliste')}",
"",
"## Zusammenfassung",
"",
f"- **Zeilen gesamt**: {load_summary.get('row_count', 0)}",
f"- **Geladene Dienste**: {load_summary.get('loaded_count', 0)}",
f"- **Außerhalb Raumfilter**: {load_summary.get('skipped_outside', 0)}",
f"- **Abgebrochen**: {load_summary.get('aborted', False)}",
f"- **Raumfilter**: {load_summary.get('raumfilter_name', 'unbekannt')}",
f"- **Raumfilter-Typ**: {pipeline_context.get('raumfilter_name', 'unbekannt')}",
"",
"## Dienstliste",
"",
"| Dienst | Provider | Linkadresse | Aufrufstatus | Ergebnisstatus |",
"|---|---|---|---|---|",
]
status_by_dienst = {
str(stat.get("dienst", "")): str(stat.get("status", "n/a"))
for stat in load_summary.get("row_stats", [])
}
aufrufstatus_by_dienst = {
str(key): str(value)
for key, value in (load_summary.get("layer_call_status", {}) or {}).items()
}
for row in source_dict.get('rows', []):
dienst = row.get('Inhalt') or row.get('ident') or ''
provider = row.get('Provider') or ''
link = row.get('Link') or ''
aufrufstatus = aufrufstatus_by_dienst.get(str(dienst), "nicht_aufgerufen")
ergebnisstatus = status_by_dienst.get(str(dienst), "n/a")
lines.append(f"| {dienst} | {provider} | {link} | {aufrufstatus} | {ergebnisstatus} |")
lines.extend([
"",
"## Raumfilter-Statistik",
"",
"| Dienst | Provider | Linkadresse | Datenabruf-Objekte | Gesamt-Objekte | Gefilterte Objekte | Raumfilter | Status |",
"|---|---|---|---|---|---|---|---|",
])
for stat in load_summary.get('row_stats', []):
lines.append(
f"| {stat.get('dienst', '')} | {stat.get('provider', '')} | {stat.get('link', '')} | {stat.get('datenabruf_features', 'n/a')} | {stat.get('total_features', 'n/a')} | {stat.get('filtered_features', 'n/a')} | {stat.get('raumfilter', '')} | {stat.get('status', 'n/a')} |"
)
markdown = "\n".join(lines)
if final_pfad:
log_dir = os.path.dirname(final_pfad)
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "plan41_lade_log.md")
try:
with open(log_file, "w", encoding="utf-8") as fh:
fh.write(markdown)
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=True,
meldung=f"Lade-Log gespeichert: {log_file}",
aktion="log_geschrieben",
kontext={"log_file": log_file},
)
)
info("Lade-Log", f"Lade-Protokoll gespeichert: {log_file}", duration=10)
except Exception as exc:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Schreiben des Logle (md): {exc}",
aktion="log_schreiben_fehlgeschlagen",
kontext={"error": str(exc)},
)
)
warning("Lade-Log", f"Konnte Datei nicht schreiben: {exc}", duration=10)
else:
# temporärer Modus: nur anzeigen
info("Lade-Log (temporär)", markdown, duration=20)
def _clone_layer_with_extent(self, layer: QgsVectorLayer, extent, thema: str) -> QgsVectorLayer | None:
"""Erstellt eine Memory-Kopie von <layer> mit Geometrien im BBOX-Raumfilter."""
try:
request = QgsFeatureRequest().setFilterRect(extent)
features = list(layer.getFeatures(request))
if not features:
return None
geom_type_map = {0: "Point", 1: "LineString", 2: "Polygon"}
geom_type = geom_type_map.get(layer.geometryType(), "Polygon")
uri = f"{geom_type}?crs={layer.crs().authid()}"
filtered_layer = QgsVectorLayer(uri, f"{thema}_BBOX", "memory")
if not filtered_layer or not filtered_layer.isValid():
self._log(f"Fehler beim Erzeugen des temporären Filterlayers für {thema}")
return None
provider = filtered_layer.dataProvider()
provider.addAttributes(layer.fields())
filtered_layer.updateFields()
provider.addFeatures(features)
filtered_layer.updateExtents()
return filtered_layer
except Exception as e:
self._log(f"Fehler beim Filtern von {thema} nach Raumfilter: {e}")
return None
def _resolve_linkliste(self, linkliste: str | None) -> str | None:
"""
Prüft und normalisiert den Linklisten-Pfad.
Rückgabe:
- gültiger Pfad zur Linkliste (str)
- None Pipeline abbrechen
"""
# --------------------------------------------------
# Standard-Linkliste (plattformneutral)
# --------------------------------------------------
plugin_root = get_plugin_root()
2026-03-12 16:14:24 +01:00
standard_linkliste = join_path(plugin_root, "sn_plan41","assets", "Linkliste.xlsx")
# --------------------------------------------------
# 🔹 LEERE EINGABE → AUTOMATISCH STANDARDDATEI
# --------------------------------------------------
if not linkliste:
linkliste_final = str(standard_linkliste)
self.set_linkliste(linkliste_final)
return linkliste_final
# --------------------------------------------------
# Dateiprüfung nur bei expliziter Eingabe
# --------------------------------------------------
pruefer = Dateipruefer(
pfad=linkliste,
leereingabe_erlaubt=True,
standarddatei=str(standard_linkliste),
)
ergebnis = pruefer.pruefe()
# --------------------------------------------------
# Entscheidung über Pruefmanager
# --------------------------------------------------
ergebnis = self.pruefmanager.verarbeite(ergebnis)
if not ergebnis.ok:
# Nutzer hat abgebrochen oder Fehler nicht bestätigt
return None
# --------------------------------------------------
# Erfolgsfall → geprüften Pfad übernehmen
# --------------------------------------------------
linkliste_final = str(ergebnis.kontext)
# Optional: Projektvariable aktualisieren
self.set_linkliste(linkliste_final)
return linkliste_final
def _resolve_raumfilter(self, raumfilter: str, source: str) -> QgsVectorLayer | None:
self._log(f"Raumfilter-Auswahl: '{raumfilter}'")
self._log(f"Source: '{source}'")
if raumfilter == "Verfahrensgebiet":
layer = self._get_verfahrensgebiet_layer()
self._log(
"Verfahrensgebiet gefunden"
if layer else
"❌ Kein Verfahrensgebiet im Projekt"
)
return layer
if raumfilter == "Pufferlayer":
self._log("Pufferlayer-Modus aktiv")
return self._handle_pufferlayer(source)
self._log("Kein Raumfilter gewählt")
return None
def _get_verfahrensgebiet_layer(self) -> QgsVectorLayer | None:
layer_id = self.load_verfahrensgebiet_layer_id()
self._log(f"Verfahrensgebiet-Layer-ID: {layer_id}")
if not layer_id:
self._log("❌ Keine Layer-ID gespeichert")
return None
layer = QgsProject.instance().mapLayer(layer_id)
if not layer:
self._log("❌ Layer-ID existiert nicht im Projekt")
return None
if not self.is_valid_verfahrensgebiet_layer(layer):
self._log("❌ Layer ist kein gültiger Vektorlayer")
return None
self._log(f"Verfahrensgebiet-Layer OK: '{layer.name()}'")
return layer
def _handle_pufferlayer(self, source: str) -> QgsVectorLayer | None:
self._log("Prüfe vorhandenen Pufferlayer im Projekt")
existing = self._load_existing_pufferlayer()
if existing:
self._log("✔ Pufferlayer bereits im Projekt vorhanden")
return existing
self._log("Kein Pufferlayer im Projekt")
if source:
self._log("Prüfe Pufferlayer im Source")
exists = self._pufferlayer_exists_in_source(source)
self._log(f"Pufferlayer im Source vorhanden: {exists}")
if exists:
return self._load_existing_pufferlayer() or self._create_pufferlayer()
self._log("Erzeuge neuen Pufferlayer")
return self._create_pufferlayer()
def _load_existing_pufferlayer(self) -> QgsVectorLayer | None:
"""
Liefert einen vorhandenen Pufferlayer aus dem Projekt.
"""
layers = QgsProject.instance().mapLayersByName("Pufferlayer")
return layers[0] if layers else None
def _create_pufferlayer(self) -> QgsVectorLayer | None:
self._log("Starte Pufferlayer-Erstellung")
basis_layer = self._get_verfahrensgebiet_layer()
if not basis_layer:
self._log("❌ Kein Verfahrensgebiet → kein Puffer möglich")
return None
source = self.load_verfahrens_db()
self._log(f"Basislayer: '{basis_layer.name()}'")
layer = self.Pufferlayer_erstellen(
basis_layer=basis_layer,
distance=1000,
name="Pufferlayer",
source=source,
)
self._log(
"✔ Pufferlayer erfolgreich erzeugt"
if layer else
"❌ Pufferlayer-Erstellung fehlgeschlagen"
)
return layer
from sn_basis.functions.qgiscore_wrapper import QgsVectorLayer
def _pufferlayer_exists_in_source(self, source: str) -> bool:
"""
Prüft, ob im Source (z.B. GPKG) ein Layer namens 'Pufferlayer' existiert.
"""
if not source:
return False
uri = f"{source}|layername=Pufferlayer"
layer = QgsVectorLayer(uri, "Pufferlayer", "ogr")
return layer.isValid()
def Pufferlayer_erstellen(
self,
basis_layer: QgsVectorLayer,
distance: float,
name: str,
source: str | None = None,
) -> QgsVectorLayer | None:
"""
Erzeugt einen rechteckigen Pufferlayer (BBOX + Abstand)
um das Verfahrensgebiet.
- Ohne Source temporärer Memory-Layer
- Mit Source Schreiben über Datenschreiber
Parameters
----------
basis_layer : QgsVectorLayer
Verfahrensgebiet-Layer.
distance : float
Pufferabstand in Metern.
name : str
Name des Ziel-Layers.
source : str | None
Zielquelle (z.B. Verfahrens-DB) oder None für temporär.
Returns
-------
QgsVectorLayer | None
Neuer Pufferlayer oder None bei Fehler.
"""
if not basis_layer or not basis_layer.isValid():
self._log("❌ Basislayer ungültig kein Puffer möglich")
return None
# --------------------------------------------------
# 1. Rechteck-Geometrie (Extent + Puffer)
# --------------------------------------------------
extent = basis_layer.extent().buffered(distance)
bbox_geom = QgsGeometry.fromRect(extent)
# --------------------------------------------------
# 2. CRS übernehmen
# --------------------------------------------------
crs_auth = basis_layer.crs().authid()
uri = f"Polygon?crs={crs_auth}"
mem_layer = QgsVectorLayer(uri, name, "memory")
provider = mem_layer.dataProvider()
provider.addAttributes([
QgsField("id", QVariant.Int)
])
mem_layer.updateFields()
# --------------------------------------------------
# 4. Feature erzeugen
# --------------------------------------------------
feat = QgsFeature(mem_layer.fields())
feat.setGeometry(bbox_geom)
feat["id"] = 1
provider.addFeature(feat)
mem_layer.updateExtents()
# --------------------------------------------------
# 5. Temporärer Modus → direkt ins Projekt
# --------------------------------------------------
if not source:
QgsProject.instance().addMapLayer(mem_layer)
self._log("✔ Temporärer rechteckiger Pufferlayer erzeugt")
return mem_layer
# --------------------------------------------------
# 6. Persistenter Modus → Datenschreiber
# --------------------------------------------------
writer = Datenschreiber(
pruefmanager=self.pruefmanager,
gpkg_path=source,
)
daten_dict = {
"daten": {
name: {
"layer": mem_layer
}
}
}
results = writer.schreibe_Daten(
daten_dict=daten_dict,
processed_results=[],
speicherort=source,
)
if not results:
self._log("❌ Schreiben des Pufferlayers fehlgeschlagen")
return None
writer.lade_Layer(results)
layers = QgsProject.instance().mapLayersByName(name)
if layers:
self._log("✔ Persistenter rechteckiger Pufferlayer geladen")
return layers[0]
return None