konova/konova/models/object.py
mpeltriaux 14fee4474f #229 Shared users mandatory on admin
* changes mandatory state of users and teams on admin backend to optional (as expected by the model)
* adds team selection to admin backend
2022-11-16 13:27:57 +01:00

772 lines
23 KiB
Python

"""
Author: Michel Peltriaux
Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany
Contact: michel.peltriaux@sgdnord.rlp.de
Created on: 15.11.21
"""
import uuid
from abc import abstractmethod
from django.contrib import messages
from django.db.models import QuerySet
from konova.sub_settings.lanis_settings import DEFAULT_SRID_RLP, LANIS_ZOOM_LUT, LANIS_LINK_TEMPLATE
from konova.tasks import celery_send_mail_shared_access_removed, celery_send_mail_shared_access_given, \
celery_send_mail_shared_data_recorded, celery_send_mail_shared_data_unrecorded, \
celery_send_mail_shared_data_deleted, celery_send_mail_shared_data_checked, \
celery_send_mail_shared_access_given_team, celery_send_mail_shared_access_removed_team, \
celery_send_mail_shared_data_checked_team, celery_send_mail_shared_data_deleted_team, \
celery_send_mail_shared_data_unrecorded_team, celery_send_mail_shared_data_recorded_team
from django.core.exceptions import ObjectDoesNotExist
from django.http import HttpRequest
from django.utils.timezone import now
from django.db import models, transaction
from compensation.settings import COMPENSATION_IDENTIFIER_TEMPLATE, COMPENSATION_IDENTIFIER_LENGTH, \
ECO_ACCOUNT_IDENTIFIER_TEMPLATE, ECO_ACCOUNT_IDENTIFIER_LENGTH
from ema.settings import EMA_ACCOUNT_IDENTIFIER_LENGTH, EMA_ACCOUNT_IDENTIFIER_TEMPLATE
from intervention.settings import INTERVENTION_IDENTIFIER_LENGTH, INTERVENTION_IDENTIFIER_TEMPLATE
from konova.utils import generators
from konova.utils.generators import generate_random_string
from konova.utils.message_templates import CHECKED_RECORDED_RESET, GEOMETRY_CONFLICT_WITH_TEMPLATE
class UuidModel(models.Model):
    """
    Abstract base model, which identifies its entries via a uuid primary key
    """
    # uuid.uuid4 is passed as callable, so every new instance gets its own fresh uuid
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)

    class Meta:
        abstract = True
class BaseResource(UuidModel):
    """
    A basic resource model, which defines attributes for every derived model
    """
    created = models.ForeignKey(
        "user.UserActionLogEntry",
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='+'
    )
    modified = models.ForeignKey(
        "user.UserActionLogEntry",
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='+',
        help_text="Last modified"
    )

    class Meta:
        abstract = True

    def delete(self, using=None, keep_parents=False):
        """ Base deleting of a resource

        Deletes the related 'created' log entry first (if there is one) and the resource itself afterwards.

        Args:
            using (str): Database alias, handed over to Django's Model.delete()
            keep_parents (bool): Handed over to Django's Model.delete()

        Returns:
            The result of Django's Model.delete()
        """
        try:
            self.created.delete()
        except (ObjectDoesNotExist, AttributeError):
            # AttributeError: 'created' is None, ObjectDoesNotExist: entry does not exist anymore
            # In both cases there is nothing to delete - we can skip this
            pass
        # Pass the arguments through, so the caller's choice of database and parent handling is respected
        return super().delete(using=using, keep_parents=keep_parents)
