Skip to content

Commit

Permalink
migrate upload_sources to Dask Cluster
Browse files (browse the repository at this point in the history)
  • Loading branch information
srggrs committed Oct 12, 2020
1 parent 63c71e6 commit 06b6fbf
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
8 changes: 2 additions & 6 deletions vast_pipeline/pipeline/finalise.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -75,14 +75,10 @@ def final_operations(
axis=1,
meta=object
)
import ipdb; ipdb.set_trace() # breakpoint 0eaeae7c //
# upload sources and related to DB
upload_sources(p_run, srcs_df)
srcs_df = upload_sources(p_run, srcs_df)

# get db ids for sources
srcs_df['id'] = srcs_df['src_dj'].apply(lambda x: x.id, meta=int)

import ipdb; ipdb.set_trace() # breakpoint bcf8f142 //
import ipdb; ipdb.set_trace() # breakpoint 04f4de0a //
# gather the related df, upload to db and save to parquet file
# the df will look like
#
Expand Down
17 changes: 14 additions & 3 deletions vast_pipeline/pipeline/loading.py
Original file line number | Diff line number | Diff line change
@@ -1,11 +1,14 @@
import os
import logging
import pandas as pd
import dask.dataframe as dd

from django.db import transaction

from vast_pipeline.image.main import SelavyImage
from vast_pipeline.models import Association, Measurement, Source, RelatedSource
from vast_pipeline.models import (
Association, Measurement, RelatedSource, Run, Source
)
from .utils import (
get_create_img, get_create_img_band, get_measurement_models
)
Expand Down Expand Up @@ -147,7 +150,7 @@ def upload_images(paths, config, pipeline_run):
return images, meas_dj_obj


def upload_sources(pipeline_run, srcs_df):
def upload_sources(pipeline_run: Run, srcs_df: dd.DataFrame) -> dd.DataFrame:
'''
delete previous sources for given pipeline run and bulk upload
new found sources as well as related sources
Expand All @@ -166,7 +169,15 @@ def upload_sources(pipeline_run, srcs_df):
)
logger.debug('(type, #deleted): %s', detail_del)

bulk_upload_model(srcs_df['src_dj'], Source)
dj_src_models = srcs_df['src_dj'].compute()
bulk_upload_model(dj_src_models, Source)

# get db ids for sources and drop the models
srcs_df['id'] = srcs_df['src_dj'].apply(lambda x: x.id, meta=int)
srcs_df = srcs_df.drop('src_dj', axis=1)

return srcs_df.persist()



def upload_related_sources(related):
Expand Down

0 comments on commit 06b6fbf

Please sign in to comment.