internal/pkg: move concurrent cure implementation
Test / Create distribution (push) Successful in 57s
Test / Sandbox (push) Successful in 2m58s
Test / ShareFS (push) Successful in 3m59s
Test / Hakurei (push) Successful in 4m52s
Test / Sandbox (race detector) (push) Successful in 5m30s
Test / Hakurei (race detector) (push) Successful in 6m40s
Test / Flake checks (push) Successful in 1m13s
Test / Create distribution (push) Successful in 57s
Test / Sandbox (push) Successful in 2m58s
Test / ShareFS (push) Successful in 3m59s
Test / Hakurei (push) Successful in 4m52s
Test / Sandbox (race detector) (push) Successful in 5m30s
Test / Hakurei (race detector) (push) Successful in 6m40s
Test / Flake checks (push) Successful in 1m13s
This is useful independent of cure and might replace the Collect hack. Signed-off-by: Ophestra <cat@gensokyo.uk>
This commit is contained in:
+23
-17
@@ -1922,6 +1922,28 @@ func (c *Cache) tryExtern(
|
|||||||
return checksum, err
|
return checksum, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cureMany concurrently collects outcome of multiple [Artifact].
|
||||||
|
func (c *Cache) cureMany(inputs []Artifact, r map[Artifact]cureRes) error {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(len(inputs))
|
||||||
|
res := make([]cureRes, len(inputs))
|
||||||
|
errs := make(DependencyCureError, 0, len(inputs))
|
||||||
|
var errsMu sync.Mutex
|
||||||
|
for i, d := range inputs {
|
||||||
|
pending := pendingArtifactDep{d, &res[i], &errs, &errsMu, &wg}
|
||||||
|
go pending.cure(c)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
if len(errs) > 0 {
|
||||||
|
return &errs
|
||||||
|
}
|
||||||
|
for i, p := range res {
|
||||||
|
r[inputs[i]] = p
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// cure implements Cure without acquiring a read lock on abortMu. cure must not
|
// cure implements Cure without acquiring a read lock on abortMu. cure must not
|
||||||
// be entered during Abort.
|
// be entered during Abort.
|
||||||
func (c *Cache) cure(a Artifact, curesExempt bool) (
|
func (c *Cache) cure(a Artifact, curesExempt bool) (
|
||||||
@@ -2182,25 +2204,9 @@ func (c *Cache) cure(a Artifact, curesExempt bool) (
|
|||||||
case FloodArtifact:
|
case FloodArtifact:
|
||||||
inputs := a.Inputs()
|
inputs := a.Inputs()
|
||||||
f := FContext{t, make(map[Artifact]cureRes, len(inputs))}
|
f := FContext{t, make(map[Artifact]cureRes, len(inputs))}
|
||||||
|
if err = c.cureMany(inputs, f.inputs); err != nil {
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(len(inputs))
|
|
||||||
res := make([]cureRes, len(inputs))
|
|
||||||
errs := make(DependencyCureError, 0, len(inputs))
|
|
||||||
var errsMu sync.Mutex
|
|
||||||
for i, d := range inputs {
|
|
||||||
pending := pendingArtifactDep{d, &res[i], &errs, &errsMu, &wg}
|
|
||||||
go pending.cure(c)
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
if len(errs) > 0 {
|
|
||||||
err = &errs
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for i, p := range res {
|
|
||||||
f.inputs[inputs[i]] = p
|
|
||||||
}
|
|
||||||
|
|
||||||
sh := sha512.New384()
|
sh := sha512.New384()
|
||||||
err = c.encode(sh, a, f.inputs)
|
err = c.encode(sh, a, f.inputs)
|
||||||
|
|||||||
Reference in New Issue
Block a user