* adds support for POST of new compensations * adds shared_users property to BaseObject and Compensation to simplify fetching of shared users (Compensation inherits from intervention) * extends compensation admin index * modifies compensation manager which led to invisibility of deleted entries in the admin backend * fixes bug in sanitize_db.py where CREATED useractions would be removed if they are not found on any log but still are used on the .created attribute of the objects
518 lines
16 KiB
Python
518 lines
16 KiB
Python
"""
|
|
Author: Michel Peltriaux
|
|
Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany
|
|
Contact: michel.peltriaux@sgdnord.rlp.de
|
|
Created on: 15.11.21
|
|
|
|
"""
|
|
|
|
import uuid
|
|
from abc import abstractmethod
|
|
|
|
from django.contrib import messages
|
|
from django.db.models import QuerySet
|
|
|
|
from konova.tasks import celery_send_mail_shared_access_removed, celery_send_mail_shared_access_given, \
|
|
celery_send_mail_shared_data_recorded, celery_send_mail_shared_data_unrecorded, \
|
|
celery_send_mail_shared_data_deleted, celery_send_mail_shared_data_checked
|
|
from user.models import User
|
|
from django.core.exceptions import ObjectDoesNotExist
|
|
from django.http import HttpRequest
|
|
from django.utils.timezone import now
|
|
from django.db import models, transaction
|
|
from compensation.settings import COMPENSATION_IDENTIFIER_TEMPLATE, COMPENSATION_IDENTIFIER_LENGTH, \
|
|
ECO_ACCOUNT_IDENTIFIER_TEMPLATE, ECO_ACCOUNT_IDENTIFIER_LENGTH
|
|
from ema.settings import EMA_ACCOUNT_IDENTIFIER_LENGTH, EMA_ACCOUNT_IDENTIFIER_TEMPLATE
|
|
from intervention.settings import INTERVENTION_IDENTIFIER_LENGTH, INTERVENTION_IDENTIFIER_TEMPLATE
|
|
from konova.utils import generators
|
|
from konova.utils.generators import generate_random_string
|
|
from konova.utils.message_templates import CHECKED_RECORDED_RESET, GEOMETRY_CONFLICT_WITH_TEMPLATE
|
|
from user.models import UserActionLogEntry, UserAction
|
|
|
|
|
|
class UuidModel(models.Model):
    """Abstract base model that identifies each row by a UUID primary key."""

    # uuid.uuid4 is handed over as a callable, so it is invoked anew for
    # every instance instead of being evaluated once at import time.
    id = models.UUIDField(
        default=uuid.uuid4,
        editable=False,
        primary_key=True,
    )

    class Meta:
        abstract = True
|
|
|
|
|
|
class BaseResource(UuidModel):
    """Abstract resource model carrying the audit attributes shared by all derived models.

    Both attributes reference a UserActionLogEntry and are nulled (SET_NULL)
    when the referenced entry gets deleted.
    """

    # Log entry referenced as the creation action of this resource
    created = models.ForeignKey(
        UserActionLogEntry,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='+'
    )
    # Log entry referenced as the most recent modification of this resource
    modified = models.ForeignKey(
        UserActionLogEntry,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='+',
        help_text="Last modified"
    )

    class Meta:
        abstract = True

    def delete(self, using=None, keep_parents=False):
        """ Deletes the resource together with its `created` log entry

        Args:
            using (): Database alias, forwarded to the parent implementation
            keep_parents (): Forwarded to the parent implementation

        Returns:
            Whatever the parent implementation of delete() returns
        """
        try:
            self.created.delete()
        except (ObjectDoesNotExist, AttributeError):
            # Either the referenced entry does not exist anymore or `created` is None
            # (None has no .delete attribute) - there is nothing to clean up in both cases
            pass
        # Forward the arguments: calling super().delete() without them silently ignored
        # the database alias and keep_parents flag requested by the caller
        return super().delete(using=using, keep_parents=keep_parents)
