konova/konova/management/commands/kspMigrater/eco_account_migrater.py
mpeltriaux fbf613f16e #189 EcoAccount deferred parcel calculation
* adds celery parcel calculation to EcoAccount migration logic
2022-08-08 12:13:10 +02:00

413 lines
18 KiB
Python

import datetime
from django.contrib.gis.gdal import GDALException
from django.contrib.gis.geos import MultiPolygon, Polygon
from django.core.exceptions import ObjectDoesNotExist
from django.db import transaction
from django.utils import timezone, formats
from codelist.models import KonovaCode
from codelist.settings import CODELIST_CONSERVATION_OFFICE_ID, CODELIST_COMPENSATION_HANDLER_ID
from compensation.models import EcoAccount, EcoAccountDocument, EcoAccountDeduction
from compensation.utils.quality import EcoAccountQualityChecker
from intervention.models import Responsibility, Handler, Intervention, Legal
from konova.management.commands.kspMigrater.compensation_migrater import CompensationMigrater
from konova.models import Geometry
from konova.sub_settings.lanis_settings import DEFAULT_SRID_RLP
from konova.tasks import celery_update_parcels
from user.models import UserActionLogEntry
class EcoAccountMigrater(CompensationMigrater):
def migrate(self):
    """ Migrates legacy OEK entries into konova EcoAccount records.

    Fetches all non-archived, complete OEK master rows (om."OKL"=7730081,
    om.archiv=false, om.nicht_vollstaendig=0) from the legacy database and
    creates or updates one EcoAccount per row. Each record is processed inside
    its own transaction; topic-specific migration is delegated to the
    _migrate_* helper methods.

    Returns:
        None
    """
    self.connect_db()
    cursor = self.db_connection.cursor()
    cursor.execute(
        'select '
        'om."KENNUNG", '
        'linf."OBJBEZ", '
        'ST_AsEWKT(ST_Multi(ST_CollectionExtract(ST_MakeValid(geomf.the_geom), 3))) as geomf, '
        'ST_AsEWKT(ST_Multi(ST_CollectionExtract(ST_MakeValid(geoml.the_geom), 2))) as geoml, '
        'ST_AsEWKT(ST_Multi(ST_CollectionExtract(ST_MakeValid(geomp.the_geom), 1))) as geomp, '
        'linf."Bemerkung" '
        'from "OBJ_MASTER" om '
        'left join "LINFOS" linf on om."GISPADID"=linf."GISPADID" '
        'left join kom on om."GISPADID"=kom.gispadid '
        'left join geometry_f geomf on om."GISPADID"=geomf.gispadid '
        'left join geometry_l geoml on om."GISPADID"=geoml.gispadid '
        'left join geometry_p geomp on om."GISPADID"=geomp.gispadid '
        'where '
        'om."OKL"=7730081 and '
        'om.archiv=false and '
        'om.nicht_vollstaendig=0 '
    )
    all_oeks = cursor.fetchall()
    len_all_oeks = len(all_oeks)
    num_processed = 0
    print(f"Migrate OEKs to ecoaccounts...")
    print(f"--Found {len_all_oeks} entries. Process now...")
    for oek in all_oeks:
        # Progress output every 500 processed entries
        if num_processed % 500 == 0:
            print(f"----{num_processed}/{len_all_oeks} processed")
        # One transaction per account: a failing record rolls back atomically
        with transaction.atomic():
            oek_identifier = oek[0]
            oek_title = oek[1]
            # "Bemerkung" may be NULL in the legacy db -> fall back to ""
            oek_comment = oek[5] or ""
            eco_account = EcoAccount.objects.get_or_create(
                identifier=oek_identifier
            )[0]
            eco_account.title = oek_title
            eco_account.prevent_recording = False
            eco_account.comment = oek_comment
            eco_account = self._migrate_legal(eco_account, oek)
            eco_account = self._migrate_states(eco_account, oek)
            eco_account = self._migrate_geometry(eco_account, oek)
            eco_account = self._migrate_responsibility(eco_account, oek)
            eco_account = self._migrate_deadlines(eco_account, oek)
            eco_account = self._migrate_action_control_deadlines(eco_account, oek)
            eco_account = self._migrate_actions(eco_account, oek)
            eco_account = self._migrate_log(eco_account, oek)
            eco_account = self._migrate_recorded(eco_account, oek)
            eco_account = self._migrate_deductions(eco_account, oek)
            eco_account = self._migrate_documents(eco_account, EcoAccountDocument, oek)
            eco_account.save()
        num_processed += 1
    cursor.close()
