Files

406 lines
17 KiB
Python
Raw Permalink Normal View History

# sn_basis/modules/Datenabruf.py
"""
Modul ``datenabruf``
Enthält die Klasse :class:`Datenabruf`, die für eine Menge bereits
validierter Links (aus ``validate_rows``) die Fachdaten abruft und
aggregierte Prüfergebnisse liefert.
Designprinzipien
----------------
- Die BBOX wird serverseitig angewendet: wenn ein Raumfilter aktiv ist,
wird die BBOX in die Abruf-URL eingebettet (außer bei WMS).
- Alle QGIS-Interaktionen laufen über die Wrapper `qgiscore_wrapper` und
`qgisui_wrapper`.
- Fehler werden als kurze Strings zurückgegeben und zentral in `log_fehler`
gesammelt; erfolgreiche Aufrufe werden in `log_geladen` protokolliert.
- Die Methode ist pdoc-kompatibel dokumentiert und bewusst einfach gehalten.
"""
from typing import Any, Dict, List, Mapping, Optional, Tuple
from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse
import json
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
from sn_basis.functions import qgiscore_wrapper as qgiscore
from sn_basis.functions import qgisui_wrapper as qgisui
from sn_basis.functions import qt_wrapper as qt
DataDict = Dict[str, List[Mapping[str, Any]]]
class Datenabruf:
"""
Führt den eigentlichen Fachdatenabruf für eine Menge validierter Links durch.
Erwartet ein ``DataDict`` der Form ``{"rows": [row1, row2, ...]}``.
"""
def __init__(self, pruefmanager: Any) -> None:
"""
Initialisiert eine neue Instanz des Datenabrufs.
Parameters
----------
pruefmanager:
Instanz des Pruefmanagers, der :class:`pruef_ergebnis` verarbeitet.
"""
self.pruefmanager = pruefmanager
# ------------------------------------------------------------------ #
# Öffentliche API
# ------------------------------------------------------------------ #
def datenabruf(
self,
result_dict: DataDict,
raumfilter: str,
verfahrensgebiet_layer: Any,
speicherort: str,
pruef_ergebnisse: Optional[List[Any]] = None,
) -> Tuple[Dict[str, Any], List[Any]]:
"""
Ruft für alle Zeilen in ``result_dict["rows"]`` die Fachdaten ab und
liefert ein DatenDict sowie die Liste verarbeiteter Pruefergebnisse.
Logging / Aggregation
---------------------
Am Ende enthält das zusammenfassende PruefErgebnis im Kontext:
- geladen: dict(dienst -> anzahl geladen)
- fehler: dict(dienst -> fehlermeldung)
- relevant: dict(dienst -> anzahl relevant)
- ausserhalb: dict(dienst -> anzahl geladen, aber ausserhalb)
"""
if pruef_ergebnisse is None:
processed_results: List[Any] = []
else:
processed_results = list(pruef_ergebnisse)
rows = result_dict.get("rows", [])
daten: Dict[str, List[Any]] = {}
# 1) Räumliche Filtergeometrie bestimmen (BBox oder None)
bbox_geom = self._determine_spatial_filter(raumfilter, verfahrensgebiet_layer)
# Globale Logs über alle Dienste hinweg
log_geladen: Dict[str, int] = {}
log_fehler: Dict[str, str] = {}
log_relevant: Dict[str, int] = {}
log_ausserhalb: Dict[str, int] = {}
# 2) Über alle Zeilen iterieren
for row in rows:
ident = row.get("ident")
link = row.get("Link")
provider = row.get("Provider")
if not ident or not link or not provider:
pe = pruef_ergebnis(
ok=False,
meldung="Ungültige Zeile im Datenabruf (fehlende Pflichtfelder)",
aktion="pflichtfelder_fehlen",
kontext=row,
)
processed_results.append(self.pruefmanager.verarbeite(pe))
continue
# Lesbarer Dienstname für Logs
thema = row.get("Inhalt") or row.get("Thema") or row.get("Titel") or str(ident)
# 2a) Provider-spezifische URL zusammenbauen
# Wenn Raumfilter aktiv ist, übergeben wir bbox_geom an _build_provider_url,
# außer bei WMS (WMS bleibt unverändert).
use_bbox = (raumfilter != "ohne") and (str(provider).upper() != "WMS")
url = self._build_provider_url(link=link, provider=str(provider), bbox_geom=bbox_geom if use_bbox else None)
# 2b) Fachdaten abrufen
features, error_msg = self._fetch_features(url=url, provider=str(provider))
# 2c) Logs und Aggregation
if error_msg:
# Fehler beim Abruf
log_fehler[thema] = error_msg
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Abruf von {thema}: {error_msg}",
aktion="url_nicht_erreichbar",
kontext={"ident": ident, "thema": thema, "url": url, "error": error_msg},
)
processed_results.append(self.pruefmanager.verarbeite(pe_err))
# daten[ident] bleibt nicht gesetzt oder leer
daten[str(ident)] = []
continue
# Erfolgreich aufgerufen (auch wenn features == [])
anzahl_geladen = len(features)
log_geladen[thema] = anzahl_geladen
# Da die BBOX serverseitig angewendet wurde:
# - anzahl_geladen > 0 -> relevant
# - anzahl_geladen == 0 -> ausserhalb
if anzahl_geladen > 0:
log_relevant[thema] = anzahl_geladen
daten[str(ident)] = features
else:
log_ausserhalb[thema] = 0
daten[str(ident)] = []
# 2d) Kurzes Prüfergebnis pro Zeile
pe_row = pruef_ergebnis(
ok=True,
meldung=(
f"Datenabruf für ident={ident}: {anzahl_geladen} geladene Objekte"
),
aktion="datenabruf",
kontext={
"ident": ident,
"thema": thema,
"anzahl_gesamt": anzahl_geladen,
"url": url,
},
)
processed_results.append(self.pruefmanager.verarbeite(pe_row))
# 3) Zusammenfassendes Prüfergebnis (wie alter DataGrabber)
summary_kontext = {
"geladen": log_geladen,
"fehler": log_fehler,
"relevant": log_relevant,
"ausserhalb": log_ausserhalb,
}
pe_summary = pruef_ergebnis(
ok=(len(log_fehler) == 0),
meldung=(
f"Datenabruf abgeschlossen: {len(log_geladen)} Dienste geladen, "
f"{len(log_fehler)} Fehler"
),
aktion="datenabruf",
kontext=summary_kontext,
)
processed_results.append(self.pruefmanager.verarbeite(pe_summary))
daten_dict: Dict[str, Any] = {
"speicherort": speicherort,
"daten": daten,
}
return daten_dict, processed_results
# ------------------------------------------------------------------ #
# Hilfsmethoden: räumlicher Filter
# ------------------------------------------------------------------ #
def _determine_spatial_filter(self, raumfilter: str, verfahrensgebiet_layer: Any) -> Optional[Any]:
"""
Bestimmt die räumliche Filtergeometrie (BBox) abhängig vom Raumfilter.
Returns
-------
Optional[Any]
Eine Geometrie/Extent (z. B. QgsRectangle) oder ``None``.
"""
if raumfilter == "ohne":
return None
if verfahrensgebiet_layer is None:
return None
if raumfilter == "Verfahrensgebiet":
return qgiscore.get_layer_extent(verfahrensgebiet_layer)
if raumfilter == "Pufferlayer":
buffer_layer = qgiscore.create_buffer_layer(
source_layer=verfahrensgebiet_layer,
distance_m=1000.0,
layer_name="Verfahrensgebiet_Puffer_1km",
)
if buffer_layer is not None:
qgisui.add_layer_to_project(buffer_layer)
return qgiscore.get_layer_extent(buffer_layer)
return None
# ------------------------------------------------------------------ #
# Hilfsmethoden: Provider-URL und Datenabruf
# ------------------------------------------------------------------ #
def _build_provider_url(self, link: str, provider: str, bbox_geom: Optional[Any]) -> str:
"""
Baut eine Provider-spezifische Abruf-URL. Wenn `bbox_geom` übergeben
wird, wird sie in die URL eingebettet (außer bei WMS).
Erwartet: provider ist gesetzt (z. B. "WFS", "REST", "OGR", "WMS").
"""
provider_norm = (provider or "").upper()
base_link = link or ""
# WMS: niemals BBOX anhängen
if provider_norm == "WMS":
return base_link
if bbox_geom is None:
return base_link
# Versuche bbox-String zu erzeugen (nutzt qgiscore.extent_to_bbox_string wenn vorhanden)
bbox_str: Optional[str] = None
try:
extent_to_bbox = getattr(__import__("sn_basis.functions.qgiscore_wrapper", fromlist=["qgiscore_wrapper"]), "extent_to_bbox_string", None)
if callable(extent_to_bbox):
bbox_str = extent_to_bbox(bbox_geom)
else:
# Fallback: einfache xmin/ymin/xmax/ymax-Extraktion (duck-typing)
if hasattr(bbox_geom, "xmin") and callable(getattr(bbox_geom, "xmin")):
bbox_str = f"{bbox_geom.xmin()},{bbox_geom.ymin()},{bbox_geom.xmax()},{bbox_geom.ymax()}"
elif isinstance(bbox_geom, (tuple, list)) and len(bbox_geom) == 4:
bbox_str = f"{bbox_geom[0]},{bbox_geom[1]},{bbox_geom[2]},{bbox_geom[3]}"
else:
bbox_str = str(bbox_geom)
except Exception:
bbox_str = None
if not bbox_str:
return base_link
parsed = urlparse(base_link)
query_params = dict(parse_qsl(parsed.query, keep_blank_values=True))
if provider_norm == "WFS":
query_params.setdefault("BBOX", bbox_str)
new_query = urlencode(query_params, doseq=True)
rebuilt = parsed._replace(query=new_query)
return urlunparse(rebuilt)
if provider_norm in ("REST", "ARCGIS", "ARCGISFEATURESERVER", "ARCGIS_FEATURESERVER"):
query_params.setdefault("geometry", bbox_str)
query_params.setdefault("geometryType", "esriGeometryEnvelope")
query_params.setdefault("spatialRel", "esriSpatialRelIntersects")
query_params.setdefault("f", query_params.get("f", "json"))
new_query = urlencode(query_params, doseq=True)
rebuilt = parsed._replace(query=new_query)
return urlunparse(rebuilt)
# Default: generischer bbox-Parameter
query_params.setdefault("bbox", bbox_str)
new_query = urlencode(query_params, doseq=True)
rebuilt = parsed._replace(query=new_query)
return urlunparse(rebuilt)
def _fetch_features(self, url: str, provider: str) -> Tuple[List[Any], Optional[str]]:
"""
Führt den eigentlichen Abruf der Fachdaten durch.
Returns
-------
Tuple[List[Any], Optional[str]]
- features: Liste der geladenen Features (ggf. leer)
- error_msg: None bei Erfolg, sonst kurzer Fehlertext
"""
features: List[Any] = []
prov = str(provider).upper()
# WMS: kein Featureabruf; caller behandelt WMS separat (hier defensiv)
if prov == "WMS":
return [], None
# OGR / lokale Dateien: versuche QGIS-Layer (wenn QGIS verfügbar)
if prov in ("OGR", "GPKG", "SHP", "GEOJSON"):
if getattr(qgiscore, "QGIS_AVAILABLE", False):
try:
layer = qgiscore.QgsVectorLayer(url, "tmp", "ogr")
if not layer or not getattr(layer, "isValid", lambda: False)():
return [], "Layer ungültig oder konnte nicht geladen werden"
for feat in layer.getFeatures():
features.append(feat)
return features, None
except FileNotFoundError:
return [], "Lokale Datei nicht gefunden"
except Exception as exc:
return [], f"Fehler beim Laden der OGR-Quelle: {exc}"
else:
# Mock: falls GeoJSON-Datei vorhanden, versuche lokale Datei zu lesen
try:
if url.lower().endswith(".geojson"):
with open(url, "r", encoding="utf-8") as fh:
data = json.load(fh)
if isinstance(data, dict) and data.get("type") == "FeatureCollection":
return data.get("features", []), None
return [], "Keine QGIS-Umgebung und keine lesbare lokale GeoJSON"
except FileNotFoundError:
return [], "Lokale Datei nicht gefunden"
except Exception as exc:
return [], f"Fehler beim Lesen lokaler GeoJSON (Mock): {exc}"
# HTTP-basierte Dienste (WFS, REST/ArcGIS, generisch)
response_text: Optional[str] = None
http_error: Optional[str] = None
# QGIS NetworkAccessManager bevorzugen
if getattr(qgiscore, "QGIS_AVAILABLE", False) and getattr(qgiscore, "QgsNetworkAccessManager", None) is not None:
try:
manager = qgiscore.QgsNetworkAccessManager.instance()
QUrl = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QUrl", None)
QNetworkRequest = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QNetworkRequest", None)
QEventLoop = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QEventLoop", None)
if QUrl is not None and QNetworkRequest is not None:
req = QNetworkRequest(QUrl(url))
reply = manager.get(req)
if QEventLoop is not None:
loop = QEventLoop()
reply.finished.connect(loop.quit)
loop.exec()
try:
raw = reply.readAll()
data_bytes = bytes(raw) if hasattr(raw, "__bytes__") else raw
response_text = data_bytes.decode("utf-8", errors="replace")
except Exception:
try:
response_text = reply.text()
except Exception:
response_text = None
except Exception as exc:
http_error = f"QgsNetworkAccessManager error: {exc}"
response_text = None
# Fallback: requests
if response_text is None:
try:
import requests # lokal import, keine harte Abhängigkeit
r = requests.get(url, timeout=30)
r.raise_for_status()
response_text = r.text
except Exception as exc:
http_error = f"requests error: {exc}"
response_text = None
if response_text is None:
return [], http_error or "keine Antwort vom Server"
# Versuche JSON/GeoJSON zu parsen
try:
parsed = json.loads(response_text)
if isinstance(parsed, dict) and parsed.get("type") == "FeatureCollection":
return parsed.get("features", []), None
if isinstance(parsed, dict) and "features" in parsed:
return parsed.get("features", []), None
# Sonst: gib das gesamte JSON als einzelnes Objekt zurück
return [parsed], None
except json.JSONDecodeError:
# Nicht-JSON-Antwort (z. B. GML). Wenn QGIS verfügbar, versuche GML via temporärer Datei + OGR
if getattr(qgiscore, "QGIS_AVAILABLE", False):
try:
import tempfile
with tempfile.NamedTemporaryFile(suffix=".gml", delete=False, mode="w", encoding="utf-8") as fh:
fh.write(response_text)
tmp_path = fh.name
layer = qgiscore.QgsVectorLayer(tmp_path, "tmp_gml", "ogr")
if layer and getattr(layer, "isValid", lambda: False)():
for feat in layer.getFeatures():
features.append(feat)
return features, None
return [], "GML-Antwort konnte nicht als Layer geladen werden"
except Exception as exc:
return [], f"Fehler beim Parsen von GML: {exc}"
# Wenn alles fehlschlägt:
return [], "Antwort konnte nicht als JSON oder GML geparst werden"