""" Author: Michel Peltriaux Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany Contact: ksp-servicestelle@sgdnord.rlp.de Created on: 10.05.24 """ import base64 import hashlib import http import json from cryptography.fernet import Fernet, InvalidToken from django.core.exceptions import ObjectDoesNotExist from django.http import HttpRequest, JsonResponse from django.utils.decorators import method_decorator from django.views import View from django.views.decorators.csrf import csrf_exempt from konova.sub_settings.sso_settings import PROPAGATION_SECRET from user.models import User class PropagateUserView(View): """ Receives user data to be stored in db Used if new user gets access to application and needs to be created in application before first login (e.g. proper rights management) """ class PropagateStatus: UNPROCESSED = "unprocessed" UPDATED = "updated" CREATED = "created" @method_decorator(csrf_exempt) def dispatch(self, request, *args, **kwargs): return super().dispatch(request, *args, **kwargs) def post(self, request: HttpRequest, *args, **kwargs): response_data = { "success": None, "status": None } # Decrypt encrypted_body = request.body _hash = hashlib.md5() _hash.update(PROPAGATION_SECRET.encode("utf-8")) key = base64.urlsafe_b64encode(_hash.hexdigest().encode("utf-8")) fernet = Fernet(key) try: body = fernet.decrypt(encrypted_body).decode("utf-8") body = json.loads(body) except InvalidToken: response_data["error"] = "Invalid Token" response_data["success"] = False response_data["status"] = self.PropagateStatus.UNPROCESSED return JsonResponse( status=http.HTTPStatus.UNPROCESSABLE_CONTENT, data=response_data ) except (json.JSONDecodeError) as e: response_data["error"] = str(e) response_data["success"] = False response_data["status"] = self.PropagateStatus.UNPROCESSED return JsonResponse( status=http.HTTPStatus.UNPROCESSABLE_CONTENT, data=response_data ) # Process decrypted user data processing_ret_vals = self.__process_user_data(body) response_data["success"] = processing_ret_vals[0] response_data["status"] = processing_ret_vals[1] user = processing_ret_vals[2] user.set_unusable_password() user.save() return JsonResponse( status=http.HTTPStatus.OK, data=response_data ) def __process_user_data(self, body: dict) -> (bool, str, User): """ Process decrypted user data Args: body: Returns: success (bool): Whether the processing was successful status (str): In which way the data was used ('created' | 'updated') user (User): Processed user object """ try: user = User.resolve_user_using_propagation_data(body) user = user.update_user_using_propagation_data(body) status = self.PropagateStatus.UPDATED except ObjectDoesNotExist: user = User(**body) status = self.PropagateStatus.CREATED return True, status, user