internal/app/state: acquire big lock for toplevel operations
	
		
			
	
		
	
	
		
	
		
			All checks were successful
		
		
	
	
		
			
				
	
				Test / Create distribution (push) Successful in 33s
				
			
		
			
				
	
				Test / Sandbox (push) Successful in 2m20s
				
			
		
			
				
	
				Test / Hakurei (push) Successful in 3m5s
				
			
		
			
				
	
				Test / Hpkg (push) Successful in 3m54s
				
			
		
			
				
	
				Test / Sandbox (race detector) (push) Successful in 4m3s
				
			
		
			
				
	
				Test / Hakurei (race detector) (push) Successful in 4m50s
				
			
		
			
				
	
				Test / Flake checks (push) Successful in 1m24s
				
			
		
		
	
	
				
					
				
			
		
			All checks were successful
		
		
	
	Test / Create distribution (push) Successful in 33s
				
			Test / Sandbox (push) Successful in 2m20s
				
			Test / Hakurei (push) Successful in 3m5s
				
			Test / Hpkg (push) Successful in 3m54s
				
			Test / Sandbox (race detector) (push) Successful in 4m3s
				
			Test / Hakurei (race detector) (push) Successful in 4m50s
				
			Test / Flake checks (push) Successful in 1m24s
				
			This avoids getting into an inconsistent state for simultaneous calls to List and Do on a previously unknown identity. Signed-off-by: Ophestra <cat@gensokyo.uk>
