Skip to content

Commit

Permalink
YSQL: create WaitUntilIndexPermissionsAtLeast APIs (#4347)
Browse files Browse the repository at this point in the history
Summary:

Create a client function `WaitUntilIndexPermissionsAtLeast` to wait for
an index to reach or exceed a given permission.  For example, wait for
index permissions to be at least `INDEX_PERM_WRITE_AND_DELETE`.  Have it
return the actual permission when done waiting.  This is to be able to
handle both success and failure cases.  Continuing the previous example,
a failure can cause the permission to be
`INDEX_PERM_DELETE_ONLY_WHILE_REMOVING`.

Create functions (checked below) to bubble the information up to the
pggate layer.

* [x] `YBCPgWaitUntilIndexPermissionsAtLeast`
  * [x] `PgApiImpl::WaitUntilIndexPermissionsAtLeast`
    * [x] `PgSession::WaitUntilIndexPermissionsAtLeast`
      * [x] `YBClient::WaitUntilIndexPermissionsAtLeast`
        * [x] `YBClient::Data::WaitUntilIndexPermissionsAtLeast`
          * [x] `YBClient::Data::GetIndexPermissions`
            * [ ] `YBClient::Data::GetTableSchemaById`

This is part 1 of the effort of bringing index backfill to YSQL.

Close: #4347

Test Plan: jenkins

Reviewers: amitanand, mihnea

Reviewed By: mihnea

Subscribers: amitanand, yql

Differential Revision: https://phabricator.dev.yugabyte.com/D8398
  • Loading branch information
jaki committed May 20, 2020
1 parent 2b42531 commit 006b6bf
Show file tree
Hide file tree
Showing 14 changed files with 185 additions and 1 deletion.
15 changes: 15 additions & 0 deletions src/postgres/src/include/catalog/index.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,19 @@ extern void RestoreReindexState(void *reindexstate);

extern void IndexSetParentIndex(Relation idx, Oid parentOid);

/*
* This should exactly match the IndexPermissions enum in
* src/yb/common/common.proto. See the definition there for details.
*/
typedef enum
{
YB_INDEX_PERM_DELETE_ONLY = 0,
YB_INDEX_PERM_WRITE_AND_DELETE = 2,
YB_INDEX_PERM_DO_BACKFILL = 4,
YB_INDEX_PERM_READ_WRITE_AND_DELETE = 6,
YB_INDEX_PERM_WRITE_AND_DELETE_WHILE_REMOVING = 8,
YB_INDEX_PERM_DELETE_ONLY_WHILE_REMOVING = 10,
YB_INDEX_PERM_INDEX_UNUSED = 12,
} YBIndexPermissions;

#endif /* INDEX_H */
52 changes: 51 additions & 1 deletion src/yb/client/client-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,6 @@ Status YBClient::Data::WaitForFlushTableToFinish(YBClient* client,
return RetryFunc(
deadline, "Waiting for FlushTables to be completed", "Timed out waiting for FlushTables",
std::bind(&YBClient::Data::IsFlushTableInProgress, this, client, flush_id, _1, _2));
return Status::OK();
}

Status YBClient::Data::InitLocalHostNames() {
Expand Down Expand Up @@ -1584,6 +1583,57 @@ Status YBClient::Data::GetTableSchemaById(YBClient* client,
return Status::OK();
}

Result<IndexPermissions> YBClient::Data::GetIndexPermissions(
YBClient* client,
const TableId& table_id,
const TableId& index_id,
const CoarseTimePoint deadline) {
std::shared_ptr<YBTableInfo> yb_table_info = std::make_shared<YBTableInfo>();
Synchronizer sync;

RETURN_NOT_OK(GetTableSchemaById(client,
table_id,
deadline,
yb_table_info,
sync.AsStatusCallback()));
Status s = sync.Wait();
if (!s.ok()) {
return s;
}

const IndexInfo* index_info =
VERIFY_RESULT(yb_table_info->index_map.FindIndex(index_id));
return index_info->index_permissions();
}

Result<IndexPermissions> YBClient::Data::WaitUntilIndexPermissionsAtLeast(
YBClient* client,
const TableId& table_id,
const TableId& index_id,
const CoarseTimePoint deadline,
const IndexPermissions& target_index_permissions) {
RETURN_NOT_OK(RetryFunc(
deadline,
"Waiting for index to have desired permissions",
"Timed out waiting for proper index permissions",
[&] (CoarseTimePoint deadline, bool* retry) -> Status {
IndexPermissions actual_index_permissions = VERIFY_RESULT(GetIndexPermissions(
client,
table_id,
index_id,
deadline));
*retry = (actual_index_permissions < target_index_permissions);
return Status::OK();
}));
// Now, the index permissions are guaranteed to be at (or beyond) the target. Query again to
// return it.
return GetIndexPermissions(
client,
table_id,
index_id,
deadline);
}

void YBClient::Data::CreateCDCStream(YBClient* client,
const TableId& table_id,
const std::unordered_map<std::string, std::string>& options,
Expand Down
12 changes: 12 additions & 0 deletions src/yb/client/client-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,18 @@ class YBClient::Data {
std::shared_ptr<YBTableInfo> info,
StatusCallback callback);

Result<IndexPermissions> GetIndexPermissions(
YBClient* client,
const TableId& table_id,
const TableId& index_id,
const CoarseTimePoint deadline);
Result<IndexPermissions> WaitUntilIndexPermissionsAtLeast(
YBClient* client,
const TableId& table_id,
const TableId& index_id,
const CoarseTimePoint deadline,
const IndexPermissions& target_index_permissions);

void CreateCDCStream(YBClient* client,
const TableId& table_id,
const std::unordered_map<std::string, std::string>& options,
Expand Down
28 changes: 28 additions & 0 deletions src/yb/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,34 @@ Status YBClient::GetTableSchemaById(const TableId& table_id, std::shared_ptr<YBT
return data_->GetTableSchemaById(this, table_id, deadline, info, callback);
}

Result<IndexPermissions> YBClient::WaitUntilIndexPermissionsAtLeast(
const TableId& table_id,
const TableId& index_id,
const IndexPermissions& target_index_permissions) {
auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
return data_->WaitUntilIndexPermissionsAtLeast(
this,
table_id,
index_id,
deadline,
target_index_permissions);
}

Result<IndexPermissions> YBClient::WaitUntilIndexPermissionsAtLeast(
const YBTableName& table_name,
const YBTableName& index_name,
const IndexPermissions& target_index_permissions) {
auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
YBTableInfo table_info = VERIFY_RESULT(GetYBTableInfo(table_name));
YBTableInfo index_info = VERIFY_RESULT(GetYBTableInfo(index_name));
return data_->WaitUntilIndexPermissionsAtLeast(
this,
table_info.table_id,
index_info.table_id,
deadline,
target_index_permissions);
}

Status YBClient::CreateNamespace(const std::string& namespace_name,
const boost::optional<YQLDatabase>& database_type,
const std::string& creator_role_name,
Expand Down
9 changes: 9 additions & 0 deletions src/yb/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,15 @@ class YBClient {
CHECKED_STATUS GetTableSchemaById(const TableId& table_id, std::shared_ptr<YBTableInfo> info,
StatusCallback callback);

Result<IndexPermissions> WaitUntilIndexPermissionsAtLeast(
const TableId& table_id,
const TableId& index_id,
const IndexPermissions& target_index_permissions);
Result<IndexPermissions> WaitUntilIndexPermissionsAtLeast(
const YBTableName& table_name,
const YBTableName& index_name,
const IndexPermissions& target_index_permissions);

// Namespace related methods.

// Create a new namespace with the given name.
Expand Down
3 changes: 3 additions & 0 deletions src/yb/common/index.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
#ifndef YB_COMMON_INDEX_H_
#define YB_COMMON_INDEX_H_

#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "yb/common/common.pb.h"
#include "yb/common/entity_ids.h"
#include "yb/common/schema.h"

Expand Down Expand Up @@ -65,6 +67,7 @@ class IndexInfo {
const std::vector<ColumnId>& indexed_range_column_ids() const {
return indexed_range_column_ids_;
}
const IndexPermissions index_permissions() const { return index_permissions_; }

// Return column ids that are primary key columns of the indexed table.
std::vector<ColumnId> index_key_column_ids() const;
Expand Down
2 changes: 2 additions & 0 deletions src/yb/integration-tests/cassandra_cpp_driver-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,8 @@ Result<IndexPermissions> GetIndexPermissions(
return index_info_pb.index_permissions();
}

// TODO(jason): make Client::WaitUntilIndexPermissionsAtLeast compatible with this function
// (particularly the exponential_backoff), and replace all instances of this function with that one.
IndexPermissions WaitUntilIndexPermissionIsAtLeast(
client::YBClient* client, const YBTableName& table_name, const YBTableName& index_table_name,
IndexPermissions min_permission, bool exponential_backoff = true) {
Expand Down
4 changes: 4 additions & 0 deletions src/yb/util/enums.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,10 @@ class EnumBitSet {
return impl_ == rhs.impl_;
}

bool operator!=(const EnumBitSet<Enum>& rhs) const {
return impl_ != rhs.impl_;
}

bool operator<(const EnumBitSet<Enum>& rhs) const {
return impl_.to_ullong() < rhs.impl_.to_ullong();
}
Expand Down
10 changes: 10 additions & 0 deletions src/yb/yql/pggate/pg_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -935,5 +935,15 @@ void PgSession::SetTimeout(const int timeout_ms) {
session_->SetSingleRpcTimeout(MonoDelta::FromMilliseconds(timeout_ms));
}

Result<IndexPermissions> PgSession::WaitUntilIndexPermissionsAtLeast(
const PgObjectId& table_id,
const PgObjectId& index_id,
const IndexPermissions& target_index_permissions) {
return client_->WaitUntilIndexPermissionsAtLeast(
table_id.GetYBTableId(),
index_id.GetYBTableId(),
target_index_permissions);
}

} // namespace pggate
} // namespace yb
5 changes: 5 additions & 0 deletions src/yb/yql/pggate/pg_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,11 @@ class PgSession : public RefCountedThreadSafe<PgSession> {
// Sets the specified timeout in the rpc service.
void SetTimeout(int timeout_ms);

Result<IndexPermissions> WaitUntilIndexPermissionsAtLeast(
const PgObjectId& table_id,
const PgObjectId& index_id,
const IndexPermissions& target_index_permissions);

private:
CHECKED_STATUS FlushBufferedOperationsImpl();
CHECKED_STATUS FlushBufferedOperationsImpl(const PgsqlOpBuffer& ops, bool transactional);
Expand Down
10 changes: 10 additions & 0 deletions src/yb/yql/pggate/pggate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,16 @@ Status PgApiImpl::ExecDropIndex(PgStatement *handle) {
return down_cast<PgDropIndex*>(handle)->Exec();
}

Result<IndexPermissions> PgApiImpl::WaitUntilIndexPermissionsAtLeast(
const PgObjectId& table_id,
const PgObjectId& index_id,
const IndexPermissions& target_index_permissions) {
return pg_session_->WaitUntilIndexPermissionsAtLeast(
table_id,
index_id,
target_index_permissions);
}

//--------------------------------------------------------------------------------------------------
// DML Statment Support.
//--------------------------------------------------------------------------------------------------
Expand Down
5 changes: 5 additions & 0 deletions src/yb/yql/pggate/pggate.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,11 @@ class PgApiImpl {

CHECKED_STATUS ExecDropIndex(PgStatement *handle);

Result<IndexPermissions> WaitUntilIndexPermissionsAtLeast(
const PgObjectId& table_id,
const PgObjectId& index_id,
const IndexPermissions& target_index_permissions);

//------------------------------------------------------------------------------------------------
// All DML statements
CHECKED_STATUS DmlAppendTarget(PgStatement *handle, PgExpr *expr);
Expand Down
24 changes: 24 additions & 0 deletions src/yb/yql/pggate/ybc_pggate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
// or implied. See the License for the specific language governing permissions and limitations
// under the License.

#include <string>

#include <cds/init.h>

#include "yb/common/ybc-internal.h"
Expand Down Expand Up @@ -420,6 +422,28 @@ YBCStatus YBCPgExecDropIndex(YBCPgStatement handle) {
return ToYBCStatus(pgapi->ExecDropIndex(handle));
}

YBCStatus YBCPgWaitUntilIndexPermissionsAtLeast(
const YBCPgOid database_oid,
const YBCPgOid table_oid,
const YBCPgOid index_oid,
const uint32_t target_index_permissions,
uint32_t *actual_index_permissions) {
const PgObjectId table_id(database_oid, table_oid);
const PgObjectId index_id(database_oid, index_oid);
IndexPermissions returned_index_permissions = IndexPermissions::INDEX_PERM_DELETE_ONLY;
YBCStatus s = ExtractValueFromResult(pgapi->WaitUntilIndexPermissionsAtLeast(
table_id,
index_id,
static_cast<IndexPermissions>(target_index_permissions)),
&returned_index_permissions);
if (s) {
// Bad status.
return s;
}
*actual_index_permissions = static_cast<uint32_t>(returned_index_permissions);
return YBCStatusOK();
}

//--------------------------------------------------------------------------------------------------
// DML Statements.
//--------------------------------------------------------------------------------------------------
Expand Down
7 changes: 7 additions & 0 deletions src/yb/yql/pggate/ybc_pggate.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,13 @@ YBCStatus YBCPgNewDropIndex(YBCPgOid database_oid,

YBCStatus YBCPgExecDropIndex(YBCPgStatement handle);

YBCStatus YBCPgWaitUntilIndexPermissionsAtLeast(
const YBCPgOid database_oid,
const YBCPgOid table_oid,
const YBCPgOid index_oid,
const uint32_t target_index_permissions,
uint32_t *actual_index_permissions);

//--------------------------------------------------------------------------------------------------
// DML statements (select, insert, update, delete, truncate)
//--------------------------------------------------------------------------------------------------
Expand Down

0 comments on commit 006b6bf

Please sign in to comment.