diff --git a/components/infra-proxy-service/migrations/migrations.go b/components/infra-proxy-service/migrations/migrations.go index c0e2bea6102..cd7a29ac475 100644 --- a/components/infra-proxy-service/migrations/migrations.go +++ b/components/infra-proxy-service/migrations/migrations.go @@ -8,8 +8,6 @@ import ( "os" "path" - "github.com/chef/automate/components/infra-proxy-service/pipeline" - "github.com/chef/automate/api/interservice/infra_proxy/migrations/request" "github.com/chef/automate/api/interservice/infra_proxy/migrations/response" "github.com/chef/automate/api/interservice/infra_proxy/migrations/service" @@ -42,11 +40,13 @@ func (s *MigrationServer) UploadFile(stream service.MigrationDataService_UploadF _, err = s.service.Migration.StartMigration(ctx, migrationId, serverId) if err != nil { log.Errorf("Unable to insert the migration status Start Migration for migration id : %s", migrationId) + return err } fileData := bytes.Buffer{} _, err = s.service.Migration.StartFileUpload(ctx, migrationId, serverId) if err != nil { log.Errorf("Unable to insert the migration status Start File upload for migration id : %s", migrationId) + return err } for { req, err := stream.Recv() @@ -104,7 +104,7 @@ func (s *MigrationServer) UploadFile(stream service.MigrationDataService_UploadF } pipelineResult := pipeline_model.Result{Meta: pipeline_model.Meta{ZipFile: fileName}} - s.phaseOnePipeline.Run(pipelineResult) + go s.phaseOnePipeline.Run(pipelineResult) return nil } @@ -332,7 +332,7 @@ func (s *MigrationServer) ConfirmPreview(ctx context.Context, req *request.Confi } // call pipeline function to trigger the phase 2 pipeline - s.phaseTwoPipeline.Run(migrationStage.StagedData) + go s.phaseTwoPipeline.Run(migrationStage.StagedData) return &response.ConfirmPreview{ MigrationId: req.MigrationId, diff --git a/components/infra-proxy-service/migrations/pipeline/phaseonemigration.go b/components/infra-proxy-service/migrations/pipeline/phaseonemigration.go index eeb5ca405ea..8f1e1854bcd 100644 --- a/components/infra-proxy-service/migrations/pipeline/phaseonemigration.go +++ b/components/infra-proxy-service/migrations/pipeline/phaseonemigration.go @@ -2,7 +2,6 @@ package pipeline import ( "context" - "fmt" log "github.com/sirupsen/logrus" @@ -35,7 +34,7 @@ func UnzipSrc() PhaseOnePipelineProcessor { } func unzip(result <-chan PipelineData) <-chan PipelineData { - fmt.Println("Starting unzip pipeline") + log.Info("Starting unzip pipeline") out := make(chan PipelineData, 100) go func() { @@ -47,7 +46,7 @@ func unzip(result <-chan PipelineData) <-chan PipelineData { res.Done <- nil } } - fmt.Println("Closing unzip") + log.Info("Closing unzip") close(out) }() return out @@ -61,21 +60,21 @@ func ParseOrg() PhaseOnePipelineProcessor { } func parseOrg(result <-chan PipelineData) <-chan PipelineData { - fmt.Println("Starting to parse_orgs pipeline") + log.Info("Starting to parse_orgs pipeline") out := make(chan PipelineData, 100) go func() { - fmt.Println("Processing to parse orgs...") + log.Info("Processing to parse orgs...") for res := range result { select { case out <- res: case <-res.Ctx.Done(): res.Done <- nil } - fmt.Println("after write") + log.Info("after write") } - fmt.Println("CLosing parse_orgs pipeline") + log.Info("CLosing parse_orgs pipeline") close(out) }() return out @@ -89,12 +88,12 @@ func ParseUser() PhaseOnePipelineProcessor { } func parseUser(result <-chan PipelineData) <-chan PipelineData { - fmt.Println("Starting to parse_user pipeline") + log.Info("Starting to parse_user pipeline") out := make(chan PipelineData, 100) go func() { - fmt.Println("Processing to parse_user...") + log.Info("Processing to parse_user...") for res := range result { select { case out <- res: @@ -102,7 +101,7 @@ func parseUser(result <-chan PipelineData) <-chan PipelineData { res.Done <- nil } } - fmt.Println("Closing parse_user") + log.Info("Closing parse_user") close(out) }() @@ -117,12 +116,12 @@ func ConflictingUsers() PhaseOnePipelineProcessor { } func conflictingUsers(result <-chan PipelineData) <-chan PipelineData { - fmt.Println("Starting to conflicting_user check pipeline") + log.Info("Starting to conflicting_user check pipeline") out := make(chan PipelineData, 100) go func() { - fmt.Println("Processing to conflicting_user users...") + log.Info("Processing to conflicting_user users...") for res := range result { select { @@ -131,7 +130,7 @@ func conflictingUsers(result <-chan PipelineData) <-chan PipelineData { res.Done <- nil } } - fmt.Println("Closing conflicting_user") + log.Info("Closing conflicting_user") close(out) }() @@ -147,12 +146,12 @@ func OrgMembers() PhaseOnePipelineProcessor { } func orgMembers(result <-chan PipelineData) <-chan PipelineData { - fmt.Println("Starting to org_user check pipeline") + log.Info("Starting to org_user check pipeline") out := make(chan PipelineData, 100) go func() { - fmt.Println("Processing to check org_user association...") + log.Info("Processing to check org_user association...") for res := range result { select { case out <- res: @@ -160,7 +159,7 @@ func orgMembers(result <-chan PipelineData) <-chan PipelineData { res.Done <- nil } } - fmt.Println("Closing org_user association check") + log.Info("Closing org_user association check") close(out) }() @@ -176,12 +175,12 @@ func AdminUsers() PhaseOnePipelineProcessor { } func adminUsers(result <-chan PipelineData) <-chan PipelineData { - fmt.Println("Starting org admin_users check ") + log.Info("Starting org admin_users check ") out := make(chan PipelineData, 100) go func() { - fmt.Println("Processing to to check admin_users...") + log.Info("Processing to to check admin_users...") for res := range result { select { case out <- res: @@ -189,7 +188,7 @@ func adminUsers(result <-chan PipelineData) <-chan PipelineData { res.Done <- nil } } - fmt.Println("Closing admin_users") + log.Info("Closing admin_users") close(out) }() @@ -198,7 +197,7 @@ func adminUsers(result <-chan PipelineData) <-chan PipelineData { } func migrationPipeline(source <-chan PipelineData, pipes ...PhaseOnePipelineProcessor) { - fmt.Println("Pipeline started...") + log.Info("Pipeline started...") status := make(chan string) go func() { for _, pipe := range pipes { @@ -227,23 +226,18 @@ func SetupPhaseOnePipeline() PhaseOnePipleine { } func (p *PhaseOnePipleine) Run(result pipeline.Result) { - status := make(chan string) - go func() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - done := make(chan error) - select { - case p.in <- PipelineData{Result: result, Done: done, Ctx: ctx}: - } - err := <-done - if err != nil { - MigrationError(err, Mig, ctx, result.Meta.MigrationID, result.Meta.ServerID) - log.Errorf("Phase one pipeline received error for migration %s: %s", result.Meta.MigrationID, err) - } - log.Println("received done") - status <- "Done" - }() - <-status + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + done := make(chan error) + select { + case p.in <- PipelineData{Result: result, Done: done, Ctx: ctx}: + } + err := <-done + if err != nil { + MigrationError(err, Mig, ctx, result.Meta.MigrationID, result.Meta.ServerID) + log.Errorf("Phase one pipeline received error for migration %s: %s", result.Meta.MigrationID, err) + } + log.Info("received done") } func MigrationError(err error, st storage.MigrationStorage, ctx context.Context, migrationId, serviceId string) { diff --git a/components/infra-proxy-service/migrations/pipeline/phasetwomigration.go b/components/infra-proxy-service/migrations/pipeline/phasetwomigration.go index 2b1512d8e67..3ae1ad42aa9 100644 --- a/components/infra-proxy-service/migrations/pipeline/phasetwomigration.go +++ b/components/infra-proxy-service/migrations/pipeline/phasetwomigration.go @@ -2,7 +2,6 @@ package pipeline import ( "context" - "fmt" log "github.com/sirupsen/logrus" @@ -23,19 +22,19 @@ func PopulateOrgs() PhaseTwoPipelineProcessor { } func populateOrgs(result <-chan PipelineData) <-chan PipelineData { - fmt.Println("Starting populateOrgs routine") + log.Info("Starting populateOrgs routine") out := make(chan PipelineData, 100) go func() { for res := range result { - fmt.Println("Processing to populateOrgs...") + log.Info("Processing to populateOrgs...") select { case out <- res: case <-res.Ctx.Done(): res.Done <- nil } } - fmt.Println("Closing populateOrgs routine") + log.Info("Closing populateOrgs routine") close(out) }() return out @@ -49,19 +48,19 @@ func CreateProject() PhaseTwoPipelineProcessor { } func createProject(result <-chan PipelineData) <-chan PipelineData { - fmt.Println("Starting CreateProject routine") + log.Info("Starting CreateProject routine") out := make(chan PipelineData, 100) go func() { for res := range result { - fmt.Println("Processing to createProject...") + log.Info("Processing to createProject...") select { case out <- res: case <-res.Ctx.Done(): res.Done <- nil } } - fmt.Println("Closing CreateProject routine") + log.Info("Closing CreateProject routine") close(out) }() return out @@ -75,19 +74,19 @@ func PopulateUsers() PhaseTwoPipelineProcessor { } func populateUsers(result <-chan PipelineData) <-chan PipelineData { - fmt.Println("Starting PopulateUsers routine") + log.Info("Starting PopulateUsers routine") out := make(chan PipelineData, 100) go func() { for res := range result { - fmt.Println("Processing to populateUsers...") + log.Info("Processing to populateUsers...") select { case out <- res: case <-res.Ctx.Done(): res.Done <- nil } } - fmt.Println("Closing PopulateUsers routine") + log.Info("Closing PopulateUsers routine") close(out) }() return out @@ -101,19 +100,19 @@ func PopulateORGUser() PhaseTwoPipelineProcessor { } func populateORGUser(result <-chan PipelineData) <-chan PipelineData { - fmt.Println("Starting PopulateORGUser routine") + log.Info("Starting PopulateORGUser routine") out := make(chan PipelineData, 100) go func() { for res := range result { - fmt.Println("Processing to populateORGUser...") + log.Info("Processing to populateORGUser...") select { case out <- res: case <-res.Ctx.Done(): res.Done <- nil } } - fmt.Println("Closing PopulateORGUser routine") + log.Info("Closing PopulateORGUser routine") close(out) }() return out @@ -127,26 +126,26 @@ func PopulateMembersPolicy() PhaseTwoPipelineProcessor { } func populateMembersPolicy(result <-chan PipelineData) <-chan PipelineData { - fmt.Println("Starting PopulateMembersPolicy routine") + log.Info("Starting PopulateMembersPolicy routine") out := make(chan PipelineData, 100) go func() { for res := range result { - fmt.Println("Processing to populateMembersPolicy...") + log.Info("Processing to populateMembersPolicy...") select { case out <- res: case <-res.Ctx.Done(): res.Done <- nil } } - fmt.Println("Closing PopulateMembersPolicy routine") + log.Info("Closing PopulateMembersPolicy routine") close(out) }() return out } func migrationTwoPipeline(source <-chan PipelineData, pipes ...PhaseTwoPipelineProcessor) { - fmt.Println("Pipeline started...") + log.Info("Pipeline started...") status := make(chan string) go func() { for _, pipe := range pipes { @@ -174,22 +173,18 @@ func SetupPhaseTwoPipeline() PhaseTwoPipleine { } func (p *PhaseTwoPipleine) Run(result pipeline.Result) { - status := make(chan string) - go func() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - done := make(chan error) - select { - case p.in <- PipelineData{Result: result, Done: done, Ctx: ctx}: - } - err := <-done - if err != nil { - MigrationError(err, Mig, ctx, result.Meta.MigrationID, result.Meta.ServerID) - log.Errorf("Phase two pipeline received error for migration %s: %s", result.Meta.MigrationID, err) - } - MigrationSuccess(Mig, ctx, result.Meta.MigrationID, result.Meta.ServerID) - log.Println("received done") - status <- "Done" - }() - <-status + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + done := make(chan error) + select { + case p.in <- PipelineData{Result: result, Done: done, Ctx: ctx}: + } + err := <-done + if err != nil { + MigrationError(err, Mig, ctx, result.Meta.MigrationID, result.Meta.ServerID) + log.Errorf("Phase two pipeline received error for migration %s: %s", result.Meta.MigrationID, err) + } + MigrationSuccess(Mig, ctx, result.Meta.MigrationID, result.Meta.ServerID) + log.Info("received done") + }