Compare commits

...

8 Commits

Author SHA1 Message Date
mpeltriaux 0425430e65 Merge pull request '# HOTFIX' (#547) from hotfix_indexErrorOnAPIGet into master
Reviewed-on: #547
2026-06-20 09:27:55 +02:00
mpeltriaux fec313445d # HOTFIX
* fixes a bug where detected GeometryConflicts with deleted entries would case an IndexError
2026-06-20 09:27:02 +02:00
mpeltriaux 28db96b081 Merge pull request '# Geometry conflicts on API' (#544) from 534_Return_geometry_conflicts_on_API into master
Reviewed-on: #544
2026-06-17 11:58:55 +02:00
mpeltriaux cf53e69d74 # Geometry conflicts on API
* refactors internal fetching of GeometryConflict data
* adds serializing of GeometryConflict entry data (identifier, id) to GET API calls
2026-06-17 11:57:19 +02:00
mpeltriaux 6f2b6c44d9 Merge pull request '541 security improvements' (#542) from 541_Security_improvements into master
Reviewed-on: #542
2026-06-13 13:42:46 +02:00
mpeltriaux 494e80a4ac # Intervention remove compensation
* adds default role check for intervention's compensation removing endpoint
2026-06-13 13:41:45 +02:00
mpeltriaux 1f6c81874b # User propagation
* adds exception catching if gibberish data is sent to POST endpoint of user data propagation
* commits 'changed' manage.py and jspdf.debug.js despite being identical with repo files (git wants it, git gets it)
2026-06-13 13:33:17 +02:00
mpeltriaux 9ee016a8bb # Token generator
* improves reliability of generated token randomness
2026-06-13 12:57:19 +02:00
10 changed files with 364 additions and 252 deletions
+26
View File
@@ -202,3 +202,29 @@ class AbstractModelAPISerializer:
obj (Intervention)
"""
raise NotImplementedError("Must be implemented in subclasses")
def _geometry_conflicts_to_list(self, geometry) -> list:
""" Serializes geometry conflict ids into dict
Args:
geometry (Geometry): The geometry to fetch geometry conflicts from
Returns:
ids (list): Serialized geometry conflicts as dict objects inside a list
"""
ids = []
conflict_geometries = geometry.get_conflict_geometries()
for geom in conflict_geometries:
try:
data = geom.get_data_objects(["identifier", "id"])[0]
except KeyError:
raise AssertionError(f"Geometry {geom.id} is not attached to any entries. Contact an admin!")
ids.append(
{
"identifier": data["identifier"],
"id": data["id"],
}
)
return ids
+1
View File
@@ -54,6 +54,7 @@ class AbstractModelAPISerializerV1(AbstractModelAPISerializer):
"created_on": self._created_on_to_json(entry),
"modified_on": self._modified_on_to_json(entry),
"external_identifiers": ext_ids,
"geometry_conflicts": self._geometry_conflicts_to_list(entry.geometry)
}
self._extend_properties_data(entry)
geo_json["properties"] = self.properties_data
+9
View File
@@ -36,6 +36,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.run_check_url = reverse("intervention:check", args=(self.intervention.id,))
self.record_url = reverse("intervention:record", args=(self.intervention.id,))
self.report_url = reverse("intervention:report", args=(self.intervention.id,))
self.compensation_remove_url = reverse("intervention:remove-compensation", args=(self.compensation.intervention.id, self.compensation.id))
self.deduction.intervention = self.intervention
self.deduction.save()
@@ -83,6 +84,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.revocation_new_url: f"{login_redirect_base}{self.revocation_new_url}",
self.revocation_edit_url: f"{login_redirect_base}{self.revocation_edit_url}",
self.revocation_remove_url: f"{login_redirect_base}{self.revocation_remove_url}",
self.compensation_remove_url: f"{login_redirect_base}{self.compensation_remove_url}",
}
self.assert_url_success(client, success_urls)
@@ -124,6 +126,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.deduction_new_url,
self.deduction_edit_url,
self.deduction_remove_url,
self.compensation_remove_url,
]
self.assert_url_success(client, success_urls)
@@ -162,6 +165,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.deduction_new_url,
self.deduction_edit_url,
self.deduction_remove_url,
self.compensation_remove_url,
]
fail_urls = [
self.run_check_url,
@@ -212,6 +216,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.deduction_new_url,
self.deduction_edit_url,
self.deduction_remove_url,
self.compensation_remove_url,
]
success_urls_redirect = {
self.share_url: self.detail_url
@@ -258,6 +263,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.deduction_new_url,
self.deduction_edit_url,
self.deduction_remove_url,
self.compensation_remove_url,
]
success_urls_redirect = {
self.share_url: self.detail_url
@@ -304,6 +310,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.deduction_new_url,
self.deduction_edit_url,
self.deduction_remove_url,
self.compensation_remove_url,
]
success_urls_redirect = {
self.share_url: self.detail_url
@@ -350,6 +357,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.deduction_new_url,
self.deduction_edit_url,
self.deduction_remove_url,
self.compensation_remove_url,
]
success_urls_redirect = {
self.share_url: self.detail_url
@@ -396,6 +404,7 @@ class InterventionViewTestCase(BaseViewTestCase):
self.deduction_new_url,
self.deduction_edit_url,
self.deduction_remove_url,
self.compensation_remove_url,
]
# Define urls where a redirect to a specific location is the proper response
success_urls_redirect = {
+3 -1
View File
@@ -14,7 +14,7 @@ from django.utils.decorators import method_decorator
from django.views import View
from intervention.models import Intervention
from konova.decorators import shared_access_required
from konova.decorators import shared_access_required, default_group_required
from konova.forms.modals import RemoveModalForm
from konova.utils.message_templates import COMPENSATION_REMOVED_TEMPLATE
@@ -45,10 +45,12 @@ class RemoveCompensationFromInterventionView(LoginRequiredMixin, View):
redirect_url=reverse("intervention:detail", args=(id,)) + "#related_data",
)
@method_decorator(default_group_required)
@method_decorator(shared_access_required(Intervention, "id"))
def get(self, request, id: str, comp_id: str, *args, **kwargs) -> HttpResponse:
return self.__process_request(request, id, comp_id, *args, **kwargs)
@method_decorator(default_group_required)
@method_decorator(shared_access_required(Intervention, "id"))
def post(self, request, id: str, comp_id: str, *args, **kwargs) -> HttpResponse:
return self.__process_request(request, id, comp_id, *args, **kwargs)
+47 -6
View File
@@ -125,7 +125,7 @@ class Geometry(BaseResource):
deleted=None
)
if limit_to_attrs:
objs += set_objs.values_list(*limit_to_attrs, flat=True)
objs += set_objs.values(*limit_to_attrs)
else:
objs += set_objs
@@ -135,18 +135,29 @@ class Geometry(BaseResource):
Q(deleted=None) & Q(intervention__deleted=None)
)
if limit_to_attrs:
objs += comp_objs.values_list(*limit_to_attrs, flat=True)
objs += comp_objs.values(*limit_to_attrs)
else:
objs += comp_objs
return objs
def get_data_object(self):
def get_data_object(self, limit_to_attrs: list = None):
"""
Getter for the specific data object which is related to this geometry
Getter for the specific data object which is related to this geometry.
!!! Only returns undeleted entries !!!
Returns:
result (str|None): Returns the desired attributes or None if the data object is marked as deleted
"""
objs = self.get_data_objects()
objs = self.get_data_objects(limit_to_attrs)
assert (len(objs) <= 1)
result = objs.pop()
try:
result = objs.pop()
except IndexError:
# If this happens, we just processed a GeometryConflict with an entry which is marked as deleted.
# Therefore we return None
result = None
return result
def update_parcels(self):
@@ -436,6 +447,16 @@ class Geometry(BaseResource):
output_geom.transform(DEFAULT_SRID_RLP)
return output_geom
def get_conflict_geometries(self):
""" Getter for geometry ids which conflict with this geometry or are conflicted by this one
Returns:
geom_ids (list): List of geometry ids
"""
conflict_geoms_id = GeometryConflict.get_conflict_geometries(self)
conflict_geoms = Geometry.objects.filter(id__in=conflict_geoms_id)
return conflict_geoms
class GeometryConflict(UuidModel):
"""
@@ -459,3 +480,23 @@ class GeometryConflict(UuidModel):
def __str__(self):
return f"{self.conflicting_geometry.id} conflicts with {self.affected_geometry.id}"
@staticmethod
def get_conflict_geometries(geometry: Geometry):
""" Getter for geometries which conflict in one or another way with the given one
Args:
geometry (Geometry): The geometry which shall be checked
Returns:
conflict_geometries (QuerySet): QuerySet of geometries which have conflicts with the given geometry
"""
conflict_geometries = GeometryConflict.objects.filter(
affected_geometry=geometry.id,
).values_list("conflicting_geometry__id", flat=True)
conflict_geometries = conflict_geometries.union(
GeometryConflict.objects.filter(
conflicting_geometry=geometry.id,
).values_list("affected_geometry__id", flat=True)
)
return conflict_geometries
+9 -16
View File
@@ -676,24 +676,17 @@ class GeoReferencedMixin(models.Model):
if self.geometry is None:
return request
instance_objs = []
needed_data_object_attrs = [
"identifier"
]
conflicts = self.geometry.conflicts_geometries.iterator()
conflicting_geometries = self.geometry.get_conflict_geometries()
data_object_identifiers = []
for conflicting_geom in conflicting_geometries:
data_obj_id = conflicting_geom.get_data_object(["identifier"])
if data_obj_id:
data_object_identifiers.append(data_obj_id)
for conflict in conflicts:
# Only check the affected geometry of this conflict, since we know the conflicting geometry is self.geometry
instance_objs += conflict.affected_geometry.get_data_objects(needed_data_object_attrs)
conflicts = self.geometry.conflicted_by_geometries.iterator()
for conflict in conflicts:
# Only check the conflicting geometry of this conflict, since we know the affected geometry is self.geometry
instance_objs += conflict.conflicting_geometry.get_data_objects(needed_data_object_attrs)
add_message = len(instance_objs) > 0
add_message = len(data_object_identifiers) > 0
if add_message:
instance_identifiers = ", ".join(instance_objs)
data_object_identifiers = [x["identifier"] for x in data_object_identifiers]
instance_identifiers = ", ".join(data_object_identifiers)
message_str = GEOMETRY_CONFLICT_WITH_TEMPLATE.format(instance_identifiers)
messages.info(request, message_str)
return request
+4 -8
View File
@@ -5,22 +5,18 @@ Contact: michel.peltriaux@sgdnord.rlp.de
Created on: 09.11.20
"""
import random
import secrets
import string
import qrcode
import qrcode.image.svg
from io import BytesIO
def generate_token() -> str:
def generate_token(length: int = 64) -> str:
""" Shortcut for default generating of e.g. API token
Returns:
token (str)
"""
return generate_random_string(
length=64,
length=length,
use_numbers=True,
use_letters_lc=True
)
@@ -39,7 +35,7 @@ def generate_random_string(length: int, use_numbers: bool = False, use_letters_l
elements.append(string.ascii_uppercase)
elements = "".join(elements)
ret_val = "".join(random.choice(elements) for i in range(length))
ret_val = "".join(secrets.choice(elements) for i in range(length))
return ret_val
class IdentifierGenerator:
Executable → Regular
View File
@@ -9615,212 +9615,212 @@ Copyright (c) 2012 Willow Systems Corporation, willow-systems.com
};
})(jsPDF.API);
/* Blob.js
* A Blob implementation.
* 2014-07-24
*
* By Eli Grey, http://eligrey.com
* By Devin Samarin, https://github.com/dsamarin
* License: X11/MIT
* See https://github.com/eligrey/Blob.js/blob/master/LICENSE.md
*/
/*global self, unescape */
/*jslint bitwise: true, regexp: true, confusion: true, es5: true, vars: true, white: true,
plusplus: true */
/*! @source http://purl.eligrey.com/github/Blob.js/blob/master/Blob.js */
(function (view) {
"use strict";
view.URL = view.URL || view.webkitURL;
if (view.Blob && view.URL) {
try {
new Blob;
return;
} catch (e) {}
}
// Internally we use a BlobBuilder implementation to base Blob off of
// in order to support older browsers that only have BlobBuilder
var BlobBuilder = view.BlobBuilder || view.WebKitBlobBuilder || view.MozBlobBuilder || (function(view) {
var
get_class = function(object) {
return Object.prototype.toString.call(object).match(/^\[object\s(.*)\]$/)[1];
}
, FakeBlobBuilder = function BlobBuilder() {
this.data = [];
}
, FakeBlob = function Blob(data, type, encoding) {
this.data = data;
this.size = data.length;
this.type = type;
this.encoding = encoding;
}
, FBB_proto = FakeBlobBuilder.prototype
, FB_proto = FakeBlob.prototype
, FileReaderSync = view.FileReaderSync
, FileException = function(type) {
this.code = this[this.name = type];
}
, file_ex_codes = (
"NOT_FOUND_ERR SECURITY_ERR ABORT_ERR NOT_READABLE_ERR ENCODING_ERR "
+ "NO_MODIFICATION_ALLOWED_ERR INVALID_STATE_ERR SYNTAX_ERR"
).split(" ")
, file_ex_code = file_ex_codes.length
, real_URL = view.URL || view.webkitURL || view
, real_create_object_URL = real_URL.createObjectURL
, real_revoke_object_URL = real_URL.revokeObjectURL
, URL = real_URL
, btoa = view.btoa
, atob = view.atob
, ArrayBuffer = view.ArrayBuffer
, Uint8Array = view.Uint8Array
, origin = /^[\w-]+:\/*\[?[\w\.:-]+\]?(?::[0-9]+)?/;
FakeBlob.fake = FB_proto.fake = true;
while (file_ex_code--) {
FileException.prototype[file_ex_codes[file_ex_code]] = file_ex_code + 1;
}
// Polyfill URL
if (!real_URL.createObjectURL) {
URL = view.URL = function(uri) {
var
uri_info = document.createElementNS("http://www.w3.org/1999/xhtml", "a")
, uri_origin;
uri_info.href = uri;
if (!("origin" in uri_info)) {
if (uri_info.protocol.toLowerCase() === "data:") {
uri_info.origin = null;
} else {
uri_origin = uri.match(origin);
uri_info.origin = uri_origin && uri_origin[1];
}
}
return uri_info;
};
}
URL.createObjectURL = function(blob) {
var
type = blob.type
, data_URI_header;
if (type === null) {
type = "application/octet-stream";
}
if (blob instanceof FakeBlob) {
data_URI_header = "data:" + type;
if (blob.encoding === "base64") {
return data_URI_header + ";base64," + blob.data;
} else if (blob.encoding === "URI") {
return data_URI_header + "," + decodeURIComponent(blob.data);
} if (btoa) {
return data_URI_header + ";base64," + btoa(blob.data);
} else {
return data_URI_header + "," + encodeURIComponent(blob.data);
}
} else if (real_create_object_URL) {
return real_create_object_URL.call(real_URL, blob);
}
};
URL.revokeObjectURL = function(object_URL) {
if (object_URL.substring(0, 5) !== "data:" && real_revoke_object_URL) {
real_revoke_object_URL.call(real_URL, object_URL);
}
};
FBB_proto.append = function(data/*, endings*/) {
var bb = this.data;
// decode data to a binary string
if (Uint8Array && (data instanceof ArrayBuffer || data instanceof Uint8Array)) {
var
str = ""
, buf = new Uint8Array(data)
, i = 0
, buf_len = buf.length;
for (; i < buf_len; i++) {
str += String.fromCharCode(buf[i]);
}
bb.push(str);
} else if (get_class(data) === "Blob" || get_class(data) === "File") {
if (FileReaderSync) {
var fr = new FileReaderSync;
bb.push(fr.readAsBinaryString(data));
} else {
// async FileReader won't work as BlobBuilder is sync
throw new FileException("NOT_READABLE_ERR");
}
} else if (data instanceof FakeBlob) {
if (data.encoding === "base64" && atob) {
bb.push(atob(data.data));
} else if (data.encoding === "URI") {
bb.push(decodeURIComponent(data.data));
} else if (data.encoding === "raw") {
bb.push(data.data);
}
} else {
if (typeof data !== "string") {
data += ""; // convert unsupported types to strings
}
// decode UTF-16 to binary string
bb.push(unescape(encodeURIComponent(data)));
}
};
FBB_proto.getBlob = function(type) {
if (!arguments.length) {
type = null;
}
return new FakeBlob(this.data.join(""), type, "raw");
};
FBB_proto.toString = function() {
return "[object BlobBuilder]";
};
FB_proto.slice = function(start, end, type) {
var args = arguments.length;
if (args < 3) {
type = null;
}
return new FakeBlob(
this.data.slice(start, args > 1 ? end : this.data.length)
, type
, this.encoding
);
};
FB_proto.toString = function() {
return "[object Blob]";
};
FB_proto.close = function() {
this.size = 0;
delete this.data;
};
return FakeBlobBuilder;
}(view));
view.Blob = function(blobParts, options) {
var type = options ? (options.type || "") : "";
var builder = new BlobBuilder();
if (blobParts) {
for (var i = 0, len = blobParts.length; i < len; i++) {
if (Uint8Array && blobParts[i] instanceof Uint8Array) {
builder.append(blobParts[i].buffer);
}
else {
builder.append(blobParts[i]);
}
}
}
var blob = builder.getBlob(type);
if (!blob.slice && blob.webkitSlice) {
blob.slice = blob.webkitSlice;
}
return blob;
};
var getPrototypeOf = Object.getPrototypeOf || function(object) {
return object.__proto__;
};
view.Blob.prototype = getPrototypeOf(new view.Blob());
/* Blob.js
* A Blob implementation.
* 2014-07-24
*
* By Eli Grey, http://eligrey.com
* By Devin Samarin, https://github.com/dsamarin
* License: X11/MIT
* See https://github.com/eligrey/Blob.js/blob/master/LICENSE.md
*/
/*global self, unescape */
/*jslint bitwise: true, regexp: true, confusion: true, es5: true, vars: true, white: true,
plusplus: true */
/*! @source http://purl.eligrey.com/github/Blob.js/blob/master/Blob.js */
(function (view) {
"use strict";
view.URL = view.URL || view.webkitURL;
if (view.Blob && view.URL) {
try {
new Blob;
return;
} catch (e) {}
}
// Internally we use a BlobBuilder implementation to base Blob off of
// in order to support older browsers that only have BlobBuilder
var BlobBuilder = view.BlobBuilder || view.WebKitBlobBuilder || view.MozBlobBuilder || (function(view) {
var
get_class = function(object) {
return Object.prototype.toString.call(object).match(/^\[object\s(.*)\]$/)[1];
}
, FakeBlobBuilder = function BlobBuilder() {
this.data = [];
}
, FakeBlob = function Blob(data, type, encoding) {
this.data = data;
this.size = data.length;
this.type = type;
this.encoding = encoding;
}
, FBB_proto = FakeBlobBuilder.prototype
, FB_proto = FakeBlob.prototype
, FileReaderSync = view.FileReaderSync
, FileException = function(type) {
this.code = this[this.name = type];
}
, file_ex_codes = (
"NOT_FOUND_ERR SECURITY_ERR ABORT_ERR NOT_READABLE_ERR ENCODING_ERR "
+ "NO_MODIFICATION_ALLOWED_ERR INVALID_STATE_ERR SYNTAX_ERR"
).split(" ")
, file_ex_code = file_ex_codes.length
, real_URL = view.URL || view.webkitURL || view
, real_create_object_URL = real_URL.createObjectURL
, real_revoke_object_URL = real_URL.revokeObjectURL
, URL = real_URL
, btoa = view.btoa
, atob = view.atob
, ArrayBuffer = view.ArrayBuffer
, Uint8Array = view.Uint8Array
, origin = /^[\w-]+:\/*\[?[\w\.:-]+\]?(?::[0-9]+)?/;
FakeBlob.fake = FB_proto.fake = true;
while (file_ex_code--) {
FileException.prototype[file_ex_codes[file_ex_code]] = file_ex_code + 1;
}
// Polyfill URL
if (!real_URL.createObjectURL) {
URL = view.URL = function(uri) {
var
uri_info = document.createElementNS("http://www.w3.org/1999/xhtml", "a")
, uri_origin;
uri_info.href = uri;
if (!("origin" in uri_info)) {
if (uri_info.protocol.toLowerCase() === "data:") {
uri_info.origin = null;
} else {
uri_origin = uri.match(origin);
uri_info.origin = uri_origin && uri_origin[1];
}
}
return uri_info;
};
}
URL.createObjectURL = function(blob) {
var
type = blob.type
, data_URI_header;
if (type === null) {
type = "application/octet-stream";
}
if (blob instanceof FakeBlob) {
data_URI_header = "data:" + type;
if (blob.encoding === "base64") {
return data_URI_header + ";base64," + blob.data;
} else if (blob.encoding === "URI") {
return data_URI_header + "," + decodeURIComponent(blob.data);
} if (btoa) {
return data_URI_header + ";base64," + btoa(blob.data);
} else {
return data_URI_header + "," + encodeURIComponent(blob.data);
}
} else if (real_create_object_URL) {
return real_create_object_URL.call(real_URL, blob);
}
};
URL.revokeObjectURL = function(object_URL) {
if (object_URL.substring(0, 5) !== "data:" && real_revoke_object_URL) {
real_revoke_object_URL.call(real_URL, object_URL);
}
};
FBB_proto.append = function(data/*, endings*/) {
var bb = this.data;
// decode data to a binary string
if (Uint8Array && (data instanceof ArrayBuffer || data instanceof Uint8Array)) {
var
str = ""
, buf = new Uint8Array(data)
, i = 0
, buf_len = buf.length;
for (; i < buf_len; i++) {
str += String.fromCharCode(buf[i]);
}
bb.push(str);
} else if (get_class(data) === "Blob" || get_class(data) === "File") {
if (FileReaderSync) {
var fr = new FileReaderSync;
bb.push(fr.readAsBinaryString(data));
} else {
// async FileReader won't work as BlobBuilder is sync
throw new FileException("NOT_READABLE_ERR");
}
} else if (data instanceof FakeBlob) {
if (data.encoding === "base64" && atob) {
bb.push(atob(data.data));
} else if (data.encoding === "URI") {
bb.push(decodeURIComponent(data.data));
} else if (data.encoding === "raw") {
bb.push(data.data);
}
} else {
if (typeof data !== "string") {
data += ""; // convert unsupported types to strings
}
// decode UTF-16 to binary string
bb.push(unescape(encodeURIComponent(data)));
}
};
FBB_proto.getBlob = function(type) {
if (!arguments.length) {
type = null;
}
return new FakeBlob(this.data.join(""), type, "raw");
};
FBB_proto.toString = function() {
return "[object BlobBuilder]";
};
FB_proto.slice = function(start, end, type) {
var args = arguments.length;
if (args < 3) {
type = null;
}
return new FakeBlob(
this.data.slice(start, args > 1 ? end : this.data.length)
, type
, this.encoding
);
};
FB_proto.toString = function() {
return "[object Blob]";
};
FB_proto.close = function() {
this.size = 0;
delete this.data;
};
return FakeBlobBuilder;
}(view));
view.Blob = function(blobParts, options) {
var type = options ? (options.type || "") : "";
var builder = new BlobBuilder();
if (blobParts) {
for (var i = 0, len = blobParts.length; i < len; i++) {
if (Uint8Array && blobParts[i] instanceof Uint8Array) {
builder.append(blobParts[i].buffer);
}
else {
builder.append(blobParts[i]);
}
}
}
var blob = builder.getBlob(type);
if (!blob.slice && blob.webkitSlice) {
blob.slice = blob.webkitSlice;
}
return blob;
};
var getPrototypeOf = Object.getPrototypeOf || function(object) {
return object.__proto__;
};
view.Blob.prototype = getPrototypeOf(new view.Blob());
}(typeof self !== "undefined" && self || typeof window !== "undefined" && window || undefined.content || undefined));
/* FileSaver.js
+59 -15
View File
@@ -7,9 +7,10 @@ Created on: 10.05.24
"""
import base64
import hashlib
import http
import json
from cryptography.fernet import Fernet
from cryptography.fernet import Fernet, InvalidToken
from django.core.exceptions import ObjectDoesNotExist
from django.http import HttpRequest, JsonResponse
from django.utils.decorators import method_decorator
@@ -27,34 +28,77 @@ class PropagateUserView(View):
proper rights management)
"""
class PropagateStatus:
UNPROCESSED = "unprocessed"
UPDATED = "updated"
CREATED = "created"
@method_decorator(csrf_exempt)
def dispatch(self, request, *args, **kwargs):
return super().dispatch(request, *args, **kwargs)
def post(self, request: HttpRequest, *args, **kwargs):
response_data = {
"success": None,
"status": None
}
# Decrypt
encrypted_body = request.body
_hash = hashlib.md5()
_hash.update(PROPAGATION_SECRET.encode("utf-8"))
key = base64.urlsafe_b64encode(_hash.hexdigest().encode("utf-8"))
fernet = Fernet(key)
body = fernet.decrypt(encrypted_body).decode("utf-8")
body = json.loads(body)
try:
status = "updated"
user = User.resolve_user_using_propagation_data(body)
user = user.update_user_using_propagation_data(body)
except ObjectDoesNotExist:
user = User(**body)
status = "created"
body = fernet.decrypt(encrypted_body).decode("utf-8")
body = json.loads(body)
except InvalidToken:
response_data["error"] = "Invalid Token"
response_data["success"] = False
response_data["status"] = self.PropagateStatus.UNPROCESSED
return JsonResponse(
status=http.HTTPStatus.UNPROCESSABLE_CONTENT,
data=response_data
)
except (json.JSONDecodeError) as e:
response_data["error"] = str(e)
response_data["success"] = False
response_data["status"] = self.PropagateStatus.UNPROCESSED
return JsonResponse(
status=http.HTTPStatus.UNPROCESSABLE_CONTENT,
data=response_data
)
# Process decrypted user data
processing_ret_vals = self.__process_user_data(body)
response_data["success"] = processing_ret_vals[0]
response_data["status"] = processing_ret_vals[1]
user = processing_ret_vals[2]
user.set_unusable_password()
user.save()
data = {
"success": True,
"status": status
}
return JsonResponse(
status=http.HTTPStatus.OK,
data=response_data
)
return JsonResponse(data)
def __process_user_data(self, body: dict) -> (bool, str, User):
""" Process decrypted user data
Args:
body:
Returns:
success (bool): Whether the processing was successful
status (str): In which way the data was used ('created' | 'updated')
user (User): Processed user object
"""
try:
user = User.resolve_user_using_propagation_data(body)
user = user.update_user_using_propagation_data(body)
status = self.PropagateStatus.UPDATED
except ObjectDoesNotExist:
user = User(**body)
status = self.PropagateStatus.CREATED
return True, status, user