Files
hakurei/internal/pkg/pkg.go
Ophestra 0ab6c13c77
All checks were successful
Test / Create distribution (push) Successful in 43s
Test / Sandbox (push) Successful in 2m32s
Test / ShareFS (push) Successful in 3m43s
Test / Hpkg (push) Successful in 4m29s
Test / Sandbox (race detector) (push) Successful in 4m57s
Test / Hakurei (race detector) (push) Successful in 5m45s
Test / Hakurei (push) Successful in 2m31s
Test / Flake checks (push) Successful in 1m44s
internal/pkg: consistency check for on-disk cache
This change adds a method to check on-disk cache consistency and destroy inconsistent entries as they are encountered. This primarily helps verify artifact implementation correctness, but can also repair a cache that got into an inconsistent state from curing a misbehaving artifact, without having to destroy the entire cache.

Signed-off-by: Ophestra <cat@gensokyo.uk>
2026-01-05 05:30:29 +09:00

900 lines
23 KiB
Go

// Package pkg provides utilities for packaging software.
package pkg
import (
"bytes"
"crypto/sha512"
"encoding/base64"
"encoding/binary"
"errors"
"io"
"io/fs"
"os"
"path"
"path/filepath"
"slices"
"strings"
"sync"
"hakurei.app/container/check"
)
type (
	// A Checksum is a SHA-384 checksum computed for a cured [Artifact].
	Checksum = [sha512.Size384]byte

	// An ID is a unique identifier returned by [Artifact.ID]. This value must
	// be deterministically determined ahead of time.
	ID Checksum
)

// Encode is abbreviation for base64.URLEncoding.EncodeToString(checksum[:]).
func Encode(checksum Checksum) string {
	return base64.URLEncoding.EncodeToString(checksum[:])
}

// Decode decodes the string representation of a [Checksum] produced by
// [Encode]. It returns [io.ErrUnexpectedEOF] if s does not decode to exactly
// len(Checksum{}) bytes.
func Decode(s string) (checksum Checksum, err error) {
	// Reject oversized input up front: base64 Decode panics with an index out
	// of range if the decoded data does not fit in the destination slice.
	if base64.URLEncoding.DecodedLen(len(s)) > len(checksum) {
		err = io.ErrUnexpectedEOF
		return
	}
	var n int
	n, err = base64.URLEncoding.Decode(checksum[:], []byte(s))
	if err == nil && n != len(Checksum{}) {
		err = io.ErrUnexpectedEOF
	}
	return
}

