Adds filtering of hosts to DialContexts.

The provided DialContext wraps existing clients' DialContext in an attempt to
preserve any existing timeout configuration. In some cases, we may replace
infinite timeouts with golang defaults.

- scaleio: tcp connect/keepalive values changed from 0/15 to 30/30
- storageos: no change
This commit is contained in:
Matthew Cary
2020-06-09 21:30:40 +00:00
parent 74dbf274d9
commit f2e23afcf1
41 changed files with 346 additions and 65 deletions

View File

@@ -18,6 +18,7 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/volume",
visibility = ["//visibility:public"],
deps = [
"//pkg/proxy/util:go_default_library",
"//pkg/volume/util/fs:go_default_library",
"//pkg/volume/util/hostutil:go_default_library",
"//pkg/volume/util/recyclerclient:go_default_library",

View File

@@ -39,6 +39,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/volume/util/hostutil"
"k8s.io/kubernetes/pkg/volume/util/recyclerclient"
"k8s.io/kubernetes/pkg/volume/util/subpath"
@@ -450,6 +451,9 @@ type VolumeHost interface {
// Returns an interface that should be used to execute subpath operations
GetSubpather() subpath.Interface
// Returns options to pass for proxyutil filtered dialers.
GetFilteredDialOptions() *proxyutil.FilteredDialOptions
}
// VolumePluginMgr tracks registered plugins.

View File

@@ -41,6 +41,7 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/volume/scaleio",
deps = [
"//pkg/proxy/util:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",

View File

@@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"regexp"
@@ -33,6 +34,7 @@ import (
sio "github.com/thecodeteam/goscaleio"
siotypes "github.com/thecodeteam/goscaleio/types/v1"
"k8s.io/klog/v2"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
)
var (
@@ -56,37 +58,39 @@ type sioInterface interface {
}
type sioClient struct {
client *sio.Client
gateway string
username string
password string
insecure bool
certsEnabled bool
system *siotypes.System
sysName string
sysClient *sio.System
protectionDomain *siotypes.ProtectionDomain
pdName string
pdClient *sio.ProtectionDomain
storagePool *siotypes.StoragePool
spName string
spClient *sio.StoragePool
provisionMode string
sdcPath string
sdcGUID string
instanceID string
inited bool
diskRegex *regexp.Regexp
mtx sync.Mutex
exec utilexec.Interface
client *sio.Client
gateway string
username string
password string
insecure bool
certsEnabled bool
system *siotypes.System
sysName string
sysClient *sio.System
protectionDomain *siotypes.ProtectionDomain
pdName string
pdClient *sio.ProtectionDomain
storagePool *siotypes.StoragePool
spName string
spClient *sio.StoragePool
provisionMode string
sdcPath string
sdcGUID string
instanceID string
inited bool
diskRegex *regexp.Regexp
mtx sync.Mutex
exec utilexec.Interface
filteredDialOptions *proxyutil.FilteredDialOptions
}
func newSioClient(gateway, username, password string, sslEnabled bool, exec utilexec.Interface) (*sioClient, error) {
func newSioClient(gateway, username, password string, sslEnabled bool, exec utilexec.Interface, filteredDialOptions *proxyutil.FilteredDialOptions) (*sioClient, error) {
client := new(sioClient)
client.gateway = gateway
client.username = username
client.password = password
client.exec = exec
client.filteredDialOptions = filteredDialOptions
if sslEnabled {
client.insecure = false
client.certsEnabled = true
@@ -118,6 +122,15 @@ func (c *sioClient) init() error {
klog.Error(log("failed to create client: %v", err))
return err
}
transport, ok := client.Http.Transport.(*http.Transport)
if !ok {
return errors.New("could not set http.Transport options for scaleio client")
}
//lint:ignore SA1019 DialTLS must be used to support legacy clients.
if transport.DialTLS != nil {
return errors.New("DialTLS will be used instead of DialContext")
}
transport.DialContext = proxyutil.NewFilteredDialContext(transport.DialContext, nil, c.filteredDialOptions)
c.client = client
if _, err = c.client.Authenticate(
&sio.ConfigConnect{

View File

@@ -21,6 +21,7 @@ import (
"strconv"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/volume"
utilexec "k8s.io/utils/exec"
siotypes "github.com/thecodeteam/goscaleio/types/v1"
@@ -30,9 +31,10 @@ type sioMgr struct {
client sioInterface
configData map[string]string
exec utilexec.Interface
host volume.VolumeHost
}
func newSioMgr(configs map[string]string, exec utilexec.Interface) (*sioMgr, error) {
func newSioMgr(configs map[string]string, host volume.VolumeHost, exec utilexec.Interface) (*sioMgr, error) {
if configs == nil {
return nil, errors.New("missing configuration data")
}
@@ -41,7 +43,7 @@ func newSioMgr(configs map[string]string, exec utilexec.Interface) (*sioMgr, err
configs[confKey.sdcRootPath] = defaultString(configs[confKey.sdcRootPath], sdcRootPath)
configs[confKey.storageMode] = defaultString(configs[confKey.storageMode], "ThinProvisioned")
mgr := &sioMgr{configData: configs, exec: exec}
mgr := &sioMgr{configData: configs, host: host, exec: exec}
return mgr, nil
}
@@ -61,7 +63,7 @@ func (m *sioMgr) getClient() (sioInterface, error) {
certsEnabled := b
klog.V(4).Info(log("creating new client for gateway %s", gateway))
client, err := newSioClient(gateway, username, password, certsEnabled, m.exec)
client, err := newSioClient(gateway, username, password, certsEnabled, m.exec, m.host.GetFilteredDialOptions())
if err != nil {
klog.Error(log("failed to create scaleio client: %v", err))
return nil, err

View File

@@ -22,6 +22,7 @@ import (
"time"
siotypes "github.com/thecodeteam/goscaleio/types/v1"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/utils/exec/testing"
)
@@ -42,7 +43,8 @@ var (
)
func newTestMgr(t *testing.T) *sioMgr {
mgr, err := newSioMgr(fakeConfig, &testingexec.FakeExec{})
host := volumetesting.NewFakeVolumeHost(t, "/tmp/fake", nil, nil)
mgr, err := newSioMgr(fakeConfig, host, &testingexec.FakeExec{})
if err != nil {
t.Error(err)
}
@@ -51,7 +53,8 @@ func newTestMgr(t *testing.T) *sioMgr {
}
func TestMgrNew(t *testing.T) {
mgr, err := newSioMgr(fakeConfig, &testingexec.FakeExec{})
host := volumetesting.NewFakeVolumeHost(t, "/tmp/fake", nil, nil)
mgr, err := newSioMgr(fakeConfig, host, &testingexec.FakeExec{})
if err != nil {
t.Fatal(err)
}

View File

@@ -405,7 +405,7 @@ func (v *sioVolume) setSioMgr() error {
klog.Error(log("failed to retrieve sdc guid: %v", err))
return err
}
mgr, err := newSioMgr(configData, v.plugin.host.GetExec(v.plugin.GetPluginName()))
mgr, err := newSioMgr(configData, v.plugin.host, v.plugin.host.GetExec(v.plugin.GetPluginName()))
if err != nil {
klog.Error(log("failed to reset sio manager: %v", err))
@@ -444,8 +444,7 @@ func (v *sioVolume) resetSioMgr() error {
klog.Error(log("failed to retrieve sdc guid: %v", err))
return err
}
mgr, err := newSioMgr(configData, v.plugin.host.GetExec(v.plugin.GetPluginName()))
mgr, err := newSioMgr(configData, v.plugin.host, v.plugin.host.GetExec(v.plugin.GetPluginName()))
if err != nil {
klog.Error(log("failed to reset scaleio mgr: %v", err))
@@ -480,8 +479,7 @@ func (v *sioVolume) setSioMgrFromConfig() error {
klog.Error(log("failed to load secret: %v", err))
return err
}
mgr, err := newSioMgr(data, v.plugin.host.GetExec(v.plugin.GetPluginName()))
mgr, err := newSioMgr(data, v.plugin.host, v.plugin.host.GetExec(v.plugin.GetPluginName()))
if err != nil {
klog.Error(log("failed while setting scaleio mgr from config: %v", err))
@@ -516,8 +514,7 @@ func (v *sioVolume) setSioMgrFromSpec() error {
klog.Error(log("failed to load secret: %v", err))
return err
}
mgr, err := newSioMgr(configData, v.plugin.host.GetExec(v.plugin.GetPluginName()))
mgr, err := newSioMgr(configData, v.plugin.host, v.plugin.host.GetExec(v.plugin.GetPluginName()))
if err != nil {
klog.Error(log("failed to reset sio manager: %v", err))

View File

@@ -15,6 +15,7 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/volume/storageos",
deps = [
"//pkg/proxy/util:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",

View File

@@ -110,7 +110,7 @@ func (plugin *storageosPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volu
return nil, err
}
return plugin.newMounterInternal(spec, pod, apiCfg, &storageosUtil{}, plugin.host.GetMounter(plugin.GetPluginName()), plugin.host.GetExec(plugin.GetPluginName()))
return plugin.newMounterInternal(spec, pod, apiCfg, &storageosUtil{host: plugin.host}, plugin.host.GetMounter(plugin.GetPluginName()), plugin.host.GetExec(plugin.GetPluginName()))
}
func (plugin *storageosPlugin) newMounterInternal(spec *volume.Spec, pod *v1.Pod, apiCfg *storageosAPIConfig, manager storageosManager, mounter mount.Interface, exec utilexec.Interface) (volume.Mounter, error) {
@@ -142,7 +142,7 @@ func (plugin *storageosPlugin) newMounterInternal(spec *volume.Spec, pod *v1.Pod
}
func (plugin *storageosPlugin) NewUnmounter(pvName string, podUID types.UID) (volume.Unmounter, error) {
return plugin.newUnmounterInternal(pvName, podUID, &storageosUtil{}, plugin.host.GetMounter(plugin.GetPluginName()), plugin.host.GetExec(plugin.GetPluginName()))
return plugin.newUnmounterInternal(pvName, podUID, &storageosUtil{host: plugin.host}, plugin.host.GetMounter(plugin.GetPluginName()), plugin.host.GetExec(plugin.GetPluginName()))
}
func (plugin *storageosPlugin) newUnmounterInternal(pvName string, podUID types.UID, manager storageosManager, mounter mount.Interface, exec utilexec.Interface) (volume.Unmounter, error) {
@@ -194,7 +194,7 @@ func (plugin *storageosPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, er
return nil, fmt.Errorf("failed to get admin secret from [%q/%q]: %v", adminSecretNamespace, adminSecretName, err)
}
return plugin.newDeleterInternal(spec, apiCfg, &storageosUtil{})
return plugin.newDeleterInternal(spec, apiCfg, &storageosUtil{host: plugin.host})
}
func (plugin *storageosPlugin) newDeleterInternal(spec *volume.Spec, apiCfg *storageosAPIConfig, manager storageosManager) (volume.Deleter, error) {
@@ -215,7 +215,7 @@ func (plugin *storageosPlugin) newDeleterInternal(spec *volume.Spec, apiCfg *sto
}
func (plugin *storageosPlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) {
return plugin.newProvisionerInternal(options, &storageosUtil{})
return plugin.newProvisionerInternal(options, &storageosUtil{host: plugin.host})
}
func (plugin *storageosPlugin) newProvisionerInternal(options volume.VolumeOptions, manager storageosManager) (volume.Provisioner, error) {

View File

@@ -26,6 +26,8 @@ import (
storageosapi "github.com/storageos/go-api"
storageostypes "github.com/storageos/go-api/types"
"k8s.io/klog/v2"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/volume"
utilexec "k8s.io/utils/exec"
)
@@ -76,13 +78,17 @@ type apiImplementer interface {
// storageosUtil is the utility structure to interact with the StorageOS API.
type storageosUtil struct {
api apiImplementer
api apiImplementer
host volume.VolumeHost
}
func (u *storageosUtil) NewAPI(apiCfg *storageosAPIConfig) error {
if u.api != nil {
return nil
}
if u.host == nil {
return errors.New("host must not be nil")
}
if apiCfg == nil {
apiCfg = &storageosAPIConfig{
apiAddr: defaultAPIAddress,
@@ -98,6 +104,9 @@ func (u *storageosUtil) NewAPI(apiCfg *storageosAPIConfig) error {
return err
}
api.SetAuth(apiCfg.apiUser, apiCfg.apiPass)
if err := api.SetDialContext(proxyutil.NewFilteredDialContext(api.GetDialContext(), nil, u.host.GetFilteredDialOptions())); err != nil {
return fmt.Errorf("failed to set DialContext in storageos client: %v", err)
}
u.api = api
return nil
}

View File

@@ -49,8 +49,12 @@ func GetAPIConfig() *storageosAPIConfig {
}
func TestClient(t *testing.T) {
util := storageosUtil{}
err := util.NewAPI(GetAPIConfig())
tmpDir, err := utiltesting.MkTmpdir("storageos_test")
if err != nil {
t.Fatalf("error creating tmpdir: %v", err)
}
util := storageosUtil{host: volumetest.NewFakeVolumeHost(t, tmpDir, nil, nil)}
err = util.NewAPI(GetAPIConfig())
if err != nil {
t.Fatalf("error getting api config: %v", err)
}

View File

@@ -13,6 +13,7 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/volume/testing",
deps = [
"//pkg/proxy/util:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/hostutil:go_default_library",

View File

@@ -46,6 +46,7 @@ import (
"k8s.io/client-go/tools/record"
utiltesting "k8s.io/client-go/util/testing"
cloudprovider "k8s.io/cloud-provider"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
. "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/hostutil"
@@ -118,6 +119,7 @@ type fakeVolumeHost struct {
informerFactory informers.SharedInformerFactory
kubeletErr error
mux sync.Mutex
filteredDialOptions *proxyutil.FilteredDialOptions
}
var _ VolumeHost = &fakeVolumeHost{}
@@ -207,6 +209,10 @@ func (f *fakeVolumeHost) GetSubpather() subpath.Interface {
return f.subpather
}
func (f *fakeVolumeHost) GetFilteredDialOptions() *proxyutil.FilteredDialOptions {
return f.filteredDialOptions
}
func (f *fakeVolumeHost) GetPluginMgr() *VolumePluginMgr {
return f.pluginMgr
}