Files
hakurei/internal/pkg/exec.go
Ophestra e661260607
All checks were successful
Test / Create distribution (push) Successful in 1m4s
Test / Sandbox (push) Successful in 2m40s
Test / Hakurei (push) Successful in 3m38s
Test / ShareFS (push) Successful in 3m42s
Test / Sandbox (race detector) (push) Successful in 5m21s
Test / Hakurei (race detector) (push) Successful in 6m20s
Test / Flake checks (push) Successful in 1m22s
internal/pkg: enter exec container
This enables much easier troubleshooting of failing cures.

Signed-off-by: Ophestra <cat@gensokyo.uk>
2026-03-26 15:05:04 +09:00

673 lines
16 KiB
Go

package pkg
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path"
"slices"
"strconv"
"syscall"
"time"
"unique"
"hakurei.app/check"
"hakurei.app/container"
"hakurei.app/container/seccomp"
"hakurei.app/container/std"
"hakurei.app/ext"
"hakurei.app/fhs"
"hakurei.app/message"
)
// AbsWork is the container pathname [TContext.GetWorkDir] is mounted on.
var AbsWork = fhs.AbsRoot.Append("work/")
// ExecPath pairs a [check.Absolute] pathname in the container with the slice
// of [Artifact] to make available under it.
type ExecPath struct {
// Pathname in the container mount namespace.
P *check.Absolute
// Artifacts to mount on the pathname, must contain at least one [Artifact].
// If there are multiple entries or W is true, P is set up as an overlay
// mount, and entries of A must not implement [FileArtifact]. A single entry
// with W false is bind mounted instead.
A []Artifact
// Whether to make the mount point writable via the temp directory: the
// upper and work directories of the resulting overlay are created under it.
// Ignored for the final entry targeting [AbsWork].
W bool
}
// SetSchedIdle is whether to set [ext.SCHED_IDLE] scheduling priority.
var SetSchedIdle bool
// GetArtifactFunc is the function signature of [FContext.GetArtifact]. It
// returns the host pathname of a cured [Artifact] and a handle to its checksum.
type GetArtifactFunc func(Artifact) (*check.Absolute, unique.Handle[Checksum])
// PromoteLayers returns artifacts with identical-by-content layers promoted to
// the highest priority instance, as if mounted via [ExecPath].
//
// Later artifacts take priority over earlier ones. Walking from the end of
// artifacts, the first artifact seen for each checksum is kept. Every earlier
// artifact sharing that checksum is dropped and passed to report along with its
// index. getArtifact is called exactly once per artifact, in reverse order. The
// returned pathnames preserve the relative order of the kept artifacts.
func PromoteLayers(
	artifacts []Artifact,
	getArtifact GetArtifactFunc,
	report func(i int, d Artifact),
) []*check.Absolute {
	seen := make(map[unique.Handle[Checksum]]struct{}, len(artifacts))
	kept := make([]*check.Absolute, 0, len(artifacts))
	for i := len(artifacts) - 1; i >= 0; i-- {
		pathname, checksum := getArtifact(artifacts[i])
		if _, dup := seen[checksum]; dup {
			report(i, artifacts[i])
			continue
		}
		seen[checksum] = struct{}{}
		kept = append(kept, pathname)
	}
	// kept was collected back to front; restore the original order.
	slices.Reverse(kept)
	return kept
}
// layers returns the pathnames of p.A deduplicated via [PromoteLayers]. Every
// dropped entry is reported on msg, but only when msg is verbose.
func (p *ExecPath) layers(
	msg message.Msg,
	getArtifact GetArtifactFunc,
	ident func(a Artifact) unique.Handle[ID],
) []*check.Absolute {
	report := func(i int, d Artifact) {
		if !msg.IsVerbose() {
			return
		}
		msg.Verbosef("promoted layer %d as %s", i, reportName(d, ident(d)))
	}
	return PromoteLayers(p.A, getArtifact, report)
}
// Path returns an [ExecPath] mounting the artifacts a on pathname, made
// writable via the temp directory when writable is true.
func Path(pathname *check.Absolute, writable bool, a ...Artifact) ExecPath {
	return ExecPath{P: pathname, A: a, W: writable}
}