// MustDecode decodes a string representation of [Checksum] and panics if there
// is a decoding error or the resulting data has the wrong length.
func MustDecode(s string) Checksum {
	if checksum, err := Decode(s); err != nil {
		panic(err)
	} else {
		return checksum
	}
}
// CacheDataFunc tries to load [File] from [Cache], and if that fails, obtains
// it via [File.Data] instead.
//
// [Cache] passes its loadData method as this type to [Artifact.Cure] so
// implementations can reuse cached contents of their dependencies.
type CacheDataFunc func(f File) (data []byte, err error)
// An Artifact is a read-only reference to a piece of data that may be created
// deterministically but might not currently be available in memory or on the
// filesystem.
type Artifact interface {
	// Kind returns the [Kind] of artifact. This is usually unique to the
	// concrete type but two functionally identical implementations of
	// [Artifact] are allowed to return the same [Kind] value.
	Kind() Kind

	// Params returns opaque bytes that describe [Artifact]. Implementations
	// must guarantee that these values are unique among differing instances
	// of the same implementation with the same dependencies.
	//
	// Callers must not modify the returned byte slice.
	//
	// Result must remain identical across multiple invocations.
	Params() []byte

	// Dependencies returns a slice of [Artifact] that the current instance
	// depends on to produce its contents.
	//
	// Callers must not modify the returned slice.
	//
	// Result must remain identical across multiple invocations.
	Dependencies() []Artifact

	// Cure cures the current [Artifact] to the caller-specified temporary
	// pathname. This is not the final resting place of the [Artifact] and this
	// pathname should not be directly referred to in the final contents.
	//
	// If the implementation produces a single file, it must implement [File]
	// as well. In that case, Cure must produce a single regular file with
	// contents identical to that returned by [File.Data].
	//
	// Implementations may use temp as scratch space. The caller is not required
	// to create a directory here, implementations are expected to create it if
	// they wish to use it, using [os.MkdirAll].
	Cure(work, temp *check.Absolute, loadData CacheDataFunc) (err error)
}
// KnownIdent is optionally implemented by [Artifact] and is used instead of
// [Kind.Ident] when it is available.
//
// This is very subtle to use correctly. The implementation must ensure that
// this value is globally unique, otherwise [Cache] can enter an inconsistent
// state. This should not be implemented outside of testing.
type KnownIdent interface {
	// ID returns a globally unique identifier referring to the current
	// [Artifact]. This value must be known ahead of time and guaranteed to be
	// unique without having obtained the full contents of the [Artifact].
	ID() ID
}
// KnownChecksum is optionally implemented by [Artifact] for an artifact with
// output known ahead of time. [Cache] uses it to locate an existing
// content-addressed entry without curing the artifact.
type KnownChecksum interface {
	// Checksum returns the address of a known checksum.
	//
	// Callers must not modify the [Checksum].
	//
	// Result must remain identical across multiple invocations.
	Checksum() Checksum
}
// A File refers to an [Artifact] backed by a single file.
type File interface {
	// Data returns the full contents of [Artifact]. If the implementation
	// also implements [KnownChecksum], Data is responsible for validating any
	// data it produces and must return [ChecksumMismatchError] if validation
	// fails.
	//
	// Callers must not modify the returned byte slice.
	Data() ([]byte, error)

	Artifact
}
// Ident returns the identifier of an [Artifact], preferring the value
// provided by [KnownIdent] when the artifact implements it.
func Ident(a Artifact) ID {
	ki, ok := a.(KnownIdent)
	if !ok {
		return a.Kind().Ident(a.Params(), a.Dependencies()...)
	}
	return ki.ID()
}
// Kind corresponds to the concrete type of [Artifact] and is used to create
// identifier for an [Artifact] with dependencies.
type Kind uint64

const (
	// KindHTTPGet is the kind of [Artifact] returned by [NewHTTPGet].
	KindHTTPGet Kind = iota
	// KindTar is the kind of artifact returned by [NewTar].
	KindTar
)
// Ident returns a deterministic identifier for the supplied params and
// dependencies. The caller is responsible for ensuring params uniquely and
// deterministically describes the current [Artifact].
func (k Kind) Ident(params []byte, deps ...Artifact) ID {
	// extIdent prefixes a dependency [ID] with its [Kind] word so artifacts
	// of differing kinds cannot collide on identifier alone.
	type extIdent [len(ID{}) + wordSize]byte
	identifiers := make([]extIdent, len(deps))
	for i, a := range deps {
		id := Ident(a)
		copy(identifiers[i][wordSize:], id[:])
		binary.LittleEndian.PutUint64(identifiers[i][:], uint64(a.Kind()))
	}
	// Sort so the resulting identifier does not depend on dependency order.
	slices.SortFunc(identifiers, func(a, b extIdent) int {
		return bytes.Compare(a[:], b[:])
	})
	// Compact returns the deduplicated slice; it must be reassigned,
	// otherwise ranging below still visits the stale tail and duplicates.
	identifiers = slices.Compact(identifiers)
	h := sha512.New384()
	h.Write(binary.LittleEndian.AppendUint64(nil, uint64(k)))
	h.Write(params)
	for _, e := range identifiers {
		h.Write(e[:])
	}
	return ID(h.Sum(nil))
}
const (
	// dirIdentifier is the directory name appended to Cache.base for storing
	// artifacts named after their [ID]. Entries are symlinks into dirChecksum.
	dirIdentifier = "identifier"
	// dirChecksum is the directory name appended to Cache.base for storing
	// artifacts named after their [Checksum].
	dirChecksum = "checksum"
	// dirWork is the directory name appended to Cache.base for working
	// pathnames set up during [Cache.Cure].
	dirWork = "work"
	// dirTemp is the directory name appended to Cache.base for scratch space
	// pathnames allocated during [Cache.Cure].
	dirTemp = "temp"

	// checksumLinknamePrefix is prepended to the encoded [Checksum] value
	// of an [Artifact] when creating a symbolic link to dirChecksum.
	checksumLinknamePrefix = "../" + dirChecksum + "/"
)
// Cache is a support layer that implementations of [Artifact] can use to store
// cured [Artifact] data in a content addressed fashion.
//
// On disk, dirChecksum holds content-addressed entries, and dirIdentifier
// holds symlinks from encoded identifiers into dirChecksum.
type Cache struct {
	// Directory where all [Cache] related files are placed.
	base *check.Absolute
	// Whether to validate [File.Data] for a [KnownChecksum] file. This
	// significantly reduces performance.
	strict bool

	// Synchronises access to dirChecksum.
	checksumMu sync.RWMutex

	// Identifier to content pair cache.
	ident map[ID]Checksum
	// Identifier to error pair for unrecoverably faulted [Artifact].
	identErr map[ID]error
	// Pending identifiers, accessed through Cure for entries not in ident.
	identPending map[ID]<-chan struct{}
	// Synchronises access to ident and corresponding filesystem entries.
	identMu sync.RWMutex
}
// IsStrict reports whether the [Cache] strictly verifies checksums.
func (c *Cache) IsStrict() bool {
	return c.strict
}
// SetStrict sets whether the [Cache] strictly verifies checksums, even when
// the implementation promises to validate them internally. This significantly
// reduces performance and is not recommended outside of testing.
//
// This method is not safe for concurrent use with any other method.
func (c *Cache) SetStrict(strict bool) {
	c.strict = strict
}
// A ChecksumMismatchError describes an [Artifact] with unexpected content.
type ChecksumMismatchError struct {
	// Actual and expected checksums.
	Got, Want Checksum
}

