"""sn_basis/modules/LayerLoader.py Kapselt Layer-Erstellung, Raumfilter und Stil-Logik. """ from __future__ import annotations from typing import Any, Dict, List, Optional import time from sn_basis.functions.os_wrapper import normalize_path, is_absolute_path from sn_basis.functions.qgiscore_wrapper import ( QgsVectorLayer, QgsRasterLayer, QgsFeatureRequest, QgsProject, QgsNetworkAccessManager, QgsCoordinateTransform, ) from sn_basis.functions.sys_wrapper import get_plugin_root, join_path, file_exists from sn_basis.modules.stilpruefer import Stilpruefer from sn_basis.modules.layerpruefer import Layerpruefer from sn_basis.modules.pruef_ergebnis import pruef_ergebnis from sn_basis.functions import qt_wrapper as qt class LayerLoader: """Lädt und filtert Layer aus Dienst-/Datenquellen.""" def __init__( self, pruefmanager: Any, stil_pruefer: Optional[Stilpruefer] = None, layer_pruefer: Optional[Layerpruefer] = None, ) -> None: self.pruefmanager = pruefmanager self.stil_pruefer = stil_pruefer or Stilpruefer() self.layer_pruefer = layer_pruefer or Layerpruefer() _LAYER_TIMEOUT_MS = 30_000 # 30 Sekunden def _was_canceled(self, cancel_callback: Optional[Any]) -> bool: if not callable(cancel_callback): return False try: return bool(cancel_callback()) except Exception: return False def _process_events(self) -> None: try: if hasattr(qt, "QCoreApplication") and hasattr(qt.QCoreApplication, "processEvents"): qt.QCoreApplication.processEvents() except Exception: pass def _transform_geometry_to_layer_crs(self, geometry: Any, source_layer: Any, target_layer: Any) -> Any: if geometry is None or source_layer is None or target_layer is None: return geometry if QgsCoordinateTransform is None or QgsProject is None: return geometry try: source_crs = source_layer.crs() if hasattr(source_layer, "crs") else None target_crs = target_layer.crs() if hasattr(target_layer, "crs") else None if source_crs is None or target_crs is None: return geometry source_authid = source_crs.authid() if hasattr(source_crs, "authid") else None target_authid = target_crs.authid() if hasattr(target_crs, "authid") else None if source_authid and target_authid and source_authid == target_authid: return geometry ct = QgsCoordinateTransform(source_crs, target_crs, QgsProject.instance()) if hasattr(geometry, "clone") and callable(getattr(geometry, "clone")): geom_copy = geometry.clone() else: geom_copy = geometry geom_copy.transform(ct) return geom_copy except Exception: return geometry def _transform_extent_to_layer_crs(self, extent: Any, source_layer: Any, target_layer: Any) -> Any: if extent is None or source_layer is None or target_layer is None: return extent if QgsCoordinateTransform is None or QgsProject is None: return extent try: source_crs = source_layer.crs() if hasattr(source_layer, "crs") else None target_crs = target_layer.crs() if hasattr(target_layer, "crs") else None if source_crs is None or target_crs is None: return extent source_authid = source_crs.authid() if hasattr(source_crs, "authid") else None target_authid = target_crs.authid() if hasattr(target_crs, "authid") else None if source_authid and target_authid and source_authid == target_authid: return extent ct = QgsCoordinateTransform(source_crs, target_crs, QgsProject.instance()) if hasattr(ct, "transformBoundingBox"): return ct.transformBoundingBox(extent) return extent except Exception: return extent def create_layer(self, provider: str, link: str, thema: str) -> Optional[QgsVectorLayer]: provider_lower = provider.lower() if provider else "" layer = None # Netzwerk-Timeout für alle netzwerkbasierten Provider setzen if provider_lower in ("wfs", "wms", "rest"): try: nam = QgsNetworkAccessManager.instance() if hasattr(nam, "setTimeout"): nam.setTimeout(self._LAYER_TIMEOUT_MS) except Exception: pass try: if provider_lower == "wfs": uri = link if link.strip().lower().startswith("url=") else f"url={link}" layer = QgsVectorLayer(uri, thema, "WFS") elif provider_lower == "wms": uri = link if link.strip().lower().startswith("url=") else f"url={link}" layer = QgsRasterLayer(uri, thema, "wms") elif provider_lower in ("ogr", "gpkg", "shp", "geojson"): layer = QgsVectorLayer(link, thema, "ogr") elif provider_lower == "rest": rest_link = link.strip() if rest_link.lower().endswith("/featureserver"): rest_link = rest_link.rstrip("/") + "/0" uri = rest_link if rest_link.lower().startswith("url=") else f"url={rest_link}" layer = QgsVectorLayer(uri, thema, "arcgisfeatureserver") else: layer = QgsVectorLayer(link, thema, "ogr") except Exception as exc: self.pruefmanager.verarbeite( pruef_ergebnis( ok=False, meldung=f"Fehler beim Erstellen des Layers {thema}: {exc}", aktion="layer_nicht_verfuegbar", kontext={"provider": provider, "link": link}, ) ) return None if not layer or not layer.isValid(): self.pruefmanager.verarbeite( pruef_ergebnis( ok=False, meldung=f"Layer {thema} (Provider={provider}) konnte nicht geladen werden." ,aktion="layer_nicht_verfuegbar", kontext={"provider": provider, "link": link}, ) ) return None return layer def apply_style(self, layer: QgsVectorLayer, style_path: Optional[str]) -> None: if not style_path or layer is None or not layer.isValid(): return if not style_path.strip(): return if not is_absolute_path(style_path): plugin_root = get_plugin_root() style_path = str(join_path(plugin_root, "sn_plan41", "assets", style_path)) # normalize path for consistency style_path = str(normalize_path(style_path)) # Debug: welche Stil-Datei wird geprüft? print(f"[LayerLoader] Überprüfe Stildatei: '{style_path}'") if file_exists(style_path): try: layer.loadNamedStyle(style_path) layer.triggerRepaint() except Exception as exc: self.pruefmanager.verarbeite( pruef_ergebnis( ok=False, meldung=f"Fehler beim Stil-Laden für {layer.name()}: {exc}", aktion="stil_laden_fehlgeschlagen", kontext={"thema": layer.name(), "style_path": style_path}, ) ) else: self.pruefmanager.verarbeite( pruef_ergebnis( ok=True, meldung=f"Stildatei nicht gefunden (optional): {style_path}", aktion="stil_nicht_gefunden", kontext={"thema": layer.name(), "style_path": style_path}, ) ) def filter_by_extent(self, layer: QgsVectorLayer, extent, cancel_callback: Optional[Any] = None, source_layer: Optional[Any] = None) -> Optional[QgsVectorLayer]: """Beschneidet auf die rechteckige Ausdehnung . Diese Methode verwendet einen einfachen BBOX-Filter. Für komplexere Raumeinschränkungen (z.B. Verfahrensgebiet) sollte stattdessen :meth:`filter_by_layer` verwendet werden, da dort echte Geometrie-Tests stattfinden. """ if not layer or not layer.isValid() or extent is None: return layer if layer.type() != QgsVectorLayer.VectorLayer: return layer extent_for_layer = self._transform_extent_to_layer_crs(extent, source_layer, layer) request = QgsFeatureRequest().setFilterRect(extent_for_layer) if hasattr(request, "setTimeout"): try: request.setTimeout(self._LAYER_TIMEOUT_MS) except Exception: pass start = time.monotonic() features: List[Any] = [] try: for feat in layer.getFeatures(request): if self._was_canceled(cancel_callback): self.pruefmanager.verarbeite( pruef_ergebnis( ok=False, meldung=f"Abbruch beim Raumfilter (BBOX) für {layer.name()}", aktion="needs_user_action", kontext={"thema": layer.name()}, ) ) return None elapsed_ms = int((time.monotonic() - start) * 1000) if elapsed_ms >= self._LAYER_TIMEOUT_MS: self.pruefmanager.verarbeite( pruef_ergebnis( ok=False, meldung=f"Timeout beim Raumfilter (BBOX) für {layer.name()} nach {self._LAYER_TIMEOUT_MS // 1000}s", aktion="url_nicht_erreichbar", kontext={"thema": layer.name(), "timeout_s": self._LAYER_TIMEOUT_MS // 1000}, ) ) return None features.append(feat) if len(features) % 100 == 0: self._process_events() except Exception as exc: self.pruefmanager.verarbeite( pruef_ergebnis( ok=False, meldung=f"Fehler beim Lesen der Features für {layer.name()}: {exc}", aktion="layer_nicht_verfuegbar", kontext={"thema": layer.name()}, ) ) return None if not features: return None geom_type_map = {0: "Point", 1: "LineString", 2: "Polygon"} geom_type = geom_type_map.get(layer.geometryType(), "Polygon") uri = f"{geom_type}?crs={layer.crs().authid()}" filtered_layer = QgsVectorLayer(uri, f"{layer.name()}_bbox", "memory") if not filtered_layer or not filtered_layer.isValid(): self.pruefmanager.verarbeite( pruef_ergebnis( ok=False, meldung=f"Fehler beim Erzeugen des Filter-Layers für {layer.name()}", aktion="filterlayer_nicht_erzeugt", kontext={"thema": layer.name()}, ) ) return None provider = filtered_layer.dataProvider() provider.addAttributes(layer.fields()) filtered_layer.updateFields() provider.addFeatures(features) filtered_layer.updateExtents() return filtered_layer def filter_by_layer(self, layer: QgsVectorLayer, filter_layer: QgsVectorLayer, cancel_callback: Optional[Any] = None) -> Optional[QgsVectorLayer]: """Beschneidet auf die tatsächliche Geometrie des . Diese Methode wird z.B. für das Verfahrensgebiet verwendet, damit nicht die gesamte Bounding-Box, sondern nur die echten Flächen als Raumfilter gelten. Wenn der Filter-Layer mehrere Features enthält, werden deren Geometrien zu einem Multi-Geom vereinigt. """ if not layer or not layer.isValid() or not filter_layer or not filter_layer.isValid(): return layer if layer.type() != QgsVectorLayer.VectorLayer: return layer # vereinigte Geometrie aller Features im Filter-Layer union_geom = None for f in filter_layer.getFeatures(): try: geom = self._transform_geometry_to_layer_crs(f.geometry(), filter_layer, layer) if union_geom is None: union_geom = geom else: union_geom = union_geom.combine(geom) except Exception: # bei einem Fehler einfach weiterfahren continue if union_geom is None or union_geom.isEmpty(): return None # nun alle Features aus nehmen, deren Geometrie sich schneidet filtered = [] request = QgsFeatureRequest().setFilterRect(union_geom.boundingBox()) if hasattr(request, "setTimeout"): try: request.setTimeout(self._LAYER_TIMEOUT_MS) except Exception: pass start = time.monotonic() for f in layer.getFeatures(request): if self._was_canceled(cancel_callback): self.pruefmanager.verarbeite( pruef_ergebnis( ok=False, meldung=f"Abbruch beim Raumfilter (Geometrie) für {layer.name()}", aktion="needs_user_action", kontext={"thema": layer.name()}, ) ) return None elapsed_ms = int((time.monotonic() - start) * 1000) if elapsed_ms >= self._LAYER_TIMEOUT_MS: self.pruefmanager.verarbeite( pruef_ergebnis( ok=False, meldung=f"Timeout beim Raumfilter (Geometrie) für {layer.name()} nach {self._LAYER_TIMEOUT_MS // 1000}s", aktion="url_nicht_erreichbar", kontext={"thema": layer.name(), "timeout_s": self._LAYER_TIMEOUT_MS // 1000}, ) ) return None try: if f.geometry() and f.geometry().intersects(union_geom): filtered.append(f) except Exception: continue if len(filtered) % 100 == 0: self._process_events() if not filtered: return None geom_type_map = {0: "Point", 1: "LineString", 2: "Polygon"} geom_type = geom_type_map.get(layer.geometryType(), "Polygon") uri = f"{geom_type}?crs={layer.crs().authid()}" filtered_layer = QgsVectorLayer(uri, f"{layer.name()}_filtered", "memory") if not filtered_layer or not filtered_layer.isValid(): self.pruefmanager.verarbeite( pruef_ergebnis( ok=False, meldung=f"Fehler beim Erzeugen des Filter-Layers für {layer.name()}", aktion="filterlayer_nicht_erzeugt", kontext={"thema": layer.name()}, ) ) return None provider = filtered_layer.dataProvider() provider.addAttributes(layer.fields()) filtered_layer.updateFields() provider.addFeatures(filtered) filtered_layer.updateExtents() return filtered_layer def add_to_project(self, layer: QgsVectorLayer) -> None: if layer and layer.isValid(): QgsProject.instance().addMapLayer(layer)