Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Meshync Dynamic Configuration from Meshsync #258

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
63ab2a3
add MeshsyncConfig used to receive config from meshync
KiptoonKipkurui Oct 11, 2023
8fecc42
add functionality to process config from
KiptoonKipkurui Oct 11, 2023
c2f557e
refactor to obtain incoporate configs from meshsync
KiptoonKipkurui Oct 11, 2023
80d8263
Merge branch 'master' into feat/kiptoonkipkurui/meshsync_crd
KiptoonKipkurui Oct 11, 2023
439496a
ensure the cr is present before using it
KiptoonKipkurui Oct 23, 2023
2ee712f
check conversion of spec for success
KiptoonKipkurui Oct 23, 2023
543b383
Merge branch 'meshery:master' into feat/kiptoonkipkurui/meshsync_crd
KiptoonKipkurui Oct 23, 2023
55d3cc3
refactor to add checks before using spec map
KiptoonKipkurui Oct 23, 2023
3dcd825
update variable namings for better understandability
KiptoonKipkurui Oct 26, 2023
042eb80
add slice package
KiptoonKipkurui Oct 29, 2023
46256a9
add watched resource model
KiptoonKipkurui Oct 29, 2023
c82b97c
refactor to allow configuration of resources
KiptoonKipkurui Oct 29, 2023
4837992
check whether an event type is supported before
KiptoonKipkurui Oct 29, 2023
95c9d15
check for presence of configs from piplines and
KiptoonKipkurui Oct 29, 2023
0e99363
safe initialize pipeline
KiptoonKipkurui Oct 29, 2023
d1b0c76
go mod tidy package management
KiptoonKipkurui Nov 4, 2023
a01a14a
refactor to supply dynamic kubernetes client
KiptoonKipkurui Nov 4, 2023
6d68702
add unit tests for crd configuration
KiptoonKipkurui Nov 4, 2023
be975d8
add default events to be tracked
KiptoonKipkurui Nov 4, 2023
077384f
refactor for easier testing
KiptoonKipkurui Nov 4, 2023
99d89b2
ensure either whitelist or blacklisted resources allowed
KiptoonKipkurui Nov 7, 2023
89d41ad
Merge branch 'master' into feat/kiptoonkipkurui/meshsync_crd
KiptoonKipkurui Nov 7, 2023
988de5b
update packages to get dynamic kubernetes go client
KiptoonKipkurui Nov 7, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ require (
go.opentelemetry.io/otel/trace v1.16.0 // indirect
go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
golang.org/x/oauth2 v0.10.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.13.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI=
golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down
128 changes: 128 additions & 0 deletions internal/config/crd_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package config

import (
"context"
"errors"

"github.com/layer5io/meshkit/utils"
mesherykube "github.com/layer5io/meshkit/utils/kubernetes"
"golang.org/x/exp/slices"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)

var (
namespace = "meshery" // Namespace for the Custom Resource
crName = "meshery-meshsync" // Name of the custom resource
version = "v1alpha1" // Version of the Custom Resource
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you can import it rather than hard coded it https://github.com/meshery/meshery-operator/blob/master/api/v1alpha1/meshsync_types.go. Same like metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will update meshsync with this change immediately the meshery operator is approved with the CRD changes

group = "meshery.layer5.io" //Group for the Custom Resource
resource = "meshsyncs" //Name of the Resource
)

func GetMeshsyncCRDConfigs() (*MeshsyncConfig, error) {
// Initialize kubeclient
kubeClient, err := mesherykube.New(nil)
if err != nil {
return nil, ErrInitConfig(err)
}
// initialize the dynamic kube client
dyClient := kubeClient.DynamicKubeClient

// initialize the group version resource to access the custom resource
gvr := schema.GroupVersionResource{Version: version, Group: group, Resource: resource}

// make a call to get the custom resource
crd, err := dyClient.Resource(gvr).Namespace(namespace).Get(context.TODO(), crName, metav1.GetOptions{})

if err != nil {
return nil, ErrInitConfig(err)
}

KiptoonKipkurui marked this conversation as resolved.
Show resolved Hide resolved
if crd == nil {
return nil, ErrInitConfig(errors.New("Custom Resource is nil"))
}

spec := crd.Object["spec"]
specMap, ok := spec.(map[string]interface{})
if !ok {
return nil, ErrInitConfig(errors.New("Unable to convert spec to map"))
}
configObj := specMap["watch-list"]
if configObj == nil {
return nil, ErrInitConfig(errors.New("Custom Resource does not have Meshsync Configs"))
}
configStr, err := utils.Marshal(configObj)
if err != nil {
return nil, ErrInitConfig(err)
}
meshsyncConfig := MeshsyncConfig{}
configMap := corev1.ConfigMap{}
err = utils.Unmarshal(string(configStr), &configMap)

if err != nil {
return nil, ErrInitConfig(err)
}

if _, ok := configMap.Data["blacklist"]; ok {
err = utils.Unmarshal(configMap.Data["blacklist"], &meshsyncConfig.BlackList)
if err != nil {
return nil, ErrInitConfig(err)
}
}

if _, ok := configMap.Data["whitelist"]; ok {
err = utils.Unmarshal(configMap.Data["whitelist"], &meshsyncConfig.WhiteList)
if err != nil {
return nil, ErrInitConfig(err)
}
}

// Handle global resources
globalPipelines := make(PipelineConfigs, 0)
for _, v := range Pipelines[GlobalResourceKey] {
if idx := slices.IndexFunc(meshsyncConfig.WhiteList, func(c ResourceConfig) bool { return c.Resource == v.Name }); idx != -1 {
config := meshsyncConfig.WhiteList[idx]
v.Events = config.Events
globalPipelines = append(globalPipelines, v)
}
}
if len(globalPipelines) > 0 {
meshsyncConfig.Pipelines = map[string]PipelineConfigs{}
meshsyncConfig.Pipelines[GlobalResourceKey] = globalPipelines
}

// Handle local resources
localPipelines := make(PipelineConfigs, 0)
for _, v := range Pipelines[LocalResourceKey] {
if idx := slices.IndexFunc(meshsyncConfig.WhiteList, func(c ResourceConfig) bool { return c.Resource == v.Name }); idx != -1 {
config := meshsyncConfig.WhiteList[idx]
v.Events = config.Events
localPipelines = append(localPipelines, v)
}
}

if len(localPipelines) > 0 {
if meshsyncConfig.Pipelines == nil {
meshsyncConfig.Pipelines = make(map[string]PipelineConfigs)
}
meshsyncConfig.Pipelines[LocalResourceKey] = localPipelines
}

// Handle listeners
listerners := make(ListenerConfigs, 0)
for _, v := range Listeners {
if idx := slices.IndexFunc(meshsyncConfig.WhiteList, func(c ResourceConfig) bool { return c.Resource == v.Name }); idx != -1 {
config := meshsyncConfig.WhiteList[idx]
v.Events = config.Events
listerners = append(listerners, v)
}
}

if len(listerners) > 0 {
meshsyncConfig.Listeners = make(map[string]ListenerConfig)
meshsyncConfig.Listeners = Listeners
}

return &meshsyncConfig, nil
}
28 changes: 22 additions & 6 deletions internal/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,31 @@ const (
type PipelineConfigs []PipelineConfig

type PipelineConfig struct {
Name string `json:"name" yaml:"name"`
PublishTo string `json:"publish-to" yaml:"publish-to"`
Name string `json:"name" yaml:"name"`
PublishTo string `json:"publish-to" yaml:"publish-to"`
Events []string `json:"events" yaml:"events"`
}

type ListenerConfigs []ListenerConfig

type ListenerConfig struct {
Name string `json:"name" yaml:"name"`
ConnectionName string `json:"connection-name" yaml:"connection-name"`
PublishTo string `json:"publish-to" yaml:"publish-to"`
SubscribeTo string `json:"subscribe-to" yaml:"subscribe-to"`
Name string `json:"name" yaml:"name"`
ConnectionName string `json:"connection-name" yaml:"connection-name"`
PublishTo string `json:"publish-to" yaml:"publish-to"`
SubscribeTo string `json:"subscribe-to" yaml:"subscribe-to"`
Events []string `json:"events" yaml:"events"`
}

// Meshsync configuration controls the resources meshsync produces and consumes
type MeshsyncConfig struct {
BlackList []string `json:"blacklist" yaml:"blacklist"`
Pipelines map[string]PipelineConfigs `json:"pipeline-configs,omitempty" yaml:"pipeline-configs,omitempty"`
Listeners map[string]ListenerConfig `json:"listener-config,omitempty" yaml:"listener-config,omitempty"`
WhiteList []ResourceConfig `json:"resource-configs" yaml:"resource-configs"`
}

// Watched Resource configuration
type ResourceConfig struct {
Resource string
Events []string
}
6 changes: 6 additions & 0 deletions internal/pipeline/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/layer5io/meshkit/broker"
internalconfig "github.com/layer5io/meshsync/internal/config"
"github.com/layer5io/meshsync/pkg/model"
"golang.org/x/exp/slices"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/tools/cache"
)
Expand Down Expand Up @@ -69,6 +70,11 @@ func (ri *RegisterInformer) registerHandlers(s cache.SharedIndexInformer) {
}

func (ri *RegisterInformer) publishItem(obj *unstructured.Unstructured, evtype broker.EventType, config internalconfig.PipelineConfig) error {

// if the event is not supported skip
if !slices.Contains(ri.config.Events, string(evtype)) {
return nil
}
err := ri.broker.Publish(config.PublishTo, &broker.Message{
ObjectType: broker.MeshSync,
EventType: evtype,
Expand Down
19 changes: 19 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,25 @@ func main() {
os.Exit(1)
}

// get configs from meshsync crd if available
crdConfigs, err := config.GetMeshsyncCRDConfigs()

if err != nil {
// no configs found from meshsync CRD log warning
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure the log level should be warning level, please get confirm from @leecalcote @theBeginner86. I assume that if we cannot get configs, it should be the Info level? Because we have the default configuraiton.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The choice of using a warning log is because the desired state of configuration for Meshsync from now and in future lies is utilizing the crd as per conversations with @leecalcote and @theBeginner86 so this is an extra speed bump to notify one that the desired approach has not been utilized

log.Warn(err)
}

// pass configs from crd to default configs
if crdConfigs != nil {
if len(crdConfigs.Pipelines) > 0 {
config.Pipelines = crdConfigs.Pipelines
}

if len(crdConfigs.Listeners) > 0 {
config.Listeners = crdConfigs.Listeners
}
}

// Config init and seed
cfg, err := config.New(provider)
if err != nil {
Expand Down
Loading