Files
hakurei/internal/pkg/exec.go
Ophestra a6600be34a
All checks were successful
Test / Create distribution (push) Successful in 1m17s
Test / Sandbox (push) Successful in 3m5s
Test / Hakurei (push) Successful in 4m12s
Test / ShareFS (push) Successful in 4m25s
Test / Sandbox (race detector) (push) Successful in 5m39s
Test / Hakurei (race detector) (push) Successful in 6m44s
Test / Flake checks (push) Successful in 1m24s
all: use filepath
This makes package check portable, and removes nonportable behaviour from packages pkg, pipewire, and system. All other packages remain nonportable due to their nature. No latency increase was observed due to this change on amd64 and arm64 Linux.

Signed-off-by: Ophestra <cat@gensokyo.uk>
2026-03-30 18:24:53 +09:00

673 lines
16 KiB
Go

package pkg
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"slices"
"strconv"
"syscall"
"time"
"unique"
"hakurei.app/check"
"hakurei.app/container"
"hakurei.app/container/seccomp"
"hakurei.app/container/std"
"hakurei.app/ext"
"hakurei.app/fhs"
"hakurei.app/message"
)
// AbsWork is the container pathname [TContext.GetWorkDir] is mounted on.
var AbsWork = fhs.AbsRoot.Append("work/")

// ExecPath is a slice of [Artifact] together with the [check.Absolute]
// pathname at which to make them available in the container.
type ExecPath struct {
	// Pathname in the container mount namespace.
	P *check.Absolute
	// Artifacts to mount on the pathname, must contain at least one [Artifact].
	// If there are multiple entries or W is true, P is set up as an overlay
	// mount, and entries of A must not implement [FileArtifact].
	A []Artifact
	// Whether to make the mount point writable via the temp directory. Ignored
	// for the entry that ends up as the writable overlay on [AbsWork].
	W bool
}

// SetSchedIdle is whether to set [ext.SCHED_IDLE] scheduling priority.
var SetSchedIdle bool

// GetArtifactFunc is the function signature of [FContext.GetArtifact].
type GetArtifactFunc func(Artifact) (*check.Absolute, unique.Handle[Checksum])
// PromoteLayers returns artifacts with identical-by-content layers promoted to
// the highest priority instance, as if mounted via [ExecPath].
//
// Entries are examined from the last one backwards, and the last (highest
// priority) occurrence of each checksum reported by getArtifact is retained.
// Every earlier duplicate is dropped and handed to report along with its index
// in artifacts. The returned pathnames keep the relative order of the retained
// entries.
func PromoteLayers(
	artifacts []Artifact,
	getArtifact GetArtifactFunc,
	report func(i int, d Artifact),
) []*check.Absolute {
	seen := make(map[unique.Handle[Checksum]]struct{}, len(artifacts))
	kept := make([]*check.Absolute, 0, len(artifacts))

	for idx := len(artifacts) - 1; idx >= 0; idx-- {
		layer := artifacts[idx]
		pathname, sum := getArtifact(layer)
		if _, dup := seen[sum]; dup {
			report(idx, layer)
			continue
		}
		seen[sum] = struct{}{}
		kept = append(kept, pathname)
	}

	// collected back to front, restore the original ordering
	slices.Reverse(kept)
	return kept
}
// layers returns pathnames collected from A deduplicated via [PromoteLayers].
// When msg is verbose, every dropped duplicate is reported on it, named via
// ident.
func (p *ExecPath) layers(
	msg message.Msg,
	getArtifact GetArtifactFunc,
	ident func(a Artifact) unique.Handle[ID],
) []*check.Absolute {
	reportPromoted := func(i int, d Artifact) {
		if !msg.IsVerbose() {
			return
		}
		msg.Verbosef("promoted layer %d as %s", i, reportName(d, ident(d)))
	}
	return PromoteLayers(p.A, getArtifact, reportPromoted)
}
// Path returns an [ExecPath] mounting the artifacts a on pathname, writable
// if requested.
func Path(pathname *check.Absolute, writable bool, a ...Artifact) ExecPath {
	return ExecPath{P: pathname, A: a, W: writable}
}

// MustPath is like [Path], but takes a string pathname via [check.MustAbs].
func MustPath(pathname string, writable bool, a ...Artifact) ExecPath {
	return Path(check.MustAbs(pathname), writable, a...)
}
// Bounds applied by [NewExec] to its timeout argument.
const (
	// ExecTimeoutDefault replaces zero or negative [NewExec] timeout values.
	ExecTimeoutDefault = time.Minute * 15
	// ExecTimeoutMax is the arbitrary upper bound of [NewExec] timeout.
	ExecTimeoutMax = 2 * 24 * time.Hour
)
// An execArtifact is an [Artifact] that produces its output by running a
// program, itself part of another [Artifact], in a [container].
//
// Methods of execArtifact do not modify any struct field or underlying arrays
// referred to by slices.
type execArtifact struct {
	// Caller-supplied user-facing reporting name, guaranteed to be nonzero
	// during initialisation.
	name string

	// Caller-supplied inner mount points.
	paths []ExecPath

	// Passed through to [container.Params].
	dir *check.Absolute
	// Passed through to [container.Params].
	env []string
	// Passed through to [container.Params].
	path *check.Absolute
	// Passed through to [container.Params].
	args []string

	// Duration the initial process is allowed to run. The zero value is
	// equivalent to [ExecTimeoutDefault].
	timeout time.Duration

	// Caller-supplied exclusivity value, returned as is by IsExclusive.
	exclusive bool
}

var _ fmt.Stringer = new(execArtifact)
// execNetArtifact is like execArtifact but implements [KnownChecksum] and has
// its resulting container keep the host net namespace.
type execNetArtifact struct {
	// Caller-supplied checksum, returned as is by Checksum.
	checksum Checksum
	execArtifact
}

var _ KnownChecksum = new(execNetArtifact)

// Checksum returns the caller-supplied checksum.
func (a *execNetArtifact) Checksum() Checksum {
	return a.checksum
}

// Kind returns [KindExecNet], overriding the [Kind] of the embedded
// execArtifact.
func (*execNetArtifact) Kind() Kind {
	return KindExecNet
}

// Cure cures the [Artifact] in the container described by the caller. Unlike
// execArtifact, the container keeps the host net namespace.
func (a *execNetArtifact) Cure(f *FContext) error {
	const hostNet = true
	return a.cure(f, hostNet)
}
// NewExec returns a new [Artifact] that executes the program pathname with
// args in a container with the specified paths mounted in order: read-only
// bind mounts, or overlay mounts as described on [ExecPath]. A private
// instance of /proc and /dev is made available to the container.
//
// If name is empty, it defaults to "exec-" followed by the base name of
// pathname.
//
// The working and temporary directories are both created and mounted writable
// on [AbsWork] and [fhs.AbsTmp] respectively. If one or more paths target
// [AbsWork], the final such entry is set up as a writable overlay mount on
// /work for which the upperdir is the host side work directory. In this
// configuration, the W field is ignored, and the program must avoid causing
// whiteout files to be created. Cure fails if upperdir ends up with entries
// other than directory, regular or symlink.
//
// If checksum is non-nil, the resulting [Artifact] implements [KnownChecksum]
// and its container runs in the host net namespace.
//
// The container is allowed to run for the specified duration before the
// initial process and all processes originating from it are terminated. A zero
// or negative timeout value is equivalent to [ExecTimeoutDefault], and a
// timeout value greater than [ExecTimeoutMax] is equivalent to
// [ExecTimeoutMax].
//
// The user-facing name and exclusivity value are not accessible from the
// container and do not affect the curing outcome.
// NOTE(review): Params nevertheless writes both values, so they currently do
// contribute to the parameter data used for the identifier; confirm whether
// this is intended.
func NewExec(
	name string,
	checksum *Checksum,
	timeout time.Duration,
	exclusive bool,
	dir *check.Absolute,
	env []string,
	pathname *check.Absolute,
	args []string,
	paths ...ExecPath,
) Artifact {
	if name == "" {
		name = "exec-" + filepath.Base(pathname.String())
	}

	// clamp the timeout into (0, ExecTimeoutMax]
	if timeout <= 0 {
		timeout = ExecTimeoutDefault
	}
	if timeout > ExecTimeoutMax {
		timeout = ExecTimeoutMax
	}

	a := execArtifact{
		name:      name,
		paths:     paths,
		dir:       dir,
		env:       env,
		path:      pathname,
		args:      args,
		timeout:   timeout,
		exclusive: exclusive,
	}
	if checksum == nil {
		return &a
	}
	return &execNetArtifact{checksum: *checksum, execArtifact: a}
}
// Kind returns [KindExec], the hardcoded [Kind] of execArtifact.
func (*execArtifact) Kind() Kind {
	return KindExec
}
// Params writes the parameter data of a to ctx, in the order readExecArtifact
// reads it back:
//   - the reporting name;
//   - the number of mount points, then for each its pathname (or the
//     placeholder "invalid P\x00" when P is nil), the number of artifacts,
//     their identifiers, and W as 1 or 0;
//   - the working directory;
//   - the number of environment entries followed by each entry;
//   - the executable pathname;
//   - the number of arguments followed by each argument;
//   - the timeout as two 32-bit halves, low half first;
//   - the exclusivity value as 1 or 0.
func (a *execArtifact) Params(ctx *IContext) {
	writeBool := func(v bool) {
		if v {
			ctx.WriteUint32(1)
		} else {
			ctx.WriteUint32(0)
		}
	}

	ctx.WriteString(a.name)

	ctx.WriteUint32(uint32(len(a.paths)))
	for _, p := range a.paths {
		if p.P != nil {
			ctx.WriteString(p.P.String())
		} else {
			// not an absolute pathname; presumably rejected by check.MustAbs
			// when read back
			ctx.WriteString("invalid P\x00")
		}
		ctx.WriteUint32(uint32(len(p.A)))
		for _, d := range p.A {
			ctx.WriteIdent(d)
		}
		writeBool(p.W)
	}

	ctx.WriteString(a.dir.String())

	ctx.WriteUint32(uint32(len(a.env)))
	for _, e := range a.env {
		ctx.WriteString(e)
	}

	ctx.WriteString(a.path.String())

	ctx.WriteUint32(uint32(len(a.args)))
	for _, arg := range a.args {
		ctx.WriteString(arg)
	}

	// time.Duration is 64 bits wide, write it as low then high half
	ctx.WriteUint32(uint32(a.timeout & 0xffffffff))
	ctx.WriteUint32(uint32(a.timeout >> 32))

	writeBool(a.exclusive)
}
// readExecArtifact interprets IR values and returns the address of execArtifact
// or execNetArtifact.
//
// Values are consumed in the order [execArtifact.Params] writes them:
//   - the name;
//   - the mount points, each as pathname, artifact identifiers and writability;
//   - the working directory;
//   - the environment;
//   - the executable pathname;
//   - the arguments;
//   - the timeout as low then high 32-bit halves;
//   - the exclusivity value.
//
// Counts above irMaxDeps (mount points and their artifacts) panic with
// ErrIRDepend. Counts above irMaxValues (environment and arguments) panic with
// ErrIRValues.
//
// Pathnames go through [check.MustAbs]. When net is true the IR must carry a
// checksum, otherwise this panics with ErrExpectedChecksum. When net is false,
// a checksum panics with ErrUnexpectedChecksum.
func readExecArtifact(r *IRReader, net bool) Artifact {
	r.DiscardAll()

	name := r.ReadString()

	// mount points: count, then per entry its pathname, artifact count,
	// artifact identifiers and writability flag
	sz := r.ReadUint32()
	if sz > irMaxDeps {
		panic(ErrIRDepend)
	}
	paths := make([]ExecPath, sz)
	for i := range paths {
		paths[i].P = check.MustAbs(r.ReadString())
		sz = r.ReadUint32()
		if sz > irMaxDeps {
			panic(ErrIRDepend)
		}
		paths[i].A = make([]Artifact, sz)
		for j := range paths[i].A {
			paths[i].A[j] = r.ReadIdent()
		}
		paths[i].W = r.ReadUint32() != 0
	}

	dir := check.MustAbs(r.ReadString())

	// environment: count-prefixed strings
	sz = r.ReadUint32()
	if sz > irMaxValues {
		panic(ErrIRValues)
	}
	env := make([]string, sz)
	for i := range env {
		env[i] = r.ReadString()
	}

	pathname := check.MustAbs(r.ReadString())

	// arguments: count-prefixed strings
	sz = r.ReadUint32()
	if sz > irMaxValues {
		panic(ErrIRValues)
	}
	args := make([]string, sz)
	for i := range args {
		args[i] = r.ReadString()
	}

	// timeout is stored as two 32-bit halves, low half first
	timeout := time.Duration(r.ReadUint32())
	timeout |= time.Duration(r.ReadUint32()) << 32
	exclusive := r.ReadUint32() != 0

	// only the net variant carries a checksum in its IR
	checksum, ok := r.Finalise()
	var checksumP *Checksum
	if net {
		if !ok {
			panic(ErrExpectedChecksum)
		}
		checksumVal := checksum.Value()
		checksumP = &checksumVal
	} else {
		if ok {
			panic(ErrUnexpectedChecksum)
		}
	}

	// NewExec re-applies the name and timeout defaults and picks the concrete
	// type based on whether checksumP is nil
	return NewExec(
		name, checksumP, timeout, exclusive, dir, env, pathname, args, paths...,
	)
}
func init() {
register(KindExec,
func(r *IRReader) Artifact { return readExecArtifact(r, false) })
register(KindExecNet,
func(r *IRReader) Artifact { return readExecArtifact(r, true) })
}
// Dependencies returns every [Artifact] of the caller-supplied [ExecPath]
// entries, flattened in order into a newly allocated slice, or nil when there
// are none.
func (a *execArtifact) Dependencies() []Artifact {
	total := 0
	for _, p := range a.paths {
		total += len(p.A)
	}

	deps := slices.Grow([]Artifact(nil), total)
	for _, p := range a.paths {
		deps = append(deps, p.A...)
	}
	return deps
}

// IsExclusive reports the exclusivity value supplied by the caller.
func (a *execArtifact) IsExclusive() bool {
	return a.exclusive
}

// String returns the user-facing reporting name supplied by the caller.
func (a *execArtifact) String() string {
	return a.name
}

// Cure cures the [Artifact] in the container described by the caller, with
// the container in its own net namespace.
func (a *execArtifact) Cure(f *FContext) error {
	const hostNet = false
	return a.cure(f, hostNet)
}
// execWaitDelay is passed through to [container.Params].
const execWaitDelay = 1 * time.Nanosecond
// scanVerbose prefixes program output for a verbose [message.Msg].
//
// Each line read from r is emitted via msg.Verbose with prefix. Once r is
// exhausted, done is closed. A read failure other than [os.ErrClosed] calls
// cancel and is reported with prefix marked by a leading "*".
func scanVerbose(
	msg message.Msg,
	cancel context.CancelFunc,
	done chan<- struct{},
	prefix string,
	r io.Reader,
) {
	defer close(done)

	// lines may grow well past the default token size before failing
	const maxLine = bufio.MaxScanTokenSize << 12
	scanner := bufio.NewScanner(r)
	scanner.Buffer(make([]byte, bufio.MaxScanTokenSize), maxLine)

	for scanner.Scan() {
		msg.Verbose(prefix, scanner.Text())
	}

	err := scanner.Err()
	if err == nil || errors.Is(err, os.ErrClosed) {
		return
	}
	cancel()
	msg.Verbose("*"+prefix, err)
}
// ErrInvalidPaths is returned for an [Artifact] of [KindExec] or [KindExecNet]
// specified with invalid paths: a mount point without a pathname or without
// any [Artifact] to mount.
var ErrInvalidPaths = errors.New("invalid mount point")

// SeccompPresets is the [seccomp] presets used by exec artifacts: the strict
// preset with the PresetDenyNS and PresetDenyDevel bits cleared.
const SeccompPresets = std.PresetStrict &^ (std.PresetDenyNS | std.PresetDenyDevel)
// makeContainer sets up the specified temp and work directories and returns the
// corresponding [container.Container] that would have run for cure.
//
// Every entry of paths must have a non-nil P and at least one [Artifact].
// Otherwise ErrInvalidPaths is returned before anything is created.
//
// If any entry targets [AbsWork], the last such entry becomes a writable
// overlay on it, with work as upperdir and temp/.work as workdir, and its W
// field is ignored. Otherwise work is bind mounted writable on [AbsWork].
//
// The remaining entries are mounted in order:
//   - a writable overlay backed by temp/.upper/<i> and temp/.work/<i> when W
//     is set;
//   - a plain bind mount for a single artifact;
//   - a read-only overlay for several artifacts.
//
// temp is always bind mounted writable on [fhs.AbsTmp], followed by /proc and
// /dev. Directories are created with mode 0700. When err is non-nil, z may be
// partially configured and must not be used.
func (a *execArtifact) makeContainer(
	ctx context.Context,
	msg message.Msg,
	hostNet bool,
	temp, work *check.Absolute,
	getArtifact GetArtifactFunc,
	ident func(a Artifact) unique.Handle[ID],
) (z *container.Container, err error) {
	overlayWorkIndex := -1
	for i, p := range a.paths {
		if p.P == nil || len(p.A) == 0 {
			return nil, ErrInvalidPaths
		}
		if p.P.Is(AbsWork) {
			overlayWorkIndex = i
		}
	}

	z = container.New(ctx, msg)
	z.WaitDelay = execWaitDelay
	z.SeccompPresets = SeccompPresets
	z.SeccompFlags |= seccomp.AllowMultiarch
	z.ParentPerm = 0700
	z.HostNet = hostNet
	z.Hostname = "cure"
	z.SetScheduler = SetSchedIdle
	z.SchedPolicy = ext.SCHED_IDLE
	if z.HostNet {
		z.Hostname = "cure-net"
	}
	z.Uid, z.Gid = (1<<10)-1, (1<<10)-1
	z.Dir, z.Env, z.Path, z.Args = a.dir, a.env, a.path, a.args

	// one op per entry of paths, plus (presumably) up to four more: the work
	// bind, the temp bind, /proc and /dev
	z.Grow(len(a.paths) + 4)
	for i, b := range a.paths {
		if i == overlayWorkIndex {
			if err = os.MkdirAll(work.String(), 0700); err != nil {
				return
			}
			tempWork := temp.Append(".work")
			if err = os.MkdirAll(tempWork.String(), 0700); err != nil {
				return
			}
			z.Overlay(
				AbsWork,
				work,
				tempWork,
				b.layers(msg, getArtifact, ident)...,
			)
			continue
		}

		if b.W {
			tempUpper, tempWork := temp.Append(
				".upper", strconv.Itoa(i),
			), temp.Append(
				".work", strconv.Itoa(i),
			)
			if err = os.MkdirAll(tempUpper.String(), 0700); err != nil {
				return
			}
			if err = os.MkdirAll(tempWork.String(), 0700); err != nil {
				return
			}
			z.Overlay(b.P, tempUpper, tempWork, b.layers(msg, getArtifact, ident)...)
		} else if len(b.A) == 1 {
			pathname, _ := getArtifact(b.A[0])
			z.Bind(pathname, b.P, 0)
		} else {
			z.OverlayReadonly(b.P, b.layers(msg, getArtifact, ident)...)
		}
	}

	if overlayWorkIndex < 0 {
		z.Bind(
			work,
			AbsWork,
			std.BindWritable|std.BindEnsure,
		)
	}
	z.Bind(
		temp,
		fhs.AbsTmp,
		std.BindWritable|std.BindEnsure,
	)
	z.Proc(fhs.AbsProc).Dev(fhs.AbsDev, true)
	return
}
// ErrExecBusy is returned entering [Cache.EnterExec] while another goroutine
// has not yet returned from it.
var ErrExecBusy = errors.New("scratch directories in use")

// ErrNotExec is returned for unsupported implementations of [Artifact] passed
// to [Cache.EnterExec].
var ErrNotExec = errors.New("attempting to run a non-exec artifact")
// EnterExec runs the container of an [Artifact] of [KindExec] or [KindExecNet]
// with its entry point, argument, and standard streams replaced with values
// supplied by the caller.
//
// Only one EnterExec may be in progress per [Cache]; a concurrent call returns
// ErrExecBusy. Any other [Artifact] implementation results in ErrNotExec.
//
// All dependencies of a are cured first. The container is then set up on
// scratch directories under the cache base, which are removed again before
// returning. Removal errors are joined into the returned error.
func (c *Cache) EnterExec(
	ctx context.Context,
	a Artifact,
	retainSession bool,
	stdin io.Reader,
	stdout, stderr io.Writer,
	path *check.Absolute,
	args ...string,
) (err error) {
	if !c.inExec.CompareAndSwap(false, true) {
		return ErrExecBusy
	}
	defer c.inExec.Store(false)

	var (
		e       *execArtifact
		hostNet bool
	)
	switch v := a.(type) {
	case *execNetArtifact:
		e, hostNet = &v.execArtifact, true
	case *execArtifact:
		e = v
	default:
		return ErrNotExec
	}

	// NOTE(review): curing the collection is treated as always failing; only
	// an error satisfying IsCollected allows continuing.
	deps := Collect(a.Dependencies())
	_, _, err = c.Cure(&deps)
	if err == nil {
		return errors.New("unreachable")
	}
	if !IsCollected(err) {
		return
	}

	cured := make(map[Artifact]cureRes)
	for _, dep := range deps {
		var res cureRes
		if res.pathname, res.checksum, err = c.Cure(dep); err != nil {
			return
		}
		cured[dep] = res
	}

	scratch := c.base.Append(dirExecScratch)
	temp, work := scratch.Append("temp"), scratch.Append("work")
	// work is created during makeContainer
	if err = os.MkdirAll(temp.String(), 0700); err != nil {
		return
	}
	defer func() {
		chmodErr, removeErr := removeAll(scratch)
		if chmodErr != nil || removeErr != nil {
			err = errors.Join(err, chmodErr, removeErr)
		}
	}()

	lookup := func(d Artifact) (*check.Absolute, unique.Handle[Checksum]) {
		res, ok := cured[d]
		if !ok {
			panic(InvalidLookupError(c.Ident(d).Value()))
		}
		return res.pathname, res.checksum
	}

	var z *container.Container
	if z, err = e.makeContainer(
		ctx, c.msg,
		hostNet,
		temp, work,
		lookup,
		c.Ident,
	); err != nil {
		return
	}

	z.Stdin, z.Stdout, z.Stderr = stdin, stdout, stderr
	z.Path, z.Args = path, args
	z.RetainSession = retainSession

	if err = z.Start(); err != nil {
		return
	}
	if err = z.Serve(); err != nil {
		return
	}
	return z.Wait()
}
// cure is like Cure but allows optional host net namespace.
//
// The container is set up by makeContainer on the temp and work directories of
// f and runs under a context bounded by a.timeout. Program output goes to the
// status writer of f. For a verbose [message.Msg] it is additionally echoed
// line by line via scanVerbose.
//
// Once the initial process exits successfully, the work directory is passed
// to rmdir(2) so that empty output does not succeed:
//   - a non-empty directory is kept and the cure succeeds;
//   - an empty one is removed.
//
// NOTE(review): a successful rmdir leaves err nil, so the rejection of the
// missing directory presumably happens in the caller; confirm.
func (a *execArtifact) cure(f *FContext, hostNet bool) (err error) {
	ctx, cancel := context.WithTimeout(f.Unwrap(), a.timeout)
	defer cancel()

	msg := f.GetMessage()
	var z *container.Container
	if z, err = a.makeContainer(
		ctx, msg, hostNet,
		f.GetTempDir(), f.GetWorkDir(),
		f.GetArtifact,
		f.cache.Ident,
	); err != nil {
		return
	}

	var status io.Writer
	if status, err = f.GetStatusWriter(); err != nil {
		return
	}

	if msg.IsVerbose() {
		var stdout, stderr io.ReadCloser
		if stdout, err = z.StdoutPipe(); err != nil {
			return
		}
		if stderr, err = z.StderrPipe(); err != nil {
			_ = stdout.Close()
			return
		}

		brStdout, brStderr := f.cache.getReader(stdout), f.cache.getReader(stderr)
		stdoutDone, stderrDone := make(chan struct{}), make(chan struct{})
		go scanVerbose(
			msg, cancel, stdoutDone,
			"("+a.name+":1)",
			io.TeeReader(brStdout, status),
		)
		go scanVerbose(
			msg, cancel, stderrDone,
			"("+a.name+":2)",
			io.TeeReader(brStderr, status),
		)

		// Teardown lives in a single deferred function because its order
		// matters. On a failure other than a non-zero exit, the read ends are
		// closed first so the scanning goroutines return (they tolerate
		// os.ErrClosed) instead of blocking on a pipe whose write end may still
		// be open. Only then are they awaited and their readers recycled.
		// Deferring the close separately and earlier would run it after the
		// wait, where it can no longer unblock anything.
		defer func() {
			if err != nil && !errors.As(err, new(*exec.ExitError)) {
				_ = stdout.Close()
				_ = stderr.Close()
			}
			<-stdoutDone
			<-stderrDone
			f.cache.putReader(brStdout)
			f.cache.putReader(brStderr)
		}()
	} else {
		z.Stdout, z.Stderr = status, status
	}

	if err = z.Start(); err != nil {
		return
	}
	if err = z.Serve(); err != nil {
		return
	}
	if err = z.Wait(); err != nil {
		return
	}

	// do not allow empty directories to succeed
	for {
		err = syscall.Rmdir(f.GetWorkDir().String())
		if err != syscall.EINTR {
			break
		}
	}
	if errors.Is(err, syscall.ENOTEMPTY) {
		err = nil
	}
	return
}