395_OAuth2_refactoring #396

Merged
mpeltriaux merged 5 commits from 395_OAuth2_refactoring into master 5 months ago

@ -1,6 +1,6 @@
from django.contrib import admin
from api.models.token import APIUserToken
from api.models.token import APIUserToken, OAuthToken
class APITokenAdmin(admin.ModelAdmin):
@ -17,4 +17,17 @@ class APITokenAdmin(admin.ModelAdmin):
]
class OAuthTokenAdmin(admin.ModelAdmin):
list_display = [
"access_token",
"refresh_token",
"expires_on",
]
search_fields = [
"access_token",
"refresh_token",
]
admin.site.register(APIUserToken, APITokenAdmin)
admin.site.register(OAuthToken, OAuthTokenAdmin)

@ -0,0 +1,26 @@
# Generated by Django 5.0.4 on 2024-04-30 07:20
import uuid
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('api', '0002_alter_apiusertoken_valid_until'),
]
operations = [
migrations.CreateModel(
name='OAuthToken',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('access_token', models.CharField(db_comment='OAuth access token', max_length=255)),
('refresh_token', models.CharField(db_comment='OAuth refresh token', max_length=255)),
('expires_on', models.DateTimeField(db_comment='When the token will be expired')),
],
options={
'abstract': False,
},
),
]

@ -1,7 +1,14 @@
import json
from datetime import timedelta
import requests
from django.core.exceptions import ObjectDoesNotExist
from django.db import models
from django.utils import timezone
from django.utils.timezone import now
from konova.models import UuidModel
from konova.sub_settings.sso_settings import OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET, SSO_SERVER_BASE
from konova.utils.generators import generate_token
@ -46,3 +53,105 @@ class APIUserToken(models.Model):
except ObjectDoesNotExist:
raise PermissionError("Credentials invalid")
return token_obj.user
class OAuthToken(UuidModel):
access_token = models.CharField(
max_length=255,
blank=False,
null=False,
db_comment="OAuth access token"
)
refresh_token = models.CharField(
max_length=255,
blank=False,
null=False,
db_comment="OAuth refresh token"
)
expires_on = models.DateTimeField(
db_comment="When the token will be expired"
)
ASSUMED_LATENCY = 1000 # assumed latency between creation and receiving of an access token
def __str__(self):
return str(self.access_token)
@staticmethod
def from_access_token_response(access_token_data: str, received_on):
"""
Creates an OAuthToken based on retrieved access token data (OAuth2.0 specification)
Args:
access_token_data (str): OAuth2.0 response data
received_on (): Timestamp when the response has been received
Returns:
"""
oauth_token = OAuthToken()
data = json.loads(access_token_data)
oauth_token.access_token = data.get("access_token")
oauth_token.refresh_token = data.get("refresh_token")
expires_on = received_on + timedelta(
seconds=(data.get("expires_in") + OAuthToken.ASSUMED_LATENCY)
)
oauth_token.expires_on = expires_on
return oauth_token
def refresh(self):
url = f"{SSO_SERVER_BASE}o/token/"
params = {
"grant_type": "refresh_token",
"refresh_token": self.refresh_token,
"client_id": OAUTH_CLIENT_ID,
"client_secret": OAUTH_CLIENT_SECRET
}
response = requests.post(
url,
params
)
_now = now()
is_response_invalid = response.status_code != 200
if is_response_invalid:
raise RuntimeError(f"Refreshing token not possible: {response.status_code}")
response_content = response.content.decode("utf-8")
response_content = json.loads(response_content)
access_token = response_content.get("access_token")
refresh_token = response_content.get("refresh_token")
expires_in = response_content.get("expires")
self.access_token = access_token
self.refresh_token = refresh_token
self.expires_in = expires_in
self.save()
return self
def update_and_get_user(self):
from user.models import User
url = f"{SSO_SERVER_BASE}users/oauth/data/"
access_token = self.access_token
response = requests.get(
url,
headers={
"Authorization": f"Bearer {access_token}",
}
)
is_response_code_invalid = response.status_code != 200
if is_response_code_invalid:
raise RuntimeError(f"OAuth user data fetching unsuccessful: {response.status_code}")
response_content = response.content.decode("utf-8")
response_content = json.loads(response_content)
user = User.oauth_update_user(response_content)
return user

