Feature/migration pipe #6647

Merged
merged 10 commits into from Feb 4, 2022
5 changes: 4 additions & 1 deletion components/infra-proxy-service/migrations/migrations.go
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"github.com/chef/automate/components/infra-proxy-service/migrations/pipeline"
"io"
"os"
"path"
@@ -31,7 +32,7 @@ func (s *MigrationServer) UploadFile(stream service.MigrationDataService_UploadF
stream.SendAndClose(res)
return err
}
log.Info("Starting with migration phase with the upload filei for migration id: ", migrationId)
log.Info("Starting with migration phase with the upload file for migration id: ", migrationId)
_, err = s.service.Migration.StartMigration(ctx, migrationId, serverId)
fileData := bytes.Buffer{}
s.service.Migration.StartFileUpload(ctx, migrationId, serverId)
@@ -81,6 +82,8 @@ func (s *MigrationServer) UploadFile(stream service.MigrationDataService_UploadF
return err
}

pipelineResult := pipeline.Result{Meta: pipeline.Meta{ZipFile: fileName}}
s.phaseOnePipeline.Run(pipelineResult)
return nil
}

@@ -0,0 +1,231 @@
package pipeline

import (
"context"
"fmt"
)

type PipelineData struct {
Result Result
Done chan<- error
Ctx context.Context
}

type PhaseOnePipleine struct {
in chan<- PipelineData
}

type PhaseOnePipelineProcessor func(<-chan PipelineData) <-chan PipelineData

// UnzipSrc returns PhaseOnePipelineProcessor
func UnzipSrc() PhaseOnePipelineProcessor {
return func(result <-chan PipelineData) <-chan PipelineData {
return unzip(result)
}
}

func unzip(result <-chan PipelineData) <-chan PipelineData {
fmt.Println("Starting unzip pipeline")
out := make(chan PipelineData, 100)
go func() {

for res := range result {
res.Result.Meta.UnzipFolder = "backup"
select {
case out <- res:
case <-res.Ctx.Done():
res.Done <- nil
}
}
fmt.Println("Closing unzip")
close(out)
}()
return out
}

// ParseOrg returns PhaseOnePipelineProcessor
func ParseOrg() PhaseOnePipelineProcessor {
return func(result <-chan PipelineData) <-chan PipelineData {
return parseOrg(result)
}
}

func parseOrg(result <-chan PipelineData) <-chan PipelineData {
fmt.Println("Starting to parse_orgs pipeline")

out := make(chan PipelineData, 100)

go func() {
fmt.Println("Processing to parse orgs...")
for res := range result {
select {
case out <- res:
case <-res.Ctx.Done():
res.Done <- nil
}
fmt.Println("after write")
}
fmt.Println("CLosing parse_orgs pipeline")
close(out)
}()
return out
}

// ParseUser returns PhaseOnePipelineProcessor
func ParseUser() PhaseOnePipelineProcessor {
return func(result <-chan PipelineData) <-chan PipelineData {
return parseUser(result)
}
}

func parseUser(result <-chan PipelineData) <-chan PipelineData {
fmt.Println("Starting to parse_user pipeline")

out := make(chan PipelineData, 100)

go func() {
fmt.Println("Processing to parse_user...")
for res := range result {
select {
case out <- res:
case <-res.Ctx.Done():
res.Done <- nil
}
}
fmt.Println("Closing parse_user")
close(out)
}()

return out
}

// ConflictingUsers returns PhaseOnePipelineProcessor
func ConflictingUsers() PhaseOnePipelineProcessor {
return func(result <-chan PipelineData) <-chan PipelineData {
return conflictingUsers(result)
}
}

func conflictingUsers(result <-chan PipelineData) <-chan PipelineData {
fmt.Println("Starting to conflicting_user check pipeline")

out := make(chan PipelineData, 100)

go func() {
fmt.Println("Processing to conflicting_user users...")

for res := range result {
select {
case out <- res:
case <-res.Ctx.Done():
res.Done <- nil
}
}
fmt.Println("Closing conflicting_user")
close(out)

}()

return out
}

// OrgMembers returns PhaseOnePipelineProcessor
func OrgMembers() PhaseOnePipelineProcessor {
return func(result <-chan PipelineData) <-chan PipelineData {
return orgMembers(result)
}
}

func orgMembers(result <-chan PipelineData) <-chan PipelineData {
fmt.Println("Starting to org_user check pipeline")

out := make(chan PipelineData, 100)

go func() {
fmt.Println("Processing to check org_user association...")
for res := range result {
select {
case out <- res:
case <-res.Ctx.Done():
res.Done <- nil
}
}
fmt.Println("Closing org_user association check")
close(out)

}()

return out
}

// AdminUsers returns PhaseOnePipelineProcessor
func AdminUsers() PhaseOnePipelineProcessor {
return func(result <-chan PipelineData) <-chan PipelineData {
return adminUsers(result)
}
}

func adminUsers(result <-chan PipelineData) <-chan PipelineData {
fmt.Println("Starting org admin_users check ")

out := make(chan PipelineData, 100)

go func() {
fmt.Println("Processing to to check admin_users...")
for res := range result {
select {
case out <- res:
case <-res.Ctx.Done():
res.Done <- nil
}
}
fmt.Println("Closing admin_users")
close(out)

}()

return out
}

func migrationPipeline(source <-chan PipelineData, pipes ...PhaseOnePipelineProcessor) {
fmt.Println("Pipeline started...")

go func() {
for _, pipe := range pipes {
source = pipe(source)
}

for s := range source {
s.Done <- nil
}
}()
}

func SetupPhaseOnePipeline() PhaseOnePipleine {
c := make(chan PipelineData, 100)
migrationPipeline(c,
UnzipSrc(),
ParseOrg(),
ParseUser(),
ConflictingUsers(),
OrgMembers(),
AdminUsers(),
)
return PhaseOnePipleine{in: c}
}

func (p *PhaseOnePipleine) Run(result Result) {
go func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
done := make(chan error)
select {
case p.in <- PipelineData{Result: result, Done: done, Ctx: ctx}:
}
err := <-done
if err != nil {
fmt.Println("received error")
}
fmt.Println("received done")
}()
}
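
For reference, the stage-chaining pattern above can be exercised on its own. The following is a minimal, self-contained sketch (illustration only, not part of this PR): the item type, the stage names, and the "backup.zip" payload are invented stand-ins for PipelineData, the phase-one processors, and the uploaded zip file.

package main

import "fmt"

// item mirrors PipelineData in miniature: a payload plus a per-item done channel.
type item struct {
	msg  string
	done chan error
}

type stage func(<-chan item) <-chan item

// makeStage returns a processor shaped like unzip/parseOrg/etc.: it reads from
// its input channel, does its work, and forwards each item downstream.
func makeStage(name string) stage {
	return func(in <-chan item) <-chan item {
		out := make(chan item, 100)
		go func() {
			for it := range in {
				fmt.Println(name, "processed", it.msg)
				out <- it
			}
			close(out)
		}()
		return out
	}
}

func main() {
	src := make(chan item, 100)
	var ch <-chan item = src

	// Compose the stages the same way migrationPipeline does: each stage's
	// output channel becomes the next stage's input channel.
	for _, s := range []stage{makeStage("unzip"), makeStage("parse_orgs"), makeStage("parse_users")} {
		ch = s(ch)
	}

	// Drain the end of the chain and signal completion per item, like the
	// final goroutine in migrationPipeline.
	go func() {
		for it := range ch {
			it.done <- nil
		}
	}()

	done := make(chan error)
	src <- item{msg: "backup.zip", done: done}
	<-done
	fmt.Println("pipeline finished")
}
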
8 changes: 6 additions & 2 deletions components/infra-proxy-service/migrations/server.go
@@ -1,16 +1,20 @@
package migrations

import (
"github.com/chef/automate/components/infra-proxy-service/migrations/pipeline"
"github.com/chef/automate/components/infra-proxy-service/service"
)

type MigrationServer struct {
service *service.Service
service *service.Service
phaseOnePipeline pipeline.PhaseOnePipleine
}

// NewMigrationServer returns an infra-proxy migration server
func NewMigrationServer(service *service.Service) *MigrationServer {
c := pipeline.SetupPhaseOnePipeline()
return &MigrationServer{
service: service,
service: service,
phaseOnePipeline: c,
}
}
115 changes: 115 additions & 0 deletions components/infra-proxy-service/pipeline/models.go
@@ -0,0 +1,115 @@
package pipeline

type ActionOps int

const (
Insert ActionOps = 1 + iota
Skip
Delete
Update
)

// Result holds the phase one pipeline output: zip file metadata and the parsed orgs and users
type Result struct {
// Meta for Zip file info
Meta Meta `json:"meta"`

// ParsedResult for Orgs, Users
ParsedResult ParsedResult `json:"parsed_result"`
}

type Meta struct {
// StageResults holds Skipped instances
StageResults []StageResult `json:"stage_results"`

// ZipFile for zip file location
ZipFile string `json:"zip_file"`

// UnzipFolder for unzipped folder's location
UnzipFolder string `json:"unzip_folder"`
}

type StageResult struct {
StageName string `json:"stage_name"`
IsSuccess bool `json:"is_success"`
Failure error `json:"failure"`
}

type ParsedResult struct {
// Orgs array
Orgs []Org `json:"orgs"`

// Users array
Users []User `json:"users"`

// OrgsUsers for Orgs and Users associations
OrgsUsers []OrgsUsersAssociations `json:"orgs_users_associations"`
}

// OrgsUsersAssociations maps an org to its associated users
type OrgsUsersAssociations struct {
// OrgName
OrgName Org `json:"org_name"`

// Users UserAssociation slice
Users []UserAssociation `json:"user_association"`
}
type UserAssociation struct {
// Username
Username string `json:"username"`

// IsAdmin
IsAdmin bool `json:"is_admin"`
}

type KeyDump struct {
ID string `json:"id"`
AuthzID string `json:"authz_id"`
Username string `json:"username"`
Email string `json:"email"`
PubkeyVersion int `json:"pubkey_version"`
PublicKey interface{} `json:"public_key"`
SerializedObject string `json:"serialized_object"`
LastUpdatedBy string `json:"last_updated_by"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
ExternalAuthenticationUID interface{} `json:"external_authentication_uid"`
RecoveryAuthenticationEnabled interface{} `json:"recovery_authentication_enabled"`
Admin bool `json:"admin"`
HashedPassword interface{} `json:"hashed_password"`
Salt interface{} `json:"salt"`
HashType interface{} `json:"hash_type"`
}

type Org struct {

// Org Name
Name string `json:"name"`

// FullName
FullName string `json:"full_name"`

// ActionOps for Insert Skip Update and Delete
ActionOps ActionOps `json:"guid"`
}

type User struct {
Username string `json:"username"`
Email string `json:"email"`
DisplayName string `json:"display_name"`
FirstName string `json:"first_name"`
LastName string `json:"last_name"`
MiddleName string `json:"middle_name"`

// AutomateUsername is the LDAP username
AutomateUsername string `json:"automate_username"`

// Connector ldap user
Connector string `json:"connector"`

// IsConflicting indicates the user already exists in the db
IsConflicting bool `json:"is_conflicting"`

// IsAdmin (user is admin or not)
IsAdmin bool `json:"is_admin"`
}
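
As a rough illustration of how these models compose (a sketch only, assuming it lives alongside the types in the same pipeline package; every value below is invented for the walkthrough), a populated Result marshals to JSON as follows:

package pipeline

import (
	"encoding/json"
	"fmt"
)

// PrintSampleResult is illustrative only, not part of the PR: it fills a
// minimal Result the way the phase one stages are expected to, then prints
// the JSON form produced by the struct tags above.
func PrintSampleResult() {
	res := Result{
		Meta: Meta{
			ZipFile:      "backup.zip",
			UnzipFolder:  "backup",
			StageResults: []StageResult{{StageName: "unzip", IsSuccess: true}},
		},
		ParsedResult: ParsedResult{
			Orgs:  []Org{{Name: "acme", FullName: "Acme Inc", ActionOps: Insert}},
			Users: []User{{Username: "jdoe", Email: "jdoe@example.com", IsAdmin: true}},
			OrgsUsers: []OrgsUsersAssociations{{
				OrgName: Org{Name: "acme", FullName: "Acme Inc"},
				Users:   []UserAssociation{{Username: "jdoe", IsAdmin: true}},
			}},
		},
	}
	out, _ := json.MarshalIndent(res, "", "  ")
	fmt.Println(string(out))
}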