rollback to a consistent state after fsync error #131

Merged: 31 commits, merged on Nov 11, 2021
The diff below shows changes from 24 of the 31 commits.

Commits:
4d22351 rollback to a consistent state when fsync error (MrCroxx, Oct 28, 2021)
751965d fix broken tests (MrCroxx, Oct 28, 2021)
847aa7a Merge branch 'master' into extract-fsync-handle (MrCroxx, Oct 28, 2021)
0caa976 Merge branch 'master' into extract-fsync-handle (MrCroxx, Oct 28, 2021)
26f9c3d Merge branch 'master' into extract-fsync-handle (MrCroxx, Nov 1, 2021)
9b07cbd truncate, sync, panic after io error (MrCroxx, Nov 1, 2021)
a7bbabd truncate after append error (MrCroxx, Nov 1, 2021)
d5a58f3 fix typo (MrCroxx, Nov 1, 2021)
e391e96 panic directly if needed (MrCroxx, Nov 3, 2021)
07a0dc5 Merge branch 'master' into extract-fsync-handle (MrCroxx, Nov 3, 2021)
0468de4 fix sync after truncate when rotate (MrCroxx, Nov 3, 2021)
86631aa add io error test (MrCroxx, Nov 3, 2021)
23bd3c1 remove Cargo.lock (MrCroxx, Nov 3, 2021)
1d3fb70 make clippy happy (MrCroxx, Nov 3, 2021)
e67c58e use failpoints with cfg_callback (MrCroxx, Nov 3, 2021)
cc0f094 update README (MrCroxx, Nov 3, 2021)
aba71e3 refine mod test_io_error (MrCroxx, Nov 4, 2021)
d1f85b0 update ci (MrCroxx, Nov 4, 2021)
94c274d mute expected panic, update ci (MrCroxx, Nov 4, 2021)
f133b25 make test stable (MrCroxx, Nov 4, 2021)
c2a5c52 fix concurrent bug in test_io_error (MrCroxx, Nov 4, 2021)
790bdce Merge branch 'master' into extract-fsync-handle (MrCroxx, Nov 4, 2021)
c3d8b62 use ticket lock to make ConcurrentWriteContext scale (MrCroxx, Nov 4, 2021)
575f9a8 fix asan ci (MrCroxx, Nov 4, 2021)
a2ebaef Merge branch 'master' into extract-fsync-handle (tabokie, Nov 6, 2021)
3f934ad refine panic detect (MrCroxx, Nov 4, 2021)
583c192 refine test_io_error code (MrCroxx, Nov 9, 2021)
fa5ffc1 Merge branch 'master' into extract-fsync-handle (MrCroxx, Nov 10, 2021)
1903deb fix merge error (MrCroxx, Nov 10, 2021)
88582e7 remove Cargo.lock (MrCroxx, Nov 10, 2021)
8588698 Merge branch 'master' into extract-fsync-handle (MrCroxx, Nov 11, 2021)
4 changes: 3 additions & 1 deletion .github/workflows/rust.yml
@@ -43,7 +43,9 @@ jobs:
RUST_BACKTRACE: 1
- name: Run asan tests
if: ${{ matrix.os == 'ubuntu-latest' }}
run: cargo test -Zbuild-std --target x86_64-unknown-linux-gnu --features failpoints --verbose -- --nocapture
run: |
cargo test -Zbuild-std --target x86_64-unknown-linux-gnu --all --verbose -- --nocapture
cargo test -Zbuild-std --target x86_64-unknown-linux-gnu --test failpoints --features failpoints --verbose -- --test-threads 1 --nocapture
env:
RUST_BACKTRACE: 1
RUSTFLAGS: '-Zsanitizer=address'
29 changes: 12 additions & 17 deletions src/engine.rs
@@ -5,6 +5,7 @@ use std::sync::Arc;
use std::time::Instant;
use std::u64;

use fail::fail_point;
use log::{error, info};
use protobuf::{parse_from_bytes, Message};

@@ -20,7 +21,7 @@ use crate::pipe_log::{FileBlockHandle, FileId, LogQueue, PipeLog};
use crate::purge::{PurgeHook, PurgeManager};
use crate::util::InstantExt;
use crate::write_barrier::{WriteBarrier, Writer};
use crate::{Error, Result};
use crate::Result;

pub struct Engine<B = DefaultFileBuilder, P = FilePipeLog<B>>
where
@@ -122,14 +123,12 @@ where
let mut writer = Writer::new(log_batch as &_, sync);
if let Some(mut group) = self.write_barrier.enter(&mut writer) {
for writer in group.iter_mut() {
fail_point!("engine::write::pre");
sync |= writer.is_sync();
let log_batch = writer.get_payload();
let res = if !log_batch.is_empty() {
self.pipe_log.append(
LogQueue::Append,
log_batch.encoded_bytes(),
false, /*sync*/
)
self.pipe_log
.append(LogQueue::Append, log_batch.encoded_bytes())
} else {
// TODO(tabokie)
Ok(FileBlockHandle {
@@ -140,16 +139,12 @@
};
writer.set_output(res);
}
if sync {
// fsync() is not retryable, a failed attempt could result in
// unrecoverable loss of data written after last successful
// fsync(). See [PostgreSQL's fsync() surprise]
// (https://lwn.net/Articles/752063/) for more details.
if let Err(e) = self.pipe_log.sync(LogQueue::Append) {
for writer in group.iter_mut() {
writer.set_output(Err(Error::Fsync(e.to_string())));
}
}
if let Err(e) = self.pipe_log.maybe_sync(LogQueue::Append, sync) {
panic!(
"Cannot sync queue: {:?}, for there is an IO error raised: {}",
LogQueue::Append,
e
);
}
}
writer.finish()?
@@ -171,7 +166,7 @@
/// Synchronize the Raft engine.
pub fn sync(&self) -> Result<()> {
// TODO(tabokie): use writer.
self.pipe_log.sync(LogQueue::Append)
self.pipe_log.maybe_sync(LogQueue::Append, true)
}

pub fn put_message<S: Message>(&self, region_id: u64, key: &[u8], m: &S) -> Result<()> {
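The comment removed above cites PostgreSQL's fsync() surprise (https://lwn.net/Articles/752063/): once fsync() fails, the kernel may already have dropped the dirty pages, so a later fsync() that returns Ok proves nothing about the earlier writes. A minimal self-contained sketch of that rationale, using hypothetical names rather than this PR's types:

use std::fs::OpenOptions;
use std::io::Write;

// Illustrates the write-vs-fsync error asymmetry this PR builds on:
// a failed write can be rolled back and reported, while a failed
// fsync aborts the process, mirroring the new panic in Engine::write.
fn append_durably(path: &str, buf: &[u8]) -> std::io::Result<()> {
    let mut file = OpenOptions::new().create(true).append(true).open(path)?;
    // A write error is recoverable: nothing has been acknowledged yet,
    // and the caller can truncate back to the last consistent offset.
    file.write_all(buf)?;
    if let Err(e) = file.sync_data() {
        // An fsync error is not retryable: pages the kernel gave up on
        // will not be re-persisted by a later, "successful" fsync.
        panic!("fsync failed; writes since the last successful fsync may be lost: {}", e);
    }
    Ok(())
}

fn main() -> std::io::Result<()> {
    append_durably("demo.log", b"hello")
}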
2 changes: 0 additions & 2 deletions src/errors.rs
@@ -15,8 +15,6 @@ pub enum Error {
Corruption(String),
#[error("IO Error: {0}")]
Io(#[from] IoError),
#[error("Fsync Error: {0}")]
Fsync(String),
#[error("Codec Error: {0}")]
Codec(#[from] CodecError),
#[error("Protobuf Error: {0}")]
134 changes: 71 additions & 63 deletions src/file_pipe_log.rs
@@ -9,6 +9,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;

use fail::fail_point;
use fs2::FileExt;
use log::{debug, info, warn};
use num_derive::{FromPrimitive, ToPrimitive};
@@ -160,9 +161,6 @@ impl<W: Seek + Write> ActiveFile<W> {
}

fn rotate(&mut self, fd: Arc<LogFd>, writer: W) -> Result<()> {
[Review comment (Member)] maybe rename it to reset

if self.last_sync < self.written {
self.fd.sync()?;
}
self.writer = writer;
self.written = 0;
self.capacity = 0;
@@ -172,6 +170,10 @@
}

fn truncate(&mut self) -> Result<()> {
fail_point!("active_file::truncate::force", |_| {
self.fd.truncate(self.written)?;
Ok(())
});
if self.written < self.capacity {
self.fd.truncate(self.written)?;
self.capacity = self.written;
@@ -184,10 +186,10 @@
self.written = 0;
let mut buf = Vec::with_capacity(LOG_FILE_HEADER_LEN);
LogFileHeader::new().encode(&mut buf)?;
self.write(&buf, true, 0)
self.write(&buf, 0)
}

fn write(&mut self, buf: &[u8], sync: bool, target_file_size: usize) -> Result<()> {
fn write(&mut self, buf: &[u8], target_file_size: usize) -> Result<()> {
if self.written + buf.len() > self.capacity {
// Use fallocate to pre-allocate disk space for active file.
let alloc = std::cmp::max(
@@ -204,8 +206,15 @@
}
self.writer.write_all(buf)?;
self.written += buf.len();
if sync {
Ok(())
}

fn sync(&mut self) -> Result<()> {
if self.last_sync < self.written {
let start = Instant::now();
self.fd.sync()?;
self.last_sync = self.written;
LOG_SYNC_TIME_HISTOGRAM.observe(start.saturating_elapsed().as_secs_f64());
}
Ok(())
}
@@ -305,19 +314,20 @@ impl<B: FileBuilder> LogManager<B> {
Ok(manager)
}

fn new_log_file(&mut self) -> Result<()> {
fn rotate(&mut self) -> Result<()> {
debug_assert!(self.active_file_seq >= INIT_FILE_ID);
// Necessary to truncate extra zeros from fallocate().
self.truncate_active_log()?;
self.active_file.truncate()?;
self.active_file.sync()?;
self.active_file_seq += 1;

let path = build_file_path(
&self.dir,
FileId {
queue: self.queue,
seq: self.active_file_seq,
},
);

let fd = Arc::new(LogFd::create(&path)?);
self.all_files.push_back(fd.clone());
self.active_file.rotate(
@@ -326,29 +336,26 @@
.build_writer(&path, LogFile::new(fd), true /*create*/)?,
)?;
self.sync_dir()?;

for listener in &self.listeners {
listener.post_new_log_file(FileId {
queue: self.queue,
seq: self.active_file_seq,
});
}

self.update_metrics();

Ok(())
}

fn sync(&mut self) -> Result<()> {
self.active_file.sync()
}

fn sync_dir(&self) -> Result<()> {
let path = PathBuf::from(&self.dir);
std::fs::File::open(path).and_then(|d| d.sync_all())?;
Ok(())
}

fn truncate_active_log(&mut self) -> Result<()> {
self.active_file.truncate()
}

fn get_fd(&self, file_seq: FileSeq) -> Result<Arc<LogFd>> {
if file_seq < self.first_file_seq || file_seq > self.active_file_seq {
return Err(Error::Io(IoError::new(
@@ -359,10 +366,6 @@
Ok(self.all_files[(file_seq - self.first_file_seq) as usize].clone())
}

fn get_active_fd(&self) -> Option<Arc<LogFd>> {
self.all_files.back().cloned()
}

fn purge_to(&mut self, file_seq: FileSeq) -> Result<usize> {
if file_seq > self.active_file_seq {
return Err(box_err!("Purge active or newer files"));
@@ -374,15 +377,17 @@
Ok(end_offset)
}

fn append(&mut self, content: &[u8], sync: &mut bool) -> Result<(FileBlockHandle, Arc<LogFd>)> {
if self.active_file.written >= self.rotate_size {
self.new_log_file()?;
}
if self.active_file.since_last_sync() >= self.bytes_per_sync {
*sync = true;
}
fn append(&mut self, content: &[u8]) -> Result<FileBlockHandle> {
let offset = self.active_file.written as u64;
self.active_file.write(content, *sync, self.rotate_size)?;
if let Err(e) = self.active_file.write(content, self.rotate_size) {
if let Err(te) = self.active_file.truncate() {
panic!(
"error when truncate {} after error: {}, get: {}",
self.active_file_seq, e, te
);
}
return Err(e);
}
let handle = FileBlockHandle {
id: FileId {
queue: self.queue,
@@ -394,7 +399,7 @@
for listener in &self.listeners {
listener.on_append_log_file(handle);
}
Ok((handle, self.active_file.fd.clone()))
Ok(handle)
}
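As context for the error path just above: append must leave written pointing at the last consistent offset, so a partially written batch can never become readable. A toy in-memory model of that invariant (not the PR's code; a Vec<u8> stands in for the file):

// Toy model: `written` is the last offset known to hold complete batches.
struct ActiveFileModel {
    data: Vec<u8>,
    written: usize,
}

impl ActiveFileModel {
    // `inject_error` plays the role of the log_fd write failpoint.
    fn append(&mut self, buf: &[u8], inject_error: bool) -> Result<(), String> {
        self.data.extend_from_slice(buf);
        if inject_error {
            // Roll back to the last consistent state; in the real code,
            // a failure of this truncate itself escalates to a panic.
            self.data.truncate(self.written);
            return Err("injected io error".to_string());
        }
        self.written = self.data.len();
        Ok(())
    }
}

fn main() {
    let mut f = ActiveFileModel { data: Vec::new(), written: 0 };
    f.append(b"batch-1", false).unwrap();
    assert!(f.append(b"batch-2", true).is_err());
    // The failed batch left nothing past the consistent offset.
    assert_eq!(f.data.len(), f.written);
}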

fn update_metrics(&self) {
@@ -638,7 +643,6 @@ impl<B: FileBuilder> FilePipeLog<B> {
}
}
}
debug!("Recover queue:{:?} finish.", queue);
Ok(sequential_replay_machine)
})
.try_reduce(
@@ -648,24 +652,10 @@
Ok(sequential_replay_machine_left)
},
)?;
debug!("Recover queue:{:?} finish.", queue);
Ok(sequential_replay_machine)
}

fn append_bytes(
&self,
queue: LogQueue,
content: &[u8],
sync: &mut bool,
) -> Result<FileBlockHandle> {
let (block_handle, fd) = self.mut_queue(queue).append(content, sync)?;
if *sync {
let start = Instant::now();
fd.sync()?;
LOG_SYNC_TIME_HISTOGRAM.observe(start.saturating_elapsed().as_secs_f64());
}
Ok(block_handle)
}

fn get_queue(&self, queue: LogQueue) -> RwLockReadGuard<LogManager<B>> {
match queue {
LogQueue::Append => self.appender.read(),
@@ -694,9 +684,9 @@ impl<B: FileBuilder> PipeLog for FilePipeLog<B> {
Ok(buf)
}

fn append(&self, queue: LogQueue, bytes: &[u8], mut sync: bool) -> Result<FileBlockHandle> {
fn append(&self, queue: LogQueue, bytes: &[u8]) -> Result<FileBlockHandle> {
let start = Instant::now();
let block_handle = self.append_bytes(queue, bytes, &mut sync)?;
let block_handle = self.mut_queue(queue).append(bytes)?;
match queue {
LogQueue::Rewrite => {
LOG_APPEND_TIME_HISTOGRAM_VEC
@@ -712,10 +702,28 @@
Ok(block_handle)
}

fn sync(&self, queue: LogQueue) -> Result<()> {
if let Some(fd) = self.get_queue(queue).get_active_fd() {
fd.sync()?;
fn maybe_sync(&self, queue: LogQueue, force: bool) -> Result<()> {
let mut manager = match queue {
LogQueue::Append => self.appender.write(),
LogQueue::Rewrite => self.rewriter.write(),
};

if manager.active_file.written >= manager.rotate_size {
if let Err(e) = manager.rotate() {
panic!(
"error when rotate [{:?}:{}]: {}",
queue, manager.active_file_seq, e
);
}
} else if manager.active_file.since_last_sync() >= manager.bytes_per_sync || force {
if let Err(e) = manager.sync() {
panic!(
"error when sync [{:?}:{}]: {}",
queue, manager.active_file_seq, e,
);
}
}

Ok(())
}

[Review thread on the rotate-error panic above]
Member: It says this panic is not covered.
MrCroxx (author): I ran the coverage test locally and it said these lines are covered, and I checked manually to confirm that they are. (attached: screenshot of the local coverage results) I've also checked the CI script, which seems fine. I'm not sure if it's a bug in the CI coverage test.
MrCroxx (author): Just found where the problem lies, fixing.

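Design note (an inference from this diff, not a statement by the authors): sync failures now panic inside the pipe log instead of surfacing as the deleted Error::Fsync, because after a failed fsync the process cannot know which pages are still durable. The rotate-or-sync decision above condenses to the following sketch, assuming since_last_sync() computes written - last_sync; the struct here is hypothetical:

// Condensed model of the maybe_sync decision; field names mirror the diff.
struct SyncState {
    written: usize,        // bytes written to the active file
    last_sync: usize,      // offset at the last successful fsync
    rotate_size: usize,    // rotate once the active file reaches this size
    bytes_per_sync: usize, // fsync at least once per this many bytes
}

enum Action {
    Rotate, // truncate extra zeros, fsync, open the next file; panic on IO error
    Fsync,  // fsync the active file; panic on IO error
    None,   // nothing to do yet
}

fn decide(s: &SyncState, force: bool) -> Action {
    if s.written >= s.rotate_size {
        Action::Rotate
    } else if s.written - s.last_sync >= s.bytes_per_sync || force {
        Action::Fsync
    } else {
        Action::None
    }
}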
@@ -728,8 +736,8 @@ impl<B: FileBuilder> PipeLog for FilePipeLog<B> {
self.get_queue(queue).size()
}

fn new_log_file(&self, queue: LogQueue) -> Result<()> {
self.mut_queue(queue).new_log_file()
fn rotate(&self, queue: LogQueue) -> Result<()> {
self.mut_queue(queue).rotate()
}

fn purge_to(&self, file_id: FileId) -> Result<usize> {
@@ -860,34 +868,34 @@ mod tests {

// generate file 1, 2, 3
let content: Vec<u8> = vec![b'a'; 1024];
let file_handle = pipe_log.append_bytes(queue, &content, &mut false).unwrap();
let file_handle = pipe_log.mut_queue(queue).append(&content).unwrap();
pipe_log.maybe_sync(queue, false).unwrap();
assert_eq!(file_handle.id.seq, 1);
assert_eq!(file_handle.offset, header_size);
assert_eq!(pipe_log.file_span(queue).1, 1);
assert_eq!(pipe_log.file_span(queue).1, 2);

let file_handle = pipe_log.append_bytes(queue, &content, &mut false).unwrap();
let file_handle = pipe_log.mut_queue(queue).append(&content).unwrap();
pipe_log.maybe_sync(queue, false).unwrap();
assert_eq!(file_handle.id.seq, 2);
assert_eq!(file_handle.offset, header_size);
assert_eq!(pipe_log.file_span(queue).1, 2);
assert_eq!(pipe_log.file_span(queue).1, 3);

// purge file 1
assert_eq!(pipe_log.purge_to(FileId { queue, seq: 2 }).unwrap(), 1);
assert_eq!(pipe_log.file_span(queue).0, 2);

// cannot purge active file
assert!(pipe_log.purge_to(FileId { queue, seq: 3 }).is_err());
assert!(pipe_log.purge_to(FileId { queue, seq: 4 }).is_err());

// append position
let s_content = b"short content".to_vec();
let file_handle = pipe_log
.append_bytes(queue, &s_content, &mut false)
.unwrap();
let file_handle = pipe_log.mut_queue(queue).append(&s_content).unwrap();
pipe_log.maybe_sync(queue, false).unwrap();
assert_eq!(file_handle.id.seq, 3);
assert_eq!(file_handle.offset, header_size);

let file_handle = pipe_log
.append_bytes(queue, &s_content, &mut false)
.unwrap();
let file_handle = pipe_log.mut_queue(queue).append(&s_content).unwrap();
pipe_log.maybe_sync(queue, false).unwrap();
assert_eq!(file_handle.id.seq, 3);
assert_eq!(
file_handle.offset,
3 changes: 3 additions & 0 deletions src/log_file.rs
@@ -122,6 +122,9 @@ impl LogFd {
written += bytes;
offset += bytes;
}
fail_point!("log_fd::write::post_err", |_| {
Err(from_nix_error(nix::Error::invalid_argument(), "fp"))
});
Ok(written)
}

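For completeness, a sketch of how such a failpoint might be driven from a test, assuming the fail crate's standard cfg/remove API (the PR's real tests live in the failpoints test target enabled in the CI change above; this is not one of them):

// Illustrative only: activate the failpoint added above so a write
// through LogFd reports an error, then deactivate it. The assertions
// one would make (error surfaced, active file truncated) are elided.
#[test]
fn write_post_err_failpoint_smoke() {
    fail::cfg("log_fd::write::post_err", "return").unwrap();
    // ... drive a write through the engine and assert that it fails,
    // and that the file was rolled back to a consistent state ...
    fail::remove("log_fd::write::post_err");
}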