mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			142 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			142 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
Copyright 2014 Google Inc. All rights reserved.
 | 
						|
 | 
						|
Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
you may not use this file except in compliance with the License.
 | 
						|
You may obtain a copy of the License at
 | 
						|
 | 
						|
    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
Unless required by applicable law or agreed to in writing, software
 | 
						|
distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
See the License for the specific language governing permissions and
 | 
						|
limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
package proxy
 | 
						|
 | 
						|
import (
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"net"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 | 
						|
	"github.com/golang/glog"
 | 
						|
)
 | 
						|
 | 
						|
// Proxier is a simple proxy for tcp connections between a localhost:lport and services that provide
 | 
						|
// the actual implementations.
 | 
						|
type Proxier struct {
 | 
						|
	loadBalancer LoadBalancer
 | 
						|
	serviceMap   map[string]int
 | 
						|
}
 | 
						|
 | 
						|
// NewProxier returns a newly created and correctly initialized instance of Proxier.
 | 
						|
func NewProxier(loadBalancer LoadBalancer) *Proxier {
 | 
						|
	return &Proxier{loadBalancer: loadBalancer, serviceMap: make(map[string]int)}
 | 
						|
}
 | 
						|
 | 
						|
func copyBytes(in, out *net.TCPConn) {
 | 
						|
	glog.Infof("Copying from %v <-> %v <-> %v <-> %v",
 | 
						|
		in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr())
 | 
						|
	_, err := io.Copy(in, out)
 | 
						|
	if err != nil {
 | 
						|
		glog.Errorf("I/O error: %v", err)
 | 
						|
	}
 | 
						|
 | 
						|
	in.CloseRead()
 | 
						|
	out.CloseWrite()
 | 
						|
}
 | 
						|
 | 
						|
// proxyConnection creates a bidirectional byte shuffler.
 | 
						|
// It copies bytes to/from each connection.
 | 
						|
func proxyConnection(in, out *net.TCPConn) {
 | 
						|
	glog.Infof("Creating proxy between %v <-> %v <-> %v <-> %v",
 | 
						|
		in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr())
 | 
						|
	go copyBytes(in, out)
 | 
						|
	go copyBytes(out, in)
 | 
						|
}
 | 
						|
 | 
						|
// AcceptHandler begins accepting incoming connections from listener and proxying the connections to the load-balanced endpoints.
 | 
						|
// It never returns.
 | 
						|
func (proxier Proxier) AcceptHandler(service string, listener net.Listener) {
 | 
						|
	for {
 | 
						|
		inConn, err := listener.Accept()
 | 
						|
		if err != nil {
 | 
						|
			glog.Errorf("Accept failed: %v", err)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		glog.Infof("Accepted connection from: %v to %v", inConn.RemoteAddr(), inConn.LocalAddr())
 | 
						|
 | 
						|
		// Figure out where this request should go.
 | 
						|
		endpoint, err := proxier.loadBalancer.LoadBalance(service, inConn.RemoteAddr())
 | 
						|
		if err != nil {
 | 
						|
			glog.Errorf("Couldn't find an endpoint for %s %v", service, err)
 | 
						|
			inConn.Close()
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		glog.Infof("Mapped service %s to endpoint %s", service, endpoint)
 | 
						|
		outConn, err := net.DialTimeout("tcp", endpoint, time.Duration(5)*time.Second)
 | 
						|
		// We basically need to take everything from inConn and send to outConn
 | 
						|
		// and anything coming from outConn needs to be sent to inConn.
 | 
						|
		if err != nil {
 | 
						|
			glog.Errorf("Dial failed: %v", err)
 | 
						|
			inConn.Close()
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		proxyConnection(inConn.(*net.TCPConn), outConn.(*net.TCPConn))
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// addService starts listening for a new service on a given port.
 | 
						|
func (proxier Proxier) addService(service string, port int) error {
 | 
						|
	// Make sure we can start listening on the port before saying all's well.
 | 
						|
	l, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	proxier.addServiceCommon(service, l)
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// addService starts listening for a new service, returning the port it's using.
 | 
						|
// For testing on a system with unknown ports used.
 | 
						|
func (proxier Proxier) addServiceOnUnusedPort(service string) (string, error) {
 | 
						|
	// Make sure we can start listening on the port before saying all's well.
 | 
						|
	l, err := net.Listen("tcp", ":0")
 | 
						|
	if err != nil {
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
	proxier.addServiceCommon(service, l)
 | 
						|
	_, port, err := net.SplitHostPort(l.Addr().String())
 | 
						|
	return port, nil
 | 
						|
}
 | 
						|
 | 
						|
func (proxier Proxier) addServiceCommon(service string, l net.Listener) {
 | 
						|
	glog.Infof("Listening for %s on %s", service, l.Addr().String())
 | 
						|
	// If that succeeds, start the accepting loop.
 | 
						|
	go proxier.AcceptHandler(service, l)
 | 
						|
}
 | 
						|
 | 
						|
// OnUpdate receives update notices for the updated services and start listening newly added services.
 | 
						|
// It implements "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config".ServiceConfigHandler.OnUpdate.
 | 
						|
func (proxier Proxier) OnUpdate(services []api.Service) {
 | 
						|
	glog.Infof("Received update notice: %+v", services)
 | 
						|
	for _, service := range services {
 | 
						|
		port, exists := proxier.serviceMap[service.ID]
 | 
						|
		if exists && port == service.Port {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		glog.Infof("Adding a new service %s on port %d", service.ID, service.Port)
 | 
						|
		err := proxier.addService(service.ID, service.Port)
 | 
						|
		if err != nil {
 | 
						|
			glog.Infof("Failed to start listening for %s on %d", service.ID, service.Port)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		proxier.serviceMap[service.ID] = service.Port
 | 
						|
	}
 | 
						|
}
 |