import json from datetime import timedelta import requests from django.core.exceptions import ObjectDoesNotExist from django.db import models from django.utils import timezone from django.utils.timezone import now from konova.models import UuidModel from konova.sub_settings.sso_settings import OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET, SSO_SERVER_BASE from konova.utils.generators import generate_token class APIUserToken(models.Model): token = models.CharField( primary_key=True, max_length=1000, default=generate_token, ) valid_until = models.DateField( blank=True, null=True, help_text="Token is only valid until this date. Forever if null/blank.", ) is_active = models.BooleanField( default=False, help_text="Must be activated by an admin" ) def __str__(self): return self.token @staticmethod def get_user_from_token(token: str): """ Getter for the related user object Args: token (str): The used token Returns: user (User): Otherwise None """ _today = timezone.now().date() try: token_obj = APIUserToken.objects.get( token=token, ) if not token_obj.is_active: raise PermissionError("Token unverified") if token_obj.valid_until is not None and token_obj.valid_until < _today: raise PermissionError("Token validity expired") except ObjectDoesNotExist: raise PermissionError("Credentials invalid") return token_obj.user class OAuthToken(UuidModel): access_token = models.CharField( max_length=255, blank=False, null=False, db_comment="OAuth access token" ) refresh_token = models.CharField( max_length=255, blank=False, null=False, db_comment="OAuth refresh token" ) expires_on = models.DateTimeField( db_comment="When the token will be expired" ) ASSUMED_LATENCY = 1000 # assumed latency between creation and receiving of an access token def __str__(self): return str(self.access_token) @staticmethod def from_access_token_response(access_token_data: str, received_on): """ Creates an OAuthToken based on retrieved access token data (OAuth2.0 specification) Args: access_token_data (str): OAuth2.0 response data received_on (): Timestamp when the response has been received Returns: """ oauth_token = OAuthToken() data = json.loads(access_token_data) oauth_token.access_token = data.get("access_token") oauth_token.refresh_token = data.get("refresh_token") expires_on = received_on + timedelta( seconds=(data.get("expires_in") + OAuthToken.ASSUMED_LATENCY) ) oauth_token.expires_on = expires_on return oauth_token def refresh(self): url = f"{SSO_SERVER_BASE}o/token/" params = { "grant_type": "refresh_token", "refresh_token": self.refresh_token, "client_id": OAUTH_CLIENT_ID, "client_secret": OAUTH_CLIENT_SECRET } response = requests.post( url, params ) _now = now() is_response_invalid = response.status_code != 200 if is_response_invalid: raise RuntimeError(f"Refreshing token not possible: {response.status_code}") response_content = response.content.decode("utf-8") response_content = json.loads(response_content) access_token = response_content.get("access_token") refresh_token = response_content.get("refresh_token") expires_in = response_content.get("expires") self.access_token = access_token self.refresh_token = refresh_token self.expires_in = expires_in self.save() return self def update_and_get_user(self): from user.models import User url = f"{SSO_SERVER_BASE}users/oauth/data/" access_token = self.access_token response = requests.get( url, headers={ "Authorization": f"Bearer {access_token}", } ) is_response_code_invalid = response.status_code != 200 if is_response_code_invalid: raise RuntimeError(f"OAuth user data fetching unsuccessful: {response.status_code}") response_content = response.content.decode("utf-8") response_content = json.loads(response_content) user = User.oauth_update_user(response_content) return user def revoke(self) -> int: """ Revokes the OAuth2 token of the user (/o/revoke_token/ indeed removes the corresponding access token on provider side and invalidates the submitted refresh token in one step) Returns: revocation_status_code (int): HTTP status code for revocation of refresh_token """ revoke_url = f"{SSO_SERVER_BASE}o/revoke_token/" token = self.refresh_token revocation_status_code = requests.post( revoke_url, data={ 'token': token, 'token_type_hint': "refresh_token", }, auth=(OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET), ).status_code return revocation_status_code