# sn_basis/modules/Datenabruf.py """ Modul ``datenabruf`` Enthält die Klasse :class:`Datenabruf`, die für eine Menge bereits validierter Links (aus ``validate_rows``) die Fachdaten abruft und aggregierte Prüfergebnisse liefert. Designprinzipien ---------------- - Die BBOX wird serverseitig angewendet: wenn ein Raumfilter aktiv ist, wird die BBOX in die Abruf-URL eingebettet (außer bei WMS). - Alle QGIS-Interaktionen laufen über die Wrapper `qgiscore_wrapper` und `qgisui_wrapper`. - Fehler werden als kurze Strings zurückgegeben und zentral in `log_fehler` gesammelt; erfolgreiche Aufrufe werden in `log_geladen` protokolliert. - Die Methode ist pdoc-kompatibel dokumentiert und bewusst einfach gehalten. """ from typing import Any, Dict, List, Mapping, Optional, Tuple from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse import json from sn_basis.modules.pruef_ergebnis import pruef_ergebnis from sn_basis.functions import qgiscore_wrapper as qgiscore from sn_basis.functions import qgisui_wrapper as qgisui from sn_basis.functions import qt_wrapper as qt DataDict = Dict[str, List[Mapping[str, Any]]] class Datenabruf: """ Führt den eigentlichen Fachdatenabruf für eine Menge validierter Links durch. Erwartet ein ``DataDict`` der Form ``{"rows": [row1, row2, ...]}``. """ def __init__(self, pruefmanager: Any) -> None: """ Initialisiert eine neue Instanz des Datenabrufs. Parameters ---------- pruefmanager: Instanz des Pruefmanagers, der :class:`pruef_ergebnis` verarbeitet. """ self.pruefmanager = pruefmanager # ------------------------------------------------------------------ # # Öffentliche API # ------------------------------------------------------------------ # def datenabruf( self, result_dict: DataDict, raumfilter: str, verfahrensgebiet_layer: Any, speicherort: str, pruef_ergebnisse: Optional[List[Any]] = None, ) -> Tuple[Dict[str, Any], List[Any]]: """ Ruft für alle Zeilen in ``result_dict["rows"]`` die Fachdaten ab und liefert ein Daten‑Dict sowie die Liste verarbeiteter Pruefergebnisse. Logging / Aggregation --------------------- Am Ende enthält das zusammenfassende PruefErgebnis im Kontext: - geladen: dict(dienst -> anzahl geladen) - fehler: dict(dienst -> fehlermeldung) - relevant: dict(dienst -> anzahl relevant) - ausserhalb: dict(dienst -> anzahl geladen, aber ausserhalb) """ if pruef_ergebnisse is None: processed_results: List[Any] = [] else: processed_results = list(pruef_ergebnisse) rows = result_dict.get("rows", []) daten: Dict[str, List[Any]] = {} # 1) Räumliche Filtergeometrie bestimmen (BBox oder None) bbox_geom = self._determine_spatial_filter(raumfilter, verfahrensgebiet_layer) # Globale Logs über alle Dienste hinweg log_geladen: Dict[str, int] = {} log_fehler: Dict[str, str] = {} log_relevant: Dict[str, int] = {} log_ausserhalb: Dict[str, int] = {} # 2) Über alle Zeilen iterieren for row in rows: ident = row.get("ident") link = row.get("Link") provider = row.get("Provider") if not ident or not link or not provider: pe = pruef_ergebnis( ok=False, meldung="Ungültige Zeile im Datenabruf (fehlende Pflichtfelder)", aktion="pflichtfelder_fehlen", kontext=row, ) processed_results.append(self.pruefmanager.verarbeite(pe)) continue # Lesbarer Dienstname für Logs thema = row.get("Inhalt") or row.get("Thema") or row.get("Titel") or str(ident) # 2a) Provider-spezifische URL zusammenbauen # Wenn Raumfilter aktiv ist, übergeben wir bbox_geom an _build_provider_url, # außer bei WMS (WMS bleibt unverändert). use_bbox = (raumfilter != "ohne") and (str(provider).upper() != "WMS") url = self._build_provider_url(link=link, provider=str(provider), bbox_geom=bbox_geom if use_bbox else None) # 2b) Fachdaten abrufen features, error_msg = self._fetch_features(url=url, provider=str(provider)) # 2c) Logs und Aggregation if error_msg: # Fehler beim Abruf log_fehler[thema] = error_msg pe_err = pruef_ergebnis( ok=False, meldung=f"Fehler beim Abruf von {thema}: {error_msg}", aktion="url_nicht_erreichbar", kontext={"ident": ident, "thema": thema, "url": url, "error": error_msg}, ) processed_results.append(self.pruefmanager.verarbeite(pe_err)) # daten[ident] bleibt nicht gesetzt oder leer daten[str(ident)] = [] continue # Erfolgreich aufgerufen (auch wenn features == []) anzahl_geladen = len(features) log_geladen[thema] = anzahl_geladen # Da die BBOX serverseitig angewendet wurde: # - anzahl_geladen > 0 -> relevant # - anzahl_geladen == 0 -> ausserhalb if anzahl_geladen > 0: log_relevant[thema] = anzahl_geladen daten[str(ident)] = features else: log_ausserhalb[thema] = 0 daten[str(ident)] = [] # 2d) Kurzes Prüfergebnis pro Zeile pe_row = pruef_ergebnis( ok=True, meldung=( f"Datenabruf für ident={ident}: {anzahl_geladen} geladene Objekte" ), aktion="datenabruf", kontext={ "ident": ident, "thema": thema, "anzahl_gesamt": anzahl_geladen, "url": url, }, ) processed_results.append(self.pruefmanager.verarbeite(pe_row)) # 3) Zusammenfassendes Prüfergebnis (wie alter DataGrabber) summary_kontext = { "geladen": log_geladen, "fehler": log_fehler, "relevant": log_relevant, "ausserhalb": log_ausserhalb, } pe_summary = pruef_ergebnis( ok=(len(log_fehler) == 0), meldung=( f"Datenabruf abgeschlossen: {len(log_geladen)} Dienste geladen, " f"{len(log_fehler)} Fehler" ), aktion="datenabruf", kontext=summary_kontext, ) processed_results.append(self.pruefmanager.verarbeite(pe_summary)) daten_dict: Dict[str, Any] = { "speicherort": speicherort, "daten": daten, } return daten_dict, processed_results # ------------------------------------------------------------------ # # Hilfsmethoden: räumlicher Filter # ------------------------------------------------------------------ # def _determine_spatial_filter(self, raumfilter: str, verfahrensgebiet_layer: Any) -> Optional[Any]: """ Bestimmt die räumliche Filtergeometrie (BBox) abhängig vom Raumfilter. Returns ------- Optional[Any] Eine Geometrie/Extent (z. B. QgsRectangle) oder ``None``. """ if raumfilter == "ohne": return None if verfahrensgebiet_layer is None: return None if raumfilter == "Verfahrensgebiet": return qgiscore.get_layer_extent(verfahrensgebiet_layer) if raumfilter == "Pufferlayer": buffer_layer = qgiscore.create_buffer_layer( source_layer=verfahrensgebiet_layer, distance_m=1000.0, layer_name="Verfahrensgebiet_Puffer_1km", ) if buffer_layer is not None: qgisui.add_layer_to_project(buffer_layer) return qgiscore.get_layer_extent(buffer_layer) return None # ------------------------------------------------------------------ # # Hilfsmethoden: Provider-URL und Datenabruf # ------------------------------------------------------------------ # def _build_provider_url(self, link: str, provider: str, bbox_geom: Optional[Any]) -> str: """ Baut eine Provider-spezifische Abruf-URL. Wenn `bbox_geom` übergeben wird, wird sie in die URL eingebettet (außer bei WMS). Erwartet: provider ist gesetzt (z. B. "WFS", "REST", "OGR", "WMS"). """ provider_norm = (provider or "").upper() base_link = link or "" # WMS: niemals BBOX anhängen if provider_norm == "WMS": return base_link if bbox_geom is None: return base_link # Versuche bbox-String zu erzeugen (nutzt qgiscore.extent_to_bbox_string wenn vorhanden) bbox_str: Optional[str] = None try: extent_to_bbox = getattr(__import__("sn_basis.functions.qgiscore_wrapper", fromlist=["qgiscore_wrapper"]), "extent_to_bbox_string", None) if callable(extent_to_bbox): bbox_str = extent_to_bbox(bbox_geom) else: # Fallback: einfache xmin/ymin/xmax/ymax-Extraktion (duck-typing) if hasattr(bbox_geom, "xmin") and callable(getattr(bbox_geom, "xmin")): bbox_str = f"{bbox_geom.xmin()},{bbox_geom.ymin()},{bbox_geom.xmax()},{bbox_geom.ymax()}" elif isinstance(bbox_geom, (tuple, list)) and len(bbox_geom) == 4: bbox_str = f"{bbox_geom[0]},{bbox_geom[1]},{bbox_geom[2]},{bbox_geom[3]}" else: bbox_str = str(bbox_geom) except Exception: bbox_str = None if not bbox_str: return base_link parsed = urlparse(base_link) query_params = dict(parse_qsl(parsed.query, keep_blank_values=True)) if provider_norm == "WFS": query_params.setdefault("BBOX", bbox_str) new_query = urlencode(query_params, doseq=True) rebuilt = parsed._replace(query=new_query) return urlunparse(rebuilt) if provider_norm in ("REST", "ARCGIS", "ARCGISFEATURESERVER", "ARCGIS_FEATURESERVER"): query_params.setdefault("geometry", bbox_str) query_params.setdefault("geometryType", "esriGeometryEnvelope") query_params.setdefault("spatialRel", "esriSpatialRelIntersects") query_params.setdefault("f", query_params.get("f", "json")) new_query = urlencode(query_params, doseq=True) rebuilt = parsed._replace(query=new_query) return urlunparse(rebuilt) # Default: generischer bbox-Parameter query_params.setdefault("bbox", bbox_str) new_query = urlencode(query_params, doseq=True) rebuilt = parsed._replace(query=new_query) return urlunparse(rebuilt) def _fetch_features(self, url: str, provider: str) -> Tuple[List[Any], Optional[str]]: """ Führt den eigentlichen Abruf der Fachdaten durch. Returns ------- Tuple[List[Any], Optional[str]] - features: Liste der geladenen Features (ggf. leer) - error_msg: None bei Erfolg, sonst kurzer Fehlertext """ features: List[Any] = [] prov = str(provider).upper() # WMS: kein Featureabruf; caller behandelt WMS separat (hier defensiv) if prov == "WMS": return [], None # OGR / lokale Dateien: versuche QGIS-Layer (wenn QGIS verfügbar) if prov in ("OGR", "GPKG", "SHP", "GEOJSON"): if getattr(qgiscore, "QGIS_AVAILABLE", False): try: layer = qgiscore.QgsVectorLayer(url, "tmp", "ogr") if not layer or not getattr(layer, "isValid", lambda: False)(): return [], "Layer ungültig oder konnte nicht geladen werden" for feat in layer.getFeatures(): features.append(feat) return features, None except FileNotFoundError: return [], "Lokale Datei nicht gefunden" except Exception as exc: return [], f"Fehler beim Laden der OGR-Quelle: {exc}" else: # Mock: falls GeoJSON-Datei vorhanden, versuche lokale Datei zu lesen try: if url.lower().endswith(".geojson"): with open(url, "r", encoding="utf-8") as fh: data = json.load(fh) if isinstance(data, dict) and data.get("type") == "FeatureCollection": return data.get("features", []), None return [], "Keine QGIS-Umgebung und keine lesbare lokale GeoJSON" except FileNotFoundError: return [], "Lokale Datei nicht gefunden" except Exception as exc: return [], f"Fehler beim Lesen lokaler GeoJSON (Mock): {exc}" # HTTP-basierte Dienste (WFS, REST/ArcGIS, generisch) response_text: Optional[str] = None http_error: Optional[str] = None # QGIS NetworkAccessManager bevorzugen if getattr(qgiscore, "QGIS_AVAILABLE", False) and getattr(qgiscore, "QgsNetworkAccessManager", None) is not None: try: manager = qgiscore.QgsNetworkAccessManager.instance() QUrl = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QUrl", None) QNetworkRequest = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QNetworkRequest", None) QEventLoop = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QEventLoop", None) if QUrl is not None and QNetworkRequest is not None: req = QNetworkRequest(QUrl(url)) reply = manager.get(req) if QEventLoop is not None: loop = QEventLoop() reply.finished.connect(loop.quit) loop.exec() try: raw = reply.readAll() data_bytes = bytes(raw) if hasattr(raw, "__bytes__") else raw response_text = data_bytes.decode("utf-8", errors="replace") except Exception: try: response_text = reply.text() except Exception: response_text = None except Exception as exc: http_error = f"QgsNetworkAccessManager error: {exc}" response_text = None # Fallback: requests if response_text is None: try: import requests # lokal import, keine harte Abhängigkeit r = requests.get(url, timeout=30) r.raise_for_status() response_text = r.text except Exception as exc: http_error = f"requests error: {exc}" response_text = None if response_text is None: return [], http_error or "keine Antwort vom Server" # Versuche JSON/GeoJSON zu parsen try: parsed = json.loads(response_text) if isinstance(parsed, dict) and parsed.get("type") == "FeatureCollection": return parsed.get("features", []), None if isinstance(parsed, dict) and "features" in parsed: return parsed.get("features", []), None # Sonst: gib das gesamte JSON als einzelnes Objekt zurück return [parsed], None except json.JSONDecodeError: # Nicht-JSON-Antwort (z. B. GML). Wenn QGIS verfügbar, versuche GML via temporärer Datei + OGR if getattr(qgiscore, "QGIS_AVAILABLE", False): try: import tempfile with tempfile.NamedTemporaryFile(suffix=".gml", delete=False, mode="w", encoding="utf-8") as fh: fh.write(response_text) tmp_path = fh.name layer = qgiscore.QgsVectorLayer(tmp_path, "tmp_gml", "ogr") if layer and getattr(layer, "isValid", lambda: False)(): for feat in layer.getFeatures(): features.append(feat) return features, None return [], "GML-Antwort konnte nicht als Layer geladen werden" except Exception as exc: return [], f"Fehler beim Parsen von GML: {exc}" # Wenn alles fehlschlägt: return [], "Antwort konnte nicht als JSON oder GML geparst werden"