From 83dd902a4120ac7bbc0147977479a87653065a97 Mon Sep 17 00:00:00 2001 From: Ashish Tiwari Date: Thu, 1 Sep 2022 23:38:39 +0530 Subject: [PATCH] Add Event streamer struct Signed-off-by: Ashish Tiwari --- consul/consul.go | 8 ++++---- consul/oam.go | 27 ++++++++++++++++++++++++--- consul/operations.go | 15 +++++++-------- go.mod | 4 ++-- go.sum | 8 ++++---- main.go | 7 ++++--- 6 files changed, 45 insertions(+), 24 deletions(-) diff --git a/consul/consul.go b/consul/consul.go index 5ca7d0d9..31186b07 100644 --- a/consul/consul.go +++ b/consul/consul.go @@ -23,6 +23,7 @@ import ( "github.com/layer5io/meshkit/logger" "github.com/layer5io/meshkit/models" "github.com/layer5io/meshkit/models/oam/core/v1alpha1" + "github.com/layer5io/meshkit/utils/events" "gopkg.in/yaml.v2" ) @@ -30,9 +31,9 @@ type Consul struct { adapter.Adapter } -func New(config config.Handler, log logger.Handler, kubeConfig config.Handler) adapter.Handler { +func New(config config.Handler, log logger.Handler, kubeConfig config.Handler, e *events.EventStreamer) adapter.Handler { return &Consul{ - adapter.Adapter{Config: config, Log: log, KubeconfigHandler: kubeConfig}, + adapter.Adapter{Config: config, Log: log, KubeconfigHandler: kubeConfig, EventStreamer: e}, } } @@ -82,12 +83,11 @@ func (h *Consul) CreateKubeconfigs(kubeconfigs []string) error { } // ProcessOAM will handles the grpc invocation for handling OAM objects -func (h *Consul) ProcessOAM(ctx context.Context, oamReq adapter.OAMRequest, hchan *chan interface{}) (string, error) { +func (h *Consul) ProcessOAM(ctx context.Context, oamReq adapter.OAMRequest) (string, error) { err := h.CreateKubeconfigs(oamReq.K8sConfigs) if err != nil { return "", err } - h.SetChannel(hchan) kubeconfigs := oamReq.K8sConfigs var comps []v1alpha1.Component for _, acomp := range oamReq.OamComps { diff --git a/consul/oam.go b/consul/oam.go index 401efb0d..c946af02 100644 --- a/consul/oam.go +++ b/consul/oam.go @@ -5,6 +5,9 @@ import ( "strings" "sync" + "github.com/google/uuid" + "github.com/layer5io/meshery-adapter-library/meshes" + "github.com/layer5io/meshery-consul/internal/config" "github.com/layer5io/meshkit/models/oam/core/v1alpha1" mesherykube "github.com/layer5io/meshkit/utils/kubernetes" "gopkg.in/yaml.v2" @@ -16,29 +19,47 @@ type CompHandler func(*Consul, v1alpha1.Component, bool, []string) (string, erro func (h *Consul) HandleComponents(comps []v1alpha1.Component, isDel bool, kubeconfigs []string) (string, error) { var errs []error var msgs []string - + stat1 := "deploying" + stat2 := "deployed" + if isDel { + stat1 = "removing" + stat2 = "removed" + } compFuncMap := map[string]CompHandler{ "ConsulMesh": handleComponentConsulMesh, } for _, comp := range comps { + ee := &meshes.EventsResponse{ + OperationId: uuid.New().String(), + Component: config.ServerDefaults["type"], + ComponentName: config.ServerDefaults["name"], + } fnc, ok := compFuncMap[comp.Spec.Type] if !ok { msg, err := handleConsulCoreComponents(h, comp, isDel, "", "", kubeconfigs) if err != nil { + ee.Summary = fmt.Sprintf("Error while %s %s", stat1, comp.Spec.Type) + h.streamErr(ee.Summary, ee, err) errs = append(errs, err) continue } - + ee.Summary = fmt.Sprintf("%s %s successfully", comp.Spec.Type, stat2) + ee.Details = fmt.Sprintf("The %s is now %s.", comp.Spec.Type, stat2) + h.StreamInfo(ee) msgs = append(msgs, msg) continue } msg, err := fnc(h, comp, isDel, kubeconfigs) if err != nil { + ee.Summary = fmt.Sprintf("Error while %s %s", stat1, comp.Spec.Type) + h.streamErr(ee.Summary, ee, err) errs = append(errs, err) continue } - + ee.Summary = fmt.Sprintf("%s %s %s successfully", comp.Name, comp.Spec.Type, stat2) + ee.Details = fmt.Sprintf("The %s %s is now %s.", comp.Name, comp.Spec.Type, stat2) + h.StreamInfo(ee) msgs = append(msgs, msg) } if err := mergeErrors(errs); err != nil { diff --git a/consul/operations.go b/consul/operations.go index 78ed6657..5b3ac1ca 100644 --- a/consul/operations.go +++ b/consul/operations.go @@ -27,12 +27,11 @@ import ( mesherykube "github.com/layer5io/meshkit/utils/kubernetes" ) -func (h *Consul) ApplyOperation(ctx context.Context, request adapter.OperationRequest, hchan *chan interface{}) error { +func (h *Consul) ApplyOperation(ctx context.Context, request adapter.OperationRequest) error { err := h.CreateKubeconfigs(request.K8sConfigs) if err != nil { return err } - h.SetChannel(hchan) kubeconfigs := request.K8sConfigs operations := make(adapter.Operations) err = h.Config.GetObject(adapter.OperationsKey, &operations) @@ -42,10 +41,10 @@ func (h *Consul) ApplyOperation(ctx context.Context, request adapter.OperationRe //status := opstatus.Deploying e := &meshes.EventsResponse{ - OperationId: request.OperationID, - Summary: "Deploying", - Details: "None", - Component: config.ServerDefaults["type"], + OperationId: request.OperationID, + Summary: "Deploying", + Details: "None", + Component: config.ServerDefaults["type"], ComponentName: config.ServerDefaults["name"], } @@ -114,7 +113,7 @@ func (h *Consul) ApplyOperation(ctx context.Context, request adapter.OperationRe APIServerURL: kClient.RestConfig.Host, }) if err1 != nil { - summary := fmt.Sprintf("Unable to retrieve service endpoint for the service %s.", svc) + summary := fmt.Sprintf("Unable to retrieve service endpoint for the service %s.", svc) h.streamErr(summary, e, err1) } else { external := "N/A" @@ -142,7 +141,7 @@ func (h *Consul) ApplyOperation(ctx context.Context, request adapter.OperationRe return nil } -func(h *Consul) streamErr(summary string, e *meshes.EventsResponse, err error) { +func (h *Consul) streamErr(summary string, e *meshes.EventsResponse, err error) { e.Summary = summary e.Details = err.Error() e.ErrorCode = errors.GetCode(err) diff --git a/go.mod b/go.mod index 30161288..c8f89499 100644 --- a/go.mod +++ b/go.mod @@ -13,8 +13,8 @@ replace ( ) require ( - github.com/layer5io/meshery-adapter-library v0.5.9 - github.com/layer5io/meshkit v0.5.32 + github.com/layer5io/meshery-adapter-library v0.5.10 + github.com/layer5io/meshkit v0.5.37 github.com/layer5io/service-mesh-performance v0.3.4 gopkg.in/yaml.v2 v2.4.0 ) diff --git a/go.sum b/go.sum index 82f19b3e..765654dd 100644 --- a/go.sum +++ b/go.sum @@ -865,10 +865,10 @@ github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6Fm github.com/layer5io/kuttl v0.4.1-0.20200723152044-916f10574334/go.mod h1:UmrVd7x+bNVKrpmKgTtfRiTKHZeNPcMjQproJ0vGwhE= github.com/layer5io/learn-layer5/smi-conformance v0.0.0-20210317075357-06b4f88b3e34 h1:QaViadDOBCMDUwYx78kfRvHMkzRVnh/GOhm3s2gxoP4= github.com/layer5io/learn-layer5/smi-conformance v0.0.0-20210317075357-06b4f88b3e34/go.mod h1:BQPLwdJt7v7y0fXIejI4whR9zMyX07Wjt5xrbgEmHLw= -github.com/layer5io/meshery-adapter-library v0.5.9 h1:Zp79l4J8kMjML9zAQ4Xu4QiKM5q5HEGcv04Jjg+xWSA= -github.com/layer5io/meshery-adapter-library v0.5.9/go.mod h1:IvURQMnZHa3z0OTcUSPqCHUgTsW2x0/+KjCqpYfMbt0= -github.com/layer5io/meshkit v0.5.32 h1:jIkQ9gKH7TPMWKbVtf6wQ+qv4553UyZ9SV4yKA2D4oo= -github.com/layer5io/meshkit v0.5.32/go.mod h1:dt0uOluDzatK6hbJEDAZbUsm7LJNb4nsXXaGUDtYxD0= +github.com/layer5io/meshery-adapter-library v0.5.10 h1:Qgr6vDx2s10mkhtk7Mnz5I73m/9yf2yyjCkPMeB4jmA= +github.com/layer5io/meshery-adapter-library v0.5.10/go.mod h1:Sg6WNN82uRo2kiFDEMc/LM/AJ/Pu6ZmBZGbFxZuC7zc= +github.com/layer5io/meshkit v0.5.37 h1:EO0wXAI+eqAm+4uKSzFd50rDkr6nqQ17m1j0wmv9hQA= +github.com/layer5io/meshkit v0.5.37/go.mod h1:dt0uOluDzatK6hbJEDAZbUsm7LJNb4nsXXaGUDtYxD0= github.com/layer5io/service-mesh-performance v0.3.2-0.20210122142912-a94e0658b021/go.mod h1:W153amv8aHAeIWxO7b7d7Vibt9RhaEVh4Uh+RG+BumQ= github.com/layer5io/service-mesh-performance v0.3.4 h1:aw/elsx0wkry7SyiQRIj31wW7TPCP4YfhINdNOLXVg8= github.com/layer5io/service-mesh-performance v0.3.4/go.mod h1:W153amv8aHAeIWxO7b7d7Vibt9RhaEVh4Uh+RG+BumQ= diff --git a/main.go b/main.go index 0fdbbee5..f624f622 100644 --- a/main.go +++ b/main.go @@ -26,6 +26,7 @@ import ( "github.com/layer5io/meshery-consul/consul/oam" "github.com/layer5io/meshery-consul/internal/config" configprovider "github.com/layer5io/meshkit/config/provider" + "github.com/layer5io/meshkit/utils/events" "github.com/layer5io/meshery-adapter-library/api/grpc" "github.com/layer5io/meshery-consul/build" @@ -73,9 +74,9 @@ func main() { os.Exit(1) } log.Info(fmt.Sprintf("KUBECONFIG: %s", kubeconfig)) - - service.Handler = consul.New(cfg, log, kubeCfg) - service.Channel = make(chan interface{}, 100) + e := events.NewEventStreamer() + service.Handler = consul.New(cfg, log, kubeCfg, e) + service.EventStreamer = e service.StartedAt = time.Now() service.Version = version service.GitSHA = gitsha