31 Commits

Author SHA1 Message Date
2ff465b86d Merge pull request 'feture/Druck_tab' (#11) from feture/Druck_tab into unstable
Reviewed-on: AG_QGIS/Plugin_SN_Basis#11
2026-03-20 22:58:08 +01:00
f19fe71bfa Ergänzungen in de Wrappern/ Prüfmanager für Layouts 2026-03-20 14:01:57 +01:00
ae5f88c5b8 Dialoge für Vorlagen-Piepeline ergänzt 2026-03-20 12:42:21 +01:00
7cd6e3ef24 checkbox im qt_wrapper ergänzt 2026-03-20 12:01:16 +01:00
1be1420f66 changelog.txt aktualisiert 2026-03-19 06:44:35 +01:00
f25e30c489 Release 26.3.6-unstable 2026-03-19 05:32:02 +00:00
0eb32453d6 changelog.txt aktualisiert 2026-03-19 06:31:36 +01:00
841b529ad8 Release 26.3.5-unstable 2026-03-19 05:30:39 +00:00
ae5725cd03 Release 26.3.4-unstable 2026-03-19 05:23:20 +00:00
ac5a3993c8 Release 26.3.3-unstable 2026-03-18 14:34:41 +00:00
22b45fe19a Release 26.3.2-unstable 2026-03-18 14:18:14 +00:00
Michael Otto
24c2137dc2 Auf neuen Release Workflow umgestellt 2026-03-18 15:13:37 +01:00
Michael Otto
c0c0387b1d Änderungen an plugin.cfg 2026-03-13 13:58:33 +01:00
Michael Otto
663ca770a1 Schritt zu 'plugin.cfg einlesen' umbenannt und Schleife angepasst, um letzte Zeile ohne Newline zu lesen 2026-03-13 12:08:32 +01:00
Michael Otto
04319b6f7b plugin.info->plugin.cfg 2026-03-13 12:06:34 +01:00
Michael Otto
1c70d62739 plugin.info einlesen: echo eingefügt 2026-03-13 12:04:32 +01:00
Michael Otto
3971bd3408 plugin.info einlesen angepasst 2026-03-13 12:02:39 +01:00
Michael Otto
fa04fc80e3 ZIP Erstellung auf Original zurückgesetzt 2026-03-13 11:30:11 +01:00
Michael Otto
04bdfbe9d8 Fallback für ZIP_FOLDER hinzugefügt: Wenn leer, auf sn_basis setzen 2026-03-13 11:27:18 +01:00
Michael Otto
b6b791e5bd Behebung des Parsens von plugin.info: Robuste Schleife für Outputs 2026-03-13 11:24:36 +01:00
Michael Otto
82be564c29 Verbesserung des ZIP-Erstellungsprozesses: Debugging hinzugefügt und Warnungen bei /dev-Dateien durch Behandlung symbolischer Links behoben 2026-03-13 11:21:38 +01:00
Michael Otto
f42260b66c ZIP Erstellung geändert um im Runner genauer zu sehen wo es zu Problemen gekommen ist 2026-03-13 11:16:46 +01:00
Michael Otto
327c25388f metadata.txt gelöscht, changelog.txt eingefügt 2026-03-13 11:06:58 +01:00
Michael Otto
c6c9613120 Update plugin.info based on metadata.txt 2026-03-13 08:39:24 +01:00
6e1f4c615b Merge pull request 'daniel@feature/dataGrabber_anbinden' (#9) from Daniel/Plugin_SN_Basis:daniel@feature/dataGrabber_anbinden into unstable
Reviewed-on: AG_QGIS/Plugin_SN_Basis#9
2026-03-13 06:51:56 +01:00
f876218134 plugin.info hinzugefügt 2026-03-13 06:45:55 +01:00
9829ac9c81 Diensteabruf integriert 2026-03-12 16:14:02 +01:00
ae956b0046 Überarbeitung für Pufferlayer-Fachdaten laden und gpkg-speichern/laden 2026-03-11 20:56:02 +01:00
0ec24029d8 fix QGIS 4.0-Kompatibilität;
Linkpruefer-Eingabe als String normalisiert, falls Paf-Objekte übergeben werden
2026-03-11 12:38:48 +01:00
948041da52 Merge pull request 'qt_wrapper, dialog;wrapper, Pruef_ergebnis und Pruefmanager überarbeitet, so dass die Übergaben jetzt stimmen. Nutzerabfragen werden tatsächlich ausgelöst- Nutzerabfrage Datei überschreiebn... ist noch Blödsinn' (#7) from Daniel/Plugin_SN_Basis:dev into dev
Reviewed-on: AG_QGIS/Plugin_SN_Basis#7
2026-03-04 19:42:41 +01:00
439de5527a Merge pull request 'dev' (#6) from Daniel/Plugin_SN_Basis:dev into dev
Reviewed-on: https://entwicklung.vln-sn.de/AG_QGIS/Plugin_SN_Basis/pulls/6
2026-02-25 13:27:01 +01:00
20 changed files with 1755 additions and 619 deletions

View File

@@ -0,0 +1,133 @@
name: Release Plugin
run-name: "Release | ${{ github.ref_name }}"
on:
push:
tags:
- 'v*'
jobs:
release:
runs-on: alpine-latest
defaults:
run:
shell: bash
steps:
- name: Notwendige Abhängigkeiten installieren
shell: sh
run: |
apk add --no-cache bash git jq curl
- name: Code holen
run: |
REPO_URL="https://${RELEASE_TOKEN}:x-oauth-basic@${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}.git"
git clone "$REPO_URL" repo
cd repo
git checkout "$TAG"
env:
RELEASE_TOKEN: ${{ secrets.RELEASE_TOKEN }}
TAG: "${{ github.ref_name }}"
- name: Version und Kanal bestimmen
id: releaseinfo
run: |
TAG="${{ github.ref_name }}"
RAW_VERSION="${TAG#v}"
VERSION="${RAW_VERSION%%-*}"
# Channel und Suffix automatisch bestimmen anhand des Tag-Suffix
case "$RAW_VERSION" in
*-testing*|*-t|*-T)
CHANNEL="testing"
PRERELEASE="true"
SUFFIX="-testing"
;;
*-unstable*|*-u|*-U)
CHANNEL="unstable"
PRERELEASE="true"
SUFFIX="-unstable"
;;
*)
CHANNEL="stable"
PRERELEASE="false"
SUFFIX=""
;;
esac
# Output setzen
echo "version=$VERSION" >> $GITHUB_OUTPUT
echo "channel=$CHANNEL" >> $GITHUB_OUTPUT
echo "prerelease=$PRERELEASE" >> $GITHUB_OUTPUT
# Optional Debug
echo "VERSION=$VERSION"
echo "CHANNEL=$CHANNEL"
echo "PRERELEASE=$PRERELEASE"
# - name: plugin.cfg einlesen
# id: config
# run: |
# cd repo
# while read -r line || [ -n "$line" ]; do
# key="${line%%=*}"
# value="${line#*=}"
# echo "$key=$value" >> $GITHUB_OUTPUT
# done < plugin.cfg
- name: Payload erzeugen
id: payload
run: |
cd repo
NAME="${GITHUB_REPOSITORY##*/}"
GROUP="${GITHUB_REPOSITORY%%/*}"
VERSION="${{ steps.releaseinfo.outputs.version }}"
CHANNEL="${{ steps.releaseinfo.outputs.channel }}"
PRERELEASE="${{ steps.releaseinfo.outputs.prerelease }}"
ZIP_FOLDER="${{ vars.ZIP_FOLDER }}"
ZIP_FILE="${ZIP_FOLDER}.zip"
TAG="${{ github.ref_name }}"
#GIT_URL=${GITHUB_REPOSITORY}
jq -n \
--arg name "$NAME" \
--arg group "$GROUP" \
--arg version "$VERSION" \
--arg channel "$CHANNEL" \
--arg prerelease "$PRERELEASE" \
--arg zip_folder "$ZIP_FOLDER" \
--arg zip_file "$ZIP_FILE" \
--arg tag "$TAG" \
'{
name: $name,
group: $group,
version: $version,
channel: $channel,
prerelease: ($prerelease == "true"),
zip_folder: $zip_folder,
zip_file: $zip_file,
tag: $tag
}' > payload.json
cat payload.json
- name: Repository aktualisieren
run: |
NAME="${GITHUB_REPOSITORY##*/}"
TAG="${{ steps.releaseinfo.outputs.version }}"-"${{ steps.releaseinfo.outputs.channel }}"
PAYLOAD_B64=$(base64 -w0 repo/payload.json)
JSON="{\"ref\":\"hidden/workflows\",\"inputs\":{\"payload\":\"$PAYLOAD_B64\",\"name\":\"$NAME\",\"tag\":\"$TAG\"}}"
curl -X POST \
-H "Authorization: token ${{ secrets.RELEASE_TOKEN }}" \
-H "Content-Type: application/json" \
-d "$JSON" \
"https://${{ vars.RELEASE_URL }}/api/v1/repos/${OWNER}/Repository/actions/workflows/${WORKFLOW}/dispatches"
env:
RELEASE_TOKEN: ${{ secrets.RELEASE_TOKEN }}
OWNER: "AG_QGIS"
WORKFLOW: "release.yaml"

View File