This commit is contained in:
		
							parent
							
								
									546b00429f
								
							
						
					
					
						commit
						dacd9550e0
					
				| @ -14,73 +14,116 @@ import ( | |||||||
| 	"hakurei.app/message" | 	"hakurei.app/message" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| // multiLockFileName is the name of the file backing [lockedfile.Mutex] of a multiBackend. | // multiLockFileName is the name of the file backing [lockedfile.Mutex] of a multiStore and multiBackend. | ||||||
| const multiLockFileName = "lock" | const multiLockFileName = "lock" | ||||||
| 
 | 
 | ||||||
| // fine-grained locking and access | // fine-grained locking and access | ||||||
| type multiStore struct { | type multiStore struct { | ||||||
| 	base string | 	base string | ||||||
| 
 | 
 | ||||||
| 	// initialised backends | 	// All currently known instances of multiHandle, keyed by their identity. | ||||||
| 	backends *sync.Map | 	handles sync.Map | ||||||
|  | 	// Held during List and when initialising previously unknown identities during Do. | ||||||
|  | 	// Must not be accessed directly. Callers should use the bigLock method instead. | ||||||
|  | 	fileMu *lockedfile.Mutex | ||||||
|  | 
 | ||||||
|  | 	// For creating the base directory. | ||||||
|  | 	mkdirOnce sync.Once | ||||||
|  | 	// Stored error value via mkdirOnce. | ||||||
|  | 	mkdirErr error | ||||||
| 
 | 
 | ||||||
| 	msg message.Msg | 	msg message.Msg | ||||||
| 	mu  sync.RWMutex | 	mu  sync.RWMutex | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (s *multiStore) Do(identity int, f func(c Cursor)) (bool, error) { | // bigLock acquires fileMu on multiStore. | ||||||
| 	s.mu.RLock() | // Must be called while holding a read lock on multiStore. | ||||||
| 	defer s.mu.RUnlock() | func (s *multiStore) bigLock() (unlock func(), err error) { | ||||||
| 
 | 	s.mkdirOnce.Do(func() { s.mkdirErr = os.MkdirAll(s.base, 0700) }) | ||||||
| 	// load or initialise new backend | 	if s.mkdirErr != nil { | ||||||
| 	b := new(multiBackend) | 		return nil, &hst.AppError{Step: "create state store directory", Err: s.mkdirErr} | ||||||
| 	b.mu.Lock() |  | ||||||
| 	if v, ok := s.backends.LoadOrStore(identity, b); ok { |  | ||||||
| 		b = v.(*multiBackend) |  | ||||||
| 	} else { |  | ||||||
| 		b.path = path.Join(s.base, strconv.Itoa(identity)) |  | ||||||
| 
 |  | ||||||
| 		// ensure directory |  | ||||||
| 		if err := os.MkdirAll(b.path, 0700); err != nil && !errors.Is(err, fs.ErrExist) { |  | ||||||
| 			s.backends.CompareAndDelete(identity, b) |  | ||||||
| 			return false, &hst.AppError{Step: "create store segment directory", Err: err} |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		// set up file-based mutex |  | ||||||
| 		b.lockfile = lockedfile.MutexAt(path.Join(b.path, multiLockFileName)) |  | ||||||
| 
 |  | ||||||
| 		b.mu.Unlock() |  | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// lock backend | 	if unlock, err = s.fileMu.Lock(); err != nil { | ||||||
| 	if unlock, err := b.lockfile.Lock(); err != nil { | 		return nil, &hst.AppError{Step: "acquire lock on the state store", Err: err} | ||||||
| 		return false, &hst.AppError{Step: "lock store segment", Err: err} | 	} | ||||||
|  | 	return | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // identityHandle loads or initialises a multiHandle for identity. | ||||||
|  | // Must be called while holding a read lock on multiStore. | ||||||
|  | func (s *multiStore) identityHandle(identity int) (*multiHandle, error) { | ||||||
|  | 	b := new(multiHandle) | ||||||
|  | 	b.mu.Lock() | ||||||
|  | 
 | ||||||
|  | 	if v, ok := s.handles.LoadOrStore(identity, b); ok { | ||||||
|  | 		b = v.(*multiHandle) | ||||||
|  | 	} else { | ||||||
|  | 		// acquire big lock to initialise previously unknown segment handle | ||||||
|  | 		if unlock, err := s.bigLock(); err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} else { | ||||||
|  | 			defer unlock() | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		b.path = path.Join(s.base, strconv.Itoa(identity)) | ||||||
|  | 		b.fileMu = lockedfile.MutexAt(path.Join(b.path, multiLockFileName)) | ||||||
|  | 
 | ||||||
|  | 		if err := os.MkdirAll(b.path, 0700); err != nil && !errors.Is(err, fs.ErrExist) { | ||||||
|  | 			s.handles.CompareAndDelete(identity, b) | ||||||
|  | 			return nil, &hst.AppError{Step: "create store segment directory", Err: err} | ||||||
|  | 		} | ||||||
|  | 		b.mu.Unlock() | ||||||
|  | 	} | ||||||
|  | 	return b, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // do implements multiStore.Do on multiHandle. | ||||||
|  | func (h *multiHandle) do(identity int, f func(c Cursor)) (bool, error) { | ||||||
|  | 	if unlock, err := h.fileMu.Lock(); err != nil { | ||||||
|  | 		return false, &hst.AppError{Step: "acquire lock on store segment " + strconv.Itoa(identity), Err: err} | ||||||
| 	} else { | 	} else { | ||||||
| 		// unlock backend after Do is complete | 		// unlock backend after Do is complete | ||||||
| 		defer unlock() | 		defer unlock() | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// expose backend methods without exporting the pointer | 	// expose backend methods without exporting the pointer | ||||||
| 	c := new(struct{ *multiBackend }) | 	c := &struct{ *multiHandle }{h} | ||||||
| 	c.multiBackend = b |  | ||||||
| 	f(c) | 	f(c) | ||||||
| 	// disable access to the backend on a best-effort basis | 	// disable access to the backend on a best-effort basis | ||||||
| 	c.multiBackend = nil | 	c.multiHandle = nil | ||||||
| 
 |  | ||||||
| 	return true, nil | 	return true, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (s *multiStore) Do(identity int, f func(c Cursor)) (bool, error) { | ||||||
|  | 	s.mu.RLock() | ||||||
|  | 	defer s.mu.RUnlock() | ||||||
|  | 
 | ||||||
|  | 	if h, err := s.identityHandle(identity); err != nil { | ||||||
|  | 		return false, err | ||||||
|  | 	} else { | ||||||
|  | 		return h.do(identity, f) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func (s *multiStore) List() ([]int, error) { | func (s *multiStore) List() ([]int, error) { | ||||||
| 	var entries []os.DirEntry | 	var entries []os.DirEntry | ||||||
| 
 | 
 | ||||||
| 	// read base directory to get all identities | 	// acquire big lock to read store segment list | ||||||
| 	if v, err := os.ReadDir(s.base); err != nil && !errors.Is(err, os.ErrNotExist) { | 	s.mu.RLock() | ||||||
| 		return nil, &hst.AppError{Step: "read store directory", Err: err} | 	if unlock, err := s.bigLock(); err != nil { | ||||||
|  | 		return nil, err | ||||||
| 	} else { | 	} else { | ||||||
| 		entries = v | 		entries, err = os.ReadDir(s.base) | ||||||
|  | 		s.mu.RUnlock() | ||||||
|  | 		unlock() | ||||||
|  | 
 | ||||||
|  | 		if err != nil && !errors.Is(err, os.ErrNotExist) { | ||||||
|  | 			return nil, &hst.AppError{Step: "read store directory", Err: err} | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	aidsBuf := make([]int, 0, len(entries)) | 	identities := make([]int, 0, len(entries)) | ||||||
| 	for _, e := range entries { | 	for _, e := range entries { | ||||||
| 		// skip non-directories | 		// skip non-directories | ||||||
| 		if !e.IsDir() { | 		if !e.IsDir() { | ||||||
| @ -88,9 +131,14 @@ func (s *multiStore) List() ([]int, error) { | |||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
|  | 		// skip lock file | ||||||
|  | 		if e.Name() == multiLockFileName { | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
| 		// skip non-numerical names | 		// skip non-numerical names | ||||||
| 		if v, err := strconv.Atoi(e.Name()); err != nil { | 		if v, err := strconv.Atoi(e.Name()); err != nil { | ||||||
| 			s.msg.Verbosef("skipped non-aid entry %q", e.Name()) | 			s.msg.Verbosef("skipped non-identity entry %q", e.Name()) | ||||||
| 			continue | 			continue | ||||||
| 		} else { | 		} else { | ||||||
| 			if v < hst.IdentityMin || v > hst.IdentityMax { | 			if v < hst.IdentityMin || v > hst.IdentityMax { | ||||||
| @ -98,33 +146,34 @@ func (s *multiStore) List() ([]int, error) { | |||||||
| 				continue | 				continue | ||||||
| 			} | 			} | ||||||
| 
 | 
 | ||||||
| 			aidsBuf = append(aidsBuf, v) | 			identities = append(identities, v) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return append([]int(nil), aidsBuf...), nil | 	return identities, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type multiBackend struct { | // multiHandle is a handle on a multiStore segment. | ||||||
|  | type multiHandle struct { | ||||||
| 	path string | 	path string | ||||||
| 
 | 
 | ||||||
| 	// created/opened by prepare | 	// created by prepare | ||||||
| 	lockfile *lockedfile.Mutex | 	fileMu *lockedfile.Mutex | ||||||
| 
 | 
 | ||||||
| 	mu sync.RWMutex | 	mu sync.RWMutex | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (b *multiBackend) filename(id *hst.ID) string { return path.Join(b.path, id.String()) } | func (h *multiHandle) filename(id *hst.ID) string { return path.Join(h.path, id.String()) } | ||||||
| 
 | 
 | ||||||
| // reads all launchers in multiBackend | // load iterates over all [hst.State] entries reachable via multiHandle, | ||||||
| // file contents are ignored if decode is false | // decoding their contents if decode is true. | ||||||
| func (b *multiBackend) load(decode bool) (map[hst.ID]*hst.State, error) { | func (h *multiHandle) load(decode bool) (map[hst.ID]*hst.State, error) { | ||||||
| 	b.mu.RLock() | 	h.mu.RLock() | ||||||
| 	defer b.mu.RUnlock() | 	defer h.mu.RUnlock() | ||||||
| 
 | 
 | ||||||
| 	// read directory contents, should only contain files named after ids | 	// read directory contents, should only contain files named after ids | ||||||
| 	var entries []os.DirEntry | 	var entries []os.DirEntry | ||||||
| 	if pl, err := os.ReadDir(b.path); err != nil { | 	if pl, err := os.ReadDir(h.path); err != nil { | ||||||
| 		return nil, &hst.AppError{Step: "read store segment directory", Err: err} | 		return nil, &hst.AppError{Step: "read store segment directory", Err: err} | ||||||
| 	} else { | 	} else { | ||||||
| 		entries = pl | 		entries = pl | ||||||
| @ -152,7 +201,7 @@ func (b *multiBackend) load(decode bool) (map[hst.ID]*hst.State, error) { | |||||||
| 		// run in a function to better handle file closing | 		// run in a function to better handle file closing | ||||||
| 		if err := func() error { | 		if err := func() error { | ||||||
| 			// open state file for reading | 			// open state file for reading | ||||||
| 			if f, err := os.Open(path.Join(b.path, e.Name())); err != nil { | 			if f, err := os.Open(path.Join(h.path, e.Name())); err != nil { | ||||||
| 				return &hst.AppError{Step: "open state file", Err: err} | 				return &hst.AppError{Step: "open state file", Err: err} | ||||||
| 			} else { | 			} else { | ||||||
| 				var s hst.State | 				var s hst.State | ||||||
| @ -183,15 +232,15 @@ func (b *multiBackend) load(decode bool) (map[hst.ID]*hst.State, error) { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Save writes process state to filesystem. | // Save writes process state to filesystem. | ||||||
| func (b *multiBackend) Save(state *hst.State) error { | func (h *multiHandle) Save(state *hst.State) error { | ||||||
| 	b.mu.Lock() | 	h.mu.Lock() | ||||||
| 	defer b.mu.Unlock() | 	defer h.mu.Unlock() | ||||||
| 
 | 
 | ||||||
| 	if err := state.Config.Validate(); err != nil { | 	if err := state.Config.Validate(); err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	statePath := b.filename(&state.ID) | 	statePath := h.filename(&state.ID) | ||||||
| 	if f, err := os.OpenFile(statePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600); err != nil { | 	if f, err := os.OpenFile(statePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600); err != nil { | ||||||
| 		return &hst.AppError{Step: "create state file", Err: err} | 		return &hst.AppError{Step: "create state file", Err: err} | ||||||
| 	} else if err = entryEncode(f, state); err != nil { | 	} else if err = entryEncode(f, state); err != nil { | ||||||
| @ -203,21 +252,21 @@ func (b *multiBackend) Save(state *hst.State) error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (b *multiBackend) Destroy(id hst.ID) error { | func (h *multiHandle) Destroy(id hst.ID) error { | ||||||
| 	b.mu.Lock() | 	h.mu.Lock() | ||||||
| 	defer b.mu.Unlock() | 	defer h.mu.Unlock() | ||||||
| 
 | 
 | ||||||
| 	if err := os.Remove(b.filename(&id)); err != nil { | 	if err := os.Remove(h.filename(&id)); err != nil { | ||||||
| 		return &hst.AppError{Step: "destroy state entry", Err: err} | 		return &hst.AppError{Step: "destroy state entry", Err: err} | ||||||
| 	} | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (b *multiBackend) Load() (map[hst.ID]*hst.State, error) { return b.load(true) } | func (h *multiHandle) Load() (map[hst.ID]*hst.State, error) { return h.load(true) } | ||||||
| 
 | 
 | ||||||
| func (b *multiBackend) Len() (int, error) { | func (h *multiHandle) Len() (int, error) { | ||||||
| 	// rn consists of only nil entries but has the correct length | 	// rn consists of only nil entries but has the correct length | ||||||
| 	rn, err := b.load(false) | 	rn, err := h.load(false) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return -1, &hst.AppError{Step: "count state entries", Err: err} | 		return -1, &hst.AppError{Step: "count state entries", Err: err} | ||||||
| 	} | 	} | ||||||
| @ -226,9 +275,7 @@ func (b *multiBackend) Len() (int, error) { | |||||||
| 
 | 
 | ||||||
| // NewMulti returns an instance of the multi-file store. | // NewMulti returns an instance of the multi-file store. | ||||||
| func NewMulti(msg message.Msg, runDir string) Store { | func NewMulti(msg message.Msg, runDir string) Store { | ||||||
| 	return &multiStore{ | 	store := &multiStore{msg: msg, base: path.Join(runDir, "state")} | ||||||
| 		msg:      msg, | 	store.fileMu = lockedfile.MutexAt(path.Join(store.base, multiLockFileName)) | ||||||
| 		base:     path.Join(runDir, "state"), | 	return store | ||||||
| 		backends: new(sync.Map), |  | ||||||
| 	} |  | ||||||
| } | } | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user