class DeletableObjectMixin(models.Model):
    """ Provides the 'deleted' field and the logic for marking an entry as deleted

    The using model is expected to provide log, identifier, title, shared_users and shared_teams,
    since mark_as_deleted() accesses these attributes.
    """
    deleted = models.ForeignKey(
        "user.UserActionLogEntry",
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='+',
    )

    class Meta:
        abstract = True

    def mark_as_deleted(self, user, send_mail: bool = True):
        """ Mark an entry as deleted

        The entry is not removed from the database. Instead a 'deleted' action of the performing user is
        stored on the entry and added to its log. Entries which are already marked as deleted stay untouched.

        Args:
            user (User): The performing user
            send_mail (bool): Whether users and teams with shared access shall be informed via mail

        Returns:

        """
        from user.models import UserActionLogEntry

        if not self.deleted:
            with transaction.atomic():
                deleted_action = UserActionLogEntry.get_deleted_action(user)
                self.deleted = deleted_action
                self.log.add(deleted_action)

                if send_mail:
                    # Inform directly shared users
                    for user_id in self.shared_users.values_list("id", flat=True):
                        celery_send_mail_shared_data_deleted.delay(self.identifier, self.title, user_id)

                    # Inform shared teams
                    for team_id in self.shared_teams.values_list("id", flat=True):
                        celery_send_mail_shared_data_deleted_team.delay(self.identifier, self.title, team_id)

                self.save()
class BaseObject(BaseResource, DeletableObjectMixin):
    """
    A basic object model, which specifies BaseResource.
    Mainly used for intervention, compensation, ecoaccount
    """
    identifier = models.CharField(max_length=1000, null=True, blank=True)
    title = models.CharField(max_length=1000, null=True, blank=True)
    comment = models.TextField(null=True, blank=True)
    log = models.ManyToManyField("user.UserActionLogEntry", blank=True, help_text="Keeps all user actions of an object", editable=False)

    class Meta:
        abstract = True

    @abstractmethod
    def set_status_messages(self, request: HttpRequest):
        """ Needs to be implemented by the subclasses

        Args:
            request (HttpRequest): The incoming request

        Returns:

        """
        raise NotImplementedError

    def mark_as_edited(self, performing_user, request: HttpRequest = None, edit_comment: str = None):
        """ In case the object or a related object changed the log history needs to be updated

        Args:
            performing_user (User): The user which performed the editing action
            request (HttpRequest): The used request for this action (not used in this base implementation)
            edit_comment (str): Additional comment for the log entry

        Returns:
            edit_action (UserActionLogEntry): The created log entry
        """
        from user.models import UserActionLogEntry

        edit_action = UserActionLogEntry.get_edited_action(performing_user, edit_comment)
        self.modified = edit_action
        self.log.add(edit_action)
        self.save()
        return edit_action

    def add_log_entry(self, action, user, comment: str = None):
        """ Wraps adding of UserActionLogEntry to log

        Args:
            action (UserAction): The performed UserAction
            user (User): Performing user
            comment (str): The optional comment

        Returns:

        """
        from user.models import UserActionLogEntry

        user_action = UserActionLogEntry.objects.create(
            user=user,
            action=action,
            comment=comment
        )
        self.log.add(user_action)

    def generate_new_identifier(self) -> str:
        """ Generates a new identifier for the object

        The identifier is built from the current month and year and a random string, put into the template
        which is defined for the model class. Classes without a definition get a plain random string.
        This method itself does not check whether the identifier is already in use.

        Returns:
            str
        """
        from compensation.models import Compensation, EcoAccount
        from intervention.models import Intervention
        from ema.models import Ema

        definitions = {
            Intervention: {
                "length": INTERVENTION_IDENTIFIER_LENGTH,
                "template": INTERVENTION_IDENTIFIER_TEMPLATE,
            },
            Compensation: {
                "length": COMPENSATION_IDENTIFIER_LENGTH,
                "template": COMPENSATION_IDENTIFIER_TEMPLATE,
            },
            EcoAccount: {
                "length": ECO_ACCOUNT_IDENTIFIER_LENGTH,
                "template": ECO_ACCOUNT_IDENTIFIER_TEMPLATE,
            },
            Ema: {
                "length": EMA_ACCOUNT_IDENTIFIER_LENGTH,
                "template": EMA_ACCOUNT_IDENTIFIER_TEMPLATE,
            },
        }

        definition = definitions.get(self.__class__)
        if definition is None:
            # Not defined, yet. Create fallback identifier for this case
            return generate_random_string(10)

        _now = now()
        # Zero-pad the month to two digits to have same length identifiers by default
        curr_month = f"{_now.month:02d}"
        curr_year = str(_now.year)
        rand_str = generate_random_string(
            length=definition["length"],
            use_numbers=True,
            use_letters_lc=False,
            use_letters_uc=True,
        )
        _str = f"{curr_month}{curr_year}-{rand_str}"
        return definition["template"].format(_str)

    @abstractmethod
    def get_detail_url(self):
        """ Needs to be implemented by the subclasses

        Returns:

        """
        raise NotImplementedError()
