mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			945 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			945 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
 *
 | 
						|
 * Copyright 2014, Google Inc.
 | 
						|
 * All rights reserved.
 | 
						|
 *
 | 
						|
 * Redistribution and use in source and binary forms, with or without
 | 
						|
 * modification, are permitted provided that the following conditions are
 | 
						|
 * met:
 | 
						|
 *
 | 
						|
 *     * Redistributions of source code must retain the above copyright
 | 
						|
 * notice, this list of conditions and the following disclaimer.
 | 
						|
 *     * Redistributions in binary form must reproduce the above
 | 
						|
 * copyright notice, this list of conditions and the following disclaimer
 | 
						|
 * in the documentation and/or other materials provided with the
 | 
						|
 * distribution.
 | 
						|
 *     * Neither the name of Google Inc. nor the names of its
 | 
						|
 * contributors may be used to endorse or promote products derived from
 | 
						|
 * this software without specific prior written permission.
 | 
						|
 *
 | 
						|
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 | 
						|
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 | 
						|
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 | 
						|
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 | 
						|
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 | 
						|
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 | 
						|
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 | 
						|
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 | 
						|
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 | 
						|
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 | 
						|
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 | 
						|
 *
 | 
						|
 */
 | 
						|
 | 
						|
package grpc
 | 
						|
 | 
						|
import (
 | 
						|
	"bytes"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"net"
 | 
						|
	"net/http"
 | 
						|
	"reflect"
 | 
						|
	"runtime"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"golang.org/x/net/context"
 | 
						|
	"golang.org/x/net/http2"
 | 
						|
	"golang.org/x/net/trace"
 | 
						|
	"google.golang.org/grpc/codes"
 | 
						|
	"google.golang.org/grpc/credentials"
 | 
						|
	"google.golang.org/grpc/grpclog"
 | 
						|
	"google.golang.org/grpc/internal"
 | 
						|
	"google.golang.org/grpc/metadata"
 | 
						|
	"google.golang.org/grpc/transport"
 | 
						|
)
 | 
						|
 | 
						|
// methodHandler is the function type stored in MethodDesc.Handler. The server
// calls it with the registered service implementation, the stream's context,
// a decode function that fills in the request message, and the configured
// unary interceptor (which may be nil); it returns the reply message or an
// error.
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
 | 
						|
 | 
						|
// MethodDesc represents an RPC service's method specification.
 | 
						|
type MethodDesc struct {
 | 
						|
	MethodName string
 | 
						|
	Handler    methodHandler
 | 
						|
}
 | 
						|
 | 
						|
// ServiceDesc represents an RPC service's specification.
 | 
						|
type ServiceDesc struct {
 | 
						|
	ServiceName string
 | 
						|
	// The pointer to the service interface. Used to check whether the user
 | 
						|
	// provided implementation satisfies the interface requirements.
 | 
						|
	HandlerType interface{}
 | 
						|
	Methods     []MethodDesc
 | 
						|
	Streams     []StreamDesc
 | 
						|
	Metadata    interface{}
 | 
						|
}
 | 
						|
 | 
						|
// service consists of the information of the server serving this service and
 | 
						|
// the methods in this service.
 | 
						|
type service struct {
 | 
						|
	server interface{} // the server for service methods
 | 
						|
	md     map[string]*MethodDesc
 | 
						|
	sd     map[string]*StreamDesc
 | 
						|
	mdata  interface{}
 | 
						|
}
 | 
						|
 | 
						|
// Server is a gRPC server to serve RPC requests.
 | 
						|
type Server struct {
 | 
						|
	opts options
 | 
						|
 | 
						|
	mu     sync.Mutex // guards following
 | 
						|
	lis    map[net.Listener]bool
 | 
						|
	conns  map[io.Closer]bool
 | 
						|
	drain  bool
 | 
						|
	ctx    context.Context
 | 
						|
	cancel context.CancelFunc
 | 
						|
	// A CondVar to let GracefulStop() blocks until all the pending RPCs are finished
 | 
						|
	// and all the transport goes away.
 | 
						|
	cv     *sync.Cond
 | 
						|
	m      map[string]*service // service name -> service info
 | 
						|
	events trace.EventLog
 | 
						|
}
 | 
						|
 | 
						|
type options struct {
 | 
						|
	creds                credentials.TransportCredentials
 | 
						|
	codec                Codec
 | 
						|
	cp                   Compressor
 | 
						|
	dc                   Decompressor
 | 
						|
	maxMsgSize           int
 | 
						|
	unaryInt             UnaryServerInterceptor
 | 
						|
	streamInt            StreamServerInterceptor
 | 
						|
	maxConcurrentStreams uint32
 | 
						|
	useHandlerImpl       bool // use http.Handler-based server
 | 
						|
}
 | 
						|
 | 
						|
