replace kubeclient with kubeclientset in scheduler factory

Current factory's client type does not allow to use different client implementing the same interface.
This commit is contained in:
Jan Chaloupka
2016-10-05 10:40:39 +02:00
parent 92cb90fc5d
commit 19ab3c6140
7 changed files with 54 additions and 46 deletions

View File

@@ -28,11 +28,11 @@ import (
"k8s.io/kubernetes/pkg/api"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/client/leaderelection"
"k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/restclient"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/runtime"
@@ -88,7 +88,6 @@ func Run(s *options.SchedulerServer) error {
kubeconfig.QPS = s.KubeAPIQPS
kubeconfig.Burst = int(s.KubeAPIBurst)
kubeClient, err := client.New(kubeconfig)
if err != nil {
glog.Fatalf("Invalid API configuration: %v", err)
}
@@ -115,7 +114,7 @@ func Run(s *options.SchedulerServer) error {
glog.Fatal(server.ListenAndServe())
}()
configFactory := factory.NewConfigFactory(kubeClient, s.SchedulerName, s.HardPodAffinitySymmetricWeight, s.FailureDomains)
configFactory := factory.NewConfigFactory(leaderElectionClient, s.SchedulerName, s.HardPodAffinitySymmetricWeight, s.FailureDomains)
config, err := createConfig(s, configFactory)
if err != nil {
@@ -125,7 +124,7 @@ func Run(s *options.SchedulerServer) error {
eventBroadcaster := record.NewBroadcaster()
config.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: s.SchedulerName})
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: leaderElectionClient.Core().Events("")})
sched := scheduler.New(config)

View File

