Compare commits

...

8 Commits

Author SHA1 Message Date
Jeff McCune
ee16f14e03 refactor build plan pipeline
Previously the BuildPlan pipeline didn't execute generators and
transformers concurrently.  All steps were sequentially executed.  Holos
was primarily concurrent by executing multiple BuildPlans at once.

This patch changes the Build implementation for each BuildPlan to
execute a goroutine pipeline.  One producer fans out to a group of
routines each executing the pipeline for one artifact in the build plan.
The pipeline has 3 stages:

1: Fan-out to build each Generator concurrently.
2: Fan-in to build each Transformer sequentially.
3: Fan-out again to run each validator concurrently.

When the artifact pipelines return, the producer closes the tasks
channel causing the worker tasks to return.

Note the overall runtime for 8 BuildPlans is roughly the same as before,
at 160ms with --concurrency=8 on my M3 Max.  I expect this to
perform better than previously when multiple artifacts are rendered for
each BuildPlan.
2024-11-29 14:52:06 -08:00
Jeff McCune
7530345620 main: add tracing and profiling
Writes files based on parent pid and process pid to avoid collisions.

Analyze with:

export HOLOS_TRACE=trace.%d.%d.out
go tool trace trace.999.1000.out

export HOLOS_CPU_PROFILE=cpu.%d.%d.prof
go tool pprof cpu.999.1000.prof

