mpeltriaux
2957a6aa60
* wraps check_for_conflicts() in celery based method * fixes bug on InterventionEditForm where geometry's save() has been called twice
130 lines
4.3 KiB
Python
130 lines
4.3 KiB
Python
from time import sleep
|
|
|
|
from celery import shared_task
|
|
from django.core.exceptions import ObjectDoesNotExist
|
|
|
|
|
|
@shared_task
def celery_update_parcels(geometry_id: str, recheck: bool = True):
    """Reset the parcel intersections of a geometry and re-run its parcel update.

    Loads the ``Geometry`` with id ``geometry_id``, sets ``calculated_on`` to
    ``None`` on each of its related ``ParcelIntersection`` rows (written back
    with a single ``bulk_update``) and then calls ``update_parcels()`` on the
    geometry.

    Args:
        geometry_id: Id of the ``Geometry`` to process.
        recheck: If true and ``ObjectDoesNotExist`` is raised, wait five
            seconds and run once more with ``recheck`` disabled. If false,
            the exception is swallowed and the task ends without doing
            anything further.
    """
    from konova.models import Geometry, ParcelIntersection

    try:
        geometry = Geometry.objects.get(id=geometry_id)
        intersections = geometry.parcelintersection_set.all()
        for intersection in intersections:
            intersection.calculated_on = None
        ParcelIntersection.objects.bulk_update(intersections, ["calculated_on"])

        geometry.update_parcels()
    except ObjectDoesNotExist:
        if not recheck:
            return
        # NOTE(review): presumably the geometry may not be stored yet when the
        # task starts, hence the single delayed retry - confirm with callers.
        sleep(5)
        celery_update_parcels(geometry_id, False)
|
|
|
|
|
|
@shared_task
def celery_send_mail_shared_access_removed(obj_identifier, obj_title=None, user_id=None):
    """Celery task: call ``send_mail_shared_access_removed`` on a single user.

    Args:
        obj_identifier: Forwarded unchanged to the user's mail method.
        obj_title: Forwarded unchanged to the user's mail method.
        user_id: Id used to look up the ``User``; the lookup is done with
            ``User.objects.get`` and therefore fails if no user matches.
    """
    from user.models import User

    User.objects.get(id=user_id).send_mail_shared_access_removed(obj_identifier, obj_title)
|
|
|
|
|
|
@shared_task
def celery_send_mail_shared_access_given(obj_identifier, obj_title=None, user_id=None):
    """Celery task: call ``send_mail_shared_access_given`` on a single user.

    Args:
        obj_identifier: Forwarded unchanged to the user's mail method.
        obj_title: Forwarded unchanged to the user's mail method.
        user_id: Id used to look up the ``User``; the lookup is done with
            ``User.objects.get`` and therefore fails if no user matches.
    """
    from user.models import User

    User.objects.get(id=user_id).send_mail_shared_access_given(obj_identifier, obj_title)
|
|
|
|
|
|
@shared_task
def celery_send_mail_shared_access_removed_team(obj_identifier, obj_title=None, team_id=None):
    """Celery task: call ``send_mail_shared_access_removed`` on a single team.

    Args:
        obj_identifier: Forwarded unchanged to the team's mail method.
        obj_title: Forwarded unchanged to the team's mail method.
        team_id: Id used to look up the ``Team``; the lookup is done with
            ``Team.objects.get`` and therefore fails if no team matches.
    """
    from user.models import Team

    Team.objects.get(id=team_id).send_mail_shared_access_removed(obj_identifier, obj_title)
|
|
|
|
|
|
@shared_task
def celery_send_mail_shared_access_given_team(obj_identifier, obj_title=None, team_id=None):
    """Celery task: call ``send_mail_shared_access_given_team`` on a single team.

    Args:
        obj_identifier: Forwarded unchanged to the team's mail method.
        obj_title: Forwarded unchanged to the team's mail method.
        team_id: Id used to look up the ``Team``; the lookup is done with
            ``Team.objects.get`` and therefore fails if no team matches.
    """
    from user.models import Team

    # NOTE(review): the method name carries a ``_team`` suffix, unlike the
    # other Team mail methods called in this module - confirm against Team.
    Team.objects.get(id=team_id).send_mail_shared_access_given_team(obj_identifier, obj_title)
|
|
|
|
|
|
@shared_task
def celery_send_mail_shared_data_recorded(obj_identifier, obj_title=None, user_id=None):
    """Celery task: call ``send_mail_shared_data_recorded`` on a single user.

    Args:
        obj_identifier: Forwarded unchanged to the user's mail method.
        obj_title: Forwarded unchanged to the user's mail method.
        user_id: Id used to look up the ``User``; the lookup is done with
            ``User.objects.get`` and therefore fails if no user matches.
    """
    from user.models import User

    User.objects.get(id=user_id).send_mail_shared_data_recorded(obj_identifier, obj_title)
|
|
|
|
|
|
@shared_task
def celery_send_mail_shared_data_unrecorded(obj_identifier, obj_title=None, user_id=None):
    """Celery task: call ``send_mail_shared_data_unrecorded`` on a single user.

    Args:
        obj_identifier: Forwarded unchanged to the user's mail method.
        obj_title: Forwarded unchanged to the user's mail method.
        user_id: Id used to look up the ``User``; the lookup is done with
            ``User.objects.get`` and therefore fails if no user matches.
    """
    from user.models import User

    User.objects.get(id=user_id).send_mail_shared_data_unrecorded(obj_identifier, obj_title)
