konova/konova/views/map_proxy.py
mpeltriaux 4052105e1d # Server proxy for client parcel wfs
* refactors map_proxy.py
* adds proxy support for parcel wfs
2023-02-13 10:55:58 +01:00

96 lines
2.8 KiB
Python

"""
Author: Michel Peltriaux
Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany
Contact: ksp-servicestelle@sgdnord.rlp.de
Created on: 19.08.22
"""
import json
import requests
from django.contrib.auth.decorators import login_required
from django.http import JsonResponse, HttpRequest
from django.utils.decorators import method_decorator
from django.utils.http import urlencode
from django.views import View
from requests.auth import HTTPDigestAuth
from konova.sub_settings.proxy_settings import PROXIES, CLIENT_PROXY_AUTH_USER, CLIENT_PROXY_AUTH_PASSWORD
class BaseClientProxyView(View):
    """ Provides proxy functionality for NETGIS map client.

    Every request is routed through ``dispatch``, which is wrapped in
    ``login_required``. Subclasses implement the HTTP method handlers and
    use ``perform_url_call`` to reach the upstream service.

    """

    # NOTE(review): presumably only a marker that this view is a base class;
    # confirm whether anything evaluates this inner Meta for plain views.
    class Meta:
        abstract = True

    @method_decorator(login_required)
    def dispatch(self, request, *args, **kwargs):
        return super().dispatch(request, *args, **kwargs)

    def perform_url_call(self, url, headers=None, auth=None, timeout=None):
        """ Generic proxied call

        Performs a GET request using the configured PROXIES.

        Args:
            url (str): The url to call
            headers (dict): Optional headers, None means no additional headers
            auth: Optional authentication info, handed over to requests.get
            timeout: Optional timeout in seconds, handed over to requests.get.
                None (default) keeps the previous behaviour of not limiting
                the duration of the call.

        Returns:
            content (str): The response body, decoded as utf-8
            status_code (int): The HTTP status code of the response
        """
        # Build a fresh dict per call instead of sharing one mutable default
        # object between all calls of this method.
        if headers is None:
            headers = {}

        response = requests.get(
            url,
            proxies=PROXIES,
            headers=headers,
            auth=auth,
            timeout=timeout,
        )
        content = response.content
        if isinstance(content, bytes):
            content = content.decode("utf-8")
        return content, response.status_code
class ClientProxyParcelSearch(BaseClientProxyView):
    """ Proxy endpoint which calls the url given as query string and returns the result as JSON.

    """

    def get(self, request: HttpRequest):
        """ Calls the url found in the raw query string of the request

        Args:
            request (HttpRequest): The incoming request

        Returns:
            JsonResponse: The parsed upstream body if the upstream call returned 200,
                otherwise a dict holding the upstream "status_code" and its "content"
        """
        # NOTE(review): the target url is taken verbatim from the query string,
        # so every logged-in user can make the server request an arbitrary url.
        # Consider validating it against a list of allowed hosts.
        url = request.META.get("QUERY_STRING")
        content, response_code = self.perform_url_call(url)

        if response_code != 200:
            # Error bodies are not guaranteed to be JSON (e.g. HTML or XML error
            # pages). Fall back to the raw text instead of failing while parsing,
            # so the client still receives the upstream status code.
            try:
                error_body = json.loads(content)
            except json.JSONDecodeError:
                error_body = content
            return JsonResponse({
                "status_code": response_code,
                "content": error_body,
            })

        return JsonResponse(json.loads(content))
class ClientProxyParcelWFS(BaseClientProxyView):
    """ Proxy endpoint which forwards client requests to the parcel WFS of geoportal.rlp.de.

    """

    def get(self, request: HttpRequest):
        """ Forwards the GET parameters of the request to the WFS

        The parameters "version" and "outputformat" are always overwritten and
        the call is authenticated using HTTP digest auth with the configured
        proxy credentials.

        Args:
            request (HttpRequest): The incoming request

        Returns:
            JsonResponse: The upstream body extended by a fixed "crs" member
                (EPSG:25832) if the upstream call returned 200, otherwise a dict
                holding the upstream "status_code" and its "content"
        """
        params = request.GET.dict()

        # The request is sent as WFS 2.0.0, which is called with "typenames".
        # Rename the parameter itself instead of replacing the substring in the
        # encoded url: that would also alter parameter values and turn an
        # already correct "typenames" into "typenamess".
        # An explicitly given "typenames" takes precedence.
        if "typename" in params:
            params.setdefault("typenames", params.pop("typename"))

        params["version"] = "2.0.0"
        params["outputformat"] = "application/json; subtype=geojson"

        base_url = "https://www.geoportal.rlp.de/registry/wfs/519"
        url = f"{base_url}?{urlencode(params, doseq=True)}"

        auth = HTTPDigestAuth(CLIENT_PROXY_AUTH_USER, CLIENT_PROXY_AUTH_PASSWORD)
        content, response_code = self.perform_url_call(url, auth=auth)

        try:
            body = json.loads(content)
        except json.JSONDecodeError:
            if response_code == 200:
                # A successful call must deliver JSON, keep failing loudly
                raise
            # Error bodies are not guaranteed to be JSON (e.g. an XML exception
            # report). Pass the raw text on, so the client still receives the
            # upstream status code.
            body = content

        # Only an object can carry the additional member. It is attached to
        # JSON error bodies as well, which matches the previous behaviour.
        if isinstance(body, dict):
            body["crs"] = {
                "type": "name",
                "properties": {
                    "name": "urn:ogc:def:crs:EPSG::25832"
                }
            }

        if response_code != 200:
            return JsonResponse({
                "status_code": response_code,
                "content": body,
            })

        return JsonResponse(body)