Source code for common.helpers
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Mar 27 12:50:16 2019
@author: Paolo Cozzi <cozzi@ibba.cnr.it>
"""
import re
import logging
import datetime
import websockets
import time
import json
from dateutil.relativedelta import relativedelta
from django.conf import settings
from django.contrib.admin.utils import NestedObjects
from django.core.mail import send_mass_mail
from django.db import DEFAULT_DB_ALIAS
from django.utils.text import capfirst
from django.utils.encoding import force_text
from .constants import YEARS, MONTHS, DAYS, OBO_URL, TIME_UNITS
# Get an instance of a logger
logger = logging.getLogger(__name__)
[docs]def image_timedelta(t1, t2):
"""A function to deal with image time intervals. Returns a number and
time unit"""
if t1 is None or t2 is None:
logger.warning("One date is NULL ({0}, {1}) ignoring".format(t2, t1))
return None, YEARS
if type(t1) != datetime.date or type(t2) != datetime.date:
raise ValueError(
"Expecting a datetime value (got '%s':%s and '%s':%s)" % (
t1, type(t1), t2, type(t2)))
if t2 > t1:
logger.warning("t2>t1 ({0}, {1}) ignoring".format(t2, t1))
return None, YEARS
# check for meaningful intervald
if t1.year == 1900 or t2.year == 1900:
logger.warning("Ignoring one date ({0}, {1})".format(t2, t1))
return None, YEARS
rdelta = relativedelta(t1, t2)
if rdelta.years != 0:
return rdelta.years, YEARS
elif rdelta.months != 0:
return rdelta.months, MONTHS
else:
return rdelta.days, DAYS
PATTERN_INTERVAL = re.compile(r"([\d]+) ([\w]+s?)")
[docs]def parse_image_timedelta(interval):
"""A function to parse from a image_timdelta string"""
# cast interval as string
interval = str(interval)
match = re.search(PATTERN_INTERVAL, interval)
# get parsed data
try:
value, units = match.groups()
except AttributeError:
raise ValueError("Expecting a value like '1 year', '10 months', etc")
# time units are plural in database
if units[-1] != 's':
units += 's'
# get time units from database
units = TIME_UNITS.get_value_by_desc(units)
return int(value), units
# https://stackoverflow.com/a/39533619/4385116
# inspired django.contrib.admin.utils.get_deleted_objects, this function
# tries to determine all related objects starting from a provied one
# HINT: similar function at https://gist.github.com/nealtodd/4594575
[docs]def get_deleted_objects(objs, db_alias=DEFAULT_DB_ALIAS):
# NestedObjects is an imporovement of django.db.models.deletion.Collector
collector = NestedObjects(using=db_alias)
collector.collect(objs)
def format_callback(obj):
opts = obj._meta
no_edit_link = '%s: %s' % (capfirst(opts.verbose_name),
force_text(obj))
return no_edit_link
to_delete = collector.nested(format_callback)
protected = [format_callback(obj) for obj in collector.protected]
model_count = {
model._meta.verbose_name_plural:
len(objs) for model, objs in collector.model_objs.items()}
return to_delete, model_count, protected
[docs]async def send_message_to_websocket(message, pk):
"""
Function will create websocket object and send message to django-channels
Args:
message (dict): message to send to websocket
pk (str): primary key of submission
"""
# Need to have it here as in case with small test data message sent to
# websocket will overcome response from server
time.sleep(3)
async with websockets.connect(
'ws://asgi:8001/ws/submissions/{}/'.format(pk)) as websocket:
await websocket.send(json.dumps(message))
[docs]def get_admin_emails():
"""Return admin email from image.settings"""
ADMINS = settings.ADMINS
# return all admin mail addresses
return [admin[1] for admin in ADMINS]
[docs]def send_mail_to_admins(
email_subject, email_message,
default_from=settings.DEFAULT_FROM_EMAIL,
addresses=get_admin_emails()):
"""Send a mail to all admins
Args:
email_subject (str): the email subject
email_message (str): the body of the email
default_from (str): the from field of the email
addresses (list): a list of email addresses
"""
# submit mail to admins
datatuple = (
email_subject,
email_message,
default_from,
addresses)
send_mass_mail((datatuple, ))
[docs]def uid2biosample(value):
"""Convert human-readable name to model field"""
if value == 'Sample storage':
return 'storage'
elif value == 'Sample storage processing':
return 'storage_processing'
elif value == 'Sampling to preparation interval':
return 'preparation_interval_units'
elif value == 'Specimen collection protocol':
return 'protocol'
return '_'.join(value.lower().split(" "))