konova/konova/tasks.py
mpeltriaux fe29035158 #101 Team mails
* adds mail templates for shared data actions
* fixes bug where deleted compensations would be used for checking
2022-02-18 15:19:37 +01:00

109 lines
3.6 KiB
Python

from time import sleep
from celery import shared_task
from django.core.exceptions import ObjectDoesNotExist
@shared_task
def celery_update_parcels(geometry_id: str, recheck: bool = True):
from konova.models import Geometry, ParcelIntersection
try:
geom = Geometry.objects.get(id=geometry_id)
objs = geom.parcelintersection_set.all()
for obj in objs:
obj.calculated_on = None
ParcelIntersection.objects.bulk_update(
objs,
["calculated_on"]
)
geom.update_parcels()
except ObjectDoesNotExist:
if recheck:
sleep(5)
celery_update_parcels(geometry_id, False)
@shared_task
def celery_send_mail_shared_access_removed(obj_identifier, obj_title=None, user_id=None):
from user.models import User
user = User.objects.get(id=user_id)
user.send_mail_shared_access_removed(obj_identifier, obj_title)
@shared_task
def celery_send_mail_shared_access_given(obj_identifier, obj_title=None, user_id=None):
from user.models import User
user = User.objects.get(id=user_id)
user.send_mail_shared_access_given(obj_identifier, obj_title)
@shared_task
def celery_send_mail_shared_access_removed_team(obj_identifier, obj_title=None, team_id=None):
from user.models import Team
team = Team.objects.get(id=team_id)
team.send_mail_shared_access_removed(obj_identifier, obj_title)
@shared_task
def celery_send_mail_shared_access_given_team(obj_identifier, obj_title=None, team_id=None):
from user.models import Team
team = Team.objects.get(id=team_id)
team.send_mail_shared_access_given_team(obj_identifier, obj_title)
@shared_task
def celery_send_mail_shared_data_recorded(obj_identifier, obj_title=None, user_id=None):
from user.models import User
user = User.objects.get(id=user_id)
user.send_mail_shared_data_recorded(obj_identifier, obj_title)
@shared_task
def celery_send_mail_shared_data_unrecorded(obj_identifier, obj_title=None, user_id=None):
from user.models import User
user = User.objects.get(id=user_id)
user.send_mail_shared_data_unrecorded(obj_identifier, obj_title)
@shared_task
def celery_send_mail_shared_data_recorded_team(obj_identifier, obj_title=None, team_id=None):
from user.models import Team
team = Team.objects.get(id=team_id)
team.send_mail_shared_data_recorded(obj_identifier, obj_title)
@shared_task
def celery_send_mail_shared_data_unrecorded_team(obj_identifier, obj_title=None, team_id=None):
from user.models import Team
team = Team.objects.get(id=team_id)
team.send_mail_shared_data_unrecorded(obj_identifier, obj_title)
@shared_task
def celery_send_mail_shared_data_deleted(obj_identifier, obj_title=None, user_id=None):
from user.models import User
user = User.objects.get(id=user_id)
user.send_mail_shared_data_deleted(obj_identifier, obj_title)
@shared_task
def celery_send_mail_shared_data_checked(obj_identifier, obj_title=None, user_id=None):
from user.models import User
user = User.objects.get(id=user_id)
user.send_mail_shared_data_checked(obj_identifier, obj_title)
@shared_task
def celery_send_mail_shared_data_deleted_team(obj_identifier, obj_title=None, team_id=None):
from user.models import Team
team = Team.objects.get(id=team_id)
team.send_mail_shared_data_deleted(obj_identifier, obj_title)
@shared_task
def celery_send_mail_shared_data_checked_team(obj_identifier, obj_title=None, team_id=None):
from user.models import Team
team = Team.objects.get(id=team_id)
team.send_mail_shared_data_checked(obj_identifier, obj_title)