From 8eb10e2c76d19fd3bb0ee05ab183a1796f77feaf Mon Sep 17 00:00:00 2001 From: Dave Augustus <95467821+dave-augustus@users.noreply.github.com> Date: Fri, 4 Feb 2022 22:48:19 +0530 Subject: [PATCH] Feature/migration pipe (#6647) * Pipeline demo * Demo Pipeline Signed-off-by: Pappu Kumar * Based on PR Review Signed-off-by: Pappu Kumar * Models Signed-off-by: Pappu Kumar * Updateed select statement Signed-off-by: Pappu Kumar * Removed package-lock.json Signed-off-by: Pappu Kumar * Pipeline to process and close the go routines Signed-off-by: Kallol Roy * Feature/models (#6655) * Pipeline demo * Demo Pipeline Signed-off-by: Pappu Kumar * Based on PR Review Signed-off-by: Pappu Kumar * Models Signed-off-by: Pappu Kumar * Models done * Package JSON change Signed-off-by: Pappu Kumar * Package JSON change Signed-off-by: Pappu Kumar * Package JSON change Signed-off-by: Pappu Kumar * Setup pipeline to process data Signed-off-by: Kallol Roy * Some comment corrections Signed-off-by: Pappu Kumar Co-authored-by: Kallol Roy --- .../migrations/migrations.go | 5 +- .../migrations/pipeline/phaseonemigration.go | 231 ++++++++++++++++++ .../infra-proxy-service/migrations/server.go | 8 +- .../infra-proxy-service/pipeline/models.go | 115 +++++++++ 4 files changed, 356 insertions(+), 3 deletions(-) create mode 100644 components/infra-proxy-service/migrations/pipeline/phaseonemigration.go create mode 100644 components/infra-proxy-service/pipeline/models.go diff --git a/components/infra-proxy-service/migrations/migrations.go b/components/infra-proxy-service/migrations/migrations.go index 270074d52c7..eec78ac17ce 100644 --- a/components/infra-proxy-service/migrations/migrations.go +++ b/components/infra-proxy-service/migrations/migrations.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "github.com/chef/automate/components/infra-proxy-service/migrations/pipeline" "io" "os" "path" @@ -31,7 +32,7 @@ func (s *MigrationServer) UploadFile(stream service.MigrationDataService_UploadF stream.SendAndClose(res) return err } - log.Info("Starting with migration phase with the upload filei for migration id: ", migrationId) + log.Info("Starting with migration phase with the upload file for migration id: ", migrationId) _, err = s.service.Migration.StartMigration(ctx, migrationId, serverId) fileData := bytes.Buffer{} s.service.Migration.StartFileUpload(ctx, migrationId, serverId) @@ -81,6 +82,8 @@ func (s *MigrationServer) UploadFile(stream service.MigrationDataService_UploadF return err } + pipelineResult := pipeline.Result{Meta: pipeline.Meta{ZipFile: fileName}} + s.phaseOnePipeline.Run(pipelineResult) return nil } diff --git a/components/infra-proxy-service/migrations/pipeline/phaseonemigration.go b/components/infra-proxy-service/migrations/pipeline/phaseonemigration.go new file mode 100644 index 00000000000..a79089dd8f7 --- /dev/null +++ b/components/infra-proxy-service/migrations/pipeline/phaseonemigration.go @@ -0,0 +1,231 @@ +package pipeline + +import ( + "context" + "fmt" +) + +type PipelineData struct { + Result Result + Done chan<- error + Ctx context.Context +} + +type PhaseOnePipleine struct { + in chan<- PipelineData +} + +type PhaseOnePipelineProcessor func(<-chan PipelineData) <-chan PipelineData + +// ParseOrg returns PhaseOnePipelineProcessor +func UnzipSrc() PhaseOnePipelineProcessor { + return func(result <-chan PipelineData) <-chan PipelineData { + return unzip(result) + } +} + +func unzip(result <-chan PipelineData) <-chan PipelineData { + fmt.Println("Starting unzip pipeline") + out := make(chan PipelineData, 100) + go func() { + + for res := range result { + res.Result.Meta.UnzipFolder = "backup" + select { + case out <- res: + case <-res.Ctx.Done(): + res.Done <- nil + } + } + fmt.Println("Closing unzip") + close(out) + }() + return out +} + +// ParseOrg returns PhaseOnePipelineProcessor +func ParseOrg() PhaseOnePipelineProcessor { + return func(result <-chan PipelineData) <-chan PipelineData { + return parseOrg(result) + } +} + +func parseOrg(result <-chan PipelineData) <-chan PipelineData { + fmt.Println("Starting to parse_orgs pipeline") + + out := make(chan PipelineData, 100) + + go func() { + fmt.Println("Processing to parse orgs...") + for res := range result { + select { + case out <- res: + case <-res.Ctx.Done(): + res.Done <- nil + } + fmt.Println("after write") + } + fmt.Println("CLosing parse_orgs pipeline") + close(out) + }() + return out +} + +// ParseUser returns PhaseOnePipelineProcessor +func ParseUser() PhaseOnePipelineProcessor { + return func(result <-chan PipelineData) <-chan PipelineData { + return parseUser(result) + } +} + +func parseUser(result <-chan PipelineData) <-chan PipelineData { + fmt.Println("Starting to parse_user pipeline") + + out := make(chan PipelineData, 100) + + go func() { + fmt.Println("Processing to parse_user...") + for res := range result { + select { + case out <- res: + case <-res.Ctx.Done(): + res.Done <- nil + } + } + fmt.Println("Closing parse_user") + close(out) + }() + + return out +} + +// ConflictingUsers returns PhaseOnePipelineProcessor +func ConflictingUsers() PhaseOnePipelineProcessor { + return func(result <-chan PipelineData) <-chan PipelineData { + return conflictingUsers(result) + } +} + +func conflictingUsers(result <-chan PipelineData) <-chan PipelineData { + fmt.Println("Starting to conflicting_user check pipeline") + + out := make(chan PipelineData, 100) + + go func() { + fmt.Println("Processing to conflicting_user users...") + + for res := range result { + select { + case out <- res: + case <-res.Ctx.Done(): + res.Done <- nil + } + } + fmt.Println("Closing conflicting_user") + close(out) + + }() + + return out +} + +// OrgMembers returns PhaseOnePipelineProcessor +func OrgMembers() PhaseOnePipelineProcessor { + return func(result <-chan PipelineData) <-chan PipelineData { + return orgMembers(result) + } +} + +func orgMembers(result <-chan PipelineData) <-chan PipelineData { + fmt.Println("Starting to org_user check pipeline") + + out := make(chan PipelineData, 100) + + go func() { + fmt.Println("Processing to check org_user association...") + for res := range result { + select { + case out <- res: + case <-res.Ctx.Done(): + res.Done <- nil + } + } + fmt.Println("Closing org_user association check") + close(out) + + }() + + return out +} + +// AdminUsers Return PhaseOnePipelineProcessor +func AdminUsers() PhaseOnePipelineProcessor { + return func(result <-chan PipelineData) <-chan PipelineData { + return adminUsers(result) + } +} + +func adminUsers(result <-chan PipelineData) <-chan PipelineData { + fmt.Println("Starting org admin_users check ") + + out := make(chan PipelineData, 100) + + go func() { + fmt.Println("Processing to to check admin_users...") + for res := range result { + select { + case out <- res: + case <-res.Ctx.Done(): + res.Done <- nil + } + } + fmt.Println("Closing admin_users") + close(out) + + }() + + return out +} + +func migrationPipeline(source <-chan PipelineData, pipes ...PhaseOnePipelineProcessor) { + fmt.Println("Pipeline started...") + + go func() { + for _, pipe := range pipes { + source = pipe(source) + } + + for s := range source { + s.Done <- nil + } + }() +} + +func SetupPhaseOnePipeline() PhaseOnePipleine { + c := make(chan PipelineData, 100) + migrationPipeline(c, + UnzipSrc(), + ParseOrg(), + ParseUser(), + ConflictingUsers(), + OrgMembers(), + AdminUsers(), + ) + return PhaseOnePipleine{in: c} +} + +func (p *PhaseOnePipleine) Run(result Result) { + go func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + done := make(chan error) + select { + case p.in <- PipelineData{Result: result, Done: done, Ctx: ctx}: + } + err := <-done + if err != nil { + fmt.Println("received error") + } + fmt.Println("received done") + }() +} diff --git a/components/infra-proxy-service/migrations/server.go b/components/infra-proxy-service/migrations/server.go index 6034bfd1545..adcfbde4e34 100644 --- a/components/infra-proxy-service/migrations/server.go +++ b/components/infra-proxy-service/migrations/server.go @@ -1,16 +1,20 @@ package migrations import ( + "github.com/chef/automate/components/infra-proxy-service/migrations/pipeline" "github.com/chef/automate/components/infra-proxy-service/service" ) type MigrationServer struct { - service *service.Service + service *service.Service + phaseOnePipeline pipeline.PhaseOnePipleine } // NewMigrationServer returns an infra-proxy migration server func NewMigrationServer(service *service.Service) *MigrationServer { + c := pipeline.SetupPhaseOnePipeline() return &MigrationServer{ - service: service, + service: service, + phaseOnePipeline: c, } } diff --git a/components/infra-proxy-service/pipeline/models.go b/components/infra-proxy-service/pipeline/models.go new file mode 100644 index 00000000000..c76d3f5f9c6 --- /dev/null +++ b/components/infra-proxy-service/pipeline/models.go @@ -0,0 +1,115 @@ +package pipeline + +type ActionOps int + +const ( + Insert ActionOps = 1 + iota + Skip + Delete + Update +) + +// +type Result struct { + // Meta for Zip file info + Meta Meta `json:"meta"` + + // ParsedResult for Orgs, Users + ParsedResult ParsedResult `json:"parsed_result"` +} + +type Meta struct { + // StageResults holds Skipped instances + StageResults []StageResult `json:"stage_results"` + + // ZipFile for zip file location + ZipFile string `json:"zip_file"` + + // UnzipFolder for unzipped folder's location + UnzipFolder string `json:"unzip_folder"` +} + +type StageResult struct { + StageName string `json:"stage_name"` + IsSuccess bool `json:"is_success"` + Failure error `json:"failure"` +} + +type ParsedResult struct { + // Orgs array + Orgs []Org `json:"orgs"` + + // Users array + Users []User `json:"users"` + + // OrgsUsers for Orgs and Users associations + OrgsUsers []OrgsUsersAssociations `json:"orgs_users_associations"` +} + +// OrgsUsersAssociations +type OrgsUsersAssociations struct { + // OrgName + OrgName Org `json:"org_name"` + + // Users UserAssociation slice + Users []UserAssociation `json:"user_association"` +} +type UserAssociation struct { + // Username + Username string `json:"username"` + + // IsAdmin + IsAdmin bool `json:"is_admin"` +} + +type KeyDump struct { + ID string `json:"id"` + AuthzID string `json:"authz_id"` + Username string `json:"username"` + Email string `json:"email"` + PubkeyVersion int `json:"pubkey_version"` + PublicKey interface{} `json:"public_key"` + SerializedObject string `json:"serialized_object"` + LastUpdatedBy string `json:"last_updated_by"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` + ExternalAuthenticationUID interface{} `json:"external_authentication_uid"` + RecoveryAuthenticationEnabled interface{} `json:"recovery_authentication_enabled"` + Admin bool `json:"admin"` + HashedPassword interface{} `json:"hashed_password"` + Salt interface{} `json:"salt"` + HashType interface{} `json:"hash_type"` +} + +type Org struct { + + // Org Name + Name string `json:"name"` + + // FullName + FullName string `json:"full_name"` + + // ActionOps for Insert Skip Update and Delete + ActionOps ActionOps `json:"guid"` +} + +type User struct { + Username string `json:"username"` + Email string `json:"email"` + DisplayName string `json:"display_name"` + FirstName string `json:"first_name"` + LastName string `json:"last_name"` + MiddleName string `json:"middle_name"` + + // AutomateUsername is ldap username + AutomateUsername string `json:"automate_username"` + + // Connector ldap user + Connector string `json:"connector"` + + // IsConflicting for user's existance in db + IsConflicting bool `json:"is_conflicting"` + + // IsAdmin (user is admin or not) + IsAdmin bool `json:"is_admin"` +}