Skip to content

Commit

Permalink
ARROW-2794: [Plasma] Add the RPC of a list of Delete Objects in Plasma
Browse files Browse the repository at this point in the history
This pull request includes following changes:
1. Add a RPC to delete a list of objects, which could be used in Garbage Collection in Ray according to the [Garbage Collection Discussion](ray-project/ray#2242 (comment))..
2. Fix a bug in ReadDeleteRequest, change the wrong message type of PlasmaReleaseReply to PlasmaDeleteRequest.

Author: Yuhong Guo <yuhong.gyh@antfin.com>

Closes #2174 from guoyuhong/addDeleteObjs and squashes the following commits:

793bc5b <Yuhong Guo> Change according to comment.
c8562f6 <Yuhong Guo> Trigger build.
5f3aafe4 <Yuhong Guo> Change back Delete call.
7db6413 <Yuhong Guo> Add back Delete function for single obj for client
d67c830 <Yuhong Guo> Fix warning failure.
a1655fb <Yuhong Guo> Delete a comma.
bc2efc2 <Yuhong Guo> Change according to comment
3a3b786 <Yuhong Guo> Fix warning error.
c67fb4d <Yuhong Guo> Fix a bug in ReadDeleteRequest
d3188fb <Yuhong Guo> Add Delete Object list RPC to support garbage colletion in Ray
  • Loading branch information
guoyuhong authored and pcmoritz committed Jul 4, 2018
1 parent e7aaf7b commit 037c156
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 47 deletions.
33 changes: 21 additions & 12 deletions cpp/src/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp

Status Seal(const ObjectID& object_id);

Status Delete(const ObjectID& object_id);
Status Delete(const std::vector<ObjectID>& object_ids);

Status Evict(int64_t num_bytes, int64_t& num_bytes_evicted);

Expand Down Expand Up @@ -808,21 +808,26 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
return ReadAbortReply(buffer.data(), buffer.size(), &id);
}

Status PlasmaClient::Impl::Delete(const ObjectID& object_id) {
Status PlasmaClient::Impl::Delete(const std::vector<ObjectID>& object_ids) {
RETURN_NOT_OK(FlushReleaseHistory());
// If the object is in used, client can't send the remove message.
if (objects_in_use_.count(object_id) > 0) {
return Status::UnknownError("PlasmaClient::Object is in use.");
} else {
// If we don't already have a reference to the object, we can try to remove the object
RETURN_NOT_OK(SendDeleteRequest(store_conn_, object_id));
std::vector<ObjectID> not_in_use_ids;
for (auto& object_id : object_ids) {
// If the object is in used, skip it.
if (objects_in_use_.count(object_id) == 0) {
not_in_use_ids.push_back(object_id);
}
}
if (not_in_use_ids.size() > 0) {
RETURN_NOT_OK(SendDeleteRequest(store_conn_, not_in_use_ids));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaDeleteReply, &buffer));
ObjectID object_id2;
DCHECK_GT(buffer.size(), 0);
RETURN_NOT_OK(ReadDeleteReply(buffer.data(), buffer.size(), &object_id2));
return Status::OK();
std::vector<PlasmaError> error_codes;
not_in_use_ids.clear();
RETURN_NOT_OK(
ReadDeleteReply(buffer.data(), buffer.size(), &not_in_use_ids, &error_codes));
}
return Status::OK();
}

