daniel@feature/dataGrabber_anbinden #9

Merged
Daniel merged 8 commits from Daniel/Plugin_SN_Basis:daniel@feature/dataGrabber_anbinden into unstable 2026-03-13 06:51:57 +01:00
19 changed files with 1857 additions and 360 deletions

View File

@@ -0,0 +1,289 @@
name: Release Plugin
run-name: "Release | ${{ github.ref_name }}"
on:
push:
tags:
- 'v*'
jobs:
release:
runs-on: alpine-latest
defaults:
run:
shell: bash
steps:
- name: Notwendige Abhängigkeiten installieren
shell: sh
run: |
apk add --no-cache git zip curl jq rsync bash
git config --global http.sslVerify false
- name: Code holen
run: |
# Tag aus GitHub Actions Kontext extrahieren
TAG="${GITHUB_REF#refs/tags/}"
# Repo-URL dynamisch aus vars und github.repository bauen
REPO_URL="https://${RELEASE_TOKEN}:x-oauth-basic@${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}.git"
# Repository klonen
git clone "$REPO_URL" repo
cd repo
git checkout "$TAG"
env:
RELEASE_TOKEN: ${{ secrets.RELEASE_TOKEN }}
- name: Version und Kanal bestimmen
id: releaseinfo
run: |
TAG="${{ github.ref_name }}"
VERSION="${TAG#v}"
case "$TAG" in
*-unstable*)
CHANNEL="unstable"
DRAFT="false"
PRERELEASE="true"
;;
*-testing*)
CHANNEL="testing"
DRAFT="false"
PRERELEASE="true"
;;
*)
CHANNEL="stable"
DRAFT="false"
PRERELEASE="false"
;;
esac
echo "version=$VERSION" >> $GITHUB_OUTPUT
echo "channel=$CHANNEL" >> $GITHUB_OUTPUT
echo "draft=$DRAFT" >> $GITHUB_OUTPUT
echo "prerelease=$PRERELEASE" >> $GITHUB_OUTPUT
- name: plugin.info einlesen
id: info
run: |
cd repo
while IFS='=' read -r key value; do
echo "$key=$value" >> $GITHUB_OUTPUT
done < plugin.info
- name: Changelog einlesen
id: changelog
run: |
cd repo
# Aktueller Block = alles vor dem ersten ---
CURRENT=$(awk '/^---/{exit} {print}' changelog.txt)
# Vollständige Historie = alles nach dem ersten ---
HISTORY=$(awk 'found{print} /^---/{found=1}' changelog.txt)
# Gitea Release Body zusammenbauen
VERSION="${{ steps.releaseinfo.outputs.version }}"
FULL=$(printf "## %s\n%s\n\n%s" "$VERSION" "$CURRENT" "$HISTORY")
echo "DEBUG | Aktueller Changelog:"
echo "$CURRENT"
# Für GITHUB_OUTPUT: Multiline via EOF-Marker
echo "current<<EOF" >> $GITHUB_OUTPUT
echo "$CURRENT" >> $GITHUB_OUTPUT
echo "EOF" >> $GITHUB_OUTPUT
echo "full<<EOF" >> $GITHUB_OUTPUT
echo "$FULL" >> $GITHUB_OUTPUT
echo "EOF" >> $GITHUB_OUTPUT
- name: metadata.txt erzeugen
run: |
cd repo
# ------------------------ GEÄNDERT ------------------------
# Temporär die Vorlage aus dem hidden/templates Branch holen
git fetch origin hidden/templates
git checkout origin/hidden/templates -- metadata.template
TEMPLATE="metadata.template"
# -----------------------------------------------------------
# TEMPLATE="templates/metadata.template"
OUT="metadata.txt"
CONTENT=$(cat "$TEMPLATE")
CONTENT="${CONTENT//\{\{NAME\}\}/${{ steps.info.outputs.name }}}"
CONTENT="${CONTENT//\{\{QGIS_MIN\}\}/${{ steps.info.outputs.qgisMinimumVersion }}}"
CONTENT="${CONTENT//\{\{QGIS_MAX\}\}/${{ steps.info.outputs.qgisMaximumVersion }}}"
CONTENT="${CONTENT//\{\{DESCRIPTION\}\}/${{ steps.info.outputs.description }}}"
CONTENT="${CONTENT//\{\{VERSION\}\}/${{ steps.releaseinfo.outputs.version }}}"
CONTENT="${CONTENT//\{\{AUTHOR\}\}/${{ steps.info.outputs.author }}}"
CONTENT="${CONTENT//\{\{EMAIL\}\}/${{ steps.info.outputs.email }}}"
CONTENT="${CONTENT//\{\{HOMEPAGE\}\}/${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}}"
CONTENT="${CONTENT//\{\{TRACKER\}\}/${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}}"
CONTENT="${CONTENT//\{\{REPOSITORY\}\}/${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}}"
CONTENT="${CONTENT//\{\{EXPERIMENTAL\}\}/${{ steps.info.outputs.experimental }}}"
CONTENT="${CONTENT//\{\{DEPRECATED\}\}/${{ steps.info.outputs.deprecated }}}"
CONTENT="${CONTENT//\{\{QT6\}\}/${{ steps.info.outputs.supportsQt6 }}}"
printf "%s\n" "$CONTENT" > "$OUT"
rm $TEMPLATE
- name: ZIP-Datei erstellen
id: zip
run: |
cd repo
ZIP_FOLDER="${{ steps.info.outputs.zip_folder }}"
ZIP_FILE="${ZIP_FOLDER}.zip"
VERSION="${{ steps.releaseinfo.outputs.version }}"
REPO_NAME="${GITHUB_REPOSITORY##*/}"
#ZIP_NAME="${REPO_NAME}-${VERSION}.zip"
mkdir -p dist/${ZIP_FOLDER}
rsync -a \
--exclude='.git' \
--exclude='.gitea' \
--exclude='.plugin' \
--exclude='dist' \
./ dist/${ZIP_FOLDER}/
cd dist
zip -r "${ZIP_FILE}" "${ZIP_FOLDER}/" \
-x "*.pyc" -x "*/__pycache__/*"
cd ..
echo "zip_file=${ZIP_FILE}" >> $GITHUB_OUTPUT
- name: Gitea-Release erstellen
id: create_release
run: |
TAG="${{ github.ref_name }}"
VERSION="${{ steps.releaseinfo.outputs.version }}"
CHANNEL="${{ steps.releaseinfo.outputs.channel }}"
API_URL="https://${{ vars.RELEASE_URL }}/api/v1/repos/${GITHUB_REPOSITORY}/releases"
JSON=$(jq -n \
--arg tag "$TAG" \
--arg name "Version $VERSION" \
--arg body "${{ steps.changelog.outputs.current }}" \
--argjson draft "${{ steps.releaseinfo.outputs.draft }}" \
--argjson prerelease "${{ steps.releaseinfo.outputs.prerelease }}" \
'{tag_name: $tag, name: $name, body: $body, draft: $draft, prerelease: $prerelease}')
API_RESPONSE=$(curl -s -X POST "$API_URL" \
-H "accept: application/json" \
-H "Authorization: token ${{ secrets.RELEASE_TOKEN }}" \
-H "Content-Type: application/json" \
-d "$JSON")
RELEASE_ID=$(echo "$API_RESPONSE" | jq -r '.id')
if [ "$RELEASE_ID" = "null" ] || [ -z "$RELEASE_ID" ]; then
echo "Fehler beim Erstellen des Releases!"
echo "$API_RESPONSE"
exit 1
fi
echo "release_id=$RELEASE_ID" >> $GITHUB_OUTPUT
- name: ZIP-Datei hochladen
run: |
RELEASE_ID="${{ steps.create_release.outputs.release_id }}"
ZIP_FILE="${{ steps.zip.outputs.zip_file }}"
API_URL="https://${{ vars.RELEASE_URL }}/api/v1/repos/${GITHUB_REPOSITORY}/releases/${RELEASE_ID}/assets?name=${ZIP_FILE}"
curl -s -X POST "$API_URL" \
-H "Authorization: token ${{ secrets.RELEASE_TOKEN }}" \
-H "Content-Type: application/zip" \
--data-binary "@repo/dist/${ZIP_FILE}" \
-o upload_response.json
# Optional: Fehlerprüfung
if jq -e '.id' upload_response.json >/dev/null 2>&1; then
echo "ZIP erfolgreich hochgeladen."
else
echo "Fehler beim Hochladen der ZIP!"
exit 1
fi
- name: Payload erzeugen
run: |
cd repo
VERSION="${{ steps.releaseinfo.outputs.version }}"
CHANNEL="${{ steps.releaseinfo.outputs.channel }}"
ZIP_FILE="${{ steps.zip.outputs.zip_file }}"
DOWNLOAD_URL="https://${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}/releases/download/${{ github.ref_name }}/${ZIP_FILE}"
jq -n \
--arg name "${{ steps.info.outputs.name }}" \
--arg version "$VERSION" \
--arg channel "$CHANNEL" \
--arg description "${{ steps.info.outputs.description }}" \
--arg author "${{ steps.info.outputs.author }}" \
--arg email "${{ steps.info.outputs.email }}" \
--arg qgis_min "${{ steps.info.outputs.qgisMinimumVersion }}" \
--arg qgis_max "${{ steps.info.outputs.qgisMaximumVersion }}" \
--arg homepage "${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}" \
--arg tracker "${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}" \
--arg repository "${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}" \
--arg experimental "${{ steps.info.outputs.experimental }}" \
--arg deprecated "${{ steps.info.outputs.deprecated }}" \
--arg qt6 "${{ steps.info.outputs.supportsQt6 }}" \
--arg id "${{ steps.info.outputs.zip_folder }}" \
--arg url "$DOWNLOAD_URL" \
--arg changelog "${{ steps.changelog.outputs.current }}" \
'{
name: $name,
version: $version,
channel: $channel,
description: $description,
author: $author,
email: $email,
qgis_min: $qgis_min,
qgis_max: $qgis_max,
homepage: $homepage,
tracker: $tracker,
repository: $repository,
experimental: $experimental,
deprecated: $deprecated,
qt6: $qt6,
id: $id,
url: $url,
changelog: $changelog
}' > payload.json
- name: Repository aktualisieren
run: |
OWNER="AG_QGIS"
WORKFLOW="update.yml"
PAYLOAD_B64=$(base64 -w0 repo/payload.json)
FULL_NAME="${{ steps.info.outputs.name }}"
NAME=$(echo "$FULL_NAME" | awk -F'|' '{gsub(/^ +| +$/,"",$2); print $2}')
TAG="${{ steps.releaseinfo.outputs.version }}"
JSON="{\"ref\":\"hidden/workflows\",\"inputs\":{\"payload\":\"$PAYLOAD_B64\",\"name\":\"$NAME\",\"tag\":\"$TAG\"}}"
#JSON="{\"ref\":\"hidden/workflows\",\"inputs\":{\"payload\":\"$PAYLOAD_B64\"}}"
echo "DEBUG | Sende JSON:"
echo "$JSON"
curl -X POST \
-H "Authorization: token ${{ secrets.RELEASE_TOKEN }}" \
-H "Content-Type: application/json" \
-d "$JSON" \
"https://${{ vars.RELEASE_URL }}/api/v1/repos/${OWNER}/Repository/actions/workflows/${WORKFLOW}/dispatches"

View File