class RecordableObjectMixin(models.Model):
    """ Wraps record related fields and functionality

    The using model is expected to provide log, identifier, title, shared_users, shared_teams and
    unshare_with_default_users(), since the methods in here access these attributes.
    """
    # Refers to "verzeichnen"
    recorded = models.OneToOneField(
        "user.UserActionLogEntry",
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        help_text="Holds data on user and timestamp of this action",
        related_name="+"
    )

    class Meta:
        abstract = True

    def set_unrecorded(self, user):
        """ Perform unrecording

        Resets the recorded state, adds an 'unrecorded' action to the log and informs all users and teams
        with shared access via mail.

        Args:
            user (User): Performing user

        Returns:
            action (UserActionLogEntry): The created log entry, None if the object has not been recorded
        """
        from user.models import UserActionLogEntry

        if not self.recorded:
            return None
        action = UserActionLogEntry.get_unrecorded_action(user)
        self.recorded = None
        self.save()
        self.log.add(action)

        shared_users = self.shared_users.values_list("id", flat=True)
        shared_teams = self.shared_teams.values_list("id", flat=True)
        for user_id in shared_users:
            celery_send_mail_shared_data_unrecorded.delay(self.identifier, self.title, user_id)
        for team_id in shared_teams:
            celery_send_mail_shared_data_unrecorded_team.delay(self.identifier, self.title, team_id)
        return action

    def set_recorded(self, user):
        """ Perform recording

        Removes the direct shared access of default-group-only users, stores a 'recorded' action on the
        object and its log and informs the remaining users and teams with shared access via mail.

        Args:
            user (User): Performing user

        Returns:
            action (UserActionLogEntry): The created log entry, None if the object is already recorded
        """
        from user.models import UserActionLogEntry

        if self.recorded:
            return None
        self.unshare_with_default_users()
        action = UserActionLogEntry.get_recorded_action(user)
        self.recorded = action
        self.save()
        self.log.add(action)

        shared_users = self.shared_users.values_list("id", flat=True)
        shared_teams = self.shared_teams.values_list("id", flat=True)
        for user_id in shared_users:
            celery_send_mail_shared_data_recorded.delay(self.identifier, self.title, user_id)
        for team_id in shared_teams:
            celery_send_mail_shared_data_recorded_team.delay(self.identifier, self.title, team_id)
        return action

    def unrecord(self, performing_user, request: HttpRequest = None):
        """ Unrecords a dataset

        Args:
            performing_user (User): The user which performed the editing action
            request (HttpRequest): The used request for this action. If given, an info message is added to it

        Returns:
            action (UserActionLogEntry): The created log entry, None if the object has not been recorded
        """
        action = None
        if self.recorded:
            # set_unrecorded() already adds the action to the log, no need to add it in here again
            action = self.set_unrecorded(performing_user)
            if request:
                messages.info(
                    request,
                    CHECKED_RECORDED_RESET
                )
        return action

    @abstractmethod
    def is_ready_for_publish(self) -> bool:
        """ Check for all needed publishing-constraints on the data

        Returns:
            is_ready (bool): True|False
        """
        raise NotImplementedError("Implement this in the subclass!")

    @property
    def is_recorded(self):
        """ Getter for record status as property

        Returns:
            is_recorded (bool): Whether a recorded action is set
        """
        return self.recorded is not None
