Files
hakurei/internal/pipewire/pipewire.go
Ophestra ce06b7b663 internal/pipewire: inform conn of blocking intent
The interface does not expose underlying kernel notification mechanisms. This change removes the need to poll in situations where the next call might block.

This is made cumbersome by the SyscallConn interface left over from a previous implementation, it will be replaced in a later commit as the current implementation does not make use of any net.Conn methods other than Close.

Signed-off-by: Ophestra <cat@gensokyo.uk>
2025-12-19 00:00:33 +09:00

1052 lines
32 KiB
Go

// Package pipewire provides a partial implementation of the PipeWire protocol native.
//
// This implementation is created based on black box analysis and very limited static
// analysis. The PipeWire documentation is vague and mostly nonexistent, and source code
// readability is not great due to frequent macro abuse, confusing and inconsistent naming
// schemes, almost complete absence of comments and the multiple layers of abstractions
// even internal to the library. The convoluted build system and frequent (mis)use of
// dlopen(3) further complicates static analysis efforts.
//
// Because of this, extreme care must be taken when reusing any code found in this package.
// While it is extensively tested to be correct for its role within Hakurei, remember that
// work is only done against PipeWire behaviour specific to this use case, and it is nearly
// impossible to guarantee that this interpretation of its behaviour is intended, or correct
// for any other uses of the protocol.
package pipewire
import (
"encoding/binary"
"errors"
"fmt"
"io"
"net"
"os"
"path"
"runtime"
"slices"
"strconv"
"strings"
"syscall"
"time"
)
// Conn is a low level unix socket interface used by [Context].
// [SyscallConn] is the adapter provided by this package.
type Conn interface {
	// MightBlock informs the implementation that the next call to
	// Recvmsg or Sendmsg might block. A zero or negative timeout
	// cancels this behaviour.
	MightBlock(timeout time.Duration)
	// Recvmsg calls syscall.Recvmsg on the underlying socket.
	// p receives the payload and oob receives ancillary data.
	Recvmsg(p, oob []byte, flags int) (n, oobn, recvflags int, err error)
	// Sendmsg calls syscall.SendmsgN on the underlying socket.
	// p is the payload and oob is the ancillary data sent alongside it.
	Sendmsg(p, oob []byte, flags int) (n int, err error)
	// Close closes the connection.
	Close() error
}
// The kernel constant SCM_MAX_FD defines a limit on the number of file descriptors in the array.
// Attempting to send an array larger than this limit causes sendmsg(2) to fail with the error
// EINVAL. SCM_MAX_FD has the value 253 (or 255 before Linux 2.6.38).
//
// This value is used to size the ancillary data buffer held by [Context].
const _SCM_MAX_FD = 253
// A Context holds state of a connection to PipeWire.
type Context struct {
	// Pending message data, committed via a call to Roundtrip.
	buf []byte
	// Current [Header.Sequence] value, incremented every write.
	sequence Int
	// Pending file descriptors to be sent with the next message.
	pendingFiles []int
	// File count already kept track of in [Header].
	headerFiles int
	// Proxy id associations.
	proxy map[Int]eventProxy
	// Newly allocated proxies pending acknowledgement from the server.
	pendingIds map[Int]struct{}
	// Smallest available Id for the next proxy.
	nextId Int
	// Proxies targeted by the [CoreDestroy] event pending until next [CoreSync].
	pendingDestruction map[Int]struct{}
	// Proxy for built-in core events.
	core Core
	// Proxy for built-in client events.
	client Client
	// Current server-side [Header.Sequence] value, incremented on every event processed.
	remoteSequence Int
	// Files from the server. This is discarded on every Roundtrip so eventProxy
	// implementations must make sure to close them to avoid leaking fds.
	//
	// These are not automatically set up as [os.File] because it is impossible
	// to undo the effects of os.NewFile, which can be inconvenient for some uses.
	receivedFiles []int
	// Non-protocol errors encountered during event handling of the current Roundtrip;
	// errors that prevent event processing from continuing must be panicked.
	proxyErrors ProxyConsumeError
	// Pending footer value for the next outgoing message.
	// Newer footers appear to simply replace the existing one.
	pendingFooter KnownSize
	// Pending footer value deferred to the next round trip,
	// sent if pendingFooter is nil. This is for emulating upstream behaviour.
	deferredPendingFooter KnownSize
	// Server side registry generation number.
	generation Long
	// Deferred operations run after a [Core.Sync] completes or Close is called. Errors
	// are reported as part of [ProxyConsumeError] and are not considered fatal unless panicked.
	syncComplete []func() error
	// Passed to [Conn.Recvmsg]. Not copied if sufficient for all received messages.
	iovecBuf [1 << 15]byte
	// Passed to [Conn.Recvmsg] for ancillary messages and is never copied.
	// NOTE(review): the size appears to be a control message carrying _SCM_MAX_FD
	// descriptors rounded up to 8-byte words, plus one spare byte; confirm.
	oobBuf [(_SCM_MAX_FD/2+_SCM_MAX_FD%2+2)<<3 + 1]byte
	// Underlying connection, usually implemented by [net.UnixConn]
	// via the [SyscallConn] adapter.
	conn Conn
}
// cleanup queues f to be invoked after the next [CoreDone] event,
// or when the [Context] is closed.
func (ctx *Context) cleanup(f func() error) {
	ctx.syncComplete = append(ctx.syncComplete, f)
}
// GetCore returns a pointer to the [Core] proxy embedded in this [Context].
func (ctx *Context) GetCore() *Core {
	core := &ctx.core
	return core
}
// GetClient returns a pointer to the [Client] proxy embedded in this [Context].
func (ctx *Context) GetClient() *Client {
	client := &ctx.client
	return client
}
// New initialises [Context] for an already established connection and returns its address.
// The caller must not call any method of the underlying [Conn] after this function returns.
//
// The core and client proxies are registered under their well-known ids, after
// which the hello and client property update messages are issued. A nil
// [Context] is returned alongside any error from either step.
func New(conn Conn, props SPADict) (*Context, error) {
	ctx := &Context{
		conn: conn,
		// the client proxy awaits acknowledgement from the server
		pendingIds:         map[Int]struct{}{PW_ID_CLIENT: {}},
		pendingDestruction: make(map[Int]struct{}),
	}
	ctx.core.ctx = ctx
	ctx.proxy = map[Int]eventProxy{
		PW_ID_CORE:   &ctx.core,
		PW_ID_CLIENT: &ctx.client,
	}
	// ids of the built-in proxies are already taken
	ctx.nextId = Int(len(ctx.proxy))

	err := ctx.core.hello()
	if err == nil {
		err = ctx.clientUpdateProperties(props)
	}
	if err != nil {
		return nil, err
	}
	return ctx, nil
}
// A SyscallConnCloser is a [syscall.Conn] that implements [io.Closer].
// It is the interface embedded by [SyscallConn].
type SyscallConnCloser interface {
	syscall.Conn
	io.Closer
}
// A SyscallConn is a [Conn] adapter for [syscall.Conn].
//
// An epoll instance is created lazily on the first call to Recvmsg or Sendmsg
// and is used to wait for readiness when a timeout was set via MightBlock.
type SyscallConn struct {
	// Whether creation of a new epoll instance was attempted.
	epoll bool
	// File descriptor referring to the new epoll instance.
	// Valid if epoll is true and epollErr is nil.
	epollFd int
	// Error returned by syscall.EpollCreate1.
	epollErr error
	// Stores epoll events from the kernel.
	epollBuf [32]syscall.EpollEvent

	// If non-zero, next call is treated as a blocking call.
	// This is reset to zero once consumed by that call.
	timeout time.Duration

	SyscallConnCloser
}
// MightBlock implements [Conn.MightBlock]. Negative timeouts are stored as
// zero, which disables waiting for the next call.
func (conn *SyscallConn) MightBlock(timeout time.Duration) {
	conn.timeout = max(timeout, 0)
}
// wantsEpoll is called at the beginning of any method that wants to use epoll.
// The epoll instance is created on the first call only; the outcome of that
// attempt is returned on every call.
func (conn *SyscallConn) wantsEpoll() error {
	if conn.epoll {
		return conn.epollErr
	}

	conn.epoll = true
	conn.epollFd, conn.epollErr = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
	return conn.epollErr
}
// wait waits for a specific I/O event for a fd passed for [syscall.Conn.SyscallConn]
// and returns a function that should be deferred by the caller regardless of error.
//
// wait returns immediately if no timeout was set via MightBlock. Otherwise the
// timeout is consumed, fd is added to the epoll instance, and wait blocks until
// fd reports event, EPOLLERR or EPOLLHUP, or until the timeout elapses. The
// outcome is stored to the value pointed to by errP: nil when fd is ready,
// ETIMEDOUT when epoll reported no events, or the error from the failing syscall.
//
// The returned function removes fd from the epoll instance and only stores its
// error to errP if no earlier error is present.
func (conn *SyscallConn) wait(fd int, event uint32, errP *error) (cleanupFunc func()) {
	cleanupFunc = func() {}
	if conn.timeout == 0 {
		return
	}
	deadline := time.Now().Add(conn.timeout)
	// blocking intent only applies to a single call
	conn.timeout = 0

	if *errP = syscall.EpollCtl(conn.epollFd, syscall.EPOLL_CTL_ADD, fd, &syscall.EpollEvent{
		Events: event | syscall.EPOLLERR | syscall.EPOLLHUP,
		Fd:     int32(fd),
	}); *errP != nil {
		return
	}
	cleanupFunc = func() {
		// fd is guaranteed to remain valid while f executes but not after f returns
		if epDelErr := syscall.EpollCtl(conn.epollFd, syscall.EPOLL_CTL_DEL, fd, nil); epDelErr != nil && *errP == nil {
			*errP = epDelErr
		}
	}

	for remaining := time.Until(deadline); remaining > 0; remaining = time.Until(deadline) {
		// round up so a sub-millisecond remainder does not degrade into a
		// zero-timeout poll that reports a premature timeout
		msec := remaining / time.Millisecond
		if remaining%time.Millisecond != 0 {
			msec++
		}

		n, err := syscall.EpollWait(conn.epollFd, conn.epollBuf[:], int(msec))
		switch {
		case err == syscall.EINTR:
			// epoll_wait is never restarted after a signal handler runs;
			// retry with whatever time remains until the deadline
			continue

		case err != nil:
			*errP = err

		case n == 0: // timeout
			*errP = syscall.ETIMEDOUT

		case n != 1 || conn.epollBuf[0].Fd != int32(fd): // unreachable: only the socket fd is ever added
			*errP = syscall.ENOTRECOVERABLE

		case conn.epollBuf[0].Events&event == event ||
			conn.epollBuf[0].Events&(syscall.EPOLLERR|syscall.EPOLLHUP) != 0:
			// ready, or in a state where the following syscall reports the error
			*errP = nil

		default:
			// woken up for something else; keep waiting until the deadline
			*errP = syscall.ETIME
			continue
		}
		return
	}
	return
}
// Recvmsg implements [Conn.Recvmsg] via [syscall.Conn.SyscallConn].
//
// If a timeout was set via MightBlock, the socket is first waited on for
// readability. An error from the wait skips the receive and is returned as is.
func (conn *SyscallConn) Recvmsg(p, oob []byte, flags int) (n, oobn, recvflags int, err error) {
	if err = conn.wantsEpoll(); err != nil {
		return
	}

	rc, rcErr := conn.SyscallConn()
	if rcErr != nil {
		err = rcErr
		return
	}

	controlErr := rc.Control(func(fd uintptr) {
		sock := int(fd)
		// the wait happens here; only the cleanup is deferred
		release := conn.wait(sock, syscall.EPOLLIN, &err)
		defer release()

		if err == nil {
			n, oobn, recvflags, _, err = syscall.Recvmsg(sock, p, oob, flags)
		}
	})
	// errors from within the callback take precedence
	if err == nil && controlErr != nil {
		err = controlErr
	}
	return
}
// Sendmsg implements [Conn.Sendmsg] via [syscall.Conn.SyscallConn].
//
// If a timeout was set via MightBlock, the socket is first waited on for
// writability. An error from the wait skips the send and is returned as is.
func (conn *SyscallConn) Sendmsg(p, oob []byte, flags int) (n int, err error) {
	if err = conn.wantsEpoll(); err != nil {
		return
	}

	rc, rcErr := conn.SyscallConn()
	if rcErr != nil {
		err = rcErr
		return
	}

	controlErr := rc.Control(func(fd uintptr) {
		sock := int(fd)
		// the wait happens here; only the cleanup is deferred
		release := conn.wait(sock, syscall.EPOLLOUT, &err)
		defer release()

		if err == nil {
			n, err = syscall.SendmsgN(sock, p, oob, nil, flags)
		}
	})
	// errors from within the callback take precedence
	if err == nil && controlErr != nil {
		err = controlErr
	}
	return
}
// Close implements [Conn.Close] via [syscall.Conn.Close] but also
// closes the epoll fd if populated.
//
// The error from closing the underlying connection takes precedence; otherwise
// the error from creating or closing the epoll instance is returned.
//
// The epoll state is reset once its fd is closed, so that a repeated call to
// Close never closes the stale fd number again, which by then might refer to
// an unrelated file descriptor.
func (conn *SyscallConn) Close() (err error) {
	epollErr := conn.epollErr
	if conn.epoll && epollErr == nil {
		epollErr = syscall.Close(conn.epollFd)
		conn.epoll, conn.epollFd = false, -1
	}

	if err = conn.SyscallConnCloser.Close(); err != nil {
		return
	}
	return epollErr
}
// MustNew calls [New](conn, props) and panics on error.
// It is intended for use in tests with hard-coded strings.
func MustNew(conn Conn, props SPADict) *Context {
	ctx, err := New(conn, props)
	if err != nil {
		panic(err)
	}
	return ctx
}
// free releases the underlying storage of buf by replacing it
// with a fresh, empty slice.
func (ctx *Context) free() {
	ctx.buf = []byte{}
}
// queueFiles queues some file descriptors to be sent for the next message.
// It returns the offset of their index for the syscall.SCM_RIGHTS message,
// which is the number of files that were already queued.
func (ctx *Context) queueFiles(fds ...int) (offset Fd) {
	alreadyQueued := len(ctx.pendingFiles)
	ctx.pendingFiles = append(ctx.pendingFiles, fds...)
	return Fd(alreadyQueued)
}
// InconsistentFilesError describes an implementation error where an incorrect amount
// of files is queued between two messages. The first element is the number of
// files queued and the second element is the number expected by the message.
type InconsistentFilesError [2]Int

