#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Jul 18 14:14:06 2019
@author: Paolo Cozzi <cozzi@ibba.cnr.it>
"""
import redis
import traceback
import pyUSIrest.usi
import pyUSIrest.exceptions
from celery import chord
from celery.utils.log import get_task_logger
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django.db.models import Count, F
from django.db.models.functions import Least
from django.utils import timezone
from django.template.defaultfilters import truncatechars
from common.constants import (
ERROR, READY, SUBMITTED, COMPLETED, EMAIL_MAX_BODY_SIZE)
from common.tasks import BaseTask, NotifyAdminTaskMixin
from image.celery import app as celery_app
from submissions.tasks import SubmissionTaskMixin
from ..helpers import get_auth
from ..models import (
Submission as USISubmission, SubmissionData as USISubmissionData)
# Module-level celery task logger, shared by the helpers and tasks below
logger = get_task_logger(__name__)
# Soft limit on the number of records (animals and samples) in a single
# biosample.models.Submission batch. SplitSubmissionHelper starts a new
# batch only when an animal is added and the limit is already reached, so
# a batch can exceed this value in order to keep an animal and its samples
# together.
MAX_SAMPLES = 100
class SubmissionError(Exception):
    """Raised when a biosample (USI) submission cannot be processed, e.g.
    a recovered submission is no longer editable or a model appears in
    more than one opened submission."""
# HINT: move into helper module?
class SubmissionHelper:
    """
    An helper class for submission task, used to deal with pyUSIrest.

    It wraps a :py:class:`biosample.models.Submission` object (a batch of
    animals/samples) and the USI submission document it is sent to.
    """

    def __init__(self, submission_id):
        """
        Args:
            submission_id (int): primary key of a
                :py:class:`biosample.models.Submission` object
        """
        self.submission_id = submission_id

        # pyUSIrest objects, set by read_token()
        self.token = None
        self.auth = None
        self.root = None

        # the USI submission document, set by start_submission()
        self.usi_submission = None

        # samples already present in the USI submission, keyed by alias
        self.submitted_samples = {}

        # the database submission object this helper works on
        # HINT: should I check my status?
        self.submission_obj = USISubmission.objects.get(
            pk=self.submission_id)

    @property
    def owner(self):
        """Recover owner from a submission object related with a UID
        Submission

        Returns:
            :py:attr:`Submission.owner`: a django
            :py:class:`django.contrib.auth.models.User` object
        """
        return self.submission_obj.uid_submission.owner

    @property
    def team_name(self):
        """Recover team_name from a submission object

        Returns:
            str: the team name"""
        return self.owner.biosample_account.team.name

    @property
    def usi_submission_name(self):
        """Get/set biosample submission id from database. Setting the
        value saves the submission object immediately.

        Returns:
            str: the biosample USI submission identifier"""
        return self.submission_obj.usi_submission_name

    @usi_submission_name.setter
    def usi_submission_name(self, name):
        self.submission_obj.usi_submission_name = name
        self.submission_obj.save()

    def read_token(self):
        """Read token from REDIS database and set the ``auth`` and ``root``
        attributes (``root`` is a pyUSIrest.usi.Root instance)

        Returns:
            str: the read token

        Raises:
            SubmissionError: if no token is stored in redis for this
                submission and owner
        """
        client = redis.StrictRedis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            db=settings.REDIS_DB)

        # infer key from submission data
        key = "token:submission:{submission_id}:{user}".format(
            submission_id=self.submission_obj.uid_submission.id,
            user=self.owner)

        logger.debug("Reading token for '%s'", self.owner)

        # redis returns None for a missing (or expired) key: fail with a
        # clear message instead of an AttributeError on None.decode
        raw_token = client.get(key)

        if raw_token is None:
            raise SubmissionError(
                "Cannot find a token for '%s' (key '%s'): it may be "
                "expired or never stored" % (self.owner, key))

        self.token = raw_token.decode("utf8")

        # get a root object with auth
        self.auth = get_auth(token=self.token)

        logger.debug("getting biosample root")
        self.root = pyUSIrest.usi.Root(auth=self.auth)

        return self.token

    def start_submission(self):
        """
        Get a USI submission document. Recover submission if possible,
        create a new one if not defined. If recovered submission is
        closed, raise an error

        Returns:
            :py:class:`pyUSIrest.usi.Submission`: the USI submission
        """
        if not self.recover_submission():
            self.create_submission()

        return self.usi_submission

    def recover_submission(self):
        """Try to recover the USI submission document whose name is stored
        in the database, and read the samples it already contains.

        Returns:
            bool: False if no USI submission name is stored, True if the
            submission was recovered

        Raises:
            SubmissionError: if the recovered submission status is not
                "Draft"
        """
        if not self.usi_submission_name:
            return False

        logger.info(
            "Recovering submission %s for team %s",
            self.usi_submission_name,
            self.team_name)

        # get the same submission object
        self.usi_submission = self.root.get_submission_by_name(
            submission_name=self.usi_submission_name)

        # check that a submission is still editable
        if self.usi_submission.status != "Draft":
            raise SubmissionError(
                "Cannot recover submission '%s': current status is '%s'" % (
                    self.usi_submission_name,
                    self.usi_submission.status))

        # read already submitted samples
        self.read_samples()

        return True

    def create_submission(self):
        """Create a new USI submission object and store its name in the
        database

        Returns:
            :py:class:`pyUSIrest.usi.Submission` a pyUSIrest submission
            object
        """
        logger.debug("getting team '%s'", self.team_name)
        team = self.root.get_team_by_name(self.team_name)

        logger.info("Creating a new submission for '%s'", team.name)
        self.usi_submission = team.create_submission()

        # track submission name in table (the setter saves it)
        self.usi_submission_name = self.usi_submission.name

        return self.usi_submission

    def read_samples(self):
        """Read sample in a USI submission document and set submitted_samples
        attribute

        Returns:
            dict: the submitted samples, keyed by alias
        """
        logger.debug("Getting info on samples...")

        # count explicitly: a submission may have no samples yet, in which
        # case an enumerate() loop variable would never be bound
        n_samples = 0

        for n_samples, sample in enumerate(
                self.usi_submission.get_samples(), start=1):
            self.submitted_samples[sample.alias] = sample

        logger.debug("Got %s samples", n_samples)

        return self.submitted_samples

    def create_or_update_sample(self, model):
        """Add or patch a sample into USI submission document. Can be
        animal or sample. The model is then marked as SUBMITTED.

        Args:
            model (:py:class:`uid.mixins.BioSampleMixin`): An animal or
                sample object"""
        # alias is used to reference the same objects
        alias = model.biosample_alias

        if alias in self.submitted_samples:
            logger.info("Patching %s", alias)
            sample = self.submitted_samples[alias]
            sample.patch(model.to_biosample())

        else:
            sample = self.usi_submission.create_sample(
                model.to_biosample())
            self.submitted_samples[alias] = sample

        # update sample status
        model.status = SUBMITTED
        model.last_submitted = timezone.now()
        model.save()

    def add_samples(self):
        """Iterate over sample data (animal/sample) ordered by id and call
        create_or_update_sample (if model is in READY state)"""
        for submission_data in self.submission_obj.submission_data\
                .order_by('id'):
            model = submission_data.content_object

            if model.status == READY:
                logger.debug(
                    "Adding %s %s to submission %s",
                    model._meta.verbose_name,
                    model,
                    self.usi_submission_name)
                self.create_or_update_sample(model)

            else:
                logger.debug(
                    "Ignoring %s %s: current status is %s",
                    model._meta.verbose_name,
                    model,
                    model.get_status_display())

    def mark_submission(self, status, message):
        """Set status and message for :py:class:`biosample.models.Submission`
        and save it

        Args:
            status (int): a status from :py:mod:`common.constants`
            message (str): a message to store in the submission object
        """
        self.submission_obj.status = status
        self.submission_obj.message = message
        self.submission_obj.save()

    def mark_fail(self, message):
        """Set a :py:const:`ERROR <common.constants.ERROR>` status for
        :py:class:`biosample.models.Submission` and a message"""
        self.mark_submission(ERROR, message)

    def mark_success(self, message="Waiting for biosample validation"):
        """Set a :py:const:`SUBMITTED <common.constants.SUBMITTED>`
        :py:class:`biosample.models.Submission` and a message"""
        self.mark_submission(SUBMITTED, message)
class SubmitTask(NotifyAdminTaskMixin, BaseTask):
    name = "Submit to Biosample"
    description = """Submit to Biosample using USI"""

    def run(self, usi_submission_id):
        """Submit one :py:class:`biosample.models.Submission` batch.

        A SubmissionHelper is built for the given id. It reads the token,
        starts (or recovers) the USI submission, adds the samples and
        records the outcome as the submission status:

        * USI connection errors and expired tokens leave the batch READY,
          with a message asking the user to retry;
        * any other error marks the batch as ERROR, storing the traceback.

        Returns:
            tuple: ``("success", usi_submission_id)`` in every case
        """
        helper = SubmissionHelper(submission_id=usi_submission_id)

        # No retries, we expect always success
        try:
            helper.read_token()
            helper.start_submission()
            helper.add_samples()
            helper.mark_success()

        except pyUSIrest.exceptions.USIConnectionError as exc:
            self._mark_temporary_failure(
                helper,
                exc,
                "Errors in EBI API endpoints. Please try again later")

        # TODO: should I rename this exception with a more informative name
        # when token expires during a submission?
        except pyUSIrest.exceptions.TokenExpiredError as exc:
            self._mark_temporary_failure(
                helper,
                exc,
                "Your token is expired: please submit again to resume "
                "submission")

        except Exception as exc:
            logger.error("Unmanaged error: %s" % exc)
            # the full traceback becomes the submission message
            helper.mark_fail(traceback.format_exc())

        return "success", usi_submission_id

    @staticmethod
    def _mark_temporary_failure(helper, exc, message):
        """Log a recoverable error and put the batch back in READY state
        with a user-facing message"""
        logger.error("Error in biosample submission: %s" % exc)
        logger.error(message)
        helper.mark_submission(READY, message)
# HINT: move into helper module?
class SplitSubmissionHelper:
    """
    Helper class to split :py:class:`uid.models.Submission` data in
    batches (:py:class:`biosample.models.Submission` objects) limited in
    size"""

    def __init__(self, uid_submission):
        # number of records added to the current batch
        self.counter = 0
        self.uid_submission = uid_submission
        # the batch currently being filled
        self.usi_submission = None
        # pks of every batch that needs to be submitted
        self.submission_ids = []

    def process_data(self):
        """Add animals and their samples to submission batches"""
        # Submit first animals without parents, then animals ordered by
        # their lowest parent foreign key: when uploading a child, its
        # parents need to be defined, so they are supposed to have a lower
        # id. The postgres LEAST function returns the lowest of its
        # arguments; ordering explicitly with F allows the NULLS FIRST
        # condition (animals without parents)
        animals = self.uid_submission.animal_set.annotate(
            least=Least('father_id', 'mother_id')).order_by(
            F('least').asc(nulls_first=True), F('id'))

        for animal in animals:
            # not READY models are ignored by process_model
            self.process_model(animal)

            # add its specimens right after the animal
            for sample in animal.sample_set.all():
                self.process_model(sample)

        # are there orphaned samples (a submission with only samples)?
        # Samples already added above are skipped by model_in_submission
        for sample in self.uid_submission.sample_set.all():
            self.process_model(sample)

    def process_model(self, model):
        """Add a model to a biosample submission batch. Ignore a model if
        status is not READY"""
        if model.status == READY:
            self.add_to_submission_data(model)

        else:
            # already submitted, so could be ignored
            logger.debug("Ignoring %s %s", model._meta.verbose_name, model)

    def create_submission(self):
        """
        Create a new :py:class:`biosample.models.Submission` object in READY
        state, track its pk and set sample counter to 0"""
        self.usi_submission = USISubmission.objects.create(
            uid_submission=self.uid_submission,
            status=READY)

        # reload the object from the database, then track its pk
        self.usi_submission.refresh_from_db()
        self.submission_ids.append(self.usi_submission.id)

        logger.debug("Created submission %s", self.usi_submission)

        # reset counter
        self.counter = 0

    def model_in_submission(self, model):
        """
        Check if :py:class:`uid.mixins.BioSampleMixin` is already in an
        opened (not COMPLETED) submission. If so, that submission is
        tracked for submission and its status reset to READY.

        Returns:
            bool: True if the model is already in an opened submission

        Raises:
            SubmissionError: if the model is in more than one opened
                submission
        """
        model_type = model._meta.verbose_name

        logger.debug("Searching %s %s in submissions", model_type, model)

        ct = ContentType.objects.get_for_model(model)

        # keep only data belonging to submissions not yet COMPLETED
        data_qs = USISubmissionData.objects.filter(
            content_type=ct,
            object_id=model.id).exclude(
            submission__status__in=[COMPLETED])

        # fetch at most two rows: enough to tell "none", "one" and "more
        # than one" with a single query
        found = list(data_qs.select_related('submission')[:2])

        if len(found) > 1:
            raise SubmissionError(
                "More than one submission opened for %s %s" % (
                    model_type,
                    model))

        if not found:
            # no sample in data. I could append model into submission
            logger.debug("No %s %s in submission data", model_type, model)
            return False

        usi_submission = found[0].submission

        logger.debug(
            "Found %s %s in %s", model_type, model, usi_submission)

        # mark this batch to be called like it was created
        if usi_submission.id not in self.submission_ids:
            self.submission_ids.append(usi_submission.id)

            logger.debug("Reset status for submission %s", usi_submission)
            usi_submission.status = READY
            usi_submission.save()

        return True

    def add_to_submission_data(self, model):
        """Add a :py:class:`uid.mixins.BioSampleMixin` to a
        :py:class:`biosample.models.Submission` object, or create a new
        one if there are more samples than required"""
        # get model type (animal or sample)
        model_type = model._meta.verbose_name

        # check if model is already in an opened submission
        if self.model_in_submission(model):
            logger.debug(
                "Ignoring %s %s: already in a submission", model_type, model)
            return

        # Create a new submission if necessary
        if self.usi_submission is None:
            self.create_submission()

        # every time I split data in chunks I need to call the
        # submission task. Do it only on animals, to prevent
        # to put samples in a different submission
        if model_type == 'animal' and self.counter >= MAX_SAMPLES:
            self.create_submission()

        logger.info(
            "Appending %s %s to %s", model_type, model, self.usi_submission)

        # add object to submission data and updating counter
        USISubmissionData.objects.create(
            submission=self.usi_submission,
            content_object=model)

        self.counter += 1

        # Raise the counter stored in the database (done in SQL via F())
        self.usi_submission.samples_count = F('samples_count') + 1
        self.usi_submission.save()
class SplitSubmissionTask(
        SubmissionTaskMixin, NotifyAdminTaskMixin, BaseTask):
    """Split submission data in chunks in order to submit data through
    multiple tasks/processes and with smaller submissions"""

    name = "Split submission data"
    description = """Split submission data in chunks"""
    action = "biosample submission"

    def run(self, submission_id):
        """Split a :py:class:`uid.models.Submission` into batches with
        :py:class:`SplitSubmissionHelper`. Then launch a celery chord made of
        one :py:class:`SubmitTask` per batch, with
        :py:class:`SubmissionCompleteTask` as the callback run after all
        batches were submitted.

        Returns:
            str: "success"
        """
        logger.info("Starting %s for submission %s" % (
            self.name, submission_id))

        uid_submission = self.get_uid_submission(submission_id)

        # create the batch objects in the database
        splitter = SplitSubmissionHelper(uid_submission)
        splitter.process_data()

        # one SubmitTask signature for each batch...
        submit_task = SubmitTask()
        batch_signatures = [
            submit_task.s(batch_id) for batch_id in splitter.submission_ids]

        logger.debug("Preparing chord for %s tasks" % len(batch_signatures))

        # ...and a callback executed only after all of them
        on_complete = SubmissionCompleteTask().s(
            uid_submission_id=submission_id)
        chord_result = chord(batch_signatures)(on_complete)

        logger.info(
            "Start submission chord process for %s with task %s" % (
                uid_submission, chord_result.task_id))

        logger.info("%s completed" % self.name)

        return "success"
class SubmissionCompleteTask(
        SubmissionTaskMixin, NotifyAdminTaskMixin, BaseTask):
    """Update submission status after batch submission"""

    name = "Complete Submission Process"
    description = """Check submission status and update stuff"""
    action = "biosample submission"

    def run(self, *args, **kwargs):
        """Fetch submission data and then update
        :py:class:`uid.models.Submission` status

        Args:
            *args: ``args[0]`` is the list of :py:class:`SubmitTask`
                results, one ``("success", usi_submission_id)`` tuple for
                each batch
            **kwargs: must contain ``uid_submission_id``, the primary key
                of the :py:class:`uid.models.Submission` to update

        Returns:
            str: "success"
        """
        # those are the output of SubmitTask, as a tuple of
        # "success" and biosample.model.Submission.pk
        submission_statuses = args[0]

        uid_submission = self.get_uid_submission(kwargs['uid_submission_id'])

        # an empty list means that no batch was submitted, for example when
        # submitting a uid submission with no data: mark it as ERROR
        if not submission_statuses:
            message = "Submission %s is empty!" % uid_submission
            logger.warning(message)

            # update submission status. No more queries on this
            self.update_submission_status(
                uid_submission, ERROR, message)

            return "success"

        # submission_statuses will be a list like this
        # [("success", 1), ("success", 2)]
        usi_submission_ids = [status[1] for status in submission_statuses]

        submission_qs = USISubmission.objects.filter(
            pk__in=usi_submission_ids)

        # count batches by status. order_by() clears any default model
        # ordering, which Django would otherwise add to the GROUP BY
        statuses = {
            res['status']: res['count']
            for res in submission_qs.values('status').annotate(
                count=Count('status')).order_by()}

        # check for errors in submission. Those are statuses set by
        # SubmitTask
        if ERROR in statuses:
            # submission failed
            logger.info("Submission %s failed", uid_submission)

            self.update_message(uid_submission, submission_qs, ERROR)

            # send a mail to the user
            uid_submission.owner.email_user(
                "Error in biosample submission %s" % (
                    uid_submission.id),
                ("Something goes wrong with biosample submission. Please "
                 "report this to InjectTool team\n\n"
                 "%s" % truncatechars(
                     uid_submission.message, EMAIL_MAX_BODY_SIZE)),
            )

        elif READY in statuses:
            # temporary failure: SubmitTask leaves a batch READY when the
            # EBI endpoints fail or the token expires
            logger.info("Temporary error for %s", uid_submission)

            self.update_message(uid_submission, submission_qs, READY)

            # send a mail to the user
            uid_submission.owner.email_user(
                "Temporary error in biosample submission %s" % (
                    uid_submission.id),
                ("Something goes wrong with biosample submission. Please "
                 "try again\n\n"
                 "%s" % truncatechars(
                     uid_submission.message, EMAIL_MAX_BODY_SIZE)),
            )

        else:
            # Update submission status: a completed but not yet finalized
            # submission
            logger.info("Submission %s success", uid_submission)
            self.update_message(uid_submission, submission_qs, SUBMITTED)

        return "success"

    def update_message(self, uid_submission, submission_qs, status):
        """Read :py:class:`biosample.models.Submission` messages for batches
        in ``status`` and set them, deduplicated, as
        :py:class:`uid.models.Submission` message together with ``status``

        Args:
            uid_submission (:py:class:`uid.models.Submission`): the
                submission to update
            submission_qs (QuerySet): the batches of this submission
            status (int): the status to filter batches and to set
        """
        # skip empty messages (a None would break str.join). Deduplicate
        # with dict.fromkeys to keep a stable pk order, where set() gave a
        # different line order on each run
        messages = [
            submission.message
            for submission in submission_qs.filter(
                status=status).order_by('pk')
            if submission.message]

        self.update_submission_status(
            uid_submission, status, "\n".join(dict.fromkeys(messages)))
# Register the class-based tasks explicitly with the celery app; see the
# linked celery issue for why this is needed
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
celery_app.tasks.register(SubmitTask)
celery_app.tasks.register(SplitSubmissionTask)
celery_app.tasks.register(SubmissionCompleteTask)