internal/pkg: parallelise scrub
All checks were successful
Test / Create distribution (push) Successful in 47s
Test / ShareFS (push) Successful in 5m8s
Test / Sandbox (race detector) (push) Successful in 5m28s
Test / Hpkg (push) Successful in 5m39s
Test / Hakurei (push) Successful in 6m3s
Test / Hakurei (race detector) (push) Successful in 8m6s
Test / Sandbox (push) Successful in 1m40s
Test / Flake checks (push) Successful in 1m43s
This significantly improves scrubbing performance. Since the cache directory structure is friendly to simultaneous access, this is possible without synchronisation.

Signed-off-by: Ophestra <cat@gensokyo.uk>
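For context, the shape of the parallelism introduced by this change is a fixed number of worker goroutines draining a channel of directory entries, with checksum buffers recycled through a sync.Pool and completion tracked by a sync.WaitGroup. The standalone sketch below illustrates that shape only; Checksum, verify and the entry list are simplified stand-ins, not the package's API.

// Minimal sketch of the fan-out used for scrubbing (simplified types;
// verify stands in for the per-entry hash-and-compare step).
package main

import (
	"crypto/sha512"
	"fmt"
	"runtime"
	"sync"
)

type Checksum [sha512.Size384]byte

// verify is a placeholder: hash the named entry into buf and compare.
func verify(name string, buf *Checksum) bool { return true }

func main() {
	checks := runtime.NumCPU()
	w := make(chan string, checks) // entries queued for verification
	p := sync.Pool{New: func() any { return new(Checksum) }}
	var wg sync.WaitGroup

	for i := 0; i < checks; i++ {
		go func() {
			for name := range w {
				buf := p.Get().(*Checksum)
				if !verify(name, buf) {
					fmt.Println(name, "is inconsistent")
				}
				p.Put(buf)
				wg.Done()
			}
		}()
	}

	entries := []string{"a", "b", "c"}
	wg.Add(len(entries))
	for _, name := range entries {
		w <- name
	}
	wg.Wait()
	close(w)
}

The actual change additionally re-queues inconsistent entries for removal via wg.Go and records per-path errors under a mutex, as the diff below shows.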
@@ -13,6 +13,7 @@ import (
	"io"
	"io/fs"
	"iter"
	"maps"
	"os"
	"path"
	"path/filepath"
@@ -43,10 +44,10 @@ func Encode(checksum Checksum) string {
}

// Decode is abbreviation for base64.URLEncoding.Decode(checksum[:], []byte(s)).
func Decode(s string) (checksum Checksum, err error) {
func Decode(buf *Checksum, s string) (err error) {
	var n int
	n, err = base64.URLEncoding.Decode(checksum[:], []byte(s))
	if err == nil && n != len(Checksum{}) {
	n, err = base64.URLEncoding.Decode(buf[:], []byte(s))
	if err == nil && n != len(buf) {
		err = io.ErrUnexpectedEOF
	}
	return
@@ -54,12 +55,11 @@ func Decode(s string) (checksum Checksum, err error) {

// MustDecode decodes a string representation of [Checksum] and panics if there
// is a decoding error or the resulting data is too short.
func MustDecode(s string) Checksum {
	if checksum, err := Decode(s); err != nil {
func MustDecode(s string) (checksum Checksum) {
	if err := Decode(&checksum, s); err != nil {
		panic(err)
	} else {
		return checksum
	}
	return
}

// IContext is passed to [Artifact.Params] and provides identifier information
@@ -563,7 +563,23 @@ type ScrubError struct {
	// Miscellaneous errors, including [os.ReadDir] on checksum and identifier
	// directories, [Decode] on entry names and [os.RemoveAll] on inconsistent
	// entries.
	Errs []error
	Errs map[unique.Handle[string]][]error
}

// errs is a deterministic iterator over Errs.
func (e *ScrubError) errs(yield func(unique.Handle[string], []error) bool) {
	keys := slices.AppendSeq(
		make([]unique.Handle[string], 0, len(e.Errs)),
		maps.Keys(e.Errs),
	)
	slices.SortFunc(keys, func(a, b unique.Handle[string]) int {
		return strings.Compare(a.Value(), b.Value())
	})
	for _, key := range keys {
		if !yield(key, e.Errs[key]) {
			break
		}
	}
}

// Unwrap returns a concatenation of ChecksumMismatches and Errs.
@@ -572,8 +588,8 @@ func (e *ScrubError) Unwrap() []error {
	for _, err := range e.ChecksumMismatches {
		s = append(s, &err)
	}
	for _, err := range e.Errs {
		s = append(s, err)
	for _, errs := range e.errs {
		s = append(s, errs...)
	}
	return s
}
@@ -597,8 +613,11 @@ func (e *ScrubError) Error() string {
	}
	if len(e.Errs) > 0 {
		s := "errors during scrub:\n"
		for _, err := range e.Errs {
			s += err.Error() + "\n"
		for pathname, errs := range e.errs {
			s += " " + pathname.Value() + ":\n"
			for _, err := range errs {
				s += " " + err.Error() + "\n"
			}
		}
		segments = append(segments, s)
	}
@@ -610,7 +629,11 @@ func (e *ScrubError) Error() string {
// symlinks and removes them if found.
//
// This method is not safe for concurrent use with any other method.
func (c *Cache) Scrub() error {
func (c *Cache) Scrub(checks int) error {
	if checks <= 0 {
		checks = runtime.NumCPU()
	}

	c.identMu.Lock()
	defer c.identMu.Unlock()
	c.checksumMu.Lock()
@@ -618,147 +641,191 @@ func (c *Cache) Scrub() error {

	c.ident = make(map[unique.Handle[ID]]Checksum)
	c.identErr = make(map[unique.Handle[ID]]error)

	var se ScrubError
	c.artifact.Clear()

	var (
		ent os.DirEntry
		dir *check.Absolute
		se = ScrubError{Errs: make(map[unique.Handle[string]][]error)}
		seMu sync.Mutex

		addErr = func(pathname *check.Absolute, err error) {
			seMu.Lock()
			se.Errs[pathname.Handle()] = append(se.Errs[pathname.Handle()], err)
			seMu.Unlock()
		}
	)
	condemnEntry := func() {
		chmodErr, removeErr := removeAll(dir.Append(ent.Name()))

	type checkEntry struct {
		ent os.DirEntry
		check func(ent os.DirEntry, want *Checksum) bool
	}
	var (
		dir *check.Absolute
		wg sync.WaitGroup
		w = make(chan checkEntry, checks)
		p = sync.Pool{New: func() any { return new(Checksum) }}
	)
	condemn := func(ent os.DirEntry) {
		pathname := dir.Append(ent.Name())
		chmodErr, removeErr := removeAll(pathname)
		if chmodErr != nil {
			se.Errs = append(se.Errs, chmodErr)
			addErr(pathname, chmodErr)
		}
		if removeErr != nil {
			se.Errs = append(se.Errs, removeErr)
			addErr(pathname, removeErr)
		}
	}
	for i := 0; i < checks; i++ {
		go func() {
			for ce := range w {
				want := p.Get().(*Checksum)
				ent := ce.ent
				if err := Decode(want, ent.Name()); err != nil {
					addErr(dir.Append(ent.Name()), err)
					wg.Go(func() { condemn(ent) })
				} else if !ce.check(ent, want) {
					wg.Go(func() { condemn(ent) })
				} else {
					c.msg.Verbosef("%s is consistent", ent.Name())
				}
				p.Put(want)
				wg.Done()
			}
		}()
	}
	defer close(w)

	dir = c.base.Append(dirChecksum)
	if entries, err := os.ReadDir(dir.String()); err != nil {
		se.Errs = append(se.Errs, err)
	if entries, readdirErr := os.ReadDir(dir.String()); readdirErr != nil {
		addErr(dir, readdirErr)
	} else {
		var got, want Checksum
		for _, ent = range entries {
			if want, err = Decode(ent.Name()); err != nil {
				se.Errs = append(se.Errs, err)
				condemnEntry()
				continue
			}
			if ent.IsDir() {
				if got, err = HashDir(dir.Append(ent.Name())); err != nil {
					se.Errs = append(se.Errs, err)
					continue
				}
			} else if ent.Type().IsRegular() {
				h := sha512.New384()
				var r *os.File
				r, err = os.Open(dir.Append(ent.Name()).String())
				if err != nil {
					se.Errs = append(se.Errs, err)
					continue
				}
				_, err = io.Copy(h, r)
				closeErr := r.Close()
				if closeErr != nil {
					se.Errs = append(se.Errs, closeErr)
				}
				if err != nil {
					se.Errs = append(se.Errs, err)
					continue
				}
				h.Sum(got[:0])
			} else {
				se.Errs = append(se.Errs, InvalidFileModeError(ent.Type()))
				condemnEntry()
				continue
			}
		wg.Add(len(entries))
		for _, ent := range entries {
			w <- checkEntry{ent, func(ent os.DirEntry, want *Checksum) bool {
				got := p.Get().(*Checksum)
				defer p.Put(got)

			if got != want {
				se.ChecksumMismatches = append(se.ChecksumMismatches, ChecksumMismatchError{
					Got: got,
					Want: want,
				})
				condemnEntry()
			}
				pathname := dir.Append(ent.Name())
				if ent.IsDir() {
					if err := HashDir(got, pathname); err != nil {
						addErr(pathname, err)
						return true
					}
				} else if ent.Type().IsRegular() {
					h := sha512.New384()

					if r, err := os.Open(pathname.String()); err != nil {
						addErr(pathname, err)
						return true
					} else {
						_, err = io.Copy(h, r)
						closeErr := r.Close()
						if closeErr != nil {
							addErr(pathname, closeErr)
						}
						if err != nil {
							addErr(pathname, err)
						}
					}
					h.Sum(got[:0])
				} else {
					addErr(pathname, InvalidFileModeError(ent.Type()))
					return false
				}

				if *got != *want {
					seMu.Lock()
					se.ChecksumMismatches = append(se.ChecksumMismatches,
						ChecksumMismatchError{Got: *got, Want: *want},
					)
					seMu.Unlock()
					return false
				}
				return true
			}}
		}
		wg.Wait()
	}

	dir = c.base.Append(dirIdentifier)
	if entries, err := os.ReadDir(dir.String()); err != nil {
		se.Errs = append(se.Errs, err)
	if entries, readdirErr := os.ReadDir(dir.String()); readdirErr != nil {
		addErr(dir, readdirErr)
	} else {
		var (
			id ID
			linkname string
		)
		for _, ent = range entries {
			if id, err = Decode(ent.Name()); err != nil {
				se.Errs = append(se.Errs, err)
				condemnEntry()
				continue
			}
		wg.Add(len(entries))
		for _, ent := range entries {
			w <- checkEntry{ent, func(ent os.DirEntry, want *Checksum) bool {
				got := p.Get().(*Checksum)
				defer p.Put(got)

			if linkname, err = os.Readlink(
				dir.Append(ent.Name()).String(),
			); err != nil {
				se.Errs = append(se.Errs, err)
				se.DanglingIdentifiers = append(se.DanglingIdentifiers, id)
				condemnEntry()
				continue
			}

			if _, err = Decode(path.Base(linkname)); err != nil {
				se.Errs = append(se.Errs, err)
				se.DanglingIdentifiers = append(se.DanglingIdentifiers, id)
				condemnEntry()
				continue
			}

			if _, err = os.Stat(dir.Append(ent.Name()).String()); err != nil {
				if !errors.Is(err, os.ErrNotExist) {
					se.Errs = append(se.Errs, err)
				pathname := dir.Append(ent.Name())
				if linkname, err := os.Readlink(
					pathname.String(),
				); err != nil {
					seMu.Lock()
					se.Errs[pathname.Handle()] = append(se.Errs[pathname.Handle()], err)
					se.DanglingIdentifiers = append(se.DanglingIdentifiers, *want)
					seMu.Unlock()
					return false
				} else if err = Decode(got, path.Base(linkname)); err != nil {
					seMu.Lock()
					lnp := dir.Append(linkname)
					se.Errs[lnp.Handle()] = append(se.Errs[lnp.Handle()], err)
					se.DanglingIdentifiers = append(se.DanglingIdentifiers, *want)
					seMu.Unlock()
					return false
				}
				se.DanglingIdentifiers = append(se.DanglingIdentifiers, id)
				condemnEntry()
				continue
			}

				if _, err := os.Stat(pathname.String()); err != nil {
					if !errors.Is(err, os.ErrNotExist) {
						addErr(pathname, err)
					}
					seMu.Lock()
					se.DanglingIdentifiers = append(se.DanglingIdentifiers, *want)
					seMu.Unlock()
					return false
				}
				return true
			}}
		}
		wg.Wait()
	}

	if len(c.identPending) > 0 {
		se.Errs = append(se.Errs, errors.New(
		addErr(c.base, errors.New(
			"scrub began with pending artifacts",
		))
	} else {
		chmodErr, removeErr := removeAll(c.base.Append(dirWork))
		pathname := c.base.Append(dirWork)
		chmodErr, removeErr := removeAll(pathname)
		if chmodErr != nil {
			se.Errs = append(se.Errs, chmodErr)
			addErr(pathname, chmodErr)
		}
		if removeErr != nil {
			se.Errs = append(se.Errs, removeErr)
			addErr(pathname, removeErr)
		}

		if err := os.Mkdir(
			c.base.Append(dirWork).String(),
			0700,
		); err != nil {
			se.Errs = append(se.Errs, err)
		if err := os.Mkdir(pathname.String(), 0700); err != nil {
			addErr(pathname, err)
		}

		chmodErr, removeErr = removeAll(c.base.Append(dirTemp))
		pathname = c.base.Append(dirTemp)
		chmodErr, removeErr = removeAll(pathname)
		if chmodErr != nil {
			se.Errs = append(se.Errs, chmodErr)
			addErr(pathname, chmodErr)
		}
		if removeErr != nil {
			se.Errs = append(se.Errs, removeErr)
			addErr(pathname, removeErr)
		}
	}

	if len(se.ChecksumMismatches) > 0 ||
		len(se.DanglingIdentifiers) > 0 ||
		len(se.Errs) > 0 {
		slices.SortFunc(se.ChecksumMismatches, func(a, b ChecksumMismatchError) int {
			return bytes.Compare(a.Want[:], b.Want[:])
		})
		slices.SortFunc(se.DanglingIdentifiers, func(a, b ID) int {
			return bytes.Compare(a[:], b[:])
		})
		return &se
	} else {
		return nil
@@ -1063,7 +1130,7 @@ func (c *Cache) cure(a Artifact) (
		if name, err = os.Readlink(pathname.String()); err != nil {
			return
		}
		checksum, err = Decode(path.Base(name))
		err = Decode(&checksum, path.Base(name))
		return
	}
	if !errors.Is(err, os.ErrNotExist) {
@@ -1269,7 +1336,8 @@ func (c *Cache) cure(a Artifact) (
	}

	var gotChecksum Checksum
	if gotChecksum, err = HashFS(
	if err = HashFS(
		&gotChecksum,
		dotOverrideFS{os.DirFS(t.work.String()).(dirFS)},
		".",
	); err != nil {
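For reference, the signature changes above reshape call sites roughly as follows. This is a sketch only: the enclosing function, the variable name, the cache value c, and the package import are assumptions and are omitted or abbreviated here.

// Sketch of adapted call sites (assumes a *Cache value c from this package;
// error handling is abbreviated).
var checksum Checksum
if err := Decode(&checksum, name); err != nil { // previously: checksum, err = Decode(name)
	// handle the malformed entry name
}

// Scrub now takes a worker count; zero or a negative value falls back to runtime.NumCPU().
if err := c.Scrub(0); err != nil {
	var se *ScrubError
	if errors.As(err, &se) {
		// se.Errs now groups miscellaneous errors by pathname (unique.Handle[string]).
	}
}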