You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
konova/api/views/views.py

355 lines
10 KiB
Python

"""
Author: Michel Peltriaux
Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany
Contact: michel.peltriaux@sgdnord.rlp.de
Created on: 21.01.22
"""
import json
from typing import Tuple

from django.db.models import QuerySet
from django.http import JsonResponse, HttpRequest
from django.views import View
from django.views.decorators.csrf import csrf_exempt

from api.models import APIUserToken
from api.settings import KSP_TOKEN_HEADER_IDENTIFIER, KSP_USER_HEADER_IDENTIFIER
from compensation.models import EcoAccount
from ema.models import Ema
from intervention.models import Intervention
from konova.utils.message_templates import DATA_UNSHARED
from konova.utils.user_checks import is_default_group_only
from user.models import User, Team
class AbstractAPIView(View):
    """ Common base class of the API views

    Resolves the requesting user from the token and user name request headers
    and offers helpers for building JSON responses.

    The API must follow the GeoJSON Specification RFC 7946
    https://geojson.org/
    https://datatracker.ietf.org/doc/html/rfc7946

    """
    user = None
    serializer = None
    rpp = 5  # Default number of results per page
    page_number = 1  # Default page number

    class Meta:
        abstract = True

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Skeleton of a paginated response body, filled by _return_response()
        self.response_body_base = dict(rpp=None, p=None, next=None, results=None)

    @csrf_exempt
    def dispatch(self, request, *args, **kwargs):
        """ Resolves the API user before the request is handed to the regular dispatching

        The user belonging to the token header must carry the user name given in the
        user header and must pass is_default_user(). Any PermissionError raised on
        the way is answered with a 403 error response.

        Args:
            request (HttpRequest): The incoming request
            *args (): Passed through to the parent dispatch
            **kwargs (): Passed through to the parent dispatch

        Returns:
            response (HttpResponse): The error response or the result of the parent dispatch
        """
        try:
            # Fetch the proper user from the given request header token
            header_token = request.headers.get(KSP_TOKEN_HEADER_IDENTIFIER, None)
            header_username = request.headers.get(KSP_USER_HEADER_IDENTIFIER, None)
            resolved_user = APIUserToken.get_user_from_token(header_token)
            if header_username != resolved_user.username:
                raise PermissionError(f"Invalid token for {header_username}")

            self.user = resolved_user
            request.user = resolved_user
            if not resolved_user.is_default_user():
                raise PermissionError("Default permissions required")
        except PermissionError as e:
            return self._return_error_response(e, 403)
        return super().dispatch(request, *args, **kwargs)

    def _return_error_response(self, error, status_code=500):
        """ Wraps an error into a JsonResponse

        Args:
            error (): The error/exception (or a plain message)
            status_code (): The desired status code

        Returns:
            response (JsonResponse): Holds the error messages under the key "errors"
        """
        if hasattr(error, "messages"):
            # Prefer the message list, if the error provides one
            error_messages = error.messages
        else:
            error_messages = [str(error)]
        return JsonResponse({"errors": error_messages}, status=status_code)

    def _return_response(self, request: HttpRequest, data):
        """ Puts the serialized data and the pagination info into a response object

        Args:
            request (HttpRequest): The incoming request
            data (dict): The serialized data

        Returns:
            response (JsonResponse): The response to be returned
        """
        following_page = self.page_number + 1
        next_url = None
        # Only link to a next page if the serializer's paginator knows it
        if following_page in self.serializer.paginator.page_range:
            next_url = request.build_absolute_uri(
                f"{request.path}?rpp={self.rpp}&p={following_page}"
            )

        body = self.response_body_base
        body.update(
            rpp=self.rpp,
            p=self.page_number,
            next=next_url,
            results=data,
        )
        return JsonResponse(body)
class InterventionCheckAPIView(AbstractAPIView):
    """ API view which runs the quality checks of an intervention

    If the intervention and all of its compensations pass, the intervention is
    marked as checked by the requesting user.

    """

    def get(self, request: HttpRequest, id):
        """ Takes the GET request

        Args:
            request (HttpRequest): The incoming request
            id (str): The intervention's id

        Returns:
            response (JsonResponse): Holds "success" and the check "details", or the errors
        """
        if not self.user.is_zb_user():
            return self._return_error_response("Permission not granted", 403)

        try:
            # Only interventions which list the requesting user in `users` can be found
            obj = Intervention.objects.get(
                id=id,
                users__in=[self.user]
            )
        except Exception as e:
            return self._return_error_response(e)

        all_valid, check_details = self.run_quality_checks(obj)

        if all_valid:
            log_entry = obj.set_checked(self.user)
            obj.log.add(log_entry)

        data = {
            "success": all_valid,
            "details": check_details
        }
        return JsonResponse(data)

    def run_quality_checks(self, obj: Intervention) -> Tuple[bool, dict]:
        """ Performs a check for intervention and related compensations

        Args:
            obj (Intervention): The intervention

        Returns:
            all_valid (bool): Whether every checker reported to be valid
            check_details (dict): A dict containing details on which elements have errors
        """
        # Run quality check for Intervention
        intervention_checker = obj.quality_check()
        all_valid = intervention_checker.valid

        # Run quality checks for linked compensations. Every compensation is checked, even if
        # an earlier one failed, so the details list all problems at once.
        comp_checkers = []
        for comp in obj.compensations.all():
            comp_checker = comp.quality_check()
            comp_checkers.append(comp_checker)
            all_valid = comp_checker.valid and all_valid

        check_details = {
            "intervention": {
                "id": obj.id,
                "errors": intervention_checker.messages
            },
            "compensations": [
                {
                    "id": comp_checker.obj.id,
                    "errors": comp_checker.messages
                }
                for comp_checker in comp_checkers
            ]
        }
        return all_valid, check_details
