mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	This had to be able to build on OS X before to make verify-typecheck pass, but now that that's fixed we can tag the code properly as being linux-only.
		
			
				
	
	
		
			221 lines
		
	
	
		
			6.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			221 lines
		
	
	
		
			6.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
//go:build linux
 | 
						|
// +build linux
 | 
						|
 | 
						|
/*
 | 
						|
Copyright 2015 The Kubernetes Authors.
 | 
						|
 | 
						|
Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
you may not use this file except in compliance with the License.
 | 
						|
You may obtain a copy of the License at
 | 
						|
 | 
						|
    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
Unless required by applicable law or agreed to in writing, software
 | 
						|
distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
See the License for the specific language governing permissions and
 | 
						|
limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
package ipvs
 | 
						|
 | 
						|
import (
 | 
						|
	"fmt"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"k8s.io/apimachinery/pkg/util/wait"
 | 
						|
	"k8s.io/klog/v2"
 | 
						|
	utilipvs "k8s.io/kubernetes/pkg/proxy/ipvs/util"
 | 
						|
)
 | 
						|
 | 
						|
// rsCheckDeleteInterval is the period used by Run between two successive
// attempts to delete the real servers waiting in the graceful termination list.
const rsCheckDeleteInterval = time.Minute
 | 
						|
 | 
						|
// listItem stores real server information and the process time.
 | 
						|
// If nothing special happened, real server will be delete after process time.
 | 
						|
type listItem struct {
 | 
						|
	VirtualServer *utilipvs.VirtualServer
 | 
						|
	RealServer    *utilipvs.RealServer
 | 
						|
}
 | 
						|
 | 
						|
// String return the unique real server name(with virtual server information)
 | 
						|
func (g *listItem) String() string {
 | 
						|
	return GetUniqueRSName(g.VirtualServer, g.RealServer)
 | 
						|
}
 | 
						|
 | 
						|
// GetUniqueRSName return a string type unique rs name with vs information
 | 
						|
func GetUniqueRSName(vs *utilipvs.VirtualServer, rs *utilipvs.RealServer) string {
 | 
						|
	return vs.String() + "/" + rs.String()
 | 
						|
}
 | 
						|
 | 
						|
type graceTerminateRSList struct {
 | 
						|
	lock sync.Mutex
 | 
						|
	list map[string]*listItem
 | 
						|
}
 | 
						|
 | 
						|
// add push an new element to the rsList
 | 
						|
func (q *graceTerminateRSList) add(rs *listItem) bool {
 | 
						|
	q.lock.Lock()
 | 
						|
	defer q.lock.Unlock()
 | 
						|
 | 
						|
	uniqueRS := rs.String()
 | 
						|
	if _, ok := q.list[uniqueRS]; ok {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	klog.V(5).InfoS("Adding real server to graceful delete real server list", "realServer", rs)
 | 
						|
	q.list[uniqueRS] = rs
 | 
						|
	return true
 | 
						|
}
 | 
						|
 | 
						|
// remove remove an element from the rsList
 | 
						|
func (q *graceTerminateRSList) remove(rs *listItem) bool {
 | 
						|
	q.lock.Lock()
 | 
						|
	defer q.lock.Unlock()
 | 
						|
 | 
						|
	uniqueRS := rs.String()
 | 
						|
	if _, ok := q.list[uniqueRS]; ok {
 | 
						|
		delete(q.list, uniqueRS)
 | 
						|
		return true
 | 
						|
	}
 | 
						|
	return false
 | 
						|
}
 | 
						|
 | 
						|
// return the size of the list
 | 
						|
func (q *graceTerminateRSList) len() int {
 | 
						|
	q.lock.Lock()
 | 
						|
	defer q.lock.Unlock()
 | 
						|
 | 
						|
	return len(q.list)
 | 
						|
}
 | 
						|
 | 
						|
func (q *graceTerminateRSList) flushList(handler func(rsToDelete *listItem) (bool, error)) bool {
 | 
						|
	q.lock.Lock()
 | 
						|
	defer q.lock.Unlock()
 | 
						|
	success := true
 | 
						|
	for name, rs := range q.list {
 | 
						|
		deleted, err := handler(rs)
 | 
						|
		if err != nil {
 | 
						|
			klog.ErrorS(err, "Error in deleting real server", "realServer", name)
 | 
						|
			success = false
 | 
						|
		}
 | 
						|
		if deleted {
 | 
						|
			klog.InfoS("Removed real server from graceful delete real server list", "realServer", name)
 | 
						|
			delete(q.list, rs.String())
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return success
 | 
						|
}
 | 
						|
 | 
						|
// exist check whether the specified unique RS is in the rsList
 | 
						|
func (q *graceTerminateRSList) exist(uniqueRS string) (*listItem, bool) {
 | 
						|
	q.lock.Lock()
 | 
						|
	defer q.lock.Unlock()
 | 
						|
 | 
						|
	if rs, ok := q.list[uniqueRS]; ok {
 | 
						|
		return rs, true
 | 
						|
	}
 | 
						|
	return nil, false
 | 
						|
}
 | 
						|
 | 
						|
// GracefulTerminationManager manage rs graceful termination information and do graceful termination work
 | 
						|
// rsList is the rs list to graceful termination, ipvs is the ipvsinterface to do ipvs delete/update work
 | 
						|
type GracefulTerminationManager struct {
 | 
						|
	rsList graceTerminateRSList
 | 
						|
	ipvs   utilipvs.Interface
 | 
						|
}
 | 
						|
 | 
						|
// NewGracefulTerminationManager create a gracefulTerminationManager to manage ipvs rs graceful termination work
 | 
						|
func NewGracefulTerminationManager(ipvs utilipvs.Interface) *GracefulTerminationManager {
 | 
						|
	l := make(map[string]*listItem)
 | 
						|
	return &GracefulTerminationManager{
 | 
						|
		rsList: graceTerminateRSList{
 | 
						|
			list: l,
 | 
						|
		},
 | 
						|
		ipvs: ipvs,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// InTerminationList to check whether specified unique rs name is in graceful termination list
 | 
						|
func (m *GracefulTerminationManager) InTerminationList(uniqueRS string) bool {
 | 
						|
	_, exist := m.rsList.exist(uniqueRS)
 | 
						|
	return exist
 | 
						|
}
 | 
						|
 | 
						|
// GracefulDeleteRS to update rs weight to 0, and add rs to graceful terminate list
 | 
						|
func (m *GracefulTerminationManager) GracefulDeleteRS(vs *utilipvs.VirtualServer, rs *utilipvs.RealServer) error {
 | 
						|
	// Try to delete rs before add it to graceful delete list
 | 
						|
	ele := &listItem{
 | 
						|
		VirtualServer: vs,
 | 
						|
		RealServer:    rs,
 | 
						|
	}
 | 
						|
	deleted, err := m.deleteRsFunc(ele)
 | 
						|
	if err != nil {
 | 
						|
		klog.ErrorS(err, "Error in deleting real server", "realServer", ele)
 | 
						|
	}
 | 
						|
	if deleted {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	rs.Weight = 0
 | 
						|
	err = m.ipvs.UpdateRealServer(vs, rs)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	klog.V(5).InfoS("Adding real server to graceful delete real server list", "realServer", ele)
 | 
						|
	m.rsList.add(ele)
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (m *GracefulTerminationManager) deleteRsFunc(rsToDelete *listItem) (bool, error) {
 | 
						|
	klog.V(5).InfoS("Trying to delete real server", "realServer", rsToDelete)
 | 
						|
	rss, err := m.ipvs.GetRealServers(rsToDelete.VirtualServer)
 | 
						|
	if err != nil {
 | 
						|
		return false, err
 | 
						|
	}
 | 
						|
	for _, rs := range rss {
 | 
						|
		if rsToDelete.RealServer.Equal(rs) {
 | 
						|
			// For UDP and SCTP traffic, no graceful termination, we immediately delete the RS
 | 
						|
			//     (existing connections will be deleted on the next packet because sysctlExpireNoDestConn=1)
 | 
						|
			// For other protocols, don't delete until all connections have expired)
 | 
						|
			if utilipvs.IsRsGracefulTerminationNeeded(rsToDelete.VirtualServer.Protocol) && rs.ActiveConn+rs.InactiveConn != 0 {
 | 
						|
				klog.V(5).InfoS("Skip deleting real server till all connection have expired", "realServer", rsToDelete, "activeConnection", rs.ActiveConn, "inactiveConnection", rs.InactiveConn)
 | 
						|
				return false, nil
 | 
						|
			}
 | 
						|
			klog.V(5).InfoS("Deleting real server", "realServer", rsToDelete)
 | 
						|
			err := m.ipvs.DeleteRealServer(rsToDelete.VirtualServer, rs)
 | 
						|
			if err != nil {
 | 
						|
				return false, fmt.Errorf("delete destination %q err: %w", rs.String(), err)
 | 
						|
			}
 | 
						|
			return true, nil
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return true, fmt.Errorf("failed to delete rs %q, can't find the real server", rsToDelete.String())
 | 
						|
}
 | 
						|
 | 
						|
func (m *GracefulTerminationManager) tryDeleteRs() {
 | 
						|
	if !m.rsList.flushList(m.deleteRsFunc) {
 | 
						|
		klog.ErrorS(nil, "Try flush graceful termination list error")
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// MoveRSOutofGracefulDeleteList to delete an rs and remove it from the rsList immediately
 | 
						|
func (m *GracefulTerminationManager) MoveRSOutofGracefulDeleteList(uniqueRS string) error {
 | 
						|
	rsToDelete, find := m.rsList.exist(uniqueRS)
 | 
						|
	if !find || rsToDelete == nil {
 | 
						|
		return fmt.Errorf("failed to find rs: %q", uniqueRS)
 | 
						|
	}
 | 
						|
	err := m.ipvs.DeleteRealServer(rsToDelete.VirtualServer, rsToDelete.RealServer)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	m.rsList.remove(rsToDelete)
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Run start a goroutine to try to delete rs in the graceful delete rsList with an interval 1 minute
 | 
						|
func (m *GracefulTerminationManager) Run() {
 | 
						|
	go wait.Until(m.tryDeleteRs, rsCheckDeleteInterval, wait.NeverStop)
 | 
						|
}
 |