-
Notifications
You must be signed in to change notification settings - Fork 88
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
rollback to a consistent state after fsync error #131
Changes from 16 commits
4d22351
751965d
847aa7a
0caa976
26f9c3d
9b07cbd
a7bbabd
d5a58f3
e391e96
07a0dc5
0468de4
86631aa
23bd3c1
1d3fb70
e67c58e
cc0f094
aba71e3
d1f85b0
94c274d
f133b25
c2a5c52
790bdce
c3d8b62
575f9a8
a2ebaef
3f934ad
583c192
fa5ffc1
1903deb
88582e7
8588698
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,341 @@ | ||
#[cfg(test)]
#[cfg(feature = "failpoints")]
mod io_error_tests {
    use raft::eraftpb::Entry;
    use raft_engine::{
        Config, Engine as RaftLogEngine, LogBatch, MessageExt, ReadableSize, Result,
    };
    use std::panic;
    use std::sync::atomic::AtomicU64;
    use std::sync::Arc;

    /// Minimal `MessageExt` implementation so the tests can store raft entries.
    #[derive(Clone)]
    pub struct M;

    impl MessageExt for M {
        type Entry = Entry;

        fn index(e: &Entry) -> u64 {
            e.index
        }
    }

    /// Builds a `LogBatch` for `region` holding entries with indices in
    /// `[begin_index, end_index)`, each carrying `data` as its payload when
    /// provided.
    fn generate_batch(
        region: u64,
        begin_index: u64,
        end_index: u64,
        data: Option<Vec<u8>>,
    ) -> LogBatch {
        let entries: Vec<Entry> = (begin_index..end_index)
            .map(|idx| {
                let mut entry = Entry::new();
                entry.set_index(idx);
                if let Some(ref payload) = data {
                    entry.set_data(payload.clone().into());
                }
                entry
            })
            .collect();
        let mut batch = LogBatch::default();
        batch.add_entries::<M>(region, &entries).unwrap();
        batch
    }

    /// Opens a fresh engine in a temp dir and writes `batch_count` batches of
    /// entries sized `entry_size` bytes, returning the total bytes written.
    ///
    /// NOTE(review): each batch spans `entry_count + 1` indices while the
    /// start index only advances by `entry_count`, so consecutive batches
    /// overlap by one entry — presumably intentional, to exercise overwrites
    /// of a previously written tail; confirm with the author.
    fn write_tmp_engine(
        bytes_per_sync: ReadableSize,
        target_file_size: ReadableSize,
        entry_size: usize,
        entry_count: u64,
        batch_count: u64,
    ) -> Result<usize> {
        let dir = tempfile::Builder::new()
            .prefix("handle_io_error")
            .tempdir()
            .unwrap();
        let cfg = Config {
            dir: dir.path().to_str().unwrap().to_owned(),
            bytes_per_sync,
            target_file_size,
            batch_compression_threshold: ReadableSize(0),
            ..Default::default()
        };
        let engine = RaftLogEngine::open(cfg).unwrap();

        let payload = vec![b'x'; entry_size];
        let mut first_index = 1;
        let mut total_written = 0;
        for _ in 0..batch_count {
            let mut batch = generate_batch(
                1,
                first_index,
                first_index + entry_count + 1,
                Some(payload.clone()),
            );
            first_index += entry_count;
            total_written += engine.write(&mut batch, false)?;
        }
        Ok(total_written)
    }

    /// Drives several writer threads through the engine's write barrier so a
    /// "leader" batch and subsequent "follower" batches get grouped into one
    /// barrier round.
    struct ConcurrentWriteContext {
        engine: Arc<RaftLogEngine>,
        handles: Vec<std::thread::JoinHandle<()>>,
    }

    impl ConcurrentWriteContext {
        fn new(engine: Arc<RaftLogEngine>) -> Self {
            Self {
                engine,
                handles: Vec::new(),
            }
        }

