5 Commits

Author SHA1 Message Date
28a8dc67d2 internal/pipewire: raise Core::Sync timeout
All checks were successful
Test / Create distribution (push) Successful in 38s
Test / Sandbox (push) Successful in 2m27s
Test / Hakurei (push) Successful in 3m24s
Test / Hpkg (push) Successful in 4m12s
Test / Sandbox (race detector) (push) Successful in 4m40s
Test / Hakurei (race detector) (push) Successful in 5m26s
Test / Flake checks (push) Successful in 1m35s
Hopefully relieves spurious failures on a very overloaded system.

Signed-off-by: Ophestra <cat@gensokyo.uk>
2025-12-19 00:49:33 +09:00
ec49c63c5f internal/pipewire: EPOLL_CTL_ADD instead of EPOLL_CTL_MOD
All checks were successful
Test / Create distribution (push) Successful in 37s
Test / Sandbox (push) Successful in 2m29s
Test / Hakurei (push) Successful in 3m24s
Test / Hpkg (push) Successful in 4m15s
Test / Sandbox (race detector) (push) Successful in 4m30s
Test / Hakurei (race detector) (push) Successful in 5m26s
Test / Flake checks (push) Successful in 1m45s
Implementation is no longer tied down by the limitations of SyscallConn.

Signed-off-by: Ophestra <cat@gensokyo.uk>
2025-12-19 00:43:44 +09:00
5a50bf80ee internal/pipewire: hold socket fd directly
All checks were successful
Test / Create distribution (push) Successful in 38s
Test / Sandbox (push) Successful in 2m39s
Test / Hakurei (push) Successful in 3m31s
Test / Sandbox (race detector) (push) Successful in 4m30s
Test / Hpkg (push) Successful in 4m34s
Test / Hakurei (race detector) (push) Successful in 5m29s
Test / Flake checks (push) Successful in 1m40s
The interface provided by net is not used here and is a leftover from a previous implementation. This change removes it.

Signed-off-by: Ophestra <cat@gensokyo.uk>
2025-12-19 00:28:24 +09:00
ce06b7b663 internal/pipewire: inform conn of blocking intent
All checks were successful
Test / Create distribution (push) Successful in 30s
Test / Sandbox (push) Successful in 2m31s
Test / Hakurei (push) Successful in 3m29s
Test / Hpkg (push) Successful in 4m24s
Test / Sandbox (race detector) (push) Successful in 4m30s
Test / Hakurei (race detector) (push) Successful in 5m27s
Test / Flake checks (push) Successful in 1m40s
The interface does not expose underlying kernel notification mechanisms. This change removes the need to poll in situations were the next call might block.

This is made cumbersome by the SyscallConn interface left over from a previous implementation, it will be replaced in a later commit as the current implementation does not make use of any net.Conn methods other than Close.

Signed-off-by: Ophestra <cat@gensokyo.uk>
2025-12-19 00:00:33 +09:00
08bdc68f3a internal/pipewire: sendmsg/recvmsg errors are fatal
All checks were successful
Test / Create distribution (push) Successful in 40s
Test / Sandbox (push) Successful in 2m35s
Test / Hakurei (push) Successful in 3m31s
Test / Hpkg (push) Successful in 4m20s
Test / Sandbox (race detector) (push) Successful in 4m42s
Test / Hakurei (race detector) (push) Successful in 5m28s
Test / Flake checks (push) Successful in 1m35s
When returned wrapped as a syscall error, these are impossible to recover from, so wrap them as a fatal error.

Signed-off-by: Ophestra <cat@gensokyo.uk>
2025-12-18 23:33:12 +09:00
4 changed files with 163 additions and 34 deletions

View File

