forked from AG_QGIS/Plugin_SN_Basis
Compare commits
31 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 2ff465b86d | |||
| f19fe71bfa | |||
| ae5f88c5b8 | |||
| 7cd6e3ef24 | |||
| 1be1420f66 | |||
| f25e30c489 | |||
| 0eb32453d6 | |||
| 841b529ad8 | |||
| ae5725cd03 | |||
| ac5a3993c8 | |||
| 22b45fe19a | |||
|
|
24c2137dc2 | ||
|
|
c0c0387b1d | ||
|
|
663ca770a1 | ||
|
|
04319b6f7b | ||
|
|
1c70d62739 | ||
|
|
3971bd3408 | ||
|
|
fa04fc80e3 | ||
|
|
04bdfbe9d8 | ||
|
|
b6b791e5bd | ||
|
|
82be564c29 | ||
|
|
f42260b66c | ||
|
|
327c25388f | ||
|
|
c6c9613120 | ||
| 6e1f4c615b | |||
| f876218134 | |||
| 9829ac9c81 | |||
| ae956b0046 | |||
| 0ec24029d8 | |||
| 948041da52 | |||
| 439de5527a |
133
.gitea/workflows/release.yaml
Normal file
133
.gitea/workflows/release.yaml
Normal file
@@ -0,0 +1,133 @@
|
||||
name: Release Plugin
|
||||
run-name: "Release | ${{ github.ref_name }}"
|
||||
|
||||
on:
|
||||
push:
|
||||
tags:
|
||||
- 'v*'
|
||||
|
||||
jobs:
|
||||
release:
|
||||
runs-on: alpine-latest
|
||||
defaults:
|
||||
run:
|
||||
shell: bash
|
||||
|
||||
steps:
|
||||
- name: Notwendige Abhängigkeiten installieren
|
||||
shell: sh
|
||||
run: |
|
||||
apk add --no-cache bash git jq curl
|
||||
|
||||
- name: Code holen
|
||||
run: |
|
||||
REPO_URL="https://${RELEASE_TOKEN}:x-oauth-basic@${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}.git"
|
||||
|
||||
git clone "$REPO_URL" repo
|
||||
cd repo
|
||||
|
||||
git checkout "$TAG"
|
||||
env:
|
||||
RELEASE_TOKEN: ${{ secrets.RELEASE_TOKEN }}
|
||||
TAG: "${{ github.ref_name }}"
|
||||
|
||||
- name: Version und Kanal bestimmen
|
||||
id: releaseinfo
|
||||
run: |
|
||||
TAG="${{ github.ref_name }}"
|
||||
RAW_VERSION="${TAG#v}"
|
||||
VERSION="${RAW_VERSION%%-*}"
|
||||
|
||||
# Channel und Suffix automatisch bestimmen anhand des Tag-Suffix
|
||||
case "$RAW_VERSION" in
|
||||
*-testing*|*-t|*-T)
|
||||
CHANNEL="testing"
|
||||
PRERELEASE="true"
|
||||
SUFFIX="-testing"
|
||||
;;
|
||||
*-unstable*|*-u|*-U)
|
||||
CHANNEL="unstable"
|
||||
PRERELEASE="true"
|
||||
SUFFIX="-unstable"
|
||||
;;
|
||||
*)
|
||||
CHANNEL="stable"
|
||||
PRERELEASE="false"
|
||||
SUFFIX=""
|
||||
;;
|
||||
esac
|
||||
|
||||
# Output setzen
|
||||
echo "version=$VERSION" >> $GITHUB_OUTPUT
|
||||
echo "channel=$CHANNEL" >> $GITHUB_OUTPUT
|
||||
echo "prerelease=$PRERELEASE" >> $GITHUB_OUTPUT
|
||||
|
||||
# Optional Debug
|
||||
echo "VERSION=$VERSION"
|
||||
echo "CHANNEL=$CHANNEL"
|
||||
echo "PRERELEASE=$PRERELEASE"
|
||||
|
||||
# - name: plugin.cfg einlesen
|
||||
# id: config
|
||||
# run: |
|
||||
# cd repo
|
||||
# while read -r line || [ -n "$line" ]; do
|
||||
# key="${line%%=*}"
|
||||
# value="${line#*=}"
|
||||
# echo "$key=$value" >> $GITHUB_OUTPUT
|
||||
# done < plugin.cfg
|
||||
|
||||
- name: Payload erzeugen
|
||||
id: payload
|
||||
run: |
|
||||
cd repo
|
||||
|
||||
NAME="${GITHUB_REPOSITORY##*/}"
|
||||
GROUP="${GITHUB_REPOSITORY%%/*}"
|
||||
VERSION="${{ steps.releaseinfo.outputs.version }}"
|
||||
CHANNEL="${{ steps.releaseinfo.outputs.channel }}"
|
||||
PRERELEASE="${{ steps.releaseinfo.outputs.prerelease }}"
|
||||
ZIP_FOLDER="${{ vars.ZIP_FOLDER }}"
|
||||
ZIP_FILE="${ZIP_FOLDER}.zip"
|
||||
TAG="${{ github.ref_name }}"
|
||||
#GIT_URL=${GITHUB_REPOSITORY}
|
||||
|
||||
jq -n \
|
||||
--arg name "$NAME" \
|
||||
--arg group "$GROUP" \
|
||||
--arg version "$VERSION" \
|
||||
--arg channel "$CHANNEL" \
|
||||
--arg prerelease "$PRERELEASE" \
|
||||
--arg zip_folder "$ZIP_FOLDER" \
|
||||
--arg zip_file "$ZIP_FILE" \
|
||||
--arg tag "$TAG" \
|
||||
'{
|
||||
name: $name,
|
||||
group: $group,
|
||||
version: $version,
|
||||
channel: $channel,
|
||||
prerelease: ($prerelease == "true"),
|
||||
zip_folder: $zip_folder,
|
||||
zip_file: $zip_file,
|
||||
tag: $tag
|
||||
}' > payload.json
|
||||
|
||||
cat payload.json
|
||||
|
||||
- name: Repository aktualisieren
|
||||
run: |
|
||||
NAME="${GITHUB_REPOSITORY##*/}"
|
||||
TAG="${{ steps.releaseinfo.outputs.version }}"-"${{ steps.releaseinfo.outputs.channel }}"
|
||||
PAYLOAD_B64=$(base64 -w0 repo/payload.json)
|
||||
|
||||
JSON="{\"ref\":\"hidden/workflows\",\"inputs\":{\"payload\":\"$PAYLOAD_B64\",\"name\":\"$NAME\",\"tag\":\"$TAG\"}}"
|
||||
curl -X POST \
|
||||
-H "Authorization: token ${{ secrets.RELEASE_TOKEN }}" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$JSON" \
|
||||
"https://${{ vars.RELEASE_URL }}/api/v1/repos/${OWNER}/Repository/actions/workflows/${WORKFLOW}/dispatches"
|
||||
env:
|
||||
RELEASE_TOKEN: ${{ secrets.RELEASE_TOKEN }}
|
||||
OWNER: "AG_QGIS"
|
||||
WORKFLOW: "release.yaml"
|
||||
|
||||
@@ -1,289 +0,0 @@
|
||||
name: Release Plugin
|
||||
run-name: "Release | ${{ github.ref_name }}"
|
||||
|
||||
on:
|
||||
push:
|
||||
tags:
|
||||
- 'v*'
|
||||
|
||||
jobs:
|
||||
release:
|
||||
runs-on: alpine-latest
|
||||
defaults:
|
||||
run:
|
||||
shell: bash
|
||||
|
||||
steps:
|
||||
- name: Notwendige Abhängigkeiten installieren
|
||||
shell: sh
|
||||
run: |
|
||||
apk add --no-cache git zip curl jq rsync bash
|
||||
git config --global http.sslVerify false
|
||||
|
||||
- name: Code holen
|
||||
run: |
|
||||
# Tag aus GitHub Actions Kontext extrahieren
|
||||
TAG="${GITHUB_REF#refs/tags/}"
|
||||
|
||||
# Repo-URL dynamisch aus vars und github.repository bauen
|
||||
REPO_URL="https://${RELEASE_TOKEN}:x-oauth-basic@${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}.git"
|
||||
|
||||
# Repository klonen
|
||||
git clone "$REPO_URL" repo
|
||||
cd repo
|
||||
|
||||
git checkout "$TAG"
|
||||
env:
|
||||
RELEASE_TOKEN: ${{ secrets.RELEASE_TOKEN }}
|
||||
|
||||
- name: Version und Kanal bestimmen
|
||||
id: releaseinfo
|
||||
run: |
|
||||
TAG="${{ github.ref_name }}"
|
||||
VERSION="${TAG#v}"
|
||||
|
||||
case "$TAG" in
|
||||
*-unstable*)
|
||||
CHANNEL="unstable"
|
||||
DRAFT="false"
|
||||
PRERELEASE="true"
|
||||
;;
|
||||
*-testing*)
|
||||
CHANNEL="testing"
|
||||
DRAFT="false"
|
||||
PRERELEASE="true"
|
||||
;;
|
||||
*)
|
||||
CHANNEL="stable"
|
||||
DRAFT="false"
|
||||
PRERELEASE="false"
|
||||
;;
|
||||
esac
|
||||
|
||||
echo "version=$VERSION" >> $GITHUB_OUTPUT
|
||||
echo "channel=$CHANNEL" >> $GITHUB_OUTPUT
|
||||
echo "draft=$DRAFT" >> $GITHUB_OUTPUT
|
||||
echo "prerelease=$PRERELEASE" >> $GITHUB_OUTPUT
|
||||
|
||||
- name: plugin.info einlesen
|
||||
id: info
|
||||
run: |
|
||||
cd repo
|
||||
while IFS='=' read -r key value; do
|
||||
echo "$key=$value" >> $GITHUB_OUTPUT
|
||||
done < plugin.info
|
||||
|
||||
- name: Changelog einlesen
|
||||
id: changelog
|
||||
run: |
|
||||
cd repo
|
||||
|
||||
# Aktueller Block = alles vor dem ersten ---
|
||||
CURRENT=$(awk '/^---/{exit} {print}' changelog.txt)
|
||||
|
||||
# Vollständige Historie = alles nach dem ersten ---
|
||||
HISTORY=$(awk 'found{print} /^---/{found=1}' changelog.txt)
|
||||
|
||||
# Gitea Release Body zusammenbauen
|
||||
VERSION="${{ steps.releaseinfo.outputs.version }}"
|
||||
FULL=$(printf "## %s\n%s\n\n%s" "$VERSION" "$CURRENT" "$HISTORY")
|
||||
|
||||
echo "DEBUG | Aktueller Changelog:"
|
||||
echo "$CURRENT"
|
||||
|
||||
# Für GITHUB_OUTPUT: Multiline via EOF-Marker
|
||||
echo "current<<EOF" >> $GITHUB_OUTPUT
|
||||
echo "$CURRENT" >> $GITHUB_OUTPUT
|
||||
echo "EOF" >> $GITHUB_OUTPUT
|
||||
|
||||
echo "full<<EOF" >> $GITHUB_OUTPUT
|
||||
echo "$FULL" >> $GITHUB_OUTPUT
|
||||
echo "EOF" >> $GITHUB_OUTPUT
|
||||
|
||||
- name: metadata.txt erzeugen
|
||||
run: |
|
||||
cd repo
|
||||
|
||||
# ------------------------ GEÄNDERT ------------------------
|
||||
# Temporär die Vorlage aus dem hidden/templates Branch holen
|
||||
git fetch origin hidden/templates
|
||||
git checkout origin/hidden/templates -- metadata.template
|
||||
TEMPLATE="metadata.template"
|
||||
# -----------------------------------------------------------
|
||||
|
||||
# TEMPLATE="templates/metadata.template"
|
||||
OUT="metadata.txt"
|
||||
|
||||
CONTENT=$(cat "$TEMPLATE")
|
||||
|
||||
CONTENT="${CONTENT//\{\{NAME\}\}/${{ steps.info.outputs.name }}}"
|
||||
CONTENT="${CONTENT//\{\{QGIS_MIN\}\}/${{ steps.info.outputs.qgisMinimumVersion }}}"
|
||||
CONTENT="${CONTENT//\{\{QGIS_MAX\}\}/${{ steps.info.outputs.qgisMaximumVersion }}}"
|
||||
CONTENT="${CONTENT//\{\{DESCRIPTION\}\}/${{ steps.info.outputs.description }}}"
|
||||
CONTENT="${CONTENT//\{\{VERSION\}\}/${{ steps.releaseinfo.outputs.version }}}"
|
||||
CONTENT="${CONTENT//\{\{AUTHOR\}\}/${{ steps.info.outputs.author }}}"
|
||||
CONTENT="${CONTENT//\{\{EMAIL\}\}/${{ steps.info.outputs.email }}}"
|
||||
CONTENT="${CONTENT//\{\{HOMEPAGE\}\}/${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}}"
|
||||
CONTENT="${CONTENT//\{\{TRACKER\}\}/${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}}"
|
||||
CONTENT="${CONTENT//\{\{REPOSITORY\}\}/${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}}"
|
||||
CONTENT="${CONTENT//\{\{EXPERIMENTAL\}\}/${{ steps.info.outputs.experimental }}}"
|
||||
CONTENT="${CONTENT//\{\{DEPRECATED\}\}/${{ steps.info.outputs.deprecated }}}"
|
||||
CONTENT="${CONTENT//\{\{QT6\}\}/${{ steps.info.outputs.supportsQt6 }}}"
|
||||
|
||||
printf "%s\n" "$CONTENT" > "$OUT"
|
||||
rm $TEMPLATE
|
||||
|
||||
- name: ZIP-Datei erstellen
|
||||
id: zip
|
||||
run: |
|
||||
cd repo
|
||||
|
||||
ZIP_FOLDER="${{ steps.info.outputs.zip_folder }}"
|
||||
ZIP_FILE="${ZIP_FOLDER}.zip"
|
||||
|
||||
VERSION="${{ steps.releaseinfo.outputs.version }}"
|
||||
REPO_NAME="${GITHUB_REPOSITORY##*/}"
|
||||
#ZIP_NAME="${REPO_NAME}-${VERSION}.zip"
|
||||
|
||||
|
||||
mkdir -p dist/${ZIP_FOLDER}
|
||||
|
||||
rsync -a \
|
||||
--exclude='.git' \
|
||||
--exclude='.gitea' \
|
||||
--exclude='.plugin' \
|
||||
--exclude='dist' \
|
||||
./ dist/${ZIP_FOLDER}/
|
||||
|
||||
cd dist
|
||||
zip -r "${ZIP_FILE}" "${ZIP_FOLDER}/" \
|
||||
-x "*.pyc" -x "*/__pycache__/*"
|
||||
cd ..
|
||||
|
||||
echo "zip_file=${ZIP_FILE}" >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Gitea-Release erstellen
|
||||
id: create_release
|
||||
run: |
|
||||
TAG="${{ github.ref_name }}"
|
||||
VERSION="${{ steps.releaseinfo.outputs.version }}"
|
||||
CHANNEL="${{ steps.releaseinfo.outputs.channel }}"
|
||||
|
||||
API_URL="https://${{ vars.RELEASE_URL }}/api/v1/repos/${GITHUB_REPOSITORY}/releases"
|
||||
|
||||
JSON=$(jq -n \
|
||||
--arg tag "$TAG" \
|
||||
--arg name "Version $VERSION" \
|
||||
--arg body "${{ steps.changelog.outputs.current }}" \
|
||||
--argjson draft "${{ steps.releaseinfo.outputs.draft }}" \
|
||||
--argjson prerelease "${{ steps.releaseinfo.outputs.prerelease }}" \
|
||||
'{tag_name: $tag, name: $name, body: $body, draft: $draft, prerelease: $prerelease}')
|
||||
|
||||
API_RESPONSE=$(curl -s -X POST "$API_URL" \
|
||||
-H "accept: application/json" \
|
||||
-H "Authorization: token ${{ secrets.RELEASE_TOKEN }}" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$JSON")
|
||||
|
||||
RELEASE_ID=$(echo "$API_RESPONSE" | jq -r '.id')
|
||||
|
||||
if [ "$RELEASE_ID" = "null" ] || [ -z "$RELEASE_ID" ]; then
|
||||
echo "Fehler beim Erstellen des Releases!"
|
||||
echo "$API_RESPONSE"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "release_id=$RELEASE_ID" >> $GITHUB_OUTPUT
|
||||
|
||||
- name: ZIP-Datei hochladen
|
||||
run: |
|
||||
RELEASE_ID="${{ steps.create_release.outputs.release_id }}"
|
||||
ZIP_FILE="${{ steps.zip.outputs.zip_file }}"
|
||||
|
||||
API_URL="https://${{ vars.RELEASE_URL }}/api/v1/repos/${GITHUB_REPOSITORY}/releases/${RELEASE_ID}/assets?name=${ZIP_FILE}"
|
||||
|
||||
curl -s -X POST "$API_URL" \
|
||||
-H "Authorization: token ${{ secrets.RELEASE_TOKEN }}" \
|
||||
-H "Content-Type: application/zip" \
|
||||
--data-binary "@repo/dist/${ZIP_FILE}" \
|
||||
-o upload_response.json
|
||||
|
||||
# Optional: Fehlerprüfung
|
||||
if jq -e '.id' upload_response.json >/dev/null 2>&1; then
|
||||
echo "ZIP erfolgreich hochgeladen."
|
||||
else
|
||||
echo "Fehler beim Hochladen der ZIP!"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
- name: Payload erzeugen
|
||||
run: |
|
||||
cd repo
|
||||
|
||||
VERSION="${{ steps.releaseinfo.outputs.version }}"
|
||||
CHANNEL="${{ steps.releaseinfo.outputs.channel }}"
|
||||
ZIP_FILE="${{ steps.zip.outputs.zip_file }}"
|
||||
|
||||
DOWNLOAD_URL="https://${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}/releases/download/${{ github.ref_name }}/${ZIP_FILE}"
|
||||
|
||||
jq -n \
|
||||
--arg name "${{ steps.info.outputs.name }}" \
|
||||
--arg version "$VERSION" \
|
||||
--arg channel "$CHANNEL" \
|
||||
--arg description "${{ steps.info.outputs.description }}" \
|
||||
--arg author "${{ steps.info.outputs.author }}" \
|
||||
--arg email "${{ steps.info.outputs.email }}" \
|
||||
--arg qgis_min "${{ steps.info.outputs.qgisMinimumVersion }}" \
|
||||
--arg qgis_max "${{ steps.info.outputs.qgisMaximumVersion }}" \
|
||||
--arg homepage "${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}" \
|
||||
--arg tracker "${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}" \
|
||||
--arg repository "${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}" \
|
||||
--arg experimental "${{ steps.info.outputs.experimental }}" \
|
||||
--arg deprecated "${{ steps.info.outputs.deprecated }}" \
|
||||
--arg qt6 "${{ steps.info.outputs.supportsQt6 }}" \
|
||||
--arg id "${{ steps.info.outputs.zip_folder }}" \
|
||||
--arg url "$DOWNLOAD_URL" \
|
||||
--arg changelog "${{ steps.changelog.outputs.current }}" \
|
||||
'{
|
||||
name: $name,
|
||||
version: $version,
|
||||
channel: $channel,
|
||||
description: $description,
|
||||
author: $author,
|
||||
email: $email,
|
||||
qgis_min: $qgis_min,
|
||||
qgis_max: $qgis_max,
|
||||
homepage: $homepage,
|
||||
tracker: $tracker,
|
||||
repository: $repository,
|
||||
experimental: $experimental,
|
||||
deprecated: $deprecated,
|
||||
qt6: $qt6,
|
||||
id: $id,
|
||||
url: $url,
|
||||
changelog: $changelog
|
||||
}' > payload.json
|
||||
|
||||
- name: Repository aktualisieren
|
||||
run: |
|
||||
OWNER="AG_QGIS"
|
||||
WORKFLOW="update.yml"
|
||||
|
||||
PAYLOAD_B64=$(base64 -w0 repo/payload.json)
|
||||
|
||||
FULL_NAME="${{ steps.info.outputs.name }}"
|
||||
NAME=$(echo "$FULL_NAME" | awk -F'|' '{gsub(/^ +| +$/,"",$2); print $2}')
|
||||
TAG="${{ steps.releaseinfo.outputs.version }}"
|
||||
|
||||
JSON="{\"ref\":\"hidden/workflows\",\"inputs\":{\"payload\":\"$PAYLOAD_B64\",\"name\":\"$NAME\",\"tag\":\"$TAG\"}}"
|
||||
|
||||
#JSON="{\"ref\":\"hidden/workflows\",\"inputs\":{\"payload\":\"$PAYLOAD_B64\"}}"
|
||||
|
||||
echo "DEBUG | Sende JSON:"
|
||||
echo "$JSON"
|
||||
|
||||
curl -X POST \
|
||||
-H "Authorization: token ${{ secrets.RELEASE_TOKEN }}" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$JSON" \
|
||||
"https://${{ vars.RELEASE_URL }}/api/v1/repos/${OWNER}/Repository/actions/workflows/${WORKFLOW}/dispatches"
|
||||
0
changelog.txt
Normal file
0
changelog.txt
Normal file
@@ -5,7 +5,7 @@ from typing import Any
|
||||
from typing import Literal, Optional
|
||||
from sn_basis.functions.qt_wrapper import (
|
||||
QMessageBox, YES, NO, CANCEL, QT_VERSION, exec_dialog, ICON_QUESTION,
|
||||
|
||||
QProgressDialog, QCoreApplication, Qt, QInputDialog, QLineEdit,
|
||||
)
|
||||
|
||||
def ask_yes_no(
|
||||
@@ -39,6 +39,50 @@ def ask_yes_no(
|
||||
return default
|
||||
|
||||
|
||||
def show_info_dialog(title: str, message: str, parent: Any = None) -> None:
|
||||
"""
|
||||
Zeigt einen modalen Info-Dialog mit OK-Button.
|
||||
Blockiert bis der Nutzer bestätigt.
|
||||
"""
|
||||
try:
|
||||
if QT_VERSION == 0: # Mock-Modus
|
||||
print(f"Mock-Modus: show_info_dialog('{title}')")
|
||||
return
|
||||
QMessageBox.information(parent, title, message)
|
||||
except Exception as e:
|
||||
print(f"⚠️ show_info_dialog Fehler: {e}")
|
||||
|
||||
|
||||
def ask_text(
|
||||
title: str,
|
||||
label: str,
|
||||
default_text: str = "",
|
||||
parent: Any = None,
|
||||
) -> tuple[str, bool]:
|
||||
"""Zeigt einen modalen Texteingabe-Dialog und gibt Text + OK-Status zurück."""
|
||||
try:
|
||||
if QT_VERSION == 0: # Mock-Modus
|
||||
print(f"Mock-Modus: ask_text('{title}') -> '{default_text}'")
|
||||
return default_text, True
|
||||
# PyQt6: QLineEdit.EchoMode.Normal / PyQt5: QLineEdit.Normal
|
||||
echo_mode = (
|
||||
getattr(QLineEdit, "Normal", None)
|
||||
or getattr(getattr(QLineEdit, "EchoMode", None), "Normal", None)
|
||||
or 0
|
||||
)
|
||||
text, accepted = QInputDialog.getText(
|
||||
parent,
|
||||
title,
|
||||
label,
|
||||
echo_mode,
|
||||
default_text,
|
||||
)
|
||||
return str(text or ""), bool(accepted)
|
||||
except Exception as e:
|
||||
print(f"⚠️ ask_text Fehler: {e}")
|
||||
return default_text, False
|
||||
|
||||
|
||||
OverwriteDecision = Optional[Literal["overwrite", "append", "cancel"]]
|
||||
|
||||
|
||||
@@ -82,3 +126,101 @@ def ask_overwrite_append_cancel_custom(
|
||||
return "append"
|
||||
else: # cancel_btn
|
||||
return "cancel"
|
||||
|
||||
|
||||
class ProgressDialog:
|
||||
def __init__(self, total: int, title: str = "Fortschritt", label: str = "Verarbeite..."):
|
||||
self.total = max(total, 1)
|
||||
self._canceled = False
|
||||
|
||||
if QT_VERSION == 0:
|
||||
self.value = 0
|
||||
self.label = label
|
||||
self.title = title
|
||||
return
|
||||
|
||||
self._dlg = QProgressDialog(label, "Abbrechen", 0, self.total)
|
||||
self._dlg.setWindowTitle(title)
|
||||
|
||||
# Qt5 vs Qt6: WindowModality-Enum unterschiedlich verfügbar
|
||||
modality = None
|
||||
if hasattr(Qt, "WindowModality"):
|
||||
try:
|
||||
modality = Qt.WindowModality.WindowModal
|
||||
except Exception:
|
||||
modality = None
|
||||
if modality is None and hasattr(Qt, "WindowModal"):
|
||||
modality = Qt.WindowModal
|
||||
if modality is not None:
|
||||
try:
|
||||
self._dlg.setWindowModality(modality)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
self._dlg.setMinimumDuration(0)
|
||||
self._dlg.setAutoClose(False)
|
||||
self._dlg.setAutoReset(False)
|
||||
self._dlg.setValue(0)
|
||||
|
||||
def on_cancel():
|
||||
if self._dlg and self._dlg.value() >= self.total:
|
||||
# OK-Button am Ende
|
||||
self._dlg.close()
|
||||
return
|
||||
self._canceled = True
|
||||
self._dlg.close()
|
||||
|
||||
try:
|
||||
self._dlg.canceled.connect(on_cancel)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def set_total(self, total: int) -> None:
|
||||
self.total = max(total, 1)
|
||||
if QT_VERSION == 0:
|
||||
return
|
||||
|
||||
if self._dlg is not None:
|
||||
self._dlg.setMaximum(self.total)
|
||||
|
||||
def set_value(self, value: int) -> None:
|
||||
if QT_VERSION == 0:
|
||||
self.value = value
|
||||
return
|
||||
|
||||
if self._dlg is not None:
|
||||
self._dlg.setValue(min(value, self.total))
|
||||
if value >= self.total:
|
||||
self._dlg.setLabelText("Fertig. Klicken Sie auf OK, um das Fenster zu schließen.")
|
||||
self._dlg.setCancelButtonText("OK")
|
||||
QCoreApplication.processEvents()
|
||||
|
||||
def set_label(self, text: str) -> None:
|
||||
if QT_VERSION == 0:
|
||||
self.label = text
|
||||
return
|
||||
|
||||
if self._dlg is not None:
|
||||
self._dlg.setLabelText(text)
|
||||
QCoreApplication.processEvents()
|
||||
|
||||
def is_canceled(self) -> bool:
|
||||
if QT_VERSION == 0:
|
||||
return self._canceled
|
||||
|
||||
if self._dlg is not None:
|
||||
return self._canceled or self._dlg.wasCanceled()
|
||||
|
||||
return self._canceled
|
||||
|
||||
def close(self) -> None:
|
||||
if QT_VERSION == 0:
|
||||
return
|
||||
|
||||
if self._dlg is not None:
|
||||
self._dlg.close()
|
||||
|
||||
|
||||
def create_progress_dialog(total: int, title: str = "Fortschritt", label: str = "Verarbeite...") -> ProgressDialog:
|
||||
return ProgressDialog(total, title, label)
|
||||
|
||||
|
||||
@@ -57,6 +57,22 @@ def get_home_dir() -> Path:
|
||||
return Path.home()
|
||||
|
||||
|
||||
def is_absolute_path(path: _PathLike) -> bool:
|
||||
"""Prüft, ob ein Pfad absolut ist."""
|
||||
try:
|
||||
return Path(path).is_absolute()
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def basename(path: _PathLike) -> str:
|
||||
"""Gibt den finalen Namen des Pfades zurück (Dateiname oder Ordner)."""
|
||||
try:
|
||||
return Path(path).name
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------
|
||||
# Dateisystem-Eigenschaften
|
||||
# ---------------------------------------------------------
|
||||
@@ -75,3 +91,11 @@ def is_case_sensitive_fs() -> bool:
|
||||
|
||||
# Linux praktisch immer case-sensitiv
|
||||
return True
|
||||
|
||||
|
||||
def path_suffix(path: _PathLike) -> str:
|
||||
"""Gibt die Dateiendung eines Pfades zurück (inklusive Punkt)."""
|
||||
try:
|
||||
return Path(path).suffix
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
@@ -20,6 +20,19 @@ QgsNetworkAccessManager: Type[Any]
|
||||
Qgis: Type[Any]
|
||||
QgsMapLayerProxyModel: Type[Any]
|
||||
QgsVectorFileWriter: Type[Any] # neu: Schreib-API
|
||||
QgsFeature: Type[Any]
|
||||
QgsField: Type[Any]
|
||||
QgsGeometry: Type[Any]
|
||||
QgsFeatureRequest: Type[Any]
|
||||
QgsCoordinateTransform: Type[Any]
|
||||
QgsCoordinateReferenceSystem: Type[Any]
|
||||
QgsPrintLayout: Type[Any]
|
||||
QgsLayoutItemMap: Type[Any]
|
||||
QgsLayoutItemLabel: Type[Any]
|
||||
QgsLayoutPoint: Type[Any]
|
||||
QgsLayoutSize: Type[Any]
|
||||
QgsUnitTypes: Type[Any]
|
||||
QgsLayoutItem: Type[Any]
|
||||
|
||||
QGIS_AVAILABLE = False
|
||||
|
||||
@@ -36,9 +49,19 @@ try:
|
||||
Qgis as _Qgis,
|
||||
QgsMapLayerProxyModel as _QgsMaplLayerProxyModel,
|
||||
QgsVectorFileWriter as _QgsVectorFileWriter,
|
||||
QgsFeature as _QgsFeature,
|
||||
QgsFeature as _QgsFeature,
|
||||
QgsField as _QgsField,
|
||||
QgsGeometry as _QgsGeometry,
|
||||
QgsGeometry as _QgsGeometry,
|
||||
QgsFeatureRequest as _QgsFeatureRequest,
|
||||
QgsCoordinateTransform as _QgsCoordinateTransform,
|
||||
QgsCoordinateReferenceSystem as _QgsCoordinateReferenceSystem,
|
||||
QgsPrintLayout as _QgsPrintLayout,
|
||||
QgsLayoutItemMap as _QgsLayoutItemMap,
|
||||
QgsLayoutItemLabel as _QgsLayoutItemLabel,
|
||||
QgsLayoutPoint as _QgsLayoutPoint,
|
||||
QgsLayoutSize as _QgsLayoutSize,
|
||||
QgsUnitTypes as _QgsUnitTypes,
|
||||
QgsLayoutItem as _QgsLayoutItem,
|
||||
)
|
||||
|
||||
QgsProject = _QgsProject
|
||||
@@ -50,7 +73,17 @@ try:
|
||||
QgsVectorFileWriter = _QgsVectorFileWriter
|
||||
QgsFeature = _QgsFeature
|
||||
QgsField = _QgsField
|
||||
QgsGeometry = _QgsGeometry
|
||||
QgsGeometry = _QgsGeometry
|
||||
QgsFeatureRequest = _QgsFeatureRequest
|
||||
QgsCoordinateTransform = _QgsCoordinateTransform
|
||||
QgsCoordinateReferenceSystem = _QgsCoordinateReferenceSystem
|
||||
QgsPrintLayout = _QgsPrintLayout
|
||||
QgsLayoutItemMap = _QgsLayoutItemMap
|
||||
QgsLayoutItemLabel = _QgsLayoutItemLabel
|
||||
QgsLayoutPoint = _QgsLayoutPoint
|
||||
QgsLayoutSize = _QgsLayoutSize
|
||||
QgsUnitTypes = _QgsUnitTypes
|
||||
QgsLayoutItem = _QgsLayoutItem
|
||||
|
||||
QGIS_AVAILABLE = True
|
||||
|
||||
@@ -61,9 +94,17 @@ try:
|
||||
except Exception:
|
||||
QGIS_AVAILABLE = False
|
||||
|
||||
class _MockLayoutManager:
|
||||
def layoutByName(self, name: str):
|
||||
return None
|
||||
|
||||
def addLayout(self, layout: Any) -> bool:
|
||||
return True
|
||||
|
||||
class _MockQgsProject:
|
||||
def __init__(self):
|
||||
self._variables = {}
|
||||
self._layout_manager = _MockLayoutManager()
|
||||
|
||||
@staticmethod
|
||||
def instance() -> "_MockQgsProject":
|
||||
@@ -72,6 +113,9 @@ except Exception:
|
||||
def read(self) -> bool:
|
||||
return True
|
||||
|
||||
def layoutManager(self):
|
||||
return self._layout_manager
|
||||
|
||||
QgsProject = _MockQgsProject
|
||||
|
||||
class _MockQgsVectorLayer:
|
||||
@@ -122,6 +166,134 @@ except Exception:
|
||||
|
||||
QgsRasterLayer = _MockQgsRasterLayer
|
||||
|
||||
class _MockQgsPrintLayout:
|
||||
def __init__(self, project: Any):
|
||||
self.project = project
|
||||
self._name = ""
|
||||
self._page = _MockQgsLayoutPage()
|
||||
|
||||
def initializeDefaults(self) -> None:
|
||||
pass
|
||||
|
||||
def setName(self, name: str) -> None:
|
||||
self._name = name
|
||||
|
||||
def pageCollection(self):
|
||||
return self
|
||||
|
||||
def page(self, index: int):
|
||||
return self._page
|
||||
|
||||
def addLayoutItem(self, item: Any) -> None:
|
||||
pass
|
||||
|
||||
class _MockQgsLayoutPage:
|
||||
def setPageSize(self, size: Any) -> None:
|
||||
self.size = size
|
||||
|
||||
class _MockQgsLayoutItem:
|
||||
class ReferencePoint:
|
||||
LowerLeft = 0
|
||||
|
||||
class _MockQgsLayoutItemMap:
|
||||
def __init__(self, layout: Any):
|
||||
self.layout = layout
|
||||
|
||||
def setId(self, item_id: str) -> None:
|
||||
pass
|
||||
|
||||
def setExtent(self, extent: Any) -> None:
|
||||
pass
|
||||
|
||||
def setScale(self, scale: float) -> None:
|
||||
pass
|
||||
|
||||
def attemptMove(self, point: Any) -> None:
|
||||
pass
|
||||
|
||||
def attemptResize(self, size: Any) -> None:
|
||||
pass
|
||||
|
||||
def setFollowVisibilityPreset(self, active: bool) -> None:
|
||||
pass
|
||||
|
||||
def setFollowVisibilityPresetName(self, name: str) -> None:
|
||||
pass
|
||||
|
||||
class _MockQgsLayoutItemLabel:
|
||||
ModeHtml = 1
|
||||
|
||||
def __init__(self, layout: Any):
|
||||
self.layout = layout
|
||||
|
||||
def setId(self, item_id: str) -> None:
|
||||
pass
|
||||
|
||||
def setText(self, text: str) -> None:
|
||||
pass
|
||||
|
||||
def setMode(self, mode: Any) -> None:
|
||||
pass
|
||||
|
||||
def setFont(self, font: Any) -> None:
|
||||
pass
|
||||
|
||||
def setReferencePoint(self, point: Any) -> None:
|
||||
pass
|
||||
|
||||
def attemptMove(self, point: Any) -> None:
|
||||
pass
|
||||
|
||||
def attemptResize(self, size: Any) -> None:
|
||||
pass
|
||||
|
||||
class _MockQgsLayoutPoint:
|
||||
def __init__(self, x: float, y: float, unit: Any):
|
||||
self.x = x
|
||||
self.y = y
|
||||
self.unit = unit
|
||||
|
||||
class _MockQgsLayoutSize:
|
||||
def __init__(self, width: float, height: float, unit: Any):
|
||||
self.width = width
|
||||
self.height = height
|
||||
self.unit = unit
|
||||
|
||||
class _MockQgsUnitTypes:
|
||||
LayoutMillimeters = 0
|
||||
|
||||
QgsPrintLayout = _MockQgsPrintLayout
|
||||
QgsLayoutItemMap = _MockQgsLayoutItemMap
|
||||
QgsLayoutItemLabel = _MockQgsLayoutItemLabel
|
||||
QgsLayoutPoint = _MockQgsLayoutPoint
|
||||
QgsLayoutSize = _MockQgsLayoutSize
|
||||
QgsUnitTypes = _MockQgsUnitTypes
|
||||
QgsLayoutItem = _MockQgsLayoutItem
|
||||
|
||||
class _MockQgsFeatureRequest:
|
||||
def __init__(self):
|
||||
self._filter_rect = None
|
||||
|
||||
def setFilterRect(self, rect):
|
||||
self._filter_rect = rect
|
||||
return self
|
||||
|
||||
QgsFeatureRequest = _MockQgsFeatureRequest
|
||||
|
||||
class _MockQgsCoordinateTransform:
|
||||
def __init__(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def transformBoundingBox(self, rect):
|
||||
return rect
|
||||
|
||||
class _MockQgsCoordinateReferenceSystem:
|
||||
def __init__(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
QgsCoordinateTransform = _MockQgsCoordinateTransform
|
||||
QgsCoordinateReferenceSystem = _MockQgsCoordinateReferenceSystem
|
||||
|
||||
QgsNetworkAccessManager = _MockQgsNetworkAccessManager
|
||||
|
||||
class _MockQgis:
|
||||
|
||||
@@ -76,6 +76,9 @@ except Exception:
|
||||
def removeToolBar(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def openLayoutDesigner(self, layout):
|
||||
return layout
|
||||
|
||||
iface = _MockIface()
|
||||
|
||||
class _MockQgsFileWidget:
|
||||
@@ -132,6 +135,13 @@ def get_main_window():
|
||||
return None
|
||||
|
||||
|
||||
def open_layout_designer(layout: Any) -> Any:
|
||||
try:
|
||||
return iface.openLayoutDesigner(layout)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------
|
||||
# Dock-Handling
|
||||
# ---------------------------------------------------------
|
||||
|
||||
@@ -10,13 +10,17 @@ YES: Optional[Any] = None
|
||||
NO: Optional[Any] = None
|
||||
CANCEL: Optional[Any] = None
|
||||
ICON_QUESTION: Optional[Any] = None
|
||||
QVariant: Type[Any] = object
|
||||
|
||||
|
||||
|
||||
# Qt-Klassen (werden dynamisch gesetzt)
|
||||
QDockWidget: Type[Any] = object
|
||||
QMessageBox: Type[Any] = object
|
||||
QFileDialog: Type[Any] = object
|
||||
QProgressDialog: Type[Any] = object
|
||||
QEventLoop: Type[Any] = object
|
||||
QTimer: Type[Any] = object
|
||||
QUrl: Type[Any] = object
|
||||
QNetworkRequest: Type[Any] = object
|
||||
QNetworkReply: Type[Any] = object
|
||||
@@ -25,6 +29,7 @@ QWidget: Type[Any] = object
|
||||
QGridLayout: Type[Any] = object
|
||||
QLabel: Type[Any] = object
|
||||
QLineEdit: Type[Any] = object
|
||||
QInputDialog: Type[Any] = object
|
||||
QGroupBox: Type[Any] = object
|
||||
QVBoxLayout: Type[Any] = object
|
||||
QPushButton: Type[Any] = object
|
||||
@@ -37,7 +42,9 @@ QToolButton: Type[Any] = object
|
||||
QSizePolicy: Type[Any] = object
|
||||
Qt: Type[Any] = object
|
||||
QComboBox: Type[Any] = object
|
||||
QCheckBox: Type[Any] = object
|
||||
QHBoxLayout: Type[Any] = object
|
||||
QFont: Type[Any] = object
|
||||
|
||||
|
||||
def exec_dialog(dialog: Any) -> Any:
|
||||
@@ -64,10 +71,12 @@ try:
|
||||
from qgis.PyQt.QtWidgets import (
|
||||
QMessageBox as _QMessageBox,
|
||||
QFileDialog as _QFileDialog,
|
||||
QProgressDialog as _QProgressDialog,
|
||||
QWidget as _QWidget,
|
||||
QGridLayout as _QGridLayout,
|
||||
QLabel as _QLabel,
|
||||
QLineEdit as _QLineEdit,
|
||||
QInputDialog as _QInputDialog,
|
||||
QGroupBox as _QGroupBox,
|
||||
QVBoxLayout as _QVBoxLayout,
|
||||
QPushButton as _QPushButton,
|
||||
@@ -80,10 +89,13 @@ try:
|
||||
QToolButton as _QToolButton,
|
||||
QSizePolicy as _QSizePolicy,
|
||||
QComboBox as _QComboBox,
|
||||
QCheckBox as _QCheckBox,
|
||||
QHBoxLayout as _QHBoxLayout,
|
||||
)
|
||||
from qgis.PyQt.QtGui import QFont as _QFont
|
||||
from qgis.PyQt.QtCore import (
|
||||
QEventLoop as _QEventLoop,
|
||||
QTimer as _QTimer,
|
||||
QUrl as _QUrl,
|
||||
QCoreApplication as _QCoreApplication,
|
||||
Qt as _Qt,
|
||||
@@ -98,7 +110,10 @@ try:
|
||||
QT_VERSION = 6
|
||||
QMessageBox = _QMessageBox
|
||||
QFileDialog = _QFileDialog
|
||||
QProgressDialog = _QProgressDialog
|
||||
QProgressDialog = _QProgressDialog
|
||||
QEventLoop = _QEventLoop
|
||||
QTimer = _QTimer
|
||||
QUrl = _QUrl
|
||||
QNetworkRequest = _QNetworkRequest
|
||||
QNetworkReply = _QNetworkReply
|
||||
@@ -109,6 +124,7 @@ try:
|
||||
QGridLayout = _QGridLayout
|
||||
QLabel = _QLabel
|
||||
QLineEdit = _QLineEdit
|
||||
QInputDialog = _QInputDialog
|
||||
QGroupBox = _QGroupBox
|
||||
QVBoxLayout = _QVBoxLayout
|
||||
QPushButton = _QPushButton
|
||||
@@ -120,8 +136,10 @@ try:
|
||||
QToolButton = _QToolButton
|
||||
QSizePolicy = _QSizePolicy
|
||||
QComboBox = _QComboBox
|
||||
QCheckBox = _QCheckBox
|
||||
QVariant = _QVariant
|
||||
QHBoxLayout= _QHBoxLayout
|
||||
QHBoxLayout = _QHBoxLayout
|
||||
QFont = _QFont
|
||||
# ✅ QT6 ENUMS
|
||||
YES = QMessageBox.StandardButton.Yes
|
||||
NO = QMessageBox.StandardButton.No
|
||||
@@ -158,6 +176,7 @@ except (ImportError, AttributeError):
|
||||
QGridLayout as _QGridLayout,
|
||||
QLabel as _QLabel,
|
||||
QLineEdit as _QLineEdit,
|
||||
QInputDialog as _QInputDialog,
|
||||
QGroupBox as _QGroupBox,
|
||||
QVBoxLayout as _QVBoxLayout,
|
||||
QPushButton as _QPushButton,
|
||||
@@ -170,10 +189,13 @@ except (ImportError, AttributeError):
|
||||
QToolButton as _QToolButton,
|
||||
QSizePolicy as _QSizePolicy,
|
||||
QComboBox as _QComboBox,
|
||||
QCheckBox as _QCheckBox,
|
||||
QHBoxLayout as _QHBoxLayout,
|
||||
)
|
||||
from PyQt5.QtGui import QFont as _QFont
|
||||
from PyQt5.QtCore import (
|
||||
QEventLoop as _QEventLoop,
|
||||
QTimer as _QTimer,
|
||||
QUrl as _QUrl,
|
||||
QCoreApplication as _QCoreApplication,
|
||||
Qt as _Qt,
|
||||
@@ -189,6 +211,7 @@ except (ImportError, AttributeError):
|
||||
QMessageBox = _QMessageBox
|
||||
QFileDialog = _QFileDialog
|
||||
QEventLoop = _QEventLoop
|
||||
QTimer = _QTimer
|
||||
QUrl = _QUrl
|
||||
QNetworkRequest = _QNetworkRequest
|
||||
QNetworkReply = _QNetworkReply
|
||||
@@ -199,6 +222,7 @@ except (ImportError, AttributeError):
|
||||
QGridLayout = _QGridLayout
|
||||
QLabel = _QLabel
|
||||
QLineEdit = _QLineEdit
|
||||
QInputDialog = _QInputDialog
|
||||
QGroupBox = _QGroupBox
|
||||
QVBoxLayout = _QVBoxLayout
|
||||
QPushButton = _QPushButton
|
||||
@@ -210,8 +234,10 @@ except (ImportError, AttributeError):
|
||||
QToolButton = _QToolButton
|
||||
QSizePolicy = _QSizePolicy
|
||||
QComboBox = _QComboBox
|
||||
QCheckBox = _QCheckBox
|
||||
QVariant = _QVariant
|
||||
QHBoxLayout = _QHBoxLayout
|
||||
QHBoxLayout= _QHBoxLayout
|
||||
QFont = _QFont
|
||||
|
||||
# ✅ PYQT5 ENUMS
|
||||
YES = QMessageBox.Yes
|
||||
@@ -283,12 +309,30 @@ except (ImportError, AttributeError):
|
||||
|
||||
QFileDialog = _MockQFileDialog
|
||||
|
||||
class _MockQInputDialog:
|
||||
@staticmethod
|
||||
def getText(parent, title, label, mode=None, text=""):
|
||||
return text, True
|
||||
|
||||
QInputDialog = _MockQInputDialog
|
||||
|
||||
class _MockQEventLoop:
|
||||
def exec(self) -> int: return 0
|
||||
def quit(self) -> None: pass
|
||||
|
||||
QEventLoop = _MockQEventLoop
|
||||
|
||||
class _MockQTimer:
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.timeout = type('Signal', (), {
|
||||
'connect': lambda s, cb: None,
|
||||
})()
|
||||
def setSingleShot(self, v: bool) -> None: pass
|
||||
def start(self, ms: int) -> None: pass
|
||||
def stop(self) -> None: pass
|
||||
|
||||
QTimer = _MockQTimer
|
||||
|
||||
class _MockQUrl(str):
|
||||
def isValid(self) -> bool: return True
|
||||
|
||||
@@ -319,11 +363,18 @@ except (ImportError, AttributeError):
|
||||
class _MockLabel:
|
||||
def __init__(self, text: str = ""): self._text = text
|
||||
class _MockLineEdit:
|
||||
Normal = 0
|
||||
|
||||
def __init__(self, *args, **kwargs): self._text = ""
|
||||
def text(self) -> str: return self._text
|
||||
def setText(self, value: str) -> None: self._text = value
|
||||
|
||||
class _MockButton:
|
||||
class _MockFont:
|
||||
def __init__(self, family: str = "", pointSize: int = 10):
|
||||
self.family = family
|
||||
self.pointSize = pointSize
|
||||
|
||||
class _MockButton:
|
||||
def __init__(self, *args, **kwargs): self.clicked = lambda *a, **k: None
|
||||
|
||||
QWidget = _MockWidget
|
||||
@@ -333,6 +384,7 @@ except (ImportError, AttributeError):
|
||||
QGroupBox = _MockWidget
|
||||
QVBoxLayout = _MockLayout
|
||||
QPushButton = _MockButton
|
||||
QFont = _MockFont
|
||||
QCoreApplication = object()
|
||||
|
||||
class _MockQt:
|
||||
@@ -507,6 +559,22 @@ except (ImportError, AttributeError):
|
||||
def setContentsMargins(self, *args, **kwargs):
|
||||
pass
|
||||
QHBoxLayout = _MockQHBoxLayout
|
||||
|
||||
class _MockQCheckBox:
|
||||
def __init__(self, text: str = "", *args, **kwargs):
|
||||
self._text = text
|
||||
self._checked = False
|
||||
|
||||
def setText(self, text: str) -> None:
|
||||
self._text = text
|
||||
|
||||
def isChecked(self) -> bool:
|
||||
return self._checked
|
||||
|
||||
def setChecked(self, checked: bool) -> None:
|
||||
self._checked = checked
|
||||
|
||||
QCheckBox = _MockQCheckBox
|
||||
def exec_dialog(dialog: Any) -> Any:
|
||||
return YES
|
||||
# --------------------------- TEST ---------------------------
|
||||
|
||||
@@ -6,6 +6,7 @@ from pathlib import Path
|
||||
from typing import Union
|
||||
import sys
|
||||
|
||||
from sn_basis.functions.os_wrapper import is_absolute_path, basename
|
||||
|
||||
_PathLike = Union[str, Path]
|
||||
|
||||
|
||||
14
metadata.txt
Normal file
14
metadata.txt
Normal file
@@ -0,0 +1,14 @@
|
||||
[general]
|
||||
name=LNO Sachsen | Plugin Basisfunktionen
|
||||
qgisMinimumVersion=3.40
|
||||
qgisMaximumVersion=3.99
|
||||
description=Plugin mit Basisfunktionen
|
||||
version=26.3.6-unstable
|
||||
author=Daniel Helbig
|
||||
email=daniel.helbig@kreis-meissen.de
|
||||
homepage=https://entwicklung.flurneuordnung-sachsen.de/AG_QGIS/Plugin_SN_Basis
|
||||
tracker=https://entwicklung.flurneuordnung-sachsen.de/AG_QGIS/Plugin_SN_Basis/issues
|
||||
repository=https://entwicklung.flurneuordnung-sachsen.de/AG_QGIS/Plugin_SN_Basis/src/branch/unstable/
|
||||
experimental=true
|
||||
deprecated=false
|
||||
supportsQt6=true
|
||||
@@ -2,14 +2,14 @@
|
||||
DataGrabber module
|
||||
==================
|
||||
|
||||
UI‑freier Orchestrator für die Prüfung und Klassifikation von Datenquellen.
|
||||
UI-freier Orchestrator für die Prüfung und Klassifikation von Datenquellen.
|
||||
|
||||
Der DataGrabber:
|
||||
- klassifiziert die übergebene Quelle (Datei, Dienst, Datenbank, Excel),
|
||||
- ruft passende Prüfer (Dateipruefer, Linkpruefer, Layerpruefer, Stilpruefer) auf,
|
||||
- sammelt alle rohen ``pruef_ergebnis``‑Objekte,
|
||||
- sammelt alle rohen ``pruef_ergebnis``-Objekte,
|
||||
- aggregiert diese zu einem zusammenfassenden Ergebnis,
|
||||
- **löst selbst keinerlei UI‑Interaktion aus**.
|
||||
- **löst selbst keinerlei UI-Interaktion aus**.
|
||||
|
||||
Alle Nutzerinteraktionen (MessageBar, QMessageBox, Logging) erfolgen
|
||||
ausschließlich über den ``Pruefmanager`` im aufrufenden Kontext (UI / Pipeline).
|
||||
@@ -17,8 +17,11 @@ ausschließlich über den ``Pruefmanager`` im aufrufenden Kontext (UI / Pipeline
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from typing import Any, Dict, List, Mapping, Optional, Tuple, Literal
|
||||
|
||||
from sn_basis.functions.os_wrapper import basename, path_suffix
|
||||
|
||||
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
|
||||
from sn_basis.modules.Pruefmanager import Pruefmanager
|
||||
|
||||
@@ -27,6 +30,7 @@ from sn_basis.modules.linkpruefer import Linkpruefer
|
||||
from sn_basis.modules.layerpruefer import Layerpruefer
|
||||
from sn_basis.modules.stilpruefer import Stilpruefer
|
||||
from sn_basis.modules.excel_importer import ExcelImporter
|
||||
from sn_plan41.modules.listenauswerter import Listenauswerter
|
||||
|
||||
|
||||
SourceType = Literal["service", "database", "excel", "unknown"]
|
||||
@@ -38,9 +42,6 @@ class DataGrabber:
|
||||
"""
|
||||
Analysiert und prüft Datenquellen für den Fachdatenabruf.
|
||||
|
||||
Der DataGrabber ist **UI‑frei**. Er erzeugt ausschließlich rohe
|
||||
``pruef_ergebnis``‑Objekte und überlässt deren Verarbeitung
|
||||
vollständig dem aufrufenden Code.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@@ -55,9 +56,9 @@ class DataGrabber:
|
||||
) -> None:
|
||||
self.pruefmanager = pruefmanager
|
||||
self._datei_pruefer_cls = datei_pruefer_cls
|
||||
self.link_pruefer = link_pruefer
|
||||
self.layer_pruefer = layer_pruefer
|
||||
self.stil_pruefer = stil_pruefer
|
||||
self.link_pruefer = link_pruefer or Linkpruefer()
|
||||
self.layer_pruefer = layer_pruefer or Layerpruefer()
|
||||
self.stil_pruefer = stil_pruefer or Stilpruefer()
|
||||
self._excel_importer_cls = excel_importer_cls
|
||||
|
||||
self._source: Optional[str] = None
|
||||
@@ -69,23 +70,61 @@ class DataGrabber:
|
||||
"""Setzt die aktuell zu untersuchende Rohquelle."""
|
||||
self._source = source
|
||||
|
||||
def analyze_source_type(self, source: str) -> SourceType:
|
||||
"""
|
||||
Klassifiziert die Quelle.
|
||||
SourceType = str # "excel" | "datenbank" | "dienst" | "unbekannt"
|
||||
|
||||
Aktuell Platzhalter – liefert ``"unknown"``.
|
||||
|
||||
def analyze_source_type(self, quelle: str) -> Tuple[SourceType, pruef_ergebnis]:
|
||||
"""
|
||||
return "unknown"
|
||||
Klassifiziert die Quelle und liefert das zugehörige pruef_ergebnis.
|
||||
|
||||
Reihenfolge:
|
||||
1. Dateipruefer (Datei + Dateityp)
|
||||
2. Linkpruefer (Dienst)
|
||||
"""
|
||||
|
||||
# --------------------------------------------------
|
||||
# 1. Datei prüfen (inkl. Typ-Erkennung)
|
||||
# --------------------------------------------------
|
||||
dateipruefer = Dateipruefer(pfad=quelle)
|
||||
datei_ergebnis = dateipruefer.pruefe()
|
||||
|
||||
if datei_ergebnis.ok:
|
||||
suffix = path_suffix(datei_ergebnis.kontext).lower()
|
||||
print(f"[DataGrabber] Debug: analyze_source_type source={quelle} -> suffix={suffix}")
|
||||
|
||||
if suffix == ".xlsx":
|
||||
return "excel", datei_ergebnis
|
||||
|
||||
if suffix in (".gpkg", ".sqlite"):
|
||||
return "datenbank", datei_ergebnis
|
||||
|
||||
return "unbekannter_dateityp", datei_ergebnis
|
||||
|
||||
# --------------------------------------------------
|
||||
# 2. Keine Datei → Link prüfen
|
||||
# --------------------------------------------------
|
||||
linkpruefer = Linkpruefer()
|
||||
link_ergebnis = linkpruefer.pruefe(quelle)
|
||||
|
||||
if link_ergebnis.ok:
|
||||
return "dienst", link_ergebnis
|
||||
|
||||
# --------------------------------------------------
|
||||
# 3. Weder Datei noch Dienst
|
||||
# --------------------------------------------------
|
||||
|
||||
return "unbekannte_quelle", link_ergebnis
|
||||
|
||||
def run(self, source: str) -> Tuple[SourceDict, pruef_ergebnis]:
|
||||
"""
|
||||
Führt die vollständige Quellprüfung aus.
|
||||
|
||||
Diese Methode ist **UI‑frei**. Sie gibt rohe Ergebnisse zurück,
|
||||
Diese Methode ist **UIfrei**. Sie gibt rohe Ergebnisse zurück,
|
||||
die vom Aufrufer über den ``Pruefmanager`` verarbeitet werden.
|
||||
"""
|
||||
self.set_source(source)
|
||||
source_type = self.analyze_source_type(source)
|
||||
source_type, source_result = self.analyze_source_type(source)
|
||||
print(f"[DataGrabber] Debug: run source={source} -> source_type={source_type}")
|
||||
|
||||
source_dict: SourceDict = {}
|
||||
partial_results: List[pruef_ergebnis] = []
|
||||
@@ -97,14 +136,7 @@ class DataGrabber:
|
||||
elif source_type == "service":
|
||||
source_dict, partial_results = self._process_service_source(source)
|
||||
else:
|
||||
partial_results.append(
|
||||
pruef_ergebnis(
|
||||
ok=False,
|
||||
meldung="Quelle konnte nicht klassifiziert werden",
|
||||
aktion="kein_dateipfad",
|
||||
kontext={"source": source},
|
||||
)
|
||||
)
|
||||
partial_results.append(source_result)
|
||||
|
||||
summary = self._aggregate_results(source, source_dict, partial_results)
|
||||
return source_dict, summary
|
||||
@@ -115,9 +147,150 @@ class DataGrabber:
|
||||
def _process_excel_source(
|
||||
self, filepath: str
|
||||
) -> Tuple[SourceDict, List[pruef_ergebnis]]:
|
||||
source_dict: SourceDict = {}
|
||||
source_dict: SourceDict = {"rows": []}
|
||||
results: List[pruef_ergebnis] = []
|
||||
return source_dict, results
|
||||
|
||||
rows = ExcelImporter(filepath, self.pruefmanager).import_xlsx()
|
||||
print(f"[DataGrabber] Debug: Excel-Linkliste geladen: {filepath}")
|
||||
print(f"[DataGrabber] Debug: raw rows count: {len(rows)}")
|
||||
if rows:
|
||||
first = rows[:min(5, len(rows))]
|
||||
print(f"[DataGrabber] Debug: first rows: {first}")
|
||||
|
||||
if not rows:
|
||||
return source_dict, results
|
||||
|
||||
required_keys = {"ident", "gruppe", "kartenebene", "inhalt", "link", "provider", "stildatei"}
|
||||
|
||||
def extract_url(raw_link: str, provider: str) -> str:
|
||||
if not raw_link:
|
||||
return ""
|
||||
if not isinstance(raw_link, str):
|
||||
return str(raw_link)
|
||||
|
||||
if provider == "wfs":
|
||||
url_match = re.search(r"url\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE)
|
||||
type_match = re.search(r"typename\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE)
|
||||
if url_match:
|
||||
url = url_match.group(1).strip()
|
||||
if type_match:
|
||||
typename = type_match.group(1).strip()
|
||||
separator = "&" if "?" in url else "?"
|
||||
return f"url={url}{separator}service=WFS&request=GetFeature&typename={typename}"
|
||||
return f"url={url}"
|
||||
|
||||
if provider == "wms":
|
||||
# falls WMS-URL als url='...' vorliegt
|
||||
match = re.search(r"url\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE)
|
||||
if match:
|
||||
return match.group(1).strip()
|
||||
|
||||
if provider == "rest":
|
||||
# REST/ArcGIS-Server: direkt nutzen
|
||||
match = re.search(r"url\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE)
|
||||
if match:
|
||||
return match.group(1).strip()
|
||||
|
||||
# allgemeines Rückfallverhalten
|
||||
match = re.search(r"url\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE)
|
||||
if match:
|
||||
return match.group(1).strip()
|
||||
return raw_link.strip()
|
||||
|
||||
for row_index, raw_row in enumerate(rows, start=2):
|
||||
if not isinstance(raw_row, Mapping):
|
||||
pe = pruef_ergebnis(
|
||||
ok=False,
|
||||
meldung="Linklisten-Zeile ist nicht als Dictionary formatiert.",
|
||||
aktion="ungueltige_zeile",
|
||||
kontext={"zeile": row_index, "wert": raw_row},
|
||||
)
|
||||
results.append(self.pruefmanager.verarbeite(pe))
|
||||
continue
|
||||
|
||||
normalized = {str(k).strip().lower(): v for k, v in raw_row.items() if k is not None}
|
||||
if not required_keys.issubset(normalized.keys()):
|
||||
missing = required_keys.difference(normalized.keys())
|
||||
pe = pruef_ergebnis(
|
||||
ok=False,
|
||||
meldung=f"Linkliste fehlt erforderliche Spalten: {', '.join(sorted(missing))}",
|
||||
aktion="spaltenfehlend",
|
||||
kontext={"zeile": row_index, "fehlend": sorted(missing)},
|
||||
)
|
||||
results.append(self.pruefmanager.verarbeite(pe))
|
||||
continue
|
||||
|
||||
ident = normalized.get("ident")
|
||||
link_raw = normalized.get("link") or ""
|
||||
provider = str(normalized.get("provider") or "").strip().lower()
|
||||
stildatei_raw = normalized.get("stildatei") or ""
|
||||
stildatei = None
|
||||
|
||||
if stildatei_raw and str(stildatei_raw).strip():
|
||||
style_result = self.stil_pruefer.pruefe(str(stildatei_raw).strip())
|
||||
results.append(self.pruefmanager.verarbeite(style_result))
|
||||
if style_result.ok:
|
||||
# Style-Pfad in der Datenkette beibehalten (absolut, wenn vorhanden).
|
||||
stildatei = str(style_result.kontext or stildatei_raw).strip()
|
||||
else:
|
||||
stildatei = None
|
||||
else:
|
||||
results.append(self.pruefmanager.verarbeite(pruef_ergebnis(ok=True, meldung="Kein Stil angegeben", aktion="stil_optional", kontext=None)))
|
||||
stildatei = None
|
||||
|
||||
if not ident or not link_raw or not provider:
|
||||
pe = pruef_ergebnis(
|
||||
ok=False,
|
||||
meldung="Linklisten-Zeile hat fehlende Pflichtfelder (ident/link/provider).",
|
||||
aktion="pflichtfelder_fehlen",
|
||||
kontext={"zeile": row_index, "daten": raw_row},
|
||||
)
|
||||
results.append(self.pruefmanager.verarbeite(pe))
|
||||
continue
|
||||
|
||||
link_url = extract_url(link_raw, provider)
|
||||
|
||||
# Provider-abhängige Linkvalidierung
|
||||
if provider in ("wfs", "wms", "rest"):
|
||||
# Webdienste: wir akzeptieren die URL-Form und prüfen nicht per network_head.
|
||||
link_result = pruef_ergebnis(ok=True, meldung="Service-Link angenommen", aktion="service_link", kontext=link_url)
|
||||
elif provider in ("ogr", "gpkg", "shp", "geojson"):
|
||||
# OGR/Pfad: mit Linkpruefer (pfad oder lokale Datei) prüfen
|
||||
link_result = self.link_pruefer.pruefe(link_url)
|
||||
else:
|
||||
link_result = self.link_pruefer.pruefe(link_url)
|
||||
|
||||
results.append(self.pruefmanager.verarbeite(link_result))
|
||||
|
||||
# stildatei wurde bereits oben geprüft und ggf. auf Dateiname gesetzt oder auf None
|
||||
|
||||
if not link_result.ok:
|
||||
self.pruefmanager.verarbeite(
|
||||
pruef_ergebnis(
|
||||
ok=False,
|
||||
meldung=f"Zeile {row_index}: fehlerhafter Link",
|
||||
aktion="link_unvollstaendig",
|
||||
kontext={"row": row_index, "ident": ident},
|
||||
)
|
||||
)
|
||||
continue
|
||||
|
||||
result_row = {
|
||||
"ident": ident,
|
||||
"gruppe": normalized.get("gruppe"),
|
||||
"Kartenebene": normalized.get("kartenebene"),
|
||||
"Inhalt": normalized.get("inhalt"),
|
||||
"Link": link_url,
|
||||
"Provider": provider,
|
||||
"stildatei": stildatei,
|
||||
}
|
||||
source_dict["rows"].append(result_row)
|
||||
|
||||
# Validierung über Listenauswerter
|
||||
listenauswerter = Listenauswerter(self.pruefmanager, self.stil_pruefer or Stilpruefer())
|
||||
validated, validation_results = listenauswerter.validate_rows(source_dict)
|
||||
results.extend(validation_results)
|
||||
return validated, results
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Datenbank‑Quellen
|
||||
@@ -125,6 +298,7 @@ class DataGrabber:
|
||||
def _process_database_source(
|
||||
self, db_path: str
|
||||
) -> Tuple[SourceDict, List[pruef_ergebnis]]:
|
||||
print(f"[DataGrabber] Debug: _process_database_source called, db_path={db_path}")
|
||||
source_dict: SourceDict = {}
|
||||
results: List[pruef_ergebnis] = []
|
||||
return source_dict, results
|
||||
@@ -149,24 +323,29 @@ class DataGrabber:
|
||||
partial_results: List[pruef_ergebnis],
|
||||
) -> pruef_ergebnis:
|
||||
"""
|
||||
Aggregiert Einzelprüfungen zu einem Gesamt‑``pruef_ergebnis``.
|
||||
Aggregiert Einzelprüfungen zu einem Gesamt-``pruef_ergebnis``.
|
||||
|
||||
**Keine UI‑Interaktion.**
|
||||
**Keine UI-Interaktion.**
|
||||
"""
|
||||
if source_dict:
|
||||
rows = source_dict.get("rows") if isinstance(source_dict, dict) else None
|
||||
if rows:
|
||||
return pruef_ergebnis(
|
||||
ok=True,
|
||||
meldung="Quelle erfolgreich geprüft",
|
||||
aktion="ok",
|
||||
kontext={
|
||||
"source": source,
|
||||
"valid_entries": sum(len(v) for v in source_dict.values()),
|
||||
"valid_entries": len(rows),
|
||||
},
|
||||
)
|
||||
|
||||
# Wenn die Linkliste zwar gelesen wurde, aber keine gültigen Zeilen verfügbar sind, geben wir spezifischere Infos zurück.
|
||||
return pruef_ergebnis(
|
||||
ok=False,
|
||||
meldung="Keine gültigen Einträge in der Quelle gefunden",
|
||||
aktion="read_error",
|
||||
kontext={"source": source},
|
||||
meldung="Keine validen Einträge in der Linkliste gefunden",
|
||||
aktion="keine_validen_eintraege",
|
||||
kontext={
|
||||
"source": source,
|
||||
"eintraege_gesamt": len(source_dict.get("rows", [])),
|
||||
},
|
||||
)
|
||||
|
||||
@@ -6,11 +6,11 @@ der Anforderungen 1-2.e (leerer Pfad, fehlende Datei, bestehende Datei).
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from typing import Optional, Literal
|
||||
|
||||
from sn_basis.functions.sys_wrapper import join_path, file_exists
|
||||
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis, PruefAktion
|
||||
|
||||
DateiTyp = Literal["excel","datenbank","unbekannt"]
|
||||
|
||||
class Dateipruefer:
|
||||
"""
|
||||
@@ -74,10 +74,25 @@ class Dateipruefer:
|
||||
# ------------------------------------------------------------------
|
||||
# Hilfsfunktionen
|
||||
# ------------------------------------------------------------------
|
||||
def erkenne_dateityp(self, pfad: Path) -> DateiTyp:
|
||||
"""
|
||||
Erkennt den Dateityp anhand der Endung.
|
||||
"""
|
||||
suffix = pfad.suffix.lower()
|
||||
|
||||
if suffix == ".xlsx":
|
||||
return "excel"
|
||||
|
||||
if suffix in (".gpkg", ".sqlite"):
|
||||
return "datenbank"
|
||||
|
||||
return "unbekannt"
|
||||
|
||||
def _pfad(self, relativer_pfad: str) -> Path:
|
||||
"""Erzeugt OS-unabhängigen Pfad relativ zum Basisverzeichnis."""
|
||||
return join_path(self.basis_pfad, relativer_pfad)
|
||||
|
||||
|
||||
def _ist_leer(self) -> bool:
|
||||
"""
|
||||
Prüft robust, ob Eingabe als „leer" zu behandeln ist.
|
||||
@@ -134,6 +149,31 @@ class Dateipruefer:
|
||||
|
||||
# 2. Pfad normalisieren
|
||||
pfad = self._pfad(self.pfad.strip())
|
||||
#Excel-dateien erkennen
|
||||
dateityp = self.erkenne_dateityp(pfad)
|
||||
|
||||
if dateityp == "excel":
|
||||
if not file_exists(pfad):
|
||||
return pruef_ergebnis(
|
||||
ok=False,
|
||||
meldung=f"Excel-Datei '{self.pfad}' wurde nicht gefunden.",
|
||||
aktion="datei_nicht_gefunden",
|
||||
kontext=pfad,
|
||||
)
|
||||
|
||||
return pruef_ergebnis(
|
||||
ok=True,
|
||||
meldung="Excel-Datei ist gültig.",
|
||||
aktion="ok",
|
||||
kontext=pfad,
|
||||
)
|
||||
if dateityp != "datenbank":
|
||||
return pruef_ergebnis(
|
||||
ok=False,
|
||||
meldung=f"Der Pfad '{self.pfad}' ist kein unterstützter Dateityp.",
|
||||
aktion="unbekannter_dateityp",
|
||||
kontext=pfad,
|
||||
)
|
||||
|
||||
# 🆕 2.c: Ungültiger GPKG-Pfad?
|
||||
if not self.verfahrens_db_modus or not self._ist_gueltiger_gpkg_pfad(pfad):
|
||||
|
||||
@@ -17,10 +17,11 @@ Designprinzipien
|
||||
- Die Methode ist pdoc-kompatibel dokumentiert und bewusst einfach gehalten.
|
||||
"""
|
||||
|
||||
from typing import Any, Dict, List, Mapping, Optional, Tuple
|
||||
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple
|
||||
|
||||
from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse
|
||||
import json
|
||||
import time
|
||||
|
||||
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
|
||||
from sn_basis.functions import qgiscore_wrapper as qgiscore
|
||||
@@ -59,6 +60,7 @@ class Datenabruf:
|
||||
verfahrensgebiet_layer: Any,
|
||||
speicherort: str,
|
||||
pruef_ergebnisse: Optional[List[Any]] = None,
|
||||
progress: Optional[Any] = None,
|
||||
) -> Tuple[Dict[str, Any], List[Any]]:
|
||||
"""
|
||||
Ruft für alle Zeilen in ``result_dict["rows"]`` die Fachdaten ab und
|
||||
@@ -82,6 +84,10 @@ class Datenabruf:
|
||||
|
||||
# 1) Räumliche Filtergeometrie bestimmen (BBox oder None)
|
||||
bbox_geom = self._determine_spatial_filter(raumfilter, verfahrensgebiet_layer)
|
||||
filter_crs_authid = None
|
||||
if isinstance(bbox_geom, dict):
|
||||
raw_crs = bbox_geom.get("crs_authid")
|
||||
filter_crs_authid = str(raw_crs) if raw_crs else None
|
||||
|
||||
# Globale Logs über alle Dienste hinweg
|
||||
log_geladen: Dict[str, int] = {}
|
||||
@@ -90,7 +96,20 @@ class Datenabruf:
|
||||
log_ausserhalb: Dict[str, int] = {}
|
||||
|
||||
# 2) Über alle Zeilen iterieren
|
||||
for row in rows:
|
||||
total_rows = len(rows)
|
||||
for idx, row in enumerate(rows, start=1):
|
||||
if progress is not None:
|
||||
progress.set_label(f"Datenabruf {idx}/{total_rows}…")
|
||||
if progress.is_canceled():
|
||||
pe_cancel = pruef_ergebnis(
|
||||
ok=False,
|
||||
meldung="Datenabruf durch Benutzer abgebrochen",
|
||||
aktion="abbruch",
|
||||
kontext={"schritt": idx},
|
||||
)
|
||||
processed_results.append(self.pruefmanager.verarbeite(pe_cancel))
|
||||
break
|
||||
|
||||
ident = row.get("ident")
|
||||
link = row.get("Link")
|
||||
provider = row.get("Provider")
|
||||
@@ -115,7 +134,16 @@ class Datenabruf:
|
||||
url = self._build_provider_url(link=link, provider=str(provider), bbox_geom=bbox_geom if use_bbox else None)
|
||||
|
||||
# 2b) Fachdaten abrufen
|
||||
features, error_msg = self._fetch_features(url=url, provider=str(provider))
|
||||
features, error_msg = self._fetch_features(
|
||||
url=url,
|
||||
provider=str(provider),
|
||||
cancel_callback=(progress.is_canceled if progress is not None else None),
|
||||
)
|
||||
|
||||
|
||||
if progress is not None:
|
||||
if hasattr(progress, "set_value"):
|
||||
progress.set_value(idx)
|
||||
|
||||
# 2c) Logs und Aggregation
|
||||
if error_msg:
|
||||
@@ -207,7 +235,18 @@ class Datenabruf:
|
||||
return None
|
||||
|
||||
if raumfilter == "Verfahrensgebiet":
|
||||
return qgiscore.get_layer_extent(verfahrensgebiet_layer)
|
||||
extent = qgiscore.get_layer_extent(verfahrensgebiet_layer)
|
||||
if extent is None:
|
||||
return None
|
||||
crs_authid = None
|
||||
try:
|
||||
if hasattr(verfahrensgebiet_layer, "crs") and callable(getattr(verfahrensgebiet_layer, "crs")):
|
||||
crs = verfahrensgebiet_layer.crs()
|
||||
if crs is not None and hasattr(crs, "authid") and callable(getattr(crs, "authid")):
|
||||
crs_authid = crs.authid()
|
||||
except Exception:
|
||||
crs_authid = None
|
||||
return {"extent": extent, "crs_authid": crs_authid}
|
||||
|
||||
if raumfilter == "Pufferlayer":
|
||||
buffer_layer = qgiscore.create_buffer_layer(
|
||||
@@ -216,8 +255,18 @@ class Datenabruf:
|
||||
layer_name="Verfahrensgebiet_Puffer_1km",
|
||||
)
|
||||
if buffer_layer is not None:
|
||||
qgisui.add_layer_to_project(buffer_layer)
|
||||
return qgiscore.get_layer_extent(buffer_layer)
|
||||
extent = qgiscore.get_layer_extent(buffer_layer)
|
||||
if extent is None:
|
||||
return None
|
||||
crs_authid = None
|
||||
try:
|
||||
if hasattr(buffer_layer, "crs") and callable(getattr(buffer_layer, "crs")):
|
||||
crs = buffer_layer.crs()
|
||||
if crs is not None and hasattr(crs, "authid") and callable(getattr(crs, "authid")):
|
||||
crs_authid = crs.authid()
|
||||
except Exception:
|
||||
crs_authid = None
|
||||
return {"extent": extent, "crs_authid": crs_authid}
|
||||
|
||||
return None
|
||||
|
||||
@@ -233,60 +282,130 @@ class Datenabruf:
|
||||
Erwartet: provider ist gesetzt (z. B. "WFS", "REST", "OGR", "WMS").
|
||||
"""
|
||||
provider_norm = (provider or "").upper()
|
||||
base_link = link or ""
|
||||
base_link = (link or "").strip()
|
||||
if base_link.lower().startswith("url="):
|
||||
base_link = base_link[4:].strip()
|
||||
|
||||
# WMS: niemals BBOX anhängen
|
||||
if provider_norm == "WFS" and base_link.count("?") > 1:
|
||||
first, rest = base_link.split("?", 1)
|
||||
base_link = f"{first}?{rest.replace('?', '&')}"
|
||||
|
||||
extent_obj = bbox_geom
|
||||
crs_authid: Optional[str] = None
|
||||
if isinstance(bbox_geom, dict):
|
||||
extent_obj = bbox_geom.get("extent")
|
||||
raw_crs = bbox_geom.get("crs_authid")
|
||||
crs_authid = str(raw_crs) if raw_crs else None
|
||||
|
||||
# WMS: unverändert durchreichen
|
||||
if provider_norm == "WMS":
|
||||
return base_link
|
||||
|
||||
if bbox_geom is None:
|
||||
return base_link
|
||||
|
||||
# Versuche bbox-String zu erzeugen (nutzt qgiscore.extent_to_bbox_string wenn vorhanden)
|
||||
# Versuche bbox-String zu erzeugen (falls Raumfilter aktiv)
|
||||
bbox_str: Optional[str] = None
|
||||
try:
|
||||
extent_to_bbox = getattr(__import__("sn_basis.functions.qgiscore_wrapper", fromlist=["qgiscore_wrapper"]), "extent_to_bbox_string", None)
|
||||
if callable(extent_to_bbox):
|
||||
bbox_str = extent_to_bbox(bbox_geom)
|
||||
else:
|
||||
# Fallback: einfache xmin/ymin/xmax/ymax-Extraktion (duck-typing)
|
||||
if hasattr(bbox_geom, "xmin") and callable(getattr(bbox_geom, "xmin")):
|
||||
bbox_str = f"{bbox_geom.xmin()},{bbox_geom.ymin()},{bbox_geom.xmax()},{bbox_geom.ymax()}"
|
||||
elif isinstance(bbox_geom, (tuple, list)) and len(bbox_geom) == 4:
|
||||
bbox_str = f"{bbox_geom[0]},{bbox_geom[1]},{bbox_geom[2]},{bbox_geom[3]}"
|
||||
if extent_obj is not None:
|
||||
try:
|
||||
extent_to_bbox = getattr(__import__("sn_basis.functions.qgiscore_wrapper", fromlist=["qgiscore_wrapper"]), "extent_to_bbox_string", None)
|
||||
if callable(extent_to_bbox):
|
||||
bbox_str = extent_to_bbox(extent_obj)
|
||||
else:
|
||||
bbox_str = str(bbox_geom)
|
||||
except Exception:
|
||||
bbox_str = None
|
||||
|
||||
if not bbox_str:
|
||||
return base_link
|
||||
# Fallback: einfache xmin/ymin/xmax/ymax-Extraktion (duck-typing)
|
||||
if hasattr(extent_obj, "xmin") and callable(getattr(extent_obj, "xmin")):
|
||||
bbox_str = f"{extent_obj.xmin()},{extent_obj.ymin()},{extent_obj.xmax()},{extent_obj.ymax()}"
|
||||
elif isinstance(extent_obj, (tuple, list)) and len(extent_obj) == 4:
|
||||
bbox_str = f"{extent_obj[0]},{extent_obj[1]},{extent_obj[2]},{extent_obj[3]}"
|
||||
else:
|
||||
bbox_str = str(extent_obj)
|
||||
except Exception:
|
||||
bbox_str = None
|
||||
|
||||
parsed = urlparse(base_link)
|
||||
query_params = dict(parse_qsl(parsed.query, keep_blank_values=True))
|
||||
|
||||
if provider_norm == "WFS":
|
||||
query_params.setdefault("BBOX", bbox_str)
|
||||
query_params.setdefault("service", "WFS")
|
||||
query_params.setdefault("request", "GetFeature")
|
||||
query_params.setdefault("outputFormat", "application/json")
|
||||
if bbox_str:
|
||||
query_params.setdefault("BBOX", bbox_str)
|
||||
if crs_authid:
|
||||
query_params.setdefault("SRSNAME", crs_authid)
|
||||
new_query = urlencode(query_params, doseq=True)
|
||||
rebuilt = parsed._replace(query=new_query)
|
||||
return urlunparse(rebuilt)
|
||||
|
||||
if provider_norm in ("REST", "ARCGIS", "ARCGISFEATURESERVER", "ARCGIS_FEATURESERVER"):
|
||||
query_params.setdefault("geometry", bbox_str)
|
||||
query_params.setdefault("geometryType", "esriGeometryEnvelope")
|
||||
query_params.setdefault("spatialRel", "esriSpatialRelIntersects")
|
||||
# ArcGIS FeatureServer erwartet i.d.R. den /query-Endpunkt
|
||||
rest_base = base_link.rstrip("/")
|
||||
if not rest_base.lower().endswith("/query"):
|
||||
rest_base = f"{rest_base}/query"
|
||||
|
||||
parsed_rest = urlparse(rest_base)
|
||||
query_params = dict(parse_qsl(parsed_rest.query, keep_blank_values=True))
|
||||
query_params.setdefault("where", "1=1")
|
||||
query_params.setdefault("outFields", "*")
|
||||
query_params.setdefault("returnGeometry", "true")
|
||||
query_params.setdefault("f", query_params.get("f", "json"))
|
||||
|
||||
if bbox_str:
|
||||
geometry_envelope = None
|
||||
try:
|
||||
if hasattr(extent_obj, "xmin") and callable(getattr(extent_obj, "xmin")):
|
||||
geometry_envelope = {
|
||||
"xmin": extent_obj.xmin(),
|
||||
"ymin": extent_obj.ymin(),
|
||||
"xmax": extent_obj.xmax(),
|
||||
"ymax": extent_obj.ymax(),
|
||||
}
|
||||
elif isinstance(extent_obj, (tuple, list)) and len(extent_obj) == 4:
|
||||
geometry_envelope = {
|
||||
"xmin": extent_obj[0],
|
||||
"ymin": extent_obj[1],
|
||||
"xmax": extent_obj[2],
|
||||
"ymax": extent_obj[3],
|
||||
}
|
||||
else:
|
||||
parts = [p.strip() for p in str(bbox_str).split(",")]
|
||||
if len(parts) == 4:
|
||||
geometry_envelope = {
|
||||
"xmin": float(parts[0]),
|
||||
"ymin": float(parts[1]),
|
||||
"xmax": float(parts[2]),
|
||||
"ymax": float(parts[3]),
|
||||
}
|
||||
except Exception:
|
||||
geometry_envelope = None
|
||||
|
||||
if geometry_envelope is not None:
|
||||
query_params.setdefault("geometry", json.dumps(geometry_envelope))
|
||||
else:
|
||||
query_params.setdefault("geometry", bbox_str)
|
||||
query_params.setdefault("geometryType", "esriGeometryEnvelope")
|
||||
query_params.setdefault("spatialRel", "esriSpatialRelIntersects")
|
||||
|
||||
if crs_authid and ":" in crs_authid:
|
||||
srid = crs_authid.split(":", 1)[1]
|
||||
if srid.isdigit():
|
||||
query_params.setdefault("inSR", srid)
|
||||
query_params.setdefault("outSR", srid)
|
||||
|
||||
new_query = urlencode(query_params, doseq=True)
|
||||
rebuilt = parsed._replace(query=new_query)
|
||||
rebuilt = parsed_rest._replace(query=new_query)
|
||||
return urlunparse(rebuilt)
|
||||
|
||||
# Default: generischer bbox-Parameter
|
||||
query_params.setdefault("bbox", bbox_str)
|
||||
# Default: generischer bbox-Parameter (nur wenn vorhanden)
|
||||
if bbox_str:
|
||||
query_params.setdefault("bbox", bbox_str)
|
||||
new_query = urlencode(query_params, doseq=True)
|
||||
rebuilt = parsed._replace(query=new_query)
|
||||
return urlunparse(rebuilt)
|
||||
|
||||
def _fetch_features(self, url: str, provider: str) -> Tuple[List[Any], Optional[str]]:
|
||||
def _fetch_features(
|
||||
self,
|
||||
url: str,
|
||||
provider: str,
|
||||
cancel_callback: Optional[Callable[[], bool]] = None,
|
||||
) -> Tuple[List[Any], Optional[str]]:
|
||||
"""
|
||||
Führt den eigentlichen Abruf der Fachdaten durch.
|
||||
|
||||
@@ -336,34 +455,100 @@ class Datenabruf:
|
||||
http_error: Optional[str] = None
|
||||
|
||||
# QGIS NetworkAccessManager bevorzugen
|
||||
_FETCH_TIMEOUT_MS = 30_000 # 30 Sekunden
|
||||
aborted_or_timed_out = False
|
||||
attempted_qgis_fetch = False
|
||||
|
||||
if callable(cancel_callback) and cancel_callback():
|
||||
return [], "Abbruch durch Benutzer"
|
||||
|
||||
if getattr(qgiscore, "QGIS_AVAILABLE", False) and getattr(qgiscore, "QgsNetworkAccessManager", None) is not None:
|
||||
attempted_qgis_fetch = True
|
||||
try:
|
||||
manager = qgiscore.QgsNetworkAccessManager.instance()
|
||||
QUrl = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QUrl", None)
|
||||
QNetworkRequest = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QNetworkRequest", None)
|
||||
QEventLoop = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QEventLoop", None)
|
||||
# Netzwerk-Timeout global setzen (QGIS >= 3.6)
|
||||
if hasattr(manager, "setTimeout"):
|
||||
manager.setTimeout(_FETCH_TIMEOUT_MS)
|
||||
_qt = __import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"])
|
||||
QUrl = getattr(_qt, "QUrl", None)
|
||||
QNetworkRequest = getattr(_qt, "QNetworkRequest", None)
|
||||
QEventLoop = getattr(_qt, "QEventLoop", None)
|
||||
QTimer = getattr(_qt, "QTimer", None)
|
||||
if QUrl is not None and QNetworkRequest is not None:
|
||||
req = QNetworkRequest(QUrl(url))
|
||||
reply = manager.get(req)
|
||||
if QEventLoop is not None:
|
||||
loop = QEventLoop()
|
||||
reply.finished.connect(loop.quit)
|
||||
loop.exec()
|
||||
try:
|
||||
raw = reply.readAll()
|
||||
data_bytes = bytes(raw) if hasattr(raw, "__bytes__") else raw
|
||||
response_text = data_bytes.decode("utf-8", errors="replace")
|
||||
except Exception:
|
||||
_poll_timer = None
|
||||
if QTimer is not None:
|
||||
try:
|
||||
_poll_timer = QTimer()
|
||||
_poll_timer.setSingleShot(False)
|
||||
_poll_timer.timeout.connect(loop.quit)
|
||||
_poll_timer.start(100)
|
||||
except Exception:
|
||||
_poll_timer = None
|
||||
|
||||
start_time = time.monotonic()
|
||||
while True:
|
||||
if callable(cancel_callback) and cancel_callback():
|
||||
reply.abort()
|
||||
http_error = "Abbruch durch Benutzer"
|
||||
aborted_or_timed_out = True
|
||||
break
|
||||
|
||||
elapsed_ms = int((time.monotonic() - start_time) * 1000)
|
||||
if elapsed_ms >= _FETCH_TIMEOUT_MS:
|
||||
reply.abort()
|
||||
http_error = f"Timeout nach {_FETCH_TIMEOUT_MS // 1000} s: {url}"
|
||||
aborted_or_timed_out = True
|
||||
break
|
||||
|
||||
if hasattr(reply, "isFinished") and reply.isFinished():
|
||||
break
|
||||
|
||||
loop.exec()
|
||||
try:
|
||||
if hasattr(qt, "QCoreApplication") and hasattr(qt.QCoreApplication, "processEvents"):
|
||||
qt.QCoreApplication.processEvents()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if _poll_timer is not None:
|
||||
try:
|
||||
_poll_timer.stop()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if not aborted_or_timed_out:
|
||||
# Fehler aus Reply auslesen
|
||||
err_code = None
|
||||
try:
|
||||
err_code = reply.error()
|
||||
except Exception:
|
||||
pass
|
||||
if err_code and int(err_code) != 0:
|
||||
http_error = f"Netzwerkfehler ({err_code}): {reply.errorString()}"
|
||||
if http_error:
|
||||
# Timeout oder Netzwerkfehler – keinen Body lesen
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
response_text = reply.text()
|
||||
raw = reply.readAll()
|
||||
data_bytes = bytes(raw) if hasattr(raw, "__bytes__") else raw
|
||||
response_text = data_bytes.decode("utf-8", errors="replace")
|
||||
except Exception:
|
||||
response_text = None
|
||||
try:
|
||||
response_text = reply.text()
|
||||
except Exception:
|
||||
response_text = None
|
||||
except Exception as exc:
|
||||
http_error = f"QgsNetworkAccessManager error: {exc}"
|
||||
response_text = None
|
||||
|
||||
# Fallback: requests
|
||||
if response_text is None:
|
||||
# Fallback: requests nur wenn kein harter Abbruch/Timeout im QGIS-Request vorlag
|
||||
if response_text is None and (not attempted_qgis_fetch or not aborted_or_timed_out):
|
||||
try:
|
||||
import requests # lokal import, keine harte Abhängigkeit
|
||||
r = requests.get(url, timeout=30)
|
||||
@@ -383,6 +568,8 @@ class Datenabruf:
|
||||
return parsed.get("features", []), None
|
||||
if isinstance(parsed, dict) and "features" in parsed:
|
||||
return parsed.get("features", []), None
|
||||
if prov in ("REST", "ARCGIS", "ARCGISFEATURESERVER", "ARCGIS_FEATURESERVER", "WFS"):
|
||||
return [], "Antwort enthält keine Feature-Liste"
|
||||
# Sonst: gib das gesamte JSON als einzelnes Objekt zurück
|
||||
return [parsed], None
|
||||
except json.JSONDecodeError:
|
||||
|
||||
@@ -30,9 +30,13 @@ from __future__ import annotations
|
||||
from typing import Any, Dict, List, Optional
|
||||
import os
|
||||
import json
|
||||
import re
|
||||
import datetime
|
||||
import sqlite3
|
||||
|
||||
from sn_basis.functions import qgiscore_wrapper as qgiscore
|
||||
from sn_basis.functions.os_wrapper import normalize_path, is_absolute_path
|
||||
from sn_basis.functions.sys_wrapper import get_plugin_root, join_path, file_exists
|
||||
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
|
||||
|
||||
|
||||
@@ -53,10 +57,97 @@ class Datenschreiber:
|
||||
|
||||
def __init__(self, pruefmanager: Any, gpkg_path: Optional[str] = None) -> None:
|
||||
self.pruefmanager = pruefmanager
|
||||
self.gpkg_path = gpkg_path
|
||||
self.gpkg_path = str(gpkg_path) if gpkg_path else None
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Schreibe Daten
|
||||
def _resolve_style_path(self, style_path: Optional[str]) -> Optional[str]:
|
||||
if not style_path:
|
||||
return None
|
||||
|
||||
style_path_str = str(style_path).strip()
|
||||
if not style_path_str:
|
||||
return None
|
||||
|
||||
if not is_absolute_path(style_path_str):
|
||||
plugin_root = get_plugin_root()
|
||||
style_path_str = str(join_path(plugin_root, "sn_plan41", "assets", style_path_str))
|
||||
|
||||
style_path_str = str(normalize_path(style_path_str))
|
||||
return style_path_str if file_exists(style_path_str) else None
|
||||
|
||||
def _store_style_in_gpkg(self, layer_name: str, style_path: str, layer: Optional[Any] = None) -> None:
|
||||
"""Stellt sicher, dass der Stil in der layer_styles-Tabelle der GPKG gespeichert wird."""
|
||||
try:
|
||||
with open(style_path, "r", encoding="utf-8") as fh:
|
||||
style_qml = fh.read()
|
||||
|
||||
f_geometry_column = ''
|
||||
if layer is not None:
|
||||
try:
|
||||
if hasattr(layer, 'geometryColumn'):
|
||||
f_geometry_column = str(layer.geometryColumn())
|
||||
elif hasattr(layer, 'dataProvider') and hasattr(layer.dataProvider(), 'geometryColumnName'):
|
||||
f_geometry_column = str(layer.dataProvider().geometryColumnName())
|
||||
except Exception:
|
||||
f_geometry_column = ''
|
||||
|
||||
with sqlite3.connect(self.gpkg_path) as conn:
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS layer_styles (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
f_table_catalog TEXT,
|
||||
f_table_schema TEXT,
|
||||
f_table_name TEXT NOT NULL,
|
||||
f_geometry_column TEXT,
|
||||
styleName TEXT,
|
||||
styleQML TEXT,
|
||||
styleSLD TEXT,
|
||||
useAsDefault BOOLEAN,
|
||||
description TEXT,
|
||||
owner TEXT,
|
||||
ui TEXT,
|
||||
update_time DATETIME DEFAULT CURRENT_TIMESTAMP
|
||||
)
|
||||
"""
|
||||
)
|
||||
|
||||
# Das aktuelle QGIS-Style-Verhalten: bestehenden Style für denselben Layer nicht löschen (nur appenden)
|
||||
# Wir wollen aber Default-Style setzen: alte Default-Styles entfernen.
|
||||
cur.execute(
|
||||
"UPDATE layer_styles SET useAsDefault = 0 WHERE f_table_name = ?",
|
||||
(layer_name,),
|
||||
)
|
||||
|
||||
# Fülle die bekannten QGIS-Kolonnen
|
||||
style_name = os.path.basename(style_path)
|
||||
|
||||
cur.execute(
|
||||
"INSERT INTO layer_styles (f_table_catalog, f_table_schema, f_table_name, f_geometry_column, styleName, styleQML, styleSLD, useAsDefault, description, owner, ui) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
(
|
||||
'',
|
||||
'',
|
||||
layer_name,
|
||||
f_geometry_column,
|
||||
style_name,
|
||||
style_qml,
|
||||
None,
|
||||
1,
|
||||
'',
|
||||
'',
|
||||
'',
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
except Exception as exc:
|
||||
self.pruefmanager.verarbeite(
|
||||
pruef_ergebnis(
|
||||
ok=False,
|
||||
meldung=f"Fehler beim Speichern des Layer-Stils in GPKG: {exc}",
|
||||
aktion="style_gpkg_speichern_fehlgeschlagen",
|
||||
kontext={"layer_name": layer_name, "style_path": style_path},
|
||||
)
|
||||
)
|
||||
# ------------------------------------------------------------------ #
|
||||
def schreibe_Daten(
|
||||
self,
|
||||
@@ -65,192 +156,93 @@ class Datenschreiber:
|
||||
speicherort: str,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Schreibt die abgerufenen Daten in die Zieldatenbank/Dateien.
|
||||
Schreibt die übergebenen Layer in die Ziel-GPKG.
|
||||
|
||||
Ablauf
|
||||
------
|
||||
Für jede Zeile (ident) in ``daten_dict["daten"]``:
|
||||
1. Bestimme Ziel-Layername (z. B. Thema oder ident).
|
||||
2. Prüfe, ob ein Layer mit diesem Namen bereits existiert (Wrapper).
|
||||
3. Falls vorhanden, frage den Benutzer (Überschreiben / Anhängen / Abbrechen)
|
||||
über die zentrale Pruefmanager-Methode `ask_overwrite_append_cancel`.
|
||||
4. Führe die gewählte Operation aus oder schreibe den Layer, wenn er noch nicht existiert.
|
||||
5. Schreibe ggf. den Stil in die GPKG und setze ihn als Vorgabe.
|
||||
6. Sammle und gib eine Liste der angelegten/geänderten Layer zurück.
|
||||
|
||||
Returns
|
||||
-------
|
||||
List[Dict[str, Any]]
|
||||
Liste von Dicts mit Informationen zu jedem angelegten/geänderten Layer.
|
||||
Erwartung:
|
||||
- daten_dict["daten"] enthält Einträge der Form:
|
||||
ident -> {"layer": QgsVectorLayer}
|
||||
- self.gpkg_path ist ein str
|
||||
"""
|
||||
|
||||
if not speicherort:
|
||||
raise ValueError("Ein gültiger Speicherort (speicherort) muss übergeben werden.")
|
||||
|
||||
# Setze gpkg_path falls noch nicht vorhanden
|
||||
# gpkg_path einmalig setzen / normalisieren
|
||||
if not self.gpkg_path:
|
||||
self.gpkg_path = speicherort
|
||||
self.gpkg_path = str(speicherort)
|
||||
|
||||
results: List[Dict[str, Any]] = []
|
||||
daten_map: Dict[str, List[Any]] = daten_dict.get("daten", {})
|
||||
daten_map: Dict[str, Any] = daten_dict.get("daten", {})
|
||||
|
||||
# Iteriere über alle Einträge
|
||||
for ident, features in daten_map.items():
|
||||
# Thema/Name ableiten (falls vorhanden in processed_results oder ident)
|
||||
for ident, entry in daten_map.items():
|
||||
layer = None
|
||||
style_path = None
|
||||
|
||||
# -----------------------------
|
||||
# Layer extrahieren
|
||||
# -----------------------------
|
||||
if isinstance(entry, dict):
|
||||
layer = entry.get("layer")
|
||||
style_path = self._resolve_style_path(entry.get("style_path"))
|
||||
|
||||
if layer is None or not hasattr(layer, "isValid") or not layer.isValid():
|
||||
pe_err = pruef_ergebnis(
|
||||
ok=False,
|
||||
meldung=f"Ungültiger Layer für {ident}",
|
||||
aktion="save_exception",
|
||||
kontext={"ident": ident},
|
||||
)
|
||||
self.pruefmanager.verarbeite(pe_err)
|
||||
continue
|
||||
|
||||
# -----------------------------
|
||||
# Layername bestimmen
|
||||
# -----------------------------
|
||||
thema = None
|
||||
for pe in processed_results:
|
||||
try:
|
||||
kontext = getattr(pe, "kontext", None) or {}
|
||||
if kontext and kontext.get("ident") == ident:
|
||||
if kontext.get("ident") == ident:
|
||||
thema = kontext.get("thema")
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
if not thema:
|
||||
thema = str(ident)
|
||||
|
||||
layer_name = thema
|
||||
layer_name_raw = thema or str(ident)
|
||||
layer_name = re.sub(r"[^A-Za-z0-9_]+", "_", layer_name_raw).strip("_")
|
||||
if not layer_name:
|
||||
layer_name = f"layer_{ident}"
|
||||
|
||||
# Prüfe, ob Layer bereits existiert in der Ziel-GPKG
|
||||
layer_exists = False
|
||||
try:
|
||||
layer_exists_fn = getattr(qgiscore, "layer_exists_in_gpkg", None)
|
||||
if callable(layer_exists_fn):
|
||||
layer_exists = layer_exists_fn(self.gpkg_path, layer_name)
|
||||
else:
|
||||
# Fallback: QGIS-Fallback-Check via QgsVectorLayer
|
||||
if getattr(qgiscore, "QgsVectorLayer", None) is not None and qgiscore.QGIS_AVAILABLE:
|
||||
uri = f"{self.gpkg_path}|layername={layer_name}"
|
||||
layer = qgiscore.QgsVectorLayer(uri, layer_name, "ogr")
|
||||
layer_exists = bool(layer and getattr(layer, "isValid", lambda: False)())
|
||||
except Exception:
|
||||
layer_exists = False
|
||||
|
||||
operation = "created"
|
||||
|
||||
if layer_exists:
|
||||
# Zentrale Nutzerabfrage über Pruefmanager
|
||||
# Erwartet Rückgabe: "overwrite" | "append" | "cancel"
|
||||
try:
|
||||
user_choice = self.pruefmanager.ask_overwrite_append_cancel(layer_name)
|
||||
except Exception:
|
||||
# Fallback: overwrite, falls Pruefmanager nicht verfügbar
|
||||
user_choice = "overwrite"
|
||||
|
||||
if user_choice == "cancel":
|
||||
operation = "skipped"
|
||||
results.append({
|
||||
"ident": ident,
|
||||
"thema": thema,
|
||||
"operation": operation,
|
||||
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
|
||||
"feature_count": 0,
|
||||
})
|
||||
continue
|
||||
|
||||
if user_choice == "overwrite":
|
||||
write_err = self._write_layer_to_gpkg(layer_name, features, mode="overwrite")
|
||||
if write_err:
|
||||
pe_err = pruef_ergebnis(
|
||||
ok=False,
|
||||
meldung=f"Fehler beim Überschreiben von {layer_name}: {write_err}",
|
||||
aktion="save_exception",
|
||||
kontext={"ident": ident, "thema": thema, "error": write_err},
|
||||
)
|
||||
self.pruefmanager.verarbeite(pe_err)
|
||||
operation = "skipped"
|
||||
results.append({
|
||||
"ident": ident,
|
||||
"thema": thema,
|
||||
"operation": operation,
|
||||
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
|
||||
"feature_count": 0,
|
||||
})
|
||||
continue
|
||||
else:
|
||||
operation = "overwritten"
|
||||
|
||||
elif user_choice == "append":
|
||||
write_err = self._write_layer_to_gpkg(layer_name, features, mode="append")
|
||||
if write_err:
|
||||
pe_err = pruef_ergebnis(
|
||||
ok=False,
|
||||
meldung=f"Fehler beim Anhängen an {layer_name}: {write_err}",
|
||||
aktion="save_exception",
|
||||
kontext={"ident": ident, "thema": thema, "error": write_err},
|
||||
)
|
||||
self.pruefmanager.verarbeite(pe_err)
|
||||
operation = "skipped"
|
||||
results.append({
|
||||
"ident": ident,
|
||||
"thema": thema,
|
||||
"operation": operation,
|
||||
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
|
||||
"feature_count": 0,
|
||||
})
|
||||
continue
|
||||
else:
|
||||
operation = "appended"
|
||||
|
||||
else:
|
||||
# Layer existiert nicht -> neu anlegen
|
||||
write_err = self._write_layer_to_gpkg(layer_name, features, mode="create")
|
||||
if write_err:
|
||||
pe_err = pruef_ergebnis(
|
||||
ok=False,
|
||||
meldung=f"Fehler beim Erstellen von {layer_name}: {write_err}",
|
||||
aktion="save_exception",
|
||||
kontext={"ident": ident, "thema": thema, "error": write_err},
|
||||
)
|
||||
self.pruefmanager.verarbeite(pe_err)
|
||||
operation = "skipped"
|
||||
results.append({
|
||||
"ident": ident,
|
||||
"thema": thema,
|
||||
"operation": operation,
|
||||
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
|
||||
"feature_count": 0,
|
||||
})
|
||||
continue
|
||||
else:
|
||||
operation = "created"
|
||||
|
||||
# Stilbehandlung (falls in processed_results referenziert)
|
||||
style_written = False
|
||||
style_path = None
|
||||
for pe in processed_results:
|
||||
try:
|
||||
kontext = getattr(pe, "kontext", None) or {}
|
||||
if kontext and kontext.get("ident") == ident:
|
||||
style_path = kontext.get("stildatei") or kontext.get("Stildatei")
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
# Layer in GPKG schreiben
|
||||
err_msg = self._write_layer_to_gpkg(layer_name=layer_name, layer=layer)
|
||||
if err_msg is not None:
|
||||
pe_err = pruef_ergebnis(
|
||||
ok=False,
|
||||
meldung=f"Fehler beim Schreiben des Layers {layer_name}: {err_msg}",
|
||||
aktion="save_exception",
|
||||
kontext={"ident": ident, "layer_name": layer_name},
|
||||
)
|
||||
self.pruefmanager.verarbeite(pe_err)
|
||||
continue
|
||||
|
||||
# Wenn der Stil vorhanden und valide ist, als Default in GPKG-Style-Tabelle ablegen
|
||||
if style_path:
|
||||
if not os.path.isabs(style_path):
|
||||
base_dir = os.path.dirname(__file__)
|
||||
style_path = os.path.join(base_dir, style_path)
|
||||
write_style_fn = getattr(qgiscore, "write_style_to_gpkg", None)
|
||||
if callable(write_style_fn):
|
||||
try:
|
||||
write_style_fn(self.gpkg_path, style_path, layer_name)
|
||||
style_written = True
|
||||
except Exception:
|
||||
style_written = False
|
||||
|
||||
feature_count = len(features) if isinstance(features, list) else 0
|
||||
self._store_style_in_gpkg(layer_name, style_path, layer)
|
||||
|
||||
# Erfolgsfall: Info für lade_Layer sammeln
|
||||
layer_path = f"{self.gpkg_path}|layername={layer_name}"
|
||||
results.append({
|
||||
"layer_path": layer_path,
|
||||
"thema": layer_name,
|
||||
"ident": ident,
|
||||
"thema": thema,
|
||||
"operation": operation,
|
||||
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
|
||||
"feature_count": feature_count,
|
||||
"style_written": style_written,
|
||||
"style_path": style_path,
|
||||
})
|
||||
|
||||
return results
|
||||
|
||||
|
||||
# -----------------------------
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Lade Layer ins Projekt
|
||||
# ------------------------------------------------------------------ #
|
||||
@@ -288,18 +280,33 @@ class Datenschreiber:
|
||||
self.pruefmanager.verarbeite(pe_err)
|
||||
continue
|
||||
|
||||
try:
|
||||
apply_style_fn = getattr(qgiscore, "apply_default_style_from_gpkg", None)
|
||||
if callable(apply_style_fn):
|
||||
apply_style_fn(self.gpkg_path, layer)
|
||||
except Exception:
|
||||
pe_warn = pruef_ergebnis(
|
||||
ok=True,
|
||||
meldung=f"Style konnte für {thema} nicht automatisch angewendet werden",
|
||||
aktion="stil_not_implemented",
|
||||
kontext={"thema": thema},
|
||||
)
|
||||
self.pruefmanager.verarbeite(pe_warn)
|
||||
style_path = info.get("style_path")
|
||||
resolved_style_path = self._resolve_style_path(style_path)
|
||||
if resolved_style_path:
|
||||
try:
|
||||
layer.loadNamedStyle(resolved_style_path)
|
||||
layer.triggerRepaint()
|
||||
except Exception as exc:
|
||||
pe_warn = pruef_ergebnis(
|
||||
ok=True,
|
||||
meldung=f"Style konnte für {thema} nicht geladen werden: {exc}",
|
||||
aktion="stil_laden_fehlgeschlagen",
|
||||
kontext={"thema": thema, "style_path": resolved_style_path},
|
||||
)
|
||||
self.pruefmanager.verarbeite(pe_warn)
|
||||
else:
|
||||
try:
|
||||
apply_style_fn = getattr(qgiscore, "apply_default_style_from_gpkg", None)
|
||||
if callable(apply_style_fn):
|
||||
apply_style_fn(self.gpkg_path, layer)
|
||||
except Exception:
|
||||
pe_warn = pruef_ergebnis(
|
||||
ok=True,
|
||||
meldung=f"Style konnte für {thema} nicht automatisch angewendet werden",
|
||||
aktion="stil_not_implemented",
|
||||
kontext={"thema": thema},
|
||||
)
|
||||
self.pruefmanager.verarbeite(pe_warn)
|
||||
|
||||
try:
|
||||
# qgisui wrapper wird hier nicht direkt für die Abfrage verwendet;
|
||||
@@ -374,62 +381,67 @@ class Datenschreiber:
|
||||
# ------------------------------------------------------------------ #
|
||||
# Hilfsfunktionen intern
|
||||
# ------------------------------------------------------------------ #
|
||||
def _write_layer_to_gpkg(self, layer_name: str, features: List[Any], mode: str = "create") -> Optional[str]:
|
||||
def _write_layer_to_gpkg(
|
||||
self,
|
||||
layer_name: str,
|
||||
layer: Any,
|
||||
) -> Optional[str]:
|
||||
"""
|
||||
Interne Hilfsfunktion zum Schreiben eines Layers in das GPKG.
|
||||
Schreibt einen QgsVectorLayer in die Ziel-GPKG.
|
||||
|
||||
Erwartete qgiscore-Funktion:
|
||||
qgiscore.write_features_to_gpkg(gpkg_path, layer_name, features, mode)
|
||||
Voraussetzungen:
|
||||
- self.gpkg_path ist ein str
|
||||
- layer ist ein gültiger QgsVectorLayer
|
||||
"""
|
||||
write_fn = getattr(qgiscore, "write_features_to_gpkg", None)
|
||||
if callable(write_fn):
|
||||
try:
|
||||
write_fn(self.gpkg_path, layer_name, features, mode)
|
||||
return None
|
||||
except Exception as exc:
|
||||
return str(exc)
|
||||
|
||||
# Fallback: Verwende QgsVectorFileWriter, falls QGIS verfügbar
|
||||
if getattr(qgiscore, "QGIS_AVAILABLE", False) and getattr(qgiscore, "QgsVectorFileWriter", None) is not None:
|
||||
try:
|
||||
# Minimaler Fallback: erwarte, dass 'features' eine Liste von QgsFeature ist
|
||||
if not features:
|
||||
# Erstelle leeren Layer-Eintrag (GPKG erlaubt leere Layer)
|
||||
# Hier vereinfachen wir: writeAsVectorFormatV3 benötigt ein Layer-Objekt.
|
||||
return None
|
||||
if layer is None or not hasattr(layer, "isValid") or not layer.isValid():
|
||||
return "Ungültiger Layer zum Schreiben übergeben"
|
||||
|
||||
# Versuche, ein Memory-Layer aus dem ersten Feature zu ermitteln
|
||||
first = features[0]
|
||||
mem_layer = None
|
||||
if hasattr(first, "fields") and hasattr(first, "geometry"):
|
||||
# Wenn Features QgsFeature sind, versuchen wir, das zugehörige Layer zu nutzen
|
||||
try:
|
||||
mem_layer = first.layer() if hasattr(first, "layer") else None
|
||||
except Exception:
|
||||
mem_layer = None
|
||||
try:
|
||||
opts = qgiscore.QgsVectorFileWriter.SaveVectorOptions()
|
||||
opts.driverName = "GPKG"
|
||||
opts.layerName = layer_name
|
||||
opts.fileEncoding = "UTF-8"
|
||||
|
||||
if mem_layer is None:
|
||||
return "Keine Feld-/Geometrie-Informationen zum Schreiben vorhanden"
|
||||
# Style in der GPKG speichern, wenn möglich
|
||||
if hasattr(opts, "symbologyExport"):
|
||||
try:
|
||||
# QGIS: SymbologyExport-Wert z.B. QgsVectorFileWriter.SaveVectorOptions.Symbology
|
||||
saveOpts = qgiscore.QgsVectorFileWriter.SaveVectorOptions
|
||||
sym_val = getattr(saveOpts, "Symbology", None)
|
||||
if sym_val is None:
|
||||
sym_val = getattr(saveOpts, "SymbologyExport", None)
|
||||
if sym_val is not None:
|
||||
opts.symbologyExport = sym_val
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
opts = qgiscore.QgsVectorFileWriter.SaveVectorOptions()
|
||||
opts.driverName = "GPKG"
|
||||
opts.layerName = layer_name
|
||||
opts.fileEncoding = "UTF-8"
|
||||
if mode == "overwrite":
|
||||
opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteFile
|
||||
else:
|
||||
opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteLayer
|
||||
# Datei existiert → Layer überschreiben
|
||||
# Datei existiert nicht → neue GPKG anlegen
|
||||
if not os.path.exists(self.gpkg_path):
|
||||
opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteFile
|
||||
else:
|
||||
opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteLayer
|
||||
|
||||
err = qgiscore.QgsVectorFileWriter.writeAsVectorFormatV3(
|
||||
mem_layer,
|
||||
self.gpkg_path,
|
||||
qgiscore.QgsProject.instance().transformContext(),
|
||||
opts
|
||||
)
|
||||
if err != qgiscore.QgsVectorFileWriter.NoError:
|
||||
return f"Fehler beim Schreiben (Code {err})"
|
||||
return None
|
||||
except Exception as exc:
|
||||
return str(exc)
|
||||
err = qgiscore.QgsVectorFileWriter.writeAsVectorFormatV3(
|
||||
layer,
|
||||
self.gpkg_path,
|
||||
qgiscore.QgsProject.instance().transformContext(),
|
||||
opts,
|
||||
)
|
||||
|
||||
return "Keine Schreib-Funktion verfügbar (Wrapper nicht implementiert)"
|
||||
# QGIS ≥3 liefert ein Tupel: (error_code, error_message, new_filename, new_layer_name)
|
||||
if isinstance(err, tuple):
|
||||
error_code = err[0]
|
||||
error_msg = err[1] if len(err) > 1 else ""
|
||||
else:
|
||||
error_code = err
|
||||
error_msg = ""
|
||||
|
||||
if error_code != qgiscore.QgsVectorFileWriter.NoError:
|
||||
return f"Fehler beim Schreiben (Code {error_code}, msg='{error_msg}')"
|
||||
|
||||
return None
|
||||
|
||||
except Exception as exc:
|
||||
return str(exc)
|
||||
|
||||
395
modules/LayerLoader.py
Normal file
395
modules/LayerLoader.py
Normal file
@@ -0,0 +1,395 @@
|
||||
"""sn_basis/modules/LayerLoader.py
|
||||
|
||||
Kapselt Layer-Erstellung, Raumfilter und Stil-Logik.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Dict, List, Optional
|
||||
import time
|
||||
|
||||
from sn_basis.functions.os_wrapper import normalize_path, is_absolute_path
|
||||
from sn_basis.functions.qgiscore_wrapper import (
|
||||
QgsVectorLayer,
|
||||
QgsRasterLayer,
|
||||
QgsFeatureRequest,
|
||||
QgsProject,
|
||||
QgsNetworkAccessManager,
|
||||
QgsCoordinateTransform,
|
||||
)
|
||||
from sn_basis.functions.sys_wrapper import get_plugin_root, join_path, file_exists
|
||||
from sn_basis.modules.stilpruefer import Stilpruefer
|
||||
from sn_basis.modules.layerpruefer import Layerpruefer
|
||||
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
|
||||
from sn_basis.functions import qt_wrapper as qt
|
||||
|
||||
|
||||
class LayerLoader:
|
||||
"""Lädt und filtert Layer aus Dienst-/Datenquellen."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
pruefmanager: Any,
|
||||
stil_pruefer: Optional[Stilpruefer] = None,
|
||||
layer_pruefer: Optional[Layerpruefer] = None,
|
||||
) -> None:
|
||||
self.pruefmanager = pruefmanager
|
||||
self.stil_pruefer = stil_pruefer or Stilpruefer()
|
||||
self.layer_pruefer = layer_pruefer or Layerpruefer()
|
||||
|
||||
_LAYER_TIMEOUT_MS = 30_000 # 30 Sekunden
|
||||
|
||||
def _was_canceled(self, cancel_callback: Optional[Any]) -> bool:
|
||||
if not callable(cancel_callback):
|
||||
return False
|
||||
try:
|
||||
return bool(cancel_callback())
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def _process_events(self) -> None:
|
||||
try:
|
||||
if hasattr(qt, "QCoreApplication") and hasattr(qt.QCoreApplication, "processEvents"):
|
||||
qt.QCoreApplication.processEvents()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _transform_geometry_to_layer_crs(self, geometry: Any, source_layer: Any, target_layer: Any) -> Any:
|
||||
if geometry is None or source_layer is None or target_layer is None:
|
||||
return geometry
|
||||
|
||||
if QgsCoordinateTransform is None or QgsProject is None:
|
||||
return geometry
|
||||
|
||||
try:
|
||||
source_crs = source_layer.crs() if hasattr(source_layer, "crs") else None
|
||||
target_crs = target_layer.crs() if hasattr(target_layer, "crs") else None
|
||||
if source_crs is None or target_crs is None:
|
||||
return geometry
|
||||
|
||||
source_authid = source_crs.authid() if hasattr(source_crs, "authid") else None
|
||||
target_authid = target_crs.authid() if hasattr(target_crs, "authid") else None
|
||||
if source_authid and target_authid and source_authid == target_authid:
|
||||
return geometry
|
||||
|
||||
ct = QgsCoordinateTransform(source_crs, target_crs, QgsProject.instance())
|
||||
if hasattr(geometry, "clone") and callable(getattr(geometry, "clone")):
|
||||
geom_copy = geometry.clone()
|
||||
else:
|
||||
geom_copy = geometry
|
||||
geom_copy.transform(ct)
|
||||
return geom_copy
|
||||
except Exception:
|
||||
return geometry
|
||||
|
||||
def _transform_extent_to_layer_crs(self, extent: Any, source_layer: Any, target_layer: Any) -> Any:
|
||||
if extent is None or source_layer is None or target_layer is None:
|
||||
return extent
|
||||
|
||||
if QgsCoordinateTransform is None or QgsProject is None:
|
||||
return extent
|
||||
|
||||
try:
|
||||
source_crs = source_layer.crs() if hasattr(source_layer, "crs") else None
|
||||
target_crs = target_layer.crs() if hasattr(target_layer, "crs") else None
|
||||
if source_crs is None or target_crs is None:
|
||||
return extent
|
||||
|
||||
source_authid = source_crs.authid() if hasattr(source_crs, "authid") else None
|
||||
target_authid = target_crs.authid() if hasattr(target_crs, "authid") else None
|
||||
if source_authid and target_authid and source_authid == target_authid:
|
||||
return extent
|
||||
|
||||
ct = QgsCoordinateTransform(source_crs, target_crs, QgsProject.instance())
|
||||
if hasattr(ct, "transformBoundingBox"):
|
||||
return ct.transformBoundingBox(extent)
|
||||
return extent
|
||||
except Exception:
|
||||
return extent
|
||||
|
||||
def create_layer(self, provider: str, link: str, thema: str) -> Optional[QgsVectorLayer]:
|
||||
provider_lower = provider.lower() if provider else ""
|
||||
layer = None
|
||||
|
||||
# Netzwerk-Timeout für alle netzwerkbasierten Provider setzen
|
||||
if provider_lower in ("wfs", "wms", "rest"):
|
||||
try:
|
||||
nam = QgsNetworkAccessManager.instance()
|
||||
if hasattr(nam, "setTimeout"):
|
||||
nam.setTimeout(self._LAYER_TIMEOUT_MS)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
if provider_lower == "wfs":
|
||||
uri = link if link.strip().lower().startswith("url=") else f"url={link}"
|
||||
layer = QgsVectorLayer(uri, thema, "WFS")
|
||||
elif provider_lower == "wms":
|
||||
uri = link if link.strip().lower().startswith("url=") else f"url={link}"
|
||||
layer = QgsRasterLayer(uri, thema, "wms")
|
||||
elif provider_lower in ("ogr", "gpkg", "shp", "geojson"):
|
||||
layer = QgsVectorLayer(link, thema, "ogr")
|
||||
elif provider_lower == "rest":
|
||||
rest_link = link.strip()
|
||||
if rest_link.lower().endswith("/featureserver"):
|
||||
rest_link = rest_link.rstrip("/") + "/0"
|
||||
uri = rest_link if rest_link.lower().startswith("url=") else f"url={rest_link}"
|
||||
layer = QgsVectorLayer(uri, thema, "arcgisfeatureserver")
|
||||
else:
|
||||
layer = QgsVectorLayer(link, thema, "ogr")
|
||||
except Exception as exc:
|
||||
self.pruefmanager.verarbeite(
|
||||
pruef_ergebnis(
|
||||
ok=False,
|
||||
meldung=f"Fehler beim Erstellen des Layers {thema}: {exc}",
|
||||
aktion="layer_nicht_verfuegbar",
|
||||
kontext={"provider": provider, "link": link},
|
||||
)
|
||||
)
|
||||
return None
|
||||
|
||||
if not layer or not layer.isValid():
|
||||
self.pruefmanager.verarbeite(
|
||||
pruef_ergebnis(
|
||||
ok=False,
|
||||
meldung=f"Layer {thema} (Provider={provider}) konnte nicht geladen werden."
|
||||
,aktion="layer_nicht_verfuegbar",
|
||||
kontext={"provider": provider, "link": link},
|
||||
)
|
||||
)
|
||||
return None
|
||||
|
||||
return layer
|
||||
|
||||
def apply_style(self, layer: QgsVectorLayer, style_path: Optional[str]) -> None:
|
||||
if not style_path or layer is None or not layer.isValid():
|
||||
return
|
||||
|
||||
if not style_path.strip():
|
||||
return
|
||||
|
||||
if not is_absolute_path(style_path):
|
||||
plugin_root = get_plugin_root()
|
||||
style_path = str(join_path(plugin_root, "sn_plan41", "assets", style_path))
|
||||
|
||||
# normalize path for consistency
|
||||
style_path = str(normalize_path(style_path))
|
||||
|
||||
# Debug: welche Stil-Datei wird geprüft?
|
||||
print(f"[LayerLoader] Überprüfe Stildatei: '{style_path}'")
|
||||
|
||||
if file_exists(style_path):
|
||||
try:
|
||||
layer.loadNamedStyle(style_path)
|
||||
layer.triggerRepaint()
|
||||
except Exception as exc:
|
||||
self.pruefmanager.verarbeite(
|
||||
pruef_ergebnis(
|
||||
ok=False,
|
||||
meldung=f"Fehler beim Stil-Laden für {layer.name()}: {exc}",
|
||||
aktion="stil_laden_fehlgeschlagen",
|
||||
kontext={"thema": layer.name(), "style_path": style_path},
|
||||
)
|
||||
)
|
||||
else:
|
||||
self.pruefmanager.verarbeite(
|
||||
pruef_ergebnis(
|
||||
ok=True,
|
||||
meldung=f"Stildatei nicht gefunden (optional): {style_path}",
|
||||
aktion="stil_nicht_gefunden",
|
||||
kontext={"thema": layer.name(), "style_path": style_path},
|
||||
)
|
||||
)
|
||||
|
||||
def filter_by_extent(self, layer: QgsVectorLayer, extent, cancel_callback: Optional[Any] = None, source_layer: Optional[Any] = None) -> Optional[QgsVectorLayer]:
    """Clip *layer* to the rectangular extent *extent* (simple BBOX filter).

    This method applies only a bounding-box filter. For more complex
    spatial restrictions (e.g. a procedure area) use
    :meth:`filter_by_layer` instead, which performs real geometry tests.

    Parameters
    ----------
    layer:
        Source vector layer; returned unchanged when invalid/``None``.
    extent:
        Rectangle to filter by; transformed into *layer*'s CRS via
        ``self._transform_extent_to_layer_crs`` before filtering.
    cancel_callback:
        Optional callable polled via ``self._was_canceled``; a truthy
        result aborts the operation.
    source_layer:
        Layer whose CRS *extent* is expressed in; may be ``None``.

    Returns
    -------
    Optional[QgsVectorLayer]
        A new in-memory layer containing the matching features, the
        original layer when filtering does not apply, or ``None`` on
        cancel/timeout/read error or when no feature matched.
    """
    # No usable layer or no extent: nothing to clip — return input as-is.
    if not layer or not layer.isValid() or extent is None:
        return layer

    # Only vector layers can be filtered feature-by-feature here.
    if layer.type() != QgsVectorLayer.VectorLayer:
        return layer

    # Bring the extent into the target layer's CRS and let the provider
    # pre-filter by rectangle.
    extent_for_layer = self._transform_extent_to_layer_crs(extent, source_layer, layer)
    request = QgsFeatureRequest().setFilterRect(extent_for_layer)
    # setTimeout is not available in every QGIS version — best effort only.
    if hasattr(request, "setTimeout"):
        try:
            request.setTimeout(self._LAYER_TIMEOUT_MS)
        except Exception:
            pass

    start = time.monotonic()
    features: List[Any] = []
    try:
        for feat in layer.getFeatures(request):
            # User-requested cancellation aborts the whole operation.
            if self._was_canceled(cancel_callback):
                self.pruefmanager.verarbeite(
                    pruef_ergebnis(
                        ok=False,
                        meldung=f"Abbruch beim Raumfilter (BBOX) für {layer.name()}",
                        aktion="needs_user_action",
                        kontext={"thema": layer.name()},
                    )
                )
                return None

            # Wall-clock guard in addition to the provider timeout above.
            elapsed_ms = int((time.monotonic() - start) * 1000)
            if elapsed_ms >= self._LAYER_TIMEOUT_MS:
                self.pruefmanager.verarbeite(
                    pruef_ergebnis(
                        ok=False,
                        meldung=f"Timeout beim Raumfilter (BBOX) für {layer.name()} nach {self._LAYER_TIMEOUT_MS // 1000}s",
                        aktion="url_nicht_erreichbar",
                        kontext={"thema": layer.name(), "timeout_s": self._LAYER_TIMEOUT_MS // 1000},
                    )
                )
                return None

            features.append(feat)
            # Keep the UI responsive while iterating large result sets.
            if len(features) % 100 == 0:
                self._process_events()
    except Exception as exc:
        self.pruefmanager.verarbeite(
            pruef_ergebnis(
                ok=False,
                meldung=f"Fehler beim Lesen der Features für {layer.name()}: {exc}",
                aktion="layer_nicht_verfuegbar",
                kontext={"thema": layer.name()},
            )
        )
        return None

    # An empty result is signalled with None, not with an empty layer.
    if not features:
        return None

    # Build an in-memory layer of the same geometry type and CRS, then
    # copy the matching features into it.
    geom_type_map = {0: "Point", 1: "LineString", 2: "Polygon"}
    geom_type = geom_type_map.get(layer.geometryType(), "Polygon")
    uri = f"{geom_type}?crs={layer.crs().authid()}"
    filtered_layer = QgsVectorLayer(uri, f"{layer.name()}_bbox", "memory")
    if not filtered_layer or not filtered_layer.isValid():
        self.pruefmanager.verarbeite(
            pruef_ergebnis(
                ok=False,
                meldung=f"Fehler beim Erzeugen des Filter-Layers für {layer.name()}",
                aktion="filterlayer_nicht_erzeugt",
                kontext={"thema": layer.name()},
            )
        )
        return None

    provider = filtered_layer.dataProvider()
    provider.addAttributes(layer.fields())
    filtered_layer.updateFields()
    provider.addFeatures(features)
    filtered_layer.updateExtents()

    return filtered_layer
|
||||
|
||||
def filter_by_layer(self, layer: QgsVectorLayer, filter_layer: QgsVectorLayer, cancel_callback: Optional[Any] = None) -> Optional[QgsVectorLayer]:
    """Clip *layer* to the actual geometry of *filter_layer*.

    Used e.g. for the procedure area, so that only the real polygons —
    not the whole bounding box — act as the spatial filter. When the
    filter layer contains several features, their geometries are unioned
    into one (multi-)geometry first.

    Parameters
    ----------
    layer:
        Source vector layer; returned unchanged when invalid/``None``.
    filter_layer:
        Layer whose unioned feature geometries define the filter area.
    cancel_callback:
        Optional callable polled via ``self._was_canceled``; a truthy
        result aborts the operation.

    Returns
    -------
    Optional[QgsVectorLayer]
        A new in-memory layer with the intersecting features, the
        original layer when filtering does not apply, or ``None`` on
        cancel/timeout or when no feature matched.
    """
    if not layer or not layer.isValid() or not filter_layer or not filter_layer.isValid():
        return layer

    if layer.type() != QgsVectorLayer.VectorLayer:
        return layer

    # Union of all feature geometries in the filter layer, transformed
    # into the target layer's CRS.
    union_geom = None
    for f in filter_layer.getFeatures():
        try:
            geom = self._transform_geometry_to_layer_crs(f.geometry(), filter_layer, layer)
            if union_geom is None:
                union_geom = geom
            else:
                union_geom = union_geom.combine(geom)
        except Exception:
            # Best effort: skip geometries that fail to transform/combine.
            continue

    if union_geom is None or union_geom.isEmpty():
        return None

    # Pre-filter by the union's bounding box; exact intersection tests
    # follow per feature in the loop below.
    filtered: List[Any] = []
    request = QgsFeatureRequest().setFilterRect(union_geom.boundingBox())
    # setTimeout is not available in every QGIS version — best effort only.
    if hasattr(request, "setTimeout"):
        try:
            request.setTimeout(self._LAYER_TIMEOUT_MS)
        except Exception:
            pass

    start = time.monotonic()
    processed = 0
    for f in layer.getFeatures(request):
        if self._was_canceled(cancel_callback):
            self.pruefmanager.verarbeite(
                pruef_ergebnis(
                    ok=False,
                    meldung=f"Abbruch beim Raumfilter (Geometrie) für {layer.name()}",
                    aktion="needs_user_action",
                    kontext={"thema": layer.name()},
                )
            )
            return None

        elapsed_ms = int((time.monotonic() - start) * 1000)
        if elapsed_ms >= self._LAYER_TIMEOUT_MS:
            self.pruefmanager.verarbeite(
                pruef_ergebnis(
                    ok=False,
                    meldung=f"Timeout beim Raumfilter (Geometrie) für {layer.name()} nach {self._LAYER_TIMEOUT_MS // 1000}s",
                    aktion="url_nicht_erreichbar",
                    kontext={"thema": layer.name(), "timeout_s": self._LAYER_TIMEOUT_MS // 1000},
                )
            )
            return None

        try:
            if f.geometry() and f.geometry().intersects(union_geom):
                filtered.append(f)
        except Exception:
            continue

        # Pump the event loop once per 100 *scanned* features.
        # (Fix: the previous code keyed on len(filtered) % 100, which
        # fired on every non-matching feature while the match count sat
        # at a multiple of 100 — notably before the first match.)
        processed += 1
        if processed % 100 == 0:
            self._process_events()

    if not filtered:
        return None

    # Build an in-memory layer of the same geometry type and CRS, then
    # copy the intersecting features into it.
    geom_type_map = {0: "Point", 1: "LineString", 2: "Polygon"}
    geom_type = geom_type_map.get(layer.geometryType(), "Polygon")
    uri = f"{geom_type}?crs={layer.crs().authid()}"
    filtered_layer = QgsVectorLayer(uri, f"{layer.name()}_filtered", "memory")
    if not filtered_layer or not filtered_layer.isValid():
        self.pruefmanager.verarbeite(
            pruef_ergebnis(
                ok=False,
                meldung=f"Fehler beim Erzeugen des Filter-Layers für {layer.name()}",
                aktion="filterlayer_nicht_erzeugt",
                kontext={"thema": layer.name()},
            )
        )
        return None

    provider = filtered_layer.dataProvider()
    provider.addAttributes(layer.fields())
    filtered_layer.updateFields()
    provider.addFeatures(filtered)
    filtered_layer.updateExtents()

    return filtered_layer
|
||||
|
||||
def add_to_project(self, layer: QgsVectorLayer) -> None:
    """Register *layer* with the current QGIS project, if it is usable."""
    # Guard clause: silently skip layers that are missing or invalid.
    if not layer or not layer.isValid():
        return
    QgsProject.instance().addMapLayer(layer)
|
||||
@@ -56,6 +56,27 @@ class Pruefmanager:
|
||||
)
|
||||
info("DataGrabber Zusammenfassung", message)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Allgemeine Nutzerinteraktionen
|
||||
# ------------------------------------------------------------------
|
||||
def zeige_hinweis(self, titel: str, meldung: str) -> None:
    """Display a modal information message with an OK button."""
    # Imported lazily so the Qt dialog machinery is only touched when a
    # hint is actually shown.
    from sn_basis.functions.dialog_wrapper import show_info_dialog

    show_info_dialog(titel, meldung, parent=self.parent)
|
||||
|
||||
def frage_ja_nein(self, titel: str, meldung: str, default: bool = True) -> bool:
    """Ask a yes/no question; return True when the user chooses Yes.

    Outside the QGIS UI mode no dialog can be shown, so the supplied
    default answer is returned unchanged.
    """
    interactive = self.ui_modus == "qgis"
    if interactive:
        return ask_yes_no(titel, meldung, default=default, parent=self.parent)
    return default
|
||||
|
||||
def frage_text(self, titel: str, meldung: str, default_text: str = "") -> tuple[str, bool]:
    """Prompt for a text value and return ``(text, ok)``.

    Outside the QGIS UI mode no dialog can be shown; the supplied
    default text is returned together with ``True``.
    """
    if self.ui_modus != "qgis":
        return default_text, True
    # Imported lazily — and only on the interactive path — so headless
    # runs never touch the Qt dialog wrapper (the previous version
    # imported it unconditionally, even when returning the default).
    from sn_basis.functions.dialog_wrapper import ask_text
    return ask_text(titel, meldung, default_text=default_text, parent=self.parent)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# VERFAHRENS-DB-spezifische Entscheidungen
|
||||
# ------------------------------------------------------------------
|
||||
@@ -216,3 +237,26 @@ class Pruefmanager:
|
||||
)
|
||||
print("🔥 verarbeite() ENDE mit ok=False")
|
||||
return ergebnis
|
||||
|
||||
def _ask_use_or_replace_pufferlayer(self) -> str:
    """Ask the user whether an existing buffer layer should be reused
    or replaced.

    Returns
    -------
    str
        ``"verwenden"``, ``"ersetzen"`` or ``"abbrechen"``
    """
    # Route the decision through the check manager, which handles the
    # actual user interaction.
    antwort = self.pruefmanager.verarbeite(
        pruef_ergebnis(
            ok=False,
            aktion="layer_existiert",
            meldung="Ein Pufferlayer ist bereits vorhanden.",
        )
    )

    if not antwort.ok:
        return "abbrechen"
    if antwort.aktion == "ok":
        return "verwenden"
    return "ersetzen"
|
||||
|
||||
@@ -61,7 +61,8 @@ class Linkpruefer:
|
||||
aktion="leer",
|
||||
kontext=None,
|
||||
)
|
||||
|
||||
#evtl. Pfad-Objekte in string umwandeln
|
||||
eingabe = str(eingabe)
|
||||
# -----------------------------------------------------
|
||||
# 1. Fall: URL
|
||||
# -----------------------------------------------------
|
||||
|
||||
@@ -43,6 +43,13 @@ PruefAktion = Literal[
|
||||
# Dateiendung/Format
|
||||
"falsche_endung",
|
||||
"pflichtfelder_fehlen",
|
||||
"unbekannter_dateityp",
|
||||
"Datenbank",
|
||||
"dienst",
|
||||
"excel",
|
||||
"unbekannte_quelle",
|
||||
|
||||
|
||||
|
||||
# Excel/Import
|
||||
"kein_header",
|
||||
@@ -50,6 +57,8 @@ PruefAktion = Literal[
|
||||
"read_error",
|
||||
"open_error",
|
||||
"datenabruf",
|
||||
|
||||
|
||||
|
||||
# 🆕 VERFAHRENS-DB SPEZIFISCH (deine Anforderungen 2.d, 2.e)
|
||||
"datei_wird_erzeugt", # 2.d: Pfad gültig, Datei fehlt → weiter
|
||||
|
||||
@@ -4,9 +4,10 @@ Prüft ausschließlich, ob ein Stilpfad gültig ist.
|
||||
Die Anwendung erfolgt später über eine Aktion.
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
import os
|
||||
|
||||
from sn_basis.functions.sys_wrapper import file_exists
|
||||
from sn_basis.functions.os_wrapper import is_absolute_path
|
||||
from sn_basis.functions.sys_wrapper import get_plugin_root, file_exists, join_path
|
||||
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
|
||||
|
||||
|
||||
@@ -40,7 +41,11 @@ class Stilpruefer:
|
||||
kontext=None,
|
||||
)
|
||||
|
||||
pfad = Path(stil_pfad)
|
||||
pfad = str(stil_pfad)
|
||||
|
||||
if not is_absolute_path(pfad):
|
||||
plugin_root = get_plugin_root()
|
||||
pfad = str(join_path(plugin_root, "sn_plan41", "assets", pfad))
|
||||
|
||||
# -----------------------------------------------------
|
||||
# 2. Datei existiert nicht
|
||||
@@ -56,7 +61,7 @@ class Stilpruefer:
|
||||
# -----------------------------------------------------
|
||||
# 3. Falsche Endung
|
||||
# -----------------------------------------------------
|
||||
if pfad.suffix.lower() != ".qml":
|
||||
if os.path.splitext(pfad)[1].lower() != ".qml":
|
||||
return pruef_ergebnis(
|
||||
ok=False,
|
||||
meldung="Die Stil-Datei muss die Endung '.qml' haben.",
|
||||
|
||||
11
plugin.info
11
plugin.info
@@ -1,11 +0,0 @@
|
||||
name=LNO Sachsen | Basisfunktionen
|
||||
description=Plugin mit Basisfunktionen
|
||||
author=Daniel Helbig
|
||||
email=daniel.helbig@kreis-meissen.de
|
||||
qgisMinimumVersion=3.0
|
||||
qgisMaximumVersion=3.99
|
||||
deprecated=False
|
||||
experimental=False
|
||||
supportsQt6=Yes
|
||||
|
||||
zip_folder=sn_basis
|
||||
Reference in New Issue
Block a user