internal/pkg: block on implementation entry
All checks were successful
Test / Create distribution (push) Successful in 50s
Test / Sandbox (push) Successful in 2m38s
Test / Hakurei (push) Successful in 3m50s
Test / ShareFS (push) Successful in 3m59s
Test / Hpkg (push) Successful in 4m30s
Test / Sandbox (race detector) (push) Successful in 4m58s
Test / Hakurei (race detector) (push) Successful in 3m7s
Test / Flake checks (push) Successful in 1m39s

This avoids blocking while not in Cure method of the implementation.

Signed-off-by: Ophestra <cat@gensokyo.uk>
This commit is contained in:
2026-01-24 16:02:50 +09:00
parent d9ebaf20f8
commit 6956dfc31a
3 changed files with 68 additions and 38 deletions

View File

@@ -239,7 +239,9 @@ func TestExec(t *testing.T) {
cure: func(t *pkg.TContext) error {
return os.MkdirAll(t.GetWorkDir().String(), 0700)
},
}}, 1<<5 /* concurrent cache hits */), cure: func(f *pkg.FContext) error {
}}, 1<<5 /* concurrent cache hits */),
cure: func(f *pkg.FContext) error {
work := f.GetWorkDir()
if err := os.MkdirAll(work.String(), 0700); err != nil {
return err

View File

@@ -413,9 +413,9 @@ type pendingArtifactDep struct {
// Cache is a support layer that implementations of [Artifact] can use to store
// cured [Artifact] data in a content addressed fashion.
type Cache struct {
// Work for curing dependency [Artifact] is sent here and cured concurrently
// while subject to the cures limit. Invalid after the context is canceled.
cureDep chan<- *pendingArtifactDep
// Cures of any variant of [Artifact] sends to cures before entering the
// implementation and receives an equal amount of elements after.
cures chan struct{}
// [context.WithCancel] over caller-supplied context, used by [Artifact] and
// all dependency curing goroutines.
@@ -1102,6 +1102,14 @@ func (c *Cache) Cure(a Artifact) (
checksum unique.Handle[Checksum],
err error,
) {
select {
case <-c.ctx.Done():
err = c.ctx.Err()
return
default:
}
if c.threshold > 0 {
var n uintptr
for range Flood(a) {
@@ -1114,7 +1122,7 @@ func (c *Cache) Cure(a Artifact) (
c.msg.Verbosef("visited %d artifacts", n)
}
return c.cure(a)
return c.cure(a, true)
}
// CureError wraps a non-nil error returned attempting to cure an [Artifact].
@@ -1187,8 +1195,30 @@ func (e *DependencyCureError) Error() string {
return buf.String()
}
// enterCure must be called before entering an [Artifact] implementation.
func (c *Cache) enterCure(curesExempt bool) error {
if curesExempt {
return nil
}
select {
case c.cures <- struct{}{}:
return nil
case <-c.ctx.Done():
return c.ctx.Err()
}
}
// exitCure must be called after exiting an [Artifact] implementation.
func (c *Cache) exitCure(curesExempt bool) {
if !curesExempt {
<-c.cures
}
}
// cure implements Cure without checking the full dependency graph.
func (c *Cache) cure(a Artifact) (
func (c *Cache) cure(a Artifact, curesExempt bool) (
pathname *check.Absolute,
checksum unique.Handle[Checksum],
err error,
@@ -1294,7 +1324,11 @@ func (c *Cache) cure(a Artifact) (
}
var data []byte
if err = c.enterCure(curesExempt); err != nil {
return
}
data, err = f.Cure(c.ctx)
c.exitCure(curesExempt)
if err != nil {
return
}
@@ -1365,7 +1399,12 @@ func (c *Cache) cure(a Artifact) (
switch ca := a.(type) {
case TrivialArtifact:
defer t.destroy(&err)
if err = ca.Cure(&t); err != nil {
if err = c.enterCure(curesExempt); err != nil {
return
}
err = ca.Cure(&t)
c.exitCure(curesExempt)
if err != nil {
return
}
break
@@ -1381,14 +1420,7 @@ func (c *Cache) cure(a Artifact) (
var errsMu sync.Mutex
for i, d := range deps {
pending := pendingArtifactDep{d, &res[i], &errs, &errsMu, &wg}
select {
case c.cureDep <- &pending:
break
case <-c.ctx.Done():
err = c.ctx.Err()
return
}
go pending.cure(c)
}
wg.Wait()
@@ -1401,7 +1433,12 @@ func (c *Cache) cure(a Artifact) (
}
defer f.destroy(&err)
if err = ca.Cure(&f); err != nil {
if err = c.enterCure(curesExempt); err != nil {
return
}
err = ca.Cure(&f)
c.exitCure(curesExempt)
if err != nil {
return
}
break
@@ -1486,7 +1523,7 @@ func (pending *pendingArtifactDep) cure(c *Cache) {
defer pending.Done()
var err error
pending.resP.pathname, pending.resP.checksum, err = c.cure(pending.a)
pending.resP.pathname, pending.resP.checksum, err = c.cure(pending.a, false)
if err == nil {
return
}
@@ -1501,6 +1538,7 @@ func (c *Cache) Close() {
c.closeOnce.Do(func() {
c.cancel()
c.wg.Wait()
close(c.cures)
c.unlock()
})
}
@@ -1536,6 +1574,10 @@ func open(
base *check.Absolute,
lock bool,
) (*Cache, error) {
if cures < 1 {
cures = runtime.NumCPU()
}
for _, name := range []string{
dirIdentifier,
dirChecksum,
@@ -1548,6 +1590,8 @@ func open(
}
c := Cache{
cures: make(chan struct{}, cures),
msg: msg,
base: base,
@@ -1556,8 +1600,6 @@ func open(
identPending: make(map[unique.Handle[ID]]<-chan struct{}),
}
c.ctx, c.cancel = context.WithCancel(ctx)
cureDep := make(chan *pendingArtifactDep, cures)
c.cureDep = cureDep
c.identPool.New = func() any { return new(extIdent) }
if lock || !testing.Testing() {
@@ -1572,23 +1614,5 @@ func open(
c.unlock = func() {}
}
if cures < 1 {
cures = runtime.NumCPU()
}
for i := 0; i < cures; i++ {
c.wg.Go(func() {
for {
select {
case <-c.ctx.Done():
return
case pending := <-cureDep:
pending.cure(&c)
break
}
}
})
}
return &c, nil
}

View File

@@ -283,7 +283,7 @@ func checkWithCache(t *testing.T, testCases []cacheTestCase) {
msg.SwapVerbose(testing.Verbose())
var scrubFunc func() error // scrub after hashing
if c, err := pkg.Open(t.Context(), msg, 0, base); err != nil {
if c, err := pkg.Open(t.Context(), msg, 1<<4, base); err != nil {
t.Fatalf("Open: error = %v", err)
} else {
t.Cleanup(c.Close)
@@ -561,6 +561,10 @@ func TestCache(t *testing.T) {
stub.UniqueError
}{UniqueError: 0xbad},
)},
cure: func(f *pkg.FContext) error {
panic("attempting to cure impossible artifact")
},
}, nil, pkg.Checksum{}, &pkg.DependencyCureError{
{
Ident: unique.Make(pkg.ID{0xff, 3}),