# OAuth migrations

* adds migrations for storing OAuthToken
* adds OAuthToken model
* adds OAuthToken admin
* adds user migration for Fkey relation to OAuthToken
This commit is contained in:
2024-04-30 14:56:48 +02:00
parent f135008447
commit 8ff3cb9adc
7 changed files with 213 additions and 35 deletions

View File

@@ -1,6 +1,6 @@
from django.contrib import admin
from api.models.token import APIUserToken
from api.models.token import APIUserToken, OAuthToken
class APITokenAdmin(admin.ModelAdmin):
@@ -17,4 +17,17 @@ class APITokenAdmin(admin.ModelAdmin):
]
class OAuthTokenAdmin(admin.ModelAdmin):
list_display = [
"access_token",
"refresh_token",
"expires_on",
]
search_fields = [
"access_token",
"refresh_token",
]
admin.site.register(APIUserToken, APITokenAdmin)
admin.site.register(OAuthToken, OAuthTokenAdmin)

View File

@@ -0,0 +1,26 @@
# Generated by Django 5.0.4 on 2024-04-30 07:20
import uuid
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('api', '0002_alter_apiusertoken_valid_until'),
]
operations = [
migrations.CreateModel(
name='OAuthToken',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('access_token', models.CharField(db_comment='OAuth access token', max_length=255)),
('refresh_token', models.CharField(db_comment='OAuth refresh token', max_length=255)),
('expires_on', models.DateTimeField(db_comment='When the token will be expired')),
],
options={
'abstract': False,
},
),
]

View File

@@ -1,7 +1,14 @@
import json
from datetime import timedelta
import requests
from django.core.exceptions import ObjectDoesNotExist
from django.db import models
from django.utils import timezone
from django.utils.timezone import now
from konova.models import UuidModel
from konova.sub_settings.sso_settings import OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET, SSO_SERVER_BASE
from konova.utils.generators import generate_token
@@ -46,3 +53,105 @@ class APIUserToken(models.Model):
except ObjectDoesNotExist:
raise PermissionError("Credentials invalid")
return token_obj.user
class OAuthToken(UuidModel):
access_token = models.CharField(
max_length=255,
blank=False,
null=False,
db_comment="OAuth access token"
)
refresh_token = models.CharField(
max_length=255,
blank=False,
null=False,
db_comment="OAuth refresh token"
)
expires_on = models.DateTimeField(
db_comment="When the token will be expired"
)
ASSUMED_LATENCY = 1000 # assumed latency between creation and receiving of an access token
def __str__(self):
return str(self.access_token)
@staticmethod
def from_access_token_response(access_token_data: str, received_on):
"""
Creates an OAuthToken based on retrieved access token data (OAuth2.0 specification)
Args:
access_token_data (str): OAuth2.0 response data
received_on (): Timestamp when the response has been received
Returns:
"""
oauth_token = OAuthToken()
data = json.loads(access_token_data)
oauth_token.access_token = data.get("access_token")
oauth_token.refresh_token = data.get("refresh_token")
expires_on = received_on + timedelta(
seconds=(data.get("expires_in") + OAuthToken.ASSUMED_LATENCY)
)
oauth_token.expires_on = expires_on
return oauth_token
def refresh(self):
url = f"{SSO_SERVER_BASE}o/token/"
params = {
"grant_type": "refresh_token",
"refresh_token": self.refresh_token,
"client_id": OAUTH_CLIENT_ID,
"client_secret": OAUTH_CLIENT_SECRET
}
response = requests.post(
url,
params
)
_now = now()
is_response_invalid = response.status_code != 200
if is_response_invalid:
raise RuntimeError(f"Refreshing token not possible: {response.status_code}")
response_content = response.content.decode("utf-8")
response_content = json.loads(response_content)
access_token = response_content.get("access_token")
refresh_token = response_content.get("refresh_token")
expires_in = response_content.get("expires")
self.access_token = access_token
self.refresh_token = refresh_token
self.expires_in = expires_in
self.save()
return self
def update_and_get_user(self):
from user.models import User
url = f"{SSO_SERVER_BASE}users/oauth/data/"
access_token = self.access_token
response = requests.get(
url,
headers={
"Authorization": f"Bearer {access_token}",
}
)
is_response_code_invalid = response.status_code != 200
if is_response_code_invalid:
raise RuntimeError(f"OAuth user data fetching unsuccessful: {response.status_code}")
response_content = response.content.decode("utf-8")
response_content = json.loads(response_content)
user = User.oauth_update_user(response_content)
return user