// defaultMaxMsgSize is the inbound message size limit (4MB) that NewServer
// applies before any ServerOption runs.
var defaultMaxMsgSize = 4 * 1024 * 1024
 | 
						|
 | 
						|
// A ServerOption sets options.
 | 
						|
type ServerOption func(*options)
 | 
						|
 | 
						|
// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
 | 
						|
func CustomCodec(codec Codec) ServerOption {
 | 
						|
	return func(o *options) {
 | 
						|
		o.codec = codec
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// RPCCompressor returns a ServerOption that sets a compressor for outbound messages.
 | 
						|
func RPCCompressor(cp Compressor) ServerOption {
 | 
						|
	return func(o *options) {
 | 
						|
		o.cp = cp
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// RPCDecompressor returns a ServerOption that sets a decompressor for inbound messages.
 | 
						|
func RPCDecompressor(dc Decompressor) ServerOption {
 | 
						|
	return func(o *options) {
 | 
						|
		o.dc = dc
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// MaxMsgSize returns a ServerOption to set the max message size in bytes for inbound mesages.
 | 
						|
// If this is not set, gRPC uses the default 4MB.
 | 
						|
func MaxMsgSize(m int) ServerOption {
 | 
						|
	return func(o *options) {
 | 
						|
		o.maxMsgSize = m
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
 | 
						|
// of concurrent streams to each ServerTransport.
 | 
						|
func MaxConcurrentStreams(n uint32) ServerOption {
 | 
						|
	return func(o *options) {
 | 
						|
		o.maxConcurrentStreams = n
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Creds returns a ServerOption that sets credentials for server connections.
 | 
						|
func Creds(c credentials.TransportCredentials) ServerOption {
 | 
						|
	return func(o *options) {
 | 
						|
		o.creds = c
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
 | 
						|
// server. Only one unary interceptor can be installed. The construction of multiple
 | 
						|
// interceptors (e.g., chaining) can be implemented at the caller.
 | 
						|
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
 | 
						|
	return func(o *options) {
 | 
						|
		if o.unaryInt != nil {
 | 
						|
			panic("The unary server interceptor has been set.")
 | 
						|
		}
 | 
						|
		o.unaryInt = i
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
 | 
						|
// server. Only one stream interceptor can be installed.
 | 
						|
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
 | 
						|
	return func(o *options) {
 | 
						|
		if o.streamInt != nil {
 | 
						|
			panic("The stream server interceptor has been set.")
 | 
						|
		}
 | 
						|
		o.streamInt = i
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// NewServer creates a gRPC server which has no service registered and has not
 | 
						|
// started to accept requests yet.
 | 
						|
func NewServer(opt ...ServerOption) *Server {
 | 
						|
	var opts options
 | 
						|
	opts.maxMsgSize = defaultMaxMsgSize
 | 
						|
	for _, o := range opt {
 | 
						|
		o(&opts)
 | 
						|
	}
 | 
						|
	if opts.codec == nil {
 | 
						|
		// Set the default codec.
 | 
						|
		opts.codec = protoCodec{}
 | 
						|
	}
 | 
						|
	s := &Server{
 | 
						|
		lis:   make(map[net.Listener]bool),
 | 
						|
		opts:  opts,
 | 
						|
		conns: make(map[io.Closer]bool),
 | 
						|
		m:     make(map[string]*service),
 | 
						|
	}
 | 
						|
	s.cv = sync.NewCond(&s.mu)
 | 
						|
	s.ctx, s.cancel = context.WithCancel(context.Background())
 | 
						|
	if EnableTracing {
 | 
						|
		_, file, line, _ := runtime.Caller(1)
 | 
						|
		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
 | 
						|
	}
 | 
						|
	return s
 | 
						|
}
 | 
						|
 | 
						|
// printf records an event in s's event log, unless s has been stopped.
 | 
						|
// REQUIRES s.mu is held.
 | 
						|
func (s *Server) printf(format string, a ...interface{}) {
 | 
						|
	if s.events != nil {
 | 
						|
		s.events.Printf(format, a...)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// errorf records an error in s's event log, unless s has been stopped.
 | 
						|
// REQUIRES s.mu is held.
 | 
						|
func (s *Server) errorf(format string, a ...interface{}) {
 | 
						|
	if s.events != nil {
 | 
						|
		s.events.Errorf(format, a...)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// RegisterService register a service and its implementation to the gRPC
 | 
						|
// server. Called from the IDL generated code. This must be called before
 | 
						|
// invoking Serve.
 | 
						|
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
 | 
						|
	ht := reflect.TypeOf(sd.HandlerType).Elem()
 | 
						|
	st := reflect.TypeOf(ss)
 | 
						|
	if !st.Implements(ht) {
 | 
						|
		grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
 | 
						|
	}
 | 
						|
	s.register(sd, ss)
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
 | 
						|
	s.mu.Lock()
 | 
						|
	defer s.mu.Unlock()
 | 
						|
	s.printf("RegisterService(%q)", sd.ServiceName)
 | 
						|
	if _, ok := s.m[sd.ServiceName]; ok {
 | 
						|
		grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
 | 
						|
	}
 | 
						|
	srv := &service{
 | 
						|
		server: ss,
 | 
						|
		md:     make(map[string]*MethodDesc),
 | 
						|
		sd:     make(map[string]*StreamDesc),
 | 
						|
		mdata:  sd.Metadata,
 | 
						|
	}
 | 
						|
	for i := range sd.Methods {
 | 
						|
		d := &sd.Methods[i]
 | 
						|
		srv.md[d.MethodName] = d
 | 
						|
	}
 | 
						|
	for i := range sd.Streams {
 | 
						|
		d := &sd.Streams[i]
 | 
						|
		srv.sd[d.StreamName] = d
 | 
						|
	}
 | 
						|
	s.m[sd.ServiceName] = srv
 | 
						|
}
 | 
						|
 | 
						|
// MethodInfo describes one RPC: its method name and its streaming type.
type MethodInfo struct {
	// Name is the bare method name; it carries neither the service name nor
	// the package name.
	Name string
	// IsClientStream reports whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream reports whether the RPC is a server streaming RPC.
	IsServerStream bool
}
 | 
						|
 | 
						|
// ServiceInfo contains unary RPC method info, streaming RPC methid info and metadata for a service.
 | 
						|
type ServiceInfo struct {
 | 
						|
	Methods []MethodInfo
 | 
						|
	// Metadata is the metadata specified in ServiceDesc when registering service.
 | 
						|
	Metadata interface{}
 | 
						|
}
 | 
						|
 | 
						|
// GetServiceInfo returns a map from service names to ServiceInfo.
 | 
						|
// Service names include the package names, in the form of <package>.<service>.
 | 
						|
func (s *Server) GetServiceInfo() map[string]ServiceInfo {
 | 
						|
	ret := make(map[string]ServiceInfo)
 | 
						|
	for n, srv := range s.m {
 | 
						|
		methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
 | 
						|
		for m := range srv.md {
 | 
						|
			methods = append(methods, MethodInfo{
 | 
						|
				Name:           m,
 | 
						|
				IsClientStream: false,
 | 
						|
				IsServerStream: false,
 | 
						|
			})
 | 
						|
		}
 | 
						|
		for m, d := range srv.sd {
 | 
						|
			methods = append(methods, MethodInfo{
 | 
						|
				Name:           m,
 | 
						|
				IsClientStream: d.ClientStreams,
 | 
						|
				IsServerStream: d.ServerStreams,
 | 
						|
			})
 | 
						|
		}
 | 
						|
 | 
						|
		ret[n] = ServiceInfo{
 | 
						|
			Methods:  methods,
 | 
						|
			Metadata: srv.mdata,
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return ret
 | 
						|
}
 | 
						|
 | 
						|
// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
 | 
						|
 | 
						|
func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
 | 
						|
	if s.opts.creds == nil {
 | 
						|
		return rawConn, nil, nil
 | 
						|
	}
 | 
						|
	return s.opts.creds.ServerHandshake(rawConn)
 | 
						|
}
 | 
						|
 | 
						|
// Serve accepts incoming connections on the listener lis, creating a new
 | 
						|
// ServerTransport and service goroutine for each. The service goroutines
 | 
						|
// read gRPC requests and then call the registered handlers to reply to them.
 | 
						|
// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
 | 
						|
// this method returns.
 | 
						|
func (s *Server) Serve(lis net.Listener) error {
 | 
						|
	s.mu.Lock()
 | 
						|
	s.printf("serving")
 | 
						|
	if s.lis == nil {
 | 
						|
		s.mu.Unlock()
 | 
						|
		lis.Close()
 | 
						|
		return ErrServerStopped
 | 
						|
	}
 | 
						|
	s.lis[lis] = true
 | 
						|
	s.mu.Unlock()
 | 
						|
	defer func() {
 | 
						|
		s.mu.Lock()
 | 
						|
		if s.lis != nil && s.lis[lis] {
 | 
						|
			lis.Close()
 | 
						|
			delete(s.lis, lis)
 | 
						|
		}
 | 
						|
		s.mu.Unlock()
 | 
						|
	}()
 | 
						|
 | 
						|
	var tempDelay time.Duration // how long to sleep on accept failure
 | 
						|
 | 
						|
	for {
 | 
						|
		rawConn, err := lis.Accept()
 | 
						|
		if err != nil {
 | 
						|
			if ne, ok := err.(interface {
 | 
						|
				Temporary() bool
 | 
						|
			}); ok && ne.Temporary() {
 | 
						|
				if tempDelay == 0 {
 | 
						|
					tempDelay = 5 * time.Millisecond
 | 
						|
				} else {
 | 
						|
					tempDelay *= 2
 | 
						|
				}
 | 
						|
				if max := 1 * time.Second; tempDelay > max {
 | 
						|
					tempDelay = max
 | 
						|
				}
 | 
						|
				s.mu.Lock()
 | 
						|
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
 | 
						|
				s.mu.Unlock()
 | 
						|
				select {
 | 
						|
				case <-time.After(tempDelay):
 | 
						|
				case <-s.ctx.Done():
 | 
						|
				}
 | 
						|
				continue
 | 
						|
			}
 | 
						|
			s.mu.Lock()
 | 
						|
			s.printf("done serving; Accept = %v", err)
 | 
						|
			s.mu.Unlock()
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		tempDelay = 0
 | 
						|
		// Start a new goroutine to deal with rawConn
 | 
						|
		// so we don't stall this Accept loop goroutine.
 | 
						|
		go s.handleRawConn(rawConn)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// handleRawConn is run in its own goroutine and handles a just-accepted
 | 
						|
// connection that has not had any I/O performed on it yet.
 | 
						|
func (s *Server) handleRawConn(rawConn net.Conn) {
 | 
						|
	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
 | 
						|
	if err != nil {
 | 
						|
		s.mu.Lock()
 | 
						|
		s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
 | 
						|
		s.mu.Unlock()
 | 
						|
		grpclog.Printf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
 | 
						|
		// If serverHandShake returns ErrConnDispatched, keep rawConn open.
 | 
						|
		if err != credentials.ErrConnDispatched {
 | 
						|
			rawConn.Close()
 | 
						|
		}
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	s.mu.Lock()
 | 
						|
	if s.conns == nil {
 | 
						|
		s.mu.Unlock()
 | 
						|
		conn.Close()
 | 
						|
		return
 | 
						|
	}
 | 
						|
	s.mu.Unlock()
 | 
						|
 | 
						|
	if s.opts.useHandlerImpl {
 | 
						|
		s.serveUsingHandler(conn)
 | 
						|
	} else {
 | 
						|
		s.serveNewHTTP2Transport(conn, authInfo)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// serveNewHTTP2Transport sets up a new http/2 transport (using the
 | 
						|
// gRPC http2 server transport in transport/http2_server.go) and
 | 
						|
// serves streams on it.
 | 
						|
// This is run in its own goroutine (it does network I/O in
 | 
						|
// transport.NewServerTransport).
 | 
						|
func (s *Server) serveNewHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
 | 
						|
	st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo)
 | 
						|
	if err != nil {
 | 
						|
		s.mu.Lock()
 | 
						|
		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
 | 
						|
		s.mu.Unlock()
 | 
						|
		c.Close()
 | 
						|
		grpclog.Println("grpc: Server.Serve failed to create ServerTransport: ", err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if !s.addConn(st) {
 | 
						|
		st.Close()
 | 
						|
		return
 | 
						|
	}
 | 
						|
	s.serveStreams(st)
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) serveStreams(st transport.ServerTransport) {
 | 
						|
	defer s.removeConn(st)
 | 
						|
	defer st.Close()
 | 
						|
	var wg sync.WaitGroup
 | 
						|
	st.HandleStreams(func(stream *transport.Stream) {
 | 
						|
		wg.Add(1)
 | 
						|
		go func() {
 | 
						|
			defer wg.Done()
 | 
						|
			s.handleStream(st, stream, s.traceInfo(st, stream))
 | 
						|
		}()
 | 
						|
	})
 | 
						|
	wg.Wait()
 | 
						|
}
 | 
						|
 | 
						|
// Compile-time assertion that *Server implements http.Handler.
var _ http.Handler = (*Server)(nil)
 | 
						|
 | 
						|
// serveUsingHandler is called from handleRawConn when s is configured
 | 
						|
// to handle requests via the http.Handler interface. It sets up a
 | 
						|
// net/http.Server to handle the just-accepted conn. The http.Server
 | 
						|
// is configured to route all incoming requests (all HTTP/2 streams)
 | 
						|
// to ServeHTTP, which creates a new ServerTransport for each stream.
 | 
						|
// serveUsingHandler blocks until conn closes.
 | 
						|
//
 | 
						|
// This codepath is only used when Server.TestingUseHandlerImpl has
 | 
						|
// been configured. This lets the end2end tests exercise the ServeHTTP
 | 
						|
// method as one of the environment types.
 | 
						|
//
 | 
						|
// conn is the *tls.Conn that's already been authenticated.
 | 
						|
func (s *Server) serveUsingHandler(conn net.Conn) {
 | 
						|
	if !s.addConn(conn) {
 | 
						|
		conn.Close()
 | 
						|
		return
 | 
						|
	}
 | 
						|
	defer s.removeConn(conn)
 | 
						|
	h2s := &http2.Server{
 | 
						|
		MaxConcurrentStreams: s.opts.maxConcurrentStreams,
 | 
						|
	}
 | 
						|
	h2s.ServeConn(conn, &http2.ServeConnOpts{
 | 
						|
		Handler: s,
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 | 
						|
	st, err := transport.NewServerHandlerTransport(w, r)
 | 
						|
	if err != nil {
 | 
						|
		http.Error(w, err.Error(), http.StatusInternalServerError)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if !s.addConn(st) {
 | 
						|
		st.Close()
 | 
						|
		return
 | 
						|
	}
 | 
						|
	defer s.removeConn(st)
 | 
						|
	s.serveStreams(st)
 | 
						|
}
 | 
						|
 | 
						|
// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
 | 
						|
// If tracing is not enabled, it returns nil.
 | 
						|
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
 | 
						|
	if !EnableTracing {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	trInfo = &traceInfo{
 | 
						|
		tr: trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()),
 | 
						|
	}
 | 
						|
	trInfo.firstLine.client = false
 | 
						|
	trInfo.firstLine.remoteAddr = st.RemoteAddr()
 | 
						|
	stream.TraceContext(trInfo.tr)
 | 
						|
	if dl, ok := stream.Context().Deadline(); ok {
 | 
						|
		trInfo.firstLine.deadline = dl.Sub(time.Now())
 | 
						|
	}
 | 
						|
	return trInfo
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) addConn(c io.Closer) bool {
 | 
						|
	s.mu.Lock()
 | 
						|
	defer s.mu.Unlock()
 | 
						|
	if s.conns == nil || s.drain {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	s.conns[c] = true
 | 
						|
	return true
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) removeConn(c io.Closer) {
 | 
						|
	s.mu.Lock()
 | 
						|
	defer s.mu.Unlock()
 | 
						|
	if s.conns != nil {
 | 
						|
		delete(s.conns, c)
 | 
						|
		s.cv.Broadcast()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error {
 | 
						|
	var cbuf *bytes.Buffer
 | 
						|
	if cp != nil {
 | 
						|
		cbuf = new(bytes.Buffer)
 | 
						|
	}
 | 
						|
	p, err := encode(s.opts.codec, msg, cp, cbuf)
 | 
						|
	if err != nil {
 | 
						|
		// This typically indicates a fatal issue (e.g., memory
 | 
						|
		// corruption or hardware faults) the application program
 | 
						|
		// cannot handle.
 | 
						|
		//
 | 
						|
		// TODO(zhaoq): There exist other options also such as only closing the
 | 
						|
		// faulty stream locally and remotely (Other streams can keep going). Find
 | 
						|
		// the optimal option.
 | 
						|
		grpclog.Fatalf("grpc: Server failed to encode response %v", err)
 | 
						|
	}
 | 
						|
	return t.Write(stream, p, opts)
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
 | 
						|
	if trInfo != nil {
 | 
						|
		defer trInfo.tr.Finish()
 | 
						|
		trInfo.firstLine.client = false
 | 
						|
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
 | 
						|
		defer func() {
 | 
						|
			if err != nil && err != io.EOF {
 | 
						|
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
 | 
						|
				trInfo.tr.SetError()
 | 
						|
			}
 | 
						|
		}()
 | 
						|
	}
 | 
						|
	if s.opts.cp != nil {
 | 
						|
		// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
 | 
						|
		stream.SetSendCompress(s.opts.cp.Type())
 | 
						|
	}
 | 
						|
	p := &parser{r: stream}
 | 
						|
	for {
 | 
						|
		pf, req, err := p.recvMsg(s.opts.maxMsgSize)
 | 
						|
		if err == io.EOF {
 | 
						|
			// The entire stream is done (for unary RPC only).
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		if err == io.ErrUnexpectedEOF {
 | 
						|
			err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
 | 
						|
		}
 | 
						|
		if err != nil {
 | 
						|
			switch err := err.(type) {
 | 
						|
			case *rpcError:
 | 
						|
				if err := t.WriteStatus(stream, err.code, err.desc); err != nil {
 | 
						|
					grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
 | 
						|
				}
 | 
						|
			case transport.ConnectionError:
 | 
						|
				// Nothing to do here.
 | 
						|
			case transport.StreamError:
 | 
						|
				if err := t.WriteStatus(stream, err.Code, err.Desc); err != nil {
 | 
						|
					grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
 | 
						|
				}
 | 
						|
			default:
 | 
						|
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", err, err))
 | 
						|
			}
 | 
						|
			return err
 | 
						|
		}
 | 
						|
 | 
						|
		if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
 | 
						|
			switch err := err.(type) {
 | 
						|
			case *rpcError:
 | 
						|
				if err := t.WriteStatus(stream, err.code, err.desc); err != nil {
 | 
						|
					grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
 | 
						|
				}
 | 
						|
			default:
 | 
						|
				if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
 | 
						|
					grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
 | 
						|
				}
 | 
						|
 | 
						|
			}
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		statusCode := codes.OK
 | 
						|
		statusDesc := ""
 | 
						|
		df := func(v interface{}) error {
 | 
						|
			if pf == compressionMade {
 | 
						|
				var err error
 | 
						|
				req, err = s.opts.dc.Do(bytes.NewReader(req))
 | 
						|
				if err != nil {
 | 
						|
					if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
 | 
						|
						grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
 | 
						|
					}
 | 
						|
					return err
 | 
						|
				}
 | 
						|
			}
 | 
						|
			if len(req) > s.opts.maxMsgSize {
 | 
						|
				// TODO: Revisit the error code. Currently keep it consistent with
 | 
						|
				// java implementation.
 | 
						|
				statusCode = codes.Internal
 | 
						|
				statusDesc = fmt.Sprintf("grpc: server received a message of %d bytes exceeding %d limit", len(req), s.opts.maxMsgSize)
 | 
						|
			}
 | 
						|
			if err := s.opts.codec.Unmarshal(req, v); err != nil {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
			if trInfo != nil {
 | 
						|
				trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
 | 
						|
			}
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
		reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
 | 
						|
		if appErr != nil {
 | 
						|
			if err, ok := appErr.(*rpcError); ok {
 | 
						|
				statusCode = err.code
 | 
						|
				statusDesc = err.desc
 | 
						|
			} else {
 | 
						|
				statusCode = convertCode(appErr)
 | 
						|
				statusDesc = appErr.Error()
 | 
						|
			}
 | 
						|
			if trInfo != nil && statusCode != codes.OK {
 | 
						|
				trInfo.tr.LazyLog(stringer(statusDesc), true)
 | 
						|
				trInfo.tr.SetError()
 | 
						|
			}
 | 
						|
			if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
 | 
						|
				grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
 | 
						|
				return err
 | 
						|
			}
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
		if trInfo != nil {
 | 
						|
			trInfo.tr.LazyLog(stringer("OK"), false)
 | 
						|
		}
 | 
						|
		opts := &transport.Options{
 | 
						|
			Last:  true,
 | 
						|
			Delay: false,
 | 
						|
		}
 | 
						|
		if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
 | 
						|
			switch err := err.(type) {
 | 
						|
			case transport.ConnectionError:
 | 
						|
				// Nothing to do here.
 | 
						|
			case transport.StreamError:
 | 
						|
				statusCode = err.Code
 | 
						|
				statusDesc = err.Desc
 | 
						|
			default:
 | 
						|
				statusCode = codes.Unknown
 | 
						|
				statusDesc = err.Error()
 | 
						|
			}
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		if trInfo != nil {
 | 
						|
			trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
 | 
						|
		}
 | 
						|
		return t.WriteStatus(stream, statusCode, statusDesc)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
 | 
						|
	if s.opts.cp != nil {
 | 
						|
		stream.SetSendCompress(s.opts.cp.Type())
 | 
						|
	}
 | 
						|
	ss := &serverStream{
 | 
						|
		t:          t,
 | 
						|
		s:          stream,
 | 
						|
		p:          &parser{r: stream},
 | 
						|
		codec:      s.opts.codec,
 | 
						|
		cp:         s.opts.cp,
 | 
						|
		dc:         s.opts.dc,
 | 
						|
		maxMsgSize: s.opts.maxMsgSize,
 | 
						|
		trInfo:     trInfo,
 | 
						|
	}
 | 
						|
	if ss.cp != nil {
 | 
						|
		ss.cbuf = new(bytes.Buffer)
 | 
						|
	}
 | 
						|
	if trInfo != nil {
 | 
						|
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
 | 
						|
		defer func() {
 | 
						|
			ss.mu.Lock()
 | 
						|
			if err != nil && err != io.EOF {
 | 
						|
				ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
 | 
						|
				ss.trInfo.tr.SetError()
 | 
						|
			}
 | 
						|
			ss.trInfo.tr.Finish()
 | 
						|
			ss.trInfo.tr = nil
 | 
						|
			ss.mu.Unlock()
 | 
						|
		}()
 | 
						|
	}
 | 
						|
	var appErr error
 | 
						|
	if s.opts.streamInt == nil {
 | 
						|
		appErr = sd.Handler(srv.server, ss)
 | 
						|
	} else {
 | 
						|
		info := &StreamServerInfo{
 | 
						|
			FullMethod:     stream.Method(),
 | 
						|
			IsClientStream: sd.ClientStreams,
 | 
						|
			IsServerStream: sd.ServerStreams,
 | 
						|
		}
 | 
						|
		appErr = s.opts.streamInt(srv.server, ss, info, sd.Handler)
 | 
						|
	}
 | 
						|
	if appErr != nil {
 | 
						|
		if err, ok := appErr.(*rpcError); ok {
 | 
						|
			ss.statusCode = err.code
 | 
						|
			ss.statusDesc = err.desc
 | 
						|
		} else if err, ok := appErr.(transport.StreamError); ok {
 | 
						|
			ss.statusCode = err.Code
 | 
						|
			ss.statusDesc = err.Desc
 | 
						|
		} else {
 | 
						|
			ss.statusCode = convertCode(appErr)
 | 
						|
			ss.statusDesc = appErr.Error()
 | 
						|
		}
 | 
						|
	}
 | 
						|
	if trInfo != nil {
 | 
						|
		ss.mu.Lock()
 | 
						|
		if ss.statusCode != codes.OK {
 | 
						|
			ss.trInfo.tr.LazyLog(stringer(ss.statusDesc), true)
 | 
						|
			ss.trInfo.tr.SetError()
 | 
						|
		} else {
 | 
						|
			ss.trInfo.tr.LazyLog(stringer("OK"), false)
 | 
						|
		}
 | 
						|
		ss.mu.Unlock()
 | 
						|
	}
 | 
						|
	return t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
 | 
						|
	sm := stream.Method()
 | 
						|
	if sm != "" && sm[0] == '/' {
 | 
						|
		sm = sm[1:]
 | 
						|
	}
 | 
						|
	pos := strings.LastIndex(sm, "/")
 | 
						|
	if pos == -1 {
 | 
						|
		if trInfo != nil {
 | 
						|
			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
 | 
						|
			trInfo.tr.SetError()
 | 
						|
		}
 | 
						|
		if err := t.WriteStatus(stream, codes.InvalidArgument, fmt.Sprintf("malformed method name: %q", stream.Method())); err != nil {
 | 
						|
			if trInfo != nil {
 | 
						|
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
 | 
						|
				trInfo.tr.SetError()
 | 
						|
			}
 | 
						|
			grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
 | 
						|
		}
 | 
						|
		if trInfo != nil {
 | 
						|
			trInfo.tr.Finish()
 | 
						|
		}
 | 
						|
		return
 | 
						|
	}
 | 
						|
	service := sm[:pos]
 | 
						|
	method := sm[pos+1:]
 | 
						|
	srv, ok := s.m[service]
 | 
						|
	if !ok {
 | 
						|
		if trInfo != nil {
 | 
						|
			trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
 | 
						|
			trInfo.tr.SetError()
 | 
						|
		}
 | 
						|
		if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown service %v", service)); err != nil {
 | 
						|
			if trInfo != nil {
 | 
						|
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
 | 
						|
				trInfo.tr.SetError()
 | 
						|
			}
 | 
						|
			grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
 | 
						|
		}
 | 
						|
		if trInfo != nil {
 | 
						|
			trInfo.tr.Finish()
 | 
						|
		}
 | 
						|
		return
 | 
						|
	}
 | 
						|
	// Unary RPC or Streaming RPC?
 | 
						|
	if md, ok := srv.md[method]; ok {
 | 
						|
		s.processUnaryRPC(t, stream, srv, md, trInfo)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if sd, ok := srv.sd[method]; ok {
 | 
						|
		s.processStreamingRPC(t, stream, srv, sd, trInfo)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if trInfo != nil {
 | 
						|
		trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
 | 
						|
		trInfo.tr.SetError()
 | 
						|
	}
 | 
						|
	if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method)); err != nil {
 | 
						|
		if trInfo != nil {
 | 
						|
			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
 | 
						|
			trInfo.tr.SetError()
 | 
						|
		}
 | 
						|
		grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
 | 
						|
	}
 | 
						|
	if trInfo != nil {
 | 
						|
		trInfo.tr.Finish()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Stop stops the gRPC server. It immediately closes all open
 | 
						|
// connections and listeners.
 | 
						|
// It cancels all active RPCs on the server side and the corresponding
 | 
						|
// pending RPCs on the client side will get notified by connection
 | 
						|
// errors.
 | 
						|
func (s *Server) Stop() {
 | 
						|
	s.mu.Lock()
 | 
						|
	listeners := s.lis
 | 
						|
	s.lis = nil
 | 
						|
	st := s.conns
 | 
						|
	s.conns = nil
 | 
						|
	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
 | 
						|
	s.cv.Broadcast()
 | 
						|
	s.mu.Unlock()
 | 
						|
 | 
						|
	for lis := range listeners {
 | 
						|
		lis.Close()
 | 
						|
	}
 | 
						|
	for c := range st {
 | 
						|
		c.Close()
 | 
						|
	}
 | 
						|
 | 
						|
	s.mu.Lock()
 | 
						|
	s.cancel()
 | 
						|
	if s.events != nil {
 | 
						|
		s.events.Finish()
 | 
						|
		s.events = nil
 | 
						|
	}
 | 
						|
	s.mu.Unlock()
 | 
						|
}
 | 
						|
 | 
						|
// GracefulStop stops the gRPC server gracefully. It stops the server to accept new
 | 
						|
// connections and RPCs and blocks until all the pending RPCs are finished.
 | 
						|
func (s *Server) GracefulStop() {
 | 
						|
	s.mu.Lock()
 | 
						|
	defer s.mu.Unlock()
 | 
						|
	if s.conns == nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	for lis := range s.lis {
 | 
						|
		lis.Close()
 | 
						|
	}
 | 
						|
	s.lis = nil
 | 
						|
	s.cancel()
 | 
						|
	if !s.drain {
 | 
						|
		for c := range s.conns {
 | 
						|
			c.(transport.ServerTransport).Drain()
 | 
						|
		}
 | 
						|
		s.drain = true
 | 
						|
	}
 | 
						|
	for len(s.conns) != 0 {
 | 
						|
		s.cv.Wait()
 | 
						|
	}
 | 
						|
	s.conns = nil
 | 
						|
	if s.events != nil {
 | 
						|
		s.events.Finish()
 | 
						|
		s.events = nil
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func init() {
 | 
						|
	internal.TestingCloseConns = func(arg interface{}) {
 | 
						|
		arg.(*Server).testingCloseConns()
 | 
						|
	}
 | 
						|
	internal.TestingUseHandlerImpl = func(arg interface{}) {
 | 
						|
		arg.(*Server).opts.useHandlerImpl = true
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// testingCloseConns closes all existing transports but keeps s.lis
 | 
						|
// accepting new connections.
 | 
						|
func (s *Server) testingCloseConns() {
 | 
						|
	s.mu.Lock()
 | 
						|
	for c := range s.conns {
 | 
						|
		c.Close()
 | 
						|
		delete(s.conns, c)
 | 
						|
	}
 | 
						|
	s.mu.Unlock()
 | 
						|
}
 | 
						|
 | 
						|
// SetHeader sets the header metadata.
 | 
						|
// When called multiple times, all the provided metadata will be merged.
 | 
						|
// All the metadata will be sent out when one of the following happens:
 | 
						|
//  - grpc.SendHeader() is called;
 | 
						|
//  - The first response is sent out;
 | 
						|
//  - An RPC status is sent out (error or success).
 | 
						|
func SetHeader(ctx context.Context, md metadata.MD) error {
 | 
						|
	if md.Len() == 0 {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	stream, ok := transport.StreamFromContext(ctx)
 | 
						|
	if !ok {
 | 
						|
		return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
 | 
						|
	}
 | 
						|
	return stream.SetHeader(md)
 | 
						|
}
 | 
						|
 | 
						|
// SendHeader sends header metadata. It may be called at most once.
 | 
						|
// The provided md and headers set by SetHeader() will be sent.
 | 
						|
func SendHeader(ctx context.Context, md metadata.MD) error {
 | 
						|
	stream, ok := transport.StreamFromContext(ctx)
 | 
						|
	if !ok {
 | 
						|
		return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
 | 
						|
	}
 | 
						|
	t := stream.ServerTransport()
 | 
						|
	if t == nil {
 | 
						|
		grpclog.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream)
 | 
						|
	}
 | 
						|
	if err := t.WriteHeader(stream, md); err != nil {
 | 
						|
		return toRPCErr(err)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
 | 
						|
// When called more than once, all the provided metadata will be merged.
 | 
						|
func SetTrailer(ctx context.Context, md metadata.MD) error {
 | 
						|
	if md.Len() == 0 {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	stream, ok := transport.StreamFromContext(ctx)
 | 
						|
	if !ok {
 | 
						|
		return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
 | 
						|
	}
 | 
						|
	return stream.SetTrailer(md)
 | 
						|
}
 |