mpeltriaux
69a8139601
* drops unused methods * fixes typos * updates comments * drops unused model attribute
191 lines
6.5 KiB
Python
191 lines
6.5 KiB
Python
from time import sleep
|
|
|
|
from celery import shared_task
|
|
from django.apps import apps
|
|
from django.core.exceptions import ObjectDoesNotExist
|
|
|
|
|
|
@shared_task
|
|
def celery_update_parcels(geometry_id: str, recheck: bool = True):
|
|
from konova.models import Geometry, ParcelIntersection
|
|
try:
|
|
geom = Geometry.objects.get(id=geometry_id)
|
|
geom.parcels.clear()
|
|
geom.update_parcels()
|
|
|
|
except ObjectDoesNotExist:
|
|
if recheck:
|
|
sleep(5)
|
|
celery_update_parcels(geometry_id, False)
|
|
|
|
|
|
@shared_task
|
|
def celery_send_mail_shared_access_removed(obj_id, obj_class, user_id=None):
|
|
from user.models import User
|
|
user = User.objects.get(id=user_id)
|
|
|
|
obj_class = apps.get_model(obj_class[0], obj_class[1])
|
|
obj = obj_class.objects.get(id=obj_id)
|
|
|
|
municipals_names = list(obj.geometry.get_underlying_municipals().values_list("name", flat=True))
|
|
user.send_mail_shared_access_removed(obj, municipals_names)
|
|
|
|
|
|
@shared_task
|
|
def celery_send_mail_shared_access_given(obj_id, obj_class, user_id=None):
|
|
from user.models import User
|
|
user = User.objects.get(id=user_id)
|
|
|
|
obj_class = apps.get_model(obj_class[0], obj_class[1])
|
|
obj = obj_class.objects.get(id=obj_id)
|
|
|
|
municipals_names = list(obj.geometry.get_underlying_municipals().values_list("name", flat=True))
|
|
user.send_mail_shared_access_given(obj, municipals_names)
|
|
|
|
|
|
@shared_task
|
|
def celery_send_mail_shared_access_removed_team(obj_id, obj_class, team_id=None):
|
|
from user.models import Team
|
|
team = Team.objects.get(id=team_id)
|
|
|
|
obj_class = apps.get_model(obj_class[0], obj_class[1])
|
|
obj = obj_class.objects.get(id=obj_id)
|
|
|
|
municipals_names = list(obj.geometry.get_underlying_municipals().values_list("name", flat=True))
|
|
team.send_mail_shared_access_removed(obj, municipals_names)
|
|
|
|
|
|
@shared_task
|
|
def celery_send_mail_shared_access_given_team(obj_id, obj_class, team_id=None):
|
|
from user.models import Team
|
|
team = Team.objects.get(id=team_id)
|
|
|
|
obj_class = apps.get_model(obj_class[0], obj_class[1])
|
|
obj = obj_class.objects.get(id=obj_id)
|
|
|
|
municipals_names = list(obj.geometry.get_underlying_municipals().values_list("name", flat=True))
|
|
team.send_mail_shared_access_given_team(obj, municipals_names)
|
|
|
|
|
|
@shared_task
|
|
def celery_send_mail_shared_data_recorded(obj_id, obj_class, user_id=None):
|
|
from user.models import User
|
|
user = User.objects.get(id=user_id)
|
|
|
|
obj_class = apps.get_model(obj_class[0], obj_class[1])
|
|
obj = obj_class.objects.get(id=obj_id)
|
|
|
|
municipals_names = list(obj.geometry.get_underlying_municipals().values_list("name", flat=True))
|
|
user.send_mail_shared_data_recorded(obj, municipals_names)
|
|
|
|
|
|
@shared_task
|
|
def celery_send_mail_shared_data_unrecorded(obj_id, obj_class, user_id=None):
|
|
from user.models import User
|
|
user = User.objects.get(id=user_id)
|
|
|
|
obj_class = apps.get_model(obj_class[0], obj_class[1])
|
|
obj = obj_class.objects.get(id=obj_id)
|
|
|
|
municipals_names = list(obj.geometry.get_underlying_municipals().values_list("name", flat=True))
|
|
user.send_mail_shared_data_unrecorded(obj, municipals_names)
|
|
|
|
|
|
@shared_task
|
|
def celery_send_mail_shared_data_recorded_team(obj_id, obj_class, team_id=None):
|
|
from user.models import Team
|
|
team = Team.objects.get(id=team_id)
|
|
|
|
obj_class = apps.get_model(obj_class[0], obj_class[1])
|
|
obj = obj_class.objects.get(id=obj_id)
|
|
|
|
municipals_names = list(obj.geometry.get_underlying_municipals().values_list("name", flat=True))
|
|
team.send_mail_shared_data_recorded(obj, municipals_names)
|
|
|
|
|
|
@shared_task
|
|
def celery_send_mail_shared_data_unrecorded_team(obj_id, obj_class, team_id=None):
|
|
from user.models import Team
|
|
team = Team.objects.get(id=team_id)
|
|
|
|
obj_class = apps.get_model(obj_class[0], obj_class[1])
|
|
obj = obj_class.objects.get(id=obj_id)
|
|
|
|
municipals_names = list(obj.geometry.get_underlying_municipals().values_list("name", flat=True))
|
|
team.send_mail_shared_data_unrecorded(obj, municipals_names)
|
|
|
|
|
|
@shared_task
|
|
def celery_send_mail_shared_data_deleted(obj_id, obj_class, user_id=None):
|
|
from user.models import User
|
|
user = User.objects.get(id=user_id)
|
|
|
|
obj_class = apps.get_model(obj_class[0], obj_class[1])
|
|
obj = obj_class.objects.get(id=obj_id)
|
|
|
|
municipals_names = list(obj.geometry.get_underlying_municipals().values_list("name", flat=True))
|
|
user.send_mail_shared_data_deleted(obj, municipals_names)
|
|
|
|
|
|
@shared_task
|
|
def celery_send_mail_shared_data_checked(obj_id, obj_class, user_id=None):
|
|
from user.models import User
|
|
user = User.objects.get(id=user_id)
|
|
|
|
obj_class = apps.get_model(obj_class[0], obj_class[1])
|
|
obj = obj_class.objects.get(id=obj_id)
|
|
|
|
municipals_names = list(obj.geometry.get_underlying_municipals().values_list("name", flat=True))
|
|
user.send_mail_shared_data_checked(obj, municipals_names)
|
|
|
|
|
|
@shared_task
|
|
def celery_send_mail_shared_data_deleted_team(obj_id, obj_class, team_id=None):
|
|
from user.models import Team
|
|
team = Team.objects.get(id=team_id)
|
|
|
|
obj_class = apps.get_model(obj_class[0], obj_class[1])
|
|
obj = obj_class.objects.get(id=obj_id)
|
|
|
|
municipals_names = list(obj.geometry.get_underlying_municipals().values_list("name", flat=True))
|
|
team.send_mail_shared_data_deleted(obj, municipals_names)
|
|
|
|
|
|
@shared_task
|
|
def celery_send_mail_shared_data_checked_team(obj_id, obj_class, team_id=None):
|
|
from user.models import Team
|
|
team = Team.objects.get(id=team_id)
|
|
|
|
obj_class = apps.get_model(obj_class[0], obj_class[1])
|
|
obj = obj_class.objects.get(id=obj_id)
|
|
|
|
municipals_names = list(obj.geometry.get_underlying_municipals().values_list("name", flat=True))
|
|
team.send_mail_shared_data_checked(obj, municipals_names)
|
|
|
|
|
|
@shared_task
|
|
def celery_send_mail_deduction_changed_team(obj_id, obj_class, team_id=None, data_changes=None):
|
|
from user.models import Team
|
|
team = Team.objects.get(id=team_id)
|
|
|
|
obj_class = apps.get_model(obj_class[0], obj_class[1])
|
|
obj = obj_class.objects.get(id=obj_id)
|
|
team.send_mail_deduction_changed(obj, data_changes)
|
|
|
|
|
|
@shared_task
|
|
def celery_send_mail_deduction_changed(obj_id, obj_class, user_id=None, data_changes=None):
|
|
from user.models import User
|
|
user = User.objects.get(id=user_id)
|
|
|
|
obj_class = apps.get_model(obj_class[0], obj_class[1])
|
|
obj = obj_class.objects.get(id=obj_id)
|
|
user.send_mail_deduction_changed(obj, data_changes)
|
|
|
|
|
|
@shared_task
|
|
def celery_check_for_geometry_conflicts(geom_id):
|
|
from konova.models import Geometry
|
|
geometry = Geometry.objects.get(id=geom_id)
|
|
geometry.check_for_conflicts()
|