mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			214 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			214 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright 2018 Google Inc. All Rights Reserved.
 | 
						|
//
 | 
						|
// Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
// you may not use this file except in compliance with the License.
 | 
						|
// You may obtain a copy of the License at
 | 
						|
//
 | 
						|
//     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
//
 | 
						|
// Unless required by applicable law or agreed to in writing, software
 | 
						|
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
// See the License for the specific language governing permissions and
 | 
						|
// limitations under the License.
 | 
						|
 | 
						|
package mesos
 | 
						|
 | 
						|
import (
 | 
						|
	"fmt"
 | 
						|
	"github.com/Rican7/retry"
 | 
						|
	"github.com/Rican7/retry/strategy"
 | 
						|
	"github.com/mesos/mesos-go/api/v1/lib"
 | 
						|
	"github.com/mesos/mesos-go/api/v1/lib/agent"
 | 
						|
	"github.com/mesos/mesos-go/api/v1/lib/agent/calls"
 | 
						|
	mclient "github.com/mesos/mesos-go/api/v1/lib/client"
 | 
						|
	"github.com/mesos/mesos-go/api/v1/lib/encoding/codecs"
 | 
						|
	"github.com/mesos/mesos-go/api/v1/lib/httpcli"
 | 
						|
	"net/url"
 | 
						|
	"sync"
 | 
						|
)
 | 
						|
 | 
						|
const (
 | 
						|
	maxRetryAttempts = 3
 | 
						|
	invalidPID       = -1
 | 
						|
)
 | 
						|
 | 
						|
var (
 | 
						|
	mesosClientOnce sync.Once
 | 
						|
	mesosClient     *client
 | 
						|
)
 | 
						|
 | 
						|
type client struct {
 | 
						|
	hc *httpcli.Client
 | 
						|
}
 | 
						|
 | 
						|
type mesosAgentClient interface {
 | 
						|
	ContainerInfo(id string) (*containerInfo, error)
 | 
						|
	ContainerPid(id string) (int, error)
 | 
						|
}
 | 
						|
 | 
						|
type containerInfo struct {
 | 
						|
	cntr   *mContainer
 | 
						|
	labels map[string]string
 | 
						|
}
 | 
						|
 | 
						|
// Client is an interface to query mesos agent http endpoints
 | 
						|
func Client() (mesosAgentClient, error) {
 | 
						|
	mesosClientOnce.Do(func() {
 | 
						|
		// Start Client
 | 
						|
		apiURL := url.URL{
 | 
						|
			Scheme: "http",
 | 
						|
			Host:   *MesosAgentAddress,
 | 
						|
			Path:   "/api/v1",
 | 
						|
		}
 | 
						|
 | 
						|
		mesosClient = &client{
 | 
						|
			hc: httpcli.New(
 | 
						|
				httpcli.Endpoint(apiURL.String()),
 | 
						|
				httpcli.Codec(codecs.ByMediaType[codecs.MediaTypeProtobuf]),
 | 
						|
				httpcli.Do(httpcli.With(httpcli.Timeout(*MesosAgentTimeout))),
 | 
						|
			),
 | 
						|
		}
 | 
						|
	})
 | 
						|
 | 
						|
	_, err := mesosClient.getVersion()
 | 
						|
	if err != nil {
 | 
						|
		return nil, fmt.Errorf("failed to get version")
 | 
						|
	}
 | 
						|
	return mesosClient, nil
 | 
						|
}
 | 
						|
 | 
						|
// ContainerInfo returns the container information of the given container id
 | 
						|