|
|
|
|
|
|
class BaseObject(BaseResource):
    """
    Abstract object model building on top of BaseResource.

    Mainly used for intervention, compensation, ecoaccount
    """
    identifier = models.CharField(max_length=1000, null=True, blank=True)
    title = models.CharField(max_length=1000, null=True, blank=True)
    # Set by mark_as_deleted(); a non-empty value means the object counts as deleted
    deleted = models.ForeignKey(UserActionLogEntry, on_delete=models.SET_NULL, null=True, blank=True, related_name='+')
    comment = models.TextField(null=True, blank=True)
    log = models.ManyToManyField(UserActionLogEntry, blank=True, help_text="Keeps all user actions of an object", editable=False)

    class Meta:
        abstract = True

    @abstractmethod
    def set_status_messages(self, request: HttpRequest):
        """ Hook to be implemented by subclasses; the base implementation always raises

        Args:
            request (HttpRequest): The incoming request

        Raises:
            NotImplementedError: Always, unless overridden
        """
        raise NotImplementedError

    def mark_as_deleted(self, user: User):
        """ Flags the entry as deleted without removing it from the database

        A 'deleted' log entry is fetched for the user, stored on .deleted and appended to the log.
        Every user id found in self.shared_users gets a mail task queued.
        Calling this on an already deleted entry is a no-op.

        Args:
            user (User): The performing user

        Returns:

        """
        if self.deleted:
            # Already flagged - leave the existing deletion entry untouched
            return

        with transaction.atomic():
            deletion_entry = UserActionLogEntry.get_deleted_action(user)
            self.deleted = deletion_entry
            self.log.add(deletion_entry)

            # Queue one notification mail per user with shared access
            # NOTE(review): self.shared_users is not declared on BaseObject in this file -
            # presumably provided by a mixin or the subclass; confirm
            for recipient_id in self.shared_users.values_list("id", flat=True):
                celery_send_mail_shared_data_deleted.delay(self.identifier, recipient_id)

            self.save()

    def add_log_entry(self, action: UserAction, user: User, comment: str):
        """ Creates a UserActionLogEntry and appends it to the log of this object

        Args:
            action (UserAction): The performed UserAction
            user (User): Performing user
            comment (str): The comment stored on the entry

        Returns:

        """
        entry = UserActionLogEntry.objects.create(user=user, action=action, comment=comment)
        self.log.add(entry)

    def generate_new_identifier(self) -> str:
        """ Builds a fresh identifier string for this object

        For the known model classes the result is the class specific template, filled with
        '<two digit month><year>-<random string>'. Any other class falls back to a random
        string requested with length 10.

        Returns:
            str
        """
        # Imported locally - presumably to avoid circular imports between the apps (confirm)
        from compensation.models import Compensation, EcoAccount
        from intervention.models import Intervention
        from ema.models import Ema

        # model class -> (length of random part, identifier template)
        settings_by_model = {
            Intervention: (INTERVENTION_IDENTIFIER_LENGTH, INTERVENTION_IDENTIFIER_TEMPLATE),
            Compensation: (COMPENSATION_IDENTIFIER_LENGTH, COMPENSATION_IDENTIFIER_TEMPLATE),
            EcoAccount: (ECO_ACCOUNT_IDENTIFIER_LENGTH, ECO_ACCOUNT_IDENTIFIER_TEMPLATE),
            Ema: (EMA_ACCOUNT_IDENTIFIER_LENGTH, EMA_ACCOUNT_IDENTIFIER_TEMPLATE),
        }

        model_settings = settings_by_model.get(self.__class__)
        if model_settings is None:
            # No template registered for this class, yet - use the fallback identifier
            return generate_random_string(10)
        random_length, template = model_settings

        timestamp = now()
        random_part = generate_random_string(
            length=random_length,
            use_numbers=True,
            use_letters_lc=False,
            use_letters_uc=True,
        )
        # Month is zero padded to two digits, so identifiers have the same length by default
        return template.format(f"{timestamp.month:02d}{timestamp.year}-{random_part}")
