internal/pkg: lock on-filesystem cache
All checks were successful
Test / Create distribution (push) Successful in 47s
Test / Sandbox (push) Successful in 2m55s
Test / ShareFS (push) Successful in 4m52s
Test / Hpkg (push) Successful in 5m11s
Test / Sandbox (race detector) (push) Successful in 5m17s
Test / Hakurei (race detector) (push) Successful in 7m53s
Test / Hakurei (push) Successful in 4m6s
Test / Flake checks (push) Successful in 1m41s
All checks were successful
Test / Create distribution (push) Successful in 47s
Test / Sandbox (push) Successful in 2m55s
Test / ShareFS (push) Successful in 4m52s
Test / Hpkg (push) Successful in 5m11s
Test / Sandbox (race detector) (push) Successful in 5m17s
Test / Hakurei (race detector) (push) Successful in 7m53s
Test / Hakurei (push) Successful in 4m6s
Test / Flake checks (push) Successful in 1m41s
Any fine-grained file-based locking here significantly hurts performance and is not part of the use case of the package. This change guarantees exclusive access to prevent inconsistent state on the filesystem. Signed-off-by: Ophestra <cat@gensokyo.uk>
This commit is contained in:
@@ -22,10 +22,12 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
"testing"
|
||||||
"unique"
|
"unique"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
||||||
"hakurei.app/container/check"
|
"hakurei.app/container/check"
|
||||||
|
"hakurei.app/internal/lockedfile"
|
||||||
"hakurei.app/message"
|
"hakurei.app/message"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -353,6 +355,10 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
// fileLock is the file name appended to Cache.base for guaranteeing
|
||||||
|
// exclusive access to the cache directory.
|
||||||
|
fileLock = "lock"
|
||||||
|
|
||||||
// dirIdentifier is the directory name appended to Cache.base for storing
|
// dirIdentifier is the directory name appended to Cache.base for storing
|
||||||
// artifacts named after their [ID].
|
// artifacts named after their [ID].
|
||||||
dirIdentifier = "identifier"
|
dirIdentifier = "identifier"
|
||||||
@@ -438,6 +444,11 @@ type Cache struct {
|
|||||||
identPending map[unique.Handle[ID]]<-chan struct{}
|
identPending map[unique.Handle[ID]]<-chan struct{}
|
||||||
// Synchronises access to ident and corresponding filesystem entries.
|
// Synchronises access to ident and corresponding filesystem entries.
|
||||||
identMu sync.RWMutex
|
identMu sync.RWMutex
|
||||||
|
|
||||||
|
// Unlocks the on-filesystem cache. Must only be called from Close.
|
||||||
|
unlock func()
|
||||||
|
// Synchronises calls to Close.
|
||||||
|
closeOnce sync.Once
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsStrict returns whether the [Cache] strictly verifies checksums.
|
// IsStrict returns whether the [Cache] strictly verifies checksums.
|
||||||
@@ -1405,19 +1416,44 @@ func (pending *pendingArtifactDep) cure(c *Cache) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Close cancels all pending cures and waits for them to clean up.
|
// Close cancels all pending cures and waits for them to clean up.
|
||||||
func (c *Cache) Close() { c.cancel(); c.wg.Wait() }
|
func (c *Cache) Close() {
|
||||||
|
c.closeOnce.Do(func() {
|
||||||
|
c.cancel()
|
||||||
|
c.wg.Wait()
|
||||||
|
c.unlock()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// New returns the address to a new instance of [Cache]. Concurrent cures of
|
// Open returns the address of a newly opened instance of [Cache].
|
||||||
// dependency [Artifact] is limited to the caller-supplied value, however direct
|
//
|
||||||
// calls to [Cache.Cure] is not subject to this limitation.
|
// Concurrent cures of a [FloodArtifact] dependency graph is limited to the
|
||||||
|
// caller-supplied value, however direct calls to [Cache.Cure] is not subject
|
||||||
|
// to this limitation.
|
||||||
//
|
//
|
||||||
// A cures value of 0 or lower is equivalent to the value returned by
|
// A cures value of 0 or lower is equivalent to the value returned by
|
||||||
// [runtime.NumCPU].
|
// [runtime.NumCPU].
|
||||||
func New(
|
//
|
||||||
|
// A successful call to Open guarantees exclusive access to the on-filesystem
|
||||||
|
// cache for the resulting instance of [Cache]. The [Cache.Close] method cancels
|
||||||
|
// and waits for pending cures on [Cache] before releasing this lock and must be
|
||||||
|
// called once the [Cache] is no longer needed.
|
||||||
|
func Open(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
msg message.Msg,
|
msg message.Msg,
|
||||||
cures int,
|
cures int,
|
||||||
base *check.Absolute,
|
base *check.Absolute,
|
||||||
|
) (*Cache, error) {
|
||||||
|
return open(ctx, msg, cures, base, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// open implements Open but allows omitting the [lockedfile] lock when called
|
||||||
|
// from a test. This is used to simulate invalid states in the test suite.
|
||||||
|
func open(
|
||||||
|
ctx context.Context,
|
||||||
|
msg message.Msg,
|
||||||
|
cures int,
|
||||||
|
base *check.Absolute,
|
||||||
|
lock bool,
|
||||||
) (*Cache, error) {
|
) (*Cache, error) {
|
||||||
for _, name := range []string{
|
for _, name := range []string{
|
||||||
dirIdentifier,
|
dirIdentifier,
|
||||||
@@ -1443,6 +1479,18 @@ func New(
|
|||||||
c.cureDep = cureDep
|
c.cureDep = cureDep
|
||||||
c.identPool.New = func() any { return new(extIdent) }
|
c.identPool.New = func() any { return new(extIdent) }
|
||||||
|
|
||||||
|
if lock || !testing.Testing() {
|
||||||
|
if unlock, err := lockedfile.MutexAt(
|
||||||
|
base.Append(fileLock).String(),
|
||||||
|
).Lock(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else {
|
||||||
|
c.unlock = unlock
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
c.unlock = func() {}
|
||||||
|
}
|
||||||
|
|
||||||
if cures < 1 {
|
if cures < 1 {
|
||||||
cures = runtime.NumCPU()
|
cures = runtime.NumCPU()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -28,6 +28,15 @@ import (
|
|||||||
"hakurei.app/message"
|
"hakurei.app/message"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
//go:linkname unsafeOpen hakurei.app/internal/pkg.open
|
||||||
|
func unsafeOpen(
|
||||||
|
ctx context.Context,
|
||||||
|
msg message.Msg,
|
||||||
|
cures int,
|
||||||
|
base *check.Absolute,
|
||||||
|
lock bool,
|
||||||
|
) (*pkg.Cache, error)
|
||||||
|
|
||||||
func TestMain(m *testing.M) { container.TryArgv0(nil); os.Exit(m.Run()) }
|
func TestMain(m *testing.M) { container.TryArgv0(nil); os.Exit(m.Run()) }
|
||||||
|
|
||||||
// overrideIdent overrides the ID method of [Artifact].
|
// overrideIdent overrides the ID method of [Artifact].
|
||||||
@@ -213,7 +222,7 @@ func TestIdent(t *testing.T) {
|
|||||||
var cache *pkg.Cache
|
var cache *pkg.Cache
|
||||||
if a, err := check.NewAbs(t.TempDir()); err != nil {
|
if a, err := check.NewAbs(t.TempDir()); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if cache, err = pkg.New(t.Context(), msg, 0, a); err != nil {
|
} else if cache, err = pkg.Open(t.Context(), msg, 0, a); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
t.Cleanup(cache.Close)
|
t.Cleanup(cache.Close)
|
||||||
@@ -274,8 +283,8 @@ func checkWithCache(t *testing.T, testCases []cacheTestCase) {
|
|||||||
msg.SwapVerbose(testing.Verbose())
|
msg.SwapVerbose(testing.Verbose())
|
||||||
|
|
||||||
var scrubFunc func() error // scrub after hashing
|
var scrubFunc func() error // scrub after hashing
|
||||||
if c, err := pkg.New(t.Context(), msg, 0, base); err != nil {
|
if c, err := pkg.Open(t.Context(), msg, 0, base); err != nil {
|
||||||
t.Fatalf("New: error = %v", err)
|
t.Fatalf("Open: error = %v", err)
|
||||||
} else {
|
} else {
|
||||||
t.Cleanup(c.Close)
|
t.Cleanup(c.Close)
|
||||||
if tc.early != nil {
|
if tc.early != nil {
|
||||||
@@ -294,6 +303,11 @@ func checkWithCache(t *testing.T, testCases []cacheTestCase) {
|
|||||||
restoreTemp = true
|
restoreTemp = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// destroy lock file to avoid changing cache checksums
|
||||||
|
if err := os.Remove(base.Append("lock").String()); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
var checksum pkg.Checksum
|
var checksum pkg.Checksum
|
||||||
if err := pkg.HashDir(&checksum, base); err != nil {
|
if err := pkg.HashDir(&checksum, base); err != nil {
|
||||||
t.Fatalf("HashDir: error = %v", err)
|
t.Fatalf("HashDir: error = %v", err)
|
||||||
@@ -522,12 +536,12 @@ func TestCache(t *testing.T) {
|
|||||||
)},
|
)},
|
||||||
})
|
})
|
||||||
|
|
||||||
if c0, err := pkg.New(
|
if c0, err := unsafeOpen(
|
||||||
t.Context(),
|
t.Context(),
|
||||||
message.New(nil),
|
message.New(nil),
|
||||||
0, base,
|
0, base, false,
|
||||||
); err != nil {
|
); err != nil {
|
||||||
t.Fatalf("New: error = %v", err)
|
t.Fatalf("open: error = %v", err)
|
||||||
} else {
|
} else {
|
||||||
t.Cleanup(c.Close) // check doubled cancel
|
t.Cleanup(c.Close) // check doubled cancel
|
||||||
cureMany(t, c0, []cureStep{
|
cureMany(t, c0, []cureStep{
|
||||||
@@ -1017,12 +1031,12 @@ func TestNew(t *testing.T) {
|
|||||||
Path: container.Nonexistent,
|
Path: container.Nonexistent,
|
||||||
Err: syscall.ENOENT,
|
Err: syscall.ENOENT,
|
||||||
}
|
}
|
||||||
if _, err := pkg.New(
|
if _, err := pkg.Open(
|
||||||
t.Context(),
|
t.Context(),
|
||||||
message.New(nil),
|
message.New(nil),
|
||||||
0, check.MustAbs(container.Nonexistent),
|
0, check.MustAbs(container.Nonexistent),
|
||||||
); !reflect.DeepEqual(err, wantErr) {
|
); !reflect.DeepEqual(err, wantErr) {
|
||||||
t.Errorf("New: error = %#v, want %#v", err, wantErr)
|
t.Errorf("Open: error = %#v, want %#v", err, wantErr)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -1045,12 +1059,12 @@ func TestNew(t *testing.T) {
|
|||||||
Path: tempDir.Append("cache").String(),
|
Path: tempDir.Append("cache").String(),
|
||||||
Err: syscall.EACCES,
|
Err: syscall.EACCES,
|
||||||
}
|
}
|
||||||
if _, err := pkg.New(
|
if _, err := pkg.Open(
|
||||||
t.Context(),
|
t.Context(),
|
||||||
message.New(nil),
|
message.New(nil),
|
||||||
0, tempDir.Append("cache"),
|
0, tempDir.Append("cache"),
|
||||||
); !reflect.DeepEqual(err, wantErr) {
|
); !reflect.DeepEqual(err, wantErr) {
|
||||||
t.Errorf("New: error = %#v, want %#v", err, wantErr)
|
t.Errorf("Open: error = %#v, want %#v", err, wantErr)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user