""" Author: Michel Peltriaux Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany Contact: michel.peltriaux@sgdnord.rlp.de Created on: 21.01.22 """ import json from django.db.models import QuerySet from django.http import JsonResponse, HttpRequest from django.views import View from django.views.decorators.csrf import csrf_exempt from api.models import APIUserToken from api.settings import KSP_TOKEN_HEADER_IDENTIFIER, KSP_USER_HEADER_IDENTIFIER from compensation.models import EcoAccount from ema.models import Ema from intervention.models import Intervention from konova.utils.message_templates import DATA_UNSHARED from konova.utils.user_checks import is_default_group_only from user.models import User class AbstractAPIView(View): """ Base class for API views The API must follow the GeoJSON Specification RFC 7946 https://geojson.org/ https://datatracker.ietf.org/doc/html/rfc7946 """ user = None serializer = None rpp = 5 # Results per page default page_number = 1 # Page number default class Meta: abstract = True def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.response_body_base = { "rpp": None, "p": None, "next": None, "results": None } @csrf_exempt def dispatch(self, request, *args, **kwargs): try: # Fetch the proper user from the given request header token ksp_token = request.headers.get(KSP_TOKEN_HEADER_IDENTIFIER, None) ksp_user = request.headers.get(KSP_USER_HEADER_IDENTIFIER, None) self.user = APIUserToken.get_user_from_token(ksp_token, ksp_user) request.user = self.user if not self.user.is_default_user(): raise PermissionError("Default permissions required") except PermissionError as e: return self._return_error_response(e, 403) return super().dispatch(request, *args, **kwargs) def _return_error_response(self, error, status_code=500): """ Returns an error as JsonReponse Args: error (): The error/exception status_code (): The desired status code Returns: """ content = [error.__str__()] if hasattr(error, "messages"): content = error.messages return JsonResponse( { "errors": content }, status=status_code ) def _return_response(self, request: HttpRequest, data): """ Returns all important data into a response object Args: request (HttpRequest): The incoming request data (dict): The serialized data Returns: response (JsonResponse): The response to be returned """ response = self.response_body_base next_page = self.page_number + 1 next_page = next_page if next_page in self.serializer.paginator.page_range else None if next_page is not None: next_url = request.build_absolute_uri( request.path + f"?rpp={self.rpp}&p={next_page}" ) else: next_url = None response["rpp"] = self.rpp response["p"] = self.page_number response["next"] = next_url response["results"] = data return JsonResponse(response) class InterventionCheckAPIView(AbstractAPIView): def get(self, request: HttpRequest, id): """ Takes the GET request Args: request (HttpRequest): The incoming request id (str): The intervention's id Returns: response (JsonResponse) """ if not self.user.is_zb_user(): return self._return_error_response("Permission not granted", 403) try: obj = Intervention.objects.get( id=id, users__in=[self.user] ) except Exception as e: return self._return_error_response(e) all_valid, check_details = self.run_quality_checks(obj) if all_valid: log_entry = obj.set_checked(self.user) obj.log.add(log_entry) data = { "success": all_valid, "details": check_details } return JsonResponse(data) def run_quality_checks(self, obj: Intervention) -> (bool, dict): """ Performs a check for intervention and related compensations Args: obj (Intervention): The intervention Returns: all_valid (boold): Whether an error occured or not check_details (dict): A dict containg details on which elements have errors """ # Run quality check for Intervention all_valid = True intervention_checker = obj.quality_check() all_valid = intervention_checker.valid and all_valid # Run quality checks for linked compensations comps = obj.compensations.all() comp_checkers = [] for comp in comps: comp_checker = comp.quality_check() comp_checkers.append(comp_checker) all_valid = comp_checker.valid and all_valid check_details = { "intervention": { "id": obj.id, "errors": intervention_checker.messages }, "compensations": [ { "id": comp_checker.obj.id, "errors": comp_checker.messages } for comp_checker in comp_checkers ] } return all_valid, check_details class AbstractModelShareAPIView(AbstractAPIView): model = None class Meta: abstract = True def get(self, request: HttpRequest, id): """ Performs the GET request handling Args: request (HttpRequest): The incoming request id (str): The object's id Returns: """ try: users = self._get_shared_users_of_object(id) except Exception as e: return self._return_error_response(e) data = { "users": [ user.username for user in users ] } return JsonResponse(data) def put(self, request: HttpRequest, id): """ Performs the PUT request handling Args: request (HttpRequest): The incoming request id (str): The object's id Returns: """ try: success = self._process_put_body(request.body, id) except Exception as e: return self._return_error_response(e) data = { "success": success, } return JsonResponse(data) def _check_user_has_shared_access(self, obj): """ Raises a PermissionError if user has no shared access Args: obj (BaseObject): The object Returns: """ is_shared = obj.is_shared_with(self.user) if not is_shared: raise PermissionError(DATA_UNSHARED) def _get_shared_users_of_object(self, id) -> QuerySet: """ Check permissions and get the users Args: id (str): The object's id Returns: users (QuerySet) """ obj = self.model.objects.get( id=id ) self._check_user_has_shared_access(obj) users = obj.shared_users return users def _process_put_body(self, body: bytes, id: str): """ Reads the body data, performs validity checks and sets the new users Args: body (bytes): The request.body id (str): The object's id Returns: success (bool) """ obj = self.model.objects.get(id=id) self._check_user_has_shared_access(obj) new_users = json.loads(body.decode("utf-8")) new_users = new_users.get("users", []) if len(new_users) == 0: raise ValueError("Shared user list must not be empty!") # Eliminate duplicates new_users = list(dict.fromkeys(new_users)) # Make sure each of these names exist as a user new_users_objs = [] for user in new_users: new_users_objs.append(User.objects.get(username=user)) if is_default_group_only(self.user): # Default only users are not allowed to remove other users from having access. They can only add new ones! new_users_to_be_added = User.objects.filter( username__in=new_users ).exclude( id__in=obj.shared_users ) new_users_objs = obj.shared_users.union(new_users_to_be_added) obj.share_with_list(new_users_objs) return True class InterventionAPIShareView(AbstractModelShareAPIView): model = Intervention class EcoAccountAPIShareView(AbstractModelShareAPIView): model = EcoAccount class EmaAPIShareView(AbstractModelShareAPIView): model = Ema