Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ban ALTER SOURCE for shared source, and refine all ALTER clauses #18750

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/frontend/src/handler/alter_parallelism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ pub async fn handle_alter_parallelism(
session.check_privilege_for_drop_alter(schema_name, &**sink)?;
sink.id.sink_id()
}
// TODO: support alter parallelism for shared source
_ => bail!(
"invalid statement type for alter parallelism: {:?}",
stmt_type
Expand Down
9 changes: 7 additions & 2 deletions src/frontend/src/handler/alter_source_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::max_column_id;
use risingwave_connector::source::{extract_source_struct, SourceEncode, SourceStruct};
use risingwave_sqlparser::ast::{
Expand Down Expand Up @@ -59,11 +60,15 @@ pub async fn handle_alter_source_column(
};

if catalog.associated_table_id.is_some() {
Err(ErrorCode::NotSupported(
return Err(ErrorCode::NotSupported(
"alter table with connector with ALTER SOURCE statement".to_string(),
"try to use ALTER TABLE instead".to_string(),
))?
)
.into());
};
if catalog.info.is_shared() {
bail_not_implemented!(issue = 123, "alter shared source");
}

// Currently only allow source without schema registry
let SourceStruct { encode, .. } = extract_source_struct(&catalog.info)?;
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/handler/alter_source_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ pub async fn handle_alter_source_with_sr(
)
.into());
};
if source.info.is_shared() {
bail_not_implemented!(issue = 123, "alter shared source");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is 123 just a placeholder?

}

check_format_encode(&source, &connector_schema)?;

Expand Down
8 changes: 8 additions & 0 deletions src/meta/model_v2/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,11 @@ impl From<PbSource> for ActiveModel {
}
}
}

impl Model {
pub fn is_shared(&self) -> bool {
self.source_info
.as_ref()
.is_some_and(|s| s.to_protobuf().is_shared())
}
}
1 change: 1 addition & 0 deletions src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ impl StreamManagerService for StreamServiceImpl {
}
};

// TODO: check whether shared source is correct
let mutation: ThrottleConfig = actor_to_apply
.iter()
.map(|(fragment_id, actors)| {
Expand Down
139 changes: 81 additions & 58 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1669,7 +1669,21 @@ impl CatalogController {
.one(&txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
let is_shared = source.is_shared();
relations.push(PbRelationInfo::Source(ObjectModel(source, obj).into()));

// Note: For non-shared source, we don't update their state tables, which
// belongs to the MV.
if is_shared {
update_internal_tables(
&txn,
object_id,
object::Column::OwnerId,
Value::Int(Some(new_owner)),
&mut relations,
)
.await?;
}
}
ObjectType::Sink => {
let sink = Sink::find_by_id(object_id)
Expand All @@ -1678,34 +1692,14 @@ impl CatalogController {
.ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
relations.push(PbRelationInfo::Sink(ObjectModel(sink, obj).into()));

// internal tables.
let internal_tables: Vec<TableId> = Table::find()
.select_only()
.column(table::Column::TableId)
.filter(table::Column::BelongsToJobId.eq(object_id))
.into_tuple()
.all(&txn)
.await?;

Object::update_many()
.col_expr(
object::Column::OwnerId,
SimpleExpr::Value(Value::Int(Some(new_owner))),
)
.filter(object::Column::Oid.is_in(internal_tables.clone()))
.exec(&txn)
.await?;

let table_objs = Table::find()
.find_also_related(Object)
.filter(table::Column::TableId.is_in(internal_tables))
.all(&txn)
.await?;
for (table, table_obj) in table_objs {
relations.push(PbRelationInfo::Table(
ObjectModel(table, table_obj.unwrap()).into(),
));
}
update_internal_tables(
&txn,
object_id,
object::Column::OwnerId,
Value::Int(Some(new_owner)),
&mut relations,
)
.await?;
}
ObjectType::Subscription => {
let subscription = Subscription::find_by_id(object_id)
Expand Down Expand Up @@ -1883,11 +1877,25 @@ impl CatalogController {
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
check_relation_name_duplicate(&source.name, database_id, new_schema, &txn).await?;
let is_shared = source.is_shared();

let mut obj = obj.into_active_model();
obj.schema_id = Set(Some(new_schema));
let obj = obj.update(&txn).await?;
relations.push(PbRelationInfo::Source(ObjectModel(source, obj).into()));

// Note: For non-shared source, we don't update their state tables, which
// belongs to the MV.
if is_shared {
update_internal_tables(
&txn,
object_id,
object::Column::SchemaId,
Value::Int(Some(new_schema)),
&mut relations,
)
.await?;
}
}
ObjectType::Sink => {
let sink = Sink::find_by_id(object_id)
Expand All @@ -1901,36 +1909,14 @@ impl CatalogController {
let obj = obj.update(&txn).await?;
relations.push(PbRelationInfo::Sink(ObjectModel(sink, obj).into()));

// internal tables.
let internal_tables: Vec<TableId> = Table::find()
.select_only()
.column(table::Column::TableId)
.filter(table::Column::BelongsToJobId.eq(object_id))
.into_tuple()
.all(&txn)
.await?;

if !internal_tables.is_empty() {
Object::update_many()
.col_expr(
object::Column::SchemaId,
SimpleExpr::Value(Value::Int(Some(new_schema))),
)
.filter(object::Column::Oid.is_in(internal_tables.clone()))
.exec(&txn)
.await?;

let table_objs = Table::find()
.find_also_related(Object)
.filter(table::Column::TableId.is_in(internal_tables))
.all(&txn)
.await?;
for (table, table_obj) in table_objs {
relations.push(PbRelationInfo::Table(
ObjectModel(table, table_obj.unwrap()).into(),
));
}
}
update_internal_tables(
&txn,
object_id,
object::Column::SchemaId,
Value::Int(Some(new_schema)),
&mut relations,
)
.await?;
}
ObjectType::Subscription => {
let subscription = Subscription::find_by_id(object_id)
Expand Down Expand Up @@ -2452,6 +2438,7 @@ impl CatalogController {
}};
}

// TODO: check is there any thing to change for shared source?
let old_name = match object_type {
ObjectType::Table => rename_relation!(Table, table, table_id, object_id),
ObjectType::Source => rename_relation!(Source, source, source_id, object_id),
Expand Down Expand Up @@ -3395,6 +3382,42 @@ impl CatalogControllerInner {
}
}

async fn update_internal_tables(
txn: &DatabaseTransaction,
object_id: i32,
column: object::Column,
new_value: Value,
relations_to_notify: &mut Vec<PbRelationInfo>,
) -> MetaResult<()> {
let internal_tables: Vec<TableId> = Table::find()
.select_only()
.column(table::Column::TableId)
.filter(table::Column::BelongsToJobId.eq(object_id))
.into_tuple()
.all(txn)
.await?;

if !internal_tables.is_empty() {
Object::update_many()
.col_expr(column, SimpleExpr::Value(new_value))
.filter(object::Column::Oid.is_in(internal_tables.clone()))
.exec(txn)
.await?;

let table_objs = Table::find()
.find_also_related(Object)
.filter(table::Column::TableId.is_in(internal_tables))
.all(txn)
.await?;
for (table, table_obj) in table_objs {
relations_to_notify.push(PbRelationInfo::Table(
ObjectModel(table, table_obj.unwrap()).into(),
));
}
}
Ok(())
}

#[cfg(test)]
#[cfg(not(madsim))]
mod tests {
Expand Down
Loading