from time import sleep from celery import shared_task from django.core.exceptions import ObjectDoesNotExist @shared_task def celery_update_parcels(geometry_id: str, recheck: bool = True): from konova.models import Geometry, ParcelIntersection try: geom = Geometry.objects.get(id=geometry_id) objs = geom.parcelintersection_set.all() for obj in objs: obj.calculated_on = None ParcelIntersection.objects.bulk_update( objs, ["calculated_on"] ) geom.update_parcels() except ObjectDoesNotExist: if recheck: sleep(5) celery_update_parcels(geometry_id, False) @shared_task def celery_send_mail_shared_access_removed(obj_identifier, obj_title=None, user_id=None, municipals_names=[]): from user.models import User user = User.objects.get(id=user_id) user.send_mail_shared_access_removed(obj_identifier, obj_title, municipals_names) @shared_task def celery_send_mail_shared_access_given(obj_identifier, obj_title=None, user_id=None, municipals_names=[]): from user.models import User user = User.objects.get(id=user_id) user.send_mail_shared_access_given(obj_identifier, obj_title, municipals_names) @shared_task def celery_send_mail_shared_access_removed_team(obj_identifier, obj_title=None, team_id=None, municipals_names=[]): from user.models import Team team = Team.objects.get(id=team_id) team.send_mail_shared_access_removed(obj_identifier, obj_title, municipals_names) @shared_task def celery_send_mail_shared_access_given_team(obj_identifier, obj_title=None, team_id=None, municipals_names=[]): from user.models import Team team = Team.objects.get(id=team_id) team.send_mail_shared_access_given_team(obj_identifier, obj_title, municipals_names) @shared_task def celery_send_mail_shared_data_recorded(obj_identifier, obj_title=None, user_id=None, municipals_names=[]): from user.models import User user = User.objects.get(id=user_id) user.send_mail_shared_data_recorded(obj_identifier, obj_title, municipals_names) @shared_task def celery_send_mail_shared_data_unrecorded(obj_identifier, obj_title=None, user_id=None, municipals_names=[]): from user.models import User user = User.objects.get(id=user_id) user.send_mail_shared_data_unrecorded(obj_identifier, obj_title, municipals_names) @shared_task def celery_send_mail_shared_data_recorded_team(obj_identifier, obj_title=None, team_id=None, municipals_names=[]): from user.models import Team team = Team.objects.get(id=team_id) team.send_mail_shared_data_recorded(obj_identifier, obj_title, municipals_names) @shared_task def celery_send_mail_shared_data_unrecorded_team(obj_identifier, obj_title=None, team_id=None, municipals_names=[]): from user.models import Team team = Team.objects.get(id=team_id) team.send_mail_shared_data_unrecorded(obj_identifier, obj_title, municipals_names) @shared_task def celery_send_mail_shared_data_deleted(obj_identifier, obj_title=None, user_id=None, municipals_names=[]): from user.models import User user = User.objects.get(id=user_id) user.send_mail_shared_data_deleted(obj_identifier, obj_title, municipals_names) @shared_task def celery_send_mail_shared_data_checked(obj_identifier, obj_title=None, user_id=None, municipals_names=[]): from user.models import User user = User.objects.get(id=user_id) user.send_mail_shared_data_checked(obj_identifier, obj_title, municipals_names) @shared_task def celery_send_mail_shared_data_deleted_team(obj_identifier, obj_title=None, team_id=None, municipals_names=[]): from user.models import Team team = Team.objects.get(id=team_id) team.send_mail_shared_data_deleted(obj_identifier, obj_title, municipals_names) @shared_task def celery_send_mail_shared_data_checked_team(obj_identifier, obj_title=None, team_id=None, municipals_names=[]): from user.models import Team team = Team.objects.get(id=team_id) team.send_mail_shared_data_checked(obj_identifier, obj_title, municipals_names) @shared_task def celery_send_mail_deduction_changed_team(obj_identifier, obj_title=None, team_id=None, data_changes=None): from user.models import Team team = Team.objects.get(id=team_id) team.send_mail_deduction_changed(obj_identifier, obj_title, data_changes) @shared_task def celery_send_mail_deduction_changed(obj_identifier, obj_title=None, user_id=None, data_changes=None): from user.models import User user = User.objects.get(id=user_id) user.send_mail_deduction_changed(obj_identifier, obj_title, data_changes) @shared_task def celery_check_for_geometry_conflicts(geom_id): from konova.models import Geometry geometry = Geometry.objects.get(id=geom_id) geometry.check_for_conflicts()