mpeltriaux
e39c7eb51f
* adds support for standardized bearer token usage instead of ksptoken/kspuser header usage (still supported)
180 lines
5.4 KiB
Python
180 lines
5.4 KiB
Python
import json
|
|
from datetime import timedelta
|
|
|
|
import requests
|
|
from django.core.exceptions import ObjectDoesNotExist
|
|
from django.db import models
|
|
from django.utils import timezone
|
|
from django.utils.timezone import now
|
|
|
|
from konova.models import UuidModel
|
|
from konova.sub_settings.sso_settings import OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET, SSO_SERVER_BASE
|
|
from konova.utils.generators import generate_token
|
|
|
|
|
|
class APIUserToken(models.Model):
    """ API token which authenticates a user on API requests

    A token is only accepted if it has been activated and its optional
    expiry date has not passed yet.
    """
    token = models.CharField(primary_key=True, max_length=1000, default=generate_token)
    valid_until = models.DateField(
        null=True,
        blank=True,
        help_text="Token is only valid until this date. Forever if null/blank.",
    )
    is_active = models.BooleanField(default=False, help_text="Must be activated by an admin")

    def __str__(self):
        return self.token

    @staticmethod
    def get_user_from_token(token: str):
        """ Resolves a submitted token string to its related user

        Args:
            token (str): The token as submitted by the client

        Returns:
            user (User): The user related to the token

        Raises:
            PermissionError: If the token is unknown, not activated or expired
        """
        # NOTE(review): this is the date part of timezone.now(); confirm that it matches the
        # calendar day valid_until is meant to be compared against
        current_date = timezone.now().date()

        try:
            api_token = APIUserToken.objects.get(token=token)
        except ObjectDoesNotExist:
            raise PermissionError("Token unknown")

        if not api_token.is_active:
            raise PermissionError("Token unverified")

        # valid_until is inclusive: the token is only rejected after that date has passed.
        # No date at all means the token never expires.
        expiry_date = api_token.valid_until
        has_expired = expiry_date is not None and expiry_date < current_date
        if has_expired:
            raise PermissionError("Token validity expired")

        # `user` is not declared on this model; presumably a reverse relation defined on
        # the user model - verify there
        return api_token.user
