mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Merge pull request #69902 from pbarker/audit-int
integration test for dynamic audit
This commit is contained in:
		@@ -56,6 +56,7 @@ go_test(
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/util/flag:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/util/globalflag:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/rest:go_default_library",
 | 
			
		||||
        "//vendor/github.com/spf13/pflag:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -29,6 +29,7 @@ import (
 | 
			
		||||
	"k8s.io/apiserver/pkg/storage/storagebackend"
 | 
			
		||||
	utilflag "k8s.io/apiserver/pkg/util/flag"
 | 
			
		||||
	auditbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
 | 
			
		||||
	auditdynamic "k8s.io/apiserver/plugin/pkg/audit/dynamic"
 | 
			
		||||
	audittruncate "k8s.io/apiserver/plugin/pkg/audit/truncate"
 | 
			
		||||
	restclient "k8s.io/client-go/rest"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/legacyscheme"
 | 
			
		||||
@@ -241,6 +242,9 @@ func TestAddFlags(t *testing.T) {
 | 
			
		||||
				InitialBackoff:     2 * time.Second,
 | 
			
		||||
				GroupVersionString: "audit.k8s.io/v1alpha1",
 | 
			
		||||
			},
 | 
			
		||||
			DynamicOptions: apiserveroptions.AuditDynamicOptions{
 | 
			
		||||
				BatchConfig: auditdynamic.NewDefaultWebhookBatchConfig(),
 | 
			
		||||
			},
 | 
			
		||||
			PolicyFile: "/policy",
 | 
			
		||||
		},
 | 
			
		||||
		Features: &apiserveroptions.FeatureOptions{
 | 
			
		||||
 
 | 
			
		||||
@@ -148,9 +148,14 @@ type AuditWebhookOptions struct {
 | 
			
		||||
	GroupVersionString string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AuditDynamicOptions control the configuration of dynamic backends for audit events
 | 
			
		||||
type AuditDynamicOptions struct {
 | 
			
		||||
	// Enabled tells whether the dynamic audit capability is enabled.
 | 
			
		||||
	Enabled bool
 | 
			
		||||
 | 
			
		||||
	// Configuration for batching backend. This is currently only used as an override
 | 
			
		||||
	// for integration tests
 | 
			
		||||
	BatchConfig *pluginbuffered.BatchConfig
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewAuditOptions() *AuditOptions {
 | 
			
		||||
@@ -174,7 +179,8 @@ func NewAuditOptions() *AuditOptions {
 | 
			
		||||
			GroupVersionString: "audit.k8s.io/v1",
 | 
			
		||||
		},
 | 
			
		||||
		DynamicOptions: AuditDynamicOptions{
 | 
			
		||||
			Enabled: false,
 | 
			
		||||
			Enabled:     false,
 | 
			
		||||
			BatchConfig: plugindynamic.NewDefaultWebhookBatchConfig(),
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@@ -634,7 +640,7 @@ func (o *AuditDynamicOptions) newBackend(
 | 
			
		||||
 | 
			
		||||
	dc := &plugindynamic.Config{
 | 
			
		||||
		Informer:       informer,
 | 
			
		||||
		BufferedConfig: plugindynamic.NewDefaultWebhookBatchConfig(),
 | 
			
		||||
		BufferedConfig: o.BatchConfig,
 | 
			
		||||
		EventConfig: plugindynamic.EventConfig{
 | 
			
		||||
			Sink: eventSink,
 | 
			
		||||
			Source: corev1.EventSource{
 | 
			
		||||
 
 | 
			
		||||
@@ -11,7 +11,6 @@ go_test(
 | 
			
		||||
    srcs = [
 | 
			
		||||
        "apiserver_test.go",
 | 
			
		||||
        "main_test.go",
 | 
			
		||||
        "setup_test.go",
 | 
			
		||||
        "webhook_test.go",
 | 
			
		||||
    ],
 | 
			
		||||
    tags = ["integration"],
 | 
			
		||||
@@ -43,7 +42,6 @@ go_test(
 | 
			
		||||
        "//staging/src/k8s.io/sample-apiserver/pkg/apis/wardle/v1beta1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/sample-apiserver/pkg/cmd/server:go_default_library",
 | 
			
		||||
        "//test/integration/framework:go_default_library",
 | 
			
		||||
        "//vendor/github.com/pborman/uuid:go_default_library",
 | 
			
		||||
        "//vendor/github.com/stretchr/testify/assert:go_default_library",
 | 
			
		||||
    ],
 | 
			
		||||
)
 | 
			
		||||
 
 | 
			
		||||
@@ -30,6 +30,7 @@ import (
 | 
			
		||||
	"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/master"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/master/reconcilers"
 | 
			
		||||
	"k8s.io/kubernetes/test/integration/framework"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestWebhookLoopback(t *testing.T) {
 | 
			
		||||
@@ -40,7 +41,7 @@ func TestWebhookLoopback(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
	called := int32(0)
 | 
			
		||||
 | 
			
		||||
	client, _ := startTestServer(t, stopCh, TestServerSetup{
 | 
			
		||||
	client, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{
 | 
			
		||||
		ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
 | 
			
		||||
		},
 | 
			
		||||
		ModifyServerConfig: func(config *master.Config) {
 | 
			
		||||
 
 | 
			
		||||
@@ -12,6 +12,7 @@ go_library(
 | 
			
		||||
        "master_utils.go",
 | 
			
		||||
        "perf_utils.go",
 | 
			
		||||
        "serializer.go",
 | 
			
		||||
        "test_server.go",
 | 
			
		||||
        "util.go",
 | 
			
		||||
    ],
 | 
			
		||||
    data = [
 | 
			
		||||
@@ -19,6 +20,8 @@ go_library(
 | 
			
		||||
    ],
 | 
			
		||||
    importpath = "k8s.io/kubernetes/test/integration/framework",
 | 
			
		||||
    deps = [
 | 
			
		||||
        "//cmd/kube-apiserver/app:go_default_library",
 | 
			
		||||
        "//cmd/kube-apiserver/app/options:go_default_library",
 | 
			
		||||
        "//pkg/api/legacyscheme:go_default_library",
 | 
			
		||||
        "//pkg/api/testapi:go_default_library",
 | 
			
		||||
        "//pkg/apis/batch:go_default_library",
 | 
			
		||||
@@ -57,6 +60,7 @@ go_library(
 | 
			
		||||
        "//staging/src/k8s.io/client-go/informers:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/rest:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/util/cert:go_default_library",
 | 
			
		||||
        "//test/e2e/framework:go_default_library",
 | 
			
		||||
        "//test/utils:go_default_library",
 | 
			
		||||
        "//vendor/github.com/go-openapi/spec:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package apiserver
 | 
			
		||||
package framework
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"io/ioutil"
 | 
			
		||||
@@ -36,7 +36,6 @@ import (
 | 
			
		||||
	"k8s.io/kubernetes/cmd/kube-apiserver/app"
 | 
			
		||||
	"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/master"
 | 
			
		||||
	"k8s.io/kubernetes/test/integration/framework"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type TestServerSetup struct {
 | 
			
		||||
@@ -44,8 +43,8 @@ type TestServerSetup struct {
 | 
			
		||||
	ModifyServerConfig     func(*master.Config)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// startTestServer runs a kube-apiserver, optionally calling out to the setup.ModifyServerRunOptions and setup.ModifyServerConfig functions
 | 
			
		||||
func startTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup) (client.Interface, *rest.Config) {
 | 
			
		||||
// StartTestServer runs a kube-apiserver, optionally calling out to the setup.ModifyServerRunOptions and setup.ModifyServerConfig functions
 | 
			
		||||
func StartTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup) (client.Interface, *rest.Config) {
 | 
			
		||||
	certDir, _ := ioutil.TempDir("", "test-integration-"+t.Name())
 | 
			
		||||
	go func() {
 | 
			
		||||
		<-stopCh
 | 
			
		||||
@@ -89,7 +88,7 @@ func startTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup
 | 
			
		||||
	kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir
 | 
			
		||||
	kubeAPIServerOptions.InsecureServing.BindPort = 0
 | 
			
		||||
	kubeAPIServerOptions.Etcd.StorageConfig.Prefix = path.Join("/", uuid.New(), "registry")
 | 
			
		||||
	kubeAPIServerOptions.Etcd.StorageConfig.Transport.ServerList = []string{framework.GetEtcdURL()}
 | 
			
		||||
	kubeAPIServerOptions.Etcd.StorageConfig.Transport.ServerList = []string{GetEtcdURL()}
 | 
			
		||||
	kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
 | 
			
		||||
	kubeAPIServerOptions.Authentication.RequestHeader.UsernameHeaders = []string{"X-Remote-User"}
 | 
			
		||||
	kubeAPIServerOptions.Authentication.RequestHeader.GroupHeaders = []string{"X-Remote-Group"}
 | 
			
		||||
@@ -10,6 +10,7 @@ go_test(
 | 
			
		||||
    name = "go_default_test",
 | 
			
		||||
    size = "large",
 | 
			
		||||
    srcs = [
 | 
			
		||||
        "audit_dynamic_test.go",
 | 
			
		||||
        "audit_test.go",
 | 
			
		||||
        "crd_test.go",
 | 
			
		||||
        "graceful_shutdown_test.go",
 | 
			
		||||
@@ -22,6 +23,7 @@ go_test(
 | 
			
		||||
    embed = [":go_default_library"],
 | 
			
		||||
    tags = ["integration"],
 | 
			
		||||
    deps = [
 | 
			
		||||
        "//cmd/kube-apiserver/app/options:go_default_library",
 | 
			
		||||
        "//cmd/kube-apiserver/app/testing:go_default_library",
 | 
			
		||||
        "//pkg/apis/core:go_default_library",
 | 
			
		||||
        "//pkg/client/clientset_generated/internalclientset:go_default_library",
 | 
			
		||||
@@ -66,6 +68,7 @@ go_test(
 | 
			
		||||
        "//test/integration/framework:go_default_library",
 | 
			
		||||
        "//test/utils:go_default_library",
 | 
			
		||||
        "//vendor/github.com/evanphx/json-patch:go_default_library",
 | 
			
		||||
        "//vendor/github.com/stretchr/testify/require:go_default_library",
 | 
			
		||||
        "//vendor/sigs.k8s.io/yaml:go_default_library",
 | 
			
		||||
    ] + select({
 | 
			
		||||
        "@io_bazel_rules_go//go/platform:android": [
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										282
									
								
								test/integration/master/audit_dynamic_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										282
									
								
								test/integration/master/audit_dynamic_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,282 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2018 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package master
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"sync/atomic"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/stretchr/testify/require"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/apimachinery/pkg/api/errors"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	auditinternal "k8s.io/apiserver/pkg/apis/audit"
 | 
			
		||||
	"k8s.io/apiserver/pkg/features"
 | 
			
		||||
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
			
		||||
	utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
 | 
			
		||||
	"k8s.io/client-go/kubernetes"
 | 
			
		||||
	"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
 | 
			
		||||
	"k8s.io/kubernetes/test/integration/framework"
 | 
			
		||||
	"k8s.io/kubernetes/test/utils"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// TestDynamicAudit ensures that v1alpha of the auditregistration api works
 | 
			
		||||
func TestDynamicAudit(t *testing.T) {
 | 
			
		||||
	// start api server
 | 
			
		||||
	stopCh := make(chan struct{})
 | 
			
		||||
	defer close(stopCh)
 | 
			
		||||
 | 
			
		||||
	defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicAuditing, true)()
 | 
			
		||||
	kubeclient, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{
 | 
			
		||||
		ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
 | 
			
		||||
			opts.Audit.DynamicOptions.Enabled = true
 | 
			
		||||
			// set max batch size so the buffers flush immediately
 | 
			
		||||
			opts.Audit.DynamicOptions.BatchConfig.MaxBatchSize = 1
 | 
			
		||||
			opts.APIEnablement.RuntimeConfig.Set("auditregistration.k8s.io/v1alpha1=true")
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	// create test sinks
 | 
			
		||||
	testServer1 := utils.NewAuditTestServer(t, "test1")
 | 
			
		||||
	defer testServer1.Close()
 | 
			
		||||
	testServer2 := utils.NewAuditTestServer(t, "test2")
 | 
			
		||||
	defer testServer2.Close()
 | 
			
		||||
 | 
			
		||||
	// check that servers are healthy
 | 
			
		||||
	require.NoError(t, testServer1.Health(), "server1 never became healthy")
 | 
			
		||||
	require.NoError(t, testServer2.Health(), "server2 never became healthy")
 | 
			
		||||
 | 
			
		||||
	// build AuditSink configurations
 | 
			
		||||
	sinkConfig1 := testServer1.BuildSinkConfiguration()
 | 
			
		||||
	sinkConfig2 := testServer2.BuildSinkConfiguration()
 | 
			
		||||
 | 
			
		||||
	// test creates a single audit sink, generates audit events, and ensures they arrive at the server
 | 
			
		||||
	success := t.Run("one sink", func(t *testing.T) {
 | 
			
		||||
		_, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Create(sinkConfig1)
 | 
			
		||||
		require.NoError(t, err, "failed to create audit sink1")
 | 
			
		||||
		t.Log("created audit sink1")
 | 
			
		||||
 | 
			
		||||
		// verify sink is ready
 | 
			
		||||
		sinkHealth(t, kubeclient, testServer1)
 | 
			
		||||
 | 
			
		||||
		// perform configmap ops
 | 
			
		||||
		configMapOperations(t, kubeclient)
 | 
			
		||||
 | 
			
		||||
		// check for corresponding events
 | 
			
		||||
		missing, err := testServer1.WaitForEvents(expectedEvents)
 | 
			
		||||
		require.NoError(t, err, "failed to match all expected events for server1, events %#v not found", missing)
 | 
			
		||||
	})
 | 
			
		||||
	require.True(t, success) // propagate failure
 | 
			
		||||
 | 
			
		||||
	// test creates a second audit sink, generates audit events, and ensures events arrive in both servers
 | 
			
		||||
	success = t.Run("two sink", func(t *testing.T) {
 | 
			
		||||
		_, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Create(sinkConfig2)
 | 
			
		||||
		require.NoError(t, err, "failed to create audit sink2")
 | 
			
		||||
		t.Log("created audit sink2")
 | 
			
		||||
 | 
			
		||||
		// verify both sinks are ready
 | 
			
		||||
		sinkHealth(t, kubeclient, testServer1, testServer2)
 | 
			
		||||
 | 
			
		||||
		// perform configmap ops
 | 
			
		||||
		configMapOperations(t, kubeclient)
 | 
			
		||||
 | 
			
		||||
		// check for corresponding events in both sinks
 | 
			
		||||
		missing, err := testServer1.WaitForEvents(expectedEvents)
 | 
			
		||||
		require.NoError(t, err, "failed to match all expected events for server1, events %#v not found", missing)
 | 
			
		||||
		missing, err = testServer2.WaitForEvents(expectedEvents)
 | 
			
		||||
		require.NoError(t, err, "failed to match all expected events for server2, events %#v not found", missing)
 | 
			
		||||
	})
 | 
			
		||||
	require.True(t, success) // propagate failure
 | 
			
		||||
 | 
			
		||||
	// test deletes an audit sink, generates audit events, and ensures they don't arrive in the corresponding server
 | 
			
		||||
	success = t.Run("delete sink", func(t *testing.T) {
 | 
			
		||||
		err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Delete(sinkConfig2.Name, &metav1.DeleteOptions{})
 | 
			
		||||
		require.NoError(t, err, "failed to delete audit sink2")
 | 
			
		||||
		t.Log("deleted audit sink2")
 | 
			
		||||
 | 
			
		||||
		var finalErr error
 | 
			
		||||
		err = wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
 | 
			
		||||
			// reset event lists
 | 
			
		||||
			testServer1.ResetEventList()
 | 
			
		||||
			testServer2.ResetEventList()
 | 
			
		||||
 | 
			
		||||
			// perform configmap ops
 | 
			
		||||
			configMapOperations(t, kubeclient)
 | 
			
		||||
 | 
			
		||||
			// check for corresponding events in server1
 | 
			
		||||
			missing, err := testServer1.WaitForEvents(expectedEvents)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				finalErr = fmt.Errorf("%v: failed to match all expected events for server1, events %#v not found", err, missing)
 | 
			
		||||
				return false, nil
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// check that server2 is empty
 | 
			
		||||
			if len(testServer2.GetEventList().Items) != 0 {
 | 
			
		||||
				finalErr = fmt.Errorf("server2 event list should be empty")
 | 
			
		||||
				return false, nil
 | 
			
		||||
			}
 | 
			
		||||
			return true, nil
 | 
			
		||||
		})
 | 
			
		||||
		require.NoError(t, err, finalErr)
 | 
			
		||||
	})
 | 
			
		||||
	require.True(t, success) // propagate failure
 | 
			
		||||
 | 
			
		||||
	// This test will run a background process that generates audit events sending them to a sink.
 | 
			
		||||
	// Whilst that generation is occurring, the sink is updated to point to a different server.
 | 
			
		||||
	// The test checks that no events are lost or duplicated during the update.
 | 
			
		||||
	t.Run("update sink", func(t *testing.T) {
 | 
			
		||||
		// fetch sink1 config
 | 
			
		||||
		sink1, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Get(sinkConfig1.Name, metav1.GetOptions{})
 | 
			
		||||
		require.NoError(t, err)
 | 
			
		||||
 | 
			
		||||
		// reset event lists
 | 
			
		||||
		testServer1.ResetEventList()
 | 
			
		||||
		testServer2.ResetEventList()
 | 
			
		||||
 | 
			
		||||
		// run operations in background
 | 
			
		||||
		stopChan := make(chan struct{})
 | 
			
		||||
		expectedEvents := &atomic.Value{}
 | 
			
		||||
		expectedEvents.Store([]utils.AuditEvent{})
 | 
			
		||||
		wg := &sync.WaitGroup{}
 | 
			
		||||
		wg.Add(1)
 | 
			
		||||
		go asyncOps(stopChan, wg, kubeclient, expectedEvents)
 | 
			
		||||
 | 
			
		||||
		// check to see that at least 20 events have arrived in server1
 | 
			
		||||
		err = testServer1.WaitForNumEvents(20)
 | 
			
		||||
		require.NoError(t, err, "failed to find enough events in server1")
 | 
			
		||||
 | 
			
		||||
		// check that no events are in server 2 yet
 | 
			
		||||
		require.Len(t, testServer2.GetEventList().Items, 0, "server2 should not have events yet")
 | 
			
		||||
 | 
			
		||||
		// update the url
 | 
			
		||||
		sink1.Spec.Webhook.ClientConfig.URL = &testServer2.Server.URL
 | 
			
		||||
		_, err = kubeclient.AuditregistrationV1alpha1().AuditSinks().Update(sink1)
 | 
			
		||||
		require.NoError(t, err, "failed to update audit sink1")
 | 
			
		||||
		t.Log("updated audit sink1 to point to server2")
 | 
			
		||||
 | 
			
		||||
		// check that at least 20 events have arrived in server2
 | 
			
		||||
		err = testServer2.WaitForNumEvents(20)
 | 
			
		||||
		require.NoError(t, err, "failed to find enough events in server2")
 | 
			
		||||
 | 
			
		||||
		// stop the operations and ensure they have finished
 | 
			
		||||
		close(stopChan)
 | 
			
		||||
		wg.Wait()
 | 
			
		||||
 | 
			
		||||
		// check that the final events have arrived
 | 
			
		||||
		expected := expectedEvents.Load().([]utils.AuditEvent)
 | 
			
		||||
		missing, err := testServer2.WaitForEvents(expected[len(expected)-4:])
 | 
			
		||||
		require.NoError(t, err, "failed to find the final events in server2, events %#v not found", missing)
 | 
			
		||||
 | 
			
		||||
		// combine the event lists
 | 
			
		||||
		el1 := testServer1.GetEventList()
 | 
			
		||||
		el2 := testServer2.GetEventList()
 | 
			
		||||
		combinedList := auditinternal.EventList{}
 | 
			
		||||
		combinedList.Items = append(el1.Items, el2.Items...)
 | 
			
		||||
 | 
			
		||||
		// check that there are no duplicate events
 | 
			
		||||
		dups, err := utils.CheckForDuplicates(combinedList)
 | 
			
		||||
		require.NoError(t, err, "duplicate events found: %#v", dups)
 | 
			
		||||
 | 
			
		||||
		// check that no events are missing
 | 
			
		||||
		missing, err = utils.CheckAuditList(combinedList, expected)
 | 
			
		||||
		require.NoError(t, err, "failed to match all expected events: %#v not found", missing)
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// sinkHealth checks if sinks are running by verifying that uniquely identified events are found
 | 
			
		||||
// in the given servers
 | 
			
		||||
func sinkHealth(t *testing.T, kubeclient kubernetes.Interface, servers ...*utils.AuditTestServer) {
 | 
			
		||||
	var missing []utils.AuditEvent
 | 
			
		||||
	i := 0
 | 
			
		||||
	var finalErr error
 | 
			
		||||
	err := wait.PollImmediate(50*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
 | 
			
		||||
		i++
 | 
			
		||||
		name := fmt.Sprintf("health-%d-%d", i, time.Now().UnixNano())
 | 
			
		||||
		expected, err := simpleOp(name, kubeclient)
 | 
			
		||||
		require.NoError(t, err, "could not perform config map operations")
 | 
			
		||||
 | 
			
		||||
		// check that all given servers have received events
 | 
			
		||||
		for _, server := range servers {
 | 
			
		||||
			missing, err = server.WaitForEvents(expected)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				finalErr = fmt.Errorf("not all events found in %s health check: missing %#v", server.Name, missing)
 | 
			
		||||
				return false, nil
 | 
			
		||||
			}
 | 
			
		||||
			server.ResetEventList()
 | 
			
		||||
		}
 | 
			
		||||
		return true, nil
 | 
			
		||||
	})
 | 
			
		||||
	require.NoError(t, err, finalErr)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// simpleOp is a function that simply tries to get a configmap with the given name and returns the
 | 
			
		||||
// corresponding expected audit event
 | 
			
		||||
func simpleOp(name string, kubeclient kubernetes.Interface) ([]utils.AuditEvent, error) {
 | 
			
		||||
	_, err := kubeclient.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{})
 | 
			
		||||
	if err != nil && !errors.IsNotFound(err) {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	expectedEvents := []utils.AuditEvent{
 | 
			
		||||
		{
 | 
			
		||||
			Level:             auditinternal.LevelRequestResponse,
 | 
			
		||||
			Stage:             auditinternal.StageResponseComplete,
 | 
			
		||||
			RequestURI:        fmt.Sprintf("/api/v1/namespaces/%s/configmaps/%s", namespace, name),
 | 
			
		||||
			Verb:              "get",
 | 
			
		||||
			Code:              404,
 | 
			
		||||
			User:              auditTestUser,
 | 
			
		||||
			Resource:          "configmaps",
 | 
			
		||||
			Namespace:         namespace,
 | 
			
		||||
			RequestObject:     false,
 | 
			
		||||
			ResponseObject:    true,
 | 
			
		||||
			AuthorizeDecision: "allow",
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	return expectedEvents, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// asyncOps runs the simpleOp function until the stopChan is closed updating
 | 
			
		||||
// the expected atomic events list
 | 
			
		||||
func asyncOps(
 | 
			
		||||
	stopChan <-chan struct{},
 | 
			
		||||
	wg *sync.WaitGroup,
 | 
			
		||||
	kubeclient kubernetes.Interface,
 | 
			
		||||
	expected *atomic.Value,
 | 
			
		||||
) {
 | 
			
		||||
	for i := 0; ; i++ {
 | 
			
		||||
		select {
 | 
			
		||||
		case <-stopChan:
 | 
			
		||||
			wg.Done()
 | 
			
		||||
			return
 | 
			
		||||
		default:
 | 
			
		||||
			name := fmt.Sprintf("health-%d-%d", i, time.Now().UnixNano())
 | 
			
		||||
			exp, err := simpleOp(name, kubeclient)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				// retry on errors
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			e := expected.Load().([]utils.AuditEvent)
 | 
			
		||||
			evList := []utils.AuditEvent{}
 | 
			
		||||
			evList = append(e, exp...)
 | 
			
		||||
			expected.Store(evList)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@@ -59,87 +59,8 @@ rules:
 | 
			
		||||
		"audit.k8s.io/v1":      auditv1.SchemeGroupVersion,
 | 
			
		||||
		"audit.k8s.io/v1beta1": auditv1beta1.SchemeGroupVersion,
 | 
			
		||||
	}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// TestAudit ensures that both v1beta1 and v1 version audit api could work.
 | 
			
		||||
func TestAudit(t *testing.T) {
 | 
			
		||||
	for version := range versions {
 | 
			
		||||
		testAudit(t, version)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func testAudit(t *testing.T, version string) {
 | 
			
		||||
	// prepare audit policy file
 | 
			
		||||
	auditPolicy := []byte(strings.Replace(auditPolicyPattern, "{version}", version, 1))
 | 
			
		||||
	policyFile, err := ioutil.TempFile("", "audit-policy.yaml")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Failed to create audit policy file: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	defer os.Remove(policyFile.Name())
 | 
			
		||||
	if _, err := policyFile.Write(auditPolicy); err != nil {
 | 
			
		||||
		t.Fatalf("Failed to write audit policy file: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	if err := policyFile.Close(); err != nil {
 | 
			
		||||
		t.Fatalf("Failed to close audit policy file: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// prepare audit log file
 | 
			
		||||
	logFile, err := ioutil.TempFile("", "audit.log")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Failed to create audit log file: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	defer os.Remove(logFile.Name())
 | 
			
		||||
 | 
			
		||||
	// start api server
 | 
			
		||||
	result := kubeapiservertesting.StartTestServerOrDie(t, nil,
 | 
			
		||||
		[]string{
 | 
			
		||||
			"--audit-policy-file", policyFile.Name(),
 | 
			
		||||
			"--audit-log-version", version,
 | 
			
		||||
			"--audit-log-mode", "blocking",
 | 
			
		||||
			"--audit-log-path", logFile.Name()},
 | 
			
		||||
		framework.SharedEtcd())
 | 
			
		||||
	defer result.TearDownFn()
 | 
			
		||||
 | 
			
		||||
	kubeclient, err := kubernetes.NewForConfig(result.ClientConfig)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Unexpected error: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	func() {
 | 
			
		||||
		// create, get, watch, update, patch, list and delete configmap.
 | 
			
		||||
		configMap := &apiv1.ConfigMap{
 | 
			
		||||
			ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
				Name: "audit-configmap",
 | 
			
		||||
			},
 | 
			
		||||
			Data: map[string]string{
 | 
			
		||||
				"map-key": "map-value",
 | 
			
		||||
			},
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		_, err := kubeclient.CoreV1().ConfigMaps(namespace).Create(configMap)
 | 
			
		||||
		expectNoError(t, err, "failed to create audit-configmap")
 | 
			
		||||
 | 
			
		||||
		_, err = kubeclient.CoreV1().ConfigMaps(namespace).Get(configMap.Name, metav1.GetOptions{})
 | 
			
		||||
		expectNoError(t, err, "failed to get audit-configmap")
 | 
			
		||||
 | 
			
		||||
		configMapChan, err := kubeclient.CoreV1().ConfigMaps(namespace).Watch(watchOptions)
 | 
			
		||||
		expectNoError(t, err, "failed to create watch for config maps")
 | 
			
		||||
		configMapChan.Stop()
 | 
			
		||||
 | 
			
		||||
		_, err = kubeclient.CoreV1().ConfigMaps(namespace).Update(configMap)
 | 
			
		||||
		expectNoError(t, err, "failed to update audit-configmap")
 | 
			
		||||
 | 
			
		||||
		_, err = kubeclient.CoreV1().ConfigMaps(namespace).Patch(configMap.Name, types.JSONPatchType, patch)
 | 
			
		||||
		expectNoError(t, err, "failed to patch configmap")
 | 
			
		||||
 | 
			
		||||
		_, err = kubeclient.CoreV1().ConfigMaps(namespace).List(metav1.ListOptions{})
 | 
			
		||||
		expectNoError(t, err, "failed to list config maps")
 | 
			
		||||
 | 
			
		||||
		err = kubeclient.CoreV1().ConfigMaps(namespace).Delete(configMap.Name, &metav1.DeleteOptions{})
 | 
			
		||||
		expectNoError(t, err, "failed to delete audit-configmap")
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	expectedEvents := []utils.AuditEvent{
 | 
			
		||||
	expectedEvents = []utils.AuditEvent{
 | 
			
		||||
		{
 | 
			
		||||
			Level:             auditinternal.LevelRequestResponse,
 | 
			
		||||
			Stage:             auditinternal.StageResponseComplete,
 | 
			
		||||
@@ -238,7 +159,56 @@ func testAudit(t *testing.T, version string) {
 | 
			
		||||
			AuthorizeDecision: "allow",
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// TestAudit ensures that both v1beta1 and v1 version audit api could work.
 | 
			
		||||
func TestAudit(t *testing.T) {
 | 
			
		||||
	for version := range versions {
 | 
			
		||||
		testAudit(t, version)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func testAudit(t *testing.T, version string) {
 | 
			
		||||
	// prepare audit policy file
 | 
			
		||||
	auditPolicy := []byte(strings.Replace(auditPolicyPattern, "{version}", version, 1))
 | 
			
		||||
	policyFile, err := ioutil.TempFile("", "audit-policy.yaml")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Failed to create audit policy file: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	defer os.Remove(policyFile.Name())
 | 
			
		||||
	if _, err := policyFile.Write(auditPolicy); err != nil {
 | 
			
		||||
		t.Fatalf("Failed to write audit policy file: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	if err := policyFile.Close(); err != nil {
 | 
			
		||||
		t.Fatalf("Failed to close audit policy file: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// prepare audit log file
 | 
			
		||||
	logFile, err := ioutil.TempFile("", "audit.log")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Failed to create audit log file: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	defer os.Remove(logFile.Name())
 | 
			
		||||
 | 
			
		||||
	// start api server
 | 
			
		||||
	result := kubeapiservertesting.StartTestServerOrDie(t, nil,
 | 
			
		||||
		[]string{
 | 
			
		||||
			"--audit-policy-file", policyFile.Name(),
 | 
			
		||||
			"--audit-log-version", version,
 | 
			
		||||
			"--audit-log-mode", "blocking",
 | 
			
		||||
			"--audit-log-path", logFile.Name()},
 | 
			
		||||
		framework.SharedEtcd())
 | 
			
		||||
	defer result.TearDownFn()
 | 
			
		||||
 | 
			
		||||
	kubeclient, err := kubernetes.NewForConfig(result.ClientConfig)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Unexpected error: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// perform configmap operations
 | 
			
		||||
	configMapOperations(t, kubeclient)
 | 
			
		||||
 | 
			
		||||
	// check for corresponding audit logs
 | 
			
		||||
	stream, err := os.Open(logFile.Name())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Unexpected error: %v", err)
 | 
			
		||||
@@ -253,6 +223,47 @@ func testAudit(t *testing.T, version string) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// configMapOperations is a set of known operations perfomed on the configmap type
 | 
			
		||||
// which correspond to the expected events.
 | 
			
		||||
// This is shared by the dynamic test
 | 
			
		||||
func configMapOperations(t *testing.T, kubeclient kubernetes.Interface) {
 | 
			
		||||
	// create, get, watch, update, patch, list and delete configmap.
 | 
			
		||||
	configMap := &apiv1.ConfigMap{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			Name: "audit-configmap",
 | 
			
		||||
		},
 | 
			
		||||
		Data: map[string]string{
 | 
			
		||||
			"map-key": "map-value",
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_, err := kubeclient.CoreV1().ConfigMaps(namespace).Create(configMap)
 | 
			
		||||
	expectNoError(t, err, "failed to create audit-configmap")
 | 
			
		||||
 | 
			
		||||
	_, err = kubeclient.CoreV1().ConfigMaps(namespace).Get(configMap.Name, metav1.GetOptions{})
 | 
			
		||||
	expectNoError(t, err, "failed to get audit-configmap")
 | 
			
		||||
 | 
			
		||||
	configMapChan, err := kubeclient.CoreV1().ConfigMaps(namespace).Watch(watchOptions)
 | 
			
		||||
	expectNoError(t, err, "failed to create watch for config maps")
 | 
			
		||||
	for range configMapChan.ResultChan() {
 | 
			
		||||
		// Block until watchOptions.TimeoutSeconds expires.
 | 
			
		||||
		// If the test finishes before watchOptions.TimeoutSeconds expires, the watch audit
 | 
			
		||||
		// event at stage ResponseComplete will not be generated.
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_, err = kubeclient.CoreV1().ConfigMaps(namespace).Update(configMap)
 | 
			
		||||
	expectNoError(t, err, "failed to update audit-configmap")
 | 
			
		||||
 | 
			
		||||
	_, err = kubeclient.CoreV1().ConfigMaps(namespace).Patch(configMap.Name, types.JSONPatchType, patch)
 | 
			
		||||
	expectNoError(t, err, "failed to patch configmap")
 | 
			
		||||
 | 
			
		||||
	_, err = kubeclient.CoreV1().ConfigMaps(namespace).List(metav1.ListOptions{})
 | 
			
		||||
	expectNoError(t, err, "failed to list config maps")
 | 
			
		||||
 | 
			
		||||
	err = kubeclient.CoreV1().ConfigMaps(namespace).Delete(configMap.Name, &metav1.DeleteOptions{})
 | 
			
		||||
	expectNoError(t, err, "failed to delete audit-configmap")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func expectNoError(t *testing.T, err error, msg string) {
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("%s: %v", msg, err)
 | 
			
		||||
 
 | 
			
		||||
@@ -9,6 +9,7 @@ go_library(
 | 
			
		||||
    name = "go_default_library",
 | 
			
		||||
    srcs = [
 | 
			
		||||
        "audit.go",
 | 
			
		||||
        "audit_dynamic.go",
 | 
			
		||||
        "conditions.go",
 | 
			
		||||
        "create_resources.go",
 | 
			
		||||
        "delete_resources.go",
 | 
			
		||||
@@ -33,6 +34,7 @@ go_library(
 | 
			
		||||
        "//pkg/kubectl:go_default_library",
 | 
			
		||||
        "//pkg/util/labels:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/api/apps/v1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/api/auditregistration/v1alpha1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/api/batch/v1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/api/core/v1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
 | 
			
		||||
@@ -50,12 +52,14 @@ go_library(
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/apis/audit/v1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/audit:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/scale:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
 | 
			
		||||
        "//vendor/github.com/davecgh/go-spew/spew:go_default_library",
 | 
			
		||||
        "//vendor/github.com/stretchr/testify/require:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/klog:go_default_library",
 | 
			
		||||
    ],
 | 
			
		||||
)
 | 
			
		||||
 
 | 
			
		||||
@@ -25,11 +25,14 @@ import (
 | 
			
		||||
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime/schema"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
	auditinternal "k8s.io/apiserver/pkg/apis/audit"
 | 
			
		||||
	"k8s.io/apiserver/pkg/audit"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// AuditEvent is a simplified representation of an audit event for testing purposes
 | 
			
		||||
type AuditEvent struct {
 | 
			
		||||
	ID                 types.UID
 | 
			
		||||
	Level              auditinternal.Level
 | 
			
		||||
	Stage              auditinternal.Stage
 | 
			
		||||
	RequestURI         string
 | 
			
		||||
@@ -45,17 +48,21 @@ type AuditEvent struct {
 | 
			
		||||
	AuthorizeDecision  string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Search the audit log for the expected audit lines.
 | 
			
		||||
// CheckAuditLines searches the audit log for the expected audit lines.
 | 
			
		||||
// if includeID is true the event ids will also be verified
 | 
			
		||||
func CheckAuditLines(stream io.Reader, expected []AuditEvent, version schema.GroupVersion) (missing []AuditEvent, err error) {
 | 
			
		||||
	expectations := map[AuditEvent]bool{}
 | 
			
		||||
	for _, event := range expected {
 | 
			
		||||
		expectations[event] = false
 | 
			
		||||
	}
 | 
			
		||||
	expectations := buildEventExpectations(expected)
 | 
			
		||||
 | 
			
		||||
	scanner := bufio.NewScanner(stream)
 | 
			
		||||
	for scanner.Scan() {
 | 
			
		||||
		line := scanner.Text()
 | 
			
		||||
		event, err := parseAuditLine(line, version)
 | 
			
		||||
		e := &auditinternal.Event{}
 | 
			
		||||
		decoder := audit.Codecs.UniversalDecoder(version)
 | 
			
		||||
		if err := runtime.DecodeInto(decoder, []byte(line), e); err != nil {
 | 
			
		||||
			return expected, fmt.Errorf("failed decoding buf: %s, apiVersion: %s", line, version)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		event, err := testEventFromInternal(e)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return expected, err
 | 
			
		||||
		}
 | 
			
		||||
@@ -69,22 +76,65 @@ func CheckAuditLines(stream io.Reader, expected []AuditEvent, version schema.Gro
 | 
			
		||||
		return expected, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	missing = make([]AuditEvent, 0)
 | 
			
		||||
	for event, found := range expectations {
 | 
			
		||||
		if !found {
 | 
			
		||||
			missing = append(missing, event)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	missing = findMissing(expectations)
 | 
			
		||||
	return missing, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func parseAuditLine(line string, version schema.GroupVersion) (AuditEvent, error) {
 | 
			
		||||
	e := &auditinternal.Event{}
 | 
			
		||||
	decoder := audit.Codecs.UniversalDecoder(version)
 | 
			
		||||
	if err := runtime.DecodeInto(decoder, []byte(line), e); err != nil {
 | 
			
		||||
		return AuditEvent{}, fmt.Errorf("failed decoding buf: %s, apiVersion: %s", line, version)
 | 
			
		||||
// CheckAuditList searches an audit event list for the expected audit events.
 | 
			
		||||
// if includeID is true the event ids will also be verified
 | 
			
		||||
func CheckAuditList(el auditinternal.EventList, expected []AuditEvent) (missing []AuditEvent, err error) {
 | 
			
		||||
	expectations := buildEventExpectations(expected)
 | 
			
		||||
 | 
			
		||||
	for _, e := range el.Items {
 | 
			
		||||
		event, err := testEventFromInternal(&e)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return expected, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// If the event was expected, mark it as found.
 | 
			
		||||
		if _, found := expectations[event]; found {
 | 
			
		||||
			expectations[event] = true
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	missing = findMissing(expectations)
 | 
			
		||||
	return missing, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// CheckForDuplicates checks a list for duplicate events
 | 
			
		||||
func CheckForDuplicates(el auditinternal.EventList) (auditinternal.EventList, error) {
 | 
			
		||||
	// eventMap holds a map of audit events with just a nil value
 | 
			
		||||
	eventMap := map[AuditEvent]*bool{}
 | 
			
		||||
	duplicates := auditinternal.EventList{}
 | 
			
		||||
	var err error
 | 
			
		||||
	for _, e := range el.Items {
 | 
			
		||||
		event, err := testEventFromInternal(&e)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return duplicates, err
 | 
			
		||||
		}
 | 
			
		||||
		event.ID = e.AuditID
 | 
			
		||||
		if _, ok := eventMap[event]; ok {
 | 
			
		||||
			duplicates.Items = append(duplicates.Items, e)
 | 
			
		||||
			err = fmt.Errorf("failed duplicate check")
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		eventMap[event] = nil
 | 
			
		||||
	}
 | 
			
		||||
	return duplicates, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// buildEventExpectations creates a bool map out of a list of audit events
 | 
			
		||||
func buildEventExpectations(expected []AuditEvent) map[AuditEvent]bool {
 | 
			
		||||
	expectations := map[AuditEvent]bool{}
 | 
			
		||||
	for _, event := range expected {
 | 
			
		||||
		expectations[event] = false
 | 
			
		||||
	}
 | 
			
		||||
	return expectations
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// testEventFromInternal takes an internal audit event and returns a test event
 | 
			
		||||
// if includeID is true the event id will be included
 | 
			
		||||
func testEventFromInternal(e *auditinternal.Event) (AuditEvent, error) {
 | 
			
		||||
	event := AuditEvent{
 | 
			
		||||
		Level:      e.Level,
 | 
			
		||||
		Stage:      e.Stage,
 | 
			
		||||
@@ -113,3 +163,14 @@ func parseAuditLine(line string, version schema.GroupVersion) (AuditEvent, error
 | 
			
		||||
	event.AuthorizeDecision = e.Annotations["authorization.k8s.io/decision"]
 | 
			
		||||
	return event, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// findMissing checks for false values in the expectations map and returns them as a list
 | 
			
		||||
func findMissing(expectations map[AuditEvent]bool) []AuditEvent {
 | 
			
		||||
	var missing []AuditEvent
 | 
			
		||||
	for event, found := range expectations {
 | 
			
		||||
		if !found {
 | 
			
		||||
			missing = append(missing, event)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return missing
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										193
									
								
								test/utils/audit_dynamic.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										193
									
								
								test/utils/audit_dynamic.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,193 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2018 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package utils
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io/ioutil"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"net/http/httptest"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/stretchr/testify/require"
 | 
			
		||||
 | 
			
		||||
	auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	auditinternal "k8s.io/apiserver/pkg/apis/audit"
 | 
			
		||||
	auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
 | 
			
		||||
	"k8s.io/apiserver/pkg/audit"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// AuditTestServer is a helper server for dynamic audit testing
 | 
			
		||||
type AuditTestServer struct {
 | 
			
		||||
	Name            string
 | 
			
		||||
	LockedEventList *LockedEventList
 | 
			
		||||
	Server          *httptest.Server
 | 
			
		||||
	t               *testing.T
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// LockedEventList is an event list with a lock for concurrent access
 | 
			
		||||
type LockedEventList struct {
 | 
			
		||||
	*sync.RWMutex
 | 
			
		||||
	EventList auditinternal.EventList
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewLockedEventList returns a new LockedEventList
 | 
			
		||||
func NewLockedEventList() *LockedEventList {
 | 
			
		||||
	return &LockedEventList{
 | 
			
		||||
		RWMutex:   &sync.RWMutex{},
 | 
			
		||||
		EventList: auditinternal.EventList{},
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewAuditTestServer returns a new audit test server
 | 
			
		||||
func NewAuditTestServer(t *testing.T, name string) *AuditTestServer {
 | 
			
		||||
	s := &AuditTestServer{
 | 
			
		||||
		Name:            name,
 | 
			
		||||
		LockedEventList: NewLockedEventList(),
 | 
			
		||||
		t:               t,
 | 
			
		||||
	}
 | 
			
		||||
	s.buildServer()
 | 
			
		||||
	return s
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetEventList safely returns the internal event list
 | 
			
		||||
func (a *AuditTestServer) GetEventList() auditinternal.EventList {
 | 
			
		||||
	a.LockedEventList.RLock()
 | 
			
		||||
	defer a.LockedEventList.RUnlock()
 | 
			
		||||
	return a.LockedEventList.EventList
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ResetEventList resets the internal event list
 | 
			
		||||
func (a *AuditTestServer) ResetEventList() {
 | 
			
		||||
	a.LockedEventList.Lock()
 | 
			
		||||
	defer a.LockedEventList.Unlock()
 | 
			
		||||
	a.LockedEventList.EventList = auditinternal.EventList{}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AppendEvents will add the given events to the internal event list
 | 
			
		||||
func (a *AuditTestServer) AppendEvents(events []auditinternal.Event) {
 | 
			
		||||
	a.LockedEventList.Lock()
 | 
			
		||||
	defer a.LockedEventList.Unlock()
 | 
			
		||||
	a.LockedEventList.EventList.Items = append(a.LockedEventList.EventList.Items, events...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WaitForEvents waits for the given events to arrive in the server or the 30s timeout is reached
 | 
			
		||||
func (a *AuditTestServer) WaitForEvents(expected []AuditEvent) ([]AuditEvent, error) {
 | 
			
		||||
	var missing []AuditEvent
 | 
			
		||||
	err := wait.PollImmediate(50*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
 | 
			
		||||
		var err error
 | 
			
		||||
		a.LockedEventList.RLock()
 | 
			
		||||
		defer a.LockedEventList.RUnlock()
 | 
			
		||||
		el := a.GetEventList()
 | 
			
		||||
		if len(el.Items) < 1 {
 | 
			
		||||
			return false, nil
 | 
			
		||||
		}
 | 
			
		||||
		missing, err = CheckAuditList(el, expected)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return false, nil
 | 
			
		||||
		}
 | 
			
		||||
		return true, nil
 | 
			
		||||
	})
 | 
			
		||||
	return missing, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WaitForNumEvents checks that at least the given number of events has arrived or the 30s timeout is reached
 | 
			
		||||
func (a *AuditTestServer) WaitForNumEvents(numEvents int) error {
 | 
			
		||||
	err := wait.PollImmediate(50*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
 | 
			
		||||
		el := a.GetEventList()
 | 
			
		||||
		if len(el.Items) < numEvents {
 | 
			
		||||
			return false, nil
 | 
			
		||||
		}
 | 
			
		||||
		return true, nil
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("%v: %d events failed to arrive in %v", err, numEvents, wait.ForeverTestTimeout)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Health polls the server healthcheck until successful or the 30s timeout has been reached
 | 
			
		||||
func (a *AuditTestServer) Health() error {
 | 
			
		||||
	err := wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
 | 
			
		||||
		resp, err := http.Get(fmt.Sprintf("%s/health", a.Server.URL))
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return false, nil
 | 
			
		||||
		}
 | 
			
		||||
		if resp.StatusCode != 200 {
 | 
			
		||||
			return false, nil
 | 
			
		||||
		}
 | 
			
		||||
		return true, nil
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("server %s permanently failed health check: %v", a.Server.URL, err)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Close the server
 | 
			
		||||
func (a *AuditTestServer) Close() {
 | 
			
		||||
	a.Server.Close()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// BuildSinkConfiguration creates a generic audit sink configuration for this server
 | 
			
		||||
func (a *AuditTestServer) BuildSinkConfiguration() *auditregv1alpha1.AuditSink {
 | 
			
		||||
	return &auditregv1alpha1.AuditSink{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			Name: a.Name,
 | 
			
		||||
		},
 | 
			
		||||
		Spec: auditregv1alpha1.AuditSinkSpec{
 | 
			
		||||
			Policy: auditregv1alpha1.Policy{
 | 
			
		||||
				Level: auditregv1alpha1.LevelRequestResponse,
 | 
			
		||||
				Stages: []auditregv1alpha1.Stage{
 | 
			
		||||
					auditregv1alpha1.StageResponseStarted,
 | 
			
		||||
					auditregv1alpha1.StageResponseComplete,
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			Webhook: auditregv1alpha1.Webhook{
 | 
			
		||||
				ClientConfig: auditregv1alpha1.WebhookClientConfig{
 | 
			
		||||
					URL: &a.Server.URL,
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// buildServer creates an http test server that will update the internal event list
 | 
			
		||||
// with the value it receives
 | 
			
		||||
func (a *AuditTestServer) buildServer() {
 | 
			
		||||
	decoder := audit.Codecs.UniversalDecoder(auditv1.SchemeGroupVersion)
 | 
			
		||||
	mux := http.NewServeMux()
 | 
			
		||||
	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
 | 
			
		||||
		body, err := ioutil.ReadAll(r.Body)
 | 
			
		||||
		require.NoError(a.t, err, "could not read request body")
 | 
			
		||||
		el := auditinternal.EventList{}
 | 
			
		||||
		err = runtime.DecodeInto(decoder, body, &el)
 | 
			
		||||
		r.Body.Close()
 | 
			
		||||
		require.NoError(a.t, err, "failed decoding buf: %b, apiVersion: %s", body, auditv1.SchemeGroupVersion)
 | 
			
		||||
		a.AppendEvents(el.Items)
 | 
			
		||||
		w.WriteHeader(200)
 | 
			
		||||
	})
 | 
			
		||||
	mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
 | 
			
		||||
		w.WriteHeader(200)
 | 
			
		||||
	})
 | 
			
		||||
	a.Server = httptest.NewServer(mux)
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user