forked from AG_QGIS/Plugin_SN_Plan41
516 lines
17 KiB
Python
516 lines
17 KiB
Python
|
||
"""
|
||
sn_plan41/ui/tab_a_logic.py – Fachlogik für Tab A (Daten)
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
from sn_basis.functions.sys_wrapper import get_plugin_root, join_path
|
||
|
||
from typing import Any, Dict, List, Optional
|
||
from collections.abc import Mapping as _Mapping
|
||
import os
|
||
|
||
from sn_basis.functions.qgiscore_wrapper import (
|
||
QgsVectorFileWriter,
|
||
QgsVectorLayer,
|
||
QgsProject,
|
||
QgsGeometry,
|
||
QgsFeature,
|
||
QgsField,
|
||
|
||
)
|
||
|
||
from sn_basis.functions.variable_wrapper import (
|
||
get_variable,
|
||
set_variable,
|
||
)
|
||
from sn_basis.functions.sys_wrapper import file_exists
|
||
from sn_basis.functions.ly_existence_wrapper import layer_exists
|
||
from sn_basis.functions.ly_metadata_wrapper import get_layer_type
|
||
from sn_basis.functions.qt_wrapper import QVariant
|
||
|
||
|
||
# Prüfer-Typen
|
||
from sn_basis.modules.Pruefmanager import Pruefmanager
|
||
from sn_basis.modules.linkpruefer import Linkpruefer
|
||
from sn_basis.modules.stilpruefer import Stilpruefer
|
||
from sn_basis.modules.Dateipruefer import Dateipruefer
|
||
from sn_basis.modules.layerpruefer import Layerpruefer
|
||
from sn_basis.modules.Datenschreiber import Datenschreiber
|
||
|
||
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
|
||
from sn_basis.modules.DataGrabber import DataGrabber, SourceType, SourceDict
|
||
|
||
Row = Dict[str, Any]
|
||
DataDict = Dict[str, List[Row]]
|
||
|
||
class TabALogic:
|
||
"""
|
||
Kapselt die Fachlogik von Tab A. Verfahrens-DB wird **nicht** bei Pfad-Auswahl,
|
||
sondern erst beim ersten Layer-Schreiben angelegt.
|
||
"""
|
||
|
||
def __init__(self, pruefmanager: Pruefmanager, link_pruefer: Linkpruefer, stil_pruefer: Stilpruefer) -> None:
|
||
self.pruefmanager = pruefmanager
|
||
self.link_pruefer = link_pruefer
|
||
self.stil_pruefer = stil_pruefer
|
||
self.data_grabber: Optional[DataGrabber] = None
|
||
def _log(self, msg: str) -> None:
|
||
print(f"[TabALogic] {msg}")
|
||
|
||
# -------------------------------
|
||
# Verfahrens-Datenbank (Pfad-Management)
|
||
# -------------------------------
|
||
|
||
def load_verfahrens_db(self) -> Optional[str]:
|
||
"""Lädt den gespeicherten Verfahrens-DB-Pfad (Datei muss nicht existieren)."""
|
||
path = get_variable("verfahrens_db", scope="project")
|
||
return path or None
|
||
|
||
def set_verfahrens_db(self, path: Optional[str]) -> None:
|
||
"""Speichert den Verfahrens-DB-Pfad (Datei wird später angelegt)."""
|
||
if path:
|
||
set_variable("verfahrens_db", path, scope="project")
|
||
else:
|
||
set_variable("verfahrens_db", "", scope="project")
|
||
|
||
# -------------------------------
|
||
# Layer → Verfahrens-DB schreiben (alte Logik!)
|
||
# -------------------------------
|
||
|
||
def write_layer_to_verfahrens_db(
|
||
self,
|
||
source_layer: QgsVectorLayer,
|
||
zielpfad: str,
|
||
layer_name: str,
|
||
) -> bool:
|
||
"""
|
||
Schreibt einen Layer in die Verfahrens-DB.
|
||
Legt GPKG **bei Bedarf neu an** (wie puffer_setzen im alten Code).
|
||
|
||
Args:
|
||
source_layer: Layer zum Exportieren (z.B. aus DataGrabber)
|
||
zielpfad: Vom Dateiprüfer geprüfter Ziel-GPKG-Pfad
|
||
layer_name: Name des Layers in der GPKG
|
||
|
||
Returns:
|
||
True wenn erfolgreich
|
||
"""
|
||
if not zielpfad or not source_layer or not source_layer.isValid():
|
||
return False
|
||
|
||
# Optionen wie im alten puffer_setzen
|
||
opts = QgsVectorFileWriter.SaveVectorOptions()
|
||
opts.driverName = "GPKG"
|
||
opts.fileEncoding = "UTF-8"
|
||
opts.layerName = layer_name
|
||
|
||
# Alte Logik: bei neuem Pfad komplett neue GPKG, sonst Layer überschreiben
|
||
if not os.path.exists(zielpfad):
|
||
opts.actionOnExistingFile = QgsVectorFileWriter.CreateOrOverwriteFile
|
||
else:
|
||
opts.actionOnExistingFile = QgsVectorFileWriter.CreateOrOverwriteLayer
|
||
|
||
transform_context = QgsProject.instance().transformContext()
|
||
|
||
error = QgsVectorFileWriter.writeAsVectorFormatV3(
|
||
source_layer,
|
||
zielpfad,
|
||
transform_context,
|
||
opts,
|
||
)
|
||
|
||
if error != QgsVectorFileWriter.NoError:
|
||
print(f"Fehler beim Schreiben nach {zielpfad}: {error}")
|
||
return False
|
||
|
||
# Pfad jetzt auch als "Verfahrens-DB" merken
|
||
self.set_verfahrens_db(zielpfad)
|
||
return True
|
||
|
||
# -------------------------------
|
||
# Lokale Linkliste
|
||
# -------------------------------
|
||
|
||
def load_linkliste(self) -> Optional[str]:
|
||
path = get_variable("linkliste", scope="project")
|
||
if path and file_exists(path):
|
||
return path
|
||
return None
|
||
|
||
def set_linkliste(self, path: Optional[str]) -> None:
|
||
if path:
|
||
set_variable("linkliste", path, scope="project")
|
||
else:
|
||
set_variable("linkliste", "", scope="project")
|
||
|
||
# -------------------------------
|
||
# Verfahrensgebiet-Layer
|
||
# -------------------------------
|
||
|
||
def save_verfahrensgebiet_layer(self, layer: QgsVectorLayer) -> None:
|
||
"""Speichert die Verfahrensgebiet-Layer-ID, unter Annahme, dass der Layer prevalidiert ist."""
|
||
layer_id = layer.id() if layer is not None else ""
|
||
set_variable("verfahrensgebiet_layer", layer_id or "", scope="project")
|
||
|
||
def load_verfahrensgebiet_layer_id(self) -> Optional[str]:
|
||
value = get_variable("verfahrensgebiet_layer", scope="project")
|
||
return value or None
|
||
|
||
def is_valid_verfahrensgebiet_layer(self, layer) -> bool:
|
||
if not layer_exists(layer):
|
||
return False
|
||
|
||
layer_type = get_layer_type(layer)
|
||
return layer_type == "vector"
|
||
|
||
# === PIPELINE ===
|
||
def _on_run_pipeline(
|
||
self,
|
||
source: str,
|
||
linkliste: str | None,
|
||
raumfilter: str,
|
||
) -> None:
|
||
"""Pipeline starten; Eingaben gelten als vorvalidiert (Dateiprüfer + Pruefmanager)."""
|
||
if not self.pruefmanager or not self.data_grabber:
|
||
return
|
||
|
||
ergebnis1 = Dateipruefer(
|
||
source,
|
||
basis_pfad="",
|
||
leereingabe_erlaubt=False,
|
||
standarddatei=None,
|
||
temporaer_erlaubt=True,
|
||
verfahrens_db_modus=True,
|
||
).pruefe()
|
||
|
||
ergebnis2 = self.pruefmanager.verarbeite(ergebnis1)
|
||
if not ergebnis2.ok:
|
||
return
|
||
|
||
final_pfad = str(ergebnis2.kontext or source)
|
||
self.set_verfahrens_db(final_pfad)
|
||
self.data_grabber.run(final_pfad)
|
||
|
||
linkliste_final = self._resolve_linkliste(linkliste)
|
||
if not linkliste_final:
|
||
return
|
||
|
||
raumfilter_layer = self._resolve_raumfilter(raumfilter, final_pfad)
|
||
|
||
if raumfilter in ("Verfahrensgebiet", "Pufferlayer") and raumfilter_layer is None:
|
||
return
|
||
|
||
pipeline_context = {
|
||
"source": final_pfad,
|
||
"linkliste": linkliste_final,
|
||
"raumfilter": raumfilter_layer,
|
||
}
|
||
|
||
try:
|
||
self.data_grabber.run(final_pfad)
|
||
print("✅ DataGrabber aufgerufen!")
|
||
except Exception as e:
|
||
print(f"💥 DataGrabber FEHLER: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
else:
|
||
print("⏹️ Pipeline gestoppt (erwartet bei leerem Pfad)")
|
||
|
||
print("="*60 + "\n")
|
||
linkliste_final=self._resolve_linkliste(linkliste)
|
||
if linkliste_final is None:
|
||
|
||
print("⏹️ Pipeline abgebrochen (Linkliste)")
|
||
return
|
||
raumfilter_layer = self._resolve_raumfilter(raumfilter, final_pfad)
|
||
|
||
if raumfilter == "Verfahrensgebiet" and raumfilter_layer is None:
|
||
print("⏹️ Pipeline abgebrochen: kein Verfahrensgebiet gesetzt")
|
||
return
|
||
|
||
if raumfilter == "Pufferlayer" and raumfilter_layer is None:
|
||
print("⏹️ Pipeline abgebrochen: Pufferlayer konnte nicht erzeugt werden")
|
||
return
|
||
|
||
pipeline_context = {
|
||
"source": final_pfad,
|
||
"linkliste": linkliste_final,
|
||
"raumfilter": raumfilter_layer,
|
||
}
|
||
|
||
|
||
def _resolve_linkliste(self, linkliste: str | None) -> str | None:
|
||
"""
|
||
Prüft und normalisiert den Linklisten-Pfad.
|
||
|
||
Rückgabe:
|
||
- gültiger Pfad zur Linkliste (str)
|
||
- None → Pipeline abbrechen
|
||
"""
|
||
|
||
# --------------------------------------------------
|
||
# Standard-Linkliste (plattformneutral)
|
||
# --------------------------------------------------
|
||
plugin_root = get_plugin_root()
|
||
standard_linkliste = join_path(plugin_root, "assets", "Linkliste.xlsx")
|
||
|
||
# --------------------------------------------------
|
||
# 🔹 LEERE EINGABE → AUTOMATISCH STANDARDDATEI
|
||
# --------------------------------------------------
|
||
if not linkliste:
|
||
linkliste_final = str(standard_linkliste)
|
||
self.set_linkliste(linkliste_final)
|
||
return linkliste_final
|
||
|
||
# --------------------------------------------------
|
||
# Dateiprüfung nur bei expliziter Eingabe
|
||
# --------------------------------------------------
|
||
|
||
pruefer = Dateipruefer(
|
||
pfad=linkliste,
|
||
leereingabe_erlaubt=True,
|
||
standarddatei=str(standard_linkliste),
|
||
)
|
||
|
||
ergebnis = pruefer.pruefe()
|
||
|
||
# --------------------------------------------------
|
||
# Entscheidung über Pruefmanager
|
||
# --------------------------------------------------
|
||
ergebnis = self.pruefmanager.verarbeite(ergebnis)
|
||
|
||
if not ergebnis.ok:
|
||
# Nutzer hat abgebrochen oder Fehler nicht bestätigt
|
||
return None
|
||
|
||
# --------------------------------------------------
|
||
# Erfolgsfall → geprüften Pfad übernehmen
|
||
# --------------------------------------------------
|
||
linkliste_final = str(ergebnis.kontext)
|
||
|
||
# Optional: Projektvariable aktualisieren
|
||
self.set_linkliste(linkliste_final)
|
||
|
||
return linkliste_final
|
||
|
||
def _resolve_raumfilter(self, raumfilter: str, source: str) -> QgsVectorLayer | None:
|
||
self._log(f"Raumfilter-Auswahl: '{raumfilter}'")
|
||
self._log(f"Source: '{source}'")
|
||
|
||
if raumfilter == "Verfahrensgebiet":
|
||
layer = self._get_verfahrensgebiet_layer()
|
||
self._log(
|
||
"Verfahrensgebiet gefunden"
|
||
if layer else
|
||
"❌ Kein Verfahrensgebiet im Projekt"
|
||
)
|
||
return layer
|
||
|
||
if raumfilter == "Pufferlayer":
|
||
self._log("Pufferlayer-Modus aktiv")
|
||
return self._handle_pufferlayer(source)
|
||
|
||
self._log("Kein Raumfilter gewählt")
|
||
return None
|
||
|
||
|
||
def _get_verfahrensgebiet_layer(self) -> QgsVectorLayer | None:
|
||
layer_id = self.load_verfahrensgebiet_layer_id()
|
||
self._log(f"Verfahrensgebiet-Layer-ID: {layer_id}")
|
||
|
||
if not layer_id:
|
||
self._log("❌ Keine Layer-ID gespeichert")
|
||
return None
|
||
|
||
layer = QgsProject.instance().mapLayer(layer_id)
|
||
if not layer:
|
||
self._log("❌ Layer-ID existiert nicht im Projekt")
|
||
return None
|
||
|
||
if not self.is_valid_verfahrensgebiet_layer(layer):
|
||
self._log("❌ Layer ist kein gültiger Vektorlayer")
|
||
return None
|
||
|
||
self._log(f"Verfahrensgebiet-Layer OK: '{layer.name()}'")
|
||
return layer
|
||
|
||
|
||
def _handle_pufferlayer(self, source: str) -> QgsVectorLayer | None:
|
||
self._log("Prüfe vorhandenen Pufferlayer im Projekt")
|
||
|
||
existing = self._load_existing_pufferlayer()
|
||
if existing:
|
||
self._log("✔ Pufferlayer bereits im Projekt vorhanden")
|
||
return existing
|
||
|
||
self._log("Kein Pufferlayer im Projekt")
|
||
|
||
if source:
|
||
self._log("Prüfe Pufferlayer im Source")
|
||
exists = self._pufferlayer_exists_in_source(source)
|
||
self._log(f"Pufferlayer im Source vorhanden: {exists}")
|
||
|
||
if exists:
|
||
return self._load_existing_pufferlayer() or self._create_pufferlayer()
|
||
|
||
self._log("Erzeuge neuen Pufferlayer")
|
||
return self._create_pufferlayer()
|
||
|
||
|
||
def _load_existing_pufferlayer(self) -> QgsVectorLayer | None:
|
||
"""
|
||
Liefert einen vorhandenen Pufferlayer aus dem Projekt.
|
||
"""
|
||
layers = QgsProject.instance().mapLayersByName("Pufferlayer")
|
||
return layers[0] if layers else None
|
||
|
||
|
||
def _create_pufferlayer(self) -> QgsVectorLayer | None:
|
||
self._log("Starte Pufferlayer-Erstellung")
|
||
|
||
basis_layer = self._get_verfahrensgebiet_layer()
|
||
if not basis_layer:
|
||
self._log("❌ Kein Verfahrensgebiet → kein Puffer möglich")
|
||
return None
|
||
source = self.load_verfahrens_db()
|
||
self._log(f"Basislayer: '{basis_layer.name()}'")
|
||
|
||
layer = self.Pufferlayer_erstellen(
|
||
basis_layer=basis_layer,
|
||
distance=1000,
|
||
name="Pufferlayer",
|
||
source=source,
|
||
)
|
||
|
||
self._log(
|
||
"✔ Pufferlayer erfolgreich erzeugt"
|
||
if layer else
|
||
"❌ Pufferlayer-Erstellung fehlgeschlagen"
|
||
)
|
||
return layer
|
||
|
||
|
||
from sn_basis.functions.qgiscore_wrapper import QgsVectorLayer
|
||
|
||
def _pufferlayer_exists_in_source(self, source: str) -> bool:
|
||
"""
|
||
Prüft, ob im Source (z.B. GPKG) ein Layer namens 'Pufferlayer' existiert.
|
||
"""
|
||
if not source:
|
||
return False
|
||
|
||
uri = f"{source}|layername=Pufferlayer"
|
||
layer = QgsVectorLayer(uri, "Pufferlayer", "ogr")
|
||
|
||
return layer.isValid()
|
||
|
||
|
||
|
||
def Pufferlayer_erstellen(
|
||
self,
|
||
basis_layer: QgsVectorLayer,
|
||
distance: float,
|
||
name: str,
|
||
source: str | None = None,
|
||
) -> QgsVectorLayer | None:
|
||
"""
|
||
Erzeugt einen rechteckigen Pufferlayer (BBOX + Abstand)
|
||
um das Verfahrensgebiet.
|
||
|
||
- Ohne Source → temporärer Memory-Layer
|
||
- Mit Source → Schreiben über Datenschreiber
|
||
|
||
Parameters
|
||
----------
|
||
basis_layer : QgsVectorLayer
|
||
Verfahrensgebiet-Layer.
|
||
distance : float
|
||
Pufferabstand in Metern.
|
||
name : str
|
||
Name des Ziel-Layers.
|
||
source : str | None
|
||
Zielquelle (z.B. Verfahrens-DB) oder None für temporär.
|
||
|
||
Returns
|
||
-------
|
||
QgsVectorLayer | None
|
||
Neuer Pufferlayer oder None bei Fehler.
|
||
"""
|
||
if not basis_layer or not basis_layer.isValid():
|
||
self._log("❌ Basislayer ungültig – kein Puffer möglich")
|
||
return None
|
||
|
||
# --------------------------------------------------
|
||
# 1. Rechteck-Geometrie (Extent + Puffer)
|
||
# --------------------------------------------------
|
||
extent = basis_layer.extent().buffered(distance)
|
||
bbox_geom = QgsGeometry.fromRect(extent)
|
||
|
||
# --------------------------------------------------
|
||
# 2. CRS übernehmen
|
||
# --------------------------------------------------
|
||
crs_auth = basis_layer.crs().authid()
|
||
uri = f"Polygon?crs={crs_auth}"
|
||
|
||
mem_layer = QgsVectorLayer(uri, name, "memory")
|
||
provider = mem_layer.dataProvider()
|
||
provider.addAttributes([
|
||
QgsField("id", QVariant.Int)
|
||
])
|
||
mem_layer.updateFields()
|
||
|
||
|
||
|
||
# --------------------------------------------------
|
||
# 4. Feature erzeugen
|
||
# --------------------------------------------------
|
||
feat = QgsFeature(mem_layer.fields())
|
||
feat.setGeometry(bbox_geom)
|
||
feat["id"] = 1
|
||
provider.addFeature(feat)
|
||
mem_layer.updateExtents()
|
||
|
||
# --------------------------------------------------
|
||
# 5. Temporärer Modus → direkt ins Projekt
|
||
# --------------------------------------------------
|
||
if not source:
|
||
QgsProject.instance().addMapLayer(mem_layer)
|
||
self._log("✔ Temporärer rechteckiger Pufferlayer erzeugt")
|
||
return mem_layer
|
||
|
||
# --------------------------------------------------
|
||
# 6. Persistenter Modus → Datenschreiber
|
||
# --------------------------------------------------
|
||
writer = Datenschreiber(
|
||
pruefmanager=self.pruefmanager,
|
||
gpkg_path=source,
|
||
)
|
||
|
||
daten_dict = {
|
||
"daten": {
|
||
name: {
|
||
"layer": mem_layer
|
||
}
|
||
}
|
||
}
|
||
|
||
|
||
results = writer.schreibe_Daten(
|
||
daten_dict=daten_dict,
|
||
processed_results=[],
|
||
speicherort=source,
|
||
)
|
||
|
||
if not results:
|
||
self._log("❌ Schreiben des Pufferlayers fehlgeschlagen")
|
||
return None
|
||
|
||
writer.lade_Layer(results)
|
||
|
||
layers = QgsProject.instance().mapLayersByName(name)
|
||
if layers:
|
||
self._log("✔ Persistenter rechteckiger Pufferlayer geladen")
|
||
return layers[0]
|
||
|
||
return None |