func (e *InconsistentFilesError) Error() string {
	queued, expected := int(e[0]), int(e[1])
	return "queued " + strconv.Itoa(queued) +
		" files instead of the expected " + strconv.Itoa(expected)
}
// writeMessage appends the POD representation of v and an optional footer to buf.
//
// The number of files queued since the previous message must match the file
// count claimed by v, otherwise [InconsistentFilesError] is returned. On
// success the pending footer is consumed and the sequence number advances.
func (ctx *Context) writeMessage(Id Int, v Message) (err error) {
	queued := Int(len(ctx.pendingFiles) - ctx.headerFiles)
	if expected := v.FileCount(); queued != expected {
		return &InconsistentFilesError{queued, expected}
	}

	// promote the deferred footer only when nothing newer replaces it
	if ctx.pendingFooter == nil && ctx.deferredPendingFooter != nil {
		ctx.pendingFooter = ctx.deferredPendingFooter
		ctx.deferredPendingFooter = nil
	}

	ctx.buf, err = MessageEncoder{v}.AppendMessage(ctx.buf, Id, ctx.sequence, ctx.pendingFooter)
	if err != nil {
		return
	}

	ctx.headerFiles = len(ctx.pendingFiles)
	ctx.pendingFooter = nil
	ctx.sequence++
	return
}
// mustWriteMessage calls writeMessage and panics if a non-nil error is returned.
// This must only be called from eventProxy.consume.
func (ctx *Context) mustWriteMessage(Id Int, v Message) {
	err := ctx.writeMessage(Id, v)
	if err == nil {
		return
	}
	panic(err)
}
// newProxyId returns a newly allocated proxy Id for the specified type.
// The proxy is registered under the returned Id, and additionally recorded
// as pending acknowledgement if ack is true.
func (ctx *Context) newProxyId(proxy eventProxy, ack bool) Int {
	allocated := ctx.nextId
	ctx.proxy[allocated] = proxy
	if ack {
		ctx.pendingIds[allocated] = struct{}{}
	}

	// advance to the next Id not currently associated with a proxy
	for {
		ctx.nextId++
		if _, taken := ctx.proxy[ctx.nextId]; !taken {
			break
		}
	}
	return allocated
}
// closeReceivedFiles closes all receivedFiles. This is only used during protocol
// error where [Context] is rendered unusable.
func (ctx *Context) closeReceivedFiles() {
	// deduplicate so that no fd is closed more than once
	slices.Sort(ctx.receivedFiles)
	fds := slices.Compact(ctx.receivedFiles)
	for i := range fds {
		_ = syscall.Close(fds[i])
	}
	ctx.receivedFiles = fds[:0]
}
// recvmsgFlags are flags passed to [Conn.Recvmsg] during Context.recvmsg.
// Receiving never blocks by itself, and received files are marked close-on-exec.
const recvmsgFlags = syscall.MSG_CMSG_CLOEXEC | syscall.MSG_DONTWAIT
// recvmsg receives from conn and returns the received payload backed by
// iovecBuf. The returned slice is valid until the next call to recvmsg.
//
// remaining holds bytes left unconsumed from a previous call. They are moved to
// the beginning of iovecBuf and newly received data is placed after them.
// Files received as ancillary data are appended to receivedFiles.
//
// EAGAIN and EWOULDBLOCK are returned as is, and in that case payload still
// holds the carried-over bytes, so that the caller is able to tell a drained
// socket from a message that was cut short. Any other error from the
// connection is returned as [ProxyFatalError], and EPIPE is returned when the
// connection yields no data without reporting an error.
func (ctx *Context) recvmsg(remaining []byte) (payload []byte, err error) {
	if copy(ctx.iovecBuf[:], remaining) != len(remaining) {
		// should not be reachable with correct internal usage
		return remaining, syscall.ENOMEM
	}

	var n, oobn, recvflags int
	for {
		n, oobn, recvflags, err = ctx.conn.Recvmsg(ctx.iovecBuf[len(remaining):], ctx.oobBuf[:], recvmsgFlags)

		if oob := ctx.oobBuf[:oobn]; len(oob) > 0 {
			var oobErr error

			var scm []syscall.SocketControlMessage
			if scm, oobErr = syscall.ParseSocketControlMessage(oob); oobErr != nil {
				ctx.closeReceivedFiles()
				err = oobErr
				return
			}

			var fds []int
			for i := range scm {
				if fds, oobErr = syscall.ParseUnixRights(&scm[i]); oobErr != nil {
					ctx.closeReceivedFiles()
					err = oobErr
					return
				}
				ctx.receivedFiles = append(ctx.receivedFiles, fds...)
			}
		}

		if recvflags&syscall.MSG_CTRUNC != 0 {
			// unreachable
			ctx.closeReceivedFiles()
			return nil, syscall.ENOMEM
		}

		if err != nil {
			if err == syscall.EINTR {
				continue
			}
			if err != syscall.EAGAIN && err != syscall.EWOULDBLOCK {
				ctx.closeReceivedFiles()
				return nil, &ProxyFatalError{Err: os.NewSyscallError("recvmsg", err), ProxyErrs: ctx.cloneAsProxyErrors()}
			}
		}
		break
	}

	if n == 0 && len(remaining) != len(ctx.iovecBuf) && err == nil {
		err = syscall.EPIPE // not wrapped as it did not come from the syscall
	}

	switch {
	case n > 0:
		payload = ctx.iovecBuf[:len(remaining)+n]

	case len(remaining) > 0 && (err == syscall.EAGAIN || err == syscall.EWOULDBLOCK):
		// nothing new arrived: hand the carried-over bytes back instead of
		// silently dropping what is the beginning of an incomplete message
		payload = ctx.iovecBuf[:len(remaining)]
	}
	return
}
// sendmsgFlags are flags passed to [Conn.Sendmsg] during Context.sendmsg.
// Sending never blocks by itself and does not raise SIGPIPE.
const sendmsgFlags = syscall.MSG_NOSIGNAL | syscall.MSG_DONTWAIT
// sendmsg sends p to conn. sendmsg does not retain p.
//
// Files in fds are passed alongside p as a syscall.SCM_RIGHTS message. The call
// is retried on EINTR. EAGAIN and EWOULDBLOCK are returned as is, while a short
// write or any other error is returned as [ProxyFatalError].
func (ctx *Context) sendmsg(p []byte, fds ...int) error {
	var oob []byte
	if len(fds) > 0 {
		oob = syscall.UnixRights(fds...)
	}

	var (
		sent int
		err  error
	)
	for {
		if sent, err = ctx.conn.Sendmsg(p, oob, sendmsgFlags); err != syscall.EINTR {
			break
		}
	}

	switch {
	case err == nil && sent == len(p):
		return nil
	case err == nil:
		// short write
		err = syscall.EMSGSIZE
	case err == syscall.EAGAIN || err == syscall.EWOULDBLOCK:
		return err
	}
	return &ProxyFatalError{Err: os.NewSyscallError("sendmsg", err), ProxyErrs: ctx.cloneAsProxyErrors()}
}
// An UnknownIdError describes a server message with an Id unknown to [Context].
type UnknownIdError struct {
	// Offending id decoded from Data.
	Id Int
	// Message received from the server.
	Data string
}

