Skip to content

Commit

Permalink
Merge pull request #302 from Revolyssup/noti
Browse files Browse the repository at this point in the history
Add Event streamer struct
  • Loading branch information
Revolyssup authored Sep 16, 2022
2 parents 0a17b86 + 83dd902 commit 4d36c22
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 24 deletions.
8 changes: 4 additions & 4 deletions consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ import (
"github.com/layer5io/meshkit/logger"
"github.com/layer5io/meshkit/models"
"github.com/layer5io/meshkit/models/oam/core/v1alpha1"
"github.com/layer5io/meshkit/utils/events"
"gopkg.in/yaml.v2"
)

type Consul struct {
adapter.Adapter
}

func New(config config.Handler, log logger.Handler, kubeConfig config.Handler) adapter.Handler {
func New(config config.Handler, log logger.Handler, kubeConfig config.Handler, e *events.EventStreamer) adapter.Handler {
return &Consul{
adapter.Adapter{Config: config, Log: log, KubeconfigHandler: kubeConfig},
adapter.Adapter{Config: config, Log: log, KubeconfigHandler: kubeConfig, EventStreamer: e},
}
}

Expand Down Expand Up @@ -82,12 +83,11 @@ func (h *Consul) CreateKubeconfigs(kubeconfigs []string) error {
}

// ProcessOAM will handles the grpc invocation for handling OAM objects
func (h *Consul) ProcessOAM(ctx context.Context, oamReq adapter.OAMRequest, hchan *chan interface{}) (string, error) {
func (h *Consul) ProcessOAM(ctx context.Context, oamReq adapter.OAMRequest) (string, error) {
err := h.CreateKubeconfigs(oamReq.K8sConfigs)
if err != nil {
return "", err
}
h.SetChannel(hchan)
kubeconfigs := oamReq.K8sConfigs
var comps []v1alpha1.Component
for _, acomp := range oamReq.OamComps {
Expand Down
27 changes: 24 additions & 3 deletions consul/oam.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"strings"
"sync"

"github.com/google/uuid"
"github.com/layer5io/meshery-adapter-library/meshes"
"github.com/layer5io/meshery-consul/internal/config"
"github.com/layer5io/meshkit/models/oam/core/v1alpha1"
mesherykube "github.com/layer5io/meshkit/utils/kubernetes"
"gopkg.in/yaml.v2"
Expand All @@ -16,29 +19,47 @@ type CompHandler func(*Consul, v1alpha1.Component, bool, []string) (string, erro
func (h *Consul) HandleComponents(comps []v1alpha1.Component, isDel bool, kubeconfigs []string) (string, error) {
var errs []error
var msgs []string

stat1 := "deploying"
stat2 := "deployed"
if isDel {
stat1 = "removing"
stat2 = "removed"
}
compFuncMap := map[string]CompHandler{
"ConsulMesh": handleComponentConsulMesh,
}
for _, comp := range comps {
ee := &meshes.EventsResponse{
OperationId: uuid.New().String(),
Component: config.ServerDefaults["type"],
ComponentName: config.ServerDefaults["name"],
}
fnc, ok := compFuncMap[comp.Spec.Type]
if !ok {
msg, err := handleConsulCoreComponents(h, comp, isDel, "", "", kubeconfigs)
if err != nil {
ee.Summary = fmt.Sprintf("Error while %s %s", stat1, comp.Spec.Type)
h.streamErr(ee.Summary, ee, err)
errs = append(errs, err)
continue
}

ee.Summary = fmt.Sprintf("%s %s successfully", comp.Spec.Type, stat2)
ee.Details = fmt.Sprintf("The %s is now %s.", comp.Spec.Type, stat2)
h.StreamInfo(ee)
msgs = append(msgs, msg)
continue
}

msg, err := fnc(h, comp, isDel, kubeconfigs)
if err != nil {
ee.Summary = fmt.Sprintf("Error while %s %s", stat1, comp.Spec.Type)
h.streamErr(ee.Summary, ee, err)
errs = append(errs, err)
continue
}

ee.Summary = fmt.Sprintf("%s %s %s successfully", comp.Name, comp.Spec.Type, stat2)
ee.Details = fmt.Sprintf("The %s %s is now %s.", comp.Name, comp.Spec.Type, stat2)
h.StreamInfo(ee)
msgs = append(msgs, msg)
}
if err := mergeErrors(errs); err != nil {
Expand Down
15 changes: 7 additions & 8 deletions consul/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@ import (
mesherykube "github.com/layer5io/meshkit/utils/kubernetes"
)

func (h *Consul) ApplyOperation(ctx context.Context, request adapter.OperationRequest, hchan *chan interface{}) error {
func (h *Consul) ApplyOperation(ctx context.Context, request adapter.OperationRequest) error {
err := h.CreateKubeconfigs(request.K8sConfigs)
if err != nil {
return err
}
h.SetChannel(hchan)
kubeconfigs := request.K8sConfigs
operations := make(adapter.Operations)
err = h.Config.GetObject(adapter.OperationsKey, &operations)
Expand All @@ -42,10 +41,10 @@ func (h *Consul) ApplyOperation(ctx context.Context, request adapter.OperationRe

//status := opstatus.Deploying
e := &meshes.EventsResponse{
OperationId: request.OperationID,
Summary: "Deploying",
Details: "None",
Component: config.ServerDefaults["type"],
OperationId: request.OperationID,
Summary: "Deploying",
Details: "None",
Component: config.ServerDefaults["type"],
ComponentName: config.ServerDefaults["name"],
}

Expand Down Expand Up @@ -114,7 +113,7 @@ func (h *Consul) ApplyOperation(ctx context.Context, request adapter.OperationRe
APIServerURL: kClient.RestConfig.Host,
})
if err1 != nil {
summary := fmt.Sprintf("Unable to retrieve service endpoint for the service %s.", svc)
summary := fmt.Sprintf("Unable to retrieve service endpoint for the service %s.", svc)
h.streamErr(summary, e, err1)
} else {
external := "N/A"
Expand Down Expand Up @@ -142,7 +141,7 @@ func (h *Consul) ApplyOperation(ctx context.Context, request adapter.OperationRe
return nil
}

func(h *Consul) streamErr(summary string, e *meshes.EventsResponse, err error) {
func (h *Consul) streamErr(summary string, e *meshes.EventsResponse, err error) {
e.Summary = summary
e.Details = err.Error()
e.ErrorCode = errors.GetCode(err)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ replace (
)

require (
github.com/layer5io/meshery-adapter-library v0.5.9
github.com/layer5io/meshkit v0.5.32
github.com/layer5io/meshery-adapter-library v0.5.10
github.com/layer5io/meshkit v0.5.37
github.com/layer5io/service-mesh-performance v0.3.4
gopkg.in/yaml.v2 v2.4.0
)
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -865,10 +865,10 @@ github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6Fm
github.com/layer5io/kuttl v0.4.1-0.20200723152044-916f10574334/go.mod h1:UmrVd7x+bNVKrpmKgTtfRiTKHZeNPcMjQproJ0vGwhE=
github.com/layer5io/learn-layer5/smi-conformance v0.0.0-20210317075357-06b4f88b3e34 h1:QaViadDOBCMDUwYx78kfRvHMkzRVnh/GOhm3s2gxoP4=
github.com/layer5io/learn-layer5/smi-conformance v0.0.0-20210317075357-06b4f88b3e34/go.mod h1:BQPLwdJt7v7y0fXIejI4whR9zMyX07Wjt5xrbgEmHLw=
github.com/layer5io/meshery-adapter-library v0.5.9 h1:Zp79l4J8kMjML9zAQ4Xu4QiKM5q5HEGcv04Jjg+xWSA=
github.com/layer5io/meshery-adapter-library v0.5.9/go.mod h1:IvURQMnZHa3z0OTcUSPqCHUgTsW2x0/+KjCqpYfMbt0=
github.com/layer5io/meshkit v0.5.32 h1:jIkQ9gKH7TPMWKbVtf6wQ+qv4553UyZ9SV4yKA2D4oo=
github.com/layer5io/meshkit v0.5.32/go.mod h1:dt0uOluDzatK6hbJEDAZbUsm7LJNb4nsXXaGUDtYxD0=
github.com/layer5io/meshery-adapter-library v0.5.10 h1:Qgr6vDx2s10mkhtk7Mnz5I73m/9yf2yyjCkPMeB4jmA=
github.com/layer5io/meshery-adapter-library v0.5.10/go.mod h1:Sg6WNN82uRo2kiFDEMc/LM/AJ/Pu6ZmBZGbFxZuC7zc=
github.com/layer5io/meshkit v0.5.37 h1:EO0wXAI+eqAm+4uKSzFd50rDkr6nqQ17m1j0wmv9hQA=
github.com/layer5io/meshkit v0.5.37/go.mod h1:dt0uOluDzatK6hbJEDAZbUsm7LJNb4nsXXaGUDtYxD0=
github.com/layer5io/service-mesh-performance v0.3.2-0.20210122142912-a94e0658b021/go.mod h1:W153amv8aHAeIWxO7b7d7Vibt9RhaEVh4Uh+RG+BumQ=
github.com/layer5io/service-mesh-performance v0.3.4 h1:aw/elsx0wkry7SyiQRIj31wW7TPCP4YfhINdNOLXVg8=
github.com/layer5io/service-mesh-performance v0.3.4/go.mod h1:W153amv8aHAeIWxO7b7d7Vibt9RhaEVh4Uh+RG+BumQ=
Expand Down
7 changes: 4 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/layer5io/meshery-consul/consul/oam"
"github.com/layer5io/meshery-consul/internal/config"
configprovider "github.com/layer5io/meshkit/config/provider"
"github.com/layer5io/meshkit/utils/events"

"github.com/layer5io/meshery-adapter-library/api/grpc"
"github.com/layer5io/meshery-consul/build"
Expand Down Expand Up @@ -73,9 +74,9 @@ func main() {
os.Exit(1)
}
log.Info(fmt.Sprintf("KUBECONFIG: %s", kubeconfig))

service.Handler = consul.New(cfg, log, kubeCfg)
service.Channel = make(chan interface{}, 100)
e := events.NewEventStreamer()
service.Handler = consul.New(cfg, log, kubeCfg, e)
service.EventStreamer = e
service.StartedAt = time.Now()
service.Version = version
service.GitSHA = gitsha
Expand Down

0 comments on commit 4d36c22

Please sign in to comment.