def _migrate_geometry(self, instance, db_result: tuple):
    """ Migrates the geometry of a legacy OEK onto the given account.

    The resulting geometry is the union of the account's own flat geometry
    and the geometries of all KOMs that were deducted from it. Also derives
    the deductable surface: if the geometric area deviates by more than 10%
    from the declared after-state surface sum, the account is flagged so it
    will not be recorded automatically.

    Args:
        instance (EcoAccount): The account being migrated
        db_result (tuple): Legacy db row; db_result[0] holds the KENNUNG

    Returns:
        instance (EcoAccount): The account with geometry attached

    Raises:
        AssertionError: If either geometry query does not return exactly one row
        TypeError / GDALException: Re-raised with the account identifier prepended
    """
    identifier = f"'{db_result[0]}'"
    empty_str = f"''"
    tmp_cursor = self.db_connection.cursor()
    try:
        # Union of all KOM geometries which were deducted from this account
        tmp_cursor.execute(
            'select '
            'ST_AsEWKT(ST_Multi(ST_CollectionExtract(ST_MakeValid(ST_Union(geom.the_geom))))) '
            'from "OBJ_MASTER" om '
            'left join "Aufwertung" auf on om."GISPADID"=auf."GISPADID" '
            'left join "OBJ_MASTER" om_kom on auf."Infos"=om_kom."KENNUNG" '
            'left join geometry_f geom on geom.gispadid=om_kom."GISPADID" '
            'where '
            'om."OKL"=7730081 and '
            'om.archiv=false and '
            'om_kom.archiv=false and '
            f'(auf."Infos" is not null and auf."Infos"!={empty_str}) and '
            f'om."KENNUNG"={identifier}'
        )
        deduction_db_results = tmp_cursor.fetchall()
        if len(deduction_db_results) != 1:
            raise AssertionError(f"Unexpected number of matches: {deduction_db_results}")
        db_result_geom = deduction_db_results[0][0]
        if db_result_geom is not None:
            deductions_db_result_geom = MultiPolygon.from_ewkt(deduction_db_results[0][0])
        else:
            # No deduction geometries found -> start from an empty multipolygon
            deductions_db_result_geom = MultiPolygon(srid=DEFAULT_SRID_RLP)
        # The account's own (flat) geometry
        tmp_cursor.execute(
            'select '
            'ST_AsEWKT(ST_Multi(ST_CollectionExtract(ST_MakeValid(geomf.the_geom), 3))) as geomf '
            'from "OBJ_MASTER" om '
            'left join geometry_f geomf on om."GISPADID"=geomf.gispadid '
            'where '
            f'om."KENNUNG"={identifier}'
        )
        original_geom_db_results = tmp_cursor.fetchall()
        if len(original_geom_db_results) != 1:
            raise AssertionError(f"Unexpected number of matches: {original_geom_db_results}")
        account_db_result_geom = MultiPolygon.from_ewkt(original_geom_db_results[0][0])
        db_result_geom = account_db_result_geom.union(deductions_db_result_geom)
        # union() may collapse to a single Polygon -> normalize back to MultiPolygon
        if isinstance(db_result_geom, Polygon):
            db_result_geom = MultiPolygon(db_result_geom, srid=DEFAULT_SRID_RLP)
        instance.geometry = instance.geometry or Geometry()
        try:
            # Calculate area by transforming into the RLP srid
            rlp_geom = db_result_geom.transform(ct=DEFAULT_SRID_RLP, clone=True)
            area = round(rlp_geom.area)
            max_state_after_area = instance.get_state_after_surface_sum()
            # Relative difference between geometric area and declared after-state sum
            # NOTE(review): raises ZeroDivisionError if both areas are 0 — confirm
            # that such rows cannot occur in the legacy data
            diff = abs(area - max_state_after_area)
            diff_perc = diff / max(area, max_state_after_area)
            is_diff_too_high = diff_perc > 0.1
            if is_diff_too_high:
                print(f" !!! {identifier} has diff of {diff_perc*100} % between geometry and after_states. Should not be recorded!")
                instance.comment += f"\n\nÖkokonto konnte nicht automatisch verzeichnet übernommen werden: Zu große Differenz zwischen Geometriefläche ({formats.localize(area, use_l10n=True)} m²) und angegebener Zielzustandsfläche. Bitte prüfen und korrigieren bzw. eigenständig verzeichnen."
                instance.prevent_recording = True
                # Take the smaller value to be conservative about deductable surface
                area = min(area, max_state_after_area)
            else:
                area = max_state_after_area
            instance.deductable_surface = area
            instance.geometry.geom = db_result_geom if not db_result_geom.empty else None
            instance.geometry.save()
            # Parcel calculation is deferred to a celery worker
            celery_update_parcels.delay(instance.geometry.id)
        except TypeError:
            raise TypeError(f"{identifier}, {db_result_geom}")
        except GDALException as e:
            raise GDALException(f"{identifier}, {e}")
    finally:
        # BUGFIX: close the cursor on every exit path (it was leaked whenever
        # one of the exceptions above was raised)
        tmp_cursor.close()
    return instance