func (e *UnknownIdError) Error() string {
	id := strconv.Itoa(int(e.Id))
	return "unknown proxy id " + id
}
// UnsupportedOpcodeError describes a message with an unsupported opcode.
type UnsupportedOpcodeError struct {
	// Offending opcode.
	Opcode byte
	// Name of interface processed by the proxy.
	Interface string
}

func (e *UnsupportedOpcodeError) Error() string {
	opcode := strconv.FormatUint(uint64(e.Opcode), 10)
	return "unsupported " + e.Interface + " opcode " + opcode
}
// UnsupportedFooterOpcodeError describes a [Footer] with an unsupported opcode.
// Its value is the offending opcode.
type UnsupportedFooterOpcodeError Id

func (e UnsupportedFooterOpcodeError) Error() string {
	opcode := strconv.Itoa(int(e))
	return "unsupported footer opcode " + opcode
}
// A RoundtripUnexpectedEOFError describes an unexpected EOF encountered during [Context.Roundtrip].
// Its value identifies the stage of message processing that ran out of data.
type RoundtripUnexpectedEOFError uintptr

const (
	// ErrRoundtripEOFHeader is returned when unexpectedly encountering EOF
	// decoding the message header.
	ErrRoundtripEOFHeader RoundtripUnexpectedEOFError = iota
	// ErrRoundtripEOFBody is returned when unexpectedly encountering EOF
	// establishing message body bounds.
	ErrRoundtripEOFBody
	// ErrRoundtripEOFFooter is like [ErrRoundtripEOFBody], but for when establishing
	// bounds for the footer instead.
	ErrRoundtripEOFFooter
	// ErrRoundtripEOFFooterOpcode is returned when unexpectedly encountering EOF
	// during the footer opcode hack.
	ErrRoundtripEOFFooterOpcode
)

// Unwrap always returns [io.ErrUnexpectedEOF].
func (RoundtripUnexpectedEOFError) Unwrap() error {
	return io.ErrUnexpectedEOF
}

// Error describes the stage that ran out of data. Values other than the
// declared constants yield a message without a stage.
func (e RoundtripUnexpectedEOFError) Error() string {
	const prefix = "unexpected EOF"

	switch e {
	case ErrRoundtripEOFHeader:
		return prefix + " decoding message header"
	case ErrRoundtripEOFBody:
		return prefix + " establishing message body bounds"
	case ErrRoundtripEOFFooter:
		return prefix + " establishing message footer bounds"
	case ErrRoundtripEOFFooterOpcode:
		return prefix + " decoding message footer opcode"
	}
	return prefix
}
// eventProxy consumes events during a [Context.Roundtrip].
type eventProxy interface {
	// consume consumes an event and its optional footer.
	// files holds the files claimed by the header of the event, and unmarshal
	// decodes the event body into v, panicking if decoding fails.
	consume(opcode byte, files []int, unmarshal func(v any)) error
	// setBoundProps stores a [CoreBoundProps] event received from the server.
	setBoundProps(event *CoreBoundProps) error
	// remove is called when the proxy is removed for any reason, usually from
	// being targeted by a [PW_CORE_EVENT_REMOVE_ID] event.
	remove() error
	// Stringer returns the PipeWire interface name.
	fmt.Stringer
}
// unmarshal is like [Unmarshal] but handles footer if present.
//
// data begins at the message body and might extend past the end of the message
// described by header. The body is decoded into v, and any bytes following it
// within the message are interpreted as the footer. [ErrRoundtripEOFFooter] is
// returned if the body does not fit within the size claimed by header.
func (ctx *Context) unmarshal(header *Header, data []byte, v any) error {
	n, err := UnmarshalNext(data, v)
	if err != nil {
		return err
	}
	// the decoded body must not extend past the message, and the entire
	// message must be present in data
	if len(data) < int(header.Size) || header.Size < n {
		return ErrRoundtripEOFFooter
	}
	// no further data follows this message
	isLastMessage := len(data) == int(header.Size)
	// what remains of the message after its body is the footer
	data = data[n:header.Size]
	if len(data) > 0 {
		/* the footer concrete type is determined by opcode, which cannot be
		decoded directly before the type is known, so this hack is required:
		skip the struct prefix, then the integer prefix, and the next SizeId
		bytes are the encoded opcode value */
		if len(data) < int(SizePrefix*2+SizeId) {
			return ErrRoundtripEOFFooterOpcode
		}
		switch opcode := binary.NativeEndian.Uint32(data[SizePrefix*2:]); opcode {
		case FOOTER_CORE_OPCODE_GENERATION:
			var footer Footer[FooterCoreGeneration]
			if err = Unmarshal(data, &footer); err != nil {
				return err
			}
			// a changed registry generation is reported back to the server
			// in the footer of an outgoing message
			if ctx.generation != footer.Payload.RegistryGeneration {
				var pendingFooter = Footer[FooterClientGeneration]{
					FOOTER_CORE_OPCODE_GENERATION,
					FooterClientGeneration{ClientGeneration: footer.Payload.RegistryGeneration},
				}
				// this emulates upstream behaviour that pending footer updated on the last message
				// during a roundtrip is pushed back to the first message of the next roundtrip
				if isLastMessage {
					ctx.deferredPendingFooter = &pendingFooter
				} else {
					ctx.pendingFooter = &pendingFooter
				}
			}
			ctx.generation = footer.Payload.RegistryGeneration
			return nil
		default:
			return UnsupportedFooterOpcodeError(opcode)
		}
	}
	return nil
}
// An UnexpectedSequenceError is a server-side sequence number that does not
// match its counterpart tracked by the client. This indicates that either
// the client has somehow missed events, or data being interpreted as [Header]
// is, in fact, not the message header.
type UnexpectedSequenceError Int