|
|
|
|
|
|
class RecordableObjectMixin(models.Model):
    """ Bundles the record related field and functionality

    The methods rely on attributes which are not declared on this mixin
    (self.log, self.users, self.identifier, self.modified) - the concrete model has to provide them.
    """
    # Refers to "verzeichnen"
    recorded = models.OneToOneField(
        UserActionLogEntry,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        help_text="Holds data on user and timestamp of this action",
        related_name="+"
    )

    class Meta:
        abstract = True

    def set_unrecorded(self, user: User):
        """ Removes the recorded state

        Clears .recorded, saves, appends an 'unrecorded' entry to the log and queues a mail
        task for every user in self.users.

        Args:
            user (User): Performing user

        Returns:
            The created log entry, or None if the object has not been recorded
        """
        if not self.recorded:
            return None

        unrecord_entry = UserActionLogEntry.get_unrecorded_action(user)
        self.recorded = None
        self.save()
        self.log.add(unrecord_entry)

        for recipient_id in self.users.all().values_list("id", flat=True):
            celery_send_mail_shared_data_unrecorded.delay(self.identifier, recipient_id)

        return unrecord_entry

    def set_recorded(self, user: User):
        """ Sets the recorded state

        Stores a 'recorded' entry on .recorded, saves, appends the entry to the log and queues
        a mail task for every user in self.users.

        Args:
            user (User): Performing user

        Returns:
            The created log entry, or None if the object is already recorded
        """
        if self.recorded:
            return None

        record_entry = UserActionLogEntry.get_recorded_action(user)
        self.recorded = record_entry
        self.save()
        self.log.add(record_entry)

        for recipient_id in self.users.all().values_list("id", flat=True):
            celery_send_mail_shared_data_recorded.delay(self.identifier, recipient_id)

        return record_entry

    def mark_as_edited(self, performing_user: User, request: HttpRequest = None, edit_comment: str = None):
        """ Registers an edit of the object or a related object

        An 'edited' entry is stored on .modified and appended to the log. If the object is
        currently recorded, it gets unrecorded and - if a request is given - an info message
        is attached to that request.

        Args:
            performing_user (User): The user which performed the editing action
            request (HttpRequest): Optional request used for attaching the info message
            edit_comment (str): Optional comment handed over to the 'edited' log entry

        Returns:

        """
        edit_entry = UserActionLogEntry.get_edited_action(performing_user, edit_comment)
        self.modified = edit_entry
        self.log.add(edit_entry)
        self.save()

        if not self.recorded:
            # Nothing to reset
            return

        self.set_unrecorded(performing_user)
        if request:
            messages.info(request, CHECKED_RECORDED_RESET)

    @abstractmethod
    def is_ready_for_publish(self) -> bool:
        """ Hook for checking all publishing constraints; has to be implemented by subclasses

        Returns:
            is_ready (bool): True|False

        Raises:
            NotImplementedError: Always, unless overridden
        """
        raise NotImplementedError("Implement this in the subclass!")
|
|
|
|
|
|
class CheckableObjectMixin(models.Model):
    """ Bundles the check related field and functionality

    The methods rely on attributes which are not declared on this mixin
    (self.log, self.users, self.identifier) - the concrete model has to provide them.
    """
    # Checks - Refers to "Genehmigen" but optional
    checked = models.OneToOneField(
        UserActionLogEntry,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        help_text="Holds data on user and timestamp of this action",
        related_name="+"
    )

    class Meta:
        abstract = True

    def set_unchecked(self) -> None:
        """ Removes the checked state

        Does nothing if the object is not checked.

        Returns:

        """
        if self.checked:
            # Only drop the reference and never .delete() the entry itself: deleting would
            # remove it from the log history as well, which is not wanted
            self.checked = None
            self.save()
        return None

    def set_checked(self, user: User) -> UserActionLogEntry:
        """ Sets the checked state

        Stores a 'checked' entry on .checked, saves, queues a mail task for every user in
        self.users and appends the entry to the log.

        Args:
            user (User): Performing user

        Returns:
            The created log entry, or None if the object is already checked
        """
        if self.checked:
            # Already checked - keep the existing entry
            return

        check_entry = UserActionLogEntry.get_checked_action(user)
        self.checked = check_entry
        self.save()

        # Queue one notification mail per user with shared access
        for recipient_id in self.users.all().values_list("id", flat=True):
            celery_send_mail_shared_data_checked.delay(self.identifier, recipient_id)

        self.log.add(check_entry)
        return check_entry
