Files
Plugin_SN_Basis/modules/LayerLoader.py
2026-03-12 16:14:02 +01:00

396 lines
16 KiB
Python

"""sn_basis/modules/LayerLoader.py
Kapselt Layer-Erstellung, Raumfilter und Stil-Logik.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
import time
from sn_basis.functions.os_wrapper import normalize_path, is_absolute_path
from sn_basis.functions.qgiscore_wrapper import (
QgsVectorLayer,
QgsRasterLayer,
QgsFeatureRequest,
QgsProject,
QgsNetworkAccessManager,
QgsCoordinateTransform,
)
from sn_basis.functions.sys_wrapper import get_plugin_root, join_path, file_exists
from sn_basis.modules.stilpruefer import Stilpruefer
from sn_basis.modules.layerpruefer import Layerpruefer
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
from sn_basis.functions import qt_wrapper as qt
class LayerLoader:
"""Lädt und filtert Layer aus Dienst-/Datenquellen."""
def __init__(
self,
pruefmanager: Any,
stil_pruefer: Optional[Stilpruefer] = None,
layer_pruefer: Optional[Layerpruefer] = None,
) -> None:
self.pruefmanager = pruefmanager
self.stil_pruefer = stil_pruefer or Stilpruefer()
self.layer_pruefer = layer_pruefer or Layerpruefer()
_LAYER_TIMEOUT_MS = 30_000 # 30 Sekunden
def _was_canceled(self, cancel_callback: Optional[Any]) -> bool:
if not callable(cancel_callback):
return False
try:
return bool(cancel_callback())
except Exception:
return False
def _process_events(self) -> None:
try:
if hasattr(qt, "QCoreApplication") and hasattr(qt.QCoreApplication, "processEvents"):
qt.QCoreApplication.processEvents()
except Exception:
pass
def _transform_geometry_to_layer_crs(self, geometry: Any, source_layer: Any, target_layer: Any) -> Any:
if geometry is None or source_layer is None or target_layer is None:
return geometry
if QgsCoordinateTransform is None or QgsProject is None:
return geometry
try:
source_crs = source_layer.crs() if hasattr(source_layer, "crs") else None
target_crs = target_layer.crs() if hasattr(target_layer, "crs") else None
if source_crs is None or target_crs is None:
return geometry
source_authid = source_crs.authid() if hasattr(source_crs, "authid") else None
target_authid = target_crs.authid() if hasattr(target_crs, "authid") else None
if source_authid and target_authid and source_authid == target_authid:
return geometry
ct = QgsCoordinateTransform(source_crs, target_crs, QgsProject.instance())
if hasattr(geometry, "clone") and callable(getattr(geometry, "clone")):
geom_copy = geometry.clone()
else:
geom_copy = geometry
geom_copy.transform(ct)
return geom_copy
except Exception:
return geometry
def _transform_extent_to_layer_crs(self, extent: Any, source_layer: Any, target_layer: Any) -> Any:
if extent is None or source_layer is None or target_layer is None:
return extent
if QgsCoordinateTransform is None or QgsProject is None:
return extent
try:
source_crs = source_layer.crs() if hasattr(source_layer, "crs") else None
target_crs = target_layer.crs() if hasattr(target_layer, "crs") else None
if source_crs is None or target_crs is None:
return extent
source_authid = source_crs.authid() if hasattr(source_crs, "authid") else None
target_authid = target_crs.authid() if hasattr(target_crs, "authid") else None
if source_authid and target_authid and source_authid == target_authid:
return extent
ct = QgsCoordinateTransform(source_crs, target_crs, QgsProject.instance())
if hasattr(ct, "transformBoundingBox"):
return ct.transformBoundingBox(extent)
return extent
except Exception:
return extent
def create_layer(self, provider: str, link: str, thema: str) -> Optional[QgsVectorLayer]:
provider_lower = provider.lower() if provider else ""
layer = None
# Netzwerk-Timeout für alle netzwerkbasierten Provider setzen
if provider_lower in ("wfs", "wms", "rest"):
try:
nam = QgsNetworkAccessManager.instance()
if hasattr(nam, "setTimeout"):
nam.setTimeout(self._LAYER_TIMEOUT_MS)
except Exception:
pass
try:
if provider_lower == "wfs":
uri = link if link.strip().lower().startswith("url=") else f"url={link}"
layer = QgsVectorLayer(uri, thema, "WFS")
elif provider_lower == "wms":
uri = link if link.strip().lower().startswith("url=") else f"url={link}"
layer = QgsRasterLayer(uri, thema, "wms")
elif provider_lower in ("ogr", "gpkg", "shp", "geojson"):
layer = QgsVectorLayer(link, thema, "ogr")
elif provider_lower == "rest":
rest_link = link.strip()
if rest_link.lower().endswith("/featureserver"):
rest_link = rest_link.rstrip("/") + "/0"
uri = rest_link if rest_link.lower().startswith("url=") else f"url={rest_link}"
layer = QgsVectorLayer(uri, thema, "arcgisfeatureserver")
else:
layer = QgsVectorLayer(link, thema, "ogr")
except Exception as exc:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Erstellen des Layers {thema}: {exc}",
aktion="layer_nicht_verfuegbar",
kontext={"provider": provider, "link": link},
)
)
return None
if not layer or not layer.isValid():
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Layer {thema} (Provider={provider}) konnte nicht geladen werden."
,aktion="layer_nicht_verfuegbar",
kontext={"provider": provider, "link": link},
)
)
return None
return layer
def apply_style(self, layer: QgsVectorLayer, style_path: Optional[str]) -> None:
if not style_path or layer is None or not layer.isValid():
return
if not style_path.strip():
return
if not is_absolute_path(style_path):
plugin_root = get_plugin_root()
style_path = str(join_path(plugin_root, "sn_plan41", "assets", style_path))
# normalize path for consistency
style_path = str(normalize_path(style_path))
# Debug: welche Stil-Datei wird geprüft?
print(f"[LayerLoader] Überprüfe Stildatei: '{style_path}'")
if file_exists(style_path):
try:
layer.loadNamedStyle(style_path)
layer.triggerRepaint()
except Exception as exc:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Stil-Laden für {layer.name()}: {exc}",
aktion="stil_laden_fehlgeschlagen",
kontext={"thema": layer.name(), "style_path": style_path},
)
)
else:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=True,
meldung=f"Stildatei nicht gefunden (optional): {style_path}",
aktion="stil_nicht_gefunden",
kontext={"thema": layer.name(), "style_path": style_path},
)
)
def filter_by_extent(self, layer: QgsVectorLayer, extent, cancel_callback: Optional[Any] = None, source_layer: Optional[Any] = None) -> Optional[QgsVectorLayer]:
"""Beschneidet <layer> auf die rechteckige Ausdehnung <extent>.
Diese Methode verwendet einen einfachen BBOX-Filter. Für komplexere
Raumeinschränkungen (z.B. Verfahrensgebiet) sollte stattdessen
:meth:`filter_by_layer` verwendet werden, da dort echte Geometrie-Tests
stattfinden.
"""
if not layer or not layer.isValid() or extent is None:
return layer
if layer.type() != QgsVectorLayer.VectorLayer:
return layer
extent_for_layer = self._transform_extent_to_layer_crs(extent, source_layer, layer)
request = QgsFeatureRequest().setFilterRect(extent_for_layer)
if hasattr(request, "setTimeout"):
try:
request.setTimeout(self._LAYER_TIMEOUT_MS)
except Exception:
pass
start = time.monotonic()
features: List[Any] = []
try:
for feat in layer.getFeatures(request):
if self._was_canceled(cancel_callback):
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Abbruch beim Raumfilter (BBOX) für {layer.name()}",
aktion="needs_user_action",
kontext={"thema": layer.name()},
)
)
return None
elapsed_ms = int((time.monotonic() - start) * 1000)
if elapsed_ms >= self._LAYER_TIMEOUT_MS:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Timeout beim Raumfilter (BBOX) für {layer.name()} nach {self._LAYER_TIMEOUT_MS // 1000}s",
aktion="url_nicht_erreichbar",
kontext={"thema": layer.name(), "timeout_s": self._LAYER_TIMEOUT_MS // 1000},
)
)
return None
features.append(feat)
if len(features) % 100 == 0:
self._process_events()
except Exception as exc:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Lesen der Features für {layer.name()}: {exc}",
aktion="layer_nicht_verfuegbar",
kontext={"thema": layer.name()},
)
)
return None
if not features:
return None
geom_type_map = {0: "Point", 1: "LineString", 2: "Polygon"}
geom_type = geom_type_map.get(layer.geometryType(), "Polygon")
uri = f"{geom_type}?crs={layer.crs().authid()}"
filtered_layer = QgsVectorLayer(uri, f"{layer.name()}_bbox", "memory")
if not filtered_layer or not filtered_layer.isValid():
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Erzeugen des Filter-Layers für {layer.name()}",
aktion="filterlayer_nicht_erzeugt",
kontext={"thema": layer.name()},
)
)
return None
provider = filtered_layer.dataProvider()
provider.addAttributes(layer.fields())
filtered_layer.updateFields()
provider.addFeatures(features)
filtered_layer.updateExtents()
return filtered_layer
def filter_by_layer(self, layer: QgsVectorLayer, filter_layer: QgsVectorLayer, cancel_callback: Optional[Any] = None) -> Optional[QgsVectorLayer]:
"""Beschneidet <layer> auf die tatsächliche Geometrie des
<filter_layer>.
Diese Methode wird z.B. für das Verfahrensgebiet verwendet, damit nicht
die gesamte Bounding-Box, sondern nur die echten Flächen als Raumfilter
gelten. Wenn der Filter-Layer mehrere Features enthält, werden deren
Geometrien zu einem Multi-Geom vereinigt.
"""
if not layer or not layer.isValid() or not filter_layer or not filter_layer.isValid():
return layer
if layer.type() != QgsVectorLayer.VectorLayer:
return layer
# vereinigte Geometrie aller Features im Filter-Layer
union_geom = None
for f in filter_layer.getFeatures():
try:
geom = self._transform_geometry_to_layer_crs(f.geometry(), filter_layer, layer)
if union_geom is None:
union_geom = geom
else:
union_geom = union_geom.combine(geom)
except Exception:
# bei einem Fehler einfach weiterfahren
continue
if union_geom is None or union_geom.isEmpty():
return None
# nun alle Features aus <layer> nehmen, deren Geometrie sich schneidet
filtered = []
request = QgsFeatureRequest().setFilterRect(union_geom.boundingBox())
if hasattr(request, "setTimeout"):
try:
request.setTimeout(self._LAYER_TIMEOUT_MS)
except Exception:
pass
start = time.monotonic()
for f in layer.getFeatures(request):
if self._was_canceled(cancel_callback):
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Abbruch beim Raumfilter (Geometrie) für {layer.name()}",
aktion="needs_user_action",
kontext={"thema": layer.name()},
)
)
return None
elapsed_ms = int((time.monotonic() - start) * 1000)
if elapsed_ms >= self._LAYER_TIMEOUT_MS:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Timeout beim Raumfilter (Geometrie) für {layer.name()} nach {self._LAYER_TIMEOUT_MS // 1000}s",
aktion="url_nicht_erreichbar",
kontext={"thema": layer.name(), "timeout_s": self._LAYER_TIMEOUT_MS // 1000},
)
)
return None
try:
if f.geometry() and f.geometry().intersects(union_geom):
filtered.append(f)
except Exception:
continue
if len(filtered) % 100 == 0:
self._process_events()
if not filtered:
return None
geom_type_map = {0: "Point", 1: "LineString", 2: "Polygon"}
geom_type = geom_type_map.get(layer.geometryType(), "Polygon")
uri = f"{geom_type}?crs={layer.crs().authid()}"
filtered_layer = QgsVectorLayer(uri, f"{layer.name()}_filtered", "memory")
if not filtered_layer or not filtered_layer.isValid():
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Erzeugen des Filter-Layers für {layer.name()}",
aktion="filterlayer_nicht_erzeugt",
kontext={"thema": layer.name()},
)
)
return None
provider = filtered_layer.dataProvider()
provider.addAttributes(layer.fields())
filtered_layer.updateFields()
provider.addFeatures(filtered)
filtered_layer.updateExtents()
return filtered_layer
def add_to_project(self, layer: QgsVectorLayer) -> None:
if layer and layer.isValid():
QgsProject.instance().addMapLayer(layer)