class CheckableObjectMixin(models.Model):
    """ Wraps check related fields and functionality

    The using model is expected to provide log, identifier, title, shared_users and shared_teams,
    since the methods in here access these attributes.
    """
    # Checks - Refers to "Genehmigen" but optional
    checked = models.OneToOneField(
        "user.UserActionLogEntry",
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        help_text="Holds data on user and timestamp of this action",
        related_name="+",
    )

    class Meta:
        abstract = True

    def set_unchecked(self) -> None:
        """ Perform unchecking

        Resets the checked state of the object, if there is any.

        Returns:

        """
        if self.checked:
            # Never .delete() the checked entry itself! Deleting would remove it from the log history as well.
            # Dropping the reference is all we want in here.
            self.checked = None
            self.save()
        return None

    def set_checked(self, user):
        """ Perform checking

        Stores a 'checked' action on the object, informs users and teams with shared access via mail
        and adds the action to the log.

        Args:
            user (User): Performing user

        Returns:
            The created log entry, None if the object is already checked
        """
        from user.models import UserActionLogEntry

        if self.checked:
            # Already checked - nothing to do
            return None

        checked_action = UserActionLogEntry.get_checked_action(user)
        self.checked = checked_action
        self.save()

        # Inform directly shared users
        for user_id in self.shared_users.values_list("id", flat=True):
            celery_send_mail_shared_data_checked.delay(self.identifier, self.title, user_id)

        # Inform shared teams
        for team_id in self.shared_teams.values_list("id", flat=True):
            celery_send_mail_shared_data_checked_team.delay(self.identifier, self.title, team_id)

        self.log.add(checked_action)
        return checked_action

    def get_last_checked_action(self):
        """ Getter for the most recent checked action on the log

        Returns:
            previously_checked (UserActionLogEntry): The most recent checked action, None if there is none
        """
        from user.models import UserAction

        checked_entries = self.log.filter(action=UserAction.CHECKED)
        # Newest entry first
        return checked_entries.order_by("-timestamp").first()
