From b5592633f5b980970808fc5be43b0fb4f4d4e784 Mon Sep 17 00:00:00 2001 From: Ophestra Date: Mon, 30 Mar 2026 02:53:26 +0900 Subject: [PATCH] internal/uevent: separate recvmsg helper This enables messages to be received separately. Signed-off-by: Ophestra --- internal/uevent/uevent.go | 65 ++++++++++++++++++++++++--------------- 1 file changed, 40 insertions(+), 25 deletions(-) diff --git a/internal/uevent/uevent.go b/internal/uevent/uevent.go index 25fd02be..6c041775 100644 --- a/internal/uevent/uevent.go +++ b/internal/uevent/uevent.go @@ -27,24 +27,30 @@ type ( } ) +const ( + exclConsume = iota + + _exclLen +) + // Conn represents a NETLINK_KOBJECT_UEVENT socket. type Conn struct { conn *netlink.Conn // Whether currently between a call to enterExcl and exitExcl. - excl atomic.Bool + excl [_exclLen]atomic.Bool } // enterExcl must be called entering a critical section that interacts with conn. -func (c *Conn) enterExcl() error { - if !c.excl.CompareAndSwap(false, true) { +func (c *Conn) enterExcl(k int) error { + if !c.excl[k].CompareAndSwap(false, true) { return syscall.EAGAIN } return nil } // exitExcl must be called exiting a critical section that interacts with conn. -func (c *Conn) exitExcl() { c.excl.Store(false) } +func (c *Conn) exitExcl(k int) { c.excl[k].Store(false) } // Close closes the underlying socket. func (c *Conn) Close() error { return c.conn.Close() } @@ -89,39 +95,48 @@ func (e *BadPortError) Error() string { " on NETLINK_KOBJECT_UEVENT" } +// receiveEvent receives a single event and returns the address of its [Message]. +func (c *Conn) receiveEvent(ctx context.Context) (*Message, error) { + data, _, from, err := c.conn.Recvmsg(ctx, 0) + if err != nil { + if errors.Is(err, syscall.ENOBUFS) { + return nil, ReceiveBufferError{} + } + return nil, err + } + + // lib/kobject_uevent.c: + // set portid 0 to inform userspace message comes from kernel + if v, ok := from.(*syscall.SockaddrNetlink); !ok { + return nil, ErrBadSocket + } else if v.Pid != 0 { + return nil, (*BadPortError)(v) + + } + + var msg Message + if err = msg.UnmarshalBinary(data); err != nil { + return nil, err + } + return &msg, err +} + // Consume continuously receives and parses events from the kernel. It returns // the first error it encounters. // // Callers must not restart event processing after a non-nil error that does not // satisfy [Recoverable] is returned. func (c *Conn) Consume(ctx context.Context, events chan<- *Message) error { - if err := c.enterExcl(); err != nil { + if err := c.enterExcl(exclConsume); err != nil { return err } - defer c.exitExcl() + defer c.exitExcl(exclConsume) for { - data, _, from, err := c.conn.Recvmsg(ctx, 0) + msg, err := c.receiveEvent(ctx) if err != nil { - if errors.Is(err, syscall.ENOBUFS) { - return ReceiveBufferError{} - } return err } - - // lib/kobject_uevent.c: - // set portid 0 to inform userspace message comes from kernel - if v, ok := from.(*syscall.SockaddrNetlink); !ok { - return ErrBadSocket - } else if v.Pid != 0 { - return (*BadPortError)(v) - - } - - var msg Message - if err = msg.UnmarshalBinary(data); err != nil { - return err - } - events <- &msg + events <- msg } }