Skip to content

Commit

Permalink
Bump the AWS SDK version to 2.22.12 and 1.12.633 to fix all vulnerabi…
Browse files Browse the repository at this point in the history
…lities
  • Loading branch information
YangSan0622 committed Jan 16, 2024
1 parent fb7e721 commit 240c2fd
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public void testBuildConfig_fromMap_succeeds() {
public void testBuildConfig_noRegionConfigsSupplied_throwsException() {
Map<String, Object> configWithoutRegion = new HashMap<>();
configWithoutRegion.put(AWSSchemaRegistryConstants.AWS_ENDPOINT, "https://test/");
System.setProperty("aws.profile", "");

Exception exception = assertThrows(AWSSchemaRegistryException.class,
() -> new GlueSchemaRegistryConfiguration(configWithoutRegion));
Expand Down
15 changes: 15 additions & 0 deletions integration-tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,18 @@ services:
- KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
- ALLOW_PLAINTEXT_LISTENER=yes

localstack:
container_name: "${LOCALSTACK_DOCKER_NAME-localstack_main}"
image: 'public.ecr.aws/localstack/localstack:latest'
ports:
- "127.0.0.1:4566:4566"
environment:
- SERVICES=cloudwatch, dynamodb, kinesis
- DEBUG=1
- DOCKER_HOST=unix:///var/run/docker.sock
- DEFAULT_REGION=us-east-2
- PARITY_AWS_ACCESS_KEY_ID=1
volumes:
- "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack"
- "/var/run/docker.sock:/var/run/docker.sock"
4 changes: 2 additions & 2 deletions integration-tests/run-local-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ cleanUpConnectFiles() {

cleanUpDockerResources || true
# Start Kafka using docker command asynchronously
docker-compose up &
docker-compose up --no-attach localstack &
sleep 10
## Run mvn tests for Kafka and Kinesis Platforms
cd .. && mvn --file integration-tests/pom.xml verify -Psurefire -X && cd integration-tests
Expand All @@ -131,7 +131,7 @@ downloadMongoDBConnector
copyGSRConverters

runConnectTests() {
docker-compose up &
docker-compose up --no-attach localstack &
setUpMongoDBLocal
startKafkaConnectTasks ${1}
echo "Waiting for Sink task to pick up data.."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public void doKafkaStreamsProcess(final ProducerProperties producerProperties) t
final KafkaStreams streams = new KafkaStreams(builder.build(), properties);
streams.cleanUp();
streams.start();
Thread.sleep(1000L);
Thread.sleep(5000L);
streams.close();

log.info("Finish processing {} message streaming via Kafka.", producerProperties.getDataFormat());
Expand Down Expand Up @@ -360,9 +360,9 @@ private Properties getKafkaConsumerProperties(final ConsumerProperties consumerP

private Properties getKafkaStreamsProperties(final ProducerProperties producerProperties) {
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-test");
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-test-"+producerProperties.getDataFormat());
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapBrokers);
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GlueSchemaRegistryKafkaStreamsSerde.class);
properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, producerProperties.getDataFormat());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

import cloud.localstack.Constants;
import cloud.localstack.ServiceName;
import cloud.localstack.awssdkv2.TestUtils;
import cloud.localstack.docker.LocalstackDockerExtension;
import cloud.localstack.docker.annotation.LocalstackDockerProperties;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
Expand Down Expand Up @@ -49,7 +46,6 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand Down Expand Up @@ -106,15 +102,11 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

@ExtendWith(LocalstackDockerExtension.class)
@LocalstackDockerProperties(services = {ServiceName.KINESIS, ServiceName.DYNAMO, ServiceName.CLOUDWATCH}, imageName =
"public.ecr.aws/d4c7g6k3/localstack", imageTag = "0.12.10")
public class GlueSchemaRegistryKinesisIntegrationTest {
private static final Logger LOGGER = LogManager.getLogger(GlueSchemaRegistryKinesisIntegrationTest.class);
private static final DynamoDbAsyncClient dynamoClient = TestUtils.getClientDyanamoAsyncV2();
private static final CloudWatchAsyncClient cloudWatchClient = TestUtils.getClientCloudWatchAsyncV2();
private static final String LOCALSTACK_HOSTNAME = "localhost";
private static final int LOCALSTACK_KINESIS_PORT = 4566;
private static final String LOCALSTACK_ENDPOINT = String.format("http://%s:%d",LOCALSTACK_HOSTNAME,LOCALSTACK_KINESIS_PORT);
private static final int LOCALSTACK_CLOUDWATCH_PORT = Constants.DEFAULT_PORTS.get(ServiceName.CLOUDWATCH)
.intValue();
private static final int KCL_SCHEDULER_START_UP_WAIT_TIME_SECONDS = 15;
Expand All @@ -140,6 +132,26 @@ public class GlueSchemaRegistryKinesisIntegrationTest {
private static AwsCredentialsProvider awsCredentialsProvider = DefaultCredentialsProvider.builder()
.build();

private static final DynamoDbAsyncClient dynamoClient;
private static final CloudWatchAsyncClient cloudWatchClient;

static {
try {
dynamoClient = DynamoDbAsyncClient.builder()
.endpointOverride(new URI(LOCALSTACK_ENDPOINT))
.region(Region.of(GlueSchemaRegistryConnectionProperties.REGION))
.credentialsProvider(awsCredentialsProvider)
.build();
cloudWatchClient = CloudWatchAsyncClient.builder()
.endpointOverride(new URI(LOCALSTACK_ENDPOINT))
.region(Region.of(GlueSchemaRegistryConnectionProperties.REGION))
.credentialsProvider(awsCredentialsProvider)
.build();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}

private static List<String> schemasToCleanUp = new ArrayList<>();
private final TestDataGeneratorFactory testDataGeneratorFactory = new TestDataGeneratorFactory();
private final GlueSchemaRegistrySerializerFactory glueSchemaRegistrySerializerFactory =
Expand Down Expand Up @@ -219,9 +231,12 @@ private static Stream<Arguments> testSingleKCLKPLDataProvider() {
}

@BeforeEach
public void setUp() throws InterruptedException, ExecutionException {
public void setUp() throws InterruptedException, ExecutionException, URISyntaxException {
System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
kinesisClient = TestUtils.getClientKinesisAsyncV2();
kinesisClient = KinesisAsyncClient.builder()
.endpointOverride(new URI(LOCALSTACK_ENDPOINT))
.region(Region.of(GlueSchemaRegistryConnectionProperties.REGION))
.build();

streamName = String.format("%s%s", TEST_KINESIS_STREAM_PREFIX, RandomStringUtils.randomAlphanumeric(4));
LOGGER.info("Creating Kinesis Stream : {} with {} shards on localStack..", streamName, SHARD_COUNT);
Expand Down Expand Up @@ -496,7 +511,7 @@ private String produceRecordsWithKPL(String streamName,
byte[] serializedBytes = dataFormatSerializer.serialize(record);

putFutures.add(producer.addUserRecord(streamName, Long.toString(timestamp.toEpochMilli()), null,
ByteBuffer.wrap(serializedBytes), gsrSchema));
ByteBuffer.wrap(serializedBytes),gsrSchema));
}

String shardId = null;
Expand Down
12 changes: 6 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<glue.schema.registry.groupId>software.amazon.glue</glue.schema.registry.groupId>
<aws.sdk.v2.version>2.18.4</aws.sdk.v2.version>
<aws.sdk.v1.version>1.12.151</aws.sdk.v1.version>
<aws.sdk.v2.version>2.22.12</aws.sdk.v2.version>
<aws.sdk.v1.version>1.12.633</aws.sdk.v1.version>
<kafka.scala.version>2.12</kafka.scala.version>
<kafka.version>3.6.0</kafka.version>
<kafka.version>3.6.1</kafka.version>
<avro.version>1.11.3</avro.version>
<mbknor.jsonschema.converter.version>1.0.39</mbknor.jsonschema.converter.version>
<everit.json.schema.version>1.14.2</everit.json.schema.version>
Expand All @@ -105,16 +105,16 @@
<hamcrest.version>1.1</hamcrest.version>
<!-- LATEST KCL Does not work with LocalStack yet, remove once new version works -->
<kinesis.client.version>2.2.9</kinesis.client.version>
<kinesis.producer.version>LATEST</kinesis.producer.version>
<!-- LATEST KPL will cause integration test failure in Linux environment, update once we find a way to address the issue -->
<kinesis.producer.version>0.15.8</kinesis.producer.version>
<junit.platform.commons.version>1.6.2</junit.platform.commons.version>
<junit.platform.surefire.version>1.3.2</junit.platform.surefire.version>
<guava.version>32.0.0-jre</guava.version>
<commons.lang3.version>3.8.1</commons.lang3.version>
<commons.cli.version>1.2</commons.cli.version>
<awaitility.version>3.0.0</awaitility.version>
<localstack.utils>0.2.11</localstack.utils>
<protobuf.java.version>3.19.6</protobuf.java.version>
<localstack.utils>0.2.11</localstack.utils>
<localstack.utils>0.2.23</localstack.utils>
<aws.msk.iam.auth>1.1.5</aws.msk.iam.auth>
</properties>

Expand Down

0 comments on commit 240c2fd

Please sign in to comment.