#132 Migrater restructure

* restructures code into base migrater logic and inheriting migrater logic
    * preparation for further migraters
This commit is contained in:
mpeltriaux 2022-03-14 15:03:05 +01:00
parent e95b337edc
commit 7338bfd073
3 changed files with 287 additions and 264 deletions

View File

@ -0,0 +1,32 @@
from abc import abstractmethod
import psycopg2
class BaseMigrater:
options = None
db_connection = None
def __init__(self, options: dict):
self.options = options
self.connect_db()
def __del__(self):
if self.db_connection is not None:
self.db_connection.close()
def connect_db(self):
if self.options is None:
return
conn = psycopg2.connect(
host=self.options["host"],
database="ksp",
user=self.options["db_user"],
password=self.options["db_pw"],
)
print("Connected to ksp db...")
self.db_connection = conn
@abstractmethod
def migrate(self):
raise NotImplementedError("Must be implemented in subclass")

View File

@ -0,0 +1,252 @@
from django.contrib.gis.geos import GEOSException, MultiPolygon, Polygon, MultiPoint, MultiLineString
from django.core.exceptions import ObjectDoesNotExist
from django.core.files.uploadedfile import UploadedFile
from django.db import transaction
from codelist.models import KonovaCode
from codelist.settings import CODELIST_LAW_ID, CODELIST_PROCESS_TYPE_ID, CODELIST_HANDLER_ID, \
CODELIST_CONSERVATION_OFFICE_ID, CODELIST_REGISTRATION_OFFICE_ID
from compensation.models import Payment
from intervention.models import Intervention, InterventionDocument, Legal, Handler, Responsibility
from konova.management.commands.kspMigrater.base_migrater import BaseMigrater
from konova.models import Geometry
class InterventionMigrater(BaseMigrater):
def _migrate_intervention_geometry(self, intervention:Intervention, eiv: tuple):
try:
# eiv[2] == polygon, eiv[3] == line, eiv[4] == point
if eiv[2] is not None:
eiv_geom = MultiPolygon.from_ewkt(eiv[2])
elif eiv[3] is not None:
eiv_geom = MultiLineString.from_ewkt(eiv[3])
eiv_geom = eiv_geom.buffer(0.00001, 1)
if isinstance(eiv_geom, Polygon):
eiv_geom = MultiPolygon(eiv_geom)
elif eiv[4] is not None:
eiv_geom = MultiPoint.from_ewkt(eiv[4])
eiv_geom = eiv_geom.buffer(0.00001, 1)
if isinstance(eiv_geom, Polygon):
eiv_geom = MultiPolygon(eiv_geom)
else:
eiv_geom = None
except GEOSException:
print(f"Malicious geometry on {eiv}")
return
geom = intervention.geometry or Geometry()
geom.geom = eiv_geom
geom.save()
# celery_update_parcels.delay(geom.id)
intervention.geometry = geom
return intervention
def _migrate_intervention_responsibility(self, intervention, eiv):
intervention.responsible = intervention.responsible or Responsibility.objects.create()
intervention.responsible.handler = intervention.responsible.handler or Handler.objects.create()
eiv_reg_off = eiv[7]
eiv_cons_off = eiv[9]
eiv_handler_type = eiv[11]
eiv_handler_detail = eiv[12]
if eiv_reg_off is not None and eiv_reg_off != 0:
try:
reg_office_code = KonovaCode.objects.get(
atom_id=eiv_reg_off,
is_leaf=True,
code_lists__in=[CODELIST_REGISTRATION_OFFICE_ID],
)
intervention.responsible.registration_office = reg_office_code
except ObjectDoesNotExist:
intervention.comment = f"{intervention.comment or ''}\nNicht migrierbare Zulassungsbehörde: {eiv_reg_off}"
intervention.responsible.registration_file_number = eiv[8]
if eiv_cons_off is not None and eiv_cons_off != 0:
try:
cons_office_code = KonovaCode.objects.get(
atom_id=eiv_cons_off,
is_leaf=True,
code_lists__in=[CODELIST_CONSERVATION_OFFICE_ID],
)
except ObjectDoesNotExist:
intervention.comment = f"{intervention.comment or ''}\nNicht migrierbare Eintragungsstelle: {eiv_cons_off}"
intervention.responsible.conservation_office = cons_office_code
intervention.responsible.conservation_file_number = eiv[10]
if eiv_handler_type is not None and eiv_handler_type != 0:
try:
handler_type_code = KonovaCode.objects.get(
atom_id=eiv_handler_type,
is_leaf=True,
code_lists__in=[CODELIST_HANDLER_ID]
)
intervention.responsible.handler.type = handler_type_code
except ObjectDoesNotExist:
intervention.comment = f"{intervention.comment or ''}\nNicht migrierbarer Eingriffsverursacher_TYP: {eiv_handler_type}"
intervention.responsible.handler.detail = eiv_handler_detail
intervention.responsible.handler.save()
intervention.responsible.save()
return intervention
def _migrate_intervention_legal(self, intervention, eiv):
intervention.legal = intervention.legal or Legal.objects.create()
eiv_process_type = eiv[5]
eiv_law = eiv[6]
eiv_date_registration = eiv[14]
eiv_date_binding = eiv[15]
if eiv_process_type is not None and eiv_process_type != 0:
process_type_code = KonovaCode.objects.get(
atom_id=eiv_process_type,
is_leaf=True,
code_lists__in=[CODELIST_PROCESS_TYPE_ID],
)
intervention.legal.process_type = process_type_code
if eiv_law is not None and eiv_law != 0:
law_code = KonovaCode.objects.get(
atom_id=eiv_law,
is_leaf=True,
code_lists__in=[CODELIST_LAW_ID],
)
intervention.legal.laws.add(law_code)
if eiv_date_registration is not None:
intervention.legal.registration_date = eiv_date_registration
if eiv_date_binding is not None:
intervention.legal.binding_date = eiv_date_binding
intervention.legal.save()
return intervention
def _migrate_intervention_payment(self, intervention, eiv):
payment_date = eiv[16]
payment_amount = eiv[17]
if payment_amount is not None and payment_amount != 0:
payment_exists = intervention.payments.filter(
amount=payment_amount
).exists()
if payment_exists:
return intervention
payment = Payment(
amount=payment_amount,
)
if payment_date is None:
payment.comment = "Datenmigration: Kein Zahlungsdatum hinterlegt! Schnellstmöglich nachtragen oder diesen Kommentar mit einer Begründung ersetzen, falls kein Datum existiert."
else:
payment.due_on = payment_date
payment.save()
intervention.payments.add(payment)
return intervention
def _migrate_intervention_documents(self, intervention, eiv):
eiv_identifier = f"'{eiv[0]}'"
tmp_cursor = self.db_connection.cursor()
tmp_cursor.execute(
'select '
'doc.pfad, '
'doc."Bemerkung", '
'doc."Datum" '
'from "OBJ_MASTER" om '
'left join "LINFOS" linf on om."GISPADID"=linf."GISPADID" '
'left join "Objektphotos" doc on linf."PKEY"=doc."FKEY" '
'where '
f'om."KENNUNG"={eiv_identifier} and '
'doc.pfad is not null '
)
doc_results = tmp_cursor.fetchall()
if len(doc_results) > 0:
for doc_result in doc_results:
doc_path = doc_result[0]
doc_comment = doc_result[1]
doc_date = doc_result[2]
with open(doc_path, encoding="latin1") as file:
file = UploadedFile(file)
doc_title = "Migrierte Datei"
doc_date = doc_date or "1970-01-01"
doc_exists = InterventionDocument.objects.filter(
instance=intervention,
title=doc_title,
comment=doc_comment,
date_of_creation=doc_date
).exists()
if doc_exists:
continue
doc = InterventionDocument.objects.create(
title=doc_title,
comment=doc_comment,
file=file,
date_of_creation=doc_date,
instance=intervention,
)
tmp_cursor.close()
return intervention
def migrate(self):
cursor = self.db_connection.cursor()
cursor.execute(
'select '
'om."KENNUNG", '
'linf."OBJBEZ", '
'ST_AsEWKT(ST_Multi(ST_CollectionExtract(ST_MakeValid(ST_Transform(geomf.the_geom,4326)), 3))) as geomf, '
'ST_AsEWKT(ST_Multi(ST_CollectionExtract(ST_MakeValid(ST_Transform(geoml.the_geom,4326)), 2))) as geoml, '
'ST_AsEWKT(ST_Multi(ST_CollectionExtract(ST_MakeValid(ST_Transform(geomp.the_geom,4326)), 1))) as geomp, '
'eiv.verfahrenstyp, '
'vr.verfahrensrecht, '
'adr.behoerde as zb, '
'linf."AZ" as zb_az, '
'adr.adr_pruef as ets, '
'auf."AZ" as ets_az, '
'adr.adressrolle as eingriffsverursacher_typ, '
'adr."Bemerkung" as eingriffsverursacher_bemerkung, '
'linf."Bemerkung" as eiv_comment, '
'zt.erlass as Zulassungsdatum, '
'zt.rechtskraft as Bestandskraftdatum, '
'zt.baubeginn as ersatzzahlungstermin, '
'eiv.ersatzzahlung '
'from "OBJ_MASTER" om '
'left join "LINFOS" linf on om."GISPADID"=linf."GISPADID" '
'left join eiv on om."GISPADID"=eiv.gispadid '
'left join verfahrensrecht vr on om."GISPADID"=vr.gispadid '
'left join geometry_f geomf on om."GISPADID"=geomf.gispadid '
'left join geometry_l geoml on om."GISPADID"=geoml.gispadid '
'left join geometry_p geomp on om."GISPADID"=geomp.gispadid '
'left join adressrolle adr on om."GISPADID"=adr."GISPADID" '
'left join "Aufwertung" auf on om."GISPADID"=auf."GISPADID" '
'left join zulassungstermin zt on eiv.zulassung=zt.id '
'where '
'om."OKL"=7730085 and '
'om.archiv=false and '
'om.nicht_vollstaendig=0;'
)
all_eivs = cursor.fetchall()
len_all_eivs = len(all_eivs)
num_processed = 0
print(f"Migrate EIVs to interventions...")
print(f"--Found {len_all_eivs} entries. Process now...")
for eiv in all_eivs:
if num_processed % 500 == 0:
print(f"----{num_processed}/{len_all_eivs} processed")
with transaction.atomic():
eiv_comment = eiv[13] or ""
intervention = Intervention.objects.get_or_create(
identifier=eiv[0]
)[0]
intervention.title = eiv[1]
intervention.comment = eiv_comment
intervention = self._migrate_intervention_geometry(intervention, eiv)
intervention = self._migrate_intervention_legal(intervention, eiv)
intervention = self._migrate_intervention_responsibility(intervention, eiv)
intervention = self._migrate_intervention_payment(intervention, eiv)
intervention = self._migrate_intervention_documents(intervention, eiv)
intervention.save()
num_processed += 1
cursor.close()

View File

@ -1,17 +1,5 @@
import psycopg2 from konova.management.commands.kspMigrater.intervention_migrater import InterventionMigrater
from django.contrib.gis.geos import MultiPolygon, GEOSException, MultiPoint, MultiLineString, Polygon
from django.core.exceptions import ObjectDoesNotExist
from django.core.files.uploadedfile import UploadedFile
from django.db import transaction
from codelist.models import KonovaCode
from codelist.settings import CODELIST_CONSERVATION_OFFICE_ID, CODELIST_REGISTRATION_OFFICE_ID, \
CODELIST_PROCESS_TYPE_ID, CODELIST_LAW_ID, CODELIST_HANDLER_ID
from compensation.models import Payment
from intervention.models import Intervention, Legal, Responsibility, Handler, InterventionDocument
from konova.management.commands.setup import BaseKonovaCommand from konova.management.commands.setup import BaseKonovaCommand
from konova.models import Geometry
from konova.tasks import celery_update_parcels
class Command(BaseKonovaCommand): class Command(BaseKonovaCommand):
@ -25,258 +13,9 @@ class Command(BaseKonovaCommand):
def handle(self, *args, **options): def handle(self, *args, **options):
try: try:
self.connect_db(options) intervention_migrater = InterventionMigrater(options)
self.migrate_interventions() intervention_migrater.migrate()
if self.db_connection is not None:
self.db_connection.close()
except KeyboardInterrupt: except KeyboardInterrupt:
self._break_line() self._break_line()
exit(-1) exit(-1)
def connect_db(self, options):
conn = psycopg2.connect(
host=options["host"],
database="ksp",
user=options["db_user"],
password=options["db_pw"],
)
self._write_success("Connected to ksp db...")
self.db_connection = conn
def _migrate_intervention_geometry(self, intervention:Intervention, eiv: tuple):
try:
# eiv[2] == polygon, eiv[3] == line, eiv[4] == point
if eiv[2] is not None:
eiv_geom = MultiPolygon.from_ewkt(eiv[2])
elif eiv[3] is not None:
eiv_geom = MultiLineString.from_ewkt(eiv[3])
eiv_geom = eiv_geom.buffer(0.00001, 1)
if isinstance(eiv_geom, Polygon):
eiv_geom = MultiPolygon(eiv_geom)
elif eiv[4] is not None:
eiv_geom = MultiPoint.from_ewkt(eiv[4])
eiv_geom = eiv_geom.buffer(0.00001, 1)
if isinstance(eiv_geom, Polygon):
eiv_geom = MultiPolygon(eiv_geom)
else:
eiv_geom = None
except GEOSException:
self._write_error(f"Malicious geometry on {eiv}")
return
geom = intervention.geometry or Geometry()
geom.geom = eiv_geom
geom.save()
# celery_update_parcels.delay(geom.id)
intervention.geometry = geom
return intervention
def _migrate_intervention_responsibility(self, intervention, eiv):
intervention.responsible = intervention.responsible or Responsibility.objects.create()
intervention.responsible.handler = intervention.responsible.handler or Handler.objects.create()
eiv_reg_off = eiv[7]
eiv_cons_off = eiv[9]
eiv_handler_type = eiv[11]
eiv_handler_detail = eiv[12]
if eiv_reg_off is not None and eiv_reg_off != 0:
try:
reg_office_code = KonovaCode.objects.get(
atom_id=eiv_reg_off,
is_leaf=True,
code_lists__in=[CODELIST_REGISTRATION_OFFICE_ID],
)
intervention.responsible.registration_office = reg_office_code
except ObjectDoesNotExist:
intervention.comment = f"{intervention.comment or ''}\nNicht migrierbare Zulassungsbehörde: {eiv_reg_off}"
intervention.responsible.registration_file_number = eiv[8]
if eiv_cons_off is not None and eiv_cons_off != 0:
try:
cons_office_code = KonovaCode.objects.get(
atom_id=eiv_cons_off,
is_leaf=True,
code_lists__in=[CODELIST_CONSERVATION_OFFICE_ID],
)
except ObjectDoesNotExist:
intervention.comment = f"{intervention.comment or ''}\nNicht migrierbare Eintragungsstelle: {eiv_cons_off}"
intervention.responsible.conservation_office = cons_office_code
intervention.responsible.conservation_file_number = eiv[10]
if eiv_handler_type is not None and eiv_handler_type != 0:
try:
handler_type_code = KonovaCode.objects.get(
atom_id=eiv_handler_type,
is_leaf=True,
code_lists__in=[CODELIST_HANDLER_ID]
)
intervention.responsible.handler.type = handler_type_code
except ObjectDoesNotExist:
intervention.comment = f"{intervention.comment or ''}\nNicht migrierbarer Eingriffsverursacher_TYP: {eiv_handler_type}"
intervention.responsible.handler.detail = eiv_handler_detail
intervention.responsible.handler.save()
intervention.responsible.save()
return intervention
def _migrate_intervention_legal(self, intervention, eiv):
intervention.legal = intervention.legal or Legal.objects.create()
eiv_process_type = eiv[5]
eiv_law = eiv[6]
eiv_date_registration = eiv[14]
eiv_date_binding = eiv[15]
if eiv_process_type is not None and eiv_process_type != 0:
process_type_code = KonovaCode.objects.get(
atom_id=eiv_process_type,
is_leaf=True,
code_lists__in=[CODELIST_PROCESS_TYPE_ID],
)
intervention.legal.process_type = process_type_code
if eiv_law is not None and eiv_law != 0:
law_code = KonovaCode.objects.get(
atom_id=eiv_law,
is_leaf=True,
code_lists__in=[CODELIST_LAW_ID],
)
intervention.legal.laws.add(law_code)
if eiv_date_registration is not None:
intervention.legal.registration_date = eiv_date_registration
if eiv_date_binding is not None:
intervention.legal.binding_date = eiv_date_binding
intervention.legal.save()
return intervention
def _migrate_intervention_payment(self, intervention, eiv):
payment_date = eiv[16]
payment_amount = eiv[17]
if payment_amount is not None and payment_amount != 0:
payment_exists = intervention.payments.filter(
amount=payment_amount
).exists()
if payment_exists:
return intervention
payment = Payment(
amount=payment_amount,
)
if payment_date is None:
payment.comment = "Datenmigration: Kein Zahlungsdatum hinterlegt! Schnellstmöglich nachtragen oder diesen Kommentar mit einer Begründung ersetzen, falls kein Datum existiert."
else:
payment.due_on = payment_date
payment.save()
intervention.payments.add(payment)
return intervention
def _migrate_intervention_documents(self, intervention, eiv):
eiv_identifier = f"'{eiv[0]}'"
tmp_cursor = self.db_connection.cursor()
tmp_cursor.execute(
'select '
'doc.pfad, '
'doc."Bemerkung", '
'doc."Datum" '
'from "OBJ_MASTER" om '
'left join "LINFOS" linf on om."GISPADID"=linf."GISPADID" '
'left join "Objektphotos" doc on linf."PKEY"=doc."FKEY" '
'where '
f'om."KENNUNG"={eiv_identifier} and '
'doc.pfad is not null '
)
doc_results = tmp_cursor.fetchall()
if len(doc_results) > 0:
for doc_result in doc_results:
doc_path = doc_result[0]
doc_comment = doc_result[1]
doc_date = doc_result[2]
with open(doc_path, encoding="latin1") as file:
file = UploadedFile(file)
doc_title = "Migrierte Datei"
doc_date = doc_date or "1970-01-01"
doc_exists = InterventionDocument.objects.filter(
instance=intervention,
title=doc_title,
comment=doc_comment,
date_of_creation=doc_date
).exists()
if doc_exists:
continue
doc = InterventionDocument.objects.create(
title=doc_title,
comment=doc_comment,
file=file,
date_of_creation=doc_date,
instance=intervention,
)
tmp_cursor.close()
return intervention
def migrate_interventions(self):
cursor = self.db_connection.cursor()
cursor.execute(
'select '
'om."KENNUNG", '
'linf."OBJBEZ", '
'ST_AsEWKT(ST_Multi(ST_CollectionExtract(ST_MakeValid(ST_Transform(geomf.the_geom,4326)), 3))) as geomf, '
'ST_AsEWKT(ST_Multi(ST_CollectionExtract(ST_MakeValid(ST_Transform(geoml.the_geom,4326)), 2))) as geoml, '
'ST_AsEWKT(ST_Multi(ST_CollectionExtract(ST_MakeValid(ST_Transform(geomp.the_geom,4326)), 1))) as geomp, '
'eiv.verfahrenstyp, '
'vr.verfahrensrecht, '
'adr.behoerde as zb, '
'linf."AZ" as zb_az, '
'adr.adr_pruef as ets, '
'auf."AZ" as ets_az, '
'adr.adressrolle as eingriffsverursacher_typ, '
'adr."Bemerkung" as eingriffsverursacher_bemerkung, '
'linf."Bemerkung" as eiv_comment, '
'zt.erlass as Zulassungsdatum, '
'zt.rechtskraft as Bestandskraftdatum, '
'zt.baubeginn as ersatzzahlungstermin, '
'eiv.ersatzzahlung '
'from "OBJ_MASTER" om '
'left join "LINFOS" linf on om."GISPADID"=linf."GISPADID" '
'left join eiv on om."GISPADID"=eiv.gispadid '
'left join verfahrensrecht vr on om."GISPADID"=vr.gispadid '
'left join geometry_f geomf on om."GISPADID"=geomf.gispadid '
'left join geometry_l geoml on om."GISPADID"=geoml.gispadid '
'left join geometry_p geomp on om."GISPADID"=geomp.gispadid '
'left join adressrolle adr on om."GISPADID"=adr."GISPADID" '
'left join "Aufwertung" auf on om."GISPADID"=auf."GISPADID" '
'left join zulassungstermin zt on eiv.zulassung=zt.id '
'where '
'om."OKL"=7730085 and '
'om.archiv=false and '
'om.nicht_vollstaendig=0;'
)
all_eivs = cursor.fetchall()
len_all_eivs = len(all_eivs)
num_processed = 0
self._write_warning(f"Migrate EIVs to interventions...")
self._write_warning(f"--Found {len_all_eivs} entries. Process now...")
for eiv in all_eivs:
if num_processed % 500 == 0:
self._write_warning(f"----{num_processed}/{len_all_eivs} processed")
with transaction.atomic():
eiv_comment = eiv[13] or ""
intervention = Intervention.objects.get_or_create(
identifier=eiv[0]
)[0]
intervention.title = eiv[1]
intervention.comment = eiv_comment
intervention = self._migrate_intervention_geometry(intervention, eiv)
intervention = self._migrate_intervention_legal(intervention, eiv)
intervention = self._migrate_intervention_responsibility(intervention, eiv)
intervention = self._migrate_intervention_payment(intervention, eiv)
intervention = self._migrate_intervention_documents(intervention, eiv)
intervention.save()
num_processed += 1
cursor.close()