""" sn_plan41/ui/tab_a_logic.py – Fachlogik für Tab A (Daten) """ from __future__ import annotations from sn_basis.functions.sys_wrapper import get_plugin_root, join_path, file_exists from typing import Any, Dict, List, Optional from collections.abc import Mapping as _Mapping import os import datetime import json import tempfile from sn_basis.functions.qgiscore_wrapper import ( QgsVectorFileWriter, QgsVectorLayer, QgsProject, QgsGeometry, QgsFeature, QgsField, QgsFeatureRequest, QgsCoordinateReferenceSystem, ) from sn_basis.functions.variable_wrapper import ( get_variable, set_variable, ) from sn_basis.functions.ly_existence_wrapper import layer_exists from sn_basis.functions.ly_metadata_wrapper import get_layer_type from sn_basis.functions.qt_wrapper import QVariant from sn_basis.functions.dialog_wrapper import create_progress_dialog from sn_basis.functions.message_wrapper import info, warning, error # Prüfer-Typen from sn_basis.modules.Pruefmanager import Pruefmanager from sn_basis.modules.linkpruefer import Linkpruefer from sn_basis.modules.stilpruefer import Stilpruefer from sn_basis.modules.Dateipruefer import Dateipruefer from sn_basis.modules.layerpruefer import Layerpruefer from sn_basis.modules.LayerLoader import LayerLoader from sn_basis.modules.Datenschreiber import Datenschreiber from sn_basis.modules.pruef_ergebnis import pruef_ergebnis from sn_basis.modules.DataGrabber import DataGrabber, SourceType, SourceDict from sn_basis.modules.Datenabruf import Datenabruf Row = Dict[str, Any] DataDict = Dict[str, List[Row]] class TabALogic: """ Kapselt die Fachlogik von Tab A. Verfahrens-DB wird **nicht** bei Pfad-Auswahl, sondern erst beim ersten Layer-Schreiben angelegt. """ def __init__(self, pruefmanager: Pruefmanager, link_pruefer: Linkpruefer, stil_pruefer: Stilpruefer) -> None: self.pruefmanager = pruefmanager self.link_pruefer = link_pruefer self.stil_pruefer = stil_pruefer self.data_grabber: Optional[DataGrabber] = None def _log(self, msg: str) -> None: print(f"[TabALogic] {msg}") # ------------------------------- # Verfahrens-Datenbank (Pfad-Management) # ------------------------------- def load_verfahrens_db(self) -> Optional[str]: """Lädt den gespeicherten Verfahrens-DB-Pfad (Datei muss nicht existieren).""" path = get_variable("verfahrens_db", scope="project") return path or None def set_verfahrens_db(self, path: Optional[str]) -> None: """Speichert den Verfahrens-DB-Pfad (Datei wird später angelegt).""" if path: set_variable("verfahrens_db", path, scope="project") else: set_variable("verfahrens_db", "", scope="project") # ------------------------------- # Layer → Verfahrens-DB schreiben (alte Logik!) # ------------------------------- def write_layer_to_verfahrens_db( self, source_layer: QgsVectorLayer, zielpfad: str, layer_name: str, ) -> bool: """ Schreibt einen Layer in die Verfahrens-DB. Legt GPKG **bei Bedarf neu an** (wie puffer_setzen im alten Code). Args: source_layer: Layer zum Exportieren (z.B. aus DataGrabber) zielpfad: Vom Dateiprüfer geprüfter Ziel-GPKG-Pfad layer_name: Name des Layers in der GPKG Returns: True wenn erfolgreich """ if not zielpfad or not source_layer or not source_layer.isValid(): return False # Optionen wie im alten puffer_setzen opts = QgsVectorFileWriter.SaveVectorOptions() opts.driverName = "GPKG" opts.fileEncoding = "UTF-8" opts.layerName = layer_name # Alte Logik: bei neuem Pfad komplett neue GPKG, sonst Layer überschreiben if not os.path.exists(zielpfad): opts.actionOnExistingFile = QgsVectorFileWriter.CreateOrOverwriteFile else: opts.actionOnExistingFile = QgsVectorFileWriter.CreateOrOverwriteLayer transform_context = QgsProject.instance().transformContext() error = QgsVectorFileWriter.writeAsVectorFormatV3( source_layer, zielpfad, transform_context, opts, ) if error != QgsVectorFileWriter.NoError: print(f"Fehler beim Schreiben nach {zielpfad}: {error}") return False # Pfad jetzt auch als "Verfahrens-DB" merken self.set_verfahrens_db(zielpfad) return True # ------------------------------- # Lokale Linkliste # ------------------------------- def load_linkliste(self) -> Optional[str]: path = get_variable("linkliste", scope="project") if path and file_exists(path): return path return None def set_linkliste(self, path: Optional[str]) -> None: if path: set_variable("linkliste", path, scope="project") else: set_variable("linkliste", "", scope="project") # ------------------------------- # Verfahrensgebiet-Layer # ------------------------------- def save_verfahrensgebiet_layer(self, layer: QgsVectorLayer) -> None: """Speichert die Verfahrensgebiet-Layer-ID, unter Annahme, dass der Layer prevalidiert ist.""" layer_id = layer.id() if layer is not None else "" set_variable("verfahrensgebiet_layer", layer_id or "", scope="project") def load_verfahrensgebiet_layer_id(self) -> Optional[str]: value = get_variable("verfahrensgebiet_layer", scope="project") return value or None def is_valid_verfahrensgebiet_layer(self, layer) -> bool: if not layer_exists(layer): return False layer_type = get_layer_type(layer) return layer_type == "vector" # === PIPELINE === def _on_run_pipeline( self, source: str, linkliste: str | None, raumfilter: str, progress: Optional[Any] = None, ) -> Optional[Dict[str, Any]]: """Pipeline starten; Linkliste wird ausgelesen und geprüft, dann Datenabruf ausgeführt.""" self._log("Pipeline startet") if not self.pruefmanager or not self.data_grabber: self._log("Fehler: Pruefmanager oder DataGrabber fehlt") return None # 1) Verfahrens-DB prüfen und als aktive DB setzen datei_ergebnis = Dateipruefer( source, basis_pfad="", leereingabe_erlaubt=False, standarddatei=None, temporaer_erlaubt=True, verfahrens_db_modus=True, ).pruefe() datei_ergebnis = self.pruefmanager.verarbeite(datei_ergebnis) if not datei_ergebnis.ok: self._log("Verfahrens-DB-Pruefung fehlgeschlagen") return None final_pfad = str(datei_ergebnis.kontext or source) self.set_verfahrens_db(final_pfad) # Nach bestätigter Entscheidung: sofort Fortschrittsdialog zeigen if progress is None: progress = create_progress_dialog(1, "Fachdaten laden", "Prüfe Eingaben...") else: progress.set_total(1) progress.set_value(0) progress.set_label("Prüfe Eingaben...") # 2) Linkliste auflösen, falls leer Standardlinkliste verwenden linkliste_final = self._resolve_linkliste(linkliste) if linkliste_final is None: self._log("Linkliste kann nicht aufgelöst werden") return None else: self._log(f"Linkliste final: '{linkliste_final}'") # 3) Raumfilter prüfen raumfilter_layer = self._resolve_raumfilter(raumfilter, final_pfad) if raumfilter in ("Verfahrensgebiet", "Pufferlayer") and raumfilter_layer is None: self._log(f"Raumfilter '{raumfilter}' nicht verfügbar") return None # 4) Lade-Status initialisieren (funktioniert ab Bestätigung überschreiben/anhängen) if progress is None: # placeholder mit 1; tatsächliche Gesamtzahl kennt DataGrabber später progress = create_progress_dialog(1, "Fachdaten laden", "Prüfe Eingaben...") else: progress.set_total(1) progress.set_value(0) progress.set_label("Prüfe Eingaben...") # 5) Daten aus Linkliste laden und prüfen source_dict, grabber_summary = self.data_grabber.run(linkliste_final) self._log(f"DataGrabber: {grabber_summary.meldung} [{grabber_summary.aktion}]") # DEBUG: detaillierter Status print("[TabALogic] ... Debug: source_dict keys:", list(source_dict.keys())) print("[TabALogic] ... Debug: rows count:", len(source_dict.get("rows", []))) if source_dict.get("rows"): for i, row in enumerate(source_dict.get("rows", []), start=1): print(f"[TabALogic] ... Debug: row {i}: {row}") if not source_dict.get("rows"): self._log("Keine validen Linkliste-Einträge für Datenabruf") print("[TabALogic] ... STOP: rows:", len(source_dict.get("rows", []))) return None total_rows = len(source_dict.get("rows", [])) if progress is not None: if hasattr(progress, "set_total"): progress.set_total(max(total_rows, 1)) elif hasattr(progress, "setMaximum"): progress.setMaximum(max(total_rows, 1)) else: progress.total = max(total_rows, 1) progress.set_value(0) progress.set_label("Lade Daten...") if not grabber_summary.ok: self._log("Warnung: DataGrabber meldet fehlerhafte Zeilen, fahre mit Validierungsdaten fort") # 5) Datenabruf (aus validierten Zeilen) datenabruf = Datenabruf(self.pruefmanager) result_dict, datenabruf_results = datenabruf.datenabruf( result_dict=source_dict, raumfilter=raumfilter, verfahrensgebiet_layer=raumfilter_layer, speicherort=final_pfad, pruef_ergebnisse=[grabber_summary], progress=progress, ) self._log("Datenabruf abgeschlossen") pipeline_context = { "source": final_pfad, "linkliste": linkliste_final, "raumfilter": raumfilter_layer, "raumfilter_name": raumfilter, "source_dict": source_dict, "result_dict": result_dict, "datenabruf_results": datenabruf_results, } # 6) Lade Dienste in das Projekt aus result_dict load_summary = self._load_dienste_aus_result_dict(source_dict, pipeline_context, progress=progress) if progress is not None: progress.set_value(total_rows) progress.set_label("Pipeline abgeschlossen. Bitte OK klicken, um den Dialog zu schließen.") # 7) Log-Datei schreiben self._write_markdown_log(final_pfad, source_dict, pipeline_context, load_summary) print("=" * 60 + "\n") return pipeline_context def _load_dienste_aus_result_dict(self, source_dict: DataDict, pipeline_context: Dict[str, Any], progress: Optional[Any] = None) -> Dict[str, Any]: """Lädt Dienste (aus Linkliste) ins Projekt und persistiert optional mit Datenschreiber.""" rows = source_dict.get("rows", []) total = len(rows) loaded_count = 0 skipped_outside = 0 aborted = False if not rows: self._log("Keine Dienste zum Laden") return final_pfad = pipeline_context.get("source") or "" use_datenschreiber = bool(final_pfad) datenschreiber = None if use_datenschreiber: datenschreiber = Datenschreiber(self.pruefmanager, gpkg_path=final_pfad) daten_dict: Dict[str, Any] = {"daten": {}} raumfilter_layer = pipeline_context.get("raumfilter") raumfilter_name = pipeline_context.get("raumfilter_name", "unbekannt") raumfilter_crs_authid = None if raumfilter_layer is not None and hasattr(raumfilter_layer, "crs") and callable(getattr(raumfilter_layer, "crs")): try: crs = raumfilter_layer.crs() if crs is not None and hasattr(crs, "authid") and callable(getattr(crs, "authid")): raumfilter_crs_authid = crs.authid() except Exception: raumfilter_crs_authid = None # Für den späteren Filter benötigen wir entweder die reine Ausdehnung # (Pufferlayer) oder – im Falle eines echten Verfahrensgebiets – die # vollständige Geometrie. Die Filtermethode wird im Schleifenrumpf # ausgewählt. raumfilter_extent = None if raumfilter_layer is not None and getattr(raumfilter_layer, 'extent', None) is not None: raumfilter_extent = raumfilter_layer.extent() temp_layers: List[Any] = [] layer_loader = LayerLoader(self.pruefmanager, stil_pruefer=self.stil_pruefer, layer_pruefer=self.link_pruefer) # Statistiken für Log: Raumfilter-Info pro Dienst row_stats: List[Dict[str, Any]] = [] layer_call_status: Dict[str, str] = {} for idx, row in enumerate(rows, start=1): ident = str(row.get("ident") or "") provider = str(row.get("Provider", "")).lower() link = str(row.get("Link", "")) thema = str(row.get("Inhalt") or row.get("ident") or "Dienst") style = row.get("stildatei") daten_map = (pipeline_context.get("result_dict") or {}).get("daten", {}) fetched_features = daten_map.get(ident, []) if isinstance(daten_map, dict) else [] fetched_count = len(fetched_features) if isinstance(fetched_features, list) else None if progress is not None: progress.set_label(f"Lade Dienst {idx}/{total}: {thema}") progress.set_value(idx) if progress.is_canceled(): aborted = True layer_call_status[thema] = "abbruch_vor_layeraufruf" self._log("Nutzerabbruch: Pipeline gestoppt") self.pruefmanager.verarbeite( pruef_ergebnis( ok=False, meldung="Pipeline durch Benutzer abgebrochen", aktion="abbruch", kontext={"dienst": thema, "schritt": idx}, ) ) break self._log(f"Lade Dienst '{thema}' ({provider})") self._log(f"[DEBUG] Layeraufruf startet: thema='{thema}', provider='{provider}', link='{link}'") layer_call_status[thema] = "layeraufruf_start" layer = layer_loader.create_layer(provider, link, thema) if not layer: layer_call_status[thema] = "layer_nicht_ladbar" self._log(f"[DEBUG] Layeraufruf fehlgeschlagen: thema='{thema}'") row_stats.append({ "dienst": thema, "provider": provider, "link": link, "style": style or "", "datenabruf_features": fetched_count, "total_features": None, "filtered_features": None, "status": "layer_nicht_ladbar", "raumfilter": raumfilter_name, }) continue layer_call_status[thema] = "layeraufruf_ok" self._log(f"[DEBUG] Layeraufruf erfolgreich: thema='{thema}'") if progress is not None and progress.is_canceled(): aborted = True layer_call_status[thema] = "abbruch_nach_layeraufruf" self._log("Nutzerabbruch nach Layer-Erzeugung") break # Je nach Typ des Filters einen geeigneten Filter anwenden. if raumfilter_layer and raumfilter_name == "Verfahrensgebiet": # echte Geometrie-Schnittmenge, nicht nur BBox layer_for_write = layer_loader.filter_by_layer( layer, raumfilter_layer, cancel_callback=(progress.is_canceled if progress is not None else None), ) else: layer_for_write = layer_loader.filter_by_extent( layer, raumfilter_extent, cancel_callback=(progress.is_canceled if progress is not None else None), source_layer=raumfilter_layer, ) if progress is not None and progress.is_canceled(): aborted = True layer_call_status[thema] = "abbruch_nach_raumfilter" self._log("Nutzerabbruch nach Raumfilter") break # Zähle Features vor/nach Raumfilter total_features = None filtered_features = None try: if layer is not None and hasattr(layer, "featureCount"): total_features = int(layer.featureCount()) except Exception: total_features = None if layer_for_write is not None and hasattr(layer_for_write, "featureCount"): try: filtered_features = int(layer_for_write.featureCount()) except Exception: filtered_features = None if not layer_for_write: layer_call_status[thema] = "raumfilter_ausserhalb" self._log(f"Dienst {thema} ist außerhalb des Raumfilters") skipped_outside += 1 row_stats.append({ "dienst": thema, "provider": provider, "link": link, "style": style or "", "datenabruf_features": fetched_count, "total_features": total_features, "filtered_features": 0, "status": "außerhalb", "raumfilter": raumfilter_name, }) continue if style: layer_loader.apply_style(layer_for_write, style) row_stats.append({ "dienst": thema, "provider": provider, "link": link, "style": style or "", "datenabruf_features": fetched_count, "total_features": total_features, "filtered_features": filtered_features, "status": "geladen", "raumfilter": raumfilter_name, }) layer_call_status[thema] = "geladen" self._log(f"[DEBUG] Dienst geladen: thema='{thema}', provider='{provider}', filtered_features={filtered_features}") if provider == "wms": # WMS ist Raster und wird nicht in GPKG geschrieben. # Im temporären Modus wird er trotzdem direkt geladen. loaded_count += 1 if use_datenschreiber: self._log(f"WMS-Layer {thema} wird nicht in GPKG gespeichert, nur in Projekt (temporär)") # Während Datenbankmodus: wir speichern nicht in daten_dict, # aber für gute Sichtbarkeit laden wir nach erfolgreichem Schreibprozess. temp_layers.append(layer) else: temp_layers.append(layer) continue if use_datenschreiber and datenschreiber: daten_dict["daten"][thema] = { "layer": layer_for_write, "style_path": style, } else: temp_layers.append(layer_for_write) loaded_count += 1 if use_datenschreiber and datenschreiber and daten_dict["daten"]: self._log(f"Schreibe {len(daten_dict['daten'])} Layer in {final_pfad}") results = datenschreiber.schreibe_Daten( daten_dict, processed_results=pipeline_context.get("datenabruf_results", []), speicherort=final_pfad, ) datenschreiber.lade_Layer(results) self._log("Datenschreiber abgeschlossen") elif temp_layers: self._log(f"Temporärmodus: Lade {len(temp_layers)} Layer ins Projekt") for layer in temp_layers: QgsProject.instance().addMapLayer(layer) self._log("Temporärmodus: Layer im Projekt geladen") else: self._log("Keine Layer zum Laden (kein persistierter GPkg-Write).") self._log(f"Dienst-Laden fertig ({len(rows)} Zeilen)") return { "row_count": len(rows), "loaded_count": loaded_count, "skipped_outside": skipped_outside, "aborted": aborted, "row_stats": row_stats, "layer_call_status": layer_call_status, "raumfilter_name": raumfilter_name, } def _create_local_layer_from_fetched_features( self, thema: str, features: List[Any], crs_authid: Optional[str] = None, ) -> Optional[QgsVectorLayer]: """Erzeugt aus bereits geholten GeoJSON-Features einen lokalen OGR-Layer. Verhindert einen zweiten potentiell blockierenden Remote-Aufruf (WFS/REST). """ if not features: return None normalized_features: List[Dict[str, Any]] = [] detected_crs_authid: Optional[str] = None for feature in features: if not isinstance(feature, dict): continue # Fall 1: bereits GeoJSON-Feature if feature.get("type") == "Feature" and isinstance(feature.get("geometry"), dict): normalized_features.append(feature) continue # Fall 2: ArcGIS Feature-JSON -> GeoJSON konvertieren attributes = feature.get("attributes") geometry = feature.get("geometry") if not isinstance(attributes, dict) or not isinstance(geometry, dict): continue if detected_crs_authid is None: try: sr = geometry.get("spatialReference") if isinstance(sr, dict): wkid = sr.get("latestWkid") or sr.get("wkid") if wkid: detected_crs_authid = f"EPSG:{int(wkid)}" except Exception: detected_crs_authid = None geojson_geometry: Dict[str, Any] | None = None # Point if "x" in geometry and "y" in geometry: geojson_geometry = { "type": "Point", "coordinates": [geometry.get("x"), geometry.get("y")], } # MultiPoint elif isinstance(geometry.get("points"), list): geojson_geometry = { "type": "MultiPoint", "coordinates": geometry.get("points", []), } # LineString / MultiLineString elif isinstance(geometry.get("paths"), list): paths = geometry.get("paths", []) if len(paths) == 1: geojson_geometry = { "type": "LineString", "coordinates": paths[0], } else: geojson_geometry = { "type": "MultiLineString", "coordinates": paths, } # Polygon / MultiPolygon elif isinstance(geometry.get("rings"), list): rings = geometry.get("rings", []) cleaned_rings = [ ring for ring in rings if isinstance(ring, list) and len(ring) >= 4 ] if len(cleaned_rings) == 1: geojson_geometry = { "type": "Polygon", "coordinates": cleaned_rings, } elif len(cleaned_rings) > 1: # Robuster Fallback für ArcGIS-Ringe: # Mehrere Ringe werden als MultiPolygon behandelt, # damit nicht versehentlich alle Ringe als Löcher eines # einzigen Polygons interpretiert werden. geojson_geometry = { "type": "MultiPolygon", "coordinates": [[ring] for ring in cleaned_rings], } if geojson_geometry is None: continue normalized_features.append( { "type": "Feature", "geometry": geojson_geometry, "properties": attributes, } ) if not normalized_features: self._log(f"[DEBUG] Keine konvertierbaren Features für lokalen Layer: thema='{thema}'") return None try: payload = { "type": "FeatureCollection", "features": normalized_features, } with tempfile.NamedTemporaryFile( suffix=".geojson", delete=False, mode="w", encoding="utf-8", ) as fh: json.dump(payload, fh, ensure_ascii=False) tmp_path = fh.name layer = QgsVectorLayer(tmp_path, thema, "ogr") if layer and layer.isValid(): target_crs = detected_crs_authid or crs_authid if target_crs and QgsCoordinateReferenceSystem is not None and hasattr(layer, "setCrs"): try: layer.setCrs(QgsCoordinateReferenceSystem(target_crs)) except Exception: pass self._log( f"[DEBUG] Lokaler Layer gültig: thema='{thema}', " f"input_features={len(features)}, geojson_features={len(normalized_features)}, " f"layer_features={layer.featureCount()}, crs='{target_crs or 'unbekannt'}'" ) return layer self._log(f"[DEBUG] Lokaler Layer aus Datenabruf ungültig: thema='{thema}', pfad='{tmp_path}'") return None except Exception as exc: self._log(f"[DEBUG] Fehler beim Erzeugen lokaler Featureschicht für {thema}: {exc}") return None def _write_markdown_log( self, final_pfad: str, source_dict: DataDict, pipeline_context: Dict[str, Any], load_summary: Dict[str, Any], ) -> None: """Schreibt den Pipeline-Log (Markdown).""" lines = [ "# Plan41 Fachdaten-Ladevorgang", "", f"**Datum**: {datetime.datetime.now().isoformat()}", f"**Verfahrens-DB**: {final_pfad or 'temporär'}", f"**Linkliste**: {pipeline_context.get('linkliste')}", "", "## Zusammenfassung", "", f"- **Zeilen gesamt**: {load_summary.get('row_count', 0)}", f"- **Geladene Dienste**: {load_summary.get('loaded_count', 0)}", f"- **Außerhalb Raumfilter**: {load_summary.get('skipped_outside', 0)}", f"- **Abgebrochen**: {load_summary.get('aborted', False)}", f"- **Raumfilter**: {load_summary.get('raumfilter_name', 'unbekannt')}", f"- **Raumfilter-Typ**: {pipeline_context.get('raumfilter_name', 'unbekannt')}", "", "## Dienstliste", "", "| Dienst | Provider | Linkadresse | Aufrufstatus | Ergebnisstatus |", "|---|---|---|---|---|", ] status_by_dienst = { str(stat.get("dienst", "")): str(stat.get("status", "n/a")) for stat in load_summary.get("row_stats", []) } aufrufstatus_by_dienst = { str(key): str(value) for key, value in (load_summary.get("layer_call_status", {}) or {}).items() } for row in source_dict.get('rows', []): dienst = row.get('Inhalt') or row.get('ident') or '' provider = row.get('Provider') or '' link = row.get('Link') or '' aufrufstatus = aufrufstatus_by_dienst.get(str(dienst), "nicht_aufgerufen") ergebnisstatus = status_by_dienst.get(str(dienst), "n/a") lines.append(f"| {dienst} | {provider} | {link} | {aufrufstatus} | {ergebnisstatus} |") lines.extend([ "", "## Raumfilter-Statistik", "", "| Dienst | Provider | Linkadresse | Datenabruf-Objekte | Gesamt-Objekte | Gefilterte Objekte | Raumfilter | Status |", "|---|---|---|---|---|---|---|---|", ]) for stat in load_summary.get('row_stats', []): lines.append( f"| {stat.get('dienst', '')} | {stat.get('provider', '')} | {stat.get('link', '')} | {stat.get('datenabruf_features', 'n/a')} | {stat.get('total_features', 'n/a')} | {stat.get('filtered_features', 'n/a')} | {stat.get('raumfilter', '')} | {stat.get('status', 'n/a')} |" ) markdown = "\n".join(lines) if final_pfad: log_dir = os.path.dirname(final_pfad) os.makedirs(log_dir, exist_ok=True) log_file = os.path.join(log_dir, "plan41_lade_log.md") try: with open(log_file, "w", encoding="utf-8") as fh: fh.write(markdown) self.pruefmanager.verarbeite( pruef_ergebnis( ok=True, meldung=f"Lade-Log gespeichert: {log_file}", aktion="log_geschrieben", kontext={"log_file": log_file}, ) ) info("Lade-Log", f"Lade-Protokoll gespeichert: {log_file}", duration=10) except Exception as exc: self.pruefmanager.verarbeite( pruef_ergebnis( ok=False, meldung=f"Fehler beim Schreiben des Logle (md): {exc}", aktion="log_schreiben_fehlgeschlagen", kontext={"error": str(exc)}, ) ) warning("Lade-Log", f"Konnte Datei nicht schreiben: {exc}", duration=10) else: # temporärer Modus: nur anzeigen info("Lade-Log (temporär)", markdown, duration=20) def _clone_layer_with_extent(self, layer: QgsVectorLayer, extent, thema: str) -> QgsVectorLayer | None: """Erstellt eine Memory-Kopie von mit Geometrien im BBOX-Raumfilter.""" try: request = QgsFeatureRequest().setFilterRect(extent) features = list(layer.getFeatures(request)) if not features: return None geom_type_map = {0: "Point", 1: "LineString", 2: "Polygon"} geom_type = geom_type_map.get(layer.geometryType(), "Polygon") uri = f"{geom_type}?crs={layer.crs().authid()}" filtered_layer = QgsVectorLayer(uri, f"{thema}_BBOX", "memory") if not filtered_layer or not filtered_layer.isValid(): self._log(f"Fehler beim Erzeugen des temporären Filterlayers für {thema}") return None provider = filtered_layer.dataProvider() provider.addAttributes(layer.fields()) filtered_layer.updateFields() provider.addFeatures(features) filtered_layer.updateExtents() return filtered_layer except Exception as e: self._log(f"Fehler beim Filtern von {thema} nach Raumfilter: {e}") return None def _resolve_linkliste(self, linkliste: str | None) -> str | None: """ Prüft und normalisiert den Linklisten-Pfad. Rückgabe: - gültiger Pfad zur Linkliste (str) - None → Pipeline abbrechen """ # -------------------------------------------------- # Standard-Linkliste (plattformneutral) # -------------------------------------------------- plugin_root = get_plugin_root() standard_linkliste = join_path(plugin_root, "sn_plan41","assets", "Linkliste.xlsx") # -------------------------------------------------- # 🔹 LEERE EINGABE → AUTOMATISCH STANDARDDATEI # -------------------------------------------------- if not linkliste: linkliste_final = str(standard_linkliste) self.set_linkliste(linkliste_final) return linkliste_final # -------------------------------------------------- # Dateiprüfung nur bei expliziter Eingabe # -------------------------------------------------- pruefer = Dateipruefer( pfad=linkliste, leereingabe_erlaubt=True, standarddatei=str(standard_linkliste), ) ergebnis = pruefer.pruefe() # -------------------------------------------------- # Entscheidung über Pruefmanager # -------------------------------------------------- ergebnis = self.pruefmanager.verarbeite(ergebnis) if not ergebnis.ok: # Nutzer hat abgebrochen oder Fehler nicht bestätigt return None # -------------------------------------------------- # Erfolgsfall → geprüften Pfad übernehmen # -------------------------------------------------- linkliste_final = str(ergebnis.kontext) # Optional: Projektvariable aktualisieren self.set_linkliste(linkliste_final) return linkliste_final def _resolve_raumfilter(self, raumfilter: str, source: str) -> QgsVectorLayer | None: self._log(f"Raumfilter-Auswahl: '{raumfilter}'") self._log(f"Source: '{source}'") if raumfilter == "Verfahrensgebiet": layer = self._get_verfahrensgebiet_layer() self._log( "Verfahrensgebiet gefunden" if layer else "❌ Kein Verfahrensgebiet im Projekt" ) return layer if raumfilter == "Pufferlayer": self._log("Pufferlayer-Modus aktiv") return self._handle_pufferlayer(source) self._log("Kein Raumfilter gewählt") return None def _get_verfahrensgebiet_layer(self) -> QgsVectorLayer | None: layer_id = self.load_verfahrensgebiet_layer_id() self._log(f"Verfahrensgebiet-Layer-ID: {layer_id}") if not layer_id: self._log("❌ Keine Layer-ID gespeichert") return None layer = QgsProject.instance().mapLayer(layer_id) if not layer: self._log("❌ Layer-ID existiert nicht im Projekt") return None if not self.is_valid_verfahrensgebiet_layer(layer): self._log("❌ Layer ist kein gültiger Vektorlayer") return None self._log(f"Verfahrensgebiet-Layer OK: '{layer.name()}'") return layer def _handle_pufferlayer(self, source: str) -> QgsVectorLayer | None: self._log("Prüfe vorhandenen Pufferlayer im Projekt") existing = self._load_existing_pufferlayer() if existing: self._log("✔ Pufferlayer bereits im Projekt vorhanden") return existing self._log("Kein Pufferlayer im Projekt") if source: self._log("Prüfe Pufferlayer im Source") exists = self._pufferlayer_exists_in_source(source) self._log(f"Pufferlayer im Source vorhanden: {exists}") if exists: return self._load_existing_pufferlayer() or self._create_pufferlayer() self._log("Erzeuge neuen Pufferlayer") return self._create_pufferlayer() def _load_existing_pufferlayer(self) -> QgsVectorLayer | None: """ Liefert einen vorhandenen Pufferlayer aus dem Projekt. """ layers = QgsProject.instance().mapLayersByName("Pufferlayer") return layers[0] if layers else None def _create_pufferlayer(self) -> QgsVectorLayer | None: self._log("Starte Pufferlayer-Erstellung") basis_layer = self._get_verfahrensgebiet_layer() if not basis_layer: self._log("❌ Kein Verfahrensgebiet → kein Puffer möglich") return None source = self.load_verfahrens_db() self._log(f"Basislayer: '{basis_layer.name()}'") layer = self.Pufferlayer_erstellen( basis_layer=basis_layer, distance=1000, name="Pufferlayer", source=source, ) self._log( "✔ Pufferlayer erfolgreich erzeugt" if layer else "❌ Pufferlayer-Erstellung fehlgeschlagen" ) return layer from sn_basis.functions.qgiscore_wrapper import QgsVectorLayer def _pufferlayer_exists_in_source(self, source: str) -> bool: """ Prüft, ob im Source (z.B. GPKG) ein Layer namens 'Pufferlayer' existiert. """ if not source: return False uri = f"{source}|layername=Pufferlayer" layer = QgsVectorLayer(uri, "Pufferlayer", "ogr") return layer.isValid() def Pufferlayer_erstellen( self, basis_layer: QgsVectorLayer, distance: float, name: str, source: str | None = None, ) -> QgsVectorLayer | None: """ Erzeugt einen rechteckigen Pufferlayer (BBOX + Abstand) um das Verfahrensgebiet. - Ohne Source → temporärer Memory-Layer - Mit Source → Schreiben über Datenschreiber Parameters ---------- basis_layer : QgsVectorLayer Verfahrensgebiet-Layer. distance : float Pufferabstand in Metern. name : str Name des Ziel-Layers. source : str | None Zielquelle (z.B. Verfahrens-DB) oder None für temporär. Returns ------- QgsVectorLayer | None Neuer Pufferlayer oder None bei Fehler. """ if not basis_layer or not basis_layer.isValid(): self._log("❌ Basislayer ungültig – kein Puffer möglich") return None # -------------------------------------------------- # 1. Rechteck-Geometrie (Extent + Puffer) # -------------------------------------------------- extent = basis_layer.extent().buffered(distance) bbox_geom = QgsGeometry.fromRect(extent) # -------------------------------------------------- # 2. CRS übernehmen # -------------------------------------------------- crs_auth = basis_layer.crs().authid() uri = f"Polygon?crs={crs_auth}" mem_layer = QgsVectorLayer(uri, name, "memory") provider = mem_layer.dataProvider() provider.addAttributes([ QgsField("id", QVariant.Int) ]) mem_layer.updateFields() # -------------------------------------------------- # 4. Feature erzeugen # -------------------------------------------------- feat = QgsFeature(mem_layer.fields()) feat.setGeometry(bbox_geom) feat["id"] = 1 provider.addFeature(feat) mem_layer.updateExtents() # -------------------------------------------------- # 5. Temporärer Modus → direkt ins Projekt # -------------------------------------------------- if not source: QgsProject.instance().addMapLayer(mem_layer) self._log("✔ Temporärer rechteckiger Pufferlayer erzeugt") return mem_layer # -------------------------------------------------- # 6. Persistenter Modus → Datenschreiber # -------------------------------------------------- writer = Datenschreiber( pruefmanager=self.pruefmanager, gpkg_path=source, ) daten_dict = { "daten": { name: { "layer": mem_layer } } } results = writer.schreibe_Daten( daten_dict=daten_dict, processed_results=[], speicherort=source, ) if not results: self._log("❌ Schreiben des Pufferlayers fehlgeschlagen") return None writer.lade_Layer(results) layers = QgsProject.instance().mapLayersByName(name) if layers: self._log("✔ Persistenter rechteckiger Pufferlayer geladen") return layers[0] return None