Merge pull request #1520 from AlexsLemonade/dev
[HOTFIX] Deploy old salmon version rerunning! (and others)
kurtwheeler authored Aug 21, 2019
2 parents a790dbc + 2ec0000 commit f10da1e
Showing 25 changed files with 635 additions and 244 deletions.
31 changes: 31 additions & 0 deletions api/data_refinery_api/management/commands/update_es_index.py
@@ -0,0 +1,31 @@
import datetime

from django.core.management.base import BaseCommand
from django_elasticsearch_dsl.registries import registry
from django.utils import timezone


# We update everything modified within the past 30 minutes, running every 20 minutes.
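# Because the window (30 minutes) is longer than the cadence (20 minutes),
# consecutive runs overlap by 10 minutes: an object modified just before one
# run is still picked up by the next, even if that run starts a bit late.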
UPDATE_WINDOW = datetime.timedelta(minutes=30)


class Command(BaseCommand):
    help = 'Incrementally update the Elasticsearch index.'

    def handle(self, *args, **options):
        """This command is based on the 'populate' command of Django ES DSL:
        https://github.com/sabricot/django-elasticsearch-dsl/blob/f6b2e0694e4ed69826c824196ccec5863874c856/django_elasticsearch_dsl/management/commands/search_index.py#L86
        It has been modified to do incremental updates rather than
        looping over the full queryset every time.
        """
        models = set(registry.get_models())

        for doc in registry.get_documents(models):
            start_time = timezone.now() - UPDATE_WINDOW
            qs = doc().get_queryset().filter(last_modified__gt=start_time).order_by('id')
            self.stdout.write("Indexing {} '{}' objects".format(
                qs.count(), qs.model.__name__)
            )
            doc().update(qs)
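A command like this is meant to be invoked on a schedule. As a minimal sketch (the scheduler itself — cron, Celery beat, or similar — is outside this diff), the same incremental pass can be triggered from Python, assuming the file lives under management/commands/ as shown above:

from django.core.management import call_command

# Equivalent to running `python manage.py update_es_index` once;
# a scheduler would fire this every 20 minutes.
call_command('update_es_index')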
2 changes: 1 addition & 1 deletion api/data_refinery_api/serializers.py
@@ -613,7 +613,7 @@ def __init__(self, *args, **kwargs):

        if 'context' in kwargs:
            if 'request' in kwargs['context']:
-               # only inclue the fields `experiments` and `organism_samples` when the param `?details=true`
+               # only include the fields `experiments` and `organism_samples` when the param `?details=true`
                # is provided. This is used on the frontend to render the downloads page
                # thanks to https://django.cowhite.com/blog/dynamically-includeexclude-fields-to-django-rest-framwork-serializers-based-on-user-requests/
                if 'details' not in kwargs['context']['request'].query_params:
15 changes: 14 additions & 1 deletion api/data_refinery_api/views.py
@@ -939,14 +939,27 @@ def update_cache(cls, range_param):

        return data

-   EMAIL_USERNAME_BLACKLIST = ['arielsvn', 'miserlou', 'kurt.wheeler91', 'd.prasad']

+   EMAIL_USERNAME_BLACKLIST = [
+       'arielsvn',
+       'cansav09',
+       'd.prasad',
+       'daniel.himmelstein',
+       'dv.prasad991',
+       'greenescientist',
+       'jaclyn.n.taroni',
+       'kurt.wheeler91',
+       'michael.zietz',
+       'miserlou'
+   ]

    @classmethod
    def _get_dataset_stats(cls, range_param):
        """Returns stats for processed datasets"""
        filter_query = Q()
        for username in Stats.EMAIL_USERNAME_BLACKLIST:
            filter_query = filter_query | Q(email_address__startswith=username)
        filter_query = filter_query | Q(email_address__endswith='@alexslemonade.org')
        processed_datasets = Dataset.objects.filter(is_processed=True, email_address__isnull=False).exclude(filter_query)
        result = processed_datasets.aggregate(
            total=Count('id'),
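The loop above folds the whole blacklist into one OR-chain of Q objects, so the exclusion happens in a single WHERE clause rather than one query per username. With an abbreviated blacklist, the constructed filter is conceptually equivalent to this sketch (illustrative, not the repository's code):

from django.db.models import Q

# What the loop builds up, spelled out for two usernames:
Q(email_address__startswith='arielsvn') \
    | Q(email_address__startswith='miserlou') \
    | Q(email_address__endswith='@alexslemonade.org')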
123 changes: 123 additions & 0 deletions common/data_refinery_common/job_management.py
@@ -6,12 +6,16 @@
    DownloaderJob,
    DownloaderJobOriginalFileAssociation,
    OriginalFile,
+   ProcessorJob,
+   ProcessorJobOriginalFileAssociation,
)
from data_refinery_common.utils import (
    get_env_variable,
    get_env_variable_gracefully,
    get_instance_id,
)
+from data_refinery_common.job_lookup import determine_processor_pipeline, determine_ram_amount, ProcessorEnum, ProcessorPipeline, Downloaders
+from data_refinery_common.message_queue import send_job


logger = get_and_configure_logger(__name__)
@@ -141,3 +145,122 @@ def create_downloader_job(undownloaded_files: List[OriginalFile],
)

return True

# TODO: extend this list.
BLACKLISTED_EXTENSIONS = ["xml", "chp", "exp"]

def delete_if_blacklisted(original_file: OriginalFile) -> OriginalFile:
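    """Deletes the local copy of a file whose extension is blacklisted and
    returns None for it; otherwise returns `original_file` unchanged."""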
    extension = original_file.filename.split(".")[-1]
    if extension.lower() in BLACKLISTED_EXTENSIONS:
        logger.debug("Original file had a blacklisted extension of %s, skipping",
                     extension,
                     original_file=original_file.id)

        original_file.delete_local_file()
        original_file.is_downloaded = False
        original_file.save()
        return None

    return original_file

def create_processor_jobs_for_original_files(original_files: List[OriginalFile],
                                             downloader_job: DownloaderJob=None):
    """
    Creates one processor job for each original file given.
    """
    for original_file in original_files:
        sample_object = original_file.samples.first()

        if not delete_if_blacklisted(original_file):
            continue

        # Fix for: https://github.com/AlexsLemonade/refinebio/issues/968
        # Basically, we incorrectly detected technology/manufacturers
        # for many Affymetrix samples and this is a good place to fix
        # some of them.
        if original_file.is_affy_data():
            # Only Affymetrix Microarrays produce .CEL files
            sample_object.technology = 'MICROARRAY'
            sample_object.manufacturer = 'AFFYMETRIX'
            sample_object.save()

        pipeline_to_apply = determine_processor_pipeline(sample_object, original_file)

        if pipeline_to_apply == ProcessorPipeline.NONE:
            logger.info("No valid processor pipeline found to apply to sample.",
                        sample=sample_object.id,
                        original_file=original_file.id)
            original_file.delete_local_file()
            original_file.is_downloaded = False
            original_file.save()
        else:
            processor_job = ProcessorJob()
            processor_job.pipeline_applied = pipeline_to_apply.value
            processor_job.ram_amount = determine_ram_amount(sample_object, processor_job)
            processor_job.save()

            assoc = ProcessorJobOriginalFileAssociation()
            assoc.original_file = original_file
            assoc.processor_job = processor_job
            assoc.save()

            if downloader_job:
                logger.debug("Queuing processor job.",
                             processor_job=processor_job.id,
                             original_file=original_file.id,
                             downloader_job=downloader_job.id)
            else:
                logger.debug("Queuing processor job.",
                             processor_job=processor_job.id,
                             original_file=original_file.id)

            try:
                send_job(pipeline_to_apply, processor_job)
            except Exception:
                # If we cannot queue the job now the Foreman will do
                # it later.
                pass


def create_processor_job_for_original_files(original_files: List[OriginalFile],
                                            downloader_job: DownloaderJob=None):
    """
    Creates one processor job for all of the original files given, and
    queues a processor task for it.
    """

    # If there are no original files then we've created all the jobs we need to!
    if len(original_files) == 0:
        return

    # For anything that has raw data there should only be one Sample per OriginalFile
    sample_object = original_files[0].samples.first()
    pipeline_to_apply = determine_processor_pipeline(sample_object, original_files[0])

    if pipeline_to_apply == ProcessorPipeline.NONE:
        logger.info("No valid processor pipeline found to apply to sample.",
                    sample=sample_object.id,
                    original_file=original_files[0].id)
        for original_file in original_files:
            original_file.delete_local_file()
            original_file.is_downloaded = False
            original_file.save()
    else:
        processor_job = ProcessorJob()
        processor_job.pipeline_applied = pipeline_to_apply.value
        processor_job.ram_amount = determine_ram_amount(sample_object, processor_job)
        processor_job.save()
        for original_file in original_files:
            assoc = ProcessorJobOriginalFileAssociation()
            assoc.original_file = original_file
            assoc.processor_job = processor_job
            assoc.save()

        logger.debug("Queuing processor job.",
                     processor_job=processor_job.id)

        try:
            send_job(pipeline_to_apply, processor_job)
        except Exception:
            # If we cannot queue the job now the Foreman will do
            # it later.
            pass
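As a hedged sketch of a typical call site (variable names here are illustrative, not from this diff), a downloader task that has finished writing its files hands them off to one of these helpers:

# One processor job per file, for sources whose files are processed individually:
create_processor_jobs_for_original_files(downloaded_files, downloader_job)

# Or a single processor job covering all of the files:
create_processor_job_for_original_files(downloaded_files, downloader_job)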
47 changes: 35 additions & 12 deletions common/data_refinery_common/rna_seq.py
@@ -1,4 +1,5 @@
from typing import Dict, List
+from django.db.models import OuterRef, Subquery

from data_refinery_common.job_lookup import ProcessorEnum
from data_refinery_common.logging import get_and_configure_logger
@@ -33,15 +34,24 @@


def should_run_tximport(experiment: Experiment,
-                       num_quantified: int,
+                       results,
                        is_tximport_job: bool):
    """ Returns whether or not the experiment is eligible to have tximport
    run on it.
-   num_quantified is how many samples have had salmon quant run on them.
+   results is a queryset of ComputationalResults for the samples that had salmon quant run on them.
    """
+   num_quantified = results.count()
    if num_quantified == 0:
        return False

+   num_salmon_versions = results.filter(organism_index__salmon_version__isnull=False)\
+       .values_list('organism_index__salmon_version')\
+       .distinct().count()
+   if num_salmon_versions > 1:
+       # Tximport requires that all samples are processed with the same salmon version
+       # https://github.com/AlexsLemonade/refinebio/issues/1496
+       return False

    eligible_samples = experiment.samples.filter(source_database='SRA', technology='RNA-SEQ')

@@ -66,16 +76,29 @@ def should_run_tximport(experiment: Experiment,

def get_quant_results_for_experiment(experiment: Experiment):
"""Returns a list of salmon quant results from `experiment`."""
results = []
for sample in experiment.samples.all():
for result in sample.results.order_by('-created_at').all():
# TODO: this will break when we want to run for a new version.
if result.processor.name == ProcessorEnum.SALMON_QUANT.value['name']:
results.append(result)
break

return results

# Subquery to calculate quant results
# https://docs.djangoproject.com/en/2.2/ref/models/expressions/#subquery-expressions

# Calculate the computational results sorted that are associated with a given sample (
# referenced from the top query)
newest_computational_results = ComputationalResult.objects.all()\
.filter(
samples=OuterRef('id'),
processor__name=ProcessorEnum.SALMON_QUANT.value['name']
)\
.order_by('-created_at')

# Annotate each sample in the experiment with the id of the most recent computational result
computational_results_ids = experiment.samples.all().annotate(
latest_computational_result_id=Subquery(newest_computational_results.values('id')[:1])
)\
.filter(latest_computational_result_id__isnull=False)\
.values_list('latest_computational_result_id', flat=True)

# return the computational results that match those ids
return ComputationalResult.objects.all().filter(
id__in=computational_results_ids
)

def get_quant_files_for_results(results: List[ComputationalResult]):
"""Returns a list of salmon quant results from `experiment`."""
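The OuterRef/Subquery combination in get_quant_results_for_experiment above is Django's standard idiom for fetching the newest related row per object in a single query, replacing the old per-sample Python loop and its one-query-per-sample cost. A generic sketch of the pattern with hypothetical Book and Review models (illustrative names only, not refine.bio code):

from django.db.models import OuterRef, Subquery

# For each Book, find the id of its most recent Review.
newest_reviews = Review.objects.filter(book=OuterRef('pk')).order_by('-created_at')
latest_ids = Book.objects.annotate(
    latest_review_id=Subquery(newest_reviews.values('id')[:1])
).filter(latest_review_id__isnull=False).values_list('latest_review_id', flat=True)

# The newest Review per Book.
latest_reviews = Review.objects.filter(id__in=latest_ids)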
@@ -1,18 +1,15 @@
import os
import psutil

from django.test import TestCase, tag
from typing import List
from unittest.mock import patch, call
from urllib.error import URLError

-from data_refinery_workers.downloaders import utils
+from data_refinery_common.job_management import create_processor_job_for_original_files


class UtilsTestCase(TestCase):
    @tag('downloaders')
    def test_no_jobs_to_create(self):
        """Make sure this function doesn't raise an exception with no files."""
-       utils.create_processor_job_for_original_files([])
+       create_processor_job_for_original_files([])

        self.assertTrue(True)