func (e UnexpectedSequenceError) Error() string {
	seq := strconv.Itoa(int(e))
	return "unexpected seq " + seq
}
// An UnexpectedFilesError describes an inconsistent state where file count claimed by
// [Header] accumulates to a value greater than the total number of files received.
// Its value is not part of the error message.
type UnexpectedFilesError int

func (UnexpectedFilesError) Error() string {
	const message = "server message headers claim to have sent more files than actually received"
	return message
}
// A DanglingFilesError holds onto files that were sent by the server but no [Header]
// accounts for. These are closed by their finalizers if discarded.
type DanglingFilesError []*os.File

func (e DanglingFilesError) Error() string {
	count := strconv.Itoa(len(e))
	return "received " + count + " dangling files"
}
// An UnacknowledgedProxyError holds newly allocated proxy ids that the server failed
// to acknowledge after an otherwise successful [Core.Sync].
type UnacknowledgedProxyError []Int

func (e UnacknowledgedProxyError) Error() string {
	count := strconv.Itoa(len(e))
	return "server did not acknowledge " + count + " proxies"
}
// An UnacknowledgedProxyDestructionError holds destroyed proxy ids that the server failed
// to acknowledge after an otherwise successful [Core.Sync].
type UnacknowledgedProxyDestructionError []Int