def _migrate_responsibility(self, eco_account, oek):
    """ Migrates responsibility data (conservation office, file number, handler).

    Reads the legacy adressrolle/LINFOS/oek rows for the account and maps the
    conservation office and handler type onto KonovaCode entries.

    Args:
        eco_account (EcoAccount): The account being migrated
        oek (tuple): Legacy db row; oek[0] holds the KENNUNG

    Returns:
        eco_account (EcoAccount): The account with responsibility attached

    Raises:
        AssertionError: If the query does not return exactly one row
        ObjectDoesNotExist: If the conservation office code cannot be resolved
    """
    fallback = "Kein Eintrag"
    acc_identifier = f"'{oek[0]}'"
    tmp_cursor = self.db_connection.cursor()
    try:
        tmp_cursor.execute(
            'select '
            'adr."adr_pruef" as ets, '
            'linf."AZ", '
            'oek.traeger, '
            'oek.bemerkungtraeger '
            'from "OBJ_MASTER" om '
            'left join "LINFOS" linf on om."GISPADID"=linf."GISPADID" '
            'left join adressrolle adr on adr."GISPADID"=om."GISPADID" '
            'left join oek on om."GISPADID"=oek.gispadid '
            'where '
            f'om."KENNUNG"={acc_identifier}'
        )
        db_results = tmp_cursor.fetchall()
    finally:
        # BUGFIX: close immediately after fetching — the cursor was leaked
        # whenever one of the exceptions below was raised
        tmp_cursor.close()
    if len(db_results) != 1:
        raise AssertionError(f"{acc_identifier} has invalid responsibilities: {db_results}")
    db_results = db_results[0]
    cons_office_code = db_results[0]
    cons_file_number = db_results[1]
    handler_type = db_results[2]
    handler_detail = db_results[3]
    if cons_file_number is None or len(cons_file_number) == 0:
        cons_file_number = fallback
    eco_account.responsible = eco_account.responsible or Responsibility.objects.create()
    try:
        conservation_office = KonovaCode.objects.get(
            atom_id=cons_office_code,
            is_selectable=True,
            is_archived=False,
            code_lists__in=[CODELIST_CONSERVATION_OFFICE_ID]
        )
        self._migrate_responsible_code_to_team(eco_account, conservation_office, "ETS")
    except ObjectDoesNotExist:
        # Unknown conservation office is fatal for this record
        raise ObjectDoesNotExist(f"{acc_identifier}, {db_results}")
    try:
        handler_type = KonovaCode.objects.get(
            atom_id=handler_type,
            is_selectable=True,
            is_archived=False,
            code_lists__in=[CODELIST_COMPENSATION_HANDLER_ID]
        )
    except ObjectDoesNotExist:
        # Unknown handler codes are tolerated -> handler without a type
        handler_type = None
    eco_account.responsible.conservation_office = conservation_office
    eco_account.responsible.conservation_file_number = cons_file_number
    handler = eco_account.responsible.handler or Handler.objects.create()
    handler.type = handler_type
    handler.detail = handler_detail
    eco_account.responsible.handler = handler
    eco_account.responsible.handler.save()
    eco_account.responsible.save()
    return eco_account
