Files
hakurei/internal/uevent/uevent.go
Ophestra cd0beeaf8e
All checks were successful
Test / Create distribution (push) Successful in 1m3s
Test / Sandbox (push) Successful in 2m42s
Test / Hakurei (push) Successful in 3m49s
Test / ShareFS (push) Successful in 3m47s
Test / Sandbox (race detector) (push) Successful in 5m12s
Test / Hakurei (race detector) (push) Successful in 6m20s
Test / Flake checks (push) Successful in 1m20s
internal/uevent: optionally pass UUID during coldboot
This enables rejection of non-coldboot synthetic events.

Signed-off-by: Ophestra <cat@gensokyo.uk>
2026-04-06 12:42:47 +09:00

339 lines
8.7 KiB
Go

// Package uevent provides userspace client for consuming events from a
// NETLINK_KOBJECT_UEVENT socket, as well as helpers for supplementing
// events received from the kernel.
package uevent
import (
"context"
"encoding"
"encoding/hex"
"errors"
"fmt"
"strconv"
"sync/atomic"
"syscall"
"unsafe"
"hakurei.app/internal/netlink"
)
// Recoverable is the marker interface implemented by errors that are safe to
// recover from.
type Recoverable interface {
	recoverable()
}

// Nontrivial is the marker interface implemented by errors that prefer a JSON
// encoding.
type Nontrivial interface {
	nontrivial()
}

// NeedsColdboot is the marker interface implemented by errors indicating that
// local state has diverged from the kernel, usually because uevent data was
// lost. Every NeedsColdboot error is also [Recoverable].
type NeedsColdboot interface {
	Recoverable
	coldboot()
}
// Indices into Conn.excl, one per mutually exclusive operation on a Conn.
const (
// exclConsume is the index of the flag held for the duration of a call to
// Conn.Consume.
exclConsume = iota
// _exclLen is the number of indices declared above and sizes Conn.excl.
// It must remain the last entry of this block.
_exclLen
)
// Conn represents a NETLINK_KOBJECT_UEVENT socket.
//
// A Conn is obtained from Dial and released with Close.
type Conn struct {
// conn is the underlying netlink socket.
conn *netlink.Conn
// Whether currently between a call to enterExcl and exitExcl.
// Indexed by the excl* constants; each flag is flipped atomically.
excl [_exclLen]atomic.Bool
}
// enterExcl claims the exclusion flag at index k and must be called when
// entering a critical section that interacts with conn.
//
// It returns syscall.EAGAIN if the flag is already held, and nil once the flag
// has been atomically acquired. A successful call must be paired with exitExcl.
func (c *Conn) enterExcl(k int) error {
	if acquired := c.excl[k].CompareAndSwap(false, true); acquired {
		return nil
	}
	return syscall.EAGAIN
}
// exitExcl releases the exclusion flag at index k and must be called when
// leaving a critical section that interacts with conn.
func (c *Conn) exitExcl(k int) {
	c.excl[k].Store(false)
}
// Close closes the underlying socket and returns the error reported by it,
// if any.
func (c *Conn) Close() error {
	return c.conn.Close()
}
// Dial returns the address of a newly connected [Conn].
//
// The rcvbuf value is handed to netlink.Dial unchanged. On failure the error
// from netlink.Dial is returned as is alongside a nil Conn.
func Dial(rcvbuf int64) (*Conn, error) {
	// the multicast group used by the kernel is hard coded in
	// lib/kobject_uevent.c and is otherwise undocumented
	const kernelGroup = 1

	sock, err := netlink.Dial(syscall.NETLINK_KOBJECT_UEVENT, kernelGroup, rcvbuf)
	if err != nil {
		return nil, err
	}
	return &Conn{conn: sock}, nil
}
// ErrBadSocket is returned by [Conn.Consume] when a message arrives from a
// syscall.Sockaddr whose concrete type is not the expected one.
var ErrBadSocket = errors.New("unexpected socket address")
// ReceiveBufferError reports that one or more [Message] values were lost
// because the socket receive buffer filled up. The usual cause is epoll waking
// the receiving program up too late.
//
// It unwraps to syscall.ENOBUFS and satisfies [NeedsColdboot].
type ReceiveBufferError struct {
	_ [0]*ReceiveBufferError
}

var _ NeedsColdboot = ReceiveBufferError{}

// Error returns the message of syscall.ENOBUFS.
func (ReceiveBufferError) Error() string { return syscall.ENOBUFS.Error() }

// Unwrap returns syscall.ENOBUFS, making the value match it under errors.Is.
func (ReceiveBufferError) Unwrap() error { return syscall.ENOBUFS }