@@ -1,289 +0,0 @@
name: Release Plugin
run-name: "Release | ${{ github.ref_name }}"
on:
push:
tags:
- 'v*'
jobs:
release:
runs-on: alpine-latest
defaults:
run:
shell: bash
steps:
- name: Notwendige Abhängigkeiten installieren
shell: sh
run: |
apk add --no-cache git zip curl jq rsync bash
git config --global http.sslVerify false
- name: Code holen
run: |
# Tag aus GitHub Actions Kontext extrahieren
TAG="${GITHUB_REF#refs/tags/}"
# Repo-URL dynamisch aus vars und github.repository bauen
REPO_URL="https://${RELEASE_TOKEN}:x-oauth-basic@${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}.git"
# Repository klonen
git clone "$REPO_URL" repo
cd repo
git checkout "$TAG"
env:
RELEASE_TOKEN: ${{ secrets.RELEASE_TOKEN }}
- name: Version und Kanal bestimmen
id: releaseinfo
run: |
TAG="${{ github.ref_name }}"
VERSION="${TAG#v}"
case "$TAG" in
*-unstable*)
CHANNEL="unstable"
DRAFT="false"
PRERELEASE="true"
;;
*-testing*)
CHANNEL="testing"
DRAFT="false"
PRERELEASE="true"
;;
*)
CHANNEL="stable"
DRAFT="false"
PRERELEASE="false"
;;
esac
echo "version=$VERSION" >> $GITHUB_OUTPUT
echo "channel=$CHANNEL" >> $GITHUB_OUTPUT
echo "draft=$DRAFT" >> $GITHUB_OUTPUT
echo "prerelease=$PRERELEASE" >> $GITHUB_OUTPUT
- name: plugin.info einlesen
id: info
run: |
cd repo
while IFS='=' read -r key value; do
echo "$key=$value" >> $GITHUB_OUTPUT
done < plugin.info
- name: Changelog einlesen
id: changelog
run: |
cd repo
# Aktueller Block = alles vor dem ersten ---
CURRENT=$(awk '/^---/{exit} {print}' changelog.txt)
# Vollständige Historie = alles nach dem ersten ---
HISTORY=$(awk 'found{print} /^---/{found=1}' changelog.txt)
# Gitea Release Body zusammenbauen
VERSION="${{ steps.releaseinfo.outputs.version }}"
FULL=$(printf "## %s\n%s\n\n%s" "$VERSION" "$CURRENT" "$HISTORY")
echo "DEBUG | Aktueller Changelog:"
echo "$CURRENT"
# Für GITHUB_OUTPUT: Multiline via EOF-Marker
echo "current<<EOF" >> $GITHUB_OUTPUT
echo "$CURRENT" >> $GITHUB_OUTPUT
echo "EOF" >> $GITHUB_OUTPUT
echo "full<<EOF" >> $GITHUB_OUTPUT
echo "$FULL" >> $GITHUB_OUTPUT
echo "EOF" >> $GITHUB_OUTPUT
- name: metadata.txt erzeugen
run: |
cd repo
# ------------------------ GEÄNDERT ------------------------
# Temporär die Vorlage aus dem hidden/templates Branch holen
git fetch origin hidden/templates
git checkout origin/hidden/templates -- metadata.template
TEMPLATE="metadata.template"
# -----------------------------------------------------------
# TEMPLATE="templates/metadata.template"
OUT="metadata.txt"
CONTENT=$(cat "$TEMPLATE")
CONTENT="${CONTENT//\{\{NAME\}\}/${{ steps.info.outputs.name }}}"
CONTENT="${CONTENT//\{\{QGIS_MIN\}\}/${{ steps.info.outputs.qgisMinimumVersion }}}"
CONTENT="${CONTENT//\{\{QGIS_MAX\}\}/${{ steps.info.outputs.qgisMaximumVersion }}}"
CONTENT="${CONTENT//\{\{DESCRIPTION\}\}/${{ steps.info.outputs.description }}}"
CONTENT="${CONTENT//\{\{VERSION\}\}/${{ steps.releaseinfo.outputs.version }}}"
CONTENT="${CONTENT//\{\{AUTHOR\}\}/${{ steps.info.outputs.author }}}"
CONTENT="${CONTENT//\{\{EMAIL\}\}/${{ steps.info.outputs.email }}}"
CONTENT="${CONTENT//\{\{HOMEPAGE\}\}/${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}}"
CONTENT="${CONTENT//\{\{TRACKER\}\}/${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}}"
CONTENT="${CONTENT//\{\{REPOSITORY\}\}/${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}}"
CONTENT="${CONTENT//\{\{EXPERIMENTAL\}\}/${{ steps.info.outputs.experimental }}}"
CONTENT="${CONTENT//\{\{DEPRECATED\}\}/${{ steps.info.outputs.deprecated }}}"
CONTENT="${CONTENT//\{\{QT6\}\}/${{ steps.info.outputs.supportsQt6 }}}"
printf "%s\n" "$CONTENT" > "$OUT"
rm $TEMPLATE
- name: ZIP-Datei erstellen
id: zip
run: |
cd repo
ZIP_FOLDER="${{ steps.info.outputs.zip_folder }}"
ZIP_FILE="${ZIP_FOLDER}.zip"
VERSION="${{ steps.releaseinfo.outputs.version }}"
REPO_NAME="${GITHUB_REPOSITORY##*/}"
#ZIP_NAME="${REPO_NAME}-${VERSION}.zip"
mkdir -p dist/${ZIP_FOLDER}
rsync -a \
--exclude='.git' \
--exclude='.gitea' \
--exclude='.plugin' \
--exclude='dist' \
./ dist/${ZIP_FOLDER}/
cd dist
zip -r "${ZIP_FILE}" "${ZIP_FOLDER}/" \
-x "*.pyc" -x "*/__pycache__/*"
cd ..
echo "zip_file=${ZIP_FILE}" >> $GITHUB_OUTPUT
- name: Gitea-Release erstellen
id: create_release
run: |
TAG="${{ github.ref_name }}"
VERSION="${{ steps.releaseinfo.outputs.version }}"
CHANNEL="${{ steps.releaseinfo.outputs.channel }}"
API_URL="https://${{ vars.RELEASE_URL }}/api/v1/repos/${GITHUB_REPOSITORY}/releases"
JSON=$(jq -n \
--arg tag "$TAG" \
--arg name "Version $VERSION" \
--arg body "${{ steps.changelog.outputs.current }}" \
--argjson draft "${{ steps.releaseinfo.outputs.draft }}" \
--argjson prerelease "${{ steps.releaseinfo.outputs.prerelease }}" \
'{tag_name: $tag, name: $name, body: $body, draft: $draft, prerelease: $prerelease}')
API_RESPONSE=$(curl -s -X POST "$API_URL" \
-H "accept: application/json" \
-H "Authorization: token ${{ secrets.RELEASE_TOKEN }}" \
-H "Content-Type: application/json" \
-d "$JSON")
RELEASE_ID=$(echo "$API_RESPONSE" | jq -r '.id')
if [ "$RELEASE_ID" = "null" ] || [ -z "$RELEASE_ID" ]; then
echo "Fehler beim Erstellen des Releases!"
echo "$API_RESPONSE"
exit 1
fi
echo "release_id=$RELEASE_ID" >> $GITHUB_OUTPUT
- name: ZIP-Datei hochladen
run: |
RELEASE_ID="${{ steps.create_release.outputs.release_id }}"
ZIP_FILE="${{ steps.zip.outputs.zip_file }}"
API_URL="https://${{ vars.RELEASE_URL }}/api/v1/repos/${GITHUB_REPOSITORY}/releases/${RELEASE_ID}/assets?name=${ZIP_FILE}"
curl -s -X POST "$API_URL" \
-H "Authorization: token ${{ secrets.RELEASE_TOKEN }}" \
-H "Content-Type: application/zip" \
--data-binary "@repo/dist/${ZIP_FILE}" \
-o upload_response.json
# Optional: Fehlerprüfung
if jq -e '.id' upload_response.json >/dev/null 2>&1; then
echo "ZIP erfolgreich hochgeladen."
else
echo "Fehler beim Hochladen der ZIP!"
exit 1
fi
- name: Payload erzeugen
run: |
cd repo
VERSION="${{ steps.releaseinfo.outputs.version }}"
CHANNEL="${{ steps.releaseinfo.outputs.channel }}"
ZIP_FILE="${{ steps.zip.outputs.zip_file }}"
DOWNLOAD_URL="https://${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}/releases/download/${{ github.ref_name }}/${ZIP_FILE}"
jq -n \
--arg name "${{ steps.info.outputs.name }}" \
--arg version "$VERSION" \
--arg channel "$CHANNEL" \
--arg description "${{ steps.info.outputs.description }}" \
--arg author "${{ steps.info.outputs.author }}" \
--arg email "${{ steps.info.outputs.email }}" \
--arg qgis_min "${{ steps.info.outputs.qgisMinimumVersion }}" \
--arg qgis_max "${{ steps.info.outputs.qgisMaximumVersion }}" \
--arg homepage "${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}" \
--arg tracker "${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}" \
--arg repository "${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}" \
--arg experimental "${{ steps.info.outputs.experimental }}" \
--arg deprecated "${{ steps.info.outputs.deprecated }}" \
--arg qt6 "${{ steps.info.outputs.supportsQt6 }}" \
--arg id "${{ steps.info.outputs.zip_folder }}" \
--arg url "$DOWNLOAD_URL" \
--arg changelog "${{ steps.changelog.outputs.current }}" \
'{
name: $name,
version: $version,
channel: $channel,
description: $description,
author: $author,
email: $email,
qgis_min: $qgis_min,
qgis_max: $qgis_max,
homepage: $homepage,
tracker: $tracker,
repository: $repository,
experimental: $experimental,
deprecated: $deprecated,
qt6: $qt6,
id: $id,
url: $url,
changelog: $changelog
}' > payload.json
- name: Repository aktualisieren
run: |
OWNER="AG_QGIS"
WORKFLOW="update.yml"
PAYLOAD_B64=$(base64 -w0 repo/payload.json)
FULL_NAME="${{ steps.info.outputs.name }}"
NAME=$(echo "$FULL_NAME" | awk -F'|' '{gsub(/^ +| +$/,"",$2); print $2}')
TAG="${{ steps.releaseinfo.outputs.version }}"
JSON="{\"ref\":\"hidden/workflows\",\"inputs\":{\"payload\":\"$PAYLOAD_B64\",\"name\":\"$NAME\",\"tag\":\"$TAG\"}}"
#JSON="{\"ref\":\"hidden/workflows\",\"inputs\":{\"payload\":\"$PAYLOAD_B64\"}}"
echo "DEBUG | Sende JSON:"
echo "$JSON"
curl -X POST \
-H "Authorization: token ${{ secrets.RELEASE_TOKEN }}" \
-H "Content-Type: application/json" \
-d "$JSON" \
"https://${{ vars.RELEASE_URL }}/api/v1/repos/${OWNER}/Repository/actions/workflows/${WORKFLOW}/dispatches"

0
changelog.txt Normal file
View File

View File