class ShareableObjectMixin(models.Model):
    """ Wraps share related fields and functionality

    The using model is expected to provide identifier and title, since the mail sending methods in here
    access these attributes.
    """
    # Users having access on this object
    users = models.ManyToManyField(
        "user.User",
        help_text="Users having access (data shared with)",
        blank=True
    )
    # Teams having access on this object
    teams = models.ManyToManyField(
        "user.Team",
        help_text="Teams having access (data shared with)",
        blank=True
    )
    access_token = models.CharField(
        max_length=255,
        null=True,
        blank=True,
        help_text="Used for sharing access",
    )

    class Meta:
        abstract = True

    def generate_access_token(self, make_unique: bool = False, rec_depth: int = 5):
        """ Creates a new access token for the data

        Tokens are not used for identification of a table row. The share logic checks the intervention id as well
        as the given token. Therefore two different interventions can hold the same access_token without problems.
        For (possible) future changes to the share logic, the make_unique parameter may be used for checking whether
        the access_token is already used in any entry of the same model. If so, tokens will be generated until a
        free token has been found or the maximum number of tries is reached.

        Args:
            make_unique (bool): Perform check on uniqueness over all entries of the model
            rec_depth (int): How many tries for generating a free random token (only if make_unique)

        Returns:

        Raises:
            RuntimeError: If make_unique is set and no free token has been found within rec_depth tries
        """
        if not make_unique:
            # No uniqueness requested, so there is no need to query the database at all
            self.access_token = generators.generate_random_string(15, True, True, False)
            self.save()
            return

        # Check dynamically whether there is another instance of that model, which holds the random access token
        _model = self._meta.concrete_model
        # The bounded loop makes sure we won't end up in an infinite loop of trying to generate access_tokens
        for _ in range(rec_depth):
            token = generators.generate_random_string(15, True, True, False)
            # exists() only asks the database for existence instead of fetching all matching rows
            if not _model.objects.filter(access_token=token).exists():
                self.access_token = token
                self.save()
                return

        raise RuntimeError(
            "Access token generating for {} does not seem to find a free random token! Aborted!".format(self.id)
        )

    def is_shared_with(self, user):
        """ Access check

        Checks whether a given user has access to this object, either directly or via one of the shared teams

        Args:
            user (User): The user to be checked

        Returns:
            is_shared (bool): True if the user has shared access
        """
        directly_shared = self.shared_users.filter(id=user.id).exists()
        team_shared = self.shared_teams.filter(
            users__in=[user]
        ).exists()
        is_shared = directly_shared or team_shared
        return is_shared

    def share_with_team(self, team):
        """ Adds team to list of shared access teams

        Args:
            team (Team): The team to be added to the object

        Returns:

        """
        self.teams.add(team)

    def share_with_team_list(self, team_list: list):
        """ Sets the list of shared access teams

        Teams which are not part of the given list lose their shared access.

        Args:
            team_list (list): The teams which shall have shared access

        Returns:

        """
        self.teams.set(team_list)

    def share_with_user(self, user):
        """ Adds user to list of shared access users

        Users which already have access (directly or via a team) are not added again.

        Args:
            user (User): The user to be added to the object

        Returns:

        """
        if not self.is_shared_with(user):
            self.users.add(user)

    def share_with_user_list(self, user_list: list):
        """ Sets the list of shared access users

        Users which are not part of the given list lose their direct shared access.

        Args:
            user_list (list): The users which shall have shared access

        Returns:

        """
        self.users.set(user_list)

    def _update_shared_teams(self, form):
        """ Updates shared access on the object for teams

        Informs newly added and removed teams via mail and sets the selected teams afterwards.

        Args:
            form (ShareModalForm): The form holding the data. cleaned_data["teams"] needs to be a QuerySet

        Returns:

        """
        form_data = form.cleaned_data
        shared_teams = self.shared_teams

        # Fetch selected teams and find out which teams have been added or removed -> mails need to be sent
        accessing_teams = form_data["teams"]
        removed_teams = shared_teams.exclude(
            id__in=accessing_teams
        ).values_list("id", flat=True)
        new_teams = accessing_teams.exclude(
            id__in=shared_teams
        ).values_list("id", flat=True)

        # Send mails before the new state is set, since the lazy querysets above depend on the old state
        for team_id in new_teams:
            celery_send_mail_shared_access_given_team.delay(self.identifier, self.title, team_id)
        for team_id in removed_teams:
            celery_send_mail_shared_access_removed_team.delay(self.identifier, self.title, team_id)

        # Set new shared teams
        self.share_with_team_list(accessing_teams)

    def _update_shared_users(self, form):
        """ Updates shared access on the object for single users

        Informs newly added and removed users via mail and sets the selected users afterwards.

        Args:
            form (ShareModalForm): The form holding the data. cleaned_data["users"] needs to be a QuerySet

        Returns:

        """
        form_data = form.cleaned_data
        shared_users = self.shared_users

        # Fetch selected users
        accessing_users = form_data["users"]
        removed_users = shared_users.exclude(
            id__in=accessing_users
        ).values_list("id", flat=True)
        new_users = accessing_users.exclude(
            id__in=shared_users
        ).values_list("id", flat=True)

        # Send mails before the new state is set, since the lazy querysets above depend on the old state
        for user_id in removed_users:
            celery_send_mail_shared_access_removed.delay(self.identifier, self.title, user_id)
        for user_id in new_users:
            celery_send_mail_shared_access_given.delay(self.identifier, self.title, user_id)

        # Set new shared users
        self.share_with_user_list(accessing_users)

    def update_shared_access(self, form):
        """ Updates shared access on the object

        Args:
            form (ShareModalForm): The form holding the data

        Returns:

        """
        self._update_shared_teams(form)
        self._update_shared_users(form)

    @property
    def shared_users(self) -> QuerySet:
        """ Shortcut for fetching the users which have shared access on this object

        Returns:
            users (QuerySet)
        """
        return self.users.all()

    @property
    def shared_teams(self) -> QuerySet:
        """ Shortcut for fetching the teams which have shared access on this object

        Teams which are marked as deleted are left out.

        Returns:
            teams (QuerySet)
        """
        return self.teams.filter(
            deleted__isnull=True
        )

    @abstractmethod
    def get_share_url(self):
        """ Returns the share url for the object

        Returns:

        """
        raise NotImplementedError("Must be implemented in subclasses!")

    def unshare_with_default_users(self):
        """ Removes all shared users from direct shared access which are only default group users

        The removed users are informed via mail.

        Returns:

        """
        from konova.utils.user_checks import is_default_group_only

        users = self.shared_users
        cleaned_users = []
        default_users = []
        for user in users:
            if not is_default_group_only(user):
                cleaned_users.append(user)
            else:
                default_users.append(user)
        self.share_with_user_list(cleaned_users)

        for user in default_users:
            celery_send_mail_shared_access_removed.delay(self.identifier, self.title, user.id)
