diff --git a/src/registrar/assets/sass/_theme/_admin.scss b/src/registrar/assets/sass/_theme/_admin.scss
index 8ed7026655..701b239ca5 100644
--- a/src/registrar/assets/sass/_theme/_admin.scss
+++ b/src/registrar/assets/sass/_theme/_admin.scss
@@ -773,3 +773,12 @@ div.dja__model-description{
.module caption, .inline-group h2 {
text-transform: capitalize;
}
+
+.wrapped-button-group {
+ // This button group has too many items
+ flex-wrap: wrap;
+ // Fix a weird spacing issue with USWDS a buttons in DJA
+ a.button {
+ padding: 6px 8px 10px 8px;
+ }
+}
diff --git a/src/registrar/config/urls.py b/src/registrar/config/urls.py
index bf13b950ec..dc6c8acb57 100644
--- a/src/registrar/config/urls.py
+++ b/src/registrar/config/urls.py
@@ -18,6 +18,7 @@
ExportDataType,
ExportDataUnmanagedDomains,
AnalyticsView,
+ ExportDomainRequestDataFull,
)
from registrar.views.domain_request import Step
@@ -66,6 +67,11 @@
ExportDataType.as_view(),
name="export_data_type",
),
+ path(
+ "admin/analytics/export_data_domain_requests_full/",
+ ExportDomainRequestDataFull.as_view(),
+ name="export_data_domain_requests_full",
+ ),
path(
"admin/analytics/export_data_full/",
ExportDataFull.as_view(),
diff --git a/src/registrar/models/domain_request.py b/src/registrar/models/domain_request.py
index 1ad8ae7b3b..11c355e1cc 100644
--- a/src/registrar/models/domain_request.py
+++ b/src/registrar/models/domain_request.py
@@ -52,6 +52,11 @@ class DomainRequestStatus(models.TextChoices):
WITHDRAWN = "withdrawn", "Withdrawn"
STARTED = "started", "Started"
+ @classmethod
+ def get_status_label(cls, status_name: str):
+ """Returns the associated label for a given status name"""
+ return cls(status_name).label if status_name else None
+
class StateTerritoryChoices(models.TextChoices):
ALABAMA = "AL", "Alabama (AL)"
ALASKA = "AK", "Alaska (AK)"
@@ -133,6 +138,14 @@ class OrganizationChoices(models.TextChoices):
SPECIAL_DISTRICT = "special_district", "Special district"
SCHOOL_DISTRICT = "school_district", "School district"
+ @classmethod
+ def get_org_label(cls, org_name: str):
+ """Returns the associated label for a given org name"""
+ org_names = org_name.split("_election")
+ if len(org_names) > 0:
+ org_name = org_names[0]
+ return cls(org_name).label if org_name else None
+
class OrgChoicesElectionOffice(models.TextChoices):
"""
Primary organization choices for Django admin:
diff --git a/src/registrar/models/utility/generic_helper.py b/src/registrar/models/utility/generic_helper.py
index ca6ce6c31d..f9d4303c4a 100644
--- a/src/registrar/models/utility/generic_helper.py
+++ b/src/registrar/models/utility/generic_helper.py
@@ -298,3 +298,26 @@ def replace_url_queryparams(url_to_modify: str, query_params, convert_list_to_cs
new_url = urlunparse(url_parts)
return new_url
+
+
+def convert_queryset_to_dict(queryset, is_model=True, key="id"):
+ """
+ Transforms a queryset into a dictionary keyed by a specified key (like "id").
+
+ Parameters:
+ requests (QuerySet or list of dicts): Input data.
+ is_model (bool): Indicates if each item in 'queryset' are model instances (True) or dictionaries (False).
+ key (str): Key or attribute to use for the resulting dictionary's keys.
+
+ Returns:
+ dict: Dictionary with keys derived from 'key' and values corresponding to items in 'queryset'.
+ """
+
+ if is_model:
+ request_dict = {getattr(value, key): value for value in queryset}
+ else:
+ # Querysets sometimes contain sets of dictionaries.
+ # Calling .values is an example of this.
+ request_dict = {value[key]: value for value in queryset}
+
+ return request_dict
diff --git a/src/registrar/templates/admin/analytics.html b/src/registrar/templates/admin/analytics.html
index e73f22ec51..13db3b60a9 100644
--- a/src/registrar/templates/admin/analytics.html
+++ b/src/registrar/templates/admin/analytics.html
@@ -27,28 +27,35 @@
At a glance
diff --git a/src/registrar/tests/common.py b/src/registrar/tests/common.py
index f6ad8d3e9b..923195bc18 100644
--- a/src/registrar/tests/common.py
+++ b/src/registrar/tests/common.py
@@ -735,19 +735,53 @@ def setUp(self):
self.domain_request_4 = completed_domain_request(
status=DomainRequest.DomainRequestStatus.STARTED,
name="city4.gov",
+ is_election_board=True,
+ generic_org_type="city",
)
self.domain_request_5 = completed_domain_request(
status=DomainRequest.DomainRequestStatus.APPROVED,
name="city5.gov",
)
+ self.domain_request_6 = completed_domain_request(
+ status=DomainRequest.DomainRequestStatus.STARTED,
+ name="city6.gov",
+ )
self.domain_request_3.submit()
self.domain_request_4.submit()
-
+ self.domain_request_6.submit()
+
+ other, _ = Contact.objects.get_or_create(
+ first_name="Testy1232",
+ last_name="Tester24",
+ title="Another Tester",
+ email="te2@town.com",
+ phone="(555) 555 5557",
+ )
+ other_2, _ = Contact.objects.get_or_create(
+ first_name="Meow",
+ last_name="Tester24",
+ title="Another Tester",
+ email="te2@town.com",
+ phone="(555) 555 5557",
+ )
+ website, _ = Website.objects.get_or_create(website="igorville.gov")
+ website_2, _ = Website.objects.get_or_create(website="cheeseville.gov")
+ website_3, _ = Website.objects.get_or_create(website="https://www.example.com")
+ website_4, _ = Website.objects.get_or_create(website="https://www.example2.com")
+
+ self.domain_request_3.other_contacts.add(other, other_2)
+ self.domain_request_3.alternative_domains.add(website, website_2)
+ self.domain_request_3.current_websites.add(website_3, website_4)
+ self.domain_request_3.cisa_representative_email = "test@igorville.com"
self.domain_request_3.submission_date = get_time_aware_date(datetime(2024, 4, 2))
- self.domain_request_4.submission_date = get_time_aware_date(datetime(2024, 4, 2))
self.domain_request_3.save()
+
+ self.domain_request_4.submission_date = get_time_aware_date(datetime(2024, 4, 2))
self.domain_request_4.save()
+ self.domain_request_6.submission_date = get_time_aware_date(datetime(2024, 4, 2))
+ self.domain_request_6.save()
+
def tearDown(self):
super().tearDown()
PublicContact.objects.all().delete()
diff --git a/src/registrar/tests/test_reports.py b/src/registrar/tests/test_reports.py
index 4f308b2b6a..0028034fb2 100644
--- a/src/registrar/tests/test_reports.py
+++ b/src/registrar/tests/test_reports.py
@@ -4,6 +4,7 @@
from io import StringIO
from registrar.models.domain_request import DomainRequest
from registrar.models.domain import Domain
+from registrar.models.utility.generic_helper import convert_queryset_to_dict
from registrar.utility.csv_export import (
export_data_managed_domains_to_csv,
export_data_unmanaged_domains_to_csv,
@@ -12,7 +13,7 @@
write_csv_for_domains,
get_default_start_date,
get_default_end_date,
- write_csv_for_requests,
+ DomainRequestExport,
)
from django.core.management import call_command
@@ -23,6 +24,7 @@
import boto3_mocking
from registrar.utility.s3_bucket import S3ClientError, S3ClientErrorCodes # type: ignore
from django.utils import timezone
+from api.tests.common import less_console_noise_decorator
from .common import MockDb, MockEppLib, less_console_noise, get_time_aware_date
@@ -667,10 +669,7 @@ def test_write_requests_body_with_date_filter_pulls_requests_in_range(self):
# Define columns, sort fields, and filter condition
# We'll skip submission date because it's dynamic and therefore
# impossible to set in expected_content
- columns = [
- "Requested domain",
- "Organization type",
- ]
+ columns = ["Domain request", "Domain type", "Federal type"]
sort_fields = [
"requested_domain__name",
]
@@ -679,7 +678,12 @@ def test_write_requests_body_with_date_filter_pulls_requests_in_range(self):
"submission_date__lte": self.end_date,
"submission_date__gte": self.start_date,
}
- write_csv_for_requests(writer, columns, sort_fields, filter_condition, should_write_header=True)
+
+ additional_values = ["requested_domain__name"]
+ all_requests = DomainRequest.objects.filter(**filter_condition).order_by(*sort_fields).distinct()
+ annotated_requests = DomainRequestExport.annotate_and_retrieve_fields(all_requests, {}, additional_values)
+ requests_dict = convert_queryset_to_dict(annotated_requests, is_model=False)
+ DomainRequestExport.write_csv_for_requests(writer, columns, requests_dict)
# Reset the CSV file's position to the beginning
csv_file.seek(0)
# Read the content into a variable
@@ -687,9 +691,10 @@ def test_write_requests_body_with_date_filter_pulls_requests_in_range(self):
# We expect READY domains first, created between today-2 and today+2, sorted by created_at then name
# and DELETED domains deleted between today-2 and today+2, sorted by deleted then name
expected_content = (
- "Requested domain,Organization type\n"
- "city3.gov,Federal - Executive\n"
- "city4.gov,Federal - Executive\n"
+ "Domain request,Domain type,Federal type\n"
+ "city3.gov,Federal,Executive\n"
+ "city4.gov,City,Executive\n"
+ "city6.gov,Federal,Executive\n"
)
# Normalize line endings and remove commas,
@@ -699,6 +704,72 @@ def test_write_requests_body_with_date_filter_pulls_requests_in_range(self):
self.assertEqual(csv_content, expected_content)
+ @less_console_noise_decorator
+ def test_full_domain_request_report(self):
+ """Tests the full domain request report."""
+
+ # Create a CSV file in memory
+ csv_file = StringIO()
+ writer = csv.writer(csv_file)
+
+ # Call the report. Get existing fields from the report itself.
+ annotations = DomainRequestExport._full_domain_request_annotations()
+ additional_values = [
+ "requested_domain__name",
+ "federal_agency__agency",
+ "authorizing_official__first_name",
+ "authorizing_official__last_name",
+ "authorizing_official__email",
+ "authorizing_official__title",
+ "creator__first_name",
+ "creator__last_name",
+ "creator__email",
+ "investigator__email",
+ ]
+ requests = DomainRequest.objects.exclude(status=DomainRequest.DomainRequestStatus.STARTED)
+ annotated_requests = DomainRequestExport.annotate_and_retrieve_fields(requests, annotations, additional_values)
+ requests_dict = convert_queryset_to_dict(annotated_requests, is_model=False)
+ DomainRequestExport.write_csv_for_requests(writer, DomainRequestExport.all_columns, requests_dict)
+
+ # Reset the CSV file's position to the beginning
+ csv_file.seek(0)
+ # Read the content into a variable
+ csv_content = csv_file.read()
+ print(csv_content)
+ self.maxDiff = None
+ expected_content = (
+ # Header
+ "Domain request,Submitted at,Status,Domain type,Federal type,"
+ "Federal agency,Organization name,Election office,City,State/territory,"
+ "Region,Creator first name,Creator last name,Creator email,Creator approved domains count,"
+ "Creator active requests count,Alternative domains,AO first name,AO last name,AO email,"
+ "AO title/role,Request purpose,Request additional details,Other contacts,"
+ "CISA regional representative,Current websites,Investigator\n"
+ # Content
+ "city2.gov,,In review,Federal,Executive,,Testorg,N/A,,NY,2,,,,0,1,city1.gov,Testy,Tester,testy@town.com,"
+ "Chief Tester,Purpose of the site,There is more,Testy Tester testy2@town.com,,city.com,\n"
+ "city3.gov,2024-04-02,Submitted,Federal,Executive,,Testorg,N/A,,NY,2,,,,0,1,"
+ "cheeseville.gov | city1.gov | igorville.gov,Testy,Tester,testy@town.com,Chief Tester,"
+ "Purpose of the site,CISA-first-name CISA-last-name | There is more,Meow Tester24 te2@town.com | "
+ "Testy1232 Tester24 te2@town.com | Testy Tester testy2@town.com,test@igorville.com,"
+ "city.com | https://www.example2.com | https://www.example.com,\n"
+ "city4.gov,2024-04-02,Submitted,City,Executive,,Testorg,Yes,,NY,2,,,,0,1,city1.gov,Testy,Tester,"
+ "testy@town.com,Chief Tester,Purpose of the site,CISA-first-name CISA-last-name | There is more,"
+ "Testy Tester testy2@town.com,cisaRep@igorville.gov,city.com,\n"
+ "city5.gov,,Approved,Federal,Executive,,Testorg,N/A,,NY,2,,,,1,0,city1.gov,Testy,Tester,testy@town.com,"
+ "Chief Tester,Purpose of the site,There is more,Testy Tester testy2@town.com,,city.com,\n"
+ "city6.gov,2024-04-02,Submitted,Federal,Executive,,Testorg,N/A,,NY,2,,,,0,1,city1.gov,Testy,Tester,"
+ "testy@town.com,Chief Tester,Purpose of the site,CISA-first-name CISA-last-name | There is more,"
+ "Testy Tester testy2@town.com,cisaRep@igorville.gov,city.com,"
+ )
+
+ # Normalize line endings and remove commas,
+ # spaces and leading/trailing whitespace
+ csv_content = csv_content.replace(",,", "").replace(",", "").replace(" ", "").replace("\r\n", "\n").strip()
+ expected_content = expected_content.replace(",,", "").replace(",", "").replace(" ", "").strip()
+
+ self.assertEqual(csv_content, expected_content)
+
class HelperFunctions(MockDb):
"""This asserts that 1=1. Its limited usefulness lies in making sure the helper methods stay healthy."""
@@ -741,5 +812,5 @@ def test_get_sliced_requests(self):
"submission_date__lte": self.end_date,
}
submitted_requests_sliced_at_end_date = get_sliced_requests(filter_condition)
- expected_content = [2, 2, 0, 0, 0, 0, 0, 0, 0, 0]
+ expected_content = [3, 2, 0, 0, 0, 0, 1, 0, 0, 1]
self.assertEqual(submitted_requests_sliced_at_end_date, expected_content)
diff --git a/src/registrar/utility/constants.py b/src/registrar/utility/constants.py
index 9ec5e7d5ec..9b6c3f71d3 100644
--- a/src/registrar/utility/constants.py
+++ b/src/registrar/utility/constants.py
@@ -5,3 +5,8 @@ class BranchChoices(models.TextChoices):
EXECUTIVE = "executive", "Executive"
JUDICIAL = "judicial", "Judicial"
LEGISLATIVE = "legislative", "Legislative"
+
+ @classmethod
+ def get_branch_label(cls, branch_name: str):
+ """Returns the associated label for a given org name"""
+ return cls(branch_name).label if branch_name else None
diff --git a/src/registrar/utility/csv_export.py b/src/registrar/utility/csv_export.py
index 6d3079bc14..1a35c81645 100644
--- a/src/registrar/utility/csv_export.py
+++ b/src/registrar/utility/csv_export.py
@@ -1,18 +1,25 @@
import csv
import logging
from datetime import datetime
-from registrar.models.domain import Domain
-from registrar.models.domain_invitation import DomainInvitation
-from registrar.models.domain_request import DomainRequest
-from registrar.models.domain_information import DomainInformation
+from registrar.models import (
+ Domain,
+ DomainInvitation,
+ DomainRequest,
+ DomainInformation,
+ PublicContact,
+ UserDomainRole,
+)
+from django.db.models import QuerySet, Value, CharField, Count, Q, F
+from django.db.models import ManyToManyField
from django.utils import timezone
from django.core.paginator import Paginator
-from django.db.models import F, Value, CharField
from django.db.models.functions import Concat, Coalesce
-
-from registrar.models.public_contact import PublicContact
-from registrar.models.user_domain_role import UserDomainRole
+from django.contrib.postgres.aggregates import StringAgg
+from registrar.models.utility.generic_helper import convert_queryset_to_dict
+from registrar.templatetags.custom_filters import get_region
from registrar.utility.enums import DefaultEmail
+from registrar.utility.constants import BranchChoices
+
logger = logging.getLogger(__name__)
@@ -299,84 +306,6 @@ def write_csv_for_domains(
writer.writerows(total_body_rows)
-def get_requests(filter_condition, sort_fields):
- """
- Returns DomainRequest objects filtered and sorted based on the provided conditions.
- filter_condition -> A dictionary of conditions to filter the objects.
- sort_fields -> A list of fields to sort the resulting query set.
- returns: A queryset of DomainRequest objects
- """
- requests = DomainRequest.objects.filter(**filter_condition).order_by(*sort_fields).distinct()
- return requests
-
-
-def parse_row_for_requests(columns, request: DomainRequest):
- """Given a set of columns, generate a new row from cleaned column data"""
-
- requested_domain_name = "No requested domain"
-
- if request.requested_domain is not None:
- requested_domain_name = request.requested_domain.name
-
- if request.federal_type:
- request_type = f"{request.get_organization_type_display()} - {request.get_federal_type_display()}"
- else:
- request_type = request.get_organization_type_display()
-
- # create a dictionary of fields which can be included in output
- FIELDS = {
- "Requested domain": requested_domain_name,
- "Status": request.get_status_display(),
- "Organization type": request_type,
- "Agency": request.federal_agency,
- "Organization name": request.organization_name,
- "City": request.city,
- "State": request.state_territory,
- "AO email": request.authorizing_official.email if request.authorizing_official else " ",
- "Security contact email": request,
- "Created at": request.created_at,
- "Submission date": request.submission_date,
- }
-
- row = [FIELDS.get(column, "") for column in columns]
- return row
-
-
-def write_csv_for_requests(
- writer,
- columns,
- sort_fields,
- filter_condition,
- should_write_header=True,
-):
- """Receives params from the parent methods and outputs a CSV with filtered and sorted requests.
- Works with write_header as long as the same writer object is passed."""
-
- all_requests = get_requests(filter_condition, sort_fields)
-
- # Reduce the memory overhead when performing the write operation
- paginator = Paginator(all_requests, 1000)
- total_body_rows = []
-
- for page_num in paginator.page_range:
- page = paginator.page(page_num)
- rows = []
- for request in page.object_list:
- try:
- row = parse_row_for_requests(columns, request)
- rows.append(row)
- except ValueError:
- # This should not happen. If it does, just skip this row.
- # It indicates that DomainInformation.domain is None.
- logger.error("csv_export -> Error when parsing row, domain was None")
- continue
- total_body_rows.extend(rows)
-
- if should_write_header:
- write_header(writer, columns)
- writer.writerows(total_body_rows)
-
-
def export_data_type_to_csv(csv_file):
"""
All domains report with extra columns.
@@ -775,30 +704,338 @@ def export_data_unmanaged_domains_to_csv(csv_file, start_date, end_date):
)
-def export_data_requests_growth_to_csv(csv_file, start_date, end_date):
+class DomainRequestExport:
"""
- Growth report:
- Receive start and end dates from the view, parse them.
- Request from write_requests_body SUBMITTED requests that are created between
- the start and end dates. Specify sort params.
+ A collection of functions which return csv files regarding the DomainRequest model.
"""
- start_date_formatted = format_start_date(start_date)
- end_date_formatted = format_end_date(end_date)
- writer = csv.writer(csv_file)
- # define columns to include in export
- columns = [
- "Requested domain",
- "Organization type",
- "Submission date",
- ]
- sort_fields = [
- "requested_domain__name",
+ # Get all columns on the full metadata report
+ all_columns = [
+ "Domain request",
+ "Submitted at",
+ "Status",
+ "Domain type",
+ "Federal type",
+ "Federal agency",
+ "Organization name",
+ "Election office",
+ "City",
+ "State/territory",
+ "Region",
+ "Creator first name",
+ "Creator last name",
+ "Creator email",
+ "Creator approved domains count",
+ "Creator active requests count",
+ "Alternative domains",
+ "AO first name",
+ "AO last name",
+ "AO email",
+ "AO title/role",
+ "Request purpose",
+ "Request additional details",
+ "Other contacts",
+ "CISA regional representative",
+ "Current websites",
+ "Investigator",
]
- filter_condition = {
- "status": DomainRequest.DomainRequestStatus.SUBMITTED,
- "submission_date__lte": end_date_formatted,
- "submission_date__gte": start_date_formatted,
- }
- write_csv_for_requests(writer, columns, sort_fields, filter_condition, should_write_header=True)
+ @classmethod
+ def export_data_requests_growth_to_csv(cls, csv_file, start_date, end_date):
+ """
+ Growth report:
+ Receive start and end dates from the view, parse them.
+ Request from write_requests_body SUBMITTED requests that are created between
+ the start and end dates. Specify sort params.
+ """
+
+ start_date_formatted = format_start_date(start_date)
+ end_date_formatted = format_end_date(end_date)
+ writer = csv.writer(csv_file)
+ # define columns to include in export
+ columns = [
+ "Domain request",
+ "Domain type",
+ "Federal type",
+ "Submitted at",
+ ]
+
+ sort_fields = [
+ "requested_domain__name",
+ ]
+ filter_condition = {
+ "status": DomainRequest.DomainRequestStatus.SUBMITTED,
+ "submission_date__lte": end_date_formatted,
+ "submission_date__gte": start_date_formatted,
+ }
+
+ # We don't want to annotate anything, but we do want to access the requested domain name
+ annotations = {}
+ additional_values = ["requested_domain__name"]
+
+ all_requests = DomainRequest.objects.filter(**filter_condition).order_by(*sort_fields).distinct()
+
+ annotated_requests = cls.annotate_and_retrieve_fields(all_requests, annotations, additional_values)
+ requests_dict = convert_queryset_to_dict(annotated_requests, is_model=False)
+
+ cls.write_csv_for_requests(writer, columns, requests_dict)
+
+ @classmethod
+ def export_full_domain_request_report(cls, csv_file):
+ """
+ Generates a detailed domain request report to a CSV file.
+
+ Retrieves and annotates DomainRequest objects, excluding 'STARTED' status,
+ with related data optimizations via select/prefetch and annotation.
+
+ Annotated with counts and aggregates of related entities.
+ Converts to dict and writes to CSV using predefined columns.
+
+ Parameters:
+ csv_file (file-like object): Target CSV file.
+ """
+ writer = csv.writer(csv_file)
+
+ requests = (
+ DomainRequest.objects.select_related(
+ "creator", "authorizing_official", "federal_agency", "investigator", "requested_domain"
+ )
+ .prefetch_related("current_websites", "other_contacts", "alternative_domains")
+ .exclude(status__in=[DomainRequest.DomainRequestStatus.STARTED])
+ .order_by(
+ "status",
+ "requested_domain__name",
+ )
+ .distinct()
+ )
+
+ # Annotations are custom columns returned to the queryset (AKA: computed in the DB).
+ annotations = cls._full_domain_request_annotations()
+
+ # The .values returned from annotate_and_retrieve_fields can't go two levels deep
+ # (just returns the field id of say, "creator") - so we have to include this.
+ additional_values = [
+ "requested_domain__name",
+ "federal_agency__agency",
+ "authorizing_official__first_name",
+ "authorizing_official__last_name",
+ "authorizing_official__email",
+ "authorizing_official__title",
+ "creator__first_name",
+ "creator__last_name",
+ "creator__email",
+ "investigator__email",
+ ]
+
+ # Convert the domain request queryset to a dictionary (including annotated fields)
+ annotated_requests = cls.annotate_and_retrieve_fields(requests, annotations, additional_values)
+ requests_dict = convert_queryset_to_dict(annotated_requests, is_model=False)
+
+ # Write the csv file
+ cls.write_csv_for_requests(writer, cls.all_columns, requests_dict)
+
+ @classmethod
+ def _full_domain_request_annotations(cls, delimiter=" | "):
+ """Returns the annotations for the full domain request report"""
+ return {
+ "creator_approved_domains_count": DomainRequestExport.get_creator_approved_domains_count_query(),
+ "creator_active_requests_count": DomainRequestExport.get_creator_active_requests_count_query(),
+ "all_current_websites": StringAgg("current_websites__website", delimiter=delimiter, distinct=True),
+ "all_alternative_domains": StringAgg("alternative_domains__website", delimiter=delimiter, distinct=True),
+ # Coerce the other contacts object to "{first_name} {last_name} {email}"
+ "all_other_contacts": StringAgg(
+ Concat(
+ "other_contacts__first_name",
+ Value(" "),
+ "other_contacts__last_name",
+ Value(" "),
+ "other_contacts__email",
+ ),
+ delimiter=delimiter,
+ distinct=True,
+ ),
+ }
+
+ @staticmethod
+ def write_csv_for_requests(
+ writer,
+ columns,
+ requests_dict,
+ should_write_header=True,
+ ):
+ """Receives params from the parent methods and outputs a CSV with filtered and sorted requests.
+ Works with write_header as long as the same writer object is passed."""
+
+ rows = []
+ for request in requests_dict.values():
+ try:
+ row = DomainRequestExport.parse_row_for_requests(columns, request)
+ rows.append(row)
+ except ValueError as err:
+ logger.error(f"csv_export -> Error when parsing row: {err}")
+ continue
+
+ if should_write_header:
+ write_header(writer, columns)
+
+ writer.writerows(rows)
+
+ @staticmethod
+ def parse_row_for_requests(columns, request):
+ """
+ Given a set of columns and a request dictionary, generate a new row from cleaned column data.
+ """
+
+ # Handle the federal_type field. Defaults to the wrong format.
+ federal_type = request.get("federal_type")
+ human_readable_federal_type = BranchChoices.get_branch_label(federal_type) if federal_type else None
+
+ # Handle the org_type field
+ org_type = request.get("generic_org_type") or request.get("organization_type")
+ human_readable_org_type = DomainRequest.OrganizationChoices.get_org_label(org_type) if org_type else None
+
+ # Handle the status field. Defaults to the wrong format.
+ status = request.get("status")
+ status_display = DomainRequest.DomainRequestStatus.get_status_label(status) if status else None
+
+ # Handle the region field.
+ state_territory = request.get("state_territory")
+ region = get_region(state_territory) if state_territory else None
+
+ # Handle the requested_domain field (add a default if None)
+ requested_domain = request.get("requested_domain__name")
+ requested_domain_name = requested_domain if requested_domain else "No requested domain"
+
+ # Handle the election field. N/A if None, "Yes"/"No" if boolean
+ human_readable_election_board = "N/A"
+ is_election_board = request.get("is_election_board")
+ if is_election_board is not None:
+ human_readable_election_board = "Yes" if is_election_board else "No"
+
+ # Handle the additional details field. Pipe seperated.
+ cisa_rep_first = request.get("cisa_representative_first_name")
+ cisa_rep_last = request.get("cisa_representative_last_name")
+ name = [n for n in [cisa_rep_first, cisa_rep_last] if n]
+
+ cisa_rep = " ".join(name) if name else None
+ details = [cisa_rep, request.get("anything_else")]
+ additional_details = " | ".join([field for field in details if field])
+
+ # create a dictionary of fields which can be included in output.
+ # "extra_fields" are precomputed fields (generated in the DB or parsed).
+ FIELDS = {
+ # Parsed fields - defined above.
+ "Domain request": requested_domain_name,
+ "Region": region,
+ "Status": status_display,
+ "Election office": human_readable_election_board,
+ "Federal type": human_readable_federal_type,
+ "Domain type": human_readable_org_type,
+ "Request additional details": additional_details,
+ # Annotated fields - passed into the request dict.
+ "Creator approved domains count": request.get("creator_approved_domains_count", 0),
+ "Creator active requests count": request.get("creator_active_requests_count", 0),
+ "Alternative domains": request.get("all_alternative_domains"),
+ "Other contacts": request.get("all_other_contacts"),
+ "Current websites": request.get("all_current_websites"),
+ # Untouched FK fields - passed into the request dict.
+ "Federal agency": request.get("federal_agency__agency"),
+ "AO first name": request.get("authorizing_official__first_name"),
+ "AO last name": request.get("authorizing_official__last_name"),
+ "AO email": request.get("authorizing_official__email"),
+ "AO title/role": request.get("authorizing_official__title"),
+ "Creator first name": request.get("creator__first_name"),
+ "Creator last name": request.get("creator__last_name"),
+ "Creator email": request.get("creator__email"),
+ "Investigator": request.get("investigator__email"),
+ # Untouched fields
+ "Organization name": request.get("organization_name"),
+ "City": request.get("city"),
+ "State/territory": request.get("state_territory"),
+ "Request purpose": request.get("purpose"),
+ "CISA regional representative": request.get("cisa_representative_email"),
+ "Submitted at": request.get("submission_date"),
+ }
+
+ row = [FIELDS.get(column, "") for column in columns]
+ return row
+
+ @classmethod
+ def annotate_and_retrieve_fields(
+ cls, requests, annotations, additional_values=None, include_many_to_many=False
+ ) -> QuerySet:
+ """
+ Applies annotations to a queryset and retrieves specified fields,
+ including class-defined and annotation-defined.
+
+ Parameters:
+ requests (QuerySet): Initial queryset.
+ annotations (dict, optional): Fields to compute {field_name: expression}.
+ additional_values (list, optional): Extra fields to retrieve; defaults to annotation keys if None.
+ include_many_to_many (bool, optional): Determines if we should include many to many fields or not
+
+ Returns:
+ QuerySet: Contains dictionaries with the specified fields for each record.
+ """
+
+ if additional_values is None:
+ additional_values = []
+
+ # We can infer that if we're passing in annotations,
+ # we want to grab the result of said annotation.
+ if annotations:
+ additional_values.extend(annotations.keys())
+
+ # Get prexisting fields on DomainRequest
+ domain_request_fields = set()
+ for field in DomainRequest._meta.get_fields():
+ # Exclude many to many fields unless we specify
+ many_to_many = isinstance(field, ManyToManyField) and include_many_to_many
+ if many_to_many or not isinstance(field, ManyToManyField):
+ domain_request_fields.add(field.name)
+
+ queryset = requests.annotate(**annotations).values(*domain_request_fields, *additional_values)
+ return queryset
+
+ # ============================================================= #
+ # Helper functions for django ORM queries. #
+ # We are using these rather than pure python for speed reasons. #
+ # ============================================================= #
+
+ @staticmethod
+ def get_creator_approved_domains_count_query():
+ """
+ Generates a Count query for distinct approved domain requests per creator.
+
+ Returns:
+ Count: Aggregates distinct 'APPROVED' domain requests by creator.
+ """
+
+ query = Count(
+ "creator__domain_requests_created__id",
+ filter=Q(creator__domain_requests_created__status=DomainRequest.DomainRequestStatus.APPROVED),
+ distinct=True,
+ )
+ return query
+
+ @staticmethod
+ def get_creator_active_requests_count_query():
+ """
+ Generates a Count query for distinct approved domain requests per creator.
+
+ Returns:
+ Count: Aggregates distinct 'SUBMITTED', 'IN_REVIEW', and 'ACTION_NEEDED' domain requests by creator.
+ """
+
+ query = Count(
+ "creator__domain_requests_created__id",
+ filter=Q(
+ creator__domain_requests_created__status__in=[
+ DomainRequest.DomainRequestStatus.SUBMITTED,
+ DomainRequest.DomainRequestStatus.IN_REVIEW,
+ DomainRequest.DomainRequestStatus.ACTION_NEEDED,
+ ]
+ ),
+ distinct=True,
+ )
+ return query
diff --git a/src/registrar/views/admin_views.py b/src/registrar/views/admin_views.py
index 01a8157f96..f1baa72bd0 100644
--- a/src/registrar/views/admin_views.py
+++ b/src/registrar/views/admin_views.py
@@ -164,6 +164,17 @@ def get(self, request, *args, **kwargs):
return response
+class ExportDomainRequestDataFull(View):
+ """Generates a downloaded report containing all Domain Requests (except started)"""
+
+ def get(self, request, *args, **kwargs):
+ """Returns a content disposition response for current-full-domain-request.csv"""
+ response = HttpResponse(content_type="text/csv")
+ response["Content-Disposition"] = 'attachment; filename="current-full-domain-request.csv"'
+ csv_export.DomainRequestExport.export_full_domain_request_report(response)
+ return response
+
+
class ExportDataDomainsGrowth(View):
def get(self, request, *args, **kwargs):
# Get start_date and end_date from the request's GET parameters
@@ -191,7 +202,7 @@ def get(self, request, *args, **kwargs):
response["Content-Disposition"] = f'attachment; filename="requests-{start_date}-to-{end_date}.csv"'
# For #999: set export_data_domain_growth_to_csv to return the resulting queryset, which we can then use
# in context to display this data in the template.
- csv_export.export_data_requests_growth_to_csv(response, start_date, end_date)
+ csv_export.DomainRequestExport.export_data_requests_growth_to_csv(response, start_date, end_date)
return response