Status PlasmaClient::Impl::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
Expand Down Expand Up @@ -1036,7 +1041,11 @@ Status PlasmaClient::Abort(const ObjectID& object_id) { return impl_->Abort(obje
Status PlasmaClient::Seal(const ObjectID& object_id) { return impl_->Seal(object_id); }

Status PlasmaClient::Delete(const ObjectID& object_id) {
return impl_->Delete(object_id);
return impl_->Delete(std::vector<ObjectID>{object_id});
}

Status PlasmaClient::Delete(const std::vector<ObjectID>& object_ids) {
return impl_->Delete(object_ids);
}

Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/plasma/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,14 @@ class ARROW_EXPORT PlasmaClient {
/// \return The return status.
Status Delete(const ObjectID& object_id);

/// Delete a list of objects from the object store. This currently assumes that the
/// object is present, has been sealed and not used by another client. Otherwise,
/// it is a no operation.
///
/// \param object_ids The list of IDs of the objects to delete.
/// \return The return status. If all the objects are non-existent, return OK.
Status Delete(const std::vector<ObjectID>& object_ids);

/// Delete objects until we have freed up num_bytes bytes or there are no more
/// released objects that can be deleted.
///
Expand Down
10 changes: 7 additions & 3 deletions cpp/src/plasma/format/plasma.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -200,15 +200,19 @@ table PlasmaReleaseReply {
}

table PlasmaDeleteRequest {
// The number of objects to delete.
count: int;
// ID of the object to be deleted.
object_id: string;
object_ids: [string];
}

table PlasmaDeleteReply {
// The number of objects to delete.
count: int;
// ID of the object that was deleted.
object_id: string;
object_ids: [string];
// Error code.
error: PlasmaError;
errors: [PlasmaError];
}

table PlasmaStatusRequest {
Expand Down
53 changes: 41 additions & 12 deletions cpp/src/plasma/protocol.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ Status PlasmaReceive(int sock, MessageType message_type, std::vector<uint8_t>* b
return Status::OK();
}

// Helper function to create a vector of elements from Data (Request/Reply struct).
// The Getter function is used to extract one element from Data.
template <typename T, typename Data, typename Getter>
void to_vector(const Data& request, std::vector<T>* out, const Getter& getter) {
int count = request.count();
out->clear();
out->reserve(count);
for (int i = 0; i < count; ++i) {
out->push_back(getter(request, i));
}
}

template <typename Message>
Status PlasmaSend(int sock, MessageType message_type, flatbuffers::FlatBufferBuilder* fbb,
const Message& message) {
Expand Down Expand Up @@ -230,35 +242,52 @@ Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id) {
return plasma_error_status(message->error());
}

// Delete messages.
// Delete objects messages.

Status SendDeleteRequest(int sock, ObjectID object_id) {
Status SendDeleteRequest(int sock, const std::vector<ObjectID>& object_ids) {
flatbuffers::FlatBufferBuilder fbb;
auto message = CreatePlasmaDeleteRequest(fbb, fbb.CreateString(object_id.binary()));
auto message =
CreatePlasmaDeleteRequest(fbb, static_cast<int32_t>(object_ids.size()),
to_flatbuffer(&fbb, &object_ids[0], object_ids.size()));
return PlasmaSend(sock, MessageType::PlasmaDeleteRequest, &fbb, message);
}

Status ReadDeleteRequest(uint8_t* data, size_t size, ObjectID* object_id) {
Status ReadDeleteRequest(uint8_t* data, size_t size, std::vector<ObjectID>* object_ids) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaReleaseReply>(data);
DCHECK(object_ids);
auto message = flatbuffers::GetRoot<PlasmaDeleteRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
to_vector(*message, object_ids, [](const PlasmaDeleteRequest& request, int i) {
return ObjectID::from_binary(request.object_ids()->Get(i)->str());
});
return Status::OK();
}

Status SendDeleteReply(int sock, ObjectID object_id, PlasmaError error) {
Status SendDeleteReply(int sock, const std::vector<ObjectID>& object_ids,
const std::vector<PlasmaError>& errors) {
DCHECK(object_ids.size() == errors.size());
flatbuffers::FlatBufferBuilder fbb;
auto message =
CreatePlasmaDeleteReply(fbb, fbb.CreateString(object_id.binary()), error);
auto message = CreatePlasmaDeleteReply(
fbb, static_cast<int32_t>(object_ids.size()),
to_flatbuffer(&fbb, &object_ids[0], object_ids.size()),
fbb.CreateVector(reinterpret_cast<const int32_t*>(&errors[0]), object_ids.size()));
return PlasmaSend(sock, MessageType::PlasmaDeleteReply, &fbb, message);
}

Status ReadDeleteReply(uint8_t* data, size_t size, ObjectID* object_id) {
Status ReadDeleteReply(uint8_t* data, size_t size, std::vector<ObjectID>* object_ids,
std::vector<PlasmaError>* errors) {
DCHECK(data);
DCHECK(object_ids);
DCHECK(errors);
auto message = flatbuffers::GetRoot<PlasmaDeleteReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
return plasma_error_status(message->error());
to_vector(*message, object_ids, [](const PlasmaDeleteReply& request, int i) {
return ObjectID::from_binary(request.object_ids()->Get(i)->str());
});
to_vector(*message, errors, [](const PlasmaDeleteReply& request, int i) {
return static_cast<PlasmaError>(request.errors()->data()[i]);
});
return Status::OK();
}

// Satus messages.
Expand Down
12 changes: 7 additions & 5 deletions cpp/src/plasma/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,17 @@ Status SendReleaseReply(int sock, ObjectID object_id, PlasmaError error);

Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id);

/* Plasma Delete message functions. */
/* Plasma Delete objects message functions. */

Status SendDeleteRequest(int sock, ObjectID object_id);
Status SendDeleteRequest(int sock, const std::vector<ObjectID>& object_ids);

Status ReadDeleteRequest(uint8_t* data, size_t size, ObjectID* object_id);
Status ReadDeleteRequest(uint8_t* data, size_t size, std::vector<ObjectID>* object_ids);

Status SendDeleteReply(int sock, ObjectID object_id, PlasmaError error);
Status SendDeleteReply(int sock, const std::vector<ObjectID>& object_ids,
const std::vector<PlasmaError>& errors);

Status ReadDeleteReply(uint8_t* data, size_t size, ObjectID* object_id);
Status ReadDeleteReply(uint8_t* data, size_t size, std::vector<ObjectID>* object_ids,
std::vector<PlasmaError>* errors);

/* Satus messages. */

Expand Down
11 changes: 8 additions & 3 deletions cpp/src/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -742,9 +742,14 @@ Status PlasmaStore::process_message(Client* client) {
release_object(object_id, client);
} break;
case MessageType::PlasmaDeleteRequest: {
RETURN_NOT_OK(ReadDeleteRequest(input, input_size, &object_id));
PlasmaError error_code = delete_object(object_id);
HANDLE_SIGPIPE(SendDeleteReply(client->fd, object_id, error_code), client->fd);
std::vector<ObjectID> object_ids;
std::vector<PlasmaError> error_codes;
RETURN_NOT_OK(ReadDeleteRequest(input, input_size, &object_ids));
error_codes.reserve(object_ids.size());
for (auto& object_id : object_ids) {
error_codes.push_back(delete_object(object_id));
}
HANDLE_SIGPIPE(SendDeleteReply(client->fd, object_ids, error_codes), client->fd);
} break;
case MessageType::PlasmaContainsRequest: {
RETURN_NOT_OK(ReadContainsRequest(input, input_size, &object_id));
Expand Down
33 changes: 30 additions & 3 deletions cpp/src/plasma/test/client_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ TEST_F(TestPlasmaStore, DeleteTest) {

// Test for deleting non-existance object.
Status result = client_.Delete(object_id);
ASSERT_TRUE(result.IsPlasmaObjectNonexistent());
ARROW_CHECK_OK(result);

// Test for the object being in local Plasma store.
// First create object.
Expand All @@ -176,15 +176,42 @@ TEST_F(TestPlasmaStore, DeleteTest) {
ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client_.Seal(object_id));

// Object is in use, can't be delete.
result = client_.Delete(object_id);
ASSERT_TRUE(result.IsUnknownError());
// TODO: Guarantee that the in-use object will be deleted when it is released.
ARROW_CHECK_OK(result);

// Avoid race condition of Plasma Manager waiting for notification.
ARROW_CHECK_OK(client_.Release(object_id));
ARROW_CHECK_OK(client_.Delete(object_id));
}

TEST_F(TestPlasmaStore, DeleteObjectsTest) {
ObjectID object_id1 = ObjectID::from_random();
ObjectID object_id2 = ObjectID::from_random();

// Test for deleting non-existance object.
Status result = client_.Delete(std::vector<ObjectID>{object_id1, object_id2});
ARROW_CHECK_OK(result);
// Test for the object being in local Plasma store.
// First create object.
int64_t data_size = 100;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client_.Seal(object_id1));
ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client_.Seal(object_id2));
// Objects are in use.
result = client_.Delete(std::vector<ObjectID>{object_id1, object_id2});
// TODO: Guarantee that the in-use object will be deleted when it is released.
ARROW_CHECK_OK(result);
// Avoid race condition of Plasma Manager waiting for notification.
ARROW_CHECK_OK(client_.Release(object_id1));
ARROW_CHECK_OK(client_.Release(object_id2));
ARROW_CHECK_OK(client_.Delete(std::vector<ObjectID>{object_id1, object_id2}));
}

TEST_F(TestPlasmaStore, ContainsTest) {
ObjectID object_id = ObjectID::from_random();

Expand Down
24 changes: 15 additions & 9 deletions cpp/src/plasma/test/serialization_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -223,25 +223,31 @@ TEST(PlasmaSerialization, ReleaseReply) {
TEST(PlasmaSerialization, DeleteRequest) {
int fd = create_temp_file();
ObjectID object_id1 = ObjectID::from_random();
ARROW_CHECK_OK(SendDeleteRequest(fd, object_id1));
ARROW_CHECK_OK(SendDeleteRequest(fd, std::vector<ObjectID>{object_id1}));
std::vector<uint8_t> data =
read_message_from_file(fd, MessageType::PlasmaDeleteRequest);
ObjectID object_id2;
ARROW_CHECK_OK(ReadDeleteRequest(data.data(), data.size(), &object_id2));
ASSERT_EQ(object_id1, object_id2);
std::vector<ObjectID> object_vec;
ARROW_CHECK_OK(ReadDeleteRequest(data.data(), data.size(), &object_vec));
ASSERT_EQ(object_vec.size(), 1);
ASSERT_EQ(object_id1, object_vec[0]);
close(fd);
}

TEST(PlasmaSerialization, DeleteReply) {
int fd = create_temp_file();
ObjectID object_id1 = ObjectID::from_random();
PlasmaError error1 = PlasmaError::ObjectExists;
ARROW_CHECK_OK(SendDeleteReply(fd, object_id1, error1));
ARROW_CHECK_OK(SendDeleteReply(fd, std::vector<ObjectID>{object_id1},
std::vector<PlasmaError>{error1}));
std::vector<uint8_t> data = read_message_from_file(fd, MessageType::PlasmaDeleteReply);
ObjectID object_id2;
Status s = ReadDeleteReply(data.data(), data.size(), &object_id2);
ASSERT_EQ(object_id1, object_id2);
ASSERT_TRUE(s.IsPlasmaObjectExists());
std::vector<ObjectID> object_vec;
std::vector<PlasmaError> error_vec;
Status s = ReadDeleteReply(data.data(), data.size(), &object_vec, &error_vec);
ASSERT_EQ(object_vec.size(), 1);
ASSERT_EQ(object_id1, object_vec[0]);
ASSERT_EQ(error_vec.size(), 1);
ASSERT_TRUE(error_vec[0] == PlasmaError::ObjectExists);
ASSERT_TRUE(s.ok());
close(fd);
}

Expand Down

0 comments on commit 037c156

Please sign in to comment.