feat(init): implement init gRPC API, forward reboot to init (#579)

This implements insecure over-file-socket gRPC API for init with two
first simplest APIs: reboot and shutdown (poweroff).

File socket is mounted only to `osd` service, so it is the only service
which can access init API. Osd forwards reboot/shutdown already
implemented APIs to init which actually executes these.

This enables graceful shutdown/reboot with service shutdown, sync, etc.

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
This commit is contained in:
Andrey Smirnov
2019-04-26 23:04:24 +03:00
committed by GitHub
parent 2f6d5e0260
commit ab2917e833
13 changed files with 240 additions and 61 deletions

View File

@@ -13,10 +13,14 @@ RUN protoc -I/usr/local/include -I./proto --go_out=plugins=grpc:proto proto/api.
WORKDIR /trustd
COPY ./internal/app/trustd/proto ./proto
RUN protoc -I/usr/local/include -I./proto --go_out=plugins=grpc:proto proto/api.proto
WORKDIR /init
COPY ./internal/app/init/proto ./proto
RUN protoc -I/usr/local/include -I./proto --go_out=plugins=grpc:proto proto/api.proto
FROM scratch AS proto
COPY --from=proto-build /osd/proto/api.pb.go /internal/app/osd/proto/
COPY --from=proto-build /trustd/proto/api.pb.go /internal/app/trustd/proto/
COPY --from=proto-build /init/proto/api.pb.go /internal/app/init/proto/
# The base provides a common image to build the Talos source code.

View File

@@ -19,6 +19,7 @@ import (
"github.com/golang/protobuf/ptypes/empty"
"github.com/talos-systems/talos/cmd/osctl/pkg/client/config"
initproto "github.com/talos-systems/talos/internal/app/init/proto"
"github.com/talos-systems/talos/internal/app/osd/proto"
"github.com/talos-systems/talos/internal/pkg/proc"
"google.golang.org/grpc"
@@ -37,8 +38,9 @@ type Credentials struct {
// Client implements the proto.OSDClient interface. It serves as the
// concrete type with the required methods.
type Client struct {
conn *grpc.ClientConn
client proto.OSDClient
conn *grpc.ClientConn
client proto.OSDClient
initClient initproto.InitClient
}
// NewDefaultClientCredentials initializes ClientCredentials using default paths
@@ -101,6 +103,7 @@ func NewClient(port int, clientcreds *Credentials) (c *Client, err error) {
}
c.client = proto.NewOSDClient(c.conn)
c.initClient = initproto.NewInitClient(c.conn)
return c, nil
}
@@ -180,7 +183,7 @@ func (c *Client) Reset() (err error) {
// Reboot implements the proto.OSDClient interface.
func (c *Client) Reboot() (err error) {
ctx := context.Background()
_, err = c.client.Reboot(ctx, &empty.Empty{})
_, err = c.initClient.Reboot(ctx, &empty.Empty{})
if err != nil {
return
}
@@ -191,7 +194,7 @@ func (c *Client) Reboot() (err error) {
// Shutdown implements the proto.OSDClient interface.
func (c *Client) Shutdown() (err error) {
ctx := context.Background()
_, err = c.client.Shutdown(ctx, &empty.Empty{})
_, err = c.initClient.Shutdown(ctx, &empty.Empty{})
if err != nil {
return
}

View File

@@ -0,0 +1,64 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package reg
import (
"context"
"sync/atomic"
"github.com/golang/protobuf/ptypes/empty"
"github.com/talos-systems/talos/internal/app/init/proto"
"github.com/talos-systems/talos/pkg/userdata"
"google.golang.org/grpc"
)
// Registrator is the concrete type that implements the factory.Registrator and
// proto.Init interfaces.
type Registrator struct {
Data *userdata.UserData
ShutdownCh chan struct{}
RebootCh chan struct{}
rebootCalled uint32
}
// NewRegistrator builds new Registrator instance
func NewRegistrator(data *userdata.UserData) *Registrator {
return &Registrator{
Data: data,
ShutdownCh: make(chan struct{}),
RebootCh: make(chan struct{}),
}
}
// Register implements the factory.Registrator interface.
func (r *Registrator) Register(s *grpc.Server) {
proto.RegisterInitServer(s, r)
}
// Reboot implements the proto.InitServer interface.
func (r *Registrator) Reboot(ctx context.Context, in *empty.Empty) (reply *proto.RebootReply, err error) {
reply = &proto.RebootReply{}
// make sure channel is closed only once (and initiate either reboot or shutdown)
if atomic.CompareAndSwapUint32(&r.rebootCalled, 0, 1) {
close(r.RebootCh)
}
return
}
// Shutdown implements the proto.InitServer interface.
func (r *Registrator) Shutdown(ctx context.Context, in *empty.Empty) (reply *proto.ShutdownReply, err error) {
reply = &proto.ShutdownReply{}
// make sure channel is closed only once (and initiate either reboot or shutdown)
if atomic.CompareAndSwapUint32(&r.rebootCalled, 0, 1) {
close(r.ShutdownCh)
}
return
}

View File

@@ -17,6 +17,7 @@ import (
criconstants "github.com/containerd/cri/pkg/constants"
"github.com/pkg/errors"
"github.com/talos-systems/talos/internal/app/init/internal/platform"
"github.com/talos-systems/talos/internal/app/init/internal/reg"
"github.com/talos-systems/talos/internal/app/init/internal/rootfs"
"github.com/talos-systems/talos/internal/app/init/internal/rootfs/mount"
"github.com/talos-systems/talos/internal/app/init/pkg/network"
@@ -24,6 +25,7 @@ import (
ctrdrunner "github.com/talos-systems/talos/internal/app/init/pkg/system/runner/containerd"
"github.com/talos-systems/talos/internal/app/init/pkg/system/services"
"github.com/talos-systems/talos/internal/pkg/constants"
"github.com/talos-systems/talos/internal/pkg/grpc/factory"
"github.com/talos-systems/talos/pkg/userdata"
"golang.org/x/sys/unix"
@@ -192,6 +194,23 @@ func root() (err error) {
svcs := system.Services(data)
defer svcs.Shutdown()
// Instantiate internal init API
api := reg.NewRegistrator(data)
server := factory.NewServer(api)
listener, err := factory.NewListener(
factory.Network("unix"),
factory.SocketPath(constants.InitSocketPath),
)
if err != nil {
panic(err)
}
defer server.Stop()
go func() {
// nolint: errcheck
server.Serve(listener)
}()
// Start containerd.
svcs.Start(&services.Containerd{})
@@ -199,13 +218,20 @@ func root() (err error) {
go startKubernetesServices(startupErrCh, data)
select {
case <-api.ShutdownCh:
log.Printf("poweroff via API received")
// poweroff, proceed to shutdown but mark as poweroff
rebootFlag = unix.LINUX_REBOOT_CMD_POWER_OFF
case <-poweroffCh:
log.Printf("poweroff via ACPI")
// poweroff, proceed to shutdown but mark as poweroff
rebootFlag = unix.LINUX_REBOOT_CMD_POWER_OFF
case err = <-startupErrCh:
panic(err)
case <-termCh:
log.Printf("SIGTERM received, rebooting...")
case <-api.RebootCh:
log.Printf("reboot via API received, rebooting...")
}
return nil
@@ -315,21 +341,21 @@ func sync() {
unix.Sync()
}()
log.Printf("Waiting for sync...")
log.Printf("waiting for sync...")
for i := 29; i >= 0; i-- {
select {
case <-syncdone:
log.Printf("Sync done")
log.Printf("sync done")
return
case <-time.After(time.Second):
}
if i != 0 {
log.Printf("Waiting %d more seconds for sync to finish", i)
log.Printf("waiting %d more seconds for sync to finish", i)
}
}
log.Printf("Sync hasn't completed in time, aborting...")
log.Printf("sync hasn't completed in time, aborting...")
}
func reboot() {

View File

@@ -62,6 +62,7 @@ func (o *OSD) Runner(data *userdata.UserData) (runner.Runner, error) {
{Type: "bind", Destination: "/etc/kubernetes", Source: "/etc/kubernetes", Options: []string{"bind", "rw"}},
{Type: "bind", Destination: "/etc/ssl", Source: "/etc/ssl", Options: []string{"bind", "ro"}},
{Type: "bind", Destination: "/var/log", Source: "/var/log", Options: []string{"rbind", "rw"}},
{Type: "bind", Destination: "/var/lib/init", Source: "/var/lib/init", Options: []string{"rbind", "rw"}},
}
env := []string{}

View File

@@ -0,0 +1,20 @@
syntax = "proto3";
package proto;
import "google/protobuf/empty.proto";
// The Init service definition.
service Init {
rpc Reboot(google.protobuf.Empty) returns (RebootReply) {}
rpc Shutdown(google.protobuf.Empty) returns (ShutdownReply) {}
}
// The response message containing the reboot status.
message RebootReply {}
// The response message containing the shutdown status.
message ShutdownReply {}

View File

@@ -0,0 +1,44 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package reg
import (
"context"
"github.com/golang/protobuf/ptypes/empty"
"github.com/talos-systems/talos/internal/app/init/proto"
"github.com/talos-systems/talos/internal/pkg/constants"
"google.golang.org/grpc"
)
// InitServiceClient is a gRPC client for init service API
type InitServiceClient struct {
proto.InitClient
}
// NewInitServiceClient initializes new client and connects to init
func NewInitServiceClient() (*InitServiceClient, error) {
conn, err := grpc.Dial("unix:"+constants.InitSocketPath,
grpc.WithInsecure(),
)
if err != nil {
return nil, err
}
return &InitServiceClient{
InitClient: proto.NewInitClient(conn),
}, nil
}
// Reboot executes init Reboot() API
func (c *InitServiceClient) Reboot(ctx context.Context, empty *empty.Empty) (*proto.RebootReply, error) {
return c.InitClient.Reboot(ctx, empty)
}
// Shutdown executes init Shutdown() API
func (c *InitServiceClient) Shutdown(ctx context.Context, empty *empty.Empty) (*proto.ShutdownReply, error) {
return c.InitClient.Shutdown(ctx, empty)
}

View File

@@ -27,6 +27,7 @@ import (
"github.com/talos-systems/talos/internal/app/init/pkg/system/events"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
containerdrunner "github.com/talos-systems/talos/internal/app/init/pkg/system/runner/containerd"
initproto "github.com/talos-systems/talos/internal/app/init/proto"
"github.com/talos-systems/talos/internal/app/osd/proto"
filechunker "github.com/talos-systems/talos/internal/pkg/chunker/file"
"github.com/talos-systems/talos/internal/pkg/constants"
@@ -41,12 +42,16 @@ import (
// Registrator is the concrete type that implements the factory.Registrator and
// proto.OSDServer interfaces.
type Registrator struct {
// every Init service API is proxied via OSD
*InitServiceClient
Data *userdata.UserData
}
// Register implements the factory.Registrator interface.
func (r *Registrator) Register(s *grpc.Server) {
proto.RegisterOSDServer(s, r)
initproto.RegisterInitServer(s, r)
}
// Kubeconfig implements the proto.OSDServer interface. The admin kubeconfig is
@@ -240,26 +245,6 @@ func (r *Registrator) Reset(ctx context.Context, in *empty.Empty) (reply *proto.
return reply, nil
}
// Reboot implements the proto.OSDServer interface.
func (r *Registrator) Reboot(ctx context.Context, in *empty.Empty) (reply *proto.RebootReply, err error) {
reply = &proto.RebootReply{}
// nolint: errcheck
defer unix.Reboot(int(unix.LINUX_REBOOT_CMD_RESTART))
return
}
// Shutdown implements the proto.OSDServer interface.
func (r *Registrator) Shutdown(ctx context.Context, in *empty.Empty) (reply *proto.ShutdownReply, err error) {
reply = &proto.ShutdownReply{}
// nolint: errcheck
defer unix.Reboot(unix.LINUX_REBOOT_CMD_POWER_OFF)
return
}
// Dmesg implements the proto.OSDServer interface. The klogctl syscall is used
// to read from the ring buffer at /proc/kmsg by taking the
// SYSLOG_ACTION_READ_ALL action. This action reads all messages remaining in

View File

@@ -38,9 +38,17 @@ func main() {
log.Fatalf("credentials: %v", err)
}
initClient, err := reg.NewInitServiceClient()
if err != nil {
log.Fatalf("init client: %v", err)
}
log.Println("Starting osd")
err = factory.Listen(
&reg.Registrator{Data: data},
err = factory.ListenAndServe(
&reg.Registrator{
Data: data,
InitServiceClient: initClient,
},
factory.Port(constants.OsdPort),
factory.ServerOptions(
grpc.Creds(

View File

@@ -6,16 +6,16 @@ package proto;
import "google/protobuf/empty.proto";
// The OSD service definition.
//
// OSD Service also implements all the API of Init Service
service OSD {
rpc Dmesg(google.protobuf.Empty) returns (Data) {}
rpc Kubeconfig(google.protobuf.Empty) returns (Data) {}
rpc Logs(LogsRequest) returns (stream Data) {}
rpc Processes(ProcessesRequest) returns (ProcessesReply) {}
rpc Reboot(google.protobuf.Empty) returns (RebootReply) {}
rpc Reset(google.protobuf.Empty) returns (ResetReply) {}
rpc Restart(RestartRequest) returns (RestartReply) {}
rpc Routes(google.protobuf.Empty) returns (RoutesReply) {}
rpc Shutdown(google.protobuf.Empty) returns (ShutdownReply) {}
rpc Stats(StatsRequest) returns (StatsReply) {}
rpc Top(google.protobuf.Empty) returns (TopReply) {}
rpc DF(google.protobuf.Empty) returns (DFReply) {}
@@ -61,15 +61,9 @@ message RestartRequest {
// The response message containing the restart status.
message RestartReply {}
// The response message containing the shutdown status.
message ShutdownReply {}
// The response message containing the restart status.
message ResetReply {}
// The response message containing the restart status.
message RebootReply {}
// The request message containing the process name.
message LogsRequest {
string namespace = 1;

View File

@@ -46,7 +46,7 @@ func main() {
data.Services.Trustd.Password,
)
err = factory.Listen(
err = factory.ListenAndServe(
&reg.Registrator{Data: data.Security.OS},
factory.Port(constants.TrustdPort),
factory.ServerOptions(

View File

@@ -99,6 +99,9 @@ const (
// TalosConfigEnvVar is the environment variable for setting the Talos configuration file path.
TalosConfigEnvVar = "TALOSCONFIG"
// InitSocketPath is the path to file socket of init API
InitSocketPath = "/var/lib/init/init.sock"
)
// See https://linux.die.net/man/3/klogctl

View File

@@ -7,6 +7,8 @@ package factory
import (
"crypto/tls"
"net"
"os"
"path/filepath"
"strconv"
"github.com/pkg/errors"
@@ -22,6 +24,7 @@ type Registrator interface {
// Options is the functional options struct.
type Options struct {
Port int
SocketPath string
Network string
Config *tls.Config
ServerOptions []grpc.ServerOption
@@ -37,6 +40,13 @@ func Port(o int) Option {
}
}
// SocketPath sets the listen unix file socket path of the server.
func SocketPath(o string) Option {
return func(args *Options) {
args.SocketPath = o
}
}
// Network sets the network type of the listener.
func Network(o string) Option {
return func(args *Options) {
@@ -61,7 +71,8 @@ func ServerOptions(o ...grpc.ServerOption) Option {
// NewDefaultOptions initializes the Options struct with default values.
func NewDefaultOptions(setters ...Option) *Options {
opts := &Options{
Network: "tcp",
Network: "tcp",
SocketPath: "/run/factory/factory.sock",
}
for _, setter := range setters {
@@ -71,38 +82,54 @@ func NewDefaultOptions(setters ...Option) *Options {
return opts
}
// Listen configures TLS for mutual authtentication by loading the CA into a
// CertPool and configuring the server's policy for TLS Client Authentication.
// Once TLS is configured, the gRPC options are built to make use of the TLS
// configuration and the receiver (Server) is registered to the gRPC server.
// Finally the gRPC server is started.
func Listen(r Registrator, setters ...Option) (err error) {
// NewServer builds grpc server and binds it to the Registrator
func NewServer(r Registrator, setters ...Option) *grpc.Server {
opts := NewDefaultOptions(setters...)
if opts.Network == "tcp" && opts.Port == 0 {
return errors.Errorf("a port is required for TCP listener")
}
server := grpc.NewServer(opts.ServerOptions...)
r.Register(server)
return server
}
// NewListener builds listener for grpc server
func NewListener(setters ...Option) (net.Listener, error) {
opts := NewDefaultOptions(setters...)
if opts.Network == "tcp" && opts.Port == 0 {
return nil, errors.New("a port is required for TCP listener")
}
var address string
switch opts.Network {
case "unix":
address = "/run/factory/factory.sock"
address = opts.SocketPath
// make any dirs on the path to the listening socket
if err := os.MkdirAll(filepath.Dir(address), 0700); err != nil {
return nil, errors.Wrap(err, "error creating containing directory for the file socket")
}
case "tcp":
address = ":" + strconv.Itoa(opts.Port)
default:
return errors.Errorf("unknown network: %s", opts.Network)
}
listener, err := net.Listen(opts.Network, address)
if err != nil {
return
}
err = server.Serve(listener)
if err != nil {
return
return nil, errors.Errorf("unknown network: %s", opts.Network)
}
return nil
return net.Listen(opts.Network, address)
}
// ListenAndServe configures TLS for mutual authtentication by loading the CA into a
// CertPool and configuring the server's policy for TLS Client Authentication.
// Once TLS is configured, the gRPC options are built to make use of the TLS
// configuration and the receiver (Server) is registered to the gRPC server.
// Finally the gRPC server is started.
func ListenAndServe(r Registrator, setters ...Option) (err error) {
server := NewServer(r, setters...)
listener, err := NewListener(setters...)
if err != nil {
return err
}
return server.Serve(listener)
}