Merge branch 'refs/heads/407_Drop_django-simple-sso' into env

# Conflicts:
#	konova/sub_settings/sso_settings.py
#	requirements.txt
pull/411/head
mpeltriaux 4 months ago
commit 6e35dc7de6

@ -1,6 +1,6 @@
from django.contrib import admin from django.contrib import admin
from api.models.token import APIUserToken from api.models.token import APIUserToken, OAuthToken
class APITokenAdmin(admin.ModelAdmin): class APITokenAdmin(admin.ModelAdmin):
@ -17,4 +17,17 @@ class APITokenAdmin(admin.ModelAdmin):
] ]
class OAuthTokenAdmin(admin.ModelAdmin):
list_display = [
"access_token",
"refresh_token",
"expires_on",
]
search_fields = [
"access_token",
"refresh_token",
]
admin.site.register(APIUserToken, APITokenAdmin) admin.site.register(APIUserToken, APITokenAdmin)
admin.site.register(OAuthToken, OAuthTokenAdmin)

@ -0,0 +1,26 @@
# Generated by Django 5.0.4 on 2024-04-30 07:20
import uuid
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('api', '0002_alter_apiusertoken_valid_until'),
]
operations = [
migrations.CreateModel(
name='OAuthToken',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('access_token', models.CharField(db_comment='OAuth access token', max_length=255)),
('refresh_token', models.CharField(db_comment='OAuth refresh token', max_length=255)),
('expires_on', models.DateTimeField(db_comment='When the token will be expired')),
],
options={
'abstract': False,
},
),
]

@ -1,7 +1,14 @@
import json
from datetime import timedelta
import requests
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
from django.db import models from django.db import models
from django.utils import timezone from django.utils import timezone
from django.utils.timezone import now
from konova.models import UuidModel
from konova.sub_settings.sso_settings import OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET, SSO_SERVER_BASE
from konova.utils.generators import generate_token from konova.utils.generators import generate_token
@ -46,3 +53,105 @@ class APIUserToken(models.Model):
except ObjectDoesNotExist: except ObjectDoesNotExist:
raise PermissionError("Credentials invalid") raise PermissionError("Credentials invalid")
return token_obj.user return token_obj.user
class OAuthToken(UuidModel):
access_token = models.CharField(
max_length=255,
blank=False,
null=False,
db_comment="OAuth access token"
)
refresh_token = models.CharField(
max_length=255,
blank=False,
null=False,
db_comment="OAuth refresh token"
)
expires_on = models.DateTimeField(
db_comment="When the token will be expired"
)
ASSUMED_LATENCY = 1000 # assumed latency between creation and receiving of an access token
def __str__(self):
return str(self.access_token)
@staticmethod
def from_access_token_response(access_token_data: str, received_on):
"""
Creates an OAuthToken based on retrieved access token data (OAuth2.0 specification)
Args:
access_token_data (str): OAuth2.0 response data
received_on (): Timestamp when the response has been received
Returns:
"""
oauth_token = OAuthToken()
data = json.loads(access_token_data)
oauth_token.access_token = data.get("access_token")
oauth_token.refresh_token = data.get("refresh_token")
expires_on = received_on + timedelta(
seconds=(data.get("expires_in") + OAuthToken.ASSUMED_LATENCY)
)
oauth_token.expires_on = expires_on
return oauth_token
def refresh(self):
url = f"{SSO_SERVER_BASE}o/token/"
params = {
"grant_type": "refresh_token",
"refresh_token": self.refresh_token,
"client_id": OAUTH_CLIENT_ID,
"client_secret": OAUTH_CLIENT_SECRET
}
response = requests.post(
url,
params
)
_now = now()
is_response_invalid = response.status_code != 200
if is_response_invalid:
raise RuntimeError(f"Refreshing token not possible: {response.status_code}")
response_content = response.content.decode("utf-8")
response_content = json.loads(response_content)
access_token = response_content.get("access_token")
refresh_token = response_content.get("refresh_token")
expires_in = response_content.get("expires")
self.access_token = access_token
self.refresh_token = refresh_token
self.expires_in = expires_in
self.save()
return self
def update_and_get_user(self):
from user.models import User
url = f"{SSO_SERVER_BASE}users/oauth/data/"
access_token = self.access_token
response = requests.get(
url,
headers={
"Authorization": f"Bearer {access_token}",
}
)
is_response_code_invalid = response.status_code != 200
if is_response_code_invalid:
raise RuntimeError(f"OAuth user data fetching unsuccessful: {response.status_code}")
response_content = response.content.decode("utf-8")
response_content = json.loads(response_content)
user = User.oauth_update_user(response_content)
return user

