33 Commits

Author SHA1 Message Date
2ff465b86d Merge pull request 'feture/Druck_tab' (#11) from feture/Druck_tab into unstable
Reviewed-on: #11
2026-03-20 22:58:08 +01:00
f19fe71bfa Ergänzungen in de Wrappern/ Prüfmanager für Layouts 2026-03-20 14:01:57 +01:00
ae5f88c5b8 Dialoge für Vorlagen-Piepeline ergänzt 2026-03-20 12:42:21 +01:00
7cd6e3ef24 checkbox im qt_wrapper ergänzt 2026-03-20 12:01:16 +01:00
1be1420f66 changelog.txt aktualisiert 2026-03-19 06:44:35 +01:00
f25e30c489 Release 26.3.6-unstable 2026-03-19 05:32:02 +00:00
0eb32453d6 changelog.txt aktualisiert
All checks were successful
Release Plugin / release (push) Successful in 3s
2026-03-19 06:31:36 +01:00
841b529ad8 Release 26.3.5-unstable 2026-03-19 05:30:39 +00:00
ae5725cd03 Release 26.3.4-unstable
All checks were successful
Release Plugin / release (push) Successful in 3s
2026-03-19 05:23:20 +00:00
ac5a3993c8 Release 26.3.3-unstable
All checks were successful
Release Plugin / release (push) Successful in 7s
2026-03-18 14:34:41 +00:00
22b45fe19a Release 26.3.2-unstable
All checks were successful
Release Plugin / release (push) Successful in 3s
2026-03-18 14:18:14 +00:00
Michael Otto
24c2137dc2 Auf neuen Release Workflow umgestellt
All checks were successful
Release Plugin / release (push) Successful in 3s
2026-03-18 15:13:37 +01:00
Michael Otto
c0c0387b1d Änderungen an plugin.cfg
All checks were successful
Release Plugin / release (push) Successful in 7s
2026-03-13 13:58:33 +01:00
Michael Otto
663ca770a1 Schritt zu 'plugin.cfg einlesen' umbenannt und Schleife angepasst, um letzte Zeile ohne Newline zu lesen 2026-03-13 12:08:32 +01:00
Michael Otto
04319b6f7b plugin.info->plugin.cfg
Some checks failed
Release Plugin / release (push) Failing after 5s
2026-03-13 12:06:34 +01:00
Michael Otto
1c70d62739 plugin.info einlesen: echo eingefügt
Some checks failed
Release Plugin / release (push) Has been cancelled
2026-03-13 12:04:32 +01:00
Michael Otto
3971bd3408 plugin.info einlesen angepasst 2026-03-13 12:02:39 +01:00
Michael Otto
fa04fc80e3 ZIP Erstellung auf Original zurückgesetzt
Some checks failed
Release Plugin / release (push) Has been cancelled
2026-03-13 11:30:11 +01:00
Michael Otto
04bdfbe9d8 Fallback für ZIP_FOLDER hinzugefügt: Wenn leer, auf sn_basis setzen 2026-03-13 11:27:18 +01:00
Michael Otto
b6b791e5bd Behebung des Parsens von plugin.info: Robuste Schleife für Outputs
Some checks failed
Release Plugin / release (push) Failing after 5s
2026-03-13 11:24:36 +01:00
Michael Otto
82be564c29 Verbesserung des ZIP-Erstellungsprozesses: Debugging hinzugefügt und Warnungen bei /dev-Dateien durch Behandlung symbolischer Links behoben
Some checks failed
Release Plugin / release (push) Failing after 5s
2026-03-13 11:21:38 +01:00
Michael Otto
f42260b66c ZIP Erstellung geändert um im Runner genauer zu sehen wo es zu Problemen gekommen ist
Some checks failed
Release Plugin / release (push) Has been cancelled
2026-03-13 11:16:46 +01:00
Michael Otto
327c25388f metadata.txt gelöscht, changelog.txt eingefügt
Some checks failed
Release Plugin / release (push) Has been cancelled
2026-03-13 11:06:58 +01:00
Michael Otto
c6c9613120 Update plugin.info based on metadata.txt
Some checks failed
Release Plugin / release (push) Failing after 3s
2026-03-13 08:39:24 +01:00
6e1f4c615b Merge pull request 'daniel@feature/dataGrabber_anbinden' (#9) from Daniel/Plugin_SN_Basis:daniel@feature/dataGrabber_anbinden into unstable
Reviewed-on: #9
2026-03-13 06:51:56 +01:00
f876218134 plugin.info hinzugefügt 2026-03-13 06:45:55 +01:00
9829ac9c81 Diensteabruf integriert 2026-03-12 16:14:02 +01:00
ae956b0046 Überarbeitung für Pufferlayer-Fachdaten laden und gpkg-speichern/laden 2026-03-11 20:56:02 +01:00
0ec24029d8 fix QGIS 4.0-Kompatibilität;
Linkpruefer-Eingabe als String normalisiert, falls Paf-Objekte übergeben werden
2026-03-11 12:38:48 +01:00
26f426dfcd Merge pull request 'dev' (#8) from Daniel/Plugin_SN_Basis:dev into main, Basisfunktion für sn_Verfahrensgebiet
Reviewed-on: #8
2026-03-06 10:34:43 +01:00
5dc8412a6a Imports für sn_Verfahrensgebiet ergänzt, Stilprüfer wird jetzt auch für apply_style verwendet 2026-03-06 10:20:40 +01:00
Michael Otto
00f800b1e6 metadata.txt gelöscht, plugin.info eingefügt, workflow hinzugefügt 2026-03-05 16:02:03 +01:00
137baaf19c .gitea/workflows/release.yaml hinzugefügt
All checks were successful
Automatisches Release mit ZIP-Archiv / Build-Release (push) Successful in 3s
2026-03-01 13:36:27 +01:00
20 changed files with 1949 additions and 363 deletions

View File

@@ -0,0 +1,133 @@
name: Release Plugin
run-name: "Release | ${{ github.ref_name }}"
on:
push:
tags:
- 'v*'
jobs:
release:
runs-on: alpine-latest
defaults:
run:
shell: bash
steps:
- name: Notwendige Abhängigkeiten installieren
shell: sh
run: |
apk add --no-cache bash git jq curl
- name: Code holen
run: |
REPO_URL="https://${RELEASE_TOKEN}:x-oauth-basic@${{ vars.RELEASE_URL }}/${GITHUB_REPOSITORY}.git"
git clone "$REPO_URL" repo
cd repo
git checkout "$TAG"
env:
RELEASE_TOKEN: ${{ secrets.RELEASE_TOKEN }}
TAG: "${{ github.ref_name }}"
- name: Version und Kanal bestimmen
id: releaseinfo
run: |
TAG="${{ github.ref_name }}"
RAW_VERSION="${TAG#v}"
VERSION="${RAW_VERSION%%-*}"
# Channel und Suffix automatisch bestimmen anhand des Tag-Suffix
case "$RAW_VERSION" in
*-testing*|*-t|*-T)
CHANNEL="testing"
PRERELEASE="true"
SUFFIX="-testing"
;;
*-unstable*|*-u|*-U)
CHANNEL="unstable"
PRERELEASE="true"
SUFFIX="-unstable"
;;
*)
CHANNEL="stable"
PRERELEASE="false"
SUFFIX=""
;;
esac
# Output setzen
echo "version=$VERSION" >> $GITHUB_OUTPUT
echo "channel=$CHANNEL" >> $GITHUB_OUTPUT
echo "prerelease=$PRERELEASE" >> $GITHUB_OUTPUT
# Optional Debug
echo "VERSION=$VERSION"
echo "CHANNEL=$CHANNEL"
echo "PRERELEASE=$PRERELEASE"
# - name: plugin.cfg einlesen
# id: config
# run: |
# cd repo
# while read -r line || [ -n "$line" ]; do
# key="${line%%=*}"
# value="${line#*=}"
# echo "$key=$value" >> $GITHUB_OUTPUT
# done < plugin.cfg
- name: Payload erzeugen
id: payload
run: |
cd repo
NAME="${GITHUB_REPOSITORY##*/}"
GROUP="${GITHUB_REPOSITORY%%/*}"
VERSION="${{ steps.releaseinfo.outputs.version }}"
CHANNEL="${{ steps.releaseinfo.outputs.channel }}"
PRERELEASE="${{ steps.releaseinfo.outputs.prerelease }}"
ZIP_FOLDER="${{ vars.ZIP_FOLDER }}"
ZIP_FILE="${ZIP_FOLDER}.zip"
TAG="${{ github.ref_name }}"
#GIT_URL=${GITHUB_REPOSITORY}
jq -n \
--arg name "$NAME" \
--arg group "$GROUP" \
--arg version "$VERSION" \
--arg channel "$CHANNEL" \
--arg prerelease "$PRERELEASE" \
--arg zip_folder "$ZIP_FOLDER" \
--arg zip_file "$ZIP_FILE" \
--arg tag "$TAG" \
'{
name: $name,
group: $group,
version: $version,
channel: $channel,
prerelease: ($prerelease == "true"),
zip_folder: $zip_folder,
zip_file: $zip_file,
tag: $tag
}' > payload.json
cat payload.json
- name: Repository aktualisieren
run: |
NAME="${GITHUB_REPOSITORY##*/}"
TAG="${{ steps.releaseinfo.outputs.version }}"-"${{ steps.releaseinfo.outputs.channel }}"
PAYLOAD_B64=$(base64 -w0 repo/payload.json)
JSON="{\"ref\":\"hidden/workflows\",\"inputs\":{\"payload\":\"$PAYLOAD_B64\",\"name\":\"$NAME\",\"tag\":\"$TAG\"}}"
curl -X POST \
-H "Authorization: token ${{ secrets.RELEASE_TOKEN }}" \
-H "Content-Type: application/json" \
-d "$JSON" \
"https://${{ vars.RELEASE_URL }}/api/v1/repos/${OWNER}/Repository/actions/workflows/${WORKFLOW}/dispatches"
env:
RELEASE_TOKEN: ${{ secrets.RELEASE_TOKEN }}
OWNER: "AG_QGIS"
WORKFLOW: "release.yaml"

0
changelog.txt Normal file
View File

View File

