Compare commits

..

4 Commits

Author SHA1 Message Date
mpeltriaux 6f2b6c44d9 Merge pull request '541 security improvements' (#542) from 541_Security_improvements into master
Reviewed-on: #542
2026-06-13 13:42:46 +02:00
mpeltriaux 494e80a4ac # Intervention remove compensation
* adds default role check for intervention's compensation removing endpoint
2026-06-13 13:41:45 +02:00
mpeltriaux 1f6c81874b # User propagation
* adds exception catching if gibberish data is sent to POST endpoint of user data propagation
* commits 'changed' manage.py and jspdf.debug.js despite being identical with repo files (git wants it, git gets it)
2026-06-13 13:33:17 +02:00
mpeltriaux 9ee016a8bb # Token generator
* improves reliability of generated token randomness
2026-06-13 12:57:19 +02:00
6 changed files with 281 additions and 230 deletions
+9
View File
@@ -36,6 +36,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.run_check_url = reverse("intervention:check", args=(self.intervention.id,)) self.run_check_url = reverse("intervention:check", args=(self.intervention.id,))
self.record_url = reverse("intervention:record", args=(self.intervention.id,)) self.record_url = reverse("intervention:record", args=(self.intervention.id,))
self.report_url = reverse("intervention:report", args=(self.intervention.id,)) self.report_url = reverse("intervention:report", args=(self.intervention.id,))
self.compensation_remove_url = reverse("intervention:remove-compensation", args=(self.compensation.intervention.id, self.compensation.id))
self.deduction.intervention = self.intervention self.deduction.intervention = self.intervention
self.deduction.save() self.deduction.save()
@@ -83,6 +84,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.revocation_new_url: f"{login_redirect_base}{self.revocation_new_url}", self.revocation_new_url: f"{login_redirect_base}{self.revocation_new_url}",
self.revocation_edit_url: f"{login_redirect_base}{self.revocation_edit_url}", self.revocation_edit_url: f"{login_redirect_base}{self.revocation_edit_url}",
self.revocation_remove_url: f"{login_redirect_base}{self.revocation_remove_url}", self.revocation_remove_url: f"{login_redirect_base}{self.revocation_remove_url}",
self.compensation_remove_url: f"{login_redirect_base}{self.compensation_remove_url}",
} }
self.assert_url_success(client, success_urls) self.assert_url_success(client, success_urls)
@@ -124,6 +126,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.deduction_new_url, self.deduction_new_url,
self.deduction_edit_url, self.deduction_edit_url,
self.deduction_remove_url, self.deduction_remove_url,
self.compensation_remove_url,
] ]
self.assert_url_success(client, success_urls) self.assert_url_success(client, success_urls)
@@ -162,6 +165,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.deduction_new_url, self.deduction_new_url,
self.deduction_edit_url, self.deduction_edit_url,
self.deduction_remove_url, self.deduction_remove_url,
self.compensation_remove_url,
] ]
fail_urls = [ fail_urls = [
self.run_check_url, self.run_check_url,
@@ -212,6 +216,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.deduction_new_url, self.deduction_new_url,
self.deduction_edit_url, self.deduction_edit_url,
self.deduction_remove_url, self.deduction_remove_url,
self.compensation_remove_url,
] ]
success_urls_redirect = { success_urls_redirect = {
self.share_url: self.detail_url self.share_url: self.detail_url
@@ -258,6 +263,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.deduction_new_url, self.deduction_new_url,
self.deduction_edit_url, self.deduction_edit_url,
self.deduction_remove_url, self.deduction_remove_url,
self.compensation_remove_url,
] ]
success_urls_redirect = { success_urls_redirect = {
self.share_url: self.detail_url self.share_url: self.detail_url
@@ -304,6 +310,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.deduction_new_url, self.deduction_new_url,
self.deduction_edit_url, self.deduction_edit_url,
self.deduction_remove_url, self.deduction_remove_url,
self.compensation_remove_url,
] ]
success_urls_redirect = { success_urls_redirect = {
self.share_url: self.detail_url self.share_url: self.detail_url
@@ -350,6 +357,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.deduction_new_url, self.deduction_new_url,
self.deduction_edit_url, self.deduction_edit_url,
self.deduction_remove_url, self.deduction_remove_url,
self.compensation_remove_url,
] ]
success_urls_redirect = { success_urls_redirect = {
self.share_url: self.detail_url self.share_url: self.detail_url
@@ -396,6 +404,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.deduction_new_url, self.deduction_new_url,
self.deduction_edit_url, self.deduction_edit_url,
self.deduction_remove_url, self.deduction_remove_url,
self.compensation_remove_url,
] ]
# Define urls where a redirect to a specific location is the proper response # Define urls where a redirect to a specific location is the proper response
success_urls_redirect = { success_urls_redirect = {
+3 -1
View File
@@ -14,7 +14,7 @@ from django.utils.decorators import method_decorator
from django.views import View from django.views import View
from intervention.models import Intervention from intervention.models import Intervention
from konova.decorators import shared_access_required from konova.decorators import shared_access_required, default_group_required
from konova.forms.modals import RemoveModalForm from konova.forms.modals import RemoveModalForm
from konova.utils.message_templates import COMPENSATION_REMOVED_TEMPLATE from konova.utils.message_templates import COMPENSATION_REMOVED_TEMPLATE
@@ -45,10 +45,12 @@ class RemoveCompensationFromInterventionView(LoginRequiredMixin, View):
redirect_url=reverse("intervention:detail", args=(id,)) + "#related_data", redirect_url=reverse("intervention:detail", args=(id,)) + "#related_data",
) )
@method_decorator(default_group_required)
@method_decorator(shared_access_required(Intervention, "id")) @method_decorator(shared_access_required(Intervention, "id"))
def get(self, request, id: str, comp_id: str, *args, **kwargs) -> HttpResponse: def get(self, request, id: str, comp_id: str, *args, **kwargs) -> HttpResponse:
return self.__process_request(request, id, comp_id, *args, **kwargs) return self.__process_request(request, id, comp_id, *args, **kwargs)
@method_decorator(default_group_required)
@method_decorator(shared_access_required(Intervention, "id")) @method_decorator(shared_access_required(Intervention, "id"))
def post(self, request, id: str, comp_id: str, *args, **kwargs) -> HttpResponse: def post(self, request, id: str, comp_id: str, *args, **kwargs) -> HttpResponse:
return self.__process_request(request, id, comp_id, *args, **kwargs) return self.__process_request(request, id, comp_id, *args, **kwargs)
+4 -8
View File
@@ -5,22 +5,18 @@ Contact: michel.peltriaux@sgdnord.rlp.de
Created on: 09.11.20 Created on: 09.11.20
""" """
import random import secrets
import string import string
import qrcode
import qrcode.image.svg
from io import BytesIO
def generate_token() -> str: def generate_token(length: int = 64) -> str:
""" Shortcut for default generating of e.g. API token """ Shortcut for default generating of e.g. API token
Returns: Returns:
token (str) token (str)
""" """
return generate_random_string( return generate_random_string(
length=64, length=length,
use_numbers=True, use_numbers=True,
use_letters_lc=True use_letters_lc=True
) )
@@ -39,7 +35,7 @@ def generate_random_string(length: int, use_numbers: bool = False, use_letters_l
elements.append(string.ascii_uppercase) elements.append(string.ascii_uppercase)
elements = "".join(elements) elements = "".join(elements)
ret_val = "".join(random.choice(elements) for i in range(length)) ret_val = "".join(secrets.choice(elements) for i in range(length))
return ret_val return ret_val
class IdentifierGenerator: class IdentifierGenerator:
Executable → Regular
View File
+59 -15
View File
@@ -7,9 +7,10 @@ Created on: 10.05.24
""" """
import base64 import base64
import hashlib import hashlib
import http
import json import json
from cryptography.fernet import Fernet from cryptography.fernet import Fernet, InvalidToken
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
from django.http import HttpRequest, JsonResponse from django.http import HttpRequest, JsonResponse
from django.utils.decorators import method_decorator from django.utils.decorators import method_decorator
@@ -27,34 +28,77 @@ class PropagateUserView(View):
proper rights management) proper rights management)
""" """
class PropagateStatus:
UNPROCESSED = "unprocessed"
UPDATED = "updated"
CREATED = "created"
@method_decorator(csrf_exempt) @method_decorator(csrf_exempt)
def dispatch(self, request, *args, **kwargs): def dispatch(self, request, *args, **kwargs):
return super().dispatch(request, *args, **kwargs) return super().dispatch(request, *args, **kwargs)
def post(self, request: HttpRequest, *args, **kwargs): def post(self, request: HttpRequest, *args, **kwargs):
response_data = {
"success": None,
"status": None
}
# Decrypt # Decrypt
encrypted_body = request.body encrypted_body = request.body
_hash = hashlib.md5() _hash = hashlib.md5()
_hash.update(PROPAGATION_SECRET.encode("utf-8")) _hash.update(PROPAGATION_SECRET.encode("utf-8"))
key = base64.urlsafe_b64encode(_hash.hexdigest().encode("utf-8")) key = base64.urlsafe_b64encode(_hash.hexdigest().encode("utf-8"))
fernet = Fernet(key) fernet = Fernet(key)
body = fernet.decrypt(encrypted_body).decode("utf-8")
body = json.loads(body)
try: try:
status = "updated" body = fernet.decrypt(encrypted_body).decode("utf-8")
user = User.resolve_user_using_propagation_data(body) body = json.loads(body)
user = user.update_user_using_propagation_data(body) except InvalidToken:
except ObjectDoesNotExist: response_data["error"] = "Invalid Token"
user = User(**body) response_data["success"] = False
status = "created" response_data["status"] = self.PropagateStatus.UNPROCESSED
return JsonResponse(
status=http.HTTPStatus.UNPROCESSABLE_CONTENT,
data=response_data
)
except (json.JSONDecodeError) as e:
response_data["error"] = str(e)
response_data["success"] = False
response_data["status"] = self.PropagateStatus.UNPROCESSED
return JsonResponse(
status=http.HTTPStatus.UNPROCESSABLE_CONTENT,
data=response_data
)
# Process decrypted user data
processing_ret_vals = self.__process_user_data(body)
response_data["success"] = processing_ret_vals[0]
response_data["status"] = processing_ret_vals[1]
user = processing_ret_vals[2]
user.set_unusable_password() user.set_unusable_password()
user.save() user.save()
data = { return JsonResponse(
"success": True, status=http.HTTPStatus.OK,
"status": status data=response_data
} )
return JsonResponse(data) def __process_user_data(self, body: dict) -> (bool, str, User):
""" Process decrypted user data
Args:
body:
Returns:
success (bool): Whether the processing was successful
status (str): In which way the data was used ('created' | 'updated')
user (User): Processed user object
"""
try:
user = User.resolve_user_using_propagation_data(body)
user = user.update_user_using_propagation_data(body)
status = self.PropagateStatus.UPDATED
except ObjectDoesNotExist:
user = User(**body)
status = self.PropagateStatus.CREATED
return True, status, user