Diensteabruf integriert

This commit is contained in:
2026-03-12 16:14:02 +01:00
parent ae956b0046
commit 9829ac9c81
10 changed files with 1136 additions and 89 deletions

View File

@@ -17,10 +17,11 @@ Designprinzipien
- Die Methode ist pdoc-kompatibel dokumentiert und bewusst einfach gehalten.
"""
from typing import Any, Dict, List, Mapping, Optional, Tuple
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple
from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse
import json
import time
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
from sn_basis.functions import qgiscore_wrapper as qgiscore
@@ -59,6 +60,7 @@ class Datenabruf:
verfahrensgebiet_layer: Any,
speicherort: str,
pruef_ergebnisse: Optional[List[Any]] = None,
progress: Optional[Any] = None,
) -> Tuple[Dict[str, Any], List[Any]]:
"""
Ruft für alle Zeilen in ``result_dict["rows"]`` die Fachdaten ab und
@@ -82,6 +84,10 @@ class Datenabruf:
# 1) Räumliche Filtergeometrie bestimmen (BBox oder None)
bbox_geom = self._determine_spatial_filter(raumfilter, verfahrensgebiet_layer)
filter_crs_authid = None
if isinstance(bbox_geom, dict):
raw_crs = bbox_geom.get("crs_authid")
filter_crs_authid = str(raw_crs) if raw_crs else None
# Globale Logs über alle Dienste hinweg
log_geladen: Dict[str, int] = {}
@@ -90,7 +96,20 @@ class Datenabruf:
log_ausserhalb: Dict[str, int] = {}
# 2) Über alle Zeilen iterieren
for row in rows:
total_rows = len(rows)
for idx, row in enumerate(rows, start=1):
if progress is not None:
progress.set_label(f"Datenabruf {idx}/{total_rows}")
if progress.is_canceled():
pe_cancel = pruef_ergebnis(
ok=False,
meldung="Datenabruf durch Benutzer abgebrochen",
aktion="abbruch",
kontext={"schritt": idx},
)
processed_results.append(self.pruefmanager.verarbeite(pe_cancel))
break
ident = row.get("ident")
link = row.get("Link")
provider = row.get("Provider")
@@ -115,7 +134,16 @@ class Datenabruf:
url = self._build_provider_url(link=link, provider=str(provider), bbox_geom=bbox_geom if use_bbox else None)
# 2b) Fachdaten abrufen
features, error_msg = self._fetch_features(url=url, provider=str(provider))
features, error_msg = self._fetch_features(
url=url,
provider=str(provider),
cancel_callback=(progress.is_canceled if progress is not None else None),
)
if progress is not None:
if hasattr(progress, "set_value"):
progress.set_value(idx)
# 2c) Logs und Aggregation
if error_msg:
@@ -207,7 +235,18 @@ class Datenabruf:
return None
if raumfilter == "Verfahrensgebiet":
return qgiscore.get_layer_extent(verfahrensgebiet_layer)
extent = qgiscore.get_layer_extent(verfahrensgebiet_layer)
if extent is None:
return None
crs_authid = None
try:
if hasattr(verfahrensgebiet_layer, "crs") and callable(getattr(verfahrensgebiet_layer, "crs")):
crs = verfahrensgebiet_layer.crs()
if crs is not None and hasattr(crs, "authid") and callable(getattr(crs, "authid")):
crs_authid = crs.authid()
except Exception:
crs_authid = None
return {"extent": extent, "crs_authid": crs_authid}
if raumfilter == "Pufferlayer":
buffer_layer = qgiscore.create_buffer_layer(
@@ -216,8 +255,18 @@ class Datenabruf:
layer_name="Verfahrensgebiet_Puffer_1km",
)
if buffer_layer is not None:
qgisui.add_layer_to_project(buffer_layer)
return qgiscore.get_layer_extent(buffer_layer)
extent = qgiscore.get_layer_extent(buffer_layer)
if extent is None:
return None
crs_authid = None
try:
if hasattr(buffer_layer, "crs") and callable(getattr(buffer_layer, "crs")):
crs = buffer_layer.crs()
if crs is not None and hasattr(crs, "authid") and callable(getattr(crs, "authid")):
crs_authid = crs.authid()
except Exception:
crs_authid = None
return {"extent": extent, "crs_authid": crs_authid}
return None
@@ -233,60 +282,130 @@ class Datenabruf:
Erwartet: provider ist gesetzt (z. B. "WFS", "REST", "OGR", "WMS").
"""
provider_norm = (provider or "").upper()
base_link = link or ""
base_link = (link or "").strip()
if base_link.lower().startswith("url="):
base_link = base_link[4:].strip()
# WMS: niemals BBOX anhängen
if provider_norm == "WFS" and base_link.count("?") > 1:
first, rest = base_link.split("?", 1)
base_link = f"{first}?{rest.replace('?', '&')}"
extent_obj = bbox_geom
crs_authid: Optional[str] = None
if isinstance(bbox_geom, dict):
extent_obj = bbox_geom.get("extent")
raw_crs = bbox_geom.get("crs_authid")
crs_authid = str(raw_crs) if raw_crs else None
# WMS: unverändert durchreichen
if provider_norm == "WMS":
return base_link
if bbox_geom is None:
return base_link
# Versuche bbox-String zu erzeugen (nutzt qgiscore.extent_to_bbox_string wenn vorhanden)
# Versuche bbox-String zu erzeugen (falls Raumfilter aktiv)
bbox_str: Optional[str] = None
try:
extent_to_bbox = getattr(__import__("sn_basis.functions.qgiscore_wrapper", fromlist=["qgiscore_wrapper"]), "extent_to_bbox_string", None)
if callable(extent_to_bbox):
bbox_str = extent_to_bbox(bbox_geom)
else:
# Fallback: einfache xmin/ymin/xmax/ymax-Extraktion (duck-typing)
if hasattr(bbox_geom, "xmin") and callable(getattr(bbox_geom, "xmin")):
bbox_str = f"{bbox_geom.xmin()},{bbox_geom.ymin()},{bbox_geom.xmax()},{bbox_geom.ymax()}"
elif isinstance(bbox_geom, (tuple, list)) and len(bbox_geom) == 4:
bbox_str = f"{bbox_geom[0]},{bbox_geom[1]},{bbox_geom[2]},{bbox_geom[3]}"
if extent_obj is not None:
try:
extent_to_bbox = getattr(__import__("sn_basis.functions.qgiscore_wrapper", fromlist=["qgiscore_wrapper"]), "extent_to_bbox_string", None)
if callable(extent_to_bbox):
bbox_str = extent_to_bbox(extent_obj)
else:
bbox_str = str(bbox_geom)
except Exception:
bbox_str = None
if not bbox_str:
return base_link
# Fallback: einfache xmin/ymin/xmax/ymax-Extraktion (duck-typing)
if hasattr(extent_obj, "xmin") and callable(getattr(extent_obj, "xmin")):
bbox_str = f"{extent_obj.xmin()},{extent_obj.ymin()},{extent_obj.xmax()},{extent_obj.ymax()}"
elif isinstance(extent_obj, (tuple, list)) and len(extent_obj) == 4:
bbox_str = f"{extent_obj[0]},{extent_obj[1]},{extent_obj[2]},{extent_obj[3]}"
else:
bbox_str = str(extent_obj)
except Exception:
bbox_str = None
parsed = urlparse(base_link)
query_params = dict(parse_qsl(parsed.query, keep_blank_values=True))
if provider_norm == "WFS":
query_params.setdefault("BBOX", bbox_str)
query_params.setdefault("service", "WFS")
query_params.setdefault("request", "GetFeature")
query_params.setdefault("outputFormat", "application/json")
if bbox_str:
query_params.setdefault("BBOX", bbox_str)
if crs_authid:
query_params.setdefault("SRSNAME", crs_authid)
new_query = urlencode(query_params, doseq=True)
rebuilt = parsed._replace(query=new_query)
return urlunparse(rebuilt)
if provider_norm in ("REST", "ARCGIS", "ARCGISFEATURESERVER", "ARCGIS_FEATURESERVER"):
query_params.setdefault("geometry", bbox_str)
query_params.setdefault("geometryType", "esriGeometryEnvelope")
query_params.setdefault("spatialRel", "esriSpatialRelIntersects")
# ArcGIS FeatureServer erwartet i.d.R. den /query-Endpunkt
rest_base = base_link.rstrip("/")
if not rest_base.lower().endswith("/query"):
rest_base = f"{rest_base}/query"
parsed_rest = urlparse(rest_base)
query_params = dict(parse_qsl(parsed_rest.query, keep_blank_values=True))
query_params.setdefault("where", "1=1")
query_params.setdefault("outFields", "*")
query_params.setdefault("returnGeometry", "true")
query_params.setdefault("f", query_params.get("f", "json"))
if bbox_str:
geometry_envelope = None
try:
if hasattr(extent_obj, "xmin") and callable(getattr(extent_obj, "xmin")):
geometry_envelope = {
"xmin": extent_obj.xmin(),
"ymin": extent_obj.ymin(),
"xmax": extent_obj.xmax(),
"ymax": extent_obj.ymax(),
}
elif isinstance(extent_obj, (tuple, list)) and len(extent_obj) == 4:
geometry_envelope = {
"xmin": extent_obj[0],
"ymin": extent_obj[1],
"xmax": extent_obj[2],
"ymax": extent_obj[3],
}
else:
parts = [p.strip() for p in str(bbox_str).split(",")]
if len(parts) == 4:
geometry_envelope = {
"xmin": float(parts[0]),
"ymin": float(parts[1]),
"xmax": float(parts[2]),
"ymax": float(parts[3]),
}
except Exception:
geometry_envelope = None
if geometry_envelope is not None:
query_params.setdefault("geometry", json.dumps(geometry_envelope))
else:
query_params.setdefault("geometry", bbox_str)
query_params.setdefault("geometryType", "esriGeometryEnvelope")
query_params.setdefault("spatialRel", "esriSpatialRelIntersects")
if crs_authid and ":" in crs_authid:
srid = crs_authid.split(":", 1)[1]
if srid.isdigit():
query_params.setdefault("inSR", srid)
query_params.setdefault("outSR", srid)
new_query = urlencode(query_params, doseq=True)
rebuilt = parsed._replace(query=new_query)
rebuilt = parsed_rest._replace(query=new_query)
return urlunparse(rebuilt)
# Default: generischer bbox-Parameter
query_params.setdefault("bbox", bbox_str)
# Default: generischer bbox-Parameter (nur wenn vorhanden)
if bbox_str:
query_params.setdefault("bbox", bbox_str)
new_query = urlencode(query_params, doseq=True)
rebuilt = parsed._replace(query=new_query)
return urlunparse(rebuilt)
def _fetch_features(self, url: str, provider: str) -> Tuple[List[Any], Optional[str]]:
def _fetch_features(
self,
url: str,
provider: str,
cancel_callback: Optional[Callable[[], bool]] = None,
) -> Tuple[List[Any], Optional[str]]:
"""
Führt den eigentlichen Abruf der Fachdaten durch.
@@ -336,34 +455,100 @@ class Datenabruf:
http_error: Optional[str] = None
# QGIS NetworkAccessManager bevorzugen
_FETCH_TIMEOUT_MS = 30_000 # 30 Sekunden
aborted_or_timed_out = False
attempted_qgis_fetch = False
if callable(cancel_callback) and cancel_callback():
return [], "Abbruch durch Benutzer"
if getattr(qgiscore, "QGIS_AVAILABLE", False) and getattr(qgiscore, "QgsNetworkAccessManager", None) is not None:
attempted_qgis_fetch = True
try:
manager = qgiscore.QgsNetworkAccessManager.instance()
QUrl = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QUrl", None)
QNetworkRequest = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QNetworkRequest", None)
QEventLoop = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QEventLoop", None)
# Netzwerk-Timeout global setzen (QGIS >= 3.6)
if hasattr(manager, "setTimeout"):
manager.setTimeout(_FETCH_TIMEOUT_MS)
_qt = __import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"])
QUrl = getattr(_qt, "QUrl", None)
QNetworkRequest = getattr(_qt, "QNetworkRequest", None)
QEventLoop = getattr(_qt, "QEventLoop", None)
QTimer = getattr(_qt, "QTimer", None)
if QUrl is not None and QNetworkRequest is not None:
req = QNetworkRequest(QUrl(url))
reply = manager.get(req)
if QEventLoop is not None:
loop = QEventLoop()
reply.finished.connect(loop.quit)
loop.exec()
try:
raw = reply.readAll()
data_bytes = bytes(raw) if hasattr(raw, "__bytes__") else raw
response_text = data_bytes.decode("utf-8", errors="replace")
except Exception:
_poll_timer = None
if QTimer is not None:
try:
_poll_timer = QTimer()
_poll_timer.setSingleShot(False)
_poll_timer.timeout.connect(loop.quit)
_poll_timer.start(100)
except Exception:
_poll_timer = None
start_time = time.monotonic()
while True:
if callable(cancel_callback) and cancel_callback():
reply.abort()
http_error = "Abbruch durch Benutzer"
aborted_or_timed_out = True
break
elapsed_ms = int((time.monotonic() - start_time) * 1000)
if elapsed_ms >= _FETCH_TIMEOUT_MS:
reply.abort()
http_error = f"Timeout nach {_FETCH_TIMEOUT_MS // 1000} s: {url}"
aborted_or_timed_out = True
break
if hasattr(reply, "isFinished") and reply.isFinished():
break
loop.exec()
try:
if hasattr(qt, "QCoreApplication") and hasattr(qt.QCoreApplication, "processEvents"):
qt.QCoreApplication.processEvents()
except Exception:
pass
if _poll_timer is not None:
try:
_poll_timer.stop()
except Exception:
pass
if not aborted_or_timed_out:
# Fehler aus Reply auslesen
err_code = None
try:
err_code = reply.error()
except Exception:
pass
if err_code and int(err_code) != 0:
http_error = f"Netzwerkfehler ({err_code}): {reply.errorString()}"
if http_error:
# Timeout oder Netzwerkfehler keinen Body lesen
pass
else:
try:
response_text = reply.text()
raw = reply.readAll()
data_bytes = bytes(raw) if hasattr(raw, "__bytes__") else raw
response_text = data_bytes.decode("utf-8", errors="replace")
except Exception:
response_text = None
try:
response_text = reply.text()
except Exception:
response_text = None
except Exception as exc:
http_error = f"QgsNetworkAccessManager error: {exc}"
response_text = None
# Fallback: requests
if response_text is None:
# Fallback: requests nur wenn kein harter Abbruch/Timeout im QGIS-Request vorlag
if response_text is None and (not attempted_qgis_fetch or not aborted_or_timed_out):
try:
import requests # lokal import, keine harte Abhängigkeit
r = requests.get(url, timeout=30)
@@ -383,6 +568,8 @@ class Datenabruf:
return parsed.get("features", []), None
if isinstance(parsed, dict) and "features" in parsed:
return parsed.get("features", []), None
if prov in ("REST", "ARCGIS", "ARCGISFEATURESERVER", "ARCGIS_FEATURESERVER", "WFS"):
return [], "Antwort enthält keine Feature-Liste"
# Sonst: gib das gesamte JSON als einzelnes Objekt zurück
return [parsed], None
except json.JSONDecodeError: