Skip to content

Commit

Permalink
ARTEMIS-4452: Allow to customize http header in http-upgrade request …
Browse files Browse the repository at this point in the history
…from Artemis.

Using a prefix "netty.http.header." to be able to define http headers
used for http request from the netty connector.

Issue: https://issues.apache.org/jira/browse/ARTEMIS-4452

Signed-off-by: Emmanuel Hugonnet <ehugonne@redhat.com>
  • Loading branch information
ehsavoie committed Dec 14, 2023
1 parent bd925a7 commit b7cc9f7
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ public class NettyConnector extends AbstractConnector {
public static final String SEC_ACTIVEMQ_REMOTING_ACCEPT = "Sec-ActiveMQRemoting-Accept";
public static final String ACTIVEMQ_REMOTING = "activemq-remoting";

public static final String NETTY_HTTP_HEADER_PREFIX = "netty.http.header.";

private static final AttributeKey<String> REMOTING_KEY = AttributeKey.valueOf(SEC_ACTIVEMQ_REMOTING_KEY);

// Default Configuration
Expand Down Expand Up @@ -309,6 +311,8 @@ public class NettyConnector extends AbstractConnector {

private final ClientProtocolManager protocolManager;

private final Map<String, String> http_headers;

public NettyConnector(final Map<String, Object> configuration,
final BufferHandler handler,
final BaseConnectionLifeCycleListener<?> listener,
Expand Down Expand Up @@ -361,10 +365,17 @@ public NettyConnector(final Map<String, Object> configuration,
httpMaxClientIdleTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME, TransportConstants.DEFAULT_HTTP_CLIENT_IDLE_TIME, configuration);
httpClientIdleScanPeriod = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, TransportConstants.DEFAULT_HTTP_CLIENT_SCAN_PERIOD, configuration);
httpRequiresSessionId = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_REQUIRES_SESSION_ID, TransportConstants.DEFAULT_HTTP_REQUIRES_SESSION_ID, configuration);
http_headers = new HashMap<>();
for (Map.Entry<String, Object> header : configuration.entrySet()) {
if (header.getKey().startsWith(NETTY_HTTP_HEADER_PREFIX)) {
http_headers.put(header.getKey().substring(NETTY_HTTP_HEADER_PREFIX.length()), header.getValue().toString());
}
}
} else {
httpMaxClientIdleTime = 0;
httpClientIdleScanPeriod = -1;
httpRequiresSessionId = false;
http_headers = Collections.emptyMap();
}

httpUpgradeEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_UPGRADE_ENABLED, configuration);
Expand Down Expand Up @@ -743,7 +754,7 @@ public void initChannel(Channel channel) throws Exception {

pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));

pipeline.addLast(new HttpHandler());
pipeline.addLast(new HttpHandler(http_headers));
}

if (httpUpgradeEnabled) {
Expand Down Expand Up @@ -1110,9 +1121,15 @@ class HttpHandler extends ChannelDuplexHandler {
private boolean handshaking = false;

private String cookie;
private Map<String, String> headers;

HttpHandler() throws Exception {
HttpHandler(Map<String, String> headers) throws Exception {
url = new URI("http", null, host, port, servletPath, null, null).toString();
this.headers = headers;
}

public Map<String, String> getHeaders() {
return headers;
}

@Override
Expand Down Expand Up @@ -1170,6 +1187,9 @@ public void write(final ChannelHandlerContext ctx, final Object msg, ChannelProm
ByteBuf buf = (ByteBuf) msg;
FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, url, buf);
httpRequest.headers().add(HttpHeaderNames.HOST, NettyConnector.this.host);
for (Map.Entry<String, String> header :headers.entrySet()) {
httpRequest.headers().add(header.getKey(), header.getValue());
}
if (cookie != null) {
httpRequest.headers().add(HttpHeaderNames.COOKIE, cookie);
}
Expand Down Expand Up @@ -1197,6 +1217,9 @@ public synchronized void run() {
if (!waitingGet && System.currentTimeMillis() > lastSendTime + httpMaxClientIdleTime) {
FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
httpRequest.headers().add(HttpHeaderNames.HOST, NettyConnector.this.host);
for (Map.Entry<String, String> header : headers.entrySet()) {
httpRequest.headers().add(header.getKey(), header.getValue());
}
waitingGet = true;
channel.writeAndFlush(httpRequest);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@
*/
package org.apache.activemq.artemis.tests.integration.client;


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
Expand Down Expand Up @@ -60,4 +66,32 @@ public void testConnectionTimeoutConfig() throws Exception {
factory.close();
locator.close();
}

@Test
public void testConnectionHttpHeaders() throws Exception {
TransportConfiguration transport = new TransportConfiguration(NETTY_CONNECTOR_FACTORY);
transport.getParams().put(TransportConstants.HTTP_ENABLED_PROP_NAME, true);
transport.getParams().put("netty.http.header.accept", "text/html,application/xhtml+xml,application/xml");
transport.getParams().put("netty.http.header.Accept-Encoding", "gzip,deflate");
transport.getParams().put("netty.http.header.Accept-Language", "en-us,en;q=0.5");

try (ServerLocator locator = ActiveMQClient.createServerLocatorWithoutHA(transport)) {
ClientSessionFactoryImpl factory = (ClientSessionFactoryImpl) locator.createSessionFactory();
NettyConnector connector = (NettyConnector) factory.getConnector();

Bootstrap bootstrap = connector.getBootStrap();
ChannelPipeline pipeline = bootstrap.register().channel().pipeline();
pipeline.flush();
Object httpHandler = pipeline.get("NettyConnector$HttpHandler#0");
Method getHeadersMethod = httpHandler.getClass().getMethod("getHeaders", null);
getHeadersMethod.setAccessible(true);
Map<String, String> headers = (Map<String, String>) getHeadersMethod.invoke(httpHandler, null);
assertEquals(3, headers.size());
assertTrue(headers.containsKey("accept"));
assertTrue(headers.containsKey("Accept-Encoding"));
assertTrue(headers.containsKey("Accept-Language"));
assertFalse(headers.containsKey("test"));
factory.close();
}
}
}

0 comments on commit b7cc9f7

Please sign in to comment.