rollback to a consistent state after fsync error #131

Merged
merged 31 commits into from
Nov 11, 2021
Changes from 16 commits
Commits
4d22351
rollback to a consistent state when fsync error
MrCroxx Oct 28, 2021
751965d
fix broken tests
MrCroxx Oct 28, 2021
847aa7a
Merge branch 'master' into extract-fsync-handle
MrCroxx Oct 28, 2021
0caa976
Merge branch 'master' into extract-fsync-handle
MrCroxx Oct 28, 2021
26f9c3d
Merge branch 'master' into extract-fsync-handle
MrCroxx Nov 1, 2021
9b07cbd
truncate, sync, panic after io error
MrCroxx Nov 1, 2021
a7bbabd
truncate after append error
MrCroxx Nov 1, 2021
d5a58f3
fix typo
MrCroxx Nov 1, 2021
e391e96
panic directly if needed
MrCroxx Nov 3, 2021
07a0dc5
Merge branch 'master' into extract-fsync-handle
MrCroxx Nov 3, 2021
0468de4
fix sync after truncate when rotate
MrCroxx Nov 3, 2021
86631aa
add io error test
MrCroxx Nov 3, 2021
23bd3c1
remove Cargo.lock
MrCroxx Nov 3, 2021
1d3fb70
make clippy happy
MrCroxx Nov 3, 2021
e67c58e
use failpoints with cfg_callback
MrCroxx Nov 3, 2021
cc0f094
update README
MrCroxx Nov 3, 2021
aba71e3
refine mod test_io_error
MrCroxx Nov 4, 2021
d1f85b0
update ci
MrCroxx Nov 4, 2021
94c274d
mute expected panic, update ci
MrCroxx Nov 4, 2021
f133b25
make test stable
MrCroxx Nov 4, 2021
c2a5c52
fix concurrent bug in test_io_error
MrCroxx Nov 4, 2021
790bdce
Merge branch 'master' into extract-fsync-handle
MrCroxx Nov 4, 2021
c3d8b62
use ticket lock to make ConcurrentWriteContext scale
MrCroxx Nov 4, 2021
575f9a8
fix asan ci
MrCroxx Nov 4, 2021
a2ebaef
Merge branch 'master' into extract-fsync-handle
tabokie Nov 6, 2021
3f934ad
refine panic detect
MrCroxx Nov 4, 2021
583c192
refine test_io_error code
MrCroxx Nov 9, 2021
fa5ffc1
Merge branch 'master' into extract-fsync-handle
MrCroxx Nov 10, 2021
1903deb
fix merge error
MrCroxx Nov 10, 2021
88582e7
remove Cargo.lock
MrCroxx Nov 10, 2021
8588698
Merge branch 'master' into extract-fsync-handle
MrCroxx Nov 11, 2021
4 changes: 3 additions & 1 deletion .github/workflows/rust.yml
@@ -36,7 +36,9 @@ jobs:
     - name: Clippy
       run: cargo clippy --features failpoints --all --all-targets -- -D clippy::all
     - name: Run tests
-      run: RUST_BACKTRACE=1 cargo test --features failpoints --all --verbose -- --nocapture
+      run: RUST_BACKTRACE=1 cargo test --features failpoints --workspace --verbose -- --nocapture --skip io_error_tests
+    - name: Run IO tests
+      run: RUST_BACKTRACE=1 cargo test --package raft-engine --test io_error_test --features failpoints -- --test-threads 1
     - name: Run asan tests
       if: ${{ matrix.os == 'ubuntu-latest' }}
       run: cargo test -Zbuild-std --target x86_64-unknown-linux-gnu --features failpoints --verbose -- --nocapture
6 changes: 6 additions & 0 deletions Cargo.toml
@@ -58,3 +58,9 @@ members = [ "stress" ]
name = "bench_recovery"
harness = false
required-features = ["failpoints"]

[[test]]
Member

You should send another PR to move all existing failpoint tests to this folder.

Member Author

OK. I'd like to move the IO error test to this folder first in this PR, then send another one to move the existing tests.

Member

Then add a mod.rs for the failpoints folder; listing individual tests in the main Cargo.toml doesn't look good.

name = "io_error_test"
path = "failpoints/io_error_test.rs"
harness = true
required-features = ["failpoints"]
3 changes: 2 additions & 1 deletion README.md
@@ -67,7 +67,8 @@ Contributions are always welcome! Here are a few tips for making a PR:
```
cargo fmt --all -- --check
cargo clippy --all --all-targets -- -D clippy::all
-cargo test --features failpoints --all
+cargo test --features failpoints --workspace -- --skip io_error_tests
+cargo test --package raft-engine --test io_error_test --features failpoints -- --test-threads 1
```

- For changes that might induce performance effects, please quote the targeted benchmark results in the PR description. In addition to micro-benchmarks, there is a standalone [stress test tool](https://github.com/tikv/raft-engine/tree/master/stress) which you can use to demonstrate the system performance.
341 changes: 341 additions & 0 deletions failpoints/io_error_test.rs
@@ -0,0 +1,341 @@
// use crate::test_util::generate_entries;
// use std::panic;

#[cfg(test)]
#[cfg(feature = "failpoints")]
mod io_error_tests {
    use raft::eraftpb::Entry;
    use raft_engine::{
        Config, Engine as RaftLogEngine, LogBatch, MessageExt, ReadableSize, Result,
    };
    use std::panic;
    use std::sync::atomic::AtomicU64;
    use std::sync::Arc;

    #[derive(Clone)]
    pub struct M;

    impl MessageExt for M {
        type Entry = Entry;

        fn index(e: &Entry) -> u64 {
            e.index
        }
    }

    fn generate_batch(
        region: u64,
        begin_index: u64,
        end_index: u64,
        data: Option<Vec<u8>>,
    ) -> LogBatch {
        let mut batch = LogBatch::default();
        let mut v = vec![Entry::new(); (end_index - begin_index) as usize];
        let mut index = begin_index;
        for e in v.iter_mut() {
            e.set_index(index);
            if let Some(ref data) = data {
                e.set_data(data.clone().into())
            }
            index += 1;
        }
        batch.add_entries::<M>(region, &v).unwrap();
        batch
    }

    fn write_tmp_engine(
        bytes_per_sync: ReadableSize,
        target_file_size: ReadableSize,
        entry_size: usize,
        entry_count: u64,
        batch_count: u64,
    ) -> Result<usize> {
        let dir = tempfile::Builder::new()
            .prefix("handle_io_error")
            .tempdir()
            .unwrap();
        let cfg = Config {
            dir: dir.path().to_str().unwrap().to_owned(),
            bytes_per_sync,
            target_file_size,
            batch_compression_threshold: ReadableSize(0),
            ..Default::default()
        };
        let engine = RaftLogEngine::open(cfg).unwrap();

        let entry_data = vec![b'x'; entry_size];
        let mut index = 1;
        let mut written = 0;
        for _ in 0..batch_count {
            let mut log_batch =
                generate_batch(1, index, index + entry_count + 1, Some(entry_data.clone()));
            index += entry_count;
            written += engine.write(&mut log_batch, false)?;
        }
        Ok(written)
    }

    struct ConcurrentWriteContext {
        engine: Arc<RaftLogEngine>,
        ths: Vec<std::thread::JoinHandle<()>>,
    }

    impl ConcurrentWriteContext {
        fn new(engine: Arc<RaftLogEngine>) -> Self {
            Self {
                engine,
                ths: Vec::new(),
            }
        }

        fn leader_write<F: FnOnce(Result<usize>) + Send + Sync + 'static>(
            &mut self,
            mut log_batch: LogBatch,
            sync: bool,
            cb: Option<F>,
        ) {
            if self.ths.is_empty() {
                fail::cfg("write_barrier::leader_exit", "pause").unwrap();
                let engine_clone = self.engine.clone();
                self.ths.push(
                    std::thread::Builder::new()
                        .spawn(move || {
                            engine_clone.write(&mut LogBatch::default(), false).unwrap();
                        })
                        .unwrap(),
                );
            }
            let engine_clone = self.engine.clone();
            self.ths.push(
                std::thread::Builder::new()
                    .spawn(move || {
                        let r = engine_clone.write(&mut log_batch, sync);
                        if let Some(f) = cb {
                            f(r)
                        }
                    })
                    .unwrap(),
            );
        }

        fn follower_write<F: FnOnce(Result<usize>) + Send + Sync + 'static>(
            &mut self,
            mut log_batch: LogBatch,
            sync: bool,
            cb: Option<F>,
        ) {
            assert!(self.ths.len() >= 2);
            let engine_clone = self.engine.clone();
            self.ths.push(
                std::thread::Builder::new()
                    .spawn(move || {
                        let r = engine_clone.write(&mut log_batch, sync);
                        if let Some(f) = cb {
                            f(r)
                        }
                    })
                    .unwrap(),
            );
        }

        fn join(&mut self) {
            fail::remove("write_barrier::leader_exit");
            for t in self.ths.drain(..) {
                t.join().unwrap();
            }
        }
    }

    #[test]
    fn test_open_error() {
        fail::cfg("log_fd::open::err", "return").unwrap();
        assert!(panic::catch_unwind(|| {
            let dir = tempfile::Builder::new()
                .prefix("handle_io_error")
                .tempdir()
                .unwrap();
            let cfg = Config {
                dir: dir.path().to_str().unwrap().to_owned(),
                bytes_per_sync: ReadableSize::kb(1),
                target_file_size: ReadableSize::kb(4),
                batch_compression_threshold: ReadableSize(0),
                ..Default::default()
            };
            let engine = RaftLogEngine::open(cfg.clone()).unwrap();
            drop(engine);
            let _ = RaftLogEngine::open(cfg).unwrap();
        })
        .is_err());
        fail::cfg("log_fd::open::err", "off").unwrap();
    }

    #[test]
    fn test_rotate_error() {
        // panic when truncate
        fail::cfg("active_file::truncate::force", "return").unwrap();
        fail::cfg("log_fd::truncate::err", "return").unwrap();
        assert!(panic::catch_unwind(|| {
            write_tmp_engine(ReadableSize::kb(1024), ReadableSize::kb(4), 1024, 1, 4)
        })
        .is_err());

        // panic when sync
        fail::cfg("active_file::truncate::force", "off").unwrap();
        fail::cfg("log_fd::truncate::err", "off").unwrap();
        fail::cfg("log_fd::sync::err", "return").unwrap();
        assert!(panic::catch_unwind(|| {
            write_tmp_engine(ReadableSize::kb(1024), ReadableSize::kb(4), 1024, 1, 4)
        })
        .is_err());

        // panic when create file
        fail::cfg("log_fd::sync::err", "off").unwrap();
        fail::cfg("log_fd::create::err", "return").unwrap();
        assert!(panic::catch_unwind(|| {
            write_tmp_engine(ReadableSize::kb(1024), ReadableSize::kb(4), 1024, 1, 4)
        })
        .is_err());

        // panic when write header
        fail::cfg("log_fd::create::err", "off").unwrap();
        fail::cfg("log_fd::write::err", "return").unwrap();
        assert!(panic::catch_unwind(|| {
            write_tmp_engine(ReadableSize::kb(1024), ReadableSize::kb(4), 1024, 1, 4)
        })
        .is_err());
        fail::cfg("log_fd::write::err", "off").unwrap();
    }

    #[test]
    fn test_concurrent_write_error() {
        // b1 success; b2 fail, truncate; b3 success
        let timer = AtomicU64::new(0);
        fail::cfg_callback("engine::write::pre", move || {
            match timer.fetch_add(1, std::sync::atomic::Ordering::SeqCst) {
                2 => fail::cfg("log_fd::write::post_err", "return").unwrap(),
Member

What's the difference between post_err and err? If you want to test that the file is truncated, an engine restart is needed.

Member Author

This is for checking whether the following appends correctly overwrite the previous ones.

                3 => fail::cfg("log_fd::write::post_err", "off").unwrap(),
                _ => {}
            }
        })
        .unwrap();

        // truncate and sync when write error
        let dir = tempfile::Builder::new()
            .prefix("handle_io_error")
            .tempdir()
            .unwrap();
        let cfg = Config {
            dir: dir.path().to_str().unwrap().to_owned(),
            bytes_per_sync: ReadableSize::kb(1024),
            target_file_size: ReadableSize::kb(1024),
            batch_compression_threshold: ReadableSize(0),
            ..Default::default()
        };

        let engine = Arc::new(RaftLogEngine::open(cfg).unwrap());
        let mut ctx = ConcurrentWriteContext::new(engine.clone());

        let content = vec![b'x'; 1024];

        ctx.leader_write(
            generate_batch(1, 1, 11, Some(content.clone())),
            false,
            Some(|r: Result<usize>| {
                assert!(r.is_ok());
            }),
        );
        ctx.follower_write(
            generate_batch(2, 1, 11, Some(content.clone())),
            false,
            Some(|r: Result<usize>| {
                assert!(r.is_err());
            }),
        );
        ctx.follower_write(
            generate_batch(3, 1, 11, Some(content)),
            false,
            Some(|r: Result<usize>| {
                assert!(r.is_ok());
            }),
        );
        // ctx.follower_write(generate_batch(3, 1, 11, Some(content)), false);
        ctx.join();
        assert_eq!(
            10,
            engine
                .fetch_entries_to::<M>(1, 1, 11, None, &mut vec![])
                .unwrap()
        );
        assert_eq!(
            0,
            engine
                .fetch_entries_to::<M>(2, 1, 11, None, &mut vec![])
                .unwrap()
        );
        assert_eq!(
            10,
            engine
                .fetch_entries_to::<M>(3, 1, 11, None, &mut vec![])
                .unwrap()
        );
        fail::cfg("engine::write::pre", "off").unwrap();
    }

    #[test]
    fn test_concurrent_write_truncate_error() {
        // truncate and sync when write error
        assert!(panic::catch_unwind(|| {
            // b0 (ctx); b1 success; b2 fail, truncate, panic; b3(x)
            let timer = AtomicU64::new(0);
            fail::cfg_callback("engine::write::pre", move || {
                match timer.fetch_add(1, std::sync::atomic::Ordering::SeqCst) {
                    2 => {
                        fail::cfg("log_fd::write::err", "return").unwrap();
                        fail::cfg("log_fd::truncate::err", "return").unwrap()
                    }
                    3 => {
                        fail::cfg("log_fd::write::err", "off").unwrap();
                        fail::cfg("log_fd::truncate::err", "off").unwrap()
                    }
                    _ => {}
                }
            })
            .unwrap();

            let dir = tempfile::Builder::new()
                .prefix("handle_io_error")
                .tempdir()
                .unwrap();
            let cfg = Config {
                dir: dir.path().to_str().unwrap().to_owned(),
                bytes_per_sync: ReadableSize::kb(1024),
                target_file_size: ReadableSize::kb(1024),
                batch_compression_threshold: ReadableSize(0),
                ..Default::default()
            };

            let engine = Arc::new(RaftLogEngine::open(cfg).unwrap());
            let mut ctx = ConcurrentWriteContext::new(engine);

            let content = vec![b'x'; 1024];

            ctx.leader_write(
                generate_batch(1, 1, 11, Some(content.clone())),
                false,
                None::<fn(_)>,
            );
            ctx.follower_write(
                generate_batch(2, 1, 11, Some(content.clone())),
                false,
                None::<fn(_)>,
            );
            ctx.follower_write(
                generate_batch(3, 1, 11, Some(content)),
                false,
                None::<fn(_)>,
            );
            ctx.join();
        })
        .is_err());
    }
}
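
The tests above exercise a rollback policy: when an append fails, truncate the file back to the last good offset and sync; if the truncate itself fails, panic. The following is a minimal sketch of that idea using only the standard library. It is not raft-engine's implementation: `LogFile`, `append`, `try_append`, `demo`, and the `inject_err_after_write` flag are hypothetical names introduced for illustration, and the flag only loosely imitates a failpoint that reports an error after the bytes have already been written.

```rust
use std::fs::{File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::Path;

/// Hypothetical append-only log file that tracks the last consistent offset.
struct LogFile {
    file: File,
    /// Number of bytes known to be written successfully.
    written: u64,
}

impl LogFile {
    fn open(path: &Path) -> io::Result<Self> {
        let file = OpenOptions::new()
            .create(true)
            .read(true)
            .write(true)
            .open(path)?;
        let written = file.metadata()?.len();
        Ok(Self { file, written })
    }

    /// Appends `data`. On error, rolls the file back to the last consistent
    /// offset. If the rollback fails too, the on-disk state is unknown, so
    /// this sketch panics instead of continuing.
    fn append(&mut self, data: &[u8], inject_err_after_write: bool) -> io::Result<()> {
        let res = self.try_append(data, inject_err_after_write);
        if res.is_ok() {
            self.written += data.len() as u64;
        } else {
            self.file
                .set_len(self.written)
                .and_then(|_| self.file.sync_all())
                .expect("failed to roll back log file after I/O error");
        }
        res
    }

    fn try_append(&mut self, data: &[u8], inject_err_after_write: bool) -> io::Result<()> {
        self.file.seek(SeekFrom::Start(self.written))?;
        self.file.write_all(data)?;
        if inject_err_after_write {
            // Assumption: simulate an error reported after the bytes landed.
            return Err(io::Error::new(io::ErrorKind::Other, "injected error"));
        }
        Ok(())
    }
}

/// Writes b1 (ok), b2 (fails, rolled back), b3 (ok), then re-reads the file
/// from a fresh handle, similar to checking the state after a restart.
fn demo(path: &Path) -> io::Result<Vec<u8>> {
    let _ = std::fs::remove_file(path);
    let mut log = LogFile::open(path)?;
    log.append(b"b1", false)?;
    assert!(log.append(b"b2-broken", true).is_err());
    log.append(b"b3", false)?;
    drop(log);

    let mut buf = Vec::new();
    File::open(path)?.read_to_end(&mut buf)?;
    Ok(buf)
}
```

Calling `demo` with a path in a temporary directory should return only the bytes of the first and third appends, because the failed second append is truncated away before the third one is written at the same offset.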