Skip to content

Commit

Permalink
Fix #1918 to add a integration test with camel-jdbc and camel-jms
Browse files Browse the repository at this point in the history
  • Loading branch information
zhfeng committed Feb 22, 2021
1 parent 58249a7 commit cbf68d7
Show file tree
Hide file tree
Showing 7 changed files with 296 additions and 1 deletion.
36 changes: 36 additions & 0 deletions integration-tests/jta/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,60 @@
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-direct</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-mock</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-jta</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-jms</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-jdbc</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jdbc-h2</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-artemis-jms</artifactId>
</dependency>

<!-- test dependencies -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-test-h2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-integration-testcontainers-support</artifactId>
<scope>test</scope>
</dependency>

<!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
<dependency>
Expand Down Expand Up @@ -108,6 +142,8 @@
</activation>
<properties>
<quarkus.package.type>native</quarkus.package.type>
<!-- TODO check this option when https://github.com/quarkusio/quarkus/issues/14871 is resolved -->
<quarkus.native.additional-build-args>--report-unsupported-elements-at-runtime</quarkus.native.additional-build-args>
</properties>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,61 @@
package org.apache.camel.quarkus.component.jta.it;

import java.net.URI;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;

import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.transaction.Transactional;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import io.agroal.api.AgroalDataSource;
import io.quarkus.agroal.DataSource;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.mock.MockEndpoint;
import org.jboss.logging.Logger;

@Path("/jta")
@ApplicationScoped
public class JtaResource {

private static final Logger LOG = Logger.getLogger(JtaResource.class);

@Inject
@DataSource("camel-ds")
AgroalDataSource dataSource;

@Inject
ProducerTemplate producerTemplate;

@Inject
CamelContext context;

@PostConstruct
void postConstruct() throws SQLException {
try (Connection conn = dataSource.getConnection()) {
try (Statement statement = conn.createStatement()) {
try {
statement.execute("drop table example");
} catch (Exception ignored) {
}
statement.execute("create table example (id serial primary key, message varchar(255) not null)");
}
}
}

@Path("/{policy}")
@POST
@Consumes(MediaType.TEXT_PLAIN)
Expand All @@ -64,4 +95,42 @@ public Response postInTx(@PathParam("policy") String policy, String message) thr
return post(policy, message);
}

@Path("/jdbc")
@POST
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
public Response jdbc(String message) throws Exception {
LOG.infof("message is %s", message);
MockEndpoint mockEndpoint = context.getEndpoint("mock:txResult", MockEndpoint.class);
mockEndpoint.reset();
if (!message.equals("fail")) {
mockEndpoint.expectedMessageCount(1);
mockEndpoint.message(0).body().isEqualTo(message);
}
final String response = producerTemplate.requestBody("direct:transaction", message, String.class);
mockEndpoint.assertIsSatisfied(15000);

LOG.infof("Got response from jta: %s", response);
return Response
.created(new URI("https://camel.apache.org/"))
.entity(response)
.build();
}

