Compare commits
4 Commits
wip-sysfs-
...
staging
| Author | SHA1 | Date | |
|---|---|---|---|
|
b5592633f5
|
|||
|
584e302168
|
|||
|
141958656f
|
|||
|
648079f42c
|
@@ -179,7 +179,7 @@ func (direct) mustLoopback(ctx context.Context, msg message.Msg) {
|
||||
lo = ifi.Index
|
||||
}
|
||||
|
||||
c, err := netlink.DialRoute()
|
||||
c, err := netlink.DialRoute(0)
|
||||
if err != nil {
|
||||
msg.GetLogger().Fatalln(err)
|
||||
}
|
||||
|
||||
@@ -46,7 +46,11 @@ type Conn struct {
|
||||
|
||||
// Dial returns the address of a newly connected generic netlink connection of
|
||||
// specified family and groups.
|
||||
func Dial(family int, groups uint32) (*Conn, error) {
|
||||
//
|
||||
// For a nonzero rcvbuf, the socket receive buffer size is set to its absolute
|
||||
// value via SO_RCVBUF for a positive value, or SO_RCVBUFFORCE for a negative
|
||||
// value.
|
||||
func Dial(family int, groups uint32, rcvbuf int64) (*Conn, error) {
|
||||
var c Conn
|
||||
if fd, err := syscall.Socket(
|
||||
syscall.AF_NETLINK,
|
||||
@@ -75,6 +79,23 @@ func Dial(family int, groups uint32) (*Conn, error) {
|
||||
return nil, syscall.ENOTRECOVERABLE
|
||||
}
|
||||
|
||||
if rcvbuf != 0 {
|
||||
opt := syscall.SO_RCVBUF
|
||||
if rcvbuf < 0 {
|
||||
opt = syscall.SO_RCVBUFFORCE
|
||||
rcvbuf = -rcvbuf
|
||||
}
|
||||
if err = syscall.SetsockoptInt(
|
||||
fd,
|
||||
syscall.SOL_SOCKET,
|
||||
opt,
|
||||
int(rcvbuf),
|
||||
); err != nil {
|
||||
_ = syscall.Close(fd)
|
||||
return nil, os.NewSyscallError("setsockopt", err)
|
||||
}
|
||||
}
|
||||
|
||||
c.family = family
|
||||
c.f = os.NewFile(uintptr(fd), "netlink")
|
||||
if c.raw, err = c.f.SyscallConn(); err != nil {
|
||||
@@ -101,13 +122,13 @@ func (c *Conn) Close() error {
|
||||
return c.f.Close()
|
||||
}
|
||||
|
||||
// Recvfrom wraps recv(2) with nonblocking behaviour via the runtime network poller.
|
||||
// Recvmsg wraps recv(2) with nonblocking behaviour via the runtime network poller.
|
||||
//
|
||||
// The returned slice is valid until the next call to Recvfrom.
|
||||
func (c *Conn) Recvfrom(
|
||||
// The returned slice is valid until the next call to Recvmsg.
|
||||
func (c *Conn) Recvmsg(
|
||||
ctx context.Context,
|
||||
flags int,
|
||||
) (data []byte, from syscall.Sockaddr, err error) {
|
||||
) (data []byte, recvflags int, from syscall.Sockaddr, err error) {
|
||||
if err = c.f.SetReadDeadline(time.Time{}); err != nil {
|
||||
return
|
||||
}
|
||||
@@ -117,7 +138,7 @@ func (c *Conn) Recvfrom(
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
rcErr := c.raw.Read(func(fd uintptr) (done bool) {
|
||||
n, from, err = syscall.Recvfrom(int(fd), data, flags)
|
||||
n, _, recvflags, from, err = syscall.Recvmsg(int(fd), data, nil, flags)
|
||||
return err != syscall.EWOULDBLOCK
|
||||
})
|
||||
if n >= 0 {
|
||||
@@ -129,7 +150,7 @@ func (c *Conn) Recvfrom(
|
||||
select {
|
||||
case rcErr := <-done:
|
||||
if err != nil {
|
||||
err = os.NewSyscallError("recvfrom", err)
|
||||
err = os.NewSyscallError("recvmsg", err)
|
||||
} else {
|
||||
err = rcErr
|
||||
}
|
||||
@@ -147,12 +168,12 @@ func (c *Conn) Recvfrom(
|
||||
}
|
||||
}
|
||||
|
||||
// Sendto wraps send(2) with nonblocking behaviour via the runtime network poller.
|
||||
func (c *Conn) Sendto(
|
||||
// Sendmsg wraps send(2) with nonblocking behaviour via the runtime network poller.
|
||||
func (c *Conn) Sendmsg(
|
||||
ctx context.Context,
|
||||
p []byte,
|
||||
flags int,
|
||||
to syscall.Sockaddr,
|
||||
flags int,
|
||||
) (err error) {
|
||||
if err = c.f.SetWriteDeadline(time.Time{}); err != nil {
|
||||
return
|
||||
@@ -161,7 +182,7 @@ func (c *Conn) Sendto(
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
done <- c.raw.Write(func(fd uintptr) (done bool) {
|
||||
err = syscall.Sendto(int(fd), p, flags, to)
|
||||
err = syscall.Sendmsg(int(fd), p, nil, to, flags)
|
||||
return err != syscall.EWOULDBLOCK
|
||||
})
|
||||
}()
|
||||
@@ -169,7 +190,7 @@ func (c *Conn) Sendto(
|
||||
select {
|
||||
case rcErr := <-done:
|
||||
if err != nil {
|
||||
err = os.NewSyscallError("sendto", err)
|
||||
err = os.NewSyscallError("sendmsg", err)
|
||||
} else {
|
||||
err = rcErr
|
||||
}
|
||||
@@ -278,7 +299,7 @@ type HandlerFunc func(resp []syscall.NetlinkMessage) error
|
||||
func (c *Conn) receive(ctx context.Context, f HandlerFunc, flags int) error {
|
||||
for {
|
||||
var resp []syscall.NetlinkMessage
|
||||
if data, _, err := c.Recvfrom(ctx, flags); err != nil {
|
||||
if data, _, _, err := c.Recvmsg(ctx, flags); err != nil {
|
||||
return err
|
||||
} else if len(data) < syscall.NLMSG_HDRLEN {
|
||||
return syscall.EBADE
|
||||
@@ -302,9 +323,9 @@ func (c *Conn) Roundtrip(ctx context.Context, f HandlerFunc) error {
|
||||
}
|
||||
defer func() { c.seq++ }()
|
||||
|
||||
if err := c.Sendto(ctx, c.pending(), 0, &syscall.SockaddrNetlink{
|
||||
if err := c.Sendmsg(ctx, c.pending(), &syscall.SockaddrNetlink{
|
||||
Family: syscall.AF_NETLINK,
|
||||
}); err != nil {
|
||||
}, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -13,8 +13,8 @@ type RouteConn struct{ conn *Conn }
|
||||
func (c *RouteConn) Close() error { return c.conn.Close() }
|
||||
|
||||
// DialRoute returns the address of a newly connected [RouteConn].
|
||||
func DialRoute() (*RouteConn, error) {
|
||||
c, err := Dial(syscall.NETLINK_ROUTE, 0)
|
||||
func DialRoute(rcvbuf int64) (*RouteConn, error) {
|
||||
c, err := Dial(syscall.NETLINK_ROUTE, 0, rcvbuf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -19,10 +19,6 @@ const (
|
||||
KOBJ_OFFLINE
|
||||
KOBJ_BIND
|
||||
KOBJ_UNBIND
|
||||
|
||||
// Synthetic denotes a [Message] that originates from outside the kernel. It
|
||||
// is not valid in the wire format and is only meaningful within this package.
|
||||
Synthetic KobjectAction = 0xfeed
|
||||
)
|
||||
|
||||
// lib/kobject_uevent.c
|
||||
@@ -42,10 +38,6 @@ func (act KobjectAction) Valid() bool { return int(act) < len(kobject_actions) }
|
||||
|
||||
// String returns the corresponding string sent over netlink.
|
||||
func (act KobjectAction) String() string {
|
||||
if act == Synthetic {
|
||||
return "synthetic"
|
||||
}
|
||||
|
||||
if !act.Valid() {
|
||||
return "unsupported kobject_action " + strconv.Itoa(int(act))
|
||||
}
|
||||
@@ -53,7 +45,7 @@ func (act KobjectAction) String() string {
|
||||
}
|
||||
|
||||
func (act KobjectAction) AppendText(b []byte) ([]byte, error) {
|
||||
if !act.Valid() && act != Synthetic {
|
||||
if !act.Valid() {
|
||||
return b, syscall.EINVAL
|
||||
}
|
||||
return append(b, act.String()...), nil
|
||||
|
||||
@@ -29,15 +29,4 @@ func TestKobjectAction(t *testing.T) {
|
||||
t.Errorf("String: %q, want %q", got, want)
|
||||
}
|
||||
})
|
||||
|
||||
adeT(t, "synthetic", uevent.Synthetic, "synthetic",
|
||||
uevent.UnsupportedActionError("synthetic"), nil)
|
||||
|
||||
t.Run("validate synthetic", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
if uevent.Synthetic.Valid() {
|
||||
t.Errorf("Valid unexpectedly succeeded")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -110,12 +110,6 @@ SEQNUM=780`},
|
||||
}, "move", uevent.MissingHeaderError(
|
||||
"move",
|
||||
), syscall.EINVAL, "unsupported kobject_action 2989 event:"},
|
||||
|
||||
{"synthetic", uevent.Message{
|
||||
Action: uevent.Synthetic,
|
||||
}, "synthetic@\x00", uevent.UnsupportedActionError(
|
||||
"synthetic",
|
||||
), nil, "synthetic event:"},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
|
||||
@@ -1,87 +0,0 @@
|
||||
package uevent
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"io/fs"
|
||||
"log"
|
||||
"path/filepath"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
// Enumerate scans sysfs and emits [Synthetic] events. It returns the first
|
||||
// error it encounters.
|
||||
//
|
||||
// The specified filesystem must present the sysfs root.
|
||||
func Enumerate(
|
||||
sysfs fs.FS,
|
||||
handleWalkErr func(error) error,
|
||||
events chan<- *Message,
|
||||
) error {
|
||||
if handleWalkErr == nil {
|
||||
handleWalkErr = func(err error) error {
|
||||
if errors.Is(err, fs.ErrNotExist) {
|
||||
log.Println("enumerate", err)
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return fs.WalkDir(sysfs, "devices", func(
|
||||
path string,
|
||||
d fs.DirEntry,
|
||||
err error,
|
||||
) error {
|
||||
if err != nil {
|
||||
return handleWalkErr(err)
|
||||
}
|
||||
|
||||
if d.IsDir() || d.Name() != "uevent" {
|
||||
return nil
|
||||
}
|
||||
|
||||
msg := Message{
|
||||
Action: Synthetic,
|
||||
|
||||
// cleans path, appears to be compatible with kernel behaviour
|
||||
DevPath: filepath.Dir(path),
|
||||
}
|
||||
|
||||
var target string
|
||||
if target, err = fs.ReadLink(
|
||||
sysfs,
|
||||
filepath.Join(msg.DevPath, "subsystem"),
|
||||
); err != nil {
|
||||
if err = handleWalkErr(err); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
msg.Env = append(msg.Env, "SUBSYSTEM="+filepath.Base(target))
|
||||
}
|
||||
|
||||
// read entire file: slicing does not copy
|
||||
var env []byte
|
||||
if env, err = fs.ReadFile(sysfs, path); err != nil {
|
||||
return handleWalkErr(err)
|
||||
}
|
||||
|
||||
for _, s := range bytes.Split(env, []byte{'\n'}) {
|
||||
if len(s) == 0 {
|
||||
continue
|
||||
}
|
||||
msg.Env = append(msg.Env, unsafe.String(unsafe.SliceData(s), len(s)))
|
||||
}
|
||||
|
||||
if len(msg.Env) == 0 {
|
||||
// this implies absent subsystem, its error is already handled
|
||||
return nil
|
||||
}
|
||||
|
||||
if msg.DevPath != "" && msg.DevPath[0] != '/' {
|
||||
msg.DevPath = "/" + msg.DevPath
|
||||
}
|
||||
events <- &msg
|
||||
return nil
|
||||
})
|
||||
}
|
||||
@@ -1,28 +0,0 @@
|
||||
package uevent_test
|
||||
|
||||
import (
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"hakurei.app/internal/uevent"
|
||||
)
|
||||
|
||||
func TestEnumerate(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
defer wg.Wait()
|
||||
|
||||
events := make(chan *uevent.Message, 1<<10)
|
||||
wg.Go(func() {
|
||||
for msg := range events {
|
||||
t.Log(msg)
|
||||
}
|
||||
})
|
||||
|
||||
if err := uevent.Enumerate(os.DirFS("/sys"), nil, events); err != nil {
|
||||
t.Fatalf("Enumerate: error = %v", err)
|
||||
}
|
||||
close(events)
|
||||
}
|
||||
@@ -18,6 +18,19 @@ type (
|
||||
Recoverable interface{ recoverable() }
|
||||
// Nontrivial is satisfied by errors preferring a JSON encoding.
|
||||
Nontrivial interface{ nontrivial() }
|
||||
|
||||
// NeedsColdboot is satisfied by errors indicating divergence of local state
|
||||
// from the kernel, usually from lost uevent data.
|
||||
NeedsColdboot interface {
|
||||
Recoverable
|
||||
coldboot()
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
exclConsume = iota
|
||||
|
||||
_exclLen
|
||||
)
|
||||
|
||||
// Conn represents a NETLINK_KOBJECT_UEVENT socket.
|
||||
@@ -25,27 +38,27 @@ type Conn struct {
|
||||
conn *netlink.Conn
|
||||
|
||||
// Whether currently between a call to enterExcl and exitExcl.
|
||||
excl atomic.Bool
|
||||
excl [_exclLen]atomic.Bool
|
||||
}
|
||||
|
||||
// enterExcl must be called entering a critical section that interacts with conn.
|
||||
func (c *Conn) enterExcl() error {
|
||||
if !c.excl.CompareAndSwap(false, true) {
|
||||
func (c *Conn) enterExcl(k int) error {
|
||||
if !c.excl[k].CompareAndSwap(false, true) {
|
||||
return syscall.EAGAIN
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// exitExcl must be called exiting a critical section that interacts with conn.
|
||||
func (c *Conn) exitExcl() { c.excl.Store(false) }
|
||||
func (c *Conn) exitExcl(k int) { c.excl[k].Store(false) }
|
||||
|
||||
// Close closes the underlying socket.
|
||||
func (c *Conn) Close() error { return c.conn.Close() }
|
||||
|
||||
// Dial returns the address of a newly connected [Conn].
|
||||
func Dial() (*Conn, error) {
|
||||
func Dial(rcvbuf int64) (*Conn, error) {
|
||||
// kernel group is hard coded in lib/kobject_uevent.c, undocumented
|
||||
c, err := netlink.Dial(syscall.NETLINK_KOBJECT_UEVENT, 1)
|
||||
c, err := netlink.Dial(syscall.NETLINK_KOBJECT_UEVENT, 1, rcvbuf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -58,6 +71,18 @@ var (
|
||||
ErrBadSocket = errors.New("unexpected socket address")
|
||||
)
|
||||
|
||||
// ReceiveBufferError indicates one or more [Message] being lost due to the
|
||||
// socket receive buffer filling up. This is usually caused by epoll waking the
|
||||
// receiving program up too late.
|
||||
type ReceiveBufferError struct{ _ [0]*ReceiveBufferError }
|
||||
|
||||
var _ NeedsColdboot = ReceiveBufferError{}
|
||||
|
||||
func (ReceiveBufferError) recoverable() {}
|
||||
func (ReceiveBufferError) coldboot() {}
|
||||
func (ReceiveBufferError) Unwrap() error { return syscall.ENOBUFS }
|
||||
func (e ReceiveBufferError) Error() string { return syscall.ENOBUFS.Error() }
|
||||
|
||||
// BadPortError is returned by [Conn.Consume] upon receiving a message that did
|
||||
// not come from the kernel.
|
||||
type BadPortError syscall.SockaddrNetlink
|
||||
@@ -70,36 +95,48 @@ func (e *BadPortError) Error() string {
|
||||
" on NETLINK_KOBJECT_UEVENT"
|
||||
}
|
||||
|
||||
// receiveEvent receives a single event and returns the address of its [Message].
|
||||
func (c *Conn) receiveEvent(ctx context.Context) (*Message, error) {
|
||||
data, _, from, err := c.conn.Recvmsg(ctx, 0)
|
||||
if err != nil {
|
||||
if errors.Is(err, syscall.ENOBUFS) {
|
||||
return nil, ReceiveBufferError{}
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// lib/kobject_uevent.c:
|
||||
// set portid 0 to inform userspace message comes from kernel
|
||||
if v, ok := from.(*syscall.SockaddrNetlink); !ok {
|
||||
return nil, ErrBadSocket
|
||||
} else if v.Pid != 0 {
|
||||
return nil, (*BadPortError)(v)
|
||||
|
||||
}
|
||||
|
||||
var msg Message
|
||||
if err = msg.UnmarshalBinary(data); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &msg, err
|
||||
}
|
||||
|
||||
// Consume continuously receives and parses events from the kernel. It returns
|
||||
// the first error it encounters.
|
||||
//
|
||||
// Callers must not restart event processing after a non-nil error that does not
|
||||
// satisfy [Recoverable] is returned.
|
||||
func (c *Conn) Consume(ctx context.Context, events chan<- *Message) error {
|
||||
if err := c.enterExcl(); err != nil {
|
||||
if err := c.enterExcl(exclConsume); err != nil {
|
||||
return err
|
||||
}
|
||||
defer c.exitExcl()
|
||||
defer c.exitExcl(exclConsume)
|
||||
|
||||
for {
|
||||
data, from, err := c.conn.Recvfrom(ctx, 0)
|
||||
msg, err := c.receiveEvent(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// lib/kobject_uevent.c:
|
||||
// set portid 0 to inform userspace message comes from kernel
|
||||
if v, ok := from.(*syscall.SockaddrNetlink); !ok {
|
||||
return ErrBadSocket
|
||||
} else if v.Pid != 0 {
|
||||
return (*BadPortError)(v)
|
||||
|
||||
}
|
||||
|
||||
var msg Message
|
||||
if err = msg.UnmarshalBinary(data); err != nil {
|
||||
return err
|
||||
}
|
||||
events <- &msg
|
||||
events <- msg
|
||||
}
|
||||
}
|
||||
|
||||
@@ -116,7 +116,7 @@ func adeB[V any, S interface {
|
||||
func TestDialConsume(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
c, err := uevent.Dial()
|
||||
c, err := uevent.Dial(0)
|
||||
if err != nil {
|
||||
t.Fatalf("Dial: error = %v", err)
|
||||
}
|
||||
@@ -127,7 +127,7 @@ func TestDialConsume(t *testing.T) {
|
||||
})
|
||||
|
||||
// check kernel-assigned port id
|
||||
c0, err0 := uevent.Dial()
|
||||
c0, err0 := uevent.Dial(0)
|
||||
if err0 != nil {
|
||||
t.Fatalf("Dial: error = %v", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user