Merge pull request '189_Parcel_calculation_race_condition' (#193) from 189_Parcel_calculation_race_condition into 132_Old_entries
Reviewed-on: SGD-Nord/konova#193
This commit is contained in:
commit
5e0144f761
@ -14,6 +14,7 @@ from intervention.models import Responsibility, Handler, Intervention, Legal
|
||||
from konova.management.commands.kspMigrater.compensation_migrater import CompensationMigrater
|
||||
from konova.models import Geometry
|
||||
from konova.sub_settings.lanis_settings import DEFAULT_SRID_RLP
|
||||
from konova.tasks import celery_update_parcels
|
||||
from user.models import UserActionLogEntry
|
||||
|
||||
|
||||
@ -144,6 +145,7 @@ class EcoAccountMigrater(CompensationMigrater):
|
||||
instance.deductable_surface = area
|
||||
instance.geometry.geom = db_result_geom if not db_result_geom.empty else None
|
||||
instance.geometry.save()
|
||||
celery_update_parcels.delay(instance.geometry.id)
|
||||
except TypeError:
|
||||
raise TypeError(f"{identifier}, {db_result_geom}")
|
||||
except GDALException as e:
|
||||
|
@ -6,14 +6,17 @@ Created on: 15.11.21
|
||||
|
||||
"""
|
||||
import json
|
||||
from time import sleep
|
||||
|
||||
from django.contrib.gis.db.models import MultiPolygonField
|
||||
from django.contrib.gis.geos import Polygon
|
||||
from django.db import models, transaction
|
||||
from django.core.exceptions import MultipleObjectsReturned
|
||||
from django.db import models
|
||||
from django.utils import timezone
|
||||
|
||||
from konova.models import BaseResource, UuidModel
|
||||
from konova.sub_settings.lanis_settings import DEFAULT_SRID_RLP
|
||||
from konova.utils.mutex import cache_lock
|
||||
from konova.utils.wfs.spatial import ParcelWFSFetcher
|
||||
|
||||
|
||||
@ -99,7 +102,6 @@ class Geometry(BaseResource):
|
||||
objs += set_objs
|
||||
return objs
|
||||
|
||||
@transaction.atomic
|
||||
def update_parcels(self):
|
||||
""" Updates underlying parcel information
|
||||
|
||||
@ -122,38 +124,53 @@ class Geometry(BaseResource):
|
||||
# which needs to be deleted and just keep the numerical values
|
||||
## THIS CAN BE REMOVED IN THE FUTURE, WHEN 'Flur' WON'T OCCUR ANYMORE!
|
||||
flr_val = parcel_properties["flur"].replace("Flur ", "")
|
||||
district = District.objects.get_or_create(
|
||||
key=parcel_properties["kreisschl"],
|
||||
name=parcel_properties["kreis"],
|
||||
)[0]
|
||||
municipal = Municipal.objects.get_or_create(
|
||||
key=parcel_properties["gmdschl"],
|
||||
name=parcel_properties["gemeinde"],
|
||||
district=district,
|
||||
)[0]
|
||||
parcel_group = ParcelGroup.objects.get_or_create(
|
||||
key=parcel_properties["gemaschl"],
|
||||
name=parcel_properties["gemarkung"],
|
||||
municipal=municipal,
|
||||
)[0]
|
||||
flrstck_nnr = parcel_properties['flstnrnen']
|
||||
if not flrstck_nnr:
|
||||
flrstck_nnr = None
|
||||
flrstck_zhlr = parcel_properties['flstnrzae']
|
||||
if not flrstck_zhlr:
|
||||
flrstck_zhlr = None
|
||||
parcel_obj = Parcel.objects.get_or_create(
|
||||
district=district,
|
||||
municipal=municipal,
|
||||
parcel_group=parcel_group,
|
||||
flr=flr_val,
|
||||
flrstck_nnr=flrstck_nnr,
|
||||
flrstck_zhlr=flrstck_zhlr,
|
||||
)[0]
|
||||
parcel_obj.district = district
|
||||
parcel_obj.updated_on = _now
|
||||
parcel_obj.save()
|
||||
underlying_parcels.append(parcel_obj)
|
||||
|
||||
# Run possible race-condition snippet mutexed
|
||||
# Use Flurstückkennzeichen as identifier to prevent the same calculation runs parallel, leading to
|
||||
# a race condition
|
||||
flr_id = parcel_properties['flstkennz']
|
||||
lock_id = f"parcel_calc-lock-{flr_id}"
|
||||
with cache_lock(lock_id) as acquired:
|
||||
while not acquired:
|
||||
sleep(0.5)
|
||||
acquired = cache_lock(lock_id)
|
||||
|
||||
district = District.objects.get_or_create(
|
||||
key=parcel_properties["kreisschl"],
|
||||
name=parcel_properties["kreis"],
|
||||
)[0]
|
||||
municipal = Municipal.objects.get_or_create(
|
||||
key=parcel_properties["gmdschl"],
|
||||
name=parcel_properties["gemeinde"],
|
||||
district=district,
|
||||
)[0]
|
||||
parcel_group = ParcelGroup.objects.get_or_create(
|
||||
key=parcel_properties["gemaschl"],
|
||||
name=parcel_properties["gemarkung"],
|
||||
municipal=municipal,
|
||||
)[0]
|
||||
flrstck_nnr = parcel_properties['flstnrnen']
|
||||
if not flrstck_nnr:
|
||||
flrstck_nnr = None
|
||||
flrstck_zhlr = parcel_properties['flstnrzae']
|
||||
if not flrstck_zhlr:
|
||||
flrstck_zhlr = None
|
||||
|
||||
try:
|
||||
parcel_obj = Parcel.objects.get_or_create(
|
||||
district=district,
|
||||
municipal=municipal,
|
||||
parcel_group=parcel_group,
|
||||
flr=flr_val,
|
||||
flrstck_nnr=flrstck_nnr,
|
||||
flrstck_zhlr=flrstck_zhlr,
|
||||
)[0]
|
||||
except MultipleObjectsReturned as e:
|
||||
raise MultipleObjectsReturned(f"{e}: {flr_id}")
|
||||
parcel_obj.district = district
|
||||
parcel_obj.updated_on = _now
|
||||
parcel_obj.save()
|
||||
underlying_parcels.append(parcel_obj)
|
||||
|
||||
# Update the linked parcels
|
||||
self.parcels.clear()
|
||||
|
@ -132,6 +132,17 @@ DATABASES = {
|
||||
}
|
||||
}
|
||||
|
||||
CACHES = {
|
||||
"default": {
|
||||
"BACKEND": "django_redis.cache.RedisCache",
|
||||
"LOCATION": "redis://localhost:6379/1",
|
||||
"OPTIONS": {
|
||||
"CLIENT_CLASS": "django_redis.client.DefaultClient"
|
||||
},
|
||||
"KEY_PREFIX": "konova_cache"
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
# Password validation
|
||||
# https://docs.djangoproject.com/en/3.1/ref/settings/#auth-password-validators
|
||||
|
35
konova/utils/mutex.py
Normal file
35
konova/utils/mutex.py
Normal file
@ -0,0 +1,35 @@
|
||||
from contextlib import contextmanager
|
||||
|
||||
from django.core.cache import cache
|
||||
import time
|
||||
|
||||
LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes
|
||||
|
||||
|
||||
@contextmanager
|
||||
def cache_lock(lock_id):
|
||||
""" Cache based lock e.g. for parallel celery processing
|
||||
|
||||
Derived from https://docs.celeryq.dev/en/latest/tutorials/task-cookbook.html
|
||||
|
||||
Use it like
|
||||
...
|
||||
with cache_lock(lock_id, self.app.oid) as acquired:
|
||||
if acquired:
|
||||
do_mutexed_stuff()
|
||||
...
|
||||
|
||||
"""
|
||||
timeout_at = time.monotonic() + LOCK_EXPIRE - 3
|
||||
# cache.add fails if the key already exists
|
||||
status = cache.add(lock_id, "LOCKED", LOCK_EXPIRE)
|
||||
try:
|
||||
yield status
|
||||
finally:
|
||||
# advantage of using add() for atomic locking
|
||||
if time.monotonic() < timeout_at and status:
|
||||
# don't release the lock if we exceeded the timeout
|
||||
# to lessen the chance of releasing an expired lock
|
||||
# owned by someone else
|
||||
# also don't release the lock if we didn't acquire it
|
||||
cache.delete(lock_id)
|
@ -86,13 +86,14 @@ class ParcelWFSFetcher(AbstractWFSFetcher):
|
||||
from konova.models import Geometry
|
||||
if filter_srid is None:
|
||||
filter_srid = DEFAULT_SRID_RLP
|
||||
geom_gml = Geometry.objects.filter(
|
||||
geom = Geometry.objects.filter(
|
||||
id=self.geometry_id
|
||||
).annotate(
|
||||
transformed=Transform(srid=filter_srid, expression="geom")
|
||||
).annotate(
|
||||
gml=AsGML(MakeValid('transformed'))
|
||||
).first().gml
|
||||
).first()
|
||||
geom_gml = geom.gml
|
||||
spatial_filter = f"<Filter><{geometry_operation}><PropertyName>{self.geometry_property_name}</PropertyName>{geom_gml}</{geometry_operation}></Filter>"
|
||||
return spatial_filter
|
||||
|
||||
@ -163,7 +164,28 @@ class ParcelWFSFetcher(AbstractWFSFetcher):
|
||||
if rerun_on_exception:
|
||||
# Wait a second before another try
|
||||
sleep(1)
|
||||
self.get_features(
|
||||
return self.get_features(
|
||||
typenames,
|
||||
spatial_operator,
|
||||
filter_srid,
|
||||
start_index,
|
||||
rerun_on_exception=False
|
||||
)
|
||||
else:
|
||||
e.msg += content
|
||||
raise e
|
||||
|
||||
try:
|
||||
fetched_features = content.get(
|
||||
"features",
|
||||
{},
|
||||
)
|
||||
except AttributeError as e:
|
||||
# Might occur if returned content is no json but something different (maybe an error message)
|
||||
if rerun_on_exception:
|
||||
# Wait a second before another try
|
||||
sleep(1)
|
||||
return self.get_features(
|
||||
typenames,
|
||||
spatial_operator,
|
||||
filter_srid,
|
||||
@ -173,10 +195,6 @@ class ParcelWFSFetcher(AbstractWFSFetcher):
|
||||
else:
|
||||
e.msg += content
|
||||
raise e
|
||||
fetched_features = content.get(
|
||||
"features",
|
||||
{},
|
||||
)
|
||||
|
||||
found_features += fetched_features
|
||||
|
||||
@ -187,4 +205,7 @@ class ParcelWFSFetcher(AbstractWFSFetcher):
|
||||
# If a 'full' response returned, there might be more to fetch. Increase the start_index!
|
||||
start_index += self.count
|
||||
|
||||
if not found_features:
|
||||
print(f"No features found. Fetched content: {content}\nUsed POST body: {post_body}")
|
||||
|
||||
return found_features
|
||||
|
@ -18,6 +18,7 @@ django-bootstrap4==3.0.1
|
||||
django-debug-toolbar==3.1.1
|
||||
django-filter==2.4.0
|
||||
django-fontawesome-5==1.0.18
|
||||
django-redis==5.2.0
|
||||
django-simple-sso==1.1.0
|
||||
django-tables2==2.3.4
|
||||
et-xmlfile==1.1.0
|
||||
|
@ -1,7 +1,7 @@
|
||||
from django.contrib import admin
|
||||
|
||||
from konova.admin import DeletableObjectMixinAdmin
|
||||
from user.models import User, Team
|
||||
from user.models import User, Team, UserActionLogEntry
|
||||
|
||||
|
||||
class UserNotificationAdmin(admin.ModelAdmin):
|
||||
@ -93,4 +93,4 @@ admin.site.register(Team, TeamAdmin)
|
||||
|
||||
# Outcommented for a cleaner admin backend on production
|
||||
#admin.site.register(UserNotification, UserNotificationAdmin)
|
||||
#admin.site.register(UserActionLogEntry, UserActionLogEntryAdmin)
|
||||
admin.site.register(UserActionLogEntry, UserActionLogEntryAdmin)
|
||||
|
Loading…
Reference in New Issue
Block a user