mirror of
https://github.com/optim-enterprises-bv/kubernetes.git
synced 2025-12-11 18:45:36 +00:00
Job: Handle error returned from AddEventHandler function (#119917)
* Job: Handle error returned from AddEventHandler function Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com> * Use the error message the similar to CronJob Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com> * Clean up error messages Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com> * Put the tesing.T on the second place in the args for the newControllerFromClient function Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com> * Put the testing.T on the second place in the args for the newControllerFromClientWithClock function Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com> * Call t.Helper() Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com> * Put the testing.TB on the second place in the args for the createJobControllerWithSharedInformers function and call tb.Helper() there Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com> * Put the testing.TB on the second place in the args for the startJobControllerAndWaitForCaches function and call tb.Helper() there Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com> * Adapt TestFinializerCleanup to the eventhandler error Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com> --------- Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
This commit is contained in:
@@ -52,7 +52,10 @@ func setup(ctx context.Context, t *testing.T) (kubeapiservertesting.TearDownFunc
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating CronJob controller: %v", err)
|
||||
}
|
||||
jc := job.NewController(ctx, informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
|
||||
jc, err := job.NewController(ctx, informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating Job controller: %v", err)
|
||||
}
|
||||
|
||||
return server.TearDownFn, cjc, jc, informerSet, clientSet
|
||||
}
|
||||
|
||||
@@ -75,7 +75,7 @@ func TestMetricsOnSuccesses(t *testing.T) {
|
||||
// setup the job controller
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "simple")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
|
||||
defer cancel()
|
||||
|
||||
testCases := map[string]struct {
|
||||
@@ -151,7 +151,7 @@ func TestJobFinishedNumReasonMetric(t *testing.T) {
|
||||
// setup the job controller
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "simple")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
|
||||
defer cancel()
|
||||
|
||||
testCases := map[string]struct {
|
||||
@@ -378,7 +378,7 @@ func TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart(t *testi
|
||||
// Make the job controller significantly slower to trigger race condition.
|
||||
restConfig.QPS = 1
|
||||
restConfig.Burst = 1
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
|
||||
defer func() {
|
||||
cancel()
|
||||
}()
|
||||
@@ -454,7 +454,7 @@ func TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart(t *testi
|
||||
cancel()
|
||||
|
||||
// start the second controller to promote the interim FailureTarget job condition as Failed
|
||||
ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
|
||||
// verify the job is correctly marked as Failed
|
||||
validateJobFailed(ctx, t, cs, jobObj)
|
||||
validateNoOrphanPodsWithFinalizers(ctx, t, cs, jobObj)
|
||||
@@ -634,7 +634,7 @@ func TestJobPodFailurePolicy(t *testing.T) {
|
||||
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "simple")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
|
||||
defer func() {
|
||||
cancel()
|
||||
}()
|
||||
@@ -659,7 +659,7 @@ func TestJobPodFailurePolicy(t *testing.T) {
|
||||
|
||||
if test.restartController {
|
||||
cancel()
|
||||
ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
|
||||
}
|
||||
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
@@ -691,7 +691,7 @@ func TestBackoffLimitPerIndex_DelayedPodDeletion(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "backoff-limit-per-index-failed")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
|
||||
defer func() {
|
||||
cancel()
|
||||
}()
|
||||
@@ -766,7 +766,7 @@ func TestBackoffLimitPerIndex_Reenabling(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "backoff-limit-per-index-reenabled")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
|
||||
defer cancel()
|
||||
resetMetrics()
|
||||
|
||||
@@ -852,7 +852,7 @@ func TestBackoffLimitPerIndex_JobPodsCreatedWithExponentialBackoff(t *testing.T)
|
||||
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "simple")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
|
||||
defer cancel()
|
||||
|
||||
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
|
||||
@@ -1239,7 +1239,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
|
||||
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "simple")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
|
||||
defer func() {
|
||||
cancel()
|
||||
}()
|
||||
@@ -1316,7 +1316,7 @@ func TestNonParallelJob(t *testing.T) {
|
||||
t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "simple")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
|
||||
defer func() {
|
||||
cancel()
|
||||
}()
|
||||
@@ -1332,7 +1332,7 @@ func TestNonParallelJob(t *testing.T) {
|
||||
|
||||
// Restarting controller.
|
||||
cancel()
|
||||
ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
|
||||
|
||||
// Failed Pod is replaced.
|
||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
|
||||
@@ -1346,7 +1346,7 @@ func TestNonParallelJob(t *testing.T) {
|
||||
|
||||
// Restarting controller.
|
||||
cancel()
|
||||
ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
|
||||
|
||||
// No more Pods are created after the Pod succeeds.
|
||||
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
||||
@@ -1377,7 +1377,7 @@ func TestParallelJob(t *testing.T) {
|
||||
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "parallel")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
|
||||
defer cancel()
|
||||
resetMetrics()
|
||||
|
||||
@@ -1463,7 +1463,7 @@ func TestParallelJob(t *testing.T) {
|
||||
func TestParallelJobParallelism(t *testing.T) {
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "parallel")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
|
||||
defer cancel()
|
||||
|
||||
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
|
||||
@@ -1533,7 +1533,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.enableReadyPods)()
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "completions")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
|
||||
defer cancel()
|
||||
|
||||
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
|
||||
@@ -1607,7 +1607,7 @@ func TestIndexedJob(t *testing.T) {
|
||||
t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "indexed")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
|
||||
defer cancel()
|
||||
resetMetrics()
|
||||
|
||||
@@ -1758,7 +1758,7 @@ func TestJobPodReplacementPolicy(t *testing.T) {
|
||||
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "pod-replacement-policy")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
|
||||
defer cancel()
|
||||
resetMetrics()
|
||||
|
||||
@@ -1921,7 +1921,7 @@ func TestElasticIndexedJob(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.ElasticIndexedJob, tc.featureGate)()
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "indexed")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
|
||||
defer cancel()
|
||||
resetMetrics()
|
||||
|
||||
@@ -2004,7 +2004,7 @@ func BenchmarkLargeIndexedJob(b *testing.B) {
|
||||
restConfig.QPS = 100
|
||||
restConfig.Burst = 100
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(b, restConfig)
|
||||
defer cancel()
|
||||
backoff := wait.Backoff{
|
||||
Duration: time.Second,
|
||||
@@ -2083,7 +2083,7 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
|
||||
// Make the job controller significantly slower to trigger race condition.
|
||||
restConfig.QPS = 1
|
||||
restConfig.Burst = 1
|
||||
jc, ctx, cancel := createJobControllerWithSharedInformers(restConfig, informerSet)
|
||||
jc, ctx, cancel := createJobControllerWithSharedInformers(t, restConfig, informerSet)
|
||||
resetMetrics()
|
||||
defer cancel()
|
||||
restConfig.QPS = 200
|
||||
@@ -2126,7 +2126,7 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) {
|
||||
t.Cleanup(setDuringTest(&jobcontroller.MaxUncountedPods, 50))
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "simple")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
|
||||
defer cancel()
|
||||
|
||||
// Job tracking with finalizers requires less calls in Indexed mode,
|
||||
@@ -2165,7 +2165,7 @@ func TestJobPodsCreatedWithExponentialBackoff(t *testing.T) {
|
||||
t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, 2*time.Second))
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "simple")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
|
||||
defer cancel()
|
||||
|
||||
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{})
|
||||
@@ -2251,7 +2251,7 @@ func validateExpotentialBackoffDelay(t *testing.T, defaultPodFailureBackoff time
|
||||
func TestJobFailedWithInterrupts(t *testing.T) {
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "simple")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
|
||||
defer func() {
|
||||
cancel()
|
||||
}()
|
||||
@@ -2291,7 +2291,7 @@ func TestJobFailedWithInterrupts(t *testing.T) {
|
||||
}
|
||||
t.Log("Recreating job controller")
|
||||
cancel()
|
||||
ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
|
||||
validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobFailed)
|
||||
}
|
||||
|
||||
@@ -2322,7 +2322,7 @@ func TestOrphanPodsFinalizersClearedOnRestart(t *testing.T) {
|
||||
// Step 0: create job.
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "simple")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
|
||||
defer func() {
|
||||
cancel()
|
||||
}()
|
||||
@@ -2349,7 +2349,7 @@ func TestOrphanPodsFinalizersClearedOnRestart(t *testing.T) {
|
||||
}
|
||||
|
||||
// Step 3: Restart controller.
|
||||
ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
|
||||
validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj)
|
||||
}
|
||||
|
||||
@@ -2382,7 +2382,7 @@ func TestSuspendJob(t *testing.T) {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "suspend")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
|
||||
defer cancel()
|
||||
events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
@@ -2433,7 +2433,7 @@ func TestSuspendJob(t *testing.T) {
|
||||
func TestSuspendJobControllerRestart(t *testing.T) {
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "suspend")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
|
||||
defer cancel()
|
||||
|
||||
job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
|
||||
@@ -2455,7 +2455,7 @@ func TestSuspendJobControllerRestart(t *testing.T) {
|
||||
func TestNodeSelectorUpdate(t *testing.T) {
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "suspend")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
|
||||
defer cancel()
|
||||
|
||||
job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{Spec: batchv1.JobSpec{
|
||||
@@ -2919,9 +2919,10 @@ func setup(t testing.TB, nsBaseName string) (framework.TearDownFunc, *restclient
|
||||
return closeFn, config, clientSet, ns
|
||||
}
|
||||
|
||||
func startJobControllerAndWaitForCaches(restConfig *restclient.Config) (context.Context, context.CancelFunc) {
|
||||
func startJobControllerAndWaitForCaches(tb testing.TB, restConfig *restclient.Config) (context.Context, context.CancelFunc) {
|
||||
tb.Helper()
|
||||
informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-informers")), 0)
|
||||
jc, ctx, cancel := createJobControllerWithSharedInformers(restConfig, informerSet)
|
||||
jc, ctx, cancel := createJobControllerWithSharedInformers(tb, restConfig, informerSet)
|
||||
informerSet.Start(ctx.Done())
|
||||
go jc.Run(ctx, 1)
|
||||
|
||||
@@ -2940,10 +2941,14 @@ func resetMetrics() {
|
||||
metrics.PodFailuresHandledByFailurePolicy.Reset()
|
||||
}
|
||||
|
||||
func createJobControllerWithSharedInformers(restConfig *restclient.Config, informerSet informers.SharedInformerFactory) (*jobcontroller.Controller, context.Context, context.CancelFunc) {
|
||||
func createJobControllerWithSharedInformers(tb testing.TB, restConfig *restclient.Config, informerSet informers.SharedInformerFactory) (*jobcontroller.Controller, context.Context, context.CancelFunc) {
|
||||
tb.Helper()
|
||||
clientSet := clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-controller"))
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
jc := jobcontroller.NewController(ctx, informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
|
||||
jc, err := jobcontroller.NewController(ctx, informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
|
||||
if err != nil {
|
||||
tb.Fatalf("Error creating Job controller: %v", err)
|
||||
}
|
||||
return jc, ctx, cancel
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user