class GeoReferencedMixin(models.Model):
    """ Wraps the geometry field and geometry related functionality
    """
    geometry = models.ForeignKey(
        "konova.Geometry",
        null=True,
        blank=True,
        on_delete=models.SET_NULL,
    )

    class Meta:
        abstract = True

    def get_underlying_parcels(self):
        """ Getter for related parcels

        Returns:
            parcels (Iterable): An empty list if there is no geometry, the geometry's parcels otherwise
        """
        if self.geometry is None:
            return []
        return self.geometry.get_underlying_parcels()

    def count_underlying_parcels(self):
        """ Getter for number of underlying parcels

        Returns:
            0 if there is no geometry, the geometry's parcel count otherwise
        """
        if self.geometry is None:
            return 0
        return self.geometry.count_underlying_parcels()

    def set_geometry_conflict_message(self, request: HttpRequest):
        """ Adds an info message to the request, naming all objects whose geometries are in conflict with this one

        Args:
            request (HttpRequest): The request, which shall hold the message

        Returns:
            request (HttpRequest): The given request
        """
        geometry = self.geometry
        if geometry is None:
            return request

        conflicting_objs = []
        # Geometries this geometry is in conflict with
        for conflict in geometry.conflicts_geometries.all():
            conflicting_objs.extend(conflict.affected_geometry.get_data_objects())
        # Geometries which are in conflict with this geometry
        for conflict in geometry.conflicted_by_geometries.all():
            conflicting_objs.extend(conflict.conflicting_geometry.get_data_objects())

        if conflicting_objs:
            identifiers = ", ".join([obj.identifier for obj in conflicting_objs])
            messages.info(request, GEOMETRY_CONFLICT_WITH_TEMPLATE.format(identifiers))
        return request

    def get_LANIS_link(self) -> str:
        """ Generates a link for LANIS depending on the geometry

        The link points to the centroid of the geometry. The zoom level is taken from the first entry of
        LANIS_ZOOM_LUT whose area key is smaller than the area of the geometry's envelope (16 if there is none).
        Without a usable geometry the link falls back to x=1, y=1 and zoom level 6.

        Returns:
            The formatted LANIS_LINK_TEMPLATE
        """
        try:
            transformed = self.geometry.geom.transform(DEFAULT_SRID_RLP, clone=True)
            centroid = transformed.centroid
            x, y = centroid.x, centroid.y
            envelope_area = int(transformed.envelope.area)
            zoom_lvl = next(
                (zoom for max_area, zoom in LANIS_ZOOM_LUT.items() if max_area < envelope_area),
                16,
            )
        except (AttributeError, IndexError):
            # If no geometry has been added, yet.
            x, y, zoom_lvl = 1, 1, 6
        return LANIS_LINK_TEMPLATE.format(zoom_lvl, x, y)
class ResubmitableObjectMixin(models.Model):
    """ Wraps the resubmission relation and related functionality

    The using model is expected to provide identifier, since resubmit() accesses this attribute.
    """
    resubmissions = models.ManyToManyField("konova.Resubmission", blank=True, related_name="+")

    class Meta:
        abstract = True

    def resubmit(self):
        """ Triggers the resubmission mail of every related resubmission

        Each resubmission gets the identifier of this object handed over.
        """
        for entry in self.resubmissions.all():
            entry.send_resubmission_mail(self.identifier)