OSS parts of the new client controlled consistency feature (#10974)

This commit is contained in:
Nick Cabatoff
2021-02-24 06:58:10 -05:00
committed by GitHub
parent e84e7b2d76
commit 22b486b651
27 changed files with 754 additions and 44 deletions

View File

@@ -384,6 +384,8 @@ type Client struct {
wrappingLookupFunc WrappingLookupFunc
mfaCreds []string
policyOverride bool
requestCallbacks []RequestCallback
responseCallbacks []ResponseCallback
}
// NewClient returns a new client for the given configuration.
@@ -866,6 +868,10 @@ func (c *Client) RawRequestWithContext(ctx context.Context, r *Request) (*Respon
c.modifyLock.RUnlock()
for _, cb := range c.requestCallbacks {
cb(r)
}
if limiter != nil {
limiter.Wait(ctx)
}
@@ -907,7 +913,7 @@ START:
}
if checkRetry == nil {
checkRetry = retryablehttp.DefaultRetryPolicy
checkRetry = DefaultRetryPolicy
}
client := &retryablehttp.Client{
@@ -968,9 +974,91 @@ START:
goto START
}
if result != nil {
for _, cb := range c.responseCallbacks {
cb(result)
}
}
if err := result.Error(); err != nil {
return result, err
}
return result, nil
}
type RequestCallback func(*Request)
type ResponseCallback func(*Response)
// WithRequestCallbacks makes a shallow clone of Client, modifies it to use
// the given callbacks, and returns it. Each of the callbacks will be invoked
// on every outgoing request. A client may be used to issue requests
// concurrently; any locking needed by callbacks invoked concurrently is the
// callback's responsibility.
func (c *Client) WithRequestCallbacks(callbacks ...RequestCallback) *Client {
c2 := *c
c2.modifyLock = sync.RWMutex{}
c2.requestCallbacks = callbacks
return &c2
}
// WithResponseCallbacks makes a shallow clone of Client, modifies it to use
// the given callbacks, and returns it. Each of the callbacks will be invoked
// on every received response. A client may be used to issue requests
// concurrently; any locking needed by callbacks invoked concurrently is the
// callback's responsibility.
func (c *Client) WithResponseCallbacks(callbacks ...ResponseCallback) *Client {
c2 := *c
c2.modifyLock = sync.RWMutex{}
c2.responseCallbacks = callbacks
return &c2
}
// RecordState returns a response callback that will record the state returned
// by Vault in a response header.
func RecordState(state *string) ResponseCallback {
return func(resp *Response) {
*state = resp.Header.Get("X-Vault-Index")
}
}
// RequireState returns a request callback that will add a request header to
// specify the state we require of Vault. This state was obtained from a
// response header seen previous, probably captured with RecordState.
func RequireState(states ...string) RequestCallback {
return func(req *Request) {
for _, s := range states {
req.Headers.Add("X-Vault-Index", s)
}
}
}
// ForwardInconsistent returns a request callback that will add a request
// header which says: if the state required isn't present on the node receiving
// this request, forward it to the active node. This should be used in
// conjunction with RequireState.
func ForwardInconsistent() RequestCallback {
return func(req *Request) {
req.Headers.Set("X-Vault-Inconsistent", "forward-active-node")
}
}
// ForwardAlways returns a request callback which adds a header telling any
// performance standbys handling the request to forward it to the active node.
// This feature must be enabled in Vault's configuration.
func ForwardAlways() RequestCallback {
return func(req *Request) {
req.Headers.Set("X-Vault-Forward", "active-node")
}
}
// DefaultRetryPolicy is the default retry policy used by new Client objects.
// It is the same as retryablehttp.DefaultRetryPolicy except that it also retries
// 412 requests, which are returned by Vault when a X-Vault-Index header isn't
// satisfied.
func DefaultRetryPolicy(ctx context.Context, resp *http.Response, err error) (bool, error) {
retry, err := retryablehttp.DefaultRetryPolicy(ctx, resp, err)
if err != nil || retry {
return retry, err
}
return resp.StatusCode == 412, nil
}