@@ -15,7 +15,7 @@ from .ly_metadata_wrapper import (
is_layer_editable, is_layer_editable,
) )
from .ly_style_wrapper import apply_style from .ly_style_wrapper import apply_style
from .dialog_wrapper import ask_yes_no from .dialog_wrapper import ask_yes_no, ask_overwrite_append_cancel_custom
from .message_wrapper import ( from .message_wrapper import (
_get_message_bar, _get_message_bar,

View File

@@ -2,8 +2,10 @@
sn_basis/functions/dialog_wrapper.py Benutzer-Dialoge (Qt5/6/Mock-kompatibel) sn_basis/functions/dialog_wrapper.py Benutzer-Dialoge (Qt5/6/Mock-kompatibel)
""" """
from typing import Any from typing import Any
from typing import Literal, Optional
from sn_basis.functions.qt_wrapper import ( from sn_basis.functions.qt_wrapper import (
QMessageBox, YES, NO, QT_VERSION QMessageBox, YES, NO, CANCEL, QT_VERSION, exec_dialog, ICON_QUESTION,
QProgressDialog, QCoreApplication, Qt, QInputDialog, QLineEdit,
) )
def ask_yes_no( def ask_yes_no(
@@ -35,3 +37,190 @@ def ask_yes_no(
except Exception as e: except Exception as e:
print(f"⚠️ ask_yes_no Fehler: {e}") print(f"⚠️ ask_yes_no Fehler: {e}")
return default return default
def show_info_dialog(title: str, message: str, parent: Any = None) -> None:
"""
Zeigt einen modalen Info-Dialog mit OK-Button.
Blockiert bis der Nutzer bestätigt.
"""
try:
if QT_VERSION == 0: # Mock-Modus
print(f"Mock-Modus: show_info_dialog('{title}')")
return
QMessageBox.information(parent, title, message)
except Exception as e:
print(f"⚠️ show_info_dialog Fehler: {e}")
def ask_text(
title: str,
label: str,
default_text: str = "",
parent: Any = None,
) -> tuple[str, bool]:
"""Zeigt einen modalen Texteingabe-Dialog und gibt Text + OK-Status zurück."""
try:
if QT_VERSION == 0: # Mock-Modus
print(f"Mock-Modus: ask_text('{title}') -> '{default_text}'")
return default_text, True
# PyQt6: QLineEdit.EchoMode.Normal / PyQt5: QLineEdit.Normal
echo_mode = (
getattr(QLineEdit, "Normal", None)
or getattr(getattr(QLineEdit, "EchoMode", None), "Normal", None)
or 0
)
text, accepted = QInputDialog.getText(
parent,
title,
label,
echo_mode,
default_text,
)
return str(text or ""), bool(accepted)
except Exception as e:
print(f"⚠️ ask_text Fehler: {e}")
return default_text, False
OverwriteDecision = Optional[Literal["overwrite", "append", "cancel"]]
def ask_overwrite_append_cancel_custom(
parent,
title: str,
message: str,
) -> Literal["overwrite", "append", "cancel"]:
"""Zeigt Dialog mit benutzerdefinierten Buttons: Überschreiben/Anhängen/Abbrechen.
Parameters
----------
parent :
Eltern-Widget oder None.
title : str
Dialog-Titel.
message : str
Hauptmeldung mit Erklärung.
Returns
-------
Literal["overwrite", "append", "cancel"]
Genaue Entscheidung des Nutzers.
"""
msg = QMessageBox(parent)
msg.setIcon(ICON_QUESTION)
msg.setWindowTitle(title)
msg.setText(message)
# Eigene Buttons mit exakten Texten
overwrite_btn = msg.addButton("Überschreiben", QMessageBox.ButtonRole.AcceptRole)
append_btn = msg.addButton("Anhängen", QMessageBox.ButtonRole.ActionRole)
cancel_btn = msg.addButton("Abbrechen", QMessageBox.ButtonRole.RejectRole)
exec_dialog(msg)
clicked = msg.clickedButton()
if clicked == overwrite_btn:
return "overwrite"
elif clicked == append_btn:
return "append"
else: # cancel_btn
return "cancel"
class ProgressDialog:
def __init__(self, total: int, title: str = "Fortschritt", label: str = "Verarbeite..."):
self.total = max(total, 1)
self._canceled = False
if QT_VERSION == 0:
self.value = 0
self.label = label
self.title = title
return
self._dlg = QProgressDialog(label, "Abbrechen", 0, self.total)
self._dlg.setWindowTitle(title)
# Qt5 vs Qt6: WindowModality-Enum unterschiedlich verfügbar
modality = None
if hasattr(Qt, "WindowModality"):
try:
modality = Qt.WindowModality.WindowModal
except Exception:
modality = None
if modality is None and hasattr(Qt, "WindowModal"):
modality = Qt.WindowModal
if modality is not None:
try:
self._dlg.setWindowModality(modality)
except Exception:
pass
self._dlg.setMinimumDuration(0)
self._dlg.setAutoClose(False)
self._dlg.setAutoReset(False)
self._dlg.setValue(0)
def on_cancel():
if self._dlg and self._dlg.value() >= self.total:
# OK-Button am Ende
self._dlg.close()
return
self._canceled = True
self._dlg.close()
try:
self._dlg.canceled.connect(on_cancel)
except Exception:
pass
def set_total(self, total: int) -> None:
self.total = max(total, 1)
if QT_VERSION == 0:
return
if self._dlg is not None:
self._dlg.setMaximum(self.total)
def set_value(self, value: int) -> None:
if QT_VERSION == 0:
self.value = value
return
if self._dlg is not None:
self._dlg.setValue(min(value, self.total))
if value >= self.total:
self._dlg.setLabelText("Fertig. Klicken Sie auf OK, um das Fenster zu schließen.")
self._dlg.setCancelButtonText("OK")
QCoreApplication.processEvents()
def set_label(self, text: str) -> None:
if QT_VERSION == 0:
self.label = text
return
if self._dlg is not None:
self._dlg.setLabelText(text)
QCoreApplication.processEvents()
def is_canceled(self) -> bool:
if QT_VERSION == 0:
return self._canceled
if self._dlg is not None:
return self._canceled or self._dlg.wasCanceled()
return self._canceled
def close(self) -> None:
if QT_VERSION == 0:
return
if self._dlg is not None:
self._dlg.close()
def create_progress_dialog(total: int, title: str = "Fortschritt", label: str = "Verarbeite...") -> ProgressDialog:
return ProgressDialog(total, title, label)

View File

@@ -1,23 +1,44 @@
# sn_basis/functions/ly_style_wrapper.py # sn_basis/functions/ly_style_wrapper.py
from sn_basis.functions.ly_existence_wrapper import layer_exists from sn_basis.functions.ly_existence_wrapper import layer_exists
from sn_basis.functions.sys_wrapper import ( from sn_basis.functions.sys_wrapper import get_plugin_root, join_path
get_plugin_root, from sn_basis.modules.stilpruefer import Stilpruefer
join_path, from typing import Optional
file_exists,
)
def apply_style(layer, style_name: str) -> bool: def apply_style(layer, style_name: str) -> bool:
"""
Wendet einen Layerstil an, sofern er gültig ist.
- Validierung erfolgt ausschließlich über Stilpruefer
- Keine eigenen Dateisystem- oder Endungsprüfungen
- Keine Seiteneffekte bei ungültigem Stil
"""
print(">>> apply_style() START")
if not layer_exists(layer): if not layer_exists(layer):
return False return False
style_path = join_path(get_plugin_root(), "styles", style_name) # Stilpfad zusammensetzen
if not file_exists(style_path): style_path = join_path(get_plugin_root(), "sn_verfahrensgebiet","styles", style_name)
# Stil prüfen
pruefer = Stilpruefer()
ergebnis = pruefer.pruefe(style_path)
print(">>> Stilprüfung:", ergebnis)
print(
f"[Stilprüfung] ok={ergebnis.ok} | "
f"aktion={ergebnis.aktion} | "
f"meldung={ergebnis.meldung}"
)
if not ergebnis.ok:
return False return False
# Stil anwenden
try: try:
ok, _ = layer.loadNamedStyle(style_path) ok, _ = layer.loadNamedStyle(str(ergebnis.kontext))
if ok: if ok:
getattr(layer, "triggerRepaint", lambda: None)() getattr(layer, "triggerRepaint", lambda: None)()
return True return True

View File

@@ -57,6 +57,22 @@ def get_home_dir() -> Path:
return Path.home() return Path.home()
def is_absolute_path(path: _PathLike) -> bool:
"""Prüft, ob ein Pfad absolut ist."""
try:
return Path(path).is_absolute()
except Exception:
return False
def basename(path: _PathLike) -> str:
"""Gibt den finalen Namen des Pfades zurück (Dateiname oder Ordner)."""
try:
return Path(path).name
except Exception:
return ""
# --------------------------------------------------------- # ---------------------------------------------------------
# Dateisystem-Eigenschaften # Dateisystem-Eigenschaften
# --------------------------------------------------------- # ---------------------------------------------------------
@@ -75,3 +91,11 @@ def is_case_sensitive_fs() -> bool:
# Linux praktisch immer case-sensitiv # Linux praktisch immer case-sensitiv
return True return True
def path_suffix(path: _PathLike) -> str:
"""Gibt die Dateiendung eines Pfades zurück (inklusive Punkt)."""
try:
return Path(path).suffix
except Exception:
return ""

View File

@@ -20,6 +20,19 @@ QgsNetworkAccessManager: Type[Any]
Qgis: Type[Any] Qgis: Type[Any]
QgsMapLayerProxyModel: Type[Any] QgsMapLayerProxyModel: Type[Any]
QgsVectorFileWriter: Type[Any] # neu: Schreib-API QgsVectorFileWriter: Type[Any] # neu: Schreib-API
QgsFeature: Type[Any]
QgsField: Type[Any]
QgsGeometry: Type[Any]
QgsFeatureRequest: Type[Any]
QgsCoordinateTransform: Type[Any]
QgsCoordinateReferenceSystem: Type[Any]
QgsPrintLayout: Type[Any]
QgsLayoutItemMap: Type[Any]
QgsLayoutItemLabel: Type[Any]
QgsLayoutPoint: Type[Any]
QgsLayoutSize: Type[Any]
QgsUnitTypes: Type[Any]
QgsLayoutItem: Type[Any]
QGIS_AVAILABLE = False QGIS_AVAILABLE = False
@@ -36,6 +49,19 @@ try:
Qgis as _Qgis, Qgis as _Qgis,
QgsMapLayerProxyModel as _QgsMaplLayerProxyModel, QgsMapLayerProxyModel as _QgsMaplLayerProxyModel,
QgsVectorFileWriter as _QgsVectorFileWriter, QgsVectorFileWriter as _QgsVectorFileWriter,
QgsFeature as _QgsFeature,
QgsField as _QgsField,
QgsGeometry as _QgsGeometry,
QgsFeatureRequest as _QgsFeatureRequest,
QgsCoordinateTransform as _QgsCoordinateTransform,
QgsCoordinateReferenceSystem as _QgsCoordinateReferenceSystem,
QgsPrintLayout as _QgsPrintLayout,
QgsLayoutItemMap as _QgsLayoutItemMap,
QgsLayoutItemLabel as _QgsLayoutItemLabel,
QgsLayoutPoint as _QgsLayoutPoint,
QgsLayoutSize as _QgsLayoutSize,
QgsUnitTypes as _QgsUnitTypes,
QgsLayoutItem as _QgsLayoutItem,
) )
QgsProject = _QgsProject QgsProject = _QgsProject
@@ -45,6 +71,19 @@ try:
Qgis = _Qgis Qgis = _Qgis
QgsMapLayerProxyModel = _QgsMaplLayerProxyModel QgsMapLayerProxyModel = _QgsMaplLayerProxyModel
QgsVectorFileWriter = _QgsVectorFileWriter QgsVectorFileWriter = _QgsVectorFileWriter
QgsFeature = _QgsFeature
QgsField = _QgsField
QgsGeometry = _QgsGeometry
QgsFeatureRequest = _QgsFeatureRequest
QgsCoordinateTransform = _QgsCoordinateTransform
QgsCoordinateReferenceSystem = _QgsCoordinateReferenceSystem
QgsPrintLayout = _QgsPrintLayout
QgsLayoutItemMap = _QgsLayoutItemMap
QgsLayoutItemLabel = _QgsLayoutItemLabel
QgsLayoutPoint = _QgsLayoutPoint
QgsLayoutSize = _QgsLayoutSize
QgsUnitTypes = _QgsUnitTypes
QgsLayoutItem = _QgsLayoutItem
QGIS_AVAILABLE = True QGIS_AVAILABLE = True
@@ -55,9 +94,17 @@ try:
except Exception: except Exception:
QGIS_AVAILABLE = False QGIS_AVAILABLE = False
class _MockLayoutManager:
def layoutByName(self, name: str):
return None
def addLayout(self, layout: Any) -> bool:
return True
class _MockQgsProject: class _MockQgsProject:
def __init__(self): def __init__(self):
self._variables = {} self._variables = {}
self._layout_manager = _MockLayoutManager()
@staticmethod @staticmethod
def instance() -> "_MockQgsProject": def instance() -> "_MockQgsProject":
@@ -66,6 +113,9 @@ except Exception:
def read(self) -> bool: def read(self) -> bool:
return True return True
def layoutManager(self):
return self._layout_manager
QgsProject = _MockQgsProject QgsProject = _MockQgsProject
class _MockQgsVectorLayer: class _MockQgsVectorLayer:
@@ -116,6 +166,134 @@ except Exception:
QgsRasterLayer = _MockQgsRasterLayer QgsRasterLayer = _MockQgsRasterLayer
class _MockQgsPrintLayout:
def __init__(self, project: Any):
self.project = project
self._name = ""
self._page = _MockQgsLayoutPage()
def initializeDefaults(self) -> None:
pass
def setName(self, name: str) -> None:
self._name = name
def pageCollection(self):
return self
def page(self, index: int):
return self._page
def addLayoutItem(self, item: Any) -> None:
pass
class _MockQgsLayoutPage:
def setPageSize(self, size: Any) -> None:
self.size = size
class _MockQgsLayoutItem:
class ReferencePoint:
LowerLeft = 0
class _MockQgsLayoutItemMap:
def __init__(self, layout: Any):
self.layout = layout
def setId(self, item_id: str) -> None:
pass
def setExtent(self, extent: Any) -> None:
pass
def setScale(self, scale: float) -> None:
pass
def attemptMove(self, point: Any) -> None:
pass
def attemptResize(self, size: Any) -> None:
pass
def setFollowVisibilityPreset(self, active: bool) -> None:
pass
def setFollowVisibilityPresetName(self, name: str) -> None:
pass
class _MockQgsLayoutItemLabel:
ModeHtml = 1
def __init__(self, layout: Any):
self.layout = layout
def setId(self, item_id: str) -> None:
pass
def setText(self, text: str) -> None:
pass
def setMode(self, mode: Any) -> None:
pass
def setFont(self, font: Any) -> None:
pass
def setReferencePoint(self, point: Any) -> None:
pass
def attemptMove(self, point: Any) -> None:
pass
def attemptResize(self, size: Any) -> None:
pass
class _MockQgsLayoutPoint:
def __init__(self, x: float, y: float, unit: Any):
self.x = x
self.y = y
self.unit = unit
class _MockQgsLayoutSize:
def __init__(self, width: float, height: float, unit: Any):
self.width = width
self.height = height
self.unit = unit
class _MockQgsUnitTypes:
LayoutMillimeters = 0
QgsPrintLayout = _MockQgsPrintLayout
QgsLayoutItemMap = _MockQgsLayoutItemMap
QgsLayoutItemLabel = _MockQgsLayoutItemLabel
QgsLayoutPoint = _MockQgsLayoutPoint
QgsLayoutSize = _MockQgsLayoutSize
QgsUnitTypes = _MockQgsUnitTypes
QgsLayoutItem = _MockQgsLayoutItem
class _MockQgsFeatureRequest:
def __init__(self):
self._filter_rect = None
def setFilterRect(self, rect):
self._filter_rect = rect
return self
QgsFeatureRequest = _MockQgsFeatureRequest
class _MockQgsCoordinateTransform:
def __init__(self, *args, **kwargs):
pass
def transformBoundingBox(self, rect):
return rect
class _MockQgsCoordinateReferenceSystem:
def __init__(self, *args, **kwargs):
pass
QgsCoordinateTransform = _MockQgsCoordinateTransform
QgsCoordinateReferenceSystem = _MockQgsCoordinateReferenceSystem
QgsNetworkAccessManager = _MockQgsNetworkAccessManager QgsNetworkAccessManager = _MockQgsNetworkAccessManager
class _MockQgis: class _MockQgis:

View File

@@ -76,6 +76,9 @@ except Exception:
def removeToolBar(self, *args, **kwargs): def removeToolBar(self, *args, **kwargs):
pass pass
def openLayoutDesigner(self, layout):
return layout
iface = _MockIface() iface = _MockIface()
class _MockQgsFileWidget: class _MockQgsFileWidget:
@@ -132,6 +135,13 @@ def get_main_window():
return None return None
def open_layout_designer(layout: Any) -> Any:
try:
return iface.openLayoutDesigner(layout)
except Exception:
return None
# --------------------------------------------------------- # ---------------------------------------------------------
# Dock-Handling # Dock-Handling
# --------------------------------------------------------- # ---------------------------------------------------------

View File

@@ -10,12 +10,17 @@ YES: Optional[Any] = None
NO: Optional[Any] = None NO: Optional[Any] = None
CANCEL: Optional[Any] = None CANCEL: Optional[Any] = None
ICON_QUESTION: Optional[Any] = None ICON_QUESTION: Optional[Any] = None
QVariant: Type[Any] = object
# Qt-Klassen (werden dynamisch gesetzt) # Qt-Klassen (werden dynamisch gesetzt)
QDockWidget: Type[Any] = object QDockWidget: Type[Any] = object
QMessageBox: Type[Any] = object QMessageBox: Type[Any] = object
QFileDialog: Type[Any] = object QFileDialog: Type[Any] = object
QProgressDialog: Type[Any] = object
QEventLoop: Type[Any] = object QEventLoop: Type[Any] = object
QTimer: Type[Any] = object
QUrl: Type[Any] = object QUrl: Type[Any] = object
QNetworkRequest: Type[Any] = object QNetworkRequest: Type[Any] = object
QNetworkReply: Type[Any] = object QNetworkReply: Type[Any] = object
@@ -24,6 +29,7 @@ QWidget: Type[Any] = object
QGridLayout: Type[Any] = object QGridLayout: Type[Any] = object
QLabel: Type[Any] = object QLabel: Type[Any] = object
QLineEdit: Type[Any] = object QLineEdit: Type[Any] = object
QInputDialog: Type[Any] = object
QGroupBox: Type[Any] = object QGroupBox: Type[Any] = object
QVBoxLayout: Type[Any] = object QVBoxLayout: Type[Any] = object
QPushButton: Type[Any] = object QPushButton: Type[Any] = object
@@ -36,6 +42,10 @@ QToolButton: Type[Any] = object
QSizePolicy: Type[Any] = object QSizePolicy: Type[Any] = object
Qt: Type[Any] = object Qt: Type[Any] = object
QComboBox: Type[Any] = object QComboBox: Type[Any] = object
QCheckBox: Type[Any] = object
QHBoxLayout: Type[Any] = object
QFont: Type[Any] = object
def exec_dialog(dialog: Any) -> Any: def exec_dialog(dialog: Any) -> Any:
"""Führt Dialog modal aus (Qt6: exec(), Qt5: exec_(), Mock: YES)""" """Führt Dialog modal aus (Qt6: exec(), Qt5: exec_(), Mock: YES)"""
@@ -61,10 +71,12 @@ try:
from qgis.PyQt.QtWidgets import ( from qgis.PyQt.QtWidgets import (
QMessageBox as _QMessageBox, QMessageBox as _QMessageBox,
QFileDialog as _QFileDialog, QFileDialog as _QFileDialog,
QProgressDialog as _QProgressDialog,
QWidget as _QWidget, QWidget as _QWidget,
QGridLayout as _QGridLayout, QGridLayout as _QGridLayout,
QLabel as _QLabel, QLabel as _QLabel,
QLineEdit as _QLineEdit, QLineEdit as _QLineEdit,
QInputDialog as _QInputDialog,
QGroupBox as _QGroupBox, QGroupBox as _QGroupBox,
QVBoxLayout as _QVBoxLayout, QVBoxLayout as _QVBoxLayout,
QPushButton as _QPushButton, QPushButton as _QPushButton,
@@ -77,12 +89,17 @@ try:
QToolButton as _QToolButton, QToolButton as _QToolButton,
QSizePolicy as _QSizePolicy, QSizePolicy as _QSizePolicy,
QComboBox as _QComboBox, QComboBox as _QComboBox,
QCheckBox as _QCheckBox,
QHBoxLayout as _QHBoxLayout,
) )
from qgis.PyQt.QtGui import QFont as _QFont
from qgis.PyQt.QtCore import ( from qgis.PyQt.QtCore import (
QEventLoop as _QEventLoop, QEventLoop as _QEventLoop,
QTimer as _QTimer,
QUrl as _QUrl, QUrl as _QUrl,
QCoreApplication as _QCoreApplication, QCoreApplication as _QCoreApplication,
Qt as _Qt, Qt as _Qt,
QVariant as _QVariant
) )
from qgis.PyQt.QtNetwork import ( from qgis.PyQt.QtNetwork import (
QNetworkRequest as _QNetworkRequest, QNetworkRequest as _QNetworkRequest,
@@ -93,7 +110,10 @@ try:
QT_VERSION = 6 QT_VERSION = 6
QMessageBox = _QMessageBox QMessageBox = _QMessageBox
QFileDialog = _QFileDialog QFileDialog = _QFileDialog
QProgressDialog = _QProgressDialog
QProgressDialog = _QProgressDialog
QEventLoop = _QEventLoop QEventLoop = _QEventLoop
QTimer = _QTimer
QUrl = _QUrl QUrl = _QUrl
QNetworkRequest = _QNetworkRequest QNetworkRequest = _QNetworkRequest
QNetworkReply = _QNetworkReply QNetworkReply = _QNetworkReply
@@ -104,6 +124,7 @@ try:
QGridLayout = _QGridLayout QGridLayout = _QGridLayout
QLabel = _QLabel QLabel = _QLabel
QLineEdit = _QLineEdit QLineEdit = _QLineEdit
QInputDialog = _QInputDialog
QGroupBox = _QGroupBox QGroupBox = _QGroupBox
QVBoxLayout = _QVBoxLayout QVBoxLayout = _QVBoxLayout
QPushButton = _QPushButton QPushButton = _QPushButton
@@ -115,12 +136,18 @@ try:
QToolButton = _QToolButton QToolButton = _QToolButton
QSizePolicy = _QSizePolicy QSizePolicy = _QSizePolicy
QComboBox = _QComboBox QComboBox = _QComboBox
QCheckBox = _QCheckBox
QVariant = _QVariant
QHBoxLayout = _QHBoxLayout
QFont = _QFont
# ✅ QT6 ENUMS # ✅ QT6 ENUMS
YES = QMessageBox.StandardButton.Yes YES = QMessageBox.StandardButton.Yes
NO = QMessageBox.StandardButton.No NO = QMessageBox.StandardButton.No
CANCEL = QMessageBox.StandardButton.Cancel CANCEL = QMessageBox.StandardButton.Cancel
ICON_QUESTION = QMessageBox.Icon.Question ICON_QUESTION = QMessageBox.Icon.Question
AcceptRole = QMessageBox.ButtonRole.AcceptRole
ActionRole = QMessageBox.ButtonRole.ActionRole
RejectRole = QMessageBox.ButtonRole.RejectRole
# Qt6 Enum-Aliase # Qt6 Enum-Aliase
ToolButtonTextBesideIcon = Qt.ToolButtonStyle.ToolButtonTextBesideIcon ToolButtonTextBesideIcon = Qt.ToolButtonStyle.ToolButtonTextBesideIcon
@@ -149,6 +176,7 @@ except (ImportError, AttributeError):
QGridLayout as _QGridLayout, QGridLayout as _QGridLayout,
QLabel as _QLabel, QLabel as _QLabel,
QLineEdit as _QLineEdit, QLineEdit as _QLineEdit,
QInputDialog as _QInputDialog,
QGroupBox as _QGroupBox, QGroupBox as _QGroupBox,
QVBoxLayout as _QVBoxLayout, QVBoxLayout as _QVBoxLayout,
QPushButton as _QPushButton, QPushButton as _QPushButton,
@@ -161,12 +189,17 @@ except (ImportError, AttributeError):
QToolButton as _QToolButton, QToolButton as _QToolButton,
QSizePolicy as _QSizePolicy, QSizePolicy as _QSizePolicy,
QComboBox as _QComboBox, QComboBox as _QComboBox,
QCheckBox as _QCheckBox,
QHBoxLayout as _QHBoxLayout,
) )
from PyQt5.QtGui import QFont as _QFont
from PyQt5.QtCore import ( from PyQt5.QtCore import (
QEventLoop as _QEventLoop, QEventLoop as _QEventLoop,
QTimer as _QTimer,
QUrl as _QUrl, QUrl as _QUrl,
QCoreApplication as _QCoreApplication, QCoreApplication as _QCoreApplication,
Qt as _Qt, Qt as _Qt,
QVariant as _QVariant
) )
from PyQt5.QtNetwork import ( from PyQt5.QtNetwork import (
QNetworkRequest as _QNetworkRequest, QNetworkRequest as _QNetworkRequest,
@@ -178,6 +211,7 @@ except (ImportError, AttributeError):
QMessageBox = _QMessageBox QMessageBox = _QMessageBox
QFileDialog = _QFileDialog QFileDialog = _QFileDialog
QEventLoop = _QEventLoop QEventLoop = _QEventLoop
QTimer = _QTimer
QUrl = _QUrl QUrl = _QUrl
QNetworkRequest = _QNetworkRequest QNetworkRequest = _QNetworkRequest
QNetworkReply = _QNetworkReply QNetworkReply = _QNetworkReply
@@ -188,6 +222,7 @@ except (ImportError, AttributeError):
QGridLayout = _QGridLayout QGridLayout = _QGridLayout
QLabel = _QLabel QLabel = _QLabel
QLineEdit = _QLineEdit QLineEdit = _QLineEdit
QInputDialog = _QInputDialog
QGroupBox = _QGroupBox QGroupBox = _QGroupBox
QVBoxLayout = _QVBoxLayout QVBoxLayout = _QVBoxLayout
QPushButton = _QPushButton QPushButton = _QPushButton
@@ -199,12 +234,20 @@ except (ImportError, AttributeError):
QToolButton = _QToolButton QToolButton = _QToolButton
QSizePolicy = _QSizePolicy QSizePolicy = _QSizePolicy
QComboBox = _QComboBox QComboBox = _QComboBox
QCheckBox = _QCheckBox
QVariant = _QVariant
QHBoxLayout= _QHBoxLayout
QFont = _QFont
# ✅ PYQT5 ENUMS # ✅ PYQT5 ENUMS
YES = QMessageBox.Yes YES = QMessageBox.Yes
NO = QMessageBox.No NO = QMessageBox.No
CANCEL = QMessageBox.Cancel CANCEL = QMessageBox.Cancel
ICON_QUESTION = QMessageBox.Question ICON_QUESTION = QMessageBox.Question
AcceptRole = QMessageBox.AcceptRole
ActionRole = QMessageBox.ActionRole
RejectRole = QMessageBox.RejectRole
# PyQt5 Enum-Aliase # PyQt5 Enum-Aliase
ToolButtonTextBesideIcon = Qt.ToolButtonTextBesideIcon ToolButtonTextBesideIcon = Qt.ToolButtonTextBesideIcon
@@ -244,6 +287,10 @@ except (ImportError, AttributeError):
No = NO No = NO
Cancel = CANCEL Cancel = CANCEL
Question = ICON_QUESTION Question = ICON_QUESTION
AcceptRole = 0
ActionRole = 3
RejectRole = 1
@classmethod @classmethod
def question(cls, parent, title, message, buttons, default_button): def question(cls, parent, title, message, buttons, default_button):
@@ -262,12 +309,30 @@ except (ImportError, AttributeError):
QFileDialog = _MockQFileDialog QFileDialog = _MockQFileDialog
class _MockQInputDialog:
@staticmethod
def getText(parent, title, label, mode=None, text=""):
return text, True
QInputDialog = _MockQInputDialog
class _MockQEventLoop: class _MockQEventLoop:
def exec(self) -> int: return 0 def exec(self) -> int: return 0
def quit(self) -> None: pass def quit(self) -> None: pass
QEventLoop = _MockQEventLoop QEventLoop = _MockQEventLoop
class _MockQTimer:
def __init__(self, *args, **kwargs):
self.timeout = type('Signal', (), {
'connect': lambda s, cb: None,
})()
def setSingleShot(self, v: bool) -> None: pass
def start(self, ms: int) -> None: pass
def stop(self) -> None: pass
QTimer = _MockQTimer
class _MockQUrl(str): class _MockQUrl(str):
def isValid(self) -> bool: return True def isValid(self) -> bool: return True
@@ -298,10 +363,17 @@ except (ImportError, AttributeError):
class _MockLabel: class _MockLabel:
def __init__(self, text: str = ""): self._text = text def __init__(self, text: str = ""): self._text = text
class _MockLineEdit: class _MockLineEdit:
Normal = 0
def __init__(self, *args, **kwargs): self._text = "" def __init__(self, *args, **kwargs): self._text = ""
def text(self) -> str: return self._text def text(self) -> str: return self._text
def setText(self, value: str) -> None: self._text = value def setText(self, value: str) -> None: self._text = value
class _MockFont:
def __init__(self, family: str = "", pointSize: int = 10):
self.family = family
self.pointSize = pointSize
class _MockButton: class _MockButton:
def __init__(self, *args, **kwargs): self.clicked = lambda *a, **k: None def __init__(self, *args, **kwargs): self.clicked = lambda *a, **k: None
@@ -312,6 +384,7 @@ except (ImportError, AttributeError):
QGroupBox = _MockWidget QGroupBox = _MockWidget
QVBoxLayout = _MockLayout QVBoxLayout = _MockLayout
QPushButton = _MockButton QPushButton = _MockButton
QFont = _MockFont
QCoreApplication = object() QCoreApplication = object()
class _MockQt: class _MockQt:
@@ -423,9 +496,87 @@ except (ImportError, AttributeError):
QComboBox = _MockComboBox QComboBox = _MockComboBox
# ---------------------------
# Mock für QVariant
# ---------------------------
class _MockQVariant:
"""
Minimaler Ersatz für QtCore.QVariant.
Ziel:
- Werte transparent durchreichen
- Typ-Konstanten bereitstellen
- Keine Qt-Abhängigkeiten
"""
# Typ-Konstanten (symbolisch, Werte egal)
Invalid = 0
Int = 1
Double = 2
String = 3
Bool = 4
Date = 5
DateTime = 6
def __init__(self, value: Any = None):
self._value = value
def value(self) -> Any:
return self._value
def __repr__(self) -> str:
return f"QVariant({self._value!r})"
# Optional: automatische Entpackung
def __int__(self):
return int(self._value)
def __float__(self):
return float(self._value)
def __str__(self):
return str(self._value)
QVariant = _MockQVariant
class _MockQHBoxLayout:
def __init__(self, *args, **kwargs):
self._widgets = []
def addWidget(self, widget):
self._widgets.append(widget)
def addLayout(self, layout):
pass
def addStretch(self, *args, **kwargs):
pass
def setSpacing(self, *args, **kwargs):
pass
def setContentsMargins(self, *args, **kwargs):
pass
QHBoxLayout = _MockQHBoxLayout
class _MockQCheckBox:
def __init__(self, text: str = "", *args, **kwargs):
self._text = text
self._checked = False
def setText(self, text: str) -> None:
self._text = text
def isChecked(self) -> bool:
return self._checked
def setChecked(self, checked: bool) -> None:
self._checked = checked
QCheckBox = _MockQCheckBox
def exec_dialog(dialog: Any) -> Any: def exec_dialog(dialog: Any) -> Any:
return YES return YES
# --------------------------- TEST --------------------------- # --------------------------- TEST ---------------------------
if __name__ == "__main__": if __name__ == "__main__":
debug_qt_status() debug_qt_status()

View File

@@ -6,6 +6,7 @@ from pathlib import Path
from typing import Union from typing import Union
import sys import sys
from sn_basis.functions.os_wrapper import is_absolute_path, basename
_PathLike = Union[str, Path] _PathLike = Union[str, Path]

View File

@@ -1,13 +1,14 @@
[general] [general]
name=LNO Sachsen | Basisfunktionen name=LNO Sachsen | Plugin Basisfunktionen
qgisMinimumVersion=3.0 qgisMinimumVersion=3.40
qgisMaximumVersion=3.99
description=Plugin mit Basisfunktionen description=Plugin mit Basisfunktionen
version=25.11.4 version=26.3.6-unstable
author=Michael Otto author=Daniel Helbig
email=michael.otto@landkreis-mittelsachsen.de email=daniel.helbig@kreis-meissen.de
about=Plugin mit Basisfunktionen homepage=https://entwicklung.flurneuordnung-sachsen.de/AG_QGIS/Plugin_SN_Basis
category=Plugins tracker=https://entwicklung.flurneuordnung-sachsen.de/AG_QGIS/Plugin_SN_Basis/issues
homepage=https://entwicklung.vln-sn.de/AG_QGIS/Plugin_SN_Basis repository=https://entwicklung.flurneuordnung-sachsen.de/AG_QGIS/Plugin_SN_Basis/src/branch/unstable/
repository=https://entwicklung.vln-sn.de/AG_QGIS/Repository
supportsQt6=true
experimental=true experimental=true
deprecated=false
supportsQt6=true

View File

@@ -2,14 +2,14 @@
DataGrabber module DataGrabber module
================== ==================
UIfreier Orchestrator für die Prüfung und Klassifikation von Datenquellen. UI-freier Orchestrator für die Prüfung und Klassifikation von Datenquellen.
Der DataGrabber: Der DataGrabber:
- klassifiziert die übergebene Quelle (Datei, Dienst, Datenbank, Excel), - klassifiziert die übergebene Quelle (Datei, Dienst, Datenbank, Excel),
- ruft passende Prüfer (Dateipruefer, Linkpruefer, Layerpruefer, Stilpruefer) auf, - ruft passende Prüfer (Dateipruefer, Linkpruefer, Layerpruefer, Stilpruefer) auf,
- sammelt alle rohen ``pruef_ergebnis``Objekte, - sammelt alle rohen ``pruef_ergebnis``-Objekte,
- aggregiert diese zu einem zusammenfassenden Ergebnis, - aggregiert diese zu einem zusammenfassenden Ergebnis,
- **löst selbst keinerlei UIInteraktion aus**. - **löst selbst keinerlei UI-Interaktion aus**.
Alle Nutzerinteraktionen (MessageBar, QMessageBox, Logging) erfolgen Alle Nutzerinteraktionen (MessageBar, QMessageBox, Logging) erfolgen
ausschließlich über den ``Pruefmanager`` im aufrufenden Kontext (UI / Pipeline). ausschließlich über den ``Pruefmanager`` im aufrufenden Kontext (UI / Pipeline).
@@ -17,8 +17,11 @@ ausschließlich über den ``Pruefmanager`` im aufrufenden Kontext (UI / Pipeline
from __future__ import annotations from __future__ import annotations
import re
from typing import Any, Dict, List, Mapping, Optional, Tuple, Literal from typing import Any, Dict, List, Mapping, Optional, Tuple, Literal
from sn_basis.functions.os_wrapper import basename, path_suffix
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
from sn_basis.modules.Pruefmanager import Pruefmanager from sn_basis.modules.Pruefmanager import Pruefmanager
@@ -27,6 +30,7 @@ from sn_basis.modules.linkpruefer import Linkpruefer
from sn_basis.modules.layerpruefer import Layerpruefer from sn_basis.modules.layerpruefer import Layerpruefer
from sn_basis.modules.stilpruefer import Stilpruefer from sn_basis.modules.stilpruefer import Stilpruefer
from sn_basis.modules.excel_importer import ExcelImporter from sn_basis.modules.excel_importer import ExcelImporter
from sn_plan41.modules.listenauswerter import Listenauswerter
SourceType = Literal["service", "database", "excel", "unknown"] SourceType = Literal["service", "database", "excel", "unknown"]
@@ -38,9 +42,6 @@ class DataGrabber:
""" """
Analysiert und prüft Datenquellen für den Fachdatenabruf. Analysiert und prüft Datenquellen für den Fachdatenabruf.
Der DataGrabber ist **UIfrei**. Er erzeugt ausschließlich rohe
``pruef_ergebnis``Objekte und überlässt deren Verarbeitung
vollständig dem aufrufenden Code.
""" """
def __init__( def __init__(
@@ -55,9 +56,9 @@ class DataGrabber:
) -> None: ) -> None:
self.pruefmanager = pruefmanager self.pruefmanager = pruefmanager
self._datei_pruefer_cls = datei_pruefer_cls self._datei_pruefer_cls = datei_pruefer_cls
self.link_pruefer = link_pruefer self.link_pruefer = link_pruefer or Linkpruefer()
self.layer_pruefer = layer_pruefer self.layer_pruefer = layer_pruefer or Layerpruefer()
self.stil_pruefer = stil_pruefer self.stil_pruefer = stil_pruefer or Stilpruefer()
self._excel_importer_cls = excel_importer_cls self._excel_importer_cls = excel_importer_cls
self._source: Optional[str] = None self._source: Optional[str] = None
@@ -69,23 +70,61 @@ class DataGrabber:
"""Setzt die aktuell zu untersuchende Rohquelle.""" """Setzt die aktuell zu untersuchende Rohquelle."""
self._source = source self._source = source
def analyze_source_type(self, source: str) -> SourceType: SourceType = str # "excel" | "datenbank" | "dienst" | "unbekannt"
"""
Klassifiziert die Quelle.
Aktuell Platzhalter liefert ``"unknown"``.
def analyze_source_type(self, quelle: str) -> Tuple[SourceType, pruef_ergebnis]:
""" """
return "unknown" Klassifiziert die Quelle und liefert das zugehörige pruef_ergebnis.
Reihenfolge:
1. Dateipruefer (Datei + Dateityp)
2. Linkpruefer (Dienst)
"""
# --------------------------------------------------
# 1. Datei prüfen (inkl. Typ-Erkennung)
# --------------------------------------------------
dateipruefer = Dateipruefer(pfad=quelle)
datei_ergebnis = dateipruefer.pruefe()
if datei_ergebnis.ok:
suffix = path_suffix(datei_ergebnis.kontext).lower()
print(f"[DataGrabber] Debug: analyze_source_type source={quelle} -> suffix={suffix}")
if suffix == ".xlsx":
return "excel", datei_ergebnis
if suffix in (".gpkg", ".sqlite"):
return "datenbank", datei_ergebnis
return "unbekannter_dateityp", datei_ergebnis
# --------------------------------------------------
# 2. Keine Datei → Link prüfen
# --------------------------------------------------
linkpruefer = Linkpruefer()
link_ergebnis = linkpruefer.pruefe(quelle)
if link_ergebnis.ok:
return "dienst", link_ergebnis
# --------------------------------------------------
# 3. Weder Datei noch Dienst
# --------------------------------------------------
return "unbekannte_quelle", link_ergebnis
def run(self, source: str) -> Tuple[SourceDict, pruef_ergebnis]: def run(self, source: str) -> Tuple[SourceDict, pruef_ergebnis]:
""" """
Führt die vollständige Quellprüfung aus. Führt die vollständige Quellprüfung aus.
Diese Methode ist **UIfrei**. Sie gibt rohe Ergebnisse zurück, Diese Methode ist **UIfrei**. Sie gibt rohe Ergebnisse zurück,
die vom Aufrufer über den ``Pruefmanager`` verarbeitet werden. die vom Aufrufer über den ``Pruefmanager`` verarbeitet werden.
""" """
self.set_source(source) self.set_source(source)
source_type = self.analyze_source_type(source) source_type, source_result = self.analyze_source_type(source)
print(f"[DataGrabber] Debug: run source={source} -> source_type={source_type}")
source_dict: SourceDict = {} source_dict: SourceDict = {}
partial_results: List[pruef_ergebnis] = [] partial_results: List[pruef_ergebnis] = []
@@ -97,14 +136,7 @@ class DataGrabber:
elif source_type == "service": elif source_type == "service":
source_dict, partial_results = self._process_service_source(source) source_dict, partial_results = self._process_service_source(source)
else: else:
partial_results.append( partial_results.append(source_result)
pruef_ergebnis(
ok=False,
meldung="Quelle konnte nicht klassifiziert werden",
aktion="kein_dateipfad",
kontext={"source": source},
)
)
summary = self._aggregate_results(source, source_dict, partial_results) summary = self._aggregate_results(source, source_dict, partial_results)
return source_dict, summary return source_dict, summary
@@ -115,9 +147,150 @@ class DataGrabber:
def _process_excel_source( def _process_excel_source(
self, filepath: str self, filepath: str
) -> Tuple[SourceDict, List[pruef_ergebnis]]: ) -> Tuple[SourceDict, List[pruef_ergebnis]]:
source_dict: SourceDict = {} source_dict: SourceDict = {"rows": []}
results: List[pruef_ergebnis] = [] results: List[pruef_ergebnis] = []
return source_dict, results
rows = ExcelImporter(filepath, self.pruefmanager).import_xlsx()
print(f"[DataGrabber] Debug: Excel-Linkliste geladen: {filepath}")
print(f"[DataGrabber] Debug: raw rows count: {len(rows)}")
if rows:
first = rows[:min(5, len(rows))]
print(f"[DataGrabber] Debug: first rows: {first}")
if not rows:
return source_dict, results
required_keys = {"ident", "gruppe", "kartenebene", "inhalt", "link", "provider", "stildatei"}
def extract_url(raw_link: str, provider: str) -> str:
if not raw_link:
return ""
if not isinstance(raw_link, str):
return str(raw_link)
if provider == "wfs":
url_match = re.search(r"url\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE)
type_match = re.search(r"typename\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE)
if url_match:
url = url_match.group(1).strip()
if type_match:
typename = type_match.group(1).strip()
separator = "&" if "?" in url else "?"
return f"url={url}{separator}service=WFS&request=GetFeature&typename={typename}"
return f"url={url}"
if provider == "wms":
# falls WMS-URL als url='...' vorliegt
match = re.search(r"url\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE)
if match:
return match.group(1).strip()
if provider == "rest":
# REST/ArcGIS-Server: direkt nutzen
match = re.search(r"url\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE)
if match:
return match.group(1).strip()
# allgemeines Rückfallverhalten
match = re.search(r"url\s*=\s*['\"]([^'\"]+)['\"]", raw_link, re.IGNORECASE)
if match:
return match.group(1).strip()
return raw_link.strip()
for row_index, raw_row in enumerate(rows, start=2):
if not isinstance(raw_row, Mapping):
pe = pruef_ergebnis(
ok=False,
meldung="Linklisten-Zeile ist nicht als Dictionary formatiert.",
aktion="ungueltige_zeile",
kontext={"zeile": row_index, "wert": raw_row},
)
results.append(self.pruefmanager.verarbeite(pe))
continue
normalized = {str(k).strip().lower(): v for k, v in raw_row.items() if k is not None}
if not required_keys.issubset(normalized.keys()):
missing = required_keys.difference(normalized.keys())
pe = pruef_ergebnis(
ok=False,
meldung=f"Linkliste fehlt erforderliche Spalten: {', '.join(sorted(missing))}",
aktion="spaltenfehlend",
kontext={"zeile": row_index, "fehlend": sorted(missing)},
)
results.append(self.pruefmanager.verarbeite(pe))
continue
ident = normalized.get("ident")
link_raw = normalized.get("link") or ""
provider = str(normalized.get("provider") or "").strip().lower()
stildatei_raw = normalized.get("stildatei") or ""
stildatei = None
if stildatei_raw and str(stildatei_raw).strip():
style_result = self.stil_pruefer.pruefe(str(stildatei_raw).strip())
results.append(self.pruefmanager.verarbeite(style_result))
if style_result.ok:
# Style-Pfad in der Datenkette beibehalten (absolut, wenn vorhanden).
stildatei = str(style_result.kontext or stildatei_raw).strip()
else:
stildatei = None
else:
results.append(self.pruefmanager.verarbeite(pruef_ergebnis(ok=True, meldung="Kein Stil angegeben", aktion="stil_optional", kontext=None)))
stildatei = None
if not ident or not link_raw or not provider:
pe = pruef_ergebnis(
ok=False,
meldung="Linklisten-Zeile hat fehlende Pflichtfelder (ident/link/provider).",
aktion="pflichtfelder_fehlen",
kontext={"zeile": row_index, "daten": raw_row},
)
results.append(self.pruefmanager.verarbeite(pe))
continue
link_url = extract_url(link_raw, provider)
# Provider-abhängige Linkvalidierung
if provider in ("wfs", "wms", "rest"):
# Webdienste: wir akzeptieren die URL-Form und prüfen nicht per network_head.
link_result = pruef_ergebnis(ok=True, meldung="Service-Link angenommen", aktion="service_link", kontext=link_url)
elif provider in ("ogr", "gpkg", "shp", "geojson"):
# OGR/Pfad: mit Linkpruefer (pfad oder lokale Datei) prüfen
link_result = self.link_pruefer.pruefe(link_url)
else:
link_result = self.link_pruefer.pruefe(link_url)
results.append(self.pruefmanager.verarbeite(link_result))
# stildatei wurde bereits oben geprüft und ggf. auf Dateiname gesetzt oder auf None
if not link_result.ok:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Zeile {row_index}: fehlerhafter Link",
aktion="link_unvollstaendig",
kontext={"row": row_index, "ident": ident},
)
)
continue
result_row = {
"ident": ident,
"gruppe": normalized.get("gruppe"),
"Kartenebene": normalized.get("kartenebene"),
"Inhalt": normalized.get("inhalt"),
"Link": link_url,
"Provider": provider,
"stildatei": stildatei,
}
source_dict["rows"].append(result_row)
# Validierung über Listenauswerter
listenauswerter = Listenauswerter(self.pruefmanager, self.stil_pruefer or Stilpruefer())
validated, validation_results = listenauswerter.validate_rows(source_dict)
results.extend(validation_results)
return validated, results
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# DatenbankQuellen # DatenbankQuellen
@@ -125,6 +298,7 @@ class DataGrabber:
def _process_database_source( def _process_database_source(
self, db_path: str self, db_path: str
) -> Tuple[SourceDict, List[pruef_ergebnis]]: ) -> Tuple[SourceDict, List[pruef_ergebnis]]:
print(f"[DataGrabber] Debug: _process_database_source called, db_path={db_path}")
source_dict: SourceDict = {} source_dict: SourceDict = {}
results: List[pruef_ergebnis] = [] results: List[pruef_ergebnis] = []
return source_dict, results return source_dict, results
@@ -149,24 +323,29 @@ class DataGrabber:
partial_results: List[pruef_ergebnis], partial_results: List[pruef_ergebnis],
) -> pruef_ergebnis: ) -> pruef_ergebnis:
""" """
Aggregiert Einzelprüfungen zu einem Gesamt``pruef_ergebnis``. Aggregiert Einzelprüfungen zu einem Gesamt-``pruef_ergebnis``.
**Keine UIInteraktion.** **Keine UI-Interaktion.**
""" """
if source_dict: rows = source_dict.get("rows") if isinstance(source_dict, dict) else None
if rows:
return pruef_ergebnis( return pruef_ergebnis(
ok=True, ok=True,
meldung="Quelle erfolgreich geprüft", meldung="Quelle erfolgreich geprüft",
aktion="ok", aktion="ok",
kontext={ kontext={
"source": source, "source": source,
"valid_entries": sum(len(v) for v in source_dict.values()), "valid_entries": len(rows),
}, },
) )
# Wenn die Linkliste zwar gelesen wurde, aber keine gültigen Zeilen verfügbar sind, geben wir spezifischere Infos zurück.
return pruef_ergebnis( return pruef_ergebnis(
ok=False, ok=False,
meldung="Keine gültigen Einträge in der Quelle gefunden", meldung="Keine validen Einträge in der Linkliste gefunden",
aktion="read_error", aktion="keine_validen_eintraege",
kontext={"source": source}, kontext={
"source": source,
"eintraege_gesamt": len(source_dict.get("rows", [])),
},
) )

View File

@@ -6,11 +6,11 @@ der Anforderungen 1-2.e (leerer Pfad, fehlende Datei, bestehende Datei).
""" """
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Optional, Literal
from sn_basis.functions.sys_wrapper import join_path, file_exists from sn_basis.functions.sys_wrapper import join_path, file_exists
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis, PruefAktion from sn_basis.modules.pruef_ergebnis import pruef_ergebnis, PruefAktion
DateiTyp = Literal["excel","datenbank","unbekannt"]
class Dateipruefer: class Dateipruefer:
""" """
@@ -74,10 +74,25 @@ class Dateipruefer:
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Hilfsfunktionen # Hilfsfunktionen
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def erkenne_dateityp(self, pfad: Path) -> DateiTyp:
"""
Erkennt den Dateityp anhand der Endung.
"""
suffix = pfad.suffix.lower()
if suffix == ".xlsx":
return "excel"
if suffix in (".gpkg", ".sqlite"):
return "datenbank"
return "unbekannt"
def _pfad(self, relativer_pfad: str) -> Path: def _pfad(self, relativer_pfad: str) -> Path:
"""Erzeugt OS-unabhängigen Pfad relativ zum Basisverzeichnis.""" """Erzeugt OS-unabhängigen Pfad relativ zum Basisverzeichnis."""
return join_path(self.basis_pfad, relativer_pfad) return join_path(self.basis_pfad, relativer_pfad)
def _ist_leer(self) -> bool: def _ist_leer(self) -> bool:
""" """
Prüft robust, ob Eingabe als „leer" zu behandeln ist. Prüft robust, ob Eingabe als „leer" zu behandeln ist.
@@ -134,6 +149,31 @@ class Dateipruefer:
# 2. Pfad normalisieren # 2. Pfad normalisieren
pfad = self._pfad(self.pfad.strip()) pfad = self._pfad(self.pfad.strip())
#Excel-dateien erkennen
dateityp = self.erkenne_dateityp(pfad)
if dateityp == "excel":
if not file_exists(pfad):
return pruef_ergebnis(
ok=False,
meldung=f"Excel-Datei '{self.pfad}' wurde nicht gefunden.",
aktion="datei_nicht_gefunden",
kontext=pfad,
)
return pruef_ergebnis(
ok=True,
meldung="Excel-Datei ist gültig.",
aktion="ok",
kontext=pfad,
)
if dateityp != "datenbank":
return pruef_ergebnis(
ok=False,
meldung=f"Der Pfad '{self.pfad}' ist kein unterstützter Dateityp.",
aktion="unbekannter_dateityp",
kontext=pfad,
)
# 🆕 2.c: Ungültiger GPKG-Pfad? # 🆕 2.c: Ungültiger GPKG-Pfad?
if not self.verfahrens_db_modus or not self._ist_gueltiger_gpkg_pfad(pfad): if not self.verfahrens_db_modus or not self._ist_gueltiger_gpkg_pfad(pfad):

View File

@@ -17,10 +17,11 @@ Designprinzipien
- Die Methode ist pdoc-kompatibel dokumentiert und bewusst einfach gehalten. - Die Methode ist pdoc-kompatibel dokumentiert und bewusst einfach gehalten.
""" """
from typing import Any, Dict, List, Mapping, Optional, Tuple from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple
from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse
import json import json
import time
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
from sn_basis.functions import qgiscore_wrapper as qgiscore from sn_basis.functions import qgiscore_wrapper as qgiscore
@@ -59,6 +60,7 @@ class Datenabruf:
verfahrensgebiet_layer: Any, verfahrensgebiet_layer: Any,
speicherort: str, speicherort: str,
pruef_ergebnisse: Optional[List[Any]] = None, pruef_ergebnisse: Optional[List[Any]] = None,
progress: Optional[Any] = None,
) -> Tuple[Dict[str, Any], List[Any]]: ) -> Tuple[Dict[str, Any], List[Any]]:
""" """
Ruft für alle Zeilen in ``result_dict["rows"]`` die Fachdaten ab und Ruft für alle Zeilen in ``result_dict["rows"]`` die Fachdaten ab und
@@ -82,6 +84,10 @@ class Datenabruf:
# 1) Räumliche Filtergeometrie bestimmen (BBox oder None) # 1) Räumliche Filtergeometrie bestimmen (BBox oder None)
bbox_geom = self._determine_spatial_filter(raumfilter, verfahrensgebiet_layer) bbox_geom = self._determine_spatial_filter(raumfilter, verfahrensgebiet_layer)
filter_crs_authid = None
if isinstance(bbox_geom, dict):
raw_crs = bbox_geom.get("crs_authid")
filter_crs_authid = str(raw_crs) if raw_crs else None
# Globale Logs über alle Dienste hinweg # Globale Logs über alle Dienste hinweg
log_geladen: Dict[str, int] = {} log_geladen: Dict[str, int] = {}
@@ -90,7 +96,20 @@ class Datenabruf:
log_ausserhalb: Dict[str, int] = {} log_ausserhalb: Dict[str, int] = {}
# 2) Über alle Zeilen iterieren # 2) Über alle Zeilen iterieren
for row in rows: total_rows = len(rows)
for idx, row in enumerate(rows, start=1):
if progress is not None:
progress.set_label(f"Datenabruf {idx}/{total_rows}")
if progress.is_canceled():
pe_cancel = pruef_ergebnis(
ok=False,
meldung="Datenabruf durch Benutzer abgebrochen",
aktion="abbruch",
kontext={"schritt": idx},
)
processed_results.append(self.pruefmanager.verarbeite(pe_cancel))
break
ident = row.get("ident") ident = row.get("ident")
link = row.get("Link") link = row.get("Link")
provider = row.get("Provider") provider = row.get("Provider")
@@ -115,7 +134,16 @@ class Datenabruf:
url = self._build_provider_url(link=link, provider=str(provider), bbox_geom=bbox_geom if use_bbox else None) url = self._build_provider_url(link=link, provider=str(provider), bbox_geom=bbox_geom if use_bbox else None)
# 2b) Fachdaten abrufen # 2b) Fachdaten abrufen
features, error_msg = self._fetch_features(url=url, provider=str(provider)) features, error_msg = self._fetch_features(
url=url,
provider=str(provider),
cancel_callback=(progress.is_canceled if progress is not None else None),
)
if progress is not None:
if hasattr(progress, "set_value"):
progress.set_value(idx)
# 2c) Logs und Aggregation # 2c) Logs und Aggregation
if error_msg: if error_msg:
@@ -207,7 +235,18 @@ class Datenabruf:
return None return None
if raumfilter == "Verfahrensgebiet": if raumfilter == "Verfahrensgebiet":
return qgiscore.get_layer_extent(verfahrensgebiet_layer) extent = qgiscore.get_layer_extent(verfahrensgebiet_layer)
if extent is None:
return None
crs_authid = None
try:
if hasattr(verfahrensgebiet_layer, "crs") and callable(getattr(verfahrensgebiet_layer, "crs")):
crs = verfahrensgebiet_layer.crs()
if crs is not None and hasattr(crs, "authid") and callable(getattr(crs, "authid")):
crs_authid = crs.authid()
except Exception:
crs_authid = None
return {"extent": extent, "crs_authid": crs_authid}
if raumfilter == "Pufferlayer": if raumfilter == "Pufferlayer":
buffer_layer = qgiscore.create_buffer_layer( buffer_layer = qgiscore.create_buffer_layer(
@@ -216,8 +255,18 @@ class Datenabruf:
layer_name="Verfahrensgebiet_Puffer_1km", layer_name="Verfahrensgebiet_Puffer_1km",
) )
if buffer_layer is not None: if buffer_layer is not None:
qgisui.add_layer_to_project(buffer_layer) extent = qgiscore.get_layer_extent(buffer_layer)
return qgiscore.get_layer_extent(buffer_layer) if extent is None:
return None
crs_authid = None
try:
if hasattr(buffer_layer, "crs") and callable(getattr(buffer_layer, "crs")):
crs = buffer_layer.crs()
if crs is not None and hasattr(crs, "authid") and callable(getattr(crs, "authid")):
crs_authid = crs.authid()
except Exception:
crs_authid = None
return {"extent": extent, "crs_authid": crs_authid}
return None return None
@@ -233,60 +282,130 @@ class Datenabruf:
Erwartet: provider ist gesetzt (z. B. "WFS", "REST", "OGR", "WMS"). Erwartet: provider ist gesetzt (z. B. "WFS", "REST", "OGR", "WMS").
""" """
provider_norm = (provider or "").upper() provider_norm = (provider or "").upper()
base_link = link or "" base_link = (link or "").strip()
if base_link.lower().startswith("url="):
base_link = base_link[4:].strip()
# WMS: niemals BBOX anhängen if provider_norm == "WFS" and base_link.count("?") > 1:
first, rest = base_link.split("?", 1)
base_link = f"{first}?{rest.replace('?', '&')}"
extent_obj = bbox_geom
crs_authid: Optional[str] = None
if isinstance(bbox_geom, dict):
extent_obj = bbox_geom.get("extent")
raw_crs = bbox_geom.get("crs_authid")
crs_authid = str(raw_crs) if raw_crs else None
# WMS: unverändert durchreichen
if provider_norm == "WMS": if provider_norm == "WMS":
return base_link return base_link
if bbox_geom is None: # Versuche bbox-String zu erzeugen (falls Raumfilter aktiv)
return base_link
# Versuche bbox-String zu erzeugen (nutzt qgiscore.extent_to_bbox_string wenn vorhanden)
bbox_str: Optional[str] = None bbox_str: Optional[str] = None
try: if extent_obj is not None:
extent_to_bbox = getattr(__import__("sn_basis.functions.qgiscore_wrapper", fromlist=["qgiscore_wrapper"]), "extent_to_bbox_string", None) try:
if callable(extent_to_bbox): extent_to_bbox = getattr(__import__("sn_basis.functions.qgiscore_wrapper", fromlist=["qgiscore_wrapper"]), "extent_to_bbox_string", None)
bbox_str = extent_to_bbox(bbox_geom) if callable(extent_to_bbox):
else: bbox_str = extent_to_bbox(extent_obj)
# Fallback: einfache xmin/ymin/xmax/ymax-Extraktion (duck-typing)
if hasattr(bbox_geom, "xmin") and callable(getattr(bbox_geom, "xmin")):
bbox_str = f"{bbox_geom.xmin()},{bbox_geom.ymin()},{bbox_geom.xmax()},{bbox_geom.ymax()}"
elif isinstance(bbox_geom, (tuple, list)) and len(bbox_geom) == 4:
bbox_str = f"{bbox_geom[0]},{bbox_geom[1]},{bbox_geom[2]},{bbox_geom[3]}"
else: else:
bbox_str = str(bbox_geom) # Fallback: einfache xmin/ymin/xmax/ymax-Extraktion (duck-typing)
except Exception: if hasattr(extent_obj, "xmin") and callable(getattr(extent_obj, "xmin")):
bbox_str = None bbox_str = f"{extent_obj.xmin()},{extent_obj.ymin()},{extent_obj.xmax()},{extent_obj.ymax()}"
elif isinstance(extent_obj, (tuple, list)) and len(extent_obj) == 4:
if not bbox_str: bbox_str = f"{extent_obj[0]},{extent_obj[1]},{extent_obj[2]},{extent_obj[3]}"
return base_link else:
bbox_str = str(extent_obj)
except Exception:
bbox_str = None
parsed = urlparse(base_link) parsed = urlparse(base_link)
query_params = dict(parse_qsl(parsed.query, keep_blank_values=True)) query_params = dict(parse_qsl(parsed.query, keep_blank_values=True))
if provider_norm == "WFS": if provider_norm == "WFS":
query_params.setdefault("BBOX", bbox_str) query_params.setdefault("service", "WFS")
query_params.setdefault("request", "GetFeature")
query_params.setdefault("outputFormat", "application/json")
if bbox_str:
query_params.setdefault("BBOX", bbox_str)
if crs_authid:
query_params.setdefault("SRSNAME", crs_authid)
new_query = urlencode(query_params, doseq=True) new_query = urlencode(query_params, doseq=True)
rebuilt = parsed._replace(query=new_query) rebuilt = parsed._replace(query=new_query)
return urlunparse(rebuilt) return urlunparse(rebuilt)
if provider_norm in ("REST", "ARCGIS", "ARCGISFEATURESERVER", "ARCGIS_FEATURESERVER"): if provider_norm in ("REST", "ARCGIS", "ARCGISFEATURESERVER", "ARCGIS_FEATURESERVER"):
query_params.setdefault("geometry", bbox_str) # ArcGIS FeatureServer erwartet i.d.R. den /query-Endpunkt
query_params.setdefault("geometryType", "esriGeometryEnvelope") rest_base = base_link.rstrip("/")
query_params.setdefault("spatialRel", "esriSpatialRelIntersects") if not rest_base.lower().endswith("/query"):
rest_base = f"{rest_base}/query"
parsed_rest = urlparse(rest_base)
query_params = dict(parse_qsl(parsed_rest.query, keep_blank_values=True))
query_params.setdefault("where", "1=1")
query_params.setdefault("outFields", "*")
query_params.setdefault("returnGeometry", "true")
query_params.setdefault("f", query_params.get("f", "json")) query_params.setdefault("f", query_params.get("f", "json"))
if bbox_str:
geometry_envelope = None
try:
if hasattr(extent_obj, "xmin") and callable(getattr(extent_obj, "xmin")):
geometry_envelope = {
"xmin": extent_obj.xmin(),
"ymin": extent_obj.ymin(),
"xmax": extent_obj.xmax(),
"ymax": extent_obj.ymax(),
}
elif isinstance(extent_obj, (tuple, list)) and len(extent_obj) == 4:
geometry_envelope = {
"xmin": extent_obj[0],
"ymin": extent_obj[1],
"xmax": extent_obj[2],
"ymax": extent_obj[3],
}
else:
parts = [p.strip() for p in str(bbox_str).split(",")]
if len(parts) == 4:
geometry_envelope = {
"xmin": float(parts[0]),
"ymin": float(parts[1]),
"xmax": float(parts[2]),
"ymax": float(parts[3]),
}
except Exception:
geometry_envelope = None
if geometry_envelope is not None:
query_params.setdefault("geometry", json.dumps(geometry_envelope))
else:
query_params.setdefault("geometry", bbox_str)
query_params.setdefault("geometryType", "esriGeometryEnvelope")
query_params.setdefault("spatialRel", "esriSpatialRelIntersects")
if crs_authid and ":" in crs_authid:
srid = crs_authid.split(":", 1)[1]
if srid.isdigit():
query_params.setdefault("inSR", srid)
query_params.setdefault("outSR", srid)
new_query = urlencode(query_params, doseq=True) new_query = urlencode(query_params, doseq=True)
rebuilt = parsed._replace(query=new_query) rebuilt = parsed_rest._replace(query=new_query)
return urlunparse(rebuilt) return urlunparse(rebuilt)
# Default: generischer bbox-Parameter # Default: generischer bbox-Parameter (nur wenn vorhanden)
query_params.setdefault("bbox", bbox_str) if bbox_str:
query_params.setdefault("bbox", bbox_str)
new_query = urlencode(query_params, doseq=True) new_query = urlencode(query_params, doseq=True)
rebuilt = parsed._replace(query=new_query) rebuilt = parsed._replace(query=new_query)
return urlunparse(rebuilt) return urlunparse(rebuilt)
def _fetch_features(self, url: str, provider: str) -> Tuple[List[Any], Optional[str]]: def _fetch_features(
self,
url: str,
provider: str,
cancel_callback: Optional[Callable[[], bool]] = None,
) -> Tuple[List[Any], Optional[str]]:
""" """
Führt den eigentlichen Abruf der Fachdaten durch. Führt den eigentlichen Abruf der Fachdaten durch.
@@ -336,34 +455,100 @@ class Datenabruf:
http_error: Optional[str] = None http_error: Optional[str] = None
# QGIS NetworkAccessManager bevorzugen # QGIS NetworkAccessManager bevorzugen
_FETCH_TIMEOUT_MS = 30_000 # 30 Sekunden
aborted_or_timed_out = False
attempted_qgis_fetch = False
if callable(cancel_callback) and cancel_callback():
return [], "Abbruch durch Benutzer"
if getattr(qgiscore, "QGIS_AVAILABLE", False) and getattr(qgiscore, "QgsNetworkAccessManager", None) is not None: if getattr(qgiscore, "QGIS_AVAILABLE", False) and getattr(qgiscore, "QgsNetworkAccessManager", None) is not None:
attempted_qgis_fetch = True
try: try:
manager = qgiscore.QgsNetworkAccessManager.instance() manager = qgiscore.QgsNetworkAccessManager.instance()
QUrl = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QUrl", None) # Netzwerk-Timeout global setzen (QGIS >= 3.6)
QNetworkRequest = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QNetworkRequest", None) if hasattr(manager, "setTimeout"):
QEventLoop = getattr(__import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"]), "QEventLoop", None) manager.setTimeout(_FETCH_TIMEOUT_MS)
_qt = __import__("sn_basis.functions.qt_wrapper", fromlist=["qt_wrapper"])
QUrl = getattr(_qt, "QUrl", None)
QNetworkRequest = getattr(_qt, "QNetworkRequest", None)
QEventLoop = getattr(_qt, "QEventLoop", None)
QTimer = getattr(_qt, "QTimer", None)
if QUrl is not None and QNetworkRequest is not None: if QUrl is not None and QNetworkRequest is not None:
req = QNetworkRequest(QUrl(url)) req = QNetworkRequest(QUrl(url))
reply = manager.get(req) reply = manager.get(req)
if QEventLoop is not None: if QEventLoop is not None:
loop = QEventLoop() loop = QEventLoop()
reply.finished.connect(loop.quit) reply.finished.connect(loop.quit)
loop.exec() _poll_timer = None
try: if QTimer is not None:
raw = reply.readAll() try:
data_bytes = bytes(raw) if hasattr(raw, "__bytes__") else raw _poll_timer = QTimer()
response_text = data_bytes.decode("utf-8", errors="replace") _poll_timer.setSingleShot(False)
except Exception: _poll_timer.timeout.connect(loop.quit)
_poll_timer.start(100)
except Exception:
_poll_timer = None
start_time = time.monotonic()
while True:
if callable(cancel_callback) and cancel_callback():
reply.abort()
http_error = "Abbruch durch Benutzer"
aborted_or_timed_out = True
break
elapsed_ms = int((time.monotonic() - start_time) * 1000)
if elapsed_ms >= _FETCH_TIMEOUT_MS:
reply.abort()
http_error = f"Timeout nach {_FETCH_TIMEOUT_MS // 1000} s: {url}"
aborted_or_timed_out = True
break
if hasattr(reply, "isFinished") and reply.isFinished():
break
loop.exec()
try:
if hasattr(qt, "QCoreApplication") and hasattr(qt.QCoreApplication, "processEvents"):
qt.QCoreApplication.processEvents()
except Exception:
pass
if _poll_timer is not None:
try:
_poll_timer.stop()
except Exception:
pass
if not aborted_or_timed_out:
# Fehler aus Reply auslesen
err_code = None
try:
err_code = reply.error()
except Exception:
pass
if err_code and int(err_code) != 0:
http_error = f"Netzwerkfehler ({err_code}): {reply.errorString()}"
if http_error:
# Timeout oder Netzwerkfehler keinen Body lesen
pass
else:
try: try:
response_text = reply.text() raw = reply.readAll()
data_bytes = bytes(raw) if hasattr(raw, "__bytes__") else raw
response_text = data_bytes.decode("utf-8", errors="replace")
except Exception: except Exception:
response_text = None try:
response_text = reply.text()
except Exception:
response_text = None
except Exception as exc: except Exception as exc:
http_error = f"QgsNetworkAccessManager error: {exc}" http_error = f"QgsNetworkAccessManager error: {exc}"
response_text = None response_text = None
# Fallback: requests # Fallback: requests nur wenn kein harter Abbruch/Timeout im QGIS-Request vorlag
if response_text is None: if response_text is None and (not attempted_qgis_fetch or not aborted_or_timed_out):
try: try:
import requests # lokal import, keine harte Abhängigkeit import requests # lokal import, keine harte Abhängigkeit
r = requests.get(url, timeout=30) r = requests.get(url, timeout=30)
@@ -383,6 +568,8 @@ class Datenabruf:
return parsed.get("features", []), None return parsed.get("features", []), None
if isinstance(parsed, dict) and "features" in parsed: if isinstance(parsed, dict) and "features" in parsed:
return parsed.get("features", []), None return parsed.get("features", []), None
if prov in ("REST", "ARCGIS", "ARCGISFEATURESERVER", "ARCGIS_FEATURESERVER", "WFS"):
return [], "Antwort enthält keine Feature-Liste"
# Sonst: gib das gesamte JSON als einzelnes Objekt zurück # Sonst: gib das gesamte JSON als einzelnes Objekt zurück
return [parsed], None return [parsed], None
except json.JSONDecodeError: except json.JSONDecodeError:

View File

@@ -30,9 +30,13 @@ from __future__ import annotations
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
import os import os
import json import json
import re
import datetime import datetime
import sqlite3
from sn_basis.functions import qgiscore_wrapper as qgiscore from sn_basis.functions import qgiscore_wrapper as qgiscore
from sn_basis.functions.os_wrapper import normalize_path, is_absolute_path
from sn_basis.functions.sys_wrapper import get_plugin_root, join_path, file_exists
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
@@ -53,10 +57,97 @@ class Datenschreiber:
def __init__(self, pruefmanager: Any, gpkg_path: Optional[str] = None) -> None: def __init__(self, pruefmanager: Any, gpkg_path: Optional[str] = None) -> None:
self.pruefmanager = pruefmanager self.pruefmanager = pruefmanager
self.gpkg_path = gpkg_path self.gpkg_path = str(gpkg_path) if gpkg_path else None
# ------------------------------------------------------------------ # def _resolve_style_path(self, style_path: Optional[str]) -> Optional[str]:
# Schreibe Daten if not style_path:
return None
style_path_str = str(style_path).strip()
if not style_path_str:
return None
if not is_absolute_path(style_path_str):
plugin_root = get_plugin_root()
style_path_str = str(join_path(plugin_root, "sn_plan41", "assets", style_path_str))
style_path_str = str(normalize_path(style_path_str))
return style_path_str if file_exists(style_path_str) else None
def _store_style_in_gpkg(self, layer_name: str, style_path: str, layer: Optional[Any] = None) -> None:
"""Stellt sicher, dass der Stil in der layer_styles-Tabelle der GPKG gespeichert wird."""
try:
with open(style_path, "r", encoding="utf-8") as fh:
style_qml = fh.read()
f_geometry_column = ''
if layer is not None:
try:
if hasattr(layer, 'geometryColumn'):
f_geometry_column = str(layer.geometryColumn())
elif hasattr(layer, 'dataProvider') and hasattr(layer.dataProvider(), 'geometryColumnName'):
f_geometry_column = str(layer.dataProvider().geometryColumnName())
except Exception:
f_geometry_column = ''
with sqlite3.connect(self.gpkg_path) as conn:
cur = conn.cursor()
cur.execute(
"""
CREATE TABLE IF NOT EXISTS layer_styles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
f_table_catalog TEXT,
f_table_schema TEXT,
f_table_name TEXT NOT NULL,
f_geometry_column TEXT,
styleName TEXT,
styleQML TEXT,
styleSLD TEXT,
useAsDefault BOOLEAN,
description TEXT,
owner TEXT,
ui TEXT,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP
)
"""
)
# Das aktuelle QGIS-Style-Verhalten: bestehenden Style für denselben Layer nicht löschen (nur appenden)
# Wir wollen aber Default-Style setzen: alte Default-Styles entfernen.
cur.execute(
"UPDATE layer_styles SET useAsDefault = 0 WHERE f_table_name = ?",
(layer_name,),
)
# Fülle die bekannten QGIS-Kolonnen
style_name = os.path.basename(style_path)
cur.execute(
"INSERT INTO layer_styles (f_table_catalog, f_table_schema, f_table_name, f_geometry_column, styleName, styleQML, styleSLD, useAsDefault, description, owner, ui) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
'',
'',
layer_name,
f_geometry_column,
style_name,
style_qml,
None,
1,
'',
'',
'',
),
)
conn.commit()
except Exception as exc:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Speichern des Layer-Stils in GPKG: {exc}",
aktion="style_gpkg_speichern_fehlgeschlagen",
kontext={"layer_name": layer_name, "style_path": style_path},
)
)
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
def schreibe_Daten( def schreibe_Daten(
self, self,
@@ -65,192 +156,93 @@ class Datenschreiber:
speicherort: str, speicherort: str,
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
""" """
Schreibt die abgerufenen Daten in die Zieldatenbank/Dateien. Schreibt die übergebenen Layer in die Ziel-GPKG.
Ablauf Erwartung:
------ - daten_dict["daten"] enthält Einträge der Form:
Für jede Zeile (ident) in ``daten_dict["daten"]``: ident -> {"layer": QgsVectorLayer}
1. Bestimme Ziel-Layername (z. B. Thema oder ident). - self.gpkg_path ist ein str
2. Prüfe, ob ein Layer mit diesem Namen bereits existiert (Wrapper).
3. Falls vorhanden, frage den Benutzer (Überschreiben / Anhängen / Abbrechen)
über die zentrale Pruefmanager-Methode `ask_overwrite_append_cancel`.
4. Führe die gewählte Operation aus oder schreibe den Layer, wenn er noch nicht existiert.
5. Schreibe ggf. den Stil in die GPKG und setze ihn als Vorgabe.
6. Sammle und gib eine Liste der angelegten/geänderten Layer zurück.
Returns
-------
List[Dict[str, Any]]
Liste von Dicts mit Informationen zu jedem angelegten/geänderten Layer.
""" """
if not speicherort: if not speicherort:
raise ValueError("Ein gültiger Speicherort (speicherort) muss übergeben werden.") raise ValueError("Ein gültiger Speicherort (speicherort) muss übergeben werden.")
# Setze gpkg_path falls noch nicht vorhanden # gpkg_path einmalig setzen / normalisieren
if not self.gpkg_path: if not self.gpkg_path:
self.gpkg_path = speicherort self.gpkg_path = str(speicherort)
results: List[Dict[str, Any]] = [] results: List[Dict[str, Any]] = []
daten_map: Dict[str, List[Any]] = daten_dict.get("daten", {}) daten_map: Dict[str, Any] = daten_dict.get("daten", {})
# Iteriere über alle Einträge for ident, entry in daten_map.items():
for ident, features in daten_map.items(): layer = None
# Thema/Name ableiten (falls vorhanden in processed_results oder ident) style_path = None
# -----------------------------
# Layer extrahieren
# -----------------------------
if isinstance(entry, dict):
layer = entry.get("layer")
style_path = self._resolve_style_path(entry.get("style_path"))
if layer is None or not hasattr(layer, "isValid") or not layer.isValid():
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Ungültiger Layer für {ident}",
aktion="save_exception",
kontext={"ident": ident},
)
self.pruefmanager.verarbeite(pe_err)
continue
# -----------------------------
# Layername bestimmen
# -----------------------------
thema = None thema = None
for pe in processed_results: for pe in processed_results:
try: try:
kontext = getattr(pe, "kontext", None) or {} kontext = getattr(pe, "kontext", None) or {}
if kontext and kontext.get("ident") == ident: if kontext.get("ident") == ident:
thema = kontext.get("thema") thema = kontext.get("thema")
break break
except Exception: except Exception:
continue continue
if not thema:
thema = str(ident)
layer_name = thema layer_name_raw = thema or str(ident)
layer_name = re.sub(r"[^A-Za-z0-9_]+", "_", layer_name_raw).strip("_")
if not layer_name:
layer_name = f"layer_{ident}"
# Prüfe, ob Layer bereits existiert in der Ziel-GPKG # Layer in GPKG schreiben
layer_exists = False err_msg = self._write_layer_to_gpkg(layer_name=layer_name, layer=layer)
try: if err_msg is not None:
layer_exists_fn = getattr(qgiscore, "layer_exists_in_gpkg", None) pe_err = pruef_ergebnis(
if callable(layer_exists_fn): ok=False,
layer_exists = layer_exists_fn(self.gpkg_path, layer_name) meldung=f"Fehler beim Schreiben des Layers {layer_name}: {err_msg}",
else: aktion="save_exception",
# Fallback: QGIS-Fallback-Check via QgsVectorLayer kontext={"ident": ident, "layer_name": layer_name},
if getattr(qgiscore, "QgsVectorLayer", None) is not None and qgiscore.QGIS_AVAILABLE: )
uri = f"{self.gpkg_path}|layername={layer_name}" self.pruefmanager.verarbeite(pe_err)
layer = qgiscore.QgsVectorLayer(uri, layer_name, "ogr") continue
layer_exists = bool(layer and getattr(layer, "isValid", lambda: False)())
except Exception:
layer_exists = False
operation = "created"
if layer_exists:
# Zentrale Nutzerabfrage über Pruefmanager
# Erwartet Rückgabe: "overwrite" | "append" | "cancel"
try:
user_choice = self.pruefmanager.ask_overwrite_append_cancel(layer_name)
except Exception:
# Fallback: overwrite, falls Pruefmanager nicht verfügbar
user_choice = "overwrite"
if user_choice == "cancel":
operation = "skipped"
results.append({
"ident": ident,
"thema": thema,
"operation": operation,
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
"feature_count": 0,
})
continue
if user_choice == "overwrite":
write_err = self._write_layer_to_gpkg(layer_name, features, mode="overwrite")
if write_err:
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Überschreiben von {layer_name}: {write_err}",
aktion="save_exception",
kontext={"ident": ident, "thema": thema, "error": write_err},
)
self.pruefmanager.verarbeite(pe_err)
operation = "skipped"
results.append({
"ident": ident,
"thema": thema,
"operation": operation,
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
"feature_count": 0,
})
continue
else:
operation = "overwritten"
elif user_choice == "append":
write_err = self._write_layer_to_gpkg(layer_name, features, mode="append")
if write_err:
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Anhängen an {layer_name}: {write_err}",
aktion="save_exception",
kontext={"ident": ident, "thema": thema, "error": write_err},
)
self.pruefmanager.verarbeite(pe_err)
operation = "skipped"
results.append({
"ident": ident,
"thema": thema,
"operation": operation,
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
"feature_count": 0,
})
continue
else:
operation = "appended"
else:
# Layer existiert nicht -> neu anlegen
write_err = self._write_layer_to_gpkg(layer_name, features, mode="create")
if write_err:
pe_err = pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Erstellen von {layer_name}: {write_err}",
aktion="save_exception",
kontext={"ident": ident, "thema": thema, "error": write_err},
)
self.pruefmanager.verarbeite(pe_err)
operation = "skipped"
results.append({
"ident": ident,
"thema": thema,
"operation": operation,
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
"feature_count": 0,
})
continue
else:
operation = "created"
# Stilbehandlung (falls in processed_results referenziert)
style_written = False
style_path = None
for pe in processed_results:
try:
kontext = getattr(pe, "kontext", None) or {}
if kontext and kontext.get("ident") == ident:
style_path = kontext.get("stildatei") or kontext.get("Stildatei")
break
except Exception:
continue
# Wenn der Stil vorhanden und valide ist, als Default in GPKG-Style-Tabelle ablegen
if style_path: if style_path:
if not os.path.isabs(style_path): self._store_style_in_gpkg(layer_name, style_path, layer)
base_dir = os.path.dirname(__file__)
style_path = os.path.join(base_dir, style_path)
write_style_fn = getattr(qgiscore, "write_style_to_gpkg", None)
if callable(write_style_fn):
try:
write_style_fn(self.gpkg_path, style_path, layer_name)
style_written = True
except Exception:
style_written = False
feature_count = len(features) if isinstance(features, list) else 0
# Erfolgsfall: Info für lade_Layer sammeln
layer_path = f"{self.gpkg_path}|layername={layer_name}"
results.append({ results.append({
"layer_path": layer_path,
"thema": layer_name,
"ident": ident, "ident": ident,
"thema": thema, "style_path": style_path,
"operation": operation,
"layer_path": f"{self.gpkg_path}|layername={layer_name}",
"feature_count": feature_count,
"style_written": style_written,
}) })
return results return results
# -----------------------------
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
# Lade Layer ins Projekt # Lade Layer ins Projekt
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
@@ -288,18 +280,33 @@ class Datenschreiber:
self.pruefmanager.verarbeite(pe_err) self.pruefmanager.verarbeite(pe_err)
continue continue
try: style_path = info.get("style_path")
apply_style_fn = getattr(qgiscore, "apply_default_style_from_gpkg", None) resolved_style_path = self._resolve_style_path(style_path)
if callable(apply_style_fn): if resolved_style_path:
apply_style_fn(self.gpkg_path, layer) try:
except Exception: layer.loadNamedStyle(resolved_style_path)
pe_warn = pruef_ergebnis( layer.triggerRepaint()
ok=True, except Exception as exc:
meldung=f"Style konnte für {thema} nicht automatisch angewendet werden", pe_warn = pruef_ergebnis(
aktion="stil_not_implemented", ok=True,
kontext={"thema": thema}, meldung=f"Style konnte für {thema} nicht geladen werden: {exc}",
) aktion="stil_laden_fehlgeschlagen",
self.pruefmanager.verarbeite(pe_warn) kontext={"thema": thema, "style_path": resolved_style_path},
)
self.pruefmanager.verarbeite(pe_warn)
else:
try:
apply_style_fn = getattr(qgiscore, "apply_default_style_from_gpkg", None)
if callable(apply_style_fn):
apply_style_fn(self.gpkg_path, layer)
except Exception:
pe_warn = pruef_ergebnis(
ok=True,
meldung=f"Style konnte für {thema} nicht automatisch angewendet werden",
aktion="stil_not_implemented",
kontext={"thema": thema},
)
self.pruefmanager.verarbeite(pe_warn)
try: try:
# qgisui wrapper wird hier nicht direkt für die Abfrage verwendet; # qgisui wrapper wird hier nicht direkt für die Abfrage verwendet;
@@ -374,62 +381,67 @@ class Datenschreiber:
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
# Hilfsfunktionen intern # Hilfsfunktionen intern
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
def _write_layer_to_gpkg(self, layer_name: str, features: List[Any], mode: str = "create") -> Optional[str]: def _write_layer_to_gpkg(
self,
layer_name: str,
layer: Any,
) -> Optional[str]:
""" """
Interne Hilfsfunktion zum Schreiben eines Layers in das GPKG. Schreibt einen QgsVectorLayer in die Ziel-GPKG.
Erwartete qgiscore-Funktion: Voraussetzungen:
qgiscore.write_features_to_gpkg(gpkg_path, layer_name, features, mode) - self.gpkg_path ist ein str
- layer ist ein gültiger QgsVectorLayer
""" """
write_fn = getattr(qgiscore, "write_features_to_gpkg", None)
if callable(write_fn):
try:
write_fn(self.gpkg_path, layer_name, features, mode)
return None
except Exception as exc:
return str(exc)
# Fallback: Verwende QgsVectorFileWriter, falls QGIS verfügbar if layer is None or not hasattr(layer, "isValid") or not layer.isValid():
if getattr(qgiscore, "QGIS_AVAILABLE", False) and getattr(qgiscore, "QgsVectorFileWriter", None) is not None: return "Ungültiger Layer zum Schreiben übergeben"
try:
# Minimaler Fallback: erwarte, dass 'features' eine Liste von QgsFeature ist
if not features:
# Erstelle leeren Layer-Eintrag (GPKG erlaubt leere Layer)
# Hier vereinfachen wir: writeAsVectorFormatV3 benötigt ein Layer-Objekt.
return None
# Versuche, ein Memory-Layer aus dem ersten Feature zu ermitteln try:
first = features[0] opts = qgiscore.QgsVectorFileWriter.SaveVectorOptions()
mem_layer = None opts.driverName = "GPKG"
if hasattr(first, "fields") and hasattr(first, "geometry"): opts.layerName = layer_name
# Wenn Features QgsFeature sind, versuchen wir, das zugehörige Layer zu nutzen opts.fileEncoding = "UTF-8"
try:
mem_layer = first.layer() if hasattr(first, "layer") else None
except Exception:
mem_layer = None
if mem_layer is None: # Style in der GPKG speichern, wenn möglich
return "Keine Feld-/Geometrie-Informationen zum Schreiben vorhanden" if hasattr(opts, "symbologyExport"):
try:
# QGIS: SymbologyExport-Wert z.B. QgsVectorFileWriter.SaveVectorOptions.Symbology
saveOpts = qgiscore.QgsVectorFileWriter.SaveVectorOptions
sym_val = getattr(saveOpts, "Symbology", None)
if sym_val is None:
sym_val = getattr(saveOpts, "SymbologyExport", None)
if sym_val is not None:
opts.symbologyExport = sym_val
except Exception:
pass
opts = qgiscore.QgsVectorFileWriter.SaveVectorOptions() # Datei existiert → Layer überschreiben
opts.driverName = "GPKG" # Datei existiert nicht → neue GPKG anlegen
opts.layerName = layer_name if not os.path.exists(self.gpkg_path):
opts.fileEncoding = "UTF-8" opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteFile
if mode == "overwrite": else:
opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteFile opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteLayer
else:
opts.actionOnExistingFile = qgiscore.QgsVectorFileWriter.CreateOrOverwriteLayer
err = qgiscore.QgsVectorFileWriter.writeAsVectorFormatV3( err = qgiscore.QgsVectorFileWriter.writeAsVectorFormatV3(
mem_layer, layer,
self.gpkg_path, self.gpkg_path,
qgiscore.QgsProject.instance().transformContext(), qgiscore.QgsProject.instance().transformContext(),
opts opts,
) )
if err != qgiscore.QgsVectorFileWriter.NoError:
return f"Fehler beim Schreiben (Code {err})"
return None
except Exception as exc:
return str(exc)
return "Keine Schreib-Funktion verfügbar (Wrapper nicht implementiert)" # QGIS ≥3 liefert ein Tupel: (error_code, error_message, new_filename, new_layer_name)
if isinstance(err, tuple):
error_code = err[0]
error_msg = err[1] if len(err) > 1 else ""
else:
error_code = err
error_msg = ""
if error_code != qgiscore.QgsVectorFileWriter.NoError:
return f"Fehler beim Schreiben (Code {error_code}, msg='{error_msg}')"
return None
except Exception as exc:
return str(exc)