// Error formats both checksums in their encoded representation.
func (e *ChecksumMismatchError) Error() string {
	return "got " + Encode(e.Got) + " instead of " + Encode(e.Want)
}
// ScrubError describes the outcome of a [Cache.Scrub] call where errors were
// found and removed from the underlying storage of [Cache].
type ScrubError struct {
	// Content-addressed entries not matching their checksum. This can happen
	// if an incorrect [File] implementation was cured against a non-strict
	// [Cache].
	ChecksumMismatches []ChecksumMismatchError
	// Dangling identifier symlinks. This can happen if the content-addressed
	// entry was removed while scrubbing due to a checksum mismatch.
	DanglingIdentifiers []ID
	// Miscellaneous errors, including [os.ReadDir] on checksum and identifier
	// directories, [Decode] on entry names and [os.RemoveAll] on inconsistent
	// entries.
	Errs []error
}
// Unwrap returns a concatenation of ChecksumMismatches and Errs.
// DanglingIdentifiers carry no error value and are therefore not included.
func (e *ScrubError) Unwrap() []error {
	s := make([]error, 0, len(e.ChecksumMismatches)+len(e.Errs))
	// Take the address of the slice element itself rather than of a range
	// loop variable: the returned pointers then refer to the mismatches
	// actually stored in e, and remain distinct even under pre-Go-1.22
	// per-loop (rather than per-iteration) variable scoping.
	for i := range e.ChecksumMismatches {
		s = append(s, &e.ChecksumMismatches[i])
	}
	s = append(s, e.Errs...)
	return s
}
// Error returns a multi-line representation of [ScrubError], one section per
// populated field, with sections separated by a blank line.
func (e *ScrubError) Error() string {
	var sections []string
	if len(e.ChecksumMismatches) > 0 {
		var b strings.Builder
		b.WriteString("checksum mismatches:\n")
		for i := range e.ChecksumMismatches {
			b.WriteString(e.ChecksumMismatches[i].Error())
			b.WriteByte('\n')
		}
		sections = append(sections, b.String())
	}
	if len(e.DanglingIdentifiers) > 0 {
		var b strings.Builder
		b.WriteString("dangling identifiers:\n")
		for _, id := range e.DanglingIdentifiers {
			b.WriteString(Encode(id))
			b.WriteByte('\n')
		}
		sections = append(sections, b.String())
	}
	if len(e.Errs) > 0 {
		var b strings.Builder
		b.WriteString("errors during scrub:\n")
		for _, err := range e.Errs {
			b.WriteString(err.Error())
			b.WriteByte('\n')
		}
		sections = append(sections, b.String())
	}
	return strings.Join(sections, "\n")
}
// Scrub frees internal in-memory identifier to content pair cache, verifies all
// cached artifacts against their checksums, checks for dangling identifier
// symlinks and removes them if found.
//
// This method is not safe for concurrent use with any other method.
func (c *Cache) Scrub() error {
	c.identMu.Lock()
	defer c.identMu.Unlock()
	c.checksumMu.Lock()
	defer c.checksumMu.Unlock()

	// Drop the in-memory caches; they are repopulated lazily by Cure.
	c.ident = make(map[ID]Checksum)
	c.identErr = make(map[ID]error)

	var se ScrubError

	// ent and dir are shared with condemnEntry below, which removes the
	// directory entry currently being examined.
	var (
		ent os.DirEntry
		dir *check.Absolute
	)
	condemnEntry := func() {
		chmodErr, removeErr := removeAll(dir.Append(ent.Name()))
		if chmodErr != nil {
			se.Errs = append(se.Errs, chmodErr)
		}
		if removeErr != nil {
			se.Errs = append(se.Errs, removeErr)
		}
	}

	// Pass 1: verify every content-addressed entry against the checksum
	// encoded in its name, removing entries that fail to match.
	dir = c.base.Append(dirChecksum)
	if entries, err := os.ReadDir(dir.String()); err != nil {
		se.Errs = append(se.Errs, err)
	} else {
		var got, want Checksum
		for _, ent = range entries {
			// The entry name is the encoded expected checksum.
			if want, err = Decode(ent.Name()); err != nil {
				se.Errs = append(se.Errs, err)
				condemnEntry()
				continue
			}
			if ent.IsDir() {
				// Directory artifact: hash the tree.
				if got, err = HashDir(dir.Append(ent.Name())); err != nil {
					se.Errs = append(se.Errs, err)
					continue
				}
			} else if ent.Type().IsRegular() {
				// Single-file artifact: hash the file contents.
				h := sha512.New384()
				var r *os.File
				r, err = os.Open(dir.Append(ent.Name()).String())
				if err != nil {
					se.Errs = append(se.Errs, err)
					continue
				}
				_, err = io.Copy(h, r)
				closeErr := r.Close()
				if closeErr != nil {
					se.Errs = append(se.Errs, closeErr)
				}
				if err != nil {
					se.Errs = append(se.Errs, err)
					continue
				}
				h.Sum(got[:0])
			} else {
				// Anything other than a directory or a regular file cannot
				// be a valid cured artifact.
				se.Errs = append(se.Errs, InvalidFileModeError(ent.Type()))
				condemnEntry()
				continue
			}
			if got != want {
				se.ChecksumMismatches = append(se.ChecksumMismatches, ChecksumMismatchError{
					Got:  got,
					Want: want,
				})
				condemnEntry()
			}
		}
	}

	// Pass 2: check every identifier symlink decodes on both ends and
	// resolves to an existing target; remove dangling ones. Targets may have
	// been removed by pass 1.
	dir = c.base.Append(dirIdentifier)
	if entries, err := os.ReadDir(dir.String()); err != nil {
		se.Errs = append(se.Errs, err)
	} else {
		var (
			id       ID
			linkname string
		)
		for _, ent = range entries {
			// The entry name is the encoded identifier.
			if id, err = Decode(ent.Name()); err != nil {
				se.Errs = append(se.Errs, err)
				condemnEntry()
				continue
			}
			// The entry must be a symlink pointing into dirChecksum.
			if linkname, err = os.Readlink(
				dir.Append(ent.Name()).String(),
			); err != nil {
				se.Errs = append(se.Errs, err)
				se.DanglingIdentifiers = append(se.DanglingIdentifiers, id)
				condemnEntry()
				continue
			}
			// The link target basename must be a valid encoded checksum.
			if _, err = Decode(path.Base(linkname)); err != nil {
				se.Errs = append(se.Errs, err)
				se.DanglingIdentifiers = append(se.DanglingIdentifiers, id)
				condemnEntry()
				continue
			}
			// os.Stat follows the symlink; not-exist means the target is
			// gone (possibly removed during pass 1).
			if _, err = os.Stat(dir.Append(ent.Name()).String()); err != nil {
				if !errors.Is(err, os.ErrNotExist) {
					se.Errs = append(se.Errs, err)
				}
				se.DanglingIdentifiers = append(se.DanglingIdentifiers, id)
				condemnEntry()
				continue
			}
		}
	}

	// Work and temp directories only hold in-flight entries; refuse to clear
	// them if any artifact is still pending.
	if len(c.identPending) > 0 {
		se.Errs = append(se.Errs, errors.New(
			"scrub began with pending artifacts",
		))
	} else {
		chmodErr, removeErr := removeAll(c.base.Append(dirWork))
		if chmodErr != nil {
			se.Errs = append(se.Errs, chmodErr)
		}
		if removeErr != nil {
			se.Errs = append(se.Errs, removeErr)
		}
		// dirWork must exist for subsequent Cure calls; dirTemp is created
		// on demand by implementations (see [Artifact.Cure]).
		if err := os.Mkdir(
			c.base.Append(dirWork).String(),
			0700,
		); err != nil {
			se.Errs = append(se.Errs, err)
		}
		chmodErr, removeErr = removeAll(c.base.Append(dirTemp))
		if chmodErr != nil {
			se.Errs = append(se.Errs, chmodErr)
		}
		if removeErr != nil {
			se.Errs = append(se.Errs, removeErr)
		}
	}

	if len(se.ChecksumMismatches) > 0 ||
		len(se.DanglingIdentifiers) > 0 ||
		len(se.Errs) > 0 {
		return &se
	} else {
		return nil
	}
}
// loadOrStoreIdent attempts to load a cached [Artifact] by its identifier or
// wait for a pending [Artifact] to cure. If neither is possible, the current
// identifier is stored in identPending and a non-nil channel is returned.
func (c *Cache) loadOrStoreIdent(id *ID) (
	done chan<- struct{},
	checksum Checksum,
	err error,
) {
	var ok bool
	c.identMu.Lock()
	// Hit: checksum for this identifier is already known.
	if checksum, ok = c.ident[*id]; ok {
		c.identMu.Unlock()
		return
	}
	// Hit: this identifier previously faulted unrecoverably.
	if err, ok = c.identErr[*id]; ok {
		c.identMu.Unlock()
		return
	}
	// Another goroutine is curing this identifier: release the lock, wait
	// for finaliseIdent to close the channel, then load whichever of ident
	// or identErr it populated.
	var notify <-chan struct{}
	if notify, ok = c.identPending[*id]; ok {
		c.identMu.Unlock()
		<-notify
		c.identMu.RLock()
		if checksum, ok = c.ident[*id]; !ok {
			err = c.identErr[*id]
		}
		c.identMu.RUnlock()
		return
	}
	// Miss: register the identifier as pending. The caller now owns it and
	// must commit an outcome via finaliseIdent on the returned channel.
	d := make(chan struct{})
	c.identPending[*id] = d
	c.identMu.Unlock()
	done = d
	return
}
// finaliseIdent commits a checksum or error to ident for an identifier
// previously submitted to identPending, then wakes all waiters by closing
// the done channel.
func (c *Cache) finaliseIdent(
	done chan<- struct{},
	id *ID,
	checksum *Checksum,
	err error,
) {
	c.identMu.Lock()
	switch {
	case err == nil:
		c.ident[*id] = *checksum
	default:
		c.identErr[*id] = err
	}
	delete(c.identPending, *id)
	c.identMu.Unlock()
	close(done)
}
// loadData provides [CacheDataFunc] for [Artifact.Cure].
//
// It prefers an on-disk cached copy (content-addressed for a [KnownChecksum]
// file, identifier-addressed otherwise) and falls back to [File.Data] when no
// cached copy exists.
func (c *Cache) loadData(f File) (data []byte, err error) {
	var r *os.File
	if kc, ok := f.(KnownChecksum); ok {
		// Content-addressed lookup under dirChecksum.
		c.checksumMu.RLock()
		r, err = os.Open(c.base.Append(
			dirChecksum,
			Encode(kc.Checksum()),
		).String())
		c.checksumMu.RUnlock()
	} else {
		// Identifier symlink lookup under dirIdentifier.
		c.identMu.RLock()
		r, err = os.Open(c.base.Append(
			dirIdentifier,
			Encode(Ident(f)),
		).String())
		c.identMu.RUnlock()
	}
	if err != nil {
		if !errors.Is(err, os.ErrNotExist) {
			return
		}
		// Cache miss: obtain the data from the implementation instead.
		return f.Data()
	}
	data, err = io.ReadAll(r)
	closeErr := r.Close()
	if err == nil {
		err = closeErr
	}
	return
}
// InvalidFileModeError describes an [Artifact.Cure] that did not result in
// a regular file or directory located at the work pathname. The underlying
// value records the offending [fs.FileMode].
type InvalidFileModeError fs.FileMode

