Files
Plugin_SN_Basis/modules/Datenabruf.py

406 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# sn_basis/modules/Datenabruf.py
"""
Modul ``datenabruf``
Enthält die Klasse :class:`Datenabruf`, die für eine Menge bereits
validierter Links (aus ``validate_rows``) die Fachdaten abruft und
aggregierte Prüfergebnisse liefert.
Designprinzipien
----------------
- Die BBOX wird serverseitig angewendet: wenn ein Raumfilter aktiv ist,
wird die BBOX in die Abruf-URL eingebettet (außer bei WMS).
- Alle QGIS-Interaktionen laufen über die Wrapper `qgiscore_wrapper` und
`qgisui_wrapper`.
- Fehler werden als kurze Strings zurückgegeben und zentral in `log_fehler`
gesammelt; erfolgreiche Aufrufe werden in `log_geladen` protokolliert.
- Die Methode ist pdoc-kompatibel dokumentiert und bewusst einfach gehalten.
"""
from typing import Any, Dict, List, Mapping, Optional, Tuple
from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse
import json
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
from sn_basis.functions import qgiscore_wrapper as qgiscore
from sn_basis.functions import qgisui_wrapper as qgisui
from sn_basis.functions import qt_wrapper as qt
DataDict = Dict[str, List[Mapping[str, Any]]]
class Datenabruf:
"""
Führt den eigentlichen Fachdatenabruf für eine Menge validierter Links durch.
Erwartet ein ``DataDict`` der Form ``{"rows": [row1, row2, ...]}``.
"""
def __init__(self, pruefmanager: Any) -> None:
"""
Initialisiert eine neue Instanz des Datenabrufs.
Parameters
----------
pruefmanager:
Instanz des Pruefmanagers, der :class:`pruef_ergebnis` verarbeitet.
"""
self.pruefmanager = pruefmanager
# ------------------------------------------------------------------ #
# Öffentliche API
# ------------------------------------------------------------------ #
def datenabruf(
self,
result_dict: DataDict,
raumfilter: str,
verfahrensgebiet_layer: Any,
speicherort: str,
pruef_ergebnisse: Optional[List[Any]] = None,
) -> Tuple[Dict[str, Any], List[Any]]:
"""
Ruft für alle Zeilen in ``result_dict["rows"]`` die Fachdaten ab und
liefert ein DatenDict sowie die Liste verarbeiteter Pruefergebnisse.
Logging / Aggregation
---------------------
Am Ende enthält das zusammenfassende PruefErgebnis im Kontext:
- geladen: dict(dienst -> anzahl geladen)
- fehler: dict(dienst -> fehlermeldung)
- relevant: dict(dienst -> anzahl relevant)
- ausserhalb: dict(dienst -> anzahl geladen, aber ausserhalb)
"""
if pruef_ergebnisse is None:
processed_results: List[Any] = []
else:
processed_results = list(pruef_ergebnisse)
rows = result_dict.get("rows", [])
daten: Dict[str, List[Any]] = {}
# 1) Räumliche Filtergeometrie bestimmen (BBox oder None)
bbox_geom = self._determine_spatial_filter(raumfilter, verfahrensgebiet_layer)
# Globale Logs über alle Dienste hinweg
log_geladen: Dict[str, int] = {}
log_fehler: Dict[str, str] = {}
log_relevant: Dict[str, int] = {}
log_ausserhalb: Dict[str, int] = {}
# 2) Über alle Zeilen iterieren
for row in rows:
ident = row.get("ident")
link = row.get("Link")
provider = row.get("Provider")
if not ident or not link or not provider:
pe = pruef_ergebnis(
ok=False,
meldung="Ungültige Zeile im Datenabruf (fehlende Pflichtfelder)",
aktion="pflichtfelder_fehlen",
kontext=row,
)
processed_results.append(self.pruefmanager.verarbeite(pe))
continue
# Lesbarer Dienstname für Logs
thema = row.get("Inhalt") or row.get("Thema") or row.get("Titel") or str(ident)
# 2a) Provider-spezifische URL zusammenbauen
# Wenn Raumfilter aktiv ist, übergeben wir bbox_geom an _build_provider_url,
# außer bei WMS (WMS bleibt unverändert).
use_bbox = (raumfilter != "ohne") and (str(provider).upper() != "WMS")
url = self._build_provider_url(link=link, provider=str(provider), bbox_geom=bbox_geom if use_bbox else None)
# 2b) Fachdaten abrufen
features, error_msg = self._fetch_features(url=url, provider=str(provider))
# 2c) Logs und Aggregation
if error_msg:
# Fehler beim Abruf
log_fehler[thema] = error_msg
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Abruf von {thema}: {error_msg}",
aktion="url_nicht_erreichbar",
kontext={"ident": ident, "thema": thema, "url": url, "error": error_msg},
)
processed_results.append(self.pruefmanager.verarbeite(pe_err))
# daten[ident] bleibt nicht gesetzt oder leer
daten[str(ident)] = []
continue
# Erfolgreich aufgerufen (auch wenn features == [])
anzahl_geladen = len(features)
log_geladen[thema] = anzahl_geladen
# Da die BBOX serverseitig angewendet wurde:
# - anzahl_geladen > 0 -> relevant
# - anzahl_geladen == 0 -> ausserhalb
if anzahl_geladen > 0:
log_relevant[thema] = anzahl_geladen
daten[str(ident)] = features
else:
log_ausserhalb[thema] = 0
daten[str(ident)] = []
# 2d) Kurzes Prüfergebnis pro Zeile
pe_row = pruef_ergebnis(
ok=True,
meldung=(
f"Datenabruf für ident={ident}: {anzahl_geladen} geladene Objekte"
),
aktion="datenabruf",
kontext={
"ident": ident,
"thema": thema,
"anzahl_gesamt": anzahl_geladen,
"url": url,
},
)
processed_results.append(self.pruefmanager.verarbeite(pe_row))
# 3) Zusammenfassendes Prüfergebnis (wie alter DataGrabber)
summary_kontext = {
"geladen": log_geladen,
"fehler": log_fehler,
"relevant": log_relevant,
"ausserhalb": log_ausserhalb,
}
pe_summary = pruef_ergebnis(
ok=(len(log_fehler) == 0),
meldung=(
f"Datenabruf abgeschlossen: {len(log_geladen)} Dienste geladen, "
f"{len(log_fehler)} Fehler"
),
aktion="datenabruf",
kontext=summary_kontext,
)
processed_results.append(self.pruefmanager.verarbeite(pe_summary))
daten_dict: Dict[str, Any] = {
"speicherort": speicherort,
"daten": daten,
}
return daten_dict, processed_results
# ------------------------------------------------------------------ #
# Hilfsmethoden: räumlicher Filter
# ------------------------------------------------------------------ #
def _determine_spatial_filter(self, raumfilter: str, verfahrensgebiet_layer: Any) -> Optional[Any]:
"""
Bestimmt die räumliche Filtergeometrie (BBox) abhängig vom Raumfilter.
Returns
-------
Optional[Any]
Eine Geometrie/Extent (z. B. QgsRectangle) oder ``None``.
"""
if raumfilter == "ohne":
return None
if verfahrensgebiet_layer is None:
return None
if raumfilter == "Verfahrensgebiet":
return qgiscore.get_layer_extent(verfahrensgebiet_layer)
if raumfilter == "Pufferlayer":
buffer_layer = qgiscore.create_buffer_layer(
source_layer=verfahrensgebiet_layer,
distance_m=1000.0,
layer_name="Verfahrensgebiet_Puffer_1km",
)
if buffer_layer is not None:
qgisui.add_layer_to_project(buffer_layer)
return qgiscore.get_layer_extent(buffer_layer)
return None
# ------------------------------------------------------------------ #
# Hilfsmethoden: Provider-URL und Datenabruf
# ------------------------------------------------------------------ #
def _build_provider_url(self, link: str, provider: str, bbox_geom: Optional[Any]) -> str:
"""
Baut eine Provider-spezifische Abruf-URL. Wenn `bbox_geom` übergeben
wird, wird sie in die URL eingebettet (außer bei WMS).
Erwartet: provider ist gesetzt (z. B. "WFS", "REST", "OGR", "WMS").
"""
provider_norm = (provider or "").upper()
base_link = link or ""
# WMS: niemals BBOX anhängen
if provider_norm == "WMS":
return base_link
if bbox_geom is None:
return base_link
# Versuche bbox-String zu erzeugen (nutzt qgiscore.extent_to_bbox_string wenn vorhanden)
bbox_str: Optional[str] = None
try:
extent_to_bbox = getattr(__import__("sn_basis.functions.qgiscore_wrapper", fromlist=["qgiscore_wrapper"]), "extent_to_bbox_string", None)
if callable(extent_to_bbox):
bbox_str = extent_to_bbox(bbox_geom)
else:
# Fallback: einfache xmin/ymin/xmax/ymax-Extraktion (duck-typing)
if hasattr(bbox_geom, "xmin") and callable(getattr(bbox_geom, "xmin")):
bbox_str = f"{bbox_geom.xmin()},{bbox_geom.ymin()},{bbox_geom.xmax()},{bbox_geom.ymax()}"
elif isinstance(bbox_geom, (tuple, list)) and len(bbox_geom) == 4:
bbox_str = f"{bbox_geom[0]},{bbox_geom[1]},{bbox_geom[2]},{bbox_geom[3]}"
else:
bbox_str = str(bbox_geom)
except Exception:
bbox_str = None
if not bbox_str:
return base_link
parsed = urlparse(base_link)
query_params = dict(parse_qsl(parsed.query, keep_blank_values=True))
if provider_norm == "WFS":
query_params.setdefault("BBOX", bbox_str)
new_query = urlencode(query_params, doseq=True)
rebuilt = parsed._replace(query=new_query)
return urlunparse(rebuilt)
if provider_norm in ("REST", "ARCGIS", "ARCGISFEATURESERVER", "ARCGIS_FEATURESERVER"):
query_params.setdefault("geometry", bbox_str)
query_params.setdefault("geometryType", "esriGeometryEnvelope")
query_params.setdefault("spatialRel", "esriSpatialRelIntersects")
query_params.setdefault("f", query_params.get("f", "json"))
new_query = urlencode(query_params, doseq=True)
rebuilt = parsed._replace(query=new_query)
return urlunparse(rebuilt)
# Default: generischer bbox-Parameter
query_params.setdefault("bbox", bbox_str)
new_query = urlencode(query_params, doseq=True)
rebuilt = parsed._replace(query=new_query)
return urlunparse(rebuilt)
def _fetch_features(self, url: str, provider: str) -> Tuple[List[Any], Optional[str]]:
"""
Führt den eigentlichen Abruf der Fachdaten durch.
Returns
-------
Tuple[List[Any], Optional[str]]
- features: Liste der geladenen Features (ggf. leer)
- error_msg: None bei Erfolg, sonst kurzer Fehlertext
"""
features: List[Any] = []
prov = str(provider).upper()
# WMS: kein Featureabruf; caller behandelt WMS separat (hier defensiv)
if prov == "WMS":
return [], None
# OGR / lokale Dateien: versuche QGIS-Layer (wenn QGIS verfügbar)
if prov in ("OGR", "GPKG", "SHP", "GEOJSON"):
if getattr(qgiscore, "QGIS_AVAILABLE", False):
try:
layer = qgiscore.QgsVectorLayer(url, "tmp", "ogr")
if not layer or not getattr(layer, "isValid", lambda: False)():
return [], "Layer ungültig oder konnte nicht geladen werden"
for feat in layer.getFeatures():
features.append(feat)
return features, None
except FileNotFoundError:
return [], "Lokale Datei nicht gefunden"
except Exception as exc:
return [], f"Fehler beim Laden der OGR-Quelle: {exc}"
else:
# Mock: falls GeoJSON-Datei vorhanden, versuche lokale Datei zu lesen
try:
if url.lower().endswith(".geojson"):
with open(url, "r", encoding="utf-8") as fh:
data = json.load(fh)
if isinstance(data, dict) and data.get("type") == "FeatureCollection":
return data.get("features", []), None
return [], "Keine QGIS-Umgebung und keine lesbare lokale GeoJSON"
except FileNotFoundError:
return [], "Lokale Datei nicht gefunden"
except Exception as exc:
return [], f"Fehler beim Lesen lokaler GeoJSON (Mock): {exc}"
# HTTP-basierte Dienste (WFS, REST/ArcGIS, generisch)
response_text: Optional[str] = None
http_error: Optional[str] = None
# QGIS NetworkAccessManager bevorzugen
if getattr(qgiscore, "QGIS_AVAILABLE", False) and getattr(qgiscore, "QgsNetworkAccessManager", None) is not None:
try:
manager = qgiscore.QgsNetworkAccessManager.instance()
QUrl = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QUrl", None)
QNetworkRequest = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QNetworkRequest", None)
QEventLoop = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QEventLoop", None)
if QUrl is not None and QNetworkRequest is not None:
req = QNetworkRequest(QUrl(url))
reply = manager.get(req)
if QEventLoop is not None:
loop = QEventLoop()
reply.finished.connect(loop.quit)
loop.exec()
try:
raw = reply.readAll()
data_bytes = bytes(raw) if hasattr(raw, "__bytes__") else raw
response_text = data_bytes.decode("utf-8", errors="replace")
except Exception:
try:
response_text = reply.text()
except Exception:
response_text = None
except Exception as exc:
http_error = f"QgsNetworkAccessManager error: {exc}"
response_text = None
# Fallback: requests
if response_text is None:
try:
import requests # lokal import, keine harte Abhängigkeit
r = requests.get(url, timeout=30)
r.raise_for_status()
response_text = r.text
except Exception as exc:
http_error = f"requests error: {exc}"
response_text = None
if response_text is None:
return [], http_error or "keine Antwort vom Server"
# Versuche JSON/GeoJSON zu parsen
try:
parsed = json.loads(response_text)
if isinstance(parsed, dict) and parsed.get("type") == "FeatureCollection":
return parsed.get("features", []), None
if isinstance(parsed, dict) and "features" in parsed:
return parsed.get("features", []), None
# Sonst: gib das gesamte JSON als einzelnes Objekt zurück
return [parsed], None
except json.JSONDecodeError:
# Nicht-JSON-Antwort (z. B. GML). Wenn QGIS verfügbar, versuche GML via temporärer Datei + OGR
if getattr(qgiscore, "QGIS_AVAILABLE", False):
try:
import tempfile
with tempfile.NamedTemporaryFile(suffix=".gml", delete=False, mode="w", encoding="utf-8") as fh:
fh.write(response_text)
tmp_path = fh.name
layer = qgiscore.QgsVectorLayer(tmp_path, "tmp_gml", "ogr")
if layer and getattr(layer, "isValid", lambda: False)():
for feat in layer.getFeatures():
features.append(feat)
return features, None
return [], "GML-Antwort konnte nicht als Layer geladen werden"
except Exception as exc:
return [], f"Fehler beim Parsen von GML: {exc}"
# Wenn alles fehlschlägt:
return [], "Antwort konnte nicht als JSON oder GML geparst werden"