395
modules/LayerLoader.py Normal file
View File

@@ -0,0 +1,395 @@
"""sn_basis/modules/LayerLoader.py
Kapselt Layer-Erstellung, Raumfilter und Stil-Logik.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
import time
from sn_basis.functions.os_wrapper import normalize_path, is_absolute_path
from sn_basis.functions.qgiscore_wrapper import (
QgsVectorLayer,
QgsRasterLayer,
QgsFeatureRequest,
QgsProject,
QgsNetworkAccessManager,
QgsCoordinateTransform,
)
from sn_basis.functions.sys_wrapper import get_plugin_root, join_path, file_exists
from sn_basis.modules.stilpruefer import Stilpruefer
from sn_basis.modules.layerpruefer import Layerpruefer
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
from sn_basis.functions import qt_wrapper as qt
class LayerLoader:
"""Lädt und filtert Layer aus Dienst-/Datenquellen."""
def __init__(
self,
pruefmanager: Any,
stil_pruefer: Optional[Stilpruefer] = None,
layer_pruefer: Optional[Layerpruefer] = None,
) -> None:
self.pruefmanager = pruefmanager
self.stil_pruefer = stil_pruefer or Stilpruefer()
self.layer_pruefer = layer_pruefer or Layerpruefer()
_LAYER_TIMEOUT_MS = 30_000 # 30 Sekunden
def _was_canceled(self, cancel_callback: Optional[Any]) -> bool:
if not callable(cancel_callback):
return False
try:
return bool(cancel_callback())
except Exception:
return False
def _process_events(self) -> None:
try:
if hasattr(qt, "QCoreApplication") and hasattr(qt.QCoreApplication, "processEvents"):
qt.QCoreApplication.processEvents()
except Exception:
pass
def _transform_geometry_to_layer_crs(self, geometry: Any, source_layer: Any, target_layer: Any) -> Any:
if geometry is None or source_layer is None or target_layer is None:
return geometry
if QgsCoordinateTransform is None or QgsProject is None:
return geometry
try:
source_crs = source_layer.crs() if hasattr(source_layer, "crs") else None
target_crs = target_layer.crs() if hasattr(target_layer, "crs") else None
if source_crs is None or target_crs is None:
return geometry
source_authid = source_crs.authid() if hasattr(source_crs, "authid") else None
target_authid = target_crs.authid() if hasattr(target_crs, "authid") else None
if source_authid and target_authid and source_authid == target_authid:
return geometry
ct = QgsCoordinateTransform(source_crs, target_crs, QgsProject.instance())
if hasattr(geometry, "clone") and callable(getattr(geometry, "clone")):
geom_copy = geometry.clone()
else:
geom_copy = geometry
geom_copy.transform(ct)
return geom_copy
except Exception:
return geometry
def _transform_extent_to_layer_crs(self, extent: Any, source_layer: Any, target_layer: Any) -> Any:
if extent is None or source_layer is None or target_layer is None:
return extent
if QgsCoordinateTransform is None or QgsProject is None:
return extent
try:
source_crs = source_layer.crs() if hasattr(source_layer, "crs") else None
target_crs = target_layer.crs() if hasattr(target_layer, "crs") else None
if source_crs is None or target_crs is None:
return extent
source_authid = source_crs.authid() if hasattr(source_crs, "authid") else None
target_authid = target_crs.authid() if hasattr(target_crs, "authid") else None
if source_authid and target_authid and source_authid == target_authid:
return extent
ct = QgsCoordinateTransform(source_crs, target_crs, QgsProject.instance())
if hasattr(ct, "transformBoundingBox"):
return ct.transformBoundingBox(extent)
return extent
except Exception:
return extent
def create_layer(self, provider: str, link: str, thema: str) -> Optional[QgsVectorLayer]:
provider_lower = provider.lower() if provider else ""
layer = None
# Netzwerk-Timeout für alle netzwerkbasierten Provider setzen
if provider_lower in ("wfs", "wms", "rest"):
try:
nam = QgsNetworkAccessManager.instance()
if hasattr(nam, "setTimeout"):
nam.setTimeout(self._LAYER_TIMEOUT_MS)
except Exception:
pass
try:
if provider_lower == "wfs":
uri = link if link.strip().lower().startswith("url=") else f"url={link}"
layer = QgsVectorLayer(uri, thema, "WFS")
elif provider_lower == "wms":
uri = link if link.strip().lower().startswith("url=") else f"url={link}"
layer = QgsRasterLayer(uri, thema, "wms")
elif provider_lower in ("ogr", "gpkg", "shp", "geojson"):
layer = QgsVectorLayer(link, thema, "ogr")
elif provider_lower == "rest":
rest_link = link.strip()
if rest_link.lower().endswith("/featureserver"):
rest_link = rest_link.rstrip("/") + "/0"
uri = rest_link if rest_link.lower().startswith("url=") else f"url={rest_link}"
layer = QgsVectorLayer(uri, thema, "arcgisfeatureserver")
else:
layer = QgsVectorLayer(link, thema, "ogr")
except Exception as exc:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Erstellen des Layers {thema}: {exc}",
aktion="layer_nicht_verfuegbar",
kontext={"provider": provider, "link": link},
)
)
return None
if not layer or not layer.isValid():
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Layer {thema} (Provider={provider}) konnte nicht geladen werden."
,aktion="layer_nicht_verfuegbar",
kontext={"provider": provider, "link": link},
)
)
return None
return layer
def apply_style(self, layer: QgsVectorLayer, style_path: Optional[str]) -> None:
if not style_path or layer is None or not layer.isValid():
return
if not style_path.strip():
return
if not is_absolute_path(style_path):
plugin_root = get_plugin_root()
style_path = str(join_path(plugin_root, "sn_plan41", "assets", style_path))
# normalize path for consistency
style_path = str(normalize_path(style_path))
# Debug: welche Stil-Datei wird geprüft?
print(f"[LayerLoader] Überprüfe Stildatei: '{style_path}'")
if file_exists(style_path):
try:
layer.loadNamedStyle(style_path)
layer.triggerRepaint()
except Exception as exc:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Stil-Laden für {layer.name()}: {exc}",
aktion="stil_laden_fehlgeschlagen",
kontext={"thema": layer.name(), "style_path": style_path},
)
)
else:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=True,
meldung=f"Stildatei nicht gefunden (optional): {style_path}",
aktion="stil_nicht_gefunden",
kontext={"thema": layer.name(), "style_path": style_path},
)
)
def filter_by_extent(self, layer: QgsVectorLayer, extent, cancel_callback: Optional[Any] = None, source_layer: Optional[Any] = None) -> Optional[QgsVectorLayer]:
"""Beschneidet <layer> auf die rechteckige Ausdehnung <extent>.
Diese Methode verwendet einen einfachen BBOX-Filter. Für komplexere
Raumeinschränkungen (z.B. Verfahrensgebiet) sollte stattdessen
:meth:`filter_by_layer` verwendet werden, da dort echte Geometrie-Tests
stattfinden.
"""
if not layer or not layer.isValid() or extent is None:
return layer
if layer.type() != QgsVectorLayer.VectorLayer:
return layer
extent_for_layer = self._transform_extent_to_layer_crs(extent, source_layer, layer)
request = QgsFeatureRequest().setFilterRect(extent_for_layer)
if hasattr(request, "setTimeout"):
try:
request.setTimeout(self._LAYER_TIMEOUT_MS)
except Exception:
pass
start = time.monotonic()
features: List[Any] = []
try:
for feat in layer.getFeatures(request):
if self._was_canceled(cancel_callback):
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Abbruch beim Raumfilter (BBOX) für {layer.name()}",
aktion="needs_user_action",
kontext={"thema": layer.name()},
)
)
return None
elapsed_ms = int((time.monotonic() - start) * 1000)
if elapsed_ms >= self._LAYER_TIMEOUT_MS:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Timeout beim Raumfilter (BBOX) für {layer.name()} nach {self._LAYER_TIMEOUT_MS // 1000}s",
aktion="url_nicht_erreichbar",
kontext={"thema": layer.name(), "timeout_s": self._LAYER_TIMEOUT_MS // 1000},
)
)
return None
features.append(feat)
if len(features) % 100 == 0:
self._process_events()
except Exception as exc:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Lesen der Features für {layer.name()}: {exc}",
aktion="layer_nicht_verfuegbar",
kontext={"thema": layer.name()},
)
)
return None
if not features:
return None
geom_type_map = {0: "Point", 1: "LineString", 2: "Polygon"}
geom_type = geom_type_map.get(layer.geometryType(), "Polygon")
uri = f"{geom_type}?crs={layer.crs().authid()}"
filtered_layer = QgsVectorLayer(uri, f"{layer.name()}_bbox", "memory")
if not filtered_layer or not filtered_layer.isValid():
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Erzeugen des Filter-Layers für {layer.name()}",
aktion="filterlayer_nicht_erzeugt",
kontext={"thema": layer.name()},
)
)
return None
provider = filtered_layer.dataProvider()
provider.addAttributes(layer.fields())
filtered_layer.updateFields()
provider.addFeatures(features)
filtered_layer.updateExtents()
return filtered_layer
def filter_by_layer(self, layer: QgsVectorLayer, filter_layer: QgsVectorLayer, cancel_callback: Optional[Any] = None) -> Optional[QgsVectorLayer]:
"""Beschneidet <layer> auf die tatsächliche Geometrie des
<filter_layer>.
Diese Methode wird z.B. für das Verfahrensgebiet verwendet, damit nicht
die gesamte Bounding-Box, sondern nur die echten Flächen als Raumfilter
gelten. Wenn der Filter-Layer mehrere Features enthält, werden deren
Geometrien zu einem Multi-Geom vereinigt.
"""
if not layer or not layer.isValid() or not filter_layer or not filter_layer.isValid():
return layer
if layer.type() != QgsVectorLayer.VectorLayer:
return layer
# vereinigte Geometrie aller Features im Filter-Layer
union_geom = None
for f in filter_layer.getFeatures():
try:
geom = self._transform_geometry_to_layer_crs(f.geometry(), filter_layer, layer)
if union_geom is None:
union_geom = geom
else:
union_geom = union_geom.combine(geom)
except Exception:
# bei einem Fehler einfach weiterfahren
continue
if union_geom is None or union_geom.isEmpty():
return None
# nun alle Features aus <layer> nehmen, deren Geometrie sich schneidet
filtered = []
request = QgsFeatureRequest().setFilterRect(union_geom.boundingBox())
if hasattr(request, "setTimeout"):
try:
request.setTimeout(self._LAYER_TIMEOUT_MS)
except Exception:
pass
start = time.monotonic()
for f in layer.getFeatures(request):
if self._was_canceled(cancel_callback):
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Abbruch beim Raumfilter (Geometrie) für {layer.name()}",
aktion="needs_user_action",
kontext={"thema": layer.name()},
)
)
return None
elapsed_ms = int((time.monotonic() - start) * 1000)
if elapsed_ms >= self._LAYER_TIMEOUT_MS:
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Timeout beim Raumfilter (Geometrie) für {layer.name()} nach {self._LAYER_TIMEOUT_MS // 1000}s",
aktion="url_nicht_erreichbar",
kontext={"thema": layer.name(), "timeout_s": self._LAYER_TIMEOUT_MS // 1000},
)
)
return None
try:
if f.geometry() and f.geometry().intersects(union_geom):
filtered.append(f)
except Exception:
continue
if len(filtered) % 100 == 0:
self._process_events()
if not filtered:
return None
geom_type_map = {0: "Point", 1: "LineString", 2: "Polygon"}
geom_type = geom_type_map.get(layer.geometryType(), "Polygon")
uri = f"{geom_type}?crs={layer.crs().authid()}"
filtered_layer = QgsVectorLayer(uri, f"{layer.name()}_filtered", "memory")
if not filtered_layer or not filtered_layer.isValid():
self.pruefmanager.verarbeite(
pruef_ergebnis(
ok=False,
meldung=f"Fehler beim Erzeugen des Filter-Layers für {layer.name()}",
aktion="filterlayer_nicht_erzeugt",
kontext={"thema": layer.name()},
)
)
return None
provider = filtered_layer.dataProvider()
provider.addAttributes(layer.fields())
filtered_layer.updateFields()
provider.addFeatures(filtered)
filtered_layer.updateExtents()
return filtered_layer
def add_to_project(self, layer: QgsVectorLayer) -> None:
if layer and layer.isValid():
QgsProject.instance().addMapLayer(layer)

View File

@@ -5,7 +5,7 @@ sn_basis/modules/Pruefmanager.py
from __future__ import annotations from __future__ import annotations
from typing import Optional, Any from typing import Optional, Any
from sn_basis.functions import ask_yes_no, info, warning, error from sn_basis.functions import ask_yes_no, info, warning, error, ask_overwrite_append_cancel_custom
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis, PruefAktion from sn_basis.modules.pruef_ergebnis import pruef_ergebnis, PruefAktion
print("DEBUG: Pruefmanager DATEI GELADEN:", __file__) print("DEBUG: Pruefmanager DATEI GELADEN:", __file__)
@@ -56,10 +56,51 @@ class Pruefmanager:
) )
info("DataGrabber Zusammenfassung", message) info("DataGrabber Zusammenfassung", message)
# ------------------------------------------------------------------
# Allgemeine Nutzerinteraktionen
# ------------------------------------------------------------------
def zeige_hinweis(self, titel: str, meldung: str) -> None:
"""Zeigt eine modale Hinweismeldung mit OK-Button."""
from sn_basis.functions.dialog_wrapper import show_info_dialog
show_info_dialog(titel, meldung, parent=self.parent)
def frage_ja_nein(self, titel: str, meldung: str, default: bool = True) -> bool:
"""Stellt eine Ja/Nein-Frage. Gibt True zurück, wenn der Nutzer Ja wählt."""
if self.ui_modus != "qgis":
return default
return ask_yes_no(titel, meldung, default=default, parent=self.parent)
def frage_text(self, titel: str, meldung: str, default_text: str = "") -> tuple[str, bool]:
"""Fragt einen Textwert ab und gibt Text + OK-Status zurück."""
from sn_basis.functions.dialog_wrapper import ask_text
if self.ui_modus != "qgis":
return default_text, True
return ask_text(titel, meldung, default_text=default_text, parent=self.parent)
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# VERFAHRENS-DB-spezifische Entscheidungen # VERFAHRENS-DB-spezifische Entscheidungen
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def _handle_datei_existiert(self, ergebnis: pruef_ergebnis) -> pruef_ergebnis: def _handle_datei_existiert(self, ergebnis: pruef_ergebnis) -> pruef_ergebnis:
"""Handhabt das Szenario, dass die Ziel-Verfahrens-DB bereits existiert.
Zeigt einen einzigen Dialog mit drei Optionen an:
- **Überschreiben**: Bestehende Layer ersetzen (entspricht YES)
- **Anhängen**: Neue Layer zur Datei hinzufügen (entspricht NO)
- **Abbrechen**: Vorgang beenden (entspricht CANCEL)
Parameters
----------
ergebnis : pruef_ergebnis
Eingabe-Ergebnis mit Dateipfad im ``kontext``-Attribut.
Returns
-------
pruef_ergebnis
Ergebnis mit Aktion:
- ``datei_existiert_ueberschreiben``
- ``datei_existiert_anhaengen``
- ``datei_existiert_ueberspringen`` (für Cancel-Fall)
"""
if self.ui_modus != "qgis": if self.ui_modus != "qgis":
return ergebnis return ergebnis
@@ -72,48 +113,34 @@ class Pruefmanager:
"Was soll geschehen?\n\n" "Was soll geschehen?\n\n"
"• **Überschreiben**: Bestehende Layer ersetzen\n" "• **Überschreiben**: Bestehende Layer ersetzen\n"
"• **Anhängen**: Neue Layer hinzufügen\n" "• **Anhängen**: Neue Layer hinzufügen\n"
"• **Überspringen**: Nur temporäre Layer erzeugen" "• **Abbrechen**: Vorgang beenden"
) )
# Vereinfacht: Erst Überschreiben? → Dann Anhängen? → Überspringen # Einzelner Dialog mit drei Optionen
if ask_yes_no( entscheidung = ask_overwrite_append_cancel_custom(
titel, parent=self.parent,
f"{meldung}\n\n**Überschreiben** (alle Layer ersetzen)?", title=titel,
default=False, message=meldung
parent=self.parent )
):
if entscheidung == "overwrite":
return pruef_ergebnis( return pruef_ergebnis(
ok=True, ok=True,
aktion="datei_existiert_ueberschreiben", aktion="datei_existiert_ueberschreiben",
kontext=ergebnis.kontext, kontext=ergebnis.kontext,
) )
elif entscheidung == "append":
if ask_yes_no(
titel,
f"{meldung}\n\n**Anhängen** (neue Layer hinzufügen)?",
default=False,
parent=self.parent
):
return pruef_ergebnis( return pruef_ergebnis(
ok=True, ok=True,
aktion="datei_existiert_anhaengen", aktion="datei_existiert_anhaengen",
kontext=ergebnis.kontext, kontext=ergebnis.kontext,
) )
else: # cancel
if ask_yes_no(
titel,
f"{meldung}\n\n**Überspringen** (nur temporäre Layer)?",
default=True,
parent=self.parent
):
return pruef_ergebnis( return pruef_ergebnis(
ok=True, ok=True,
aktion="datei_existiert_ueberspringen", aktion="datei_existiert_ueberspringen",
kontext=ergebnis.kontext, kontext=ergebnis.kontext,
) )
return ergebnis
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Basis-Entscheidungen (KORREKT: → pruef_ergebnis) # Basis-Entscheidungen (KORREKT: → pruef_ergebnis)
# ------------------------------------------------------------------ # ------------------------------------------------------------------
@@ -210,3 +237,26 @@ class Pruefmanager:
) )
print("🔥 verarbeite() ENDE mit ok=False") print("🔥 verarbeite() ENDE mit ok=False")
return ergebnis return ergebnis
def _ask_use_or_replace_pufferlayer(self) -> str:
"""
Fragt den Nutzer, ob ein vorhandener Pufferlayer verwendet
oder ersetzt werden soll.
Returns
-------
str
"verwenden", "ersetzen" oder "abbrechen"
"""
ergebnis = pruef_ergebnis(
ok=False,
aktion="layer_existiert",
meldung="Ein Pufferlayer ist bereits vorhanden.",
)
ergebnis = self.pruefmanager.verarbeite(ergebnis)
if not ergebnis.ok:
return "abbrechen"
return "verwenden" if ergebnis.aktion == "ok" else "ersetzen"

View File

@@ -61,7 +61,8 @@ class Linkpruefer:
aktion="leer", aktion="leer",
kontext=None, kontext=None,
) )
#evtl. Pfad-Objekte in string umwandeln
eingabe = str(eingabe)
# ----------------------------------------------------- # -----------------------------------------------------
# 1. Fall: URL # 1. Fall: URL
# ----------------------------------------------------- # -----------------------------------------------------

View File

@@ -43,6 +43,13 @@ PruefAktion = Literal[
# Dateiendung/Format # Dateiendung/Format
"falsche_endung", "falsche_endung",
"pflichtfelder_fehlen", "pflichtfelder_fehlen",
"unbekannter_dateityp",
"Datenbank",
"dienst",
"excel",
"unbekannte_quelle",
# Excel/Import # Excel/Import
"kein_header", "kein_header",
@@ -51,6 +58,8 @@ PruefAktion = Literal[
"open_error", "open_error",
"datenabruf", "datenabruf",
# 🆕 VERFAHRENS-DB SPEZIFISCH (deine Anforderungen 2.d, 2.e) # 🆕 VERFAHRENS-DB SPEZIFISCH (deine Anforderungen 2.d, 2.e)
"datei_wird_erzeugt", # 2.d: Pfad gültig, Datei fehlt → weiter "datei_wird_erzeugt", # 2.d: Pfad gültig, Datei fehlt → weiter
"datei_existiert", # Datei vorhanden → Layer-Entscheidung "datei_existiert", # Datei vorhanden → Layer-Entscheidung

View File

@@ -4,9 +4,10 @@ Prüft ausschließlich, ob ein Stilpfad gültig ist.
Die Anwendung erfolgt später über eine Aktion. Die Anwendung erfolgt später über eine Aktion.
""" """
from pathlib import Path import os
from sn_basis.functions import file_exists from sn_basis.functions.os_wrapper import is_absolute_path
from sn_basis.functions.sys_wrapper import get_plugin_root, file_exists, join_path
from sn_basis.modules.pruef_ergebnis import pruef_ergebnis from sn_basis.modules.pruef_ergebnis import pruef_ergebnis
@@ -40,7 +41,11 @@ class Stilpruefer:
kontext=None, kontext=None,
) )
pfad = Path(stil_pfad) pfad = str(stil_pfad)
if not is_absolute_path(pfad):
plugin_root = get_plugin_root()
pfad = str(join_path(plugin_root, "sn_plan41", "assets", pfad))
# ----------------------------------------------------- # -----------------------------------------------------
# 2. Datei existiert nicht # 2. Datei existiert nicht
@@ -56,7 +61,7 @@ class Stilpruefer:
# ----------------------------------------------------- # -----------------------------------------------------
# 3. Falsche Endung # 3. Falsche Endung
# ----------------------------------------------------- # -----------------------------------------------------
if pfad.suffix.lower() != ".qml": if os.path.splitext(pfad)[1].lower() != ".qml":
return pruef_ergebnis( return pruef_ergebnis(
ok=False, ok=False,
meldung="Die Stil-Datei muss die Endung '.qml' haben.", meldung="Die Stil-Datei muss die Endung '.qml' haben.",