From 498619bdda58a0c0a4c69fa738c36affdf03a397 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Fri, 23 Dec 2022 11:41:18 +0100 Subject: [PATCH 1/8] wal decoder: report file & offset in case of CRC mismatch. Signed-off-by: Piotr Tabor --- server/storage/wal/decoder.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/storage/wal/decoder.go b/server/storage/wal/decoder.go index dde15ed079f..e1864cdcac9 100644 --- a/server/storage/wal/decoder.go +++ b/server/storage/wal/decoder.go @@ -109,9 +109,9 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error { d.crc.Write(rec.Data) if err := rec.Validate(d.crc.Sum32()); err != nil { if d.isTornEntry(data) { - return io.ErrUnexpectedEOF + return fmt.Errorf("%w: in file '%s' at position: %d", io.ErrUnexpectedEOF, fileBufReader.FileInfo().Name(), d.lastValidOff) } - return err + return fmt.Errorf("%w: in file '%s' at position: %d", err, fileBufReader.FileInfo().Name(), d.lastValidOff) } } // record decoded as valid; point last valid offset to end of record From bee2a08968d9cc5c791095c694d2c51263b0a8c4 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Fri, 23 Dec 2022 12:48:24 +0100 Subject: [PATCH 2/8] wal decoding: Add optional mode to ignore CRC errors. Signed-off-by: Piotr Tabor --- server/storage/wal/decoder.go | 17 +++++++ server/storage/wal/repair.go | 9 ++-- server/storage/wal/repair_test.go | 79 ++++++++++-------------------- server/storage/wal/walpb/record.go | 8 +-- 4 files changed, 54 insertions(+), 59 deletions(-) diff --git a/server/storage/wal/decoder.go b/server/storage/wal/decoder.go index e1864cdcac9..11fb3985fb5 100644 --- a/server/storage/wal/decoder.go +++ b/server/storage/wal/decoder.go @@ -40,6 +40,11 @@ type decoder struct { // lastValidOff file offset following the last valid decoded record lastValidOff int64 crc hash.Hash32 + + // continueOnCrcError - causes the decoder to continue working even in case of crc mismatch. 
+ // This is a desired mode for tools performing inspection of the corrupted WAL logs. + // See comments on 'decode' method for semantic. + continueOnCrcError bool } func newDecoder(r ...fileutil.FileReader) *decoder { @@ -53,6 +58,11 @@ func newDecoder(r ...fileutil.FileReader) *decoder { } } +// decode reads the next record out of the file. +// In the success path, fills 'rec' and returns nil. +// When it fails, it returns err and usually resets 'rec' to the defaults. +// When continueOnCrcError is set, the method may return ErrUnexpectedEOF or ErrCRCMismatch, but preserve the read +// (potentially corrupted) record content. func (d *decoder) decode(rec *walpb.Record) error { rec.Reset() d.mu.Lock() @@ -108,6 +118,13 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error { if rec.Type != crcType { d.crc.Write(rec.Data) if err := rec.Validate(d.crc.Sum32()); err != nil { + if !d.continueOnCrcError { + rec.Reset() + } else { + // If we continue, we want to update lastValidOff, such that following errors are consistent + defer func() { d.lastValidOff += frameSizeBytes + recBytes + padBytes }() + } + if d.isTornEntry(data) { return fmt.Errorf("%w: in file '%s' at position: %d", io.ErrUnexpectedEOF, fileBufReader.FileInfo().Name(), d.lastValidOff) } diff --git a/server/storage/wal/repair.go b/server/storage/wal/repair.go index e81ac8dddf5..0cb1be892b9 100644 --- a/server/storage/wal/repair.go +++ b/server/storage/wal/repair.go @@ -15,6 +15,7 @@ package wal import ( + "errors" "io" "os" "path/filepath" @@ -45,8 +46,8 @@ func Repair(lg *zap.Logger, dirpath string) bool { for { lastOffset := decoder.lastOffset() err := decoder.decode(rec) - switch err { - case nil: + switch { + case err == nil: // update crc of the decoder when necessary switch rec.Type { case crcType: @@ -60,11 +61,11 @@ func Repair(lg *zap.Logger, dirpath string) bool { } continue - case io.EOF: + case errors.Is(err, io.EOF): lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.EOF)) 
return true - case io.ErrUnexpectedEOF: + case errors.Is(err, io.ErrUnexpectedEOF): brokenName := f.Name() + ".broken" bf, bferr := os.Create(brokenName) if bferr != nil { diff --git a/server/storage/wal/repair_test.go b/server/storage/wal/repair_test.go index 864b56da659..0c5da51aa15 100644 --- a/server/storage/wal/repair_test.go +++ b/server/storage/wal/repair_test.go @@ -16,6 +16,8 @@ package wal import ( "fmt" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "io" "os" "testing" @@ -43,86 +45,59 @@ func TestRepairTruncate(t *testing.T) { } func testRepair(t *testing.T, ents [][]raftpb.Entry, corrupt corruptFunc, expectedEnts int) { + lg := zaptest.NewLogger(t) p := t.TempDir() // create WAL - w, err := Create(zaptest.NewLogger(t), p, nil) + w, err := Create(lg, p, nil) defer func() { - if err = w.Close(); err != nil { - t.Fatal(err) - } + // The Close might fail. + _ = w.Close() }() - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) for _, es := range ents { - if err = w.Save(raftpb.HardState{}, es); err != nil { - t.Fatal(err) - } + assert.NoError(t, w.Save(raftpb.HardState{}, es)) } offset, err := w.tail().Seek(0, io.SeekCurrent) - if err != nil { - t.Fatal(err) - } - w.Close() + require.NoError(t, err) + require.NoError(t, w.Close()) - err = corrupt(p, offset) - if err != nil { - t.Fatal(err) - } + require.NoError(t, corrupt(p, offset)) // verify we broke the wal w, err = Open(zaptest.NewLogger(t), p, walpb.Snapshot{}) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + _, _, _, err = w.ReadAll() - if err != io.ErrUnexpectedEOF { - t.Fatalf("err = %v, want error %v", err, io.ErrUnexpectedEOF) - } - w.Close() + require.ErrorIs(t, err, io.ErrUnexpectedEOF) + require.NoError(t, w.Close()) // repair the wal - if ok := Repair(zaptest.NewLogger(t), p); !ok { - t.Fatalf("'Repair' returned '%v', want 'true'", ok) - } + require.True(t, Repair(lg, p), "'Repair' returned 'false', want 'true'") // read it back - w, 
err = Open(zaptest.NewLogger(t), p, walpb.Snapshot{}) - if err != nil { - t.Fatal(err) - } + w, err = Open(lg, p, walpb.Snapshot{}) + require.NoError(t, err) + _, _, walEnts, err := w.ReadAll() - if err != nil { - t.Fatal(err) - } - if len(walEnts) != expectedEnts { - t.Fatalf("len(ents) = %d, want %d", len(walEnts), expectedEnts) - } + require.NoError(t, err) + assert.Len(t, walEnts, expectedEnts) // write some more entries to repaired log for i := 1; i <= 10; i++ { es := []raftpb.Entry{{Index: uint64(expectedEnts + i)}} - if err = w.Save(raftpb.HardState{}, es); err != nil { - t.Fatal(err) - } + require.NoError(t, w.Save(raftpb.HardState{}, es)) } - w.Close() + require.NoError(t, w.Close()) // read back entries following repair, ensure it's all there - w, err = Open(zaptest.NewLogger(t), p, walpb.Snapshot{}) - if err != nil { - t.Fatal(err) - } + w, err = Open(lg, p, walpb.Snapshot{}) + require.NoError(t, err) _, _, walEnts, err = w.ReadAll() - if err != nil { - t.Fatal(err) - } - if len(walEnts) != expectedEnts+10 { - t.Fatalf("len(ents) = %d, want %d", len(walEnts), expectedEnts+10) - } + require.NoError(t, err) + assert.Len(t, walEnts, expectedEnts+10) } func makeEnts(ents int) (ret [][]raftpb.Entry) { diff --git a/server/storage/wal/walpb/record.go b/server/storage/wal/walpb/record.go index e2070fbba3b..693deab113d 100644 --- a/server/storage/wal/walpb/record.go +++ b/server/storage/wal/walpb/record.go @@ -14,7 +14,10 @@ package walpb -import "errors" +import ( + "errors" + "fmt" +) var ( ErrCRCMismatch = errors.New("walpb: crc mismatch") @@ -24,8 +27,7 @@ func (rec *Record) Validate(crc uint32) error { if rec.Crc == crc { return nil } - rec.Reset() - return ErrCRCMismatch + return fmt.Errorf("%w: expected: %x computed: %x", ErrCRCMismatch, rec.Crc, crc) } // ValidateSnapshotForWrite ensures the Snapshot the newly written snapshot is valid. 
From 0d8aad54bac0b3b4439279a661864bd3638c4ebe Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Fri, 23 Dec 2022 12:59:06 +0100 Subject: [PATCH 3/8] wal: Expose Decoder as a package-visible interface, so that it can be used by tools. Signed-off-by: Piotr Tabor --- server/storage/wal/decoder.go | 26 +++++++++++++++++-------- server/storage/wal/record_test.go | 8 ++++---- server/storage/wal/repair.go | 10 +++++----- server/storage/wal/wal.go | 32 +++++++++++++++---------------- server/storage/wal/wal_test.go | 2 +- 5 files changed, 44 insertions(+), 34 deletions(-) diff --git a/server/storage/wal/decoder.go b/server/storage/wal/decoder.go index 11fb3985fb5..cbe4b0821dd 100644 --- a/server/storage/wal/decoder.go +++ b/server/storage/wal/decoder.go @@ -33,6 +33,13 @@ const minSectorSize = 512 // frameSizeBytes is frame size in bytes, including record size and padding size. const frameSizeBytes = 8 +type Decoder interface { + Decode(rec *walpb.Record) error + LastOffset() int64 + LastCRC() uint32 + UpdateCRC(prevCrc uint32) +} + type decoder struct { mu sync.Mutex brs []*fileutil.FileBufReader @@ -43,11 +50,11 @@ type decoder struct { // continueOnCrcError - causes the decoder to continue working even in case of crc mismatch. // This is a desired mode for tools performing inspection of the corrupted WAL logs. - // See comments on 'decode' method for semantic. + // See comments on 'Decode' method for semantic. continueOnCrcError bool } -func newDecoder(r ...fileutil.FileReader) *decoder { +func NewDecoder(r ...fileutil.FileReader) Decoder { readers := make([]*fileutil.FileBufReader, len(r)) for i := range r { readers[i] = fileutil.NewFileBufReader(r[i]) @@ -58,12 +65,12 @@ func newDecoder(r ...fileutil.FileReader) *decoder { } } -// decode reads the next record out of the file. +// Decode reads the next record out of the file. // In the success path, fills 'rec' and returns nil. // When it fails, it returns err and usually resets 'rec' to the defaults. 
// When continueOnCrcError is set, the method may return ErrUnexpectedEOF or ErrCRCMismatch, but preserve the read // (potentially corrupted) record content. -func (d *decoder) decode(rec *walpb.Record) error { +func (d *decoder) Decode(rec *walpb.Record) error { rec.Reset() d.mu.Lock() defer d.mu.Unlock() @@ -116,7 +123,10 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error { // skip crc checking if the record type is crcType if rec.Type != crcType { - d.crc.Write(rec.Data) + _, err := d.crc.Write(rec.Data) + if err != nil { + return err + } if err := rec.Validate(d.crc.Sum32()); err != nil { if !d.continueOnCrcError { rec.Reset() @@ -184,15 +194,15 @@ func (d *decoder) isTornEntry(data []byte) bool { return false } -func (d *decoder) updateCRC(prevCrc uint32) { +func (d *decoder) UpdateCRC(prevCrc uint32) { d.crc = crc.New(prevCrc, crcTable) } -func (d *decoder) lastCRC() uint32 { +func (d *decoder) LastCRC() uint32 { return d.crc.Sum32() } -func (d *decoder) lastOffset() int64 { return d.lastValidOff } +func (d *decoder) LastOffset() int64 { return d.lastValidOff } func mustUnmarshalEntry(d []byte) raftpb.Entry { var e raftpb.Entry diff --git a/server/storage/wal/record_test.go b/server/storage/wal/record_test.go index 0a01d6e6fdc..85ceebed9c1 100644 --- a/server/storage/wal/record_test.go +++ b/server/storage/wal/record_test.go @@ -57,8 +57,8 @@ func TestReadRecord(t *testing.T) { if err != nil { t.Errorf("Unexpected error: %v", err) } - decoder := newDecoder(fileutil.NewFileReader(f)) - e := decoder.decode(rec) + decoder := NewDecoder(fileutil.NewFileReader(f)) + e := decoder.Decode(rec) if !reflect.DeepEqual(rec, tt.wr) { t.Errorf("#%d: block = %v, want %v", i, rec, tt.wr) } @@ -81,8 +81,8 @@ func TestWriteRecord(t *testing.T) { if err != nil { t.Errorf("Unexpected error: %v", err) } - decoder := newDecoder(fileutil.NewFileReader(f)) - err = decoder.decode(b) + decoder := NewDecoder(fileutil.NewFileReader(f)) + err = decoder.Decode(b) if err != nil { 
t.Errorf("err = %v, want nil", err) } diff --git a/server/storage/wal/repair.go b/server/storage/wal/repair.go index 0cb1be892b9..bca72f8aebb 100644 --- a/server/storage/wal/repair.go +++ b/server/storage/wal/repair.go @@ -42,22 +42,22 @@ func Repair(lg *zap.Logger, dirpath string) bool { lg.Info("repairing", zap.String("path", f.Name())) rec := &walpb.Record{} - decoder := newDecoder(fileutil.NewFileReader(f.File)) + decoder := NewDecoder(fileutil.NewFileReader(f.File)) for { - lastOffset := decoder.lastOffset() - err := decoder.decode(rec) + lastOffset := decoder.LastOffset() + err := decoder.Decode(rec) switch { case err == nil: // update crc of the decoder when necessary switch rec.Type { case crcType: - crc := decoder.crc.Sum32() + crc := decoder.LastCRC() // current crc of decoder must match the crc of the record. // do no need to match 0 crc, since the decoder is a new one at this case. if crc != 0 && rec.Validate(crc) != nil { return false } - decoder.updateCRC(rec.Crc) + decoder.UpdateCRC(rec.Crc) } continue diff --git a/server/storage/wal/wal.go b/server/storage/wal/wal.go index 145a1a3059a..5c15ebfa66d 100644 --- a/server/storage/wal/wal.go +++ b/server/storage/wal/wal.go @@ -81,8 +81,8 @@ type WAL struct { state raftpb.HardState // hardstate recorded at the head of WAL start walpb.Snapshot // snapshot to start reading - decoder *decoder // decoder to decode records - readClose func() error // closer for decode reader + decoder Decoder // decoder to Decode records + readClose func() error // closer for Decode reader unsafeNoSync bool // if set, do not fsync @@ -351,7 +351,7 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool lg: lg, dir: dirpath, start: snap, - decoder: newDecoder(rs...), + decoder: NewDecoder(rs...), readClose: closer, locks: ls, } @@ -452,7 +452,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. 
decoder := w.decoder var match bool - for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) { + for err = decoder.Decode(rec); err == nil; err = decoder.Decode(rec) { switch rec.Type { case entryType: e := mustUnmarshalEntry(rec.Data) @@ -480,14 +480,14 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. metadata = rec.Data case crcType: - crc := decoder.crc.Sum32() + crc := decoder.LastCRC() // current crc of decoder must match the crc of the record. // do no need to match 0 crc, since the decoder is a new one at this case. if crc != 0 && rec.Validate(crc) != nil { state.Reset() return nil, state, nil, ErrCRCMismatch } - decoder.updateCRC(rec.Crc) + decoder.UpdateCRC(rec.Crc) case snapshotType: var snap walpb.Snapshot @@ -527,7 +527,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. // not all, will cause CRC errors on WAL open. Since the records // were never fully synced to disk in the first place, it's safe // to zero them out to avoid any CRC errors from new writes. - if _, err = w.tail().Seek(w.decoder.lastOffset(), io.SeekStart); err != nil { + if _, err = w.tail().Seek(w.decoder.LastOffset(), io.SeekStart); err != nil { return nil, state, nil, err } if err = fileutil.ZeroToEnd(w.tail().File); err != nil { @@ -551,7 +551,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. if w.tail() != nil { // create encoder (chain crc with the decoder), enable appending - w.encoder, err = newFileEncoder(w.tail().File, w.decoder.lastCRC()) + w.encoder, err = newFileEncoder(w.tail().File, w.decoder.LastCRC()) if err != nil { return } @@ -587,9 +587,9 @@ func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, erro }() // create a new decoder from the readers on the WAL files - decoder := newDecoder(rs...) + decoder := NewDecoder(rs...) 
- for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) { + for err = decoder.Decode(rec); err == nil; err = decoder.Decode(rec) { switch rec.Type { case snapshotType: var loadedSnap walpb.Snapshot @@ -598,13 +598,13 @@ func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, erro case stateType: state = mustUnmarshalState(rec.Data) case crcType: - crc := decoder.crc.Sum32() + crc := decoder.LastCRC() // current crc of decoder must match the crc of the record. // do no need to match 0 crc, since the decoder is a new one at this case. if crc != 0 && rec.Validate(crc) != nil { return nil, ErrCRCMismatch } - decoder.updateCRC(rec.Crc) + decoder.UpdateCRC(rec.Crc) } } // We do not have to read out all the WAL entries @@ -661,9 +661,9 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta }() // create a new decoder from the readers on the WAL files - decoder := newDecoder(rs...) + decoder := NewDecoder(rs...) - for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) { + for err = decoder.Decode(rec); err == nil; err = decoder.Decode(rec) { switch rec.Type { case metadataType: if metadata != nil && !bytes.Equal(metadata, rec.Data) { @@ -671,13 +671,13 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta } metadata = rec.Data case crcType: - crc := decoder.crc.Sum32() + crc := decoder.LastCRC() // Current crc of decoder must match the crc of the record. // We need not match 0 crc, since the decoder is a new one at this point. 
if crc != 0 && rec.Validate(crc) != nil { return nil, ErrCRCMismatch } - decoder.updateCRC(rec.Crc) + decoder.UpdateCRC(rec.Crc) case snapshotType: var loadedSnap walpb.Snapshot pbutil.MustUnmarshal(&loadedSnap, rec.Data) diff --git a/server/storage/wal/wal_test.go b/server/storage/wal/wal_test.go index 5a8ad0dc462..70883ba3be7 100644 --- a/server/storage/wal/wal_test.go +++ b/server/storage/wal/wal_test.go @@ -302,7 +302,7 @@ func TestCut(t *testing.T) { } defer f.Close() nw := &WAL{ - decoder: newDecoder(fileutil.NewFileReader(f)), + decoder: NewDecoder(fileutil.NewFileReader(f)), start: snap, } _, gst, _, err := nw.ReadAll() From 58681d3feb4b04ba835eefa265397c36b80de2e2 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Fri, 23 Dec 2022 14:09:21 +0100 Subject: [PATCH 4/8] Expose types of entries within the WAL log for access from the tools. Signed-off-by: Piotr Tabor --- server/storage/wal/decoder.go | 8 ++--- server/storage/wal/repair.go | 2 +- server/storage/wal/wal.go | 56 +++++++++++++++++----------------- server/storage/wal/wal_test.go | 6 ++-- 4 files changed, 36 insertions(+), 36 deletions(-) diff --git a/server/storage/wal/decoder.go b/server/storage/wal/decoder.go index cbe4b0821dd..1010fb0745e 100644 --- a/server/storage/wal/decoder.go +++ b/server/storage/wal/decoder.go @@ -121,8 +121,8 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error { return err } - // skip crc checking if the record type is crcType - if rec.Type != crcType { + // skip crc checking if the record type is CrcType + if rec.Type != CrcType { _, err := d.crc.Write(rec.Data) if err != nil { return err @@ -204,13 +204,13 @@ func (d *decoder) LastCRC() uint32 { func (d *decoder) LastOffset() int64 { return d.lastValidOff } -func mustUnmarshalEntry(d []byte) raftpb.Entry { +func MustUnmarshalEntry(d []byte) raftpb.Entry { var e raftpb.Entry pbutil.MustUnmarshal(&e, d) return e } -func mustUnmarshalState(d []byte) raftpb.HardState { +func MustUnmarshalState(d []byte) raftpb.HardState 
{ var s raftpb.HardState pbutil.MustUnmarshal(&s, d) return s diff --git a/server/storage/wal/repair.go b/server/storage/wal/repair.go index bca72f8aebb..53734045167 100644 --- a/server/storage/wal/repair.go +++ b/server/storage/wal/repair.go @@ -50,7 +50,7 @@ func Repair(lg *zap.Logger, dirpath string) bool { case err == nil: // update crc of the decoder when necessary switch rec.Type { - case crcType: + case CrcType: crc := decoder.LastCRC() // current crc of decoder must match the crc of the record. // do no need to match 0 crc, since the decoder is a new one at this case. diff --git a/server/storage/wal/wal.go b/server/storage/wal/wal.go index 5c15ebfa66d..e913c76c13a 100644 --- a/server/storage/wal/wal.go +++ b/server/storage/wal/wal.go @@ -36,11 +36,11 @@ import ( ) const ( - metadataType int64 = iota + 1 - entryType - stateType - crcType - snapshotType + MetadataType int64 = iota + 1 + EntryType + StateType + CrcType + SnapshotType // warnSyncDuration is the amount of time allotted to an fsync before // logging a warning @@ -56,7 +56,7 @@ var ( ErrMetadataConflict = errors.New("wal: conflicting metadata found") ErrFileNotFound = errors.New("wal: file not found") - ErrCRCMismatch = errors.New("wal: crc mismatch") + ErrCRCMismatch = walpb.ErrCRCMismatch ErrSnapshotMismatch = errors.New("wal: snapshot mismatch") ErrSnapshotNotFound = errors.New("wal: snapshot not found") ErrSliceOutOfRange = errors.New("wal: slice bounds out of range") @@ -166,7 +166,7 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) { if err = w.saveCrc(0); err != nil { return nil, err } - if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil { + if err = w.encoder.encode(&walpb.Record{Type: MetadataType, Data: metadata}); err != nil { return nil, err } if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil { @@ -454,8 +454,8 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. 
var match bool for err = decoder.Decode(rec); err == nil; err = decoder.Decode(rec) { switch rec.Type { - case entryType: - e := mustUnmarshalEntry(rec.Data) + case EntryType: + e := MustUnmarshalEntry(rec.Data) // 0 <= e.Index-w.start.Index - 1 < len(ents) if e.Index > w.start.Index { // prevent "panic: runtime error: slice bounds out of range [:13038096702221461992] with capacity 0" @@ -469,17 +469,17 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. } w.enti = e.Index - case stateType: - state = mustUnmarshalState(rec.Data) + case StateType: + state = MustUnmarshalState(rec.Data) - case metadataType: + case MetadataType: if metadata != nil && !bytes.Equal(metadata, rec.Data) { state.Reset() return nil, state, nil, ErrMetadataConflict } metadata = rec.Data - case crcType: + case CrcType: crc := decoder.LastCRC() // current crc of decoder must match the crc of the record. // do no need to match 0 crc, since the decoder is a new one at this case. @@ -489,7 +489,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. } decoder.UpdateCRC(rec.Crc) - case snapshotType: + case SnapshotType: var snap walpb.Snapshot pbutil.MustUnmarshal(&snap, rec.Data) if snap.Index == w.start.Index { @@ -591,13 +591,13 @@ func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, erro for err = decoder.Decode(rec); err == nil; err = decoder.Decode(rec) { switch rec.Type { - case snapshotType: + case SnapshotType: var loadedSnap walpb.Snapshot pbutil.MustUnmarshal(&loadedSnap, rec.Data) snaps = append(snaps, loadedSnap) - case stateType: - state = mustUnmarshalState(rec.Data) - case crcType: + case StateType: + state = MustUnmarshalState(rec.Data) + case CrcType: crc := decoder.LastCRC() // current crc of decoder must match the crc of the record. // do no need to match 0 crc, since the decoder is a new one at this case. 
@@ -665,12 +665,12 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta for err = decoder.Decode(rec); err == nil; err = decoder.Decode(rec) { switch rec.Type { - case metadataType: + case MetadataType: if metadata != nil && !bytes.Equal(metadata, rec.Data) { return nil, ErrMetadataConflict } metadata = rec.Data - case crcType: + case CrcType: crc := decoder.LastCRC() // Current crc of decoder must match the crc of the record. // We need not match 0 crc, since the decoder is a new one at this point. @@ -678,7 +678,7 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta return nil, ErrCRCMismatch } decoder.UpdateCRC(rec.Crc) - case snapshotType: + case SnapshotType: var loadedSnap walpb.Snapshot pbutil.MustUnmarshal(&loadedSnap, rec.Data) if loadedSnap.Index == snap.Index { @@ -689,8 +689,8 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta } // We ignore all entry and state type records as these // are not necessary for validating the WAL contents - case entryType: - case stateType: + case EntryType: + case StateType: pbutil.MustUnmarshal(&state, rec.Data) default: return nil, fmt.Errorf("unexpected block type %d", rec.Type) @@ -748,7 +748,7 @@ func (w *WAL) cut() error { return err } - if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil { + if err = w.encoder.encode(&walpb.Record{Type: MetadataType, Data: w.metadata}); err != nil { return err } @@ -905,7 +905,7 @@ func (w *WAL) Close() error { func (w *WAL) saveEntry(e *raftpb.Entry) error { // TODO: add MustMarshalTo to reduce one allocation. 
b := pbutil.MustMarshal(e) - rec := &walpb.Record{Type: entryType, Data: b} + rec := &walpb.Record{Type: EntryType, Data: b} if err := w.encoder.encode(rec); err != nil { return err } @@ -919,7 +919,7 @@ func (w *WAL) saveState(s *raftpb.HardState) error { } w.state = *s b := pbutil.MustMarshal(s) - rec := &walpb.Record{Type: stateType, Data: b} + rec := &walpb.Record{Type: StateType, Data: b} return w.encoder.encode(rec) } @@ -968,7 +968,7 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error { w.mu.Lock() defer w.mu.Unlock() - rec := &walpb.Record{Type: snapshotType, Data: b} + rec := &walpb.Record{Type: SnapshotType, Data: b} if err := w.encoder.encode(rec); err != nil { return err } @@ -980,7 +980,7 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error { } func (w *WAL) saveCrc(prevCrc uint32) error { - return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc}) + return w.encoder.encode(&walpb.Record{Type: CrcType, Crc: prevCrc}) } func (w *WAL) tail() *fileutil.LockedFile { diff --git a/server/storage/wal/wal_test.go b/server/storage/wal/wal_test.go index 70883ba3be7..1063ed4381a 100644 --- a/server/storage/wal/wal_test.go +++ b/server/storage/wal/wal_test.go @@ -74,16 +74,16 @@ func TestNew(t *testing.T) { var wb bytes.Buffer e := newEncoder(&wb, 0, 0) - err = e.encode(&walpb.Record{Type: crcType, Crc: 0}) + err = e.encode(&walpb.Record{Type: CrcType, Crc: 0}) if err != nil { t.Fatalf("err = %v, want nil", err) } - err = e.encode(&walpb.Record{Type: metadataType, Data: []byte("somedata")}) + err = e.encode(&walpb.Record{Type: MetadataType, Data: []byte("somedata")}) if err != nil { t.Fatalf("err = %v, want nil", err) } r := &walpb.Record{ - Type: snapshotType, + Type: SnapshotType, Data: pbutil.MustMarshal(&walpb.Snapshot{}), } if err = e.encode(r); err != nil { From e571fb7baacfd838a53d9f3dd04f215bd0357451 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Fri, 23 Dec 2022 16:17:06 +0100 Subject: [PATCH 5/8] Add --raw mode to ./etcd-dump-log This mode 
allows looking at RAW protos for all entries in WAL logs in the given directory. Signed-off-by: Piotr Tabor --- server/storage/wal/decoder.go | 11 ++- server/storage/wal/repair_test.go | 4 +- tools/etcd-dump-logs/etcd-dump-log_test.go | 68 +++++++------ tools/etcd-dump-logs/main.go | 51 +++++++--- tools/etcd-dump-logs/raw.go | 107 +++++++++++++++++++++ tools/etcd-dump-logs/raw_test.go | 55 +++++++++++ 6 files changed, 244 insertions(+), 52 deletions(-) create mode 100644 tools/etcd-dump-logs/raw.go create mode 100644 tools/etcd-dump-logs/raw_test.go diff --git a/server/storage/wal/decoder.go b/server/storage/wal/decoder.go index 1010fb0745e..168c71cb6bd 100644 --- a/server/storage/wal/decoder.go +++ b/server/storage/wal/decoder.go @@ -54,17 +54,22 @@ type decoder struct { continueOnCrcError bool } -func NewDecoder(r ...fileutil.FileReader) Decoder { +func NewDecoderAdvanced(continueOnCrcError bool, r ...fileutil.FileReader) Decoder { readers := make([]*fileutil.FileBufReader, len(r)) for i := range r { readers[i] = fileutil.NewFileBufReader(r[i]) } return &decoder{ - brs: readers, - crc: crc.New(0, crcTable), + brs: readers, + crc: crc.New(0, crcTable), + continueOnCrcError: continueOnCrcError, } } +func NewDecoder(r ...fileutil.FileReader) Decoder { + return NewDecoderAdvanced(false, r...) +} + // Decode reads the next record out of the file. // In the success path, fills 'rec' and returns nil. // When it fails, it returns err and usually resets 'rec' to the defaults. 
diff --git a/server/storage/wal/repair_test.go b/server/storage/wal/repair_test.go index 0c5da51aa15..a6ec80e062d 100644 --- a/server/storage/wal/repair_test.go +++ b/server/storage/wal/repair_test.go @@ -16,12 +16,12 @@ package wal import ( "fmt" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "io" "os" "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" "go.etcd.io/etcd/server/v3/storage/wal/walpb" diff --git a/tools/etcd-dump-logs/etcd-dump-log_test.go b/tools/etcd-dump-logs/etcd-dump-log_test.go index 4fc35719742..ae79d3fea94 100644 --- a/tools/etcd-dump-logs/etcd-dump-log_test.go +++ b/tools/etcd-dump-logs/etcd-dump-log_test.go @@ -52,38 +52,7 @@ func TestEtcdDumpLogEntryType(t *testing.T) { p := t.TempDir() - memberdir := filepath.Join(p, "member") - err = os.Mkdir(memberdir, 0744) - if err != nil { - t.Fatal(err) - } - waldir := walDir(p) - snapdir := snapDir(p) - - w, err := wal.Create(zaptest.NewLogger(t), waldir, nil) - if err != nil { - t.Fatal(err) - } - - err = os.Mkdir(snapdir, 0744) - if err != nil { - t.Fatal(err) - } - - ents := make([]raftpb.Entry, 0) - - // append entries into wal log - appendConfigChangeEnts(&ents) - appendNormalRequestEnts(&ents) - appendNormalIRREnts(&ents) - appendUnknownNormalEnts(&ents) - - // force commit newly appended entries - err = w.Save(raftpb.HardState{}, ents) - if err != nil { - t.Fatal(err) - } - w.Close() + mustCreateWalLog(t, p) argtests := []struct { name string @@ -128,6 +97,41 @@ func TestEtcdDumpLogEntryType(t *testing.T) { } +func mustCreateWalLog(t *testing.T, path string) { + memberdir := filepath.Join(path, "member") + err := os.Mkdir(memberdir, 0744) + if err != nil { + t.Fatal(err) + } + waldir := walDir(path) + snapdir := snapDir(path) + + w, err := wal.Create(zaptest.NewLogger(t), waldir, nil) + if err != nil { + t.Fatal(err) + } + + err = os.Mkdir(snapdir, 0744) + if err != nil { + t.Fatal(err) + } + + 
ents := make([]raftpb.Entry, 0) + + // append entries into wal log + appendConfigChangeEnts(&ents) + appendNormalRequestEnts(&ents) + appendNormalIRREnts(&ents) + appendUnknownNormalEnts(&ents) + + // force commit newly appended entries + err = w.Save(raftpb.HardState{}, ents) + if err != nil { + t.Fatal(err) + } + w.Close() +} + func appendConfigChangeEnts(ents *[]raftpb.Entry) { configChangeData := []raftpb.ConfChange{ {ID: 1, Type: raftpb.ConfChangeAddNode, NodeID: 2, Context: []byte("")}, diff --git a/tools/etcd-dump-logs/main.go b/tools/etcd-dump-logs/main.go index bdd7c36ce25..1e2f008aaf4 100644 --- a/tools/etcd-dump-logs/main.go +++ b/tools/etcd-dump-logs/main.go @@ -56,8 +56,10 @@ IRRCompaction, IRRLeaseGrant, IRRLeaseRevoke, IRRLeaseCheckpoint`) streamdecoder := flag.String("stream-decoder", "", `The name of an executable decoding tool, the executable must process hex encoded lines of binary input (from etcd-dump-logs) and output a hex encoded line of binary for each input line`) + raw := flag.Bool("raw", false, "Read the logs in the low-level form") flag.Parse() + lg := zap.NewExample() if len(flag.Args()) != 1 { log.Fatalf("Must provide data-dir argument (got %+v)", flag.Args()) @@ -68,6 +70,37 @@ and output a hex encoded line of binary for each input line`) log.Fatal("start-snap and start-index flags cannot be used together.") } + if !*raw { + ents := readUsingReadAll(lg, index, snapfile, dataDir, waldir) + + fmt.Printf("WAL entries: %d\n", len(ents)) + if len(ents) > 0 { + fmt.Printf("lastIndex=%d\n", ents[len(ents)-1].Index) + } + + fmt.Printf("%4s\t%10s\ttype\tdata", "term", "index") + if *streamdecoder != "" { + fmt.Print("\tdecoder_status\tdecoded_data") + } + fmt.Println() + + listEntriesType(*entrytype, *streamdecoder, ents) + } else { + if *snapfile != "" || + *entrytype != defaultEntryTypes || + *streamdecoder != "" { + log.Fatalf("Flags --entry-type, --stream-decoder, --entrytype not supported in the RAW mode.") + } + + wd := *waldir + if wd 
== "" { + wd = walDir(dataDir) + } + readRaw(lg, index, wd, os.Stdout) + } +} + +func readUsingReadAll(lg *zap.Logger, index *uint64, snapfile *string, dataDir string, waldir *string) []raftpb.Entry { var ( walsnap walpb.Snapshot snapshot *raftpb.Snapshot @@ -84,7 +117,7 @@ and output a hex encoded line of binary for each input line`) ss := snap.New(zap.NewExample(), snapDir(dataDir)) snapshot, err = ss.Load() } else { - snapshot, err = snap.Read(zap.NewExample(), filepath.Join(snapDir(dataDir), *snapfile)) + snapshot, err = snap.Read(lg, filepath.Join(snapDir(dataDir), *snapfile)) } switch err { @@ -123,19 +156,7 @@ and output a hex encoded line of binary for each input line`) vid := types.ID(state.Vote) fmt.Printf("WAL metadata:\nnodeID=%s clusterID=%s term=%d commitIndex=%d vote=%s\n", id, cid, state.Term, state.Commit, vid) - - fmt.Printf("WAL entries: %d\n", len(ents)) - if len(ents) > 0 { - fmt.Printf("lastIndex=%d\n", ents[len(ents)-1].Index) - } - - fmt.Printf("%4s\t%10s\ttype\tdata", "term", "index") - if *streamdecoder != "" { - fmt.Print("\tdecoder_status\tdecoded_data") - } - fmt.Println() - - listEntriesType(*entrytype, *streamdecoder, ents) + return ents } func walDir(dataDir string) string { return filepath.Join(dataDir, "member", "wal") } @@ -360,7 +381,7 @@ func listEntriesType(entrytype string, streamdecoder string, ents []raftpb.Entry printer(e) if streamdecoder == "" { fmt.Println() - continue + //continue } // if decoder is set, pass the e.Data to stdin and read the stdout from decoder diff --git a/tools/etcd-dump-logs/raw.go b/tools/etcd-dump-logs/raw.go new file mode 100644 index 00000000000..3df6de6d6b5 --- /dev/null +++ b/tools/etcd-dump-logs/raw.go @@ -0,0 +1,107 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "errors" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + + "go.uber.org/zap" + + "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/client/pkg/v3/fileutil" + "go.etcd.io/etcd/pkg/v3/pbutil" + "go.etcd.io/etcd/server/v3/storage/wal" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" + "go.etcd.io/raft/v3/raftpb" +) + +func readRaw(lg *zap.Logger, fromIndex *uint64, waldir string, out io.Writer) { + var walReaders []fileutil.FileReader + files, err := ioutil.ReadDir(waldir) + if err != nil { + lg.Fatal("Failed to read directory.", zap.String("directory", waldir), zap.Error(err)) + } + for _, finfo := range files { + if filepath.Ext(finfo.Name()) != ".wal" { + lg.Warn("Ignoring not .wal file", zap.String("filename", finfo.Name())) + } + f, err := os.Open(filepath.Join(waldir, finfo.Name())) + if err != nil { + lg.Fatal("Failed to read file", zap.String("filename", finfo.Name()), zap.Error(err)) + } + walReaders = append(walReaders, fileutil.NewFileReader(f)) + } + decoder := wal.NewDecoderAdvanced(true, walReaders...) + // The variable is used to not pollute log with multiple continuous crc errors. 
+ crcDesync := false + for { + rec := walpb.Record{} + err := decoder.Decode(&rec) + if err == nil || errors.Is(err, walpb.ErrCRCMismatch) { + if err != nil && !crcDesync { + lg.Warn("Reading entry failed with CRC error", zap.Error(err)) + crcDesync = true + } + printRec(lg, &rec, fromIndex, out) + if rec.Type == wal.CrcType { + decoder.UpdateCRC(rec.Crc) + crcDesync = false + } + continue + } + if errors.Is(err, io.EOF) { + lg.Info("EOF: All entries were processed") + break + } else { + lg.Error("Reading failed", zap.Error(err)) + break + } + } +} + +func printRec(lg *zap.Logger, rec *walpb.Record, fromIndex *uint64, out io.Writer) { + switch rec.Type { + case wal.MetadataType: + var metadata etcdserverpb.Metadata + pbutil.MustUnmarshal(&metadata, rec.Data) + fmt.Fprintf(out, "Metadata: %s\n", metadata.String()) + case wal.CrcType: + fmt.Fprintf(out, "CRC: %d\n", rec.Crc) + case wal.EntryType: + e := wal.MustUnmarshalEntry(rec.Data) + if fromIndex == nil || e.Index >= *fromIndex { + fmt.Fprintf(out, "Entry: %s\n", e.String()) + } + case wal.SnapshotType: + var snap walpb.Snapshot + pbutil.MustUnmarshal(&snap, rec.Data) + if fromIndex == nil || snap.Index >= *fromIndex { + fmt.Fprintf(out, "Snapshot: %s\n", snap.String()) + } + case wal.StateType: + var state raftpb.HardState + pbutil.MustUnmarshal(&state, rec.Data) + if fromIndex == nil || state.Commit >= *fromIndex { + fmt.Fprintf(out, "HardState: %s\n", state.String()) + } + default: + lg.Error("Unexpected WAL log type", zap.Int64("type", rec.Type)) + } +} diff --git a/tools/etcd-dump-logs/raw_test.go b/tools/etcd-dump-logs/raw_test.go new file mode 100644 index 00000000000..880b96d8b65 --- /dev/null +++ b/tools/etcd-dump-logs/raw_test.go @@ -0,0 +1,55 @@ +package main + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap/zaptest" +) + +func Test_readRaw(t *testing.T) { + path := t.TempDir() + mustCreateWalLog(t, path) + var out bytes.Buffer + readRaw(zaptest.NewLogger(t), 
nil, walDir(path), &out) + assert.Equal(t, + `CRC: 0 +Metadata: +Snapshot: +Entry: Term:1 Index:1 Type:EntryConfChange Data:"\010\001\020\000\030\002\"\000" +Entry: Term:2 Index:2 Type:EntryConfChange Data:"\010\002\020\001\030\002\"\000" +Entry: Term:2 Index:3 Type:EntryConfChange Data:"\010\003\020\002\030\002\"\000" +Entry: Term:2 Index:4 Type:EntryConfChange Data:"\010\004\020\003\030\003\"\000" +Entry: Term:3 Index:5 Data:"\010\000\022\000\032\006/path0\"\030{\"hey\":\"ho\",\"hi\":[\"yo\"]}(\0012\0008\000@\000H\tP\000X\001`+"`"+`\000h\000p\000x\001\200\001\000\210\001\000" +Entry: Term:3 Index:6 Data:"\010\001\022\004QGET\032\006/path1\"\023{\"0\":\"1\",\"2\":[\"3\"]}(\0002\0008\000@\000H\tP\000X\001`+"`"+`\000h\000p\000x\001\200\001\000\210\001\000" +Entry: Term:3 Index:7 Data:"\010\002\022\004SYNC\032\006/path2\"\023{\"0\":\"1\",\"2\":[\"3\"]}(\0002\0008\000@\000H\002P\000X\001`+"`"+`\000h\000p\000x\001\200\001\000\210\001\000" +Entry: Term:3 Index:8 Data:"\010\003\022\006DELETE\032\006/path3\"\030{\"hey\":\"ho\",\"hi\":[\"yo\"]}(\0002\0008\000@\001H\002P\000X\001`+"`"+`\000h\000p\000x\001\200\001\000\210\001\000" +Entry: Term:3 Index:9 Data:"\010\004\022\006RANDOM\032\246\001/path4/superlong/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path\"\030{\"hey\":\"ho\",\"hi\":[\"yo\"]}(\0002\0008\000@\000H\002P\000X\001`+"`"+`\000h\000p\000x\001\200\001\000\210\001\000" +Entry: Term:4 Index:10 Data:"\010\005\032\025\n\0011\022\002hi\030\006 \001(\001X\240\234\001h\240\234\001" +Entry: Term:5 Index:11 Data:"\010\006\"\020\n\004foo1\022\004bar1\030\0010\001" +Entry: Term:6 Index:12 Data:"\010\007*\010\n\0010\022\0019\030\001" +Entry: Term:7 Index:13 Data:"\010\0102\024\022\010\032\006\n\001a\022\001b\032\010\032\006\n\001a\022\001b" +Entry: Term:8 Index:14 Data:"\010\t:\002\020\001" +Entry: Term:9 Index:15 Data:"\010\nB\004\010\001\020\001" +Entry: Term:10 Index:16 
Data:"\010\013J\002\010\002" +Entry: Term:11 Index:17 Data:"\010\014R\006\010\003\020\004\030\005" +Entry: Term:12 Index:18 Data:"\010\r\302>\000" +Entry: Term:13 Index:19 Data:"\010\016\232?\000" +Entry: Term:14 Index:20 Data:"\010\017\242?\031\n\006myname\022\010password\032\005token" +Entry: Term:15 Index:21 Data:"\010\020\342D\020\n\005name1\022\005pass1\032\000" +Entry: Term:16 Index:22 Data:"\010\021\352D\007\n\005name1" +Entry: Term:17 Index:23 Data:"\010\022\362D\007\n\005name1" +Entry: Term:18 Index:24 Data:"\010\023\372D\016\n\005name1\022\005pass2" +Entry: Term:19 Index:25 Data:"\010\024\202E\016\n\005user1\022\005role1" +Entry: Term:20 Index:26 Data:"\010\025\212E\016\n\005user2\022\005role2" +Entry: Term:21 Index:27 Data:"\010\026\222E\000" +Entry: Term:22 Index:28 Data:"\010\027\232E\000" +Entry: Term:23 Index:29 Data:"\010\030\202K\007\n\005role2" +Entry: Term:24 Index:30 Data:"\010\031\212K\007\n\005role1" +Entry: Term:25 Index:31 Data:"\010\032\222K\007\n\005role3" +Entry: Term:26 Index:32 Data:"\010\033\232K\033\n\005role3\022\022\010\001\022\004Keys\032\010RangeEnd" +Entry: Term:27 Index:33 Data:"\010\034\242K\026\n\005role3\022\003key\032\010rangeend" +Entry: Term:27 Index:34 Data:"?" +`, out.String()) +} From d79bc3fa7e2aec32144d8349561504f201195ffd Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Wed, 28 Dec 2022 16:27:31 +0100 Subject: [PATCH 6/8] etcd-dump-logs: Fix order of imports.. Signed-off-by: Piotr Tabor --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index c70e888e000..4e6417a74ef 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,4 @@ hack/tls-setup/certs /tools/proto-annotations/proto-annotations /tools/benchmark/benchmark /out +/etcd-dump-logs From 8ec3cbc551dd57aee124435593629b1321258214 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Thu, 29 Dec 2022 09:52:48 +0100 Subject: [PATCH 7/8] fixup! 
Add --raw mode to ./etcd-dump-log Signed-off-by: Piotr Tabor --- server/storage/wal/repair_test.go | 2 +- tools/etcd-dump-logs/raw.go | 2 +- tools/etcd-dump-logs/raw_test.go | 14 ++++++++++++++ 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/server/storage/wal/repair_test.go b/server/storage/wal/repair_test.go index a6ec80e062d..b1fd9d25d39 100644 --- a/server/storage/wal/repair_test.go +++ b/server/storage/wal/repair_test.go @@ -57,7 +57,7 @@ func testRepair(t *testing.T, ents [][]raftpb.Entry, corrupt corruptFunc, expect require.NoError(t, err) for _, es := range ents { - assert.NoError(t, w.Save(raftpb.HardState{}, es)) + require.NoError(t, w.Save(raftpb.HardState{}, es)) } offset, err := w.tail().Seek(0, io.SeekCurrent) diff --git a/tools/etcd-dump-logs/raw.go b/tools/etcd-dump-logs/raw.go index 3df6de6d6b5..85259430a32 100644 --- a/tools/etcd-dump-logs/raw.go +++ b/tools/etcd-dump-logs/raw.go @@ -67,7 +67,7 @@ func readRaw(lg *zap.Logger, fromIndex *uint64, waldir string, out io.Writer) { continue } if errors.Is(err, io.EOF) { - lg.Info("EOF: All entries were processed") + fmt.Fprintf(out, "EOF: All entries were processed.\n") break } else { lg.Error("Reading failed", zap.Error(err)) diff --git a/tools/etcd-dump-logs/raw_test.go b/tools/etcd-dump-logs/raw_test.go index 880b96d8b65..bc485c2049c 100644 --- a/tools/etcd-dump-logs/raw_test.go +++ b/tools/etcd-dump-logs/raw_test.go @@ -1,3 +1,16 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. package main import ( @@ -51,5 +64,6 @@ Entry: Term:25 Index:31 Data:"\010\032\222K\007\n\005role3" Entry: Term:26 Index:32 Data:"\010\033\232K\033\n\005role3\022\022\010\001\022\004Keys\032\010RangeEnd" Entry: Term:27 Index:33 Data:"\010\034\242K\026\n\005role3\022\003key\032\010rangeend" Entry: Term:27 Index:34 Data:"?" +EOF: All entries were processed. `, out.String()) } From 007858dc978ed021a43a974a669e3a7d8983a5c4 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Thu, 29 Dec 2022 12:10:45 +0100 Subject: [PATCH 8/8] etcd-dump-logs: Migrate from zap to log for raw Signed-off-by: Piotr Tabor --- tools/etcd-dump-logs/main.go | 6 +++--- tools/etcd-dump-logs/raw.go | 22 +++++++++++----------- tools/etcd-dump-logs/raw_test.go | 3 +-- 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/tools/etcd-dump-logs/main.go b/tools/etcd-dump-logs/main.go index 1e2f008aaf4..6eb6dcf3610 100644 --- a/tools/etcd-dump-logs/main.go +++ b/tools/etcd-dump-logs/main.go @@ -96,7 +96,7 @@ and output a hex encoded line of binary for each input line`) if wd == "" { wd = walDir(dataDir) } - readRaw(lg, index, wd, os.Stdout) + readRaw(index, wd, os.Stdout) } } @@ -114,7 +114,7 @@ func readUsingReadAll(lg *zap.Logger, index *uint64, snapfile *string, dataDir s walsnap.Index = *index } else { if *snapfile == "" { - ss := snap.New(zap.NewExample(), snapDir(dataDir)) + ss := snap.New(lg, snapDir(dataDir)) snapshot, err = ss.Load() } else { snapshot, err = snap.Read(lg, filepath.Join(snapDir(dataDir), *snapfile)) @@ -381,7 +381,7 @@ func listEntriesType(entrytype string, streamdecoder string, ents []raftpb.Entry printer(e) if streamdecoder == "" { fmt.Println() - //continue + continue } // if decoder is set, pass the e.Data to stdin and read the stdout from decoder diff --git a/tools/etcd-dump-logs/raw.go b/tools/etcd-dump-logs/raw.go index 85259430a32..6a76022e880 100644 --- 
a/tools/etcd-dump-logs/raw.go +++ b/tools/etcd-dump-logs/raw.go @@ -19,11 +19,10 @@ import ( "fmt" "io" "io/ioutil" + "log" "os" "path/filepath" - "go.uber.org/zap" - "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/pkg/v3/pbutil" @@ -32,19 +31,20 @@ import ( "go.etcd.io/raft/v3/raftpb" ) -func readRaw(lg *zap.Logger, fromIndex *uint64, waldir string, out io.Writer) { +func readRaw(fromIndex *uint64, waldir string, out io.Writer) { var walReaders []fileutil.FileReader files, err := ioutil.ReadDir(waldir) if err != nil { - lg.Fatal("Failed to read directory.", zap.String("directory", waldir), zap.Error(err)) + log.Fatalf("Error: Failed to read directory '%s' error:%v", waldir, err) } for _, finfo := range files { if filepath.Ext(finfo.Name()) != ".wal" { - lg.Warn("Ignoring not .wal file", zap.String("filename", finfo.Name())) + log.Printf("Warning: Ignoring not .wal file: %s", finfo.Name()) + continue } f, err := os.Open(filepath.Join(waldir, finfo.Name())) if err != nil { - lg.Fatal("Failed to read file", zap.String("filename", finfo.Name()), zap.Error(err)) + log.Printf("Error: Failed to read file: %s . 
error:%v", finfo.Name(), err) } walReaders = append(walReaders, fileutil.NewFileReader(f)) } @@ -56,10 +56,10 @@ func readRaw(lg *zap.Logger, fromIndex *uint64, waldir string, out io.Writer) { err := decoder.Decode(&rec) if err == nil || errors.Is(err, walpb.ErrCRCMismatch) { if err != nil && !crcDesync { - lg.Warn("Reading entry failed with CRC error", zap.Error(err)) + log.Printf("Error: Reading entry failed with CRC error: %c", err) crcDesync = true } - printRec(lg, &rec, fromIndex, out) + printRec(&rec, fromIndex, out) if rec.Type == wal.CrcType { decoder.UpdateCRC(rec.Crc) crcDesync = false @@ -70,13 +70,13 @@ func readRaw(lg *zap.Logger, fromIndex *uint64, waldir string, out io.Writer) { fmt.Fprintf(out, "EOF: All entries were processed.\n") break } else { - lg.Error("Reading failed", zap.Error(err)) + log.Printf("Error: Reading failed: %v", err) break } } } -func printRec(lg *zap.Logger, rec *walpb.Record, fromIndex *uint64, out io.Writer) { +func printRec(rec *walpb.Record, fromIndex *uint64, out io.Writer) { switch rec.Type { case wal.MetadataType: var metadata etcdserverpb.Metadata @@ -102,6 +102,6 @@ func printRec(lg *zap.Logger, rec *walpb.Record, fromIndex *uint64, out io.Write fmt.Fprintf(out, "HardState: %s\n", state.String()) } default: - lg.Error("Unexpected WAL log type", zap.Int64("type", rec.Type)) + log.Printf("Unexpected WAL log type: %d", rec.Type) } } diff --git a/tools/etcd-dump-logs/raw_test.go b/tools/etcd-dump-logs/raw_test.go index bc485c2049c..d1509ec4ea3 100644 --- a/tools/etcd-dump-logs/raw_test.go +++ b/tools/etcd-dump-logs/raw_test.go @@ -18,14 +18,13 @@ import ( "testing" "github.com/stretchr/testify/assert" - "go.uber.org/zap/zaptest" ) func Test_readRaw(t *testing.T) { path := t.TempDir() mustCreateWalLog(t, path) var out bytes.Buffer - readRaw(zaptest.NewLogger(t), nil, walDir(path), &out) + readRaw(nil, walDir(path), &out) assert.Equal(t, `CRC: 0 Metadata: