mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	implement SafeWaitGroup without race issue
This commit is contained in:
		
							
								
								
									
										32
									
								
								staging/src/k8s.io/apimachinery/pkg/util/waitgroup/BUILD
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										32
									
								
								staging/src/k8s.io/apimachinery/pkg/util/waitgroup/BUILD
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,32 @@
 | 
			
		||||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 | 
			
		||||
 | 
			
		||||
go_library(
 | 
			
		||||
    name = "go_default_library",
 | 
			
		||||
    srcs = [
 | 
			
		||||
        "doc.go",
 | 
			
		||||
        "waitgroup.go",
 | 
			
		||||
    ],
 | 
			
		||||
    importpath = "k8s.io/apimachinery/pkg/util/waitgroup",
 | 
			
		||||
    visibility = ["//visibility:public"],
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
go_test(
 | 
			
		||||
    name = "go_default_test",
 | 
			
		||||
    srcs = ["waitgroup_test.go"],
 | 
			
		||||
    importpath = "k8s.io/apimachinery/pkg/util/waitgroup",
 | 
			
		||||
    library = ":go_default_library",
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
filegroup(
 | 
			
		||||
    name = "package-srcs",
 | 
			
		||||
    srcs = glob(["**"]),
 | 
			
		||||
    tags = ["automanaged"],
 | 
			
		||||
    visibility = ["//visibility:private"],
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
filegroup(
 | 
			
		||||
    name = "all-srcs",
 | 
			
		||||
    srcs = [":package-srcs"],
 | 
			
		||||
    tags = ["automanaged"],
 | 
			
		||||
    visibility = ["//visibility:public"],
 | 
			
		||||
)
 | 
			
		||||
							
								
								
									
										19
									
								
								staging/src/k8s.io/apimachinery/pkg/util/waitgroup/doc.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										19
									
								
								staging/src/k8s.io/apimachinery/pkg/util/waitgroup/doc.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,19 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2017 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
// Package waitgroup implements SafeWaitGroup wrap of sync.WaitGroup.
 | 
			
		||||
// Add with positive delta when waiting will fail, to prevent sync.WaitGroup race issue.
 | 
			
		||||
package waitgroup // import "k8s.io/apimachinery/pkg/util/waitgroup"
 | 
			
		||||
@@ -0,0 +1,57 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2017 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package waitgroup
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"sync"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// SafeWaitGroup must not be copied after first use.
 | 
			
		||||
type SafeWaitGroup struct {
 | 
			
		||||
	wg sync.WaitGroup
 | 
			
		||||
	mu sync.RWMutex
 | 
			
		||||
	// wait indicate whether Wait is called, if true,
 | 
			
		||||
	// then any Add with positive delta will return error.
 | 
			
		||||
	wait bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Add adds delta, which may be negative, similar to sync.WaitGroup.
 | 
			
		||||
// If Add with a positive delta happens after Wait, it will return error,
 | 
			
		||||
// which prevent unsafe Add.
 | 
			
		||||
func (wg *SafeWaitGroup) Add(delta int) error {
 | 
			
		||||
	wg.mu.RLock()
 | 
			
		||||
	if wg.wait && delta > 0 {
 | 
			
		||||
		return fmt.Errorf("Add with postive delta after Wait is forbidden")
 | 
			
		||||
	}
 | 
			
		||||
	wg.mu.RUnlock()
 | 
			
		||||
	wg.wg.Add(delta)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Done decrements the WaitGroup counter.
 | 
			
		||||
func (wg *SafeWaitGroup) Done() {
 | 
			
		||||
	wg.wg.Done()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Wait blocks until the WaitGroup counter is zero.
 | 
			
		||||
func (wg *SafeWaitGroup) Wait() {
 | 
			
		||||
	wg.mu.Lock()
 | 
			
		||||
	wg.wait = true
 | 
			
		||||
	wg.mu.Unlock()
 | 
			
		||||
	wg.wg.Wait()
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,178 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2017 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
// Package waitgroup test cases reference golang sync.WaitGroup https://golang.org/src/sync/waitgroup_test.go.
 | 
			
		||||
package waitgroup
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"runtime"
 | 
			
		||||
	"sync/atomic"
 | 
			
		||||
	"testing"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestWaitGroup(t *testing.T) {
 | 
			
		||||
	wg1 := &SafeWaitGroup{}
 | 
			
		||||
	wg2 := &SafeWaitGroup{}
 | 
			
		||||
	n := 16
 | 
			
		||||
	wg1.Add(n)
 | 
			
		||||
	wg2.Add(n)
 | 
			
		||||
	exited := make(chan bool, n)
 | 
			
		||||
	for i := 0; i != n; i++ {
 | 
			
		||||
		go func(i int) {
 | 
			
		||||
			wg1.Done()
 | 
			
		||||
			wg2.Wait()
 | 
			
		||||
			exited <- true
 | 
			
		||||
		}(i)
 | 
			
		||||
	}
 | 
			
		||||
	wg1.Wait()
 | 
			
		||||
	for i := 0; i != n; i++ {
 | 
			
		||||
		select {
 | 
			
		||||
		case <-exited:
 | 
			
		||||
			t.Fatal("SafeWaitGroup released group too soon")
 | 
			
		||||
		default:
 | 
			
		||||
		}
 | 
			
		||||
		wg2.Done()
 | 
			
		||||
	}
 | 
			
		||||
	for i := 0; i != n; i++ {
 | 
			
		||||
		<-exited // Will block if barrier fails to unlock someone.
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestWaitGroupNegativeCounter(t *testing.T) {
 | 
			
		||||
	defer func() {
 | 
			
		||||
		err := recover()
 | 
			
		||||
		if err != "sync: negative WaitGroup counter" {
 | 
			
		||||
			t.Fatalf("Unexpected panic: %#v", err)
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
	wg := &SafeWaitGroup{}
 | 
			
		||||
	wg.Add(1)
 | 
			
		||||
	wg.Done()
 | 
			
		||||
	wg.Done()
 | 
			
		||||
	t.Fatal("Should panic")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestWaitGroupAddFail(t *testing.T) {
 | 
			
		||||
	wg := &SafeWaitGroup{}
 | 
			
		||||
	wg.Add(1)
 | 
			
		||||
	wg.Done()
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
	if err := wg.Add(1); err == nil {
 | 
			
		||||
		t.Errorf("Should return error when add positive after Wait")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func BenchmarkWaitGroupUncontended(b *testing.B) {
 | 
			
		||||
	type PaddedWaitGroup struct {
 | 
			
		||||
		SafeWaitGroup
 | 
			
		||||
		pad [128]uint8
 | 
			
		||||
	}
 | 
			
		||||
	const CallsPerSched = 1000
 | 
			
		||||
	procs := runtime.GOMAXPROCS(-1)
 | 
			
		||||
	N := int32(b.N / CallsPerSched)
 | 
			
		||||
	c := make(chan bool, procs)
 | 
			
		||||
	for p := 0; p < procs; p++ {
 | 
			
		||||
		go func() {
 | 
			
		||||
			var wg PaddedWaitGroup
 | 
			
		||||
			for atomic.AddInt32(&N, -1) >= 0 {
 | 
			
		||||
				runtime.Gosched()
 | 
			
		||||
				for g := 0; g < CallsPerSched; g++ {
 | 
			
		||||
					wg.Add(1)
 | 
			
		||||
					wg.Done()
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			c <- true
 | 
			
		||||
		}()
 | 
			
		||||
	}
 | 
			
		||||
	for p := 0; p < procs; p++ {
 | 
			
		||||
		<-c
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func benchmarkWaitGroupAddDone(b *testing.B, localWork int) {
 | 
			
		||||
	const CallsPerSched = 1000
 | 
			
		||||
	procs := runtime.GOMAXPROCS(-1)
 | 
			
		||||
	N := int32(b.N / CallsPerSched)
 | 
			
		||||
	c := make(chan bool, procs)
 | 
			
		||||
	var wg SafeWaitGroup
 | 
			
		||||
	for p := 0; p < procs; p++ {
 | 
			
		||||
		go func() {
 | 
			
		||||
			foo := 0
 | 
			
		||||
			for atomic.AddInt32(&N, -1) >= 0 {
 | 
			
		||||
				runtime.Gosched()
 | 
			
		||||
				for g := 0; g < CallsPerSched; g++ {
 | 
			
		||||
					wg.Add(1)
 | 
			
		||||
					for i := 0; i < localWork; i++ {
 | 
			
		||||
						foo *= 2
 | 
			
		||||
						foo /= 2
 | 
			
		||||
					}
 | 
			
		||||
					wg.Done()
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			c <- foo == 42
 | 
			
		||||
		}()
 | 
			
		||||
	}
 | 
			
		||||
	for p := 0; p < procs; p++ {
 | 
			
		||||
		<-c
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func BenchmarkWaitGroupAddDone(b *testing.B) {
 | 
			
		||||
	benchmarkWaitGroupAddDone(b, 0)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func BenchmarkWaitGroupAddDoneWork(b *testing.B) {
 | 
			
		||||
	benchmarkWaitGroupAddDone(b, 100)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func benchmarkWaitGroupWait(b *testing.B, localWork int) {
 | 
			
		||||
	const CallsPerSched = 1000
 | 
			
		||||
	procs := runtime.GOMAXPROCS(-1)
 | 
			
		||||
	N := int32(b.N / CallsPerSched)
 | 
			
		||||
	c := make(chan bool, procs)
 | 
			
		||||
	var wg SafeWaitGroup
 | 
			
		||||
	wg.Add(procs)
 | 
			
		||||
	for p := 0; p < procs; p++ {
 | 
			
		||||
		go wg.Done()
 | 
			
		||||
	}
 | 
			
		||||
	for p := 0; p < procs; p++ {
 | 
			
		||||
		go func() {
 | 
			
		||||
			foo := 0
 | 
			
		||||
			for atomic.AddInt32(&N, -1) >= 0 {
 | 
			
		||||
				runtime.Gosched()
 | 
			
		||||
				for g := 0; g < CallsPerSched; g++ {
 | 
			
		||||
					wg.Wait()
 | 
			
		||||
					for i := 0; i < localWork; i++ {
 | 
			
		||||
						foo *= 2
 | 
			
		||||
						foo /= 2
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			c <- foo == 42
 | 
			
		||||
		}()
 | 
			
		||||
	}
 | 
			
		||||
	for p := 0; p < procs; p++ {
 | 
			
		||||
		<-c
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func BenchmarkWaitGroupWait(b *testing.B) {
 | 
			
		||||
	benchmarkWaitGroupWait(b, 0)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func BenchmarkWaitGroupWaitWork(b *testing.B) {
 | 
			
		||||
	benchmarkWaitGroupWait(b, 100)
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user