internal/outcome: rename from app
All checks were successful
Test / Sandbox (race detector) (push) Successful in 4m7s
Test / Hakurei (race detector) (push) Successful in 4m55s
Test / Flake checks (push) Successful in 1m27s
Test / Create distribution (push) Successful in 33s
Test / Sandbox (push) Successful in 2m11s
Test / Hakurei (push) Successful in 3m9s
Test / Hpkg (push) Successful in 4m1s
All checks were successful
Test / Sandbox (race detector) (push) Successful in 4m7s
Test / Hakurei (race detector) (push) Successful in 4m55s
Test / Flake checks (push) Successful in 1m27s
Test / Create distribution (push) Successful in 33s
Test / Sandbox (push) Successful in 2m11s
Test / Hakurei (push) Successful in 3m9s
Test / Hpkg (push) Successful in 4m1s
This is less ambiguous, and more accurately describes the purpose of the package. Signed-off-by: Ophestra <cat@gensokyo.uk>
This commit is contained in:
52
internal/state/data.go
Normal file
52
internal/state/data.go
Normal file
@@ -0,0 +1,52 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"encoding/gob"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"hakurei.app/hst"
|
||||
)
|
||||
|
||||
// entryEncode encodes [hst.State] into [io.Writer] with the state entry header.
|
||||
// entryEncode does not validate the embedded [hst.Config] value.
|
||||
//
|
||||
// A non-nil error returned by entryEncode is of type [hst.AppError].
|
||||
func entryEncode(w io.Writer, s *hst.State) error {
|
||||
if err := entryWriteHeader(w, s.Enablements.Unwrap()); err != nil {
|
||||
return &hst.AppError{Step: "encode state header", Err: err}
|
||||
} else if err = gob.NewEncoder(w).Encode(s); err != nil {
|
||||
return &hst.AppError{Step: "encode state body", Err: err}
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// entryDecodeHeader calls entryReadHeader, returning [hst.AppError] for a non-nil error.
|
||||
func entryDecodeHeader(r io.Reader) (hst.Enablement, error) {
|
||||
if et, err := entryReadHeader(r); err != nil {
|
||||
return 0, &hst.AppError{Step: "decode state header", Err: err}
|
||||
} else {
|
||||
return et, nil
|
||||
}
|
||||
}
|
||||
|
||||
// entryDecode decodes [hst.State] from [io.Reader] and stores the result in the value pointed to by p.
|
||||
// entryDecode validates the embedded [hst.Config] value.
|
||||
//
|
||||
// A non-nil error returned by entryDecode is of type [hst.AppError].
|
||||
func entryDecode(r io.Reader, p *hst.State) (hst.Enablement, error) {
|
||||
if et, err := entryDecodeHeader(r); err != nil {
|
||||
return et, err
|
||||
} else if err = gob.NewDecoder(r).Decode(&p); err != nil {
|
||||
return et, &hst.AppError{Step: "decode state body", Err: err}
|
||||
} else if err = p.Config.Validate(); err != nil {
|
||||
return et, err
|
||||
} else if p.Enablements.Unwrap() != et {
|
||||
return et, &hst.AppError{Step: "validate state enablement", Err: os.ErrInvalid,
|
||||
Msg: fmt.Sprintf("state entry %s has unexpected enablement byte %#x, %#x", p.ID.String(), byte(p.Enablements.Unwrap()), byte(et))}
|
||||
} else {
|
||||
return et, nil
|
||||
}
|
||||
}
|
||||
145
internal/state/data_test.go
Normal file
145
internal/state/data_test.go
Normal file
@@ -0,0 +1,145 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/gob"
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"hakurei.app/container/stub"
|
||||
"hakurei.app/hst"
|
||||
)
|
||||
|
||||
func TestEntryData(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
mustEncodeGob := func(e any) string {
|
||||
var buf bytes.Buffer
|
||||
if err := gob.NewEncoder(&buf).Encode(e); err != nil {
|
||||
t.Fatalf("cannot encode invalid state: %v", err)
|
||||
return "\x00" // not reached
|
||||
} else {
|
||||
return buf.String()
|
||||
}
|
||||
}
|
||||
templateStateGob := mustEncodeGob(newTemplateState())
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
data string
|
||||
s *hst.State
|
||||
err error
|
||||
}{
|
||||
{"invalid header", "\x00\xff\xca\xfe\xff\xff\xff\x00", nil, &hst.AppError{
|
||||
Step: "decode state header", Err: errors.New("unexpected revision ffff")}},
|
||||
|
||||
{"invalid gob", "\x00\xff\xca\xfe\x00\x00\xff\x00", nil, &hst.AppError{
|
||||
Step: "decode state body", Err: io.EOF}},
|
||||
|
||||
{"invalid config", "\x00\xff\xca\xfe\x00\x00\xff\x00" + mustEncodeGob(new(hst.State)), new(hst.State), &hst.AppError{
|
||||
Step: "validate configuration", Err: hst.ErrConfigNull,
|
||||
Msg: "invalid configuration"}},
|
||||
|
||||
{"inconsistent enablement", "\x00\xff\xca\xfe\x00\x00\xff\x00" + templateStateGob, newTemplateState(), &hst.AppError{
|
||||
Step: "validate state enablement", Err: os.ErrInvalid,
|
||||
Msg: "state entry aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa has unexpected enablement byte 0xd, 0xff"}},
|
||||
|
||||
{"template", "\x00\xff\xca\xfe\x00\x00\x0d\xf2" + templateStateGob, newTemplateState(), nil},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("encode", func(t *testing.T) {
|
||||
if tc.s == nil || tc.s.Config == nil {
|
||||
return
|
||||
}
|
||||
t.Parallel()
|
||||
|
||||
var buf bytes.Buffer
|
||||
if err := entryEncode(&buf, tc.s); err != nil {
|
||||
t.Fatalf("entryEncode: error = %v", err)
|
||||
}
|
||||
|
||||
if tc.err == nil {
|
||||
// Gob encoding is not guaranteed to be deterministic.
|
||||
// While the current implementation mostly is, it has randomised order
|
||||
// for iterating over maps, and hst.Config holds a map for environ.
|
||||
var got hst.State
|
||||
if et, err := entryDecode(&buf, &got); err != nil {
|
||||
t.Fatalf("entryDecode: error = %v", err)
|
||||
} else if stateEt := got.Enablements.Unwrap(); et != stateEt {
|
||||
t.Fatalf("entryDecode: et = %x, state %x", et, stateEt)
|
||||
}
|
||||
if !reflect.DeepEqual(&got, tc.s) {
|
||||
t.Errorf("entryEncode: %x", buf.Bytes())
|
||||
}
|
||||
} else if testing.Verbose() {
|
||||
t.Logf("%x", buf.String())
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("decode", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var got hst.State
|
||||
if et, err := entryDecode(strings.NewReader(tc.data), &got); !reflect.DeepEqual(err, tc.err) {
|
||||
t.Fatalf("entryDecode: error = %#v, want %#v", err, tc.err)
|
||||
} else if err != nil {
|
||||
return
|
||||
} else if stateEt := got.Enablements.Unwrap(); et != stateEt {
|
||||
t.Fatalf("entryDecode: et = %x, state %x", et, stateEt)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(&got, tc.s) {
|
||||
t.Errorf("entryDecode: %#v, want %#v", &got, tc.s)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("encode fault", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
s := newTemplateState()
|
||||
|
||||
t.Run("gob", func(t *testing.T) {
|
||||
var want = &hst.AppError{Step: "encode state body", Err: stub.UniqueError(0xcafe)}
|
||||
if err := entryEncode(stubNErrorWriter(entryHeaderSize), s); !reflect.DeepEqual(err, want) {
|
||||
t.Errorf("entryEncode: error = %#v, want %#v", err, want)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("header", func(t *testing.T) {
|
||||
var want = &hst.AppError{Step: "encode state header", Err: stub.UniqueError(0xcafe)}
|
||||
if err := entryEncode(stubNErrorWriter(entryHeaderSize-1), s); !reflect.DeepEqual(err, want) {
|
||||
t.Errorf("entryEncode: error = %#v, want %#v", err, want)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// newTemplateState returns the address of a new template [hst.State] struct.
|
||||
func newTemplateState() *hst.State {
|
||||
return &hst.State{
|
||||
ID: hst.ID(bytes.Repeat([]byte{0xaa}, len(hst.ID{}))),
|
||||
PID: 0xcafebabe,
|
||||
ShimPID: 0xdeadbeef,
|
||||
Config: hst.Template(),
|
||||
Time: time.Unix(0, 0),
|
||||
}
|
||||
}
|
||||
|
||||
// stubNErrorWriter returns an error for writes above a certain size.
|
||||
type stubNErrorWriter int
|
||||
|
||||
func (w stubNErrorWriter) Write(p []byte) (n int, err error) {
|
||||
if len(p) > int(w) {
|
||||
return int(w), stub.UniqueError(0xcafe)
|
||||
}
|
||||
return io.Discard.Write(p)
|
||||
}
|
||||
86
internal/state/header.go
Normal file
86
internal/state/header.go
Normal file
@@ -0,0 +1,86 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
"strconv"
|
||||
"syscall"
|
||||
|
||||
"hakurei.app/hst"
|
||||
)
|
||||
|
||||
const (
|
||||
// entryHeaderMagic are magic bytes at the beginning of the state entry file.
|
||||
entryHeaderMagic = "\x00\xff\xca\xfe"
|
||||
// entryHeaderRevision follows entryHeaderMagic and is incremented for revisions of the format.
|
||||
entryHeaderRevision = "\x00\x00"
|
||||
// entryHeaderSize is the fixed size of the header in bytes, including the enablement byte and its complement.
|
||||
entryHeaderSize = len(entryHeaderMagic+entryHeaderRevision) + 2
|
||||
)
|
||||
|
||||
// entryHeaderEncode encodes a state entry header for a [hst.Enablement] byte.
|
||||
func entryHeaderEncode(et hst.Enablement) *[entryHeaderSize]byte {
|
||||
data := [entryHeaderSize]byte([]byte(
|
||||
entryHeaderMagic + entryHeaderRevision + string([]hst.Enablement{et, ^et}),
|
||||
))
|
||||
return &data
|
||||
}
|
||||
|
||||
// entryHeaderDecode validates a state entry header and returns the [hst.Enablement] byte.
|
||||
func entryHeaderDecode(data *[entryHeaderSize]byte) (hst.Enablement, error) {
|
||||
if magic := data[:len(entryHeaderMagic)]; string(magic) != entryHeaderMagic {
|
||||
return 0, errors.New("invalid header " + hex.EncodeToString(magic))
|
||||
}
|
||||
if revision := data[len(entryHeaderMagic):len(entryHeaderMagic+entryHeaderRevision)]; string(revision) != entryHeaderRevision {
|
||||
return 0, errors.New("unexpected revision " + hex.EncodeToString(revision))
|
||||
}
|
||||
|
||||
et := data[len(entryHeaderMagic+entryHeaderRevision)]
|
||||
if et != ^data[len(entryHeaderMagic+entryHeaderRevision)+1] {
|
||||
return 0, errors.New("header enablement value is inconsistent")
|
||||
}
|
||||
return hst.Enablement(et), nil
|
||||
}
|
||||
|
||||
// EntrySizeError is returned for a file too small to hold a state entry header.
|
||||
type EntrySizeError struct {
|
||||
Name string
|
||||
Size int64
|
||||
}
|
||||
|
||||
func (e *EntrySizeError) Error() string {
|
||||
if e.Name == "" {
|
||||
return "state entry file is too short"
|
||||
}
|
||||
return "state entry file " + strconv.Quote(e.Name) + " is too short"
|
||||
}
|
||||
|
||||
// entryCheckFile checks whether [os.FileInfo] refers to a file that might hold [hst.State].
|
||||
func entryCheckFile(fi os.FileInfo) error {
|
||||
if fi.IsDir() {
|
||||
return syscall.EISDIR
|
||||
}
|
||||
if s := fi.Size(); s <= int64(entryHeaderSize) {
|
||||
return &EntrySizeError{Name: fi.Name(), Size: s}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// entryReadHeader reads [hst.Enablement] from an [io.Reader].
|
||||
func entryReadHeader(r io.Reader) (hst.Enablement, error) {
|
||||
var data [entryHeaderSize]byte
|
||||
if n, err := r.Read(data[:]); err != nil {
|
||||
return 0, err
|
||||
} else if n != entryHeaderSize {
|
||||
return 0, &EntrySizeError{Size: int64(n)}
|
||||
}
|
||||
return entryHeaderDecode(&data)
|
||||
}
|
||||
|
||||
// entryWriteHeader writes [hst.Enablement] header to an [io.Writer].
|
||||
func entryWriteHeader(w io.Writer, et hst.Enablement) error {
|
||||
_, err := w.Write(entryHeaderEncode(et)[:])
|
||||
return err
|
||||
}
|
||||
184
internal/state/header_test.go
Normal file
184
internal/state/header_test.go
Normal file
@@ -0,0 +1,184 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"io"
|
||||
"io/fs"
|
||||
"os"
|
||||
"reflect"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"hakurei.app/hst"
|
||||
)
|
||||
|
||||
func TestEntryHeader(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
data [entryHeaderSize]byte
|
||||
et hst.Enablement
|
||||
err error
|
||||
}{
|
||||
{"complement mismatch", [entryHeaderSize]byte{0x00, 0xff, 0xca, 0xfe, 0x00, 0x00,
|
||||
0x0a, 0xf6}, 0,
|
||||
errors.New("header enablement value is inconsistent")},
|
||||
{"unexpected revision", [entryHeaderSize]byte{0x00, 0xff, 0xca, 0xfe, 0xff, 0xff}, 0,
|
||||
errors.New("unexpected revision ffff")},
|
||||
{"invalid header", [entryHeaderSize]byte{0x00, 0xfe, 0xca, 0xfe}, 0,
|
||||
errors.New("invalid header 00fecafe")},
|
||||
|
||||
{"success high", [entryHeaderSize]byte{0x00, 0xff, 0xca, 0xfe, 0x00, 0x00,
|
||||
0xff, 0x00}, 0xff, nil},
|
||||
{"success", [entryHeaderSize]byte{0x00, 0xff, 0xca, 0xfe, 0x00, 0x00,
|
||||
0x09, 0xf6}, hst.EWayland | hst.EPulse, nil},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("encode", func(t *testing.T) {
|
||||
if tc.err != nil {
|
||||
return
|
||||
}
|
||||
t.Parallel()
|
||||
|
||||
if got := entryHeaderEncode(tc.et); *got != tc.data {
|
||||
t.Errorf("entryHeaderEncode: %x, want %x", *got, tc.data)
|
||||
}
|
||||
|
||||
t.Run("write", func(t *testing.T) {
|
||||
var buf bytes.Buffer
|
||||
if err := entryWriteHeader(&buf, tc.et); err != nil {
|
||||
t.Fatalf("entryWriteHeader: error = %v", err)
|
||||
}
|
||||
if got := ([entryHeaderSize]byte)(buf.Bytes()); got != tc.data {
|
||||
t.Errorf("entryWriteHeader: %x, want %x", got, tc.data)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("decode", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
got, err := entryHeaderDecode(&tc.data)
|
||||
if !reflect.DeepEqual(err, tc.err) {
|
||||
t.Fatalf("entryHeaderDecode: error = %#v, want %#v", err, tc.err)
|
||||
}
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if got != tc.et {
|
||||
t.Errorf("entryHeaderDecode: et = %q, want %q", got, tc.et)
|
||||
}
|
||||
|
||||
if got, err = entryReadHeader(bytes.NewReader(tc.data[:])); err != nil {
|
||||
t.Fatalf("entryReadHeader: error = %#v", err)
|
||||
} else if got != tc.et {
|
||||
t.Errorf("entryReadHeader: et = %q, want %q", got, tc.et)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEntrySizeError(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
err error
|
||||
want string
|
||||
}{
|
||||
{"size only", &EntrySizeError{Size: 0xdeadbeef},
|
||||
`state entry file is too short`},
|
||||
{"full", &EntrySizeError{Name: "nonexistent", Size: 0xdeadbeef},
|
||||
`state entry file "nonexistent" is too short`},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
if got := tc.err.Error(); got != tc.want {
|
||||
t.Errorf("Error: %s, want %s", got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEntryCheckFile(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
fi os.FileInfo
|
||||
err error
|
||||
}{
|
||||
{"dir", &stubFi{name: "dir", isDir: true},
|
||||
syscall.EISDIR},
|
||||
{"short", stubFi{name: "short", size: 8},
|
||||
&EntrySizeError{Name: "short", Size: 8}},
|
||||
{"success", stubFi{size: 9}, nil},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
if err := entryCheckFile(tc.fi); !reflect.DeepEqual(err, tc.err) {
|
||||
t.Errorf("entryCheckFile: error = %#v, want %#v", err, tc.err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEntryReadHeader(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
newR func() io.Reader
|
||||
err error
|
||||
}{
|
||||
{"eof", func() io.Reader { return bytes.NewReader([]byte{}) }, io.EOF},
|
||||
{"short", func() io.Reader { return bytes.NewReader([]byte{0}) }, &EntrySizeError{Size: 1}},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
if _, err := entryReadHeader(tc.newR()); !reflect.DeepEqual(err, tc.err) {
|
||||
t.Errorf("entryReadHeader: error = %#v, want %#v", err, tc.err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// stubFi partially implements [os.FileInfo] using hardcoded values.
|
||||
type stubFi struct {
|
||||
name string
|
||||
size int64
|
||||
isDir bool
|
||||
}
|
||||
|
||||
func (fi stubFi) Name() string {
|
||||
if fi.name == "" {
|
||||
panic("unreachable")
|
||||
}
|
||||
return fi.name
|
||||
}
|
||||
|
||||
func (fi stubFi) Size() int64 {
|
||||
if fi.size < 0 {
|
||||
panic("unreachable")
|
||||
}
|
||||
return fi.size
|
||||
}
|
||||
|
||||
func (fi stubFi) IsDir() bool { return fi.isDir }
|
||||
|
||||
func (fi stubFi) Mode() fs.FileMode { panic("unreachable") }
|
||||
func (fi stubFi) ModTime() time.Time { panic("unreachable") }
|
||||
func (fi stubFi) Sys() any { panic("unreachable") }
|
||||
64
internal/state/join.go
Normal file
64
internal/state/join.go
Normal file
@@ -0,0 +1,64 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"maps"
|
||||
|
||||
"hakurei.app/hst"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrDuplicate = errors.New("store contains duplicates")
|
||||
)
|
||||
|
||||
/*
|
||||
Joiner is the interface that wraps the Join method.
|
||||
|
||||
The Join function uses Joiner if available.
|
||||
*/
|
||||
type Joiner interface {
|
||||
Join() (map[hst.ID]*hst.State, error)
|
||||
}
|
||||
|
||||
// Join returns joined state entries of all active identities.
|
||||
func Join(s Store) (map[hst.ID]*hst.State, error) {
|
||||
if j, ok := s.(Joiner); ok {
|
||||
return j.Join()
|
||||
}
|
||||
|
||||
var (
|
||||
aids []int
|
||||
entries = make(map[hst.ID]*hst.State)
|
||||
|
||||
el int
|
||||
res map[hst.ID]*hst.State
|
||||
loadErr error
|
||||
)
|
||||
|
||||
if ln, err := s.List(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
aids = ln
|
||||
}
|
||||
|
||||
for _, aid := range aids {
|
||||
if _, err := s.Do(aid, func(c Cursor) {
|
||||
res, loadErr = c.Load()
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if loadErr != nil {
|
||||
return nil, loadErr
|
||||
}
|
||||
|
||||
// save expected length
|
||||
el = len(entries) + len(res)
|
||||
maps.Copy(entries, res)
|
||||
if len(entries) != el {
|
||||
return nil, ErrDuplicate
|
||||
}
|
||||
}
|
||||
|
||||
return entries, nil
|
||||
}
|
||||
161
internal/state/segment.go
Normal file
161
internal/state/segment.go
Normal file
@@ -0,0 +1,161 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"iter"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"hakurei.app/container/check"
|
||||
"hakurei.app/hst"
|
||||
"hakurei.app/internal/lockedfile"
|
||||
)
|
||||
|
||||
// stateEntryHandle is a handle on a state entry retrieved from a storeHandle.
|
||||
// Must only be used while its parent storeHandle.fileMu is held.
|
||||
type stateEntryHandle struct {
|
||||
// Error returned while decoding pathname.
|
||||
// A non-nil value disables stateEntryHandle.
|
||||
decodeErr error
|
||||
|
||||
// Checked path to entry file.
|
||||
pathname *check.Absolute
|
||||
|
||||
hst.ID
|
||||
}
|
||||
|
||||
// open opens the underlying state entry file, returning [hst.AppError] for a non-nil error.
|
||||
func (eh *stateEntryHandle) open(flag int, perm os.FileMode) (*os.File, error) {
|
||||
if eh.decodeErr != nil {
|
||||
return nil, eh.decodeErr
|
||||
}
|
||||
|
||||
if f, err := os.OpenFile(eh.pathname.String(), flag, perm); err != nil {
|
||||
return nil, &hst.AppError{Step: "open state entry", Err: err}
|
||||
} else {
|
||||
return f, nil
|
||||
}
|
||||
}
|
||||
|
||||
// destroy removes the underlying state entry file, returning [hst.AppError] for a non-nil error.
|
||||
func (eh *stateEntryHandle) destroy() error {
|
||||
// destroy does not go through open
|
||||
if eh.decodeErr != nil {
|
||||
return eh.decodeErr
|
||||
}
|
||||
|
||||
if err := os.Remove(eh.pathname.String()); err != nil {
|
||||
return &hst.AppError{Step: "destroy state entry", Err: err}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// save encodes [hst.State] and writes it to the underlying file.
|
||||
// An error is returned if a file already exists with the same identifier.
|
||||
// save does not validate the embedded [hst.Config].
|
||||
func (eh *stateEntryHandle) save(state *hst.State) error {
|
||||
f, err := eh.open(os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = entryEncode(f, state)
|
||||
if closeErr := f.Close(); closeErr != nil && err == nil {
|
||||
err = &hst.AppError{Step: "close state file", Err: closeErr}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// load loads and validates the state entry header, and returns the [hst.Enablement] byte.
|
||||
// for a non-nil v, the full state payload is decoded and stored in the value pointed to by v.
|
||||
// load validates the embedded hst.Config value.
|
||||
func (eh *stateEntryHandle) load(v *hst.State) (hst.Enablement, error) {
|
||||
f, err := eh.open(os.O_RDONLY, 0)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var et hst.Enablement
|
||||
if v != nil {
|
||||
et, err = entryDecode(f, v)
|
||||
if err == nil && v.ID != eh.ID {
|
||||
err = &hst.AppError{Step: "validate state identifier", Err: os.ErrInvalid,
|
||||
Msg: fmt.Sprintf("state entry %s has unexpected id %s", eh.ID.String(), v.ID.String())}
|
||||
}
|
||||
} else {
|
||||
et, err = entryDecodeHeader(f)
|
||||
}
|
||||
|
||||
if closeErr := f.Close(); closeErr != nil && err == nil {
|
||||
err = &hst.AppError{Step: "close state file", Err: closeErr}
|
||||
}
|
||||
return et, err
|
||||
}
|
||||
|
||||
// storeHandle is a handle on a stateStore segment.
|
||||
// Initialised by stateStore.identityHandle.
|
||||
type storeHandle struct {
|
||||
// Identity of instances tracked by this segment.
|
||||
identity int
|
||||
// Pathname of directory that the segment referred to by storeHandle is rooted in.
|
||||
path *check.Absolute
|
||||
// Inter-process mutex to synchronise operations against resources in this segment.
|
||||
fileMu *lockedfile.Mutex
|
||||
|
||||
// Must be held alongside fileMu.
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// entries returns an iterator over all stateEntryHandle held in this segment.
|
||||
// Must be called while holding a lock on mu and fileMu.
|
||||
// A non-nil error attached to a stateEntryHandle indicates a malformed identifier and is of type [hst.AppError].
|
||||
// A non-nil error returned by entries is of type [hst.AppError].
|
||||
func (h *storeHandle) entries() (iter.Seq[*stateEntryHandle], int, error) {
|
||||
// for error reporting
|
||||
const step = "read store segment entries"
|
||||
|
||||
// read directory contents, should only contain storeMutexName and identifier
|
||||
var entries []os.DirEntry
|
||||
if pl, err := os.ReadDir(h.path.String()); err != nil {
|
||||
return nil, -1, &hst.AppError{Step: step, Err: err}
|
||||
} else {
|
||||
entries = pl
|
||||
}
|
||||
|
||||
// expects lock file
|
||||
l := len(entries)
|
||||
if l > 0 {
|
||||
l--
|
||||
}
|
||||
|
||||
return func(yield func(*stateEntryHandle) bool) {
|
||||
for _, ent := range entries {
|
||||
var eh = stateEntryHandle{pathname: h.path.Append(ent.Name())}
|
||||
|
||||
// this should never happen
|
||||
if ent.IsDir() {
|
||||
eh.decodeErr = &hst.AppError{Step: step,
|
||||
Err: errors.New("unexpected directory " + strconv.Quote(ent.Name()) + " in store")}
|
||||
goto out
|
||||
}
|
||||
|
||||
// silently skip lock file
|
||||
if ent.Name() == storeMutexName {
|
||||
continue
|
||||
}
|
||||
|
||||
// this either indicates a serious bug or external interference
|
||||
if err := eh.ID.UnmarshalText([]byte(ent.Name())); err != nil {
|
||||
eh.decodeErr = &hst.AppError{Step: "decode store segment entry", Err: err}
|
||||
goto out
|
||||
}
|
||||
|
||||
out:
|
||||
if !yield(&eh) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}, l, nil
|
||||
}
|
||||
259
internal/state/segment_test.go
Normal file
259
internal/state/segment_test.go
Normal file
@@ -0,0 +1,259 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"iter"
|
||||
"os"
|
||||
"reflect"
|
||||
"slices"
|
||||
"strings"
|
||||
"syscall"
|
||||
"testing"
|
||||
|
||||
"hakurei.app/container/check"
|
||||
"hakurei.app/container/stub"
|
||||
"hakurei.app/hst"
|
||||
"hakurei.app/internal/lockedfile"
|
||||
)
|
||||
|
||||
func TestStateEntryHandle(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("lockout", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
wantErr := func() error { return stub.UniqueError(0) }
|
||||
eh := stateEntryHandle{decodeErr: wantErr(), pathname: check.MustAbs("/proc/nonexistent")}
|
||||
|
||||
if _, err := eh.open(-1, 0); !reflect.DeepEqual(err, wantErr()) {
|
||||
t.Errorf("open: error = %v, want %v", err, wantErr())
|
||||
}
|
||||
if err := eh.destroy(); !reflect.DeepEqual(err, wantErr()) {
|
||||
t.Errorf("destroy: error = %v, want %v", err, wantErr())
|
||||
}
|
||||
if err := eh.save(nil); !reflect.DeepEqual(err, wantErr()) {
|
||||
t.Errorf("save: error = %v, want %v", err, wantErr())
|
||||
}
|
||||
if _, err := eh.load(nil); !reflect.DeepEqual(err, wantErr()) {
|
||||
t.Errorf("load: error = %v, want %v", err, wantErr())
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("od", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
{
|
||||
eh := stateEntryHandle{pathname: check.MustAbs(t.TempDir()).Append("entry")}
|
||||
if f, err := eh.open(os.O_CREATE|syscall.O_EXCL, 0); err != nil {
|
||||
t.Fatalf("open: error = %v", err)
|
||||
} else if err = f.Close(); err != nil {
|
||||
t.Errorf("Close: error = %v", err)
|
||||
}
|
||||
if err := eh.destroy(); err != nil {
|
||||
t.Fatalf("destroy: error = %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("nonexistent", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
eh := stateEntryHandle{pathname: check.MustAbs("/proc/nonexistent")}
|
||||
|
||||
wantErrOpen := &hst.AppError{Step: "open state entry",
|
||||
Err: &os.PathError{Op: "open", Path: "/proc/nonexistent", Err: syscall.ENOENT}}
|
||||
if _, err := eh.open(os.O_CREATE|syscall.O_EXCL, 0); !reflect.DeepEqual(err, wantErrOpen) {
|
||||
t.Errorf("open: error = %#v, want %#v", err, wantErrOpen)
|
||||
}
|
||||
|
||||
wantErrDestroy := &hst.AppError{Step: "destroy state entry",
|
||||
Err: &os.PathError{Op: "remove", Path: "/proc/nonexistent", Err: syscall.ENOENT}}
|
||||
if err := eh.destroy(); !reflect.DeepEqual(err, wantErrDestroy) {
|
||||
t.Errorf("destroy: error = %#v, want %#v", err, wantErrDestroy)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("saveload", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
eh := stateEntryHandle{pathname: check.MustAbs(t.TempDir()).Append("entry"),
|
||||
ID: newTemplateState().ID}
|
||||
|
||||
if err := eh.save(newTemplateState()); err != nil {
|
||||
t.Fatalf("save: error = %v", err)
|
||||
}
|
||||
|
||||
t.Run("validate", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("internal", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var got hst.State
|
||||
if f, err := os.Open(eh.pathname.String()); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
} else if _, err = entryDecode(f, &got); err != nil {
|
||||
t.Fatalf("entryDecode: error = %v", err)
|
||||
} else if err = f.Close(); err != nil {
|
||||
t.Fatal(f.Close())
|
||||
}
|
||||
|
||||
if want := newTemplateState(); !reflect.DeepEqual(&got, want) {
|
||||
t.Errorf("entryDecode: %#v, want %#v", &got, want)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("load header only", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
if et, err := eh.load(nil); err != nil {
|
||||
t.Fatalf("load: error = %v", err)
|
||||
} else if want := newTemplateState().Enablements.Unwrap(); et != want {
|
||||
t.Errorf("load: et = %x, want %x", et, want)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("load", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var got hst.State
|
||||
if _, err := eh.load(&got); err != nil {
|
||||
t.Fatalf("load: error = %v", err)
|
||||
} else if want := newTemplateState(); !reflect.DeepEqual(&got, want) {
|
||||
t.Errorf("load: %#v, want %#v", &got, want)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("load inconsistent", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
wantErr := &hst.AppError{Step: "validate state identifier", Err: os.ErrInvalid,
|
||||
Msg: "state entry 00000000000000000000000000000000 has unexpected id aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}
|
||||
|
||||
ehi := stateEntryHandle{pathname: eh.pathname}
|
||||
if _, err := ehi.load(new(hst.State)); !reflect.DeepEqual(err, wantErr) {
|
||||
t.Errorf("load: error = %#v, want %#v", err, wantErr)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestStoreHandle(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
ents [2][]string
|
||||
want func(newEh func(err error, name string) *stateEntryHandle) []*stateEntryHandle
|
||||
ext func(t *testing.T, entries iter.Seq[*stateEntryHandle], n int)
|
||||
}{
|
||||
{"errors", [2][]string{{
|
||||
"e81eb203b4190ac5c3842ef44d429945",
|
||||
"lock",
|
||||
"f0-invalid",
|
||||
}, {
|
||||
"f1-directory",
|
||||
}}, func(newEh func(err error, name string) *stateEntryHandle) []*stateEntryHandle {
|
||||
return []*stateEntryHandle{
|
||||
newEh(nil, "e81eb203b4190ac5c3842ef44d429945"),
|
||||
newEh(&hst.AppError{Step: "decode store segment entry",
|
||||
Err: hst.IdentifierDecodeError{Err: hst.ErrIdentifierLength}}, "f0-invalid"),
|
||||
newEh(&hst.AppError{Step: "read store segment entries",
|
||||
Err: errors.New(`unexpected directory "f1-directory" in store`)}, "f1-directory"),
|
||||
}
|
||||
}, nil},
|
||||
|
||||
{"success", [2][]string{{
|
||||
"e81eb203b4190ac5c3842ef44d429945",
|
||||
"7958cfbb9272d9cf9cfd61c85afa13f1",
|
||||
"d0b5f7446dd5bd3424ff2f7ac9cace1e",
|
||||
"c8c8e2c4aea5c32fe47240ff8caa874e",
|
||||
"fa0d30b249d80f155a1f80ceddcc32f2",
|
||||
"lock",
|
||||
}}, func(newEh func(err error, name string) *stateEntryHandle) []*stateEntryHandle {
|
||||
return []*stateEntryHandle{
|
||||
newEh(nil, "7958cfbb9272d9cf9cfd61c85afa13f1"),
|
||||
newEh(nil, "c8c8e2c4aea5c32fe47240ff8caa874e"),
|
||||
newEh(nil, "d0b5f7446dd5bd3424ff2f7ac9cace1e"),
|
||||
newEh(nil, "e81eb203b4190ac5c3842ef44d429945"),
|
||||
newEh(nil, "fa0d30b249d80f155a1f80ceddcc32f2"),
|
||||
}
|
||||
}, func(t *testing.T, entries iter.Seq[*stateEntryHandle], n int) {
|
||||
if n != 5 {
|
||||
t.Fatalf("entries: n = %d", n)
|
||||
}
|
||||
|
||||
// check partial drain
|
||||
for range entries {
|
||||
break
|
||||
}
|
||||
}},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
p := check.MustAbs(t.TempDir()).Append("segment")
|
||||
if err := os.Mkdir(p.String(), 0700); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
createEntries(t, p, tc.ents)
|
||||
|
||||
var got []*stateEntryHandle
|
||||
if entries, n, err := (&storeHandle{
|
||||
identity: -0xbad,
|
||||
path: p,
|
||||
fileMu: lockedfile.MutexAt(p.Append("lock").String()),
|
||||
}).entries(); err != nil {
|
||||
t.Fatalf("entries: error = %v", err)
|
||||
} else {
|
||||
got = slices.AppendSeq(make([]*stateEntryHandle, 0, n), entries)
|
||||
if tc.ext != nil {
|
||||
tc.ext(t, entries, n)
|
||||
}
|
||||
}
|
||||
|
||||
slices.SortFunc(got, func(a, b *stateEntryHandle) int { return strings.Compare(a.pathname.String(), b.pathname.String()) })
|
||||
want := tc.want(func(err error, name string) *stateEntryHandle {
|
||||
eh := stateEntryHandle{decodeErr: err, pathname: p.Append(name)}
|
||||
if err == nil {
|
||||
if err = eh.UnmarshalText([]byte(name)); err != nil {
|
||||
t.Fatalf("UnmarshalText: error = %v", err)
|
||||
}
|
||||
}
|
||||
return &eh
|
||||
})
|
||||
|
||||
if !reflect.DeepEqual(got, want) {
|
||||
t.Errorf("entries: %q, want %q", got, want)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("nonexistent", func(t *testing.T) {
|
||||
var wantErr = &hst.AppError{Step: "read store segment entries", Err: &os.PathError{
|
||||
Op: "open",
|
||||
Path: "/proc/nonexistent",
|
||||
Err: syscall.ENOENT,
|
||||
}}
|
||||
if _, _, err := (&storeHandle{
|
||||
identity: -0xbad,
|
||||
path: check.MustAbs("/proc/nonexistent"),
|
||||
}).entries(); !reflect.DeepEqual(err, wantErr) {
|
||||
t.Fatalf("entries: error = %#v, want %#v", err, wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// createEntries creates file and directory entries in the specified prefix.
|
||||
func createEntries(t *testing.T, prefix *check.Absolute, ents [2][]string) {
|
||||
for _, s := range ents[0] {
|
||||
if f, err := os.OpenFile(prefix.Append(s).String(), os.O_CREATE|os.O_EXCL, 0600); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
} else if err = f.Close(); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
}
|
||||
for _, s := range ents[1] {
|
||||
if err := os.Mkdir(prefix.Append(s).String(), 0700); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
131
internal/state/state.go
Normal file
131
internal/state/state.go
Normal file
@@ -0,0 +1,131 @@
|
||||
// Package state provides cross-process state tracking for hakurei container instances.
|
||||
package state
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"hakurei.app/container/check"
|
||||
"hakurei.app/hst"
|
||||
"hakurei.app/message"
|
||||
)
|
||||
|
||||
/* this provides an implementation of Store on top of the improved state tracking to ease in the changes */
|
||||
|
||||
type Store interface {
|
||||
// Do calls f exactly once and ensures store exclusivity until f returns.
|
||||
// Returns whether f is called and any errors during the locking process.
|
||||
// Cursor provided to f becomes invalid as soon as f returns.
|
||||
Do(identity int, f func(c Cursor)) (ok bool, err error)
|
||||
|
||||
// List queries the store and returns a list of identities known to the store.
|
||||
// Note that some or all returned identities might not have any active apps.
|
||||
List() (identities []int, err error)
|
||||
}
|
||||
|
||||
func (s *stateStore) Do(identity int, f func(c Cursor)) (bool, error) {
|
||||
if h, err := s.identityHandle(identity); err != nil {
|
||||
return false, err
|
||||
} else {
|
||||
return h.do(f)
|
||||
}
|
||||
}
|
||||
|
||||
// storeAdapter satisfies [Store] via stateStore.
|
||||
type storeAdapter struct {
|
||||
msg message.Msg
|
||||
*stateStore
|
||||
}
|
||||
|
||||
func (s storeAdapter) List() ([]int, error) {
|
||||
segments, n, err := s.segments()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
identities := make([]int, 0, n)
|
||||
for si := range segments {
|
||||
if si.err != nil {
|
||||
if m, ok := message.GetMessage(err); ok {
|
||||
s.msg.Verbose(m)
|
||||
} else {
|
||||
// unreachable
|
||||
return nil, err
|
||||
}
|
||||
continue
|
||||
}
|
||||
identities = append(identities, si.identity)
|
||||
}
|
||||
return identities, nil
|
||||
}
|
||||
|
||||
// NewMulti returns an instance of the multi-file store.
|
||||
func NewMulti(msg message.Msg, prefix *check.Absolute) Store {
|
||||
return storeAdapter{msg, newStore(prefix.Append("state"))}
|
||||
}
|
||||
|
||||
// Cursor provides access to the store of an identity.
|
||||
type Cursor interface {
|
||||
Save(state *hst.State) error
|
||||
Destroy(id hst.ID) error
|
||||
Load() (map[hst.ID]*hst.State, error)
|
||||
Len() (int, error)
|
||||
}
|
||||
|
||||
// do implements stateStore.Do on storeHandle.
|
||||
func (h *storeHandle) do(f func(c Cursor)) (bool, error) {
|
||||
if unlock, err := h.fileMu.Lock(); err != nil {
|
||||
return false, &hst.AppError{Step: "acquire lock on store segment " + strconv.Itoa(h.identity), Err: err}
|
||||
} else {
|
||||
defer unlock()
|
||||
}
|
||||
|
||||
f(h)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
/* these compatibility methods must only be called while fileMu is held */
|
||||
|
||||
func (h *storeHandle) Save(state *hst.State) error {
|
||||
return (&stateEntryHandle{nil, h.path.Append(state.ID.String()), state.ID}).save(state)
|
||||
}
|
||||
|
||||
func (h *storeHandle) Destroy(id hst.ID) error {
|
||||
return (&stateEntryHandle{nil, h.path.Append(id.String()), id}).destroy()
|
||||
}
|
||||
|
||||
func (h *storeHandle) Load() (map[hst.ID]*hst.State, error) {
|
||||
entries, n, err := h.entries()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r := make(map[hst.ID]*hst.State, n)
|
||||
for eh := range entries {
|
||||
if eh.decodeErr != nil {
|
||||
err = eh.decodeErr
|
||||
break
|
||||
}
|
||||
var s hst.State
|
||||
if _, err = eh.load(&s); err != nil {
|
||||
break
|
||||
}
|
||||
r[eh.ID] = &s
|
||||
}
|
||||
return r, err
|
||||
}
|
||||
|
||||
func (h *storeHandle) Len() (int, error) {
|
||||
entries, _, err := h.entries()
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
var n int
|
||||
for eh := range entries {
|
||||
if eh.decodeErr != nil {
|
||||
err = eh.decodeErr
|
||||
}
|
||||
n++
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
120
internal/state/state_test.go
Normal file
120
internal/state/state_test.go
Normal file
@@ -0,0 +1,120 @@
|
||||
package state_test
|
||||
|
||||
import (
|
||||
"log"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"slices"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"hakurei.app/container/check"
|
||||
"hakurei.app/hst"
|
||||
"hakurei.app/internal/state"
|
||||
"hakurei.app/message"
|
||||
)
|
||||
|
||||
func TestMulti(t *testing.T) {
|
||||
s := state.NewMulti(message.NewMsg(log.New(log.Writer(), "multi: ", 0)), check.MustAbs(t.TempDir()))
|
||||
|
||||
t.Run("list empty store", func(t *testing.T) {
|
||||
if identities, err := s.List(); err != nil {
|
||||
t.Fatalf("List: error = %v", err)
|
||||
} else if len(identities) != 0 {
|
||||
t.Fatalf("List: identities = %#v", identities)
|
||||
}
|
||||
})
|
||||
|
||||
const (
|
||||
insertEntryChecked = iota
|
||||
insertEntryNoCheck
|
||||
insertEntryOtherApp
|
||||
|
||||
tl
|
||||
)
|
||||
|
||||
var tc [tl]hst.State
|
||||
for i := 0; i < tl; i++ {
|
||||
if err := hst.NewInstanceID(&tc[i].ID); err != nil {
|
||||
t.Fatalf("cannot create dummy state: %v", err)
|
||||
}
|
||||
tc[i].PID = rand.Int()
|
||||
tc[i].Config = hst.Template()
|
||||
tc[i].Time = time.Now()
|
||||
}
|
||||
|
||||
do := func(identity int, f func(c state.Cursor)) {
|
||||
if ok, err := s.Do(identity, f); err != nil {
|
||||
t.Fatalf("Do: ok = %v, error = %v", ok, err)
|
||||
}
|
||||
}
|
||||
|
||||
insert := func(i, identity int) {
|
||||
do(identity, func(c state.Cursor) {
|
||||
if err := c.Save(&tc[i]); err != nil {
|
||||
t.Fatalf("Save: error = %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
check := func(i, identity int) {
|
||||
do(identity, func(c state.Cursor) {
|
||||
if entries, err := c.Load(); err != nil {
|
||||
t.Fatalf("Load: error = %v", err)
|
||||
} else if got, ok := entries[tc[i].ID]; !ok {
|
||||
t.Fatalf("Load: entry %s missing", &tc[i].ID)
|
||||
} else {
|
||||
got.Time = tc[i].Time
|
||||
if !reflect.DeepEqual(got, &tc[i]) {
|
||||
t.Fatalf("Load: entry %s got %#v, want %#v", &tc[i].ID, got, &tc[i])
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// insert entry checked
|
||||
insert(insertEntryChecked, 0)
|
||||
check(insertEntryChecked, 0)
|
||||
|
||||
// insert entry unchecked
|
||||
insert(insertEntryNoCheck, 0)
|
||||
|
||||
// insert entry different identity
|
||||
insert(insertEntryOtherApp, 1)
|
||||
check(insertEntryOtherApp, 1)
|
||||
|
||||
// check previous insertion
|
||||
check(insertEntryNoCheck, 0)
|
||||
|
||||
// list identities
|
||||
if identities, err := s.List(); err != nil {
|
||||
t.Fatalf("List: error = %v", err)
|
||||
} else {
|
||||
slices.Sort(identities)
|
||||
want := []int{0, 1}
|
||||
if !slices.Equal(identities, want) {
|
||||
t.Fatalf("List() = %#v, want %#v", identities, want)
|
||||
}
|
||||
}
|
||||
|
||||
// join store
|
||||
if entries, err := state.Join(s); err != nil {
|
||||
t.Fatalf("Join: error = %v", err)
|
||||
} else if len(entries) != 3 {
|
||||
t.Fatalf("Join(s) = %#v", entries)
|
||||
}
|
||||
|
||||
// clear identity 1
|
||||
do(1, func(c state.Cursor) {
|
||||
if err := c.Destroy(tc[insertEntryOtherApp].ID); err != nil {
|
||||
t.Fatalf("Destroy: error = %v", err)
|
||||
}
|
||||
})
|
||||
do(1, func(c state.Cursor) {
|
||||
if l, err := c.Len(); err != nil {
|
||||
t.Fatalf("Len: error = %v", err)
|
||||
} else if l != 0 {
|
||||
t.Fatalf("Len: %d, want 0", l)
|
||||
}
|
||||
})
|
||||
}
|
||||
162
internal/state/store.go
Normal file
162
internal/state/store.go
Normal file
@@ -0,0 +1,162 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io/fs"
|
||||
"iter"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
||||
"hakurei.app/container/check"
|
||||
"hakurei.app/hst"
|
||||
"hakurei.app/internal/lockedfile"
|
||||
)
|
||||
|
||||
// storeMutexName is the pathname of the file backing [lockedfile.Mutex] of a stateStore and storeHandle.
|
||||
const storeMutexName = "lock"
|
||||
|
||||
// A stateStore keeps track of [hst.State] via a well-known filesystem accessible to all hakurei priv-side processes.
|
||||
// Access to store data and related resources are synchronised on a per-segment basis via storeHandle.
|
||||
type stateStore struct {
|
||||
// Pathname of directory that the store is rooted in.
|
||||
base *check.Absolute
|
||||
|
||||
// All currently known instances of storeHandle, keyed by their identity.
|
||||
handles sync.Map
|
||||
|
||||
// Inter-process mutex to synchronise operations against the entire store.
|
||||
// Held during List and when initialising previously unknown identities during Do.
|
||||
// Must not be accessed directly. Callers should use the bigLock method instead.
|
||||
fileMu *lockedfile.Mutex
|
||||
|
||||
// For creating the base directory.
|
||||
mkdirOnce sync.Once
|
||||
// Stored error value via mkdirOnce.
|
||||
mkdirErr error
|
||||
}
|
||||
|
||||
// bigLock acquires fileMu on stateStore.
|
||||
// A non-nil error returned by bigLock is of type [hst.AppError].
|
||||
func (s *stateStore) bigLock() (unlock func(), err error) {
|
||||
s.mkdirOnce.Do(func() { s.mkdirErr = os.MkdirAll(s.base.String(), 0700) })
|
||||
if s.mkdirErr != nil {
|
||||
return nil, &hst.AppError{Step: "create state store directory", Err: s.mkdirErr}
|
||||
}
|
||||
|
||||
if unlock, err = s.fileMu.Lock(); err != nil {
|
||||
return nil, &hst.AppError{Step: "acquire lock on the state store", Err: err}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// identityHandle loads or initialises a storeHandle for identity.
|
||||
// A non-nil error returned by identityHandle is of type [hst.AppError].
|
||||
func (s *stateStore) identityHandle(identity int) (*storeHandle, error) {
|
||||
h := new(storeHandle)
|
||||
h.mu.Lock()
|
||||
|
||||
if v, ok := s.handles.LoadOrStore(identity, h); ok {
|
||||
h = v.(*storeHandle)
|
||||
} else {
|
||||
// acquire big lock to initialise previously unknown segment handle
|
||||
if unlock, err := s.bigLock(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
defer unlock()
|
||||
}
|
||||
|
||||
h.identity = identity
|
||||
h.path = s.base.Append(strconv.Itoa(identity))
|
||||
h.fileMu = lockedfile.MutexAt(h.path.Append(storeMutexName).String())
|
||||
|
||||
err := os.MkdirAll(h.path.String(), 0700)
|
||||
h.mu.Unlock()
|
||||
if err != nil && !errors.Is(err, fs.ErrExist) {
|
||||
// handle methods will likely return ENOENT
|
||||
s.handles.CompareAndDelete(identity, h)
|
||||
return nil, &hst.AppError{Step: "create store segment directory", Err: err}
|
||||
}
|
||||
}
|
||||
return h, nil
|
||||
}
|
||||
|
||||
// segmentIdentity is produced by the iterator returned by stateStore.segments.
|
||||
type segmentIdentity struct {
|
||||
// Identity of the current segment.
|
||||
identity int
|
||||
// Error encountered while processing this segment.
|
||||
err error
|
||||
}
|
||||
|
||||
// segments returns an iterator over all segmentIdentity known to the store.
|
||||
// To obtain a storeHandle on a segment, caller must then call identityHandle.
|
||||
// A non-nil error returned by segments is of type [hst.AppError].
|
||||
func (s *stateStore) segments() (iter.Seq[segmentIdentity], int, error) {
|
||||
// read directory contents, should only contain storeMutexName and identity
|
||||
var entries []os.DirEntry
|
||||
|
||||
// acquire big lock to read store segment list
|
||||
if unlock, err := s.bigLock(); err != nil {
|
||||
return nil, -1, err
|
||||
} else {
|
||||
entries, err = os.ReadDir(s.base.String())
|
||||
unlock()
|
||||
|
||||
if err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
return nil, -1, &hst.AppError{Step: "read store segments", Err: err}
|
||||
}
|
||||
}
|
||||
|
||||
// expects lock file
|
||||
l := len(entries)
|
||||
if l > 0 {
|
||||
l--
|
||||
}
|
||||
|
||||
return func(yield func(segmentIdentity) bool) {
|
||||
// for error reporting
|
||||
const step = "process store segment"
|
||||
|
||||
for _, ent := range entries {
|
||||
si := segmentIdentity{identity: -1}
|
||||
|
||||
// should only be the big lock
|
||||
if !ent.IsDir() {
|
||||
if ent.Name() == storeMutexName {
|
||||
continue
|
||||
}
|
||||
|
||||
// this should never happen
|
||||
si.err = &hst.AppError{Step: step, Err: syscall.EISDIR,
|
||||
Msg: "skipped non-directory entry " + strconv.Quote(ent.Name())}
|
||||
goto out
|
||||
}
|
||||
|
||||
// failure paths either indicates a serious bug or external interference
|
||||
if v, err := strconv.Atoi(ent.Name()); err != nil {
|
||||
si.err = &hst.AppError{Step: step, Err: err,
|
||||
Msg: "skipped non-identity entry " + strconv.Quote(ent.Name())}
|
||||
goto out
|
||||
} else if v < hst.IdentityMin || v > hst.IdentityMax {
|
||||
si.err = &hst.AppError{Step: step, Err: syscall.ERANGE,
|
||||
Msg: "skipped out of bounds entry " + strconv.Itoa(v)}
|
||||
goto out
|
||||
} else {
|
||||
si.identity = v
|
||||
}
|
||||
|
||||
out:
|
||||
if !yield(si) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}, l, nil
|
||||
}
|
||||
|
||||
// newStore returns the address of a new instance of stateStore.
|
||||
// Multiple instances of stateStore rooted in the same directory is supported, but discouraged.
|
||||
func newStore(base *check.Absolute) *stateStore {
|
||||
return &stateStore{base: base, fileMu: lockedfile.MutexAt(base.Append(storeMutexName).String())}
|
||||
}
|
||||
254
internal/state/store_test.go
Normal file
254
internal/state/store_test.go
Normal file
@@ -0,0 +1,254 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"iter"
|
||||
"os"
|
||||
"reflect"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"testing"
|
||||
|
||||
"hakurei.app/container/check"
|
||||
"hakurei.app/hst"
|
||||
)
|
||||
|
||||
func TestStateStoreBigLock(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
{
|
||||
s := newStore(check.MustAbs(t.TempDir()).Append("state"))
|
||||
for i := 0; i < 2; i++ { // check once behaviour
|
||||
if unlock, err := s.bigLock(); err != nil {
|
||||
t.Fatalf("bigLock: error = %v", err)
|
||||
} else {
|
||||
unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("mkdir", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
wantErr := &hst.AppError{Step: "create state store directory",
|
||||
Err: &os.PathError{Op: "mkdir", Path: "/proc/nonexistent", Err: syscall.ENOENT}}
|
||||
for i := 0; i < 2; i++ { // check once behaviour
|
||||
if _, err := newStore(check.MustAbs("/proc/nonexistent")).bigLock(); !reflect.DeepEqual(err, wantErr) {
|
||||
t.Errorf("bigLock: error = %#v, want %#v", err, wantErr)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("access", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
base := check.MustAbs(t.TempDir()).Append("inaccessible")
|
||||
if err := os.MkdirAll(base.String(), 0); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
wantErr := &hst.AppError{Step: "acquire lock on the state store",
|
||||
Err: &os.PathError{Op: "open", Path: base.Append(storeMutexName).String(), Err: syscall.EACCES}}
|
||||
if _, err := newStore(base).bigLock(); !reflect.DeepEqual(err, wantErr) {
|
||||
t.Errorf("bigLock: error = %#v, want %#v", err, wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestStateStoreIdentityHandle(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("loadstore", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s := newStore(check.MustAbs(t.TempDir()).Append("store"))
|
||||
|
||||
var handleAddr [8]*storeHandle
|
||||
checkHandle := func(identity int, load bool) {
|
||||
if h, err := s.identityHandle(identity); err != nil {
|
||||
t.Fatalf("identityHandle: error = %v", err)
|
||||
} else if load != (handleAddr[identity] != nil) {
|
||||
t.Fatalf("identityHandle: load = %v, want %v", load, handleAddr[identity] != nil)
|
||||
} else if !load {
|
||||
handleAddr[identity] = h
|
||||
|
||||
if h.identity != identity {
|
||||
t.Errorf("identityHandle: identity = %d, want %d", h.identity, identity)
|
||||
}
|
||||
} else if h != handleAddr[identity] {
|
||||
t.Fatalf("identityHandle: %p, want %p", h, handleAddr[identity])
|
||||
}
|
||||
}
|
||||
|
||||
checkHandle(0, false)
|
||||
checkHandle(1, false)
|
||||
checkHandle(2, false)
|
||||
checkHandle(3, false)
|
||||
checkHandle(7, false)
|
||||
checkHandle(7, true)
|
||||
checkHandle(2, true)
|
||||
checkHandle(1, true)
|
||||
checkHandle(2, true)
|
||||
checkHandle(0, true)
|
||||
})
|
||||
|
||||
t.Run("access", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
base := check.MustAbs(t.TempDir()).Append("inaccessible")
|
||||
if err := os.MkdirAll(base.String(), 0); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
wantErr := &hst.AppError{Step: "acquire lock on the state store",
|
||||
Err: &os.PathError{Op: "open", Path: base.Append(storeMutexName).String(), Err: syscall.EACCES}}
|
||||
if _, err := newStore(base).identityHandle(0); !reflect.DeepEqual(err, wantErr) {
|
||||
t.Errorf("identityHandle: error = %#v, want %#v", err, wantErr)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("access segment", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
base := check.MustAbs(t.TempDir()).Append("inaccessible")
|
||||
if err := os.MkdirAll(base.String(), 0700); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
if f, err := os.Create(base.Append(storeMutexName).String()); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
} else if err = f.Close(); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
if err := os.Chmod(base.String(), 0100); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
if err := os.Chmod(base.String(), 0700); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
})
|
||||
|
||||
wantErr := &hst.AppError{Step: "create store segment directory",
|
||||
Err: &os.PathError{Op: "mkdir", Path: base.Append("0").String(), Err: syscall.EACCES}}
|
||||
if _, err := newStore(base).identityHandle(0); !reflect.DeepEqual(err, wantErr) {
|
||||
t.Errorf("identityHandle: error = %#v, want %#v", err, wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestStateStoreSegments(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
ents [2][]string
|
||||
want []segmentIdentity
|
||||
ext func(t *testing.T, segments iter.Seq[segmentIdentity], n int)
|
||||
}{
|
||||
{"errors", [2][]string{{
|
||||
"f0-invalid-file",
|
||||
}, {
|
||||
"f1-invalid-syntax",
|
||||
"9999",
|
||||
"16384",
|
||||
}}, []segmentIdentity{
|
||||
{-1, &hst.AppError{Step: "process store segment", Err: syscall.EISDIR,
|
||||
Msg: `skipped non-directory entry "f0-invalid-file"`}},
|
||||
{-1, &hst.AppError{Step: "process store segment", Err: syscall.ERANGE,
|
||||
Msg: `skipped out of bounds entry 16384`}},
|
||||
{-1, &hst.AppError{Step: "process store segment",
|
||||
Err: &strconv.NumError{Func: "Atoi", Num: "f1-invalid-syntax", Err: strconv.ErrSyntax},
|
||||
Msg: `skipped non-identity entry "f1-invalid-syntax"`}},
|
||||
{9999, nil},
|
||||
}, nil},
|
||||
|
||||
{"success", [2][]string{{
|
||||
"lock",
|
||||
}, {
|
||||
"0",
|
||||
"1",
|
||||
"2",
|
||||
"3",
|
||||
"4",
|
||||
"5",
|
||||
"6",
|
||||
"7",
|
||||
"9",
|
||||
"13",
|
||||
"20",
|
||||
"31",
|
||||
"197",
|
||||
}}, []segmentIdentity{
|
||||
{0, nil},
|
||||
{1, nil},
|
||||
{2, nil},
|
||||
{3, nil},
|
||||
{4, nil},
|
||||
{5, nil},
|
||||
{6, nil},
|
||||
{7, nil},
|
||||
{9, nil},
|
||||
{13, nil},
|
||||
{20, nil},
|
||||
{31, nil},
|
||||
{197, nil},
|
||||
}, func(t *testing.T, segments iter.Seq[segmentIdentity], n int) {
|
||||
if n != 13 {
|
||||
t.Fatalf("segments: n = %d", n)
|
||||
}
|
||||
|
||||
// check partial drain
|
||||
for range segments {
|
||||
break
|
||||
}
|
||||
}},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
base := check.MustAbs(t.TempDir()).Append("store")
|
||||
if err := os.Mkdir(base.String(), 0700); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
createEntries(t, base, tc.ents)
|
||||
|
||||
var got []segmentIdentity
|
||||
if segments, n, err := newStore(base).segments(); err != nil {
|
||||
t.Fatalf("segments: error = %v", err)
|
||||
} else {
|
||||
got = slices.AppendSeq(make([]segmentIdentity, 0, n), segments)
|
||||
if tc.ext != nil {
|
||||
tc.ext(t, segments, n)
|
||||
}
|
||||
}
|
||||
|
||||
slices.SortFunc(got, func(a, b segmentIdentity) int {
|
||||
if a.identity == b.identity {
|
||||
return strings.Compare(a.err.Error(), b.err.Error())
|
||||
}
|
||||
return cmp.Compare(a.identity, b.identity)
|
||||
})
|
||||
if !reflect.DeepEqual(got, tc.want) {
|
||||
t.Errorf("segments: %#v, want %#v", got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("access", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
base := check.MustAbs(t.TempDir()).Append("inaccessible")
|
||||
if err := os.MkdirAll(base.String(), 0); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
wantErr := &hst.AppError{Step: "acquire lock on the state store",
|
||||
Err: &os.PathError{Op: "open", Path: base.Append(storeMutexName).String(), Err: syscall.EACCES}}
|
||||
if _, _, err := newStore(base).segments(); !reflect.DeepEqual(err, wantErr) {
|
||||
t.Errorf("segments: error = %#v, want %#v", err, wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user