@@ -25,8 +25,8 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apimachinery/registered"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/restclient"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/sets"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
@@ -408,7 +408,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}
server := httptest.NewServer(&handler)
defer server.Close()
client := client.NewOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
if _, err := factory.NewConfigFactory(client, "some-scheduler-name", api.DefaultHardPodAffinitySymmetricWeight, api.DefaultFailureDomains).CreateFromConfig(policy); err != nil {
t.Errorf("%s: Error constructing: %v", v, err)

View File

@@ -28,7 +28,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/runtime"
@@ -53,7 +53,7 @@ const (
// ConfigFactory knows how to fill out a scheduler config with its support functions.
type ConfigFactory struct {
Client *client.Client
Client clientset.Interface
// queue for pods that need scheduling
PodQueue *cache.FIFO
// a means to list all known scheduled pods.
@@ -96,7 +96,7 @@ type ConfigFactory struct {
}
// Initializes the factory.
func NewConfigFactory(client *client.Client, schedulerName string, hardPodAffinitySymmetricWeight int, failureDomains string) *ConfigFactory {
func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodAffinitySymmetricWeight int, failureDomains string) *ConfigFactory {
stopEverything := make(chan struct{})
schedulerCache := schedulercache.New(30*time.Second, stopEverything)
@@ -478,7 +478,7 @@ func getNodeConditionPredicate() cache.NodeConditionPredicate {
// scheduled.
func (factory *ConfigFactory) createUnassignedNonTerminatedPodLW() *cache.ListWatch {
selector := fields.ParseSelectorOrDie("spec.nodeName==" + "" + ",status.phase!=" + string(api.PodSucceeded) + ",status.phase!=" + string(api.PodFailed))
return cache.NewListWatchFromClient(factory.Client, "pods", api.NamespaceAll, selector)
return cache.NewListWatchFromClient(factory.Client.Core().GetRESTClient(), "pods", api.NamespaceAll, selector)
}
// Returns a cache.ListWatch that finds all pods that are
@@ -486,7 +486,7 @@ func (factory *ConfigFactory) createUnassignedNonTerminatedPodLW() *cache.ListWa
// TODO: return a ListerWatcher interface instead?
func (factory *ConfigFactory) createAssignedNonTerminatedPodLW() *cache.ListWatch {
selector := fields.ParseSelectorOrDie("spec.nodeName!=" + "" + ",status.phase!=" + string(api.PodSucceeded) + ",status.phase!=" + string(api.PodFailed))
return cache.NewListWatchFromClient(factory.Client, "pods", api.NamespaceAll, selector)
return cache.NewListWatchFromClient(factory.Client.Core().GetRESTClient(), "pods", api.NamespaceAll, selector)
}
// createNodeLW returns a cache.ListWatch that gets all changes to nodes.
@@ -494,32 +494,32 @@ func (factory *ConfigFactory) createNodeLW() *cache.ListWatch {
// all nodes are considered to ensure that the scheduler cache has access to all nodes for lookups
// the NodeCondition is used to filter out the nodes that are not ready or unschedulable
// the filtered list is used as the super set of nodes to consider for scheduling
return cache.NewListWatchFromClient(factory.Client, "nodes", api.NamespaceAll, fields.ParseSelectorOrDie(""))
return cache.NewListWatchFromClient(factory.Client.Core().GetRESTClient(), "nodes", api.NamespaceAll, fields.ParseSelectorOrDie(""))
}
// createPersistentVolumeLW returns a cache.ListWatch that gets all changes to persistentVolumes.
func (factory *ConfigFactory) createPersistentVolumeLW() *cache.ListWatch {
return cache.NewListWatchFromClient(factory.Client, "persistentVolumes", api.NamespaceAll, fields.ParseSelectorOrDie(""))
return cache.NewListWatchFromClient(factory.Client.Core().GetRESTClient(), "persistentVolumes", api.NamespaceAll, fields.ParseSelectorOrDie(""))
}
// createPersistentVolumeClaimLW returns a cache.ListWatch that gets all changes to persistentVolumeClaims.
func (factory *ConfigFactory) createPersistentVolumeClaimLW() *cache.ListWatch {
return cache.NewListWatchFromClient(factory.Client, "persistentVolumeClaims", api.NamespaceAll, fields.ParseSelectorOrDie(""))
return cache.NewListWatchFromClient(factory.Client.Core().GetRESTClient(), "persistentVolumeClaims", api.NamespaceAll, fields.ParseSelectorOrDie(""))
}
// Returns a cache.ListWatch that gets all changes to services.
func (factory *ConfigFactory) createServiceLW() *cache.ListWatch {
return cache.NewListWatchFromClient(factory.Client, "services", api.NamespaceAll, fields.ParseSelectorOrDie(""))
return cache.NewListWatchFromClient(factory.Client.Core().GetRESTClient(), "services", api.NamespaceAll, fields.ParseSelectorOrDie(""))
}
// Returns a cache.ListWatch that gets all changes to controllers.
func (factory *ConfigFactory) createControllerLW() *cache.ListWatch {
return cache.NewListWatchFromClient(factory.Client, "replicationControllers", api.NamespaceAll, fields.ParseSelectorOrDie(""))
return cache.NewListWatchFromClient(factory.Client.Core().GetRESTClient(), "replicationControllers", api.NamespaceAll, fields.ParseSelectorOrDie(""))
}
// Returns a cache.ListWatch that gets all changes to replicasets.
func (factory *ConfigFactory) createReplicaSetLW() *cache.ListWatch {
return cache.NewListWatchFromClient(factory.Client.ExtensionsClient, "replicasets", api.NamespaceAll, fields.ParseSelectorOrDie(""))
return cache.NewListWatchFromClient(factory.Client.Extensions().GetRESTClient(), "replicasets", api.NamespaceAll, fields.ParseSelectorOrDie(""))
}
func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) {
@@ -547,7 +547,7 @@ func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue
// Get the pod again; it may have changed/been scheduled already.
getBackoff := initialGetBackoff
for {
pod, err := factory.Client.Pods(podID.Namespace).Get(podID.Name)
pod, err := factory.Client.Core().Pods(podID.Namespace).Get(podID.Name)
if err == nil {
if len(pod.Spec.NodeName) == 0 {
podQueue.AddIfNotPresent(pod)
@@ -587,26 +587,26 @@ func (ne *nodeEnumerator) Get(index int) interface{} {
}
type binder struct {
*client.Client
Client clientset.Interface
}
// Bind just does a POST binding RPC.
func (b *binder) Bind(binding *api.Binding) error {
glog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name)
ctx := api.WithNamespace(api.NewContext(), binding.Namespace)
return b.Post().Namespace(api.NamespaceValue(ctx)).Resource("bindings").Body(binding).Do().Error()
return b.Client.Core().GetRESTClient().Post().Namespace(api.NamespaceValue(ctx)).Resource("bindings").Body(binding).Do().Error()
// TODO: use Pods interface for binding once clusters are upgraded
// return b.Pods(binding.Namespace).Bind(binding)
}
type podConditionUpdater struct {
*client.Client
Client clientset.Interface
}
func (p *podConditionUpdater) Update(pod *api.Pod, condition *api.PodCondition) error {
glog.V(2).Infof("Updating pod condition for %s/%s to (%s==%s)", pod.Namespace, pod.Name, condition.Type, condition.Status)
if api.UpdatePodCondition(&pod.Status, condition) {
_, err := p.Pods(pod.Namespace).UpdateStatus(pod)
_, err := p.Client.Core().Pods(pod.Namespace).UpdateStatus(pod)
return err
}
return nil

View File

@@ -28,8 +28,8 @@ import (
apitesting "k8s.io/kubernetes/pkg/api/testing"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/restclient"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
@@ -47,7 +47,7 @@ func TestCreate(t *testing.T) {
}
server := httptest.NewServer(&handler)
defer server.Close()
client := client.NewOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
factory := NewConfigFactory(client, api.DefaultSchedulerName, api.DefaultHardPodAffinitySymmetricWeight, api.DefaultFailureDomains)
factory.Create()
}
@@ -65,7 +65,7 @@ func TestCreateFromConfig(t *testing.T) {
}
server := httptest.NewServer(&handler)
defer server.Close()
client := client.NewOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
factory := NewConfigFactory(client, api.DefaultSchedulerName, api.DefaultHardPodAffinitySymmetricWeight, api.DefaultFailureDomains)
// Pre-register some predicate and priority functions
@@ -106,7 +106,7 @@ func TestCreateFromEmptyConfig(t *testing.T) {
}
server := httptest.NewServer(&handler)
defer server.Close()
client := client.NewOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
factory := NewConfigFactory(client, api.DefaultSchedulerName, api.DefaultHardPodAffinitySymmetricWeight, api.DefaultFailureDomains)
configData = []byte(`{}`)
@@ -149,7 +149,7 @@ func TestDefaultErrorFunc(t *testing.T) {
mux.Handle(testapi.Default.ResourcePath("pods", "bar", "foo"), &handler)
server := httptest.NewServer(mux)
defer server.Close()
factory := NewConfigFactory(client.NewOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}}), api.DefaultSchedulerName, api.DefaultHardPodAffinitySymmetricWeight, api.DefaultFailureDomains)
factory := NewConfigFactory(clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}}), api.DefaultSchedulerName, api.DefaultHardPodAffinitySymmetricWeight, api.DefaultFailureDomains)
queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
podBackoff := podBackoff{
perPodBackoff: map[types.NamespacedName]*backoffEntry{},
@@ -232,7 +232,7 @@ func TestBind(t *testing.T) {
}
server := httptest.NewServer(&handler)
defer server.Close()
client := client.NewOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
b := binder{client}
if err := b.Bind(item.binding); err != nil {
@@ -317,7 +317,7 @@ func TestResponsibleForPod(t *testing.T) {
}
server := httptest.NewServer(&handler)
defer server.Close()
client := client.NewOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
// factory of "default-scheduler"
factoryDefaultScheduler := NewConfigFactory(client, api.DefaultSchedulerName, api.DefaultHardPodAffinitySymmetricWeight, api.DefaultFailureDomains)
// factory of "foo-scheduler"
@@ -381,7 +381,7 @@ func TestInvalidHardPodAffinitySymmetricWeight(t *testing.T) {
server := httptest.NewServer(&handler)
// TODO: Uncomment when fix #19254
// defer server.Close()
client := client.NewOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
// factory of "default-scheduler"
factory := NewConfigFactory(client, api.DefaultSchedulerName, -1, api.DefaultFailureDomains)
_, err := factory.Create()
@@ -398,7 +398,7 @@ func TestInvalidFactoryArgs(t *testing.T) {
}
server := httptest.NewServer(&handler)
defer server.Close()
client := client.NewOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
testCases := []struct {
hardPodAffinitySymmetricWeight int