@@ -5,7 +5,7 @@ from typing import Any
from typing import Literal, Optional
from sn_basis.functions.qt_wrapper import (
QMessageBox, YES, NO, CANCEL, QT_VERSION, exec_dialog, ICON_QUESTION,
QProgressDialog, QCoreApplication, Qt, QInputDialog, QLineEdit,
)
def ask_yes_no(
@@ -39,6 +39,50 @@ def ask_yes_no(
return default
def show_info_dialog(title: str, message: str, parent: Any = None) -> None:
"""
Zeigt einen modalen Info-Dialog mit OK-Button.
Blockiert bis der Nutzer bestätigt.
"""
try:
if QT_VERSION == 0: # Mock-Modus
print(f"Mock-Modus: show_info_dialog('{title}')")
return
QMessageBox.information(parent, title, message)
except Exception as e:
print(f"⚠️ show_info_dialog Fehler: {e}")
def ask_text(
title: str,
label: str,
default_text: str = "",
parent: Any = None,
) -> tuple[str, bool]:
"""Zeigt einen modalen Texteingabe-Dialog und gibt Text + OK-Status zurück."""
try:
if QT_VERSION == 0: # Mock-Modus
print(f"Mock-Modus: ask_text('{title}') -> '{default_text}'")
return default_text, True
# PyQt6: QLineEdit.EchoMode.Normal / PyQt5: QLineEdit.Normal
echo_mode = (
getattr(QLineEdit, "Normal", None)
or getattr(getattr(QLineEdit, "EchoMode", None), "Normal", None)
or 0
)
text, accepted = QInputDialog.getText(
parent,
title,
label,
echo_mode,
default_text,
)
return str(text or ""), bool(accepted)
except Exception as e:
print(f"⚠️ ask_text Fehler: {e}")
return default_text, False
OverwriteDecision = Optional[Literal["overwrite", "append", "cancel"]]
@@ -82,3 +126,101 @@ def ask_overwrite_append_cancel_custom(
return "append"
else: # cancel_btn
return "cancel"
class ProgressDialog:
def __init__(self, total: int, title: str = "Fortschritt", label: str = "Verarbeite..."):
self.total = max(total, 1)
self._canceled = False
if QT_VERSION == 0:
self.value = 0
self.label = label
self.title = title
return
self._dlg = QProgressDialog(label, "Abbrechen", 0, self.total)
self._dlg.setWindowTitle(title)
# Qt5 vs Qt6: WindowModality-Enum unterschiedlich verfügbar
modality = None
if hasattr(Qt, "WindowModality"):
try:
modality = Qt.WindowModality.WindowModal
except Exception:
modality = None
if modality is None and hasattr(Qt, "WindowModal"):
modality = Qt.WindowModal
if modality is not None:
try:
self._dlg.setWindowModality(modality)
except Exception:
pass
self._dlg.setMinimumDuration(0)
self._dlg.setAutoClose(False)
self._dlg.setAutoReset(False)
self._dlg.setValue(0)
def on_cancel():
if self._dlg and self._dlg.value() >= self.total:
# OK-Button am Ende
self._dlg.close()
return
self._canceled = True
self._dlg.close()
try:
self._dlg.canceled.connect(on_cancel)
except Exception:
pass
def set_total(self, total: int) -> None:
self.total = max(total, 1)
if QT_VERSION == 0:
return
if self._dlg is not None:
self._dlg.setMaximum(self.total)
def set_value(self, value: int) -> None:
if QT_VERSION == 0:
self.value = value
return
if self._dlg is not None:
self._dlg.setValue(min(value, self.total))
if value >= self.total:
self._dlg.setLabelText("Fertig. Klicken Sie auf OK, um das Fenster zu schließen.")
self._dlg.setCancelButtonText("OK")
QCoreApplication.processEvents()
def set_label(self, text: str) -> None:
if QT_VERSION == 0:
self.label = text
return
if self._dlg is not None:
self._dlg.setLabelText(text)
QCoreApplication.processEvents()
def is_canceled(self) -> bool:
if QT_VERSION == 0:
return self._canceled
if self._dlg is not None:
return self._canceled or self._dlg.wasCanceled()
return self._canceled
def close(self) -> None:
if QT_VERSION == 0:
return
if self._dlg is not None:
self._dlg.close()
def create_progress_dialog(total: int, title: str = "Fortschritt", label: str = "Verarbeite...") -> ProgressDialog:
return ProgressDialog(total, title, label)

View File

@@ -57,6 +57,22 @@ def get_home_dir() -> Path:
return Path.home()
def is_absolute_path(path: _PathLike) -> bool:
"""Prüft, ob ein Pfad absolut ist."""
try:
return Path(path).is_absolute()
except Exception:
return False
def basename(path: _PathLike) -> str:
"""Gibt den finalen Namen des Pfades zurück (Dateiname oder Ordner)."""
try:
return Path(path).name
except Exception:
return ""
# ---------------------------------------------------------
# Dateisystem-Eigenschaften
# ---------------------------------------------------------
@@ -75,3 +91,11 @@ def is_case_sensitive_fs() -> bool:
# Linux praktisch immer case-sensitiv
return True
def path_suffix(path: _PathLike) -> str:
"""Gibt die Dateiendung eines Pfades zurück (inklusive Punkt)."""
try:
return Path(path).suffix
except Exception:
return ""

View File

@@ -20,6 +20,19 @@ QgsNetworkAccessManager: Type[Any]
Qgis: Type[Any]
QgsMapLayerProxyModel: Type[Any]
QgsVectorFileWriter: Type[Any] # neu: Schreib-API
QgsFeature: Type[Any]
QgsField: Type[Any]
QgsGeometry: Type[Any]
QgsFeatureRequest: Type[Any]
QgsCoordinateTransform: Type[Any]
QgsCoordinateReferenceSystem: Type[Any]
QgsPrintLayout: Type[Any]
QgsLayoutItemMap: Type[Any]
QgsLayoutItemLabel: Type[Any]
QgsLayoutPoint: Type[Any]
QgsLayoutSize: Type[Any]
QgsUnitTypes: Type[Any]
QgsLayoutItem: Type[Any]
QGIS_AVAILABLE = False
@@ -36,9 +49,19 @@ try:
Qgis as _Qgis,
QgsMapLayerProxyModel as _QgsMaplLayerProxyModel,
QgsVectorFileWriter as _QgsVectorFileWriter,
QgsFeature as _QgsFeature,
QgsFeature as _QgsFeature,
QgsField as _QgsField,
QgsGeometry as _QgsGeometry,
QgsGeometry as _QgsGeometry,
QgsFeatureRequest as _QgsFeatureRequest,
QgsCoordinateTransform as _QgsCoordinateTransform,
QgsCoordinateReferenceSystem as _QgsCoordinateReferenceSystem,
QgsPrintLayout as _QgsPrintLayout,
QgsLayoutItemMap as _QgsLayoutItemMap,
QgsLayoutItemLabel as _QgsLayoutItemLabel,
QgsLayoutPoint as _QgsLayoutPoint,
QgsLayoutSize as _QgsLayoutSize,
QgsUnitTypes as _QgsUnitTypes,
QgsLayoutItem as _QgsLayoutItem,
)
QgsProject = _QgsProject
@@ -50,7 +73,17 @@ try:
QgsVectorFileWriter = _QgsVectorFileWriter
QgsFeature = _QgsFeature
QgsField = _QgsField
QgsGeometry = _QgsGeometry
QgsGeometry = _QgsGeometry
QgsFeatureRequest = _QgsFeatureRequest
QgsCoordinateTransform = _QgsCoordinateTransform
QgsCoordinateReferenceSystem = _QgsCoordinateReferenceSystem
QgsPrintLayout = _QgsPrintLayout
QgsLayoutItemMap = _QgsLayoutItemMap
QgsLayoutItemLabel = _QgsLayoutItemLabel
QgsLayoutPoint = _QgsLayoutPoint
QgsLayoutSize = _QgsLayoutSize
QgsUnitTypes = _QgsUnitTypes
QgsLayoutItem = _QgsLayoutItem
QGIS_AVAILABLE = True
@@ -61,9 +94,17 @@ try:
except Exception:
QGIS_AVAILABLE = False
class _MockLayoutManager:
def layoutByName(self, name: str):
return None
def addLayout(self, layout: Any) -> bool:
return True
class _MockQgsProject:
def __init__(self):
self._variables = {}
self._layout_manager = _MockLayoutManager()
@staticmethod
def instance() -> "_MockQgsProject":
@@ -72,6 +113,9 @@ except Exception:
def read(self) -> bool:
return True
def layoutManager(self):
return self._layout_manager
QgsProject = _MockQgsProject
class _MockQgsVectorLayer:
@@ -122,6 +166,134 @@ except Exception:
QgsRasterLayer = _MockQgsRasterLayer
class _MockQgsPrintLayout:
def __init__(self, project: Any):
self.project = project
self._name = ""
self._page = _MockQgsLayoutPage()
def initializeDefaults(self) -> None:
pass
def setName(self, name: str) -> None:
self._name = name
def pageCollection(self):
return self
def page(self, index: int):
return self._page
def addLayoutItem(self, item: Any) -> None:
pass
class _MockQgsLayoutPage:
def setPageSize(self, size: Any) -> None:
self.size = size
class _MockQgsLayoutItem:
class ReferencePoint:
LowerLeft = 0
class _MockQgsLayoutItemMap:
def __init__(self, layout: Any):
self.layout = layout
def setId(self, item_id: str) -> None:
pass
def setExtent(self, extent: Any) -> None:
pass
def setScale(self, scale: float) -> None:
pass
def attemptMove(self, point: Any) -> None:
pass
def attemptResize(self, size: Any) -> None:
pass
def setFollowVisibilityPreset(self, active: bool) -> None:
pass
def setFollowVisibilityPresetName(self, name: str) -> None:
pass
class _MockQgsLayoutItemLabel:
ModeHtml = 1
def __init__(self, layout: Any):
self.layout = layout
def setId(self, item_id: str) -> None:
pass
def setText(self, text: str) -> None:
pass
def setMode(self, mode: Any) -> None:
pass
def setFont(self, font: Any) -> None:
pass
def setReferencePoint(self, point: Any) -> None:
pass
def attemptMove(self, point: Any) -> None:
pass
def attemptResize(self, size: Any) -> None:
pass
class _MockQgsLayoutPoint:
def __init__(self, x: float, y: float, unit: Any):
self.x = x
self.y = y
self.unit = unit
class _MockQgsLayoutSize:
def __init__(self, width: float, height: float, unit: Any):
self.width = width
self.height = height
self.unit = unit
class _MockQgsUnitTypes:
LayoutMillimeters = 0
QgsPrintLayout = _MockQgsPrintLayout
QgsLayoutItemMap = _MockQgsLayoutItemMap
QgsLayoutItemLabel = _MockQgsLayoutItemLabel
QgsLayoutPoint = _MockQgsLayoutPoint
QgsLayoutSize = _MockQgsLayoutSize
QgsUnitTypes = _MockQgsUnitTypes
QgsLayoutItem = _MockQgsLayoutItem
class _MockQgsFeatureRequest:
def __init__(self):
self._filter_rect = None
def setFilterRect(self, rect):
self._filter_rect = rect
return self
QgsFeatureRequest = _MockQgsFeatureRequest
class _MockQgsCoordinateTransform:
def __init__(self, *args, **kwargs):
pass
def transformBoundingBox(self, rect):
return rect
class _MockQgsCoordinateReferenceSystem:
def __init__(self, *args, **kwargs):
pass
QgsCoordinateTransform = _MockQgsCoordinateTransform
QgsCoordinateReferenceSystem = _MockQgsCoordinateReferenceSystem
QgsNetworkAccessManager = _MockQgsNetworkAccessManager
class _MockQgis:

View File

@@ -76,6 +76,9 @@ except Exception:
def removeToolBar(self, *args, **kwargs):
pass
def openLayoutDesigner(self, layout):
return layout
iface = _MockIface()
class _MockQgsFileWidget:
@@ -132,6 +135,13 @@ def get_main_window():
return None
def open_layout_designer(layout: Any) -> Any:
try:
return iface.openLayoutDesigner(layout)
except Exception:
return None
# ---------------------------------------------------------
# Dock-Handling
# ---------------------------------------------------------

View File

@@ -10,13 +10,17 @@ YES: Optional[Any] = None
NO: Optional[Any] = None
CANCEL: Optional[Any] = None
ICON_QUESTION: Optional[Any] = None
QVariant: Type[Any] = object
# Qt-Klassen (werden dynamisch gesetzt)
QDockWidget: Type[Any] = object
QMessageBox: Type[Any] = object
QFileDialog: Type[Any] = object
QProgressDialog: Type[Any] = object
QEventLoop: Type[Any] = object
QTimer: Type[Any] = object
QUrl: Type[Any] = object
QNetworkRequest: Type[Any] = object
QNetworkReply: Type[Any] = object
@@ -25,6 +29,7 @@ QWidget: Type[Any] = object
QGridLayout: Type[Any] = object
QLabel: Type[Any] = object
QLineEdit: Type[Any] = object
QInputDialog: Type[Any] = object
QGroupBox: Type[Any] = object
QVBoxLayout: Type[Any] = object
QPushButton: Type[Any] = object
@@ -37,7 +42,9 @@ QToolButton: Type[Any] = object
QSizePolicy: Type[Any] = object
Qt: Type[Any] = object
QComboBox: Type[Any] = object
QCheckBox: Type[Any] = object
QHBoxLayout: Type[Any] = object
QFont: Type[Any] = object
def exec_dialog(dialog: Any) -> Any:
@@ -64,10 +71,12 @@ try:
from qgis.PyQt.QtWidgets import (
QMessageBox as _QMessageBox,
QFileDialog as _QFileDialog,
QProgressDialog as _QProgressDialog,
QWidget as _QWidget,
QGridLayout as _QGridLayout,
QLabel as _QLabel,
QLineEdit as _QLineEdit,
QInputDialog as _QInputDialog,
QGroupBox as _QGroupBox,
QVBoxLayout as _QVBoxLayout,
QPushButton as _QPushButton,
@@ -80,10 +89,13 @@ try:
QToolButton as _QToolButton,
QSizePolicy as _QSizePolicy,
QComboBox as _QComboBox,
QCheckBox as _QCheckBox,
QHBoxLayout as _QHBoxLayout,
)
from qgis.PyQt.QtGui import QFont as _QFont
from qgis.PyQt.QtCore import (
QEventLoop as _QEventLoop,
QTimer as _QTimer,
QUrl as _QUrl,
QCoreApplication as _QCoreApplication,
Qt as _Qt,
@@ -98,7 +110,10 @@ try:
QT_VERSION = 6
QMessageBox = _QMessageBox
QFileDialog = _QFileDialog
QProgressDialog = _QProgressDialog
QProgressDialog = _QProgressDialog
QEventLoop = _QEventLoop
QTimer = _QTimer
QUrl = _QUrl
QNetworkRequest = _QNetworkRequest
QNetworkReply = _QNetworkReply
@@ -109,6 +124,7 @@ try:
QGridLayout = _QGridLayout
QLabel = _QLabel
QLineEdit = _QLineEdit
QInputDialog = _QInputDialog
QGroupBox = _QGroupBox
QVBoxLayout = _QVBoxLayout
QPushButton = _QPushButton
@@ -120,8 +136,10 @@ try:
QToolButton = _QToolButton
QSizePolicy = _QSizePolicy
QComboBox = _QComboBox
QCheckBox = _QCheckBox
QVariant = _QVariant
QHBoxLayout= _QHBoxLayout
QHBoxLayout = _QHBoxLayout
QFont = _QFont
# ✅ QT6 ENUMS
YES = QMessageBox.StandardButton.Yes
NO = QMessageBox.StandardButton.No
@@ -158,6 +176,7 @@ except (ImportError, AttributeError):
QGridLayout as _QGridLayout,
QLabel as _QLabel,
QLineEdit as _QLineEdit,
QInputDialog as _QInputDialog,
QGroupBox as _QGroupBox,
QVBoxLayout as _QVBoxLayout,
QPushButton as _QPushButton,
@@ -170,10 +189,13 @@ except (ImportError, AttributeError):
QToolButton as _QToolButton,
QSizePolicy as _QSizePolicy,
QComboBox as _QComboBox,
QCheckBox as _QCheckBox,
QHBoxLayout as _QHBoxLayout,
)
from PyQt5.QtGui import QFont as _QFont
from PyQt5.QtCore import (
QEventLoop as _QEventLoop,
QTimer as _QTimer,
QUrl as _QUrl,
QCoreApplication as _QCoreApplication,
Qt as _Qt,
@@ -189,6 +211,7 @@ except (ImportError, AttributeError):
QMessageBox = _QMessageBox
QFileDialog = _QFileDialog
QEventLoop = _QEventLoop
QTimer = _QTimer
QUrl = _QUrl
QNetworkRequest = _QNetworkRequest
QNetworkReply = _QNetworkReply
@@ -199,6 +222,7 @@ except (ImportError, AttributeError):
QGridLayout = _QGridLayout
QLabel = _QLabel
QLineEdit = _QLineEdit
QInputDialog = _QInputDialog
QGroupBox = _QGroupBox
QVBoxLayout = _QVBoxLayout
QPushButton = _QPushButton
@@ -210,8 +234,10 @@ except (ImportError, AttributeError):
QToolButton = _QToolButton
QSizePolicy = _QSizePolicy
QComboBox = _QComboBox
QCheckBox = _QCheckBox
QVariant = _QVariant
QHBoxLayout = _QHBoxLayout
QHBoxLayout= _QHBoxLayout
QFont = _QFont
# ✅ PYQT5 ENUMS
YES = QMessageBox.Yes
@@ -283,12 +309,30 @@ except (ImportError, AttributeError):
QFileDialog = _MockQFileDialog
class _MockQInputDialog:
@staticmethod
def getText(parent, title, label, mode=None, text=""):
return text, True
QInputDialog = _MockQInputDialog
class _MockQEventLoop:
def exec(self) -> int: return 0
def quit(self) -> None: pass
QEventLoop = _MockQEventLoop
class _MockQTimer:
def __init__(self, *args, **kwargs):
self.timeout = type('Signal', (), {
'connect': lambda s, cb: None,
})()
def setSingleShot(self, v: bool) -> None: pass
def start(self, ms: int) -> None: pass
def stop(self) -> None: pass
QTimer = _MockQTimer
class _MockQUrl(str):
def isValid(self) -> bool: return True
@@ -319,11 +363,18 @@ except (ImportError, AttributeError):
class _MockLabel:
def __init__(self, text: str = ""): self._text = text
class _MockLineEdit:
Normal = 0
def __init__(self, *args, **kwargs): self._text = ""
def text(self) -> str: return self._text
def setText(self, value: str) -> None: self._text = value
class _MockButton:
class _MockFont:
def __init__(self, family: str = "", pointSize: int = 10):
self.family = family
self.pointSize = pointSize
class _MockButton:
def __init__(self, *args, **kwargs): self.clicked = lambda *a, **k: None
QWidget = _MockWidget
@@ -333,6 +384,7 @@ except (ImportError, AttributeError):
QGroupBox = _MockWidget
QVBoxLayout = _MockLayout
QPushButton = _MockButton
QFont = _MockFont
QCoreApplication = object()
class _MockQt:
@@ -507,6 +559,22 @@ except (ImportError, AttributeError):
def setContentsMargins(self, *args, **kwargs):
pass
QHBoxLayout = _MockQHBoxLayout
class _MockQCheckBox:
def __init__(self, text: str = "", *args, **kwargs):
self._text = text
self._checked = False
def setText(self, text: str) -> None:
self._text = text
def isChecked(self) -> bool:
return self._checked
def setChecked(self, checked: bool) -> None:
self._checked = checked
QCheckBox = _MockQCheckBox
def exec_dialog(dialog: Any) -> Any:
return YES
# --------------------------- TEST ---------------------------

View File

@@ -6,6 +6,7 @@ from pathlib import Path
from typing import Union
import sys
from sn_basis.functions.os_wrapper import is_absolute_path, basename
_PathLike = Union[str, Path]

14
metadata.txt Normal file
View File

@@ -0,0 +1,14 @@
[general]
name=LNO Sachsen | Plugin Basisfunktionen
qgisMinimumVersion=3.40
qgisMaximumVersion=3.99
description=Plugin mit Basisfunktionen
version=26.3.6-unstable
author=Daniel Helbig
email=daniel.helbig@kreis-meissen.de
homepage=https://entwicklung.flurneuordnung-sachsen.de/AG_QGIS/Plugin_SN_Basis
tracker=https://entwicklung.flurneuordnung-sachsen.de/AG_QGIS/Plugin_SN_Basis/issues
repository=https://entwicklung.flurneuordnung-sachsen.de/AG_QGIS/Plugin_SN_Basis/src/branch/unstable/
experimental=true
deprecated=false
supportsQt6=true

View File

@@ -2,14 +2,14 @@
DataGrabber module
==================
UIfreier Orchestrator für die Prüfung und Klassifikation von Datenquellen.
UI-freier Orchestrator für die Prüfung und Klassifikation von Datenquellen.
Der DataGrabber:
- klassifiziert die übergebene Quelle (Datei, Dienst, Datenbank, Excel),
- ruft passende Prüfer (Dateipruefer, Linkpruefer, Layerpruefer, Stilpruefer) auf,
- sammelt alle rohen ``pruef_ergebnis``Objekte,
- sammelt alle rohen ``pruef_ergebnis``-Objekte,
- aggregiert diese zu einem zusammenfassenden Ergebnis,
- **löst selbst keinerlei UIInteraktion aus**.
- **löst selbst keinerlei UI-Interaktion aus**.
Alle Nutzerinteraktionen (MessageBar, QMessageBox, Logging) erfolgen
ausschließlich über den ``Pruefmanager`` im aufrufenden Kontext (UI / Pipeline).
@@ -17,8 +17,11 @@ ausschließlich über den ``Pruefmanager`` im aufrufenden Kontext (UI / Pipeline
from __future__ import annotations
import re
from typing import Any, Dict, List, Mapping, Optional, Tuple, Literal
from sn_basis.functions.os_wrapper import basename, path_suffix
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
from sn_basis.modules.Pruefmanager import Pruefmanager
@@ -27,6 +30,7 @@ from sn_basis.modules.linkpruefer import Linkpruefer
from sn_basis.modules.layerpruefer import Layerpruefer
from sn_basis.modules.stilpruefer import Stilpruefer
from sn_basis.modules.excel_importer import ExcelImporter
from sn_plan41.modules.listenauswerter import Listenauswerter
SourceType = Literal["service", "database", "excel", "unknown"]
@@ -38,9 +42,6 @@ class DataGrabber:
"""
Analysiert und prüft Datenquellen für den Fachdatenabruf.
Der DataGrabber ist **UIfrei**. Er erzeugt ausschließlich rohe
``pruef_ergebnis``Objekte und überlässt deren Verarbeitung
vollständig dem aufrufenden Code.
"""
def __init__(
@@ -55,9 +56,9 @@ class DataGrabber:
) -> None:
self.pruefmanager = pruefmanager
self._datei_pruefer_cls = datei_pruefer_cls
self.link_pruefer = link_pruefer
self.layer_pruefer = layer_pruefer
self.stil_pruefer = stil_pruefer
self.link_pruefer = link_pruefer or Linkpruefer()
self.layer_pruefer = layer_pruefer or Layerpruefer()
self.stil_pruefer = stil_pruefer or Stilpruefer()
self._excel_importer_cls = excel_importer_cls
self._source: Optional[str] = None
@@ -69,23 +70,61 @@ class DataGrabber:
"""Setzt die aktuell zu untersuchende Rohquelle."""
self._source = source
def analyze_source_type(self, source: str) -> SourceType:
"""
Klassifiziert die Quelle.
SourceType = str # "excel" | "datenbank" | "dienst" | "unbekannt"
Aktuell Platzhalter liefert ``"unknown"``.
def analyze_source_type(self, quelle: str) -> Tuple[SourceType, pruef_ergebnis]:
"""
return "unknown"
Klassifiziert die Quelle und liefert das zugehörige pruef_ergebnis.
Reihenfolge:
1. Dateipruefer (Datei + Dateityp)
2. Linkpruefer (Dienst)
"""
# --------------------------------------------------
# 1. Datei prüfen (inkl. Typ-Erkennung)
# --------------------------------------------------
dateipruefer = Dateipruefer(pfad=quelle)
datei_ergebnis = dateipruefer.pruefe()
if datei_ergebnis.ok:
suffix = path_suffix(datei_ergebnis.kontext).lower()
print(f"[DataGrabber] Debug: analyze_source_type source={quelle} -> suffix={suffix}")
if suffix == ".xlsx":
return "excel", datei_ergebnis
if suffix in (".gpkg", ".sqlite"):
return "datenbank", datei_ergebnis
return "unbekannter_dateityp", datei_ergebnis
# --------------------------------------------------
# 2. Keine Datei → Link prüfen
# --------------------------------------------------
linkpruefer = Linkpruefer()
link_ergebnis = linkpruefer.pruefe(quelle)
if link_ergebnis.ok:
return "dienst", link_ergebnis
# --------------------------------------------------
# 3. Weder Datei noch Dienst
# --------------------------------------------------
return "unbekannte_quelle", link_ergebnis
def run(self, source: str) -> Tuple[SourceDict, pruef_ergebnis]:
"""
Führt die vollständige Quellprüfung aus.
Diese Methode ist **UIfrei**. Sie gibt rohe Ergebnisse zurück,
Diese Methode ist **UIfrei**. Sie gibt rohe Ergebnisse zurück,
die vom Aufrufer über den ``Pruefmanager`` verarbeitet werden.
"""
self.set_source(source)
source_type = self.analyze_source_type(source)
source_type, source_result = self.analyze_source_type(source)
print(f"[DataGrabber] Debug: run source={source} -> source_type={source_type}")
source_dict: SourceDict = {}
partial_results: List[pruef_ergebnis] = []
@@ -97,14 +136,7 @@ class DataGrabber:
elif source_type == "service":
source_dict, partial_results = self._process_service_source(source)
else:
partial_results.append(
pruef_ergebnis(
ok=False,
meldung="Quelle konnte nicht klassifiziert werden",
aktion="kein_dateipfad",
kontext={"source": source},
)
)
partial_results.append(source_result)
summary = self._aggregate_results(source, source_dict, partial_results)
return source_dict, summary
@@ -115,9 +147,150 @@ class DataGrabber:
def _process_excel_source(
self, filepath: str
) -> Tuple[SourceDict, List[pruef_ergebnis]]:
source_dict: SourceDict = {}
source_dict: SourceDict = {"rows": []}
results: List[pruef_ergebnis] = []
return source_dict, results
rows = ExcelImporter(filepath, self.pruefmanager).import_xlsx()
print(f"[DataGrabber] Debug: Excel-Linkliste geladen: {filepath}")
print(f"[DataGrabber] Debug: raw rows count: {len(rows)}")
if rows:
first = rows[:min(5, len(rows))]
print(f"[DataGrabber] Debug: first rows: {first}")
if not rows:
return source_dict, results
required_keys = {"ident", "gruppe", "kartenebene", "inhalt", "link", "provider", "stildatei"}
def extract_url(raw_link: str, provider: str) -> str:
if not raw_link:
return ""
if not isinstance(raw_link, str):
return str(raw_link)
if provider == "wfs":
url_match = re.search(r"url\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE)
type_match = re.search(r"typename\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE)
if url_match:
url = url_match.group(1).strip()
if type_match:
typename = type_match.group(1).strip()
separator = "&" if "?" in url else "?"
return f"url={url}{separator}service=WFS&request=GetFeature&typename={typename}"
return f"url={url}"
if provider == "wms":
# falls WMS-URL als url='...' vorliegt
match = re.search(r"url\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE)
if match:
return match.group(1).strip()
if provider == "rest":
# REST/ArcGIS-Server: direkt nutzen
match = re.search(r"url\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE)
if match:
return match.group(1).strip()
# allgemeines Rückfallverhalten
match = re.search(r"url\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE)
if match:
return match.group(1).strip()
return raw_link.strip()
for row_index, raw_row in enumerate(rows, start=2):
if not isinstance(raw_row, Mapping):
pe = pruef_ergebnis(
ok=False,
meldung="Linklisten-Zeile ist nicht als Dictionary formatiert.",
aktion="ungueltige_zeile",
kontext={"zeile": row_index, "wert": raw_row},
)
results.append(self.pruefmanager.verarbeite(pe))
continue
normalized = {str(k).strip().lower(): v for k, v in raw_row.items() if k is not None}
if not required_keys.issubset(normalized.keys()):
missing = required_keys.difference(normalized.keys())
pe = pruef_ergebnis(
ok=False,
meldung=f"Linkliste fehlt erforderliche Spalten: {', '.join(sorted(missing))}",
aktion="spaltenfehlend",
kontext={"zeile": row_index, "fehlend": sorted(missing)},
)
results.append(self.pruefmanager.verarbeite(pe))
continue
ident = normalized.get("ident")
link_raw = normalized.get("link") or ""
provider = str(normalized.get("provider") or "").strip().lower()
stildatei_raw = normalized.get("stildatei") or ""
stildatei = None
if stildatei_raw and str(stildatei_raw).strip():
style_result = self.stil_pruefer.pruefe(str(stildatei_raw).strip())
results.append(self.pruefmanager.verarbeite(style_result))
if style_result.ok:
# Style-Pfad in der Datenkette beibehalten (absolut, wenn vorhanden).
stildatei = str(style_result.kontext or stildatei_raw).strip()
else:
stildatei = None
else:
results.append(self.pruefmanager.verarbeite(pruef_ergebnis(ok=True, meldung="Kein Stil angegeben", aktion="stil_optional", kontext=None)))
stildatei = None
if not ident or not link_raw or not provider:
pe = pruef_ergebnis(
ok=False,
meldung="Linklisten-Zeile hat fehlende Pflichtfelder (ident/link/provider).",
aktion="pflichtfelder_fehlen",
kontext={"zeile": row_index, "daten": raw_row},
)
results.append(self.pruefmanager.verarbeite(pe))
continue
link_url = extract_url(link_raw, provider)
# Provider-abhängige Linkvalidierung
if provider in ("wfs", "wms", "rest"):
# Webdienste: wir akzeptieren die URL-Form und prüfen nicht per network_head.
link_result = pruef_ergebnis(ok=True, meldung="Service-Link angenommen", aktion="service_link", kontext=link_url)
elif provider in ("ogr", "gpkg", "shp", "geojson"):
# OGR/Pfad: mit Linkpruefer (pfad oder lokale Datei) prüfen
link_result = self.link_pruefer.pruefe(link_url)
else:
link_result = self.link_pruefer.pruefe(link_url)
results.append(self.pruefmanager.verarbeite(link_result))
# stildatei wurde bereits oben geprüft und ggf. auf Dateiname gesetzt oder auf None
if not link_result.ok:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Zeile {row_index}: fehlerhafter Link",
aktion="link_unvollstaendig",
kontext={"row": row_index, "ident": ident},
)
)
continue
result_row = {
"ident": ident,
"gruppe": normalized.get("gruppe"),
"Kartenebene": normalized.get("kartenebene"),
"Inhalt": normalized.get("inhalt"),
"Link": link_url,
"Provider": provider,
"stildatei": stildatei,
}
source_dict["rows"].append(result_row)
# Validierung über Listenauswerter
listenauswerter = Listenauswerter(self.pruefmanager, self.stil_pruefer or Stilpruefer())
validated, validation_results = listenauswerter.validate_rows(source_dict)
results.extend(validation_results)
return validated, results
# ------------------------------------------------------------------
# DatenbankQuellen
@@ -125,6 +298,7 @@ class DataGrabber:
def _process_database_source(
self, db_path: str
) -> Tuple[SourceDict, List[pruef_ergebnis]]:
print(f"[DataGrabber] Debug: _process_database_source called, db_path={db_path}")
source_dict: SourceDict = {}
results: List[pruef_ergebnis] = []
return source_dict, results
@@ -149,24 +323,29 @@ class DataGrabber:
partial_results: List[pruef_ergebnis],
) -> pruef_ergebnis:
"""
Aggregiert Einzelprüfungen zu einem Gesamt``pruef_ergebnis``.
Aggregiert Einzelprüfungen zu einem Gesamt-``pruef_ergebnis``.
**Keine UIInteraktion.**
**Keine UI-Interaktion.**
"""
if source_dict:
rows = source_dict.get("rows") if isinstance(source_dict, dict) else None
if rows:
return pruef_ergebnis(
ok=True,
meldung="Quelle erfolgreich geprüft",
aktion="ok",
kontext={
"source": source,
"valid_entries": sum(len(v) for v in source_dict.values()),
"valid_entries": len(rows),
},
)
# Wenn die Linkliste zwar gelesen wurde, aber keine gültigen Zeilen verfügbar sind, geben wir spezifischere Infos zurück.
return pruef_ergebnis(
ok=False,
meldung="Keine gültigen Einträge in der Quelle gefunden",
aktion="read_error",
kontext={"source": source},
meldung="Keine validen Einträge in der Linkliste gefunden",
aktion="keine_validen_eintraege",
kontext={
"source": source,
"eintraege_gesamt": len(source_dict.get("rows", [])),
},
)

View File

@@ -6,11 +6,11 @@ der Anforderungen 1-2.e (leerer Pfad, fehlende Datei, bestehende Datei).
"""
from pathlib import Path
from typing import Optional
from typing import Optional, Literal
from sn_basis.functions.sys_wrapper import join_path, file_exists
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis, PruefAktion
DateiTyp = Literal["excel","datenbank","unbekannt"]
class Dateipruefer:
"""
@@ -74,10 +74,25 @@ class Dateipruefer:
# ------------------------------------------------------------------
# Hilfsfunktionen
# ------------------------------------------------------------------
def erkenne_dateityp(self, pfad: Path) -> DateiTyp:
"""
Erkennt den Dateityp anhand der Endung.
"""
suffix = pfad.suffix.lower()
if suffix == ".xlsx":
return "excel"
if suffix in (".gpkg", ".sqlite"):
return "datenbank"
return "unbekannt"
def _pfad(self, relativer_pfad: str) -> Path:
"""Erzeugt OS-unabhängigen Pfad relativ zum Basisverzeichnis."""
return join_path(self.basis_pfad, relativer_pfad)
def _ist_leer(self) -> bool:
"""
Prüft robust, ob Eingabe als „leer" zu behandeln ist.
@@ -134,6 +149,31 @@ class Dateipruefer:
# 2. Pfad normalisieren
pfad = self._pfad(self.pfad.strip())
#Excel-dateien erkennen
dateityp = self.erkenne_dateityp(pfad)
if dateityp == "excel":
if not file_exists(pfad):
return pruef_ergebnis(
ok=False,
meldung=f"Excel-Datei '{self.pfad}' wurde nicht gefunden.",
aktion="datei_nicht_gefunden",
kontext=pfad,
)
return pruef_ergebnis(
ok=True,
meldung="Excel-Datei ist gültig.",
aktion="ok",
kontext=pfad,
)
if dateityp != "datenbank":
return pruef_ergebnis(
ok=False,
meldung=f"Der Pfad '{self.pfad}' ist kein unterstützter Dateityp.",
aktion="unbekannter_dateityp",
kontext=pfad,
)
# 🆕 2.c: Ungültiger GPKG-Pfad?
if not self.verfahrens_db_modus or not self._ist_gueltiger_gpkg_pfad(pfad):

View File

@@ -17,10 +17,11 @@ Designprinzipien
- Die Methode ist pdoc-kompatibel dokumentiert und bewusst einfach gehalten.
"""
from typing import Any, Dict, List, Mapping, Optional, Tuple
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple
from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse
import json
import time
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
from sn_basis.functions import qgiscore_wrapper as qgiscore
@@ -59,6 +60,7 @@ class Datenabruf:
verfahrensgebiet_layer: Any,
speicherort: str,
pruef_ergebnisse: Optional[List[Any]] = None,
progress: Optional[Any] = None,
) -> Tuple[Dict[str, Any], List[Any]]:
"""
Ruft für alle Zeilen in ``result_dict["rows"]`` die Fachdaten ab und
@@ -82,6 +84,10 @@ class Datenabruf:
# 1) Räumliche Filtergeometrie bestimmen (BBox oder None)
bbox_geom = self._determine_spatial_filter(raumfilter, verfahrensgebiet_layer)
filter_crs_authid = None
if isinstance(bbox_geom, dict):
raw_crs = bbox_geom.get("crs_authid")
filter_crs_authid = str(raw_crs) if raw_crs else None
# Globale Logs über alle Dienste hinweg
log_geladen: Dict[str, int] = {}
@@ -90,7 +96,20 @@ class Datenabruf:
log_ausserhalb: Dict[str, int] = {}
# 2) Über alle Zeilen iterieren
for row in rows:
total_rows = len(rows)
for idx, row in enumerate(rows, start=1):
if progress is not None:
progress.set_label(f"Datenabruf {idx}/{total_rows}")
if progress.is_canceled():
pe_cancel = pruef_ergebnis(
ok=False,
meldung="Datenabruf durch Benutzer abgebrochen",
aktion="abbruch",
kontext={"schritt": idx},
)
processed_results.append(self.pruefmanager.verarbeite(pe_cancel))
break
ident = row.get("ident")
link = row.get("Link")
provider = row.get("Provider")
@@ -115,7 +134,16 @@ class Datenabruf:
url = self._build_provider_url(link=link, provider=str(provider), bbox_geom=bbox_geom if use_bbox else None)
# 2b) Fachdaten abrufen
features, error_msg = self._fetch_features(url=url, provider=str(provider))
features, error_msg = self._fetch_features(
url=url,
provider=str(provider),
cancel_callback=(progress.is_canceled if progress is not None else None),
)
if progress is not None:
if hasattr(progress, "set_value"):
progress.set_value(idx)
# 2c) Logs und Aggregation
if error_msg:
@@ -207,7 +235,18 @@ class Datenabruf:
return None
if raumfilter == "Verfahrensgebiet":
return qgiscore.get_layer_extent(verfahrensgebiet_layer)
extent = qgiscore.get_layer_extent(verfahrensgebiet_layer)
if extent is None:
return None
crs_authid = None
try:
if hasattr(verfahrensgebiet_layer, "crs") and callable(getattr(verfahrensgebiet_layer, "crs")):
crs = verfahrensgebiet_layer.crs()
if crs is not None and hasattr(crs, "authid") and callable(getattr(crs, "authid")):
crs_authid = crs.authid()
except Exception:
crs_authid = None
return {"extent": extent, "crs_authid": crs_authid}
if raumfilter == "Pufferlayer":
buffer_layer = qgiscore.create_buffer_layer(
@@ -216,8 +255,18 @@ class Datenabruf:
layer_name="Verfahrensgebiet_Puffer_1km",
)
if buffer_layer is not None:
qgisui.add_layer_to_project(buffer_layer)
return qgiscore.get_layer_extent(buffer_layer)
extent = qgiscore.get_layer_extent(buffer_layer)
if extent is None:
return None
crs_authid = None
try:
if hasattr(buffer_layer, "crs") and callable(getattr(buffer_layer, "crs")):
crs = buffer_layer.crs()
if crs is not None and hasattr(crs, "authid") and callable(getattr(crs, "authid")):
crs_authid = crs.authid()
except Exception:
crs_authid = None
return {"extent": extent, "crs_authid": crs_authid}
return None
@@ -233,60 +282,130 @@ class Datenabruf:
Erwartet: provider ist gesetzt (z. B. "WFS", "REST", "OGR", "WMS").
"""
provider_norm = (provider or "").upper()
base_link = link or ""
base_link = (link or "").strip()
if base_link.lower().startswith("url="):
base_link = base_link[4:].strip()
# WMS: niemals BBOX anhängen
if provider_norm == "WFS" and base_link.count("?") > 1:
first, rest = base_link.split("?", 1)
base_link = f"{first}?{rest.replace('?', '&')}"
extent_obj = bbox_geom
crs_authid: Optional[str] = None
if isinstance(bbox_geom, dict):
extent_obj = bbox_geom.get("extent")
raw_crs = bbox_geom.get("crs_authid")
crs_authid = str(raw_crs) if raw_crs else None
# WMS: unverändert durchreichen
if provider_norm == "WMS":
return base_link
if bbox_geom is None:
return base_link
# Versuche bbox-String zu erzeugen (nutzt qgiscore.extent_to_bbox_string wenn vorhanden)
# Versuche bbox-String zu erzeugen (falls Raumfilter aktiv)
bbox_str: Optional[str] = None
try:
extent_to_bbox = getattr(__import__("sn_basis.functions.qgiscore_wrapper", fromlist=["qgiscore_wrapper"]), "extent_to_bbox_string", None)
if callable(extent_to_bbox):
bbox_str = extent_to_bbox(bbox_geom)
else:
# Fallback: einfache xmin/ymin/xmax/ymax-Extraktion (duck-typing)
if hasattr(bbox_geom, "xmin") and callable(getattr(bbox_geom, "xmin")):
bbox_str = f"{bbox_geom.xmin()},{bbox_geom.ymin()},{bbox_geom.xmax()},{bbox_geom.ymax()}"
elif isinstance(bbox_geom, (tuple, list)) and len(bbox_geom) == 4:
bbox_str = f"{bbox_geom[0]},{bbox_geom[1]},{bbox_geom[2]},{bbox_geom[3]}"
if extent_obj is not None:
try:
extent_to_bbox = getattr(__import__("sn_basis.functions.qgiscore_wrapper", fromlist=["qgiscore_wrapper"]), "extent_to_bbox_string", None)
if callable(extent_to_bbox):
bbox_str = extent_to_bbox(extent_obj)
else:
bbox_str = str(bbox_geom)
except Exception:
bbox_str = None
if not bbox_str:
return base_link
# Fallback: einfache xmin/ymin/xmax/ymax-Extraktion (duck-typing)
if hasattr(extent_obj, "xmin") and callable(getattr(extent_obj, "xmin")):
bbox_str = f"{extent_obj.xmin()},{extent_obj.ymin()},{extent_obj.xmax()},{extent_obj.ymax()}"
elif isinstance(extent_obj, (tuple, list)) and len(extent_obj) == 4:
bbox_str = f"{extent_obj[0]},{extent_obj[1]},{extent_obj[2]},{extent_obj[3]}"
else:
bbox_str = str(extent_obj)
except Exception:
bbox_str = None
parsed = urlparse(base_link)
query_params = dict(parse_qsl(parsed.query, keep_blank_values=True))
if provider_norm == "WFS":
query_params.setdefault("BBOX", bbox_str)
query_params.setdefault("service", "WFS")
query_params.setdefault("request", "GetFeature")
query_params.setdefault("outputFormat", "application/json")
if bbox_str:
query_params.setdefault("BBOX", bbox_str)
if crs_authid:
query_params.setdefault("SRSNAME", crs_authid)
new_query = urlencode(query_params, doseq=True)
rebuilt = parsed._replace(query=new_query)
return urlunparse(rebuilt)
if provider_norm in ("REST", "ARCGIS", "ARCGISFEATURESERVER", "ARCGIS_FEATURESERVER"):
query_params.setdefault("geometry", bbox_str)
query_params.setdefault("geometryType", "esriGeometryEnvelope")
query_params.setdefault("spatialRel", "esriSpatialRelIntersects")
# ArcGIS FeatureServer erwartet i.d.R. den /query-Endpunkt
rest_base = base_link.rstrip("/")
if not rest_base.lower().endswith("/query"):
rest_base = f"{rest_base}/query"
parsed_rest = urlparse(rest_base)
query_params = dict(parse_qsl(parsed_rest.query, keep_blank_values=True))
query_params.setdefault("where", "1=1")
query_params.setdefault("outFields", "*")
query_params.setdefault("returnGeometry", "true")
query_params.setdefault("f", query_params.get("f", "json"))
if bbox_str:
geometry_envelope = None
try:
if hasattr(extent_obj, "xmin") and callable(getattr(extent_obj, "xmin")):
geometry_envelope = {
"xmin": extent_obj.xmin(),
"ymin": extent_obj.ymin(),
"xmax": extent_obj.xmax(),
"ymax": extent_obj.ymax(),
}
elif isinstance(extent_obj, (tuple, list)) and len(extent_obj) == 4:
geometry_envelope = {
"xmin": extent_obj[0],
"ymin": extent_obj[1],
"xmax": extent_obj[2],
"ymax": extent_obj[3],
}
else:
parts = [p.strip() for p in str(bbox_str).split(",")]
if len(parts) == 4:
geometry_envelope = {
"xmin": float(parts[0]),
"ymin": float(parts[1]),
"xmax": float(parts[2]),
"ymax": float(parts[3]),
}
except Exception:
geometry_envelope = None
if geometry_envelope is not None:
query_params.setdefault("geometry", json.dumps(geometry_envelope))
else:
query_params.setdefault("geometry", bbox_str)
query_params.setdefault("geometryType", "esriGeometryEnvelope")
query_params.setdefault("spatialRel", "esriSpatialRelIntersects")
if crs_authid and ":" in crs_authid:
srid = crs_authid.split(":", 1)[1]
if srid.isdigit():
query_params.setdefault("inSR", srid)
query_params.setdefault("outSR", srid)
new_query = urlencode(query_params, doseq=True)
rebuilt = parsed._replace(query=new_query)
rebuilt = parsed_rest._replace(query=new_query)
return urlunparse(rebuilt)
# Default: generischer bbox-Parameter
query_params.setdefault("bbox", bbox_str)
# Default: generischer bbox-Parameter (nur wenn vorhanden)
if bbox_str:
query_params.setdefault("bbox", bbox_str)
new_query = urlencode(query_params, doseq=True)
rebuilt = parsed._replace(query=new_query)
return urlunparse(rebuilt)
def _fetch_features(self, url: str, provider: str) -> Tuple[List[Any], Optional[str]]:
def _fetch_features(
self,
url: str,
provider: str,
cancel_callback: Optional[Callable[[], bool]] = None,
) -> Tuple[List[Any], Optional[str]]:
"""
Führt den eigentlichen Abruf der Fachdaten durch.
@@ -336,34 +455,100 @@ class Datenabruf:
http_error: Optional[str] = None
# QGIS NetworkAccessManager bevorzugen
_FETCH_TIMEOUT_MS = 30_000 # 30 Sekunden
aborted_or_timed_out = False
attempted_qgis_fetch = False
if callable(cancel_callback) and cancel_callback():
return [], "Abbruch durch Benutzer"
if getattr(qgiscore, "QGIS_AVAILABLE", False) and getattr(qgiscore, "QgsNetworkAccessManager", None) is not None:
attempted_qgis_fetch = True
try:
manager = qgiscore.QgsNetworkAccessManager.instance()
QUrl = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QUrl", None)
QNetworkRequest = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QNetworkRequest", None)
QEventLoop = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QEventLoop", None)
# Netzwerk-Timeout global setzen (QGIS >= 3.6)
if hasattr(manager, "setTimeout"):
manager.setTimeout(_FETCH_TIMEOUT_MS)
_qt = __import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"])
QUrl = getattr(_qt, "QUrl", None)
QNetworkRequest = getattr(_qt, "QNetworkRequest", None)
QEventLoop = getattr(_qt, "QEventLoop", None)
QTimer = getattr(_qt, "QTimer", None)
if QUrl is not None and QNetworkRequest is not None:
req = QNetworkRequest(QUrl(url))
reply = manager.get(req)
if QEventLoop is not None:
loop = QEventLoop()
reply.finished.connect(loop.quit)
loop.exec()
try:
raw = reply.readAll()
data_bytes = bytes(raw) if hasattr(raw, "__bytes__") else raw
response_text = data_bytes.decode("utf-8", errors="replace")
except Exception:
_poll_timer = None
if QTimer is not None:
try:
_poll_timer = QTimer()
_poll_timer.setSingleShot(False)
_poll_timer.timeout.connect(loop.quit)
_poll_timer.start(100)
except Exception:
_poll_timer = None
start_time = time.monotonic()
while True:
if callable(cancel_callback) and cancel_callback():
reply.abort()
http_error = "Abbruch durch Benutzer"
aborted_or_timed_out = True
break
elapsed_ms = int((time.monotonic() - start_time) * 1000)
if elapsed_ms >= _FETCH_TIMEOUT_MS:
reply.abort()
http_error = f"Timeout nach {_FETCH_TIMEOUT_MS // 1000} s: {url}"
aborted_or_timed_out = True
break
if hasattr(reply, "isFinished") and reply.isFinished():
break
loop.exec()
try:
if hasattr(qt, "QCoreApplication") and hasattr(qt.QCoreApplication, "processEvents"):
qt.QCoreApplication.processEvents()
except Exception:
pass
if _poll_timer is not None:
try:
_poll_timer.stop()
except Exception:
pass
if not aborted_or_timed_out:
# Fehler aus Reply auslesen
err_code = None
try:
err_code = reply.error()
except Exception:
pass
if err_code and int(err_code) != 0:
http_error = f"Netzwerkfehler ({err_code}): {reply.errorString()}"
if http_error:
# Timeout oder Netzwerkfehler keinen Body lesen
pass
else:
try:
response_text = reply.text()
raw = reply.readAll()
data_bytes = bytes(raw) if hasattr(raw, "__bytes__") else raw
response_text = data_bytes.decode("utf-8", errors="replace")
except Exception:
response_text = None
try:
response_text = reply.text()
except Exception:
response_text = None
except Exception as exc:
http_error = f"QgsNetworkAccessManager error: {exc}"
response_text = None
# Fallback: requests
if response_text is None:
# Fallback: requests nur wenn kein harter Abbruch/Timeout im QGIS-Request vorlag
if response_text is None and (not attempted_qgis_fetch or not aborted_or_timed_out):
try:
import requests # lokal import, keine harte Abhängigkeit
r = requests.get(url, timeout=30)
@@ -383,6 +568,8 @@ class Datenabruf:
return parsed.get("features", []), None
if isinstance(parsed, dict) and "features" in parsed:
return parsed.get("features", []), None
if prov in ("REST", "ARCGIS", "ARCGISFEATURESERVER", "ARCGIS_FEATURESERVER", "WFS"):
return [], "Antwort enthält keine Feature-Liste"
# Sonst: gib das gesamte JSON als einzelnes Objekt zurück
return [parsed], None
except json.JSONDecodeError:

View File

@@ -30,9 +30,13 @@ from __future__ import annotations
from typing import Any, Dict, List, Optional
import os
import json
import re
import datetime
import sqlite3
from sn_basis.functions import qgiscore_wrapper as qgiscore
from sn_basis.functions.os_wrapper import normalize_path, is_absolute_path
from sn_basis.functions.sys_wrapper import get_plugin_root, join_path, file_exists
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
@@ -53,10 +57,97 @@ class Datenschreiber:
def __init__(self, pruefmanager: Any, gpkg_path: Optional[str] = None) -> None:
self.pruefmanager = pruefmanager
self.gpkg_path = gpkg_path
self.gpkg_path = str(gpkg_path) if gpkg_path else None
# ------------------------------------------------------------------ #
# Schreibe Daten
def _resolve_style_path(self, style_path: Optional[str]) -> Optional[str]:
if not style_path:
return None
style_path_str = str(style_path).strip()
if not style_path_str:
return None
if not is_absolute_path(style_path_str):
plugin_root = get_plugin_root()
style_path_str = str(join_path(plugin_root, "sn_plan41", "assets", style_path_str))
style_path_str = str(normalize_path(style_path_str))
return style_path_str if file_exists(style_path_str) else None
def _store_style_in_gpkg(self, layer_name: str, style_path: str, layer: Optional[Any] = None) -> None:
"""Stellt sicher, dass der Stil in der layer_styles-Tabelle der GPKG gespeichert wird."""
try:
with open(style_path, "r", encoding="utf-8") as fh:
style_qml = fh.read()
f_geometry_column = ''
if layer is not None:
try:
if hasattr(layer, 'geometryColumn'):
f_geometry_column = str(layer.geometryColumn())
elif hasattr(layer, 'dataProvider') and hasattr(layer.dataProvider(), 'geometryColumnName'):
f_geometry_column = str(layer.dataProvider().geometryColumnName())
except Exception:
f_geometry_column = ''
with sqlite3.connect(self.gpkg_path) as conn:
cur = conn.cursor()
cur.execute(
"""
CREATE TABLE IF NOT EXISTS layer_styles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
f_table_catalog TEXT,
f_table_schema TEXT,
f_table_name TEXT NOT NULL,
f_geometry_column TEXT,
styleName TEXT,
styleQML TEXT,
styleSLD TEXT,
useAsDefault BOOLEAN,
description TEXT,
owner TEXT,
ui TEXT,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP
)
"""
)
# Das aktuelle QGIS-Style-Verhalten: bestehenden Style für denselben Layer nicht löschen (nur appenden)
# Wir wollen aber Default-Style setzen: alte Default-Styles entfernen.
cur.execute(
"UPDATE layer_styles SET useAsDefault = 0 WHERE f_table_name = ?",
(layer_name,),
)
# Fülle die bekannten QGIS-Kolonnen
style_name = os.path.basename(style_path)
cur.execute(
"INSERT INTO layer_styles (f_table_catalog, f_table_schema, f_table_name, f_geometry_column, styleName, styleQML, styleSLD, useAsDefault, description, owner, ui) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
'',
'',
layer_name,
f_geometry_column,
style_name,
style_qml,
None,
1,
'',
'',
'',
),
)
conn.commit()
except Exception as exc:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Speichern des Layer-Stils in GPKG: {exc}",
aktion="style_gpkg_speichern_fehlgeschlagen",
kontext={"layer_name": layer_name, "style_path": style_path},
)
)
# ------------------------------------------------------------------ #
def schreibe_Daten(
self,
@@ -65,192 +156,93 @@ class Datenschreiber:
speicherort: str,
) -> List[Dict[str, Any]]:
"""
Schreibt die abgerufenen Daten in die Zieldatenbank/Dateien.
Schreibt die übergebenen Layer in die Ziel-GPKG.
Ablauf
------
Für jede Zeile (ident) in ``daten_dict["daten"]``:
1. Bestimme Ziel-Layername (z. B. Thema oder ident).
2. Prüfe, ob ein Layer mit diesem Namen bereits existiert (Wrapper).
3. Falls vorhanden, frage den Benutzer (Überschreiben / Anhängen / Abbrechen)
über die zentrale Pruefmanager-Methode `ask_overwrite_append_cancel`.
4. Führe die gewählte Operation aus oder schreibe den Layer, wenn er noch nicht existiert.
5. Schreibe ggf. den Stil in die GPKG und setze ihn als Vorgabe.
6. Sammle und gib eine Liste der angelegten/geänderten Layer zurück.
Returns
-------
List[Dict[str, Any]]
Liste von Dicts mit Informationen zu jedem angelegten/geänderten Layer.
Erwartung:
- daten_dict["daten"] enthält Einträge der Form:
ident -> {"layer": QgsVectorLayer}
- self.gpkg_path ist ein str
"""
if not speicherort:
raise ValueError("Ein gültiger Speicherort (speicherort) muss übergeben werden.")
# Setze gpkg_path falls noch nicht vorhanden
# gpkg_path einmalig setzen / normalisieren
if not self.gpkg_path:
self.gpkg_path = speicherort
self.gpkg_path = str(speicherort)
results: List[Dict[str, Any]] = []
daten_map: Dict[str, List[Any]] = daten_dict.get("daten", {})
daten_map: Dict[str, Any] = daten_dict.get("daten", {})
# Iteriere über alle Einträge
for ident, features in daten_map.items():
# Thema/Name ableiten (falls vorhanden in processed_results oder ident)
for ident, entry in daten_map.items():
layer = None
style_path = None
# -----------------------------
# Layer extrahieren
# -----------------------------
if isinstance(entry, dict):
layer = entry.get("layer")
style_path = self._resolve_style_path(entry.get("style_path"))
if layer is None or not hasattr(layer, "isValid") or not layer.isValid():
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Ungültiger Layer für {ident}",
aktion="save_exception",
kontext={"ident": ident},
)
self.pruefmanager.verarbeite(pe_err)
continue
# -----------------------------
# Layername bestimmen
# -----------------------------
thema = None
for pe in processed_results:
try:
kontext = getattr(pe, "kontext", None) or {}
if kontext and kontext.get("ident") == ident:
if kontext.get("ident") == ident:
thema = kontext.get("thema")
break
except Exception:
continue
if not thema:
thema = str(ident)
layer_name = thema
layer_name_raw = thema or str(ident)
layer_name = re.sub(r"[^A-Za-z0-9_]+", "_", layer_name_raw).strip("_")
if not layer_name:
layer_name = f"layer_{ident}"
# Prüfe, ob Layer bereits existiert in der Ziel-GPKG
layer_exists = False
try:
layer_exists_fn = getattr(qgiscore, "layer_exists_in_gpkg", None)
if callable(layer_exists_fn):
layer_exists = layer_exists_fn(self.gpkg_path, layer_name)
else:
# Fallback: QGIS-Fallback-Check via QgsVectorLayer
if getattr(qgiscore, "QgsVectorLayer", None) is not None and qgiscore.QGIS_AVAILABLE:
uri = f"{self.gpkg_path}|layername={layer_name}"
layer = qgiscore.QgsVectorLayer(uri, layer_name, "ogr")
layer_exists = bool(layer and getattr(layer, "isValid", lambda: False)())
except Exception:
layer_exists = False
operation = "created"
if layer_exists:
# Zentrale Nutzerabfrage über Pruefmanager
# Erwartet Rückgabe: "overwrite" | "append" | "cancel"
try:
user_choice = self.pruefmanager.ask_overwrite_append_cancel(layer_name)
except Exception:
# Fallback: overwrite, falls Pruefmanager nicht verfügbar
user_choice = "overwrite"
if user_choice == "cancel":
operation = "skipped"
results.append({
"ident": ident,
"thema": thema,
"operation": operation,
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
"feature_count": 0,
})
continue
if user_choice == "overwrite":
write_err = self._write_layer_to_gpkg(layer_name, features, mode="overwrite")
if write_err:
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Überschreiben von {layer_name}: {write_err}",
aktion="save_exception",
kontext={"ident": ident, "thema": thema, "error": write_err},
)
self.pruefmanager.verarbeite(pe_err)
operation = "skipped"
results.append({
"ident": ident,
"thema": thema,
"operation": operation,
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
"feature_count": 0,
})
continue
else:
operation = "overwritten"
elif user_choice == "append":
write_err = self._write_layer_to_gpkg(layer_name, features, mode="append")
if write_err:
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Anhängen an {layer_name}: {write_err}",
aktion="save_exception",
kontext={"ident": ident, "thema": thema, "error": write_err},
)
self.pruefmanager.verarbeite(pe_err)
operation = "skipped"
results.append({
"ident": ident,
"thema": thema,
"operation": operation,
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
"feature_count": 0,
})
continue
else:
operation = "appended"
else:
# Layer existiert nicht -> neu anlegen
write_err = self._write_layer_to_gpkg(layer_name, features, mode="create")
if write_err:
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Erstellen von {layer_name}: {write_err}",
aktion="save_exception",
kontext={"ident": ident, "thema": thema, "error": write_err},
)
self.pruefmanager.verarbeite(pe_err)
operation = "skipped"
results.append({
"ident": ident,
"thema": thema,
"operation": operation,
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
"feature_count": 0,
})
continue
else:
operation = "created"
# Stilbehandlung (falls in processed_results referenziert)
style_written = False
style_path = None
for pe in processed_results:
try:
kontext = getattr(pe, "kontext", None) or {}
if kontext and kontext.get("ident") == ident:
style_path = kontext.get("stildatei") or kontext.get("Stildatei")
break
except Exception:
continue
# Layer in GPKG schreiben
err_msg = self._write_layer_to_gpkg(layer_name=layer_name, layer=layer)
if err_msg is not None:
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Schreiben des Layers {layer_name}: {err_msg}",
aktion="save_exception",
kontext={"ident": ident, "layer_name": layer_name},
)
self.pruefmanager.verarbeite(pe_err)
continue
# Wenn der Stil vorhanden und valide ist, als Default in GPKG-Style-Tabelle ablegen
if style_path:
if not os.path.isabs(style_path):
base_dir = os.path.dirname(__file__)
style_path = os.path.join(base_dir, style_path)
write_style_fn = getattr(qgiscore, "write_style_to_gpkg", None)
if callable(write_style_fn):
try:
write_style_fn(self.gpkg_path, style_path, layer_name)
style_written = True
except Exception:
style_written = False
feature_count = len(features) if isinstance(features, list) else 0
self._store_style_in_gpkg(layer_name, style_path, layer)
# Erfolgsfall: Info für lade_Layer sammeln
layer_path = f"{self.gpkg_path}|layername={layer_name}"
results.append({
"layer_path": layer_path,
"thema": layer_name,
"ident": ident,
"thema": thema,
"operation": operation,
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
"feature_count": feature_count,
"style_written": style_written,
"style_path": style_path,
})
return results
# -----------------------------
# ------------------------------------------------------------------ #
# Lade Layer ins Projekt
# ------------------------------------------------------------------ #
@@ -288,18 +280,33 @@ class Datenschreiber:
self.pruefmanager.verarbeite(pe_err)
continue
try:
apply_style_fn = getattr(qgiscore, "apply_default_style_from_gpkg", None)
if callable(apply_style_fn):
apply_style_fn(self.gpkg_path, layer)
except Exception:
pe_warn = pruef_ergebnis(
ok=True,
meldung=f"Style konnte für {thema} nicht automatisch angewendet werden",
aktion="stil_not_implemented",
kontext={"thema": thema},
)
self.pruefmanager.verarbeite(pe_warn)
style_path = info.get("style_path")
resolved_style_path = self._resolve_style_path(style_path)
if resolved_style_path:
try:
layer.loadNamedStyle(resolved_style_path)
layer.triggerRepaint()
except Exception as exc:
pe_warn = pruef_ergebnis(
ok=True,
meldung=f"Style konnte für {thema} nicht geladen werden: {exc}",
aktion="stil_laden_fehlgeschlagen",
kontext={"thema": thema, "style_path": resolved_style_path},
)
self.pruefmanager.verarbeite(pe_warn)
else:
try:
apply_style_fn = getattr(qgiscore, "apply_default_style_from_gpkg", None)
if callable(apply_style_fn):
apply_style_fn(self.gpkg_path, layer)
except Exception:
pe_warn = pruef_ergebnis(
ok=True,
meldung=f"Style konnte für {thema} nicht automatisch angewendet werden",
aktion="stil_not_implemented",
kontext={"thema": thema},
)
self.pruefmanager.verarbeite(pe_warn)
try:
# qgisui wrapper wird hier nicht direkt für die Abfrage verwendet;
@@ -374,62 +381,67 @@ class Datenschreiber:
# ------------------------------------------------------------------ #
# Hilfsfunktionen intern
# ------------------------------------------------------------------ #
def _write_layer_to_gpkg(self, layer_name: str, features: List[Any], mode: str = "create") -> Optional[str]:
def _write_layer_to_gpkg(
self,
layer_name: str,
layer: Any,
) -> Optional[str]:
"""
Interne Hilfsfunktion zum Schreiben eines Layers in das GPKG.
Schreibt einen QgsVectorLayer in die Ziel-GPKG.
Erwartete qgiscore-Funktion:
qgiscore.write_features_to_gpkg(gpkg_path, layer_name, features, mode)
Voraussetzungen:
- self.gpkg_path ist ein str
- layer ist ein gültiger QgsVectorLayer
"""
write_fn = getattr(qgiscore, "write_features_to_gpkg", None)
if callable(write_fn):
try:
write_fn(self.gpkg_path, layer_name, features, mode)
return None
except Exception as exc:
return str(exc)
# Fallback: Verwende QgsVectorFileWriter, falls QGIS verfügbar
if getattr(qgiscore, "QGIS_AVAILABLE", False) and getattr(qgiscore, "QgsVectorFileWriter", None) is not None:
try:
# Minimaler Fallback: erwarte, dass 'features' eine Liste von QgsFeature ist
if not features:
# Erstelle leeren Layer-Eintrag (GPKG erlaubt leere Layer)
# Hier vereinfachen wir: writeAsVectorFormatV3 benötigt ein Layer-Objekt.
return None
if layer is None or not hasattr(layer, "isValid") or not layer.isValid():
return "Ungültiger Layer zum Schreiben übergeben"
# Versuche, ein Memory-Layer aus dem ersten Feature zu ermitteln
first = features[0]
mem_layer = None
if hasattr(first, "fields") and hasattr(first, "geometry"):
# Wenn Features QgsFeature sind, versuchen wir, das zugehörige Layer zu nutzen
try:
mem_layer = first.layer() if hasattr(first, "layer") else None
except Exception:
mem_layer = None
try:
opts = qgiscore.QgsVectorFileWriter.SaveVectorOptions()
opts.driverName = "GPKG"
opts.layerName = layer_name
opts.fileEncoding = "UTF-8"
if mem_layer is None:
return "Keine Feld-/Geometrie-Informationen zum Schreiben vorhanden"
# Style in der GPKG speichern, wenn möglich
if hasattr(opts, "symbologyExport"):
try:
# QGIS: SymbologyExport-Wert z.B. QgsVectorFileWriter.SaveVectorOptions.Symbology
saveOpts = qgiscore.QgsVectorFileWriter.SaveVectorOptions
sym_val = getattr(saveOpts, "Symbology", None)
if sym_val is None:
sym_val = getattr(saveOpts, "SymbologyExport", None)
if sym_val is not None:
opts.symbologyExport = sym_val
except Exception:
pass
opts = qgiscore.QgsVectorFileWriter.SaveVectorOptions()
opts.driverName = "GPKG"
opts.layerName = layer_name
opts.fileEncoding = "UTF-8"
if mode == "overwrite":
opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteFile
else:
opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteLayer
# Datei existiert → Layer überschreiben
# Datei existiert nicht → neue GPKG anlegen
if not os.path.exists(self.gpkg_path):
opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteFile
else:
opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteLayer
err = qgiscore.QgsVectorFileWriter.writeAsVectorFormatV3(
mem_layer,
self.gpkg_path,
qgiscore.QgsProject.instance().transformContext(),
opts
)
if err != qgiscore.QgsVectorFileWriter.NoError:
return f"Fehler beim Schreiben (Code {err})"
return None
except Exception as exc:
return str(exc)
err = qgiscore.QgsVectorFileWriter.writeAsVectorFormatV3(
layer,
self.gpkg_path,
qgiscore.QgsProject.instance().transformContext(),
opts,
)
return "Keine Schreib-Funktion verfügbar (Wrapper nicht implementiert)"
# QGIS ≥3 liefert ein Tupel: (error_code, error_message, new_filename, new_layer_name)
if isinstance(err, tuple):
error_code = err[0]
error_msg = err[1] if len(err) > 1 else ""
else:
error_code = err
error_msg = ""
if error_code != qgiscore.QgsVectorFileWriter.NoError:
return f"Fehler beim Schreiben (Code {error_code}, msg='{error_msg}')"
return None
except Exception as exc:
return str(exc)

