Skip to content

Commit

Permalink
Add write*Blob option to replace existing blob (#31729)
Browse files Browse the repository at this point in the history
Adds a new parameter to the BlobContainer#write*Blob methods to specify whether the existing file
should be overridden or not. For some metadata files in the repository, we actually want to replace
the current file. This is currently implemented through an explicit blob delete and then a fresh write.
In case of using a cloud provider (S3, GCS, Azure), this results in 2 API requests instead of just 1.
This change will therefore allow us to achieve the same functionality using less API requests.
  • Loading branch information
ywelsch authored Jul 3, 2018
1 parent 631a53a commit 2bb4f38
Show file tree
Hide file tree
Showing 21 changed files with 131 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public InputStream readBlob(String name) throws IOException {
}

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,20 @@ private static PathTrie<RequestHandler> defaultHandlers(final Map<String, Contai
handlers.insert(path, (request) -> {
final String destContainerName = request.getParam("container");
final String destBlobName = objectName(request.getParameters());
final String ifNoneMatch = request.getHeader("If-None-Match");

final Container destContainer = containers.get(destContainerName);
if (destContainer == null) {
return newContainerNotFoundError(request.getId());
}

byte[] existingBytes = destContainer.objects.putIfAbsent(destBlobName, request.getBody());
if (existingBytes != null) {
return newBlobAlreadyExistsError(request.getId());
if ("*".equals(ifNoneMatch)) {
byte[] existingBytes = destContainer.objects.putIfAbsent(destBlobName, request.getBody());
if (existingBytes != null) {
return newBlobAlreadyExistsError(request.getId());
}
} else {
destContainer.objects.put(destBlobName, request.getBody());
}

return new Response(RestStatus.CREATED.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE); })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ public InputStream readBlob(String blobName) throws IOException {
}

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize);

try {
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize);
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
} catch (URISyntaxException|StorageException e) {
throw new IOException("Can not write blob " + blobName, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String keyPath, String prefix
return service.listBlobsByPrefix(clientName, container, keyPath, prefix);
}

public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws URISyntaxException, StorageException,
FileAlreadyExistsException {
service.writeBlob(this.clientName, container, blobName, inputStream, blobSize);
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
throws URISyntaxException, StorageException, FileAlreadyExistsException {
service.writeBlob(this.clientName, container, blobName, inputStream, blobSize, failIfAlreadyExists);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,17 +236,20 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String account, String contai
return blobsBuilder.immutableMap();
}

public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize)
public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize,
boolean failIfAlreadyExists)
throws URISyntaxException, StorageException, FileAlreadyExistsException {
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize));
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
final CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobName);
try {
final AccessCondition accessCondition =
failIfAlreadyExists ? AccessCondition.generateIfNotExistsCondition() : AccessCondition.generateEmptyCondition();
SocketAccess.doPrivilegedVoidException(() ->
blob.upload(inputStream, blobSize, AccessCondition.generateIfNotExistsCondition(), null, client.v2().get()));
blob.upload(inputStream, blobSize, accessCondition, null, client.v2().get()));
} catch (final StorageException se) {
if (se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
if (failIfAlreadyExists && se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) {
throw new FileAlreadyExistsException(blobName, null, se.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,10 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String account, String contai
}

@Override
public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize)
public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize,
boolean failIfAlreadyExists)
throws URISyntaxException, StorageException, FileAlreadyExistsException {
if (blobs.containsKey(blobName)) {
if (failIfAlreadyExists && blobs.containsKey(blobName)) {
throw new FileAlreadyExistsException(blobName);
}
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,6 @@ private static PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket
// https://cloud.google.com/storage/docs/json_api/v1/objects/insert
handlers.insert("POST /upload/storage/v1/b/{bucket}/o", (request) -> {
final String ifGenerationMatch = request.getParam("ifGenerationMatch");
if ("0".equals(ifGenerationMatch) == false) {
return newError(RestStatus.PRECONDITION_FAILED, "object already exist");
}

final String uploadType = request.getParam("uploadType");
if ("resumable".equals(uploadType)) {
final String objectName = request.getParam("name");
Expand All @@ -172,12 +168,19 @@ private static PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket
if (bucket == null) {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
if (bucket.objects.putIfAbsent(objectName, EMPTY_BYTE) == null) {
final String location = /*endpoint +*/ "/upload/storage/v1/b/" + bucket.name + "/o?uploadType=resumable&upload_id="
if ("0".equals(ifGenerationMatch)) {
if (bucket.objects.putIfAbsent(objectName, EMPTY_BYTE) == null) {
final String location = /*endpoint +*/ "/upload/storage/v1/b/" + bucket.name + "/o?uploadType=resumable&upload_id="
+ objectName;
return newResponse(RestStatus.CREATED, singletonMap("Location", location), jsonBuilder());
return newResponse(RestStatus.CREATED, singletonMap("Location", location), jsonBuilder());
} else {
return newError(RestStatus.PRECONDITION_FAILED, "object already exist");
}
} else {
return newError(RestStatus.CONFLICT, "object already exist");
bucket.objects.put(objectName, EMPTY_BYTE);
final String location = /*endpoint +*/ "/upload/storage/v1/b/" + bucket.name + "/o?uploadType=resumable&upload_id="
+ objectName;
return newResponse(RestStatus.CREATED, singletonMap("Location", location), jsonBuilder());
}
} else if ("multipart".equals(uploadType)) {
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ public InputStream readBlob(String blobName) throws IOException {
}

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize);
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,31 +193,34 @@ public void close() throws IOException {

/**
* Writes a blob in the specific bucket
*
* @param inputStream content of the blob to be written
* @param inputStream content of the blob to be written
* @param blobSize expected size of the blob to be written
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
*/
void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, blobName).build();
if (blobSize > LARGE_BLOB_THRESHOLD_BYTE_SIZE) {
writeBlobResumable(blobInfo, inputStream);
writeBlobResumable(blobInfo, inputStream, failIfAlreadyExists);
} else {
writeBlobMultipart(blobInfo, inputStream, blobSize);
writeBlobMultipart(blobInfo, inputStream, blobSize, failIfAlreadyExists);
}
}

/**
* Uploads a blob using the "resumable upload" method (multiple requests, which
* can be independently retried in case of failure, see
* https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
*
* @param blobInfo the info for the blob to be uploaded
* @param inputStream the stream containing the blob data
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
*/
private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream) throws IOException {
private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, boolean failIfAlreadyExists) throws IOException {
try {
final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ?
new Storage.BlobWriteOption[] { Storage.BlobWriteOption.doesNotExist() } :
new Storage.BlobWriteOption[0];
final WriteChannel writeChannel = SocketAccess
.doPrivilegedIOException(() -> client().writer(blobInfo, Storage.BlobWriteOption.doesNotExist()));
.doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
@Override
public boolean isOpen() {
Expand All @@ -236,7 +239,7 @@ public int write(ByteBuffer src) throws IOException {
}
}));
} catch (final StorageException se) {
if (se.getCode() == HTTP_PRECON_FAILED) {
if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
}
throw se;
Expand All @@ -248,20 +251,24 @@ public int write(ByteBuffer src) throws IOException {
* 'multipart/related' request containing both data and metadata. The request is
* gziped), see:
* https://cloud.google.com/storage/docs/json_api/v1/how-tos/multipart-upload
*
* @param blobInfo the info for the blob to be uploaded
* @param blobInfo the info for the blob to be uploaded
* @param inputStream the stream containing the blob data
* @param blobSize the size
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
*/
private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long blobSize) throws IOException {
private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
throws IOException {
assert blobSize <= LARGE_BLOB_THRESHOLD_BYTE_SIZE : "large blob uploads should use the resumable upload method";
final ByteArrayOutputStream baos = new ByteArrayOutputStream(Math.toIntExact(blobSize));
Streams.copy(inputStream, baos);
try {
final Storage.BlobTargetOption[] targetOptions = failIfAlreadyExists ?
new Storage.BlobTargetOption[] { Storage.BlobTargetOption.doesNotExist() } :
new Storage.BlobTargetOption[0];
SocketAccess.doPrivilegedVoidIOException(
() -> client().create(blobInfo, baos.toByteArray(), Storage.BlobTargetOption.doesNotExist()));
() -> client().create(blobInfo, baos.toByteArray(), targetOptions));
} catch (final StorageException se) {
if (se.getCode() == HTTP_PRECON_FAILED) {
if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
}
throw se;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,12 @@ public InputStream readBlob(String blobName) throws IOException {
}

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
store.execute((Operation<Void>) fileContext -> {
Path blob = new Path(path, blobName);
// we pass CREATE, which means it fails if a blob already exists.
EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK);
EnumSet<CreateFlag> flags = failIfAlreadyExists ? EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK) :
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK);
CreateOpts[] opts = {CreateOpts.bufferSize(bufferSize)};
try (FSDataOutputStream stream = fileContext.create(blob, flags, opts)) {
int bytesRead;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void testReadOnly() throws Exception {
assertTrue(util.exists(hdfsPath));

byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
writeBlob(container, "foo", new BytesArray(data));
writeBlob(container, "foo", new BytesArray(data), randomBoolean());
assertArrayEquals(readBlobFully(container, "foo", data.length), data);
assertTrue(container.blobExists("foo"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,11 @@ public InputStream readBlob(String blobName) throws IOException {
}
}

/**
* This implementation ignores the failIfAlreadyExists flag as the S3 API has no way to enforce this due to its weak consistency model.
*/
@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
SocketAccess.doPrivilegedIOException(() -> {
if (blobSize <= blobStore.bufferSizeInBytes()) {
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,18 @@ public interface BlobContainer {
* @param blobSize
* The size of the blob to be written, in bytes. It is implementation dependent whether
* this value is used in writing the blob to the repository.
* @throws FileAlreadyExistsException if a blob by the same name already exists
* @param failIfAlreadyExists
* whether to throw a FileAlreadyExistsException if the given blob already exists
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/
void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException;
void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;

/**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name,
* using an atomic write operation if the implementation supports it. When the BlobContainer implementation
* does not provide a specific implementation of writeBlobAtomic(String, InputStream, long), then
* the {@link #writeBlob(String, InputStream, long)} method is used.
* the {@link #writeBlob(String, InputStream, long, boolean)} method is used.
*
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the
* same name already exists, the operation will fail and an {@link IOException} will be thrown.
Expand All @@ -90,11 +92,14 @@ public interface BlobContainer {
* @param blobSize
* The size of the blob to be written, in bytes. It is implementation dependent whether
* this value is used in writing the blob to the repository.
* @throws FileAlreadyExistsException if a blob by the same name already exists
* @param failIfAlreadyExists
* whether to throw a FileAlreadyExistsException if the given blob already exists
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/
default void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException {
writeBlob(blobName, inputStream, blobSize);
default void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, boolean failIfAlreadyExists)
throws IOException {
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
}

/**
Expand Down
Loading

0 comments on commit 2bb4f38

Please sign in to comment.