Skip to content

Commit

Permalink
[Issue #1746] Add transformations for assistance listing table
Browse files Browse the repository at this point in the history
  • Loading branch information
chouinar committed Apr 29, 2024
1 parent fda6a89 commit 54774fd
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from src.adapters import db
from src.constants.lookup_constants import OpportunityCategory
from src.db.models.base import ApiSchemaTable, TimestampMixin
from src.db.models.opportunity_models import Opportunity
from src.db.models.staging.opportunity import Topportunity
from src.db.models.opportunity_models import Opportunity, OpportunityAssistanceListing
from src.db.models.staging.opportunity import Topportunity, TopportunityCfda
from src.db.models.staging.staging_base import StagingBase, StagingParamMixin
from src.task.task import Task
from src.util import datetime_util
Expand Down Expand Up @@ -68,6 +68,15 @@ def fetch(
).all(),
)

def fetch_with_opportunity(
    self, source_model: Type[S], destination_model: Type[D], join_clause: list
) -> list[Tuple[S, D | None, Opportunity | None]]:
    """Fetch untransformed source rows with their destination row and opportunity.

    Selects every ``source_model`` row whose ``transformed_at`` is NULL and
    pairs it with:

    * the matching ``destination_model`` row, joined on ``join_clause``, or
      ``None`` when no such row exists yet;
    * the ``Opportunity`` whose ``opportunity_id`` equals the source row's
      ``opportunity_id``, or ``None`` when there is no such opportunity.

    ``source_model`` must expose ``opportunity_id`` and ``transformed_at``
    columns.

    Returns:
        A list of ``(source, destination | None, opportunity | None)`` rows.
    """
    return self.db_session.execute(
        select(source_model, destination_model, Opportunity)
        .join(destination_model, *join_clause, isouter=True)
        # This must be an outer join: an inner join silently drops source rows
        # whose opportunity is missing, so callers could never observe the
        # `None` opportunity that the return type promises and that they
        # explicitly check for.
        .join(
            Opportunity,
            source_model.opportunity_id == Opportunity.opportunity_id,
            isouter=True,
        )
        .where(source_model.transformed_at.is_(None))
        .execution_options(yield_per=5000)
    ).all()

def process_opportunities(self) -> None:
# Fetch all opportunities that were modified
# Alongside that, grab the existing opportunity record
Expand Down Expand Up @@ -121,8 +130,58 @@ def process_opportunity(
source_opportunity.transformed_at = self.transform_time

def process_assistance_listings(self) -> None:
    """Transform every not-yet-transformed staging assistance listing record.

    Fetches each unprocessed ``TopportunityCfda`` row together with its
    existing ``OpportunityAssistanceListing`` (if any) and its ``Opportunity``
    (if any), then hands each one to ``process_assistance_listing``.

    A ``ValueError`` raised while processing a single record is logged and
    counted in ``TOTAL_ERROR_COUNT``; it does not stop the remaining records
    from being processed.
    """
    assistance_listings: list[
        Tuple[TopportunityCfda, OpportunityAssistanceListing | None, Opportunity | None]
    ] = self.fetch_with_opportunity(
        TopportunityCfda,
        OpportunityAssistanceListing,
        [
            TopportunityCfda.opp_cfda_id
            == OpportunityAssistanceListing.opportunity_assistance_listing_id
        ],
    )

    for source_assistance_listing, target_assistance_listing, opportunity in assistance_listings:
        # An assistance listing cannot be attached to an opportunity that does
        # not exist. Skip it without setting transformed_at, so it is picked up
        # again on a later run.
        # TODO - docs & more detail on how orphaned records should be handled
        if opportunity is None:
            logger.warning(
                "Skipping assistance listing as its opportunity does not exist",
                extra={
                    "opportunity_assistance_listing_id": source_assistance_listing.opp_cfda_id,
                    "opportunity_id": source_assistance_listing.opportunity_id,
                },
            )
            continue

        try:
            self.process_assistance_listing(source_assistance_listing, target_assistance_listing)
        except ValueError:
            self.increment(self.Metrics.TOTAL_ERROR_COUNT)
            logger.exception(
                "Failed to process assistance listing",
                extra={"opportunity_assistance_listing_id": source_assistance_listing.opp_cfda_id},
            )

def process_assistance_listing(
    self,
    source_assistance_listing: TopportunityCfda,
    target_assistance_listing: OpportunityAssistanceListing | None,
) -> None:
    """Apply a single staging assistance listing to the destination table.

    Deleted source records remove the existing destination record; any other
    record is transformed and added to the session as an insert or update.
    On success the source record's ``transformed_at`` is set to
    ``self.transform_time``.

    Raises:
        ValueError: if the source is flagged as deleted but there is no
            destination record to delete.
    """
    self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED)
    log_extra = {"opportunity_assistance_listing_id": source_assistance_listing.opp_cfda_id}
    logger.info("Processing assistance listing", extra=log_extra)

    if not source_assistance_listing.is_deleted:
        # Capture whether a destination row already existed *before* the
        # transform, so the insert/update metric is only bumped once the
        # transform has succeeded.
        already_exists = target_assistance_listing is not None

        logger.info("Transforming and upserting assistance listing", extra=log_extra)
        upserted_record = transform_assistance_listing(
            source_assistance_listing, target_assistance_listing
        )
        self.db_session.add(upserted_record)

        self.increment(
            self.Metrics.TOTAL_RECORDS_UPDATED
            if already_exists
            else self.Metrics.TOTAL_RECORDS_INSERTED
        )
    else:
        logger.info("Deleting assistance listing", extra=log_extra)

        if target_assistance_listing is None:
            raise ValueError("Cannot delete assistance listing as it does not exist")

        self.increment(self.Metrics.TOTAL_RECORDS_DELETED)
        self.db_session.delete(target_assistance_listing)

    logger.info("Processed assistance listing", extra=log_extra)
    source_assistance_listing.transformed_at = self.transform_time