// Error returns a constant string.
func (InvalidFileModeError) Error() string {
	return "artifact did not produce a regular file or directory"
}
// NoOutputError describes an [Artifact.Cure] that did not populate its
// work pathname despite completing successfully.
type NoOutputError struct{}

// Unwrap returns [os.ErrNotExist] so errors.Is treats the condition as a
// missing pathname.
func (NoOutputError) Unwrap() error { return os.ErrNotExist }

// Error returns a constant string.
func (NoOutputError) Error() string {
	return "artifact cured successfully but did not produce any output"
}
// removeAll is similar to [os.RemoveAll] but is robust against any permissions:
// it first walks the tree adding the owner write and search bits to every
// directory, then removes the tree. A nonexistent pathname is not an error.
func removeAll(pathname *check.Absolute) (chmodErr, removeErr error) {
	root := pathname.String()
	chmodErr = filepath.WalkDir(root, func(
		p string,
		d fs.DirEntry,
		err error,
	) error {
		switch {
		case err != nil:
			return err
		case d.IsDir():
			// Ensure the directory can be descended into and emptied.
			return os.Chmod(p, 0700)
		default:
			return nil
		}
	})
	if errors.Is(chmodErr, os.ErrNotExist) {
		chmodErr = nil
	}
	removeErr = os.RemoveAll(root)
	return
}
// overrideFileInfo overrides the permission bits of [fs.FileInfo] to 0500 and
// is the concrete type returned by overrideFile.Stat.
type overrideFileInfo struct{ fs.FileInfo }

