Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add rw_actor_splits #18746

Merged
merged 3 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,25 @@ message ListActorStatesResponse {
repeated ActorState states = 1;
}

message ListActorSplitsRequest {}

message ListActorSplitsResponse {
enum FragmentType {
UNSPECIFIED = 0;
NON_SHARED_SOURCE = 1;
SHARED_SOURCE = 2;
SHARED_SOURCE_BACKFILL = 3;
}
message ActorSplit {
uint32 actor_id = 1;
uint32 fragment_id = 2;
uint32 source_id = 3;
string split_id = 4;
FragmentType fragment_type = 5;
}
repeated ActorSplit actor_splits = 1;
}

message ListObjectDependenciesRequest {}

message ListObjectDependenciesResponse {
Expand Down Expand Up @@ -302,6 +321,7 @@ service StreamManagerService {
rpc ListTableFragmentStates(ListTableFragmentStatesRequest) returns (ListTableFragmentStatesResponse);
rpc ListFragmentDistribution(ListFragmentDistributionRequest) returns (ListFragmentDistributionResponse);
rpc ListActorStates(ListActorStatesRequest) returns (ListActorStatesResponse);
rpc ListActorSplits(ListActorSplitsRequest) returns (ListActorSplitsResponse);
rpc ListObjectDependencies(ListObjectDependenciesRequest) returns (ListObjectDependenciesResponse);
rpc ApplyThrottle(ApplyThrottleRequest) returns (ApplyThrottleResponse);
rpc Recover(RecoverRequest) returns (RecoverResponse);
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,6 @@ mod rw_views;
mod rw_worker_nodes;

mod rw_actor_id_to_ddl;
mod rw_actor_splits;
mod rw_fragment_id_to_ddl;
mod rw_worker_actor_count;
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::Fields;
use risingwave_frontend_macro::system_catalog;
use risingwave_pb::meta::list_actor_splits_response::{ActorSplit, FragmentType};

use crate::catalog::system_catalog::SysCatalogReaderImpl;
use crate::error::Result;

#[derive(Fields)]
#[primary_key(actor_id, split_id, source_id)]
struct RwActorSplit {
actor_id: i32,
split_id: String,
source_id: i32,
fragment_id: i32,
fragment_type: String,
}

impl From<ActorSplit> for RwActorSplit {
fn from(actor_split: ActorSplit) -> Self {
Self {
actor_id: actor_split.actor_id as _,
split_id: actor_split.split_id,
source_id: actor_split.source_id as _,
fragment_id: actor_split.fragment_id as _,
fragment_type: FragmentType::try_from(actor_split.fragment_type)
.unwrap_or(FragmentType::Unspecified)
.as_str_name()
.to_string(),
}
}
}

#[system_catalog(table, "rw_catalog.rw_actor_splits")]
async fn read_rw_actor_splits(reader: &SysCatalogReaderImpl) -> Result<Vec<RwActorSplit>> {
let actor_splits = reader.meta_client.list_actor_splits().await?;
Ok(actor_splits.into_iter().map(RwActorSplit::from).collect())
}
7 changes: 7 additions & 0 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use risingwave_pb::hummock::{
HummockSnapshot,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
Expand Down Expand Up @@ -68,6 +69,8 @@ pub trait FrontendMetaClient: Send + Sync {

async fn list_actor_states(&self) -> Result<Vec<ActorState>>;

async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>>;

async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>>;

async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;
Expand Down Expand Up @@ -178,6 +181,10 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
self.0.list_actor_states().await
}

async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
self.0.list_actor_splits().await
}

async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
self.0.list_object_dependencies().await
}
Expand Down
5 changes: 5 additions & 0 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use risingwave_pb::hummock::{
HummockSnapshot,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
Expand Down Expand Up @@ -973,6 +974,10 @@ impl FrontendMetaClient for MockFrontendMetaClient {
Ok(vec![])
}

async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
Ok(vec![])
}

async fn list_object_dependencies(&self) -> RpcResult<Vec<PbObjectDependencies>> {
Ok(vec![])
}
Expand Down
92 changes: 91 additions & 1 deletion src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ use std::collections::{HashMap, HashSet};

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_connector::source::SplitMetaData;
use risingwave_meta::manager::{LocalNotification, MetadataManager};
use risingwave_meta::model;
use risingwave_meta::model::ActorId;
use risingwave_meta::stream::ThrottleConfig;
use risingwave_meta::stream::{SourceManagerRunningInfo, ThrottleConfig};
use risingwave_meta_model_v2::{SourceId, StreamingParallelism};
use risingwave_pb::meta::cancel_creating_jobs_request::Jobs;
use risingwave_pb::meta::list_actor_splits_response::FragmentType;
use risingwave_pb::meta::list_table_fragments_response::{
ActorInfo, FragmentInfo, TableFragmentInfo,
};
Expand Down Expand Up @@ -420,4 +422,92 @@ impl StreamManagerService for StreamServiceImpl {
.await;
Ok(Response::new(RecoverResponse {}))
}

async fn list_actor_splits(
&self,
_request: Request<ListActorSplitsRequest>,
) -> Result<Response<ListActorSplitsResponse>, Status> {
match &self.metadata_manager {
MetadataManager::V1(_) => Ok(Response::new(ListActorSplitsResponse {
// TODO: remove this when v1 is removed
actor_splits: vec![],
})),
MetadataManager::V2(mgr) => {
let SourceManagerRunningInfo {
source_fragments,
backfill_fragments,
mut actor_splits,
} = self.stream_manager.source_manager.get_running_info().await;

let source_actors = mgr.catalog_controller.list_source_actors().await?;

let is_shared_source = mgr
.catalog_controller
.list_source_id_with_shared_types()
.await?;

let fragment_to_source: HashMap<_, _> =
source_fragments
.into_iter()
.flat_map(|(source_id, fragment_ids)| {
let source_type = if is_shared_source
.get(&(source_id as _))
.copied()
.unwrap_or(false)
{
FragmentType::SharedSource
} else {
FragmentType::NonSharedSource
};

fragment_ids
.into_iter()
.map(move |fragment_id| (fragment_id, (source_id, source_type)))
})
.chain(backfill_fragments.into_iter().flat_map(
|(source_id, fragment_ids)| {
fragment_ids.into_iter().flat_map(
move |(fragment_id, upstream_fragment_id)| {
[
(
fragment_id,
(source_id, FragmentType::SharedSourceBackfill),
),
(
upstream_fragment_id,
(source_id, FragmentType::SharedSource),
),
]
},
)
},
))
.collect();

let actor_splits = source_actors
.into_iter()
.flat_map(|(actor_id, fragment_id)| {
let (source_id, fragment_type) = fragment_to_source
.get(&(fragment_id as _))
.copied()
.unwrap_or_default();

actor_splits
.remove(&(actor_id as _))
.unwrap_or_default()
.into_iter()
.map(move |split| list_actor_splits_response::ActorSplit {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a column type to show whether it's source fragment or backfill fragment here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to differentiate between the backfill source and the downstream?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

actor_id: actor_id as _,
source_id: source_id as _,
fragment_id: fragment_id as _,
split_id: split.id().to_string(),
fragment_type: fragment_type.into(),
})
})
.collect_vec();

Ok(Response::new(ListActorSplitsResponse { actor_splits }))
}
}
}
}
22 changes: 22 additions & 0 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2723,6 +2723,28 @@ impl CatalogController {
inner.list_sources().await
}

