You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
konova/api/models/token.py

158 lines
4.6 KiB
Python

import json
from datetime import timedelta
import requests
from django.core.exceptions import ObjectDoesNotExist
from django.db import models
from django.utils import timezone
from django.utils.timezone import now
from konova.models import UuidModel
from konova.sub_settings.sso_settings import OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET, SSO_SERVER_BASE
from konova.utils.generators import generate_token
class APIUserToken(models.Model):
token = models.CharField(
primary_key=True,
max_length=1000,
default=generate_token,
)
valid_until = models.DateField(
blank=True,
null=True,
help_text="Token is only valid until this date. Forever if null/blank.",
)
is_active = models.BooleanField(
default=False,
help_text="Must be activated by an admin"
)
def __str__(self):
return self.token
@staticmethod
def get_user_from_token(token: str):
""" Getter for the related user object
Args:
token (str): The used token
Returns:
user (User): Otherwise None
"""
_today = timezone.now().date()
try:
token_obj = APIUserToken.objects.get(
token=token,
)
if not token_obj.is_active:
raise PermissionError("Token unverified")
if token_obj.valid_until is not None and token_obj.valid_until < _today:
raise PermissionError("Token validity expired")
except ObjectDoesNotExist:
raise PermissionError("Credentials invalid")
return token_obj.user
class OAuthToken(UuidModel):
access_token = models.CharField(
max_length=255,
blank=False,
null=False,
db_comment="OAuth access token"
)
refresh_token = models.CharField(
max_length=255,
blank=False,
null=False,
db_comment="OAuth refresh token"
)
expires_on = models.DateTimeField(
db_comment="When the token will be expired"
)
ASSUMED_LATENCY = 1000 # assumed latency between creation and receiving of an access token
def __str__(self):
return str(self.access_token)
@staticmethod
def from_access_token_response(access_token_data: str, received_on):
"""
Creates an OAuthToken based on retrieved access token data (OAuth2.0 specification)
Args:
access_token_data (str): OAuth2.0 response data
received_on (): Timestamp when the response has been received
Returns:
"""
oauth_token = OAuthToken()
data = json.loads(access_token_data)
oauth_token.access_token = data.get("access_token")
oauth_token.refresh_token = data.get("refresh_token")
expires_on = received_on + timedelta(
seconds=(data.get("expires_in") + OAuthToken.ASSUMED_LATENCY)
)
oauth_token.expires_on = expires_on
return oauth_token
def refresh(self):
url = f"{SSO_SERVER_BASE}o/token/"
params = {
"grant_type": "refresh_token",
"refresh_token": self.refresh_token,
"client_id": OAUTH_CLIENT_ID,
"client_secret": OAUTH_CLIENT_SECRET
}
response = requests.post(
url,
params
)
_now = now()
is_response_invalid = response.status_code != 200
if is_response_invalid:
raise RuntimeError(f"Refreshing token not possible: {response.status_code}")
response_content = response.content.decode("utf-8")
response_content = json.loads(response_content)
access_token = response_content.get("access_token")
refresh_token = response_content.get("refresh_token")
expires_in = response_content.get("expires")
self.access_token = access_token
self.refresh_token = refresh_token
self.expires_in = expires_in
self.save()
return self
def update_and_get_user(self):
from user.models import User
url = f"{SSO_SERVER_BASE}users/oauth/data/"
access_token = self.access_token
response = requests.get(
url,
headers={
"Authorization": f"Bearer {access_token}",
}
)
is_response_code_invalid = response.status_code != 200
if is_response_code_invalid:
raise RuntimeError(f"OAuth user data fetching unsuccessful: {response.status_code}")
response_content = response.content.decode("utf-8")
response_content = json.loads(response_content)
user = User.oauth_update_user(response_content)
return user