Skip to content

Commit

Permalink
bigquery: add retry logic
Browse files Browse the repository at this point in the history
Many clearly idempotent methods did not retry. Added retry logic to them.

Also, minor style tweaks like better variable names.

Fixes #690.

Change-Id: I87d63728338876bf421ce9803dfe3d543520ba69
Reviewed-on: https://code-review.googlesource.com/14452
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Michael Darakananda <pongad@google.com>
  • Loading branch information
jba committed Jul 3, 2017
1 parent a5913b3 commit b419efa
Showing 1 changed file with 65 additions and 38 deletions.
103 changes: 65 additions & 38 deletions bigquery/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,12 +276,11 @@ func (s *bigqueryService) insertRows(ctx context.Context, projectID, datasetID,
Json: m,
})
}
call := s.s.Tabledata.InsertAll(projectID, datasetID, tableID, req).Context(ctx)
setClientHeader(call.Header())
var res *bq.TableDataInsertAllResponse
err := runWithRetry(ctx, func() error {
var err error
req := s.s.Tabledata.InsertAll(projectID, datasetID, tableID, req).Context(ctx)
setClientHeader(req.Header())
res, err = req.Do()
err := runWithRetry(ctx, func() (err error) {
res, err = call.Do()
return err
})
if err != nil {
Expand Down Expand Up @@ -309,18 +308,15 @@ func (s *bigqueryService) insertRows(ctx context.Context, projectID, datasetID,
}

func (s *bigqueryService) getJob(ctx context.Context, projectID, jobID string) (*Job, error) {
res, err := s.s.Jobs.Get(projectID, jobID).
Fields("configuration").
Context(ctx).
Do()
job, err := s.getJobInternal(ctx, projectID, jobID, "configuration")
if err != nil {
return nil, err
}
var isQuery bool
var dest *bq.TableReference
if res.Configuration.Query != nil {
if job.Configuration.Query != nil {
isQuery = true
dest = res.Configuration.Query.DestinationTable
dest = job.Configuration.Query.DestinationTable
}
return &Job{
projectID: projectID,
Expand All @@ -330,35 +326,49 @@ func (s *bigqueryService) getJob(ctx context.Context, projectID, jobID string) (
}, nil
}

// jobCancel sends a cancel request for the job identified by projectID and
// jobID. It returns only the error, if any, from issuing that request.
func (s *bigqueryService) jobCancel(ctx context.Context, projectID, jobID string) error {
	// The job entity returned by Jobs.Cancel is intentionally discarded: the
	// only relevant data it may carry (the job's status) is unreliable. From
	// the docs: "This call will return immediately, and the client will need
	// to poll for the job status to see if the cancel completed
	// successfully". Returning a status would therefore be misleading.
	//
	// Fields() is called with no arguments because none of the response data
	// is needed.
	call := s.s.Jobs.Cancel(projectID, jobID).Fields().Context(ctx)
	_, err := call.Do()
	return err
}

// jobStatus returns the status of the job identified by projectID and jobID,
// with its statistics attached.
//
// The job is fetched through getJobInternal, requesting only the "status" and
// "statistics" fields. Any error from the fetch or from converting the status
// with jobStatusFromProto is returned unchanged with a nil *JobStatus.
func (s *bigqueryService) jobStatus(ctx context.Context, projectID, jobID string) (*JobStatus, error) {
	// Only fetch what we need.
	job, err := s.getJobInternal(ctx, projectID, jobID, "status", "statistics")
	if err != nil {
		return nil, err
	}
	st, err := jobStatusFromProto(job.Status)
	if err != nil {
		return nil, err
	}
	st.Statistics = jobStatisticsFromProto(job.Statistics)
	return st, nil
}

// getJobInternal fetches the job identified by projectID and jobID, limiting
// the response to the given fields. The Jobs.Get call is executed through
// runWithRetry; a fresh call is built on every attempt. On failure it returns
// a nil job together with the error.
func (s *bigqueryService) getJobInternal(ctx context.Context, projectID, jobID string, fields ...googleapi.Field) (*bq.Job, error) {
	var fetched *bq.Job
	fetch := func() error {
		call := s.s.Jobs.Get(projectID, jobID).Fields(fields...).Context(ctx)
		j, err := call.Do()
		fetched = j
		return err
	}
	if err := runWithRetry(ctx, fetch); err != nil {
		return nil, err
	}
	return fetched, nil
}

// jobCancel sends a cancel request for the job identified by projectID and
// jobID, executing the request through runWithRetry. It returns whatever
// error runWithRetry reports.
func (s *bigqueryService) jobCancel(ctx context.Context, projectID, jobID string) error {
	// The job entity returned by Jobs.Cancel is intentionally discarded: the
	// only relevant data it may carry (the job's status) is unreliable. From
	// the docs: "This call will return immediately, and the client will need
	// to poll for the job status to see if the cancel completed
	// successfully". Returning a status would therefore be misleading.
	cancel := func() error {
		// Fields() with no arguments: none of the response data is needed.
		call := s.s.Jobs.Cancel(projectID, jobID).Fields().Context(ctx)
		_, err := call.Do()
		return err
	}
	return runWithRetry(ctx, cancel)
}

// stateMap translates the state strings "PENDING", "RUNNING" and "DONE" into
// the corresponding State values.
var stateMap = map[string]State{
	"PENDING": Pending,
	"RUNNING": Running,
	"DONE":    Done,
}

func jobStatusFromProto(status *bq.JobStatus) (*JobStatus, error) {
Expand Down Expand Up @@ -465,7 +475,11 @@ func (s *bigqueryService) listTables(ctx context.Context, projectID, datasetID s
if pageSize > 0 {
req.MaxResults(int64(pageSize))
}
res, err := req.Do()
var res *bq.TableList
err := runWithRetry(ctx, func() (err error) {
res, err = req.Do()
return err
})
if err != nil {
return nil, "", err
}
Expand All @@ -491,6 +505,7 @@ type createTableConf struct {
// Note: after table creation, a view can be modified only if its table was initially created with a view.
func (s *bigqueryService) createTable(ctx context.Context, conf *createTableConf) error {
table := &bq.Table{
// TODO(jba): retry? Is this always idempotent?
TableReference: &bq.TableReference{
ProjectId: conf.projectID,
DatasetId: conf.datasetID,
Expand Down Expand Up @@ -529,7 +544,11 @@ func (s *bigqueryService) createTable(ctx context.Context, conf *createTableConf
func (s *bigqueryService) getTableMetadata(ctx context.Context, projectID, datasetID, tableID string) (*TableMetadata, error) {
req := s.s.Tables.Get(projectID, datasetID, tableID).Context(ctx)
setClientHeader(req.Header())
table, err := req.Do()
var table *bq.Table
err := runWithRetry(ctx, func() (err error) {
table, err = req.Do()
return err
})
if err != nil {
return nil, err
}
Expand All @@ -539,7 +558,7 @@ func (s *bigqueryService) getTableMetadata(ctx context.Context, projectID, datas
// deleteTable issues a Tables.Delete request for the table identified by
// projectID, datasetID and tableID.
//
// The request is built once, its headers are passed to setClientHeader, and
// it is then executed through runWithRetry. The result of runWithRetry is
// returned as-is.
func (s *bigqueryService) deleteTable(ctx context.Context, projectID, datasetID, tableID string) error {
	req := s.s.Tables.Delete(projectID, datasetID, tableID).Context(ctx)
	setClientHeader(req.Header())
	// Execute only via runWithRetry; a direct req.Do() return ahead of this
	// line would make the retry unreachable.
	return runWithRetry(ctx, func() error { return req.Do() })
}

func bqTableToMetadata(t *bq.Table) *TableMetadata {
Expand Down Expand Up @@ -643,6 +662,7 @@ func (s *bigqueryService) patchTable(ctx context.Context, projectID, datasetID,
}

func (s *bigqueryService) insertDataset(ctx context.Context, datasetID, projectID string) error {
// TODO(jba): retry?
ds := &bq.Dataset{
DatasetReference: &bq.DatasetReference{DatasetId: datasetID},
}
Expand All @@ -655,17 +675,20 @@ func (s *bigqueryService) insertDataset(ctx context.Context, datasetID, projectI
// deleteDataset issues a Datasets.Delete request for the dataset identified
// by projectID and datasetID. Note the parameter order: datasetID comes
// before projectID in this method's signature.
//
// The request is built once, its headers are passed to setClientHeader, and
// it is then executed through runWithRetry. The result of runWithRetry is
// returned as-is.
func (s *bigqueryService) deleteDataset(ctx context.Context, datasetID, projectID string) error {
	req := s.s.Datasets.Delete(projectID, datasetID).Context(ctx)
	setClientHeader(req.Header())
	// Execute only via runWithRetry; a direct req.Do() return ahead of this
	// line would make the retry unreachable.
	return runWithRetry(ctx, func() error { return req.Do() })
}

func (s *bigqueryService) getDatasetMetadata(ctx context.Context, projectID, datasetID string) (*DatasetMetadata, error) {
req := s.s.Datasets.Get(projectID, datasetID).Context(ctx)
setClientHeader(req.Header())
table, err := req.Do()
if err != nil {
var ds *bq.Dataset
if err := runWithRetry(ctx, func() (err error) {
ds, err = req.Do()
return err
}); err != nil {
return nil, err
}
return bqDatasetToMetadata(table), nil
return bqDatasetToMetadata(ds), nil
}

func (s *bigqueryService) listDatasets(ctx context.Context, projectID string, maxResults int, pageToken string, all bool, filter string) ([]*Dataset, string, error) {
Expand All @@ -680,7 +703,11 @@ func (s *bigqueryService) listDatasets(ctx context.Context, projectID string, ma
if filter != "" {
req.Filter(filter)
}
res, err := req.Do()
var res *bq.DatasetList
err := runWithRetry(ctx, func() (err error) {
res, err = req.Do()
return err
})
if err != nil {
return nil, "", err
}
Expand Down

0 comments on commit b419efa

Please sign in to comment.