diff --git a/ent/src/yb/cdc/cdc_service.cc b/ent/src/yb/cdc/cdc_service.cc index 0b68f0bc8cf2..1ab0572ab614 100644 --- a/ent/src/yb/cdc/cdc_service.cc +++ b/ent/src/yb/cdc/cdc_service.cc @@ -646,6 +646,20 @@ CHECKED_STATUS VerifyArg(const SetCDCCheckpointRequestPB& req) { return Status::OK(); } +// This function is to handle the upgrade scenario where the DB is upgraded from a version +// without CDCSDK changes to the one with it. So in case, some required options are missing, +// the default values will be added for the same. +void AddDefaultOptionsIfMissing(std::unordered_map* options) { + if ((*options).find(cdc::kSourceType) == (*options).end()) { + (*options).emplace(cdc::kSourceType, CDCRequestSource_Name(cdc::CDCRequestSource::XCLUSTER)); + } + + if ((*options).find(cdc::kCheckpointType) == (*options).end()) { + (*options).emplace(cdc::kCheckpointType, + CDCCheckpointType_Name(cdc::CDCCheckpointType::IMPLICIT)); + } +} + } // namespace template @@ -2107,6 +2121,9 @@ Result> CDCServiceImpl::GetStream(const std::str RETURN_NOT_OK(client()->GetCDCStream(stream_id, &ns_id, &object_ids, &options)); auto stream_metadata = std::make_shared(); + + AddDefaultOptionsIfMissing(&options); + for (const auto& option : options) { if (option.first == kRecordType) { SCHECK(CDCRecordType_Parse(option.second, &stream_metadata->record_type), diff --git a/ent/src/yb/master/catalog_manager_ent.cc b/ent/src/yb/master/catalog_manager_ent.cc index c8dfe3d866ff..78feca90ac28 100644 --- a/ent/src/yb/master/catalog_manager_ent.cc +++ b/ent/src/yb/master/catalog_manager_ent.cc @@ -210,6 +210,34 @@ class CDCStreamLoader : public Visitor { public: explicit CDCStreamLoader(CatalogManager* catalog_manager) : catalog_manager_(catalog_manager) {} + void AddDefaultValuesIfMissing(const SysCDCStreamEntryPB& metadata, + CDCStreamInfo::WriteLock* l) { + bool source_type_present = false; + bool checkpoint_type_present = false; + + // Iterate over all the options to check if checkpoint_type and source_type are present. + for (auto option : metadata.options()) { + if (option.key() == cdc::kSourceType) { + source_type_present = true; + } + if (option.key() == cdc::kCheckpointType) { + checkpoint_type_present = true; + } + } + + if (!source_type_present) { + auto source_type_opt = l->mutable_data()->pb.add_options(); + source_type_opt->set_key(cdc::kSourceType); + source_type_opt->set_value(cdc::CDCRequestSource_Name(cdc::XCLUSTER)); + } + + if (!checkpoint_type_present) { + auto checkpoint_type_opt = l->mutable_data()->pb.add_options(); + checkpoint_type_opt->set_key(cdc::kCheckpointType); + checkpoint_type_opt->set_value(cdc::CDCCheckpointType_Name(cdc::IMPLICIT)); + } + } + Status Visit(const CDCStreamId& stream_id, const SysCDCStreamEntryPB& metadata) REQUIRES(catalog_manager_->mutex_) { DCHECK(!ContainsKey(catalog_manager_->cdc_stream_map_, stream_id)) @@ -245,6 +273,10 @@ class CDCStreamLoader : public Visitor { auto l = stream->LockForWrite(); l.mutable_data()->pb.CopyFrom(metadata); + // If no source_type and checkpoint_type is present, that means the stream was created in + // a previous version where these options were not present. + AddDefaultValuesIfMissing(metadata, &l); + // If the table has been deleted, then mark this stream as DELETING so it can be deleted by the // catalog manager background thread. Otherwise if this stream is missing an entry // for state, then mark its state as Active.