internal/uevent: integrate error handling in event loop
All checks were successful
Test / Create distribution (push) Successful in 1m17s
Test / Sandbox (push) Successful in 3m13s
Test / Hakurei (push) Successful in 4m18s
Test / ShareFS (push) Successful in 4m24s
Test / Sandbox (race detector) (push) Successful in 5m35s
Test / Hakurei (race detector) (push) Successful in 6m42s
Test / Flake checks (push) Successful in 1m25s

There are many subtleties when recovering from errors in the event loop, and coldboot requires internals to drain the receive buffer as synthetic uevents are being arranged.

Signed-off-by: Ophestra <cat@gensokyo.uk>
This commit is contained in:
2026-03-30 23:49:16 +09:00
parent a854719b9f
commit 91a2d4d6e1
2 changed files with 100 additions and 9 deletions

View File

@@ -121,22 +121,102 @@ func (c *Conn) receiveEvent(ctx context.Context) (*Message, error) {
return &msg, err return &msg, err
} }
// Consume continuously receives and parses events from the kernel. It returns // Consume continuously receives and parses events from the kernel and handles
// the first error it encounters. // [Recoverable] and [NeedsColdboot] errors via caller-supplied functions,
// entering coldboot when required.
//
// For each uevent file visited by [Coldboot], handleColdbootVisited is called
// with its pathname. This function must never block.
//
// When consuming events, a non-nil error not satisfying [Recoverable] is
// returned immediately. Otherwise, handleConsumeErr is called with the error
// value. If the error satisfies [NeedsColdboot], a [Coldboot] is arranged
// before event processing resumes. If handleConsumeErr returns false, the error
// value is immediately returned as is.
//
// Callers are expected to reject excessively frequent [NeedsColdboot] errors
// in handleConsumeErr to avoid being stuck in a [Coldboot] loop. Event
// processing is allowed to restart without initial coldboot after recovering
// from such a condition, provided the caller adequately reports the degraded,
// diverging state to the user.
// //
// Callers must not restart event processing after a non-nil error that does not // Callers must not restart event processing after a non-nil error that does not
// satisfy [Recoverable] is returned. // satisfy [Recoverable] is returned.
func (c *Conn) Consume(ctx context.Context, events chan<- *Message) error { func (c *Conn) Consume(
ctx context.Context,
sysfs string,
events chan<- *Message,
coldboot bool,
handleColdbootVisited func(string),
handleConsumeErr func(error) bool,
handleWalkErr func(error) error,
) error {
if err := c.enterExcl(exclConsume); err != nil { if err := c.enterExcl(exclConsume); err != nil {
return err return err
} }
defer c.exitExcl(exclConsume) defer c.exitExcl(exclConsume)
filterErr := func(err error) (error, bool) {
if _, ok := err.(Recoverable); !ok {
return err, true
}
// avoids dropping pending coldboot
if _, ok := err.(NeedsColdboot); ok {
coldboot = true
}
return err, !handleConsumeErr(err)
}
retry:
if coldboot {
goto coldboot
}
for { for {
msg, err := c.receiveEvent(ctx) msg, err := c.receiveEvent(ctx)
if err != nil { if err == nil {
events <- msg
continue
}
if _, ok := filterErr(err); ok {
return err return err
} }
events <- msg
} }
coldboot:
coldboot = false
visited := make(chan string)
ctxColdboot, cancelColdboot := context.WithCancel(ctx)
var coldbootErr error
go func() {
coldbootErr = Coldboot(ctxColdboot, sysfs, visited, handleWalkErr)
close(visited)
}()
for pathname := range visited {
handleColdbootVisited(pathname)
for {
msg, err := c.receiveEvent(nil)
if err == nil {
events <- msg
continue
}
if errors.Is(err, syscall.EWOULDBLOCK) {
break
}
if filteredErr, ok := filterErr(err); ok {
cancelColdboot()
return filteredErr
}
}
}
cancelColdboot()
if coldbootErr != nil {
return coldbootErr
}
goto retry
} }

View File

@@ -10,6 +10,7 @@ import (
"testing" "testing"
"time" "time"
"hakurei.app/fhs"
"hakurei.app/internal/uevent" "hakurei.app/internal/uevent"
) )
@@ -155,13 +156,23 @@ func TestDialConsume(t *testing.T) {
ctx, cancel := context.WithCancel(t.Context()) ctx, cancel := context.WithCancel(t.Context())
defer cancel() defer cancel()
consume := func(c *uevent.Conn, ctx context.Context) error {
return c.Consume(ctx, fhs.Sys, events, false, func(path string) {
t.Log("coldboot visited", path)
}, func(err error) bool {
t.Log(err)
_, ok := err.(uevent.NeedsColdboot)
return !ok
}, nil)
}
wg.Go(func() { wg.Go(func() {
if err = c.Consume(ctx, events); err != context.Canceled { if err = consume(c, ctx); err != context.Canceled {
panic(err) panic(err)
} }
}) })
wg.Go(func() { wg.Go(func() {
if err0 = c0.Consume(ctx, events); err0 != context.Canceled { if err0 = consume(c0, ctx); err0 != context.Canceled {
panic(err0) panic(err0)
} }
}) })
@@ -185,11 +196,11 @@ func TestDialConsume(t *testing.T) {
exclExit := make(chan struct{}) exclExit := make(chan struct{})
wg.Go(func() { wg.Go(func() {
defer func() { exclExit <- struct{}{} }() defer func() { exclExit <- struct{}{} }()
errs[0] = c.Consume(ctx, events) errs[0] = consume(c, ctx)
}) })
wg.Go(func() { wg.Go(func() {
defer func() { exclExit <- struct{}{} }() defer func() { exclExit <- struct{}{} }()
errs[1] = c.Consume(ctx, events) errs[1] = consume(c, ctx)
}) })
<-exclExit <-exclExit
cancel() cancel()