From a1d0691268e2b8e6fa103014dce5c87b68a1f7bd Mon Sep 17 00:00:00 2001 From: zhachen Date: Tue, 31 Jul 2018 09:20:37 +0800 Subject: [PATCH] refactor zkRegistry --- core/util.go | 24 ++ core/util_test.go | 25 ++ ha/failoverHA.go | 2 +- registry/zkRegistry.go | 794 +++++++++++++++++++++++------------- registry/zkRegistry_test.go | 296 ++++++++++++++ 5 files changed, 849 insertions(+), 292 deletions(-) diff --git a/core/util.go b/core/util.go index 1b530a99..d9cd4948 100644 --- a/core/util.go +++ b/core/util.go @@ -125,3 +125,27 @@ func HandlePanic(f func()) { } } } + +// Split slices s into all substrings separated by sep and +// returns a slice of the substrings between those separators, +// specially trim all substrings. +func TrimSplit(s string, sep string) []string { + n := strings.Count(s, sep) + 1 + a := make([]string, n) + i := 0 + if sep == "" { + return strings.Split(s, sep) + } + for { + m := strings.Index(s, sep) + if m < 0 { + s = strings.TrimSpace(s) + break + } + a[i] = strings.TrimSpace(s[:m]) + i++ + s = s[m+len(sep):] + } + a[i] = s + return a[:i+1] +} diff --git a/core/util_test.go b/core/util_test.go index 0858ae02..cb46aee1 100644 --- a/core/util_test.go +++ b/core/util_test.go @@ -4,6 +4,7 @@ import ( "fmt" "strconv" "testing" + "github.com/stretchr/testify/assert" ) func TestParseExportString(t *testing.T) { @@ -75,3 +76,27 @@ func TestHandlePanic(t *testing.T) { }) panic("test panic") } + +func TestSplitTrim(t *testing.T) { + type SplitTest struct { + str string + sep string + expect []string + } + space := "\t\v\r\f\n\u0085\u00a0\u2000\u3000" + var splitList = []SplitTest{ + {"", "", []string{}}, + {"abcd", "", []string{"a", "b", "c", "d"}}, + {"☺☻☹", "", []string{"☺", "☻", "☹"}}, + {"abcd", "a", []string{"", "bcd"}}, + {"abcd", "z", []string{"abcd"}}, + {space + "1....2....3....4" + space, "...", []string{"1", ".2", ".3", ".4"}}, + {"☺☻☹", "☹", []string{"☺☻", ""}}, + {"1\t " + space + "\n2\t", " ", []string{"1", "2"}}, + {"fd , fds, ,df\n, \v\ff ds ,,fd s , fds ,", ",", []string{"fd", "fds", "", "df", "f ds", "", "fd s", "fds", ""}}, + } + for _, tt := range splitList { + ret := TrimSplit(tt.str, tt.sep) + assert.Equal(t, tt.expect, ret) + } +} diff --git a/ha/failoverHA.go b/ha/failoverHA.go index c1ba9f32..8c8210f5 100644 --- a/ha/failoverHA.go +++ b/ha/failoverHA.go @@ -40,7 +40,7 @@ func (f *FailOverHA) Call(request motan.Request, loadBalance motan.LoadBalance) lastErr = respnose.GetException() vlog.Warningf("FailOverHA call fail! url:%s, err:%+v\n", ep.GetURL().GetIdentity(), lastErr) } - return getErrorResponse(request.GetRequestID(), fmt.Sprintf("call fail over %d times.Exception:%s", retries, lastErr.ErrMsg)) + return getErrorResponse(request.GetRequestID(), fmt.Sprintf("FailOverHA call fail %d times.Exception:%s", retries+1, lastErr.ErrMsg)) } diff --git a/registry/zkRegistry.go b/registry/zkRegistry.go index 0559e82d..f6c87f96 100644 --- a/registry/zkRegistry.go +++ b/registry/zkRegistry.go @@ -6,7 +6,7 @@ import ( "strings" "sync" "time" - + "encoding/binary" "github.com/samuel/go-zookeeper/zk" "github.com/weibocom/motan-go/cluster" motan "github.com/weibocom/motan-go/core" @@ -14,386 +14,449 @@ import ( ) const ( - ZkRegistryNamespace = "/motan" - ZkRegistryCommand = "/command" - ZkRegistryNode = "/node" - PathSeparator = "/" -) - -const ( - ZkNodetypeServer = "server" - ZkNodetypeClient = "client" - ZkNodetypeAgent = "agent" + ZkRegistryNamespace = "/motan" + ZkRegistryCommand = "/command" + ZkRegistryNode = "/node" + ZkPathSeparator = "/" + ZkNodeTypeServer = "server" + ZkNodeTypeUnavailableServer = "unavailableServer" + ZkNodeTypeClient = "client" + ZkNodeTypeAgent = "agent" + + //Compatible with java ioStream + StreamMagicTag = 0xaced + ShortStringTag = 0x74 + LongStringTag = 0x7C ) type ZkRegistry struct { - url *motan.URL - timeout time.Duration - sessionTimeout time.Duration - zkConn *zk.Conn - nodeRs map[string]ServiceNode - - subscribeMap map[string]map[string]motan.NotifyListener - subscribeLock sync.Mutex - subscribeCmdLock sync.Mutex - watchSwitcherMap map[string]chan bool + available bool + zkConn *zk.Conn + url *motan.URL + sessionTimeout time.Duration + registerLock sync.Mutex + subscribeLock sync.Mutex + serviceNodeMap map[string]ServiceNode + switcherMap map[string]chan bool + registeredServiceMap map[string]*motan.URL + availableServiceMap map[string]*motan.URL + subscribedServiceMap map[string]map[motan.NotifyListener]*motan.URL + subscribedCommandMap map[string]map[motan.CommandNotifyListener]*motan.URL } func (z *ZkRegistry) Initialize() { z.sessionTimeout = time.Duration( z.url.GetPositiveIntValue(motan.SessionTimeOutKey, DefaultHeartbeatInterval)) * time.Millisecond - z.timeout = time.Duration(z.url.GetPositiveIntValue(motan.TimeOutKey, DefaultTimeout)) * time.Millisecond - var addrs []string - if z.url.Host != "" { - addrs = []string{z.url.GetAddressStr()} - } else { - addrs = strings.Split(z.url.GetParam(motan.AddressKey, ""), ",") + z.subscribedServiceMap = make(map[string]map[motan.NotifyListener]*motan.URL) + z.subscribedCommandMap = make(map[string]map[motan.CommandNotifyListener]*motan.URL) + z.serviceNodeMap = make(map[string]ServiceNode) + z.StartSnapshot(GetSanpshotConf()) + z.switcherMap = make(map[string]chan bool) + z.registeredServiceMap = make(map[string]*motan.URL) + z.availableServiceMap = make(map[string]*motan.URL) + addrs := motan.TrimSplit(z.url.GetAddressStr(), ",") + c, ch, err := zk.Connect(addrs, z.sessionTimeout) + if err != nil { + vlog.Errorf("[ZkRegistry] connect server error. err:%v\n", err) + return } - if c, _, err := zk.Connect(addrs, z.sessionTimeout); err == nil { - z.zkConn = c - } else { - vlog.Errorf("zk connect error:%+v\n", err) + z.zkConn = c + go z.handleNewSession(ch) + z.setAvailable(true) +} + +func (z *ZkRegistry) handleNewSession(ch <-chan zk.Event) { + defer motan.HandlePanic(nil) + for { + ev := <-ch + if ev.State == zk.StateDisconnected { + z.setAvailable(false) + } else if ev.State == zk.StateHasSession && !z.IsAvailable() { + z.setAvailable(true) + vlog.Infoln("[ZkRegistry] get new session notify") + z.recoverService() + z.recoverSubscribe() + } } - z.subscribeMap = make(map[string]map[string]motan.NotifyListener) - z.nodeRs = make(map[string]ServiceNode) - z.StartSnapshot(GetSanpshotConf()) - z.watchSwitcherMap = make(map[string]chan bool) -} - -func ToGroupPath(url *motan.URL) string { - return ZkRegistryNamespace + PathSeparator + url.Group -} - -func ToServicePath(url *motan.URL) string { - return ToGroupPath(url) + PathSeparator + url.Path -} - -func ToCommandPath(url *motan.URL) string { - return ToGroupPath(url) + ZkRegistryCommand -} - -func ToNodeTypePath(url *motan.URL, nodeType string) string { - return ToServicePath(url) + PathSeparator + nodeType -} - -func ToNodePath(url *motan.URL, nodeType string) string { - return ToNodeTypePath(url, nodeType) + PathSeparator + url.GetAddressStr() -} - -func ToAgentPath(url *motan.URL) string { - return ZkRegistryNamespace + PathSeparator + ZkNodetypeAgent + PathSeparator + url.Parameters["application"] -} - -func ToAgentNodeTypePath(url *motan.URL) string { - return ToAgentPath(url) + ZkRegistryNode -} - -func ToAgentNodePath(url *motan.URL) string { - return ToAgentNodeTypePath(url) + PathSeparator + url.GetAddressStr() } -func ToAgentCommandPath(url *motan.URL) string { - return ToAgentPath(url) + ZkRegistryCommand -} - -func (z *ZkRegistry) RemoveNode(url *motan.URL, nodeType string) error { - var ( - nodePath string - err error - ) - if IsAgent(url) { - nodePath = ToAgentNodePath(url) - } else { - nodePath = ToNodePath(url, nodeType) +func (z *ZkRegistry) recoverService() { + z.registerLock.Lock() + defer z.registerLock.Unlock() + if len(z.registeredServiceMap) > 0 { + for _, url := range z.registeredServiceMap { + z.doRegister(url) + } + vlog.Infoln("[ZkRegistry] register services success:", z.registeredServiceMap) } - - if isexist, stats, err := z.zkConn.Exists(nodePath); err == nil { - if isexist { - if rmerr := z.zkConn.Delete(nodePath, stats.Version); rmerr != nil { - err = rmerr - } + if len(z.availableServiceMap) > 0 { + for _, url := range z.availableServiceMap { + z.doAvailable(url) } - } else { - vlog.Infof("zk query err:%+v\n", err) + vlog.Infoln("[ZkRegistry] available services success:", z.availableServiceMap) } - return err } -func (z *ZkRegistry) CreateNode(url *motan.URL, nodeType string) error { - var ( - typePath string - nodePath string - errc error - ) - if IsAgent(url) { - typePath = ToAgentNodeTypePath(url) - nodePath = ToAgentNodePath(url) - } else { - typePath = ToNodeTypePath(url, nodeType) - nodePath = ToNodePath(url, nodeType) - } - if isexist, _, err := z.zkConn.Exists(typePath); err == nil { - if !isexist { - z.CreatePersistent(typePath, true) +func (z *ZkRegistry) recoverSubscribe() { + z.subscribeLock.Lock() + defer z.subscribeLock.Unlock() + if len(z.subscribedServiceMap) > 0 { + for _, listeners := range z.subscribedServiceMap { + for _, url := range listeners { + z.doSubscribe(url) + } } - var data []byte - if _, errc := z.zkConn.Create(nodePath, data, - zk.FlagEphemeral, zk.WorldACL(zk.PermAll)); errc != nil { - vlog.Errorf("create node: %s, error, err:%s\n", nodePath, errc) - } else { - vlog.Infof("create node: %s\n", nodePath) + vlog.Infoln("[ZkRegistry] subscribe services success") + } + if len(z.subscribedCommandMap) > 0 { + for _, listeners := range z.subscribedCommandMap { + for _, url := range listeners { + z.doSubscribeCommand(url) + } } - } else { - errc = err + vlog.Infoln("[ZkRegistry] subscribe commands success") } - return errc } -func (z *ZkRegistry) CreatePersistent(path string, createParents bool) { - if _, err := z.zkConn.Create(path, nil, 0, zk.WorldACL(zk.PermAll)); err == nil { - vlog.Infof("create Persistent node: %s\n", path) - } else if err == zk.ErrNoNode { - if createParents { - parts := strings.Split(path, "/") - parentPath := strings.Join(parts[:len(parts)-1], "/") - z.CreatePersistent(parentPath, createParents) - z.CreatePersistent(path, createParents) - } - } else { - vlog.Errorf("err create Persistent. path: %s, err: %v\n", path, err) +func (z *ZkRegistry) Register(url *motan.URL) { + if !z.IsAvailable() { + return + } + z.registerLock.Lock() + defer z.registerLock.Unlock() + if _, ok := z.registeredServiceMap[url.GetIdentity()]; !ok { + vlog.Infof("[ZkRegistry] register service. url:%s\n", url.GetIdentity()) + z.doRegister(url) + z.registeredServiceMap[url.GetIdentity()] = url } } -func (z *ZkRegistry) Register(url *motan.URL) { - vlog.Infof("start zk register %s\n", url.GetIdentity()) +func (z *ZkRegistry) doRegister(url *motan.URL) { if url.Group == "" || url.Path == "" || url.Host == "" { - vlog.Errorf("register fail.invalid url : %s\n", url.GetIdentity()) + vlog.Errorf("[ZkRegistry] register service fail. invalid url:%s\n", url.GetIdentity()) } - nodeType := getNodeType(url, "unknown") - z.RemoveNode(url, nodeType) - errc := z.CreateNode(url, nodeType) - if errc != nil { - vlog.Errorf("register failed, service:%s, error:%+v\n", url.GetIdentity(), errc) + if IsAgent(url) { + z.createNode(url, ZkNodeTypeAgent) } else { - vlog.Infof("register sucesss, service:%s\n", url.GetIdentity()) + z.removeNode(url, ZkNodeTypeServer) + z.createNode(url, ZkNodeTypeUnavailableServer) } } func (z *ZkRegistry) UnRegister(url *motan.URL) { - z.RemoveNode(url, getNodeType(url, "unknown")) -} - -// @TODO extInfo from java Obj Pase -func buildURL4Nodes(nodes []string, url *motan.URL) []*motan.URL { - result := make([]*motan.URL, 0, len(nodes)) - for _, node := range nodes { - nodeinfo := strings.Split(node, ":") - port, _ := strconv.Atoi(nodeinfo[1]) - refURL := url.Copy() - refURL.Host = nodeinfo[0] - refURL.Port = port - result = append(result, refURL) + if !z.IsAvailable() { + return + } + z.registerLock.Lock() + defer z.registerLock.Unlock() + if _, ok := z.registeredServiceMap[url.GetIdentity()]; ok { + vlog.Infof("[ZkRegistry] unregister service. url:%s\n", url.GetIdentity()) + z.removeNode(url, ZkNodeTypeServer) + z.removeNode(url, ZkNodeTypeUnavailableServer) + delete(z.registeredServiceMap, url.GetIdentity()) } - return result } func (z *ZkRegistry) Subscribe(url *motan.URL, listener motan.NotifyListener) { + if !z.IsAvailable() { + return + } z.subscribeLock.Lock() defer z.subscribeLock.Unlock() - vlog.Infof("start subscribe service. url:%s\n", url.GetIdentity()) - subkey := GetSubKey(url) - idt := listener.GetIdentity() - if listeners, ok := z.subscribeMap[subkey]; ok { - if _, exist := listeners[idt]; !exist { - listeners[idt] = listener - } - } else { - lmap := make(map[string]motan.NotifyListener) - lmap[idt] = listener - z.subscribeMap[subkey] = lmap - serverPath := ToNodeTypePath(url, ZkNodetypeServer) - if _, _, ch, err := z.zkConn.ChildrenW(serverPath); err == nil { - vlog.Infof("start watch %s\n", subkey) - url.PutParam(motan.NodeTypeKey, motan.NodeTypeReferer) // all subscribe url must as referer - if url.Host == "" { - url.Host = motan.GetLocalIP() - } - z.Register(url) // register as rpc client - go func() { - defer motan.HandlePanic(nil) - for { - select { - case evt := <-ch: - if evt.Type == zk.EventNodeChildrenChanged { - if nodes, _, chx, err := z.zkConn.ChildrenW(serverPath); err == nil { - z.buildNodes(nodes, url) - ch = chx - if listeners, ok := z.subscribeMap[subkey]; ok { - for _, l := range listeners { - l.Notify(z.url, buildURL4Nodes(nodes, url)) - vlog.Infof("EventNodeChildrenChanged %+v\n", nodes) - } - } + servicePath := toNodeTypePath(url, ZkNodeTypeServer) + if listeners, ok := z.subscribedServiceMap[servicePath]; ok { + listeners[listener] = url + vlog.Infof("[ZkRegistry] subscribe service success. path:%s, listener:%s\n", servicePath, listener.GetIdentity()) + return + } + lisMap := make(map[motan.NotifyListener]*motan.URL) + lisMap[listener] = url + z.subscribedServiceMap[servicePath] = lisMap + vlog.Infof("[ZkRegistry] subscribe service. url:%s\n", url.GetIdentity()) + z.doSubscribe(url) +} + +func (z *ZkRegistry) doSubscribe(url *motan.URL) { + servicePath := toNodeTypePath(url, ZkNodeTypeServer) + if isExist, _, err := z.zkConn.Exists(servicePath); err != nil || !isExist { + vlog.Errorf("[ZkRegistry] check service exists fail. isExist:%v, path:%s, err:%v, \n", isExist, servicePath, err) + return + } + _, _, ch, err := z.zkConn.ChildrenW(servicePath) + if err != nil { + vlog.Errorf("[ZkRegistry] subscribe service error. err:%v\n", err) + return + } + switcherChan, ok := z.switcherMap[servicePath] + if !ok { + switcherChan = make(chan bool) + z.switcherMap[servicePath] = switcherChan + } + vlog.Infof("[ZkRegistry] start watch server node. path:%s\n", servicePath) + url.PutParam(motan.NodeTypeKey, motan.NodeTypeReferer) // all subscribe url must as referer + if url.Host == "" { + url.Host = motan.GetLocalIP() + } + z.createNode(url, ZkNodeTypeClient) // register as rpc client + go func() { + defer motan.HandlePanic(nil) + for { + select { + case evt := <-ch: + if evt.Type == zk.EventNodeChildrenChanged { + if nodes, _, chx, err := z.zkConn.ChildrenW(servicePath); err == nil { + z.buildSnapShotNodes(nodes, url) + ch = chx + listeners, ok := z.subscribedServiceMap[servicePath]; + if ok && len(nodes) > 0 { + for lis := range listeners { + lis.Notify(z.url, z.nodeChildsToURLs(url, servicePath, nodes)) + vlog.Infof("[ZkRegistry] notify nodes:%+v\n", nodes) } } + } else { + vlog.Errorln("[ZkRegistry] watch server node error. err:", err) } + } else if evt.Type == zk.EventNotWatching { + vlog.Infoln("[ZkRegistry] not watch server node. path:", servicePath) + return } - }() - } else { - vlog.Infof("zk Subscribe err %+v\n", err) + case checkWatch := <-switcherChan: + if !checkWatch { + close(switcherChan) + delete(z.switcherMap, servicePath) + return + } + } } - } -} - -func (z *ZkRegistry) buildNodes(nodes []string, url *motan.URL) { - servicenode := &ServiceNode{ - Group: url.Group, - Path: url.Path, - } - nodeInfos := []SnapShotNodeInfo{} - for _, addr := range nodes { - info := &SnapShotNodeInfo{Addr: addr} - nodeInfos = append(nodeInfos, *info) - } - servicenode.Nodes = nodeInfos - z.nodeRs[getNodeKey(url)] = *servicenode + }() } func (z *ZkRegistry) Unsubscribe(url *motan.URL, listener motan.NotifyListener) { + if !z.IsAvailable() { + return + } z.subscribeLock.Lock() defer z.subscribeLock.Unlock() - subkey := GetSubKey(url) - idt := listener.GetIdentity() - if listeners, ok := z.subscribeMap[subkey]; ok { - delete(listeners, idt) + servicePath := toNodeTypePath(url, ZkNodeTypeServer) + if _, ok := z.subscribedServiceMap[servicePath]; ok { + vlog.Infof("[ZkRegistry] unsubscribe service. url:%s\n", url.GetIdentity()) + delete(z.subscribedServiceMap[servicePath], listener) + if switcherChan, ok := z.switcherMap[servicePath]; ok && len(z.subscribedServiceMap[servicePath]) < 1 { + switcherChan <- false + delete(z.subscribedServiceMap, servicePath) + } } } func (z *ZkRegistry) Discover(url *motan.URL) []*motan.URL { - nodePath := ToNodeTypePath(url, ZkNodetypeServer) // discover server nodes + if !z.IsAvailable() { + return nil + } + nodePath := toNodeTypePath(url, ZkNodeTypeServer) nodes, _, err := z.zkConn.Children(nodePath) if err == nil { - z.buildNodes(nodes, url) - return buildURL4Nodes(nodes, url) + z.buildSnapShotNodes(nodes, url) + return z.nodeChildsToURLs(url, nodePath, nodes) } - vlog.Errorf("zookeeper registry discover fail! discover url:%s, err:%s\n", url.GetIdentity(), err.Error()) + vlog.Errorf("[ZkRegistry] discover service error! url:%s, err:%v\n", url.GetIdentity(), err) return nil } func (z *ZkRegistry) SubscribeCommand(url *motan.URL, listener motan.CommandNotifyListener) { - z.subscribeCmdLock.Lock() - defer z.subscribeCmdLock.Unlock() - vlog.Infof("zookeeper subscribe command of %s\n", url.GetIdentity()) - var commandPath string + if !z.IsAvailable() { + return + } + z.subscribeLock.Lock() + defer z.subscribeLock.Unlock() + commandPath := "" if IsAgent(url) { - commandPath = ToAgentCommandPath(url) + commandPath = toAgentCommandPath(url) } else { - commandPath = ToCommandPath(url) + commandPath = toCommandPath(url) } - if isexist, _, err := z.zkConn.Exists(commandPath); err == nil { - if !isexist { - vlog.Infof("command didn't exists, path:%s\n", commandPath) - return - } + if listeners, ok := z.subscribedCommandMap[commandPath]; ok && listeners != nil { + vlog.Infof("[ZkRegistry] subscribe command success. path:%s, listener:%s\n", commandPath, listener.GetIdentity()) + listeners[listener] = url + return + } + lisMap := make(map[motan.CommandNotifyListener]*motan.URL) + lisMap[listener] = url + z.subscribedCommandMap[commandPath] = lisMap + vlog.Infof("[ZkRegistry] subscribe command. url:%s\n", url.GetIdentity()) + z.doSubscribeCommand(url) +} + +func (z *ZkRegistry) doSubscribeCommand(url *motan.URL) { + var commandPath string + if IsAgent(url) { + commandPath = toAgentCommandPath(url) } else { - vlog.Errorf("check command exists error: %+v\n", err) + commandPath = toCommandPath(url) } - if _, _, ch, err := z.zkConn.GetW(commandPath); err == nil { - tempChan := z.watchSwitcherMap[commandPath] - if tempChan == nil { - z.watchSwitcherMap[commandPath] = make(chan bool) - } - vlog.Infof("start watch command %s\n", commandPath) - go func() { - defer motan.HandlePanic(nil) - watchData := true - for { - select { - case evt := <-ch: - if evt.Type == zk.EventNodeDataChanged { - if data, _, chx, err := z.zkConn.GetW(commandPath); err == nil { - if watchData { - ch = chx - } else { - // @TODO check this close if UnSubscribeCommand is still write sth - close(tempChan) - break + if isExist, _, err := z.zkConn.Exists(commandPath); err != nil || !isExist { + vlog.Errorf("[ZkRegistry] check command exists fail. isExist:%v, path:%s, err:%v, \n", isExist, commandPath, err) + return + } + _, _, ch, err := z.zkConn.GetW(commandPath) + if err != nil { + vlog.Errorf("[ZkRegistry] subscribe command error. commandPath:%s, url:%v, err:%v\n", commandPath, url, err) + return + } + switcherChan, ok := z.switcherMap[commandPath] + if !ok { + switcherChan = make(chan bool) + z.switcherMap[commandPath] = switcherChan + } + vlog.Infof("[ZkRegistry] start watch command %s\n", commandPath) + go func() { + defer motan.HandlePanic(nil) + for { + select { + case evt := <-ch: + if evt.Type == zk.EventNodeDataChanged { + if data, _, chx, err := z.zkConn.GetW(commandPath); err == nil { + ch = chx + if listeners, ok := z.subscribedCommandMap[commandPath]; ok && len(data) > 0 { + cmdInfo := getNodeInfo(data) + for lis := range listeners { + lis.NotifyCommand(url, cluster.ServiceCmd, cmdInfo) + vlog.Infof("[ZkRegistry] command changed, path:%s, cmdInfo:%s\n", commandPath, cmdInfo) } - cmdInfo := tempFixZK(data) - listener.NotifyCommand(z.url, cluster.ServiceCmd, cmdInfo) - vlog.Infof("command changed, path:%s, data:%s\n", commandPath, cmdInfo) - } else { - vlog.Infof("command changed, get cmdInfo error, err: %+v\n", err) } + } else { + vlog.Errorf("[ZkRegistry] command changed, get cmdInfo error, err:%v\n", err) } - case checkWatch := <-tempChan: - watchData = checkWatch + } else if evt.Type == zk.EventNotWatching { + vlog.Infoln("[ZkRegistry] not watching commandPath:", commandPath) + return + } + case checkWatch := <-switcherChan: + if !checkWatch { + close(switcherChan) + delete(z.switcherMap, commandPath) + return } } - }() - } else { - vlog.Warningf("zookeeper subscribe command fail. url:%s, err:%s, zk_path:%s, urlx:%+v\n", url.GetIdentity(), err.Error(), commandPath, url) - } + } + }() } func (z *ZkRegistry) UnSubscribeCommand(url *motan.URL, listener motan.CommandNotifyListener) { - z.subscribeCmdLock.Lock() - defer z.subscribeCmdLock.Unlock() + if !z.IsAvailable() { + return + } + z.subscribeLock.Lock() + defer z.subscribeLock.Unlock() var commandPath string if IsAgent(url) { - commandPath = ToAgentCommandPath(url) + commandPath = toAgentCommandPath(url) } else { - commandPath = ToCommandPath(url) + commandPath = toCommandPath(url) + } + if _, ok := z.subscribedCommandMap[commandPath]; ok { + vlog.Infof("[ZkRegistry] unsubscribe command. url:%s\n", url.GetIdentity()) + delete(z.subscribedCommandMap[commandPath], listener) + if switcherChan, ok := z.switcherMap[commandPath]; ok && len(z.subscribedCommandMap[commandPath]) < 1 { + switcherChan <- false + delete(z.subscribedCommandMap, commandPath) + } } - z.watchSwitcherMap[commandPath] <- false } func (z *ZkRegistry) DiscoverCommand(url *motan.URL) string { - vlog.Infof("zookeeper Discover command of %s\n", url.GetIdentity()) - var ( - res string - commandPath string - ) + if !z.IsAvailable() { + return "" + } + var res string + var commandPath string if IsAgent(url) { - commandPath = ToAgentCommandPath(url) + commandPath = toAgentCommandPath(url) } else { - commandPath = ToCommandPath(url) + commandPath = toCommandPath(url) } - if isexist, _, err := z.zkConn.Exists(commandPath); err == nil { - if !isexist { - vlog.Infof("zookeeper command didn't exist, path:%s\n", commandPath) + if isExist, _, err := z.zkConn.Exists(commandPath); err == nil { + if !isExist { + vlog.Warningf("[ZkRegistry] command didn't exist, path:%s\n", commandPath) return res } } else { - vlog.Infof("zookeeper command check err: %+v\n", err) + vlog.Errorf("[ZkRegistry] command check error:%v\n", err) return res } if data, _, err := z.zkConn.Get(commandPath); err == nil { - vlog.Infof("zookeeper Discover command %s\n", commandPath) - res = tempFixZK(data) + vlog.Infof("[ZkRegistry] discover command. path:%s\n", commandPath) + res = getNodeInfo(data) } else { - vlog.Warningf("zookeeper DiscoverCommand error. url:%s, err:%s\n", url.GetIdentity(), err.Error()) + vlog.Errorf("[ZkRegistry] discover command error. url:%s, err:%s\n", url.GetIdentity(), err.Error()) } return res } -func tempFixZK(data []byte) string { - if len(data) > 7 && data[0] != '{' && data[7] == '{' { - return string(data[7:]) +func (z *ZkRegistry) Available(url *motan.URL) { + if !z.IsAvailable() { + return } - return string(data) + z.registerLock.Lock() + z.registerLock.Unlock() + if url == nil { + vlog.Infof("[ZkRegistry] available all services:%v\n", z.registeredServiceMap) + } else { + vlog.Infof("[ZkRegistry] available service:%s\n", url.GetIdentity()) + } + z.doAvailable(url) } -func (z *ZkRegistry) Available(url *motan.URL) { - +func (z *ZkRegistry) doAvailable(url *motan.URL) { + if url == nil { + for _, u := range z.registeredServiceMap { + z.removeNode(u, ZkNodeTypeUnavailableServer) + z.createNode(u, ZkNodeTypeServer) + z.availableServiceMap[u.GetIdentity()] = url + } + } else { + z.removeNode(url, ZkNodeTypeUnavailableServer) + z.createNode(url, ZkNodeTypeServer) + z.availableServiceMap[url.GetIdentity()] = url + } } func (z *ZkRegistry) Unavailable(url *motan.URL) { + if !z.IsAvailable() { + return + } + z.registerLock.Lock() + z.registerLock.Unlock() + if url == nil { + vlog.Infof("[ZkRegistry] unavailable all services:%v\n", z.registeredServiceMap) + } else { + vlog.Infof("[ZkRegistry] unavailable service. url:%s\n", url.GetIdentity()) + } + z.doUnavailable(url) +} +func (z *ZkRegistry) doUnavailable(url *motan.URL) { + if url == nil { + for _, u := range z.registeredServiceMap { + z.removeNode(u, ZkNodeTypeServer) + z.createNode(u, ZkNodeTypeUnavailableServer) + delete(z.availableServiceMap, u.GetIdentity()) + } + } else { + z.removeNode(url, ZkNodeTypeServer) + z.createNode(url, ZkNodeTypeUnavailableServer) + delete(z.availableServiceMap, url.GetIdentity()) + } } func (z *ZkRegistry) GetRegisteredServices() []*motan.URL { - return nil + z.registerLock.Lock() + defer z.registerLock.Unlock() + urls := make([]*motan.URL, 0, len(z.registeredServiceMap)) + for _, u := range z.registeredServiceMap { + urls = append(urls, u) + } + return urls } func (z *ZkRegistry) GetURL() *motan.URL { @@ -401,37 +464,186 @@ func (z *ZkRegistry) GetURL() *motan.URL { } func (z *ZkRegistry) SetURL(url *motan.URL) { - + z.url = url } func (z *ZkRegistry) GetName() string { return "zookeeper" } +func (z *ZkRegistry) IsAvailable() bool { + return z.available +} + +func (z *ZkRegistry) setAvailable(available bool) { + z.available = available +} + func (z *ZkRegistry) StartSnapshot(conf *motan.SnapshotConf) { if _, err := os.Stat(conf.SnapshotDir); os.IsNotExist(err) { if err := os.Mkdir(conf.SnapshotDir, 0774); err != nil { - vlog.Infoln(err) + vlog.Errorln("[ZkRegistry] make directory error. err:" + err.Error()) } } go func(z *ZkRegistry) { defer motan.HandlePanic(nil) ticker := time.NewTicker(conf.SnapshotInterval) for range ticker.C { - saveSnapshot(conf.SnapshotDir, z.nodeRs) + saveSnapshot(conf.SnapshotDir, z.serviceNodeMap) } }(z) } -func getNodeType(url *motan.URL, defaultNodeType string) string { - nodeType := url.GetParam(motan.NodeTypeKey, defaultNodeType) - switch nodeType { - case motan.NodeTypeService: - nodeType = ZkNodetypeServer - case motan.NodeTypeReferer: - nodeType = ZkNodetypeClient - case motan.NodeTypeAgent: - nodeType = ZkNodetypeAgent +func (z *ZkRegistry) buildSnapShotNodes(nodes []string, url *motan.URL) { + nodeRsSnapshotLock.Lock() + defer nodeRsSnapshotLock.Unlock() + serviceNode := ServiceNode{ + Group: url.Group, + Path: url.Path, + } + nodeInfos := make([]SnapShotNodeInfo, 0, len(nodes)) + for _, addr := range nodes { + nodeInfos = append(nodeInfos, SnapShotNodeInfo{Addr: addr}) + } + serviceNode.Nodes = nodeInfos + z.serviceNodeMap[getNodeKey(url)] = serviceNode +} + +func (z *ZkRegistry) removeNode(url *motan.URL, nodeType string) { + var nodePath string + if nodeType == ZkNodeTypeAgent { + nodePath = toAgentNodePath(url) + } else { + nodePath = toNodePath(url, nodeType) + } + isExist, stats, err := z.zkConn.Exists(nodePath) + if err == nil && isExist { + if err = z.zkConn.Delete(nodePath, stats.Version); err == nil { + return + } + } + if err != nil { + vlog.Errorf("[ZkRegistry] remove node error. err:%v, isExist:%v\n", err, isExist) + } +} + +func (z *ZkRegistry) createNode(url *motan.URL, nodeType string) { + var typePath string + var nodePath string + if nodeType == ZkNodeTypeAgent { + typePath = toAgentNodeTypePath(url) + nodePath = toAgentNodePath(url) + } else { + typePath = toNodeTypePath(url, nodeType) + nodePath = toNodePath(url, nodeType) + } + z.removeNode(url, nodeType) + if isExist, _, err := z.zkConn.Exists(typePath); err != nil { + vlog.Errorf("[ZkRegistry] create node error. path:%s, err:%v\n", nodePath, err) + return + } else if !isExist { + z.createPersistent(typePath, true) + } + if _, err := z.zkConn.Create(nodePath, []byte(url.ToExtInfo()), zk.FlagEphemeral, zk.WorldACL(zk.PermAll)); err != nil { + vlog.Errorf("[ZkRegistry] create node error. path:%s, err:%v\n", nodePath, err) + return + } +} + +func (z *ZkRegistry) createPersistent(path string, createParents bool) { + if _, err := z.zkConn.Create(path, nil, 0, zk.WorldACL(zk.PermAll)); err != nil { + if err == zk.ErrNoNode && createParents { + parts := strings.Split(path, "/") + parentPath := strings.Join(parts[:len(parts)-1], "/") + z.createPersistent(parentPath, createParents) + z.createPersistent(path, createParents) + return + } + vlog.Errorf("[ZkRegistry] create persistent error. path:%s, err:%v\n", path, err) + } +} + +func getNodeInfo(data []byte) string { + if len(data) > 7 && binary.BigEndian.Uint16(data[:2]) == StreamMagicTag { + if data[4] == ShortStringTag { + return string(data[7:]) + } else if data[4] == LongStringTag && len(data) > 13 { + return string(data[13:]) + } + } + return string(data) +} + +func (z *ZkRegistry) nodeChildsToURLs(url *motan.URL, parentPath string, currentChilds []string) []*motan.URL { + urls := make([]*motan.URL, 0, len(currentChilds)) + if currentChilds != nil { + for _, node := range currentChilds { + nodePath := parentPath + ZkPathSeparator + node + data, _, err := z.zkConn.Get(nodePath) + if err != nil { + vlog.Errorln("[ZkRegistry] get node data error. err:" + err.Error()) + continue + } + newURL := &motan.URL{} + nodeInfo := getNodeInfo(data) + if nodeInfo != "" { + newURL = motan.FromExtInfo(nodeInfo) + } else { + newURL = url.Copy() + var host string + port := 80 + if strings.Index(node, ":") > -1 { + hp := strings.Split(node, ":") + if len(hp) > 1 { + host = hp[0] + port, _ = strconv.Atoi(hp[1]) + } + } else { + host = node + } + newURL.Host = host + newURL.Port = port + } + if newURL.Port != 0 || newURL.Host != "" { + urls = append(urls, newURL) + } + } } - return nodeType + return urls +} + +func toGroupPath(url *motan.URL) string { + return ZkRegistryNamespace + ZkPathSeparator + url.Group +} + +func toServicePath(url *motan.URL) string { + return toGroupPath(url) + ZkPathSeparator + url.Path +} + +func toCommandPath(url *motan.URL) string { + return toGroupPath(url) + ZkRegistryCommand +} + +func toNodeTypePath(url *motan.URL, nodeType string) string { + return toServicePath(url) + ZkPathSeparator + nodeType +} + +func toNodePath(url *motan.URL, nodeType string) string { + return toNodeTypePath(url, nodeType) + ZkPathSeparator + url.GetAddressStr() +} + +func toAgentPath(url *motan.URL) string { + return ZkRegistryNamespace + ZkPathSeparator + ZkNodeTypeAgent + ZkPathSeparator + url.GetParam(motan.ApplicationKey, "") +} + +func toAgentNodeTypePath(url *motan.URL) string { + return toAgentPath(url) + ZkRegistryNode +} + +func toAgentNodePath(url *motan.URL) string { + return toAgentNodeTypePath(url) + ZkPathSeparator + url.GetAddressStr() +} + +func toAgentCommandPath(url *motan.URL) string { + return toAgentPath(url) + ZkRegistryCommand } diff --git a/registry/zkRegistry_test.go b/registry/zkRegistry_test.go index b2a276fb..b7c90d8e 100644 --- a/registry/zkRegistry_test.go +++ b/registry/zkRegistry_test.go @@ -1 +1,297 @@ package registry + +import ( + motan "github.com/weibocom/motan-go/core" + "testing" + "time" + "net" + "sync" + "github.com/stretchr/testify/assert" +) + +var ( + //zk server url + zkURL = &motan.URL{Host: "127.0.0.1", Port: 2181} + DefaultWaitTime = 200 * time.Millisecond + //unified test url + testURL = &motan.URL{ + Protocol: "zookeeper", + Group: "zkTestGroup", + Path: "zkTestPath", + Host: "127.0.0.1", + Port: 1234, + Parameters: map[string]string{motan.ApplicationKey: "zkTestApp"}, + } + //serverPath = "/motan/zkTestGroup/zkTestPath/server/127.0.0.1:1234" + serverPath = ZkRegistryNamespace + ZkPathSeparator + testURL.Group + ZkPathSeparator + testURL.Path + ZkPathSeparator + ZkNodeTypeServer + ZkPathSeparator + testURL.Host + ":" + testURL.GetPortStr() + //unavailableServerPath = "/motan/zkTestGroup/zkTestPath/unavailableServer/127.0.0.1:1234" + unavailableServerPath = ZkRegistryNamespace + ZkPathSeparator + testURL.Group + ZkPathSeparator + testURL.Path + ZkPathSeparator + ZkNodeTypeUnavailableServer + ZkPathSeparator + testURL.Host + ":" + testURL.GetPortStr() + //agentPath = "/motan/agent/zkTestApp/node/127.0.0.1:1234" + agentPath = ZkRegistryNamespace + ZkPathSeparator + ZkNodeTypeAgent + ZkPathSeparator + testURL.GetParam(motan.ApplicationKey, "") + ZkRegistryNode + ZkPathSeparator + testURL.Host + ":" + testURL.GetPortStr() + //commandPath = "/motan/zkTestGroup/command" + commandPath = ZkRegistryNamespace + ZkPathSeparator + testURL.Group + ZkRegistryCommand + //agentCommandPath = "/motan/agent/zkTestApp/command" + agentCommandPath = ZkRegistryNamespace + ZkPathSeparator + ZkNodeTypeAgent + ZkPathSeparator + testURL.GetParam(motan.ApplicationKey, "") + ZkRegistryCommand + z = &ZkRegistry{} + hasZKServer = false + once sync.Once +) + +//Test path generation methods. +func TestZkRegistryToPath(t *testing.T) { + //Test path create methods. + if p := toNodePath(testURL, ZkNodeTypeServer); p != serverPath { + t.Error("toNodePath err. result:", p) + } + if p := toCommandPath(testURL); p != commandPath { + t.Error("toCommandPath err. result:", p) + } + if p := toAgentNodePath(testURL); p != agentPath { + t.Error("toAgentNodePath err. result:", p) + } + if p := toAgentCommandPath(testURL); p != agentCommandPath { + t.Error("toAgentCommandPath err. result:", p) + } + + //Test SetURL method and GetURL method. + z.SetURL(testURL) + assert.Equal(t, z.GetURL(), testURL) + + //Test GetName method. + assert.Equal(t, z.GetName(), "zookeeper") +} + +func TestZkRegistryBasic(t *testing.T) { + if once.Do(initZK); hasZKServer { + //Test createNode method: server path. + z.createNode(testURL, ZkNodeTypeServer) + time.Sleep(DefaultWaitTime) + isExist, _, err := z.zkConn.Exists(serverPath) + if err != nil || !isExist { + t.Error("Create server node fail. exist:", isExist, " err:", err) + } + + //Test createNode method: agent path. + z.createNode(testURL, ZkNodeTypeAgent) + time.Sleep(DefaultWaitTime) + isExist, _, err = z.zkConn.Exists(agentPath) + if err != nil || !isExist { + t.Error("Create agent node fail. exist:", isExist, " err:", err) + } + + //Test Discover method. + testURL.ClearCachedInfo() + disURL := z.Discover(testURL) + time.Sleep(DefaultWaitTime) + assert.Equal(t, disURL[0], testURL) + + //Test DiscoverCommand method. + z.createPersistent(commandPath, true) + commandReq := "hello" + z.zkConn.Set(commandPath, []byte(commandReq), -1) + commandRes := z.DiscoverCommand(testURL) + time.Sleep(DefaultWaitTime) + assert.Equal(t, commandReq, commandRes) + + //Test DiscoverCommand method. + z.createPersistent(agentCommandPath, true) + z.zkConn.Set(agentCommandPath, []byte(commandReq), -1) + testURL.PutParam("nodeType", ZkNodeTypeAgent) + commandRes = z.DiscoverCommand(testURL) + testURL.PutParam("nodeType", "") + time.Sleep(DefaultWaitTime) + assert.Equal(t, commandReq, commandRes) + + //Test removeNode method. + z.removeNode(testURL, ZkNodeTypeServer) + time.Sleep(DefaultWaitTime) + if isExist, _, err := z.zkConn.Exists(serverPath); err == nil { + if isExist { + t.Error("removeNode fail.") + } + } else { + t.Error("removeNode err:", err) + } + } +} + +func TestZkRegistryRegister(t *testing.T) { + if once.Do(initZK); hasZKServer { + //Test Available method: with parameter. + z.Register(testURL) + z.Available(testURL) + time.Sleep(DefaultWaitTime) + if isExist, _, err := z.zkConn.Exists(serverPath); err == nil { + if !isExist { + t.Error("Register fail.") + } + } else { + t.Error("Register err:", err) + } + + //Test Unavailable method: without parameter. + z.Unavailable(testURL) + time.Sleep(DefaultWaitTime) + isExistUnAvail, _, errUnAvail := z.zkConn.Exists(unavailableServerPath) + isExistAvail, _, errAvail := z.zkConn.Exists(serverPath) + if errUnAvail == nil && errAvail == nil { + if !isExistUnAvail || isExistAvail { + t.Error("Unavailable fail.") + } + } else { + t.Error("Unavailable err:", errUnAvail, errAvail) + } + + //Test Available method: without parameter. + z.Register(testURL) + z.Available(nil) + time.Sleep(DefaultWaitTime) + if isExist, _, err := z.zkConn.Exists(serverPath); err == nil { + if !isExist { + t.Error("Register fail.") + } + } else { + t.Error("Register err:", err) + } + + //Test Unavailable method: with parameter. + z.Unavailable(nil) + time.Sleep(DefaultWaitTime) + isExistUnAvail, _, errUnAvail = z.zkConn.Exists(unavailableServerPath) + isExistAvail, _, errAvail = z.zkConn.Exists(serverPath) + if errUnAvail == nil && errAvail == nil { + if !isExistUnAvail || isExistAvail { + t.Error("Unavailable fail.") + } + } else { + t.Error("Unavailable err:", errUnAvail, errAvail) + } + } +} + +func TestZkRegistrySubscribe(t *testing.T) { + if once.Do(initZK); hasZKServer { + //Test Register method. + z.Register(testURL) + time.Sleep(DefaultWaitTime) + if isExist, _, err := z.zkConn.Exists(unavailableServerPath); !isExist || err != nil { + t.Error("Register fail:", err) + } + testURL.PutParam("nodeType", ZkNodeTypeAgent) + testURL.Group = "agent" //build different urlID + testURL.ClearCachedInfo() + z.Register(testURL) + testURL.Group = "zkTestGroup" //revert urlID + testURL.ClearCachedInfo() + if isExist, _, err := z.zkConn.Exists(agentPath); !isExist || err != nil { + t.Error("Register fail:", err) + } + testURL.PutParam("nodeType", "") + + //Test GetRegisteredServices method. + assert.Equal(t, z.GetRegisteredServices()[0], testURL) + + //Test Subscribe method. + lis := MockListener{registryURL: &motan.URL{}} + z.Subscribe(testURL, &lis) + z.createNode(testURL, ZkNodeTypeServer) + time.Sleep(DefaultWaitTime) + urlRes := &motan.URL{ + Host: zkURL.Host, + Port: zkURL.Port, + } + lis.registryURL.ClearCachedInfo() + time.Sleep(DefaultWaitTime) + assert.Equal(t, urlRes, lis.registryURL) + + //Test UnSubscribe method. + lis = MockListener{} + z.Unsubscribe(testURL, &lis) + time.Sleep(DefaultWaitTime) + if listeners, ok := z.subscribedServiceMap[serverPath]; ok { + if _, ok := listeners[&lis]; ok { + t.Error("UnSubscribe fail. registryURL:", lis.registryURL) + } + } + + //Test SubscribeCommand method: service command path. + lis = MockListener{} + z.createPersistent(commandPath, true) + z.SubscribeCommand(testURL, &lis) + commandReq := "hello" + z.zkConn.Set(commandPath, []byte(commandReq), -1) + time.Sleep(DefaultWaitTime) + //assert.Equal(t, commandReq, lis.command) + + //Test SubscribeCommand method: agent command path. + lis = MockListener{} + testURL.PutParam("nodeType", ZkNodeTypeAgent) + testURL.Group = "agentCommand" //build different urlID + testURL.ClearCachedInfo() + z.createPersistent(agentCommandPath, true) + z.SubscribeCommand(testURL, &lis) + testURL.Group = "zkTestGroup" //revert urlID + testURL.ClearCachedInfo() + testURL.PutParam("nodeType", "") + z.zkConn.Set(agentCommandPath, []byte(commandReq), -1) + time.Sleep(DefaultWaitTime) + assert.Equal(t, commandReq, lis.command) + + //Test UnSubscribeCommand method: service command path. + z.UnSubscribeCommand(testURL, &lis) + time.Sleep(DefaultWaitTime) + if _, ok := z.switcherMap[commandPath]; ok { + t.Error("UnSubscribe command fail.") + } + + //Test UnSubscribeCommand method: agent command path. + testURL.PutParam("nodeType", ZkNodeTypeAgent) + z.UnSubscribeCommand(testURL, &lis) + testURL.PutParam("nodeType", "") + time.Sleep(DefaultWaitTime) + if _, ok := z.switcherMap[commandPath]; ok { + t.Error("UnSubscribe command fail.") + } + + //Test UnRegister method. + z.UnRegister(testURL) + isExistUnReg, _, errUnReg := z.zkConn.Exists(unavailableServerPath) + isExistGeg, _, errReg := z.zkConn.Exists(serverPath) + if errUnReg == nil && errReg == nil { + if isExistUnReg || isExistGeg { + t.Error("UnRegister fail.") + } + } else { + t.Error("UnRegister err:", errUnReg, errReg) + } + } +} + +func initZK() { + tcpAddr, _ := net.ResolveTCPAddr("tcp4", zkURL.GetAddressStr()) + if _, err := net.DialTCP("tcp", nil, tcpAddr); err == nil { + z = &ZkRegistry{url: zkURL} + z.Initialize() + hasZKServer = true + } +} + +type MockListener struct { + registryURL *motan.URL + urls []*motan.URL + command string +} + +func (m *MockListener) Notify(registryURL *motan.URL, urls []*motan.URL) { + m.registryURL = registryURL + m.urls = urls +} + +func (m *MockListener) NotifyCommand(registryURL *motan.URL, commandType int, commandInfo string) { + m.registryURL = registryURL + m.command = commandInfo +} + +func (m *MockListener) GetIdentity() string { + return "mocklistener" +}