def _migrate_deductions(self, eco_account, oek):
    """ Migrates legacy KOM deduction entries onto the account.

    Deletes all existing deductions first, then recreates them from the
    legacy "Aufwertung"/"REFERENZ" rows. Deduction amounts are stored as a
    percentage in the legacy db; missing or implausible (>100%) values are
    recalculated from the KOM geometry. If the account does not have enough
    deductable surface left, the surface is inflated to fit the deduction.

    Args:
        eco_account (EcoAccount): The account being migrated
        oek (tuple): Legacy db row; oek[0] holds the KENNUNG

    Returns:
        eco_account (EcoAccount): The account with deductions attached
    """
    identifier = f"'{oek[0]}'"
    empty_str = "''"
    tmp_cursor = self.db_connection.cursor()
    tmp_cursor.execute(
        'select '
        'om_kom."KENNUNG", '
        'auf."Anteil", '
        'ref."REFERENZ" '
        'from "OBJ_MASTER" om '
        'left join "Aufwertung" auf on om."GISPADID"=auf."GISPADID" '
        'left join "OBJ_MASTER" om_kom on auf."Infos"=om_kom."KENNUNG" '
        'left join "REFERENZ" ref on om_kom."GISPADID"=ref."GISPADID" '
        'where '
        f'(auf."Infos" is not null and auf."Infos"!={empty_str}) and '
        f'(ref."REFERENZ" is not null and ref."REFERENZ"!={empty_str}) and '
        f'om."KENNUNG"={identifier}'
    )
    fetched_results = tmp_cursor.fetchall()
    # Rebuild from scratch: drop all previously migrated deductions
    eco_account.deductions.all().delete()
    for result in fetched_results:
        old_deduction_kom_identifier = result[0]
        deduction_amount_percentage = result[1]
        target_intervention_identifier = result[2]
        if target_intervention_identifier is None:
            # old garbage data - skip
            continue
        # Missing or implausible percentage (>100%) -> derive it from the
        # KOM geometry area instead
        if deduction_amount_percentage is None or float(deduction_amount_percentage) > 100.0:
            deduction_amount_percentage = self.__calculate_deduction_amount_percentage(eco_account, old_deduction_kom_identifier)
        try:
            intervention = Intervention.objects.get(
                identifier=target_intervention_identifier
            )
        except ObjectDoesNotExist:
            # old garbage data
            print(f"{identifier} has deduction for {target_intervention_identifier} which does not exist")
            continue
        # Convert the percentage into square meters of the deductable surface
        deduction_amount_sqm = round(eco_account.deductable_surface * (float(deduction_amount_percentage)/100))
        rest_available_surface = eco_account.deductable_surface - eco_account.get_deductions_surface()
        rest_after_deduction = rest_available_surface - deduction_amount_sqm
        if rest_after_deduction < 0:
            # Legacy data deducted more than is available -> grow the surface
            # so the migrated deduction still fits, and persist immediately
            print(f"{identifier} has {rest_available_surface} sqm left but old deduction {old_deduction_kom_identifier} requires {deduction_amount_sqm} sqm.")
            print(f"Increase deductable surface by {rest_after_deduction} sqm")
            eco_account.deductable_surface += abs(rest_after_deduction)
            eco_account.save()
        deduction = EcoAccountDeduction.objects.get_or_create(
            account=eco_account,
            surface=deduction_amount_sqm,
            intervention=intervention
        )[0]
        # Backdate the deduction to the legacy creation timestamp
        # NOTE(review): may be None if no legacy log entry exists — confirm
        # that clearing 'created' is intended in that case
        created_on = self.__fetch_deduction_create_date(old_deduction_kom_identifier)
        deduction.created = created_on
        deduction.save()
    tmp_cursor.close()
    return eco_account
