diff --git a/go.mod b/go.mod index 61f5e15a5..70d095941 100644 --- a/go.mod +++ b/go.mod @@ -14,9 +14,9 @@ require ( k8s.io/apimachinery v0.18.1 k8s.io/client-go v11.0.1-0.20190805182717-6502b5e7b1b5+incompatible k8s.io/code-generator v0.18.0 - knative.dev/pkg v0.0.0-20200618002824-96c250871fac + knative.dev/pkg v0.0.0-20200619020725-7df8fc5d7743 knative.dev/sample-controller v0.0.0-20200510050845-bf7c19498b7e - knative.dev/test-infra v0.0.0-20200617235125-6382dba95484 + knative.dev/test-infra v0.0.0-20200618184825-a7b2980a8884 ) replace ( diff --git a/go.sum b/go.sum index e4d87ecfa..ff9bdb61b 100644 --- a/go.sum +++ b/go.sum @@ -877,6 +877,8 @@ go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3 h1:8sGtKOrtQqkN1bp2AtX+misvLIlOmsEsNd+9NIcPEm8= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.opencensus.io v0.22.4 h1:LYy1Hy3MJdrCdMwwzxA/dRok4ejH+RwNGbuoD9fCjto= +go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= @@ -1386,6 +1388,8 @@ knative.dev/pkg v0.0.0-20200603222317-b79e4a24ca50/go.mod h1:8IfPj/lpuKHHg82xZCl knative.dev/pkg v0.0.0-20200611204322-2ddcfef739a2/go.mod h1:rA+FklsrVahwF4a+D63NyHJlzDoAFH81K4J5CYuE3bA= knative.dev/pkg v0.0.0-20200618002824-96c250871fac h1:X8XHaSFsUIW2IJCIEQEzNfPbs/gGib3CUK/+lkZuoEo= knative.dev/pkg v0.0.0-20200618002824-96c250871fac/go.mod h1:4ty6MSlNjZk5qBaGb3Gt4gopjMD4gRknfTABblcFpQ8= +knative.dev/pkg v0.0.0-20200619020725-7df8fc5d7743 h1:W1NKMizoXYYX5e2mkFXnn21T7X6ROKKwL8YetGu7xCQ= +knative.dev/pkg v0.0.0-20200619020725-7df8fc5d7743/go.mod h1:DquzK0hsLDcg2q63Sn+CngAyRwv4cKMpt5F19YzBfb0= knative.dev/sample-controller v0.0.0-20200510050845-bf7c19498b7e h1:I6nRhlOCuFMShAMRhbe9c0+pbfQHttUWZMqVVtRnNt0= knative.dev/sample-controller v0.0.0-20200510050845-bf7c19498b7e/go.mod h1:D2ZDLrR9Dq9LiiVN7TatzI7WMcEPgk1MHbbhgBKE6W8= knative.dev/test-infra v0.0.0-20200407185800-1b88cb3b45a5/go.mod h1:xcdUkMJrLlBswIZqL5zCuBFOC22WIPMQoVX1L35i0vQ= @@ -1401,6 +1405,8 @@ knative.dev/test-infra v0.0.0-20200615231324-3a016f44102c h1:pzn7d3gVWX6p10CpdSF knative.dev/test-infra v0.0.0-20200615231324-3a016f44102c/go.mod h1:+BfrTJpc++rH30gX/C0QY6NT2eYVzycll52uw6CrQnc= knative.dev/test-infra v0.0.0-20200617235125-6382dba95484 h1:5D1Fm6aA1T1QQXLb1HkJ5t8gB9pTkhLYak1CCqIP+pE= knative.dev/test-infra v0.0.0-20200617235125-6382dba95484/go.mod h1:+BfrTJpc++rH30gX/C0QY6NT2eYVzycll52uw6CrQnc= +knative.dev/test-infra v0.0.0-20200618184825-a7b2980a8884 h1:qGxu/U/8VxhAuyFedrrne4s0vfY+YfoRwJJCY0AKpbw= +knative.dev/test-infra v0.0.0-20200618184825-a7b2980a8884/go.mod h1:qeiTuhDKO/HHheqVfepbxy5/q+O9toSJW6CO/DgjxFY= modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw= modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk= modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k= diff --git a/pkg/client/injection/reconciler/networking/v1alpha1/certificate/reconciler.go b/pkg/client/injection/reconciler/networking/v1alpha1/certificate/reconciler.go index ac18c1f79..ba098eba3 100644 --- a/pkg/client/injection/reconciler/networking/v1alpha1/certificate/reconciler.go +++ 
b/pkg/client/injection/reconciler/networking/v1alpha1/certificate/reconciler.go @@ -181,7 +181,7 @@ func (r *reconcilerImpl) Reconcile(ctx context.Context, key string) error { // updates regardless of whether the reconciliation errored out. reconcileEvent = r.reconciler.ReconcileKind(ctx, resource) - reconciler.PostProcessReconcile(ctx, resource) + reconciler.PostProcessReconcile(ctx, resource, original) } else if fin, ok := r.reconciler.(Finalizer); ok { // Append the target method to the logger. diff --git a/pkg/client/injection/reconciler/networking/v1alpha1/ingress/reconciler.go b/pkg/client/injection/reconciler/networking/v1alpha1/ingress/reconciler.go index e447d4de4..d68923a77 100644 --- a/pkg/client/injection/reconciler/networking/v1alpha1/ingress/reconciler.go +++ b/pkg/client/injection/reconciler/networking/v1alpha1/ingress/reconciler.go @@ -170,7 +170,7 @@ func (r *reconcilerImpl) Reconcile(ctx context.Context, key string) error { // updates regardless of whether the reconciliation errored out. reconcileEvent = r.reconciler.ReconcileKind(ctx, resource) - reconciler.PostProcessReconcile(ctx, resource) + reconciler.PostProcessReconcile(ctx, resource, original) } else if fin, ok := r.reconciler.(Finalizer); ok { // Append the target method to the logger. diff --git a/pkg/client/injection/reconciler/networking/v1alpha1/serverlessservice/reconciler.go b/pkg/client/injection/reconciler/networking/v1alpha1/serverlessservice/reconciler.go index 42dc01860..98a9807a9 100644 --- a/pkg/client/injection/reconciler/networking/v1alpha1/serverlessservice/reconciler.go +++ b/pkg/client/injection/reconciler/networking/v1alpha1/serverlessservice/reconciler.go @@ -170,7 +170,7 @@ func (r *reconcilerImpl) Reconcile(ctx context.Context, key string) error { // updates regardless of whether the reconciliation errored out. reconcileEvent = r.reconciler.ReconcileKind(ctx, resource) - reconciler.PostProcessReconcile(ctx, resource) + reconciler.PostProcessReconcile(ctx, resource, original) } else if fin, ok := r.reconciler.(Finalizer); ok { // Append the target method to the logger. diff --git a/vendor/go.opencensus.io/Makefile b/vendor/go.opencensus.io/Makefile index 457866cb1..b3ce3df30 100644 --- a/vendor/go.opencensus.io/Makefile +++ b/vendor/go.opencensus.io/Makefile @@ -8,7 +8,7 @@ ALL_PKGS := $(shell go list $(sort $(dir $(ALL_SRC)))) GOTEST_OPT?=-v -race -timeout 30s GOTEST_OPT_WITH_COVERAGE = $(GOTEST_OPT) -coverprofile=coverage.txt -covermode=atomic GOTEST=go test -GOFMT=gofmt +GOIMPORTS=goimports GOLINT=golint GOVET=go vet EMBEDMD=embedmd @@ -17,14 +17,14 @@ TRACE_ID_LINT_EXCEPTION="type name will be used as trace.TraceID by other packag TRACE_OPTION_LINT_EXCEPTION="type name will be used as trace.TraceOptions by other packages" README_FILES := $(shell find . 
-name '*README.md' | sort | tr '\n' ' ') -.DEFAULT_GOAL := fmt-lint-vet-embedmd-test +.DEFAULT_GOAL := imports-lint-vet-embedmd-test -.PHONY: fmt-lint-vet-embedmd-test -fmt-lint-vet-embedmd-test: fmt lint vet embedmd test +.PHONY: imports-lint-vet-embedmd-test +imports-lint-vet-embedmd-test: imports lint vet embedmd test # TODO enable test-with-coverage in tavis .PHONY: travis-ci -travis-ci: fmt lint vet embedmd test test-386 +travis-ci: imports lint vet embedmd test test-386 all-pkgs: @echo $(ALL_PKGS) | tr ' ' '\n' | sort @@ -44,15 +44,15 @@ test-386: test-with-coverage: $(GOTEST) $(GOTEST_OPT_WITH_COVERAGE) $(ALL_PKGS) -.PHONY: fmt -fmt: - @FMTOUT=`$(GOFMT) -s -l $(ALL_SRC) 2>&1`; \ - if [ "$$FMTOUT" ]; then \ - echo "$(GOFMT) FAILED => gofmt the following files:\n"; \ - echo "$$FMTOUT\n"; \ +.PHONY: imports +imports: + @IMPORTSOUT=`$(GOIMPORTS) -l $(ALL_SRC) 2>&1`; \ + if [ "$$IMPORTSOUT" ]; then \ + echo "$(GOIMPORTS) FAILED => goimports the following files:\n"; \ + echo "$$IMPORTSOUT\n"; \ exit 1; \ else \ - echo "Fmt finished successfully"; \ + echo "Imports finished successfully"; \ fi .PHONY: lint @@ -91,6 +91,7 @@ embedmd: .PHONY: install-tools install-tools: - go get -u golang.org/x/tools/cmd/cover go get -u golang.org/x/lint/golint + go get -u golang.org/x/tools/cmd/cover + go get -u golang.org/x/tools/cmd/goimports go get -u github.com/rakyll/embedmd diff --git a/vendor/go.opencensus.io/plugin/ocgrpc/client.go b/vendor/go.opencensus.io/plugin/ocgrpc/client.go index 28fddb844..2063b6f76 100644 --- a/vendor/go.opencensus.io/plugin/ocgrpc/client.go +++ b/vendor/go.opencensus.io/plugin/ocgrpc/client.go @@ -16,8 +16,8 @@ package ocgrpc import ( "context" - "go.opencensus.io/trace" + "go.opencensus.io/trace" "google.golang.org/grpc/stats" ) diff --git a/vendor/go.opencensus.io/plugin/ocgrpc/server.go b/vendor/go.opencensus.io/plugin/ocgrpc/server.go index 15ada839d..8a53e0972 100644 --- a/vendor/go.opencensus.io/plugin/ocgrpc/server.go +++ b/vendor/go.opencensus.io/plugin/ocgrpc/server.go @@ -16,9 +16,10 @@ package ocgrpc import ( "context" - "go.opencensus.io/trace" "google.golang.org/grpc/stats" + + "go.opencensus.io/trace" ) // ServerHandler implements gRPC stats.Handler recording OpenCensus stats and diff --git a/vendor/go.opencensus.io/plugin/ocgrpc/trace_common.go b/vendor/go.opencensus.io/plugin/ocgrpc/trace_common.go index fef582756..61bc543d0 100644 --- a/vendor/go.opencensus.io/plugin/ocgrpc/trace_common.go +++ b/vendor/go.opencensus.io/plugin/ocgrpc/trace_common.go @@ -15,16 +15,16 @@ package ocgrpc import ( + "context" "strings" "google.golang.org/grpc/codes" - - "context" - "go.opencensus.io/trace" - "go.opencensus.io/trace/propagation" "google.golang.org/grpc/metadata" "google.golang.org/grpc/stats" "google.golang.org/grpc/status" + + "go.opencensus.io/trace" + "go.opencensus.io/trace/propagation" ) const traceContextKey = "grpc-trace-bin" diff --git a/vendor/go.opencensus.io/plugin/ochttp/propagation/b3/b3.go b/vendor/go.opencensus.io/plugin/ochttp/propagation/b3/b3.go index 2f1c7f006..9ad885219 100644 --- a/vendor/go.opencensus.io/plugin/ochttp/propagation/b3/b3.go +++ b/vendor/go.opencensus.io/plugin/ochttp/propagation/b3/b3.go @@ -68,7 +68,7 @@ func ParseTraceID(tid string) (trace.TraceID, bool) { return trace.TraceID{}, false } b, err := hex.DecodeString(tid) - if err != nil { + if err != nil || len(b) > 16 { return trace.TraceID{}, false } var traceID trace.TraceID @@ -90,7 +90,7 @@ func ParseSpanID(sid string) (spanID trace.SpanID, ok bool) { return 
trace.SpanID{}, false } b, err := hex.DecodeString(sid) - if err != nil { + if err != nil || len(b) > 8 { return trace.SpanID{}, false } start := 8 - len(b) diff --git a/vendor/go.opencensus.io/stats/record.go b/vendor/go.opencensus.io/stats/record.go index ad4691184..2b9728346 100644 --- a/vendor/go.opencensus.io/stats/record.go +++ b/vendor/go.opencensus.io/stats/record.go @@ -31,10 +31,19 @@ func init() { } } +// Recorder provides an interface for exporting measurement information from +// the static Record method by using the WithRecorder option. +type Recorder interface { + // Record records a set of measurements associated with the given tags and attachments. + // The second argument is a `[]Measurement`. + Record(*tag.Map, interface{}, map[string]interface{}) +} + type recordOptions struct { attachments metricdata.Attachments mutators []tag.Mutator measurements []Measurement + recorder Recorder } // WithAttachments applies provided exemplar attachments. @@ -58,6 +67,14 @@ func WithMeasurements(measurements ...Measurement) Options { } } +// WithRecorder records the measurements to the specified `Recorder`, rather +// than to the global metrics recorder. +func WithRecorder(meter Recorder) Options { + return func(ro *recordOptions) { + ro.recorder = meter + } +} + // Options apply changes to recordOptions. type Options func(*recordOptions) @@ -93,6 +110,9 @@ func RecordWithOptions(ctx context.Context, ros ...Options) error { return nil } recorder := internal.DefaultRecorder + if o.recorder != nil { + recorder = o.recorder.Record + } if recorder == nil { return nil } diff --git a/vendor/go.opencensus.io/stats/view/export.go b/vendor/go.opencensus.io/stats/view/export.go index 7cb59718f..73ba11f5b 100644 --- a/vendor/go.opencensus.io/stats/view/export.go +++ b/vendor/go.opencensus.io/stats/view/export.go @@ -14,13 +14,6 @@ package view -import "sync" - -var ( - exportersMu sync.RWMutex // guards exporters - exporters = make(map[Exporter]struct{}) -) - // Exporter exports the collected records as view data. // // The ExportView method should return quickly; if an @@ -43,16 +36,10 @@ type Exporter interface { // // Binaries can register exporters, libraries shouldn't register exporters. func RegisterExporter(e Exporter) { - exportersMu.Lock() - defer exportersMu.Unlock() - - exporters[e] = struct{}{} + defaultWorker.RegisterExporter(e) } // UnregisterExporter unregisters an exporter. 
func UnregisterExporter(e Exporter) { - exportersMu.Lock() - defer exportersMu.Unlock() - - delete(exporters, e) + defaultWorker.UnregisterExporter(e) } diff --git a/vendor/go.opencensus.io/stats/view/view_to_metric.go b/vendor/go.opencensus.io/stats/view/view_to_metric.go index 293c1646d..5e1656a1f 100644 --- a/vendor/go.opencensus.io/stats/view/view_to_metric.go +++ b/vendor/go.opencensus.io/stats/view/view_to_metric.go @@ -18,6 +18,8 @@ package view import ( "time" + "go.opencensus.io/resource" + "go.opencensus.io/metric/metricdata" "go.opencensus.io/stats" ) @@ -125,7 +127,7 @@ func rowToTimeseries(v *viewInternal, row *Row, now time.Time, startTime time.Ti } } -func viewToMetric(v *viewInternal, now time.Time, startTime time.Time) *metricdata.Metric { +func viewToMetric(v *viewInternal, r *resource.Resource, now time.Time, startTime time.Time) *metricdata.Metric { if v.metricDescriptor.Type == metricdata.TypeGaugeInt64 || v.metricDescriptor.Type == metricdata.TypeGaugeFloat64 { startTime = time.Time{} @@ -144,6 +146,7 @@ func viewToMetric(v *viewInternal, now time.Time, startTime time.Time) *metricda m := &metricdata.Metric{ Descriptor: *v.metricDescriptor, TimeSeries: ts, + Resource: r, } return m } diff --git a/vendor/go.opencensus.io/stats/view/worker.go b/vendor/go.opencensus.io/stats/view/worker.go index 2f3c018af..ab8bfd46d 100644 --- a/vendor/go.opencensus.io/stats/view/worker.go +++ b/vendor/go.opencensus.io/stats/view/worker.go @@ -20,6 +20,8 @@ import ( "sync" "time" + "go.opencensus.io/resource" + "go.opencensus.io/metric/metricdata" "go.opencensus.io/metric/metricproducer" "go.opencensus.io/stats" @@ -28,7 +30,7 @@ import ( ) func init() { - defaultWorker = newWorker() + defaultWorker = NewMeter().(*worker) go defaultWorker.start() internal.DefaultRecorder = record } @@ -47,8 +49,69 @@ type worker struct { c chan command quit, done chan bool mu sync.RWMutex + r *resource.Resource + + exportersMu sync.RWMutex + exporters map[Exporter]struct{} +} + +// Meter defines an interface which allows a single process to maintain +// multiple sets of metrics exports (intended for the advanced case where a +// single process wants to report metrics about multiple objects, such as +// multiple databases or HTTP services). +// +// Note that this is an advanced use case, and the static functions in this +// module should cover the common use cases. +type Meter interface { + stats.Recorder + // Find returns a registered view associated with this name. + // If no registered view is found, nil is returned. + Find(name string) *View + // Register begins collecting data for the given views. + // Once a view is registered, it reports data to the registered exporters. + Register(views ...*View) error + // Unregister the given views. Data will not longer be exported for these views + // after Unregister returns. + // It is not necessary to unregister from views you expect to collect for the + // duration of your program execution. + Unregister(views ...*View) + // SetReportingPeriod sets the interval between reporting aggregated views in + // the program. If duration is less than or equal to zero, it enables the + // default behavior. + // + // Note: each exporter makes different promises about what the lowest supported + // duration is. For example, the Stackdriver exporter recommends a value no + // lower than 1 minute. Consult each exporter per your needs. + SetReportingPeriod(time.Duration) + + // RegisterExporter registers an exporter. 
+ // Collected data will be reported via all the + // registered exporters. Once you no longer + // want data to be exported, invoke UnregisterExporter + // with the previously registered exporter. + // + // Binaries can register exporters, libraries shouldn't register exporters. + RegisterExporter(Exporter) + // UnregisterExporter unregisters an exporter. + UnregisterExporter(Exporter) + // SetResource may be used to set the Resource associated with this registry. + // This is intended to be used in cases where a single process exports metrics + // for multiple Resources, typically in a multi-tenant situation. + SetResource(*resource.Resource) + + // Start causes the Meter to start processing Record calls and aggregating + // statistics as well as exporting data. + Start() + // Stop causes the Meter to stop processing calls and terminate data export. + Stop() + + // RetrieveData gets a snapshot of the data collected for the the view registered + // with the given name. It is intended for testing only. + RetrieveData(viewName string) ([]*Row, error) } +var _ Meter = (*worker)(nil) + var defaultWorker *worker var defaultReportingDuration = 10 * time.Second @@ -56,11 +119,17 @@ var defaultReportingDuration = 10 * time.Second // Find returns a registered view associated with this name. // If no registered view is found, nil is returned. func Find(name string) (v *View) { + return defaultWorker.Find(name) +} + +// Find returns a registered view associated with this name. +// If no registered view is found, nil is returned. +func (w *worker) Find(name string) (v *View) { req := &getViewByNameReq{ name: name, c: make(chan *getViewByNameResp), } - defaultWorker.c <- req + w.c <- req resp := <-req.c return resp.v } @@ -68,11 +137,17 @@ func Find(name string) (v *View) { // Register begins collecting data for the given views. // Once a view is registered, it reports data to the registered exporters. func Register(views ...*View) error { + return defaultWorker.Register(views...) +} + +// Register begins collecting data for the given views. +// Once a view is registered, it reports data to the registered exporters. +func (w *worker) Register(views ...*View) error { req := ®isterViewReq{ views: views, err: make(chan error), } - defaultWorker.c <- req + w.c <- req return <-req.err } @@ -81,6 +156,14 @@ func Register(views ...*View) error { // It is not necessary to unregister from views you expect to collect for the // duration of your program execution. func Unregister(views ...*View) { + defaultWorker.Unregister(views...) +} + +// Unregister the given views. Data will not longer be exported for these views +// after Unregister returns. +// It is not necessary to unregister from views you expect to collect for the +// duration of your program execution. +func (w *worker) Unregister(views ...*View) { names := make([]string, len(views)) for i := range views { names[i] = views[i].Name @@ -89,31 +172,42 @@ func Unregister(views ...*View) { views: names, done: make(chan struct{}), } - defaultWorker.c <- req + w.c <- req <-req.done } // RetrieveData gets a snapshot of the data collected for the the view registered // with the given name. It is intended for testing only. func RetrieveData(viewName string) ([]*Row, error) { + return defaultWorker.RetrieveData(viewName) +} + +// RetrieveData gets a snapshot of the data collected for the the view registered +// with the given name. It is intended for testing only. 
+func (w *worker) RetrieveData(viewName string) ([]*Row, error) { req := &retrieveDataReq{ now: time.Now(), v: viewName, c: make(chan *retrieveDataResp), } - defaultWorker.c <- req + w.c <- req resp := <-req.c return resp.rows, resp.err } func record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) { + defaultWorker.Record(tags, ms, attachments) +} + +// Record records a set of measurements ms associated with the given tags and attachments. +func (w *worker) Record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) { req := &recordReq{ tm: tags, ms: ms.([]stats.Measurement), attachments: attachments, t: time.Now(), } - defaultWorker.c <- req + w.c <- req } // SetReportingPeriod sets the interval between reporting aggregated views in @@ -124,17 +218,31 @@ func record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) { // duration is. For example, the Stackdriver exporter recommends a value no // lower than 1 minute. Consult each exporter per your needs. func SetReportingPeriod(d time.Duration) { + defaultWorker.SetReportingPeriod(d) +} + +// SetReportingPeriod sets the interval between reporting aggregated views in +// the program. If duration is less than or equal to zero, it enables the +// default behavior. +// +// Note: each exporter makes different promises about what the lowest supported +// duration is. For example, the Stackdriver exporter recommends a value no +// lower than 1 minute. Consult each exporter per your needs. +func (w *worker) SetReportingPeriod(d time.Duration) { // TODO(acetechnologist): ensure that the duration d is more than a certain // value. e.g. 1s req := &setReportingPeriodReq{ d: d, c: make(chan bool), } - defaultWorker.c <- req + w.c <- req <-req.c // don't return until the timer is set to the new duration. } -func newWorker() *worker { +// NewMeter constructs a Meter instance. You should only need to use this if +// you need to separate out Measurement recordings and View aggregations within +// a single process. +func NewMeter() Meter { return &worker{ measures: make(map[string]*measureRef), views: make(map[string]*viewInternal), @@ -143,9 +251,23 @@ func newWorker() *worker { c: make(chan command, 1024), quit: make(chan bool), done: make(chan bool), + + exporters: make(map[Exporter]struct{}), } } +// SetResource associates all data collected by this Meter with the specified +// resource. This resource is reported when using metricexport.ReadAndExport; +// it is not provided when used with ExportView/RegisterExporter, because that +// interface does not provide a means for reporting the Resource. 
+func (w *worker) SetResource(r *resource.Resource) { + w.r = r +} + +func (w *worker) Start() { + go w.start() +} + func (w *worker) start() { prodMgr := metricproducer.GlobalManager() prodMgr.AddProducer(w) @@ -155,7 +277,7 @@ func (w *worker) start() { case cmd := <-w.c: cmd.handleCommand(w) case <-w.timer.C: - w.reportUsage(time.Now()) + w.reportUsage() case <-w.quit: w.timer.Stop() close(w.c) @@ -165,7 +287,7 @@ func (w *worker) start() { } } -func (w *worker) stop() { +func (w *worker) Stop() { prodMgr := metricproducer.GlobalManager() prodMgr.DeleteProducer(w) @@ -202,44 +324,45 @@ func (w *worker) tryRegisterView(v *View) (*viewInternal, error) { return x, nil } w.views[vi.view.Name] = vi + w.startTimes[vi] = time.Now() ref := w.getMeasureRef(vi.view.Measure.Name()) ref.views[vi] = struct{}{} return vi, nil } -func (w *worker) unregisterView(viewName string) { +func (w *worker) unregisterView(v *viewInternal) { w.mu.Lock() defer w.mu.Unlock() - delete(w.views, viewName) + delete(w.views, v.view.Name) + delete(w.startTimes, v) + if measure := w.measures[v.view.Measure.Name()]; measure != nil { + delete(measure.views, v) + } } -func (w *worker) reportView(v *viewInternal, now time.Time) { +func (w *worker) reportView(v *viewInternal) { if !v.isSubscribed() { return } rows := v.collectedRows() - _, ok := w.startTimes[v] - if !ok { - w.startTimes[v] = now - } viewData := &Data{ View: v.view, Start: w.startTimes[v], End: time.Now(), Rows: rows, } - exportersMu.Lock() - for e := range exporters { + w.exportersMu.Lock() + defer w.exportersMu.Unlock() + for e := range w.exporters { e.ExportView(viewData) } - exportersMu.Unlock() } -func (w *worker) reportUsage(now time.Time) { +func (w *worker) reportUsage() { w.mu.Lock() defer w.mu.Unlock() for _, v := range w.views { - w.reportView(v, now) + w.reportView(v) } } @@ -248,11 +371,6 @@ func (w *worker) toMetric(v *viewInternal, now time.Time) *metricdata.Metric { return nil } - _, ok := w.startTimes[v] - if !ok { - w.startTimes[v] = now - } - var startTime time.Time if v.metricDescriptor.Type == metricdata.TypeGaugeInt64 || v.metricDescriptor.Type == metricdata.TypeGaugeFloat64 { @@ -261,7 +379,7 @@ func (w *worker) toMetric(v *viewInternal, now time.Time) *metricdata.Metric { startTime = w.startTimes[v] } - return viewToMetric(v, now, startTime) + return viewToMetric(v, w.r, now, startTime) } // Read reads all view data and returns them as metrics. @@ -279,3 +397,17 @@ func (w *worker) Read() []*metricdata.Metric { } return metrics } + +func (w *worker) RegisterExporter(e Exporter) { + w.exportersMu.Lock() + defer w.exportersMu.Unlock() + + w.exporters[e] = struct{}{} +} + +func (w *worker) UnregisterExporter(e Exporter) { + w.exportersMu.Lock() + defer w.exportersMu.Unlock() + + delete(w.exporters, e) +} diff --git a/vendor/go.opencensus.io/stats/view/worker_commands.go b/vendor/go.opencensus.io/stats/view/worker_commands.go index 0267e179a..9ac4cc059 100644 --- a/vendor/go.opencensus.io/stats/view/worker_commands.go +++ b/vendor/go.opencensus.io/stats/view/worker_commands.go @@ -95,7 +95,7 @@ func (cmd *unregisterFromViewReq) handleCommand(w *worker) { } // Report pending data for this view before removing it. - w.reportView(vi, time.Now()) + w.reportView(vi) vi.unsubscribe() if !vi.isSubscribed() { @@ -103,7 +103,7 @@ func (cmd *unregisterFromViewReq) handleCommand(w *worker) { // The collected data can be cleared. 
vi.clearRows() } - w.unregisterView(name) + w.unregisterView(vi) } cmd.done <- struct{}{} } @@ -163,7 +163,7 @@ func (cmd *recordReq) handleCommand(w *worker) { } ref := w.getMeasureRef(m.Measure().Name()) for v := range ref.views { - v.addSample(cmd.tm, m.Value(), cmd.attachments, time.Now()) + v.addSample(cmd.tm, m.Value(), cmd.attachments, cmd.t) } } } diff --git a/vendor/go.opencensus.io/trace/lrumap.go b/vendor/go.opencensus.io/trace/lrumap.go index dc7a295c7..908c2497e 100644 --- a/vendor/go.opencensus.io/trace/lrumap.go +++ b/vendor/go.opencensus.io/trace/lrumap.go @@ -44,7 +44,7 @@ func (lm lruMap) len() int { } func (lm lruMap) keys() []interface{} { - keys := []interface{}{} + keys := make([]interface{}, len(lm.cacheKeys)) for k := range lm.cacheKeys { keys = append(keys, k) } diff --git a/vendor/go.opencensus.io/trace/trace.go b/vendor/go.opencensus.io/trace/trace.go index 3f8977b41..125e2cd90 100644 --- a/vendor/go.opencensus.io/trace/trace.go +++ b/vendor/go.opencensus.io/trace/trace.go @@ -345,7 +345,7 @@ func (s *Span) SetStatus(status Status) { } func (s *Span) interfaceArrayToLinksArray() []Link { - linksArr := make([]Link, 0) + linksArr := make([]Link, 0, len(s.links.queue)) for _, value := range s.links.queue { linksArr = append(linksArr, value.(Link)) } @@ -353,7 +353,7 @@ func (s *Span) interfaceArrayToLinksArray() []Link { } func (s *Span) interfaceArrayToMessageEventArray() []MessageEvent { - messageEventArr := make([]MessageEvent, 0) + messageEventArr := make([]MessageEvent, 0, len(s.messageEvents.queue)) for _, value := range s.messageEvents.queue { messageEventArr = append(messageEventArr, value.(MessageEvent)) } @@ -361,7 +361,7 @@ func (s *Span) interfaceArrayToMessageEventArray() []MessageEvent { } func (s *Span) interfaceArrayToAnnotationArray() []Annotation { - annotationArr := make([]Annotation, 0) + annotationArr := make([]Annotation, 0, len(s.annotations.queue)) for _, value := range s.annotations.queue { annotationArr = append(annotationArr, value.(Annotation)) } @@ -369,7 +369,7 @@ func (s *Span) interfaceArrayToAnnotationArray() []Annotation { } func (s *Span) lruAttributesToAttributeMap() map[string]interface{} { - attributes := make(map[string]interface{}) + attributes := make(map[string]interface{}, s.lruAttributes.len()) for _, key := range s.lruAttributes.keys() { value, ok := s.lruAttributes.get(key) if ok { @@ -420,7 +420,7 @@ func (s *Span) lazyPrintfInternal(attributes []Attribute, format string, a ...in var m map[string]interface{} s.mu.Lock() if len(attributes) != 0 { - m = make(map[string]interface{}) + m = make(map[string]interface{}, len(attributes)) copyAttributes(m, attributes) } s.annotations.add(Annotation{ @@ -436,7 +436,7 @@ func (s *Span) printStringInternal(attributes []Attribute, str string) { var a map[string]interface{} s.mu.Lock() if len(attributes) != 0 { - a = make(map[string]interface{}) + a = make(map[string]interface{}, len(attributes)) copyAttributes(a, attributes) } s.annotations.add(Annotation{ diff --git a/vendor/knative.dev/pkg/codegen/cmd/injection-gen/generators/reconciler_reconciler.go b/vendor/knative.dev/pkg/codegen/cmd/injection-gen/generators/reconciler_reconciler.go index 647b82fd0..8b4d0c733 100644 --- a/vendor/knative.dev/pkg/codegen/cmd/injection-gen/generators/reconciler_reconciler.go +++ b/vendor/knative.dev/pkg/codegen/cmd/injection-gen/generators/reconciler_reconciler.go @@ -320,7 +320,7 @@ func (r *reconcilerImpl) Reconcile(ctx {{.contextContext|raw}}, key string) erro reconcileEvent = 
r.reconciler.ReconcileKind(ctx, resource) {{if .isKRShaped}} - reconciler.PostProcessReconcile(ctx, resource) + reconciler.PostProcessReconcile(ctx, resource, original) {{end}} } else if fin, ok := r.reconciler.(Finalizer); ok { // Append the target method to the logger. diff --git a/vendor/knative.dev/pkg/configmap/parse.go b/vendor/knative.dev/pkg/configmap/parse.go index 210d2114e..1ef675818 100644 --- a/vendor/knative.dev/pkg/configmap/parse.go +++ b/vendor/knative.dev/pkg/configmap/parse.go @@ -77,6 +77,20 @@ func AsInt64(key string, target *int64) ParseFunc { } } +// AsUint32 parses the value at key as an uint32 into the target, if it exists. +func AsUint32(key string, target *uint32) ParseFunc { + return func(data map[string]string) error { + if raw, ok := data[key]; ok { + val, err := strconv.ParseUint(raw, 10, 32) + if err != nil { + return fmt.Errorf("failed to parse %q: %w", key, err) + } + *target = uint32(val) + } + return nil + } +} + // AsFloat64 parses the value at key as a float64 into the target, if it exists. func AsFloat64(key string, target *float64) ParseFunc { return func(data map[string]string) error { diff --git a/vendor/knative.dev/pkg/controller/controller.go b/vendor/knative.dev/pkg/controller/controller.go index 3981c5820..6024ec253 100644 --- a/vendor/knative.dev/pkg/controller/controller.go +++ b/vendor/knative.dev/pkg/controller/controller.go @@ -36,8 +36,10 @@ import ( "k8s.io/client-go/util/workqueue" "knative.dev/pkg/kmeta" + kle "knative.dev/pkg/leaderelection" "knative.dev/pkg/logging" "knative.dev/pkg/logging/logkey" + "knative.dev/pkg/reconciler" ) const ( @@ -176,6 +178,10 @@ func FilterWithNameAndNamespace(namespace, name string) func(obj interface{}) bo // Impl is our core controller implementation. It handles queuing and feeding work // from the queue to an implementation of Reconciler. type Impl struct { + // Name is the unique name for this controller workqueue within this process. + // This is used for surfacing metrics, and per-controller leader election. + Name string + // Reconciler is the workhorse of this controller, it is fed the keys // from the workqueue to process. Public for testing. Reconciler Reconciler @@ -205,7 +211,9 @@ func NewImpl(r Reconciler, logger *zap.SugaredLogger, workQueueName string) *Imp } func NewImplWithStats(r Reconciler, logger *zap.SugaredLogger, workQueueName string, reporter StatsReporter) *Impl { + logger = logger.Named(workQueueName) return &Impl{ + Name: workQueueName, Reconciler: r, WorkQueue: workqueue.NewNamedRateLimitingQueue( workqueue.DefaultControllerRateLimiter(), @@ -341,6 +349,14 @@ func (c *Impl) EnqueueKey(key types.NamespacedName) { c.logger.Debugf("Adding to queue %s (depth: %d)", safeKey(key), c.WorkQueue.Len()) } +// MaybeEnqueueBucketKey takes a Bucket and namespace/name string and puts it onto the work queue. +func (c *Impl) MaybeEnqueueBucketKey(bkt reconciler.Bucket, key types.NamespacedName) { + if bkt.Has(key) { + c.WorkQueue.Add(key) + c.logger.Debugf("Adding to queue %s (depth: %d)", safeKey(key), c.WorkQueue.Len()) + } +} + // EnqueueKeyAfter takes a namespace/name string and schedules its execution in // the work queue after given delay. func (c *Impl) EnqueueKeyAfter(key types.NamespacedName, delay time.Duration) { @@ -349,10 +365,12 @@ func (c *Impl) EnqueueKeyAfter(key types.NamespacedName, delay time.Duration) { } // RunContext starts the controller's worker threads, the number of which is threadiness. 
+// If the context has been decorated for LeaderElection, then an elector is built and run. // It then blocks until the context is cancelled, at which point it shuts down its // internal work queue and waits for workers to finish processing their current // work items. func (c *Impl) RunContext(ctx context.Context, threadiness int) error { + logger := c.logger defer runtime.HandleCrash() sg := sync.WaitGroup{} defer sg.Wait() @@ -363,8 +381,20 @@ func (c *Impl) RunContext(ctx context.Context, threadiness int) error { } }() + if la, ok := c.Reconciler.(reconciler.LeaderAware); ok { + // Build and execute an elector. + le, err := kle.BuildElector(ctx, la, c.Name, c.MaybeEnqueueBucketKey) + if err != nil { + return err + } + sg.Add(1) + go func() { + defer sg.Done() + le.Run(ctx) + }() + } + // Launch workers to process resources that get enqueued to our workqueue. - logger := c.logger logger.Info("Starting controller and workers") for i := 0; i < threadiness; i++ { sg.Add(1) diff --git a/vendor/knative.dev/pkg/injection/README.md b/vendor/knative.dev/pkg/injection/README.md index d30b090bc..e23ff32f5 100644 --- a/vendor/knative.dev/pkg/injection/README.md +++ b/vendor/knative.dev/pkg/injection/README.md @@ -471,7 +471,7 @@ reconciler.PreProcessReconcile(ctx, resource) reconcileEvent = r.reconciler.ReconcileKind(ctx, resource) -reconciler.PostProcessReconcile(ctx, resource) +reconciler.PostProcessReconcile(ctx, resource, oldResource) ``` #### Stubs diff --git a/vendor/knative.dev/pkg/injection/sharedmain/main.go b/vendor/knative.dev/pkg/injection/sharedmain/main.go index 1410138e1..5c2ba94a3 100644 --- a/vendor/knative.dev/pkg/injection/sharedmain/main.go +++ b/vendor/knative.dev/pkg/injection/sharedmain/main.go @@ -52,6 +52,7 @@ import ( "knative.dev/pkg/logging" "knative.dev/pkg/metrics" "knative.dev/pkg/profiling" + "knative.dev/pkg/reconciler" "knative.dev/pkg/signals" "knative.dev/pkg/system" "knative.dev/pkg/version" @@ -399,6 +400,11 @@ func SecretFetcher(ctx context.Context) metrics.SecretFetcher { func ControllersAndWebhooksFromCtors(ctx context.Context, cmw *configmap.InformedWatcher, ctors ...injection.ControllerConstructor) ([]*controller.Impl, []interface{}) { + + // Check whether the context has been infused with a leader elector builder. + // If it has, then every reconciler we plan to start MUST implement LeaderAware. + leEnabled := kle.HasLeaderElection(ctx) + controllers := make([]*controller.Impl, 0, len(ctors)) webhooks := make([]interface{}, 0) for _, cf := range ctors { @@ -410,6 +416,12 @@ func ControllersAndWebhooksFromCtors(ctx context.Context, case webhook.AdmissionController, webhook.ConversionController: webhooks = append(webhooks, c) } + + if leEnabled { + if _, ok := ctrl.Reconciler.(reconciler.LeaderAware); !ok { + log.Fatalf("%T is not leader-aware, all reconcilers must be leader-aware to enable fine-grained leader election.", ctrl.Reconciler) + } + } } return controllers, webhooks diff --git a/vendor/knative.dev/pkg/leaderelection/config.go b/vendor/knative.dev/pkg/leaderelection/config.go index 06f0874e2..44378d30c 100644 --- a/vendor/knative.dev/pkg/leaderelection/config.go +++ b/vendor/knative.dev/pkg/leaderelection/config.go @@ -30,6 +30,10 @@ import ( const configMapNameEnv = "CONFIG_LEADERELECTION_NAME" +// MaxBuckets is the maximum number of buckets to allow users to define. +// This is a variable so that it may be customized in the binary entrypoint. 
+var MaxBuckets uint32 = 10 + var validResourceLocks = sets.NewString("leases", "configmaps", "endpoints") // NewConfigFromMap returns a Config for the given map, or an error. @@ -43,6 +47,8 @@ func NewConfigFromMap(data map[string]string) (*Config, error) { cm.AsDuration("renewDeadline", &config.RenewDeadline), cm.AsDuration("retryPeriod", &config.RetryPeriod), + cm.AsUint32("buckets", &config.Buckets), + // enabledComponents are not validated here, because they are dependent on // the component. Components should provide additional validation for this // field. @@ -51,6 +57,9 @@ func NewConfigFromMap(data map[string]string) (*Config, error) { return nil, err } + if config.Buckets < 1 || config.Buckets > MaxBuckets { + return nil, fmt.Errorf("buckets: value must be between %d <= %d <= %d", 1, config.Buckets, MaxBuckets) + } if !validResourceLocks.Has(config.ResourceLock) { return nil, fmt.Errorf(`resourceLock: invalid value %q: valid values are "leases","configmaps","endpoints"`, config.ResourceLock) } @@ -72,6 +81,7 @@ func NewConfigFromConfigMap(configMap *corev1.ConfigMap) (*Config, error) { // single source repository, viz: serving or eventing. type Config struct { ResourceLock string + Buckets uint32 LeaseDuration time.Duration RenewDeadline time.Duration RetryPeriod time.Duration @@ -83,6 +93,7 @@ func (c *Config) GetComponentConfig(name string) ComponentConfig { return ComponentConfig{ Component: name, LeaderElect: true, + Buckets: c.Buckets, ResourceLock: c.ResourceLock, LeaseDuration: c.LeaseDuration, RenewDeadline: c.RenewDeadline, @@ -96,6 +107,7 @@ func (c *Config) GetComponentConfig(name string) ComponentConfig { func defaultConfig() *Config { return &Config{ ResourceLock: "leases", + Buckets: 1, LeaseDuration: 15 * time.Second, RenewDeadline: 10 * time.Second, RetryPeriod: 2 * time.Second, @@ -107,6 +119,7 @@ func defaultConfig() *Config { type ComponentConfig struct { Component string LeaderElect bool + Buckets uint32 ResourceLock string LeaseDuration time.Duration RenewDeadline time.Duration diff --git a/vendor/knative.dev/pkg/leaderelection/context.go b/vendor/knative.dev/pkg/leaderelection/context.go new file mode 100644 index 000000000..a0d8555f9 --- /dev/null +++ b/vendor/knative.dev/pkg/leaderelection/context.go @@ -0,0 +1,223 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leaderelection + +import ( + "context" + "fmt" + "hash/fnv" + "strings" + "sync" + + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "knative.dev/pkg/logging" + "knative.dev/pkg/reconciler" + "knative.dev/pkg/system" +) + +// WithStandardLeaderElectorBuilder infuses a context with the ability to build +// LeaderElectors with the provided component configuration acquiring resource +// locks via the provided kubernetes client. 
+func WithStandardLeaderElectorBuilder(ctx context.Context, kc kubernetes.Interface, cc ComponentConfig) context.Context { + return context.WithValue(ctx, builderKey{}, &standardBuilder{ + kc: kc, + lec: cc, + }) +} + +// HasLeaderElection returns whether there is leader election configuration +// associated with the context +func HasLeaderElection(ctx context.Context) bool { + val := ctx.Value(builderKey{}) + return val != nil +} + +// Elector is the interface for running a leader elector. +type Elector interface { + Run(context.Context) +} + +// BuildElector builds a leaderelection.LeaderElector for the named LeaderAware +// reconciler using a builder added to the context via WithStandardLeaderElectorBuilder. +func BuildElector(ctx context.Context, la reconciler.LeaderAware, name string, enq func(reconciler.Bucket, types.NamespacedName)) (Elector, error) { + if val := ctx.Value(builderKey{}); val != nil { + switch builder := val.(type) { + case *standardBuilder: + return builder.BuildElector(ctx, la, name, enq) + } + // TODO(mattmoor): Add a flavor of builder that relies on StatefulSet to partition the key space. + } + + return &unopposedElector{ + la: la, + bkt: reconciler.UniversalBucket(), + enq: enq, + }, nil +} + +type builderKey struct{} + +type standardBuilder struct { + kc kubernetes.Interface + lec ComponentConfig +} + +func (b *standardBuilder) BuildElector(ctx context.Context, la reconciler.LeaderAware, name string, enq func(reconciler.Bucket, types.NamespacedName)) (Elector, error) { + logger := logging.FromContext(ctx) + + id, err := UniqueID() + if err != nil { + return nil, err + } + + buckets := make([]Elector, 0, b.lec.Buckets) + for i := uint32(0); i < b.lec.Buckets; i++ { + bkt := &bucket{ + component: b.lec.Component, + name: name, + index: i, + total: b.lec.Buckets, + } + + rl, err := resourcelock.New(b.lec.ResourceLock, + system.Namespace(), // use namespace we are running in + bkt.Name(), + b.kc.CoreV1(), + b.kc.CoordinationV1(), + resourcelock.ResourceLockConfig{ + Identity: id, + }) + if err != nil { + return nil, err + } + logger.Infof("%s will run in leader-elected mode with id %q", bkt.Name(), rl.Identity()) + + le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ + Lock: rl, + LeaseDuration: b.lec.LeaseDuration, + RenewDeadline: b.lec.RenewDeadline, + RetryPeriod: b.lec.RetryPeriod, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(context.Context) { + logger.Infof("%q has started leading %q", rl.Identity(), bkt.Name()) + if err := la.Promote(bkt, enq); err != nil { + // TODO(mattmoor): We expect this to effectively never happen, + // but if it does, we should support wrapping `le` in an elector + // we can cancel here. + logger.Fatalf("%q failed to Promote: %v", rl.Identity(), err) + } + }, + OnStoppedLeading: func() { + logger.Infof("%q has stopped leading %q", rl.Identity(), bkt.Name()) + la.Demote(bkt) + }, + }, + ReleaseOnCancel: true, + + Name: rl.Identity(), + }) + if err != nil { + return nil, err + } + // TODO: use health check watchdog, knative/pkg#1048 + // if lec.WatchDog != nil { + // lec.WatchDog.SetLeaderElection(le) + // } + buckets = append(buckets, &runUntilCancelled{Elector: le}) + } + return &runAll{les: buckets}, nil +} + +// unopposedElector promotes when run without needing to be elected. 
+type unopposedElector struct { + bkt reconciler.Bucket + la reconciler.LeaderAware + enq func(reconciler.Bucket, types.NamespacedName) +} + +// Run implements Elector +func (ue *unopposedElector) Run(ctx context.Context) { + ue.la.Promote(ue.bkt, ue.enq) +} + +type runAll struct { + les []Elector +} + +// Run implements Elector +func (ra *runAll) Run(ctx context.Context) { + sg := sync.WaitGroup{} + defer sg.Wait() + + for _, le := range ra.les { + sg.Add(1) + go func(le Elector) { + defer sg.Done() + le.Run(ctx) + }(le) + } +} + +// runUntilCancelled wraps a single-term Elector into one that runs until +// the passed context is cancelled. +type runUntilCancelled struct { + // Elector is a single-term elector as we get from K8s leaderelection package. + Elector +} + +// Run implements Elector +func (ruc *runUntilCancelled) Run(ctx context.Context) { + // Turn the single-term elector into a continuous election cycle. + for { + ruc.Elector.Run(ctx) + select { + case <-ctx.Done(): + return // Run quit because context was cancelled, we are done! + default: + // Context wasn't cancelled, start over. + } + } +} + +type bucket struct { + component string + name string + + // We are bucket {index} of {total} + index uint32 + total uint32 +} + +var _ reconciler.Bucket = (*bucket)(nil) + +// Name implements reconciler.Bucket +func (b *bucket) Name() string { + // The resource name is the lowercase: + // {component}.{workqueue}.{index}-of-{total} + return strings.ToLower(fmt.Sprintf("%s.%s.%02d-of-%02d", b.component, b.name, b.index, b.total)) +} + +// Has implements reconciler.Bucket +func (b *bucket) Has(nn types.NamespacedName) bool { + h := fnv.New32a() + h.Write([]byte(nn.Namespace + "." + nn.Name)) + ii := h.Sum32() % b.total + return b.index == ii +} diff --git a/vendor/knative.dev/pkg/reconciler/leader.go b/vendor/knative.dev/pkg/reconciler/leader.go new file mode 100644 index 000000000..3d5f68fdc --- /dev/null +++ b/vendor/knative.dev/pkg/reconciler/leader.go @@ -0,0 +1,118 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reconciler + +import ( + "sync" + + "k8s.io/apimachinery/pkg/types" +) + +// Bucket is an opaque type used to scope leadership. +type Bucket interface { + // Name returns a string representing this bucket, which uniquely + // identifies the bucket and is suitable for use as a resource lock name. + Name() string + + // Has determines whether this Bucket contains a particular key. + Has(key types.NamespacedName) bool +} + +// UniversalBucket returns a Bucket that "Has()" all keys. +func UniversalBucket() Bucket { + return &bucket{} +} + +type bucket struct{} + +var _ Bucket = (*bucket)(nil) + +// Name implements Bucket +func (b *bucket) Name() string { + return "" +} + +// Has implements Bucket +func (b *bucket) Has(nn types.NamespacedName) bool { + return true +} + +// LeaderAware is implemented by Reconcilers that are aware of their leader status. 
+type LeaderAware interface { + // Promote is called when we become the leader of a given Bucket. It must be + // supplied with an enqueue function through which a Bucket resync may be triggered. + Promote(b Bucket, enq func(Bucket, types.NamespacedName)) error + + // Demote is called when we stop being the leader for the specified Bucket. + Demote(Bucket) +} + +// LeaderAwareFuncs implements LeaderAware using the given functions for handling +// promotion and demotion. +type LeaderAwareFuncs struct { + sync.RWMutex + buckets map[string]Bucket + + PromoteFunc func(b Bucket, enq func(Bucket, types.NamespacedName)) error + DemoteFunc func(b Bucket) +} + +var _ LeaderAware = (*LeaderAwareFuncs)(nil) + +// IsLeaderFor implements LeaderAware +func (laf *LeaderAwareFuncs) IsLeaderFor(key types.NamespacedName) bool { + laf.RLock() + defer laf.RUnlock() + + for _, bkt := range laf.buckets { + if bkt.Has(key) { + return true + } + } + return false +} + +// Promote implements LeaderAware +func (laf *LeaderAwareFuncs) Promote(b Bucket, enq func(Bucket, types.NamespacedName)) error { + func() { + laf.Lock() + defer laf.Unlock() + if laf.buckets == nil { + laf.buckets = make(map[string]Bucket, 1) + } + laf.buckets[b.Name()] = b + return + }() + + if promote := laf.PromoteFunc; promote != nil { + return promote(b, enq) + } + return nil +} + +// Demote implements LeaderAware +func (laf *LeaderAwareFuncs) Demote(b Bucket) { + func() { + laf.Lock() + defer laf.Unlock() + delete(laf.buckets, b.Name()) + }() + + if demote := laf.DemoteFunc; demote != nil { + demote(b) + } +} diff --git a/vendor/knative.dev/pkg/reconciler/reconcile_common.go b/vendor/knative.dev/pkg/reconciler/reconcile_common.go index c9f8b5cb8..a807edfb6 100644 --- a/vendor/knative.dev/pkg/reconciler/reconcile_common.go +++ b/vendor/knative.dev/pkg/reconciler/reconcile_common.go @@ -18,6 +18,10 @@ package reconciler import ( "context" + "reflect" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -50,7 +54,7 @@ func PreProcessReconcile(ctx context.Context, resource duckv1.KRShaped) { } // PostProcessReconcile contains logic to apply after reconciliation of a resource. -func PostProcessReconcile(ctx context.Context, resource duckv1.KRShaped) { +func PostProcessReconcile(ctx context.Context, resource, oldResource duckv1.KRShaped) { logger := logging.FromContext(ctx) status := resource.GetStatus() mgr := resource.GetConditionSet().Manage(status) @@ -64,4 +68,26 @@ func PostProcessReconcile(ctx context.Context, resource duckv1.KRShaped) { } else if rc.Reason == failedGenerationBump { logger.Warn("A reconciler observed a new generation without updating the resource status") } + + groomConditionsTransitionTime(resource, oldResource) +} + +// groomConditionsTransitionTime ensures that the LastTransitionTime only advances for resources +// where the condition has changed during reconciliation. This also ensures that all advanced +// conditions share the same timestamp. 
+func groomConditionsTransitionTime(resource, oldResource duckv1.KRShaped) { + now := apis.VolatileTime{Inner: metav1.NewTime(time.Now())} + sts := resource.GetStatus() + for i := range sts.Conditions { + cond := &sts.Conditions[i] + + if oldCond := oldResource.GetStatus().GetCondition(cond.Type); oldCond != nil { + cond.LastTransitionTime = oldCond.LastTransitionTime + if reflect.DeepEqual(cond, oldCond) { + continue + } + } + + cond.LastTransitionTime = now + } } diff --git a/vendor/knative.dev/pkg/test/presubmit-tests.sh b/vendor/knative.dev/pkg/test/presubmit-tests.sh index 261599f71..af8199dd0 100755 --- a/vendor/knative.dev/pkg/test/presubmit-tests.sh +++ b/vendor/knative.dev/pkg/test/presubmit-tests.sh @@ -37,4 +37,16 @@ function pre_build_tests() { return 0 } +# Run the unit tests with an additional flag '-mod=vendor' to avoid +# downloading the deps in unit tests CI job +function unit_tests() { + # Run the default way. + default_unit_test_runner || failed=1 + + # Run unit testing select packages without race detection, + # so that they may use: // +build !race + report_go_test ./leaderelection || failed=1 + +} + main $@ diff --git a/vendor/modules.txt b/vendor/modules.txt index de329179e..b4a6aae57 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -178,7 +178,7 @@ github.com/rogpeppe/go-internal/module github.com/rogpeppe/go-internal/semver # github.com/spf13/pflag v1.0.5 github.com/spf13/pflag -# go.opencensus.io v0.22.3 +# go.opencensus.io v0.22.4 go.opencensus.io go.opencensus.io/internal go.opencensus.io/internal/tagencoding @@ -707,7 +707,7 @@ k8s.io/utils/buffer k8s.io/utils/integer k8s.io/utils/pointer k8s.io/utils/trace -# knative.dev/pkg v0.0.0-20200618002824-96c250871fac +# knative.dev/pkg v0.0.0-20200619020725-7df8fc5d7743 ## explicit knative.dev/pkg/apis knative.dev/pkg/apis/duck @@ -784,7 +784,7 @@ knative.dev/sample-controller/pkg/client/injection/informers/samples/v1alpha1/ad knative.dev/sample-controller/pkg/client/injection/reconciler/samples/v1alpha1/addressableservice knative.dev/sample-controller/pkg/client/listers/samples/v1alpha1 knative.dev/sample-controller/pkg/reconciler/addressableservice -# knative.dev/test-infra v0.0.0-20200617235125-6382dba95484 +# knative.dev/test-infra v0.0.0-20200618184825-a7b2980a8884 ## explicit knative.dev/test-infra/scripts # sigs.k8s.io/yaml v1.2.0
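
Below the patch, for reference: a minimal, hypothetical sketch of the multi-Meter support vendored in by the go.opencensus.io v0.22.4 bump above, exercising view.NewMeter, stats.Recorder, stats.WithRecorder, and Meter.RetrieveData exactly as they are declared in the hunks in this diff. The measure name, view name, and main-package layout are invented for illustration only and are not part of this change.

package main

import (
	"context"
	"fmt"
	"log"

	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
)

// requestCount is a hypothetical measure, used only for this sketch.
var requestCount = stats.Int64("example.com/measures/request_count", "number of requests", stats.UnitDimensionless)

func main() {
	// A dedicated Meter keeps its own views and exporters, separate from
	// the process-global worker behind view.Register / stats.Record.
	m := view.NewMeter()
	m.Start()
	defer m.Stop()

	if err := m.Register(&view.View{
		Name:        "example.com/views/request_count",
		Description: "number of requests",
		Measure:     requestCount,
		Aggregation: view.Count(),
	}); err != nil {
		log.Fatal(err)
	}

	// WithRecorder (added in stats/record.go above) routes the measurement
	// to this Meter instead of the global recorder.
	if err := stats.RecordWithOptions(context.Background(),
		stats.WithRecorder(m),
		stats.WithMeasurements(requestCount.M(1)),
	); err != nil {
		log.Fatal(err)
	}

	// RetrieveData is intended for testing; it snapshots the rows this
	// Meter has aggregated for the named view.
	rows, err := m.RetrieveData("example.com/views/request_count")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("collected rows:", len(rows))
}

A Meter built this way can also be given its own exporters via RegisterExporter and its own resource via SetResource, per the Meter interface added in stats/view/worker.go above.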