|
|
|
|
|
|
@shared_task
def celery_send_mail_shared_data_recorded_team(obj_identifier, obj_title=None, team_id=None):
    """Celery task: call ``send_mail_shared_data_recorded`` on a single team.

    Args:
        obj_identifier: Forwarded unchanged to the team's mail method.
        obj_title: Forwarded unchanged to the team's mail method.
        team_id: Id used to look up the ``Team``; the lookup is done with
            ``Team.objects.get`` and therefore fails if no team matches.
    """
    from user.models import Team

    Team.objects.get(id=team_id).send_mail_shared_data_recorded(obj_identifier, obj_title)
|
|
|
|
|
|
@shared_task
def celery_send_mail_shared_data_unrecorded_team(obj_identifier, obj_title=None, team_id=None):
    """Celery task: call ``send_mail_shared_data_unrecorded`` on a single team.

    Args:
        obj_identifier: Forwarded unchanged to the team's mail method.
        obj_title: Forwarded unchanged to the team's mail method.
        team_id: Id used to look up the ``Team``; the lookup is done with
            ``Team.objects.get`` and therefore fails if no team matches.
    """
    from user.models import Team

    Team.objects.get(id=team_id).send_mail_shared_data_unrecorded(obj_identifier, obj_title)
|
|
|
|
|
|
@shared_task
def celery_send_mail_shared_data_deleted(obj_identifier, obj_title=None, user_id=None):
    """Celery task: call ``send_mail_shared_data_deleted`` on a single user.

    Args:
        obj_identifier: Forwarded unchanged to the user's mail method.
        obj_title: Forwarded unchanged to the user's mail method.
        user_id: Id used to look up the ``User``; the lookup is done with
            ``User.objects.get`` and therefore fails if no user matches.
    """
    from user.models import User

    User.objects.get(id=user_id).send_mail_shared_data_deleted(obj_identifier, obj_title)
|
|
|
|
|
|
@shared_task
def celery_send_mail_shared_data_checked(obj_identifier, obj_title=None, user_id=None):
    """Celery task: call ``send_mail_shared_data_checked`` on a single user.

    Args:
        obj_identifier: Forwarded unchanged to the user's mail method.
        obj_title: Forwarded unchanged to the user's mail method.
        user_id: Id used to look up the ``User``; the lookup is done with
            ``User.objects.get`` and therefore fails if no user matches.
    """
    from user.models import User

    User.objects.get(id=user_id).send_mail_shared_data_checked(obj_identifier, obj_title)
|
|
|
|
|
|
@shared_task
def celery_send_mail_shared_data_deleted_team(obj_identifier, obj_title=None, team_id=None):
    """Celery task: call ``send_mail_shared_data_deleted`` on a single team.

    Args:
        obj_identifier: Forwarded unchanged to the team's mail method.
        obj_title: Forwarded unchanged to the team's mail method.
        team_id: Id used to look up the ``Team``; the lookup is done with
            ``Team.objects.get`` and therefore fails if no team matches.
    """
    from user.models import Team

    Team.objects.get(id=team_id).send_mail_shared_data_deleted(obj_identifier, obj_title)
|
|
|
|
|
|
@shared_task
def celery_send_mail_shared_data_checked_team(obj_identifier, obj_title=None, team_id=None):
    """Celery task: call ``send_mail_shared_data_checked`` on a single team.

    Args:
        obj_identifier: Forwarded unchanged to the team's mail method.
        obj_title: Forwarded unchanged to the team's mail method.
        team_id: Id used to look up the ``Team``; the lookup is done with
            ``Team.objects.get`` and therefore fails if no team matches.
    """
    from user.models import Team

    Team.objects.get(id=team_id).send_mail_shared_data_checked(obj_identifier, obj_title)
|
|
|
|
|
|
@shared_task
def celery_send_mail_deduction_changed_team(obj_identifier, obj_title=None, team_id=None, data_changes=None):
    """Celery task: call ``send_mail_deduction_changed`` on a single team.

    Args:
        obj_identifier: Forwarded unchanged to the team's mail method.
        obj_title: Forwarded unchanged to the team's mail method.
        team_id: Id used to look up the ``Team``; the lookup is done with
            ``Team.objects.get`` and therefore fails if no team matches.
        data_changes: Forwarded unchanged as the third argument of the
            team's mail method.
    """
    from user.models import Team

    Team.objects.get(id=team_id).send_mail_deduction_changed(obj_identifier, obj_title, data_changes)
|
|
|
|
|
|
@shared_task
def celery_send_mail_deduction_changed(obj_identifier, obj_title=None, user_id=None, data_changes=None):
    """Celery task: call ``send_mail_deduction_changed`` on a single user.

    Args:
        obj_identifier: Forwarded unchanged to the user's mail method.
        obj_title: Forwarded unchanged to the user's mail method.
        user_id: Id used to look up the ``User``; the lookup is done with
            ``User.objects.get`` and therefore fails if no user matches.
        data_changes: Forwarded unchanged as the third argument of the
            user's mail method.
    """
    from user.models import User

    User.objects.get(id=user_id).send_mail_deduction_changed(obj_identifier, obj_title, data_changes)
|
|
|
|
|
|
@shared_task
def celery_check_for_geometry_conflicts(geom_id):
    """Celery task: run ``check_for_conflicts()`` on a single geometry.

    Args:
        geom_id: Id used to look up the ``Geometry``; the lookup is done
            with ``Geometry.objects.get`` and therefore fails if no geometry
            matches (unlike ``celery_update_parcels`` there is no retry here).
    """
    from konova.models import Geometry

    Geometry.objects.get(id=geom_id).check_for_conflicts()
|