|
|
|
|
|
|
class OAuthToken(UuidModel):
    """ OAuth2 token pair (access and refresh token) issued by the SSO server

    Holds the tokens together with the calculated expiry timestamp and provides the
    server round trips for refreshing, revoking and fetching the user data.
    """
    access_token = models.CharField(
        max_length=255,
        blank=False,
        null=False,
        db_comment="OAuth access token"
    )
    refresh_token = models.CharField(
        max_length=255,
        blank=False,
        null=False,
        db_comment="OAuth refresh token"
    )
    expires_on = models.DateTimeField(
        db_comment="When the token will be expired"
    )

    # NOTE(review): the value is ADDED (as seconds) to the token lifetime, which extends the
    # assumed validity instead of shortening it - confirm unit and sign are intended
    ASSUMED_LATENCY = 1000  # assumed latency between creation and receiving of an access token
    REQUEST_TIMEOUT = 30  # seconds to wait for the SSO server before a request is aborted

    def __str__(self):
        return str(self.access_token)

    @staticmethod
    def _calculate_expires_on(received_on, expires_in):
        """ Calculates the expiry timestamp of an access token

        Args:
            received_on (datetime): Timestamp when the token response has been received
            expires_in (int): Lifetime in seconds as stated by the token response

        Returns:
            expires_on (datetime): received_on + expires_in + ASSUMED_LATENCY (in seconds)
        """
        return received_on + timedelta(
            seconds=(expires_in + OAuthToken.ASSUMED_LATENCY)
        )

    @staticmethod
    def from_access_token_response(access_token_data: str, received_on):
        """
        Creates an OAuthToken based on retrieved access token data (OAuth2.0 specification)

        The created token is NOT saved by this method.

        Args:
            access_token_data (str): OAuth2.0 response data (JSON encoded)
            received_on (datetime): Timestamp when the response has been received

        Returns:
            oauth_token (OAuthToken): The unsaved token object

        Raises:
            TypeError: If the response data does not hold an 'expires_in' value
        """
        oauth_token = OAuthToken()
        data = json.loads(access_token_data)

        oauth_token.access_token = data.get("access_token")
        oauth_token.refresh_token = data.get("refresh_token")
        oauth_token.expires_on = OAuthToken._calculate_expires_on(
            received_on,
            data.get("expires_in")
        )

        return oauth_token

    def refresh(self):
        """ Requests a new access token using the stored refresh token and saves the result

        Returns:
            self (OAuthToken): The updated and saved token

        Raises:
            RuntimeError: If the SSO server does not answer with HTTP 200
        """
        url = f"{SSO_SERVER_BASE}o/token/"
        params = {
            "grant_type": "refresh_token",
            "refresh_token": self.refresh_token,
            "client_id": OAUTH_CLIENT_ID,
            "client_secret": OAUTH_CLIENT_SECRET
        }
        response = requests.post(
            url,
            data=params,
            timeout=self.REQUEST_TIMEOUT,
        )
        # Taken directly after the response arrived, basis for the new expiry timestamp
        received_on = now()
        is_response_invalid = response.status_code != 200
        if is_response_invalid:
            raise RuntimeError(f"Refreshing token not possible: {response.status_code}")

        response_content = json.loads(response.content.decode("utf-8"))

        self.access_token = response_content.get("access_token")

        # The server may omit a new refresh token (RFC 6749, section 6). In that case the
        # current one stays in use instead of being overwritten with None.
        new_refresh_token = response_content.get("refresh_token")
        if new_refresh_token:
            self.refresh_token = new_refresh_token

        # Same key and calculation as in from_access_token_response(), so that the stored
        # expiry follows the refreshed token. Left untouched if the server sent no lifetime.
        expires_in = response_content.get("expires_in")
        if expires_in is not None:
            self.expires_on = OAuthToken._calculate_expires_on(received_on, expires_in)

        self.save()

        return self

    def update_and_get_user(self):
        """ Fetches the user data from the SSO server using the access token

        The parsed response is handed to User.oauth_update_user(), whose result is returned.

        Returns:
            user (User): Result of User.oauth_update_user()

        Raises:
            RuntimeError: If the SSO server does not answer with HTTP 200
        """
        # Imported locally, presumably to avoid a circular import - verify before moving
        from user.models import User
        url = f"{SSO_SERVER_BASE}users/oauth/data/"

        access_token = self.access_token
        response = requests.get(
            url,
            headers={
                "Authorization": f"Bearer {access_token}",
            },
            timeout=self.REQUEST_TIMEOUT,
        )

        is_response_code_invalid = response.status_code != 200
        if is_response_code_invalid:
            raise RuntimeError(f"OAuth user data fetching unsuccessful: {response.status_code}")

        response_content = json.loads(response.content.decode("utf-8"))
        user = User.oauth_update_user(response_content)

        return user

    def revoke(self) -> int:
        """ Revokes the OAuth2 token of the user

        (/o/revoke_token/ indeed removes the corresponding access token on provider side and invalidates the
        submitted refresh token in one step)

        The status code is returned as it is, a failed revocation does not raise.

        Returns:
            revocation_status_code (int): HTTP status code for revocation of refresh_token
        """
        revoke_url = f"{SSO_SERVER_BASE}o/revoke_token/"
        token = self.refresh_token
        revocation_status_code = requests.post(
            revoke_url,
            data={
                'token': token,
                'token_type_hint': "refresh_token",
            },
            auth=(OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET),
            timeout=self.REQUEST_TIMEOUT,
        ).status_code

        return revocation_status_code
|
|
|