# OAuth migrations

* adds migrations for storing OAuthToken
* adds OAuthToken model
* adds OAuthToken admin
* adds user migration for Fkey relation to OAuthToken
pull/396/head
mpeltriaux 5 months ago
parent be373e2b14
commit 5bd0a161ff

@ -1,6 +1,6 @@
from django.contrib import admin
from api.models.token import APIUserToken
from api.models.token import APIUserToken, OAuthToken
class APITokenAdmin(admin.ModelAdmin):
@ -17,4 +17,17 @@ class APITokenAdmin(admin.ModelAdmin):
]
class OAuthTokenAdmin(admin.ModelAdmin):
list_display = [
"access_token",
"refresh_token",
"expires_on",
]
search_fields = [
"access_token",
"refresh_token",
]
admin.site.register(APIUserToken, APITokenAdmin)
admin.site.register(OAuthToken, OAuthTokenAdmin)

@ -0,0 +1,26 @@
# Generated by Django 5.0.4 on 2024-04-30 07:20
import uuid
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('api', '0002_alter_apiusertoken_valid_until'),
]
operations = [
migrations.CreateModel(
name='OAuthToken',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('access_token', models.CharField(db_comment='OAuth access token', max_length=255)),
('refresh_token', models.CharField(db_comment='OAuth refresh token', max_length=255)),
('expires_on', models.DateTimeField(db_comment='When the token will be expired')),
],
options={
'abstract': False,
},
),
]

@ -1,7 +1,14 @@
import json
from datetime import timedelta
import requests
from django.core.exceptions import ObjectDoesNotExist
from django.db import models
from django.utils import timezone
from django.utils.timezone import now
from konova.models import UuidModel
from konova.sub_settings.sso_settings import OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET, SSO_SERVER_BASE
from konova.utils.generators import generate_token
@ -46,3 +53,105 @@ class APIUserToken(models.Model):
except ObjectDoesNotExist:
raise PermissionError("Credentials invalid")
return token_obj.user
class OAuthToken(UuidModel):
access_token = models.CharField(
max_length=255,
blank=False,
null=False,
db_comment="OAuth access token"
)
refresh_token = models.CharField(
max_length=255,
blank=False,
null=False,
db_comment="OAuth refresh token"
)
expires_on = models.DateTimeField(
db_comment="When the token will be expired"
)
ASSUMED_LATENCY = 1000 # assumed latency between creation and receiving of an access token
def __str__(self):
return str(self.access_token)
@staticmethod
def from_access_token_response(access_token_data: str, received_on):
"""
Creates an OAuthToken based on retrieved access token data (OAuth2.0 specification)
Args:
access_token_data (str): OAuth2.0 response data
received_on (): Timestamp when the response has been received
Returns:
"""
oauth_token = OAuthToken()
data = json.loads(access_token_data)
oauth_token.access_token = data.get("access_token")
oauth_token.refresh_token = data.get("refresh_token")
expires_on = received_on + timedelta(
seconds=(data.get("expires_in") + OAuthToken.ASSUMED_LATENCY)
)
oauth_token.expires_on = expires_on
return oauth_token
def refresh(self):
url = f"{SSO_SERVER_BASE}o/token/"
params = {
"grant_type": "refresh_token",
"refresh_token": self.refresh_token,
"client_id": OAUTH_CLIENT_ID,
"client_secret": OAUTH_CLIENT_SECRET
}
response = requests.post(
url,
params
)
_now = now()
is_response_invalid = response.status_code != 200
if is_response_invalid:
raise RuntimeError(f"Refreshing token not possible: {response.status_code}")
response_content = response.content.decode("utf-8")
response_content = json.loads(response_content)
access_token = response_content.get("access_token")
refresh_token = response_content.get("refresh_token")
expires_in = response_content.get("expires")
self.access_token = access_token
self.refresh_token = refresh_token
self.expires_in = expires_in
self.save()
return self
def update_and_get_user(self):
from user.models import User
url = f"{SSO_SERVER_BASE}users/oauth/data/"
access_token = self.access_token
response = requests.get(
url,
headers={
"Authorization": f"Bearer {access_token}",
}
)
is_response_code_invalid = response.status_code != 200
if is_response_code_invalid:
raise RuntimeError(f"OAuth user data fetching unsuccessful: {response.status_code}")
response_content = response.content.decode("utf-8")
response_content = json.loads(response_content)
user = User.oauth_update_user(response_content)
return user

