// Package uevent provides userspace client for consuming events from a // NETLINK_KOBJECT_UEVENT socket, as well as helpers for supplementing // events received from the kernel. package uevent import ( "context" "encoding" "encoding/hex" "errors" "fmt" "strconv" "sync/atomic" "syscall" "unsafe" "hakurei.app/internal/netlink" ) type ( // Recoverable is satisfied by errors that are safe to recover from. Recoverable interface{ recoverable() } // Nontrivial is satisfied by errors preferring a JSON encoding. Nontrivial interface{ nontrivial() } // NeedsColdboot is satisfied by errors indicating divergence of local state // from the kernel, usually from lost uevent data. NeedsColdboot interface { Recoverable coldboot() } ) const ( exclConsume = iota _exclLen ) // Conn represents a NETLINK_KOBJECT_UEVENT socket. type Conn struct { conn *netlink.Conn // Whether currently between a call to enterExcl and exitExcl. excl [_exclLen]atomic.Bool } // enterExcl must be called entering a critical section that interacts with conn. func (c *Conn) enterExcl(k int) error { if !c.excl[k].CompareAndSwap(false, true) { return syscall.EAGAIN } return nil } // exitExcl must be called exiting a critical section that interacts with conn. func (c *Conn) exitExcl(k int) { c.excl[k].Store(false) } // Close closes the underlying socket. func (c *Conn) Close() error { return c.conn.Close() } // Dial returns the address of a newly connected [Conn]. func Dial(rcvbuf int64) (*Conn, error) { // kernel group is hard coded in lib/kobject_uevent.c, undocumented c, err := netlink.Dial(syscall.NETLINK_KOBJECT_UEVENT, 1, rcvbuf) if err != nil { return nil, err } return &Conn{conn: c}, err } var ( // ErrBadSocket is returned by [Conn.Consume] for a reply from a // syscall.Sockaddr with unexpected concrete type. ErrBadSocket = errors.New("unexpected socket address") ) // ReceiveBufferError indicates one or more [Message] being lost due to the // socket receive buffer filling up. This is usually caused by epoll waking the // receiving program up too late. type ReceiveBufferError struct{ _ [0]*ReceiveBufferError } var _ NeedsColdboot = ReceiveBufferError{} func (ReceiveBufferError) recoverable() {} func (ReceiveBufferError) coldboot() {} func (ReceiveBufferError) Unwrap() error { return syscall.ENOBUFS } func (e ReceiveBufferError) Error() string { return syscall.ENOBUFS.Error() } // BadPortError is returned by [Conn.Consume] upon receiving a message that did // not come from the kernel. type BadPortError syscall.SockaddrNetlink var _ Recoverable = new(BadPortError) func (*BadPortError) recoverable() {} func (e *BadPortError) Error() string { return "unexpected message from port id " + strconv.Itoa(int(e.Pid)) + " on NETLINK_KOBJECT_UEVENT" } // receiveEvent receives a single event and returns the address of its [Message]. func (c *Conn) receiveEvent(ctx context.Context) (*Message, error) { data, _, from, err := c.conn.Recvmsg(ctx, 0) if err != nil { if errors.Is(err, syscall.ENOBUFS) { return nil, ReceiveBufferError{} } return nil, err } // lib/kobject_uevent.c: // set portid 0 to inform userspace message comes from kernel if v, ok := from.(*syscall.SockaddrNetlink); !ok { return nil, ErrBadSocket } else if v.Pid != 0 { return nil, (*BadPortError)(v) } var msg Message if err = msg.UnmarshalBinary(data); err != nil { return nil, err } return &msg, err } // UUID represents the value of SYNTH_UUID. // // This is not a generic UUID implementation. Do not attempt to use it for // anything other than passing and interpreting the SYNTH_UUID environment // variable of a uevent. type UUID [16]byte const ( // SizeUUID is the fixed size of string representation of [UUID] according // to Documentation/ABI/testing/sysfs-uevent. SizeUUID = 4 + len(UUID{})*2 // UUIDSep is the separator byte of [UUID]. UUIDSep = '-' ) var ( _ encoding.TextAppender = new(UUID) _ encoding.TextMarshaler = new(UUID) _ encoding.TextUnmarshaler = new(UUID) ) // String formats uuid according to Documentation/ABI/testing/sysfs-uevent. func (uuid *UUID) String() string { s := make([]byte, 0, SizeUUID) s = hex.AppendEncode(s, uuid[:4]) s = append(s, UUIDSep) s = hex.AppendEncode(s, uuid[4:6]) s = append(s, UUIDSep) s = hex.AppendEncode(s, uuid[6:8]) s = append(s, UUIDSep) s = hex.AppendEncode(s, uuid[8:10]) s = append(s, UUIDSep) s = hex.AppendEncode(s, uuid[10:16]) return unsafe.String(unsafe.SliceData(s), len(s)) } func (uuid *UUID) AppendText(data []byte) ([]byte, error) { return append(data, uuid.String()...), nil } func (uuid *UUID) MarshalText() ([]byte, error) { return uuid.AppendText(nil) } var ( // ErrAutoUUID is returned parsing a SYNTH_UUID generated by the kernel for // a synthetic event without a UUID passed in. ErrAutoUUID = errors.New("UUID is not passed in") ) // UUIDSizeError describes an incorrectly sized string representation of [UUID]. type UUIDSizeError int func (e UUIDSizeError) Error() string { return "got " + strconv.Itoa(int(e)) + " bytes " + "instead of " + strconv.Itoa(SizeUUID) } // UUIDSeparatorError is an invalid separator in a malformed string // representation of [UUID]. type UUIDSeparatorError byte func (e UUIDSeparatorError) Error() string { return fmt.Sprintf("invalid UUID separator: %#U", rune(e)) } // UnmarshalText parses data according to Documentation/ABI/testing/sysfs-uevent. func (uuid *UUID) UnmarshalText(data []byte) (err error) { if len(data) == 1 && data[0] == '0' { return ErrAutoUUID } if len(data) != SizeUUID { return UUIDSizeError(len(data)) } if _, err = hex.Decode(uuid[:], data[:8]); err != nil { return } if data[8] != UUIDSep { return UUIDSeparatorError(data[8]) } data = data[9:] if _, err = hex.Decode(uuid[4:], data[:4]); err != nil { return } if data[4] != UUIDSep { return UUIDSeparatorError(data[4]) } data = data[5:] if _, err = hex.Decode(uuid[6:], data[:4]); err != nil { return } if data[4] != UUIDSep { return UUIDSeparatorError(data[4]) } data = data[5:] if _, err = hex.Decode(uuid[8:], data[:4]); err != nil { return } if data[4] != UUIDSep { return UUIDSeparatorError(data[4]) } data = data[5:] _, err = hex.Decode(uuid[10:], data) return } // Consume continuously receives and parses events from the kernel and handles // [Recoverable] and [NeedsColdboot] errors via caller-supplied functions, // entering coldboot when required. // // For each uevent file visited by [Coldboot], handleColdbootVisited is called // with its pathname. This function must never block. // // When consuming events, a non-nil error not satisfying [Recoverable] is // returned immediately. Otherwise, handleConsumeErr is called with the error // value. If the error satisfies [NeedsColdboot], a [Coldboot] is arranged // before event processing resumes. If handleConsumeErr returns false, the error // value is immediately returned as is. // // Callers are expected to reject excessively frequent [NeedsColdboot] errors // in handleConsumeErr to avoid being stuck in a [Coldboot] loop. Event // processing is allowed to restart without initial coldboot after recovering // from such a condition, provided the caller adequately reports the degraded, // diverging state to the user. // // Callers must not restart event processing after a non-nil error that does not // satisfy [Recoverable] is returned. func (c *Conn) Consume( ctx context.Context, sysfs string, uuid *UUID, events chan<- *Message, coldboot bool, handleColdbootVisited func(string), handleConsumeErr func(error) bool, handleWalkErr func(error) error, ) error { if err := c.enterExcl(exclConsume); err != nil { return err } defer c.exitExcl(exclConsume) filterErr := func(err error) (error, bool) { if _, ok := err.(Recoverable); !ok { return err, true } // avoids dropping pending coldboot if _, ok := err.(NeedsColdboot); ok { coldboot = true } return err, !handleConsumeErr(err) } retry: if coldboot { goto coldboot } for { msg, err := c.receiveEvent(ctx) if err == nil { events <- msg continue } if _, ok := filterErr(err); ok { return err } } coldboot: coldboot = false visited := make(chan string) ctxColdboot, cancelColdboot := context.WithCancel(ctx) var coldbootErr error go func() { coldbootErr = Coldboot(ctxColdboot, sysfs, uuid, visited, handleWalkErr) close(visited) }() for pathname := range visited { handleColdbootVisited(pathname) for { msg, err := c.receiveEvent(nil) if err == nil { events <- msg continue } if errors.Is(err, syscall.EWOULDBLOCK) { break } if filteredErr, ok := filterErr(err); ok { cancelColdboot() return filteredErr } } } cancelColdboot() if coldbootErr != nil { return coldbootErr } goto retry }