Skip to content

Commit

Permalink
Feature/migration pipe (#6647)
Browse files Browse the repository at this point in the history
* Pipeline demo

* Demo Pipeline

Signed-off-by: Pappu Kumar <pappu.kumar@progress.com>

* Based on PR Review

Signed-off-by: Pappu Kumar <pappu.kumar@progress.com>

* Models

Signed-off-by: Pappu Kumar <pappu.kumar@progress.com>

* Updateed select statement

Signed-off-by: Pappu Kumar <pappu.kumar@progress.com>

* Removed package-lock.json

Signed-off-by: Pappu Kumar <pappu.kumar@progress.com>

* Pipeline to process and close the go routines

Signed-off-by: Kallol Roy <karoy@progress.com>

* Feature/models (#6655)

* Pipeline demo

* Demo Pipeline

Signed-off-by: Pappu Kumar <pappu.kumar@progress.com>

* Based on PR Review

Signed-off-by: Pappu Kumar <pappu.kumar@progress.com>

* Models

Signed-off-by: Pappu Kumar <pappu.kumar@progress.com>

* Models done

* Package JSON change

Signed-off-by: Pappu Kumar <pappu.kumar@progress.com>

* Package JSON change

Signed-off-by: Pappu Kumar <pappu.kumar@progress.com>

* Package JSON change

Signed-off-by: Pappu Kumar <pappu.kumar@progress.com>

* Setup pipeline to process data

Signed-off-by: Kallol Roy <karoy@progress.com>

* Some comment corrections

Signed-off-by: Pappu Kumar <pappu.kumar@progress.com>

Co-authored-by: Kallol Roy <karoy@progress.com>
  • Loading branch information
2 people authored and vinay033 committed Feb 15, 2022
1 parent 1bb60ae commit 8a40d29
Show file tree
Hide file tree
Showing 4 changed files with 356 additions and 3 deletions.
5 changes: 4 additions & 1 deletion components/infra-proxy-service/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"github.com/chef/automate/components/infra-proxy-service/migrations/pipeline"
"io"
"os"
"path"
Expand Down Expand Up @@ -31,7 +32,7 @@ func (s *MigrationServer) UploadFile(stream service.MigrationDataService_UploadF
stream.SendAndClose(res)
return err
}
log.Info("Starting with migration phase with the upload filei for migration id: ", migrationId)
log.Info("Starting with migration phase with the upload file for migration id: ", migrationId)
_, err = s.service.Migration.StartMigration(ctx, migrationId, serverId)
fileData := bytes.Buffer{}
s.service.Migration.StartFileUpload(ctx, migrationId, serverId)
Expand Down Expand Up @@ -81,6 +82,8 @@ func (s *MigrationServer) UploadFile(stream service.MigrationDataService_UploadF
return err
}

pipelineResult := pipeline.Result{Meta: pipeline.Meta{ZipFile: fileName}}
s.phaseOnePipeline.Run(pipelineResult)
return nil
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package pipeline

import (
"context"
"fmt"
)

type PipelineData struct {
Result Result
Done chan<- error
Ctx context.Context
}

type PhaseOnePipleine struct {
in chan<- PipelineData
}

type PhaseOnePipelineProcessor func(<-chan PipelineData) <-chan PipelineData

// ParseOrg returns PhaseOnePipelineProcessor
func UnzipSrc() PhaseOnePipelineProcessor {
return func(result <-chan PipelineData) <-chan PipelineData {
return unzip(result)
}
}

func unzip(result <-chan PipelineData) <-chan PipelineData {
fmt.Println("Starting unzip pipeline")
out := make(chan PipelineData, 100)
go func() {

for res := range result {
res.Result.Meta.UnzipFolder = "backup"
select {
case out <- res:
case <-res.Ctx.Done():
res.Done <- nil
}
}
fmt.Println("Closing unzip")
close(out)
}()
return out
}

// ParseOrg returns PhaseOnePipelineProcessor
func ParseOrg() PhaseOnePipelineProcessor {
return func(result <-chan PipelineData) <-chan PipelineData {
return parseOrg(result)
}
}

func parseOrg(result <-chan PipelineData) <-chan PipelineData {
fmt.Println("Starting to parse_orgs pipeline")

out := make(chan PipelineData, 100)

go func() {
fmt.Println("Processing to parse orgs...")
for res := range result {
select {
case out <- res:
case <-res.Ctx.Done():
res.Done <- nil
}
fmt.Println("after write")
}
fmt.Println("CLosing parse_orgs pipeline")
close(out)
}()
return out
}

// ParseUser returns PhaseOnePipelineProcessor
func ParseUser() PhaseOnePipelineProcessor {
return func(result <-chan PipelineData) <-chan PipelineData {
return parseUser(result)
}
}

func parseUser(result <-chan PipelineData) <-chan PipelineData {
fmt.Println("Starting to parse_user pipeline")

out := make(chan PipelineData, 100)

go func() {
fmt.Println("Processing to parse_user...")
for res := range result {
select {
case out <- res:
case <-res.Ctx.Done():
res.Done <- nil
}
}
fmt.Println("Closing parse_user")
close(out)
}()

return out
}

// ConflictingUsers returns PhaseOnePipelineProcessor
func ConflictingUsers() PhaseOnePipelineProcessor {
return func(result <-chan PipelineData) <-chan PipelineData {
return conflictingUsers(result)
}
}

func conflictingUsers(result <-chan PipelineData) <-chan PipelineData {
fmt.Println("Starting to conflicting_user check pipeline")

out := make(chan PipelineData, 100)

go func() {
fmt.Println("Processing to conflicting_user users...")

for res := range result {
select {
case out <- res:
case <-res.Ctx.Done():
res.Done <- nil
}
}
fmt.Println("Closing conflicting_user")
close(out)

}()

return out
}

// OrgMembers returns PhaseOnePipelineProcessor
func OrgMembers() PhaseOnePipelineProcessor {
return func(result <-chan PipelineData) <-chan PipelineData {
return orgMembers(result)
}
}

func orgMembers(result <-chan PipelineData) <-chan PipelineData {
fmt.Println("Starting to org_user check pipeline")

out := make(chan PipelineData, 100)

go func() {
fmt.Println("Processing to check org_user association...")
for res := range result {
select {
case out <- res:
case <-res.Ctx.Done():
res.Done <- nil
}
}
fmt.Println("Closing org_user association check")
close(out)

}()

return out
}

// AdminUsers Return PhaseOnePipelineProcessor
func AdminUsers() PhaseOnePipelineProcessor {
return func(result <-chan PipelineData) <-chan PipelineData {
return adminUsers(result)
}
}

func adminUsers(result <-chan PipelineData) <-chan PipelineData {
fmt.Println("Starting org admin_users check ")

out := make(chan PipelineData, 100)

go func() {
fmt.Println("Processing to to check admin_users...")
for res := range result {
select {
case out <- res:
case <-res.Ctx.Done():
res.Done <- nil
}
}
fmt.Println("Closing admin_users")
close(out)

}()

return out
}

func migrationPipeline(source <-chan PipelineData, pipes ...PhaseOnePipelineProcessor) {
fmt.Println("Pipeline started...")

go func() {
for _, pipe := range pipes {
source = pipe(source)
}

for s := range source {
s.Done <- nil
}
}()
}

func SetupPhaseOnePipeline() PhaseOnePipleine {
c := make(chan PipelineData, 100)
migrationPipeline(c,
UnzipSrc(),
ParseOrg(),
ParseUser(),
ConflictingUsers(),
OrgMembers(),
AdminUsers(),
)
return PhaseOnePipleine{in: c}
}

func (p *PhaseOnePipleine) Run(result Result) {
go func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
done := make(chan error)
select {
case p.in <- PipelineData{Result: result, Done: done, Ctx: ctx}:
}
err := <-done
if err != nil {
fmt.Println("received error")
}
fmt.Println("received done")
}()
}
8 changes: 6 additions & 2 deletions components/infra-proxy-service/migrations/server.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package migrations

import (
"github.com/chef/automate/components/infra-proxy-service/migrations/pipeline"
"github.com/chef/automate/components/infra-proxy-service/service"
)

type MigrationServer struct {
service *service.Service
service *service.Service
phaseOnePipeline pipeline.PhaseOnePipleine
}

// NewMigrationServer returns an infra-proxy migration server
func NewMigrationServer(service *service.Service) *MigrationServer {
c := pipeline.SetupPhaseOnePipeline()
return &MigrationServer{
service: service,
service: service,
phaseOnePipeline: c,
}
}
115 changes: 115 additions & 0 deletions components/infra-proxy-service/pipeline/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package pipeline

type ActionOps int

const (
Insert ActionOps = 1 + iota
Skip
Delete
Update
)

//
type Result struct {
// Meta for Zip file info
Meta Meta `json:"meta"`

// ParsedResult for Orgs, Users
ParsedResult ParsedResult `json:"parsed_result"`
}

type Meta struct {
// StageResults holds Skipped instances
StageResults []StageResult `json:"stage_results"`

// ZipFile for zip file location
ZipFile string `json:"zip_file"`

// UnzipFolder for unzipped folder's location
UnzipFolder string `json:"unzip_folder"`
}

type StageResult struct {
StageName string `json:"stage_name"`
IsSuccess bool `json:"is_success"`
Failure error `json:"failure"`
}

type ParsedResult struct {
// Orgs array
Orgs []Org `json:"orgs"`

// Users array
Users []User `json:"users"`

// OrgsUsers for Orgs and Users associations
OrgsUsers []OrgsUsersAssociations `json:"orgs_users_associations"`
}

// OrgsUsersAssociations
type OrgsUsersAssociations struct {
// OrgName
OrgName Org `json:"org_name"`

// Users UserAssociation slice
Users []UserAssociation `json:"user_association"`
}
type UserAssociation struct {
// Username
Username string `json:"username"`

// IsAdmin
IsAdmin bool `json:"is_admin"`
}

type KeyDump struct {
ID string `json:"id"`
AuthzID string `json:"authz_id"`
Username string `json:"username"`
Email string `json:"email"`
PubkeyVersion int `json:"pubkey_version"`
PublicKey interface{} `json:"public_key"`
SerializedObject string `json:"serialized_object"`
LastUpdatedBy string `json:"last_updated_by"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
ExternalAuthenticationUID interface{} `json:"external_authentication_uid"`
RecoveryAuthenticationEnabled interface{} `json:"recovery_authentication_enabled"`
Admin bool `json:"admin"`
HashedPassword interface{} `json:"hashed_password"`
Salt interface{} `json:"salt"`
HashType interface{} `json:"hash_type"`
}

type Org struct {

// Org Name
Name string `json:"name"`

// FullName
FullName string `json:"full_name"`

// ActionOps for Insert Skip Update and Delete
ActionOps ActionOps `json:"guid"`
}

type User struct {
Username string `json:"username"`
Email string `json:"email"`
DisplayName string `json:"display_name"`
FirstName string `json:"first_name"`
LastName string `json:"last_name"`
MiddleName string `json:"middle_name"`

// AutomateUsername is ldap username
AutomateUsername string `json:"automate_username"`

// Connector ldap user
Connector string `json:"connector"`

// IsConflicting for user's existance in db
IsConflicting bool `json:"is_conflicting"`

// IsAdmin (user is admin or not)
IsAdmin bool `json:"is_admin"`
}

0 comments on commit 8a40d29

Please sign in to comment.