Skip to content

Commit

Permalink
Updated error handling
Browse files Browse the repository at this point in the history
Signed-off-by: sonali wale <sonali.wale@progress.com>
  • Loading branch information
sonali wale authored and Yashvi Jain committed Feb 9, 2022
1 parent 53f3023 commit 8e7f95a
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 76 deletions.
8 changes: 4 additions & 4 deletions components/infra-proxy-service/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"os"
"path"

"github.com/chef/automate/components/infra-proxy-service/pipeline"

"github.com/chef/automate/api/interservice/infra_proxy/migrations/request"
"github.com/chef/automate/api/interservice/infra_proxy/migrations/response"
"github.com/chef/automate/api/interservice/infra_proxy/migrations/service"
Expand Down Expand Up @@ -42,11 +40,13 @@ func (s *MigrationServer) UploadFile(stream service.MigrationDataService_UploadF
_, err = s.service.Migration.StartMigration(ctx, migrationId, serverId)
if err != nil {
log.Errorf("Unable to insert the migration status Start Migration for migration id : %s", migrationId)
return err
}
fileData := bytes.Buffer{}
_, err = s.service.Migration.StartFileUpload(ctx, migrationId, serverId)
if err != nil {
log.Errorf("Unable to insert the migration status Start File upload for migration id : %s", migrationId)
return err
}
for {
req, err := stream.Recv()
Expand Down Expand Up @@ -104,7 +104,7 @@ func (s *MigrationServer) UploadFile(stream service.MigrationDataService_UploadF
}

pipelineResult := pipeline_model.Result{Meta: pipeline_model.Meta{ZipFile: fileName}}
s.phaseOnePipeline.Run(pipelineResult)
go s.phaseOnePipeline.Run(pipelineResult)
return nil
}

Expand Down Expand Up @@ -332,7 +332,7 @@ func (s *MigrationServer) ConfirmPreview(ctx context.Context, req *request.Confi
}

// call pipeline function to trigger the phase 2 pipeline
s.phaseTwoPipeline.Run(migrationStage.StagedData)
go s.phaseTwoPipeline.Run(migrationStage.StagedData)

return &response.ConfirmPreview{
MigrationId: req.MigrationId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package pipeline

import (
"context"
"fmt"

log "github.com/sirupsen/logrus"

Expand Down Expand Up @@ -35,7 +34,7 @@ func UnzipSrc() PhaseOnePipelineProcessor {
}

func unzip(result <-chan PipelineData) <-chan PipelineData {
fmt.Println("Starting unzip pipeline")
log.Info("Starting unzip pipeline")
out := make(chan PipelineData, 100)
go func() {

Expand All @@ -47,7 +46,7 @@ func unzip(result <-chan PipelineData) <-chan PipelineData {
res.Done <- nil
}
}
fmt.Println("Closing unzip")
log.Info("Closing unzip")
close(out)
}()
return out
Expand All @@ -61,21 +60,21 @@ func ParseOrg() PhaseOnePipelineProcessor {
}

func parseOrg(result <-chan PipelineData) <-chan PipelineData {
fmt.Println("Starting to parse_orgs pipeline")
log.Info("Starting to parse_orgs pipeline")

out := make(chan PipelineData, 100)

go func() {
fmt.Println("Processing to parse orgs...")
log.Info("Processing to parse orgs...")
for res := range result {
select {
case out <- res:
case <-res.Ctx.Done():
res.Done <- nil
}
fmt.Println("after write")
log.Info("after write")
}
fmt.Println("CLosing parse_orgs pipeline")
log.Info("CLosing parse_orgs pipeline")
close(out)
}()
return out
Expand All @@ -89,20 +88,20 @@ func ParseUser() PhaseOnePipelineProcessor {
}

func parseUser(result <-chan PipelineData) <-chan PipelineData {
fmt.Println("Starting to parse_user pipeline")
log.Info("Starting to parse_user pipeline")

out := make(chan PipelineData, 100)

go func() {
fmt.Println("Processing to parse_user...")
log.Info("Processing to parse_user...")
for res := range result {
select {
case out <- res:
case <-res.Ctx.Done():
res.Done <- nil
}
}
fmt.Println("Closing parse_user")
log.Info("Closing parse_user")
close(out)
}()

Expand All @@ -117,12 +116,12 @@ func ConflictingUsers() PhaseOnePipelineProcessor {
}

func conflictingUsers(result <-chan PipelineData) <-chan PipelineData {
fmt.Println("Starting to conflicting_user check pipeline")
log.Info("Starting to conflicting_user check pipeline")

out := make(chan PipelineData, 100)

go func() {
fmt.Println("Processing to conflicting_user users...")
log.Info("Processing to conflicting_user users...")

for res := range result {
select {
Expand All @@ -131,7 +130,7 @@ func conflictingUsers(result <-chan PipelineData) <-chan PipelineData {
res.Done <- nil
}
}
fmt.Println("Closing conflicting_user")
log.Info("Closing conflicting_user")
close(out)

}()
Expand All @@ -147,20 +146,20 @@ func OrgMembers() PhaseOnePipelineProcessor {
}

func orgMembers(result <-chan PipelineData) <-chan PipelineData {
fmt.Println("Starting to org_user check pipeline")
log.Info("Starting to org_user check pipeline")

out := make(chan PipelineData, 100)

go func() {
fmt.Println("Processing to check org_user association...")
log.Info("Processing to check org_user association...")
for res := range result {
select {
case out <- res:
case <-res.Ctx.Done():
res.Done <- nil
}
}
fmt.Println("Closing org_user association check")
log.Info("Closing org_user association check")
close(out)

}()
Expand All @@ -176,20 +175,20 @@ func AdminUsers() PhaseOnePipelineProcessor {
}

func adminUsers(result <-chan PipelineData) <-chan PipelineData {
fmt.Println("Starting org admin_users check ")
log.Info("Starting org admin_users check ")

out := make(chan PipelineData, 100)

go func() {
fmt.Println("Processing to to check admin_users...")
log.Info("Processing to to check admin_users...")
for res := range result {
select {
case out <- res:
case <-res.Ctx.Done():
res.Done <- nil
}
}
fmt.Println("Closing admin_users")
log.Info("Closing admin_users")
close(out)

}()
Expand All @@ -198,7 +197,7 @@ func adminUsers(result <-chan PipelineData) <-chan PipelineData {
}

func migrationPipeline(source <-chan PipelineData, pipes ...PhaseOnePipelineProcessor) {
fmt.Println("Pipeline started...")
log.Info("Pipeline started...")
status := make(chan string)
go func() {
for _, pipe := range pipes {
Expand Down Expand Up @@ -227,23 +226,18 @@ func SetupPhaseOnePipeline() PhaseOnePipleine {
}

func (p *PhaseOnePipleine) Run(result pipeline.Result) {
status := make(chan string)
go func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
done := make(chan error)
select {
case p.in <- PipelineData{Result: result, Done: done, Ctx: ctx}:
}
err := <-done
if err != nil {
MigrationError(err, Mig, ctx, result.Meta.MigrationID, result.Meta.ServerID)
log.Errorf("Phase one pipeline received error for migration %s: %s", result.Meta.MigrationID, err)
}
log.Println("received done")
status <- "Done"
}()
<-status
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
done := make(chan error)
select {
case p.in <- PipelineData{Result: result, Done: done, Ctx: ctx}:
}
err := <-done
if err != nil {
MigrationError(err, Mig, ctx, result.Meta.MigrationID, result.Meta.ServerID)
log.Errorf("Phase one pipeline received error for migration %s: %s", result.Meta.MigrationID, err)
}
log.Info("received done")
}

func MigrationError(err error, st storage.MigrationStorage, ctx context.Context, migrationId, serviceId string) {
Expand Down
Loading

0 comments on commit 8e7f95a

Please sign in to comment.