# forked from AG_QGIS/Plugin_SN_Basis
# sn_basis/modules/Datenabruf.py
|
|||
|
|
"""
|
|||
|
|
Modul ``datenabruf``
|
|||
|
|
|
|||
|
|
Enthält die Klasse :class:`Datenabruf`, die für eine Menge bereits
|
|||
|
|
validierter Links (aus ``validate_rows``) die Fachdaten abruft und
|
|||
|
|
aggregierte Prüfergebnisse liefert.
|
|||
|
|
|
|||
|
|
Designprinzipien
|
|||
|
|
----------------
|
|||
|
|
- Die BBOX wird serverseitig angewendet: wenn ein Raumfilter aktiv ist,
|
|||
|
|
wird die BBOX in die Abruf-URL eingebettet (außer bei WMS).
|
|||
|
|
- Alle QGIS-Interaktionen laufen über die Wrapper `qgiscore_wrapper` und
|
|||
|
|
`qgisui_wrapper`.
|
|||
|
|
- Fehler werden als kurze Strings zurückgegeben und zentral in `log_fehler`
|
|||
|
|
gesammelt; erfolgreiche Aufrufe werden in `log_geladen` protokolliert.
|
|||
|
|
- Die Methode ist pdoc-kompatibel dokumentiert und bewusst einfach gehalten.
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
from typing import Any, Dict, List, Mapping, Optional, Tuple
|
|||
|
|
|
|||
|
|
from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse
|
|||
|
|
import json
|
|||
|
|
|
|||
|
|
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
|
|||
|
|
from sn_basis.functions import qgiscore_wrapper as qgiscore
|
|||
|
|
from sn_basis.functions import qgisui_wrapper as qgisui
|
|||
|
|
from sn_basis.functions import qt_wrapper as qt
|
|||
|
|
|
|||
|
|
DataDict = Dict[str, List[Mapping[str, Any]]]
|
|||
|
|
|
|||
|
|
|
|||
|
|
class Datenabruf:
    """
    Performs the actual specialist-data retrieval for a set of validated links.

    Expects a ``DataDict`` of the form ``{"rows": [row1, row2, ...]}``.
    """

    def __init__(self, pruefmanager: Any) -> None:
        """
        Create a new retrieval helper.

        Parameters
        ----------
        pruefmanager:
            Manager instance whose ``verarbeite`` method consumes every
            :class:`pruef_ergebnis` produced during retrieval.
        """
        self.pruefmanager = pruefmanager
|
|||
|
|
|
|||
|
|
    # ------------------------------------------------------------------ #
    # Public API
    # ------------------------------------------------------------------ #

    def datenabruf(
        self,
        result_dict: DataDict,
        raumfilter: str,
        verfahrensgebiet_layer: Any,
        speicherort: str,
        pruef_ergebnisse: Optional[List[Any]] = None,
    ) -> Tuple[Dict[str, Any], List[Any]]:
        """
        Fetch the specialist data for every row in ``result_dict["rows"]``
        and return a data dict plus the list of processed check results.

        Parameters
        ----------
        result_dict:
            ``{"rows": [...]}``; each row is expected to carry at least
            ``ident``, ``Link`` and ``Provider``.
        raumfilter:
            Spatial-filter mode; ``"ohne"`` disables the BBOX entirely.
        verfahrensgebiet_layer:
            Layer the BBOX is derived from (may be ``None``).
        speicherort:
            Storage location, passed through unchanged into the result dict.
        pruef_ergebnisse:
            Optional list of already-processed results to extend; it is
            copied, so the caller's list is never mutated.

        Logging / aggregation
        ---------------------
        The summarising check result carries in its context:

        - geladen: dict(service -> number of loaded objects)
        - fehler: dict(service -> error message)
        - relevant: dict(service -> number of relevant objects)
        - ausserhalb: dict(service -> loaded, but outside the filter)
        """
        if pruef_ergebnisse is None:
            processed_results: List[Any] = []
        else:
            # Copy so the caller's list is not mutated in place.
            processed_results = list(pruef_ergebnisse)

        rows = result_dict.get("rows", [])
        daten: Dict[str, List[Any]] = {}

        # 1) Determine the spatial filter geometry (bbox or None)
        bbox_geom = self._determine_spatial_filter(raumfilter, verfahrensgebiet_layer)

        # Global logs across all services. NOTE(review): keyed by the
        # human-readable name ("thema"), so rows sharing a name overwrite
        # each other's log entry.
        log_geladen: Dict[str, int] = {}
        log_fehler: Dict[str, str] = {}
        log_relevant: Dict[str, int] = {}
        log_ausserhalb: Dict[str, int] = {}

        # 2) Iterate over all rows
        for row in rows:
            ident = row.get("ident")
            link = row.get("Link")
            provider = row.get("Provider")

            if not ident or not link or not provider:
                # Mandatory fields missing -> record a failure, skip the row.
                pe = pruef_ergebnis(
                    ok=False,
                    meldung="Ungültige Zeile im Datenabruf (fehlende Pflichtfelder)",
                    aktion="pflichtfelder_fehlen",
                    kontext=row,
                )
                processed_results.append(self.pruefmanager.verarbeite(pe))
                continue

            # Readable service name for the logs
            thema = row.get("Inhalt") or row.get("Thema") or row.get("Titel") or str(ident)

            # 2a) Build the provider-specific URL.
            # When the spatial filter is active, bbox_geom is handed to
            # _build_provider_url -- except for WMS (WMS links stay untouched).
            use_bbox = (raumfilter != "ohne") and (str(provider).upper() != "WMS")
            url = self._build_provider_url(link=link, provider=str(provider), bbox_geom=bbox_geom if use_bbox else None)

            # 2b) Fetch the data
            features, error_msg = self._fetch_features(url=url, provider=str(provider))

            # 2c) Logs and aggregation
            if error_msg:
                # Fetch failed
                log_fehler[thema] = error_msg
                pe_err = pruef_ergebnis(
                    ok=False,
                    meldung=f"Fehler beim Abruf von {thema}: {error_msg}",
                    aktion="url_nicht_erreichbar",
                    kontext={"ident": ident, "thema": thema, "url": url, "error": error_msg},
                )
                processed_results.append(self.pruefmanager.verarbeite(pe_err))
                # Keep an empty entry so the ident is still present in `daten`.
                daten[str(ident)] = []
                continue

            # Successful call (even when features == [])
            anzahl_geladen = len(features)
            log_geladen[thema] = anzahl_geladen

            # Because the BBOX was applied server-side:
            # - anzahl_geladen > 0  -> relevant
            # - anzahl_geladen == 0 -> outside the filter area
            if anzahl_geladen > 0:
                log_relevant[thema] = anzahl_geladen
                daten[str(ident)] = features
            else:
                log_ausserhalb[thema] = 0
                daten[str(ident)] = []

            # 2d) Short per-row check result
            pe_row = pruef_ergebnis(
                ok=True,
                meldung=(
                    f"Datenabruf für ident={ident}: {anzahl_geladen} geladene Objekte"
                ),
                aktion="datenabruf",
                kontext={
                    "ident": ident,
                    "thema": thema,
                    "anzahl_gesamt": anzahl_geladen,
                    "url": url,
                },
            )
            processed_results.append(self.pruefmanager.verarbeite(pe_row))

        # 3) Summarising check result (mirrors the old DataGrabber)
        summary_kontext = {
            "geladen": log_geladen,
            "fehler": log_fehler,
            "relevant": log_relevant,
            "ausserhalb": log_ausserhalb,
        }

        pe_summary = pruef_ergebnis(
            ok=(len(log_fehler) == 0),
            meldung=(
                f"Datenabruf abgeschlossen: {len(log_geladen)} Dienste geladen, "
                f"{len(log_fehler)} Fehler"
            ),
            aktion="datenabruf",
            kontext=summary_kontext,
        )
        processed_results.append(self.pruefmanager.verarbeite(pe_summary))

        daten_dict: Dict[str, Any] = {
            "speicherort": speicherort,
            "daten": daten,
        }
        return daten_dict, processed_results
|
|||
|
|
|
|||
|
|
# ------------------------------------------------------------------ #
|
|||
|
|
# Hilfsmethoden: räumlicher Filter
|
|||
|
|
# ------------------------------------------------------------------ #
|
|||
|
|
|
|||
|
|
def _determine_spatial_filter(self, raumfilter: str, verfahrensgebiet_layer: Any) -> Optional[Any]:
|
|||
|
|
"""
|
|||
|
|
Bestimmt die räumliche Filtergeometrie (BBox) abhängig vom Raumfilter.
|
|||
|
|
|
|||
|
|
Returns
|
|||
|
|
-------
|
|||
|
|
Optional[Any]
|
|||
|
|
Eine Geometrie/Extent (z. B. QgsRectangle) oder ``None``.
|
|||
|
|
"""
|
|||
|
|
if raumfilter == "ohne":
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
if verfahrensgebiet_layer is None:
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
if raumfilter == "Verfahrensgebiet":
|
|||
|
|
return qgiscore.get_layer_extent(verfahrensgebiet_layer)
|
|||
|
|
|
|||
|
|
if raumfilter == "Pufferlayer":
|
|||
|
|
buffer_layer = qgiscore.create_buffer_layer(
|
|||
|
|
source_layer=verfahrensgebiet_layer,
|
|||
|
|
distance_m=1000.0,
|
|||
|
|
layer_name="Verfahrensgebiet_Puffer_1km",
|
|||
|
|
)
|
|||
|
|
if buffer_layer is not None:
|
|||
|
|
qgisui.add_layer_to_project(buffer_layer)
|
|||
|
|
return qgiscore.get_layer_extent(buffer_layer)
|
|||
|
|
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
# ------------------------------------------------------------------ #
|
|||
|
|
# Hilfsmethoden: Provider-URL und Datenabruf
|
|||
|
|
# ------------------------------------------------------------------ #
|
|||
|
|
|
|||
|
|
def _build_provider_url(self, link: str, provider: str, bbox_geom: Optional[Any]) -> str:
|
|||
|
|
"""
|
|||
|
|
Baut eine Provider-spezifische Abruf-URL. Wenn `bbox_geom` übergeben
|
|||
|
|
wird, wird sie in die URL eingebettet (außer bei WMS).
|
|||
|
|
|
|||
|
|
Erwartet: provider ist gesetzt (z. B. "WFS", "REST", "OGR", "WMS").
|
|||
|
|
"""
|
|||
|
|
provider_norm = (provider or "").upper()
|
|||
|
|
base_link = link or ""
|
|||
|
|
|
|||
|
|
# WMS: niemals BBOX anhängen
|
|||
|
|
if provider_norm == "WMS":
|
|||
|
|
return base_link
|
|||
|
|
|
|||
|
|
if bbox_geom is None:
|
|||
|
|
return base_link
|
|||
|
|
|
|||
|
|
# Versuche bbox-String zu erzeugen (nutzt qgiscore.extent_to_bbox_string wenn vorhanden)
|
|||
|
|
bbox_str: Optional[str] = None
|
|||
|
|
try:
|
|||
|
|
extent_to_bbox = getattr(__import__("sn_basis.functions.qgiscore_wrapper", fromlist=["qgiscore_wrapper"]), "extent_to_bbox_string", None)
|
|||
|
|
if callable(extent_to_bbox):
|
|||
|
|
bbox_str = extent_to_bbox(bbox_geom)
|
|||
|
|
else:
|
|||
|
|
# Fallback: einfache xmin/ymin/xmax/ymax-Extraktion (duck-typing)
|
|||
|
|
if hasattr(bbox_geom, "xmin") and callable(getattr(bbox_geom, "xmin")):
|
|||
|
|
bbox_str = f"{bbox_geom.xmin()},{bbox_geom.ymin()},{bbox_geom.xmax()},{bbox_geom.ymax()}"
|
|||
|
|
elif isinstance(bbox_geom, (tuple, list)) and len(bbox_geom) == 4:
|
|||
|
|
bbox_str = f"{bbox_geom[0]},{bbox_geom[1]},{bbox_geom[2]},{bbox_geom[3]}"
|
|||
|
|
else:
|
|||
|
|
bbox_str = str(bbox_geom)
|
|||
|
|
except Exception:
|
|||
|
|
bbox_str = None
|
|||
|
|
|
|||
|
|
if not bbox_str:
|
|||
|
|
return base_link
|
|||
|
|
|
|||
|
|
parsed = urlparse(base_link)
|
|||
|
|
query_params = dict(parse_qsl(parsed.query, keep_blank_values=True))
|
|||
|
|
|
|||
|
|
if provider_norm == "WFS":
|
|||
|
|
query_params.setdefault("BBOX", bbox_str)
|
|||
|
|
new_query = urlencode(query_params, doseq=True)
|
|||
|
|
rebuilt = parsed._replace(query=new_query)
|
|||
|
|
return urlunparse(rebuilt)
|
|||
|
|
|
|||
|
|
if provider_norm in ("REST", "ARCGIS", "ARCGISFEATURESERVER", "ARCGIS_FEATURESERVER"):
|
|||
|
|
query_params.setdefault("geometry", bbox_str)
|
|||
|
|
query_params.setdefault("geometryType", "esriGeometryEnvelope")
|
|||
|
|
query_params.setdefault("spatialRel", "esriSpatialRelIntersects")
|
|||
|
|
query_params.setdefault("f", query_params.get("f", "json"))
|
|||
|
|
new_query = urlencode(query_params, doseq=True)
|
|||
|
|
rebuilt = parsed._replace(query=new_query)
|
|||
|
|
return urlunparse(rebuilt)
|
|||
|
|
|
|||
|
|
# Default: generischer bbox-Parameter
|
|||
|
|
query_params.setdefault("bbox", bbox_str)
|
|||
|
|
new_query = urlencode(query_params, doseq=True)
|
|||
|
|
rebuilt = parsed._replace(query=new_query)
|
|||
|
|
return urlunparse(rebuilt)
|
|||
|
|
|
|||
|
|
def _fetch_features(self, url: str, provider: str) -> Tuple[List[Any], Optional[str]]:
|
|||
|
|
"""
|
|||
|
|
Führt den eigentlichen Abruf der Fachdaten durch.
|
|||
|
|
|
|||
|
|
Returns
|
|||
|
|
-------
|
|||
|
|
Tuple[List[Any], Optional[str]]
|
|||
|
|
- features: Liste der geladenen Features (ggf. leer)
|
|||
|
|
- error_msg: None bei Erfolg, sonst kurzer Fehlertext
|
|||
|
|
"""
|
|||
|
|
features: List[Any] = []
|
|||
|
|
prov = str(provider).upper()
|
|||
|
|
|
|||
|
|
# WMS: kein Featureabruf; caller behandelt WMS separat (hier defensiv)
|
|||
|
|
if prov == "WMS":
|
|||
|
|
return [], None
|
|||
|
|
|
|||
|
|
# OGR / lokale Dateien: versuche QGIS-Layer (wenn QGIS verfügbar)
|
|||
|
|
if prov in ("OGR", "GPKG", "SHP", "GEOJSON"):
|
|||
|
|
if getattr(qgiscore, "QGIS_AVAILABLE", False):
|
|||
|
|
try:
|
|||
|
|
layer = qgiscore.QgsVectorLayer(url, "tmp", "ogr")
|
|||
|
|
if not layer or not getattr(layer, "isValid", lambda: False)():
|
|||
|
|
return [], "Layer ungültig oder konnte nicht geladen werden"
|
|||
|
|
for feat in layer.getFeatures():
|
|||
|
|
features.append(feat)
|
|||
|
|
return features, None
|
|||
|
|
except FileNotFoundError:
|
|||
|
|
return [], "Lokale Datei nicht gefunden"
|
|||
|
|
except Exception as exc:
|
|||
|
|
return [], f"Fehler beim Laden der OGR-Quelle: {exc}"
|
|||
|
|
else:
|
|||
|
|
# Mock: falls GeoJSON-Datei vorhanden, versuche lokale Datei zu lesen
|
|||
|
|
try:
|
|||
|
|
if url.lower().endswith(".geojson"):
|
|||
|
|
with open(url, "r", encoding="utf-8") as fh:
|
|||
|
|
data = json.load(fh)
|
|||
|
|
if isinstance(data, dict) and data.get("type") == "FeatureCollection":
|
|||
|
|
return data.get("features", []), None
|
|||
|
|
return [], "Keine QGIS-Umgebung und keine lesbare lokale GeoJSON"
|
|||
|
|
except FileNotFoundError:
|
|||
|
|
return [], "Lokale Datei nicht gefunden"
|
|||
|
|
except Exception as exc:
|
|||
|
|
return [], f"Fehler beim Lesen lokaler GeoJSON (Mock): {exc}"
|
|||
|
|
|
|||
|
|
# HTTP-basierte Dienste (WFS, REST/ArcGIS, generisch)
|
|||
|
|
response_text: Optional[str] = None
|
|||
|
|
http_error: Optional[str] = None
|
|||
|
|
|
|||
|
|
# QGIS NetworkAccessManager bevorzugen
|
|||
|
|
if getattr(qgiscore, "QGIS_AVAILABLE", False) and getattr(qgiscore, "QgsNetworkAccessManager", None) is not None:
|
|||
|
|
try:
|
|||
|
|
manager = qgiscore.QgsNetworkAccessManager.instance()
|
|||
|
|
QUrl = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QUrl", None)
|
|||
|
|
QNetworkRequest = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QNetworkRequest", None)
|
|||
|
|
QEventLoop = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QEventLoop", None)
|
|||
|
|
if QUrl is not None and QNetworkRequest is not None:
|
|||
|
|
req = QNetworkRequest(QUrl(url))
|
|||
|
|
reply = manager.get(req)
|
|||
|
|
if QEventLoop is not None:
|
|||
|
|
loop = QEventLoop()
|
|||
|
|
reply.finished.connect(loop.quit)
|
|||
|
|
loop.exec()
|
|||
|
|
try:
|
|||
|
|
raw = reply.readAll()
|
|||
|
|
data_bytes = bytes(raw) if hasattr(raw, "__bytes__") else raw
|
|||
|
|
response_text = data_bytes.decode("utf-8", errors="replace")
|
|||
|
|
except Exception:
|
|||
|
|
try:
|
|||
|
|
response_text = reply.text()
|
|||
|
|
except Exception:
|
|||
|
|
response_text = None
|
|||
|
|
except Exception as exc:
|
|||
|
|
http_error = f"QgsNetworkAccessManager error: {exc}"
|
|||
|
|
response_text = None
|
|||
|
|
|
|||
|
|
# Fallback: requests
|
|||
|
|
if response_text is None:
|
|||
|
|
try:
|
|||
|
|
import requests # lokal import, keine harte Abhängigkeit
|
|||
|
|
r = requests.get(url, timeout=30)
|
|||
|
|
r.raise_for_status()
|
|||
|
|
response_text = r.text
|
|||
|
|
except Exception as exc:
|
|||
|
|
http_error = f"requests error: {exc}"
|
|||
|
|
response_text = None
|
|||
|
|
|
|||
|
|
if response_text is None:
|
|||
|
|
return [], http_error or "keine Antwort vom Server"
|
|||
|
|
|
|||
|
|
# Versuche JSON/GeoJSON zu parsen
|
|||
|
|
try:
|
|||
|
|
parsed = json.loads(response_text)
|
|||
|
|
if isinstance(parsed, dict) and parsed.get("type") == "FeatureCollection":
|
|||
|
|
return parsed.get("features", []), None
|
|||
|
|
if isinstance(parsed, dict) and "features" in parsed:
|
|||
|
|
return parsed.get("features", []), None
|
|||
|
|
# Sonst: gib das gesamte JSON als einzelnes Objekt zurück
|
|||
|
|
return [parsed], None
|
|||
|
|
except json.JSONDecodeError:
|
|||
|
|
# Nicht-JSON-Antwort (z. B. GML). Wenn QGIS verfügbar, versuche GML via temporärer Datei + OGR
|
|||
|
|
if getattr(qgiscore, "QGIS_AVAILABLE", False):
|
|||
|
|
try:
|
|||
|
|
import tempfile
|
|||
|
|
with tempfile.NamedTemporaryFile(suffix=".gml", delete=False, mode="w", encoding="utf-8") as fh:
|
|||
|
|
fh.write(response_text)
|
|||
|
|
tmp_path = fh.name
|
|||
|
|
layer = qgiscore.QgsVectorLayer(tmp_path, "tmp_gml", "ogr")
|
|||
|
|
if layer and getattr(layer, "isValid", lambda: False)():
|
|||
|
|
for feat in layer.getFeatures():
|
|||
|
|
features.append(feat)
|
|||
|
|
return features, None
|
|||
|
|
return [], "GML-Antwort konnte nicht als Layer geladen werden"
|
|||
|
|
except Exception as exc:
|
|||
|
|
return [], f"Fehler beim Parsen von GML: {exc}"
|
|||
|
|
# Wenn alles fehlschlägt:
|
|||
|
|
return [], "Antwort konnte nicht als JSON oder GML geparst werden"
|