-
Notifications
You must be signed in to change notification settings - Fork 88
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
rollback to a consistent state after fsync error #131
Changes from 24 commits
4d22351
751965d
847aa7a
0caa976
26f9c3d
9b07cbd
a7bbabd
d5a58f3
e391e96
07a0dc5
0468de4
86631aa
23bd3c1
1d3fb70
e67c58e
cc0f094
aba71e3
d1f85b0
94c274d
f133b25
c2a5c52
790bdce
c3d8b62
575f9a8
a2ebaef
3f934ad
583c192
fa5ffc1
1903deb
88582e7
8588698
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ use std::path::{Path, PathBuf}; | |
use std::sync::Arc; | ||
use std::time::Instant; | ||
|
||
use fail::fail_point; | ||
use fs2::FileExt; | ||
use log::{debug, info, warn}; | ||
use num_derive::{FromPrimitive, ToPrimitive}; | ||
|
@@ -160,9 +161,6 @@ impl<W: Seek + Write> ActiveFile<W> { | |
} | ||
|
||
fn rotate(&mut self, fd: Arc<LogFd>, writer: W) -> Result<()> { | ||
if self.last_sync < self.written { | ||
self.fd.sync()?; | ||
} | ||
self.writer = writer; | ||
self.written = 0; | ||
self.capacity = 0; | ||
|
@@ -172,6 +170,10 @@ impl<W: Seek + Write> ActiveFile<W> { | |
} | ||
|
||
fn truncate(&mut self) -> Result<()> { | ||
fail_point!("active_file::truncate::force", |_| { | ||
self.fd.truncate(self.written)?; | ||
Ok(()) | ||
}); | ||
if self.written < self.capacity { | ||
self.fd.truncate(self.written)?; | ||
self.capacity = self.written; | ||
|
@@ -184,10 +186,10 @@ impl<W: Seek + Write> ActiveFile<W> { | |
self.written = 0; | ||
let mut buf = Vec::with_capacity(LOG_FILE_HEADER_LEN); | ||
LogFileHeader::new().encode(&mut buf)?; | ||
self.write(&buf, true, 0) | ||
self.write(&buf, 0) | ||
} | ||
|
||
fn write(&mut self, buf: &[u8], sync: bool, target_file_size: usize) -> Result<()> { | ||
fn write(&mut self, buf: &[u8], target_file_size: usize) -> Result<()> { | ||
if self.written + buf.len() > self.capacity { | ||
// Use fallocate to pre-allocate disk space for active file. | ||
let alloc = std::cmp::max( | ||
|
@@ -204,8 +206,15 @@ impl<W: Seek + Write> ActiveFile<W> { | |
} | ||
self.writer.write_all(buf)?; | ||
self.written += buf.len(); | ||
if sync { | ||
Ok(()) | ||
} | ||
|
||
fn sync(&mut self) -> Result<()> { | ||
if self.last_sync < self.written { | ||
let start = Instant::now(); | ||
self.fd.sync()?; | ||
self.last_sync = self.written; | ||
LOG_SYNC_TIME_HISTOGRAM.observe(start.saturating_elapsed().as_secs_f64()); | ||
} | ||
Ok(()) | ||
} | ||
|
@@ -305,19 +314,20 @@ impl<B: FileBuilder> LogManager<B> { | |
Ok(manager) | ||
} | ||
|
||
fn new_log_file(&mut self) -> Result<()> { | ||
fn rotate(&mut self) -> Result<()> { | ||
debug_assert!(self.active_file_seq >= INIT_FILE_ID); | ||
// Necessary to truncate extra zeros from fallocate(). | ||
self.truncate_active_log()?; | ||
self.active_file.truncate()?; | ||
self.active_file.sync()?; | ||
self.active_file_seq += 1; | ||
|
||
let path = build_file_path( | ||
&self.dir, | ||
FileId { | ||
queue: self.queue, | ||
seq: self.active_file_seq, | ||
}, | ||
); | ||
|
||
let fd = Arc::new(LogFd::create(&path)?); | ||
self.all_files.push_back(fd.clone()); | ||
self.active_file.rotate( | ||
|
@@ -326,29 +336,26 @@ impl<B: FileBuilder> LogManager<B> { | |
.build_writer(&path, LogFile::new(fd), true /*create*/)?, | ||
)?; | ||
self.sync_dir()?; | ||
|
||
for listener in &self.listeners { | ||
listener.post_new_log_file(FileId { | ||
queue: self.queue, | ||
seq: self.active_file_seq, | ||
}); | ||
} | ||
|
||
self.update_metrics(); | ||
|
||
Ok(()) | ||
} | ||
|
||
fn sync(&mut self) -> Result<()> { | ||
self.active_file.sync() | ||
} | ||
|
||
fn sync_dir(&self) -> Result<()> { | ||
let path = PathBuf::from(&self.dir); | ||
std::fs::File::open(path).and_then(|d| d.sync_all())?; | ||
Ok(()) | ||
} | ||
|
||
fn truncate_active_log(&mut self) -> Result<()> { | ||
self.active_file.truncate() | ||
} | ||
|
||
fn get_fd(&self, file_seq: FileSeq) -> Result<Arc<LogFd>> { | ||
if file_seq < self.first_file_seq || file_seq > self.active_file_seq { | ||
return Err(Error::Io(IoError::new( | ||
|
@@ -359,10 +366,6 @@ impl<B: FileBuilder> LogManager<B> { | |
Ok(self.all_files[(file_seq - self.first_file_seq) as usize].clone()) | ||
} | ||
|
||
fn get_active_fd(&self) -> Option<Arc<LogFd>> { | ||
self.all_files.back().cloned() | ||
} | ||
|
||
fn purge_to(&mut self, file_seq: FileSeq) -> Result<usize> { | ||
if file_seq > self.active_file_seq { | ||
return Err(box_err!("Purge active or newer files")); | ||
|
@@ -374,15 +377,17 @@ impl<B: FileBuilder> LogManager<B> { | |
Ok(end_offset) | ||
} | ||
|
||
fn append(&mut self, content: &[u8], sync: &mut bool) -> Result<(FileBlockHandle, Arc<LogFd>)> { | ||
if self.active_file.written >= self.rotate_size { | ||
self.new_log_file()?; | ||
} | ||
if self.active_file.since_last_sync() >= self.bytes_per_sync { | ||
*sync = true; | ||
} | ||
fn append(&mut self, content: &[u8]) -> Result<FileBlockHandle> { | ||
let offset = self.active_file.written as u64; | ||
self.active_file.write(content, *sync, self.rotate_size)?; | ||
if let Err(e) = self.active_file.write(content, self.rotate_size) { | ||
if let Err(te) = self.active_file.truncate() { | ||
panic!( | ||
"error when truncate {} after error: {}, get: {}", | ||
self.active_file_seq, e, te | ||
); | ||
} | ||
return Err(e); | ||
} | ||
let handle = FileBlockHandle { | ||
id: FileId { | ||
queue: self.queue, | ||
|
@@ -394,7 +399,7 @@ impl<B: FileBuilder> LogManager<B> { | |
for listener in &self.listeners { | ||
listener.on_append_log_file(handle); | ||
} | ||
Ok((handle, self.active_file.fd.clone())) | ||
Ok(handle) | ||
} | ||
|
||
fn update_metrics(&self) { | ||
|
@@ -638,7 +643,6 @@ impl<B: FileBuilder> FilePipeLog<B> { | |
} | ||
} | ||
} | ||
debug!("Recover queue:{:?} finish.", queue); | ||
Ok(sequential_replay_machine) | ||
}) | ||
.try_reduce( | ||
|
@@ -648,24 +652,10 @@ impl<B: FileBuilder> FilePipeLog<B> { | |
Ok(sequential_replay_machine_left) | ||
}, | ||
)?; | ||
debug!("Recover queue:{:?} finish.", queue); | ||
Ok(sequential_replay_machine) | ||
} | ||
|
||
fn append_bytes( | ||
&self, | ||
queue: LogQueue, | ||
content: &[u8], | ||
sync: &mut bool, | ||
) -> Result<FileBlockHandle> { | ||
let (block_handle, fd) = self.mut_queue(queue).append(content, sync)?; | ||
if *sync { | ||
let start = Instant::now(); | ||
fd.sync()?; | ||
LOG_SYNC_TIME_HISTOGRAM.observe(start.saturating_elapsed().as_secs_f64()); | ||
} | ||
Ok(block_handle) | ||
} | ||
|
||
fn get_queue(&self, queue: LogQueue) -> RwLockReadGuard<LogManager<B>> { | ||
match queue { | ||
LogQueue::Append => self.appender.read(), | ||
|
@@ -694,9 +684,9 @@ impl<B: FileBuilder> PipeLog for FilePipeLog<B> { | |
Ok(buf) | ||
} | ||
|
||
fn append(&self, queue: LogQueue, bytes: &[u8], mut sync: bool) -> Result<FileBlockHandle> { | ||
fn append(&self, queue: LogQueue, bytes: &[u8]) -> Result<FileBlockHandle> { | ||
let start = Instant::now(); | ||
let block_handle = self.append_bytes(queue, bytes, &mut sync)?; | ||
let block_handle = self.mut_queue(queue).append(bytes)?; | ||
match queue { | ||
LogQueue::Rewrite => { | ||
LOG_APPEND_TIME_HISTOGRAM_VEC | ||
|
@@ -712,10 +702,28 @@ impl<B: FileBuilder> PipeLog for FilePipeLog<B> { | |
Ok(block_handle) | ||
} | ||
|
||
fn sync(&self, queue: LogQueue) -> Result<()> { | ||
if let Some(fd) = self.get_queue(queue).get_active_fd() { | ||
fd.sync()?; | ||
fn maybe_sync(&self, queue: LogQueue, force: bool) -> Result<()> { | ||
let mut manager = match queue { | ||
LogQueue::Append => self.appender.write(), | ||
LogQueue::Rewrite => self.rewriter.write(), | ||
}; | ||
|
||
if manager.active_file.written >= manager.rotate_size { | ||
if let Err(e) = manager.rotate() { | ||
panic!( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It says this panic is not covered. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just found where the problem lays, fixing. |
||
"error when rotate [{:?}:{}]: {}", | ||
queue, manager.active_file_seq, e | ||
); | ||
} | ||
} else if manager.active_file.since_last_sync() >= manager.bytes_per_sync || force { | ||
if let Err(e) = manager.sync() { | ||
panic!( | ||
"error when sync [{:?}:{}]: {}", | ||
queue, manager.active_file_seq, e, | ||
); | ||
} | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
|
@@ -728,8 +736,8 @@ impl<B: FileBuilder> PipeLog for FilePipeLog<B> { | |
self.get_queue(queue).size() | ||
} | ||
|
||
fn new_log_file(&self, queue: LogQueue) -> Result<()> { | ||
self.mut_queue(queue).new_log_file() | ||
fn rotate(&self, queue: LogQueue) -> Result<()> { | ||
self.mut_queue(queue).rotate() | ||
} | ||
|
||
fn purge_to(&self, file_id: FileId) -> Result<usize> { | ||
|
@@ -860,34 +868,34 @@ mod tests { | |
|
||
// generate file 1, 2, 3 | ||
let content: Vec<u8> = vec![b'a'; 1024]; | ||
let file_handle = pipe_log.append_bytes(queue, &content, &mut false).unwrap(); | ||
let file_handle = pipe_log.mut_queue(queue).append(&content).unwrap(); | ||
pipe_log.maybe_sync(queue, false).unwrap(); | ||
assert_eq!(file_handle.id.seq, 1); | ||
assert_eq!(file_handle.offset, header_size); | ||
assert_eq!(pipe_log.file_span(queue).1, 1); | ||
assert_eq!(pipe_log.file_span(queue).1, 2); | ||
|
||
let file_handle = pipe_log.append_bytes(queue, &content, &mut false).unwrap(); | ||
let file_handle = pipe_log.mut_queue(queue).append(&content).unwrap(); | ||
pipe_log.maybe_sync(queue, false).unwrap(); | ||
assert_eq!(file_handle.id.seq, 2); | ||
assert_eq!(file_handle.offset, header_size); | ||
assert_eq!(pipe_log.file_span(queue).1, 2); | ||
assert_eq!(pipe_log.file_span(queue).1, 3); | ||
|
||
// purge file 1 | ||
assert_eq!(pipe_log.purge_to(FileId { queue, seq: 2 }).unwrap(), 1); | ||
assert_eq!(pipe_log.file_span(queue).0, 2); | ||
|
||
// cannot purge active file | ||
assert!(pipe_log.purge_to(FileId { queue, seq: 3 }).is_err()); | ||
assert!(pipe_log.purge_to(FileId { queue, seq: 4 }).is_err()); | ||
|
||
// append position | ||
let s_content = b"short content".to_vec(); | ||
let file_handle = pipe_log | ||
.append_bytes(queue, &s_content, &mut false) | ||
.unwrap(); | ||
let file_handle = pipe_log.mut_queue(queue).append(&s_content).unwrap(); | ||
pipe_log.maybe_sync(queue, false).unwrap(); | ||
assert_eq!(file_handle.id.seq, 3); | ||
assert_eq!(file_handle.offset, header_size); | ||
|
||
let file_handle = pipe_log | ||
.append_bytes(queue, &s_content, &mut false) | ||
.unwrap(); | ||
let file_handle = pipe_log.mut_queue(queue).append(&s_content).unwrap(); | ||
pipe_log.maybe_sync(queue, false).unwrap(); | ||
assert_eq!(file_handle.id.seq, 3); | ||
assert_eq!( | ||
file_handle.offset, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe rename it to
reset