Skip to content

Commit

Permalink
Merge pull request #15043 from ptabor/20221223-log-diagnostics
Browse files Browse the repository at this point in the history
etcd-dump-logs: Expand to allow diagnosing CRC corrupted problems in WAL log
  • Loading branch information
ptabor authored Dec 30, 2022
2 parents bcd5b54 + 007858d commit 4ae4d9f
Show file tree
Hide file tree
Showing 12 changed files with 389 additions and 178 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ hack/tls-setup/certs
/tools/proto-annotations/proto-annotations
/tools/benchmark/benchmark
/out
/etcd-dump-logs
60 changes: 46 additions & 14 deletions server/storage/wal/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,49 @@ const minSectorSize = 512
// frameSizeBytes is frame size in bytes, including record size and padding size.
const frameSizeBytes = 8

type Decoder interface {
Decode(rec *walpb.Record) error
LastOffset() int64
LastCRC() uint32
UpdateCRC(prevCrc uint32)
}

type decoder struct {
mu sync.Mutex
brs []*fileutil.FileBufReader

// lastValidOff file offset following the last valid decoded record
lastValidOff int64
crc hash.Hash32

// continueOnCrcError - causes the decoder to continue working even in case of crc mismatch.
// This is a desired mode for tools performing inspection of the corrupted WAL logs.
// See comments on 'Decode' method for semantic.
continueOnCrcError bool
}

