Compare commits

..

7 Commits

Author SHA1 Message Date
mpeltriaux 6f2b6c44d9 Merge pull request '541 security improvements' (#542) from 541_Security_improvements into master
Reviewed-on: #542
2026-06-13 13:42:46 +02:00
mpeltriaux 494e80a4ac # Intervention remove compensation
* adds default role check for intervention's compensation removing endpoint
2026-06-13 13:41:45 +02:00
mpeltriaux 1f6c81874b # User propagation
* adds exception catching if gibberish data is sent to POST endpoint of user data propagation
* commits 'changed' manage.py and jspdf.debug.js despite being identical with repo files (git wants it, git gets it)
2026-06-13 13:33:17 +02:00
mpeltriaux 9ee016a8bb # Token generator
* improves reliability of generated token randomness
2026-06-13 12:57:19 +02:00
mpeltriaux 93d29982a6 Merge pull request '538_API_share_with_ids' (#539) from 538_API_share_with_ids into master
Reviewed-on: #539
2026-05-14 13:04:59 +00:00
mpeltriaux 59a1bdfb1c # API team ID based sharing
* extents api sharing via team name with team id, so that both ways are supported now
* updates tests
2026-05-14 15:04:24 +02:00
mpeltriaux 056a92b068 # API Refactoring sharing
* refactors team and user sharing by splitting into more maintainable blocks of code
2026-05-14 13:58:26 +02:00
10 changed files with 385 additions and 266 deletions
+19 -1
View File
@@ -30,4 +30,22 @@ class ExternalIdentifier(models.Model):
) )
def __str__(self): def __str__(self):
return f"{self.external_id} -> {self.internal_id}" return f"{self.external_id} -> {self.internal_id}"
@staticmethod
def resolve_external_identifier(external_identifier: str):
""" Returns a ExternalIdentifier object, if the given parameter could be resolved as an external identifier.
Args:
external_identifier (str): An external identifier.
Returns:
ExternalIdentifier | None
"""
if external_identifier:
try:
obj = ExternalIdentifier.objects.get(external_id=external_identifier)
return obj
except ExternalIdentifier.DoesNotExist:
pass
return None
+16 -9
View File
@@ -31,9 +31,10 @@ class APIV1SharingTestCase(BaseAPIV1TestCase):
def setUpTestData(cls): def setUpTestData(cls):
super().setUpTestData() super().setUpTestData()
def _run_share_request(self, url, user_list: list): def _run_share_request(self, url, user_list: list, team_list: list):
data = { data = {
"users": user_list "users": user_list,
"teams": team_list
} }
data = json.dumps(data) data = json.dumps(data)
response = self.client.put( response = self.client.put(
@@ -58,16 +59,22 @@ class APIV1SharingTestCase(BaseAPIV1TestCase):
self.superuser.username, self.superuser.username,
self.user.username, self.user.username,
] ]
team_list = [
str(self.team.id),
]
response = self._run_share_request(url, user_list) response = self._run_share_request(url, user_list, team_list)
# Must fail, since performing user has no access on requested object # Must fail, since performing user has no access on requested object
self.assertEqual(response.status_code, 500) self.assertEqual(response.status_code, 500)
self.assertTrue(len(json.loads(response.content.decode("utf-8")).get("errors", [])) > 0) self.assertTrue(len(json.loads(response.content.decode("utf-8")).get("errors", [])) > 0)
# Add performing user to shared access users and rerun the request # Add performing user to shared access users, switch from team id to team name and rerun the request
obj.users.add(self.superuser) obj.users.add(self.superuser)
response = self._run_share_request(url, user_list) team_list = [
self.team.name
]
response = self._run_share_request(url, user_list, team_list)
shared_users = obj.shared_users shared_users = obj.shared_users
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
@@ -84,14 +91,14 @@ class APIV1SharingTestCase(BaseAPIV1TestCase):
share_url = reverse("api:v1:intervention-share", args=(self.intervention.id,)) share_url = reverse("api:v1:intervention-share", args=(self.intervention.id,))
# Expect the first request to work properly # Expect the first request to work properly
self.intervention.users.add(self.superuser) self.intervention.users.add(self.superuser)
response = self._run_share_request(share_url, [self.superuser.username]) response = self._run_share_request(share_url, [self.superuser.username], [str(self.team.id)])
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
# Change the token # Change the token
self.header_data["HTTP_ksptoken"] = f"{self.superuser.api_token.token}__X" self.header_data["HTTP_ksptoken"] = f"{self.superuser.api_token.token}__X"
# Expect the request to fail now # Expect the request to fail now
response = self._run_share_request(share_url, [self.superuser.username]) response = self._run_share_request(share_url, [self.superuser.username], [])
self.assertEqual(response.status_code, 403) self.assertEqual(response.status_code, 403)
def test_api_intervention_sharing(self): def test_api_intervention_sharing(self):
@@ -144,11 +151,11 @@ class APIV1SharingTestCase(BaseAPIV1TestCase):
self.assertEqual(self.intervention.users.count(), 1) self.assertEqual(self.intervention.users.count(), 1)
# Try to add another user via API -> must work! # Try to add another user via API -> must work!
response = self._run_share_request(share_url, [self.superuser.username, self.user.username]) response = self._run_share_request(share_url, [self.superuser.username, self.user.username], [])
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertEqual(self.intervention.users.count(), 2) self.assertEqual(self.intervention.users.count(), 2)
# Now try to remove the user again -> expect no changes at all to the shared user list # Now try to remove the user again -> expect no changes at all to the shared user list
response = self._run_share_request(share_url, [self.superuser.username]) response = self._run_share_request(share_url, [self.superuser.username], [])
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertEqual(self.intervention.users.count(), 2) self.assertEqual(self.intervention.users.count(), 2)
+1 -7
View File
@@ -179,13 +179,7 @@ class AbstractModelAPISerializerV1(AbstractModelAPISerializer):
Returns: Returns:
ExternalIdentifier | None ExternalIdentifier | None
""" """
if external_identifier: return ExternalIdentifier.resolve_external_identifier(external_identifier)
try:
obj = ExternalIdentifier.objects.get(external_id=external_identifier)
return obj
except ObjectDoesNotExist:
pass
return None
def _check_external_identifier_on_entry_creation(self, external_identifier): def _check_external_identifier_on_entry_creation(self, external_identifier):
""" Special check for POST processing: """ Special check for POST processing:
+68 -19
View File
@@ -6,13 +6,14 @@ Created on: 21.01.22
""" """
import json import json
import uuid
from django.db.models import QuerySet from django.db.models import QuerySet, Q
from django.http import JsonResponse, HttpRequest from django.http import JsonResponse, HttpRequest
from django.views import View from django.views import View
from django.views.decorators.csrf import csrf_exempt from django.views.decorators.csrf import csrf_exempt
from api.models import APIUserToken from api.models import APIUserToken, ExternalIdentifier
from api.settings import KSP_TOKEN_HEADER_IDENTIFIER, KSP_USER_HEADER_IDENTIFIER from api.settings import KSP_TOKEN_HEADER_IDENTIFIER, KSP_USER_HEADER_IDENTIFIER
from compensation.models import EcoAccount from compensation.models import EcoAccount
from ema.models import Ema from ema.models import Ema
@@ -205,6 +206,10 @@ class AbstractModelShareAPIView(AbstractAPIView):
""" """
try: try:
external_identifier = ExternalIdentifier.resolve_external_identifier(id)
if external_identifier:
id = external_identifier.internal_id
users = self._get_shared_users_of_object(id) users = self._get_shared_users_of_object(id)
teams = self._get_shared_teams_of_object(id) teams = self._get_shared_teams_of_object(id)
except Exception as e: except Exception as e:
@@ -237,6 +242,9 @@ class AbstractModelShareAPIView(AbstractAPIView):
""" """
try: try:
external_identifier = ExternalIdentifier.resolve_external_identifier(id)
if external_identifier:
id = external_identifier.internal_id
success = self._process_put_body(request.body, id) success = self._process_put_body(request.body, id)
except Exception as e: except Exception as e:
return self._return_error_response(e) return self._return_error_response(e)
@@ -309,22 +317,36 @@ class AbstractModelShareAPIView(AbstractAPIView):
raise ValueError("Shared user list must not be empty!") raise ValueError("Shared user list must not be empty!")
new_teams = content.get("teams", []) new_teams = content.get("teams", [])
self.__process_user_sharing(new_users, obj)
self.__process_team_sharing(new_teams, obj)
return True
def __process_user_sharing(self, user_list: list, obj):
""" Processes API sharing for user payload
Args:
user_list (list): A list of users to share the obj with
obj (BaseObject): The shareable object
Returns:
"""
# Eliminate duplicates # Eliminate duplicates
new_users = list(dict.fromkeys(new_users)) new_users = list(dict.fromkeys(user_list))
new_teams = list(dict.fromkeys(new_teams))
# Make sure each of these names exist as a user # Make sure each of these names exist as a user
new_users_objs = [] new_users_objs = []
for user in new_users: for user in new_users:
new_users_objs.append(User.objects.get(username=user)) try:
user_obj = User.objects.get(username=user)
# Make sure each of these names exist as a user except User.DoesNotExist:
new_teams_objs = [] raise AssertionError(f"User with username {user} does not exist")
for team_name in new_teams: new_users_objs.append(user_obj)
new_teams_objs.append(Team.objects.get(name=team_name))
if self.user.is_default_group_only(): if self.user.is_default_group_only():
# Default only users are not allowed to remove other users from having access. They can only add new ones! # Default only users are not allowed to remove other users from having access. They can only add new ones!
# So we need to keep the ones that already have access from being removed!
new_users_to_be_added = User.objects.filter( new_users_to_be_added = User.objects.filter(
username__in=new_users username__in=new_users
).exclude( ).exclude(
@@ -332,17 +354,44 @@ class AbstractModelShareAPIView(AbstractAPIView):
) )
new_users_objs = obj.shared_users.union(new_users_to_be_added) new_users_objs = obj.shared_users.union(new_users_to_be_added)
new_teams_to_be_added = Team.objects.filter(
name__in=new_teams
).exclude(
id__in=obj.shared_teams
)
new_teams_objs = obj.shared_teams.union(new_teams_to_be_added)
obj.share_with_user_list(new_users_objs) obj.share_with_user_list(new_users_objs)
obj.share_with_team_list(new_teams_objs)
return True
def __process_team_sharing(self, team_list: list, obj):
""" Processes API sharing for team payload
Args:
team_list (list): A list of teams to share the obj with
obj (BaseObject): The shareable object
Returns:
"""
# Eliminate duplicates
new_teams = list(dict.fromkeys(team_list))
# Resolve team names or ids into objects
new_team_ids = []
for team in new_teams:
try:
uuid.UUID(team)
try:
new_team_ids.append(Team.objects.get(id=team).id)
except Team.DoesNotExist:
raise AssertionError(f"Team with id {team} does not exist!")
except ValueError:
# entry is a name and not a uuid -> try to resolve as name!
try:
new_team_ids.append(Team.objects.get(name=team).id)
except Team.DoesNotExist:
raise AssertionError(f"Team with name {team} does not exist!")
new_team_objs = Team.objects.filter(id__in=new_team_ids)
if self.user.is_default_group_only():
# Default only users are not allowed to remove other users from having access. They can only add new ones!
# So we need to keep the ones that already have access from being removed!
new_team_objs = obj.shared_teams.union(new_team_objs)
obj.share_with_team_list(new_team_objs)
class InterventionAPIShareView(AbstractModelShareAPIView): class InterventionAPIShareView(AbstractModelShareAPIView):
model = Intervention model = Intervention
+9
View File
@@ -36,6 +36,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.run_check_url = reverse("intervention:check", args=(self.intervention.id,)) self.run_check_url = reverse("intervention:check", args=(self.intervention.id,))
self.record_url = reverse("intervention:record", args=(self.intervention.id,)) self.record_url = reverse("intervention:record", args=(self.intervention.id,))
self.report_url = reverse("intervention:report", args=(self.intervention.id,)) self.report_url = reverse("intervention:report", args=(self.intervention.id,))
self.compensation_remove_url = reverse("intervention:remove-compensation", args=(self.compensation.intervention.id, self.compensation.id))
self.deduction.intervention = self.intervention self.deduction.intervention = self.intervention
self.deduction.save() self.deduction.save()
@@ -83,6 +84,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.revocation_new_url: f"{login_redirect_base}{self.revocation_new_url}", self.revocation_new_url: f"{login_redirect_base}{self.revocation_new_url}",
self.revocation_edit_url: f"{login_redirect_base}{self.revocation_edit_url}", self.revocation_edit_url: f"{login_redirect_base}{self.revocation_edit_url}",
self.revocation_remove_url: f"{login_redirect_base}{self.revocation_remove_url}", self.revocation_remove_url: f"{login_redirect_base}{self.revocation_remove_url}",
self.compensation_remove_url: f"{login_redirect_base}{self.compensation_remove_url}",
} }
self.assert_url_success(client, success_urls) self.assert_url_success(client, success_urls)
@@ -124,6 +126,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.deduction_new_url, self.deduction_new_url,
self.deduction_edit_url, self.deduction_edit_url,
self.deduction_remove_url, self.deduction_remove_url,
self.compensation_remove_url,
] ]
self.assert_url_success(client, success_urls) self.assert_url_success(client, success_urls)
@@ -162,6 +165,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.deduction_new_url, self.deduction_new_url,
self.deduction_edit_url, self.deduction_edit_url,
self.deduction_remove_url, self.deduction_remove_url,
self.compensation_remove_url,
] ]
fail_urls = [ fail_urls = [
self.run_check_url, self.run_check_url,
@@ -212,6 +216,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.deduction_new_url, self.deduction_new_url,
self.deduction_edit_url, self.deduction_edit_url,
self.deduction_remove_url, self.deduction_remove_url,
self.compensation_remove_url,
] ]
success_urls_redirect = { success_urls_redirect = {
self.share_url: self.detail_url self.share_url: self.detail_url
@@ -258,6 +263,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.deduction_new_url, self.deduction_new_url,
self.deduction_edit_url, self.deduction_edit_url,
self.deduction_remove_url, self.deduction_remove_url,
self.compensation_remove_url,
] ]
success_urls_redirect = { success_urls_redirect = {
self.share_url: self.detail_url self.share_url: self.detail_url
@@ -304,6 +310,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.deduction_new_url, self.deduction_new_url,
self.deduction_edit_url, self.deduction_edit_url,
self.deduction_remove_url, self.deduction_remove_url,
self.compensation_remove_url,
] ]
success_urls_redirect = { success_urls_redirect = {
self.share_url: self.detail_url self.share_url: self.detail_url
@@ -350,6 +357,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.deduction_new_url, self.deduction_new_url,
self.deduction_edit_url, self.deduction_edit_url,
self.deduction_remove_url, self.deduction_remove_url,
self.compensation_remove_url,
] ]
success_urls_redirect = { success_urls_redirect = {
self.share_url: self.detail_url self.share_url: self.detail_url
@@ -396,6 +404,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.deduction_new_url, self.deduction_new_url,
self.deduction_edit_url, self.deduction_edit_url,
self.deduction_remove_url, self.deduction_remove_url,
self.compensation_remove_url,
] ]
# Define urls where a redirect to a specific location is the proper response # Define urls where a redirect to a specific location is the proper response
success_urls_redirect = { success_urls_redirect = {
+3 -1
View File
@@ -14,7 +14,7 @@ from django.utils.decorators import method_decorator
from django.views import View from django.views import View
from intervention.models import Intervention from intervention.models import Intervention
from konova.decorators import shared_access_required from konova.decorators import shared_access_required, default_group_required
from konova.forms.modals import RemoveModalForm from konova.forms.modals import RemoveModalForm
from konova.utils.message_templates import COMPENSATION_REMOVED_TEMPLATE from konova.utils.message_templates import COMPENSATION_REMOVED_TEMPLATE
@@ -45,10 +45,12 @@ class RemoveCompensationFromInterventionView(LoginRequiredMixin, View):
redirect_url=reverse("intervention:detail", args=(id,)) + "#related_data", redirect_url=reverse("intervention:detail", args=(id,)) + "#related_data",
) )
@method_decorator(default_group_required)
@method_decorator(shared_access_required(Intervention, "id")) @method_decorator(shared_access_required(Intervention, "id"))
def get(self, request, id: str, comp_id: str, *args, **kwargs) -> HttpResponse: def get(self, request, id: str, comp_id: str, *args, **kwargs) -> HttpResponse:
return self.__process_request(request, id, comp_id, *args, **kwargs) return self.__process_request(request, id, comp_id, *args, **kwargs)
@method_decorator(default_group_required)
@method_decorator(shared_access_required(Intervention, "id")) @method_decorator(shared_access_required(Intervention, "id"))
def post(self, request, id: str, comp_id: str, *args, **kwargs) -> HttpResponse: def post(self, request, id: str, comp_id: str, *args, **kwargs) -> HttpResponse:
return self.__process_request(request, id, comp_id, *args, **kwargs) return self.__process_request(request, id, comp_id, *args, **kwargs)
+4 -8
View File
@@ -5,22 +5,18 @@ Contact: michel.peltriaux@sgdnord.rlp.de
Created on: 09.11.20 Created on: 09.11.20
""" """
import random import secrets
import string import string
import qrcode
import qrcode.image.svg
from io import BytesIO
def generate_token() -> str: def generate_token(length: int = 64) -> str:
""" Shortcut for default generating of e.g. API token """ Shortcut for default generating of e.g. API token
Returns: Returns:
token (str) token (str)
""" """
return generate_random_string( return generate_random_string(
length=64, length=length,
use_numbers=True, use_numbers=True,
use_letters_lc=True use_letters_lc=True
) )
@@ -39,7 +35,7 @@ def generate_random_string(length: int, use_numbers: bool = False, use_letters_l
elements.append(string.ascii_uppercase) elements.append(string.ascii_uppercase)
elements = "".join(elements) elements = "".join(elements)
ret_val = "".join(random.choice(elements) for i in range(length)) ret_val = "".join(secrets.choice(elements) for i in range(length))
return ret_val return ret_val
class IdentifierGenerator: class IdentifierGenerator:
Executable → Regular
View File
@@ -9615,212 +9615,212 @@ Copyright (c) 2012 Willow Systems Corporation, willow-systems.com
}; };
})(jsPDF.API); })(jsPDF.API);
/* Blob.js /* Blob.js
* A Blob implementation. * A Blob implementation.
* 2014-07-24 * 2014-07-24
* *
* By Eli Grey, http://eligrey.com * By Eli Grey, http://eligrey.com
* By Devin Samarin, https://github.com/dsamarin * By Devin Samarin, https://github.com/dsamarin
* License: X11/MIT * License: X11/MIT
* See https://github.com/eligrey/Blob.js/blob/master/LICENSE.md * See https://github.com/eligrey/Blob.js/blob/master/LICENSE.md
*/ */
/*global self, unescape */ /*global self, unescape */
/*jslint bitwise: true, regexp: true, confusion: true, es5: true, vars: true, white: true, /*jslint bitwise: true, regexp: true, confusion: true, es5: true, vars: true, white: true,
plusplus: true */ plusplus: true */
/*! @source http://purl.eligrey.com/github/Blob.js/blob/master/Blob.js */ /*! @source http://purl.eligrey.com/github/Blob.js/blob/master/Blob.js */
(function (view) { (function (view) {
"use strict"; "use strict";
view.URL = view.URL || view.webkitURL; view.URL = view.URL || view.webkitURL;
if (view.Blob && view.URL) { if (view.Blob && view.URL) {
try { try {
new Blob; new Blob;
return; return;
} catch (e) {} } catch (e) {}
} }
// Internally we use a BlobBuilder implementation to base Blob off of // Internally we use a BlobBuilder implementation to base Blob off of
// in order to support older browsers that only have BlobBuilder // in order to support older browsers that only have BlobBuilder
var BlobBuilder = view.BlobBuilder || view.WebKitBlobBuilder || view.MozBlobBuilder || (function(view) { var BlobBuilder = view.BlobBuilder || view.WebKitBlobBuilder || view.MozBlobBuilder || (function(view) {
var var
get_class = function(object) { get_class = function(object) {
return Object.prototype.toString.call(object).match(/^\[object\s(.*)\]$/)[1]; return Object.prototype.toString.call(object).match(/^\[object\s(.*)\]$/)[1];
} }
, FakeBlobBuilder = function BlobBuilder() { , FakeBlobBuilder = function BlobBuilder() {
this.data = []; this.data = [];
} }
, FakeBlob = function Blob(data, type, encoding) { , FakeBlob = function Blob(data, type, encoding) {
this.data = data; this.data = data;
this.size = data.length; this.size = data.length;
this.type = type; this.type = type;
this.encoding = encoding; this.encoding = encoding;
} }
, FBB_proto = FakeBlobBuilder.prototype , FBB_proto = FakeBlobBuilder.prototype
, FB_proto = FakeBlob.prototype , FB_proto = FakeBlob.prototype
, FileReaderSync = view.FileReaderSync , FileReaderSync = view.FileReaderSync
, FileException = function(type) { , FileException = function(type) {
this.code = this[this.name = type]; this.code = this[this.name = type];
} }
, file_ex_codes = ( , file_ex_codes = (
"NOT_FOUND_ERR SECURITY_ERR ABORT_ERR NOT_READABLE_ERR ENCODING_ERR " "NOT_FOUND_ERR SECURITY_ERR ABORT_ERR NOT_READABLE_ERR ENCODING_ERR "
+ "NO_MODIFICATION_ALLOWED_ERR INVALID_STATE_ERR SYNTAX_ERR" + "NO_MODIFICATION_ALLOWED_ERR INVALID_STATE_ERR SYNTAX_ERR"
).split(" ") ).split(" ")
, file_ex_code = file_ex_codes.length , file_ex_code = file_ex_codes.length
, real_URL = view.URL || view.webkitURL || view , real_URL = view.URL || view.webkitURL || view
, real_create_object_URL = real_URL.createObjectURL , real_create_object_URL = real_URL.createObjectURL
, real_revoke_object_URL = real_URL.revokeObjectURL , real_revoke_object_URL = real_URL.revokeObjectURL
, URL = real_URL , URL = real_URL
, btoa = view.btoa , btoa = view.btoa
, atob = view.atob , atob = view.atob
, ArrayBuffer = view.ArrayBuffer , ArrayBuffer = view.ArrayBuffer
, Uint8Array = view.Uint8Array , Uint8Array = view.Uint8Array
, origin = /^[\w-]+:\/*\[?[\w\.:-]+\]?(?::[0-9]+)?/; , origin = /^[\w-]+:\/*\[?[\w\.:-]+\]?(?::[0-9]+)?/;
FakeBlob.fake = FB_proto.fake = true; FakeBlob.fake = FB_proto.fake = true;
while (file_ex_code--) { while (file_ex_code--) {
FileException.prototype[file_ex_codes[file_ex_code]] = file_ex_code + 1; FileException.prototype[file_ex_codes[file_ex_code]] = file_ex_code + 1;
} }
// Polyfill URL // Polyfill URL
if (!real_URL.createObjectURL) { if (!real_URL.createObjectURL) {
URL = view.URL = function(uri) { URL = view.URL = function(uri) {
var var
uri_info = document.createElementNS("http://www.w3.org/1999/xhtml", "a") uri_info = document.createElementNS("http://www.w3.org/1999/xhtml", "a")
, uri_origin; , uri_origin;
uri_info.href = uri; uri_info.href = uri;
if (!("origin" in uri_info)) { if (!("origin" in uri_info)) {
if (uri_info.protocol.toLowerCase() === "data:") { if (uri_info.protocol.toLowerCase() === "data:") {
uri_info.origin = null; uri_info.origin = null;
} else { } else {
uri_origin = uri.match(origin); uri_origin = uri.match(origin);
uri_info.origin = uri_origin && uri_origin[1]; uri_info.origin = uri_origin && uri_origin[1];
} }
} }
return uri_info; return uri_info;
}; };
} }
URL.createObjectURL = function(blob) { URL.createObjectURL = function(blob) {
var var
type = blob.type type = blob.type
, data_URI_header; , data_URI_header;
if (type === null) { if (type === null) {
type = "application/octet-stream"; type = "application/octet-stream";
} }
if (blob instanceof FakeBlob) { if (blob instanceof FakeBlob) {
data_URI_header = "data:" + type; data_URI_header = "data:" + type;
if (blob.encoding === "base64") { if (blob.encoding === "base64") {
return data_URI_header + ";base64," + blob.data; return data_URI_header + ";base64," + blob.data;
} else if (blob.encoding === "URI") { } else if (blob.encoding === "URI") {
return data_URI_header + "," + decodeURIComponent(blob.data); return data_URI_header + "," + decodeURIComponent(blob.data);
} if (btoa) { } if (btoa) {
return data_URI_header + ";base64," + btoa(blob.data); return data_URI_header + ";base64," + btoa(blob.data);
} else { } else {
return data_URI_header + "," + encodeURIComponent(blob.data); return data_URI_header + "," + encodeURIComponent(blob.data);
} }
} else if (real_create_object_URL) { } else if (real_create_object_URL) {
return real_create_object_URL.call(real_URL, blob); return real_create_object_URL.call(real_URL, blob);
} }
}; };
URL.revokeObjectURL = function(object_URL) { URL.revokeObjectURL = function(object_URL) {
if (object_URL.substring(0, 5) !== "data:" && real_revoke_object_URL) { if (object_URL.substring(0, 5) !== "data:" && real_revoke_object_URL) {
real_revoke_object_URL.call(real_URL, object_URL); real_revoke_object_URL.call(real_URL, object_URL);
} }
}; };
FBB_proto.append = function(data/*, endings*/) { FBB_proto.append = function(data/*, endings*/) {
var bb = this.data; var bb = this.data;
// decode data to a binary string // decode data to a binary string
if (Uint8Array && (data instanceof ArrayBuffer || data instanceof Uint8Array)) { if (Uint8Array && (data instanceof ArrayBuffer || data instanceof Uint8Array)) {
var var
str = "" str = ""
, buf = new Uint8Array(data) , buf = new Uint8Array(data)
, i = 0 , i = 0
, buf_len = buf.length; , buf_len = buf.length;
for (; i < buf_len; i++) { for (; i < buf_len; i++) {
str += String.fromCharCode(buf[i]); str += String.fromCharCode(buf[i]);
} }
bb.push(str); bb.push(str);
} else if (get_class(data) === "Blob" || get_class(data) === "File") { } else if (get_class(data) === "Blob" || get_class(data) === "File") {
if (FileReaderSync) { if (FileReaderSync) {
var fr = new FileReaderSync; var fr = new FileReaderSync;
bb.push(fr.readAsBinaryString(data)); bb.push(fr.readAsBinaryString(data));
} else { } else {
// async FileReader won't work as BlobBuilder is sync // async FileReader won't work as BlobBuilder is sync
throw new FileException("NOT_READABLE_ERR"); throw new FileException("NOT_READABLE_ERR");
} }
} else if (data instanceof FakeBlob) { } else if (data instanceof FakeBlob) {
if (data.encoding === "base64" && atob) { if (data.encoding === "base64" && atob) {
bb.push(atob(data.data)); bb.push(atob(data.data));
} else if (data.encoding === "URI") { } else if (data.encoding === "URI") {
bb.push(decodeURIComponent(data.data)); bb.push(decodeURIComponent(data.data));
} else if (data.encoding === "raw") { } else if (data.encoding === "raw") {
bb.push(data.data); bb.push(data.data);
} }
} else { } else {
if (typeof data !== "string") { if (typeof data !== "string") {
data += ""; // convert unsupported types to strings data += ""; // convert unsupported types to strings
} }
// decode UTF-16 to binary string // decode UTF-16 to binary string
bb.push(unescape(encodeURIComponent(data))); bb.push(unescape(encodeURIComponent(data)));
} }
}; };
FBB_proto.getBlob = function(type) { FBB_proto.getBlob = function(type) {
if (!arguments.length) { if (!arguments.length) {
type = null; type = null;
} }
return new FakeBlob(this.data.join(""), type, "raw"); return new FakeBlob(this.data.join(""), type, "raw");
}; };
FBB_proto.toString = function() { FBB_proto.toString = function() {
return "[object BlobBuilder]"; return "[object BlobBuilder]";
}; };
FB_proto.slice = function(start, end, type) { FB_proto.slice = function(start, end, type) {
var args = arguments.length; var args = arguments.length;
if (args < 3) { if (args < 3) {
type = null; type = null;
} }
return new FakeBlob( return new FakeBlob(
this.data.slice(start, args > 1 ? end : this.data.length) this.data.slice(start, args > 1 ? end : this.data.length)
, type , type
, this.encoding , this.encoding
); );
}; };
FB_proto.toString = function() { FB_proto.toString = function() {
return "[object Blob]"; return "[object Blob]";
}; };
FB_proto.close = function() { FB_proto.close = function() {
this.size = 0; this.size = 0;
delete this.data; delete this.data;
}; };
return FakeBlobBuilder; return FakeBlobBuilder;
}(view)); }(view));
view.Blob = function(blobParts, options) { view.Blob = function(blobParts, options) {
var type = options ? (options.type || "") : ""; var type = options ? (options.type || "") : "";
var builder = new BlobBuilder(); var builder = new BlobBuilder();
if (blobParts) { if (blobParts) {
for (var i = 0, len = blobParts.length; i < len; i++) { for (var i = 0, len = blobParts.length; i < len; i++) {
if (Uint8Array && blobParts[i] instanceof Uint8Array) { if (Uint8Array && blobParts[i] instanceof Uint8Array) {
builder.append(blobParts[i].buffer); builder.append(blobParts[i].buffer);
} }
else { else {
builder.append(blobParts[i]); builder.append(blobParts[i]);
} }
} }
} }
var blob = builder.getBlob(type); var blob = builder.getBlob(type);
if (!blob.slice && blob.webkitSlice) { if (!blob.slice && blob.webkitSlice) {
blob.slice = blob.webkitSlice; blob.slice = blob.webkitSlice;
} }
return blob; return blob;
}; };
var getPrototypeOf = Object.getPrototypeOf || function(object) { var getPrototypeOf = Object.getPrototypeOf || function(object) {
return object.__proto__; return object.__proto__;
}; };
view.Blob.prototype = getPrototypeOf(new view.Blob()); view.Blob.prototype = getPrototypeOf(new view.Blob());
}(typeof self !== "undefined" && self || typeof window !== "undefined" && window || undefined.content || undefined)); }(typeof self !== "undefined" && self || typeof window !== "undefined" && window || undefined.content || undefined));
/* FileSaver.js /* FileSaver.js
+59 -15
View File
@@ -7,9 +7,10 @@ Created on: 10.05.24
""" """
import base64 import base64
import hashlib import hashlib
import http
import json import json
from cryptography.fernet import Fernet from cryptography.fernet import Fernet, InvalidToken
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
from django.http import HttpRequest, JsonResponse from django.http import HttpRequest, JsonResponse
from django.utils.decorators import method_decorator from django.utils.decorators import method_decorator
@@ -27,34 +28,77 @@ class PropagateUserView(View):
proper rights management) proper rights management)
""" """
class PropagateStatus:
UNPROCESSED = "unprocessed"
UPDATED = "updated"
CREATED = "created"
@method_decorator(csrf_exempt) @method_decorator(csrf_exempt)
def dispatch(self, request, *args, **kwargs): def dispatch(self, request, *args, **kwargs):
return super().dispatch(request, *args, **kwargs) return super().dispatch(request, *args, **kwargs)
def post(self, request: HttpRequest, *args, **kwargs): def post(self, request: HttpRequest, *args, **kwargs):
response_data = {
"success": None,
"status": None
}
# Decrypt # Decrypt
encrypted_body = request.body encrypted_body = request.body
_hash = hashlib.md5() _hash = hashlib.md5()
_hash.update(PROPAGATION_SECRET.encode("utf-8")) _hash.update(PROPAGATION_SECRET.encode("utf-8"))
key = base64.urlsafe_b64encode(_hash.hexdigest().encode("utf-8")) key = base64.urlsafe_b64encode(_hash.hexdigest().encode("utf-8"))
fernet = Fernet(key) fernet = Fernet(key)
body = fernet.decrypt(encrypted_body).decode("utf-8")
body = json.loads(body)
try: try:
status = "updated" body = fernet.decrypt(encrypted_body).decode("utf-8")
user = User.resolve_user_using_propagation_data(body) body = json.loads(body)
user = user.update_user_using_propagation_data(body) except InvalidToken:
except ObjectDoesNotExist: response_data["error"] = "Invalid Token"
user = User(**body) response_data["success"] = False
status = "created" response_data["status"] = self.PropagateStatus.UNPROCESSED
return JsonResponse(
status=http.HTTPStatus.UNPROCESSABLE_CONTENT,
data=response_data
)
except (json.JSONDecodeError) as e:
response_data["error"] = str(e)
response_data["success"] = False
response_data["status"] = self.PropagateStatus.UNPROCESSED
return JsonResponse(
status=http.HTTPStatus.UNPROCESSABLE_CONTENT,
data=response_data
)
# Process decrypted user data
processing_ret_vals = self.__process_user_data(body)
response_data["success"] = processing_ret_vals[0]
response_data["status"] = processing_ret_vals[1]
user = processing_ret_vals[2]
user.set_unusable_password() user.set_unusable_password()
user.save() user.save()
data = { return JsonResponse(
"success": True, status=http.HTTPStatus.OK,
"status": status data=response_data
} )
return JsonResponse(data) def __process_user_data(self, body: dict) -> (bool, str, User):
""" Process decrypted user data
Args:
body:
Returns:
success (bool): Whether the processing was successful
status (str): In which way the data was used ('created' | 'updated')
user (User): Processed user object
"""
try:
user = User.resolve_user_using_propagation_data(body)
user = user.update_user_using_propagation_data(body)
status = self.PropagateStatus.UPDATED
except ObjectDoesNotExist:
user = User(**body)
status = self.PropagateStatus.CREATED
return True, status, user