From 5936e6a4aab541a437fa9464468b51f91d019a2a Mon Sep 17 00:00:00 2001
From: Ophestra
Date: Fri, 16 Jan 2026 02:09:12 +0900
Subject: [PATCH] internal/pkg: parallelise scrub

This significantly improves scrubbing performance. Since the cache
directory structure is friendly to simultaneous access, entries can be
checked concurrently without synchronising filesystem access; only the
shared ScrubError state is guarded by a mutex, and its contents are
sorted before returning so results stay deterministic.

Signed-off-by: Ophestra
---
 internal/pkg/dir.go      |  15 ++-
 internal/pkg/dir_test.go |   3 +-
 internal/pkg/pkg.go      | 302 ++++++++++++++++++++++++---------------
 internal/pkg/pkg_test.go | 142 ++++++++++++++-----
 4 files changed, 303 insertions(+), 159 deletions(-)

diff --git a/internal/pkg/dir.go b/internal/pkg/dir.go
index e446c56..d0c3668 100644
--- a/internal/pkg/dir.go
+++ b/internal/pkg/dir.go
@@ -196,15 +196,16 @@ func Flatten(fsys fs.FS, root string, w io.Writer) (n int, err error) {
 }
 
-// HashFS returns a checksum produced by hashing the result of [Flatten].
-func HashFS(fsys fs.FS, root string) (Checksum, error) {
+// HashFS writes to buf a checksum produced by hashing the result of [Flatten].
+func HashFS(buf *Checksum, fsys fs.FS, root string) error {
 	h := sha512.New384()
 	if _, err := Flatten(fsys, root, h); err != nil {
-		return Checksum{}, err
+		return err
 	}
-	return (Checksum)(h.Sum(nil)), nil
+	h.Sum(buf[:0])
+	return nil
 }
 
-// HashDir returns a checksum produced by hashing the result of [Flatten].
-func HashDir(pathname *check.Absolute) (Checksum, error) {
-	return HashFS(os.DirFS(pathname.String()), ".")
+// HashDir writes to buf a checksum produced by hashing the result of [Flatten].
+func HashDir(buf *Checksum, pathname *check.Absolute) error {
+	return HashFS(buf, os.DirFS(pathname.String()), ".")
 }
diff --git a/internal/pkg/dir_test.go b/internal/pkg/dir_test.go
index c6a57e4..9ded5bf 100644
--- a/internal/pkg/dir_test.go
+++ b/internal/pkg/dir_test.go
@@ -523,7 +523,8 @@ func TestFlatten(t *testing.T) {
 		t.Run("hash", func(t *testing.T) {
 			t.Parallel()
 
-			if got, err := pkg.HashFS(tc.fsys, "."); err != nil {
+			var got pkg.Checksum
+			if err := pkg.HashFS(&got, tc.fsys, "."); err != nil {
 				t.Fatalf("HashFS: error = %v", err)
 			} else if got != tc.sum {
 				t.Fatalf("HashFS: %v", &pkg.ChecksumMismatchError{
diff --git a/internal/pkg/pkg.go b/internal/pkg/pkg.go
index d156e75..d127453 100644
--- a/internal/pkg/pkg.go
+++ b/internal/pkg/pkg.go
@@ -13,6 +13,7 @@ import (
 	"io"
 	"io/fs"
 	"iter"
+	"maps"
 	"os"
 	"path"
 	"path/filepath"
@@ -43,10 +44,10 @@ func Encode(checksum Checksum) string {
 }
 
-// Decode is abbreviation for base64.URLEncoding.Decode(checksum[:], []byte(s)).
-func Decode(s string) (checksum Checksum, err error) {
+// Decode is an abbreviation for base64.URLEncoding.Decode(buf[:], []byte(s)).
+func Decode(buf *Checksum, s string) (err error) {
 	var n int
-	n, err = base64.URLEncoding.Decode(checksum[:], []byte(s))
-	if err == nil && n != len(Checksum{}) {
+	n, err = base64.URLEncoding.Decode(buf[:], []byte(s))
+	if err == nil && n != len(buf) {
 		err = io.ErrUnexpectedEOF
 	}
 	return
@@ -54,12 +55,11 @@
 
 // MustDecode decodes a string representation of [Checksum] and panics if there
 // is a decoding error or the resulting data is too short.
-func MustDecode(s string) Checksum {
-	if checksum, err := Decode(s); err != nil {
+func MustDecode(s string) (checksum Checksum) {
+	if err := Decode(&checksum, s); err != nil {
 		panic(err)
-	} else {
-		return checksum
 	}
+	return
 }
 
 // IContext is passed to [Artifact.Params] and provides identifier information
@@ -563,7 +563,23 @@ type ScrubError struct {
 	// Miscellaneous errors, including [os.ReadDir] on checksum and identifier
 	// directories, [Decode] on entry names and [os.RemoveAll] on inconsistent
 	// entries.
-	Errs []error
+	Errs map[unique.Handle[string]][]error
+}
+
+// errs is a deterministic iterator over Errs, ordered by pathname.
+func (e *ScrubError) errs(yield func(unique.Handle[string], []error) bool) {
+	keys := slices.AppendSeq(
+		make([]unique.Handle[string], 0, len(e.Errs)),
+		maps.Keys(e.Errs),
+	)
+	slices.SortFunc(keys, func(a, b unique.Handle[string]) int {
+		return strings.Compare(a.Value(), b.Value())
+	})
+	for _, key := range keys {
+		if !yield(key, e.Errs[key]) {
+			break
+		}
+	}
 }
 
 // Unwrap returns a concatenation of ChecksumMismatches and Errs.
@@ -572,8 +588,8 @@ func (e *ScrubError) Unwrap() []error {
 	for _, err := range e.ChecksumMismatches {
 		s = append(s, &err)
 	}
-	for _, err := range e.Errs {
-		s = append(s, err)
+	for _, errs := range e.errs {
+		s = append(s, errs...)
 	}
 	return s
 }
@@ -597,8 +613,11 @@
 	}
 	if len(e.Errs) > 0 {
 		s := "errors during scrub:\n"
-		for _, err := range e.Errs {
-			s += err.Error() + "\n"
+		for pathname, errs := range e.errs {
+			s += "  " + pathname.Value() + ":\n"
+			for _, err := range errs {
+				s += "    " + err.Error() + "\n"
+			}
 		}
 		segments = append(segments, s)
 	}
@@ -610,7 +629,14 @@
 // symlinks and removes them if found.
 //
+// A checks value of zero or below selects [runtime.NumCPU] as the number
+// of concurrent checks.
+//
 // This method is not safe for concurrent use with any other method.
-func (c *Cache) Scrub() error {
+func (c *Cache) Scrub(checks int) error {
+	if checks <= 0 {
+		checks = runtime.NumCPU()
+	}
+
 	c.identMu.Lock()
 	defer c.identMu.Unlock()
 	c.checksumMu.Lock()
@@ -618,147 +644,192 @@
 	c.ident = make(map[unique.Handle[ID]]Checksum)
 	c.identErr = make(map[unique.Handle[ID]]error)
-
-	var se ScrubError
+	c.artifact.Clear()
 
 	var (
-		ent os.DirEntry
-		dir *check.Absolute
+		se   = ScrubError{Errs: make(map[unique.Handle[string]][]error)}
+		seMu sync.Mutex
+
+		addErr = func(pathname *check.Absolute, err error) {
+			seMu.Lock()
+			se.Errs[pathname.Handle()] = append(se.Errs[pathname.Handle()], err)
+			seMu.Unlock()
+		}
 	)
-	condemnEntry := func() {
-		chmodErr, removeErr := removeAll(dir.Append(ent.Name()))
+
+	type checkEntry struct {
+		ent   os.DirEntry
+		check func(ent os.DirEntry, want *Checksum) bool
+	}
+	var (
+		dir *check.Absolute
+		wg  sync.WaitGroup
+		w   = make(chan checkEntry, checks)
+		p   = sync.Pool{New: func() any { return new(Checksum) }}
+	)
+	condemn := func(ent os.DirEntry) {
+		pathname := dir.Append(ent.Name())
+		chmodErr, removeErr := removeAll(pathname)
 		if chmodErr != nil {
-			se.Errs = append(se.Errs, chmodErr)
+			addErr(pathname, chmodErr)
 		}
 		if removeErr != nil {
-			se.Errs = append(se.Errs, removeErr)
+			addErr(pathname, removeErr)
 		}
 	}
+	for i := 0; i < checks; i++ {
+		go func() {
+			for ce := range w {
+				want := p.Get().(*Checksum)
+				ent := ce.ent
+				if err := Decode(want, ent.Name()); err != nil {
+					addErr(dir.Append(ent.Name()), err)
+					wg.Go(func() { condemn(ent) })
+				} else if !ce.check(ent, want) {
+					wg.Go(func() { condemn(ent) })
+				} else {
+					c.msg.Verbosef("%s is consistent", ent.Name())
+				}
+				p.Put(want)
+				wg.Done()
+			}
+		}()
+	}
+	defer close(w)
 
 	dir = c.base.Append(dirChecksum)
-	if entries, err := os.ReadDir(dir.String()); err != nil {
-		se.Errs = append(se.Errs, err)
+	if entries, readdirErr := os.ReadDir(dir.String()); readdirErr != nil {
+		addErr(dir, readdirErr)
 	} else {
-		var got, want Checksum
-		for _, ent = range entries {
-			if want, err = Decode(ent.Name()); err != nil {
-				se.Errs = append(se.Errs, err)
-				condemnEntry()
-				continue
-			}
-			if ent.IsDir() {
-				if got, err = HashDir(dir.Append(ent.Name())); err != nil {
-					se.Errs = append(se.Errs, err)
-					continue
-				}
-			} else if ent.Type().IsRegular() {
-				h := sha512.New384()
-				var r *os.File
-				r, err = os.Open(dir.Append(ent.Name()).String())
-				if err != nil {
-					se.Errs = append(se.Errs, err)
-					continue
-				}
-				_, err = io.Copy(h, r)
-				closeErr := r.Close()
-				if closeErr != nil {
-					se.Errs = append(se.Errs, closeErr)
-				}
-				if err != nil {
-					se.Errs = append(se.Errs, err)
-					continue
-				}
-				h.Sum(got[:0])
-			} else {
-				se.Errs = append(se.Errs, InvalidFileModeError(ent.Type()))
-				condemnEntry()
-				continue
-			}
+		wg.Add(len(entries))
+		for _, ent := range entries {
+			w <- checkEntry{ent, func(ent os.DirEntry, want *Checksum) bool {
+				got := p.Get().(*Checksum)
+				defer p.Put(got)
 
-			if got != want {
-				se.ChecksumMismatches = append(se.ChecksumMismatches, ChecksumMismatchError{
-					Got:  got,
-					Want: want,
-				})
-				condemnEntry()
-			}
+				pathname := dir.Append(ent.Name())
+				if ent.IsDir() {
+					if err := HashDir(got, pathname); err != nil {
+						addErr(pathname, err)
+						return true
+					}
+				} else if ent.Type().IsRegular() {
+					h := sha512.New384()
+
+					if r, err := os.Open(pathname.String()); err != nil {
+						addErr(pathname, err)
+						return true
+					} else {
+						_, err = io.Copy(h, r)
+						closeErr := r.Close()
+						if closeErr != nil {
+							addErr(pathname, closeErr)
+						}
+						if err != nil {
+							addErr(pathname, err)
+							return true
+						}
+					}
+					h.Sum(got[:0])
+				} else {
+					addErr(pathname, InvalidFileModeError(ent.Type()))
+					return false
+				}
+
+				if *got != *want {
+					seMu.Lock()
+					se.ChecksumMismatches = append(se.ChecksumMismatches,
+						ChecksumMismatchError{Got: *got, Want: *want},
+					)
+					seMu.Unlock()
+					return false
+				}
+				return true
+			}}
 		}
+		wg.Wait()
 	}
 
 	dir = c.base.Append(dirIdentifier)
-	if entries, err := os.ReadDir(dir.String()); err != nil {
-		se.Errs = append(se.Errs, err)
+	if entries, readdirErr := os.ReadDir(dir.String()); readdirErr != nil {
+		addErr(dir, readdirErr)
 	} else {
-		var (
-			id       ID
-			linkname string
-		)
-		for _, ent = range entries {
-			if id, err = Decode(ent.Name()); err != nil {
-				se.Errs = append(se.Errs, err)
-				condemnEntry()
-				continue
-			}
+		wg.Add(len(entries))
+		for _, ent := range entries {
+			w <- checkEntry{ent, func(ent os.DirEntry, want *Checksum) bool {
+				got := p.Get().(*Checksum)
+				defer p.Put(got)
 
-			if linkname, err = os.Readlink(
-				dir.Append(ent.Name()).String(),
-			); err != nil {
-				se.Errs = append(se.Errs, err)
-				se.DanglingIdentifiers = append(se.DanglingIdentifiers, id)
-				condemnEntry()
-				continue
-			}
-
-			if _, err = Decode(path.Base(linkname)); err != nil {
-				se.Errs = append(se.Errs, err)
-				se.DanglingIdentifiers = append(se.DanglingIdentifiers, id)
-				condemnEntry()
-				continue
-			}
-
-			if _, err = os.Stat(dir.Append(ent.Name()).String()); err != nil {
-				if !errors.Is(err, os.ErrNotExist) {
-					se.Errs = append(se.Errs, err)
+				pathname := dir.Append(ent.Name())
+				if linkname, err := os.Readlink(
+					pathname.String(),
+				); err != nil {
+					seMu.Lock()
+					se.Errs[pathname.Handle()] = append(se.Errs[pathname.Handle()], err)
+					se.DanglingIdentifiers = append(se.DanglingIdentifiers, *want)
+					seMu.Unlock()
+					return false
+				} else if err = Decode(got, path.Base(linkname)); err != nil {
+					seMu.Lock()
+					lnp := dir.Append(linkname)
+					se.Errs[lnp.Handle()] = append(se.Errs[lnp.Handle()], err)
+					se.DanglingIdentifiers = append(se.DanglingIdentifiers, *want)
+					seMu.Unlock()
+					return false
 				}
-				se.DanglingIdentifiers = append(se.DanglingIdentifiers, id)
-				condemnEntry()
-				continue
-			}
+
+				if _, err := os.Stat(pathname.String()); err != nil {
+					if !errors.Is(err, os.ErrNotExist) {
+						addErr(pathname, err)
+					}
+					seMu.Lock()
+					se.DanglingIdentifiers = append(se.DanglingIdentifiers, *want)
+					seMu.Unlock()
+					return false
+				}
+				return true
+			}}
 		}
+		wg.Wait()
 	}
 
 	if len(c.identPending) > 0 {
-		se.Errs = append(se.Errs, errors.New(
+		addErr(c.base, errors.New(
 			"scrub began with pending artifacts",
 		))
 	} else {
-		chmodErr, removeErr := removeAll(c.base.Append(dirWork))
+		pathname := c.base.Append(dirWork)
+		chmodErr, removeErr := removeAll(pathname)
 		if chmodErr != nil {
-			se.Errs = append(se.Errs, chmodErr)
+			addErr(pathname, chmodErr)
 		}
 		if removeErr != nil {
-			se.Errs = append(se.Errs, removeErr)
+			addErr(pathname, removeErr)
 		}
-		if err := os.Mkdir(
-			c.base.Append(dirWork).String(),
-			0700,
-		); err != nil {
-			se.Errs = append(se.Errs, err)
+		if err := os.Mkdir(pathname.String(), 0700); err != nil {
+			addErr(pathname, err)
 		}
 
-		chmodErr, removeErr = removeAll(c.base.Append(dirTemp))
+		pathname = c.base.Append(dirTemp)
+		chmodErr, removeErr = removeAll(pathname)
 		if chmodErr != nil {
-			se.Errs = append(se.Errs, chmodErr)
+			addErr(pathname, chmodErr)
 		}
 		if removeErr != nil {
-			se.Errs = append(se.Errs, removeErr)
+			addErr(pathname, removeErr)
 		}
 	}
 
 	if len(se.ChecksumMismatches) > 0 || len(se.DanglingIdentifiers) > 0 || len(se.Errs) > 0 {
+		slices.SortFunc(se.ChecksumMismatches, func(a, b ChecksumMismatchError) int {
+			return bytes.Compare(a.Want[:], b.Want[:])
+		})
+		slices.SortFunc(se.DanglingIdentifiers, func(a, b ID) int {
+			return bytes.Compare(a[:], b[:])
+		})
 		return &se
 	} else {
 		return nil
@@ -1063,7 +1134,7 @@ func (c *Cache) cure(a Artifact) (
 		if name, err = os.Readlink(pathname.String()); err != nil {
 			return
 		}
-		checksum, err = Decode(path.Base(name))
+		err = Decode(&checksum, path.Base(name))
 		return
 	}
 	if !errors.Is(err, os.ErrNotExist) {
@@ -1269,7 +1340,8 @@ func (c *Cache) cure(a Artifact) (
 	}
 
 	var gotChecksum Checksum
-	if gotChecksum, err = HashFS(
+	if err = HashFS(
+		&gotChecksum,
 		dotOverrideFS{os.DirFS(t.work.String()).(dirFS)},
 		".",
 	); err != nil {
diff --git a/internal/pkg/pkg_test.go b/internal/pkg/pkg_test.go
index bb48dfa..7e22aa6 100644
--- a/internal/pkg/pkg_test.go
+++ b/internal/pkg/pkg_test.go
@@ -22,6 +22,7 @@ import (
 
 	"hakurei.app/container"
 	"hakurei.app/container/check"
+	"hakurei.app/container/fhs"
 	"hakurei.app/container/stub"
 	"hakurei.app/internal/pkg"
 	"hakurei.app/message"
@@ -281,7 +282,7 @@ func checkWithCache(t *testing.T, testCases []cacheTestCase) {
 				tc.early(t, base)
 			}
 			tc.f(t, base, c)
-			scrubFunc = c.Scrub
+			scrubFunc = func() error { return c.Scrub(1 << 7) }
 		}
 
 		var restoreTemp bool
@@ -293,7 +294,8 @@ func checkWithCache(t *testing.T, testCases []cacheTestCase) {
 			restoreTemp = true
 		}
 
-		if checksum, err := pkg.HashDir(base); err != nil {
+		var checksum pkg.Checksum
+		if err := pkg.HashDir(&checksum, base); err != nil {
 			t.Fatalf("HashDir: error = %v", err)
 		} else if checksum != tc.want {
 			t.Fatalf("HashDir: %v", &pkg.ChecksumMismatchError{
@@ -316,7 +318,7 @@ func checkWithCache(t *testing.T, testCases []cacheTestCase) {
 		}
 
 		// validate again to make sure scrub did not condemn anything
-		if checksum, err := pkg.HashDir(base); err != nil {
+		if err := pkg.HashDir(&checksum, base); err != nil {
 			t.Fatalf("HashDir: error = %v", err)
 		} else if checksum != tc.want {
 			t.Fatalf("(scrubbed) HashDir: %v", &pkg.ChecksumMismatchError{
@@ -364,6 +366,49 @@ func cureMany(t *testing.T, c *pkg.Cache, steps []cureStep) {
 	}
 }
 
+// newWantScrubError returns the address of a new [ScrubError] for base.
+func newWantScrubError(base *check.Absolute) *pkg.ScrubError {
+	return &pkg.ScrubError{
+		ChecksumMismatches: []pkg.ChecksumMismatchError{
+			{Got: pkg.MustDecode(
+				"vsAhtPNo4waRNOASwrQwcIPTqb3SBuJOXw2G4T1mNmVZM-wrQTRllmgXqcIIoRcX",
+			), Want: pkg.Checksum{0xff, 0}},
+		},
+		DanglingIdentifiers: []pkg.ID{
+			{0xfe, 0},
+			{0xfe, 0xfe},
+			{0xfe, 0xff},
+		},
+		Errs: map[unique.Handle[string]][]error{
+			base.Append("checksum", "__8AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA").Handle(): {
+				pkg.InvalidFileModeError(fs.ModeSymlink),
+			},
+
+			base.Append("checksum", "invalid").Handle(): {
+				base64.CorruptInputError(4),
+			},
+
+			base.Append("nonexistent").Handle(): {
+				base64.CorruptInputError(8),
+			},
+
+			base.Append("identifier", pkg.Encode(pkg.ID{0xfe, 0xff})).Handle(): {
+				&os.PathError{
+					Op: "readlink",
+					Path: base.Append(
+						"identifier",
+						pkg.Encode(pkg.ID{0xfe, 0xff}),
+					).String(),
+					Err: syscall.EINVAL,
+				},
+			},
+			base.Append("identifier", "invalid").Handle(): {
+				base64.CorruptInputError(4),
+			},
+		},
+	}
+}
+
 func TestCache(t *testing.T) {
 	t.Parallel()
 
@@ -744,9 +789,11 @@ func TestCache(t *testing.T) {
 			})
 
 			wantErrScrub := &pkg.ScrubError{
-				Errs: []error{errors.New("scrub began with pending artifacts")},
+				Errs: map[unique.Handle[string]][]error{
+					base.Handle(): {errors.New("scrub began with pending artifacts")},
+				},
 			}
-			if err := c.Scrub(); !reflect.DeepEqual(err, wantErrScrub) {
+			if err := c.Scrub(1 << 6); !reflect.DeepEqual(err, wantErrScrub) {
 				t.Fatalf("Scrub: error = %#v, want %#v", err, wantErrScrub)
 			}
 
@@ -800,30 +847,8 @@ func TestCache(t *testing.T) {
 				}
 			}
 
-			wantErr := &pkg.ScrubError{
-				ChecksumMismatches: []pkg.ChecksumMismatchError{
-					{Got: pkg.MustDecode(
-						"vsAhtPNo4waRNOASwrQwcIPTqb3SBuJOXw2G4T1mNmVZM-wrQTRllmgXqcIIoRcX",
-					), Want: pkg.Checksum{0xff, 0}},
-				},
-				DanglingIdentifiers: []pkg.ID{
-					{0xfe, 0},
-					{0xfe, 0xfe},
-					{0xfe, 0xff},
-				},
-				Errs: []error{
-					pkg.InvalidFileModeError(fs.ModeSymlink),
-					base64.CorruptInputError(4),
-					base64.CorruptInputError(8),
-					&os.PathError{
-						Op:   "readlink",
-						Path: base.Append("identifier", pkg.Encode(pkg.ID{0xfe, 0xff})).String(),
-						Err:  syscall.EINVAL,
-					},
-					base64.CorruptInputError(4),
-				},
-			}
-			if err := c.Scrub(); !reflect.DeepEqual(err, wantErr) {
+			wantErr := newWantScrubError(base)
+			if err := c.Scrub(1 << 6); !reflect.DeepEqual(err, wantErr) {
 				t.Fatalf("Scrub: error =\n%s\nwant\n%s", err, wantErr)
 			}
 		}, pkg.MustDecode("E4vEZKhCcL2gPZ2Tt59FS3lDng-d_2SKa2i5G_RbDfwGn6EemptFaGLPUDiOa94C")},
@@ -890,6 +915,48 @@ func TestScrubError(t *testing.T) {
 		want   string
 		unwrap []error
 	}{
+		{"sample", *newWantScrubError(
+			fhs.AbsVarLib.Append("cure"),
+		), `checksum mismatches:
+got vsAhtPNo4waRNOASwrQwcIPTqb3SBuJOXw2G4T1mNmVZM-wrQTRllmgXqcIIoRcX instead of _wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
+
+dangling identifiers:
+_gAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
+_v4AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
+_v8AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
+
+errors during scrub:
+  /var/lib/cure/checksum/__8AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA:
+    artifact did not produce a regular file or directory
+  /var/lib/cure/checksum/invalid:
+    illegal base64 data at input byte 4
+  /var/lib/cure/identifier/_v8AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA:
+    readlink /var/lib/cure/identifier/_v8AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA: invalid argument
+  /var/lib/cure/identifier/invalid:
+    illegal base64 data at input byte 4
+  /var/lib/cure/nonexistent:
+    illegal base64 data at input byte 8
+`, []error{
+			&pkg.ChecksumMismatchError{Got: pkg.MustDecode(
+				"vsAhtPNo4waRNOASwrQwcIPTqb3SBuJOXw2G4T1mNmVZM-wrQTRllmgXqcIIoRcX",
+			), Want: pkg.Checksum{0xff, 0}},
+
+			pkg.InvalidFileModeError(fs.ModeSymlink),
+			base64.CorruptInputError(4),
+
+			&os.PathError{
+				Op: "readlink",
+				Path: fhs.AbsVarLib.Append("cure").Append(
+					"identifier",
+					pkg.Encode(pkg.ID{0xfe, 0xff}),
+				).String(),
+				Err: syscall.EINVAL,
+			},
+
+			base64.CorruptInputError(4),
+			base64.CorruptInputError(8),
+		}},
+
 		{"full", pkg.ScrubError{
 			ChecksumMismatches: []pkg.ChecksumMismatchError{
 				{Want: pkg.MustDecode("CH3AiUrCCcVOjOYLaMKKK1Da78989JtfHeIsxMzWOQFiN4mrCLDYpoDxLWqJWCUN")},
@@ -898,22 +965,25 @@ func TestScrubError(t *testing.T) {
 				(pkg.ID)(bytes.Repeat([]byte{0x75, 0xe6, 0x9d, 0x6d, 0xe7, 0x9f}, 8)),
 				(pkg.ID)(bytes.Repeat([]byte{0x71, 0xa7, 0xde, 0x6d, 0xa6, 0xde}, 8)),
 			},
-			Errs: []error{
-				stub.UniqueError(0xcafe),
-				stub.UniqueError(0xbad),
-				stub.UniqueError(0xff),
+			Errs: map[unique.Handle[string]][]error{
+				unique.Make("/proc/nonexistent"): {
+					stub.UniqueError(0xcafe),
+					stub.UniqueError(0xbad),
+					stub.UniqueError(0xff),
+				},
 			},
 		}, `checksum mismatches:
 got AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA instead of CH3AiUrCCcVOjOYLaMKKK1Da78989JtfHeIsxMzWOQFiN4mrCLDYpoDxLWqJWCUN
 
 dangling identifiers:
 deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef
 cafebabecafebabecafebabecafebabecafebabecafebabecafebabecafebabe
 
 errors during scrub:
-unique error 51966 injected by the test suite
-unique error 2989 injected by the test suite
-unique error 255 injected by the test suite
+  /proc/nonexistent:
+    unique error 51966 injected by the test suite
+    unique error 2989 injected by the test suite
+    unique error 255 injected by the test suite
 `, []error{
 			&pkg.ChecksumMismatchError{Want: pkg.MustDecode("CH3AiUrCCcVOjOYLaMKKK1Da78989JtfHeIsxMzWOQFiN4mrCLDYpoDxLWqJWCUN")},
 			stub.UniqueError(0xcafe),