diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index eb09175c..39c7a713 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -36,7 +36,11 @@ jobs: - name: Clippy run: cargo clippy --features failpoints --all --all-targets -- -D clippy::all - name: Run tests - run: RUST_BACKTRACE=1 cargo test --features failpoints --all --verbose -- --nocapture + run: | + cargo test --all --verbose + cargo test --test failpoints --features failpoints --verbose -- --test-threads 1 --nocapture + env: + RUST_BACKTRACE: 1 - name: Run asan tests if: ${{ matrix.os == 'ubuntu-latest' }} run: cargo test -Zbuild-std --target x86_64-unknown-linux-gnu --features failpoints --verbose -- --nocapture @@ -62,7 +66,9 @@ jobs: - name: Install grcov run: if [[ ! -e ~/.cargo/bin/grcov ]]; then cargo install --locked grcov; fi - name: Run tests - run: cargo test --features failpoints --all --verbose + run: | + cargo test --all --verbose + cargo test --test failpoints --features failpoints -- --test-threads 1 --nocapture env: RUSTFLAGS: '-Zinstrument-coverage' LLVM_PROFILE_FILE: '%p-%m.profraw' diff --git a/Cargo.toml b/Cargo.toml index e320210d..fb52d27d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,17 @@ edition = "2018" name = "append-compact-purge" path = "examples/append_compact_purge.rs" +[[test]] +name = "failpoints" +path = "tests/failpoints/mod.rs" +required-features = ["failpoints"] + +[[bench]] +name = "benches" +path = "tests/benches/mod.rs" +harness = false +required-features = ["failpoints"] + [dependencies] byteorder = "1.2" crc32fast = "1.2" @@ -53,8 +64,3 @@ protobuf-codegen = { git = "https://github.com/pingcap/rust-protobuf", rev = "82 [workspace] members = [ "stress" ] - -[[bench]] -name = "bench_recovery" -harness = false -required-features = ["failpoints"] diff --git a/README.md b/README.md index 16d54d00..2cc0d17c 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,8 @@ Contributions are always welcome! Here are a few tips for making a PR: ``` cargo fmt --all -- --check cargo clippy --all --all-targets -- -D clippy::all -cargo test --features failpoints --all +cargo test --all +cargo test --test failpoints --features failpoints -- --test-threads 1 ``` - For changes that might induce performance effects, please quote the targeted benchmark results in the PR description. In addition to micro-benchmarks, there is a standalone [stress test tool](https://github.com/tikv/raft-engine/tree/master/stress) which you can use to demonstrate the system performance. diff --git a/src/consistency.rs b/src/consistency.rs index 35b5c770..c2778730 100644 --- a/src/consistency.rs +++ b/src/consistency.rs @@ -9,15 +9,15 @@ use crate::Result; #[derive(Default)] pub struct ConsistencyChecker { - // Raft group id -> last index - pending: HashMap, - // Raft group id -> last uncorrupted index + // Raft group id -> first index, last index + raft_groups: HashMap, + // Raft group id -> last unaffected index corrupted: HashMap, } impl ConsistencyChecker { - pub fn finish(self) -> Vec<(u64, u64)> { - self.corrupted.into_iter().collect() + pub fn finish(self) -> HashMap { + self.corrupted } } @@ -29,14 +29,15 @@ impl ReplayMachine for ConsistencyChecker { let incoming_first_index = ents.0.first().unwrap().index; let incoming_last_index = ents.0.last().unwrap().index; let last_index = self - .pending + .raft_groups .entry(item.raft_group_id) - .or_insert(incoming_last_index); - if *last_index + 1 < incoming_first_index { + .or_insert((incoming_first_index, incoming_last_index)); + if last_index.1 + 1 < incoming_first_index { self.corrupted - .insert(item.raft_group_id, incoming_first_index); + .entry(item.raft_group_id) + .or_insert(last_index.1); } - *last_index = incoming_last_index; + last_index.1 = incoming_last_index; } } } @@ -44,11 +45,24 @@ impl ReplayMachine for ConsistencyChecker { } fn merge(&mut self, mut rhs: Self, _queue: LogQueue) -> Result<()> { - for (id, last_index) in rhs.pending.drain() { - self.pending.insert(id, last_index); + let mut new_corrupted: HashMap = HashMap::default(); + // Find holes between self and rhs. + for (id, (first, last)) in rhs.raft_groups.drain() { + self.raft_groups + .entry(id) + .and_modify(|(_, l)| { + if *l + 1 < first { + new_corrupted.insert(id, *l); + } + *l = last; + }) + .or_insert((first, last)); + } + for (id, last_index) in new_corrupted.drain() { + self.corrupted.entry(id).or_insert(last_index); } for (id, last_index) in rhs.corrupted.drain() { - self.corrupted.insert(id, last_index); + self.corrupted.entry(id).or_insert(last_index); } Ok(()) } diff --git a/src/engine.rs b/src/engine.rs index 0f66b5ec..b624b08b 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -8,11 +8,11 @@ use std::u64; use log::{error, info}; use protobuf::{parse_from_bytes, Message}; -use crate::config::Config; +use crate::config::{Config, RecoveryMode}; use crate::consistency::ConsistencyChecker; use crate::event_listener::EventListener; use crate::file_builder::*; -use crate::file_pipe_log::{FilePipeLog, ReplayMachine}; +use crate::file_pipe_log::FilePipeLog; use crate::log_batch::{Command, LogBatch, MessageExt}; use crate::memtable::{EntryIndex, MemTableAccessor, MemTableRecoverContext}; use crate::metrics::*; @@ -273,27 +273,42 @@ where v }) } + + pub fn file_span(&self, queue: LogQueue) -> (u64, u64) { + self.pipe_log.file_span(queue) + } +} + +impl Engine> { + pub fn consistency_check(path: &std::path::Path) -> Result> { + Self::consistency_check_with(path, Arc::new(DefaultFileBuilder {})) + } } impl Engine> where B: FileBuilder, { - /// Return a list of corrupted Raft groups, including their id and last unaffected - /// log index. - pub fn consistency_check(cfg: Config, file_builder: Arc) -> Result> { - let (_, mut append, rewrite) = + /// Returns a list of corrupted Raft groups, including their id and last unaffected + /// log index. Head or tail corruption might not be detected. + pub fn consistency_check_with( + path: &std::path::Path, + file_builder: Arc, + ) -> Result> { + let cfg = Config { + dir: path.to_str().unwrap().to_owned(), + recovery_mode: RecoveryMode::TolerateAnyCorruption, + ..Default::default() + }; + let (_, append, rewrite) = FilePipeLog::open::(&cfg, file_builder, vec![])?; - append.merge(rewrite, LogQueue::Rewrite)?; - Ok(append.finish()) - } - - pub fn unsafe_truncate_raft_groups( - _cfg: Config, - _file_builder: Arc, - _raft_groups: Vec<(u64, Option)>, - ) -> Result<()> { - todo!() + let mut map = rewrite.finish(); + for (id, index) in append.finish() { + map.entry(id).or_insert(index); + } + let mut list: Vec<(u64, u64)> = map.into_iter().collect(); + list.sort_unstable(); + Ok(list) } } @@ -325,37 +340,34 @@ mod tests { type RaftLogEngine = Engine; impl RaftLogEngine { - fn append(&self, raft_group_id: u64, entries: &[Entry]) -> Result { - let mut batch = LogBatch::default(); - batch.add_entries::(raft_group_id, entries)?; - self.write(&mut batch, false) + fn append(&self, raft_group_id: u64, entries: &[Entry]) { + if !entries.is_empty() { + let mut batch = LogBatch::default(); + batch.add_entries::(raft_group_id, entries).unwrap(); + batch + .put_message( + raft_group_id, + b"last_index".to_vec(), + &RaftLocalState { + last_index: entries[entries.len() - 1].index, + ..Default::default() + }, + ) + .unwrap(); + self.write(&mut batch, true).unwrap(); + } } - } - fn append_log(engine: &RaftLogEngine, raft: u64, entry: &Entry) { - let mut log_batch = LogBatch::default(); - log_batch - .add_entries::(raft, &[entry.clone()]) - .unwrap(); - log_batch - .put_message( - raft, - b"last_index".to_vec(), - &RaftLocalState { - last_index: entry.index, - ..Default::default() - }, - ) - .unwrap(); - engine.write(&mut log_batch, false).unwrap(); - } + fn append_one(&self, raft_group_id: u64, entry: &Entry) { + self.append(raft_group_id, &[entry.clone()]) + } - fn last_index(engine: &RaftLogEngine, raft: u64) -> u64 { - engine - .get_message::(raft, b"last_index") - .unwrap() - .unwrap() - .last_index + fn last_index_slow(&self, raft_group_id: u64) -> u64 { + self.get_message::(raft_group_id, b"last_index") + .unwrap() + .unwrap() + .last_index + } } #[test] @@ -373,7 +385,7 @@ mod tests { }; let engine = RaftLogEngine::open(cfg).unwrap(); - append_log(&engine, 1, &Entry::new()); + engine.append_one(1, &Entry::new()); assert!(engine.memtables.get(1).is_some()); let mut log_batch = LogBatch::default(); @@ -402,9 +414,9 @@ mod tests { entry.set_data(vec![b'x'; entry_size].into()); for i in 10..20 { entry.set_index(i); - engine.append(i, &[entry.clone()]).unwrap(); + engine.append_one(i, &entry); entry.set_index(i + 1); - engine.append(i, &[entry.clone()]).unwrap(); + engine.append_one(i, &entry); } for i in 10..20 { @@ -461,7 +473,7 @@ mod tests { entry.set_data(vec![b'x'; 1024].into()); for i in 0..100 { entry.set_index(i); - append_log(&engine, 1, &entry); + engine.append_one(1, &entry); } // GC all log entries. Won't trigger purge because total size is not enough. @@ -474,7 +486,7 @@ mod tests { // Append more logs to make total size greater than `purge_threshold`. for i in 100..250 { entry.set_index(i); - append_log(&engine, 1, &entry); + engine.append_one(1, &entry); } // GC first 101 log entries. @@ -532,7 +544,7 @@ mod tests { for i in 1..=10 { for j in 1..=10 { entry.set_index(i); - append_log(&engine, j, &entry); + engine.append_one(j, &entry); } } @@ -551,7 +563,7 @@ mod tests { for j in 1..=10 { let e = engine.get_entry::(j, i).unwrap().unwrap(); assert_eq!(e.get_data(), entry.get_data()); - assert_eq!(last_index(&engine, j), 10); + assert_eq!(engine.last_index_slow(j), 10); } } @@ -566,7 +578,7 @@ mod tests { for j in 1..=10 { let e = engine.get_entry::(j, i).unwrap().unwrap(); assert_eq!(e.get_data(), entry.get_data()); - assert_eq!(last_index(&engine, j), 10); + assert_eq!(engine.last_index_slow(j), 10); } } @@ -574,7 +586,7 @@ mod tests { for i in 11..=20 { for j in 1..=10 { entry.set_index(i); - append_log(&engine, j, &entry); + engine.append_one(j, &entry); } } @@ -607,7 +619,7 @@ mod tests { // entries[1..10], Clean, entries[2..11] for j in 1..=10 { entry.set_index(j); - append_log(&engine, 1, &entry); + engine.append_one(1, &entry); } let mut log_batch = LogBatch::with_capacity(1); log_batch.add_command(1, Command::Clean); @@ -617,7 +629,7 @@ mod tests { entry.set_data(vec![b'y'; 1024].into()); for j in 2..=11 { entry.set_index(j); - append_log(&engine, 1, &entry); + engine.append_one(1, &entry); } assert_eq!(engine.pipe_log.file_span(LogQueue::Append).1, 1); @@ -626,7 +638,7 @@ mod tests { for i in 2..64 { for j in 1..=10 { entry.set_index(j); - append_log(&engine, i, &entry); + engine.append_one(i, &entry); } } @@ -662,7 +674,7 @@ mod tests { for i in 64..=128 { for j in 1..=10 { entry.set_index(j); - append_log(&engine, i, &entry); + engine.append_one(i, &entry); } } @@ -690,187 +702,6 @@ mod tests { test_clean_raft_with_only_rewrite(false); } - #[test] - #[cfg(feature = "failpoints")] - fn test_pipe_log_listeners() { - use std::collections::HashMap; - use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; - use std::time::Duration; - - #[derive(Default)] - struct QueueHook { - files: AtomicUsize, - appends: AtomicUsize, - applys: AtomicUsize, - purged: AtomicU64, - } - - impl QueueHook { - fn files(&self) -> usize { - self.files.load(Ordering::Acquire) - } - fn appends(&self) -> usize { - self.appends.load(Ordering::Acquire) - } - fn applys(&self) -> usize { - self.applys.load(Ordering::Acquire) - } - fn purged(&self) -> u64 { - self.purged.load(Ordering::Acquire) - } - } - - struct Hook(HashMap); - impl Default for Hook { - fn default() -> Hook { - let mut hash = HashMap::default(); - hash.insert(LogQueue::Append, QueueHook::default()); - hash.insert(LogQueue::Rewrite, QueueHook::default()); - Hook(hash) - } - } - - impl EventListener for Hook { - fn post_new_log_file(&self, id: FileId) { - self.0[&id.queue].files.fetch_add(1, Ordering::Release); - } - - fn on_append_log_file(&self, handle: FileBlockHandle) { - self.0[&handle.id.queue] - .appends - .fetch_add(1, Ordering::Release); - } - - fn post_apply_memtables(&self, id: FileId) { - self.0[&id.queue].applys.fetch_add(1, Ordering::Release); - } - - fn post_purge(&self, id: FileId) { - self.0[&id.queue].purged.store(id.seq, Ordering::Release); - } - } - - let dir = tempfile::Builder::new() - .prefix("test_pipe_log_listeners") - .tempdir() - .unwrap(); - - let cfg = Config { - dir: dir.path().to_str().unwrap().to_owned(), - target_file_size: ReadableSize::kb(128), - purge_threshold: ReadableSize::kb(512), - batch_compression_threshold: ReadableSize::kb(0), - ..Default::default() - }; - - let hook = Arc::new(Hook::default()); - let engine = - Arc::new(RaftLogEngine::open_with_listeners(cfg.clone(), vec![hook.clone()]).unwrap()); - assert_eq!(hook.0[&LogQueue::Append].files(), 1); - assert_eq!(hook.0[&LogQueue::Rewrite].files(), 1); - - let mut entry = Entry::new(); - entry.set_data(vec![b'x'; 64 * 1024].into()); - - // Append 10 logs for region 1, 10 logs for region 2. - for i in 1..=20 { - let region_id = (i as u64 - 1) % 2 + 1; - entry.set_index((i as u64 + 1) / 2); - append_log(&engine, region_id, &entry); - assert_eq!(hook.0[&LogQueue::Append].appends(), i); - assert_eq!(hook.0[&LogQueue::Append].applys(), i); - } - assert_eq!(hook.0[&LogQueue::Append].files(), 10); - - assert!(engine - .purge_manager - .needs_rewrite_log_files(LogQueue::Append)); - engine.purge_manager.purge_expired_files().unwrap(); - assert_eq!(hook.0[&LogQueue::Append].purged(), 8); - - // All things in a region will in one write batch. - assert_eq!(hook.0[&LogQueue::Rewrite].files(), 2); - assert_eq!(hook.0[&LogQueue::Rewrite].appends(), 2); - assert_eq!(hook.0[&LogQueue::Rewrite].applys(), 2); - - // Append 5 logs for region 1, 5 logs for region 2. - for i in 21..=30 { - let region_id = (i as u64 - 1) % 2 + 1; - entry.set_index((i as u64 + 1) / 2); - append_log(&engine, region_id, &entry); - assert_eq!(hook.0[&LogQueue::Append].appends(), i); - assert_eq!(hook.0[&LogQueue::Append].applys(), i); - } - // Compact so that almost all content of rewrite queue will become garbage. - engine.compact_to(1, 14); - engine.compact_to(2, 14); - assert_eq!(hook.0[&LogQueue::Append].appends(), 32); - assert_eq!(hook.0[&LogQueue::Append].applys(), 32); - - engine.purge_manager.purge_expired_files().unwrap(); - assert_eq!(hook.0[&LogQueue::Append].purged(), 13); - assert_eq!(hook.0[&LogQueue::Rewrite].purged(), 2); - - // Write region 3 without applying. - let apply_memtable_region_3_fp = "memtable_accessor::apply::region_3"; - fail::cfg(apply_memtable_region_3_fp, "pause").unwrap(); - let engine_clone = engine.clone(); - let mut entry_clone = entry.clone(); - let th = std::thread::spawn(move || { - entry_clone.set_index(1); - append_log(&engine_clone, 3, &entry_clone); - }); - - // Sleep a while to wait the log batch `Append(3, [1])` to get written. - std::thread::sleep(Duration::from_millis(200)); - assert_eq!(hook.0[&LogQueue::Append].appends(), 33); - let file_not_applied = engine.pipe_log.file_span(LogQueue::Append).1; - assert_eq!(hook.0[&LogQueue::Append].applys(), 32); - - for i in 31..=40 { - let region_id = (i as u64 - 1) % 2 + 1; - entry.set_index((i as u64 + 1) / 2); - append_log(&engine, region_id, &entry); - assert_eq!(hook.0[&LogQueue::Append].appends(), i + 3); - assert_eq!(hook.0[&LogQueue::Append].applys(), i + 2); - } - - // Can't purge because region 3 is not yet applied. - assert!(engine - .purge_manager - .needs_rewrite_log_files(LogQueue::Append)); - engine.purge_manager.purge_expired_files().unwrap(); - let first = engine.pipe_log.file_span(LogQueue::Append).0; - assert_eq!(file_not_applied, first); - - // Resume write on region 3. - fail::remove(apply_memtable_region_3_fp); - th.join().unwrap(); - - std::thread::sleep(Duration::from_millis(200)); - engine.purge_manager.purge_expired_files().unwrap(); - let new_first = engine.pipe_log.file_span(LogQueue::Append).0; - assert_ne!(file_not_applied, new_first); - - // Drop and then recover. - drop(engine); - - let hook = Arc::new(Hook::default()); - let engine = RaftLogEngine::open_with_listeners(cfg, vec![hook.clone()]).unwrap(); - assert_eq!( - hook.0[&LogQueue::Append].files() as u64, - engine.pipe_log.file_span(LogQueue::Append).1 - - engine.pipe_log.file_span(LogQueue::Append).0 - + 1 - ); - assert_eq!( - hook.0[&LogQueue::Rewrite].files() as u64, - engine.pipe_log.file_span(LogQueue::Rewrite).1 - - engine.pipe_log.file_span(LogQueue::Rewrite).0 - + 1 - ); - } - #[test] fn test_empty_protobuf_message() { let dir = tempfile::Builder::new() @@ -929,122 +760,4 @@ mod tests { empty_state ); } - - #[cfg(feature = "failpoints")] - struct ConcurrentWriteContext { - engine: Arc, - ths: Vec>, - } - - #[cfg(feature = "failpoints")] - impl ConcurrentWriteContext { - fn new(engine: Arc) -> Self { - Self { - engine, - ths: Vec::new(), - } - } - - fn leader_write(&mut self, mut log_batch: LogBatch) { - if self.ths.is_empty() { - fail::cfg("write_barrier::leader_exit", "pause").unwrap(); - let engine_clone = self.engine.clone(); - self.ths.push( - std::thread::Builder::new() - .spawn(move || { - engine_clone.write(&mut LogBatch::default(), true).unwrap(); - }) - .unwrap(), - ); - } - let engine_clone = self.engine.clone(); - self.ths.push( - std::thread::Builder::new() - .spawn(move || { - engine_clone.write(&mut log_batch, true).unwrap(); - }) - .unwrap(), - ); - } - - fn follower_write(&mut self, mut log_batch: LogBatch) { - assert!(self.ths.len() == 2); - let engine_clone = self.engine.clone(); - self.ths.push( - std::thread::Builder::new() - .spawn(move || { - engine_clone.write(&mut log_batch, true).unwrap(); - }) - .unwrap(), - ); - } - - fn join(&mut self) { - fail::remove("write_barrier::leader_exit"); - for t in self.ths.drain(..) { - t.join().unwrap(); - } - } - } - - #[test] - #[cfg(feature = "failpoints")] - fn test_concurrent_write_empty_log_batch() { - let dir = tempfile::Builder::new() - .prefix("test_concurrent_write_empty_log_batch") - .tempdir() - .unwrap(); - let cfg = Config { - dir: dir.path().to_str().unwrap().to_owned(), - ..Default::default() - }; - let engine = Arc::new(RaftLogEngine::open(cfg.clone()).unwrap()); - let mut ctx = ConcurrentWriteContext::new(engine.clone()); - - let some_entries = vec![ - Entry::new(), - Entry { - index: 1, - ..Default::default() - }, - ]; - - ctx.leader_write(LogBatch::default()); - let mut log_batch = LogBatch::default(); - log_batch.add_entries::(1, &some_entries).unwrap(); - ctx.follower_write(log_batch); - ctx.join(); - - let mut log_batch = LogBatch::default(); - log_batch.add_entries::(2, &some_entries).unwrap(); - ctx.leader_write(log_batch); - ctx.follower_write(LogBatch::default()); - ctx.join(); - drop(ctx); - drop(engine); - - let engine = RaftLogEngine::open(cfg).unwrap(); - let mut entries = Vec::new(); - engine - .fetch_entries_to::( - 1, /*region*/ - 0, /*begin*/ - 2, /*end*/ - None, /*max_size*/ - &mut entries, - ) - .unwrap(); - assert_eq!(entries, some_entries); - entries.clear(); - engine - .fetch_entries_to::( - 2, /*region*/ - 0, /*begin*/ - 2, /*end*/ - None, /*max_size*/ - &mut entries, - ) - .unwrap(); - assert_eq!(entries, some_entries); - } } diff --git a/src/log_batch.rs b/src/log_batch.rs index 82880cfb..3e1ab31f 100644 --- a/src/log_batch.rs +++ b/src/log_batch.rs @@ -665,6 +665,24 @@ impl LogBatch { (&mut self.buf[header_offset + 8..header_offset + 16]) .write_u64::(footer_roffset as u64)?; + #[cfg(feature = "failpoints")] + { + let corrupted_items = || { + fail::fail_point!("log_batch::corrupted_items", |_| true); + false + }; + let corrupted_entries = || { + fail::fail_point!("log_batch::corrupted_entries", |_| true); + false + }; + if corrupted_items() { + self.buf[footer_roffset] += 1; + } + if corrupted_entries() && footer_roffset > LOG_BATCH_HEADER_LEN { + self.buf[footer_roffset - 1] += 1; + } + } + self.buf_state = BufState::Sealed(header_offset, footer_roffset - LOG_BATCH_HEADER_LEN); Ok(self.buf.len() - header_offset) } diff --git a/src/memtable.rs b/src/memtable.rs index 72e7f5f5..4455dd57 100644 --- a/src/memtable.rs +++ b/src/memtable.rs @@ -87,10 +87,16 @@ impl MemTable { /// Mrege from newer neighbor `rhs`. /// Only called during parllel recovery. pub fn merge_newer_neighbor(&mut self, rhs: &mut Self) { - assert_eq!(self.region_id, rhs.region_id); + debug_assert_eq!(self.region_id, rhs.region_id); if let (Some(last), Some(next)) = (self.entry_indexes.back(), rhs.entry_indexes.front()) { - assert_eq!(last.index + 1, next.index); + assert_eq!( + last.index + 1, + next.index, + "memtable {} has a hole", + self.region_id + ); } + debug_assert!(rhs.rewrite_count == 0 || self.rewrite_count == self.entry_indexes.len()); self.entry_indexes.append(&mut rhs.entry_indexes); self.rewrite_count += rhs.rewrite_count; self.kvs.extend(rhs.kvs.drain()); @@ -102,16 +108,13 @@ impl MemTable { debug_assert_eq!(rhs.rewrite_count, rhs.entry_indexes.len()); debug_assert_eq!(self.rewrite_count, 0); - if !self.entry_indexes.is_empty() { - if !rhs.entry_indexes.is_empty() { - let front = self.entry_indexes[0].index; - let rewrite_front = rhs.entry_indexes.front().unwrap().index; - let rewrite_back = rhs.entry_indexes.back().unwrap().index; - if front > rewrite_back + 1 { - rhs.compact_to(rewrite_back + 1); + if let Some((first, _)) = self.span() { + if let Some((_, rewrite_last)) = rhs.span() { + if first > rewrite_last + 1 { + rhs.compact_to(rewrite_last + 1); } else { - assert!(front >= rewrite_front); - rhs.truncate_back(front); + // TODO(tabokie): add test case for first < rewrite_first. + rhs.unsafe_truncate_back(first); } } rhs.entry_indexes.append(&mut self.entry_indexes); @@ -172,38 +175,37 @@ impl MemTable { } pub fn get_entry(&self, index: u64) -> Option { - if self.entry_indexes.is_empty() { - return None; - } + if let Some((first, last)) = self.span() { + if index < first || index > last { + return None; + } - let first_index = self.entry_indexes.front().unwrap().index; - let last_index = self.entry_indexes.back().unwrap().index; - if index < first_index || index > last_index { - return None; + let ioffset = (index - first) as usize; + let entry_index = self.entry_indexes[ioffset]; + Some(entry_index) + } else { + None } - - let ioffset = (index - first_index) as usize; - let entry_index = self.entry_indexes[ioffset]; - Some(entry_index) } pub fn append(&mut self, entry_indexes: Vec) { if entry_indexes.is_empty() { return; } - - let first_index_to_add = entry_indexes[0].index; - self.truncate_back(first_index_to_add); - - if let Some(index) = self.entry_indexes.back() { - assert_eq!( - index.index + 1, - first_index_to_add, + if let Some((first, last)) = self.span() { + let first_index_to_add = entry_indexes[0].index; + assert!( + first <= first_index_to_add, + "corrupted raft {}", + self.region_id + ); + assert!( + last + 1 >= first_index_to_add, "memtable {} has a hole", self.region_id ); + self.unsafe_truncate_back(first_index_to_add); } - self.entry_indexes.extend(entry_indexes); } @@ -230,15 +232,14 @@ impl MemTable { if rewrite_indexes.is_empty() { return; } - if self.entry_indexes.is_empty() { + let len = self.entry_indexes.len(); + if len == 0 { self.global_stats .add_compacted_rewrite(rewrite_indexes.len()); return; } - let first = self.entry_indexes[0].index; - let last = self.entry_indexes[self.entry_indexes.len() - 1].index; - + let last = self.entry_indexes[len - 1].index; let rewrite_first = std::cmp::max(rewrite_indexes[0].index, first); let rewrite_last = std::cmp::min(rewrite_indexes[rewrite_indexes.len() - 1].index, last); let rewrite_len = (rewrite_last + 1).saturating_sub(rewrite_first) as usize; @@ -300,24 +301,21 @@ impl MemTable { // Removes all entry indexes with index greater than or equal to `index`. // Returns the truncated amount. - fn truncate_back(&mut self, index: u64) -> usize { - if self.entry_indexes.is_empty() { - return 0; - } - let first = self.entry_indexes[0].index; - let last = self.entry_indexes[self.entry_indexes.len() - 1].index; - // Compacted entries can't be overwritten. - assert!(first <= index, "corrupted raft {}", self.region_id); + fn unsafe_truncate_back(&mut self, index: u64) -> usize { + if let Some((first, last)) = self.span() { + self.entry_indexes + .truncate(index.saturating_sub(first) as usize); - self.entry_indexes.truncate((index - first) as usize); + if self.rewrite_count > self.entry_indexes.len() { + let compacted_rewrite = self.rewrite_count - self.entry_indexes.len(); + self.rewrite_count = self.entry_indexes.len(); + self.global_stats.add_compacted_rewrite(compacted_rewrite); + } - if self.rewrite_count > self.entry_indexes.len() { - let compacted_rewrite = self.rewrite_count - self.entry_indexes.len(); - self.rewrite_count = self.entry_indexes.len(); - self.global_stats.add_compacted_rewrite(compacted_rewrite); + (last + 1).saturating_sub(index) as usize + } else { + 0 } - - (last + 1).saturating_sub(index) as usize } fn maybe_shrink_entry_indexes(&mut self) { @@ -338,19 +336,20 @@ impl MemTable { if end <= begin { return Ok(()); } - if self.entry_indexes.is_empty() { + let len = self.entry_indexes.len(); + if len == 0 { return Err(Error::EntryNotFound); } - let first_index = self.entry_indexes.front().unwrap().index; - if begin < first_index { + let first = self.entry_indexes[0].index; + if begin < first { return Err(Error::EntryCompacted); } - let last_index = self.entry_indexes.back().unwrap().index; - if end > last_index + 1 { + let last = self.entry_indexes[len - 1].index; + if end > last + 1 { return Err(Error::EntryNotFound); } - let start_pos = (begin - first_index) as usize; + let start_pos = (begin - first) as usize; let end_pos = (end - begin) as usize + start_pos; let (first, second) = slices_in_range(&self.entry_indexes, start_pos, end_pos); @@ -395,8 +394,8 @@ impl MemTable { pub fn fetch_rewritten_entry_indexes(&self, vec_idx: &mut Vec) -> Result<()> { if self.rewrite_count > 0 { + let first = self.entry_indexes[0].index; let end = self.entry_indexes[self.rewrite_count - 1].index + 1; - let first = self.entry_indexes.front().unwrap().index; self.fetch_entries_to(first, end, None, vec_idx) } else { Ok(()) @@ -461,6 +460,19 @@ impl MemTable { self.entry_indexes.back().map(|e| e.index) } + #[inline] + fn span(&self) -> Option<(u64, u64)> { + let len = self.entry_indexes.len(); + if len > 0 { + Some(( + self.entry_indexes[0].index, + self.entry_indexes[len - 1].index, + )) + } else { + None + } + } + #[cfg(test)] fn consistency_check(&self) { let mut seen_append = false; diff --git a/benches/bench_recovery.rs b/tests/benches/bench_recovery.rs similarity index 95% rename from benches/bench_recovery.rs rename to tests/benches/bench_recovery.rs index 3356351b..983488fe 100644 --- a/benches/bench_recovery.rs +++ b/tests/benches/bench_recovery.rs @@ -1,19 +1,15 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. -use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use criterion::{criterion_group, BenchmarkId, Criterion}; use raft::eraftpb::Entry; use raft_engine::ReadableSize; -use raft_engine::{Config as EngineConfig, Engine as RaftLogEngine, LogBatch, MessageExt, Result}; +use raft_engine::{Config as EngineConfig, Engine, LogBatch, MessageExt, Result}; use rand::{Rng, SeedableRng}; use std::collections::HashMap; use std::fmt; use std::path::PathBuf; use tempfile::TempDir; -extern crate libc; - -type Engine = RaftLogEngine; - #[derive(Clone)] struct MessageExtTyped; impl MessageExt for MessageExtTyped { @@ -187,4 +183,3 @@ criterion_group! { config = Criterion::default().sample_size(10); targets = bench_recovery } -criterion_main!(benches); diff --git a/tests/benches/mod.rs b/tests/benches/mod.rs new file mode 100644 index 00000000..baacd43c --- /dev/null +++ b/tests/benches/mod.rs @@ -0,0 +1,12 @@ +// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. + +#![feature(test)] + +extern crate libc; +extern crate test; + +use criterion::criterion_main; + +mod bench_recovery; + +criterion_main!(bench_recovery::benches); diff --git a/tests/failpoints/mod.rs b/tests/failpoints/mod.rs new file mode 100644 index 00000000..e78ab5b0 --- /dev/null +++ b/tests/failpoints/mod.rs @@ -0,0 +1,9 @@ +// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. + +#![feature(test)] + +extern crate test; + +mod util; + +mod test_engine; diff --git a/tests/failpoints/test_engine.rs b/tests/failpoints/test_engine.rs new file mode 100644 index 00000000..34803ee9 --- /dev/null +++ b/tests/failpoints/test_engine.rs @@ -0,0 +1,357 @@ +// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. + +use std::sync::Arc; + +use kvproto::raft_serverpb::RaftLocalState; +use raft::eraftpb::Entry; +use raft_engine::*; + +use crate::util::{catch_unwind_silent, MessageExtTyped}; + +fn append(engine: &Engine, raft_group_id: u64, entries: &[Entry]) { + if !entries.is_empty() { + let mut batch = LogBatch::default(); + batch + .add_entries::(raft_group_id, entries) + .unwrap(); + batch + .put_message( + raft_group_id, + b"last_index".to_vec(), + &RaftLocalState { + last_index: entries[entries.len() - 1].index, + ..Default::default() + }, + ) + .unwrap(); + engine.write(&mut batch, true).unwrap(); + } +} + +fn append_one(engine: &Engine, raft_group_id: u64, entry: &Entry) { + append(engine, raft_group_id, &[entry.clone()]) +} + +#[test] +fn test_pipe_log_listeners() { + use std::collections::HashMap; + use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; + use std::time::Duration; + + #[derive(Default)] + struct QueueHook { + files: AtomicUsize, + appends: AtomicUsize, + applys: AtomicUsize, + purged: AtomicU64, + } + + impl QueueHook { + fn files(&self) -> usize { + self.files.load(Ordering::Acquire) + } + fn appends(&self) -> usize { + self.appends.load(Ordering::Acquire) + } + fn applys(&self) -> usize { + self.applys.load(Ordering::Acquire) + } + fn purged(&self) -> u64 { + self.purged.load(Ordering::Acquire) + } + } + + struct Hook(HashMap); + impl Default for Hook { + fn default() -> Hook { + let mut hash = HashMap::default(); + hash.insert(LogQueue::Append, QueueHook::default()); + hash.insert(LogQueue::Rewrite, QueueHook::default()); + Hook(hash) + } + } + + impl EventListener for Hook { + fn post_new_log_file(&self, id: FileId) { + self.0[&id.queue].files.fetch_add(1, Ordering::Release); + } + + fn on_append_log_file(&self, handle: FileBlockHandle) { + self.0[&handle.id.queue] + .appends + .fetch_add(1, Ordering::Release); + } + + fn post_apply_memtables(&self, id: FileId) { + self.0[&id.queue].applys.fetch_add(1, Ordering::Release); + } + + fn post_purge(&self, id: FileId) { + self.0[&id.queue].purged.store(id.seq, Ordering::Release); + } + } + + let dir = tempfile::Builder::new() + .prefix("test_pipe_log_listeners") + .tempdir() + .unwrap(); + + let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + target_file_size: ReadableSize::kb(128), + purge_threshold: ReadableSize::kb(512), + batch_compression_threshold: ReadableSize::kb(0), + ..Default::default() + }; + + let hook = Arc::new(Hook::default()); + let engine = Arc::new(Engine::open_with_listeners(cfg.clone(), vec![hook.clone()]).unwrap()); + assert_eq!(hook.0[&LogQueue::Append].files(), 1); + assert_eq!(hook.0[&LogQueue::Rewrite].files(), 1); + + let mut entry = Entry::new(); + entry.set_data(vec![b'x'; 64 * 1024].into()); + + // Append 10 logs for region 1, 10 logs for region 2. + for i in 1..=20 { + let region_id = (i as u64 - 1) % 2 + 1; + entry.set_index((i as u64 + 1) / 2); + append_one(&engine, region_id, &entry); + assert_eq!(hook.0[&LogQueue::Append].appends(), i); + assert_eq!(hook.0[&LogQueue::Append].applys(), i); + } + assert_eq!(hook.0[&LogQueue::Append].files(), 10); + + engine.purge_expired_files().unwrap(); + assert_eq!(hook.0[&LogQueue::Append].purged(), 8); + + // All things in a region will in one write batch. + assert_eq!(hook.0[&LogQueue::Rewrite].files(), 2); + assert_eq!(hook.0[&LogQueue::Rewrite].appends(), 2); + assert_eq!(hook.0[&LogQueue::Rewrite].applys(), 2); + + // Append 5 logs for region 1, 5 logs for region 2. + for i in 21..=30 { + let region_id = (i as u64 - 1) % 2 + 1; + entry.set_index((i as u64 + 1) / 2); + append_one(&engine, region_id, &entry); + assert_eq!(hook.0[&LogQueue::Append].appends(), i); + assert_eq!(hook.0[&LogQueue::Append].applys(), i); + } + // Compact so that almost all content of rewrite queue will become garbage. + engine.compact_to(1, 14); + engine.compact_to(2, 14); + assert_eq!(hook.0[&LogQueue::Append].appends(), 32); + assert_eq!(hook.0[&LogQueue::Append].applys(), 32); + + engine.purge_expired_files().unwrap(); + assert_eq!(hook.0[&LogQueue::Append].purged(), 13); + assert_eq!(hook.0[&LogQueue::Rewrite].purged(), 2); + + // Write region 3 without applying. + let apply_memtable_region_3_fp = "memtable_accessor::apply::region_3"; + fail::cfg(apply_memtable_region_3_fp, "pause").unwrap(); + let engine_clone = engine.clone(); + let mut entry_clone = entry.clone(); + let th = std::thread::spawn(move || { + entry_clone.set_index(1); + append_one(&engine_clone, 3, &entry_clone); + }); + + // Sleep a while to wait the log batch `Append(3, [1])` to get written. + std::thread::sleep(Duration::from_millis(200)); + assert_eq!(hook.0[&LogQueue::Append].appends(), 33); + let file_not_applied = engine.file_span(LogQueue::Append).1; + assert_eq!(hook.0[&LogQueue::Append].applys(), 32); + + for i in 31..=40 { + let region_id = (i as u64 - 1) % 2 + 1; + entry.set_index((i as u64 + 1) / 2); + append_one(&engine, region_id, &entry); + assert_eq!(hook.0[&LogQueue::Append].appends(), i + 3); + assert_eq!(hook.0[&LogQueue::Append].applys(), i + 2); + } + + // Can't purge because region 3 is not yet applied. + engine.purge_expired_files().unwrap(); + let first = engine.file_span(LogQueue::Append).0; + assert_eq!(file_not_applied, first); + + // Resume write on region 3. + fail::remove(apply_memtable_region_3_fp); + th.join().unwrap(); + + std::thread::sleep(Duration::from_millis(200)); + engine.purge_expired_files().unwrap(); + let new_first = engine.file_span(LogQueue::Append).0; + assert_ne!(file_not_applied, new_first); + + // Drop and then recover. + drop(engine); + + let hook = Arc::new(Hook::default()); + let engine = Engine::open_with_listeners(cfg, vec![hook.clone()]).unwrap(); + assert_eq!( + hook.0[&LogQueue::Append].files() as u64, + engine.file_span(LogQueue::Append).1 - engine.file_span(LogQueue::Append).0 + 1 + ); + assert_eq!( + hook.0[&LogQueue::Rewrite].files() as u64, + engine.file_span(LogQueue::Rewrite).1 - engine.file_span(LogQueue::Rewrite).0 + 1 + ); +} + +struct ConcurrentWriteContext { + engine: Arc, + ths: Vec>, +} + +impl ConcurrentWriteContext { + fn new(engine: Arc) -> Self { + Self { + engine, + ths: Vec::new(), + } + } + + fn leader_write(&mut self, mut log_batch: LogBatch) { + if self.ths.is_empty() { + fail::cfg("write_barrier::leader_exit", "pause").unwrap(); + let engine_clone = self.engine.clone(); + self.ths.push( + std::thread::Builder::new() + .spawn(move || { + engine_clone.write(&mut LogBatch::default(), true).unwrap(); + }) + .unwrap(), + ); + } + let engine_clone = self.engine.clone(); + self.ths.push( + std::thread::Builder::new() + .spawn(move || { + engine_clone.write(&mut log_batch, true).unwrap(); + }) + .unwrap(), + ); + } + + fn follower_write(&mut self, mut log_batch: LogBatch) { + assert!(self.ths.len() == 2); + let engine_clone = self.engine.clone(); + self.ths.push( + std::thread::Builder::new() + .spawn(move || { + engine_clone.write(&mut log_batch, true).unwrap(); + }) + .unwrap(), + ); + } + + fn join(&mut self) { + fail::remove("write_barrier::leader_exit"); + for t in self.ths.drain(..) { + t.join().unwrap(); + } + } +} + +#[test] +fn test_concurrent_write_empty_log_batch() { + let dir = tempfile::Builder::new() + .prefix("test_concurrent_write_empty_log_batch") + .tempdir() + .unwrap(); + let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + ..Default::default() + }; + let engine = Arc::new(Engine::open(cfg.clone()).unwrap()); + let mut ctx = ConcurrentWriteContext::new(engine.clone()); + + let some_entries = vec![ + Entry::new(), + Entry { + index: 1, + ..Default::default() + }, + ]; + + ctx.leader_write(LogBatch::default()); + let mut log_batch = LogBatch::default(); + log_batch + .add_entries::(1, &some_entries) + .unwrap(); + ctx.follower_write(log_batch); + ctx.join(); + + let mut log_batch = LogBatch::default(); + log_batch + .add_entries::(2, &some_entries) + .unwrap(); + ctx.leader_write(log_batch); + ctx.follower_write(LogBatch::default()); + ctx.join(); + drop(ctx); + drop(engine); + + let engine = Engine::open(cfg).unwrap(); + let mut entries = Vec::new(); + engine + .fetch_entries_to::( + 1, /*region*/ + 0, /*begin*/ + 2, /*end*/ + None, /*max_size*/ + &mut entries, + ) + .unwrap(); + assert_eq!(entries, some_entries); + entries.clear(); + engine + .fetch_entries_to::( + 2, /*region*/ + 0, /*begin*/ + 2, /*end*/ + None, /*max_size*/ + &mut entries, + ) + .unwrap(); + assert_eq!(entries, some_entries); +} + +#[test] +fn test_consistency_tools() { + let dir = tempfile::Builder::new() + .prefix("test_consistency_tools1") + .tempdir() + .unwrap(); + let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + target_file_size: ReadableSize(128), + ..Default::default() + }; + let engine = Arc::new(Engine::open(cfg.clone()).unwrap()); + let mut entry = Entry::new(); + entry.set_data(vec![b'x'; 128].into()); + for index in 1..=100 { + entry.set_index(index); + for rid in 1..=10 { + if index == rid * rid { + fail::cfg("log_batch::corrupted_items", "return").unwrap(); + } + append_one(&engine, rid, &entry); + if index == rid * rid { + fail::remove("log_batch::corrupted_items"); + } + } + } + drop(engine); + + let ids = Engine::consistency_check(dir.path()).unwrap(); + for (id, index) in ids.iter() { + assert_eq!(id * id, index + 1); + } + + assert!(catch_unwind_silent(|| Engine::open(cfg.clone()).unwrap()).is_err()); +} diff --git a/tests/failpoints/util.rs b/tests/failpoints/util.rs new file mode 100644 index 00000000..7edb9bde --- /dev/null +++ b/tests/failpoints/util.rs @@ -0,0 +1,28 @@ +// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. + +use std::panic::{self, AssertUnwindSafe}; + +use raft::eraftpb::Entry; +use raft_engine::MessageExt; + +#[derive(Clone)] +pub struct MessageExtTyped; +impl MessageExt for MessageExtTyped { + type Entry = Entry; + + fn index(entry: &Entry) -> u64 { + entry.index + } +} + +/// Catch panic while suppressing default panic hook. +pub fn catch_unwind_silent(f: F) -> std::thread::Result +where + F: FnOnce() -> R, +{ + let prev_hook = panic::take_hook(); + panic::set_hook(Box::new(|_| {})); + let result = panic::catch_unwind(AssertUnwindSafe(f)); + panic::set_hook(prev_hook); + result +}