Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove manual tracking of registered channels #27445

Merged
merged 1 commit into from
Nov 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,9 @@
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;

import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
Expand Down Expand Up @@ -93,7 +89,6 @@ private void setUpNewServerChannels() {
newChannel.register();
SelectionKey selectionKey = newChannel.getSelectionKey();
selectionKey.attach(newChannel);
addRegisteredChannel(newChannel);
eventHandler.serverChannelRegistered(newChannel);
} else {
eventHandler.registrationException(newChannel, new ClosedChannelException());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

/**
* This is a basic selector abstraction used by {@link org.elasticsearch.transport.nio.NioTransport}. This
Expand All @@ -56,7 +55,6 @@ public abstract class ESSelector implements Closeable {
private final CountDownLatch exitedLoop = new CountDownLatch(1);
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final PlainActionFuture<Boolean> isRunningFuture = PlainActionFuture.newFuture();
private final Set<NioChannel> registeredChannels = Collections.newSetFromMap(new ConcurrentHashMap<NioChannel, Boolean>());
private volatile Thread thread;

ESSelector(EventHandler eventHandler) throws IOException {
Expand Down Expand Up @@ -134,7 +132,7 @@ void singleLoop() {

void cleanupAndCloseChannels() {
cleanup();
channelsToClose.addAll(registeredChannels);
channelsToClose.addAll(selector.keys().stream().map(sk -> (NioChannel) sk.attachment()).collect(Collectors.toList()));
closePendingChannels();
}

Expand Down Expand Up @@ -171,19 +169,6 @@ void wakeup() {
selector.wakeup();
}

public Set<NioChannel> getRegisteredChannels() {
return registeredChannels;
}

public void addRegisteredChannel(NioChannel channel) {
assert registeredChannels.contains(channel) == false : "Should only register channel once";
registeredChannels.add(channel);
}

public void removeRegisteredChannel(NioChannel channel) {
registeredChannels.remove(channel);
}

@Override
public void close() throws IOException {
if (isClosed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ private void setupChannel(NioSocketChannel newChannel) {
try {
if (newChannel.isOpen()) {
newChannel.register();
addRegisteredChannel(newChannel);
SelectionKey key = newChannel.getSelectionKey();
key.attach(newChannel);
eventHandler.handleRegistration(newChannel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ public void closeFromSelector() throws IOException {
} catch (IOException e) {
closeContext.completeExceptionally(e);
throw e;
} finally {
// There is no problem with calling this multiple times
selector.removeRegisteredChannel(this);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.transport.nio;

import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.nio.channel.NioChannel;
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
import org.elasticsearch.transport.nio.utils.TestSelectionKey;
import org.junit.Before;
Expand All @@ -30,8 +29,8 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.security.PrivilegedActionException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same;
Expand All @@ -46,6 +45,7 @@ public class AcceptingSelectorTests extends ESTestCase {
private NioServerSocketChannel serverChannel;
private AcceptorEventHandler eventHandler;
private TestSelectionKey selectionKey;
private Selector rawSelector;

@Before
public void setUp() throws Exception {
Expand All @@ -54,7 +54,7 @@ public void setUp() throws Exception {
eventHandler = mock(AcceptorEventHandler.class);
serverChannel = mock(NioServerSocketChannel.class);

Selector rawSelector = mock(Selector.class);
rawSelector = mock(Selector.class);
selector = new AcceptingSelector(eventHandler, rawSelector);
this.selector.setThread();

Expand All @@ -71,9 +71,6 @@ public void testRegisteredChannel() throws IOException, PrivilegedActionExceptio
selector.preSelect();

verify(eventHandler).serverChannelRegistered(serverChannel);
Set<NioChannel> registeredChannels = selector.getRegisteredChannels();
assertEquals(1, registeredChannels.size());
assertTrue(registeredChannels.contains(serverChannel));
}

public void testClosedChannelWillNotBeRegistered() throws Exception {
Expand All @@ -83,10 +80,6 @@ public void testClosedChannelWillNotBeRegistered() throws Exception {
selector.preSelect();

verify(eventHandler).registrationException(same(serverChannel), any(ClosedChannelException.class));

Set<NioChannel> registeredChannels = selector.getRegisteredChannels();
assertEquals(0, registeredChannels.size());
assertFalse(registeredChannels.contains(serverChannel));
}

public void testRegisterChannelFailsDueToException() throws Exception {
Expand All @@ -98,10 +91,6 @@ public void testRegisterChannelFailsDueToException() throws Exception {
selector.preSelect();

verify(eventHandler).registrationException(serverChannel, closedChannelException);

Set<NioChannel> registeredChannels = selector.getRegisteredChannels();
assertEquals(0, registeredChannels.size());
assertFalse(registeredChannels.contains(serverChannel));
}

public void testAcceptEvent() throws IOException {
Expand All @@ -128,7 +117,9 @@ public void testCleanup() throws IOException {

selector.preSelect();

assertEquals(1, selector.getRegisteredChannels().size());
TestSelectionKey key = new TestSelectionKey(0);
key.attach(serverChannel);
when(rawSelector.keys()).thenReturn(new HashSet<>(Collections.singletonList(key)));

selector.cleanupAndCloseChannels();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,12 @@ public void setUp() throws Exception {
public void testQueueChannelForClosed() throws IOException {
NioChannel channel = mock(NioChannel.class);
when(channel.getSelector()).thenReturn(selector);
selector.addRegisteredChannel(channel);

selector.queueChannelClose(channel);

assertEquals(1, selector.getRegisteredChannels().size());

selector.singleLoop();

verify(handler).handleClose(channel);
// Will be called in the channel close method
selector.removeRegisteredChannel(channel);

assertEquals(0, selector.getRegisteredChannels().size());
}

public void testSelectorClosedExceptionIsNotCaughtWhileRunning() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Set;
import java.util.Collections;
import java.util.HashSet;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
Expand All @@ -54,6 +55,7 @@ public class SocketSelectorTests extends ESTestCase {
private WriteContext writeContext;
private ActionListener<NioChannel> listener;
private NetworkBytesReference bufferReference = NetworkBytesReference.wrap(new BytesArray(new byte[1]));
private Selector rawSelector;

@Before
@SuppressWarnings("unchecked")
Expand All @@ -65,7 +67,7 @@ public void setUp() throws Exception {
listener = mock(ActionListener.class);
selectionKey = new TestSelectionKey(0);
selectionKey.attach(channel);
Selector rawSelector = mock(Selector.class);
rawSelector = mock(Selector.class);

this.socketSelector = new SocketSelector(eventHandler, rawSelector);
this.socketSelector.setThread();
Expand All @@ -83,10 +85,6 @@ public void testRegisterChannel() throws Exception {
socketSelector.preSelect();

verify(eventHandler).handleRegistration(channel);

Set<NioChannel> registeredChannels = socketSelector.getRegisteredChannels();
assertEquals(1, registeredChannels.size());
assertTrue(registeredChannels.contains(channel));
}

public void testClosedChannelWillNotBeRegistered() throws Exception {
Expand All @@ -97,10 +95,6 @@ public void testClosedChannelWillNotBeRegistered() throws Exception {

verify(eventHandler).registrationException(same(channel), any(ClosedChannelException.class));
verify(channel, times(0)).finishConnect();

Set<NioChannel> registeredChannels = socketSelector.getRegisteredChannels();
assertEquals(0, registeredChannels.size());
assertFalse(registeredChannels.contains(channel));
}

public void testRegisterChannelFailsDueToException() throws Exception {
Expand All @@ -113,10 +107,6 @@ public void testRegisterChannelFailsDueToException() throws Exception {

verify(eventHandler).registrationException(channel, closedChannelException);
verify(channel, times(0)).finishConnect();

Set<NioChannel> registeredChannels = socketSelector.getRegisteredChannels();
assertEquals(0, registeredChannels.size());
assertFalse(registeredChannels.contains(channel));
}

public void testSuccessfullyRegisterChannelWillConnect() throws Exception {
Expand Down Expand Up @@ -309,6 +299,10 @@ public void testCleanup() throws Exception {
socketSelector.queueWrite(new WriteOperation(mock(NioSocketChannel.class), networkBuffer, listener));
socketSelector.scheduleForRegistration(unRegisteredChannel);

TestSelectionKey testSelectionKey = new TestSelectionKey(0);
testSelectionKey.attach(channel);
when(rawSelector.keys()).thenReturn(new HashSet<>(Collections.singletonList(testSelectionKey)));

socketSelector.cleanupAndCloseChannels();

verify(listener).onFailure(any(ClosedSelectorException.class));
Expand Down