@@ -15,7 +15,7 @@ from .ly_metadata_wrapper import (
is_layer_editable,
)
from .ly_style_wrapper import apply_style
from .dialog_wrapper import ask_yes_no
from .dialog_wrapper import ask_yes_no, ask_overwrite_append_cancel_custom
from .message_wrapper import (
_get_message_bar,

View File

@@ -2,8 +2,10 @@
sn_basis/functions/dialog_wrapper.py Benutzer-Dialoge (Qt5/6/Mock-kompatibel)
"""
from typing import Any
from typing import Literal, Optional
from sn_basis.functions.qt_wrapper import (
QMessageBox, YES, NO, QT_VERSION
QMessageBox, YES, NO, CANCEL, QT_VERSION, exec_dialog, ICON_QUESTION,
QProgressDialog, QCoreApplication, Qt,
)
def ask_yes_no(
@@ -35,3 +37,146 @@ def ask_yes_no(
except Exception as e:
print(f"⚠️ ask_yes_no Fehler: {e}")
return default
OverwriteDecision = Optional[Literal["overwrite", "append", "cancel"]]
def ask_overwrite_append_cancel_custom(
parent,
title: str,
message: str,
) -> Literal["overwrite", "append", "cancel"]:
"""Zeigt Dialog mit benutzerdefinierten Buttons: Überschreiben/Anhängen/Abbrechen.
Parameters
----------
parent :
Eltern-Widget oder None.
title : str
Dialog-Titel.
message : str
Hauptmeldung mit Erklärung.
Returns
-------
Literal["overwrite", "append", "cancel"]
Genaue Entscheidung des Nutzers.
"""
msg = QMessageBox(parent)
msg.setIcon(ICON_QUESTION)
msg.setWindowTitle(title)
msg.setText(message)
# Eigene Buttons mit exakten Texten
overwrite_btn = msg.addButton("Überschreiben", QMessageBox.ButtonRole.AcceptRole)
append_btn = msg.addButton("Anhängen", QMessageBox.ButtonRole.ActionRole)
cancel_btn = msg.addButton("Abbrechen", QMessageBox.ButtonRole.RejectRole)
exec_dialog(msg)
clicked = msg.clickedButton()
if clicked == overwrite_btn:
return "overwrite"
elif clicked == append_btn:
return "append"
else: # cancel_btn
return "cancel"
class ProgressDialog:
def __init__(self, total: int, title: str = "Fortschritt", label: str = "Verarbeite..."):
self.total = max(total, 1)
self._canceled = False
if QT_VERSION == 0:
self.value = 0
self.label = label
self.title = title
return
self._dlg = QProgressDialog(label, "Abbrechen", 0, self.total)
self._dlg.setWindowTitle(title)
# Qt5 vs Qt6: WindowModality-Enum unterschiedlich verfügbar
modality = None
if hasattr(Qt, "WindowModality"):
try:
modality = Qt.WindowModality.WindowModal
except Exception:
modality = None
if modality is None and hasattr(Qt, "WindowModal"):
modality = Qt.WindowModal
if modality is not None:
try:
self._dlg.setWindowModality(modality)
except Exception:
pass
self._dlg.setMinimumDuration(0)
self._dlg.setAutoClose(False)
self._dlg.setAutoReset(False)
self._dlg.setValue(0)
def on_cancel():
if self._dlg and self._dlg.value() >= self.total:
# OK-Button am Ende
self._dlg.close()
return
self._canceled = True
self._dlg.close()
try:
self._dlg.canceled.connect(on_cancel)
except Exception:
pass
def set_total(self, total: int) -> None:
self.total = max(total, 1)
if QT_VERSION == 0:
return
if self._dlg is not None:
self._dlg.setMaximum(self.total)
def set_value(self, value: int) -> None:
if QT_VERSION == 0:
self.value = value
return
if self._dlg is not None:
self._dlg.setValue(min(value, self.total))
if value >= self.total:
self._dlg.setLabelText("Fertig. Klicken Sie auf OK, um das Fenster zu schließen.")
self._dlg.setCancelButtonText("OK")
QCoreApplication.processEvents()
def set_label(self, text: str) -> None:
if QT_VERSION == 0:
self.label = text
return
if self._dlg is not None:
self._dlg.setLabelText(text)
QCoreApplication.processEvents()
def is_canceled(self) -> bool:
if QT_VERSION == 0:
return self._canceled
if self._dlg is not None:
return self._canceled or self._dlg.wasCanceled()
return self._canceled
def close(self) -> None:
if QT_VERSION == 0:
return
if self._dlg is not None:
self._dlg.close()
def create_progress_dialog(total: int, title: str = "Fortschritt", label: str = "Verarbeite...") -> ProgressDialog:
return ProgressDialog(total, title, label)

View File

@@ -1,23 +1,44 @@
# sn_basis/functions/ly_style_wrapper.py
from sn_basis.functions.ly_existence_wrapper import layer_exists
from sn_basis.functions.sys_wrapper import (
get_plugin_root,
join_path,
file_exists,
)
from sn_basis.functions.sys_wrapper import get_plugin_root, join_path
from sn_basis.modules.stilpruefer import Stilpruefer
from typing import Optional
def apply_style(layer, style_name: str) -> bool:
"""
Wendet einen Layerstil an, sofern er gültig ist.
- Validierung erfolgt ausschließlich über Stilpruefer
- Keine eigenen Dateisystem- oder Endungsprüfungen
- Keine Seiteneffekte bei ungültigem Stil
"""
print(">>> apply_style() START")
if not layer_exists(layer):
return False
style_path = join_path(get_plugin_root(), "styles", style_name)
if not file_exists(style_path):
# Stilpfad zusammensetzen
style_path = join_path(get_plugin_root(), "sn_verfahrensgebiet","styles", style_name)
# Stil prüfen
pruefer = Stilpruefer()
ergebnis = pruefer.pruefe(style_path)
print(">>> Stilprüfung:", ergebnis)
print(
f"[Stilprüfung] ok={ergebnis.ok} | "
f"aktion={ergebnis.aktion} | "
f"meldung={ergebnis.meldung}"
)
if not ergebnis.ok:
return False
# Stil anwenden
try:
ok, _ = layer.loadNamedStyle(style_path)
ok, _ = layer.loadNamedStyle(str(ergebnis.kontext))
if ok:
getattr(layer, "triggerRepaint", lambda: None)()
return True

View File

@@ -57,6 +57,22 @@ def get_home_dir() -> Path:
return Path.home()
def is_absolute_path(path: _PathLike) -> bool:
"""Prüft, ob ein Pfad absolut ist."""
try:
return Path(path).is_absolute()
except Exception:
return False
def basename(path: _PathLike) -> str:
"""Gibt den finalen Namen des Pfades zurück (Dateiname oder Ordner)."""
try:
return Path(path).name
except Exception:
return ""
# ---------------------------------------------------------
# Dateisystem-Eigenschaften
# ---------------------------------------------------------
@@ -75,3 +91,11 @@ def is_case_sensitive_fs() -> bool:
# Linux praktisch immer case-sensitiv
return True
def path_suffix(path: _PathLike) -> str:
"""Gibt die Dateiendung eines Pfades zurück (inklusive Punkt)."""
try:
return Path(path).suffix
except Exception:
return ""

View File

@@ -20,6 +20,12 @@ QgsNetworkAccessManager: Type[Any]
Qgis: Type[Any]
QgsMapLayerProxyModel: Type[Any]
QgsVectorFileWriter: Type[Any] # neu: Schreib-API
QgsFeature: Type[Any]
QgsField: Type[Any]
QgsGeometry: Type[Any]
QgsFeatureRequest: Type[Any]
QgsCoordinateTransform: Type[Any]
QgsCoordinateReferenceSystem: Type[Any]
QGIS_AVAILABLE = False
@@ -36,6 +42,12 @@ try:
Qgis as _Qgis,
QgsMapLayerProxyModel as _QgsMaplLayerProxyModel,
QgsVectorFileWriter as _QgsVectorFileWriter,
QgsFeature as _QgsFeature,
QgsField as _QgsField,
QgsGeometry as _QgsGeometry,
QgsFeatureRequest as _QgsFeatureRequest,
QgsCoordinateTransform as _QgsCoordinateTransform,
QgsCoordinateReferenceSystem as _QgsCoordinateReferenceSystem,
)
QgsProject = _QgsProject
@@ -45,6 +57,12 @@ try:
Qgis = _Qgis
QgsMapLayerProxyModel = _QgsMaplLayerProxyModel
QgsVectorFileWriter = _QgsVectorFileWriter
QgsFeature = _QgsFeature
QgsField = _QgsField
QgsGeometry = _QgsGeometry
QgsFeatureRequest = _QgsFeatureRequest
QgsCoordinateTransform = _QgsCoordinateTransform
QgsCoordinateReferenceSystem = _QgsCoordinateReferenceSystem
QGIS_AVAILABLE = True
@@ -116,6 +134,30 @@ except Exception:
QgsRasterLayer = _MockQgsRasterLayer
class _MockQgsFeatureRequest:
def __init__(self):
self._filter_rect = None
def setFilterRect(self, rect):
self._filter_rect = rect
return self
QgsFeatureRequest = _MockQgsFeatureRequest
class _MockQgsCoordinateTransform:
def __init__(self, *args, **kwargs):
pass
def transformBoundingBox(self, rect):
return rect
class _MockQgsCoordinateReferenceSystem:
def __init__(self, *args, **kwargs):
pass
QgsCoordinateTransform = _MockQgsCoordinateTransform
QgsCoordinateReferenceSystem = _MockQgsCoordinateReferenceSystem
QgsNetworkAccessManager = _MockQgsNetworkAccessManager
class _MockQgis:

View File

@@ -10,12 +10,17 @@ YES: Optional[Any] = None
NO: Optional[Any] = None
CANCEL: Optional[Any] = None
ICON_QUESTION: Optional[Any] = None
QVariant: Type[Any] = object
# Qt-Klassen (werden dynamisch gesetzt)
QDockWidget: Type[Any] = object
QMessageBox: Type[Any] = object
QFileDialog: Type[Any] = object
QProgressDialog: Type[Any] = object
QEventLoop: Type[Any] = object
QTimer: Type[Any] = object
QUrl: Type[Any] = object
QNetworkRequest: Type[Any] = object
QNetworkReply: Type[Any] = object
@@ -36,6 +41,8 @@ QToolButton: Type[Any] = object
QSizePolicy: Type[Any] = object
Qt: Type[Any] = object
QComboBox: Type[Any] = object
QHBoxLayout: Type[Any] = object
def exec_dialog(dialog: Any) -> Any:
"""Führt Dialog modal aus (Qt6: exec(), Qt5: exec_(), Mock: YES)"""
@@ -61,6 +68,7 @@ try:
from qgis.PyQt.QtWidgets import (
QMessageBox as _QMessageBox,
QFileDialog as _QFileDialog,
QProgressDialog as _QProgressDialog,
QWidget as _QWidget,
QGridLayout as _QGridLayout,
QLabel as _QLabel,
@@ -77,12 +85,15 @@ try:
QToolButton as _QToolButton,
QSizePolicy as _QSizePolicy,
QComboBox as _QComboBox,
QHBoxLayout as _QHBoxLayout,
)
from qgis.PyQt.QtCore import (
QEventLoop as _QEventLoop,
QTimer as _QTimer,
QUrl as _QUrl,
QCoreApplication as _QCoreApplication,
Qt as _Qt,
QVariant as _QVariant
)
from qgis.PyQt.QtNetwork import (
QNetworkRequest as _QNetworkRequest,
@@ -93,7 +104,10 @@ try:
QT_VERSION = 6
QMessageBox = _QMessageBox
QFileDialog = _QFileDialog
QProgressDialog = _QProgressDialog
QProgressDialog = _QProgressDialog
QEventLoop = _QEventLoop
QTimer = _QTimer
QUrl = _QUrl
QNetworkRequest = _QNetworkRequest
QNetworkReply = _QNetworkReply
@@ -115,12 +129,16 @@ try:
QToolButton = _QToolButton
QSizePolicy = _QSizePolicy
QComboBox = _QComboBox
QVariant = _QVariant
QHBoxLayout= _QHBoxLayout
# ✅ QT6 ENUMS
YES = QMessageBox.StandardButton.Yes
NO = QMessageBox.StandardButton.No
CANCEL = QMessageBox.StandardButton.Cancel
ICON_QUESTION = QMessageBox.Icon.Question
AcceptRole = QMessageBox.ButtonRole.AcceptRole
ActionRole = QMessageBox.ButtonRole.ActionRole
RejectRole = QMessageBox.ButtonRole.RejectRole
# Qt6 Enum-Aliase
ToolButtonTextBesideIcon = Qt.ToolButtonStyle.ToolButtonTextBesideIcon
@@ -161,12 +179,15 @@ except (ImportError, AttributeError):
QToolButton as _QToolButton,
QSizePolicy as _QSizePolicy,
QComboBox as _QComboBox,
QHBoxLayout as _QHBoxLayout,
)
from PyQt5.QtCore import (
QEventLoop as _QEventLoop,
QTimer as _QTimer,
QUrl as _QUrl,
QCoreApplication as _QCoreApplication,
Qt as _Qt,
QVariant as _QVariant
)
from PyQt5.QtNetwork import (
QNetworkRequest as _QNetworkRequest,
@@ -178,6 +199,7 @@ except (ImportError, AttributeError):
QMessageBox = _QMessageBox
QFileDialog = _QFileDialog
QEventLoop = _QEventLoop
QTimer = _QTimer
QUrl = _QUrl
QNetworkRequest = _QNetworkRequest
QNetworkReply = _QNetworkReply
@@ -199,12 +221,18 @@ except (ImportError, AttributeError):
QToolButton = _QToolButton
QSizePolicy = _QSizePolicy
QComboBox = _QComboBox
QVariant = _QVariant
QHBoxLayout = _QHBoxLayout
# ✅ PYQT5 ENUMS
YES = QMessageBox.Yes
NO = QMessageBox.No
CANCEL = QMessageBox.Cancel
ICON_QUESTION = QMessageBox.Question
AcceptRole = QMessageBox.AcceptRole
ActionRole = QMessageBox.ActionRole
RejectRole = QMessageBox.RejectRole
# PyQt5 Enum-Aliase
ToolButtonTextBesideIcon = Qt.ToolButtonTextBesideIcon
@@ -244,6 +272,10 @@ except (ImportError, AttributeError):
No = NO
Cancel = CANCEL
Question = ICON_QUESTION
AcceptRole = 0
ActionRole = 3
RejectRole = 1
@classmethod
def question(cls, parent, title, message, buttons, default_button):
@@ -268,6 +300,17 @@ except (ImportError, AttributeError):
QEventLoop = _MockQEventLoop
class _MockQTimer:
def __init__(self, *args, **kwargs):
self.timeout = type('Signal', (), {
'connect': lambda s, cb: None,
})()
def setSingleShot(self, v: bool) -> None: pass
def start(self, ms: int) -> None: pass
def stop(self) -> None: pass
QTimer = _MockQTimer
class _MockQUrl(str):
def isValid(self) -> bool: return True
@@ -423,9 +466,71 @@ except (ImportError, AttributeError):
QComboBox = _MockComboBox
# ---------------------------
# Mock für QVariant
# ---------------------------
class _MockQVariant:
"""
Minimaler Ersatz für QtCore.QVariant.
Ziel:
- Werte transparent durchreichen
- Typ-Konstanten bereitstellen
- Keine Qt-Abhängigkeiten
"""
# Typ-Konstanten (symbolisch, Werte egal)
Invalid = 0
Int = 1
Double = 2
String = 3
Bool = 4
Date = 5
DateTime = 6
def __init__(self, value: Any = None):
self._value = value
def value(self) -> Any:
return self._value
def __repr__(self) -> str:
return f"QVariant({self._value!r})"
# Optional: automatische Entpackung
def __int__(self):
return int(self._value)
def __float__(self):
return float(self._value)
def __str__(self):
return str(self._value)
QVariant = _MockQVariant
class _MockQHBoxLayout:
def __init__(self, *args, **kwargs):
self._widgets = []
def addWidget(self, widget):
self._widgets.append(widget)
def addLayout(self, layout):
pass
def addStretch(self, *args, **kwargs):
pass
def setSpacing(self, *args, **kwargs):
pass
def setContentsMargins(self, *args, **kwargs):
pass
QHBoxLayout = _MockQHBoxLayout
def exec_dialog(dialog: Any) -> Any:
return YES
# --------------------------- TEST ---------------------------
if __name__ == "__main__":
debug_qt_status()

View File

@@ -6,6 +6,7 @@ from pathlib import Path
from typing import Union
import sys
from sn_basis.functions.os_wrapper import is_absolute_path, basename
_PathLike = Union[str, Path]

View File

@@ -1,13 +1,15 @@
[general]
name=LNO Sachsen | Basisfunktionen
qgisMinimumVersion=3.0
name=LNO Sachsen | Basisfunktionen
description=Plugin mit Basisfunktionen
version=25.11.4
author=Michael Otto
email=michael.otto@landkreis-mittelsachsen.de
about=Plugin mit Basisfunktionen
category=Plugins
homepage=https://entwicklung.vln-sn.de/AG_QGIS/Plugin_SN_Basis
repository=https://entwicklung.vln-sn.de/AG_QGIS/Repository
supportsQt6=true
experimental=true
homepage=https://entwicklung.flurneuordnung-sachsen.de/AG_QGIS/Plugin_SN_Basis
repository=https://entwicklung.flurneuordnung-sachsen/AG_QGIS/Repository
qgisMinimumVersion=3.40
qgisMaximumVersion=4.99
version=26.3.11
deprecated=False
experimental=true
supportsQt6=Yes
zip_folder=sn_basis

View File

@@ -2,14 +2,14 @@
DataGrabber module
==================
UIfreier Orchestrator für die Prüfung und Klassifikation von Datenquellen.
UI-freier Orchestrator für die Prüfung und Klassifikation von Datenquellen.
Der DataGrabber:
- klassifiziert die übergebene Quelle (Datei, Dienst, Datenbank, Excel),
- ruft passende Prüfer (Dateipruefer, Linkpruefer, Layerpruefer, Stilpruefer) auf,
- sammelt alle rohen ``pruef_ergebnis``Objekte,
- sammelt alle rohen ``pruef_ergebnis``-Objekte,
- aggregiert diese zu einem zusammenfassenden Ergebnis,
- **löst selbst keinerlei UIInteraktion aus**.
- **löst selbst keinerlei UI-Interaktion aus**.
Alle Nutzerinteraktionen (MessageBar, QMessageBox, Logging) erfolgen
ausschließlich über den ``Pruefmanager`` im aufrufenden Kontext (UI / Pipeline).
@@ -17,8 +17,11 @@ ausschließlich über den ``Pruefmanager`` im aufrufenden Kontext (UI / Pipeline
from __future__ import annotations
import re
from typing import Any, Dict, List, Mapping, Optional, Tuple, Literal
from sn_basis.functions.os_wrapper import basename, path_suffix
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
from sn_basis.modules.Pruefmanager import Pruefmanager
@@ -27,6 +30,7 @@ from sn_basis.modules.linkpruefer import Linkpruefer
from sn_basis.modules.layerpruefer import Layerpruefer
from sn_basis.modules.stilpruefer import Stilpruefer
from sn_basis.modules.excel_importer import ExcelImporter
from sn_plan41.modules.listenauswerter import Listenauswerter
SourceType = Literal["service", "database", "excel", "unknown"]
@@ -38,9 +42,6 @@ class DataGrabber:
"""
Analysiert und prüft Datenquellen für den Fachdatenabruf.
Der DataGrabber ist **UIfrei**. Er erzeugt ausschließlich rohe
``pruef_ergebnis``Objekte und überlässt deren Verarbeitung
vollständig dem aufrufenden Code.
"""
def __init__(
@@ -55,9 +56,9 @@ class DataGrabber:
) -> None:
self.pruefmanager = pruefmanager
self._datei_pruefer_cls = datei_pruefer_cls
self.link_pruefer = link_pruefer
self.layer_pruefer = layer_pruefer
self.stil_pruefer = stil_pruefer
self.link_pruefer = link_pruefer or Linkpruefer()
self.layer_pruefer = layer_pruefer or Layerpruefer()
self.stil_pruefer = stil_pruefer or Stilpruefer()
self._excel_importer_cls = excel_importer_cls
self._source: Optional[str] = None
@@ -69,23 +70,61 @@ class DataGrabber:
"""Setzt die aktuell zu untersuchende Rohquelle."""
self._source = source
def analyze_source_type(self, source: str) -> SourceType:
"""
Klassifiziert die Quelle.
SourceType = str # "excel" | "datenbank" | "dienst" | "unbekannt"
Aktuell Platzhalter liefert ``"unknown"``.
def analyze_source_type(self, quelle: str) -> Tuple[SourceType, pruef_ergebnis]:
"""
return "unknown"
Klassifiziert die Quelle und liefert das zugehörige pruef_ergebnis.
Reihenfolge:
1. Dateipruefer (Datei + Dateityp)
2. Linkpruefer (Dienst)
"""
# --------------------------------------------------
# 1. Datei prüfen (inkl. Typ-Erkennung)
# --------------------------------------------------
dateipruefer = Dateipruefer(pfad=quelle)
datei_ergebnis = dateipruefer.pruefe()
if datei_ergebnis.ok:
suffix = path_suffix(datei_ergebnis.kontext).lower()
print(f"[DataGrabber] Debug: analyze_source_type source={quelle} -> suffix={suffix}")
if suffix == ".xlsx":
return "excel", datei_ergebnis
if suffix in (".gpkg", ".sqlite"):
return "datenbank", datei_ergebnis
return "unbekannter_dateityp", datei_ergebnis
# --------------------------------------------------
# 2. Keine Datei → Link prüfen
# --------------------------------------------------
linkpruefer = Linkpruefer()
link_ergebnis = linkpruefer.pruefe(quelle)
if link_ergebnis.ok:
return "dienst", link_ergebnis
# --------------------------------------------------
# 3. Weder Datei noch Dienst
# --------------------------------------------------
return "unbekannte_quelle", link_ergebnis
def run(self, source: str) -> Tuple[SourceDict, pruef_ergebnis]:
"""
Führt die vollständige Quellprüfung aus.
Diese Methode ist **UIfrei**. Sie gibt rohe Ergebnisse zurück,
Diese Methode ist **UIfrei**. Sie gibt rohe Ergebnisse zurück,
die vom Aufrufer über den ``Pruefmanager`` verarbeitet werden.
"""
self.set_source(source)
source_type = self.analyze_source_type(source)
source_type, source_result = self.analyze_source_type(source)
print(f"[DataGrabber] Debug: run source={source} -> source_type={source_type}")
source_dict: SourceDict = {}
partial_results: List[pruef_ergebnis] = []
@@ -97,14 +136,7 @@ class DataGrabber:
elif source_type == "service":
source_dict, partial_results = self._process_service_source(source)
else:
partial_results.append(
pruef_ergebnis(
ok=False,
meldung="Quelle konnte nicht klassifiziert werden",
aktion="kein_dateipfad",
kontext={"source": source},
)
)
partial_results.append(source_result)
summary = self._aggregate_results(source, source_dict, partial_results)
return source_dict, summary
@@ -115,9 +147,150 @@ class DataGrabber:
def _process_excel_source(
self, filepath: str
) -> Tuple[SourceDict, List[pruef_ergebnis]]:
source_dict: SourceDict = {}
source_dict: SourceDict = {"rows": []}
results: List[pruef_ergebnis] = []
return source_dict, results
rows = ExcelImporter(filepath, self.pruefmanager).import_xlsx()
print(f"[DataGrabber] Debug: Excel-Linkliste geladen: {filepath}")
print(f"[DataGrabber] Debug: raw rows count: {len(rows)}")
if rows:
first = rows[:min(5, len(rows))]
print(f"[DataGrabber] Debug: first rows: {first}")
if not rows:
return source_dict, results
required_keys = {"ident", "gruppe", "kartenebene", "inhalt", "link", "provider", "stildatei"}
def extract_url(raw_link: str, provider: str) -> str:
if not raw_link:
return ""
if not isinstance(raw_link, str):
return str(raw_link)
if provider == "wfs":
url_match = re.search(r"url\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE)
type_match = re.search(r"typename\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE)
if url_match:
url = url_match.group(1).strip()
if type_match:
typename = type_match.group(1).strip()
separator = "&" if "?" in url else "?"
return f"url={url}{separator}service=WFS&request=GetFeature&typename={typename}"
return f"url={url}"
if provider == "wms":
# falls WMS-URL als url='...' vorliegt
match = re.search(r"url\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE)
if match:
return match.group(1).strip()
if provider == "rest":
# REST/ArcGIS-Server: direkt nutzen
match = re.search(r"url\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE)
if match:
return match.group(1).strip()
# allgemeines Rückfallverhalten
match = re.search(r"url\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE)
if match:
return match.group(1).strip()
return raw_link.strip()
for row_index, raw_row in enumerate(rows, start=2):
if not isinstance(raw_row, Mapping):
pe = pruef_ergebnis(
ok=False,
meldung="Linklisten-Zeile ist nicht als Dictionary formatiert.",
aktion="ungueltige_zeile",
kontext={"zeile": row_index, "wert": raw_row},
)
results.append(self.pruefmanager.verarbeite(pe))
continue
normalized = {str(k).strip().lower(): v for k, v in raw_row.items() if k is not None}
if not required_keys.issubset(normalized.keys()):
missing = required_keys.difference(normalized.keys())
pe = pruef_ergebnis(
ok=False,
meldung=f"Linkliste fehlt erforderliche Spalten: {', '.join(sorted(missing))}",
aktion="spaltenfehlend",
kontext={"zeile": row_index, "fehlend": sorted(missing)},
)
results.append(self.pruefmanager.verarbeite(pe))
continue
ident = normalized.get("ident")
link_raw = normalized.get("link") or ""
provider = str(normalized.get("provider") or "").strip().lower()
stildatei_raw = normalized.get("stildatei") or ""
stildatei = None
if stildatei_raw and str(stildatei_raw).strip():
style_result = self.stil_pruefer.pruefe(str(stildatei_raw).strip())
results.append(self.pruefmanager.verarbeite(style_result))
if style_result.ok:
# Style-Pfad in der Datenkette beibehalten (absolut, wenn vorhanden).
stildatei = str(style_result.kontext or stildatei_raw).strip()
else:
stildatei = None
else:
results.append(self.pruefmanager.verarbeite(pruef_ergebnis(ok=True, meldung="Kein Stil angegeben", aktion="stil_optional", kontext=None)))
stildatei = None
if not ident or not link_raw or not provider:
pe = pruef_ergebnis(
ok=False,
meldung="Linklisten-Zeile hat fehlende Pflichtfelder (ident/link/provider).",
aktion="pflichtfelder_fehlen",
kontext={"zeile": row_index, "daten": raw_row},
)
results.append(self.pruefmanager.verarbeite(pe))
continue
link_url = extract_url(link_raw, provider)
# Provider-abhängige Linkvalidierung
if provider in ("wfs", "wms", "rest"):
# Webdienste: wir akzeptieren die URL-Form und prüfen nicht per network_head.
link_result = pruef_ergebnis(ok=True, meldung="Service-Link angenommen", aktion="service_link", kontext=link_url)
elif provider in ("ogr", "gpkg", "shp", "geojson"):
# OGR/Pfad: mit Linkpruefer (pfad oder lokale Datei) prüfen
link_result = self.link_pruefer.pruefe(link_url)
else:
link_result = self.link_pruefer.pruefe(link_url)
results.append(self.pruefmanager.verarbeite(link_result))
# stildatei wurde bereits oben geprüft und ggf. auf Dateiname gesetzt oder auf None
if not link_result.ok:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Zeile {row_index}: fehlerhafter Link",
aktion="link_unvollstaendig",
kontext={"row": row_index, "ident": ident},
)
)
continue
result_row = {
"ident": ident,
"gruppe": normalized.get("gruppe"),
"Kartenebene": normalized.get("kartenebene"),
"Inhalt": normalized.get("inhalt"),
"Link": link_url,
"Provider": provider,
"stildatei": stildatei,
}
source_dict["rows"].append(result_row)
# Validierung über Listenauswerter
listenauswerter = Listenauswerter(self.pruefmanager, self.stil_pruefer or Stilpruefer())
validated, validation_results = listenauswerter.validate_rows(source_dict)
results.extend(validation_results)
return validated, results
# ------------------------------------------------------------------
# DatenbankQuellen
@@ -125,6 +298,7 @@ class DataGrabber:
def _process_database_source(
self, db_path: str
) -> Tuple[SourceDict, List[pruef_ergebnis]]:
print(f"[DataGrabber] Debug: _process_database_source called, db_path={db_path}")
source_dict: SourceDict = {}
results: List[pruef_ergebnis] = []
return source_dict, results
@@ -149,24 +323,29 @@ class DataGrabber:
partial_results: List[pruef_ergebnis],
) -> pruef_ergebnis:
"""
Aggregiert Einzelprüfungen zu einem Gesamt``pruef_ergebnis``.
Aggregiert Einzelprüfungen zu einem Gesamt-``pruef_ergebnis``.
**Keine UIInteraktion.**
**Keine UI-Interaktion.**
"""
if source_dict:
rows = source_dict.get("rows") if isinstance(source_dict, dict) else None
if rows:
return pruef_ergebnis(
ok=True,
meldung="Quelle erfolgreich geprüft",
aktion="ok",
kontext={
"source": source,
"valid_entries": sum(len(v) for v in source_dict.values()),
"valid_entries": len(rows),
},
)
# Wenn die Linkliste zwar gelesen wurde, aber keine gültigen Zeilen verfügbar sind, geben wir spezifischere Infos zurück.
return pruef_ergebnis(
ok=False,
meldung="Keine gültigen Einträge in der Quelle gefunden",
aktion="read_error",
kontext={"source": source},
meldung="Keine validen Einträge in der Linkliste gefunden",
aktion="keine_validen_eintraege",
kontext={
"source": source,
"eintraege_gesamt": len(source_dict.get("rows", [])),
},
)

View File

@@ -6,11 +6,11 @@ der Anforderungen 1-2.e (leerer Pfad, fehlende Datei, bestehende Datei).
"""
from pathlib import Path
from typing import Optional
from typing import Optional, Literal
from sn_basis.functions.sys_wrapper import join_path, file_exists
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis, PruefAktion
DateiTyp = Literal["excel","datenbank","unbekannt"]
class Dateipruefer:
"""
@@ -74,10 +74,25 @@ class Dateipruefer:
# ------------------------------------------------------------------
# Hilfsfunktionen
# ------------------------------------------------------------------
def erkenne_dateityp(self, pfad: Path) -> DateiTyp:
"""
Erkennt den Dateityp anhand der Endung.
"""
suffix = pfad.suffix.lower()
if suffix == ".xlsx":
return "excel"
if suffix in (".gpkg", ".sqlite"):
return "datenbank"
return "unbekannt"
def _pfad(self, relativer_pfad: str) -> Path:
"""Erzeugt OS-unabhängigen Pfad relativ zum Basisverzeichnis."""
return join_path(self.basis_pfad, relativer_pfad)
def _ist_leer(self) -> bool:
"""
Prüft robust, ob Eingabe als „leer" zu behandeln ist.
@@ -134,6 +149,31 @@ class Dateipruefer:
# 2. Pfad normalisieren
pfad = self._pfad(self.pfad.strip())
#Excel-dateien erkennen
dateityp = self.erkenne_dateityp(pfad)
if dateityp == "excel":
if not file_exists(pfad):
return pruef_ergebnis(
ok=False,
meldung=f"Excel-Datei '{self.pfad}' wurde nicht gefunden.",
aktion="datei_nicht_gefunden",
kontext=pfad,
)
return pruef_ergebnis(
ok=True,
meldung="Excel-Datei ist gültig.",
aktion="ok",
kontext=pfad,
)
if dateityp != "datenbank":
return pruef_ergebnis(
ok=False,
meldung=f"Der Pfad '{self.pfad}' ist kein unterstützter Dateityp.",
aktion="unbekannter_dateityp",
kontext=pfad,
)
# 🆕 2.c: Ungültiger GPKG-Pfad?
if not self.verfahrens_db_modus or not self._ist_gueltiger_gpkg_pfad(pfad):

View File

@@ -17,10 +17,11 @@ Designprinzipien
- Die Methode ist pdoc-kompatibel dokumentiert und bewusst einfach gehalten.
"""
from typing import Any, Dict, List, Mapping, Optional, Tuple
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple
from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse
import json
import time
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
from sn_basis.functions import qgiscore_wrapper as qgiscore
@@ -59,6 +60,7 @@ class Datenabruf:
verfahrensgebiet_layer: Any,
speicherort: str,
pruef_ergebnisse: Optional[List[Any]] = None,
progress: Optional[Any] = None,
) -> Tuple[Dict[str, Any], List[Any]]:
"""
Ruft für alle Zeilen in ``result_dict["rows"]`` die Fachdaten ab und
@@ -82,6 +84,10 @@ class Datenabruf:
# 1) Räumliche Filtergeometrie bestimmen (BBox oder None)
bbox_geom = self._determine_spatial_filter(raumfilter, verfahrensgebiet_layer)
filter_crs_authid = None
if isinstance(bbox_geom, dict):
raw_crs = bbox_geom.get("crs_authid")
filter_crs_authid = str(raw_crs) if raw_crs else None
# Globale Logs über alle Dienste hinweg
log_geladen: Dict[str, int] = {}
@@ -90,7 +96,20 @@ class Datenabruf:
log_ausserhalb: Dict[str, int] = {}
# 2) Über alle Zeilen iterieren
for row in rows:
total_rows = len(rows)
for idx, row in enumerate(rows, start=1):
if progress is not None:
progress.set_label(f"Datenabruf {idx}/{total_rows}")
if progress.is_canceled():
pe_cancel = pruef_ergebnis(
ok=False,
meldung="Datenabruf durch Benutzer abgebrochen",
aktion="abbruch",
kontext={"schritt": idx},
)
processed_results.append(self.pruefmanager.verarbeite(pe_cancel))
break
ident = row.get("ident")
link = row.get("Link")
provider = row.get("Provider")
@@ -115,7 +134,16 @@ class Datenabruf:
url = self._build_provider_url(link=link, provider=str(provider), bbox_geom=bbox_geom if use_bbox else None)
# 2b) Fachdaten abrufen
features, error_msg = self._fetch_features(url=url, provider=str(provider))
features, error_msg = self._fetch_features(
url=url,
provider=str(provider),
cancel_callback=(progress.is_canceled if progress is not None else None),
)
if progress is not None:
if hasattr(progress, "set_value"):
progress.set_value(idx)
# 2c) Logs und Aggregation
if error_msg:
@@ -207,7 +235,18 @@ class Datenabruf:
return None
if raumfilter == "Verfahrensgebiet":
return qgiscore.get_layer_extent(verfahrensgebiet_layer)
extent = qgiscore.get_layer_extent(verfahrensgebiet_layer)
if extent is None:
return None
crs_authid = None
try:
if hasattr(verfahrensgebiet_layer, "crs") and callable(getattr(verfahrensgebiet_layer, "crs")):
crs = verfahrensgebiet_layer.crs()
if crs is not None and hasattr(crs, "authid") and callable(getattr(crs, "authid")):
crs_authid = crs.authid()
except Exception:
crs_authid = None
return {"extent": extent, "crs_authid": crs_authid}
if raumfilter == "Pufferlayer":
buffer_layer = qgiscore.create_buffer_layer(
@@ -216,8 +255,18 @@ class Datenabruf:
layer_name="Verfahrensgebiet_Puffer_1km",
)
if buffer_layer is not None:
qgisui.add_layer_to_project(buffer_layer)
return qgiscore.get_layer_extent(buffer_layer)
extent = qgiscore.get_layer_extent(buffer_layer)
if extent is None:
return None
crs_authid = None
try:
if hasattr(buffer_layer, "crs") and callable(getattr(buffer_layer, "crs")):
crs = buffer_layer.crs()
if crs is not None and hasattr(crs, "authid") and callable(getattr(crs, "authid")):
crs_authid = crs.authid()
except Exception:
crs_authid = None
return {"extent": extent, "crs_authid": crs_authid}
return None
@@ -233,60 +282,130 @@ class Datenabruf:
Erwartet: provider ist gesetzt (z. B. "WFS", "REST", "OGR", "WMS").
"""
provider_norm = (provider or "").upper()
base_link = link or ""
base_link = (link or "").strip()
if base_link.lower().startswith("url="):
base_link = base_link[4:].strip()
# WMS: niemals BBOX anhängen
if provider_norm == "WFS" and base_link.count("?") > 1:
first, rest = base_link.split("?", 1)
base_link = f"{first}?{rest.replace('?', '&')}"
extent_obj = bbox_geom
crs_authid: Optional[str] = None
if isinstance(bbox_geom, dict):
extent_obj = bbox_geom.get("extent")
raw_crs = bbox_geom.get("crs_authid")
crs_authid = str(raw_crs) if raw_crs else None
# WMS: unverändert durchreichen
if provider_norm == "WMS":
return base_link
if bbox_geom is None:
return base_link
# Versuche bbox-String zu erzeugen (nutzt qgiscore.extent_to_bbox_string wenn vorhanden)
# Versuche bbox-String zu erzeugen (falls Raumfilter aktiv)
bbox_str: Optional[str] = None
try:
extent_to_bbox = getattr(__import__("sn_basis.functions.qgiscore_wrapper", fromlist=["qgiscore_wrapper"]), "extent_to_bbox_string", None)
if callable(extent_to_bbox):
bbox_str = extent_to_bbox(bbox_geom)
else:
# Fallback: einfache xmin/ymin/xmax/ymax-Extraktion (duck-typing)
if hasattr(bbox_geom, "xmin") and callable(getattr(bbox_geom, "xmin")):
bbox_str = f"{bbox_geom.xmin()},{bbox_geom.ymin()},{bbox_geom.xmax()},{bbox_geom.ymax()}"
elif isinstance(bbox_geom, (tuple, list)) and len(bbox_geom) == 4:
bbox_str = f"{bbox_geom[0]},{bbox_geom[1]},{bbox_geom[2]},{bbox_geom[3]}"
if extent_obj is not None:
try:
extent_to_bbox = getattr(__import__("sn_basis.functions.qgiscore_wrapper", fromlist=["qgiscore_wrapper"]), "extent_to_bbox_string", None)
if callable(extent_to_bbox):
bbox_str = extent_to_bbox(extent_obj)
else:
bbox_str = str(bbox_geom)
except Exception:
bbox_str = None
if not bbox_str:
return base_link
# Fallback: einfache xmin/ymin/xmax/ymax-Extraktion (duck-typing)
if hasattr(extent_obj, "xmin") and callable(getattr(extent_obj, "xmin")):
bbox_str = f"{extent_obj.xmin()},{extent_obj.ymin()},{extent_obj.xmax()},{extent_obj.ymax()}"
elif isinstance(extent_obj, (tuple, list)) and len(extent_obj) == 4:
bbox_str = f"{extent_obj[0]},{extent_obj[1]},{extent_obj[2]},{extent_obj[3]}"
else:
bbox_str = str(extent_obj)
except Exception:
bbox_str = None
parsed = urlparse(base_link)
query_params = dict(parse_qsl(parsed.query, keep_blank_values=True))
if provider_norm == "WFS":
query_params.setdefault("BBOX", bbox_str)
query_params.setdefault("service", "WFS")
query_params.setdefault("request", "GetFeature")
query_params.setdefault("outputFormat", "application/json")
if bbox_str:
query_params.setdefault("BBOX", bbox_str)
if crs_authid:
query_params.setdefault("SRSNAME", crs_authid)
new_query = urlencode(query_params, doseq=True)
rebuilt = parsed._replace(query=new_query)
return urlunparse(rebuilt)
if provider_norm in ("REST", "ARCGIS", "ARCGISFEATURESERVER", "ARCGIS_FEATURESERVER"):
query_params.setdefault("geometry", bbox_str)
query_params.setdefault("geometryType", "esriGeometryEnvelope")
query_params.setdefault("spatialRel", "esriSpatialRelIntersects")
# ArcGIS FeatureServer erwartet i.d.R. den /query-Endpunkt
rest_base = base_link.rstrip("/")
if not rest_base.lower().endswith("/query"):
rest_base = f"{rest_base}/query"
parsed_rest = urlparse(rest_base)
query_params = dict(parse_qsl(parsed_rest.query, keep_blank_values=True))
query_params.setdefault("where", "1=1")
query_params.setdefault("outFields", "*")
query_params.setdefault("returnGeometry", "true")
query_params.setdefault("f", query_params.get("f", "json"))
if bbox_str:
geometry_envelope = None
try:
if hasattr(extent_obj, "xmin") and callable(getattr(extent_obj, "xmin")):
geometry_envelope = {
"xmin": extent_obj.xmin(),
"ymin": extent_obj.ymin(),
"xmax": extent_obj.xmax(),
"ymax": extent_obj.ymax(),
}
elif isinstance(extent_obj, (tuple, list)) and len(extent_obj) == 4:
geometry_envelope = {
"xmin": extent_obj[0],
"ymin": extent_obj[1],
"xmax": extent_obj[2],
"ymax": extent_obj[3],
}
else:
parts = [p.strip() for p in str(bbox_str).split(",")]
if len(parts) == 4:
geometry_envelope = {
"xmin": float(parts[0]),
"ymin": float(parts[1]),
"xmax": float(parts[2]),
"ymax": float(parts[3]),
}
except Exception:
geometry_envelope = None
if geometry_envelope is not None:
query_params.setdefault("geometry", json.dumps(geometry_envelope))
else:
query_params.setdefault("geometry", bbox_str)
query_params.setdefault("geometryType", "esriGeometryEnvelope")
query_params.setdefault("spatialRel", "esriSpatialRelIntersects")
if crs_authid and ":" in crs_authid:
srid = crs_authid.split(":", 1)[1]
if srid.isdigit():
query_params.setdefault("inSR", srid)
query_params.setdefault("outSR", srid)
new_query = urlencode(query_params, doseq=True)
rebuilt = parsed._replace(query=new_query)
rebuilt = parsed_rest._replace(query=new_query)
return urlunparse(rebuilt)
# Default: generischer bbox-Parameter
query_params.setdefault("bbox", bbox_str)
# Default: generischer bbox-Parameter (nur wenn vorhanden)
if bbox_str:
query_params.setdefault("bbox", bbox_str)
new_query = urlencode(query_params, doseq=True)
rebuilt = parsed._replace(query=new_query)
return urlunparse(rebuilt)
def _fetch_features(self, url: str, provider: str) -> Tuple[List[Any], Optional[str]]:
def _fetch_features(
self,
url: str,
provider: str,
cancel_callback: Optional[Callable[[], bool]] = None,
) -> Tuple[List[Any], Optional[str]]:
"""
Führt den eigentlichen Abruf der Fachdaten durch.
@@ -336,34 +455,100 @@ class Datenabruf:
http_error: Optional[str] = None
# QGIS NetworkAccessManager bevorzugen
_FETCH_TIMEOUT_MS = 30_000 # 30 Sekunden
aborted_or_timed_out = False
attempted_qgis_fetch = False
if callable(cancel_callback) and cancel_callback():
return [], "Abbruch durch Benutzer"
if getattr(qgiscore, "QGIS_AVAILABLE", False) and getattr(qgiscore, "QgsNetworkAccessManager", None) is not None:
attempted_qgis_fetch = True
try:
manager = qgiscore.QgsNetworkAccessManager.instance()
QUrl = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QUrl", None)
QNetworkRequest = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QNetworkRequest", None)
QEventLoop = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QEventLoop", None)
# Netzwerk-Timeout global setzen (QGIS >= 3.6)
if hasattr(manager, "setTimeout"):
manager.setTimeout(_FETCH_TIMEOUT_MS)
_qt = __import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"])
QUrl = getattr(_qt, "QUrl", None)
QNetworkRequest = getattr(_qt, "QNetworkRequest", None)
QEventLoop = getattr(_qt, "QEventLoop", None)
QTimer = getattr(_qt, "QTimer", None)
if QUrl is not None and QNetworkRequest is not None:
req = QNetworkRequest(QUrl(url))
reply = manager.get(req)
if QEventLoop is not None:
loop = QEventLoop()
reply.finished.connect(loop.quit)
loop.exec()
try:
raw = reply.readAll()
data_bytes = bytes(raw) if hasattr(raw, "__bytes__") else raw
response_text = data_bytes.decode("utf-8", errors="replace")
except Exception:
_poll_timer = None
if QTimer is not None:
try:
_poll_timer = QTimer()
_poll_timer.setSingleShot(False)
_poll_timer.timeout.connect(loop.quit)
_poll_timer.start(100)
except Exception:
_poll_timer = None
start_time = time.monotonic()
while True:
if callable(cancel_callback) and cancel_callback():
reply.abort()
http_error = "Abbruch durch Benutzer"
aborted_or_timed_out = True
break
elapsed_ms = int((time.monotonic() - start_time) * 1000)
if elapsed_ms >= _FETCH_TIMEOUT_MS:
reply.abort()
http_error = f"Timeout nach {_FETCH_TIMEOUT_MS // 1000} s: {url}"
aborted_or_timed_out = True
break
if hasattr(reply, "isFinished") and reply.isFinished():
break
loop.exec()
try:
if hasattr(qt, "QCoreApplication") and hasattr(qt.QCoreApplication, "processEvents"):
qt.QCoreApplication.processEvents()
except Exception:
pass
if _poll_timer is not None:
try:
_poll_timer.stop()
except Exception:
pass
if not aborted_or_timed_out:
# Fehler aus Reply auslesen
err_code = None
try:
err_code = reply.error()
except Exception:
pass
if err_code and int(err_code) != 0:
http_error = f"Netzwerkfehler ({err_code}): {reply.errorString()}"
if http_error:
# Timeout oder Netzwerkfehler keinen Body lesen
pass
else:
try:
response_text = reply.text()
raw = reply.readAll()
data_bytes = bytes(raw) if hasattr(raw, "__bytes__") else raw
response_text = data_bytes.decode("utf-8", errors="replace")
except Exception:
response_text = None
try:
response_text = reply.text()
except Exception:
response_text = None
except Exception as exc:
http_error = f"QgsNetworkAccessManager error: {exc}"
response_text = None
# Fallback: requests
if response_text is None:
# Fallback: requests nur wenn kein harter Abbruch/Timeout im QGIS-Request vorlag
if response_text is None and (not attempted_qgis_fetch or not aborted_or_timed_out):
try:
import requests # lokal import, keine harte Abhängigkeit
r = requests.get(url, timeout=30)
@@ -383,6 +568,8 @@ class Datenabruf:
return parsed.get("features", []), None
if isinstance(parsed, dict) and "features" in parsed:
return parsed.get("features", []), None
if prov in ("REST", "ARCGIS", "ARCGISFEATURESERVER", "ARCGIS_FEATURESERVER", "WFS"):
return [], "Antwort enthält keine Feature-Liste"
# Sonst: gib das gesamte JSON als einzelnes Objekt zurück
return [parsed], None
except json.JSONDecodeError:

View File

@@ -30,9 +30,13 @@ from __future__ import annotations
from typing import Any, Dict, List, Optional
import os
import json
import re
import datetime
import sqlite3
from sn_basis.functions import qgiscore_wrapper as qgiscore
from sn_basis.functions.os_wrapper import normalize_path, is_absolute_path
from sn_basis.functions.sys_wrapper import get_plugin_root, join_path, file_exists
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
@@ -53,10 +57,97 @@ class Datenschreiber:
def __init__(self, pruefmanager: Any, gpkg_path: Optional[str] = None) -> None:
self.pruefmanager = pruefmanager
self.gpkg_path = gpkg_path
self.gpkg_path = str(gpkg_path) if gpkg_path else None
# ------------------------------------------------------------------ #
# Schreibe Daten
def _resolve_style_path(self, style_path: Optional[str]) -> Optional[str]:
if not style_path:
return None
style_path_str = str(style_path).strip()
if not style_path_str:
return None
if not is_absolute_path(style_path_str):
plugin_root = get_plugin_root()
style_path_str = str(join_path(plugin_root, "sn_plan41", "assets", style_path_str))
style_path_str = str(normalize_path(style_path_str))
return style_path_str if file_exists(style_path_str) else None
def _store_style_in_gpkg(self, layer_name: str, style_path: str, layer: Optional[Any] = None) -> None:
"""Stellt sicher, dass der Stil in der layer_styles-Tabelle der GPKG gespeichert wird."""
try:
with open(style_path, "r", encoding="utf-8") as fh:
style_qml = fh.read()
f_geometry_column = ''
if layer is not None:
try:
if hasattr(layer, 'geometryColumn'):
f_geometry_column = str(layer.geometryColumn())
elif hasattr(layer, 'dataProvider') and hasattr(layer.dataProvider(), 'geometryColumnName'):
f_geometry_column = str(layer.dataProvider().geometryColumnName())
except Exception:
f_geometry_column = ''
with sqlite3.connect(self.gpkg_path) as conn:
cur = conn.cursor()
cur.execute(
"""
CREATE TABLE IF NOT EXISTS layer_styles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
f_table_catalog TEXT,
f_table_schema TEXT,
f_table_name TEXT NOT NULL,
f_geometry_column TEXT,
styleName TEXT,
styleQML TEXT,
styleSLD TEXT,
useAsDefault BOOLEAN,
description TEXT,
owner TEXT,
ui TEXT,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP
)
"""
)
# Das aktuelle QGIS-Style-Verhalten: bestehenden Style für denselben Layer nicht löschen (nur appenden)
# Wir wollen aber Default-Style setzen: alte Default-Styles entfernen.
cur.execute(
"UPDATE layer_styles SET useAsDefault = 0 WHERE f_table_name = ?",
(layer_name,),
)
# Fülle die bekannten QGIS-Kolonnen
style_name = os.path.basename(style_path)
cur.execute(
"INSERT INTO layer_styles (f_table_catalog, f_table_schema, f_table_name, f_geometry_column, styleName, styleQML, styleSLD, useAsDefault, description, owner, ui) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
'',
'',
layer_name,
f_geometry_column,
style_name,
style_qml,
None,
1,
'',
'',
'',
),
)
conn.commit()
except Exception as exc:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Speichern des Layer-Stils in GPKG: {exc}",
aktion="style_gpkg_speichern_fehlgeschlagen",
kontext={"layer_name": layer_name, "style_path": style_path},
)
)
# ------------------------------------------------------------------ #
def schreibe_Daten(
self,
@@ -65,192 +156,93 @@ class Datenschreiber:
speicherort: str,
) -> List[Dict[str, Any]]:
"""
Schreibt die abgerufenen Daten in die Zieldatenbank/Dateien.
Schreibt die übergebenen Layer in die Ziel-GPKG.
Ablauf
------
Für jede Zeile (ident) in ``daten_dict["daten"]``:
1. Bestimme Ziel-Layername (z. B. Thema oder ident).
2. Prüfe, ob ein Layer mit diesem Namen bereits existiert (Wrapper).
3. Falls vorhanden, frage den Benutzer (Überschreiben / Anhängen / Abbrechen)
über die zentrale Pruefmanager-Methode `ask_overwrite_append_cancel`.
4. Führe die gewählte Operation aus oder schreibe den Layer, wenn er noch nicht existiert.
5. Schreibe ggf. den Stil in die GPKG und setze ihn als Vorgabe.
6. Sammle und gib eine Liste der angelegten/geänderten Layer zurück.
Returns
-------
List[Dict[str, Any]]
Liste von Dicts mit Informationen zu jedem angelegten/geänderten Layer.
Erwartung:
- daten_dict["daten"] enthält Einträge der Form:
ident -> {"layer": QgsVectorLayer}
- self.gpkg_path ist ein str
"""
if not speicherort:
raise ValueError("Ein gültiger Speicherort (speicherort) muss übergeben werden.")
# Setze gpkg_path falls noch nicht vorhanden
# gpkg_path einmalig setzen / normalisieren
if not self.gpkg_path:
self.gpkg_path = speicherort
self.gpkg_path = str(speicherort)
results: List[Dict[str, Any]] = []
daten_map: Dict[str, List[Any]] = daten_dict.get("daten", {})
daten_map: Dict[str, Any] = daten_dict.get("daten", {})
# Iteriere über alle Einträge
for ident, features in daten_map.items():
# Thema/Name ableiten (falls vorhanden in processed_results oder ident)
for ident, entry in daten_map.items():
layer = None
style_path = None
# -----------------------------
# Layer extrahieren
# -----------------------------
if isinstance(entry, dict):
layer = entry.get("layer")
style_path = self._resolve_style_path(entry.get("style_path"))
if layer is None or not hasattr(layer, "isValid") or not layer.isValid():
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Ungültiger Layer für {ident}",
aktion="save_exception",
kontext={"ident": ident},
)
self.pruefmanager.verarbeite(pe_err)
continue
# -----------------------------
# Layername bestimmen
# -----------------------------
thema = None
for pe in processed_results:
try:
kontext = getattr(pe, "kontext", None) or {}
if kontext and kontext.get("ident") == ident:
if kontext.get("ident") == ident:
thema = kontext.get("thema")
break
except Exception:
continue
if not thema:
thema = str(ident)
layer_name = thema
layer_name_raw = thema or str(ident)
layer_name = re.sub(r"[^A-Za-z0-9_]+", "_", layer_name_raw).strip("_")
if not layer_name:
layer_name = f"layer_{ident}"
# Prüfe, ob Layer bereits existiert in der Ziel-GPKG
layer_exists = False
try:
layer_exists_fn = getattr(qgiscore, "layer_exists_in_gpkg", None)
if callable(layer_exists_fn):
layer_exists = layer_exists_fn(self.gpkg_path, layer_name)
else:
# Fallback: QGIS-Fallback-Check via QgsVectorLayer
if getattr(qgiscore, "QgsVectorLayer", None) is not None and qgiscore.QGIS_AVAILABLE:
uri = f"{self.gpkg_path}|layername={layer_name}"
layer = qgiscore.QgsVectorLayer(uri, layer_name, "ogr")
layer_exists = bool(layer and getattr(layer, "isValid", lambda: False)())
except Exception:
layer_exists = False
operation = "created"
if layer_exists:
# Zentrale Nutzerabfrage über Pruefmanager
# Erwartet Rückgabe: "overwrite" | "append" | "cancel"
try:
user_choice = self.pruefmanager.ask_overwrite_append_cancel(layer_name)
except Exception:
# Fallback: overwrite, falls Pruefmanager nicht verfügbar
user_choice = "overwrite"
if user_choice == "cancel":
operation = "skipped"
results.append({
"ident": ident,
"thema": thema,
"operation": operation,
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
"feature_count": 0,
})
continue
if user_choice == "overwrite":
write_err = self._write_layer_to_gpkg(layer_name, features, mode="overwrite")
if write_err:
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Überschreiben von {layer_name}: {write_err}",
aktion="save_exception",
kontext={"ident": ident, "thema": thema, "error": write_err},
)
self.pruefmanager.verarbeite(pe_err)
operation = "skipped"
results.append({
"ident": ident,
"thema": thema,
"operation": operation,
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
"feature_count": 0,
})
continue
else:
operation = "overwritten"
elif user_choice == "append":
write_err = self._write_layer_to_gpkg(layer_name, features, mode="append")
if write_err:
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Anhängen an {layer_name}: {write_err}",
aktion="save_exception",
kontext={"ident": ident, "thema": thema, "error": write_err},
)
self.pruefmanager.verarbeite(pe_err)
operation = "skipped"
results.append({
"ident": ident,
"thema": thema,
"operation": operation,
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
"feature_count": 0,
})
continue
else:
operation = "appended"
else:
# Layer existiert nicht -> neu anlegen
write_err = self._write_layer_to_gpkg(layer_name, features, mode="create")
if write_err:
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Erstellen von {layer_name}: {write_err}",
aktion="save_exception",
kontext={"ident": ident, "thema": thema, "error": write_err},
)
self.pruefmanager.verarbeite(pe_err)
operation = "skipped"
results.append({
"ident": ident,
"thema": thema,
"operation": operation,
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
"feature_count": 0,
})
continue
else:
operation = "created"
# Stilbehandlung (falls in processed_results referenziert)
style_written = False
style_path = None
for pe in processed_results:
try:
kontext = getattr(pe, "kontext", None) or {}
if kontext and kontext.get("ident") == ident:
style_path = kontext.get("stildatei") or kontext.get("Stildatei")
break
except Exception:
continue
# Layer in GPKG schreiben
err_msg = self._write_layer_to_gpkg(layer_name=layer_name, layer=layer)
if err_msg is not None:
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Schreiben des Layers {layer_name}: {err_msg}",
aktion="save_exception",
kontext={"ident": ident, "layer_name": layer_name},
)
self.pruefmanager.verarbeite(pe_err)
continue
# Wenn der Stil vorhanden und valide ist, als Default in GPKG-Style-Tabelle ablegen
if style_path:
if not os.path.isabs(style_path):
base_dir = os.path.dirname(__file__)
style_path = os.path.join(base_dir, style_path)
write_style_fn = getattr(qgiscore, "write_style_to_gpkg", None)
if callable(write_style_fn):
try:
write_style_fn(self.gpkg_path, style_path, layer_name)
style_written = True
except Exception:
style_written = False
feature_count = len(features) if isinstance(features, list) else 0
self._store_style_in_gpkg(layer_name, style_path, layer)
# Erfolgsfall: Info für lade_Layer sammeln
layer_path = f"{self.gpkg_path}|layername={layer_name}"
results.append({
"layer_path": layer_path,
"thema": layer_name,
"ident": ident,
"thema": thema,
"operation": operation,
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
"feature_count": feature_count,
"style_written": style_written,
"style_path": style_path,
})
return results
# -----------------------------
# ------------------------------------------------------------------ #
# Lade Layer ins Projekt
# ------------------------------------------------------------------ #
@@ -288,18 +280,33 @@ class Datenschreiber:
self.pruefmanager.verarbeite(pe_err)
continue
try:
apply_style_fn = getattr(qgiscore, "apply_default_style_from_gpkg", None)
if callable(apply_style_fn):
apply_style_fn(self.gpkg_path, layer)
except Exception:
pe_warn = pruef_ergebnis(
ok=True,
meldung=f"Style konnte für {thema} nicht automatisch angewendet werden",
aktion="stil_not_implemented",
kontext={"thema": thema},
)
self.pruefmanager.verarbeite(pe_warn)
style_path = info.get("style_path")
resolved_style_path = self._resolve_style_path(style_path)
if resolved_style_path:
try:
layer.loadNamedStyle(resolved_style_path)
layer.triggerRepaint()
except Exception as exc:
pe_warn = pruef_ergebnis(
ok=True,
meldung=f"Style konnte für {thema} nicht geladen werden: {exc}",
aktion="stil_laden_fehlgeschlagen",
kontext={"thema": thema, "style_path": resolved_style_path},
)
self.pruefmanager.verarbeite(pe_warn)
else:
try:
apply_style_fn = getattr(qgiscore, "apply_default_style_from_gpkg", None)
if callable(apply_style_fn):
apply_style_fn(self.gpkg_path, layer)
except Exception:
pe_warn = pruef_ergebnis(
ok=True,
meldung=f"Style konnte für {thema} nicht automatisch angewendet werden",
aktion="stil_not_implemented",
kontext={"thema": thema},
)
self.pruefmanager.verarbeite(pe_warn)
try:
# qgisui wrapper wird hier nicht direkt für die Abfrage verwendet;
@@ -374,62 +381,67 @@ class Datenschreiber:
# ------------------------------------------------------------------ #
# Hilfsfunktionen intern
# ------------------------------------------------------------------ #
def _write_layer_to_gpkg(self, layer_name: str, features: List[Any], mode: str = "create") -> Optional[str]:
def _write_layer_to_gpkg(
self,
layer_name: str,
layer: Any,
) -> Optional[str]:
"""
Interne Hilfsfunktion zum Schreiben eines Layers in das GPKG.
Schreibt einen QgsVectorLayer in die Ziel-GPKG.
Erwartete qgiscore-Funktion:
qgiscore.write_features_to_gpkg(gpkg_path, layer_name, features, mode)
Voraussetzungen:
- self.gpkg_path ist ein str
- layer ist ein gültiger QgsVectorLayer
"""
write_fn = getattr(qgiscore, "write_features_to_gpkg", None)
if callable(write_fn):
try:
write_fn(self.gpkg_path, layer_name, features, mode)
return None
except Exception as exc:
return str(exc)
# Fallback: Verwende QgsVectorFileWriter, falls QGIS verfügbar
if getattr(qgiscore, "QGIS_AVAILABLE", False) and getattr(qgiscore, "QgsVectorFileWriter", None) is not None:
try:
# Minimaler Fallback: erwarte, dass 'features' eine Liste von QgsFeature ist
if not features:
# Erstelle leeren Layer-Eintrag (GPKG erlaubt leere Layer)
# Hier vereinfachen wir: writeAsVectorFormatV3 benötigt ein Layer-Objekt.
return None
if layer is None or not hasattr(layer, "isValid") or not layer.isValid():
return "Ungültiger Layer zum Schreiben übergeben"
# Versuche, ein Memory-Layer aus dem ersten Feature zu ermitteln
first = features[0]
mem_layer = None
if hasattr(first, "fields") and hasattr(first, "geometry"):
# Wenn Features QgsFeature sind, versuchen wir, das zugehörige Layer zu nutzen
try:
mem_layer = first.layer() if hasattr(first, "layer") else None
except Exception:
mem_layer = None
try:
opts = qgiscore.QgsVectorFileWriter.SaveVectorOptions()
opts.driverName = "GPKG"
opts.layerName = layer_name
opts.fileEncoding = "UTF-8"
if mem_layer is None:
return "Keine Feld-/Geometrie-Informationen zum Schreiben vorhanden"
# Style in der GPKG speichern, wenn möglich
if hasattr(opts, "symbologyExport"):
try:
# QGIS: SymbologyExport-Wert z.B. QgsVectorFileWriter.SaveVectorOptions.Symbology
saveOpts = qgiscore.QgsVectorFileWriter.SaveVectorOptions
sym_val = getattr(saveOpts, "Symbology", None)
if sym_val is None:
sym_val = getattr(saveOpts, "SymbologyExport", None)
if sym_val is not None:
opts.symbologyExport = sym_val
except Exception:
pass
opts = qgiscore.QgsVectorFileWriter.SaveVectorOptions()
opts.driverName = "GPKG"
opts.layerName = layer_name
opts.fileEncoding = "UTF-8"
if mode == "overwrite":
opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteFile
else:
opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteLayer
# Datei existiert → Layer überschreiben
# Datei existiert nicht → neue GPKG anlegen
if not os.path.exists(self.gpkg_path):
opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteFile
else:
opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteLayer
err = qgiscore.QgsVectorFileWriter.writeAsVectorFormatV3(
mem_layer,
self.gpkg_path,
qgiscore.QgsProject.instance().transformContext(),
opts
)
if err != qgiscore.QgsVectorFileWriter.NoError:
return f"Fehler beim Schreiben (Code {err})"
return None
except Exception as exc:
return str(exc)
err = qgiscore.QgsVectorFileWriter.writeAsVectorFormatV3(
layer,
self.gpkg_path,
qgiscore.QgsProject.instance().transformContext(),
opts,
)
return "Keine Schreib-Funktion verfügbar (Wrapper nicht implementiert)"
# QGIS ≥3 liefert ein Tupel: (error_code, error_message, new_filename, new_layer_name)
if isinstance(err, tuple):
error_code = err[0]
error_msg = err[1] if len(err) > 1 else ""
else:
error_code = err
error_msg = ""
if error_code != qgiscore.QgsVectorFileWriter.NoError:
return f"Fehler beim Schreiben (Code {error_code}, msg='{error_msg}')"
return None
except Exception as exc:
return str(exc)

395
modules/LayerLoader.py Normal file
View File

@@ -0,0 +1,395 @@
"""sn_basis/modules/LayerLoader.py
Kapselt Layer-Erstellung, Raumfilter und Stil-Logik.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
import time
from sn_basis.functions.os_wrapper import normalize_path, is_absolute_path
from sn_basis.functions.qgiscore_wrapper import (
QgsVectorLayer,
QgsRasterLayer,
QgsFeatureRequest,
QgsProject,
QgsNetworkAccessManager,
QgsCoordinateTransform,
)
from sn_basis.functions.sys_wrapper import get_plugin_root, join_path, file_exists
from sn_basis.modules.stilpruefer import Stilpruefer
from sn_basis.modules.layerpruefer import Layerpruefer
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
from sn_basis.functions import qt_wrapper as qt
class LayerLoader:
"""Lädt und filtert Layer aus Dienst-/Datenquellen."""
def __init__(
self,
pruefmanager: Any,
stil_pruefer: Optional[Stilpruefer] = None,
layer_pruefer: Optional[Layerpruefer] = None,
) -> None:
self.pruefmanager = pruefmanager
self.stil_pruefer = stil_pruefer or Stilpruefer()
self.layer_pruefer = layer_pruefer or Layerpruefer()
_LAYER_TIMEOUT_MS = 30_000 # 30 Sekunden
def _was_canceled(self, cancel_callback: Optional[Any]) -> bool:
if not callable(cancel_callback):
return False
try:
return bool(cancel_callback())
except Exception:
return False
def _process_events(self) -> None:
try:
if hasattr(qt, "QCoreApplication") and hasattr(qt.QCoreApplication, "processEvents"):
qt.QCoreApplication.processEvents()
except Exception:
pass
def _transform_geometry_to_layer_crs(self, geometry: Any, source_layer: Any, target_layer: Any) -> Any:
if geometry is None or source_layer is None or target_layer is None:
return geometry
if QgsCoordinateTransform is None or QgsProject is None:
return geometry
try:
source_crs = source_layer.crs() if hasattr(source_layer, "crs") else None
target_crs = target_layer.crs() if hasattr(target_layer, "crs") else None
if source_crs is None or target_crs is None:
return geometry
source_authid = source_crs.authid() if hasattr(source_crs, "authid") else None
target_authid = target_crs.authid() if hasattr(target_crs, "authid") else None
if source_authid and target_authid and source_authid == target_authid:
return geometry
ct = QgsCoordinateTransform(source_crs, target_crs, QgsProject.instance())
if hasattr(geometry, "clone") and callable(getattr(geometry, "clone")):
geom_copy = geometry.clone()
else:
geom_copy = geometry
geom_copy.transform(ct)
return geom_copy
except Exception:
return geometry
def _transform_extent_to_layer_crs(self, extent: Any, source_layer: Any, target_layer: Any) -> Any:
if extent is None or source_layer is None or target_layer is None:
return extent
if QgsCoordinateTransform is None or QgsProject is None:
return extent
try:
source_crs = source_layer.crs() if hasattr(source_layer, "crs") else None
target_crs = target_layer.crs() if hasattr(target_layer, "crs") else None
if source_crs is None or target_crs is None:
return extent
source_authid = source_crs.authid() if hasattr(source_crs, "authid") else None
target_authid = target_crs.authid() if hasattr(target_crs, "authid") else None
if source_authid and target_authid and source_authid == target_authid:
return extent
ct = QgsCoordinateTransform(source_crs, target_crs, QgsProject.instance())
if hasattr(ct, "transformBoundingBox"):
return ct.transformBoundingBox(extent)
return extent
except Exception:
return extent
def create_layer(self, provider: str, link: str, thema: str) -> Optional[QgsVectorLayer]:
provider_lower = provider.lower() if provider else ""
layer = None
# Netzwerk-Timeout für alle netzwerkbasierten Provider setzen
if provider_lower in ("wfs", "wms", "rest"):
try:
nam = QgsNetworkAccessManager.instance()
if hasattr(nam, "setTimeout"):
nam.setTimeout(self._LAYER_TIMEOUT_MS)
except Exception:
pass
try:
if provider_lower == "wfs":
uri = link if link.strip().lower().startswith("url=") else f"url={link}"
layer = QgsVectorLayer(uri, thema, "WFS")
elif provider_lower == "wms":
uri = link if link.strip().lower().startswith("url=") else f"url={link}"
layer = QgsRasterLayer(uri, thema, "wms")
elif provider_lower in ("ogr", "gpkg", "shp", "geojson"):
layer = QgsVectorLayer(link, thema, "ogr")
elif provider_lower == "rest":
rest_link = link.strip()
if rest_link.lower().endswith("/featureserver"):
rest_link = rest_link.rstrip("/") + "/0"
uri = rest_link if rest_link.lower().startswith("url=") else f"url={rest_link}"
layer = QgsVectorLayer(uri, thema, "arcgisfeatureserver")
else:
layer = QgsVectorLayer(link, thema, "ogr")
except Exception as exc:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Erstellen des Layers {thema}: {exc}",
aktion="layer_nicht_verfuegbar",
kontext={"provider": provider, "link": link},
)
)
return None
if not layer or not layer.isValid():
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Layer {thema} (Provider={provider}) konnte nicht geladen werden."
,aktion="layer_nicht_verfuegbar",
kontext={"provider": provider, "link": link},
)
)
return None
return layer
def apply_style(self, layer: QgsVectorLayer, style_path: Optional[str]) -> None:
if not style_path or layer is None or not layer.isValid():
return
if not style_path.strip():
return
if not is_absolute_path(style_path):
plugin_root = get_plugin_root()
style_path = str(join_path(plugin_root, "sn_plan41", "assets", style_path))
# normalize path for consistency
style_path = str(normalize_path(style_path))
# Debug: welche Stil-Datei wird geprüft?
print(f"[LayerLoader] Überprüfe Stildatei: '{style_path}'")
if file_exists(style_path):
try:
layer.loadNamedStyle(style_path)
layer.triggerRepaint()
except Exception as exc:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Stil-Laden für {layer.name()}: {exc}",
aktion="stil_laden_fehlgeschlagen",
kontext={"thema": layer.name(), "style_path": style_path},
)
)
else:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=True,
meldung=f"Stildatei nicht gefunden (optional): {style_path}",
aktion="stil_nicht_gefunden",
kontext={"thema": layer.name(), "style_path": style_path},
)
)
def filter_by_extent(self, layer: QgsVectorLayer, extent, cancel_callback: Optional[Any] = None, source_layer: Optional[Any] = None) -> Optional[QgsVectorLayer]:
"""Beschneidet <layer> auf die rechteckige Ausdehnung <extent>.
Diese Methode verwendet einen einfachen BBOX-Filter. Für komplexere
Raumeinschränkungen (z.B. Verfahrensgebiet) sollte stattdessen
:meth:`filter_by_layer` verwendet werden, da dort echte Geometrie-Tests
stattfinden.
"""
if not layer or not layer.isValid() or extent is None:
return layer
if layer.type() != QgsVectorLayer.VectorLayer:
return layer
extent_for_layer = self._transform_extent_to_layer_crs(extent, source_layer, layer)
request = QgsFeatureRequest().setFilterRect(extent_for_layer)
if hasattr(request, "setTimeout"):
try:
request.setTimeout(self._LAYER_TIMEOUT_MS)
except Exception:
pass
start = time.monotonic()
features: List[Any] = []
try:
for feat in layer.getFeatures(request):
if self._was_canceled(cancel_callback):
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Abbruch beim Raumfilter (BBOX) für {layer.name()}",
aktion="needs_user_action",
kontext={"thema": layer.name()},
)
)
return None
elapsed_ms = int((time.monotonic() - start) * 1000)
if elapsed_ms >= self._LAYER_TIMEOUT_MS:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Timeout beim Raumfilter (BBOX) für {layer.name()} nach {self._LAYER_TIMEOUT_MS // 1000}s",
aktion="url_nicht_erreichbar",
kontext={"thema": layer.name(), "timeout_s": self._LAYER_TIMEOUT_MS // 1000},
)
)
return None
features.append(feat)
if len(features) % 100 == 0:
self._process_events()
except Exception as exc:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Lesen der Features für {layer.name()}: {exc}",
aktion="layer_nicht_verfuegbar",
kontext={"thema": layer.name()},
)
)
return None
if not features:
return None
geom_type_map = {0: "Point", 1: "LineString", 2: "Polygon"}
geom_type = geom_type_map.get(layer.geometryType(), "Polygon")
uri = f"{geom_type}?crs={layer.crs().authid()}"
filtered_layer = QgsVectorLayer(uri, f"{layer.name()}_bbox", "memory")
if not filtered_layer or not filtered_layer.isValid():
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Erzeugen des Filter-Layers für {layer.name()}",
aktion="filterlayer_nicht_erzeugt",
kontext={"thema": layer.name()},
)
)
return None
provider = filtered_layer.dataProvider()
provider.addAttributes(layer.fields())
filtered_layer.updateFields()
provider.addFeatures(features)
filtered_layer.updateExtents()
return filtered_layer
def filter_by_layer(self, layer: QgsVectorLayer, filter_layer: QgsVectorLayer, cancel_callback: Optional[Any] = None) -> Optional[QgsVectorLayer]:
"""Beschneidet <layer> auf die tatsächliche Geometrie des
<filter_layer>.
Diese Methode wird z.B. für das Verfahrensgebiet verwendet, damit nicht
die gesamte Bounding-Box, sondern nur die echten Flächen als Raumfilter
gelten. Wenn der Filter-Layer mehrere Features enthält, werden deren
Geometrien zu einem Multi-Geom vereinigt.
"""
if not layer or not layer.isValid() or not filter_layer or not filter_layer.isValid():
return layer
if layer.type() != QgsVectorLayer.VectorLayer:
return layer
# vereinigte Geometrie aller Features im Filter-Layer
union_geom = None
for f in filter_layer.getFeatures():
try:
geom = self._transform_geometry_to_layer_crs(f.geometry(), filter_layer, layer)
if union_geom is None:
union_geom = geom
else:
union_geom = union_geom.combine(geom)
except Exception:
# bei einem Fehler einfach weiterfahren
continue
if union_geom is None or union_geom.isEmpty():
return None
# nun alle Features aus <layer> nehmen, deren Geometrie sich schneidet
filtered = []
request = QgsFeatureRequest().setFilterRect(union_geom.boundingBox())
if hasattr(request, "setTimeout"):
try:
request.setTimeout(self._LAYER_TIMEOUT_MS)
except Exception:
pass
start = time.monotonic()
for f in layer.getFeatures(request):
if self._was_canceled(cancel_callback):
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Abbruch beim Raumfilter (Geometrie) für {layer.name()}",
aktion="needs_user_action",
kontext={"thema": layer.name()},
)
)
return None
elapsed_ms = int((time.monotonic() - start) * 1000)
if elapsed_ms >= self._LAYER_TIMEOUT_MS:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Timeout beim Raumfilter (Geometrie) für {layer.name()} nach {self._LAYER_TIMEOUT_MS // 1000}s",
aktion="url_nicht_erreichbar",
kontext={"thema": layer.name(), "timeout_s": self._LAYER_TIMEOUT_MS // 1000},
)
)
return None
try:
if f.geometry() and f.geometry().intersects(union_geom):
filtered.append(f)
except Exception:
continue
if len(filtered) % 100 == 0:
self._process_events()
if not filtered:
return None
geom_type_map = {0: "Point", 1: "LineString", 2: "Polygon"}
geom_type = geom_type_map.get(layer.geometryType(), "Polygon")
uri = f"{geom_type}?crs={layer.crs().authid()}"
filtered_layer = QgsVectorLayer(uri, f"{layer.name()}_filtered", "memory")
if not filtered_layer or not filtered_layer.isValid():
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Erzeugen des Filter-Layers für {layer.name()}",
aktion="filterlayer_nicht_erzeugt",
kontext={"thema": layer.name()},
)
)
return None
provider = filtered_layer.dataProvider()
provider.addAttributes(layer.fields())
filtered_layer.updateFields()
provider.addFeatures(filtered)
filtered_layer.updateExtents()
return filtered_layer
def add_to_project(self, layer: QgsVectorLayer) -> None:
if layer and layer.isValid():
QgsProject.instance().addMapLayer(layer)

