Merge branch 'refs/heads/master' into Docker
# Conflicts: # konova/sub_settings/sso_settings.py # requirements.txt
This commit is contained in:
		
						commit
						c882173e78
					
				
							
								
								
									
										15
									
								
								api/admin.py
									
									
									
									
									
								
							
							
						
						
									
										15
									
								
								api/admin.py
									
									
									
									
									
								
							@ -1,6 +1,6 @@
 | 
			
		||||
from django.contrib import admin
 | 
			
		||||
 | 
			
		||||
from api.models.token import APIUserToken
 | 
			
		||||
from api.models.token import APIUserToken, OAuthToken
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class APITokenAdmin(admin.ModelAdmin):
 | 
			
		||||
@ -17,4 +17,17 @@ class APITokenAdmin(admin.ModelAdmin):
 | 
			
		||||
    ]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class OAuthTokenAdmin(admin.ModelAdmin):
 | 
			
		||||
    list_display = [
 | 
			
		||||
        "access_token",
 | 
			
		||||
        "refresh_token",
 | 
			
		||||
        "expires_on",
 | 
			
		||||
    ]
 | 
			
		||||
    search_fields = [
 | 
			
		||||
        "access_token",
 | 
			
		||||
        "refresh_token",
 | 
			
		||||
    ]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
admin.site.register(APIUserToken, APITokenAdmin)
 | 
			
		||||
admin.site.register(OAuthToken, OAuthTokenAdmin)
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										26
									
								
								api/migrations/0003_oauthtoken.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										26
									
								
								api/migrations/0003_oauthtoken.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,26 @@
 | 
			
		||||
# Generated by Django 5.0.4 on 2024-04-30 07:20
 | 
			
		||||
 | 
			
		||||
import uuid
 | 
			
		||||
from django.db import migrations, models
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Migration(migrations.Migration):
 | 
			
		||||
 | 
			
		||||
    dependencies = [
 | 
			
		||||
        ('api', '0002_alter_apiusertoken_valid_until'),
 | 
			
		||||
    ]
 | 
			
		||||
 | 
			
		||||
    operations = [
 | 
			
		||||
        migrations.CreateModel(
 | 
			
		||||
            name='OAuthToken',
 | 
			
		||||
            fields=[
 | 
			
		||||
                ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
 | 
			
		||||
                ('access_token', models.CharField(db_comment='OAuth access token', max_length=255)),
 | 
			
		||||
                ('refresh_token', models.CharField(db_comment='OAuth refresh token', max_length=255)),
 | 
			
		||||
                ('expires_on', models.DateTimeField(db_comment='When the token will be expired')),
 | 
			
		||||
        ],
 | 
			
		||||
            options={
 | 
			
		||||
                'abstract': False,
 | 
			
		||||
            },
 | 
			
		||||
        ),
 | 
			
		||||
    ]
 | 
			
		||||
@ -1,7 +1,14 @@
 | 
			
		||||
import json
 | 
			
		||||
from datetime import timedelta
 | 
			
		||||
 | 
			
		||||
import requests
 | 
			
		||||
from django.core.exceptions import ObjectDoesNotExist
 | 
			
		||||
from django.db import models
 | 
			
		||||
from django.utils import timezone
 | 
			
		||||
from django.utils.timezone import now
 | 
			
		||||
 | 
			
		||||
from konova.models import UuidModel
 | 
			
		||||
from konova.sub_settings.sso_settings import OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET, SSO_SERVER_BASE
 | 
			
		||||
