From adc30c0f7c3e76c8e1d2a15673b359bb80e8f7f1 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Fri, 27 Sep 2024 22:46:07 +0800 Subject: [PATCH] feat: add `rw_actor_splits` (#18746) Signed-off-by: Shanicky Chen --- proto/meta.proto | 20 ++++ .../catalog/system_catalog/rw_catalog/mod.rs | 1 + .../rw_catalog/rw_actor_splits.rs | 51 ++++++++++ src/frontend/src/meta_client.rs | 7 ++ src/frontend/src/test_utils.rs | 5 + src/meta/service/src/stream_service.rs | 92 ++++++++++++++++++- src/meta/src/controller/catalog.rs | 22 +++++ src/meta/src/controller/fragment.rs | 14 +++ src/meta/src/controller/utils.rs | 12 ++- src/meta/src/stream/source_manager.rs | 15 +++ src/rpc_client/src/meta_client.rs | 15 ++- 11 files changed, 249 insertions(+), 5 deletions(-) create mode 100644 src/frontend/src/catalog/system_catalog/rw_catalog/rw_actor_splits.rs diff --git a/proto/meta.proto b/proto/meta.proto index e088b7fb7330..75a9b5d5caf0 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -261,6 +261,25 @@ message ListActorStatesResponse { repeated ActorState states = 1; } +message ListActorSplitsRequest {} + +message ListActorSplitsResponse { + enum FragmentType { + UNSPECIFIED = 0; + NON_SHARED_SOURCE = 1; + SHARED_SOURCE = 2; + SHARED_SOURCE_BACKFILL = 3; + } + message ActorSplit { + uint32 actor_id = 1; + uint32 fragment_id = 2; + uint32 source_id = 3; + string split_id = 4; + FragmentType fragment_type = 5; + } + repeated ActorSplit actor_splits = 1; +} + message ListObjectDependenciesRequest {} message ListObjectDependenciesResponse { @@ -302,6 +321,7 @@ service StreamManagerService { rpc ListTableFragmentStates(ListTableFragmentStatesRequest) returns (ListTableFragmentStatesResponse); rpc ListFragmentDistribution(ListFragmentDistributionRequest) returns (ListFragmentDistributionResponse); rpc ListActorStates(ListActorStatesRequest) returns (ListActorStatesResponse); + rpc ListActorSplits(ListActorSplitsRequest) returns (ListActorSplitsResponse); rpc ListObjectDependencies(ListObjectDependenciesRequest) returns (ListObjectDependenciesResponse); rpc ApplyThrottle(ApplyThrottleRequest) returns (ApplyThrottleResponse); rpc Recover(RecoverRequest) returns (RecoverResponse); diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs index 75825a74320c..860cdba1ec78 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs @@ -57,5 +57,6 @@ mod rw_views; mod rw_worker_nodes; mod rw_actor_id_to_ddl; +mod rw_actor_splits; mod rw_fragment_id_to_ddl; mod rw_worker_actor_count; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_actor_splits.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_actor_splits.rs new file mode 100644 index 000000000000..9cc62598b0df --- /dev/null +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_actor_splits.rs @@ -0,0 +1,51 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::types::Fields; +use risingwave_frontend_macro::system_catalog; +use risingwave_pb::meta::list_actor_splits_response::{ActorSplit, FragmentType}; + +use crate::catalog::system_catalog::SysCatalogReaderImpl; +use crate::error::Result; + +#[derive(Fields)] +#[primary_key(actor_id, split_id, source_id)] +struct RwActorSplit { + actor_id: i32, + split_id: String, + source_id: i32, + fragment_id: i32, + fragment_type: String, +} + +impl From for RwActorSplit { + fn from(actor_split: ActorSplit) -> Self { + Self { + actor_id: actor_split.actor_id as _, + split_id: actor_split.split_id, + source_id: actor_split.source_id as _, + fragment_id: actor_split.fragment_id as _, + fragment_type: FragmentType::try_from(actor_split.fragment_type) + .unwrap_or(FragmentType::Unspecified) + .as_str_name() + .to_string(), + } + } +} + +#[system_catalog(table, "rw_catalog.rw_actor_splits")] +async fn read_rw_actor_splits(reader: &SysCatalogReaderImpl) -> Result> { + let actor_splits = reader.meta_client.list_actor_splits().await?; + Ok(actor_splits.into_iter().map(RwActorSplit::from).collect()) +} diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index 5b6f0e79819c..cf7214509050 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -30,6 +30,7 @@ use risingwave_pb::hummock::{ HummockSnapshot, }; use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs; +use risingwave_pb::meta::list_actor_splits_response::ActorSplit; use risingwave_pb::meta::list_actor_states_response::ActorState; use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution; use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies; @@ -68,6 +69,8 @@ pub trait FrontendMetaClient: Send + Sync { async fn list_actor_states(&self) -> Result>; + async fn list_actor_splits(&self) -> Result>; + async fn list_object_dependencies(&self) -> Result>; async fn list_meta_snapshots(&self) -> Result>; @@ -178,6 +181,10 @@ impl FrontendMetaClient for FrontendMetaClientImpl { self.0.list_actor_states().await } + async fn list_actor_splits(&self) -> Result> { + self.0.list_actor_splits().await + } + async fn list_object_dependencies(&self) -> Result> { self.0.list_object_dependencies().await } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 731c7a486df8..42eedf2122fc 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -52,6 +52,7 @@ use risingwave_pb::hummock::{ HummockSnapshot, }; use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs; +use risingwave_pb::meta::list_actor_splits_response::ActorSplit; use risingwave_pb::meta::list_actor_states_response::ActorState; use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution; use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies; @@ -973,6 +974,10 @@ impl FrontendMetaClient for MockFrontendMetaClient { Ok(vec![]) } + async fn list_actor_splits(&self) -> RpcResult> { + Ok(vec![]) + } + async fn list_object_dependencies(&self) -> RpcResult> { Ok(vec![]) } diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index 7cc43ae1c78b..273c45b8d2aa 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -16,12 +16,14 @@ use std::collections::{HashMap, HashSet}; use itertools::Itertools; use risingwave_common::catalog::TableId; +use risingwave_connector::source::SplitMetaData; use risingwave_meta::manager::{LocalNotification, MetadataManager}; use risingwave_meta::model; use risingwave_meta::model::ActorId; -use risingwave_meta::stream::ThrottleConfig; +use risingwave_meta::stream::{SourceManagerRunningInfo, ThrottleConfig}; use risingwave_meta_model_v2::{SourceId, StreamingParallelism}; use risingwave_pb::meta::cancel_creating_jobs_request::Jobs; +use risingwave_pb::meta::list_actor_splits_response::FragmentType; use risingwave_pb::meta::list_table_fragments_response::{ ActorInfo, FragmentInfo, TableFragmentInfo, }; @@ -420,4 +422,92 @@ impl StreamManagerService for StreamServiceImpl { .await; Ok(Response::new(RecoverResponse {})) } + + async fn list_actor_splits( + &self, + _request: Request, + ) -> Result, Status> { + match &self.metadata_manager { + MetadataManager::V1(_) => Ok(Response::new(ListActorSplitsResponse { + // TODO: remove this when v1 is removed + actor_splits: vec![], + })), + MetadataManager::V2(mgr) => { + let SourceManagerRunningInfo { + source_fragments, + backfill_fragments, + mut actor_splits, + } = self.stream_manager.source_manager.get_running_info().await; + + let source_actors = mgr.catalog_controller.list_source_actors().await?; + + let is_shared_source = mgr + .catalog_controller + .list_source_id_with_shared_types() + .await?; + + let fragment_to_source: HashMap<_, _> = + source_fragments + .into_iter() + .flat_map(|(source_id, fragment_ids)| { + let source_type = if is_shared_source + .get(&(source_id as _)) + .copied() + .unwrap_or(false) + { + FragmentType::SharedSource + } else { + FragmentType::NonSharedSource + }; + + fragment_ids + .into_iter() + .map(move |fragment_id| (fragment_id, (source_id, source_type))) + }) + .chain(backfill_fragments.into_iter().flat_map( + |(source_id, fragment_ids)| { + fragment_ids.into_iter().flat_map( + move |(fragment_id, upstream_fragment_id)| { + [ + ( + fragment_id, + (source_id, FragmentType::SharedSourceBackfill), + ), + ( + upstream_fragment_id, + (source_id, FragmentType::SharedSource), + ), + ] + }, + ) + }, + )) + .collect(); + + let actor_splits = source_actors + .into_iter() + .flat_map(|(actor_id, fragment_id)| { + let (source_id, fragment_type) = fragment_to_source + .get(&(fragment_id as _)) + .copied() + .unwrap_or_default(); + + actor_splits + .remove(&(actor_id as _)) + .unwrap_or_default() + .into_iter() + .map(move |split| list_actor_splits_response::ActorSplit { + actor_id: actor_id as _, + source_id: source_id as _, + fragment_id: fragment_id as _, + split_id: split.id().to_string(), + fragment_type: fragment_type.into(), + }) + }) + .collect_vec(); + + Ok(Response::new(ListActorSplitsResponse { actor_splits })) + } + } + } } diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index cb455c6b49a5..23a6a9d2f32d 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -2723,6 +2723,28 @@ impl CatalogController { inner.list_sources().await } + // Return a hashmap to distinguish whether each source is shared or not. + pub async fn list_source_id_with_shared_types(&self) -> MetaResult> { + let inner = self.inner.read().await; + let source_ids: Vec<(SourceId, Option)> = Source::find() + .select_only() + .columns([source::Column::SourceId, source::Column::SourceInfo]) + .into_tuple() + .all(&inner.db) + .await?; + + Ok(source_ids + .into_iter() + .map(|(source_id, info)| { + ( + source_id, + info.map(|info| info.to_protobuf().cdc_source_job) + .unwrap_or(false), + ) + }) + .collect()) + } + pub async fn list_source_ids(&self, schema_id: SchemaId) -> MetaResult> { let inner = self.inner.read().await; let source_ids: Vec = Source::find() diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 3594d50c5d0c..6d1d9ceba40e 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -804,6 +804,20 @@ impl CatalogController { Ok(actor_locations) } + pub async fn list_source_actors(&self) -> MetaResult> { + let inner = self.inner.read().await; + + let source_actors: Vec<(ActorId, FragmentId)> = Actor::find() + .select_only() + .filter(actor::Column::Splits.is_not_null()) + .columns([actor::Column::ActorId, actor::Column::FragmentId]) + .into_tuple() + .all(&inner.db) + .await?; + + Ok(source_actors) + } + pub async fn list_fragment_descs(&self) -> MetaResult> { let inner = self.inner.read().await; let fragment_descs: Vec = Fragment::find() diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 43fed3380d6b..ce7773355929 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -27,8 +27,8 @@ use risingwave_meta_model_v2::prelude::*; use risingwave_meta_model_v2::{ actor, actor_dispatcher, connection, database, fragment, function, index, object, object_dependency, schema, secret, sink, source, subscription, table, user, user_privilege, - view, ActorId, DataTypeArray, DatabaseId, FragmentId, I32Array, ObjectId, PrivilegeId, - SchemaId, SourceId, StreamNode, UserId, VnodeBitmap, WorkerId, + view, ActorId, ConnectorSplits, DataTypeArray, DatabaseId, FragmentId, I32Array, ObjectId, + PrivilegeId, SchemaId, SourceId, StreamNode, UserId, VnodeBitmap, WorkerId, }; use risingwave_pb::catalog::{ PbConnection, PbFunction, PbIndex, PbSecret, PbSink, PbSource, PbSubscription, PbTable, PbView, @@ -256,6 +256,14 @@ pub struct PartialActorLocation { pub status: ActorStatus, } +#[derive(Clone, DerivePartialModel, FromQueryResult)] +#[sea_orm(entity = "Actor")] +pub struct PartialActorSplits { + pub actor_id: ActorId, + pub fragment_id: FragmentId, + pub splits: Option, +} + #[derive(FromQueryResult)] pub struct FragmentDesc { pub fragment_id: FragmentId, diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index 8fcceac82c0c..680712bf3039 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -235,6 +235,12 @@ pub struct SourceManagerCore { actor_splits: HashMap>, } +pub struct SourceManagerRunningInfo { + pub source_fragments: HashMap>, + pub backfill_fragments: HashMap>, + pub actor_splits: HashMap>, +} + impl SourceManagerCore { fn new( metadata_manager: MetadataManager, @@ -1101,6 +1107,15 @@ impl SourceManager { core.actor_splits.clone() } + pub async fn get_running_info(&self) -> SourceManagerRunningInfo { + let core = self.core.lock().await; + SourceManagerRunningInfo { + source_fragments: core.source_fragments.clone(), + backfill_fragments: core.backfill_fragments.clone(), + actor_splits: core.actor_splits.clone(), + } + } + /// Checks whether the external source metadata has changed, and sends a split assignment command /// if it has. /// diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 8e7dc5dee1b4..2ada565c8539 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -74,6 +74,7 @@ use risingwave_pb::meta::cluster_service_client::ClusterServiceClient; use risingwave_pb::meta::event_log_service_client::EventLogServiceClient; use risingwave_pb::meta::heartbeat_request::{extra_info, ExtraInfo}; use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient; +use risingwave_pb::meta::list_actor_splits_response::ActorSplit; use risingwave_pb::meta::list_actor_states_response::ActorState; use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution; use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies; @@ -996,6 +997,15 @@ impl MetaClient { Ok(resp.states) } + pub async fn list_actor_splits(&self) -> Result> { + let resp = self + .inner + .list_actor_splits(ListActorSplitsRequest {}) + .await?; + + Ok(resp.actor_splits) + } + pub async fn list_object_dependencies(&self) -> Result> { let resp = self .inner @@ -2063,6 +2073,7 @@ macro_rules! for_all_meta_rpc { ,{ stream_client, list_table_fragment_states, ListTableFragmentStatesRequest, ListTableFragmentStatesResponse } ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse } ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse } + ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse } ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse } ,{ stream_client, recover, RecoverRequest, RecoverResponse } ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse } @@ -2077,14 +2088,14 @@ macro_rules! for_all_meta_rpc { ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse } ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse } ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse } - ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse } + ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse } ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse } ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse } ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse } ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse } ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse } ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse } - , {ddl_client, drop_secret, DropSecretRequest, DropSecretResponse} + ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse} ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse } ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse } ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }