feat: Add time api to apid

This extends apid to cover the time api.

Signed-off-by: Brad Beam <brad.beam@talos-systems.com>
This commit is contained in:
Brad Beam
2019-10-25 19:08:55 +00:00
committed by Andrew Rynhard
parent d3d011c8d2
commit ee24e42319
12 changed files with 310 additions and 87 deletions

View File

@@ -20,6 +20,7 @@ import (
common "github.com/talos-systems/talos/api/common"
machine "github.com/talos-systems/talos/api/machine"
os "github.com/talos-systems/talos/api/os"
time "github.com/talos-systems/talos/api/time"
constants "github.com/talos-systems/talos/pkg/constants"
tls "github.com/talos-systems/talos/pkg/grpc/tls"
)
@@ -215,6 +216,15 @@ type VersionReply = machine.VersionReply
// VersionInfo from public import machine/machine.proto
type VersionInfo = machine.VersionInfo
// TimeRequest from public import time/time.proto
type TimeRequest = time.TimeRequest
// TimeReply from public import time/time.proto
type TimeReply = time.TimeReply
// TimeResponse from public import time/time.proto
type TimeResponse = time.TimeResponse
// NodeMetadata from public import common/common.proto
type NodeMetadata = common.NodeMetadata
@@ -224,16 +234,17 @@ type Empty = empty.Empty
func init() { proto.RegisterFile("api.proto", fileDescriptor_00212fb1f9d3bf1c) }
var fileDescriptor_00212fb1f9d3bf1c = []byte{
// 140 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x2c, 0x8b, 0xb1, 0x0a, 0xc2, 0x30,
0x14, 0x45, 0xd5, 0x82, 0xa0, 0x6e, 0x8a, 0x4b, 0xdd, 0xc4, 0xd5, 0xbe, 0xc1, 0x3f, 0xf0, 0x0b,
0x32, 0xbb, 0xa5, 0x21, 0xa6, 0x81, 0xbe, 0xde, 0x87, 0xef, 0x75, 0xe8, 0xdf, 0x0b, 0x4d, 0xa6,
0xc3, 0x39, 0x97, 0x7b, 0x3c, 0x78, 0xc9, 0x9d, 0xfc, 0x60, 0x38, 0x37, 0x5e, 0x72, 0x7b, 0x82,
0x12, 0xb4, 0x94, 0xf6, 0xca, 0x3e, 0x0c, 0x79, 0x8a, 0x54, 0x59, 0xf3, 0x25, 0x80, 0x19, 0x13,
0x15, 0xd4, 0x78, 0x4b, 0x40, 0x1a, 0x23, 0xad, 0xd6, 0xcf, 0x5f, 0x8a, 0x2c, 0xb6, 0x94, 0xf1,
0xfd, 0xf8, 0xdc, 0x53, 0xb6, 0x61, 0xee, 0xbb, 0x00, 0x26, 0xf3, 0x23, 0xf4, 0xa9, 0x8b, 0x5a,
0x64, 0x2d, 0x46, 0x5e, 0xb2, 0xdb, 0xb8, 0xad, 0xdb, 0xb9, 0xa6, 0xdf, 0xaf, 0xa7, 0xd7, 0x3f,
0x00, 0x00, 0xff, 0xff, 0x96, 0x64, 0x43, 0x69, 0x9c, 0x00, 0x00, 0x00,
// 151 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x2c, 0xcb, 0x31, 0xce, 0xc2, 0x30,
0x0c, 0x05, 0xe0, 0xff, 0xa7, 0x08, 0x09, 0x18, 0x90, 0x40, 0x2c, 0x65, 0x43, 0xac, 0xd4, 0x03,
0x37, 0xe0, 0x04, 0x99, 0xd9, 0x92, 0x28, 0xa4, 0x96, 0xea, 0xda, 0xc2, 0xee, 0xd0, 0xdb, 0x23,
0x9a, 0x2c, 0x7e, 0x7a, 0x9f, 0xf5, 0x76, 0x5b, 0x2f, 0xd8, 0xc9, 0x87, 0x8d, 0x8f, 0x8d, 0x17,
0x6c, 0xf7, 0xac, 0xc0, 0x5a, 0xa4, 0x3d, 0x93, 0x8f, 0x3d, 0x8e, 0x09, 0x6a, 0x56, 0x3e, 0x18,
0x52, 0x82, 0xdf, 0xa9, 0x70, 0x8a, 0x4c, 0xc4, 0x23, 0x94, 0xa8, 0x78, 0xc9, 0xcc, 0x79, 0x48,
0xb0, 0xb4, 0x30, 0xbd, 0x21, 0x91, 0xd8, 0x5c, 0x9e, 0xcf, 0xdb, 0xeb, 0x9a, 0xd1, 0xfa, 0x29,
0x74, 0x91, 0x09, 0xcc, 0x0f, 0xac, 0x77, 0x9d, 0xd5, 0x12, 0x69, 0x69, 0xe0, 0x05, 0xdd, 0x9f,
0xfb, 0x77, 0x2b, 0xd7, 0xb8, 0x75, 0xd8, 0x2c, 0xb3, 0xc7, 0x37, 0x00, 0x00, 0xff, 0xff, 0xf0,
0x38, 0xc4, 0xa9, 0xaf, 0x00, 0x00, 0x00,
}
type ApiProxy struct {
@@ -479,6 +490,30 @@ func (p *ApiProxy) Proxy(ctx context.Context, method string, creds credentials.T
resp.Response = append(resp.Response, msg.(*machine.VersionReply).Response[0])
}
response = resp
case "/time.Time/Time":
// Initialize target clients
clients, err := createTimeClient(targets, creds, proxyMd)
if err != nil {
break
}
resp := &time.TimeReply{}
msgs, err = proxyTimeRunner(clients, in, proxyTime)
for _, msg := range msgs {
resp.Response = append(resp.Response, msg.(*time.TimeReply).Response[0])
}
response = resp
case "/time.Time/TimeCheck":
// Initialize target clients
clients, err := createTimeClient(targets, creds, proxyMd)
if err != nil {
break
}
resp := &time.TimeReply{}
msgs, err = proxyTimeRunner(clients, in, proxyTimeCheck)
for _, msg := range msgs {
resp.Response = append(resp.Response, msg.(*time.TimeReply).Response[0])
}
response = resp
}
@@ -752,6 +787,62 @@ func proxyVersion(client *proxyMachineClient, in interface{}, wg *sync.WaitGroup
respCh <- resp
}
type runnerTimeFn func(*proxyTimeClient, interface{}, *sync.WaitGroup, chan proto.Message, chan error)
func proxyTimeRunner(clients []*proxyTimeClient, in interface{}, runner runnerTimeFn) ([]proto.Message, error) {
var (
errors *go_multierror.Error
wg sync.WaitGroup
)
respCh := make(chan proto.Message, len(clients))
errCh := make(chan error, len(clients))
wg.Add(len(clients))
for _, client := range clients {
go runner(client, in, &wg, respCh, errCh)
}
wg.Wait()
close(respCh)
close(errCh)
var response []proto.Message
for resp := range respCh {
response = append(response, resp)
}
for err := range errCh {
errors = go_multierror.Append(errors, err)
}
return response, errors.ErrorOrNil()
}
type proxyTimeClient struct {
Conn time.TimeClient
Context context.Context
Target string
DialOpts []grpc.DialOption
}
func proxyTime(client *proxyTimeClient, in interface{}, wg *sync.WaitGroup, respCh chan proto.Message, errCh chan error) {
defer wg.Done()
resp, err := client.Conn.Time(client.Context, in.(*empty.Empty))
if err != nil {
errCh <- err
return
}
resp.Response[0].Metadata = &NodeMetadata{Hostname: client.Target}
respCh <- resp
}
func proxyTimeCheck(client *proxyTimeClient, in interface{}, wg *sync.WaitGroup, respCh chan proto.Message, errCh chan error) {
defer wg.Done()
resp, err := client.Conn.TimeCheck(client.Context, in.(*time.TimeRequest))
if err != nil {
errCh <- err
return
}
resp.Response[0].Metadata = &NodeMetadata{Hostname: client.Target}
respCh <- resp
}
func createOSClient(targets []string, creds credentials.TransportCredentials, proxyMd metadata.MD) ([]*proxyOSClient, error) {
var errors *go_multierror.Error
clients := make([]*proxyOSClient, 0, len(targets))
@@ -800,14 +891,40 @@ func createMachineClient(targets []string, creds credentials.TransportCredential
return clients, errors.ErrorOrNil()
}
func createTimeClient(targets []string, creds credentials.TransportCredentials, proxyMd metadata.MD) ([]*proxyTimeClient, error) {
var errors *go_multierror.Error
clients := make([]*proxyTimeClient, 0, len(targets))
for _, target := range targets {
c := &proxyTimeClient{
// TODO change the context to be more useful ( ex cancelable )
Context: metadata.NewOutgoingContext(context.Background(), proxyMd),
Target: target,
}
// TODO: i think we potentially leak a client here,
// we should close the request // cancel the context if it errors
// Explicitly set OSD port
conn, err := grpc.Dial(fmt.Sprintf("%s:%d", target, 50000), grpc.WithTransportCredentials(creds))
if err != nil {
// TODO: probably worth wrapping err to add some context about the target
errors = go_multierror.Append(errors, err)
continue
}
c.Conn = time.NewTimeClient(conn)
clients = append(clients, c)
}
return clients, errors.ErrorOrNil()
}
type Registrator struct {
os.OSClient
machine.MachineClient
time.TimeClient
}
func (r *Registrator) Register(s *grpc.Server) {
os.RegisterOSServer(s, r)
machine.RegisterMachineServer(s, r)
time.RegisterTimeServer(s, r)
}
func (r *Registrator) Containers(ctx context.Context, in *os.ContainersRequest) (*os.ContainersReply, error) {
@@ -909,6 +1026,14 @@ func (r *Registrator) Version(ctx context.Context, in *empty.Empty) (*machine.Ve
return r.MachineClient.Version(ctx, in)
}
func (r *Registrator) Time(ctx context.Context, in *empty.Empty) (*time.TimeReply, error) {
return r.TimeClient.Time(ctx, in)
}
func (r *Registrator) TimeCheck(ctx context.Context, in *time.TimeRequest) (*time.TimeReply, error) {
return r.TimeClient.TimeCheck(ctx, in)
}
type LocalOSClient struct {
os.OSClient
}
@@ -1024,3 +1149,27 @@ func (c *LocalMachineClient) Stop(ctx context.Context, in *machine.StopRequest,
func (c *LocalMachineClient) Version(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*machine.VersionReply, error) {
return c.MachineClient.Version(ctx, in, opts...)
}
type LocalTimeClient struct {
time.TimeClient
}
func NewLocalTimeClient() (time.TimeClient, error) {
conn, err := grpc.Dial("unix:"+constants.TimeSocketPath,
grpc.WithInsecure(),
)
if err != nil {
return nil, err
}
return &LocalTimeClient{
TimeClient: time.NewTimeClient(conn),
}, nil
}
func (c *LocalTimeClient) Time(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*time.TimeReply, error) {
return c.TimeClient.Time(ctx, in, opts...)
}
func (c *LocalTimeClient) TimeCheck(ctx context.Context, in *time.TimeRequest, opts ...grpc.CallOption) (*time.TimeReply, error) {
return c.TimeClient.TimeCheck(ctx, in, opts...)
}