        /// Starts the leader write. On first use this pauses the
        /// `write_barrier::leader_exit` fail point and spawns a sentinel
        /// writer (an empty batch) so that every later write queues up
        /// behind the barrier until `join` releases it.
        fn leader_write<F: FnOnce(Result<usize>) + Send + Sync + 'static>(
            &mut self,
            mut log_batch: LogBatch,
            sync: bool,
            cb: Option<F>,
        ) {
            if self.handles.is_empty() {
                fail::cfg("write_barrier::leader_exit", "pause").unwrap();
                let sentinel = self.engine.clone();
                self.handles.push(
                    std::thread::Builder::new()
                        .spawn(move || {
                            sentinel.write(&mut LogBatch::default(), false).unwrap();
                        })
                        .unwrap(),
                );
            }
            let engine = self.engine.clone();
            self.handles.push(
                std::thread::Builder::new()
                    .spawn(move || {
                        let res = engine.write(&mut log_batch, sync);
                        if let Some(callback) = cb {
                            callback(res)
                        }
                    })
                    .unwrap(),
            );
        }

        /// Queues a follower write behind the (already started) leader.
        fn follower_write<F: FnOnce(Result<usize>) + Send + Sync + 'static>(
            &mut self,
            mut log_batch: LogBatch,
            sync: bool,
            cb: Option<F>,
        ) {
            // The sentinel writer and the leader must already be in flight.
            assert!(self.handles.len() >= 2);
            let engine = self.engine.clone();
            self.handles.push(
                std::thread::Builder::new()
                    .spawn(move || {
                        let res = engine.write(&mut log_batch, sync);
                        if let Some(callback) = cb {
                            callback(res)
                        }
                    })
                    .unwrap(),
            );
        }

        /// Releases the write barrier and joins every spawned writer thread.
        fn join(&mut self) {
            fail::remove("write_barrier::leader_exit");
            for handle in self.handles.drain(..) {
                handle.join().unwrap();
            }
        }
    }

    #[test]
    fn test_open_error() {
        // While fd opening always fails, reopening the engine must panic.
        fail::cfg("log_fd::open::err", "return").unwrap();
        assert!(panic::catch_unwind(|| {
            let dir = tempfile::Builder::new()
                .prefix("handle_io_error")
                .tempdir()
                .unwrap();
            let cfg = Config {
                dir: dir.path().to_str().unwrap().to_owned(),
                bytes_per_sync: ReadableSize::kb(1),
                target_file_size: ReadableSize::kb(4),
                batch_compression_threshold: ReadableSize(0),
                ..Default::default()
            };
            let engine = RaftLogEngine::open(cfg.clone()).unwrap();
            drop(engine);
            let _ = RaftLogEngine::open(cfg).unwrap();
        })
        .is_err());
        fail::cfg("log_fd::open::err", "off").unwrap();
    }

    #[test]
    fn test_rotate_error() {
        // Rotation must panic when truncating the active file fails.
        fail::cfg("active_file::truncate::force", "return").unwrap();
        fail::cfg("log_fd::truncate::err", "return").unwrap();
        assert!(panic::catch_unwind(|| {
            write_tmp_engine(ReadableSize::kb(1024), ReadableSize::kb(4), 1024, 1, 4)
        })
        .is_err());

        // ... when the sync of the finished file fails.
        fail::cfg("active_file::truncate::force", "off").unwrap();
        fail::cfg("log_fd::truncate::err", "off").unwrap();
        fail::cfg("log_fd::sync::err", "return").unwrap();
        assert!(panic::catch_unwind(|| {
            write_tmp_engine(ReadableSize::kb(1024), ReadableSize::kb(4), 1024, 1, 4)
        })
        .is_err());

        // ... when creating the next log file fails.
        fail::cfg("log_fd::sync::err", "off").unwrap();
        fail::cfg("log_fd::create::err", "return").unwrap();
        assert!(panic::catch_unwind(|| {
            write_tmp_engine(ReadableSize::kb(1024), ReadableSize::kb(4), 1024, 1, 4)
        })
        .is_err());

        // ... when writing the new file's header fails.
        fail::cfg("log_fd::create::err", "off").unwrap();
        fail::cfg("log_fd::write::err", "return").unwrap();
        assert!(panic::catch_unwind(|| {
            write_tmp_engine(ReadableSize::kb(1024), ReadableSize::kb(4), 1024, 1, 4)
        })
        .is_err());
        fail::cfg("log_fd::write::err", "off").unwrap();
    }

    #[test]
    fn test_concurrent_write_error() {
        // Batch 1 succeeds; batch 2 hits a write error and is rolled back by
        // truncation; batch 3 succeeds and must overwrite the truncated tail.
        // `post_err` (as opposed to `err`) injects the failure after the
        // bytes land on disk, so the rollback genuinely has data to truncate.
        let seq = AtomicU64::new(0);
        fail::cfg_callback("engine::write::pre", move || {
            match seq.fetch_add(1, std::sync::atomic::Ordering::SeqCst) {
                2 => fail::cfg("log_fd::write::post_err", "return").unwrap(),
                3 => fail::cfg("log_fd::write::post_err", "off").unwrap(),
                _ => {}
            }
        })
        .unwrap();

        // The engine is expected to truncate and sync on a write error.
        let dir = tempfile::Builder::new()
            .prefix("handle_io_error")
            .tempdir()
            .unwrap();
        let cfg = Config {
            dir: dir.path().to_str().unwrap().to_owned(),
            bytes_per_sync: ReadableSize::kb(1024),
            target_file_size: ReadableSize::kb(1024),
            batch_compression_threshold: ReadableSize(0),
            ..Default::default()
        };

        let engine = Arc::new(RaftLogEngine::open(cfg).unwrap());
        let mut ctx = ConcurrentWriteContext::new(engine.clone());

        let content = vec![b'x'; 1024];

        ctx.leader_write(
            generate_batch(1, 1, 11, Some(content.clone())),
            false,
            Some(|r: Result<usize>| {
                assert!(r.is_ok());
            }),
        );
        ctx.follower_write(
            generate_batch(2, 1, 11, Some(content.clone())),
            false,
            Some(|r: Result<usize>| {
                assert!(r.is_err());
            }),
        );
        ctx.follower_write(
            generate_batch(3, 1, 11, Some(content)),
            false,
            Some(|r: Result<usize>| {
                assert!(r.is_ok());
            }),
        );
        ctx.join();

        // Regions 1 and 3 are durable; the failed region-2 batch left nothing.
        assert_eq!(
            10,
            engine
                .fetch_entries_to::<M>(1, 1, 11, None, &mut vec![])
                .unwrap()
        );
        assert_eq!(
            0,
            engine
                .fetch_entries_to::<M>(2, 1, 11, None, &mut vec![])
                .unwrap()
        );
        assert_eq!(
            10,
            engine
                .fetch_entries_to::<M>(3, 1, 11, None, &mut vec![])
                .unwrap()
        );
        fail::cfg("engine::write::pre", "off").unwrap();
    }

    #[test]
    fn test_concurrent_write_truncate_error() {
        // If the rollback truncation after a failed write also fails, the
        // engine must panic rather than continue from a corrupt tail.
        assert!(panic::catch_unwind(|| {
            // b0 is the sentinel; b1 succeeds; b2 fails to write, fails to
            // truncate, and panics; b3 never completes.
            let seq = AtomicU64::new(0);
            fail::cfg_callback("engine::write::pre", move || {
                match seq.fetch_add(1, std::sync::atomic::Ordering::SeqCst) {
                    2 => {
                        fail::cfg("log_fd::write::err", "return").unwrap();
                        fail::cfg("log_fd::truncate::err", "return").unwrap()
                    }
                    3 => {
                        fail::cfg("log_fd::write::err", "off").unwrap();
                        fail::cfg("log_fd::truncate::err", "off").unwrap()
                    }
                    _ => {}
                }
            })
            .unwrap();

            let dir = tempfile::Builder::new()
                .prefix("handle_io_error")
                .tempdir()
                .unwrap();
            let cfg = Config {
                dir: dir.path().to_str().unwrap().to_owned(),
                bytes_per_sync: ReadableSize::kb(1024),
                target_file_size: ReadableSize::kb(1024),
                batch_compression_threshold: ReadableSize(0),
                ..Default::default()
            };

            let engine = Arc::new(RaftLogEngine::open(cfg).unwrap());
            let mut ctx = ConcurrentWriteContext::new(engine);

            let content = vec![b'x'; 1024];

            ctx.leader_write(
                generate_batch(1, 1, 11, Some(content.clone())),
                false,
                None::<fn(_)>,
            );
            ctx.follower_write(
                generate_batch(2, 1, 11, Some(content.clone())),
                false,
                None::<fn(_)>,
            );
            ctx.follower_write(
                generate_batch(3, 1, 11, Some(content)),
                false,
                None::<fn(_)>,
            );
            ctx.join();
        })
        .is_err());
    }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should send another PR to move all existing failpoint tests to this folder.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. I'd like to move io error test to this folder first in this PR, then send another one to move the existing test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then add a mod.rs for the failpoints folder; listing individual tests in the main Cargo.toml doesn't look good.