from konova.utils.generators import generate_token
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -46,3 +53,105 @@ class APIUserToken(models.Model):
 | 
			
		||||
        except ObjectDoesNotExist:
 | 
			
		||||
            raise PermissionError("Credentials invalid")
 | 
			
		||||
        return token_obj.user
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class OAuthToken(UuidModel):
 | 
			
		||||
    access_token = models.CharField(
 | 
			
		||||
        max_length=255,
 | 
			
		||||
        blank=False,
 | 
			
		||||
        null=False,
 | 
			
		||||
        db_comment="OAuth access token"
 | 
			
		||||
    )
 | 
			
		||||
    refresh_token = models.CharField(
 | 
			
		||||
        max_length=255,
 | 
			
		||||
        blank=False,
 | 
			
		||||
        null=False,
 | 
			
		||||
        db_comment="OAuth refresh token"
 | 
			
		||||
    )
 | 
			
		||||
    expires_on = models.DateTimeField(
 | 
			
		||||
        db_comment="When the token will be expired"
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    ASSUMED_LATENCY = 1000  # assumed latency between creation and receiving of an access token
 | 
			
		||||
 | 
			
		||||
    def __str__(self):
 | 
			
		||||
        return str(self.access_token)
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def from_access_token_response(access_token_data: str, received_on):
 | 
			
		||||
        """
 | 
			
		||||
        Creates an OAuthToken based on retrieved access token data (OAuth2.0 specification)
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
            access_token_data (str): OAuth2.0 response data
 | 
			
		||||
            received_on (): Timestamp when the response has been received
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
 | 
			
		||||
        """
 | 
			
		||||
        oauth_token = OAuthToken()
 | 
			
		||||
        data = json.loads(access_token_data)
 | 
			
		||||
 | 
			
		||||
        oauth_token.access_token = data.get("access_token")
 | 
			
		||||
        oauth_token.refresh_token = data.get("refresh_token")
 | 
			
		||||
 | 
			
		||||
        expires_on = received_on + timedelta(
 | 
			
		||||
            seconds=(data.get("expires_in") + OAuthToken.ASSUMED_LATENCY)
 | 
			
		||||
        )
 | 
			
		||||
        oauth_token.expires_on = expires_on
 | 
			
		||||
 | 
			
		||||
        return oauth_token
 | 
			
		||||
 | 
			
		||||
    def refresh(self):
 | 
			
		||||
        url = f"{SSO_SERVER_BASE}o/token/"
 | 
			
		||||
        params = {
 | 
			
		||||
            "grant_type": "refresh_token",
 | 
			
		||||
            "refresh_token": self.refresh_token,
 | 
			
		||||
            "client_id": OAUTH_CLIENT_ID,
 | 
			
		||||
            "client_secret": OAUTH_CLIENT_SECRET
 | 
			
		||||
        }
 | 
			
		||||
        response = requests.post(
 | 
			
		||||
            url,
 | 
			
		||||
            params
 | 
			
		||||
        )
 | 
			
		||||
        _now = now()
 | 
			
		||||
        is_response_invalid = response.status_code != 200
 | 
			
		||||
        if is_response_invalid:
 | 
			
		||||
            raise RuntimeError(f"Refreshing token not possible: {response.status_code}")
 | 
			
		||||
 | 
			
		||||
        response_content = response.content.decode("utf-8")
 | 
			
		||||
        response_content = json.loads(response_content)
 | 
			
		||||
 | 
			
		||||
        access_token = response_content.get("access_token")
 | 
			
		||||
        refresh_token = response_content.get("refresh_token")
 | 
			
		||||
        expires_in = response_content.get("expires")
 | 
			
		||||
 | 
			
		||||
        self.access_token = access_token
 | 
			
		||||
        self.refresh_token = refresh_token
 | 
			
		||||
        self.expires_in = expires_in
 | 
			
		||||
        self.save()
 | 
			
		||||
 | 
			
		||||
        return self
 | 
			
		||||
 | 
			
		||||
    def update_and_get_user(self):
 | 
			
		||||
        from user.models import User
 | 
			
		||||
        url = f"{SSO_SERVER_BASE}users/oauth/data/"
 | 
			
		||||
 | 
			
		||||
        access_token = self.access_token
 | 
			
		||||
        response = requests.get(
 | 
			
		||||
            url,
 | 
			
		||||
            headers={
 | 
			
		||||
                "Authorization": f"Bearer {access_token}",
 | 
			
		||||
            }
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        is_response_code_invalid = response.status_code != 200
 | 
			
		||||
        if is_response_code_invalid:
 | 
			
		||||
            raise RuntimeError(f"OAuth user data fetching unsuccessful: {response.status_code}")
 | 
			
		||||
 | 
			
		||||
        response_content = response.content.decode("utf-8")
 | 
			
		||||
        response_content = json.loads(response_content)
 | 
			
		||||
        user = User.oauth_update_user(response_content)
 | 
			
		||||
 | 
			
		||||
        return user
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -49,7 +49,7 @@ CSRF_TRUSTED_ORIGINS = [
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
# Authentication settings
 | 
			
		||||
LOGIN_URL = "/login/"
 | 
			
		||||
LOGIN_URL = "/oauth/login/"
 | 
			
		||||
 | 
			
		||||
# Session settings
 | 
			
		||||
SESSION_COOKIE_AGE = 60 * 60  # 60 minutes
 | 
			
		||||
 | 
			
		||||
@ -5,11 +5,23 @@ Contact: michel.peltriaux@sgdnord.rlp.de
 | 
			
		||||
Created on: 31.01.22
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
# SSO settings
 | 
			
		||||
import random
 | 
			
		||||
import string
 | 
			
		||||
import os
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# Django-simple-SSO settings
 | 
			
		||||
SSO_SERVER_BASE = f"http://{os.environ.get('SSO_HOST')}/"
 | 
			
		||||
SSO_SERVER = f"{SSO_SERVER_BASE}sso/"
 | 
			
		||||
SSO_PRIVATE_KEY = "CHANGE_ME"
 | 
			
		||||
SSO_PUBLIC_KEY = "CHANGE_ME"
 | 
			
		||||
 | 
			
		||||
# OAuth settings
 | 
			
		||||
OAUTH_CODE_VERIFIER = ''.join(
 | 
			
		||||
    random.choice(
 | 
			
		||||
        string.ascii_uppercase + string.digits
 | 
			
		||||
    ) for _ in range(random.randint(43, 128))
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
OAUTH_CLIENT_ID = "CHANGE_ME"
 | 
			
		||||
OAUTH_CLIENT_SECRET = "CHANGE_ME"
 | 
			
		||||
@ -509,7 +509,7 @@ class BaseViewTestCase(BaseTestCase):
 | 
			
		||||
        
 | 
			
		||||
    def setUp(self) -> None:
 | 
			
		||||
        super().setUp()
 | 
			
		||||
        self.login_url = reverse("simple-sso-login")
 | 
			
		||||
        self.login_url = reverse("oauth-login")
 | 
			
		||||
 | 
			
		||||
    def assert_url_success(self, client: Client, urls: list):
 | 
			
		||||
        """ Assert for all given urls a direct 200 response
 | 
			
		||||
 | 
			
		||||
@ -23,11 +23,14 @@ from konova.views.logout import LogoutView
 | 
			
		||||
from konova.views.geometry import GeomParcelsView, GeomParcelsContentView
 | 
			
		||||
from konova.views.home import HomeView
 | 
			
		||||
from konova.views.map_proxy import ClientProxyParcelSearch, ClientProxyParcelWFS
 | 
			
		||||
from konova.views.oauth import OAuthLoginView, OAuthCallbackView
 | 
			
		||||
 | 
			
		||||
sso_client = KonovaSSOClient(SSO_SERVER, SSO_PUBLIC_KEY, SSO_PRIVATE_KEY)
 | 
			
		||||
urlpatterns = [
 | 
			
		||||
    path('admin/', admin.site.urls),
 | 
			
		||||
    path('login/', include(sso_client.get_urls())),
 | 
			
		||||
    path('oauth/callback/', OAuthCallbackView.as_view(), name="oauth-callback"),
 | 
			
		||||
    path('oauth/login/', OAuthLoginView.as_view(), name="oauth-login"),
 | 
			
		||||
    path('logout/', LogoutView.as_view(), name="logout"),
 | 
			
		||||
    path('', HomeView.as_view(), name="home"),
 | 
			
		||||
    path('intervention/', include("intervention.urls")),
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										130
									
								
								konova/views/oauth.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										130
									
								
								konova/views/oauth.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,130 @@
 | 
			
		||||
"""
 | 
			
		||||
Author: Michel Peltriaux
 | 
			
		||||
Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany
 | 
			
		||||
Contact: ksp-servicestelle@sgdnord.rlp.de
 | 
			
		||||
Created on: 26.04.24
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
import base64
 | 
			
		||||
import hashlib
 | 
			
		||||
from urllib.parse import urlencode
 | 
			
		||||
 | 
			
		||||
import requests
 | 
			
		||||
from django.contrib.auth import login
 | 
			
		||||
from django.http import HttpRequest
 | 
			
		||||
from django.shortcuts import redirect
 | 
			
		||||
from django.urls import reverse
 | 
			
		||||
from django.utils.timezone import now
 | 
			
		||||
from django.views import View
 | 
			
		||||
 | 
			
		||||
from api.models import OAuthToken
 | 
			
		||||
from konova.sub_settings.sso_settings import SSO_SERVER_BASE, OAUTH_CODE_VERIFIER, OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class OAuthLoginView(View):
 | 
			
		||||
    """
 | 
			
		||||
    Starts OAuth Login procedure
 | 
			
		||||
        -> AnonymousUser is redirected to SSO component using specific parameters
 | 
			
		||||
        -> After successful login (in SSO component), user will be redirected to a specific callback url (OAuthCallbackView)
 | 
			
		||||
        -> Callback view uses retrieved authorization token to get a proper access token from SSO component
 | 
			
		||||
        -> SSO component answers with access token
 | 
			
		||||
        -> OAuthCallbackView uses token in Authorization header to access user data of logged-in user in SSO component
 | 
			
		||||
        -> OAuthCallbackView creates/updates user
 | 
			
		||||
        -> OAuthCallbackView logs in user and redirects to default home view
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    def __create_code_challenge(self):
 | 
			
		||||
        """
 | 
			
		||||
        Creates a code verifier and code challenge for extra security.
 | 
			
		||||
        See https://django-oauth-toolkit.readthedocs.io/en/latest/getting_started.html#authorization-code for further
 | 
			
		||||
        information
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
 | 
			
		||||
        """
 | 
			
		||||
        code_verifier = OAUTH_CODE_VERIFIER
 | 
			
		||||
 | 
			
		||||
        code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest()
 | 
			
		||||
        code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8').replace('=', '')
 | 
			
		||||
        return code_verifier, code_challenge
 | 
			
		||||
 | 
			
		||||
    def get(self, request: HttpRequest, *args, **kwargs):
 | 
			
		||||
        """
 | 
			
		||||
        Redirects user to OAuth SSO webservice for credential based login there
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
            request ():
 | 
			
		||||
            *args ():
 | 
			
		||||
            **kwargs ():
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
 | 
			
		||||
        """
 | 
			
		||||
        oauth_authentication_code_url = f"{SSO_SERVER_BASE}o/authorize/"
 | 
			
		||||
        code_verifier, code_challenge = self.__create_code_challenge()
 | 
			
		||||
 | 
			
		||||
        urlencode_params = urlencode(
 | 
			
		||||
            {
 | 
			
		||||
                "response_type": "code",
 | 
			
		||||
                "code_challenge": code_challenge,
 | 
			
		||||
                "code_challenge_method": "S256",
 | 
			
		||||
                "client_id": OAUTH_CLIENT_ID,
 | 
			
		||||
                "redirect_uri": request.build_absolute_uri(
 | 
			
		||||
                    reverse(
 | 
			
		||||
                        "oauth-callback"
 | 
			
		||||
                    )
 | 
			
		||||
                ),
 | 
			
		||||
            }
 | 
			
		||||
        )
 | 
			
		||||
        url = f"{oauth_authentication_code_url}?{urlencode_params}"
 | 
			
		||||
        return redirect(url)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class OAuthCallbackView(View):
 | 
			
		||||
    """
 | 
			
		||||
    Callback view for OAuth2.0 authentication token.
 | 
			
		||||
    Authentication tokens will be exchanged for access token.
 | 
			
		||||
    Access Token will be used for fetching user data from SSO component.
 | 
			
		||||
    User data will be used for creating/updating user data inside this app.
 | 
			
		||||
    User will be logged-in and redirected to default home view.
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    def get(self, request: HttpRequest, *args, **kwargs):
 | 
			
		||||
        authentication_code = request.GET.get("code")
 | 
			
		||||
        oauth_acces_token_url = f"{SSO_SERVER_BASE}o/token/"
 | 
			
		||||
 | 
			
		||||
        callback_url = request.build_absolute_uri(
 | 
			
		||||
            reverse(
 | 
			
		||||
                "oauth-callback"
 | 
			
		||||
            )
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        params = {
 | 
			
		||||
            "grant_type": "authorization_code",
 | 
			
		||||
            "code": authentication_code,
 | 
			
		||||
            "redirect_uri": callback_url,
 | 
			
		||||
            "code_verifier": OAUTH_CODE_VERIFIER,
 | 
			
		||||
            "client_id": OAUTH_CLIENT_ID,
 | 
			
		||||
            "client_secret": OAUTH_CLIENT_SECRET
 | 
			
		||||
        }
 | 
			
		||||
        access_code_response = requests.post(
 | 
			
		||||
            oauth_acces_token_url,
 | 
			
		||||
            data=params
 | 
			
		||||
        )
 | 
			
		||||
        received_on = now()
 | 
			
		||||
 | 
			
		||||
        access_code_response_body = access_code_response.content.decode("utf-8")
 | 
			
		||||
        status_code_invalid = access_code_response.status_code != 200
 | 
			
		||||
        if status_code_invalid:
 | 
			
		||||
            raise RuntimeError(f"OAuth access token could not be fetched: {access_code_response.text}")
 | 
			
		||||
 | 
			
		||||
        oauth_access_token = OAuthToken.from_access_token_response(access_code_response_body, received_on)
 | 
			
		||||
        oauth_access_token.save()
 | 
			
		||||
        user = oauth_access_token.update_and_get_user()
 | 
			
		||||
        user.oauth_replace_token(oauth_access_token)
 | 
			
		||||
 | 
			
		||||
        login(request, user)
 | 
			
		||||
        return redirect("home")
 | 
			
		||||
 | 
			
		||||
@ -6,6 +6,7 @@ billiard==4.2.0
 | 
			
		||||
cached-property==1.5.2
 | 
			
		||||
celery==5.4.0rc2
 | 
			
		||||
certifi==2024.2.2
 | 
			
		||||
cffi==1.16.0
 | 
			
		||||
chardet==5.2.0
 | 
			
		||||
charset-normalizer==3.3.2
 | 
			
		||||
click==8.1.7
 | 
			
		||||
@ -13,6 +14,7 @@ click-didyoumean==0.3.1
 | 
			
		||||
click-plugins==1.1.1
 | 
			
		||||
click-repl==0.3.0
 | 
			
		||||
coverage==7.4.4
 | 
			
		||||
cryptography==42.0.5
 | 
			
		||||
Deprecated==1.2.14
 | 
			
		||||
Django==5.0.4
 | 
			
		||||
django-autocomplete-light==3.11.0
 | 
			
		||||
@ -22,19 +24,24 @@ django-debug-toolbar==4.3.0
 | 
			
		||||
django-environ==0.11.2
 | 
			
		||||
django-filter==24.2
 | 
			
		||||
django-fontawesome-5==1.0.18
 | 
			
		||||
django-oauth-toolkit==2.3.0
 | 
			
		||||
django-simple-sso==1.2.0
 | 
			
		||||
django-tables2==2.7.0
 | 
			
		||||
et-xmlfile==1.1.0
 | 
			
		||||
idna==3.7
 | 
			
		||||
importlib_metadata==7.1.0
 | 
			
		||||
itsdangerous<1.0.0
 | 
			
		||||
itsdangerous==0.24
 | 
			
		||||
jwcrypto==1.5.6
 | 
			
		||||
kombu==5.3.7
 | 
			
		||||
oauthlib==3.2.2
 | 
			
		||||
openpyxl==3.2.0b1
 | 
			
		||||
packaging==24.0
 | 
			
		||||
pika==1.3.2
 | 
			
		||||
pillow==10.2.0
 | 
			
		||||
prompt-toolkit==3.0.43
 | 
			
		||||
psycopg==3.1.18
 | 
			
		||||
psycopg-binary==3.1.18
 | 
			
		||||
pycparser==2.22
 | 
			
		||||
pyparsing==3.1.2
 | 
			
		||||
pypng==0.20220715.0
 | 
			
		||||
pyproj==3.6.1
 | 
			
		||||
 | 
			
		||||
@ -29,6 +29,7 @@ class UserAdmin(admin.ModelAdmin):
 | 
			
		||||
        "is_staff",
 | 
			
		||||
        "is_superuser",
 | 
			
		||||
        "api_token",
 | 
			
		||||
        "oauth_token",
 | 
			
		||||
        "groups",
 | 
			
		||||
        "notifications",
 | 
			
		||||
        "date_joined",
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										20
									
								
								user/migrations/0009_user_oauth_token.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										20
									
								
								user/migrations/0009_user_oauth_token.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,20 @@
 | 
			
		||||
# Generated by Django 5.0.4 on 2024-04-30 07:20
 | 
			
		||||
 | 
			
		||||
import django.db.models.deletion
 | 
			
		||||
from django.db import migrations, models
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Migration(migrations.Migration):
 | 
			
		||||
 | 
			
		||||
    dependencies = [
 | 
			
		||||
        ('api', '0003_oauthtoken'),
 | 
			
		||||
        ('user', '0008_alter_user_id'),
 | 
			
		||||
    ]
 | 
			
		||||
 | 
			
		||||
    operations = [
 | 
			
		||||
        migrations.AddField(
 | 
			
		||||
            model_name='user',
 | 
			
		||||
            name='oauth_token',
 | 
			
		||||
            field=models.ForeignKey(blank=True, db_comment='OAuth token for the user', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='+', to='api.oauthtoken'),
 | 
			
		||||
        ),
 | 
			
		||||
    ]
 | 
			
		||||
@ -9,7 +9,7 @@ from django.contrib.auth.models import AbstractUser
 | 
			
		||||
 | 
			
		||||
from django.db import models
 | 
			
		||||
 | 
			
		||||
from api.models import APIUserToken
 | 
			
		||||
from api.models import APIUserToken, OAuthToken
 | 
			
		||||
from konova.settings import ZB_GROUP, DEFAULT_GROUP, ETS_GROUP
 | 
			
		||||
from konova.utils.mailer import Mailer
 | 
			
		||||
from user.enums import UserNotificationEnum
 | 
			
		||||
@ -24,6 +24,14 @@ class User(AbstractUser):
 | 
			
		||||
        help_text="The user's API token",
 | 
			
		||||
        on_delete=models.SET_NULL
 | 
			
		||||
    )
 | 
			
		||||
    oauth_token = models.ForeignKey(
 | 
			
		||||
        "api.OAuthToken",
 | 
			
		||||
        blank=True,
 | 
			
		||||
        null=True,
 | 
			
		||||
        on_delete=models.SET_NULL,
 | 
			
		||||
        db_comment="OAuth token for the user",
 | 
			
		||||
        related_name="+"
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    def is_notification_setting_set(self, notification_enum: UserNotificationEnum):
 | 
			
		||||
        return self.notifications.filter(
 | 
			
		||||
@ -214,4 +222,46 @@ class User(AbstractUser):
 | 
			
		||||
        shared_teams = self.teams.filter(
 | 
			
		||||
            deleted__isnull=True
 | 
			
		||||
        )
 | 
			
		||||
        return shared_teams
 | 
			
		||||
        return shared_teams
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def oauth_update_user(user_data: dict):
 | 
			
		||||
        """
 | 
			
		||||
        Get or create a user depending on given user_data.
 | 
			
		||||
        If the user record already exists, it's data will be updated using user_data.
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
            user_data (dict): User data from OAuth SSO component
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
            user (User): The resolved user
 | 
			
		||||
        """
 | 
			
		||||
        username = user_data.get("username")
 | 
			
		||||
        user, is_created = User.objects.get_or_create(
 | 
			
		||||
            username=username
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        if is_created:
 | 
			
		||||
            user.set_unusable_password()
 | 
			
		||||
 | 
			
		||||
        user.first_name = user_data.get("first_name")
 | 
			
		||||
        user.last_name = user_data.get("last_name")
 | 
			
		||||
        user.email = user_data.get("email")
 | 
			
		||||
 | 
			
		||||
        return user
 | 
			
		||||
 | 
			
		||||
    def oauth_replace_token(self, token: OAuthToken):
 | 
			
		||||
        """
 | 
			
		||||
        Drops old token (if existing) and stores given token.
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
            token (OAuthToken): New token
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
            user (User)
 | 
			
		||||
        """
 | 
			
		||||
        if self.oauth_token:
 | 
			
		||||
            self.oauth_token.delete()
 | 
			
		||||
        self.oauth_token = token
 | 
			
		||||
        self.save()
 | 
			
		||||
        return self
 | 
			
		||||
@ -9,11 +9,13 @@ from django.urls import path
 | 
			
		||||
 | 
			
		||||
from user.autocomplete.share import ShareUserAutocomplete, ShareTeamAutocomplete
 | 
			
		||||
from user.autocomplete.team import TeamAdminAutocomplete
 | 
			
		||||
from user.views import *
 | 
			
		||||
from user.views.propagate import PropagateUserView
 | 
			
		||||
from user.views.views import *
 | 
			
		||||
 | 
			
		||||
app_name = "user"
 | 
			
		||||
urlpatterns = [
 | 
			
		||||
    path("", index_view, name="index"),
 | 
			
		||||
    path("propagate/", PropagateUserView.as_view(), name="propagate"),
 | 
			
		||||
    path("notifications/", notifications_view, name="notifications"),
 | 
			
		||||
    path("token/api", api_token_view, name="api-token"),
 | 
			
		||||
    path("contact/<id>", contact_view, name="contact"),
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										7
									
								
								user/views/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								user/views/__init__.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,7 @@
 | 
			
		||||
"""
 | 
			
		||||
Author: Michel Peltriaux
 | 
			
		||||
Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany
 | 
			
		||||
Contact: ksp-servicestelle@sgdnord.rlp.de
 | 
			
		||||
Created on: 10.05.24
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
							
								
								
									
										69
									
								
								user/views/propagate.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										69
									
								
								user/views/propagate.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,69 @@
 | 
			
		||||
"""
 | 
			
		||||
Author: Michel Peltriaux
 | 
			
		||||
Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany
 | 
			
		||||
Contact: ksp-servicestelle@sgdnord.rlp.de
 | 
			
		||||
Created on: 10.05.24
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
import base64
 | 
			
		||||
import hashlib
 | 
			
		||||
import json
 | 
			
		||||
 | 
			
		||||
from cryptography.fernet import Fernet
 | 
			
		||||
from django.core.exceptions import ObjectDoesNotExist
 | 
			
		||||
from django.http import HttpRequest, JsonResponse
 | 
			
		||||
from django.utils.decorators import method_decorator
 | 
			
		||||
from django.views import View
 | 
			
		||||
from django.views.decorators.csrf import csrf_exempt
 | 
			
		||||
 | 
			
		||||
from konova.sub_settings.sso_settings import OAUTH_CLIENT_ID
 | 
			
		||||
from user.models import User
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PropagateUserView(View):
 | 
			
		||||
    """ Receives user data to be stored in db
 | 
			
		||||
 | 
			
		||||
    Used if new user gets access to application and needs to be created in application before first login (e.g.
 | 
			
		||||
    proper rights management)
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    @method_decorator(csrf_exempt)
 | 
			
		||||
    def dispatch(self, request, *args, **kwargs):
 | 
			
		||||
        return super().dispatch(request, *args, **kwargs)
 | 
			
		||||
 | 
			
		||||
    def post(self, request: HttpRequest, *args, **kwargs):
 | 
			
		||||
        # Decrypt
 | 
			
		||||
        encrypted_body = request.body
 | 
			
		||||
        hash = hashlib.md5()
 | 
			
		||||
        hash.update(OAUTH_CLIENT_ID.encode("utf-8"))
 | 
			
		||||
        key = base64.urlsafe_b64encode(hash.hexdigest().encode("utf-8"))
 | 
			
		||||
        fernet = Fernet(key)
 | 
			
		||||
        body = fernet.decrypt(encrypted_body).decode("utf-8")
 | 
			
		||||
        body = json.loads(body)
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            status = "updated"
 | 
			
		||||
            user = User.objects.get(username=body.get('username'))
 | 
			
		||||
            # Update user data, excluding some changes
 | 
			
		||||
            skipable_attrs = {
 | 
			
		||||
                "username",
 | 
			
		||||
                "is_staff",
 | 
			
		||||
                "is_superuser",
 | 
			
		||||
            }
 | 
			
		||||
            for _attr, _val in body.items():
 | 
			
		||||
                if _attr in skipable_attrs:
 | 
			
		||||
                    continue
 | 
			
		||||
                setattr(user, _attr, _val)
 | 
			
		||||
        except ObjectDoesNotExist:
 | 
			
		||||
            user = User(**body)
 | 
			
		||||
            status = "created"
 | 
			
		||||
        user.set_unusable_password()
 | 
			
		||||
        user.save()
 | 
			
		||||
 | 
			
		||||
        data = {
 | 
			
		||||
            "success": True,
 | 
			
		||||
            "status": status
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return JsonResponse(data)
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user