Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: deprecate max_committed_epoch of hummock version #18644

Merged
merged 7 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion proto/backup_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ message MetaSnapshotManifest {
message MetaSnapshotMetadata {
uint64 id = 1;
uint64 hummock_version_id = 2;
uint64 max_committed_epoch = 3;
reserved 3;
reserved 'max_committed_epoch';
reserved 4;
reserved 'safe_epoch';
optional uint32 format_version = 5;
Expand Down
4 changes: 2 additions & 2 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ message HummockVersion {
uint64 id = 1;
// Levels of each compaction group
map<uint64, Levels> levels = 2;
uint64 max_committed_epoch = 3;
uint64 max_committed_epoch = 3 [deprecated = true];
reserved 4;
reserved 'safe_epoch';
map<uint32, TableWatermarks> table_watermarks = 5;
Expand All @@ -191,7 +191,7 @@ message HummockVersionDelta {
uint64 prev_id = 2;
// Levels of each compaction group
map<uint64, GroupDeltas> group_deltas = 3;
uint64 max_committed_epoch = 4;
uint64 max_committed_epoch = 4 [deprecated = true];
reserved 5;
reserved 'safe_epoch';
bool trivial_move = 6;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ workspace-hack = { path = "../workspace-hack" }
assert_matches = "1"
expect-test = "1.5"
rand = { workspace = true }
risingwave_hummock_sdk = { workspace = true }
risingwave_hummock_sdk = { workspace = true, features = ["test"] }
risingwave_test_runner = { workspace = true }

[features]
Expand Down
1 change: 0 additions & 1 deletion src/meta/model_v2/src/hummock_version_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ impl From<Model> for PbHummockVersionDelta {
let ret = value.full_version_delta.to_protobuf();
assert_eq!(value.id, ret.id as i64);
assert_eq!(value.prev_id, ret.prev_id as i64);
assert_eq!(value.max_committed_epoch, ret.max_committed_epoch as i64);
assert_eq!(value.trivial_move, ret.trivial_move);
ret
}
Expand Down
4 changes: 4 additions & 0 deletions src/meta/src/barrier/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ impl InflightGraphInfo {
}
}

pub fn is_empty(&self) -> bool {
self.fragment_infos.is_empty()
}

/// Update worker nodes snapshot. We need to support incremental updates for it in the future.
pub fn on_new_worker_node_map(&self, node_map: &HashMap<WorkerId, WorkerNode>) {
for (node_id, actors) in &self.actor_map {
Expand Down
14 changes: 9 additions & 5 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use prometheus::HistogramTimer;
use risingwave_common::catalog::TableId;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
use risingwave_common::{bail, must_match};
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
Expand Down Expand Up @@ -597,7 +596,7 @@ impl GlobalBarrierManager {
let in_flight_barrier_nums = env.opts.in_flight_barrier_nums;

let initial_invalid_state = BarrierManagerState::new(
TracedEpoch::new(Epoch(INVALID_EPOCH)),
None,
InflightGraphInfo::default(),
InflightSubscriptionInfo::default(),
None,
Expand Down Expand Up @@ -949,7 +948,14 @@ impl GlobalBarrierManager {
}
}

let (prev_epoch, curr_epoch) = self.state.next_epoch_pair();
let Some((prev_epoch, curr_epoch)) = self.state.next_epoch_pair(&command) else {
// skip the command when there is nothing to do with the barrier
for mut notifier in notifiers {
notifier.notify_started();
notifier.notify_collected();
}
return Ok(());
};

// Insert newly added creating job
if let Command::CreateStreamingJob {
Expand Down Expand Up @@ -1175,7 +1181,6 @@ impl GlobalBarrierManagerContext {
change_log_delta: Default::default(),
committed_epoch: epoch,
tables_to_commit,
is_visible_table_committed_epoch: false,
};
self.hummock_manager.commit_epoch(info).await?;
Ok(())
Expand Down Expand Up @@ -1770,6 +1775,5 @@ fn collect_commit_epoch_info(
change_log_delta: table_new_change_log,
committed_epoch: epoch,
tables_to_commit,
is_visible_table_committed_epoch: true,
}
}
87 changes: 55 additions & 32 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,16 +326,33 @@ impl GlobalBarrierManager {
.context
.hummock_manager
.on_current_version(|version| {
let max_committed_epoch = version.max_committed_epoch_for_meta();
for (table_id, info) in version.state_table_info.info() {
assert_eq!(
info.committed_epoch, max_committed_epoch,
"table {} with invisible epoch is not purged",
table_id
let state_table_info = version.state_table_info.info();
let committed_epoch = state_table_info
.values()
.map(|info| info.committed_epoch)
.next();
let existing_table_ids = info.existing_table_ids();
for table_id in existing_table_ids {
assert!(
state_table_info.contains_key(&table_id),
"table id {table_id} not registered to hummock but in recovered job {:?}. hummock table info{:?}",
info.existing_table_ids().collect_vec(),
state_table_info
);
}
if let Some(committed_epoch) = committed_epoch {
for (table_id, info) in version.state_table_info.info() {
assert_eq!(
info.committed_epoch, committed_epoch,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm: the reason why this assertion is valid is because snapshot backfill is non-resumable and tables with fake epochs will be cleaned. In the future with partial ckpt recovery, this assertion will not longer hold.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's only a sanity check for current implementation.

"table {} with invisible epoch is not purged",
table_id
);
}
}
(
TracedEpoch::new(Epoch::from(max_committed_epoch)),
committed_epoch.map(|committed_epoch| {
TracedEpoch::new(Epoch::from(committed_epoch))
}),
version.id,
)
})
Expand Down Expand Up @@ -388,30 +405,36 @@ impl GlobalBarrierManager {
subscriptions_to_add: Default::default(),
});

// Use a different `curr_epoch` for each recovery attempt.
let new_epoch = prev_epoch.next();

let mut node_to_collect = control_stream_manager.inject_barrier(
None,
Some(mutation),
(&new_epoch, &prev_epoch),
&BarrierKind::Initial,
&info,
Some(&info),
Some(node_actors),
vec![],
vec![],
)?;
debug!(?node_to_collect, "inject initial barrier");
while !node_to_collect.is_empty() {
let (worker_id, result) = control_stream_manager
.next_complete_barrier_response()
.await;
let resp = result?;
assert_eq!(resp.epoch, prev_epoch.value().0);
assert!(node_to_collect.remove(&worker_id));
}
debug!("collected initial barrier");
let new_epoch = if let Some(prev_epoch) = &prev_epoch {
// Use a different `curr_epoch` for each recovery attempt.
let new_epoch = prev_epoch.next();

let mut node_to_collect = control_stream_manager.inject_barrier(
None,
Some(mutation),
(&new_epoch, prev_epoch),
&BarrierKind::Initial,
&info,
Some(&info),
Some(node_actors),
vec![],
vec![],
)?;
debug!(?node_to_collect, "inject initial barrier");
while !node_to_collect.is_empty() {
let (worker_id, result) = control_stream_manager
.next_complete_barrier_response()
.await;
let resp = result?;
assert_eq!(resp.epoch, prev_epoch.value().0);
assert!(node_to_collect.remove(&worker_id));
}
debug!("collected initial barrier");
Some(new_epoch)
} else {
assert!(info.is_empty());
None
};

(
BarrierManagerState::new(new_epoch, info, subscription_info, paused_reason),
Expand Down Expand Up @@ -446,7 +469,7 @@ impl GlobalBarrierManager {
CheckpointControl::new(self.context.clone(), create_mview_tracker).await;

tracing::info!(
epoch = self.state.in_flight_prev_epoch().value().0,
epoch = self.state.in_flight_prev_epoch().map(|epoch| epoch.value().0),
paused = ?self.state.paused_reason(),
"recovery success"
);
Expand Down
25 changes: 17 additions & 8 deletions src/meta/src/barrier/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::util::epoch::Epoch;
use risingwave_pb::meta::PausedReason;

use crate::barrier::info::{InflightGraphInfo, InflightSubscriptionInfo};
Expand All @@ -23,7 +24,7 @@ pub struct BarrierManagerState {
///
/// There's no need to persist this field. On recovery, we will restore this from the latest
/// committed snapshot in `HummockManager`.
in_flight_prev_epoch: TracedEpoch,
in_flight_prev_epoch: Option<TracedEpoch>,

/// Inflight running actors info.
pub(crate) inflight_graph_info: InflightGraphInfo,
Expand All @@ -36,7 +37,7 @@ pub struct BarrierManagerState {

impl BarrierManagerState {
pub fn new(
in_flight_prev_epoch: TracedEpoch,
in_flight_prev_epoch: Option<TracedEpoch>,
inflight_graph_info: InflightGraphInfo,
inflight_subscription_info: InflightSubscriptionInfo,
paused_reason: Option<PausedReason>,
Expand All @@ -60,16 +61,24 @@ impl BarrierManagerState {
}
}

pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
&self.in_flight_prev_epoch
pub fn in_flight_prev_epoch(&self) -> Option<&TracedEpoch> {
self.in_flight_prev_epoch.as_ref()
}

/// Returns the epoch pair for the next barrier, and updates the state.
pub fn next_epoch_pair(&mut self) -> (TracedEpoch, TracedEpoch) {
let prev_epoch = self.in_flight_prev_epoch.clone();
pub fn next_epoch_pair(&mut self, command: &Command) -> Option<(TracedEpoch, TracedEpoch)> {
if self.inflight_graph_info.is_empty()
&& !matches!(&command, Command::CreateStreamingJob { .. })
{
return None;
};
let in_flight_prev_epoch = self
.in_flight_prev_epoch
.get_or_insert_with(|| TracedEpoch::new(Epoch::now()));
let prev_epoch = in_flight_prev_epoch.clone();
let next_epoch = prev_epoch.next();
self.in_flight_prev_epoch = next_epoch.clone();
(prev_epoch, next_epoch)
*in_flight_prev_epoch = next_epoch.clone();
Some((prev_epoch, next_epoch))
}

/// Returns the inflight actor infos that have included the newly added actors in the given command. The dropped actors
Expand Down
6 changes: 2 additions & 4 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ pub struct CommitEpochInfo {
pub change_log_delta: HashMap<TableId, ChangeLogDelta>,
pub committed_epoch: u64,
pub tables_to_commit: HashSet<TableId>,
pub is_visible_table_committed_epoch: bool,
}

impl HummockManager {
wenym1 marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -79,7 +78,6 @@ impl HummockManager {
change_log_delta,
committed_epoch,
tables_to_commit,
is_visible_table_committed_epoch,
} = commit_info;
let mut versioning_guard = self.versioning.write().await;
let _timer = start_measure_real_process_timer!(self, "commit_epoch");
Expand All @@ -88,11 +86,12 @@ impl HummockManager {
return Ok(());
}

assert!(!tables_to_commit.is_empty());

let versioning: &mut Versioning = &mut versioning_guard;
self.commit_epoch_sanity_check(
committed_epoch,
&tables_to_commit,
is_visible_table_committed_epoch,
&sstables,
&sst_to_context,
&versioning.current_version,
Expand Down Expand Up @@ -194,7 +193,6 @@ impl HummockManager {
let time_travel_delta = version.pre_commit_epoch(
committed_epoch,
&tables_to_commit,
is_visible_table_committed_epoch,
new_compaction_group,
commit_sstables,
&new_table_ids,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,16 @@ impl HummockManager {
self.env.notification_manager(),
&self.metrics,
);
let mut new_version_delta = version.new_delta(None);
let mut new_version_delta = version.new_delta();

let committed_epoch = new_version_delta
.latest_version()
.state_table_info
.info()
.values()
.map(|info| info.committed_epoch)
.max()
.unwrap_or(INVALID_EPOCH);

for (table_id, raw_group_id) in pairs {
let mut group_id = *raw_group_id;
Expand Down Expand Up @@ -265,7 +274,7 @@ impl HummockManager {
.insert(
TableId::new(*table_id),
PbStateTableInfoDelta {
committed_epoch: INVALID_EPOCH,
committed_epoch,
compaction_group_id: *raw_group_id,
}
)
Expand Down Expand Up @@ -293,7 +302,7 @@ impl HummockManager {
self.env.notification_manager(),
&self.metrics,
);
let mut new_version_delta = version.new_delta(None);
let mut new_version_delta = version.new_delta();
let mut modified_groups: HashMap<CompactionGroupId, /* #member table */ u64> =
HashMap::new();
// Remove member tables
Expand Down Expand Up @@ -481,7 +490,7 @@ impl HummockManager {
self.env.notification_manager(),
&self.metrics,
);
let mut new_version_delta = version.new_delta(None);
let mut new_version_delta = version.new_delta();

let new_sst_start_id = next_sstable_object_id(
&self.env,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl HummockManager {
self.env.notification_manager(),
&self.metrics,
);
let mut new_version_delta = version.new_delta(None);
let mut new_version_delta = version.new_delta();

let target_compaction_group_id = {
// merge right_group_id to left_group_id and remove right_group_id
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/hummock/manager/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelecto

impl<'a> HummockVersionTransaction<'a> {
fn apply_compact_task(&mut self, compact_task: &CompactTask) {
let mut version_delta = self.new_delta(None);
let mut version_delta = self.new_delta();
let trivial_move = CompactStatus::is_trivial_move_task(compact_task);
version_delta.trivial_move = trivial_move;

Expand Down
12 changes: 0 additions & 12 deletions src/meta/src/hummock/manager/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ impl HummockManager {
&self,
committed_epoch: HummockEpoch,
tables_to_commit: &HashSet<TableId>,
is_visible_table_committed_epoch: bool,
sstables: &[LocalSstableInfo],
sst_to_context: &HashMap<HummockSstableObjectId, HummockContextId>,
current_version: &HummockVersion,
Expand All @@ -215,17 +214,6 @@ impl HummockManager {
}
}

if is_visible_table_committed_epoch
&& committed_epoch <= current_version.max_committed_epoch_for_meta()
{
return Err(anyhow::anyhow!(
"Epoch {} <= max_committed_epoch {}",
committed_epoch,
current_version.max_committed_epoch_for_meta()
)
.into());
}

// sanity check on monotonically increasing table committed epoch
for table_id in tables_to_commit {
if let Some(info) = current_version.state_table_info.info().get(table_id) {
Expand Down
Loading
Loading