@ -49,7 +49,7 @@ CSRF_TRUSTED_ORIGINS = [
]
# Authentication settings
LOGIN_URL = "/login/"
LOGIN_URL = "/oauth/login/"
# Session settings
SESSION_COOKIE_AGE = 60 * 60 # 60 minutes

@ -5,9 +5,21 @@ Contact: michel.peltriaux@sgdnord.rlp.de
Created on: 31.01.22
"""
import random
import string
# SSO settings
# Django-simple-SSO settings
SSO_SERVER_BASE = "http://127.0.0.1:8000/"
SSO_SERVER = f"{SSO_SERVER_BASE}sso/"
SSO_PRIVATE_KEY = "QuziFeih7U8DZvQQ1riPv2MXz0ZABupHED9wjoqZAqeMQaqkqTfxJDRXgSIyASwJ"
SSO_PUBLIC_KEY = "AGGK7E8eT5X5u2GD38ygGG3GpAefmIldJiiWW7gldRPqCG1CzmUfGdvPSGDbEY2n"
SSO_PRIVATE_KEY = "CHANGE_ME"
SSO_PUBLIC_KEY = "CHANGE_ME"
# OAuth settings
OAUTH_CODE_VERIFIER = ''.join(
random.choice(
string.ascii_uppercase + string.digits
) for _ in range(random.randint(43, 128))
)
OAUTH_CLIENT_ID = "CHANGE_ME"
OAUTH_CLIENT_SECRET = "CHANGE_ME"

@ -509,7 +509,7 @@ class BaseViewTestCase(BaseTestCase):
def setUp(self) -> None:
super().setUp()
self.login_url = reverse("simple-sso-login")
self.login_url = reverse("oauth-login")
def assert_url_success(self, client: Client, urls: list):
""" Assert for all given urls a direct 200 response

