Skip to content

Commit

Permalink
feat: allow specifying an expected object size for resumable operatio…
Browse files Browse the repository at this point in the history
…ns. (#2661)

Update resumable upload failure detection to be more specific about classifying a request as SCENARIO_5

Fixes #2511
  • Loading branch information
BenWhitehead committed Aug 23, 2024
1 parent 380057b commit 3405611
Show file tree
Hide file tree
Showing 17 changed files with 347 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public WritableByteChannelSession<?, BlobInfo> writeSession(
BidiWriteObjectRequest req = grpc.getBidiWriteObjectRequest(info, opts);

ApiFuture<BidiResumableWrite> startResumableWrite =
grpc.startResumableWrite(grpcCallContext, req);
grpc.startResumableWrite(grpcCallContext, req, opts);
return ResumableMedia.gapic()
.write()
.bidiByteChannel(grpc.storageClient.bidiWriteObjectCallable())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public WritableByteChannelSession<?, BlobInfo> writeSession(
WriteObjectRequest req = grpc.getWriteObjectRequest(info, opts);

ApiFuture<ResumableWrite> startResumableWrite =
grpc.startResumableWrite(grpcCallContext, req);
grpc.startResumableWrite(grpcCallContext, req, opts);
return ResumableMedia.gapic()
.write()
.byteChannel(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.api.gax.rpc.BidiStreamingCallable;
import com.google.api.gax.rpc.ErrorDetails;
import com.google.api.gax.rpc.OutOfRangeException;
import com.google.cloud.storage.ChunkSegmenter.ChunkSegment;
import com.google.cloud.storage.Conversions.Decoder;
Expand Down Expand Up @@ -345,10 +346,17 @@ public void onNext(BidiWriteObjectResponse value) {
public void onError(Throwable t) {
if (t instanceof OutOfRangeException) {
OutOfRangeException oore = (OutOfRangeException) t;
clientDetectedError(
ResumableSessionFailureScenario.SCENARIO_5.toStorageException(
ImmutableList.of(lastWrittenRequest), null, context, oore));
} else if (t instanceof ApiException) {
ErrorDetails ed = oore.getErrorDetails();
if (!(ed != null
&& ed.getErrorInfo() != null
&& ed.getErrorInfo().getReason().equals("GRPC_MISMATCHED_UPLOAD_SIZE"))) {
clientDetectedError(
ResumableSessionFailureScenario.SCENARIO_5.toStorageException(
ImmutableList.of(lastWrittenRequest), null, context, oore));
return;
}
}
if (t instanceof ApiException) {
// use StorageExceptions logic to translate from ApiException to our status codes ensuring
// things fall in line with our retry handlers.
// This is suboptimal, as it will initialize a second exception, however this is the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.api.gax.rpc.ClientStreamingCallable;
import com.google.api.gax.rpc.ErrorDetails;
import com.google.api.gax.rpc.OutOfRangeException;
import com.google.cloud.storage.ChunkSegmenter.ChunkSegment;
import com.google.cloud.storage.Conversions.Decoder;
Expand Down Expand Up @@ -267,11 +268,18 @@ public void onError(Throwable t) {
if (t instanceof OutOfRangeException) {
OutOfRangeException oore = (OutOfRangeException) t;
open = false;
StorageException storageException =
ResumableSessionFailureScenario.SCENARIO_5.toStorageException(
segments, null, context, oore);
invocationHandle.setException(storageException);
} else if (t instanceof ApiException) {
ErrorDetails ed = oore.getErrorDetails();
if (!(ed != null
&& ed.getErrorInfo() != null
&& ed.getErrorInfo().getReason().equals("GRPC_MISMATCHED_UPLOAD_SIZE"))) {
StorageException storageException =
ResumableSessionFailureScenario.SCENARIO_5.toStorageException(
segments, null, context, oore);
invocationHandle.setException(storageException);
return;
}
}
if (t instanceof ApiException) {
// use StorageExceptions logic to translate from ApiException to our status codes ensuring
// things fall in line with our retry handlers.
// This is suboptimal, as it will initialize a second exception, however this is the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.google.api.gax.rpc.BidiStreamingCallable;
import com.google.api.gax.rpc.ClientStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
import com.google.cloud.storage.UnifiedOpts.Opts;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.storage.v2.BidiWriteObjectRequest;
import com.google.storage.v2.BidiWriteObjectResponse;
Expand Down Expand Up @@ -50,7 +52,8 @@ GapicBidiWritableByteChannelSessionBuilder bidiByteChannel(

ApiFuture<ResumableWrite> resumableWrite(
UnaryCallable<StartResumableWriteRequest, StartResumableWriteResponse> callable,
WriteObjectRequest writeObjectRequest) {
WriteObjectRequest writeObjectRequest,
Opts<ObjectTargetOpt> opts) {
StartResumableWriteRequest.Builder b = StartResumableWriteRequest.newBuilder();
if (writeObjectRequest.hasWriteObjectSpec()) {
b.setWriteObjectSpec(writeObjectRequest.getWriteObjectSpec());
Expand All @@ -61,7 +64,7 @@ ApiFuture<ResumableWrite> resumableWrite(
if (writeObjectRequest.hasObjectChecksums()) {
b.setObjectChecksums(writeObjectRequest.getObjectChecksums());
}
StartResumableWriteRequest req = b.build();
StartResumableWriteRequest req = opts.startResumableWriteRequest().apply(b).build();
Function<String, WriteObjectRequest> f =
uploadId ->
writeObjectRequest.toBuilder().clearWriteObjectSpec().setUploadId(uploadId).build();
Expand All @@ -80,7 +83,8 @@ ApiFuture<ResumableWrite> resumableWrite(

ApiFuture<BidiResumableWrite> bidiResumableWrite(
UnaryCallable<StartResumableWriteRequest, StartResumableWriteResponse> x,
BidiWriteObjectRequest writeObjectRequest) {
BidiWriteObjectRequest writeObjectRequest,
Opts<ObjectTargetOpt> opts) {
StartResumableWriteRequest.Builder b = StartResumableWriteRequest.newBuilder();
if (writeObjectRequest.hasWriteObjectSpec()) {
b.setWriteObjectSpec(writeObjectRequest.getWriteObjectSpec());
Expand All @@ -91,7 +95,7 @@ ApiFuture<BidiResumableWrite> bidiResumableWrite(
if (writeObjectRequest.hasObjectChecksums()) {
b.setObjectChecksums(writeObjectRequest.getObjectChecksums());
}
StartResumableWriteRequest req = b.build();
StartResumableWriteRequest req = opts.startResumableWriteRequest().apply(b).build();
Function<String, BidiWriteObjectRequest> f =
uploadId ->
writeObjectRequest.toBuilder().clearWriteObjectSpec().setUploadId(uploadId).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ public Blob internalCreateFrom(Path path, BlobInfo info, Opts<ObjectTargetOpt> o
ClientStreamingCallable<WriteObjectRequest, WriteObjectResponse> write =
storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext);

ApiFuture<ResumableWrite> start = startResumableWrite(grpcCallContext, req);
ApiFuture<ResumableWrite> start = startResumableWrite(grpcCallContext, req, opts);
ApiFuture<GrpcResumableSession> session2 =
ApiFutures.transform(
start,
Expand Down Expand Up @@ -365,7 +365,7 @@ public Blob createFrom(
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);

ApiFuture<ResumableWrite> start = startResumableWrite(grpcCallContext, req);
ApiFuture<ResumableWrite> start = startResumableWrite(grpcCallContext, req, opts);

BufferedWritableByteChannelSession<WriteObjectResponse> session =
ResumableMedia.gapic()
Expand Down Expand Up @@ -790,7 +790,7 @@ public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options
// in JSON, the starting of the resumable session happens before the invocation of write can
// happen. Emulate the same thing here.
// 1. create the future
ApiFuture<ResumableWrite> startResumableWrite = startResumableWrite(grpcCallContext, req);
ApiFuture<ResumableWrite> startResumableWrite = startResumableWrite(grpcCallContext, req, opts);
// 2. await the result of the future
ResumableWrite resumableWrite = ApiFutureUtils.await(startResumableWrite);
// 3. wrap the result in another future container before constructing the BlobWriteChannel
Expand Down Expand Up @@ -1919,7 +1919,7 @@ private UnbufferedReadableByteChannelSession<Object> unbufferedReadSession(

@VisibleForTesting
ApiFuture<ResumableWrite> startResumableWrite(
GrpcCallContext grpcCallContext, WriteObjectRequest req) {
GrpcCallContext grpcCallContext, WriteObjectRequest req, Opts<ObjectTargetOpt> opts) {
Set<StatusCode.Code> codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(req));
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
return ResumableMedia.gapic()
Expand All @@ -1928,11 +1928,12 @@ ApiFuture<ResumableWrite> startResumableWrite(
storageClient
.startResumableWriteCallable()
.withDefaultCallContext(merge.withRetryableCodes(codes)),
req);
req,
opts);
}

ApiFuture<BidiResumableWrite> startResumableWrite(
GrpcCallContext grpcCallContext, BidiWriteObjectRequest req) {
GrpcCallContext grpcCallContext, BidiWriteObjectRequest req, Opts<ObjectTargetOpt> opts) {
Set<StatusCode.Code> codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(req));
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
return ResumableMedia.gapic()
Expand All @@ -1941,7 +1942,8 @@ ApiFuture<BidiResumableWrite> startResumableWrite(
storageClient
.startResumableWriteCallable()
.withDefaultCallContext(merge.withRetryableCodes(codes)),
req);
req,
opts);
}

private SourceObject sourceObjectEncode(SourceBlob from) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public WritableByteChannelSession<?, BlobInfo> writeSession(
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
ApiFuture<ResumableWrite> f =
grpcStorage.startResumableWrite(
grpcCallContext, grpcStorage.getWriteObjectRequest(info, opts));
grpcCallContext, grpcStorage.getWriteObjectRequest(info, opts), opts);
ApiFuture<WriteCtx<ResumableWrite>> start =
ApiFutures.transform(f, WriteCtx::new, MoreExecutors.directExecutor());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ public void rewindTo(long offset) {
&& contentLength != null
&& contentLength > 0) {
String errorMessage = cause.getContent().toLowerCase(Locale.US);
if (errorMessage.contains("content-range")) {
if (errorMessage.contains("content-range")
&& !errorMessage.contains("earlier")) { // TODO: exclude "earlier request"
StorageException se =
ResumableSessionFailureScenario.SCENARIO_5.toStorageException(
uploadId, response, cause, cause::getContent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.google.cloud.storage.UnifiedOpts.ObjectSourceOpt;
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
import com.google.cloud.storage.UnifiedOpts.Opts;
import com.google.cloud.storage.UnifiedOpts.ResumableUploadExpectedObjectSize;
import com.google.cloud.storage.UnifiedOpts.SourceGenerationMatch;
import com.google.cloud.storage.UnifiedOpts.SourceGenerationNotMatch;
import com.google.cloud.storage.UnifiedOpts.SourceMetagenerationMatch;
Expand Down Expand Up @@ -102,7 +103,8 @@ final class ParallelCompositeUploadWritableByteChannel implements BufferedWritab
|| o instanceof SourceMetagenerationMatch
|| o instanceof SourceMetagenerationNotMatch
|| o instanceof Crc32cMatch
|| o instanceof Md5Match;
|| o instanceof Md5Match
|| o instanceof ResumableUploadExpectedObjectSize;
TO_EXCLUDE_FROM_PARTS = tmp.negate();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1352,6 +1352,23 @@ public static BlobWriteOption detectContentType() {
return new BlobWriteOption(UnifiedOpts.detectContentType());
}

/**
* Set a precondition on the number of bytes that GCS should expect for a resumable upload. See
* the docs for <a
* href="https://cloud.google.com/storage/docs/json_api/v1/parameters#xuploadcontentlength">X-Upload-Content-Length</a>
* for more detail.
*
* <p>If the method invoked with this option does not perform a resumable upload, this option
* will be ignored.
*
* @since 2.42.0
*/
@BetaApi
@TransportCompatibility({Transport.HTTP, Transport.GRPC})
public static BlobWriteOption expectedObjectSize(long objectContentSize) {
return new BlobWriteOption(UnifiedOpts.resumableUploadExpectedObjectSize(objectContentSize));
}

/**
* Deduplicate any options which are the same parameter. The value which comes last in {@code
* os} will be the value included in the return.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import com.google.storage.v2.ReadObjectRequest;
import com.google.storage.v2.RestoreObjectRequest;
import com.google.storage.v2.RewriteObjectRequest;
import com.google.storage.v2.StartResumableWriteRequest;
import com.google.storage.v2.UpdateBucketRequest;
import com.google.storage.v2.UpdateHmacKeyRequest;
import com.google.storage.v2.UpdateObjectRequest;
Expand Down Expand Up @@ -196,6 +197,10 @@ default Mapper<ComposeObjectRequest.Builder> composeObject() {
default Mapper<RewriteObjectRequest.Builder> rewriteObject() {
return Mapper.identity();
}

default Mapper<StartResumableWriteRequest.Builder> startResumableWrite() {
return Mapper.identity();
}
}

/** Base interface for those Opts which are applicable to Bucket List operations */
Expand Down Expand Up @@ -487,6 +492,12 @@ static Projection projection(@NonNull String projection) {
return new Projection(projection);
}

static ResumableUploadExpectedObjectSize resumableUploadExpectedObjectSize(
long expectedObjectSize) {
checkArgument(expectedObjectSize >= 0, "expectedObjectSize >= 0 (%s >= 0)", expectedObjectSize);
return new ResumableUploadExpectedObjectSize(expectedObjectSize);
}

static SoftDeleted softDeleted(boolean softDeleted) {
return new SoftDeleted(softDeleted);
}
Expand Down Expand Up @@ -1832,6 +1843,25 @@ public Mapper<UpdateObjectRequest.Builder> updateObject() {
}
}

static final class ResumableUploadExpectedObjectSize extends RpcOptVal<@NonNull Long>
implements ObjectTargetOpt {
private static final long serialVersionUID = 3640126281492196357L;

private ResumableUploadExpectedObjectSize(@NonNull Long val) {
super(StorageRpc.Option.X_UPLOAD_CONTENT_LENGTH, val);
}

@Override
public Mapper<StartResumableWriteRequest.Builder> startResumableWrite() {
return b -> {
if (val > 0) {
b.getWriteObjectSpecBuilder().setObjectSize(val);
}
return b;
};
}
}

static final class ShowDeletedKeys extends RpcOptVal<@NonNull Boolean> implements HmacKeyListOpt {
private static final long serialVersionUID = -6604176744362903487L;

Expand Down Expand Up @@ -2426,6 +2456,10 @@ Mapper<BidiWriteObjectRequest.Builder> bidiWriteObjectRequest() {
return fuseMappers(ObjectTargetOpt.class, ObjectTargetOpt::bidiWriteObject);
}

Mapper<StartResumableWriteRequest.Builder> startResumableWriteRequest() {
return fuseMappers(ObjectTargetOpt.class, ObjectTargetOpt::startResumableWrite);
}

Mapper<GetObjectRequest.Builder> getObjectsRequest() {
return fuseMappers(ObjectSourceOpt.class, ObjectSourceOpt::getObject);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1090,6 +1090,10 @@ public String open(StorageObject object, Map<Option, ?> options) {
requestFactory.buildPostRequest(url, new JsonHttpContent(jsonFactory, object));
HttpHeaders requestHeaders = httpRequest.getHeaders();
requestHeaders.set("X-Upload-Content-Type", detectContentType(object, options));
Long xUploadContentLength = Option.X_UPLOAD_CONTENT_LENGTH.getLong(options);
if (xUploadContentLength != null) {
requestHeaders.set("X-Upload-Content-Length", xUploadContentLength);
}
setEncryptionHeaders(requestHeaders, "x-goog-encryption-", options);
HttpResponse response = httpRequest.execute();
if (response.getStatusCode() != 200) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ enum Option {
SOFT_DELETED("softDeleted"),
COPY_SOURCE_ACL("copySourceAcl"),
GENERATION("generation"),
INCLUDE_FOLDERS_AS_PREFIXES("includeFoldersAsPrefixes");
INCLUDE_FOLDERS_AS_PREFIXES("includeFoldersAsPrefixes"),
X_UPLOAD_CONTENT_LENGTH("x-upload-content-length");

private final String value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ClientStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.storage.UnifiedOpts.Opts;
import com.google.storage.v2.StartResumableWriteRequest;
import com.google.storage.v2.StartResumableWriteResponse;
import com.google.storage.v2.WriteObjectRequest;
Expand Down Expand Up @@ -94,7 +95,7 @@ public void syntax_directBuffered_fluent() {
@Test
public void syntax_resumableUnbuffered_fluent() {
ApiFuture<ResumableWrite> startAsync =
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req);
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty());
UnbufferedWritableByteChannelSession<WriteObjectResponse> session =
ResumableMedia.gapic()
.write()
Expand All @@ -110,7 +111,7 @@ public void syntax_resumableUnbuffered_fluent() {
@Test
public void syntax_resumableBuffered_fluent() {
ApiFuture<ResumableWrite> startAsync =
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req);
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty());
BufferedWritableByteChannelSession<WriteObjectResponse> session =
ResumableMedia.gapic()
.write()
Expand Down Expand Up @@ -150,7 +151,7 @@ public void syntax_directBuffered_incremental() {
@Test
public void syntax_resumableUnbuffered_incremental() {
ApiFuture<ResumableWrite> startAsync =
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req);
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty());
GapicWritableByteChannelSessionBuilder b1 =
ResumableMedia.gapic()
.write()
Expand All @@ -164,7 +165,7 @@ public void syntax_resumableUnbuffered_incremental() {
@Test
public void syntax_resumableBuffered_incremental() {
ApiFuture<ResumableWrite> startAsync =
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req);
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty());
GapicWritableByteChannelSessionBuilder b1 =
ResumableMedia.gapic()
.write()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,9 @@ void testUploads(@ForAll("scenario") Scenario s) throws Exception {

ApiFuture<ResumableWrite> f =
storage.startResumableWrite(
GrpcCallContext.createDefault(), storage.getWriteObjectRequest(info, Opts.empty()));
GrpcCallContext.createDefault(),
storage.getWriteObjectRequest(info, Opts.empty()),
Opts.empty());
ResumableWrite resumableWrite = ApiExceptions.callAndTranslateApiException(f);

UploadCtx uploadCtx =
Expand Down
Loading

0 comments on commit 3405611

Please sign in to comment.