diff --git a/CHANGELOG.md b/CHANGELOG.md index 0cf852a3a2b50..d8b35bc5c2eb7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add unreferenced file cleanup count to merge stats ([#10204](https://github.com/opensearch-project/OpenSearch/pull/10204)) - Configurable merge policy for index with an option to choose from LogByteSize and Tiered merge policy ([#9992](https://github.com/opensearch-project/OpenSearch/pull/9992)) - [Remote Store] Add support to restrict creation & deletion if system repository and mutation of immutable settings of system repository ([#9839](https://github.com/opensearch-project/OpenSearch/pull/9839)) +- Improve compressed request handling ([#10261](https://github.com/opensearch-project/OpenSearch/pull/10261)) + ### Dependencies - Bump JNA version from 5.5 to 5.13 ([#9963](https://github.com/opensearch-project/OpenSearch/pull/9963)) - Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575)) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/opensearch/http/netty4/Netty4HeaderVerifierIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/opensearch/http/netty4/Netty4HeaderVerifierIT.java new file mode 100644 index 0000000000000..0d41d416499f6 --- /dev/null +++ b/modules/transport-netty4/src/internalClusterTest/java/org/opensearch/http/netty4/Netty4HeaderVerifierIT.java @@ -0,0 +1,73 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.netty4; + +import org.opensearch.OpenSearchNetty4IntegTestCase; +import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.http.HttpServerTransport; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; +import org.opensearch.test.OpenSearchIntegTestCase.Scope; +import org.opensearch.transport.Netty4BlockingPlugin; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import io.netty.buffer.ByteBufUtil; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.util.ReferenceCounted; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static io.netty.handler.codec.http.HttpHeaderNames.HOST; + +@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1) +public class Netty4HeaderVerifierIT extends OpenSearchNetty4IntegTestCase { + + @Override + protected boolean addMockHttpTransport() { + return false; // enable http + } + + @Override + protected Collection> nodePlugins() { + return Collections.singletonList(Netty4BlockingPlugin.class); + } + + public void testThatNettyHttpServerRequestBlockedWithHeaderVerifier() throws Exception { + HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class); + TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses(); + TransportAddress transportAddress = randomFrom(boundAddresses); + + final FullHttpRequest blockedRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); + blockedRequest.headers().add("blockme", "Not Allowed"); + blockedRequest.headers().add(HOST, "localhost"); + + final List responses = new ArrayList<>(); + try (Netty4HttpClient nettyHttpClient = new Netty4HttpClient()) { + try { + FullHttpResponse blockedResponse = nettyHttpClient.send(transportAddress.address(), blockedRequest); + responses.add(blockedResponse); + String blockedResponseContent = new String(ByteBufUtil.getBytes(blockedResponse.content()), StandardCharsets.UTF_8); + assertThat(blockedResponseContent, containsString("Hit header_verifier")); + assertThat(blockedResponse.status().code(), equalTo(401)); + } finally { + responses.forEach(ReferenceCounted::release); + } + } + } + +} diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/opensearch/transport/Netty4BlockingPlugin.java b/modules/transport-netty4/src/internalClusterTest/java/org/opensearch/transport/Netty4BlockingPlugin.java new file mode 100644 index 0000000000000..85729e2120607 --- /dev/null +++ b/modules/transport-netty4/src/internalClusterTest/java/org/opensearch/transport/Netty4BlockingPlugin.java @@ -0,0 +1,148 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport; + +import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.PageCacheRecycler; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.indices.breaker.CircuitBreakerService; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.http.HttpServerTransport; +import org.opensearch.http.netty4.Netty4HttpServerTransport; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestRequest; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.threadpool.ThreadPool; + +import java.util.Collections; +import java.util.Map; +import java.util.function.Supplier; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultHttpRequest; +import io.netty.handler.codec.http.HttpMessage; +import io.netty.util.AttributeKey; +import io.netty.util.ReferenceCountUtil; + +public class Netty4BlockingPlugin extends Netty4Plugin { + + private static final AttributeKey SHOULD_BLOCK = AttributeKey.newInstance("should-block"); + + public class Netty4BlockingHttpServerTransport extends Netty4HttpServerTransport { + + public Netty4BlockingHttpServerTransport( + Settings settings, + NetworkService networkService, + BigArrays bigArrays, + ThreadPool threadPool, + NamedXContentRegistry xContentRegistry, + Dispatcher dispatcher, + ClusterSettings clusterSettings, + SharedGroupFactory sharedGroupFactory, + Tracer tracer + ) { + super( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry, + dispatcher, + clusterSettings, + sharedGroupFactory, + tracer + ); + } + + @Override + protected ChannelInboundHandlerAdapter createHeaderVerifier() { + return new ExampleBlockingNetty4HeaderVerifier(); + } + } + + @Override + public Map> getHttpTransports( + Settings settings, + ThreadPool threadPool, + BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, + CircuitBreakerService circuitBreakerService, + NamedXContentRegistry xContentRegistry, + NetworkService networkService, + HttpServerTransport.Dispatcher dispatcher, + ClusterSettings clusterSettings, + Tracer tracer + ) { + return Collections.singletonMap( + NETTY_HTTP_TRANSPORT_NAME, + () -> new Netty4BlockingHttpServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry, + new BlockingDispatcher(dispatcher), + clusterSettings, + getSharedGroupFactory(settings), + tracer + ) + ); + } + + /** POC for how an external header verifier would be implemented */ + public class ExampleBlockingNetty4HeaderVerifier extends SimpleChannelInboundHandler { + + @Override + public void channelRead0(ChannelHandlerContext ctx, DefaultHttpRequest msg) throws Exception { + ReferenceCountUtil.retain(msg); + if (isBlocked(msg)) { + msg.headers().add("blocked", true); + } + ctx.fireChannelRead(msg); + } + + private boolean isBlocked(HttpMessage request) { + final boolean shouldBlock = request.headers().contains("blockme"); + + return shouldBlock; + } + } + + class BlockingDispatcher implements HttpServerTransport.Dispatcher { + + private HttpServerTransport.Dispatcher originalDispatcher; + + public BlockingDispatcher(final HttpServerTransport.Dispatcher originalDispatcher) { + super(); + this.originalDispatcher = originalDispatcher; + } + + @Override + public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) { + if (request.getHeaders().containsKey("blocked")) { + channel.sendResponse(new BytesRestResponse(RestStatus.UNAUTHORIZED, "Hit header_verifier")); + return; + } + originalDispatcher.dispatchRequest(request, channel, threadContext); + + } + + @Override + public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) { + originalDispatcher.dispatchBadRequest(channel, threadContext, cause); + } + } +} diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java index ededfdf27e42b..0838cfe8c3273 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java @@ -317,8 +317,10 @@ public ChannelHandler configureServerChannelHandler() { return new HttpChannelHandler(this, handlingSettings); } - static final AttributeKey HTTP_CHANNEL_KEY = AttributeKey.newInstance("opensearch-http-channel"); - static final AttributeKey HTTP_SERVER_CHANNEL_KEY = AttributeKey.newInstance("opensearch-http-server-channel"); + public static final AttributeKey HTTP_CHANNEL_KEY = AttributeKey.newInstance("opensearch-http-channel"); + protected static final AttributeKey HTTP_SERVER_CHANNEL_KEY = AttributeKey.newInstance( + "opensearch-http-server-channel" + ); protected static class HttpChannelHandler extends ChannelInitializer { @@ -351,7 +353,8 @@ protected void initChannel(Channel ch) throws Exception { ); decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR); ch.pipeline().addLast("decoder", decoder); - ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor()); + ch.pipeline().addLast("header_verifier", transport.createHeaderVerifier()); + ch.pipeline().addLast("decoder_compress", transport.createDecompressor()); ch.pipeline().addLast("encoder", new HttpResponseEncoder()); final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength()); aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents); @@ -393,4 +396,21 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { } } } + + /** + * Extension point that allows a NetworkPlugin to extend the netty pipeline and inspect headers after request decoding + */ + protected ChannelInboundHandlerAdapter createHeaderVerifier() { + // pass-through + return new ChannelInboundHandlerAdapter(); + } + + /** + * Extension point that allows a NetworkPlugin to override the default netty HttpContentDecompressor and supply a custom decompressor. + * + * Used in instances to conditionally decompress depending on the outcome from header verification + */ + protected ChannelInboundHandlerAdapter createDecompressor() { + return new HttpContentDecompressor(); + } } diff --git a/modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4Plugin.java b/modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4Plugin.java index 885315bcc1f3c..4258fa8e04d61 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4Plugin.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4Plugin.java @@ -144,7 +144,7 @@ public Map> getHttpTransports( ); } - private SharedGroupFactory getSharedGroupFactory(Settings settings) { + SharedGroupFactory getSharedGroupFactory(Settings settings) { SharedGroupFactory groupFactory = this.groupFactory.get(); if (groupFactory != null) { assert groupFactory.getSettings().equals(settings) : "Different settings than originally provided"; diff --git a/server/src/main/java/org/opensearch/rest/RestHandler.java b/server/src/main/java/org/opensearch/rest/RestHandler.java index 7832649e8ad32..294dc3ffbe329 100644 --- a/server/src/main/java/org/opensearch/rest/RestHandler.java +++ b/server/src/main/java/org/opensearch/rest/RestHandler.java @@ -108,7 +108,7 @@ default List replacedRoutes() { } /** - * Controls whether requests handled by this class are allowed to to access system indices by default. + * Controls whether requests handled by this class are allowed to access system indices by default. * @return {@code true} if requests handled by this class should be allowed to access system indices. */ default boolean allowSystemIndexAccessByDefault() {