def process_opportunity_summaries(self) -> None:
# TODO - https://github.com/HHS/simpler-grants-gov/issues/1747
Expand Down Expand Up @@ -186,6 +245,18 @@ def transform_opportunity_category(value: str | None) -> OpportunityCategory | N
return transformed_value


def transform_assistance_listing(
    source_assistance_listing: TopportunityCfda,
    target_assistance_listing: OpportunityAssistanceListing | None,
) -> OpportunityAssistanceListing:
    """Copy a staging assistance listing onto its destination record.

    When ``target_assistance_listing`` is ``None`` a new
    ``OpportunityAssistanceListing`` is created, keyed by the source record's
    ``opp_cfda_id`` and ``opportunity_id``. Otherwise the given record is
    updated in place.

    Returns:
        The created or updated ``OpportunityAssistanceListing``.
    """
    log_extra = {"opportunity_assistance_listing_id": source_assistance_listing.opp_cfda_id}

    if target_assistance_listing is None:
        logger.info("Creating new assistance listing record", extra=log_extra)
        target_assistance_listing = OpportunityAssistanceListing(
            opportunity_assistance_listing_id=source_assistance_listing.opp_cfda_id,
            opportunity_id=source_assistance_listing.opportunity_id,
        )

    target_assistance_listing.assistance_listing_number = source_assistance_listing.cfdanumber
    target_assistance_listing.program_title = source_assistance_listing.programtitle

    transform_update_create_timestamp(
        source_assistance_listing, target_assistance_listing, log_extra=log_extra
    )

    # The caller adds the returned record to the session; without this return
    # the function yields None despite its declared return type.
    return target_assistance_listing

def convert_est_timestamp_to_utc(timestamp: datetime | None) -> datetime | None:
if timestamp is None:
return None
Expand Down
4 changes: 4 additions & 0 deletions api/src/db/models/staging/opportunity.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from src.db.legacy_mixin import opportunity_mixin
from src.db.models.staging.staging_base import StagingBase, StagingParamMixin

from sqlalchemy.orm import Mapped, relationship