def _migrate_legal(self, eco_account, oek):
    """ Attaches placeholder legal data to the account.

    The legacy data never contained legal information, so a dummy
    registration date (1970-01-01) is set and a note about the placeholder
    is appended to the account's comment.

    Args:
        eco_account (EcoAccount): The account being migrated
        oek (tuple): Legacy db row (unused here)

    Returns:
        eco_account (EcoAccount): The account with legal data attached
    """
    legal = eco_account.legal
    if not legal:
        legal = Legal()
    legal.registration_date = datetime.date.fromisoformat("1970-01-01")
    legal.save()
    eco_account.legal = legal
    eco_account.comment += "\nKein Vereinbarungsdatum eingetragen. Platzhalter 01.01.1970 gesetzt."
    return eco_account
def __calculate_deduction_amount_percentage(self, eco_account, kom_deduction_identifier):
    """ Calculates the amount of a deduction from an eco account in percentage.

    Depends on the geometry of the old KOM-deduction: the KOM geometry area
    is set in relation to the account's deductable surface.

    Args:
        eco_account (EcoAccount): The account being deducted from
        kom_deduction_identifier (str): KENNUNG of the legacy KOM deduction

    Returns:
        result (float): Deducted share of the deductable surface in percent
    """
    kom_deduction_identifier = f"'{kom_deduction_identifier}'"
    tmp_cursor = self.db_connection.cursor()
    try:
        tmp_cursor.execute(
            'select '
            f'st_area(geom.the_geom) '
            'from "OBJ_MASTER" om '
            'join geometry_f geom on om."GISPADID"=geom.gispadid '
            'where '
            f'om."KENNUNG"={kom_deduction_identifier}'
        )
        fetch_result = tmp_cursor.fetchall()
        # NOTE(review): raises IndexError if no geometry row exists — confirm
        # that every KOM deduction has a geometry_f entry
        area_surface = fetch_result[0][0]
    finally:
        # BUGFIX: close the cursor even if the row access above raises
        tmp_cursor.close()
    # (dead store 'result = 0.0' removed — it was always overwritten)
    result = float(area_surface / eco_account.deductable_surface) * 100
    return result
