* adds sso_identifier as new User model attribute * refactors minor code snippets on user-propagation data resolving * adds updating of username based on propagation data * adds sso_identifier on admin backend view
319 lines
9.4 KiB
Python
319 lines
9.4 KiB
Python
"""
|
|
Author: Michel Peltriaux
|
|
Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany
|
|
Contact: michel.peltriaux@sgdnord.rlp.de
|
|
Created on: 15.11.21
|
|
|
|
"""
|
|
from django.contrib.auth.models import AbstractUser
|
|
from django.core.exceptions import ObjectDoesNotExist
|
|
|
|
from django.db import models
|
|
|
|
from api.models import APIUserToken, OAuthToken
|
|
from konova.settings import ZB_GROUP, DEFAULT_GROUP, ETS_GROUP
|
|
from konova.utils.mailer import Mailer
|
|
from user.enums import UserNotificationEnum
|
|
|
|
|
|
class User(AbstractUser):
|
|
notifications = models.ManyToManyField("user.UserNotification", related_name="+", blank=True)
|
|
api_token = models.OneToOneField(
|
|
"api.APIUserToken",
|
|
blank=True,
|
|
null=True,
|
|
help_text="The user's API token",
|
|
on_delete=models.SET_NULL
|
|
)
|
|
oauth_token = models.ForeignKey(
|
|
"api.OAuthToken",
|
|
blank=True,
|
|
null=True,
|
|
on_delete=models.SET_NULL,
|
|
db_comment="OAuth token for the user",
|
|
related_name="+"
|
|
)
|
|
sso_identifier = models.CharField(
|
|
blank=True,
|
|
null=True,
|
|
db_comment="Identifies the account based on an unique identifier from the SSO system",
|
|
max_length=255,
|
|
)
|
|
|
|
def is_notification_setting_set(self, notification_enum: UserNotificationEnum):
|
|
return self.notifications.filter(
|
|
id=notification_enum.value
|
|
).exists()
|
|
|
|
def is_zb_user(self):
|
|
""" Shortcut for checking whether a user is of a special group or not
|
|
|
|
Returns:
|
|
bool
|
|
"""
|
|
return self.groups.filter(
|
|
name=ZB_GROUP
|
|
).exists()
|
|
|
|
def is_default_user(self):
|
|
""" Shortcut for checking whether a user is of a special group or not
|
|
|
|
Returns:
|
|
bool
|
|
"""
|
|
return self.groups.filter(
|
|
name=DEFAULT_GROUP
|
|
).exists()
|
|
|
|
def is_ets_user(self):
|
|
""" Shortcut for checking whether a user is of a special group or not
|
|
|
|
Returns:
|
|
bool
|
|
"""
|
|
return self.groups.filter(
|
|
name=ETS_GROUP
|
|
).exists()
|
|
|
|
def is_default_group_only(self) -> bool:
|
|
""" Checks if the user is only part of the default group
|
|
|
|
Args:
|
|
|
|
Returns:
|
|
bool
|
|
"""
|
|
return not self.in_group(ZB_GROUP) and not self.in_group(ETS_GROUP)
|
|
|
|
def in_group(self, group: str) -> bool:
|
|
""" Checks if the user is part of a group
|
|
|
|
Args:
|
|
group (str): The group's name
|
|
|
|
Returns:
|
|
bool
|
|
"""
|
|
return self.groups.filter(
|
|
name=group
|
|
)
|
|
|
|
def send_mail_shared_access_removed(self, obj, municipals_names):
|
|
""" Sends a mail to the user in case of removed shared access
|
|
|
|
Args:
|
|
obj ():
|
|
municipals_names ():
|
|
|
|
Returns:
|
|
|
|
"""
|
|
notification_set = self.is_notification_setting_set(UserNotificationEnum.NOTIFY_ON_SHARED_ACCESS_REMOVED)
|
|
if notification_set:
|
|
mailer = Mailer()
|
|
mailer.send_mail_shared_access_removed(obj, self, municipals_names)
|
|
|
|
def send_mail_shared_access_given(self, obj, municipals_names):
|
|
""" Sends a mail to the user in case of given shared access
|
|
|
|
Args:
|
|
obj (): The entry
|
|
municipals_names (iterable): List of municipals for this entry
|
|
|
|
Returns:
|
|
|
|
"""
|
|
notification_set = self.is_notification_setting_set(UserNotificationEnum.NOTIFY_ON_SHARED_ACCESS_GAINED)
|
|
if notification_set:
|
|
mailer = Mailer()
|
|
mailer.send_mail_shared_access_given(obj, self, municipals_names)
|
|
|
|
def send_mail_shared_data_recorded(self, obj, municipals_names):
|
|
""" Sends a mail to the user in case of shared data has been recorded
|
|
|
|
Args:
|
|
obj (): The entry
|
|
municipals_names (iterable): List of municipals for this entry
|
|
|
|
Returns:
|
|
|
|
"""
|
|
notification_set = self.is_notification_setting_set(UserNotificationEnum.NOTIFY_ON_SHARED_DATA_RECORDED)
|
|
if notification_set:
|
|
mailer = Mailer()
|
|
mailer.send_mail_shared_data_recorded(obj, self, municipals_names)
|
|
|
|
def send_mail_shared_data_unrecorded(self, obj, municipals_names):
|
|
""" Sends a mail to the user in case of shared data has been unrecorded
|
|
|
|
Args:
|
|
obj (): The entry
|
|
municipals_names (iterable): List of municipals for this entry
|
|
|
|
Returns:
|
|
|
|
"""
|
|
notification_set = self.is_notification_setting_set(UserNotificationEnum.NOTIFY_ON_SHARED_DATA_RECORDED)
|
|
if notification_set:
|
|
mailer = Mailer()
|
|
mailer.send_mail_shared_data_unrecorded(obj, self, municipals_names)
|
|
|
|
def send_mail_shared_data_deleted(self, obj, municipals_names):
|
|
""" Sends a mail to the user in case of shared data has been deleted
|
|
|
|
Args:
|
|
obj (): The entry
|
|
municipals_names (iterable): List of municipals for this entry
|
|
|
|
Returns:
|
|
|
|
"""
|
|
notification_set = self.is_notification_setting_set(UserNotificationEnum.NOTIFY_ON_SHARED_DATA_DELETED)
|
|
if notification_set:
|
|
mailer = Mailer()
|
|
mailer.send_mail_shared_data_deleted(obj, self, municipals_names)
|
|
|
|
def send_mail_shared_data_checked(self, obj, municipals_names):
|
|
""" Sends a mail to the user in case of shared data has been deleted
|
|
|
|
Args:
|
|
obj (): The entry
|
|
municipals_names (iterable): List of municipals for this entry
|
|
|
|
Returns:
|
|
|
|
"""
|
|
notification_set = self.is_notification_setting_set(UserNotificationEnum.NOTIFY_ON_SHARED_DATA_CHECKED)
|
|
if notification_set:
|
|
mailer = Mailer()
|
|
mailer.send_mail_shared_data_checked(obj, self, municipals_names)
|
|
|
|
def send_mail_deduction_changed(self, obj, data_changes):
|
|
""" Sends a mail to the user in case of a changed deduction
|
|
|
|
Args:
|
|
obj (): The object
|
|
data_changes (dict): Contains the old|new changes of the deduction changes
|
|
|
|
Returns:
|
|
|
|
"""
|
|
notification_set = self.is_notification_setting_set(UserNotificationEnum.NOTIFY_ON_DEDUCTION_CHANGES)
|
|
if notification_set:
|
|
mailer = Mailer()
|
|
mailer.send_mail_deduction_changed(obj, self, data_changes)
|
|
|
|
def get_API_token(self):
|
|
""" Getter for an API token
|
|
|
|
Creates a new one if none exists, yet.
|
|
|
|
Returns:
|
|
token (APIUserToken)
|
|
"""
|
|
if self.api_token is None:
|
|
token = APIUserToken.objects.create()
|
|
self.api_token = token
|
|
self.save()
|
|
else:
|
|
token = self.api_token
|
|
return token
|
|
|
|
@property
|
|
def shared_teams(self):
|
|
""" Wrapper for fetching active teams of this user
|
|
|
|
Returns:
|
|
|
|
"""
|
|
shared_teams = self.teams.filter(
|
|
deleted__isnull=True
|
|
)
|
|
return shared_teams
|
|
|
|
@staticmethod
|
|
def oauth_update_user(user_data: dict):
|
|
"""
|
|
Get or create a user depending on given user_data.
|
|
If the user record already exists, it's data will be updated using user_data.
|
|
|
|
Args:
|
|
user_data (dict): User data from OAuth SSO component
|
|
|
|
Returns:
|
|
user (User): The resolved user
|
|
"""
|
|
username = user_data.get("username")
|
|
user, is_created = User.objects.get_or_create(
|
|
username=username
|
|
)
|
|
|
|
if is_created:
|
|
user.set_unusable_password()
|
|
|
|
user.first_name = user_data.get("first_name")
|
|
user.last_name = user_data.get("last_name")
|
|
user.email = user_data.get("email")
|
|
|
|
return user
|
|
|
|
def oauth_replace_token(self, token: OAuthToken):
|
|
"""
|
|
Drops old token (if existing) and stores given token.
|
|
|
|
Args:
|
|
token (OAuthToken): New token
|
|
|
|
Returns:
|
|
user (User)
|
|
"""
|
|
if self.oauth_token:
|
|
self.oauth_token.delete()
|
|
self.oauth_token = token
|
|
self.save()
|
|
return self
|
|
|
|
@staticmethod
|
|
def resolve_user_using_propagation_data(data: dict):
|
|
""" Fetches user from db by the given data from propagation process
|
|
|
|
Args:
|
|
data (dict): json containing user information from the sso system
|
|
|
|
Returns:
|
|
user (User): The resolved user
|
|
"""
|
|
username = data.get("username", None)
|
|
sso_identifier = data.get("sso_identifier", None)
|
|
if not username and not sso_identifier:
|
|
raise AssertionError("No username or sso identifier provided")
|
|
|
|
try:
|
|
user = User.objects.get(username=username)
|
|
except ObjectDoesNotExist:
|
|
try:
|
|
user = User.objects.get(sso_identifier=sso_identifier)
|
|
except ObjectDoesNotExist:
|
|
raise ObjectDoesNotExist("No user with this username or sso identifier was found")
|
|
|
|
return user
|
|
|
|
def update_user_using_propagation_data(self, data: dict):
|
|
""" Update user data based on propagation data from sso system
|
|
|
|
Args:
|
|
data (dict): json containing user information from the sso system
|
|
|
|
Returns:
|
|
user (User): The updated user
|
|
"""
|
|
skipable_attrs = {
|
|
"is_staff",
|
|
"is_superuser",
|
|
}
|
|
for _attr, _val in data.items():
|
|
if _attr in skipable_attrs:
|
|
continue
|
|
setattr(self, _attr, _val)
|
|
return self
|