diff --git a/.gitignore b/.gitignore index 4e5022d5a..30ca10df9 100644 --- a/.gitignore +++ b/.gitignore @@ -138,3 +138,5 @@ dr_env/ # emacs backup files *~ + +.vscode \ No newline at end of file diff --git a/README.md b/README.md index d124a3579..405bb06b6 100644 --- a/README.md +++ b/README.md @@ -102,17 +102,6 @@ Instructions for installing Docker, Terraform, and Nomad can be found by following the link for each service. git-crypt, jq, and iproute2 can be installed via `sudo apt-get install git-crypt jq iproute2`. -When installing pip packages later in the install, you might get an error saying you need sudo permissions. -In order to fix this you have to edit your `~/.config/pip/pip.conf` to add this: - -``` -[install] -user = yes -no-binary = :all: -``` - -This sets pip to install all packages in your user directory so sudo is not required for pip intalls. - #### Mac The following services will need to be installed: diff --git a/api/data_refinery_api/serializers.py b/api/data_refinery_api/serializers.py index 38715d393..030509c50 100644 --- a/api/data_refinery_api/serializers.py +++ b/api/data_refinery_api/serializers.py @@ -389,8 +389,7 @@ class Meta: class DetailedExperimentSerializer(serializers.ModelSerializer): annotations = ExperimentAnnotationSerializer(many=True, source='experimentannotation_set') samples = DetailedExperimentSampleSerializer(many=True) - organisms = OrganismSerializer(many=True) - sample_metadata = serializers.ReadOnlyField(source='get_sample_metadata_fields') + sample_metadata = serializers.ReadOnlyField(source='sample_metadata_fields') class Meta: model = Experiment @@ -414,8 +413,11 @@ class Meta: 'submitter_institution', 'last_modified', 'created_at', - 'organisms', + 'organism_names', 'sample_metadata', + 'num_total_samples', + 'num_processed_samples', + 'num_downloadable_samples' ) class PlatformSerializer(serializers.ModelSerializer): @@ -796,6 +798,7 @@ class ExperimentDocumentSerializer(serializers.Serializer): pubmed_id = serializers.CharField(read_only=True) num_total_samples = serializers.IntegerField(read_only=True) num_processed_samples = serializers.IntegerField(read_only=True) + num_downloadable_samples = serializers.IntegerField(read_only=True) source_first_published = serializers.DateField(read_only=True) # FK/M2M diff --git a/api/data_refinery_api/tests.py b/api/data_refinery_api/tests.py index 96fd4b9cc..b185c2e4f 100644 --- a/api/data_refinery_api/tests.py +++ b/api/data_refinery_api/tests.py @@ -1180,6 +1180,7 @@ def test_es_endpoint(self): experiment.technology = "MICROARRAY" experiment.num_processed_samples = 1 # added below experiment.num_total_samples = 1 + experiment.num_downloadable_samples = 1 experiment.save() self.experiment = experiment diff --git a/api/data_refinery_api/views.py b/api/data_refinery_api/views.py index 8742821f6..11f18047f 100644 --- a/api/data_refinery_api/views.py +++ b/api/data_refinery_api/views.py @@ -157,7 +157,7 @@ def aggregate(self, request, queryset, view): All we need to add is one line when building the facets: - .metric('total_samples', 'sum', field='num_processed_samples') + .metric('total_samples', 'sum', field='num_downloadable_samples') (Maybe there's a way to do this with the options in `ExperimentDocumentView`) """ @@ -165,7 +165,7 @@ def aggregate(self, request, queryset, view): for field, facet in iteritems(facets): agg = facet['facet'].get_aggregation() queryset.aggs.bucket(field, agg)\ - .metric('total_samples', 'sum', field='num_processed_samples') + .metric('total_samples', 'sum', 
field='num_downloadable_samples') return queryset @@ -236,6 +236,14 @@ class ExperimentDocumentView(DocumentViewSet): LOOKUP_QUERY_IN, LOOKUP_QUERY_GT ], + }, + 'num_downloadable_samples': { + 'field': 'num_downloadable_samples', + 'lookups': [ + LOOKUP_FILTER_RANGE, + LOOKUP_QUERY_IN, + LOOKUP_QUERY_GT + ], } } @@ -245,7 +253,7 @@ class ExperimentDocumentView(DocumentViewSet): 'title': 'title.raw', 'description': 'description.raw', 'num_total_samples': 'num_total_samples', - 'num_processed_samples': 'num_processed_samples', + 'num_downloadable_samples': 'num_downloadable_samples', 'source_first_published': 'source_first_published' } diff --git a/common/data_refinery_common/job_management.py b/common/data_refinery_common/job_management.py new file mode 100644 index 000000000..99bacdd9f --- /dev/null +++ b/common/data_refinery_common/job_management.py @@ -0,0 +1,145 @@ +from typing import List, Dict, Callable + +from data_refinery_common import job_lookup +from data_refinery_common.logging import get_and_configure_logger +from data_refinery_common.models import ( + DownloaderJob, + DownloaderJobOriginalFileAssociation, + OriginalFile, +) +from data_refinery_common.utils import ( + get_env_variable, + get_env_variable_gracefully, + get_instance_id, +) + + +logger = get_and_configure_logger(__name__) + + +def create_downloader_job(undownloaded_files: List[OriginalFile], + *, + processor_job_id=None, + force=False) -> bool: + """Creates a downloader job to download `undownloaded_files`.""" + if not undownloaded_files: + return False + + original_downloader_job = None + archive_file = None + for undownloaded_file in undownloaded_files: + try: + original_downloader_job = undownloaded_file.downloader_jobs.latest('id') + + # Found the job so we don't need to keep going. + break + except DownloaderJob.DoesNotExist: + # If there's no association between this file and any + # downloader jobs, it's most likely because the original + # file was created after extracting a archive containing + # multiple files worth of data. + # The way to handle this is to find that archive and + # recreate a downloader job FOR THAT. That archive will + # have the same filename as the file at the end of the + # 'source_url' field, because that source URL is pointing + # to the archive we need. + archive_filename = undownloaded_file.source_url.split("/")[-1] + + # This file or its job might not exist, but we'll wait + # until we've checked all the files before calling it a + # failure. + try: + archive_file = OriginalFile.objects.filter(filename=archive_filename) + if archive_file.count() > 0: + archive_file = archive_file.first() + else: + # We might need to match these up based on + # source_filenames rather than filenames so just + # try them both. + archive_file = OriginalFile.objects.filter(source_filename=archive_filename).first() + + original_downloader_job = DownloaderJobOriginalFileAssociation.objects.filter( + original_file=archive_file + ).latest('id').downloader_job + # Found the job so we don't need to keep going. 
+ break + except: + pass + + if not original_downloader_job: + sample_object = list(undownloaded_files)[0].samples.first() + if sample_object: + downloader_task = job_lookup.determine_downloader_task(sample_object) + + if downloader_task == job_lookup.Downloaders.NONE: + logger.warn(("No valid downloader task found for sample, which is weird" + " because it was able to have a processor job created for it..."), + sample=sample_object.id) + return False + else: + # determine_downloader_task returns an enum object, + # but we wanna set this on the DownloaderJob object so + # we want the actual value. + downloader_task = downloader_task.value + + accession_code = sample_object.accession_code + original_files = sample_object.original_files.all() + else: + logger.error( + "Could not find the original DownloaderJob or Sample for these files.", + undownloaded_file=undownloaded_files + ) + return False + elif original_downloader_job.was_recreated and not force: + logger.warn( + "Downloader job has already been recreated once, not doing it again.", + original_downloader_job=original_downloader_job, + undownloaded_files=undownloaded_files + ) + return False + else: + downloader_task = original_downloader_job.downloader_task + accession_code = original_downloader_job.accession_code + original_files = original_downloader_job.original_files.all() + + sample_object = original_files[0].samples.first() + + new_job = DownloaderJob() + new_job.downloader_task = downloader_task + new_job.accession_code = accession_code + new_job.was_recreated = True + new_job.ram_amount = 1024 + new_job.save() + + if archive_file: + # If this downloader job is for an archive file, then the + # files that were passed into this function aren't what need + # to be directly downloaded, they were extracted out of this + # archive. The DownloaderJob will re-extract them and set up + # the associations for the new ProcessorJob. + # So double check that it still needs downloading because + # another file that came out of it could have already + # recreated the DownloaderJob. + if archive_file.needs_downloading(processor_job_id): + if archive_file.is_downloaded: + # If it needs to be downloaded then it's not + # downloaded and the is_downloaded field should stop + # lying about that. + archive_file.is_downloaded = False + archive_file.save() + + DownloaderJobOriginalFileAssociation.objects.get_or_create( + downloader_job=new_job, + original_file=archive_file + ) + else: + # We can't just associate the undownloaded files, because + # there's a chance that there is a file which actually is + # downloaded that also needs to be associated with the job. 
+ for original_file in original_files: + DownloaderJobOriginalFileAssociation.objects.get_or_create( + downloader_job=new_job, + original_file=original_file + ) + + return True diff --git a/common/data_refinery_common/migrations/0021_experiment_num_downloadable_samples.py b/common/data_refinery_common/migrations/0021_experiment_num_downloadable_samples.py new file mode 100644 index 000000000..b0daedd96 --- /dev/null +++ b/common/data_refinery_common/migrations/0021_experiment_num_downloadable_samples.py @@ -0,0 +1,18 @@ +# Generated by Django 2.1.8 on 2019-07-01 14:16 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('data_refinery_common', '0020_update_qn_bucket'), + ] + + operations = [ + migrations.AddField( + model_name='experiment', + name='num_downloadable_samples', + field=models.IntegerField(default=0), + ), + ] diff --git a/common/data_refinery_common/models/documents.py b/common/data_refinery_common/models/documents.py index 31b3cb601..6d5f187e9 100644 --- a/common/data_refinery_common/models/documents.py +++ b/common/data_refinery_common/models/documents.py @@ -102,6 +102,7 @@ class ExperimentDocument(DocType): pubmed_id = fields.TextField() num_total_samples = fields.IntegerField() num_processed_samples = fields.IntegerField() + num_downloadable_samples = fields.IntegerField() source_first_published = fields.DateField() # FK/M2M diff --git a/common/data_refinery_common/models/models.py b/common/data_refinery_common/models/models.py index a76a289bc..5382fcb6a 100644 --- a/common/data_refinery_common/models/models.py +++ b/common/data_refinery_common/models/models.py @@ -297,6 +297,7 @@ def __str__(self): # Cached Computed Properties num_total_samples = models.IntegerField(default=0) num_processed_samples = models.IntegerField(default=0) + num_downloadable_samples = models.IntegerField(default=0) sample_metadata_fields = ArrayField(models.TextField(), default=list) organism_names = ArrayField(models.TextField(), default=list) platform_names = ArrayField(models.TextField(), default=list) @@ -326,6 +327,8 @@ def update_num_samples(self): """ Update our cache values """ self.num_total_samples = self.samples.count() self.num_processed_samples = self.samples.filter(is_processed=True).count() + qn_organisms = Organism.get_objects_with_qn_targets() + self.num_downloadable_samples = self.samples.filter(is_processed=True, organism__in=qn_organisms).count() self.save() def to_metadata_dict(self): diff --git a/common/data_refinery_common/utils.py b/common/data_refinery_common/utils.py index be7ff61e3..66ab80df0 100644 --- a/common/data_refinery_common/utils.py +++ b/common/data_refinery_common/utils.py @@ -1,4 +1,4 @@ -from typing import Dict, Set +from typing import Dict, Set, List from urllib.parse import urlparse import csv import nomad diff --git a/foreman/data_refinery_foreman/foreman/main.py b/foreman/data_refinery_foreman/foreman/main.py index a0735715b..ebc80b2a9 100644 --- a/foreman/data_refinery_foreman/foreman/main.py +++ b/foreman/data_refinery_foreman/foreman/main.py @@ -50,7 +50,7 @@ # This can be overritten by the env var "MAX_TOTAL_JOBS" DEFAULT_MAX_JOBS = 20000 -DOWNLOADER_JOBS_PER_NODE = 150 +DOWNLOADER_JOBS_PER_NODE = 75 PAGE_SIZE=2000 # This is the maximum number of non-dead nomad jobs that can be in the @@ -323,7 +323,7 @@ def requeue_downloader_job(last_job: DownloaderJob) -> bool: num_retries = last_job.num_retries + 1 ram_amount = last_job.ram_amount - if last_job.failure_reason is not None and 'harakiri' 
not in last_job.failure_reason: + if last_job.failure_reason is None or 'harakiri' not in last_job.failure_reason: if ram_amount == 1024: ram_amount = 4096 elif ram_amount == 4096: @@ -353,13 +353,6 @@ def requeue_downloader_job(last_job: DownloaderJob) -> bool: return False first_sample = original_file.samples.first() - if first_sample and first_sample.is_blacklisted: - last_job.no_retry = True - last_job.success = False - last_job.failure_reason = "Sample run accession has been blacklisted by SRA." - last_job.save() - logger.info("Avoiding requeuing for DownloaderJob for blacklisted run accession: " + str(first_sample.accession_code)) - return False # This is a magic string that all the dbGaP studies appear to have if first_sample and ("in the dbGaP study" in first_sample.title): diff --git a/foreman/data_refinery_foreman/foreman/management/commands/organism_shepherd.py b/foreman/data_refinery_foreman/foreman/management/commands/organism_shepherd.py index 2b340c39a..ca4be62e3 100644 --- a/foreman/data_refinery_foreman/foreman/management/commands/organism_shepherd.py +++ b/foreman/data_refinery_foreman/foreman/management/commands/organism_shepherd.py @@ -71,11 +71,16 @@ def build_completion_list(organism: Organism) -> List[Dict]: else: unprocessed_samples.add(sample) - completion_list.append({ - "experiment": experiment, - "unprocessed": unprocessed_samples, - "processed": processed_samples - }) + # For now we only want to queue samples from experiments from + # which we've been able to process at least one sample, + # because that means the sample probably does not have unmated + # reads. Unmated reads currently break our salmon pipeline. + if len(processed_samples) > 0: + completion_list.append({ + "experiment": experiment, + "unprocessed": unprocessed_samples, + "processed": processed_samples + }) return completion_list @@ -263,16 +268,16 @@ def handle(self, *args, **options): # active volumes. Also in order to spread them around # do so randomly. We don't want to hammer Nomad to # get the active volumes though, so just do it once - # per 10 minute loop. + # per 5 minute loop. volume_index = random.choice(list(get_active_volumes())) for i in range(num_short_from_max): if len(prioritized_job_list) > 0: requeue_job(prioritized_job_list.pop(0), volume_index) - # Wait 10 minutes in between queuing additional work to + # Wait 5 minutes in between queuing additional work to # give it time to actually get done. 
if len(prioritized_job_list) > 0: - logger.info("Sleeping for 10 minutes while jobs get done.") - time.sleep(600) + logger.info("Sleeping for 5 minutes while jobs get done.") + time.sleep(300) logger.info("Successfully requeued all jobs for unprocessed %s samples.", organism_name) diff --git a/foreman/data_refinery_foreman/foreman/management/commands/retry_samples.py b/foreman/data_refinery_foreman/foreman/management/commands/retry_samples.py new file mode 100644 index 000000000..acbaa5ae4 --- /dev/null +++ b/foreman/data_refinery_foreman/foreman/management/commands/retry_samples.py @@ -0,0 +1,73 @@ +import random +import sys +import time +from typing import Dict, List + +from django.core.management.base import BaseCommand + +from data_refinery_common.models import ( + Sample, +) +from data_refinery_common.logging import get_and_configure_logger +from data_refinery_common.job_management import create_downloader_job +from data_refinery_foreman.foreman.performant_pagination.pagination import PerformantPaginator as Paginator + + +logger = get_and_configure_logger(__name__) + +PAGE_SIZE=2000 + +class Command(BaseCommand): + def add_arguments(self, parser): + parser.add_argument( + "--source-database", + type=str, + help=("The name of a source database, such as Array Express, GEO, or SRA. " + "All samples from this source database will have downloader " + "jobs requeued for them.") + ) + + def handle(self, *args, **options): + """Requeues downloader jobs for all samples from the given source database that lack computed files. + """ + if options["source_database"] is None: + logger.error("You must specify a source-database.") + sys.exit(1) + else: + source_database = options["source_database"] + + sra_samples = Sample.objects.filter( + source_database=source_database + ).prefetch_related( + "computed_files", + "original_files" + ) + + paginator = Paginator(sra_samples, PAGE_SIZE) + page = paginator.page() + page_count = 0 + + creation_count = 0 + while True: + for sample in page.object_list: + if sample.computed_files.count() == 0: + logger.debug("Creating downloader job for a sample.", + sample=sample.accession_code) + if create_downloader_job(sample.original_files.all(), force=True): + creation_count += 1 + + logger.info( + "Created %d new downloader jobs because their samples lacked computed files.", + creation_count + ) + + if not page.has_next(): + break + else: + page = paginator.page(page.next_page_number()) + + creation_count = 0 + + # 2000 samples queued up every five minutes should be fast + # enough and also not thrash the DB.
+ time.sleep(60 * 5) diff --git a/foreman/data_refinery_foreman/foreman/management/commands/test_organism_shepherd.py b/foreman/data_refinery_foreman/foreman/management/commands/test_organism_shepherd.py index 3fc0fcefa..c176ad189 100644 --- a/foreman/data_refinery_foreman/foreman/management/commands/test_organism_shepherd.py +++ b/foreman/data_refinery_foreman/foreman/management/commands/test_organism_shepherd.py @@ -205,11 +205,13 @@ def mock_init_nomad(host, port=0, timeout=0): fifty_percent_processor_job.refresh_from_db() self.assertEqual(first_call_job_object, fifty_percent_processor_job.retried_job) - second_call_job_type = mock_calls[1][1][0] - second_call_job_object = mock_calls[1][2]["job"] - self.assertEqual(second_call_job_type, Downloaders.SRA) - self.assertEqual(second_call_job_object.accession_code, zero_percent_dl_job.accession_code) - self.assertEqual(second_call_job_object.downloader_task, zero_percent_dl_job.downloader_task) - - zero_percent_dl_job.refresh_from_db() - self.assertEqual(second_call_job_object, zero_percent_dl_job.retried_job) + # For now we aren't queuing experiments that haven't been processed at all. + self.assertEqual(len(mock_calls), 1) + # second_call_job_type = mock_calls[1][1][0] + # second_call_job_object = mock_calls[1][2]["job"] + # self.assertEqual(second_call_job_type, Downloaders.SRA) + # self.assertEqual(second_call_job_object.accession_code, zero_percent_dl_job.accession_code) + # self.assertEqual(second_call_job_object.downloader_task, zero_percent_dl_job.downloader_task) + + # zero_percent_dl_job.refresh_from_db() + # self.assertEqual(second_call_job_object, zero_percent_dl_job.retried_job) diff --git a/foreman/data_refinery_foreman/foreman/management/commands/update_downloadable_samples.py b/foreman/data_refinery_foreman/foreman/management/commands/update_downloadable_samples.py new file mode 100644 index 000000000..d1f2de7b5 --- /dev/null +++ b/foreman/data_refinery_foreman/foreman/management/commands/update_downloadable_samples.py @@ -0,0 +1,11 @@ +from django.core.management.base import BaseCommand +from data_refinery_common.models import Experiment, ComputationalResultAnnotation + +class Command(BaseCommand): + def handle(self, *args, **options): + organism_ids = list(ComputationalResultAnnotation.objects.filter(data__is_qn=True).values_list('data__organism_id', flat=True)) + for experiment in Experiment.objects.all(): + experiment.num_downloadable_samples = experiment.samples.filter(is_processed=True, organism__id__in=organism_ids).count() + experiment.save() + + print("Updated the num_downloadable_samples field on all experiment objects.") diff --git a/foreman/data_refinery_foreman/surveyor/sra.py b/foreman/data_refinery_foreman/surveyor/sra.py index 086af0e98..e98da8b07 100644 --- a/foreman/data_refinery_foreman/surveyor/sra.py +++ b/foreman/data_refinery_foreman/surveyor/sra.py @@ -331,6 +331,14 @@ def _build_ncbi_file_url(run_accession: str): return download_url + @staticmethod + def _apply_harmonized_metadata_to_sample(sample: Sample, metadata: dict): + """Harmonizes the metadata and applies it to `sample`""" + sample.title = harmony.extract_title(metadata) + harmonized_sample = harmony.harmonize([metadata]) + for key, value in harmonized_sample[sample.title].items(): + setattr(sample, key, value) + def _generate_experiment_and_samples(self, run_accession: str, study_accession: str=None) -> (Experiment, List[Sample]): """Generates Experiments and Samples for the provided run_accession.""" metadata = 
SraSurveyor.gather_all_metadata(run_accession) @@ -466,11 +474,7 @@ def _generate_experiment_and_samples(self, run_accession: str, study_accession: else: sample_object.manufacturer = "UNKNOWN" - # Directly apply the harmonized values - sample_object.title = harmony.extract_title(metadata) - harmonized_sample = harmony.harmonize([metadata]) - for key, value in harmonized_sample.items(): - setattr(sample_object, key, value) + SraSurveyor._apply_harmonized_metadata_to_sample(sample_object, metadata) protocol_info, is_updated = self.update_sample_protocol_info( existing_protocols=[], diff --git a/foreman/data_refinery_foreman/surveyor/test_sra.py b/foreman/data_refinery_foreman/surveyor/test_sra.py index 902ab1cdf..3b039afc4 100644 --- a/foreman/data_refinery_foreman/surveyor/test_sra.py +++ b/foreman/data_refinery_foreman/surveyor/test_sra.py @@ -194,3 +194,11 @@ def test_metadata_is_gathered_correctly(self, mock_get): ncbi_url = SraSurveyor._build_ncbi_file_url(metadata["run_accession"]) self.assertTrue(ncbi_url in ['anonftp@ftp.ncbi.nlm.nih.gov:/sra/sra-instant/reads/ByRun/sra/DRR/DRR002/DRR002116/DRR002116.sra', 'anonftp@ftp-private.ncbi.nlm.nih.gov:/sra/sra-instant/reads/ByRun/sra/DRR/DRR002/DRR002116/DRR002116.sra', 'dbtest@sra-download.ncbi.nlm.nih.gov:data/sracloud/traces/dra0/DRR/000002/DRR002116']) + + def test_sra_metadata_is_harmonized(self): + metadata = SraSurveyor.gather_all_metadata("SRR3098582") + sample = Sample() + SraSurveyor._apply_harmonized_metadata_to_sample(sample, metadata) + self.assertEqual(sample.treatment, "biliatresone") + self.assertEqual(sample.subject, "liver") + self.assertEqual(sample.specimen_part, "liver") diff --git a/foreman/data_refinery_foreman/surveyor/transcriptome_index.py b/foreman/data_refinery_foreman/surveyor/transcriptome_index.py index 6c64d228e..e4bfbbca2 100644 --- a/foreman/data_refinery_foreman/surveyor/transcriptome_index.py +++ b/foreman/data_refinery_foreman/surveyor/transcriptome_index.py @@ -60,12 +60,7 @@ def __init__(self, species: Dict): self.url_root = "ensemblgenomes.org/pub/release-{assembly_version}/{short_division}" self.short_division = DIVISION_LOOKUP[species["division"]] self.assembly = species["assembly_name"].replace(" ", "_") - - # For some reason the API is returning version 44, which doesn't seem to exist - # in the FTP servers: ftp://ftp.ensemblgenomes.org/pub/ - # That's why the version is hardcoded below. - # self.assembly_version = utils.requests_retry_session().get(DIVISION_RELEASE_URL).json()["version"] - self.assembly_version = '43' + self.assembly_version = utils.requests_retry_session().get(DIVISION_RELEASE_URL).json()["version"] self.species_sub_dir = species["name"] self.filename_species = species["name"].capitalize() diff --git a/infrastructure/environments/prod.tfvars b/infrastructure/environments/prod.tfvars index 880cb391a..9d07c6d40 100644 Binary files a/infrastructure/environments/prod.tfvars and b/infrastructure/environments/prod.tfvars differ diff --git a/infrastructure/foreman-configuration/foreman-server-instance-user-data.tpl.sh b/infrastructure/foreman-configuration/foreman-server-instance-user-data.tpl.sh index 11daae616..a6cb85c32 100644 --- a/infrastructure/foreman-configuration/foreman-server-instance-user-data.tpl.sh +++ b/infrastructure/foreman-configuration/foreman-server-instance-user-data.tpl.sh @@ -42,6 +42,29 @@ docker run \\ chmod +x /home/ubuntu/run_foreman.sh /home/ubuntu/run_foreman.sh +# The foreman instance is used for running various management +# commands. 
This script provides an easy way to do that. + echo " +#!/bin/sh + +# This script should be used by passing a management command name as +# the first argument followed by any additional arguments to that +# command. + +docker run \\ + --env-file /home/ubuntu/environment \\ + -e DATABASE_HOST=${database_host} \\ + -e DATABASE_NAME=${database_name} \\ + -e DATABASE_USER=${database_user} \\ + -e DATABASE_PASSWORD=${database_password} \\ + -e ELASTICSEARCH_HOST=${elasticsearch_host} \\ + -e ELASTICSEARCH_PORT=${elasticsearch_port} \\ + -v /tmp:/tmp \\ + --add-host=nomad:${nomad_lead_server_ip} \\ + -it -d ${dockerhub_repo}/${foreman_docker_image} python3 manage.py \$@ +" >> /home/ubuntu/run_management_command.sh +chmod +x /home/ubuntu/run_management_command.sh + # Start the Nomad agent in server mode via Monit apt-get -y update apt-get -y install monit htop diff --git a/workers/data_refinery_workers/processors/qn_reference.py b/workers/data_refinery_workers/processors/qn_reference.py index 6433a2f97..352f9f03f 100644 --- a/workers/data_refinery_workers/processors/qn_reference.py +++ b/workers/data_refinery_workers/processors/qn_reference.py @@ -20,6 +20,7 @@ Processor, SampleComputedFileAssociation, SampleResultAssociation, + Experiment, ) from data_refinery_common.utils import get_env_variable from data_refinery_workers.processors import utils, smasher @@ -210,6 +211,17 @@ def _create_result_objects(job_context: Dict) -> Dict: job_context['success'] = True return job_context +def _update_experiment_caches(job_context: Dict) -> Dict: + """ Experiments have a cached value with the number of samples that have QN targets + generated; this value should be updated after generating new QN targets. """ + organism_name = job_context['samples']['ALL'][0].organism.name + unique_experiments = Experiment.objects.all().filter(organism_names__contains=[organism_name]) + + for experiment in unique_experiments: + experiment.update_num_samples() + + return job_context + def create_qn_reference(job_id: int) -> None: pipeline = Pipeline(name=utils.PipelineEnum.QN_REFERENCE.value) job_context = utils.run_pipeline({"job_id": job_id, "pipeline": pipeline}, @@ -218,5 +230,6 @@ def create_qn_reference(job_id: int) -> None: _build_qn_target, # _verify_result, _create_result_objects, + _update_experiment_caches, utils.end_job]) return job_context diff --git a/workers/data_refinery_workers/processors/salmon.py b/workers/data_refinery_workers/processors/salmon.py index b5e4dc2e3..f34def105 100644 --- a/workers/data_refinery_workers/processors/salmon.py +++ b/workers/data_refinery_workers/processors/salmon.py @@ -785,10 +785,27 @@ def _run_salmon(job_context: Dict) -> Dict: formatted_command, processor_job=job_context["job_id"]) + # 1 hour seems like a reasonable timeout for salmon. This is + # necessary because some samples make salmon hang forever and this + # ties up our computing resources indefinitely. Until this bug is + # fixed, we'll just have to do it like this. + timeout = 60 * 60 job_context['time_start'] = timezone.now() - completed_command = subprocess.run(formatted_command.split(), - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + try: + completed_command = subprocess.run(formatted_command.split(), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=timeout) + except subprocess.TimeoutExpired: + failure_reason = "Salmon timed out because it failed to complete within 1 hour."
+ logger.error(failure_reason, + sample_accession_code=job_context["sample"].accession_code, + processor_job=job_context["job_id"] + ) + job_context["job"].failure_reason = failure_reason + job_context["success"] = False + return job_context + job_context['time_end'] = timezone.now() ## To me, this looks broken: error codes are anything non-zero. diff --git a/workers/data_refinery_workers/processors/smasher.py b/workers/data_refinery_workers/processors/smasher.py index e2396cb6f..7e2e7c811 100644 --- a/workers/data_refinery_workers/processors/smasher.py +++ b/workers/data_refinery_workers/processors/smasher.py @@ -9,6 +9,7 @@ import simplejson as json import string import warnings +import requests from botocore.exceptions import ClientError from datetime import datetime, timedelta @@ -560,7 +561,7 @@ def _smash(job_context: Dict, how="inner") -> Dict: num_samples = num_samples + 1 if (num_samples % 100) == 0: - logger.warning("Loaded " + str(num_samples) + " samples into frames.", + logger.debug("Loaded " + str(num_samples) + " samples into frames.", dataset_id=job_context['dataset'].id, how=how ) @@ -904,6 +905,38 @@ def _notify(job_context: Dict) -> Dict: # SES ## if job_context.get("upload", True) and settings.RUNNING_IN_CLOUD: + # Link to the dataset page, where the user can re-try the download job + dataset_url = 'https://www.refine.bio/dataset/' + str(job_context['dataset'].id) + + # Send a notification to slack when a dataset fails to be processed + if job_context['job'].failure_reason not in ['', None]: + try: + requests.post( + "https://hooks.slack.com/services/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", + json={ + 'fallback': 'Dataset failed processing.', + 'title': 'Dataset failed processing', + 'title_link': dataset_url, + "attachments":[ + { + "color": "warning", + "text": job_context['job'].failure_reason, + 'author_name': job_context["dataset"].email_address, + 'fields': [ + { + 'title': 'Dataset id', + 'value': str(job_context['dataset'].id) + } + ] + } + ] + }, + headers={'Content-Type': 'application/json'}, + timeout=10 + ) + except Exception as e: + logger.error(e) # It doesn't really matter if this didn't work + pass # Don't send an email if we don't have address. if job_context["dataset"].email_address: @@ -912,8 +945,6 @@ def _notify(job_context: Dict) -> Dict: AWS_REGION = "us-east-1" CHARSET = "UTF-8" - # Link to the dataset page, where the user can re-try the download job - dataset_url = 'https://www.refine.bio/dataset/' + str(job_context['dataset'].id) if job_context['job'].failure_reason not in ['', None]: SUBJECT = "There was a problem processing your refine.bio dataset :(" @@ -934,8 +965,8 @@ def _notify(job_context: Dict) -> Dict: job_context['success'] = False else: SUBJECT = "Your refine.bio Dataset is Ready!" - BODY_TEXT = "Hot off the presses:\n\n" + job_context["result_url"] + "\n\nLove!,\nThe refine.bio Team" - FORMATTED_HTML = BODY_HTML.replace('REPLACE_DOWNLOAD_URL', job_context["result_url"])\ + BODY_TEXT = "Hot off the presses:\n\n" + dataset_url + "\n\nLove!,\nThe refine.bio Team" + FORMATTED_HTML = BODY_HTML.replace('REPLACE_DOWNLOAD_URL', dataset_url)\ .replace('REPLACE_DATASET_URL', dataset_url) # Try to send the email.
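The two management commands introduced earlier in this diff, `retry_samples` and `update_downloadable_samples`, are defined but never invoked anywhere in the patch. Below is a minimal sketch of how they might be driven from Python, assuming Django's standard `call_command` API; in deployment the `run_management_command.sh` wrapper added to the foreman user-data script runs them via `python3 manage.py <command>` instead. The `"SRA"` value is a hypothetical example, not taken from the diff.

```python
from django.core.management import call_command

# Recreate downloader jobs for every sample from the given source database
# that has no computed files ("SRA" is an illustrative value for
# --source-database).
call_command("retry_samples", source_database="SRA")

# Backfill the new Experiment.num_downloadable_samples field once the
# 0021_experiment_num_downloadable_samples migration has been applied.
call_command("update_downloadable_samples")
```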
diff --git a/workers/data_refinery_workers/processors/test_qn_reference.py b/workers/data_refinery_workers/processors/test_qn_reference.py index 5eca35ff0..2e123a0f5 100644 --- a/workers/data_refinery_workers/processors/test_qn_reference.py +++ b/workers/data_refinery_workers/processors/test_qn_reference.py @@ -8,7 +8,6 @@ ProcessorJob, OriginalFile, Sample, - Organism, SampleComputedFileAssociation, ProcessorJobOriginalFileAssociation, Dataset, @@ -18,6 +17,7 @@ ExperimentSampleAssociation, ProcessorJobDatasetAssociation ) +from data_refinery_common.models.organism import Organism from data_refinery_workers.processors import qn_reference, smasher, utils @@ -37,6 +37,7 @@ def test_qn_reference(self): experiment = Experiment() experiment.accession_code = "12345" + experiment.organism_names = [homo_sapiens.name] experiment.save() for code in ['1', '2', '3', '4', '5', '6']: diff --git a/workers/data_refinery_workers/processors/utils.py b/workers/data_refinery_workers/processors/utils.py index ab1538172..e949ebb18 100644 --- a/workers/data_refinery_workers/processors/utils.py +++ b/workers/data_refinery_workers/processors/utils.py @@ -13,6 +13,7 @@ from typing import List, Dict, Callable from data_refinery_common import job_lookup +from data_refinery_common.job_management import create_downloader_job from data_refinery_common.logging import get_and_configure_logger from data_refinery_common.models import ( ComputationalResult, @@ -58,132 +59,6 @@ def signal_handler(sig, frame): CURRENT_JOB.save() sys.exit(0) - -def create_downloader_job(undownloaded_files: OriginalFile, processor_job_id: int) -> bool: - """Creates a downloader job to download `undownloaded_files`.""" - if not undownloaded_files: - return False - - original_downloader_job = None - archive_file = None - for undownloaded_file in undownloaded_files: - try: - original_downloader_job = undownloaded_file.downloader_jobs.latest('id') - - # Found the job so we don't need to keep going. - break - except DownloaderJob.DoesNotExist: - # If there's no association between this file and any - # downloader jobs, it's most likely because the original - # file was created after extracting a archive containing - # multiple files worth of data. - # The way to handle this is to find that archive and - # recreate a downloader job FOR THAT. That archive will - # have the same filename as the file at the end of the - # 'source_url' field, because that source URL is pointing - # to the archive we need. - archive_filename = undownloaded_file.source_url.split("/")[-1] - - # This file or its job might not exist, but we'll wait - # until we've checked all the files before calling it a - # failure. - try: - archive_file = OriginalFile.objects.filter(filename=archive_filename) - if archive_file.count() > 0: - archive_file = archive_file.first() - else: - # We might need to match these up based on - # source_filenames rather than filenames so just - # try them both. - archive_file = OriginalFile.objects.filter(source_filename=archive_filename).first() - - original_downloader_job = DownloaderJobOriginalFileAssociation.objects.filter( - original_file=archive_file - ).latest('id').downloader_job - # Found the job so we don't need to keep going. 
- break - except: - pass - - if not original_downloader_job: - sample_object = list(undownloaded_files)[0].samples.first() - if sample_object: - downloader_task = job_lookup.determine_downloader_task(sample_object) - - if downloader_task == job_lookup.Downloaders.NONE: - logger.warn(("No valid downloader task found for sample, which is weird" - " because it was able to have a processor job created for it..."), - sample=sample_object.id) - return False - else: - # determine_downloader_task returns an enum object, - # but we wanna set this on the DownloaderJob object so - # we want the actual value. - downloader_task = downloader_task.value - - accession_code = sample_object.accession_code - original_files = sample_object.original_files.all() - else: - logger.error( - "Could not find the original DownloaderJob or Sample for these files.", - undownloaded_file=undownloaded_files - ) - return False - elif original_downloader_job.was_recreated: - logger.warn( - "Downloader job has already been recreated once, not doing it again.", - original_downloader_job=original_downloader_job, - undownloaded_files=undownloaded_files - ) - return False - else: - downloader_task = original_downloader_job.downloader_task - accession_code = original_downloader_job.accession_code - original_files = original_downloader_job.original_files.all() - - sample_object = original_files[0].samples.first() - - new_job = DownloaderJob() - new_job.downloader_task = downloader_task - new_job.accession_code = accession_code - new_job.was_recreated = True - new_job.ram_amount = 1024 - new_job.save() - - if archive_file: - # If this downloader job is for an archive file, then the - # files that were passed into this function aren't what need - # to be directly downloaded, they were extracted out of this - # archive. The DownloaderJob will re-extract them and set up - # the associations for the new ProcessorJob. - # So double check that it still needs downloading because - # another file that came out of it could have already - # recreated the DownloaderJob. - if archive_file.needs_downloading(processor_job_id): - if archive_file.is_downloaded: - # If it needs to be downloaded then it's not - # downloaded and the is_downloaded field should stop - # lying about that. - archive_file.is_downloaded = False - archive_file.save() - - DownloaderJobOriginalFileAssociation.objects.get_or_create( - downloader_job=new_job, - original_file=archive_file - ) - else: - # We can't just associate the undownloaded files, because - # there's a chance that there is a file which actually is - # downloaded that also needs to be associated with the job. - for original_file in original_files: - DownloaderJobOriginalFileAssociation.objects.get_or_create( - downloader_job=new_job, - original_file=original_file - ) - - return True - - def prepare_original_files(job_context): """ Provision in the Job context for OriginalFile-driven processors """ @@ -217,7 +92,12 @@ def prepare_original_files(job_context): missing_files=list(undownloaded_files) ) - if not create_downloader_job(undownloaded_files, job_context["job_id"]): + was_job_created = create_downloader_job( + undownloaded_files, + processor_job_id=job_context["job_id"], + force=True + ) + if not was_job_created: failure_reason = "Missing file for processor job but unable to recreate downloader jobs!" 
logger.error(failure_reason, processor_job=job.id) job_context["success"] = False @@ -384,7 +264,7 @@ def end_job(job_context: Dict, abort=False): if not abort: if job_context.get("success", False) and not (job_context["job"].pipeline_applied in ["SMASHER", "QN_REFERENCE", "COMPENDIA"]): - + # Salmon jobs require the final `tximport` step before they can be marked `is_processed`. mark_as_processed = True if (job_context["job"].pipeline_applied == "SALMON" and not job_context.get('tximported', False)): @@ -407,7 +287,6 @@ def end_job(job_context: Dict, abort=False): for experiment in unique_experiments: experiment.update_num_samples() - experiment.save() # If we are aborting, it's because we want to do something # different, so leave the original files so that "something
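For completeness, a sketch of how an API client could use the `num_downloadable_samples` filter and ordering fields added to `ExperimentDocumentView` in this diff. The lookup syntax (`__gt`) and `ordering` parameter follow django_elasticsearch_dsl_drf conventions; the base URL and the `/es/` route are assumptions for illustration (the test suite's `test_es_endpoint` suggests such a route but the diff does not show it).

```python
import requests

# Hypothetical query: experiments that have at least one downloadable sample
# (processed and belonging to an organism with a QN target), ordered by that
# count descending.
response = requests.get(
    "https://api.refine.bio/es/",  # assumed host and route, not part of this diff
    params={
        "num_downloadable_samples__gt": 0,
        "ordering": "-num_downloadable_samples",
    },
    timeout=10,
)
response.raise_for_status()
print(response.json())
```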