From 91a2d4d6e1c0039f6b442c8c97b1aa809e09e33b Mon Sep 17 00:00:00 2001 From: Ophestra Date: Mon, 30 Mar 2026 23:49:16 +0900 Subject: [PATCH] internal/uevent: integrate error handling in event loop There are many subtleties when recovering from errors in the event loop, and coldboot requires internals to drain the receive buffer as synthetic uevents are being arranged. Signed-off-by: Ophestra --- internal/uevent/uevent.go | 90 ++++++++++++++++++++++++++++++++-- internal/uevent/uevent_test.go | 19 +++++-- 2 files changed, 100 insertions(+), 9 deletions(-) diff --git a/internal/uevent/uevent.go b/internal/uevent/uevent.go index 6c041775..2c37172f 100644 --- a/internal/uevent/uevent.go +++ b/internal/uevent/uevent.go @@ -121,22 +121,102 @@ func (c *Conn) receiveEvent(ctx context.Context) (*Message, error) { return &msg, err } -// Consume continuously receives and parses events from the kernel. It returns -// the first error it encounters. +// Consume continuously receives and parses events from the kernel and handles +// [Recoverable] and [NeedsColdboot] errors via caller-supplied functions, +// entering coldboot when required. +// +// For each uevent file visited by [Coldboot], handleColdbootVisited is called +// with its pathname. This function must never block. +// +// When consuming events, a non-nil error not satisfying [Recoverable] is +// returned immediately. Otherwise, handleConsumeErr is called with the error +// value. If the error satisfies [NeedsColdboot], a [Coldboot] is arranged +// before event processing resumes. If handleConsumeErr returns false, the error +// value is immediately returned as is. +// +// Callers are expected to reject excessively frequent [NeedsColdboot] errors +// in handleConsumeErr to avoid being stuck in a [Coldboot] loop. Event +// processing is allowed to restart without initial coldboot after recovering +// from such a condition, provided the caller adequately reports the degraded, +// diverging state to the user. // // Callers must not restart event processing after a non-nil error that does not // satisfy [Recoverable] is returned. -func (c *Conn) Consume(ctx context.Context, events chan<- *Message) error { +func (c *Conn) Consume( + ctx context.Context, + sysfs string, + events chan<- *Message, + coldboot bool, + + handleColdbootVisited func(string), + handleConsumeErr func(error) bool, + handleWalkErr func(error) error, +) error { if err := c.enterExcl(exclConsume); err != nil { return err } defer c.exitExcl(exclConsume) + filterErr := func(err error) (error, bool) { + if _, ok := err.(Recoverable); !ok { + return err, true + } + + // avoids dropping pending coldboot + if _, ok := err.(NeedsColdboot); ok { + coldboot = true + } + + return err, !handleConsumeErr(err) + } + +retry: + if coldboot { + goto coldboot + } for { msg, err := c.receiveEvent(ctx) - if err != nil { + if err == nil { + events <- msg + continue + } + + if _, ok := filterErr(err); ok { return err } - events <- msg } + +coldboot: + coldboot = false + + visited := make(chan string) + ctxColdboot, cancelColdboot := context.WithCancel(ctx) + var coldbootErr error + go func() { + coldbootErr = Coldboot(ctxColdboot, sysfs, visited, handleWalkErr) + close(visited) + }() + for pathname := range visited { + handleColdbootVisited(pathname) + + for { + msg, err := c.receiveEvent(nil) + if err == nil { + events <- msg + continue + } + if errors.Is(err, syscall.EWOULDBLOCK) { + break + } + if filteredErr, ok := filterErr(err); ok { + cancelColdboot() + return filteredErr + } + } + } + cancelColdboot() + if coldbootErr != nil { + return coldbootErr + } + goto retry } diff --git a/internal/uevent/uevent_test.go b/internal/uevent/uevent_test.go index 9b986ff3..6420eea1 100644 --- a/internal/uevent/uevent_test.go +++ b/internal/uevent/uevent_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "hakurei.app/fhs" "hakurei.app/internal/uevent" ) @@ -155,13 +156,23 @@ func TestDialConsume(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) defer cancel() + consume := func(c *uevent.Conn, ctx context.Context) error { + return c.Consume(ctx, fhs.Sys, events, false, func(path string) { + t.Log("coldboot visited", path) + }, func(err error) bool { + t.Log(err) + _, ok := err.(uevent.NeedsColdboot) + return !ok + }, nil) + } + wg.Go(func() { - if err = c.Consume(ctx, events); err != context.Canceled { + if err = consume(c, ctx); err != context.Canceled { panic(err) } }) wg.Go(func() { - if err0 = c0.Consume(ctx, events); err0 != context.Canceled { + if err0 = consume(c0, ctx); err0 != context.Canceled { panic(err0) } }) @@ -185,11 +196,11 @@ func TestDialConsume(t *testing.T) { exclExit := make(chan struct{}) wg.Go(func() { defer func() { exclExit <- struct{}{} }() - errs[0] = c.Consume(ctx, events) + errs[0] = consume(c, ctx) }) wg.Go(func() { defer func() { exclExit <- struct{}{} }() - errs[1] = c.Consume(ctx, events) + errs[1] = consume(c, ctx) }) <-exclExit cancel()