""" Author: Michel Peltriaux Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany Contact: michel.peltriaux@sgdnord.rlp.de Created on: 15.11.21 """ import uuid from abc import abstractmethod from django.contrib import messages from django.db.models import QuerySet from konova.sub_settings.lanis_settings import DEFAULT_SRID_RLP, LANIS_ZOOM_LUT, LANIS_LINK_TEMPLATE from konova.tasks import celery_send_mail_shared_access_removed, celery_send_mail_shared_access_given, \ celery_send_mail_shared_data_recorded, celery_send_mail_shared_data_unrecorded, \ celery_send_mail_shared_data_deleted, celery_send_mail_shared_data_checked, \ celery_send_mail_shared_access_given_team, celery_send_mail_shared_access_removed_team, \ celery_send_mail_shared_data_checked_team, celery_send_mail_shared_data_deleted_team, \ celery_send_mail_shared_data_unrecorded_team, celery_send_mail_shared_data_recorded_team from django.core.exceptions import ObjectDoesNotExist from django.http import HttpRequest from django.utils.timezone import now from django.db import models, transaction from compensation.settings import COMPENSATION_IDENTIFIER_TEMPLATE, COMPENSATION_IDENTIFIER_LENGTH, \ ECO_ACCOUNT_IDENTIFIER_TEMPLATE, ECO_ACCOUNT_IDENTIFIER_LENGTH from ema.settings import EMA_ACCOUNT_IDENTIFIER_LENGTH, EMA_ACCOUNT_IDENTIFIER_TEMPLATE from intervention.settings import INTERVENTION_IDENTIFIER_LENGTH, INTERVENTION_IDENTIFIER_TEMPLATE from konova.utils import generators from konova.utils.generators import generate_random_string from konova.utils.message_templates import CHECKED_RECORDED_RESET, GEOMETRY_CONFLICT_WITH_TEMPLATE class UuidModel(models.Model): """ Encapsules identifying via uuid """ id = models.UUIDField( primary_key=True, default=uuid.uuid4, editable=False, ) class Meta: abstract = True class BaseResource(UuidModel): """ A basic resource model, which defines attributes for every derived model """ created = models.ForeignKey( "user.UserActionLogEntry", on_delete=models.SET_NULL, null=True, blank=True, related_name='+' ) modified = models.ForeignKey( "user.UserActionLogEntry", on_delete=models.SET_NULL, null=True, blank=True, related_name='+', help_text="Last modified" ) class Meta: abstract = True def delete(self, using=None, keep_parents=False): """ Base deleting of a resource Args: using (): keep_parents (): Returns: """ try: self.created.delete() except (ObjectDoesNotExist, AttributeError): # Object does not exist anymore - we can skip this pass super().delete() class BaseObject(BaseResource): """ A basic object model, which specifies BaseResource. Mainly used for intervention, compensation, ecoaccount """ identifier = models.CharField(max_length=1000, null=True, blank=True) title = models.CharField(max_length=1000, null=True, blank=True) deleted = models.ForeignKey("user.UserActionLogEntry", on_delete=models.SET_NULL, null=True, blank=True, related_name='+') comment = models.TextField(null=True, blank=True) log = models.ManyToManyField("user.UserActionLogEntry", blank=True, help_text="Keeps all user actions of an object", editable=False) class Meta: abstract = True @abstractmethod def set_status_messages(self, request: HttpRequest): raise NotImplementedError def mark_as_deleted(self, user, send_mail: bool = True): """ Mark an entry as deleted Does not delete from database but sets a timestamp for being deleted on and which user deleted the object Args: user (User): The performing user Returns: """ from user.models import UserActionLogEntry if self.deleted: # Nothing to do here return with transaction.atomic(): action = UserActionLogEntry.get_deleted_action(user) self.deleted = action self.log.add(action) if send_mail: # Send mail shared_users = self.shared_users.values_list("id", flat=True) for user_id in shared_users: celery_send_mail_shared_data_deleted.delay(self.identifier, self.title, user_id) # Send mail shared_teams = self.shared_teams.values_list("id", flat=True) for team_id in shared_teams: celery_send_mail_shared_data_deleted_team.delay(self.identifier, self.title, team_id) self.save() def mark_as_edited(self, performing_user, request: HttpRequest = None, edit_comment: str = None): """ In case the object or a related object changed the log history needs to be updated Args: performing_user (User): The user which performed the editing action request (HttpRequest): The used request for this action edit_comment (str): Additional comment for the log entry Returns: """ from user.models import UserActionLogEntry edit_action = UserActionLogEntry.get_edited_action(performing_user, edit_comment) self.modified = edit_action self.log.add(edit_action) self.save() return edit_action def add_log_entry(self, action, user, comment: str): """ Wraps adding of UserActionLogEntry to log Args: action (UserAction): The performed UserAction user (User): Performing user comment (str): The optional comment Returns: """ from user.models import UserActionLogEntry user_action = UserActionLogEntry.objects.create( user=user, action=action, comment=comment ) self.log.add(user_action) def generate_new_identifier(self) -> str: """ Generates a new identifier for the intervention object Returns: str """ from compensation.models import Compensation, EcoAccount from intervention.models import Intervention from ema.models import Ema definitions = { Intervention: { "length": INTERVENTION_IDENTIFIER_LENGTH, "template": INTERVENTION_IDENTIFIER_TEMPLATE, }, Compensation: { "length": COMPENSATION_IDENTIFIER_LENGTH, "template": COMPENSATION_IDENTIFIER_TEMPLATE, }, EcoAccount: { "length": ECO_ACCOUNT_IDENTIFIER_LENGTH, "template": ECO_ACCOUNT_IDENTIFIER_TEMPLATE, }, Ema: { "length": EMA_ACCOUNT_IDENTIFIER_LENGTH, "template": EMA_ACCOUNT_IDENTIFIER_TEMPLATE, }, } if self.__class__ not in definitions: # Not defined, yet. Create fallback identifier for this case return generate_random_string(10) _now = now() curr_month = _now.month if curr_month < 10: # Make sure month part has two digits to have same length identifiers by default curr_month = f"0{curr_month}" else: curr_month = str(curr_month) curr_year = str(_now.year) rand_str = generate_random_string( length=definitions[self.__class__]["length"], use_numbers=True, use_letters_lc=False, use_letters_uc=True, ) _str = "{}{}-{}".format(curr_month, curr_year, rand_str) return definitions[self.__class__]["template"].format(_str) @abstractmethod def get_detail_url(self): raise NotImplementedError() class RecordableObjectMixin(models.Model): """ Wraps record related fields and functionality """ # Refers to "verzeichnen" recorded = models.OneToOneField( "user.UserActionLogEntry", on_delete=models.SET_NULL, null=True, blank=True, help_text="Holds data on user and timestamp of this action", related_name="+" ) class Meta: abstract = True def set_unrecorded(self, user): """ Perform unrecording Args: user (User): Performing user Returns: """ from user.models import UserActionLogEntry if not self.recorded: return None action = UserActionLogEntry.get_unrecorded_action(user) self.recorded = None self.save() self.log.add(action) shared_users = self.shared_users.values_list("id", flat=True) shared_teams = self.shared_teams.values_list("id", flat=True) for user_id in shared_users: celery_send_mail_shared_data_unrecorded.delay(self.identifier, self.title, user_id) for team_id in shared_teams: celery_send_mail_shared_data_unrecorded_team.delay(self.identifier, self.title, team_id) return action def set_recorded(self, user): """ Perform recording Args: user (User): Performing user Returns: """ from user.models import UserActionLogEntry if self.recorded: return None self.unshare_with_default_users() action = UserActionLogEntry.get_recorded_action(user) self.recorded = action self.save() self.log.add(action) shared_users = self.shared_users.values_list("id", flat=True) shared_teams = self.shared_teams.values_list("id", flat=True) for user_id in shared_users: celery_send_mail_shared_data_recorded.delay(self.identifier, self.title, user_id) for team_id in shared_teams: celery_send_mail_shared_data_recorded_team.delay(self.identifier, self.title, team_id) return action def unrecord(self, performing_user, request: HttpRequest = None): """ Unrecords a dataset Args: performing_user (User): The user which performed the editing action request (HttpRequest): The used request for this action Returns: """ action = None if self.recorded: action = self.set_unrecorded(performing_user) self.log.add(action) if request: messages.info( request, CHECKED_RECORDED_RESET ) return action @abstractmethod def is_ready_for_publish(self) -> bool: """ Check for all needed publishing-constraints on the data Returns: is_ready (bool): True|False """ raise NotImplementedError("Implement this in the subclass!") @property def is_recorded(self): """ Getter for record status as property Returns: """ return self.recorded is not None class CheckableObjectMixin(models.Model): # Checks - Refers to "Genehmigen" but optional checked = models.OneToOneField( "user.UserActionLogEntry", on_delete=models.SET_NULL, null=True, blank=True, help_text="Holds data on user and timestamp of this action", related_name="+" ) class Meta: abstract = True def set_unchecked(self) -> None: """ Perform unrecording Args: Returns: """ if not self.checked: # Nothing to do return # Do not .delete() the checked attribute! Just set it to None, since a delete() would kill it out of the # log history, which is not what we want! self.checked = None self.save() return None def set_checked(self, user): """ Perform checking Args: user (User): Performing user Returns: """ from user.models import UserActionLogEntry if self.checked: # Nothing to do return action = UserActionLogEntry.get_checked_action(user) self.checked = action self.save() # Send mail shared_users = self.shared_users.values_list("id", flat=True) for user_id in shared_users: celery_send_mail_shared_data_checked.delay(self.identifier, self.title, user_id) # Send mail shared_teams = self.shared_teams.values_list("id", flat=True) for team_id in shared_teams: celery_send_mail_shared_data_checked_team.delay(self.identifier, self.title, team_id) self.log.add(action) return action class ShareableObjectMixin(models.Model): # Users having access on this object users = models.ManyToManyField("user.User", help_text="Users having access (data shared with)") teams = models.ManyToManyField("user.Team", help_text="Teams having access (data shared with)") access_token = models.CharField( max_length=255, null=True, blank=True, help_text="Used for sharing access", ) class Meta: abstract = True def generate_access_token(self, make_unique: bool = False, rec_depth: int = 5): """ Creates a new access token for the data Tokens are not used for identification of a table row. The share logic checks the intervention id as well as the given token. Therefore two different interventions can hold the same access_token without problems. For (possible) future changes to the share logic, the make_unique parameter may be used for checking whether the access_token is already used in any intervention. If so, tokens will be generated as long as a free token can be found. Args: make_unique (bool): Perform check on uniqueness over all intervention entries rec_depth (int): How many tries for generating a free random token (only if make_unique) Returns: """ # Make sure we won't end up in an infinite loop of trying to generate access_tokens rec_depth = rec_depth - 1 if rec_depth < 0 and make_unique: raise RuntimeError( "Access token generating for {} does not seem to find a free random token! Aborted!".format(self.id) ) # Create random token token = generators.generate_random_string(15, True, True, False) # Check dynamically wheter there is another instance of that model, which holds this random access token _model = self._meta.concrete_model token_used_in = _model.objects.filter(access_token=token) # Make sure the token is not used anywhere as access_token, yet. # Make use of QuerySet lazy method for checking if it exists or not. if token_used_in and make_unique: self.generate_access_token(make_unique, rec_depth) else: self.access_token = token self.save() def is_shared_with(self, user): """ Access check Checks whether a given user has access to this object Args: user (): Returns: """ directly_shared = self.users.filter(id=user.id).exists() team_shared = self.teams.filter( users__in=[user] ).exists() is_shared = directly_shared or team_shared return is_shared def share_with_team(self, team): """ Adds team to list of shared access teans Args: team (Team): The team to be added to the object Returns: """ self.teams.add(team) def share_with_team_list(self, team_list: list): """ Sets the list of shared access teams Args: team_list (list): The teams to be added to the object Returns: """ self.teams.set(team_list) def share_with_user(self, user): """ Adds user to list of shared access users Args: user (User): The user to be added to the object Returns: """ if not self.is_shared_with(user): self.users.add(user) def share_with_user_list(self, user_list: list): """ Sets the list of shared access users Args: user_list (list): The users to be added to the object Returns: """ self.users.set(user_list) def _update_shared_teams(self, form): """ Updates shared access on the object for teams Args: form (ShareModalForm): The form holding the data Returns: """ form_data = form.cleaned_data shared_teams = self.shared_teams # Fetch selected teams and find out which user IDs are in removed teams -> mails need to be sent accessing_teams = form_data["teams"] removed_teams = shared_teams.exclude( id__in=accessing_teams ).values_list("id", flat=True) new_teams = accessing_teams.exclude( id__in=shared_teams ).values_list("id", flat=True) for team_id in new_teams: celery_send_mail_shared_access_given_team.delay(self.identifier, self.title, team_id) for team_id in removed_teams: celery_send_mail_shared_access_removed_team.delay(self.identifier, self.title, team_id) self.share_with_team_list(accessing_teams) def _update_shared_users(self, form): """ Updates shared access on the object for single users Args: form (ShareModalForm): The form holding the data Returns: """ form_data = form.cleaned_data shared_users = self.shared_users # Fetch selected users accessing_users = form_data["users"] removed_users = shared_users.exclude( id__in=accessing_users ).values_list("id", flat=True) new_users = accessing_users.exclude( id__in=shared_users ).values_list("id", flat=True) # Send mails for user_id in removed_users: celery_send_mail_shared_access_removed.delay(self.identifier, self.title, user_id) for user_id in new_users: celery_send_mail_shared_access_given.delay(self.identifier, self.title, user_id) # Set new shared users self.share_with_user_list(accessing_users) def update_shared_access(self, form): """ Updates shared access on the object Args: form (ShareModalForm): The form holding the data Returns: """ self._update_shared_teams(form) self._update_shared_users(form) @property def shared_users(self) -> QuerySet: """ Shortcut for fetching the users which have shared access on this object Returns: users (QuerySet) """ return self.users.all() @property def shared_teams(self) -> QuerySet: """ Shortcut for fetching the teams which have shared access on this object Returns: teams (QuerySet) """ return self.teams.all() @abstractmethod def get_share_url(self): """ Returns the share url for the object Returns: """ raise NotImplementedError("Must be implemented in subclasses!") def unshare_with_default_users(self): """ Removes all shared users from direct shared access which are only default group users Returns: """ from konova.utils.user_checks import is_default_group_only users = self.shared_users cleaned_users = [] default_users = [] for user in users: if not is_default_group_only(user): cleaned_users.append(user) else: default_users.append(user) self.share_with_user_list(cleaned_users) for user in default_users: celery_send_mail_shared_access_removed.delay(self.identifier, self.title, user.id) class GeoReferencedMixin(models.Model): geometry = models.ForeignKey("konova.Geometry", null=True, blank=True, on_delete=models.SET_NULL) class Meta: abstract = True def get_underlying_parcels(self): """ Getter for related parcels Returns: parcels (Iterable): An empty list or a Queryset """ if self.geometry is not None: return self.geometry.get_underlying_parcels() else: return [] def set_geometry_conflict_message(self, request: HttpRequest): if self.geometry is None: return request instance_objs = [] add_message = False conflicts = self.geometry.conflicts_geometries.all() for conflict in conflicts: instance_objs += conflict.affected_geometry.get_data_objects() add_message = True conflicts = self.geometry.conflicted_by_geometries.all() for conflict in conflicts: instance_objs += conflict.conflicting_geometry.get_data_objects() add_message = True if add_message: instance_identifiers = [x.identifier for x in instance_objs] instance_identifiers = ", ".join(instance_identifiers) message_str = GEOMETRY_CONFLICT_WITH_TEMPLATE.format(instance_identifiers) messages.info(request, message_str) return request def get_LANIS_link(self) -> str: """ Generates a link for LANIS depending on the geometry Returns: """ try: geom = self.geometry.geom.transform(DEFAULT_SRID_RLP, clone=True) x = geom.centroid.x y = geom.centroid.y area = int(geom.envelope.area) z_l = 16 for k_area, v_zoom in LANIS_ZOOM_LUT.items(): if k_area < area: z_l = v_zoom break zoom_lvl = z_l except (AttributeError, IndexError) as e: # If no geometry has been added, yet. x = 1 y = 1 zoom_lvl = 6 return LANIS_LINK_TEMPLATE.format( zoom_lvl, x, y, )