class Topportunity(StagingBase, opportunity_mixin.TopportunityMixin, StagingParamMixin):
    """Staging-table model for the ``topportunity`` table."""

    __tablename__ = "topportunity"

    # One-to-many: all staging CFDA rows that share this row's opportunity_id.
    # There is no database-level foreign key, so the child column is marked
    # with foreign(). remote() is not used: it is only required for
    # self-referential joins, and placing it on this class's own column makes
    # SQLAlchemy warn that the "remote" column is local to the local side and
    # leaves the relationship without a local column to bind.
    # `overlaps` only silences the warning about TopportunityCfda.opportunity
    # writing to the same column.
    cfdas: Mapped[list["TopportunityCfda"]] = relationship(
        primaryjoin="Topportunity.opportunity_id == foreign(TopportunityCfda.opportunity_id)",
        uselist=True,
        overlaps="opportunity",
    )

class TopportunityCfda(StagingBase, opportunity_mixin.TopportunityCfdaMixin, StagingParamMixin):
    """Staging-table model for the ``topportunity_cfda`` table."""

    __tablename__ = "topportunity_cfda"

    # Many-to-one: the staging opportunity whose opportunity_id matches this
    # row's opportunity_id, or None when there is no such row.
    # This class's own column is the one acting as the foreign key, so it (not
    # Topportunity.opportunity_id) carries foreign(). remote() is not used: it
    # is only required for self-referential joins, and marking this class's
    # own column as remote makes SQLAlchemy warn and treat the relationship as
    # pointing the wrong way.
    # `overlaps` only silences the warning about Topportunity.cfdas writing to
    # the same column.
    opportunity: Mapped[Topportunity | None] = relationship(
        primaryjoin="foreign(TopportunityCfda.opportunity_id) == Topportunity.opportunity_id",
        uselist=False,
        overlaps="cfdas",
    )
35 changes: 35 additions & 0 deletions api/tests/src/db/models/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,12 @@ class Meta:
# Staging Table Factories
####################################

class ForeignTopportunityFactory(BaseFactory):
    # Factory stub with no field declarations yet.
    # NOTE(review): presumably intended to build the foreign-table counterpart
    # of the staging Topportunity model - confirm the intended model.
    class Meta:
        # NOTE(review): `x` is not defined anywhere in the visible source, so
        # this looks like a placeholder that will raise NameError on import.
        # TODO replace with the real model class.
        model = x




class StagingTopportunityFactory(BaseFactory):
class Meta:
Expand Down Expand Up @@ -628,6 +634,12 @@ class Meta:
is_deleted = False
transformed_at = None

cfdas = factory.RelatedFactoryList(
"tests.src.db.models.factories.StagingTopportunityCfdaFactory",
factory_related_name="opportunity",
size=lambda: random.randint(1, 3),
)

class Params:
already_transformed = factory.Trait(
transformed_at=factory.Faker("date_time_between", start_date="-7d", end_date="-1d")
Expand All @@ -643,6 +655,29 @@ class Params:
category_explanation=None,
)

class StagingTopportunityCfdaFactory(BaseFactory):
    # Test factory for staging TopportunityCfda (assistance listing) records.
    class Meta:
        model = staging.opportunity.TopportunityCfda

    # The id is the factory's own sequence counter value.
    opp_cfda_id = factory.Sequence(lambda n: n)

    # By default a staging opportunity is built alongside each record, and
    # opportunity_id is copied from it so the two stay in sync.
    opportunity = factory.SubFactory(StagingTopportunityFactory)
    opportunity_id = factory.LazyAttribute(lambda s: s.opportunity.opportunity_id)

    programtitle = factory.Faker("company")
    # Formatted as "NN.NNN": a zero-padded 01-99 prefix and a zero-padded
    # 001-999 suffix.
    cfdanumber = factory.LazyFunction(
        lambda: f"{fake.random_int(min=1, max=99):02}.{fake.random_int(min=1, max=999):03}"
    )

    # created_date falls 5-10 years in the past; last_upd_date, when generated,
    # falls within the last 5 years, so it never precedes created_date.
    created_date = factory.Faker("date_time_between", start_date="-10y", end_date="-5y")
    # NOTE(review): sometimes_none is defined outside this view; presumably it
    # makes the value None for some generated records - confirm.
    last_upd_date = sometimes_none(
        factory.Faker("date_time_between", start_date="-5y", end_date="now")
    )

    # Default to being a new insert/update
    is_deleted = False
    transformed_at = None


####################################
# Transfer Table Factories
Expand Down

0 comments on commit 54774fd

Please sign in to comment.