|
|
|
|
@@ -5,16 +5,17 @@ import (
|
|
|
|
|
"context"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"fmt"
|
|
|
|
|
"io"
|
|
|
|
|
"log/slog"
|
|
|
|
|
"os"
|
|
|
|
|
"path/filepath"
|
|
|
|
|
"strings"
|
|
|
|
|
"sync"
|
|
|
|
|
"syscall"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"cuelang.org/go/cue"
|
|
|
|
|
core "github.com/holos-run/holos/api/core/v1alpha5"
|
|
|
|
|
"github.com/holos-run/holos/internal/artifact"
|
|
|
|
|
"github.com/holos-run/holos/internal/errors"
|
|
|
|
|
"github.com/holos-run/holos/internal/holos"
|
|
|
|
|
"github.com/holos-run/holos/internal/logger"
|
|
|
|
|
@@ -107,167 +108,84 @@ func (c *Component) Path() string {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var _ holos.BuildPlan = &BuildPlan{}
|
|
|
|
|
var _ task = generatorTask{}
|
|
|
|
|
var _ task = transformersTask{}
|
|
|
|
|
var _ task = validatorTask{}
|
|
|
|
|
|
|
|
|
|
// BuildPlan represents a component builder.
|
|
|
|
|
type BuildPlan struct {
|
|
|
|
|
core.BuildPlan
|
|
|
|
|
Opts holos.BuildOpts
|
|
|
|
|
// task is a unit of work executed by a worker goroutine. Implementations
// describe themselves via id for log and error messages and perform the
// work in run.
type task interface {
	// id returns an identifier for log and error messages.
	id() string
	// run executes the task, honoring cancellation via ctx.
	run(ctx context.Context) error
}
|
|
|
|
|
|
|
|
|
|
// Build builds a BuildPlan into Artifact files.
|
|
|
|
|
func (b *BuildPlan) Build(ctx context.Context) error {
|
|
|
|
|
name := b.BuildPlan.Metadata.Name
|
|
|
|
|
path := b.Opts.Path
|
|
|
|
|
log := logger.FromContext(ctx).With("name", name, "path", path)
|
|
|
|
|
msg := fmt.Sprintf("could not build %s", name)
|
|
|
|
|
if b.BuildPlan.Spec.Disabled {
|
|
|
|
|
log.WarnContext(ctx, fmt.Sprintf("%s: disabled", msg))
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
// taskParams carries the context common to every task: which build plan the
// task belongs to, the task's name within it, and the shared build options.
type taskParams struct {
	// taskName identifies the task within the build plan,
	// e.g. "artifact/0/generator/1".
	taskName string
	// buildPlanName is the metadata name of the enclosing build plan.
	buildPlanName string
	// opts holds shared build options (store, path, write destination,
	// concurrency, stderr).
	opts holos.BuildOpts
}
|
|
|
|
|
|
|
|
|
|
g, ctx := errgroup.WithContext(ctx)
|
|
|
|
|
// One more for the producer
|
|
|
|
|
g.SetLimit(b.Opts.Concurrency + 1)
|
|
|
|
|
// id returns a task identifier in the form path:buildPlanName/taskName, used
// to correlate log and error messages.
func (t taskParams) id() string {
	return fmt.Sprintf("%s:%s/%s", t.opts.Path, t.buildPlanName, t.taskName)
}
|
|
|
|
|
|
|
|
|
|
// Producer.
|
|
|
|
|
g.Go(func() error {
|
|
|
|
|
for _, a := range b.BuildPlan.Spec.Artifacts {
|
|
|
|
|
msg := fmt.Sprintf("%s artifact %s", msg, a.Artifact)
|
|
|
|
|
log := log.With("artifact", a.Artifact)
|
|
|
|
|
if a.Skip {
|
|
|
|
|
log.WarnContext(ctx, fmt.Sprintf("%s: skipped field is true", msg))
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return ctx.Err()
|
|
|
|
|
default:
|
|
|
|
|
// https://golang.org/doc/faq#closures_and_goroutines
|
|
|
|
|
a := a
|
|
|
|
|
// Worker. Blocks if limit has been reached.
|
|
|
|
|
g.Go(func() error {
|
|
|
|
|
for _, gen := range a.Generators {
|
|
|
|
|
switch gen.Kind {
|
|
|
|
|
case "Resources":
|
|
|
|
|
if err := b.resources(log, gen, b.Opts.Store); err != nil {
|
|
|
|
|
return errors.Format("could not generate resources: %w", err)
|
|
|
|
|
}
|
|
|
|
|
case "Helm":
|
|
|
|
|
if err := b.helm(ctx, log, gen, b.Opts.Store); err != nil {
|
|
|
|
|
return errors.Format("could not generate helm: %w", err)
|
|
|
|
|
}
|
|
|
|
|
case "File":
|
|
|
|
|
if err := b.file(log, gen, b.Opts.Store); err != nil {
|
|
|
|
|
return errors.Format("could not generate file: %w", err)
|
|
|
|
|
}
|
|
|
|
|
default:
|
|
|
|
|
return errors.Format("%s: unsupported kind %s", msg, gen.Kind)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// generatorTask runs a single generator for one artifact. The wait group is
// shared with the buildArtifact call that produced the task, which blocks
// until every generator for the artifact has finished.
type generatorTask struct {
	taskParams
	// generator is the generator to execute.
	generator core.Generator
	// wg is decremented when run returns; owned by buildArtifact.
	wg *sync.WaitGroup
}
|
|
|
|
|
|
|
|
|
|
for _, t := range a.Transformers {
|
|
|
|
|
switch t.Kind {
|
|
|
|
|
case "Kustomize":
|
|
|
|
|
if err := b.kustomize(ctx, log, t, b.Opts.Store); err != nil {
|
|
|
|
|
return errors.Wrap(err)
|
|
|
|
|
}
|
|
|
|
|
case "Join":
|
|
|
|
|
s := make([][]byte, 0, len(t.Inputs))
|
|
|
|
|
for _, input := range t.Inputs {
|
|
|
|
|
if data, ok := b.Opts.Store.Get(string(input)); ok {
|
|
|
|
|
s = append(s, data)
|
|
|
|
|
} else {
|
|
|
|
|
return errors.Format("%s: missing %s", msg, input)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
data := bytes.Join(s, []byte(t.Join.Separator))
|
|
|
|
|
if err := b.Opts.Store.Set(string(t.Output), data); err != nil {
|
|
|
|
|
return errors.Format("%s: %w", msg, err)
|
|
|
|
|
}
|
|
|
|
|
log.Debug("set artifact: " + string(t.Output))
|
|
|
|
|
default:
|
|
|
|
|
return errors.Format("%s: unsupported kind %s", msg, t.Kind)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, validator := range a.Validators {
|
|
|
|
|
switch validator.Kind {
|
|
|
|
|
case "Command":
|
|
|
|
|
if err := b.validate(ctx, log, validator, b.Opts.Store); err != nil {
|
|
|
|
|
return errors.Wrap(err)
|
|
|
|
|
}
|
|
|
|
|
default:
|
|
|
|
|
return errors.Format("%s: unsupported kind %s", msg, validator.Kind)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Write the final artifact
|
|
|
|
|
if err := b.Opts.Store.Save(b.Opts.WriteTo, string(a.Artifact)); err != nil {
|
|
|
|
|
return errors.Format("%s: %w", msg, err)
|
|
|
|
|
}
|
|
|
|
|
log.DebugContext(ctx, "wrote "+filepath.Join(b.Opts.WriteTo, string(a.Artifact)))
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
func (t generatorTask) run(ctx context.Context) error {
|
|
|
|
|
defer t.wg.Done()
|
|
|
|
|
msg := fmt.Sprintf("could not build %s", t.id())
|
|
|
|
|
switch t.generator.Kind {
|
|
|
|
|
case "Resources":
|
|
|
|
|
if err := t.resources(); err != nil {
|
|
|
|
|
return errors.Format("%s: could not generate resources: %w", msg, err)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
// Wait for completion and return the first error (if any)
|
|
|
|
|
return g.Wait()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (b *BuildPlan) Export(idx int, encoder holos.OrderedEncoder) error {
|
|
|
|
|
if err := encoder.Encode(idx, &b.BuildPlan); err != nil {
|
|
|
|
|
return errors.Wrap(err)
|
|
|
|
|
case "Helm":
|
|
|
|
|
if err := t.helm(ctx); err != nil {
|
|
|
|
|
return errors.Format("%s: could not generate helm: %w", msg, err)
|
|
|
|
|
}
|
|
|
|
|
case "File":
|
|
|
|
|
if err := t.file(); err != nil {
|
|
|
|
|
return errors.Format("%s: could not generate file: %w", msg, err)
|
|
|
|
|
}
|
|
|
|
|
default:
|
|
|
|
|
return errors.Format("%s: unsupported kind %s", msg, t.generator.Kind)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (b *BuildPlan) Load(v cue.Value) error {
|
|
|
|
|
return errors.Wrap(v.Decode(&b.BuildPlan))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (b *BuildPlan) file(
|
|
|
|
|
log *slog.Logger,
|
|
|
|
|
g core.Generator,
|
|
|
|
|
store artifact.Store,
|
|
|
|
|
) error {
|
|
|
|
|
data, err := os.ReadFile(filepath.Join(string(b.Opts.Path), string(g.File.Source)))
|
|
|
|
|
func (t generatorTask) file() error {
|
|
|
|
|
data, err := os.ReadFile(filepath.Join(string(t.opts.Path), string(t.generator.File.Source)))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err)
|
|
|
|
|
}
|
|
|
|
|
if err := store.Set(string(g.Output), data); err != nil {
|
|
|
|
|
if err := t.opts.Store.Set(string(t.generator.Output), data); err != nil {
|
|
|
|
|
return errors.Wrap(err)
|
|
|
|
|
}
|
|
|
|
|
log.Debug("set artifact: " + string(g.Output))
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (b *BuildPlan) helm(
|
|
|
|
|
ctx context.Context,
|
|
|
|
|
log *slog.Logger,
|
|
|
|
|
g core.Generator,
|
|
|
|
|
store artifact.Store,
|
|
|
|
|
) error {
|
|
|
|
|
chartName := g.Helm.Chart.Name
|
|
|
|
|
log = log.With("chart", chartName)
|
|
|
|
|
func (t generatorTask) helm(ctx context.Context) error {
|
|
|
|
|
chartName := t.generator.Helm.Chart.Name
|
|
|
|
|
// Unnecessary? cargo cult copied from internal/cli/render/render.go
|
|
|
|
|
if chartName == "" {
|
|
|
|
|
return errors.New("missing chart name")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Cache the chart by version to pull new versions. (#273)
|
|
|
|
|
cacheDir := filepath.Join(string(b.Opts.Path), "vendor", g.Helm.Chart.Version)
|
|
|
|
|
cacheDir := filepath.Join(string(t.opts.Path), "vendor", t.generator.Helm.Chart.Version)
|
|
|
|
|
cachePath := filepath.Join(cacheDir, filepath.Base(chartName))
|
|
|
|
|
|
|
|
|
|
log := logger.FromContext(ctx)
|
|
|
|
|
|
|
|
|
|
if _, err := os.Stat(cachePath); os.IsNotExist(err) {
|
|
|
|
|
timeout, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
|
|
|
|
defer cancel()
|
|
|
|
|
err := onceWithLock(log, timeout, cachePath, func() error {
|
|
|
|
|
return b.cacheChart(ctx, log, cacheDir, g.Helm.Chart)
|
|
|
|
|
})
|
|
|
|
|
err := func() error {
|
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
|
|
|
|
defer cancel()
|
|
|
|
|
return onceWithLock(log, ctx, cachePath, func() error {
|
|
|
|
|
return cacheChart(ctx, cacheDir, t.generator.Helm.Chart, t.opts.Stderr)
|
|
|
|
|
})
|
|
|
|
|
}()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Format("could not cache chart: %w", err)
|
|
|
|
|
}
|
|
|
|
|
@@ -280,7 +198,7 @@ func (b *BuildPlan) helm(
|
|
|
|
|
}
|
|
|
|
|
defer util.Remove(ctx, tempDir)
|
|
|
|
|
|
|
|
|
|
data, err := yaml.Marshal(g.Helm.Values)
|
|
|
|
|
data, err := yaml.Marshal(t.generator.Helm.Values)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Format("could not marshal values: %w", err)
|
|
|
|
|
}
|
|
|
|
|
@@ -293,22 +211,22 @@ func (b *BuildPlan) helm(
|
|
|
|
|
|
|
|
|
|
// Run charts
|
|
|
|
|
args := []string{"template"}
|
|
|
|
|
if !g.Helm.EnableHooks {
|
|
|
|
|
if !t.generator.Helm.EnableHooks {
|
|
|
|
|
args = append(args, "--no-hooks")
|
|
|
|
|
}
|
|
|
|
|
for _, apiVersion := range g.Helm.APIVersions {
|
|
|
|
|
for _, apiVersion := range t.generator.Helm.APIVersions {
|
|
|
|
|
args = append(args, "--api-versions", apiVersion)
|
|
|
|
|
}
|
|
|
|
|
if kubeVersion := g.Helm.KubeVersion; kubeVersion != "" {
|
|
|
|
|
if kubeVersion := t.generator.Helm.KubeVersion; kubeVersion != "" {
|
|
|
|
|
args = append(args, "--kube-version", kubeVersion)
|
|
|
|
|
}
|
|
|
|
|
args = append(args,
|
|
|
|
|
"--include-crds",
|
|
|
|
|
"--values", valuesPath,
|
|
|
|
|
"--namespace", g.Helm.Namespace,
|
|
|
|
|
"--namespace", t.generator.Helm.Namespace,
|
|
|
|
|
"--kubeconfig", "/dev/null",
|
|
|
|
|
"--version", g.Helm.Chart.Version,
|
|
|
|
|
g.Helm.Chart.Release,
|
|
|
|
|
"--version", t.generator.Helm.Chart.Version,
|
|
|
|
|
t.generator.Helm.Chart.Release,
|
|
|
|
|
cachePath,
|
|
|
|
|
)
|
|
|
|
|
helmOut, err := util.RunCmd(ctx, "helm", args...)
|
|
|
|
|
@@ -325,36 +243,31 @@ func (b *BuildPlan) helm(
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Set the artifact
|
|
|
|
|
if err := store.Set(string(g.Output), helmOut.Stdout.Bytes()); err != nil {
|
|
|
|
|
if err := t.opts.Store.Set(string(t.generator.Output), helmOut.Stdout.Bytes()); err != nil {
|
|
|
|
|
return errors.Format("could not store helm output: %w", err)
|
|
|
|
|
}
|
|
|
|
|
log.Debug("set artifact: " + string(g.Output))
|
|
|
|
|
log.Debug("set artifact: " + string(t.generator.Output))
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (b *BuildPlan) resources(
|
|
|
|
|
log *slog.Logger,
|
|
|
|
|
g core.Generator,
|
|
|
|
|
store artifact.Store,
|
|
|
|
|
) error {
|
|
|
|
|
func (t generatorTask) resources() error {
|
|
|
|
|
var size int
|
|
|
|
|
for _, m := range g.Resources {
|
|
|
|
|
for _, m := range t.generator.Resources {
|
|
|
|
|
size += len(m)
|
|
|
|
|
}
|
|
|
|
|
list := make([]core.Resource, 0, size)
|
|
|
|
|
|
|
|
|
|
for _, m := range g.Resources {
|
|
|
|
|
for _, m := range t.generator.Resources {
|
|
|
|
|
for _, r := range m {
|
|
|
|
|
list = append(list, r)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
msg := fmt.Sprintf(
|
|
|
|
|
"could not generate %s for %s path %s",
|
|
|
|
|
g.Output,
|
|
|
|
|
b.BuildPlan.Metadata.Name,
|
|
|
|
|
b.Opts.Path,
|
|
|
|
|
"could not generate %s for %s",
|
|
|
|
|
t.generator.Output,
|
|
|
|
|
t.id(),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
buf, err := marshal(list)
|
|
|
|
|
@@ -362,114 +275,220 @@ func (b *BuildPlan) resources(
|
|
|
|
|
return errors.Format("%s: %w", msg, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := store.Set(string(g.Output), buf.Bytes()); err != nil {
|
|
|
|
|
return errors.Format("%s: %w", msg, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Debug("set artifact " + string(g.Output))
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (b *BuildPlan) validate(
|
|
|
|
|
ctx context.Context,
|
|
|
|
|
log *slog.Logger,
|
|
|
|
|
validator core.Validator,
|
|
|
|
|
store artifact.Store,
|
|
|
|
|
) error {
|
|
|
|
|
tempDir, err := os.MkdirTemp("", "holos.validate")
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err)
|
|
|
|
|
}
|
|
|
|
|
defer util.Remove(ctx, tempDir)
|
|
|
|
|
msg := fmt.Sprintf(
|
|
|
|
|
"could not validate %s path %s",
|
|
|
|
|
b.BuildPlan.Metadata.Name,
|
|
|
|
|
b.Opts.Path,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Write the inputs
|
|
|
|
|
for _, input := range validator.Inputs {
|
|
|
|
|
path := string(input)
|
|
|
|
|
if err := store.Save(tempDir, path); err != nil {
|
|
|
|
|
return errors.Format("%s: %w", msg, err)
|
|
|
|
|
}
|
|
|
|
|
log.DebugContext(ctx, "wrote "+filepath.Join(tempDir, path))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(validator.Command.Args) < 1 {
|
|
|
|
|
return errors.Format("%s: command args length must be at least 1", msg)
|
|
|
|
|
}
|
|
|
|
|
size := len(validator.Command.Args) + len(validator.Inputs)
|
|
|
|
|
args := make([]string, 0, size)
|
|
|
|
|
args = append(args, validator.Command.Args...)
|
|
|
|
|
for _, input := range validator.Inputs {
|
|
|
|
|
args = append(args, filepath.Join(tempDir, string(input)))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Execute the validator
|
|
|
|
|
if _, err = util.RunCmdA(ctx, b.Opts.Stderr, args[0], args[1:]...); err != nil {
|
|
|
|
|
if err := t.opts.Store.Set(string(t.generator.Output), buf.Bytes()); err != nil {
|
|
|
|
|
return errors.Format("%s: %w", msg, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (b *BuildPlan) kustomize(
|
|
|
|
|
ctx context.Context,
|
|
|
|
|
log *slog.Logger,
|
|
|
|
|
t core.Transformer,
|
|
|
|
|
store artifact.Store,
|
|
|
|
|
) error {
|
|
|
|
|
tempDir, err := os.MkdirTemp("", "holos.kustomize")
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err)
|
|
|
|
|
}
|
|
|
|
|
defer util.Remove(ctx, tempDir)
|
|
|
|
|
msg := fmt.Sprintf(
|
|
|
|
|
"could not transform %s for %s path %s",
|
|
|
|
|
t.Output,
|
|
|
|
|
b.BuildPlan.Metadata.Name,
|
|
|
|
|
b.Opts.Path,
|
|
|
|
|
)
|
|
|
|
|
// transformersTask applies an artifact's transformers in declaration order as
// a single task (see buildArtifact, which processes transformers
// sequentially).
type transformersTask struct {
	taskParams
	// transformers are applied in order by run.
	transformers []core.Transformer
	// wg is decremented when run returns; owned by buildArtifact.
	wg *sync.WaitGroup
}
|
|
|
|
|
|
|
|
|
|
// Write the kustomization
|
|
|
|
|
data, err := yaml.Marshal(t.Kustomize.Kustomization)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Format("%s: %w", msg, err)
|
|
|
|
|
}
|
|
|
|
|
path := filepath.Join(tempDir, "kustomization.yaml")
|
|
|
|
|
if err := os.WriteFile(path, data, 0666); err != nil {
|
|
|
|
|
return errors.Format("%s: %w", msg, err)
|
|
|
|
|
}
|
|
|
|
|
log.DebugContext(ctx, "wrote "+path)
|
|
|
|
|
|
|
|
|
|
// Write the inputs
|
|
|
|
|
for _, input := range t.Inputs {
|
|
|
|
|
path := string(input)
|
|
|
|
|
if err := store.Save(tempDir, path); err != nil {
|
|
|
|
|
return errors.Format("%s: %w", msg, err)
|
|
|
|
|
// run applies each transformer in order, signaling completion on the shared
// wait group. Kustomize transformers shell out via the kustomize helper; Join
// transformers concatenate input artifacts from the store with the configured
// separator and store the result under the transformer's output key.
func (t transformersTask) run(ctx context.Context) error {
	defer t.wg.Done()
	for idx, transformer := range t.transformers {
		msg := fmt.Sprintf("could not build %s/%d", t.id(), idx)
		switch transformer.Kind {
		case "Kustomize":
			if err := kustomize(ctx, transformer, t.taskParams); err != nil {
				return errors.Wrap(err)
			}
		case "Join":
			// Collect every input from the store; a missing input is an error.
			s := make([][]byte, 0, len(transformer.Inputs))
			for _, input := range transformer.Inputs {
				if data, ok := t.opts.Store.Get(string(input)); ok {
					s = append(s, data)
				} else {
					return errors.Format("%s: missing %s", msg, input)
				}
			}
			data := bytes.Join(s, []byte(transformer.Join.Separator))
			if err := t.opts.Store.Set(string(transformer.Output), data); err != nil {
				return errors.Format("%s: %w", msg, err)
			}
		default:
			return errors.Format("%s: unsupported kind %s", msg, transformer.Kind)
		}
		// NOTE(review): the line below looks like merge residue — log,
		// tempDir, and path are not defined in this function; verify against
		// the intended revision (it resembles a line from the kustomize
		// helper).
		log.DebugContext(ctx, "wrote "+filepath.Join(tempDir, path))
	}
	return nil
}
|
|
|
|
|
|
|
|
|
|
// Execute kustomize
|
|
|
|
|
r, err := util.RunCmdW(ctx, b.Opts.Stderr, "kubectl", "kustomize", tempDir)
|
|
|
|
|
if err != nil {
|
|
|
|
|
kErr := r.Stderr.String()
|
|
|
|
|
err = errors.Format("%s: could not run kustomize: %w", msg, err)
|
|
|
|
|
log.ErrorContext(ctx, fmt.Sprintf("%s: stderr:\n%s", err.Error(), kErr), "err", err, "stderr", kErr)
|
|
|
|
|
return err
|
|
|
|
|
// validatorTask runs a single validator for one artifact. The wait group is
// shared with the buildArtifact call that produced the task, which blocks
// until every validator for the artifact has finished.
type validatorTask struct {
	taskParams
	// validator is the validator to execute.
	validator core.Validator
	// wg is decremented when run returns; owned by buildArtifact.
	wg *sync.WaitGroup
}
|
|
|
|
|
|
|
|
|
|
func (t validatorTask) run(ctx context.Context) error {
|
|
|
|
|
defer t.wg.Done()
|
|
|
|
|
msg := fmt.Sprintf("could not validate %s", t.id())
|
|
|
|
|
switch kind := t.validator.Kind; kind {
|
|
|
|
|
case "Command":
|
|
|
|
|
if err := validate(ctx, t.validator, t.taskParams); err != nil {
|
|
|
|
|
return errors.Wrap(err)
|
|
|
|
|
}
|
|
|
|
|
default:
|
|
|
|
|
return errors.Format("%s: unsupported kind %s", msg, kind)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Store the artifact
|
|
|
|
|
if err := store.Set(string(t.Output), r.Stdout.Bytes()); err != nil {
|
|
|
|
|
func worker(ctx context.Context, idx int, tasks chan task) error {
|
|
|
|
|
log := logger.FromContext(ctx).With("worker", idx)
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return ctx.Err()
|
|
|
|
|
case task, ok := <-tasks:
|
|
|
|
|
if !ok {
|
|
|
|
|
log.DebugContext(ctx, fmt.Sprintf("worker %d returning: tasks chan closed", idx))
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
log.DebugContext(ctx, fmt.Sprintf("worker %d task %s starting", idx, task.id()))
|
|
|
|
|
if err := task.run(ctx); err != nil {
|
|
|
|
|
return errors.Wrap(err)
|
|
|
|
|
}
|
|
|
|
|
log.DebugContext(ctx, fmt.Sprintf("worker %d task %s finished ok", idx, task.id()))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// buildArtifact renders one artifact by submitting tasks to the shared tasks
// channel: generators are submitted individually (the workers may run them
// concurrently), transformers are submitted as one sequential task, and
// validators are submitted individually. A shared WaitGroup gates each phase
// so the next phase starts only after the previous one completes. The final
// artifact is then saved from the store to opts.WriteTo.
func buildArtifact(ctx context.Context, idx int, artifact core.Artifact, tasks chan task, buildPlanName string, opts holos.BuildOpts) error {
	var wg sync.WaitGroup
	msg := fmt.Sprintf("could not build %s artifact %s", buildPlanName, artifact.Artifact)
	// Process Generators concurrently
	for gid, gen := range artifact.Generators {
		task := generatorTask{
			taskParams: taskParams{
				taskName:      fmt.Sprintf("artifact/%d/generator/%d", idx, gid),
				buildPlanName: buildPlanName,
				opts:          opts,
			},
			generator: gen,
			wg:        &wg,
		}
		wg.Add(1)
		// Submit the task, aborting if the context is canceled first.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case tasks <- task:
		}
	}
	wg.Wait()

	// Process Transformers sequentially
	task := transformersTask{
		taskParams: taskParams{
			taskName:      fmt.Sprintf("artifact/%d/transformers", idx),
			buildPlanName: buildPlanName,
			opts:          opts,
		},
		transformers: artifact.Transformers,
		wg:           &wg,
	}
	wg.Add(1)
	select {
	case <-ctx.Done():
		return ctx.Err()
	case tasks <- task:
	}
	wg.Wait()

	// Process Validators concurrently
	for vid, val := range artifact.Validators {
		task := validatorTask{
			taskParams: taskParams{
				taskName:      fmt.Sprintf("artifact/%d/validator/%d", idx, vid),
				buildPlanName: buildPlanName,
				opts:          opts,
			},
			validator: val,
			wg:        &wg,
		}
		wg.Add(1)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case tasks <- task:
		}
	}
	wg.Wait()

	// Write the final artifact
	out := string(artifact.Artifact)
	if err := opts.Store.Save(opts.WriteTo, out); err != nil {
		return errors.Format("%s: %w", msg, err)
	}
	// NOTE(review): the next line appears to be merge residue — log is used
	// before its declaration below and t is not defined in this function;
	// verify against the intended revision.
	log.Debug("set artifact " + string(t.Output))
	log := logger.FromContext(ctx)
	log.DebugContext(ctx, fmt.Sprintf("wrote %s", filepath.Join(opts.WriteTo, out)))

	return nil
}
|
|
|
|
|
|
|
|
|
|
// BuildPlan represents a component builder. It embeds the core BuildPlan
// specification and carries the options used to build it.
type BuildPlan struct {
	core.BuildPlan
	// Opts holds shared build options (store, path, write destination,
	// concurrency, stderr).
	Opts holos.BuildOpts
}
|
|
|
|
|
|
|
|
|
|
func (b *BuildPlan) Build(ctx context.Context) error {
|
|
|
|
|
name := b.BuildPlan.Metadata.Name
|
|
|
|
|
path := b.Opts.Path
|
|
|
|
|
log := logger.FromContext(ctx).With("name", name, "path", path)
|
|
|
|
|
|
|
|
|
|
msg := fmt.Sprintf("could not build %s", name)
|
|
|
|
|
if b.BuildPlan.Spec.Disabled {
|
|
|
|
|
log.WarnContext(ctx, fmt.Sprintf("%s: disabled", msg))
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
g, ctx := errgroup.WithContext(ctx)
|
|
|
|
|
tasks := make(chan task)
|
|
|
|
|
|
|
|
|
|
// Start the worker pool.
|
|
|
|
|
for idx := 0; idx < max(1, b.Opts.Concurrency); idx++ {
|
|
|
|
|
g.Go(func() error {
|
|
|
|
|
return worker(ctx, idx, tasks)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Start one producer that fans out to one pipeline per artifact.
|
|
|
|
|
g.Go(func() error {
|
|
|
|
|
// Close the tasks chan when the producer returns.
|
|
|
|
|
defer func() {
|
|
|
|
|
log.DebugContext(ctx, "producer returning: closing tasks chan")
|
|
|
|
|
close(tasks)
|
|
|
|
|
}()
|
|
|
|
|
// Separate error group for producers.
|
|
|
|
|
p, ctx := errgroup.WithContext(ctx)
|
|
|
|
|
for idx, a := range b.BuildPlan.Spec.Artifacts {
|
|
|
|
|
p.Go(func() error {
|
|
|
|
|
return buildArtifact(ctx, idx, a, tasks, b.Metadata.Name, b.Opts)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
// Wait on producers to finish.
|
|
|
|
|
return errors.Wrap(p.Wait())
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
// Wait on workers to finish.
|
|
|
|
|
return g.Wait()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (b *BuildPlan) Export(idx int, encoder holos.OrderedEncoder) error {
|
|
|
|
|
if err := encoder.Encode(idx, &b.BuildPlan); err != nil {
|
|
|
|
|
return errors.Wrap(err)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (b *BuildPlan) Load(v cue.Value) error {
|
|
|
|
|
return errors.Wrap(v.Decode(&b.BuildPlan))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func marshal(list []core.Resource) (buf bytes.Buffer, err error) {
|
|
|
|
|
encoder := yaml.NewEncoder(&buf)
|
|
|
|
|
defer encoder.Close()
|
|
|
|
|
@@ -482,27 +501,59 @@ func marshal(list []core.Resource) (buf bytes.Buffer, err error) {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// cacheChart stores a cached copy of Chart in the chart subdirectory of path.
|
|
|
|
|
//
|
|
|
|
|
// We assume the only method responsible for writing to chartDir is cacheChart
|
|
|
|
|
// itself. cacheChart runs concurrently when rendering a platform.
|
|
|
|
|
//
|
|
|
|
|
// We rely on the atomicity of moving temporary directories into place on the
|
|
|
|
|
// same filesystem via os.Rename. If a syscall.EEXIST error occurs during
|
|
|
|
|
// renaming, it indicates that the cached chart already exists, which is
|
|
|
|
|
// expected when this function is called concurrently.
|
|
|
|
|
//
|
|
|
|
|
// TODO(jeff): Break the dependency on v1alpha5, make it work across versions as
|
|
|
|
|
// a utility function.
|
|
|
|
|
func (b *BuildPlan) cacheChart(
|
|
|
|
|
ctx context.Context,
|
|
|
|
|
log *slog.Logger,
|
|
|
|
|
cacheDir string,
|
|
|
|
|
chart core.Chart,
|
|
|
|
|
) error {
|
|
|
|
|
// onceWithLock obtains a filesystem lock with mkdir, then executes fn. If the
// lock is already locked, onceWithLock waits for it to be released then returns
// without calling fn.
//
// The lock is the directory path+".lock"; os.Mkdir is atomic, so exactly one
// concurrent caller acquires it and runs fn while the others poll until the
// directory disappears. There is no timeout on the wait itself — callers
// bound it by canceling ctx.
func onceWithLock(log *slog.Logger, ctx context.Context, path string, fn func() error) error {
	// Ensure the parent directory exists before attempting the lock mkdir.
	if err := os.MkdirAll(filepath.Dir(path), 0777); err != nil {
		return errors.Wrap(err)
	}

	// Obtain a lock with a timeout.
	lockDir := path + ".lock"
	log = log.With("lock", lockDir)

	err := os.Mkdir(lockDir, 0777)
	if err == nil {
		// Lock acquired: run fn and release the lock on return.
		defer os.RemoveAll(lockDir)
		log.DebugContext(ctx, fmt.Sprintf("acquired %s", lockDir))
		if err := fn(); err != nil {
			return errors.Wrap(err)
		}
		log.DebugContext(ctx, fmt.Sprintf("released %s", lockDir))
		return nil
	}

	// Wait until the lock is released then return.
	if os.IsExist(err) {
		log.DebugContext(ctx, fmt.Sprintf("blocked %s", lockDir))
		// One-shot progress warnings; each fires at most once.
		stillBlocked := time.After(5 * time.Second)
		deadLocked := time.After(10 * time.Second)
		for {
			select {
			case <-stillBlocked:
				log.WarnContext(ctx, fmt.Sprintf("waiting for %s to be released", lockDir))
			case <-deadLocked:
				log.WarnContext(ctx, fmt.Sprintf("still waiting for %s to be released (dead lock?)", lockDir))
			case <-time.After(100 * time.Millisecond):
				// Poll for the lock directory to disappear.
				if _, err := os.Stat(lockDir); os.IsNotExist(err) {
					log.DebugContext(ctx, fmt.Sprintf("unblocked %s", lockDir))
					return nil
				}
			case <-ctx.Done():
				return errors.Wrap(ctx.Err())
			}
		}
	}

	// Unexpected error
	return errors.Wrap(err)
}
|
|
|
|
|
|
|
|
|
|
func cacheChart(ctx context.Context, cacheDir string, chart core.Chart, stderr io.Writer) error {
|
|
|
|
|
log := logger.FromContext(ctx)
|
|
|
|
|
// Add repositories
|
|
|
|
|
repo := chart.Repository
|
|
|
|
|
stderr := b.Opts.Stderr
|
|
|
|
|
if repo.URL == "" {
|
|
|
|
|
// repo update not needed for oci charts so this is debug instead of warn.
|
|
|
|
|
log.DebugContext(ctx, "skipped helm repo add and update: repo url is empty")
|
|
|
|
|
@@ -571,51 +622,82 @@ func (b *BuildPlan) cacheChart(
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// onceWithLock obtains a filesystem lock with mkdir, then executes fn. If the
|
|
|
|
|
// lock is already locked, onceWithLock waits for it to be released then returns
|
|
|
|
|
// without calling fn.
|
|
|
|
|
func onceWithLock(log *slog.Logger, ctx context.Context, path string, fn func() error) error {
|
|
|
|
|
if err := os.MkdirAll(filepath.Dir(path), 0777); err != nil {
|
|
|
|
|
func kustomize(ctx context.Context, t core.Transformer, p taskParams) error {
|
|
|
|
|
tempDir, err := os.MkdirTemp("", "holos.kustomize")
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err)
|
|
|
|
|
}
|
|
|
|
|
defer util.Remove(ctx, tempDir)
|
|
|
|
|
msg := fmt.Sprintf(
|
|
|
|
|
"could not transform %s for %s path %s",
|
|
|
|
|
t.Output,
|
|
|
|
|
p.buildPlanName,
|
|
|
|
|
p.opts.Path,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Obtain a lock with a timeout.
|
|
|
|
|
lockDir := path + ".lock"
|
|
|
|
|
log = log.With("lock", lockDir)
|
|
|
|
|
|
|
|
|
|
err := os.Mkdir(lockDir, 0777)
|
|
|
|
|
if err == nil {
|
|
|
|
|
defer os.RemoveAll(lockDir)
|
|
|
|
|
log.DebugContext(ctx, fmt.Sprintf("acquired %s", lockDir))
|
|
|
|
|
if err := fn(); err != nil {
|
|
|
|
|
return errors.Wrap(err)
|
|
|
|
|
}
|
|
|
|
|
log.DebugContext(ctx, fmt.Sprintf("released %s", lockDir))
|
|
|
|
|
return nil
|
|
|
|
|
// Write the kustomization
|
|
|
|
|
data, err := yaml.Marshal(t.Kustomize.Kustomization)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Format("%s: %w", msg, err)
|
|
|
|
|
}
|
|
|
|
|
path := filepath.Join(tempDir, "kustomization.yaml")
|
|
|
|
|
if err := os.WriteFile(path, data, 0666); err != nil {
|
|
|
|
|
return errors.Format("%s: %w", msg, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Wait until the lock is released then return.
|
|
|
|
|
if os.IsExist(err) {
|
|
|
|
|
log.DebugContext(ctx, fmt.Sprintf("blocked %s", lockDir))
|
|
|
|
|
stillBlocked := time.After(5 * time.Second)
|
|
|
|
|
deadLocked := time.After(10 * time.Second)
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-stillBlocked:
|
|
|
|
|
log.WarnContext(ctx, fmt.Sprintf("waiting for %s to be released", lockDir))
|
|
|
|
|
case <-deadLocked:
|
|
|
|
|
log.WarnContext(ctx, fmt.Sprintf("still waiting for %s to be released (dead lock?)", lockDir))
|
|
|
|
|
case <-time.After(100 * time.Millisecond):
|
|
|
|
|
if _, err := os.Stat(lockDir); os.IsNotExist(err) {
|
|
|
|
|
log.DebugContext(ctx, fmt.Sprintf("unblocked %s", lockDir))
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return errors.Wrap(ctx.Err())
|
|
|
|
|
}
|
|
|
|
|
// Write the inputs
|
|
|
|
|
for _, input := range t.Inputs {
|
|
|
|
|
path := string(input)
|
|
|
|
|
if err := p.opts.Store.Save(tempDir, path); err != nil {
|
|
|
|
|
return errors.Format("%s: %w", msg, err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Unexpected error
|
|
|
|
|
return errors.Wrap(err)
|
|
|
|
|
// Execute kustomize
|
|
|
|
|
r, err := util.RunCmdW(ctx, p.opts.Stderr, "kubectl", "kustomize", tempDir)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Format("%s: could not run kustomize: %w", msg, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Store the artifact
|
|
|
|
|
if err := p.opts.Store.Set(string(t.Output), r.Stdout.Bytes()); err != nil {
|
|
|
|
|
return errors.Format("%s: %w", msg, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func validate(ctx context.Context, validator core.Validator, p taskParams) error {
|
|
|
|
|
store := p.opts.Store
|
|
|
|
|
tempDir, err := os.MkdirTemp("", "holos.validate")
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err)
|
|
|
|
|
}
|
|
|
|
|
// defer util.Remove(ctx, tempDir)
|
|
|
|
|
msg := fmt.Sprintf("could not validate %s", p.id())
|
|
|
|
|
|
|
|
|
|
// Write the inputs
|
|
|
|
|
for _, input := range validator.Inputs {
|
|
|
|
|
path := string(input)
|
|
|
|
|
if err := store.Save(tempDir, path); err != nil {
|
|
|
|
|
return errors.Format("%s: %w", msg, err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(validator.Command.Args) < 1 {
|
|
|
|
|
return errors.Format("%s: command args length must be at least 1", msg)
|
|
|
|
|
}
|
|
|
|
|
size := len(validator.Command.Args) + len(validator.Inputs)
|
|
|
|
|
args := make([]string, 0, size)
|
|
|
|
|
args = append(args, validator.Command.Args...)
|
|
|
|
|
for _, input := range validator.Inputs {
|
|
|
|
|
args = append(args, filepath.Join(tempDir, string(input)))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Execute the validator
|
|
|
|
|
if _, err = util.RunCmdA(ctx, p.opts.Stderr, args[0], args[1:]...); err != nil {
|
|
|
|
|
return errors.Format("%s: %w", msg, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|