# WIP: OAuth draft implementation
* first working client implementation of oauth workflow for logging in users
This commit is contained in:
		
							parent
							
								
									9d4a9bd122
								
							
						
					
					
						commit
						b00dd5bcc8
					
				@ -49,7 +49,7 @@ CSRF_TRUSTED_ORIGINS = [
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
# Authentication settings
 | 
			
		||||
LOGIN_URL = "/login/"
 | 
			
		||||
LOGIN_URL = "/oauth/login/"
 | 
			
		||||
 | 
			
		||||
# Session settings
 | 
			
		||||
SESSION_COOKIE_AGE = 60 * 60  # 60 minutes
 | 
			
		||||
 | 
			
		||||
@ -5,9 +5,18 @@ Contact: michel.peltriaux@sgdnord.rlp.de
 | 
			
		||||
Created on: 31.01.22
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
import random
 | 
			
		||||
import string
 | 
			
		||||
 | 
			
		||||
# SSO settings
 | 
			
		||||
SSO_SERVER_BASE = "http://127.0.0.1:8000/"
 | 
			
		||||
SSO_SERVER = f"{SSO_SERVER_BASE}sso/"
 | 
			
		||||
SSO_PRIVATE_KEY = "QuziFeih7U8DZvQQ1riPv2MXz0ZABupHED9wjoqZAqeMQaqkqTfxJDRXgSIyASwJ"
 | 
			
		||||
SSO_PUBLIC_KEY = "AGGK7E8eT5X5u2GD38ygGG3GpAefmIldJiiWW7gldRPqCG1CzmUfGdvPSGDbEY2n"
 | 
			
		||||
SSO_PRIVATE_KEY = "CHANGE_ME"
 | 
			
		||||
SSO_PUBLIC_KEY = "CHANGE_ME"
 | 
			
		||||
 | 
			
		||||
# OAuth
 | 
			
		||||
OAUTH_CODE_VERIFIER = ''.join(
 | 
			
		||||
    random.choice(
 | 
			
		||||
        string.ascii_uppercase + string.digits
 | 
			
		||||
    ) for _ in range(random.randint(43, 128))
 | 
			
		||||
)
 | 
			
		||||
@ -23,11 +23,14 @@ from konova.views.logout import LogoutView
 | 
			
		||||
from konova.views.geometry import GeomParcelsView, GeomParcelsContentView
 | 
			
		||||
from konova.views.home import HomeView
 | 
			
		||||
from konova.views.map_proxy import ClientProxyParcelSearch, ClientProxyParcelWFS
 | 
			
		||||
from konova.views.oauth import OAuthLoginView, OAuthCallbackView
 | 
			
		||||
 | 
			
		||||
sso_client = KonovaSSOClient(SSO_SERVER, SSO_PUBLIC_KEY, SSO_PRIVATE_KEY)
 | 
			
		||||
urlpatterns = [
 | 
			
		||||
    path('admin/', admin.site.urls),
 | 
			
		||||
    path('login/', include(sso_client.get_urls())),
 | 
			
		||||
    path('oauth/callback/', OAuthCallbackView.as_view(), name="oauth-callback"),
 | 
			
		||||
    path('oauth/login/', OAuthLoginView.as_view(), name="oauth-login"),
 | 
			
		||||
    path('logout/', LogoutView.as_view(), name="logout"),
 | 
			
		||||
    path('', HomeView.as_view(), name="home"),
 | 
			
		||||
    path('intervention/', include("intervention.urls")),
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										117
									
								
								konova/views/oauth.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										117
									
								
								konova/views/oauth.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,117 @@
 | 
			
		||||
"""
 | 
			
		||||
Author: Michel Peltriaux
 | 
			
		||||
Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany
 | 
			
		||||
Contact: ksp-servicestelle@sgdnord.rlp.de
 | 
			
		||||
Created on: 26.04.24
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
import base64
 | 
			
		||||
import hashlib
 | 
			
		||||
import json
 | 
			
		||||
from urllib.parse import urlencode
 | 
			
		||||
 | 
			
		||||
import requests
 | 
			
		||||
from django.contrib.auth import login
 | 
			
		||||
from django.http import HttpRequest
 | 
			
		||||
from django.shortcuts import redirect
 | 
			
		||||
from django.urls import reverse
 | 
			
		||||
from django.views import View
 | 
			
		||||
 | 
			
		||||
from konova.sub_settings.sso_settings import SSO_SERVER_BASE, OAUTH_CODE_VERIFIER
 | 
			
		||||
from user.models import User
 | 
			
		||||
 | 
			
		||||
OAUTH_CLIENT_ID = "CHANGE_ME"
 | 
			
		||||
OAUTH_CLIENT_SECRET = "CHANGE_ME"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class OAuthCallbackView(View):
 | 
			
		||||
    """
 | 
			
		||||
    Callback view for a OAuth2.0 authentication token.
 | 
			
		||||
    Authentication tokens need to be exchanged for the access token.
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    def get(self, request: HttpRequest, *args, **kwargs):
 | 
			
		||||
        authentication_code = request.GET.get("code")
 | 
			
		||||
        oauth_acces_token_url = f"{SSO_SERVER_BASE}o/token/"
 | 
			
		||||
 | 
			
		||||
        next_callback_url = request.build_absolute_uri(
 | 
			
		||||
            reverse(
 | 
			
		||||
                "oauth-callback"
 | 
			
		||||
            )
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        params = {
 | 
			
		||||
            "grant_type": "authorization_code",
 | 
			
		||||
            "code": authentication_code,
 | 
			
		||||
            "redirect_uri": next_callback_url,
 | 
			
		||||
            "code_verifier": OAUTH_CODE_VERIFIER,
 | 
			
		||||
            "client_id": OAUTH_CLIENT_ID,
 | 
			
		||||
            "client_secret": OAUTH_CLIENT_SECRET
 | 
			
		||||
        }
 | 
			
		||||
        access_code_response = requests.post(
 | 
			
		||||
            oauth_acces_token_url,
 | 
			
		||||
            data=params
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        access_code_response_body = access_code_response.content.decode("utf-8")
 | 
			
		||||
        status_code_invalid = access_code_response.status_code != 200
 | 
			
		||||
        if status_code_invalid:
 | 
			
		||||
            raise RuntimeError(f"OAuth access token could not be fetched: {access_code_response.text}")
 | 
			
		||||
 | 
			
		||||
        access_code_response_body = json.loads(access_code_response_body)
 | 
			
		||||
        access_token = access_code_response_body.get("access_token")
 | 
			
		||||
        if not access_token:
 | 
			
		||||
            raise RuntimeError(f"Access token response contained no token: {access_code_response_body}")
 | 
			
		||||
 | 
			
		||||
        user = User.oauth_get_user(access_token)
 | 
			
		||||
        login(request, user)
 | 
			
		||||
        return redirect("home")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class OAuthLoginView(View):
 | 
			
		||||
 | 
			
		||||
    def __create_code_challenge(self):
 | 
			
		||||
        """
 | 
			
		||||
        Creates a code verifier and code challenge for extra security.
 | 
			
		||||
        See https://django-oauth-toolkit.readthedocs.io/en/latest/getting_started.html#authorization-code for further
 | 
			
		||||
        information
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
 | 
			
		||||
        """
 | 
			
		||||
        code_verifier = OAUTH_CODE_VERIFIER
 | 
			
		||||
 | 
			
		||||
        code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest()
 | 
			
		||||
        code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8').replace('=', '')
 | 
			
		||||
        return code_verifier, code_challenge
 | 
			
		||||
 | 
			
		||||
    def get(self, request: HttpRequest, *args, **kwargs):
 | 
			
		||||
        """ Redirects user to OAuth SSO webservice
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
            request ():
 | 
			
		||||
            *args ():
 | 
			
		||||
            **kwargs ():
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
 | 
			
		||||
        """
 | 
			
		||||
        oauth_authentication_code_url = f"{SSO_SERVER_BASE}o/authorize/"
 | 
			
		||||
        code_verifier, code_challenge = self.__create_code_challenge()
 | 
			
		||||
        print(code_verifier)
 | 
			
		||||
 | 
			
		||||
        urlencode_params = urlencode(
 | 
			
		||||
            {
 | 
			
		||||
                "response_type": "code",
 | 
			
		||||
                "code_challenge": code_challenge,
 | 
			
		||||
                "code_challenge_method": "S256",
 | 
			
		||||
                "client_id": OAUTH_CLIENT_ID,
 | 
			
		||||
                "redirect_uri": request.build_absolute_uri(
 | 
			
		||||
                    reverse(
 | 
			
		||||
                        "oauth-callback"
 | 
			
		||||
                    )
 | 
			
		||||
                ),
 | 
			
		||||
            }
 | 
			
		||||
        )
 | 
			
		||||
        url = f"{oauth_authentication_code_url}?{urlencode_params}"
 | 
			
		||||
        return redirect(url)
 | 
			
		||||
@ -5,12 +5,16 @@ Contact: michel.peltriaux@sgdnord.rlp.de
 | 
			
		||||
Created on: 15.11.21
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
import json
 | 
			
		||||
 | 
			
		||||
import requests
 | 
			
		||||
from django.contrib.auth.models import AbstractUser
 | 
			
		||||
 | 
			
		||||
from django.db import models
 | 
			
		||||
 | 
			
		||||
from api.models import APIUserToken
 | 
			
		||||
from konova.settings import ZB_GROUP, DEFAULT_GROUP, ETS_GROUP
 | 
			
		||||
from konova.sub_settings.sso_settings import SSO_SERVER_BASE
 | 
			
		||||
from konova.utils.mailer import Mailer
 | 
			
		||||
from user.enums import UserNotificationEnum
 | 
			
		||||
 | 
			
		||||
@ -214,4 +218,41 @@ class User(AbstractUser):
 | 
			
		||||
        shared_teams = self.teams.filter(
 | 
			
		||||
            deleted__isnull=True
 | 
			
		||||
        )
 | 
			
		||||
        return shared_teams
 | 
			
		||||
        return shared_teams
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def _oauth_update_user(user_data: dict):
 | 
			
		||||
        username = user_data.get("username")
 | 
			
		||||
        user, is_created = User.objects.get_or_create(
 | 
			
		||||
            username=username
 | 
			
		||||
        )
 | 
			
		||||
        if is_created:
 | 
			
		||||
            user.set_unusable_password()
 | 
			
		||||
 | 
			
		||||
        user.first_name = user_data.get("first_name")
 | 
			
		||||
        user.last_name = user_data.get("last_name")
 | 
			
		||||
        user.email = user_data.get("email")
 | 
			
		||||
 | 
			
		||||
        return user
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def oauth_get_user(oauth_access_token: str):
 | 
			
		||||
        url = f"{SSO_SERVER_BASE}users/oauth/data"
 | 
			
		||||
 | 
			
		||||
        response = requests.get(
 | 
			
		||||
            url,
 | 
			
		||||
            headers={
 | 
			
		||||
                "Authorization":f"Bearer {oauth_access_token}",
 | 
			
		||||
            }
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        is_response_code_invalid = response.status_code != 200
 | 
			
		||||
        if is_response_code_invalid:
 | 
			
		||||
            raise RuntimeError(f"OAuth user data fetching unsuccessful: {response.status_code}")
 | 
			
		||||
 | 
			
		||||
        response_content = response.content.decode("utf-8")
 | 
			
		||||
        response_content = json.loads(response_content)
 | 
			
		||||
        user = User._oauth_update_user(response_content)
 | 
			
		||||
 | 
			
		||||
        return user
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user