truncate, sync, panic after io error
Signed-off-by: MrCroxx <mrcroxx@outlook.com>
MrCroxx committed Nov 1, 2021
1 parent 26f9c3d commit 9b07cbd
Showing 5 changed files with 66 additions and 149 deletions.
29 changes: 9 additions & 20 deletions src/engine.rs
@@ -20,7 +20,7 @@ use crate::pipe_log::{FileBlockHandle, FileId, LogQueue, PipeLog};
use crate::purge::{PurgeHook, PurgeManager};
use crate::util::InstantExt;
use crate::write_barrier::{WriteBarrier, Writer};
use crate::{Error, Result};
use crate::Result;

pub struct Engine<B = DefaultFileBuilder, P = FilePipeLog<B>>
where
@@ -121,13 +121,12 @@ where
let block_handle = {
let mut writer = Writer::new(log_batch as &_, sync);
if let Some(mut group) = self.write_barrier.enter(&mut writer) {
let mut ctx = self.pipe_log.pre_write(LogQueue::Append);
for writer in group.iter_mut() {
sync |= writer.is_sync();
let log_batch = writer.get_payload();
let res = if !log_batch.is_empty() {
self.pipe_log
.append(LogQueue::Append, &mut ctx, log_batch.encoded_bytes())
.append(LogQueue::Append, log_batch.encoded_bytes())
} else {
// TODO(tabokie)
Ok(FileBlockHandle {
@@ -138,22 +137,12 @@
};
writer.set_output(res);
}
match self.pipe_log.post_write(LogQueue::Append, ctx, false) {
Ok(()) => {}
Err(Error::Fsync(true, msg)) => {
// fsync() is not retryable, a failed attempt could result in
// unrecoverable loss of data written after last successful
// fsync(). See [PostgreSQL's fsync() surprise]
// (https://lwn.net/Articles/752063/) for more details.
for writer in group.iter_mut() {
writer.set_output(Err(Error::Fsync(true, msg.to_owned())));
}
}
Err(Error::Fsync(false, msg)) => {
// TODO(MrCroxx): mitigate the influence?
panic!("Untriable fsync error: {}", msg);
}
Err(e) => return Err(e),
if let Err(e) = self.pipe_log.sync(LogQueue::Append, sync) {
panic!(
"Cannot sync queue: {:?}, for there is an IO error raised: {}",
LogQueue::Append,
e
);
}
}
writer.finish()?
@@ -175,7 +164,7 @@ where
/// Synchronize the Raft engine.
pub fn sync(&self) -> Result<()> {
// TODO(tabokie): use writer.
self.pipe_log.sync(LogQueue::Append)
self.pipe_log.sync(LogQueue::Append, true)
}

pub fn put_message<S: Message>(&self, region_id: u64, key: &[u8], m: &S) -> Result<()> {
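For readers skimming the engine.rs hunk above: append failures are still reported to each writer in the group individually, but a failed fsync of the append queue now aborts the process instead of surfacing a retriable `Error::Fsync`. A minimal, self-contained sketch of that control flow (the `Writer` struct, `flush_group`, and the closures below are illustrative stand-ins, not raft-engine's real types):

```rust
// Sketch of the post-change write-group flow with stand-in types.
struct Writer {
    payload: Vec<u8>,
    output: Result<u64, String>, // per-writer append result
}

fn flush_group(
    append: impl Fn(&[u8]) -> Result<u64, String>, // append one payload, return its offset
    sync: impl Fn() -> Result<(), String>,         // fsync the append queue
    group: &mut [Writer],
) {
    for w in group.iter_mut() {
        // Append failures stay per-writer and are handed back to each caller.
        w.output = append(&w.payload);
    }
    // fsync() is not retryable: a failed attempt may already have lost data
    // buffered since the last successful fsync (https://lwn.net/Articles/752063/),
    // so the only safe reaction is to abort the process.
    if let Err(e) = sync() {
        panic!("cannot sync append queue: {}", e);
    }
}

fn main() {
    let mut group = vec![Writer { payload: b"entry".to_vec(), output: Ok(0) }];
    flush_group(|b| Ok(b.len() as u64), || Ok(()), &mut group);
    assert!(group[0].output.is_ok());
}
```

Panicking here trades availability for correctness: once fsync() has failed, data written since the last successful fsync may already be lost, so continuing would acknowledge writes that are not durable.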
4 changes: 2 additions & 2 deletions src/errors.rs
@@ -15,8 +15,8 @@ pub enum Error {
Corruption(String),
#[error("IO Error: {0}")]
Io(#[from] IoError),
#[error("Fsync Error: retriable: {0}, reason: {1}")]
Fsync(bool, String),
#[error("Fsync Error: reason: {0}")]
Severe(String),
#[error("Codec Error: {0}")]
Codec(#[from] CodecError),
#[error("Protobuf Error: {0}")]
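With the retriability flag gone from the error type, callers no longer branch on whether an fsync can be retried; unrecoverable I/O conditions collapse into the single `Severe` variant. A standalone sketch with local mirror types (not the crate's own definitions) showing the intended caller-side distinction:

```rust
use std::io::Error as IoError;

// Local mirror of the reworked error shape; only the variants visible in this
// diff are reproduced, and only `Severe` matters for the example.
#[allow(dead_code)]
#[derive(Debug)]
enum Error {
    Corruption(String),
    Io(IoError),
    Severe(String), // replaces the old Fsync(retriable, reason) pair
}

fn is_recoverable(e: &Error) -> bool {
    // `Severe` marks conditions the engine cannot recover from (failed fsync,
    // failed truncate or rotate); everything else can be surfaced to the caller.
    !matches!(e, Error::Severe(_))
}

fn main() {
    let e = Error::Severe("fsync failed".to_owned());
    assert!(!is_recoverable(&e));
}
```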
148 changes: 46 additions & 102 deletions src/file_pipe_log.rs
@@ -176,14 +176,6 @@ impl<W: Seek + Write> ActiveFile<W> {
Ok(())
}

/// rockback to last synced position
fn rollback(&mut self) -> Result<()> {
self.writer
.seek(std::io::SeekFrom::Start(self.last_sync as u64))?;
self.written = self.last_sync;
Ok(())
}

fn write_header(&mut self) -> Result<()> {
self.writer.seek(std::io::SeekFrom::Start(0))?;
self.written = 0;
@@ -384,10 +376,6 @@ impl<B: FileBuilder> LogManager<B> {
Ok(self.all_files[(file_seq - self.first_file_seq) as usize].clone())
}

fn get_active_fd(&self) -> Option<Arc<LogFd>> {
self.all_files.back().cloned()
}

fn purge_to(&mut self, file_seq: FileSeq) -> Result<usize> {
if file_seq > self.active_file_seq {
return Err(box_err!("Purge active or newer files"));
@@ -399,11 +387,7 @@
Ok(end_offset)
}

fn append(
&mut self,
ctx: &mut FilePipeLogWriteContext,
content: &[u8],
) -> Result<FileBlockHandle> {
fn append(&mut self, content: &[u8]) -> Result<FileBlockHandle> {
let offset = self.active_file.written as u64;
self.active_file.write(content, self.rotate_size)?;
let handle = FileBlockHandle {
@@ -417,13 +401,6 @@
for listener in &self.listeners {
listener.on_append_log_file(handle);
}
if self.active_file.written >= self.rotate_size {
ctx.syncable = Syncable::NeedRotate
} else if self.active_file.since_last_sync() >= self.bytes_per_sync
&& ctx.syncable == Syncable::DontNeed
{
ctx.syncable = Syncable::NeedSync
}
Ok(handle)
}

@@ -438,13 +415,6 @@
(self.active_file_seq - self.first_file_seq) as usize * self.rotate_size
+ self.active_file.written
}

fn get_write_context(&self) -> FilePipeLogWriteContext {
FilePipeLogWriteContext {
synced: self.active_file.written == self.active_file.last_sync,
syncable: Syncable::DontNeed,
}
}
}

pub trait ReplayMachine: Send + Default {
@@ -688,13 +658,8 @@ impl<B: FileBuilder> FilePipeLog<B> {
Ok(sequential_replay_machine)
}

fn append_bytes(
&self,
queue: LogQueue,
ctx: &mut FilePipeLogWriteContext,
content: &[u8],
) -> Result<FileBlockHandle> {
self.mut_queue(queue).append(ctx, content)
fn append_bytes(&self, queue: LogQueue, content: &[u8]) -> Result<FileBlockHandle> {
self.mut_queue(queue).append(content)
}

fn get_queue(&self, queue: LogQueue) -> RwLockReadGuard<LogManager<B>> {
@@ -712,22 +677,7 @@ impl<B: FileBuilder> FilePipeLog<B> {
}
}

#[derive(PartialEq)]
enum Syncable {
DontNeed,
NeedSync,
NeedRotate,
}

/// invariant: non-active file must be successfully synced.
pub struct FilePipeLogWriteContext {
synced: bool,
syncable: Syncable,
}

impl<B: FileBuilder> PipeLog for FilePipeLog<B> {
type WriteContext = FilePipeLogWriteContext;

fn read_bytes(&self, handle: FileBlockHandle) -> Result<Vec<u8>> {
let fd = self.get_queue(handle.id.queue).get_fd(handle.id.seq)?;
let mut reader = self
@@ -740,41 +690,9 @@ impl<B: FileBuilder> PipeLog for FilePipeLog<B> {
Ok(buf)
}

fn pre_write(&self, queue: LogQueue) -> Self::WriteContext {
self.get_queue(queue).get_write_context()
}

fn post_write(
&self,
queue: LogQueue,
mut ctx: Self::WriteContext,
force_sync: bool,
) -> Result<()> {
if ctx.syncable == Syncable::DontNeed && force_sync {
ctx.syncable = Syncable::NeedSync;
}
if let Err(e) = match ctx.syncable {
Syncable::DontNeed => Ok(()),
Syncable::NeedSync => self.mut_queue(queue).sync(),
Syncable::NeedRotate => self.mut_queue(queue).truncate_and_sync(),
} {
self.mut_queue(queue).active_file.rollback()?;
return Err(Error::Fsync(ctx.synced, e.to_string()));
}
if ctx.syncable == Syncable::NeedRotate {
self.mut_queue(queue).rotate()?;
}
Ok(())
}

fn append(
&self,
queue: LogQueue,
ctx: &mut Self::WriteContext,
bytes: &[u8],
) -> Result<FileBlockHandle> {
fn append(&self, queue: LogQueue, bytes: &[u8]) -> Result<FileBlockHandle> {
let start = Instant::now();
let block_handle = self.append_bytes(queue, ctx, bytes)?;
let block_handle = self.append_bytes(queue, bytes)?;
match queue {
LogQueue::Rewrite => {
LOG_APPEND_TIME_HISTOGRAM_VEC
@@ -790,10 +708,40 @@ impl<B: FileBuilder> PipeLog for FilePipeLog<B> {
Ok(block_handle)
}

fn sync(&self, queue: LogQueue) -> Result<()> {
if let Some(fd) = self.get_queue(queue).get_active_fd() {
fd.sync()?;
fn sync(&self, queue: LogQueue, force: bool) -> Result<()> {
let mut manager = match queue {
LogQueue::Append => self.appender.write(),
LogQueue::Rewrite => self.rewriter.write(),
};

if manager.active_file.written >= manager.rotate_size {
// need rotate
if let Err(e) = manager.truncate_and_sync() {
return Err(Error::Severe(format!(
"cannot truncate and sync queue: {:?} when rotating, for there is IO error: {}",
queue, e
)));
}
if let Err(e) = manager.rotate() {
return Err(Error::Severe(format!(
"cannot rotate queue: {:?}, for there is IO error raised: {}",
queue, e
)));
}
} else if manager.active_file.since_last_sync() >= manager.bytes_per_sync || force {
// need_sync
if let Err(e) = manager.sync() {
let truncated = match manager.truncate_and_sync() {
Ok(()) => true,
Err(_) => false,
};
return Err(Error::Severe(format!(
"IO error raised when syncing log: {}, log file truncated: {}",
e, truncated
)));
}
}

Ok(())
}

Expand Down Expand Up @@ -938,16 +886,14 @@ mod tests {

// generate file 1, 2, 3
let content: Vec<u8> = vec![b'a'; 1024];
let mut ctx = pipe_log.pre_write(queue);
let file_handle = pipe_log.append_bytes(queue, &mut ctx, &content).unwrap();
pipe_log.post_write(queue, ctx, false).unwrap();
let file_handle = pipe_log.append_bytes(queue, &content).unwrap();
pipe_log.sync(queue, false).unwrap();
assert_eq!(file_handle.id.seq, 1);
assert_eq!(file_handle.offset, header_size);
assert_eq!(pipe_log.file_span(queue).1, 2);

let mut ctx = pipe_log.pre_write(queue);
let file_handle = pipe_log.append_bytes(queue, &mut ctx, &content).unwrap();
pipe_log.post_write(queue, ctx, false).unwrap();
let file_handle = pipe_log.append_bytes(queue, &content).unwrap();
pipe_log.sync(queue, false).unwrap();
assert_eq!(file_handle.id.seq, 2);
assert_eq!(file_handle.offset, header_size);
assert_eq!(pipe_log.file_span(queue).1, 3);
@@ -961,15 +907,13 @@

// append position
let s_content = b"short content".to_vec();
let mut ctx = pipe_log.pre_write(queue);
let file_handle = pipe_log.append_bytes(queue, &mut ctx, &s_content).unwrap();
pipe_log.post_write(queue, ctx, false).unwrap();
let file_handle = pipe_log.append_bytes(queue, &s_content).unwrap();
pipe_log.sync(queue, false).unwrap();
assert_eq!(file_handle.id.seq, 3);
assert_eq!(file_handle.offset, header_size);

let mut ctx = pipe_log.pre_write(queue);
let file_handle = pipe_log.append_bytes(queue, &mut ctx, &s_content).unwrap();
pipe_log.post_write(queue, ctx, false).unwrap();
let file_handle = pipe_log.append_bytes(queue, &s_content).unwrap();
pipe_log.sync(queue, false).unwrap();
assert_eq!(file_handle.id.seq, 3);
assert_eq!(
file_handle.offset,
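The new `FilePipeLog::sync` above folds rotation into the sync path: once the active file reaches `rotate_size` it is truncated to its written length, fsynced, and rotated; otherwise it is fsynced when at least `bytes_per_sync` unsynced bytes have accumulated or when the caller forces a sync. A standalone sketch of just that decision (illustrative names; the real code also performs the I/O and reports failures as `Error::Severe`):

```rust
// Decision logic only, extracted from the branch structure of sync() above.
#[derive(Debug, PartialEq)]
enum SyncAction {
    TruncateSyncRotate, // active file is full
    Fsync,              // enough unsynced bytes, or the caller forced a sync
    Nothing,
}

fn plan_sync(
    written: usize,         // bytes written to the active file
    rotate_size: usize,     // file size threshold that triggers rotation
    since_last_sync: usize, // bytes written since the last successful fsync
    bytes_per_sync: usize,  // fsync threshold
    force: bool,            // caller requested a sync
) -> SyncAction {
    if written >= rotate_size {
        SyncAction::TruncateSyncRotate
    } else if since_last_sync >= bytes_per_sync || force {
        SyncAction::Fsync
    } else {
        SyncAction::Nothing
    }
}

fn main() {
    assert_eq!(plan_sync(128 << 20, 128 << 20, 0, 4 << 20, false), SyncAction::TruncateSyncRotate);
    assert_eq!(plan_sync(1 << 20, 128 << 20, 5 << 20, 4 << 20, false), SyncAction::Fsync);
    assert_eq!(plan_sync(1 << 20, 128 << 20, 1 << 20, 4 << 20, true), SyncAction::Fsync);
    assert_eq!(plan_sync(1 << 20, 128 << 20, 1 << 20, 4 << 20, false), SyncAction::Nothing);
}
```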
19 changes: 4 additions & 15 deletions src/pipe_log.rs
@@ -50,25 +50,14 @@ impl FileBlockHandle {
}

pub trait PipeLog: Sized {
type WriteContext;

/// Read some bytes from the given position.
fn read_bytes(&self, handle: FileBlockHandle) -> Result<Vec<u8>>;

fn pre_write(&self, queue: LogQueue) -> Self::WriteContext;

fn post_write(&self, queue: LogQueue, ctx: Self::WriteContext, force_sync: bool) -> Result<()>;

/// Write a batch into the append queue.
fn append(
&self,
queue: LogQueue,
ctx: &mut Self::WriteContext,
bytes: &[u8],
) -> Result<FileBlockHandle>;

/// Sync the given queue.
fn sync(&self, queue: LogQueue) -> Result<()>;
fn append(&self, queue: LogQueue, bytes: &[u8]) -> Result<FileBlockHandle>;

/// Sync and rotate the given queue if needed.
fn sync(&self, queue: LogQueue, force: bool) -> Result<()>;

fn file_span(&self, queue: LogQueue) -> (FileSeq, FileSeq);

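With the trait trimmed down, a write against a `PipeLog` is just an `append` followed by a `sync`; the `WriteContext`/`pre_write`/`post_write` plumbing is gone. A hypothetical crate-internal helper (not part of this commit, but using only paths and signatures that appear in this diff) makes the two-call protocol explicit:

```rust
// Illustrative helper assumed to live inside the crate, next to the trait.
use crate::pipe_log::{FileBlockHandle, LogQueue, PipeLog};
use crate::Result;

fn append_and_sync<P: PipeLog>(pipe: &P, bytes: &[u8], force: bool) -> Result<FileBlockHandle> {
    // Where the payload landed in the queue's files.
    let handle = pipe.append(LogQueue::Append, bytes)?;
    // Fsync and/or rotate the queue as needed; `force` requests an fsync even
    // if fewer than `bytes_per_sync` bytes have accumulated.
    pipe.sync(LogQueue::Append, force)?;
    Ok(handle)
}
```

Both `Engine::write` above and the rewrite path in purge.rs below follow this append-then-sync shape.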
15 changes: 5 additions & 10 deletions src/purge.rs
@@ -268,17 +268,12 @@ where
) -> Result<()> {
let len = log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?;
if len == 0 {
if sync {
self.pipe_log.sync(LogQueue::Rewrite)?;
}
return Ok(());
return self.pipe_log.sync(LogQueue::Rewrite, sync);
}

let mut ctx = self.pipe_log.pre_write(LogQueue::Rewrite);
let file_handle =
self.pipe_log
.append(LogQueue::Rewrite, &mut ctx, log_batch.encoded_bytes())?;
self.pipe_log.post_write(LogQueue::Rewrite, ctx, sync)?;
let file_handle = self
.pipe_log
.append(LogQueue::Rewrite, log_batch.encoded_bytes())?;
self.pipe_log.sync(LogQueue::Rewrite, sync)?;
log_batch.finish_write(file_handle);
for item in log_batch.drain() {
let raft = item.raft_group_id;