// Mode returns [fs.FileMode] with its permission bits replaced by 0500.
func (fi overrideFileInfo) Mode() fs.FileMode {
	return fi.FileInfo.Mode()&^fs.FileMode(0777) | 0500
}

// Sys returns nil to avoid passing the original permission bits.
func (fi overrideFileInfo) Sys() any { return nil }
// overrideFile overrides the permission bits of [fs.File] to 0500 and is the
// concrete type returned by dotOverrideFS for calls with "." passed as name.
type overrideFile struct{ fs.File }

// Stat wraps the underlying Stat result in overrideFileInfo on success.
func (f overrideFile) Stat() (fs.FileInfo, error) {
	fi, err := f.File.Stat()
	if err == nil {
		fi = overrideFileInfo{fi}
	}
	return fi, err
}
// dirFS is implemented by the concrete type of the return value of [os.DirFS].
//
// It bundles the read-only filesystem capabilities required when hashing a
// cured directory artifact.
type dirFS interface {
	fs.StatFS
	fs.ReadFileFS
	fs.ReadDirFS
	fs.ReadLinkFS
}
// dotOverrideFS overrides the permission bits of "." to 0500 to avoid the extra
// system calls to add and remove write bit from the target directory.
type dotOverrideFS struct{ dirFS }

// Open wraps the underlying [fs.FS], substituting overrideFile for ".".
func (fsys dotOverrideFS) Open(name string) (fs.File, error) {
	f, err := fsys.dirFS.Open(name)
	if err == nil && name == "." {
		return overrideFile{f}, nil
	}
	return f, err
}