@@ -514,7 +514,7 @@ var ErrNotDone = errors.New("did not receive a Core::Done event targeting previo
const (
// syncTimeout is the maximum duration [Core.Sync] is allowed to take before
// receiving [CoreDone] or failing.
syncTimeout = 5 * time.Second
syncTimeout = 10 * time.Second
)
// Sync queues a [CoreSync] message for the PipeWire server and initiates a Roundtrip.

View File

@@ -19,7 +19,6 @@ import (
"errors"
"fmt"
"io"
"net"
"os"
"path"
"runtime"
@@ -27,10 +26,16 @@ import (
"strconv"
"strings"
"syscall"
"time"
)
// Conn is a low level unix socket interface used by [Context].
type Conn interface {
// MightBlock informs the implementation that the next call to
// Recvmsg or Sendmsg might block. A zero or negative timeout
// cancels this behaviour.
MightBlock(timeout time.Duration)
// Recvmsg calls syscall.Recvmsg on the underlying socket.
Recvmsg(p, oob []byte, flags int) (n, oobn, recvflags int, err error)
@@ -138,45 +143,142 @@ func New(conn Conn, props SPADict) (*Context, error) {
return &ctx, nil
}
// A SyscallConnCloser is a [syscall.Conn] that implements [io.Closer].
type SyscallConnCloser interface {
syscall.Conn
io.Closer
// unixConn is an implementation of the [Conn] interface for connections
// to Unix domain sockets.
type unixConn struct {
fd int
// Whether creation of a new epoll instance was attempted.
epoll bool
// File descriptor referring to the new epoll instance.
// Valid if epoll is true and epollErr is nil.
epollFd int
// Error returned by syscall.EpollCreate1.
epollErr error
// Stores epoll events from the kernel.
epollBuf [32]syscall.EpollEvent
// If non-zero, next call is treated as a blocking call.
timeout time.Duration
}
// A SyscallConn is a [Conn] adapter for [syscall.Conn].
type SyscallConn struct{ SyscallConnCloser }
// Dial connects to a Unix domain socket described by name.
func Dial(name string) (Conn, error) {
if fd, err := syscall.Socket(syscall.AF_UNIX, syscall.SOCK_STREAM|syscall.SOCK_CLOEXEC|syscall.SOCK_NONBLOCK, 0); err != nil {
return nil, os.NewSyscallError("socket", err)
} else if err = syscall.Connect(fd, &syscall.SockaddrUnix{Name: name}); err != nil {
_ = syscall.Close(fd)
return nil, os.NewSyscallError("connect", err)
} else {
return &unixConn{fd: fd}, nil
}
}
// Recvmsg implements [Conn.Recvmsg] via [syscall.Conn.SyscallConn].
func (conn SyscallConn) Recvmsg(p, oob []byte, flags int) (n, oobn, recvflags int, err error) {
var rc syscall.RawConn
if rc, err = conn.SyscallConn(); err != nil {
// MightBlock informs the implementation that the next call
// might block for a non-zero timeout.
func (conn *unixConn) MightBlock(timeout time.Duration) {
if timeout < 0 {
timeout = 0
}
conn.timeout = timeout
}
// wantsEpoll is called at the beginning of any method that might use epoll.
func (conn *unixConn) wantsEpoll() error {
if !conn.epoll {
conn.epoll = true
conn.epollFd, conn.epollErr = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
if conn.epollErr == nil {
if conn.epollErr = syscall.EpollCtl(conn.epollFd, syscall.EPOLL_CTL_ADD, conn.fd, &syscall.EpollEvent{
Events: syscall.EPOLLERR | syscall.EPOLLHUP,
Fd: int32(conn.fd),
}); conn.epollErr != nil {
_ = syscall.Close(conn.epollFd)
}
}
}
return conn.epollErr
}
// wait waits for a specific I/O event on fd. Caller must arrange for wantsEpoll
// to be called somewhere before wait is called.
func (conn *unixConn) wait(event uint32) (err error) {
if conn.timeout == 0 {
return nil
}
deadline := time.Now().Add(conn.timeout)
conn.timeout = 0
if err = syscall.EpollCtl(conn.epollFd, syscall.EPOLL_CTL_MOD, conn.fd, &syscall.EpollEvent{
Events: event | syscall.EPOLLERR | syscall.EPOLLHUP,
Fd: int32(conn.fd),
}); err != nil {
return
}
if controlErr := rc.Control(func(fd uintptr) {
n, oobn, recvflags, _, err = syscall.Recvmsg(int(fd), p, oob, flags)
}); controlErr != nil && err == nil {
err = controlErr
for timeout := deadline.Sub(time.Now()); timeout > 0; timeout = deadline.Sub(time.Now()) {
var n int
if n, err = syscall.EpollWait(conn.epollFd, conn.epollBuf[:], int(timeout/time.Millisecond)); err != nil {
return
}
switch n {
case 1: // only the socket fd is ever added
if conn.epollBuf[0].Fd != int32(conn.fd) { // unreachable
return syscall.ENOTRECOVERABLE
}
if conn.epollBuf[0].Events&event == event ||
conn.epollBuf[0].Events&syscall.EPOLLERR|syscall.EPOLLHUP != 0 {
return nil
}
err = syscall.ETIME
continue
case 0: // timeout
return syscall.ETIMEDOUT
default: // unreachable
return syscall.ENOTRECOVERABLE
}
}
return
}
// Sendmsg implements [Conn.Sendmsg] via [syscall.Conn.SyscallConn].
func (conn SyscallConn) Sendmsg(p, oob []byte, flags int) (n int, err error) {
var rc syscall.RawConn
if rc, err = conn.SyscallConn(); err != nil {
// Recvmsg calls syscall.Recvmsg on the underlying socket.
func (conn *unixConn) Recvmsg(p, oob []byte, flags int) (n, oobn, recvflags int, err error) {
if err = conn.wantsEpoll(); err != nil {
return
} else if err = conn.wait(syscall.EPOLLIN); err != nil {
return
}
if controlErr := rc.Control(func(fd uintptr) {
n, err = syscall.SendmsgN(int(fd), p, oob, nil, flags)
}); controlErr != nil && err == nil {
err = controlErr
}
n, oobn, recvflags, _, err = syscall.Recvmsg(conn.fd, p, oob, flags)
return
}
// Sendmsg calls syscall.Sendmsg on the underlying socket.
func (conn *unixConn) Sendmsg(p, oob []byte, flags int) (n int, err error) {
if err = conn.wantsEpoll(); err != nil {
return
} else if err = conn.wait(syscall.EPOLLOUT); err != nil {
return
}
n, err = syscall.SendmsgN(conn.fd, p, oob, nil, flags)
return
}
// Close closes the underlying socket and the epoll fd if populated.
func (conn *unixConn) Close() (err error) {
if conn.epoll && conn.epollErr == nil {
conn.epollErr = syscall.Close(conn.epollFd)
}
if err = syscall.Close(conn.fd); err != nil {
return
}
return conn.epollErr
}
// MustNew calls [New](conn, props) and panics on error.
// It is intended for use in tests with hard-coded strings.
func MustNew(conn Conn, props SPADict) *Context {
@@ -310,7 +412,7 @@ func (ctx *Context) recvmsg(remaining []byte) (payload []byte, err error) {
}
if err != syscall.EAGAIN && err != syscall.EWOULDBLOCK {
ctx.closeReceivedFiles()
return nil, os.NewSyscallError("recvmsg", err)
return nil, &ProxyFatalError{Err: os.NewSyscallError("recvmsg", err), ProxyErrs: ctx.cloneAsProxyErrors()}
}
}
@@ -347,7 +449,7 @@ func (ctx *Context) sendmsg(p []byte, fds ...int) error {
}
if err != nil && err != syscall.EAGAIN && err != syscall.EWOULDBLOCK {
return os.NewSyscallError("sendmsg", err)
return &ProxyFatalError{Err: os.NewSyscallError("sendmsg", err), ProxyErrs: ctx.cloneAsProxyErrors()}
}
return err
}
@@ -598,8 +700,15 @@ func (ctx *Context) Roundtrip() (err error) {
return
}
const (
// roundtripTimeout is the maximum duration socket operations during
// Context.roundtrip is allowed to block for.
roundtripTimeout = 5 * time.Second
)
// roundtrip implements the Roundtrip method without checking proxyErrors.
func (ctx *Context) roundtrip() (err error) {
ctx.conn.MightBlock(roundtripTimeout)
if err = ctx.sendmsg(ctx.buf, ctx.pendingFiles...); err != nil {
return
}
@@ -633,6 +742,7 @@ func (ctx *Context) roundtrip() (err error) {
}()
var remaining []byte
ctx.conn.MightBlock(roundtripTimeout)
for {
remaining, err = ctx.consume(remaining)
if err == nil {
@@ -857,14 +967,14 @@ const Remote = "PIPEWIRE_REMOTE"
const DEFAULT_SYSTEM_RUNTIME_DIR = "/run/pipewire"
// connectName connects to a PipeWire remote by name and returns the [net.UnixConn].
func connectName(name string, manager bool) (conn *net.UnixConn, err error) {
// connectName connects to a PipeWire remote by name and returns the resulting [Conn].
func connectName(name string, manager bool) (conn Conn, err error) {
if manager && !strings.HasSuffix(name, "-manager") {
return connectName(name+"-manager", false)
}
if path.IsAbs(name) || (len(name) > 0 && name[0] == '@') {
return net.DialUnix("unix", nil, &net.UnixAddr{Name: name, Net: "unix"})
return Dial(name)
} else {
runtimeDir, ok := os.LookupEnv("PIPEWIRE_RUNTIME_DIR")
if !ok || !path.IsAbs(runtimeDir) {
@@ -879,7 +989,7 @@ func connectName(name string, manager bool) (conn *net.UnixConn, err error) {
if !ok || !path.IsAbs(runtimeDir) {
runtimeDir = DEFAULT_SYSTEM_RUNTIME_DIR
}
return net.DialUnix("unix", nil, &net.UnixAddr{Name: path.Join(runtimeDir, name), Net: "unix"})
return Dial(path.Join(runtimeDir, name))
}
}
@@ -897,12 +1007,11 @@ func ConnectName(name string, manager bool, props SPADict) (ctx *Context, err er
}
}
var conn *net.UnixConn
var conn Conn
if conn, err = connectName(name, manager); err != nil {
return
}
if ctx, err = New(SyscallConn{conn}, props); err != nil {
if ctx, err = New(conn, props); err != nil {
ctx = nil
_ = conn.Close()
}

View File

@@ -6,6 +6,7 @@ import (
"strconv"
. "syscall"
"testing"
"time"
"hakurei.app/container/stub"
"hakurei.app/internal/pipewire"
@@ -715,6 +716,18 @@ type stubUnixConn struct {
current int
}
func (conn *stubUnixConn) MightBlock(timeout time.Duration) {
if timeout != 5*time.Second {
panic("unexpected timeout " + timeout.String())
}
if conn.current == 0 ||
(conn.samples[conn.current-1].nr == SYS_RECVMSG && conn.samples[conn.current-1].errno == EAGAIN && conn.samples[conn.current].nr == SYS_SENDMSG) ||
(conn.samples[conn.current-1].nr == SYS_SENDMSG && conn.samples[conn.current].nr == SYS_RECVMSG) {
return
}
panic("unexpected blocking hint before sample " + strconv.Itoa(conn.current))
}
// nextSample returns the current sample and increments the counter.
func (conn *stubUnixConn) nextSample(nr uintptr) (sample *stubUnixConnSample, wantOOB []byte, err error) {
sample = &conn.samples[conn.current]

View File

@@ -6,6 +6,7 @@ import (
"path"
"syscall"
"testing"
"time"
"hakurei.app/container/stub"
"hakurei.app/internal/acl"
@@ -497,6 +498,12 @@ type stubPipeWireConn struct {
curSendmsg int
}
func (conn *stubPipeWireConn) MightBlock(timeout time.Duration) {
if timeout != 5*time.Second {
panic("unexpected timeout " + timeout.String())
}
}
// Recvmsg marshals and copies a stubMessage prepared ahead of time.
func (conn *stubPipeWireConn) Recvmsg(p, _ []byte, _ int) (n, _, recvflags int, err error) {
defer func() { conn.curRecvmsg++ }()