Skip to content

Commit

Permalink
Fix Nacos ServiceName error (#11180)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlbumenJ authored Dec 21, 2022
1 parent d6341d9 commit 326fb72
Showing 1 changed file with 30 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,19 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.Collections;

import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
Expand Down Expand Up @@ -123,7 +123,7 @@ public class NacosRegistry extends FailbackRegistry {

private final NacosNamingServiceWrapper namingService;

private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, EventListener>> nacosListeners = new ConcurrentHashMap<>();
private final ConcurrentMap<URL, Map<NotifyListener, Map<String, EventListener>>> nacosListeners = new ConcurrentHashMap<>();

public NacosRegistry(URL url, NacosNamingServiceWrapper namingService) {
super(url);
Expand Down Expand Up @@ -242,8 +242,7 @@ private boolean isServiceNamesWithCompatibleMode(final URL url) {
public void doUnsubscribe(URL url, NotifyListener listener) {
if (isAdminProtocol(url)) {
shutdownServiceNamesLookup();
}
else {
} else {
Set<String> serviceNames = getServiceNames(url, listener);

doUnsubscribe(url, listener, serviceNames);
Expand Down Expand Up @@ -305,7 +304,7 @@ private Set<String> filterServiceNames(NacosServiceName serviceName) {
Set<String> serviceNames = new LinkedHashSet<>();

execute(namingService -> serviceNames.addAll(namingService.getServicesOfServer(1, Integer.MAX_VALUE,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP)).getData()
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP)).getData()
.stream()
.filter(this::isConformRules)
.map(NacosServiceName::new)
Expand Down Expand Up @@ -511,40 +510,41 @@ private List<URL> buildURLs(URL consumerURL, Collection<Instance> instances) {

private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
throws NacosException {
ConcurrentMap<NotifyListener, EventListener> listeners = nacosListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
EventListener nacosListener = listeners.computeIfAbsent(listener, k -> {
EventListener eventListener = event -> {
if (event instanceof NamingEvent) {
NamingEvent e = (NamingEvent) event;
List<Instance> instances = e.getInstances();


if (isServiceNamesWithCompatibleMode(url)) {
Map<NotifyListener, Map<String, EventListener>> listeners = nacosListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
Map<String, EventListener> eventListenerMap = listeners.computeIfAbsent(listener, k -> new ConcurrentHashMap<>());
EventListener nacosListener = eventListenerMap.computeIfAbsent(serviceName,
name -> event -> {
if (event instanceof NamingEvent) {
NamingEvent e = (NamingEvent) event;
List<Instance> instances = e.getInstances();
if (isServiceNamesWithCompatibleMode(url)) {

// Get all instances with corresponding serviceNames to avoid instance overwrite and but with empty instance mentioned
// in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899
NacosInstanceManageUtil.initOrRefreshServiceInstanceList(name, instances);
instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(name);
}

// Get all instances with corresponding serviceNames to avoid instance overwrite and but with empty instance mentioned
// in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899
NacosInstanceManageUtil.initOrRefreshServiceInstanceList(e.getServiceName(), instances);
instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(e.getServiceName());
notifySubscriber(url, listener, instances);
}

notifySubscriber(url, listener, instances);
}
};
return eventListener;
});
});
namingService.subscribe(serviceName,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP),
nacosListener);
}

private void unsubscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
throws NacosException {
ConcurrentMap<NotifyListener, EventListener> notifyListenerEventListenerConcurrentMap = nacosListeners.get(url);
if(notifyListenerEventListenerConcurrentMap == null){
Map<NotifyListener, Map<String, EventListener>> notifyListenerEventListenerConcurrentMap = nacosListeners.get(url);
if (notifyListenerEventListenerConcurrentMap == null) {
return;
}
Map<String, EventListener> listenerMap = notifyListenerEventListenerConcurrentMap.get(listener);
if (listenerMap == null) {
return;
}
EventListener nacosListener = notifyListenerEventListenerConcurrentMap.get(listener);
if(nacosListener == null){
EventListener nacosListener = listenerMap.remove(serviceName);
if (nacosListener == null) {
return;
}
namingService.unsubscribe(serviceName,
Expand Down

0 comments on commit 326fb72

Please sign in to comment.