From 6956dfc31ae014c0bdeedf5d3f20963b99abcddb Mon Sep 17 00:00:00 2001 From: Ophestra Date: Sat, 24 Jan 2026 16:02:50 +0900 Subject: [PATCH] internal/pkg: block on implementation entry This avoids blocking while not in Cure method of the implementation. Signed-off-by: Ophestra --- internal/pkg/exec_test.go | 4 +- internal/pkg/pkg.go | 96 ++++++++++++++++++++++++--------------- internal/pkg/pkg_test.go | 6 ++- 3 files changed, 68 insertions(+), 38 deletions(-) diff --git a/internal/pkg/exec_test.go b/internal/pkg/exec_test.go index d3d1e15..862a28c 100644 --- a/internal/pkg/exec_test.go +++ b/internal/pkg/exec_test.go @@ -239,7 +239,9 @@ func TestExec(t *testing.T) { cure: func(t *pkg.TContext) error { return os.MkdirAll(t.GetWorkDir().String(), 0700) }, - }}, 1<<5 /* concurrent cache hits */), cure: func(f *pkg.FContext) error { + }}, 1<<5 /* concurrent cache hits */), + + cure: func(f *pkg.FContext) error { work := f.GetWorkDir() if err := os.MkdirAll(work.String(), 0700); err != nil { return err diff --git a/internal/pkg/pkg.go b/internal/pkg/pkg.go index c8e3699..7f082d5 100644 --- a/internal/pkg/pkg.go +++ b/internal/pkg/pkg.go @@ -413,9 +413,9 @@ type pendingArtifactDep struct { // Cache is a support layer that implementations of [Artifact] can use to store // cured [Artifact] data in a content addressed fashion. type Cache struct { - // Work for curing dependency [Artifact] is sent here and cured concurrently - // while subject to the cures limit. Invalid after the context is canceled. - cureDep chan<- *pendingArtifactDep + // Cures of any variant of [Artifact] sends to cures before entering the + // implementation and receives an equal amount of elements after. + cures chan struct{} // [context.WithCancel] over caller-supplied context, used by [Artifact] and // all dependency curing goroutines. @@ -1102,6 +1102,14 @@ func (c *Cache) Cure(a Artifact) ( checksum unique.Handle[Checksum], err error, ) { + select { + case <-c.ctx.Done(): + err = c.ctx.Err() + return + + default: + } + if c.threshold > 0 { var n uintptr for range Flood(a) { @@ -1114,7 +1122,7 @@ func (c *Cache) Cure(a Artifact) ( c.msg.Verbosef("visited %d artifacts", n) } - return c.cure(a) + return c.cure(a, true) } // CureError wraps a non-nil error returned attempting to cure an [Artifact]. @@ -1187,8 +1195,30 @@ func (e *DependencyCureError) Error() string { return buf.String() } +// enterCure must be called before entering an [Artifact] implementation. +func (c *Cache) enterCure(curesExempt bool) error { + if curesExempt { + return nil + } + + select { + case c.cures <- struct{}{}: + return nil + + case <-c.ctx.Done(): + return c.ctx.Err() + } +} + +// exitCure must be called after exiting an [Artifact] implementation. +func (c *Cache) exitCure(curesExempt bool) { + if !curesExempt { + <-c.cures + } +} + // cure implements Cure without checking the full dependency graph. -func (c *Cache) cure(a Artifact) ( +func (c *Cache) cure(a Artifact, curesExempt bool) ( pathname *check.Absolute, checksum unique.Handle[Checksum], err error, @@ -1294,7 +1324,11 @@ func (c *Cache) cure(a Artifact) ( } var data []byte + if err = c.enterCure(curesExempt); err != nil { + return + } data, err = f.Cure(c.ctx) + c.exitCure(curesExempt) if err != nil { return } @@ -1365,7 +1399,12 @@ func (c *Cache) cure(a Artifact) ( switch ca := a.(type) { case TrivialArtifact: defer t.destroy(&err) - if err = ca.Cure(&t); err != nil { + if err = c.enterCure(curesExempt); err != nil { + return + } + err = ca.Cure(&t) + c.exitCure(curesExempt) + if err != nil { return } break @@ -1381,14 +1420,7 @@ func (c *Cache) cure(a Artifact) ( var errsMu sync.Mutex for i, d := range deps { pending := pendingArtifactDep{d, &res[i], &errs, &errsMu, &wg} - select { - case c.cureDep <- &pending: - break - - case <-c.ctx.Done(): - err = c.ctx.Err() - return - } + go pending.cure(c) } wg.Wait() @@ -1401,7 +1433,12 @@ func (c *Cache) cure(a Artifact) ( } defer f.destroy(&err) - if err = ca.Cure(&f); err != nil { + if err = c.enterCure(curesExempt); err != nil { + return + } + err = ca.Cure(&f) + c.exitCure(curesExempt) + if err != nil { return } break @@ -1486,7 +1523,7 @@ func (pending *pendingArtifactDep) cure(c *Cache) { defer pending.Done() var err error - pending.resP.pathname, pending.resP.checksum, err = c.cure(pending.a) + pending.resP.pathname, pending.resP.checksum, err = c.cure(pending.a, false) if err == nil { return } @@ -1501,6 +1538,7 @@ func (c *Cache) Close() { c.closeOnce.Do(func() { c.cancel() c.wg.Wait() + close(c.cures) c.unlock() }) } @@ -1536,6 +1574,10 @@ func open( base *check.Absolute, lock bool, ) (*Cache, error) { + if cures < 1 { + cures = runtime.NumCPU() + } + for _, name := range []string{ dirIdentifier, dirChecksum, @@ -1548,6 +1590,8 @@ func open( } c := Cache{ + cures: make(chan struct{}, cures), + msg: msg, base: base, @@ -1556,8 +1600,6 @@ func open( identPending: make(map[unique.Handle[ID]]<-chan struct{}), } c.ctx, c.cancel = context.WithCancel(ctx) - cureDep := make(chan *pendingArtifactDep, cures) - c.cureDep = cureDep c.identPool.New = func() any { return new(extIdent) } if lock || !testing.Testing() { @@ -1572,23 +1614,5 @@ func open( c.unlock = func() {} } - if cures < 1 { - cures = runtime.NumCPU() - } - for i := 0; i < cures; i++ { - c.wg.Go(func() { - for { - select { - case <-c.ctx.Done(): - return - - case pending := <-cureDep: - pending.cure(&c) - break - } - } - }) - } - return &c, nil } diff --git a/internal/pkg/pkg_test.go b/internal/pkg/pkg_test.go index 727a6c3..2581cce 100644 --- a/internal/pkg/pkg_test.go +++ b/internal/pkg/pkg_test.go @@ -283,7 +283,7 @@ func checkWithCache(t *testing.T, testCases []cacheTestCase) { msg.SwapVerbose(testing.Verbose()) var scrubFunc func() error // scrub after hashing - if c, err := pkg.Open(t.Context(), msg, 0, base); err != nil { + if c, err := pkg.Open(t.Context(), msg, 1<<4, base); err != nil { t.Fatalf("Open: error = %v", err) } else { t.Cleanup(c.Close) @@ -561,6 +561,10 @@ func TestCache(t *testing.T) { stub.UniqueError }{UniqueError: 0xbad}, )}, + + cure: func(f *pkg.FContext) error { + panic("attempting to cure impossible artifact") + }, }, nil, pkg.Checksum{}, &pkg.DependencyCureError{ { Ident: unique.Make(pkg.ID{0xff, 3}),