func (e UnacknowledgedProxyDestructionError) Error() string {
	count := strconv.Itoa(len(e))
	return "server did not acknowledge " + count + " proxy destructions"
}
// A ProxyFatalError describes an error that terminates event handling during a
// [Context.Roundtrip] and makes further event processing no longer possible.
type ProxyFatalError struct {
	// The fatal error causing the termination of event processing.
	Err error
	// Previous non-fatal proxy errors.
	ProxyErrs []error
}

// Unwrap returns ProxyErrs followed by Err.
//
// The result is always a newly allocated slice: appending to ProxyErrs directly
// would write into its spare capacity, which mutates a backing array that might
// be shared and races with concurrent calls.
func (e *ProxyFatalError) Unwrap() []error {
	errs := make([]error, 0, len(e.ProxyErrs)+1)
	errs = append(errs, e.ProxyErrs...)
	return append(errs, e.Err)
}

// Error returns the message of Err, followed by the number of proxy errors
// that preceded it, if any.
func (e *ProxyFatalError) Error() string {
	s := e.Err.Error()
	if len(e.ProxyErrs) > 0 {
		s += "; " + strconv.Itoa(len(e.ProxyErrs)) + " additional proxy errors occurred before this point"
	}
	return s
}
// A ProxyConsumeError is a collection of non-protocol errors returned by proxies
// during event processing. These do not prevent event handling from continuing but
// may be considered fatal to the application.
type ProxyConsumeError []error

// Unwrap returns the collected errors.
func (e ProxyConsumeError) Unwrap() []error { return e }

