k8s range lots of pods(about 300k pods) in 100 threads, cause etcd OOM #12342

Closed
yangxuanjia opened this issue Sep 28, 2020 · 5 comments · May be fixed by #12343

@yangxuanjia
Contributor

Related memory leak: it looks like KeyValue Unmarshal or txReadBuffer is the cause; see #12256

@ptabor
Contributor

ptabor commented Oct 23, 2020

Could you please create a repro in the form of a test?

@yangxuanjia
Contributor Author

@ptabor
This bug is difficult to reproduce in a unit test. We reproduced it by setting up a cluster of physical machines and simulating simultaneous range requests from many clients, each fetching 300k pods. But I have found the essence of the problem: on every range request that fetches 300k pods, the kvs array accumulates in memory. Once more than 100 threads issue requests at the same time, it is easy to exhaust etcd's memory and hit OOM.
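
For a rough sense of scale (assuming, hypothetically, ~1 KiB per serialized pod): one range response over 300k pods holds roughly 300k × 1 KiB ≈ 300 MiB of unmarshalled KeyValue data at once, so 100 concurrent requests pin on the order of 30 GiB before any gRPC or protobuf overhead.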

@yangxuanjia
Contributor Author

yangxuanjia commented Oct 23, 2020

I wrote a standalone program that makes the case easy to reproduce, unit-test style:

package main

import (
	"bytes"
	"fmt"
	"net/http"
	_ "net/http/pprof"
	"sort"
	"strconv"
	"sync"
	"time"

	go_bytesize "github.com/inhies/go-bytesize"
	go_metrics "github.com/rcrowley/go-metrics"
)

// txBuffer handles functionality shared between txWriteBuffer and txReadBuffer.
type txBuffer struct {
	buckets map[string]*bucketBuffer
}

func (txb *txBuffer) reset() {
	for k, v := range txb.buckets {
		if v.used == 0 {
			// demote
			delete(txb.buckets, k)
		}
		v.used = 0
	}
}

// txWriteBuffer buffers writes of pending updates that have not yet committed.
type txWriteBuffer struct {
	txBuffer
	seq bool
}

func (txw *txWriteBuffer) put(bucket, k, v []byte) {
	txw.seq = false
	txw.putSeq(bucket, k, v)
}

func (txw *txWriteBuffer) putSeq(bucket, k, v []byte) {
	b, ok := txw.buckets[string(bucket)]
	if !ok {
		b = newBucketBuffer()
		txw.buckets[string(bucket)] = b
	}
	b.add(k, v)
}

func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
	for k, wb := range txw.buckets {
		rb, ok := txr.buckets[k]
		if !ok {
			delete(txw.buckets, k)
			txr.buckets[k] = wb
			continue
		}
		if !txw.seq && wb.used > 1 {
			// assume no duplicate keys
			sort.Sort(wb)
		}
		rb.merge(wb)
	}
	txw.reset()
}

// txReadBuffer accesses buffered updates.
type txReadBuffer struct{ txBuffer }

func (txr *txReadBuffer) Range(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
	if b := txr.buckets[string(bucketName)]; b != nil {
		return b.Range(key, endKey, limit)
	}
	return nil, nil
}

func (txr *txReadBuffer) ForEach(bucketName []byte, visitor func(k, v []byte) error) error {
	if b := txr.buckets[string(bucketName)]; b != nil {
		return b.ForEach(visitor)
	}
	return nil
}

// unsafeCopy returns a copy of txReadBuffer, caller should acquire backend.readTx.RLock()
func (txr *txReadBuffer) unsafeCopy() txReadBuffer {
	txrCopy := txReadBuffer{
		txBuffer: txBuffer{
			buckets: make(map[string]*bucketBuffer, len(txr.txBuffer.buckets)),
		},
	}
	for bucketName, bucket := range txr.txBuffer.buckets {
		txrCopy.txBuffer.buckets[bucketName] = bucket.Copy()
	}
	return txrCopy
}
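
// NOTE (added for illustration): unsafeCopy duplicates every bucketBuffer in
// the read buffer. In the repro below (3000 buckets of 3000 entries), each
// buf has grown to 3888 slots (512 doubled by 1.5x five times), so one copy
// allocates roughly 3000 * 3888 * 48 B ≈ 530 MiB of kv slice headers alone,
// even though the underlying key/value bytes are shared. 100 concurrent
// copies therefore pin tens of GiB.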

type kv struct {
	key []byte
	val []byte
}

// bucketBuffer buffers key-value pairs that are pending commit.
type bucketBuffer struct {
	buf []kv
	// used tracks number of elements in use so buf can be reused without reallocation.
	used int
}

func newBucketBuffer() *bucketBuffer {
	return &bucketBuffer{buf: make([]kv, 512), used: 0}
}

func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
	f := func(i int) bool { return bytes.Compare(bb.buf[i].key, key) >= 0 }
	idx := sort.Search(bb.used, f)
	if idx < 0 {
		return nil, nil
	}
	if len(endKey) == 0 {
		if bytes.Equal(key, bb.buf[idx].key) {
			keys = append(keys, bb.buf[idx].key)
			vals = append(vals, bb.buf[idx].val)
		}
		return keys, vals
	}
	if bytes.Compare(endKey, bb.buf[idx].key) <= 0 {
		return nil, nil
	}
	for i := idx; i < bb.used && int64(len(keys)) < limit; i++ {
		if bytes.Compare(endKey, bb.buf[i].key) <= 0 {
			break
		}
		keys = append(keys, bb.buf[i].key)
		vals = append(vals, bb.buf[i].val)
	}
	return keys, vals
}

func (bb *bucketBuffer) ForEach(visitor func(k, v []byte) error) error {
	for i := 0; i < bb.used; i++ {
		if err := visitor(bb.buf[i].key, bb.buf[i].val); err != nil {
			return err
		}
	}
	return nil
}

func (bb *bucketBuffer) add(k, v []byte) {
	bb.buf[bb.used].key, bb.buf[bb.used].val = k, v
	bb.used++
	if bb.used == len(bb.buf) {
		buf := make([]kv, (3*len(bb.buf))/2)
		copy(buf, bb.buf)
		bb.buf = buf
	}
}

// merge merges data from bbsrc into bb.
func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) {
	for i := 0; i < bbsrc.used; i++ {
		bb.add(bbsrc.buf[i].key, bbsrc.buf[i].val)
	}
	if bb.used == bbsrc.used {
		return
	}
	if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 {
		return
	}

	sort.Stable(bb)

	// remove duplicates, using only newest update
	widx := 0
	for ridx := 1; ridx < bb.used; ridx++ {
		if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {
			widx++
		}
		bb.buf[widx] = bb.buf[ridx]
	}
	bb.used = widx + 1
}

func (bb *bucketBuffer) Len() int { return bb.used }
func (bb *bucketBuffer) Less(i, j int) bool {
	return bytes.Compare(bb.buf[i].key, bb.buf[j].key) < 0
}
func (bb *bucketBuffer) Swap(i, j int) { bb.buf[i], bb.buf[j] = bb.buf[j], bb.buf[i] }

func (bb *bucketBuffer) Copy() *bucketBuffer {
	bbCopy := bucketBuffer{
		buf:  make([]kv, len(bb.buf)),
		used: bb.used,
	}
	copy(bbCopy.buf, bb.buf)
	return &bbCopy
}



func main() {
	// Never written to; used only to block main at the end so pprof stays up.
	cc := make(chan []int, 1)

	go func() {
		http.ListenAndServe("0.0.0.0:8080", nil)
	}()

	go func() {
		registry := go_metrics.NewRegistry()
		go_metrics.RegisterRuntimeMemStats(registry)
		go_metrics.RegisterDebugGCStats(registry)

		go go_metrics.CaptureRuntimeMemStats(registry, 10 * time.Second)
		go go_metrics.CaptureDebugGCStats(registry, 10 * time.Second)

		t := time.NewTimer(10 * time.Second)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				for kk, vv := range registry.GetAll() {
					for k, v := range vv {
						var ff float64
						switch vvv := v.(type) {
						case int64:
							ff = float64(vvv)
						case float64:
							ff = vvv
						default:
							fmt.Printf("unhandled value type, kk: %s, k: %s, v: %v\n", kk, k, vvv)
						}

						bb := go_bytesize.New(ff)
						fmt.Printf("%s, %s, %s \n", kk, k, bb.String())
					}
				}
				fmt.Println("---------------------------------------")
				t.Reset(10 * time.Second)
			}
		}
	}()

	// Build a read buffer with 3000 buckets of 3000 entries each.
	// Note: strconv.Itoa is needed here; string(i) on an int yields the rune
	// with that code point, not its decimal representation.
	txReadBuf := new(txReadBuffer)
	txReadBuf.buckets = make(map[string]*bucketBuffer)
	for i := 0; i < 3000; i++ {
		name := strconv.Itoa(i)
		txReadBuf.buckets[name] = newBucketBuffer()
		for j := 0; j < 3000; j++ {
			suffix := name + "_" + strconv.Itoa(j)
			txReadBuf.buckets[name].add([]byte("aaaaa"+suffix), []byte("bbbbb"+suffix))
		}
	}

	wg := sync.WaitGroup{}

	// Spawn 100 goroutines, each taking a full copy of the read buffer,
	// mimicking 100 concurrent range requests.
	allocCopy := func() {
		for k := 0; k < 100; k++ {
			wg.Add(1)
			go func(kk int) {
				defer wg.Done()
				a1 := txReadBuf.unsafeCopy()
				fmt.Printf("golang thread %d, len: %d\n", kk, len(a1.buckets))
			}(k)

			time.Sleep(200 * time.Millisecond)
		}
	}

	allocCopy()

	wg.Wait()
	fmt.Printf("golang thread run finish!!! \n")
	fmt.Printf("single run start!!! \n")

	for k := 0; k < 10000; k++ {
		a1 := txReadBuf.unsafeCopy()
		fmt.Printf("single run number %d, len: %d\n", k, len(a1.buckets))
		time.Sleep(200 * time.Millisecond)
	}

	// Block forever so the pprof endpoint stays available.
	<-cc
}
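
To watch the heap while this runs: the program serves net/http/pprof on :8080, so `go tool pprof http://localhost:8080/debug/pprof/heap` shows live heap profiles alongside the runtime metrics it prints every 10 seconds.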

@yangxuanjia
Contributor Author

yangxuanjia commented Dec 11, 2020

Using get() to fetch 300k pods from 100 threads, memory goes OOM:
[screenshot: memory usage graph]

Using getStream() to fetch 300k pods from 300 threads, memory occupies about 48G and then stays roughly stable:
[screenshot: Screenshot from 2020-12-11 18-18-33]

[screenshot: Screenshot from 2020-12-11 18-26-20]
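
(Presumably the streaming path serializes and releases each chunk of the result before fetching the next, so resident memory scales with chunk size × concurrent streams rather than full result size × concurrent requests, which would explain why it plateaus instead of OOMing.)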

@stale

stale bot commented Mar 11, 2021

This issue has been automatically marked as stale because it has not had recent activity. It will be closed after 21 days if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale label Mar 11, 2021
@stale stale bot closed this as completed Apr 2, 2021