// MustPath is like [Path], but takes a string pathname via [check.MustAbs].
func MustPath(pathname string, writable bool, a ...Artifact) ExecPath {
	return Path(check.MustAbs(pathname), writable, a...)
}
const (
	// ExecTimeoutDefault replaces zero or negative [NewExec] timeout values.
	ExecTimeoutDefault = 15 * time.Minute
	// ExecTimeoutMax is the arbitrary upper bound of [NewExec] timeout. Larger
	// timeout values are clamped to it rather than replaced by
	// [ExecTimeoutDefault].
	ExecTimeoutMax = 48 * time.Hour
)
// An execArtifact is an [Artifact] that produces its output by running a
// program, provided by another [Artifact], in a [container].
//
// Methods of execArtifact do not modify any struct field or underlying arrays
// referred to by slices.
//
// NOTE: [NewExec] constructs this type with a positional composite literal, so
// the field order below is significant.
type execArtifact struct {
// Caller-supplied user-facing reporting name, guaranteed to be nonzero
// during initialisation.
name string
// Caller-supplied inner mount points.
paths []ExecPath
// Working directory of the initial process, passed through to
// [container.Params].
dir *check.Absolute
// Environment of the initial process, passed through to [container.Params].
env []string
// Pathname of the program to execute, passed through to [container.Params].
path *check.Absolute
// Arguments of the initial process, passed through to [container.Params].
args []string
// Duration the initial process is allowed to run. The zero value is
// equivalent to [ExecTimeoutDefault].
timeout time.Duration
// Caller-supplied exclusivity value, returned as is by IsExclusive.
exclusive bool
}
// Compile-time assertion that execArtifact implements [fmt.Stringer].
var _ fmt.Stringer = new(execArtifact)
// execNetArtifact is like execArtifact but implements [KnownChecksum] and has
// its resulting container keep the host net namespace.
//
// NOTE: [NewExec] constructs this type with a positional composite literal, so
// the field order below is significant.
type execNetArtifact struct {
// Caller-supplied checksum, returned as is by Checksum.
checksum Checksum
execArtifact
}
// Compile-time assertion that execNetArtifact implements [KnownChecksum].
var _ KnownChecksum = new(execNetArtifact)
// Checksum returns the caller-supplied checksum.
func (a *execNetArtifact) Checksum() Checksum { return a.checksum }
// Kind returns the hardcoded [Kind] constant.
func (*execNetArtifact) Kind() Kind { return KindExecNet }
// Cure cures the [Artifact] in the container described by the caller. The
// container retains host networking, unlike the embedded execArtifact's Cure.
func (a *execNetArtifact) Cure(f *FContext) error {
return a.cure(f, true)
}
// NewExec returns a new [Artifact] that executes the program pathname in a
// container with the specified paths mounted in order. Paths are read-only
// unless [ExecPath.W] is set. A private instance of /proc and /dev is made
// available to the container. An empty name defaults to "exec-" followed by
// the base name of pathname.
//
// The working and temporary directories are both created and mounted writable
// on [AbsWork] and [fhs.AbsTmp] respectively. If one or more paths target
// [AbsWork], the final entry is set up as a writable overlay mount on /work for
// which the upperdir is the host side work directory. In this configuration,
// the W field is ignored, and the program must avoid causing whiteout files to
// be created. Cure fails if upperdir ends up with entries other than
// directory, regular or symlink.
//
// If checksum is non-nil, the resulting [Artifact] implements [KnownChecksum]
// and its container runs in the host net namespace.
//
// The container is allowed to run for the specified duration before the
// initial process and all processes originating from it are terminated. A
// zero or negative timeout value is equivalent to [ExecTimeoutDefault]. A
// timeout value greater than [ExecTimeoutMax] is equivalent to
// [ExecTimeoutMax].
//
// The user-facing name and exclusivity value are not accessible from the
// container and do not affect the curing outcome. Both are nonetheless written
// to parameter data by [execArtifact.Params]. NOTE(review): confirm whether
// they are meant to contribute to the identifier.
func NewExec(
	name string,
	checksum *Checksum,
	timeout time.Duration,
	exclusive bool,
	dir *check.Absolute,
	env []string,
	pathname *check.Absolute,
	args []string,
	paths ...ExecPath,
) Artifact {
	if name == "" {
		name = "exec-" + path.Base(pathname.String())
	}
	if timeout <= 0 {
		timeout = ExecTimeoutDefault
	}
	timeout = min(timeout, ExecTimeoutMax)

	a := execArtifact{name, paths, dir, env, pathname, args, timeout, exclusive}
	if checksum == nil {
		return &a
	}
	return &execNetArtifact{*checksum, a}
}
// Kind returns the hardcoded [Kind] constant.
func (*execArtifact) Kind() Kind { return KindExec }

