243 lines
7.9 KiB
Python
243 lines
7.9 KiB
Python
"""
Author: Michel Peltriaux
Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany
Contact: ksp-servicestelle@sgdnord.rlp.de
Created on: 15.08.22

"""
|
|
import json
|
|
|
|
from django.contrib.gis import gdal
|
|
from django.contrib.gis.geos import MultiPolygon, Polygon
|
|
from django.contrib.gis.geos.prototypes.io import WKTWriter
|
|
from django.utils.translation import gettext_lazy as _
|
|
from django.forms import JSONField
|
|
|
|
from konova.forms.base_form import BaseForm
|
|
from konova.models import Geometry
|
|
from konova.settings import GEOM_MAX_VERTICES
|
|
from konova.tasks import celery_update_parcels, celery_check_for_geometry_conflicts
|
|
from konova.sub_settings.lanis_settings import DEFAULT_SRID_RLP
|
|
from user.models import UserActionLogEntry
|
|
|
|
|
|
class SimpleGeomForm(BaseForm):
    """ A geometry form for rendering (and optionally editing) a geometry using a map widget

    The geometry is exchanged with the client as a GeoJSON string in the ``output`` field.
    ``is_valid`` parses the submitted GeoJSON, checks every feature and stores the
    unioned result as EWKT in ``cleaned_data["output"]``. ``save`` persists that EWKT on a
    ``Geometry`` record and triggers the background tasks.

    """
    # Whether the map client should treat the geometry as non-editable
    read_only = True
    # Set to True by save() if the stored geometry had to be simplified
    geometry_simplified = False
    output = JSONField(
        label=_("Geometry"),
        help_text=_(""),
        label_suffix="",
        required=False,
        disabled=False,
    )

    # OGR geometry type names which are accepted as surfaces.
    # Must stay a sequence (not a set): membership is tested by comparing the
    # geometry's geom_type against these strings using ==, not by hashing.
    _ACCEPTED_OGR_TYPES = (
        "Polygon",
        "Polygon25D",
        "MultiPolygon",
        "MultiPolygon25D",
    )

    def __init__(self, *args, **kwargs):
        """ Initializes the form and pre-fills the output field

        Args:
            *args: Passed on to the parent form
            **kwargs: Passed on to the parent form. An optional ``read_only`` (default True)
                is removed before and stored on the form.
        """
        self.read_only = kwargs.pop("read_only", True)
        super().__init__(*args, **kwargs)

        # Default: no geometry available --> the field is initialized with an empty string
        geom = ""
        self.empty = True
        try:
            geometry = self.instance.geometry
            if not geometry.geom.empty:
                geojson = geometry.as_feature_collection(srid=DEFAULT_SRID_RLP)
                geom = json.dumps(geojson)
                self.empty = False
        except AttributeError:
            # Some link of the chain instance -> geometry -> geom is missing.
            # This is handled like an empty geometry, so the defaults from above are kept.
            pass

        self.initialize_form_field("output", geom)

    def is_valid(self) -> bool:
        """ Validates the submitted GeoJSON and stores the resulting geometry as EWKT

        An empty submission is valid and results in an empty MultiPolygon. Otherwise every
        feature must be a (multi-)polygon, must have an area > 1m² and must be a valid
        geometry. All accepted features are unioned into one MultiPolygon, which is written
        as EWKT into ``cleaned_data["output"]``.

        Returns:
            is_valid (bool): Whether the submitted geometry passed all checks
        """
        # NOTE(review): The result of the parent validation is ignored on purpose of the
        # original implementation - only the geometry checks below decide on validity.
        super().is_valid()

        # Make sure cleaned_data exists on every path below, since all of them access it
        if getattr(self, "cleaned_data", None) is None:
            self.cleaned_data = {}

        # Get geojson from form
        raw_geojson = self.data.get("output")
        if not raw_geojson:
            # empty geometry is a valid geometry
            self.cleaned_data["output"] = MultiPolygon(srid=DEFAULT_SRID_RLP).ewkt
            return True

        invalid_geojson_msg = _("Invalid geometry: The submitted data is not valid GeoJSON.")

        try:
            geojson = json.loads(raw_geojson)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError
            geojson = None
        if not isinstance(geojson, dict):
            # Malformed data must result in a form error instead of an unhandled exception.
            # Do not add a second message if the field validation already reported an error.
            if not self.has_error("output"):
                self.add_error("output", invalid_geojson_msg)
            return False

        # Write submitted data back into form field to make sure invalid geometry
        # will be rendered again on failed submit
        self.initialize_form_field("output", raw_geojson)

        is_valid = True
        polygons = []
        for feature in geojson.get("features", []):
            feature_geom = feature.get("geometry", feature)
            if feature_geom is None:
                # Fallback for rare cases where a feature does not contain any geometry
                continue

            try:
                ogr_geom = gdal.OGRGeometry(json.dumps(feature_geom), srs=DEFAULT_SRID_RLP)
            except gdal.GDALException:
                self.add_error("output", invalid_geojson_msg)
                return False

            if ogr_geom.coord_dim > 2:
                ogr_geom = self.__flatten_geom_to_2D(ogr_geom)

            if ogr_geom.geom_type not in self._ACCEPTED_OGR_TYPES:
                self.add_error("output", _("Only surfaces allowed. Points or lines must be buffered."))
                return False

            # A too small area invalidates the form, but the remaining features are still
            # checked, so that every problem is reported
            if not self.__is_area_valid(ogr_geom):
                is_valid = False

            geos_geom = Polygon.from_ewkt(ogr_geom.ewkt)
            if not geos_geom.valid:
                self.add_error("output", geos_geom.valid_reason)
                return False

            if isinstance(geos_geom, Polygon):
                polygons.append(geos_geom)
            elif isinstance(geos_geom, MultiPolygon):
                polygons.extend(geos_geom)

        # Unionize all geometry features into one new MultiPolygon. Without any feature
        # the result is a proper empty MultiPolygon.
        if polygons:
            form_geom = MultiPolygon(*polygons, srid=DEFAULT_SRID_RLP).unary_union
        else:
            form_geom = MultiPolygon(srid=DEFAULT_SRID_RLP)

        # Make sure to convert into a MultiPolygon. Relevant if a single Polygon is provided.
        form_geom = Geometry.cast_to_multipolygon(form_geom)

        # Write unioned Multipolygon into cleaned data
        self.cleaned_data["output"] = form_geom.ewkt

        return is_valid

    def __is_vertices_num_valid(self) -> bool:
        """ Checks whether the number of vertices in the geometry is not too high

        Reads the geometry from ``cleaned_data["output"]``. A missing or empty value is
        treated as valid, since there are no vertices to count.

        Returns:
            is_valid (bool): True if the number of vertices is <= GEOM_MAX_VERTICES
        """
        geom = self.cleaned_data.get("output")
        if not geom:
            return True
        ogr_geom = gdal.OGRGeometry(geom, srs=DEFAULT_SRID_RLP)
        return ogr_geom.num_coords <= GEOM_MAX_VERTICES

    def __is_area_valid(self, geom: gdal.OGRGeometry) -> bool:
        """ Checks whether the area is at least > 1m²

        Adds an error to the output field if the area is too small.

        Args:
            geom (OGRGeometry): The geometry to be checked

        Returns:
            is_area_valid (bool): True if the area is greater than 1
        """
        area = geom.area
        is_area_valid = area > 1  # > 1m² (SRID:25832)

        if not is_area_valid:
            self.add_error(
                "output",
                _("Geometry must be greater than 1m². Currently is {}m²").format(
                    float(area)
                )
            )

        return is_area_valid

    def __simplify_geometry(self, geom, max_vert: int):
        """ Simplifies a geometry

        Geometry will be simplified until a threshold of max vertices has been reached.
        Each iteration simplifies the result of the previous one using a tolerance which
        starts at 0.1 and grows by 0.1 per iteration.

        Args:
            geom (MultiPolygon): The geometry
            max_vert (int): Threshold of maximum vertices in geometry

        Returns:
            geom (MultiPolygon): The simplified geometry
        """
        tolerance = 0.1
        n = geom.num_coords
        while n > max_vert:
            geom = geom.simplify(tolerance)
            n = geom.num_coords
            tolerance += 0.1
        return geom

    def save(self, action: UserActionLogEntry):
        """ Saves the form's geometry

        Updates the geometry of the form's instance or creates a new geometry entry if
        none is set, yet. If the geometry holds too many vertices, it is simplified and
        saved again. Afterwards the parcel update and the geometry conflict check are
        started as background tasks.

        Args:
            action (UserActionLogEntry): The action which is stored as modified/created entry

        Returns:
            geometry (Geometry): The saved geometry entry
        """
        geom_value = self.cleaned_data.get("output", MultiPolygon(srid=DEFAULT_SRID_RLP))

        instance = self.instance
        geometry = instance.geometry if instance is not None else None
        if geometry is None:
            # No geometry or linked instance holding a geometry exist --> create a new one!
            geometry = Geometry.objects.create(
                geom=geom_value,
                created=action,
            )
        else:
            geometry.geom = geom_value
            geometry.modified = action
            geometry.save()

        if not self.__is_vertices_num_valid():
            geometry.geom = self.__simplify_geometry(geometry.geom, max_vert=GEOM_MAX_VERTICES)
            geometry.save()
            self.geometry_simplified = True

        # Start parcel update and geometry conflict checking procedure in a background process
        celery_update_parcels.delay(geometry.id)
        celery_check_for_geometry_conflicts.delay(geometry.id)
        return geometry

    def __flatten_geom_to_2D(self, geom):
        """ Enforces a given OGRGeometry from higher dimensions into 2D

        The geometry is written as two dimensional WKT and parsed again. The spatial
        reference of the input is passed on, so that the result carries the same SRS.

        Args:
            geom (OGRGeometry): The geometry with more than two coordinate dimensions

        Returns:
            geom (OGRGeometry): The two dimensional geometry
        """
        wkt_w = WKTWriter(dim=2)
        g_wkt = wkt_w.write(geom.geos).decode("utf-8")
        return gdal.OGRGeometry(g_wkt, srs=geom.srs)

    def _set_properties(self, geojson: dict, title: str):
        """ Toggles the editable property of the geojson for proper handling in map client

        Features without a ``properties`` member (or with a null one) get a new
        properties dict. The given geojson is altered in place.

        Args:
            geojson (dict): The GeoJson
            title (str): Written as title property of each feature, if not empty

        Returns:
            geojson (dict): The altered GeoJson
        """
        editable = not self.read_only
        for feature in geojson.get("features", []):
            properties = feature.get("properties")
            if properties is None:
                properties = {}
                feature["properties"] = properties
            properties["editable"] = editable
            if title:
                properties["title"] = title
        return geojson
|