internal/uevent: separate recvmsg helper
All checks were successful
Test / Create distribution (push) Successful in 1m13s
Test / Sandbox (push) Successful in 3m3s
Test / Hakurei (push) Successful in 4m13s
Test / ShareFS (push) Successful in 4m16s
Test / Sandbox (race detector) (push) Successful in 5m37s
Test / Hakurei (race detector) (push) Successful in 6m46s
Test / Flake checks (push) Successful in 1m23s
All checks were successful
Test / Create distribution (push) Successful in 1m13s
Test / Sandbox (push) Successful in 3m3s
Test / Hakurei (push) Successful in 4m13s
Test / ShareFS (push) Successful in 4m16s
Test / Sandbox (race detector) (push) Successful in 5m37s
Test / Hakurei (race detector) (push) Successful in 6m46s
Test / Flake checks (push) Successful in 1m23s
This enables messages to be received separately. Signed-off-by: Ophestra <cat@gensokyo.uk>
This commit is contained in:
@@ -27,24 +27,30 @@ type (
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
exclConsume = iota
|
||||||
|
|
||||||
|
_exclLen
|
||||||
|
)
|
||||||
|
|
||||||
// Conn represents a NETLINK_KOBJECT_UEVENT socket.
|
// Conn represents a NETLINK_KOBJECT_UEVENT socket.
|
||||||
type Conn struct {
|
type Conn struct {
|
||||||
conn *netlink.Conn
|
conn *netlink.Conn
|
||||||
|
|
||||||
// Whether currently between a call to enterExcl and exitExcl.
|
// Whether currently between a call to enterExcl and exitExcl.
|
||||||
excl atomic.Bool
|
excl [_exclLen]atomic.Bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// enterExcl must be called entering a critical section that interacts with conn.
|
// enterExcl must be called entering a critical section that interacts with conn.
|
||||||
func (c *Conn) enterExcl() error {
|
func (c *Conn) enterExcl(k int) error {
|
||||||
if !c.excl.CompareAndSwap(false, true) {
|
if !c.excl[k].CompareAndSwap(false, true) {
|
||||||
return syscall.EAGAIN
|
return syscall.EAGAIN
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// exitExcl must be called exiting a critical section that interacts with conn.
|
// exitExcl must be called exiting a critical section that interacts with conn.
|
||||||
func (c *Conn) exitExcl() { c.excl.Store(false) }
|
func (c *Conn) exitExcl(k int) { c.excl[k].Store(false) }
|
||||||
|
|
||||||
// Close closes the underlying socket.
|
// Close closes the underlying socket.
|
||||||
func (c *Conn) Close() error { return c.conn.Close() }
|
func (c *Conn) Close() error { return c.conn.Close() }
|
||||||
@@ -89,39 +95,48 @@ func (e *BadPortError) Error() string {
|
|||||||
" on NETLINK_KOBJECT_UEVENT"
|
" on NETLINK_KOBJECT_UEVENT"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// receiveEvent receives a single event and returns the address of its [Message].
|
||||||
|
func (c *Conn) receiveEvent(ctx context.Context) (*Message, error) {
|
||||||
|
data, _, from, err := c.conn.Recvmsg(ctx, 0)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, syscall.ENOBUFS) {
|
||||||
|
return nil, ReceiveBufferError{}
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// lib/kobject_uevent.c:
|
||||||
|
// set portid 0 to inform userspace message comes from kernel
|
||||||
|
if v, ok := from.(*syscall.SockaddrNetlink); !ok {
|
||||||
|
return nil, ErrBadSocket
|
||||||
|
} else if v.Pid != 0 {
|
||||||
|
return nil, (*BadPortError)(v)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
var msg Message
|
||||||
|
if err = msg.UnmarshalBinary(data); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &msg, err
|
||||||
|
}
|
||||||
|
|
||||||
// Consume continuously receives and parses events from the kernel. It returns
|
// Consume continuously receives and parses events from the kernel. It returns
|
||||||
// the first error it encounters.
|
// the first error it encounters.
|
||||||
//
|
//
|
||||||
// Callers must not restart event processing after a non-nil error that does not
|
// Callers must not restart event processing after a non-nil error that does not
|
||||||
// satisfy [Recoverable] is returned.
|
// satisfy [Recoverable] is returned.
|
||||||
func (c *Conn) Consume(ctx context.Context, events chan<- *Message) error {
|
func (c *Conn) Consume(ctx context.Context, events chan<- *Message) error {
|
||||||
if err := c.enterExcl(); err != nil {
|
if err := c.enterExcl(exclConsume); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer c.exitExcl()
|
defer c.exitExcl(exclConsume)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
data, _, from, err := c.conn.Recvmsg(ctx, 0)
|
msg, err := c.receiveEvent(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, syscall.ENOBUFS) {
|
|
||||||
return ReceiveBufferError{}
|
|
||||||
}
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
events <- msg
|
||||||
// lib/kobject_uevent.c:
|
|
||||||
// set portid 0 to inform userspace message comes from kernel
|
|
||||||
if v, ok := from.(*syscall.SockaddrNetlink); !ok {
|
|
||||||
return ErrBadSocket
|
|
||||||
} else if v.Pid != 0 {
|
|
||||||
return (*BadPortError)(v)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
var msg Message
|
|
||||||
if err = msg.UnmarshalBinary(data); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
events <- &msg
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user