@ -7,7 +7,6 @@ Created on: 26.04.24
"""
import base64
import hashlib
import json
from urllib.parse import urlencode
import requests
@ -15,10 +14,11 @@ from django.contrib.auth import login
from django.http import HttpRequest
from django.shortcuts import redirect
from django.urls import reverse
from django.utils.timezone import now
from django.views import View
from api.models import OAuthToken
from konova.sub_settings.sso_settings import SSO_SERVER_BASE, OAUTH_CODE_VERIFIER, OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET
from user.models import User
class OAuthLoginView(View):
@ -95,7 +95,7 @@ class OAuthCallbackView(View):
authentication_code = request.GET.get("code")
oauth_acces_token_url = f"{SSO_SERVER_BASE}o/token/"
next_callback_url = request.build_absolute_uri(
callback_url = request.build_absolute_uri(
reverse(
"oauth-callback"
)
@ -104,7 +104,7 @@ class OAuthCallbackView(View):
params = {
"grant_type": "authorization_code",
"code": authentication_code,
"redirect_uri": next_callback_url,
"redirect_uri": callback_url,
"code_verifier": OAUTH_CODE_VERIFIER,
"client_id": OAUTH_CLIENT_ID,
"client_secret": OAUTH_CLIENT_SECRET
@ -113,18 +113,18 @@ class OAuthCallbackView(View):
oauth_acces_token_url,
data=params
)
received_on = now()
access_code_response_body = access_code_response.content.decode("utf-8")
status_code_invalid = access_code_response.status_code != 200
if status_code_invalid:
raise RuntimeError(f"OAuth access token could not be fetched: {access_code_response.text}")
access_code_response_body = json.loads(access_code_response_body)
access_token = access_code_response_body.get("access_token")
if not access_token:
raise RuntimeError(f"Access token response contained no token: {access_code_response_body}")
oauth_access_token = OAuthToken.from_access_token_response(access_code_response_body, received_on)
oauth_access_token.save()
user = oauth_access_token.update_and_get_user()
user.oauth_replace_token(oauth_access_token)
user = User.oauth_get_user(access_token)
login(request, user)
return redirect("home")

@ -29,6 +29,7 @@ class UserAdmin(admin.ModelAdmin):
"is_staff",
"is_superuser",
"api_token",
"oauth_token",
"groups",
"notifications",
"date_joined",

@ -0,0 +1,20 @@
# Generated by Django 5.0.4 on 2024-04-30 07:20
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('api', '0003_oauthtoken'),
('user', '0008_alter_user_id'),
]
operations = [
migrations.AddField(
model_name='user',
name='oauth_token',
field=models.ForeignKey(blank=True, db_comment='OAuth token for the user', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='+', to='api.oauthtoken'),
),
]

@ -5,16 +5,12 @@ Contact: michel.peltriaux@sgdnord.rlp.de
Created on: 15.11.21
"""
import json
import requests
from django.contrib.auth.models import AbstractUser
from django.db import models
from api.models import APIUserToken
from api.models import APIUserToken, OAuthToken
from konova.settings import ZB_GROUP, DEFAULT_GROUP, ETS_GROUP
from konova.sub_settings.sso_settings import SSO_SERVER_BASE
from konova.utils.mailer import Mailer
from user.enums import UserNotificationEnum
@ -28,6 +24,14 @@ class User(AbstractUser):
help_text="The user's API token",
on_delete=models.SET_NULL
)
oauth_token = models.ForeignKey(
"api.OAuthToken",
blank=True,
null=True,
on_delete=models.SET_NULL,
db_comment="OAuth token for the user",
related_name="+"
)
def is_notification_setting_set(self, notification_enum: UserNotificationEnum):
return self.notifications.filter(
@ -221,11 +225,22 @@ class User(AbstractUser):
return shared_teams
@staticmethod
def _oauth_update_user(user_data: dict):
def oauth_update_user(user_data: dict):
"""
Get or create a user depending on given user_data.
If the user record already exists, it's data will be updated using user_data.
Args:
user_data (dict): User data from OAuth SSO component
Returns:
user (User): The resolved user
"""
username = user_data.get("username")
user, is_created = User.objects.get_or_create(
username=username
)
if is_created:
user.set_unusable_password()
@ -235,24 +250,18 @@ class User(AbstractUser):
return user
@staticmethod
def oauth_get_user(oauth_access_token: str):
url = f"{SSO_SERVER_BASE}users/oauth/data"
response = requests.get(
url,
headers={
"Authorization":f"Bearer {oauth_access_token}",
}
)
is_response_code_invalid = response.status_code != 200
if is_response_code_invalid:
raise RuntimeError(f"OAuth user data fetching unsuccessful: {response.status_code}")
response_content = response.content.decode("utf-8")
response_content = json.loads(response_content)
user = User._oauth_update_user(response_content)
def oauth_replace_token(self, token: OAuthToken):
"""
Drops old token (if existing) and stores given token.
return user
Args:
token (OAuthToken): New token
Returns:
user (User)
"""
if self.oauth_token:
self.oauth_token.delete()
self.oauth_token = token
self.save()
return self
Loading…
Cancel
Save