// Error returns the message of the first error, followed by the number of
// errors collected after it, if any.
func (e ProxyConsumeError) Error() string {
	if len(e) == 0 {
		return "invalid proxy consume error"
	}

	// first error is usually the most relevant one
	s := e[0].Error()
	if additional := len(e) - 1; additional > 0 {
		// the first error is already part of the message and must not be counted again
		s += "; " + strconv.Itoa(additional) + " additional proxy errors occurred after this point"
	}
	return s
}
// cloneAsProxyErrors clones and truncates proxyErrors if it contains errors,
// returning the cloned slice. The result is nil if there is nothing to clone.
func (ctx *Context) cloneAsProxyErrors() (proxyErrors ProxyConsumeError) {
	if len(ctx.proxyErrors) > 0 {
		proxyErrors = slices.Clone(ctx.proxyErrors)
		ctx.proxyErrors = ctx.proxyErrors[:0]
	}
	return
}
// cloneProxyErrors is like cloneAsProxyErrors, but returns nil if proxyErrors
// does not contain errors. This avoids returning a non-nil error value that
// wraps an empty collection.
func (ctx *Context) cloneProxyErrors() (err error) {
	if cloned := ctx.cloneAsProxyErrors(); len(cloned) != 0 {
		return cloned
	}
	return nil
}
// roundtripSyncID is the id passed to Context.coreSync during a [Context.Roundtrip].
// NOTE(review): no use of this constant is visible in this section of the file.
const roundtripSyncID = 0
// Roundtrip sends all pending messages to the server and processes events until
// the server has no more messages.
//
// For a non-nil error, if the error happens over the network, it has concrete type
// [os.SyscallError].
//
// If the exchange itself succeeds, errors collected from proxies during event
// handling are returned instead.
func (ctx *Context) Roundtrip() (err error) {
	if err = ctx.roundtrip(); err != nil {
		return
	}
	return ctx.cloneProxyErrors()
}
// roundtripTimeout is the maximum duration socket operations during
// Context.roundtrip is allowed to block for.
const roundtripTimeout = 5 * time.Second
// roundtrip implements the Roundtrip method without checking proxyErrors.
//
// Pending messages and files are sent first, then events are consumed until
// receiving would block. Both the send and the first receive are allowed to
// block for up to roundtripTimeout.
func (ctx *Context) roundtrip() (err error) {
	ctx.conn.MightBlock(roundtripTimeout)
	if err = ctx.sendmsg(ctx.buf, ctx.pendingFiles...); err != nil {
		return
	}
	// everything queued was sent; reset outgoing state but keep the storage
	ctx.buf = ctx.buf[:0]
	ctx.pendingFiles = ctx.pendingFiles[:0]
	ctx.headerFiles = 0
	// files still present in receivedFiles at this point were not claimed by any header
	defer func() {
		var danglingFiles DanglingFilesError
		if len(ctx.receivedFiles) > 0 {
			// having multiple *os.File with the same fd causes serious problems
			slices.Sort(ctx.receivedFiles)
			ctx.receivedFiles = slices.Compact(ctx.receivedFiles)
			danglingFiles = make(DanglingFilesError, 0, len(ctx.receivedFiles))
			for _, fd := range ctx.receivedFiles {
				// hold these as *os.File so they are closed if this error never reaches the caller,
				// or the caller discards or otherwise does not handle this error, to avoid leaking fds
				danglingFiles = append(danglingFiles, os.NewFile(uintptr(fd),
					"dangling fd "+strconv.Itoa(fd)+" received from PipeWire"))
			}
			ctx.receivedFiles = ctx.receivedFiles[:0]
		}
		// populated early for finalizers, but does not overwrite existing errors
		if len(danglingFiles) > 0 && err == nil {
			// receivedFiles was truncated above, so this closes nothing
			ctx.closeReceivedFiles()
			err = &ProxyFatalError{Err: danglingFiles, ProxyErrs: ctx.cloneAsProxyErrors()}
			return
		}
	}()
	var remaining []byte
	// only applies to the first receive; subsequent ones do not wait
	ctx.conn.MightBlock(roundtripTimeout)
	for {
		// bytes of an incomplete message are carried over to the next call
		remaining, err = ctx.consume(remaining)
		if err == nil {
			continue
		}
		// only returned by recvmsg
		if err == syscall.EAGAIN || err == syscall.EWOULDBLOCK {
			// running out of data is only expected between messages
			if len(remaining) == 0 {
				err = nil
			} else if len(remaining) < SizeHeader {
				err = &ProxyFatalError{Err: ErrRoundtripEOFHeader, ProxyErrs: ctx.cloneAsProxyErrors()}
			} else {
				err = &ProxyFatalError{Err: ErrRoundtripEOFBody, ProxyErrs: ctx.cloneAsProxyErrors()}
			}
		}
		return
	}
}
// currentSeq returns the current sequence number, which is the one
// assigned to the most recently queued message.
// This must only be called immediately after queueing a message.
func (ctx *Context) currentSeq() Int {
	next := ctx.sequence
	return next - 1
}
// currentRemoteSeq returns the current remote sequence number, which is
// the one carried by the event being processed.
// This must only be called from eventProxy.consume.
func (ctx *Context) currentRemoteSeq() Int {
	next := ctx.remoteSequence
	return next - 1
}
// consume receives messages from the server and processes events.
//
// receiveRemaining holds bytes of an incomplete message left over from the
// previous call. The returned slice holds bytes that could not be processed
// yet; a nil error with a non-empty result means the last message is incomplete.
//
// A panic with an error value raised during event processing is recovered and
// returned as [ProxyFatalError]; any other panic value is propagated.
func (ctx *Context) consume(receiveRemaining []byte) (remaining []byte, err error) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		// the connection is unusable past this point
		ctx.closeReceivedFiles()
		recoveredErr, ok := r.(error)
		if !ok {
			// not raised by this package, let it through
			panic(r)
		}
		if recoveredErr == nil {
			panic(&runtime.PanicNilError{})
		}
		err = &ProxyFatalError{Err: recoveredErr, ProxyErrs: ctx.cloneAsProxyErrors()}
		return
	}()
	if remaining, err = ctx.recvmsg(receiveRemaining); err != nil {
		return
	}
	var header Header
	for len(remaining) > 0 {
		// incomplete header, left for the next call
		if len(remaining) < SizeHeader {
			return
		}
		if err = header.UnmarshalBinary(remaining[:SizeHeader]); err != nil {
			return
		}
		if header.Sequence != ctx.remoteSequence {
			return remaining, UnexpectedSequenceError(header.Sequence)
		}
		// incomplete body, left for the next call
		if len(remaining) < int(SizeHeader+header.Size) {
			return
		}
		ctx.remoteSequence++
		proxy, ok := ctx.proxy[header.ID]
		if !ok {
			return remaining, &UnknownIdError{header.ID, string(remaining[:SizeHeader+header.Size])}
		}
		fileCount := int(header.FileCount)
		if fileCount > len(ctx.receivedFiles) {
			return remaining, UnexpectedFilesError(fileCount)
		}
		// hand over the files claimed by this message
		files := ctx.receivedFiles[:fileCount]
		ctx.receivedFiles = ctx.receivedFiles[fileCount:]
		remaining = remaining[SizeHeader:]
		proxyErr := proxy.consume(header.Opcode, files, func(v any) {
			// decoding failures are fatal and are recovered by the deferred function above
			if unmarshalErr := ctx.unmarshal(&header, remaining, v); unmarshalErr != nil {
				panic(unmarshalErr)
			}
		})
		remaining = remaining[header.Size:]
		// errors returned by proxies do not stop event processing
		if proxyErr != nil {
			ctx.proxyErrors = append(ctx.proxyErrors, proxyErr)
		}
	}
	return
}
// An UnexpectedFileCountError is returned as part of a [ProxyFatalError] for an event
// that received an unexpected number of files. The first element is the number
// of files expected and the second element is the number received.
type UnexpectedFileCountError [2]int