export HOLOS_MEM_PROFILE=mem.%d.%d.prof
go tool pprof mem.999.1000.prof
2024-11-29 14:52:06 -08:00
Jeff McCune
47d60ef86d docs: fix cue vet path in validators post (#357)
Without this patch the validator fails if a component manages two of the
same kind of resource, which is common.

This patch updates the example to use the metadata namespace and name as
lookup keys.  This works for most components, but may not for
ClusterResources.  Use the kind top level field in that case and pass
the field name of the validator as a tag value to vary by component.
2024-11-25 19:35:02 -08:00
Jeff McCune
9e9f6efd04 docs: fix typos in validators blog post (#357) 2024-11-25 15:46:54 -08:00
Jeff McCune
fb4a043823 docs: add card for validators blog post (#357) 2024-11-25 15:11:11 -08:00
Jeff McCune
d718ab1910 docs: redirect /docs/local-cluster to the v1alpha5 topic 2024-11-25 10:53:31 -08:00
Jeff McCune
c649db18a9 docs: redirect /docs/quickstart to the overview 2024-11-25 10:43:16 -08:00
Jeff McCune
b3bddf3ee3 docs: add validators blog post (#357) 2024-11-25 08:49:27 -08:00
9 changed files with 499 additions and 321 deletions

View File

@@ -7,11 +7,11 @@ cd $WORK
exec holos generate platform v1alpha5 --force
# Platforms are empty by default.
exec holos render platform ./platform
exec holos render platform
stderr -count=1 '^rendered platform'
# Holos uses CUE to build a platform specification.
exec cue export --expression holos --out=yaml ./platform
exec holos show platform
cmp stdout want/1.platform_spec.yaml
# Define the host and port in projects/blackbox.schema.cue
@@ -22,7 +22,7 @@ mv projects/platform/components/prometheus/prometheus.cue.disabled projects/plat
mv platform/prometheus.cue.disabled platform/prometheus.cue
# Render the platform to render the prometheus chart.
exec holos render platform ./platform
exec holos render platform
stderr -count=1 '^rendered prometheus'
stderr -count=1 '^rendered platform'
cmp deploy/components/prometheus/prometheus.gen.yaml want/1.prometheus.gen.yaml
@@ -73,8 +73,8 @@ core.#BuildPlan & {
metadata: name: _Tags.component.name
}
-- want/1.platform_spec.yaml --
kind: Platform
apiVersion: v1alpha5
kind: Platform
metadata:
name: default
spec:

View File

@@ -135,11 +135,19 @@ package policy
import apps "k8s.io/api/apps/v1"
// Organize by kind then name to avoid conflicts.
kind: [KIND=string]: [NAME=string]: {...}
// Useful when one component manages the same resource kind and name across
// multiple namespaces.
let KIND = kind
namespace: [NS=string]: KIND
// Block Secret resources. kind will not unify with "Secret"
secret: kind: "Use an ExternalSecret instead. Forbidden by security policy."
kind: secret: [NAME=string]: kind: "Use an ExternalSecret instead. Forbidden by security policy. secret/\(NAME)"
// Validate Deployment against Kubernetes type definitions.
deployment: apps.#Deployment
kind: deployment: [_]: apps.#Deployment
```
```shell
EOF
@@ -160,7 +168,16 @@ package holos
#ComponentConfig: Validators: cue: {
kind: "Command"
// Note --path maps each resource to a top level field named by the kind.
command: args: ["holos", "cue", "vet", "./policy", "--path", "strings.ToLower(kind)"]
command: args: [
"holos",
"cue",
"vet",
"./policy",
"--path=\"namespace\"",
"--path=metadata.namespace",
"--path=strings.ToLower(kind)",
"--path=metadata.name",
]
}
```
```shell

View File

@@ -0,0 +1,35 @@
---
slug: validators-feature
title: Validators added in Holos v0.101.0
authors: [jeff]
tags: [holos, feature]
image: /img/cards/validators.png
description: Validators are useful to enforce policy and catch Helm errors.
---
import RenderingOverview from '@site/src/diagrams/rendering-overview.mdx';
import RenderPlatformDiagram from '@site/src/diagrams/render-platform-sequence.mdx';
We've added support for [Validators] in [v0.101.0]. Validators are useful to
enforce policies and ensure consistency early in the process. This feature
addresses two primary use cases:
1. Prevent insecure configuration early in the process. For example, prevent
Helm from rendering a `Secret` which would otherwise be committed to version control.
2. Prevent unsafe configuration by validating manifests against Kubernetes core
and custom resource type definitions.
Check out the [Validators] tutorial for examples of both use cases.
[Validators]: https://holos.run/docs/v1alpha5/tutorial/validators/
[v0.101.0]: https://github.com/holos-run/holos/releases/tag/v0.101.0
{/* truncate */}
<RenderingOverview />
Validators complete the core functionality of the Holos manifest rendering
pipeline. We are seeking design partners to help enhance Generators and
Transformers. Validators are implemented using a generic `Command` kind, and
we're considering a similar kind for Generators and Transformers. Please
connect with us if you'd like to help design these enhancements.

View File

@@ -1,10 +1,14 @@
/docs /docs/v1alpha5/ 301
/docs/ /docs/v1alpha5/ 301
/docs/tutorial /docs/v1alpha5/tutorial/ 301
/docs/tutorial/ /docs/v1alpha5/tutorial/ 301
/docs/overview /docs/v1alpha5/tutorial/overview/ 301
/docs/overview/ /docs/v1alpha5/tutorial/overview/ 301
/docs/topics /docs/v1alpha5/topics/structures/ 301
/docs/topics/ /docs/v1alpha5/topics/structures/ 301
/docs/setup /docs/v1alpha5/tutorial/setup/ 301
/docs/setup/ /docs/v1alpha5/tutorial/setup/ 301
/docs /docs/v1alpha5/ 301
/docs/ /docs/v1alpha5/ 301
/docs/tutorial /docs/v1alpha5/tutorial/ 301
/docs/tutorial/ /docs/v1alpha5/tutorial/ 301
/docs/quickstart /docs/v1alpha5/tutorial/overview/ 301
/docs/quickstart/ /docs/v1alpha5/tutorial/overview/ 301
/docs/overview /docs/v1alpha5/tutorial/overview/ 301
/docs/overview/ /docs/v1alpha5/tutorial/overview/ 301
/docs/topics /docs/v1alpha5/topics/structures/ 301
/docs/topics/ /docs/v1alpha5/topics/structures/ 301
/docs/setup /docs/v1alpha5/tutorial/setup/ 301
/docs/setup/ /docs/v1alpha5/tutorial/setup/ 301
/docs/local-cluster /docs/v1alpha5/topics/local-cluster/ 301
/docs/local-cluster/ /docs/v1alpha5/topics/local-cluster/ 301

Binary file not shown.

After

Width:  |  Height:  |  Size: 98 KiB

View File

@@ -20,6 +20,7 @@ var _ Store = NewStore()
type Store interface {
Get(path string) (data []byte, ok bool)
Set(path string, data []byte) error
// Save previously set path to dir preserving directories.
Save(dir, path string) error
}

View File

@@ -5,16 +5,17 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
"cuelang.org/go/cue"
core "github.com/holos-run/holos/api/core/v1alpha5"
"github.com/holos-run/holos/internal/artifact"
"github.com/holos-run/holos/internal/errors"
"github.com/holos-run/holos/internal/holos"
"github.com/holos-run/holos/internal/logger"
@@ -107,167 +108,84 @@ func (c *Component) Path() string {
}
var _ holos.BuildPlan = &BuildPlan{}
var _ task = generatorTask{}
var _ task = transformersTask{}
var _ task = validatorTask{}
// BuildPlan represents a component builder.
type BuildPlan struct {
core.BuildPlan
Opts holos.BuildOpts
type task interface {
id() string
run(ctx context.Context) error
}
// Build builds a BuildPlan into Artifact files.
func (b *BuildPlan) Build(ctx context.Context) error {
name := b.BuildPlan.Metadata.Name
path := b.Opts.Path
log := logger.FromContext(ctx).With("name", name, "path", path)
msg := fmt.Sprintf("could not build %s", name)
if b.BuildPlan.Spec.Disabled {
log.WarnContext(ctx, fmt.Sprintf("%s: disabled", msg))
return nil
}
type taskParams struct {
taskName string
buildPlanName string
opts holos.BuildOpts
}
g, ctx := errgroup.WithContext(ctx)
// One more for the producer
g.SetLimit(b.Opts.Concurrency + 1)
func (t taskParams) id() string {
return fmt.Sprintf("%s:%s/%s", t.opts.Path, t.buildPlanName, t.taskName)
}
// Producer.
g.Go(func() error {
for _, a := range b.BuildPlan.Spec.Artifacts {
msg := fmt.Sprintf("%s artifact %s", msg, a.Artifact)
log := log.With("artifact", a.Artifact)
if a.Skip {
log.WarnContext(ctx, fmt.Sprintf("%s: skipped field is true", msg))
continue
}
select {
case <-ctx.Done():
return ctx.Err()
default:
// https://golang.org/doc/faq#closures_and_goroutines
a := a
// Worker. Blocks if limit has been reached.
g.Go(func() error {
for _, gen := range a.Generators {
switch gen.Kind {
case "Resources":
if err := b.resources(log, gen, b.Opts.Store); err != nil {
return errors.Format("could not generate resources: %w", err)
}
case "Helm":
if err := b.helm(ctx, log, gen, b.Opts.Store); err != nil {
return errors.Format("could not generate helm: %w", err)
}
case "File":
if err := b.file(log, gen, b.Opts.Store); err != nil {
return errors.Format("could not generate file: %w", err)
}
default:
return errors.Format("%s: unsupported kind %s", msg, gen.Kind)
}
}
type generatorTask struct {
taskParams
generator core.Generator
wg *sync.WaitGroup
}
for _, t := range a.Transformers {
switch t.Kind {
case "Kustomize":
if err := b.kustomize(ctx, log, t, b.Opts.Store); err != nil {
return errors.Wrap(err)
}
case "Join":
s := make([][]byte, 0, len(t.Inputs))
for _, input := range t.Inputs {
if data, ok := b.Opts.Store.Get(string(input)); ok {
s = append(s, data)
} else {
return errors.Format("%s: missing %s", msg, input)
}
}
data := bytes.Join(s, []byte(t.Join.Separator))
if err := b.Opts.Store.Set(string(t.Output), data); err != nil {
return errors.Format("%s: %w", msg, err)
}
log.Debug("set artifact: " + string(t.Output))
default:
return errors.Format("%s: unsupported kind %s", msg, t.Kind)
}
}
for _, validator := range a.Validators {
switch validator.Kind {
case "Command":
if err := b.validate(ctx, log, validator, b.Opts.Store); err != nil {
return errors.Wrap(err)
}
default:
return errors.Format("%s: unsupported kind %s", msg, validator.Kind)
}
}
// Write the final artifact
if err := b.Opts.Store.Save(b.Opts.WriteTo, string(a.Artifact)); err != nil {
return errors.Format("%s: %w", msg, err)
}
log.DebugContext(ctx, "wrote "+filepath.Join(b.Opts.WriteTo, string(a.Artifact)))
return nil
})
}
func (t generatorTask) run(ctx context.Context) error {
defer t.wg.Done()
msg := fmt.Sprintf("could not build %s", t.id())
switch t.generator.Kind {
case "Resources":
if err := t.resources(); err != nil {
return errors.Format("%s: could not generate resources: %w", msg, err)
}
return nil
})
// Wait for completion and return the first error (if any)
return g.Wait()
}
func (b *BuildPlan) Export(idx int, encoder holos.OrderedEncoder) error {
if err := encoder.Encode(idx, &b.BuildPlan); err != nil {
return errors.Wrap(err)
case "Helm":
if err := t.helm(ctx); err != nil {
return errors.Format("%s: could not generate helm: %w", msg, err)
}
case "File":
if err := t.file(); err != nil {
return errors.Format("%s: could not generate file: %w", msg, err)
}
default:
return errors.Format("%s: unsupported kind %s", msg, t.generator.Kind)
}
return nil
}
func (b *BuildPlan) Load(v cue.Value) error {
return errors.Wrap(v.Decode(&b.BuildPlan))
}
func (b *BuildPlan) file(
log *slog.Logger,
g core.Generator,
store artifact.Store,
) error {
data, err := os.ReadFile(filepath.Join(string(b.Opts.Path), string(g.File.Source)))
func (t generatorTask) file() error {
data, err := os.ReadFile(filepath.Join(string(t.opts.Path), string(t.generator.File.Source)))
if err != nil {
return errors.Wrap(err)
}
if err := store.Set(string(g.Output), data); err != nil {
if err := t.opts.Store.Set(string(t.generator.Output), data); err != nil {
return errors.Wrap(err)
}
log.Debug("set artifact: " + string(g.Output))
return nil
}
func (b *BuildPlan) helm(
ctx context.Context,
log *slog.Logger,
g core.Generator,
store artifact.Store,
) error {
chartName := g.Helm.Chart.Name
log = log.With("chart", chartName)
func (t generatorTask) helm(ctx context.Context) error {
chartName := t.generator.Helm.Chart.Name
// Unnecessary? cargo cult copied from internal/cli/render/render.go
if chartName == "" {
return errors.New("missing chart name")
}
// Cache the chart by version to pull new versions. (#273)
cacheDir := filepath.Join(string(b.Opts.Path), "vendor", g.Helm.Chart.Version)
cacheDir := filepath.Join(string(t.opts.Path), "vendor", t.generator.Helm.Chart.Version)
cachePath := filepath.Join(cacheDir, filepath.Base(chartName))
log := logger.FromContext(ctx)
if _, err := os.Stat(cachePath); os.IsNotExist(err) {
timeout, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
err := onceWithLock(log, timeout, cachePath, func() error {
return b.cacheChart(ctx, log, cacheDir, g.Helm.Chart)
})
err := func() error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
return onceWithLock(log, ctx, cachePath, func() error {
return cacheChart(ctx, cacheDir, t.generator.Helm.Chart, t.opts.Stderr)
})
}()
if err != nil {
return errors.Format("could not cache chart: %w", err)
}
@@ -280,7 +198,7 @@ func (b *BuildPlan) helm(
}
defer util.Remove(ctx, tempDir)
data, err := yaml.Marshal(g.Helm.Values)
data, err := yaml.Marshal(t.generator.Helm.Values)
if err != nil {
return errors.Format("could not marshal values: %w", err)
}
@@ -293,22 +211,22 @@ func (b *BuildPlan) helm(
// Run charts
args := []string{"template"}
if !g.Helm.EnableHooks {
if !t.generator.Helm.EnableHooks {
args = append(args, "--no-hooks")
}
for _, apiVersion := range g.Helm.APIVersions {
for _, apiVersion := range t.generator.Helm.APIVersions {
args = append(args, "--api-versions", apiVersion)
}
if kubeVersion := g.Helm.KubeVersion; kubeVersion != "" {
if kubeVersion := t.generator.Helm.KubeVersion; kubeVersion != "" {
args = append(args, "--kube-version", kubeVersion)
}
args = append(args,
"--include-crds",
"--values", valuesPath,
"--namespace", g.Helm.Namespace,
"--namespace", t.generator.Helm.Namespace,
"--kubeconfig", "/dev/null",
"--version", g.Helm.Chart.Version,
g.Helm.Chart.Release,
"--version", t.generator.Helm.Chart.Version,
t.generator.Helm.Chart.Release,
cachePath,
)
helmOut, err := util.RunCmd(ctx, "helm", args...)
@@ -325,36 +243,31 @@ func (b *BuildPlan) helm(
}
// Set the artifact
if err := store.Set(string(g.Output), helmOut.Stdout.Bytes()); err != nil {
if err := t.opts.Store.Set(string(t.generator.Output), helmOut.Stdout.Bytes()); err != nil {
return errors.Format("could not store helm output: %w", err)
}
log.Debug("set artifact: " + string(g.Output))
log.Debug("set artifact: " + string(t.generator.Output))
return nil
}
func (b *BuildPlan) resources(
log *slog.Logger,
g core.Generator,
store artifact.Store,
) error {
func (t generatorTask) resources() error {
var size int
for _, m := range g.Resources {
for _, m := range t.generator.Resources {
size += len(m)
}
list := make([]core.Resource, 0, size)
for _, m := range g.Resources {
for _, m := range t.generator.Resources {
for _, r := range m {
list = append(list, r)
}
}
msg := fmt.Sprintf(
"could not generate %s for %s path %s",
g.Output,
b.BuildPlan.Metadata.Name,
b.Opts.Path,
"could not generate %s for %s",
t.generator.Output,
t.id(),
)
buf, err := marshal(list)
@@ -362,114 +275,220 @@ func (b *BuildPlan) resources(
return errors.Format("%s: %w", msg, err)
}
if err := store.Set(string(g.Output), buf.Bytes()); err != nil {
return errors.Format("%s: %w", msg, err)
}
log.Debug("set artifact " + string(g.Output))
return nil
}
func (b *BuildPlan) validate(
ctx context.Context,
log *slog.Logger,
validator core.Validator,
store artifact.Store,
) error {
tempDir, err := os.MkdirTemp("", "holos.validate")
if err != nil {
return errors.Wrap(err)
}
defer util.Remove(ctx, tempDir)
msg := fmt.Sprintf(
"could not validate %s path %s",
b.BuildPlan.Metadata.Name,
b.Opts.Path,
)
// Write the inputs
for _, input := range validator.Inputs {
path := string(input)
if err := store.Save(tempDir, path); err != nil {
return errors.Format("%s: %w", msg, err)
}
log.DebugContext(ctx, "wrote "+filepath.Join(tempDir, path))
}
if len(validator.Command.Args) < 1 {
return errors.Format("%s: command args length must be at least 1", msg)
}
size := len(validator.Command.Args) + len(validator.Inputs)
args := make([]string, 0, size)
args = append(args, validator.Command.Args...)
for _, input := range validator.Inputs {
args = append(args, filepath.Join(tempDir, string(input)))
}
// Execute the validator
if _, err = util.RunCmdA(ctx, b.Opts.Stderr, args[0], args[1:]...); err != nil {
if err := t.opts.Store.Set(string(t.generator.Output), buf.Bytes()); err != nil {
return errors.Format("%s: %w", msg, err)
}
return nil
}
func (b *BuildPlan) kustomize(
ctx context.Context,
log *slog.Logger,
t core.Transformer,
store artifact.Store,
) error {
tempDir, err := os.MkdirTemp("", "holos.kustomize")
if err != nil {
return errors.Wrap(err)
}
defer util.Remove(ctx, tempDir)
msg := fmt.Sprintf(
"could not transform %s for %s path %s",
t.Output,
b.BuildPlan.Metadata.Name,
b.Opts.Path,
)
type transformersTask struct {
taskParams
transformers []core.Transformer
wg *sync.WaitGroup
}
// Write the kustomization
data, err := yaml.Marshal(t.Kustomize.Kustomization)
if err != nil {
return errors.Format("%s: %w", msg, err)
}
path := filepath.Join(tempDir, "kustomization.yaml")
if err := os.WriteFile(path, data, 0666); err != nil {
return errors.Format("%s: %w", msg, err)
}
log.DebugContext(ctx, "wrote "+path)
// Write the inputs
for _, input := range t.Inputs {
path := string(input)
if err := store.Save(tempDir, path); err != nil {
return errors.Format("%s: %w", msg, err)
func (t transformersTask) run(ctx context.Context) error {
defer t.wg.Done()
for idx, transformer := range t.transformers {
msg := fmt.Sprintf("could not build %s/%d", t.id(), idx)
switch transformer.Kind {
case "Kustomize":
if err := kustomize(ctx, transformer, t.taskParams); err != nil {
return errors.Wrap(err)
}
case "Join":
s := make([][]byte, 0, len(transformer.Inputs))
for _, input := range transformer.Inputs {
if data, ok := t.opts.Store.Get(string(input)); ok {
s = append(s, data)
} else {
return errors.Format("%s: missing %s", msg, input)
}
}
data := bytes.Join(s, []byte(transformer.Join.Separator))
if err := t.opts.Store.Set(string(transformer.Output), data); err != nil {
return errors.Format("%s: %w", msg, err)
}
default:
return errors.Format("%s: unsupported kind %s", msg, transformer.Kind)
}
log.DebugContext(ctx, "wrote "+filepath.Join(tempDir, path))
}
return nil
}
// Execute kustomize
r, err := util.RunCmdW(ctx, b.Opts.Stderr, "kubectl", "kustomize", tempDir)
if err != nil {
kErr := r.Stderr.String()
err = errors.Format("%s: could not run kustomize: %w", msg, err)
log.ErrorContext(ctx, fmt.Sprintf("%s: stderr:\n%s", err.Error(), kErr), "err", err, "stderr", kErr)
return err
type validatorTask struct {
taskParams
validator core.Validator
wg *sync.WaitGroup
}
func (t validatorTask) run(ctx context.Context) error {
defer t.wg.Done()
msg := fmt.Sprintf("could not validate %s", t.id())
switch kind := t.validator.Kind; kind {
case "Command":
if err := validate(ctx, t.validator, t.taskParams); err != nil {
return errors.Wrap(err)
}
default:
return errors.Format("%s: unsupported kind %s", msg, kind)
}
return nil
}
// Store the artifact
if err := store.Set(string(t.Output), r.Stdout.Bytes()); err != nil {
func worker(ctx context.Context, idx int, tasks chan task) error {
log := logger.FromContext(ctx).With("worker", idx)
for {
select {
case <-ctx.Done():
return ctx.Err()
case task, ok := <-tasks:
if !ok {
log.DebugContext(ctx, fmt.Sprintf("worker %d returning: tasks chan closed", idx))
return nil
}
log.DebugContext(ctx, fmt.Sprintf("worker %d task %s starting", idx, task.id()))
if err := task.run(ctx); err != nil {
return errors.Wrap(err)
}
log.DebugContext(ctx, fmt.Sprintf("worker %d task %s finished ok", idx, task.id()))
}
}
}
func buildArtifact(ctx context.Context, idx int, artifact core.Artifact, tasks chan task, buildPlanName string, opts holos.BuildOpts) error {
var wg sync.WaitGroup
msg := fmt.Sprintf("could not build %s artifact %s", buildPlanName, artifact.Artifact)
// Process Generators concurrently
for gid, gen := range artifact.Generators {
task := generatorTask{
taskParams: taskParams{
taskName: fmt.Sprintf("artifact/%d/generator/%d", idx, gid),
buildPlanName: buildPlanName,
opts: opts,
},
generator: gen,
wg: &wg,
}
wg.Add(1)
select {
case <-ctx.Done():
return ctx.Err()
case tasks <- task:
}
}
wg.Wait()
// Process Transformers sequentially
task := transformersTask{
taskParams: taskParams{
taskName: fmt.Sprintf("artifact/%d/transformers", idx),
buildPlanName: buildPlanName,
opts: opts,
},
transformers: artifact.Transformers,
wg: &wg,
}
wg.Add(1)
select {
case <-ctx.Done():
return ctx.Err()
case tasks <- task:
}
wg.Wait()
// Process Validators concurrently
for vid, val := range artifact.Validators {
task := validatorTask{
taskParams: taskParams{
taskName: fmt.Sprintf("artifact/%d/validator/%d", idx, vid),
buildPlanName: buildPlanName,
opts: opts,
},
validator: val,
wg: &wg,
}
wg.Add(1)
select {
case <-ctx.Done():
return ctx.Err()
case tasks <- task:
}
}
wg.Wait()
// Write the final artifact
out := string(artifact.Artifact)
if err := opts.Store.Save(opts.WriteTo, out); err != nil {
return errors.Format("%s: %w", msg, err)
}
log.Debug("set artifact " + string(t.Output))
log := logger.FromContext(ctx)
log.DebugContext(ctx, fmt.Sprintf("wrote %s", filepath.Join(opts.WriteTo, out)))
return nil
}
// BuildPlan represents a component builder.
type BuildPlan struct {
core.BuildPlan
Opts holos.BuildOpts
}
func (b *BuildPlan) Build(ctx context.Context) error {
name := b.BuildPlan.Metadata.Name
path := b.Opts.Path
log := logger.FromContext(ctx).With("name", name, "path", path)
msg := fmt.Sprintf("could not build %s", name)
if b.BuildPlan.Spec.Disabled {
log.WarnContext(ctx, fmt.Sprintf("%s: disabled", msg))
return nil
}
g, ctx := errgroup.WithContext(ctx)
tasks := make(chan task)
// Start the worker pool.
for idx := 0; idx < max(1, b.Opts.Concurrency); idx++ {
g.Go(func() error {
return worker(ctx, idx, tasks)
})
}
// Start one producer that fans out to one pipeline per artifact.
g.Go(func() error {
// Close the tasks chan when the producer returns.
defer func() {
log.DebugContext(ctx, "producer returning: closing tasks chan")
close(tasks)
}()
// Separate error group for producers.
p, ctx := errgroup.WithContext(ctx)
for idx, a := range b.BuildPlan.Spec.Artifacts {
p.Go(func() error {
return buildArtifact(ctx, idx, a, tasks, b.Metadata.Name, b.Opts)
})
}
// Wait on producers to finish.
return errors.Wrap(p.Wait())
})
// Wait on workers to finish.
return g.Wait()
}
func (b *BuildPlan) Export(idx int, encoder holos.OrderedEncoder) error {
if err := encoder.Encode(idx, &b.BuildPlan); err != nil {
return errors.Wrap(err)
}
return nil
}
func (b *BuildPlan) Load(v cue.Value) error {
return errors.Wrap(v.Decode(&b.BuildPlan))
}
func marshal(list []core.Resource) (buf bytes.Buffer, err error) {
encoder := yaml.NewEncoder(&buf)
defer encoder.Close()
@@ -482,27 +501,59 @@ func marshal(list []core.Resource) (buf bytes.Buffer, err error) {
return
}
// cacheChart stores a cached copy of Chart in the chart subdirectory of path.
//
// We assume the only method responsible for writing to chartDir is cacheChart
// itself. cacheChart runs concurrently when rendering a platform.
//
// We rely on the atomicity of moving temporary directories into place on the
// same filesystem via os.Rename. If a syscall.EEXIST error occurs during
// renaming, it indicates that the cached chart already exists, which is
// expected when this function is called concurrently.
//
// TODO(jeff): Break the dependency on v1alpha5, make it work across versions as
// a utility function.
func (b *BuildPlan) cacheChart(
ctx context.Context,
log *slog.Logger,
cacheDir string,
chart core.Chart,
) error {
// onceWithLock obtains a filesystem lock with mkdir, then executes fn. If the
// lock is already locked, onceWithLock waits for it to be released then returns
// without calling fn.
func onceWithLock(log *slog.Logger, ctx context.Context, path string, fn func() error) error {
if err := os.MkdirAll(filepath.Dir(path), 0777); err != nil {
return errors.Wrap(err)
}
// Obtain a lock with a timeout.
lockDir := path + ".lock"
log = log.With("lock", lockDir)
err := os.Mkdir(lockDir, 0777)
if err == nil {
defer os.RemoveAll(lockDir)
log.DebugContext(ctx, fmt.Sprintf("acquired %s", lockDir))
if err := fn(); err != nil {
return errors.Wrap(err)
}
log.DebugContext(ctx, fmt.Sprintf("released %s", lockDir))
return nil
}
// Wait until the lock is released then return.
if os.IsExist(err) {
log.DebugContext(ctx, fmt.Sprintf("blocked %s", lockDir))
stillBlocked := time.After(5 * time.Second)
deadLocked := time.After(10 * time.Second)
for {
select {
case <-stillBlocked:
log.WarnContext(ctx, fmt.Sprintf("waiting for %s to be released", lockDir))
case <-deadLocked:
log.WarnContext(ctx, fmt.Sprintf("still waiting for %s to be released (dead lock?)", lockDir))
case <-time.After(100 * time.Millisecond):
if _, err := os.Stat(lockDir); os.IsNotExist(err) {
log.DebugContext(ctx, fmt.Sprintf("unblocked %s", lockDir))
return nil
}
case <-ctx.Done():
return errors.Wrap(ctx.Err())
}
}
}
// Unexpected error
return errors.Wrap(err)
}
func cacheChart(ctx context.Context, cacheDir string, chart core.Chart, stderr io.Writer) error {
log := logger.FromContext(ctx)
// Add repositories
repo := chart.Repository
stderr := b.Opts.Stderr
if repo.URL == "" {
// repo update not needed for oci charts so this is debug instead of warn.
log.DebugContext(ctx, "skipped helm repo add and update: repo url is empty")
@@ -571,51 +622,82 @@ func (b *BuildPlan) cacheChart(
return nil
}
// onceWithLock obtains a filesystem lock with mkdir, then executes fn. If the
// lock is already locked, onceWithLock waits for it to be released then returns
// without calling fn.
func onceWithLock(log *slog.Logger, ctx context.Context, path string, fn func() error) error {
if err := os.MkdirAll(filepath.Dir(path), 0777); err != nil {
func kustomize(ctx context.Context, t core.Transformer, p taskParams) error {
tempDir, err := os.MkdirTemp("", "holos.kustomize")
if err != nil {
return errors.Wrap(err)
}
defer util.Remove(ctx, tempDir)
msg := fmt.Sprintf(
"could not transform %s for %s path %s",
t.Output,
p.buildPlanName,
p.opts.Path,
)
// Obtain a lock with a timeout.
lockDir := path + ".lock"
log = log.With("lock", lockDir)
err := os.Mkdir(lockDir, 0777)
if err == nil {
defer os.RemoveAll(lockDir)
log.DebugContext(ctx, fmt.Sprintf("acquired %s", lockDir))
if err := fn(); err != nil {
return errors.Wrap(err)
}
log.DebugContext(ctx, fmt.Sprintf("released %s", lockDir))
return nil
// Write the kustomization
data, err := yaml.Marshal(t.Kustomize.Kustomization)
if err != nil {
return errors.Format("%s: %w", msg, err)
}
path := filepath.Join(tempDir, "kustomization.yaml")
if err := os.WriteFile(path, data, 0666); err != nil {
return errors.Format("%s: %w", msg, err)
}
// Wait until the lock is released then return.
if os.IsExist(err) {
log.DebugContext(ctx, fmt.Sprintf("blocked %s", lockDir))
stillBlocked := time.After(5 * time.Second)
deadLocked := time.After(10 * time.Second)
for {
select {
case <-stillBlocked:
log.WarnContext(ctx, fmt.Sprintf("waiting for %s to be released", lockDir))
case <-deadLocked:
log.WarnContext(ctx, fmt.Sprintf("still waiting for %s to be released (dead lock?)", lockDir))
case <-time.After(100 * time.Millisecond):
if _, err := os.Stat(lockDir); os.IsNotExist(err) {
log.DebugContext(ctx, fmt.Sprintf("unblocked %s", lockDir))
return nil
}
case <-ctx.Done():
return errors.Wrap(ctx.Err())
}
// Write the inputs
for _, input := range t.Inputs {
path := string(input)
if err := p.opts.Store.Save(tempDir, path); err != nil {
return errors.Format("%s: %w", msg, err)
}
}
// Unexpected error
return errors.Wrap(err)
// Execute kustomize
r, err := util.RunCmdW(ctx, p.opts.Stderr, "kubectl", "kustomize", tempDir)
if err != nil {
return errors.Format("%s: could not run kustomize: %w", msg, err)
}
// Store the artifact
if err := p.opts.Store.Set(string(t.Output), r.Stdout.Bytes()); err != nil {
return errors.Format("%s: %w", msg, err)
}
return nil
}
func validate(ctx context.Context, validator core.Validator, p taskParams) error {
store := p.opts.Store
tempDir, err := os.MkdirTemp("", "holos.validate")
if err != nil {
return errors.Wrap(err)
}
// defer util.Remove(ctx, tempDir)
msg := fmt.Sprintf("could not validate %s", p.id())
// Write the inputs
for _, input := range validator.Inputs {
path := string(input)
if err := store.Save(tempDir, path); err != nil {
return errors.Format("%s: %w", msg, err)
}
}
if len(validator.Command.Args) < 1 {
return errors.Format("%s: command args length must be at least 1", msg)
}
size := len(validator.Command.Args) + len(validator.Inputs)
args := make([]string, 0, size)
args = append(args, validator.Command.Args...)
for _, input := range validator.Inputs {
args = append(args, filepath.Join(tempDir, string(input)))
}
// Execute the validator
if _, err = util.RunCmdA(ctx, p.opts.Stderr, args[0], args[1:]...); err != nil {
return errors.Format("%s: %w", msg, err)
}
return nil
}

View File

@@ -4,6 +4,9 @@ import (
"context"
"fmt"
"log/slog"
"os"
"runtime/pprof"
"runtime/trace"
"connectrpc.com/connect"
cue "cuelang.org/go/cue/errors"
@@ -12,12 +15,48 @@ import (
"google.golang.org/genproto/googleapis/rpc/errdetails"
)
func memProfile(ctx context.Context, cfg *holos.Config) {
if format := os.Getenv("HOLOS_MEM_PROFILE"); format != "" {
f, _ := os.Create(fmt.Sprintf(format, os.Getppid(), os.Getpid()))
defer f.Close()
if err := pprof.WriteHeapProfile(f); err != nil {
_ = HandleError(ctx, err, cfg)
}
}
}
// MakeMain makes a main function for the cli or tests.
func MakeMain(options ...holos.Option) func() int {
return func() (exitCode int) {
cfg := holos.New(options...)
slog.SetDefault(cfg.Logger())
ctx := context.Background()
if format := os.Getenv("HOLOS_CPU_PROFILE"); format != "" {
f, _ := os.Create(fmt.Sprintf(format, os.Getppid(), os.Getpid()))
err := pprof.StartCPUProfile(f)
defer func() {
pprof.StopCPUProfile()
f.Close()
}()
if err != nil {
return HandleError(ctx, err, cfg)
}
}
defer memProfile(ctx, cfg)
if format := os.Getenv("HOLOS_TRACE"); format != "" {
f, _ := os.Create(fmt.Sprintf(format, os.Getppid(), os.Getpid()))
err := trace.Start(f)
defer func() {
trace.Stop()
f.Close()
}()
if err != nil {
return HandleError(ctx, err, cfg)
}
}
feature := &holos.EnvFlagger{}
if err := New(cfg, feature).ExecuteContext(ctx); err != nil {
return HandleError(ctx, err, cfg)

View File

@@ -1 +1 @@
0
1