@ -11,6 +11,7 @@ from abc import abstractmethod
from django.contrib.gis import geos from django.contrib.gis import geos
from django.contrib.gis.geos import GEOSGeometry from django.contrib.gis.geos import GEOSGeometry
from django.core.paginator import Paginator from django.core.paginator import Paginator
from django.db.models import Q
from konova.sub_settings.lanis_settings import DEFAULT_SRID_RLP from konova.sub_settings.lanis_settings import DEFAULT_SRID_RLP
from konova.utils.message_templates import DATA_UNSHARED from konova.utils.message_templates import DATA_UNSHARED
@ -32,8 +33,8 @@ class AbstractModelAPISerializer:
self.lookup = { self.lookup = {
"id": None, # must be set "id": None, # must be set
"deleted__isnull": True, "deleted__isnull": True,
"users__in": [], # must be set
} }
self.shared_lookup = Q() # must be set, so user or team based share will be used properly
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
@abstractmethod @abstractmethod
@ -76,7 +77,11 @@ class AbstractModelAPISerializer:
else: else:
# Return certain object # Return certain object
self.lookup["id"] = _id self.lookup["id"] = _id
self.lookup["users__in"] = [user]
self.shared_lookup = Q(
Q(users__in=[user]) |
Q(teams__in=list(user.shared_teams))
)
def fetch_and_serialize(self): def fetch_and_serialize(self):
""" Serializes the model entry according to the given lookup data """ Serializes the model entry according to the given lookup data
@ -86,7 +91,13 @@ class AbstractModelAPISerializer:
Returns: Returns:
serialized_data (dict) serialized_data (dict)
""" """
entries = self.model.objects.filter(**self.lookup).order_by("id") entries = self.model.objects.filter(
**self.lookup
).filter(
self.shared_lookup
).order_by(
"id"
).distinct()
self.paginator = Paginator(entries, self.rpp) self.paginator = Paginator(entries, self.rpp)
requested_entries = self.paginator.page(self.page_number) requested_entries = self.paginator.page(self.page_number)

