diff --git a/.gitea/workflows/release.yml b/.gitea/workflows/release.yml new file mode 100644 index 0000000..bec21ca --- /dev/null +++ b/.gitea/workflows/release.yml @@ -0,0 +1,289 @@ +name: Release Plugin +run-name: "Release | ${{ github.ref_name }}" + +on: + push: + tags: + - 'v*' + +jobs: + release: + runs-on: alpine-latest + defaults: + run: + shell: bash + + steps: + - name: Notwendige Abhängigkeiten installieren + shell: sh + run: | + apk add --no-cache git zip curl jq rsync bash + git config --global http.sslVerify false + + - name: Code holen + run: | + # Tag aus GitHub Actions Kontext extrahieren + TAG="${GITHUB_REF#refs/tags/}" + + # Repo-URL dynamisch aus vars und github.repository bauen + REPO_URL="https://${RELEASE_TOKEN}:x-oauth-basic@${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}.git" + + # Repository klonen + git clone "$REPO_URL" repo + cd repo + + git checkout "$TAG" + env: + RELEASE_TOKEN: ${{ secrets.RELEASE_TOKEN }} + + - name: Version und Kanal bestimmen + id: releaseinfo + run: | + TAG="${{ github.ref_name }}" + VERSION="${TAG#v}" + + case "$TAG" in + *-unstable*) + CHANNEL="unstable" + DRAFT="false" + PRERELEASE="true" + ;; + *-testing*) + CHANNEL="testing" + DRAFT="false" + PRERELEASE="true" + ;; + *) + CHANNEL="stable" + DRAFT="false" + PRERELEASE="false" + ;; + esac + + echo "version=$VERSION" >> $GITHUB_OUTPUT + echo "channel=$CHANNEL" >> $GITHUB_OUTPUT + echo "draft=$DRAFT" >> $GITHUB_OUTPUT + echo "prerelease=$PRERELEASE" >> $GITHUB_OUTPUT + + - name: plugin.info einlesen + id: info + run: | + cd repo + while IFS='=' read -r key value; do + echo "$key=$value" >> $GITHUB_OUTPUT + done < plugin.info + + - name: Changelog einlesen + id: changelog + run: | + cd repo + + # Aktueller Block = alles vor dem ersten --- + CURRENT=$(awk '/^---/{exit} {print}' changelog.txt) + + # Vollständige Historie = alles nach dem ersten --- + HISTORY=$(awk 'found{print} /^---/{found=1}' changelog.txt) + + # Gitea Release Body zusammenbauen + VERSION="${{ steps.releaseinfo.outputs.version }}" + FULL=$(printf "## %s\n%s\n\n%s" "$VERSION" "$CURRENT" "$HISTORY") + + echo "DEBUG | Aktueller Changelog:" + echo "$CURRENT" + + # Für GITHUB_OUTPUT: Multiline via EOF-Marker + echo "current<> $GITHUB_OUTPUT + echo "$CURRENT" >> $GITHUB_OUTPUT + echo "EOF" >> $GITHUB_OUTPUT + + echo "full<> $GITHUB_OUTPUT + echo "$FULL" >> $GITHUB_OUTPUT + echo "EOF" >> $GITHUB_OUTPUT + + - name: metadata.txt erzeugen + run: | + cd repo + + # ------------------------ GEÄNDERT ------------------------ + # Temporär die Vorlage aus dem hidden/templates Branch holen + git fetch origin hidden/templates + git checkout origin/hidden/templates -- metadata.template + TEMPLATE="metadata.template" + # ----------------------------------------------------------- + + # TEMPLATE="templates/metadata.template" + OUT="metadata.txt" + + CONTENT=$(cat "$TEMPLATE") + + CONTENT="${CONTENT//\{\{NAME\}\}/${{ steps.info.outputs.name }}}" + CONTENT="${CONTENT//\{\{QGIS_MIN\}\}/${{ steps.info.outputs.qgisMinimumVersion }}}" + CONTENT="${CONTENT//\{\{QGIS_MAX\}\}/${{ steps.info.outputs.qgisMaximumVersion }}}" + CONTENT="${CONTENT//\{\{DESCRIPTION\}\}/${{ steps.info.outputs.description }}}" + CONTENT="${CONTENT//\{\{VERSION\}\}/${{ steps.releaseinfo.outputs.version }}}" + CONTENT="${CONTENT//\{\{AUTHOR\}\}/${{ steps.info.outputs.author }}}" + CONTENT="${CONTENT//\{\{EMAIL\}\}/${{ steps.info.outputs.email }}}" + CONTENT="${CONTENT//\{\{HOMEPAGE\}\}/${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}}" + CONTENT="${CONTENT//\{\{TRACKER\}\}/${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}}" + CONTENT="${CONTENT//\{\{REPOSITORY\}\}/${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}}" + CONTENT="${CONTENT//\{\{EXPERIMENTAL\}\}/${{ steps.info.outputs.experimental }}}" + CONTENT="${CONTENT//\{\{DEPRECATED\}\}/${{ steps.info.outputs.deprecated }}}" + CONTENT="${CONTENT//\{\{QT6\}\}/${{ steps.info.outputs.supportsQt6 }}}" + + printf "%s\n" "$CONTENT" > "$OUT" + rm $TEMPLATE + + - name: ZIP-Datei erstellen + id: zip + run: | + cd repo + + ZIP_FOLDER="${{ steps.info.outputs.zip_folder }}" + ZIP_FILE="${ZIP_FOLDER}.zip" + + VERSION="${{ steps.releaseinfo.outputs.version }}" + REPO_NAME="${GITHUB_REPOSITORY##*/}" + #ZIP_NAME="${REPO_NAME}-${VERSION}.zip" + + + mkdir -p dist/${ZIP_FOLDER} + + rsync -a \ + --exclude='.git' \ + --exclude='.gitea' \ + --exclude='.plugin' \ + --exclude='dist' \ + ./ dist/${ZIP_FOLDER}/ + + cd dist + zip -r "${ZIP_FILE}" "${ZIP_FOLDER}/" \ + -x "*.pyc" -x "*/__pycache__/*" + cd .. + + echo "zip_file=${ZIP_FILE}" >> $GITHUB_OUTPUT + + - name: Gitea-Release erstellen + id: create_release + run: | + TAG="${{ github.ref_name }}" + VERSION="${{ steps.releaseinfo.outputs.version }}" + CHANNEL="${{ steps.releaseinfo.outputs.channel }}" + + API_URL="https://${{ vars.RELEASE_URL }}/api/v1/repos/${GITHUB_REPOSITORY}/releases" + + JSON=$(jq -n \ + --arg tag "$TAG" \ + --arg name "Version $VERSION" \ + --arg body "${{ steps.changelog.outputs.current }}" \ + --argjson draft "${{ steps.releaseinfo.outputs.draft }}" \ + --argjson prerelease "${{ steps.releaseinfo.outputs.prerelease }}" \ + '{tag_name: $tag, name: $name, body: $body, draft: $draft, prerelease: $prerelease}') + + API_RESPONSE=$(curl -s -X POST "$API_URL" \ + -H "accept: application/json" \ + -H "Authorization: token ${{ secrets.RELEASE_TOKEN }}" \ + -H "Content-Type: application/json" \ + -d "$JSON") + + RELEASE_ID=$(echo "$API_RESPONSE" | jq -r '.id') + + if [ "$RELEASE_ID" = "null" ] || [ -z "$RELEASE_ID" ]; then + echo "Fehler beim Erstellen des Releases!" + echo "$API_RESPONSE" + exit 1 + fi + + echo "release_id=$RELEASE_ID" >> $GITHUB_OUTPUT + + - name: ZIP-Datei hochladen + run: | + RELEASE_ID="${{ steps.create_release.outputs.release_id }}" + ZIP_FILE="${{ steps.zip.outputs.zip_file }}" + + API_URL="https://${{ vars.RELEASE_URL }}/api/v1/repos/${GITHUB_REPOSITORY}/releases/${RELEASE_ID}/assets?name=${ZIP_FILE}" + + curl -s -X POST "$API_URL" \ + -H "Authorization: token ${{ secrets.RELEASE_TOKEN }}" \ + -H "Content-Type: application/zip" \ + --data-binary "@repo/dist/${ZIP_FILE}" \ + -o upload_response.json + + # Optional: Fehlerprüfung + if jq -e '.id' upload_response.json >/dev/null 2>&1; then + echo "ZIP erfolgreich hochgeladen." + else + echo "Fehler beim Hochladen der ZIP!" + exit 1 + fi + + - name: Payload erzeugen + run: | + cd repo + + VERSION="${{ steps.releaseinfo.outputs.version }}" + CHANNEL="${{ steps.releaseinfo.outputs.channel }}" + ZIP_FILE="${{ steps.zip.outputs.zip_file }}" + + DOWNLOAD_URL="https://${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}/releases/download/${{ github.ref_name }}/${ZIP_FILE}" + + jq -n \ + --arg name "${{ steps.info.outputs.name }}" \ + --arg version "$VERSION" \ + --arg channel "$CHANNEL" \ + --arg description "${{ steps.info.outputs.description }}" \ + --arg author "${{ steps.info.outputs.author }}" \ + --arg email "${{ steps.info.outputs.email }}" \ + --arg qgis_min "${{ steps.info.outputs.qgisMinimumVersion }}" \ + --arg qgis_max "${{ steps.info.outputs.qgisMaximumVersion }}" \ + --arg homepage "${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}" \ + --arg tracker "${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}" \ + --arg repository "${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}" \ + --arg experimental "${{ steps.info.outputs.experimental }}" \ + --arg deprecated "${{ steps.info.outputs.deprecated }}" \ + --arg qt6 "${{ steps.info.outputs.supportsQt6 }}" \ + --arg id "${{ steps.info.outputs.zip_folder }}" \ + --arg url "$DOWNLOAD_URL" \ + --arg changelog "${{ steps.changelog.outputs.current }}" \ + '{ + name: $name, + version: $version, + channel: $channel, + description: $description, + author: $author, + email: $email, + qgis_min: $qgis_min, + qgis_max: $qgis_max, + homepage: $homepage, + tracker: $tracker, + repository: $repository, + experimental: $experimental, + deprecated: $deprecated, + qt6: $qt6, + id: $id, + url: $url, + changelog: $changelog + }' > payload.json + + - name: Repository aktualisieren + run: | + OWNER="AG_QGIS" + WORKFLOW="update.yml" + + PAYLOAD_B64=$(base64 -w0 repo/payload.json) + + FULL_NAME="${{ steps.info.outputs.name }}" + NAME=$(echo "$FULL_NAME" | awk -F'|' '{gsub(/^ +| +$/,"",$2); print $2}') + TAG="${{ steps.releaseinfo.outputs.version }}" + + JSON="{\"ref\":\"hidden/workflows\",\"inputs\":{\"payload\":\"$PAYLOAD_B64\",\"name\":\"$NAME\",\"tag\":\"$TAG\"}}" + + #JSON="{\"ref\":\"hidden/workflows\",\"inputs\":{\"payload\":\"$PAYLOAD_B64\"}}" + + echo "DEBUG | Sende JSON:" + echo "$JSON" + + curl -X POST \ + -H "Authorization: token ${{ secrets.RELEASE_TOKEN }}" \ + -H "Content-Type: application/json" \ + -d "$JSON" \ + "https://${{ vars.RELEASE_URL }}/api/v1/repos/${OWNER}/Repository/actions/workflows/${WORKFLOW}/dispatches" diff --git a/functions/__init__.py b/functions/__init__.py index cfbfc4a..8b06c1d 100644 --- a/functions/__init__.py +++ b/functions/__init__.py @@ -15,7 +15,7 @@ from .ly_metadata_wrapper import ( is_layer_editable, ) from .ly_style_wrapper import apply_style -from .dialog_wrapper import ask_yes_no +from .dialog_wrapper import ask_yes_no, ask_overwrite_append_cancel_custom from .message_wrapper import ( _get_message_bar, diff --git a/functions/dialog_wrapper.py b/functions/dialog_wrapper.py index f91c4bf..f86413e 100644 --- a/functions/dialog_wrapper.py +++ b/functions/dialog_wrapper.py @@ -2,8 +2,10 @@ sn_basis/functions/dialog_wrapper.py – Benutzer-Dialoge (Qt5/6/Mock-kompatibel) """ from typing import Any +from typing import Literal, Optional from sn_basis.functions.qt_wrapper import ( - QMessageBox, YES, NO, QT_VERSION + QMessageBox, YES, NO, CANCEL, QT_VERSION, exec_dialog, ICON_QUESTION, + QProgressDialog, QCoreApplication, Qt, ) def ask_yes_no( @@ -35,3 +37,146 @@ def ask_yes_no( except Exception as e: print(f"⚠️ ask_yes_no Fehler: {e}") return default + + +OverwriteDecision = Optional[Literal["overwrite", "append", "cancel"]] + + +def ask_overwrite_append_cancel_custom( + parent, + title: str, + message: str, +) -> Literal["overwrite", "append", "cancel"]: + """Zeigt Dialog mit benutzerdefinierten Buttons: Überschreiben/Anhängen/Abbrechen. + + Parameters + ---------- + parent : + Eltern-Widget oder None. + title : str + Dialog-Titel. + message : str + Hauptmeldung mit Erklärung. + + Returns + ------- + Literal["overwrite", "append", "cancel"] + Genaue Entscheidung des Nutzers. + """ + msg = QMessageBox(parent) + msg.setIcon(ICON_QUESTION) + msg.setWindowTitle(title) + msg.setText(message) + + # Eigene Buttons mit exakten Texten + overwrite_btn = msg.addButton("Überschreiben", QMessageBox.ButtonRole.AcceptRole) + append_btn = msg.addButton("Anhängen", QMessageBox.ButtonRole.ActionRole) + cancel_btn = msg.addButton("Abbrechen", QMessageBox.ButtonRole.RejectRole) + + exec_dialog(msg) + + clicked = msg.clickedButton() + if clicked == overwrite_btn: + return "overwrite" + elif clicked == append_btn: + return "append" + else: # cancel_btn + return "cancel" + + +class ProgressDialog: + def __init__(self, total: int, title: str = "Fortschritt", label: str = "Verarbeite..."): + self.total = max(total, 1) + self._canceled = False + + if QT_VERSION == 0: + self.value = 0 + self.label = label + self.title = title + return + + self._dlg = QProgressDialog(label, "Abbrechen", 0, self.total) + self._dlg.setWindowTitle(title) + + # Qt5 vs Qt6: WindowModality-Enum unterschiedlich verfügbar + modality = None + if hasattr(Qt, "WindowModality"): + try: + modality = Qt.WindowModality.WindowModal + except Exception: + modality = None + if modality is None and hasattr(Qt, "WindowModal"): + modality = Qt.WindowModal + if modality is not None: + try: + self._dlg.setWindowModality(modality) + except Exception: + pass + + self._dlg.setMinimumDuration(0) + self._dlg.setAutoClose(False) + self._dlg.setAutoReset(False) + self._dlg.setValue(0) + + def on_cancel(): + if self._dlg and self._dlg.value() >= self.total: + # OK-Button am Ende + self._dlg.close() + return + self._canceled = True + self._dlg.close() + + try: + self._dlg.canceled.connect(on_cancel) + except Exception: + pass + + def set_total(self, total: int) -> None: + self.total = max(total, 1) + if QT_VERSION == 0: + return + + if self._dlg is not None: + self._dlg.setMaximum(self.total) + + def set_value(self, value: int) -> None: + if QT_VERSION == 0: + self.value = value + return + + if self._dlg is not None: + self._dlg.setValue(min(value, self.total)) + if value >= self.total: + self._dlg.setLabelText("Fertig. Klicken Sie auf OK, um das Fenster zu schließen.") + self._dlg.setCancelButtonText("OK") + QCoreApplication.processEvents() + + def set_label(self, text: str) -> None: + if QT_VERSION == 0: + self.label = text + return + + if self._dlg is not None: + self._dlg.setLabelText(text) + QCoreApplication.processEvents() + + def is_canceled(self) -> bool: + if QT_VERSION == 0: + return self._canceled + + if self._dlg is not None: + return self._canceled or self._dlg.wasCanceled() + + return self._canceled + + def close(self) -> None: + if QT_VERSION == 0: + return + + if self._dlg is not None: + self._dlg.close() + + +def create_progress_dialog(total: int, title: str = "Fortschritt", label: str = "Verarbeite...") -> ProgressDialog: + return ProgressDialog(total, title, label) + diff --git a/functions/ly_style_wrapper.py b/functions/ly_style_wrapper.py index 3145532..ad0221a 100644 --- a/functions/ly_style_wrapper.py +++ b/functions/ly_style_wrapper.py @@ -1,23 +1,44 @@ # sn_basis/functions/ly_style_wrapper.py from sn_basis.functions.ly_existence_wrapper import layer_exists -from sn_basis.functions.sys_wrapper import ( - get_plugin_root, - join_path, - file_exists, -) - +from sn_basis.functions.sys_wrapper import get_plugin_root, join_path +from sn_basis.modules.stilpruefer import Stilpruefer +from typing import Optional def apply_style(layer, style_name: str) -> bool: + """ + Wendet einen Layerstil an, sofern er gültig ist. + + - Validierung erfolgt ausschließlich über Stilpruefer + - Keine eigenen Dateisystem- oder Endungsprüfungen + - Keine Seiteneffekte bei ungültigem Stil + """ + print(">>> apply_style() START") + if not layer_exists(layer): return False - style_path = join_path(get_plugin_root(), "styles", style_name) - if not file_exists(style_path): + # Stilpfad zusammensetzen + style_path = join_path(get_plugin_root(), "sn_verfahrensgebiet","styles", style_name) + + # Stil prüfen + pruefer = Stilpruefer() + ergebnis = pruefer.pruefe(style_path) + print(">>> Stilprüfung:", ergebnis) + + print( + f"[Stilprüfung] ok={ergebnis.ok} | " + f"aktion={ergebnis.aktion} | " + f"meldung={ergebnis.meldung}" +) + + + if not ergebnis.ok: return False + # Stil anwenden try: - ok, _ = layer.loadNamedStyle(style_path) + ok, _ = layer.loadNamedStyle(str(ergebnis.kontext)) if ok: getattr(layer, "triggerRepaint", lambda: None)() return True diff --git a/functions/os_wrapper.py b/functions/os_wrapper.py index 6bd6d10..5558072 100644 --- a/functions/os_wrapper.py +++ b/functions/os_wrapper.py @@ -57,6 +57,22 @@ def get_home_dir() -> Path: return Path.home() +def is_absolute_path(path: _PathLike) -> bool: + """Prüft, ob ein Pfad absolut ist.""" + try: + return Path(path).is_absolute() + except Exception: + return False + + +def basename(path: _PathLike) -> str: + """Gibt den finalen Namen des Pfades zurück (Dateiname oder Ordner).""" + try: + return Path(path).name + except Exception: + return "" + + # --------------------------------------------------------- # Dateisystem-Eigenschaften # --------------------------------------------------------- @@ -75,3 +91,11 @@ def is_case_sensitive_fs() -> bool: # Linux praktisch immer case-sensitiv return True + + +def path_suffix(path: _PathLike) -> str: + """Gibt die Dateiendung eines Pfades zurück (inklusive Punkt).""" + try: + return Path(path).suffix + except Exception: + return "" diff --git a/functions/qgiscore_wrapper.py b/functions/qgiscore_wrapper.py index 4a3905e..de4b063 100644 --- a/functions/qgiscore_wrapper.py +++ b/functions/qgiscore_wrapper.py @@ -20,6 +20,12 @@ QgsNetworkAccessManager: Type[Any] Qgis: Type[Any] QgsMapLayerProxyModel: Type[Any] QgsVectorFileWriter: Type[Any] # neu: Schreib-API +QgsFeature: Type[Any] +QgsField: Type[Any] +QgsGeometry: Type[Any] +QgsFeatureRequest: Type[Any] +QgsCoordinateTransform: Type[Any] +QgsCoordinateReferenceSystem: Type[Any] QGIS_AVAILABLE = False @@ -36,6 +42,12 @@ try: Qgis as _Qgis, QgsMapLayerProxyModel as _QgsMaplLayerProxyModel, QgsVectorFileWriter as _QgsVectorFileWriter, + QgsFeature as _QgsFeature, + QgsField as _QgsField, + QgsGeometry as _QgsGeometry, + QgsFeatureRequest as _QgsFeatureRequest, + QgsCoordinateTransform as _QgsCoordinateTransform, + QgsCoordinateReferenceSystem as _QgsCoordinateReferenceSystem, ) QgsProject = _QgsProject @@ -45,6 +57,12 @@ try: Qgis = _Qgis QgsMapLayerProxyModel = _QgsMaplLayerProxyModel QgsVectorFileWriter = _QgsVectorFileWriter + QgsFeature = _QgsFeature + QgsField = _QgsField + QgsGeometry = _QgsGeometry + QgsFeatureRequest = _QgsFeatureRequest + QgsCoordinateTransform = _QgsCoordinateTransform + QgsCoordinateReferenceSystem = _QgsCoordinateReferenceSystem QGIS_AVAILABLE = True @@ -116,6 +134,30 @@ except Exception: QgsRasterLayer = _MockQgsRasterLayer + class _MockQgsFeatureRequest: + def __init__(self): + self._filter_rect = None + + def setFilterRect(self, rect): + self._filter_rect = rect + return self + + QgsFeatureRequest = _MockQgsFeatureRequest + + class _MockQgsCoordinateTransform: + def __init__(self, *args, **kwargs): + pass + + def transformBoundingBox(self, rect): + return rect + + class _MockQgsCoordinateReferenceSystem: + def __init__(self, *args, **kwargs): + pass + + QgsCoordinateTransform = _MockQgsCoordinateTransform + QgsCoordinateReferenceSystem = _MockQgsCoordinateReferenceSystem + QgsNetworkAccessManager = _MockQgsNetworkAccessManager class _MockQgis: diff --git a/functions/qt_wrapper.py b/functions/qt_wrapper.py index 09dfa40..256f0a2 100644 --- a/functions/qt_wrapper.py +++ b/functions/qt_wrapper.py @@ -10,12 +10,17 @@ YES: Optional[Any] = None NO: Optional[Any] = None CANCEL: Optional[Any] = None ICON_QUESTION: Optional[Any] = None +QVariant: Type[Any] = object + + # Qt-Klassen (werden dynamisch gesetzt) QDockWidget: Type[Any] = object QMessageBox: Type[Any] = object QFileDialog: Type[Any] = object +QProgressDialog: Type[Any] = object QEventLoop: Type[Any] = object +QTimer: Type[Any] = object QUrl: Type[Any] = object QNetworkRequest: Type[Any] = object QNetworkReply: Type[Any] = object @@ -36,6 +41,8 @@ QToolButton: Type[Any] = object QSizePolicy: Type[Any] = object Qt: Type[Any] = object QComboBox: Type[Any] = object +QHBoxLayout: Type[Any] = object + def exec_dialog(dialog: Any) -> Any: """Führt Dialog modal aus (Qt6: exec(), Qt5: exec_(), Mock: YES)""" @@ -61,6 +68,7 @@ try: from qgis.PyQt.QtWidgets import ( QMessageBox as _QMessageBox, QFileDialog as _QFileDialog, + QProgressDialog as _QProgressDialog, QWidget as _QWidget, QGridLayout as _QGridLayout, QLabel as _QLabel, @@ -77,12 +85,15 @@ try: QToolButton as _QToolButton, QSizePolicy as _QSizePolicy, QComboBox as _QComboBox, + QHBoxLayout as _QHBoxLayout, ) from qgis.PyQt.QtCore import ( QEventLoop as _QEventLoop, + QTimer as _QTimer, QUrl as _QUrl, QCoreApplication as _QCoreApplication, Qt as _Qt, + QVariant as _QVariant ) from qgis.PyQt.QtNetwork import ( QNetworkRequest as _QNetworkRequest, @@ -93,7 +104,10 @@ try: QT_VERSION = 6 QMessageBox = _QMessageBox QFileDialog = _QFileDialog + QProgressDialog = _QProgressDialog + QProgressDialog = _QProgressDialog QEventLoop = _QEventLoop + QTimer = _QTimer QUrl = _QUrl QNetworkRequest = _QNetworkRequest QNetworkReply = _QNetworkReply @@ -115,12 +129,16 @@ try: QToolButton = _QToolButton QSizePolicy = _QSizePolicy QComboBox = _QComboBox - + QVariant = _QVariant + QHBoxLayout= _QHBoxLayout # ✅ QT6 ENUMS YES = QMessageBox.StandardButton.Yes NO = QMessageBox.StandardButton.No CANCEL = QMessageBox.StandardButton.Cancel ICON_QUESTION = QMessageBox.Icon.Question + AcceptRole = QMessageBox.ButtonRole.AcceptRole + ActionRole = QMessageBox.ButtonRole.ActionRole + RejectRole = QMessageBox.ButtonRole.RejectRole # Qt6 Enum-Aliase ToolButtonTextBesideIcon = Qt.ToolButtonStyle.ToolButtonTextBesideIcon @@ -161,12 +179,15 @@ except (ImportError, AttributeError): QToolButton as _QToolButton, QSizePolicy as _QSizePolicy, QComboBox as _QComboBox, + QHBoxLayout as _QHBoxLayout, ) from PyQt5.QtCore import ( QEventLoop as _QEventLoop, + QTimer as _QTimer, QUrl as _QUrl, QCoreApplication as _QCoreApplication, Qt as _Qt, + QVariant as _QVariant ) from PyQt5.QtNetwork import ( QNetworkRequest as _QNetworkRequest, @@ -178,6 +199,7 @@ except (ImportError, AttributeError): QMessageBox = _QMessageBox QFileDialog = _QFileDialog QEventLoop = _QEventLoop + QTimer = _QTimer QUrl = _QUrl QNetworkRequest = _QNetworkRequest QNetworkReply = _QNetworkReply @@ -199,12 +221,18 @@ except (ImportError, AttributeError): QToolButton = _QToolButton QSizePolicy = _QSizePolicy QComboBox = _QComboBox + QVariant = _QVariant + QHBoxLayout = _QHBoxLayout # ✅ PYQT5 ENUMS YES = QMessageBox.Yes NO = QMessageBox.No CANCEL = QMessageBox.Cancel ICON_QUESTION = QMessageBox.Question + AcceptRole = QMessageBox.AcceptRole + ActionRole = QMessageBox.ActionRole + RejectRole = QMessageBox.RejectRole + # PyQt5 Enum-Aliase ToolButtonTextBesideIcon = Qt.ToolButtonTextBesideIcon @@ -244,6 +272,10 @@ except (ImportError, AttributeError): No = NO Cancel = CANCEL Question = ICON_QUESTION + AcceptRole = 0 + ActionRole = 3 + RejectRole = 1 + @classmethod def question(cls, parent, title, message, buttons, default_button): @@ -268,6 +300,17 @@ except (ImportError, AttributeError): QEventLoop = _MockQEventLoop + class _MockQTimer: + def __init__(self, *args, **kwargs): + self.timeout = type('Signal', (), { + 'connect': lambda s, cb: None, + })() + def setSingleShot(self, v: bool) -> None: pass + def start(self, ms: int) -> None: pass + def stop(self) -> None: pass + + QTimer = _MockQTimer + class _MockQUrl(str): def isValid(self) -> bool: return True @@ -423,9 +466,71 @@ except (ImportError, AttributeError): QComboBox = _MockComboBox + + # --------------------------- + # Mock für QVariant + # --------------------------- + + class _MockQVariant: + """ + Minimaler Ersatz für QtCore.QVariant. + + Ziel: + - Werte transparent durchreichen + - Typ-Konstanten bereitstellen + - Keine Qt-Abhängigkeiten + """ + + # Typ-Konstanten (symbolisch, Werte egal) + Invalid = 0 + Int = 1 + Double = 2 + String = 3 + Bool = 4 + Date = 5 + DateTime = 6 + + def __init__(self, value: Any = None): + self._value = value + + def value(self) -> Any: + return self._value + + def __repr__(self) -> str: + return f"QVariant({self._value!r})" + + # Optional: automatische Entpackung + def __int__(self): + return int(self._value) + + def __float__(self): + return float(self._value) + + def __str__(self): + return str(self._value) + QVariant = _MockQVariant + + class _MockQHBoxLayout: + def __init__(self, *args, **kwargs): + self._widgets = [] + + def addWidget(self, widget): + self._widgets.append(widget) + + def addLayout(self, layout): + pass + + def addStretch(self, *args, **kwargs): + pass + + def setSpacing(self, *args, **kwargs): + pass + + def setContentsMargins(self, *args, **kwargs): + pass + QHBoxLayout = _MockQHBoxLayout def exec_dialog(dialog: Any) -> Any: return YES - # --------------------------- TEST --------------------------- if __name__ == "__main__": debug_qt_status() diff --git a/functions/sys_wrapper.py b/functions/sys_wrapper.py index 75b1899..047f563 100644 --- a/functions/sys_wrapper.py +++ b/functions/sys_wrapper.py @@ -6,6 +6,7 @@ from pathlib import Path from typing import Union import sys +from sn_basis.functions.os_wrapper import is_absolute_path, basename _PathLike = Union[str, Path] diff --git a/metadata.txt b/metadata.txt index c63370b..868d712 100644 --- a/metadata.txt +++ b/metadata.txt @@ -1,13 +1,15 @@ [general] -name=LNO Sachsen | Basisfunktionen -qgisMinimumVersion=3.0 +name=LNO Sachsen | Basisfunktionen description=Plugin mit Basisfunktionen -version=25.11.4 author=Michael Otto email=michael.otto@landkreis-mittelsachsen.de -about=Plugin mit Basisfunktionen -category=Plugins -homepage=https://entwicklung.vln-sn.de/AG_QGIS/Plugin_SN_Basis -repository=https://entwicklung.vln-sn.de/AG_QGIS/Repository -supportsQt6=true -experimental=true \ No newline at end of file +homepage=https://entwicklung.flurneuordnung-sachsen.de/AG_QGIS/Plugin_SN_Basis +repository=https://entwicklung.flurneuordnung-sachsen/AG_QGIS/Repository +qgisMinimumVersion=3.40 +qgisMaximumVersion=4.99 +version=26.3.11 +deprecated=False +experimental=true +supportsQt6=Yes + +zip_folder=sn_basis diff --git a/modules/DataGrabber.py b/modules/DataGrabber.py index d78ce61..b0b440a 100644 --- a/modules/DataGrabber.py +++ b/modules/DataGrabber.py @@ -2,14 +2,14 @@ DataGrabber module ================== -UI‑freier Orchestrator für die Prüfung und Klassifikation von Datenquellen. +UI-freier Orchestrator für die Prüfung und Klassifikation von Datenquellen. Der DataGrabber: - klassifiziert die übergebene Quelle (Datei, Dienst, Datenbank, Excel), - ruft passende Prüfer (Dateipruefer, Linkpruefer, Layerpruefer, Stilpruefer) auf, -- sammelt alle rohen ``pruef_ergebnis``‑Objekte, +- sammelt alle rohen ``pruef_ergebnis``-Objekte, - aggregiert diese zu einem zusammenfassenden Ergebnis, -- **löst selbst keinerlei UI‑Interaktion aus**. +- **löst selbst keinerlei UI-Interaktion aus**. Alle Nutzerinteraktionen (MessageBar, QMessageBox, Logging) erfolgen ausschließlich über den ``Pruefmanager`` im aufrufenden Kontext (UI / Pipeline). @@ -17,8 +17,11 @@ ausschließlich über den ``Pruefmanager`` im aufrufenden Kontext (UI / Pipeline from __future__ import annotations +import re from typing import Any, Dict, List, Mapping, Optional, Tuple, Literal +from sn_basis.functions.os_wrapper import basename, path_suffix + from sn_basis.modules.pruef_ergebnis import pruef_ergebnis from sn_basis.modules.Pruefmanager import Pruefmanager @@ -27,6 +30,7 @@ from sn_basis.modules.linkpruefer import Linkpruefer from sn_basis.modules.layerpruefer import Layerpruefer from sn_basis.modules.stilpruefer import Stilpruefer from sn_basis.modules.excel_importer import ExcelImporter +from sn_plan41.modules.listenauswerter import Listenauswerter SourceType = Literal["service", "database", "excel", "unknown"] @@ -38,9 +42,6 @@ class DataGrabber: """ Analysiert und prüft Datenquellen für den Fachdatenabruf. - Der DataGrabber ist **UI‑frei**. Er erzeugt ausschließlich rohe - ``pruef_ergebnis``‑Objekte und überlässt deren Verarbeitung - vollständig dem aufrufenden Code. """ def __init__( @@ -55,9 +56,9 @@ class DataGrabber: ) -> None: self.pruefmanager = pruefmanager self._datei_pruefer_cls = datei_pruefer_cls - self.link_pruefer = link_pruefer - self.layer_pruefer = layer_pruefer - self.stil_pruefer = stil_pruefer + self.link_pruefer = link_pruefer or Linkpruefer() + self.layer_pruefer = layer_pruefer or Layerpruefer() + self.stil_pruefer = stil_pruefer or Stilpruefer() self._excel_importer_cls = excel_importer_cls self._source: Optional[str] = None @@ -69,23 +70,61 @@ class DataGrabber: """Setzt die aktuell zu untersuchende Rohquelle.""" self._source = source - def analyze_source_type(self, source: str) -> SourceType: - """ - Klassifiziert die Quelle. + SourceType = str # "excel" | "datenbank" | "dienst" | "unbekannt" - Aktuell Platzhalter – liefert ``"unknown"``. + + def analyze_source_type(self, quelle: str) -> Tuple[SourceType, pruef_ergebnis]: """ - return "unknown" + Klassifiziert die Quelle und liefert das zugehörige pruef_ergebnis. + + Reihenfolge: + 1. Dateipruefer (Datei + Dateityp) + 2. Linkpruefer (Dienst) + """ + + # -------------------------------------------------- + # 1. Datei prüfen (inkl. Typ-Erkennung) + # -------------------------------------------------- + dateipruefer = Dateipruefer(pfad=quelle) + datei_ergebnis = dateipruefer.pruefe() + + if datei_ergebnis.ok: + suffix = path_suffix(datei_ergebnis.kontext).lower() + print(f"[DataGrabber] Debug: analyze_source_type source={quelle} -> suffix={suffix}") + + if suffix == ".xlsx": + return "excel", datei_ergebnis + + if suffix in (".gpkg", ".sqlite"): + return "datenbank", datei_ergebnis + + return "unbekannter_dateityp", datei_ergebnis + + # -------------------------------------------------- + # 2. Keine Datei → Link prüfen + # -------------------------------------------------- + linkpruefer = Linkpruefer() + link_ergebnis = linkpruefer.pruefe(quelle) + + if link_ergebnis.ok: + return "dienst", link_ergebnis + + # -------------------------------------------------- + # 3. Weder Datei noch Dienst + # -------------------------------------------------- + + return "unbekannte_quelle", link_ergebnis def run(self, source: str) -> Tuple[SourceDict, pruef_ergebnis]: """ Führt die vollständige Quellprüfung aus. - Diese Methode ist **UI‑frei**. Sie gibt rohe Ergebnisse zurück, + Diese Methode ist **UIfrei**. Sie gibt rohe Ergebnisse zurück, die vom Aufrufer über den ``Pruefmanager`` verarbeitet werden. """ self.set_source(source) - source_type = self.analyze_source_type(source) + source_type, source_result = self.analyze_source_type(source) + print(f"[DataGrabber] Debug: run source={source} -> source_type={source_type}") source_dict: SourceDict = {} partial_results: List[pruef_ergebnis] = [] @@ -97,14 +136,7 @@ class DataGrabber: elif source_type == "service": source_dict, partial_results = self._process_service_source(source) else: - partial_results.append( - pruef_ergebnis( - ok=False, - meldung="Quelle konnte nicht klassifiziert werden", - aktion="kein_dateipfad", - kontext={"source": source}, - ) - ) + partial_results.append(source_result) summary = self._aggregate_results(source, source_dict, partial_results) return source_dict, summary @@ -115,9 +147,150 @@ class DataGrabber: def _process_excel_source( self, filepath: str ) -> Tuple[SourceDict, List[pruef_ergebnis]]: - source_dict: SourceDict = {} + source_dict: SourceDict = {"rows": []} results: List[pruef_ergebnis] = [] - return source_dict, results + + rows = ExcelImporter(filepath, self.pruefmanager).import_xlsx() + print(f"[DataGrabber] Debug: Excel-Linkliste geladen: {filepath}") + print(f"[DataGrabber] Debug: raw rows count: {len(rows)}") + if rows: + first = rows[:min(5, len(rows))] + print(f"[DataGrabber] Debug: first rows: {first}") + + if not rows: + return source_dict, results + + required_keys = {"ident", "gruppe", "kartenebene", "inhalt", "link", "provider", "stildatei"} + + def extract_url(raw_link: str, provider: str) -> str: + if not raw_link: + return "" + if not isinstance(raw_link, str): + return str(raw_link) + + if provider == "wfs": + url_match = re.search(r"url\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE) + type_match = re.search(r"typename\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE) + if url_match: + url = url_match.group(1).strip() + if type_match: + typename = type_match.group(1).strip() + separator = "&" if "?" in url else "?" + return f"url={url}{separator}service=WFS&request=GetFeature&typename={typename}" + return f"url={url}" + + if provider == "wms": + # falls WMS-URL als url='...' vorliegt + match = re.search(r"url\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE) + if match: + return match.group(1).strip() + + if provider == "rest": + # REST/ArcGIS-Server: direkt nutzen + match = re.search(r"url\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE) + if match: + return match.group(1).strip() + + # allgemeines Rückfallverhalten + match = re.search(r"url\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE) + if match: + return match.group(1).strip() + return raw_link.strip() + + for row_index, raw_row in enumerate(rows, start=2): + if not isinstance(raw_row, Mapping): + pe = pruef_ergebnis( + ok=False, + meldung="Linklisten-Zeile ist nicht als Dictionary formatiert.", + aktion="ungueltige_zeile", + kontext={"zeile": row_index, "wert": raw_row}, + ) + results.append(self.pruefmanager.verarbeite(pe)) + continue + + normalized = {str(k).strip().lower(): v for k, v in raw_row.items() if k is not None} + if not required_keys.issubset(normalized.keys()): + missing = required_keys.difference(normalized.keys()) + pe = pruef_ergebnis( + ok=False, + meldung=f"Linkliste fehlt erforderliche Spalten: {', '.join(sorted(missing))}", + aktion="spaltenfehlend", + kontext={"zeile": row_index, "fehlend": sorted(missing)}, + ) + results.append(self.pruefmanager.verarbeite(pe)) + continue + + ident = normalized.get("ident") + link_raw = normalized.get("link") or "" + provider = str(normalized.get("provider") or "").strip().lower() + stildatei_raw = normalized.get("stildatei") or "" + stildatei = None + + if stildatei_raw and str(stildatei_raw).strip(): + style_result = self.stil_pruefer.pruefe(str(stildatei_raw).strip()) + results.append(self.pruefmanager.verarbeite(style_result)) + if style_result.ok: + # Style-Pfad in der Datenkette beibehalten (absolut, wenn vorhanden). + stildatei = str(style_result.kontext or stildatei_raw).strip() + else: + stildatei = None + else: + results.append(self.pruefmanager.verarbeite(pruef_ergebnis(ok=True, meldung="Kein Stil angegeben", aktion="stil_optional", kontext=None))) + stildatei = None + + if not ident or not link_raw or not provider: + pe = pruef_ergebnis( + ok=False, + meldung="Linklisten-Zeile hat fehlende Pflichtfelder (ident/link/provider).", + aktion="pflichtfelder_fehlen", + kontext={"zeile": row_index, "daten": raw_row}, + ) + results.append(self.pruefmanager.verarbeite(pe)) + continue + + link_url = extract_url(link_raw, provider) + + # Provider-abhängige Linkvalidierung + if provider in ("wfs", "wms", "rest"): + # Webdienste: wir akzeptieren die URL-Form und prüfen nicht per network_head. + link_result = pruef_ergebnis(ok=True, meldung="Service-Link angenommen", aktion="service_link", kontext=link_url) + elif provider in ("ogr", "gpkg", "shp", "geojson"): + # OGR/Pfad: mit Linkpruefer (pfad oder lokale Datei) prüfen + link_result = self.link_pruefer.pruefe(link_url) + else: + link_result = self.link_pruefer.pruefe(link_url) + + results.append(self.pruefmanager.verarbeite(link_result)) + + # stildatei wurde bereits oben geprüft und ggf. auf Dateiname gesetzt oder auf None + + if not link_result.ok: + self.pruefmanager.verarbeite( + pruef_ergebnis( + ok=False, + meldung=f"Zeile {row_index}: fehlerhafter Link", + aktion="link_unvollstaendig", + kontext={"row": row_index, "ident": ident}, + ) + ) + continue + + result_row = { + "ident": ident, + "gruppe": normalized.get("gruppe"), + "Kartenebene": normalized.get("kartenebene"), + "Inhalt": normalized.get("inhalt"), + "Link": link_url, + "Provider": provider, + "stildatei": stildatei, + } + source_dict["rows"].append(result_row) + + # Validierung über Listenauswerter + listenauswerter = Listenauswerter(self.pruefmanager, self.stil_pruefer or Stilpruefer()) + validated, validation_results = listenauswerter.validate_rows(source_dict) + results.extend(validation_results) + return validated, results # ------------------------------------------------------------------ # Datenbank‑Quellen @@ -125,6 +298,7 @@ class DataGrabber: def _process_database_source( self, db_path: str ) -> Tuple[SourceDict, List[pruef_ergebnis]]: + print(f"[DataGrabber] Debug: _process_database_source called, db_path={db_path}") source_dict: SourceDict = {} results: List[pruef_ergebnis] = [] return source_dict, results @@ -149,24 +323,29 @@ class DataGrabber: partial_results: List[pruef_ergebnis], ) -> pruef_ergebnis: """ - Aggregiert Einzelprüfungen zu einem Gesamt‑``pruef_ergebnis``. + Aggregiert Einzelprüfungen zu einem Gesamt-``pruef_ergebnis``. - **Keine UI‑Interaktion.** + **Keine UI-Interaktion.** """ - if source_dict: + rows = source_dict.get("rows") if isinstance(source_dict, dict) else None + if rows: return pruef_ergebnis( ok=True, meldung="Quelle erfolgreich geprüft", aktion="ok", kontext={ "source": source, - "valid_entries": sum(len(v) for v in source_dict.values()), + "valid_entries": len(rows), }, ) + # Wenn die Linkliste zwar gelesen wurde, aber keine gültigen Zeilen verfügbar sind, geben wir spezifischere Infos zurück. return pruef_ergebnis( ok=False, - meldung="Keine gültigen Einträge in der Quelle gefunden", - aktion="read_error", - kontext={"source": source}, + meldung="Keine validen Einträge in der Linkliste gefunden", + aktion="keine_validen_eintraege", + kontext={ + "source": source, + "eintraege_gesamt": len(source_dict.get("rows", [])), + }, ) diff --git a/modules/Dateipruefer.py b/modules/Dateipruefer.py index 1ad1a44..552c558 100644 --- a/modules/Dateipruefer.py +++ b/modules/Dateipruefer.py @@ -6,11 +6,11 @@ der Anforderungen 1-2.e (leerer Pfad, fehlende Datei, bestehende Datei). """ from pathlib import Path -from typing import Optional +from typing import Optional, Literal from sn_basis.functions.sys_wrapper import join_path, file_exists from sn_basis.modules.pruef_ergebnis import pruef_ergebnis, PruefAktion - +DateiTyp = Literal["excel","datenbank","unbekannt"] class Dateipruefer: """ @@ -74,10 +74,25 @@ class Dateipruefer: # ------------------------------------------------------------------ # Hilfsfunktionen # ------------------------------------------------------------------ + def erkenne_dateityp(self, pfad: Path) -> DateiTyp: + """ + Erkennt den Dateityp anhand der Endung. + """ + suffix = pfad.suffix.lower() + + if suffix == ".xlsx": + return "excel" + + if suffix in (".gpkg", ".sqlite"): + return "datenbank" + + return "unbekannt" + def _pfad(self, relativer_pfad: str) -> Path: """Erzeugt OS-unabhängigen Pfad relativ zum Basisverzeichnis.""" return join_path(self.basis_pfad, relativer_pfad) + def _ist_leer(self) -> bool: """ Prüft robust, ob Eingabe als „leer" zu behandeln ist. @@ -134,6 +149,31 @@ class Dateipruefer: # 2. Pfad normalisieren pfad = self._pfad(self.pfad.strip()) + #Excel-dateien erkennen + dateityp = self.erkenne_dateityp(pfad) + + if dateityp == "excel": + if not file_exists(pfad): + return pruef_ergebnis( + ok=False, + meldung=f"Excel-Datei '{self.pfad}' wurde nicht gefunden.", + aktion="datei_nicht_gefunden", + kontext=pfad, + ) + + return pruef_ergebnis( + ok=True, + meldung="Excel-Datei ist gültig.", + aktion="ok", + kontext=pfad, + ) + if dateityp != "datenbank": + return pruef_ergebnis( + ok=False, + meldung=f"Der Pfad '{self.pfad}' ist kein unterstützter Dateityp.", + aktion="unbekannter_dateityp", + kontext=pfad, + ) # 🆕 2.c: Ungültiger GPKG-Pfad? if not self.verfahrens_db_modus or not self._ist_gueltiger_gpkg_pfad(pfad): diff --git a/modules/Datenabruf.py b/modules/Datenabruf.py index 7ff9036..45409d4 100644 --- a/modules/Datenabruf.py +++ b/modules/Datenabruf.py @@ -17,10 +17,11 @@ Designprinzipien - Die Methode ist pdoc-kompatibel dokumentiert und bewusst einfach gehalten. """ -from typing import Any, Dict, List, Mapping, Optional, Tuple +from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse import json +import time from sn_basis.modules.pruef_ergebnis import pruef_ergebnis from sn_basis.functions import qgiscore_wrapper as qgiscore @@ -59,6 +60,7 @@ class Datenabruf: verfahrensgebiet_layer: Any, speicherort: str, pruef_ergebnisse: Optional[List[Any]] = None, + progress: Optional[Any] = None, ) -> Tuple[Dict[str, Any], List[Any]]: """ Ruft für alle Zeilen in ``result_dict["rows"]`` die Fachdaten ab und @@ -82,6 +84,10 @@ class Datenabruf: # 1) Räumliche Filtergeometrie bestimmen (BBox oder None) bbox_geom = self._determine_spatial_filter(raumfilter, verfahrensgebiet_layer) + filter_crs_authid = None + if isinstance(bbox_geom, dict): + raw_crs = bbox_geom.get("crs_authid") + filter_crs_authid = str(raw_crs) if raw_crs else None # Globale Logs über alle Dienste hinweg log_geladen: Dict[str, int] = {} @@ -90,7 +96,20 @@ class Datenabruf: log_ausserhalb: Dict[str, int] = {} # 2) Über alle Zeilen iterieren - for row in rows: + total_rows = len(rows) + for idx, row in enumerate(rows, start=1): + if progress is not None: + progress.set_label(f"Datenabruf {idx}/{total_rows}…") + if progress.is_canceled(): + pe_cancel = pruef_ergebnis( + ok=False, + meldung="Datenabruf durch Benutzer abgebrochen", + aktion="abbruch", + kontext={"schritt": idx}, + ) + processed_results.append(self.pruefmanager.verarbeite(pe_cancel)) + break + ident = row.get("ident") link = row.get("Link") provider = row.get("Provider") @@ -115,7 +134,16 @@ class Datenabruf: url = self._build_provider_url(link=link, provider=str(provider), bbox_geom=bbox_geom if use_bbox else None) # 2b) Fachdaten abrufen - features, error_msg = self._fetch_features(url=url, provider=str(provider)) + features, error_msg = self._fetch_features( + url=url, + provider=str(provider), + cancel_callback=(progress.is_canceled if progress is not None else None), + ) + + + if progress is not None: + if hasattr(progress, "set_value"): + progress.set_value(idx) # 2c) Logs und Aggregation if error_msg: @@ -207,7 +235,18 @@ class Datenabruf: return None if raumfilter == "Verfahrensgebiet": - return qgiscore.get_layer_extent(verfahrensgebiet_layer) + extent = qgiscore.get_layer_extent(verfahrensgebiet_layer) + if extent is None: + return None + crs_authid = None + try: + if hasattr(verfahrensgebiet_layer, "crs") and callable(getattr(verfahrensgebiet_layer, "crs")): + crs = verfahrensgebiet_layer.crs() + if crs is not None and hasattr(crs, "authid") and callable(getattr(crs, "authid")): + crs_authid = crs.authid() + except Exception: + crs_authid = None + return {"extent": extent, "crs_authid": crs_authid} if raumfilter == "Pufferlayer": buffer_layer = qgiscore.create_buffer_layer( @@ -216,8 +255,18 @@ class Datenabruf: layer_name="Verfahrensgebiet_Puffer_1km", ) if buffer_layer is not None: - qgisui.add_layer_to_project(buffer_layer) - return qgiscore.get_layer_extent(buffer_layer) + extent = qgiscore.get_layer_extent(buffer_layer) + if extent is None: + return None + crs_authid = None + try: + if hasattr(buffer_layer, "crs") and callable(getattr(buffer_layer, "crs")): + crs = buffer_layer.crs() + if crs is not None and hasattr(crs, "authid") and callable(getattr(crs, "authid")): + crs_authid = crs.authid() + except Exception: + crs_authid = None + return {"extent": extent, "crs_authid": crs_authid} return None @@ -233,60 +282,130 @@ class Datenabruf: Erwartet: provider ist gesetzt (z. B. "WFS", "REST", "OGR", "WMS"). """ provider_norm = (provider or "").upper() - base_link = link or "" + base_link = (link or "").strip() + if base_link.lower().startswith("url="): + base_link = base_link[4:].strip() - # WMS: niemals BBOX anhängen + if provider_norm == "WFS" and base_link.count("?") > 1: + first, rest = base_link.split("?", 1) + base_link = f"{first}?{rest.replace('?', '&')}" + + extent_obj = bbox_geom + crs_authid: Optional[str] = None + if isinstance(bbox_geom, dict): + extent_obj = bbox_geom.get("extent") + raw_crs = bbox_geom.get("crs_authid") + crs_authid = str(raw_crs) if raw_crs else None + + # WMS: unverändert durchreichen if provider_norm == "WMS": return base_link - if bbox_geom is None: - return base_link - - # Versuche bbox-String zu erzeugen (nutzt qgiscore.extent_to_bbox_string wenn vorhanden) + # Versuche bbox-String zu erzeugen (falls Raumfilter aktiv) bbox_str: Optional[str] = None - try: - extent_to_bbox = getattr(__import__("sn_basis.functions.qgiscore_wrapper", fromlist=["qgiscore_wrapper"]), "extent_to_bbox_string", None) - if callable(extent_to_bbox): - bbox_str = extent_to_bbox(bbox_geom) - else: - # Fallback: einfache xmin/ymin/xmax/ymax-Extraktion (duck-typing) - if hasattr(bbox_geom, "xmin") and callable(getattr(bbox_geom, "xmin")): - bbox_str = f"{bbox_geom.xmin()},{bbox_geom.ymin()},{bbox_geom.xmax()},{bbox_geom.ymax()}" - elif isinstance(bbox_geom, (tuple, list)) and len(bbox_geom) == 4: - bbox_str = f"{bbox_geom[0]},{bbox_geom[1]},{bbox_geom[2]},{bbox_geom[3]}" + if extent_obj is not None: + try: + extent_to_bbox = getattr(__import__("sn_basis.functions.qgiscore_wrapper", fromlist=["qgiscore_wrapper"]), "extent_to_bbox_string", None) + if callable(extent_to_bbox): + bbox_str = extent_to_bbox(extent_obj) else: - bbox_str = str(bbox_geom) - except Exception: - bbox_str = None - - if not bbox_str: - return base_link + # Fallback: einfache xmin/ymin/xmax/ymax-Extraktion (duck-typing) + if hasattr(extent_obj, "xmin") and callable(getattr(extent_obj, "xmin")): + bbox_str = f"{extent_obj.xmin()},{extent_obj.ymin()},{extent_obj.xmax()},{extent_obj.ymax()}" + elif isinstance(extent_obj, (tuple, list)) and len(extent_obj) == 4: + bbox_str = f"{extent_obj[0]},{extent_obj[1]},{extent_obj[2]},{extent_obj[3]}" + else: + bbox_str = str(extent_obj) + except Exception: + bbox_str = None parsed = urlparse(base_link) query_params = dict(parse_qsl(parsed.query, keep_blank_values=True)) if provider_norm == "WFS": - query_params.setdefault("BBOX", bbox_str) + query_params.setdefault("service", "WFS") + query_params.setdefault("request", "GetFeature") + query_params.setdefault("outputFormat", "application/json") + if bbox_str: + query_params.setdefault("BBOX", bbox_str) + if crs_authid: + query_params.setdefault("SRSNAME", crs_authid) new_query = urlencode(query_params, doseq=True) rebuilt = parsed._replace(query=new_query) return urlunparse(rebuilt) if provider_norm in ("REST", "ARCGIS", "ARCGISFEATURESERVER", "ARCGIS_FEATURESERVER"): - query_params.setdefault("geometry", bbox_str) - query_params.setdefault("geometryType", "esriGeometryEnvelope") - query_params.setdefault("spatialRel", "esriSpatialRelIntersects") + # ArcGIS FeatureServer erwartet i.d.R. den /query-Endpunkt + rest_base = base_link.rstrip("/") + if not rest_base.lower().endswith("/query"): + rest_base = f"{rest_base}/query" + + parsed_rest = urlparse(rest_base) + query_params = dict(parse_qsl(parsed_rest.query, keep_blank_values=True)) + query_params.setdefault("where", "1=1") + query_params.setdefault("outFields", "*") + query_params.setdefault("returnGeometry", "true") query_params.setdefault("f", query_params.get("f", "json")) + + if bbox_str: + geometry_envelope = None + try: + if hasattr(extent_obj, "xmin") and callable(getattr(extent_obj, "xmin")): + geometry_envelope = { + "xmin": extent_obj.xmin(), + "ymin": extent_obj.ymin(), + "xmax": extent_obj.xmax(), + "ymax": extent_obj.ymax(), + } + elif isinstance(extent_obj, (tuple, list)) and len(extent_obj) == 4: + geometry_envelope = { + "xmin": extent_obj[0], + "ymin": extent_obj[1], + "xmax": extent_obj[2], + "ymax": extent_obj[3], + } + else: + parts = [p.strip() for p in str(bbox_str).split(",")] + if len(parts) == 4: + geometry_envelope = { + "xmin": float(parts[0]), + "ymin": float(parts[1]), + "xmax": float(parts[2]), + "ymax": float(parts[3]), + } + except Exception: + geometry_envelope = None + + if geometry_envelope is not None: + query_params.setdefault("geometry", json.dumps(geometry_envelope)) + else: + query_params.setdefault("geometry", bbox_str) + query_params.setdefault("geometryType", "esriGeometryEnvelope") + query_params.setdefault("spatialRel", "esriSpatialRelIntersects") + + if crs_authid and ":" in crs_authid: + srid = crs_authid.split(":", 1)[1] + if srid.isdigit(): + query_params.setdefault("inSR", srid) + query_params.setdefault("outSR", srid) + new_query = urlencode(query_params, doseq=True) - rebuilt = parsed._replace(query=new_query) + rebuilt = parsed_rest._replace(query=new_query) return urlunparse(rebuilt) - # Default: generischer bbox-Parameter - query_params.setdefault("bbox", bbox_str) + # Default: generischer bbox-Parameter (nur wenn vorhanden) + if bbox_str: + query_params.setdefault("bbox", bbox_str) new_query = urlencode(query_params, doseq=True) rebuilt = parsed._replace(query=new_query) return urlunparse(rebuilt) - def _fetch_features(self, url: str, provider: str) -> Tuple[List[Any], Optional[str]]: + def _fetch_features( + self, + url: str, + provider: str, + cancel_callback: Optional[Callable[[], bool]] = None, + ) -> Tuple[List[Any], Optional[str]]: """ Führt den eigentlichen Abruf der Fachdaten durch. @@ -336,34 +455,100 @@ class Datenabruf: http_error: Optional[str] = None # QGIS NetworkAccessManager bevorzugen + _FETCH_TIMEOUT_MS = 30_000 # 30 Sekunden + aborted_or_timed_out = False + attempted_qgis_fetch = False + + if callable(cancel_callback) and cancel_callback(): + return [], "Abbruch durch Benutzer" + if getattr(qgiscore, "QGIS_AVAILABLE", False) and getattr(qgiscore, "QgsNetworkAccessManager", None) is not None: + attempted_qgis_fetch = True try: manager = qgiscore.QgsNetworkAccessManager.instance() - QUrl = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QUrl", None) - QNetworkRequest = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QNetworkRequest", None) - QEventLoop = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QEventLoop", None) + # Netzwerk-Timeout global setzen (QGIS >= 3.6) + if hasattr(manager, "setTimeout"): + manager.setTimeout(_FETCH_TIMEOUT_MS) + _qt = __import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]) + QUrl = getattr(_qt, "QUrl", None) + QNetworkRequest = getattr(_qt, "QNetworkRequest", None) + QEventLoop = getattr(_qt, "QEventLoop", None) + QTimer = getattr(_qt, "QTimer", None) if QUrl is not None and QNetworkRequest is not None: req = QNetworkRequest(QUrl(url)) reply = manager.get(req) if QEventLoop is not None: loop = QEventLoop() reply.finished.connect(loop.quit) - loop.exec() - try: - raw = reply.readAll() - data_bytes = bytes(raw) if hasattr(raw, "__bytes__") else raw - response_text = data_bytes.decode("utf-8", errors="replace") - except Exception: + _poll_timer = None + if QTimer is not None: + try: + _poll_timer = QTimer() + _poll_timer.setSingleShot(False) + _poll_timer.timeout.connect(loop.quit) + _poll_timer.start(100) + except Exception: + _poll_timer = None + + start_time = time.monotonic() + while True: + if callable(cancel_callback) and cancel_callback(): + reply.abort() + http_error = "Abbruch durch Benutzer" + aborted_or_timed_out = True + break + + elapsed_ms = int((time.monotonic() - start_time) * 1000) + if elapsed_ms >= _FETCH_TIMEOUT_MS: + reply.abort() + http_error = f"Timeout nach {_FETCH_TIMEOUT_MS // 1000} s: {url}" + aborted_or_timed_out = True + break + + if hasattr(reply, "isFinished") and reply.isFinished(): + break + + loop.exec() + try: + if hasattr(qt, "QCoreApplication") and hasattr(qt.QCoreApplication, "processEvents"): + qt.QCoreApplication.processEvents() + except Exception: + pass + + if _poll_timer is not None: + try: + _poll_timer.stop() + except Exception: + pass + + if not aborted_or_timed_out: + # Fehler aus Reply auslesen + err_code = None + try: + err_code = reply.error() + except Exception: + pass + if err_code and int(err_code) != 0: + http_error = f"Netzwerkfehler ({err_code}): {reply.errorString()}" + if http_error: + # Timeout oder Netzwerkfehler – keinen Body lesen + pass + else: try: - response_text = reply.text() + raw = reply.readAll() + data_bytes = bytes(raw) if hasattr(raw, "__bytes__") else raw + response_text = data_bytes.decode("utf-8", errors="replace") except Exception: - response_text = None + try: + response_text = reply.text() + except Exception: + response_text = None except Exception as exc: http_error = f"QgsNetworkAccessManager error: {exc}" response_text = None - # Fallback: requests - if response_text is None: + # Fallback: requests nur wenn kein harter Abbruch/Timeout im QGIS-Request vorlag + if response_text is None and (not attempted_qgis_fetch or not aborted_or_timed_out): try: import requests # lokal import, keine harte Abhängigkeit r = requests.get(url, timeout=30) @@ -383,6 +568,8 @@ class Datenabruf: return parsed.get("features", []), None if isinstance(parsed, dict) and "features" in parsed: return parsed.get("features", []), None + if prov in ("REST", "ARCGIS", "ARCGISFEATURESERVER", "ARCGIS_FEATURESERVER", "WFS"): + return [], "Antwort enthält keine Feature-Liste" # Sonst: gib das gesamte JSON als einzelnes Objekt zurück return [parsed], None except json.JSONDecodeError: diff --git a/modules/Datenschreiber.py b/modules/Datenschreiber.py index 143e090..71e988b 100644 --- a/modules/Datenschreiber.py +++ b/modules/Datenschreiber.py @@ -30,9 +30,13 @@ from __future__ import annotations from typing import Any, Dict, List, Optional import os import json +import re import datetime +import sqlite3 from sn_basis.functions import qgiscore_wrapper as qgiscore +from sn_basis.functions.os_wrapper import normalize_path, is_absolute_path +from sn_basis.functions.sys_wrapper import get_plugin_root, join_path, file_exists from sn_basis.modules.pruef_ergebnis import pruef_ergebnis @@ -53,10 +57,97 @@ class Datenschreiber: def __init__(self, pruefmanager: Any, gpkg_path: Optional[str] = None) -> None: self.pruefmanager = pruefmanager - self.gpkg_path = gpkg_path + self.gpkg_path = str(gpkg_path) if gpkg_path else None - # ------------------------------------------------------------------ # - # Schreibe Daten + def _resolve_style_path(self, style_path: Optional[str]) -> Optional[str]: + if not style_path: + return None + + style_path_str = str(style_path).strip() + if not style_path_str: + return None + + if not is_absolute_path(style_path_str): + plugin_root = get_plugin_root() + style_path_str = str(join_path(plugin_root, "sn_plan41", "assets", style_path_str)) + + style_path_str = str(normalize_path(style_path_str)) + return style_path_str if file_exists(style_path_str) else None + + def _store_style_in_gpkg(self, layer_name: str, style_path: str, layer: Optional[Any] = None) -> None: + """Stellt sicher, dass der Stil in der layer_styles-Tabelle der GPKG gespeichert wird.""" + try: + with open(style_path, "r", encoding="utf-8") as fh: + style_qml = fh.read() + + f_geometry_column = '' + if layer is not None: + try: + if hasattr(layer, 'geometryColumn'): + f_geometry_column = str(layer.geometryColumn()) + elif hasattr(layer, 'dataProvider') and hasattr(layer.dataProvider(), 'geometryColumnName'): + f_geometry_column = str(layer.dataProvider().geometryColumnName()) + except Exception: + f_geometry_column = '' + + with sqlite3.connect(self.gpkg_path) as conn: + cur = conn.cursor() + cur.execute( + """ + CREATE TABLE IF NOT EXISTS layer_styles ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + f_table_catalog TEXT, + f_table_schema TEXT, + f_table_name TEXT NOT NULL, + f_geometry_column TEXT, + styleName TEXT, + styleQML TEXT, + styleSLD TEXT, + useAsDefault BOOLEAN, + description TEXT, + owner TEXT, + ui TEXT, + update_time DATETIME DEFAULT CURRENT_TIMESTAMP + ) + """ + ) + + # Das aktuelle QGIS-Style-Verhalten: bestehenden Style für denselben Layer nicht löschen (nur appenden) + # Wir wollen aber Default-Style setzen: alte Default-Styles entfernen. + cur.execute( + "UPDATE layer_styles SET useAsDefault = 0 WHERE f_table_name = ?", + (layer_name,), + ) + + # Fülle die bekannten QGIS-Kolonnen + style_name = os.path.basename(style_path) + + cur.execute( + "INSERT INTO layer_styles (f_table_catalog, f_table_schema, f_table_name, f_geometry_column, styleName, styleQML, styleSLD, useAsDefault, description, owner, ui) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ( + '', + '', + layer_name, + f_geometry_column, + style_name, + style_qml, + None, + 1, + '', + '', + '', + ), + ) + conn.commit() + except Exception as exc: + self.pruefmanager.verarbeite( + pruef_ergebnis( + ok=False, + meldung=f"Fehler beim Speichern des Layer-Stils in GPKG: {exc}", + aktion="style_gpkg_speichern_fehlgeschlagen", + kontext={"layer_name": layer_name, "style_path": style_path}, + ) + ) # ------------------------------------------------------------------ # def schreibe_Daten( self, @@ -65,192 +156,93 @@ class Datenschreiber: speicherort: str, ) -> List[Dict[str, Any]]: """ - Schreibt die abgerufenen Daten in die Zieldatenbank/Dateien. + Schreibt die übergebenen Layer in die Ziel-GPKG. - Ablauf - ------ - Für jede Zeile (ident) in ``daten_dict["daten"]``: - 1. Bestimme Ziel-Layername (z. B. Thema oder ident). - 2. Prüfe, ob ein Layer mit diesem Namen bereits existiert (Wrapper). - 3. Falls vorhanden, frage den Benutzer (Überschreiben / Anhängen / Abbrechen) - über die zentrale Pruefmanager-Methode `ask_overwrite_append_cancel`. - 4. Führe die gewählte Operation aus oder schreibe den Layer, wenn er noch nicht existiert. - 5. Schreibe ggf. den Stil in die GPKG und setze ihn als Vorgabe. - 6. Sammle und gib eine Liste der angelegten/geänderten Layer zurück. - - Returns - ------- - List[Dict[str, Any]] - Liste von Dicts mit Informationen zu jedem angelegten/geänderten Layer. + Erwartung: + - daten_dict["daten"] enthält Einträge der Form: + ident -> {"layer": QgsVectorLayer} + - self.gpkg_path ist ein str """ + if not speicherort: raise ValueError("Ein gültiger Speicherort (speicherort) muss übergeben werden.") - # Setze gpkg_path falls noch nicht vorhanden + # gpkg_path einmalig setzen / normalisieren if not self.gpkg_path: - self.gpkg_path = speicherort + self.gpkg_path = str(speicherort) results: List[Dict[str, Any]] = [] - daten_map: Dict[str, List[Any]] = daten_dict.get("daten", {}) + daten_map: Dict[str, Any] = daten_dict.get("daten", {}) - # Iteriere über alle Einträge - for ident, features in daten_map.items(): - # Thema/Name ableiten (falls vorhanden in processed_results oder ident) + for ident, entry in daten_map.items(): + layer = None + style_path = None + + # ----------------------------- + # Layer extrahieren + # ----------------------------- + if isinstance(entry, dict): + layer = entry.get("layer") + style_path = self._resolve_style_path(entry.get("style_path")) + + if layer is None or not hasattr(layer, "isValid") or not layer.isValid(): + pe_err = pruef_ergebnis( + ok=False, + meldung=f"Ungültiger Layer für {ident}", + aktion="save_exception", + kontext={"ident": ident}, + ) + self.pruefmanager.verarbeite(pe_err) + continue + + # ----------------------------- + # Layername bestimmen + # ----------------------------- thema = None for pe in processed_results: try: kontext = getattr(pe, "kontext", None) or {} - if kontext and kontext.get("ident") == ident: + if kontext.get("ident") == ident: thema = kontext.get("thema") break except Exception: continue - if not thema: - thema = str(ident) - layer_name = thema + layer_name_raw = thema or str(ident) + layer_name = re.sub(r"[^A-Za-z0-9_]+", "_", layer_name_raw).strip("_") + if not layer_name: + layer_name = f"layer_{ident}" - # Prüfe, ob Layer bereits existiert in der Ziel-GPKG - layer_exists = False - try: - layer_exists_fn = getattr(qgiscore, "layer_exists_in_gpkg", None) - if callable(layer_exists_fn): - layer_exists = layer_exists_fn(self.gpkg_path, layer_name) - else: - # Fallback: QGIS-Fallback-Check via QgsVectorLayer - if getattr(qgiscore, "QgsVectorLayer", None) is not None and qgiscore.QGIS_AVAILABLE: - uri = f"{self.gpkg_path}|layername={layer_name}" - layer = qgiscore.QgsVectorLayer(uri, layer_name, "ogr") - layer_exists = bool(layer and getattr(layer, "isValid", lambda: False)()) - except Exception: - layer_exists = False - - operation = "created" - - if layer_exists: - # Zentrale Nutzerabfrage über Pruefmanager - # Erwartet Rückgabe: "overwrite" | "append" | "cancel" - try: - user_choice = self.pruefmanager.ask_overwrite_append_cancel(layer_name) - except Exception: - # Fallback: overwrite, falls Pruefmanager nicht verfügbar - user_choice = "overwrite" - - if user_choice == "cancel": - operation = "skipped" - results.append({ - "ident": ident, - "thema": thema, - "operation": operation, - "layer_path": f"{self.gpkg_path}|layername={layer_name}", - "feature_count": 0, - }) - continue - - if user_choice == "overwrite": - write_err = self._write_layer_to_gpkg(layer_name, features, mode="overwrite") - if write_err: - pe_err = pruef_ergebnis( - ok=False, - meldung=f"Fehler beim Überschreiben von {layer_name}: {write_err}", - aktion="save_exception", - kontext={"ident": ident, "thema": thema, "error": write_err}, - ) - self.pruefmanager.verarbeite(pe_err) - operation = "skipped" - results.append({ - "ident": ident, - "thema": thema, - "operation": operation, - "layer_path": f"{self.gpkg_path}|layername={layer_name}", - "feature_count": 0, - }) - continue - else: - operation = "overwritten" - - elif user_choice == "append": - write_err = self._write_layer_to_gpkg(layer_name, features, mode="append") - if write_err: - pe_err = pruef_ergebnis( - ok=False, - meldung=f"Fehler beim Anhängen an {layer_name}: {write_err}", - aktion="save_exception", - kontext={"ident": ident, "thema": thema, "error": write_err}, - ) - self.pruefmanager.verarbeite(pe_err) - operation = "skipped" - results.append({ - "ident": ident, - "thema": thema, - "operation": operation, - "layer_path": f"{self.gpkg_path}|layername={layer_name}", - "feature_count": 0, - }) - continue - else: - operation = "appended" - - else: - # Layer existiert nicht -> neu anlegen - write_err = self._write_layer_to_gpkg(layer_name, features, mode="create") - if write_err: - pe_err = pruef_ergebnis( - ok=False, - meldung=f"Fehler beim Erstellen von {layer_name}: {write_err}", - aktion="save_exception", - kontext={"ident": ident, "thema": thema, "error": write_err}, - ) - self.pruefmanager.verarbeite(pe_err) - operation = "skipped" - results.append({ - "ident": ident, - "thema": thema, - "operation": operation, - "layer_path": f"{self.gpkg_path}|layername={layer_name}", - "feature_count": 0, - }) - continue - else: - operation = "created" - - # Stilbehandlung (falls in processed_results referenziert) - style_written = False - style_path = None - for pe in processed_results: - try: - kontext = getattr(pe, "kontext", None) or {} - if kontext and kontext.get("ident") == ident: - style_path = kontext.get("stildatei") or kontext.get("Stildatei") - break - except Exception: - continue + # Layer in GPKG schreiben + err_msg = self._write_layer_to_gpkg(layer_name=layer_name, layer=layer) + if err_msg is not None: + pe_err = pruef_ergebnis( + ok=False, + meldung=f"Fehler beim Schreiben des Layers {layer_name}: {err_msg}", + aktion="save_exception", + kontext={"ident": ident, "layer_name": layer_name}, + ) + self.pruefmanager.verarbeite(pe_err) + continue + # Wenn der Stil vorhanden und valide ist, als Default in GPKG-Style-Tabelle ablegen if style_path: - if not os.path.isabs(style_path): - base_dir = os.path.dirname(__file__) - style_path = os.path.join(base_dir, style_path) - write_style_fn = getattr(qgiscore, "write_style_to_gpkg", None) - if callable(write_style_fn): - try: - write_style_fn(self.gpkg_path, style_path, layer_name) - style_written = True - except Exception: - style_written = False - - feature_count = len(features) if isinstance(features, list) else 0 + self._store_style_in_gpkg(layer_name, style_path, layer) + # Erfolgsfall: Info für lade_Layer sammeln + layer_path = f"{self.gpkg_path}|layername={layer_name}" results.append({ + "layer_path": layer_path, + "thema": layer_name, "ident": ident, - "thema": thema, - "operation": operation, - "layer_path": f"{self.gpkg_path}|layername={layer_name}", - "feature_count": feature_count, - "style_written": style_written, + "style_path": style_path, }) return results + + # ----------------------------- + # ------------------------------------------------------------------ # # Lade Layer ins Projekt # ------------------------------------------------------------------ # @@ -288,18 +280,33 @@ class Datenschreiber: self.pruefmanager.verarbeite(pe_err) continue - try: - apply_style_fn = getattr(qgiscore, "apply_default_style_from_gpkg", None) - if callable(apply_style_fn): - apply_style_fn(self.gpkg_path, layer) - except Exception: - pe_warn = pruef_ergebnis( - ok=True, - meldung=f"Style konnte für {thema} nicht automatisch angewendet werden", - aktion="stil_not_implemented", - kontext={"thema": thema}, - ) - self.pruefmanager.verarbeite(pe_warn) + style_path = info.get("style_path") + resolved_style_path = self._resolve_style_path(style_path) + if resolved_style_path: + try: + layer.loadNamedStyle(resolved_style_path) + layer.triggerRepaint() + except Exception as exc: + pe_warn = pruef_ergebnis( + ok=True, + meldung=f"Style konnte für {thema} nicht geladen werden: {exc}", + aktion="stil_laden_fehlgeschlagen", + kontext={"thema": thema, "style_path": resolved_style_path}, + ) + self.pruefmanager.verarbeite(pe_warn) + else: + try: + apply_style_fn = getattr(qgiscore, "apply_default_style_from_gpkg", None) + if callable(apply_style_fn): + apply_style_fn(self.gpkg_path, layer) + except Exception: + pe_warn = pruef_ergebnis( + ok=True, + meldung=f"Style konnte für {thema} nicht automatisch angewendet werden", + aktion="stil_not_implemented", + kontext={"thema": thema}, + ) + self.pruefmanager.verarbeite(pe_warn) try: # qgisui wrapper wird hier nicht direkt für die Abfrage verwendet; @@ -374,62 +381,67 @@ class Datenschreiber: # ------------------------------------------------------------------ # # Hilfsfunktionen intern # ------------------------------------------------------------------ # - def _write_layer_to_gpkg(self, layer_name: str, features: List[Any], mode: str = "create") -> Optional[str]: + def _write_layer_to_gpkg( + self, + layer_name: str, + layer: Any, + ) -> Optional[str]: """ - Interne Hilfsfunktion zum Schreiben eines Layers in das GPKG. + Schreibt einen QgsVectorLayer in die Ziel-GPKG. - Erwartete qgiscore-Funktion: - qgiscore.write_features_to_gpkg(gpkg_path, layer_name, features, mode) + Voraussetzungen: + - self.gpkg_path ist ein str + - layer ist ein gültiger QgsVectorLayer """ - write_fn = getattr(qgiscore, "write_features_to_gpkg", None) - if callable(write_fn): - try: - write_fn(self.gpkg_path, layer_name, features, mode) - return None - except Exception as exc: - return str(exc) - # Fallback: Verwende QgsVectorFileWriter, falls QGIS verfügbar - if getattr(qgiscore, "QGIS_AVAILABLE", False) and getattr(qgiscore, "QgsVectorFileWriter", None) is not None: - try: - # Minimaler Fallback: erwarte, dass 'features' eine Liste von QgsFeature ist - if not features: - # Erstelle leeren Layer-Eintrag (GPKG erlaubt leere Layer) - # Hier vereinfachen wir: writeAsVectorFormatV3 benötigt ein Layer-Objekt. - return None + if layer is None or not hasattr(layer, "isValid") or not layer.isValid(): + return "Ungültiger Layer zum Schreiben übergeben" - # Versuche, ein Memory-Layer aus dem ersten Feature zu ermitteln - first = features[0] - mem_layer = None - if hasattr(first, "fields") and hasattr(first, "geometry"): - # Wenn Features QgsFeature sind, versuchen wir, das zugehörige Layer zu nutzen - try: - mem_layer = first.layer() if hasattr(first, "layer") else None - except Exception: - mem_layer = None + try: + opts = qgiscore.QgsVectorFileWriter.SaveVectorOptions() + opts.driverName = "GPKG" + opts.layerName = layer_name + opts.fileEncoding = "UTF-8" - if mem_layer is None: - return "Keine Feld-/Geometrie-Informationen zum Schreiben vorhanden" + # Style in der GPKG speichern, wenn möglich + if hasattr(opts, "symbologyExport"): + try: + # QGIS: SymbologyExport-Wert z.B. QgsVectorFileWriter.SaveVectorOptions.Symbology + saveOpts = qgiscore.QgsVectorFileWriter.SaveVectorOptions + sym_val = getattr(saveOpts, "Symbology", None) + if sym_val is None: + sym_val = getattr(saveOpts, "SymbologyExport", None) + if sym_val is not None: + opts.symbologyExport = sym_val + except Exception: + pass - opts = qgiscore.QgsVectorFileWriter.SaveVectorOptions() - opts.driverName = "GPKG" - opts.layerName = layer_name - opts.fileEncoding = "UTF-8" - if mode == "overwrite": - opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteFile - else: - opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteLayer + # Datei existiert → Layer überschreiben + # Datei existiert nicht → neue GPKG anlegen + if not os.path.exists(self.gpkg_path): + opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteFile + else: + opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteLayer - err = qgiscore.QgsVectorFileWriter.writeAsVectorFormatV3( - mem_layer, - self.gpkg_path, - qgiscore.QgsProject.instance().transformContext(), - opts - ) - if err != qgiscore.QgsVectorFileWriter.NoError: - return f"Fehler beim Schreiben (Code {err})" - return None - except Exception as exc: - return str(exc) + err = qgiscore.QgsVectorFileWriter.writeAsVectorFormatV3( + layer, + self.gpkg_path, + qgiscore.QgsProject.instance().transformContext(), + opts, + ) - return "Keine Schreib-Funktion verfügbar (Wrapper nicht implementiert)" + # QGIS ≥3 liefert ein Tupel: (error_code, error_message, new_filename, new_layer_name) + if isinstance(err, tuple): + error_code = err[0] + error_msg = err[1] if len(err) > 1 else "" + else: + error_code = err + error_msg = "" + + if error_code != qgiscore.QgsVectorFileWriter.NoError: + return f"Fehler beim Schreiben (Code {error_code}, msg='{error_msg}')" + + return None + + except Exception as exc: + return str(exc) diff --git a/modules/LayerLoader.py b/modules/LayerLoader.py new file mode 100644 index 0000000..f609fa3 --- /dev/null +++ b/modules/LayerLoader.py @@ -0,0 +1,395 @@ +"""sn_basis/modules/LayerLoader.py + +Kapselt Layer-Erstellung, Raumfilter und Stil-Logik. +""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional +import time + +from sn_basis.functions.os_wrapper import normalize_path, is_absolute_path +from sn_basis.functions.qgiscore_wrapper import ( + QgsVectorLayer, + QgsRasterLayer, + QgsFeatureRequest, + QgsProject, + QgsNetworkAccessManager, + QgsCoordinateTransform, +) +from sn_basis.functions.sys_wrapper import get_plugin_root, join_path, file_exists +from sn_basis.modules.stilpruefer import Stilpruefer +from sn_basis.modules.layerpruefer import Layerpruefer +from sn_basis.modules.pruef_ergebnis import pruef_ergebnis +from sn_basis.functions import qt_wrapper as qt + + +class LayerLoader: + """Lädt und filtert Layer aus Dienst-/Datenquellen.""" + + def __init__( + self, + pruefmanager: Any, + stil_pruefer: Optional[Stilpruefer] = None, + layer_pruefer: Optional[Layerpruefer] = None, + ) -> None: + self.pruefmanager = pruefmanager + self.stil_pruefer = stil_pruefer or Stilpruefer() + self.layer_pruefer = layer_pruefer or Layerpruefer() + + _LAYER_TIMEOUT_MS = 30_000 # 30 Sekunden + + def _was_canceled(self, cancel_callback: Optional[Any]) -> bool: + if not callable(cancel_callback): + return False + try: + return bool(cancel_callback()) + except Exception: + return False + + def _process_events(self) -> None: + try: + if hasattr(qt, "QCoreApplication") and hasattr(qt.QCoreApplication, "processEvents"): + qt.QCoreApplication.processEvents() + except Exception: + pass + + def _transform_geometry_to_layer_crs(self, geometry: Any, source_layer: Any, target_layer: Any) -> Any: + if geometry is None or source_layer is None or target_layer is None: + return geometry + + if QgsCoordinateTransform is None or QgsProject is None: + return geometry + + try: + source_crs = source_layer.crs() if hasattr(source_layer, "crs") else None + target_crs = target_layer.crs() if hasattr(target_layer, "crs") else None + if source_crs is None or target_crs is None: + return geometry + + source_authid = source_crs.authid() if hasattr(source_crs, "authid") else None + target_authid = target_crs.authid() if hasattr(target_crs, "authid") else None + if source_authid and target_authid and source_authid == target_authid: + return geometry + + ct = QgsCoordinateTransform(source_crs, target_crs, QgsProject.instance()) + if hasattr(geometry, "clone") and callable(getattr(geometry, "clone")): + geom_copy = geometry.clone() + else: + geom_copy = geometry + geom_copy.transform(ct) + return geom_copy + except Exception: + return geometry + + def _transform_extent_to_layer_crs(self, extent: Any, source_layer: Any, target_layer: Any) -> Any: + if extent is None or source_layer is None or target_layer is None: + return extent + + if QgsCoordinateTransform is None or QgsProject is None: + return extent + + try: + source_crs = source_layer.crs() if hasattr(source_layer, "crs") else None + target_crs = target_layer.crs() if hasattr(target_layer, "crs") else None + if source_crs is None or target_crs is None: + return extent + + source_authid = source_crs.authid() if hasattr(source_crs, "authid") else None + target_authid = target_crs.authid() if hasattr(target_crs, "authid") else None + if source_authid and target_authid and source_authid == target_authid: + return extent + + ct = QgsCoordinateTransform(source_crs, target_crs, QgsProject.instance()) + if hasattr(ct, "transformBoundingBox"): + return ct.transformBoundingBox(extent) + return extent + except Exception: + return extent + + def create_layer(self, provider: str, link: str, thema: str) -> Optional[QgsVectorLayer]: + provider_lower = provider.lower() if provider else "" + layer = None + + # Netzwerk-Timeout für alle netzwerkbasierten Provider setzen + if provider_lower in ("wfs", "wms", "rest"): + try: + nam = QgsNetworkAccessManager.instance() + if hasattr(nam, "setTimeout"): + nam.setTimeout(self._LAYER_TIMEOUT_MS) + except Exception: + pass + + try: + if provider_lower == "wfs": + uri = link if link.strip().lower().startswith("url=") else f"url={link}" + layer = QgsVectorLayer(uri, thema, "WFS") + elif provider_lower == "wms": + uri = link if link.strip().lower().startswith("url=") else f"url={link}" + layer = QgsRasterLayer(uri, thema, "wms") + elif provider_lower in ("ogr", "gpkg", "shp", "geojson"): + layer = QgsVectorLayer(link, thema, "ogr") + elif provider_lower == "rest": + rest_link = link.strip() + if rest_link.lower().endswith("/featureserver"): + rest_link = rest_link.rstrip("/") + "/0" + uri = rest_link if rest_link.lower().startswith("url=") else f"url={rest_link}" + layer = QgsVectorLayer(uri, thema, "arcgisfeatureserver") + else: + layer = QgsVectorLayer(link, thema, "ogr") + except Exception as exc: + self.pruefmanager.verarbeite( + pruef_ergebnis( + ok=False, + meldung=f"Fehler beim Erstellen des Layers {thema}: {exc}", + aktion="layer_nicht_verfuegbar", + kontext={"provider": provider, "link": link}, + ) + ) + return None + + if not layer or not layer.isValid(): + self.pruefmanager.verarbeite( + pruef_ergebnis( + ok=False, + meldung=f"Layer {thema} (Provider={provider}) konnte nicht geladen werden." + ,aktion="layer_nicht_verfuegbar", + kontext={"provider": provider, "link": link}, + ) + ) + return None + + return layer + + def apply_style(self, layer: QgsVectorLayer, style_path: Optional[str]) -> None: + if not style_path or layer is None or not layer.isValid(): + return + + if not style_path.strip(): + return + + if not is_absolute_path(style_path): + plugin_root = get_plugin_root() + style_path = str(join_path(plugin_root, "sn_plan41", "assets", style_path)) + + # normalize path for consistency + style_path = str(normalize_path(style_path)) + + # Debug: welche Stil-Datei wird geprüft? + print(f"[LayerLoader] Überprüfe Stildatei: '{style_path}'") + + if file_exists(style_path): + try: + layer.loadNamedStyle(style_path) + layer.triggerRepaint() + except Exception as exc: + self.pruefmanager.verarbeite( + pruef_ergebnis( + ok=False, + meldung=f"Fehler beim Stil-Laden für {layer.name()}: {exc}", + aktion="stil_laden_fehlgeschlagen", + kontext={"thema": layer.name(), "style_path": style_path}, + ) + ) + else: + self.pruefmanager.verarbeite( + pruef_ergebnis( + ok=True, + meldung=f"Stildatei nicht gefunden (optional): {style_path}", + aktion="stil_nicht_gefunden", + kontext={"thema": layer.name(), "style_path": style_path}, + ) + ) + + def filter_by_extent(self, layer: QgsVectorLayer, extent, cancel_callback: Optional[Any] = None, source_layer: Optional[Any] = None) -> Optional[QgsVectorLayer]: + """Beschneidet auf die rechteckige Ausdehnung . + + Diese Methode verwendet einen einfachen BBOX-Filter. Für komplexere + Raumeinschränkungen (z.B. Verfahrensgebiet) sollte stattdessen + :meth:`filter_by_layer` verwendet werden, da dort echte Geometrie-Tests + stattfinden. + """ + if not layer or not layer.isValid() or extent is None: + return layer + + if layer.type() != QgsVectorLayer.VectorLayer: + return layer + + extent_for_layer = self._transform_extent_to_layer_crs(extent, source_layer, layer) + request = QgsFeatureRequest().setFilterRect(extent_for_layer) + if hasattr(request, "setTimeout"): + try: + request.setTimeout(self._LAYER_TIMEOUT_MS) + except Exception: + pass + + start = time.monotonic() + features: List[Any] = [] + try: + for feat in layer.getFeatures(request): + if self._was_canceled(cancel_callback): + self.pruefmanager.verarbeite( + pruef_ergebnis( + ok=False, + meldung=f"Abbruch beim Raumfilter (BBOX) für {layer.name()}", + aktion="needs_user_action", + kontext={"thema": layer.name()}, + ) + ) + return None + + elapsed_ms = int((time.monotonic() - start) * 1000) + if elapsed_ms >= self._LAYER_TIMEOUT_MS: + self.pruefmanager.verarbeite( + pruef_ergebnis( + ok=False, + meldung=f"Timeout beim Raumfilter (BBOX) für {layer.name()} nach {self._LAYER_TIMEOUT_MS // 1000}s", + aktion="url_nicht_erreichbar", + kontext={"thema": layer.name(), "timeout_s": self._LAYER_TIMEOUT_MS // 1000}, + ) + ) + return None + + features.append(feat) + if len(features) % 100 == 0: + self._process_events() + except Exception as exc: + self.pruefmanager.verarbeite( + pruef_ergebnis( + ok=False, + meldung=f"Fehler beim Lesen der Features für {layer.name()}: {exc}", + aktion="layer_nicht_verfuegbar", + kontext={"thema": layer.name()}, + ) + ) + return None + + if not features: + return None + + geom_type_map = {0: "Point", 1: "LineString", 2: "Polygon"} + geom_type = geom_type_map.get(layer.geometryType(), "Polygon") + uri = f"{geom_type}?crs={layer.crs().authid()}" + filtered_layer = QgsVectorLayer(uri, f"{layer.name()}_bbox", "memory") + if not filtered_layer or not filtered_layer.isValid(): + self.pruefmanager.verarbeite( + pruef_ergebnis( + ok=False, + meldung=f"Fehler beim Erzeugen des Filter-Layers für {layer.name()}", + aktion="filterlayer_nicht_erzeugt", + kontext={"thema": layer.name()}, + ) + ) + return None + + provider = filtered_layer.dataProvider() + provider.addAttributes(layer.fields()) + filtered_layer.updateFields() + provider.addFeatures(features) + filtered_layer.updateExtents() + + return filtered_layer + + def filter_by_layer(self, layer: QgsVectorLayer, filter_layer: QgsVectorLayer, cancel_callback: Optional[Any] = None) -> Optional[QgsVectorLayer]: + """Beschneidet auf die tatsächliche Geometrie des + . + + Diese Methode wird z.B. für das Verfahrensgebiet verwendet, damit nicht + die gesamte Bounding-Box, sondern nur die echten Flächen als Raumfilter + gelten. Wenn der Filter-Layer mehrere Features enthält, werden deren + Geometrien zu einem Multi-Geom vereinigt. + """ + if not layer or not layer.isValid() or not filter_layer or not filter_layer.isValid(): + return layer + + if layer.type() != QgsVectorLayer.VectorLayer: + return layer + + # vereinigte Geometrie aller Features im Filter-Layer + union_geom = None + for f in filter_layer.getFeatures(): + try: + geom = self._transform_geometry_to_layer_crs(f.geometry(), filter_layer, layer) + if union_geom is None: + union_geom = geom + else: + union_geom = union_geom.combine(geom) + except Exception: + # bei einem Fehler einfach weiterfahren + continue + + if union_geom is None or union_geom.isEmpty(): + return None + + # nun alle Features aus nehmen, deren Geometrie sich schneidet + filtered = [] + request = QgsFeatureRequest().setFilterRect(union_geom.boundingBox()) + if hasattr(request, "setTimeout"): + try: + request.setTimeout(self._LAYER_TIMEOUT_MS) + except Exception: + pass + + start = time.monotonic() + for f in layer.getFeatures(request): + if self._was_canceled(cancel_callback): + self.pruefmanager.verarbeite( + pruef_ergebnis( + ok=False, + meldung=f"Abbruch beim Raumfilter (Geometrie) für {layer.name()}", + aktion="needs_user_action", + kontext={"thema": layer.name()}, + ) + ) + return None + + elapsed_ms = int((time.monotonic() - start) * 1000) + if elapsed_ms >= self._LAYER_TIMEOUT_MS: + self.pruefmanager.verarbeite( + pruef_ergebnis( + ok=False, + meldung=f"Timeout beim Raumfilter (Geometrie) für {layer.name()} nach {self._LAYER_TIMEOUT_MS // 1000}s", + aktion="url_nicht_erreichbar", + kontext={"thema": layer.name(), "timeout_s": self._LAYER_TIMEOUT_MS // 1000}, + ) + ) + return None + + try: + if f.geometry() and f.geometry().intersects(union_geom): + filtered.append(f) + except Exception: + continue + + if len(filtered) % 100 == 0: + self._process_events() + + if not filtered: + return None + + geom_type_map = {0: "Point", 1: "LineString", 2: "Polygon"} + geom_type = geom_type_map.get(layer.geometryType(), "Polygon") + uri = f"{geom_type}?crs={layer.crs().authid()}" + filtered_layer = QgsVectorLayer(uri, f"{layer.name()}_filtered", "memory") + if not filtered_layer or not filtered_layer.isValid(): + self.pruefmanager.verarbeite( + pruef_ergebnis( + ok=False, + meldung=f"Fehler beim Erzeugen des Filter-Layers für {layer.name()}", + aktion="filterlayer_nicht_erzeugt", + kontext={"thema": layer.name()}, + ) + ) + return None + + provider = filtered_layer.dataProvider() + provider.addAttributes(layer.fields()) + filtered_layer.updateFields() + provider.addFeatures(filtered) + filtered_layer.updateExtents() + + return filtered_layer + + def add_to_project(self, layer: QgsVectorLayer) -> None: + if layer and layer.isValid(): + QgsProject.instance().addMapLayer(layer) diff --git a/modules/Pruefmanager.py b/modules/Pruefmanager.py index 3ea40ec..aa979d4 100644 --- a/modules/Pruefmanager.py +++ b/modules/Pruefmanager.py @@ -5,7 +5,7 @@ sn_basis/modules/Pruefmanager.py from __future__ import annotations from typing import Optional, Any -from sn_basis.functions import ask_yes_no, info, warning, error +from sn_basis.functions import ask_yes_no, info, warning, error, ask_overwrite_append_cancel_custom from sn_basis.modules.pruef_ergebnis import pruef_ergebnis, PruefAktion print("DEBUG: Pruefmanager DATEI GELADEN:", __file__) @@ -60,6 +60,26 @@ class Pruefmanager: # VERFAHRENS-DB-spezifische Entscheidungen # ------------------------------------------------------------------ def _handle_datei_existiert(self, ergebnis: pruef_ergebnis) -> pruef_ergebnis: + """Handhabt das Szenario, dass die Ziel-Verfahrens-DB bereits existiert. + + Zeigt einen einzigen Dialog mit drei Optionen an: + - **Überschreiben**: Bestehende Layer ersetzen (entspricht YES) + - **Anhängen**: Neue Layer zur Datei hinzufügen (entspricht NO) + - **Abbrechen**: Vorgang beenden (entspricht CANCEL) + + Parameters + ---------- + ergebnis : pruef_ergebnis + Eingabe-Ergebnis mit Dateipfad im ``kontext``-Attribut. + + Returns + ------- + pruef_ergebnis + Ergebnis mit Aktion: + - ``datei_existiert_ueberschreiben`` + - ``datei_existiert_anhaengen`` + - ``datei_existiert_ueberspringen`` (für Cancel-Fall) + """ if self.ui_modus != "qgis": return ergebnis @@ -72,48 +92,34 @@ class Pruefmanager: "Was soll geschehen?\n\n" "• **Überschreiben**: Bestehende Layer ersetzen\n" "• **Anhängen**: Neue Layer hinzufügen\n" - "• **Überspringen**: Nur temporäre Layer erzeugen" + "• **Abbrechen**: Vorgang beenden" ) - # Vereinfacht: Erst Überschreiben? → Dann Anhängen? → Überspringen - if ask_yes_no( - titel, - f"{meldung}\n\n**Überschreiben** (alle Layer ersetzen)?", - default=False, - parent=self.parent - ): + # Einzelner Dialog mit drei Optionen + entscheidung = ask_overwrite_append_cancel_custom( + parent=self.parent, + title=titel, + message=meldung + ) + + if entscheidung == "overwrite": return pruef_ergebnis( ok=True, aktion="datei_existiert_ueberschreiben", kontext=ergebnis.kontext, ) - - if ask_yes_no( - titel, - f"{meldung}\n\n**Anhängen** (neue Layer hinzufügen)?", - default=False, - parent=self.parent - ): + elif entscheidung == "append": return pruef_ergebnis( ok=True, aktion="datei_existiert_anhaengen", kontext=ergebnis.kontext, ) - - if ask_yes_no( - titel, - f"{meldung}\n\n**Überspringen** (nur temporäre Layer)?", - default=True, - parent=self.parent - ): + else: # cancel return pruef_ergebnis( ok=True, aktion="datei_existiert_ueberspringen", kontext=ergebnis.kontext, ) - - return ergebnis - # ------------------------------------------------------------------ # Basis-Entscheidungen (KORREKT: → pruef_ergebnis) # ------------------------------------------------------------------ @@ -210,3 +216,26 @@ class Pruefmanager: ) print("🔥 verarbeite() ENDE mit ok=False") return ergebnis + + def _ask_use_or_replace_pufferlayer(self) -> str: + """ + Fragt den Nutzer, ob ein vorhandener Pufferlayer verwendet + oder ersetzt werden soll. + + Returns + ------- + str + "verwenden", "ersetzen" oder "abbrechen" + """ + ergebnis = pruef_ergebnis( + ok=False, + aktion="layer_existiert", + meldung="Ein Pufferlayer ist bereits vorhanden.", + ) + + ergebnis = self.pruefmanager.verarbeite(ergebnis) + + if not ergebnis.ok: + return "abbrechen" + + return "verwenden" if ergebnis.aktion == "ok" else "ersetzen" diff --git a/modules/linkpruefer.py b/modules/linkpruefer.py index a94e863..1033630 100644 --- a/modules/linkpruefer.py +++ b/modules/linkpruefer.py @@ -61,7 +61,8 @@ class Linkpruefer: aktion="leer", kontext=None, ) - + #evtl. Pfad-Objekte in string umwandeln + eingabe = str(eingabe) # ----------------------------------------------------- # 1. Fall: URL # ----------------------------------------------------- diff --git a/modules/pruef_ergebnis.py b/modules/pruef_ergebnis.py index 78eb423..6849351 100644 --- a/modules/pruef_ergebnis.py +++ b/modules/pruef_ergebnis.py @@ -43,6 +43,13 @@ PruefAktion = Literal[ # Dateiendung/Format "falsche_endung", "pflichtfelder_fehlen", + "unbekannter_dateityp", + "Datenbank", + "dienst", + "excel", + "unbekannte_quelle", + + # Excel/Import "kein_header", @@ -50,6 +57,8 @@ PruefAktion = Literal[ "read_error", "open_error", "datenabruf", + + # 🆕 VERFAHRENS-DB SPEZIFISCH (deine Anforderungen 2.d, 2.e) "datei_wird_erzeugt", # 2.d: Pfad gültig, Datei fehlt → weiter diff --git a/modules/stilpruefer.py b/modules/stilpruefer.py index db9312a..f8b743d 100644 --- a/modules/stilpruefer.py +++ b/modules/stilpruefer.py @@ -4,9 +4,10 @@ Prüft ausschließlich, ob ein Stilpfad gültig ist. Die Anwendung erfolgt später über eine Aktion. """ -from pathlib import Path +import os -from sn_basis.functions import file_exists +from sn_basis.functions.os_wrapper import is_absolute_path +from sn_basis.functions.sys_wrapper import get_plugin_root, file_exists, join_path from sn_basis.modules.pruef_ergebnis import pruef_ergebnis @@ -40,7 +41,11 @@ class Stilpruefer: kontext=None, ) - pfad = Path(stil_pfad) + pfad = str(stil_pfad) + + if not is_absolute_path(pfad): + plugin_root = get_plugin_root() + pfad = str(join_path(plugin_root, "sn_plan41", "assets", pfad)) # ----------------------------------------------------- # 2. Datei existiert nicht @@ -56,7 +61,7 @@ class Stilpruefer: # ----------------------------------------------------- # 3. Falsche Endung # ----------------------------------------------------- - if pfad.suffix.lower() != ".qml": + if os.path.splitext(pfad)[1].lower() != ".qml": return pruef_ergebnis( ok=False, meldung="Die Stil-Datei muss die Endung '.qml' haben.", diff --git a/plugin.info b/plugin.info new file mode 100644 index 0000000..4235a7f --- /dev/null +++ b/plugin.info @@ -0,0 +1,11 @@ +name=LNO Sachsen | Basisfunktionen +description=Plugin mit Basisfunktionen +author=Daniel Helbig +email=daniel.helbig@kreis-meissen.de +qgisMinimumVersion=3.0 +qgisMaximumVersion=4.99 +deprecated=False +experimental=true +supportsQt6=Yes + +zip_folder=sn_basis \ No newline at end of file