Merge pull request #1361 from AlexsLemonade/dev
Bring master up to date with dev
kurtwheeler authored Jul 1, 2019
2 parents e134437 + dcaa6d7 commit af9d26d
Showing 25 changed files with 418 additions and 193 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -138,3 +138,5 @@ dr_env/

# emacs backup files
*~

.vscode
11 changes: 0 additions & 11 deletions README.md
@@ -102,17 +102,6 @@ Instructions for installing Docker, Terraform, and Nomad can be found by
following the link for each service. git-crypt, jq, and iproute2 can be installed via
`sudo apt-get install git-crypt jq iproute2`.

When installing pip packages later in the install, you might get an error saying you need sudo permissions.
In order to fix this you have to edit your `~/.config/pip/pip.conf` to add this:

```
[install]
user = yes
no-binary = :all:
```

This configures pip to install all packages in your user directory, so sudo is not required for pip installs.

#### Mac

The following services will need to be installed:
9 changes: 6 additions & 3 deletions api/data_refinery_api/serializers.py
@@ -389,8 +389,7 @@ class Meta:
class DetailedExperimentSerializer(serializers.ModelSerializer):
annotations = ExperimentAnnotationSerializer(many=True, source='experimentannotation_set')
samples = DetailedExperimentSampleSerializer(many=True)
organisms = OrganismSerializer(many=True)
sample_metadata = serializers.ReadOnlyField(source='get_sample_metadata_fields')
sample_metadata = serializers.ReadOnlyField(source='sample_metadata_fields')

class Meta:
model = Experiment
@@ -414,8 +413,11 @@ class Meta:
'submitter_institution',
'last_modified',
'created_at',
'organisms',
'organism_names',
'sample_metadata',
'num_total_samples',
'num_processed_samples',
'num_downloadable_samples'
)

class PlatformSerializer(serializers.ModelSerializer):
@@ -796,6 +798,7 @@ class ExperimentDocumentSerializer(serializers.Serializer):
pubmed_id = serializers.CharField(read_only=True)
num_total_samples = serializers.IntegerField(read_only=True)
num_processed_samples = serializers.IntegerField(read_only=True)
num_downloadable_samples = serializers.IntegerField(read_only=True)
source_first_published = serializers.DateField(read_only=True)

# FK/M2M
1 change: 1 addition & 0 deletions api/data_refinery_api/tests.py
@@ -1180,6 +1180,7 @@ def test_es_endpoint(self):
experiment.technology = "MICROARRAY"
experiment.num_processed_samples = 1 # added below
experiment.num_total_samples = 1
experiment.num_downloadable_samples = 1
experiment.save()
self.experiment = experiment

14 changes: 11 additions & 3 deletions api/data_refinery_api/views.py
@@ -157,15 +157,15 @@ def aggregate(self, request, queryset, view):
All we need to add is one line when building the facets:
.metric('total_samples', 'sum', field='num_processed_samples')
.metric('total_samples', 'sum', field='num_downloadable_samples')
(Maybe there's a way to do this with the options in `ExperimentDocumentView`)
"""
facets = self.construct_facets(request, view)
for field, facet in iteritems(facets):
agg = facet['facet'].get_aggregation()
queryset.aggs.bucket(field, agg)\
.metric('total_samples', 'sum', field='num_processed_samples')
.metric('total_samples', 'sum', field='num_downloadable_samples')
return queryset


@@ -236,6 +236,14 @@ class ExperimentDocumentView(DocumentViewSet):
LOOKUP_QUERY_IN,
LOOKUP_QUERY_GT
],
},
'num_downloadable_samples': {
'field': 'num_downloadable_samples',
'lookups': [
LOOKUP_FILTER_RANGE,
LOOKUP_QUERY_IN,
LOOKUP_QUERY_GT
],
}
}

@@ -245,7 +253,7 @@ class ExperimentDocumentView(DocumentViewSet):
'title': 'title.raw',
'description': 'description.raw',
'num_total_samples': 'num_total_samples',
'num_processed_samples': 'num_processed_samples',
'num_downloadable_samples': 'num_downloadable_samples',
'source_first_published': 'source_first_published'
}

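The filter and ordering entries registered above are consumed as query parameters by django-elasticsearch-dsl-drf. A minimal client sketch follows; the host and endpoint path are hypothetical (neither appears in this diff), and only the parameter names follow from the view configuration.

```
import requests

# Hypothetical endpoint; only the parameter names follow from the filter
# and ordering mappings registered in ExperimentDocumentView above.
response = requests.get(
    "https://api.example.org/es/",
    params={
        "num_downloadable_samples__gt": 0,        # LOOKUP_QUERY_GT
        "ordering": "-num_downloadable_samples",  # new ordering entry
    },
)
print(response.json())
```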
145 changes: 145 additions & 0 deletions common/data_refinery_common/job_management.py
@@ -0,0 +1,145 @@
from typing import List, Dict, Callable

from data_refinery_common import job_lookup
from data_refinery_common.logging import get_and_configure_logger
from data_refinery_common.models import (
DownloaderJob,
DownloaderJobOriginalFileAssociation,
OriginalFile,
)
from data_refinery_common.utils import (
get_env_variable,
get_env_variable_gracefully,
get_instance_id,
)


logger = get_and_configure_logger(__name__)


def create_downloader_job(undownloaded_files: List[OriginalFile],
*,
processor_job_id=None,
force=False) -> bool:
"""Creates a downloader job to download `undownloaded_files`."""
if not undownloaded_files:
return False

original_downloader_job = None
archive_file = None
for undownloaded_file in undownloaded_files:
try:
original_downloader_job = undownloaded_file.downloader_jobs.latest('id')

# Found the job so we don't need to keep going.
break
except DownloaderJob.DoesNotExist:
# If there's no association between this file and any
# downloader jobs, it's most likely because the original
            # file was created after extracting an archive containing
            # multiple files' worth of data.
# The way to handle this is to find that archive and
# recreate a downloader job FOR THAT. That archive will
# have the same filename as the file at the end of the
# 'source_url' field, because that source URL is pointing
# to the archive we need.
archive_filename = undownloaded_file.source_url.split("/")[-1]

# This file or its job might not exist, but we'll wait
# until we've checked all the files before calling it a
# failure.
try:
archive_file = OriginalFile.objects.filter(filename=archive_filename)
if archive_file.count() > 0:
archive_file = archive_file.first()
else:
# We might need to match these up based on
# source_filenames rather than filenames so just
# try them both.
archive_file = OriginalFile.objects.filter(source_filename=archive_filename).first()

original_downloader_job = DownloaderJobOriginalFileAssociation.objects.filter(
original_file=archive_file
).latest('id').downloader_job
# Found the job so we don't need to keep going.
break
except:
pass

if not original_downloader_job:
sample_object = list(undownloaded_files)[0].samples.first()
if sample_object:
downloader_task = job_lookup.determine_downloader_task(sample_object)

if downloader_task == job_lookup.Downloaders.NONE:
logger.warn(("No valid downloader task found for sample, which is weird"
" because it was able to have a processor job created for it..."),
sample=sample_object.id)
return False
else:
                # determine_downloader_task returns an enum object,
                # but we want to set this on the DownloaderJob object,
                # so we need the actual value.
downloader_task = downloader_task.value

accession_code = sample_object.accession_code
original_files = sample_object.original_files.all()
else:
logger.error(
"Could not find the original DownloaderJob or Sample for these files.",
undownloaded_file=undownloaded_files
)
return False
elif original_downloader_job.was_recreated and not force:
logger.warn(
"Downloader job has already been recreated once, not doing it again.",
original_downloader_job=original_downloader_job,
undownloaded_files=undownloaded_files
)
return False
else:
downloader_task = original_downloader_job.downloader_task
accession_code = original_downloader_job.accession_code
original_files = original_downloader_job.original_files.all()

sample_object = original_files[0].samples.first()

new_job = DownloaderJob()
new_job.downloader_task = downloader_task
new_job.accession_code = accession_code
new_job.was_recreated = True
new_job.ram_amount = 1024
new_job.save()

if archive_file:
# If this downloader job is for an archive file, then the
# files that were passed into this function aren't what need
# to be directly downloaded, they were extracted out of this
# archive. The DownloaderJob will re-extract them and set up
# the associations for the new ProcessorJob.
# So double check that it still needs downloading because
# another file that came out of it could have already
# recreated the DownloaderJob.
if archive_file.needs_downloading(processor_job_id):
if archive_file.is_downloaded:
# If it needs to be downloaded then it's not
# downloaded and the is_downloaded field should stop
# lying about that.
archive_file.is_downloaded = False
archive_file.save()

DownloaderJobOriginalFileAssociation.objects.get_or_create(
downloader_job=new_job,
original_file=archive_file
)
else:
# We can't just associate the undownloaded files, because
# there's a chance that there is a file which actually is
# downloaded that also needs to be associated with the job.
for original_file in original_files:
DownloaderJobOriginalFileAssociation.objects.get_or_create(
downloader_job=new_job,
original_file=original_file
)

return True
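A minimal usage sketch for the new helper, assuming it is called from a processor job that has discovered files missing from disk. The `recreate_download_if_needed` wrapper and the `processor_job.original_files` relation are assumptions for illustration; only `create_downloader_job` itself comes from the module above.

```
from data_refinery_common.job_management import create_downloader_job


def recreate_download_if_needed(processor_job) -> bool:
    """Hypothetical wrapper: requeue downloads for any files this
    processor job needs that are no longer present on disk."""
    undownloaded_files = [
        original_file
        for original_file in processor_job.original_files.all()
        if original_file.needs_downloading(processor_job.id)
    ]

    # create_downloader_job returns False when there is nothing to
    # download, when the previous job was already recreated once (and
    # force is False), or when no original DownloaderJob or Sample
    # could be found to copy settings from.
    return create_downloader_job(undownloaded_files,
                                 processor_job_id=processor_job.id)
```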
@@ -0,0 +1,18 @@
# Generated by Django 2.1.8 on 2019-07-01 14:16

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('data_refinery_common', '0020_update_qn_bucket'),
]

operations = [
migrations.AddField(
model_name='experiment',
name='num_downloadable_samples',
field=models.IntegerField(default=0),
),
]
1 change: 1 addition & 0 deletions common/data_refinery_common/models/documents.py
@@ -102,6 +102,7 @@ class ExperimentDocument(DocType):
pubmed_id = fields.TextField()
num_total_samples = fields.IntegerField()
num_processed_samples = fields.IntegerField()
num_downloadable_samples = fields.IntegerField()
source_first_published = fields.DateField()

# FK/M2M
3 changes: 3 additions & 0 deletions common/data_refinery_common/models/models.py
@@ -297,6 +297,7 @@ def __str__(self):
# Cached Computed Properties
num_total_samples = models.IntegerField(default=0)
num_processed_samples = models.IntegerField(default=0)
num_downloadable_samples = models.IntegerField(default=0)
sample_metadata_fields = ArrayField(models.TextField(), default=list)
organism_names = ArrayField(models.TextField(), default=list)
platform_names = ArrayField(models.TextField(), default=list)
@@ -326,6 +327,8 @@ def update_num_samples(self):
""" Update our cache values """
self.num_total_samples = self.samples.count()
self.num_processed_samples = self.samples.filter(is_processed=True).count()
qn_organisms = Organism.get_objects_with_qn_targets()
self.num_downloadable_samples = self.samples.filter(is_processed=True, organism__in=qn_organisms).count()
self.save()

def to_metadata_dict(self):
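Because `num_downloadable_samples` is a cached value, it only changes when `update_num_samples()` runs. A minimal sketch of refreshing the cache across all experiments, for example from a Django shell after new QN targets are generated; the loop itself is illustrative and not part of this diff.

```
from data_refinery_common.models import Experiment

# Recompute the cached counts, including the new
# num_downloadable_samples field, for every experiment.
for experiment in Experiment.objects.all().iterator():
    experiment.update_num_samples()
```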
2 changes: 1 addition & 1 deletion common/data_refinery_common/utils.py
@@ -1,4 +1,4 @@
from typing import Dict, Set
from typing import Dict, Set, List
from urllib.parse import urlparse
import csv
import nomad
11 changes: 2 additions & 9 deletions foreman/data_refinery_foreman/foreman/main.py
@@ -50,7 +50,7 @@
# This can be overridden by the env var "MAX_TOTAL_JOBS"
DEFAULT_MAX_JOBS = 20000

DOWNLOADER_JOBS_PER_NODE = 150
DOWNLOADER_JOBS_PER_NODE = 75
PAGE_SIZE=2000

# This is the maximum number of non-dead nomad jobs that can be in the
@@ -323,7 +323,7 @@ def requeue_downloader_job(last_job: DownloaderJob) -> bool:
num_retries = last_job.num_retries + 1

ram_amount = last_job.ram_amount
if last_job.failure_reason is not None and 'harakiri' not in last_job.failure_reason:
if last_job.failure_reason is None or 'harakiri' not in last_job.failure_reason:
if ram_amount == 1024:
ram_amount = 4096
elif ram_amount == 4096:
@@ -353,13 +353,6 @@ def requeue_downloader_job(last_job: DownloaderJob) -> bool:
return False

first_sample = original_file.samples.first()
if first_sample and first_sample.is_blacklisted:
last_job.no_retry = True
last_job.success = False
last_job.failure_reason = "Sample run accession has been blacklisted by SRA."
last_job.save()
logger.info("Avoiding requeuing for DownloaderJob for blacklisted run accession: " + str(first_sample.accession_code))
return False

# This is a magic string that all the dbGaP studies appear to have
if first_sample and ("in the dbGaP study" in first_sample.title):
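The corrected condition in `requeue_downloader_job` means the RAM bump is now skipped only when the previous failure reason mentions 'harakiri'. A minimal sketch of that rule in isolation; tiers beyond 4096 exist in the full file but are not shown in this diff, so they are omitted here as well.

```
def next_ram_amount(failure_reason, ram_amount):
    """Sketch of the requeue RAM rule implied by the change above."""
    # Bump RAM unless the previous failure explicitly mentions 'harakiri'.
    if failure_reason is None or 'harakiri' not in failure_reason:
        if ram_amount == 1024:
            return 4096
        # Further tiers follow in the full file; elided in this diff.
    return ram_amount
```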
@@ -71,11 +71,16 @@ def build_completion_list(organism: Organism) -> List[Dict]:
else:
unprocessed_samples.add(sample)

completion_list.append({
"experiment": experiment,
"unprocessed": unprocessed_samples,
"processed": processed_samples
})
# For now we only want to queue samples from experiments from
# which we've been able to process at least one sample,
# because that means the sample probably does not have unmated
# reads. Unmated reads currently break our salmon pipeline.
if len(processed_samples) > 0:
completion_list.append({
"experiment": experiment,
"unprocessed": unprocessed_samples,
"processed": processed_samples
})

return completion_list

@@ -263,16 +268,16 @@ def handle(self, *args, **options):
# active volumes. Also in order to spread them around
# do so randomly. We don't want to hammer Nomad to
# get the active volumes though, so just do it once
# per 10 minute loop.
# per 5 minute loop.
volume_index = random.choice(list(get_active_volumes()))
for i in range(num_short_from_max):
if len(prioritized_job_list) > 0:
requeue_job(prioritized_job_list.pop(0), volume_index)

# Wait 10 minutes in between queuing additional work to
# Wait 5 minutes in between queuing additional work to
# give it time to actually get done.
if len(prioritized_job_list) > 0:
logger.info("Sleeping for 10 minutes while jobs get done.")
time.sleep(600)
logger.info("Sleeping for 5 minutes while jobs get done.")
time.sleep(300)

logger.info("Successfully requeued all jobs for unprocessed %s samples.", organism_name)