Allow topology to be configured as a day-2 operation (#2362) (#2412)
* Allow topology to be configured as a day-2 operation

  Only allow topology to be configured as a day-2 operation if it was not
  configured before.

* Fix syncer code to update topology

  Enable update events so that existing CSINodeTopology CRs are reconciled again.

* Move json-patch to an explicit dependency

* Only reset topology status on vanilla clusters

* Rename topologyFound to csiNodeTopologyFound
gnufied authored Jun 2, 2023
1 parent 79a5aa2 commit 8564eec
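
At its core, the change clears the CSINodeTopology CR's status with a JSON merge patch so the syncer reconciles the object again. A minimal sketch of that mechanism using evanphx/json-patch/v5 (the `topo` struct below is an illustrative stand-in for the CR, not the real API type):

```go
package main

import (
	"encoding/json"
	"fmt"

	jsonpatch "github.com/evanphx/json-patch/v5"
)

// topo is a stand-in for the CSINodeTopology CR, used only for illustration.
type topo struct {
	Spec struct {
		NodeID   string `json:"nodeID,omitempty"`
		NodeUUID string `json:"nodeuuid,omitempty"`
	} `json:"spec"`
	Status struct {
		Status string `json:"status,omitempty"`
	} `json:"status"`
}

func main() {
	var oldT topo
	oldT.Spec.NodeID, oldT.Spec.NodeUUID = "node-1", "uuid-123"
	oldT.Status.Status = "Success"

	newT := oldT
	newT.Status.Status = "" // reset status so the syncer reconciles the CR again

	oldB, _ := json.Marshal(oldT)
	newB, _ := json.Marshal(newT)

	// The merge patch carries only the fields that differ.
	patch, err := jsonpatch.CreateMergePatch(oldB, newB)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(patch)) // {"status":{"status":null}}
}
```

Because a merge patch contains only the fields that changed, unrelated fields written concurrently by other controllers are left untouched.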
Showing 4 changed files with 190 additions and 56 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -7,6 +7,7 @@ require (
	github.com/akutz/gofsutil v0.1.2
	github.com/container-storage-interface/spec v1.7.0
	github.com/davecgh/go-spew v1.1.1
	github.com/evanphx/json-patch/v5 v5.6.0
	github.com/fsnotify/fsnotify v1.6.0
	github.com/golang/protobuf v1.5.2
	github.com/google/uuid v1.3.0
@@ -70,7 +71,6 @@ require (
	github.com/emicklei/go-restful/v3 v3.9.0 // indirect
	github.com/euank/go-kmsg-parser v2.0.0+incompatible // indirect
	github.com/evanphx/json-patch v4.12.0+incompatible // indirect
	github.com/evanphx/json-patch/v5 v5.6.0 // indirect
	github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d // indirect
	github.com/felixge/httpsnoop v1.0.1 // indirect
	github.com/go-errors/errors v1.0.1 // indirect
171 changes: 121 additions & 50 deletions pkg/csi/service/common/commonco/k8sorchestrator/topology.go
@@ -18,6 +18,7 @@ package k8sorchestrator

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"os"
@@ -28,6 +29,7 @@ import (
	"time"

	"github.com/container-storage-interface/spec/lib/go/csi"
	jsonpatch "github.com/evanphx/json-patch/v5"
	cnstypes "github.com/vmware/govmomi/cns/types"
	"github.com/vmware/govmomi/object"
	"github.com/vmware/govmomi/vim25/mo"
@@ -738,63 +740,43 @@ func (volTopology *nodeVolumeTopology) GetNodeTopologyLabels(ctx context.Context
	log := logger.GetLogger(ctx)

	var err error
	if volTopology.isCSINodeIdFeatureEnabled && volTopology.clusterFlavor == cnstypes.CnsClusterFlavorVanilla {
		csiNodeTopology := &csinodetopologyv1alpha1.CSINodeTopology{}
		csiNodeTopologyKey := types.NamespacedName{
			Name: nodeInfo.NodeName,
		}

		// Get CsiNodeTopology instance
		err = volTopology.csiNodeTopologyK8sClient.Get(ctx, csiNodeTopologyKey, csiNodeTopology)
		if err != nil {
			if apierrors.IsNotFound(err) {
				err = createCSINodeTopologyInstance(ctx, volTopology, nodeInfo)
				if err != nil {
					return nil, logger.LogNewErrorCodef(log, codes.Internal, err.Error())
				}
			} else {
				msg := fmt.Sprintf("failed to get CsiNodeTopology for the node: %q. Error: %+v", nodeInfo.NodeName, err)
				return nil, logger.LogNewErrorCodef(log, codes.Internal, msg)
			}
		} else {
			// If CSINodeTopology instance already exists, check if the NodeUUID
			// parameter in Spec is populated. If not, patch the instance.
			if csiNodeTopology.Spec.NodeUUID == "" ||
				csiNodeTopology.Spec.NodeUUID != nodeInfo.NodeID {
				if csiNodeTopology.Spec.NodeUUID == "" {
					log.Infof("CSINodeTopology instance: %q with empty nodeUUID found. "+
						"Patching the instance with nodeUUID", nodeInfo.NodeName)
				} else {
					log.Infof("CSINodeTopology instance: %q with different "+
						"nodeUUID: %s found. Patching the instance with nodeUUID: %s",
						nodeInfo.NodeName, csiNodeTopology.Spec.NodeUUID, nodeInfo.NodeID)
				}
				patch := []byte(fmt.Sprintf(`{"spec":{"nodeID":"%s","nodeuuid":"%s"}}`, nodeInfo.NodeName, nodeInfo.NodeID))
				// Patch the CSINodeTopology instance with nodeUUID
				err = volTopology.csiNodeTopologyK8sClient.Patch(ctx,
					&csinodetopologyv1alpha1.CSINodeTopology{
						ObjectMeta: metav1.ObjectMeta{
							Name: nodeInfo.NodeName,
						},
					},
					client.RawPatch(types.MergePatchType, patch))
				if err != nil {
					msg := fmt.Sprintf("Fail to patch CsiNodeTopology for the node: %q "+
						"with nodeUUID: %s. Error: %+v",
						nodeInfo.NodeName, nodeInfo.NodeID, err)
					return nil, logger.LogNewErrorCodef(log, codes.Internal, msg)
				}
				log.Infof("Successfully patched CSINodeTopology instance: %q with Uuid: %q",
					nodeInfo.NodeName, nodeInfo.NodeID)
			}
	csiNodeTopology := &csinodetopologyv1alpha1.CSINodeTopology{}
	csiNodeTopologyKey := types.NamespacedName{
		Name: nodeInfo.NodeName,
	}
	err = volTopology.csiNodeTopologyK8sClient.Get(ctx, csiNodeTopologyKey, csiNodeTopology)
	csiNodeTopologyFound := true
	if err != nil {
		if !apierrors.IsNotFound(err) {
			msg := fmt.Sprintf("failed to get CsiNodeTopology for the node: %q. Error: %+v", nodeInfo.NodeName, err)
			return nil, logger.LogNewErrorCodef(log, codes.Internal, msg)
		}
		// The instance does not exist yet, so create it.
		csiNodeTopologyFound = false
		err = createCSINodeTopologyInstance(ctx, volTopology, nodeInfo)
		if err != nil {
			return nil, logger.LogNewErrorCodef(log, codes.Internal, err.Error())
		}
	}

	// A topology instance already exists; patch it so the syncer
	// reconciles it again.
	if csiNodeTopologyFound && volTopology.clusterFlavor == cnstypes.CnsClusterFlavorVanilla {
		newCSINodeTopology := csiNodeTopology.DeepCopy()

		if volTopology.isCSINodeIdFeatureEnabled {
			newCSINodeTopology = volTopology.updateNodeIDForTopology(ctx, nodeInfo, newCSINodeTopology)
		}
		// Reset the status so that the syncer syncs the object again.
		newCSINodeTopology.Status.Status = ""
		_, err = volTopology.patchCSINodeTopology(ctx, csiNodeTopology, newCSINodeTopology)
		if err != nil {
			msg := fmt.Sprintf("failed to patch CsiNodeTopology for the node: %q "+
				"with nodeUUID: %s. Error: %+v",
				nodeInfo.NodeName, nodeInfo.NodeID, err)
			return nil, logger.LogNewErrorCodef(log, codes.Internal, msg)
		}
		log.Infof("Successfully patched CSINodeTopology instance: %q with Uuid: %q",
			nodeInfo.NodeName, nodeInfo.NodeID)
	}
	// Create a watcher for CSINodeTopology CRs.
	timeoutSeconds := int64((time.Duration(getCSINodeTopologyWatchTimeoutInMin(ctx)) * time.Minute).Seconds())
	watchCSINodeTopology, err := volTopology.csiNodeTopologyWatcher.Watch(metav1.ListOptions{
@@ -839,6 +821,95 @@ func (volTopology *nodeVolumeTopology) GetNodeTopologyLabels(ctx context.Context
		nodeInfo.NodeName)
}

func (volTopology *nodeVolumeTopology) updateNodeIDForTopology(
	ctx context.Context,
	nodeInfo *commoncotypes.NodeInfo,
	csiNodeTopology *csinodetopologyv1alpha1.CSINodeTopology) *csinodetopologyv1alpha1.CSINodeTopology {
	log := logger.GetLogger(ctx)
	// If the CSINodeTopology instance already exists, check whether the
	// NodeUUID parameter in its Spec is populated and current. If not,
	// update it here; the caller patches the instance.
	if csiNodeTopology.Spec.NodeUUID == "" ||
		csiNodeTopology.Spec.NodeUUID != nodeInfo.NodeID {
		if csiNodeTopology.Spec.NodeUUID == "" {
			log.Infof("CSINodeTopology instance: %q with empty nodeUUID found. "+
				"Patching the instance with nodeUUID", nodeInfo.NodeName)
		} else {
			log.Infof("CSINodeTopology instance: %q with different "+
				"nodeUUID: %s found. Patching the instance with nodeUUID: %s",
				nodeInfo.NodeName, csiNodeTopology.Spec.NodeUUID, nodeInfo.NodeID)
		}
		csiNodeTopology.Spec.NodeID = nodeInfo.NodeName
		csiNodeTopology.Spec.NodeUUID = nodeInfo.NodeID
	}
	return csiNodeTopology
}

func (volTopology *nodeVolumeTopology) patchCSINodeTopology(
	ctx context.Context,
	oldTopo, newTopo *csinodetopologyv1alpha1.CSINodeTopology) (*csinodetopologyv1alpha1.CSINodeTopology, error) {
	patch, err := getCSINodePatchData(oldTopo, newTopo, true)
	if err != nil {
		return oldTopo, err
	}
	rawPatch := client.RawPatch(types.MergePatchType, patch)
	err = volTopology.csiNodeTopologyK8sClient.Patch(ctx, oldTopo, rawPatch)
	if err != nil {
		return oldTopo, err
	}
	return newTopo, nil
}

func getCSINodePatchData(
	oldNodeTopology, newNodeTopology *csinodetopologyv1alpha1.CSINodeTopology,
	addResourceVersionCheck bool) ([]byte, error) {
	patchBytes, err := getPatchData(oldNodeTopology, newNodeTopology)
	if err != nil {
		return nil, err
	}
	if addResourceVersionCheck {
		patchBytes, err = addResourceVersion(patchBytes, oldNodeTopology.ResourceVersion)
		if err != nil {
			return nil, fmt.Errorf("apply ResourceVersion to patch data failed: %v", err)
		}
	}
	return patchBytes, nil
}

func addResourceVersion(patchBytes []byte, resourceVersion string) ([]byte, error) {
	var patchMap map[string]interface{}
	err := json.Unmarshal(patchBytes, &patchMap)
	if err != nil {
		return nil, fmt.Errorf("error unmarshalling patch with %v", err)
	}
	u := unstructured.Unstructured{Object: patchMap}
	a, err := apiMeta.Accessor(&u)
	if err != nil {
		return nil, fmt.Errorf("error creating accessor with %v", err)
	}
	a.SetResourceVersion(resourceVersion)
	versionBytes, err := json.Marshal(patchMap)
	if err != nil {
		return nil, fmt.Errorf("error marshalling json patch with %v", err)
	}
	return versionBytes, nil
}

func getPatchData(oldObj, newObj interface{}) ([]byte, error) {
	oldData, err := json.Marshal(oldObj)
	if err != nil {
		return nil, fmt.Errorf("marshal old object failed: %v", err)
	}
	newData, err := json.Marshal(newObj)
	if err != nil {
		return nil, fmt.Errorf("marshal new object failed: %v", err)
	}
	patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
	if err != nil {
		return nil, fmt.Errorf("CreateMergePatch failed: %v", err)
	}
	return patchBytes, nil
}

// Create new CSINodeTopology instance if it doesn't exist
// Create CSINodeTopology instance with spec.nodeID and spec.nodeUUID
// if cluster flavor is Vanilla and UseCSINodeId feature is enabled
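The addResourceVersion helper above gives the patch optimistic-concurrency semantics: the patch embeds the resourceVersion read earlier, so the API server rejects it with a conflict if the CR changed in the meantime. A simplified, dependency-free sketch of that step (the real helper goes through apimachinery's meta accessor):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// addResourceVersion mirrors the helper in topology.go: it injects
// metadata.resourceVersion into an existing merge patch so the API server
// applies the patch only if the object has not changed since it was read.
func addResourceVersion(patch []byte, rv string) ([]byte, error) {
	var m map[string]interface{}
	if err := json.Unmarshal(patch, &m); err != nil {
		return nil, err
	}
	meta, _ := m["metadata"].(map[string]interface{})
	if meta == nil {
		meta = map[string]interface{}{}
	}
	meta["resourceVersion"] = rv
	m["metadata"] = meta
	return json.Marshal(m)
}

func main() {
	out, err := addResourceVersion([]byte(`{"status":{"status":null}}`), "100")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
	// {"metadata":{"resourceVersion":"100"},"status":{"status":null}}
}
```

Running it prints the same patch shape that the unit test below asserts on.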
46 changes: 46 additions & 0 deletions pkg/csi/service/common/commonco/k8sorchestrator/topology_test.go
@@ -0,0 +1,46 @@
package k8sorchestrator

import (
	"encoding/json"
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	csinodetopologyv1alpha1 "sigs.k8s.io/vsphere-csi-driver/v3/pkg/internalapis/csinodetopology/v1alpha1"
)

func TestPatchNodeTopology(t *testing.T) {
	csiNodeTopology := &csinodetopologyv1alpha1.CSINodeTopology{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "simple-topo",
			ResourceVersion: "100",
		},
		Spec: csinodetopologyv1alpha1.CSINodeTopologySpec{
			NodeID:   "foobar",
			NodeUUID: "foobar123",
		},
		Status: csinodetopologyv1alpha1.CSINodeTopologyStatus{
			Status: csinodetopologyv1alpha1.CSINodeTopologySuccess,
		},
	}

	newTopology := csiNodeTopology.DeepCopy()
	newTopology.Status.Status = ""

	patch, err := getCSINodePatchData(csiNodeTopology, newTopology, true)
	if err != nil {
		t.Fatalf("error creating patch object: %v", err)
	}
	var patchMap map[string]interface{}
	err = json.Unmarshal(patch, &patchMap)
	if err != nil {
		t.Fatalf("failed to unmarshal patch: %v", err)
	}
	metadata, exist := patchMap["metadata"].(map[string]interface{})
	if !exist {
		t.Fatalf("ResourceVersion should exist in patch data")
	}
	resourceVersion := metadata["resourceVersion"].(string)
	if resourceVersion != csiNodeTopology.ResourceVersion {
		t.Errorf("ResourceVersion should be %s, got %s", csiNodeTopology.ResourceVersion, resourceVersion)
	}
}
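
The test exercises only patch generation and needs no running cluster; it can be run in isolation with: go test -run TestPatchNodeTopology ./pkg/csi/service/common/commonco/k8sorchestrator/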
@@ -173,8 +173,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
			// The CO calls NodeGetInfo API just once during the node registration,
			// therefore we do not support updates to the spec after the CR has
			// been reconciled.
			log.Debug("Ignoring CSINodeTopology reconciliation on update event")
			return false
			return true
		},
		DeleteFunc: func(e event.DeleteEvent) bool {
			// Instances are deleted by the garbage collector automatically after
@@ -296,11 +295,10 @@ func (r *ReconcileCSINodeTopology) reconcileForVanilla(ctx context.Context, requ
		return reconcile.Result{RequeueAfter: timeout}, nil
	}

	// Retrieve topology labels for nodeVM.
	if r.configInfo.Cfg.Labels.TopologyCategories == "" &&
		r.configInfo.Cfg.Labels.Zone == "" && r.configInfo.Cfg.Labels.Region == "" {
	if !r.isTopologyEnabled() {
		// Not a topology aware setup.
		// Set the Status to Success and return.
		log.Infof("Skipping topology update, topology feature is disabled")
		instance.Status.TopologyLabels = make([]csinodetopologyv1alpha1.TopologyLabel, 0)
		err = updateCRStatus(ctx, r, instance, csinodetopologyv1alpha1.CSINodeTopologySuccess,
			"Not a topology aware cluster.")
@@ -311,6 +309,15 @@ func (r *ReconcileCSINodeTopology) reconcileForVanilla(ctx context.Context, requ
		(r.configInfo.Cfg.Labels.Zone != "" && r.configInfo.Cfg.Labels.Region != "") {
		log.Infof("Detected a topology aware cluster")

		if len(instance.Status.TopologyLabels) > 0 {
			log.Infof("Found existing topology")
			err = updateCRStatus(ctx, r, instance, csinodetopologyv1alpha1.CSINodeTopologySuccess,
				"found existing topology.")
			if err != nil {
				return reconcile.Result{RequeueAfter: timeout}, nil
			}
		}

		// Fetch topology labels for nodeVM.
		topologyLabels, err := getNodeTopologyInfo(ctx, nodeVM, r.configInfo.Cfg, r.isMultiVCFSSEnabled)
		if err != nil {
@@ -343,6 +350,16 @@ func (r *ReconcileCSINodeTopology) reconcileForVanilla(ctx context.Context, requ
	return reconcile.Result{}, nil
}

// isTopologyEnabled checks whether the cluster's topology should be updated.
// It returns false when the cluster is not topology aware.
func (r *ReconcileCSINodeTopology) isTopologyEnabled() bool {
	if r.configInfo.Cfg.Labels.TopologyCategories == "" &&
		r.configInfo.Cfg.Labels.Zone == "" && r.configInfo.Cfg.Labels.Region == "" {
		return false
	}
	return true
}

func (r *ReconcileCSINodeTopology) reconcileForGuest(ctx context.Context, request reconcile.Request) (
	reconcile.Result, error) {
	log := logger.GetLogger(ctx)
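The one-line predicate change above is what makes the day-2 flow reachable: while UpdateFunc returned false, the status reset performed by the node code never produced a reconcile. A minimal sketch of an always-pass update predicate, assuming sigs.k8s.io/controller-runtime:

```go
package main

import (
	"fmt"

	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
)

func main() {
	// Forward every update event to the reconciler; before this change the
	// controller returned false here and update events were dropped.
	pred := predicate.Funcs{
		UpdateFunc: func(e event.UpdateEvent) bool { return true },
	}
	fmt.Println(pred.Update(event.UpdateEvent{})) // true
}
```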
