|
|
|
@@ -5,10 +5,10 @@ import (
|
|
|
|
"fmt"
|
|
|
|
"fmt"
|
|
|
|
"io/fs"
|
|
|
|
"io/fs"
|
|
|
|
"os"
|
|
|
|
"os"
|
|
|
|
"path"
|
|
|
|
|
|
|
|
"strconv"
|
|
|
|
"strconv"
|
|
|
|
"sync"
|
|
|
|
"sync"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"hakurei.app/container/check"
|
|
|
|
"hakurei.app/hst"
|
|
|
|
"hakurei.app/hst"
|
|
|
|
"hakurei.app/internal/lockedfile"
|
|
|
|
"hakurei.app/internal/lockedfile"
|
|
|
|
"hakurei.app/message"
|
|
|
|
"hakurei.app/message"
|
|
|
|
@@ -19,7 +19,8 @@ const multiLockFileName = "lock"
|
|
|
|
|
|
|
|
|
|
|
|
// fine-grained locking and access
|
|
|
|
// fine-grained locking and access
|
|
|
|
type multiStore struct {
|
|
|
|
type multiStore struct {
|
|
|
|
base string
|
|
|
|
// Pathname of directory that the store is rooted in.
|
|
|
|
|
|
|
|
base *check.Absolute
|
|
|
|
|
|
|
|
|
|
|
|
// All currently known instances of multiHandle, keyed by their identity.
|
|
|
|
// All currently known instances of multiHandle, keyed by their identity.
|
|
|
|
handles sync.Map
|
|
|
|
handles sync.Map
|
|
|
|
@@ -39,7 +40,7 @@ type multiStore struct {
|
|
|
|
// bigLock acquires fileMu on multiStore.
|
|
|
|
// bigLock acquires fileMu on multiStore.
|
|
|
|
// Must be called while holding a read lock on multiStore.
|
|
|
|
// Must be called while holding a read lock on multiStore.
|
|
|
|
func (s *multiStore) bigLock() (unlock func(), err error) {
|
|
|
|
func (s *multiStore) bigLock() (unlock func(), err error) {
|
|
|
|
s.mkdirOnce.Do(func() { s.mkdirErr = os.MkdirAll(s.base, 0700) })
|
|
|
|
s.mkdirOnce.Do(func() { s.mkdirErr = os.MkdirAll(s.base.String(), 0700) })
|
|
|
|
if s.mkdirErr != nil {
|
|
|
|
if s.mkdirErr != nil {
|
|
|
|
return nil, &hst.AppError{Step: "create state store directory", Err: s.mkdirErr}
|
|
|
|
return nil, &hst.AppError{Step: "create state store directory", Err: s.mkdirErr}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
@@ -66,10 +67,10 @@ func (s *multiStore) identityHandle(identity int) (*multiHandle, error) {
|
|
|
|
defer unlock()
|
|
|
|
defer unlock()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
b.path = path.Join(s.base, strconv.Itoa(identity))
|
|
|
|
b.path = s.base.Append(strconv.Itoa(identity))
|
|
|
|
b.fileMu = lockedfile.MutexAt(path.Join(b.path, multiLockFileName))
|
|
|
|
b.fileMu = lockedfile.MutexAt(b.path.Append(multiLockFileName).String())
|
|
|
|
|
|
|
|
|
|
|
|
if err := os.MkdirAll(b.path, 0700); err != nil && !errors.Is(err, fs.ErrExist) {
|
|
|
|
if err := os.MkdirAll(b.path.String(), 0700); err != nil && !errors.Is(err, fs.ErrExist) {
|
|
|
|
s.handles.CompareAndDelete(identity, b)
|
|
|
|
s.handles.CompareAndDelete(identity, b)
|
|
|
|
return nil, &hst.AppError{Step: "create store segment directory", Err: err}
|
|
|
|
return nil, &hst.AppError{Step: "create store segment directory", Err: err}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
@@ -114,7 +115,7 @@ func (s *multiStore) List() ([]int, error) {
|
|
|
|
if unlock, err := s.bigLock(); err != nil {
|
|
|
|
if unlock, err := s.bigLock(); err != nil {
|
|
|
|
return nil, err
|
|
|
|
return nil, err
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
entries, err = os.ReadDir(s.base)
|
|
|
|
entries, err = os.ReadDir(s.base.String())
|
|
|
|
s.mu.RUnlock()
|
|
|
|
s.mu.RUnlock()
|
|
|
|
unlock()
|
|
|
|
unlock()
|
|
|
|
|
|
|
|
|
|
|
|
@@ -155,7 +156,8 @@ func (s *multiStore) List() ([]int, error) {
|
|
|
|
|
|
|
|
|
|
|
|
// multiHandle is a handle on a multiStore segment.
|
|
|
|
// multiHandle is a handle on a multiStore segment.
|
|
|
|
type multiHandle struct {
|
|
|
|
type multiHandle struct {
|
|
|
|
path string
|
|
|
|
// Pathname of directory that the segment referred to by multiHandle is rooted in.
|
|
|
|
|
|
|
|
path *check.Absolute
|
|
|
|
|
|
|
|
|
|
|
|
// created by prepare
|
|
|
|
// created by prepare
|
|
|
|
fileMu *lockedfile.Mutex
|
|
|
|
fileMu *lockedfile.Mutex
|
|
|
|
@@ -163,7 +165,8 @@ type multiHandle struct {
|
|
|
|
mu sync.RWMutex
|
|
|
|
mu sync.RWMutex
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (h *multiHandle) filename(id *hst.ID) string { return path.Join(h.path, id.String()) }
|
|
|
|
// instance returns the absolute pathname of a state entry file.
|
|
|
|
|
|
|
|
func (h *multiHandle) instance(id *hst.ID) *check.Absolute { return h.path.Append(id.String()) }
|
|
|
|
|
|
|
|
|
|
|
|
// load iterates over all [hst.State] entries reachable via multiHandle,
|
|
|
|
// load iterates over all [hst.State] entries reachable via multiHandle,
|
|
|
|
// decoding their contents if decode is true.
|
|
|
|
// decoding their contents if decode is true.
|
|
|
|
@@ -173,7 +176,7 @@ func (h *multiHandle) load(decode bool) (map[hst.ID]*hst.State, error) {
|
|
|
|
|
|
|
|
|
|
|
|
// read directory contents, should only contain files named after ids
|
|
|
|
// read directory contents, should only contain files named after ids
|
|
|
|
var entries []os.DirEntry
|
|
|
|
var entries []os.DirEntry
|
|
|
|
if pl, err := os.ReadDir(h.path); err != nil {
|
|
|
|
if pl, err := os.ReadDir(h.path.String()); err != nil {
|
|
|
|
return nil, &hst.AppError{Step: "read store segment directory", Err: err}
|
|
|
|
return nil, &hst.AppError{Step: "read store segment directory", Err: err}
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
entries = pl
|
|
|
|
entries = pl
|
|
|
|
@@ -201,7 +204,7 @@ func (h *multiHandle) load(decode bool) (map[hst.ID]*hst.State, error) {
|
|
|
|
// run in a function to better handle file closing
|
|
|
|
// run in a function to better handle file closing
|
|
|
|
if err := func() error {
|
|
|
|
if err := func() error {
|
|
|
|
// open state file for reading
|
|
|
|
// open state file for reading
|
|
|
|
if f, err := os.Open(path.Join(h.path, e.Name())); err != nil {
|
|
|
|
if f, err := os.Open(h.path.Append(e.Name()).String()); err != nil {
|
|
|
|
return &hst.AppError{Step: "open state file", Err: err}
|
|
|
|
return &hst.AppError{Step: "open state file", Err: err}
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
var s hst.State
|
|
|
|
var s hst.State
|
|
|
|
@@ -240,8 +243,7 @@ func (h *multiHandle) Save(state *hst.State) error {
|
|
|
|
return err
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
statePath := h.filename(&state.ID)
|
|
|
|
if f, err := os.OpenFile(h.instance(&state.ID).String(), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600); err != nil {
|
|
|
|
if f, err := os.OpenFile(statePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600); err != nil {
|
|
|
|
|
|
|
|
return &hst.AppError{Step: "create state file", Err: err}
|
|
|
|
return &hst.AppError{Step: "create state file", Err: err}
|
|
|
|
} else if err = entryEncode(f, state); err != nil {
|
|
|
|
} else if err = entryEncode(f, state); err != nil {
|
|
|
|
_ = f.Close()
|
|
|
|
_ = f.Close()
|
|
|
|
@@ -256,7 +258,7 @@ func (h *multiHandle) Destroy(id hst.ID) error {
|
|
|
|
h.mu.Lock()
|
|
|
|
h.mu.Lock()
|
|
|
|
defer h.mu.Unlock()
|
|
|
|
defer h.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
if err := os.Remove(h.filename(&id)); err != nil {
|
|
|
|
if err := os.Remove(h.instance(&id).String()); err != nil {
|
|
|
|
return &hst.AppError{Step: "destroy state entry", Err: err}
|
|
|
|
return &hst.AppError{Step: "destroy state entry", Err: err}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
return nil
|
|
|
|
@@ -274,8 +276,8 @@ func (h *multiHandle) Len() (int, error) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// NewMulti returns an instance of the multi-file store.
|
|
|
|
// NewMulti returns an instance of the multi-file store.
|
|
|
|
func NewMulti(msg message.Msg, runDir string) Store {
|
|
|
|
func NewMulti(msg message.Msg, prefix *check.Absolute) Store {
|
|
|
|
store := &multiStore{msg: msg, base: path.Join(runDir, "state")}
|
|
|
|
store := &multiStore{msg: msg, base: prefix.Append("state")}
|
|
|
|
store.fileMu = lockedfile.MutexAt(path.Join(store.base, multiLockFileName))
|
|
|
|
store.fileMu = lockedfile.MutexAt(store.base.Append(multiLockFileName).String())
|
|
|
|
return store
|
|
|
|
return store
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|