View File

@@ -5,7 +5,7 @@ sn_basis/modules/Pruefmanager.py
from __future__ import annotations
from typing import Optional, Any
from sn_basis.functions import ask_yes_no, info, warning, error
from sn_basis.functions import ask_yes_no, info, warning, error, ask_overwrite_append_cancel_custom
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis, PruefAktion
print("DEBUG: Pruefmanager DATEI GELADEN:", __file__)
@@ -60,6 +60,26 @@ class Pruefmanager:
# VERFAHRENS-DB-spezifische Entscheidungen
# ------------------------------------------------------------------
def _handle_datei_existiert(self, ergebnis: pruef_ergebnis) -> pruef_ergebnis:
"""Handhabt das Szenario, dass die Ziel-Verfahrens-DB bereits existiert.
Zeigt einen einzigen Dialog mit drei Optionen an:
- **Überschreiben**: Bestehende Layer ersetzen (entspricht YES)
- **Anhängen**: Neue Layer zur Datei hinzufügen (entspricht NO)
- **Abbrechen**: Vorgang beenden (entspricht CANCEL)
Parameters
----------
ergebnis : pruef_ergebnis
Eingabe-Ergebnis mit Dateipfad im ``kontext``-Attribut.
Returns
-------
pruef_ergebnis
Ergebnis mit Aktion:
- ``datei_existiert_ueberschreiben``
- ``datei_existiert_anhaengen``
- ``datei_existiert_ueberspringen`` (für Cancel-Fall)
"""
if self.ui_modus != "qgis":
return ergebnis
@@ -72,48 +92,34 @@ class Pruefmanager:
"Was soll geschehen?\n\n"
"• **Überschreiben**: Bestehende Layer ersetzen\n"
"• **Anhängen**: Neue Layer hinzufügen\n"
"• **Überspringen**: Nur temporäre Layer erzeugen"
"• **Abbrechen**: Vorgang beenden"
)
# Vereinfacht: Erst Überschreiben? → Dann Anhängen? → Überspringen
if ask_yes_no(
titel,
f"{meldung}\n\n**Überschreiben** (alle Layer ersetzen)?",
default=False,
parent=self.parent
):
# Einzelner Dialog mit drei Optionen
entscheidung = ask_overwrite_append_cancel_custom(
parent=self.parent,
title=titel,
message=meldung
)
if entscheidung == "overwrite":
return pruef_ergebnis(
ok=True,
aktion="datei_existiert_ueberschreiben",
kontext=ergebnis.kontext,
)
if ask_yes_no(
titel,
f"{meldung}\n\n**Anhängen** (neue Layer hinzufügen)?",
default=False,
parent=self.parent
):
elif entscheidung == "append":
return pruef_ergebnis(
ok=True,
aktion="datei_existiert_anhaengen",
kontext=ergebnis.kontext,
)
if ask_yes_no(
titel,
f"{meldung}\n\n**Überspringen** (nur temporäre Layer)?",
default=True,
parent=self.parent
):
else: # cancel
return pruef_ergebnis(
ok=True,
aktion="datei_existiert_ueberspringen",
kontext=ergebnis.kontext,
)
return ergebnis
# ------------------------------------------------------------------
# Basis-Entscheidungen (KORREKT: → pruef_ergebnis)
# ------------------------------------------------------------------
@@ -210,3 +216,26 @@ class Pruefmanager:
)
print("🔥 verarbeite() ENDE mit ok=False")
return ergebnis
def _ask_use_or_replace_pufferlayer(self) -> str:
"""
Fragt den Nutzer, ob ein vorhandener Pufferlayer verwendet
oder ersetzt werden soll.
Returns
-------
str
"verwenden", "ersetzen" oder "abbrechen"
"""
ergebnis = pruef_ergebnis(
ok=False,
aktion="layer_existiert",
meldung="Ein Pufferlayer ist bereits vorhanden.",
)
ergebnis = self.pruefmanager.verarbeite(ergebnis)
if not ergebnis.ok:
return "abbrechen"
return "verwenden" if ergebnis.aktion == "ok" else "ersetzen"

