[HOTFIX] Deploy old salmon version rerunning! (and others) #1520

Merged
31 commits merged on Aug 21, 2019
Commits
ca62385
optimize db queries
arielsvn Aug 16, 2019
0e6ec37
Check that all samples were processed with the same salmon version
arielsvn Aug 16, 2019
59fcabe
syntax test
arielsvn Aug 16, 2019
99d2d1a
return queryset from get_quant_results_for_experiment
arielsvn Aug 16, 2019
290a759
Bump up RAM amount for compendia jobs and make smasher instance large…
kurtwheeler Aug 19, 2019
bbbecbc
Add comment about updating salmon version
arielsvn Aug 19, 2019
93fac67
Add command to rerun salmon on older samples
arielsvn Aug 19, 2019
8da9e2d
Add tests to re-run salmon
arielsvn Aug 19, 2019
2662dcb
refactor create_processor_job into common module
arielsvn Aug 20, 2019
5cc13e4
Apply suggestions from code review
arielsvn Aug 20, 2019
5256ab8
add positional argument
arielsvn Aug 20, 2019
7578b08
Merge branch 'arielsvn/1496-tximport-salmon-version-consistent' of gi…
arielsvn Aug 20, 2019
ebe1566
remove unneded import
arielsvn Aug 20, 2019
f343e46
extend excluded list and add ALSF email addrs
davidsmejia Aug 20, 2019
86c5f11
fix import in job management test
arielsvn Aug 20, 2019
30a6f04
Ensure jobs are not recreated if they exists
arielsvn Aug 20, 2019
23ad24b
remove tag
arielsvn Aug 20, 2019
eb2e631
Fix foreman test that broke because I fixed the download job cap.
kurtwheeler Aug 20, 2019
5c1f5ea
Merge pull request #1516 from AlexsLemonade/kurtwheeler/fix-failing-test
kurtwheeler Aug 21, 2019
677e012
Create and use the create_es_index management command instead of sear…
kurtwheeler Aug 21, 2019
a6dabd6
Cast the models to a set cause that's what the original function does.
kurtwheeler Aug 21, 2019
9718e59
add sleep 10
arielsvn Aug 21, 2019
05323f7
Merge pull request #1512 from AlexsLemonade/kurtwheeler/bump-compendi…
kurtwheeler Aug 21, 2019
6d08a67
Merge pull request #1515 from AlexsLemonade/davidsmejia/714-exclude-e…
kurtwheeler Aug 21, 2019
3a89ac3
Merge pull request #1513 from AlexsLemonade/arielsvn/1496-tximport-sa…
kurtwheeler Aug 21, 2019
273cb62
Merge pull request #1518 from AlexsLemonade/kurtwheeler/incremental-e…
kurtwheeler Aug 21, 2019
1b62864
Skip accessions we've surveyed already, bump up volumes by 2.
kurtwheeler Aug 21, 2019
51ee391
Log downloader exceptions better.
kurtwheeler Aug 21, 2019
42217ef
Break up failure reason a bit more.
kurtwheeler Aug 21, 2019
41f8d5c
Fix the newline I stuck in there.
kurtwheeler Aug 21, 2019
2ec0000
Merge pull request #1519 from AlexsLemonade/kurtwheeler/tuning-again
kurtwheeler Aug 21, 2019
31 changes: 31 additions & 0 deletions api/data_refinery_api/management/commands/update_es_index.py
@@ -0,0 +1,31 @@
import datetime

from django.core.management.base import BaseCommand
from django_elasticsearch_dsl.registries import registry
from django.utils import timezone


# We'll update for the past 30 minutes every 20 minutes.
UPDATE_WINDOW = datetime.timedelta(minutes=30)


class Command(BaseCommand):
    help = 'Manage elasticsearch index.'

    def handle(self, *args, **options):
        """This command is based off of the 'populate' command of Django ES DSL:

        https://github.com/sabricot/django-elasticsearch-dsl/blob/f6b2e0694e4ed69826c824196ccec5863874c856/django_elasticsearch_dsl/management/commands/search_index.py#L86

        We have updated it so that it will do incremental updates
        rather than looping over the full queryset every time.
        """
        models = set(registry.get_models())

        for doc in registry.get_documents(models):
            start_time = timezone.now() - UPDATE_WINDOW
            qs = doc().get_queryset().filter(last_modified__gt=start_time).order_by('id')
            self.stdout.write("Indexing {} '{}' objects".format(
                qs.count(), qs.model.__name__)
            )
            doc().update(qs)
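The new command only touches documents whose `last_modified` falls inside `UPDATE_WINDOW`, so it is meant to be run on a schedule rather than once. Below is a minimal sketch of invoking it from Python; the `refresh_search_index` wrapper and the idea of wiring it into a periodic task runner are assumptions for illustration — only the `update_es_index` command name and the 30-minute/20-minute cadence come from the diff above.

```python
from django.core.management import call_command

def refresh_search_index():
    # Hypothetical periodic task: re-index only documents modified within the
    # last 30 minutes (UPDATE_WINDOW), so running this roughly every 20 minutes
    # keeps Elasticsearch current without a full rebuild.
    call_command('update_es_index')
```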
2 changes: 1 addition & 1 deletion api/data_refinery_api/serializers.py
@@ -613,7 +613,7 @@ def __init__(self, *args, **kwargs):

        if 'context' in kwargs:
            if 'request' in kwargs['context']:
                # only inclue the fields `experiments` and `organism_samples` when the param `?details=true`
                # only include the fields `experiments` and `organism_samples` when the param `?details=true`
                # is provided. This is used on the frontend to render the downloads page
                # thanks to https://django.cowhite.com/blog/dynamically-includeexclude-fields-to-django-rest-framwork-serializers-based-on-user-requests/
                if 'details' not in kwargs['context']['request'].query_params:
15 changes: 14 additions & 1 deletion api/data_refinery_api/views.py
@@ -939,14 +939,27 @@ def update_cache(cls, range_param):

        return data

    EMAIL_USERNAME_BLACKLIST = ['arielsvn', 'miserlou', 'kurt.wheeler91', 'd.prasad']

    EMAIL_USERNAME_BLACKLIST = [
        'arielsvn',
        'cansav09',
        'd.prasad',
        'daniel.himmelstein',
        'dv.prasad991',
        'greenescientist',
        'jaclyn.n.taroni',
        'kurt.wheeler91',
        'michael.zietz',
        'miserlou'
    ]

    @classmethod
    def _get_dataset_stats(cls, range_param):
        """Returns stats for processed datasets"""
        filter_query = Q()
        for username in Stats.EMAIL_USERNAME_BLACKLIST:
            filter_query = filter_query | Q(email_address__startswith=username)
        filter_query = filter_query | Q(email_address__endswith='@alexslemonade.org')
        processed_datasets = Dataset.objects.filter(is_processed=True, email_address__isnull=False).exclude(filter_query)
        result = processed_datasets.aggregate(
            total=Count('id'),
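For context, the loop in `_get_dataset_stats` ORs together one `startswith` clause per blacklisted username plus an `endswith` clause for the ALSF domain, then excludes the whole disjunction from the processed-dataset stats. A minimal, illustrative sketch of the equivalent query follows; the two-username list stands in for the full `EMAIL_USERNAME_BLACKLIST`, and the `Dataset` import path is assumed from the project's common models module.

```python
from django.db.models import Q

# Dataset is assumed to be importable from the project's common models module.
from data_refinery_common.models import Dataset

# Illustrative only: two usernames stand in for the full blacklist.
usernames = ['arielsvn', 'kurt.wheeler91']

filter_query = Q(email_address__endswith='@alexslemonade.org')
for username in usernames:
    filter_query |= Q(email_address__startswith=username)

# Datasets matching any clause are dropped from the processed-dataset stats.
internal_free_datasets = Dataset.objects.filter(
    is_processed=True, email_address__isnull=False
).exclude(filter_query)
```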
123 changes: 123 additions & 0 deletions common/data_refinery_common/job_management.py
@@ -6,12 +6,16 @@
    DownloaderJob,
    DownloaderJobOriginalFileAssociation,
    OriginalFile,
    ProcessorJob,
    ProcessorJobOriginalFileAssociation,
)
from data_refinery_common.utils import (
    get_env_variable,
    get_env_variable_gracefully,
    get_instance_id,
)
from data_refinery_common.job_lookup import determine_processor_pipeline, determine_ram_amount, ProcessorEnum, ProcessorPipeline, Downloaders
from data_refinery_common.message_queue import send_job


logger = get_and_configure_logger(__name__)
@@ -141,3 +145,122 @@ def create_downloader_job(undownloaded_files: List[OriginalFile],
    )

    return True

# TODO: extend this list.
BLACKLISTED_EXTENSIONS = ["xml", "chp", "exp"]


def delete_if_blacklisted(original_file: OriginalFile) -> OriginalFile:
    extension = original_file.filename.split(".")[-1]
    if extension.lower() in BLACKLISTED_EXTENSIONS:
        logger.debug("Original file had a blacklisted extension of %s, skipping",
                     extension,
                     original_file=original_file.id)

        original_file.delete_local_file()
        original_file.is_downloaded = False
        original_file.save()
        return None

    return original_file


def create_processor_jobs_for_original_files(original_files: List[OriginalFile],
                                             downloader_job: DownloaderJob=None):
    """
    Creates one processor job for each original file given.
    """
    for original_file in original_files:
        sample_object = original_file.samples.first()

        if not delete_if_blacklisted(original_file):
            continue

        # Fix for: https://github.com/AlexsLemonade/refinebio/issues/968
        # Basically, we incorrectly detected technology/manufacturers
        # for many Affymetrix samples and this is a good place to fix
        # some of them.
        if original_file.is_affy_data():
            # Only Affymetrix Microarrays produce .CEL files
            sample_object.technology = 'MICROARRAY'
            sample_object.manufacturer = 'AFFYMETRIX'
            sample_object.save()

        pipeline_to_apply = determine_processor_pipeline(sample_object, original_file)

        if pipeline_to_apply == ProcessorPipeline.NONE:
            logger.info("No valid processor pipeline found to apply to sample.",
                        sample=sample_object.id,
                        original_file=original_files[0].id)
            original_file.delete_local_file()
            original_file.is_downloaded = False
            original_file.save()
        else:
            processor_job = ProcessorJob()
            processor_job.pipeline_applied = pipeline_to_apply.value
            processor_job.ram_amount = determine_ram_amount(sample_object, processor_job)
            processor_job.save()

            assoc = ProcessorJobOriginalFileAssociation()
            assoc.original_file = original_file
            assoc.processor_job = processor_job
            assoc.save()

            if downloader_job:
                logger.debug("Queuing processor job.",
                             processor_job=processor_job.id,
                             original_file=original_file.id,
                             downloader_job=downloader_job.id)
            else:
                logger.debug("Queuing processor job.",
                             processor_job=processor_job.id,
                             original_file=original_file.id)

            try:
                send_job(pipeline_to_apply, processor_job)
            except:
                # If we cannot queue the job now the Foreman will do
                # it later.
                pass


def create_processor_job_for_original_files(original_files: List[OriginalFile],
                                            downloader_job: DownloaderJob=None):
    """
    Create a processor job and queue a processor task for sample related to an experiment.
    """

    # If there's no original files then we've created all the jobs we need to!
    if len(original_files) == 0:
        return

    # For anything that has raw data there should only be one Sample per OriginalFile
    sample_object = original_files[0].samples.first()
    pipeline_to_apply = determine_processor_pipeline(sample_object, original_files[0])

    if pipeline_to_apply == ProcessorPipeline.NONE:
        logger.info("No valid processor pipeline found to apply to sample.",
                    sample=sample_object.id,
                    original_file=original_files[0].id)
        for original_file in original_files:
            original_file.delete_local_file()
            original_file.is_downloaded = False
            original_file.save()
    else:
        processor_job = ProcessorJob()
        processor_job.pipeline_applied = pipeline_to_apply.value
        processor_job.ram_amount = determine_ram_amount(sample_object, processor_job)
        processor_job.save()
        for original_file in original_files:
            assoc = ProcessorJobOriginalFileAssociation()
            assoc.original_file = original_file
            assoc.processor_job = processor_job
            assoc.save()

        logger.debug("Queuing processor job.",
                     processor_job=processor_job.id)

        try:
            send_job(pipeline_to_apply, processor_job)
        except:
            # If we cannot queue the job now the Foreman will do
            # it later.
            pass
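Since `create_processor_job_for_original_files` and the new `create_processor_jobs_for_original_files` now live in the shared `data_refinery_common.job_management` module, downloaders can import them directly. A minimal sketch of an assumed call site follows; the `on_download_finished` wrapper is hypothetical and not part of this PR.

```python
from data_refinery_common.job_management import create_processor_jobs_for_original_files

def on_download_finished(original_files, downloader_job):
    # One ProcessorJob is created per OriginalFile; blacklisted extensions are
    # skipped and queueing failures are left for the Foreman to retry later.
    create_processor_jobs_for_original_files(original_files, downloader_job)
```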
47 changes: 35 additions & 12 deletions common/data_refinery_common/rna_seq.py
@@ -1,4 +1,5 @@
from typing import Dict, List
from django.db.models import OuterRef, Subquery

from data_refinery_common.job_lookup import ProcessorEnum
from data_refinery_common.logging import get_and_configure_logger
@@ -33,15 +34,24 @@


def should_run_tximport(experiment: Experiment,
                        num_quantified: int,
                        results,
                        is_tximport_job: bool):
    """ Returns whether or not the experiment is eligible to have tximport
    run on it.

    num_quantified is how many samples have had salmon quant run on them.
    results is a queryset of ComputationalResults for the samples that had salmon quant run on them.
    """
    num_quantified = results.count()
    if num_quantified == 0:
        return False

    num_salmon_versions = results.filter(organism_index__salmon_version__isnull=False)\
        .values_list('organism_index__salmon_version')\
        .distinct().count()
    if num_salmon_versions > 1:
        # Tximport requires that all samples are processed with the same salmon version
        # https://github.com/AlexsLemonade/refinebio/issues/1496
        return False

    eligible_samples = experiment.samples.filter(source_database='SRA', technology='RNA-SEQ')

@@ -66,16 +76,29 @@

def get_quant_results_for_experiment(experiment: Experiment):
    """Returns a list of salmon quant results from `experiment`."""
    results = []
    for sample in experiment.samples.all():
        for result in sample.results.order_by('-created_at').all():
            # TODO: this will break when we want to run for a new version.
            if result.processor.name == ProcessorEnum.SALMON_QUANT.value['name']:
                results.append(result)
                break

    return results

    # Subquery to calculate quant results
    # https://docs.djangoproject.com/en/2.2/ref/models/expressions/#subquery-expressions

    # Calculate the computational results sorted that are associated with a given sample (
    # referenced from the top query)
    newest_computational_results = ComputationalResult.objects.all()\
        .filter(
            samples=OuterRef('id'),
            processor__name=ProcessorEnum.SALMON_QUANT.value['name']
        )\
        .order_by('-created_at')

    # Annotate each sample in the experiment with the id of the most recent computational result
    computational_results_ids = experiment.samples.all().annotate(
        latest_computational_result_id=Subquery(newest_computational_results.values('id')[:1])
    )\
        .filter(latest_computational_result_id__isnull=False)\
        .values_list('latest_computational_result_id', flat=True)

    # return the computational results that match those ids
    return ComputationalResult.objects.all().filter(
        id__in=computational_results_ids
    )


def get_quant_files_for_results(results: List[ComputationalResult]):
    """Returns a list of salmon quant results from `experiment`."""
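Taken together, the rewritten helpers let callers pass the queryset straight through: `get_quant_results_for_experiment` now returns a `ComputationalResult` queryset, and `should_run_tximport` derives both the sample count and the single-salmon-version check from it. A minimal sketch of the assumed calling pattern follows; the `ready_for_tximport` wrapper is illustrative and not part of the PR.

```python
from data_refinery_common.rna_seq import (
    get_quant_results_for_experiment,
    should_run_tximport,
)

def ready_for_tximport(experiment) -> bool:
    # Queryset of the newest salmon quant result per sample in the experiment.
    results = get_quant_results_for_experiment(experiment)
    # False if nothing is quantified, if more than one salmon version was used,
    # or if not enough eligible samples have been processed yet.
    return should_run_tximport(experiment, results, is_tximport_job=False)
```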
@@ -1,18 +1,15 @@
import os
import psutil

from django.test import TestCase, tag
from typing import List
from unittest.mock import patch, call
from urllib.error import URLError

from data_refinery_workers.downloaders import utils
from data_refinery_common.job_management import create_processor_job_for_original_files


class UtilsTestCase(TestCase):
    @tag('downloaders')
    def test_no_jobs_to_create(self):
        """Make sure this function doesn't raise an exception with no files."""
        utils.create_processor_job_for_original_files([])
        create_processor_job_for_original_files([])

        self.assertTrue(True)