internal/app/state: improve store internals

This fully exposes the store internals for #19 and are final preparations for removing the legacy store interface.

This change also fixes a potential deadlock in the handle initialisation mkdir failure path. This however is never reachable in hakurei as the store is never accessed concurrently.

Signed-off-by: Ophestra <cat@gensokyo.uk>
This commit is contained in:
2025-10-28 23:04:17 +09:00
parent 5e5826459e
commit 65342d588f
5 changed files with 382 additions and 72 deletions

View File

@@ -3,14 +3,15 @@ package state
import (
"errors"
"io/fs"
"iter"
"os"
"strconv"
"sync"
"syscall"
"hakurei.app/container/check"
"hakurei.app/hst"
"hakurei.app/internal/lockedfile"
"hakurei.app/message"
)
// storeMutexName is the pathname of the file backing [lockedfile.Mutex] of a stateStore and storeHandle.
@@ -34,11 +35,10 @@ type stateStore struct {
mkdirOnce sync.Once
// Stored error value via mkdirOnce.
mkdirErr error
msg message.Msg
}
// bigLock acquires fileMu on stateStore.
// A non-nil error returned by bigLock is of type [hst.AppError].
func (s *stateStore) bigLock() (unlock func(), err error) {
s.mkdirOnce.Do(func() { s.mkdirErr = os.MkdirAll(s.base.String(), 0700) })
if s.mkdirErr != nil {
@@ -52,6 +52,7 @@ func (s *stateStore) bigLock() (unlock func(), err error) {
}
// identityHandle loads or initialises a storeHandle for identity.
// A non-nil error returned by identityHandle is of type [hst.AppError].
func (s *stateStore) identityHandle(identity int) (*storeHandle, error) {
h := new(storeHandle)
h.mu.Lock()
@@ -70,61 +71,92 @@ func (s *stateStore) identityHandle(identity int) (*storeHandle, error) {
h.path = s.base.Append(strconv.Itoa(identity))
h.fileMu = lockedfile.MutexAt(h.path.Append(storeMutexName).String())
if err := os.MkdirAll(h.path.String(), 0700); err != nil && !errors.Is(err, fs.ErrExist) {
err := os.MkdirAll(h.path.String(), 0700)
h.mu.Unlock()
if err != nil && !errors.Is(err, fs.ErrExist) {
// handle methods will likely return ENOENT
s.handles.CompareAndDelete(identity, h)
return nil, &hst.AppError{Step: "create store segment directory", Err: err}
}
h.mu.Unlock()
}
return h, nil
}
func (s *stateStore) Do(identity int, f func(c Cursor)) (bool, error) {
if h, err := s.identityHandle(identity); err != nil {
return false, err
} else {
return h.do(f)
}
// segmentIdentity is produced by the iterator returned by stateStore.segments.
type segmentIdentity struct {
// Identity of the current segment.
identity int
// Error encountered while processing this segment.
err error
}
func (s *stateStore) List() ([]int, error) {
// segments returns an iterator over all segmentIdentity known to the store.
// To obtain a storeHandle on a segment, caller must then call identityHandle.
// A non-nil error returned by segments is of type [hst.AppError].
func (s *stateStore) segments() (iter.Seq[segmentIdentity], int, error) {
// read directory contents, should only contain storeMutexName and identity
var entries []os.DirEntry
// acquire big lock to read store segment list
if unlock, err := s.bigLock(); err != nil {
return nil, err
return nil, -1, err
} else {
entries, err = os.ReadDir(s.base.String())
unlock()
if err != nil && !errors.Is(err, os.ErrNotExist) {
return nil, &hst.AppError{Step: "read store directory", Err: err}
return nil, -1, &hst.AppError{Step: "read store segments", Err: err}
}
}
identities := make([]int, 0, len(entries))
for _, e := range entries {
// should only be the big lock
if !e.IsDir() {
if e.Name() != storeMutexName {
s.msg.Verbosef("skipped non-directory entry %q", e.Name())
}
continue
}
// this either indicates a serious bug or external interference
if v, err := strconv.Atoi(e.Name()); err != nil {
s.msg.Verbosef("skipped non-identity entry %q", e.Name())
continue
} else {
if v < hst.IdentityMin || v > hst.IdentityMax {
s.msg.Verbosef("skipped out of bounds entry %q", e.Name())
continue
}
identities = append(identities, v)
}
// expects lock file
l := len(entries)
if l > 0 {
l--
}
return identities, nil
return func(yield func(segmentIdentity) bool) {
// for error reporting
const step = "process store segment"
for _, ent := range entries {
si := segmentIdentity{identity: -1}
// should only be the big lock
if !ent.IsDir() {
if ent.Name() == storeMutexName {
continue
}
// this should never happen
si.err = &hst.AppError{Step: step, Err: syscall.EISDIR,
Msg: "skipped non-directory entry " + strconv.Quote(ent.Name())}
goto out
}
// failure paths either indicates a serious bug or external interference
if v, err := strconv.Atoi(ent.Name()); err != nil {
si.err = &hst.AppError{Step: step, Err: err,
Msg: "skipped non-identity entry " + strconv.Quote(ent.Name())}
goto out
} else if v < hst.IdentityMin || v > hst.IdentityMax {
si.err = &hst.AppError{Step: step, Err: syscall.ERANGE,
Msg: "skipped out of bounds entry " + strconv.Itoa(v)}
goto out
} else {
si.identity = v
}
out:
if !yield(si) {
break
}
}
}, l, nil
}
// newStore returns the address of a new instance of stateStore.
// Multiple instances of stateStore rooted in the same directory is supported, but discouraged.
func newStore(base *check.Absolute) *stateStore {
return &stateStore{base: base, fileMu: lockedfile.MutexAt(base.Append(storeMutexName).String())}
}