49f7f3db53
* adds codelist app * adds KonovaCodeList and KonovaCode model for fetching and storing OSIRIS Codelisten entries * adds update_codelist command for updating and fetching codes * adds autocomplete route for using codelists in forms
157 lines
5.4 KiB
Python
157 lines
5.4 KiB
Python
"""
|
|
Author: Michel Peltriaux
|
|
Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany
|
|
Contact: michel.peltriaux@sgdnord.rlp.de
|
|
Created on: 23.08.21
|
|
|
|
"""
|
|
import requests
|
|
from django.core.management import BaseCommand
|
|
from xml.etree import ElementTree as etree
|
|
|
|
from codelist.models import KonovaCode, KonovaCodeList
|
|
from codelist.settings import CODELIST_INTERVENTION_HANDLER_ID, CODELIST_CONSERVATION_OFFICE_ID, \
|
|
CODELIST_REGISTRATION_OFFICE_ID, CODELIST_BIOTOPES_ID, CODELIST_LAW_ID, CODELIST_COMPENSATION_HANDLER_ID, \
|
|
CODELIST_COMPENSATION_ACTION_ID, CODELIST_COMPENSATION_ACTION_CLASS_ID, CODELIST_COMPENSATION_ADDITIONAL_TYPE_ID, \
|
|
CODELIST_COMPENSATION_COMBINATION_ID, CODELIST_BASE_URL
|
|
|
|
|
|
class Command(BaseCommand):
    """Management command that fetches remote codelists and stores their codes.

    For every configured codelist id the XML document found at
    ``CODELIST_BASE_URL/<id>`` is downloaded. Its (possibly nested) ``item``
    elements are written to ``KonovaCode`` entries, which are attached to the
    ``KonovaCodeList`` carrying the same id.
    """

    help = "Fetches the configured codelists and creates or updates their codes"

    # Seconds to wait for the codelist service before giving up on one list.
    # Without a timeout ``requests`` may block forever on an unresponsive host.
    REQUEST_TIMEOUT = 30

    # Lower-cased texts which are read as boolean "true" in the XML.
    # NOTE(review): assumes the service writes flags like "true"/"false" -
    # confirm against a real response.
    _TRUE_VALUES = frozenset({"true", "1", "yes"})

    def handle(self, *args, **options):
        """Fetch all configured codelists and persist their codes.

        A list that can not be downloaded or parsed is reported on stdout and
        skipped, so the remaining lists are still processed. A keyboard
        interrupt terminates the command with exit status -1.

        Args:
            *args: Unused positional arguments handed over by Django
            **options: Unused command line options handed over by Django

        Returns:

        """
        codelist_ids = [
            CODELIST_INTERVENTION_HANDLER_ID,
            CODELIST_CONSERVATION_OFFICE_ID,
            CODELIST_REGISTRATION_OFFICE_ID,
            CODELIST_BIOTOPES_ID,
            CODELIST_LAW_ID,
            CODELIST_COMPENSATION_HANDLER_ID,
            CODELIST_COMPENSATION_ACTION_ID,
            CODELIST_COMPENSATION_ACTION_CLASS_ID,
            CODELIST_COMPENSATION_ADDITIONAL_TYPE_ID,
            CODELIST_COMPENSATION_COMBINATION_ID,
        ]
        try:
            self._write_warning("Fetching codes...")
            for list_id in codelist_ids:
                self._update_codelist(list_id)
        except KeyboardInterrupt:
            self._break_line()
            # Same effect as exit(-1), but does not rely on the `site` builtin
            raise SystemExit(-1)

    def _update_codelist(self, list_id):
        """Download a single codelist and store every code it contains.

        Failures (network error, non-200 response, undecodable or malformed
        XML) are written as error messages and end the processing of this list
        only.

        Args:
            list_id: The id of the codelist, appended to CODELIST_BASE_URL

        Returns:

        """
        _url = CODELIST_BASE_URL + "/" + str(list_id)
        try:
            result = requests.get(url=_url, timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException as e:
            self._write_error("Error on codelist url '{}': {}".format(_url, e))
            return

        if result.status_code != 200:
            self._write_error("Error on codelist url '{}'".format(_url))
            return

        self._write_warning("Fetch codes for list id {}".format(list_id))
        try:
            content = result.content.decode("utf-8")
            root = etree.fromstring(content)
        except (UnicodeDecodeError, etree.ParseError) as e:
            self._write_error("Invalid content on codelist url '{}': {}".format(_url, e))
            return

        # Only the direct <item> children of the root; nested ones are reached by the recursion
        items = root.findall("item")
        self._write_warning("Found {} codes. Process now...".format(len(items)))

        code_list = KonovaCodeList.objects.get_or_create(
            id=list_id,
        )[0]
        self._rec_xml_code_fetch(
            items=items,
            code_list=code_list,
            parent=None,
        )

    def _parse_xml_bool(self, text):
        """ Converts the text of an XML flag element into a boolean

        ``bool(text)`` must not be used for this: it is True for every
        non-empty string, including "false".

        Args:
            text: The element text, may be None for an empty element

        Returns:
            True if the stripped, lower-cased text is a known true value, False otherwise
        """
        if text is None:
            return False
        return text.strip().lower() in self._TRUE_VALUES

    def _rec_xml_code_fetch(self, items: list, code_list: KonovaCodeList, parent: KonovaCode):
        """ Recursively creates or updates the codes of the given XML elements

        Each element provides "atomid", "shortname", "longname" and "archive"
        children and optionally an "items" child holding nested elements, which
        are processed with the current code as their parent.

        Args:
            items: Iterable of XML elements (a list or an "items" element), None stops the recursion
            code_list: The codelist every processed code is added to
            parent: The parent code of the given elements, None on the top level

        Returns:

        """
        if items is None:
            return

        for element in items:
            children = element.find("items")
            atom_id = element.find("atomid").text
            short_name = element.find("shortname").text
            long_name = element.find("longname").text
            is_archived = self._parse_xml_bool(element.find("archive").text)

            code = KonovaCode.objects.get_or_create(
                id=atom_id,
            )[0]
            code.short_name = short_name
            code.long_name = long_name
            code.parent = parent
            # An archived code is no longer active, so the flag has to be negated
            code.is_active = not is_archived
            code.is_leaf = children is None
            code.save()

            # Ask the database for this single relation instead of loading all codes of the list
            if not code_list.codes.filter(pk=code.pk).exists():
                code_list.codes.add(code)

            self._rec_xml_code_fetch(
                items=children,
                code_list=code_list,
                parent=code
            )

    def _break_line(self):
        """ Simply prints a line break

        Returns:

        """
        self.stdout.write("\n")

    def _write_warning(self, txt: str):
        """ Writes the text to stdout using the WARNING style

        Args:
            txt: The message

        Returns:

        """
        self.stdout.write(self.style.WARNING(txt))

    def _write_success(self, txt: str):
        """ Writes the text to stdout using the SUCCESS style

        Args:
            txt: The message

        Returns:

        """
        self.stdout.write(self.style.SUCCESS(txt))

    def _write_error(self, txt: str):
        """ Writes the text to stdout using the ERROR style

        Args:
            txt: The message

        Returns:

        """
        self.stdout.write(self.style.ERROR(txt))