Source code for common.tasks

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Jan 15 16:42:24 2019

@author: Paolo Cozzi <cozzi@ibba.cnr.it>
"""

import redis
import traceback

from contextlib import contextmanager
from celery.five import monotonic
from celery.utils.log import get_task_logger

from django.conf import settings
from django.core import management

from image.celery import app as celery_app

from .helpers import send_mail_to_admins

# Lock expires in 10 minutes
LOCK_EXPIRE = 60 * 10

# Get an instance of a logger
logger = get_task_logger(__name__)


[docs]class BaseTask(celery_app.Task): """Base class to celery tasks. Define logs for on_failure and debug_task""" name = None description = None action = None
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if self.name is None: self.name = str(self.__class__) if self.action is None: self.action = str(self.__class__)
[docs] def on_failure(self, exc, task_id, args, kwargs, einfo): logger.error('{0!r} failed: {1!r}'.format(task_id, exc))
[docs] def debug_task(self): # this doesn't throw an error when debugging a task called with run() if self.request_stack: logger.debug('Request: {0!r}'.format(self.request))
[docs]class NotifyAdminTaskMixin(): """A mixin to send error message to admins""" action = None
[docs] def on_failure(self, exc, task_id, args, kwargs, einfo): """Override the default on_failure method""" # call base class super().on_failure(exc, task_id, args, kwargs, einfo) # get exception info einfo = traceback.format_exc() subject = "Error in %s" % (self.action) body = str(einfo) send_mail_to_admins(subject, body)
# https://stackoverflow.com/a/51429597 @celery_app.task(bind=True, base=BaseTask) def clearsessions(self): """Cleanup expired sessions by using Django management command.""" logger.info("Clearing session with celery...") # debugging instance self.debug_task() # calling management command management.call_command("clearsessions", verbosity=1) # debug logger.info("Sessions cleaned!") return "Sessions cleaned with success" @celery_app.task(bind=True, base=BaseTask) def cleanupregistration(self): """Cleanup expired registration keys by using Django management command.""" logger.info("Cleaning up expired registration keys with celery...") # debugging instance self.debug_task() # calling management command management.call_command("cleanupregistration", verbosity=1) # debug logger.info("Registrations cleaned!") return "Registrations cleaned with success"
[docs]@contextmanager def redis_lock(lock_id, blocking=False, expire=True): """ This function get a lock relying on a lock name and other status. You can describe more process using the same lock name and give exclusive access to one of them. Args: lock_id (str): the name of the lock to take blocking (bool): if True, we wait until we have the block, if False we returns immediately False expire (bool): if True, lock will expire after LOCK_EXPIRE timeout, if False, it will persist until lock is released Returns: bool: True if lock acquired, False otherwise """ # read parameters from settings REDIS_CLIENT = redis.StrictRedis( host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DB) # this will be the redis lock lock = None # timeout for the lock (if expire condition) timeout_at = monotonic() + LOCK_EXPIRE - 3 if expire: lock = REDIS_CLIENT.lock(lock_id, timeout=LOCK_EXPIRE) else: lock = REDIS_CLIENT.lock(lock_id, timeout=None) status = lock.acquire(blocking=blocking) try: logger.debug("lock %s acquired is: %s" % (lock_id, status)) yield status finally: # we take advantage of using add() for atomic locking # don't release the lock if we didn't acquire it if status and ((monotonic() < timeout_at and expire) or not expire): logger.debug("Releasing lock %s" % lock_id) # don't release the lock if we exceeded the timeout # to lessen the chance of releasing an expired lock # owned by someone else # if no timeout and lock is taken, release it lock.release()
[docs]class exclusive_task(object): """A class decorator to execute an exclusive task by decorating celery.tasks.Task.run (run this task once, others task calls will return already running message without calling task or will wait until other tasks of this type are completed) Args: task_name (str): task name used for debug lock_id (str): the task lock id blocking (bool): set task as blocking (wait until no other tasks are running. def. False) lock_expire (bool): define if lock will expire or not after a certain time (def. False) """
[docs] def __init__(self, task_name, lock_id, blocking=False, block_expire=False): """ If there are decorator arguments, the function to be decorated is not passed to the constructor! """ logger.debug("Setting up ExclusiveTaskDecorator") self.task_name = task_name self.lock_id = lock_id self.blocking = blocking self.block_expire = block_expire
def __call__(self, f): """ If there are decorator arguments, __call__() is only called once, as part of the decoration process! You can only give it a single argument, which is the function object. """ logger.debug("Decorating function") def wrapped_f(*args, **kwargs): with redis_lock( self.lock_id, self.blocking, self.block_expire) as acquired: if acquired: logger.debug("lock %s acquired" % self.lock_id) # do stuff and return something result = f(*args, **kwargs) logger.debug("lock %s released" % self.lock_id) else: # warn user and return a default message result = "%s already running!" % (self.task_name) logger.warning(result) return result return wrapped_f