Files
Plugin_SN_Plan41/ui/tab_a_logic.py
2026-03-19 15:45:06 +01:00

1070 lines
40 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
sn_plan41/ui/tab_a_logic.py Fachlogik für Tab A (Daten)
"""
from __future__ import annotations
from sn_basis.functions.sys_wrapper import get_plugin_root, join_path, file_exists
from typing import Any, Dict, List, Optional
from collections.abc import Mapping as _Mapping
import os
import datetime
import json
import tempfile
from sn_basis.functions.qgiscore_wrapper import (
QgsVectorFileWriter,
QgsVectorLayer,
QgsProject,
QgsGeometry,
QgsFeature,
QgsField,
QgsFeatureRequest,
QgsCoordinateReferenceSystem,
)
from sn_basis.functions.variable_wrapper import (
get_variable,
set_variable,
)
from sn_basis.functions.ly_existence_wrapper import layer_exists
from sn_basis.functions.ly_metadata_wrapper import get_layer_type
from sn_basis.functions.qt_wrapper import QVariant
from sn_basis.functions.dialog_wrapper import create_progress_dialog
from sn_basis.functions.message_wrapper import info, warning, error
# Prüfer-Typen
from sn_basis.modules.Pruefmanager import Pruefmanager
from sn_basis.modules.linkpruefer import Linkpruefer
from sn_basis.modules.stilpruefer import Stilpruefer
from sn_basis.modules.Dateipruefer import Dateipruefer
from sn_basis.modules.layerpruefer import Layerpruefer
from sn_basis.modules.LayerLoader import LayerLoader
from sn_basis.modules.Datenschreiber import Datenschreiber
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
from sn_basis.modules.DataGrabber import DataGrabber, SourceType, SourceDict
from sn_basis.modules.Datenabruf import Datenabruf
Row = Dict[str, Any]
DataDict = Dict[str, List[Row]]
class TabALogic:
"""
Kapselt die Fachlogik von Tab A. Verfahrens-DB wird **nicht** bei Pfad-Auswahl,
sondern erst beim ersten Layer-Schreiben angelegt.
"""
def __init__(self, pruefmanager: Pruefmanager, link_pruefer: Linkpruefer, stil_pruefer: Stilpruefer) -> None:
self.pruefmanager = pruefmanager
self.link_pruefer = link_pruefer
self.stil_pruefer = stil_pruefer
self.data_grabber: Optional[DataGrabber] = None
def _log(self, msg: str) -> None:
print(f"[TabALogic] {msg}")
# -------------------------------
# Verfahrens-Datenbank (Pfad-Management)
# -------------------------------
def load_verfahrens_db(self) -> Optional[str]:
"""Lädt den gespeicherten Verfahrens-DB-Pfad (Datei muss nicht existieren)."""
path = get_variable("verfahrens_db", scope="project")
return path or None
def set_verfahrens_db(self, path: Optional[str]) -> None:
"""Speichert den Verfahrens-DB-Pfad (Datei wird später angelegt)."""
if path:
set_variable("verfahrens_db", path, scope="project")
else:
set_variable("verfahrens_db", "", scope="project")
# -------------------------------
# Layer → Verfahrens-DB schreiben (alte Logik!)
# -------------------------------
def write_layer_to_verfahrens_db(
self,
source_layer: QgsVectorLayer,
zielpfad: str,
layer_name: str,
) -> bool:
"""
Schreibt einen Layer in die Verfahrens-DB.
Legt GPKG **bei Bedarf neu an** (wie puffer_setzen im alten Code).
Args:
source_layer: Layer zum Exportieren (z.B. aus DataGrabber)
zielpfad: Vom Dateiprüfer geprüfter Ziel-GPKG-Pfad
layer_name: Name des Layers in der GPKG
Returns:
True wenn erfolgreich
"""
if not zielpfad or not source_layer or not source_layer.isValid():
return False
# Optionen wie im alten puffer_setzen
opts = QgsVectorFileWriter.SaveVectorOptions()
opts.driverName = "GPKG"
opts.fileEncoding = "UTF-8"
opts.layerName = layer_name
# Alte Logik: bei neuem Pfad komplett neue GPKG, sonst Layer überschreiben
if not os.path.exists(zielpfad):
opts.actionOnExistingFile = QgsVectorFileWriter.CreateOrOverwriteFile
else:
opts.actionOnExistingFile = QgsVectorFileWriter.CreateOrOverwriteLayer
transform_context = QgsProject.instance().transformContext()
error = QgsVectorFileWriter.writeAsVectorFormatV3(
source_layer,
zielpfad,
transform_context,
opts,
)
if error != QgsVectorFileWriter.NoError:
print(f"Fehler beim Schreiben nach {zielpfad}: {error}")
return False
# Pfad jetzt auch als "Verfahrens-DB" merken
self.set_verfahrens_db(zielpfad)
return True
# -------------------------------
# Lokale Linkliste
# -------------------------------
def load_linkliste(self) -> Optional[str]:
path = get_variable("linkliste", scope="project")
if path and file_exists(path):
return path
return None
def set_linkliste(self, path: Optional[str]) -> None:
if path:
set_variable("linkliste", path, scope="project")
else:
set_variable("linkliste", "", scope="project")
# -------------------------------
# Verfahrensgebiet-Layer
# -------------------------------
def save_verfahrensgebiet_layer(self, layer: QgsVectorLayer) -> None:
"""Speichert die Verfahrensgebiet-Layer-ID, unter Annahme, dass der Layer prevalidiert ist."""
layer_id = layer.id() if layer is not None else ""
set_variable("verfahrensgebiet_layer", layer_id or "", scope="project")
def load_verfahrensgebiet_layer_id(self) -> Optional[str]:
value = get_variable("verfahrensgebiet_layer", scope="project")
return value or None
def is_valid_verfahrensgebiet_layer(self, layer) -> bool:
if not layer_exists(layer):
return False
layer_type = get_layer_type(layer)
return layer_type == "vector"
# === PIPELINE ===
def _on_run_pipeline(
self,
source: str,
linkliste: str | None,
raumfilter: str,
progress: Optional[Any] = None,
) -> Optional[Dict[str, Any]]:
"""Pipeline starten; Linkliste wird ausgelesen und geprüft, dann Datenabruf ausgeführt."""
self._log("Pipeline startet")
if not self.pruefmanager or not self.data_grabber:
self._log("Fehler: Pruefmanager oder DataGrabber fehlt")
return None
# 1) Verfahrens-DB prüfen und als aktive DB setzen
datei_ergebnis = Dateipruefer(
source,
basis_pfad="",
leereingabe_erlaubt=False,
standarddatei=None,
temporaer_erlaubt=True,
verfahrens_db_modus=True,
).pruefe()
datei_ergebnis = self.pruefmanager.verarbeite(datei_ergebnis)
if not datei_ergebnis.ok:
self._log("Verfahrens-DB-Pruefung fehlgeschlagen")
return None
final_pfad = str(datei_ergebnis.kontext or source)
self.set_verfahrens_db(final_pfad)
# Nach bestätigter Entscheidung: sofort Fortschrittsdialog zeigen
if progress is None:
progress = create_progress_dialog(1, "Fachdaten laden", "Prüfe Eingaben...")
else:
progress.set_total(1)
progress.set_value(0)
progress.set_label("Prüfe Eingaben...")
# 2) Linkliste auflösen, falls leer Standardlinkliste verwenden
linkliste_final = self._resolve_linkliste(linkliste)
if linkliste_final is None:
self._log("Linkliste kann nicht aufgelöst werden")
return None
else:
self._log(f"Linkliste final: '{linkliste_final}'")
# 3) Raumfilter prüfen
raumfilter_layer = self._resolve_raumfilter(raumfilter, final_pfad)
if raumfilter in ("Verfahrensgebiet", "Pufferlayer") and raumfilter_layer is None:
self._log(f"Raumfilter '{raumfilter}' nicht verfügbar")
return None
# 4) Lade-Status initialisieren (funktioniert ab Bestätigung überschreiben/anhängen)
if progress is None:
# placeholder mit 1; tatsächliche Gesamtzahl kennt DataGrabber später
progress = create_progress_dialog(1, "Fachdaten laden", "Prüfe Eingaben...")
else:
progress.set_total(1)
progress.set_value(0)
progress.set_label("Prüfe Eingaben...")
# 5) Daten aus Linkliste laden und prüfen
source_dict, grabber_summary = self.data_grabber.run(linkliste_final)
self._log(f"DataGrabber: {grabber_summary.meldung} [{grabber_summary.aktion}]")
# DEBUG: detaillierter Status
print("[TabALogic] ... Debug: source_dict keys:", list(source_dict.keys()))
print("[TabALogic] ... Debug: rows count:", len(source_dict.get("rows", [])))
if source_dict.get("rows"):
for i, row in enumerate(source_dict.get("rows", []), start=1):
print(f"[TabALogic] ... Debug: row {i}: {row}")
if not source_dict.get("rows"):
self._log("Keine validen Linkliste-Einträge für Datenabruf")
print("[TabALogic] ... STOP: rows:", len(source_dict.get("rows", [])))
return None
total_rows = len(source_dict.get("rows", []))
if progress is not None:
if hasattr(progress, "set_total"):
progress.set_total(max(total_rows, 1))
elif hasattr(progress, "setMaximum"):
progress.setMaximum(max(total_rows, 1))
else:
progress.total = max(total_rows, 1)
progress.set_value(0)
progress.set_label("Lade Daten...")
if not grabber_summary.ok:
self._log("Warnung: DataGrabber meldet fehlerhafte Zeilen, fahre mit Validierungsdaten fort")
# 5) Datenabruf (aus validierten Zeilen)
datenabruf = Datenabruf(self.pruefmanager)
result_dict, datenabruf_results = datenabruf.datenabruf(
result_dict=source_dict,
raumfilter=raumfilter,
verfahrensgebiet_layer=raumfilter_layer,
speicherort=final_pfad,
pruef_ergebnisse=[grabber_summary],
progress=progress,
)
self._log("Datenabruf abgeschlossen")
pipeline_context = {
"source": final_pfad,
"linkliste": linkliste_final,
"raumfilter": raumfilter_layer,
"raumfilter_name": raumfilter,
"source_dict": source_dict,
"result_dict": result_dict,
"datenabruf_results": datenabruf_results,
}
# 6) Lade Dienste in das Projekt aus result_dict
load_summary = self._load_dienste_aus_result_dict(source_dict, pipeline_context, progress=progress)
if progress is not None:
progress.set_value(total_rows)
progress.set_label("Fachdaten laden abgeschlossen. Bitte OK klicken, um den Dialog zu schließen.")
# 7) Log-Datei schreiben
self._write_markdown_log(final_pfad, source_dict, pipeline_context, load_summary)
print("=" * 60 + "\n")
return pipeline_context
def _load_dienste_aus_result_dict(self, source_dict: DataDict, pipeline_context: Dict[str, Any], progress: Optional[Any] = None) -> Dict[str, Any]:
"""Lädt Dienste (aus Linkliste) ins Projekt und persistiert optional mit Datenschreiber."""
rows = source_dict.get("rows", [])
total = len(rows)
loaded_count = 0
skipped_outside = 0
aborted = False
if not rows:
self._log("Keine Dienste zum Laden")
return
final_pfad = pipeline_context.get("source") or ""
use_datenschreiber = bool(final_pfad)
datenschreiber = None
if use_datenschreiber:
datenschreiber = Datenschreiber(self.pruefmanager, gpkg_path=final_pfad)
daten_dict: Dict[str, Any] = {"daten": {}}
raumfilter_layer = pipeline_context.get("raumfilter")
raumfilter_name = pipeline_context.get("raumfilter_name", "unbekannt")
raumfilter_crs_authid = None
if raumfilter_layer is not None and hasattr(raumfilter_layer, "crs") and callable(getattr(raumfilter_layer, "crs")):
try:
crs = raumfilter_layer.crs()
if crs is not None and hasattr(crs, "authid") and callable(getattr(crs, "authid")):
raumfilter_crs_authid = crs.authid()
except Exception:
raumfilter_crs_authid = None
# Für den späteren Filter benötigen wir entweder die reine Ausdehnung
# (Pufferlayer) oder im Falle eines echten Verfahrensgebiets die
# vollständige Geometrie. Die Filtermethode wird im Schleifenrumpf
# ausgewählt.
raumfilter_extent = None
if raumfilter_layer is not None and getattr(raumfilter_layer, 'extent', None) is not None:
raumfilter_extent = raumfilter_layer.extent()
temp_layers: List[Any] = []
layer_loader = LayerLoader(self.pruefmanager, stil_pruefer=self.stil_pruefer, layer_pruefer=self.link_pruefer)
# Statistiken für Log: Raumfilter-Info pro Dienst
row_stats: List[Dict[str, Any]] = []
layer_call_status: Dict[str, str] = {}
for idx, row in enumerate(rows, start=1):
ident = str(row.get("ident") or "")
provider = str(row.get("Provider", "")).lower()
link = str(row.get("Link", ""))
thema = str(row.get("Inhalt") or row.get("ident") or "Dienst")
style = row.get("stildatei")
daten_map = (pipeline_context.get("result_dict") or {}).get("daten", {})
fetched_features = daten_map.get(ident, []) if isinstance(daten_map, dict) else []
fetched_count = len(fetched_features) if isinstance(fetched_features, list) else None
if progress is not None:
progress.set_label(f"Lade Dienst {idx}/{total}: {thema}")
progress.set_value(idx)
if progress.is_canceled():
aborted = True
layer_call_status[thema] = "abbruch_vor_layeraufruf"
self._log("Nutzerabbruch: Pipeline gestoppt")
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung="Pipeline durch Benutzer abgebrochen",
aktion="abbruch",
kontext={"dienst": thema, "schritt": idx},
)
)
break
self._log(f"Lade Dienst '{thema}' ({provider})")
self._log(f"[DEBUG] Layeraufruf startet: thema='{thema}', provider='{provider}', link='{link}'")
layer_call_status[thema] = "layeraufruf_start"
layer = layer_loader.create_layer(provider, link, thema)
if not layer:
layer_call_status[thema] = "layer_nicht_ladbar"
self._log(f"[DEBUG] Layeraufruf fehlgeschlagen: thema='{thema}'")
row_stats.append({
"dienst": thema,
"provider": provider,
"link": link,
"style": style or "",
"datenabruf_features": fetched_count,
"total_features": None,
"filtered_features": None,
"status": "layer_nicht_ladbar",
"raumfilter": raumfilter_name,
})
continue
layer_call_status[thema] = "layeraufruf_ok"
self._log(f"[DEBUG] Layeraufruf erfolgreich: thema='{thema}'")
if progress is not None and progress.is_canceled():
aborted = True
layer_call_status[thema] = "abbruch_nach_layeraufruf"
self._log("Nutzerabbruch nach Layer-Erzeugung")
break
# Je nach Typ des Filters einen geeigneten Filter anwenden.
if raumfilter_layer and raumfilter_name == "Verfahrensgebiet":
# echte Geometrie-Schnittmenge, nicht nur BBox
layer_for_write = layer_loader.filter_by_layer(
layer,
raumfilter_layer,
cancel_callback=(progress.is_canceled if progress is not None else None),
)
else:
layer_for_write = layer_loader.filter_by_extent(
layer,
raumfilter_extent,
cancel_callback=(progress.is_canceled if progress is not None else None),
source_layer=raumfilter_layer,
)
if progress is not None and progress.is_canceled():
aborted = True
layer_call_status[thema] = "abbruch_nach_raumfilter"
self._log("Nutzerabbruch nach Raumfilter")
break
# Zähle Features vor/nach Raumfilter
total_features = None
filtered_features = None
try:
if layer is not None and hasattr(layer, "featureCount"):
total_features = int(layer.featureCount())
except Exception:
total_features = None
if layer_for_write is not None and hasattr(layer_for_write, "featureCount"):
try:
filtered_features = int(layer_for_write.featureCount())
except Exception:
filtered_features = None
if not layer_for_write:
layer_call_status[thema] = "raumfilter_ausserhalb"
self._log(f"Dienst {thema} ist außerhalb des Raumfilters")
skipped_outside += 1
row_stats.append({
"dienst": thema,
"provider": provider,
"link": link,
"style": style or "",
"datenabruf_features": fetched_count,
"total_features": total_features,
"filtered_features": 0,
"status": "außerhalb",
"raumfilter": raumfilter_name,
})
continue
if style:
layer_loader.apply_style(layer_for_write, style)
row_stats.append({
"dienst": thema,
"provider": provider,
"link": link,
"style": style or "",
"datenabruf_features": fetched_count,
"total_features": total_features,
"filtered_features": filtered_features,
"status": "geladen",
"raumfilter": raumfilter_name,
})
layer_call_status[thema] = "geladen"
self._log(f"[DEBUG] Dienst geladen: thema='{thema}', provider='{provider}', filtered_features={filtered_features}")
if provider == "wms":
# WMS ist Raster und wird nicht in GPKG geschrieben.
# Im temporären Modus wird er trotzdem direkt geladen.
loaded_count += 1
if use_datenschreiber:
self._log(f"WMS-Layer {thema} wird nicht in GPKG gespeichert, nur in Projekt (temporär)")
# Während Datenbankmodus: wir speichern nicht in daten_dict,
# aber für gute Sichtbarkeit laden wir nach erfolgreichem Schreibprozess.
temp_layers.append(layer)
else:
temp_layers.append(layer)
continue
if use_datenschreiber and datenschreiber:
daten_dict["daten"][thema] = {
"layer": layer_for_write,
"style_path": style,
}
else:
temp_layers.append(layer_for_write)
loaded_count += 1
if use_datenschreiber and datenschreiber and daten_dict["daten"]:
self._log(f"Schreibe {len(daten_dict['daten'])} Layer in {final_pfad}")
results = datenschreiber.schreibe_Daten(
daten_dict,
processed_results=pipeline_context.get("datenabruf_results", []),
speicherort=final_pfad,
)
datenschreiber.lade_Layer(results)
self._log("Datenschreiber abgeschlossen")
elif temp_layers:
self._log(f"Temporärmodus: Lade {len(temp_layers)} Layer ins Projekt")
for layer in temp_layers:
QgsProject.instance().addMapLayer(layer)
self._log("Temporärmodus: Layer im Projekt geladen")
else:
self._log("Keine Layer zum Laden (kein persistierter GPkg-Write).")
self._log(f"Dienst-Laden fertig ({len(rows)} Zeilen)")
return {
"row_count": len(rows),
"loaded_count": loaded_count,
"skipped_outside": skipped_outside,
"aborted": aborted,
"row_stats": row_stats,
"layer_call_status": layer_call_status,
"raumfilter_name": raumfilter_name,
}
def _create_local_layer_from_fetched_features(
self,
thema: str,
features: List[Any],
crs_authid: Optional[str] = None,
) -> Optional[QgsVectorLayer]:
"""Erzeugt aus bereits geholten GeoJSON-Features einen lokalen OGR-Layer.
Verhindert einen zweiten potentiell blockierenden Remote-Aufruf (WFS/REST).
"""
if not features:
return None
normalized_features: List[Dict[str, Any]] = []
detected_crs_authid: Optional[str] = None
for feature in features:
if not isinstance(feature, dict):
continue
# Fall 1: bereits GeoJSON-Feature
if feature.get("type") == "Feature" and isinstance(feature.get("geometry"), dict):
normalized_features.append(feature)
continue
# Fall 2: ArcGIS Feature-JSON -> GeoJSON konvertieren
attributes = feature.get("attributes")
geometry = feature.get("geometry")
if not isinstance(attributes, dict) or not isinstance(geometry, dict):
continue
if detected_crs_authid is None:
try:
sr = geometry.get("spatialReference")
if isinstance(sr, dict):
wkid = sr.get("latestWkid") or sr.get("wkid")
if wkid:
detected_crs_authid = f"EPSG:{int(wkid)}"
except Exception:
detected_crs_authid = None
geojson_geometry: Dict[str, Any] | None = None
# Point
if "x" in geometry and "y" in geometry:
geojson_geometry = {
"type": "Point",
"coordinates": [geometry.get("x"), geometry.get("y")],
}
# MultiPoint
elif isinstance(geometry.get("points"), list):
geojson_geometry = {
"type": "MultiPoint",
"coordinates": geometry.get("points", []),
}
# LineString / MultiLineString
elif isinstance(geometry.get("paths"), list):
paths = geometry.get("paths", [])
if len(paths) == 1:
geojson_geometry = {
"type": "LineString",
"coordinates": paths[0],
}
else:
geojson_geometry = {
"type": "MultiLineString",
"coordinates": paths,
}
# Polygon / MultiPolygon
elif isinstance(geometry.get("rings"), list):
rings = geometry.get("rings", [])
cleaned_rings = [
ring for ring in rings
if isinstance(ring, list) and len(ring) >= 4
]
if len(cleaned_rings) == 1:
geojson_geometry = {
"type": "Polygon",
"coordinates": cleaned_rings,
}
elif len(cleaned_rings) > 1:
# Robuster Fallback für ArcGIS-Ringe:
# Mehrere Ringe werden als MultiPolygon behandelt,
# damit nicht versehentlich alle Ringe als Löcher eines
# einzigen Polygons interpretiert werden.
geojson_geometry = {
"type": "MultiPolygon",
"coordinates": [[ring] for ring in cleaned_rings],
}
if geojson_geometry is None:
continue
normalized_features.append(
{
"type": "Feature",
"geometry": geojson_geometry,
"properties": attributes,
}
)
if not normalized_features:
self._log(f"[DEBUG] Keine konvertierbaren Features für lokalen Layer: thema='{thema}'")
return None
try:
payload = {
"type": "FeatureCollection",
"features": normalized_features,
}
with tempfile.NamedTemporaryFile(
suffix=".geojson",
delete=False,
mode="w",
encoding="utf-8",
) as fh:
json.dump(payload, fh, ensure_ascii=False)
tmp_path = fh.name
layer = QgsVectorLayer(tmp_path, thema, "ogr")
if layer and layer.isValid():
target_crs = detected_crs_authid or crs_authid
if target_crs and QgsCoordinateReferenceSystem is not None and hasattr(layer, "setCrs"):
try:
layer.setCrs(QgsCoordinateReferenceSystem(target_crs))
except Exception:
pass
self._log(
f"[DEBUG] Lokaler Layer gültig: thema='{thema}', "
f"input_features={len(features)}, geojson_features={len(normalized_features)}, "
f"layer_features={layer.featureCount()}, crs='{target_crs or 'unbekannt'}'"
)
return layer
self._log(f"[DEBUG] Lokaler Layer aus Datenabruf ungültig: thema='{thema}', pfad='{tmp_path}'")
return None
except Exception as exc:
self._log(f"[DEBUG] Fehler beim Erzeugen lokaler Featureschicht für {thema}: {exc}")
return None
def _write_markdown_log(
self,
final_pfad: str,
source_dict: DataDict,
pipeline_context: Dict[str, Any],
load_summary: Dict[str, Any],
) -> None:
"""Schreibt den Pipeline-Log (Markdown)."""
lines = [
"# Plan41 Fachdaten-Ladevorgang",
"",
f"**Datum**: {datetime.datetime.now().isoformat()}",
f"**Verfahrens-DB**: {final_pfad or 'temporär'}",
f"**Linkliste**: {pipeline_context.get('linkliste')}",
"",
"## Zusammenfassung",
"",
f"- **Zeilen gesamt**: {load_summary.get('row_count', 0)}",
f"- **Geladene Dienste**: {load_summary.get('loaded_count', 0)}",
f"- **Außerhalb Raumfilter**: {load_summary.get('skipped_outside', 0)}",
f"- **Abgebrochen**: {load_summary.get('aborted', False)}",
f"- **Raumfilter**: {load_summary.get('raumfilter_name', 'unbekannt')}",
f"- **Raumfilter-Typ**: {pipeline_context.get('raumfilter_name', 'unbekannt')}",
"",
"## Dienstliste",
"",
"| Dienst | Provider | Linkadresse | Aufrufstatus | Ergebnisstatus |",
"|---|---|---|---|---|",
]
status_by_dienst = {
str(stat.get("dienst", "")): str(stat.get("status", "n/a"))
for stat in load_summary.get("row_stats", [])
}
aufrufstatus_by_dienst = {
str(key): str(value)
for key, value in (load_summary.get("layer_call_status", {}) or {}).items()
}
for row in source_dict.get('rows', []):
dienst = row.get('Inhalt') or row.get('ident') or ''
provider = row.get('Provider') or ''
link = row.get('Link') or ''
aufrufstatus = aufrufstatus_by_dienst.get(str(dienst), "nicht_aufgerufen")
ergebnisstatus = status_by_dienst.get(str(dienst), "n/a")
lines.append(f"| {dienst} | {provider} | {link} | {aufrufstatus} | {ergebnisstatus} |")
lines.extend([
"",
"## Raumfilter-Statistik",
"",
"| Dienst | Provider | Linkadresse | Datenabruf-Objekte | Gesamt-Objekte | Gefilterte Objekte | Raumfilter | Status |",
"|---|---|---|---|---|---|---|---|",
])
for stat in load_summary.get('row_stats', []):
lines.append(
f"| {stat.get('dienst', '')} | {stat.get('provider', '')} | {stat.get('link', '')} | {stat.get('datenabruf_features', 'n/a')} | {stat.get('total_features', 'n/a')} | {stat.get('filtered_features', 'n/a')} | {stat.get('raumfilter', '')} | {stat.get('status', 'n/a')} |"
)
markdown = "\n".join(lines)
if final_pfad:
log_dir = os.path.dirname(final_pfad)
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "plan41_lade_log.md")
try:
with open(log_file, "w", encoding="utf-8") as fh:
fh.write(markdown)
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=True,
meldung=f"Lade-Log gespeichert: {log_file}",
aktion="log_geschrieben",
kontext={"log_file": log_file},
)
)
info("Lade-Log", f"Lade-Protokoll gespeichert: {log_file}", duration=10)
except Exception as exc:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Schreiben des Logle (md): {exc}",
aktion="log_schreiben_fehlgeschlagen",
kontext={"error": str(exc)},
)
)
warning("Lade-Log", f"Konnte Datei nicht schreiben: {exc}", duration=10)
else:
# temporärer Modus: nur anzeigen
info("Lade-Log (temporär)", markdown, duration=20)
def _clone_layer_with_extent(self, layer: QgsVectorLayer, extent, thema: str) -> QgsVectorLayer | None:
"""Erstellt eine Memory-Kopie von <layer> mit Geometrien im BBOX-Raumfilter."""
try:
request = QgsFeatureRequest().setFilterRect(extent)
features = list(layer.getFeatures(request))
if not features:
return None
geom_type_map = {0: "Point", 1: "LineString", 2: "Polygon"}
geom_type = geom_type_map.get(layer.geometryType(), "Polygon")
uri = f"{geom_type}?crs={layer.crs().authid()}"
filtered_layer = QgsVectorLayer(uri, f"{thema}_BBOX", "memory")
if not filtered_layer or not filtered_layer.isValid():
self._log(f"Fehler beim Erzeugen des temporären Filterlayers für {thema}")
return None
provider = filtered_layer.dataProvider()
provider.addAttributes(layer.fields())
filtered_layer.updateFields()
provider.addFeatures(features)
filtered_layer.updateExtents()
return filtered_layer
except Exception as e:
self._log(f"Fehler beim Filtern von {thema} nach Raumfilter: {e}")
return None
def _resolve_linkliste(self, linkliste: str | None) -> str | None:
"""
Prüft und normalisiert den Linklisten-Pfad.
Rückgabe:
- gültiger Pfad zur Linkliste (str)
- None → Pipeline abbrechen
"""
# --------------------------------------------------
# Standard-Linkliste (plattformneutral)
# --------------------------------------------------
plugin_root = get_plugin_root()
standard_linkliste = join_path(plugin_root, "sn_plan41","assets", "Linkliste.xlsx")
# --------------------------------------------------
# 🔹 LEERE EINGABE → AUTOMATISCH STANDARDDATEI
# --------------------------------------------------
if not linkliste:
linkliste_final = str(standard_linkliste)
self.set_linkliste(linkliste_final)
return linkliste_final
# --------------------------------------------------
# Dateiprüfung nur bei expliziter Eingabe
# --------------------------------------------------
pruefer = Dateipruefer(
pfad=linkliste,
leereingabe_erlaubt=True,
standarddatei=str(standard_linkliste),
)
ergebnis = pruefer.pruefe()
# --------------------------------------------------
# Entscheidung über Pruefmanager
# --------------------------------------------------
ergebnis = self.pruefmanager.verarbeite(ergebnis)
if not ergebnis.ok:
# Nutzer hat abgebrochen oder Fehler nicht bestätigt
return None
# --------------------------------------------------
# Erfolgsfall → geprüften Pfad übernehmen
# --------------------------------------------------
linkliste_final = str(ergebnis.kontext)
# Optional: Projektvariable aktualisieren
self.set_linkliste(linkliste_final)
return linkliste_final
def _resolve_raumfilter(self, raumfilter: str, source: str) -> QgsVectorLayer | None:
self._log(f"Raumfilter-Auswahl: '{raumfilter}'")
self._log(f"Source: '{source}'")
if raumfilter == "Verfahrensgebiet":
layer = self._get_verfahrensgebiet_layer()
self._log(
"Verfahrensgebiet gefunden"
if layer else
"❌ Kein Verfahrensgebiet im Projekt"
)
return layer
if raumfilter == "Pufferlayer":
self._log("Pufferlayer-Modus aktiv")
return self._handle_pufferlayer(source)
self._log("Kein Raumfilter gewählt")
return None
def _get_verfahrensgebiet_layer(self) -> QgsVectorLayer | None:
layer_id = self.load_verfahrensgebiet_layer_id()
self._log(f"Verfahrensgebiet-Layer-ID: {layer_id}")
if not layer_id:
self._log("❌ Keine Layer-ID gespeichert")
return None
layer = QgsProject.instance().mapLayer(layer_id)
if not layer:
self._log("❌ Layer-ID existiert nicht im Projekt")
return None
if not self.is_valid_verfahrensgebiet_layer(layer):
self._log("❌ Layer ist kein gültiger Vektorlayer")
return None
self._log(f"Verfahrensgebiet-Layer OK: '{layer.name()}'")
return layer
def _handle_pufferlayer(self, source: str) -> QgsVectorLayer | None:
self._log("Prüfe vorhandenen Pufferlayer im Projekt")
existing = self._load_existing_pufferlayer()
if existing:
self._log("✔ Pufferlayer bereits im Projekt vorhanden")
return existing
self._log("Kein Pufferlayer im Projekt")
if source:
self._log("Prüfe Pufferlayer im Source")
exists = self._pufferlayer_exists_in_source(source)
self._log(f"Pufferlayer im Source vorhanden: {exists}")
if exists:
return self._load_existing_pufferlayer() or self._create_pufferlayer()
self._log("Erzeuge neuen Pufferlayer")
return self._create_pufferlayer()
def _load_existing_pufferlayer(self) -> QgsVectorLayer | None:
"""
Liefert einen vorhandenen Pufferlayer aus dem Projekt.
"""
layers = QgsProject.instance().mapLayersByName("Pufferlayer")
return layers[0] if layers else None
def _create_pufferlayer(self) -> QgsVectorLayer | None:
self._log("Starte Pufferlayer-Erstellung")
basis_layer = self._get_verfahrensgebiet_layer()
if not basis_layer:
self._log("❌ Kein Verfahrensgebiet → kein Puffer möglich")
return None
source = self.load_verfahrens_db()
self._log(f"Basislayer: '{basis_layer.name()}'")
layer = self.Pufferlayer_erstellen(
basis_layer=basis_layer,
distance=1000,
name="Pufferlayer",
source=source,
)
self._log(
"✔ Pufferlayer erfolgreich erzeugt"
if layer else
"❌ Pufferlayer-Erstellung fehlgeschlagen"
)
return layer
from sn_basis.functions.qgiscore_wrapper import QgsVectorLayer
def _pufferlayer_exists_in_source(self, source: str) -> bool:
"""
Prüft, ob im Source (z.B. GPKG) ein Layer namens 'Pufferlayer' existiert.
"""
if not source:
return False
uri = f"{source}|layername=Pufferlayer"
layer = QgsVectorLayer(uri, "Pufferlayer", "ogr")
return layer.isValid()
def Pufferlayer_erstellen(
self,
basis_layer: QgsVectorLayer,
distance: float,
name: str,
source: str | None = None,
) -> QgsVectorLayer | None:
"""
Erzeugt einen rechteckigen Pufferlayer (BBOX + Abstand)
um das Verfahrensgebiet.
- Ohne Source → temporärer Memory-Layer
- Mit Source → Schreiben über Datenschreiber
Parameters
----------
basis_layer : QgsVectorLayer
Verfahrensgebiet-Layer.
distance : float
Pufferabstand in Metern.
name : str
Name des Ziel-Layers.
source : str | None
Zielquelle (z.B. Verfahrens-DB) oder None für temporär.
Returns
-------
QgsVectorLayer | None
Neuer Pufferlayer oder None bei Fehler.
"""
if not basis_layer or not basis_layer.isValid():
self._log("❌ Basislayer ungültig kein Puffer möglich")
return None
# --------------------------------------------------
# 1. Rechteck-Geometrie (Extent + Puffer)
# --------------------------------------------------
extent = basis_layer.extent().buffered(distance)
bbox_geom = QgsGeometry.fromRect(extent)
# --------------------------------------------------
# 2. CRS übernehmen
# --------------------------------------------------
crs_auth = basis_layer.crs().authid()
uri = f"Polygon?crs={crs_auth}"
mem_layer = QgsVectorLayer(uri, name, "memory")
provider = mem_layer.dataProvider()
provider.addAttributes([
QgsField("id", QVariant.Int)
])
mem_layer.updateFields()
# --------------------------------------------------
# 4. Feature erzeugen
# --------------------------------------------------
feat = QgsFeature(mem_layer.fields())
feat.setGeometry(bbox_geom)
feat["id"] = 1
provider.addFeature(feat)
mem_layer.updateExtents()
# --------------------------------------------------
# 5. Temporärer Modus → direkt ins Projekt
# --------------------------------------------------
if not source:
QgsProject.instance().addMapLayer(mem_layer)
self._log("✔ Temporärer rechteckiger Pufferlayer erzeugt")
return mem_layer
# --------------------------------------------------
# 6. Persistenter Modus → Datenschreiber
# --------------------------------------------------
writer = Datenschreiber(
pruefmanager=self.pruefmanager,
gpkg_path=source,
)
daten_dict = {
"daten": {
name: {
"layer": mem_layer
}
}
}
results = writer.schreibe_Daten(
daten_dict=daten_dict,
processed_results=[],
speicherort=source,
)
if not results:
self._log("❌ Schreiben des Pufferlayers fehlgeschlagen")
return None
writer.lade_Layer(results)
layers = QgsProject.instance().mapLayersByName(name)
if layers:
self._log("✔ Persistenter rechteckiger Pufferlayer geladen")
return layers[0]
return None