forked from rosa/hakurei
internal/pkg: flush cached errors on abort
This avoids disabling the artifact until cache is reopened. The same has to be implemented for Cancel in a future change. Signed-off-by: Ophestra <cat@gensokyo.uk>
This commit is contained in:
@@ -219,7 +219,7 @@ func (c *common) Open(a Artifact) (r io.ReadCloser, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var pathname *check.Absolute
|
var pathname *check.Absolute
|
||||||
if pathname, _, err = c.cache.Cure(a); err != nil {
|
if pathname, _, err = c.cache.cure(a, true); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -601,6 +601,8 @@ type Cache struct {
|
|||||||
identPending map[unique.Handle[ID]]*pendingCure
|
identPending map[unique.Handle[ID]]*pendingCure
|
||||||
// Synchronises access to ident and corresponding filesystem entries.
|
// Synchronises access to ident and corresponding filesystem entries.
|
||||||
identMu sync.RWMutex
|
identMu sync.RWMutex
|
||||||
|
// Synchronises entry into Abort and Cure.
|
||||||
|
abortMu sync.RWMutex
|
||||||
|
|
||||||
// Synchronises entry into exclusive artifacts for the cure method.
|
// Synchronises entry into exclusive artifacts for the cure method.
|
||||||
exclMu sync.Mutex
|
exclMu sync.Mutex
|
||||||
@@ -1036,6 +1038,9 @@ func (c *Cache) Scrub(checks int) error {
|
|||||||
// loadOrStoreIdent attempts to load a cached [Artifact] by its identifier or
|
// loadOrStoreIdent attempts to load a cached [Artifact] by its identifier or
|
||||||
// wait for a pending [Artifact] to cure. If neither is possible, the current
|
// wait for a pending [Artifact] to cure. If neither is possible, the current
|
||||||
// identifier is stored in identPending and a non-nil channel is returned.
|
// identifier is stored in identPending and a non-nil channel is returned.
|
||||||
|
//
|
||||||
|
// Since identErr is treated as grow-only, loadOrStoreIdent must not be entered
|
||||||
|
// without holding a read lock on abortMu.
|
||||||
func (c *Cache) loadOrStoreIdent(id unique.Handle[ID]) (
|
func (c *Cache) loadOrStoreIdent(id unique.Handle[ID]) (
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
done chan<- struct{},
|
done chan<- struct{},
|
||||||
@@ -1069,6 +1074,7 @@ func (c *Cache) loadOrStoreIdent(id unique.Handle[ID]) (
|
|||||||
d := make(chan struct{})
|
d := make(chan struct{})
|
||||||
pending = &pendingCure{done: d}
|
pending = &pendingCure{done: d}
|
||||||
ctx, pending.cancel = context.WithCancel(c.toplevel.Load().ctx)
|
ctx, pending.cancel = context.WithCancel(c.toplevel.Load().ctx)
|
||||||
|
c.wg.Add(1)
|
||||||
c.identPending[id] = pending
|
c.identPending[id] = pending
|
||||||
c.identMu.Unlock()
|
c.identMu.Unlock()
|
||||||
done = d
|
done = d
|
||||||
@@ -1091,6 +1097,7 @@ func (c *Cache) finaliseIdent(
|
|||||||
}
|
}
|
||||||
delete(c.identPending, id)
|
delete(c.identPending, id)
|
||||||
c.identMu.Unlock()
|
c.identMu.Unlock()
|
||||||
|
c.wg.Done()
|
||||||
|
|
||||||
close(done)
|
close(done)
|
||||||
}
|
}
|
||||||
@@ -1310,6 +1317,9 @@ func (c *Cache) Cure(a Artifact) (
|
|||||||
checksum unique.Handle[Checksum],
|
checksum unique.Handle[Checksum],
|
||||||
err error,
|
err error,
|
||||||
) {
|
) {
|
||||||
|
c.abortMu.RLock()
|
||||||
|
defer c.abortMu.RUnlock()
|
||||||
|
|
||||||
if err = c.toplevel.Load().ctx.Err(); err != nil {
|
if err = c.toplevel.Load().ctx.Err(); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -1504,7 +1514,8 @@ func (r *RContext) NewMeasuredReader(
|
|||||||
return r.cache.newMeasuredReader(rc, checksum)
|
return r.cache.newMeasuredReader(rc, checksum)
|
||||||
}
|
}
|
||||||
|
|
||||||
// cure implements Cure without checking the full dependency graph.
|
// cure implements Cure without acquiring a read lock on abortMu. cure must not
|
||||||
|
// be entered during Abort.
|
||||||
func (c *Cache) cure(a Artifact, curesExempt bool) (
|
func (c *Cache) cure(a Artifact, curesExempt bool) (
|
||||||
pathname *check.Absolute,
|
pathname *check.Absolute,
|
||||||
checksum unique.Handle[Checksum],
|
checksum unique.Handle[Checksum],
|
||||||
@@ -1871,8 +1882,8 @@ func (c *Cache) OpenStatus(a Artifact) (r io.ReadSeekCloser, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Abort cancels all pending cures but does not close the store. Abort does not
|
// Abort cancels all pending cures and waits for them to clean up, but does not
|
||||||
// wait for cures to complete.
|
// close the cache.
|
||||||
func (c *Cache) Abort() {
|
func (c *Cache) Abort() {
|
||||||
c.closeMu.Lock()
|
c.closeMu.Lock()
|
||||||
defer c.closeMu.Unlock()
|
defer c.closeMu.Unlock()
|
||||||
@@ -1881,7 +1892,16 @@ func (c *Cache) Abort() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.toplevel.Swap(newToplevel(c.parent)).cancel()
|
c.toplevel.Load().cancel()
|
||||||
|
c.abortMu.Lock()
|
||||||
|
defer c.abortMu.Unlock()
|
||||||
|
|
||||||
|
// holding abortMu, identPending stays empty
|
||||||
|
c.wg.Wait()
|
||||||
|
c.identMu.Lock()
|
||||||
|
c.toplevel.Store(newToplevel(c.parent))
|
||||||
|
clear(c.identErr)
|
||||||
|
c.identMu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close cancels all pending cures and waits for them to clean up.
|
// Close cancels all pending cures and waits for them to clean up.
|
||||||
|
|||||||
Reference in New Issue
Block a user