@ -6,6 +6,7 @@ Created on: 24.01.22
""" """
from django.db import transaction from django.db import transaction
from django.db.models import Q
from api.utils.serializer.v1.serializer import AbstractModelAPISerializerV1, AbstractCompensationAPISerializerV1Mixin from api.utils.serializer.v1.serializer import AbstractModelAPISerializerV1, AbstractCompensationAPISerializerV1Mixin
from compensation.models import Compensation from compensation.models import Compensation
@ -21,8 +22,10 @@ class CompensationAPISerializerV1(AbstractModelAPISerializerV1, AbstractCompensa
def prepare_lookup(self, id, user): def prepare_lookup(self, id, user):
super().prepare_lookup(id, user) super().prepare_lookup(id, user)
del self.lookup["users__in"] self.shared_lookup = Q(
self.lookup["intervention__users__in"] = [user] Q(intervention__users__in=[user]) |
Q(intervention__teams__in=user.shared_teams)
)
def intervention_to_json(self, entry): def intervention_to_json(self, entry):
return { return {

@ -6,6 +6,7 @@ Created on: 28.01.22
""" """
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Q
from api.utils.serializer.v1.serializer import DeductableAPISerializerV1Mixin, AbstractModelAPISerializerV1 from api.utils.serializer.v1.serializer import DeductableAPISerializerV1Mixin, AbstractModelAPISerializerV1
from compensation.models import EcoAccountDeduction, EcoAccount from compensation.models import EcoAccountDeduction, EcoAccount
@ -28,9 +29,11 @@ class DeductionAPISerializerV1(AbstractModelAPISerializerV1,
""" """
super().prepare_lookup(_id, user) super().prepare_lookup(_id, user)
del self.lookup["users__in"]
del self.lookup["deleted__isnull"] del self.lookup["deleted__isnull"]
self.lookup["intervention__users__in"] = [user] self.shared_lookup = Q(
Q(intervention__users__in=[user]) |
Q(intervention__teams__in=user.shared_teams)
)
def _model_to_geo_json(self, entry): def _model_to_geo_json(self, entry):
""" Adds the basic data """ Adds the basic data

@ -23,11 +23,6 @@ class AbstractAPIViewV1(AbstractAPIView):
""" """
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.lookup = {
"id": None, # must be set in subclasses
"deleted__isnull": True,
"users__in": [], # must be set in subclasses
}
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.serializer = self.serializer() self.serializer = self.serializer()

@ -1,78 +0,0 @@
"""
Author: Michel Peltriaux
Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany
Contact: michel.peltriaux@sgdnord.rlp.de
Created on: 17.08.21
"""
from django.http import HttpResponse
from django.urls import re_path
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from itsdangerous import TimedSerializer
from simple_sso.sso_client.client import Client
from user.models import User
class PropagateView(View):
""" View used to receive propagated sso-server user data
"""
client = None
signer = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.signer = TimedSerializer(self.client.private_key)
@csrf_exempt
def dispatch(self, request, *args, **kwargs):
return super().dispatch(request, *args, **kwargs)
def post(self, request):
user_data = request.body
user_data = self.signer.loads(user_data)
self.client.build_user(user_data)
return HttpResponse(status=200)
class KonovaSSOClient(Client):
""" Konova specialized derivative of general sso.Client.
Adds some custom behaviour for konova usage.
"""
propagate_view = PropagateView
def get_urls(self):
urls = super().get_urls()
urls += re_path(r'^propagate/$', self.propagate_view.as_view(client=self), name='simple-sso-propagate'),
return urls
def build_user(self, user_data):
""" Creates a user or updates user data
Args:
user_data ():
Returns:
"""
try:
user = User.objects.get(username=user_data['username'])
# Update user data, excluding some changes
skipable_attrs = {
"username",
"is_staff",
"is_superuser",
}
for _attr, _val in user_data.items():
if _attr in skipable_attrs:
continue
setattr(user, _attr, _val)
except User.DoesNotExist:
user = User(**user_data)
user.set_unusable_password()
user.save()
return user

@ -47,7 +47,7 @@ CSRF_TRUSTED_ORIGINS = [
] ]
# Authentication settings # Authentication settings
LOGIN_URL = "/login/" LOGIN_URL = "/oauth/login/"
# Session settings # Session settings
SESSION_COOKIE_AGE = 60 * 60 # 60 minutes SESSION_COOKIE_AGE = 60 * 60 # 60 minutes
@ -81,10 +81,6 @@ INSTALLED_APPS = [
'analysis', 'analysis',
'api', 'api',
] ]
if DEBUG:
INSTALLED_APPS += [
'debug_toolbar',
]
MIDDLEWARE = [ MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware', 'django.middleware.security.SecurityMiddleware',
@ -96,10 +92,6 @@ MIDDLEWARE = [
'django.contrib.messages.middleware.MessageMiddleware', 'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware', 'django.middleware.clickjacking.XFrameOptionsMiddleware',
] ]
if DEBUG:
MIDDLEWARE += [
"debug_toolbar.middleware.DebugToolbarMiddleware",
]
ROOT_URLCONF = 'konova.urls' ROOT_URLCONF = 'konova.urls'
@ -200,28 +192,6 @@ STATICFILES_DIRS = [
os.path.join(BASE_DIR, 'templates/map/client/libs'), # NETGIS map client files os.path.join(BASE_DIR, 'templates/map/client/libs'), # NETGIS map client files
] ]
# DJANGO DEBUG TOOLBAR
INTERNAL_IPS = [
"127.0.0.1"
]
DEBUG_TOOLBAR_CONFIG = {
"DISABLE_PANELS": {
'debug_toolbar.panels.versions.VersionsPanel',
'debug_toolbar.panels.timer.TimerPanel',
'debug_toolbar.panels.settings.SettingsPanel',
'debug_toolbar.panels.headers.HeadersPanel',
'debug_toolbar.panels.request.RequestPanel',
'debug_toolbar.panels.sql.SQLPanel',
'debug_toolbar.panels.staticfiles.StaticFilesPanel',
'debug_toolbar.panels.templates.TemplatesPanel',
'debug_toolbar.panels.cache.CachePanel',
'debug_toolbar.panels.signals.SignalsPanel',
'debug_toolbar.panels.logging.LoggingPanel',
'debug_toolbar.panels.redirects.RedirectsPanel',
'debug_toolbar.panels.profiling.ProfilingPanel',
}
}
# EMAIL (see https://docs.djangoproject.com/en/dev/topics/email/) # EMAIL (see https://docs.djangoproject.com/en/dev/topics/email/)
# CHANGE_ME !!! ONLY FOR DEVELOPMENT !!! # CHANGE_ME !!! ONLY FOR DEVELOPMENT !!!

@ -10,5 +10,9 @@ from konova.sub_settings.django_settings import env
# SSO settings # SSO settings
SSO_SERVER_BASE = env("SSO_SERVER_BASE_URL") SSO_SERVER_BASE = env("SSO_SERVER_BASE_URL")
SSO_SERVER = f"{SSO_SERVER_BASE}sso/" SSO_SERVER = f"{SSO_SERVER_BASE}sso/"
SSO_PRIVATE_KEY = env("SSO_PRIVATE_KEY")
SSO_PUBLIC_KEY = env("SSO_PUBLIC_KEY") # OAuth settings
OAUTH_CODE_VERIFIER = env("OAUTH_CODE_VERIFIER")
OAUTH_CLIENT_ID = env("OAUTH_CLIENT_ID")
OAUTH_CLIENT_SECRET = env("OAUTH_CLIENT_SECRET")

@ -20,7 +20,7 @@
</div> </div>
<div class="card-body"> <div class="card-body">
<div class="scroll-150 font-italic"> <div class="scroll-150 font-italic">
{{obj.comment}} {{obj.comment|linebreaks}}
</div> </div>
</div> </div>
</div> </div>

@ -509,7 +509,7 @@ class BaseViewTestCase(BaseTestCase):
def setUp(self) -> None: def setUp(self) -> None:
super().setUp() super().setUp()
self.login_url = reverse("simple-sso-login") self.login_url = reverse("oauth-login")
def assert_url_success(self, client: Client, urls: list): def assert_url_success(self, client: Client, urls: list):
""" Assert for all given urls a direct 200 response """ Assert for all given urls a direct 200 response

@ -13,21 +13,19 @@ Including another URLconf
1. Import the include() function: from django.urls import include, path 1. Import the include() function: from django.urls import include, path
2. Add a URL to urlpatterns: path('blog/', include('blog.urls')) 2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
""" """
import debug_toolbar
from django.contrib import admin from django.contrib import admin
from django.urls import path, include from django.urls import path, include
from konova.settings import SSO_SERVER, SSO_PUBLIC_KEY, SSO_PRIVATE_KEY, DEBUG
from konova.sso.sso import KonovaSSOClient
from konova.views.logout import LogoutView from konova.views.logout import LogoutView
from konova.views.geometry import GeomParcelsView, GeomParcelsContentView from konova.views.geometry import GeomParcelsView, GeomParcelsContentView
from konova.views.home import HomeView from konova.views.home import HomeView
from konova.views.map_proxy import ClientProxyParcelSearch, ClientProxyParcelWFS from konova.views.map_proxy import ClientProxyParcelSearch, ClientProxyParcelWFS
from konova.views.oauth import OAuthLoginView, OAuthCallbackView
sso_client = KonovaSSOClient(SSO_SERVER, SSO_PUBLIC_KEY, SSO_PRIVATE_KEY)
urlpatterns = [ urlpatterns = [
path('admin/', admin.site.urls), path('admin/', admin.site.urls),
path('login/', include(sso_client.get_urls())), path('oauth/callback/', OAuthCallbackView.as_view(), name="oauth-callback"),
path('oauth/login/', OAuthLoginView.as_view(), name="oauth-login"),
path('logout/', LogoutView.as_view(), name="logout"), path('logout/', LogoutView.as_view(), name="logout"),
path('', HomeView.as_view(), name="home"), path('', HomeView.as_view(), name="home"),
path('intervention/', include("intervention.urls")), path('intervention/', include("intervention.urls")),
@ -44,10 +42,5 @@ urlpatterns = [
path('client/proxy/wfs', ClientProxyParcelWFS.as_view(), name="client-proxy-wfs"), path('client/proxy/wfs', ClientProxyParcelWFS.as_view(), name="client-proxy-wfs"),
] ]
if DEBUG:
urlpatterns += [
path('__debug__/', include(debug_toolbar.urls)),
]
handler404 = "konova.views.error.get_404_view" handler404 = "konova.views.error.get_404_view"
handler500 = "konova.views.error.get_500_view" handler500 = "konova.views.error.get_500_view"

@ -0,0 +1,125 @@
"""
Author: Michel Peltriaux
Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany
Contact: ksp-servicestelle@sgdnord.rlp.de
Created on: 26.04.24
"""
import base64
import hashlib
from urllib.parse import urlencode
import requests
from django.contrib.auth import login
from django.http import HttpRequest
from django.shortcuts import redirect
from django.urls import reverse
from django.utils.timezone import now
from django.views import View
from api.models import OAuthToken
from konova.sub_settings.django_settings import BASE_URL
from konova.sub_settings.sso_settings import SSO_SERVER_BASE, OAUTH_CODE_VERIFIER, OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET
class OAuthLoginView(View):
"""
Starts OAuth Login procedure
-> AnonymousUser is redirected to SSO component using specific parameters
-> After successful login (in SSO component), user will be redirected to a specific callback url (OAuthCallbackView)
-> Callback view uses retrieved authorization token to get a proper access token from SSO component
-> SSO component answers with access token
-> OAuthCallbackView uses token in Authorization header to access user data of logged-in user in SSO component
-> OAuthCallbackView creates/updates user
-> OAuthCallbackView logs in user and redirects to default home view
"""
def __create_code_challenge(self):
"""
Creates a code verifier and code challenge for extra security.
See https://django-oauth-toolkit.readthedocs.io/en/latest/getting_started.html#authorization-code for further
information
Returns:
"""
code_verifier = OAUTH_CODE_VERIFIER
code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest()
code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8').replace('=', '')
return code_verifier, code_challenge
def get(self, request: HttpRequest, *args, **kwargs):
"""
Redirects user to OAuth SSO webservice for credential based login there
Args:
request ():
*args ():
**kwargs ():
Returns:
"""
oauth_authentication_code_url = f"{SSO_SERVER_BASE}o/authorize/"
redirect_uri = f'{BASE_URL}{reverse("oauth-callback")}'
code_verifier, code_challenge = self.__create_code_challenge()
urlencode_params = urlencode(
{
"response_type": "code",
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"client_id": OAUTH_CLIENT_ID,
"redirect_uri": redirect_uri,
}
)
url = f"{oauth_authentication_code_url}?{urlencode_params}"
return redirect(url)
class OAuthCallbackView(View):
"""
Callback view for OAuth2.0 authentication token.
Authentication tokens will be exchanged for access token.
Access Token will be used for fetching user data from SSO component.
User data will be used for creating/updating user data inside this app.
User will be logged-in and redirected to default home view.
"""
def get(self, request: HttpRequest, *args, **kwargs):
authentication_code = request.GET.get("code")
oauth_acces_token_url = f"{SSO_SERVER_BASE}o/token/"
callback_url = f'{BASE_URL}{reverse("oauth-callback")}'
params = {
"grant_type": "authorization_code",
"code": authentication_code,
"redirect_uri": callback_url,
"code_verifier": OAUTH_CODE_VERIFIER,
"client_id": OAUTH_CLIENT_ID,
"client_secret": OAUTH_CLIENT_SECRET
}
access_code_response = requests.post(
oauth_acces_token_url,
data=params
)
received_on = now()
access_code_response_body = access_code_response.content.decode("utf-8")
status_code_invalid = access_code_response.status_code != 200
if status_code_invalid:
raise RuntimeError(f"OAuth access token could not be fetched: {access_code_response.text}")
oauth_access_token = OAuthToken.from_access_token_response(access_code_response_body, received_on)
oauth_access_token.save()
user = oauth_access_token.update_and_get_user()
user.oauth_replace_token(oauth_access_token)
login(request, user)
return redirect("home")

@ -1,40 +1,45 @@
amqp==5.2.0 amqp==5.2.0
asgiref==3.7.2 asgiref==3.8.1
async-timeout==4.0.3 async-timeout==4.0.3
beautifulsoup4==4.12.3 beautifulsoup4==4.13.0b2
billiard==4.2.0 billiard==4.2.0
cached-property==1.5.2 cached-property==1.5.2
celery==5.3.6 celery==5.4.0
certifi==2024.2.2 certifi==2024.6.2
cffi==1.17.0rc1
chardet==5.2.0 chardet==5.2.0
charset-normalizer==3.3.2 charset-normalizer==3.3.2
click==8.1.7 click==8.1.7
click-didyoumean==0.3.0 click-didyoumean==0.3.1
click-plugins==1.1.1 click-plugins==1.1.1
click-repl==0.3.0 click-repl==0.3.0
coverage==7.3.3 coverage==7.5.3
cryptography==42.0.8
Deprecated==1.2.14 Deprecated==1.2.14
Django==5.0.3 Django==5.0.6
django-autocomplete-light==3.11.0 django-autocomplete-light==3.11.0
django-bootstrap-modal-forms==3.0.4 django-bootstrap-modal-forms==3.0.4
django-bootstrap4==24.1 django-bootstrap4==24.3
django-debug-toolbar==4.2.0
django-environ==0.11.2 django-environ==0.11.2
django-filter==24.1 django-filter==24.2
django-fontawesome-5==1.0.18 django-fontawesome-5==1.0.18
django-simple-sso==1.2.0 django-oauth-toolkit==2.4.0
django-tables2==2.7.0 django-tables2==2.7.0
et-xmlfile==1.1.0 et-xmlfile==1.1.0
idna==3.6 gunicorn==22.0.0
importlib_metadata==7.0.2 idna==3.7
itsdangerous==0.24 importlib_metadata==7.1.0
kombu==5.3.5 jwcrypto==1.5.6
kombu==5.3.7
oauthlib==3.2.2
openpyxl==3.2.0b1 openpyxl==3.2.0b1
packaging==24.0 packaging==24.1
pika==1.3.2 pika==1.3.2
prompt-toolkit==3.0.43 pillow==10.3.0
psycopg==3.1.18 prompt_toolkit==3.0.47
psycopg-binary==3.1.18 psycopg==3.1.19
psycopg-binary==3.1.19
pycparser==2.22
pyparsing==3.1.2 pyparsing==3.1.2
pypng==0.20220715.0 pypng==0.20220715.0
pyproj==3.6.1 pyproj==3.6.1
@ -42,17 +47,16 @@ python-dateutil==2.9.0.post0
pytz==2024.1 pytz==2024.1
PyYAML==6.0.1 PyYAML==6.0.1
qrcode==7.3.1 qrcode==7.3.1
redis==5.1.0a1 redis==5.1.0b6
requests==2.31.0 requests==2.32.3
six==1.16.0 six==1.16.0
soupsieve==2.5 soupsieve==2.5
sqlparse==0.4.4 sqlparse==0.5.0
typing_extensions==4.10.0 typing_extensions==4.12.2
tzdata==2024.1 tzdata==2024.1
urllib3==2.2.1 urllib3==2.2.1
vine==5.1.0 vine==5.1.0
wcwidth==0.2.12 wcwidth==0.2.13
webservices==0.7
wrapt==1.16.0 wrapt==1.16.0
xmltodict==0.13.0 xmltodict==0.13.0
zipp==3.17.0 zipp==3.19.2

@ -29,6 +29,7 @@ class UserAdmin(admin.ModelAdmin):
"is_staff", "is_staff",
"is_superuser", "is_superuser",
"api_token", "api_token",
"oauth_token",
"groups", "groups",
"notifications", "notifications",
"date_joined", "date_joined",

@ -0,0 +1,20 @@
# Generated by Django 5.0.4 on 2024-04-30 07:20
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('api', '0003_oauthtoken'),
('user', '0008_alter_user_id'),
]
operations = [
migrations.AddField(
model_name='user',
name='oauth_token',
field=models.ForeignKey(blank=True, db_comment='OAuth token for the user', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='+', to='api.oauthtoken'),
),
]

@ -9,7 +9,7 @@ from django.contrib.auth.models import AbstractUser
from django.db import models from django.db import models
from api.models import APIUserToken from api.models import APIUserToken, OAuthToken
from konova.settings import ZB_GROUP, DEFAULT_GROUP, ETS_GROUP from konova.settings import ZB_GROUP, DEFAULT_GROUP, ETS_GROUP
from konova.utils.mailer import Mailer from konova.utils.mailer import Mailer
from user.enums import UserNotificationEnum from user.enums import UserNotificationEnum
@ -24,6 +24,14 @@ class User(AbstractUser):
help_text="The user's API token", help_text="The user's API token",
on_delete=models.SET_NULL on_delete=models.SET_NULL
) )
oauth_token = models.ForeignKey(
"api.OAuthToken",
blank=True,
null=True,
on_delete=models.SET_NULL,
db_comment="OAuth token for the user",
related_name="+"
)
def is_notification_setting_set(self, notification_enum: UserNotificationEnum): def is_notification_setting_set(self, notification_enum: UserNotificationEnum):
return self.notifications.filter( return self.notifications.filter(
@ -215,3 +223,45 @@ class User(AbstractUser):
deleted__isnull=True deleted__isnull=True
) )
return shared_teams return shared_teams
@staticmethod
def oauth_update_user(user_data: dict):
"""
Get or create a user depending on given user_data.
If the user record already exists, it's data will be updated using user_data.
Args:
user_data (dict): User data from OAuth SSO component
Returns:
user (User): The resolved user
"""
username = user_data.get("username")
user, is_created = User.objects.get_or_create(
username=username
)
if is_created:
user.set_unusable_password()
user.first_name = user_data.get("first_name")
user.last_name = user_data.get("last_name")
user.email = user_data.get("email")
return user
def oauth_replace_token(self, token: OAuthToken):
"""
Drops old token (if existing) and stores given token.
Args:
token (OAuthToken): New token
Returns:
user (User)
"""
if self.oauth_token:
self.oauth_token.delete()
self.oauth_token = token
self.save()
return self

