internal/pkg: move dependency flooding to cache

This imposes a hard upper limit to concurrency during dependency satisfaction and moves all dependency-related code out of individual implementations of Artifact. This change also includes ctx and msg as part of Cache.

Signed-off-by: Ophestra <cat@gensokyo.uk>
This commit is contained in:
2026-01-09 01:51:39 +09:00
parent f2430b5f5e
commit f712466714
11 changed files with 680 additions and 409 deletions

View File

@@ -3,6 +3,7 @@ package pkg
import (
"bytes"
"context"
"crypto/sha512"
"encoding/base64"
"encoding/binary"
@@ -12,11 +13,14 @@ import (
"os"
"path"
"path/filepath"
"runtime"
"slices"
"strings"
"sync"
"syscall"
"hakurei.app/container/check"
"hakurei.app/message"
)
type (
@@ -53,45 +57,123 @@ func MustDecode(s string) Checksum {
}
}
// CureContext is passed to [Artifact.Cure] and contains information and methods
// useful for curing the [Artifact], like requesting the data of [File], or that
// other artifacts be cured.
// TContext is passed to [TrivialArtifact.Cure] and provides information and
// methods required for curing the [TrivialArtifact].
//
// Methods of CureContext are safe for concurrent use. CureContext is valid
// until [Artifact.Cure] returns.
type CureContext struct {
// Address of underlying [Cache], should be zeroed after [Artifact.Cure]
// returns and must not be exposed directly.
// Methods of TContext are safe for concurrent use. TContext is valid
// until [TrivialArtifact.Cure] returns.
type TContext struct {
// Address of underlying [Cache], should be zeroed or made unusable after
// [TrivialArtifact.Cure] returns and must not be exposed directly.
cache *Cache
// Populated during [Cache.Cure].
work, temp *check.Absolute
}
// destroy destroys the temporary directory and joins its errors with the error
// referred to by errP. If the error referred to by errP is non-nil, the work
// directory is removed similarly. [Cache] is responsible for making sure work
// is never left behind for a successful [Cache.Cure].
//
// destroy must be deferred by [Cache.Cure] if [TContext] is passed to any Cure
// implementation. It should not be called prior to that point.
func (t *TContext) destroy(errP *error) {
if chmodErr, removeErr := removeAll(t.temp); chmodErr != nil || removeErr != nil {
*errP = errors.Join(*errP, chmodErr, removeErr)
return
}
if *errP != nil {
chmodErr, removeErr := removeAll(t.work)
if chmodErr != nil || removeErr != nil {
*errP = errors.Join(*errP, chmodErr, removeErr)
} else if errors.Is(*errP, os.ErrExist) {
// two artifacts may be backed by the same file
*errP = nil
}
}
}
// Unwrap returns the underlying [context.Context].
func (t *TContext) Unwrap() context.Context { return t.cache.ctx }
// GetMessage returns [message.Msg] held by the underlying [Cache].
func (t *TContext) GetMessage() message.Msg { return t.cache.msg }
// GetWorkDir returns a pathname to a directory which [Artifact] is expected to
// write its output to. This is not the final resting place of the [Artifact]
// and this pathname should not be directly referred to in the final contents.
func (c *CureContext) GetWorkDir() *check.Absolute { return c.work }
func (t *TContext) GetWorkDir() *check.Absolute { return t.work }
// GetTempDir returns a pathname which implementations may use as scratch space.
// A directory is not created automatically, implementations are expected to
// create it if they wish to use it, using [os.MkdirAll].
func (c *CureContext) GetTempDir() *check.Absolute { return c.temp }
func (t *TContext) GetTempDir() *check.Absolute { return t.temp }
// Cure cures the [Artifact] and returns its pathname and [Checksum].
func (c *CureContext) Cure(a Artifact) (
pathname *check.Absolute,
checksum Checksum,
err error,
) {
return c.cache.Cure(a)
// Open tries to open [Artifact] for reading. If a implements [File], its data
// might be used directly, eliminating the roundtrip to vfs. Otherwise, it must
// cure into a directory containing a single regular file.
//
// If err is nil, the caller is responsible for closing the resulting
// [io.ReadCloser].
func (t *TContext) Open(a Artifact) (r io.ReadCloser, err error) {
if f, ok := a.(File); ok {
return t.cache.openFile(f)
}
var pathname *check.Absolute
if pathname, _, err = t.cache.Cure(a); err != nil {
return
}
var entries []os.DirEntry
if entries, err = os.ReadDir(pathname.String()); err != nil {
return
}
if len(entries) != 1 || !entries[0].Type().IsRegular() {
err = errors.New(
"input directory does not contain a single regular file",
)
return
} else {
return os.Open(pathname.Append(entries[0].Name()).String())
}
}
// OpenFile tries to load [File] from [Cache], and if that fails, obtains it via
// [File.Data] instead. Notably, it does not cure [File]. If err is nil, the
// caller is responsible for closing the resulting [io.ReadCloser].
func (c *CureContext) OpenFile(f File) (r io.ReadCloser, err error) {
return c.cache.openFile(f)
// FContext is passed to [FloodArtifact.Cure] and provides information and
// methods required for curing the [FloodArtifact].
//
// Methods of FContext are safe for concurrent use. FContext is valid
// until [FloodArtifact.Cure] returns.
type FContext struct {
TContext
// Cured top-level dependencies looked up by Pathname.
deps map[ID]*check.Absolute
}
// InvalidLookupError is the identifier of non-dependency [Artifact] looked up
// via [FContext.Pathname] by a misbehaving [Artifact] implementation.
type InvalidLookupError ID
func (e InvalidLookupError) Error() string {
return "attempting to look up non-dependency artifact " + Encode(e)
}
var _ error = InvalidLookupError{}
// Pathname returns the identifier pathname of an [Artifact]. Calling Pathname
// with an [Artifact] not part of the slice returned by [Artifact.Dependencies]
// panics.
func (f *FContext) Pathname(a Artifact) *check.Absolute {
id := Ident(a)
if p, ok := f.deps[id]; ok {
return p
} else {
panic(InvalidLookupError(id))
}
}
// An Artifact is a read-only reference to a piece of data that may be created
@@ -119,16 +201,35 @@ type Artifact interface {
//
// Result must remain identical across multiple invocations.
Dependencies() []Artifact
}
// FloodArtifact refers to an [Artifact] requiring its entire dependency graph
// to be cured prior to curing itself.
type FloodArtifact interface {
// Cure cures the current [Artifact] to the working directory obtained via
// [CureContext.GetWorkDir].
//
// If the implementation produces a single file, it must implement [File]
// as well. In that case, Cure must produce a single regular file with
// contents identical to that returned by [File.Data].
// [TContext.GetWorkDir] embedded in [FContext].
//
// Implementations must not retain c.
Cure(c *CureContext) (err error)
Cure(f *FContext) (err error)
Artifact
}
// TrivialArtifact refers to an [Artifact] that cures without requiring that
// any other [Artifact] is cured before it. Its dependency tree is ignored after
// computing its identifier.
//
// TrivialArtifact is unable to cure any other [Artifact] and it cannot access
// pathnames. This type of [Artifact] is primarily intended for dependency-less
// artifacts or direct dependencies that only consists of [File].
type TrivialArtifact interface {
// Cure cures the current [Artifact] to the working directory obtained via
// [TContext.GetWorkDir].
//
// Implementations must not retain c.
Cure(t *TContext) (err error)
Artifact
}
// KnownIdent is optionally implemented by [Artifact] and is used instead of
@@ -157,12 +258,14 @@ type KnownChecksum interface {
// A File refers to an [Artifact] backed by a single file.
type File interface {
// Data returns the full contents of [Artifact]. If [Artifact.Checksum]
// returns a non-nil address, Data is responsible for validating any data
// it produces and must return [ChecksumMismatchError] if validation fails.
// Cure returns the full contents of [File]. If [File] implements
// [KnownChecksum], Cure is responsible for validating any data it produces
// and must return [ChecksumMismatchError] if validation fails.
//
// Callers must not modify the returned byte slice.
Data() ([]byte, error)
//
// Result must remain identical across multiple invocations.
Cure() ([]byte, error)
Artifact
}
@@ -242,9 +345,47 @@ const (
checksumLinknamePrefix = "../" + dirChecksum + "/"
)
// A pendingArtifactDep is a dependency [Artifact] pending concurrent curing,
// subject to the cures limit. Values pointed to by result addresses are safe
// to access after the [sync.WaitGroup] associated with this pendingArtifactDep
// is done. pendingArtifactDep must not be reused or modified after it is sent
// to Cache.cureDep.
type pendingArtifactDep struct {
// Dependency artifact populated during [Cache.Cure].
a Artifact
// Address of result pathname populated during [Cache.Cure] and dereferenced
// if curing succeeds.
resP **check.Absolute
// Address of result error slice populated during [Cache.Cure], dereferenced
// after acquiring errsMu if curing fails. No additional action is taken,
// [Cache] and its caller are responsible for further error handling.
errs *[]error
// Address of mutex synchronising access to errs.
errsMu *sync.Mutex
// For synchronising access to result buffer.
*sync.WaitGroup
}
// Cache is a support layer that implementations of [Artifact] can use to store
// cured [Artifact] data in a content addressed fashion.
type Cache struct {
// Work for curing dependency [Artifact] is sent here and cured concurrently
// while subject to the cures limit. Invalid after the context is canceled.
cureDep chan<- *pendingArtifactDep
// [context.WithCancel] over caller-supplied context, used by [Artifact] and
// all dependency curing goroutines.
ctx context.Context
// Cancels ctx.
cancel context.CancelFunc
// For waiting on dependency curing goroutines.
wg sync.WaitGroup
// Reports new cures and passed to [Artifact].
msg message.Msg
// Directory where all [Cache] related files are placed.
base *check.Absolute
@@ -560,7 +701,9 @@ func (c *Cache) finaliseIdent(
close(done)
}
// openFile provides [CureContext.OpenFile] for [Artifact.Cure].
// openFile tries to load [File] from [Cache], and if that fails, obtains it via
// [File.Cure] instead. Notably, it does not cure [File]. If err is nil, the
// caller is responsible for closing the resulting [io.ReadCloser].
func (c *Cache) openFile(f File) (r io.ReadCloser, err error) {
if kc, ok := f.(KnownChecksum); ok {
c.checksumMu.RLock()
@@ -583,7 +726,7 @@ func (c *Cache) openFile(f File) (r io.ReadCloser, err error) {
return
}
var data []byte
if data, err = f.Data(); err != nil {
if data, err = f.Cure(); err != nil {
return
}
r = io.NopCloser(bytes.NewReader(data))
@@ -691,7 +834,16 @@ func (fsys dotOverrideFS) Stat(name string) (fi fs.FileInfo, err error) {
return
}
// Cure cures the [Artifact] and returns its pathname and [Checksum].
// InvalidArtifactError describes an artifact that does not implement a
// supported Cure method.
type InvalidArtifactError ID
func (e InvalidArtifactError) Error() string {
return "artifact " + Encode(e) + " cannot be cured"
}
// Cure cures the [Artifact] and returns its pathname and [Checksum]. Direct
// calls to Cure are not subject to the cures limit.
func (c *Cache) Cure(a Artifact) (
pathname *check.Absolute,
checksum Checksum,
@@ -764,6 +916,11 @@ func (c *Cache) Cure(a Artifact) (
}
}
if c.msg.IsVerbose() {
c.msg.Verbosef("curing %s...", Encode(id))
}
// cure File outside type switch to skip TContext initialisation
if f, ok := a.(File); ok {
if checksumFi != nil {
if !checksumFi.Mode().IsRegular() {
@@ -774,7 +931,7 @@ func (c *Cache) Cure(a Artifact) (
}
var data []byte
data, err = f.Data()
data, err = f.Cure()
if err != nil {
return
}
@@ -823,107 +980,163 @@ func (c *Cache) Cure(a Artifact) (
c.checksumMu.Unlock()
return
} else {
if checksumFi != nil {
if !checksumFi.Mode().IsDir() {
// unreachable
err = InvalidFileModeError(checksumFi.Mode())
}
return
}
}
cc := CureContext{
cache: c,
work: c.base.Append(dirWork, ids),
temp: c.base.Append(dirTemp, ids),
if checksumFi != nil {
if !checksumFi.Mode().IsDir() {
// unreachable
err = InvalidFileModeError(checksumFi.Mode())
}
defer func() {
if chmodErr, removeErr := removeAll(cc.temp); chmodErr != nil || removeErr != nil {
err = errors.Join(err, chmodErr, removeErr)
return
}
if err != nil {
chmodErr, removeErr := removeAll(cc.work)
if chmodErr != nil || removeErr != nil {
err = errors.Join(err, chmodErr, removeErr)
} else if errors.Is(err, os.ErrExist) {
// two artifacts may be backed by the same file
err = nil
}
}
}()
if err = a.Cure(&cc); err != nil {
return
}
cc.cache = nil
var fi os.FileInfo
if fi, err = os.Lstat(cc.work.String()); err != nil {
if errors.Is(err, os.ErrNotExist) {
err = NoOutputError{}
}
return
}
if !fi.IsDir() {
if !fi.Mode().IsRegular() {
err = InvalidFileModeError(fi.Mode())
} else {
err = errors.New("non-file artifact produced regular file")
}
return
}
var gotChecksum Checksum
if gotChecksum, err = HashFS(
dotOverrideFS{os.DirFS(cc.work.String()).(dirFS)},
".",
); err != nil {
return
}
if checksumPathname == nil {
checksum = gotChecksum
checksums = Encode(checksum)
checksumPathname = c.base.Append(
dirChecksum,
checksums,
)
} else {
if gotChecksum != checksum {
err = &ChecksumMismatchError{
Got: gotChecksum,
Want: checksum,
}
return
}
}
if err = os.Chmod(cc.work.String(), 0700); err != nil {
return
}
c.checksumMu.Lock()
if err = os.Rename(
cc.work.String(),
checksumPathname.String(),
); err != nil {
if !errors.Is(err, os.ErrExist) {
c.checksumMu.Unlock()
return
}
// err is zeroed during deferred cleanup
} else {
err = os.Chmod(checksumPathname.String(), 0500)
}
c.checksumMu.Unlock()
return
}
t := TContext{c, c.base.Append(dirWork, ids), c.base.Append(dirTemp, ids)}
switch ca := a.(type) {
case TrivialArtifact:
defer t.destroy(&err)
if err = ca.Cure(&t); err != nil {
return
}
break
case FloodArtifact:
deps := a.Dependencies()
f := FContext{t, make(map[ID]*check.Absolute, len(deps))}
var wg sync.WaitGroup
wg.Add(len(deps))
res := make([]*check.Absolute, len(deps))
errs := make([]error, 0, len(deps))
var errsMu sync.Mutex
for i, d := range deps {
pending := pendingArtifactDep{d, &res[i], &errs, &errsMu, &wg}
select {
case c.cureDep <- &pending:
break
case <-c.ctx.Done():
err = c.ctx.Err()
return
}
}
wg.Wait()
if len(errs) > 0 {
err = errors.Join(errs...)
if err == nil {
// unreachable
err = syscall.ENOTRECOVERABLE
}
return
}
for i, p := range res {
f.deps[Ident(deps[i])] = p
}
defer f.destroy(&err)
if err = ca.Cure(&f); err != nil {
return
}
break
default:
err = InvalidArtifactError(id)
return
}
t.cache = nil
var fi os.FileInfo
if fi, err = os.Lstat(t.work.String()); err != nil {
if errors.Is(err, os.ErrNotExist) {
err = NoOutputError{}
}
return
}
if !fi.IsDir() {
if !fi.Mode().IsRegular() {
err = InvalidFileModeError(fi.Mode())
} else {
err = errors.New("non-file artifact produced regular file")
}
return
}
var gotChecksum Checksum
if gotChecksum, err = HashFS(
dotOverrideFS{os.DirFS(t.work.String()).(dirFS)},
".",
); err != nil {
return
}
if checksumPathname == nil {
checksum = gotChecksum
checksums = Encode(checksum)
checksumPathname = c.base.Append(
dirChecksum,
checksums,
)
} else {
if gotChecksum != checksum {
err = &ChecksumMismatchError{
Got: gotChecksum,
Want: checksum,
}
return
}
}
if err = os.Chmod(t.work.String(), 0700); err != nil {
return
}
c.checksumMu.Lock()
if err = os.Rename(
t.work.String(),
checksumPathname.String(),
); err != nil {
if !errors.Is(err, os.ErrExist) {
c.checksumMu.Unlock()
return
}
// err is zeroed during deferred cleanup
} else {
err = os.Chmod(checksumPathname.String(), 0500)
}
c.checksumMu.Unlock()
return
}
// New returns the address to a new instance of [Cache].
func New(base *check.Absolute) (*Cache, error) {
// cure cures the pending [Artifact], stores its result and notifies the caller.
func (pending *pendingArtifactDep) cure(c *Cache) {
defer pending.Done()
pathname, _, err := c.Cure(pending.a)
if err == nil {
*pending.resP = pathname
return
}
pending.errsMu.Lock()
*pending.errs = append(*pending.errs, err)
pending.errsMu.Unlock()
}
// Close cancels all pending cures and waits for them to clean up.
func (c *Cache) Close() { c.cancel(); c.wg.Wait() }
// New returns the address to a new instance of [Cache]. Concurrent cures of
// dependency [Artifact] is limited to the caller-supplied value, however direct
// calls to [Cache.Cure] is not subject to this limitation.
//
// A cures value of 0 or lower is equivalent to the value returned by
// [runtime.NumCPU].
func New(
ctx context.Context,
msg message.Msg,
cures int,
base *check.Absolute,
) (*Cache, error) {
for _, name := range []string{
dirIdentifier,
dirChecksum,
@@ -935,11 +1148,35 @@ func New(base *check.Absolute) (*Cache, error) {
}
}
return &Cache{
c := Cache{
msg: msg,
base: base,
ident: make(map[ID]Checksum),
identErr: make(map[ID]error),
identPending: make(map[ID]<-chan struct{}),
}, nil
}
c.ctx, c.cancel = context.WithCancel(ctx)
cureDep := make(chan *pendingArtifactDep, cures)
c.cureDep = cureDep
if cures < 1 {
cures = runtime.NumCPU()
}
for i := 0; i < cures; i++ {
c.wg.Go(func() {
for {
select {
case <-c.ctx.Done():
return
case pending := <-cureDep:
pending.cure(&c)
break
}
}
})
}
return &c, nil
}