Skip to content

Commit

Permalink
Ensure WRITE_TRUNCATE BigQuery loads to the same table are processed …
Browse files Browse the repository at this point in the history
…together.

This fixes apache#24535.
  • Loading branch information
robertwb committed Dec 5, 2022
1 parent cb329e4 commit bd0f710
Showing 1 changed file with 28 additions and 4 deletions.
32 changes: 28 additions & 4 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,9 +515,17 @@ def start_bundle(self):
self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
self.pending_jobs = []

def process(self, element, job_name_prefix=None, unused_schema_mod_jobs=None):
destination = element[0]
job_reference = element[1]
def process(
self, element_list, job_name_prefix=None, unused_schema_mod_jobs=None):
if isinstance(element_list, tuple):
# Allow this for streaming update compatibility while fixing BEAM-24535.
self.process_one(element_list)
else:
for element in element_list:
self.process_one(element)

def process_one(self, element, job_name_prefix):
destination, job_reference = element

copy_to_reference = bigquery_tools.parse_table_reference(destination)
if copy_to_reference.projectId is None:
Expand Down Expand Up @@ -1085,8 +1093,24 @@ def _load_data(
load_job_project_id=self.load_job_project_id),
schema_mod_job_name_pcv))

if self.create_disposition == 'WRITE_TRUNCATE':
# All loads going to the same table must be processed together so that
# the truncation happens only once. See BEAM-24535.
finished_temp_tables_load_job_ids_list_pc = (
finished_temp_tables_load_job_ids_pc | beam.MapTuple(
lambda destination,
job_reference: (
bigquery_tools.parse_table_reference(destination).tableId,
(destination, job_reference)))
| beam.GroupByKey()
| beam.MapTuple(lambda tableId, batch: list(batch)))
else:
# Loads can happen in parallel.
finished_temp_tables_load_job_ids_list_pc = (
finished_temp_tables_load_job_ids_pc | beam.Map(lambda x: [x]))

copy_job_outputs = (
finished_temp_tables_load_job_ids_pc
finished_temp_tables_load_job_ids_list_pc
| beam.ParDo(
TriggerCopyJobs(
project=self.project,
Expand Down

0 comments on commit bd0f710

Please sign in to comment.