internal/store: rename from state
All checks were successful
Test / Create distribution (push) Successful in 33s
Test / Sandbox (push) Successful in 2m9s
Test / Hakurei (push) Successful in 3m8s
Test / Hpkg (push) Successful in 4m2s
Test / Sandbox (race detector) (push) Successful in 4m7s
Test / Hakurei (race detector) (push) Successful in 4m55s
Test / Flake checks (push) Successful in 1m25s
All checks were successful
Test / Create distribution (push) Successful in 33s
Test / Sandbox (push) Successful in 2m9s
Test / Hakurei (push) Successful in 3m8s
Test / Hpkg (push) Successful in 4m2s
Test / Sandbox (race detector) (push) Successful in 4m7s
Test / Hakurei (race detector) (push) Successful in 4m55s
Test / Flake checks (push) Successful in 1m25s
This reduces collision with local variable names, and generally makes sense for the new store package, since it no longer specifies the state struct. Signed-off-by: Ophestra <cat@gensokyo.uk>
This commit is contained in:
130
internal/store/compat.go
Normal file
130
internal/store/compat.go
Normal file
@@ -0,0 +1,130 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"hakurei.app/container/check"
|
||||
"hakurei.app/hst"
|
||||
"hakurei.app/message"
|
||||
)
|
||||
|
||||
/* this provides an implementation of Store on top of the improved state tracking to ease in the changes */
|
||||
|
||||
type Store interface {
|
||||
// Do calls f exactly once and ensures store exclusivity until f returns.
|
||||
// Returns whether f is called and any errors during the locking process.
|
||||
// Cursor provided to f becomes invalid as soon as f returns.
|
||||
Do(identity int, f func(c Cursor)) (ok bool, err error)
|
||||
|
||||
// List queries the store and returns a list of identities known to the store.
|
||||
// Note that some or all returned identities might not have any active apps.
|
||||
List() (identities []int, err error)
|
||||
}
|
||||
|
||||
func (s *stateStore) Do(identity int, f func(c Cursor)) (bool, error) {
|
||||
if h, err := s.identityHandle(identity); err != nil {
|
||||
return false, err
|
||||
} else {
|
||||
return h.do(f)
|
||||
}
|
||||
}
|
||||
|
||||
// storeAdapter satisfies [Store] via stateStore.
|
||||
type storeAdapter struct {
|
||||
msg message.Msg
|
||||
*stateStore
|
||||
}
|
||||
|
||||
func (s storeAdapter) List() ([]int, error) {
|
||||
segments, n, err := s.segments()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
identities := make([]int, 0, n)
|
||||
for si := range segments {
|
||||
if si.err != nil {
|
||||
if m, ok := message.GetMessage(err); ok {
|
||||
s.msg.Verbose(m)
|
||||
} else {
|
||||
// unreachable
|
||||
return nil, err
|
||||
}
|
||||
continue
|
||||
}
|
||||
identities = append(identities, si.identity)
|
||||
}
|
||||
return identities, nil
|
||||
}
|
||||
|
||||
// NewMulti returns an instance of the multi-file store.
|
||||
func NewMulti(msg message.Msg, prefix *check.Absolute) Store {
|
||||
return storeAdapter{msg, newStore(prefix.Append("state"))}
|
||||
}
|
||||
|
||||
// Cursor provides access to the store of an identity.
|
||||
type Cursor interface {
|
||||
Save(state *hst.State) error
|
||||
Destroy(id hst.ID) error
|
||||
Load() (map[hst.ID]*hst.State, error)
|
||||
Len() (int, error)
|
||||
}
|
||||
|
||||
// do implements stateStore.Do on storeHandle.
|
||||
func (h *storeHandle) do(f func(c Cursor)) (bool, error) {
|
||||
if unlock, err := h.fileMu.Lock(); err != nil {
|
||||
return false, &hst.AppError{Step: "acquire lock on store segment " + strconv.Itoa(h.identity), Err: err}
|
||||
} else {
|
||||
defer unlock()
|
||||
}
|
||||
|
||||
f(h)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
/* these compatibility methods must only be called while fileMu is held */
|
||||
|
||||
func (h *storeHandle) Save(state *hst.State) error {
|
||||
return (&stateEntryHandle{nil, h.path.Append(state.ID.String()), state.ID}).save(state)
|
||||
}
|
||||
|
||||
func (h *storeHandle) Destroy(id hst.ID) error {
|
||||
return (&stateEntryHandle{nil, h.path.Append(id.String()), id}).destroy()
|
||||
}
|
||||
|
||||
func (h *storeHandle) Load() (map[hst.ID]*hst.State, error) {
|
||||
entries, n, err := h.entries()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r := make(map[hst.ID]*hst.State, n)
|
||||
for eh := range entries {
|
||||
if eh.decodeErr != nil {
|
||||
err = eh.decodeErr
|
||||
break
|
||||
}
|
||||
var s hst.State
|
||||
if _, err = eh.load(&s); err != nil {
|
||||
break
|
||||
}
|
||||
r[eh.ID] = &s
|
||||
}
|
||||
return r, err
|
||||
}
|
||||
|
||||
func (h *storeHandle) Len() (int, error) {
|
||||
entries, _, err := h.entries()
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
var n int
|
||||
for eh := range entries {
|
||||
if eh.decodeErr != nil {
|
||||
err = eh.decodeErr
|
||||
}
|
||||
n++
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
120
internal/store/compat_test.go
Normal file
120
internal/store/compat_test.go
Normal file
@@ -0,0 +1,120 @@
|
||||
package store_test
|
||||
|
||||
import (
|
||||
"log"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"slices"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"hakurei.app/container/check"
|
||||
"hakurei.app/hst"
|
||||
"hakurei.app/internal/store"
|
||||
"hakurei.app/message"
|
||||
)
|
||||
|
||||
func TestMulti(t *testing.T) {
|
||||
s := store.NewMulti(message.NewMsg(log.New(log.Writer(), "multi: ", 0)), check.MustAbs(t.TempDir()))
|
||||
|
||||
t.Run("list empty store", func(t *testing.T) {
|
||||
if identities, err := s.List(); err != nil {
|
||||
t.Fatalf("List: error = %v", err)
|
||||
} else if len(identities) != 0 {
|
||||
t.Fatalf("List: identities = %#v", identities)
|
||||
}
|
||||
})
|
||||
|
||||
const (
|
||||
insertEntryChecked = iota
|
||||
insertEntryNoCheck
|
||||
insertEntryOtherApp
|
||||
|
||||
tl
|
||||
)
|
||||
|
||||
var tc [tl]hst.State
|
||||
for i := 0; i < tl; i++ {
|
||||
if err := hst.NewInstanceID(&tc[i].ID); err != nil {
|
||||
t.Fatalf("cannot create dummy state: %v", err)
|
||||
}
|
||||
tc[i].PID = rand.Int()
|
||||
tc[i].Config = hst.Template()
|
||||
tc[i].Time = time.Now()
|
||||
}
|
||||
|
||||
do := func(identity int, f func(c store.Cursor)) {
|
||||
if ok, err := s.Do(identity, f); err != nil {
|
||||
t.Fatalf("Do: ok = %v, error = %v", ok, err)
|
||||
}
|
||||
}
|
||||
|
||||
insert := func(i, identity int) {
|
||||
do(identity, func(c store.Cursor) {
|
||||
if err := c.Save(&tc[i]); err != nil {
|
||||
t.Fatalf("Save: error = %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
check := func(i, identity int) {
|
||||
do(identity, func(c store.Cursor) {
|
||||
if entries, err := c.Load(); err != nil {
|
||||
t.Fatalf("Load: error = %v", err)
|
||||
} else if got, ok := entries[tc[i].ID]; !ok {
|
||||
t.Fatalf("Load: entry %s missing", &tc[i].ID)
|
||||
} else {
|
||||
got.Time = tc[i].Time
|
||||
if !reflect.DeepEqual(got, &tc[i]) {
|
||||
t.Fatalf("Load: entry %s got %#v, want %#v", &tc[i].ID, got, &tc[i])
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// insert entry checked
|
||||
insert(insertEntryChecked, 0)
|
||||
check(insertEntryChecked, 0)
|
||||
|
||||
// insert entry unchecked
|
||||
insert(insertEntryNoCheck, 0)
|
||||
|
||||
// insert entry different identity
|
||||
insert(insertEntryOtherApp, 1)
|
||||
check(insertEntryOtherApp, 1)
|
||||
|
||||
// check previous insertion
|
||||
check(insertEntryNoCheck, 0)
|
||||
|
||||
// list identities
|
||||
if identities, err := s.List(); err != nil {
|
||||
t.Fatalf("List: error = %v", err)
|
||||
} else {
|
||||
slices.Sort(identities)
|
||||
want := []int{0, 1}
|
||||
if !slices.Equal(identities, want) {
|
||||
t.Fatalf("List() = %#v, want %#v", identities, want)
|
||||
}
|
||||
}
|
||||
|
||||
// join store
|
||||
if entries, err := store.Join(s); err != nil {
|
||||
t.Fatalf("Join: error = %v", err)
|
||||
} else if len(entries) != 3 {
|
||||
t.Fatalf("Join(s) = %#v", entries)
|
||||
}
|
||||
|
||||
// clear identity 1
|
||||
do(1, func(c store.Cursor) {
|
||||
if err := c.Destroy(tc[insertEntryOtherApp].ID); err != nil {
|
||||
t.Fatalf("Destroy: error = %v", err)
|
||||
}
|
||||
})
|
||||
do(1, func(c store.Cursor) {
|
||||
if l, err := c.Len(); err != nil {
|
||||
t.Fatalf("Len: error = %v", err)
|
||||
} else if l != 0 {
|
||||
t.Fatalf("Len: %d, want 0", l)
|
||||
}
|
||||
})
|
||||
}
|
||||
52
internal/store/data.go
Normal file
52
internal/store/data.go
Normal file
@@ -0,0 +1,52 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"encoding/gob"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"hakurei.app/hst"
|
||||
)
|
||||
|
||||
// entryEncode encodes [hst.State] into [io.Writer] with the state entry header.
|
||||
// entryEncode does not validate the embedded [hst.Config] value.
|
||||
//
|
||||
// A non-nil error returned by entryEncode is of type [hst.AppError].
|
||||
func entryEncode(w io.Writer, s *hst.State) error {
|
||||
if err := entryWriteHeader(w, s.Enablements.Unwrap()); err != nil {
|
||||
return &hst.AppError{Step: "encode state header", Err: err}
|
||||
} else if err = gob.NewEncoder(w).Encode(s); err != nil {
|
||||
return &hst.AppError{Step: "encode state body", Err: err}
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// entryDecodeHeader calls entryReadHeader, returning [hst.AppError] for a non-nil error.
|
||||
func entryDecodeHeader(r io.Reader) (hst.Enablement, error) {
|
||||
if et, err := entryReadHeader(r); err != nil {
|
||||
return 0, &hst.AppError{Step: "decode state header", Err: err}
|
||||
} else {
|
||||
return et, nil
|
||||
}
|
||||
}
|
||||
|
||||
// entryDecode decodes [hst.State] from [io.Reader] and stores the result in the value pointed to by p.
|
||||
// entryDecode validates the embedded [hst.Config] value.
|
||||
//
|
||||
// A non-nil error returned by entryDecode is of type [hst.AppError].
|
||||
func entryDecode(r io.Reader, p *hst.State) (hst.Enablement, error) {
|
||||
if et, err := entryDecodeHeader(r); err != nil {
|
||||
return et, err
|
||||
} else if err = gob.NewDecoder(r).Decode(&p); err != nil {
|
||||
return et, &hst.AppError{Step: "decode state body", Err: err}
|
||||
} else if err = p.Config.Validate(); err != nil {
|
||||
return et, err
|
||||
} else if p.Enablements.Unwrap() != et {
|
||||
return et, &hst.AppError{Step: "validate state enablement", Err: os.ErrInvalid,
|
||||
Msg: fmt.Sprintf("state entry %s has unexpected enablement byte %#x, %#x", p.ID.String(), byte(p.Enablements.Unwrap()), byte(et))}
|
||||
} else {
|
||||
return et, nil
|
||||
}
|
||||
}
|
||||
145
internal/store/data_test.go
Normal file
145
internal/store/data_test.go
Normal file
@@ -0,0 +1,145 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/gob"
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"hakurei.app/container/stub"
|
||||
"hakurei.app/hst"
|
||||
)
|
||||
|
||||
func TestEntryData(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
mustEncodeGob := func(e any) string {
|
||||
var buf bytes.Buffer
|
||||
if err := gob.NewEncoder(&buf).Encode(e); err != nil {
|
||||
t.Fatalf("cannot encode invalid state: %v", err)
|
||||
return "\x00" // not reached
|
||||
} else {
|
||||
return buf.String()
|
||||
}
|
||||
}
|
||||
templateStateGob := mustEncodeGob(newTemplateState())
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
data string
|
||||
s *hst.State
|
||||
err error
|
||||
}{
|
||||
{"invalid header", "\x00\xff\xca\xfe\xff\xff\xff\x00", nil, &hst.AppError{
|
||||
Step: "decode state header", Err: errors.New("unexpected revision ffff")}},
|
||||
|
||||
{"invalid gob", "\x00\xff\xca\xfe\x00\x00\xff\x00", nil, &hst.AppError{
|
||||
Step: "decode state body", Err: io.EOF}},
|
||||
|
||||
{"invalid config", "\x00\xff\xca\xfe\x00\x00\xff\x00" + mustEncodeGob(new(hst.State)), new(hst.State), &hst.AppError{
|
||||
Step: "validate configuration", Err: hst.ErrConfigNull,
|
||||
Msg: "invalid configuration"}},
|
||||
|
||||
{"inconsistent enablement", "\x00\xff\xca\xfe\x00\x00\xff\x00" + templateStateGob, newTemplateState(), &hst.AppError{
|
||||
Step: "validate state enablement", Err: os.ErrInvalid,
|
||||
Msg: "state entry aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa has unexpected enablement byte 0xd, 0xff"}},
|
||||
|
||||
{"template", "\x00\xff\xca\xfe\x00\x00\x0d\xf2" + templateStateGob, newTemplateState(), nil},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("encode", func(t *testing.T) {
|
||||
if tc.s == nil || tc.s.Config == nil {
|
||||
return
|
||||
}
|
||||
t.Parallel()
|
||||
|
||||
var buf bytes.Buffer
|
||||
if err := entryEncode(&buf, tc.s); err != nil {
|
||||
t.Fatalf("entryEncode: error = %v", err)
|
||||
}
|
||||
|
||||
if tc.err == nil {
|
||||
// Gob encoding is not guaranteed to be deterministic.
|
||||
// While the current implementation mostly is, it has randomised order
|
||||
// for iterating over maps, and hst.Config holds a map for environ.
|
||||
var got hst.State
|
||||
if et, err := entryDecode(&buf, &got); err != nil {
|
||||
t.Fatalf("entryDecode: error = %v", err)
|
||||
} else if stateEt := got.Enablements.Unwrap(); et != stateEt {
|
||||
t.Fatalf("entryDecode: et = %x, state %x", et, stateEt)
|
||||
}
|
||||
if !reflect.DeepEqual(&got, tc.s) {
|
||||
t.Errorf("entryEncode: %x", buf.Bytes())
|
||||
}
|
||||
} else if testing.Verbose() {
|
||||
t.Logf("%x", buf.String())
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("decode", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var got hst.State
|
||||
if et, err := entryDecode(strings.NewReader(tc.data), &got); !reflect.DeepEqual(err, tc.err) {
|
||||
t.Fatalf("entryDecode: error = %#v, want %#v", err, tc.err)
|
||||
} else if err != nil {
|
||||
return
|
||||
} else if stateEt := got.Enablements.Unwrap(); et != stateEt {
|
||||
t.Fatalf("entryDecode: et = %x, state %x", et, stateEt)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(&got, tc.s) {
|
||||
t.Errorf("entryDecode: %#v, want %#v", &got, tc.s)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("encode fault", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
s := newTemplateState()
|
||||
|
||||
t.Run("gob", func(t *testing.T) {
|
||||
var want = &hst.AppError{Step: "encode state body", Err: stub.UniqueError(0xcafe)}
|
||||
if err := entryEncode(stubNErrorWriter(entryHeaderSize), s); !reflect.DeepEqual(err, want) {
|
||||
t.Errorf("entryEncode: error = %#v, want %#v", err, want)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("header", func(t *testing.T) {
|
||||
var want = &hst.AppError{Step: "encode state header", Err: stub.UniqueError(0xcafe)}
|
||||
if err := entryEncode(stubNErrorWriter(entryHeaderSize-1), s); !reflect.DeepEqual(err, want) {
|
||||
t.Errorf("entryEncode: error = %#v, want %#v", err, want)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// newTemplateState returns the address of a new template [hst.State] struct.
|
||||
func newTemplateState() *hst.State {
|
||||
return &hst.State{
|
||||
ID: hst.ID(bytes.Repeat([]byte{0xaa}, len(hst.ID{}))),
|
||||
PID: 0xcafebabe,
|
||||
ShimPID: 0xdeadbeef,
|
||||
Config: hst.Template(),
|
||||
Time: time.Unix(0, 0),
|
||||
}
|
||||
}
|
||||
|
||||
// stubNErrorWriter returns an error for writes above a certain size.
|
||||
type stubNErrorWriter int
|
||||
|
||||
func (w stubNErrorWriter) Write(p []byte) (n int, err error) {
|
||||
if len(p) > int(w) {
|
||||
return int(w), stub.UniqueError(0xcafe)
|
||||
}
|
||||
return io.Discard.Write(p)
|
||||
}
|
||||
86
internal/store/header.go
Normal file
86
internal/store/header.go
Normal file
@@ -0,0 +1,86 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
"strconv"
|
||||
"syscall"
|
||||
|
||||
"hakurei.app/hst"
|
||||
)
|
||||
|
||||
const (
|
||||
// entryHeaderMagic are magic bytes at the beginning of the state entry file.
|
||||
entryHeaderMagic = "\x00\xff\xca\xfe"
|
||||
// entryHeaderRevision follows entryHeaderMagic and is incremented for revisions of the format.
|
||||
entryHeaderRevision = "\x00\x00"
|
||||
// entryHeaderSize is the fixed size of the header in bytes, including the enablement byte and its complement.
|
||||
entryHeaderSize = len(entryHeaderMagic+entryHeaderRevision) + 2
|
||||
)
|
||||
|
||||
// entryHeaderEncode encodes a state entry header for a [hst.Enablement] byte.
|
||||
func entryHeaderEncode(et hst.Enablement) *[entryHeaderSize]byte {
|
||||
data := [entryHeaderSize]byte([]byte(
|
||||
entryHeaderMagic + entryHeaderRevision + string([]hst.Enablement{et, ^et}),
|
||||
))
|
||||
return &data
|
||||
}
|
||||
|
||||
// entryHeaderDecode validates a state entry header and returns the [hst.Enablement] byte.
|
||||
func entryHeaderDecode(data *[entryHeaderSize]byte) (hst.Enablement, error) {
|
||||
if magic := data[:len(entryHeaderMagic)]; string(magic) != entryHeaderMagic {
|
||||
return 0, errors.New("invalid header " + hex.EncodeToString(magic))
|
||||
}
|
||||
if revision := data[len(entryHeaderMagic):len(entryHeaderMagic+entryHeaderRevision)]; string(revision) != entryHeaderRevision {
|
||||
return 0, errors.New("unexpected revision " + hex.EncodeToString(revision))
|
||||
}
|
||||
|
||||
et := data[len(entryHeaderMagic+entryHeaderRevision)]
|
||||
if et != ^data[len(entryHeaderMagic+entryHeaderRevision)+1] {
|
||||
return 0, errors.New("header enablement value is inconsistent")
|
||||
}
|
||||
return hst.Enablement(et), nil
|
||||
}
|
||||
|
||||
// EntrySizeError is returned for a file too small to hold a state entry header.
|
||||
type EntrySizeError struct {
|
||||
Name string
|
||||
Size int64
|
||||
}
|
||||
|
||||
func (e *EntrySizeError) Error() string {
|
||||
if e.Name == "" {
|
||||
return "state entry file is too short"
|
||||
}
|
||||
return "state entry file " + strconv.Quote(e.Name) + " is too short"
|
||||
}
|
||||
|
||||
// entryCheckFile checks whether [os.FileInfo] refers to a file that might hold [hst.State].
|
||||
func entryCheckFile(fi os.FileInfo) error {
|
||||
if fi.IsDir() {
|
||||
return syscall.EISDIR
|
||||
}
|
||||
if s := fi.Size(); s <= int64(entryHeaderSize) {
|
||||
return &EntrySizeError{Name: fi.Name(), Size: s}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// entryReadHeader reads [hst.Enablement] from an [io.Reader].
|
||||
func entryReadHeader(r io.Reader) (hst.Enablement, error) {
|
||||
var data [entryHeaderSize]byte
|
||||
if n, err := r.Read(data[:]); err != nil {
|
||||
return 0, err
|
||||
} else if n != entryHeaderSize {
|
||||
return 0, &EntrySizeError{Size: int64(n)}
|
||||
}
|
||||
return entryHeaderDecode(&data)
|
||||
}
|
||||
|
||||
// entryWriteHeader writes [hst.Enablement] header to an [io.Writer].
|
||||
func entryWriteHeader(w io.Writer, et hst.Enablement) error {
|
||||
_, err := w.Write(entryHeaderEncode(et)[:])
|
||||
return err
|
||||
}
|
||||
184
internal/store/header_test.go
Normal file
184
internal/store/header_test.go
Normal file
@@ -0,0 +1,184 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"io"
|
||||
"io/fs"
|
||||
"os"
|
||||
"reflect"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"hakurei.app/hst"
|
||||
)
|
||||
|
||||
func TestEntryHeader(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
data [entryHeaderSize]byte
|
||||
et hst.Enablement
|
||||
err error
|
||||
}{
|
||||
{"complement mismatch", [entryHeaderSize]byte{0x00, 0xff, 0xca, 0xfe, 0x00, 0x00,
|
||||
0x0a, 0xf6}, 0,
|
||||
errors.New("header enablement value is inconsistent")},
|
||||
{"unexpected revision", [entryHeaderSize]byte{0x00, 0xff, 0xca, 0xfe, 0xff, 0xff}, 0,
|
||||
errors.New("unexpected revision ffff")},
|
||||
{"invalid header", [entryHeaderSize]byte{0x00, 0xfe, 0xca, 0xfe}, 0,
|
||||
errors.New("invalid header 00fecafe")},
|
||||
|
||||
{"success high", [entryHeaderSize]byte{0x00, 0xff, 0xca, 0xfe, 0x00, 0x00,
|
||||
0xff, 0x00}, 0xff, nil},
|
||||
{"success", [entryHeaderSize]byte{0x00, 0xff, 0xca, 0xfe, 0x00, 0x00,
|
||||
0x09, 0xf6}, hst.EWayland | hst.EPulse, nil},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("encode", func(t *testing.T) {
|
||||
if tc.err != nil {
|
||||
return
|
||||
}
|
||||
t.Parallel()
|
||||
|
||||
if got := entryHeaderEncode(tc.et); *got != tc.data {
|
||||
t.Errorf("entryHeaderEncode: %x, want %x", *got, tc.data)
|
||||
}
|
||||
|
||||
t.Run("write", func(t *testing.T) {
|
||||
var buf bytes.Buffer
|
||||
if err := entryWriteHeader(&buf, tc.et); err != nil {
|
||||
t.Fatalf("entryWriteHeader: error = %v", err)
|
||||
}
|
||||
if got := ([entryHeaderSize]byte)(buf.Bytes()); got != tc.data {
|
||||
t.Errorf("entryWriteHeader: %x, want %x", got, tc.data)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("decode", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
got, err := entryHeaderDecode(&tc.data)
|
||||
if !reflect.DeepEqual(err, tc.err) {
|
||||
t.Fatalf("entryHeaderDecode: error = %#v, want %#v", err, tc.err)
|
||||
}
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if got != tc.et {
|
||||
t.Errorf("entryHeaderDecode: et = %q, want %q", got, tc.et)
|
||||
}
|
||||
|
||||
if got, err = entryReadHeader(bytes.NewReader(tc.data[:])); err != nil {
|
||||
t.Fatalf("entryReadHeader: error = %#v", err)
|
||||
} else if got != tc.et {
|
||||
t.Errorf("entryReadHeader: et = %q, want %q", got, tc.et)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEntrySizeError(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
err error
|
||||
want string
|
||||
}{
|
||||
{"size only", &EntrySizeError{Size: 0xdeadbeef},
|
||||
`state entry file is too short`},
|
||||
{"full", &EntrySizeError{Name: "nonexistent", Size: 0xdeadbeef},
|
||||
`state entry file "nonexistent" is too short`},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
if got := tc.err.Error(); got != tc.want {
|
||||
t.Errorf("Error: %s, want %s", got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEntryCheckFile(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
fi os.FileInfo
|
||||
err error
|
||||
}{
|
||||
{"dir", &stubFi{name: "dir", isDir: true},
|
||||
syscall.EISDIR},
|
||||
{"short", stubFi{name: "short", size: 8},
|
||||
&EntrySizeError{Name: "short", Size: 8}},
|
||||
{"success", stubFi{size: 9}, nil},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
if err := entryCheckFile(tc.fi); !reflect.DeepEqual(err, tc.err) {
|
||||
t.Errorf("entryCheckFile: error = %#v, want %#v", err, tc.err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEntryReadHeader(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
newR func() io.Reader
|
||||
err error
|
||||
}{
|
||||
{"eof", func() io.Reader { return bytes.NewReader([]byte{}) }, io.EOF},
|
||||
{"short", func() io.Reader { return bytes.NewReader([]byte{0}) }, &EntrySizeError{Size: 1}},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
if _, err := entryReadHeader(tc.newR()); !reflect.DeepEqual(err, tc.err) {
|
||||
t.Errorf("entryReadHeader: error = %#v, want %#v", err, tc.err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// stubFi partially implements [os.FileInfo] using hardcoded values.
|
||||
type stubFi struct {
|
||||
name string
|
||||
size int64
|
||||
isDir bool
|
||||
}
|
||||
|
||||
func (fi stubFi) Name() string {
|
||||
if fi.name == "" {
|
||||
panic("unreachable")
|
||||
}
|
||||
return fi.name
|
||||
}
|
||||
|
||||
func (fi stubFi) Size() int64 {
|
||||
if fi.size < 0 {
|
||||
panic("unreachable")
|
||||
}
|
||||
return fi.size
|
||||
}
|
||||
|
||||
func (fi stubFi) IsDir() bool { return fi.isDir }
|
||||
|
||||
func (fi stubFi) Mode() fs.FileMode { panic("unreachable") }
|
||||
func (fi stubFi) ModTime() time.Time { panic("unreachable") }
|
||||
func (fi stubFi) Sys() any { panic("unreachable") }
|
||||
64
internal/store/join.go
Normal file
64
internal/store/join.go
Normal file
@@ -0,0 +1,64 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"maps"
|
||||
|
||||
"hakurei.app/hst"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrDuplicate = errors.New("store contains duplicates")
|
||||
)
|
||||
|
||||
/*
|
||||
Joiner is the interface that wraps the Join method.
|
||||
|
||||
The Join function uses Joiner if available.
|
||||
*/
|
||||
type Joiner interface {
|
||||
Join() (map[hst.ID]*hst.State, error)
|
||||
}
|
||||
|
||||
// Join returns joined state entries of all active identities.
|
||||
func Join(s Store) (map[hst.ID]*hst.State, error) {
|
||||
if j, ok := s.(Joiner); ok {
|
||||
return j.Join()
|
||||
}
|
||||
|
||||
var (
|
||||
aids []int
|
||||
entries = make(map[hst.ID]*hst.State)
|
||||
|
||||
el int
|
||||
res map[hst.ID]*hst.State
|
||||
loadErr error
|
||||
)
|
||||
|
||||
if ln, err := s.List(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
aids = ln
|
||||
}
|
||||
|
||||
for _, aid := range aids {
|
||||
if _, err := s.Do(aid, func(c Cursor) {
|
||||
res, loadErr = c.Load()
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if loadErr != nil {
|
||||
return nil, loadErr
|
||||
}
|
||||
|
||||
// save expected length
|
||||
el = len(entries) + len(res)
|
||||
maps.Copy(entries, res)
|
||||
if len(entries) != el {
|
||||
return nil, ErrDuplicate
|
||||
}
|
||||
}
|
||||
|
||||
return entries, nil
|
||||
}
|
||||
161
internal/store/segment.go
Normal file
161
internal/store/segment.go
Normal file
@@ -0,0 +1,161 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"iter"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"hakurei.app/container/check"
|
||||
"hakurei.app/hst"
|
||||
"hakurei.app/internal/lockedfile"
|
||||
)
|
||||
|
||||
// stateEntryHandle is a handle on a state entry retrieved from a storeHandle.
|
||||
// Must only be used while its parent storeHandle.fileMu is held.
|
||||
type stateEntryHandle struct {
|
||||
// Error returned while decoding pathname.
|
||||
// A non-nil value disables stateEntryHandle.
|
||||
decodeErr error
|
||||
|
||||
// Checked path to entry file.
|
||||
pathname *check.Absolute
|
||||
|
||||
hst.ID
|
||||
}
|
||||
|
||||
// open opens the underlying state entry file, returning [hst.AppError] for a non-nil error.
|
||||
func (eh *stateEntryHandle) open(flag int, perm os.FileMode) (*os.File, error) {
|
||||
if eh.decodeErr != nil {
|
||||
return nil, eh.decodeErr
|
||||
}
|
||||
|
||||
if f, err := os.OpenFile(eh.pathname.String(), flag, perm); err != nil {
|
||||
return nil, &hst.AppError{Step: "open state entry", Err: err}
|
||||
} else {
|
||||
return f, nil
|
||||
}
|
||||
}
|
||||
|
||||
// destroy removes the underlying state entry file, returning [hst.AppError] for a non-nil error.
|
||||
func (eh *stateEntryHandle) destroy() error {
|
||||
// destroy does not go through open
|
||||
if eh.decodeErr != nil {
|
||||
return eh.decodeErr
|
||||
}
|
||||
|
||||
if err := os.Remove(eh.pathname.String()); err != nil {
|
||||
return &hst.AppError{Step: "destroy state entry", Err: err}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// save encodes [hst.State] and writes it to the underlying file.
|
||||
// An error is returned if a file already exists with the same identifier.
|
||||
// save does not validate the embedded [hst.Config].
|
||||
func (eh *stateEntryHandle) save(state *hst.State) error {
|
||||
f, err := eh.open(os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = entryEncode(f, state)
|
||||
if closeErr := f.Close(); closeErr != nil && err == nil {
|
||||
err = &hst.AppError{Step: "close state file", Err: closeErr}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// load loads and validates the state entry header, and returns the [hst.Enablement] byte.
|
||||
// for a non-nil v, the full state payload is decoded and stored in the value pointed to by v.
|
||||
// load validates the embedded hst.Config value.
|
||||
func (eh *stateEntryHandle) load(v *hst.State) (hst.Enablement, error) {
|
||||
f, err := eh.open(os.O_RDONLY, 0)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var et hst.Enablement
|
||||
if v != nil {
|
||||
et, err = entryDecode(f, v)
|
||||
if err == nil && v.ID != eh.ID {
|
||||
err = &hst.AppError{Step: "validate state identifier", Err: os.ErrInvalid,
|
||||
Msg: fmt.Sprintf("state entry %s has unexpected id %s", eh.ID.String(), v.ID.String())}
|
||||
}
|
||||
} else {
|
||||
et, err = entryDecodeHeader(f)
|
||||
}
|
||||
|
||||
if closeErr := f.Close(); closeErr != nil && err == nil {
|
||||
err = &hst.AppError{Step: "close state file", Err: closeErr}
|
||||
}
|
||||
return et, err
|
||||
}
|
||||
|
||||
// storeHandle is a handle on a stateStore segment.
|
||||
// Initialised by stateStore.identityHandle.
|
||||
type storeHandle struct {
|
||||
// Identity of instances tracked by this segment.
|
||||
identity int
|
||||
// Pathname of directory that the segment referred to by storeHandle is rooted in.
|
||||
path *check.Absolute
|
||||
// Inter-process mutex to synchronise operations against resources in this segment.
|
||||
fileMu *lockedfile.Mutex
|
||||
|
||||
// Must be held alongside fileMu.
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// entries returns an iterator over all stateEntryHandle held in this segment.
|
||||
// Must be called while holding a lock on mu and fileMu.
|
||||
// A non-nil error attached to a stateEntryHandle indicates a malformed identifier and is of type [hst.AppError].
|
||||
// A non-nil error returned by entries is of type [hst.AppError].
|
||||
func (h *storeHandle) entries() (iter.Seq[*stateEntryHandle], int, error) {
|
||||
// for error reporting
|
||||
const step = "read store segment entries"
|
||||
|
||||
// read directory contents, should only contain storeMutexName and identifier
|
||||
var entries []os.DirEntry
|
||||
if pl, err := os.ReadDir(h.path.String()); err != nil {
|
||||
return nil, -1, &hst.AppError{Step: step, Err: err}
|
||||
} else {
|
||||
entries = pl
|
||||
}
|
||||
|
||||
// expects lock file
|
||||
l := len(entries)
|
||||
if l > 0 {
|
||||
l--
|
||||
}
|
||||
|
||||
return func(yield func(*stateEntryHandle) bool) {
|
||||
for _, ent := range entries {
|
||||
var eh = stateEntryHandle{pathname: h.path.Append(ent.Name())}
|
||||
|
||||
// this should never happen
|
||||
if ent.IsDir() {
|
||||
eh.decodeErr = &hst.AppError{Step: step,
|
||||
Err: errors.New("unexpected directory " + strconv.Quote(ent.Name()) + " in store")}
|
||||
goto out
|
||||
}
|
||||
|
||||
// silently skip lock file
|
||||
if ent.Name() == storeMutexName {
|
||||
continue
|
||||
}
|
||||
|
||||
// this either indicates a serious bug or external interference
|
||||
if err := eh.ID.UnmarshalText([]byte(ent.Name())); err != nil {
|
||||
eh.decodeErr = &hst.AppError{Step: "decode store segment entry", Err: err}
|
||||
goto out
|
||||
}
|
||||
|
||||
out:
|
||||
if !yield(&eh) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}, l, nil
|
||||
}
|
||||
259
internal/store/segment_test.go
Normal file
259
internal/store/segment_test.go
Normal file
@@ -0,0 +1,259 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"iter"
|
||||
"os"
|
||||
"reflect"
|
||||
"slices"
|
||||
"strings"
|
||||
"syscall"
|
||||
"testing"
|
||||
|
||||
"hakurei.app/container/check"
|
||||
"hakurei.app/container/stub"
|
||||
"hakurei.app/hst"
|
||||
"hakurei.app/internal/lockedfile"
|
||||
)
|
||||
|
||||
func TestStateEntryHandle(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("lockout", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
wantErr := func() error { return stub.UniqueError(0) }
|
||||
eh := stateEntryHandle{decodeErr: wantErr(), pathname: check.MustAbs("/proc/nonexistent")}
|
||||
|
||||
if _, err := eh.open(-1, 0); !reflect.DeepEqual(err, wantErr()) {
|
||||
t.Errorf("open: error = %v, want %v", err, wantErr())
|
||||
}
|
||||
if err := eh.destroy(); !reflect.DeepEqual(err, wantErr()) {
|
||||
t.Errorf("destroy: error = %v, want %v", err, wantErr())
|
||||
}
|
||||
if err := eh.save(nil); !reflect.DeepEqual(err, wantErr()) {
|
||||
t.Errorf("save: error = %v, want %v", err, wantErr())
|
||||
}
|
||||
if _, err := eh.load(nil); !reflect.DeepEqual(err, wantErr()) {
|
||||
t.Errorf("load: error = %v, want %v", err, wantErr())
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("od", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
{
|
||||
eh := stateEntryHandle{pathname: check.MustAbs(t.TempDir()).Append("entry")}
|
||||
if f, err := eh.open(os.O_CREATE|syscall.O_EXCL, 0); err != nil {
|
||||
t.Fatalf("open: error = %v", err)
|
||||
} else if err = f.Close(); err != nil {
|
||||
t.Errorf("Close: error = %v", err)
|
||||
}
|
||||
if err := eh.destroy(); err != nil {
|
||||
t.Fatalf("destroy: error = %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("nonexistent", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
eh := stateEntryHandle{pathname: check.MustAbs("/proc/nonexistent")}
|
||||
|
||||
wantErrOpen := &hst.AppError{Step: "open state entry",
|
||||
Err: &os.PathError{Op: "open", Path: "/proc/nonexistent", Err: syscall.ENOENT}}
|
||||
if _, err := eh.open(os.O_CREATE|syscall.O_EXCL, 0); !reflect.DeepEqual(err, wantErrOpen) {
|
||||
t.Errorf("open: error = %#v, want %#v", err, wantErrOpen)
|
||||
}
|
||||
|
||||
wantErrDestroy := &hst.AppError{Step: "destroy state entry",
|
||||
Err: &os.PathError{Op: "remove", Path: "/proc/nonexistent", Err: syscall.ENOENT}}
|
||||
if err := eh.destroy(); !reflect.DeepEqual(err, wantErrDestroy) {
|
||||
t.Errorf("destroy: error = %#v, want %#v", err, wantErrDestroy)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("saveload", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
eh := stateEntryHandle{pathname: check.MustAbs(t.TempDir()).Append("entry"),
|
||||
ID: newTemplateState().ID}
|
||||
|
||||
if err := eh.save(newTemplateState()); err != nil {
|
||||
t.Fatalf("save: error = %v", err)
|
||||
}
|
||||
|
||||
t.Run("validate", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("internal", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var got hst.State
|
||||
if f, err := os.Open(eh.pathname.String()); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
} else if _, err = entryDecode(f, &got); err != nil {
|
||||
t.Fatalf("entryDecode: error = %v", err)
|
||||
} else if err = f.Close(); err != nil {
|
||||
t.Fatal(f.Close())
|
||||
}
|
||||
|
||||
if want := newTemplateState(); !reflect.DeepEqual(&got, want) {
|
||||
t.Errorf("entryDecode: %#v, want %#v", &got, want)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("load header only", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
if et, err := eh.load(nil); err != nil {
|
||||
t.Fatalf("load: error = %v", err)
|
||||
} else if want := newTemplateState().Enablements.Unwrap(); et != want {
|
||||
t.Errorf("load: et = %x, want %x", et, want)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("load", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var got hst.State
|
||||
if _, err := eh.load(&got); err != nil {
|
||||
t.Fatalf("load: error = %v", err)
|
||||
} else if want := newTemplateState(); !reflect.DeepEqual(&got, want) {
|
||||
t.Errorf("load: %#v, want %#v", &got, want)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("load inconsistent", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
wantErr := &hst.AppError{Step: "validate state identifier", Err: os.ErrInvalid,
|
||||
Msg: "state entry 00000000000000000000000000000000 has unexpected id aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}
|
||||
|
||||
ehi := stateEntryHandle{pathname: eh.pathname}
|
||||
if _, err := ehi.load(new(hst.State)); !reflect.DeepEqual(err, wantErr) {
|
||||
t.Errorf("load: error = %#v, want %#v", err, wantErr)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestStoreHandle(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
ents [2][]string
|
||||
want func(newEh func(err error, name string) *stateEntryHandle) []*stateEntryHandle
|
||||
ext func(t *testing.T, entries iter.Seq[*stateEntryHandle], n int)
|
||||
}{
|
||||
{"errors", [2][]string{{
|
||||
"e81eb203b4190ac5c3842ef44d429945",
|
||||
"lock",
|
||||
"f0-invalid",
|
||||
}, {
|
||||
"f1-directory",
|
||||
}}, func(newEh func(err error, name string) *stateEntryHandle) []*stateEntryHandle {
|
||||
return []*stateEntryHandle{
|
||||
newEh(nil, "e81eb203b4190ac5c3842ef44d429945"),
|
||||
newEh(&hst.AppError{Step: "decode store segment entry",
|
||||
Err: hst.IdentifierDecodeError{Err: hst.ErrIdentifierLength}}, "f0-invalid"),
|
||||
newEh(&hst.AppError{Step: "read store segment entries",
|
||||
Err: errors.New(`unexpected directory "f1-directory" in store`)}, "f1-directory"),
|
||||
}
|
||||
}, nil},
|
||||
|
||||
{"success", [2][]string{{
|
||||
"e81eb203b4190ac5c3842ef44d429945",
|
||||
"7958cfbb9272d9cf9cfd61c85afa13f1",
|
||||
"d0b5f7446dd5bd3424ff2f7ac9cace1e",
|
||||
"c8c8e2c4aea5c32fe47240ff8caa874e",
|
||||
"fa0d30b249d80f155a1f80ceddcc32f2",
|
||||
"lock",
|
||||
}}, func(newEh func(err error, name string) *stateEntryHandle) []*stateEntryHandle {
|
||||
return []*stateEntryHandle{
|
||||
newEh(nil, "7958cfbb9272d9cf9cfd61c85afa13f1"),
|
||||
newEh(nil, "c8c8e2c4aea5c32fe47240ff8caa874e"),
|
||||
newEh(nil, "d0b5f7446dd5bd3424ff2f7ac9cace1e"),
|
||||
newEh(nil, "e81eb203b4190ac5c3842ef44d429945"),
|
||||
newEh(nil, "fa0d30b249d80f155a1f80ceddcc32f2"),
|
||||
}
|
||||
}, func(t *testing.T, entries iter.Seq[*stateEntryHandle], n int) {
|
||||
if n != 5 {
|
||||
t.Fatalf("entries: n = %d", n)
|
||||
}
|
||||
|
||||
// check partial drain
|
||||
for range entries {
|
||||
break
|
||||
}
|
||||
}},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
p := check.MustAbs(t.TempDir()).Append("segment")
|
||||
if err := os.Mkdir(p.String(), 0700); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
createEntries(t, p, tc.ents)
|
||||
|
||||
var got []*stateEntryHandle
|
||||
if entries, n, err := (&storeHandle{
|
||||
identity: -0xbad,
|
||||
path: p,
|
||||
fileMu: lockedfile.MutexAt(p.Append("lock").String()),
|
||||
}).entries(); err != nil {
|
||||
t.Fatalf("entries: error = %v", err)
|
||||
} else {
|
||||
got = slices.AppendSeq(make([]*stateEntryHandle, 0, n), entries)
|
||||
if tc.ext != nil {
|
||||
tc.ext(t, entries, n)
|
||||
}
|
||||
}
|
||||
|
||||
slices.SortFunc(got, func(a, b *stateEntryHandle) int { return strings.Compare(a.pathname.String(), b.pathname.String()) })
|
||||
want := tc.want(func(err error, name string) *stateEntryHandle {
|
||||
eh := stateEntryHandle{decodeErr: err, pathname: p.Append(name)}
|
||||
if err == nil {
|
||||
if err = eh.UnmarshalText([]byte(name)); err != nil {
|
||||
t.Fatalf("UnmarshalText: error = %v", err)
|
||||
}
|
||||
}
|
||||
return &eh
|
||||
})
|
||||
|
||||
if !reflect.DeepEqual(got, want) {
|
||||
t.Errorf("entries: %q, want %q", got, want)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("nonexistent", func(t *testing.T) {
|
||||
var wantErr = &hst.AppError{Step: "read store segment entries", Err: &os.PathError{
|
||||
Op: "open",
|
||||
Path: "/proc/nonexistent",
|
||||
Err: syscall.ENOENT,
|
||||
}}
|
||||
if _, _, err := (&storeHandle{
|
||||
identity: -0xbad,
|
||||
path: check.MustAbs("/proc/nonexistent"),
|
||||
}).entries(); !reflect.DeepEqual(err, wantErr) {
|
||||
t.Fatalf("entries: error = %#v, want %#v", err, wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// createEntries creates file and directory entries in the specified prefix.
|
||||
func createEntries(t *testing.T, prefix *check.Absolute, ents [2][]string) {
|
||||
for _, s := range ents[0] {
|
||||
if f, err := os.OpenFile(prefix.Append(s).String(), os.O_CREATE|os.O_EXCL, 0600); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
} else if err = f.Close(); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
}
|
||||
for _, s := range ents[1] {
|
||||
if err := os.Mkdir(prefix.Append(s).String(), 0700); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
162
internal/store/store.go
Normal file
162
internal/store/store.go
Normal file
@@ -0,0 +1,162 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io/fs"
|
||||
"iter"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
||||
"hakurei.app/container/check"
|
||||
"hakurei.app/hst"
|
||||
"hakurei.app/internal/lockedfile"
|
||||
)
|
||||
|
||||
// storeMutexName is the pathname of the file backing [lockedfile.Mutex] of a stateStore and storeHandle.
|
||||
const storeMutexName = "lock"
|
||||
|
||||
// A stateStore keeps track of [hst.State] via a well-known filesystem accessible to all hakurei priv-side processes.
|
||||
// Access to store data and related resources are synchronised on a per-segment basis via storeHandle.
|
||||
type stateStore struct {
|
||||
// Pathname of directory that the store is rooted in.
|
||||
base *check.Absolute
|
||||
|
||||
// All currently known instances of storeHandle, keyed by their identity.
|
||||
handles sync.Map
|
||||
|
||||
// Inter-process mutex to synchronise operations against the entire store.
|
||||
// Held during List and when initialising previously unknown identities during Do.
|
||||
// Must not be accessed directly. Callers should use the bigLock method instead.
|
||||
fileMu *lockedfile.Mutex
|
||||
|
||||
// For creating the base directory.
|
||||
mkdirOnce sync.Once
|
||||
// Stored error value via mkdirOnce.
|
||||
mkdirErr error
|
||||
}
|
||||
|
||||
// bigLock acquires fileMu on stateStore.
|
||||
// A non-nil error returned by bigLock is of type [hst.AppError].
|
||||
func (s *stateStore) bigLock() (unlock func(), err error) {
|
||||
s.mkdirOnce.Do(func() { s.mkdirErr = os.MkdirAll(s.base.String(), 0700) })
|
||||
if s.mkdirErr != nil {
|
||||
return nil, &hst.AppError{Step: "create state store directory", Err: s.mkdirErr}
|
||||
}
|
||||
|
||||
if unlock, err = s.fileMu.Lock(); err != nil {
|
||||
return nil, &hst.AppError{Step: "acquire lock on the state store", Err: err}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// identityHandle loads or initialises a storeHandle for identity.
|
||||
// A non-nil error returned by identityHandle is of type [hst.AppError].
|
||||
func (s *stateStore) identityHandle(identity int) (*storeHandle, error) {
|
||||
h := new(storeHandle)
|
||||
h.mu.Lock()
|
||||
|
||||
if v, ok := s.handles.LoadOrStore(identity, h); ok {
|
||||
h = v.(*storeHandle)
|
||||
} else {
|
||||
// acquire big lock to initialise previously unknown segment handle
|
||||
if unlock, err := s.bigLock(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
defer unlock()
|
||||
}
|
||||
|
||||
h.identity = identity
|
||||
h.path = s.base.Append(strconv.Itoa(identity))
|
||||
h.fileMu = lockedfile.MutexAt(h.path.Append(storeMutexName).String())
|
||||
|
||||
err := os.MkdirAll(h.path.String(), 0700)
|
||||
h.mu.Unlock()
|
||||
if err != nil && !errors.Is(err, fs.ErrExist) {
|
||||
// handle methods will likely return ENOENT
|
||||
s.handles.CompareAndDelete(identity, h)
|
||||
return nil, &hst.AppError{Step: "create store segment directory", Err: err}
|
||||
}
|
||||
}
|
||||
return h, nil
|
||||
}
|
||||
|
||||
// segmentIdentity is produced by the iterator returned by stateStore.segments.
|
||||
type segmentIdentity struct {
|
||||
// Identity of the current segment.
|
||||
identity int
|
||||
// Error encountered while processing this segment.
|
||||
err error
|
||||
}
|
||||
|
||||
// segments returns an iterator over all segmentIdentity known to the store.
|
||||
// To obtain a storeHandle on a segment, caller must then call identityHandle.
|
||||
// A non-nil error returned by segments is of type [hst.AppError].
|
||||
func (s *stateStore) segments() (iter.Seq[segmentIdentity], int, error) {
|
||||
// read directory contents, should only contain storeMutexName and identity
|
||||
var entries []os.DirEntry
|
||||
|
||||
// acquire big lock to read store segment list
|
||||
if unlock, err := s.bigLock(); err != nil {
|
||||
return nil, -1, err
|
||||
} else {
|
||||
entries, err = os.ReadDir(s.base.String())
|
||||
unlock()
|
||||
|
||||
if err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
return nil, -1, &hst.AppError{Step: "read store segments", Err: err}
|
||||
}
|
||||
}
|
||||
|
||||
// expects lock file
|
||||
l := len(entries)
|
||||
if l > 0 {
|
||||
l--
|
||||
}
|
||||
|
||||
return func(yield func(segmentIdentity) bool) {
|
||||
// for error reporting
|
||||
const step = "process store segment"
|
||||
|
||||
for _, ent := range entries {
|
||||
si := segmentIdentity{identity: -1}
|
||||
|
||||
// should only be the big lock
|
||||
if !ent.IsDir() {
|
||||
if ent.Name() == storeMutexName {
|
||||
continue
|
||||
}
|
||||
|
||||
// this should never happen
|
||||
si.err = &hst.AppError{Step: step, Err: syscall.EISDIR,
|
||||
Msg: "skipped non-directory entry " + strconv.Quote(ent.Name())}
|
||||
goto out
|
||||
}
|
||||
|
||||
// failure paths either indicates a serious bug or external interference
|
||||
if v, err := strconv.Atoi(ent.Name()); err != nil {
|
||||
si.err = &hst.AppError{Step: step, Err: err,
|
||||
Msg: "skipped non-identity entry " + strconv.Quote(ent.Name())}
|
||||
goto out
|
||||
} else if v < hst.IdentityMin || v > hst.IdentityMax {
|
||||
si.err = &hst.AppError{Step: step, Err: syscall.ERANGE,
|
||||
Msg: "skipped out of bounds entry " + strconv.Itoa(v)}
|
||||
goto out
|
||||
} else {
|
||||
si.identity = v
|
||||
}
|
||||
|
||||
out:
|
||||
if !yield(si) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}, l, nil
|
||||
}
|
||||
|
||||
// newStore returns the address of a new instance of stateStore.
|
||||
// Multiple instances of stateStore rooted in the same directory is supported, but discouraged.
|
||||
func newStore(base *check.Absolute) *stateStore {
|
||||
return &stateStore{base: base, fileMu: lockedfile.MutexAt(base.Append(storeMutexName).String())}
|
||||
}
|
||||
254
internal/store/store_test.go
Normal file
254
internal/store/store_test.go
Normal file
@@ -0,0 +1,254 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"iter"
|
||||
"os"
|
||||
"reflect"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"testing"
|
||||
|
||||
"hakurei.app/container/check"
|
||||
"hakurei.app/hst"
|
||||
)
|
||||
|
||||
func TestStateStoreBigLock(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
{
|
||||
s := newStore(check.MustAbs(t.TempDir()).Append("state"))
|
||||
for i := 0; i < 2; i++ { // check once behaviour
|
||||
if unlock, err := s.bigLock(); err != nil {
|
||||
t.Fatalf("bigLock: error = %v", err)
|
||||
} else {
|
||||
unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("mkdir", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
wantErr := &hst.AppError{Step: "create state store directory",
|
||||
Err: &os.PathError{Op: "mkdir", Path: "/proc/nonexistent", Err: syscall.ENOENT}}
|
||||
for i := 0; i < 2; i++ { // check once behaviour
|
||||
if _, err := newStore(check.MustAbs("/proc/nonexistent")).bigLock(); !reflect.DeepEqual(err, wantErr) {
|
||||
t.Errorf("bigLock: error = %#v, want %#v", err, wantErr)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("access", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
base := check.MustAbs(t.TempDir()).Append("inaccessible")
|
||||
if err := os.MkdirAll(base.String(), 0); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
wantErr := &hst.AppError{Step: "acquire lock on the state store",
|
||||
Err: &os.PathError{Op: "open", Path: base.Append(storeMutexName).String(), Err: syscall.EACCES}}
|
||||
if _, err := newStore(base).bigLock(); !reflect.DeepEqual(err, wantErr) {
|
||||
t.Errorf("bigLock: error = %#v, want %#v", err, wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestStateStoreIdentityHandle(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("loadstore", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s := newStore(check.MustAbs(t.TempDir()).Append("store"))
|
||||
|
||||
var handleAddr [8]*storeHandle
|
||||
checkHandle := func(identity int, load bool) {
|
||||
if h, err := s.identityHandle(identity); err != nil {
|
||||
t.Fatalf("identityHandle: error = %v", err)
|
||||
} else if load != (handleAddr[identity] != nil) {
|
||||
t.Fatalf("identityHandle: load = %v, want %v", load, handleAddr[identity] != nil)
|
||||
} else if !load {
|
||||
handleAddr[identity] = h
|
||||
|
||||
if h.identity != identity {
|
||||
t.Errorf("identityHandle: identity = %d, want %d", h.identity, identity)
|
||||
}
|
||||
} else if h != handleAddr[identity] {
|
||||
t.Fatalf("identityHandle: %p, want %p", h, handleAddr[identity])
|
||||
}
|
||||
}
|
||||
|
||||
checkHandle(0, false)
|
||||
checkHandle(1, false)
|
||||
checkHandle(2, false)
|
||||
checkHandle(3, false)
|
||||
checkHandle(7, false)
|
||||
checkHandle(7, true)
|
||||
checkHandle(2, true)
|
||||
checkHandle(1, true)
|
||||
checkHandle(2, true)
|
||||
checkHandle(0, true)
|
||||
})
|
||||
|
||||
t.Run("access", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
base := check.MustAbs(t.TempDir()).Append("inaccessible")
|
||||
if err := os.MkdirAll(base.String(), 0); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
wantErr := &hst.AppError{Step: "acquire lock on the state store",
|
||||
Err: &os.PathError{Op: "open", Path: base.Append(storeMutexName).String(), Err: syscall.EACCES}}
|
||||
if _, err := newStore(base).identityHandle(0); !reflect.DeepEqual(err, wantErr) {
|
||||
t.Errorf("identityHandle: error = %#v, want %#v", err, wantErr)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("access segment", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
base := check.MustAbs(t.TempDir()).Append("inaccessible")
|
||||
if err := os.MkdirAll(base.String(), 0700); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
if f, err := os.Create(base.Append(storeMutexName).String()); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
} else if err = f.Close(); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
if err := os.Chmod(base.String(), 0100); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
if err := os.Chmod(base.String(), 0700); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
})
|
||||
|
||||
wantErr := &hst.AppError{Step: "create store segment directory",
|
||||
Err: &os.PathError{Op: "mkdir", Path: base.Append("0").String(), Err: syscall.EACCES}}
|
||||
if _, err := newStore(base).identityHandle(0); !reflect.DeepEqual(err, wantErr) {
|
||||
t.Errorf("identityHandle: error = %#v, want %#v", err, wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestStateStoreSegments(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
ents [2][]string
|
||||
want []segmentIdentity
|
||||
ext func(t *testing.T, segments iter.Seq[segmentIdentity], n int)
|
||||
}{
|
||||
{"errors", [2][]string{{
|
||||
"f0-invalid-file",
|
||||
}, {
|
||||
"f1-invalid-syntax",
|
||||
"9999",
|
||||
"16384",
|
||||
}}, []segmentIdentity{
|
||||
{-1, &hst.AppError{Step: "process store segment", Err: syscall.EISDIR,
|
||||
Msg: `skipped non-directory entry "f0-invalid-file"`}},
|
||||
{-1, &hst.AppError{Step: "process store segment", Err: syscall.ERANGE,
|
||||
Msg: `skipped out of bounds entry 16384`}},
|
||||
{-1, &hst.AppError{Step: "process store segment",
|
||||
Err: &strconv.NumError{Func: "Atoi", Num: "f1-invalid-syntax", Err: strconv.ErrSyntax},
|
||||
Msg: `skipped non-identity entry "f1-invalid-syntax"`}},
|
||||
{9999, nil},
|
||||
}, nil},
|
||||
|
||||
{"success", [2][]string{{
|
||||
"lock",
|
||||
}, {
|
||||
"0",
|
||||
"1",
|
||||
"2",
|
||||
"3",
|
||||
"4",
|
||||
"5",
|
||||
"6",
|
||||
"7",
|
||||
"9",
|
||||
"13",
|
||||
"20",
|
||||
"31",
|
||||
"197",
|
||||
}}, []segmentIdentity{
|
||||
{0, nil},
|
||||
{1, nil},
|
||||
{2, nil},
|
||||
{3, nil},
|
||||
{4, nil},
|
||||
{5, nil},
|
||||
{6, nil},
|
||||
{7, nil},
|
||||
{9, nil},
|
||||
{13, nil},
|
||||
{20, nil},
|
||||
{31, nil},
|
||||
{197, nil},
|
||||
}, func(t *testing.T, segments iter.Seq[segmentIdentity], n int) {
|
||||
if n != 13 {
|
||||
t.Fatalf("segments: n = %d", n)
|
||||
}
|
||||
|
||||
// check partial drain
|
||||
for range segments {
|
||||
break
|
||||
}
|
||||
}},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
base := check.MustAbs(t.TempDir()).Append("store")
|
||||
if err := os.Mkdir(base.String(), 0700); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
createEntries(t, base, tc.ents)
|
||||
|
||||
var got []segmentIdentity
|
||||
if segments, n, err := newStore(base).segments(); err != nil {
|
||||
t.Fatalf("segments: error = %v", err)
|
||||
} else {
|
||||
got = slices.AppendSeq(make([]segmentIdentity, 0, n), segments)
|
||||
if tc.ext != nil {
|
||||
tc.ext(t, segments, n)
|
||||
}
|
||||
}
|
||||
|
||||
slices.SortFunc(got, func(a, b segmentIdentity) int {
|
||||
if a.identity == b.identity {
|
||||
return strings.Compare(a.err.Error(), b.err.Error())
|
||||
}
|
||||
return cmp.Compare(a.identity, b.identity)
|
||||
})
|
||||
if !reflect.DeepEqual(got, tc.want) {
|
||||
t.Errorf("segments: %#v, want %#v", got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("access", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
base := check.MustAbs(t.TempDir()).Append("inaccessible")
|
||||
if err := os.MkdirAll(base.String(), 0); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
wantErr := &hst.AppError{Step: "acquire lock on the state store",
|
||||
Err: &os.PathError{Op: "open", Path: base.Append(storeMutexName).String(), Err: syscall.EACCES}}
|
||||
if _, _, err := newStore(base).segments(); !reflect.DeepEqual(err, wantErr) {
|
||||
t.Errorf("segments: error = %#v, want %#v", err, wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user