@ -9,11 +9,13 @@ from django.urls import path
from user.autocomplete.share import ShareUserAutocomplete, ShareTeamAutocomplete from user.autocomplete.share import ShareUserAutocomplete, ShareTeamAutocomplete
from user.autocomplete.team import TeamAdminAutocomplete from user.autocomplete.team import TeamAdminAutocomplete
from user.views import * from user.views.propagate import PropagateUserView
from user.views.views import *
app_name = "user" app_name = "user"
urlpatterns = [ urlpatterns = [
path("", index_view, name="index"), path("", index_view, name="index"),
path("propagate/", PropagateUserView.as_view(), name="propagate"),
path("notifications/", notifications_view, name="notifications"), path("notifications/", notifications_view, name="notifications"),
path("token/api", api_token_view, name="api-token"), path("token/api", api_token_view, name="api-token"),
path("contact/<id>", contact_view, name="contact"), path("contact/<id>", contact_view, name="contact"),

@ -0,0 +1,7 @@
"""
Author: Michel Peltriaux
Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany
Contact: ksp-servicestelle@sgdnord.rlp.de
Created on: 10.05.24
"""

@ -0,0 +1,69 @@
"""
Author: Michel Peltriaux
Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany
Contact: ksp-servicestelle@sgdnord.rlp.de
Created on: 10.05.24
"""
import base64
import hashlib
import json
from cryptography.fernet import Fernet
from django.core.exceptions import ObjectDoesNotExist
from django.http import HttpRequest, JsonResponse
from django.utils.decorators import method_decorator
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from konova.sub_settings.sso_settings import OAUTH_CLIENT_ID
from user.models import User
class PropagateUserView(View):
""" Receives user data to be stored in db
Used if new user gets access to application and needs to be created in application before first login (e.g.
proper rights management)
"""
@method_decorator(csrf_exempt)
def dispatch(self, request, *args, **kwargs):
return super().dispatch(request, *args, **kwargs)
def post(self, request: HttpRequest, *args, **kwargs):
# Decrypt
encrypted_body = request.body
hash = hashlib.md5()
hash.update(OAUTH_CLIENT_ID.encode("utf-8"))
key = base64.urlsafe_b64encode(hash.hexdigest().encode("utf-8"))
fernet = Fernet(key)
body = fernet.decrypt(encrypted_body).decode("utf-8")
body = json.loads(body)
try:
status = "updated"
user = User.objects.get(username=body.get('username'))
# Update user data, excluding some changes
skipable_attrs = {
"username",
"is_staff",
"is_superuser",
}
for _attr, _val in body.items():
if _attr in skipable_attrs:
continue
setattr(user, _attr, _val)
except ObjectDoesNotExist:
user = User(**body)
status = "created"
user.set_unusable_password()
user.save()
data = {
"success": True,
"status": status
}
return JsonResponse(data)
Loading…
Cancel
Save