diff --git a/codelist/management/commands/export_codelist.py b/codelist/management/commands/export_codelist.py new file mode 100644 index 0000000..a80098e --- /dev/null +++ b/codelist/management/commands/export_codelist.py @@ -0,0 +1,68 @@ +""" +Author: Michel Peltriaux +Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany +Contact: michel.peltriaux@sgdnord.rlp.de +Created on: 10.11.21 + +""" +import csv + +from codelist.models import KonovaCodeList +from konova.management.commands.setup import BaseKonovaCommand + + +class Command(BaseKonovaCommand): + help = "Exports a single list of internal codes. Codelist identifier must be provided as argument" + list_id = 'list_id' + save_to = 'save_to' + + def add_arguments(self, parser): + try: + parser.add_argument(self.list_id, type=int) + except ValueError: + self._write_error("No list id provided") + exit(-1) + try: + parser.add_argument(self.save_to, type=str) + except ValueError: + self._write_error("No save to path given") + exit(-1) + + def handle(self, *args, **options): + try: + list_id = options[self.list_id] + self.save_to = options[self.save_to] + self._write_warning("Fetching codes...") + code_list = KonovaCodeList.objects.get( + id=list_id, + ) + codes = code_list.codes.filter( + is_selectable=True, + ).order_by( + "parent" + ) + header_row = [ + "Parent long name", + "Parent short name", + "Code long name", + "Code short name", + "Code ID", + ] + with open(self.save_to, 'w', newline='') as csvfile: + writer = csv.writer( + csvfile, + delimiter=' ', + quoting=csv.QUOTE_MINIMAL, + ) + writer.writerow(header_row) + for code in codes: + if code.parent is not None: + row = [code.parent.long_name, code.parent.short_name, code.long_name, code.short_name, code.id] + else: + row = ["", "", code.long_name, code.short_name, code.id] + #row = f"{code.parent.long_name};{code.parent.short_name};{code.long_name};{code.short_name};{code.id}" + writer.writerow(row) + + except KeyboardInterrupt: + self._break_line() + exit(-1) diff --git a/codelist/management/commands/update_codelist.py b/codelist/management/commands/update_codelist.py index 74bfd55..3807b29 100644 --- a/codelist/management/commands/update_codelist.py +++ b/codelist/management/commands/update_codelist.py @@ -23,7 +23,7 @@ bool_map = { class Command(BaseKonovaCommand): - help = "Performs test on collisions using the identifier generation" + help = "Updates internal codelist by external API" def handle(self, *args, **options): try: diff --git a/compensation/account_urls.py b/compensation/account_urls.py index d29c827..a2bfd6f 100644 --- a/compensation/account_urls.py +++ b/compensation/account_urls.py @@ -20,7 +20,11 @@ urlpatterns = [ path('/remove', remove_view, name='acc-remove'), path('/state/new', state_new_view, name='acc-new-state'), path('/action/new', action_new_view, name='acc-new-action'), + path('/state//remove', state_remove_view, name='acc-state-remove'), + path('/action//remove', action_remove_view, name='acc-action-remove'), path('/deadline/new', deadline_new_view, name="acc-new-deadline"), + path('/share/', share_view, name='share'), + path('/share', create_share_view, name='share-create'), # Documents path('/document/new/', new_document_view, name='acc-new-doc'), @@ -28,7 +32,7 @@ urlpatterns = [ path('document//remove/', remove_document_view, name='acc-remove-doc'), # Eco-account deductions - path('/remove/', deduction_remove_view, name='deduction-remove'), + path('/remove/', deduction_remove_view, name='acc-remove-deduction'), path('/deduct/new', new_deduction_view, name='acc-new-deduction'), ] \ No newline at end of file diff --git a/compensation/comp_urls.py b/compensation/comp_urls.py index a63a690..8eb7c48 100644 --- a/compensation/comp_urls.py +++ b/compensation/comp_urls.py @@ -20,6 +20,8 @@ urlpatterns = [ path('/remove', remove_view, name='remove'), path('/state/new', state_new_view, name='new-state'), path('/action/new', action_new_view, name='new-action'), + path('/state//remove', state_remove_view, name='state-remove'), + path('/action//remove', action_remove_view, name='action-remove'), path('/deadline/new', deadline_new_view, name="new-deadline"), path('/report', report_view, name='report'), @@ -28,10 +30,4 @@ urlpatterns = [ path('document/', get_document_view, name='get-doc'), path('document//remove/', remove_document_view, name='remove-doc'), - # Generic state routes - path('state//remove', state_remove_view, name='state-remove'), - - # Generic action routes - path('action//remove', action_remove_view, name='action-remove'), - ] \ No newline at end of file diff --git a/compensation/forms/forms.py b/compensation/forms/forms.py index eadee2f..1b9f050 100644 --- a/compensation/forms/forms.py +++ b/compensation/forms/forms.py @@ -380,7 +380,7 @@ class NewEcoAccountForm(AbstractCompensationForm, CompensationResponsibleFormMix legal=legal ) acc.fundings.set(fundings) - acc.users.add(user) + acc.share_with(user) # Add the log entry to the main objects log list acc.log.add(action) diff --git a/compensation/managers.py b/compensation/managers.py index c97cd51..61933ee 100644 --- a/compensation/managers.py +++ b/compensation/managers.py @@ -35,7 +35,9 @@ class CompensationManager(models.Manager): """ def get_queryset(self): - return super().get_queryset().select_related( + return super().get_queryset().filter( + deleted__isnull=True, + ).select_related( "modified", "intervention", "intervention__recorded", diff --git a/compensation/models.py b/compensation/models.py index 3755012..672d4e0 100644 --- a/compensation/models.py +++ b/compensation/models.py @@ -22,7 +22,7 @@ from compensation.managers import CompensationStateManager, EcoAccountDeductionM from compensation.utils.quality import CompensationQualityChecker, EcoAccountQualityChecker from intervention.models import Intervention, ResponsibilityData, LegalData from konova.models import BaseObject, BaseResource, Geometry, UuidModel, AbstractDocument, \ - generate_document_file_upload_path, RecordableMixin + generate_document_file_upload_path, RecordableObject, ShareableObject from konova.settings import DEFAULT_SRID_RLP, LANIS_LINK_TEMPLATE from user.models import UserActionLogEntry @@ -229,6 +229,20 @@ class Compensation(AbstractCompensation): self.identifier = self.generate_new_identifier() super().save(*args, **kwargs) + def is_shared_with(self, user: User): + """ Access check + + Checks whether a given user has access to this object + + Args: + user (User): The user to be checked + + Returns: + + """ + # Compensations inherit their shared state from the interventions + return self.intervention.is_shared_with(user) + def get_LANIS_link(self) -> str: """ Generates a link for LANIS depending on the geometry @@ -311,28 +325,11 @@ class CompensationDocument(AbstractDocument): pass -class EcoAccount(AbstractCompensation, RecordableMixin): +class EcoAccount(AbstractCompensation, ShareableObject, RecordableObject): """ An eco account is a kind of 'prepaid' compensation. It can be compared to an account that already has been filled with some kind of currency. From this account one is able to deduct currency for current projects. """ - # Users having access on this object - # Not needed in regular Compensation since their access is defined by the linked intervention's access - users = models.ManyToManyField( - User, - help_text="Users having access (shared with)" - ) - - # Refers to "verzeichnen" - recorded = models.OneToOneField( - UserActionLogEntry, - on_delete=models.SET_NULL, - null=True, - blank=True, - help_text="Holds data on user and timestamp of this action", - related_name="+" - ) - deductable_surface = models.FloatField( blank=True, null=True, diff --git a/compensation/templates/compensation/detail/compensation/includes/actions.html b/compensation/templates/compensation/detail/compensation/includes/actions.html index 08930a9..92cf45f 100644 --- a/compensation/templates/compensation/detail/compensation/includes/actions.html +++ b/compensation/templates/compensation/detail/compensation/includes/actions.html @@ -50,7 +50,7 @@ {{ action.comment|default_if_none:"" }} {% if is_default_member and has_access %} - {% endif %} diff --git a/compensation/templates/compensation/detail/compensation/includes/states-after.html b/compensation/templates/compensation/detail/compensation/includes/states-after.html index 6b37297..f8dd00a 100644 --- a/compensation/templates/compensation/detail/compensation/includes/states-after.html +++ b/compensation/templates/compensation/detail/compensation/includes/states-after.html @@ -51,7 +51,7 @@ {{ state.surface|floatformat:2 }} m² {% if is_default_member and has_access %} - {% endif %} diff --git a/compensation/templates/compensation/detail/compensation/includes/states-before.html b/compensation/templates/compensation/detail/compensation/includes/states-before.html index a0c541a..39b5e03 100644 --- a/compensation/templates/compensation/detail/compensation/includes/states-before.html +++ b/compensation/templates/compensation/detail/compensation/includes/states-before.html @@ -51,7 +51,7 @@ {{ state.surface|floatformat:2 }} m² {% if is_default_member and has_access %} - {% endif %} diff --git a/compensation/templates/compensation/detail/eco_account/includes/actions.html b/compensation/templates/compensation/detail/eco_account/includes/actions.html index 868242a..4e81e22 100644 --- a/compensation/templates/compensation/detail/eco_account/includes/actions.html +++ b/compensation/templates/compensation/detail/eco_account/includes/actions.html @@ -50,7 +50,7 @@ {{ action.comment|default_if_none:"" }} {% if is_default_member and has_access %} - {% endif %} diff --git a/compensation/templates/compensation/detail/eco_account/includes/controls.html b/compensation/templates/compensation/detail/eco_account/includes/controls.html index 5aa9620..f43ddeb 100644 --- a/compensation/templates/compensation/detail/eco_account/includes/controls.html +++ b/compensation/templates/compensation/detail/eco_account/includes/controls.html @@ -12,6 +12,9 @@ {% if has_access %} + {% if is_ets_member %} {% if obj.recorded %} {% endif %} diff --git a/compensation/templates/compensation/detail/eco_account/includes/states-after.html b/compensation/templates/compensation/detail/eco_account/includes/states-after.html index bd71e25..eab29ad 100644 --- a/compensation/templates/compensation/detail/eco_account/includes/states-after.html +++ b/compensation/templates/compensation/detail/eco_account/includes/states-after.html @@ -51,7 +51,7 @@ {{ state.surface|floatformat:2 }} m² {% if is_default_member and has_access %} - {% endif %} diff --git a/compensation/templates/compensation/detail/eco_account/includes/states-before.html b/compensation/templates/compensation/detail/eco_account/includes/states-before.html index 8acb486..ea3df84 100644 --- a/compensation/templates/compensation/detail/eco_account/includes/states-before.html +++ b/compensation/templates/compensation/detail/eco_account/includes/states-before.html @@ -51,7 +51,7 @@ {{ state.surface|floatformat:2 }} m² {% if is_default_member and has_access %} - {% endif %} diff --git a/compensation/tests.py b/compensation/tests.py deleted file mode 100644 index 7ce503c..0000000 --- a/compensation/tests.py +++ /dev/null @@ -1,3 +0,0 @@ -from django.test import TestCase - -# Create your tests here. diff --git a/compensation/tests/__init__.py b/compensation/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/compensation/tests/test_views.py b/compensation/tests/test_views.py new file mode 100644 index 0000000..ff9bc72 --- /dev/null +++ b/compensation/tests/test_views.py @@ -0,0 +1,406 @@ +""" +Author: Michel Peltriaux +Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany +Contact: michel.peltriaux@sgdnord.rlp.de +Created on: 27.10.21 + +""" +from django.urls import reverse +from django.test import Client + +from konova.settings import DEFAULT_GROUP +from konova.tests.test_views import BaseViewTestCase + + +class CompensationViewTestCase(BaseViewTestCase): + """ + These tests focus on proper returned views depending on the user's groups privileges and login status + + """ + + @classmethod + def setUpTestData(cls) -> None: + super().setUpTestData() + state = cls.create_dummy_states() + cls.compensation.before_states.set([state]) + cls.compensation.after_states.set([state]) + + action = cls.create_dummy_action() + cls.compensation.actions.set([action]) + + # Prepare urls + cls.index_url = reverse("compensation:index", args=()) + cls.new_url = reverse("compensation:new", args=(cls.intervention.id,)) + cls.new_id_url = reverse("compensation:new-id", args=()) + cls.detail_url = reverse("compensation:detail", args=(cls.compensation.id,)) + cls.log_url = reverse("compensation:log", args=(cls.compensation.id,)) + cls.edit_url = reverse("compensation:edit", args=(cls.compensation.id,)) + cls.remove_url = reverse("compensation:remove", args=(cls.compensation.id,)) + cls.report_url = reverse("compensation:report", args=(cls.compensation.id,)) + cls.state_new_url = reverse("compensation:new-state", args=(cls.compensation.id,)) + cls.action_new_url = reverse("compensation:new-action", args=(cls.compensation.id,)) + cls.deadline_new_url = reverse("compensation:new-deadline", args=(cls.compensation.id,)) + cls.new_doc_url = reverse("compensation:new-doc", args=(cls.compensation.id,)) + + cls.state_remove_url = reverse("compensation:state-remove", args=(cls.compensation.id, cls.comp_state.id,)) + cls.action_remove_url = reverse("compensation:action-remove", args=(cls.compensation.id, cls.comp_action.id,)) + + def test_anonymous_user(self): + """ Check correct status code for all requests + + Assumption: User not logged in + + Returns: + + """ + client = Client() + + success_urls = [ + self.report_url, + ] + fail_urls = [ + self.index_url, + self.detail_url, + self.new_url, + self.new_id_url, + self.log_url, + self.edit_url, + self.remove_url, + self.state_new_url, + self.action_new_url, + self.deadline_new_url, + self.state_remove_url, + self.action_remove_url, + self.new_doc_url, + ] + + self.assert_url_success(client, success_urls) + self.assert_url_fail(client, fail_urls) + + def test_logged_in_no_groups_shared(self): + """ Check correct status code for all requests + + Assumption: User logged in and has no groups and data is shared + + Returns: + + """ + client = Client() + client.login(username=self.superuser.username, password=self.superuser_pw) + self.superuser.groups.set([]) + self.intervention.share_with_list([self.superuser]) + + # Since the user has no groups, it does not matter that data has been shared. There SHOULD not be any difference + # to a user without access, since the important permissions are missing + success_urls = [ + self.index_url, + self.detail_url, + self.report_url, + ] + fail_urls = [ + self.new_url, + self.new_id_url, + self.log_url, + self.edit_url, + self.remove_url, + self.state_new_url, + self.action_new_url, + self.deadline_new_url, + self.state_remove_url, + self.action_remove_url, + self.new_doc_url, + ] + + self.assert_url_success(client, success_urls) + self.assert_url_fail(client, fail_urls) + + def test_logged_in_no_groups_unshared(self): + """ Check correct status code for all requests + + Assumption: User logged in and has no groups and data is shared + + Returns: + + """ + client = Client() + client.login(username=self.superuser.username, password=self.superuser_pw) + self.superuser.groups.set([]) + # Sharing is inherited by base intervention for compensation. Therefore configure the interventions share state + self.intervention.share_with_list([]) + + # Since the user has no groups, it does not matter that data is unshared. There SHOULD not be any difference + # to a user having shared access, since all important permissions are missing + success_urls = [ + self.index_url, + self.detail_url, + self.report_url, + ] + fail_urls = [ + self.new_url, + self.new_id_url, + self.log_url, + self.edit_url, + self.remove_url, + self.state_new_url, + self.action_new_url, + self.deadline_new_url, + self.state_remove_url, + self.action_remove_url, + self.new_doc_url, + ] + + self.assert_url_success(client, success_urls) + self.assert_url_fail(client, fail_urls) + + def test_logged_in_default_group_shared(self): + """ Check correct status code for all requests + + Assumption: User logged in, is default group member and data is shared + --> Default group necessary since all base functionalities depend on this group membership + + Returns: + + """ + client = Client() + client.login(username=self.superuser.username, password=self.superuser_pw) + group = self.groups.get(name=DEFAULT_GROUP) + self.superuser.groups.set([group]) + # Sharing is inherited by base intervention for compensation. Therefore configure the interventions share state + self.intervention.share_with_list([self.superuser]) + + success_urls = [ + self.index_url, + self.detail_url, + self.report_url, + self.new_url, + self.new_id_url, + self.edit_url, + self.state_new_url, + self.action_new_url, + self.deadline_new_url, + self.state_remove_url, + self.action_remove_url, + self.new_doc_url, + self.log_url, + self.remove_url, + ] + self.assert_url_success(client, success_urls) + + def test_logged_in_default_group_unshared(self): + """ Check correct status code for all requests + + Assumption: User logged in, is default group member and data is NOT shared + --> Default group necessary since all base functionalities depend on this group membership + + Returns: + + """ + client = Client() + client.login(username=self.superuser.username, password=self.superuser_pw) + group = self.groups.get(name=DEFAULT_GROUP) + self.superuser.groups.set([group]) + # Sharing is inherited by base intervention for compensation. Therefore configure the interventions share state + self.intervention.share_with_list([]) + + success_urls = [ + self.index_url, + self.detail_url, + self.report_url, + self.new_id_url, + ] + fail_urls = [ + self.new_url, + self.edit_url, + self.state_new_url, + self.action_new_url, + self.deadline_new_url, + self.state_remove_url, + self.action_remove_url, + self.new_doc_url, + self.log_url, + self.remove_url, + ] + self.assert_url_fail(client, fail_urls) + self.assert_url_success(client, success_urls) + + +class EcoAccountViewTestCase(CompensationViewTestCase): + """ + These tests focus on proper returned views depending on the user's groups privileges and login status + + EcoAccounts can inherit the same tests used for compensations. + + """ + comp_state = None + comp_action = None + + @classmethod + def setUpTestData(cls) -> None: + super().setUpTestData() + state = cls.create_dummy_states() + cls.eco_account.before_states.set([state]) + cls.eco_account.after_states.set([state]) + + action = cls.create_dummy_action() + cls.eco_account.actions.set([action]) + + # Prepare urls + cls.index_url = reverse("compensation:acc-index", args=()) + cls.new_url = reverse("compensation:acc-new", args=()) + cls.new_id_url = reverse("compensation:acc-new-id", args=()) + cls.detail_url = reverse("compensation:acc-detail", args=(cls.eco_account.id,)) + cls.log_url = reverse("compensation:acc-log", args=(cls.eco_account.id,)) + cls.edit_url = reverse("compensation:acc-edit", args=(cls.eco_account.id,)) + cls.remove_url = reverse("compensation:acc-remove", args=(cls.eco_account.id,)) + cls.report_url = reverse("compensation:acc-report", args=(cls.eco_account.id,)) + cls.state_new_url = reverse("compensation:acc-new-state", args=(cls.eco_account.id,)) + cls.action_new_url = reverse("compensation:acc-new-action", args=(cls.eco_account.id,)) + cls.deadline_new_url = reverse("compensation:acc-new-deadline", args=(cls.eco_account.id,)) + cls.new_doc_url = reverse("compensation:acc-new-doc", args=(cls.eco_account.id,)) + cls.state_remove_url = reverse("compensation:acc-state-remove", args=(cls.eco_account.id, cls.comp_state.id,)) + cls.action_remove_url = reverse("compensation:acc-action-remove", args=(cls.eco_account.id, cls.comp_action.id,)) + + def test_logged_in_no_groups_shared(self): + """ Check correct status code for all requests + + Assumption: User logged in and has no groups and data is shared + + Returns: + + """ + client = Client() + client.login(username=self.superuser.username, password=self.superuser_pw) + self.superuser.groups.set([]) + self.eco_account.share_with_list([self.superuser]) + + # Since the user has no groups, it does not matter that data has been shared. There SHOULD not be any difference + # to a user without access, since the important permissions are missing + success_urls = [ + self.index_url, + self.detail_url, + self.report_url, + ] + fail_urls = [ + self.new_url, + self.new_id_url, + self.log_url, + self.edit_url, + self.remove_url, + self.state_new_url, + self.action_new_url, + self.deadline_new_url, + self.state_remove_url, + self.action_remove_url, + self.new_doc_url, + ] + + self.assert_url_success(client, success_urls) + self.assert_url_fail(client, fail_urls) + + def test_logged_in_no_groups_unshared(self): + """ Check correct status code for all requests + + Assumption: User logged in and has no groups and data is shared + + Returns: + + """ + client = Client() + client.login(username=self.superuser.username, password=self.superuser_pw) + self.superuser.groups.set([]) + self.eco_account.share_with_list([]) + + # Since the user has no groups, it does not matter that data is unshared. There SHOULD not be any difference + # to a user having shared access, since all important permissions are missing + success_urls = [ + self.index_url, + self.detail_url, + self.report_url, + ] + fail_urls = [ + self.new_url, + self.new_id_url, + self.log_url, + self.edit_url, + self.remove_url, + self.state_new_url, + self.action_new_url, + self.deadline_new_url, + self.state_remove_url, + self.action_remove_url, + self.new_doc_url, + ] + + self.assert_url_success(client, success_urls) + self.assert_url_fail(client, fail_urls) + + def test_logged_in_default_group_shared(self): + """ Check correct status code for all requests + + Assumption: User logged in, is default group member and data is shared + --> Default group necessary since all base functionalities depend on this group membership + + Returns: + + """ + client = Client() + client.login(username=self.superuser.username, password=self.superuser_pw) + group = self.groups.get(name=DEFAULT_GROUP) + self.superuser.groups.set([group]) + # Sharing is inherited by base intervention for compensation. Therefore configure the interventions share state + self.eco_account.share_with_list([self.superuser]) + + success_urls = [ + self.index_url, + self.detail_url, + self.report_url, + self.new_url, + self.new_id_url, + self.edit_url, + self.state_new_url, + self.action_new_url, + self.deadline_new_url, + self.state_remove_url, + self.action_remove_url, + self.new_doc_url, + self.log_url, + self.remove_url, + ] + self.assert_url_success(client, success_urls) + + def test_logged_in_default_group_unshared(self): + """ Check correct status code for all requests + + Assumption: User logged in, is default group member and data is NOT shared + --> Default group necessary since all base functionalities depend on this group membership + + Returns: + + """ + client = Client() + client.login(username=self.superuser.username, password=self.superuser_pw) + group = self.groups.get(name=DEFAULT_GROUP) + self.superuser.groups.set([group]) + self.eco_account.share_with_list([]) + + success_urls = [ + self.index_url, + self.detail_url, + self.report_url, + self.new_id_url, + self.new_url, + ] + fail_urls = [ + self.edit_url, + self.state_new_url, + self.action_new_url, + self.deadline_new_url, + self.state_remove_url, + self.action_remove_url, + self.new_doc_url, + self.log_url, + self.remove_url, + ] + self.assert_url_fail(client, fail_urls) + self.assert_url_success(client, success_urls) + diff --git a/compensation/tests/test_workflow.py b/compensation/tests/test_workflow.py new file mode 100644 index 0000000..28bc62d --- /dev/null +++ b/compensation/tests/test_workflow.py @@ -0,0 +1,295 @@ +""" +Author: Michel Peltriaux +Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany +Contact: michel.peltriaux@sgdnord.rlp.de +Created on: 11.11.21 + +""" +import datetime + +from django.contrib.gis.geos import MultiPolygon +from django.urls import reverse + +from compensation.models import Compensation +from konova.settings import ETS_GROUP, ZB_GROUP +from konova.tests.test_views import BaseWorkflowTestCase +from user.models import UserAction + + +class CompensationWorkflowTestCase(BaseWorkflowTestCase): + @classmethod + def setUpTestData(cls): + super().setUpTestData() + + # Give the user shared access to the dummy intervention -> inherits the access to the compensation + cls.intervention.share_with(cls.superuser) + + # Make sure the intervention itself would be fine with valid data + cls.intervention = cls.fill_out_intervention(cls.intervention) + + # Make sure the compensation is linked to the intervention + cls.intervention.compensations.set([cls.compensation]) + + def setUp(self) -> None: + super().setUp() + # Delete all existing compensations, which might be created by tests + Compensation.objects.all().delete() + + # Create a fresh dummy (non-valid) compensation before each test + self.compensation = self.create_dummy_compensation() + + def test_new(self): + """ Test the creation of a compensation + + Returns: + + """ + # Prepare url and form data to be posted + new_url = reverse("compensation:new") + test_id = self.create_dummy_string() + test_title = self.create_dummy_string() + test_geom = self.create_dummy_geometry() + post_data = { + "identifier": test_id, + "title": test_title, + "geom": test_geom.geojson, + "intervention": self.intervention.id, + } + + # Preserve the current number of intervention's compensations + num_compensations = self.intervention.compensations.count() + self.client_user.post(new_url, post_data) + + self.intervention.refresh_from_db() + self.assertEqual(num_compensations + 1, self.intervention.compensations.count()) + new_compensation = self.intervention.compensations.get(identifier=test_id) + self.assertEqual(new_compensation.identifier, test_id) + self.assertEqual(new_compensation.title, test_title) + self.assert_equal_geometries(new_compensation.geometry.geom, test_geom) + + def test_new_from_intervention(self): + """ Test the creation of a compensation from a given intervention + + Returns: + + """ + # Prepare url and form data to be posted + new_url = reverse("compensation:new", args=(self.intervention.id,)) + test_id = self.create_dummy_string() + test_title = self.create_dummy_string() + test_geom = self.create_dummy_geometry() + post_data = { + "identifier": test_id, + "title": test_title, + "geom": test_geom.geojson, + } + + # Preserve the current number of intervention's compensations + num_compensations = self.intervention.compensations.count() + self.client_user.post(new_url, post_data) + + self.intervention.refresh_from_db() + self.assertEqual(num_compensations + 1, self.intervention.compensations.count()) + new_compensation = self.intervention.compensations.get(identifier=test_id) + self.assertEqual(new_compensation.identifier, test_id) + self.assertEqual(new_compensation.title, test_title) + self.assert_equal_geometries(new_compensation.geometry.geom, test_geom) + + def test_edit(self): + """ Checks that the editing of a compensation works + + Returns: + + """ + url = reverse("compensation:edit", args=(self.compensation.id,)) + self.compensation = self.fill_out_compensation(self.compensation) + + new_title = self.create_dummy_string() + new_identifier = self.create_dummy_string() + new_comment = self.create_dummy_string() + new_geometry = MultiPolygon(srid=4326) # Create an empty geometry + + check_on_elements = { + self.compensation.title: new_title, + self.compensation.identifier: new_identifier, + self.compensation.comment: new_comment, + } + for k, v in check_on_elements.items(): + self.assertNotEqual(k, v) + + post_data = { + "identifier": new_identifier, + "title": new_title, + "intervention": self.intervention.id, # just keep the intervention as it is + "comment": new_comment, + "geom": new_geometry.geojson, + } + self.client_user.post(url, post_data) + self.compensation.refresh_from_db() + + check_on_elements = { + self.compensation.title: new_title, + self.compensation.identifier: new_identifier, + self.compensation.comment: new_comment, + } + + for k, v in check_on_elements.items(): + self.assertEqual(k, v) + + self.assert_equal_geometries(self.compensation.geometry.geom, new_geometry) + + def test_checkability(self): + """ + This tests if the checkability of the compensation (which is defined by the linked intervention's checked + attribute) is triggered by the quality of it's data (e.g. not all fields filled) + + We expect a compensation, missing required data, linked to an intervention to fail the intervention's quality + check performed in the checking action. + + Returns: + + """ + # Add proper privilege for the user + self.superuser.groups.add(self.groups.get(name=ZB_GROUP)) + + # Prepare url and form data + url = reverse("intervention:check", args=(self.intervention.id,)) + post_data = { + "checked_intervention": True, + "checked_comps": True, + } + + # Make sure the intervention is not checked + self.assertIsNone(self.intervention.checked) + + # Run the request --> expect fail, since the compensation is not valid, yet + self.client_user.post(url, post_data) + + # Check that the intervention is still not recorded + self.assertIsNone(self.intervention.checked) + + # Now fill out the data for a compensation + self.compensation = self.fill_out_compensation(self.compensation) + + # Rerun the request + self.client_user.post(url, post_data) + + # Expect the linked intervention now to be checked + # Attention: We can only test the date part of the timestamp, + # since the delay in microseconds would lead to fail + self.intervention.refresh_from_db() + checked = self.intervention.checked + self.assertIsNotNone(checked) + self.assertEqual(self.superuser, checked.user) + self.assertEqual(UserAction.CHECKED, checked.action) + self.assertEqual(datetime.date.today(), checked.timestamp.date()) + + # Expect the user action to be in the log + self.assertIn(checked, self.compensation.log.all()) + + def test_recordability(self): + """ + This tests if the recordability of the compensation (which is defined by the linked intervention's recorded + attribute) is triggered by the quality of it's data (e.g. not all fields filled) + + We expect a compensation, missing required data, linked to an intervention to fail the intervention's quality + check performed in the recording action. + + Returns: + + """ + # Add proper privilege for the user + self.superuser.groups.add(self.groups.get(name=ETS_GROUP)) + + # Prepare url and form data + record_url = reverse("intervention:record", args=(self.intervention.id,)) + post_data = { + "confirm": True, + } + + # Make sure the intervention is not recorded + self.assertIsNone(self.intervention.recorded) + + # Run the request --> expect fail, since the compensation is not valid, yet + self.client_user.post(record_url, post_data) + + # Check that the intervention is still not recorded + self.assertIsNone(self.intervention.recorded) + + # Now fill out the data for a compensation + self.compensation = self.fill_out_compensation(self.compensation) + + # Rerun the request + self.client_user.post(record_url, post_data) + + # Expect the linked intervention now to be recorded + # Attention: We can only test the date part of the timestamp, + # since the delay in microseconds would lead to fail + self.intervention.refresh_from_db() + recorded = self.intervention.recorded + self.assertIsNotNone(recorded) + self.assertEqual(self.superuser, recorded.user) + self.assertEqual(UserAction.RECORDED, recorded.action) + self.assertEqual(datetime.date.today(), recorded.timestamp.date()) + + # Expect the user action to be in the log + self.assertIn(recorded, self.compensation.log.all()) + + +class EcoAccountWorkflowTestCase(BaseWorkflowTestCase): + @classmethod + def setUpTestData(cls): + super().setUpTestData() + + # Add user to conservation office group and give shared access to the account + cls.superuser.groups.add(cls.groups.get(name=ETS_GROUP)) + cls.eco_account.share_with_list([cls.superuser]) + + def test_deductability(self): + """ + This tests the deductability of an eco account. + + An eco account should only be deductible if it is recorded. + + Returns: + + """ + # Give user shared access to the dummy intervention, which will be needed here + self.intervention.share_with(self.superuser) + + # Prepare data for deduction creation + deduct_url = reverse("compensation:acc-new-deduction", args=(self.eco_account.id,)) + test_surface = 10.00 + post_data = { + "surface": test_surface, + "account": self.id, + "intervention": self.intervention.id, + } + # Perform request --> expect to fail + self.client_user.post(deduct_url, post_data) + + # Expect that no deduction has been created + self.assertEqual(0, self.eco_account.deductions.count()) + self.assertEqual(0, self.intervention.deductions.count()) + + # Now mock the eco account as it would be recorded (with invalid data) + # Make sure the deductible surface is high enough for the request + self.eco_account.toggle_recorded(self.superuser) + self.eco_account.refresh_from_db() + self.eco_account.deductable_surface = test_surface + 1.00 + self.eco_account.save() + self.assertIsNotNone(self.eco_account.recorded) + self.assertGreater(self.eco_account.deductable_surface, test_surface) + + # Rerun the request + self.client_user.post(deduct_url, post_data) + + # Expect that the deduction has been created + self.assertEqual(1, self.eco_account.deductions.count()) + self.assertEqual(1, self.intervention.deductions.count()) + deduction = self.eco_account.deductions.first() + self.assertEqual(deduction.surface, test_surface) + self.assertEqual(deduction.account, self.eco_account) + self.assertEqual(deduction.intervention, self.intervention) + + diff --git a/compensation/views/compensation_views.py b/compensation/views/compensation_views.py index c722157..02bdde0 100644 --- a/compensation/views/compensation_views.py +++ b/compensation/views/compensation_views.py @@ -90,6 +90,7 @@ def new_view(request: HttpRequest, intervention_id: str = None): @login_required +@default_group_required def new_id_view(request: HttpRequest): """ JSON endpoint @@ -109,6 +110,7 @@ def new_id_view(request: HttpRequest): @login_required @default_group_required +@shared_access_required(Compensation, "id") def edit_view(request: HttpRequest, id: str): """ Renders a view for editing compensations @@ -196,6 +198,8 @@ def detail_view(request: HttpRequest, id: str): @login_required +@default_group_required +@shared_access_required(Compensation, "id") def log_view(request: HttpRequest, id: str): """ Renders a log view using modal @@ -220,6 +224,8 @@ def log_view(request: HttpRequest, id: str): @login_required +@default_group_required +@shared_access_required(Compensation, "id") def remove_view(request: HttpRequest, id: str): """ Renders a modal view for removing the compensation @@ -240,6 +246,8 @@ def remove_view(request: HttpRequest, id: str): @login_required +@default_group_required +@shared_access_required(Compensation, "id") def new_document_view(request: HttpRequest, id: str): """ Renders a form for uploading new documents @@ -258,6 +266,7 @@ def new_document_view(request: HttpRequest, id: str): @login_required +@default_group_required def get_document_view(request: HttpRequest, doc_id: str): """ Returns the document as downloadable file @@ -284,6 +293,7 @@ def get_document_view(request: HttpRequest, doc_id: str): @login_required +@default_group_required def remove_document_view(request: HttpRequest, doc_id: str): """ Removes the document from the database and file system @@ -304,6 +314,8 @@ def remove_document_view(request: HttpRequest, doc_id: str): @login_required +@default_group_required +@shared_access_required(Compensation, "id") def state_new_view(request: HttpRequest, id: str): """ Renders a form for adding new states for a compensation @@ -323,6 +335,8 @@ def state_new_view(request: HttpRequest, id: str): @login_required +@default_group_required +@shared_access_required(Compensation, "id") def action_new_view(request: HttpRequest, id: str): """ Renders a form for adding new actions for a compensation @@ -342,6 +356,8 @@ def action_new_view(request: HttpRequest, id: str): @login_required +@default_group_required +@shared_access_required(Compensation, "id") def deadline_new_view(request: HttpRequest, id: str): """ Renders a form for adding new states for a compensation @@ -361,17 +377,20 @@ def deadline_new_view(request: HttpRequest, id: str): @login_required -def state_remove_view(request: HttpRequest, id: str): +@default_group_required +@shared_access_required(Compensation, "id") +def state_remove_view(request: HttpRequest, id: str, state_id: str): """ Renders a form for removing a compensation state Args: request (HttpRequest): The incoming request - id (str): The state's id + id (str): The compensation's id + state_id (str): The state's id Returns: """ - state = get_object_or_404(CompensationState, id=id) + state = get_object_or_404(CompensationState, id=state_id) form = RemoveModalForm(request.POST or None, instance=state, user=request.user) return form.process_request( request, @@ -380,17 +399,20 @@ def state_remove_view(request: HttpRequest, id: str): @login_required -def action_remove_view(request: HttpRequest, id: str): +@default_group_required +@shared_access_required(Compensation, "id") +def action_remove_view(request: HttpRequest, id: str, action_id: str): """ Renders a form for removing a compensation action Args: request (HttpRequest): The incoming request + id (str): The compensation's id id (str): The action's id Returns: """ - action = get_object_or_404(CompensationAction, id=id) + action = get_object_or_404(CompensationAction, id=action_id) form = RemoveModalForm(request.POST or None, instance=action, user=request.user) return form.process_request( request, @@ -398,7 +420,7 @@ def action_remove_view(request: HttpRequest, id: str): ) -def report_view(request:HttpRequest, id: str): +def report_view(request: HttpRequest, id: str): """ Renders the public report view Args: diff --git a/compensation/views/eco_account_views.py b/compensation/views/eco_account_views.py index 1ac355f..e9934d4 100644 --- a/compensation/views/eco_account_views.py +++ b/compensation/views/eco_account_views.py @@ -16,16 +16,18 @@ from django.shortcuts import render, get_object_or_404, redirect from compensation.forms.forms import NewEcoAccountForm, EditEcoAccountForm from compensation.forms.modalForms import NewStateModalForm, NewActionModalForm, NewDeadlineModalForm -from compensation.models import EcoAccount, EcoAccountDocument +from compensation.models import EcoAccount, EcoAccountDocument, CompensationState, CompensationAction from compensation.tables import EcoAccountTable -from intervention.forms.modalForms import NewDeductionModalForm +from intervention.forms.modalForms import NewDeductionModalForm, ShareInterventionModalForm from konova.contexts import BaseContext -from konova.decorators import any_group_check, default_group_required, conservation_office_group_required +from konova.decorators import any_group_check, default_group_required, conservation_office_group_required, \ + shared_access_required from konova.forms import RemoveModalForm, SimpleGeomForm, NewDocumentForm, RecordModalForm from konova.settings import DEFAULT_GROUP, ZB_GROUP, ETS_GROUP from konova.utils.documents import get_document, remove_document from konova.utils.generators import generate_qr_code -from konova.utils.message_templates import IDENTIFIER_REPLACED, FORM_INVALID, DATA_UNSHARED, DATA_UNSHARED_EXPLANATION +from konova.utils.message_templates import IDENTIFIER_REPLACED, FORM_INVALID, DATA_UNSHARED, DATA_UNSHARED_EXPLANATION, \ + CANCEL_ACC_RECORDED_OR_DEDUCTED from konova.utils.user_checks import in_group @@ -99,6 +101,7 @@ def new_view(request: HttpRequest): @login_required +@default_group_required def new_id_view(request: HttpRequest): """ JSON endpoint @@ -118,6 +121,7 @@ def new_id_view(request: HttpRequest): @login_required @default_group_required +@shared_access_required(EcoAccount, "id") def edit_view(request: HttpRequest, id: str): """ Renders a view for editing compensations @@ -223,6 +227,8 @@ def detail_view(request: HttpRequest, id: str): @login_required +@default_group_required +@shared_access_required(EcoAccount, "id") def remove_view(request: HttpRequest, id: str): """ Renders a modal view for removing the eco account @@ -234,6 +240,15 @@ def remove_view(request: HttpRequest, id: str): """ acc = get_object_or_404(EcoAccount, id=id) + + # If the eco account has already been recorded OR there are already deductions, it can not be deleted by a regular + # default group user + if acc.recorded is not None or acc.deductions.exists(): + user = request.user + if not in_group(user, ETS_GROUP): + messages.info(request, CANCEL_ACC_RECORDED_OR_DEDUCTED) + return redirect("compensation:acc-detail", id=id) + form = RemoveModalForm(request.POST or None, instance=acc, user=request.user) return form.process_request( request=request, @@ -244,6 +259,7 @@ def remove_view(request: HttpRequest, id: str): @login_required @default_group_required +@shared_access_required(EcoAccount, "id") def deduction_remove_view(request: HttpRequest, id: str, deduction_id: str): """ Renders a modal view for removing deductions @@ -270,6 +286,7 @@ def deduction_remove_view(request: HttpRequest, id: str, deduction_id: str): @login_required @default_group_required +@shared_access_required(EcoAccount, "id") def log_view(request: HttpRequest, id: str): """ Renders a log view using modal @@ -295,6 +312,7 @@ def log_view(request: HttpRequest, id: str): @login_required @conservation_office_group_required +@shared_access_required(EcoAccount, "id") def record_view(request: HttpRequest, id:str): """ Renders a modal form for recording an eco account @@ -316,6 +334,8 @@ def record_view(request: HttpRequest, id:str): @login_required +@default_group_required +@shared_access_required(EcoAccount, "id") def state_new_view(request: HttpRequest, id: str): """ Renders a form for adding new states for an eco account @@ -335,6 +355,8 @@ def state_new_view(request: HttpRequest, id: str): @login_required +@default_group_required +@shared_access_required(EcoAccount, "id") def action_new_view(request: HttpRequest, id: str): """ Renders a form for adding new actions for an eco account @@ -354,6 +376,52 @@ def action_new_view(request: HttpRequest, id: str): @login_required +@default_group_required +@shared_access_required(EcoAccount, "id") +def state_remove_view(request: HttpRequest, id: str, state_id: str): + """ Renders a form for removing a compensation state + + Args: + request (HttpRequest): The incoming request + id (str): The compensation's id + state_id (str): The state's id + + Returns: + + """ + state = get_object_or_404(CompensationState, id=state_id) + form = RemoveModalForm(request.POST or None, instance=state, user=request.user) + return form.process_request( + request, + msg_success=_("State removed") + ) + + +@login_required +@default_group_required +@shared_access_required(EcoAccount, "id") +def action_remove_view(request: HttpRequest, id: str, action_id: str): + """ Renders a form for removing a compensation action + + Args: + request (HttpRequest): The incoming request + id (str): The compensation's id + id (str): The action's id + + Returns: + + """ + action = get_object_or_404(CompensationAction, id=action_id) + form = RemoveModalForm(request.POST or None, instance=action, user=request.user) + return form.process_request( + request, + msg_success=_("Action removed") + ) + + +@login_required +@default_group_required +@shared_access_required(EcoAccount, "id") def deadline_new_view(request: HttpRequest, id: str): """ Renders a form for adding new states for an eco account @@ -373,6 +441,8 @@ def deadline_new_view(request: HttpRequest, id: str): @login_required +@default_group_required +@shared_access_required(EcoAccount, "id") def new_document_view(request: HttpRequest, id: str): """ Renders a form for uploading new documents @@ -391,6 +461,7 @@ def new_document_view(request: HttpRequest, id: str): @login_required +@default_group_required def get_document_view(request: HttpRequest, doc_id: str): """ Returns the document as downloadable file @@ -417,6 +488,8 @@ def get_document_view(request: HttpRequest, doc_id: str): @login_required +@default_group_required +@shared_access_required(EcoAccount, "id") def remove_document_view(request: HttpRequest, doc_id: str): """ Removes the document from the database and file system @@ -438,6 +511,7 @@ def remove_document_view(request: HttpRequest, doc_id: str): @login_required @default_group_required +@shared_access_required(EcoAccount, "id") def new_deduction_view(request: HttpRequest, id: str): """ Renders a modal form view for creating deductions @@ -511,3 +585,64 @@ def report_view(request:HttpRequest, id: str): } context = BaseContext(request, context).context return render(request, template, context) + + +@login_required +def share_view(request: HttpRequest, id: str, token: str): + """ Performs sharing of an eco account + + If token given in url is not valid, the user will be redirected to the dashboard + + Args: + request (HttpRequest): The incoming request + id (str): EcoAccount's id + token (str): Access token for EcoAccount + + Returns: + + """ + user = request.user + obj = get_object_or_404(EcoAccount, id=id) + # Check tokens + if obj.access_token == token: + # Send different messages in case user has already been added to list of sharing users + if obj.is_shared_with(user): + messages.info( + request, + _("{} has already been shared with you").format(obj.identifier) + ) + else: + messages.success( + request, + _("{} has been shared with you").format(obj.identifier) + ) + obj.share_with(user) + return redirect("compensation:acc-detail", id=id) + else: + messages.error( + request, + _("Share link invalid"), + extra_tags="danger", + ) + return redirect("home") + + +@login_required +@default_group_required +@shared_access_required(EcoAccount, "id") +def create_share_view(request: HttpRequest, id: str): + """ Renders sharing form for an eco account + + Args: + request (HttpRequest): The incoming request + id (str): EcoAccount's id + + Returns: + + """ + obj = get_object_or_404(EcoAccount, id=id) + form = ShareInterventionModalForm(request.POST or None, instance=obj, request=request) + return form.process_request( + request, + msg_success=_("Share settings updated") + ) \ No newline at end of file diff --git a/ema/forms.py b/ema/forms.py index 7fd42a5..44d186c 100644 --- a/ema/forms.py +++ b/ema/forms.py @@ -83,7 +83,7 @@ class NewEmaForm(AbstractCompensationForm, CompensationResponsibleFormMixin): acc.fundings.set(fundings) # Add the creating user to the list of shared users - acc.users.add(user) + acc.share_with(user) # Add the log entry to the main objects log list acc.log.add(action) diff --git a/ema/models.py b/ema/models.py index 1fa8501..d37eca5 100644 --- a/ema/models.py +++ b/ema/models.py @@ -7,12 +7,11 @@ from django.db.models import QuerySet from compensation.models import AbstractCompensation from ema.managers import EmaManager from ema.utils.quality import EmaQualityChecker -from konova.models import AbstractDocument, generate_document_file_upload_path, RecordableMixin +from konova.models import AbstractDocument, generate_document_file_upload_path, RecordableObject, ShareableObject from konova.settings import DEFAULT_SRID_RLP, LANIS_LINK_TEMPLATE -from user.models import UserActionLogEntry -class Ema(AbstractCompensation, RecordableMixin): +class Ema(AbstractCompensation, ShareableObject, RecordableObject): """ EMA = Ersatzzahlungsmaßnahme (compensation actions from payments) @@ -28,23 +27,6 @@ class Ema(AbstractCompensation, RecordableMixin): EMA therefore holds data like a compensation: actions, before-/after-states, deadlines, ... """ - # Users having access on this object - # Not needed in regular Compensation since their access is defined by the linked intervention's access - users = models.ManyToManyField( - User, - help_text="Users having access (shared with)" - ) - - # Refers to "verzeichnen" - recorded = models.OneToOneField( - UserActionLogEntry, - on_delete=models.SET_NULL, - null=True, - blank=True, - help_text="Holds data on user and timestamp of this action", - related_name="+" - ) - objects = EmaManager() def __str__(self): diff --git a/ema/templates/ema/detail/includes/actions.html b/ema/templates/ema/detail/includes/actions.html index 8fab123..5635970 100644 --- a/ema/templates/ema/detail/includes/actions.html +++ b/ema/templates/ema/detail/includes/actions.html @@ -48,7 +48,7 @@ {{ action.comment|default_if_none:"" }} {% if is_default_member and has_access %} - {% endif %} diff --git a/ema/templates/ema/detail/includes/controls.html b/ema/templates/ema/detail/includes/controls.html index 1d5e546..6a4f706 100644 --- a/ema/templates/ema/detail/includes/controls.html +++ b/ema/templates/ema/detail/includes/controls.html @@ -12,6 +12,9 @@ {% if has_access %} + {% if is_ets_member %} {% if obj.recorded %} {% endif %} diff --git a/ema/templates/ema/detail/includes/states-before.html b/ema/templates/ema/detail/includes/states-before.html index 7422065..42c0eb1 100644 --- a/ema/templates/ema/detail/includes/states-before.html +++ b/ema/templates/ema/detail/includes/states-before.html @@ -49,7 +49,7 @@ {{ state.surface|floatformat:2 }} m² {% if is_default_member and has_access %} - {% endif %} diff --git a/ema/tests.py b/ema/tests.py deleted file mode 100644 index 7ce503c..0000000 --- a/ema/tests.py +++ /dev/null @@ -1,3 +0,0 @@ -from django.test import TestCase - -# Create your tests here. diff --git a/ema/tests/__init__.py b/ema/tests/__init__.py new file mode 100644 index 0000000..10799e8 --- /dev/null +++ b/ema/tests/__init__.py @@ -0,0 +1,7 @@ +""" +Author: Michel Peltriaux +Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany +Contact: michel.peltriaux@sgdnord.rlp.de +Created on: 26.10.21 + +""" diff --git a/ema/tests/test_views.py b/ema/tests/test_views.py new file mode 100644 index 0000000..07d761c --- /dev/null +++ b/ema/tests/test_views.py @@ -0,0 +1,242 @@ +""" +Author: Michel Peltriaux +Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany +Contact: michel.peltriaux@sgdnord.rlp.de +Created on: 26.10.21 + +""" +from django.db.models import Q +from django.urls import reverse +from django.test.client import Client + +from compensation.tests.test_views import CompensationViewTestCase +from ema.models import Ema +from intervention.models import ResponsibilityData +from konova.models import Geometry +from konova.settings import DEFAULT_GROUP, ETS_GROUP +from user.models import UserActionLogEntry, UserAction + + +class EmaViewTestCase(CompensationViewTestCase): + """ Test cases for EMA. + + Since we inherit most tests functions from CompensationViewTestCase, we only need to add some EMA specific + test functions + + """ + ema = None + + @classmethod + def setUpTestData(cls) -> None: + super().setUpTestData() + + # Create dummy data and related objects, like states or actions + cls.create_dummy_data() + state = cls.create_dummy_states() + action = cls.create_dummy_action() + cls.ema.before_states.set([state]) + cls.ema.after_states.set([state]) + cls.ema.actions.set([action]) + + # Prepare urls + cls.index_url = reverse("ema:index", args=()) + cls.new_url = reverse("ema:new", args=()) + cls.new_id_url = reverse("ema:new-id", args=()) + cls.detail_url = reverse("ema:detail", args=(cls.ema.id,)) + cls.log_url = reverse("ema:log", args=(cls.ema.id,)) + cls.edit_url = reverse("ema:edit", args=(cls.ema.id,)) + cls.remove_url = reverse("ema:remove", args=(cls.ema.id,)) + cls.share_url = reverse("ema:share", args=(cls.ema.id, cls.ema.access_token,)) + cls.share_create_url = reverse("ema:share-create", args=(cls.ema.id,)) + cls.record_url = reverse("ema:record", args=(cls.ema.id,)) + cls.report_url = reverse("ema:report", args=(cls.ema.id,)) + cls.new_doc_url = reverse("ema:new-doc", args=(cls.ema.id,)) + cls.state_new_url = reverse("ema:new-state", args=(cls.ema.id,)) + cls.action_new_url = reverse("ema:new-action", args=(cls.ema.id,)) + cls.deadline_new_url = reverse("ema:new-deadline", args=(cls.ema.id,)) + cls.state_remove_url = reverse("ema:state-remove", args=(cls.ema.id, state.id,)) + cls.action_remove_url = reverse("ema:action-remove", args=(cls.ema.id, action.id,)) + + @classmethod + def create_dummy_data(cls): + # Create dummy data + # Create log entry + action = UserActionLogEntry.objects.create( + user=cls.superuser, + action=UserAction.CREATED, + ) + # Create responsible data object + responsibility_data = ResponsibilityData.objects.create() + geometry = Geometry.objects.create() + cls.ema = Ema.objects.create( + identifier="TEST", + title="Test_title", + created=action, + geometry=geometry, + responsible=responsibility_data, + comment="Test", + ) + + def test_logged_in_default_group_shared(self): + """ Check correct status code for all requests + + OVERWRITES DEFAULT COMPENSATION TEST METHOD DUE TO SPECIFIC BEHAVIOUR OF EMAS + + Assumption: User logged in, is default group member and data is shared + + Normally default group would give access to all base functionalities. In case of EMAs we expect these + requests to fail, since a user must be part of the ets group as well, not only default. + + Returns: + + """ + client = Client() + client.login(username=self.superuser.username, password=self.superuser_pw) + group = self.groups.get(name=DEFAULT_GROUP) + self.superuser.groups.set([group]) + + # Sharing does not have any effect in here, since the default group will prohibit further functionality access + # to this user + self.ema.share_with_list([self.superuser]) + + success_urls = [ + self.index_url, + self.detail_url, + self.report_url, + ] + fail_urls = [ + self.new_url, + self.new_id_url, + self.edit_url, + self.state_new_url, + self.action_new_url, + self.deadline_new_url, + self.state_remove_url, + self.action_remove_url, + self.new_doc_url, + self.log_url, + self.remove_url, + ] + self.assert_url_fail(client, fail_urls) + self.assert_url_success(client, success_urls) + + def test_logged_in_default_group_unshared(self): + """ Check correct status code for all requests + + OVERWRITES DEFAULT COMPENSATION TEST METHOD DUE TO SPECIFIC BEHAVIOUR OF EMAS + + Assumption: User logged in, is default group member and data is NOT shared + + Normally default group would give access to all base functionalities. In case of EMAs we expect these + requests to fail, since a user must be part of the ets group as well, not only default. + + We check on the same tests as in the _shared test, since we want to make sure there is no difference in + between shared and unshared, if the user is only part of the default group. + + Returns: + + """ + client = Client() + client.login(username=self.superuser.username, password=self.superuser_pw) + group = self.groups.get(name=DEFAULT_GROUP) + self.superuser.groups.set([group]) + + # Sharing does not have any effect in here, since the default group will prohibit further functionality access + # to this user + self.ema.share_with_list([]) + + success_urls = [ + self.index_url, + self.detail_url, + self.report_url, + ] + fail_urls = [ + self.new_url, + self.new_id_url, + self.edit_url, + self.state_new_url, + self.action_new_url, + self.deadline_new_url, + self.state_remove_url, + self.action_remove_url, + self.new_doc_url, + self.log_url, + self.remove_url, + ] + self.assert_url_fail(client, fail_urls) + self.assert_url_success(client, success_urls) + + def test_logged_in_ets_group_shared(self): + """ Check correct status code for all requests + + Assumption: User logged in, is conservation office group member and data is shared + + For EMAs we expect a user to be ETS and default group member to have full access to all functionalities, normally + provided for default group members. + + Returns: + + """ + client = Client() + client.login(username=self.superuser.username, password=self.superuser_pw) + groups = self.groups.filter(Q(name=ETS_GROUP)|Q(name=DEFAULT_GROUP)) + self.superuser.groups.set(groups) + # Sharing is inherited by base intervention for compensation. Therefore configure the interventions share state + self.ema.share_with_list([self.superuser]) + + success_urls = [ + self.index_url, + self.detail_url, + self.report_url, + self.new_url, + self.new_id_url, + self.edit_url, + self.state_new_url, + self.action_new_url, + self.deadline_new_url, + self.state_remove_url, + self.action_remove_url, + self.new_doc_url, + self.log_url, + self.remove_url, + ] + self.assert_url_success(client, success_urls) + + def test_logged_in_ets_group_unshared(self): + """ Check correct status code for all requests + + Assumption: User logged in, is conservation office group member and data is NOT shared + + For EMAs we expect a user to be ETS and default group member to have full access to all functionalities, normally + provided for default group members. + + Returns: + + """ + client = Client() + client.login(username=self.superuser.username, password=self.superuser_pw) + groups = self.groups.filter(Q(name=ETS_GROUP)|Q(name=DEFAULT_GROUP)) + self.superuser.groups.set(groups) + # Sharing is inherited by base intervention for compensation. Therefore configure the interventions share state + self.ema.share_with_list([]) + + success_urls = [ + self.index_url, + self.detail_url, + self.report_url, + self.new_url, + self.new_id_url, + ] + fail_urls = [ + self.edit_url, + self.state_new_url, + self.action_new_url, + self.deadline_new_url, + self.state_remove_url, + self.action_remove_url, + self.new_doc_url, + self.log_url, + self.remove_url, + ] + self.assert_url_success(client, success_urls) + self.assert_url_fail(client, fail_urls) diff --git a/ema/urls.py b/ema/urls.py index ad926d0..a732f32 100644 --- a/ema/urls.py +++ b/ema/urls.py @@ -21,7 +21,11 @@ urlpatterns = [ path('/report', report_view, name='report'), path('/state/new', state_new_view, name='new-state'), path('/action/new', action_new_view, name='new-action'), + path('/state//remove', state_remove_view, name='state-remove'), + path('/action//remove', action_remove_view, name='action-remove'), path('/deadline/new', deadline_new_view, name="new-deadline"), + path('/share/', share_view, name='share'), + path('/share', create_share_view, name='share-create'), # Documents # Document remove route can be found in konova/urls.py @@ -29,9 +33,4 @@ urlpatterns = [ path('document/', get_document_view, name='get-doc'), path('document//remove/', remove_document_view, name='remove-doc'), - # Generic state routes - path('state//remove', state_remove_view, name='state-remove'), - - # Generic action routes - path('action//remove', action_remove_view, name='action-remove'), ] \ No newline at end of file diff --git a/ema/views.py b/ema/views.py index 1e72e09..496c55b 100644 --- a/ema/views.py +++ b/ema/views.py @@ -6,12 +6,13 @@ from django.shortcuts import render, get_object_or_404, redirect from django.urls import reverse from django.utils.translation import gettext_lazy as _ -import compensation from compensation.forms.modalForms import NewStateModalForm, NewActionModalForm, NewDeadlineModalForm +from compensation.models import CompensationAction, CompensationState from ema.forms import NewEmaForm, EditEmaForm from ema.tables import EmaTable +from intervention.forms.modalForms import ShareInterventionModalForm from konova.contexts import BaseContext -from konova.decorators import conservation_office_group_required +from konova.decorators import conservation_office_group_required, shared_access_required from ema.models import Ema, EmaDocument from konova.forms import RemoveModalForm, NewDocumentForm, SimpleGeomForm, RecordModalForm from konova.settings import DEFAULT_GROUP, ZB_GROUP, ETS_GROUP @@ -91,6 +92,7 @@ def new_view(request: HttpRequest): @login_required +@conservation_office_group_required def new_id_view(request: HttpRequest): """ JSON endpoint @@ -158,6 +160,8 @@ def detail_view(request: HttpRequest, id: str): @login_required +@conservation_office_group_required +@shared_access_required(Ema, "id") def log_view(request: HttpRequest, id: str): """ Renders a log view using modal @@ -182,6 +186,8 @@ def log_view(request: HttpRequest, id: str): @login_required +@conservation_office_group_required +@shared_access_required(Ema, "id") def edit_view(request: HttpRequest, id: str): """ Renders a view for editing compensations @@ -218,6 +224,8 @@ def edit_view(request: HttpRequest, id: str): @login_required +@conservation_office_group_required +@shared_access_required(Ema, "id") def remove_view(request: HttpRequest, id: str): """ Renders a modal view for removing the EMA @@ -238,6 +246,8 @@ def remove_view(request: HttpRequest, id: str): @login_required +@conservation_office_group_required +@shared_access_required(Ema, "id") def record_view(request: HttpRequest, id: str): """ Renders a modal view for recording the EMA @@ -258,6 +268,8 @@ def record_view(request: HttpRequest, id: str): @login_required +@conservation_office_group_required +@shared_access_required(Ema, "id") def state_new_view(request: HttpRequest, id: str): """ Renders a form for adding new states for an EMA @@ -277,6 +289,8 @@ def state_new_view(request: HttpRequest, id: str): @login_required +@conservation_office_group_required +@shared_access_required(Ema, "id") def action_new_view(request: HttpRequest, id: str): """ Renders a form for adding new actions for an EMA @@ -296,6 +310,8 @@ def action_new_view(request: HttpRequest, id: str): @login_required +@conservation_office_group_required +@shared_access_required(Ema, "id") def deadline_new_view(request: HttpRequest, id: str): """ Renders a form for adding new states for an EMA @@ -315,6 +331,8 @@ def deadline_new_view(request: HttpRequest, id: str): @login_required +@conservation_office_group_required +@shared_access_required(Ema, "id") def document_new_view(request: HttpRequest, id: str): """ Renders a form for uploading new documents @@ -333,6 +351,7 @@ def document_new_view(request: HttpRequest, id: str): @login_required +@conservation_office_group_required def get_document_view(request: HttpRequest, doc_id: str): """ Returns the document as downloadable file @@ -359,6 +378,7 @@ def get_document_view(request: HttpRequest, doc_id: str): @login_required +@conservation_office_group_required def remove_document_view(request: HttpRequest, doc_id: str): """ Removes the document from the database and file system @@ -379,37 +399,46 @@ def remove_document_view(request: HttpRequest, doc_id: str): @login_required -def state_remove_view(request: HttpRequest, id: str): +@conservation_office_group_required +@shared_access_required(Ema, "id") +def state_remove_view(request: HttpRequest, id: str, state_id: str): """ Renders a form for removing an EMA state Args: request (HttpRequest): The incoming request - id (str): The state's id + id (str): The ema id + state_id (str): The state's id Returns: """ - return compensation.views.compensation_views.state_remove_view( + state = get_object_or_404(CompensationState, id=state_id) + form = RemoveModalForm(request.POST or None, instance=state, user=request.user) + return form.process_request( request, - id + msg_success=_("State removed") ) @login_required -def action_remove_view(request: HttpRequest, id: str): - """ Renders a form for removing an EMA state +@conservation_office_group_required +@shared_access_required(Ema, "id") +def action_remove_view(request: HttpRequest, id: str, action_id: str): + """ Renders a form for removing an EMA action Args: request (HttpRequest): The incoming request - id (str): The state's id + id (str): The ema id + id (str): The action's id Returns: """ - # Reuses the route logic from compensation view - return compensation.views.compensation_views.action_remove_view( + action = get_object_or_404(CompensationAction, id=action_id) + form = RemoveModalForm(request.POST or None, instance=action, user=request.user) + return form.process_request( request, - id + msg_success=_("Action removed") ) @@ -460,4 +489,65 @@ def report_view(request:HttpRequest, id: str): "actions": actions, } context = BaseContext(request, context).context - return render(request, template, context) \ No newline at end of file + return render(request, template, context) + + +@login_required +def share_view(request: HttpRequest, id: str, token: str): + """ Performs sharing of an ema + + If token given in url is not valid, the user will be redirected to the dashboard + + Args: + request (HttpRequest): The incoming request + id (str): EMA's id + token (str): Access token for EMA + + Returns: + + """ + user = request.user + obj = get_object_or_404(Ema, id=id) + # Check tokens + if obj.access_token == token: + # Send different messages in case user has already been added to list of sharing users + if obj.is_shared_with(user): + messages.info( + request, + _("{} has already been shared with you").format(obj.identifier) + ) + else: + messages.success( + request, + _("{} has been shared with you").format(obj.identifier) + ) + obj.share_with(user) + return redirect("ema:detail", id=id) + else: + messages.error( + request, + _("Share link invalid"), + extra_tags="danger", + ) + return redirect("home") + + +@login_required +@conservation_office_group_required +@shared_access_required(Ema, "id") +def create_share_view(request: HttpRequest, id: str): + """ Renders sharing form for an Ema + + Args: + request (HttpRequest): The incoming request + id (str): Ema's id + + Returns: + + """ + obj = get_object_or_404(Ema, id=id) + form = ShareInterventionModalForm(request.POST or None, instance=obj, request=request) + return form.process_request( + request, + msg_success=_("Share settings updated") + ) \ No newline at end of file diff --git a/intervention/forms/forms.py b/intervention/forms/forms.py index 67a58a4..f7453cc 100644 --- a/intervention/forms/forms.py +++ b/intervention/forms/forms.py @@ -255,7 +255,7 @@ class NewInterventionForm(BaseForm): intervention.log.add(action) # Add the performing user as the first user having access to the data - intervention.users.add(user) + intervention.share_with(user) return intervention @@ -356,7 +356,7 @@ class EditInterventionForm(NewInterventionForm): self.instance.save() # Uncheck and unrecord intervention due to changed data - self.instance.set_unchecked(user) + self.instance.set_unchecked() self.instance.set_unrecorded(user) return self.instance diff --git a/intervention/forms/modalForms.py b/intervention/forms/modalForms.py index 6942596..fe24efa 100644 --- a/intervention/forms/modalForms.py +++ b/intervention/forms/modalForms.py @@ -68,8 +68,9 @@ class ShareInterventionModalForm(BaseModalForm): """ # Initialize share_link field + url_name = f"{self.instance._meta.app_label}:share" self.share_link = self.request.build_absolute_uri( - reverse("intervention:share", args=(self.instance.id, self.instance.access_token,)) + reverse(url_name, args=(self.instance.id, self.instance.access_token,)) ) self.initialize_form_field( "url", @@ -98,7 +99,7 @@ class ShareInterventionModalForm(BaseModalForm): accessing_users = User.objects.filter( id__in=self.cleaned_data["users"] ) - self.instance.users.set(accessing_users) + self.instance.share_with_list(accessing_users) class NewRevocationModalForm(BaseModalForm): @@ -181,7 +182,10 @@ class NewRevocationModalForm(BaseModalForm): return revocation -class RunCheckModalForm(BaseModalForm): +class CheckModalForm(BaseModalForm): + """ The modal form for running a check on interventions and their compensations + + """ checked_intervention = forms.BooleanField( label=_("Checked intervention data"), label_suffix="", @@ -231,16 +235,7 @@ class RunCheckModalForm(BaseModalForm): """ with transaction.atomic(): - user_action = UserActionLogEntry.objects.create( - user=self.user, - action=UserAction.CHECKED - ) - # Replace old checked - if self.instance.checked: - self.instance.checked.delete() - self.instance.checked = user_action - self.instance.log.add(user_action) - self.instance.save() + self.instance.toggle_checked(self.user) # Send message to the SSO server messenger = Messenger( @@ -344,9 +339,9 @@ class NewDeductionModalForm(BaseModalForm): return False # Calculate valid surface - sum_surface = acc.get_surface_after_states() + deductable_surface = acc.deductable_surface sum_surface_deductions = acc.get_deductions_surface() - rest_surface = sum_surface - sum_surface_deductions + rest_surface = deductable_surface - sum_surface_deductions form_surface = float(self.cleaned_data["surface"]) is_valid_surface = form_surface < rest_surface if not is_valid_surface: diff --git a/intervention/models.py b/intervention/models.py index 46cc095..47d3a58 100644 --- a/intervention/models.py +++ b/intervention/models.py @@ -17,9 +17,8 @@ from codelist.settings import CODELIST_REGISTRATION_OFFICE_ID, CODELIST_CONSERVA from intervention.managers import InterventionManager from intervention.utils.quality import InterventionQualityChecker from konova.models import BaseObject, Geometry, UuidModel, BaseResource, AbstractDocument, \ - generate_document_file_upload_path, RecordableMixin, CheckableMixin + generate_document_file_upload_path, RecordableObject, CheckableObject, ShareableObject from konova.settings import DEFAULT_SRID_RLP, LANIS_LINK_TEMPLATE, LANIS_ZOOM_LUT -from konova.utils import generators from user.models import UserActionLogEntry @@ -171,7 +170,7 @@ class LegalData(UuidModel): revocation = models.OneToOneField(Revocation, null=True, blank=True, help_text="Refers to 'Widerspruch am'", on_delete=models.SET_NULL) -class Intervention(BaseObject, RecordableMixin, CheckableMixin): +class Intervention(BaseObject, ShareableObject, RecordableObject, CheckableObject): """ Interventions are e.g. construction sites where nature used to be. """ @@ -191,74 +190,11 @@ class Intervention(BaseObject, RecordableMixin, CheckableMixin): ) geometry = models.ForeignKey(Geometry, null=True, blank=True, on_delete=models.SET_NULL) - # Checks - Refers to "Genehmigen" but optional - checked = models.OneToOneField( - UserActionLogEntry, - on_delete=models.SET_NULL, - null=True, - blank=True, - help_text="Holds data on user and timestamp of this action", - related_name="+" - ) - - # Refers to "verzeichnen" - recorded = models.OneToOneField( - UserActionLogEntry, - on_delete=models.SET_NULL, - null=True, - blank=True, - help_text="Holds data on user and timestamp of this action", - related_name="+" - ) - - # Users having access on this object - users = models.ManyToManyField(User, help_text="Users having access (data shared with)") - access_token = models.CharField( - max_length=255, - null=True, - blank=True, - help_text="Used for sharing access", - ) - objects = InterventionManager() def __str__(self): return "{} ({})".format(self.identifier, self.title) - def generate_access_token(self, make_unique: bool = False, rec_depth: int = 5): - """ Creates a new access token for the intervention - - Tokens are not used for identification of a table row. The share logic checks the intervention id as well - as the given token. Therefore two different interventions can hold the same access_token without problems. - For (possible) future changes to the share logic, the make_unique parameter may be used for checking whether - the access_token is already used in any intervention. If so, tokens will be generated as long as a free token - can be found. - - Args: - make_unique (bool): Perform check on uniqueness over all intervention entries - rec_depth (int): How many tries for generating a free random token (only if make_unique) - - Returns: - - """ - # Make sure we won't end up in an infinite loop of trying to generate access_tokens - rec_depth = rec_depth - 1 - if rec_depth < 0 and make_unique: - raise RuntimeError( - "Access token generating for {} does not seem to find a free random token! Aborted!".format(self.id) - ) - - # Create random token - token = generators.generate_random_string(15) - token_used_in = Intervention.objects.filter(access_token=token) - # Make sure the token is not used anywhere as access_token, yet. - # Make use of QuerySet lazy method for checking if it exists or not. - if token_used_in and make_unique: - self.generate_access_token(make_unique, rec_depth) - else: - self.access_token = token - self.save() - def save(self, *args, **kwargs): """ Custom save functionality @@ -348,6 +284,44 @@ class Intervention(BaseObject, RecordableMixin, CheckableMixin): ) return revoc_docs, regular_docs + def toggle_recorded(self, user: User): + """ Toggle the recorded state + + For interventions the recorded action needs to be added to their compensation objects as well + + Args: + user (User): The performing user + + Returns: + + """ + log_entry = super().toggle_recorded(user) + + # Add this action to the linked compensation logs as well + comps = self.compensations.all() + for comp in comps: + comp.log.add(log_entry) + + def toggle_checked(self, user: User) -> UserActionLogEntry: + """ Toggle the checked state + + For interventions the checked action needs to be added to their compensation objects as well + + Args: + user (User): The performing user + + Returns: + + """ + log_entry = super().toggle_checked(user) + # Leave if the log_entry is None (means "unchecked") + if log_entry is None: + return + # Add this action to the linked compensation logs as well + comps = self.compensations.all() + for comp in comps: + comp.log.add(log_entry) + class InterventionDocument(AbstractDocument): """ diff --git a/intervention/templates/intervention/detail/includes/controls.html b/intervention/templates/intervention/detail/includes/controls.html index bb94042..f41c8b8 100644 --- a/intervention/templates/intervention/detail/includes/controls.html +++ b/intervention/templates/intervention/detail/includes/controls.html @@ -16,7 +16,7 @@ {% fa5_icon 'share-alt' %} {% if is_zb_member %} - {% endif %} diff --git a/intervention/templates/intervention/detail/includes/deductions.html b/intervention/templates/intervention/detail/includes/deductions.html index cb2f817..6bb602d 100644 --- a/intervention/templates/intervention/detail/includes/deductions.html +++ b/intervention/templates/intervention/detail/includes/deductions.html @@ -55,7 +55,7 @@ {{ deduction.created.timestamp|default_if_none:""|naturalday}} {% if is_default_member and has_access %} - {% endif %} diff --git a/intervention/tests.py b/intervention/tests.py deleted file mode 100644 index 7ce503c..0000000 --- a/intervention/tests.py +++ /dev/null @@ -1,3 +0,0 @@ -from django.test import TestCase - -# Create your tests here. diff --git a/intervention/tests/__init__.py b/intervention/tests/__init__.py new file mode 100644 index 0000000..10799e8 --- /dev/null +++ b/intervention/tests/__init__.py @@ -0,0 +1,7 @@ +""" +Author: Michel Peltriaux +Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany +Contact: michel.peltriaux@sgdnord.rlp.de +Created on: 26.10.21 + +""" diff --git a/intervention/tests/test_views.py b/intervention/tests/test_views.py new file mode 100644 index 0000000..23da913 --- /dev/null +++ b/intervention/tests/test_views.py @@ -0,0 +1,343 @@ +""" +Author: Michel Peltriaux +Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany +Contact: michel.peltriaux@sgdnord.rlp.de +Created on: 26.10.21 + +""" +from django.test import Client + +from django.contrib.auth.models import Group +from django.urls import reverse + +from konova.settings import DEFAULT_GROUP, ZB_GROUP, ETS_GROUP +from konova.tests.test_views import BaseViewTestCase + + +class InterventionViewTestCase(BaseViewTestCase): + + @classmethod + def setUpTestData(cls) -> None: + super().setUpTestData() + + # Prepare urls + cls.index_url = reverse("intervention:index", args=()) + cls.new_url = reverse("intervention:new", args=()) + cls.new_id_url = reverse("intervention:new-id", args=()) + cls.detail_url = reverse("intervention:detail", args=(cls.intervention.id,)) + cls.log_url = reverse("intervention:log", args=(cls.intervention.id,)) + cls.edit_url = reverse("intervention:edit", args=(cls.intervention.id,)) + cls.remove_url = reverse("intervention:remove", args=(cls.intervention.id,)) + cls.share_url = reverse("intervention:share", args=(cls.intervention.id, cls.intervention.access_token,)) + cls.share_create_url = reverse("intervention:share-create", args=(cls.intervention.id,)) + cls.run_check_url = reverse("intervention:check", args=(cls.intervention.id,)) + cls.record_url = reverse("intervention:record", args=(cls.intervention.id,)) + cls.report_url = reverse("intervention:report", args=(cls.intervention.id,)) + + def test_views_anonymous_user(self): + """ Check correct status code for all requests + + Assumption: User not logged in + + Returns: + + """ + # Unknown client + client = Client() + + success_urls = [ + self.report_url, + ] + login_redirect_base = f"{self.login_url}?next=" + fail_urls = { + self.detail_url: f"{login_redirect_base}{self.detail_url}", + self.index_url: f"{login_redirect_base}{self.index_url}", + self.log_url: f"{login_redirect_base}{self.log_url}", + self.new_id_url: f"{login_redirect_base}{self.new_id_url}", + self.new_url: f"{login_redirect_base}{self.new_url}", + self.edit_url: f"{login_redirect_base}{self.edit_url}", + self.remove_url: f"{login_redirect_base}{self.remove_url}", + self.share_url: f"{login_redirect_base}{self.share_url}", + self.share_create_url: f"{login_redirect_base}{self.share_create_url}", + self.run_check_url: f"{login_redirect_base}{self.run_check_url}", + self.record_url: f"{login_redirect_base}{self.record_url}", + } + + self.assert_url_success(client, success_urls) + + for url in fail_urls: + response = client.get(url, follow=True) + self.assertEqual(response.redirect_chain[0], (f"{self.login_url}?next={url}", 302), msg=f"Failed for {url}. Redirect chain is {response.redirect_chain}") + + def test_views_logged_in_no_groups(self): + """ Check correct status code for all requests + + Assumption: User logged in but has no groups + + Returns: + + """ + # Login client + client = Client() + client.login(username=self.superuser.username, password=self.superuser_pw) + self.superuser.groups.set([]) + success_urls = [ + self.index_url, + self.report_url, + self.detail_url, + ] + fail_urls = [ + self.log_url, + self.new_id_url, + self.new_url, + self.edit_url, + self.remove_url, + self.share_url, + self.share_create_url, + self.run_check_url, + self.record_url, + ] + + self.assert_url_success(client, success_urls) + self.assert_url_fail(client, fail_urls) + + def test_views_logged_in_default_group_shared(self): + """ Check correct status code for all requests + + Assumption: User logged in and is default group member + + Returns: + + """ + # Login client + client = Client() + client.login(username=self.superuser.username, password=self.superuser_pw) + + # Add user to default group + default_group = Group.objects.get(name=DEFAULT_GROUP) + self.superuser.groups.set([default_group]) + self.intervention.share_with_list([self.superuser]) + + success_urls = [ + self.index_url, + self.report_url, + self.detail_url, + self.log_url, + self.new_id_url, + self.new_url, + self.edit_url, + self.remove_url, + self.share_create_url, + ] + fail_urls = [ + self.run_check_url, + self.record_url, + ] + success_urls_redirect = { + self.share_url: self.detail_url + } + + self.assert_url_success(client, success_urls) + self.assert_url_fail(client, fail_urls) + self.assert_url_success_redirect(client, success_urls_redirect) + + def test_views_logged_in_default_group_unshared(self): + """ Check correct status code for all requests + + Assumption: User logged in and is default group member but data is not shared with + + Returns: + + """ + # Login client + client = Client() + client.login(username=self.superuser.username, password=self.superuser_pw) + + # Add user to default group + default_group = Group.objects.get(name=DEFAULT_GROUP) + self.superuser.groups.set([default_group]) + self.intervention.share_with_list([]) + + success_urls = [ + self.index_url, + self.report_url, + self.detail_url, + self.new_id_url, + self.new_url, + ] + fail_urls = [ + self.run_check_url, + self.record_url, + self.edit_url, + self.remove_url, + self.share_create_url, + self.log_url, + ] + success_urls_redirect = { + self.share_url: self.detail_url + } + + self.assert_url_success(client, success_urls) + self.assert_url_fail(client, fail_urls) + self.assert_url_success_redirect(client, success_urls_redirect) + + def test_views_logged_in_zb_group_shared(self): + """ Check correct status code for all requests + + Assumption: User logged in and is registration office member and data is shared + + Returns: + + """ + # Login client + client = Client() + client.login(username=self.superuser.username, password=self.superuser_pw) + + # Add user to zb group + zb_group = self.groups.get(name=ZB_GROUP) + self.superuser.groups.set([zb_group]) + self.intervention.share_with_list([self.superuser]) + + success_urls = [ + self.index_url, + self.report_url, + self.detail_url, + self.run_check_url, + ] + fail_urls = [ + self.log_url, + self.new_id_url, + self.new_url, + self.edit_url, + self.remove_url, + self.share_create_url, + self.record_url, + ] + success_urls_redirect = { + self.share_url: self.detail_url + } + + self.assert_url_success(client, success_urls) + self.assert_url_fail(client, fail_urls) + self.assert_url_success_redirect(client, success_urls_redirect) + + def test_views_logged_in_zb_group_unshared(self): + """ Check correct status code for all requests + + Assumption: User logged in and is registration office member but data is not shared + + Returns: + + """ + # Login client + client = Client() + client.login(username=self.superuser.username, password=self.superuser_pw) + + # Add user to zb group + zb_group = self.groups.get(name=ZB_GROUP) + self.superuser.groups.set([zb_group]) + self.intervention.share_with_list([]) + + success_urls = [ + self.index_url, + self.report_url, + self.detail_url, + ] + fail_urls = [ + self.log_url, + self.new_id_url, + self.new_url, + self.edit_url, + self.remove_url, + self.share_create_url, + self.record_url, + self.run_check_url, + ] + success_urls_redirect = { + self.share_url: self.detail_url + } + + self.assert_url_success(client, success_urls) + self.assert_url_fail(client, fail_urls) + self.assert_url_success_redirect(client, success_urls_redirect) + + def test_views_logged_in_ets_group_shared(self): + """ Check correct status code for all requests + + Assumption: User logged in and is conservation office member and data is shared with + + Returns: + + """ + # Login client + client = Client() + client.login(username=self.superuser.username, password=self.superuser_pw) + + # Add user to ets group + ets_group = Group.objects.get(name=ETS_GROUP) + self.superuser.groups.set([ets_group]) + self.intervention.share_with_list([self.superuser]) + + success_urls = [ + self.index_url, + self.report_url, + self.detail_url, + self.record_url, + ] + fail_urls = [ + self.log_url, + self.new_id_url, + self.new_url, + self.edit_url, + self.remove_url, + self.share_create_url, + self.run_check_url, + ] + success_urls_redirect = { + self.share_url: self.detail_url + } + + self.assert_url_success(client, success_urls) + self.assert_url_fail(client, fail_urls) + self.assert_url_success_redirect(client, success_urls_redirect) + + def test_views_logged_in_ets_group_unshared(self): + """ Check correct status code for all requests + + Assumption: User logged in and is registration office member and data is not shared with + + Returns: + + """ + # Login client + client = Client() + client.login(username=self.superuser.username, password=self.superuser_pw) + + # Add user to default group + ets_group = Group.objects.get(name=ETS_GROUP) + self.superuser.groups.set([ets_group]) + self.intervention.share_with_list([]) + + success_urls = [ + self.index_url, + self.report_url, + self.detail_url, + ] + fail_urls = [ + self.record_url, + self.log_url, + self.new_id_url, + self.new_url, + self.edit_url, + self.remove_url, + self.share_create_url, + self.run_check_url, + ] + # Define urls where a redirect to a specific location is the proper response + success_urls_redirect = { + self.share_url: self.detail_url + } + + self.assert_url_success(client, success_urls) + self.assert_url_fail(client, fail_urls) + self.assert_url_success_redirect(client, success_urls_redirect) diff --git a/intervention/tests/test_workflow.py b/intervention/tests/test_workflow.py new file mode 100644 index 0000000..f4484c6 --- /dev/null +++ b/intervention/tests/test_workflow.py @@ -0,0 +1,450 @@ +""" +Author: Michel Peltriaux +Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany +Contact: michel.peltriaux@sgdnord.rlp.de +Created on: 10.11.21 + +""" +import datetime + +from django.core.exceptions import ObjectDoesNotExist +from django.urls import reverse + +from compensation.models import Payment, EcoAccountDeduction +from intervention.models import Intervention +from konova.settings import ETS_GROUP, ZB_GROUP +from konova.tests.test_views import BaseWorkflowTestCase +from user.models import UserActionLogEntry, UserAction + + +class InterventionWorkflowTestCase(BaseWorkflowTestCase): + """ This test case adds workflow tests + + """ + + @classmethod + def setUpTestData(cls): + super().setUpTestData() + + def setUp(self) -> None: + super().setUp() + # Recreate a new (bare minimum) intervention before each test + self.intervention = self.create_dummy_intervention() + self.intervention.share_with(self.superuser) + + def test_new(self): + """ + Checks a 'normal' case of creating a new intervention. + We expect the user to be redirected as expected right away to the detail page of the new intervention. + We expect the user to be directly added to the shared user of the intervention + We expect that a minimum of data (identifier, title, (empty) geometry) can be used to create an intervention + + Returns: + + """ + # Define the intervention identifier for easier handling on the next lines + test_id = self.create_dummy_string() + test_title = self.create_dummy_string() + test_geom = self.create_dummy_geometry() + + new_url = reverse("intervention:new", args=()) + + # Expect the new intervention does not exist yet + obj_exists = Intervention.objects.filter( + identifier=test_id + ).exists() + self.assertFalse(obj_exists) + + # User creates a new intervention with bare minimum content, using the proper url and post data + post_data = { + "identifier": test_id, + "title": test_title, + "geom": test_geom.geojson, + } + response = self.client_user.post( + new_url, + post_data + ) + + # Now expect the new intervention to exist in the db + try: + obj = Intervention.objects.get( + identifier=test_id + ) + self.assertEqual(obj.identifier, test_id) + self.assertEqual(obj.title, test_title) + self.assert_equal_geometries(obj.geometry.geom, test_geom) + except ObjectDoesNotExist: + # Fail if there is no such object + self.fail() + + expected_redirect = reverse("intervention:detail", args=(obj.id,)) + # Expect redirect to the detail view of the new intervention + self.assertRedirects(response, expected_redirect) + + # Expect user to be first and only user with shared access + self.assertIn(self.superuser, obj.users.all()) + self.assertEqual(1, obj.users.count()) + + def test_checkability(self): + """ Tests that the intervention can only be checked if all required data has been added + + Returns: + + """ + check_url = reverse("intervention:check", args=(self.intervention.id,)) + post_data = { + "checked_intervention": True, + "checked_comps": True, + } + + # First of all, the intervention should not be checked, yet + if self.intervention.checked: + self.intervention.checked.delete() + self.intervention.refresh_from_db() + + # Make sure the dummy compensation is currently not linked to the intervention, + # since the system would check on it's quality as well (and it would fail) + self.intervention.compensations.set([]) + + # Run request with an incomplete intervention and missing user privileges --> expect to fail + self.client_user.post(check_url, post_data) + + # We expect that the intervention is still not checked now + self.intervention.refresh_from_db() + self.assertIsNone(self.intervention.checked) + + # Now give the user the required privileges by adding to the registration office group + group = self.groups.get(name=ZB_GROUP) + self.superuser.groups.add(group) + + # Now fill in the missing data, so the intervention is 'valid' for checking + self.intervention = self.fill_out_intervention(self.intervention) + + # Then add a dummy payment, so we pass the quality check (Checks whether any kind of valid compensation exists) + payment = Payment.objects.create(amount=10.00, due_on=None, comment="No due date because test") + self.intervention.payments.add(payment) + + # Run request again + self.client_user.post(check_url, post_data) + + # Update intervention from db + self.intervention.refresh_from_db() + + # We expect the intervention to be checked now and contains the proper data + # Attention: We check the timestamp only on the date, not the time, since the microseconds delay would result + # in an unwanted assertion error + checked = self.intervention.checked + self.assertIsNotNone(checked) + self.assertEqual(self.superuser, checked.user) + self.assertEqual(datetime.date.today(), checked.timestamp.date()) + self.assertEqual(UserAction.CHECKED, checked.action) + + # Expect the user action now to live in the log + self.assertIn(checked, self.intervention.log.all()) + + def test_recordability(self): + """ Tests that the intervention can only be recorded if all required data has been added + + Returns: + + """ + record_url = reverse("intervention:record", args=(self.intervention.id,)) + post_data = { + "confirm": True, + } + + # Make sure the dummy compensation is currently not linked to the intervention, + # since we would check on it's quality as well then + self.intervention.compensations.set([]) + + # First of all, the intervention should not be recorded, yet + if self.intervention.recorded: + self.intervention.recorded.delete() + self.intervention.refresh_from_db() + + # Run request with an incomplete intervention and missing user privileges --> expect to fail + self.client_user.post(record_url, post_data) + + # We expect that the intervention is still not recorded now + self.intervention.refresh_from_db() + self.assertIsNone(self.intervention.recorded) + + # Now give the user the required privileges by adding to the ETS group + group = self.groups.get(name=ETS_GROUP) + self.superuser.groups.add(group) + + # Now fill in the missing data, so the intervention is 'valid' for recording + self.intervention = self.fill_out_intervention(self.intervention) + + # Then add a dummy payment, so we pass the quality check (Checks whether any kind of valid compensation exists) + payment = Payment.objects.create(amount=10.00, due_on=None, comment="No due date because test") + self.intervention.payments.add(payment) + + # Run request again + self.client_user.post(record_url, post_data) + + # Update intervention from db + self.intervention.refresh_from_db() + + # We expect the intervention to be recorded now and contains the proper data + # Attention: We check the timestamp only on the date, not the time, since the microseconds delay would result + # in an unwanted assertion error + self.assertIsNotNone(self.intervention.recorded) + self.assertEqual(self.superuser, self.intervention.recorded.user) + self.assertEqual(datetime.date.today(), self.intervention.recorded.timestamp.date()) + self.assertEqual(UserAction.RECORDED, self.intervention.recorded.action) + + # Expect the user action now to live in the log + self.assertIn(self.intervention.recorded, self.intervention.log.all()) + + def subtest_add_payment(self): + """ Subroutine for 'normal' payment tests + + Checks a 'normal' case of adding a payment. + We expect a new payment to be addable to an existing intervention + + Returns: + + """ + # Attention: Despite the fact, this url refers to a compensation app route, we test it here for the interventions. + # Reason: A payment is some kind of compensation for an intervention. Therefore it lives inside the compensation app. + # BUT: Payments are added on the intervention detail page. Therefore it's part of a regular intervention workflow. + new_payment_url = reverse("compensation:pay-new", args=(self.intervention.id,)) + + # Make sure there are no payments on the intervention, yet + self.assertEqual(0, self.intervention.payments.count()) + + # Create form data to be sent to the url + test_amount = 10.00 + test_due = "2021-01-01" + test_comment = self.create_dummy_string() + post_data = { + "amount": test_amount, + "due": test_due, + "comment": test_comment + } + self.client_user.post( + new_payment_url, + post_data, + ) + # We do not test for any redirects in here, since the new payment url is realized using a modal, which does not + # perform any direct redirects but instead reloads the page after finisihing. + + # Make sure there is a new payment on the intervention now + self.assertEqual(1, self.intervention.payments.count()) + + # Make sure the payment contains our data + payment = self.intervention.payments.all()[0] + self.assertEqual(payment.amount, test_amount) + self.assertEqual(payment.due_on, datetime.date.fromisoformat(test_due)) + self.assertEqual(payment.comment, test_comment) + return payment + + def subtest_delete_payment(self, payment: Payment): + """ Subroutine for 'normal' payment tests + + Checks a 'normal' case of adding a payment. + We expect a payment to be deletable to an existing intervention + + Returns: + + """ + # Create removing url for the payment + remove_url = reverse("compensation:pay-remove", args=(payment.id,)) + post_data = { + "confirm": True, + } + self.client_user.post( + remove_url, + post_data + ) + + # Expect the payment to be gone from the db and therefore from the intervention as well + self.assert_object_is_deleted(payment) + + # Now make sure the intervention has no payments anymore + self.assertEqual(0, self.intervention.payments.count()) + + def test_payments(self): + """ + Checks a 'normal' case of adding a payment. + We expect a new payment to be addable to an existing intervention + We expect a payment to be deletable from an existing intervention + + Returns: + + """ + # Create new payment for the default intervention + payment = self.subtest_add_payment() + + # Now remove the payment again + self.subtest_delete_payment(payment) + + def subtest_add_deduction_fail_positive(self, new_url: str, post_data: dict, test_surface: float): + """ Holds tests for postivie fails of new deduction creation + + Reasons for failing are: + * EcoAccount does not provide enough 'deductable_surface' + * EcoAccount is not recorded (not "approved"), yet + * EcoAccount is not shared with performing user + + Args: + new_url (str): The url to send the post data to + post_data (dict): The form post data to be sent + + Returns: + + """ + # Before running fail positive tests, we need to have an account in a (normally) fine working state + self.assertIsNotNone(self.eco_account.recorded) # -> is recorded + self.assertGreater(self.eco_account.deductable_surface, test_surface) # -> has more deductable surface than we need + self.assertIn(self.superuser, self.eco_account.users.all()) # -> is shared with the performing user + + # Count the number of already existing deductions in total and for the account for later comparison + num_deductions = self.eco_account.deductions.count() + num_deductions_total = EcoAccountDeduction.objects.count() + + # First test that a deduction can not be created, if the account does not provide + # enough surface for the deduction. So we modify the deductable surface of the account + self.eco_account.deductable_surface = 0 + self.eco_account.save() + + # Now perform the (expected) failing request + self.client_user.post(new_url, post_data) + + # Expect no changes at all, since the deduction should not have been created + self.assertEqual(num_deductions, self.eco_account.deductions.count()) + self.assertEqual(num_deductions_total, EcoAccountDeduction.objects.count()) + + # Now restore the deductable surface to a valid size back again but remove the user from the shared list + self.eco_account.deductable_surface = test_surface + 100.00 + self.eco_account.share_with_list([]) + self.eco_account.save() + + # Now perform the (expected) failing request (again) + self.client_user.post(new_url, post_data) + + # Expect no changes at all, since the account is not shared + self.assertEqual(num_deductions, self.eco_account.deductions.count()) + self.assertEqual(num_deductions_total, EcoAccountDeduction.objects.count()) + + # Restore the sharing but remove the recording state + self.eco_account.share_with_list([self.superuser]) + self.eco_account.recorded.delete() + self.eco_account.refresh_from_db() + self.eco_account.save() + + # Now perform the (expected) failing request (again) + self.client_user.post(new_url, post_data) + + # Expect no changes at all, since the account is no shared with the user, yet + self.assertEqual(num_deductions, self.eco_account.deductions.count()) + self.assertEqual(num_deductions_total, EcoAccountDeduction.objects.count()) + + def subtest_add_deduction_normal(self, new_url: str, post_data: dict, test_surface: float): + """ Holds tests on working ("normal") deduction creation + + Args: + new_url (str): The url to send the post data to + post_data (dict): The form post data to be sent + test_surface (float): The expected surface of the deduction + + Returns: + + """ + # Prepare the account for a working situation (enough deductable surface, recorded and shared) + self.eco_account.deductable_surface = 10000.00 + if self.eco_account.recorded is None: + rec_action = UserActionLogEntry.objects.create( + user=self.superuser, + action=UserAction.RECORDED + ) + self.eco_account.recorded = rec_action + self.eco_account.share_with_list([self.superuser]) + self.eco_account.save() + + # Run the request + self.client_user.post(new_url, post_data) + + # Expect the deduction to be created, since all constraints are fulfilled + self.assertEqual(1, self.eco_account.deductions.count()) + self.assertEqual(1, EcoAccountDeduction.objects.count()) + + # Make sure the deduction contains the expected data + deduction = EcoAccountDeduction.objects.first() + self.assertEqual(deduction.surface, test_surface) + self.assertEqual(deduction.intervention, self.intervention) + self.assertEqual(deduction.account, self.eco_account) + + # Return deduction for further usage in tests + return deduction + + def subtest_add_deduction(self): + """ Holds test for adding a new deduction + + Contains tests for + * positive fails (as expected) + * normal cases + + Returns: + + """ + # Create the url for creating a new deduction + new_url = reverse("compensation:acc-new-deduction", args=(self.eco_account.id,)) + + # Prepare the form data + test_surface = 100.00 + post_data = { + "surface": test_surface, + "account": self.eco_account.id, + "intervention": self.intervention.id, + } + # Run some tests for regular, working cases + deduction = self.subtest_add_deduction_normal(new_url, post_data, test_surface) + + # Run some tests where we expect the creation of a deduction to fail (as expected) + self.subtest_add_deduction_fail_positive(new_url, post_data, test_surface) + + # Return deduction for further usage in tests + return deduction + + def subtest_delete_deduction(self, deduction: EcoAccountDeduction): + """ Holds test for deleting a deduction + + Returns: + + """ + # Prepare url for deleting of this deduction + delete_url = reverse("compensation:acc-remove-deduction", args=(self.eco_account.id, deduction.id,)) + post_data = { + "confirm": True + } + # Save number of current deductions for later comparison + num_deductions = self.eco_account.deductions.count() + num_deductions_total = EcoAccountDeduction.objects.count() + + # Run request + self.client_user.post(delete_url, post_data) + + # Expect the deduction to be gone from the db and relations + self.assertEqual(num_deductions - 1, self.eco_account.deductions.count()) + self.assertEqual(num_deductions_total - 1, EcoAccountDeduction.objects.count()) + + # Expect the deduction to be totally gone + self.assert_object_is_deleted(deduction) + + def test_deduction(self): + """ + Checks a 'normal case of adding a deduction. + We expect a new deduction to be addable to an existing intervention + We expect a deduction to be deletable + + Returns: + + """ + # Create a new deduction for the default intervention + deduction = self.subtest_add_deduction() + + # Now remove the deduction again + self.subtest_delete_deduction(deduction) diff --git a/intervention/urls.py b/intervention/urls.py index 022c3c9..df1e0d7 100644 --- a/intervention/urls.py +++ b/intervention/urls.py @@ -8,7 +8,7 @@ Created on: 30.11.20 from django.urls import path from intervention.views import index_view, new_view, detail_view, edit_view, remove_view, new_document_view, share_view, \ - create_share_view, remove_revocation_view, new_revocation_view, run_check_view, log_view, new_deduction_view, \ + create_share_view, remove_revocation_view, new_revocation_view, check_view, log_view, new_deduction_view, \ record_view, remove_document_view, get_document_view, get_revocation_view, new_id_view, report_view app_name = "intervention" @@ -22,7 +22,7 @@ urlpatterns = [ path('/remove', remove_view, name='remove'), path('/share/', share_view, name='share'), path('/share', create_share_view, name='share-create'), - path('/check', run_check_view, name='run-check'), + path('/check', check_view, name='check'), path('/record', record_view, name='record'), path('/report', report_view, name='report'), diff --git a/intervention/views.py b/intervention/views.py index 6c2f524..a535d75 100644 --- a/intervention/views.py +++ b/intervention/views.py @@ -5,7 +5,7 @@ from django.shortcuts import render from intervention.forms.forms import NewInterventionForm, EditInterventionForm from intervention.forms.modalForms import ShareInterventionModalForm, NewRevocationModalForm, \ - RunCheckModalForm, NewDeductionModalForm + CheckModalForm, NewDeductionModalForm from intervention.models import Intervention, Revocation, InterventionDocument, RevocationDocument from intervention.tables import InterventionTable from konova.contexts import BaseContext @@ -93,6 +93,7 @@ def new_view(request: HttpRequest): @login_required +@default_group_required def new_id_view(request: HttpRequest): """ JSON endpoint @@ -111,6 +112,8 @@ def new_id_view(request: HttpRequest): @login_required +@default_group_required +@shared_access_required(Intervention, "id") def new_document_view(request: HttpRequest, id: str): """ Renders a form for uploading new documents @@ -129,6 +132,7 @@ def new_document_view(request: HttpRequest, id: str): @login_required +@default_group_required def get_revocation_view(request: HttpRequest, doc_id: str): """ Returns the revocation document as downloadable file @@ -142,10 +146,18 @@ def get_revocation_view(request: HttpRequest, doc_id: str): """ doc = get_object_or_404(RevocationDocument, id=doc_id) + # File download only possible if related instance is shared with user + if not doc.instance.users.filter(id=request.user.id): + messages.info( + request, + DATA_UNSHARED + ) + return redirect("intervention:detail", id=doc.instance.id) return get_document(doc) @login_required +@default_group_required def get_document_view(request: HttpRequest, doc_id: str): """ Returns the document as downloadable file @@ -172,6 +184,7 @@ def get_document_view(request: HttpRequest, doc_id: str): @login_required +@default_group_required def remove_document_view(request: HttpRequest, doc_id: str): """ Removes the document from the database and file system @@ -251,6 +264,8 @@ def detail_view(request: HttpRequest, id: str): @login_required +@default_group_required +@shared_access_required(Intervention, "id") def edit_view(request: HttpRequest, id: str): """ Renders a view for editing interventions @@ -293,6 +308,7 @@ def edit_view(request: HttpRequest, id: str): @login_required @default_group_required +@shared_access_required(Intervention, "id") def remove_view(request: HttpRequest, id: str): """ Renders a remove view for this intervention @@ -362,7 +378,7 @@ def share_view(request: HttpRequest, id: str, token: str): request, _("{} has been shared with you").format(intervention.identifier) ) - intervention.users.add(user) + intervention.share_with(user) return redirect("intervention:detail", id=id) else: messages.error( @@ -374,6 +390,8 @@ def share_view(request: HttpRequest, id: str, token: str): @login_required +@default_group_required +@shared_access_required(Intervention, "id") def create_share_view(request: HttpRequest, id: str): """ Renders sharing form for an intervention @@ -393,7 +411,9 @@ def create_share_view(request: HttpRequest, id: str): @login_required -def run_check_view(request: HttpRequest, id: str): +@registration_office_group_required +@shared_access_required(Intervention, "id") +def check_view(request: HttpRequest, id: str): """ Renders check form for an intervention Args: @@ -404,7 +424,7 @@ def run_check_view(request: HttpRequest, id: str): """ intervention = get_object_or_404(Intervention, id=id) - form = RunCheckModalForm(request.POST or None, instance=intervention, user=request.user) + form = CheckModalForm(request.POST or None, instance=intervention, user=request.user) return form.process_request( request, msg_success=_("Check performed"), @@ -413,6 +433,8 @@ def run_check_view(request: HttpRequest, id: str): @login_required +@default_group_required +@shared_access_required(Intervention, "id") def new_revocation_view(request: HttpRequest, id: str): """ Renders sharing form for an intervention @@ -432,6 +454,8 @@ def new_revocation_view(request: HttpRequest, id: str): @login_required +@default_group_required +@shared_access_required(Intervention, "id") def log_view(request: HttpRequest, id: str): """ Renders a log view using modal @@ -457,6 +481,7 @@ def log_view(request: HttpRequest, id: str): @login_required @default_group_required +@shared_access_required(Intervention, "id") def new_deduction_view(request: HttpRequest, id: str): """ Renders a modal form view for creating deductions @@ -477,6 +502,7 @@ def new_deduction_view(request: HttpRequest, id: str): @login_required @conservation_office_group_required +@shared_access_required(Intervention, "id") def record_view(request: HttpRequest, id: str): """ Renders a modal form for recording an intervention diff --git a/konova/autocompletes.py b/konova/autocompletes.py index 3930b6b..ca5ee2c 100644 --- a/konova/autocompletes.py +++ b/konova/autocompletes.py @@ -29,14 +29,13 @@ class EcoAccountAutocomplete(Select2QuerySetView): deleted=None, recorded__isnull=False, users__in=[self.request.user], + ).order_by( + "identifier" ) if self.q: qs = qs.filter( identifier__icontains=self.q ) - qs = qs.order_by( - "identifier" - ) return qs @@ -52,14 +51,13 @@ class InterventionAutocomplete(Select2QuerySetView): qs = Intervention.objects.filter( deleted=None, users__in=[self.request.user], + ).order_by( + "identifier" ) if self.q: qs = qs.filter( identifier__icontains=self.q ) - qs = qs.order_by( - "identifier" - ) return qs diff --git a/konova/models.py b/konova/models.py index 830916f..cc836a2 100644 --- a/konova/models.py +++ b/konova/models.py @@ -21,6 +21,7 @@ from compensation.settings import COMPENSATION_IDENTIFIER_TEMPLATE, COMPENSATION from ema.settings import EMA_ACCOUNT_IDENTIFIER_LENGTH, EMA_ACCOUNT_IDENTIFIER_TEMPLATE from intervention.settings import INTERVENTION_IDENTIFIER_LENGTH, INTERVENTION_IDENTIFIER_TEMPLATE from konova.settings import INTERVENTION_REVOCATION_DOC_PATH +from konova.utils import generators from konova.utils.generators import generate_random_string from user.models import UserActionLogEntry, UserAction @@ -121,11 +122,12 @@ class BaseObject(BaseResource): self.save() def add_log_entry(self, action: UserAction, user: User, comment: str): - """ Wraps adding of UserActionLogEntry to self.log + """ Wraps adding of UserActionLogEntry to log Args: action (UserAction): The performed UserAction user (User): Performing user + comment (str): The optional comment Returns: @@ -148,11 +150,34 @@ class BaseObject(BaseResource): Returns: """ - if hasattr(self, "users"): + if isinstance(self, ShareableObject): return self.users.filter(id=user.id) else: return User.objects.none() + def share_with(self, user: User): + """ Adds user to list of shared access users + + Args: + user (User): The user to be added to the object + + Returns: + + """ + if not self.is_shared_with(user): + self.users.add(user) + + def share_with_list(self, user_list: list): + """ Sets the list of shared access users + + Args: + user_list (list): The users to be added to the object + + Returns: + + """ + self.users.set(user_list) + def generate_new_identifier(self) -> str: """ Generates a new identifier for the intervention object @@ -315,12 +340,23 @@ class Geometry(BaseResource): geom = MultiPolygonField(null=True, blank=True, srid=DEFAULT_SRID) -class RecordableMixin: - """ Mixin to be combined with BaseObject class - - Provides functionality related to un/recording of data +class RecordableObject(models.Model): + """ Wraps record related fields and functionality """ + # Refers to "verzeichnen" + recorded = models.OneToOneField( + UserActionLogEntry, + on_delete=models.SET_NULL, + null=True, + blank=True, + help_text="Holds data on user and timestamp of this action", + related_name="+" + ) + + class Meta: + abstract = True + def set_unrecorded(self, user: User): """ Perform unrecording @@ -337,6 +373,7 @@ class RecordableMixin: self.recorded = None self.save() self.log.add(action) + return action def set_recorded(self, user: User): """ Perform recording @@ -354,8 +391,9 @@ class RecordableMixin: self.recorded = action self.save() self.log.add(action) + return action - def toggle_recorded(self, user: User): + def toggle_recorded(self, user: User) -> UserActionLogEntry: """ Un/Record intervention Args: @@ -365,18 +403,27 @@ class RecordableMixin: """ if not self.recorded: - self.set_recorded(user) + ret_log_entry = self.set_recorded(user) else: - self.set_unrecorded(user) + ret_log_entry = self.set_unrecorded(user) + return ret_log_entry -class CheckableMixin: - """ Mixin to be combined with BaseObject class +class CheckableObject(models.Model): + # Checks - Refers to "Genehmigen" but optional + checked = models.OneToOneField( + UserActionLogEntry, + on_delete=models.SET_NULL, + null=True, + blank=True, + help_text="Holds data on user and timestamp of this action", + related_name="+" + ) - Provides functionality related to un/checking of data + class Meta: + abstract = True - """ - def set_unchecked(self, user: User): + def set_unchecked(self) -> None: """ Perform unrecording Args: @@ -384,10 +431,13 @@ class CheckableMixin: Returns: """ + # Do not .delete() the checked attribute! Just set it to None, since a delete() would kill it out of the + # log history, which is not what we want! self.checked = None self.save() + return None - def set_checked(self, user: User): + def set_checked(self, user: User) -> UserActionLogEntry: """ Perform checking Args: @@ -403,8 +453,9 @@ class CheckableMixin: self.checked = action self.save() self.log.add(action) + return action - def toggle_checked(self, user: User): + def toggle_checked(self, user: User) -> UserActionLogEntry: """ Un/Record intervention Args: @@ -414,6 +465,57 @@ class CheckableMixin: """ if not self.checked: - self.set_checked(user) + ret_log_entry = self.set_checked(user) + else: + ret_log_entry = self.set_unchecked() + return ret_log_entry + + +class ShareableObject(models.Model): + # Users having access on this object + users = models.ManyToManyField(User, help_text="Users having access (data shared with)") + access_token = models.CharField( + max_length=255, + null=True, + blank=True, + help_text="Used for sharing access", + ) + + class Meta: + abstract = True + + def generate_access_token(self, make_unique: bool = False, rec_depth: int = 5): + """ Creates a new access token for the data + + Tokens are not used for identification of a table row. The share logic checks the intervention id as well + as the given token. Therefore two different interventions can hold the same access_token without problems. + For (possible) future changes to the share logic, the make_unique parameter may be used for checking whether + the access_token is already used in any intervention. If so, tokens will be generated as long as a free token + can be found. + + Args: + make_unique (bool): Perform check on uniqueness over all intervention entries + rec_depth (int): How many tries for generating a free random token (only if make_unique) + + Returns: + + """ + # Make sure we won't end up in an infinite loop of trying to generate access_tokens + rec_depth = rec_depth - 1 + if rec_depth < 0 and make_unique: + raise RuntimeError( + "Access token generating for {} does not seem to find a free random token! Aborted!".format(self.id) + ) + + # Create random token + token = generators.generate_random_string(15, True, True, False) + # Check dynamically wheter there is another instance of that model, which holds this random access token + _model = self._meta.concrete_model + token_used_in = _model.objects.filter(access_token=token) + # Make sure the token is not used anywhere as access_token, yet. + # Make use of QuerySet lazy method for checking if it exists or not. + if token_used_in and make_unique: + self.generate_access_token(make_unique, rec_depth) else: - self.set_unchecked(user) + self.access_token = token + self.save() diff --git a/konova/settings.py b/konova/settings.py index 05e3c71..c80c54f 100644 --- a/konova/settings.py +++ b/konova/settings.py @@ -50,7 +50,7 @@ PAGE_DEFAULT = 1 # SSO settings SSO_SERVER_BASE = "http://127.0.0.1:8000/" -SSO_SERVER = "{}sso/".format(SSO_SERVER_BASE) +SSO_SERVER = f"{SSO_SERVER_BASE}sso/" SSO_PRIVATE_KEY = "QuziFeih7U8DZvQQ1riPv2MXz0ZABupHED9wjoqZAqeMQaqkqTfxJDRXgSIyASwJ" SSO_PUBLIC_KEY = "AGGK7E8eT5X5u2GD38ygGG3GpAefmIldJiiWW7gldRPqCG1CzmUfGdvPSGDbEY2n" diff --git a/konova/tests/__init__.py b/konova/tests/__init__.py new file mode 100644 index 0000000..10799e8 --- /dev/null +++ b/konova/tests/__init__.py @@ -0,0 +1,7 @@ +""" +Author: Michel Peltriaux +Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany +Contact: michel.peltriaux@sgdnord.rlp.de +Created on: 26.10.21 + +""" diff --git a/konova/tests/test_views.py b/konova/tests/test_views.py new file mode 100644 index 0000000..b5712cd --- /dev/null +++ b/konova/tests/test_views.py @@ -0,0 +1,505 @@ +""" +Author: Michel Peltriaux +Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany +Contact: michel.peltriaux@sgdnord.rlp.de +Created on: 26.10.21 + +""" +import datetime + +from django.contrib.auth.models import User, Group +from django.contrib.gis.geos import MultiPolygon, Polygon +from django.core.exceptions import ObjectDoesNotExist +from django.test import TestCase, Client +from django.urls import reverse + +from codelist.models import KonovaCode +from compensation.models import Compensation, CompensationState, CompensationAction, EcoAccount +from intervention.models import LegalData, ResponsibilityData, Intervention +from konova.management.commands.setup_data import GROUPS_DATA +from konova.models import Geometry +from konova.settings import DEFAULT_GROUP +from konova.utils.generators import generate_random_string +from user.models import UserActionLogEntry, UserAction + + +class BaseTestCase(TestCase): + """ Provides reusable functionality for specialized test cases + + """ + users = None + groups = None + superuser = None + user = None + intervention = None + compensation = None + eco_account = None + comp_state = None + comp_action = None + codes = None + + superuser_pw = "root" + user_pw = "root" + + class Meta: + abstract = True + + @classmethod + def setUpTestData(cls): + cls.create_users() + cls.create_groups() + cls.intervention = cls.create_dummy_intervention() + cls.compensation = cls.create_dummy_compensation() + cls.eco_account = cls.create_dummy_eco_account() + cls.create_dummy_states() + cls.create_dummy_action() + cls.codes = cls.create_dummy_codes() + + @classmethod + def create_users(cls): + # Create superuser and regular user + cls.superuser = User.objects.create_superuser( + username="root", + email="root@root.com", + password=cls.superuser_pw, + ) + cls.user = User.objects.create_user( + username="user1", + email="user@root.com", + password=cls.user_pw + ) + cls.users = User.objects.all() + + @classmethod + def create_groups(cls): + # Create groups + for group_data in GROUPS_DATA: + name = group_data.get("name") + Group.objects.get_or_create( + name=name, + ) + cls.groups = Group.objects.all() + + @staticmethod + def create_dummy_string(prefix: str = ""): + """ Create + + Returns: + + """ + return f"{prefix}{generate_random_string(3, True)}" + + @classmethod + def create_dummy_intervention(cls): + """ Creates an intervention which can be used for tests + + Returns: + + """ + # Create dummy data + # Create log entry + action = UserActionLogEntry.objects.create( + user=cls.superuser, + action=UserAction.CREATED, + ) + # Create legal data object (without M2M laws first) + legal_data = LegalData.objects.create() + # Create responsible data object + responsibility_data = ResponsibilityData.objects.create() + geometry = Geometry.objects.create() + # Finally create main object, holding the other objects + intervention = Intervention.objects.create( + identifier="TEST", + title="Test_title", + responsible=responsibility_data, + legal=legal_data, + created=action, + geometry=geometry, + comment="Test", + ) + intervention.generate_access_token(make_unique=True) + return intervention + + @classmethod + def create_dummy_compensation(cls): + """ Creates a compensation which can be used for tests + + Returns: + + """ + if cls.intervention is None: + cls.intervention = cls.create_dummy_intervention() + # Create dummy data + # Create log entry + action = UserActionLogEntry.objects.create( + user=cls.superuser, + action=UserAction.CREATED, + ) + geometry = Geometry.objects.create() + # Finally create main object, holding the other objects + compensation = Compensation.objects.create( + identifier="TEST", + title="Test_title", + intervention=cls.intervention, + created=action, + geometry=geometry, + comment="Test", + ) + return compensation + + @classmethod + def create_dummy_eco_account(cls): + """ Creates an eco account which can be used for tests + + Returns: + + """ + # Create dummy data + # Create log entry + action = UserActionLogEntry.objects.create( + user=cls.superuser, + action=UserAction.CREATED, + ) + geometry = Geometry.objects.create() + # Create responsible data object + lega_data = LegalData.objects.create() + responsible_data = ResponsibilityData.objects.create() + # Finally create main object, holding the other objects + eco_account = EcoAccount.objects.create( + identifier="TEST", + title="Test_title", + legal=lega_data, + responsible=responsible_data, + created=action, + geometry=geometry, + comment="Test", + ) + return eco_account + + @classmethod + def create_dummy_states(cls): + """ Creates an intervention which can be used for tests + + Returns: + + """ + cls.comp_state = CompensationState.objects.create( + surface=10.00, + biotope_type=None, + ) + return cls.comp_state + + @classmethod + def create_dummy_action(cls): + """ Creates an intervention which can be used for tests + + Returns: + + """ + cls.comp_action = CompensationAction.objects.create( + amount=10 + ) + return cls.comp_action + + @classmethod + def create_dummy_codes(cls): + """ Creates some dummy KonovaCodes which can be used for testing + + Returns: + + """ + codes = KonovaCode.objects.bulk_create([ + KonovaCode(id=1, is_selectable=True, long_name="Test1"), + KonovaCode(id=2, is_selectable=True, long_name="Test2"), + KonovaCode(id=3, is_selectable=True, long_name="Test3"), + KonovaCode(id=4, is_selectable=True, long_name="Test4"), + ]) + return codes + + @staticmethod + def create_dummy_geometry() -> MultiPolygon: + """ Creates some geometry + + Returns: + + """ + polygon = Polygon.from_bbox((7.157593, 49.882247, 7.816772, 50.266521)) + polygon.srid = 4326 + polygon = polygon.transform(3857, clone=True) + return MultiPolygon(polygon, srid=3857) # 3857 is the default srid used for MultiPolygonField in the form + + @classmethod + def fill_out_intervention(cls, intervention: Intervention) -> Intervention: + """ Adds all required (dummy) data to an intervention + + Args: + intervention (Intervention): The intervention which shall be filled out + + Returns: + intervention (Intervention): The modified intervention + """ + intervention.responsible.registration_office = KonovaCode.objects.get(id=1) + intervention.responsible.conservation_office = KonovaCode.objects.get(id=2) + intervention.responsible.registration_file_number = "test" + intervention.responsible.conservation_file_number = "test" + intervention.responsible.handler = "handler" + intervention.responsible.save() + intervention.legal.registration_date = datetime.date.fromisoformat("1970-01-01") + intervention.legal.binding_date = datetime.date.fromisoformat("1970-01-01") + intervention.legal.process_type = KonovaCode.objects.get(id=3) + intervention.legal.save() + intervention.legal.laws.set([KonovaCode.objects.get(id=(4))]) + intervention.geometry.geom = cls.create_dummy_geometry() + intervention.geometry.save() + intervention.save() + return intervention + + @classmethod + def fill_out_compensation(cls, compensation: Compensation) -> Compensation: + """ Adds all required (dummy) data to a compensation + + Args: + compensation (Compensation): The compensation which shall be filled out + + Returns: + compensation (Compensation): The modified compensation + """ + compensation.after_states.add(cls.comp_state) + compensation.before_states.add(cls.comp_state) + compensation.actions.add(cls.comp_action) + compensation.geometry.geom = cls.create_dummy_geometry() + compensation.geometry.save() + return compensation + + def assert_equal_geometries(self, geom1: MultiPolygon, geom2: MultiPolygon): + """ Assert for geometries to be equal + + Transforms the geometries to matching srids before checking + + Args: + geom1 (MultiPolygon): A geometry + geom2 (MultiPolygon): A geometry + + Returns: + + """ + # Two empty geometries are basically identical - no further testing + if geom1.empty and geom2.empty: + self.assertTrue(True) + return + + if geom1.srid != geom2.srid: + # Due to prior possible transformation of any of these geometries, we need to make sure there exists a + # transformation from one coordinate system into the other, which is valid + geom1_t = geom1.transform(geom2.srid, clone=True) + geom2_t = geom2.transform(geom1.srid, clone=True) + self.assertTrue(geom1_t.equals(geom2) or geom2_t.equals(geom1)) + else: + self.assertTrue(geom1.equals(geom2)) + + +class BaseViewTestCase(BaseTestCase): + """ Wraps basic test functionality, reusable for every specialized ViewTestCase + + """ + login_url = None + + class Meta: + abstract = True + + @classmethod + def setUpTestData(cls) -> None: + super().setUpTestData() + cls.login_url = reverse("simple-sso-login") + + def assert_url_success(self, client: Client, urls: list): + """ Assert for all given urls a direct 200 response + + Args: + client (Client): The performing client + urls (list): An iterable list of urls to be checked + + Returns: + + """ + for url in urls: + response = client.get(url) + self.assertEqual(response.status_code, 200, msg=f"Failed for {url}") + + def assert_url_success_redirect(self, client: Client, urls: dict): + """ Assert for all given urls a 302 response to a certain location. + + Assert the redirect being the expected behaviour. + + Args: + client (Client): The performing client + urls (dict): An iterable dict of (urls, redirect_to_url) pairs to be checked + + Returns: + + """ + for url, redirect_to in urls.items(): + response = client.get(url, follow=True) + # Expect redirects to the landing page + self.assertEqual(response.redirect_chain[0], (redirect_to, 302), msg=f"Failed for {url}") + + def assert_url_fail(self, client: Client, urls: list): + """ Assert for all given urls a direct 302 response + + Args: + client (Client): The performing client + urls (list): An iterable list of urls to be checked + + Returns: + + """ + for url in urls: + response = client.get(url) + self.assertEqual(response.status_code, 302, msg=f"Failed for {url}") + + +class KonovaViewTestCase(BaseViewTestCase): + """ Holds tests for all regular views, which are not app specific + + """ + @classmethod + def setUpTestData(cls) -> None: + super().setUpTestData() + + cls.home_url = reverse("home") + + def test_views_logged_in_no_groups(self): + """ Check correct status code for all requests + + Assumption: User logged in but has no groups + + Returns: + + """ + # User logged in + client = Client() + client.login(username=self.superuser.username, password=self.superuser_pw) + self.superuser.groups.set([]) + success_urls = [ + self.home_url + ] + self.assert_url_success(client, success_urls) + + def test_views_anonymous_user(self): + """ Check correct status code for all requests + + Assumption: User logged in but has no groups + + Returns: + + """ + # User not logged in + client = Client() + urls = [ + self.home_url + ] + self.assert_url_fail(client, urls) + + +class AutocompleteTestCase(BaseViewTestCase): + @classmethod + def setUpTestData(cls) -> None: + super().setUpTestData() + cls.atcmplt_accs = reverse("accounts-autocomplete") + cls.atcmplt_interventions = reverse("interventions-autocomplete") + cls.atcmplt_code_comp_action = reverse("codes-compensation-action-autocomplete") + cls.atcmplt_code_comp_funding = reverse("codes-compensation-funding-autocomplete") + cls.atcmplt_code_comp_biotope = reverse("codes-biotope-autocomplete") + cls.atcmplt_code_comp_law = reverse("codes-law-autocomplete") + cls.atcmplt_code_comp_process = reverse("codes-process-type-autocomplete") + cls.atcmplt_code_comp_reg_off = reverse("codes-registration-office-autocomplete") + cls.atcmplt_code_comp_cons_off = reverse("codes-conservation-office-autocomplete") + + def _test_views_anonymous_user(self): + # ATTENTION: As of the current state of django-autocomplete-light, there is no way to check on authenticated + # users in a way like @loing_required or anything else. The documentation considers to check on the user's + # authentication state during get_queryset() of the call. Therefore this test method here will stay here + # for future clarification but won't be run due to the prefix '_' + # User not logged in + client = Client() + urls = [ + self.atcmplt_accs, + self.atcmplt_interventions, + self.atcmplt_code_comp_action, + self.atcmplt_code_comp_funding, + self.atcmplt_code_comp_biotope, + self.atcmplt_code_comp_law, + self.atcmplt_code_comp_process, + self.atcmplt_code_comp_reg_off, + self.atcmplt_code_comp_cons_off, + ] + self.assert_url_fail(client, urls) + + def test_views_logged_in_no_groups(self): + # User logged in + client = Client() + client.login(username=self.superuser.username, password=self.superuser_pw) + self.superuser.groups.set([]) + urls = [ + self.atcmplt_accs, + self.atcmplt_interventions, + self.atcmplt_code_comp_action, + self.atcmplt_code_comp_funding, + self.atcmplt_code_comp_biotope, + self.atcmplt_code_comp_law, + self.atcmplt_code_comp_process, + self.atcmplt_code_comp_reg_off, + self.atcmplt_code_comp_cons_off, + ] + self.assert_url_success(client, urls) + + +class BaseWorkflowTestCase(BaseTestCase): + """ + Holds base methods and attributes for workflow testing + + """ + + client_user = None + client_anon = None + + class Meta: + abstract = True + + @classmethod + def setUpTestData(cls): + super().setUpTestData() + + def setUp(self) -> None: + """ Setup data before each test run + + Returns: + + """ + # Set the default group as only group for the user + default_group = self.groups.get(name=DEFAULT_GROUP) + self.superuser.groups.set([default_group]) + + # Create fresh logged in client and a non-logged in client (anon) for each test + self.client_user = Client() + self.client_user.login(username=self.superuser.username, password=self.superuser_pw) + self.client_anon = Client() + + def assert_object_is_deleted(self, obj): + """ Provides a quick check whether an object has been removed from the database or not + + Args: + obj (): + + Returns: + + """ + # Expect the object to be gone from the db + try: + obj.refresh_from_db() + # Well, we should not reach this next line of code, since the object should be gone, therefore not + # refreshable -> fail! + self.fail() + except ObjectDoesNotExist: + # If we get in here, the test was fine + pass \ No newline at end of file diff --git a/konova/utils/message_templates.py b/konova/utils/message_templates.py index ae29853..7048f2c 100644 --- a/konova/utils/message_templates.py +++ b/konova/utils/message_templates.py @@ -16,4 +16,7 @@ DATA_UNSHARED = _("This data is not shared with you") DATA_UNSHARED_EXPLANATION = _("Remember: This data has not been shared with you, yet. This means you can only read but can not edit or perform any actions like running a check or recording.") MISSING_GROUP_PERMISSION = _("You need to be part of another user group.") -CHECKED_RECORDED_RESET = _("Status of Checked and Recorded reseted") \ No newline at end of file +CHECKED_RECORDED_RESET = _("Status of Checked and Recorded reseted") + +# ECO ACCOUNT +CANCEL_ACC_RECORDED_OR_DEDUCTED = _("Action canceled. Eco account is recorded or deductions exist. Only conservation office member can perform this action.") \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index ac89391..c29cb37 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,7 +14,7 @@ django-tables2==2.3.4 et-xmlfile==1.1.0 idna==2.10 importlib-metadata==2.1.1 -itsdangerous==0.24 +itsdangerous<1.0.0 openpyxl==3.0.9 psycopg2-binary==2.9.1 pytz==2020.4