All checks were successful
Test / Create distribution (push) Successful in 1m0s
Test / Sandbox (push) Successful in 2m41s
Test / Hakurei (push) Successful in 4m1s
Test / ShareFS (push) Successful in 4m1s
Test / Hpkg (push) Successful in 4m35s
Test / Sandbox (race detector) (push) Successful in 5m4s
Test / Hakurei (race detector) (push) Successful in 6m0s
Test / Flake checks (push) Successful in 1m46s
This should hopefully provide good separation between the artifact curing backend implementation and the (still work in progress) language. Making the IR parseable also guarantees uniqueness of the representation. Signed-off-by: Ophestra <cat@gensokyo.uk>
1738 lines
46 KiB
Go
// Package pkg provides utilities for packaging software.
package pkg

import (
	"bufio"
	"bytes"
	"context"
	"crypto/sha512"
	"encoding/base64"
	"encoding/binary"
	"errors"
	"fmt"
	"hash"
	"io"
	"io/fs"
	"iter"
	"maps"
	"os"
	"path"
	"path/filepath"
	"runtime"
	"slices"
	"strings"
	"sync"
	"syscall"
	"testing"
	"unique"
	"unsafe"

	"hakurei.app/container/check"
	"hakurei.app/internal/lockedfile"
	"hakurei.app/message"
)

type (
	// A Checksum is a SHA-384 checksum computed for a cured [Artifact].
	Checksum = [sha512.Size384]byte

	// An ID is a unique identifier returned by [KnownIdent.ID]. This value
	// must be determined deterministically ahead of time.
	ID Checksum
)

// Encode is an abbreviation for base64.URLEncoding.EncodeToString(checksum[:]).
func Encode(checksum Checksum) string {
	return base64.URLEncoding.EncodeToString(checksum[:])
}

// Decode is an abbreviation for base64.URLEncoding.Decode(checksum[:], []byte(s)).
func Decode(buf *Checksum, s string) (err error) {
	var n int
	n, err = base64.URLEncoding.Decode(buf[:], []byte(s))
	if err == nil && n != len(buf) {
		err = io.ErrUnexpectedEOF
	}
	return
}

// MustDecode decodes a string representation of [Checksum] and panics if there
// is a decoding error or the resulting data is too short.
func MustDecode(s string) (checksum Checksum) {
	if err := Decode(&checksum, s); err != nil {
		panic(err)
	}
	return
}

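// Illustrative sketch (not part of this package): a checksum survives a round
// trip through its encoded string form, which makes the Encode output a stable
// on-disk and log-friendly name for an artifact. The helper name is
// hypothetical.
//
//	func roundTrip(sum Checksum) (Checksum, error) {
//		var out Checksum
//		err := Decode(&out, Encode(sum))
//		return out, err
//	}
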
// TContext is passed to [TrivialArtifact.Cure] and provides information and
// methods required for curing the [TrivialArtifact].
//
// Methods of TContext are safe for concurrent use. TContext is valid
// until [TrivialArtifact.Cure] returns.
type TContext struct {
	// Address of underlying [Cache], should be zeroed or made unusable after
	// [TrivialArtifact.Cure] returns and must not be exposed directly.
	cache *Cache

	// Populated during [Cache.Cure].
	work, temp *check.Absolute
}

// destroy destroys the temporary directory and joins its errors with the error
// referred to by errP. If the error referred to by errP is non-nil, the work
// directory is removed similarly. [Cache] is responsible for making sure work
// is never left behind for a successful [Cache.Cure].
//
// destroy must be deferred by [Cache.Cure] if [TContext] is passed to any Cure
// implementation. It should not be called prior to that point.
func (t *TContext) destroy(errP *error) {
	if chmodErr, removeErr := removeAll(t.temp); chmodErr != nil || removeErr != nil {
		*errP = errors.Join(*errP, chmodErr, removeErr)
		return
	}

	if *errP != nil {
		chmodErr, removeErr := removeAll(t.work)
		if chmodErr != nil || removeErr != nil {
			*errP = errors.Join(*errP, chmodErr, removeErr)
		} else if errors.Is(*errP, os.ErrExist) {
			// two artifacts may be backed by the same file
			*errP = nil
		}
	}
}

// Unwrap returns the underlying [context.Context].
func (t *TContext) Unwrap() context.Context { return t.cache.ctx }

// GetMessage returns [message.Msg] held by the underlying [Cache].
func (t *TContext) GetMessage() message.Msg { return t.cache.msg }

// GetWorkDir returns a pathname to a directory which [Artifact] is expected to
// write its output to. This is not the final resting place of the [Artifact]
// and this pathname should not be directly referred to in the final contents.
func (t *TContext) GetWorkDir() *check.Absolute { return t.work }

// GetTempDir returns a pathname which implementations may use as scratch space.
// A directory is not created automatically; implementations are expected to
// create it if they wish to use it, using [os.MkdirAll].
func (t *TContext) GetTempDir() *check.Absolute { return t.temp }

// Open tries to open [Artifact] for reading. If a implements [FileArtifact],
// its reader might be used directly, eliminating the roundtrip to vfs.
// Otherwise, it must cure into a directory containing a single regular file.
//
// If err is nil, the caller must close the resulting [io.ReadCloser] and return
// its error, if any. Failure to read r to EOF may result in a spurious
// [ChecksumMismatchError], or the underlying implementation may block on Close.
func (t *TContext) Open(a Artifact) (r io.ReadCloser, err error) {
	if f, ok := a.(FileArtifact); ok {
		return t.cache.openFile(f)
	}

	var pathname *check.Absolute
	if pathname, _, err = t.cache.Cure(a); err != nil {
		return
	}

	var entries []os.DirEntry
	if entries, err = os.ReadDir(pathname.String()); err != nil {
		return
	}

	if len(entries) != 1 || !entries[0].Type().IsRegular() {
		err = errors.New(
			"input directory does not contain a single regular file",
		)
		return
	} else {
		return os.Open(pathname.Append(entries[0].Name()).String())
	}
}

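// Illustrative sketch (not part of this package): a TrivialArtifact.Cure
// implementation reading a single-file dependency via TContext.Open. The
// concrete artifact type and its src field are hypothetical.
//
//	func (a *exampleArtifact) Cure(t *TContext) error {
//		r, err := t.Open(a.src) // a.src is a FileArtifact dependency
//		if err != nil {
//			return err
//		}
//		_, err = io.Copy(io.Discard, r) // consume to EOF before Close
//		return errors.Join(err, r.Close())
//	}
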
// FContext is passed to [FloodArtifact.Cure] and provides information and
// methods required for curing the [FloodArtifact].
//
// Methods of FContext are safe for concurrent use. FContext is valid
// until [FloodArtifact.Cure] returns.
type FContext struct {
	TContext

	// Cured top-level dependencies looked up by GetArtifact.
	deps map[Artifact]cureRes
}

// InvalidLookupError is the identifier of a non-dependency [Artifact] looked up
// via [FContext.GetArtifact] by a misbehaving [Artifact] implementation.
type InvalidLookupError ID

func (e InvalidLookupError) Error() string {
	return "attempting to look up non-dependency artifact " + Encode(e)
}

var _ error = InvalidLookupError{}

// GetArtifact returns the identifier pathname and checksum of an [Artifact].
// Calling GetArtifact with an [Artifact] not part of the slice returned by
// [Artifact.Dependencies] panics.
func (f *FContext) GetArtifact(a Artifact) (
	pathname *check.Absolute,
	checksum unique.Handle[Checksum],
) {
	if res, ok := f.deps[a]; ok {
		return res.pathname, res.checksum
	}
	panic(InvalidLookupError(f.cache.Ident(a).Value()))
}

// RContext is passed to [FileArtifact.Cure] and provides helper methods useful
// for curing the [FileArtifact].
//
// Methods of RContext are safe for concurrent use. RContext is valid
// until [FileArtifact.Cure] returns.
type RContext struct {
	// Address of underlying [Cache], should be zeroed or made unusable after
	// [FileArtifact.Cure] returns and must not be exposed directly.
	cache *Cache
}

// Unwrap returns the underlying [context.Context].
func (r *RContext) Unwrap() context.Context { return r.cache.ctx }

// An Artifact is a read-only reference to a piece of data that may be created
// deterministically but might not currently be available in memory or on the
// filesystem.
type Artifact interface {
	// Kind returns the [Kind] of artifact. This is usually unique to the
	// concrete type but two functionally identical implementations of
	// [Artifact] are allowed to return the same [Kind] value.
	Kind() Kind

	// Params writes deterministic values describing [Artifact]. Implementations
	// must guarantee that these values are unique among differing instances
	// of the same implementation with identical dependencies and convey enough
	// information to create another instance of [Artifact] identical to the
	// instance emitting these values. The new instance created via [IRReadFunc]
	// from these values must then produce identical IR values.
	//
	// Result must remain identical across multiple invocations.
	Params(ctx *IContext)

	// Dependencies returns a slice of [Artifact] that the current instance
	// depends on to produce its contents.
	//
	// Callers must not modify the returned slice.
	//
	// Result must remain identical across multiple invocations.
	Dependencies() []Artifact

	// IsExclusive returns whether the [Artifact] is exclusive. Exclusive
	// artifacts might not run in parallel with each other, and are still
	// subject to the cures limit.
	//
	// Some implementations may saturate the CPU for a nontrivial amount of
	// time. Curing multiple such implementations simultaneously causes
	// significant CPU scheduler overhead. An exclusive artifact will generally
	// not be cured alongside another exclusive artifact, thus alleviating this
	// overhead.
	//
	// Note that [Cache] reserves the right to still cure exclusive
	// artifacts concurrently as this is not a synchronisation primitive but
	// an optimisation one. Implementations are forbidden from accessing global
	// state regardless of exclusivity.
	//
	// Result must remain identical across multiple invocations.
	IsExclusive() bool
}

// FloodArtifact refers to an [Artifact] requiring its entire dependency graph
// to be cured prior to curing itself.
type FloodArtifact interface {
	// Cure cures the current [Artifact] to the working directory obtained via
	// [TContext.GetWorkDir] embedded in [FContext].
	//
	// Implementations must not retain f.
	Cure(f *FContext) (err error)

	Artifact
}

// Flood returns an iterator over the dependency tree of an [Artifact].
func Flood(a Artifact) iter.Seq[Artifact] {
	return func(yield func(Artifact) bool) {
		for _, d := range a.Dependencies() {
			if !yield(d) {
				return
			}

			for d0 := range Flood(d) {
				if !yield(d0) {
					return
				}
			}
		}
	}
}

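// Illustrative sketch (not part of this package): Flood yields every artifact
// in the dependency tree (shared dependencies are yielded once per path), and
// is the traversal [Cache.Cure] uses to enforce [Cache.SetThreshold].
//
//	func countDependencies(a Artifact) int {
//		n := 0
//		for range Flood(a) {
//			n++
//		}
//		return n
//	}
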
// TrivialArtifact refers to an [Artifact] that cures without requiring that
// any other [Artifact] is cured before it. Its dependency tree is ignored after
// computing its identifier.
//
// TrivialArtifact is unable to cure any other [Artifact] and it cannot access
// pathnames. This type of [Artifact] is primarily intended for dependency-less
// artifacts or direct dependencies that only consist of [FileArtifact].
type TrivialArtifact interface {
	// Cure cures the current [Artifact] to the working directory obtained via
	// [TContext.GetWorkDir].
	//
	// Implementations must not retain t.
	Cure(t *TContext) (err error)

	Artifact
}

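// Illustrative sketch (not part of this package): the outline of a minimal
// TrivialArtifact that writes a fixed file into its work directory. All names
// and the kind value are hypothetical, and the Params method is elided since
// IContext is defined elsewhere.
//
//	type helloArtifact struct{}
//
//	func (helloArtifact) Kind() Kind               { return KindCustomOffset }
//	func (helloArtifact) Dependencies() []Artifact { return nil }
//	func (helloArtifact) IsExclusive() bool        { return false }
//
//	func (helloArtifact) Cure(t *TContext) error {
//		if err := os.MkdirAll(t.GetWorkDir().String(), 0700); err != nil {
//			return err
//		}
//		return os.WriteFile(t.GetWorkDir().Append("hello").String(), []byte("hi\n"), 0400)
//	}
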
// KnownIdent is optionally implemented by [Artifact] and is used instead of
// [Kind.Ident] when it is available.
//
// This is very subtle to use correctly. The implementation must ensure that
// this value is globally unique, otherwise [Cache] can enter an inconsistent
// state. This should not be implemented outside of testing.
type KnownIdent interface {
	// ID returns a globally unique identifier referring to the current
	// [Artifact]. This value must be known ahead of time and guaranteed to be
	// unique without having obtained the full contents of the [Artifact].
	ID() ID
}

// KnownChecksum is optionally implemented by [Artifact] for an artifact with
// output known ahead of time.
type KnownChecksum interface {
	// Checksum returns a known checksum.
	//
	// Callers must not modify the [Checksum].
	//
	// Result must remain identical across multiple invocations.
	Checksum() Checksum
}

// FileArtifact refers to an [Artifact] backed by a single file.
type FileArtifact interface {
	// Cure returns [io.ReadCloser] of the full contents of [FileArtifact]. If
	// [FileArtifact] implements [KnownChecksum], Cure is responsible for
	// validating any data it produces and must return [ChecksumMismatchError]
	// if validation fails. This error is conventionally returned during the
	// first call to Close, but may be returned during any call to Read before
	// EOF, or by Cure itself.
	//
	// Callers are responsible for closing the resulting [io.ReadCloser].
	//
	// Result must remain identical across multiple invocations.
	Cure(r *RContext) (io.ReadCloser, error)

	Artifact
}

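// Illustrative sketch (not part of this package): a FileArtifact wrapping an
// arbitrary reader with [RContext.NewMeasuredReader] so the checksum promised
// via KnownChecksum is verified by the time the caller closes the reader. The
// type and its fields are hypothetical.
//
//	type blobArtifact struct {
//		open func() (io.ReadCloser, error)
//		sum  unique.Handle[Checksum]
//	}
//
//	func (b *blobArtifact) Checksum() Checksum { return b.sum.Value() }
//
//	func (b *blobArtifact) Cure(r *RContext) (io.ReadCloser, error) {
//		rc, err := b.open()
//		if err != nil {
//			return nil, err
//		}
//		return r.NewMeasuredReader(rc, b.sum), nil
//	}
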
// reportName returns a string describing [Artifact] presented to the user.
func reportName(a Artifact, id unique.Handle[ID]) string {
	r := Encode(id.Value())
	if s, ok := a.(fmt.Stringer); ok {
		if name := s.String(); name != "" {
			r += "-" + name
		}
	}
	return r
}

// Kind corresponds to the concrete type of [Artifact] and is used to create
// the identifier for an [Artifact] with dependencies.
type Kind uint64

const (
	// KindHTTPGet is the kind of [Artifact] returned by [NewHTTPGet].
	KindHTTPGet Kind = iota
	// KindTar is the kind of [Artifact] returned by [NewTar].
	KindTar
	// KindExec is the kind of [Artifact] returned by [NewExec].
	KindExec
	// KindExecNet is the kind of [Artifact] returned by [NewExec] but with a
	// non-nil checksum.
	KindExecNet
	// KindFile is the kind of [Artifact] returned by [NewFile].
	KindFile

	// KindCustomOffset is the first [Kind] value reserved for implementations
	// not from this package.
	KindCustomOffset = 1 << 31
)

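// Illustrative sketch (not part of this package): an external implementation
// reserving its own kinds at or above KindCustomOffset, as prescribed by the
// constant's comment. The names are hypothetical.
//
//	const (
//		KindMyFetcher Kind = KindCustomOffset + iota
//		KindMyBundle
//	)
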
const (
	// fileLock is the file name appended to Cache.base for guaranteeing
	// exclusive access to the cache directory.
	fileLock = "lock"

	// dirIdentifier is the directory name appended to Cache.base for storing
	// artifacts named after their [ID].
	dirIdentifier = "identifier"
	// dirChecksum is the directory name appended to Cache.base for storing
	// artifacts named after their [Checksum].
	dirChecksum = "checksum"

	// dirWork is the directory name appended to Cache.base for working
	// pathnames set up during [Cache.Cure].
	dirWork = "work"
	// dirTemp is the directory name appended to Cache.base for scratch space
	// pathnames allocated during [Cache.Cure].
	dirTemp = "temp"

	// checksumLinknamePrefix is prepended to the encoded [Checksum] value
	// of an [Artifact] when creating a symbolic link to dirChecksum.
	checksumLinknamePrefix = "../" + dirChecksum + "/"
)

// cureRes are the non-error results returned by [Cache.Cure].
type cureRes struct {
	pathname *check.Absolute
	checksum unique.Handle[Checksum]
}

// A pendingArtifactDep is a dependency [Artifact] pending concurrent curing,
// subject to the cures limit. Values pointed to by result addresses are safe
// to access after the [sync.WaitGroup] associated with this pendingArtifactDep
// is done. pendingArtifactDep must not be reused or modified after its cure
// method is called.
type pendingArtifactDep struct {
	// Dependency artifact populated during [Cache.Cure].
	a Artifact

	// Address of result pathname populated during [Cache.Cure] and dereferenced
	// if curing succeeds.
	resP *cureRes

	// Address of result error slice populated during [Cache.Cure], dereferenced
	// after acquiring errsMu if curing fails. No additional action is taken,
	// [Cache] and its caller are responsible for further error handling.
	errs *DependencyCureError
	// Address of mutex synchronising access to errs.
	errsMu *sync.Mutex

	// For synchronising access to result buffer.
	*sync.WaitGroup
}

// Cache is a support layer that implementations of [Artifact] can use to store
// cured [Artifact] data in a content addressed fashion.
type Cache struct {
	// Cures of any variant of [Artifact] send to cures before entering the
	// implementation and receive an equal number of elements after.
	cures chan struct{}

	// [context.WithCancel] over caller-supplied context, used by [Artifact] and
	// all dependency curing goroutines.
	ctx context.Context
	// Cancels ctx.
	cancel context.CancelFunc
	// For waiting on dependency curing goroutines.
	wg sync.WaitGroup
	// Reports new cures and passed to [Artifact].
	msg message.Msg

	// Directory where all [Cache] related files are placed.
	base *check.Absolute

	// Whether to validate [FileArtifact.Cure] for a [KnownChecksum] file. This
	// significantly reduces performance.
	strict bool
	// Maximum size of a dependency graph.
	threshold uintptr

	// Artifact to [unique.Handle] of identifier cache.
	artifact sync.Map
	// Identifier free list, must not be accessed directly.
	identPool sync.Pool

	// Synchronises access to dirChecksum.
	checksumMu sync.RWMutex

	// Identifier to content pair cache.
	ident map[unique.Handle[ID]]unique.Handle[Checksum]
	// Identifier to error pair for unrecoverably faulted [Artifact].
	identErr map[unique.Handle[ID]]error
	// Pending identifiers, accessed through Cure for entries not in ident.
	identPending map[unique.Handle[ID]]<-chan struct{}
	// Synchronises access to ident and corresponding filesystem entries.
	identMu sync.RWMutex

	// Synchronises entry into exclusive artifacts for the cure method.
	exclMu sync.Mutex
	// Buffered I/O free list, must not be accessed directly.
	bufioPool sync.Pool

	// Unlocks the on-filesystem cache. Must only be called from Close.
	unlock func()
	// Synchronises calls to Close.
	closeOnce sync.Once
}

// IsStrict returns whether the [Cache] strictly verifies checksums.
func (c *Cache) IsStrict() bool { return c.strict }

// SetStrict sets whether the [Cache] strictly verifies checksums, even when
// the implementation promises to validate them internally. This significantly
// reduces performance and is not recommended outside of testing.
//
// This method is not safe for concurrent use with any other method.
func (c *Cache) SetStrict(strict bool) { c.strict = strict }

// SetThreshold imposes a maximum size on the dependency graph, checked on every
// call to Cure. The zero value disables this check entirely.
//
// This method is not safe for concurrent use with any other method.
func (c *Cache) SetThreshold(threshold uintptr) { c.threshold = threshold }

// extIdent is a [Kind] concatenated with [ID].
type extIdent [wordSize + len(ID{})]byte

// getIdentBuf returns the address of an extIdent for Ident.
func (c *Cache) getIdentBuf() *extIdent { return c.identPool.Get().(*extIdent) }

// putIdentBuf adds buf to identPool.
func (c *Cache) putIdentBuf(buf *extIdent) { c.identPool.Put(buf) }

// storeIdent adds an [Artifact] to the artifact cache.
func (c *Cache) storeIdent(a Artifact, buf *extIdent) unique.Handle[ID] {
	idu := unique.Make(ID(buf[wordSize:]))
	c.artifact.Store(a, idu)
	return idu
}

// Ident returns the identifier of an [Artifact].
func (c *Cache) Ident(a Artifact) unique.Handle[ID] {
	buf, idu := c.unsafeIdent(a, false)
	if buf != nil {
		idu = c.storeIdent(a, buf)
		c.putIdentBuf(buf)
	}
	return idu
}

// unsafeIdent implements Ident but returns the underlying buffer for a newly
// computed identifier. Callers must return this buffer to identPool. encodeKind
// is only a hint; kind may still be encoded in the buffer.
func (c *Cache) unsafeIdent(a Artifact, encodeKind bool) (
	buf *extIdent,
	idu unique.Handle[ID],
) {
	if id, ok := c.artifact.Load(a); ok {
		idu = id.(unique.Handle[ID])
		return
	}

	if ki, ok := a.(KnownIdent); ok {
		buf = c.getIdentBuf()
		if encodeKind {
			binary.LittleEndian.PutUint64(buf[:], uint64(a.Kind()))
		}
		*(*ID)(buf[wordSize:]) = ki.ID()
		return
	}

	buf = c.getIdentBuf()
	h := sha512.New384()
	if err := c.Encode(h, a); err != nil {
		// unreachable
		panic(err)
	}
	binary.LittleEndian.PutUint64(buf[:], uint64(a.Kind()))
	h.Sum(buf[wordSize:wordSize])
	return
}

// A ChecksumMismatchError describes an [Artifact] with unexpected content.
type ChecksumMismatchError struct {
	// Actual and expected checksums.
	Got, Want Checksum
}

func (e *ChecksumMismatchError) Error() string {
	return "got " + Encode(e.Got) +
		" instead of " + Encode(e.Want)
}

// ScrubError describes the outcome of a [Cache.Scrub] call where errors were
// found and removed from the underlying storage of [Cache].
type ScrubError struct {
	// Content-addressed entries not matching their checksum. This can happen
	// if an incorrect [FileArtifact] implementation was cured against
	// a non-strict [Cache].
	ChecksumMismatches []ChecksumMismatchError
	// Dangling identifier symlinks. This can happen if the content-addressed
	// entry was removed while scrubbing due to a checksum mismatch.
	DanglingIdentifiers []ID
	// Miscellaneous errors, including [os.ReadDir] on checksum and identifier
	// directories, [Decode] on entry names and [os.RemoveAll] on inconsistent
	// entries.
	Errs map[unique.Handle[string]][]error
}

// errs is a deterministic iterator over Errs.
func (e *ScrubError) errs(yield func(unique.Handle[string], []error) bool) {
	keys := slices.AppendSeq(
		make([]unique.Handle[string], 0, len(e.Errs)),
		maps.Keys(e.Errs),
	)
	slices.SortFunc(keys, func(a, b unique.Handle[string]) int {
		return strings.Compare(a.Value(), b.Value())
	})
	for _, key := range keys {
		if !yield(key, e.Errs[key]) {
			break
		}
	}
}

// Unwrap returns a concatenation of ChecksumMismatches and Errs.
func (e *ScrubError) Unwrap() []error {
	s := make([]error, 0, len(e.ChecksumMismatches)+len(e.Errs))
	for _, err := range e.ChecksumMismatches {
		s = append(s, &err)
	}
	for _, errs := range e.errs {
		s = append(s, errs...)
	}
	return s
}

// Error returns a multi-line representation of [ScrubError].
func (e *ScrubError) Error() string {
	var segments []string
	if len(e.ChecksumMismatches) > 0 {
		s := "checksum mismatches:\n"
		for _, m := range e.ChecksumMismatches {
			s += m.Error() + "\n"
		}
		segments = append(segments, s)
	}
	if len(e.DanglingIdentifiers) > 0 {
		s := "dangling identifiers:\n"
		for _, id := range e.DanglingIdentifiers {
			s += Encode(id) + "\n"
		}
		segments = append(segments, s)
	}
	if len(e.Errs) > 0 {
		s := "errors during scrub:\n"
		for pathname, errs := range e.errs {
			s += " " + pathname.Value() + ":\n"
			for _, err := range errs {
				s += " " + err.Error() + "\n"
			}
		}
		segments = append(segments, s)
	}
	return strings.Join(segments, "\n")
}

// Scrub frees internal in-memory identifier to content pair cache, verifies all
// cached artifacts against their checksums, checks for dangling identifier
// symlinks and removes them if found.
//
// This method is not safe for concurrent use with any other method.
func (c *Cache) Scrub(checks int) error {
	if checks <= 0 {
		checks = runtime.NumCPU()
	}

	c.identMu.Lock()
	defer c.identMu.Unlock()
	c.checksumMu.Lock()
	defer c.checksumMu.Unlock()

	c.ident = make(map[unique.Handle[ID]]unique.Handle[Checksum])
	c.identErr = make(map[unique.Handle[ID]]error)
	c.artifact.Clear()

	var (
		se   = ScrubError{Errs: make(map[unique.Handle[string]][]error)}
		seMu sync.Mutex

		addErr = func(pathname *check.Absolute, err error) {
			seMu.Lock()
			se.Errs[pathname.Handle()] = append(se.Errs[pathname.Handle()], err)
			seMu.Unlock()
		}
	)

	type checkEntry struct {
		ent   os.DirEntry
		check func(ent os.DirEntry, want *Checksum) bool
	}
	var (
		dir *check.Absolute
		wg  sync.WaitGroup
		w   = make(chan checkEntry, checks)
		p   = sync.Pool{New: func() any { return new(Checksum) }}
	)
	condemn := func(ent os.DirEntry) {
		pathname := dir.Append(ent.Name())
		chmodErr, removeErr := removeAll(pathname)
		if chmodErr != nil {
			addErr(pathname, chmodErr)
		}
		if removeErr != nil {
			addErr(pathname, removeErr)
		}
	}
	for i := 0; i < checks; i++ {
		go func() {
			for ce := range w {
				want := p.Get().(*Checksum)
				ent := ce.ent
				if err := Decode(want, ent.Name()); err != nil {
					addErr(dir.Append(ent.Name()), err)
					wg.Go(func() { condemn(ent) })
				} else if !ce.check(ent, want) {
					wg.Go(func() { condemn(ent) })
				} else {
					c.msg.Verbosef("%s is consistent", ent.Name())
				}
				p.Put(want)
				wg.Done()
			}
		}()
	}
	defer close(w)

	dir = c.base.Append(dirChecksum)
	if entries, readdirErr := os.ReadDir(dir.String()); readdirErr != nil {
		addErr(dir, readdirErr)
	} else {
		wg.Add(len(entries))
		for _, ent := range entries {
			w <- checkEntry{ent, func(ent os.DirEntry, want *Checksum) bool {
				got := p.Get().(*Checksum)
				defer p.Put(got)

				pathname := dir.Append(ent.Name())
				if ent.IsDir() {
					if err := HashDir(got, pathname); err != nil {
						addErr(pathname, err)
						return true
					}
				} else if ent.Type().IsRegular() {
					h := sha512.New384()

					if r, err := os.Open(pathname.String()); err != nil {
						addErr(pathname, err)
						return true
					} else {
						_, err = io.Copy(h, r)
						closeErr := r.Close()
						if closeErr != nil {
							addErr(pathname, closeErr)
						}
						if err != nil {
							addErr(pathname, err)
						}
					}
					h.Sum(got[:0])
				} else {
					addErr(pathname, InvalidFileModeError(ent.Type()))
					return false
				}

				if *got != *want {
					seMu.Lock()
					se.ChecksumMismatches = append(se.ChecksumMismatches,
						ChecksumMismatchError{Got: *got, Want: *want},
					)
					seMu.Unlock()
					return false
				}
				return true
			}}
		}
		wg.Wait()
	}

	dir = c.base.Append(dirIdentifier)
	if entries, readdirErr := os.ReadDir(dir.String()); readdirErr != nil {
		addErr(dir, readdirErr)
	} else {
		wg.Add(len(entries))
		for _, ent := range entries {
			w <- checkEntry{ent, func(ent os.DirEntry, want *Checksum) bool {
				got := p.Get().(*Checksum)
				defer p.Put(got)

				pathname := dir.Append(ent.Name())
				if linkname, err := os.Readlink(
					pathname.String(),
				); err != nil {
					seMu.Lock()
					se.Errs[pathname.Handle()] = append(se.Errs[pathname.Handle()], err)
					se.DanglingIdentifiers = append(se.DanglingIdentifiers, *want)
					seMu.Unlock()
					return false
				} else if err = Decode(got, path.Base(linkname)); err != nil {
					seMu.Lock()
					lnp := dir.Append(linkname)
					se.Errs[lnp.Handle()] = append(se.Errs[lnp.Handle()], err)
					se.DanglingIdentifiers = append(se.DanglingIdentifiers, *want)
					seMu.Unlock()
					return false
				}

				if _, err := os.Stat(pathname.String()); err != nil {
					if !errors.Is(err, os.ErrNotExist) {
						addErr(pathname, err)
					}
					seMu.Lock()
					se.DanglingIdentifiers = append(se.DanglingIdentifiers, *want)
					seMu.Unlock()
					return false
				}
				return true
			}}
		}
		wg.Wait()
	}

	if len(c.identPending) > 0 {
		addErr(c.base, errors.New(
			"scrub began with pending artifacts",
		))
	} else {
		pathname := c.base.Append(dirWork)
		chmodErr, removeErr := removeAll(pathname)
		if chmodErr != nil {
			addErr(pathname, chmodErr)
		}
		if removeErr != nil {
			addErr(pathname, removeErr)
		}

		if err := os.Mkdir(pathname.String(), 0700); err != nil {
			addErr(pathname, err)
		}

		pathname = c.base.Append(dirTemp)
		chmodErr, removeErr = removeAll(pathname)
		if chmodErr != nil {
			addErr(pathname, chmodErr)
		}
		if removeErr != nil {
			addErr(pathname, removeErr)
		}
	}

	if len(se.ChecksumMismatches) > 0 ||
		len(se.DanglingIdentifiers) > 0 ||
		len(se.Errs) > 0 {
		slices.SortFunc(se.ChecksumMismatches, func(a, b ChecksumMismatchError) int {
			return bytes.Compare(a.Want[:], b.Want[:])
		})
		slices.SortFunc(se.DanglingIdentifiers, func(a, b ID) int {
			return bytes.Compare(a[:], b[:])
		})
		return &se
	} else {
		return nil
	}
}

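// Illustrative sketch (not part of this package): running a scrub after opening
// the cache and reporting what was repaired. The cache and log values are
// hypothetical and come from calling code.
//
//	if err := cache.Scrub(0); err != nil {
//		var se *ScrubError
//		if errors.As(err, &se) {
//			log.Printf("scrub removed %d mismatched entries and %d dangling identifiers",
//				len(se.ChecksumMismatches), len(se.DanglingIdentifiers))
//		} else {
//			log.Fatal(err)
//		}
//	}
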
// loadOrStoreIdent attempts to load a cached [Artifact] by its identifier or
// wait for a pending [Artifact] to cure. If neither is possible, the current
// identifier is stored in identPending and a non-nil channel is returned.
func (c *Cache) loadOrStoreIdent(id unique.Handle[ID]) (
	done chan<- struct{},
	checksum unique.Handle[Checksum],
	err error,
) {
	var ok bool

	c.identMu.Lock()
	if checksum, ok = c.ident[id]; ok {
		c.identMu.Unlock()
		return
	}
	if err, ok = c.identErr[id]; ok {
		c.identMu.Unlock()
		return
	}

	var notify <-chan struct{}
	if notify, ok = c.identPending[id]; ok {
		c.identMu.Unlock()
		<-notify
		c.identMu.RLock()
		if checksum, ok = c.ident[id]; !ok {
			err = c.identErr[id]
		}
		c.identMu.RUnlock()
		return
	}

	d := make(chan struct{})
	c.identPending[id] = d
	c.identMu.Unlock()
	done = d
	return
}

// finaliseIdent commits a checksum or error to ident for an identifier
// previously submitted to identPending.
func (c *Cache) finaliseIdent(
	done chan<- struct{},
	id unique.Handle[ID],
	checksum unique.Handle[Checksum],
	err error,
) {
	c.identMu.Lock()
	if err != nil {
		c.identErr[id] = err
	} else {
		c.ident[id] = checksum
	}
	delete(c.identPending, id)
	c.identMu.Unlock()

	close(done)
}

// openFile tries to load [FileArtifact] from [Cache], and if that fails,
// obtains it via [FileArtifact.Cure] instead. Notably, it does not cure
// [FileArtifact] to the filesystem. If err is nil, the caller is responsible
// for closing the resulting [io.ReadCloser].
func (c *Cache) openFile(f FileArtifact) (r io.ReadCloser, err error) {
	if kc, ok := f.(KnownChecksum); ok {
		c.checksumMu.RLock()
		r, err = os.Open(c.base.Append(
			dirChecksum,
			Encode(kc.Checksum()),
		).String())
		c.checksumMu.RUnlock()
	} else {
		c.identMu.RLock()
		r, err = os.Open(c.base.Append(
			dirIdentifier,
			Encode(c.Ident(f).Value()),
		).String())
		c.identMu.RUnlock()
	}

	if err != nil {
		if !errors.Is(err, os.ErrNotExist) {
			return
		}
		if c.msg.IsVerbose() {
			rn := reportName(f, c.Ident(f))
			c.msg.Verbosef("curing %s to memory...", rn)
			defer func() {
				if err == nil {
					c.msg.Verbosef("cured %s to memory", rn)
				}
			}()
		}
		return f.Cure(&RContext{c})
	}
	return
}

// InvalidFileModeError describes a [FloodArtifact.Cure] or
// [TrivialArtifact.Cure] that did not result in a regular file or directory
// located at the work pathname.
type InvalidFileModeError fs.FileMode

// Error returns a constant string.
func (e InvalidFileModeError) Error() string {
	return "artifact did not produce a regular file or directory"
}

// NoOutputError describes a [FloodArtifact.Cure] or [TrivialArtifact.Cure]
// that did not populate its work pathname despite completing successfully.
type NoOutputError struct{}

// Unwrap returns [os.ErrNotExist].
func (NoOutputError) Unwrap() error { return os.ErrNotExist }

// Error returns a constant string.
func (NoOutputError) Error() string {
	return "artifact cured successfully but did not produce any output"
}

// removeAll is similar to [os.RemoveAll] but is robust against any permissions.
func removeAll(pathname *check.Absolute) (chmodErr, removeErr error) {
	chmodErr = filepath.WalkDir(pathname.String(), func(
		path string,
		d fs.DirEntry,
		err error,
	) error {
		if err != nil {
			return err
		}
		if d.IsDir() {
			return os.Chmod(path, 0700)
		}
		return nil
	})
	if errors.Is(chmodErr, os.ErrNotExist) {
		chmodErr = nil
	}
	removeErr = os.RemoveAll(pathname.String())
	return
}

// zeroTimes zeroes atime and mtime for the named file.
func zeroTimes(path string) (err error) {
	// include/uapi/linux/fcntl.h
	const (
		AT_FDCWD            = -100
		AT_SYMLINK_NOFOLLOW = 0x100
	)
	_AT_FDCWD := AT_FDCWD

	var _p0 *byte
	_p0, err = syscall.BytePtrFromString(path)
	if err != nil {
		return
	}
	if _, _, errno := syscall.Syscall6(
		syscall.SYS_UTIMENSAT,
		uintptr(_AT_FDCWD),
		uintptr(unsafe.Pointer(_p0)),
		uintptr(unsafe.Pointer(new([2]syscall.Timespec))),
		AT_SYMLINK_NOFOLLOW,
		0, 0,
	); errno != 0 {
		return os.NewSyscallError("utimensat", errno)
	}
	return
}

// overrideFileInfo overrides the permission bits of [fs.FileInfo] to 0500 and
// is the concrete type returned by overrideFile.Stat.
type overrideFileInfo struct{ fs.FileInfo }

// Mode returns [fs.FileMode] with its permission bits set to 0500.
func (fi overrideFileInfo) Mode() fs.FileMode {
	return fi.FileInfo.Mode()&(^fs.FileMode(0777)) | 0500
}

// Sys returns nil to avoid passing the original permission bits.
func (fi overrideFileInfo) Sys() any { return nil }

// overrideFile overrides the permission bits of [fs.File] to 0500 and is the
// concrete type returned by dotOverrideFS for calls with "." passed as name.
type overrideFile struct{ fs.File }

func (f overrideFile) Stat() (fi fs.FileInfo, err error) {
	fi, err = f.File.Stat()
	if err != nil {
		return
	}
	fi = overrideFileInfo{fi}
	return
}

// dirFS is implemented by the concrete type of the return value of [os.DirFS].
type dirFS interface {
	fs.StatFS
	fs.ReadFileFS
	fs.ReadDirFS
	fs.ReadLinkFS
}

// dotOverrideFS overrides the permission bits of "." to 0500 to avoid the extra
// system calls to add and remove the write bit from the target directory.
type dotOverrideFS struct{ dirFS }

// Open wraps the underlying [fs.FS] with "." special case.
func (fsys dotOverrideFS) Open(name string) (f fs.File, err error) {
	f, err = fsys.dirFS.Open(name)
	if err != nil || name != "." {
		return
	}
	f = overrideFile{f}
	return
}

// Stat wraps the underlying [fs.FS] with "." special case.
func (fsys dotOverrideFS) Stat(name string) (fi fs.FileInfo, err error) {
	fi, err = fsys.dirFS.Stat(name)
	if err != nil || name != "." {
		return
	}
	fi = overrideFileInfo{fi}
	return
}

// InvalidArtifactError describes an artifact that does not implement a
// supported Cure method.
type InvalidArtifactError ID

func (e InvalidArtifactError) Error() string {
	return "artifact " + Encode(e) + " cannot be cured"
}

// DependencyError refers to an artifact with a dependency tree larger than the
// threshold specified by a previous call to [Cache.SetThreshold].
type DependencyError struct{ A Artifact }

func (e DependencyError) Error() string {
	return "artifact has too many dependencies"
}

// Cure cures the [Artifact] and returns its pathname and [Checksum]. Direct
// calls to Cure are not subject to the cures limit.
func (c *Cache) Cure(a Artifact) (
	pathname *check.Absolute,
	checksum unique.Handle[Checksum],
	err error,
) {
	select {
	case <-c.ctx.Done():
		err = c.ctx.Err()
		return

	default:
	}

	if c.threshold > 0 {
		var n uintptr
		for range Flood(a) {
			if n == c.threshold {
				err = DependencyError{a}
				return
			}
			n++
		}
		c.msg.Verbosef("visited %d artifacts", n)
	}

	return c.cure(a, true)
}

// CureError wraps a non-nil error returned while attempting to cure an [Artifact].
type CureError struct {
	Ident unique.Handle[ID]
	Err   error
}

// Unwrap returns the underlying error.
func (e *CureError) Unwrap() error { return e.Err }

// Error returns the error message from the underlying Err.
func (e *CureError) Error() string { return e.Err.Error() }

// A DependencyCureError wraps errors returned while curing dependencies.
type DependencyCureError []*CureError

// unwrapM recursively expands underlying errors into a caller-supplied map.
func (e *DependencyCureError) unwrapM(me map[unique.Handle[ID]]*CureError) {
	for _, err := range *e {
		if _e, ok := err.Err.(*DependencyCureError); ok {
			_e.unwrapM(me)
			continue
		}
		me[err.Ident] = err
	}
}

// unwrap recursively expands and deduplicates underlying errors.
func (e *DependencyCureError) unwrap() DependencyCureError {
	me := make(map[unique.Handle[ID]]*CureError)
	e.unwrapM(me)
	errs := slices.AppendSeq(
		make(DependencyCureError, 0, len(me)),
		maps.Values(me),
	)

	var identBuf [2]ID
	slices.SortFunc(errs, func(a, b *CureError) int {
		identBuf[0], identBuf[1] = a.Ident.Value(), b.Ident.Value()
		return slices.Compare(identBuf[0][:], identBuf[1][:])
	})

	return errs
}

// Unwrap returns a deduplicated slice of underlying errors.
func (e *DependencyCureError) Unwrap() []error {
	errs := e.unwrap()
	_errs := make([]error, len(errs))
	for i, err := range errs {
		_errs[i] = err
	}
	return _errs
}

// Error returns a user-facing multiline error message.
func (e *DependencyCureError) Error() string {
	errs := e.unwrap()
	if len(errs) == 0 {
		return "invalid dependency cure outcome"
	}
	var buf strings.Builder
	buf.WriteString("errors curing dependencies:")
	for _, err := range errs {
		buf.WriteString("\n\t" + Encode(err.Ident.Value()) + ": " + err.Error())
	}
	return buf.String()
}

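// Illustrative sketch (not part of this package): distinguishing a failed
// dependency graph from other cure failures at the call site. The cache and
// artifact values are hypothetical.
//
//	if _, _, err := cache.Cure(a); err != nil {
//		var depErr *DependencyCureError
//		if errors.As(err, &depErr) {
//			// one *CureError per faulted dependency, deduplicated by identifier
//			for _, e := range depErr.Unwrap() {
//				var ce *CureError
//				if errors.As(e, &ce) {
//					_ = ce // inspect ce.Ident and ce.Err
//				}
//			}
//		}
//	}
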
// enterCure must be called before entering an [Artifact] implementation.
func (c *Cache) enterCure(a Artifact, curesExempt bool) error {
	if a.IsExclusive() {
		c.exclMu.Lock()
	}
	if curesExempt {
		return nil
	}

	select {
	case c.cures <- struct{}{}:
		return nil

	case <-c.ctx.Done():
		if a.IsExclusive() {
			c.exclMu.Unlock()
		}
		return c.ctx.Err()
	}
}

// exitCure must be called after exiting an [Artifact] implementation.
func (c *Cache) exitCure(a Artifact, curesExempt bool) {
	if a.IsExclusive() {
		c.exclMu.Unlock()
	}
	if curesExempt {
		return
	}

	<-c.cures
}

// getWriter is like [bufio.NewWriter] but for bufioPool.
func (c *Cache) getWriter(w io.Writer) *bufio.Writer {
	bw := c.bufioPool.Get().(*bufio.Writer)
	bw.Reset(w)
	return bw
}

// measuredReader implements [io.ReadCloser] and measures the checksum during
// Close. If the underlying reader is not read to EOF, Close blocks until all
// remaining data is consumed and validated.
type measuredReader struct {
	// Underlying reader. Never exposed directly.
	r io.ReadCloser
	// For validating checksum. Never exposed directly.
	h hash.Hash
	// Buffers writes to h, initialised by [Cache]. Never exposed directly.
	hbw *bufio.Writer
	// Expected checksum, compared during Close.
	want unique.Handle[Checksum]

	// For accessing free lists.
	c *Cache

	// Set up via [io.TeeReader] by [Cache].
	io.Reader
}

// Close reads the underlying [io.ReadCloser] to EOF, closes it and measures its
// outcome. It returns a [ChecksumMismatchError] for an unexpected checksum.
func (mr *measuredReader) Close() (err error) {
	if mr.hbw == nil || mr.Reader == nil {
		return os.ErrInvalid
	}
	err = mr.hbw.Flush()
	mr.c.putWriter(mr.hbw)
	mr.hbw, mr.Reader = nil, nil
	if err != nil {
		_ = mr.r.Close()
		return
	}
	var n int64
	if n, err = io.Copy(mr.h, mr.r); err != nil {
		_ = mr.r.Close()
		return
	}

	if n > 0 {
		mr.c.msg.Verbosef("missed %d bytes on measured reader", n)
	}

	if err = mr.r.Close(); err != nil {
		return
	}

	buf := mr.c.getIdentBuf()
	mr.h.Sum(buf[:0])

	if got := Checksum(buf[:]); got != mr.want.Value() {
		err = &ChecksumMismatchError{
			Got:  got,
			Want: mr.want.Value(),
		}
	}

	mr.c.putIdentBuf(buf)
	return
}

// newMeasuredReader implements [RContext.NewMeasuredReader].
func (c *Cache) newMeasuredReader(
	r io.ReadCloser,
	checksum unique.Handle[Checksum],
) io.ReadCloser {
	mr := measuredReader{r: r, h: sha512.New384(), want: checksum, c: c}
	mr.hbw = c.getWriter(mr.h)
	mr.Reader = io.TeeReader(r, mr.hbw)
	return &mr
}

// NewMeasuredReader returns an [io.ReadCloser] implementing behaviour required
// by [FileArtifact]. The resulting [io.ReadCloser] holds a buffer originating
// from [Cache] and must be closed to return this buffer.
func (r *RContext) NewMeasuredReader(
	rc io.ReadCloser,
	checksum unique.Handle[Checksum],
) io.ReadCloser {
	return r.cache.newMeasuredReader(rc, checksum)
}

// putWriter adds bw to bufioPool.
func (c *Cache) putWriter(bw *bufio.Writer) { c.bufioPool.Put(bw) }

// cure implements Cure without checking the full dependency graph.
func (c *Cache) cure(a Artifact, curesExempt bool) (
	pathname *check.Absolute,
	checksum unique.Handle[Checksum],
	err error,
) {
	id := c.Ident(a)
	ids := Encode(id.Value())
	pathname = c.base.Append(
		dirIdentifier,
		ids,
	)
	defer func() {
		if err != nil {
			pathname = nil
			checksum = unique.Handle[Checksum]{}
		}
	}()

	var done chan<- struct{}
	done, checksum, err = c.loadOrStoreIdent(id)
	if done == nil {
		return
	} else {
		defer func() { c.finaliseIdent(done, id, checksum, err) }()
	}

	_, err = os.Lstat(pathname.String())
	if err == nil {
		var name string
		if name, err = os.Readlink(pathname.String()); err != nil {
			return
		}
		buf := c.getIdentBuf()
		err = Decode((*Checksum)(buf[:]), path.Base(name))
		if err == nil {
			checksum = unique.Make(Checksum(buf[:]))
		}
		c.putIdentBuf(buf)
		return
	}
	if !errors.Is(err, os.ErrNotExist) {
		return
	}

	var checksums string
	defer func() {
		if err == nil && checksums != "" {
			err = os.Symlink(
				checksumLinknamePrefix+checksums,
				pathname.String(),
			)
			if err == nil {
				err = zeroTimes(pathname.String())
			}
		}
	}()

	var checksumPathname *check.Absolute
	var checksumFi os.FileInfo
	if kc, ok := a.(KnownChecksum); ok {
		checksum = unique.Make(kc.Checksum())
		checksums = Encode(checksum.Value())
		checksumPathname = c.base.Append(
			dirChecksum,
			checksums,
		)

		c.checksumMu.RLock()
		checksumFi, err = os.Stat(checksumPathname.String())
		c.checksumMu.RUnlock()

		if err != nil {
			if !errors.Is(err, os.ErrNotExist) {
				return
			}

			checksumFi, err = nil, nil
		}
	}

	if c.msg.IsVerbose() {
		rn := reportName(a, id)
		c.msg.Verbosef("curing %s...", rn)
		defer func() {
			if err != nil {
				return
			}
			if checksums != "" {
				c.msg.Verbosef("cured %s checksum %s", rn, checksums)
			} else {
				c.msg.Verbosef("cured %s", rn)
			}
		}()
	}

	// cure FileArtifact outside type switch to skip TContext initialisation
	if f, ok := a.(FileArtifact); ok {
		if checksumFi != nil {
			if !checksumFi.Mode().IsRegular() {
				// unreachable
				err = InvalidFileModeError(checksumFi.Mode())
			}
			return
		}

		work := c.base.Append(dirWork, ids)
		var w *os.File
		if w, err = os.OpenFile(
			work.String(),
			os.O_CREATE|os.O_EXCL|os.O_WRONLY,
			0400,
		); err != nil {
			return
		}
		defer func() {
			closeErr := w.Close()
			if err == nil {
				err = closeErr
			}

			removeErr := os.Remove(work.String())
			if err == nil && !errors.Is(removeErr, os.ErrNotExist) {
				err = removeErr
			}
		}()

		var r io.ReadCloser
		if err = c.enterCure(a, curesExempt); err != nil {
			return
		}
		r, err = f.Cure(&RContext{c})
		if err == nil {
			if checksumPathname == nil || c.IsStrict() {
				h := sha512.New384()
				hbw := c.getWriter(h)
				_, err = io.Copy(w, io.TeeReader(r, hbw))
				flushErr := hbw.Flush()
				c.putWriter(hbw)
				if err == nil {
					err = flushErr
				}

				if err == nil {
					buf := c.getIdentBuf()
					h.Sum(buf[:0])

					if checksumPathname == nil {
						checksum = unique.Make(Checksum(buf[:]))
						checksums = Encode(Checksum(buf[:]))
					} else if c.IsStrict() {
						if got := Checksum(buf[:]); got != checksum.Value() {
							err = &ChecksumMismatchError{
								Got:  got,
								Want: checksum.Value(),
							}
						}
					}

					c.putIdentBuf(buf)

					if checksumPathname == nil {
						checksumPathname = c.base.Append(
							dirChecksum,
							checksums,
						)
					}
				}
			} else {
				_, err = io.Copy(w, r)
			}

			closeErr := r.Close()
			if err == nil {
				err = closeErr
			}
		}
		c.exitCure(a, curesExempt)
		if err != nil {
			return
		}

		c.checksumMu.Lock()
		if err = os.Rename(
			work.String(),
			checksumPathname.String(),
		); err != nil {
			c.checksumMu.Unlock()
			return
		}
		timeErr := zeroTimes(checksumPathname.String())
		c.checksumMu.Unlock()

		if err == nil {
			err = timeErr
		}
		return
	}

	if checksumFi != nil {
		if !checksumFi.Mode().IsDir() {
			// unreachable
			err = InvalidFileModeError(checksumFi.Mode())
		}
		return
	}

	t := TContext{c, c.base.Append(dirWork, ids), c.base.Append(dirTemp, ids)}
	switch ca := a.(type) {
	case TrivialArtifact:
		defer t.destroy(&err)
		if err = c.enterCure(a, curesExempt); err != nil {
			return
		}
		err = ca.Cure(&t)
		c.exitCure(a, curesExempt)
		if err != nil {
			return
		}
		break

	case FloodArtifact:
		deps := a.Dependencies()
		f := FContext{t, make(map[Artifact]cureRes, len(deps))}

		var wg sync.WaitGroup
		wg.Add(len(deps))
		res := make([]cureRes, len(deps))
		errs := make(DependencyCureError, 0, len(deps))
		var errsMu sync.Mutex
		for i, d := range deps {
			pending := pendingArtifactDep{d, &res[i], &errs, &errsMu, &wg}
			go pending.cure(c)
		}
		wg.Wait()

		if len(errs) > 0 {
			err = &errs
			return
		}
		for i, p := range res {
			f.deps[deps[i]] = p
		}

		defer f.destroy(&err)
		if err = c.enterCure(a, curesExempt); err != nil {
			return
		}
		err = ca.Cure(&f)
		c.exitCure(a, curesExempt)
		if err != nil {
			return
		}
		break

	default:
		err = InvalidArtifactError(id.Value())
		return
	}
	t.cache = nil

	var fi os.FileInfo
	if fi, err = os.Lstat(t.work.String()); err != nil {
		if errors.Is(err, os.ErrNotExist) {
			err = NoOutputError{}
		}
		return
	}

	if !fi.IsDir() {
		if !fi.Mode().IsRegular() {
			err = InvalidFileModeError(fi.Mode())
		} else {
			err = errors.New("non-file artifact produced regular file")
		}
		return
	}

	var gotChecksum Checksum
	if err = HashFS(
		&gotChecksum,
		dotOverrideFS{os.DirFS(t.work.String()).(dirFS)},
		".",
	); err != nil {
		return
	}

	if checksumPathname == nil {
		checksum = unique.Make(gotChecksum)
		checksums = Encode(gotChecksum)
		checksumPathname = c.base.Append(
			dirChecksum,
			checksums,
		)
	} else if gotChecksum != checksum.Value() {
		err = &ChecksumMismatchError{
			Got:  gotChecksum,
			Want: checksum.Value(),
		}
		return
	}

	if err = os.Chmod(t.work.String(), 0700); err != nil {
		return
	}
	if err = filepath.WalkDir(t.work.String(), func(path string, _ fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		return zeroTimes(path)
	}); err != nil {
		return
	}
	c.checksumMu.Lock()
	if err = os.Rename(
		t.work.String(),
		checksumPathname.String(),
	); err != nil {
		if !errors.Is(err, os.ErrExist) {
			c.checksumMu.Unlock()
			return
		}
		// err is zeroed during deferred cleanup
	} else {
		err = os.Chmod(checksumPathname.String(), 0500)
	}
	c.checksumMu.Unlock()
	return
}

// cure cures the pending [Artifact], stores its result and notifies the caller.
func (pending *pendingArtifactDep) cure(c *Cache) {
	defer pending.Done()

	var err error
	pending.resP.pathname, pending.resP.checksum, err = c.cure(pending.a, false)
	if err == nil {
		return
	}

	pending.errsMu.Lock()
	*pending.errs = append(*pending.errs, &CureError{c.Ident(pending.a), err})
	pending.errsMu.Unlock()
}

// Close cancels all pending cures and waits for them to clean up.
func (c *Cache) Close() {
	c.closeOnce.Do(func() {
		c.cancel()
		c.wg.Wait()
		close(c.cures)
		c.unlock()
	})
}

// Open returns the address of a newly opened instance of [Cache].
//
// Concurrent cures of a [FloodArtifact] dependency graph are limited to the
// caller-supplied value; however, direct calls to [Cache.Cure] are not subject
// to this limitation.
//
// A cures value of 0 or lower is equivalent to the value returned by
// [runtime.NumCPU].
//
// A successful call to Open guarantees exclusive access to the on-filesystem
// cache for the resulting instance of [Cache]. The [Cache.Close] method cancels
// and waits for pending cures on [Cache] before releasing this lock and must be
// called once the [Cache] is no longer needed.
func Open(
	ctx context.Context,
	msg message.Msg,
	cures int,
	base *check.Absolute,
) (*Cache, error) {
	return open(ctx, msg, cures, base, true)
}

// open implements Open but allows omitting the [lockedfile] lock when called
// from a test. This is used to simulate invalid states in the test suite.
func open(
	ctx context.Context,
	msg message.Msg,
	cures int,
	base *check.Absolute,
	lock bool,
) (*Cache, error) {
	if cures < 1 {
		cures = runtime.NumCPU()
	}

	for _, name := range []string{
		dirIdentifier,
		dirChecksum,
		dirWork,
	} {
		if err := os.MkdirAll(base.Append(name).String(), 0700); err != nil &&
			!errors.Is(err, os.ErrExist) {
			return nil, err
		}
	}

	c := Cache{
		cures: make(chan struct{}, cures),

		msg:  msg,
		base: base,

		ident:        make(map[unique.Handle[ID]]unique.Handle[Checksum]),
		identErr:     make(map[unique.Handle[ID]]error),
		identPending: make(map[unique.Handle[ID]]<-chan struct{}),
	}
	c.ctx, c.cancel = context.WithCancel(ctx)
	c.identPool.New = func() any { return new(extIdent) }
	c.bufioPool.New = func() any { return new(bufio.Writer) }

	if lock || !testing.Testing() {
		if unlock, err := lockedfile.MutexAt(
			base.Append(fileLock).String(),
		).Lock(); err != nil {
			return nil, err
		} else {
			c.unlock = unlock
		}
	} else {
		c.unlock = func() {}
	}

	return &c, nil
}
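
// Illustrative sketch (not part of this package): the intended lifecycle of a
// Cache, from Open to Close, with a threshold imposed on the dependency graph.
// The ctx, msg, cacheDir and artifact values are hypothetical and come from
// calling code.
//
//	cache, err := Open(ctx, msg, 0, cacheDir)
//	if err != nil {
//		return err
//	}
//	defer cache.Close()
//	cache.SetThreshold(1 << 16)
//
//	pathname, checksum, err := cache.Cure(artifact)
//	if err != nil {
//		return err
//	}
//	msg.Verbosef("cured to %s (%s)", pathname, Encode(checksum.Value()))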