// Stat wraps the underlying [fs.FS], substituting overrideFileInfo for ".".
func (fsys dotOverrideFS) Stat(name string) (fs.FileInfo, error) {
	fi, err := fsys.dirFS.Stat(name)
	if err == nil && name == "." {
		return overrideFileInfo{fi}, nil
	}
	return fi, err
}
// Cure cures the [Artifact] and returns its pathname and [Checksum].
func (c *Cache) Cure(a Artifact) (
	pathname *check.Absolute,
	checksum Checksum,
	err error,
) {
	id := Ident(a)
	ids := Encode(id)
	// The returned pathname is always the identifier symlink, not the
	// content-addressed entry it points to.
	pathname = c.base.Append(
		dirIdentifier,
		ids,
	)
	// Zero both results on error so callers never observe partial output.
	defer func() {
		if err != nil {
			pathname = nil
			checksum = Checksum{}
		}
	}()

	// In-memory fast path: either the outcome is cached, or another
	// goroutine is curing this identifier and loadOrStoreIdent waited for
	// it. A non-nil done means this goroutine now owns the pending entry.
	var done chan<- struct{}
	done, checksum, err = c.loadOrStoreIdent(&id)
	if done == nil {
		return
	} else {
		// Commit the outcome and wake waiters regardless of how curing ends.
		defer func() { c.finaliseIdent(done, &id, &checksum, err) }()
	}

	// On-disk fast path: an existing identifier symlink encodes the checksum
	// in its target basename.
	_, err = os.Lstat(pathname.String())
	if err == nil {
		var name string
		if name, err = os.Readlink(pathname.String()); err != nil {
			return
		}
		checksum, err = Decode(path.Base(name))
		return
	}
	if !errors.Is(err, os.ErrNotExist) {
		return
	}

	// Once curing succeeds and checksums holds the encoded checksum, create
	// the identifier symlink pointing into dirChecksum.
	var checksums string
	defer func() {
		if err == nil && checksums != "" {
			err = os.Symlink(
				checksumLinknamePrefix+checksums,
				pathname.String(),
			)
		}
	}()

	// For a known checksum, probe for an existing content-addressed entry so
	// curing can be skipped entirely.
	var checksumPathname *check.Absolute
	var checksumFi os.FileInfo
	if kc, ok := a.(KnownChecksum); ok {
		checksum = kc.Checksum()
		checksums = Encode(checksum)
		checksumPathname = c.base.Append(
			dirChecksum,
			checksums,
		)
		c.checksumMu.RLock()
		checksumFi, err = os.Stat(checksumPathname.String())
		c.checksumMu.RUnlock()
		if err != nil {
			if !errors.Is(err, os.ErrNotExist) {
				return
			}
			checksumFi, err = nil, nil
		}
	}
	if f, ok := a.(File); ok {
		// Single-file artifact: obtain contents via File.Data and store them
		// as a regular file in dirChecksum.
		if checksumFi != nil {
			if !checksumFi.Mode().IsRegular() {
				// unreachable
				err = InvalidFileModeError(checksumFi.Mode())
			}
			return
		}
		var data []byte
		data, err = f.Data()
		if err != nil {
			return
		}
		if checksumPathname == nil {
			// Checksum not known ahead of time; derive it from the data.
			h := sha512.New384()
			h.Write(data)
			h.Sum(checksum[:0])
			checksums = Encode(checksum)
			checksumPathname = c.base.Append(
				dirChecksum,
				checksums,
			)
		} else if c.IsStrict() {
			// Strict mode revalidates data even though File.Data is
			// responsible for validating against a known checksum itself.
			h := sha512.New384()
			h.Write(data)
			if got := Checksum(h.Sum(nil)); got != checksum {
				err = &ChecksumMismatchError{
					Got:  got,
					Want: checksum,
				}
				return
			}
		}
		c.checksumMu.Lock()
		var w *os.File
		w, err = os.OpenFile(
			checksumPathname.String(),
			os.O_CREATE|os.O_EXCL|os.O_WRONLY,
			0400,
		)
		if err != nil {
			c.checksumMu.Unlock()
			if errors.Is(err, os.ErrExist) {
				// An equivalent entry appeared concurrently; content
				// addressing makes it interchangeable with ours.
				err = nil
			}
			return
		}
		_, err = w.Write(data)
		closeErr := w.Close()
		if err == nil {
			err = closeErr
		}
		c.checksumMu.Unlock()
		return
	} else {
		// Directory artifact: cure into a work pathname, hash the resulting
		// tree, then rename it into dirChecksum.
		if checksumFi != nil {
			if !checksumFi.Mode().IsDir() {
				// unreachable
				err = InvalidFileModeError(checksumFi.Mode())
			}
			return
		}
		workPathname := c.base.Append(dirWork, ids)
		// Clean up the work pathname on any failure.
		defer func() {
			if err != nil {
				chmodErr, removeErr := removeAll(workPathname)
				if chmodErr != nil || removeErr != nil {
					err = errors.Join(err, chmodErr, removeErr)
				} else if errors.Is(err, os.ErrExist) {
					// two artifacts may be backed by the same file
					err = nil
				}
			}
		}()
		tempPathname := c.base.Append(dirTemp, ids)
		if err = a.Cure(workPathname, tempPathname, c.loadData); err != nil {
			return
		}
		// Scratch space is no longer needed once Cure returns.
		if chmodErr, removeErr := removeAll(tempPathname); chmodErr != nil || removeErr != nil {
			err = errors.Join(err, chmodErr, removeErr)
			return
		}
		var fi os.FileInfo
		if fi, err = os.Lstat(workPathname.String()); err != nil {
			if errors.Is(err, os.ErrNotExist) {
				err = NoOutputError{}
			}
			return
		}
		// A non-File artifact must produce a directory; a regular file here
		// indicates the implementation should have implemented File instead.
		if !fi.IsDir() {
			if !fi.Mode().IsRegular() {
				err = InvalidFileModeError(fi.Mode())
			} else {
				err = errors.New("non-file artifact produced regular file")
			}
			return
		}
		var gotChecksum Checksum
		if gotChecksum, err = HashFS(
			dotOverrideFS{os.DirFS(workPathname.String()).(dirFS)},
			".",
		); err != nil {
			return
		}
		if checksumPathname == nil {
			checksum = gotChecksum
			checksums = Encode(checksum)
			checksumPathname = c.base.Append(
				dirChecksum,
				checksums,
			)
		} else {
			if gotChecksum != checksum {
				err = &ChecksumMismatchError{
					Got:  gotChecksum,
					Want: checksum,
				}
				return
			}
		}
		// Owner write/search bits are needed for the rename; the entry is
		// sealed to 0500 afterwards.
		if err = os.Chmod(workPathname.String(), 0700); err != nil {
			return
		}
		c.checksumMu.Lock()
		if err = os.Rename(
			workPathname.String(),
			checksumPathname.String(),
		); err != nil {
			if !errors.Is(err, os.ErrExist) {
				c.checksumMu.Unlock()
				return
			}
			// err is zeroed during deferred cleanup
		} else {
			err = os.Chmod(checksumPathname.String(), 0500)
		}
		c.checksumMu.Unlock()
		return
	}
}
// New returns the address to a new instance of [Cache], creating the
// identifier, checksum and work directories under base if absent.
func New(base *check.Absolute) (*Cache, error) {
	for _, name := range []string{dirIdentifier, dirChecksum, dirWork} {
		err := os.MkdirAll(base.Append(name).String(), 0700)
		if err != nil && !errors.Is(err, os.ErrExist) {
			return nil, err
		}
	}
	c := &Cache{
		base:         base,
		ident:        make(map[ID]Checksum),
		identErr:     make(map[ID]error),
		identPending: make(map[ID]<-chan struct{}),
	}
	return c, nil
}