internal/pkg: exclusive artifacts
All checks were successful
Test / Create distribution (push) Successful in 50s
Test / Sandbox (push) Successful in 2m34s
Test / Hakurei (push) Successful in 3m46s
Test / ShareFS (push) Successful in 3m59s
Test / Hpkg (push) Successful in 4m32s
Test / Sandbox (race detector) (push) Successful in 5m0s
Test / Hakurei (race detector) (push) Successful in 6m8s
Test / Flake checks (push) Successful in 1m36s
All checks were successful
Test / Create distribution (push) Successful in 50s
Test / Sandbox (push) Successful in 2m34s
Test / Hakurei (push) Successful in 3m46s
Test / ShareFS (push) Successful in 3m59s
Test / Hpkg (push) Successful in 4m32s
Test / Sandbox (race detector) (push) Successful in 5m0s
Test / Hakurei (race detector) (push) Successful in 6m8s
Test / Flake checks (push) Successful in 1m36s
This alleviates scheduler overhead when curing many artifacts. Signed-off-by: Ophestra <cat@gensokyo.uk>
This commit is contained in:
@@ -105,6 +105,9 @@ type execArtifact struct {
|
||||
// equivalent to execTimeoutDefault. This value is never encoded in Params
|
||||
// because it cannot affect outcome.
|
||||
timeout time.Duration
|
||||
|
||||
// Caller-supplied exclusivity value, returned as is by IsExclusive.
|
||||
exclusive bool
|
||||
}
|
||||
|
||||
var _ fmt.Stringer = new(execArtifact)
|
||||
@@ -123,7 +126,7 @@ var _ KnownChecksum = new(execNetArtifact)
|
||||
func (a *execNetArtifact) Checksum() Checksum { return a.checksum }
|
||||
|
||||
// Kind returns the hardcoded [Kind] constant.
|
||||
func (a *execNetArtifact) Kind() Kind { return KindExecNet }
|
||||
func (*execNetArtifact) Kind() Kind { return KindExecNet }
|
||||
|
||||
// Params is [Checksum] concatenated with [KindExec] params.
|
||||
func (a *execNetArtifact) Params(ctx *IContext) {
|
||||
@@ -157,13 +160,14 @@ func (a *execNetArtifact) Cure(f *FContext) error {
|
||||
// negative timeout value is equivalent tp [ExecTimeoutDefault], a timeout value
|
||||
// greater than [ExecTimeoutMax] is equivalent to [ExecTimeoutMax].
|
||||
//
|
||||
// The user-facing name is not accessible from the container and does not
|
||||
// affect curing outcome. Because of this, it is omitted from parameter data
|
||||
// for computing identifier.
|
||||
// The user-facing name and exclusivity value are not accessible from the
|
||||
// container and does not affect curing outcome. Because of this, it is omitted
|
||||
// from parameter data for computing identifier.
|
||||
func NewExec(
|
||||
name string,
|
||||
checksum *Checksum,
|
||||
timeout time.Duration,
|
||||
exclusive bool,
|
||||
|
||||
dir *check.Absolute,
|
||||
env []string,
|
||||
@@ -181,7 +185,7 @@ func NewExec(
|
||||
if timeout > ExecTimeoutMax {
|
||||
timeout = ExecTimeoutMax
|
||||
}
|
||||
a := execArtifact{name, paths, dir, env, pathname, args, timeout}
|
||||
a := execArtifact{name, paths, dir, env, pathname, args, timeout, exclusive}
|
||||
if checksum == nil {
|
||||
return &a
|
||||
}
|
||||
@@ -189,7 +193,7 @@ func NewExec(
|
||||
}
|
||||
|
||||
// Kind returns the hardcoded [Kind] constant.
|
||||
func (a *execArtifact) Kind() Kind { return KindExec }
|
||||
func (*execArtifact) Kind() Kind { return KindExec }
|
||||
|
||||
// Params writes paths, executable pathname and args.
|
||||
func (a *execArtifact) Params(ctx *IContext) {
|
||||
@@ -237,6 +241,9 @@ func (a *execArtifact) Dependencies() []Artifact {
|
||||
return slices.Concat(artifacts...)
|
||||
}
|
||||
|
||||
// IsExclusive returns the caller-supplied exclusivity value.
|
||||
func (a *execArtifact) IsExclusive() bool { return a.exclusive }
|
||||
|
||||
// String returns the caller-supplied reporting name.
|
||||
func (a *execArtifact) String() string { return a.name }
|
||||
|
||||
|
||||
@@ -39,7 +39,7 @@ func TestExec(t *testing.T) {
|
||||
|
||||
cureMany(t, c, []cureStep{
|
||||
{"container", pkg.NewExec(
|
||||
"exec-offline", nil, 0,
|
||||
"exec-offline", nil, 0, false,
|
||||
pkg.AbsWork,
|
||||
[]string{"HAKUREI_TEST=1"},
|
||||
check.MustAbs("/opt/bin/testtool"),
|
||||
@@ -62,7 +62,7 @@ func TestExec(t *testing.T) {
|
||||
), ignorePathname, wantChecksumOffline, nil},
|
||||
|
||||
{"error passthrough", pkg.NewExec(
|
||||
"", nil, 0,
|
||||
"", nil, 0, true,
|
||||
pkg.AbsWork,
|
||||
[]string{"HAKUREI_TEST=1"},
|
||||
check.MustAbs("/opt/bin/testtool"),
|
||||
@@ -85,7 +85,7 @@ func TestExec(t *testing.T) {
|
||||
}},
|
||||
|
||||
{"invalid paths", pkg.NewExec(
|
||||
"", nil, 0,
|
||||
"", nil, 0, false,
|
||||
pkg.AbsWork,
|
||||
[]string{"HAKUREI_TEST=1"},
|
||||
check.MustAbs("/opt/bin/testtool"),
|
||||
@@ -98,7 +98,7 @@ func TestExec(t *testing.T) {
|
||||
// check init failure passthrough
|
||||
var exitError *exec.ExitError
|
||||
if _, _, err := c.Cure(pkg.NewExec(
|
||||
"", nil, 0,
|
||||
"", nil, 0, false,
|
||||
pkg.AbsWork,
|
||||
nil,
|
||||
check.MustAbs("/opt/bin/testtool"),
|
||||
@@ -120,7 +120,7 @@ func TestExec(t *testing.T) {
|
||||
)
|
||||
cureMany(t, c, []cureStep{
|
||||
{"container", pkg.NewExec(
|
||||
"exec-net", &wantChecksum, 0,
|
||||
"exec-net", &wantChecksum, 0, false,
|
||||
pkg.AbsWork,
|
||||
[]string{"HAKUREI_TEST=1"},
|
||||
check.MustAbs("/opt/bin/testtool"),
|
||||
@@ -152,7 +152,7 @@ func TestExec(t *testing.T) {
|
||||
|
||||
cureMany(t, c, []cureStep{
|
||||
{"container", pkg.NewExec(
|
||||
"exec-overlay-root", nil, 0,
|
||||
"exec-overlay-root", nil, 0, false,
|
||||
pkg.AbsWork,
|
||||
[]string{"HAKUREI_TEST=1", "HAKUREI_ROOT=1"},
|
||||
check.MustAbs("/opt/bin/testtool"),
|
||||
@@ -178,7 +178,7 @@ func TestExec(t *testing.T) {
|
||||
|
||||
cureMany(t, c, []cureStep{
|
||||
{"container", pkg.NewExec(
|
||||
"exec-overlay-work", nil, 0,
|
||||
"exec-overlay-work", nil, 0, false,
|
||||
pkg.AbsWork,
|
||||
[]string{"HAKUREI_TEST=1", "HAKUREI_ROOT=1"},
|
||||
check.MustAbs("/work/bin/testtool"),
|
||||
@@ -209,7 +209,7 @@ func TestExec(t *testing.T) {
|
||||
|
||||
cureMany(t, c, []cureStep{
|
||||
{"container", pkg.NewExec(
|
||||
"exec-multiple-layers", nil, 0,
|
||||
"exec-multiple-layers", nil, 0, false,
|
||||
pkg.AbsWork,
|
||||
[]string{"HAKUREI_TEST=1", "HAKUREI_ROOT=1"},
|
||||
check.MustAbs("/opt/bin/testtool"),
|
||||
@@ -262,7 +262,7 @@ func TestExec(t *testing.T) {
|
||||
|
||||
cureMany(t, c, []cureStep{
|
||||
{"container", pkg.NewExec(
|
||||
"exec-layer-promotion", nil, 0,
|
||||
"exec-layer-promotion", nil, 0, true,
|
||||
pkg.AbsWork,
|
||||
[]string{"HAKUREI_TEST=1", "HAKUREI_ROOT=1"},
|
||||
check.MustAbs("/opt/bin/testtool"),
|
||||
|
||||
@@ -37,13 +37,16 @@ func NewFile(name string, data []byte) FileArtifact {
|
||||
}
|
||||
|
||||
// Kind returns the hardcoded [Kind] constant.
|
||||
func (a *fileArtifact) Kind() Kind { return KindFile }
|
||||
func (*fileArtifact) Kind() Kind { return KindFile }
|
||||
|
||||
// Params writes the result of Cure.
|
||||
func (a *fileArtifact) Params(ctx *IContext) { ctx.GetHash().Write(*a) }
|
||||
|
||||
// Dependencies returns a nil slice.
|
||||
func (a *fileArtifact) Dependencies() []Artifact { return nil }
|
||||
func (*fileArtifact) Dependencies() []Artifact { return nil }
|
||||
|
||||
// IsExclusive returns false: Cure returns a prepopulated buffer.
|
||||
func (*fileArtifact) IsExclusive() bool { return false }
|
||||
|
||||
// Checksum computes and returns the checksum of caller-supplied data.
|
||||
func (a *fileArtifact) Checksum() Checksum {
|
||||
|
||||
@@ -40,7 +40,7 @@ func NewHTTPGet(
|
||||
}
|
||||
|
||||
// Kind returns the hardcoded [Kind] constant.
|
||||
func (a *httpArtifact) Kind() Kind { return KindHTTPGet }
|
||||
func (*httpArtifact) Kind() Kind { return KindHTTPGet }
|
||||
|
||||
// Params writes the backing url string. Client is not represented as it does
|
||||
// not affect [Cache.Cure] outcome.
|
||||
@@ -49,7 +49,10 @@ func (a *httpArtifact) Params(ctx *IContext) {
|
||||
}
|
||||
|
||||
// Dependencies returns a nil slice.
|
||||
func (a *httpArtifact) Dependencies() []Artifact { return nil }
|
||||
func (*httpArtifact) Dependencies() []Artifact { return nil }
|
||||
|
||||
// IsExclusive returns false: Cure returns as soon as a response is received.
|
||||
func (*httpArtifact) IsExclusive() bool { return false }
|
||||
|
||||
// Checksum returns the caller-supplied checksum.
|
||||
func (a *httpArtifact) Checksum() Checksum { return a.checksum.Value() }
|
||||
|
||||
@@ -253,6 +253,24 @@ type Artifact interface {
|
||||
//
|
||||
// Result must remain identical across multiple invocations.
|
||||
Dependencies() []Artifact
|
||||
|
||||
// IsExclusive returns whether the [Artifact] is exclusive. Exclusive
|
||||
// artifacts might not run in parallel with each other, and are still
|
||||
// subject to the cures limit.
|
||||
//
|
||||
// Some implementations may saturate the CPU for a nontrivial amount of
|
||||
// time. Curing multiple such implementations simultaneously causes
|
||||
// significant CPU scheduler overhead. An exclusive artifact will generally
|
||||
// not be cured alongside another exclusive artifact, thus alleviating this
|
||||
// overhead.
|
||||
//
|
||||
// Note that [Cache] reserves the right to still cure exclusive
|
||||
// artifacts concurrently as this is not a synchronisation primitive but
|
||||
// an optimisation one. Implementations are forbidden from accessing global
|
||||
// state regardless of exclusivity.
|
||||
//
|
||||
// Result must remain identical across multiple invocations.
|
||||
IsExclusive() bool
|
||||
}
|
||||
|
||||
// FloodArtifact refers to an [Artifact] requiring its entire dependency graph
|
||||
@@ -472,6 +490,8 @@ type Cache struct {
|
||||
// Synchronises access to ident and corresponding filesystem entries.
|
||||
identMu sync.RWMutex
|
||||
|
||||
// Synchronises entry into exclusive artifacts for the cure method.
|
||||
exclMu sync.Mutex
|
||||
// Buffered I/O free list, must not be accessed directly.
|
||||
bufioPool sync.Pool
|
||||
|
||||
@@ -1215,7 +1235,10 @@ func (e *DependencyCureError) Error() string {
|
||||
}
|
||||
|
||||
// enterCure must be called before entering an [Artifact] implementation.
|
||||
func (c *Cache) enterCure(curesExempt bool) error {
|
||||
func (c *Cache) enterCure(a Artifact, curesExempt bool) error {
|
||||
if a.IsExclusive() {
|
||||
c.exclMu.Lock()
|
||||
}
|
||||
if curesExempt {
|
||||
return nil
|
||||
}
|
||||
@@ -1225,15 +1248,23 @@ func (c *Cache) enterCure(curesExempt bool) error {
|
||||
return nil
|
||||
|
||||
case <-c.ctx.Done():
|
||||
if a.IsExclusive() {
|
||||
c.exclMu.Unlock()
|
||||
}
|
||||
return c.ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// exitCure must be called after exiting an [Artifact] implementation.
|
||||
func (c *Cache) exitCure(curesExempt bool) {
|
||||
if !curesExempt {
|
||||
<-c.cures
|
||||
func (c *Cache) exitCure(a Artifact, curesExempt bool) {
|
||||
if a.IsExclusive() {
|
||||
c.exclMu.Unlock()
|
||||
}
|
||||
if curesExempt {
|
||||
return
|
||||
}
|
||||
|
||||
<-c.cures
|
||||
}
|
||||
|
||||
// getWriter is like [bufio.NewWriter] but for bufioPool.
|
||||
@@ -1456,7 +1487,7 @@ func (c *Cache) cure(a Artifact, curesExempt bool) (
|
||||
}()
|
||||
|
||||
var r io.ReadCloser
|
||||
if err = c.enterCure(curesExempt); err != nil {
|
||||
if err = c.enterCure(a, curesExempt); err != nil {
|
||||
return
|
||||
}
|
||||
r, err = f.Cure(&RContext{c})
|
||||
@@ -1505,7 +1536,7 @@ func (c *Cache) cure(a Artifact, curesExempt bool) (
|
||||
err = closeErr
|
||||
}
|
||||
}
|
||||
c.exitCure(curesExempt)
|
||||
c.exitCure(a, curesExempt)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -1539,11 +1570,11 @@ func (c *Cache) cure(a Artifact, curesExempt bool) (
|
||||
switch ca := a.(type) {
|
||||
case TrivialArtifact:
|
||||
defer t.destroy(&err)
|
||||
if err = c.enterCure(curesExempt); err != nil {
|
||||
if err = c.enterCure(a, curesExempt); err != nil {
|
||||
return
|
||||
}
|
||||
err = ca.Cure(&t)
|
||||
c.exitCure(curesExempt)
|
||||
c.exitCure(a, curesExempt)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -1573,11 +1604,11 @@ func (c *Cache) cure(a Artifact, curesExempt bool) (
|
||||
}
|
||||
|
||||
defer f.destroy(&err)
|
||||
if err = c.enterCure(curesExempt); err != nil {
|
||||
if err = c.enterCure(a, curesExempt); err != nil {
|
||||
return
|
||||
}
|
||||
err = ca.Cure(&f)
|
||||
c.exitCure(curesExempt)
|
||||
c.exitCure(a, curesExempt)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -96,12 +96,14 @@ func (a *stubArtifact) Kind() pkg.Kind { return a.kind }
|
||||
func (a *stubArtifact) Params(ctx *pkg.IContext) { ctx.GetHash().Write(a.params) }
|
||||
func (a *stubArtifact) Dependencies() []pkg.Artifact { return a.deps }
|
||||
func (a *stubArtifact) Cure(t *pkg.TContext) error { return a.cure(t) }
|
||||
func (*stubArtifact) IsExclusive() bool { return false }
|
||||
|
||||
// A stubArtifactF implements [FloodArtifact] with hardcoded behaviour.
|
||||
type stubArtifactF struct {
|
||||
kind pkg.Kind
|
||||
params []byte
|
||||
deps []pkg.Artifact
|
||||
excl bool
|
||||
|
||||
cure func(f *pkg.FContext) error
|
||||
}
|
||||
@@ -110,6 +112,7 @@ func (a *stubArtifactF) Kind() pkg.Kind { return a.kind }
|
||||
func (a *stubArtifactF) Params(ctx *pkg.IContext) { ctx.GetHash().Write(a.params) }
|
||||
func (a *stubArtifactF) Dependencies() []pkg.Artifact { return a.deps }
|
||||
func (a *stubArtifactF) Cure(f *pkg.FContext) error { return a.cure(f) }
|
||||
func (a *stubArtifactF) IsExclusive() bool { return a.excl }
|
||||
|
||||
// A stubFile implements [FileArtifact] with hardcoded behaviour.
|
||||
type stubFile struct {
|
||||
|
||||
@@ -80,6 +80,9 @@ func (a *tarArtifact) Dependencies() []Artifact {
|
||||
return []Artifact{a.f}
|
||||
}
|
||||
|
||||
// IsExclusive returns false: decompressor and tar reader are fully sequential.
|
||||
func (a *tarArtifact) IsExclusive() bool { return false }
|
||||
|
||||
// A DisallowedTypeflagError describes a disallowed typeflag encountered while
|
||||
// unpacking a tarball.
|
||||
type DisallowedTypeflagError byte
|
||||
|
||||
Reference in New Issue
Block a user