package main import ( "context" "log" "time" "hakurei.app/fhs" "hakurei.app/internal/report" "hakurei.app/internal/uevent" ) // newRejectColdboot returns a function to be called on every subsequent pending // coldboot, and returns whether coldboot should proceed. Rejection is sticky. func newRejectColdboot() func() bool { // one coldboot per five minutes, two consecutive coldboot const ( coldbootInterval = 5 * time.Minute coldbootBurst = 2 ) done := make(chan struct{}) s := make(chan struct{}, coldbootBurst) s <- struct{}{} // for early fault before reporting is ready go func() { t := time.NewTicker(coldbootInterval) for { select { case <-done: return case <-t.C: select { case s <- struct{}{}: default: } } } }() return func() bool { select { case <-s: return true case <-done: return false default: close(done) return false } } } // consume continuously consumes events from conn with retries. func consume( ctx context.Context, r *report.Reporter, conn *uevent.Conn, uuid uevent.UUID, events chan<- *uevent.Message, ) { defer close(events) nextColdboot := newRejectColdboot() coldboot := true retry: if dispatchErr := conn.Consume(ctx, fhs.Sys, &uuid, events, coldboot, func(path string) { log.Println("coldboot visited", path) }, func(err error) bool { if _, ok := err.(uevent.NeedsColdboot); ok && !nextColdboot() { r.Dispatch( report.Degraded, "rejecting coldboot loop", err, ) return false } r.Dispatch( report.Inconsistent, "consumed invalid message", err, ) return true }, nil); dispatchErr != nil { if _, ok := dispatchErr.(uevent.Recoverable); !ok { r.Dispatch( report.Fatal, "discontinuing uevent processing due to nonrecoverable error", dispatchErr, ) return } if _, ok := dispatchErr.(uevent.NeedsColdboot); ok { // coldboot loop rejected by handler coldboot = false } goto retry } }