class AbstractModelShareAPIView(AbstractAPIView):
    """ Base class for API views which read or change the sharing state of an object

    Subclasses set `model` to the model class whose entries shall be handled.

    """
    model = None

    class Meta:
        abstract = True

    def get(self, request: HttpRequest, id):
        """ Performs the GET request handling

        Args:
            request (HttpRequest): The incoming request
            id (str): The object's id

        Returns:
            response (JsonResponse): Holds the user names and teams having shared access, or the errors
        """
        try:
            users = self._get_shared_users_of_object(id)
            teams = self._get_shared_teams_of_object(id)
        except Exception as e:
            return self._return_error_response(e)

        data = {
            "users": [
                user.username for user in users
            ],
            "teams": [
                {
                    "id": team.id,
                    "name": team.name,
                }
                for team in teams
            ],
        }
        return JsonResponse(data)

    def put(self, request: HttpRequest, id):
        """ Performs the PUT request handling

        Args:
            request (HttpRequest): The incoming request
            id (str): The object's id

        Returns:
            response (JsonResponse): Holds the "success" state, or the errors
        """
        try:
            success = self._process_put_body(request.body, id)
        except Exception as e:
            return self._return_error_response(e)

        data = {
            "success": success,
        }
        return JsonResponse(data)

    def _check_user_has_shared_access(self, obj):
        """ Raises a PermissionError if user has no shared access

        Args:
            obj (BaseObject): The object

        Returns:

        Raises:
            PermissionError: If obj.is_shared_with() denies the requesting user
        """
        if not obj.is_shared_with(self.user):
            raise PermissionError(DATA_UNSHARED)

    def _get_shared_users_of_object(self, id) -> QuerySet:
        """ Check permissions and get the users

        Args:
            id (str): The object's id

        Returns:
            users (QuerySet)
        """
        obj = self.model.objects.get(
            id=id
        )
        self._check_user_has_shared_access(obj)
        return obj.shared_users

    def _get_shared_teams_of_object(self, id) -> QuerySet:
        """ Check permissions and get the teams

        Args:
            id (str): The object's id

        Returns:
            teams (QuerySet)
        """
        obj = self.model.objects.get(
            id=id
        )
        self._check_user_has_shared_access(obj)
        return obj.shared_teams

    def _process_put_body(self, body: bytes, id: str):
        """ Reads the body data, performs validity checks and sets the new users and teams

        The body is expected to be a JSON object holding a non-empty list of user names
        under "users" and optionally a list of team names under "teams". A JSON null is
        treated like a missing key.

        Args:
            body (bytes): The request.body
            id (str): The object's id

        Returns:
            success (bool)

        Raises:
            PermissionError: If the requesting user has no shared access on the object
            ValueError: If the body is malformed or the user list is empty
        """
        obj = self.model.objects.get(id=id)
        self._check_user_has_shared_access(obj)

        content = json.loads(body.decode("utf-8"))
        if not isinstance(content, dict):
            raise ValueError("Request body must be a JSON object!")

        new_users = content.get("users") or []
        if not isinstance(new_users, list):
            raise ValueError("Shared user list must be a list of user names!")
        if not new_users:
            raise ValueError("Shared user list must not be empty!")

        new_teams = content.get("teams") or []
        if not isinstance(new_teams, list):
            raise ValueError("Shared team list must be a list of team names!")

        # Eliminate duplicates, keeping the given order
        new_users = list(dict.fromkeys(new_users))
        new_teams = list(dict.fromkeys(new_teams))

        # Make sure each of these names exist as a user (the lookup raises otherwise)
        new_users_objs = [User.objects.get(username=user_name) for user_name in new_users]

        # Make sure each of these names exist as a team (the lookup raises otherwise)
        new_teams_objs = [Team.objects.get(name=team_name) for team_name in new_teams]

        if is_default_group_only(self.user):
            # Default only users are not allowed to remove other users from having access. They can only add new ones!
            # Therefore the current entries are kept and only extended by the ones not shared with, yet.
            new_users_to_be_added = User.objects.filter(
                username__in=new_users
            ).exclude(
                id__in=obj.shared_users
            )
            new_users_objs = obj.shared_users.union(new_users_to_be_added)

            new_teams_to_be_added = Team.objects.filter(
                name__in=new_teams
            ).exclude(
                id__in=obj.shared_teams
            )
            new_teams_objs = obj.shared_teams.union(new_teams_to_be_added)

        obj.share_with_user_list(new_users_objs)
        obj.share_with_team_list(new_teams_objs)
        return True
class InterventionAPIShareView(AbstractModelShareAPIView):
    """ Share API view bound to the Intervention model """
    model = Intervention
class EcoAccountAPIShareView(AbstractModelShareAPIView):
    """ Share API view bound to the EcoAccount model """
    model = EcoAccount
class EmaAPIShareView(AbstractModelShareAPIView):
    """ Share API view bound to the Ema model """
    model = Ema