@ -23,11 +23,14 @@ from konova.views.logout import LogoutView
from konova.views.geometry import GeomParcelsView, GeomParcelsContentView
from konova.views.home import HomeView
from konova.views.map_proxy import ClientProxyParcelSearch, ClientProxyParcelWFS
from konova.views.oauth import OAuthLoginView, OAuthCallbackView
sso_client = KonovaSSOClient(SSO_SERVER, SSO_PUBLIC_KEY, SSO_PRIVATE_KEY)
urlpatterns = [
path('admin/', admin.site.urls),
path('login/', include(sso_client.get_urls())),
path('oauth/callback/', OAuthCallbackView.as_view(), name="oauth-callback"),
path('oauth/login/', OAuthLoginView.as_view(), name="oauth-login"),
path('logout/', LogoutView.as_view(), name="logout"),
path('', HomeView.as_view(), name="home"),
path('intervention/', include("intervention.urls")),

@ -0,0 +1,130 @@
"""
Author: Michel Peltriaux
Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany
Contact: ksp-servicestelle@sgdnord.rlp.de
Created on: 26.04.24
"""
import base64
import hashlib
from urllib.parse import urlencode
import requests
from django.contrib.auth import login
from django.http import HttpRequest
from django.shortcuts import redirect
from django.urls import reverse
from django.utils.timezone import now
from django.views import View
from api.models import OAuthToken
from konova.sub_settings.sso_settings import SSO_SERVER_BASE, OAUTH_CODE_VERIFIER, OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET
class OAuthLoginView(View):
"""
Starts OAuth Login procedure
-> AnonymousUser is redirected to SSO component using specific parameters
-> After successful login (in SSO component), user will be redirected to a specific callback url (OAuthCallbackView)
-> Callback view uses retrieved authorization token to get a proper access token from SSO component
-> SSO component answers with access token
-> OAuthCallbackView uses token in Authorization header to access user data of logged-in user in SSO component
-> OAuthCallbackView creates/updates user
-> OAuthCallbackView logs in user and redirects to default home view
"""
def __create_code_challenge(self):
"""
Creates a code verifier and code challenge for extra security.
See https://django-oauth-toolkit.readthedocs.io/en/latest/getting_started.html#authorization-code for further
information
Returns:
"""
code_verifier = OAUTH_CODE_VERIFIER
code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest()
code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8').replace('=', '')
return code_verifier, code_challenge
def get(self, request: HttpRequest, *args, **kwargs):
"""
Redirects user to OAuth SSO webservice for credential based login there
Args:
request ():
*args ():
**kwargs ():
Returns:
"""
oauth_authentication_code_url = f"{SSO_SERVER_BASE}o/authorize/"
code_verifier, code_challenge = self.__create_code_challenge()
urlencode_params = urlencode(
{
"response_type": "code",
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"client_id": OAUTH_CLIENT_ID,
"redirect_uri": request.build_absolute_uri(
reverse(
"oauth-callback"
)
),
}
)
url = f"{oauth_authentication_code_url}?{urlencode_params}"
return redirect(url)
class OAuthCallbackView(View):
"""
Callback view for OAuth2.0 authentication token.
Authentication tokens will be exchanged for access token.
Access Token will be used for fetching user data from SSO component.
User data will be used for creating/updating user data inside this app.
User will be logged-in and redirected to default home view.
"""
def get(self, request: HttpRequest, *args, **kwargs):
authentication_code = request.GET.get("code")
oauth_acces_token_url = f"{SSO_SERVER_BASE}o/token/"
callback_url = request.build_absolute_uri(
reverse(
"oauth-callback"
)
)
params = {
"grant_type": "authorization_code",
"code": authentication_code,
"redirect_uri": callback_url,
"code_verifier": OAUTH_CODE_VERIFIER,
"client_id": OAUTH_CLIENT_ID,
"client_secret": OAUTH_CLIENT_SECRET
}
access_code_response = requests.post(
oauth_acces_token_url,
data=params
)
received_on = now()
access_code_response_body = access_code_response.content.decode("utf-8")
status_code_invalid = access_code_response.status_code != 200
if status_code_invalid:
raise RuntimeError(f"OAuth access token could not be fetched: {access_code_response.text}")
oauth_access_token = OAuthToken.from_access_token_response(access_code_response_body, received_on)
oauth_access_token.save()
user = oauth_access_token.update_and_get_user()
user.oauth_replace_token(oauth_access_token)
login(request, user)
return redirect("home")

@ -6,6 +6,7 @@ billiard==4.2.0
cached-property==1.5.2
celery==5.4.0rc2
certifi==2024.2.2
cffi==1.16.0
chardet==5.2.0
charset-normalizer==3.3.2
click==8.1.7
@ -13,6 +14,7 @@ click-didyoumean==0.3.1
click-plugins==1.1.1
click-repl==0.3.0
coverage==7.4.4
cryptography==42.0.5
Deprecated==1.2.14
Django==5.0.4
django-autocomplete-light==3.11.0
@ -22,19 +24,24 @@ django-debug-toolbar==4.3.0
django-environ==0.11.2
django-filter==24.2
django-fontawesome-5==1.0.18
django-oauth-toolkit==2.3.0
django-simple-sso==1.2.0
django-tables2==2.7.0
et-xmlfile==1.1.0
idna==3.7
importlib_metadata==7.1.0
itsdangerous==2.1.2
itsdangerous==0.24
jwcrypto==1.5.6
kombu==5.3.7
oauthlib==3.2.2
openpyxl==3.2.0b1
packaging==24.0
pika==1.3.2
pillow==10.2.0
prompt-toolkit==3.0.43
psycopg==3.1.18
psycopg-binary==3.1.18
pycparser==2.22
pyparsing==3.1.2
pypng==0.20220715.0
pyproj==3.6.1

@ -29,6 +29,7 @@ class UserAdmin(admin.ModelAdmin):
"is_staff",
"is_superuser",
"api_token",
"oauth_token",
"groups",
"notifications",
"date_joined",

@ -0,0 +1,20 @@
# Generated by Django 5.0.4 on 2024-04-30 07:20
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('api', '0003_oauthtoken'),
('user', '0008_alter_user_id'),
]
operations = [
migrations.AddField(
model_name='user',
name='oauth_token',
field=models.ForeignKey(blank=True, db_comment='OAuth token for the user', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='+', to='api.oauthtoken'),
),
]

