diff --git a/internal/uevent/uevent.go b/internal/uevent/uevent.go index 6c041775..2c37172f 100644 --- a/internal/uevent/uevent.go +++ b/internal/uevent/uevent.go @@ -121,22 +121,102 @@ func (c *Conn) receiveEvent(ctx context.Context) (*Message, error) { return &msg, err } -// Consume continuously receives and parses events from the kernel. It returns -// the first error it encounters. +// Consume continuously receives and parses events from the kernel and handles +// [Recoverable] and [NeedsColdboot] errors via caller-supplied functions, +// entering coldboot when required. +// +// For each uevent file visited by [Coldboot], handleColdbootVisited is called +// with its pathname. This function must never block. +// +// When consuming events, a non-nil error not satisfying [Recoverable] is +// returned immediately. Otherwise, handleConsumeErr is called with the error +// value. If the error satisfies [NeedsColdboot], a [Coldboot] is arranged +// before event processing resumes. If handleConsumeErr returns false, the error +// value is immediately returned as is. +// +// Callers are expected to reject excessively frequent [NeedsColdboot] errors +// in handleConsumeErr to avoid being stuck in a [Coldboot] loop. Event +// processing is allowed to restart without initial coldboot after recovering +// from such a condition, provided the caller adequately reports the degraded, +// diverging state to the user. // // Callers must not restart event processing after a non-nil error that does not // satisfy [Recoverable] is returned. -func (c *Conn) Consume(ctx context.Context, events chan<- *Message) error { +func (c *Conn) Consume( + ctx context.Context, + sysfs string, + events chan<- *Message, + coldboot bool, + + handleColdbootVisited func(string), + handleConsumeErr func(error) bool, + handleWalkErr func(error) error, +) error { if err := c.enterExcl(exclConsume); err != nil { return err } defer c.exitExcl(exclConsume) + filterErr := func(err error) (error, bool) { + if _, ok := err.(Recoverable); !ok { + return err, true + } + + // avoids dropping pending coldboot + if _, ok := err.(NeedsColdboot); ok { + coldboot = true + } + + return err, !handleConsumeErr(err) + } + +retry: + if coldboot { + goto coldboot + } for { msg, err := c.receiveEvent(ctx) - if err != nil { + if err == nil { + events <- msg + continue + } + + if _, ok := filterErr(err); ok { return err } - events <- msg } + +coldboot: + coldboot = false + + visited := make(chan string) + ctxColdboot, cancelColdboot := context.WithCancel(ctx) + var coldbootErr error + go func() { + coldbootErr = Coldboot(ctxColdboot, sysfs, visited, handleWalkErr) + close(visited) + }() + for pathname := range visited { + handleColdbootVisited(pathname) + + for { + msg, err := c.receiveEvent(nil) + if err == nil { + events <- msg + continue + } + if errors.Is(err, syscall.EWOULDBLOCK) { + break + } + if filteredErr, ok := filterErr(err); ok { + cancelColdboot() + return filteredErr + } + } + } + cancelColdboot() + if coldbootErr != nil { + return coldbootErr + } + goto retry } diff --git a/internal/uevent/uevent_test.go b/internal/uevent/uevent_test.go index 9b986ff3..6420eea1 100644 --- a/internal/uevent/uevent_test.go +++ b/internal/uevent/uevent_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "hakurei.app/fhs" "hakurei.app/internal/uevent" ) @@ -155,13 +156,23 @@ func TestDialConsume(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) defer cancel() + consume := func(c *uevent.Conn, ctx context.Context) error { + return c.Consume(ctx, fhs.Sys, events, false, func(path string) { + t.Log("coldboot visited", path) + }, func(err error) bool { + t.Log(err) + _, ok := err.(uevent.NeedsColdboot) + return !ok + }, nil) + } + wg.Go(func() { - if err = c.Consume(ctx, events); err != context.Canceled { + if err = consume(c, ctx); err != context.Canceled { panic(err) } }) wg.Go(func() { - if err0 = c0.Consume(ctx, events); err0 != context.Canceled { + if err0 = consume(c0, ctx); err0 != context.Canceled { panic(err0) } }) @@ -185,11 +196,11 @@ func TestDialConsume(t *testing.T) { exclExit := make(chan struct{}) wg.Go(func() { defer func() { exclExit <- struct{}{} }() - errs[0] = c.Consume(ctx, events) + errs[0] = consume(c, ctx) }) wg.Go(func() { defer func() { exclExit <- struct{}{} }() - errs[1] = c.Consume(ctx, events) + errs[1] = consume(c, ctx) }) <-exclExit cancel()