Observer pattern for service registration interface (#8123)

* use observer pattern for service discovery

* update perf standby method

* fix test

* revert usersTags to being called serviceTags

* use previous consul code

* vault isnt a performance standby before starting

* log err

* changes from feedback

* add Run method to interface

* changes from feedback

* fix core test

* update example
This commit is contained in:
Becca Petrin
2020-01-24 09:42:03 -08:00
committed by GitHub
parent ae0892b3ce
commit d7d4084c86
13 changed files with 480 additions and 378 deletions

View File

@@ -919,11 +919,22 @@ func (c *ServerCommand) Run(args []string) int {
return 1
}
if config.Storage.Type == "raft" {
// Do any custom configuration needed per backend
switch config.Storage.Type {
case "consul":
if config.ServiceRegistration == nil {
// If Consul is configured for storage and service registration is unconfigured,
// use Consul for service registration without requiring additional configuration.
// This maintains backward-compatibility.
config.ServiceRegistration = &server.ServiceRegistration{
Type: "consul",
Config: config.Storage.Config,
}
}
case "raft":
if envCA := os.Getenv("VAULT_CLUSTER_ADDR"); envCA != "" {
config.ClusterAddr = envCA
}
if len(config.ClusterAddr) == 0 {
c.UI.Error("Cluster address must be set when using raft storage")
return 1
@@ -943,6 +954,9 @@ func (c *ServerCommand) Run(args []string) int {
return 1
}
// Instantiate the wait group
c.WaitGroup = &sync.WaitGroup{}
// Initialize the Service Discovery, if there is one
var configSR sr.ServiceRegistration
if config.ServiceRegistration != nil {
@@ -954,11 +968,25 @@ func (c *ServerCommand) Run(args []string) int {
namedSDLogger := c.logger.Named("service_registration." + config.ServiceRegistration.Type)
allLoggers = append(allLoggers, namedSDLogger)
configSR, err = sdFactory(config.ServiceRegistration.Config, namedSDLogger)
// Since we haven't even begun starting Vault's core yet,
// we know that Vault is in its pre-running state.
state := sr.State{
VaultVersion: version.GetVersion().VersionNumber(),
IsInitialized: false,
IsSealed: true,
IsActive: false,
IsPerformanceStandby: false,
}
configSR, err = sdFactory(config.ServiceRegistration.Config, namedSDLogger, state, config.Storage.RedirectAddr)
if err != nil {
c.UI.Error(fmt.Sprintf("Error initializing service_registration of type %s: %s", config.ServiceRegistration.Type, err))
return 1
}
if err := configSR.Run(c.ShutdownCh, c.WaitGroup); err != nil {
c.UI.Error(fmt.Sprintf("Error running service_registration of type %s: %s", config.ServiceRegistration.Type, err))
return 1
}
}
infoKeys := make([]string, 0, 10)
@@ -1514,26 +1542,7 @@ CLUSTER_SYNTHESIS_COMPLETE:
}
}
// Perform service discovery registrations and initialization of
// HTTP server after the verifyOnly check.
// Instantiate the wait group
c.WaitGroup = &sync.WaitGroup{}
// If service discovery is available, run service discovery
if disc := coreConfig.GetServiceRegistration(); disc != nil {
activeFunc := func() bool {
if isLeader, _, _, err := core.Leader(); err == nil {
return isLeader
}
return false
}
if err := disc.RunServiceRegistration(c.WaitGroup, c.ShutdownCh, coreConfig.RedirectAddr, activeFunc, core.Sealed, core.PerfStandby); err != nil {
c.UI.Error(fmt.Sprintf("Error initializing service discovery: %v", err))
return 1
}
}
// Perform initialization of HTTP server after the verifyOnly check.
// If we're in Dev mode, then initialize the core
if c.flagDev && !c.flagDevSkipInit {
init, err := c.enableDev(core, coreConfig)