@ -9,7 +9,7 @@ from django.contrib.auth.models import AbstractUser
from django.db import models
from api.models import APIUserToken
from api.models import APIUserToken, OAuthToken
from konova.settings import ZB_GROUP, DEFAULT_GROUP, ETS_GROUP
from konova.utils.mailer import Mailer
from user.enums import UserNotificationEnum
@ -24,6 +24,14 @@ class User(AbstractUser):
help_text="The user's API token",
on_delete=models.SET_NULL
)
oauth_token = models.ForeignKey(
"api.OAuthToken",
blank=True,
null=True,
on_delete=models.SET_NULL,
db_comment="OAuth token for the user",
related_name="+"
)
def is_notification_setting_set(self, notification_enum: UserNotificationEnum):
return self.notifications.filter(
@ -214,4 +222,46 @@ class User(AbstractUser):
shared_teams = self.teams.filter(
deleted__isnull=True
)
return shared_teams
return shared_teams
@staticmethod
def oauth_update_user(user_data: dict):
"""
Get or create a user depending on given user_data.
If the user record already exists, it's data will be updated using user_data.
Args:
user_data (dict): User data from OAuth SSO component
Returns:
user (User): The resolved user
"""
username = user_data.get("username")
user, is_created = User.objects.get_or_create(
username=username
)
if is_created:
user.set_unusable_password()
user.first_name = user_data.get("first_name")
user.last_name = user_data.get("last_name")
user.email = user_data.get("email")
return user
def oauth_replace_token(self, token: OAuthToken):
"""
Drops old token (if existing) and stores given token.
Args:
token (OAuthToken): New token
Returns:
user (User)
"""
if self.oauth_token:
self.oauth_token.delete()
self.oauth_token = token
self.save()
return self

@ -9,11 +9,13 @@ from django.urls import path
from user.autocomplete.share import ShareUserAutocomplete, ShareTeamAutocomplete
from user.autocomplete.team import TeamAdminAutocomplete
from user.views import *
from user.views.propagate import PropagateUserView
from user.views.views import *
app_name = "user"
urlpatterns = [
path("", index_view, name="index"),
path("propagate/", PropagateUserView.as_view(), name="propagate"),
path("notifications/", notifications_view, name="notifications"),
path("token/api", api_token_view, name="api-token"),
path("contact/<id>", contact_view, name="contact"),

@ -0,0 +1,7 @@
"""
Author: Michel Peltriaux
Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany
Contact: ksp-servicestelle@sgdnord.rlp.de
Created on: 10.05.24
"""

@ -0,0 +1,69 @@
"""
Author: Michel Peltriaux
Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany
Contact: ksp-servicestelle@sgdnord.rlp.de
Created on: 10.05.24
"""
import base64
import hashlib
import json
from cryptography.fernet import Fernet
from django.core.exceptions import ObjectDoesNotExist
from django.http import HttpRequest, JsonResponse
from django.utils.decorators import method_decorator
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from konova.sub_settings.sso_settings import OAUTH_CLIENT_ID
from user.models import User
class PropagateUserView(View):
""" Receives user data to be stored in db
Used if new user gets access to application and needs to be created in application before first login (e.g.
proper rights management)
"""
@method_decorator(csrf_exempt)
def dispatch(self, request, *args, **kwargs):
return super().dispatch(request, *args, **kwargs)
def post(self, request: HttpRequest, *args, **kwargs):
# Decrypt
encrypted_body = request.body
hash = hashlib.md5()
hash.update(OAUTH_CLIENT_ID.encode("utf-8"))
key = base64.urlsafe_b64encode(hash.hexdigest().encode("utf-8"))
fernet = Fernet(key)
body = fernet.decrypt(encrypted_body).decode("utf-8")
body = json.loads(body)
try:
status = "updated"
user = User.objects.get(username=body.get('username'))
# Update user data, excluding some changes
skipable_attrs = {
"username",
"is_staff",
"is_superuser",
}
for _attr, _val in body.items():
if _attr in skipable_attrs:
continue
setattr(user, _attr, _val)
except ObjectDoesNotExist:
user = User(**body)
status = "created"
user.set_unusable_password()
user.save()
data = {
"success": True,
"status": status
}
return JsonResponse(data)
Loading…
Cancel
Save