diff --git a/CHANGES.md b/CHANGES.md index 83e57d583ef14..4c0cb3ce3c241 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -78,6 +78,7 @@ * Passing a tag into MultiProcessShared is now required in the Python SDK ([#26168](https://github.com/apache/beam/issues/26168)). * CloudDebuggerOptions is removed (deprecated in Beam v2.47.0) for Dataflow runner as the Google Cloud Debugger service is [shutting down](https://cloud.google.com/debugger/docs/deprecations). (Java) ([#25959](https://github.com/apache/beam/issues/25959)). * AWS 2 client providers (deprecated in Beam [v2.38.0](#2380---2022-04-20)) are finally removed ([#26681](https://github.com/apache/beam/issues/26681)). +* AWS 2 SnsIO.writeAsync (deprecated in Beam v2.37.0 due to risk of data loss) was finally removed ([#26710](https://github.com/apache/beam/issues/26710)). ## Deprecations diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/BasicSnsAsyncClientProvider.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/BasicSnsAsyncClientProvider.java deleted file mode 100644 index 7b9ea39989f44..0000000000000 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/BasicSnsAsyncClientProvider.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.sns; - -import static org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory.defaultFactory; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; - -import java.net.URI; -import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; -import org.checkerframework.checker.nullness.qual.Nullable; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.sns.SnsAsyncClient; - -/** Basic implementation of {@link SnsAsyncClientProvider} used by default in {@link SnsIO}. */ -class BasicSnsAsyncClientProvider implements SnsAsyncClientProvider { - private final ClientConfiguration config; - - BasicSnsAsyncClientProvider( - AwsCredentialsProvider credentialsProvider, String region, @Nullable URI endpoint) { - checkArgument(credentialsProvider != null, "awsCredentialsProvider can not be null"); - checkArgument(region != null, "region can not be null"); - config = ClientConfiguration.create(credentialsProvider, Region.of(region), endpoint); - } - - @Override - public SnsAsyncClient getSnsAsyncClient() { - return defaultFactory().create(SnsAsyncClient.builder(), config, null).build(); - } - - @Override - public boolean equals(@Nullable Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - BasicSnsAsyncClientProvider that = (BasicSnsAsyncClientProvider) o; - return config.equals(that.config); - } - - @Override - public int hashCode() { - return config.hashCode(); - } -} diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsAsyncClientProvider.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsAsyncClientProvider.java deleted file mode 100644 index 372ea73ef81fc..0000000000000 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsAsyncClientProvider.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.sns; - -import java.io.Serializable; -import software.amazon.awssdk.services.sns.SnsAsyncClient; - -/** - * Provides instances of Asynchronous SNS client. - * - *

Please note, that any instance of {@link SnsAsyncClientProvider} must be {@link Serializable} - * to ensure it can be sent to worker machines. - */ -public interface SnsAsyncClientProvider extends Serializable { - SnsAsyncClient getSnsAsyncClient(); -} diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java index f8e40d00fadd8..7b9982517b80d 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java @@ -18,11 +18,8 @@ package org.apache.beam.sdk.io.aws2.sns; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; -import java.net.URI; -import java.util.function.BiConsumer; import java.util.function.Consumer; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory; @@ -45,7 +42,6 @@ import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.sns.SnsAsyncClient; import software.amazon.awssdk.services.sns.SnsClient; import software.amazon.awssdk.services.sns.model.InvalidParameterException; import software.amazon.awssdk.services.sns.model.NotFoundException; @@ -114,16 +110,6 @@ public static Write write() { .build(); } - /** - * @deprecated Please use {@link SnsIO#write()} to avoid the risk of data loss. - * @see Issue #21366, BEAM-13203 - */ - @Deprecated - public static WriteAsync writeAsync() { - return new AutoValue_SnsIO_WriteAsync.Builder().build(); - } - /** Implementation of {@link #write}. */ @AutoValue public abstract static class Write @@ -305,152 +291,4 @@ public void tearDown() { } } } - - /** - * Implementation of {@link #writeAsync}. - * - * @deprecated Please use {@link SnsIO#write()} to avoid the risk of data loss. - * @see Issue #21366, BEAM-13203 - */ - @Deprecated - @AutoValue - public abstract static class WriteAsync - extends PTransform, PCollection>> { - - abstract @Nullable SnsAsyncClientProvider getSnsClientProvider(); - - /** SerializableFunction to create PublishRequest. */ - abstract @Nullable SerializableFunction getPublishRequestFn(); - - /** Coder for element T. */ - abstract @Nullable Coder getCoder(); - - abstract Builder builder(); - - @AutoValue.Builder - abstract static class Builder { - abstract Builder setSnsClientProvider(SnsAsyncClientProvider asyncClientProvider); - - abstract Builder setCoder(Coder elementCoder); - - abstract Builder setPublishRequestFn( - SerializableFunction publishRequestFn); - - abstract WriteAsync build(); - } - - /** - * Specify a Coder for SNS PublishRequest object. - * - * @param elementCoder Coder - */ - public WriteAsync withCoder(Coder elementCoder) { - checkNotNull(elementCoder, "elementCoder cannot be null"); - return builder().setCoder(elementCoder).build(); - } - - /** - * Specify a function for converting a message into PublishRequest object. - * - * @param publishRequestFn publishRequestFn - */ - public WriteAsync withPublishRequestFn( - SerializableFunction publishRequestFn) { - checkNotNull(publishRequestFn, "publishRequestFn cannot be null"); - return builder().setPublishRequestFn(publishRequestFn).build(); - } - - /** - * Allows to specify custom {@link SnsAsyncClientProvider}. {@link SnsAsyncClientProvider} - * creates new {@link SnsAsyncClientProvider} which is later used for writing to a SNS topic. - */ - public WriteAsync withSnsClientProvider(SnsAsyncClientProvider asyncClientProvider) { - checkNotNull(asyncClientProvider, "asyncClientProvider cannot be null"); - return builder().setSnsClientProvider(asyncClientProvider).build(); - } - - /** - * Specify credential details and region to be used to write to SNS. If you need more - * sophisticated credential protocol, then you should look at {@link - * WriteAsync#withSnsClientProvider(SnsAsyncClientProvider)}. - */ - public WriteAsync withSnsClientProvider( - AwsCredentialsProvider credentialsProvider, String region) { - checkNotNull(credentialsProvider, "credentialsProvider cannot be null"); - checkNotNull(region, "region cannot be null"); - return withSnsClientProvider(credentialsProvider, region, null); - } - - /** - * Specify credential details and region to be used to write to SNS. If you need more - * sophisticated credential protocol, then you should look at {@link - * WriteAsync#withSnsClientProvider(SnsAsyncClientProvider)}. - * - *

The {@code serviceEndpoint} sets an alternative service host. - */ - public WriteAsync withSnsClientProvider( - AwsCredentialsProvider credentialsProvider, String region, URI serviceEndpoint) { - checkNotNull(credentialsProvider, "credentialsProvider cannot be null"); - checkNotNull(region, "region cannot be null"); - return withSnsClientProvider( - new BasicSnsAsyncClientProvider(credentialsProvider, region, serviceEndpoint)); - } - - @Override - public PCollection> expand(PCollection input) { - checkArgument(getSnsClientProvider() != null, "withSnsClientProvider() needs to called"); - checkArgument(getPublishRequestFn() != null, "withPublishRequestFn() needs to called"); - checkArgument(getCoder() != null, "withElementCoder() needs to called"); - - return input - .apply(ParDo.of(new SnsWriteAsyncFn<>(this))) - .setCoder(SnsResponseCoder.of(getCoder())); - } - - private static class SnsWriteAsyncFn extends DoFn> { - - private static final Logger LOG = LoggerFactory.getLogger(SnsWriteAsyncFn.class); - - private final WriteAsync spec; - private transient SnsAsyncClient client; - - SnsWriteAsyncFn(WriteAsync spec) { - this.spec = spec; - } - - @Setup - public void setup() { - this.client = spec.getSnsClientProvider().getSnsAsyncClient(); - } - - @SuppressWarnings("FutureReturnValueIgnored") - @ProcessElement - public void processElement(ProcessContext context) { - PublishRequest publishRequest = spec.getPublishRequestFn().apply(context.element()); - client.publish(publishRequest).whenComplete(getPublishResponse(context)); - } - - private BiConsumer getPublishResponse( - DoFn>.ProcessContext context) { - return (response, ex) -> { - if (ex == null) { - SnsResponse snsResponse = SnsResponse.of(context.element(), response); - context.output(snsResponse); - } else { - LOG.error("Error while publishing request to SNS", ex); - throw new SnsWriteException("Error while publishing request to SNS", ex); - } - }; - } - } - } - - /** Exception class for SNS write exceptions. */ - protected static class SnsWriteException extends RuntimeException { - - SnsWriteException(String message, Throwable error) { - super(message, error); - } - } } diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsResponse.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsResponse.java deleted file mode 100644 index 05348cd736eaa..0000000000000 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsResponse.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.sns; - -import com.google.auto.value.AutoValue; -import java.io.Serializable; -import java.util.Optional; -import java.util.OptionalInt; -import org.checkerframework.checker.nullness.qual.NonNull; -import org.checkerframework.checker.nullness.qual.Nullable; -import software.amazon.awssdk.services.sns.model.PublishResponse; - -@AutoValue -abstract class SnsResponse implements Serializable { - - public abstract T element(); - - public abstract OptionalInt statusCode(); - - public abstract Optional statusText(); - - static SnsResponse create( - @NonNull T element, OptionalInt statusCode, Optional statusText) { - - return new AutoValue_SnsResponse<>(element, statusCode, statusText); - } - - public static SnsResponse of(@NonNull T element, @Nullable PublishResponse response) { - - final Optional publishResponse = Optional.ofNullable(response); - OptionalInt statusCode = - publishResponse - .map(r -> OptionalInt.of(r.sdkHttpResponse().statusCode())) - .orElse(OptionalInt.empty()); - - Optional statusText = publishResponse.flatMap(r -> r.sdkHttpResponse().statusText()); - - return create(element, statusCode, statusText); - } -} diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsResponseCoder.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsResponseCoder.java deleted file mode 100644 index 88d40f74edcac..0000000000000 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsResponseCoder.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.sns; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.List; -import java.util.Optional; -import java.util.OptionalInt; -import org.apache.beam.sdk.coders.BooleanCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.StructuredCoder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; - -/** - * Custom Coder for WrappedSnsResponse. - * - * @deprecated Coder of deprecated {@link SnsResponse}. - */ -@Deprecated -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) -class SnsResponseCoder extends StructuredCoder> { - - private final Coder elementCoder; - private static final VarIntCoder STATUS_CODE_CODER = VarIntCoder.of(); - private static final StringUtf8Coder STATUS_TEXT_CODER = StringUtf8Coder.of(); - - public SnsResponseCoder(Coder elementCoder) { - this.elementCoder = elementCoder; - } - - static SnsResponseCoder of(Coder elementCoder) { - return new SnsResponseCoder<>(elementCoder); - } - - @Override - public void encode(SnsResponse value, OutputStream outStream) throws IOException { - T element = value.element(); - elementCoder.encode(element, outStream); - - OptionalInt statusCode = value.statusCode(); - if (statusCode.isPresent()) { - BooleanCoder.of().encode(Boolean.TRUE, outStream); - STATUS_CODE_CODER.encode(statusCode.getAsInt(), outStream); - } else { - BooleanCoder.of().encode(Boolean.FALSE, outStream); - } - - Optional statusText = value.statusText(); - if (statusText.isPresent()) { - BooleanCoder.of().encode(Boolean.TRUE, outStream); - STATUS_TEXT_CODER.encode(statusText.get(), outStream); - } else { - BooleanCoder.of().encode(Boolean.FALSE, outStream); - } - } - - @Override - public SnsResponse decode(InputStream inStream) throws IOException { - T element = elementCoder.decode(inStream); - - OptionalInt statusCode = OptionalInt.empty(); - if (BooleanCoder.of().decode(inStream)) { - statusCode = OptionalInt.of(STATUS_CODE_CODER.decode(inStream)); - } - - Optional statusText = Optional.empty(); - if (BooleanCoder.of().decode(inStream)) { - statusText = Optional.of(STATUS_TEXT_CODER.decode(inStream)); - } - return SnsResponse.create(element, statusCode, statusText); - } - - @Override - public List> getCoderArguments() { - return ImmutableList.of(elementCoder); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - elementCoder.verifyDeterministic(); - } -} diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/BasicSnsAsyncClientProviderTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/BasicSnsAsyncClientProviderTest.java deleted file mode 100644 index 4b94750a8bb2a..0000000000000 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/BasicSnsAsyncClientProviderTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.sns; - -import static org.junit.Assert.assertEquals; - -import org.apache.beam.sdk.util.SerializableUtils; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; - -/** Tests on {@link BasicSnsAsyncClientProvider}. */ -@RunWith(JUnit4.class) -public class BasicSnsAsyncClientProviderTest { - - @Test - public void testSerialization() { - AwsCredentialsProvider awsCredentialsProvider = - StaticCredentialsProvider.create( - AwsBasicCredentials.create("ACCESS_KEY_ID", "SECRET_ACCESS_KEY")); - - BasicSnsAsyncClientProvider snsAsyncClientProvider = - new BasicSnsAsyncClientProvider(awsCredentialsProvider, "us-east-1", null); - - byte[] serializedBytes = SerializableUtils.serializeToByteArray(snsAsyncClientProvider); - - BasicSnsAsyncClientProvider snsAsyncClientProviderDeserialized = - (BasicSnsAsyncClientProvider) - SerializableUtils.deserializeFromByteArray(serializedBytes, "Aws Credentials Provider"); - - assertEquals(snsAsyncClientProvider, snsAsyncClientProviderDeserialized); - } -} diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncBaseClient.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncBaseClient.java deleted file mode 100644 index 697ae8ba7b769..0000000000000 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncBaseClient.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.sns; - -import java.io.Serializable; -import software.amazon.awssdk.services.sns.SnsAsyncClient; - -class MockSnsAsyncBaseClient implements SnsAsyncClient, Serializable { - @Override - public String serviceName() { - return null; - } - - @Override - public void close() {} -} diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncClient.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncClient.java deleted file mode 100644 index 65e476874162f..0000000000000 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncClient.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.sns; - -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import software.amazon.awssdk.http.SdkHttpResponse; -import software.amazon.awssdk.services.sns.model.PublishRequest; -import software.amazon.awssdk.services.sns.model.PublishResponse; - -final class MockSnsAsyncClient extends MockSnsAsyncBaseClient { - private final int statusCode; - - private MockSnsAsyncClient(int statusCode) { - this.statusCode = statusCode; - } - - static MockSnsAsyncClient withStatusCode(int statusCode) { - return new MockSnsAsyncClient(statusCode); - } - - @Override - public CompletableFuture publish(PublishRequest publishRequest) { - return CompletableFuture.supplyAsync( - () -> { - SdkHttpResponse sdkHttpResponse = - SdkHttpResponse.builder().statusCode(statusCode).build(); - PublishResponse.Builder builder = PublishResponse.builder(); - builder.messageId(UUID.randomUUID().toString()); - builder.sdkHttpResponse(sdkHttpResponse).build(); - return builder.build(); - }); - } -} diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncExceptionClient.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncExceptionClient.java deleted file mode 100644 index 198b0f78dd68f..0000000000000 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncExceptionClient.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.sns; - -import java.util.concurrent.CompletableFuture; -import software.amazon.awssdk.services.sns.model.PublishRequest; -import software.amazon.awssdk.services.sns.model.PublishResponse; - -final class MockSnsAsyncExceptionClient extends MockSnsAsyncBaseClient { - private MockSnsAsyncExceptionClient() {} - - static MockSnsAsyncExceptionClient create() { - return new MockSnsAsyncExceptionClient(); - } - - @Override - public CompletableFuture publish(PublishRequest publishRequest) { - return CompletableFuture.supplyAsync( - () -> { - throw new RuntimeException("Error occurred during publish call"); - }); - } -} diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOWriteTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOWriteTest.java deleted file mode 100644 index 9fb544aca36a5..0000000000000 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOWriteTest.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.sns; - -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.Assert.assertEquals; - -import java.io.Serializable; -import java.util.Set; -import java.util.stream.StreamSupport; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; -import org.junit.Ignore; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import software.amazon.awssdk.services.sns.model.PublishRequest; - -@RunWith(JUnit4.class) -public class SnsIOWriteTest implements Serializable { - private static final String TOPIC = "test"; - private static final int FAILURE_STATUS_CODE = 400; - private static final int SUCCESS_STATUS_CODE = 200; - - @Rule public transient TestPipeline pipeline = TestPipeline.create(); - - @Test - @Ignore("Deprecated SnsIO.writeAsync doesn't wait for future responses.") - public void shouldReturnResponseOnPublishSuccess() { - String testMessage1 = "test1"; - String testMessage2 = "test2"; - String testMessage3 = "test3"; - - PCollection> result = - pipeline - .apply( - Create.of(testMessage1, testMessage2, testMessage3).withCoder(StringUtf8Coder.of())) - .apply( - SnsIO.writeAsync() - .withCoder(StringUtf8Coder.of()) - .withPublishRequestFn(createPublishRequestFn()) - .withSnsClientProvider( - () -> MockSnsAsyncClient.withStatusCode(SUCCESS_STATUS_CODE))); - - PAssert.that(result) - .satisfies( - (responses) -> { - ImmutableSet messagesInResponse = - StreamSupport.stream(responses.spliterator(), false) - .filter(response -> response.statusCode().getAsInt() == SUCCESS_STATUS_CODE) - .map(SnsResponse::element) - .collect(ImmutableSet.toImmutableSet()); - - Set originalMessages = - Sets.newHashSet(testMessage1, testMessage2, testMessage3); - Sets.SetView difference = - Sets.difference(messagesInResponse, originalMessages); - - assertEquals(3, messagesInResponse.size()); - assertEquals(0, difference.size()); - return null; - }); - pipeline.run().waitUntilFinish(); - } - - @Test - @Ignore("Deprecated SnsIO.writeAsync doesn't wait for future responses.") - public void shouldReturnResponseOnPublishFailure() { - String testMessage1 = "test1"; - String testMessage2 = "test2"; - - PCollection> result = - pipeline - .apply(Create.of(testMessage1, testMessage2).withCoder(StringUtf8Coder.of())) - .apply( - SnsIO.writeAsync() - .withCoder(StringUtf8Coder.of()) - .withPublishRequestFn(createPublishRequestFn()) - .withSnsClientProvider( - () -> MockSnsAsyncClient.withStatusCode(FAILURE_STATUS_CODE))); - - PAssert.that(result) - .satisfies( - (responses) -> { - ImmutableSet messagesInResponse = - StreamSupport.stream(responses.spliterator(), false) - .filter(response -> response.statusCode().getAsInt() != SUCCESS_STATUS_CODE) - .map(SnsResponse::element) - .collect(ImmutableSet.toImmutableSet()); - - Set originalMessages = Sets.newHashSet(testMessage1, testMessage2); - Sets.SetView difference = - Sets.difference(messagesInResponse, originalMessages); - - assertEquals(2, messagesInResponse.size()); - assertEquals(0, difference.size()); - return null; - }); - pipeline.run().waitUntilFinish(); - } - - @Test - @Ignore("Deprecated SnsIO.writeAsync doesn't fail on failure status code.") - public void shouldThrowIfThrowErrorOptionSet() { - pipeline - .apply(Create.of("test1")) - .apply( - SnsIO.writeAsync() - .withCoder(StringUtf8Coder.of()) - .withPublishRequestFn(createPublishRequestFn()) - .withSnsClientProvider( - () -> MockSnsAsyncClient.withStatusCode(FAILURE_STATUS_CODE))); - - assertThatThrownBy(() -> pipeline.run().waitUntilFinish()) - .isInstanceOf(Pipeline.PipelineExecutionException.class); - } - - @Test - @Ignore("Deprecated SnsIO.writeAsync doesn't propagate async failures.") - public void shouldThrowIfThrowErrorOptionSetOnInternalException() { - pipeline - .apply(Create.of("test1")) - .apply( - SnsIO.writeAsync() - .withCoder(StringUtf8Coder.of()) - .withPublishRequestFn(createPublishRequestFn()) - .withSnsClientProvider(MockSnsAsyncExceptionClient::create)); - - assertThatThrownBy(() -> pipeline.run().waitUntilFinish()) - .isInstanceOf(Pipeline.PipelineExecutionException.class); - } - - private SerializableFunction createPublishRequestFn() { - return (input) -> PublishRequest.builder().topicArn(TOPIC).message(input).build(); - } -} diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsResponseCoderTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsResponseCoderTest.java deleted file mode 100644 index f0d4563a90c14..0000000000000 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsResponseCoderTest.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.sns; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.Optional; -import java.util.OptionalInt; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.junit.Assert; -import org.junit.Test; - -public class SnsResponseCoderTest { - - @Test - public void verifyResponseWithStatusCodeAndText() throws IOException { - - SnsResponse expected = - SnsResponse.create("test-1", OptionalInt.of(200), Optional.of("OK")); - - SnsResponseCoder coder = SnsResponseCoder.of(StringUtf8Coder.of()); - ByteArrayOutputStream output = new ByteArrayOutputStream(); - coder.encode(expected, output); - - ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray()); - SnsResponse actual = coder.decode(in); - - Assert.assertEquals(expected, actual); - } - - @Test - public void verifyResponseWithStatusAndNoText() throws IOException { - SnsResponse expected = - SnsResponse.create("test-2", OptionalInt.of(200), Optional.empty()); - - SnsResponseCoder coder = SnsResponseCoder.of(StringUtf8Coder.of()); - ByteArrayOutputStream output = new ByteArrayOutputStream(); - coder.encode(expected, output); - - ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray()); - SnsResponse actual = coder.decode(in); - - Assert.assertEquals(expected, actual); - } - - @Test - public void verifyResponseWithNoStatusCodeAndText() throws IOException { - - SnsResponse expected = - SnsResponse.create("test-3", OptionalInt.empty(), Optional.empty()); - - SnsResponseCoder coder = SnsResponseCoder.of(StringUtf8Coder.of()); - ByteArrayOutputStream output = new ByteArrayOutputStream(); - coder.encode(expected, output); - - ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray()); - SnsResponse actual = coder.decode(in); - - Assert.assertEquals(expected, actual); - } -}