Source code for zooma.tasks
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Oct 25 11:27:52 2018
@author: Paolo Cozzi <cozzi@ibba.cnr.it>
"""
from celery import group
from celery.utils.log import get_task_logger
from common.tasks import BaseTask, NotifyAdminTaskMixin, exclusive_task
from image.celery import app as celery_app
from uid.models import (
DictCountry, DictBreed, DictSpecie, DictUberon, DictDevelStage,
DictPhysioStage)
from .helpers import (
annotate_country, annotate_breed, annotate_specie, annotate_organismpart,
annotate_develstage, annotate_physiostage)
# Module-level logger, obtained through celery's task logging helper and
# shared by every task defined in this module
logger = get_task_logger(__name__)
class AnnotateTaskMixin(NotifyAdminTaskMixin):
    """Shared ``run`` implementation for the per-table annotation tasks.

    Concrete tasks are expected to override the class attributes below:

    * ``name``: human readable task name (lower-cased in log messages)
    * ``description``: short text describing the task
    * ``model``: class exposing ``objects.filter(term__isnull=True)``
      (presumably a Django model from ``uid.models`` -- confirm)
    * ``annotate_func``: callable invoked with a single record; subclasses
      wrap it in ``staticmethod`` so it is not bound to the task instance
    """

    name = None
    description = None
    model = None
    annotate_func = None

    def run(self):
        """Annotate every ``model`` record whose ``term`` is still null.

        This function is called when delay is called.

        Returns:
            str: ``"success"`` once all selected records have been processed
        """

        task_label = self.name.lower()

        # let the logging framework interpolate the message lazily
        logger.debug("Starting %s", task_label)

        # get all records of the configured model without a term
        for term in self.model.objects.filter(term__isnull=True):
            self.annotate_func(term)

        logger.debug("%s completed", task_label)

        return "success"
class AnnotateCountries(AnnotateTaskMixin, BaseTask):
    """Task annotating ``DictCountry`` records through ``annotate_country``."""

    # task metadata
    name = "Annotate Countries"
    description = "Annotate countries with ontologies using Zooma tools"

    # records to process and the helper applied to each of them
    model = DictCountry
    annotate_func = staticmethod(annotate_country)

    @exclusive_task(
        lock_id="AnnotateCountries",
        task_name="Annotate Countries")
    def run(self):
        """Run the mixin implementation under the ``exclusive_task`` wrapper.

        Returns:
            str: whatever the wrapped ``AnnotateTaskMixin.run`` call returns
        """

        return super().run()
class AnnotateBreeds(AnnotateTaskMixin, BaseTask):
    """Task annotating ``DictBreed`` records through ``annotate_breed``."""

    # task metadata
    name = "Annotate Breeds"
    description = "Annotate breeds with ontologies using Zooma tools"

    # records to process and the helper applied to each of them
    model = DictBreed
    annotate_func = staticmethod(annotate_breed)

    @exclusive_task(
        lock_id="AnnotateBreeds",
        task_name="Annotate Breeds")
    def run(self):
        """Run the mixin implementation under the ``exclusive_task`` wrapper.

        Returns:
            str: whatever the wrapped ``AnnotateTaskMixin.run`` call returns
        """

        return super().run()
class AnnotateSpecies(AnnotateTaskMixin, BaseTask):
    """Task annotating ``DictSpecie`` records through ``annotate_specie``."""

    # task metadata
    name = "Annotate Species"
    description = "Annotate species with ontologies using Zooma tools"

    # records to process and the helper applied to each of them
    model = DictSpecie
    annotate_func = staticmethod(annotate_specie)

    @exclusive_task(
        lock_id="AnnotateSpecies",
        task_name="Annotate Species")
    def run(self):
        """Run the mixin implementation under the ``exclusive_task`` wrapper.

        Returns:
            str: whatever the wrapped ``AnnotateTaskMixin.run`` call returns
        """

        return super().run()
class AnnotateOrganismPart(AnnotateTaskMixin, BaseTask):
    """Task annotating ``DictUberon`` records via ``annotate_organismpart``."""

    # task metadata
    name = "Annotate OrganismPart"
    description = (
        "Annotate organism parts "
        "with ontologies using Zooma tools")

    # NOTE(review): this attribute is not referenced anywhere in this module
    # and differs from the lock_id given to the decorator below
    # ("AnnotateUberon") -- confirm which identifier is the intended one
    lock_id = "AnnotateOrganismPart"

    # records to process and the helper applied to each of them
    model = DictUberon
    annotate_func = staticmethod(annotate_organismpart)

    @exclusive_task(
        lock_id="AnnotateUberon",
        task_name="Annotate Uberon")
    def run(self):
        """Run the mixin implementation under the ``exclusive_task`` wrapper.

        Returns:
            str: whatever the wrapped ``AnnotateTaskMixin.run`` call returns
        """

        return super().run()
class AnnotateDevelStage(AnnotateTaskMixin, BaseTask):
    """Task annotating ``DictDevelStage`` records via ``annotate_develstage``.
    """

    # task metadata
    name = "Annotate DevelStage"
    description = (
        "Annotate developmental stages "
        "with ontologies using Zooma tools")

    # records to process and the helper applied to each of them
    model = DictDevelStage
    annotate_func = staticmethod(annotate_develstage)

    @exclusive_task(
        lock_id="AnnotateDevelStage",
        task_name="Annotate DevelStage")
    def run(self):
        """Run the mixin implementation under the ``exclusive_task`` wrapper.

        Returns:
            str: whatever the wrapped ``AnnotateTaskMixin.run`` call returns
        """

        return super().run()
class AnnotatePhysioStage(AnnotateTaskMixin, BaseTask):
    """Task annotating ``DictPhysioStage`` records via
    ``annotate_physiostage``.
    """

    # task metadata
    name = "Annotate PhysioStage"
    description = (
        "Annotate physiological stages "
        "with ontologies using Zooma tools")

    # records to process and the helper applied to each of them
    model = DictPhysioStage
    annotate_func = staticmethod(annotate_physiostage)

    @exclusive_task(
        lock_id="AnnotatePhysioStage",
        task_name="Annotate PhysioStage")
    def run(self):
        """Run the mixin implementation under the ``exclusive_task`` wrapper.

        Returns:
            str: whatever the wrapped ``AnnotateTaskMixin.run`` call returns
        """

        return super().run()
class AnnotateAll(BaseTask):
    """Launch all the dictionary annotation tasks as one celery group."""

    name = "Annotate All"
    description = "Annotate all dict tables using Zooma"

    @exclusive_task(lock_id="AnnotateAll", task_name="Annotate All")
    def run(self):
        """
        Entry point executed when the task is started with delay. It will
        acquire a lock in redis, so those tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if task is
            already running or exception is caught"""

        # debugging instance
        self.debug_task()

        # one instance for each annotation task, in dispatch order
        task_instances = [
            AnnotateCountries(),
            AnnotateBreeds(),
            AnnotateSpecies(),
            AnnotateOrganismPart(),
            AnnotateDevelStage(),
            AnnotatePhysioStage(),
        ]

        # build a signature for each task and collect them in a group
        signatures = [instance.s() for instance in task_instances]
        annotate_group = group(signatures)

        logger.debug("Starting task %s" % (annotate_group))

        # start the group task - shortcut for apply_async
        group_result = annotate_group.delay()

        logger.debug(group_result)

        # the group result is only logged: this task does not wait for it
        return "success"
# --- task registering

# every task class of this module is registered explicitly, see
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
for _task_class in (
        AnnotateCountries,
        AnnotateBreeds,
        AnnotateSpecies,
        AnnotateOrganismPart,
        AnnotateDevelStage,
        AnnotatePhysioStage,
        AnnotateAll):
    celery_app.tasks.register(_task_class)