Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 93d29982a6 | |||
| 59a1bdfb1c | |||
| 056a92b068 |
@@ -31,3 +31,21 @@ class ExternalIdentifier(models.Model):
|
|||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return f"{self.external_id} -> {self.internal_id}"
|
return f"{self.external_id} -> {self.internal_id}"
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def resolve_external_identifier(external_identifier: str):
|
||||||
|
""" Returns a ExternalIdentifier object, if the given parameter could be resolved as an external identifier.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
external_identifier (str): An external identifier.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ExternalIdentifier | None
|
||||||
|
"""
|
||||||
|
if external_identifier:
|
||||||
|
try:
|
||||||
|
obj = ExternalIdentifier.objects.get(external_id=external_identifier)
|
||||||
|
return obj
|
||||||
|
except ExternalIdentifier.DoesNotExist:
|
||||||
|
pass
|
||||||
|
return None
|
||||||
|
|||||||
@@ -31,9 +31,10 @@ class APIV1SharingTestCase(BaseAPIV1TestCase):
|
|||||||
def setUpTestData(cls):
|
def setUpTestData(cls):
|
||||||
super().setUpTestData()
|
super().setUpTestData()
|
||||||
|
|
||||||
def _run_share_request(self, url, user_list: list):
|
def _run_share_request(self, url, user_list: list, team_list: list):
|
||||||
data = {
|
data = {
|
||||||
"users": user_list
|
"users": user_list,
|
||||||
|
"teams": team_list
|
||||||
}
|
}
|
||||||
data = json.dumps(data)
|
data = json.dumps(data)
|
||||||
response = self.client.put(
|
response = self.client.put(
|
||||||
@@ -58,16 +59,22 @@ class APIV1SharingTestCase(BaseAPIV1TestCase):
|
|||||||
self.superuser.username,
|
self.superuser.username,
|
||||||
self.user.username,
|
self.user.username,
|
||||||
]
|
]
|
||||||
|
team_list = [
|
||||||
|
str(self.team.id),
|
||||||
|
]
|
||||||
|
|
||||||
response = self._run_share_request(url, user_list)
|
response = self._run_share_request(url, user_list, team_list)
|
||||||
|
|
||||||
# Must fail, since performing user has no access on requested object
|
# Must fail, since performing user has no access on requested object
|
||||||
self.assertEqual(response.status_code, 500)
|
self.assertEqual(response.status_code, 500)
|
||||||
self.assertTrue(len(json.loads(response.content.decode("utf-8")).get("errors", [])) > 0)
|
self.assertTrue(len(json.loads(response.content.decode("utf-8")).get("errors", [])) > 0)
|
||||||
|
|
||||||
# Add performing user to shared access users and rerun the request
|
# Add performing user to shared access users, switch from team id to team name and rerun the request
|
||||||
obj.users.add(self.superuser)
|
obj.users.add(self.superuser)
|
||||||
response = self._run_share_request(url, user_list)
|
team_list = [
|
||||||
|
self.team.name
|
||||||
|
]
|
||||||
|
response = self._run_share_request(url, user_list, team_list)
|
||||||
|
|
||||||
shared_users = obj.shared_users
|
shared_users = obj.shared_users
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
@@ -84,14 +91,14 @@ class APIV1SharingTestCase(BaseAPIV1TestCase):
|
|||||||
share_url = reverse("api:v1:intervention-share", args=(self.intervention.id,))
|
share_url = reverse("api:v1:intervention-share", args=(self.intervention.id,))
|
||||||
# Expect the first request to work properly
|
# Expect the first request to work properly
|
||||||
self.intervention.users.add(self.superuser)
|
self.intervention.users.add(self.superuser)
|
||||||
response = self._run_share_request(share_url, [self.superuser.username])
|
response = self._run_share_request(share_url, [self.superuser.username], [str(self.team.id)])
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
|
|
||||||
# Change the token
|
# Change the token
|
||||||
self.header_data["HTTP_ksptoken"] = f"{self.superuser.api_token.token}__X"
|
self.header_data["HTTP_ksptoken"] = f"{self.superuser.api_token.token}__X"
|
||||||
|
|
||||||
# Expect the request to fail now
|
# Expect the request to fail now
|
||||||
response = self._run_share_request(share_url, [self.superuser.username])
|
response = self._run_share_request(share_url, [self.superuser.username], [])
|
||||||
self.assertEqual(response.status_code, 403)
|
self.assertEqual(response.status_code, 403)
|
||||||
|
|
||||||
def test_api_intervention_sharing(self):
|
def test_api_intervention_sharing(self):
|
||||||
@@ -144,11 +151,11 @@ class APIV1SharingTestCase(BaseAPIV1TestCase):
|
|||||||
self.assertEqual(self.intervention.users.count(), 1)
|
self.assertEqual(self.intervention.users.count(), 1)
|
||||||
|
|
||||||
# Try to add another user via API -> must work!
|
# Try to add another user via API -> must work!
|
||||||
response = self._run_share_request(share_url, [self.superuser.username, self.user.username])
|
response = self._run_share_request(share_url, [self.superuser.username, self.user.username], [])
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
self.assertEqual(self.intervention.users.count(), 2)
|
self.assertEqual(self.intervention.users.count(), 2)
|
||||||
|
|
||||||
# Now try to remove the user again -> expect no changes at all to the shared user list
|
# Now try to remove the user again -> expect no changes at all to the shared user list
|
||||||
response = self._run_share_request(share_url, [self.superuser.username])
|
response = self._run_share_request(share_url, [self.superuser.username], [])
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
self.assertEqual(self.intervention.users.count(), 2)
|
self.assertEqual(self.intervention.users.count(), 2)
|
||||||
|
|||||||
@@ -179,13 +179,7 @@ class AbstractModelAPISerializerV1(AbstractModelAPISerializer):
|
|||||||
Returns:
|
Returns:
|
||||||
ExternalIdentifier | None
|
ExternalIdentifier | None
|
||||||
"""
|
"""
|
||||||
if external_identifier:
|
return ExternalIdentifier.resolve_external_identifier(external_identifier)
|
||||||
try:
|
|
||||||
obj = ExternalIdentifier.objects.get(external_id=external_identifier)
|
|
||||||
return obj
|
|
||||||
except ObjectDoesNotExist:
|
|
||||||
pass
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _check_external_identifier_on_entry_creation(self, external_identifier):
|
def _check_external_identifier_on_entry_creation(self, external_identifier):
|
||||||
""" Special check for POST processing:
|
""" Special check for POST processing:
|
||||||
|
|||||||
@@ -6,13 +6,14 @@ Created on: 21.01.22
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
import json
|
import json
|
||||||
|
import uuid
|
||||||
|
|
||||||
from django.db.models import QuerySet
|
from django.db.models import QuerySet, Q
|
||||||
from django.http import JsonResponse, HttpRequest
|
from django.http import JsonResponse, HttpRequest
|
||||||
from django.views import View
|
from django.views import View
|
||||||
from django.views.decorators.csrf import csrf_exempt
|
from django.views.decorators.csrf import csrf_exempt
|
||||||
|
|
||||||
from api.models import APIUserToken
|
from api.models import APIUserToken, ExternalIdentifier
|
||||||
from api.settings import KSP_TOKEN_HEADER_IDENTIFIER, KSP_USER_HEADER_IDENTIFIER
|
from api.settings import KSP_TOKEN_HEADER_IDENTIFIER, KSP_USER_HEADER_IDENTIFIER
|
||||||
from compensation.models import EcoAccount
|
from compensation.models import EcoAccount
|
||||||
from ema.models import Ema
|
from ema.models import Ema
|
||||||
@@ -205,6 +206,10 @@ class AbstractModelShareAPIView(AbstractAPIView):
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
|
external_identifier = ExternalIdentifier.resolve_external_identifier(id)
|
||||||
|
if external_identifier:
|
||||||
|
id = external_identifier.internal_id
|
||||||
|
|
||||||
users = self._get_shared_users_of_object(id)
|
users = self._get_shared_users_of_object(id)
|
||||||
teams = self._get_shared_teams_of_object(id)
|
teams = self._get_shared_teams_of_object(id)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -237,6 +242,9 @@ class AbstractModelShareAPIView(AbstractAPIView):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
external_identifier = ExternalIdentifier.resolve_external_identifier(id)
|
||||||
|
if external_identifier:
|
||||||
|
id = external_identifier.internal_id
|
||||||
success = self._process_put_body(request.body, id)
|
success = self._process_put_body(request.body, id)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return self._return_error_response(e)
|
return self._return_error_response(e)
|
||||||
@@ -309,22 +317,36 @@ class AbstractModelShareAPIView(AbstractAPIView):
|
|||||||
raise ValueError("Shared user list must not be empty!")
|
raise ValueError("Shared user list must not be empty!")
|
||||||
new_teams = content.get("teams", [])
|
new_teams = content.get("teams", [])
|
||||||
|
|
||||||
|
self.__process_user_sharing(new_users, obj)
|
||||||
|
self.__process_team_sharing(new_teams, obj)
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def __process_user_sharing(self, user_list: list, obj):
|
||||||
|
""" Processes API sharing for user payload
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_list (list): A list of users to share the obj with
|
||||||
|
obj (BaseObject): The shareable object
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
# Eliminate duplicates
|
# Eliminate duplicates
|
||||||
new_users = list(dict.fromkeys(new_users))
|
new_users = list(dict.fromkeys(user_list))
|
||||||
new_teams = list(dict.fromkeys(new_teams))
|
|
||||||
|
|
||||||
# Make sure each of these names exist as a user
|
# Make sure each of these names exist as a user
|
||||||
new_users_objs = []
|
new_users_objs = []
|
||||||
for user in new_users:
|
for user in new_users:
|
||||||
new_users_objs.append(User.objects.get(username=user))
|
try:
|
||||||
|
user_obj = User.objects.get(username=user)
|
||||||
# Make sure each of these names exist as a user
|
except User.DoesNotExist:
|
||||||
new_teams_objs = []
|
raise AssertionError(f"User with username {user} does not exist")
|
||||||
for team_name in new_teams:
|
new_users_objs.append(user_obj)
|
||||||
new_teams_objs.append(Team.objects.get(name=team_name))
|
|
||||||
|
|
||||||
if self.user.is_default_group_only():
|
if self.user.is_default_group_only():
|
||||||
# Default only users are not allowed to remove other users from having access. They can only add new ones!
|
# Default only users are not allowed to remove other users from having access. They can only add new ones!
|
||||||
|
# So we need to keep the ones that already have access from being removed!
|
||||||
new_users_to_be_added = User.objects.filter(
|
new_users_to_be_added = User.objects.filter(
|
||||||
username__in=new_users
|
username__in=new_users
|
||||||
).exclude(
|
).exclude(
|
||||||
@@ -332,17 +354,44 @@ class AbstractModelShareAPIView(AbstractAPIView):
|
|||||||
)
|
)
|
||||||
new_users_objs = obj.shared_users.union(new_users_to_be_added)
|
new_users_objs = obj.shared_users.union(new_users_to_be_added)
|
||||||
|
|
||||||
new_teams_to_be_added = Team.objects.filter(
|
|
||||||
name__in=new_teams
|
|
||||||
).exclude(
|
|
||||||
id__in=obj.shared_teams
|
|
||||||
)
|
|
||||||
new_teams_objs = obj.shared_teams.union(new_teams_to_be_added)
|
|
||||||
|
|
||||||
obj.share_with_user_list(new_users_objs)
|
obj.share_with_user_list(new_users_objs)
|
||||||
obj.share_with_team_list(new_teams_objs)
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
def __process_team_sharing(self, team_list: list, obj):
|
||||||
|
""" Processes API sharing for team payload
|
||||||
|
|
||||||
|
Args:
|
||||||
|
team_list (list): A list of teams to share the obj with
|
||||||
|
obj (BaseObject): The shareable object
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
# Eliminate duplicates
|
||||||
|
new_teams = list(dict.fromkeys(team_list))
|
||||||
|
|
||||||
|
# Resolve team names or ids into objects
|
||||||
|
new_team_ids = []
|
||||||
|
for team in new_teams:
|
||||||
|
try:
|
||||||
|
uuid.UUID(team)
|
||||||
|
try:
|
||||||
|
new_team_ids.append(Team.objects.get(id=team).id)
|
||||||
|
except Team.DoesNotExist:
|
||||||
|
raise AssertionError(f"Team with id {team} does not exist!")
|
||||||
|
except ValueError:
|
||||||
|
# entry is a name and not a uuid -> try to resolve as name!
|
||||||
|
try:
|
||||||
|
new_team_ids.append(Team.objects.get(name=team).id)
|
||||||
|
except Team.DoesNotExist:
|
||||||
|
raise AssertionError(f"Team with name {team} does not exist!")
|
||||||
|
|
||||||
|
new_team_objs = Team.objects.filter(id__in=new_team_ids)
|
||||||
|
if self.user.is_default_group_only():
|
||||||
|
# Default only users are not allowed to remove other users from having access. They can only add new ones!
|
||||||
|
# So we need to keep the ones that already have access from being removed!
|
||||||
|
new_team_objs = obj.shared_teams.union(new_team_objs)
|
||||||
|
|
||||||
|
obj.share_with_team_list(new_team_objs)
|
||||||
|
|
||||||
class InterventionAPIShareView(AbstractModelShareAPIView):
|
class InterventionAPIShareView(AbstractModelShareAPIView):
|
||||||
model = Intervention
|
model = Intervention
|
||||||
|
|||||||
Reference in New Issue
Block a user