#189 Parcel calculation mutex

* adds cache based mutex (valid e.g. for celery use cases)
* drops atomic parcel bulk creation in favor of proper mutex implementation
This commit is contained in:
mpeltriaux 2022-08-04 15:25:55 +02:00
parent 9d11008fee
commit 7acd11096b
4 changed files with 99 additions and 35 deletions

View File

@ -6,14 +6,17 @@ Created on: 15.11.21
"""
import json
from time import sleep
from django.contrib.gis.db.models import MultiPolygonField
from django.contrib.gis.geos import Polygon
from django.db import models, transaction
from django.core.exceptions import MultipleObjectsReturned
from django.db import models
from django.utils import timezone
from konova.models import BaseResource, UuidModel
from konova.sub_settings.lanis_settings import DEFAULT_SRID_RLP
from konova.utils.mutex import cache_lock
from konova.utils.wfs.spatial import ParcelWFSFetcher
@ -99,7 +102,6 @@ class Geometry(BaseResource):
objs += set_objs
return objs
@transaction.atomic
def update_parcels(self):
""" Updates underlying parcel information
@ -122,38 +124,53 @@ class Geometry(BaseResource):
# which needs to be deleted and just keep the numerical values
## THIS CAN BE REMOVED IN THE FUTURE, WHEN 'Flur' WON'T OCCUR ANYMORE!
flr_val = parcel_properties["flur"].replace("Flur ", "")
district = District.objects.get_or_create(
key=parcel_properties["kreisschl"],
name=parcel_properties["kreis"],
)[0]
municipal = Municipal.objects.get_or_create(
key=parcel_properties["gmdschl"],
name=parcel_properties["gemeinde"],
district=district,
)[0]
parcel_group = ParcelGroup.objects.get_or_create(
key=parcel_properties["gemaschl"],
name=parcel_properties["gemarkung"],
municipal=municipal,
)[0]
flrstck_nnr = parcel_properties['flstnrnen']
if not flrstck_nnr:
flrstck_nnr = None
flrstck_zhlr = parcel_properties['flstnrzae']
if not flrstck_zhlr:
flrstck_zhlr = None
parcel_obj = Parcel.objects.get_or_create(
district=district,
municipal=municipal,
parcel_group=parcel_group,
flr=flr_val,
flrstck_nnr=flrstck_nnr,
flrstck_zhlr=flrstck_zhlr,
)[0]
parcel_obj.district = district
parcel_obj.updated_on = _now
parcel_obj.save()
underlying_parcels.append(parcel_obj)
# Run possible race-condition snippet mutexed
# Use the Flurstückkennzeichen as an identifier to prevent the same calculation from running in
# parallel, which would lead to a race condition
flr_id = parcel_properties['flstkennz']
lock_id = f"parcel_calc-lock-{flr_id}"
with cache_lock(lock_id) as acquired:
while not acquired:
print(f"Am locked. Need to rest. Calculating: {parcel_properties}")
sleep(0.5)
acquired = cache_lock(lock_id)
district = District.objects.get_or_create(
key=parcel_properties["kreisschl"],
name=parcel_properties["kreis"],
)[0]
municipal = Municipal.objects.get_or_create(
key=parcel_properties["gmdschl"],
name=parcel_properties["gemeinde"],
district=district,
)[0]
parcel_group = ParcelGroup.objects.get_or_create(
key=parcel_properties["gemaschl"],
name=parcel_properties["gemarkung"],
municipal=municipal,
)[0]
flrstck_nnr = parcel_properties['flstnrnen']
if not flrstck_nnr:
flrstck_nnr = None
flrstck_zhlr = parcel_properties['flstnrzae']
if not flrstck_zhlr:
flrstck_zhlr = None
try:
parcel_obj = Parcel.objects.get_or_create(
district=district,
municipal=municipal,
parcel_group=parcel_group,
flr=flr_val,
flrstck_nnr=flrstck_nnr,
flrstck_zhlr=flrstck_zhlr,
)[0]
except MultipleObjectsReturned as e:
raise MultipleObjectsReturned(f"{e}: {flr_id}")
parcel_obj.district = district
parcel_obj.updated_on = _now
parcel_obj.save()
underlying_parcels.append(parcel_obj)
# Update the linked parcels
self.parcels.clear()

View File

@ -132,6 +132,17 @@ DATABASES = {
}
}
CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": "redis://localhost:6379/1",
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient"
},
"KEY_PREFIX": "konova_cache"
}
}
# Password validation
# https://docs.djangoproject.com/en/3.1/ref/settings/#auth-password-validators

35
konova/utils/mutex.py Normal file
View File

@ -0,0 +1,35 @@
from contextlib import contextmanager
import time

from django.core.cache import cache

LOCK_EXPIRE = 60 * 10  # Lock auto-expires after 10 minutes (cache key TTL)


@contextmanager
def cache_lock(lock_id):
    """ Cache based lock e.g. for parallel celery processing

    Derived from https://docs.celeryq.dev/en/latest/tutorials/task-cookbook.html

    Use it like
        ...
        with cache_lock(lock_id) as acquired:
            if acquired:
                do_mutexed_stuff()
        ...

    Args:
        lock_id (str): Cache key identifying the mutex

    Yields:
        acquired (bool): True if this caller obtained the lock, False otherwise
    """
    # Safety margin of 3s: never release the lock once we are close to (or past)
    # its TTL, since the cache may already have expired it and handed it to
    # another caller.
    timeout_at = time.monotonic() + LOCK_EXPIRE - 3
    # cache.add is atomic and fails (returns False) if the key already exists —
    # that is what makes it usable as a mutex.
    status = cache.add(lock_id, "LOCKED", LOCK_EXPIRE)
    try:
        yield status
    finally:
        # advantage of using add() for atomic locking
        if time.monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)

View File

@ -18,6 +18,7 @@ django-bootstrap4==3.0.1
django-debug-toolbar==3.1.1
django-filter==2.4.0
django-fontawesome-5==1.0.18
django-redis==5.2.0
django-simple-sso==1.1.0
django-tables2==2.3.4
et-xmlfile==1.1.0
@ -48,4 +49,4 @@ wcwidth==0.2.5
webservices==0.7
wrapt==1.13.3
xmltodict==0.12.0
zipp==3.4.1
zipp==3.4.1