View File

@@ -61,7 +61,8 @@ class Linkpruefer:
aktion="leer",
kontext=None,
)
#evtl. Pfad-Objekte in string umwandeln
eingabe = str(eingabe)
# -----------------------------------------------------
# 1. Fall: URL
# -----------------------------------------------------

View File

@@ -43,6 +43,13 @@ PruefAktion = Literal[
# Dateiendung/Format
"falsche_endung",
"pflichtfelder_fehlen",
"unbekannter_dateityp",
"Datenbank",
"dienst",
"excel",
"unbekannte_quelle",
# Excel/Import
"kein_header",
@@ -50,6 +57,8 @@ PruefAktion = Literal[
"read_error",
"open_error",
"datenabruf",
# 🆕 VERFAHRENS-DB SPEZIFISCH (deine Anforderungen 2.d, 2.e)
"datei_wird_erzeugt", # 2.d: Pfad gültig, Datei fehlt → weiter

View File

@@ -4,9 +4,10 @@ Prüft ausschließlich, ob ein Stilpfad gültig ist.
Die Anwendung erfolgt später über eine Aktion.
"""
from pathlib import Path
import os
from sn_basis.functions import file_exists
from sn_basis.functions.os_wrapper import is_absolute_path
from sn_basis.functions.sys_wrapper import get_plugin_root, file_exists, join_path
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
@@ -40,7 +41,11 @@ class Stilpruefer:
kontext=None,
)
pfad = Path(stil_pfad)
pfad = str(stil_pfad)
if not is_absolute_path(pfad):
plugin_root = get_plugin_root()
pfad = str(join_path(plugin_root, "sn_plan41", "assets", pfad))
# -----------------------------------------------------
# 2. Datei existiert nicht
@@ -56,7 +61,7 @@ class Stilpruefer:
# -----------------------------------------------------
# 3. Falsche Endung
# -----------------------------------------------------
if pfad.suffix.lower() != ".qml":
if os.path.splitext(pfad)[1].lower() != ".qml":
return pruef_ergebnis(
ok=False,
meldung="Die Stil-Datei muss die Endung '.qml' haben.",

11
plugin.info Normal file
View File

@@ -0,0 +1,11 @@
name=LNO Sachsen | Basisfunktionen
description=Plugin mit Basisfunktionen
author=Daniel Helbig
email=daniel.helbig@kreis-meissen.de
qgisMinimumVersion=3.0
qgisMaximumVersion=4.99
deprecated=False
experimental=true
supportsQt6=Yes
zip_folder=sn_basis