Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding pipeline function for parsing org user association #6707

Merged
merged 15 commits into from
Feb 22, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type PhaseOnePipleine struct {

type PhaseOnePipelineProcessor func(<-chan PipelineData) <-chan PipelineData

// ParseOrg returns PhaseOnePipelineProcessor
// ParseOrg unzip the backup file and returns PhaseOnePipelineProcessor
func UnzipSrc(service *service.Service) PhaseOnePipelineProcessor {
return func(result <-chan PipelineData) <-chan PipelineData {
return unzipSrc(result, service)
Expand Down Expand Up @@ -52,7 +52,7 @@ func unzipSrc(result <-chan PipelineData, service *service.Service) <-chan Pipel
return out
}

// ParseOrg returns PhaseOnePipelineProcessor
// ParseOrg parse the orgs and returns PhaseOnePipelineProcessor
func ParseOrgSrc(service *service.Service) PhaseOnePipelineProcessor {
return func(result <-chan PipelineData) <-chan PipelineData {
return parseOrgSrc(result, service)
Expand Down Expand Up @@ -84,6 +84,49 @@ func parseOrgSrc(result <-chan PipelineData, service *service.Service) <-chan Pi
return out
}

// ParseOrg parse the org user association and returns PhaseOnePipelineProcessor
func ParseOrgUserAssociationSrc(service *service.Service) PhaseOnePipelineProcessor {
return func(result <-chan PipelineData) <-chan PipelineData {
return parseOrgUserAssociationSrc(result, service)
}
}

func parseOrgUserAssociationSrc(result <-chan PipelineData, service *service.Service) <-chan PipelineData {
log.Info("Starting to parse_orgs_user_association pipeline")

out := make(chan PipelineData, 100)

go func() {
log.Info("Processing to parse orgs user association...")
for res := range result {
_, err := service.Migration.StartUserAssociationParsing(res.Ctx, res.Result.Meta.MigrationID, res.Result.Meta.ServerID)
if err != nil {
log.Errorf("Failed to update the status for start org user association for the migration id %s : %s", res.Result.Meta.MigrationID, err.Error())
return
}
result, err := ParseOrgUserAssociation(res.Ctx, service.Storage, res.Result)
if err != nil {
_, _ = service.Migration.FailedUserAssociationParsing(res.Ctx, res.Result.Meta.MigrationID, res.Result.Meta.ServerID, err.Error(), 0, 0, 0)
return
}
_, err = service.Migration.CompleteUserAssociationParsing(res.Ctx, res.Result.Meta.MigrationID, res.Result.Meta.ServerID, 0, 0, 0)
if err != nil {
log.Errorf("Failed to update the status for complete org user association for the migration id %s : %s", res.Result.Meta.MigrationID, err.Error())
return
}
res.Result = result
select {
case out <- res:
case <-res.Ctx.Done():
res.Done <- nil
}
}
log.Info("Closing parse_orgs_user_association pipeline")
close(out)
}()
return out
}

// ParseOrg returns PhaseOnePipelineProcessor
func CreatePreviewSrc(service *service.Service) PhaseOnePipelineProcessor {
return func(result <-chan PipelineData) <-chan PipelineData {
Expand Down Expand Up @@ -250,6 +293,7 @@ func SetupPhaseOnePipeline(service *service.Service) PhaseOnePipleine {
migrationPipeline(c,
UnzipSrc(service),
ParseOrgSrc(service),
ParseOrgUserAssociationSrc(service),
CreatePreviewSrc(service),
// ParseUser(),
// ConflictingUsers(),
Expand Down
206 changes: 200 additions & 6 deletions components/infra-proxy-service/migrations/pipeline/utility.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import (
"archive/zip"
"context"
"encoding/json"
"fmt"
"github.com/chef/automate/api/interservice/authz"
"github.com/chef/automate/components/infra-proxy-service/pipeline"
"github.com/chef/automate/components/infra-proxy-service/storage"
log "github.com/sirupsen/logrus"
"io"
"os"
"path"
"path/filepath"

"github.com/chef/automate/api/interservice/authz"
"github.com/chef/automate/components/infra-proxy-service/pipeline"
"github.com/chef/automate/components/infra-proxy-service/storage"
log "github.com/sirupsen/logrus"
)

// StoreOrgs reads the Result struct and populate the orgs table
Expand Down Expand Up @@ -244,7 +244,7 @@ func openOrgFolder(org os.FileInfo, fileLocation string) pipeline.OrgJson {
jsonFile, err := os.Open(jsonPath)
// if we os.Open returns an error then handle it
if err != nil {
fmt.Println(err)
log.Errorf("Unable to open the file at location : %s", jsonPath)
}
log.Info("Successfully opened the file at location", jsonPath)
defer func() {
Expand Down Expand Up @@ -341,3 +341,197 @@ func Unzip(ctx context.Context, mst storage.MigrationStorage, result pipeline.Re
}
return result, nil
}

// ParseOrgUserAssociation sync the automate org users with chef server org users
func ParseOrgUserAssociation(ctx context.Context, st storage.Storage, result pipeline.Result) (pipeline.Result, error) {
log.Info("Starting with the parsing org user association for migration id :", result.Meta.MigrationID)
var orgUserAssociations []pipeline.OrgsUsersAssociations
var err error
orgUserAssociations, err = getActionForOrgUsers(ctx, st, result)
if err != nil {
log.Errorf("Unable to parse org user association for migration id : %s : %s", result.Meta.MigrationID, err.Error())
return result, err
}
result.ParsedResult.OrgsUsers = append(result.ParsedResult.OrgsUsers, orgUserAssociations...)
log.Info("Completed with the parsing org user association for migration id :", result.Meta.MigrationID)
return result, nil
}

func getActionForOrgUsers(ctx context.Context, st storage.Storage, result pipeline.Result) ([]pipeline.OrgsUsersAssociations, error) {
orgUserAssociations := make([]pipeline.OrgsUsersAssociations, 0)
var userAssociations []pipeline.UserAssociation
orgPath := path.Join(result.Meta.UnzipFolder, "organizations")
for _, org := range result.ParsedResult.Orgs {
log.Info("Getting actions for org id", org.Name)
chefServerOrgUsers, err := getChefServerOrgUsers(org.Name, orgPath)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can call the DB only in case of the org is an existing org. Can put it under the else block.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kalroy this function returns the users from back up zip not from the automate DB

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant the one at L374

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay. Did the changes, please check

if err != nil {
log.Errorf("Unable to get the chef server organisation users %s ", err)
return nil, err
}
if org.ActionOps == pipeline.Insert {
userAssociations = append(userAssociations, createInsertUserAssociation(chefServerOrgUsers)...)
} else {
if org.ActionOps == pipeline.Delete {
userAssociations = append(userAssociations, createDeleteUserAssociation(chefServerOrgUsers)...)
} else {
orgUsersInDb, err := st.GetAutomateOrgUsers(ctx, org.Name)
if err != nil {
log.Errorf("Unable to fetch automate Users for org %s : %s", org.Name, err.Error())
return nil, err
}
userAssociations = append(userAssociations, insertOrUpdateActionForOrgUsers(orgUsersInDb, chefServerOrgUsers)...)
userAssociations = append(userAssociations, deleteActionForOrgUses(orgUsersInDb, chefServerOrgUsers)...)
}
}
orgUserAssociations = append(orgUserAssociations, pipeline.OrgsUsersAssociations{OrgName: org, Users: userAssociations})
}
return orgUserAssociations, nil
}

func createInsertUserAssociation(chefServerOrgUsers []pipeline.UserAssociation) []pipeline.UserAssociation {
userAssociation := make([]pipeline.UserAssociation, 0)
for _, user := range chefServerOrgUsers {
userAssociation = append(userAssociation, pipeline.UserAssociation{Username: user.Username, IsAdmin: user.IsAdmin, ActionOps: pipeline.Insert})
}
return userAssociation
}

func createDeleteUserAssociation(chefServerOrgUsers []pipeline.UserAssociation) []pipeline.UserAssociation {
userAssociation := make([]pipeline.UserAssociation, 0)
for _, user := range chefServerOrgUsers {
userAssociation = append(userAssociation, pipeline.UserAssociation{Username: user.Username, IsAdmin: user.IsAdmin, ActionOps: pipeline.Delete})
}
return userAssociation
}

func insertOrUpdateActionForOrgUsers(orgUsers []storage.OrgUser, chefServerOrgUsers []pipeline.UserAssociation) []pipeline.UserAssociation {
var userAssociation []pipeline.UserAssociation
orgUserMapDB := createMapForOrgUsersInDB(orgUsers)
for _, user := range chefServerOrgUsers {
isAdmin, valuePresent := orgUserMapDB[user.Username]
if valuePresent {
//check for the org admins
if user.IsAdmin != isAdmin {
userAssociation = append(userAssociation, pipeline.UserAssociation{Username: user.Username, IsAdmin: user.IsAdmin, ActionOps: pipeline.Update})
} else {
userAssociation = append(userAssociation, pipeline.UserAssociation{Username: user.Username, IsAdmin: user.IsAdmin, ActionOps: pipeline.Skip})
}
} else {
userAssociation = append(userAssociation, pipeline.UserAssociation{Username: user.Username, IsAdmin: user.IsAdmin, ActionOps: pipeline.Insert})
}
}
return userAssociation
}

func deleteActionForOrgUses(orgUsers []storage.OrgUser, chefServerOrgUsers []pipeline.UserAssociation) []pipeline.UserAssociation {
var userAssociation []pipeline.UserAssociation
orgUserJsonMap := createMapForOrgUsersInJson(chefServerOrgUsers)
for _, user := range orgUsers {
_, valuePresent := orgUserJsonMap[user.InfraServerUsername]
if !valuePresent {
userAssociation = append(userAssociation, pipeline.UserAssociation{Username: user.InfraServerUsername, IsAdmin: user.IsAdmin, ActionOps: pipeline.Delete})
}
}

return userAssociation
}

func createMapForOrgUsersInDB(orgUsers []storage.OrgUser) map[string]bool {
orgUsersMap := make(map[string]bool)
for _, s := range orgUsers {
orgUsersMap[s.InfraServerUsername] = s.IsAdmin
}
return orgUsersMap
}

func createMapForOrgUsersInJson(chefServerOrgUsers []pipeline.UserAssociation) map[string]string {
orgUsersMap := make(map[string]string)
for _, user := range chefServerOrgUsers {
orgUsersMap[user.Username] = ""
}
return orgUsersMap
}

// getChefServerOrgUsers returns the chef server organisation users from backup file
func getChefServerOrgUsers(orgName, fileLocation string) ([]pipeline.UserAssociation, error) {
orgUsers := make([]pipeline.UserAssociation, 0)

members, err := getOrgMembers(orgName, fileLocation)
if err != nil {
log.Errorf("Unable to get orgnisation members %s", err)
return nil, err
}
admins, err := getOrgAdmins(orgName, fileLocation)
if err != nil {
log.Errorf("Unable to get orgnisation admins %s", err)
return nil, err
}
orgAdminMap := createMapForOrgAdminsInJson(admins)
for _, member := range members {
orgUser := pipeline.UserAssociation{}
if member.User.Username == "pivotal" {
continue
}
orgUser.Username = member.User.Username
_, valuePresent := orgAdminMap[member.User.Username]
if valuePresent {
orgUser.IsAdmin = true
}
orgUsers = append(orgUsers, orgUser)
}
return orgUsers, nil
}

// getOrgMembers Get the data of members.json
func getOrgMembers(orgName, fileLocation string) ([]pipeline.MembersJson, error) {
var orgMembers []pipeline.MembersJson
usersJsonPath := path.Join(fileLocation, orgName, "members.json")
usersjsonFile, err := os.Open(usersJsonPath)
// if we os.Open returns an error then handle it
if err != nil {
log.Errorf("Unable to open org members file at the location : %s", usersJsonPath)
return nil, err
}
log.Info("Successfully opened the org members file at location", usersJsonPath)
// defer the closing of our jsonFile so that we can parse it later on
defer func() {
_ = usersjsonFile.Close()
}()
err = json.NewDecoder(usersjsonFile).Decode(&orgMembers)
if err != nil {
log.Errorf("Unable to decode the org members file %s %s", usersJsonPath, err)
return nil, err
}
return orgMembers, nil
}

// getOrgAdmins Get the data of admins.json
func getOrgAdmins(orgName, fileLocation string) (pipeline.AdminsJson, error) {
var orgAdmins pipeline.AdminsJson
adminJsonPath := path.Join(fileLocation, orgName, "groups", "admins.json")
jsonFile, err := os.Open(adminJsonPath)
// if we os.Open returns an error then handle it
if err != nil {
log.Errorf("Unable to open org admins file at the location : %s %s", adminJsonPath, err)
return pipeline.AdminsJson{}, err
}
log.Info("Successfully opened the org admins file at location", adminJsonPath)
// defer the closing of our jsonFile so that we can parse it later on
defer func() {
_ = jsonFile.Close()
}()
err = json.NewDecoder(jsonFile).Decode(&orgAdmins)
if err != nil {
log.Errorf("Unable to decode the org admins file %s %s", adminJsonPath, err)
return pipeline.AdminsJson{}, err
}
return orgAdmins, nil
}

func createMapForOrgAdminsInJson(adminsJson pipeline.AdminsJson) map[string]string {
orgAdminsMap := make(map[string]string)
for _, username := range adminsJson.Users {
orgAdminsMap[username] = ""
}
return orgAdminsMap
}
34 changes: 34 additions & 0 deletions components/infra-proxy-service/migrations/pipeline/utility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,37 @@ func TestCreatePreview(t *testing.T) {
})
}
}

func TestUserOrgAssociation(t *testing.T) {
deleteBackUp := "../../testdata/deleteBackup/"
skipBackup := "../../testdata/skipBackup/"
type args struct {
ctx context.Context
st storage.Storage
result pipeline.Result
}
tests := []struct {
name string
args args
wantError error
want1 pipeline.Result
}{
{name: "Test Insert Org User", args: args{ctx: context.Background(), st: &testDB.TestDB{Type: "Insert"}, result: pipeline.Result{Meta: pipeline.Meta{UnzipFolder: "../../testdata/insertBackup/", ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org1", FullName: "Org1_infra", ActionOps: pipeline.Insert}}}}}, wantError: nil, want1: pipeline.Result{Meta: pipeline.Meta{ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org1", FullName: "Org1_infra", ActionOps: pipeline.Insert}}, OrgsUsers: []pipeline.OrgsUsersAssociations{{OrgName: pipeline.Org{Name: "org1", FullName: "Org1_infra", ActionOps: pipeline.Insert}, Users: []pipeline.UserAssociation{{Username: "user1", IsAdmin: true, ActionOps: pipeline.Insert}, {Username: "user2", IsAdmin: false, ActionOps: pipeline.Insert}}}}}}},
{name: "Test Skip Org and Insert Org User", args: args{ctx: context.Background(), st: &testDB.TestDB{Type: "Insert"}, result: pipeline.Result{Meta: pipeline.Meta{UnzipFolder: skipBackup, ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org3", FullName: "Org1_infra", ActionOps: pipeline.Skip}}}}}, wantError: nil, want1: pipeline.Result{Meta: pipeline.Meta{ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org3", FullName: "Org1_infra", ActionOps: pipeline.Skip}}, OrgsUsers: []pipeline.OrgsUsersAssociations{{OrgName: pipeline.Org{Name: "org3", FullName: "Org1_infra", ActionOps: pipeline.Skip}, Users: []pipeline.UserAssociation{{Username: "user1", IsAdmin: false, ActionOps: pipeline.Insert}, {Username: "user2", IsAdmin: true, ActionOps: pipeline.Insert}}}}}}},
{name: "Test Skip or Update Org User", args: args{ctx: context.Background(), st: &testDB.TestDB{Type: "Skip"}, result: pipeline.Result{Meta: pipeline.Meta{UnzipFolder: skipBackup, ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org3", FullName: "Org1_infra", ActionOps: pipeline.Skip}}}}}, wantError: nil, want1: pipeline.Result{Meta: pipeline.Meta{ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org1", FullName: "Org1_infra", ActionOps: pipeline.Skip}}, OrgsUsers: []pipeline.OrgsUsersAssociations{{OrgName: pipeline.Org{Name: "org3", FullName: "Org1_infra", ActionOps: pipeline.Skip}, Users: []pipeline.UserAssociation{{Username: "user1", IsAdmin: false, ActionOps: pipeline.Skip}, {Username: "user2", IsAdmin: true, ActionOps: pipeline.Update}}}}}}},
{name: "Test Delete Org User", args: args{ctx: context.Background(), st: &testDB.TestDB{Type: "Delete"}, result: pipeline.Result{Meta: pipeline.Meta{UnzipFolder: deleteBackUp, ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org1", FullName: "Org1_infra", ActionOps: pipeline.Delete}}}}}, wantError: nil, want1: pipeline.Result{Meta: pipeline.Meta{ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org1", FullName: "Org1_infra", ActionOps: pipeline.Delete}}, OrgsUsers: []pipeline.OrgsUsersAssociations{{OrgName: pipeline.Org{Name: "org1", FullName: "Org1_infra", ActionOps: pipeline.Delete}, Users: []pipeline.UserAssociation{{Username: "user1", IsAdmin: true, ActionOps: pipeline.Delete}}}}}}},
{name: "Test Skip org and Delete Org User", args: args{ctx: context.Background(), st: &testDB.TestDB{Type: "Skip"}, result: pipeline.Result{Meta: pipeline.Meta{UnzipFolder: deleteBackUp, ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org1", FullName: "Org1_infra", ActionOps: pipeline.Skip}}}}}, wantError: nil, want1: pipeline.Result{Meta: pipeline.Meta{ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org1", FullName: "Org1_infra", ActionOps: pipeline.Skip}}, OrgsUsers: []pipeline.OrgsUsersAssociations{{OrgName: pipeline.Org{Name: "org1", FullName: "Org1_infra", ActionOps: pipeline.Skip}, Users: []pipeline.UserAssociation{{Username: "user1", IsAdmin: true, ActionOps: pipeline.Update}, {Username: "user2", ActionOps: pipeline.Delete}}}}}}},
{name: "Test Skip Org and Insert Org User", args: args{ctx: context.Background(), st: &testDB.TestDB{Type: "Insert"}, result: pipeline.Result{Meta: pipeline.Meta{UnzipFolder: skipBackup, ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org3", FullName: "Org1_infra", ActionOps: pipeline.Update}}}}}, wantError: nil, want1: pipeline.Result{Meta: pipeline.Meta{ServerID: "server1", MigrationID: "mig1"}, ParsedResult: pipeline.ParsedResult{Orgs: []pipeline.Org{{Name: "org3", FullName: "Org1_infra", ActionOps: pipeline.Update}}, OrgsUsers: []pipeline.OrgsUsersAssociations{{OrgName: pipeline.Org{Name: "org3", FullName: "Org1_infra", ActionOps: pipeline.Update}, Users: []pipeline.UserAssociation{{Username: "user1", ActionOps: pipeline.Insert}, {Username: "user2", IsAdmin: true, ActionOps: pipeline.Insert}}}}}}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ParseOrgUserAssociation(tt.args.ctx, tt.args.st, tt.args.result)
if err != nil && err.Error() != tt.wantError.Error() {
t.Errorf("ParseOrgUserAssociation() err = %v, want %v", err, tt.wantError)
}
if !reflect.DeepEqual(got.ParsedResult.OrgsUsers, tt.want1.ParsedResult.OrgsUsers) {
t.Errorf("ParseOrgUserAssociation() got = %v, want %v", got.ParsedResult.OrgsUsers, tt.want1.ParsedResult.OrgsUsers)
}
})
}
}
17 changes: 17 additions & 0 deletions components/infra-proxy-service/pipeline/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,16 @@ type OrgsUsersAssociations struct {
// Users UserAssociation slice
Users []UserAssociation `json:"user_association"`
}

type UserAssociation struct {
// Username
Username string `json:"username"`

// IsAdmin
IsAdmin bool `json:"is_admin"`

// ActionOps for Insert Skip Update and Delete for UserAssociation
ActionOps ActionOps `json:"action_ops"`
}

type KeyDump struct {
Expand Down Expand Up @@ -125,3 +129,16 @@ type OrgJson struct {
FullName string `json:"full_name"`
Guid string `json:"guid"`
}

type MembersJson struct {
User UsersJson `json:"user"`
}

type AdminsJson struct {
Name string `json:"name"`
Users []string `json:"users"`
}

type UsersJson struct {
Username string `json:"username"`
}
Loading