// Params writes, in order:
//   - the reporting name;
//   - each [ExecPath]: its pathname, its artifact identifiers and its writable
//     flag;
//   - the working directory;
//   - the environment;
//   - the executable pathname;
//   - the arguments;
//   - the timeout, as its low and then high 32 bits;
//   - the exclusivity value.
//
// Slices are prefixed by their length, and booleans are written as 1 or 0. A
// nil [ExecPath.P] is written as a placeholder string.
func (a *execArtifact) Params(ctx *IContext) {
	writeBool := func(v bool) {
		var u uint32
		if v {
			u = 1
		}
		ctx.WriteUint32(u)
	}
	writeStrings := func(values []string) {
		ctx.WriteUint32(uint32(len(values)))
		for _, v := range values {
			ctx.WriteString(v)
		}
	}

	ctx.WriteString(a.name)

	ctx.WriteUint32(uint32(len(a.paths)))
	for _, p := range a.paths {
		pathname := "invalid P\x00"
		if p.P != nil {
			pathname = p.P.String()
		}
		ctx.WriteString(pathname)

		ctx.WriteUint32(uint32(len(p.A)))
		for _, d := range p.A {
			ctx.WriteIdent(d)
		}
		writeBool(p.W)
	}

	ctx.WriteString(a.dir.String())
	writeStrings(a.env)
	ctx.WriteString(a.path.String())
	writeStrings(a.args)

	ctx.WriteUint32(uint32(a.timeout & 0xffffffff))
	ctx.WriteUint32(uint32(a.timeout >> 32))
	writeBool(a.exclusive)
}
// readExecArtifact decodes the values written by [execArtifact.Params] from r
// and returns the resulting [Artifact] via [NewExec]. The result is an
// execNetArtifact when net is true and an execArtifact otherwise.
//
// It panics if any of the following hold:
//   - a length prefix exceeds its IR limit;
//   - a pathname is rejected by [check.MustAbs];
//   - the presence of a checksum disagrees with net.
func readExecArtifact(r *IRReader, net bool) Artifact {
	r.DiscardAll()

	// readLen reads a length prefix, panicking with e if it exceeds limit.
	readLen := func(limit uint32, e any) uint32 {
		n := r.ReadUint32()
		if n > limit {
			panic(e)
		}
		return n
	}
	// readStrings reads a length-prefixed list of strings.
	readStrings := func() []string {
		values := make([]string, readLen(irMaxValues, ErrIRValues))
		for i := range values {
			values[i] = r.ReadString()
		}
		return values
	}

	name := r.ReadString()

	paths := make([]ExecPath, readLen(irMaxDeps, ErrIRDepend))
	for i := range paths {
		p := &paths[i]
		p.P = check.MustAbs(r.ReadString())
		p.A = make([]Artifact, readLen(irMaxDeps, ErrIRDepend))
		for j := range p.A {
			p.A[j] = r.ReadIdent()
		}
		p.W = r.ReadUint32() != 0
	}

	dir := check.MustAbs(r.ReadString())
	env := readStrings()
	pathname := check.MustAbs(r.ReadString())
	args := readStrings()

	lo, hi := r.ReadUint32(), r.ReadUint32()
	timeout := time.Duration(lo) | time.Duration(hi)<<32
	exclusive := r.ReadUint32() != 0

	checksum, ok := r.Finalise()
	if ok != net {
		if net {
			panic(ErrExpectedChecksum)
		}
		panic(ErrUnexpectedChecksum)
	}
	var checksumP *Checksum
	if net {
		value := checksum.Value()
		checksumP = &value
	}

	return NewExec(
		name, checksumP, timeout, exclusive, dir, env, pathname, args, paths...,
	)
}
func init() {
register(KindExec,
func(r *IRReader) Artifact { return readExecArtifact(r, false) })
register(KindExecNet,
func(r *IRReader) Artifact { return readExecArtifact(r, true) })
}
// Dependencies returns every [Artifact] of the caller-supplied [ExecPath]
// values, in path order, in a newly allocated slice. It returns nil when there
// are none.
func (a *execArtifact) Dependencies() []Artifact {
	var deps []Artifact
	for _, p := range a.paths {
		deps = append(deps, p.A...)
	}
	return deps
}
// IsExclusive reports the caller-supplied exclusivity value.
func (a *execArtifact) IsExclusive() bool {
	return a.exclusive
}

// String returns the caller-supplied reporting name.
func (a *execArtifact) String() string {
	return a.name
}

// Cure cures the [Artifact] in the container described by the caller, without
// keeping the host net namespace.
func (a *execArtifact) Cure(f *FContext) error {
	return a.cure(f, false)
}
// execWaitDelay is the wait delay passed through to [container.Params].
const execWaitDelay = time.Nanosecond
// scanVerbose reads r line by line and logs each line on msg after prefix,
// closing done when it returns. Lines of up to bufio.MaxScanTokenSize<<12
// bytes are accepted. A read failure other than [os.ErrClosed] calls cancel
// and is logged with "*" prepended to prefix.
func scanVerbose(
	msg message.Msg,
	cancel context.CancelFunc,
	done chan<- struct{},
	prefix string,
	r io.Reader,
) {
	defer close(done)

	scanner := bufio.NewScanner(r)
	scanner.Buffer(make([]byte, bufio.MaxScanTokenSize), bufio.MaxScanTokenSize<<12)
	for scanner.Scan() {
		msg.Verbose(prefix, scanner.Text())
	}

	err := scanner.Err()
	if err == nil || errors.Is(err, os.ErrClosed) {
		return
	}
	cancel()
	msg.Verbose("*"+prefix, err)
}
// ErrInvalidPaths is returned for an [Artifact] of [KindExec] or [KindExecNet]
// whose paths contain an entry with a nil pathname or no artifacts.
var ErrInvalidPaths = errors.New("invalid mount point")
// SeccompPresets is the [seccomp] presets used by exec artifacts: the strict
// preset with [std.PresetDenyNS] and [std.PresetDenyDevel] cleared.
const SeccompPresets = std.PresetStrict &^ (std.PresetDenyNS | std.PresetDenyDevel)
// makeContainer sets up the specified temp and work directories and returns the
// corresponding [container.Container] that would have run for cure.
//
// Every [ExecPath] must have a non-nil pathname and at least one [Artifact],
// otherwise [ErrInvalidPaths] is returned before anything is created. Paths
// are mounted in order:
//
//   - the last path targeting [AbsWork] becomes a writable overlay on
//     [AbsWork] with work as its upperdir, and its W field is ignored;
//   - a path with W set becomes an overlay whose upper and work directories
//     are created under temp;
//   - a path holding a single artifact is bind mounted;
//   - any other path becomes a read-only overlay.
//
// Overlay layers are deduplicated by checksum via the layers method of
// [ExecPath]. If no path targets [AbsWork], work is bind mounted writable on
// it instead. temp is bind mounted writable on [fhs.AbsTmp], followed by /proc
// and /dev. If creating a directory fails, the partially configured container
// is returned alongside the error.
func (a *execArtifact) makeContainer(
	ctx context.Context,
	msg message.Msg,
	hostNet bool,
	temp, work *check.Absolute,
	getArtifact GetArtifactFunc,
	ident func(a Artifact) unique.Handle[ID],
) (z *container.Container, err error) {
	overlayWorkIndex := -1
	for i, p := range a.paths {
		if p.P == nil || len(p.A) == 0 {
			return nil, ErrInvalidPaths
		}
		if p.P.Is(AbsWork) {
			overlayWorkIndex = i
		}
	}

	z = container.New(ctx, msg)
	z.WaitDelay = execWaitDelay
	z.SeccompPresets = SeccompPresets
	z.SeccompFlags |= seccomp.AllowMultiarch
	z.ParentPerm = 0700
	z.HostNet = hostNet
	z.Hostname = "cure"
	z.SetScheduler = SetSchedIdle
	z.SchedPolicy = ext.SCHED_IDLE
	if z.HostNet {
		z.Hostname = "cure-net"
	}
	z.Uid, z.Gid = (1<<10)-1, (1<<10)-1
	z.Dir, z.Env, z.Path, z.Args = a.dir, a.env, a.path, a.args

	// Presumably reserves room for one entry per path plus the work and temp
	// binds, /proc and /dev.
	z.Grow(len(a.paths) + 4)
	for i, b := range a.paths {
		if i == overlayWorkIndex {
			if err = os.MkdirAll(work.String(), 0700); err != nil {
				return
			}
			// Kept as a sibling of the per-path workdirs below rather than
			// their parent. Overlayfs rejects, or warns about, a workdir
			// nested under another mount's in-use workdir.
			tempWork := temp.Append(".work", "work")
			if err = os.MkdirAll(tempWork.String(), 0700); err != nil {
				return
			}
			z.Overlay(
				AbsWork,
				work,
				tempWork,
				b.layers(msg, getArtifact, ident)...,
			)
			continue
		}

		switch {
		case b.W:
			tempUpper := temp.Append(".upper", strconv.Itoa(i))
			tempWork := temp.Append(".work", strconv.Itoa(i))
			if err = os.MkdirAll(tempUpper.String(), 0700); err != nil {
				return
			}
			if err = os.MkdirAll(tempWork.String(), 0700); err != nil {
				return
			}
			z.Overlay(b.P, tempUpper, tempWork, b.layers(msg, getArtifact, ident)...)

		case len(b.A) == 1:
			pathname, _ := getArtifact(b.A[0])
			z.Bind(pathname, b.P, 0)

		default:
			z.OverlayReadonly(b.P, b.layers(msg, getArtifact, ident)...)
		}
	}

	if overlayWorkIndex < 0 {
		z.Bind(
			work,
			AbsWork,
			std.BindWritable|std.BindEnsure,
		)
	}
	z.Bind(
		temp,
		fhs.AbsTmp,
		std.BindWritable|std.BindEnsure,
	)
	z.Proc(fhs.AbsProc).Dev(fhs.AbsDev, true)
	return
}
// ErrExecBusy is returned by [Cache.EnterExec] while another call to it has
// not yet returned.
var ErrExecBusy = errors.New("scratch directories in use")

// ErrNotExec is returned by [Cache.EnterExec] for an [Artifact] implementation
// other than those of [KindExec] and [KindExecNet].
var ErrNotExec = errors.New("attempting to run a non-exec artifact")
// EnterExec runs the container of an [Artifact] of [KindExec] or [KindExecNet]
// with its entry point, arguments, and standard streams replaced with values
// supplied by the caller.
//
// Every dependency of a is cured first. The container is then set up in the
// scratch directory under the cache base, which is removed on return once
// created. Only one call may be in progress at a time: a concurrent call
// returns [ErrExecBusy], and an unsupported [Artifact] returns [ErrNotExec].
// Unlike curing, the timeout of a is not applied and the work directory is not
// checked for output. retainSession is passed through to the container.
// Errors removing the scratch directory are joined with the returned error.
func (c *Cache) EnterExec(
ctx context.Context,
a Artifact,
retainSession bool,
stdin io.Reader,
stdout, stderr io.Writer,
path *check.Absolute,
args ...string,
) (err error) {
// The scratch directories are shared, so refuse a second in-flight call.
if !c.inExec.CompareAndSwap(false, true) {
return ErrExecBusy
}
// Deferred first so it runs last, after the scratch directory is removed.
defer c.inExec.Store(false)
var hostNet bool
var e *execArtifact
switch f := a.(type) {
case *execArtifact:
e = f
case *execNetArtifact:
// Same as during its cure: the container keeps the host net namespace.
e = &f.execArtifact
hostNet = true
default:
return ErrNotExec
}
// Cure all dependencies together. NOTE(review): presumably curing a
// [Collect] value always ends in an error recognised by [IsCollected] once
// its members are cured, hence success being treated as unreachable.
deps := Collect(a.Dependencies())
if _, _, err = c.Cure(&deps); err == nil {
return errors.New("unreachable")
} else if !IsCollected(err) {
return
}
// Record the pathname and checksum of every dependency so the lookup
// function passed to makeContainer can resolve them without an [FContext].
dm := make(map[Artifact]cureRes)
for i, p := range deps {
var res cureRes
res.pathname, res.checksum, err = c.Cure(p)
if err != nil {
return
}
dm[deps[i]] = res
}
scratch := c.base.Append(dirExecScratch)
temp, work := scratch.Append("temp"), scratch.Append("work")
// work created during makeContainer
if err = os.MkdirAll(temp.String(), 0700); err != nil {
return
}
// Remove the whole scratch directory on return, surfacing any failure to do
// so alongside the original error.
defer func() {
if chmodErr, removeErr := removeAll(scratch); chmodErr != nil || removeErr != nil {
err = errors.Join(err, chmodErr, removeErr)
}
}()
var z *container.Container
z, err = e.makeContainer(
ctx, c.msg,
hostNet,
temp, work,
// Every artifact looked up here must be a dependency cured above; any
// other lookup indicates a bug and panics.
func(a Artifact) (*check.Absolute, unique.Handle[Checksum]) {
if res, ok := dm[a]; ok {
return res.pathname, res.checksum
}
panic(InvalidLookupError(c.Ident(a).Value()))
},
c.Ident,
)
if err != nil {
return
}
// Replace the entry point, arguments and standard streams of the artifact
// with those supplied by the caller.
z.Stdin, z.Stdout, z.Stderr = stdin, stdout, stderr
z.Path, z.Args = path, args
z.RetainSession = retainSession
if err = z.Start(); err != nil {
return
}
if err = z.Serve(); err != nil {
return
}
return z.Wait()
}
// cure is like Cure but allows optional host net namespace.
//
// The container runs under a context bounded by a.timeout, with its output
// written to the status writer of f. When the message is verbose, stdout and
// stderr are additionally logged line by line. After the program exits
// successfully, the work directory is removed if it is empty, so that an
// empty output does not count as a successful cure.
func (a *execArtifact) cure(f *FContext, hostNet bool) (err error) {
	ctx, cancel := context.WithTimeout(f.Unwrap(), a.timeout)
	defer cancel()

	msg := f.GetMessage()
	var z *container.Container
	if z, err = a.makeContainer(
		ctx, msg, hostNet,
		f.GetTempDir(), f.GetWorkDir(),
		f.GetArtifact,
		f.cache.Ident,
	); err != nil {
		return
	}

	var status io.Writer
	if status, err = f.GetStatusWriter(); err != nil {
		return
	}

	if msg.IsVerbose() {
		var stdout, stderr io.ReadCloser
		if stdout, err = z.StdoutPipe(); err != nil {
			return
		}
		if stderr, err = z.StderrPipe(); err != nil {
			_ = stdout.Close()
			return
		}

		brStdout, brStderr := f.cache.getReader(stdout), f.cache.getReader(stderr)
		stdoutDone, stderrDone := make(chan struct{}), make(chan struct{})
		go scanVerbose(
			msg, cancel, stdoutDone,
			"("+a.name+":1)",
			io.TeeReader(brStdout, status),
		)
		go scanVerbose(
			msg, cancel, stderrDone,
			"("+a.name+":2)",
			io.TeeReader(brStderr, status),
		)
		defer func() {
			// On failures other than the program exiting unsuccessfully, the
			// pipes may never reach EOF, for instance when Serve fails while
			// the process keeps running. Close them before waiting on the
			// scanners. Otherwise this would block until the deferred cancel,
			// which only runs after this function.
			if err != nil && !errors.As(err, new(*exec.ExitError)) {
				_ = stdout.Close()
				_ = stderr.Close()
			}
			<-stdoutDone
			<-stderrDone
			f.cache.putReader(brStdout)
			f.cache.putReader(brStderr)
		}()
	} else {
		z.Stdout, z.Stderr = status, status
	}

	if err = z.Start(); err != nil {
		return
	}
	if err = z.Serve(); err != nil {
		return
	}
	if err = z.Wait(); err != nil {
		return
	}

	// Do not allow empty directories to succeed. An empty work directory is
	// removed, presumably causing the caller to fail on the missing output,
	// while a non-empty one is left in place.
	for {
		err = syscall.Rmdir(f.GetWorkDir().String())
		if err != syscall.EINTR {
			break
		}
	}
	if errors.Is(err, syscall.ENOTEMPTY) {
		err = nil
	}
	return
}