From dacd9550e0f696d8e6723a304ba056502109201f Mon Sep 17 00:00:00 2001 From: Ophestra Date: Sun, 26 Oct 2025 03:27:56 +0900 Subject: [PATCH] internal/app/state: acquire big lock for toplevel operations This avoids getting into an inconsistent state for simultaneous calls to List and Do on a previously unknown identity. Signed-off-by: Ophestra --- internal/app/state/multi.go | 181 +++++++++++++++++++++++------------- 1 file changed, 114 insertions(+), 67 deletions(-) diff --git a/internal/app/state/multi.go b/internal/app/state/multi.go index 446d6ce..99a02ac 100644 --- a/internal/app/state/multi.go +++ b/internal/app/state/multi.go @@ -14,73 +14,116 @@ import ( "hakurei.app/message" ) -// multiLockFileName is the name of the file backing [lockedfile.Mutex] of a multiBackend. +// multiLockFileName is the name of the file backing [lockedfile.Mutex] of a multiStore and multiBackend. const multiLockFileName = "lock" // fine-grained locking and access type multiStore struct { base string - // initialised backends - backends *sync.Map + // All currently known instances of multiHandle, keyed by their identity. + handles sync.Map + // Held during List and when initialising previously unknown identities during Do. + // Must not be accessed directly. Callers should use the bigLock method instead. + fileMu *lockedfile.Mutex + + // For creating the base directory. + mkdirOnce sync.Once + // Stored error value via mkdirOnce. + mkdirErr error msg message.Msg mu sync.RWMutex } -func (s *multiStore) Do(identity int, f func(c Cursor)) (bool, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - // load or initialise new backend - b := new(multiBackend) - b.mu.Lock() - if v, ok := s.backends.LoadOrStore(identity, b); ok { - b = v.(*multiBackend) - } else { - b.path = path.Join(s.base, strconv.Itoa(identity)) - - // ensure directory - if err := os.MkdirAll(b.path, 0700); err != nil && !errors.Is(err, fs.ErrExist) { - s.backends.CompareAndDelete(identity, b) - return false, &hst.AppError{Step: "create store segment directory", Err: err} - } - - // set up file-based mutex - b.lockfile = lockedfile.MutexAt(path.Join(b.path, multiLockFileName)) - - b.mu.Unlock() +// bigLock acquires fileMu on multiStore. +// Must be called while holding a read lock on multiStore. +func (s *multiStore) bigLock() (unlock func(), err error) { + s.mkdirOnce.Do(func() { s.mkdirErr = os.MkdirAll(s.base, 0700) }) + if s.mkdirErr != nil { + return nil, &hst.AppError{Step: "create state store directory", Err: s.mkdirErr} } - // lock backend - if unlock, err := b.lockfile.Lock(); err != nil { - return false, &hst.AppError{Step: "lock store segment", Err: err} + if unlock, err = s.fileMu.Lock(); err != nil { + return nil, &hst.AppError{Step: "acquire lock on the state store", Err: err} + } + return +} + +// identityHandle loads or initialises a multiHandle for identity. +// Must be called while holding a read lock on multiStore. +func (s *multiStore) identityHandle(identity int) (*multiHandle, error) { + b := new(multiHandle) + b.mu.Lock() + + if v, ok := s.handles.LoadOrStore(identity, b); ok { + b = v.(*multiHandle) + } else { + // acquire big lock to initialise previously unknown segment handle + if unlock, err := s.bigLock(); err != nil { + return nil, err + } else { + defer unlock() + } + + b.path = path.Join(s.base, strconv.Itoa(identity)) + b.fileMu = lockedfile.MutexAt(path.Join(b.path, multiLockFileName)) + + if err := os.MkdirAll(b.path, 0700); err != nil && !errors.Is(err, fs.ErrExist) { + s.handles.CompareAndDelete(identity, b) + return nil, &hst.AppError{Step: "create store segment directory", Err: err} + } + b.mu.Unlock() + } + return b, nil +} + +// do implements multiStore.Do on multiHandle. +func (h *multiHandle) do(identity int, f func(c Cursor)) (bool, error) { + if unlock, err := h.fileMu.Lock(); err != nil { + return false, &hst.AppError{Step: "acquire lock on store segment " + strconv.Itoa(identity), Err: err} } else { // unlock backend after Do is complete defer unlock() } // expose backend methods without exporting the pointer - c := new(struct{ *multiBackend }) - c.multiBackend = b + c := &struct{ *multiHandle }{h} f(c) // disable access to the backend on a best-effort basis - c.multiBackend = nil - + c.multiHandle = nil return true, nil } +func (s *multiStore) Do(identity int, f func(c Cursor)) (bool, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + if h, err := s.identityHandle(identity); err != nil { + return false, err + } else { + return h.do(identity, f) + } +} + func (s *multiStore) List() ([]int, error) { var entries []os.DirEntry - // read base directory to get all identities - if v, err := os.ReadDir(s.base); err != nil && !errors.Is(err, os.ErrNotExist) { - return nil, &hst.AppError{Step: "read store directory", Err: err} + // acquire big lock to read store segment list + s.mu.RLock() + if unlock, err := s.bigLock(); err != nil { + return nil, err } else { - entries = v + entries, err = os.ReadDir(s.base) + s.mu.RUnlock() + unlock() + + if err != nil && !errors.Is(err, os.ErrNotExist) { + return nil, &hst.AppError{Step: "read store directory", Err: err} + } } - aidsBuf := make([]int, 0, len(entries)) + identities := make([]int, 0, len(entries)) for _, e := range entries { // skip non-directories if !e.IsDir() { @@ -88,9 +131,14 @@ func (s *multiStore) List() ([]int, error) { continue } + // skip lock file + if e.Name() == multiLockFileName { + continue + } + // skip non-numerical names if v, err := strconv.Atoi(e.Name()); err != nil { - s.msg.Verbosef("skipped non-aid entry %q", e.Name()) + s.msg.Verbosef("skipped non-identity entry %q", e.Name()) continue } else { if v < hst.IdentityMin || v > hst.IdentityMax { @@ -98,33 +146,34 @@ func (s *multiStore) List() ([]int, error) { continue } - aidsBuf = append(aidsBuf, v) + identities = append(identities, v) } } - return append([]int(nil), aidsBuf...), nil + return identities, nil } -type multiBackend struct { +// multiHandle is a handle on a multiStore segment. +type multiHandle struct { path string - // created/opened by prepare - lockfile *lockedfile.Mutex + // created by prepare + fileMu *lockedfile.Mutex mu sync.RWMutex } -func (b *multiBackend) filename(id *hst.ID) string { return path.Join(b.path, id.String()) } +func (h *multiHandle) filename(id *hst.ID) string { return path.Join(h.path, id.String()) } -// reads all launchers in multiBackend -// file contents are ignored if decode is false -func (b *multiBackend) load(decode bool) (map[hst.ID]*hst.State, error) { - b.mu.RLock() - defer b.mu.RUnlock() +// load iterates over all [hst.State] entries reachable via multiHandle, +// decoding their contents if decode is true. +func (h *multiHandle) load(decode bool) (map[hst.ID]*hst.State, error) { + h.mu.RLock() + defer h.mu.RUnlock() // read directory contents, should only contain files named after ids var entries []os.DirEntry - if pl, err := os.ReadDir(b.path); err != nil { + if pl, err := os.ReadDir(h.path); err != nil { return nil, &hst.AppError{Step: "read store segment directory", Err: err} } else { entries = pl @@ -152,7 +201,7 @@ func (b *multiBackend) load(decode bool) (map[hst.ID]*hst.State, error) { // run in a function to better handle file closing if err := func() error { // open state file for reading - if f, err := os.Open(path.Join(b.path, e.Name())); err != nil { + if f, err := os.Open(path.Join(h.path, e.Name())); err != nil { return &hst.AppError{Step: "open state file", Err: err} } else { var s hst.State @@ -183,15 +232,15 @@ func (b *multiBackend) load(decode bool) (map[hst.ID]*hst.State, error) { } // Save writes process state to filesystem. -func (b *multiBackend) Save(state *hst.State) error { - b.mu.Lock() - defer b.mu.Unlock() +func (h *multiHandle) Save(state *hst.State) error { + h.mu.Lock() + defer h.mu.Unlock() if err := state.Config.Validate(); err != nil { return err } - statePath := b.filename(&state.ID) + statePath := h.filename(&state.ID) if f, err := os.OpenFile(statePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600); err != nil { return &hst.AppError{Step: "create state file", Err: err} } else if err = entryEncode(f, state); err != nil { @@ -203,21 +252,21 @@ func (b *multiBackend) Save(state *hst.State) error { return nil } -func (b *multiBackend) Destroy(id hst.ID) error { - b.mu.Lock() - defer b.mu.Unlock() +func (h *multiHandle) Destroy(id hst.ID) error { + h.mu.Lock() + defer h.mu.Unlock() - if err := os.Remove(b.filename(&id)); err != nil { + if err := os.Remove(h.filename(&id)); err != nil { return &hst.AppError{Step: "destroy state entry", Err: err} } return nil } -func (b *multiBackend) Load() (map[hst.ID]*hst.State, error) { return b.load(true) } +func (h *multiHandle) Load() (map[hst.ID]*hst.State, error) { return h.load(true) } -func (b *multiBackend) Len() (int, error) { +func (h *multiHandle) Len() (int, error) { // rn consists of only nil entries but has the correct length - rn, err := b.load(false) + rn, err := h.load(false) if err != nil { return -1, &hst.AppError{Step: "count state entries", Err: err} } @@ -226,9 +275,7 @@ func (b *multiBackend) Len() (int, error) { // NewMulti returns an instance of the multi-file store. func NewMulti(msg message.Msg, runDir string) Store { - return &multiStore{ - msg: msg, - base: path.Join(runDir, "state"), - backends: new(sync.Map), - } + store := &multiStore{msg: msg, base: path.Join(runDir, "state")} + store.fileMu = lockedfile.MutexAt(path.Join(store.base, multiLockFileName)) + return store }