Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement socket and server ChannelContexts #28275

Merged
merged 5 commits into from
Jan 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void closeFromSelector() throws IOException {
selector.assertOnSelectorThread();
if (closeContext.isDone() == false) {
try {
closeRawChannel();
socketChannel.close();
closeContext.complete(null);
} catch (IOException e) {
closeContext.completeExceptionally(e);
Expand Down Expand Up @@ -119,13 +119,13 @@ public void addCloseListener(BiConsumer<Void, Throwable> listener) {
closeContext.whenComplete(listener);
}

@Override
public void close() {
getContext().closeChannel();
}

// Package visibility for testing
void setSelectionKey(SelectionKey selectionKey) {
this.selectionKey = selectionKey;
}
// Package visibility for testing

void closeRawChannel() throws IOException {
socketChannel.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected void acceptChannel(NioServerSocketChannel nioServerChannel) throws IOE
ChannelFactory<?, ?> channelFactory = nioServerChannel.getChannelFactory();
SocketSelector selector = selectorSupplier.get();
NioSocketChannel nioSocketChannel = channelFactory.acceptNioChannel(nioServerChannel, selector);
nioServerChannel.getAcceptContext().accept(nioSocketChannel);
nioServerChannel.getContext().acceptChannel(nioSocketChannel);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,42 +26,30 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

public class BytesChannelContext implements ChannelContext {
public class BytesChannelContext extends SocketChannelContext {

private final NioSocketChannel channel;
private final ReadConsumer readConsumer;
private final InboundChannelBuffer channelBuffer;
private final LinkedList<BytesWriteOperation> queued = new LinkedList<>();
private final AtomicBoolean isClosing = new AtomicBoolean(false);
private boolean peerClosed = false;
private boolean ioException = false;

public BytesChannelContext(NioSocketChannel channel, ReadConsumer readConsumer, InboundChannelBuffer channelBuffer) {
this.channel = channel;
public BytesChannelContext(NioSocketChannel channel, BiConsumer<NioSocketChannel, Exception> exceptionHandler,
ReadConsumer readConsumer, InboundChannelBuffer channelBuffer) {
super(channel, exceptionHandler);
this.readConsumer = readConsumer;
this.channelBuffer = channelBuffer;
}

@Override
public void channelRegistered() throws IOException {}

@Override
public int read() throws IOException {
if (channelBuffer.getRemaining() == 0) {
// Requiring one additional byte will ensure that a new page is allocated.
channelBuffer.ensureCapacity(channelBuffer.getCapacity() + 1);
}

int bytesRead;
try {
bytesRead = channel.read(channelBuffer.sliceBuffersFrom(channelBuffer.getIndex()));
} catch (IOException ex) {
ioException = true;
throw ex;
}
int bytesRead = readFromChannel(channelBuffer.sliceBuffersFrom(channelBuffer.getIndex()));

if (bytesRead == -1) {
peerClosed = true;
if (bytesRead == 0) {
return 0;
}

Expand Down Expand Up @@ -90,7 +78,6 @@ public void sendMessage(ByteBuffer[] buffers, BiConsumer<Void, Throwable> listen
return;
}

// TODO: Eval if we will allow writes from sendMessage
selector.queueWriteInChannelBuffer(writeOperation);
}

Expand Down Expand Up @@ -126,28 +113,38 @@ public void closeChannel() {

@Override
public boolean selectorShouldClose() {
return peerClosed || ioException || isClosing.get();
return isPeerClosed() || hasIOException() || isClosing.get();
}

@Override
public void closeFromSelector() {
public void closeFromSelector() throws IOException {
channel.getSelector().assertOnSelectorThread();
// Set to true in order to reject new writes before queuing with selector
isClosing.set(true);
channelBuffer.close();
for (BytesWriteOperation op : queued) {
channel.getSelector().executeFailedListener(op.getListener(), new ClosedChannelException());
if (channel.isOpen()) {
IOException channelCloseException = null;
try {
channel.closeFromSelector();
} catch (IOException e) {
channelCloseException = e;
}
// Set to true in order to reject new writes before queuing with selector
isClosing.set(true);
channelBuffer.close();
for (BytesWriteOperation op : queued) {
channel.getSelector().executeFailedListener(op.getListener(), new ClosedChannelException());
}
queued.clear();
if (channelCloseException != null) {
throw channelCloseException;
}
}
queued.clear();
}

private void singleFlush(BytesWriteOperation headOp) throws IOException {
try {
int written = channel.write(headOp.getBuffersToWrite());
int written = flushToChannel(headOp.getBuffersToWrite());
headOp.incrementIndex(written);
} catch (IOException e) {
channel.getSelector().executeFailedListener(headOp.getListener(), e);
ioException = true;
throw e;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,62 +20,26 @@
package org.elasticsearch.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.function.BiConsumer;

/**
* This context should implement the specific logic for a channel. When a channel receives a notification
* that it is ready to perform certain operations (read, write, etc) the {@link ChannelContext} will be
* called. This context will need to implement all protocol related logic. Additionally, if any special
* close behavior is required, it should be implemented in this context.
*
* The only methods of the context that should ever be called from a non-selector thread are
* {@link #closeChannel()} and {@link #sendMessage(ByteBuffer[], BiConsumer)}.
*/
public interface ChannelContext {

void channelRegistered() throws IOException;

int read() throws IOException;

void sendMessage(ByteBuffer[] buffers, BiConsumer<Void, Throwable> listener);

void queueWriteOperation(WriteOperation writeOperation);

void flushChannel() throws IOException;

boolean hasQueuedWriteOps();
/**
* This method cleans up any context resources that need to be released when a channel is closed. It
* should only be called by the selector thread.
*
* @throws IOException during channel / context close
*/
void closeFromSelector() throws IOException;

/**
* Schedules a channel to be closed by the selector event loop with which it is registered.
* <p>
*
* If the channel is open and the state can be transitioned to closed, the close operation will
* be scheduled with the event loop.
* <p>
* If the channel is already set to closed, it is assumed that it is already scheduled to be closed.
* <p>
*
* Depending on the underlying protocol of the channel, a close operation might simply close the socket
* channel or may involve reading and writing messages.
*/
void closeChannel();

/**
* This method indicates if a selector should close this channel.
*
* @return a boolean indicating if the selector should close
*/
boolean selectorShouldClose();

/**
* This method cleans up any context resources that need to be released when a channel is closed. It
* should only be called by the selector thread.
*
* @throws IOException during channel / context close
*/
void closeFromSelector() throws IOException;

@FunctionalInterface
interface ReadConsumer {
int consumeReads(InboundChannelBuffer channelBuffer) throws IOException;
}
void handleException(Exception e);
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ private Socket internalCreateChannel(SocketSelector selector, SocketChannel rawC
try {
Socket channel = createChannel(selector, rawChannel);
assert channel.getContext() != null : "channel context should have been set on channel";
assert channel.getExceptionContext() != null : "exception handler should have been set on channel";
return channel;
} catch (Exception e) {
closeRawChannel(rawChannel, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ protected void uncaughtException(Exception exception) {
*/
protected void handleClose(NioChannel channel) {
try {
channel.closeFromSelector();
channel.getContext().closeFromSelector();
} catch (IOException e) {
closeException(channel, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ public InboundChannelBuffer(Supplier<Page> pageSupplier) {
ensureCapacity(PAGE_SIZE);
}

public static InboundChannelBuffer allocatingInstance() {
return new InboundChannelBuffer(() -> new Page(ByteBuffer.allocate(PAGE_SIZE), () -> {}));
}

@Override
public void close() {
if (isClosed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public interface NioChannel {

InetSocketAddress getLocalAddress();

void close();

void closeFromSelector() throws IOException;

void register() throws ClosedChannelException;
Expand All @@ -42,6 +44,8 @@ public interface NioChannel {

NetworkChannel getRawChannel();

ChannelContext getContext();

/**
* Adds a close listener to the channel. Multiple close listeners can be added. There is no guarantee
* about the order in which close listeners will be executed. If the channel is already closed, the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@

import java.io.IOException;
import java.nio.channels.ServerSocketChannel;
import java.util.function.Consumer;
import java.util.concurrent.atomic.AtomicBoolean;

public class NioServerSocketChannel extends AbstractNioChannel<ServerSocketChannel> {

private final ChannelFactory<?, ?> channelFactory;
private Consumer<NioSocketChannel> acceptContext;
private ServerChannelContext context;
private final AtomicBoolean contextSet = new AtomicBoolean(false);

public NioServerSocketChannel(ServerSocketChannel socketChannel, ChannelFactory<?, ?> channelFactory, AcceptingSelector selector)
throws IOException {
Expand All @@ -39,17 +40,22 @@ public NioServerSocketChannel(ServerSocketChannel socketChannel, ChannelFactory<
}

/**
* This method sets the accept context for a server socket channel. The accept context is called when a
* new channel is accepted. The parameter passed to the context is the new channel.
* This method sets the context for a server socket channel. The context is called when a new channel is
* accepted, an exception occurs, or it is time to close the channel.
*
* @param acceptContext to call
* @param context to call
*/
public void setAcceptContext(Consumer<NioSocketChannel> acceptContext) {
this.acceptContext = acceptContext;
public void setContext(ServerChannelContext context) {
if (contextSet.compareAndSet(false, true)) {
this.context = context;
} else {
throw new IllegalStateException("Context on this channel were already set. It should only be once.");
}
}

public Consumer<NioSocketChannel> getAcceptContext() {
return acceptContext;
@Override
public ServerChannelContext getContext() {
return context;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@

package org.elasticsearch.nio;

import org.elasticsearch.nio.utils.ExceptionsHelper;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
Expand All @@ -35,9 +32,8 @@ public class NioSocketChannel extends AbstractNioChannel<SocketChannel> {
private final InetSocketAddress remoteAddress;
private final CompletableFuture<Void> connectContext = new CompletableFuture<>();
private final SocketSelector socketSelector;
private final AtomicBoolean contextsSet = new AtomicBoolean(false);
private ChannelContext context;
private BiConsumer<NioSocketChannel, Exception> exceptionContext;
private final AtomicBoolean contextSet = new AtomicBoolean(false);
private SocketChannelContext context;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we have a context here again? the superclass has it aready? Can't we use a getter that has the right type or a generic?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any change we can untangle the cyclic dependencies here and pass the context in as a ctor argument? or can we pass it as method arguments instead and don't have it as a member?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the duplicate contexts. Now each stances (NioServerSocketChannel and NioSocketChannel) has their own. I could store it in the super class parameterized. But AbstractNioChannel is already parameterized once and it would get pretty big.

Probably not possible to change the cyclic relationship right now. Context needs the channel. So it cannot be a ctor argument without allowing this to escape. Channel needs context as a field, as it is the hook into lower level selector operations. But going forward I have some ideas about how to continue to reduce this relationship.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool lets move on with this.

private Exception connectException;

public NioSocketChannel(SocketChannel socketChannel, SocketSelector selector) throws IOException {
Expand All @@ -46,25 +42,6 @@ public NioSocketChannel(SocketChannel socketChannel, SocketSelector selector) th
this.socketSelector = selector;
}

@Override
public void closeFromSelector() throws IOException {
getSelector().assertOnSelectorThread();
if (isOpen()) {
ArrayList<IOException> closingExceptions = new ArrayList<>(2);
try {
super.closeFromSelector();
} catch (IOException e) {
closingExceptions.add(e);
}
try {
context.closeFromSelector();
} catch (IOException e) {
closingExceptions.add(e);
}
ExceptionsHelper.rethrowAndSuppress(closingExceptions);
}
}

@Override
public SocketSelector getSelector() {
return socketSelector;
Expand Down Expand Up @@ -94,23 +71,19 @@ public int read(ByteBuffer[] buffers) throws IOException {
}
}

public void setContexts(ChannelContext context, BiConsumer<NioSocketChannel, Exception> exceptionContext) {
if (contextsSet.compareAndSet(false, true)) {
public void setContext(SocketChannelContext context) {
if (contextSet.compareAndSet(false, true)) {
this.context = context;
this.exceptionContext = exceptionContext;
} else {
throw new IllegalStateException("Contexts on this channel were already set. They should only be once.");
throw new IllegalStateException("Context on this channel were already set. It should only be once.");
}
}

public ChannelContext getContext() {
@Override
public SocketChannelContext getContext() {
return context;
}

public BiConsumer<NioSocketChannel, Exception> getExceptionContext() {
return exceptionContext;
}

public InetSocketAddress getRemoteAddress() {
return remoteAddress;
}
Expand Down
Loading