From b7cc9f7b736129b3b557ce1b38e5490124329d1e Mon Sep 17 00:00:00 2001 From: Emmanuel Hugonnet Date: Thu, 14 Dec 2023 17:17:58 +0100 Subject: [PATCH] ARTEMIS-4452: Allow to customize http header in http-upgrade request from Artemis. Using a prefix "netty.http.header." to be able to define http headers used for http request from the netty connector. Issue: https://issues.apache.org/jira/browse/ARTEMIS-4452 Signed-off-by: Emmanuel Hugonnet --- .../remoting/impl/netty/NettyConnector.java | 27 +++++++++++++-- .../client/NettyConnectorTest.java | 34 +++++++++++++++++++ 2 files changed, 59 insertions(+), 2 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index 205b0ea061cf..384f7b45aaf7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -164,6 +164,8 @@ public class NettyConnector extends AbstractConnector { public static final String SEC_ACTIVEMQ_REMOTING_ACCEPT = "Sec-ActiveMQRemoting-Accept"; public static final String ACTIVEMQ_REMOTING = "activemq-remoting"; + public static final String NETTY_HTTP_HEADER_PREFIX = "netty.http.header."; + private static final AttributeKey REMOTING_KEY = AttributeKey.valueOf(SEC_ACTIVEMQ_REMOTING_KEY); // Default Configuration @@ -309,6 +311,8 @@ public class NettyConnector extends AbstractConnector { private final ClientProtocolManager protocolManager; + private final Map http_headers; + public NettyConnector(final Map configuration, final BufferHandler handler, final BaseConnectionLifeCycleListener listener, @@ -361,10 +365,17 @@ public NettyConnector(final Map configuration, httpMaxClientIdleTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME, TransportConstants.DEFAULT_HTTP_CLIENT_IDLE_TIME, configuration); httpClientIdleScanPeriod = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, TransportConstants.DEFAULT_HTTP_CLIENT_SCAN_PERIOD, configuration); httpRequiresSessionId = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_REQUIRES_SESSION_ID, TransportConstants.DEFAULT_HTTP_REQUIRES_SESSION_ID, configuration); + http_headers = new HashMap<>(); + for (Map.Entry header : configuration.entrySet()) { + if (header.getKey().startsWith(NETTY_HTTP_HEADER_PREFIX)) { + http_headers.put(header.getKey().substring(NETTY_HTTP_HEADER_PREFIX.length()), header.getValue().toString()); + } + } } else { httpMaxClientIdleTime = 0; httpClientIdleScanPeriod = -1; httpRequiresSessionId = false; + http_headers = Collections.emptyMap(); } httpUpgradeEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_UPGRADE_ENABLED, configuration); @@ -743,7 +754,7 @@ public void initChannel(Channel channel) throws Exception { pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE)); - pipeline.addLast(new HttpHandler()); + pipeline.addLast(new HttpHandler(http_headers)); } if (httpUpgradeEnabled) { @@ -1110,9 +1121,15 @@ class HttpHandler extends ChannelDuplexHandler { private boolean handshaking = false; private String cookie; + private Map headers; - HttpHandler() throws Exception { + HttpHandler(Map headers) throws Exception { url = new URI("http", null, host, port, servletPath, null, null).toString(); + this.headers = headers; + } + + public Map getHeaders() { + return headers; } @Override @@ -1170,6 +1187,9 @@ public void write(final ChannelHandlerContext ctx, final Object msg, ChannelProm ByteBuf buf = (ByteBuf) msg; FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, url, buf); httpRequest.headers().add(HttpHeaderNames.HOST, NettyConnector.this.host); + for (Map.Entry header :headers.entrySet()) { + httpRequest.headers().add(header.getKey(), header.getValue()); + } if (cookie != null) { httpRequest.headers().add(HttpHeaderNames.COOKIE, cookie); } @@ -1197,6 +1217,9 @@ public synchronized void run() { if (!waitingGet && System.currentTimeMillis() > lastSendTime + httpMaxClientIdleTime) { FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); httpRequest.headers().add(HttpHeaderNames.HOST, NettyConnector.this.host); + for (Map.Entry header : headers.entrySet()) { + httpRequest.headers().add(header.getKey(), header.getValue()); + } waitingGet = true; channel.writeAndFlush(httpRequest); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/NettyConnectorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/NettyConnectorTest.java index ebbe5f51ac9b..68ebd585f5c2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/NettyConnectorTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/NettyConnectorTest.java @@ -16,7 +16,13 @@ */ package org.apache.activemq.artemis.tests.integration.client; + import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelPipeline; +import java.lang.reflect.Method; +import java.util.List; +import java.util.Map; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ServerLocator; @@ -60,4 +66,32 @@ public void testConnectionTimeoutConfig() throws Exception { factory.close(); locator.close(); } + + @Test + public void testConnectionHttpHeaders() throws Exception { + TransportConfiguration transport = new TransportConfiguration(NETTY_CONNECTOR_FACTORY); + transport.getParams().put(TransportConstants.HTTP_ENABLED_PROP_NAME, true); + transport.getParams().put("netty.http.header.accept", "text/html,application/xhtml+xml,application/xml"); + transport.getParams().put("netty.http.header.Accept-Encoding", "gzip,deflate"); + transport.getParams().put("netty.http.header.Accept-Language", "en-us,en;q=0.5"); + + try (ServerLocator locator = ActiveMQClient.createServerLocatorWithoutHA(transport)) { + ClientSessionFactoryImpl factory = (ClientSessionFactoryImpl) locator.createSessionFactory(); + NettyConnector connector = (NettyConnector) factory.getConnector(); + + Bootstrap bootstrap = connector.getBootStrap(); + ChannelPipeline pipeline = bootstrap.register().channel().pipeline(); + pipeline.flush(); + Object httpHandler = pipeline.get("NettyConnector$HttpHandler#0"); + Method getHeadersMethod = httpHandler.getClass().getMethod("getHeaders", null); + getHeadersMethod.setAccessible(true); + Map headers = (Map) getHeadersMethod.invoke(httpHandler, null); + assertEquals(3, headers.size()); + assertTrue(headers.containsKey("accept")); + assertTrue(headers.containsKey("Accept-Encoding")); + assertTrue(headers.containsKey("Accept-Language")); + assertFalse(headers.containsKey("test")); + factory.close(); + } + } }