diff --git a/integration-tests/jta/pom.xml b/integration-tests/jta/pom.xml
index 849e4a711541..2598853782d3 100644
--- a/integration-tests/jta/pom.xml
+++ b/integration-tests/jta/pom.xml
@@ -47,14 +47,38 @@
org.apache.camel.quarkus
camel-quarkus-direct
+
+ org.apache.camel.quarkus
+ camel-quarkus-mock
+
org.apache.camel.quarkus
camel-quarkus-jta
+
+ org.apache.camel.quarkus
+ camel-quarkus-jms
+
+
+ org.apache.camel.quarkus
+ camel-quarkus-jdbc
+
io.quarkus
quarkus-resteasy
+
+ io.quarkus
+ quarkus-jackson
+
+
+ io.quarkus
+ quarkus-jdbc-h2
+
+
+ io.quarkus
+ quarkus-artemis-jms
+
@@ -62,11 +86,21 @@
quarkus-junit5
test
+
+ io.quarkus
+ quarkus-test-h2
+ test
+
io.rest-assured
rest-assured
test
+
+ org.apache.camel.quarkus
+ camel-quarkus-integration-testcontainers-support
+ test
+
@@ -108,6 +142,8 @@
native
+
+ --report-unsupported-elements-at-runtime
diff --git a/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaResource.java b/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaResource.java
index 5c757a5600bb..e73a6580435d 100644
--- a/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaResource.java
+++ b/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaResource.java
@@ -17,11 +17,17 @@
package org.apache.camel.quarkus.component.jta.it;
import java.net.URI;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.transaction.Transactional;
import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
@@ -29,18 +35,43 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import io.agroal.api.AgroalDataSource;
+import io.quarkus.agroal.DataSource;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.mock.MockEndpoint;
import org.jboss.logging.Logger;
@Path("/jta")
@ApplicationScoped
public class JtaResource {
-
private static final Logger LOG = Logger.getLogger(JtaResource.class);
+ @Inject
+ @DataSource("camel-ds")
+ AgroalDataSource dataSource;
+
@Inject
ProducerTemplate producerTemplate;
+ @Inject
+ CamelContext context;
+
+ @PostConstruct
+ void postConstruct() throws SQLException {
+ try (Connection conn = dataSource.getConnection()) {
+ try (Statement statement = conn.createStatement()) {
+ try {
+ statement.execute("drop table example");
+ } catch (Exception ignored) {
+ }
+ statement.execute("create table example (id serial primary key, message varchar(255) not null)");
+ }
+ }
+ }
+
@Path("/{policy}")
@POST
@Consumes(MediaType.TEXT_PLAIN)
@@ -64,4 +95,42 @@ public Response postInTx(@PathParam("policy") String policy, String message) thr
return post(policy, message);
}
+ @Path("/jdbc")
+ @POST
+ @Consumes(MediaType.TEXT_PLAIN)
+ @Produces(MediaType.TEXT_PLAIN)
+ public Response jdbc(String message) throws Exception {
+ LOG.infof("message is %s", message);
+ MockEndpoint mockEndpoint = context.getEndpoint("mock:txResult", MockEndpoint.class);
+ mockEndpoint.reset();
+ if (!message.equals("fail")) {
+ mockEndpoint.expectedMessageCount(1);
+ mockEndpoint.message(0).body().isEqualTo(message);
+ }
+ final String response = producerTemplate.requestBody("direct:transaction", message, String.class);
+ mockEndpoint.assertIsSatisfied(15000);
+
+ LOG.infof("Got response from jta: %s", response);
+ return Response
+ .created(new URI("https://camel.apache.org/"))
+ .entity(response)
+ .build();
+ }
+
+ @Path("/mock")
+ @GET
+ @Consumes(MediaType.TEXT_PLAIN)
+ @Produces(MediaType.TEXT_PLAIN)
+ public Response mock() throws Exception {
+ MockEndpoint mockEndpoint = context.getEndpoint("mock:txResult", MockEndpoint.class);
+ List exchanges = mockEndpoint.getExchanges();
+ if (exchanges.isEmpty()) {
+ return Response.ok().entity("empty").build();
+ } else {
+ Message message = exchanges.get(0).getMessage();
+
+ LOG.infof("mock message is " + message.getBody());
+ return Response.ok().entity(message.getBody()).build();
+ }
+ }
}
diff --git a/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaRoutes.java b/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaRoutes.java
index a2fd01f56764..bb6b2778e731 100644
--- a/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaRoutes.java
+++ b/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaRoutes.java
@@ -47,5 +47,24 @@ public void configure() throws Exception {
from("direct:not_supported")
.transacted("PROPAGATION_NOT_SUPPORTED").transform().constant("not_supported");
+
+ from("direct:transaction")
+ .transacted()
+ .setHeader("message", body())
+ .to("jms:queue:txTest?connectionFactory=#xaConnectionFactory&disableReplyTo=true")
+ .transform().simple("insert into example(message) values ('${body}')")
+ .to("jdbc:camel-ds?resetAutoCommit=false")
+ .choice()
+ .when(header("message").startsWith("fail"))
+ .log("Failing forever with exception")
+ .process(x -> {
+ throw new RuntimeException("Fail");
+ })
+ .otherwise()
+ .transform().simple("${header.message} added")
+ .endChoice();
+
+ from("jms:queue:txTest?connectionFactory=#xaConnectionFactory")
+ .to("mock:txResult");
}
}
diff --git a/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/WapperXAConnectionFactory.java b/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/WapperXAConnectionFactory.java
new file mode 100644
index 000000000000..85934151f132
--- /dev/null
+++ b/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/WapperXAConnectionFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.jta.it;
+
+import javax.enterprise.context.Dependent;
+import javax.inject.Named;
+import javax.transaction.TransactionManager;
+
+import io.quarkus.artemis.core.runtime.ArtemisRuntimeConfig;
+import org.apache.activemq.artemis.jms.client.ActiveMQXAConnectionFactory;
+import org.jboss.narayana.jta.jms.ConnectionFactoryProxy;
+import org.jboss.narayana.jta.jms.TransactionHelperImpl;
+
+@Dependent
+@Named("xaConnectionFactory")
+public class WapperXAConnectionFactory extends ConnectionFactoryProxy {
+
+ // This class should be remove if https://github.com/quarkusio/quarkus/issues/14871 resolved
+ // And the ConnectionFactory could be integrated with TransactionManager
+ public WapperXAConnectionFactory(TransactionManager tm, ArtemisRuntimeConfig config) {
+ super(new ActiveMQXAConnectionFactory(
+ config.url, config.username.orElse(null), config.password.orElse(null)),
+ new TransactionHelperImpl(tm));
+ }
+}
diff --git a/integration-tests/jta/src/main/resources/application.properties b/integration-tests/jta/src/main/resources/application.properties
new file mode 100644
index 000000000000..dcd0d95fc2eb
--- /dev/null
+++ b/integration-tests/jta/src/main/resources/application.properties
@@ -0,0 +1,24 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# Quarkus :: DS
+#
+quarkus.datasource.camel-ds.jdbc.url=jdbc:h2:tcp://localhost/mem:test
+quarkus.datasource.camel-ds.db-kind=h2
+quarkus.datasource.camel-ds.jdbc.max-size=8
+quarkus.datasource.camel-ds.jdbc.transactions=xa
diff --git a/integration-tests/jta/src/test/java/org/apache/camel/quarkus/component/jta/it/ActiveMQXATestResource.java b/integration-tests/jta/src/test/java/org/apache/camel/quarkus/component/jta/it/ActiveMQXATestResource.java
new file mode 100644
index 000000000000..823b31166bba
--- /dev/null
+++ b/integration-tests/jta/src/test/java/org/apache/camel/quarkus/component/jta/it/ActiveMQXATestResource.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.jta.it;
+
+import java.util.Map;
+
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+import org.apache.camel.util.CollectionHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.TestcontainersConfiguration;
+
+public class ActiveMQXATestResource implements QuarkusTestResourceLifecycleManager {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQXATestResource.class);
+ private static final String ACTIVEMQ_IMAGE = "vromero/activemq-artemis:2.11.0-alpine";
+ private static final String ACTIVEMQ_USERNAME = "artemis";
+ private static final String ACTIVEMQ_PASSWORD = "simetraehcapa";
+ private static final int ACTIVEMQ_PORT = 61616;
+
+ private GenericContainer> container;
+
+ @Override
+ public Map start() {
+ LOGGER.info(TestcontainersConfiguration.getInstance().toString());
+
+ try {
+ container = new GenericContainer<>(ACTIVEMQ_IMAGE)
+ .withExposedPorts(ACTIVEMQ_PORT)
+ .withLogConsumer(new Slf4jLogConsumer(LOGGER))
+ .withEnv("BROKER_CONFIG_MAX_DISK_USAGE", "100")
+ .waitingFor(Wait.forListeningPort());
+
+ container.start();
+
+ String brokerUrlTcp = String.format("tcp://%s:%d/", container.getContainerIpAddress(),
+ container.getMappedPort(ACTIVEMQ_PORT));
+
+ return CollectionHelper.mapOf(
+ "quarkus.artemis.url", brokerUrlTcp,
+ "quarkus.artemis.username", ACTIVEMQ_USERNAME,
+ "quarkus.artemis.password", ACTIVEMQ_PASSWORD);
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ try {
+ if (container != null) {
+ container.stop();
+ }
+ } catch (Exception e) {
+ // ignored
+ }
+ }
+}
diff --git a/integration-tests/jta/src/test/java/org/apache/camel/quarkus/component/jta/it/JtaTest.java b/integration-tests/jta/src/test/java/org/apache/camel/quarkus/component/jta/it/JtaTest.java
index bf7382ce4682..95dcd1c51ce5 100644
--- a/integration-tests/jta/src/test/java/org/apache/camel/quarkus/component/jta/it/JtaTest.java
+++ b/integration-tests/jta/src/test/java/org/apache/camel/quarkus/component/jta/it/JtaTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.quarkus.component.jta.it;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.h2.H2DatabaseTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
@@ -24,6 +26,8 @@
import static org.hamcrest.Matchers.is;
@QuarkusTest
+@QuarkusTestResource(H2DatabaseTestResource.class)
+@QuarkusTestResource(ActiveMQXATestResource.class)
class JtaTest {
@Test
@@ -131,4 +135,32 @@ public void testInTx() {
.statusCode(201)
.body(is("not_supported"));
}
+
+ @Test
+ public void testJdbcInTx() {
+ final String msg = java.util.UUID.randomUUID().toString().replace("-", "");
+
+ RestAssured.given()
+ .contentType(ContentType.TEXT)
+ .body(msg)
+ .post("/jta/jdbc")
+ .then()
+ .statusCode(201)
+ .body(is(msg + " added"));
+
+ RestAssured.given()
+ .contentType(ContentType.TEXT)
+ .body("fail")
+ .post("/jta/jdbc")
+ .then()
+ .statusCode(500);
+
+ RestAssured.given()
+ .contentType(ContentType.TEXT)
+ .get("/jta/mock")
+ .then()
+ .statusCode(200)
+ .body(is("empty"))
+ .log();
+ }
}