Files
API_Karte_QGISDemo/vln_karten/api_client.py
T

256 lines
9.2 KiB
Python
Raw Normal View History

"""HTTP-Client für die VLN-Manager-API (Karten-Plugin map.php + Stammdaten).
API-Vertrag laut Doku im Wissensdatenbank-Vault
("API.md", "API Karten (map.php).md", "API Schnittstellen (Referenz).md",
Stand 2026-06-08):
Basis-URL: https://api.flurneuordnung-sachsen.de/v2
POST /person/login
Body: {"mail": "...", "password": "..."}
Antwort: {"data": {"userauth": "<url_token>", "id": ...}}
Das userauth-Token ist der API-Key (= PERSON.url_token).
GET /tgen -> Teilnehmergemeinschaften (Verfahren)
GET /maps/<layer>/{vkz} -> GeoJSON FeatureCollection (EPSG:25833)
PUT /maps/<layer>/{vkz} <- GeoJSON (ersetzt den Layer dieser VKZ)
Antwort: {"data": {"vkz", "layer", "art", "speicher_id"}, "status": "ok"}
Layer: umringe, p41, st, kas, we
Auth nach Login über den Header "X-API-Key".
Fehlerformat: RFC 7807 (application/problem+json).
Es wird QgsBlockingNetworkRequest verwendet, damit Proxy- und
Zertifikatseinstellungen aus QGIS automatisch greifen.
"""
import json
from qgis.PyQt.QtCore import QUrl
from qgis.PyQt.QtNetwork import QNetworkRequest
from qgis.core import QgsBlockingNetworkRequest
# Fest verdrahtete Produktiv-API des VLN Managers.
DEFAULT_BASE_URL = "https://api.flurneuordnung-sachsen.de/v2"
class ApiError(Exception):
"""Fehler bei der Kommunikation mit der Karten-API."""
class AuthError(ApiError):
"""API-Key fehlt, ist abgelaufen oder hat keine Berechtigung (401/403)."""
class KartenApiClient:
def __init__(self, base_url=DEFAULT_BASE_URL, api_key=None, mail=None):
self.base_url = base_url.rstrip("/")
self.api_key = api_key
self.mail = mail
@property
def is_authenticated(self):
return bool(self.api_key)
def login(self, mail, password):
"""Meldet den Benutzer an. Die API liefert das userauth-Token
(= API-Key), das für alle weiteren Requests verwendet wird."""
data = self._request(
"POST",
"/person/login",
payload={"mail": mail, "password": password},
with_auth=False,
)
payload = data.get("data") if isinstance(data, dict) else None
if not isinstance(payload, dict):
payload = data if isinstance(data, dict) else {}
api_key = payload.get("userauth")
if not api_key:
raise ApiError(
"Login-Antwort enthielt kein userauth-Token. "
"Erhaltene Antwort: %s" % json.dumps(data)[:300]
)
self.api_key = api_key
self.mail = mail
return api_key
def logout(self):
self.api_key = None
self.mail = None
# ------------------------------------------------------------------
# Verfahren (Teilnehmergemeinschaften)
# ------------------------------------------------------------------
def get_verfahren(self):
"""Liste der Verfahren (TGs) als [{"vkz": ..., "name": ...}, ...],
sortiert nach VKZ. Quelle: GET /tgen."""
data = self._request("GET", "/tgen")
rows = None
if isinstance(data, list):
rows = data
elif isinstance(data, dict):
for key in ("data", "rows"):
value = data.get(key)
if isinstance(value, list):
rows = value
break
if isinstance(value, dict) and isinstance(value.get("rows"), list):
rows = value["rows"]
break
if rows is None:
raise ApiError(
"Antwort von /tgen hat ein unerwartetes Format: %s"
% json.dumps(data)[:200]
)
verfahren = []
for row in rows:
if not isinstance(row, dict):
continue
vkz = row.get("vkz") or row.get("VKZ")
name = (
row.get("kurzname")
or row.get("Kurzname")
or row.get("name")
or row.get("Name")
or ""
)
if vkz:
verfahren.append({"vkz": str(vkz), "name": str(name)})
verfahren.sort(key=lambda v: v["vkz"])
return verfahren
# ------------------------------------------------------------------
# Karten-Layer (die konkreten Pfade stehen in plugin.DATASETS)
# ------------------------------------------------------------------
def load_layer_complete(self, layer_key, vkz, page_size=2000):
"""Lädt ALLE Objekte eines Layers für eine VKZ aus KARTE_OBJEKT.
Nutzt wie das Web-GIS (0_map, db_p41) den Listen-Endpunkt
/maps/<layer>?vkz=…, aber mit limit/offset-Paging, bis keine
weitere Seite mehr kommt — so ist der Abruf auch bei mehr als
2000 Objekten garantiert vollständig."""
from urllib.parse import quote
features = []
offset = 0
while True:
data = self._request(
"GET",
"/maps/%s?vkz=%s&limit=%d&offset=%d"
% (quote(str(layer_key)), quote(str(vkz)), page_size, offset),
)
if not isinstance(data, dict) or data.get("type") != "FeatureCollection":
raise ApiError(
"Antwort von /maps/%s ist keine GeoJSON FeatureCollection."
% layer_key
)
page = data.get("features", [])
features.extend(page)
if len(page) < page_size:
break
offset += page_size
return {"type": "FeatureCollection", "features": features}
def load_feature_collection(self, path):
"""Lädt einen Layer als GeoJSON FeatureCollection (dict)."""
data = self._request("GET", path)
# Die Karten-Endpunkte liefern die FeatureCollection direkt,
# andere Routen verpacken sie in {"data": ...}.
if isinstance(data, dict):
if data.get("type") == "FeatureCollection":
return data
inner = data.get("data")
if isinstance(inner, dict) and inner.get("type") == "FeatureCollection":
return inner
raise ApiError(
"Antwort von %s ist keine GeoJSON FeatureCollection." % path
)
def save_feature_collection(self, path, feature_collection):
"""Schreibt eine GeoJSON FeatureCollection zurück an die API
(PUT ersetzt den kompletten Layer der jeweiligen VKZ)."""
return self._request("PUT", path, payload=feature_collection)
# ------------------------------------------------------------------
# Intern
# ------------------------------------------------------------------
def _request(self, method, path, payload=None, with_auth=True):
if with_auth and not self.is_authenticated:
raise AuthError("Nicht angemeldet — bitte zuerst einloggen.")
request = QNetworkRequest(QUrl(self.base_url + path))
request.setHeader(
QNetworkRequest.KnownHeaders.ContentTypeHeader, "application/json"
)
request.setRawHeader(b"Accept", b"application/json")
if with_auth:
request.setRawHeader(b"X-API-Key", self.api_key.encode("utf-8"))
body = b""
if payload is not None:
body = json.dumps(payload).encode("utf-8")
blocking = QgsBlockingNetworkRequest()
if method == "GET":
error = blocking.get(request)
elif method == "POST":
error = blocking.post(request, body)
elif method == "PUT":
error = blocking.put(request, body)
else:
raise ApiError("Nicht unterstützte HTTP-Methode: %s" % method)
reply = blocking.reply()
content = bytes(reply.content())
status = reply.attribute(
QNetworkRequest.Attribute.HttpStatusCodeAttribute
)
status = int(status) if status is not None else None
if status in (401, 403):
raise AuthError(
"Keine Berechtigung für %s %s (HTTP %s)%s"
% (method, path, status, self._error_detail(content))
)
if error != QgsBlockingNetworkRequest.NoError:
raise ApiError(
"%s %s fehlgeschlagen: %s%s"
% (method, path, blocking.errorMessage(), self._error_detail(content))
)
if status is not None and status >= 400:
raise ApiError(
"%s %s lieferte HTTP %s%s"
% (method, path, status, self._error_detail(content))
)
if not content:
return None
try:
return json.loads(content.decode("utf-8"))
except ValueError:
raise ApiError(
"Antwort von %s ist kein gültiges JSON." % path
)
@staticmethod
def _error_detail(content):
"""Liest title/detail aus einer RFC-7807-Fehlerantwort."""
if not content:
return ""
try:
problem = json.loads(content.decode("utf-8"))
parts = [
str(problem[k]) for k in ("title", "detail") if problem.get(k)
]
if parts:
return " — " + ": ".join(parts)
except (ValueError, AttributeError):
pass
return " — " + content[:300].decode("utf-8", "replace")