Skip to content

Commit

Permalink
Read multiple TLS packets in one read call (elastic#41725)
Browse files Browse the repository at this point in the history
This is related to elastic#27260. Currently we have a single read buffer that
is no larger than a single TLS packet. This prevents us from reading
multiple TLS packets in a single socket read call. This commit modifies
our TLS work to support reading similarly to the plaintext case. The data
will be copied to a (potentially) recycled TLS packet-sized buffer for
interaction with the SSLEngine.
  • Loading branch information
Tim-Brooks authored and Gurkan Kaymak committed May 27, 2019
1 parent 35cec03 commit be5ceef
Show file tree
Hide file tree
Showing 15 changed files with 406 additions and 375 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.function.IntFunction;

/**
* This is a channel byte buffer composed internally of 16kb pages. When an entire message has been read
Expand All @@ -37,30 +37,27 @@
*/
public final class InboundChannelBuffer implements AutoCloseable {

private static final int PAGE_SIZE = 1 << 14;
public static final int PAGE_SIZE = 1 << 14;
private static final int PAGE_MASK = PAGE_SIZE - 1;
private static final int PAGE_SHIFT = Integer.numberOfTrailingZeros(PAGE_SIZE);
private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0];
private static final Page[] EMPTY_BYTE_PAGE_ARRAY = new Page[0];


private final ArrayDeque<Page> pages;
private final Supplier<Page> pageSupplier;
private final IntFunction<Page> pageAllocator;
private final ArrayDeque<Page> pages = new ArrayDeque<>();
private final AtomicBoolean isClosed = new AtomicBoolean(false);

private long capacity = 0;
private long internalIndex = 0;
// The offset is an int as it is the offset of where the bytes begin in the first buffer
private int offset = 0;

public InboundChannelBuffer(Supplier<Page> pageSupplier) {
this.pageSupplier = pageSupplier;
this.pages = new ArrayDeque<>();
this.capacity = PAGE_SIZE * pages.size();
public InboundChannelBuffer(IntFunction<Page> pageAllocator) {
this.pageAllocator = pageAllocator;
}

public static InboundChannelBuffer allocatingInstance() {
return new InboundChannelBuffer(() -> new Page(ByteBuffer.allocate(PAGE_SIZE), () -> {}));
return new InboundChannelBuffer((n) -> new Page(ByteBuffer.allocate(n), () -> {}));
}

@Override
Expand All @@ -87,7 +84,7 @@ public void ensureCapacity(long requiredCapacity) {
int numPages = numPages(requiredCapacity + offset);
int pagesToAdd = numPages - pages.size();
for (int i = 0; i < pagesToAdd; i++) {
Page page = pageSupplier.get();
Page page = pageAllocator.apply(PAGE_SIZE);
pages.addLast(page);
}
capacity += pagesToAdd * PAGE_SIZE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.nio;

import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.nio.utils.ByteBufferUtils;
import org.elasticsearch.nio.utils.ExceptionsHelper;

import java.io.IOException;
Expand Down Expand Up @@ -249,26 +250,6 @@ protected void setCloseNow() {
// data that is copied to the buffer for a write, but not successfully flushed immediately, must be
// copied again on the next call.

protected int readFromChannel(ByteBuffer buffer) throws IOException {
ByteBuffer ioBuffer = getSelector().getIoBuffer();
ioBuffer.limit(Math.min(buffer.remaining(), ioBuffer.limit()));
int bytesRead;
try {
bytesRead = rawChannel.read(ioBuffer);
} catch (IOException e) {
closeNow = true;
throw e;
}
if (bytesRead < 0) {
closeNow = true;
return 0;
} else {
ioBuffer.flip();
buffer.put(ioBuffer);
return bytesRead;
}
}

protected int readFromChannel(InboundChannelBuffer channelBuffer) throws IOException {
ByteBuffer ioBuffer = getSelector().getIoBuffer();
int bytesRead;
Expand All @@ -288,7 +269,7 @@ protected int readFromChannel(InboundChannelBuffer channelBuffer) throws IOExcep
int j = 0;
while (j < buffers.length && ioBuffer.remaining() > 0) {
ByteBuffer buffer = buffers[j++];
copyBytes(ioBuffer, buffer);
ByteBufferUtils.copyBytes(ioBuffer, buffer);
}
channelBuffer.incrementIndex(bytesRead);
return bytesRead;
Expand All @@ -299,24 +280,6 @@ protected int readFromChannel(InboundChannelBuffer channelBuffer) throws IOExcep
// copying.
private final int WRITE_LIMIT = 1 << 16;

protected int flushToChannel(ByteBuffer buffer) throws IOException {
int initialPosition = buffer.position();
ByteBuffer ioBuffer = getSelector().getIoBuffer();
ioBuffer.limit(Math.min(WRITE_LIMIT, ioBuffer.limit()));
copyBytes(buffer, ioBuffer);
ioBuffer.flip();
int bytesWritten;
try {
bytesWritten = rawChannel.write(ioBuffer);
} catch (IOException e) {
closeNow = true;
buffer.position(initialPosition);
throw e;
}
buffer.position(initialPosition + bytesWritten);
return bytesWritten;
}

protected int flushToChannel(FlushOperation flushOperation) throws IOException {
ByteBuffer ioBuffer = getSelector().getIoBuffer();

Expand All @@ -325,12 +288,8 @@ protected int flushToChannel(FlushOperation flushOperation) throws IOException {
while (continueFlush) {
ioBuffer.clear();
ioBuffer.limit(Math.min(WRITE_LIMIT, ioBuffer.limit()));
int j = 0;
ByteBuffer[] buffers = flushOperation.getBuffersToWrite(WRITE_LIMIT);
while (j < buffers.length && ioBuffer.remaining() > 0) {
ByteBuffer buffer = buffers[j++];
copyBytes(buffer, ioBuffer);
}
ByteBufferUtils.copyBytes(buffers, ioBuffer);
ioBuffer.flip();
int bytesFlushed;
try {
Expand All @@ -345,12 +304,4 @@ protected int flushToChannel(FlushOperation flushOperation) throws IOException {
}
return totalBytesFlushed;
}

private void copyBytes(ByteBuffer from, ByteBuffer to) {
int nBytesToCopy = Math.min(to.remaining(), from.remaining());
int initialLimit = from.limit();
from.limit(from.position() + nBytesToCopy);
to.put(from);
from.limit(initialLimit);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.nio.utils;

import java.nio.ByteBuffer;

public final class ByteBufferUtils {

private ByteBufferUtils() {}

/**
* Copies bytes from the array of byte buffers into the destination buffer. The number of bytes copied is
* limited by the bytes available to copy and the space remaining in the destination byte buffer.
*
* @param source byte buffers to copy from
* @param destination byte buffer to copy to
*
* @return number of bytes copied
*/
public static long copyBytes(ByteBuffer[] source, ByteBuffer destination) {
long bytesCopied = 0;
for (int i = 0; i < source.length && destination.hasRemaining(); i++) {
ByteBuffer buffer = source[i];
bytesCopied += copyBytes(buffer, destination);
}
return bytesCopied;
}

/**
* Copies bytes from source byte buffer into the destination buffer. The number of bytes copied is
* limited by the bytes available to copy and the space remaining in the destination byte buffer.
*
* @param source byte buffer to copy from
* @param destination byte buffer to copy to
*
* @return number of bytes copied
*/
public static int copyBytes(ByteBuffer source, ByteBuffer destination) {
int nBytesToCopy = Math.min(destination.remaining(), source.remaining());
int initialLimit = source.limit();
source.limit(source.position() + nBytesToCopy);
destination.put(source);
source.limit(initialLimit);
return nBytesToCopy;
}
}
Loading

0 comments on commit be5ceef

Please sign in to comment.