state: store config in separate gob stream
All checks were successful
Build / Create distribution (push) Successful in 1m37s
Test / Run NixOS test (push) Successful in 3m38s

This enables early serialisation of config.

Signed-off-by: Ophestra <cat@gensokyo.uk>
This commit is contained in:
2025-01-21 12:10:58 +09:00
parent fa0616b274
commit dfcdc5ce20
7 changed files with 135 additions and 55 deletions

View File

@@ -1,9 +1,11 @@
package state
import (
"encoding/binary"
"encoding/gob"
"errors"
"fmt"
"io"
"io/fs"
"os"
"path"
@@ -208,12 +210,11 @@ func (b *multiBackend) load(decode bool) (Entries, error) {
s := new(State)
r[*id] = s
// append regardless, but only parse if required, used to implement Len
// append regardless, but only parse if required, implements Len
if decode {
if err = gob.NewDecoder(f).Decode(s); err != nil {
if err = b.decodeState(f, s); err != nil {
return err
}
if s.ID != *id {
return fmt.Errorf("state entry %s has unexpected id %s", id, &s.ID)
}
@@ -229,18 +230,65 @@ func (b *multiBackend) load(decode bool) (Entries, error) {
return r, nil
}
// state file consists of an eight byte header, followed by concatenated gobs
// of [fst.Config] and [State], if [State.Config] is not nil or offset < 0,
// the first gob is skipped
func (b *multiBackend) decodeState(r io.ReadSeeker, state *State) error {
offset := make([]byte, 8)
if l, err := r.Read(offset); err != nil {
if errors.Is(err, io.EOF) {
return fmt.Errorf("state file too short: %d bytes", l)
}
return err
}
// decode volatile state first
var skipConfig bool
{
o := int64(binary.LittleEndian.Uint64(offset))
skipConfig = o < 0
if !skipConfig {
if l, err := r.Seek(o, io.SeekCurrent); err != nil {
return err
} else if l != 8+o {
return fmt.Errorf("invalid seek offset %d", l)
}
}
}
if err := gob.NewDecoder(r).Decode(state); err != nil {
return err
}
// decode sealed config
if state.Config == nil {
// config must be provided either as part of volatile state,
// or in the config segment
if skipConfig {
return ErrNoConfig
}
state.Config = new(fst.Config)
if _, err := r.Seek(8, io.SeekStart); err != nil {
return err
}
return gob.NewDecoder(r).Decode(state.Config)
} else {
return nil
}
}
// Save writes process state to filesystem
func (b *multiBackend) Save(state *State) error {
func (b *multiBackend) Save(state *State, configWriter io.WriterTo) error {
b.lock.Lock()
defer b.lock.Unlock()
if state.Config == nil {
return errors.New("state does not contain config")
if configWriter == nil && state.Config == nil {
return ErrNoConfig
}
statePath := b.filename(&state.ID)
// create and open state data file
if f, err := os.OpenFile(statePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600); err != nil {
return err
} else {
@@ -250,11 +298,43 @@ func (b *multiBackend) Save(state *State) error {
panic("state file closed prematurely")
}
}()
// encode into state file
return gob.NewEncoder(f).Encode(state)
return b.encodeState(f, state, configWriter)
}
}
func (b *multiBackend) encodeState(w io.WriteSeeker, state *State, configWriter io.WriterTo) error {
offset := make([]byte, 8)
// skip header bytes
if _, err := w.Seek(8, io.SeekStart); err != nil {
return err
}
if configWriter != nil {
// write config gob and encode header
if l, err := configWriter.WriteTo(w); err != nil {
return err
} else {
binary.LittleEndian.PutUint64(offset, uint64(l))
}
} else {
// offset == -1 indicates absence of config gob
binary.LittleEndian.PutUint64(offset, 0xffffffffffffffff)
}
// encode volatile state
if err := gob.NewEncoder(w).Encode(state); err != nil {
return err
}
// write header
if _, err := w.Seek(0, io.SeekStart); err != nil {
return err
}
_, err := w.Write(offset)
return err
}
func (b *multiBackend) Destroy(id fst.ID) error {
b.lock.Lock()
defer b.lock.Unlock()