* adds new custom command send_to_egon for performing EGON sending on a list of intervention ids
55 lines
1.9 KiB
Python
55 lines
1.9 KiB
Python
"""
|
|
Author: Michel Peltriaux
|
|
Organization: Struktur- und Genehmigungsdirektion Nord, Rhineland-Palatinate, Germany
|
|
Contact: ksp-servicestelle@sgdnord.rlp.de
|
|
Created on: 18.06.24
|
|
|
|
"""
|
|
from django.db.models import QuerySet
|
|
|
|
from intervention.models import Intervention
|
|
from konova.management.commands.setup import BaseKonovaCommand
|
|
|
|
|
|
class Command(BaseKonovaCommand):
|
|
help = "Send specific intervention entries to EGON if there are any payments on them"
|
|
|
|
def add_arguments(self, parser):
|
|
try:
|
|
parser.add_argument("--intervention-ids", type=str)
|
|
except ValueError as e:
|
|
self._write_error(f"Argument error: {e}")
|
|
exit(-1)
|
|
|
|
def __handle_arguments(self, options):
|
|
self.intervention_ids = options["intervention_ids"] or ""
|
|
self.intervention_ids = self.intervention_ids.split(",")
|
|
self.intervention_ids = [x.strip() for x in self.intervention_ids]
|
|
|
|
def handle(self, *args, **options):
|
|
try:
|
|
self.__handle_arguments(options)
|
|
interventions = self.get_interventions()
|
|
self.process_egon_sending(interventions)
|
|
except KeyboardInterrupt:
|
|
self._break_line()
|
|
exit(-1)
|
|
|
|
def get_interventions(self) -> QuerySet:
|
|
"""
|
|
Getter for interventions, defined by parameter 'intervention-ids'
|
|
|
|
Returns:
|
|
interventions (QuerySet): The interventions
|
|
"""
|
|
interventions = Intervention.objects.filter(
|
|
id__in=self.intervention_ids,
|
|
)
|
|
self._write_success(f"... Found {interventions.count()} interventions")
|
|
return interventions
|
|
|
|
def process_egon_sending(self, interventions: QuerySet):
|
|
for intervention in interventions:
|
|
intervention.send_data_to_egon()
|
|
self._write_warning(f"... {intervention.identifier} has been sent to EGON (if it has payments)")
|