Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 6f2b6c44d9 | |||
| 494e80a4ac | |||
| 1f6c81874b | |||
| 9ee016a8bb | |||
| 93d29982a6 | |||
| 59a1bdfb1c | |||
| 056a92b068 |
@@ -31,3 +31,21 @@ class ExternalIdentifier(models.Model):
|
|||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return f"{self.external_id} -> {self.internal_id}"
|
return f"{self.external_id} -> {self.internal_id}"
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def resolve_external_identifier(external_identifier: str):
|
||||||
|
""" Returns a ExternalIdentifier object, if the given parameter could be resolved as an external identifier.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
external_identifier (str): An external identifier.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ExternalIdentifier | None
|
||||||
|
"""
|
||||||
|
if external_identifier:
|
||||||
|
try:
|
||||||
|
obj = ExternalIdentifier.objects.get(external_id=external_identifier)
|
||||||
|
return obj
|
||||||
|
except ExternalIdentifier.DoesNotExist:
|
||||||
|
pass
|
||||||
|
return None
|
||||||
|
|||||||
@@ -31,9 +31,10 @@ class APIV1SharingTestCase(BaseAPIV1TestCase):
|
|||||||
def setUpTestData(cls):
|
def setUpTestData(cls):
|
||||||
super().setUpTestData()
|
super().setUpTestData()
|
||||||
|
|
||||||
def _run_share_request(self, url, user_list: list):
|
def _run_share_request(self, url, user_list: list, team_list: list):
|
||||||
data = {
|
data = {
|
||||||
"users": user_list
|
"users": user_list,
|
||||||
|
"teams": team_list
|
||||||
}
|
}
|
||||||
data = json.dumps(data)
|
data = json.dumps(data)
|
||||||
response = self.client.put(
|
response = self.client.put(
|
||||||
@@ -58,16 +59,22 @@ class APIV1SharingTestCase(BaseAPIV1TestCase):
|
|||||||
self.superuser.username,
|
self.superuser.username,
|
||||||
self.user.username,
|
self.user.username,
|
||||||
]
|
]
|
||||||
|
team_list = [
|
||||||
|
str(self.team.id),
|
||||||
|
]
|
||||||
|
|
||||||
response = self._run_share_request(url, user_list)
|
response = self._run_share_request(url, user_list, team_list)
|
||||||
|
|
||||||
# Must fail, since performing user has no access on requested object
|
# Must fail, since performing user has no access on requested object
|
||||||
self.assertEqual(response.status_code, 500)
|
self.assertEqual(response.status_code, 500)
|
||||||
self.assertTrue(len(json.loads(response.content.decode("utf-8")).get("errors", [])) > 0)
|
self.assertTrue(len(json.loads(response.content.decode("utf-8")).get("errors", [])) > 0)
|
||||||
|
|
||||||
# Add performing user to shared access users and rerun the request
|
# Add performing user to shared access users, switch from team id to team name and rerun the request
|
||||||
obj.users.add(self.superuser)
|
obj.users.add(self.superuser)
|
||||||
response = self._run_share_request(url, user_list)
|
team_list = [
|
||||||
|
self.team.name
|
||||||
|
]
|
||||||
|
response = self._run_share_request(url, user_list, team_list)
|
||||||
|
|
||||||
shared_users = obj.shared_users
|
shared_users = obj.shared_users
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
@@ -84,14 +91,14 @@ class APIV1SharingTestCase(BaseAPIV1TestCase):
|
|||||||
share_url = reverse("api:v1:intervention-share", args=(self.intervention.id,))
|
share_url = reverse("api:v1:intervention-share", args=(self.intervention.id,))
|
||||||
# Expect the first request to work properly
|
# Expect the first request to work properly
|
||||||
self.intervention.users.add(self.superuser)
|
self.intervention.users.add(self.superuser)
|
||||||
response = self._run_share_request(share_url, [self.superuser.username])
|
response = self._run_share_request(share_url, [self.superuser.username], [str(self.team.id)])
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
|
|
||||||
# Change the token
|
# Change the token
|
||||||
self.header_data["HTTP_ksptoken"] = f"{self.superuser.api_token.token}__X"
|
self.header_data["HTTP_ksptoken"] = f"{self.superuser.api_token.token}__X"
|
||||||
|
|
||||||
# Expect the request to fail now
|
# Expect the request to fail now
|
||||||
response = self._run_share_request(share_url, [self.superuser.username])
|
response = self._run_share_request(share_url, [self.superuser.username], [])
|
||||||
self.assertEqual(response.status_code, 403)
|
self.assertEqual(response.status_code, 403)
|
||||||
|
|
||||||
def test_api_intervention_sharing(self):
|
def test_api_intervention_sharing(self):
|
||||||
@@ -144,11 +151,11 @@ class APIV1SharingTestCase(BaseAPIV1TestCase):
|
|||||||
self.assertEqual(self.intervention.users.count(), 1)
|
self.assertEqual(self.intervention.users.count(), 1)
|
||||||
|
|
||||||
# Try to add another user via API -> must work!
|
# Try to add another user via API -> must work!
|
||||||
response = self._run_share_request(share_url, [self.superuser.username, self.user.username])
|
response = self._run_share_request(share_url, [self.superuser.username, self.user.username], [])
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
self.assertEqual(self.intervention.users.count(), 2)
|
self.assertEqual(self.intervention.users.count(), 2)
|
||||||
|
|
||||||
# Now try to remove the user again -> expect no changes at all to the shared user list
|
# Now try to remove the user again -> expect no changes at all to the shared user list
|
||||||
response = self._run_share_request(share_url, [self.superuser.username])
|
response = self._run_share_request(share_url, [self.superuser.username], [])
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
self.assertEqual(self.intervention.users.count(), 2)
|
self.assertEqual(self.intervention.users.count(), 2)
|
||||||
|
|||||||
@@ -179,13 +179,7 @@ class AbstractModelAPISerializerV1(AbstractModelAPISerializer):
|
|||||||
Returns:
|
Returns:
|
||||||
ExternalIdentifier | None
|
ExternalIdentifier | None
|
||||||
"""
|
"""
|
||||||
if external_identifier:
|
return ExternalIdentifier.resolve_external_identifier(external_identifier)
|
||||||
try:
|
|
||||||
obj = ExternalIdentifier.objects.get(external_id=external_identifier)
|
|
||||||
return obj
|
|
||||||
except ObjectDoesNotExist:
|
|
||||||
pass
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _check_external_identifier_on_entry_creation(self, external_identifier):
|
def _check_external_identifier_on_entry_creation(self, external_identifier):
|
||||||
""" Special check for POST processing:
|
""" Special check for POST processing:
|
||||||
|
|||||||
+68
-19
@@ -6,13 +6,14 @@ Created on: 21.01.22
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
import json
|
import json
|
||||||
|
import uuid
|
||||||
|
|
||||||
from django.db.models import QuerySet
|
from django.db.models import QuerySet, Q
|
||||||
from django.http import JsonResponse, HttpRequest
|
from django.http import JsonResponse, HttpRequest
|
||||||
from django.views import View
|
from django.views import View
|
||||||
from django.views.decorators.csrf import csrf_exempt
|
from django.views.decorators.csrf import csrf_exempt
|
||||||
|
|
||||||
from api.models import APIUserToken
|
from api.models import APIUserToken, ExternalIdentifier
|
||||||
from api.settings import KSP_TOKEN_HEADER_IDENTIFIER, KSP_USER_HEADER_IDENTIFIER
|
from api.settings import KSP_TOKEN_HEADER_IDENTIFIER, KSP_USER_HEADER_IDENTIFIER
|
||||||
from compensation.models import EcoAccount
|
from compensation.models import EcoAccount
|
||||||
from ema.models import Ema
|
from ema.models import Ema
|
||||||
@@ -205,6 +206,10 @@ class AbstractModelShareAPIView(AbstractAPIView):
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
|
external_identifier = ExternalIdentifier.resolve_external_identifier(id)
|
||||||
|
if external_identifier:
|
||||||
|
id = external_identifier.internal_id
|
||||||
|
|
||||||
users = self._get_shared_users_of_object(id)
|
users = self._get_shared_users_of_object(id)
|
||||||
teams = self._get_shared_teams_of_object(id)
|
teams = self._get_shared_teams_of_object(id)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -237,6 +242,9 @@ class AbstractModelShareAPIView(AbstractAPIView):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
external_identifier = ExternalIdentifier.resolve_external_identifier(id)
|
||||||
|
if external_identifier:
|
||||||
|
id = external_identifier.internal_id
|
||||||
success = self._process_put_body(request.body, id)
|
success = self._process_put_body(request.body, id)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return self._return_error_response(e)
|
return self._return_error_response(e)
|
||||||
@@ -309,22 +317,36 @@ class AbstractModelShareAPIView(AbstractAPIView):
|
|||||||
raise ValueError("Shared user list must not be empty!")
|
raise ValueError("Shared user list must not be empty!")
|
||||||
new_teams = content.get("teams", [])
|
new_teams = content.get("teams", [])
|
||||||
|
|
||||||
|
self.__process_user_sharing(new_users, obj)
|
||||||
|
self.__process_team_sharing(new_teams, obj)
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def __process_user_sharing(self, user_list: list, obj):
|
||||||
|
""" Processes API sharing for user payload
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_list (list): A list of users to share the obj with
|
||||||
|
obj (BaseObject): The shareable object
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
# Eliminate duplicates
|
# Eliminate duplicates
|
||||||
new_users = list(dict.fromkeys(new_users))
|
new_users = list(dict.fromkeys(user_list))
|
||||||
new_teams = list(dict.fromkeys(new_teams))
|
|
||||||
|
|
||||||
# Make sure each of these names exist as a user
|
# Make sure each of these names exist as a user
|
||||||
new_users_objs = []
|
new_users_objs = []
|
||||||
for user in new_users:
|
for user in new_users:
|
||||||
new_users_objs.append(User.objects.get(username=user))
|
try:
|
||||||
|
user_obj = User.objects.get(username=user)
|
||||||
# Make sure each of these names exist as a user
|
except User.DoesNotExist:
|
||||||
new_teams_objs = []
|
raise AssertionError(f"User with username {user} does not exist")
|
||||||
for team_name in new_teams:
|
new_users_objs.append(user_obj)
|
||||||
new_teams_objs.append(Team.objects.get(name=team_name))
|
|
||||||
|
|
||||||
if self.user.is_default_group_only():
|
if self.user.is_default_group_only():
|
||||||
# Default only users are not allowed to remove other users from having access. They can only add new ones!
|
# Default only users are not allowed to remove other users from having access. They can only add new ones!
|
||||||
|
# So we need to keep the ones that already have access from being removed!
|
||||||
new_users_to_be_added = User.objects.filter(
|
new_users_to_be_added = User.objects.filter(
|
||||||
username__in=new_users
|
username__in=new_users
|
||||||
).exclude(
|
).exclude(
|
||||||
@@ -332,17 +354,44 @@ class AbstractModelShareAPIView(AbstractAPIView):
|
|||||||
)
|
)
|
||||||
new_users_objs = obj.shared_users.union(new_users_to_be_added)
|
new_users_objs = obj.shared_users.union(new_users_to_be_added)
|
||||||
|
|
||||||
new_teams_to_be_added = Team.objects.filter(
|
|
||||||
name__in=new_teams
|
|
||||||
).exclude(
|
|
||||||
id__in=obj.shared_teams
|
|
||||||
)
|
|
||||||
new_teams_objs = obj.shared_teams.union(new_teams_to_be_added)
|
|
||||||
|
|
||||||
obj.share_with_user_list(new_users_objs)
|
obj.share_with_user_list(new_users_objs)
|
||||||
obj.share_with_team_list(new_teams_objs)
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
def __process_team_sharing(self, team_list: list, obj):
|
||||||
|
""" Processes API sharing for team payload
|
||||||
|
|
||||||
|
Args:
|
||||||
|
team_list (list): A list of teams to share the obj with
|
||||||
|
obj (BaseObject): The shareable object
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
# Eliminate duplicates
|
||||||
|
new_teams = list(dict.fromkeys(team_list))
|
||||||
|
|
||||||
|
# Resolve team names or ids into objects
|
||||||
|
new_team_ids = []
|
||||||
|
for team in new_teams:
|
||||||
|
try:
|
||||||
|
uuid.UUID(team)
|
||||||
|
try:
|
||||||
|
new_team_ids.append(Team.objects.get(id=team).id)
|
||||||
|
except Team.DoesNotExist:
|
||||||
|
raise AssertionError(f"Team with id {team} does not exist!")
|
||||||
|
except ValueError:
|
||||||
|
# entry is a name and not a uuid -> try to resolve as name!
|
||||||
|
try:
|
||||||
|
new_team_ids.append(Team.objects.get(name=team).id)
|
||||||
|
except Team.DoesNotExist:
|
||||||
|
raise AssertionError(f"Team with name {team} does not exist!")
|
||||||
|
|
||||||
|
new_team_objs = Team.objects.filter(id__in=new_team_ids)
|
||||||
|
if self.user.is_default_group_only():
|
||||||
|
# Default only users are not allowed to remove other users from having access. They can only add new ones!
|
||||||
|
# So we need to keep the ones that already have access from being removed!
|
||||||
|
new_team_objs = obj.shared_teams.union(new_team_objs)
|
||||||
|
|
||||||
|
obj.share_with_team_list(new_team_objs)
|
||||||
|
|
||||||
class InterventionAPIShareView(AbstractModelShareAPIView):
|
class InterventionAPIShareView(AbstractModelShareAPIView):
|
||||||
model = Intervention
|
model = Intervention
|
||||||
|
|||||||
@@ -36,6 +36,7 @@ class InterventionViewTestCase(BaseViewTestCase):
|
|||||||
self.run_check_url = reverse("intervention:check", args=(self.intervention.id,))
|
self.run_check_url = reverse("intervention:check", args=(self.intervention.id,))
|
||||||
self.record_url = reverse("intervention:record", args=(self.intervention.id,))
|
self.record_url = reverse("intervention:record", args=(self.intervention.id,))
|
||||||
self.report_url = reverse("intervention:report", args=(self.intervention.id,))
|
self.report_url = reverse("intervention:report", args=(self.intervention.id,))
|
||||||
|
self.compensation_remove_url = reverse("intervention:remove-compensation", args=(self.compensation.intervention.id, self.compensation.id))
|
||||||
|
|
||||||
self.deduction.intervention = self.intervention
|
self.deduction.intervention = self.intervention
|
||||||
self.deduction.save()
|
self.deduction.save()
|
||||||
@@ -83,6 +84,7 @@ class InterventionViewTestCase(BaseViewTestCase):
|
|||||||
self.revocation_new_url: f"{login_redirect_base}{self.revocation_new_url}",
|
self.revocation_new_url: f"{login_redirect_base}{self.revocation_new_url}",
|
||||||
self.revocation_edit_url: f"{login_redirect_base}{self.revocation_edit_url}",
|
self.revocation_edit_url: f"{login_redirect_base}{self.revocation_edit_url}",
|
||||||
self.revocation_remove_url: f"{login_redirect_base}{self.revocation_remove_url}",
|
self.revocation_remove_url: f"{login_redirect_base}{self.revocation_remove_url}",
|
||||||
|
self.compensation_remove_url: f"{login_redirect_base}{self.compensation_remove_url}",
|
||||||
}
|
}
|
||||||
|
|
||||||
self.assert_url_success(client, success_urls)
|
self.assert_url_success(client, success_urls)
|
||||||
@@ -124,6 +126,7 @@ class InterventionViewTestCase(BaseViewTestCase):
|
|||||||
self.deduction_new_url,
|
self.deduction_new_url,
|
||||||
self.deduction_edit_url,
|
self.deduction_edit_url,
|
||||||
self.deduction_remove_url,
|
self.deduction_remove_url,
|
||||||
|
self.compensation_remove_url,
|
||||||
]
|
]
|
||||||
|
|
||||||
self.assert_url_success(client, success_urls)
|
self.assert_url_success(client, success_urls)
|
||||||
@@ -162,6 +165,7 @@ class InterventionViewTestCase(BaseViewTestCase):
|
|||||||
self.deduction_new_url,
|
self.deduction_new_url,
|
||||||
self.deduction_edit_url,
|
self.deduction_edit_url,
|
||||||
self.deduction_remove_url,
|
self.deduction_remove_url,
|
||||||
|
self.compensation_remove_url,
|
||||||
]
|
]
|
||||||
fail_urls = [
|
fail_urls = [
|
||||||
self.run_check_url,
|
self.run_check_url,
|
||||||
@@ -212,6 +216,7 @@ class InterventionViewTestCase(BaseViewTestCase):
|
|||||||
self.deduction_new_url,
|
self.deduction_new_url,
|
||||||
self.deduction_edit_url,
|
self.deduction_edit_url,
|
||||||
self.deduction_remove_url,
|
self.deduction_remove_url,
|
||||||
|
self.compensation_remove_url,
|
||||||
]
|
]
|
||||||
success_urls_redirect = {
|
success_urls_redirect = {
|
||||||
self.share_url: self.detail_url
|
self.share_url: self.detail_url
|
||||||
@@ -258,6 +263,7 @@ class InterventionViewTestCase(BaseViewTestCase):
|
|||||||
self.deduction_new_url,
|
self.deduction_new_url,
|
||||||
self.deduction_edit_url,
|
self.deduction_edit_url,
|
||||||
self.deduction_remove_url,
|
self.deduction_remove_url,
|
||||||
|
self.compensation_remove_url,
|
||||||
]
|
]
|
||||||
success_urls_redirect = {
|
success_urls_redirect = {
|
||||||
self.share_url: self.detail_url
|
self.share_url: self.detail_url
|
||||||
@@ -304,6 +310,7 @@ class InterventionViewTestCase(BaseViewTestCase):
|
|||||||
self.deduction_new_url,
|
self.deduction_new_url,
|
||||||
self.deduction_edit_url,
|
self.deduction_edit_url,
|
||||||
self.deduction_remove_url,
|
self.deduction_remove_url,
|
||||||
|
self.compensation_remove_url,
|
||||||
]
|
]
|
||||||
success_urls_redirect = {
|
success_urls_redirect = {
|
||||||
self.share_url: self.detail_url
|
self.share_url: self.detail_url
|
||||||
@@ -350,6 +357,7 @@ class InterventionViewTestCase(BaseViewTestCase):
|
|||||||
self.deduction_new_url,
|
self.deduction_new_url,
|
||||||
self.deduction_edit_url,
|
self.deduction_edit_url,
|
||||||
self.deduction_remove_url,
|
self.deduction_remove_url,
|
||||||
|
self.compensation_remove_url,
|
||||||
]
|
]
|
||||||
success_urls_redirect = {
|
success_urls_redirect = {
|
||||||
self.share_url: self.detail_url
|
self.share_url: self.detail_url
|
||||||
@@ -396,6 +404,7 @@ class InterventionViewTestCase(BaseViewTestCase):
|
|||||||
self.deduction_new_url,
|
self.deduction_new_url,
|
||||||
self.deduction_edit_url,
|
self.deduction_edit_url,
|
||||||
self.deduction_remove_url,
|
self.deduction_remove_url,
|
||||||
|
self.compensation_remove_url,
|
||||||
]
|
]
|
||||||
# Define urls where a redirect to a specific location is the proper response
|
# Define urls where a redirect to a specific location is the proper response
|
||||||
success_urls_redirect = {
|
success_urls_redirect = {
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ from django.utils.decorators import method_decorator
|
|||||||
from django.views import View
|
from django.views import View
|
||||||
|
|
||||||
from intervention.models import Intervention
|
from intervention.models import Intervention
|
||||||
from konova.decorators import shared_access_required
|
from konova.decorators import shared_access_required, default_group_required
|
||||||
from konova.forms.modals import RemoveModalForm
|
from konova.forms.modals import RemoveModalForm
|
||||||
from konova.utils.message_templates import COMPENSATION_REMOVED_TEMPLATE
|
from konova.utils.message_templates import COMPENSATION_REMOVED_TEMPLATE
|
||||||
|
|
||||||
@@ -45,10 +45,12 @@ class RemoveCompensationFromInterventionView(LoginRequiredMixin, View):
|
|||||||
redirect_url=reverse("intervention:detail", args=(id,)) + "#related_data",
|
redirect_url=reverse("intervention:detail", args=(id,)) + "#related_data",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@method_decorator(default_group_required)
|
||||||
@method_decorator(shared_access_required(Intervention, "id"))
|
@method_decorator(shared_access_required(Intervention, "id"))
|
||||||
def get(self, request, id: str, comp_id: str, *args, **kwargs) -> HttpResponse:
|
def get(self, request, id: str, comp_id: str, *args, **kwargs) -> HttpResponse:
|
||||||
return self.__process_request(request, id, comp_id, *args, **kwargs)
|
return self.__process_request(request, id, comp_id, *args, **kwargs)
|
||||||
|
|
||||||
|
@method_decorator(default_group_required)
|
||||||
@method_decorator(shared_access_required(Intervention, "id"))
|
@method_decorator(shared_access_required(Intervention, "id"))
|
||||||
def post(self, request, id: str, comp_id: str, *args, **kwargs) -> HttpResponse:
|
def post(self, request, id: str, comp_id: str, *args, **kwargs) -> HttpResponse:
|
||||||
return self.__process_request(request, id, comp_id, *args, **kwargs)
|
return self.__process_request(request, id, comp_id, *args, **kwargs)
|
||||||
@@ -5,22 +5,18 @@ Contact: michel.peltriaux@sgdnord.rlp.de
|
|||||||
Created on: 09.11.20
|
Created on: 09.11.20
|
||||||
|
|
||||||
"""
|
"""
|
||||||
import random
|
import secrets
|
||||||
import string
|
import string
|
||||||
import qrcode
|
|
||||||
import qrcode.image.svg
|
|
||||||
|
|
||||||
from io import BytesIO
|
|
||||||
|
|
||||||
|
|
||||||
def generate_token() -> str:
|
def generate_token(length: int = 64) -> str:
|
||||||
""" Shortcut for default generating of e.g. API token
|
""" Shortcut for default generating of e.g. API token
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
token (str)
|
token (str)
|
||||||
"""
|
"""
|
||||||
return generate_random_string(
|
return generate_random_string(
|
||||||
length=64,
|
length=length,
|
||||||
use_numbers=True,
|
use_numbers=True,
|
||||||
use_letters_lc=True
|
use_letters_lc=True
|
||||||
)
|
)
|
||||||
@@ -39,7 +35,7 @@ def generate_random_string(length: int, use_numbers: bool = False, use_letters_l
|
|||||||
elements.append(string.ascii_uppercase)
|
elements.append(string.ascii_uppercase)
|
||||||
|
|
||||||
elements = "".join(elements)
|
elements = "".join(elements)
|
||||||
ret_val = "".join(random.choice(elements) for i in range(length))
|
ret_val = "".join(secrets.choice(elements) for i in range(length))
|
||||||
return ret_val
|
return ret_val
|
||||||
|
|
||||||
class IdentifierGenerator:
|
class IdentifierGenerator:
|
||||||
|
|||||||
+59
-15
@@ -7,9 +7,10 @@ Created on: 10.05.24
|
|||||||
"""
|
"""
|
||||||
import base64
|
import base64
|
||||||
import hashlib
|
import hashlib
|
||||||
|
import http
|
||||||
import json
|
import json
|
||||||
|
|
||||||
from cryptography.fernet import Fernet
|
from cryptography.fernet import Fernet, InvalidToken
|
||||||
from django.core.exceptions import ObjectDoesNotExist
|
from django.core.exceptions import ObjectDoesNotExist
|
||||||
from django.http import HttpRequest, JsonResponse
|
from django.http import HttpRequest, JsonResponse
|
||||||
from django.utils.decorators import method_decorator
|
from django.utils.decorators import method_decorator
|
||||||
@@ -27,34 +28,77 @@ class PropagateUserView(View):
|
|||||||
proper rights management)
|
proper rights management)
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
class PropagateStatus:
|
||||||
|
UNPROCESSED = "unprocessed"
|
||||||
|
UPDATED = "updated"
|
||||||
|
CREATED = "created"
|
||||||
|
|
||||||
@method_decorator(csrf_exempt)
|
@method_decorator(csrf_exempt)
|
||||||
def dispatch(self, request, *args, **kwargs):
|
def dispatch(self, request, *args, **kwargs):
|
||||||
return super().dispatch(request, *args, **kwargs)
|
return super().dispatch(request, *args, **kwargs)
|
||||||
|
|
||||||
def post(self, request: HttpRequest, *args, **kwargs):
|
def post(self, request: HttpRequest, *args, **kwargs):
|
||||||
|
response_data = {
|
||||||
|
"success": None,
|
||||||
|
"status": None
|
||||||
|
}
|
||||||
|
|
||||||
# Decrypt
|
# Decrypt
|
||||||
encrypted_body = request.body
|
encrypted_body = request.body
|
||||||
_hash = hashlib.md5()
|
_hash = hashlib.md5()
|
||||||
_hash.update(PROPAGATION_SECRET.encode("utf-8"))
|
_hash.update(PROPAGATION_SECRET.encode("utf-8"))
|
||||||
key = base64.urlsafe_b64encode(_hash.hexdigest().encode("utf-8"))
|
key = base64.urlsafe_b64encode(_hash.hexdigest().encode("utf-8"))
|
||||||
fernet = Fernet(key)
|
fernet = Fernet(key)
|
||||||
body = fernet.decrypt(encrypted_body).decode("utf-8")
|
|
||||||
body = json.loads(body)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
status = "updated"
|
body = fernet.decrypt(encrypted_body).decode("utf-8")
|
||||||
user = User.resolve_user_using_propagation_data(body)
|
body = json.loads(body)
|
||||||
user = user.update_user_using_propagation_data(body)
|
except InvalidToken:
|
||||||
except ObjectDoesNotExist:
|
response_data["error"] = "Invalid Token"
|
||||||
user = User(**body)
|
response_data["success"] = False
|
||||||
status = "created"
|
response_data["status"] = self.PropagateStatus.UNPROCESSED
|
||||||
|
return JsonResponse(
|
||||||
|
status=http.HTTPStatus.UNPROCESSABLE_CONTENT,
|
||||||
|
data=response_data
|
||||||
|
)
|
||||||
|
except (json.JSONDecodeError) as e:
|
||||||
|
response_data["error"] = str(e)
|
||||||
|
response_data["success"] = False
|
||||||
|
response_data["status"] = self.PropagateStatus.UNPROCESSED
|
||||||
|
return JsonResponse(
|
||||||
|
status=http.HTTPStatus.UNPROCESSABLE_CONTENT,
|
||||||
|
data=response_data
|
||||||
|
)
|
||||||
|
|
||||||
|
# Process decrypted user data
|
||||||
|
processing_ret_vals = self.__process_user_data(body)
|
||||||
|
response_data["success"] = processing_ret_vals[0]
|
||||||
|
response_data["status"] = processing_ret_vals[1]
|
||||||
|
user = processing_ret_vals[2]
|
||||||
|
|
||||||
user.set_unusable_password()
|
user.set_unusable_password()
|
||||||
user.save()
|
user.save()
|
||||||
|
|
||||||
data = {
|
return JsonResponse(
|
||||||
"success": True,
|
status=http.HTTPStatus.OK,
|
||||||
"status": status
|
data=response_data
|
||||||
}
|
)
|
||||||
|
|
||||||
return JsonResponse(data)
|
def __process_user_data(self, body: dict) -> (bool, str, User):
|
||||||
|
""" Process decrypted user data
|
||||||
|
|
||||||
|
Args:
|
||||||
|
body:
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
success (bool): Whether the processing was successful
|
||||||
|
status (str): In which way the data was used ('created' | 'updated')
|
||||||
|
user (User): Processed user object
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
user = User.resolve_user_using_propagation_data(body)
|
||||||
|
user = user.update_user_using_propagation_data(body)
|
||||||
|
status = self.PropagateStatus.UPDATED
|
||||||
|
except ObjectDoesNotExist:
|
||||||
|
user = User(**body)
|
||||||
|
status = self.PropagateStatus.CREATED
|
||||||
|
return True, status, user
|
||||||
|
|||||||
Reference in New Issue
Block a user