You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
konova/user/models/user.py

267 lines
7.7 KiB
Python

"""
Author: Michel Peltriaux
Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany
Contact: michel.peltriaux@sgdnord.rlp.de
Created on: 15.11.21
"""
from django.contrib.auth.models import AbstractUser
from django.db import models
from api.models import APIUserToken, OAuthToken
from konova.settings import ZB_GROUP, DEFAULT_GROUP, ETS_GROUP
from konova.utils.mailer import Mailer
from user.enums import UserNotificationEnum
class User(AbstractUser):
    """ Project user model based on Django's AbstractUser

    Extends the default user by notification settings, an API token and an
    OAuth token, and bundles shortcuts for group checks and notification mails.

    """
    # Notification settings of the user; queried by is_notification_setting_set()
    notifications = models.ManyToManyField("user.UserNotification", related_name="+", blank=True)
    api_token = models.OneToOneField(
        "api.APIUserToken",
        blank=True,
        null=True,
        help_text="The user's API token",
        on_delete=models.SET_NULL
    )
    oauth_token = models.ForeignKey(
        "api.OAuthToken",
        blank=True,
        null=True,
        on_delete=models.SET_NULL,
        db_comment="OAuth token for the user",
        related_name="+"
    )

    def is_notification_setting_set(self, notification_enum: UserNotificationEnum) -> bool:
        """ Checks whether a notification setting is part of the user's notifications

        Args:
            notification_enum (UserNotificationEnum): The setting; its value is matched against the notification id

        Returns:
            bool
        """
        return self.notifications.filter(id=notification_enum.value).exists()

    def is_zb_user(self) -> bool:
        """ Shortcut for checking whether the user is part of ZB_GROUP

        Returns:
            bool
        """
        return self.in_group(ZB_GROUP)

    def is_default_user(self) -> bool:
        """ Shortcut for checking whether the user is part of DEFAULT_GROUP

        Returns:
            bool
        """
        return self.in_group(DEFAULT_GROUP)

    def is_ets_user(self) -> bool:
        """ Shortcut for checking whether the user is part of ETS_GROUP

        Returns:
            bool
        """
        return self.in_group(ETS_GROUP)

    def is_default_group_only(self) -> bool:
        """ Checks that the user is neither part of ZB_GROUP nor of ETS_GROUP

        Membership in DEFAULT_GROUP itself is not verified here.

        Returns:
            bool
        """
        return not (self.in_group(ZB_GROUP) or self.in_group(ETS_GROUP))

    def in_group(self, group: str) -> bool:
        """ Checks if the user is part of a group

        Args:
            group (str): The group's name

        Returns:
            bool
        """
        # exists() yields a real bool (as annotated) instead of a lazy queryset
        return self.groups.filter(name=group).exists()

    def send_mail_shared_access_removed(self, obj, municipals_names):
        """ Sends a mail to the user in case of removed shared access

        The mail is only sent if the user has the matching notification setting.

        Args:
            obj (): The entry
            municipals_names (iterable): List of municipals for this entry

        Returns:

        """
        if self.is_notification_setting_set(UserNotificationEnum.NOTIFY_ON_SHARED_ACCESS_REMOVED):
            Mailer().send_mail_shared_access_removed(obj, self, municipals_names)

    def send_mail_shared_access_given(self, obj, municipals_names):
        """ Sends a mail to the user in case of given shared access

        The mail is only sent if the user has the matching notification setting.

        Args:
            obj (): The entry
            municipals_names (iterable): List of municipals for this entry

        Returns:

        """
        if self.is_notification_setting_set(UserNotificationEnum.NOTIFY_ON_SHARED_ACCESS_GAINED):
            Mailer().send_mail_shared_access_given(obj, self, municipals_names)

    def send_mail_shared_data_recorded(self, obj, municipals_names):
        """ Sends a mail to the user in case of shared data has been recorded

        The mail is only sent if the user has the matching notification setting.

        Args:
            obj (): The entry
            municipals_names (iterable): List of municipals for this entry

        Returns:

        """
        if self.is_notification_setting_set(UserNotificationEnum.NOTIFY_ON_SHARED_DATA_RECORDED):
            Mailer().send_mail_shared_data_recorded(obj, self, municipals_names)

    def send_mail_shared_data_unrecorded(self, obj, municipals_names):
        """ Sends a mail to the user in case of shared data has been unrecorded

        Args:
            obj (): The entry
            municipals_names (iterable): List of municipals for this entry

        Returns:

        """
        # NOTE(review): this reads the RECORDED setting; presumably one setting
        # covers both recording and unrecording -- confirm before changing.
        if self.is_notification_setting_set(UserNotificationEnum.NOTIFY_ON_SHARED_DATA_RECORDED):
            Mailer().send_mail_shared_data_unrecorded(obj, self, municipals_names)

    def send_mail_shared_data_deleted(self, obj, municipals_names):
        """ Sends a mail to the user in case of shared data has been deleted

        The mail is only sent if the user has the matching notification setting.

        Args:
            obj (): The entry
            municipals_names (iterable): List of municipals for this entry

        Returns:

        """
        if self.is_notification_setting_set(UserNotificationEnum.NOTIFY_ON_SHARED_DATA_DELETED):
            Mailer().send_mail_shared_data_deleted(obj, self, municipals_names)

    def send_mail_shared_data_checked(self, obj, municipals_names):
        """ Sends a mail to the user in case of shared data has been checked

        The mail is only sent if the user has the matching notification setting.

        Args:
            obj (): The entry
            municipals_names (iterable): List of municipals for this entry

        Returns:

        """
        if self.is_notification_setting_set(UserNotificationEnum.NOTIFY_ON_SHARED_DATA_CHECKED):
            Mailer().send_mail_shared_data_checked(obj, self, municipals_names)

    def send_mail_deduction_changed(self, obj, data_changes):
        """ Sends a mail to the user in case of a changed deduction

        The mail is only sent if the user has the matching notification setting.

        Args:
            obj (): The object
            data_changes (dict): Contains the old|new changes of the deduction changes

        Returns:

        """
        if self.is_notification_setting_set(UserNotificationEnum.NOTIFY_ON_DEDUCTION_CHANGES):
            Mailer().send_mail_deduction_changed(obj, self, data_changes)

    def get_API_token(self):
        """ Getter for an API token

        Creates a new one and saves the user if none exists, yet.

        Returns:
            token (APIUserToken)
        """
        if self.api_token is None:
            self.api_token = APIUserToken.objects.create()
            self.save()
        return self.api_token

    @property
    def shared_teams(self):
        """ Wrapper for fetching the teams of this user whose 'deleted' field is null

        Returns:
            The filtered result of self.teams (accessor is declared outside of this class)
        """
        return self.teams.filter(deleted__isnull=True)

    @staticmethod
    def oauth_update_user(user_data: dict):
        """ Get or create a user depending on given user_data.

        The user is looked up by user_data["username"]. A newly created user
        gets an unusable password. First name, last name and email are then
        overwritten from user_data on the returned instance.

        These attribute changes are NOT saved by this method; the caller has to
        persist them (e.g. oauth_replace_token() calls save()).

        Args:
            user_data (dict): User data from OAuth SSO component

        Returns:
            user (User): The resolved user
        """
        user, is_created = User.objects.get_or_create(
            username=user_data.get("username")
        )
        if is_created:
            user.set_unusable_password()
        user.first_name = user_data.get("first_name")
        user.last_name = user_data.get("last_name")
        user.email = user_data.get("email")
        return user

    def oauth_replace_token(self, token: OAuthToken):
        """ Drops old token (if existing) and stores given token.

        Args:
            token (OAuthToken): New token

        Returns:
            user (User)
        """
        current_token = self.oauth_token
        # Do not delete the stored token if it is the very token being set,
        # otherwise the user would be saved with a reference to a deleted row
        if current_token is not None and current_token != token:
            current_token.delete()
        self.oauth_token = token
        self.save()
        return self