-
Notifications
You must be signed in to change notification settings - Fork 113
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Pipeline demo * Demo Pipeline Signed-off-by: Pappu Kumar <pappu.kumar@progress.com> * Based on PR Review Signed-off-by: Pappu Kumar <pappu.kumar@progress.com> * Models Signed-off-by: Pappu Kumar <pappu.kumar@progress.com> * Updateed select statement Signed-off-by: Pappu Kumar <pappu.kumar@progress.com> * Removed package-lock.json Signed-off-by: Pappu Kumar <pappu.kumar@progress.com> * Pipeline to process and close the go routines Signed-off-by: Kallol Roy <karoy@progress.com> * Feature/models (#6655) * Pipeline demo * Demo Pipeline Signed-off-by: Pappu Kumar <pappu.kumar@progress.com> * Based on PR Review Signed-off-by: Pappu Kumar <pappu.kumar@progress.com> * Models Signed-off-by: Pappu Kumar <pappu.kumar@progress.com> * Models done * Package JSON change Signed-off-by: Pappu Kumar <pappu.kumar@progress.com> * Package JSON change Signed-off-by: Pappu Kumar <pappu.kumar@progress.com> * Package JSON change Signed-off-by: Pappu Kumar <pappu.kumar@progress.com> * Setup pipeline to process data Signed-off-by: Kallol Roy <karoy@progress.com> * Some comment corrections Signed-off-by: Pappu Kumar <pappu.kumar@progress.com> Co-authored-by: Kallol Roy <karoy@progress.com>
- Loading branch information
1 parent
2c5f432
commit 3c4183e
Showing
4 changed files
with
356 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
231 changes: 231 additions & 0 deletions
231
components/infra-proxy-service/migrations/pipeline/phaseonemigration.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,231 @@ | ||
package pipeline | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
) | ||
|
||
type PipelineData struct { | ||
Result Result | ||
Done chan<- error | ||
Ctx context.Context | ||
} | ||
|
||
type PhaseOnePipleine struct { | ||
in chan<- PipelineData | ||
} | ||
|
||
type PhaseOnePipelineProcessor func(<-chan PipelineData) <-chan PipelineData | ||
|
||
// ParseOrg returns PhaseOnePipelineProcessor | ||
func UnzipSrc() PhaseOnePipelineProcessor { | ||
return func(result <-chan PipelineData) <-chan PipelineData { | ||
return unzip(result) | ||
} | ||
} | ||
|
||
func unzip(result <-chan PipelineData) <-chan PipelineData { | ||
fmt.Println("Starting unzip pipeline") | ||
out := make(chan PipelineData, 100) | ||
go func() { | ||
|
||
for res := range result { | ||
res.Result.Meta.UnzipFolder = "backup" | ||
select { | ||
case out <- res: | ||
case <-res.Ctx.Done(): | ||
res.Done <- nil | ||
} | ||
} | ||
fmt.Println("Closing unzip") | ||
close(out) | ||
}() | ||
return out | ||
} | ||
|
||
// ParseOrg returns PhaseOnePipelineProcessor | ||
func ParseOrg() PhaseOnePipelineProcessor { | ||
return func(result <-chan PipelineData) <-chan PipelineData { | ||
return parseOrg(result) | ||
} | ||
} | ||
|
||
func parseOrg(result <-chan PipelineData) <-chan PipelineData { | ||
fmt.Println("Starting to parse_orgs pipeline") | ||
|
||
out := make(chan PipelineData, 100) | ||
|
||
go func() { | ||
fmt.Println("Processing to parse orgs...") | ||
for res := range result { | ||
select { | ||
case out <- res: | ||
case <-res.Ctx.Done(): | ||
res.Done <- nil | ||
} | ||
fmt.Println("after write") | ||
} | ||
fmt.Println("CLosing parse_orgs pipeline") | ||
close(out) | ||
}() | ||
return out | ||
} | ||
|
||
// ParseUser returns PhaseOnePipelineProcessor | ||
func ParseUser() PhaseOnePipelineProcessor { | ||
return func(result <-chan PipelineData) <-chan PipelineData { | ||
return parseUser(result) | ||
} | ||
} | ||
|
||
func parseUser(result <-chan PipelineData) <-chan PipelineData { | ||
fmt.Println("Starting to parse_user pipeline") | ||
|
||
out := make(chan PipelineData, 100) | ||
|
||
go func() { | ||
fmt.Println("Processing to parse_user...") | ||
for res := range result { | ||
select { | ||
case out <- res: | ||
case <-res.Ctx.Done(): | ||
res.Done <- nil | ||
} | ||
} | ||
fmt.Println("Closing parse_user") | ||
close(out) | ||
}() | ||
|
||
return out | ||
} | ||
|
||
// ConflictingUsers returns PhaseOnePipelineProcessor | ||
func ConflictingUsers() PhaseOnePipelineProcessor { | ||
return func(result <-chan PipelineData) <-chan PipelineData { | ||
return conflictingUsers(result) | ||
} | ||
} | ||
|
||
func conflictingUsers(result <-chan PipelineData) <-chan PipelineData { | ||
fmt.Println("Starting to conflicting_user check pipeline") | ||
|
||
out := make(chan PipelineData, 100) | ||
|
||
go func() { | ||
fmt.Println("Processing to conflicting_user users...") | ||
|
||
for res := range result { | ||
select { | ||
case out <- res: | ||
case <-res.Ctx.Done(): | ||
res.Done <- nil | ||
} | ||
} | ||
fmt.Println("Closing conflicting_user") | ||
close(out) | ||
|
||
}() | ||
|
||
return out | ||
} | ||
|
||
// OrgMembers returns PhaseOnePipelineProcessor | ||
func OrgMembers() PhaseOnePipelineProcessor { | ||
return func(result <-chan PipelineData) <-chan PipelineData { | ||
return orgMembers(result) | ||
} | ||
} | ||
|
||
func orgMembers(result <-chan PipelineData) <-chan PipelineData { | ||
fmt.Println("Starting to org_user check pipeline") | ||
|
||
out := make(chan PipelineData, 100) | ||
|
||
go func() { | ||
fmt.Println("Processing to check org_user association...") | ||
for res := range result { | ||
select { | ||
case out <- res: | ||
case <-res.Ctx.Done(): | ||
res.Done <- nil | ||
} | ||
} | ||
fmt.Println("Closing org_user association check") | ||
close(out) | ||
|
||
}() | ||
|
||
return out | ||
} | ||
|
||
// AdminUsers Return PhaseOnePipelineProcessor | ||
func AdminUsers() PhaseOnePipelineProcessor { | ||
return func(result <-chan PipelineData) <-chan PipelineData { | ||
return adminUsers(result) | ||
} | ||
} | ||
|
||
func adminUsers(result <-chan PipelineData) <-chan PipelineData { | ||
fmt.Println("Starting org admin_users check ") | ||
|
||
out := make(chan PipelineData, 100) | ||
|
||
go func() { | ||
fmt.Println("Processing to to check admin_users...") | ||
for res := range result { | ||
select { | ||
case out <- res: | ||
case <-res.Ctx.Done(): | ||
res.Done <- nil | ||
} | ||
} | ||
fmt.Println("Closing admin_users") | ||
close(out) | ||
|
||
}() | ||
|
||
return out | ||
} | ||
|
||
func migrationPipeline(source <-chan PipelineData, pipes ...PhaseOnePipelineProcessor) { | ||
fmt.Println("Pipeline started...") | ||
|
||
go func() { | ||
for _, pipe := range pipes { | ||
source = pipe(source) | ||
} | ||
|
||
for s := range source { | ||
s.Done <- nil | ||
} | ||
}() | ||
} | ||
|
||
func SetupPhaseOnePipeline() PhaseOnePipleine { | ||
c := make(chan PipelineData, 100) | ||
migrationPipeline(c, | ||
UnzipSrc(), | ||
ParseOrg(), | ||
ParseUser(), | ||
ConflictingUsers(), | ||
OrgMembers(), | ||
AdminUsers(), | ||
) | ||
return PhaseOnePipleine{in: c} | ||
} | ||
|
||
func (p *PhaseOnePipleine) Run(result Result) { | ||
go func() { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
done := make(chan error) | ||
select { | ||
case p.in <- PipelineData{Result: result, Done: done, Ctx: ctx}: | ||
} | ||
err := <-done | ||
if err != nil { | ||
fmt.Println("received error") | ||
} | ||
fmt.Println("received done") | ||
}() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,16 +1,20 @@ | ||
package migrations | ||
|
||
import ( | ||
"github.com/chef/automate/components/infra-proxy-service/migrations/pipeline" | ||
"github.com/chef/automate/components/infra-proxy-service/service" | ||
) | ||
|
||
type MigrationServer struct { | ||
service *service.Service | ||
service *service.Service | ||
phaseOnePipeline pipeline.PhaseOnePipleine | ||
} | ||
|
||
// NewMigrationServer returns an infra-proxy migration server | ||
func NewMigrationServer(service *service.Service) *MigrationServer { | ||
c := pipeline.SetupPhaseOnePipeline() | ||
return &MigrationServer{ | ||
service: service, | ||
service: service, | ||
phaseOnePipeline: c, | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
package pipeline | ||
|
||
type ActionOps int | ||
|
||
const ( | ||
Insert ActionOps = 1 + iota | ||
Skip | ||
Delete | ||
Update | ||
) | ||
|
||
// | ||
type Result struct { | ||
// Meta for Zip file info | ||
Meta Meta `json:"meta"` | ||
|
||
// ParsedResult for Orgs, Users | ||
ParsedResult ParsedResult `json:"parsed_result"` | ||
} | ||
|
||
type Meta struct { | ||
// StageResults holds Skipped instances | ||
StageResults []StageResult `json:"stage_results"` | ||
|
||
// ZipFile for zip file location | ||
ZipFile string `json:"zip_file"` | ||
|
||
// UnzipFolder for unzipped folder's location | ||
UnzipFolder string `json:"unzip_folder"` | ||
} | ||
|
||
type StageResult struct { | ||
StageName string `json:"stage_name"` | ||
IsSuccess bool `json:"is_success"` | ||
Failure error `json:"failure"` | ||
} | ||
|
||
type ParsedResult struct { | ||
// Orgs array | ||
Orgs []Org `json:"orgs"` | ||
|
||
// Users array | ||
Users []User `json:"users"` | ||
|
||
// OrgsUsers for Orgs and Users associations | ||
OrgsUsers []OrgsUsersAssociations `json:"orgs_users_associations"` | ||
} | ||
|
||
// OrgsUsersAssociations | ||
type OrgsUsersAssociations struct { | ||
// OrgName | ||
OrgName Org `json:"org_name"` | ||
|
||
// Users UserAssociation slice | ||
Users []UserAssociation `json:"user_association"` | ||
} | ||
type UserAssociation struct { | ||
// Username | ||
Username string `json:"username"` | ||
|
||
// IsAdmin | ||
IsAdmin bool `json:"is_admin"` | ||
} | ||
|
||
type KeyDump struct { | ||
ID string `json:"id"` | ||
AuthzID string `json:"authz_id"` | ||
Username string `json:"username"` | ||
Email string `json:"email"` | ||
PubkeyVersion int `json:"pubkey_version"` | ||
PublicKey interface{} `json:"public_key"` | ||
SerializedObject string `json:"serialized_object"` | ||
LastUpdatedBy string `json:"last_updated_by"` | ||
CreatedAt string `json:"created_at"` | ||
UpdatedAt string `json:"updated_at"` | ||
ExternalAuthenticationUID interface{} `json:"external_authentication_uid"` | ||
RecoveryAuthenticationEnabled interface{} `json:"recovery_authentication_enabled"` | ||
Admin bool `json:"admin"` | ||
HashedPassword interface{} `json:"hashed_password"` | ||
Salt interface{} `json:"salt"` | ||
HashType interface{} `json:"hash_type"` | ||
} | ||
|
||
type Org struct { | ||
|
||
// Org Name | ||
Name string `json:"name"` | ||
|
||
// FullName | ||
FullName string `json:"full_name"` | ||
|
||
// ActionOps for Insert Skip Update and Delete | ||
ActionOps ActionOps `json:"guid"` | ||
} | ||
|
||
type User struct { | ||
Username string `json:"username"` | ||
Email string `json:"email"` | ||
DisplayName string `json:"display_name"` | ||
FirstName string `json:"first_name"` | ||
LastName string `json:"last_name"` | ||
MiddleName string `json:"middle_name"` | ||
|
||
// AutomateUsername is ldap username | ||
AutomateUsername string `json:"automate_username"` | ||
|
||
// Connector ldap user | ||
Connector string `json:"connector"` | ||
|
||
// IsConflicting for user's existance in db | ||
IsConflicting bool `json:"is_conflicting"` | ||
|
||
// IsAdmin (user is admin or not) | ||
IsAdmin bool `json:"is_admin"` | ||
} |