* fixes race condition on geometry conflict calculation if performed in background process * simplifies access to smaller buffered geometry * adds mapping of "qm"->"m2" for UnitChoice in API usage for backwards compatibility
163 lines
5.4 KiB
Python
163 lines
5.4 KiB
Python
"""
|
|
Author: Michel Peltriaux
|
|
Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany
|
|
Contact: ksp-servicestelle@sgdnord.rlp.de
|
|
Created on: 15.08.22
|
|
|
|
"""
|
|
import json
|
|
|
|
from django.contrib.gis import gdal
|
|
from django.contrib.gis.forms import MultiPolygonField
|
|
from django.contrib.gis.geos import MultiPolygon, Polygon
|
|
from django.contrib.gis.geos.prototypes.io import WKTWriter
|
|
from django.utils.translation import gettext_lazy as _
|
|
|
|
from konova.forms.base_form import BaseForm
|
|
from konova.models import Geometry
|
|
from konova.tasks import celery_update_parcels, celery_check_for_geometry_conflicts
|
|
from konova.sub_settings.lanis_settings import DEFAULT_SRID_RLP
|
|
from user.models import UserActionLogEntry
|
|
|
|
|
|
class SimpleGeomForm(BaseForm):
|
|
""" A geometry form for rendering geometry read-only using a widget
|
|
|
|
"""
|
|
read_only = True
|
|
geom = MultiPolygonField(
|
|
srid=DEFAULT_SRID_RLP,
|
|
label=_("Geometry"),
|
|
help_text=_(""),
|
|
label_suffix="",
|
|
required=False,
|
|
disabled=False,
|
|
)
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self.read_only = kwargs.pop("read_only", True)
|
|
super().__init__(*args, **kwargs)
|
|
|
|
# Initialize geometry
|
|
try:
|
|
geom = self.instance.geometry.geom
|
|
self.empty = geom.empty
|
|
|
|
if self.empty:
|
|
raise AttributeError
|
|
|
|
geojson = self.instance.geometry.as_feature_collection(srid=DEFAULT_SRID_RLP)
|
|
geom = json.dumps(geojson)
|
|
except AttributeError:
|
|
# If no geometry exists for this form, we simply set the value to None and zoom to the maximum level
|
|
geom = ""
|
|
self.empty = True
|
|
|
|
self.initialize_form_field("geom", geom)
|
|
|
|
def is_valid(self):
|
|
super().is_valid()
|
|
is_valid = True
|
|
|
|
# Get geojson from form
|
|
geom = self.data["geom"]
|
|
if geom is None or len(geom) == 0:
|
|
# empty geometry is a valid geometry
|
|
return is_valid
|
|
geom = json.loads(geom)
|
|
|
|
# Write submitted data back into form field to make sure invalid geometry
|
|
# will be rendered again on failed submit
|
|
self.initialize_form_field("geom", self.data["geom"])
|
|
|
|
# Read geojson into gdal geometry
|
|
# HINT: This can be simplified if the geojson format holds data in epsg:4326 (GDAL provides direct creation for
|
|
# this case)
|
|
features = []
|
|
features_json = geom.get("features", [])
|
|
accepted_ogr_types = [
|
|
"Polygon",
|
|
"Polygon25D",
|
|
"MultiPolygon",
|
|
"MultiPolygon25D",
|
|
]
|
|
for feature in features_json:
|
|
feature_geom = feature.get("geometry", feature)
|
|
if feature_geom is None:
|
|
# Fallback for rare cases where a feature does not contain any geometry
|
|
continue
|
|
|
|
feature_geom = json.dumps(feature_geom)
|
|
g = gdal.OGRGeometry(feature_geom, srs=DEFAULT_SRID_RLP)
|
|
|
|
flatten_geometry = g.coord_dim > 2
|
|
if flatten_geometry:
|
|
g = self.__flatten_geom_to_2D(g)
|
|
|
|
if g.geom_type not in accepted_ogr_types:
|
|
self.add_error("geom", _("Only surfaces allowed. Points or lines must be buffered."))
|
|
is_valid = False
|
|
return is_valid
|
|
|
|
polygon = Polygon.from_ewkt(g.ewkt)
|
|
is_valid = polygon.valid
|
|
if not is_valid:
|
|
self.add_error("geom", polygon.valid_reason)
|
|
return is_valid
|
|
|
|
features.append(polygon)
|
|
form_geom = MultiPolygon(srid=DEFAULT_SRID_RLP)
|
|
for feature in features:
|
|
form_geom = form_geom.union(feature)
|
|
|
|
# Make sure to convert into a MultiPolygon. Relevant if a single Polygon is provided.
|
|
if form_geom.geom_type != "MultiPolygon":
|
|
form_geom = MultiPolygon(form_geom, srid=DEFAULT_SRID_RLP)
|
|
|
|
# Write unioned Multipolygon into cleaned data
|
|
if self.cleaned_data is None:
|
|
self.cleaned_data = {}
|
|
self.cleaned_data["geom"] = form_geom.ewkt
|
|
|
|
return is_valid
|
|
|
|
def save(self, action: UserActionLogEntry):
|
|
""" Saves the form's geometry
|
|
|
|
Creates a new geometry entry if none is set, yet
|
|
|
|
Args:
|
|
action ():
|
|
|
|
Returns:
|
|
|
|
"""
|
|
try:
|
|
if self.instance is None or self.instance.geometry is None:
|
|
raise LookupError
|
|
geometry = self.instance.geometry
|
|
geometry.geom = self.cleaned_data.get("geom", MultiPolygon(srid=DEFAULT_SRID_RLP))
|
|
geometry.modified = action
|
|
|
|
geometry.save()
|
|
except LookupError:
|
|
# No geometry or linked instance holding a geometry exist --> create a new one!
|
|
geometry = Geometry.objects.create(
|
|
geom=self.cleaned_data.get("geom", MultiPolygon(srid=DEFAULT_SRID_RLP)),
|
|
created=action,
|
|
)
|
|
# Start parcel update and geometry conflict checking procedure in a background process
|
|
celery_update_parcels.delay(geometry.id)
|
|
celery_check_for_geometry_conflicts.delay(geometry.id)
|
|
return geometry
|
|
|
|
def __flatten_geom_to_2D(self, geom):
|
|
"""
|
|
Enforces a given OGRGeometry from higher dimensions into 2D
|
|
|
|
"""
|
|
wkt_w = WKTWriter(dim=2)
|
|
g_wkt = wkt_w.write(geom.geos).decode("utf-8")
|
|
geom = gdal.OGRGeometry(g_wkt)
|
|
return geom
|