Skip to content

Commit

Permalink
Adding pipeline function for parsing org user association (#6707)
Browse files Browse the repository at this point in the history
* Adding pipeline function for parsing org user association

Signed-off-by: Yashvi Jain <Yashvi.jain@progress.com>

* Changes for lint fixes

Signed-off-by: Yashvi Jain <Yashvi.jain@progress.com>

* Fixing test cases for parse org users

Signed-off-by: Yashvi Jain <Yashvi.jain@progress.com>

* Adding more test cases for Org User Association

Signed-off-by: Yashvi Jain <Yashvi.jain@progress.com>

* Fixing code smells

Signed-off-by: Yashvi Jain <Yashvi.jain@progress.com>

* adding correct action ops

Signed-off-by: Yashvi Jain <Yashvi.jain@progress.com>

* Test cases correction for org users

Signed-off-by: Yashvi Jain <Yashvi.jain@progress.com>

* Get org admins from back up zip

Signed-off-by: sonali wale <sonali.wale@progress.com>

* Adding test cases for getting admin from org

Signed-off-by: Yashvi Jain <Yashvi.jain@progress.com>

* Updated test cases

Signed-off-by: sonali wale <sonali.wale@progress.com>

* Updated test data

Signed-off-by: sonali wale <sonali.wale@progress.com>

* Added minor changes

Signed-off-by: sonali wale <sonali.wale@progress.com>

* Added new line

Signed-off-by: sonali wale <sonali.wale@progress.com>

* Storage function changes added

Signed-off-by: sonali wale <sonali.wale@progress.com>

* Changes for review comment

Signed-off-by: sonali wale <sonali.wale@progress.com>

Co-authored-by: sonali wale <sonali.wale@progress.com>
  • Loading branch information
2 people authored and vinay033 committed Apr 18, 2022
1 parent c2e92d3 commit 468a5d9
Show file tree
Hide file tree
Showing 15 changed files with 414 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type PhaseOnePipleine struct {

type PhaseOnePipelineProcessor func(<-chan PipelineData) <-chan PipelineData

// ParseOrg returns PhaseOnePipelineProcessor
// ParseOrg unzip the backup file and returns PhaseOnePipelineProcessor
func UnzipSrc(service *service.Service) PhaseOnePipelineProcessor {
return func(result <-chan PipelineData) <-chan PipelineData {
return unzipSrc(result, service)
Expand Down Expand Up @@ -52,7 +52,7 @@ func unzipSrc(result <-chan PipelineData, service *service.Service) <-chan Pipel
return out
}

// ParseOrg returns PhaseOnePipelineProcessor
// ParseOrg parse the orgs and returns PhaseOnePipelineProcessor
func ParseOrgSrc(service *service.Service) PhaseOnePipelineProcessor {
return func(result <-chan PipelineData) <-chan PipelineData {
return parseOrgSrc(result, service)
Expand Down Expand Up @@ -84,6 +84,49 @@ func parseOrgSrc(result <-chan PipelineData, service *service.Service) <-chan Pi
return out
}

// ParseOrg parse the org user association and returns PhaseOnePipelineProcessor
func ParseOrgUserAssociationSrc(service *service.Service) PhaseOnePipelineProcessor {
return func(result <-chan PipelineData) <-chan PipelineData {
return parseOrgUserAssociationSrc(result, service)
}
}

func parseOrgUserAssociationSrc(result <-chan PipelineData, service *service.Service) <-chan PipelineData {
log.Info("Starting to parse_orgs_user_association pipeline")

out := make(chan PipelineData, 100)

go func() {
log.Info("Processing to parse orgs user association...")
for res := range result {
_, err := service.Migration.StartUserAssociationParsing(res.Ctx, res.Result.Meta.MigrationID, res.Result.Meta.ServerID)
if err != nil {
log.Errorf("Failed to update the status for start org user association for the migration id %s : %s", res.Result.Meta.MigrationID, err.Error())
return
}
result, err := ParseOrgUserAssociation(res.Ctx, service.Storage, res.Result)
if err != nil {
_, _ = service.Migration.FailedUserAssociationParsing(res.Ctx, res.Result.Meta.MigrationID, res.Result.Meta.ServerID, err.Error(), 0, 0, 0)
return
}
_, err = service.Migration.CompleteUserAssociationParsing(res.Ctx, res.Result.Meta.MigrationID, res.Result.Meta.ServerID, 0, 0, 0)
if err != nil {
log.Errorf("Failed to update the status for complete org user association for the migration id %s : %s", res.Result.Meta.MigrationID, err.Error())
return
}
res.Result = result
select {
case out <- res:
case <-res.Ctx.Done():
res.Done <- nil
}
}
log.Info("Closing parse_orgs_user_association pipeline")
close(out)
}()
return out
}

// ParseOrg returns PhaseOnePipelineProcessor
func CreatePreviewSrc(service *service.Service) PhaseOnePipelineProcessor {
return func(result <-chan PipelineData) <-chan PipelineData {
Expand Down Expand Up @@ -250,6 +293,7 @@ func SetupPhaseOnePipeline(service *service.Service) PhaseOnePipleine {
migrationPipeline(c,
UnzipSrc(service),
ParseOrgSrc(service),
ParseOrgUserAssociationSrc(service),
CreatePreviewSrc(service),
// ParseUser(),
// ConflictingUsers(),
Expand Down
206 changes: 200 additions & 6 deletions components/infra-proxy-service/migrations/pipeline/utility.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import (
"archive/zip"
"context"
"encoding/json"
"fmt"
"github.com/chef/automate/api/interservice/authz"
"github.com/chef/automate/components/infra-proxy-service/pipeline"
"github.com/chef/automate/components/infra-proxy-service/storage"
log "github.com/sirupsen/logrus"
"io"
"os"
"path"
"path/filepath"

"github.com/chef/automate/api/interservice/authz"
"github.com/chef/automate/components/infra-proxy-service/pipeline"
"github.com/chef/automate/components/infra-proxy-service/storage"
log "github.com/sirupsen/logrus"
)

// StoreOrgs reads the Result struct and populate the orgs table
Expand Down Expand Up @@ -244,7 +244,7 @@ func openOrgFolder(org os.FileInfo, fileLocation string) pipeline.OrgJson {
jsonFile, err := os.Open(jsonPath)
// if we os.Open returns an error then handle it
if err != nil {
fmt.Println(err)
log.Errorf("Unable to open the file at location : %s", jsonPath)
}
log.Info("Successfully opened the file at location", jsonPath)
defer func() {
Expand Down Expand Up @@ -341,3 +341,197 @@ func Unzip(ctx context.Context, mst storage.MigrationStorage, result pipeline.Re
}
return result, nil
}

// ParseOrgUserAssociation sync the automate org users with chef server org users
func ParseOrgUserAssociation(ctx context.Context, st storage.Storage, result pipeline.Result) (pipeline.Result, error) {
log.Info("Starting with the parsing org user association for migration id :", result.Meta.MigrationID)
var orgUserAssociations []pipeline.OrgsUsersAssociations
var err error
orgUserAssociations, err = getActionForOrgUsers(ctx, st, result)
if err != nil {
log.Errorf("Unable to parse org user association for migration id : %s : %s", result.Meta.MigrationID, err.Error())
return result, err
}
result.ParsedResult.OrgsUsers = append(result.ParsedResult.OrgsUsers, orgUserAssociations...)
log.Info("Completed with the parsing org user association for migration id :", result.Meta.MigrationID)
return result, nil
}

func getActionForOrgUsers(ctx context.Context, st storage.Storage, result pipeline.Result) ([]pipeline.OrgsUsersAssociations, error) {
orgUserAssociations := make([]pipeline.OrgsUsersAssociations, 0)
var userAssociations []pipeline.UserAssociation
orgPath := path.Join(result.Meta.UnzipFolder, "organizations")
for _, org := range result.ParsedResult.Orgs {
log.Info("Getting actions for org id", org.Name)
chefServerOrgUsers, err := getChefServerOrgUsers(org.Name, orgPath)
if err != nil {
log.Errorf("Unable to get the chef server organisation users %s ", err)
return nil, err
}
if org.ActionOps == pipeline.Insert {
userAssociations = append(userAssociations, createInsertUserAssociation(chefServerOrgUsers)...)
} else {
if org.ActionOps == pipeline.Delete {
userAssociations = append(userAssociations, createDeleteUserAssociation(chefServerOrgUsers)...)
} else {
orgUsersInDb, err := st.GetAutomateOrgUsers(ctx, org.Name)
if err != nil {
log.Errorf("Unable to fetch automate Users for org %s : %s", org.Name, err.Error())
return nil, err
}
userAssociations = append(userAssociations, insertOrUpdateActionForOrgUsers(orgUsersInDb, chefServerOrgUsers)...)
userAssociations = append(userAssociations, deleteActionForOrgUses(orgUsersInDb, chefServerOrgUsers)...)
}
}
orgUserAssociations = append(orgUserAssociations, pipeline.OrgsUsersAssociations{OrgName: org, Users: userAssociations})
}
return orgUserAssociations, nil
}

func createInsertUserAssociation(chefServerOrgUsers []pipeline.UserAssociation) []pipeline.UserAssociation {
userAssociation := make([]pipeline.UserAssociation, 0)
for _, user := range chefServerOrgUsers {
userAssociation = append(userAssociation, pipeline.UserAssociation{Username: user.Username, IsAdmin: user.IsAdmin, ActionOps: pipeline.Insert})
}
return userAssociation
}

func createDeleteUserAssociation(chefServerOrgUsers []pipeline.UserAssociation) []pipeline.UserAssociation {
userAssociation := make([]pipeline.UserAssociation, 0)
for _, user := range chefServerOrgUsers {
userAssociation = append(userAssociation, pipeline.UserAssociation{Username: user.Username, IsAdmin: user.IsAdmin, ActionOps: pipeline.Delete})
}
return userAssociation
}

func insertOrUpdateActionForOrgUsers(orgUsers []storage.OrgUser, chefServerOrgUsers []pipeline.UserAssociation) []pipeline.UserAssociation {
var userAssociation []pipeline.UserAssociation
orgUserMapDB := createMapForOrgUsersInDB(orgUsers)
for _, user := range chefServerOrgUsers {
isAdmin, valuePresent := orgUserMapDB[user.Username]
if valuePresent {
//check for the org admins
if user.IsAdmin != isAdmin {
userAssociation = append(userAssociation, pipeline.UserAssociation{Username: user.Username, IsAdmin: user.IsAdmin, ActionOps: pipeline.Update})
} else {
userAssociation = append(userAssociation, pipeline.UserAssociation{Username: user.Username, IsAdmin: user.IsAdmin, ActionOps: pipeline.Skip})
}
} else {
userAssociation = append(userAssociation, pipeline.UserAssociation{Username: user.Username, IsAdmin: user.IsAdmin, ActionOps: pipeline.Insert})
}
}
return userAssociation
}

func deleteActionForOrgUses(orgUsers []storage.OrgUser, chefServerOrgUsers []pipeline.UserAssociation) []pipeline.UserAssociation {
var userAssociation []pipeline.UserAssociation
orgUserJsonMap := createMapForOrgUsersInJson(chefServerOrgUsers)
for _, user := range orgUsers {
_, valuePresent := orgUserJsonMap[user.InfraServerUsername]
if !valuePresent {
userAssociation = append(userAssociation, pipeline.UserAssociation{Username: user.InfraServerUsername, IsAdmin: user.IsAdmin, ActionOps: pipeline.Delete})
}
}

return userAssociation
}

func createMapForOrgUsersInDB(orgUsers []storage.OrgUser) map[string]bool {
orgUsersMap := make(map[string]bool)
for _, s := range orgUsers {
orgUsersMap[s.InfraServerUsername] = s.IsAdmin
}
return orgUsersMap
}

func createMapForOrgUsersInJson(chefServerOrgUsers []pipeline.UserAssociation) map[string]string {
orgUsersMap := make(map[string]string)
for _, user := range chefServerOrgUsers {
orgUsersMap[user.Username] = ""
}
return orgUsersMap
}

// getChefServerOrgUsers returns the chef server organisation users from backup file
func getChefServerOrgUsers(orgName, fileLocation string) ([]pipeline.UserAssociation, error) {
orgUsers := make([]pipeline.UserAssociation, 0)

members, err := getOrgMembers(orgName, fileLocation)
if err != nil {
log.Errorf("Unable to get orgnisation members %s", err)
return nil, err
}
admins, err := getOrgAdmins(orgName, fileLocation)
if err != nil {
log.Errorf("Unable to get orgnisation admins %s", err)
return nil, err
}
orgAdminMap := createMapForOrgAdminsInJson(admins)
for _, member := range members {
orgUser := pipeline.UserAssociation{}
if member.User.Username == "pivotal" {
continue
}
orgUser.Username = member.User.Username
_, valuePresent := orgAdminMap[member.User.Username]
if valuePresent {
orgUser.IsAdmin = true
}
orgUsers = append(orgUsers, orgUser)
}
return orgUsers, nil
}

// getOrgMembers Get the data of members.json
func getOrgMembers(orgName, fileLocation string) ([]pipeline.MembersJson, error) {
var orgMembers []pipeline.MembersJson
usersJsonPath := path.Join(fileLocation, orgName, "members.json")
usersjsonFile, err := os.Open(usersJsonPath)
// if we os.Open returns an error then handle it
if err != nil {
log.Errorf("Unable to open org members file at the location : %s", usersJsonPath)
return nil, err
}
log.Info("Successfully opened the org members file at location", usersJsonPath)
// defer the closing of our jsonFile so that we can parse it later on
defer func() {
_ = usersjsonFile.Close()
}()
err = json.NewDecoder(usersjsonFile).Decode(&orgMembers)
if err != nil {
log.Errorf("Unable to decode the org members file %s %s", usersJsonPath, err)
return nil, err
}
return orgMembers, nil
}

// getOrgAdmins Get the data of admins.json
func getOrgAdmins(orgName, fileLocation string) (pipeline.AdminsJson, error) {
var orgAdmins pipeline.AdminsJson
adminJsonPath := path.Join(fileLocation, orgName, "groups", "admins.json")
jsonFile, err := os.Open(adminJsonPath)
// if we os.Open returns an error then handle it
if err != nil {
log.Errorf("Unable to open org admins file at the location : %s %s", adminJsonPath, err)
return pipeline.AdminsJson{}, err
}
log.Info("Successfully opened the org admins file at location", adminJsonPath)
// defer the closing of our jsonFile so that we can parse it later on
defer func() {
_ = jsonFile.Close()
}()
err = json.NewDecoder(jsonFile).Decode(&orgAdmins)
if err != nil {
log.Errorf("Unable to decode the org admins file %s %s", adminJsonPath, err)
return pipeline.AdminsJson{}, err
}
return orgAdmins, nil
}

func createMapForOrgAdminsInJson(adminsJson pipeline.AdminsJson) map[string]string {
orgAdminsMap := make(map[string]string)
for _, username := range adminsJson.Users {
orgAdminsMap[username] = ""
}
return orgAdminsMap
}
34 changes: 34 additions & 0 deletions components/infra-proxy-service/migrations/pipeline/utility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,37 @@ func TestCreatePreview(t *testing.T) {
})
}
}

func TestUserOrgAssociation(t *testing.T) {
deleteBackUp := "../../testdata/deleteBackup/"
skipBackup := "../../testdata/skipBackup/"
type args struct {
ctx context.Context
st storage.Storage
result pipeline.Result
}
tests := []struct {
name string
args args
wantError error
want1 pipeline.Result
}{
{name: "Test Insert Org User", args: args{ctx: context.Background(), st: &testDB.TestDB{Type: "Insert"}, result: pipeline.Result{Meta: pipeline.Meta{UnzipFolder: "../../testdata/insertBackup/", ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org1", FullName: "Org1_infra", ActionOps: pipeline.Insert}}}}}, wantError: nil, want1: pipeline.Result{Meta: pipeline.Meta{ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org1", FullName: "Org1_infra", ActionOps: pipeline.Insert}}, OrgsUsers: []pipeline.OrgsUsersAssociations{{OrgName: pipeline.Org{Name: "org1", FullName: "Org1_infra", ActionOps: pipeline.Insert}, Users: []pipeline.UserAssociation{{Username: "user1", IsAdmin: true, ActionOps: pipeline.Insert}, {Username: "user2", IsAdmin: false, ActionOps: pipeline.Insert}}}}}}},
{name: "Test Skip Org and Insert Org User", args: args{ctx: context.Background(), st: &testDB.TestDB{Type: "Insert"}, result: pipeline.Result{Meta: pipeline.Meta{UnzipFolder: skipBackup, ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org3", FullName: "Org1_infra", ActionOps: pipeline.Skip}}}}}, wantError: nil, want1: pipeline.Result{Meta: pipeline.Meta{ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org3", FullName: "Org1_infra", ActionOps: pipeline.Skip}}, OrgsUsers: []pipeline.OrgsUsersAssociations{{OrgName: pipeline.Org{Name: "org3", FullName: "Org1_infra", ActionOps: pipeline.Skip}, Users: []pipeline.UserAssociation{{Username: "user1", IsAdmin: false, ActionOps: pipeline.Insert}, {Username: "user2", IsAdmin: true, ActionOps: pipeline.Insert}}}}}}},
{name: "Test Skip or Update Org User", args: args{ctx: context.Background(), st: &testDB.TestDB{Type: "Skip"}, result: pipeline.Result{Meta: pipeline.Meta{UnzipFolder: skipBackup, ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org3", FullName: "Org1_infra", ActionOps: pipeline.Skip}}}}}, wantError: nil, want1: pipeline.Result{Meta: pipeline.Meta{ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org1", FullName: "Org1_infra", ActionOps: pipeline.Skip}}, OrgsUsers: []pipeline.OrgsUsersAssociations{{OrgName: pipeline.Org{Name: "org3", FullName: "Org1_infra", ActionOps: pipeline.Skip}, Users: []pipeline.UserAssociation{{Username: "user1", IsAdmin: false, ActionOps: pipeline.Skip}, {Username: "user2", IsAdmin: true, ActionOps: pipeline.Update}}}}}}},
{name: "Test Delete Org User", args: args{ctx: context.Background(), st: &testDB.TestDB{Type: "Delete"}, result: pipeline.Result{Meta: pipeline.Meta{UnzipFolder: deleteBackUp, ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org1", FullName: "Org1_infra", ActionOps: pipeline.Delete}}}}}, wantError: nil, want1: pipeline.Result{Meta: pipeline.Meta{ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org1", FullName: "Org1_infra", ActionOps: pipeline.Delete}}, OrgsUsers: []pipeline.OrgsUsersAssociations{{OrgName: pipeline.Org{Name: "org1", FullName: "Org1_infra", ActionOps: pipeline.Delete}, Users: []pipeline.UserAssociation{{Username: "user1", IsAdmin: true, ActionOps: pipeline.Delete}}}}}}},
{name: "Test Skip org and Delete Org User", args: args{ctx: context.Background(), st: &testDB.TestDB{Type: "Skip"}, result: pipeline.Result{Meta: pipeline.Meta{UnzipFolder: deleteBackUp, ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org1", FullName: "Org1_infra", ActionOps: pipeline.Skip}}}}}, wantError: nil, want1: pipeline.Result{Meta: pipeline.Meta{ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org1", FullName: "Org1_infra", ActionOps: pipeline.Skip}}, OrgsUsers: []pipeline.OrgsUsersAssociations{{OrgName: pipeline.Org{Name: "org1", FullName: "Org1_infra", ActionOps: pipeline.Skip}, Users: []pipeline.UserAssociation{{Username: "user1", IsAdmin: true, ActionOps: pipeline.Update}, {Username: "user2", ActionOps: pipeline.Delete}}}}}}},
{name: "Test Skip Org and Insert Org User", args: args{ctx: context.Background(), st: &testDB.TestDB{Type: "Insert"}, result: pipeline.Result{Meta: pipeline.Meta{UnzipFolder: skipBackup, ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org3", FullName: "Org1_infra", ActionOps: pipeline.Update}}}}}, wantError: nil, want1: pipeline.Result{Meta: pipeline.Meta{ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org3", FullName: "Org1_infra", ActionOps: pipeline.Update}}, OrgsUsers: []pipeline.OrgsUsersAssociations{{OrgName: pipeline.Org{Name: "org3", FullName: "Org1_infra", ActionOps: pipeline.Update}, Users: []pipeline.UserAssociation{{Username: "user1", ActionOps: pipeline.Insert}, {Username: "user2", IsAdmin: true, ActionOps: pipeline.Insert}}}}}}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ParseOrgUserAssociation(tt.args.ctx, tt.args.st, tt.args.result)
if err != nil && err.Error() != tt.wantError.Error() {
t.Errorf("ParseOrgUserAssociation() err = %v, want %v", err, tt.wantError)
}
if !reflect.DeepEqual(got.ParsedResult.OrgsUsers, tt.want1.ParsedResult.OrgsUsers) {
t.Errorf("ParseOrgUserAssociation() got = %v, want %v", got.ParsedResult.OrgsUsers, tt.want1.ParsedResult.OrgsUsers)
}
})
}
}
17 changes: 17 additions & 0 deletions components/infra-proxy-service/pipeline/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,16 @@ type OrgsUsersAssociations struct {
// Users UserAssociation slice
Users []UserAssociation `json:"user_association"`
}

type UserAssociation struct {
// Username
Username string `json:"username"`

// IsAdmin
IsAdmin bool `json:"is_admin"`

// ActionOps for Insert Skip Update and Delete for UserAssociation
ActionOps ActionOps `json:"action_ops"`
}

type KeyDump struct {
Expand Down Expand Up @@ -125,3 +129,16 @@ type OrgJson struct {
FullName string `json:"full_name"`
Guid string `json:"guid"`
}

type MembersJson struct {
User UsersJson `json:"user"`
}

type AdminsJson struct {
Name string `json:"name"`
Users []string `json:"users"`
}

type UsersJson struct {
Username string `json:"username"`
}
Loading

0 comments on commit 468a5d9

Please sign in to comment.