mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-10-31 02:08:13 +00:00 
			
		
		
		
	Factored TimedObserver into less surprising pieces
This commit is contained in:
		| @@ -61,13 +61,13 @@ func handleError(w http.ResponseWriter, r *http.Request, err error) { | ||||
| // requestWatermark is used to track maximal numbers of requests in a particular phase of handling | ||||
| type requestWatermark struct { | ||||
| 	phase                                string | ||||
| 	readOnlyObserver, mutatingObserver   fcmetrics.TimedObserver | ||||
| 	readOnlyObserver, mutatingObserver   fcmetrics.RatioedChangeObserver | ||||
| 	lock                                 sync.Mutex | ||||
| 	readOnlyWatermark, mutatingWatermark int | ||||
| } | ||||
|  | ||||
| func (w *requestWatermark) recordMutating(mutatingVal int) { | ||||
| 	w.mutatingObserver.Set(float64(mutatingVal)) | ||||
| 	w.mutatingObserver.Observe(float64(mutatingVal)) | ||||
|  | ||||
| 	w.lock.Lock() | ||||
| 	defer w.lock.Unlock() | ||||
| @@ -78,7 +78,7 @@ func (w *requestWatermark) recordMutating(mutatingVal int) { | ||||
| } | ||||
|  | ||||
| func (w *requestWatermark) recordReadOnly(readOnlyVal int) { | ||||
| 	w.readOnlyObserver.Set(float64(readOnlyVal)) | ||||
| 	w.readOnlyObserver.Observe(float64(readOnlyVal)) | ||||
|  | ||||
| 	w.lock.Lock() | ||||
| 	defer w.lock.Unlock() | ||||
| @@ -132,11 +132,11 @@ func WithMaxInFlightLimit( | ||||
| 	var mutatingChan chan bool | ||||
| 	if nonMutatingLimit != 0 { | ||||
| 		nonMutatingChan = make(chan bool, nonMutatingLimit) | ||||
| 		watermark.readOnlyObserver.SetX1(float64(nonMutatingLimit)) | ||||
| 		watermark.readOnlyObserver.SetDenominator(float64(nonMutatingLimit)) | ||||
| 	} | ||||
| 	if mutatingLimit != 0 { | ||||
| 		mutatingChan = make(chan bool, mutatingLimit) | ||||
| 		watermark.mutatingObserver.SetX1(float64(mutatingLimit)) | ||||
| 		watermark.mutatingObserver.SetDenominator(float64(mutatingLimit)) | ||||
| 	} | ||||
|  | ||||
| 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
|   | ||||
| @@ -102,8 +102,8 @@ type configController struct { | ||||
| 	name                  string // varies in tests of fighting controllers | ||||
| 	clock                 clock.PassiveClock | ||||
| 	queueSetFactory       fq.QueueSetFactory | ||||
| 	reqsObsPairGenerator  metrics.TimedObserverPairGenerator | ||||
| 	execSeatsObsGenerator metrics.TimedObserverGenerator | ||||
| 	reqsObsPairGenerator  metrics.RatioedChangeObserverPairGenerator | ||||
| 	execSeatsObsGenerator metrics.RatioedChangeObserverGenerator | ||||
|  | ||||
| 	// How this controller appears in an ObjectMeta ManagedFieldsEntry.Manager | ||||
| 	asFieldManager string | ||||
| @@ -193,10 +193,10 @@ type priorityLevelState struct { | ||||
| 	numPending int | ||||
|  | ||||
| 	// Observers tracking number of requests waiting, executing | ||||
| 	reqsObsPair metrics.TimedObserverPair | ||||
| 	reqsObsPair metrics.RatioedChangeObserverPair | ||||
|  | ||||
| 	// Observer of number of seats occupied throughout execution | ||||
| 	execSeatsObs metrics.TimedObserver | ||||
| 	execSeatsObs metrics.RatioedChangeObserver | ||||
| } | ||||
|  | ||||
| // NewTestableController is extra flexible to facilitate testing | ||||
| @@ -693,7 +693,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() { | ||||
| // given priority level configuration.  Returns nil if that config | ||||
| // does not call for limiting.  Returns nil and an error if the given | ||||
| // object is malformed in a way that is a problem for this package. | ||||
| func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.TimedObserverPair, execSeatsObs metrics.TimedObserver) (fq.QueueSetCompleter, error) { | ||||
| func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedChangeObserverPair, execSeatsObs metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) { | ||||
| 	if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) { | ||||
| 		return nil, errors.New("broken union structure at the top") | ||||
| 	} | ||||
|   | ||||
| @@ -135,10 +135,10 @@ type TestableConfig struct { | ||||
| 	RequestWaitLimit time.Duration | ||||
|  | ||||
| 	// ObsPairGenerator for metrics about requests | ||||
| 	ReqsObsPairGenerator metrics.TimedObserverPairGenerator | ||||
| 	ReqsObsPairGenerator metrics.RatioedChangeObserverPairGenerator | ||||
|  | ||||
| 	// TimedObserverPairGenerator for metrics about seats occupied by all phases of execution | ||||
| 	ExecSeatsObsGenerator metrics.TimedObserverGenerator | ||||
| 	// RatioedChangeObserverPairGenerator for metrics about seats occupied by all phases of execution | ||||
| 	ExecSeatsObsGenerator metrics.RatioedChangeObserverGenerator | ||||
|  | ||||
| 	// QueueSetFactory for the queuing implementation | ||||
| 	QueueSetFactory fq.QueueSetFactory | ||||
|   | ||||
| @@ -105,7 +105,7 @@ type ctlrTestRequest struct { | ||||
| 	descr1, descr2 interface{} | ||||
| } | ||||
|  | ||||
| func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.TimedObserverPair, eso metrics.TimedObserver) (fq.QueueSetCompleter, error) { | ||||
| func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.RatioedChangeObserverPair, eso metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) { | ||||
| 	return ctlrTestQueueSetCompleter{cts, nil, qc}, nil | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -28,10 +28,9 @@ import ( | ||||
| // Integrator computes the moments of some variable X over time as | ||||
| // read from a particular clock.  The integrals start when the | ||||
| // Integrator is created, and ends at the latest operation on the | ||||
| // Integrator.  As a `metrics.TimedObserver` this fixes X1=1 and | ||||
| // ignores attempts to change X1. | ||||
| // Integrator. | ||||
| type Integrator interface { | ||||
| 	metrics.TimedObserver | ||||
| 	metrics.ChangeObserver | ||||
|  | ||||
| 	GetResults() IntegratorResults | ||||
|  | ||||
| @@ -70,10 +69,7 @@ func NewIntegrator(clock clock.PassiveClock) Integrator { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (igr *integrator) SetX1(x1 float64) { | ||||
| } | ||||
|  | ||||
| func (igr *integrator) Set(x float64) { | ||||
| func (igr *integrator) Observe(x float64) { | ||||
| 	igr.Lock() | ||||
| 	igr.setLocked(x) | ||||
| 	igr.Unlock() | ||||
|   | ||||
| @@ -38,7 +38,7 @@ func TestIntegrator(t *testing.T) { | ||||
| 	if !results.Equal(&rToo) { | ||||
| 		t.Errorf("expected %#+v, got %#+v", results, rToo) | ||||
| 	} | ||||
| 	igr.Set(2) | ||||
| 	igr.Observe(2) | ||||
| 	results = igr.GetResults() | ||||
| 	if e := (IntegratorResults{Duration: 0, Average: math.NaN(), Deviation: math.NaN(), Min: 2, Max: 3}); !e.Equal(&results) { | ||||
| 		t.Errorf("expected %#+v, got %#+v", e, results) | ||||
|   | ||||
| @@ -32,10 +32,10 @@ import ( | ||||
| // before committing to a concurrency allotment for the second. | ||||
| type QueueSetFactory interface { | ||||
| 	// BeginConstruction does the first phase of creating a QueueSet. | ||||
| 	// The TimedObserverPair observes number of requests, | ||||
| 	// The RatioedChangeObserverPair observes number of requests, | ||||
| 	// execution covering just the regular phase. | ||||
| 	// The TimedObserver observes number of seats occupied through all phases of execution. | ||||
| 	BeginConstruction(QueuingConfig, metrics.TimedObserverPair, metrics.TimedObserver) (QueueSetCompleter, error) | ||||
| 	// The RatioedChangeObserver observes number of seats occupied through all phases of execution. | ||||
| 	BeginConstruction(QueuingConfig, metrics.RatioedChangeObserverPair, metrics.RatioedChangeObserver) (QueueSetCompleter, error) | ||||
| } | ||||
|  | ||||
| // QueueSetCompleter finishes the two-step process of creating or | ||||
|   | ||||
| @@ -61,8 +61,8 @@ type promiseFactoryFactory func(*queueSet) promiseFactory | ||||
| // the fields `factory` and `theSet` is non-nil. | ||||
| type queueSetCompleter struct { | ||||
| 	factory      *queueSetFactory | ||||
| 	reqsObsPair  metrics.TimedObserverPair | ||||
| 	execSeatsObs metrics.TimedObserver | ||||
| 	reqsObsPair  metrics.RatioedChangeObserverPair | ||||
| 	execSeatsObs metrics.RatioedChangeObserver | ||||
| 	theSet       *queueSet | ||||
| 	qCfg         fq.QueuingConfig | ||||
| 	dealer       *shufflesharding.Dealer | ||||
| @@ -81,9 +81,9 @@ type queueSet struct { | ||||
| 	clock                    eventclock.Interface | ||||
| 	estimatedServiceDuration time.Duration | ||||
|  | ||||
| 	reqsObsPair metrics.TimedObserverPair // .RequestsExecuting covers regular phase only | ||||
| 	reqsObsPair metrics.RatioedChangeObserverPair // .RequestsExecuting covers regular phase only | ||||
|  | ||||
| 	execSeatsObs metrics.TimedObserver // for all phases of execution | ||||
| 	execSeatsObs metrics.RatioedChangeObserver // for all phases of execution | ||||
|  | ||||
| 	promiseFactory promiseFactory | ||||
|  | ||||
| @@ -148,7 +148,7 @@ func newTestableQueueSetFactory(c eventclock.Interface, promiseFactoryFactory pr | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsObsPair metrics.TimedObserverPair, execSeatsObs metrics.TimedObserver) (fq.QueueSetCompleter, error) { | ||||
| func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsObsPair metrics.RatioedChangeObserverPair, execSeatsObs metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) { | ||||
| 	dealer, err := checkConfig(qCfg) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| @@ -243,9 +243,9 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig, | ||||
| 	if qll < 1 { | ||||
| 		qll = 1 | ||||
| 	} | ||||
| 	qs.reqsObsPair.RequestsWaiting.SetX1(float64(qll)) | ||||
| 	qs.reqsObsPair.RequestsExecuting.SetX1(float64(dCfg.ConcurrencyLimit)) | ||||
| 	qs.execSeatsObs.SetX1(float64(dCfg.ConcurrencyLimit)) | ||||
| 	qs.reqsObsPair.RequestsWaiting.SetDenominator(float64(qll)) | ||||
| 	qs.reqsObsPair.RequestsExecuting.SetDenominator(float64(dCfg.ConcurrencyLimit)) | ||||
| 	qs.execSeatsObs.SetDenominator(float64(dCfg.ConcurrencyLimit)) | ||||
|  | ||||
| 	qs.dispatchAsMuchAsPossibleLocked() | ||||
| } | ||||
|   | ||||
| @@ -1461,10 +1461,10 @@ func newFIFO(requests ...*request) fifo { | ||||
| 	return l | ||||
| } | ||||
|  | ||||
| func newObserverPair(clk clock.PassiveClock) metrics.TimedObserverPair { | ||||
| func newObserverPair(clk clock.PassiveClock) metrics.RatioedChangeObserverPair { | ||||
| 	return metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"}) | ||||
| } | ||||
|  | ||||
| func newExecSeatsObserver(clk clock.PassiveClock) metrics.TimedObserver { | ||||
| func newExecSeatsObserver(clk clock.PassiveClock) metrics.RatioedChangeObserver { | ||||
| 	return metrics.PriorityLevelExecutionSeatsObserverGenerator.Generate(1, 1, []string{"test"}) | ||||
| } | ||||
|   | ||||
| @@ -40,7 +40,7 @@ type noRestraint struct{} | ||||
|  | ||||
| type noRestraintRequest struct{} | ||||
|  | ||||
| func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.TimedObserverPair, metrics.TimedObserver) (fq.QueueSetCompleter, error) { | ||||
| func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.RatioedChangeObserverPair, metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) { | ||||
| 	return noRestraintCompleter{}, nil | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -0,0 +1,65 @@ | ||||
| /* | ||||
| Copyright 2019 The Kubernetes Authors. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| package metrics | ||||
|  | ||||
| // Observer is something that can be given numeric observations. | ||||
| type Observer interface { | ||||
| 	// Observe takes an observation | ||||
| 	Observe(float64) | ||||
| } | ||||
|  | ||||
| //  ChangeObserver extends Observer with the ability to take | ||||
| // an observation that is relative to the previous observation. | ||||
| type ChangeObserver interface { | ||||
| 	Observer | ||||
|  | ||||
| 	// Observe a new value that differs by the given amount from the previous observation. | ||||
| 	Add(float64) | ||||
| } | ||||
|  | ||||
| // RatioedChangeObserver tracks ratios. | ||||
| // The numerator is set/changed through the ChangeObserver methods, | ||||
| // and the denominator can be updated through the SetDenominator method. | ||||
| // A ratio is tracked whenever the numerator is set/changed. | ||||
| type RatioedChangeObserver interface { | ||||
| 	ChangeObserver | ||||
|  | ||||
| 	// SetDenominator sets the denominator to use until it is changed again | ||||
| 	SetDenominator(float64) | ||||
| } | ||||
|  | ||||
| // RatioedChangeObserverGenerator creates related observers that are | ||||
| // differentiated by a series of label values | ||||
| type RatioedChangeObserverGenerator interface { | ||||
| 	Generate(initialNumerator, initialDenominator float64, labelValues []string) RatioedChangeObserver | ||||
| } | ||||
|  | ||||
| // RatioedChangeObserverPair is a corresponding pair of observers, one for the | ||||
| // number of requests waiting in queue(s) and one for the number of | ||||
| // requests being executed | ||||
| type RatioedChangeObserverPair struct { | ||||
| 	// RequestsWaiting is given observations of the number of currently queued requests | ||||
| 	RequestsWaiting RatioedChangeObserver | ||||
|  | ||||
| 	// RequestsExecuting is given observations of the number of requests currently executing | ||||
| 	RequestsExecuting RatioedChangeObserver | ||||
| } | ||||
|  | ||||
| // RatioedChangeObserverPairGenerator generates pairs | ||||
| type RatioedChangeObserverPairGenerator interface { | ||||
| 	Generate(initialWaitingDenominator, initialExecutingDenominator float64, labelValues []string) RatioedChangeObserverPair | ||||
| } | ||||
| @@ -34,13 +34,13 @@ const ( | ||||
| 	labelValueExecuting = "executing" | ||||
| ) | ||||
|  | ||||
| // SampleAndWaterMarkPairGenerator makes pairs of TimedObservers that | ||||
| // SampleAndWaterMarkPairGenerator makes pairs of RatioedChangeObservers that | ||||
| // track samples and watermarks. | ||||
| type SampleAndWaterMarkPairGenerator struct { | ||||
| 	urGenerator SampleAndWaterMarkObserverGenerator | ||||
| } | ||||
|  | ||||
| var _ TimedObserverPairGenerator = SampleAndWaterMarkPairGenerator{} | ||||
| var _ RatioedChangeObserverPairGenerator = SampleAndWaterMarkPairGenerator{} | ||||
|  | ||||
| // NewSampleAndWaterMarkHistogramsPairGenerator makes a new pair generator | ||||
| func NewSampleAndWaterMarkHistogramsPairGenerator(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkPairGenerator { | ||||
| @@ -50,10 +50,10 @@ func NewSampleAndWaterMarkHistogramsPairGenerator(clock clock.PassiveClock, samp | ||||
| } | ||||
|  | ||||
| // Generate makes a new pair | ||||
| func (spg SampleAndWaterMarkPairGenerator) Generate(waiting1, executing1 float64, labelValues []string) TimedObserverPair { | ||||
| 	return TimedObserverPair{ | ||||
| 		RequestsWaiting:   spg.urGenerator.Generate(0, waiting1, append([]string{labelValueWaiting}, labelValues...)), | ||||
| 		RequestsExecuting: spg.urGenerator.Generate(0, executing1, append([]string{labelValueExecuting}, labelValues...)), | ||||
| func (spg SampleAndWaterMarkPairGenerator) Generate(initialWaitingDenominator, initialExecutingDenominator float64, labelValues []string) RatioedChangeObserverPair { | ||||
| 	return RatioedChangeObserverPair{ | ||||
| 		RequestsWaiting:   spg.urGenerator.Generate(0, initialWaitingDenominator, append([]string{labelValueWaiting}, labelValues...)), | ||||
| 		RequestsExecuting: spg.urGenerator.Generate(0, initialExecutingDenominator, append([]string{labelValueExecuting}, labelValues...)), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -61,7 +61,7 @@ func (spg SampleAndWaterMarkPairGenerator) metrics() Registerables { | ||||
| 	return spg.urGenerator.metrics() | ||||
| } | ||||
|  | ||||
| // SampleAndWaterMarkObserverGenerator creates TimedObservers that | ||||
| // SampleAndWaterMarkObserverGenerator creates RatioedChangeObservers that | ||||
| // populate histograms of samples and low- and high-water-marks.  The | ||||
| // generator has a samplePeriod, and the histograms get an observation | ||||
| // every samplePeriod.  The sampling windows are quantized based on | ||||
| @@ -79,7 +79,7 @@ type sampleAndWaterMarkObserverGenerator struct { | ||||
| 	waterMarks   *compbasemetrics.HistogramVec | ||||
| } | ||||
|  | ||||
| var _ TimedObserverGenerator = SampleAndWaterMarkObserverGenerator{} | ||||
| var _ RatioedChangeObserverGenerator = SampleAndWaterMarkObserverGenerator{} | ||||
|  | ||||
| // NewSampleAndWaterMarkHistogramsGenerator makes a new one | ||||
| func NewSampleAndWaterMarkHistogramsGenerator(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkObserverGenerator { | ||||
| @@ -97,23 +97,23 @@ func (swg *sampleAndWaterMarkObserverGenerator) quantize(when time.Time) int64 { | ||||
| 	return int64(when.Sub(swg.t0) / swg.samplePeriod) | ||||
| } | ||||
|  | ||||
| // Generate makes a new TimedObserver | ||||
| func (swg *sampleAndWaterMarkObserverGenerator) Generate(x, x1 float64, labelValues []string) TimedObserver { | ||||
| 	relX := x / x1 | ||||
| // Generate makes a new RatioedChangeObserver | ||||
| func (swg *sampleAndWaterMarkObserverGenerator) Generate(initialNumerator, initialDenominator float64, labelValues []string) RatioedChangeObserver { | ||||
| 	ratio := initialNumerator / initialDenominator | ||||
| 	when := swg.clock.Now() | ||||
| 	return &sampleAndWaterMarkHistograms{ | ||||
| 		sampleAndWaterMarkObserverGenerator: swg, | ||||
| 		labelValues:                         labelValues, | ||||
| 		loLabelValues:                       append([]string{labelValueLo}, labelValues...), | ||||
| 		hiLabelValues:                       append([]string{labelValueHi}, labelValues...), | ||||
| 		x1:                                  x1, | ||||
| 		denominator:                         initialDenominator, | ||||
| 		sampleAndWaterMarkAccumulator: sampleAndWaterMarkAccumulator{ | ||||
| 			lastSet:    when, | ||||
| 			lastSetInt: swg.quantize(when), | ||||
| 			x:          x, | ||||
| 			relX:       relX, | ||||
| 			loRelX:     relX, | ||||
| 			hiRelX:     relX, | ||||
| 			numerator:  initialNumerator, | ||||
| 			ratio:      ratio, | ||||
| 			loRatio:    ratio, | ||||
| 			hiRatio:    ratio, | ||||
| 		}} | ||||
| } | ||||
|  | ||||
| @@ -127,39 +127,39 @@ type sampleAndWaterMarkHistograms struct { | ||||
| 	loLabelValues, hiLabelValues []string | ||||
|  | ||||
| 	sync.Mutex | ||||
| 	x1 float64 | ||||
| 	denominator float64 | ||||
| 	sampleAndWaterMarkAccumulator | ||||
| } | ||||
|  | ||||
| type sampleAndWaterMarkAccumulator struct { | ||||
| 	lastSet        time.Time | ||||
| 	lastSetInt     int64 // lastSet / samplePeriod | ||||
| 	x              float64 | ||||
| 	relX           float64 // x / x1 | ||||
| 	loRelX, hiRelX float64 | ||||
| 	lastSet          time.Time | ||||
| 	lastSetInt       int64 // lastSet / samplePeriod | ||||
| 	numerator        float64 | ||||
| 	ratio            float64 // numerator/denominator | ||||
| 	loRatio, hiRatio float64 | ||||
| } | ||||
|  | ||||
| var _ TimedObserver = (*sampleAndWaterMarkHistograms)(nil) | ||||
| var _ RatioedChangeObserver = (*sampleAndWaterMarkHistograms)(nil) | ||||
|  | ||||
| func (saw *sampleAndWaterMarkHistograms) Add(deltaX float64) { | ||||
| func (saw *sampleAndWaterMarkHistograms) Add(deltaNumerator float64) { | ||||
| 	saw.innerSet(func() { | ||||
| 		saw.x += deltaX | ||||
| 		saw.numerator += deltaNumerator | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| func (saw *sampleAndWaterMarkHistograms) Set(x float64) { | ||||
| func (saw *sampleAndWaterMarkHistograms) Observe(numerator float64) { | ||||
| 	saw.innerSet(func() { | ||||
| 		saw.x = x | ||||
| 		saw.numerator = numerator | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| func (saw *sampleAndWaterMarkHistograms) SetX1(x1 float64) { | ||||
| func (saw *sampleAndWaterMarkHistograms) SetDenominator(denominator float64) { | ||||
| 	saw.innerSet(func() { | ||||
| 		saw.x1 = x1 | ||||
| 		saw.denominator = denominator | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| func (saw *sampleAndWaterMarkHistograms) innerSet(updateXOrX1 func()) { | ||||
| func (saw *sampleAndWaterMarkHistograms) innerSet(updateNumeratorOrDenominator func()) { | ||||
| 	when, whenInt, acc, wellOrdered := func() (time.Time, int64, sampleAndWaterMarkAccumulator, bool) { | ||||
| 		saw.Lock() | ||||
| 		defer saw.Unlock() | ||||
| @@ -168,11 +168,11 @@ func (saw *sampleAndWaterMarkHistograms) innerSet(updateXOrX1 func()) { | ||||
| 		whenInt := saw.quantize(when) | ||||
| 		acc := saw.sampleAndWaterMarkAccumulator | ||||
| 		wellOrdered := !when.Before(acc.lastSet) | ||||
| 		updateXOrX1() | ||||
| 		saw.relX = saw.x / saw.x1 | ||||
| 		updateNumeratorOrDenominator() | ||||
| 		saw.ratio = saw.numerator / saw.denominator | ||||
| 		if wellOrdered { | ||||
| 			if acc.lastSetInt < whenInt { | ||||
| 				saw.loRelX, saw.hiRelX = acc.relX, acc.relX | ||||
| 				saw.loRatio, saw.hiRatio = acc.ratio, acc.ratio | ||||
| 				saw.lastSetInt = whenInt | ||||
| 			} | ||||
| 			saw.lastSet = when | ||||
| @@ -187,10 +187,10 @@ func (saw *sampleAndWaterMarkHistograms) innerSet(updateXOrX1 func()) { | ||||
| 		// would be wrong to update `saw.lastSet` in this case because | ||||
| 		// that plants a time bomb for future updates to | ||||
| 		// `saw.lastSetInt`. | ||||
| 		if saw.relX < saw.loRelX { | ||||
| 			saw.loRelX = saw.relX | ||||
| 		} else if saw.relX > saw.hiRelX { | ||||
| 			saw.hiRelX = saw.relX | ||||
| 		if saw.ratio < saw.loRatio { | ||||
| 			saw.loRatio = saw.ratio | ||||
| 		} else if saw.ratio > saw.hiRatio { | ||||
| 			saw.hiRatio = saw.ratio | ||||
| 		} | ||||
| 		return when, whenInt, acc, wellOrdered | ||||
| 	}() | ||||
| @@ -200,10 +200,10 @@ func (saw *sampleAndWaterMarkHistograms) innerSet(updateXOrX1 func()) { | ||||
| 		klog.Errorf("Time went backwards from %s to %s for labelValues=%#+v", lastSetS, whenS, saw.labelValues) | ||||
| 	} | ||||
| 	for acc.lastSetInt < whenInt { | ||||
| 		saw.samples.WithLabelValues(saw.labelValues...).Observe(acc.relX) | ||||
| 		saw.waterMarks.WithLabelValues(saw.loLabelValues...).Observe(acc.loRelX) | ||||
| 		saw.waterMarks.WithLabelValues(saw.hiLabelValues...).Observe(acc.hiRelX) | ||||
| 		saw.samples.WithLabelValues(saw.labelValues...).Observe(acc.ratio) | ||||
| 		saw.waterMarks.WithLabelValues(saw.loLabelValues...).Observe(acc.loRatio) | ||||
| 		saw.waterMarks.WithLabelValues(saw.hiLabelValues...).Observe(acc.hiRatio) | ||||
| 		acc.lastSetInt++ | ||||
| 		acc.loRelX, acc.hiRelX = acc.relX, acc.relX | ||||
| 		acc.loRatio, acc.hiRatio = acc.ratio, acc.ratio | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -81,7 +81,7 @@ func TestSampler(t *testing.T) { | ||||
| 			dt = diff | ||||
| 		} | ||||
| 		clk.SetTime(t1) | ||||
| 		saw.Set(1) | ||||
| 		saw.Observe(1) | ||||
| 		expectedCount := int64(dt / samplingPeriod) | ||||
| 		actualCount, err := getHistogramCount(regs, samplesHistName) | ||||
| 		if err != nil { | ||||
|   | ||||
| @@ -1,52 +0,0 @@ | ||||
| /* | ||||
| Copyright 2019 The Kubernetes Authors. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| package metrics | ||||
|  | ||||
| // TimedObserver gets informed about the values assigned to a variable | ||||
| // `X float64` over time, and reports on the ratio `X/X1`. | ||||
| type TimedObserver interface { | ||||
| 	// Add notes a change to the variable | ||||
| 	Add(deltaX float64) | ||||
|  | ||||
| 	// Set notes a setting of the variable | ||||
| 	Set(x float64) | ||||
|  | ||||
| 	// SetX1 changes the value to use for X1 | ||||
| 	SetX1(x1 float64) | ||||
| } | ||||
|  | ||||
| // TimedObserverGenerator creates related observers that are | ||||
| // differentiated by a series of label values | ||||
| type TimedObserverGenerator interface { | ||||
| 	Generate(x, x1 float64, labelValues []string) TimedObserver | ||||
| } | ||||
|  | ||||
| // TimedObserverPair is a corresponding pair of observers, one for the | ||||
| // number of requests waiting in queue(s) and one for the number of | ||||
| // requests being executed | ||||
| type TimedObserverPair struct { | ||||
| 	// RequestsWaiting is given observations of the number of currently queued requests | ||||
| 	RequestsWaiting TimedObserver | ||||
|  | ||||
| 	// RequestsExecuting is given observations of the number of requests currently executing | ||||
| 	RequestsExecuting TimedObserver | ||||
| } | ||||
|  | ||||
| // TimedObserverPairGenerator generates pairs | ||||
| type TimedObserverPairGenerator interface { | ||||
| 	Generate(waiting1, executing1 float64, labelValues []string) TimedObserverPair | ||||
| } | ||||
		Reference in New Issue
	
	Block a user
	 Mike Spreitzer
					Mike Spreitzer