|
|
|
|
|
|
class ShareableObjectMixin(models.Model):
    """ Bundles the fields and functionality for sharing an object with users

    self.identifier is used for the notification mails but not declared on this mixin -
    the concrete model has to provide it.
    """
    # Users having access on this object
    users = models.ManyToManyField(User, help_text="Users having access (data shared with)")
    access_token = models.CharField(
        max_length=255,
        null=True,
        blank=True,
        help_text="Used for sharing access",
    )

    class Meta:
        abstract = True

    def generate_access_token(self, make_unique: bool = False, rec_depth: int = 5):
        """ Creates a new access token for the data and saves the object

        Tokens are not used for identification of a table row. The share logic checks the id of the
        object as well as the given token. Therefore two different objects can hold the same
        access_token without problems.
        For (possible) future changes to the share logic, the make_unique parameter may be used for
        checking whether the access_token is already used by any instance of the same concrete model.
        If so, new tokens will be generated until a free one is found or the attempts are used up.

        Args:
            make_unique (bool): Perform check on uniqueness over all entries of the concrete model
            rec_depth (int): How many tries for generating a free random token (only if make_unique)

        Returns:

        Raises:
            RuntimeError: If make_unique is set and no free token has been found within rec_depth tries
        """
        if not make_unique:
            # Uniqueness is not requested, so there is no reason to hit the database for a lookup
            token = generators.generate_random_string(15, True, True, False)
        else:
            # Check dynamically whether another instance of that model holds the random token
            _model = self._meta.concrete_model
            # Bounded number of attempts - makes sure we won't end up in an infinite loop
            for _ in range(rec_depth):
                token = generators.generate_random_string(15, True, True, False)
                # .exists() only asks the database whether a match is there, instead of
                # loading all matching rows like a truthiness check on the QuerySet does
                if not _model.objects.filter(access_token=token).exists():
                    break
            else:
                # All attempts used up (or rec_depth < 1) without finding a free token
                raise RuntimeError(
                    "Access token generating for {} does not seem to find a free random token! Aborted!".format(self.id)
                )

        self.access_token = token
        self.save()

    def is_shared_with(self, user: User):
        """ Access check

        Checks whether a given user has access to this object

        Args:
            user (User): The user to be checked

        Returns:
            QuerySet: self.users filtered by the id of the user. It is empty (falsy) if the
                user has no access.
        """
        return self.users.filter(id=user.id)

    def share_with(self, user: User):
        """ Adds user to list of shared access users, if not already part of it

        Args:
            user (User): The user to be added to the object

        Returns:

        """
        if not self.is_shared_with(user):
            self.users.add(user)

    def share_with_list(self, user_list: list):
        """ Sets the list of shared access users

        The given users replace the current ones (self.users.set).

        Args:
            user_list (list): The users which shall have access to the object

        Returns:

        """
        self.users.set(user_list)

    def update_sharing_user(self, form):
        """ Updates the users with shared access according to the form data

        Queues a mail task for every user losing access and for every newly selected user,
        then replaces the shared users of the object.

        Args:
            form (ShareModalForm): The form holding the data

        Returns:

        """
        form_data = form.cleaned_data

        # presumably a list of user ids - it is concatenated with a list of ids below; verify against the form
        keep_accessing_users = form_data["users"]
        new_accessing_users = list(form_data["user_select"].values_list("id", flat=True))
        accessing_users = keep_accessing_users + new_accessing_users
        users = User.objects.filter(
            id__in=accessing_users
        )
        # Lazy QuerySet: it is evaluated in the loop below, which has to happen before the
        # shared users are replaced - otherwise the removed users could not be found anymore
        removed_users = self.users.all().exclude(
            id__in=accessing_users
        ).values("id")

        # Send mails
        for removed_user in removed_users:
            celery_send_mail_shared_access_removed.delay(self.identifier, removed_user["id"])
        for new_user_id in new_accessing_users:
            celery_send_mail_shared_access_given.delay(self.identifier, new_user_id)

        # Set new shared users
        self.share_with_list(users)

    @property
    def shared_users(self) -> QuerySet:
        """ Shortcut for fetching the users which have shared access on this object

        Returns:
            users (QuerySet)
        """
        return self.users.all()
|
|
|
|
|
|
class GeoReferencedMixin(models.Model):
    """ Bundles the geometry reference and geometry related functionality

    """
    geometry = models.ForeignKey("konova.Geometry", null=True, blank=True, on_delete=models.SET_NULL)

    class Meta:
        abstract = True

    def get_underlying_parcels(self):
        """ Getter for related parcels

        Returns:
            parcels (Iterable): An empty list if no geometry is set, otherwise the result of
                geometry.get_underlying_parcels()
        """
        if self.geometry is None:
            return []
        return self.geometry.get_underlying_parcels()

    def set_geometry_conflict_message(self, request: HttpRequest):
        """ Attaches an info message to the request, if the geometry is part of any conflict

        The message lists the identifiers of all data objects found on the other side of each
        conflict, in both directions (conflicts_geometries and conflicted_by_geometries).

        Args:
            request (HttpRequest): The request which shall hold the message

        Returns:
            request (HttpRequest): The given request
        """
        geometry = self.geometry
        if geometry is None:
            return request

        related_objs = []
        has_conflict = False

        # Conflicts in the first direction
        for conflict in geometry.conflicts_geometries.all():
            has_conflict = True
            related_objs.extend(conflict.affected_geometry.get_data_objects())

        # Conflicts in the opposite direction
        for conflict in geometry.conflicted_by_geometries.all():
            has_conflict = True
            related_objs.extend(conflict.conflicting_geometry.get_data_objects())

        if not has_conflict:
            return request

        identifiers = ", ".join([obj.identifier for obj in related_objs])
        messages.info(request, GEOMETRY_CONFLICT_WITH_TEMPLATE.format(identifiers))
        return request
|