// Return a hashmap to distinguish whether each source is shared or not.
pub async fn list_source_id_with_shared_types(&self) -> MetaResult<HashMap<SourceId, bool>> {
let inner = self.inner.read().await;
let source_ids: Vec<(SourceId, Option<StreamSourceInfo>)> = Source::find()
.select_only()
.columns([source::Column::SourceId, source::Column::SourceInfo])
.into_tuple()
.all(&inner.db)
.await?;

Ok(source_ids
.into_iter()
.map(|(source_id, info)| {
(
source_id,
info.map(|info| info.to_protobuf().cdc_source_job)
.unwrap_or(false),
)
})
.collect())
}

pub async fn list_source_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<SourceId>> {
let inner = self.inner.read().await;
let source_ids: Vec<SourceId> = Source::find()
Expand Down
14 changes: 14 additions & 0 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,20 @@ impl CatalogController {
Ok(actor_locations)
}

pub async fn list_source_actors(&self) -> MetaResult<Vec<(ActorId, FragmentId)>> {
let inner = self.inner.read().await;

let source_actors: Vec<(ActorId, FragmentId)> = Actor::find()
.select_only()
.filter(actor::Column::Splits.is_not_null())
.columns([actor::Column::ActorId, actor::Column::FragmentId])
.into_tuple()
.all(&inner.db)
.await?;

Ok(source_actors)
}

pub async fn list_fragment_descs(&self) -> MetaResult<Vec<FragmentDesc>> {
let inner = self.inner.read().await;
let fragment_descs: Vec<FragmentDesc> = Fragment::find()
Expand Down
12 changes: 10 additions & 2 deletions src/meta/src/controller/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use risingwave_meta_model_v2::prelude::*;
use risingwave_meta_model_v2::{
actor, actor_dispatcher, connection, database, fragment, function, index, object,
object_dependency, schema, secret, sink, source, subscription, table, user, user_privilege,
view, ActorId, DataTypeArray, DatabaseId, FragmentId, I32Array, ObjectId, PrivilegeId,
SchemaId, SourceId, StreamNode, UserId, VnodeBitmap, WorkerId,
view, ActorId, ConnectorSplits, DataTypeArray, DatabaseId, FragmentId, I32Array, ObjectId,
PrivilegeId, SchemaId, SourceId, StreamNode, UserId, VnodeBitmap, WorkerId,
};
use risingwave_pb::catalog::{
PbConnection, PbFunction, PbIndex, PbSecret, PbSink, PbSource, PbSubscription, PbTable, PbView,
Expand Down Expand Up @@ -256,6 +256,14 @@ pub struct PartialActorLocation {
pub status: ActorStatus,
}

#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "Actor")]
pub struct PartialActorSplits {
pub actor_id: ActorId,
pub fragment_id: FragmentId,
pub splits: Option<ConnectorSplits>,
}

#[derive(FromQueryResult)]
pub struct FragmentDesc {
pub fragment_id: FragmentId,
Expand Down
15 changes: 15 additions & 0 deletions src/meta/src/stream/source_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ pub struct SourceManagerCore {
actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
}

pub struct SourceManagerRunningInfo {
pub source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
pub backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
}

impl SourceManagerCore {
fn new(
metadata_manager: MetadataManager,
Expand Down Expand Up @@ -1101,6 +1107,15 @@ impl SourceManager {
core.actor_splits.clone()
}

pub async fn get_running_info(&self) -> SourceManagerRunningInfo {
let core = self.core.lock().await;
SourceManagerRunningInfo {
source_fragments: core.source_fragments.clone(),
backfill_fragments: core.backfill_fragments.clone(),
actor_splits: core.actor_splits.clone(),
}
}

/// Checks whether the external source metadata has changed, and sends a split assignment command
/// if it has.
///
Expand Down
Loading
Loading