func (e *UnexpectedFileCountError) Error() string {
	expected, received := e[0], e[1]
	return "received " + strconv.Itoa(received) +
		" files instead of the expected " + strconv.Itoa(expected)
}
// closeReceivedFiles closes all received files and panics with [UnexpectedFileCountError]
// if one or more files are passed. This is used with events that do not expect files.
func closeReceivedFiles(fds ...int) {
	if len(fds) == 0 {
		return
	}

	for i := range fds {
		_ = syscall.Close(fds[i])
	}
	panic(&UnexpectedFileCountError{0, len(fds)})
}
// doSyncComplete calls syncComplete functions and collects their errors alongside errors
// cloned from proxyErrors. A panic is translated into ProxyFatalError.
//
// The collected errors are returned as [ProxyConsumeError]. Only a panic with
// an error value is translated; any other panic value is propagated.
func (ctx *Context) doSyncComplete() (err error) {
	proxyErrors := ctx.cloneAsProxyErrors()
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		ctx.closeReceivedFiles()
		recoveredErr, ok := r.(error)
		if !ok {
			// not raised by this package, let it through
			panic(r)
		}
		if recoveredErr == nil {
			panic(&runtime.PanicNilError{})
		}
		// includes errors collected before the panic
		err = &ProxyFatalError{Err: recoveredErr, ProxyErrs: proxyErrors}
		return
	}()
	for _, f := range ctx.syncComplete {
		if scErr := f(); scErr != nil {
			proxyErrors = append(proxyErrors, scErr)
		}
	}
	// every function ran; this is skipped if one of them panicked
	ctx.syncComplete = ctx.syncComplete[:0]
	if len(proxyErrors) > 0 {
		err = proxyErrors
	}
	return
}
// Close frees the underlying buffer and closes the connection.
//
// Pending deferred operations are run before the connection is closed. If only
// one of these steps fails, its error is returned as is. If both fail with the
// deferred operations reporting [ProxyConsumeError], the error from closing the
// connection is returned as [ProxyFatalError] carrying the proxy errors.
// A fatal error from the deferred operations takes precedence over the error
// from closing the connection.
func (ctx *Context) Close() (err error) {
	ctx.free()
	err = ctx.doSyncComplete()

	closeErr := ctx.conn.Close()
	switch {
	case closeErr == nil:
		return err
	case err == nil:
		return closeErr
	}

	if proxyErrors, ok := err.(ProxyConsumeError); ok {
		// the close error is the fatal one; proxy errors are carried alongside it
		return &ProxyFatalError{Err: closeErr, ProxyErrs: proxyErrors}
	}
	return err
}
// expectsCoreError returns a function that inspects an error value and
// returns the address of a [CoreError] if it is the only error present
// and targets the specified proxy and sequence.
//
// The behaviour of expectsCoreError is only correct for an empty buf
// prior to calling. If buf is not empty, [Core.Sync] is called, with
// its return value stored to the value pointed to by errP if not nil,
// and the function is not populated.
//
// The caller must queue a message and call [Core.Sync] immediately
// after calling expectsCoreError.
func (ctx *Context) expectsCoreError(id Int, errP *error) (asCoreError func() (coreError *CoreError)) {
	if len(ctx.buf) > 0 {
		if syncErr := ctx.GetCore().Sync(); syncErr != nil {
			*errP = syncErr
			return nil
		}
	}

	// sequence number of the message the caller is about to queue
	sequence := ctx.sequence
	return func() *CoreError {
		proxyErrors, ok := (*errP).(ProxyConsumeError)
		if !ok || len(proxyErrors) != 1 {
			return nil
		}

		var coreError *CoreError
		if !errors.As(proxyErrors[0], &coreError) || coreError == nil {
			return nil
		}

		// do not return a non-matching CoreError
		if coreError.ID != id || coreError.Sequence != sequence {
			return nil
		}
		return coreError
	}
}
// A PermissionError describes an error emitted by the server when trying to
// perform an operation that the client has no permission for.
type PermissionError struct {
	// The id of the resource (proxy if emitted by the client) that is in error.
	ID Int `json:"id"`
	// An error message.
	Message string `json:"message"`
}

// Unwrap always returns [syscall.EPERM].
func (*PermissionError) Unwrap() error {
	return syscall.EPERM
}

// Error returns Message as is.
func (e *PermissionError) Error() string {
	message := e.Message
	return message
}
// Remote is the environment (sic) with the remote name.
// It is consulted by [ConnectName] when called with an empty name.
const Remote = "PIPEWIRE_REMOTE"
/* modules/module-protocol-native/local-socket.c */

// DEFAULT_SYSTEM_RUNTIME_DIR is the directory a relative remote name is resolved
// against when no usable runtime directory is found in the environment.
const DEFAULT_SYSTEM_RUNTIME_DIR = "/run/pipewire"
// connectName connects to a PipeWire remote by name and returns the [net.UnixConn].
//
// If manager is true, the "-manager" suffix is appended to name unless already
// present. An absolute name, or a name starting with '@', is dialed as is. Any
// other name is resolved against the first of PIPEWIRE_RUNTIME_DIR,
// XDG_RUNTIME_DIR and USERPROFILE that is set to an absolute path, falling
// back to DEFAULT_SYSTEM_RUNTIME_DIR.
func connectName(name string, manager bool) (conn *net.UnixConn, err error) {
	if manager && !strings.HasSuffix(name, "-manager") {
		return connectName(name+"-manager", false)
	}

	target := name
	if !path.IsAbs(name) && (len(name) == 0 || name[0] != '@') {
		runtimeDir := DEFAULT_SYSTEM_RUNTIME_DIR
		for _, key := range [...]string{
			"PIPEWIRE_RUNTIME_DIR",
			"XDG_RUNTIME_DIR",
			// this is cargo culted from windows stuff and has no effect normally;
			// keeping it to maintain compatibility in case someone sets this
			"USERPROFILE",
		} {
			if v, ok := os.LookupEnv(key); ok && path.IsAbs(v) {
				runtimeDir = v
				break
			}
		}
		target = path.Join(runtimeDir, name)
	}

	return net.DialUnix("unix", nil, &net.UnixAddr{Name: target, Net: "unix"})
}
// ConnectName connects to a PipeWire remote by name.
//
// An empty name is replaced by the value of the [Remote] environment variable,
// or by PW_DEFAULT_REMOTE if that is unset or empty. If manager is true, the
// manager intention is appended to props. The connection is closed if
// initialising the [Context] fails.
func ConnectName(name string, manager bool, props SPADict) (ctx *Context, err error) {
	if manager {
		props = append(props, SPADictItem{Key: PW_KEY_REMOTE_INTENTION, Value: "manager"})
	}

	if name == "" {
		name = PW_DEFAULT_REMOTE
		if v, ok := os.LookupEnv(Remote); ok && v != "" {
			name = v
		}
	}

	unixConn, dialErr := connectName(name, manager)
	if dialErr != nil {
		return nil, dialErr
	}

	conn := &SyscallConn{SyscallConnCloser: unixConn}
	if ctx, err = New(conn, props); err != nil {
		_ = conn.Close()
		return nil, err
	}
	return ctx, nil
}
// Connect connects to the PipeWire remote. The remote name is left empty
// for [ConnectName] to resolve.
func Connect(manager bool, props SPADict) (ctx *Context, err error) {
	const unspecified = ""
	ctx, err = ConnectName(unspecified, manager, props)
	return
}