def __fetch_deduction_create_date(self, deduction_kom_identifier):
    """ Fetches the creation timestamp for the old KOM-deduction to be used as
    create timestamp for the migrated deduction entry.

    Looks at table 'log' first; for very old entries falls back to table
    'protokoll'. Builds a persisted UserActionLogEntry backdated to the
    legacy timestamp.

    Args:
        deduction_kom_identifier (str): KENNUNG of the legacy KOM deduction

    Returns:
        create_action (UserActionLogEntry | None): The created log entry, or
            None if neither legacy table holds a row
    """
    deduction_kom_identifier = f"'{deduction_kom_identifier}'"
    tmp_cursor = self.db_connection.cursor()
    try:
        tmp_cursor.execute(
            'select '
            'log.erstelltam, '
            'log.erstelltvon '
            'from "OBJ_MASTER" om '
            'join log on om."GISPADID"=log.gispadid::Integer '
            'where '
            f'om."KENNUNG"={deduction_kom_identifier} '
            'order by log.erstelltam '
            'limit 1'
        )
        fetch_results = tmp_cursor.fetchall()
        if len(fetch_results) == 0:
            # Fallback for very old entries
            tmp_cursor.execute(
                'select '
                'p.geaendertam, '
                'p.geaendertvon '
                'from "OBJ_MASTER" om '
                'join protokoll p on om."GISPADID"=p."FKEY" '
                'where '
                f'om."KENNUNG"={deduction_kom_identifier} '
                'order by p.geaendertam '
                'limit 1'
            )
            fetch_results = tmp_cursor.fetchall()
        if len(fetch_results) == 0:
            # BUGFIX: this early return previously skipped tmp_cursor.close(),
            # leaking a cursor per unmatched deduction
            return None
        create_ts = fetch_results[0][0]
        create_user = fetch_results[0][1]
    finally:
        tmp_cursor.close()
    user = self._get_migrate_user(create_user)
    create_action = UserActionLogEntry.get_created_action(user, comment="[Migriert] Abbuchung angelegt")
    # Backdate the log entry to the legacy creation timestamp
    create_action.timestamp = timezone.make_aware(create_ts)
    create_action.save()
    return create_action
def _migrate_recorded(self, instance, db_result):
    """ Migrates the 'recorded' state of a legacy OEK onto the account.

    If the account passes the EcoAccountQualityChecker and recording was not
    prevented, the legacy recording timestamp and user are looked up and a
    recorded-action is attached; otherwise any recorded marker is cleared.

    Args:
        instance (EcoAccount): The account being migrated
        db_result (tuple): Legacy db row; db_result[0] holds the KENNUNG

    Returns:
        instance (EcoAccount): The account with recorded state set
    """
    quality_checker = EcoAccountQualityChecker(instance)
    quality_checker.run_check()
    if quality_checker.valid and not instance.prevent_recording:
        identifier = f"'{db_result[0]}'"
        tmp_cursor = self.db_connection.cursor()
        # Latest logbuch entry with status_neu=610
        # NOTE(review): 610 presumably marks "recorded" in the legacy schema —
        # confirm against the legacy status code list
        tmp_cursor.execute(
            'select '
            'lb.status_neu, '
            'lb.erstelltvon, '
            'lb.erstelltam '
            'from "OBJ_MASTER" om '
            'join logbuch lb on om."GISPADID"=lb.gispadid '
            'where '
            f'om."KENNUNG"={identifier} and '
            'lb.status_neu=610 '
            'order by lb.erstelltam desc '
            'limit 1'
        )
        fetch_result = tmp_cursor.fetchone()
        if fetch_result is None:
            # Can happen on very old eco accounts: This data might only be found on table 'protokoll'
            tmp_cursor.execute(
                'select '
                'p.bemerkung1, '
                'p.geaendertvon, '
                'p.geaendertam '
                'from "OBJ_MASTER" om '
                'join protokoll p on om."GISPADID"=p."FKEY" '
                'where '
                f'om."KENNUNG"={identifier} '
                'order by '
                'p.geaendertam '
                'limit 1'
            )
            fetch_result = tmp_cursor.fetchone()
        if fetch_result is not None:
            # Something has been found on one of these two tables
            recorded_by = fetch_result[1]
            recorded_ts = timezone.make_aware(fetch_result[2])
            user = self._get_migrate_user(recorded_by)
            # Reuse an existing recorded action if present, otherwise create one
            instance.recorded = instance.recorded or UserActionLogEntry.get_recorded_action(
                user,
                "Migriertes Verzeichnen"
            )
            # Backdate to the legacy timestamp
            instance.recorded.timestamp = recorded_ts
            instance.recorded.save()
        tmp_cursor.close()
    else:
        # Quality check failed or recording was explicitly prevented
        instance.recorded = None
    return instance