vault/serviceregistration/kubernetes/retry_handler.go

package kubernetes

import (
	"fmt"
	"sync"
	"time"

	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/vault/serviceregistration/kubernetes/client"
)

// How often to retry sending a state update if it fails.
var retryFreq = 5 * time.Second

// retryHandler executes retries.
// It is thread-safe.
type retryHandler struct {
	// These don't need a mutex because they're never mutated.
	logger             hclog.Logger
	namespace, podName string

	// To synchronize setInitialState and patchesToRetry.
	lock sync.Mutex

	// setInitialState will be nil if this has been done successfully.
	// It must be done before any patches are retried.
	setInitialState func() error

	// The map holds the path to the label being updated. It will only either
	// not hold a particular label, or hold _the last_ state we were aware of.
	// These should only be updated after initial state has been set.
	patchesToRetry map[string]*client.Patch
}
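
// SetInitialState calls the given function once, immediately; if that call
// fails, the function is stored so it can be retried on each tick of Run's
// ticker until it succeeds.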
func (r *retryHandler) SetInitialState(setInitialState func() error) {
	r.lock.Lock()
	defer r.lock.Unlock()

	if err := setInitialState(); err != nil {
		if r.logger.IsWarn() {
			r.logger.Warn(fmt.Sprintf("unable to set initial state due to %s, will retry", err.Error()))
		}
		r.setInitialState = setInitialState
	}
}

// Run must be called for retries to be started.
func (r *retryHandler) Run(shutdownCh <-chan struct{}, wait *sync.WaitGroup, c *client.Client) {
	// Make sure Vault will give us time to finish up here. Add to the wait
	// group before launching the goroutine so a caller that shuts down
	// immediately can't call Wait before the goroutine has registered itself.
	wait.Add(1)

	// Run this in a go func so this call doesn't block.
	go func() {
		defer wait.Done()

		retry := time.NewTicker(retryFreq)
		defer retry.Stop()
		for {
			select {
			case <-shutdownCh:
				return
			case <-retry.C:
				r.retry(c)
			}
		}
	}()
}

// Notify adds a patch to be retried until it's either completed without
// error, or no longer needed.
func (r *retryHandler) Notify(c *client.Client, patch *client.Patch) {
	r.lock.Lock()
	defer r.lock.Unlock()

	// Initial state must be set first, or subsequent notifications we've
	// received could get smashed by a late-arriving initial state.
	// We will store this to retry it when appropriate.
	if r.setInitialState != nil {
		if r.logger.IsWarn() {
			r.logger.Warn(fmt.Sprintf("cannot notify of present state for %s because initial state is unset", patch.Path))
		}
		r.patchesToRetry[patch.Path] = patch
		return
	}

	// Initial state has been sent, so it's OK to attempt a patch immediately.
	if err := c.PatchPod(r.namespace, r.podName, patch); err != nil {
		if r.logger.IsWarn() {
			r.logger.Warn(fmt.Sprintf("unable to update state for %s due to %s, will retry", patch.Path, err.Error()))
		}
		r.patchesToRetry[patch.Path] = patch
	}
}
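
// retry runs on each tick: it first retries setting the initial state if
// that is still pending, then attempts all stored patches in a single
// PatchPod call, clearing them only on success.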
func (r *retryHandler) retry(c *client.Client) {
	r.lock.Lock()
	defer r.lock.Unlock()

	// Initial state must be set first, or subsequent notifications we've
	// received could get smashed by a late-arriving initial state.
	if r.setInitialState != nil {
		if err := r.setInitialState(); err != nil {
			if r.logger.IsWarn() {
				r.logger.Warn(fmt.Sprintf("unable to set initial state due to %s, will retry", err.Error()))
			}
			// On failure, we leave the initial state func populated for
			// the next retry.
			return
		}
		// On success, we set it to nil and allow the logic to continue.
		r.setInitialState = nil
	}

	if len(r.patchesToRetry) == 0 {
		// Nothing further to do here.
		return
	}

	patches := make([]*client.Patch, len(r.patchesToRetry))
	i := 0
	for _, patch := range r.patchesToRetry {
		patches[i] = patch
		i++
	}

	if err := c.PatchPod(r.namespace, r.podName, patches...); err != nil {
		if r.logger.IsWarn() {
			r.logger.Warn(fmt.Sprintf("unable to update state due to %s, will retry", err.Error()))
		}
		return
	}
	r.patchesToRetry = make(map[string]*client.Patch)
}
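
// Example wiring (a hedged sketch, not part of the original file). The
// surrounding service registration code is assumed to build the handler with
// a non-nil patchesToRetry map, set initial state, start Run, and then call
// Notify on label changes. Identifiers such as logger, namespace, podName, c,
// shutdownCh, wait, and initialPatches are placeholders the caller would
// already hold, and the client.Patch fields and client.Replace operation are
// assumed from the client package rather than shown in this file.
//
//	r := &retryHandler{
//		logger:         logger,
//		namespace:      namespace,
//		podName:        podName,
//		patchesToRetry: make(map[string]*client.Patch),
//	}
//	r.SetInitialState(func() error {
//		// e.g. send the pod's starting labels in one PatchPod call
//		return c.PatchPod(namespace, podName, initialPatches...)
//	})
//	r.Run(shutdownCh, wait, c)
//	r.Notify(c, &client.Patch{
//		Operation: client.Replace,
//		Path:      "/metadata/labels/vault-active",
//		Value:     "true",
//	})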