Files
Plugin_SN_Basis/modules/Datenabruf.py
2026-03-12 16:14:02 +01:00

593 lines
25 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# sn_basis/modules/Datenabruf.py
"""
Modul ``datenabruf``
Enthält die Klasse :class:`Datenabruf`, die für eine Menge bereits
validierter Links (aus ``validate_rows``) die Fachdaten abruft und
aggregierte Prüfergebnisse liefert.
Designprinzipien
----------------
- Die BBOX wird serverseitig angewendet: wenn ein Raumfilter aktiv ist,
wird die BBOX in die Abruf-URL eingebettet (außer bei WMS).
- Alle QGIS-Interaktionen laufen über die Wrapper `qgiscore_wrapper` und
`qgisui_wrapper`.
- Fehler werden als kurze Strings zurückgegeben und zentral in `log_fehler`
gesammelt; erfolgreiche Aufrufe werden in `log_geladen` protokolliert.
- Die Methode ist pdoc-kompatibel dokumentiert und bewusst einfach gehalten.
"""
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple
from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse
import json
import time
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
from sn_basis.functions import qgiscore_wrapper as qgiscore
from sn_basis.functions import qgisui_wrapper as qgisui
from sn_basis.functions import qt_wrapper as qt
DataDict = Dict[str, List[Mapping[str, Any]]]
class Datenabruf:
"""
Führt den eigentlichen Fachdatenabruf für eine Menge validierter Links durch.
Erwartet ein ``DataDict`` der Form ``{"rows": [row1, row2, ...]}``.
"""
def __init__(self, pruefmanager: Any) -> None:
"""
Initialisiert eine neue Instanz des Datenabrufs.
Parameters
----------
pruefmanager:
Instanz des Pruefmanagers, der :class:`pruef_ergebnis` verarbeitet.
"""
self.pruefmanager = pruefmanager
# ------------------------------------------------------------------ #
# Öffentliche API
# ------------------------------------------------------------------ #
def datenabruf(
self,
result_dict: DataDict,
raumfilter: str,
verfahrensgebiet_layer: Any,
speicherort: str,
pruef_ergebnisse: Optional[List[Any]] = None,
progress: Optional[Any] = None,
) -> Tuple[Dict[str, Any], List[Any]]:
"""
Ruft für alle Zeilen in ``result_dict["rows"]`` die Fachdaten ab und
liefert ein DatenDict sowie die Liste verarbeiteter Pruefergebnisse.
Logging / Aggregation
---------------------
Am Ende enthält das zusammenfassende PruefErgebnis im Kontext:
- geladen: dict(dienst -> anzahl geladen)
- fehler: dict(dienst -> fehlermeldung)
- relevant: dict(dienst -> anzahl relevant)
- ausserhalb: dict(dienst -> anzahl geladen, aber ausserhalb)
"""
if pruef_ergebnisse is None:
processed_results: List[Any] = []
else:
processed_results = list(pruef_ergebnisse)
rows = result_dict.get("rows", [])
daten: Dict[str, List[Any]] = {}
# 1) Räumliche Filtergeometrie bestimmen (BBox oder None)
bbox_geom = self._determine_spatial_filter(raumfilter, verfahrensgebiet_layer)
filter_crs_authid = None
if isinstance(bbox_geom, dict):
raw_crs = bbox_geom.get("crs_authid")
filter_crs_authid = str(raw_crs) if raw_crs else None
# Globale Logs über alle Dienste hinweg
log_geladen: Dict[str, int] = {}
log_fehler: Dict[str, str] = {}
log_relevant: Dict[str, int] = {}
log_ausserhalb: Dict[str, int] = {}
# 2) Über alle Zeilen iterieren
total_rows = len(rows)
for idx, row in enumerate(rows, start=1):
if progress is not None:
progress.set_label(f"Datenabruf {idx}/{total_rows}")
if progress.is_canceled():
pe_cancel = pruef_ergebnis(
ok=False,
meldung="Datenabruf durch Benutzer abgebrochen",
aktion="abbruch",
kontext={"schritt": idx},
)
processed_results.append(self.pruefmanager.verarbeite(pe_cancel))
break
ident = row.get("ident")
link = row.get("Link")
provider = row.get("Provider")
if not ident or not link or not provider:
pe = pruef_ergebnis(
ok=False,
meldung="Ungültige Zeile im Datenabruf (fehlende Pflichtfelder)",
aktion="pflichtfelder_fehlen",
kontext=row,
)
processed_results.append(self.pruefmanager.verarbeite(pe))
continue
# Lesbarer Dienstname für Logs
thema = row.get("Inhalt") or row.get("Thema") or row.get("Titel") or str(ident)
# 2a) Provider-spezifische URL zusammenbauen
# Wenn Raumfilter aktiv ist, übergeben wir bbox_geom an _build_provider_url,
# außer bei WMS (WMS bleibt unverändert).
use_bbox = (raumfilter != "ohne") and (str(provider).upper() != "WMS")
url = self._build_provider_url(link=link, provider=str(provider), bbox_geom=bbox_geom if use_bbox else None)
# 2b) Fachdaten abrufen
features, error_msg = self._fetch_features(
url=url,
provider=str(provider),
cancel_callback=(progress.is_canceled if progress is not None else None),
)
if progress is not None:
if hasattr(progress, "set_value"):
progress.set_value(idx)
# 2c) Logs und Aggregation
if error_msg:
# Fehler beim Abruf
log_fehler[thema] = error_msg
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Abruf von {thema}: {error_msg}",
aktion="url_nicht_erreichbar",
kontext={"ident": ident, "thema": thema, "url": url, "error": error_msg},
)
processed_results.append(self.pruefmanager.verarbeite(pe_err))
# daten[ident] bleibt nicht gesetzt oder leer
daten[str(ident)] = []
continue
# Erfolgreich aufgerufen (auch wenn features == [])
anzahl_geladen = len(features)
log_geladen[thema] = anzahl_geladen
# Da die BBOX serverseitig angewendet wurde:
# - anzahl_geladen > 0 -> relevant
# - anzahl_geladen == 0 -> ausserhalb
if anzahl_geladen > 0:
log_relevant[thema] = anzahl_geladen
daten[str(ident)] = features
else:
log_ausserhalb[thema] = 0
daten[str(ident)] = []
# 2d) Kurzes Prüfergebnis pro Zeile
pe_row = pruef_ergebnis(
ok=True,
meldung=(
f"Datenabruf für ident={ident}: {anzahl_geladen} geladene Objekte"
),
aktion="datenabruf",
kontext={
"ident": ident,
"thema": thema,
"anzahl_gesamt": anzahl_geladen,
"url": url,
},
)
processed_results.append(self.pruefmanager.verarbeite(pe_row))
# 3) Zusammenfassendes Prüfergebnis (wie alter DataGrabber)
summary_kontext = {
"geladen": log_geladen,
"fehler": log_fehler,
"relevant": log_relevant,
"ausserhalb": log_ausserhalb,
}
pe_summary = pruef_ergebnis(
ok=(len(log_fehler) == 0),
meldung=(
f"Datenabruf abgeschlossen: {len(log_geladen)} Dienste geladen, "
f"{len(log_fehler)} Fehler"
),
aktion="datenabruf",
kontext=summary_kontext,
)
processed_results.append(self.pruefmanager.verarbeite(pe_summary))
daten_dict: Dict[str, Any] = {
"speicherort": speicherort,
"daten": daten,
}
return daten_dict, processed_results
# ------------------------------------------------------------------ #
# Hilfsmethoden: räumlicher Filter
# ------------------------------------------------------------------ #
def _determine_spatial_filter(self, raumfilter: str, verfahrensgebiet_layer: Any) -> Optional[Any]:
"""
Bestimmt die räumliche Filtergeometrie (BBox) abhängig vom Raumfilter.
Returns
-------
Optional[Any]
Eine Geometrie/Extent (z. B. QgsRectangle) oder ``None``.
"""
if raumfilter == "ohne":
return None
if verfahrensgebiet_layer is None:
return None
if raumfilter == "Verfahrensgebiet":
extent = qgiscore.get_layer_extent(verfahrensgebiet_layer)
if extent is None:
return None
crs_authid = None
try:
if hasattr(verfahrensgebiet_layer, "crs") and callable(getattr(verfahrensgebiet_layer, "crs")):
crs = verfahrensgebiet_layer.crs()
if crs is not None and hasattr(crs, "authid") and callable(getattr(crs, "authid")):
crs_authid = crs.authid()
except Exception:
crs_authid = None
return {"extent": extent, "crs_authid": crs_authid}
if raumfilter == "Pufferlayer":
buffer_layer = qgiscore.create_buffer_layer(
source_layer=verfahrensgebiet_layer,
distance_m=1000.0,
layer_name="Verfahrensgebiet_Puffer_1km",
)
if buffer_layer is not None:
extent = qgiscore.get_layer_extent(buffer_layer)
if extent is None:
return None
crs_authid = None
try:
if hasattr(buffer_layer, "crs") and callable(getattr(buffer_layer, "crs")):
crs = buffer_layer.crs()
if crs is not None and hasattr(crs, "authid") and callable(getattr(crs, "authid")):
crs_authid = crs.authid()
except Exception:
crs_authid = None
return {"extent": extent, "crs_authid": crs_authid}
return None
# ------------------------------------------------------------------ #
# Hilfsmethoden: Provider-URL und Datenabruf
# ------------------------------------------------------------------ #
def _build_provider_url(self, link: str, provider: str, bbox_geom: Optional[Any]) -> str:
"""
Baut eine Provider-spezifische Abruf-URL. Wenn `bbox_geom` übergeben
wird, wird sie in die URL eingebettet (außer bei WMS).
Erwartet: provider ist gesetzt (z. B. "WFS", "REST", "OGR", "WMS").
"""
provider_norm = (provider or "").upper()
base_link = (link or "").strip()
if base_link.lower().startswith("url="):
base_link = base_link[4:].strip()
if provider_norm == "WFS" and base_link.count("?") > 1:
first, rest = base_link.split("?", 1)
base_link = f"{first}?{rest.replace('?', '&')}"
extent_obj = bbox_geom
crs_authid: Optional[str] = None
if isinstance(bbox_geom, dict):
extent_obj = bbox_geom.get("extent")
raw_crs = bbox_geom.get("crs_authid")
crs_authid = str(raw_crs) if raw_crs else None
# WMS: unverändert durchreichen
if provider_norm == "WMS":
return base_link
# Versuche bbox-String zu erzeugen (falls Raumfilter aktiv)
bbox_str: Optional[str] = None
if extent_obj is not None:
try:
extent_to_bbox = getattr(__import__("sn_basis.functions.qgiscore_wrapper", fromlist=["qgiscore_wrapper"]), "extent_to_bbox_string", None)
if callable(extent_to_bbox):
bbox_str = extent_to_bbox(extent_obj)
else:
# Fallback: einfache xmin/ymin/xmax/ymax-Extraktion (duck-typing)
if hasattr(extent_obj, "xmin") and callable(getattr(extent_obj, "xmin")):
bbox_str = f"{extent_obj.xmin()},{extent_obj.ymin()},{extent_obj.xmax()},{extent_obj.ymax()}"
elif isinstance(extent_obj, (tuple, list)) and len(extent_obj) == 4:
bbox_str = f"{extent_obj[0]},{extent_obj[1]},{extent_obj[2]},{extent_obj[3]}"
else:
bbox_str = str(extent_obj)
except Exception:
bbox_str = None
parsed = urlparse(base_link)
query_params = dict(parse_qsl(parsed.query, keep_blank_values=True))
if provider_norm == "WFS":
query_params.setdefault("service", "WFS")
query_params.setdefault("request", "GetFeature")
query_params.setdefault("outputFormat", "application/json")
if bbox_str:
query_params.setdefault("BBOX", bbox_str)
if crs_authid:
query_params.setdefault("SRSNAME", crs_authid)
new_query = urlencode(query_params, doseq=True)
rebuilt = parsed._replace(query=new_query)
return urlunparse(rebuilt)
if provider_norm in ("REST", "ARCGIS", "ARCGISFEATURESERVER", "ARCGIS_FEATURESERVER"):
# ArcGIS FeatureServer erwartet i.d.R. den /query-Endpunkt
rest_base = base_link.rstrip("/")
if not rest_base.lower().endswith("/query"):
rest_base = f"{rest_base}/query"
parsed_rest = urlparse(rest_base)
query_params = dict(parse_qsl(parsed_rest.query, keep_blank_values=True))
query_params.setdefault("where", "1=1")
query_params.setdefault("outFields", "*")
query_params.setdefault("returnGeometry", "true")
query_params.setdefault("f", query_params.get("f", "json"))
if bbox_str:
geometry_envelope = None
try:
if hasattr(extent_obj, "xmin") and callable(getattr(extent_obj, "xmin")):
geometry_envelope = {
"xmin": extent_obj.xmin(),
"ymin": extent_obj.ymin(),
"xmax": extent_obj.xmax(),
"ymax": extent_obj.ymax(),
}
elif isinstance(extent_obj, (tuple, list)) and len(extent_obj) == 4:
geometry_envelope = {
"xmin": extent_obj[0],
"ymin": extent_obj[1],
"xmax": extent_obj[2],
"ymax": extent_obj[3],
}
else:
parts = [p.strip() for p in str(bbox_str).split(",")]
if len(parts) == 4:
geometry_envelope = {
"xmin": float(parts[0]),
"ymin": float(parts[1]),
"xmax": float(parts[2]),
"ymax": float(parts[3]),
}
except Exception:
geometry_envelope = None
if geometry_envelope is not None:
query_params.setdefault("geometry", json.dumps(geometry_envelope))
else:
query_params.setdefault("geometry", bbox_str)
query_params.setdefault("geometryType", "esriGeometryEnvelope")
query_params.setdefault("spatialRel", "esriSpatialRelIntersects")
if crs_authid and ":" in crs_authid:
srid = crs_authid.split(":", 1)[1]
if srid.isdigit():
query_params.setdefault("inSR", srid)
query_params.setdefault("outSR", srid)
new_query = urlencode(query_params, doseq=True)
rebuilt = parsed_rest._replace(query=new_query)
return urlunparse(rebuilt)
# Default: generischer bbox-Parameter (nur wenn vorhanden)
if bbox_str:
query_params.setdefault("bbox", bbox_str)
new_query = urlencode(query_params, doseq=True)
rebuilt = parsed._replace(query=new_query)
return urlunparse(rebuilt)
def _fetch_features(
self,
url: str,
provider: str,
cancel_callback: Optional[Callable[[], bool]] = None,
) -> Tuple[List[Any], Optional[str]]:
"""
Führt den eigentlichen Abruf der Fachdaten durch.
Returns
-------
Tuple[List[Any], Optional[str]]
- features: Liste der geladenen Features (ggf. leer)
- error_msg: None bei Erfolg, sonst kurzer Fehlertext
"""
features: List[Any] = []
prov = str(provider).upper()
# WMS: kein Featureabruf; caller behandelt WMS separat (hier defensiv)
if prov == "WMS":
return [], None
# OGR / lokale Dateien: versuche QGIS-Layer (wenn QGIS verfügbar)
if prov in ("OGR", "GPKG", "SHP", "GEOJSON"):
if getattr(qgiscore, "QGIS_AVAILABLE", False):
try:
layer = qgiscore.QgsVectorLayer(url, "tmp", "ogr")
if not layer or not getattr(layer, "isValid", lambda: False)():
return [], "Layer ungültig oder konnte nicht geladen werden"
for feat in layer.getFeatures():
features.append(feat)
return features, None
except FileNotFoundError:
return [], "Lokale Datei nicht gefunden"
except Exception as exc:
return [], f"Fehler beim Laden der OGR-Quelle: {exc}"
else:
# Mock: falls GeoJSON-Datei vorhanden, versuche lokale Datei zu lesen
try:
if url.lower().endswith(".geojson"):
with open(url, "r", encoding="utf-8") as fh:
data = json.load(fh)
if isinstance(data, dict) and data.get("type") == "FeatureCollection":
return data.get("features", []), None
return [], "Keine QGIS-Umgebung und keine lesbare lokale GeoJSON"
except FileNotFoundError:
return [], "Lokale Datei nicht gefunden"
except Exception as exc:
return [], f"Fehler beim Lesen lokaler GeoJSON (Mock): {exc}"
# HTTP-basierte Dienste (WFS, REST/ArcGIS, generisch)
response_text: Optional[str] = None
http_error: Optional[str] = None
# QGIS NetworkAccessManager bevorzugen
_FETCH_TIMEOUT_MS = 30_000 # 30 Sekunden
aborted_or_timed_out = False
attempted_qgis_fetch = False
if callable(cancel_callback) and cancel_callback():
return [], "Abbruch durch Benutzer"
if getattr(qgiscore, "QGIS_AVAILABLE", False) and getattr(qgiscore, "QgsNetworkAccessManager", None) is not None:
attempted_qgis_fetch = True
try:
manager = qgiscore.QgsNetworkAccessManager.instance()
# Netzwerk-Timeout global setzen (QGIS >= 3.6)
if hasattr(manager, "setTimeout"):
manager.setTimeout(_FETCH_TIMEOUT_MS)
_qt = __import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"])
QUrl = getattr(_qt, "QUrl", None)
QNetworkRequest = getattr(_qt, "QNetworkRequest", None)
QEventLoop = getattr(_qt, "QEventLoop", None)
QTimer = getattr(_qt, "QTimer", None)
if QUrl is not None and QNetworkRequest is not None:
req = QNetworkRequest(QUrl(url))
reply = manager.get(req)
if QEventLoop is not None:
loop = QEventLoop()
reply.finished.connect(loop.quit)
_poll_timer = None
if QTimer is not None:
try:
_poll_timer = QTimer()
_poll_timer.setSingleShot(False)
_poll_timer.timeout.connect(loop.quit)
_poll_timer.start(100)
except Exception:
_poll_timer = None
start_time = time.monotonic()
while True:
if callable(cancel_callback) and cancel_callback():
reply.abort()
http_error = "Abbruch durch Benutzer"
aborted_or_timed_out = True
break
elapsed_ms = int((time.monotonic() - start_time) * 1000)
if elapsed_ms >= _FETCH_TIMEOUT_MS:
reply.abort()
http_error = f"Timeout nach {_FETCH_TIMEOUT_MS // 1000} s: {url}"
aborted_or_timed_out = True
break
if hasattr(reply, "isFinished") and reply.isFinished():
break
loop.exec()
try:
if hasattr(qt, "QCoreApplication") and hasattr(qt.QCoreApplication, "processEvents"):
qt.QCoreApplication.processEvents()
except Exception:
pass
if _poll_timer is not None:
try:
_poll_timer.stop()
except Exception:
pass
if not aborted_or_timed_out:
# Fehler aus Reply auslesen
err_code = None
try:
err_code = reply.error()
except Exception:
pass
if err_code and int(err_code) != 0:
http_error = f"Netzwerkfehler ({err_code}): {reply.errorString()}"
if http_error:
# Timeout oder Netzwerkfehler keinen Body lesen
pass
else:
try:
raw = reply.readAll()
data_bytes = bytes(raw) if hasattr(raw, "__bytes__") else raw
response_text = data_bytes.decode("utf-8", errors="replace")
except Exception:
try:
response_text = reply.text()
except Exception:
response_text = None
except Exception as exc:
http_error = f"QgsNetworkAccessManager error: {exc}"
response_text = None
# Fallback: requests nur wenn kein harter Abbruch/Timeout im QGIS-Request vorlag
if response_text is None and (not attempted_qgis_fetch or not aborted_or_timed_out):
try:
import requests # lokal import, keine harte Abhängigkeit
r = requests.get(url, timeout=30)
r.raise_for_status()
response_text = r.text
except Exception as exc:
http_error = f"requests error: {exc}"
response_text = None
if response_text is None:
return [], http_error or "keine Antwort vom Server"
# Versuche JSON/GeoJSON zu parsen
try:
parsed = json.loads(response_text)
if isinstance(parsed, dict) and parsed.get("type") == "FeatureCollection":
return parsed.get("features", []), None
if isinstance(parsed, dict) and "features" in parsed:
return parsed.get("features", []), None
if prov in ("REST", "ARCGIS", "ARCGISFEATURESERVER", "ARCGIS_FEATURESERVER", "WFS"):
return [], "Antwort enthält keine Feature-Liste"
# Sonst: gib das gesamte JSON als einzelnes Objekt zurück
return [parsed], None
except json.JSONDecodeError:
# Nicht-JSON-Antwort (z. B. GML). Wenn QGIS verfügbar, versuche GML via temporärer Datei + OGR
if getattr(qgiscore, "QGIS_AVAILABLE", False):
try:
import tempfile
with tempfile.NamedTemporaryFile(suffix=".gml", delete=False, mode="w", encoding="utf-8") as fh:
fh.write(response_text)
tmp_path = fh.name
layer = qgiscore.QgsVectorLayer(tmp_path, "tmp_gml", "ogr")
if layer and getattr(layer, "isValid", lambda: False)():
for feat in layer.getFeatures():
features.append(feat)
return features, None
return [], "GML-Antwort konnte nicht als Layer geladen werden"
except Exception as exc:
return [], f"Fehler beim Parsen von GML: {exc}"
# Wenn alles fehlschlägt:
return [], "Antwort konnte nicht als JSON oder GML geparst werden"