From af741f20a003d48863eb5e9f7cf71052abd4116d Mon Sep 17 00:00:00 2001 From: Ophestra Date: Tue, 2 Dec 2025 06:03:21 +0900 Subject: [PATCH] internal/pipewire: implement client context This consumes the entire sample, is validated to send identical messages and correctly handle received messages. Signed-off-by: Ophestra --- internal/pipewire/client.go | 42 ++ internal/pipewire/core.go | 235 ++++++++ internal/pipewire/core_test.go | 4 +- internal/pipewire/header.go | 6 +- internal/pipewire/pipewire.go | 501 +++++++++++++++++ internal/pipewire/pipewire_test.go | 805 +++++++++++++++++++++++++++ internal/pipewire/securitycontext.go | 62 +++ 7 files changed, 1651 insertions(+), 4 deletions(-) diff --git a/internal/pipewire/client.go b/internal/pipewire/client.go index cf66647..7c619ca 100644 --- a/internal/pipewire/client.go +++ b/internal/pipewire/client.go @@ -74,3 +74,45 @@ func (c *ClientUpdateProperties) MarshalBinary() ([]byte, error) { return Marsha // UnmarshalBinary satisfies [encoding.BinaryUnmarshaler] via [Unmarshal]. func (c *ClientUpdateProperties) UnmarshalBinary(data []byte) error { return Unmarshal(data, c) } + +// clientUpdateProperties queues a [ClientUpdateProperties] message for the PipeWire server. +// This method should not be called directly, the New function queues this message. +func (ctx *Context) clientUpdateProperties(props SPADict) error { + return ctx.writeMessage( + PW_ID_CLIENT, + PW_CLIENT_METHOD_UPDATE_PROPERTIES, + &ClientUpdateProperties{&props}, + ) +} + +// Client holds state of [PW_TYPE_INTERFACE_Client]. +type Client struct { + // Additional information from the server, populated or updated during [Context.Roundtrip]. + Info *ClientInfo `json:"info"` + + // Populated by [CoreBoundProps] events targeting [Client]. + Properties SPADict `json:"props"` +} + +func (client *Client) consume(opcode byte, files []int, unmarshal func(v any) error) error { + if err := closeReceivedFiles(files...); err != nil { + return err + } + + switch opcode { + case PW_CLIENT_EVENT_INFO: + return unmarshal(&client.Info) + + default: + return &UnsupportedOpcodeError{opcode, client.String()} + } +} + +func (client *Client) setBoundProps(event *CoreBoundProps) error { + if event.Properties != nil { + client.Properties = *event.Properties + } + return nil +} + +func (client *Client) String() string { return PW_TYPE_INTERFACE_Registry } diff --git a/internal/pipewire/core.go b/internal/pipewire/core.go index 4b61972..fe7fb78 100644 --- a/internal/pipewire/core.go +++ b/internal/pipewire/core.go @@ -1,5 +1,11 @@ package pipewire +import ( + "errors" + "fmt" + "strconv" +) + /* pipewire/core.h */ const ( @@ -240,6 +246,37 @@ func (c *CoreBoundProps) MarshalBinary() ([]byte, error) { return Marshal(c) } // UnmarshalBinary satisfies [encoding.BinaryUnmarshaler] via [Unmarshal]. func (c *CoreBoundProps) UnmarshalBinary(data []byte) error { return Unmarshal(data, c) } +// ErrBadBoundProps is returned when a [CoreBoundProps] event targeting a proxy +// that should never be targeted is received and processed. +var ErrBadBoundProps = errors.New("attempted to store bound props on proxy that should never be targeted") + +// noAck is embedded by proxies that are never targeted by [CoreBoundProps]. +type noAck struct{} + +// setBoundProps should never be called as this proxy should never be targeted by [CoreBoundProps]. +func (noAck) setBoundProps(*CoreBoundProps) error { return ErrBadBoundProps } + +// An InconsistentIdError describes an inconsistent state where the server claims an impossible +// proxy or global id. This is only generated by the [CoreBoundProps] event. +type InconsistentIdError struct { + // Whether the inconsistent id is the global resource id. + Global bool + // Targeted proxy instance. + Proxy fmt.Stringer + // Differing ids. + ID, ServerID Int +} + +func (e *InconsistentIdError) Error() string { + name := "proxy" + if e.Global { + name = "global" + } + + return name + " id " + strconv.Itoa(int(e.ID)) + " targeting " + e.Proxy.String() + + " inconsistent with " + strconv.Itoa(int(e.ServerID)) + " claimed by the server" +} + // CoreHello is the first message sent by a client. type CoreHello struct { // The version number of the client, usually PW_VERSION_CORE. @@ -255,6 +292,16 @@ func (c *CoreHello) MarshalBinary() ([]byte, error) { return Marshal(c) } // UnmarshalBinary satisfies [encoding.BinaryUnmarshaler] via [Unmarshal]. func (c *CoreHello) UnmarshalBinary(data []byte) error { return Unmarshal(data, c) } +// coreHello queues a [CoreHello] message for the PipeWire server. +// This method should not be called directly, the New function queues this message. +func (ctx *Context) coreHello() error { + return ctx.writeMessage( + PW_ID_CORE, + PW_CORE_METHOD_HELLO, + &CoreHello{PW_VERSION_CORE}, + ) +} + const ( // CoreSyncSequenceOffset is the offset to [Header.Sequence] to produce [CoreSync.Sequence]. CoreSyncSequenceOffset = 0x40000000 @@ -279,6 +326,35 @@ func (c *CoreSync) MarshalBinary() ([]byte, error) { return Marshal(c) } // UnmarshalBinary satisfies [encoding.BinaryUnmarshaler] via [Unmarshal]. func (c *CoreSync) UnmarshalBinary(data []byte) error { return Unmarshal(data, c) } +// coreSync queues a [CoreSync] message for the PipeWire server. +// This is not safe to use directly, callers should use Sync instead. +func (ctx *Context) coreSync(id Int) error { + return ctx.writeMessage( + PW_ID_CORE, + PW_CORE_METHOD_SYNC, + &CoreSync{id, CoreSyncSequenceOffset + Int(ctx.sequence)}, + ) +} + +// ErrNotDone is returned if [Core.Sync] returns from its [Context.Roundtrip] without +// receiving a [CoreDone] event targeting the [CoreSync] event it delivered. +var ErrNotDone = errors.New("did not receive a Core::Done event targeting previously delivered Core::Sync") + +// Sync queues a [CoreSync] message for the PipeWire server and initiates a Roundtrip. +func (core *Core) Sync() error { + core.done = false + if err := core.ctx.coreSync(roundtripSyncID); err != nil { + return err + } + if err := core.ctx.Roundtrip(); err != nil { + return err + } + if !core.done { + return ErrNotDone + } + return nil +} + // The CorePong message is sent from the client to the server when the server emits the Ping event. type CorePong struct { // Copied from [CorePing.ID]. @@ -320,6 +396,19 @@ func (c *CoreGetRegistry) MarshalBinary() ([]byte, error) { return Marshal(c) } // UnmarshalBinary satisfies [encoding.BinaryUnmarshaler] via [Unmarshal]. func (c *CoreGetRegistry) UnmarshalBinary(data []byte) error { return Unmarshal(data, c) } +// GetRegistry queues a [CoreGetRegistry] message for the PipeWire server +// and returns the address of the newly allocated [Registry]. +func (ctx *Context) GetRegistry() (*Registry, error) { + registry := Registry{Objects: make(map[Int]RegistryGlobal), ctx: ctx} + newId := ctx.newProxyId(®istry, false) + registry.ID = newId + return ®istry, ctx.writeMessage( + PW_ID_CORE, + PW_CORE_METHOD_GET_REGISTRY, + &CoreGetRegistry{PW_VERSION_REGISTRY, newId}, + ) +} + // A RegistryGlobal event is emitted to notify a client about a new global object. type RegistryGlobal struct { // The global id. @@ -379,3 +468,149 @@ func (c *RegistryBind) MarshalBinary() ([]byte, error) { return Marshal(c) } // UnmarshalBinary satisfies [encoding.BinaryUnmarshaler] via [Unmarshal]. func (c *RegistryBind) UnmarshalBinary(data []byte) error { return Unmarshal(data, c) } + +// bind queues a [RegistryBind] message for the PipeWire server +// and returns the newly allocated proxy id. +func (registry *Registry) bind(proxy eventProxy, id, version Int) (Int, error) { + bind := RegistryBind{ + ID: id, + Type: proxy.String(), + Version: version, + NewID: registry.ctx.newProxyId(proxy, true), + } + return bind.NewID, registry.ctx.writeMessage( + registry.ID, + PW_REGISTRY_METHOD_BIND, + &bind, + ) +} + +// An UnsupportedObjectTypeError is the name of a type not known by the server [Registry]. +type UnsupportedObjectTypeError string + +func (e UnsupportedObjectTypeError) Error() string { return "unsupported object type " + string(e) } + +// Core holds state of [PW_TYPE_INTERFACE_Core]. +type Core struct { + // Additional information from the server, populated or updated during [Context.Roundtrip]. + Info *CoreInfo `json:"info"` + + // Whether a [CoreDone] event was received during Sync. + done bool + + ctx *Context + noAck +} + +// ErrUnexpectedDone is a [CoreDone] event with unexpected values. +var ErrUnexpectedDone = errors.New("multiple Core::Done events targeting Core::Sync") + +// An UnknownBoundIdError describes the server claiming to have bound a proxy id that was never allocated. +type UnknownBoundIdError[E any] struct { + // Offending id decoded from Data. + Id Int + // Event received from the server. + Event E +} + +func (e *UnknownBoundIdError[E]) Error() string { + return "unknown bound proxy id " + strconv.Itoa(int(e.Id)) +} + +func (core *Core) consume(opcode byte, files []int, unmarshal func(v any) error) error { + if err := closeReceivedFiles(files...); err != nil { + return err + } + + switch opcode { + case PW_CORE_EVENT_INFO: + return unmarshal(&core.Info) + + case PW_CORE_EVENT_DONE: + var done CoreDone + if err := unmarshal(&done); err != nil { + return err + } + if done.ID == roundtripSyncID && done.Sequence == CoreSyncSequenceOffset+core.ctx.sequence-1 { + if core.done { + return ErrUnexpectedDone + } + core.done = true + } + + // silently ignore non-matching events because the server sends out + // an event with id -1 seq 0 that does not appear to correspond to + // anything, and this behaviour is never mentioned in documentation + return nil + + case PW_CORE_EVENT_BOUND_PROPS: + var boundProps CoreBoundProps + if err := unmarshal(&boundProps); err != nil { + return err + } + + delete(core.ctx.pendingIds, boundProps.ID) + proxy, ok := core.ctx.proxy[boundProps.ID] + if !ok { + return &UnknownBoundIdError[*CoreBoundProps]{Id: boundProps.ID, Event: &boundProps} + } + return proxy.setBoundProps(&boundProps) + + default: + return &UnsupportedOpcodeError{opcode, core.String()} + } +} + +func (core *Core) String() string { return PW_TYPE_INTERFACE_Core } + +// Registry holds state of [PW_TYPE_INTERFACE_Registry]. +type Registry struct { + // Proxy id as tracked by [Context]. + ID Int `json:"proxy_id"` + + // Global objects received via the [RegistryGlobal] event. + // + // This requires more processing before it can be used, but is not implemented + // as it is not used by Hakurei. + Objects map[Int]RegistryGlobal `json:"objects"` + + ctx *Context + noAck +} + +// A GlobalIDCollisionError describes a [RegistryGlobal] event stepping on a previous instance of itself. +type GlobalIDCollisionError struct { + // The colliding id. + ID Int + // Involved events. + Previous, Current *RegistryGlobal +} + +func (e *GlobalIDCollisionError) Error() string { + return "new Registry::Global event for " + e.Current.Type + + " stepping on previous id " + strconv.Itoa(int(e.ID)) + " for " + e.Previous.Type +} + +func (registry *Registry) consume(opcode byte, files []int, unmarshal func(v any) error) error { + if err := closeReceivedFiles(files...); err != nil { + return err + } + + switch opcode { + case PW_REGISTRY_EVENT_GLOBAL: + var global RegistryGlobal + if err := unmarshal(&global); err != nil { + return err + } + if object, ok := registry.Objects[global.ID]; ok { + return &GlobalIDCollisionError{global.ID, &object, &global} + } + registry.Objects[global.ID] = global + return nil + + default: + return &UnsupportedOpcodeError{opcode, registry.String()} + } +} + +func (registry *Registry) String() string { return PW_TYPE_INTERFACE_Registry } diff --git a/internal/pipewire/core_test.go b/internal/pipewire/core_test.go index 9771eff..9d19be4 100644 --- a/internal/pipewire/core_test.go +++ b/internal/pipewire/core_test.go @@ -23,6 +23,7 @@ func TestFooterCoreGeneration(t *testing.T) { Payload: pipewire.FooterCoreGeneration{RegistryGeneration: 0x23}, }, nil}, + // happens on the last message, client footer sent in the next roundtrip {"sample2", samplePWContainer[1][42][2], pipewire.Footer[pipewire.FooterCoreGeneration]{ Opcode: pipewire.FOOTER_CORE_OPCODE_GENERATION, Payload: pipewire.FooterCoreGeneration{RegistryGeneration: 0x24}, @@ -35,13 +36,14 @@ func TestFooterCoreGeneration(t *testing.T) { {"sample0", samplePWContainer[3][0][2], pipewire.Footer[pipewire.FooterClientGeneration]{ Opcode: pipewire.FOOTER_CORE_OPCODE_GENERATION, - // why does this not match FooterCoreGeneration sample2? + // triggered by difference in sample1, sample0 is overwritten in the same roundtrip Payload: pipewire.FooterClientGeneration{ClientGeneration: 0x23}, }, nil}, /* sendmsg 2 */ {"sample1", samplePWContainer[6][0][2], pipewire.Footer[pipewire.FooterClientGeneration]{ + // triggered by difference in sample2, last footer in the previous roundtrip Opcode: pipewire.FOOTER_CORE_OPCODE_GENERATION, Payload: pipewire.FooterClientGeneration{ClientGeneration: 0x24}, }, nil}, diff --git a/internal/pipewire/header.go b/internal/pipewire/header.go index 40ea080..3aecfb7 100644 --- a/internal/pipewire/header.go +++ b/internal/pipewire/header.go @@ -31,7 +31,7 @@ type Header struct { // An increasing sequence number for each message. Sequence Int `json:"seq"` // Number of file descriptors in this message. - FileCount Word `json:"n_fds"` + FileCount Int `json:"n_fds"` } // append appends the protocol native message header to data. @@ -41,7 +41,7 @@ func (h *Header) append(data []byte) []byte { data = binary.NativeEndian.AppendUint32(data, Word(h.ID)) data = binary.NativeEndian.AppendUint32(data, Word(h.Opcode)<<24|h.Size) data = binary.NativeEndian.AppendUint32(data, Word(h.Sequence)) - data = binary.NativeEndian.AppendUint32(data, h.FileCount) + data = binary.NativeEndian.AppendUint32(data, Word(h.FileCount)) return data } @@ -60,7 +60,7 @@ func (h *Header) unmarshalBinary(data [SizeHeader]byte) { h.Opcode = byte(h.Size >> 24) h.Size &= SizeMax h.Sequence = Int(binary.NativeEndian.Uint32(data[8:])) - h.FileCount = binary.NativeEndian.Uint32(data[12:]) + h.FileCount = Int(binary.NativeEndian.Uint32(data[12:])) } // UnmarshalBinary decodes the protocol native message header. diff --git a/internal/pipewire/pipewire.go b/internal/pipewire/pipewire.go index cee5c86..b537d39 100644 --- a/internal/pipewire/pipewire.go +++ b/internal/pipewire/pipewire.go @@ -14,6 +14,507 @@ // for any other uses of the protocol. package pipewire +import ( + "encoding/binary" + "fmt" + "io" + "net" + "slices" + "strconv" + "syscall" + "time" +) + +// Conn is a subset of methods of [net.UnixConn] used by [Context]. +type Conn interface { + // ReadMsgUnix reads a message from c, copying the payload into b and + // the associated out-of-band data into oob. It returns the number of + // bytes copied into b, the number of bytes copied into oob, the flags + // that were set on the message and the source address of the message. + // + // Note that if len(b) == 0 and len(oob) > 0, this function will still + // read (and discard) 1 byte from the connection. + ReadMsgUnix(b, oob []byte) (n, oobn, flags int, addr *net.UnixAddr, err error) + + // WriteMsgUnix writes a message to addr via c, copying the payload + // from b and the associated out-of-band data from oob. It returns the + // number of payload and out-of-band bytes written. + // + // Note that if len(b) == 0 and len(oob) > 0, this function will still + // write 1 byte to the connection. + WriteMsgUnix(b, oob []byte, addr *net.UnixAddr) (n, oobn int, err error) + + // SetDeadline sets the read and write deadlines associated + // with the connection. It is equivalent to calling both + // SetReadDeadline and SetWriteDeadline. + // + // A deadline is an absolute time after which I/O operations + // fail instead of blocking. The deadline applies to all future + // and pending I/O, not just the immediately following call to + // Read or Write. After a deadline has been exceeded, the + // connection can be refreshed by setting a deadline in the future. + // + // If the deadline is exceeded a call to Read or Write or to other + // I/O methods will return an error that wraps os.ErrDeadlineExceeded. + // This can be tested using errors.Is(err, os.ErrDeadlineExceeded). + // The error's Timeout method will return true, but note that there + // are other possible errors for which the Timeout method will + // return true even if the deadline has not been exceeded. + // + // An idle timeout can be implemented by repeatedly extending + // the deadline after successful Read or Write calls. + // + // A zero value for t means I/O operations will not time out. + SetDeadline(t time.Time) error + + // Close closes the connection. + // Any blocked Read or Write operations will be unblocked and return errors. + Close() error +} + +// The kernel constant SCM_MAX_FD defines a limit on the number of file descriptors in the array. +// Attempting to send an array larger than this limit causes sendmsg(2) to fail with the error +// EINVAL. SCM_MAX_FD has the value 253 (or 255 before Linux 2.6.38). +const _SCM_MAX_FD = 253 + +// A Context holds state of a connection to PipeWire. +type Context struct { + // Pending message data, committed via a call to Roundtrip. + buf []byte + // Current [Header.Sequence] value, incremented every write. + sequence Int + // Current server-side [Header.Sequence] value, incremented on every event processed. + remoteSequence Int + // Proxy id associations. + proxy map[Int]eventProxy + // Newly allocated proxies pending acknowledgement from the server. + pendingIds map[Int]struct{} + // Smallest available Id for the next proxy. + nextId Int + // Server side registry generation number. + generation Long + // Pending file descriptors to be sent with the next message. + pendingFiles []int + // File count kept track of in [Header]. + headerFiles int + // Files from the server. This is discarded on every Roundtrip so eventProxy + // implementations must make sure to close them to avoid leaking fds. + receivedFiles []int + // Pending footer value for the next outgoing message. + // Newer footers appear to simply replace the existing one. + pendingFooter KnownSize + // Pending footer value deferred to the next round trip, + // sent if pendingFooter is nil. This is for emulating upstream behaviour + deferredPendingFooter KnownSize + // Proxy for built-in core events. + core Core + // Proxy for built-in client events. + client Client + + // Passed to [Conn.ReadMsgUnix]. Not copied if sufficient for all received messages. + iovecBuf [1 << 15]byte + // Passed to [Conn.ReadMsgUnix] for ancillary messages and is never copied. + oobBuf [(_SCM_MAX_FD/2+_SCM_MAX_FD%2+2)<<3 + 1]byte + // Underlying connection, usually implemented by [net.UnixConn]. + conn Conn +} + +// GetCore returns the address of [Core] held by this [Context]. +func (ctx *Context) GetCore() *Core { return &ctx.core } + +// GetClient returns the address of [Client] held by this [Context]. +func (ctx *Context) GetClient() *Client { return &ctx.client } + +// New initialises [Context] for an already established connection and returns its address. +// The caller must not call any method of the underlying [Conn] after this function returns. +func New(conn Conn, props SPADict) (*Context, error) { + ctx := Context{conn: conn} + ctx.core.ctx = &ctx + ctx.proxy = map[Int]eventProxy{ + PW_ID_CORE: &ctx.core, + PW_ID_CLIENT: &ctx.client, + } + ctx.pendingIds = map[Int]struct{}{ + PW_ID_CLIENT: {}, + } + ctx.nextId = Int(len(ctx.proxy)) + + if err := ctx.coreHello(); err != nil { + return nil, err + } + if err := ctx.clientUpdateProperties(props); err != nil { + return nil, err + } + + return &ctx, nil +} + +// MustNew calls [New](conn, props) and panics on error. +// It is intended for use in tests with hard-coded strings. +func MustNew(conn Conn, props SPADict) *Context { + if ctx, err := New(conn, props); err != nil { + panic(err) + } else { + return ctx + } +} + +// free releases the underlying storage of buf. +func (ctx *Context) free() { ctx.buf = make([]byte, 0) } + +// queueFiles queues some file descriptors to be sent for the next message. +// It returns the offset of their index for the syscall.SCM_RIGHTS message. +func (ctx *Context) queueFiles(fds ...int) (offset Fd) { + offset = Fd(len(ctx.pendingFiles)) + ctx.pendingFiles = append(ctx.pendingFiles, fds...) + return +} + +// writeMessage appends the POD representation of v and an optional footer to buf. +func (ctx *Context) writeMessage( + Id Int, opcode byte, + v KnownSize, +) (err error) { + if ctx.pendingFooter == nil && ctx.deferredPendingFooter != nil { + ctx.pendingFooter, ctx.deferredPendingFooter = ctx.deferredPendingFooter, nil + } + + size := v.Size() + if ctx.pendingFooter != nil { + size += ctx.pendingFooter.Size() + } + if size&^SizeMax != 0 { + return ErrSizeRange + } + + ctx.buf = slices.Grow(ctx.buf, int(SizeHeader+size)) + ctx.buf = (&Header{ + ID: Id, Opcode: opcode, Size: size, + Sequence: ctx.sequence, + FileCount: Int(len(ctx.pendingFiles) - ctx.headerFiles), + }).append(ctx.buf) + ctx.headerFiles = len(ctx.pendingFiles) + ctx.buf, err = MarshalAppend(ctx.buf, v) + if err == nil && ctx.pendingFooter != nil { + ctx.buf, err = MarshalAppend(ctx.buf, ctx.pendingFooter) + ctx.pendingFooter = nil + } + ctx.sequence++ + return +} + +// newProxyId returns a newly allocated proxy Id for the specified type. +func (ctx *Context) newProxyId(proxy eventProxy, ack bool) Int { + newId := ctx.nextId + ctx.proxy[newId] = proxy + if ack { + ctx.pendingIds[newId] = struct{}{} + } + +increment: + ctx.nextId++ + + if _, ok := ctx.proxy[ctx.nextId]; ok { + goto increment + } + return newId +} + +// connTimeout is the maximum duration an I/O operation is allowed for [Conn]. +const connTimeout = 5 * time.Second + +// receiveAll receives from conn until no more data is available. +// The returned slice is valid until the next call to receiveAll. +func (ctx *Context) receiveAll() (payload []byte, err error) { + if err = ctx.conn.SetDeadline(time.Now().Add(connTimeout)); err != nil { + return + } + + var n, oobn int + ctx.receivedFiles = ctx.receivedFiles[:0] + buf := ctx.iovecBuf[:] + +recvmsg: + buf = buf[n:] + n, oobn, _, _, err = ctx.conn.ReadMsgUnix(buf, ctx.oobBuf[:]) + if err != nil { + return + } + if oobn == len(ctx.oobBuf) { + return nil, syscall.ENOMEM // unreachable + } + if oob := ctx.oobBuf[:oobn]; len(oob) > 0 { + var scm []syscall.SocketControlMessage + if scm, err = syscall.ParseSocketControlMessage(oob); err != nil { + return + } + + var fds []int + for i := range scm { + if fds, err = syscall.ParseUnixRights(&scm[i]); err != nil { + return + } + ctx.receivedFiles = append(ctx.receivedFiles, fds...) + } + } + + // receive until buffer fills or payload is depleted + if n > 0 { + goto recvmsg + } + data := ctx.iovecBuf[:len(ctx.iovecBuf)-len(buf)] + + // avoids copy if payload fits in a single ctx.recvmsgBuf + if payload == nil && len(buf) > 0 { + payload = data + return + } + + payload = append(payload, data...) + // this indicates a full ctx.recvmsgBuf + if len(buf) == 0 { + ctx.buf = ctx.iovecBuf[:] + goto recvmsg + } + + return +} + +// An UnknownIdError describes a server message with an Id unknown to [Context]. +type UnknownIdError struct { + // Offending id decoded from Data. + Id Int + // Message received from the server. + Data string +} + +func (e *UnknownIdError) Error() string { return "unknown proxy id " + strconv.Itoa(int(e.Id)) } + +// UnsupportedOpcodeError describes a message with an unsupported opcode. +type UnsupportedOpcodeError struct { + // Offending opcode. + Opcode byte + // Name of interface processed by the proxy. + Interface string +} + +func (e *UnsupportedOpcodeError) Error() string { + return "unsupported " + e.Interface + " opcode " + strconv.Itoa(int(e.Opcode)) +} + +// UnsupportedFooterOpcodeError describes a [Footer] with an unsupported opcode. +type UnsupportedFooterOpcodeError Id + +func (e UnsupportedFooterOpcodeError) Error() string { + return "unsupported footer opcode " + strconv.Itoa(int(e)) +} + +// RoundtripUnexpectedEOFError is returned when EOF was unexpectedly encountered during [Context.Roundtrip]. +type RoundtripUnexpectedEOFError uintptr + +const ( + roundtripEOFHeader RoundtripUnexpectedEOFError = iota + roundtripEOFBody + roundtripEOFFooter + roundtripEOFFooterOpcode +) + +func (RoundtripUnexpectedEOFError) Unwrap() error { return io.ErrUnexpectedEOF } +func (e RoundtripUnexpectedEOFError) Error() string { + var suffix string + switch e { + case roundtripEOFHeader: + suffix = "decoding message header" + case roundtripEOFBody: + suffix = "establishing message body bounds" + case roundtripEOFFooter: + suffix = "establishing message footer bounds" + case roundtripEOFFooterOpcode: + suffix = "decoding message footer opcode" + + default: + return "unexpected EOF" + } + + return "unexpected EOF " + suffix +} + +// eventProxy consumes events during a [Context.Roundtrip]. +type eventProxy interface { + // consume consumes an event and its optional footer. + consume(opcode byte, files []int, unmarshal func(v any) error) error + // setBoundProps stores a [CoreBoundProps] event received from the server. + setBoundProps(event *CoreBoundProps) error + + // Stringer returns the PipeWire interface name. + fmt.Stringer +} + +// unmarshal is like [Unmarshal] but handles footer if present. +func (ctx *Context) unmarshal(header *Header, data []byte, v any) error { + n, err := UnmarshalNext(data, v) + if err != nil { + return err + } + if len(data) < int(header.Size) || header.Size < n { + return roundtripEOFFooter + } + isLastMessage := len(data) == int(header.Size) + + data = data[n:header.Size] + if len(data) > 0 { + /* the footer concrete type is determined by opcode, which cannot be + decoded directly before the type is known, so this hack is required: + skip the struct prefix, then the integer prefix, and the next SizeId + bytes are the encoded opcode value */ + if len(data) < int(SizePrefix*2+SizeId) { + return roundtripEOFFooterOpcode + } + switch opcode := binary.NativeEndian.Uint32(data[SizePrefix*2:]); opcode { + case FOOTER_CORE_OPCODE_GENERATION: + var footer Footer[FooterCoreGeneration] + if err = Unmarshal(data, &footer); err != nil { + return err + } + if ctx.generation != footer.Payload.RegistryGeneration { + var pendingFooter = Footer[FooterClientGeneration]{ + FOOTER_CORE_OPCODE_GENERATION, + FooterClientGeneration{ClientGeneration: footer.Payload.RegistryGeneration}, + } + + // this emulates upstream behaviour that pending footer updated on the last message + // during a roundtrip is pushed back to the first message of the next roundtrip + if isLastMessage { + ctx.deferredPendingFooter = &pendingFooter + } else { + ctx.pendingFooter = &pendingFooter + } + } + ctx.generation = footer.Payload.RegistryGeneration + return nil + + default: + return UnsupportedFooterOpcodeError(opcode) + } + } + return nil +} + +// An UnexpectedSequenceError is a server-side sequence number that does not +// match its counterpart tracked by the client. This indicates that either +// the client has somehow missed events, or data being interpreted as [Header] +// is, in fact, not the message header. +type UnexpectedSequenceError Int + +func (e UnexpectedSequenceError) Error() string { return "unexpected seq " + strconv.Itoa(int(e)) } + +// An UnexpectedFilesError describes an inconsistent state where file count claimed by +// [Header] accumulates to a value greater than the total number of files received. +type UnexpectedFilesError int + +func (e UnexpectedFilesError) Error() string { + return "server message headers claim to have sent more than " + strconv.Itoa(int(e)) + " files" +} + +// A DanglingFilesError holds onto files that were sent by the server but no [Header] +// accounts for. These must be closed to avoid leaking fds. +type DanglingFilesError []int + +func (e DanglingFilesError) Error() string { + return "received " + strconv.Itoa(len(e)) + " dangling files" +} + +// roundtripSyncID is the id passed to Context.coreSync during a [Context.Roundtrip]. +const roundtripSyncID = 0 + +// Roundtrip queues the [CoreSync] message and sends all pending messages to the server. +// +// For a non-nil error, if the error happens over the network, it has concrete type +// [net.OpError]. +func (ctx *Context) Roundtrip() (err error) { + if err = ctx.conn.SetDeadline(time.Now().Add(connTimeout)); err != nil { + return + } + if _, _, err = ctx.conn.WriteMsgUnix(ctx.buf, syscall.UnixRights(ctx.pendingFiles...), nil); err != nil { + return + } + ctx.buf = ctx.buf[:0] + ctx.pendingFiles = ctx.pendingFiles[:0] + ctx.headerFiles = 0 + + var data []byte + if data, err = ctx.receiveAll(); err != nil { + return + } + + var header Header + var receivedHeaderFiles int + for len(data) > 0 { + if len(data) < SizeHeader { + return roundtripEOFHeader + } + + if err = header.UnmarshalBinary(data[:SizeHeader]); err != nil { + return + } + if header.Sequence != ctx.remoteSequence { + return UnexpectedSequenceError(header.Sequence) + } + ctx.remoteSequence++ + + if len(data) < int(SizeHeader+header.Size) { + return roundtripEOFBody + } + + proxy, ok := ctx.proxy[header.ID] + if !ok { + return &UnknownIdError{header.ID, string(data[:SizeHeader+header.Size])} + } + + nextReceivedHeaderFiles := receivedHeaderFiles + int(header.FileCount) + if nextReceivedHeaderFiles > len(ctx.receivedFiles) { + return UnexpectedFilesError(len(ctx.receivedFiles)) + } + files := ctx.receivedFiles[receivedHeaderFiles:nextReceivedHeaderFiles] + receivedHeaderFiles = nextReceivedHeaderFiles + + data = data[SizeHeader:] + err = proxy.consume(header.Opcode, files, func(v any) error { return ctx.unmarshal(&header, data, v) }) + data = data[header.Size:] + if err != nil { + return + } + } + + if len(ctx.receivedFiles) < receivedHeaderFiles { + return DanglingFilesError(ctx.receivedFiles[len(ctx.receivedFiles)-receivedHeaderFiles:]) + } + return +} + +// An UnexpectedFileCountError is returned for an event that received an unexpected +// number of files. The proxy closes these extra files before returning +type UnexpectedFileCountError [2]int + +func (e *UnexpectedFileCountError) Error() string { + return "received " + strconv.Itoa(e[1]) + " files instead of the expected " + strconv.Itoa(e[0]) +} + +// closeReceivedFiles closes all received files and returns [UnexpectedFileCountError] +// if one or more files are passed. This is used with events that do not expect files. +func closeReceivedFiles(fds ...int) error { + for _, fd := range fds { + _ = syscall.Close(fd) + } + if len(fds) == 0 { + return nil + } + return &UnexpectedFileCountError{0, len(fds)} +} + +// Close frees the underlying buffer and closes the connection. +func (ctx *Context) Close() error { ctx.free(); return ctx.conn.Close() } + /* pipewire/device.h */ const ( diff --git a/internal/pipewire/pipewire_test.go b/internal/pipewire/pipewire_test.go index 63b5f4f..950311d 100644 --- a/internal/pipewire/pipewire_test.go +++ b/internal/pipewire/pipewire_test.go @@ -3,10 +3,815 @@ package pipewire_test import ( _ "embed" "encoding/binary" + "fmt" + "net" + "reflect" + . "syscall" + "testing" + "time" "hakurei.app/internal/pipewire" ) +func TestContext(t *testing.T) { + t.Parallel() + + var ( + // Underlying connection stub holding test data. + conn = stubUnixConn{samples: []stubUnixConnSample{ + {SYS_SENDMSG, samplePWContainer00, MSG_DONTWAIT | MSG_NOSIGNAL, nil, 0}, + {SYS_RECVMSG, samplePWContainer01, MSG_DONTWAIT | MSG_CMSG_CLOEXEC, nil, 0}, + {SYS_RECVMSG, "", MSG_DONTWAIT | MSG_CMSG_CLOEXEC, nil, EAGAIN}, + {SYS_SENDMSG, samplePWContainer03, MSG_DONTWAIT | MSG_NOSIGNAL, nil, 0}, + {SYS_RECVMSG, samplePWContainer04, MSG_DONTWAIT | MSG_CMSG_CLOEXEC, nil, 0}, + {SYS_RECVMSG, "", MSG_DONTWAIT | MSG_CMSG_CLOEXEC, nil, EAGAIN}, + {SYS_SENDMSG, samplePWContainer06, MSG_DONTWAIT | MSG_NOSIGNAL, []int{20, 21}, 0}, + {SYS_RECVMSG, samplePWContainer07, MSG_DONTWAIT | MSG_CMSG_CLOEXEC, nil, 0}, + {SYS_RECVMSG, "", MSG_DONTWAIT | MSG_CMSG_CLOEXEC, nil, EAGAIN}, + }} + + // Context instance under testing. + ctx = pipewire.MustNew(&conn, pipewire.SPADict{ + {Key: "remote.intention", Value: "manager"}, + {Key: "application.name", Value: "pw-container"}, + {Key: "application.process.binary", Value: "pw-container"}, + {Key: "application.language", Value: "en_US.UTF-8"}, + {Key: "application.process.id", Value: "1443"}, + {Key: "application.process.user", Value: "alice"}, + {Key: "application.process.host", Value: "nixos"}, + {Key: "application.process.session-id", Value: "1"}, + {Key: "window.x11.display", Value: ":0"}, + {Key: "cpu.vm.name", Value: "qemu"}, + {Key: "log.level", Value: "0"}, + {Key: "cpu.max-align", Value: "32"}, + {Key: "default.clock.rate", Value: "48000"}, + {Key: "default.clock.quantum", Value: "1024"}, + {Key: "default.clock.min-quantum", Value: "32"}, + {Key: "default.clock.max-quantum", Value: "2048"}, + {Key: "default.clock.quantum-limit", Value: "8192"}, + {Key: "default.clock.quantum-floor", Value: "4"}, + {Key: "default.video.width", Value: "640"}, + {Key: "default.video.height", Value: "480"}, + {Key: "default.video.rate.num", Value: "25"}, + {Key: "default.video.rate.denom", Value: "1"}, + {Key: "clock.power-of-two-quantum", Value: "true"}, + {Key: "link.max-buffers", Value: "64"}, + {Key: "mem.warn-mlock", Value: "false"}, + {Key: "mem.allow-mlock", Value: "true"}, + {Key: "settings.check-quantum", Value: "false"}, + {Key: "settings.check-rate", Value: "false"}, + {Key: "core.version", Value: "1.4.7"}, + {Key: "core.name", Value: "pipewire-alice-1443"}, + }) + ) + + var registry *pipewire.Registry + const wantRegistryId = 2 + if r, err := ctx.GetRegistry(); err != nil { + t.Fatalf("GetRegistry: error = %v", err) + } else { + if r.ID != wantRegistryId { + t.Fatalf("GetRegistry: ID = %d, want %d", r.ID, wantRegistryId) + } + registry = r + } + if err := ctx.GetCore().Sync(); err != nil { + t.Fatalf("Sync: error = %v", err) + } + + wantCoreInfo0 := pipewire.CoreInfo{ + ID: 0, + Cookie: -2069267610, + UserName: "alice", + HostName: "nixos", + Version: "1.4.7", + Name: "pipewire-0", + ChangeMask: pipewire.PW_CORE_CHANGE_MASK_PROPS, + Properties: &pipewire.SPADict{ + {Key: "config.name", Value: "pipewire.conf"}, + {Key: "application.name", Value: "pipewire"}, + {Key: "application.process.binary", Value: "pipewire"}, + {Key: "application.language", Value: "en_US.UTF-8"}, + {Key: "application.process.id", Value: "1446"}, + {Key: "application.process.user", Value: "alice"}, + {Key: "application.process.host", Value: "nixos"}, + {Key: "window.x11.display", Value: ":0"}, + {Key: "cpu.vm.name", Value: "qemu"}, + {Key: "link.max-buffers", Value: "16"}, + {Key: "core.daemon", Value: "true"}, + {Key: "core.name", Value: "pipewire-0"}, + {Key: "default.clock.min-quantum", Value: "1024"}, + {Key: "cpu.max-align", Value: "32"}, + {Key: "default.clock.rate", Value: "48000"}, + {Key: "default.clock.quantum", Value: "1024"}, + {Key: "default.clock.max-quantum", Value: "2048"}, + {Key: "default.clock.quantum-limit", Value: "8192"}, + {Key: "default.clock.quantum-floor", Value: "4"}, + {Key: "default.video.width", Value: "640"}, + {Key: "default.video.height", Value: "480"}, + {Key: "default.video.rate.num", Value: "25"}, + {Key: "default.video.rate.denom", Value: "1"}, + {Key: "log.level", Value: "2"}, + {Key: "clock.power-of-two-quantum", Value: "true"}, + {Key: "mem.warn-mlock", Value: "false"}, + {Key: "mem.allow-mlock", Value: "true"}, + {Key: "settings.check-quantum", Value: "false"}, + {Key: "settings.check-rate", Value: "false"}, + {Key: "object.id", Value: "0"}, + {Key: "object.serial", Value: "0"}}, + } + + wantClient0 := pipewire.Client{ + Info: &pipewire.ClientInfo{ + ID: 34, + ChangeMask: pipewire.PW_CLIENT_CHANGE_MASK_PROPS, + Properties: &pipewire.SPADict{ + {Key: "pipewire.protocol", Value: "protocol-native"}, + {Key: "core.name", Value: "pipewire-alice-1443"}, + {Key: "pipewire.sec.socket", Value: "pipewire-0-manager"}, + {Key: "pipewire.sec.pid", Value: "1443"}, + {Key: "pipewire.sec.uid", Value: "1000"}, + {Key: "pipewire.sec.gid", Value: "100"}, + {Key: "module.id", Value: "2"}, + {Key: "object.id", Value: "34"}, + {Key: "object.serial", Value: "34"}, + {Key: "remote.intention", Value: "manager"}, + {Key: "application.name", Value: "pw-container"}, + {Key: "application.process.binary", Value: "pw-container"}, + {Key: "application.language", Value: "en_US.UTF-8"}, + {Key: "application.process.id", Value: "1443"}, + {Key: "application.process.user", Value: "alice"}, + {Key: "application.process.host", Value: "nixos"}, + {Key: "application.process.session-id", Value: "1"}, + {Key: "window.x11.display", Value: ":0"}, + {Key: "cpu.vm.name", Value: "qemu"}, + {Key: "log.level", Value: "0"}, + {Key: "cpu.max-align", Value: "32"}, + {Key: "default.clock.rate", Value: "48000"}, + {Key: "default.clock.quantum", Value: "1024"}, + {Key: "default.clock.min-quantum", Value: "32"}, + {Key: "default.clock.max-quantum", Value: "2048"}, + {Key: "default.clock.quantum-limit", Value: "8192"}, + {Key: "default.clock.quantum-floor", Value: "4"}, + {Key: "default.video.width", Value: "640"}, + {Key: "default.video.height", Value: "480"}, + {Key: "default.video.rate.num", Value: "25"}, + {Key: "default.video.rate.denom", Value: "1"}, + {Key: "clock.power-of-two-quantum", Value: "true"}, + {Key: "link.max-buffers", Value: "64"}, + {Key: "mem.warn-mlock", Value: "false"}, + {Key: "mem.allow-mlock", Value: "true"}, + {Key: "settings.check-quantum", Value: "false"}, + {Key: "settings.check-rate", Value: "false"}, + {Key: "core.version", Value: "1.4.7"}, + {Key: "pipewire.access", Value: "unrestricted"}, + }, + }, + Properties: pipewire.SPADict{ + {Key: "object.serial", Value: "34"}, + {Key: "module.id", Value: "2"}, + {Key: "pipewire.protocol", Value: "protocol-native"}, + {Key: "pipewire.sec.pid", Value: "1443"}, + {Key: "pipewire.sec.uid", Value: "1000"}, + {Key: "pipewire.sec.gid", Value: "100"}, + {Key: "pipewire.sec.socket", Value: "pipewire-0-manager"}, + }, + } + + wantRegistry0 := pipewire.Registry{ + ID: wantRegistryId, + Objects: map[pipewire.Int]pipewire.RegistryGlobal{ + pipewire.PW_ID_CORE: { + ID: pipewire.PW_ID_CORE, + Permissions: pipewire.PW_CORE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Core, + Version: pipewire.PW_VERSION_CORE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "0"}, + {Key: "core.name", Value: "pipewire-0"}, + }, + }, + + 1: { + ID: 1, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "1"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-rt"}, + }, + }, + + 3: { + ID: 3, + Permissions: pipewire.PW_SECURITY_CONTEXT_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_SecurityContext, + Version: pipewire.PW_VERSION_SECURITY_CONTEXT, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "3"}, + }, + }, + + 2: { + ID: 2, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "2"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-protocol-native"}, + }, + }, + + 5: { + ID: 5, + Permissions: pipewire.PW_PROFILER_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Profiler, + Version: pipewire.PW_VERSION_PROFILER, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "5"}, + }, + }, + + 4: { + ID: 4, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "4"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-profiler"}, + }, + }, + + 6: { + ID: 6, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "6"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-metadata"}, + }, + }, + + 7: { + ID: 7, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "7"}, + {Key: "module.id", Value: "6"}, + {Key: "factory.name", Value: "metadata"}, + {Key: "factory.type.name", Value: pipewire.PW_TYPE_INTERFACE_Metadata}, + {Key: "factory.type.version", Value: "3"}, + }, + }, + + 8: { + ID: 8, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "8"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-spa-device-factory"}, + }, + }, + + 9: { + ID: 9, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "9"}, + {Key: "module.id", Value: "8"}, + {Key: "factory.name", Value: "spa-device-factory"}, + {Key: "factory.type.name", Value: pipewire.PW_TYPE_INTERFACE_Device}, + {Key: "factory.type.version", Value: "3"}, + }, + }, + + 10: { + ID: 10, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "10"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-spa-node-factory"}, + }, + }, + + 11: { + ID: 11, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "11"}, + {Key: "module.id", Value: "10"}, + {Key: "factory.name", Value: "spa-node-factory"}, + {Key: "factory.type.name", Value: pipewire.PW_TYPE_INTERFACE_Node}, + {Key: "factory.type.version", Value: "3"}, + }, + }, + + 12: { + ID: 12, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "12"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-client-node"}, + }, + }, + + 13: { + ID: 13, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "13"}, + {Key: "module.id", Value: "12"}, + {Key: "factory.name", Value: "client-node"}, + {Key: "factory.type.name", Value: pipewire.PW_TYPE_INTERFACE_ClientNode}, + {Key: "factory.type.version", Value: "6"}, + }, + }, + + 14: { + ID: 14, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "14"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-client-device"}, + }, + }, + + 15: { + ID: 15, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "15"}, + {Key: "module.id", Value: "14"}, + {Key: "factory.name", Value: "client-device"}, + {Key: "factory.type.name", Value: "Spa:Pointer:Interface:Device"}, + {Key: "factory.type.version", Value: "0"}, + }, + }, + + 16: { + ID: 16, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "16"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-portal"}, + }, + }, + + 17: { + ID: 17, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "17"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-access"}, + }, + }, + + 18: { + ID: 18, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "18"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-adapter"}, + }, + }, + + 19: { + ID: 19, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "19"}, + {Key: "module.id", Value: "18"}, + {Key: "factory.name", Value: "adapter"}, + {Key: "factory.type.name", Value: pipewire.PW_TYPE_INTERFACE_Node}, + {Key: "factory.type.version", Value: "3"}, + }, + }, + + 20: { + ID: 20, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "20"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-link-factory"}, + }, + }, + + 21: { + ID: 21, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "21"}, + {Key: "module.id", Value: "20"}, + {Key: "factory.name", Value: "link-factory"}, + {Key: "factory.type.name", Value: pipewire.PW_TYPE_INTERFACE_Link}, + {Key: "factory.type.version", Value: "3"}, + }, + }, + + 22: { + ID: 22, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "22"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-session-manager"}, + }, + }, + + 23: { + ID: 23, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "23"}, + {Key: "module.id", Value: "22"}, + {Key: "factory.name", Value: "client-endpoint"}, + {Key: "factory.type.name", Value: "PipeWire:Interface:ClientEndpoint"}, + {Key: "factory.type.version", Value: "0"}, + }, + }, + + 24: { + ID: 24, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "24"}, + {Key: "module.id", Value: "22"}, + {Key: "factory.name", Value: "client-session"}, + {Key: "factory.type.name", Value: "PipeWire:Interface:ClientSession"}, + {Key: "factory.type.version", Value: "0"}, + }, + }, + + 25: { + ID: 25, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "25"}, + {Key: "module.id", Value: "22"}, + {Key: "factory.name", Value: "session"}, + {Key: "factory.type.name", Value: "PipeWire:Interface:Session"}, + {Key: "factory.type.version", Value: "0"}, + }, + }, + + 26: { + ID: 26, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "26"}, + {Key: "module.id", Value: "22"}, + {Key: "factory.name", Value: "endpoint"}, + {Key: "factory.type.name", Value: "PipeWire:Interface:Endpoint"}, + {Key: "factory.type.version", Value: "0"}, + }, + }, + + 27: { + ID: 27, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "27"}, + {Key: "module.id", Value: "22"}, + {Key: "factory.name", Value: "endpoint-stream"}, + {Key: "factory.type.name", Value: "PipeWire:Interface:EndpointStream"}, + {Key: "factory.type.version", Value: "0"}, + }, + }, + + 28: { + ID: 28, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "28"}, + {Key: "module.id", Value: "22"}, + {Key: "factory.name", Value: "endpoint-link"}, + {Key: "factory.type.name", Value: "PipeWire:Interface:EndpointLink"}, + {Key: "factory.type.version", Value: "0"}, + }, + }, + + 29: { + ID: 29, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "29"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-x11-bell"}, + }, + }, + + 30: { + ID: 30, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "30"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-jackdbus-detect"}, + }, + }, + + 31: { + ID: 31, + Permissions: pipewire.PW_PERM_RWXM, // why is this not PW_NODE_PERM_MASK? + Type: pipewire.PW_TYPE_INTERFACE_Node, + Version: pipewire.PW_VERSION_NODE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "31"}, + {Key: "factory.id", Value: "11"}, + {Key: "priority.driver", Value: "200000"}, + {Key: "node.name", Value: "Dummy-Driver"}, + }, + }, + + 32: { + ID: 32, + Permissions: pipewire.PW_PERM_RWXM, // why is this not PW_NODE_PERM_MASK? + Type: pipewire.PW_TYPE_INTERFACE_Node, + Version: pipewire.PW_VERSION_NODE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "32"}, + {Key: "factory.id", Value: "11"}, + {Key: "priority.driver", Value: "190000"}, + {Key: "node.name", Value: "Freewheel-Driver"}, + }, + }, + + 33: { + ID: 33, + Permissions: pipewire.PW_METADATA_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Metadata, + Version: pipewire.PW_VERSION_METADATA, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "33"}, + {Key: "metadata.name", Value: "settings"}, + }, + }, + + 34: { + ID: 34, + Permissions: pipewire.PW_CLIENT_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Client, + Version: pipewire.PW_VERSION_CLIENT, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "34"}, + {Key: "module.id", Value: "2"}, + {Key: "pipewire.protocol", Value: "protocol-native"}, + {Key: "pipewire.sec.pid", Value: "1443"}, + {Key: "pipewire.sec.uid", Value: "1000"}, + {Key: "pipewire.sec.gid", Value: "100"}, + {Key: "pipewire.sec.socket", Value: "pipewire-0-manager"}, + {Key: "pipewire.access", Value: "unrestricted"}, + {Key: "application.name", Value: "pw-container"}, + }, + }, + + 35: { + ID: 35, + Permissions: pipewire.PW_CLIENT_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Client, + Version: pipewire.PW_VERSION_CLIENT, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "35"}, + {Key: "module.id", Value: "2"}, + {Key: "pipewire.protocol", Value: "protocol-native"}, + {Key: "pipewire.sec.pid", Value: "1447"}, + {Key: "pipewire.sec.uid", Value: "1000"}, + {Key: "pipewire.sec.gid", Value: "100"}, + {Key: "pipewire.sec.socket", Value: "pipewire-0-manager"}, + {Key: "pipewire.access", Value: "unrestricted"}, + {Key: "application.name", Value: "WirePlumber"}, + }, + }, + }, + } + + if coreInfo := ctx.GetCore().Info; !reflect.DeepEqual(coreInfo, &wantCoreInfo0) { + t.Fatalf("New: CoreInfo = %s, want %s", mustMarshalJSON(coreInfo), mustMarshalJSON(&wantCoreInfo0)) + } + if client := ctx.GetClient(); !reflect.DeepEqual(client, &wantClient0) { + t.Fatalf("New: Client = %s, want %s", mustMarshalJSON(client), mustMarshalJSON(&wantClient0)) + } + if registry.ID != wantRegistry0.ID { + t.Fatalf("GetRegistry: ID = %d, want %d", registry.ID, wantRegistry0.ID) + } + if !reflect.DeepEqual(registry.Objects, wantRegistry0.Objects) { + t.Fatalf("GetRegistry: Objects = %s, want %s", mustMarshalJSON(registry.Objects), mustMarshalJSON(wantRegistry0.Objects)) + } + + var securityContext *pipewire.SecurityContext + const wantSecurityContextId = 3 + if c, err := registry.GetSecurityContext(); err != nil { + t.Fatalf("GetSecurityContext: error = %v", err) + } else { + if c.ID != wantSecurityContextId { + t.Fatalf("GetSecurityContext: ID = %d, want %d", c.ID, wantSecurityContextId) + } + securityContext = c + } + if err := ctx.Roundtrip(); err != nil { + t.Fatalf("Roundtrip: error = %v", err) + } + + // none of these should change + if coreInfo := ctx.GetCore().Info; !reflect.DeepEqual(coreInfo, &wantCoreInfo0) { + t.Fatalf("Roundtrip: CoreInfo = %s, want %s", mustMarshalJSON(coreInfo), mustMarshalJSON(&wantCoreInfo0)) + } + if client := ctx.GetClient(); !reflect.DeepEqual(client, &wantClient0) { + t.Fatalf("Roundtrip: Client = %s, want %s", mustMarshalJSON(client), mustMarshalJSON(&wantClient0)) + } + if registry.ID != wantRegistry0.ID { + t.Fatalf("Roundtrip: ID = %d, want %d", registry.ID, wantRegistry0.ID) + } + if !reflect.DeepEqual(registry.Objects, wantRegistry0.Objects) { + t.Fatalf("Roundtrip: Objects = %s, want %s", mustMarshalJSON(registry.Objects), mustMarshalJSON(wantRegistry0.Objects)) + } + + if err := securityContext.Create(21, 20, pipewire.SPADict{ + {Key: "pipewire.sec.engine", Value: "org.flatpak"}, + {Key: "pipewire.access", Value: "restricted"}, + }); err != nil { + t.Fatalf("SecurityContext.Create: error = %v", err) + } + if err := ctx.GetCore().Sync(); err != nil { + t.Fatalf("Sync: error = %v", err) + } + + // none of these should change + if coreInfo := ctx.GetCore().Info; !reflect.DeepEqual(coreInfo, &wantCoreInfo0) { + t.Fatalf("Roundtrip: CoreInfo = %s, want %s", mustMarshalJSON(coreInfo), mustMarshalJSON(&wantCoreInfo0)) + } + if client := ctx.GetClient(); !reflect.DeepEqual(client, &wantClient0) { + t.Fatalf("Roundtrip: Client = %s, want %s", mustMarshalJSON(client), mustMarshalJSON(&wantClient0)) + } + if registry.ID != wantRegistry0.ID { + t.Fatalf("Roundtrip: ID = %d, want %d", registry.ID, wantRegistry0.ID) + } + if !reflect.DeepEqual(registry.Objects, wantRegistry0.Objects) { + t.Fatalf("Roundtrip: Objects = %s, want %s", mustMarshalJSON(registry.Objects), mustMarshalJSON(wantRegistry0.Objects)) + } + + if err := ctx.Close(); err != nil { + t.Fatalf("Close: error = %v", err) + } +} + +// stubUnixConnSample is sample data held by stubUnixConn. +type stubUnixConnSample struct { + nr uintptr + iovec string + flags uintptr + files []int + errno Errno +} + +// stubUnixConn implements [pipewire.Conn] and checks the behaviour of [pipewire.Context]. +type stubUnixConn struct { + samples []stubUnixConnSample + current int + + deadline *time.Time +} + +// checkDeadline checks whether deadline is set reasonably. +func (conn *stubUnixConn) checkDeadline() error { + if conn.deadline == nil || conn.deadline.Before(time.Now()) { + return fmt.Errorf("invalid deadline %v", conn.deadline) + } + conn.deadline = nil + return nil +} + +// nextSample returns the current sample and increments the counter. +func (conn *stubUnixConn) nextSample(nr uintptr) (sample *stubUnixConnSample, wantOOB []byte, err error) { + sample = &conn.samples[conn.current] + conn.current++ + if sample.nr != nr { + err = fmt.Errorf("unexpected syscall %d", SYS_SENDMSG) + return + } + if len(sample.files) > 0 { + wantOOB = UnixRights(sample.files...) + } + + return +} + +func (conn *stubUnixConn) ReadMsgUnix(b, oob []byte) (n, oobn, flags int, addr *net.UnixAddr, err error) { + if conn.samples[conn.current-1].nr == SYS_SENDMSG { + if err = conn.checkDeadline(); err != nil { + return + } + } + + var ( + sample *stubUnixConnSample + wantOOB []byte + ) + sample, wantOOB, err = conn.nextSample(SYS_RECVMSG) + if err != nil { + return + } + + if copy(b, sample.iovec) != len(sample.iovec) { + err = fmt.Errorf("insufficient iovec size %d, want at least %d", len(b), len(sample.iovec)) + } + if copy(oob, wantOOB) != len(wantOOB) { + err = fmt.Errorf("insufficient oob size %d, want at least %d", len(oob), len(wantOOB)) + } + + if sample.errno != 0 && sample.errno != EAGAIN { + err = sample.errno + } + return len(sample.iovec), len(wantOOB), MSG_CMSG_CLOEXEC, nil, nil +} + +func (conn *stubUnixConn) WriteMsgUnix(b, oob []byte, addr *net.UnixAddr) (n, oobn int, err error) { + if addr != nil { + err = fmt.Errorf("WriteMsgUnix called with non-nil addr: %#v", addr) + return + } + if err = conn.checkDeadline(); err != nil { + return + } + + var ( + sample *stubUnixConnSample + wantOOB []byte + ) + sample, wantOOB, err = conn.nextSample(SYS_SENDMSG) + if err != nil { + return + } + + if string(b) != sample.iovec { + err = fmt.Errorf("iovec: %#v, want %#v", b, []byte(sample.iovec)) + return + } + if string(oob[:len(wantOOB)]) != string(wantOOB) { + err = fmt.Errorf("oob: %#v, want %#v", oob[:len(wantOOB)], wantOOB) + return + } + return len(sample.iovec), len(wantOOB), nil +} + +func (conn *stubUnixConn) SetDeadline(t time.Time) error { conn.deadline = &t; return nil } + +func (conn *stubUnixConn) Close() error { + if conn.current != len(conn.samples) { + return fmt.Errorf("consumed %d samples, want %d", conn.current, len(conn.samples)) + } + return nil +} + var ( //go:embed testdata/pw-container-00-sendmsg samplePWContainer00 string diff --git a/internal/pipewire/securitycontext.go b/internal/pipewire/securitycontext.go index 849dc95..958673e 100644 --- a/internal/pipewire/securitycontext.go +++ b/internal/pipewire/securitycontext.go @@ -67,3 +67,65 @@ func (c *SecurityContextCreate) MarshalBinary() ([]byte, error) { return Marshal // UnmarshalBinary satisfies [encoding.BinaryUnmarshaler] via [Unmarshal]. func (c *SecurityContextCreate) UnmarshalBinary(data []byte) error { return Unmarshal(data, c) } + +// SecurityContext holds state of [PW_TYPE_INTERFACE_SecurityContext]. +type SecurityContext struct { + // Proxy id as tracked by [Context]. + ID Int `json:"proxy_id"` + // Global id as tracked by [Registry]. + GlobalID Int `json:"id"` + + ctx *Context +} + +// GetSecurityContext queues a [RegistryBind] message for the PipeWire server +// and returns the address of the newly allocated [SecurityContext]. +func (registry *Registry) GetSecurityContext() (securityContext *SecurityContext, err error) { + securityContext = &SecurityContext{ctx: registry.ctx} + for globalId, object := range registry.Objects { + if object.Type == securityContext.String() { + securityContext.GlobalID = globalId + securityContext.ID, err = registry.bind(securityContext, securityContext.GlobalID, PW_VERSION_SECURITY_CONTEXT) + return + } + } + + return nil, UnsupportedObjectTypeError(securityContext.String()) +} + +// Create queues a [SecurityContextCreate] message for the PipeWire server. +func (securityContext *SecurityContext) Create(listenFd, closeFd int, props SPADict) error { + // queued in reverse based on upstream behaviour, unsure why + offset := securityContext.ctx.queueFiles(closeFd, listenFd) + return securityContext.ctx.writeMessage( + securityContext.ID, + PW_SECURITY_CONTEXT_METHOD_CREATE, + &SecurityContextCreate{ListenFd: offset + 1, CloseFd: offset + 0, Properties: &props}, + ) +} + +func (securityContext *SecurityContext) consume(opcode byte, files []int, _ func(v any) error) error { + if err := closeReceivedFiles(files...); err != nil { + return err + } + + switch opcode { + // SecurityContext does not receive any events + + default: + return &UnsupportedOpcodeError{opcode, securityContext.String()} + } + +} + +func (securityContext *SecurityContext) setBoundProps(event *CoreBoundProps) error { + if securityContext.ID != event.ID { + return &InconsistentIdError{Proxy: securityContext, ID: securityContext.ID, ServerID: event.ID} + } + if securityContext.GlobalID != event.GlobalID { + return &InconsistentIdError{Global: true, Proxy: securityContext, ID: securityContext.GlobalID, ServerID: event.GlobalID} + } + return nil +} + +func (securityContext *SecurityContext) String() string { return PW_TYPE_INTERFACE_SecurityContext }