func (self *client) ContainerInfo(id string) (*containerInfo, error) {
 | 
						|
	c, err := self.getContainer(id)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	// Get labels of the container
 | 
						|
	l, err := self.getLabels(c)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return &containerInfo{
 | 
						|
		cntr:   c,
 | 
						|
		labels: l,
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
// Get the Pid of the container
 | 
						|
func (self *client) ContainerPid(id string) (int, error) {
 | 
						|
	var pid int
 | 
						|
	var err error
 | 
						|
	err = retry.Retry(
 | 
						|
		func(attempt uint) error {
 | 
						|
			c, err := self.ContainerInfo(id)
 | 
						|
			if err != nil {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
 | 
						|
			if c.cntr.ContainerStatus != nil {
 | 
						|
				pid = int(*c.cntr.ContainerStatus.ExecutorPID)
 | 
						|
			} else {
 | 
						|
				err = fmt.Errorf("error fetching Pid")
 | 
						|
			}
 | 
						|
			return err
 | 
						|
		},
 | 
						|
		strategy.Limit(maxRetryAttempts),
 | 
						|
	)
 | 
						|
	if err != nil {
 | 
						|
		return invalidPID, fmt.Errorf("failed to fetch pid")
 | 
						|
	}
 | 
						|
	return pid, err
 | 
						|
}
 | 
						|
 | 
						|
func (self *client) getContainer(id string) (*mContainer, error) {
 | 
						|
	// Get all containers
 | 
						|
	cntrs, err := self.getContainers()
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	// Check if there is a container with given id and return the container
 | 
						|
	for _, c := range cntrs.Containers {
 | 
						|
		if c.ContainerID.Value == id {
 | 
						|
			return &c, nil
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return nil, fmt.Errorf("can't locate container %s", id)
 | 
						|
}
 | 
						|
 | 
						|
func (self *client) getVersion() (string, error) {
 | 
						|
	req := calls.NonStreaming(calls.GetVersion())
 | 
						|
	result, err := self.fetchAndDecode(req)
 | 
						|
	if err != nil {
 | 
						|
		return "", fmt.Errorf("failed to get mesos version: %v", err)
 | 
						|
	}
 | 
						|
	version := result.GetVersion
 | 
						|
 | 
						|
	if version == nil {
 | 
						|
		return "", fmt.Errorf("failed to get mesos version")
 | 
						|
	}
 | 
						|
	return version.VersionInfo.Version, nil
 | 
						|
}
 | 
						|
 | 
						|
func (self *client) getContainers() (mContainers, error) {
 | 
						|
	req := calls.NonStreaming(calls.GetContainers())
 | 
						|
	result, err := self.fetchAndDecode(req)
 | 
						|
	if err != nil {
 | 
						|
		return nil, fmt.Errorf("failed to get mesos containers: %v", err)
 | 
						|
	}
 | 
						|
	cntrs := result.GetContainers
 | 
						|
 | 
						|
	if cntrs == nil {
 | 
						|
		return nil, fmt.Errorf("failed to get mesos containers")
 | 
						|
	}
 | 
						|
	return cntrs, nil
 | 
						|
}
 | 
						|
 | 
						|
func (self *client) getLabels(c *mContainer) (map[string]string, error) {
 | 
						|
	// Get mesos agent state which contains all containers labels
 | 
						|
	var s state
 | 
						|
	req := calls.NonStreaming(calls.GetState())
 | 
						|
	result, err := self.fetchAndDecode(req)
 | 
						|
	if err != nil {
 | 
						|
		return map[string]string{}, fmt.Errorf("failed to get mesos agent state: %v", err)
 | 
						|
	}
 | 
						|
	s.st = result.GetState
 | 
						|
 | 
						|
	// Fetch labels from state object
 | 
						|
	labels, err := s.FetchLabels(c.FrameworkID.Value, c.ExecutorID.Value)
 | 
						|
	if err != nil {
 | 
						|
		return labels, fmt.Errorf("error while fetching labels from executor: %v", err)
 | 
						|
	}
 | 
						|
 | 
						|
	return labels, nil
 | 
						|
}
 | 
						|
 | 
						|
func (self *client) fetchAndDecode(req calls.RequestFunc) (*agent.Response, error) {
 | 
						|
	var res mesos.Response
 | 
						|
	var err error
 | 
						|
 | 
						|
	// Send request
 | 
						|
	err = retry.Retry(
 | 
						|
		func(attempt uint) error {
 | 
						|
			res, err = mesosClient.hc.Send(req, mclient.ResponseClassSingleton, nil)
 | 
						|
			return err
 | 
						|
		},
 | 
						|
		strategy.Limit(maxRetryAttempts),
 | 
						|
	)
 | 
						|
	if err != nil {
 | 
						|
		return nil, fmt.Errorf("error fetching %s: %s", req.Call(), err)
 | 
						|
	}
 | 
						|
 | 
						|
	// Decode the result
 | 
						|
	var target agent.Response
 | 
						|
	err = res.Decode(&target)
 | 
						|
	if err != nil {
 | 
						|
		return nil, fmt.Errorf("error while decoding response body from %s: %s", res, err)
 | 
						|
	}
 | 
						|
 | 
						|
	return &target, nil
 | 
						|
}
 |