mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Revert "Revert "Don't take the proxy mutex in the traffic path""
This commit is contained in:
		@@ -41,6 +41,7 @@ type portal struct {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type serviceInfo struct {
 | 
					type serviceInfo struct {
 | 
				
			||||||
 | 
						isAliveAtomic       int32 // Only access this with atomic ops
 | 
				
			||||||
	portal              portal
 | 
						portal              portal
 | 
				
			||||||
	protocol            api.Protocol
 | 
						protocol            api.Protocol
 | 
				
			||||||
	proxyPort           int
 | 
						proxyPort           int
 | 
				
			||||||
@@ -55,6 +56,18 @@ type serviceInfo struct {
 | 
				
			|||||||
	externalIPs []string
 | 
						externalIPs []string
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (info *serviceInfo) setAlive(b bool) {
 | 
				
			||||||
 | 
						var i int32
 | 
				
			||||||
 | 
						if b {
 | 
				
			||||||
 | 
							i = 1
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						atomic.StoreInt32(&info.isAliveAtomic, i)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (info *serviceInfo) isAlive() bool {
 | 
				
			||||||
 | 
						return atomic.LoadInt32(&info.isAliveAtomic) != 0
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func logTimeout(err error) bool {
 | 
					func logTimeout(err error) bool {
 | 
				
			||||||
	if e, ok := err.(net.Error); ok {
 | 
						if e, ok := err.(net.Error); ok {
 | 
				
			||||||
		if e.Timeout() {
 | 
							if e.Timeout() {
 | 
				
			||||||
@@ -256,6 +269,7 @@ func (proxier *Proxier) stopProxy(service proxy.ServicePortName, info *serviceIn
 | 
				
			|||||||
// This assumes proxier.mu is locked.
 | 
					// This assumes proxier.mu is locked.
 | 
				
			||||||
func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *serviceInfo) error {
 | 
					func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *serviceInfo) error {
 | 
				
			||||||
	delete(proxier.serviceMap, service)
 | 
						delete(proxier.serviceMap, service)
 | 
				
			||||||
 | 
						info.setAlive(false)
 | 
				
			||||||
	err := info.socket.Close()
 | 
						err := info.socket.Close()
 | 
				
			||||||
	port := info.socket.ListenPort()
 | 
						port := info.socket.ListenPort()
 | 
				
			||||||
	proxier.proxyPorts.Release(port)
 | 
						proxier.proxyPorts.Release(port)
 | 
				
			||||||
@@ -294,6 +308,7 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol
 | 
				
			|||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	si := &serviceInfo{
 | 
						si := &serviceInfo{
 | 
				
			||||||
 | 
							isAliveAtomic:       1,
 | 
				
			||||||
		proxyPort:           portNum,
 | 
							proxyPort:           portNum,
 | 
				
			||||||
		protocol:            protocol,
 | 
							protocol:            protocol,
 | 
				
			||||||
		socket:              sock,
 | 
							socket:              sock,
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -429,6 +429,9 @@ func TestTCPProxyStop(t *testing.T) {
 | 
				
			|||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		t.Fatalf("error adding new service: %#v", err)
 | 
							t.Fatalf("error adding new service: %#v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if !svcInfo.isAlive() {
 | 
				
			||||||
 | 
							t.Fatalf("wrong value for isAlive(): expected true")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
 | 
						conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		t.Fatalf("error connecting to proxy: %v", err)
 | 
							t.Fatalf("error connecting to proxy: %v", err)
 | 
				
			||||||
@@ -437,6 +440,9 @@ func TestTCPProxyStop(t *testing.T) {
 | 
				
			|||||||
	waitForNumProxyLoops(t, p, 1)
 | 
						waitForNumProxyLoops(t, p, 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	stopProxyByName(p, service)
 | 
						stopProxyByName(p, service)
 | 
				
			||||||
 | 
						if svcInfo.isAlive() {
 | 
				
			||||||
 | 
							t.Fatalf("wrong value for isAlive(): expected false")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	// Wait for the port to really close.
 | 
						// Wait for the port to really close.
 | 
				
			||||||
	if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
 | 
						if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
 | 
				
			||||||
		t.Fatalf(err.Error())
 | 
							t.Fatalf(err.Error())
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -111,7 +111,7 @@ func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) {
 | 
					func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) {
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
		if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo {
 | 
							if !myInfo.isAlive() {
 | 
				
			||||||
			// The service port was closed or replaced.
 | 
								// The service port was closed or replaced.
 | 
				
			||||||
			return
 | 
								return
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
@@ -125,7 +125,7 @@ func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serv
 | 
				
			|||||||
			if isClosedError(err) {
 | 
								if isClosedError(err) {
 | 
				
			||||||
				return
 | 
									return
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo {
 | 
								if !myInfo.isAlive() {
 | 
				
			||||||
				// Then the service port was just closed so the accept failure is to be expected.
 | 
									// Then the service port was just closed so the accept failure is to be expected.
 | 
				
			||||||
				return
 | 
									return
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
@@ -198,7 +198,7 @@ func newClientCache() *clientCache {
 | 
				
			|||||||
func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) {
 | 
					func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) {
 | 
				
			||||||
	var buffer [4096]byte // 4KiB should be enough for most whole-packets
 | 
						var buffer [4096]byte // 4KiB should be enough for most whole-packets
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
		if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo {
 | 
							if !myInfo.isAlive() {
 | 
				
			||||||
			// The service port was closed or replaced.
 | 
								// The service port was closed or replaced.
 | 
				
			||||||
			break
 | 
								break
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user