Mirror of https://github.com/optim-enterprises-bv/kubernetes.git (synced 2025-11-03 19:58:17 +00:00)

	Merge pull request #21768 from mesosphere/jdef_fix_scheduler_ha_787
Auto commit by PR queue bot
election package:

@@ -17,8 +17,6 @@ limitations under the License.
 package election
 
 import (
-	"sync"
-
 	"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
 	"k8s.io/kubernetes/pkg/watch"
 
@@ -46,14 +44,7 @@ type Service interface {
 }
 
 type notifier struct {
-	changed chan struct{} // to notify the service loop about changed state
-
-	// desired is updated with every change, current is updated after
-	// Start()/Stop() finishes. 'cond' is used to signal that a change
-	// might be needed. This handles the case where mastership flops
-	// around without calling Start()/Stop() excessively.
-	desired, current Master
-	lock             sync.Mutex // to protect the desired variable
+	masters chan Master // elected masters arrive here, should be buffered to better deal with rapidly flapping masters
 
 	// for comparison, to see if we are master.
 	id Master
@@ -64,8 +55,7 @@ type notifier struct {
 // Notify runs Elect() on m, and calls Start()/Stop() on s when the
 // elected master starts/stops matching 'id'. Never returns.
 func Notify(m MasterElector, path, id string, s Service, abort <-chan struct{}) {
-	n := &notifier{id: Master(id), service: s}
-	n.changed = make(chan struct{})
+	n := &notifier{id: Master(id), service: s, masters: make(chan Master, 1)}
 	finished := runtime.After(func() {
 		runtime.Until(func() {
 			for {
@@ -87,14 +77,21 @@ func Notify(m MasterElector, path, id string, s Service, abort <-chan struct{})
 						break
 					}
 
-					n.lock.Lock()
-					n.desired = electedMaster
-					n.lock.Unlock()
-
-					// notify serviceLoop, but don't block. If a change
-					// is queued already it will see the new n.desired.
-					select {
-					case n.changed <- struct{}{}:
-					default:
-					}
+
+				sendElected:
+					for {
+						select {
+						case <-abort:
+							return
+						case n.masters <- electedMaster:
+							break sendElected
+						default: // ring full, discard old value and add the new
+							select {
+							case <-abort:
+								return
+							case <-n.masters:
+							default: // ring was cleared for us?!
+							}
+						}
+					}
 				}
@@ -106,22 +103,19 @@ func Notify(m MasterElector, path, id string, s Service, abort <-chan struct{})
 
 // serviceLoop waits for changes, and calls Start()/Stop() as needed.
 func (n *notifier) serviceLoop(abort <-chan struct{}) {
+	var current Master
 	for {
 		select {
 		case <-abort:
 			return
-		case <-n.changed:
-			n.lock.Lock()
-			newDesired := n.desired // copy value to avoid race below
-			n.lock.Unlock()
-
-			if n.current != n.id && newDesired == n.id {
-				n.service.Validate(newDesired, n.current)
+		case desired := <-n.masters:
+			if current != n.id && desired == n.id {
+				n.service.Validate(desired, current)
 				n.service.Start()
-			} else if n.current == n.id && newDesired != n.id {
+			} else if current == n.id && desired != n.id {
 				n.service.Stop()
 			}
-			n.current = newDesired
+			current = desired
 		}
 	}
 }
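The masters channel acts as a size-1 ring: the producer never blocks, and a newer elected master overwrites a stale, unconsumed one, so the service loop only ever sees the latest value. A minimal runnable sketch of that pattern (not the PR's code; it uses a plain string channel instead of the Master type):

package main

import "fmt"

// offer stores v in ch (capacity 1), overwriting a stale value rather than
// blocking when the consumer has not caught up yet.
func offer(ch chan string, v string) {
	for {
		select {
		case ch <- v: // slot free: latest value stored
			return
		default: // slot full: drain the stale value, then retry the send
			select {
			case <-ch:
			default: // consumer drained it first; just retry
			}
		}
	}
}

func main() {
	masters := make(chan string, 1)
	offer(masters, "scheduler-a")
	offer(masters, "scheduler-b") // overwrites "scheduler-a"
	fmt.Println(<-masters)        // prints: scheduler-b
}

Draining inside a nested select, as the PR does, additionally keeps the producer responsive to the abort channel even while contending with the consumer.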
scheduler server:

@@ -43,6 +43,7 @@ import (
 	mesos "github.com/mesos/mesos-go/mesosproto"
 	mutil "github.com/mesos/mesos-go/mesosutil"
 	bindings "github.com/mesos/mesos-go/scheduler"
+	"github.com/pborman/uuid"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/spf13/pflag"
 	"golang.org/x/net/context"
@@ -594,8 +595,14 @@ func (s *SchedulerServer) Run(hks hyperkube.Interface, _ []string) error {
 		validation := ha.ValidationFunc(validateLeadershipTransition)
 		srv := ha.NewCandidate(schedulerProcess, driverFactory, validation)
 		path := meta.ElectionPath(s.frameworkName)
-		log.Infof("registering for election at %v with id %v", path, eid.GetValue())
-		go election.Notify(election.NewEtcdMasterElector(etcdClient), path, eid.GetValue(), srv, nil)
+		uuid := eid.GetValue() + ":" + uuid.New() // unique for each scheduler instance
+		log.Infof("registering for election at %v with id %v", path, uuid)
+		go election.Notify(
+			election.NewEtcdMasterElector(etcdClient),
+			path,
+			uuid,
+			srv,
+			nil)
 	} else {
 		log.Infoln("self-electing in non-HA mode")
 		schedulerProcess.Elect(driverFactory)
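The election id is now <executor-id>:<uuid>, so each scheduler instance registers uniquely while the executor-id config hash stays recoverable as a prefix. A minimal sketch with assumed placeholder values (the real code uses eid.GetValue() and github.com/pborman/uuid):

package main

import (
	"fmt"
	"strings"
)

func main() {
	executorID := "exec-cfg-hash"                      // placeholder for eid.GetValue()
	instance := "6ba7b811-9dad-11d1-80b4-00c04fd430c8" // placeholder for uuid.New()

	electionID := executorID + ":" + instance // unique per scheduler instance

	// Splitting on the last ':' recovers the executor id, which is how
	// validateLeadershipTransition compares candidates (see below).
	fmt.Println(electionID[:strings.LastIndex(electionID, ":")]) // prints: exec-cfg-hash
}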
@@ -650,8 +657,28 @@ func (s *SchedulerServer) awaitFailover(schedulerProcess schedulerProcessInterfa
 
 func validateLeadershipTransition(desired, current string) {
 	log.Infof("validating leadership transition")
+	// desired, current are of the format <executor-id>:<scheduler-uuid> (see Run()).
+	// parse them and ensure that executor IDs match, otherwise the cluster can get into
+	// a bad state after scheduler failover: executor ID is a config hash that must remain
+	// consistent across failover events.
+	var (
+		i = strings.LastIndex(desired, ":")
+		j = strings.LastIndex(current, ":")
+	)
+
+	if i > -1 {
+		desired = desired[0:i]
+	} else {
+		log.Fatalf("desired id %q is invalid", desired)
+	}
+	if j > -1 {
+		current = current[0:j]
+	} else if current != "" {
+		log.Fatalf("current id %q is invalid", current)
+	}
+
 	if desired != current && current != "" {
-		log.Fatalf("desired executor id != current executor id", desired, current)
+		log.Fatalf("desired executor id %q != current executor id %q", desired, current)
 	}
 }
 
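A small test-style sketch of the transition rule above (a hypothetical refactor, not in the PR, that returns an error instead of calling log.Fatalf so the logic is unit-testable):

package main

import (
	"fmt"
	"strings"
)

// validateTransition mirrors validateLeadershipTransition's parsing: strip the
// ":<scheduler-uuid>" suffix from each id and require the executor-id prefixes
// to match; an empty current id is allowed (no incumbent leader yet).
func validateTransition(desired, current string) error {
	i := strings.LastIndex(desired, ":")
	if i < 0 {
		return fmt.Errorf("desired id %q is invalid", desired)
	}
	desired = desired[:i]
	if current != "" {
		j := strings.LastIndex(current, ":")
		if j < 0 {
			return fmt.Errorf("current id %q is invalid", current)
		}
		current = current[:j]
		if desired != current {
			return fmt.Errorf("desired executor id %q != current executor id %q", desired, current)
		}
	}
	return nil
}

func main() {
	fmt.Println(validateTransition("exec-a:uuid-1", "exec-a:uuid-2")) // <nil>: same config hash, failover allowed
	fmt.Println(validateTransition("exec-a:uuid-1", "exec-b:uuid-2")) // error: config hash changed
	fmt.Println(validateTransition("exec-a:uuid-1", ""))              // <nil>: no incumbent
}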