395
modules/LayerLoader.py Normal file
View File

@@ -0,0 +1,395 @@
"""sn_basis/modules/LayerLoader.py
Kapselt Layer-Erstellung, Raumfilter und Stil-Logik.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
import time
from sn_basis.functions.os_wrapper import normalize_path, is_absolute_path
from sn_basis.functions.qgiscore_wrapper import (
QgsVectorLayer,
QgsRasterLayer,
QgsFeatureRequest,
QgsProject,
QgsNetworkAccessManager,
QgsCoordinateTransform,
)
from sn_basis.functions.sys_wrapper import get_plugin_root, join_path, file_exists
from sn_basis.modules.stilpruefer import Stilpruefer
from sn_basis.modules.layerpruefer import Layerpruefer
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
from sn_basis.functions import qt_wrapper as qt
class LayerLoader:
"""Lädt und filtert Layer aus Dienst-/Datenquellen."""
def __init__(
self,
pruefmanager: Any,
stil_pruefer: Optional[Stilpruefer] = None,
layer_pruefer: Optional[Layerpruefer] = None,
) -> None:
self.pruefmanager = pruefmanager
self.stil_pruefer = stil_pruefer or Stilpruefer()
self.layer_pruefer = layer_pruefer or Layerpruefer()
_LAYER_TIMEOUT_MS = 30_000 # 30 Sekunden
def _was_canceled(self, cancel_callback: Optional[Any]) -> bool:
if not callable(cancel_callback):
return False
try:
return bool(cancel_callback())
except Exception:
return False
def _process_events(self) -> None:
try:
if hasattr(qt, "QCoreApplication") and hasattr(qt.QCoreApplication, "processEvents"):
qt.QCoreApplication.processEvents()
except Exception:
pass
def _transform_geometry_to_layer_crs(self, geometry: Any, source_layer: Any, target_layer: Any) -> Any:
if geometry is None or source_layer is None or target_layer is None:
return geometry
if QgsCoordinateTransform is None or QgsProject is None:
return geometry
try:
source_crs = source_layer.crs() if hasattr(source_layer, "crs") else None
target_crs = target_layer.crs() if hasattr(target_layer, "crs") else None
if source_crs is None or target_crs is None:
return geometry
source_authid = source_crs.authid() if hasattr(source_crs, "authid") else None
target_authid = target_crs.authid() if hasattr(target_crs, "authid") else None
if source_authid and target_authid and source_authid == target_authid:
return geometry
ct = QgsCoordinateTransform(source_crs, target_crs, QgsProject.instance())
if hasattr(geometry, "clone") and callable(getattr(geometry, "clone")):
geom_copy = geometry.clone()
else:
geom_copy = geometry
geom_copy.transform(ct)
return geom_copy
except Exception:
return geometry
def _transform_extent_to_layer_crs(self, extent: Any, source_layer: Any, target_layer: Any) -> Any:
if extent is None or source_layer is None or target_layer is None:
return extent
if QgsCoordinateTransform is None or QgsProject is None:
return extent
try:
source_crs = source_layer.crs() if hasattr(source_layer, "crs") else None
target_crs = target_layer.crs() if hasattr(target_layer, "crs") else None
if source_crs is None or target_crs is None:
return extent
source_authid = source_crs.authid() if hasattr(source_crs, "authid") else None
target_authid = target_crs.authid() if hasattr(target_crs, "authid") else None
if source_authid and target_authid and source_authid == target_authid:
return extent
ct = QgsCoordinateTransform(source_crs, target_crs, QgsProject.instance())
if hasattr(ct, "transformBoundingBox"):
return ct.transformBoundingBox(extent)
return extent
except Exception:
return extent
def create_layer(self, provider: str, link: str, thema: str) -> Optional[QgsVectorLayer]:
provider_lower = provider.lower() if provider else ""
layer = None
# Netzwerk-Timeout für alle netzwerkbasierten Provider setzen
if provider_lower in ("wfs", "wms", "rest"):
try:
nam = QgsNetworkAccessManager.instance()
if hasattr(nam, "setTimeout"):
nam.setTimeout(self._LAYER_TIMEOUT_MS)
except Exception:
pass
try:
if provider_lower == "wfs":
uri = link if link.strip().lower().startswith("url=") else f"url={link}"
layer = QgsVectorLayer(uri, thema, "WFS")
elif provider_lower == "wms":
uri = link if link.strip().lower().startswith("url=") else f"url={link}"
layer = QgsRasterLayer(uri, thema, "wms")
elif provider_lower in ("ogr", "gpkg", "shp", "geojson"):
layer = QgsVectorLayer(link, thema, "ogr")
elif provider_lower == "rest":
rest_link = link.strip()
if rest_link.lower().endswith("/featureserver"):
rest_link = rest_link.rstrip("/") + "/0"
uri = rest_link if rest_link.lower().startswith("url=") else f"url={rest_link}"
layer = QgsVectorLayer(uri, thema, "arcgisfeatureserver")
else:
layer = QgsVectorLayer(link, thema, "ogr")
except Exception as exc:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Erstellen des Layers {thema}: {exc}",
aktion="layer_nicht_verfuegbar",
kontext={"provider": provider, "link": link},
)
)
return None
if not layer or not layer.isValid():
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Layer {thema} (Provider={provider}) konnte nicht geladen werden."
,aktion="layer_nicht_verfuegbar",
kontext={"provider": provider, "link": link},
)
)
return None
return layer
def apply_style(self, layer: QgsVectorLayer, style_path: Optional[str]) -> None:
if not style_path or layer is None or not layer.isValid():
return
if not style_path.strip():
return
if not is_absolute_path(style_path):
plugin_root = get_plugin_root()
style_path = str(join_path(plugin_root, "sn_plan41", "assets", style_path))
# normalize path for consistency
style_path = str(normalize_path(style_path))
# Debug: welche Stil-Datei wird geprüft?
print(f"[LayerLoader] Überprüfe Stildatei: '{style_path}'")
if file_exists(style_path):
try:
layer.loadNamedStyle(style_path)
layer.triggerRepaint()
except Exception as exc:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Stil-Laden für {layer.name()}: {exc}",
aktion="stil_laden_fehlgeschlagen",
kontext={"thema": layer.name(), "style_path": style_path},
)
)
else:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=True,
meldung=f"Stildatei nicht gefunden (optional): {style_path}",
aktion="stil_nicht_gefunden",
kontext={"thema": layer.name(), "style_path": style_path},
)
)
def filter_by_extent(self, layer: QgsVectorLayer, extent, cancel_callback: Optional[Any] = None, source_layer: Optional[Any] = None) -> Optional[QgsVectorLayer]:
"""Beschneidet <layer> auf die rechteckige Ausdehnung <extent>.
Diese Methode verwendet einen einfachen BBOX-Filter. Für komplexere
Raumeinschränkungen (z.B. Verfahrensgebiet) sollte stattdessen
:meth:`filter_by_layer` verwendet werden, da dort echte Geometrie-Tests
stattfinden.
"""
if not layer or not layer.isValid() or extent is None:
return layer
if layer.type() != QgsVectorLayer.VectorLayer:
return layer
extent_for_layer = self._transform_extent_to_layer_crs(extent, source_layer, layer)
request = QgsFeatureRequest().setFilterRect(extent_for_layer)
if hasattr(request, "setTimeout"):
try:
request.setTimeout(self._LAYER_TIMEOUT_MS)
except Exception:
pass
start = time.monotonic()
features: List[Any] = []
try:
for feat in layer.getFeatures(request):
if self._was_canceled(cancel_callback):
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Abbruch beim Raumfilter (BBOX) für {layer.name()}",
aktion="needs_user_action",
kontext={"thema": layer.name()},
)
)
return None
elapsed_ms = int((time.monotonic() - start) * 1000)
if elapsed_ms >= self._LAYER_TIMEOUT_MS:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Timeout beim Raumfilter (BBOX) für {layer.name()} nach {self._LAYER_TIMEOUT_MS // 1000}s",
aktion="url_nicht_erreichbar",
kontext={"thema": layer.name(), "timeout_s": self._LAYER_TIMEOUT_MS // 1000},
)
)
return None
features.append(feat)
if len(features) % 100 == 0:
self._process_events()
except Exception as exc:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Lesen der Features für {layer.name()}: {exc}",
aktion="layer_nicht_verfuegbar",
kontext={"thema": layer.name()},
)
)
return None
if not features:
return None
geom_type_map = {0: "Point", 1: "LineString", 2: "Polygon"}
geom_type = geom_type_map.get(layer.geometryType(), "Polygon")
uri = f"{geom_type}?crs={layer.crs().authid()}"
filtered_layer = QgsVectorLayer(uri, f"{layer.name()}_bbox", "memory")
if not filtered_layer or not filtered_layer.isValid():
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Erzeugen des Filter-Layers für {layer.name()}",
aktion="filterlayer_nicht_erzeugt",
kontext={"thema": layer.name()},
)
)
return None
provider = filtered_layer.dataProvider()
provider.addAttributes(layer.fields())
filtered_layer.updateFields()
provider.addFeatures(features)
filtered_layer.updateExtents()
return filtered_layer
def filter_by_layer(self, layer: QgsVectorLayer, filter_layer: QgsVectorLayer, cancel_callback: Optional[Any] = None) -> Optional[QgsVectorLayer]:
"""Beschneidet <layer> auf die tatsächliche Geometrie des
<filter_layer>.
Diese Methode wird z.B. für das Verfahrensgebiet verwendet, damit nicht
die gesamte Bounding-Box, sondern nur die echten Flächen als Raumfilter
gelten. Wenn der Filter-Layer mehrere Features enthält, werden deren
Geometrien zu einem Multi-Geom vereinigt.
"""
if not layer or not layer.isValid() or not filter_layer or not filter_layer.isValid():
return layer
if layer.type() != QgsVectorLayer.VectorLayer:
return layer
# vereinigte Geometrie aller Features im Filter-Layer
union_geom = None
for f in filter_layer.getFeatures():
try:
geom = self._transform_geometry_to_layer_crs(f.geometry(), filter_layer, layer)
if union_geom is None:
union_geom = geom
else:
union_geom = union_geom.combine(geom)
except Exception:
# bei einem Fehler einfach weiterfahren
continue
if union_geom is None or union_geom.isEmpty():
return None
# nun alle Features aus <layer> nehmen, deren Geometrie sich schneidet
filtered = []
request = QgsFeatureRequest().setFilterRect(union_geom.boundingBox())
if hasattr(request, "setTimeout"):
try:
request.setTimeout(self._LAYER_TIMEOUT_MS)
except Exception:
pass
start = time.monotonic()
for f in layer.getFeatures(request):
if self._was_canceled(cancel_callback):
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Abbruch beim Raumfilter (Geometrie) für {layer.name()}",
aktion="needs_user_action",
kontext={"thema": layer.name()},
)
)
return None
elapsed_ms = int((time.monotonic() - start) * 1000)
if elapsed_ms >= self._LAYER_TIMEOUT_MS:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Timeout beim Raumfilter (Geometrie) für {layer.name()} nach {self._LAYER_TIMEOUT_MS // 1000}s",
aktion="url_nicht_erreichbar",
kontext={"thema": layer.name(), "timeout_s": self._LAYER_TIMEOUT_MS // 1000},
)
)
return None
try:
if f.geometry() and f.geometry().intersects(union_geom):
filtered.append(f)
except Exception:
continue
if len(filtered) % 100 == 0:
self._process_events()
if not filtered:
return None
geom_type_map = {0: "Point", 1: "LineString", 2: "Polygon"}
geom_type = geom_type_map.get(layer.geometryType(), "Polygon")
uri = f"{geom_type}?crs={layer.crs().authid()}"
filtered_layer = QgsVectorLayer(uri, f"{layer.name()}_filtered", "memory")
if not filtered_layer or not filtered_layer.isValid():
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Erzeugen des Filter-Layers für {layer.name()}",
aktion="filterlayer_nicht_erzeugt",
kontext={"thema": layer.name()},
)
)
return None
provider = filtered_layer.dataProvider()
provider.addAttributes(layer.fields())
filtered_layer.updateFields()
provider.addFeatures(filtered)
filtered_layer.updateExtents()
return filtered_layer
def add_to_project(self, layer: QgsVectorLayer) -> None:
if layer and layer.isValid():
QgsProject.instance().addMapLayer(layer)

View File

@@ -56,6 +56,27 @@ class Pruefmanager:
)
info("DataGrabber Zusammenfassung", message)
# ------------------------------------------------------------------
# Allgemeine Nutzerinteraktionen
# ------------------------------------------------------------------
def zeige_hinweis(self, titel: str, meldung: str) -> None:
"""Zeigt eine modale Hinweismeldung mit OK-Button."""
from sn_basis.functions.dialog_wrapper import show_info_dialog
show_info_dialog(titel, meldung, parent=self.parent)
def frage_ja_nein(self, titel: str, meldung: str, default: bool = True) -> bool:
"""Stellt eine Ja/Nein-Frage. Gibt True zurück, wenn der Nutzer Ja wählt."""
if self.ui_modus != "qgis":
return default
return ask_yes_no(titel, meldung, default=default, parent=self.parent)
def frage_text(self, titel: str, meldung: str, default_text: str = "") -> tuple[str, bool]:
"""Fragt einen Textwert ab und gibt Text + OK-Status zurück."""
from sn_basis.functions.dialog_wrapper import ask_text
if self.ui_modus != "qgis":
return default_text, True
return ask_text(titel, meldung, default_text=default_text, parent=self.parent)
# ------------------------------------------------------------------
# VERFAHRENS-DB-spezifische Entscheidungen
# ------------------------------------------------------------------
@@ -216,3 +237,26 @@ class Pruefmanager:
)
print("🔥 verarbeite() ENDE mit ok=False")
return ergebnis
def _ask_use_or_replace_pufferlayer(self) -> str:
"""
Fragt den Nutzer, ob ein vorhandener Pufferlayer verwendet
oder ersetzt werden soll.
Returns
-------
str
"verwenden", "ersetzen" oder "abbrechen"
"""
ergebnis = pruef_ergebnis(
ok=False,
aktion="layer_existiert",
meldung="Ein Pufferlayer ist bereits vorhanden.",
)
ergebnis = self.pruefmanager.verarbeite(ergebnis)
if not ergebnis.ok:
return "abbrechen"
return "verwenden" if ergebnis.aktion == "ok" else "ersetzen"

View File

@@ -61,7 +61,8 @@ class Linkpruefer:
aktion="leer",
kontext=None,
)
#evtl. Pfad-Objekte in string umwandeln
eingabe = str(eingabe)
# -----------------------------------------------------
# 1. Fall: URL
# -----------------------------------------------------

View File

@@ -43,6 +43,13 @@ PruefAktion = Literal[
# Dateiendung/Format
"falsche_endung",
"pflichtfelder_fehlen",
"unbekannter_dateityp",
"Datenbank",
"dienst",
"excel",
"unbekannte_quelle",
# Excel/Import
"kein_header",
@@ -50,6 +57,8 @@ PruefAktion = Literal[
"read_error",
"open_error",
"datenabruf",
# 🆕 VERFAHRENS-DB SPEZIFISCH (deine Anforderungen 2.d, 2.e)
"datei_wird_erzeugt", # 2.d: Pfad gültig, Datei fehlt → weiter

View File

@@ -4,9 +4,10 @@ Prüft ausschließlich, ob ein Stilpfad gültig ist.
Die Anwendung erfolgt später über eine Aktion.
"""
from pathlib import Path
import os
from sn_basis.functions.sys_wrapper import file_exists
from sn_basis.functions.os_wrapper import is_absolute_path
from sn_basis.functions.sys_wrapper import get_plugin_root, file_exists, join_path
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
@@ -40,7 +41,11 @@ class Stilpruefer:
kontext=None,
)
pfad = Path(stil_pfad)
pfad = str(stil_pfad)
if not is_absolute_path(pfad):
plugin_root = get_plugin_root()
pfad = str(join_path(plugin_root, "sn_plan41", "assets", pfad))
# -----------------------------------------------------
# 2. Datei existiert nicht
@@ -56,7 +61,7 @@ class Stilpruefer:
# -----------------------------------------------------
# 3. Falsche Endung
# -----------------------------------------------------
if pfad.suffix.lower() != ".qml":
if os.path.splitext(pfad)[1].lower() != ".qml":
return pruef_ergebnis(
ok=False,
meldung="Die Stil-Datei muss die Endung '.qml' haben.",

View File

@@ -1,11 +0,0 @@
name=LNO Sachsen | Basisfunktionen
description=Plugin mit Basisfunktionen
author=Daniel Helbig
email=daniel.helbig@kreis-meissen.de
qgisMinimumVersion=3.0
qgisMaximumVersion=3.99
deprecated=False
experimental=False
supportsQt6=Yes
zip_folder=sn_basis