diff --git a/api/models/external_identifier.py b/api/models/external_identifier.py index 597197f7..47d0a580 100644 --- a/api/models/external_identifier.py +++ b/api/models/external_identifier.py @@ -30,4 +30,22 @@ class ExternalIdentifier(models.Model): ) def __str__(self): - return f"{self.external_id} -> {self.internal_id}" \ No newline at end of file + return f"{self.external_id} -> {self.internal_id}" + + @staticmethod + def resolve_external_identifier(external_identifier: str): + """ Returns a ExternalIdentifier object, if the given parameter could be resolved as an external identifier. + + Args: + external_identifier (str): An external identifier. + + Returns: + ExternalIdentifier | None + """ + if external_identifier: + try: + obj = ExternalIdentifier.objects.get(external_id=external_identifier) + return obj + except ExternalIdentifier.DoesNotExist: + pass + return None diff --git a/api/tests/v1/share/test_api_sharing.py b/api/tests/v1/share/test_api_sharing.py index 8aff6819..fb7fd5d3 100644 --- a/api/tests/v1/share/test_api_sharing.py +++ b/api/tests/v1/share/test_api_sharing.py @@ -31,9 +31,10 @@ class APIV1SharingTestCase(BaseAPIV1TestCase): def setUpTestData(cls): super().setUpTestData() - def _run_share_request(self, url, user_list: list): + def _run_share_request(self, url, user_list: list, team_list: list): data = { - "users": user_list + "users": user_list, + "teams": team_list } data = json.dumps(data) response = self.client.put( @@ -58,16 +59,22 @@ class APIV1SharingTestCase(BaseAPIV1TestCase): self.superuser.username, self.user.username, ] + team_list = [ + str(self.team.id), + ] - response = self._run_share_request(url, user_list) + response = self._run_share_request(url, user_list, team_list) # Must fail, since performing user has no access on requested object self.assertEqual(response.status_code, 500) self.assertTrue(len(json.loads(response.content.decode("utf-8")).get("errors", [])) > 0) - # Add performing user to shared access users and rerun the request + # Add performing user to shared access users, switch from team id to team name and rerun the request obj.users.add(self.superuser) - response = self._run_share_request(url, user_list) + team_list = [ + self.team.name + ] + response = self._run_share_request(url, user_list, team_list) shared_users = obj.shared_users self.assertEqual(response.status_code, 200) @@ -84,14 +91,14 @@ class APIV1SharingTestCase(BaseAPIV1TestCase): share_url = reverse("api:v1:intervention-share", args=(self.intervention.id,)) # Expect the first request to work properly self.intervention.users.add(self.superuser) - response = self._run_share_request(share_url, [self.superuser.username]) + response = self._run_share_request(share_url, [self.superuser.username], [str(self.team.id)]) self.assertEqual(response.status_code, 200) # Change the token self.header_data["HTTP_ksptoken"] = f"{self.superuser.api_token.token}__X" # Expect the request to fail now - response = self._run_share_request(share_url, [self.superuser.username]) + response = self._run_share_request(share_url, [self.superuser.username], []) self.assertEqual(response.status_code, 403) def test_api_intervention_sharing(self): @@ -144,11 +151,11 @@ class APIV1SharingTestCase(BaseAPIV1TestCase): self.assertEqual(self.intervention.users.count(), 1) # Try to add another user via API -> must work! - response = self._run_share_request(share_url, [self.superuser.username, self.user.username]) + response = self._run_share_request(share_url, [self.superuser.username, self.user.username], []) self.assertEqual(response.status_code, 200) self.assertEqual(self.intervention.users.count(), 2) # Now try to remove the user again -> expect no changes at all to the shared user list - response = self._run_share_request(share_url, [self.superuser.username]) + response = self._run_share_request(share_url, [self.superuser.username], []) self.assertEqual(response.status_code, 200) self.assertEqual(self.intervention.users.count(), 2) diff --git a/api/utils/serializer/v1/serializer.py b/api/utils/serializer/v1/serializer.py index d09a4619..b22aa504 100644 --- a/api/utils/serializer/v1/serializer.py +++ b/api/utils/serializer/v1/serializer.py @@ -179,13 +179,7 @@ class AbstractModelAPISerializerV1(AbstractModelAPISerializer): Returns: ExternalIdentifier | None """ - if external_identifier: - try: - obj = ExternalIdentifier.objects.get(external_id=external_identifier) - return obj - except ObjectDoesNotExist: - pass - return None + return ExternalIdentifier.resolve_external_identifier(external_identifier) def _check_external_identifier_on_entry_creation(self, external_identifier): """ Special check for POST processing: diff --git a/api/views/views.py b/api/views/views.py index 376d9d7f..062cabb4 100644 --- a/api/views/views.py +++ b/api/views/views.py @@ -6,13 +6,14 @@ Created on: 21.01.22 """ import json +import uuid -from django.db.models import QuerySet +from django.db.models import QuerySet, Q from django.http import JsonResponse, HttpRequest from django.views import View from django.views.decorators.csrf import csrf_exempt -from api.models import APIUserToken +from api.models import APIUserToken, ExternalIdentifier from api.settings import KSP_TOKEN_HEADER_IDENTIFIER, KSP_USER_HEADER_IDENTIFIER from compensation.models import EcoAccount from ema.models import Ema @@ -205,6 +206,10 @@ class AbstractModelShareAPIView(AbstractAPIView): """ try: + external_identifier = ExternalIdentifier.resolve_external_identifier(id) + if external_identifier: + id = external_identifier.internal_id + users = self._get_shared_users_of_object(id) teams = self._get_shared_teams_of_object(id) except Exception as e: @@ -237,6 +242,9 @@ class AbstractModelShareAPIView(AbstractAPIView): """ try: + external_identifier = ExternalIdentifier.resolve_external_identifier(id) + if external_identifier: + id = external_identifier.internal_id success = self._process_put_body(request.body, id) except Exception as e: return self._return_error_response(e) @@ -309,22 +317,36 @@ class AbstractModelShareAPIView(AbstractAPIView): raise ValueError("Shared user list must not be empty!") new_teams = content.get("teams", []) + self.__process_user_sharing(new_users, obj) + self.__process_team_sharing(new_teams, obj) + + return True + + def __process_user_sharing(self, user_list: list, obj): + """ Processes API sharing for user payload + + Args: + user_list (list): A list of users to share the obj with + obj (BaseObject): The shareable object + + Returns: + + """ # Eliminate duplicates - new_users = list(dict.fromkeys(new_users)) - new_teams = list(dict.fromkeys(new_teams)) + new_users = list(dict.fromkeys(user_list)) # Make sure each of these names exist as a user new_users_objs = [] for user in new_users: - new_users_objs.append(User.objects.get(username=user)) - - # Make sure each of these names exist as a user - new_teams_objs = [] - for team_name in new_teams: - new_teams_objs.append(Team.objects.get(name=team_name)) + try: + user_obj = User.objects.get(username=user) + except User.DoesNotExist: + raise AssertionError(f"User with username {user} does not exist") + new_users_objs.append(user_obj) if self.user.is_default_group_only(): # Default only users are not allowed to remove other users from having access. They can only add new ones! + # So we need to keep the ones that already have access from being removed! new_users_to_be_added = User.objects.filter( username__in=new_users ).exclude( @@ -332,17 +354,44 @@ class AbstractModelShareAPIView(AbstractAPIView): ) new_users_objs = obj.shared_users.union(new_users_to_be_added) - new_teams_to_be_added = Team.objects.filter( - name__in=new_teams - ).exclude( - id__in=obj.shared_teams - ) - new_teams_objs = obj.shared_teams.union(new_teams_to_be_added) - obj.share_with_user_list(new_users_objs) - obj.share_with_team_list(new_teams_objs) - return True + def __process_team_sharing(self, team_list: list, obj): + """ Processes API sharing for team payload + + Args: + team_list (list): A list of teams to share the obj with + obj (BaseObject): The shareable object + + Returns: + + """ + # Eliminate duplicates + new_teams = list(dict.fromkeys(team_list)) + + # Resolve team names or ids into objects + new_team_ids = [] + for team in new_teams: + try: + uuid.UUID(team) + try: + new_team_ids.append(Team.objects.get(id=team).id) + except Team.DoesNotExist: + raise AssertionError(f"Team with id {team} does not exist!") + except ValueError: + # entry is a name and not a uuid -> try to resolve as name! + try: + new_team_ids.append(Team.objects.get(name=team).id) + except Team.DoesNotExist: + raise AssertionError(f"Team with name {team} does not exist!") + + new_team_objs = Team.objects.filter(id__in=new_team_ids) + if self.user.is_default_group_only(): + # Default only users are not allowed to remove other users from having access. They can only add new ones! + # So we need to keep the ones that already have access from being removed! + new_team_objs = obj.shared_teams.union(new_team_objs) + + obj.share_with_team_list(new_team_objs) class InterventionAPIShareView(AbstractModelShareAPIView): model = Intervention