diff --git a/Dockerfile b/Dockerfile index 9a97bfcc4..f7a57b6bc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,10 +13,14 @@ RUN protoc -I/usr/local/include -I./proto --go_out=plugins=grpc:proto proto/api. WORKDIR /trustd COPY ./internal/app/trustd/proto ./proto RUN protoc -I/usr/local/include -I./proto --go_out=plugins=grpc:proto proto/api.proto +WORKDIR /init +COPY ./internal/app/init/proto ./proto +RUN protoc -I/usr/local/include -I./proto --go_out=plugins=grpc:proto proto/api.proto FROM scratch AS proto COPY --from=proto-build /osd/proto/api.pb.go /internal/app/osd/proto/ COPY --from=proto-build /trustd/proto/api.pb.go /internal/app/trustd/proto/ +COPY --from=proto-build /init/proto/api.pb.go /internal/app/init/proto/ # The base provides a common image to build the Talos source code. diff --git a/cmd/osctl/pkg/client/client.go b/cmd/osctl/pkg/client/client.go index 381ae4023..240971bb5 100644 --- a/cmd/osctl/pkg/client/client.go +++ b/cmd/osctl/pkg/client/client.go @@ -19,6 +19,7 @@ import ( "github.com/golang/protobuf/ptypes/empty" "github.com/talos-systems/talos/cmd/osctl/pkg/client/config" + initproto "github.com/talos-systems/talos/internal/app/init/proto" "github.com/talos-systems/talos/internal/app/osd/proto" "github.com/talos-systems/talos/internal/pkg/proc" "google.golang.org/grpc" @@ -37,8 +38,9 @@ type Credentials struct { // Client implements the proto.OSDClient interface. It serves as the // concrete type with the required methods. type Client struct { - conn *grpc.ClientConn - client proto.OSDClient + conn *grpc.ClientConn + client proto.OSDClient + initClient initproto.InitClient } // NewDefaultClientCredentials initializes ClientCredentials using default paths @@ -101,6 +103,7 @@ func NewClient(port int, clientcreds *Credentials) (c *Client, err error) { } c.client = proto.NewOSDClient(c.conn) + c.initClient = initproto.NewInitClient(c.conn) return c, nil } @@ -180,7 +183,7 @@ func (c *Client) Reset() (err error) { // Reboot implements the proto.OSDClient interface. func (c *Client) Reboot() (err error) { ctx := context.Background() - _, err = c.client.Reboot(ctx, &empty.Empty{}) + _, err = c.initClient.Reboot(ctx, &empty.Empty{}) if err != nil { return } @@ -191,7 +194,7 @@ func (c *Client) Reboot() (err error) { // Shutdown implements the proto.OSDClient interface. func (c *Client) Shutdown() (err error) { ctx := context.Background() - _, err = c.client.Shutdown(ctx, &empty.Empty{}) + _, err = c.initClient.Shutdown(ctx, &empty.Empty{}) if err != nil { return } diff --git a/internal/app/init/internal/reg/reg.go b/internal/app/init/internal/reg/reg.go new file mode 100644 index 000000000..c5fb9e02e --- /dev/null +++ b/internal/app/init/internal/reg/reg.go @@ -0,0 +1,64 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +package reg + +import ( + "context" + "sync/atomic" + + "github.com/golang/protobuf/ptypes/empty" + "github.com/talos-systems/talos/internal/app/init/proto" + "github.com/talos-systems/talos/pkg/userdata" + "google.golang.org/grpc" +) + +// Registrator is the concrete type that implements the factory.Registrator and +// proto.Init interfaces. +type Registrator struct { + Data *userdata.UserData + + ShutdownCh chan struct{} + RebootCh chan struct{} + + rebootCalled uint32 +} + +// NewRegistrator builds new Registrator instance +func NewRegistrator(data *userdata.UserData) *Registrator { + return &Registrator{ + Data: data, + ShutdownCh: make(chan struct{}), + RebootCh: make(chan struct{}), + } +} + +// Register implements the factory.Registrator interface. +func (r *Registrator) Register(s *grpc.Server) { + proto.RegisterInitServer(s, r) +} + +// Reboot implements the proto.InitServer interface. +func (r *Registrator) Reboot(ctx context.Context, in *empty.Empty) (reply *proto.RebootReply, err error) { + reply = &proto.RebootReply{} + + // make sure channel is closed only once (and initiate either reboot or shutdown) + if atomic.CompareAndSwapUint32(&r.rebootCalled, 0, 1) { + close(r.RebootCh) + } + + return +} + +// Shutdown implements the proto.InitServer interface. +func (r *Registrator) Shutdown(ctx context.Context, in *empty.Empty) (reply *proto.ShutdownReply, err error) { + reply = &proto.ShutdownReply{} + + // make sure channel is closed only once (and initiate either reboot or shutdown) + if atomic.CompareAndSwapUint32(&r.rebootCalled, 0, 1) { + close(r.ShutdownCh) + } + + return +} diff --git a/internal/app/init/main.go b/internal/app/init/main.go index 55fecc500..e4ec9a490 100644 --- a/internal/app/init/main.go +++ b/internal/app/init/main.go @@ -17,6 +17,7 @@ import ( criconstants "github.com/containerd/cri/pkg/constants" "github.com/pkg/errors" "github.com/talos-systems/talos/internal/app/init/internal/platform" + "github.com/talos-systems/talos/internal/app/init/internal/reg" "github.com/talos-systems/talos/internal/app/init/internal/rootfs" "github.com/talos-systems/talos/internal/app/init/internal/rootfs/mount" "github.com/talos-systems/talos/internal/app/init/pkg/network" @@ -24,6 +25,7 @@ import ( ctrdrunner "github.com/talos-systems/talos/internal/app/init/pkg/system/runner/containerd" "github.com/talos-systems/talos/internal/app/init/pkg/system/services" "github.com/talos-systems/talos/internal/pkg/constants" + "github.com/talos-systems/talos/internal/pkg/grpc/factory" "github.com/talos-systems/talos/pkg/userdata" "golang.org/x/sys/unix" @@ -192,6 +194,23 @@ func root() (err error) { svcs := system.Services(data) defer svcs.Shutdown() + // Instantiate internal init API + api := reg.NewRegistrator(data) + server := factory.NewServer(api) + listener, err := factory.NewListener( + factory.Network("unix"), + factory.SocketPath(constants.InitSocketPath), + ) + if err != nil { + panic(err) + } + defer server.Stop() + + go func() { + // nolint: errcheck + server.Serve(listener) + }() + // Start containerd. svcs.Start(&services.Containerd{}) @@ -199,13 +218,20 @@ func root() (err error) { go startKubernetesServices(startupErrCh, data) select { + case <-api.ShutdownCh: + log.Printf("poweroff via API received") + // poweroff, proceed to shutdown but mark as poweroff + rebootFlag = unix.LINUX_REBOOT_CMD_POWER_OFF case <-poweroffCh: + log.Printf("poweroff via ACPI") // poweroff, proceed to shutdown but mark as poweroff rebootFlag = unix.LINUX_REBOOT_CMD_POWER_OFF case err = <-startupErrCh: panic(err) case <-termCh: log.Printf("SIGTERM received, rebooting...") + case <-api.RebootCh: + log.Printf("reboot via API received, rebooting...") } return nil @@ -315,21 +341,21 @@ func sync() { unix.Sync() }() - log.Printf("Waiting for sync...") + log.Printf("waiting for sync...") for i := 29; i >= 0; i-- { select { case <-syncdone: - log.Printf("Sync done") + log.Printf("sync done") return case <-time.After(time.Second): } if i != 0 { - log.Printf("Waiting %d more seconds for sync to finish", i) + log.Printf("waiting %d more seconds for sync to finish", i) } } - log.Printf("Sync hasn't completed in time, aborting...") + log.Printf("sync hasn't completed in time, aborting...") } func reboot() { diff --git a/internal/app/init/pkg/system/services/osd.go b/internal/app/init/pkg/system/services/osd.go index 7679b68ff..7615def98 100644 --- a/internal/app/init/pkg/system/services/osd.go +++ b/internal/app/init/pkg/system/services/osd.go @@ -62,6 +62,7 @@ func (o *OSD) Runner(data *userdata.UserData) (runner.Runner, error) { {Type: "bind", Destination: "/etc/kubernetes", Source: "/etc/kubernetes", Options: []string{"bind", "rw"}}, {Type: "bind", Destination: "/etc/ssl", Source: "/etc/ssl", Options: []string{"bind", "ro"}}, {Type: "bind", Destination: "/var/log", Source: "/var/log", Options: []string{"rbind", "rw"}}, + {Type: "bind", Destination: "/var/lib/init", Source: "/var/lib/init", Options: []string{"rbind", "rw"}}, } env := []string{} diff --git a/internal/app/init/proto/api.proto b/internal/app/init/proto/api.proto new file mode 100644 index 000000000..651bd6449 --- /dev/null +++ b/internal/app/init/proto/api.proto @@ -0,0 +1,20 @@ + +syntax = "proto3"; + +package proto; + +import "google/protobuf/empty.proto"; + +// The Init service definition. +service Init { + rpc Reboot(google.protobuf.Empty) returns (RebootReply) {} + rpc Shutdown(google.protobuf.Empty) returns (ShutdownReply) {} +} + + +// The response message containing the reboot status. +message RebootReply {} + +// The response message containing the shutdown status. +message ShutdownReply {} + diff --git a/internal/app/osd/internal/reg/init_client.go b/internal/app/osd/internal/reg/init_client.go new file mode 100644 index 000000000..97c72a9c7 --- /dev/null +++ b/internal/app/osd/internal/reg/init_client.go @@ -0,0 +1,44 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +package reg + +import ( + "context" + + "github.com/golang/protobuf/ptypes/empty" + "github.com/talos-systems/talos/internal/app/init/proto" + "github.com/talos-systems/talos/internal/pkg/constants" + "google.golang.org/grpc" +) + +// InitServiceClient is a gRPC client for init service API +type InitServiceClient struct { + proto.InitClient +} + +// NewInitServiceClient initializes new client and connects to init +func NewInitServiceClient() (*InitServiceClient, error) { + conn, err := grpc.Dial("unix:"+constants.InitSocketPath, + grpc.WithInsecure(), + ) + + if err != nil { + return nil, err + } + + return &InitServiceClient{ + InitClient: proto.NewInitClient(conn), + }, nil +} + +// Reboot executes init Reboot() API +func (c *InitServiceClient) Reboot(ctx context.Context, empty *empty.Empty) (*proto.RebootReply, error) { + return c.InitClient.Reboot(ctx, empty) +} + +// Shutdown executes init Shutdown() API +func (c *InitServiceClient) Shutdown(ctx context.Context, empty *empty.Empty) (*proto.ShutdownReply, error) { + return c.InitClient.Shutdown(ctx, empty) +} diff --git a/internal/app/osd/internal/reg/reg.go b/internal/app/osd/internal/reg/reg.go index 286a3b60c..39b38708a 100644 --- a/internal/app/osd/internal/reg/reg.go +++ b/internal/app/osd/internal/reg/reg.go @@ -27,6 +27,7 @@ import ( "github.com/talos-systems/talos/internal/app/init/pkg/system/events" "github.com/talos-systems/talos/internal/app/init/pkg/system/runner" containerdrunner "github.com/talos-systems/talos/internal/app/init/pkg/system/runner/containerd" + initproto "github.com/talos-systems/talos/internal/app/init/proto" "github.com/talos-systems/talos/internal/app/osd/proto" filechunker "github.com/talos-systems/talos/internal/pkg/chunker/file" "github.com/talos-systems/talos/internal/pkg/constants" @@ -41,12 +42,16 @@ import ( // Registrator is the concrete type that implements the factory.Registrator and // proto.OSDServer interfaces. type Registrator struct { + // every Init service API is proxied via OSD + *InitServiceClient + Data *userdata.UserData } // Register implements the factory.Registrator interface. func (r *Registrator) Register(s *grpc.Server) { proto.RegisterOSDServer(s, r) + initproto.RegisterInitServer(s, r) } // Kubeconfig implements the proto.OSDServer interface. The admin kubeconfig is @@ -240,26 +245,6 @@ func (r *Registrator) Reset(ctx context.Context, in *empty.Empty) (reply *proto. return reply, nil } -// Reboot implements the proto.OSDServer interface. -func (r *Registrator) Reboot(ctx context.Context, in *empty.Empty) (reply *proto.RebootReply, err error) { - reply = &proto.RebootReply{} - - // nolint: errcheck - defer unix.Reboot(int(unix.LINUX_REBOOT_CMD_RESTART)) - - return -} - -// Shutdown implements the proto.OSDServer interface. -func (r *Registrator) Shutdown(ctx context.Context, in *empty.Empty) (reply *proto.ShutdownReply, err error) { - reply = &proto.ShutdownReply{} - - // nolint: errcheck - defer unix.Reboot(unix.LINUX_REBOOT_CMD_POWER_OFF) - - return -} - // Dmesg implements the proto.OSDServer interface. The klogctl syscall is used // to read from the ring buffer at /proc/kmsg by taking the // SYSLOG_ACTION_READ_ALL action. This action reads all messages remaining in diff --git a/internal/app/osd/main.go b/internal/app/osd/main.go index 312965d04..bc72f8f30 100644 --- a/internal/app/osd/main.go +++ b/internal/app/osd/main.go @@ -38,9 +38,17 @@ func main() { log.Fatalf("credentials: %v", err) } + initClient, err := reg.NewInitServiceClient() + if err != nil { + log.Fatalf("init client: %v", err) + } + log.Println("Starting osd") - err = factory.Listen( - ®.Registrator{Data: data}, + err = factory.ListenAndServe( + ®.Registrator{ + Data: data, + InitServiceClient: initClient, + }, factory.Port(constants.OsdPort), factory.ServerOptions( grpc.Creds( diff --git a/internal/app/osd/proto/api.proto b/internal/app/osd/proto/api.proto index ab52baa1c..7948ac13c 100644 --- a/internal/app/osd/proto/api.proto +++ b/internal/app/osd/proto/api.proto @@ -6,16 +6,16 @@ package proto; import "google/protobuf/empty.proto"; // The OSD service definition. +// +// OSD Service also implements all the API of Init Service service OSD { rpc Dmesg(google.protobuf.Empty) returns (Data) {} rpc Kubeconfig(google.protobuf.Empty) returns (Data) {} rpc Logs(LogsRequest) returns (stream Data) {} rpc Processes(ProcessesRequest) returns (ProcessesReply) {} - rpc Reboot(google.protobuf.Empty) returns (RebootReply) {} rpc Reset(google.protobuf.Empty) returns (ResetReply) {} rpc Restart(RestartRequest) returns (RestartReply) {} rpc Routes(google.protobuf.Empty) returns (RoutesReply) {} - rpc Shutdown(google.protobuf.Empty) returns (ShutdownReply) {} rpc Stats(StatsRequest) returns (StatsReply) {} rpc Top(google.protobuf.Empty) returns (TopReply) {} rpc DF(google.protobuf.Empty) returns (DFReply) {} @@ -61,15 +61,9 @@ message RestartRequest { // The response message containing the restart status. message RestartReply {} -// The response message containing the shutdown status. -message ShutdownReply {} - // The response message containing the restart status. message ResetReply {} -// The response message containing the restart status. -message RebootReply {} - // The request message containing the process name. message LogsRequest { string namespace = 1; diff --git a/internal/app/trustd/main.go b/internal/app/trustd/main.go index 5d2c1f297..b963c235b 100644 --- a/internal/app/trustd/main.go +++ b/internal/app/trustd/main.go @@ -46,7 +46,7 @@ func main() { data.Services.Trustd.Password, ) - err = factory.Listen( + err = factory.ListenAndServe( ®.Registrator{Data: data.Security.OS}, factory.Port(constants.TrustdPort), factory.ServerOptions( diff --git a/internal/pkg/constants/constants.go b/internal/pkg/constants/constants.go index 7c391fc98..6b6853cda 100644 --- a/internal/pkg/constants/constants.go +++ b/internal/pkg/constants/constants.go @@ -99,6 +99,9 @@ const ( // TalosConfigEnvVar is the environment variable for setting the Talos configuration file path. TalosConfigEnvVar = "TALOSCONFIG" + + // InitSocketPath is the path to file socket of init API + InitSocketPath = "/var/lib/init/init.sock" ) // See https://linux.die.net/man/3/klogctl diff --git a/internal/pkg/grpc/factory/factory.go b/internal/pkg/grpc/factory/factory.go index 84d4ec2f5..4fc0e2c09 100644 --- a/internal/pkg/grpc/factory/factory.go +++ b/internal/pkg/grpc/factory/factory.go @@ -7,6 +7,8 @@ package factory import ( "crypto/tls" "net" + "os" + "path/filepath" "strconv" "github.com/pkg/errors" @@ -22,6 +24,7 @@ type Registrator interface { // Options is the functional options struct. type Options struct { Port int + SocketPath string Network string Config *tls.Config ServerOptions []grpc.ServerOption @@ -37,6 +40,13 @@ func Port(o int) Option { } } +// SocketPath sets the listen unix file socket path of the server. +func SocketPath(o string) Option { + return func(args *Options) { + args.SocketPath = o + } +} + // Network sets the network type of the listener. func Network(o string) Option { return func(args *Options) { @@ -61,7 +71,8 @@ func ServerOptions(o ...grpc.ServerOption) Option { // NewDefaultOptions initializes the Options struct with default values. func NewDefaultOptions(setters ...Option) *Options { opts := &Options{ - Network: "tcp", + Network: "tcp", + SocketPath: "/run/factory/factory.sock", } for _, setter := range setters { @@ -71,38 +82,54 @@ func NewDefaultOptions(setters ...Option) *Options { return opts } -// Listen configures TLS for mutual authtentication by loading the CA into a -// CertPool and configuring the server's policy for TLS Client Authentication. -// Once TLS is configured, the gRPC options are built to make use of the TLS -// configuration and the receiver (Server) is registered to the gRPC server. -// Finally the gRPC server is started. -func Listen(r Registrator, setters ...Option) (err error) { +// NewServer builds grpc server and binds it to the Registrator +func NewServer(r Registrator, setters ...Option) *grpc.Server { opts := NewDefaultOptions(setters...) - if opts.Network == "tcp" && opts.Port == 0 { - return errors.Errorf("a port is required for TCP listener") - } - server := grpc.NewServer(opts.ServerOptions...) r.Register(server) + return server +} + +// NewListener builds listener for grpc server +func NewListener(setters ...Option) (net.Listener, error) { + opts := NewDefaultOptions(setters...) + + if opts.Network == "tcp" && opts.Port == 0 { + return nil, errors.New("a port is required for TCP listener") + } + var address string switch opts.Network { case "unix": - address = "/run/factory/factory.sock" + address = opts.SocketPath + + // make any dirs on the path to the listening socket + if err := os.MkdirAll(filepath.Dir(address), 0700); err != nil { + return nil, errors.Wrap(err, "error creating containing directory for the file socket") + } case "tcp": address = ":" + strconv.Itoa(opts.Port) default: - return errors.Errorf("unknown network: %s", opts.Network) - } - listener, err := net.Listen(opts.Network, address) - if err != nil { - return - } - err = server.Serve(listener) - if err != nil { - return + return nil, errors.Errorf("unknown network: %s", opts.Network) } - return nil + return net.Listen(opts.Network, address) +} + +// ListenAndServe configures TLS for mutual authtentication by loading the CA into a +// CertPool and configuring the server's policy for TLS Client Authentication. +// Once TLS is configured, the gRPC options are built to make use of the TLS +// configuration and the receiver (Server) is registered to the gRPC server. +// Finally the gRPC server is started. +func ListenAndServe(r Registrator, setters ...Option) (err error) { + server := NewServer(r, setters...) + listener, err := NewListener(setters...) + + if err != nil { + return err + } + + return server.Serve(listener) }