From be16485820c9f4c510bdfa2489c7d5e357185d3b Mon Sep 17 00:00:00 2001 From: Kurt Wheeler Date: Thu, 16 May 2019 16:56:13 -0400 Subject: [PATCH 01/42] Add the code to enforce a timeout on salmon. Hopefully this isn't actually necessary... --- .../processors/salmon.py | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/workers/data_refinery_workers/processors/salmon.py b/workers/data_refinery_workers/processors/salmon.py index b5e4dc2e3..f2551ecc4 100644 --- a/workers/data_refinery_workers/processors/salmon.py +++ b/workers/data_refinery_workers/processors/salmon.py @@ -785,10 +785,27 @@ def _run_salmon(job_context: Dict) -> Dict: formatted_command, processor_job=job_context["job_id"]) + # 3 hours seems like a reasonable timeout for salmon. This is + # necessary because some samples make salmon hang forever and this + # ties up our computing resources forever. Until this bug is + # fixed, we'll just have to do it like this. + timeout = 60 * 60 * 3 job_context['time_start'] = timezone.now() - completed_command = subprocess.run(formatted_command.split(), - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + try: + completed_command = subprocess.run(formatted_command.split(), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=timeout) + except subprocess.TimeoutError: + failure_reason = "Salmon timed out because it failed to complete within 3 hours." + logger.error(failure_reason, + sample_accesion_code=job_context["sample"].accession_code, + processor_job=job_context["job_id"] + ) + job_context["job"].failure_reason = failure_reason + job_context["success"] = False + return job_context + job_context['time_end'] = timezone.now() ## To me, this looks broken: error codes are anything non-zero. From e883bad17eac48215fa268df2fcb6cc95d983a40 Mon Sep 17 00:00:00 2001 From: Kurt Wheeler Date: Wed, 5 Jun 2019 15:21:32 -0400 Subject: [PATCH 02/42] Create management command to recreate downloader jobs for all samples that don't have computed files on a per-source-database basis. --- common/data_refinery_common/utils.py | 130 +++++++++++++++++- .../management/commands/retry_samples.py | 70 ++++++++++ .../data_refinery_workers/processors/utils.py | 130 +----------------- 3 files changed, 201 insertions(+), 129 deletions(-) create mode 100644 foreman/data_refinery_foreman/foreman/management/commands/retry_samples.py diff --git a/common/data_refinery_common/utils.py b/common/data_refinery_common/utils.py index be7ff61e3..d111ba615 100644 --- a/common/data_refinery_common/utils.py +++ b/common/data_refinery_common/utils.py @@ -1,4 +1,4 @@ -from typing import Dict, Set +from typing import Dict, Set, List from urllib.parse import urlparse import csv import nomad @@ -331,3 +331,131 @@ def load_blacklist(blacklist_csv: str="config/RNASeqRunBlackList.csv"): blacklisted_samples.append(line[0].strip()) return blacklisted_samples + + +def create_downloader_job(undownloaded_files: List[OriginalFile], + *, + processor_job_id=None, + force=False) -> bool: + """Creates a downloader job to download `undownloaded_files`.""" + if not undownloaded_files: + return False + + original_downloader_job = None + archive_file = None + for undownloaded_file in undownloaded_files: + try: + original_downloader_job = undownloaded_file.downloader_jobs.latest('id') + + # Found the job so we don't need to keep going. 
+ break + except DownloaderJob.DoesNotExist: + # If there's no association between this file and any + # downloader jobs, it's most likely because the original + # file was created after extracting a archive containing + # multiple files worth of data. + # The way to handle this is to find that archive and + # recreate a downloader job FOR THAT. That archive will + # have the same filename as the file at the end of the + # 'source_url' field, because that source URL is pointing + # to the archive we need. + archive_filename = undownloaded_file.source_url.split("/")[-1] + + # This file or its job might not exist, but we'll wait + # until we've checked all the files before calling it a + # failure. + try: + archive_file = OriginalFile.objects.filter(filename=archive_filename) + if archive_file.count() > 0: + archive_file = archive_file.first() + else: + # We might need to match these up based on + # source_filenames rather than filenames so just + # try them both. + archive_file = OriginalFile.objects.filter(source_filename=archive_filename).first() + + original_downloader_job = DownloaderJobOriginalFileAssociation.objects.filter( + original_file=archive_file + ).latest('id').downloader_job + # Found the job so we don't need to keep going. + break + except: + pass + + if not original_downloader_job: + sample_object = list(undownloaded_files)[0].samples.first() + if sample_object: + downloader_task = job_lookup.determine_downloader_task(sample_object) + + if downloader_task == job_lookup.Downloaders.NONE: + logger.warn(("No valid downloader task found for sample, which is weird" + " because it was able to have a processor job created for it..."), + sample=sample_object.id) + return False + else: + # determine_downloader_task returns an enum object, + # but we wanna set this on the DownloaderJob object so + # we want the actual value. + downloader_task = downloader_task.value + + accession_code = sample_object.accession_code + original_files = sample_object.original_files.all() + else: + logger.error( + "Could not find the original DownloaderJob or Sample for these files.", + undownloaded_file=undownloaded_files + ) + return False + elif original_downloader_job.was_recreated and not force: + logger.warn( + "Downloader job has already been recreated once, not doing it again.", + original_downloader_job=original_downloader_job, + undownloaded_files=undownloaded_files + ) + return False + else: + downloader_task = original_downloader_job.downloader_task + accession_code = original_downloader_job.accession_code + original_files = original_downloader_job.original_files.all() + + sample_object = original_files[0].samples.first() + + new_job = DownloaderJob() + new_job.downloader_task = downloader_task + new_job.accession_code = accession_code + new_job.was_recreated = True + new_job.ram_amount = 1024 + new_job.save() + + if archive_file: + # If this downloader job is for an archive file, then the + # files that were passed into this function aren't what need + # to be directly downloaded, they were extracted out of this + # archive. The DownloaderJob will re-extract them and set up + # the associations for the new ProcessorJob. + # So double check that it still needs downloading because + # another file that came out of it could have already + # recreated the DownloaderJob. + if archive_file.needs_downloading(processor_job_id): + if archive_file.is_downloaded: + # If it needs to be downloaded then it's not + # downloaded and the is_downloaded field should stop + # lying about that. 
+ archive_file.is_downloaded = False + archive_file.save() + + DownloaderJobOriginalFileAssociation.objects.get_or_create( + downloader_job=new_job, + original_file=archive_file + ) + else: + # We can't just associate the undownloaded files, because + # there's a chance that there is a file which actually is + # downloaded that also needs to be associated with the job. + for original_file in original_files: + DownloaderJobOriginalFileAssociation.objects.get_or_create( + downloader_job=new_job, + original_file=original_file + ) + + return True diff --git a/foreman/data_refinery_foreman/foreman/management/commands/retry_samples.py b/foreman/data_refinery_foreman/foreman/management/commands/retry_samples.py new file mode 100644 index 000000000..815124970 --- /dev/null +++ b/foreman/data_refinery_foreman/foreman/management/commands/retry_samples.py @@ -0,0 +1,70 @@ +import random +import sys +import time +from typing import Dict, List + +from django.core.management.base import BaseCommand +from nomad import Nomad + +from data_refinery_common.models import ( + DownloaderJob, + DownloaderJobOriginalFileAssociation, + Experiment, + ExperimentOrganismAssociation, + ExperimentSampleAssociation, + Organism, + OriginalFile, + ProcessorJob, + ProcessorJobOriginalFileAssociation, + Sample, +) +from data_refinery_common.job_lookup import ProcessorPipeline, Downloaders +from data_refinery_common.logging import get_and_configure_logger +from data_refinery_common.message_queue import send_job +from data_refinery_common.utils import get_env_variable, get_active_volumes, create_downloader_job + + +logger = get_and_configure_logger(__name__) + +PAGE_SIZE=2000 + +class Command(BaseCommand): + def add_arguments(self, parser): + parser.add_argument( + "--source-database", + type=str, + help=("The name of a source database, such as Array Express, GEO, or SRA." + "All samples from this source database will have downloader " + "jobs requeued for them.") + ) + + def handle(self, *args, **options): + """Requeues all unprocessed RNA-Seq samples for an organism. + """ + if options["source_database"] is None: + logger.error("You must specify a source-database.") + sys.exit(1) + else: + source_database = options["source_database"] + + samples_with_computed_files = SampleComputedFileAssociation.objects. + unprocessed_samples = Sample.objects.filter(computed_files=[]) + sra_samples = Sample.objects.filter( + source_database="SRA" + ).prefetch_related( + "computed_files", + "original_files" + ) + + paginator = Paginator(sra_samples, PAGE_SIZE) + page = paginator.page() + page_count = 0 + + while page.has_next(): + for sample in page.object_list: + if sample.computed_files.count() == 0: + create_downloader_job(sample.original_files, force=True) + + # 2000 samples queued up every five minutes should be fast + # enough and also not thrash the DB. 
+ time.sleep(60 * 5) diff --git a/workers/data_refinery_workers/processors/utils.py b/workers/data_refinery_workers/processors/utils.py index ab1538172..867d67dde 100644 --- a/workers/data_refinery_workers/processors/utils.py +++ b/workers/data_refinery_workers/processors/utils.py @@ -13,7 +13,7 @@ from typing import List, Dict, Callable from data_refinery_common import job_lookup -from data_refinery_common.logging import get_and_configure_logger +from data_refinery_common.logging import get_and_configure_logger, create_downloader_job from data_refinery_common.models import ( ComputationalResult, ComputationalResultAnnotation, @@ -58,132 +58,6 @@ def signal_handler(sig, frame): CURRENT_JOB.save() sys.exit(0) - -def create_downloader_job(undownloaded_files: OriginalFile, processor_job_id: int) -> bool: - """Creates a downloader job to download `undownloaded_files`.""" - if not undownloaded_files: - return False - - original_downloader_job = None - archive_file = None - for undownloaded_file in undownloaded_files: - try: - original_downloader_job = undownloaded_file.downloader_jobs.latest('id') - - # Found the job so we don't need to keep going. - break - except DownloaderJob.DoesNotExist: - # If there's no association between this file and any - # downloader jobs, it's most likely because the original - # file was created after extracting a archive containing - # multiple files worth of data. - # The way to handle this is to find that archive and - # recreate a downloader job FOR THAT. That archive will - # have the same filename as the file at the end of the - # 'source_url' field, because that source URL is pointing - # to the archive we need. - archive_filename = undownloaded_file.source_url.split("/")[-1] - - # This file or its job might not exist, but we'll wait - # until we've checked all the files before calling it a - # failure. - try: - archive_file = OriginalFile.objects.filter(filename=archive_filename) - if archive_file.count() > 0: - archive_file = archive_file.first() - else: - # We might need to match these up based on - # source_filenames rather than filenames so just - # try them both. - archive_file = OriginalFile.objects.filter(source_filename=archive_filename).first() - - original_downloader_job = DownloaderJobOriginalFileAssociation.objects.filter( - original_file=archive_file - ).latest('id').downloader_job - # Found the job so we don't need to keep going. - break - except: - pass - - if not original_downloader_job: - sample_object = list(undownloaded_files)[0].samples.first() - if sample_object: - downloader_task = job_lookup.determine_downloader_task(sample_object) - - if downloader_task == job_lookup.Downloaders.NONE: - logger.warn(("No valid downloader task found for sample, which is weird" - " because it was able to have a processor job created for it..."), - sample=sample_object.id) - return False - else: - # determine_downloader_task returns an enum object, - # but we wanna set this on the DownloaderJob object so - # we want the actual value. 
- downloader_task = downloader_task.value - - accession_code = sample_object.accession_code - original_files = sample_object.original_files.all() - else: - logger.error( - "Could not find the original DownloaderJob or Sample for these files.", - undownloaded_file=undownloaded_files - ) - return False - elif original_downloader_job.was_recreated: - logger.warn( - "Downloader job has already been recreated once, not doing it again.", - original_downloader_job=original_downloader_job, - undownloaded_files=undownloaded_files - ) - return False - else: - downloader_task = original_downloader_job.downloader_task - accession_code = original_downloader_job.accession_code - original_files = original_downloader_job.original_files.all() - - sample_object = original_files[0].samples.first() - - new_job = DownloaderJob() - new_job.downloader_task = downloader_task - new_job.accession_code = accession_code - new_job.was_recreated = True - new_job.ram_amount = 1024 - new_job.save() - - if archive_file: - # If this downloader job is for an archive file, then the - # files that were passed into this function aren't what need - # to be directly downloaded, they were extracted out of this - # archive. The DownloaderJob will re-extract them and set up - # the associations for the new ProcessorJob. - # So double check that it still needs downloading because - # another file that came out of it could have already - # recreated the DownloaderJob. - if archive_file.needs_downloading(processor_job_id): - if archive_file.is_downloaded: - # If it needs to be downloaded then it's not - # downloaded and the is_downloaded field should stop - # lying about that. - archive_file.is_downloaded = False - archive_file.save() - - DownloaderJobOriginalFileAssociation.objects.get_or_create( - downloader_job=new_job, - original_file=archive_file - ) - else: - # We can't just associate the undownloaded files, because - # there's a chance that there is a file which actually is - # downloaded that also needs to be associated with the job. - for original_file in original_files: - DownloaderJobOriginalFileAssociation.objects.get_or_create( - downloader_job=new_job, - original_file=original_file - ) - - return True - - def prepare_original_files(job_context): """ Provision in the Job context for OriginalFile-driven processors """ @@ -217,7 +91,7 @@ def prepare_original_files(job_context): missing_files=list(undownloaded_files) ) - if not create_downloader_job(undownloaded_files, job_context["job_id"]): + if not create_downloader_job(undownloaded_files, processor_job_id=job_context["job_id"]): failure_reason = "Missing file for processor job but unable to recreate downloader jobs!" logger.error(failure_reason, processor_job=job.id) job_context["success"] = False From 7adb8bd3cd69c1e1842e1805b587124c3f031c22 Mon Sep 17 00:00:00 2001 From: Kurt Wheeler Date: Wed, 5 Jun 2019 15:23:27 -0400 Subject: [PATCH 03/42] Clean up the retry samples command. 
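
The timeout handling introduced in PATCH 01 boils down to the standard
subprocess pattern sketched below. One detail worth noting: subprocess.run()
signals an expired timeout with subprocess.TimeoutExpired, not TimeoutError
(PATCH 09 later makes exactly that correction). This is only a minimal
sketch, reusing the names from the hunk above; the timeout value is just the
3-hour figure from PATCH 01:

    import subprocess

    TIMEOUT_SECONDS = 60 * 60 * 3

    try:
        completed_command = subprocess.run(formatted_command.split(),
                                           stdout=subprocess.PIPE,
                                           stderr=subprocess.PIPE,
                                           timeout=TIMEOUT_SECONDS)
    except subprocess.TimeoutExpired:
        # Salmon hung; fail the job instead of tying up the worker forever.
        job_context["job"].failure_reason = "Salmon timed out."
        job_context["success"] = False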
--- .../management/commands/retry_samples.py | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/foreman/data_refinery_foreman/foreman/management/commands/retry_samples.py b/foreman/data_refinery_foreman/foreman/management/commands/retry_samples.py index 815124970..2d3f4ba62 100644 --- a/foreman/data_refinery_foreman/foreman/management/commands/retry_samples.py +++ b/foreman/data_refinery_foreman/foreman/management/commands/retry_samples.py @@ -4,24 +4,13 @@ from typing import Dict, List from django.core.management.base import BaseCommand -from nomad import Nomad from data_refinery_common.models import ( - DownloaderJob, - DownloaderJobOriginalFileAssociation, - Experiment, - ExperimentOrganismAssociation, - ExperimentSampleAssociation, - Organism, - OriginalFile, - ProcessorJob, - ProcessorJobOriginalFileAssociation, Sample, ) -from data_refinery_common.job_lookup import ProcessorPipeline, Downloaders from data_refinery_common.logging import get_and_configure_logger -from data_refinery_common.message_queue import send_job -from data_refinery_common.utils import get_env_variable, get_active_volumes, create_downloader_job +from data_refinery_common.utils import create_downloader_job +from data_refinery_foreman.foreman.performant_pagination.pagination import PerformantPaginator as Paginator logger = get_and_configure_logger(__name__) @@ -47,8 +36,6 @@ def handle(self, *args, **options): else: source_database = options["source_database"] - samples_with_computed_files = SampleComputedFileAssociation.objects. - unprocessed_samples = Sample.objects.filter(computed_files=[]) sra_samples = Sample.objects.filter( source_database="SRA" ).prefetch_related( From 2f9e5395b6e69ee6d48d1fd5183792063b464eb9 Mon Sep 17 00:00:00 2001 From: Kurt Wheeler Date: Wed, 5 Jun 2019 15:43:39 -0400 Subject: [PATCH 04/42] Add script to foreman instance to make running management commands easy. --- .../foreman-server-instance-user-data.tpl.sh | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/infrastructure/foreman-configuration/foreman-server-instance-user-data.tpl.sh b/infrastructure/foreman-configuration/foreman-server-instance-user-data.tpl.sh index 11daae616..a6cb85c32 100644 --- a/infrastructure/foreman-configuration/foreman-server-instance-user-data.tpl.sh +++ b/infrastructure/foreman-configuration/foreman-server-instance-user-data.tpl.sh @@ -42,6 +42,29 @@ docker run \\ chmod +x /home/ubuntu/run_foreman.sh /home/ubuntu/run_foreman.sh +# The foreman instance is used for running various management +# commands. This script provides an easy way to do that. +echo " +#!/bin/sh + +# This script should be used by passing a management command name as +# the first argument followed by any additional arguments to that +# command. 
+ +docker run \\ + --env-file /home/ubuntu/environment \\ + -e DATABASE_HOST=${database_host} \\ + -e DATABASE_NAME=${database_name} \\ + -e DATABASE_USER=${database_user} \\ + -e DATABASE_PASSWORD=${database_password} \\ + -e ELASTICSEARCH_HOST=${elasticsearch_host} \\ + -e ELASTICSEARCH_PORT=${elasticsearch_port} \\ + -v /tmp:/tmp \\ + --add-host=nomad:${nomad_lead_server_ip} \\ + -it -d ${dockerhub_repo}/${foreman_docker_image} python3 manage.py \$@ +" >> /home/ubuntu/run_management_command.sh +chmod +x /home/ubuntu/run_foreman_command.sh + # Start the Nomad agent in server mode via Monit apt-get -y update apt-get -y install monit htop From 82ad9bf26521c527a5b21082e75317065afb347c Mon Sep 17 00:00:00 2001 From: Kurt Wheeler Date: Wed, 5 Jun 2019 16:23:05 -0400 Subject: [PATCH 05/42] Break a dependency cycle. --- common/data_refinery_common/job_management.py | 145 ++++++++++++++++++ common/data_refinery_common/utils.py | 128 ---------------- .../data_refinery_workers/processors/utils.py | 3 +- 3 files changed, 147 insertions(+), 129 deletions(-) create mode 100644 common/data_refinery_common/job_management.py diff --git a/common/data_refinery_common/job_management.py b/common/data_refinery_common/job_management.py new file mode 100644 index 000000000..99bacdd9f --- /dev/null +++ b/common/data_refinery_common/job_management.py @@ -0,0 +1,145 @@ +from typing import List, Dict, Callable + +from data_refinery_common import job_lookup +from data_refinery_common.logging import get_and_configure_logger +from data_refinery_common.models import ( + DownloaderJob, + DownloaderJobOriginalFileAssociation, + OriginalFile, +) +from data_refinery_common.utils import ( + get_env_variable, + get_env_variable_gracefully, + get_instance_id, +) + + +logger = get_and_configure_logger(__name__) + + +def create_downloader_job(undownloaded_files: List[OriginalFile], + *, + processor_job_id=None, + force=False) -> bool: + """Creates a downloader job to download `undownloaded_files`.""" + if not undownloaded_files: + return False + + original_downloader_job = None + archive_file = None + for undownloaded_file in undownloaded_files: + try: + original_downloader_job = undownloaded_file.downloader_jobs.latest('id') + + # Found the job so we don't need to keep going. + break + except DownloaderJob.DoesNotExist: + # If there's no association between this file and any + # downloader jobs, it's most likely because the original + # file was created after extracting a archive containing + # multiple files worth of data. + # The way to handle this is to find that archive and + # recreate a downloader job FOR THAT. That archive will + # have the same filename as the file at the end of the + # 'source_url' field, because that source URL is pointing + # to the archive we need. + archive_filename = undownloaded_file.source_url.split("/")[-1] + + # This file or its job might not exist, but we'll wait + # until we've checked all the files before calling it a + # failure. + try: + archive_file = OriginalFile.objects.filter(filename=archive_filename) + if archive_file.count() > 0: + archive_file = archive_file.first() + else: + # We might need to match these up based on + # source_filenames rather than filenames so just + # try them both. + archive_file = OriginalFile.objects.filter(source_filename=archive_filename).first() + + original_downloader_job = DownloaderJobOriginalFileAssociation.objects.filter( + original_file=archive_file + ).latest('id').downloader_job + # Found the job so we don't need to keep going. 
+ break + except: + pass + + if not original_downloader_job: + sample_object = list(undownloaded_files)[0].samples.first() + if sample_object: + downloader_task = job_lookup.determine_downloader_task(sample_object) + + if downloader_task == job_lookup.Downloaders.NONE: + logger.warn(("No valid downloader task found for sample, which is weird" + " because it was able to have a processor job created for it..."), + sample=sample_object.id) + return False + else: + # determine_downloader_task returns an enum object, + # but we wanna set this on the DownloaderJob object so + # we want the actual value. + downloader_task = downloader_task.value + + accession_code = sample_object.accession_code + original_files = sample_object.original_files.all() + else: + logger.error( + "Could not find the original DownloaderJob or Sample for these files.", + undownloaded_file=undownloaded_files + ) + return False + elif original_downloader_job.was_recreated and not force: + logger.warn( + "Downloader job has already been recreated once, not doing it again.", + original_downloader_job=original_downloader_job, + undownloaded_files=undownloaded_files + ) + return False + else: + downloader_task = original_downloader_job.downloader_task + accession_code = original_downloader_job.accession_code + original_files = original_downloader_job.original_files.all() + + sample_object = original_files[0].samples.first() + + new_job = DownloaderJob() + new_job.downloader_task = downloader_task + new_job.accession_code = accession_code + new_job.was_recreated = True + new_job.ram_amount = 1024 + new_job.save() + + if archive_file: + # If this downloader job is for an archive file, then the + # files that were passed into this function aren't what need + # to be directly downloaded, they were extracted out of this + # archive. The DownloaderJob will re-extract them and set up + # the associations for the new ProcessorJob. + # So double check that it still needs downloading because + # another file that came out of it could have already + # recreated the DownloaderJob. + if archive_file.needs_downloading(processor_job_id): + if archive_file.is_downloaded: + # If it needs to be downloaded then it's not + # downloaded and the is_downloaded field should stop + # lying about that. + archive_file.is_downloaded = False + archive_file.save() + + DownloaderJobOriginalFileAssociation.objects.get_or_create( + downloader_job=new_job, + original_file=archive_file + ) + else: + # We can't just associate the undownloaded files, because + # there's a chance that there is a file which actually is + # downloaded that also needs to be associated with the job. 
+ for original_file in original_files: + DownloaderJobOriginalFileAssociation.objects.get_or_create( + downloader_job=new_job, + original_file=original_file + ) + + return True diff --git a/common/data_refinery_common/utils.py b/common/data_refinery_common/utils.py index d111ba615..66ab80df0 100644 --- a/common/data_refinery_common/utils.py +++ b/common/data_refinery_common/utils.py @@ -331,131 +331,3 @@ def load_blacklist(blacklist_csv: str="config/RNASeqRunBlackList.csv"): blacklisted_samples.append(line[0].strip()) return blacklisted_samples - - -def create_downloader_job(undownloaded_files: List[OriginalFile], - *, - processor_job_id=None, - force=False) -> bool: - """Creates a downloader job to download `undownloaded_files`.""" - if not undownloaded_files: - return False - - original_downloader_job = None - archive_file = None - for undownloaded_file in undownloaded_files: - try: - original_downloader_job = undownloaded_file.downloader_jobs.latest('id') - - # Found the job so we don't need to keep going. - break - except DownloaderJob.DoesNotExist: - # If there's no association between this file and any - # downloader jobs, it's most likely because the original - # file was created after extracting a archive containing - # multiple files worth of data. - # The way to handle this is to find that archive and - # recreate a downloader job FOR THAT. That archive will - # have the same filename as the file at the end of the - # 'source_url' field, because that source URL is pointing - # to the archive we need. - archive_filename = undownloaded_file.source_url.split("/")[-1] - - # This file or its job might not exist, but we'll wait - # until we've checked all the files before calling it a - # failure. - try: - archive_file = OriginalFile.objects.filter(filename=archive_filename) - if archive_file.count() > 0: - archive_file = archive_file.first() - else: - # We might need to match these up based on - # source_filenames rather than filenames so just - # try them both. - archive_file = OriginalFile.objects.filter(source_filename=archive_filename).first() - - original_downloader_job = DownloaderJobOriginalFileAssociation.objects.filter( - original_file=archive_file - ).latest('id').downloader_job - # Found the job so we don't need to keep going. - break - except: - pass - - if not original_downloader_job: - sample_object = list(undownloaded_files)[0].samples.first() - if sample_object: - downloader_task = job_lookup.determine_downloader_task(sample_object) - - if downloader_task == job_lookup.Downloaders.NONE: - logger.warn(("No valid downloader task found for sample, which is weird" - " because it was able to have a processor job created for it..."), - sample=sample_object.id) - return False - else: - # determine_downloader_task returns an enum object, - # but we wanna set this on the DownloaderJob object so - # we want the actual value. 
- downloader_task = downloader_task.value - - accession_code = sample_object.accession_code - original_files = sample_object.original_files.all() - else: - logger.error( - "Could not find the original DownloaderJob or Sample for these files.", - undownloaded_file=undownloaded_files - ) - return False - elif original_downloader_job.was_recreated and not force: - logger.warn( - "Downloader job has already been recreated once, not doing it again.", - original_downloader_job=original_downloader_job, - undownloaded_files=undownloaded_files - ) - return False - else: - downloader_task = original_downloader_job.downloader_task - accession_code = original_downloader_job.accession_code - original_files = original_downloader_job.original_files.all() - - sample_object = original_files[0].samples.first() - - new_job = DownloaderJob() - new_job.downloader_task = downloader_task - new_job.accession_code = accession_code - new_job.was_recreated = True - new_job.ram_amount = 1024 - new_job.save() - - if archive_file: - # If this downloader job is for an archive file, then the - # files that were passed into this function aren't what need - # to be directly downloaded, they were extracted out of this - # archive. The DownloaderJob will re-extract them and set up - # the associations for the new ProcessorJob. - # So double check that it still needs downloading because - # another file that came out of it could have already - # recreated the DownloaderJob. - if archive_file.needs_downloading(processor_job_id): - if archive_file.is_downloaded: - # If it needs to be downloaded then it's not - # downloaded and the is_downloaded field should stop - # lying about that. - archive_file.is_downloaded = False - archive_file.save() - - DownloaderJobOriginalFileAssociation.objects.get_or_create( - downloader_job=new_job, - original_file=archive_file - ) - else: - # We can't just associate the undownloaded files, because - # there's a chance that there is a file which actually is - # downloaded that also needs to be associated with the job. - for original_file in original_files: - DownloaderJobOriginalFileAssociation.objects.get_or_create( - downloader_job=new_job, - original_file=original_file - ) - - return True diff --git a/workers/data_refinery_workers/processors/utils.py b/workers/data_refinery_workers/processors/utils.py index 867d67dde..8c142394d 100644 --- a/workers/data_refinery_workers/processors/utils.py +++ b/workers/data_refinery_workers/processors/utils.py @@ -13,7 +13,8 @@ from typing import List, Dict, Callable from data_refinery_common import job_lookup -from data_refinery_common.logging import get_and_configure_logger, create_downloader_job +from data_refinery_common.job_management import create_downloader_job +from data_refinery_common.logging import get_and_configure_logger from data_refinery_common.models import ( ComputationalResult, ComputationalResultAnnotation, From 3aab4465f2c043e7f5971ebd5b229b96e3cb6d02 Mon Sep 17 00:00:00 2001 From: Kurt Wheeler Date: Thu, 6 Jun 2019 12:45:06 -0400 Subject: [PATCH 06/42] Fixes up the retry_samples command. 
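
With PATCH 07's page advancement folded in, the requeue loop this command is
converging on looks roughly like the sketch below. PerformantPaginator is
assumed to behave like Django's Paginator, except that page() with no
argument returns the first page, which is how the diffs call it:

    paginator = Paginator(sra_samples, PAGE_SIZE)
    page = paginator.page()
    creation_count = 0

    while True:
        for sample in page.object_list:
            if sample.computed_files.count() == 0:
                if create_downloader_job(sample.original_files.all(), force=True):
                    creation_count += 1

        logger.info("Created %d new downloader jobs because their samples lacked computed files.",
                    creation_count)

        if not page.has_next():
            break

        page = paginator.page(page.next_page_number())
        creation_count = 0

        # 2000 samples every five minutes is fast enough and avoids
        # thrashing the database.
        time.sleep(60 * 5)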
--- .../management/commands/retry_samples.py | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/foreman/data_refinery_foreman/foreman/management/commands/retry_samples.py b/foreman/data_refinery_foreman/foreman/management/commands/retry_samples.py index 2d3f4ba62..299e6d65e 100644 --- a/foreman/data_refinery_foreman/foreman/management/commands/retry_samples.py +++ b/foreman/data_refinery_foreman/foreman/management/commands/retry_samples.py @@ -9,7 +9,7 @@ Sample, ) from data_refinery_common.logging import get_and_configure_logger -from data_refinery_common.utils import create_downloader_job +from data_refinery_common.job_management import create_downloader_job from data_refinery_foreman.foreman.performant_pagination.pagination import PerformantPaginator as Paginator @@ -37,7 +37,7 @@ def handle(self, *args, **options): source_database = options["source_database"] sra_samples = Sample.objects.filter( - source_database="SRA" + source_database=source_database ).prefetch_related( "computed_files", "original_files" @@ -47,10 +47,23 @@ def handle(self, *args, **options): page = paginator.page() page_count = 0 - while page.has_next(): + creation_count = 0 + while True: for sample in page.object_list: if sample.computed_files.count() == 0: - create_downloader_job(sample.original_files, force=True) + logger.debug("Creating downloader job for a sample.", + sample=sample.accession_code) + if create_downloader_job(sample.original_files.all(), force=True): + creation_count += 1 + + logger.info( + "Created %d new downloader jobs because their samples lacked computed files.", + creation_count + ) + + if not page.has_next(): + break + creation_count = 0 # 2000 samples queued up every five minutes should be fast # enough and also not thrash the DB. From 15e8c62bac2721102c90537a0c851e67c5c8a152 Mon Sep 17 00:00:00 2001 From: Kurt Wheeler Date: Thu, 6 Jun 2019 13:52:10 -0400 Subject: [PATCH 07/42] Update the page on every cycle of the loop. --- .../foreman/management/commands/retry_samples.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/foreman/data_refinery_foreman/foreman/management/commands/retry_samples.py b/foreman/data_refinery_foreman/foreman/management/commands/retry_samples.py index 299e6d65e..acbaa5ae4 100644 --- a/foreman/data_refinery_foreman/foreman/management/commands/retry_samples.py +++ b/foreman/data_refinery_foreman/foreman/management/commands/retry_samples.py @@ -63,6 +63,9 @@ def handle(self, *args, **options): if not page.has_next(): break + else: + page = paginator.page(page.next_page_number()) + creation_count = 0 # 2000 samples queued up every five minutes should be fast From 800b2e323680ead0be214a678fe565b4a6ff57e9 Mon Sep 17 00:00:00 2001 From: Kurt Wheeler Date: Thu, 6 Jun 2019 14:02:40 -0400 Subject: [PATCH 08/42] Move salmon timeout down to 1 hour. --- workers/data_refinery_workers/processors/salmon.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workers/data_refinery_workers/processors/salmon.py b/workers/data_refinery_workers/processors/salmon.py index f2551ecc4..47b26e3cc 100644 --- a/workers/data_refinery_workers/processors/salmon.py +++ b/workers/data_refinery_workers/processors/salmon.py @@ -785,11 +785,11 @@ def _run_salmon(job_context: Dict) -> Dict: formatted_command, processor_job=job_context["job_id"]) - # 3 hours seems like a reasonable timeout for salmon. This is + # 1 hour seems like a reasonable timeout for salmon. 
This is # necessary because some samples make salmon hang forever and this # ties up our computing resources forever. Until this bug is # fixed, we'll just have to do it like this. - timeout = 60 * 60 * 3 + timeout = 60 * 60 job_context['time_start'] = timezone.now() try: completed_command = subprocess.run(formatted_command.split(), From df4bc6bf7b18423160922240c9783311465f7ed8 Mon Sep 17 00:00:00 2001 From: Kurt Wheeler Date: Thu, 6 Jun 2019 16:39:32 -0400 Subject: [PATCH 09/42] Make the organism shepherd only worry about experiments that we've processed at least one sample for. --- .../management/commands/organism_shepherd.py | 23 +++++++++++-------- .../processors/salmon.py | 2 +- .../data_refinery_workers/processors/utils.py | 7 +++++- 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/foreman/data_refinery_foreman/foreman/management/commands/organism_shepherd.py b/foreman/data_refinery_foreman/foreman/management/commands/organism_shepherd.py index 2b340c39a..ca4be62e3 100644 --- a/foreman/data_refinery_foreman/foreman/management/commands/organism_shepherd.py +++ b/foreman/data_refinery_foreman/foreman/management/commands/organism_shepherd.py @@ -71,11 +71,16 @@ def build_completion_list(organism: Organism) -> List[Dict]: else: unprocessed_samples.add(sample) - completion_list.append({ - "experiment": experiment, - "unprocessed": unprocessed_samples, - "processed": processed_samples - }) + # For now we only want to queue samples from experiments from + # which we've been able to process at least one sample, + # because that means the sample probably does not have unmated + # reads. Unmated reads currently break our salmon pipeline. + if len(processed_samples) > 0: + completion_list.append({ + "experiment": experiment, + "unprocessed": unprocessed_samples, + "processed": processed_samples + }) return completion_list @@ -263,16 +268,16 @@ def handle(self, *args, **options): # active volumes. Also in order to spread them around # do so randomly. We don't want to hammer Nomad to # get the active volumes though, so just do it once - # per 10 minute loop. + # per 5 minute loop. volume_index = random.choice(list(get_active_volumes())) for i in range(num_short_from_max): if len(prioritized_job_list) > 0: requeue_job(prioritized_job_list.pop(0), volume_index) - # Wait 10 minutes in between queuing additional work to + # Wait 5 minutes in between queuing additional work to # give it time to actually get done. if len(prioritized_job_list) > 0: - logger.info("Sleeping for 10 minutes while jobs get done.") - time.sleep(600) + logger.info("Sleeping for 5 minutes while jobs get done.") + time.sleep(300) logger.info("Successfully requeued all jobs for unprocessed %s samples.", organism_name) diff --git a/workers/data_refinery_workers/processors/salmon.py b/workers/data_refinery_workers/processors/salmon.py index 47b26e3cc..f34def105 100644 --- a/workers/data_refinery_workers/processors/salmon.py +++ b/workers/data_refinery_workers/processors/salmon.py @@ -796,7 +796,7 @@ def _run_salmon(job_context: Dict) -> Dict: stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout) - except subprocess.TimeoutError: + except subprocess.TimeoutExpired: failure_reason = "Salmon timed out because it failed to complete within 3 hours." 
logger.error(failure_reason, sample_accesion_code=job_context["sample"].accession_code, diff --git a/workers/data_refinery_workers/processors/utils.py b/workers/data_refinery_workers/processors/utils.py index 8c142394d..46517bda7 100644 --- a/workers/data_refinery_workers/processors/utils.py +++ b/workers/data_refinery_workers/processors/utils.py @@ -92,7 +92,12 @@ def prepare_original_files(job_context): missing_files=list(undownloaded_files) ) - if not create_downloader_job(undownloaded_files, processor_job_id=job_context["job_id"]): + was_job_created = create_downloader_job( + undownloaded_files, + processor_job_id=job_context["job_id"], + force=True + ) + if not was_job_created: failure_reason = "Missing file for processor job but unable to recreate downloader jobs!" logger.error(failure_reason, processor_job=job.id) job_context["success"] = False From 22c0f164485223e729f156bb14110756094a4f7c Mon Sep 17 00:00:00 2001 From: Kurt Wheeler Date: Thu, 6 Jun 2019 16:47:28 -0400 Subject: [PATCH 10/42] Fix the tests for the organism shepherd. --- .../commands/test_organism_shepherd.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/foreman/data_refinery_foreman/foreman/management/commands/test_organism_shepherd.py b/foreman/data_refinery_foreman/foreman/management/commands/test_organism_shepherd.py index 3fc0fcefa..c176ad189 100644 --- a/foreman/data_refinery_foreman/foreman/management/commands/test_organism_shepherd.py +++ b/foreman/data_refinery_foreman/foreman/management/commands/test_organism_shepherd.py @@ -205,11 +205,13 @@ def mock_init_nomad(host, port=0, timeout=0): fifty_percent_processor_job.refresh_from_db() self.assertEqual(first_call_job_object, fifty_percent_processor_job.retried_job) - second_call_job_type = mock_calls[1][1][0] - second_call_job_object = mock_calls[1][2]["job"] - self.assertEqual(second_call_job_type, Downloaders.SRA) - self.assertEqual(second_call_job_object.accession_code, zero_percent_dl_job.accession_code) - self.assertEqual(second_call_job_object.downloader_task, zero_percent_dl_job.downloader_task) - - zero_percent_dl_job.refresh_from_db() - self.assertEqual(second_call_job_object, zero_percent_dl_job.retried_job) + # For now we aren't queuing experiments that haven't been processed at all. 
+ self.assertEqual(len(mock_calls), 1) + # second_call_job_type = mock_calls[1][1][0] + # second_call_job_object = mock_calls[1][2]["job"] + # self.assertEqual(second_call_job_type, Downloaders.SRA) + # self.assertEqual(second_call_job_object.accession_code, zero_percent_dl_job.accession_code) + # self.assertEqual(second_call_job_object.downloader_task, zero_percent_dl_job.downloader_task) + + # zero_percent_dl_job.refresh_from_db() + # self.assertEqual(second_call_job_object, zero_percent_dl_job.retried_job) From 6b5b8d31ef15e7ffe17df881e200007f3e4c8967 Mon Sep 17 00:00:00 2001 From: Kurt Wheeler Date: Fri, 7 Jun 2019 12:20:41 -0400 Subject: [PATCH 11/42] Remove blacklisting code --- foreman/data_refinery_foreman/foreman/main.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/foreman/data_refinery_foreman/foreman/main.py b/foreman/data_refinery_foreman/foreman/main.py index a0735715b..8e465d89c 100644 --- a/foreman/data_refinery_foreman/foreman/main.py +++ b/foreman/data_refinery_foreman/foreman/main.py @@ -353,13 +353,6 @@ def requeue_downloader_job(last_job: DownloaderJob) -> bool: return False first_sample = original_file.samples.first() - if first_sample and first_sample.is_blacklisted: - last_job.no_retry = True - last_job.success = False - last_job.failure_reason = "Sample run accession has been blacklisted by SRA." - last_job.save() - logger.info("Avoiding requeuing for DownloaderJob for blacklisted run accession: " + str(first_sample.accession_code)) - return False # This is a magic string that all the dbGaP studies appear to have if first_sample and ("in the dbGaP study" in first_sample.title): From e483cc35afd170cde08d879a757587e65193c572 Mon Sep 17 00:00:00 2001 From: Ariel Date: Mon, 10 Jun 2019 17:05:57 -0400 Subject: [PATCH 12/42] Use api to get ensembl version --- .gitignore | 2 ++ .../data_refinery_foreman/surveyor/transcriptome_index.py | 7 +------ 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index 4e5022d5a..30ca10df9 100644 --- a/.gitignore +++ b/.gitignore @@ -138,3 +138,5 @@ dr_env/ # emacs backup files *~ + +.vscode \ No newline at end of file diff --git a/foreman/data_refinery_foreman/surveyor/transcriptome_index.py b/foreman/data_refinery_foreman/surveyor/transcriptome_index.py index 6c64d228e..e4bfbbca2 100644 --- a/foreman/data_refinery_foreman/surveyor/transcriptome_index.py +++ b/foreman/data_refinery_foreman/surveyor/transcriptome_index.py @@ -60,12 +60,7 @@ def __init__(self, species: Dict): self.url_root = "ensemblgenomes.org/pub/release-{assembly_version}/{short_division}" self.short_division = DIVISION_LOOKUP[species["division"]] self.assembly = species["assembly_name"].replace(" ", "_") - - # For some reason the API is returning version 44, which doesn't seem to exist - # in the FTP servers: ftp://ftp.ensemblgenomes.org/pub/ - # That's why the version is hardcoded below. 
- # self.assembly_version = utils.requests_retry_session().get(DIVISION_RELEASE_URL).json()["version"] - self.assembly_version = '43' + self.assembly_version = utils.requests_retry_session().get(DIVISION_RELEASE_URL).json()["version"] self.species_sub_dir = species["name"] self.filename_species = species["name"].capitalize() From 14e3049c7144bffe32d175b00742c4ada67be632 Mon Sep 17 00:00:00 2001 From: Ariel Date: Tue, 11 Jun 2019 11:54:53 -0400 Subject: [PATCH 13/42] Send slack notification when a dataset fails processing --- .../processors/smasher.py | 35 +++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/workers/data_refinery_workers/processors/smasher.py b/workers/data_refinery_workers/processors/smasher.py index e2396cb6f..a6c0610bc 100644 --- a/workers/data_refinery_workers/processors/smasher.py +++ b/workers/data_refinery_workers/processors/smasher.py @@ -9,6 +9,7 @@ import simplejson as json import string import warnings +import requests from botocore.exceptions import ClientError from datetime import datetime, timedelta @@ -904,6 +905,38 @@ def _notify(job_context: Dict) -> Dict: # SES ## if job_context.get("upload", True) and settings.RUNNING_IN_CLOUD: + # Link to the dataset page, where the user can re-try the download job + dataset_url = 'https://www.refine.bio/dataset/' + str(job_context['dataset'].id) + + # Send a notification to slack when a dataset fails to be processed + if job_context['job'].failure_reason not in ['', None]: + try: + requests.post( + "https://hooks.slack.com/services/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", + json={ + 'fallback': 'Dataset failed processing.' + 'title': 'Dataset failed processing', + 'title_link': dataset_url, + "attachments":[ + { + "color": "warning", + "text": job_context['job'].failure_reason, + 'author_name': job_context["dataset"].email_address, + 'fields': [ + { + 'title': 'Id', + 'value': str(job_context['dataset'].id) + } + ] + } + ] + }, + headers={'Content-Type': 'application/json'}, + timeout=10 + ) + except Exception as e: + logger.error(e) # It doens't really matter if this didn't work + pass # Don't send an email if we don't have address. 
if job_context["dataset"].email_address: @@ -912,8 +945,6 @@ def _notify(job_context: Dict) -> Dict: AWS_REGION = "us-east-1" CHARSET = "UTF-8" - # Link to the dataset page, where the user can re-try the download job - dataset_url = 'https://www.refine.bio/dataset/' + str(job_context['dataset'].id) if job_context['job'].failure_reason not in ['', None]: SUBJECT = "There was a problem processing your refine.bio dataset :(" From acc0210f798a2bcb2d563ad994ec07219f1d259f Mon Sep 17 00:00:00 2001 From: Ariel Date: Tue, 11 Jun 2019 11:56:20 -0400 Subject: [PATCH 14/42] dataset id --- workers/data_refinery_workers/processors/smasher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workers/data_refinery_workers/processors/smasher.py b/workers/data_refinery_workers/processors/smasher.py index a6c0610bc..cac49e8a2 100644 --- a/workers/data_refinery_workers/processors/smasher.py +++ b/workers/data_refinery_workers/processors/smasher.py @@ -924,7 +924,7 @@ def _notify(job_context: Dict) -> Dict: 'author_name': job_context["dataset"].email_address, 'fields': [ { - 'title': 'Id', + 'title': 'Dataset id', 'value': str(job_context['dataset'].id) } ] From 42f48e9aa9a532830e047f702ad8863ada73eb75 Mon Sep 17 00:00:00 2001 From: Ariel Date: Wed, 12 Jun 2019 13:15:10 -0400 Subject: [PATCH 15/42] Update experiment cached values on `end_job` --- .../data_refinery_workers/processors/utils.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/workers/data_refinery_workers/processors/utils.py b/workers/data_refinery_workers/processors/utils.py index 46517bda7..941716dd3 100644 --- a/workers/data_refinery_workers/processors/utils.py +++ b/workers/data_refinery_workers/processors/utils.py @@ -272,12 +272,9 @@ def end_job(job_context: Dict, abort=False): if mark_as_processed: # This handles most of our cases - unique_experiments = [] for sample in job_context.get("samples", []): sample.is_processed = True sample.save() - if sample.experiments.all().count() > 0: - unique_experiments = list(set(unique_experiments + sample.experiments.all()[::1])) # Explicitly for the single-salmon scenario if 'sample' in job_context: @@ -285,10 +282,6 @@ def end_job(job_context: Dict, abort=False): sample.is_processed = True sample.save() - for experiment in unique_experiments: - experiment.update_num_samples() - experiment.save() - # If we are aborting, it's because we want to do something # different, so leave the original files so that "something # different" can use them. @@ -299,6 +292,15 @@ def end_job(job_context: Dict, abort=False): original_file.delete_local_file() if success: + # update the cached values of each experiment + unique_experiments = [] + for sample in job_context.get("samples", []): + if sample.experiments.all().count() > 0: + unique_experiments = list(set(unique_experiments + sample.experiments.all()[::1])) + for experiment in unique_experiments: + experiment.update_num_samples() + experiment.save() + # QN reference files go to a special bucket so they can be # publicly available. 
if job_context["job"].pipeline_applied == "QN_REFERENCE": From 625348f4a9e70a56929059ec1c1d683ae35dcb29 Mon Sep 17 00:00:00 2001 From: Ariel Date: Wed, 12 Jun 2019 13:25:38 -0400 Subject: [PATCH 16/42] Add field num_downloadable_samples to experiment --- ...0021_experiment_num_downloadable_samples.py | 18 ++++++++++++++++++ common/data_refinery_common/models/models.py | 2 ++ common/version | 1 + .../data_refinery_workers/processors/utils.py | 1 - 4 files changed, 21 insertions(+), 1 deletion(-) create mode 100644 common/data_refinery_common/migrations/0021_experiment_num_downloadable_samples.py create mode 100644 common/version diff --git a/common/data_refinery_common/migrations/0021_experiment_num_downloadable_samples.py b/common/data_refinery_common/migrations/0021_experiment_num_downloadable_samples.py new file mode 100644 index 000000000..df6dc7f86 --- /dev/null +++ b/common/data_refinery_common/migrations/0021_experiment_num_downloadable_samples.py @@ -0,0 +1,18 @@ +# Generated by Django 2.1.8 on 2019-06-12 17:25 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('data_refinery_common', '0020_update_qn_bucket'), + ] + + operations = [ + migrations.AddField( + model_name='experiment', + name='num_downloadable_samples', + field=models.IntegerField(default=0), + ), + ] diff --git a/common/data_refinery_common/models/models.py b/common/data_refinery_common/models/models.py index a76a289bc..bdb14afca 100644 --- a/common/data_refinery_common/models/models.py +++ b/common/data_refinery_common/models/models.py @@ -297,6 +297,7 @@ def __str__(self): # Cached Computed Properties num_total_samples = models.IntegerField(default=0) num_processed_samples = models.IntegerField(default=0) + num_downloadable_samples = models.IntegerField(default=0) sample_metadata_fields = ArrayField(models.TextField(), default=list) organism_names = ArrayField(models.TextField(), default=list) platform_names = ArrayField(models.TextField(), default=list) @@ -326,6 +327,7 @@ def update_num_samples(self): """ Update our cache values """ self.num_total_samples = self.samples.count() self.num_processed_samples = self.samples.filter(is_processed=True).count() + self.num_processed_samples = 0 self.save() def to_metadata_dict(self): diff --git a/common/version b/common/version new file mode 100644 index 000000000..b4bc8df71 --- /dev/null +++ b/common/version @@ -0,0 +1 @@ +local1560360299 diff --git a/workers/data_refinery_workers/processors/utils.py b/workers/data_refinery_workers/processors/utils.py index 941716dd3..6087c7b2d 100644 --- a/workers/data_refinery_workers/processors/utils.py +++ b/workers/data_refinery_workers/processors/utils.py @@ -299,7 +299,6 @@ def end_job(job_context: Dict, abort=False): unique_experiments = list(set(unique_experiments + sample.experiments.all()[::1])) for experiment in unique_experiments: experiment.update_num_samples() - experiment.save() # QN reference files go to a special bucket so they can be # publicly available. 
From 76b772d8154e2e6c324964da79c71272cd1f9fa9 Mon Sep 17 00:00:00 2001 From: Ariel Date: Wed, 12 Jun 2019 14:08:19 -0400 Subject: [PATCH 17/42] update num_downloadable_samples in migration --- .../0021_experiment_num_downloadable_samples.py | 10 ++++++++++ common/version | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/common/data_refinery_common/migrations/0021_experiment_num_downloadable_samples.py b/common/data_refinery_common/migrations/0021_experiment_num_downloadable_samples.py index df6dc7f86..9c8783aea 100644 --- a/common/data_refinery_common/migrations/0021_experiment_num_downloadable_samples.py +++ b/common/data_refinery_common/migrations/0021_experiment_num_downloadable_samples.py @@ -1,7 +1,16 @@ # Generated by Django 2.1.8 on 2019-06-12 17:25 +import django.contrib.postgres.fields from django.db import migrations, models +def update_cached_values(apps, schema_editor): + """ """ + Experiment = apps.get_model('data_refinery_common', 'Experiment') + ComputationalResultAnnotation = apps.get_model('data_refinery_common', 'ComputationalResultAnnotation') + for experiment in Experiment.objects.all(): + organism_ids = list(ComputationalResultAnnotation.objects.filter(data__is_qn=True).values_list('data__organism_id', flat=True)) + experiment.num_downloadable_samples = experiment.samples.filter(is_processed=True, organism__id__in=organism_ids).count() + experiment.save() class Migration(migrations.Migration): @@ -15,4 +24,5 @@ class Migration(migrations.Migration): name='num_downloadable_samples', field=models.IntegerField(default=0), ), + migrations.RunPython(update_cached_values) ] diff --git a/common/version b/common/version index b4bc8df71..bbc2d086e 100644 --- a/common/version +++ b/common/version @@ -1 +1 @@ -local1560360299 +local1560362792 From c2343056e04bac0aec7fd0048f89de7d69f8f1e4 Mon Sep 17 00:00:00 2001 From: Ariel Date: Wed, 12 Jun 2019 15:10:10 -0400 Subject: [PATCH 18/42] Add `num_downloadable_samples` cached value --- api/data_refinery_api/serializers.py | 9 ++++++--- api/data_refinery_api/views.py | 10 +++++----- common/data_refinery_common/models/documents.py | 1 + common/data_refinery_common/models/models.py | 3 ++- common/version | 2 +- 5 files changed, 15 insertions(+), 10 deletions(-) diff --git a/api/data_refinery_api/serializers.py b/api/data_refinery_api/serializers.py index 38715d393..030509c50 100644 --- a/api/data_refinery_api/serializers.py +++ b/api/data_refinery_api/serializers.py @@ -389,8 +389,7 @@ class Meta: class DetailedExperimentSerializer(serializers.ModelSerializer): annotations = ExperimentAnnotationSerializer(many=True, source='experimentannotation_set') samples = DetailedExperimentSampleSerializer(many=True) - organisms = OrganismSerializer(many=True) - sample_metadata = serializers.ReadOnlyField(source='get_sample_metadata_fields') + sample_metadata = serializers.ReadOnlyField(source='sample_metadata_fields') class Meta: model = Experiment @@ -414,8 +413,11 @@ class Meta: 'submitter_institution', 'last_modified', 'created_at', - 'organisms', + 'organism_names', 'sample_metadata', + 'num_total_samples', + 'num_processed_samples', + 'num_downloadable_samples' ) class PlatformSerializer(serializers.ModelSerializer): @@ -796,6 +798,7 @@ class ExperimentDocumentSerializer(serializers.Serializer): pubmed_id = serializers.CharField(read_only=True) num_total_samples = serializers.IntegerField(read_only=True) num_processed_samples = serializers.IntegerField(read_only=True) + num_downloadable_samples = 
serializers.IntegerField(read_only=True) source_first_published = serializers.DateField(read_only=True) # FK/M2M diff --git a/api/data_refinery_api/views.py b/api/data_refinery_api/views.py index 8742821f6..e8d7bc85c 100644 --- a/api/data_refinery_api/views.py +++ b/api/data_refinery_api/views.py @@ -157,7 +157,7 @@ def aggregate(self, request, queryset, view): All we need to add is one line when building the facets: - .metric('total_samples', 'sum', field='num_processed_samples') + .metric('total_samples', 'sum', field='num_downloadable_samples') (Maybe there's a way to do this with the options in `ExperimentDocumentView`) """ @@ -165,7 +165,7 @@ def aggregate(self, request, queryset, view): for field, facet in iteritems(facets): agg = facet['facet'].get_aggregation() queryset.aggs.bucket(field, agg)\ - .metric('total_samples', 'sum', field='num_processed_samples') + .metric('total_samples', 'sum', field='num_downloadable_samples') return queryset @@ -229,8 +229,8 @@ class ExperimentDocumentView(DocumentViewSet): 'has_publication': 'has_publication', 'platform': 'platform_accession_codes', 'organism': 'organism_names', - 'num_processed_samples': { - 'field': 'num_processed_samples', + 'num_downloadable_samples': { + 'field': 'num_downloadable_samples', 'lookups': [ LOOKUP_FILTER_RANGE, LOOKUP_QUERY_IN, @@ -245,7 +245,7 @@ class ExperimentDocumentView(DocumentViewSet): 'title': 'title.raw', 'description': 'description.raw', 'num_total_samples': 'num_total_samples', - 'num_processed_samples': 'num_processed_samples', + 'num_downloadable_samples': 'num_downloadable_samples', 'source_first_published': 'source_first_published' } diff --git a/common/data_refinery_common/models/documents.py b/common/data_refinery_common/models/documents.py index 31b3cb601..6d5f187e9 100644 --- a/common/data_refinery_common/models/documents.py +++ b/common/data_refinery_common/models/documents.py @@ -102,6 +102,7 @@ class ExperimentDocument(DocType): pubmed_id = fields.TextField() num_total_samples = fields.IntegerField() num_processed_samples = fields.IntegerField() + num_downloadable_samples = fields.IntegerField() source_first_published = fields.DateField() # FK/M2M diff --git a/common/data_refinery_common/models/models.py b/common/data_refinery_common/models/models.py index bdb14afca..5382fcb6a 100644 --- a/common/data_refinery_common/models/models.py +++ b/common/data_refinery_common/models/models.py @@ -327,7 +327,8 @@ def update_num_samples(self): """ Update our cache values """ self.num_total_samples = self.samples.count() self.num_processed_samples = self.samples.filter(is_processed=True).count() - self.num_processed_samples = 0 + qn_organisms = Organism.get_objects_with_qn_targets() + self.num_downloadable_samples = self.samples.filter(is_processed=True, organism__in=qn_organisms).count() self.save() def to_metadata_dict(self): diff --git a/common/version b/common/version index bbc2d086e..33979fcf9 100644 --- a/common/version +++ b/common/version @@ -1 +1 @@ -local1560362792 +local1560363052 From 776b73a35916e81f3f945f3e56de9cc0080f7719 Mon Sep 17 00:00:00 2001 From: Ariel Date: Wed, 12 Jun 2019 15:40:37 -0400 Subject: [PATCH 19/42] set num_downloadable_samples on test data --- api/data_refinery_api/tests.py | 1 + 1 file changed, 1 insertion(+) diff --git a/api/data_refinery_api/tests.py b/api/data_refinery_api/tests.py index 96fd4b9cc..b185c2e4f 100644 --- a/api/data_refinery_api/tests.py +++ b/api/data_refinery_api/tests.py @@ -1180,6 +1180,7 @@ def test_es_endpoint(self): experiment.technology = 
"MICROARRAY" experiment.num_processed_samples = 1 # added below experiment.num_total_samples = 1 + experiment.num_downloadable_samples = 1 experiment.save() self.experiment = experiment From 95134c45d0ab8fae8dfde188df2c11b57ca591d3 Mon Sep 17 00:00:00 2001 From: Ariel Date: Wed, 12 Jun 2019 16:23:04 -0400 Subject: [PATCH 20/42] Skip updating experiment cashed values for smasher jobs --- .../data_refinery_workers/processors/utils.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/workers/data_refinery_workers/processors/utils.py b/workers/data_refinery_workers/processors/utils.py index 6087c7b2d..29b4d2234 100644 --- a/workers/data_refinery_workers/processors/utils.py +++ b/workers/data_refinery_workers/processors/utils.py @@ -264,7 +264,6 @@ def end_job(job_context: Dict, abort=False): if not abort: if job_context.get("success", False) and not (job_context["job"].pipeline_applied in ["SMASHER", "QN_REFERENCE", "COMPENDIA"]): - # Salmon requires the final `tximport` step to be fully `is_processed`. mark_as_processed = True if (job_context["job"].pipeline_applied == "SALMON" and not job_context.get('tximported', False)): @@ -292,13 +291,15 @@ def end_job(job_context: Dict, abort=False): original_file.delete_local_file() if success: - # update the cached values of each experiment - unique_experiments = [] - for sample in job_context.get("samples", []): - if sample.experiments.all().count() > 0: - unique_experiments = list(set(unique_experiments + sample.experiments.all()[::1])) - for experiment in unique_experiments: - experiment.update_num_samples() + if job_context["job"].pipeline_applied not in ["SMASHER"]: + # update the cached values of each experiment + # job_context['samples'] is a string for SMASHER jobs, that's why we skip this part for those + unique_experiments = [] + for sample in job_context.get("samples", []): + if sample.experiments.all().count() > 0: + unique_experiments = list(set(unique_experiments + sample.experiments.all()[::1])) + for experiment in unique_experiments: + experiment.update_num_samples() # QN reference files go to a special bucket so they can be # publicly available. From b9c4c80c134cb3419b67f07887dca0d99c6dd871 Mon Sep 17 00:00:00 2001 From: Casey Greene Date: Thu, 13 Jun 2019 09:19:23 -0400 Subject: [PATCH 21/42] warning -> debug This looks like a status message. It also swamps our Sentry interface when folks download large datasets because each message is subtly different. I don't have strong feelings between `info` and `debug` but I do have strong feelings that it shouldn't be `warning` or above. 
--- workers/data_refinery_workers/processors/smasher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workers/data_refinery_workers/processors/smasher.py b/workers/data_refinery_workers/processors/smasher.py index e2396cb6f..3799dfd54 100644 --- a/workers/data_refinery_workers/processors/smasher.py +++ b/workers/data_refinery_workers/processors/smasher.py @@ -560,7 +560,7 @@ def _smash(job_context: Dict, how="inner") -> Dict: num_samples = num_samples + 1 if (num_samples % 100) == 0: - logger.warning("Loaded " + str(num_samples) + " samples into frames.", + logger.debug("Loaded " + str(num_samples) + " samples into frames.", dataset_id=job_context['dataset'].id, how=how ) From 572cde78d2e5fb7d3b1a6c1f467851f2f328baec Mon Sep 17 00:00:00 2001 From: Ariel Date: Thu, 13 Jun 2019 13:00:02 -0400 Subject: [PATCH 22/42] Move updating experiment caches to qn file --- .../processors/qn_reference.py | 15 +++++++++++++++ .../data_refinery_workers/processors/utils.py | 16 ++++++---------- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/workers/data_refinery_workers/processors/qn_reference.py b/workers/data_refinery_workers/processors/qn_reference.py index 6433a2f97..e1bc36b91 100644 --- a/workers/data_refinery_workers/processors/qn_reference.py +++ b/workers/data_refinery_workers/processors/qn_reference.py @@ -210,6 +210,20 @@ def _create_result_objects(job_context: Dict) -> Dict: job_context['success'] = True return job_context +def _update_experiment_caches(job_context: Dict) -> Dict: + """ Experiments have a cached value with the number of samples that have QN targets + generated, this value should be updated after generating new QN targets. """ + unique_experiments = [] + all_samples = job_context['samples']['ALL'] + for sample in all_samples: + if sample.experiments.all().count() > 0: + unique_experiments = list(set(unique_experiments + sample.experiments.all()[::1])) + + for experiment in unique_experiments: + experiment.update_num_samples() + + return job_context + def create_qn_reference(job_id: int) -> None: pipeline = Pipeline(name=utils.PipelineEnum.QN_REFERENCE.value) job_context = utils.run_pipeline({"job_id": job_id, "pipeline": pipeline}, @@ -218,5 +232,6 @@ def create_qn_reference(job_id: int) -> None: _build_qn_target, # _verify_result, _create_result_objects, + _update_experiment_caches, utils.end_job]) return job_context diff --git a/workers/data_refinery_workers/processors/utils.py b/workers/data_refinery_workers/processors/utils.py index 29b4d2234..d07957345 100644 --- a/workers/data_refinery_workers/processors/utils.py +++ b/workers/data_refinery_workers/processors/utils.py @@ -271,9 +271,12 @@ def end_job(job_context: Dict, abort=False): if mark_as_processed: # This handles most of our cases + unique_experiments = [] for sample in job_context.get("samples", []): sample.is_processed = True sample.save() + if sample.experiments.all().count() > 0: + unique_experiments = list(set(unique_experiments + sample.experiments.all()[::1])) # Explicitly for the single-salmon scenario if 'sample' in job_context: @@ -281,6 +284,9 @@ def end_job(job_context: Dict, abort=False): sample.is_processed = True sample.save() + for experiment in unique_experiments: + experiment.update_num_samples() + # If we are aborting, it's because we want to do something # different, so leave the original files so that "something # different" can use them. 
@@ -291,16 +297,6 @@ def end_job(job_context: Dict, abort=False): original_file.delete_local_file() if success: - if job_context["job"].pipeline_applied not in ["SMASHER"]: - # update the cached values of each experiment - # job_context['samples'] is a string for SMASHER jobs, that's why we skip this part for those - unique_experiments = [] - for sample in job_context.get("samples", []): - if sample.experiments.all().count() > 0: - unique_experiments = list(set(unique_experiments + sample.experiments.all()[::1])) - for experiment in unique_experiments: - experiment.update_num_samples() - # QN reference files go to a special bucket so they can be # publicly available. if job_context["job"].pipeline_applied == "QN_REFERENCE": From 20c7d05b7d48cdbf2f3925e2b86cebbdf6bc6683 Mon Sep 17 00:00:00 2001 From: Ariel Date: Thu, 13 Jun 2019 13:01:03 -0400 Subject: [PATCH 23/42] remove whitespace --- workers/data_refinery_workers/processors/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/workers/data_refinery_workers/processors/utils.py b/workers/data_refinery_workers/processors/utils.py index d07957345..e949ebb18 100644 --- a/workers/data_refinery_workers/processors/utils.py +++ b/workers/data_refinery_workers/processors/utils.py @@ -264,6 +264,7 @@ def end_job(job_context: Dict, abort=False): if not abort: if job_context.get("success", False) and not (job_context["job"].pipeline_applied in ["SMASHER", "QN_REFERENCE", "COMPENDIA"]): + # Salmon requires the final `tximport` step to be fully `is_processed`. mark_as_processed = True if (job_context["job"].pipeline_applied == "SALMON" and not job_context.get('tximported', False)): @@ -271,7 +272,7 @@ def end_job(job_context: Dict, abort=False): if mark_as_processed: # This handles most of our cases - unique_experiments = [] + unique_experiments = [] for sample in job_context.get("samples", []): sample.is_processed = True sample.save() From e860eedde7613fd09ece87cc54fb44fe7cd5c227 Mon Sep 17 00:00:00 2001 From: Ariel Date: Fri, 14 Jun 2019 11:04:34 -0400 Subject: [PATCH 24/42] Send dataset url instead of the download url in email --- workers/data_refinery_workers/processors/smasher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workers/data_refinery_workers/processors/smasher.py b/workers/data_refinery_workers/processors/smasher.py index e2396cb6f..f231041ef 100644 --- a/workers/data_refinery_workers/processors/smasher.py +++ b/workers/data_refinery_workers/processors/smasher.py @@ -934,8 +934,8 @@ def _notify(job_context: Dict) -> Dict: job_context['success'] = False else: SUBJECT = "Your refine.bio Dataset is Ready!" - BODY_TEXT = "Hot off the presses:\n\n" + job_context["result_url"] + "\n\nLove!,\nThe refine.bio Team" - FORMATTED_HTML = BODY_HTML.replace('REPLACE_DOWNLOAD_URL', job_context["result_url"])\ + BODY_TEXT = "Hot off the presses:\n\n" + dataset_url + "\n\nLove!,\nThe refine.bio Team" + FORMATTED_HTML = BODY_HTML.replace('REPLACE_DOWNLOAD_URL', dataset_url)\ .replace('REPLACE_DATASET_URL', dataset_url) # Try to send the email. From 3c1761252913527b7e353e87f3f5168b9b20f70f Mon Sep 17 00:00:00 2001 From: Kurt Wheeler Date: Tue, 18 Jun 2019 16:29:24 -0400 Subject: [PATCH 25/42] Scale down max clients to 1 to run a slim test. 
--- infrastructure/environments/prod.tfvars | Bin 817 -> 816 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/infrastructure/environments/prod.tfvars b/infrastructure/environments/prod.tfvars index 880cb391a976fb214b40ad8713927f46cf074f5f..1d2bdaf36b1ef62e11bd30b65e37cc556e89a3d2 100644 GIT binary patch literal 816 zcmV-01JC>bM@dveQdv+`04doIwI<_i%oBP<;8w7jTDK9G+#>hmD%llGgK|*0yn{F5 z3&;N&^Y-b8E3|i$MJ+QH7rl8~6-k-$7!mndxRSMBqh{c42*5%oQJ_!2zB8gN z?qhw zg}2C536{cUM;F+iESUkg90PXVTTx-pESgZKS`Z^0DhGGe9io-DU4zEZ6VMi;8=hXe z62~Gz6HKTMLGgn+A-hN%jD)L39^6ZL>em8oTCKBDrKtXqJri6AXP-V%J*v#Cq_xj- zOgec}Ymd?AuOg9%ylr?7l{I`Cfo%i&u_k*H#{NYzsulvD9l?2EH1@CjQ*eibG+C(y$%*h&0tNs>_!)-GCR ziUvd88Vz4idM#g|w9DHrLr!NOKTsAzuvhciQs@K^)#lYFQ#UlIa`+9#l}1^gqORg| zeZgC^EN|m!qqHURj~}^j6{r|2X&j8v(7y{rjrG*e4F3nW%q(`D8|SvZd#&qej0C6> z{5#3+iHVrpHod$fZvdhCu`R8!2>z2K%uJTRz#|t1GrlxT0L#m!l(17%@@Oh&>l5^; z!P#`gMAt^pEyxjXYjr~dA!84HNZA@n?JUJk$=mn~*g-eTMj8_r1s#|EHzb08wxZ*( zta4jSs-N9WikfV7*$kWoJwp?6lY)#TlwXD1kE)*Oj2sFkv97^r&QpQ>!8Xb2)Y`iI z9mbp^Fm4?xV_ke8#W6s4pk(Fro8a%`AyCGl+bWe@l9X%Bk6w)vwk-n9GvWw%!e zc7kAVfs8>NOh-irvnM`HMlRZG)ns`_?<5&@O~gC$c(eTiZK literal 817 zcmV-11J3*aM@dveQdv+`0DJ2734s3sB@5@K@_3B0BG5eK_$0+{YUbFn0Crvw8SHVc z`c8J-;wlYwxx@g49&Ke7`Vgssr*EV6ruUEL+7hqD_WC?OCkio4#7{e}L6djgvo9Cq zlY>k(CYL;yYyWfGDw@Rl(BIH8kOtNO4-8=qiKr!<uLI>&>sDUm_b|Uj@g? zDT&7N_gN*ob(aNGEg6q>Wt3Rmlo<0+gETp=H1!#G?#4hKX6x8D2}sDEor50i+-5%P z&!S-nBf}+>m?x8e6p?He;IJG!LWWx7Qj_hO4dwICo;B=!Htzsz{4;u*g~LCdIHjD= zhljv_QV_TPd07eu)ovoA0EA#1Y=;wPjWv^`jn>g8q^jR6Wfp{?$hTD35*jyOC$7kr zA#}l~zDN=$02vh4GBGJz2vJy+f zW~3TRR_Zp_NT2C)*jWl}F?KuXSx00oyx|3W`~BX=(Tr{GtqZ#M zn58$0%`{4XKdJuICrKa@rmi549B?~!O83W_J$oLdUA2QIqrJvipjVr>a25<}0Z-tL zcwcg?w3cZmkL{NmD*jLD>Xz`Kv1ejDRLSolZ;(IH*Kb47snu6<;tEOF?N9L{P%@+z z6-FPTZ4iYOiO!9d9bO5ZNhY`7U;*t?SJ`=CW)9WYaUFtKG)aB68B;M|{vx=T33Tzl v(EArKNI?V9&BlcPgA)lm^(@+k^?YZF!4avhHrt1dyoJ3$o1r@zBJYqBCp?Jf From db9f6cc1b5d99ac18328a5ac8b42ca8619ae78ff Mon Sep 17 00:00:00 2001 From: Kurt Wheeler Date: Wed, 19 Jun 2019 14:56:46 -0400 Subject: [PATCH 26/42] When DL jobs get OOM killed they have a blank failure reason, we were filtering on failure_reasons poorly. This fixes that. 
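(A tiny standalone sketch of the guard being flipped in the diff below; `failure_reason` is hard-coded to None here, which is what an OOM-killed downloader job ends up with:)

    failure_reason = None

    # Old guard: False when failure_reason is None, so OOM-killed jobs never
    # reached the branch that bumps ram_amount before requeueing.
    old_guard = failure_reason is not None and 'harakiri' not in failure_reason

    # New guard: True when failure_reason is None, so blank-reason retries now
    # get the larger RAM allocation as intended.
    new_guard = failure_reason is None or 'harakiri' not in failure_reason

    assert old_guard is False and new_guard is True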
--- foreman/data_refinery_foreman/foreman/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/foreman/data_refinery_foreman/foreman/main.py b/foreman/data_refinery_foreman/foreman/main.py index 8e465d89c..83c334c45 100644 --- a/foreman/data_refinery_foreman/foreman/main.py +++ b/foreman/data_refinery_foreman/foreman/main.py @@ -323,7 +323,7 @@ def requeue_downloader_job(last_job: DownloaderJob) -> bool: num_retries = last_job.num_retries + 1 ram_amount = last_job.ram_amount - if last_job.failure_reason is not None and 'harakiri' not in last_job.failure_reason: + if last_job.failure_reason is None or 'harakiri' not in last_job.failure_reason: if ram_amount == 1024: ram_amount = 4096 elif ram_amount == 4096: From a63231df1086a3c9ade9cb421fe8eda080ba72a0 Mon Sep 17 00:00:00 2001 From: Ariel Date: Wed, 19 Jun 2019 16:06:58 -0400 Subject: [PATCH 27/42] remove version --- common/version | 1 - 1 file changed, 1 deletion(-) delete mode 100644 common/version diff --git a/common/version b/common/version deleted file mode 100644 index 33979fcf9..000000000 --- a/common/version +++ /dev/null @@ -1 +0,0 @@ -local1560363052 From 332513fc505cdc3e0b4c7ced617b928d422e22b2 Mon Sep 17 00:00:00 2001 From: Ariel Date: Wed, 19 Jun 2019 16:08:15 -0400 Subject: [PATCH 28/42] keep num_processed_samples in the search filters --- api/data_refinery_api/views.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/api/data_refinery_api/views.py b/api/data_refinery_api/views.py index e8d7bc85c..11f18047f 100644 --- a/api/data_refinery_api/views.py +++ b/api/data_refinery_api/views.py @@ -229,6 +229,14 @@ class ExperimentDocumentView(DocumentViewSet): 'has_publication': 'has_publication', 'platform': 'platform_accession_codes', 'organism': 'organism_names', + 'num_processed_samples': { + 'field': 'num_processed_samples', + 'lookups': [ + LOOKUP_FILTER_RANGE, + LOOKUP_QUERY_IN, + LOOKUP_QUERY_GT + ], + }, 'num_downloadable_samples': { 'field': 'num_downloadable_samples', 'lookups': [ From 3f5563740b444d0f8eb86f87eb30edc3d0ff9189 Mon Sep 17 00:00:00 2001 From: Ariel Date: Wed, 19 Jun 2019 16:32:14 -0400 Subject: [PATCH 29/42] update all related experiments when generating qn targets --- .../data_refinery_workers/processors/qn_reference.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/workers/data_refinery_workers/processors/qn_reference.py b/workers/data_refinery_workers/processors/qn_reference.py index e1bc36b91..ecf113418 100644 --- a/workers/data_refinery_workers/processors/qn_reference.py +++ b/workers/data_refinery_workers/processors/qn_reference.py @@ -213,12 +213,10 @@ def _create_result_objects(job_context: Dict) -> Dict: def _update_experiment_caches(job_context: Dict) -> Dict: """ Experiments have a cached value with the number of samples that have QN targets generated, this value should be updated after generating new QN targets. 
""" - unique_experiments = [] - all_samples = job_context['samples']['ALL'] - for sample in all_samples: - if sample.experiments.all().count() > 0: - unique_experiments = list(set(unique_experiments + sample.experiments.all()[::1])) - + organism_id = job_context['samples']['ALL'][0].organism_id + organism_name = Organism.get_name_for_id(organism_id) + unique_experiments = Experiments.objects.all().filter(organism_names__contains=organism_name) + for experiment in unique_experiments: experiment.update_num_samples() From d5dfbd1182210c1b3b229af879db25aea7a1bcdc Mon Sep 17 00:00:00 2001 From: Ariel Date: Wed, 19 Jun 2019 17:43:12 -0400 Subject: [PATCH 30/42] add test value in experiment organism name --- workers/data_refinery_workers/processors/test_qn_reference.py | 1 + 1 file changed, 1 insertion(+) diff --git a/workers/data_refinery_workers/processors/test_qn_reference.py b/workers/data_refinery_workers/processors/test_qn_reference.py index 5eca35ff0..c1446f62c 100644 --- a/workers/data_refinery_workers/processors/test_qn_reference.py +++ b/workers/data_refinery_workers/processors/test_qn_reference.py @@ -37,6 +37,7 @@ def test_qn_reference(self): experiment = Experiment() experiment.accession_code = "12345" + experiment.organism_names = [homo_sapiens.name] experiment.save() for code in ['1', '2', '3', '4', '5', '6']: From cc3309154465e46170f313e98a70cec475e781a1 Mon Sep 17 00:00:00 2001 From: Ariel Date: Thu, 20 Jun 2019 15:02:09 -0400 Subject: [PATCH 31/42] fix organism import --- workers/data_refinery_workers/processors/test_qn_reference.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workers/data_refinery_workers/processors/test_qn_reference.py b/workers/data_refinery_workers/processors/test_qn_reference.py index c1446f62c..2e123a0f5 100644 --- a/workers/data_refinery_workers/processors/test_qn_reference.py +++ b/workers/data_refinery_workers/processors/test_qn_reference.py @@ -8,7 +8,6 @@ ProcessorJob, OriginalFile, Sample, - Organism, SampleComputedFileAssociation, ProcessorJobOriginalFileAssociation, Dataset, @@ -18,6 +17,7 @@ ExperimentSampleAssociation, ProcessorJobDatasetAssociation ) +from data_refinery_common.models.organism import Organism from data_refinery_workers.processors import qn_reference, smasher, utils From 3a794b054054c186eec1c204ec4495b8a54ea542 Mon Sep 17 00:00:00 2001 From: Kurt Wheeler Date: Thu, 20 Jun 2019 16:22:57 -0400 Subject: [PATCH 32/42] Fix a syntax error in the smasher. --- workers/data_refinery_workers/processors/smasher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workers/data_refinery_workers/processors/smasher.py b/workers/data_refinery_workers/processors/smasher.py index cac49e8a2..829ce5d1f 100644 --- a/workers/data_refinery_workers/processors/smasher.py +++ b/workers/data_refinery_workers/processors/smasher.py @@ -914,11 +914,11 @@ def _notify(job_context: Dict) -> Dict: requests.post( "https://hooks.slack.com/services/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", json={ - 'fallback': 'Dataset failed processing.' 
+ 'fallback': 'Dataset failed processing.', 'title': 'Dataset failed processing', 'title_link': dataset_url, "attachments":[ - { + { "color": "warning", "text": job_context['job'].failure_reason, 'author_name': job_context["dataset"].email_address, From 93ea26dfb48bc83bd03608cd2a091ffcf6421867 Mon Sep 17 00:00:00 2001 From: Ariel Date: Fri, 21 Jun 2019 10:29:00 -0400 Subject: [PATCH 33/42] fix tests --- workers/data_refinery_workers/processors/qn_reference.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/workers/data_refinery_workers/processors/qn_reference.py b/workers/data_refinery_workers/processors/qn_reference.py index ecf113418..352f9f03f 100644 --- a/workers/data_refinery_workers/processors/qn_reference.py +++ b/workers/data_refinery_workers/processors/qn_reference.py @@ -20,6 +20,7 @@ Processor, SampleComputedFileAssociation, SampleResultAssociation, + Experiment, ) from data_refinery_common.utils import get_env_variable from data_refinery_workers.processors import utils, smasher @@ -213,9 +214,8 @@ def _create_result_objects(job_context: Dict) -> Dict: def _update_experiment_caches(job_context: Dict) -> Dict: """ Experiments have a cached value with the number of samples that have QN targets generated, this value should be updated after generating new QN targets. """ - organism_id = job_context['samples']['ALL'][0].organism_id - organism_name = Organism.get_name_for_id(organism_id) - unique_experiments = Experiments.objects.all().filter(organism_names__contains=organism_name) + organism_name = job_context['samples']['ALL'][0].organism.name + unique_experiments = Experiment.objects.all().filter(organism_names__contains=[organism_name]) for experiment in unique_experiments: experiment.update_num_samples() From 49af4aa38e555861f20b7a28cde6fafb02fb4586 Mon Sep 17 00:00:00 2001 From: Will Vauclain Date: Tue, 25 Jun 2019 12:14:04 -0400 Subject: [PATCH 34/42] Allow SRA metadata to be processed, and test the harmonization --- foreman/data_refinery_foreman/surveyor/sra.py | 13 +++++++++---- foreman/data_refinery_foreman/surveyor/test_sra.py | 8 ++++++++ 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/foreman/data_refinery_foreman/surveyor/sra.py b/foreman/data_refinery_foreman/surveyor/sra.py index 086af0e98..c3fbd7e2b 100644 --- a/foreman/data_refinery_foreman/surveyor/sra.py +++ b/foreman/data_refinery_foreman/surveyor/sra.py @@ -331,6 +331,14 @@ def _build_ncbi_file_url(run_accession: str): return download_url + @staticmethod + def _apply_harmonized_metadata_to_sample(sample: Sample, metadata: dict): + """Harmonizes the metadata and applies it to `sample`""" + sample.title = harmony.extract_title(metadata) + harmonized_sample = harmony.harmonize([metadata]) + for key, value in harmonized_sample[sample.title].items(): + setattr(sample, key, value) + def _generate_experiment_and_samples(self, run_accession: str, study_accession: str=None) -> (Experiment, List[Sample]): """Generates Experiments and Samples for the provided run_accession.""" metadata = SraSurveyor.gather_all_metadata(run_accession) @@ -467,10 +475,7 @@ def _generate_experiment_and_samples(self, run_accession: str, study_accession: sample_object.manufacturer = "UNKNOWN" # Directly apply the harmonized values - sample_object.title = harmony.extract_title(metadata) - harmonized_sample = harmony.harmonize([metadata]) - for key, value in harmonized_sample.items(): - setattr(sample_object, key, value) + SraSurveyor._apply_harmonized_metadata_to_sample(sample_object, metadata) 
protocol_info, is_updated = self.update_sample_protocol_info( existing_protocols=[], diff --git a/foreman/data_refinery_foreman/surveyor/test_sra.py b/foreman/data_refinery_foreman/surveyor/test_sra.py index 902ab1cdf..3b039afc4 100644 --- a/foreman/data_refinery_foreman/surveyor/test_sra.py +++ b/foreman/data_refinery_foreman/surveyor/test_sra.py @@ -194,3 +194,11 @@ def test_metadata_is_gathered_correctly(self, mock_get): ncbi_url = SraSurveyor._build_ncbi_file_url(metadata["run_accession"]) self.assertTrue(ncbi_url in ['anonftp@ftp.ncbi.nlm.nih.gov:/sra/sra-instant/reads/ByRun/sra/DRR/DRR002/DRR002116/DRR002116.sra', 'anonftp@ftp-private.ncbi.nlm.nih.gov:/sra/sra-instant/reads/ByRun/sra/DRR/DRR002/DRR002116/DRR002116.sra', 'dbtest@sra-download.ncbi.nlm.nih.gov:data/sracloud/traces/dra0/DRR/000002/DRR002116']) + + def test_sra_metadata_is_harmonized(self): + metadata = SraSurveyor.gather_all_metadata("SRR3098582") + sample = Sample() + SraSurveyor._apply_harmonized_metadata_to_sample(sample, metadata) + self.assertEqual(sample.treatment, "biliatresone") + self.assertEqual(sample.subject, "liver") + self.assertEqual(sample.specimen_part, "liver") From 460646bda39fd155f071ad5461f727bf8c129a8e Mon Sep 17 00:00:00 2001 From: wvauclain <13942258+wvauclain@users.noreply.github.com> Date: Tue, 25 Jun 2019 14:55:19 -0400 Subject: [PATCH 35/42] Removed unnecessary comment --- foreman/data_refinery_foreman/surveyor/sra.py | 1 - 1 file changed, 1 deletion(-) diff --git a/foreman/data_refinery_foreman/surveyor/sra.py b/foreman/data_refinery_foreman/surveyor/sra.py index c3fbd7e2b..e98da8b07 100644 --- a/foreman/data_refinery_foreman/surveyor/sra.py +++ b/foreman/data_refinery_foreman/surveyor/sra.py @@ -474,7 +474,6 @@ def _generate_experiment_and_samples(self, run_accession: str, study_accession: else: sample_object.manufacturer = "UNKNOWN" - # Directly apply the harmonized values SraSurveyor._apply_harmonized_metadata_to_sample(sample_object, metadata) protocol_info, is_updated = self.update_sample_protocol_info( From 9065cbd506e5af648f00b5983ae04d3e596a62b9 Mon Sep 17 00:00:00 2001 From: wvauclain <13942258+wvauclain@users.noreply.github.com> Date: Wed, 26 Jun 2019 10:24:23 -0400 Subject: [PATCH 36/42] Removed pip.conf recommendation from README --- README.md | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/README.md b/README.md index d124a3579..405bb06b6 100644 --- a/README.md +++ b/README.md @@ -102,17 +102,6 @@ Instructions for installing Docker, Terraform, and Nomad can be found by following the link for each service. git-crypt, jq, and iproute2 can be installed via `sudo apt-get install git-crypt jq iproute2`. -When installing pip packages later in the install, you might get an error saying you need sudo permissions. -In order to fix this you have to edit your `~/.config/pip/pip.conf` to add this: - -``` -[install] -user = yes -no-binary = :all: -``` - -This sets pip to install all packages in your user directory so sudo is not required for pip intalls. - #### Mac The following services will need to be installed: From e00370c534162c57223b7bf28916bad52f98ae05 Mon Sep 17 00:00:00 2001 From: Kurt Wheeler Date: Fri, 28 Jun 2019 12:19:33 -0400 Subject: [PATCH 37/42] Bumps max_clients to 5 and DOWNLOADER_JOBS_PER_NODE to 50. 
--- foreman/data_refinery_foreman/foreman/main.py | 2 +- infrastructure/environments/prod.tfvars | Bin 816 -> 816 bytes 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/foreman/data_refinery_foreman/foreman/main.py b/foreman/data_refinery_foreman/foreman/main.py index 83c334c45..d26ace3cb 100644 --- a/foreman/data_refinery_foreman/foreman/main.py +++ b/foreman/data_refinery_foreman/foreman/main.py @@ -50,7 +50,7 @@ # This can be overritten by the env var "MAX_TOTAL_JOBS" DEFAULT_MAX_JOBS = 20000 -DOWNLOADER_JOBS_PER_NODE = 150 +DOWNLOADER_JOBS_PER_NODE = 50 PAGE_SIZE=2000 # This is the maximum number of non-dead nomad jobs that can be in the diff --git a/infrastructure/environments/prod.tfvars b/infrastructure/environments/prod.tfvars index 1d2bdaf36b1ef62e11bd30b65e37cc556e89a3d2..9d07c6d404a9ae432c11292d101b6d26285f1e8d 100644 GIT binary patch literal 816 zcmV-01JC>bM@dveQdv+`0R7MU?0nnt1UC(kF|;eMGNI}nnqBpV>p`fl}0fqBsB?SOSIzo!=PyXv8p1hGcl1&Y2FI-BX69c+M-W zaa#)No6|a!YGV(k*(n~GxGYEywp2l zHf43%Oxl)LdcP*{=Tb$`VsGUs`eos2G`{cbE$|L>7r0T=b<{KZ9Usi;6o>pUV8yY< zI#FlnVH-8JWFMNo{3lJ@|L3REVNzWtpfdBGZwTbMx0oNyGK*!sjRI_ZG6nan!J7tW|*PXGB)_I>OgI}zqXSJ+R26_*E_VqT||*i#+D(z zS@b~D-o~EWA1A-xeK-Z)i_yy={cngEm)t}h23tl#5P_Rl9c}<_*u%5V?+1E87PLsy z?f4EZ6fjU}#KrSy=rUQ=bs%PR_Q6xRFgFFtPdTm8G?0nFLpV9G@eJv=uOk)>2Hdf# z?8q&C}sy;uc!ExuIEMNXBD44I50iWJ5x1Zt+5mpRt|iXy+C+w&2EM5BSU;1VM7sGMAl9nQgEP=|??fS{NF literal 816 zcmV-01JC>bM@dveQdv+`04doIwI<_i%oBP<;8w7jTDK9G+#>hmD%llGgK|*0yn{F5 z3&;N&^Y-b8E3|i$MJ+QH7rl8~6-k-$7!mndxRSMBqh{c42*5%oQJ_!2zB8gN z?qhw zg}2C536{cUM;F+iESUkg90PXVTTx-pESgZKS`Z^0DhGGe9io-DU4zEZ6VMi;8=hXe z62~Gz6HKTMLGgn+A-hN%jD)L39^6ZL>em8oTCKBDrKtXqJri6AXP-V%J*v#Cq_xj- zOgec}Ymd?AuOg9%ylr?7l{I`Cfo%i&u_k*H#{NYzsulvD9l?2EH1@CjQ*eibG+C(y$%*h&0tNs>_!)-GCR ziUvd88Vz4idM#g|w9DHrLr!NOKTsAzuvhciQs@K^)#lYFQ#UlIa`+9#l}1^gqORg| zeZgC^EN|m!qqHURj~}^j6{r|2X&j8v(7y{rjrG*e4F3nW%q(`D8|SvZd#&qej0C6> z{5#3+iHVrpHod$fZvdhCu`R8!2>z2K%uJTRz#|t1GrlxT0L#m!l(17%@@Oh&>l5^; z!P#`gMAt^pEyxjXYjr~dA!84HNZA@n?JUJk$=mn~*g-eTMj8_r1s#|EHzb08wxZ*( zta4jSs-N9WikfV7*$kWoJwp?6lY)#TlwXD1kE)*Oj2sFkv97^r&QpQ>!8Xb2)Y`iI z9mbp^Fm4?xV_ke8#W6s4pk(Fro8a%`AyCGl+bWe@l9X%Bk6w)vwk-n9GvWw%!e zc7kAVfs8>NOh-irvnM`HMlRZG)ns`_?<5&@O~gC$c(eTiZK From 92b6fa5b265eb5203f8dfbb00f3c15113f9ba9ef Mon Sep 17 00:00:00 2001 From: Kurt Wheeler Date: Fri, 28 Jun 2019 12:20:20 -0400 Subject: [PATCH 38/42] Bumps DOWNLOADER_JOBS_PER_NODE to 75. --- foreman/data_refinery_foreman/foreman/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/foreman/data_refinery_foreman/foreman/main.py b/foreman/data_refinery_foreman/foreman/main.py index d26ace3cb..ebc80b2a9 100644 --- a/foreman/data_refinery_foreman/foreman/main.py +++ b/foreman/data_refinery_foreman/foreman/main.py @@ -50,7 +50,7 @@ # This can be overritten by the env var "MAX_TOTAL_JOBS" DEFAULT_MAX_JOBS = 20000 -DOWNLOADER_JOBS_PER_NODE = 50 +DOWNLOADER_JOBS_PER_NODE = 75 PAGE_SIZE=2000 # This is the maximum number of non-dead nomad jobs that can be in the From fc909379d80ae597092be296b6a4cd551e01a47f Mon Sep 17 00:00:00 2001 From: Kurt Wheeler Date: Mon, 1 Jul 2019 10:00:30 -0400 Subject: [PATCH 39/42] Remove a migration and readd it as a management command since it takes too long. 
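(Aside, not part of the patch itself: with the RunPython step gone from the migration, the backfill has to be triggered explicitly. Based on the file path added below, Django exposes it as `update_downloadable_samples`; the exact invocation used in deploys is not shown in this series. A sketch:)

    from django.core.management import call_command

    # Run the backfill on demand instead of inside `migrate`.
    call_command("update_downloadable_samples")

    # Equivalent from a shell on a foreman instance:
    #   python manage.py update_downloadable_samples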
--- ...021_experiment_num_downloadable_samples.py | 28 ------------------- .../commands/update_downloadable_samples.py | 9 ++++++ 2 files changed, 9 insertions(+), 28 deletions(-) delete mode 100644 common/data_refinery_common/migrations/0021_experiment_num_downloadable_samples.py create mode 100644 foreman/data_refinery_foreman/foreman/management/commands/update_downloadable_samples.py diff --git a/common/data_refinery_common/migrations/0021_experiment_num_downloadable_samples.py b/common/data_refinery_common/migrations/0021_experiment_num_downloadable_samples.py deleted file mode 100644 index 9c8783aea..000000000 --- a/common/data_refinery_common/migrations/0021_experiment_num_downloadable_samples.py +++ /dev/null @@ -1,28 +0,0 @@ -# Generated by Django 2.1.8 on 2019-06-12 17:25 - -import django.contrib.postgres.fields -from django.db import migrations, models - -def update_cached_values(apps, schema_editor): - """ """ - Experiment = apps.get_model('data_refinery_common', 'Experiment') - ComputationalResultAnnotation = apps.get_model('data_refinery_common', 'ComputationalResultAnnotation') - for experiment in Experiment.objects.all(): - organism_ids = list(ComputationalResultAnnotation.objects.filter(data__is_qn=True).values_list('data__organism_id', flat=True)) - experiment.num_downloadable_samples = experiment.samples.filter(is_processed=True, organism__id__in=organism_ids).count() - experiment.save() - -class Migration(migrations.Migration): - - dependencies = [ - ('data_refinery_common', '0020_update_qn_bucket'), - ] - - operations = [ - migrations.AddField( - model_name='experiment', - name='num_downloadable_samples', - field=models.IntegerField(default=0), - ), - migrations.RunPython(update_cached_values) - ] diff --git a/foreman/data_refinery_foreman/foreman/management/commands/update_downloadable_samples.py b/foreman/data_refinery_foreman/foreman/management/commands/update_downloadable_samples.py new file mode 100644 index 000000000..51a8ea192 --- /dev/null +++ b/foreman/data_refinery_foreman/foreman/management/commands/update_downloadable_samples.py @@ -0,0 +1,9 @@ +import django.contrib.postgres.fields +from data_refinery_common.models import Experiment, ComputationalResultAnnotation + +class Command(BaseCommand): + def handle(self, *args, **options): + for experiment in Experiment.objects.all(): + organism_ids = list(ComputationalResultAnnotation.objects.filter(data__is_qn=True).values_list('data__organism_id', flat=True)) + experiment.num_downloadable_samples = experiment.samples.filter(is_processed=True, organism__id__in=organism_ids).count() + experiment.save() From 0ff6b2e670fea3290b21fd59ebbf5f4d0a65933c Mon Sep 17 00:00:00 2001 From: Kurt Wheeler Date: Mon, 1 Jul 2019 10:17:42 -0400 Subject: [PATCH 40/42] Add the migration to add num_downloadable_samples to the experiment model. 
--- ...0021_experiment_num_downloadable_samples.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 common/data_refinery_common/migrations/0021_experiment_num_downloadable_samples.py diff --git a/common/data_refinery_common/migrations/0021_experiment_num_downloadable_samples.py b/common/data_refinery_common/migrations/0021_experiment_num_downloadable_samples.py new file mode 100644 index 000000000..b0daedd96 --- /dev/null +++ b/common/data_refinery_common/migrations/0021_experiment_num_downloadable_samples.py @@ -0,0 +1,18 @@ +# Generated by Django 2.1.8 on 2019-07-01 14:16 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('data_refinery_common', '0020_update_qn_bucket'), + ] + + operations = [ + migrations.AddField( + model_name='experiment', + name='num_downloadable_samples', + field=models.IntegerField(default=0), + ), + ] From 36bb8720431654342cc88a9ef10e051aa290efa3 Mon Sep 17 00:00:00 2001 From: Kurt Wheeler Date: Mon, 1 Jul 2019 10:21:08 -0400 Subject: [PATCH 41/42] Fix management command to update num_downloadable_samples. --- .../management/commands/update_downloadable_samples.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/foreman/data_refinery_foreman/foreman/management/commands/update_downloadable_samples.py b/foreman/data_refinery_foreman/foreman/management/commands/update_downloadable_samples.py index 51a8ea192..47830fa6c 100644 --- a/foreman/data_refinery_foreman/foreman/management/commands/update_downloadable_samples.py +++ b/foreman/data_refinery_foreman/foreman/management/commands/update_downloadable_samples.py @@ -1,4 +1,4 @@ -import django.contrib.postgres.fields +from django.core.management.base import BaseCommand from data_refinery_common.models import Experiment, ComputationalResultAnnotation class Command(BaseCommand): @@ -7,3 +7,5 @@ def handle(self, *args, **options): organism_ids = list(ComputationalResultAnnotation.objects.filter(data__is_qn=True).values_list('data__organism_id', flat=True)) experiment.num_downloadable_samples = experiment.samples.filter(is_processed=True, organism__id__in=organism_ids).count() experiment.save() + + print("Updated the num_downloadable_samples field on all experiment objects.") From 5de7137cc09e3972601cf361ab1bc719525c1ae6 Mon Sep 17 00:00:00 2001 From: Kurt Wheeler Date: Mon, 1 Jul 2019 10:23:14 -0400 Subject: [PATCH 42/42] Move a query outside the loop. --- .../foreman/management/commands/update_downloadable_samples.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/foreman/data_refinery_foreman/foreman/management/commands/update_downloadable_samples.py b/foreman/data_refinery_foreman/foreman/management/commands/update_downloadable_samples.py index 47830fa6c..d1f2de7b5 100644 --- a/foreman/data_refinery_foreman/foreman/management/commands/update_downloadable_samples.py +++ b/foreman/data_refinery_foreman/foreman/management/commands/update_downloadable_samples.py @@ -3,8 +3,8 @@ class Command(BaseCommand): def handle(self, *args, **options): + organism_ids = list(ComputationalResultAnnotation.objects.filter(data__is_qn=True).values_list('data__organism_id', flat=True)) for experiment in Experiment.objects.all(): - organism_ids = list(ComputationalResultAnnotation.objects.filter(data__is_qn=True).values_list('data__organism_id', flat=True)) experiment.num_downloadable_samples = experiment.samples.filter(is_processed=True, organism__id__in=organism_ids).count() experiment.save()