@Path("/mock")
@GET
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
public Response mock() throws Exception {
MockEndpoint mockEndpoint = context.getEndpoint("mock:txResult", MockEndpoint.class);
List<Exchange> exchanges = mockEndpoint.getExchanges();
if (exchanges.isEmpty()) {
return Response.ok().entity("empty").build();
} else {
Message message = exchanges.get(0).getMessage();

LOG.infof("mock message is " + message.getBody());
return Response.ok().entity(message.getBody()).build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,24 @@ public void configure() throws Exception {

from("direct:not_supported")
.transacted("PROPAGATION_NOT_SUPPORTED").transform().constant("not_supported");

from("direct:transaction")
.transacted()
.setHeader("message", body())
.to("jms:queue:txTest?connectionFactory=#xaConnectionFactory&disableReplyTo=true")
.transform().simple("insert into example(message) values ('${body}')")
.to("jdbc:camel-ds?resetAutoCommit=false")
.choice()
.when(header("message").startsWith("fail"))
.log("Failing forever with exception")
.process(x -> {
throw new RuntimeException("Fail");
})
.otherwise()
.transform().simple("${header.message} added")
.endChoice();

from("jms:queue:txTest?connectionFactory=#xaConnectionFactory")
.to("mock:txResult");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.jta.it;

import javax.enterprise.context.Dependent;
import javax.inject.Named;
import javax.transaction.TransactionManager;

import io.quarkus.artemis.core.runtime.ArtemisRuntimeConfig;
import org.apache.activemq.artemis.jms.client.ActiveMQXAConnectionFactory;
import org.jboss.narayana.jta.jms.ConnectionFactoryProxy;
import org.jboss.narayana.jta.jms.TransactionHelperImpl;

@Dependent
@Named("xaConnectionFactory")
public class WapperXAConnectionFactory extends ConnectionFactoryProxy {

// This class should be remove if https://github.com/quarkusio/quarkus/issues/14871 resolved
// And the ConnectionFactory could be integrated with TransactionManager
public WapperXAConnectionFactory(TransactionManager tm, ArtemisRuntimeConfig config) {
super(new ActiveMQXAConnectionFactory(
config.url, config.username.orElse(null), config.password.orElse(null)),
new TransactionHelperImpl(tm));
}
}
24 changes: 24 additions & 0 deletions integration-tests/jta/src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------

#
# Quarkus :: DS
#
quarkus.datasource.camel-ds.jdbc.url=jdbc:h2:tcp://localhost/mem:test
quarkus.datasource.camel-ds.db-kind=h2
quarkus.datasource.camel-ds.jdbc.max-size=8
quarkus.datasource.camel-ds.jdbc.transactions=xa
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.jta.it;

import java.util.Map;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import org.apache.camel.util.CollectionHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.TestcontainersConfiguration;

public class ActiveMQXATestResource implements QuarkusTestResourceLifecycleManager {

private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQXATestResource.class);
private static final String ACTIVEMQ_IMAGE = "vromero/activemq-artemis:2.11.0-alpine";
private static final String ACTIVEMQ_USERNAME = "artemis";
private static final String ACTIVEMQ_PASSWORD = "simetraehcapa";
private static final int ACTIVEMQ_PORT = 61616;

private GenericContainer<?> container;

@Override
public Map<String, String> start() {
LOGGER.info(TestcontainersConfiguration.getInstance().toString());

try {
container = new GenericContainer<>(ACTIVEMQ_IMAGE)
.withExposedPorts(ACTIVEMQ_PORT)
.withLogConsumer(new Slf4jLogConsumer(LOGGER))
.withEnv("BROKER_CONFIG_MAX_DISK_USAGE", "100")
.waitingFor(Wait.forListeningPort());

container.start();

String brokerUrlTcp = String.format("tcp://%s:%d/", container.getContainerIpAddress(),
container.getMappedPort(ACTIVEMQ_PORT));

return CollectionHelper.mapOf(
"quarkus.artemis.url", brokerUrlTcp,
"quarkus.artemis.username", ACTIVEMQ_USERNAME,
"quarkus.artemis.password", ACTIVEMQ_PASSWORD);

} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public void stop() {
try {
if (container != null) {
container.stop();
}
} catch (Exception e) {
// ignored
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.camel.quarkus.component.jta.it;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.h2.H2DatabaseTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
Expand All @@ -24,6 +26,8 @@
import static org.hamcrest.Matchers.is;

@QuarkusTest
@QuarkusTestResource(H2DatabaseTestResource.class)
@QuarkusTestResource(ActiveMQXATestResource.class)
class JtaTest {

@Test
Expand Down Expand Up @@ -131,4 +135,32 @@ public void testInTx() {
.statusCode(201)
.body(is("not_supported"));
}

@Test
public void testJdbcInTx() {
final String msg = java.util.UUID.randomUUID().toString().replace("-", "");

RestAssured.given()
.contentType(ContentType.TEXT)
.body(msg)
.post("/jta/jdbc")
.then()
.statusCode(201)
.body(is(msg + " added"));

RestAssured.given()
.contentType(ContentType.TEXT)
.body("fail")
.post("/jta/jdbc")
.then()
.statusCode(500);

RestAssured.given()
.contentType(ContentType.TEXT)
.get("/jta/mock")
.then()
.statusCode(200)
.body(is("empty"))
.log();
}
}

0 comments on commit cbf68d7

Please sign in to comment.