func newDecoder(r ...fileutil.FileReader) *decoder {
func NewDecoderAdvanced(continueOnCrcError bool, r ...fileutil.FileReader) Decoder {
readers := make([]*fileutil.FileBufReader, len(r))
for i := range r {
readers[i] = fileutil.NewFileBufReader(r[i])
}
return &decoder{
brs: readers,
crc: crc.New(0, crcTable),
brs: readers,
crc: crc.New(0, crcTable),
continueOnCrcError: continueOnCrcError,
}
}

func (d *decoder) decode(rec *walpb.Record) error {
func NewDecoder(r ...fileutil.FileReader) Decoder {
return NewDecoderAdvanced(false, r...)
}

// Decode reads the next record out of the file.
// In the success path, fills 'rec' and returns nil.
// When it fails, it returns err and usually resets 'rec' to the defaults.
// When continueOnCrcError is set, the method may return ErrUnexpectedEOF or ErrCRCMismatch, but preserve the read
// (potentially corrupted) record content.
func (d *decoder) Decode(rec *walpb.Record) error {
rec.Reset()
d.mu.Lock()
defer d.mu.Unlock()
Expand Down Expand Up @@ -104,14 +126,24 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error {
return err
}

// skip crc checking if the record type is crcType
if rec.Type != crcType {
d.crc.Write(rec.Data)
// skip crc checking if the record type is CrcType
if rec.Type != CrcType {
_, err := d.crc.Write(rec.Data)
if err != nil {
return err
}
if err := rec.Validate(d.crc.Sum32()); err != nil {
if !d.continueOnCrcError {
rec.Reset()
} else {
// If we continue, we want to update lastValidOff, such that following errors are consistent
defer func() { d.lastValidOff += frameSizeBytes + recBytes + padBytes }()
}

if d.isTornEntry(data) {
return io.ErrUnexpectedEOF
return fmt.Errorf("%w: in file '%s' at position: %d", io.ErrUnexpectedEOF, fileBufReader.FileInfo().Name(), d.lastValidOff)
}
return err
return fmt.Errorf("%w: in file '%s' at position: %d", err, fileBufReader.FileInfo().Name(), d.lastValidOff)
}
}
// record decoded as valid; point last valid offset to end of record
Expand Down Expand Up @@ -167,23 +199,23 @@ func (d *decoder) isTornEntry(data []byte) bool {
return false
}

func (d *decoder) updateCRC(prevCrc uint32) {
func (d *decoder) UpdateCRC(prevCrc uint32) {
d.crc = crc.New(prevCrc, crcTable)
}

func (d *decoder) lastCRC() uint32 {
func (d *decoder) LastCRC() uint32 {
return d.crc.Sum32()
}

func (d *decoder) lastOffset() int64 { return d.lastValidOff }
func (d *decoder) LastOffset() int64 { return d.lastValidOff }

func mustUnmarshalEntry(d []byte) raftpb.Entry {
func MustUnmarshalEntry(d []byte) raftpb.Entry {
var e raftpb.Entry
pbutil.MustUnmarshal(&e, d)
return e
}

func mustUnmarshalState(d []byte) raftpb.HardState {
func MustUnmarshalState(d []byte) raftpb.HardState {
var s raftpb.HardState
pbutil.MustUnmarshal(&s, d)
return s
Expand Down
8 changes: 4 additions & 4 deletions server/storage/wal/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ func TestReadRecord(t *testing.T) {
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
decoder := newDecoder(fileutil.NewFileReader(f))
e := decoder.decode(rec)
decoder := NewDecoder(fileutil.NewFileReader(f))
e := decoder.Decode(rec)
if !reflect.DeepEqual(rec, tt.wr) {
t.Errorf("#%d: block = %v, want %v", i, rec, tt.wr)
}
Expand All @@ -81,8 +81,8 @@ func TestWriteRecord(t *testing.T) {
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
decoder := newDecoder(fileutil.NewFileReader(f))
err = decoder.decode(b)
decoder := NewDecoder(fileutil.NewFileReader(f))
err = decoder.Decode(b)
if err != nil {
t.Errorf("err = %v, want nil", err)
}
Expand Down
21 changes: 11 additions & 10 deletions server/storage/wal/repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package wal

import (
"errors"
"io"
"os"
"path/filepath"
Expand All @@ -41,30 +42,30 @@ func Repair(lg *zap.Logger, dirpath string) bool {
lg.Info("repairing", zap.String("path", f.Name()))

rec := &walpb.Record{}
decoder := newDecoder(fileutil.NewFileReader(f.File))
decoder := NewDecoder(fileutil.NewFileReader(f.File))
for {
lastOffset := decoder.lastOffset()
err := decoder.decode(rec)
switch err {
case nil:
lastOffset := decoder.LastOffset()
err := decoder.Decode(rec)
switch {
case err == nil:
// update crc of the decoder when necessary
switch rec.Type {
case crcType:
crc := decoder.crc.Sum32()
case CrcType:
crc := decoder.LastCRC()
// current crc of decoder must match the crc of the record.
// do no need to match 0 crc, since the decoder is a new one at this case.
if crc != 0 && rec.Validate(crc) != nil {
return false
}
decoder.updateCRC(rec.Crc)
decoder.UpdateCRC(rec.Crc)
}
continue

case io.EOF:
case errors.Is(err, io.EOF):
lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.EOF))
return true

case io.ErrUnexpectedEOF:
case errors.Is(err, io.ErrUnexpectedEOF):
brokenName := f.Name() + ".broken"
bf, bferr := os.Create(brokenName)
if bferr != nil {
Expand Down
79 changes: 27 additions & 52 deletions server/storage/wal/repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"

"go.etcd.io/etcd/server/v3/storage/wal/walpb"
Expand All @@ -43,86 +45,59 @@ func TestRepairTruncate(t *testing.T) {
}

func testRepair(t *testing.T, ents [][]raftpb.Entry, corrupt corruptFunc, expectedEnts int) {
lg := zaptest.NewLogger(t)
p := t.TempDir()

// create WAL
w, err := Create(zaptest.NewLogger(t), p, nil)
w, err := Create(lg, p, nil)
defer func() {
if err = w.Close(); err != nil {
t.Fatal(err)
}
// The Close might fail.
_ = w.Close()
}()
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

for _, es := range ents {
if err = w.Save(raftpb.HardState{}, es); err != nil {
t.Fatal(err)
}
require.NoError(t, w.Save(raftpb.HardState{}, es))
}

offset, err := w.tail().Seek(0, io.SeekCurrent)
if err != nil {
t.Fatal(err)
}
w.Close()
require.NoError(t, err)
require.NoError(t, w.Close())

err = corrupt(p, offset)
if err != nil {
t.Fatal(err)
}
require.NoError(t, corrupt(p, offset))

// verify we broke the wal
w, err = Open(zaptest.NewLogger(t), p, walpb.Snapshot{})
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

_, _, _, err = w.ReadAll()
if err != io.ErrUnexpectedEOF {
t.Fatalf("err = %v, want error %v", err, io.ErrUnexpectedEOF)
}
w.Close()
require.ErrorIs(t, err, io.ErrUnexpectedEOF)
require.NoError(t, w.Close())

// repair the wal
if ok := Repair(zaptest.NewLogger(t), p); !ok {
t.Fatalf("'Repair' returned '%v', want 'true'", ok)
}
require.True(t, Repair(lg, p), "'Repair' returned 'false', want 'true'")

// read it back
w, err = Open(zaptest.NewLogger(t), p, walpb.Snapshot{})
if err != nil {
t.Fatal(err)
}
w, err = Open(lg, p, walpb.Snapshot{})
require.NoError(t, err)

_, _, walEnts, err := w.ReadAll()
if err != nil {
t.Fatal(err)
}
if len(walEnts) != expectedEnts {
t.Fatalf("len(ents) = %d, want %d", len(walEnts), expectedEnts)
}
require.NoError(t, err)
assert.Len(t, walEnts, expectedEnts)

// write some more entries to repaired log
for i := 1; i <= 10; i++ {
es := []raftpb.Entry{{Index: uint64(expectedEnts + i)}}
if err = w.Save(raftpb.HardState{}, es); err != nil {
t.Fatal(err)
}
require.NoError(t, w.Save(raftpb.HardState{}, es))
}
w.Close()
require.NoError(t, w.Close())

// read back entries following repair, ensure it's all there
w, err = Open(zaptest.NewLogger(t), p, walpb.Snapshot{})
if err != nil {
t.Fatal(err)
}
w, err = Open(lg, p, walpb.Snapshot{})
require.NoError(t, err)
_, _, walEnts, err = w.ReadAll()
if err != nil {
t.Fatal(err)
}
if len(walEnts) != expectedEnts+10 {
t.Fatalf("len(ents) = %d, want %d", len(walEnts), expectedEnts+10)
}
require.NoError(t, err)
assert.Len(t, walEnts, expectedEnts+10)
}

func makeEnts(ents int) (ret [][]raftpb.Entry) {
Expand Down
Loading

0 comments on commit 4ae4d9f

Please sign in to comment.