From 7acd11096b17429f8edf4b7229b9a0db7a86bbb1 Mon Sep 17 00:00:00 2001 From: mpeltriaux Date: Thu, 4 Aug 2022 15:25:55 +0200 Subject: [PATCH 1/3] #189 Parcel calculation mutex * adds cache based mutex (valid e.g. for celery use cases) * drops atomic parcel bulk creation in favor of proper mutex implementation --- konova/models/geometry.py | 85 +++++++++++++++----------- konova/sub_settings/django_settings.py | 11 ++++ konova/utils/mutex.py | 35 +++++++++++ requirements.txt | 3 +- 4 files changed, 99 insertions(+), 35 deletions(-) create mode 100644 konova/utils/mutex.py diff --git a/konova/models/geometry.py b/konova/models/geometry.py index e55082ac..43c94368 100644 --- a/konova/models/geometry.py +++ b/konova/models/geometry.py @@ -6,14 +6,17 @@ Created on: 15.11.21 """ import json +from time import sleep from django.contrib.gis.db.models import MultiPolygonField from django.contrib.gis.geos import Polygon -from django.db import models, transaction +from django.core.exceptions import MultipleObjectsReturned +from django.db import models from django.utils import timezone from konova.models import BaseResource, UuidModel from konova.sub_settings.lanis_settings import DEFAULT_SRID_RLP +from konova.utils.mutex import cache_lock from konova.utils.wfs.spatial import ParcelWFSFetcher @@ -99,7 +102,6 @@ class Geometry(BaseResource): objs += set_objs return objs - @transaction.atomic def update_parcels(self): """ Updates underlying parcel information @@ -122,38 +124,53 @@ class Geometry(BaseResource): # which needs to be deleted and just keep the numerical values ## THIS CAN BE REMOVED IN THE FUTURE, WHEN 'Flur' WON'T OCCUR ANYMORE! flr_val = parcel_properties["flur"].replace("Flur ", "") - district = District.objects.get_or_create( - key=parcel_properties["kreisschl"], - name=parcel_properties["kreis"], - )[0] - municipal = Municipal.objects.get_or_create( - key=parcel_properties["gmdschl"], - name=parcel_properties["gemeinde"], - district=district, - )[0] - parcel_group = ParcelGroup.objects.get_or_create( - key=parcel_properties["gemaschl"], - name=parcel_properties["gemarkung"], - municipal=municipal, - )[0] - flrstck_nnr = parcel_properties['flstnrnen'] - if not flrstck_nnr: - flrstck_nnr = None - flrstck_zhlr = parcel_properties['flstnrzae'] - if not flrstck_zhlr: - flrstck_zhlr = None - parcel_obj = Parcel.objects.get_or_create( - district=district, - municipal=municipal, - parcel_group=parcel_group, - flr=flr_val, - flrstck_nnr=flrstck_nnr, - flrstck_zhlr=flrstck_zhlr, - )[0] - parcel_obj.district = district - parcel_obj.updated_on = _now - parcel_obj.save() - underlying_parcels.append(parcel_obj) + + # Run possible race-condition snippet mutexed + # Use Flurstückkennzeichen as identifier to prevent the same calculation runs parallel, leading to + # a race condition + flr_id = parcel_properties['flstkennz'] + lock_id = f"parcel_calc-lock-{flr_id}" + with cache_lock(lock_id) as acquired: + while not acquired: + print(f"Am locked. Need to rest. Calculating: {parcel_properties}") + sleep(0.5) + acquired = cache_lock(lock_id) + district = District.objects.get_or_create( + key=parcel_properties["kreisschl"], + name=parcel_properties["kreis"], + )[0] + municipal = Municipal.objects.get_or_create( + key=parcel_properties["gmdschl"], + name=parcel_properties["gemeinde"], + district=district, + )[0] + parcel_group = ParcelGroup.objects.get_or_create( + key=parcel_properties["gemaschl"], + name=parcel_properties["gemarkung"], + municipal=municipal, + )[0] + flrstck_nnr = parcel_properties['flstnrnen'] + if not flrstck_nnr: + flrstck_nnr = None + flrstck_zhlr = parcel_properties['flstnrzae'] + if not flrstck_zhlr: + flrstck_zhlr = None + + try: + parcel_obj = Parcel.objects.get_or_create( + district=district, + municipal=municipal, + parcel_group=parcel_group, + flr=flr_val, + flrstck_nnr=flrstck_nnr, + flrstck_zhlr=flrstck_zhlr, + )[0] + except MultipleObjectsReturned as e: + raise MultipleObjectsReturned(f"{e}: {flr_id}") + parcel_obj.district = district + parcel_obj.updated_on = _now + parcel_obj.save() + underlying_parcels.append(parcel_obj) # Update the linked parcels self.parcels.clear() diff --git a/konova/sub_settings/django_settings.py b/konova/sub_settings/django_settings.py index 746df289..d738bfbc 100644 --- a/konova/sub_settings/django_settings.py +++ b/konova/sub_settings/django_settings.py @@ -132,6 +132,17 @@ DATABASES = { } } +CACHES = { + "default": { + "BACKEND": "django_redis.cache.RedisCache", + "LOCATION": "redis://localhost:6379/1", + "OPTIONS": { + "CLIENT_CLASS": "django_redis.client.DefaultClient" + }, + "KEY_PREFIX": "konova_cache" + } +} + # Password validation # https://docs.djangoproject.com/en/3.1/ref/settings/#auth-password-validators diff --git a/konova/utils/mutex.py b/konova/utils/mutex.py new file mode 100644 index 00000000..8d5360c5 --- /dev/null +++ b/konova/utils/mutex.py @@ -0,0 +1,35 @@ +from contextlib import contextmanager + +from django.core.cache import cache +import time + +LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes + + +@contextmanager +def cache_lock(lock_id): + """ Cache based lock e.g. for parallel celery processing + + Derived from https://docs.celeryq.dev/en/latest/tutorials/task-cookbook.html + + Use it like + ... + with cache_lock(lock_id, self.app.oid) as acquired: + if acquired: + do_mutexed_stuff() + ... + + """ + timeout_at = time.monotonic() + LOCK_EXPIRE - 3 + # cache.add fails if the key already exists + status = cache.add(lock_id, "LOCKED", LOCK_EXPIRE) + try: + yield status + finally: + # advantage of using add() for atomic locking + if time.monotonic() < timeout_at and status: + # don't release the lock if we exceeded the timeout + # to lessen the chance of releasing an expired lock + # owned by someone else + # also don't release the lock if we didn't acquire it + cache.delete(lock_id) diff --git a/requirements.txt b/requirements.txt index 79a479ed..8101fa0c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,6 +18,7 @@ django-bootstrap4==3.0.1 django-debug-toolbar==3.1.1 django-filter==2.4.0 django-fontawesome-5==1.0.18 +django-redis==5.2.0 django-simple-sso==1.1.0 django-tables2==2.3.4 et-xmlfile==1.1.0 @@ -48,4 +49,4 @@ wcwidth==0.2.5 webservices==0.7 wrapt==1.13.3 xmltodict==0.12.0 -zipp==3.4.1 \ No newline at end of file +zipp==3.4.1 From a8ffcf856d207c36544ec162eb17cf1863b603ec Mon Sep 17 00:00:00 2001 From: mpeltriaux Date: Fri, 5 Aug 2022 11:31:52 +0200 Subject: [PATCH 2/3] #189 Minor fix * adds try-catch if WFS result is not a valid json --- konova/models/geometry.py | 2 +- konova/utils/wfs/spatial.py | 35 ++++++++++++++++++++++++++++------- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/konova/models/geometry.py b/konova/models/geometry.py index 43c94368..081757c1 100644 --- a/konova/models/geometry.py +++ b/konova/models/geometry.py @@ -132,9 +132,9 @@ class Geometry(BaseResource): lock_id = f"parcel_calc-lock-{flr_id}" with cache_lock(lock_id) as acquired: while not acquired: - print(f"Am locked. Need to rest. Calculating: {parcel_properties}") sleep(0.5) acquired = cache_lock(lock_id) + district = District.objects.get_or_create( key=parcel_properties["kreisschl"], name=parcel_properties["kreis"], diff --git a/konova/utils/wfs/spatial.py b/konova/utils/wfs/spatial.py index e6cae847..1c7367f0 100644 --- a/konova/utils/wfs/spatial.py +++ b/konova/utils/wfs/spatial.py @@ -86,13 +86,14 @@ class ParcelWFSFetcher(AbstractWFSFetcher): from konova.models import Geometry if filter_srid is None: filter_srid = DEFAULT_SRID_RLP - geom_gml = Geometry.objects.filter( + geom = Geometry.objects.filter( id=self.geometry_id ).annotate( transformed=Transform(srid=filter_srid, expression="geom") ).annotate( gml=AsGML(MakeValid('transformed')) - ).first().gml + ).first() + geom_gml = geom.gml spatial_filter = f"<{geometry_operation}>{self.geometry_property_name}{geom_gml}" return spatial_filter @@ -163,7 +164,28 @@ class ParcelWFSFetcher(AbstractWFSFetcher): if rerun_on_exception: # Wait a second before another try sleep(1) - self.get_features( + return self.get_features( + typenames, + spatial_operator, + filter_srid, + start_index, + rerun_on_exception=False + ) + else: + e.msg += content + raise e + + try: + fetched_features = content.get( + "features", + {}, + ) + except AttributeError as e: + # Might occur if returned content is no json but something different (maybe an error message) + if rerun_on_exception: + # Wait a second before another try + sleep(1) + return self.get_features( typenames, spatial_operator, filter_srid, @@ -173,10 +195,6 @@ class ParcelWFSFetcher(AbstractWFSFetcher): else: e.msg += content raise e - fetched_features = content.get( - "features", - {}, - ) found_features += fetched_features @@ -187,4 +205,7 @@ class ParcelWFSFetcher(AbstractWFSFetcher): # If a 'full' response returned, there might be more to fetch. Increase the start_index! start_index += self.count + if not found_features: + print(f"No features found. Fetched content: {content}\nUsed POST body: {post_body}") + return found_features From fbf613f16eadc0ca8714cdeb5769012e846a5676 Mon Sep 17 00:00:00 2001 From: mpeltriaux Date: Mon, 8 Aug 2022 12:13:10 +0200 Subject: [PATCH 3/3] #189 EcoAccount deferred parcel calculation * adds celery parcel calculation to EcoAccount migration logic --- .../management/commands/kspMigrater/eco_account_migrater.py | 2 ++ user/admin.py | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/konova/management/commands/kspMigrater/eco_account_migrater.py b/konova/management/commands/kspMigrater/eco_account_migrater.py index fa815880..95cac2e8 100644 --- a/konova/management/commands/kspMigrater/eco_account_migrater.py +++ b/konova/management/commands/kspMigrater/eco_account_migrater.py @@ -14,6 +14,7 @@ from intervention.models import Responsibility, Handler, Intervention, Legal from konova.management.commands.kspMigrater.compensation_migrater import CompensationMigrater from konova.models import Geometry from konova.sub_settings.lanis_settings import DEFAULT_SRID_RLP +from konova.tasks import celery_update_parcels from user.models import UserActionLogEntry @@ -144,6 +145,7 @@ class EcoAccountMigrater(CompensationMigrater): instance.deductable_surface = area instance.geometry.geom = db_result_geom if not db_result_geom.empty else None instance.geometry.save() + celery_update_parcels.delay(instance.geometry.id) except TypeError: raise TypeError(f"{identifier}, {db_result_geom}") except GDALException as e: diff --git a/user/admin.py b/user/admin.py index bf5f5f86..da2a6695 100644 --- a/user/admin.py +++ b/user/admin.py @@ -1,7 +1,7 @@ from django.contrib import admin from konova.admin import DeletableObjectMixinAdmin -from user.models import User, Team +from user.models import User, Team, UserActionLogEntry class UserNotificationAdmin(admin.ModelAdmin): @@ -93,4 +93,4 @@ admin.site.register(Team, TeamAdmin) # Outcommented for a cleaner admin backend on production #admin.site.register(UserNotification, UserNotificationAdmin) -#admin.site.register(UserActionLogEntry, UserActionLogEntryAdmin) +admin.site.register(UserActionLogEntry, UserActionLogEntryAdmin)