Files
konova/konova/views/map_proxy.py
mpeltriaux 61ec9c8c9b # ClientProxyView
* refactors login required from method decorator to mixin inheritance
2025-10-17 14:42:44 +02:00

112 lines
3.4 KiB
Python

"""
Author: Michel Peltriaux
Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany
Contact: ksp-servicestelle@sgdnord.rlp.de
Created on: 19.08.22
"""
import json
from json import JSONDecodeError
import requests
import urllib3.util
from django.contrib.auth.mixins import LoginRequiredMixin
from django.http import JsonResponse, HttpRequest, HttpResponse
from django.utils.http import urlencode
from django.views import View
from requests.auth import HTTPDigestAuth
from konova.sub_settings.lanis_settings import MAP_PROXY_HOST_WHITELIST
from konova.sub_settings.proxy_settings import PROXIES, GEOPORTAL_RLP_USER, GEOPORTAL_RLP_PASSWORD
class BaseClientProxyView(LoginRequiredMixin, View):
    """ Provides proxy functionality for NETGIS map client.

    Offers a host whitelist check and a generic outgoing GET call for the
    views deriving from this class.

    """
    class Meta:
        abstract = True

    def _check_with_whitelist(self, url):
        """ Checks whether the host of the given url is whitelisted

        Args:
            url (str): The url which shall be proxied

        Returns:
            is_allowed (bool): True if the url's host is part of MAP_PROXY_HOST_WHITELIST, False otherwise
        """
        try:
            parsed_url = urllib3.util.parse_url(url)
        except urllib3.exceptions.LocationParseError:
            # A url which can not even be parsed must never be proxied (fail closed)
            return False
        # parsed_url.host is None for urls without a host, which is never whitelisted
        return parsed_url.host in set(MAP_PROXY_HOST_WHITELIST)

    def perform_url_call(self, url, headers=None, auth=None):
        """ Generic proxied call

        Performs a GET request on the url using the configured PROXIES.

        Args:
            url (str): The url to call
            headers (dict): Optional headers
            auth: Optional authentication info, handed over to requests.get

        Returns:
            content (str): The utf-8 decoded response body or a fallback message if decoding fails
            status_code (int): The status code of the response
        """
        # Do not use a mutable default argument, create a fresh dict per call instead
        if headers is None:
            headers = {}

        # NOTE(review): no timeout is set, so a non-responding host blocks this
        # call indefinitely - consider passing timeout=... to requests.get
        response = requests.get(
            url,
            proxies=PROXIES,
            headers=headers,
            auth=auth
        )
        content = response.content
        if isinstance(content, bytes):
            try:
                content = content.decode("utf-8")
            except UnicodeDecodeError:
                content = f"Can not decode content of {url}"
        return content, response.status_code
class ClientProxyParcelSearch(BaseClientProxyView):
    """ Proxies parcel search requests of the map client

    The whole query string of the incoming request is used as the url to call.

    """
    def get(self, request: HttpRequest):
        """ Calls the url given as query string, if its host is whitelisted

        Args:
            request (HttpRequest): The incoming request

        Returns:
            response (HttpResponse): The content and status code of the proxied call

        Raises:
            PermissionError: If the host of the url is not whitelisted
        """
        url = request.META.get("QUERY_STRING")
        if not self._check_with_whitelist(url):
            raise PermissionError(f"Proxied url '{url}' is not allowed!")

        content, response_code = self.perform_url_call(url)
        # Hand the status code of the proxied call over to the client, so a
        # failed call is not disguised as a successful (200) response
        return HttpResponse(content, status=response_code)
class ClientProxyParcelWFS(BaseClientProxyView):
    """ Proxies parcel WFS requests of the map client to the geoportal RLP

    """
    def get(self, request: HttpRequest):
        """ Forwards the GET parameters to the WFS and returns the result as json

        Version and outputformat are always overwritten. The fetched body is
        extended by a 'crs' entry (EPSG:25832) before it is returned.

        Args:
            request (HttpRequest): The incoming request

        Returns:
            response (JsonResponse): The extended body on success. If the call did not
                return status code 200 or no json object, a dict holding 'status_code' and
                'content', where content.crs.properties.msg describes the error
        """
        params = request.GET.dict()
        params["version"] = "2.0.0"
        params["outputformat"] = "application/json; subtype=geojson"
        # Rename the parameter key itself instead of replacing text in the encoded url:
        # a text replace would turn an incoming 'typenames' into 'typenamess' and would
        # alter parameter values containing 'typename'.
        # NOTE(review): presumably needed since WFS 2.0.0 expects 'typenames' - confirm
        params = {
            ("typenames" if key == "typename" else key): value
            for key, value in params.items()
        }

        base_url = "https://www.geoportal.rlp.de/registry/wfs/519"
        url = f"{base_url}?{urlencode(params, doseq=True)}"
        auth = HTTPDigestAuth(GEOPORTAL_RLP_USER, GEOPORTAL_RLP_PASSWORD)
        content, response_code = self.perform_url_call(url, auth=auth)

        error_detected = response_code != 200
        error_code = f"response code:{response_code}"
        try:
            body = json.loads(content)
        except JSONDecodeError:
            body = None
        if not isinstance(body, dict):
            # Undecodable content as well as valid json which is not an object
            # (list, string, number, ...) can not take the 'crs' entry below
            body = {}
            error_code = "json invalid"
            error_detected = True

        body["crs"] = {
            "type": "name",
            "properties": {
                "name": "urn:ogc:def:crs:EPSG::25832",
            }
        }
        if error_detected:
            body["crs"]["properties"]["msg"] = f"Error detected ({error_code})"
            return JsonResponse({
                "status_code": response_code,
                "content": body,
            })
        return JsonResponse(body)