internal/app/state: improve handles internals
All checks were successful
Test / Create distribution (push) Successful in 33s
Test / Sandbox (push) Successful in 2m8s
Test / Hakurei (push) Successful in 3m10s
Test / Hpkg (push) Successful in 3m56s
Test / Sandbox (race detector) (push) Successful in 4m7s
Test / Hakurei (race detector) (push) Successful in 4m53s
Test / Flake checks (push) Successful in 1m31s
All checks were successful
Test / Create distribution (push) Successful in 33s
Test / Sandbox (push) Successful in 2m8s
Test / Hakurei (push) Successful in 3m10s
Test / Hpkg (push) Successful in 3m56s
Test / Sandbox (race detector) (push) Successful in 4m7s
Test / Hakurei (race detector) (push) Successful in 4m53s
Test / Flake checks (push) Successful in 1m31s
This replaces the Store interface with something better reflecting the underlying data format for #19. An implementation of Store is provided on top of the new code to ease transition. Signed-off-by: Ophestra <cat@gensokyo.uk>
This commit is contained in:
@@ -23,21 +23,30 @@ func entryEncode(w io.Writer, s *hst.State) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// entryDecodeHeader calls entryReadHeader, returning [hst.AppError] for a non-nil error.
|
||||||
|
func entryDecodeHeader(r io.Reader) (hst.Enablement, error) {
|
||||||
|
if et, err := entryReadHeader(r); err != nil {
|
||||||
|
return 0, &hst.AppError{Step: "decode state header", Err: err}
|
||||||
|
} else {
|
||||||
|
return et, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// entryDecode decodes [hst.State] from [io.Reader] and stores the result in the value pointed to by p.
|
// entryDecode decodes [hst.State] from [io.Reader] and stores the result in the value pointed to by p.
|
||||||
// entryDecode validates the embedded [hst.Config] value.
|
// entryDecode validates the embedded [hst.Config] value.
|
||||||
//
|
//
|
||||||
// A non-nil error returned by entryDecode is of type [hst.AppError].
|
// A non-nil error returned by entryDecode is of type [hst.AppError].
|
||||||
func entryDecode(r io.Reader, p *hst.State) error {
|
func entryDecode(r io.Reader, p *hst.State) (hst.Enablement, error) {
|
||||||
if et, err := entryReadHeader(r); err != nil {
|
if et, err := entryDecodeHeader(r); err != nil {
|
||||||
return &hst.AppError{Step: "decode state header", Err: err}
|
return et, err
|
||||||
} else if err = gob.NewDecoder(r).Decode(&p); err != nil {
|
} else if err = gob.NewDecoder(r).Decode(&p); err != nil {
|
||||||
return &hst.AppError{Step: "decode state body", Err: err}
|
return et, &hst.AppError{Step: "decode state body", Err: err}
|
||||||
} else if err = p.Config.Validate(); err != nil {
|
} else if err = p.Config.Validate(); err != nil {
|
||||||
return err
|
return et, err
|
||||||
} else if p.Enablements.Unwrap() != et {
|
} else if p.Enablements.Unwrap() != et {
|
||||||
return &hst.AppError{Step: "validate state enablement", Err: os.ErrInvalid,
|
return et, &hst.AppError{Step: "validate state enablement", Err: os.ErrInvalid,
|
||||||
Msg: fmt.Sprintf("state entry %s has unexpected enablement byte %#x, %#x", p.ID.String(), byte(p.Enablements.Unwrap()), byte(et))}
|
Msg: fmt.Sprintf("state entry %s has unexpected enablement byte %#x, %#x", p.ID.String(), byte(p.Enablements.Unwrap()), byte(et))}
|
||||||
} else {
|
} else {
|
||||||
return nil
|
return et, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,15 +17,6 @@ import (
|
|||||||
|
|
||||||
func TestEntryData(t *testing.T) {
|
func TestEntryData(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
newTemplateState := func() *hst.State {
|
|
||||||
return &hst.State{
|
|
||||||
ID: hst.ID(bytes.Repeat([]byte{0xaa}, len(hst.ID{}))),
|
|
||||||
PID: 0xcafebabe,
|
|
||||||
ShimPID: 0xdeadbeef,
|
|
||||||
Config: hst.Template(),
|
|
||||||
Time: time.Unix(0, 0),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
mustEncodeGob := func(e any) string {
|
mustEncodeGob := func(e any) string {
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
@@ -80,8 +71,10 @@ func TestEntryData(t *testing.T) {
|
|||||||
// While the current implementation mostly is, it has randomised order
|
// While the current implementation mostly is, it has randomised order
|
||||||
// for iterating over maps, and hst.Config holds a map for environ.
|
// for iterating over maps, and hst.Config holds a map for environ.
|
||||||
var got hst.State
|
var got hst.State
|
||||||
if err := entryDecode(&buf, &got); err != nil {
|
if et, err := entryDecode(&buf, &got); err != nil {
|
||||||
t.Fatalf("entryDecode: error = %v", err)
|
t.Fatalf("entryDecode: error = %v", err)
|
||||||
|
} else if stateEt := got.Enablements.Unwrap(); et != stateEt {
|
||||||
|
t.Fatalf("entryDecode: et = %x, state %x", et, stateEt)
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(&got, tc.s) {
|
if !reflect.DeepEqual(&got, tc.s) {
|
||||||
t.Errorf("entryEncode: %x", buf.Bytes())
|
t.Errorf("entryEncode: %x", buf.Bytes())
|
||||||
@@ -95,10 +88,12 @@ func TestEntryData(t *testing.T) {
|
|||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
var got hst.State
|
var got hst.State
|
||||||
if err := entryDecode(strings.NewReader(tc.data), &got); !reflect.DeepEqual(err, tc.err) {
|
if et, err := entryDecode(strings.NewReader(tc.data), &got); !reflect.DeepEqual(err, tc.err) {
|
||||||
t.Fatalf("entryDecode: error = %#v, want %#v", err, tc.err)
|
t.Fatalf("entryDecode: error = %#v, want %#v", err, tc.err)
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return
|
return
|
||||||
|
} else if stateEt := got.Enablements.Unwrap(); et != stateEt {
|
||||||
|
t.Fatalf("entryDecode: et = %x, state %x", et, stateEt)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !reflect.DeepEqual(&got, tc.s) {
|
if !reflect.DeepEqual(&got, tc.s) {
|
||||||
@@ -128,6 +123,17 @@ func TestEntryData(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// newTemplateState returns the address of a new template [hst.State] struct.
|
||||||
|
func newTemplateState() *hst.State {
|
||||||
|
return &hst.State{
|
||||||
|
ID: hst.ID(bytes.Repeat([]byte{0xaa}, len(hst.ID{}))),
|
||||||
|
PID: 0xcafebabe,
|
||||||
|
ShimPID: 0xdeadbeef,
|
||||||
|
Config: hst.Template(),
|
||||||
|
Time: time.Unix(0, 0),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// stubNErrorWriter returns an error for writes above a certain size.
|
// stubNErrorWriter returns an error for writes above a certain size.
|
||||||
type stubNErrorWriter int
|
type stubNErrorWriter int
|
||||||
|
|
||||||
|
|||||||
@@ -1,283 +0,0 @@
|
|||||||
package state
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"io/fs"
|
|
||||||
"os"
|
|
||||||
"strconv"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"hakurei.app/container/check"
|
|
||||||
"hakurei.app/hst"
|
|
||||||
"hakurei.app/internal/lockedfile"
|
|
||||||
"hakurei.app/message"
|
|
||||||
)
|
|
||||||
|
|
||||||
// multiLockFileName is the name of the file backing [lockedfile.Mutex] of a multiStore and multiBackend.
|
|
||||||
const multiLockFileName = "lock"
|
|
||||||
|
|
||||||
// fine-grained locking and access
|
|
||||||
type multiStore struct {
|
|
||||||
// Pathname of directory that the store is rooted in.
|
|
||||||
base *check.Absolute
|
|
||||||
|
|
||||||
// All currently known instances of multiHandle, keyed by their identity.
|
|
||||||
handles sync.Map
|
|
||||||
// Held during List and when initialising previously unknown identities during Do.
|
|
||||||
// Must not be accessed directly. Callers should use the bigLock method instead.
|
|
||||||
fileMu *lockedfile.Mutex
|
|
||||||
|
|
||||||
// For creating the base directory.
|
|
||||||
mkdirOnce sync.Once
|
|
||||||
// Stored error value via mkdirOnce.
|
|
||||||
mkdirErr error
|
|
||||||
|
|
||||||
msg message.Msg
|
|
||||||
mu sync.RWMutex
|
|
||||||
}
|
|
||||||
|
|
||||||
// bigLock acquires fileMu on multiStore.
|
|
||||||
// Must be called while holding a read lock on multiStore.
|
|
||||||
func (s *multiStore) bigLock() (unlock func(), err error) {
|
|
||||||
s.mkdirOnce.Do(func() { s.mkdirErr = os.MkdirAll(s.base.String(), 0700) })
|
|
||||||
if s.mkdirErr != nil {
|
|
||||||
return nil, &hst.AppError{Step: "create state store directory", Err: s.mkdirErr}
|
|
||||||
}
|
|
||||||
|
|
||||||
if unlock, err = s.fileMu.Lock(); err != nil {
|
|
||||||
return nil, &hst.AppError{Step: "acquire lock on the state store", Err: err}
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// identityHandle loads or initialises a multiHandle for identity.
|
|
||||||
// Must be called while holding a read lock on multiStore.
|
|
||||||
func (s *multiStore) identityHandle(identity int) (*multiHandle, error) {
|
|
||||||
b := new(multiHandle)
|
|
||||||
b.mu.Lock()
|
|
||||||
|
|
||||||
if v, ok := s.handles.LoadOrStore(identity, b); ok {
|
|
||||||
b = v.(*multiHandle)
|
|
||||||
} else {
|
|
||||||
// acquire big lock to initialise previously unknown segment handle
|
|
||||||
if unlock, err := s.bigLock(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
} else {
|
|
||||||
defer unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
b.path = s.base.Append(strconv.Itoa(identity))
|
|
||||||
b.fileMu = lockedfile.MutexAt(b.path.Append(multiLockFileName).String())
|
|
||||||
|
|
||||||
if err := os.MkdirAll(b.path.String(), 0700); err != nil && !errors.Is(err, fs.ErrExist) {
|
|
||||||
s.handles.CompareAndDelete(identity, b)
|
|
||||||
return nil, &hst.AppError{Step: "create store segment directory", Err: err}
|
|
||||||
}
|
|
||||||
b.mu.Unlock()
|
|
||||||
}
|
|
||||||
return b, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// do implements multiStore.Do on multiHandle.
|
|
||||||
func (h *multiHandle) do(identity int, f func(c Cursor)) (bool, error) {
|
|
||||||
if unlock, err := h.fileMu.Lock(); err != nil {
|
|
||||||
return false, &hst.AppError{Step: "acquire lock on store segment " + strconv.Itoa(identity), Err: err}
|
|
||||||
} else {
|
|
||||||
// unlock backend after Do is complete
|
|
||||||
defer unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// expose backend methods without exporting the pointer
|
|
||||||
c := &struct{ *multiHandle }{h}
|
|
||||||
f(c)
|
|
||||||
// disable access to the backend on a best-effort basis
|
|
||||||
c.multiHandle = nil
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *multiStore) Do(identity int, f func(c Cursor)) (bool, error) {
|
|
||||||
s.mu.RLock()
|
|
||||||
defer s.mu.RUnlock()
|
|
||||||
|
|
||||||
if h, err := s.identityHandle(identity); err != nil {
|
|
||||||
return false, err
|
|
||||||
} else {
|
|
||||||
return h.do(identity, f)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *multiStore) List() ([]int, error) {
|
|
||||||
var entries []os.DirEntry
|
|
||||||
|
|
||||||
// acquire big lock to read store segment list
|
|
||||||
s.mu.RLock()
|
|
||||||
if unlock, err := s.bigLock(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
} else {
|
|
||||||
entries, err = os.ReadDir(s.base.String())
|
|
||||||
s.mu.RUnlock()
|
|
||||||
unlock()
|
|
||||||
|
|
||||||
if err != nil && !errors.Is(err, os.ErrNotExist) {
|
|
||||||
return nil, &hst.AppError{Step: "read store directory", Err: err}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
identities := make([]int, 0, len(entries))
|
|
||||||
for _, e := range entries {
|
|
||||||
// skip non-directories
|
|
||||||
if !e.IsDir() {
|
|
||||||
s.msg.Verbosef("skipped non-directory entry %q", e.Name())
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// skip lock file
|
|
||||||
if e.Name() == multiLockFileName {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// skip non-numerical names
|
|
||||||
if v, err := strconv.Atoi(e.Name()); err != nil {
|
|
||||||
s.msg.Verbosef("skipped non-identity entry %q", e.Name())
|
|
||||||
continue
|
|
||||||
} else {
|
|
||||||
if v < hst.IdentityMin || v > hst.IdentityMax {
|
|
||||||
s.msg.Verbosef("skipped out of bounds entry %q", e.Name())
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
identities = append(identities, v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return identities, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// multiHandle is a handle on a multiStore segment.
|
|
||||||
type multiHandle struct {
|
|
||||||
// Pathname of directory that the segment referred to by multiHandle is rooted in.
|
|
||||||
path *check.Absolute
|
|
||||||
|
|
||||||
// created by prepare
|
|
||||||
fileMu *lockedfile.Mutex
|
|
||||||
|
|
||||||
mu sync.RWMutex
|
|
||||||
}
|
|
||||||
|
|
||||||
// instance returns the absolute pathname of a state entry file.
|
|
||||||
func (h *multiHandle) instance(id *hst.ID) *check.Absolute { return h.path.Append(id.String()) }
|
|
||||||
|
|
||||||
// load iterates over all [hst.State] entries reachable via multiHandle,
|
|
||||||
// decoding their contents if decode is true.
|
|
||||||
func (h *multiHandle) load(decode bool) (map[hst.ID]*hst.State, error) {
|
|
||||||
h.mu.RLock()
|
|
||||||
defer h.mu.RUnlock()
|
|
||||||
|
|
||||||
// read directory contents, should only contain files named after ids
|
|
||||||
var entries []os.DirEntry
|
|
||||||
if pl, err := os.ReadDir(h.path.String()); err != nil {
|
|
||||||
return nil, &hst.AppError{Step: "read store segment directory", Err: err}
|
|
||||||
} else {
|
|
||||||
entries = pl
|
|
||||||
}
|
|
||||||
|
|
||||||
// allocate as if every entry is valid
|
|
||||||
// since that should be the case assuming no external interference happens
|
|
||||||
r := make(map[hst.ID]*hst.State, len(entries))
|
|
||||||
|
|
||||||
for _, e := range entries {
|
|
||||||
if e.IsDir() {
|
|
||||||
return nil, fmt.Errorf("unexpected directory %q in store", e.Name())
|
|
||||||
}
|
|
||||||
|
|
||||||
// skip lock file
|
|
||||||
if e.Name() == multiLockFileName {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
var id hst.ID
|
|
||||||
if err := id.UnmarshalText([]byte(e.Name())); err != nil {
|
|
||||||
return nil, &hst.AppError{Step: "parse state key", Err: err}
|
|
||||||
}
|
|
||||||
|
|
||||||
// run in a function to better handle file closing
|
|
||||||
if err := func() error {
|
|
||||||
// open state file for reading
|
|
||||||
if f, err := os.Open(h.path.Append(e.Name()).String()); err != nil {
|
|
||||||
return &hst.AppError{Step: "open state file", Err: err}
|
|
||||||
} else {
|
|
||||||
var s hst.State
|
|
||||||
r[id] = &s
|
|
||||||
|
|
||||||
// append regardless, but only parse if required, implements Len
|
|
||||||
if decode {
|
|
||||||
if err = entryDecode(f, &s); err != nil {
|
|
||||||
_ = f.Close()
|
|
||||||
return err
|
|
||||||
} else if s.ID != id {
|
|
||||||
return &hst.AppError{Step: "validate state identifier", Err: os.ErrInvalid,
|
|
||||||
Msg: fmt.Sprintf("state entry %s has unexpected id %s", id, &s.ID)}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = f.Close(); err != nil {
|
|
||||||
return &hst.AppError{Step: "close state file", Err: err}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return r, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Save writes process state to filesystem.
|
|
||||||
func (h *multiHandle) Save(state *hst.State) error {
|
|
||||||
h.mu.Lock()
|
|
||||||
defer h.mu.Unlock()
|
|
||||||
|
|
||||||
if err := state.Config.Validate(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if f, err := os.OpenFile(h.instance(&state.ID).String(), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600); err != nil {
|
|
||||||
return &hst.AppError{Step: "create state file", Err: err}
|
|
||||||
} else if err = entryEncode(f, state); err != nil {
|
|
||||||
_ = f.Close()
|
|
||||||
return err
|
|
||||||
} else if err = f.Close(); err != nil {
|
|
||||||
return &hst.AppError{Step: "close state file", Err: err}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *multiHandle) Destroy(id hst.ID) error {
|
|
||||||
h.mu.Lock()
|
|
||||||
defer h.mu.Unlock()
|
|
||||||
|
|
||||||
if err := os.Remove(h.instance(&id).String()); err != nil {
|
|
||||||
return &hst.AppError{Step: "destroy state entry", Err: err}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *multiHandle) Load() (map[hst.ID]*hst.State, error) { return h.load(true) }
|
|
||||||
|
|
||||||
func (h *multiHandle) Len() (int, error) {
|
|
||||||
// rn consists of only nil entries but has the correct length
|
|
||||||
rn, err := h.load(false)
|
|
||||||
if err != nil {
|
|
||||||
return -1, &hst.AppError{Step: "count state entries", Err: err}
|
|
||||||
}
|
|
||||||
return len(rn), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewMulti returns an instance of the multi-file store.
|
|
||||||
func NewMulti(msg message.Msg, prefix *check.Absolute) Store {
|
|
||||||
store := &multiStore{msg: msg, base: prefix.Append("state")}
|
|
||||||
store.fileMu = lockedfile.MutexAt(store.base.Append(multiLockFileName).String())
|
|
||||||
return store
|
|
||||||
}
|
|
||||||
161
internal/app/state/segment.go
Normal file
161
internal/app/state/segment.go
Normal file
@@ -0,0 +1,161 @@
|
|||||||
|
package state
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"iter"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"hakurei.app/container/check"
|
||||||
|
"hakurei.app/hst"
|
||||||
|
"hakurei.app/internal/lockedfile"
|
||||||
|
)
|
||||||
|
|
||||||
|
// stateEntryHandle is a handle on a state entry retrieved from a storeHandle.
|
||||||
|
// Must only be used while its parent storeHandle.fileMu is held.
|
||||||
|
type stateEntryHandle struct {
|
||||||
|
// Error returned while decoding pathname.
|
||||||
|
// A non-nil value disables stateEntryHandle.
|
||||||
|
decodeErr error
|
||||||
|
|
||||||
|
// Checked path to entry file.
|
||||||
|
pathname *check.Absolute
|
||||||
|
|
||||||
|
hst.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
// open opens the underlying state entry file, returning [hst.AppError] for a non-nil error.
|
||||||
|
func (eh *stateEntryHandle) open(flag int, perm os.FileMode) (*os.File, error) {
|
||||||
|
if eh.decodeErr != nil {
|
||||||
|
return nil, eh.decodeErr
|
||||||
|
}
|
||||||
|
|
||||||
|
if f, err := os.OpenFile(eh.pathname.String(), flag, perm); err != nil {
|
||||||
|
return nil, &hst.AppError{Step: "open state entry", Err: err}
|
||||||
|
} else {
|
||||||
|
return f, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// destroy removes the underlying state entry file, returning [hst.AppError] for a non-nil error.
|
||||||
|
func (eh *stateEntryHandle) destroy() error {
|
||||||
|
// destroy does not go through open
|
||||||
|
if eh.decodeErr != nil {
|
||||||
|
return eh.decodeErr
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := os.Remove(eh.pathname.String()); err != nil {
|
||||||
|
return &hst.AppError{Step: "destroy state entry", Err: err}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// save encodes [hst.State] and writes it to the underlying file.
|
||||||
|
// An error is returned if a file already exists with the same identifier.
|
||||||
|
// save does not validate the embedded [hst.Config].
|
||||||
|
func (eh *stateEntryHandle) save(state *hst.State) error {
|
||||||
|
f, err := eh.open(os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = entryEncode(f, state)
|
||||||
|
if closeErr := f.Close(); closeErr != nil && err == nil {
|
||||||
|
err = &hst.AppError{Step: "close state file", Err: closeErr}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// load loads and validates the state entry header, and returns the [hst.Enablement] byte.
|
||||||
|
// for a non-nil v, the full state payload is decoded and stored in the value pointed to by v.
|
||||||
|
// load validates the embedded hst.Config value.
|
||||||
|
func (eh *stateEntryHandle) load(v *hst.State) (hst.Enablement, error) {
|
||||||
|
f, err := eh.open(os.O_RDONLY, 0)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var et hst.Enablement
|
||||||
|
if v != nil {
|
||||||
|
et, err = entryDecode(f, v)
|
||||||
|
if err == nil && v.ID != eh.ID {
|
||||||
|
err = &hst.AppError{Step: "validate state identifier", Err: os.ErrInvalid,
|
||||||
|
Msg: fmt.Sprintf("state entry %s has unexpected id %s", eh.ID.String(), v.ID.String())}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
et, err = entryDecodeHeader(f)
|
||||||
|
}
|
||||||
|
|
||||||
|
if closeErr := f.Close(); closeErr != nil && err == nil {
|
||||||
|
err = &hst.AppError{Step: "close state file", Err: closeErr}
|
||||||
|
}
|
||||||
|
return et, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// storeHandle is a handle on a stateStore segment.
|
||||||
|
// Initialised by stateStore.identityHandle.
|
||||||
|
type storeHandle struct {
|
||||||
|
// Identity of instances tracked by this segment.
|
||||||
|
identity int
|
||||||
|
// Pathname of directory that the segment referred to by storeHandle is rooted in.
|
||||||
|
path *check.Absolute
|
||||||
|
// Inter-process mutex to synchronise operations against resources in this segment.
|
||||||
|
fileMu *lockedfile.Mutex
|
||||||
|
|
||||||
|
// Must be held alongside fileMu.
|
||||||
|
mu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// entries returns an iterator over all stateEntryHandle held in this segment.
|
||||||
|
// Must be called while holding a lock on mu and fileMu.
|
||||||
|
// A non-nil error attached to a stateEntryHandle indicates a malformed identifier and is of type [hst.AppError].
|
||||||
|
// A non-nil error returned by entries is of type [hst.AppError].
|
||||||
|
func (h *storeHandle) entries() (iter.Seq[*stateEntryHandle], int, error) {
|
||||||
|
// for error reporting
|
||||||
|
const step = "read store segment entries"
|
||||||
|
|
||||||
|
// read directory contents, should only contain storeMutexName and identifier
|
||||||
|
var entries []os.DirEntry
|
||||||
|
if pl, err := os.ReadDir(h.path.String()); err != nil {
|
||||||
|
return nil, -1, &hst.AppError{Step: step, Err: err}
|
||||||
|
} else {
|
||||||
|
entries = pl
|
||||||
|
}
|
||||||
|
|
||||||
|
// expects lock file
|
||||||
|
l := len(entries)
|
||||||
|
if l > 0 {
|
||||||
|
l--
|
||||||
|
}
|
||||||
|
|
||||||
|
return func(yield func(*stateEntryHandle) bool) {
|
||||||
|
for _, ent := range entries {
|
||||||
|
var eh = stateEntryHandle{pathname: h.path.Append(ent.Name())}
|
||||||
|
|
||||||
|
// this should never happen
|
||||||
|
if ent.IsDir() {
|
||||||
|
eh.decodeErr = &hst.AppError{Step: step,
|
||||||
|
Err: errors.New("unexpected directory " + strconv.Quote(ent.Name()) + " in store")}
|
||||||
|
goto out
|
||||||
|
}
|
||||||
|
|
||||||
|
// silently skip lock file
|
||||||
|
if ent.Name() == storeMutexName {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// this either indicates a serious bug or external interference
|
||||||
|
if err := eh.ID.UnmarshalText([]byte(ent.Name())); err != nil {
|
||||||
|
eh.decodeErr = &hst.AppError{Step: "decode store segment entry", Err: err}
|
||||||
|
goto out
|
||||||
|
}
|
||||||
|
|
||||||
|
out:
|
||||||
|
if !yield(&eh) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, l, nil
|
||||||
|
}
|
||||||
255
internal/app/state/segment_test.go
Normal file
255
internal/app/state/segment_test.go
Normal file
@@ -0,0 +1,255 @@
|
|||||||
|
package state
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"iter"
|
||||||
|
"os"
|
||||||
|
"reflect"
|
||||||
|
"slices"
|
||||||
|
"strings"
|
||||||
|
"syscall"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"hakurei.app/container/check"
|
||||||
|
"hakurei.app/container/stub"
|
||||||
|
"hakurei.app/hst"
|
||||||
|
"hakurei.app/internal/lockedfile"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestStateEntryHandle(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
t.Run("lockout", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
wantErr := func() error { return stub.UniqueError(0) }
|
||||||
|
eh := stateEntryHandle{decodeErr: wantErr(), pathname: check.MustAbs("/proc/nonexistent")}
|
||||||
|
|
||||||
|
if _, err := eh.open(-1, 0); !reflect.DeepEqual(err, wantErr()) {
|
||||||
|
t.Errorf("open: error = %v, want %v", err, wantErr())
|
||||||
|
}
|
||||||
|
if err := eh.destroy(); !reflect.DeepEqual(err, wantErr()) {
|
||||||
|
t.Errorf("destroy: error = %v, want %v", err, wantErr())
|
||||||
|
}
|
||||||
|
if err := eh.save(nil); !reflect.DeepEqual(err, wantErr()) {
|
||||||
|
t.Errorf("save: error = %v, want %v", err, wantErr())
|
||||||
|
}
|
||||||
|
if _, err := eh.load(nil); !reflect.DeepEqual(err, wantErr()) {
|
||||||
|
t.Errorf("load: error = %v, want %v", err, wantErr())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("od", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
{
|
||||||
|
eh := stateEntryHandle{pathname: check.MustAbs(t.TempDir()).Append("entry")}
|
||||||
|
if f, err := eh.open(os.O_CREATE|syscall.O_EXCL, 0); err != nil {
|
||||||
|
t.Fatalf("open: error = %v", err)
|
||||||
|
} else if err = f.Close(); err != nil {
|
||||||
|
t.Errorf("Close: error = %v", err)
|
||||||
|
}
|
||||||
|
if err := eh.destroy(); err != nil {
|
||||||
|
t.Fatalf("destroy: error = %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("nonexistent", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
eh := stateEntryHandle{pathname: check.MustAbs("/proc/nonexistent")}
|
||||||
|
|
||||||
|
wantErrOpen := &hst.AppError{Step: "open state entry",
|
||||||
|
Err: &os.PathError{Op: "open", Path: "/proc/nonexistent", Err: syscall.ENOENT}}
|
||||||
|
if _, err := eh.open(os.O_CREATE|syscall.O_EXCL, 0); !reflect.DeepEqual(err, wantErrOpen) {
|
||||||
|
t.Errorf("open: error = %#v, want %#v", err, wantErrOpen)
|
||||||
|
}
|
||||||
|
|
||||||
|
wantErrDestroy := &hst.AppError{Step: "destroy state entry",
|
||||||
|
Err: &os.PathError{Op: "remove", Path: "/proc/nonexistent", Err: syscall.ENOENT}}
|
||||||
|
if err := eh.destroy(); !reflect.DeepEqual(err, wantErrDestroy) {
|
||||||
|
t.Errorf("destroy: error = %#v, want %#v", err, wantErrDestroy)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("saveload", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
eh := stateEntryHandle{pathname: check.MustAbs(t.TempDir()).Append("entry"),
|
||||||
|
ID: newTemplateState().ID}
|
||||||
|
|
||||||
|
if err := eh.save(newTemplateState()); err != nil {
|
||||||
|
t.Fatalf("save: error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("validate", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
t.Run("internal", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
var got hst.State
|
||||||
|
if f, err := os.Open(eh.pathname.String()); err != nil {
|
||||||
|
t.Fatal(err.Error())
|
||||||
|
} else if _, err = entryDecode(f, &got); err != nil {
|
||||||
|
t.Fatalf("entryDecode: error = %v", err)
|
||||||
|
} else if err = f.Close(); err != nil {
|
||||||
|
t.Fatal(f.Close())
|
||||||
|
}
|
||||||
|
|
||||||
|
if want := newTemplateState(); !reflect.DeepEqual(&got, want) {
|
||||||
|
t.Errorf("entryDecode: %#v, want %#v", &got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("load header only", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
if et, err := eh.load(nil); err != nil {
|
||||||
|
t.Fatalf("load: error = %v", err)
|
||||||
|
} else if want := newTemplateState().Enablements.Unwrap(); et != want {
|
||||||
|
t.Errorf("load: et = %x, want %x", et, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("load", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
var got hst.State
|
||||||
|
if _, err := eh.load(&got); err != nil {
|
||||||
|
t.Fatalf("load: error = %v", err)
|
||||||
|
} else if want := newTemplateState(); !reflect.DeepEqual(&got, want) {
|
||||||
|
t.Errorf("load: %#v, want %#v", &got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("load inconsistent", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
wantErr := &hst.AppError{Step: "validate state identifier", Err: os.ErrInvalid,
|
||||||
|
Msg: "state entry 00000000000000000000000000000000 has unexpected id aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}
|
||||||
|
|
||||||
|
ehi := stateEntryHandle{pathname: eh.pathname}
|
||||||
|
if _, err := ehi.load(new(hst.State)); !reflect.DeepEqual(err, wantErr) {
|
||||||
|
t.Errorf("load: error = %#v, want %#v", err, wantErr)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStoreHandle(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
ents [2][]string
|
||||||
|
want func(newEh func(err error, name string) *stateEntryHandle) []*stateEntryHandle
|
||||||
|
ext func(t *testing.T, entries iter.Seq[*stateEntryHandle], n int)
|
||||||
|
}{
|
||||||
|
{"errors", [2][]string{{
|
||||||
|
"e81eb203b4190ac5c3842ef44d429945",
|
||||||
|
"lock",
|
||||||
|
"f0-invalid",
|
||||||
|
}, {
|
||||||
|
"f1-directory",
|
||||||
|
}}, func(newEh func(err error, name string) *stateEntryHandle) []*stateEntryHandle {
|
||||||
|
return []*stateEntryHandle{
|
||||||
|
newEh(nil, "e81eb203b4190ac5c3842ef44d429945"),
|
||||||
|
newEh(&hst.AppError{Step: "decode store segment entry",
|
||||||
|
Err: hst.IdentifierDecodeError{Err: hst.ErrIdentifierLength}}, "f0-invalid"),
|
||||||
|
newEh(&hst.AppError{Step: "read store segment entries",
|
||||||
|
Err: errors.New(`unexpected directory "f1-directory" in store`)}, "f1-directory"),
|
||||||
|
}
|
||||||
|
}, nil},
|
||||||
|
|
||||||
|
{"success", [2][]string{{
|
||||||
|
"e81eb203b4190ac5c3842ef44d429945",
|
||||||
|
"7958cfbb9272d9cf9cfd61c85afa13f1",
|
||||||
|
"d0b5f7446dd5bd3424ff2f7ac9cace1e",
|
||||||
|
"c8c8e2c4aea5c32fe47240ff8caa874e",
|
||||||
|
"fa0d30b249d80f155a1f80ceddcc32f2",
|
||||||
|
"lock",
|
||||||
|
}}, func(newEh func(err error, name string) *stateEntryHandle) []*stateEntryHandle {
|
||||||
|
return []*stateEntryHandle{
|
||||||
|
newEh(nil, "7958cfbb9272d9cf9cfd61c85afa13f1"),
|
||||||
|
newEh(nil, "c8c8e2c4aea5c32fe47240ff8caa874e"),
|
||||||
|
newEh(nil, "d0b5f7446dd5bd3424ff2f7ac9cace1e"),
|
||||||
|
newEh(nil, "e81eb203b4190ac5c3842ef44d429945"),
|
||||||
|
newEh(nil, "fa0d30b249d80f155a1f80ceddcc32f2"),
|
||||||
|
}
|
||||||
|
}, func(t *testing.T, entries iter.Seq[*stateEntryHandle], n int) {
|
||||||
|
if n != 5 {
|
||||||
|
t.Fatalf("entries: n = %d", n)
|
||||||
|
}
|
||||||
|
|
||||||
|
// check partial drain
|
||||||
|
for range entries {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}},
|
||||||
|
}
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
p := check.MustAbs(t.TempDir()).Append("segment")
|
||||||
|
if err := os.Mkdir(p.String(), 0700); err != nil {
|
||||||
|
t.Fatal(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, s := range tc.ents[0] {
|
||||||
|
if f, err := os.OpenFile(p.Append(s).String(), os.O_CREATE|os.O_EXCL, 0600); err != nil {
|
||||||
|
t.Fatal(err.Error())
|
||||||
|
} else if err = f.Close(); err != nil {
|
||||||
|
t.Fatal(err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, s := range tc.ents[1] {
|
||||||
|
if err := os.Mkdir(p.Append(s).String(), 0700); err != nil {
|
||||||
|
t.Fatal(err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var got []*stateEntryHandle
|
||||||
|
if entries, n, err := (&storeHandle{
|
||||||
|
identity: -0xbad,
|
||||||
|
path: p,
|
||||||
|
fileMu: lockedfile.MutexAt(p.Append("lock").String()),
|
||||||
|
}).entries(); err != nil {
|
||||||
|
t.Fatalf("entries: error = %v", err)
|
||||||
|
} else {
|
||||||
|
got = slices.AppendSeq(make([]*stateEntryHandle, 0, n), entries)
|
||||||
|
if tc.ext != nil {
|
||||||
|
tc.ext(t, entries, n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
slices.SortFunc(got, func(a, b *stateEntryHandle) int { return strings.Compare(a.pathname.String(), b.pathname.String()) })
|
||||||
|
want := tc.want(func(err error, name string) *stateEntryHandle {
|
||||||
|
eh := stateEntryHandle{decodeErr: err, pathname: p.Append(name)}
|
||||||
|
if err == nil {
|
||||||
|
if err = eh.UnmarshalText([]byte(name)); err != nil {
|
||||||
|
t.Fatalf("UnmarshalText: error = %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &eh
|
||||||
|
})
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(got, want) {
|
||||||
|
t.Errorf("entries: %q, want %q", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("nonexistent", func(t *testing.T) {
|
||||||
|
var wantErr = &hst.AppError{Step: "read store segment entries", Err: &os.PathError{
|
||||||
|
Op: "open",
|
||||||
|
Path: "/proc/nonexistent",
|
||||||
|
Err: syscall.ENOENT,
|
||||||
|
}}
|
||||||
|
if _, _, err := (&storeHandle{
|
||||||
|
identity: -0xbad,
|
||||||
|
path: check.MustAbs("/proc/nonexistent"),
|
||||||
|
}).entries(); !reflect.DeepEqual(err, wantErr) {
|
||||||
|
t.Fatalf("entries: error = %#v, want %#v", err, wantErr)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -2,9 +2,16 @@
|
|||||||
package state
|
package state
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"hakurei.app/container/check"
|
||||||
"hakurei.app/hst"
|
"hakurei.app/hst"
|
||||||
|
"hakurei.app/internal/lockedfile"
|
||||||
|
"hakurei.app/message"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
/* this provides an implementation of Store on top of the improved state tracking to ease in the changes */
|
||||||
|
|
||||||
type Store interface {
|
type Store interface {
|
||||||
// Do calls f exactly once and ensures store exclusivity until f returns.
|
// Do calls f exactly once and ensures store exclusivity until f returns.
|
||||||
// Returns whether f is called and any errors during the locking process.
|
// Returns whether f is called and any errors during the locking process.
|
||||||
@@ -16,6 +23,13 @@ type Store interface {
|
|||||||
List() (identities []int, err error)
|
List() (identities []int, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewMulti returns an instance of the multi-file store.
|
||||||
|
func NewMulti(msg message.Msg, prefix *check.Absolute) Store {
|
||||||
|
store := &stateStore{msg: msg, base: prefix.Append("state")}
|
||||||
|
store.fileMu = lockedfile.MutexAt(store.base.Append(storeMutexName).String())
|
||||||
|
return store
|
||||||
|
}
|
||||||
|
|
||||||
// Cursor provides access to the store of an identity.
|
// Cursor provides access to the store of an identity.
|
||||||
type Cursor interface {
|
type Cursor interface {
|
||||||
Save(state *hst.State) error
|
Save(state *hst.State) error
|
||||||
@@ -23,3 +37,62 @@ type Cursor interface {
|
|||||||
Load() (map[hst.ID]*hst.State, error)
|
Load() (map[hst.ID]*hst.State, error)
|
||||||
Len() (int, error)
|
Len() (int, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// do implements stateStore.Do on storeHandle.
|
||||||
|
func (h *storeHandle) do(f func(c Cursor)) (bool, error) {
|
||||||
|
if unlock, err := h.fileMu.Lock(); err != nil {
|
||||||
|
return false, &hst.AppError{Step: "acquire lock on store segment " + strconv.Itoa(h.identity), Err: err}
|
||||||
|
} else {
|
||||||
|
defer unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
f(h)
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
/* these compatibility methods must only be called while fileMu is held */
|
||||||
|
|
||||||
|
func (h *storeHandle) Save(state *hst.State) error {
|
||||||
|
return (&stateEntryHandle{nil, h.path.Append(state.ID.String()), state.ID}).save(state)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *storeHandle) Destroy(id hst.ID) error {
|
||||||
|
return (&stateEntryHandle{nil, h.path.Append(id.String()), id}).destroy()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *storeHandle) Load() (map[hst.ID]*hst.State, error) {
|
||||||
|
entries, n, err := h.entries()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
r := make(map[hst.ID]*hst.State, n)
|
||||||
|
for eh := range entries {
|
||||||
|
if eh.decodeErr != nil {
|
||||||
|
err = eh.decodeErr
|
||||||
|
break
|
||||||
|
}
|
||||||
|
var s hst.State
|
||||||
|
if _, err = eh.load(&s); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
r[eh.ID] = &s
|
||||||
|
}
|
||||||
|
return r, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *storeHandle) Len() (int, error) {
|
||||||
|
entries, _, err := h.entries()
|
||||||
|
if err != nil {
|
||||||
|
return -1, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var n int
|
||||||
|
for eh := range entries {
|
||||||
|
if eh.decodeErr != nil {
|
||||||
|
err = eh.decodeErr
|
||||||
|
}
|
||||||
|
n++
|
||||||
|
}
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|||||||
130
internal/app/state/store.go
Normal file
130
internal/app/state/store.go
Normal file
@@ -0,0 +1,130 @@
|
|||||||
|
package state
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"io/fs"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"hakurei.app/container/check"
|
||||||
|
"hakurei.app/hst"
|
||||||
|
"hakurei.app/internal/lockedfile"
|
||||||
|
"hakurei.app/message"
|
||||||
|
)
|
||||||
|
|
||||||
|
// storeMutexName is the pathname of the file backing [lockedfile.Mutex] of a stateStore and storeHandle.
|
||||||
|
const storeMutexName = "lock"
|
||||||
|
|
||||||
|
// A stateStore keeps track of [hst.State] via a well-known filesystem accessible to all hakurei priv-side processes.
|
||||||
|
// Access to store data and related resources are synchronised on a per-segment basis via storeHandle.
|
||||||
|
type stateStore struct {
|
||||||
|
// Pathname of directory that the store is rooted in.
|
||||||
|
base *check.Absolute
|
||||||
|
|
||||||
|
// All currently known instances of storeHandle, keyed by their identity.
|
||||||
|
handles sync.Map
|
||||||
|
|
||||||
|
// Inter-process mutex to synchronise operations against the entire store.
|
||||||
|
// Held during List and when initialising previously unknown identities during Do.
|
||||||
|
// Must not be accessed directly. Callers should use the bigLock method instead.
|
||||||
|
fileMu *lockedfile.Mutex
|
||||||
|
|
||||||
|
// For creating the base directory.
|
||||||
|
mkdirOnce sync.Once
|
||||||
|
// Stored error value via mkdirOnce.
|
||||||
|
mkdirErr error
|
||||||
|
|
||||||
|
msg message.Msg
|
||||||
|
}
|
||||||
|
|
||||||
|
// bigLock acquires fileMu on stateStore.
|
||||||
|
func (s *stateStore) bigLock() (unlock func(), err error) {
|
||||||
|
s.mkdirOnce.Do(func() { s.mkdirErr = os.MkdirAll(s.base.String(), 0700) })
|
||||||
|
if s.mkdirErr != nil {
|
||||||
|
return nil, &hst.AppError{Step: "create state store directory", Err: s.mkdirErr}
|
||||||
|
}
|
||||||
|
|
||||||
|
if unlock, err = s.fileMu.Lock(); err != nil {
|
||||||
|
return nil, &hst.AppError{Step: "acquire lock on the state store", Err: err}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// identityHandle loads or initialises a storeHandle for identity.
|
||||||
|
func (s *stateStore) identityHandle(identity int) (*storeHandle, error) {
|
||||||
|
h := new(storeHandle)
|
||||||
|
h.mu.Lock()
|
||||||
|
|
||||||
|
if v, ok := s.handles.LoadOrStore(identity, h); ok {
|
||||||
|
h = v.(*storeHandle)
|
||||||
|
} else {
|
||||||
|
// acquire big lock to initialise previously unknown segment handle
|
||||||
|
if unlock, err := s.bigLock(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else {
|
||||||
|
defer unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
h.identity = identity
|
||||||
|
h.path = s.base.Append(strconv.Itoa(identity))
|
||||||
|
h.fileMu = lockedfile.MutexAt(h.path.Append(storeMutexName).String())
|
||||||
|
|
||||||
|
if err := os.MkdirAll(h.path.String(), 0700); err != nil && !errors.Is(err, fs.ErrExist) {
|
||||||
|
s.handles.CompareAndDelete(identity, h)
|
||||||
|
return nil, &hst.AppError{Step: "create store segment directory", Err: err}
|
||||||
|
}
|
||||||
|
h.mu.Unlock()
|
||||||
|
}
|
||||||
|
return h, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *stateStore) Do(identity int, f func(c Cursor)) (bool, error) {
|
||||||
|
if h, err := s.identityHandle(identity); err != nil {
|
||||||
|
return false, err
|
||||||
|
} else {
|
||||||
|
return h.do(f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *stateStore) List() ([]int, error) {
|
||||||
|
var entries []os.DirEntry
|
||||||
|
|
||||||
|
// acquire big lock to read store segment list
|
||||||
|
if unlock, err := s.bigLock(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else {
|
||||||
|
entries, err = os.ReadDir(s.base.String())
|
||||||
|
unlock()
|
||||||
|
|
||||||
|
if err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||||
|
return nil, &hst.AppError{Step: "read store directory", Err: err}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
identities := make([]int, 0, len(entries))
|
||||||
|
for _, e := range entries {
|
||||||
|
// should only be the big lock
|
||||||
|
if !e.IsDir() {
|
||||||
|
if e.Name() != storeMutexName {
|
||||||
|
s.msg.Verbosef("skipped non-directory entry %q", e.Name())
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// this either indicates a serious bug or external interference
|
||||||
|
if v, err := strconv.Atoi(e.Name()); err != nil {
|
||||||
|
s.msg.Verbosef("skipped non-identity entry %q", e.Name())
|
||||||
|
continue
|
||||||
|
} else {
|
||||||
|
if v < hst.IdentityMin || v > hst.IdentityMax {
|
||||||
|
s.msg.Verbosef("skipped out of bounds entry %q", e.Name())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
identities = append(identities, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return identities, nil
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user