func (ReceiveBufferError) recoverable() {}
func (ReceiveBufferError) coldboot()    {}
// BadPortError is returned by [Conn.Consume] upon receiving a message that did
// not come from the kernel. It carries the netlink address of the unexpected
// sender and satisfies [Recoverable].
type BadPortError syscall.SockaddrNetlink

var _ Recoverable = new(BadPortError)

func (*BadPortError) recoverable() {}

// Error describes the offending sender by its netlink port id.
func (e *BadPortError) Error() string {
	// Pid is an unsigned 32-bit value: formatting it through int would print
	// a negative number on platforms where int is 32 bits wide.
	return "unexpected message from port id " + strconv.FormatUint(uint64(e.Pid), 10) +
		" on NETLINK_KOBJECT_UEVENT"
}
// receiveEvent receives a single event and returns the address of its [Message].
//
// An error matching syscall.ENOBUFS is reported as [ReceiveBufferError]; any
// other receive error is returned as is. A sender address that is not a
// *syscall.SockaddrNetlink yields [ErrBadSocket], and a nonzero sender port id
// yields a [BadPortError]. Errors from decoding the payload are returned
// unchanged.
func (c *Conn) receiveEvent(ctx context.Context) (*Message, error) {
	payload, _, from, err := c.conn.Recvmsg(ctx, 0)
	switch {
	case err == nil:
		// received a datagram, validate its origin below
	case errors.Is(err, syscall.ENOBUFS):
		return nil, ReceiveBufferError{}
	default:
		return nil, err
	}

	// lib/kobject_uevent.c:
	//	set portid 0 to inform userspace message comes from kernel
	sender, isNetlink := from.(*syscall.SockaddrNetlink)
	if !isNetlink {
		return nil, ErrBadSocket
	}
	if sender.Pid != 0 {
		return nil, (*BadPortError)(sender)
	}

	msg := new(Message)
	if err = msg.UnmarshalBinary(payload); err != nil {
		return nil, err
	}
	return msg, nil
}
// UUID represents the value of SYNTH_UUID.
//
// This is not a generic UUID implementation. Do not attempt to use it for
// anything other than passing and interpreting the SYNTH_UUID environment
// variable of a uevent.
type UUID [16]byte
const (
// SizeUUID is the fixed size of string representation of [UUID] according
// to Documentation/ABI/testing/sysfs-uevent.
SizeUUID = 4 + len(UUID{})*2
// UUIDSep is the separator byte of [UUID].
UUIDSep = '-'
)
var (
_ encoding.TextAppender = new(UUID)
_ encoding.TextMarshaler = new(UUID)
_ encoding.TextUnmarshaler = new(UUID)
)
// String formats uuid according to Documentation/ABI/testing/sysfs-uevent.
func (uuid *UUID) String() string {
s := make([]byte, 0, SizeUUID)
s = hex.AppendEncode(s, uuid[:4])
s = append(s, UUIDSep)
s = hex.AppendEncode(s, uuid[4:6])
s = append(s, UUIDSep)
s = hex.AppendEncode(s, uuid[6:8])
s = append(s, UUIDSep)
s = hex.AppendEncode(s, uuid[8:10])
s = append(s, UUIDSep)
s = hex.AppendEncode(s, uuid[10:16])
return unsafe.String(unsafe.SliceData(s), len(s))
}
func (uuid *UUID) AppendText(data []byte) ([]byte, error) {
return append(data, uuid.String()...), nil
}
func (uuid *UUID) MarshalText() ([]byte, error) {
return uuid.AppendText(nil)
}
var (
// ErrAutoUUID is returned parsing a SYNTH_UUID generated by the kernel for
// a synthetic event without a UUID passed in.
ErrAutoUUID = errors.New("UUID is not passed in")
)
// UUIDSizeError describes an incorrectly sized string representation of [UUID].
type UUIDSizeError int
func (e UUIDSizeError) Error() string {
return "got " + strconv.Itoa(int(e)) + " bytes " +
"instead of " + strconv.Itoa(SizeUUID)
}
// UUIDSeparatorError is an invalid separator in a malformed string
// representation of [UUID].
type UUIDSeparatorError byte
func (e UUIDSeparatorError) Error() string {
return fmt.Sprintf("invalid UUID separator: %#U", rune(e))
}
// UnmarshalText parses data according to Documentation/ABI/testing/sysfs-uevent.
func (uuid *UUID) UnmarshalText(data []byte) (err error) {
if len(data) == 1 && data[0] == '0' {
return ErrAutoUUID
}
if len(data) != SizeUUID {
return UUIDSizeError(len(data))
}
if _, err = hex.Decode(uuid[:], data[:8]); err != nil {
return
}
if data[8] != UUIDSep {
return UUIDSeparatorError(data[8])
}
data = data[9:]
if _, err = hex.Decode(uuid[4:], data[:4]); err != nil {
return
}
if data[4] != UUIDSep {
return UUIDSeparatorError(data[4])
}
data = data[5:]
if _, err = hex.Decode(uuid[6:], data[:4]); err != nil {
return
}
if data[4] != UUIDSep {
return UUIDSeparatorError(data[4])
}
data = data[5:]
if _, err = hex.Decode(uuid[8:], data[:4]); err != nil {
return
}
if data[4] != UUIDSep {
return UUIDSeparatorError(data[4])
}
data = data[5:]
_, err = hex.Decode(uuid[10:], data)
return
}
// Consume continuously receives and parses events from the kernel and handles
// [Recoverable] and [NeedsColdboot] errors via caller-supplied functions,
// entering coldboot when required.
//
// Only one call to Consume may be active on a [Conn] at a time; a concurrent
// call returns syscall.EAGAIN immediately.
//
// Successfully received events are sent on events. If coldboot is true, a
// [Coldboot] is performed before regular event processing begins. The sysfs,
// uuid and handleWalkErr arguments are passed to [Coldboot] unchanged.
//
// For each uevent file visited by [Coldboot], handleColdbootVisited is called
// with its pathname. This function must never block.
//
// When consuming events, a non-nil error not satisfying [Recoverable] is
// returned immediately. Otherwise, handleConsumeErr is called with the error
// value. If the error satisfies [NeedsColdboot], a [Coldboot] is arranged
// before event processing resumes. If handleConsumeErr returns false, the error
// value is immediately returned as is.
//
// A non-nil error returned by [Coldboot] is returned as is.
//
// Callers are expected to reject excessively frequent [NeedsColdboot] errors
// in handleConsumeErr to avoid being stuck in a [Coldboot] loop. Event
// processing is allowed to restart without initial coldboot after recovering
// from such a condition, provided the caller adequately reports the degraded,
// diverging state to the user.
//
// Callers must not restart event processing after a non-nil error that does not
// satisfy [Recoverable] is returned.
func (c *Conn) Consume(
	ctx context.Context,
	sysfs string,
	uuid *UUID,
	events chan<- *Message,
	coldboot bool,
	handleColdbootVisited func(string),
	handleConsumeErr func(error) bool,
	handleWalkErr func(error) error,
) error {
	if err := c.enterExcl(exclConsume); err != nil {
		return err
	}
	defer c.exitExcl(exclConsume)

	// filterErr reports whether err must be returned to the caller, and
	// records a pending coldboot for errors satisfying NeedsColdboot.
	filterErr := func(err error) (error, bool) {
		if _, ok := err.(Recoverable); !ok {
			return err, true
		}

		// avoids dropping pending coldboot
		if _, ok := err.(NeedsColdboot); ok {
			coldboot = true
		}
		return err, !handleConsumeErr(err)
	}

retry:
	if coldboot {
		goto coldboot
	}

	for {
		msg, err := c.receiveEvent(ctx)
		if err == nil {
			events <- msg
			continue
		}

		if _, ok := filterErr(err); ok {
			return err
		}

		// filterErr raises coldboot for a NeedsColdboot error the caller
		// chose to recover from: leave the receive loop so the coldboot
		// actually runs before event processing resumes
		if coldboot {
			goto coldboot
		}
	}

coldboot:
	coldboot = false
	visited := make(chan string)
	ctxColdboot, cancelColdboot := context.WithCancel(ctx)
	var coldbootErr error
	go func() {
		// the write to coldbootErr is ordered before the read below by the
		// close of visited, which terminates the range loop
		coldbootErr = Coldboot(ctxColdboot, sysfs, uuid, visited, handleWalkErr)
		close(visited)
	}()

	for pathname := range visited {
		handleColdbootVisited(pathname)

		// forward events already queued on the socket between visits
		for {
			// NOTE(review): a nil context presumably makes the receive
			// non-blocking, since EWOULDBLOCK is treated below as the queue
			// being empty; confirm against netlink.Conn.Recvmsg
			msg, err := c.receiveEvent(nil)
			if err == nil {
				events <- msg
				continue
			}
			if errors.Is(err, syscall.EWOULDBLOCK) {
				break
			}

			if filteredErr, ok := filterErr(err); ok {
				// NOTE(review): the Coldboot goroutine is not waited for
				// here; presumably it returns once ctxColdboot is cancelled
				cancelColdboot()
				return filteredErr
			}
		}
	}
	cancelColdboot()
	if coldbootErr != nil {
		return coldbootErr
	}

	// a NeedsColdboot error recovered during this coldboot is picked up here
	goto retry
}