From b7fb8eb9c2531a0db4bc769732a480f7409bff1e Mon Sep 17 00:00:00 2001 From: Ophestra Date: Wed, 3 Dec 2025 03:40:49 +0900 Subject: [PATCH] internal/pipewire: relocate context implementation This should make things slightly easier to navigate. Signed-off-by: Ophestra --- internal/pipewire/context.go | 640 +++++++++++++++++++++ internal/pipewire/context_test.go | 876 +++++++++++++++++++++++++++++ internal/pipewire/pipewire.go | 639 --------------------- internal/pipewire/pipewire_test.go | 870 ---------------------------- 4 files changed, 1516 insertions(+), 1509 deletions(-) create mode 100644 internal/pipewire/context.go create mode 100644 internal/pipewire/context_test.go diff --git a/internal/pipewire/context.go b/internal/pipewire/context.go new file mode 100644 index 0000000..654bfc4 --- /dev/null +++ b/internal/pipewire/context.go @@ -0,0 +1,640 @@ +package pipewire + +import ( + "encoding/binary" + "fmt" + "io" + "maps" + "net" + "os" + "runtime" + "slices" + "strconv" + "syscall" + "time" +) + +// Conn is a subset of methods of [net.UnixConn] used by [Context]. +type Conn interface { + // ReadMsgUnix reads a message from c, copying the payload into b and + // the associated out-of-band data into oob. It returns the number of + // bytes copied into b, the number of bytes copied into oob, the flags + // that were set on the message and the source address of the message. + // + // Note that if len(b) == 0 and len(oob) > 0, this function will still + // read (and discard) 1 byte from the connection. + ReadMsgUnix(b, oob []byte) (n, oobn, flags int, addr *net.UnixAddr, err error) + + // WriteMsgUnix writes a message to addr via c, copying the payload + // from b and the associated out-of-band data from oob. It returns the + // number of payload and out-of-band bytes written. + // + // Note that if len(b) == 0 and len(oob) > 0, this function will still + // write 1 byte to the connection. + WriteMsgUnix(b, oob []byte, addr *net.UnixAddr) (n, oobn int, err error) + + // SetDeadline sets the read and write deadlines associated + // with the connection. It is equivalent to calling both + // SetReadDeadline and SetWriteDeadline. + // + // A deadline is an absolute time after which I/O operations + // fail instead of blocking. The deadline applies to all future + // and pending I/O, not just the immediately following call to + // Read or Write. After a deadline has been exceeded, the + // connection can be refreshed by setting a deadline in the future. + // + // If the deadline is exceeded a call to Read or Write or to other + // I/O methods will return an error that wraps os.ErrDeadlineExceeded. + // This can be tested using errors.Is(err, os.ErrDeadlineExceeded). + // The error's Timeout method will return true, but note that there + // are other possible errors for which the Timeout method will + // return true even if the deadline has not been exceeded. + // + // An idle timeout can be implemented by repeatedly extending + // the deadline after successful Read or Write calls. + // + // A zero value for t means I/O operations will not time out. + SetDeadline(t time.Time) error + + // Close closes the connection. + // Any blocked Read or Write operations will be unblocked and return errors. + Close() error +} + +// The kernel constant SCM_MAX_FD defines a limit on the number of file descriptors in the array. +// Attempting to send an array larger than this limit causes sendmsg(2) to fail with the error +// EINVAL. SCM_MAX_FD has the value 253 (or 255 before Linux 2.6.38). +const _SCM_MAX_FD = 253 + +// A Context holds state of a connection to PipeWire. +type Context struct { + // Pending message data, committed via a call to Roundtrip. + buf []byte + // Current [Header.Sequence] value, incremented every write. + sequence Int + // Current server-side [Header.Sequence] value, incremented on every event processed. + remoteSequence Int + // Proxy id associations. + proxy map[Int]eventProxy + // Newly allocated proxies pending acknowledgement from the server. + pendingIds map[Int]struct{} + // Smallest available Id for the next proxy. + nextId Int + // Server side registry generation number. + generation Long + // Pending file descriptors to be sent with the next message. + pendingFiles []int + // File count kept track of in [Header]. + headerFiles int + // Files from the server. This is discarded on every Roundtrip so eventProxy + // implementations must make sure to close them to avoid leaking fds. + // + // These are not automatically set up as [os.File] because it is impossible + // to undo the effects of os.NewFile, which can be inconvenient for some uses. + receivedFiles []int + // Pending footer value for the next outgoing message. + // Newer footers appear to simply replace the existing one. + pendingFooter KnownSize + // Pending footer value deferred to the next round trip, + // sent if pendingFooter is nil. This is for emulating upstream behaviour + deferredPendingFooter KnownSize + // Proxy for built-in core events. + core Core + // Proxy for built-in client events. + client Client + + // Passed to [Conn.ReadMsgUnix]. Not copied if sufficient for all received messages. + iovecBuf [1 << 15]byte + // Passed to [Conn.ReadMsgUnix] for ancillary messages and is never copied. + oobBuf [(_SCM_MAX_FD/2+_SCM_MAX_FD%2+2)<<3 + 1]byte + // Underlying connection, usually implemented by [net.UnixConn]. + conn Conn +} + +// GetCore returns the address of [Core] held by this [Context]. +func (ctx *Context) GetCore() *Core { return &ctx.core } + +// GetClient returns the address of [Client] held by this [Context]. +func (ctx *Context) GetClient() *Client { return &ctx.client } + +// New initialises [Context] for an already established connection and returns its address. +// The caller must not call any method of the underlying [Conn] after this function returns. +func New(conn Conn, props SPADict) (*Context, error) { + ctx := Context{conn: conn} + ctx.core.ctx = &ctx + ctx.proxy = map[Int]eventProxy{ + PW_ID_CORE: &ctx.core, + PW_ID_CLIENT: &ctx.client, + } + ctx.pendingIds = map[Int]struct{}{ + PW_ID_CLIENT: {}, + } + ctx.nextId = Int(len(ctx.proxy)) + + if err := ctx.coreHello(); err != nil { + return nil, err + } + if err := ctx.clientUpdateProperties(props); err != nil { + return nil, err + } + + return &ctx, nil +} + +// MustNew calls [New](conn, props) and panics on error. +// It is intended for use in tests with hard-coded strings. +func MustNew(conn Conn, props SPADict) *Context { + if ctx, err := New(conn, props); err != nil { + panic(err) + } else { + return ctx + } +} + +// free releases the underlying storage of buf. +func (ctx *Context) free() { ctx.buf = make([]byte, 0) } + +// queueFiles queues some file descriptors to be sent for the next message. +// It returns the offset of their index for the syscall.SCM_RIGHTS message. +func (ctx *Context) queueFiles(fds ...int) (offset Fd) { + offset = Fd(len(ctx.pendingFiles)) + ctx.pendingFiles = append(ctx.pendingFiles, fds...) + return +} + +// writeMessage appends the POD representation of v and an optional footer to buf. +func (ctx *Context) writeMessage( + Id Int, opcode byte, + v KnownSize, +) (err error) { + if ctx.pendingFooter == nil && ctx.deferredPendingFooter != nil { + ctx.pendingFooter, ctx.deferredPendingFooter = ctx.deferredPendingFooter, nil + } + + size := v.Size() + if ctx.pendingFooter != nil { + size += ctx.pendingFooter.Size() + } + if size&^SizeMax != 0 { + return ErrSizeRange + } + + ctx.buf = slices.Grow(ctx.buf, int(SizeHeader+size)) + ctx.buf = (&Header{ + ID: Id, Opcode: opcode, Size: size, + Sequence: ctx.sequence, + FileCount: Int(len(ctx.pendingFiles) - ctx.headerFiles), + }).append(ctx.buf) + ctx.headerFiles = len(ctx.pendingFiles) + ctx.buf, err = MarshalAppend(ctx.buf, v) + if err == nil && ctx.pendingFooter != nil { + ctx.buf, err = MarshalAppend(ctx.buf, ctx.pendingFooter) + ctx.pendingFooter = nil + } + ctx.sequence++ + return +} + +// newProxyId returns a newly allocated proxy Id for the specified type. +func (ctx *Context) newProxyId(proxy eventProxy, ack bool) Int { + newId := ctx.nextId + ctx.proxy[newId] = proxy + if ack { + ctx.pendingIds[newId] = struct{}{} + } + +increment: + ctx.nextId++ + + if _, ok := ctx.proxy[ctx.nextId]; ok { + goto increment + } + return newId +} + +// connTimeout is the maximum duration an I/O operation is allowed for [Conn]. +const connTimeout = 5 * time.Second + +// receiveAll receives from conn until no more data is available. +// The returned slice is valid until the next call to receiveAll. +func (ctx *Context) receiveAll() (payload []byte, err error) { + if err = ctx.conn.SetDeadline(time.Now().Add(connTimeout)); err != nil { + return + } + + var n, oobn int + ctx.receivedFiles = ctx.receivedFiles[:0] + buf := ctx.iovecBuf[:] + +recvmsg: + buf = buf[n:] + n, oobn, _, _, err = ctx.conn.ReadMsgUnix(buf, ctx.oobBuf[:]) + if err != nil { + return + } + if oobn == len(ctx.oobBuf) { + return nil, syscall.ENOMEM // unreachable + } + if oob := ctx.oobBuf[:oobn]; len(oob) > 0 { + var scm []syscall.SocketControlMessage + if scm, err = syscall.ParseSocketControlMessage(oob); err != nil { + return + } + + var fds []int + for i := range scm { + if fds, err = syscall.ParseUnixRights(&scm[i]); err != nil { + return + } + ctx.receivedFiles = append(ctx.receivedFiles, fds...) + } + } + + // receive until buffer fills or payload is depleted + if n > 0 { + goto recvmsg + } + data := ctx.iovecBuf[:len(ctx.iovecBuf)-len(buf)] + + // avoids copy if payload fits in a single ctx.recvmsgBuf + if payload == nil && len(buf) > 0 { + payload = data + return + } + + payload = append(payload, data...) + // this indicates a full ctx.recvmsgBuf + if len(buf) == 0 { + ctx.buf = ctx.iovecBuf[:] + goto recvmsg + } + + return +} + +// An UnknownIdError describes a server message with an Id unknown to [Context]. +type UnknownIdError struct { + // Offending id decoded from Data. + Id Int + // Message received from the server. + Data string +} + +func (e *UnknownIdError) Error() string { return "unknown proxy id " + strconv.Itoa(int(e.Id)) } + +// UnsupportedOpcodeError describes a message with an unsupported opcode. +type UnsupportedOpcodeError struct { + // Offending opcode. + Opcode byte + // Name of interface processed by the proxy. + Interface string +} + +func (e *UnsupportedOpcodeError) Error() string { + return "unsupported " + e.Interface + " opcode " + strconv.Itoa(int(e.Opcode)) +} + +// UnsupportedFooterOpcodeError describes a [Footer] with an unsupported opcode. +type UnsupportedFooterOpcodeError Id + +func (e UnsupportedFooterOpcodeError) Error() string { + return "unsupported footer opcode " + strconv.Itoa(int(e)) +} + +// A RoundtripUnexpectedEOFError describes an unexpected EOF encountered during [Context.Roundtrip]. +type RoundtripUnexpectedEOFError uintptr + +const ( + // ErrRoundtripEOFHeader is returned when unexpectedly encountering EOF + // decoding the message header. + ErrRoundtripEOFHeader RoundtripUnexpectedEOFError = iota + // ErrRoundtripEOFBody is returned when unexpectedly encountering EOF + // establishing message body bounds. + ErrRoundtripEOFBody + // ErrRoundtripEOFFooter is like [ErrRoundtripEOFBody], but for when establishing + // bounds for the footer instead. + ErrRoundtripEOFFooter + // ErrRoundtripEOFFooterOpcode is returned when unexpectedly encountering EOF + // during the footer opcode hack. + ErrRoundtripEOFFooterOpcode +) + +func (RoundtripUnexpectedEOFError) Unwrap() error { return io.ErrUnexpectedEOF } +func (e RoundtripUnexpectedEOFError) Error() string { + var suffix string + switch e { + case ErrRoundtripEOFHeader: + suffix = "decoding message header" + case ErrRoundtripEOFBody: + suffix = "establishing message body bounds" + case ErrRoundtripEOFFooter: + suffix = "establishing message footer bounds" + case ErrRoundtripEOFFooterOpcode: + suffix = "decoding message footer opcode" + + default: + return "unexpected EOF" + } + + return "unexpected EOF " + suffix +} + +// eventProxy consumes events during a [Context.Roundtrip]. +type eventProxy interface { + // consume consumes an event and its optional footer. + consume(opcode byte, files []int, unmarshal func(v any)) error + // setBoundProps stores a [CoreBoundProps] event received from the server. + setBoundProps(event *CoreBoundProps) error + + // Stringer returns the PipeWire interface name. + fmt.Stringer +} + +// unmarshal is like [Unmarshal] but handles footer if present. +func (ctx *Context) unmarshal(header *Header, data []byte, v any) error { + n, err := UnmarshalNext(data, v) + if err != nil { + return err + } + if len(data) < int(header.Size) || header.Size < n { + return ErrRoundtripEOFFooter + } + isLastMessage := len(data) == int(header.Size) + + data = data[n:header.Size] + if len(data) > 0 { + /* the footer concrete type is determined by opcode, which cannot be + decoded directly before the type is known, so this hack is required: + skip the struct prefix, then the integer prefix, and the next SizeId + bytes are the encoded opcode value */ + if len(data) < int(SizePrefix*2+SizeId) { + return ErrRoundtripEOFFooterOpcode + } + switch opcode := binary.NativeEndian.Uint32(data[SizePrefix*2:]); opcode { + case FOOTER_CORE_OPCODE_GENERATION: + var footer Footer[FooterCoreGeneration] + if err = Unmarshal(data, &footer); err != nil { + return err + } + if ctx.generation != footer.Payload.RegistryGeneration { + var pendingFooter = Footer[FooterClientGeneration]{ + FOOTER_CORE_OPCODE_GENERATION, + FooterClientGeneration{ClientGeneration: footer.Payload.RegistryGeneration}, + } + + // this emulates upstream behaviour that pending footer updated on the last message + // during a roundtrip is pushed back to the first message of the next roundtrip + if isLastMessage { + ctx.deferredPendingFooter = &pendingFooter + } else { + ctx.pendingFooter = &pendingFooter + } + } + ctx.generation = footer.Payload.RegistryGeneration + return nil + + default: + return UnsupportedFooterOpcodeError(opcode) + } + } + return nil +} + +// An UnexpectedSequenceError is a server-side sequence number that does not +// match its counterpart tracked by the client. This indicates that either +// the client has somehow missed events, or data being interpreted as [Header] +// is, in fact, not the message header. +type UnexpectedSequenceError Int + +func (e UnexpectedSequenceError) Error() string { return "unexpected seq " + strconv.Itoa(int(e)) } + +// An UnexpectedFilesError describes an inconsistent state where file count claimed by +// [Header] accumulates to a value greater than the total number of files received. +type UnexpectedFilesError int + +func (e UnexpectedFilesError) Error() string { + return "server message headers claim to have sent more than " + strconv.Itoa(int(e)) + " files" +} + +// A DanglingFilesError holds onto files that were sent by the server but no [Header] +// accounts for. These are closed by their finalizers if discarded. +type DanglingFilesError []*os.File + +func (e DanglingFilesError) Error() string { + return "received " + strconv.Itoa(len(e)) + " dangling files" +} + +// An UnacknowledgedProxyError holds newly allocated proxy ids that the server failed +// to acknowledge after an otherwise successful [Context.Roundtrip]. +type UnacknowledgedProxyError []Int + +func (e UnacknowledgedProxyError) Error() string { + return "server did not acknowledge " + strconv.Itoa(len(e)) + " proxies" +} + +// A ProxyFatalError describes an error that terminates event handling during a +// [Context.Roundtrip] and makes further event processing no longer possible. +type ProxyFatalError struct { + // The fatal error causing the termination of event processing. + Err error + // Previous non-fatal proxy errors. + ProxyErrs []error +} + +func (e *ProxyFatalError) Unwrap() []error { return append(e.ProxyErrs, e.Err) } +func (e *ProxyFatalError) Error() string { + s := e.Err.Error() + if len(e.ProxyErrs) > 0 { + s += "; " + strconv.Itoa(len(e.ProxyErrs)) + " additional proxy errors occurred before this point" + } + return s +} + +// A ProxyConsumeError is a collection of non-protocol errors returned by proxies +// during event processing. These do not prevent event handling from continuing but +// may be considered fatal to the application. +type ProxyConsumeError []error + +func (e ProxyConsumeError) Unwrap() []error { return e } +func (e ProxyConsumeError) Error() string { + if len(e) == 0 { + return "invalid proxy consume error" + } + + // first error is usually the most relevant one + s := e[0].Error() + if len(e) > 1 { + s += "; " + strconv.Itoa(len(e)) + " additional proxy errors occurred after this point" + } + return s +} + +// roundtripSyncID is the id passed to Context.coreSync during a [Context.Roundtrip]. +const roundtripSyncID = 0 + +// Roundtrip queues the [CoreSync] message and sends all pending messages to the server. +// +// For a non-nil error, if the error happens over the network, it has concrete type +// [net.OpError]. +func (ctx *Context) Roundtrip() (err error) { + if err = ctx.conn.SetDeadline(time.Now().Add(connTimeout)); err != nil { + return + } + if _, _, err = ctx.conn.WriteMsgUnix(ctx.buf, syscall.UnixRights(ctx.pendingFiles...), nil); err != nil { + return + } + + var ( + // this holds onto non-protocol errors encountered during event handling; + // errors that prevent event processing from continuing must be panicked + proxyErrors ProxyConsumeError + + // current position of processed events in ctx.receivedFiles, anything + // beyond this is closed if event processing is terminated + receivedHeaderFiles int + ) + defer func() { + // anything before this has already been processed and must not be closed + // here, as anything holding onto them will end up with a dangling fd that + // can be reused and cause serious problems + if len(ctx.receivedFiles) > receivedHeaderFiles { + for _, fd := range ctx.receivedFiles[receivedHeaderFiles:] { + _ = syscall.Close(fd) + } + + // this catches cases where Roundtrip somehow returns without processing + // all received files or preparing an error for dangling files, this is + // always overwritten by the fatal error being processed below or made + // inaccessible due to repanicking, so if this ends up returned to the + // caller it indicates something has gone seriously wrong in Roundtrip + if err == nil { + err = syscall.ENOTRECOVERABLE + } + } + + r := recover() + if r == nil { + return + } + + recoveredErr, ok := r.(error) + if !ok { + panic(r) + } + if recoveredErr == nil { + panic(&runtime.PanicNilError{}) + } + + err = &ProxyFatalError{Err: recoveredErr, ProxyErrs: proxyErrors} + return + }() + + ctx.buf = ctx.buf[:0] + ctx.pendingFiles = ctx.pendingFiles[:0] + ctx.headerFiles = 0 + + var data []byte + if data, err = ctx.receiveAll(); err != nil { + return + } + + var header Header + for len(data) > 0 { + if len(data) < SizeHeader { + return ErrRoundtripEOFHeader + } + + if err = header.UnmarshalBinary(data[:SizeHeader]); err != nil { + return + } + if header.Sequence != ctx.remoteSequence { + return UnexpectedSequenceError(header.Sequence) + } + ctx.remoteSequence++ + + if len(data) < int(SizeHeader+header.Size) { + return ErrRoundtripEOFBody + } + + proxy, ok := ctx.proxy[header.ID] + if !ok { + return &UnknownIdError{header.ID, string(data[:SizeHeader+header.Size])} + } + + nextReceivedHeaderFiles := receivedHeaderFiles + int(header.FileCount) + if nextReceivedHeaderFiles > len(ctx.receivedFiles) { + return UnexpectedFilesError(len(ctx.receivedFiles)) + } + files := ctx.receivedFiles[receivedHeaderFiles:nextReceivedHeaderFiles] + receivedHeaderFiles = nextReceivedHeaderFiles + + data = data[SizeHeader:] + proxyErr := proxy.consume(header.Opcode, files, func(v any) { + if unmarshalErr := ctx.unmarshal(&header, data, v); unmarshalErr != nil { + panic(unmarshalErr) + } + }) + data = data[header.Size:] + if proxyErr != nil { + proxyErrors = append(proxyErrors, proxyErr) + } + } + + // prepared here so finalizers are set up, but should not prevent proxyErrors + // from reaching the caller as those describe the cause of these dangling fds + var danglingFiles DanglingFilesError + if len(ctx.receivedFiles) > receivedHeaderFiles { + danglingFds := ctx.receivedFiles[receivedHeaderFiles:] + // having multiple *os.File with the same fd causes serious problems + slices.Sort(danglingFds) + danglingFds = slices.Compact(danglingFds) + + danglingFiles = make(DanglingFilesError, 0, len(danglingFds)) + for _, fd := range danglingFds { + // hold these as *os.File so they are closed if this error never reaches the caller, + // or the caller discards or otherwise does not handle this error, to avoid leaking fds + danglingFiles = append(danglingFiles, os.NewFile(uintptr(fd), + "dangling fd "+strconv.Itoa(fd)+" received from PipeWire")) + } + } + + // these are checked and made available first since they describe the cause + // of so-called symptoms checked after this point; the symptoms should only + // be made available as a catch-all if these are unavailable + if len(proxyErrors) > 0 { + return proxyErrors + } + + // populated early for finalizers + if len(danglingFiles) > 0 { + return danglingFiles + } + + // this check must happen after everything else passes + if len(ctx.pendingIds) != 0 { + return UnacknowledgedProxyError(slices.Collect(maps.Keys(ctx.pendingIds))) + } + return +} + +// An UnexpectedFileCountError is returned as part of a [ProxyFatalError] for an event +// that received an unexpected number of files. +type UnexpectedFileCountError [2]int + +func (e *UnexpectedFileCountError) Error() string { + return "received " + strconv.Itoa(e[1]) + " files instead of the expected " + strconv.Itoa(e[0]) +} + +// closeReceivedFiles closes all received files and panics with [UnexpectedFileCountError] +// if one or more files are passed. This is used with events that do not expect files. +func closeReceivedFiles(fds ...int) { + for _, fd := range fds { + _ = syscall.Close(fd) + } + if len(fds) > 0 { + panic(&UnexpectedFileCountError{0, len(fds)}) + } +} + +// Close frees the underlying buffer and closes the connection. +func (ctx *Context) Close() error { ctx.free(); return ctx.conn.Close() } diff --git a/internal/pipewire/context_test.go b/internal/pipewire/context_test.go new file mode 100644 index 0000000..f819dff --- /dev/null +++ b/internal/pipewire/context_test.go @@ -0,0 +1,876 @@ +package pipewire_test + +import ( + "fmt" + "net" + "reflect" + . "syscall" + "testing" + "time" + + "hakurei.app/container/stub" + "hakurei.app/internal/pipewire" +) + +func TestContext(t *testing.T) { + t.Parallel() + + var ( + // Underlying connection stub holding test data. + conn = stubUnixConn{samples: []stubUnixConnSample{ + {SYS_SENDMSG, samplePWContainer00, MSG_DONTWAIT | MSG_NOSIGNAL, nil, 0}, + {SYS_RECVMSG, samplePWContainer01, MSG_DONTWAIT | MSG_CMSG_CLOEXEC, nil, 0}, + {SYS_RECVMSG, "", MSG_DONTWAIT | MSG_CMSG_CLOEXEC, nil, EAGAIN}, + {SYS_SENDMSG, samplePWContainer03, MSG_DONTWAIT | MSG_NOSIGNAL, nil, 0}, + {SYS_RECVMSG, samplePWContainer04, MSG_DONTWAIT | MSG_CMSG_CLOEXEC, nil, 0}, + {SYS_RECVMSG, "", MSG_DONTWAIT | MSG_CMSG_CLOEXEC, nil, EAGAIN}, + {SYS_SENDMSG, samplePWContainer06, MSG_DONTWAIT | MSG_NOSIGNAL, []int{20, 21}, 0}, + {SYS_RECVMSG, samplePWContainer07, MSG_DONTWAIT | MSG_CMSG_CLOEXEC, nil, 0}, + {SYS_RECVMSG, "", MSG_DONTWAIT | MSG_CMSG_CLOEXEC, nil, EAGAIN}, + }} + + // Context instance under testing. + ctx = pipewire.MustNew(&conn, pipewire.SPADict{ + {Key: "remote.intention", Value: "manager"}, + {Key: "application.name", Value: "pw-container"}, + {Key: "application.process.binary", Value: "pw-container"}, + {Key: "application.language", Value: "en_US.UTF-8"}, + {Key: "application.process.id", Value: "1443"}, + {Key: "application.process.user", Value: "alice"}, + {Key: "application.process.host", Value: "nixos"}, + {Key: "application.process.session-id", Value: "1"}, + {Key: "window.x11.display", Value: ":0"}, + {Key: "cpu.vm.name", Value: "qemu"}, + {Key: "log.level", Value: "0"}, + {Key: "cpu.max-align", Value: "32"}, + {Key: "default.clock.rate", Value: "48000"}, + {Key: "default.clock.quantum", Value: "1024"}, + {Key: "default.clock.min-quantum", Value: "32"}, + {Key: "default.clock.max-quantum", Value: "2048"}, + {Key: "default.clock.quantum-limit", Value: "8192"}, + {Key: "default.clock.quantum-floor", Value: "4"}, + {Key: "default.video.width", Value: "640"}, + {Key: "default.video.height", Value: "480"}, + {Key: "default.video.rate.num", Value: "25"}, + {Key: "default.video.rate.denom", Value: "1"}, + {Key: "clock.power-of-two-quantum", Value: "true"}, + {Key: "link.max-buffers", Value: "64"}, + {Key: "mem.warn-mlock", Value: "false"}, + {Key: "mem.allow-mlock", Value: "true"}, + {Key: "settings.check-quantum", Value: "false"}, + {Key: "settings.check-rate", Value: "false"}, + {Key: "core.version", Value: "1.4.7"}, + {Key: "core.name", Value: "pipewire-alice-1443"}, + }) + ) + + var registry *pipewire.Registry + const wantRegistryId = 2 + if r, err := ctx.GetRegistry(); err != nil { + t.Fatalf("GetRegistry: error = %v", err) + } else { + if r.ID != wantRegistryId { + t.Fatalf("GetRegistry: ID = %d, want %d", r.ID, wantRegistryId) + } + registry = r + } + if err := ctx.GetCore().Sync(); err != nil { + t.Fatalf("Sync: error = %v", err) + } + + wantCoreInfo0 := pipewire.CoreInfo{ + ID: 0, + Cookie: -2069267610, + UserName: "alice", + HostName: "nixos", + Version: "1.4.7", + Name: "pipewire-0", + ChangeMask: pipewire.PW_CORE_CHANGE_MASK_PROPS, + Properties: &pipewire.SPADict{ + {Key: "config.name", Value: "pipewire.conf"}, + {Key: "application.name", Value: "pipewire"}, + {Key: "application.process.binary", Value: "pipewire"}, + {Key: "application.language", Value: "en_US.UTF-8"}, + {Key: "application.process.id", Value: "1446"}, + {Key: "application.process.user", Value: "alice"}, + {Key: "application.process.host", Value: "nixos"}, + {Key: "window.x11.display", Value: ":0"}, + {Key: "cpu.vm.name", Value: "qemu"}, + {Key: "link.max-buffers", Value: "16"}, + {Key: "core.daemon", Value: "true"}, + {Key: "core.name", Value: "pipewire-0"}, + {Key: "default.clock.min-quantum", Value: "1024"}, + {Key: "cpu.max-align", Value: "32"}, + {Key: "default.clock.rate", Value: "48000"}, + {Key: "default.clock.quantum", Value: "1024"}, + {Key: "default.clock.max-quantum", Value: "2048"}, + {Key: "default.clock.quantum-limit", Value: "8192"}, + {Key: "default.clock.quantum-floor", Value: "4"}, + {Key: "default.video.width", Value: "640"}, + {Key: "default.video.height", Value: "480"}, + {Key: "default.video.rate.num", Value: "25"}, + {Key: "default.video.rate.denom", Value: "1"}, + {Key: "log.level", Value: "2"}, + {Key: "clock.power-of-two-quantum", Value: "true"}, + {Key: "mem.warn-mlock", Value: "false"}, + {Key: "mem.allow-mlock", Value: "true"}, + {Key: "settings.check-quantum", Value: "false"}, + {Key: "settings.check-rate", Value: "false"}, + {Key: "object.id", Value: "0"}, + {Key: "object.serial", Value: "0"}}, + } + + wantClient0 := pipewire.Client{ + Info: &pipewire.ClientInfo{ + ID: 34, + ChangeMask: pipewire.PW_CLIENT_CHANGE_MASK_PROPS, + Properties: &pipewire.SPADict{ + {Key: "pipewire.protocol", Value: "protocol-native"}, + {Key: "core.name", Value: "pipewire-alice-1443"}, + {Key: "pipewire.sec.socket", Value: "pipewire-0-manager"}, + {Key: "pipewire.sec.pid", Value: "1443"}, + {Key: "pipewire.sec.uid", Value: "1000"}, + {Key: "pipewire.sec.gid", Value: "100"}, + {Key: "module.id", Value: "2"}, + {Key: "object.id", Value: "34"}, + {Key: "object.serial", Value: "34"}, + {Key: "remote.intention", Value: "manager"}, + {Key: "application.name", Value: "pw-container"}, + {Key: "application.process.binary", Value: "pw-container"}, + {Key: "application.language", Value: "en_US.UTF-8"}, + {Key: "application.process.id", Value: "1443"}, + {Key: "application.process.user", Value: "alice"}, + {Key: "application.process.host", Value: "nixos"}, + {Key: "application.process.session-id", Value: "1"}, + {Key: "window.x11.display", Value: ":0"}, + {Key: "cpu.vm.name", Value: "qemu"}, + {Key: "log.level", Value: "0"}, + {Key: "cpu.max-align", Value: "32"}, + {Key: "default.clock.rate", Value: "48000"}, + {Key: "default.clock.quantum", Value: "1024"}, + {Key: "default.clock.min-quantum", Value: "32"}, + {Key: "default.clock.max-quantum", Value: "2048"}, + {Key: "default.clock.quantum-limit", Value: "8192"}, + {Key: "default.clock.quantum-floor", Value: "4"}, + {Key: "default.video.width", Value: "640"}, + {Key: "default.video.height", Value: "480"}, + {Key: "default.video.rate.num", Value: "25"}, + {Key: "default.video.rate.denom", Value: "1"}, + {Key: "clock.power-of-two-quantum", Value: "true"}, + {Key: "link.max-buffers", Value: "64"}, + {Key: "mem.warn-mlock", Value: "false"}, + {Key: "mem.allow-mlock", Value: "true"}, + {Key: "settings.check-quantum", Value: "false"}, + {Key: "settings.check-rate", Value: "false"}, + {Key: "core.version", Value: "1.4.7"}, + {Key: "pipewire.access", Value: "unrestricted"}, + }, + }, + Properties: pipewire.SPADict{ + {Key: "object.serial", Value: "34"}, + {Key: "module.id", Value: "2"}, + {Key: "pipewire.protocol", Value: "protocol-native"}, + {Key: "pipewire.sec.pid", Value: "1443"}, + {Key: "pipewire.sec.uid", Value: "1000"}, + {Key: "pipewire.sec.gid", Value: "100"}, + {Key: "pipewire.sec.socket", Value: "pipewire-0-manager"}, + }, + } + + wantRegistry0 := pipewire.Registry{ + ID: wantRegistryId, + Objects: map[pipewire.Int]pipewire.RegistryGlobal{ + pipewire.PW_ID_CORE: { + ID: pipewire.PW_ID_CORE, + Permissions: pipewire.PW_CORE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Core, + Version: pipewire.PW_VERSION_CORE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "0"}, + {Key: "core.name", Value: "pipewire-0"}, + }, + }, + + 1: { + ID: 1, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "1"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-rt"}, + }, + }, + + 3: { + ID: 3, + Permissions: pipewire.PW_SECURITY_CONTEXT_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_SecurityContext, + Version: pipewire.PW_VERSION_SECURITY_CONTEXT, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "3"}, + }, + }, + + 2: { + ID: 2, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "2"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-protocol-native"}, + }, + }, + + 5: { + ID: 5, + Permissions: pipewire.PW_PROFILER_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Profiler, + Version: pipewire.PW_VERSION_PROFILER, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "5"}, + }, + }, + + 4: { + ID: 4, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "4"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-profiler"}, + }, + }, + + 6: { + ID: 6, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "6"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-metadata"}, + }, + }, + + 7: { + ID: 7, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "7"}, + {Key: "module.id", Value: "6"}, + {Key: "factory.name", Value: "metadata"}, + {Key: "factory.type.name", Value: pipewire.PW_TYPE_INTERFACE_Metadata}, + {Key: "factory.type.version", Value: "3"}, + }, + }, + + 8: { + ID: 8, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "8"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-spa-device-factory"}, + }, + }, + + 9: { + ID: 9, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "9"}, + {Key: "module.id", Value: "8"}, + {Key: "factory.name", Value: "spa-device-factory"}, + {Key: "factory.type.name", Value: pipewire.PW_TYPE_INTERFACE_Device}, + {Key: "factory.type.version", Value: "3"}, + }, + }, + + 10: { + ID: 10, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "10"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-spa-node-factory"}, + }, + }, + + 11: { + ID: 11, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "11"}, + {Key: "module.id", Value: "10"}, + {Key: "factory.name", Value: "spa-node-factory"}, + {Key: "factory.type.name", Value: pipewire.PW_TYPE_INTERFACE_Node}, + {Key: "factory.type.version", Value: "3"}, + }, + }, + + 12: { + ID: 12, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "12"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-client-node"}, + }, + }, + + 13: { + ID: 13, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "13"}, + {Key: "module.id", Value: "12"}, + {Key: "factory.name", Value: "client-node"}, + {Key: "factory.type.name", Value: pipewire.PW_TYPE_INTERFACE_ClientNode}, + {Key: "factory.type.version", Value: "6"}, + }, + }, + + 14: { + ID: 14, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "14"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-client-device"}, + }, + }, + + 15: { + ID: 15, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "15"}, + {Key: "module.id", Value: "14"}, + {Key: "factory.name", Value: "client-device"}, + {Key: "factory.type.name", Value: "Spa:Pointer:Interface:Device"}, + {Key: "factory.type.version", Value: "0"}, + }, + }, + + 16: { + ID: 16, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "16"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-portal"}, + }, + }, + + 17: { + ID: 17, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "17"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-access"}, + }, + }, + + 18: { + ID: 18, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "18"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-adapter"}, + }, + }, + + 19: { + ID: 19, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "19"}, + {Key: "module.id", Value: "18"}, + {Key: "factory.name", Value: "adapter"}, + {Key: "factory.type.name", Value: pipewire.PW_TYPE_INTERFACE_Node}, + {Key: "factory.type.version", Value: "3"}, + }, + }, + + 20: { + ID: 20, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "20"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-link-factory"}, + }, + }, + + 21: { + ID: 21, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "21"}, + {Key: "module.id", Value: "20"}, + {Key: "factory.name", Value: "link-factory"}, + {Key: "factory.type.name", Value: pipewire.PW_TYPE_INTERFACE_Link}, + {Key: "factory.type.version", Value: "3"}, + }, + }, + + 22: { + ID: 22, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "22"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-session-manager"}, + }, + }, + + 23: { + ID: 23, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "23"}, + {Key: "module.id", Value: "22"}, + {Key: "factory.name", Value: "client-endpoint"}, + {Key: "factory.type.name", Value: "PipeWire:Interface:ClientEndpoint"}, + {Key: "factory.type.version", Value: "0"}, + }, + }, + + 24: { + ID: 24, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "24"}, + {Key: "module.id", Value: "22"}, + {Key: "factory.name", Value: "client-session"}, + {Key: "factory.type.name", Value: "PipeWire:Interface:ClientSession"}, + {Key: "factory.type.version", Value: "0"}, + }, + }, + + 25: { + ID: 25, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "25"}, + {Key: "module.id", Value: "22"}, + {Key: "factory.name", Value: "session"}, + {Key: "factory.type.name", Value: "PipeWire:Interface:Session"}, + {Key: "factory.type.version", Value: "0"}, + }, + }, + + 26: { + ID: 26, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "26"}, + {Key: "module.id", Value: "22"}, + {Key: "factory.name", Value: "endpoint"}, + {Key: "factory.type.name", Value: "PipeWire:Interface:Endpoint"}, + {Key: "factory.type.version", Value: "0"}, + }, + }, + + 27: { + ID: 27, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "27"}, + {Key: "module.id", Value: "22"}, + {Key: "factory.name", Value: "endpoint-stream"}, + {Key: "factory.type.name", Value: "PipeWire:Interface:EndpointStream"}, + {Key: "factory.type.version", Value: "0"}, + }, + }, + + 28: { + ID: 28, + Permissions: pipewire.PW_FACTORY_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Factory, + Version: pipewire.PW_VERSION_FACTORY, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "28"}, + {Key: "module.id", Value: "22"}, + {Key: "factory.name", Value: "endpoint-link"}, + {Key: "factory.type.name", Value: "PipeWire:Interface:EndpointLink"}, + {Key: "factory.type.version", Value: "0"}, + }, + }, + + 29: { + ID: 29, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "29"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-x11-bell"}, + }, + }, + + 30: { + ID: 30, + Permissions: pipewire.PW_MODULE_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Module, + Version: pipewire.PW_VERSION_MODULE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "30"}, + {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-jackdbus-detect"}, + }, + }, + + 31: { + ID: 31, + Permissions: pipewire.PW_PERM_RWXM, // why is this not PW_NODE_PERM_MASK? + Type: pipewire.PW_TYPE_INTERFACE_Node, + Version: pipewire.PW_VERSION_NODE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "31"}, + {Key: "factory.id", Value: "11"}, + {Key: "priority.driver", Value: "200000"}, + {Key: "node.name", Value: "Dummy-Driver"}, + }, + }, + + 32: { + ID: 32, + Permissions: pipewire.PW_PERM_RWXM, // why is this not PW_NODE_PERM_MASK? + Type: pipewire.PW_TYPE_INTERFACE_Node, + Version: pipewire.PW_VERSION_NODE, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "32"}, + {Key: "factory.id", Value: "11"}, + {Key: "priority.driver", Value: "190000"}, + {Key: "node.name", Value: "Freewheel-Driver"}, + }, + }, + + 33: { + ID: 33, + Permissions: pipewire.PW_METADATA_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Metadata, + Version: pipewire.PW_VERSION_METADATA, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "33"}, + {Key: "metadata.name", Value: "settings"}, + }, + }, + + 34: { + ID: 34, + Permissions: pipewire.PW_CLIENT_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Client, + Version: pipewire.PW_VERSION_CLIENT, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "34"}, + {Key: "module.id", Value: "2"}, + {Key: "pipewire.protocol", Value: "protocol-native"}, + {Key: "pipewire.sec.pid", Value: "1443"}, + {Key: "pipewire.sec.uid", Value: "1000"}, + {Key: "pipewire.sec.gid", Value: "100"}, + {Key: "pipewire.sec.socket", Value: "pipewire-0-manager"}, + {Key: "pipewire.access", Value: "unrestricted"}, + {Key: "application.name", Value: "pw-container"}, + }, + }, + + 35: { + ID: 35, + Permissions: pipewire.PW_CLIENT_PERM_MASK, + Type: pipewire.PW_TYPE_INTERFACE_Client, + Version: pipewire.PW_VERSION_CLIENT, + Properties: &pipewire.SPADict{ + {Key: "object.serial", Value: "35"}, + {Key: "module.id", Value: "2"}, + {Key: "pipewire.protocol", Value: "protocol-native"}, + {Key: "pipewire.sec.pid", Value: "1447"}, + {Key: "pipewire.sec.uid", Value: "1000"}, + {Key: "pipewire.sec.gid", Value: "100"}, + {Key: "pipewire.sec.socket", Value: "pipewire-0-manager"}, + {Key: "pipewire.access", Value: "unrestricted"}, + {Key: "application.name", Value: "WirePlumber"}, + }, + }, + }, + } + + if coreInfo := ctx.GetCore().Info; !reflect.DeepEqual(coreInfo, &wantCoreInfo0) { + t.Fatalf("New: CoreInfo = %s, want %s", mustMarshalJSON(coreInfo), mustMarshalJSON(&wantCoreInfo0)) + } + if client := ctx.GetClient(); !reflect.DeepEqual(client, &wantClient0) { + t.Fatalf("New: Client = %s, want %s", mustMarshalJSON(client), mustMarshalJSON(&wantClient0)) + } + if registry.ID != wantRegistry0.ID { + t.Fatalf("GetRegistry: ID = %d, want %d", registry.ID, wantRegistry0.ID) + } + if !reflect.DeepEqual(registry.Objects, wantRegistry0.Objects) { + t.Fatalf("GetRegistry: Objects = %s, want %s", mustMarshalJSON(registry.Objects), mustMarshalJSON(wantRegistry0.Objects)) + } + + var securityContext *pipewire.SecurityContext + const wantSecurityContextId = 3 + if c, err := registry.GetSecurityContext(); err != nil { + t.Fatalf("GetSecurityContext: error = %v", err) + } else { + if c.ID != wantSecurityContextId { + t.Fatalf("GetSecurityContext: ID = %d, want %d", c.ID, wantSecurityContextId) + } + securityContext = c + } + if err := ctx.Roundtrip(); err != nil { + t.Fatalf("Roundtrip: error = %v", err) + } + + // none of these should change + if coreInfo := ctx.GetCore().Info; !reflect.DeepEqual(coreInfo, &wantCoreInfo0) { + t.Fatalf("Roundtrip: CoreInfo = %s, want %s", mustMarshalJSON(coreInfo), mustMarshalJSON(&wantCoreInfo0)) + } + if client := ctx.GetClient(); !reflect.DeepEqual(client, &wantClient0) { + t.Fatalf("Roundtrip: Client = %s, want %s", mustMarshalJSON(client), mustMarshalJSON(&wantClient0)) + } + if registry.ID != wantRegistry0.ID { + t.Fatalf("Roundtrip: ID = %d, want %d", registry.ID, wantRegistry0.ID) + } + if !reflect.DeepEqual(registry.Objects, wantRegistry0.Objects) { + t.Fatalf("Roundtrip: Objects = %s, want %s", mustMarshalJSON(registry.Objects), mustMarshalJSON(wantRegistry0.Objects)) + } + + if err := securityContext.Create(21, 20, pipewire.SPADict{ + {Key: "pipewire.sec.engine", Value: "org.flatpak"}, + {Key: "pipewire.access", Value: "restricted"}, + }); err != nil { + t.Fatalf("SecurityContext.Create: error = %v", err) + } + if err := ctx.GetCore().Sync(); err != nil { + t.Fatalf("Sync: error = %v", err) + } + + // none of these should change + if coreInfo := ctx.GetCore().Info; !reflect.DeepEqual(coreInfo, &wantCoreInfo0) { + t.Fatalf("Roundtrip: CoreInfo = %s, want %s", mustMarshalJSON(coreInfo), mustMarshalJSON(&wantCoreInfo0)) + } + if client := ctx.GetClient(); !reflect.DeepEqual(client, &wantClient0) { + t.Fatalf("Roundtrip: Client = %s, want %s", mustMarshalJSON(client), mustMarshalJSON(&wantClient0)) + } + if registry.ID != wantRegistry0.ID { + t.Fatalf("Roundtrip: ID = %d, want %d", registry.ID, wantRegistry0.ID) + } + if !reflect.DeepEqual(registry.Objects, wantRegistry0.Objects) { + t.Fatalf("Roundtrip: Objects = %s, want %s", mustMarshalJSON(registry.Objects), mustMarshalJSON(wantRegistry0.Objects)) + } + + if err := ctx.Close(); err != nil { + t.Fatalf("Close: error = %v", err) + } +} + +// stubUnixConnSample is sample data held by stubUnixConn. +type stubUnixConnSample struct { + nr uintptr + iovec string + flags uintptr + files []int + errno Errno +} + +// stubUnixConn implements [pipewire.Conn] and checks the behaviour of [pipewire.Context]. +type stubUnixConn struct { + samples []stubUnixConnSample + current int + + deadline *time.Time +} + +// checkDeadline checks whether deadline is set reasonably. +func (conn *stubUnixConn) checkDeadline() error { + if conn.deadline == nil || conn.deadline.Before(time.Now()) { + return fmt.Errorf("invalid deadline %v", conn.deadline) + } + conn.deadline = nil + return nil +} + +// nextSample returns the current sample and increments the counter. +func (conn *stubUnixConn) nextSample(nr uintptr) (sample *stubUnixConnSample, wantOOB []byte, err error) { + sample = &conn.samples[conn.current] + conn.current++ + if sample.nr != nr { + err = fmt.Errorf("unexpected syscall %d", SYS_SENDMSG) + return + } + if len(sample.files) > 0 { + wantOOB = UnixRights(sample.files...) + } + + return +} + +func (conn *stubUnixConn) ReadMsgUnix(b, oob []byte) (n, oobn, flags int, addr *net.UnixAddr, err error) { + if conn.samples[conn.current-1].nr == SYS_SENDMSG { + if err = conn.checkDeadline(); err != nil { + return + } + } + + var ( + sample *stubUnixConnSample + wantOOB []byte + ) + sample, wantOOB, err = conn.nextSample(SYS_RECVMSG) + if err != nil { + return + } + + if copy(b, sample.iovec) != len(sample.iovec) { + err = fmt.Errorf("insufficient iovec size %d, want at least %d", len(b), len(sample.iovec)) + } + if copy(oob, wantOOB) != len(wantOOB) { + err = fmt.Errorf("insufficient oob size %d, want at least %d", len(oob), len(wantOOB)) + } + + if sample.errno != 0 && sample.errno != EAGAIN { + err = sample.errno + } + return len(sample.iovec), len(wantOOB), MSG_CMSG_CLOEXEC, nil, nil +} + +func (conn *stubUnixConn) WriteMsgUnix(b, oob []byte, addr *net.UnixAddr) (n, oobn int, err error) { + if addr != nil { + err = fmt.Errorf("WriteMsgUnix called with non-nil addr: %#v", addr) + return + } + if err = conn.checkDeadline(); err != nil { + return + } + + var ( + sample *stubUnixConnSample + wantOOB []byte + ) + sample, wantOOB, err = conn.nextSample(SYS_SENDMSG) + if err != nil { + return + } + + if string(b) != sample.iovec { + err = fmt.Errorf("iovec: %#v, want %#v", b, []byte(sample.iovec)) + return + } + if string(oob[:len(wantOOB)]) != string(wantOOB) { + err = fmt.Errorf("oob: %#v, want %#v", oob[:len(wantOOB)], wantOOB) + return + } + return len(sample.iovec), len(wantOOB), nil +} + +func (conn *stubUnixConn) SetDeadline(t time.Time) error { conn.deadline = &t; return nil } + +func (conn *stubUnixConn) Close() error { + if conn.current != len(conn.samples) { + return fmt.Errorf("consumed %d samples, want %d", conn.current, len(conn.samples)) + } + return nil +} + +func TestContextErrors(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + err error + want string + }{ + {"ProxyConsumeError invalid", pipewire.ProxyConsumeError{}, "invalid proxy consume error"}, + {"ProxyConsumeError single", pipewire.ProxyConsumeError{ + stub.UniqueError(0), + }, "unique error 0 injected by the test suite"}, + {"ProxyConsumeError multiple", pipewire.ProxyConsumeError{ + stub.UniqueError(1), + stub.UniqueError(2), + stub.UniqueError(3), + stub.UniqueError(4), + stub.UniqueError(5), + stub.UniqueError(6), + stub.UniqueError(7), + }, "unique error 1 injected by the test suite; 7 additional proxy errors occurred after this point"}, + + {"ProxyFatalError", &pipewire.ProxyFatalError{ + Err: stub.UniqueError(8), + }, "unique error 8 injected by the test suite"}, + {"ProxyFatalError proxy errors", &pipewire.ProxyFatalError{ + Err: stub.UniqueError(9), + ProxyErrs: make([]error, 1<<4), + }, "unique error 9 injected by the test suite; 16 additional proxy errors occurred before this point"}, + + {"UnexpectedFileCountError", &pipewire.UnexpectedFileCountError{0, -1}, "received -1 files instead of the expected 0"}, + {"UnacknowledgedProxyError", make(pipewire.UnacknowledgedProxyError, 1<<4), "server did not acknowledge 16 proxies"}, + {"DanglingFilesError", make(pipewire.DanglingFilesError, 1<<4), "received 16 dangling files"}, + {"UnexpectedFilesError", pipewire.UnexpectedFilesError(1 << 4), "server message headers claim to have sent more than 16 files"}, + {"UnexpectedSequenceError", pipewire.UnexpectedSequenceError(1 << 4), "unexpected seq 16"}, + {"UnsupportedFooterOpcodeError", pipewire.UnsupportedFooterOpcodeError(1 << 4), "unsupported footer opcode 16"}, + + {"RoundtripUnexpectedEOFError ErrRoundtripEOFHeader", pipewire.ErrRoundtripEOFHeader, "unexpected EOF decoding message header"}, + {"RoundtripUnexpectedEOFError ErrRoundtripEOFBody", pipewire.ErrRoundtripEOFBody, "unexpected EOF establishing message body bounds"}, + {"RoundtripUnexpectedEOFError ErrRoundtripEOFFooter", pipewire.ErrRoundtripEOFFooter, "unexpected EOF establishing message footer bounds"}, + {"RoundtripUnexpectedEOFError ErrRoundtripEOFFooterOpcode", pipewire.ErrRoundtripEOFFooterOpcode, "unexpected EOF decoding message footer opcode"}, + {"RoundtripUnexpectedEOFError invalid", pipewire.RoundtripUnexpectedEOFError(0xbad), "unexpected EOF"}, + + {"UnsupportedOpcodeError", &pipewire.UnsupportedOpcodeError{ + Opcode: 0xff, + Interface: pipewire.PW_TYPE_INFO_INTERFACE_BASE + "Invalid", + }, "unsupported PipeWire:Interface:Invalid opcode 255"}, + + {"UnknownIdError", &pipewire.UnknownIdError{ + Id: -1, + Data: "\x00", + }, "unknown proxy id -1"}, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + if got := tc.err.Error(); got != tc.want { + t.Errorf("Error: %q, want %q", got, tc.want) + } + }) + } +} diff --git a/internal/pipewire/pipewire.go b/internal/pipewire/pipewire.go index c3f57bf..cee5c86 100644 --- a/internal/pipewire/pipewire.go +++ b/internal/pipewire/pipewire.go @@ -14,645 +14,6 @@ // for any other uses of the protocol. package pipewire -import ( - "encoding/binary" - "fmt" - "io" - "maps" - "net" - "os" - "runtime" - "slices" - "strconv" - "syscall" - "time" -) - -// Conn is a subset of methods of [net.UnixConn] used by [Context]. -type Conn interface { - // ReadMsgUnix reads a message from c, copying the payload into b and - // the associated out-of-band data into oob. It returns the number of - // bytes copied into b, the number of bytes copied into oob, the flags - // that were set on the message and the source address of the message. - // - // Note that if len(b) == 0 and len(oob) > 0, this function will still - // read (and discard) 1 byte from the connection. - ReadMsgUnix(b, oob []byte) (n, oobn, flags int, addr *net.UnixAddr, err error) - - // WriteMsgUnix writes a message to addr via c, copying the payload - // from b and the associated out-of-band data from oob. It returns the - // number of payload and out-of-band bytes written. - // - // Note that if len(b) == 0 and len(oob) > 0, this function will still - // write 1 byte to the connection. - WriteMsgUnix(b, oob []byte, addr *net.UnixAddr) (n, oobn int, err error) - - // SetDeadline sets the read and write deadlines associated - // with the connection. It is equivalent to calling both - // SetReadDeadline and SetWriteDeadline. - // - // A deadline is an absolute time after which I/O operations - // fail instead of blocking. The deadline applies to all future - // and pending I/O, not just the immediately following call to - // Read or Write. After a deadline has been exceeded, the - // connection can be refreshed by setting a deadline in the future. - // - // If the deadline is exceeded a call to Read or Write or to other - // I/O methods will return an error that wraps os.ErrDeadlineExceeded. - // This can be tested using errors.Is(err, os.ErrDeadlineExceeded). - // The error's Timeout method will return true, but note that there - // are other possible errors for which the Timeout method will - // return true even if the deadline has not been exceeded. - // - // An idle timeout can be implemented by repeatedly extending - // the deadline after successful Read or Write calls. - // - // A zero value for t means I/O operations will not time out. - SetDeadline(t time.Time) error - - // Close closes the connection. - // Any blocked Read or Write operations will be unblocked and return errors. - Close() error -} - -// The kernel constant SCM_MAX_FD defines a limit on the number of file descriptors in the array. -// Attempting to send an array larger than this limit causes sendmsg(2) to fail with the error -// EINVAL. SCM_MAX_FD has the value 253 (or 255 before Linux 2.6.38). -const _SCM_MAX_FD = 253 - -// A Context holds state of a connection to PipeWire. -type Context struct { - // Pending message data, committed via a call to Roundtrip. - buf []byte - // Current [Header.Sequence] value, incremented every write. - sequence Int - // Current server-side [Header.Sequence] value, incremented on every event processed. - remoteSequence Int - // Proxy id associations. - proxy map[Int]eventProxy - // Newly allocated proxies pending acknowledgement from the server. - pendingIds map[Int]struct{} - // Smallest available Id for the next proxy. - nextId Int - // Server side registry generation number. - generation Long - // Pending file descriptors to be sent with the next message. - pendingFiles []int - // File count kept track of in [Header]. - headerFiles int - // Files from the server. This is discarded on every Roundtrip so eventProxy - // implementations must make sure to close them to avoid leaking fds. - // - // These are not automatically set up as [os.File] because it is impossible - // to undo the effects of os.NewFile, which can be inconvenient for some uses. - receivedFiles []int - // Pending footer value for the next outgoing message. - // Newer footers appear to simply replace the existing one. - pendingFooter KnownSize - // Pending footer value deferred to the next round trip, - // sent if pendingFooter is nil. This is for emulating upstream behaviour - deferredPendingFooter KnownSize - // Proxy for built-in core events. - core Core - // Proxy for built-in client events. - client Client - - // Passed to [Conn.ReadMsgUnix]. Not copied if sufficient for all received messages. - iovecBuf [1 << 15]byte - // Passed to [Conn.ReadMsgUnix] for ancillary messages and is never copied. - oobBuf [(_SCM_MAX_FD/2+_SCM_MAX_FD%2+2)<<3 + 1]byte - // Underlying connection, usually implemented by [net.UnixConn]. - conn Conn -} - -// GetCore returns the address of [Core] held by this [Context]. -func (ctx *Context) GetCore() *Core { return &ctx.core } - -// GetClient returns the address of [Client] held by this [Context]. -func (ctx *Context) GetClient() *Client { return &ctx.client } - -// New initialises [Context] for an already established connection and returns its address. -// The caller must not call any method of the underlying [Conn] after this function returns. -func New(conn Conn, props SPADict) (*Context, error) { - ctx := Context{conn: conn} - ctx.core.ctx = &ctx - ctx.proxy = map[Int]eventProxy{ - PW_ID_CORE: &ctx.core, - PW_ID_CLIENT: &ctx.client, - } - ctx.pendingIds = map[Int]struct{}{ - PW_ID_CLIENT: {}, - } - ctx.nextId = Int(len(ctx.proxy)) - - if err := ctx.coreHello(); err != nil { - return nil, err - } - if err := ctx.clientUpdateProperties(props); err != nil { - return nil, err - } - - return &ctx, nil -} - -// MustNew calls [New](conn, props) and panics on error. -// It is intended for use in tests with hard-coded strings. -func MustNew(conn Conn, props SPADict) *Context { - if ctx, err := New(conn, props); err != nil { - panic(err) - } else { - return ctx - } -} - -// free releases the underlying storage of buf. -func (ctx *Context) free() { ctx.buf = make([]byte, 0) } - -// queueFiles queues some file descriptors to be sent for the next message. -// It returns the offset of their index for the syscall.SCM_RIGHTS message. -func (ctx *Context) queueFiles(fds ...int) (offset Fd) { - offset = Fd(len(ctx.pendingFiles)) - ctx.pendingFiles = append(ctx.pendingFiles, fds...) - return -} - -// writeMessage appends the POD representation of v and an optional footer to buf. -func (ctx *Context) writeMessage( - Id Int, opcode byte, - v KnownSize, -) (err error) { - if ctx.pendingFooter == nil && ctx.deferredPendingFooter != nil { - ctx.pendingFooter, ctx.deferredPendingFooter = ctx.deferredPendingFooter, nil - } - - size := v.Size() - if ctx.pendingFooter != nil { - size += ctx.pendingFooter.Size() - } - if size&^SizeMax != 0 { - return ErrSizeRange - } - - ctx.buf = slices.Grow(ctx.buf, int(SizeHeader+size)) - ctx.buf = (&Header{ - ID: Id, Opcode: opcode, Size: size, - Sequence: ctx.sequence, - FileCount: Int(len(ctx.pendingFiles) - ctx.headerFiles), - }).append(ctx.buf) - ctx.headerFiles = len(ctx.pendingFiles) - ctx.buf, err = MarshalAppend(ctx.buf, v) - if err == nil && ctx.pendingFooter != nil { - ctx.buf, err = MarshalAppend(ctx.buf, ctx.pendingFooter) - ctx.pendingFooter = nil - } - ctx.sequence++ - return -} - -// newProxyId returns a newly allocated proxy Id for the specified type. -func (ctx *Context) newProxyId(proxy eventProxy, ack bool) Int { - newId := ctx.nextId - ctx.proxy[newId] = proxy - if ack { - ctx.pendingIds[newId] = struct{}{} - } - -increment: - ctx.nextId++ - - if _, ok := ctx.proxy[ctx.nextId]; ok { - goto increment - } - return newId -} - -// connTimeout is the maximum duration an I/O operation is allowed for [Conn]. -const connTimeout = 5 * time.Second - -// receiveAll receives from conn until no more data is available. -// The returned slice is valid until the next call to receiveAll. -func (ctx *Context) receiveAll() (payload []byte, err error) { - if err = ctx.conn.SetDeadline(time.Now().Add(connTimeout)); err != nil { - return - } - - var n, oobn int - ctx.receivedFiles = ctx.receivedFiles[:0] - buf := ctx.iovecBuf[:] - -recvmsg: - buf = buf[n:] - n, oobn, _, _, err = ctx.conn.ReadMsgUnix(buf, ctx.oobBuf[:]) - if err != nil { - return - } - if oobn == len(ctx.oobBuf) { - return nil, syscall.ENOMEM // unreachable - } - if oob := ctx.oobBuf[:oobn]; len(oob) > 0 { - var scm []syscall.SocketControlMessage - if scm, err = syscall.ParseSocketControlMessage(oob); err != nil { - return - } - - var fds []int - for i := range scm { - if fds, err = syscall.ParseUnixRights(&scm[i]); err != nil { - return - } - ctx.receivedFiles = append(ctx.receivedFiles, fds...) - } - } - - // receive until buffer fills or payload is depleted - if n > 0 { - goto recvmsg - } - data := ctx.iovecBuf[:len(ctx.iovecBuf)-len(buf)] - - // avoids copy if payload fits in a single ctx.recvmsgBuf - if payload == nil && len(buf) > 0 { - payload = data - return - } - - payload = append(payload, data...) - // this indicates a full ctx.recvmsgBuf - if len(buf) == 0 { - ctx.buf = ctx.iovecBuf[:] - goto recvmsg - } - - return -} - -// An UnknownIdError describes a server message with an Id unknown to [Context]. -type UnknownIdError struct { - // Offending id decoded from Data. - Id Int - // Message received from the server. - Data string -} - -func (e *UnknownIdError) Error() string { return "unknown proxy id " + strconv.Itoa(int(e.Id)) } - -// UnsupportedOpcodeError describes a message with an unsupported opcode. -type UnsupportedOpcodeError struct { - // Offending opcode. - Opcode byte - // Name of interface processed by the proxy. - Interface string -} - -func (e *UnsupportedOpcodeError) Error() string { - return "unsupported " + e.Interface + " opcode " + strconv.Itoa(int(e.Opcode)) -} - -// UnsupportedFooterOpcodeError describes a [Footer] with an unsupported opcode. -type UnsupportedFooterOpcodeError Id - -func (e UnsupportedFooterOpcodeError) Error() string { - return "unsupported footer opcode " + strconv.Itoa(int(e)) -} - -// A RoundtripUnexpectedEOFError describes an unexpected EOF encountered during [Context.Roundtrip]. -type RoundtripUnexpectedEOFError uintptr - -const ( - // ErrRoundtripEOFHeader is returned when unexpectedly encountering EOF - // decoding the message header. - ErrRoundtripEOFHeader RoundtripUnexpectedEOFError = iota - // ErrRoundtripEOFBody is returned when unexpectedly encountering EOF - // establishing message body bounds. - ErrRoundtripEOFBody - // ErrRoundtripEOFFooter is like [ErrRoundtripEOFBody], but for when establishing - // bounds for the footer instead. - ErrRoundtripEOFFooter - // ErrRoundtripEOFFooterOpcode is returned when unexpectedly encountering EOF - // during the footer opcode hack. - ErrRoundtripEOFFooterOpcode -) - -func (RoundtripUnexpectedEOFError) Unwrap() error { return io.ErrUnexpectedEOF } -func (e RoundtripUnexpectedEOFError) Error() string { - var suffix string - switch e { - case ErrRoundtripEOFHeader: - suffix = "decoding message header" - case ErrRoundtripEOFBody: - suffix = "establishing message body bounds" - case ErrRoundtripEOFFooter: - suffix = "establishing message footer bounds" - case ErrRoundtripEOFFooterOpcode: - suffix = "decoding message footer opcode" - - default: - return "unexpected EOF" - } - - return "unexpected EOF " + suffix -} - -// eventProxy consumes events during a [Context.Roundtrip]. -type eventProxy interface { - // consume consumes an event and its optional footer. - consume(opcode byte, files []int, unmarshal func(v any)) error - // setBoundProps stores a [CoreBoundProps] event received from the server. - setBoundProps(event *CoreBoundProps) error - - // Stringer returns the PipeWire interface name. - fmt.Stringer -} - -// unmarshal is like [Unmarshal] but handles footer if present. -func (ctx *Context) unmarshal(header *Header, data []byte, v any) error { - n, err := UnmarshalNext(data, v) - if err != nil { - return err - } - if len(data) < int(header.Size) || header.Size < n { - return ErrRoundtripEOFFooter - } - isLastMessage := len(data) == int(header.Size) - - data = data[n:header.Size] - if len(data) > 0 { - /* the footer concrete type is determined by opcode, which cannot be - decoded directly before the type is known, so this hack is required: - skip the struct prefix, then the integer prefix, and the next SizeId - bytes are the encoded opcode value */ - if len(data) < int(SizePrefix*2+SizeId) { - return ErrRoundtripEOFFooterOpcode - } - switch opcode := binary.NativeEndian.Uint32(data[SizePrefix*2:]); opcode { - case FOOTER_CORE_OPCODE_GENERATION: - var footer Footer[FooterCoreGeneration] - if err = Unmarshal(data, &footer); err != nil { - return err - } - if ctx.generation != footer.Payload.RegistryGeneration { - var pendingFooter = Footer[FooterClientGeneration]{ - FOOTER_CORE_OPCODE_GENERATION, - FooterClientGeneration{ClientGeneration: footer.Payload.RegistryGeneration}, - } - - // this emulates upstream behaviour that pending footer updated on the last message - // during a roundtrip is pushed back to the first message of the next roundtrip - if isLastMessage { - ctx.deferredPendingFooter = &pendingFooter - } else { - ctx.pendingFooter = &pendingFooter - } - } - ctx.generation = footer.Payload.RegistryGeneration - return nil - - default: - return UnsupportedFooterOpcodeError(opcode) - } - } - return nil -} - -// An UnexpectedSequenceError is a server-side sequence number that does not -// match its counterpart tracked by the client. This indicates that either -// the client has somehow missed events, or data being interpreted as [Header] -// is, in fact, not the message header. -type UnexpectedSequenceError Int - -func (e UnexpectedSequenceError) Error() string { return "unexpected seq " + strconv.Itoa(int(e)) } - -// An UnexpectedFilesError describes an inconsistent state where file count claimed by -// [Header] accumulates to a value greater than the total number of files received. -type UnexpectedFilesError int - -func (e UnexpectedFilesError) Error() string { - return "server message headers claim to have sent more than " + strconv.Itoa(int(e)) + " files" -} - -// A DanglingFilesError holds onto files that were sent by the server but no [Header] -// accounts for. These are closed by their finalizers if discarded. -type DanglingFilesError []*os.File - -func (e DanglingFilesError) Error() string { - return "received " + strconv.Itoa(len(e)) + " dangling files" -} - -// An UnacknowledgedProxyError holds newly allocated proxy ids that the server failed -// to acknowledge after an otherwise successful [Context.Roundtrip]. -type UnacknowledgedProxyError []Int - -func (e UnacknowledgedProxyError) Error() string { - return "server did not acknowledge " + strconv.Itoa(len(e)) + " proxies" -} - -// A ProxyFatalError describes an error that terminates event handling during a -// [Context.Roundtrip] and makes further event processing no longer possible. -type ProxyFatalError struct { - // The fatal error causing the termination of event processing. - Err error - // Previous non-fatal proxy errors. - ProxyErrs []error -} - -func (e *ProxyFatalError) Unwrap() []error { return append(e.ProxyErrs, e.Err) } -func (e *ProxyFatalError) Error() string { - s := e.Err.Error() - if len(e.ProxyErrs) > 0 { - s += "; " + strconv.Itoa(len(e.ProxyErrs)) + " additional proxy errors occurred before this point" - } - return s -} - -// A ProxyConsumeError is a collection of non-protocol errors returned by proxies -// during event processing. These do not prevent event handling from continuing but -// may be considered fatal to the application. -type ProxyConsumeError []error - -func (e ProxyConsumeError) Unwrap() []error { return e } -func (e ProxyConsumeError) Error() string { - if len(e) == 0 { - return "invalid proxy consume error" - } - - // first error is usually the most relevant one - s := e[0].Error() - if len(e) > 1 { - s += "; " + strconv.Itoa(len(e)) + " additional proxy errors occurred after this point" - } - return s -} - -// roundtripSyncID is the id passed to Context.coreSync during a [Context.Roundtrip]. -const roundtripSyncID = 0 - -// Roundtrip queues the [CoreSync] message and sends all pending messages to the server. -// -// For a non-nil error, if the error happens over the network, it has concrete type -// [net.OpError]. -func (ctx *Context) Roundtrip() (err error) { - if err = ctx.conn.SetDeadline(time.Now().Add(connTimeout)); err != nil { - return - } - if _, _, err = ctx.conn.WriteMsgUnix(ctx.buf, syscall.UnixRights(ctx.pendingFiles...), nil); err != nil { - return - } - - var ( - // this holds onto non-protocol errors encountered during event handling; - // errors that prevent event processing from continuing must be panicked - proxyErrors ProxyConsumeError - - // current position of processed events in ctx.receivedFiles, anything - // beyond this is closed if event processing is terminated - receivedHeaderFiles int - ) - defer func() { - // anything before this has already been processed and must not be closed - // here, as anything holding onto them will end up with a dangling fd that - // can be reused and cause serious problems - if len(ctx.receivedFiles) > receivedHeaderFiles { - for _, fd := range ctx.receivedFiles[receivedHeaderFiles:] { - _ = syscall.Close(fd) - } - - // this catches cases where Roundtrip somehow returns without processing - // all received files or preparing an error for dangling files, this is - // always overwritten by the fatal error being processed below or made - // inaccessible due to repanicking, so if this ends up returned to the - // caller it indicates something has gone seriously wrong in Roundtrip - if err == nil { - err = syscall.ENOTRECOVERABLE - } - } - - r := recover() - if r == nil { - return - } - - recoveredErr, ok := r.(error) - if !ok { - panic(r) - } - if recoveredErr == nil { - panic(&runtime.PanicNilError{}) - } - - err = &ProxyFatalError{Err: recoveredErr, ProxyErrs: proxyErrors} - return - }() - - ctx.buf = ctx.buf[:0] - ctx.pendingFiles = ctx.pendingFiles[:0] - ctx.headerFiles = 0 - - var data []byte - if data, err = ctx.receiveAll(); err != nil { - return - } - - var header Header - for len(data) > 0 { - if len(data) < SizeHeader { - return ErrRoundtripEOFHeader - } - - if err = header.UnmarshalBinary(data[:SizeHeader]); err != nil { - return - } - if header.Sequence != ctx.remoteSequence { - return UnexpectedSequenceError(header.Sequence) - } - ctx.remoteSequence++ - - if len(data) < int(SizeHeader+header.Size) { - return ErrRoundtripEOFBody - } - - proxy, ok := ctx.proxy[header.ID] - if !ok { - return &UnknownIdError{header.ID, string(data[:SizeHeader+header.Size])} - } - - nextReceivedHeaderFiles := receivedHeaderFiles + int(header.FileCount) - if nextReceivedHeaderFiles > len(ctx.receivedFiles) { - return UnexpectedFilesError(len(ctx.receivedFiles)) - } - files := ctx.receivedFiles[receivedHeaderFiles:nextReceivedHeaderFiles] - receivedHeaderFiles = nextReceivedHeaderFiles - - data = data[SizeHeader:] - proxyErr := proxy.consume(header.Opcode, files, func(v any) { - if unmarshalErr := ctx.unmarshal(&header, data, v); unmarshalErr != nil { - panic(unmarshalErr) - } - }) - data = data[header.Size:] - if proxyErr != nil { - proxyErrors = append(proxyErrors, proxyErr) - } - } - - // prepared here so finalizers are set up, but should not prevent proxyErrors - // from reaching the caller as those describe the cause of these dangling fds - var danglingFiles DanglingFilesError - if len(ctx.receivedFiles) > receivedHeaderFiles { - danglingFds := ctx.receivedFiles[receivedHeaderFiles:] - // having multiple *os.File with the same fd causes serious problems - slices.Sort(danglingFds) - danglingFds = slices.Compact(danglingFds) - - danglingFiles = make(DanglingFilesError, 0, len(danglingFds)) - for _, fd := range danglingFds { - // hold these as *os.File so they are closed if this error never reaches the caller, - // or the caller discards or otherwise does not handle this error, to avoid leaking fds - danglingFiles = append(danglingFiles, os.NewFile(uintptr(fd), - "dangling fd "+strconv.Itoa(fd)+" received from PipeWire")) - } - } - - // these are checked and made available first since they describe the cause - // of so-called symptoms checked after this point; the symptoms should only - // be made available as a catch-all if these are unavailable - if len(proxyErrors) > 0 { - return proxyErrors - } - - // populated early for finalizers - if len(danglingFiles) > 0 { - return danglingFiles - } - - // this check must happen after everything else passes - if len(ctx.pendingIds) != 0 { - return UnacknowledgedProxyError(slices.Collect(maps.Keys(ctx.pendingIds))) - } - return -} - -// An UnexpectedFileCountError is returned as part of a [ProxyFatalError] for an event -// that received an unexpected number of files. -type UnexpectedFileCountError [2]int - -func (e *UnexpectedFileCountError) Error() string { - return "received " + strconv.Itoa(e[1]) + " files instead of the expected " + strconv.Itoa(e[0]) -} - -// closeReceivedFiles closes all received files and panics with [UnexpectedFileCountError] -// if one or more files are passed. This is used with events that do not expect files. -func closeReceivedFiles(fds ...int) { - for _, fd := range fds { - _ = syscall.Close(fd) - } - if len(fds) > 0 { - panic(&UnexpectedFileCountError{0, len(fds)}) - } -} - -// Close frees the underlying buffer and closes the connection. -func (ctx *Context) Close() error { ctx.free(); return ctx.conn.Close() } - /* pipewire/device.h */ const ( diff --git a/internal/pipewire/pipewire_test.go b/internal/pipewire/pipewire_test.go index f7df3ab..63b5f4f 100644 --- a/internal/pipewire/pipewire_test.go +++ b/internal/pipewire/pipewire_test.go @@ -3,816 +3,10 @@ package pipewire_test import ( _ "embed" "encoding/binary" - "fmt" - "net" - "reflect" - . "syscall" - "testing" - "time" - "hakurei.app/container/stub" "hakurei.app/internal/pipewire" ) -func TestContext(t *testing.T) { - t.Parallel() - - var ( - // Underlying connection stub holding test data. - conn = stubUnixConn{samples: []stubUnixConnSample{ - {SYS_SENDMSG, samplePWContainer00, MSG_DONTWAIT | MSG_NOSIGNAL, nil, 0}, - {SYS_RECVMSG, samplePWContainer01, MSG_DONTWAIT | MSG_CMSG_CLOEXEC, nil, 0}, - {SYS_RECVMSG, "", MSG_DONTWAIT | MSG_CMSG_CLOEXEC, nil, EAGAIN}, - {SYS_SENDMSG, samplePWContainer03, MSG_DONTWAIT | MSG_NOSIGNAL, nil, 0}, - {SYS_RECVMSG, samplePWContainer04, MSG_DONTWAIT | MSG_CMSG_CLOEXEC, nil, 0}, - {SYS_RECVMSG, "", MSG_DONTWAIT | MSG_CMSG_CLOEXEC, nil, EAGAIN}, - {SYS_SENDMSG, samplePWContainer06, MSG_DONTWAIT | MSG_NOSIGNAL, []int{20, 21}, 0}, - {SYS_RECVMSG, samplePWContainer07, MSG_DONTWAIT | MSG_CMSG_CLOEXEC, nil, 0}, - {SYS_RECVMSG, "", MSG_DONTWAIT | MSG_CMSG_CLOEXEC, nil, EAGAIN}, - }} - - // Context instance under testing. - ctx = pipewire.MustNew(&conn, pipewire.SPADict{ - {Key: "remote.intention", Value: "manager"}, - {Key: "application.name", Value: "pw-container"}, - {Key: "application.process.binary", Value: "pw-container"}, - {Key: "application.language", Value: "en_US.UTF-8"}, - {Key: "application.process.id", Value: "1443"}, - {Key: "application.process.user", Value: "alice"}, - {Key: "application.process.host", Value: "nixos"}, - {Key: "application.process.session-id", Value: "1"}, - {Key: "window.x11.display", Value: ":0"}, - {Key: "cpu.vm.name", Value: "qemu"}, - {Key: "log.level", Value: "0"}, - {Key: "cpu.max-align", Value: "32"}, - {Key: "default.clock.rate", Value: "48000"}, - {Key: "default.clock.quantum", Value: "1024"}, - {Key: "default.clock.min-quantum", Value: "32"}, - {Key: "default.clock.max-quantum", Value: "2048"}, - {Key: "default.clock.quantum-limit", Value: "8192"}, - {Key: "default.clock.quantum-floor", Value: "4"}, - {Key: "default.video.width", Value: "640"}, - {Key: "default.video.height", Value: "480"}, - {Key: "default.video.rate.num", Value: "25"}, - {Key: "default.video.rate.denom", Value: "1"}, - {Key: "clock.power-of-two-quantum", Value: "true"}, - {Key: "link.max-buffers", Value: "64"}, - {Key: "mem.warn-mlock", Value: "false"}, - {Key: "mem.allow-mlock", Value: "true"}, - {Key: "settings.check-quantum", Value: "false"}, - {Key: "settings.check-rate", Value: "false"}, - {Key: "core.version", Value: "1.4.7"}, - {Key: "core.name", Value: "pipewire-alice-1443"}, - }) - ) - - var registry *pipewire.Registry - const wantRegistryId = 2 - if r, err := ctx.GetRegistry(); err != nil { - t.Fatalf("GetRegistry: error = %v", err) - } else { - if r.ID != wantRegistryId { - t.Fatalf("GetRegistry: ID = %d, want %d", r.ID, wantRegistryId) - } - registry = r - } - if err := ctx.GetCore().Sync(); err != nil { - t.Fatalf("Sync: error = %v", err) - } - - wantCoreInfo0 := pipewire.CoreInfo{ - ID: 0, - Cookie: -2069267610, - UserName: "alice", - HostName: "nixos", - Version: "1.4.7", - Name: "pipewire-0", - ChangeMask: pipewire.PW_CORE_CHANGE_MASK_PROPS, - Properties: &pipewire.SPADict{ - {Key: "config.name", Value: "pipewire.conf"}, - {Key: "application.name", Value: "pipewire"}, - {Key: "application.process.binary", Value: "pipewire"}, - {Key: "application.language", Value: "en_US.UTF-8"}, - {Key: "application.process.id", Value: "1446"}, - {Key: "application.process.user", Value: "alice"}, - {Key: "application.process.host", Value: "nixos"}, - {Key: "window.x11.display", Value: ":0"}, - {Key: "cpu.vm.name", Value: "qemu"}, - {Key: "link.max-buffers", Value: "16"}, - {Key: "core.daemon", Value: "true"}, - {Key: "core.name", Value: "pipewire-0"}, - {Key: "default.clock.min-quantum", Value: "1024"}, - {Key: "cpu.max-align", Value: "32"}, - {Key: "default.clock.rate", Value: "48000"}, - {Key: "default.clock.quantum", Value: "1024"}, - {Key: "default.clock.max-quantum", Value: "2048"}, - {Key: "default.clock.quantum-limit", Value: "8192"}, - {Key: "default.clock.quantum-floor", Value: "4"}, - {Key: "default.video.width", Value: "640"}, - {Key: "default.video.height", Value: "480"}, - {Key: "default.video.rate.num", Value: "25"}, - {Key: "default.video.rate.denom", Value: "1"}, - {Key: "log.level", Value: "2"}, - {Key: "clock.power-of-two-quantum", Value: "true"}, - {Key: "mem.warn-mlock", Value: "false"}, - {Key: "mem.allow-mlock", Value: "true"}, - {Key: "settings.check-quantum", Value: "false"}, - {Key: "settings.check-rate", Value: "false"}, - {Key: "object.id", Value: "0"}, - {Key: "object.serial", Value: "0"}}, - } - - wantClient0 := pipewire.Client{ - Info: &pipewire.ClientInfo{ - ID: 34, - ChangeMask: pipewire.PW_CLIENT_CHANGE_MASK_PROPS, - Properties: &pipewire.SPADict{ - {Key: "pipewire.protocol", Value: "protocol-native"}, - {Key: "core.name", Value: "pipewire-alice-1443"}, - {Key: "pipewire.sec.socket", Value: "pipewire-0-manager"}, - {Key: "pipewire.sec.pid", Value: "1443"}, - {Key: "pipewire.sec.uid", Value: "1000"}, - {Key: "pipewire.sec.gid", Value: "100"}, - {Key: "module.id", Value: "2"}, - {Key: "object.id", Value: "34"}, - {Key: "object.serial", Value: "34"}, - {Key: "remote.intention", Value: "manager"}, - {Key: "application.name", Value: "pw-container"}, - {Key: "application.process.binary", Value: "pw-container"}, - {Key: "application.language", Value: "en_US.UTF-8"}, - {Key: "application.process.id", Value: "1443"}, - {Key: "application.process.user", Value: "alice"}, - {Key: "application.process.host", Value: "nixos"}, - {Key: "application.process.session-id", Value: "1"}, - {Key: "window.x11.display", Value: ":0"}, - {Key: "cpu.vm.name", Value: "qemu"}, - {Key: "log.level", Value: "0"}, - {Key: "cpu.max-align", Value: "32"}, - {Key: "default.clock.rate", Value: "48000"}, - {Key: "default.clock.quantum", Value: "1024"}, - {Key: "default.clock.min-quantum", Value: "32"}, - {Key: "default.clock.max-quantum", Value: "2048"}, - {Key: "default.clock.quantum-limit", Value: "8192"}, - {Key: "default.clock.quantum-floor", Value: "4"}, - {Key: "default.video.width", Value: "640"}, - {Key: "default.video.height", Value: "480"}, - {Key: "default.video.rate.num", Value: "25"}, - {Key: "default.video.rate.denom", Value: "1"}, - {Key: "clock.power-of-two-quantum", Value: "true"}, - {Key: "link.max-buffers", Value: "64"}, - {Key: "mem.warn-mlock", Value: "false"}, - {Key: "mem.allow-mlock", Value: "true"}, - {Key: "settings.check-quantum", Value: "false"}, - {Key: "settings.check-rate", Value: "false"}, - {Key: "core.version", Value: "1.4.7"}, - {Key: "pipewire.access", Value: "unrestricted"}, - }, - }, - Properties: pipewire.SPADict{ - {Key: "object.serial", Value: "34"}, - {Key: "module.id", Value: "2"}, - {Key: "pipewire.protocol", Value: "protocol-native"}, - {Key: "pipewire.sec.pid", Value: "1443"}, - {Key: "pipewire.sec.uid", Value: "1000"}, - {Key: "pipewire.sec.gid", Value: "100"}, - {Key: "pipewire.sec.socket", Value: "pipewire-0-manager"}, - }, - } - - wantRegistry0 := pipewire.Registry{ - ID: wantRegistryId, - Objects: map[pipewire.Int]pipewire.RegistryGlobal{ - pipewire.PW_ID_CORE: { - ID: pipewire.PW_ID_CORE, - Permissions: pipewire.PW_CORE_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Core, - Version: pipewire.PW_VERSION_CORE, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "0"}, - {Key: "core.name", Value: "pipewire-0"}, - }, - }, - - 1: { - ID: 1, - Permissions: pipewire.PW_MODULE_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Module, - Version: pipewire.PW_VERSION_MODULE, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "1"}, - {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-rt"}, - }, - }, - - 3: { - ID: 3, - Permissions: pipewire.PW_SECURITY_CONTEXT_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_SecurityContext, - Version: pipewire.PW_VERSION_SECURITY_CONTEXT, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "3"}, - }, - }, - - 2: { - ID: 2, - Permissions: pipewire.PW_MODULE_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Module, - Version: pipewire.PW_VERSION_MODULE, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "2"}, - {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-protocol-native"}, - }, - }, - - 5: { - ID: 5, - Permissions: pipewire.PW_PROFILER_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Profiler, - Version: pipewire.PW_VERSION_PROFILER, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "5"}, - }, - }, - - 4: { - ID: 4, - Permissions: pipewire.PW_MODULE_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Module, - Version: pipewire.PW_VERSION_MODULE, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "4"}, - {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-profiler"}, - }, - }, - - 6: { - ID: 6, - Permissions: pipewire.PW_MODULE_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Module, - Version: pipewire.PW_VERSION_MODULE, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "6"}, - {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-metadata"}, - }, - }, - - 7: { - ID: 7, - Permissions: pipewire.PW_FACTORY_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Factory, - Version: pipewire.PW_VERSION_FACTORY, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "7"}, - {Key: "module.id", Value: "6"}, - {Key: "factory.name", Value: "metadata"}, - {Key: "factory.type.name", Value: pipewire.PW_TYPE_INTERFACE_Metadata}, - {Key: "factory.type.version", Value: "3"}, - }, - }, - - 8: { - ID: 8, - Permissions: pipewire.PW_MODULE_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Module, - Version: pipewire.PW_VERSION_MODULE, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "8"}, - {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-spa-device-factory"}, - }, - }, - - 9: { - ID: 9, - Permissions: pipewire.PW_FACTORY_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Factory, - Version: pipewire.PW_VERSION_FACTORY, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "9"}, - {Key: "module.id", Value: "8"}, - {Key: "factory.name", Value: "spa-device-factory"}, - {Key: "factory.type.name", Value: pipewire.PW_TYPE_INTERFACE_Device}, - {Key: "factory.type.version", Value: "3"}, - }, - }, - - 10: { - ID: 10, - Permissions: pipewire.PW_MODULE_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Module, - Version: pipewire.PW_VERSION_MODULE, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "10"}, - {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-spa-node-factory"}, - }, - }, - - 11: { - ID: 11, - Permissions: pipewire.PW_FACTORY_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Factory, - Version: pipewire.PW_VERSION_FACTORY, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "11"}, - {Key: "module.id", Value: "10"}, - {Key: "factory.name", Value: "spa-node-factory"}, - {Key: "factory.type.name", Value: pipewire.PW_TYPE_INTERFACE_Node}, - {Key: "factory.type.version", Value: "3"}, - }, - }, - - 12: { - ID: 12, - Permissions: pipewire.PW_MODULE_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Module, - Version: pipewire.PW_VERSION_MODULE, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "12"}, - {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-client-node"}, - }, - }, - - 13: { - ID: 13, - Permissions: pipewire.PW_FACTORY_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Factory, - Version: pipewire.PW_VERSION_FACTORY, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "13"}, - {Key: "module.id", Value: "12"}, - {Key: "factory.name", Value: "client-node"}, - {Key: "factory.type.name", Value: pipewire.PW_TYPE_INTERFACE_ClientNode}, - {Key: "factory.type.version", Value: "6"}, - }, - }, - - 14: { - ID: 14, - Permissions: pipewire.PW_MODULE_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Module, - Version: pipewire.PW_VERSION_MODULE, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "14"}, - {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-client-device"}, - }, - }, - - 15: { - ID: 15, - Permissions: pipewire.PW_FACTORY_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Factory, - Version: pipewire.PW_VERSION_FACTORY, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "15"}, - {Key: "module.id", Value: "14"}, - {Key: "factory.name", Value: "client-device"}, - {Key: "factory.type.name", Value: "Spa:Pointer:Interface:Device"}, - {Key: "factory.type.version", Value: "0"}, - }, - }, - - 16: { - ID: 16, - Permissions: pipewire.PW_MODULE_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Module, - Version: pipewire.PW_VERSION_MODULE, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "16"}, - {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-portal"}, - }, - }, - - 17: { - ID: 17, - Permissions: pipewire.PW_MODULE_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Module, - Version: pipewire.PW_VERSION_MODULE, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "17"}, - {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-access"}, - }, - }, - - 18: { - ID: 18, - Permissions: pipewire.PW_MODULE_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Module, - Version: pipewire.PW_VERSION_MODULE, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "18"}, - {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-adapter"}, - }, - }, - - 19: { - ID: 19, - Permissions: pipewire.PW_FACTORY_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Factory, - Version: pipewire.PW_VERSION_FACTORY, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "19"}, - {Key: "module.id", Value: "18"}, - {Key: "factory.name", Value: "adapter"}, - {Key: "factory.type.name", Value: pipewire.PW_TYPE_INTERFACE_Node}, - {Key: "factory.type.version", Value: "3"}, - }, - }, - - 20: { - ID: 20, - Permissions: pipewire.PW_MODULE_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Module, - Version: pipewire.PW_VERSION_MODULE, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "20"}, - {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-link-factory"}, - }, - }, - - 21: { - ID: 21, - Permissions: pipewire.PW_FACTORY_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Factory, - Version: pipewire.PW_VERSION_FACTORY, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "21"}, - {Key: "module.id", Value: "20"}, - {Key: "factory.name", Value: "link-factory"}, - {Key: "factory.type.name", Value: pipewire.PW_TYPE_INTERFACE_Link}, - {Key: "factory.type.version", Value: "3"}, - }, - }, - - 22: { - ID: 22, - Permissions: pipewire.PW_MODULE_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Module, - Version: pipewire.PW_VERSION_MODULE, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "22"}, - {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-session-manager"}, - }, - }, - - 23: { - ID: 23, - Permissions: pipewire.PW_FACTORY_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Factory, - Version: pipewire.PW_VERSION_FACTORY, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "23"}, - {Key: "module.id", Value: "22"}, - {Key: "factory.name", Value: "client-endpoint"}, - {Key: "factory.type.name", Value: "PipeWire:Interface:ClientEndpoint"}, - {Key: "factory.type.version", Value: "0"}, - }, - }, - - 24: { - ID: 24, - Permissions: pipewire.PW_FACTORY_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Factory, - Version: pipewire.PW_VERSION_FACTORY, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "24"}, - {Key: "module.id", Value: "22"}, - {Key: "factory.name", Value: "client-session"}, - {Key: "factory.type.name", Value: "PipeWire:Interface:ClientSession"}, - {Key: "factory.type.version", Value: "0"}, - }, - }, - - 25: { - ID: 25, - Permissions: pipewire.PW_FACTORY_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Factory, - Version: pipewire.PW_VERSION_FACTORY, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "25"}, - {Key: "module.id", Value: "22"}, - {Key: "factory.name", Value: "session"}, - {Key: "factory.type.name", Value: "PipeWire:Interface:Session"}, - {Key: "factory.type.version", Value: "0"}, - }, - }, - - 26: { - ID: 26, - Permissions: pipewire.PW_FACTORY_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Factory, - Version: pipewire.PW_VERSION_FACTORY, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "26"}, - {Key: "module.id", Value: "22"}, - {Key: "factory.name", Value: "endpoint"}, - {Key: "factory.type.name", Value: "PipeWire:Interface:Endpoint"}, - {Key: "factory.type.version", Value: "0"}, - }, - }, - - 27: { - ID: 27, - Permissions: pipewire.PW_FACTORY_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Factory, - Version: pipewire.PW_VERSION_FACTORY, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "27"}, - {Key: "module.id", Value: "22"}, - {Key: "factory.name", Value: "endpoint-stream"}, - {Key: "factory.type.name", Value: "PipeWire:Interface:EndpointStream"}, - {Key: "factory.type.version", Value: "0"}, - }, - }, - - 28: { - ID: 28, - Permissions: pipewire.PW_FACTORY_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Factory, - Version: pipewire.PW_VERSION_FACTORY, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "28"}, - {Key: "module.id", Value: "22"}, - {Key: "factory.name", Value: "endpoint-link"}, - {Key: "factory.type.name", Value: "PipeWire:Interface:EndpointLink"}, - {Key: "factory.type.version", Value: "0"}, - }, - }, - - 29: { - ID: 29, - Permissions: pipewire.PW_MODULE_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Module, - Version: pipewire.PW_VERSION_MODULE, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "29"}, - {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-x11-bell"}, - }, - }, - - 30: { - ID: 30, - Permissions: pipewire.PW_MODULE_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Module, - Version: pipewire.PW_VERSION_MODULE, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "30"}, - {Key: "module.name", Value: pipewire.PIPEWIRE_MODULE_PREFIX + "module-jackdbus-detect"}, - }, - }, - - 31: { - ID: 31, - Permissions: pipewire.PW_PERM_RWXM, // why is this not PW_NODE_PERM_MASK? - Type: pipewire.PW_TYPE_INTERFACE_Node, - Version: pipewire.PW_VERSION_NODE, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "31"}, - {Key: "factory.id", Value: "11"}, - {Key: "priority.driver", Value: "200000"}, - {Key: "node.name", Value: "Dummy-Driver"}, - }, - }, - - 32: { - ID: 32, - Permissions: pipewire.PW_PERM_RWXM, // why is this not PW_NODE_PERM_MASK? - Type: pipewire.PW_TYPE_INTERFACE_Node, - Version: pipewire.PW_VERSION_NODE, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "32"}, - {Key: "factory.id", Value: "11"}, - {Key: "priority.driver", Value: "190000"}, - {Key: "node.name", Value: "Freewheel-Driver"}, - }, - }, - - 33: { - ID: 33, - Permissions: pipewire.PW_METADATA_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Metadata, - Version: pipewire.PW_VERSION_METADATA, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "33"}, - {Key: "metadata.name", Value: "settings"}, - }, - }, - - 34: { - ID: 34, - Permissions: pipewire.PW_CLIENT_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Client, - Version: pipewire.PW_VERSION_CLIENT, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "34"}, - {Key: "module.id", Value: "2"}, - {Key: "pipewire.protocol", Value: "protocol-native"}, - {Key: "pipewire.sec.pid", Value: "1443"}, - {Key: "pipewire.sec.uid", Value: "1000"}, - {Key: "pipewire.sec.gid", Value: "100"}, - {Key: "pipewire.sec.socket", Value: "pipewire-0-manager"}, - {Key: "pipewire.access", Value: "unrestricted"}, - {Key: "application.name", Value: "pw-container"}, - }, - }, - - 35: { - ID: 35, - Permissions: pipewire.PW_CLIENT_PERM_MASK, - Type: pipewire.PW_TYPE_INTERFACE_Client, - Version: pipewire.PW_VERSION_CLIENT, - Properties: &pipewire.SPADict{ - {Key: "object.serial", Value: "35"}, - {Key: "module.id", Value: "2"}, - {Key: "pipewire.protocol", Value: "protocol-native"}, - {Key: "pipewire.sec.pid", Value: "1447"}, - {Key: "pipewire.sec.uid", Value: "1000"}, - {Key: "pipewire.sec.gid", Value: "100"}, - {Key: "pipewire.sec.socket", Value: "pipewire-0-manager"}, - {Key: "pipewire.access", Value: "unrestricted"}, - {Key: "application.name", Value: "WirePlumber"}, - }, - }, - }, - } - - if coreInfo := ctx.GetCore().Info; !reflect.DeepEqual(coreInfo, &wantCoreInfo0) { - t.Fatalf("New: CoreInfo = %s, want %s", mustMarshalJSON(coreInfo), mustMarshalJSON(&wantCoreInfo0)) - } - if client := ctx.GetClient(); !reflect.DeepEqual(client, &wantClient0) { - t.Fatalf("New: Client = %s, want %s", mustMarshalJSON(client), mustMarshalJSON(&wantClient0)) - } - if registry.ID != wantRegistry0.ID { - t.Fatalf("GetRegistry: ID = %d, want %d", registry.ID, wantRegistry0.ID) - } - if !reflect.DeepEqual(registry.Objects, wantRegistry0.Objects) { - t.Fatalf("GetRegistry: Objects = %s, want %s", mustMarshalJSON(registry.Objects), mustMarshalJSON(wantRegistry0.Objects)) - } - - var securityContext *pipewire.SecurityContext - const wantSecurityContextId = 3 - if c, err := registry.GetSecurityContext(); err != nil { - t.Fatalf("GetSecurityContext: error = %v", err) - } else { - if c.ID != wantSecurityContextId { - t.Fatalf("GetSecurityContext: ID = %d, want %d", c.ID, wantSecurityContextId) - } - securityContext = c - } - if err := ctx.Roundtrip(); err != nil { - t.Fatalf("Roundtrip: error = %v", err) - } - - // none of these should change - if coreInfo := ctx.GetCore().Info; !reflect.DeepEqual(coreInfo, &wantCoreInfo0) { - t.Fatalf("Roundtrip: CoreInfo = %s, want %s", mustMarshalJSON(coreInfo), mustMarshalJSON(&wantCoreInfo0)) - } - if client := ctx.GetClient(); !reflect.DeepEqual(client, &wantClient0) { - t.Fatalf("Roundtrip: Client = %s, want %s", mustMarshalJSON(client), mustMarshalJSON(&wantClient0)) - } - if registry.ID != wantRegistry0.ID { - t.Fatalf("Roundtrip: ID = %d, want %d", registry.ID, wantRegistry0.ID) - } - if !reflect.DeepEqual(registry.Objects, wantRegistry0.Objects) { - t.Fatalf("Roundtrip: Objects = %s, want %s", mustMarshalJSON(registry.Objects), mustMarshalJSON(wantRegistry0.Objects)) - } - - if err := securityContext.Create(21, 20, pipewire.SPADict{ - {Key: "pipewire.sec.engine", Value: "org.flatpak"}, - {Key: "pipewire.access", Value: "restricted"}, - }); err != nil { - t.Fatalf("SecurityContext.Create: error = %v", err) - } - if err := ctx.GetCore().Sync(); err != nil { - t.Fatalf("Sync: error = %v", err) - } - - // none of these should change - if coreInfo := ctx.GetCore().Info; !reflect.DeepEqual(coreInfo, &wantCoreInfo0) { - t.Fatalf("Roundtrip: CoreInfo = %s, want %s", mustMarshalJSON(coreInfo), mustMarshalJSON(&wantCoreInfo0)) - } - if client := ctx.GetClient(); !reflect.DeepEqual(client, &wantClient0) { - t.Fatalf("Roundtrip: Client = %s, want %s", mustMarshalJSON(client), mustMarshalJSON(&wantClient0)) - } - if registry.ID != wantRegistry0.ID { - t.Fatalf("Roundtrip: ID = %d, want %d", registry.ID, wantRegistry0.ID) - } - if !reflect.DeepEqual(registry.Objects, wantRegistry0.Objects) { - t.Fatalf("Roundtrip: Objects = %s, want %s", mustMarshalJSON(registry.Objects), mustMarshalJSON(wantRegistry0.Objects)) - } - - if err := ctx.Close(); err != nil { - t.Fatalf("Close: error = %v", err) - } -} - -// stubUnixConnSample is sample data held by stubUnixConn. -type stubUnixConnSample struct { - nr uintptr - iovec string - flags uintptr - files []int - errno Errno -} - -// stubUnixConn implements [pipewire.Conn] and checks the behaviour of [pipewire.Context]. -type stubUnixConn struct { - samples []stubUnixConnSample - current int - - deadline *time.Time -} - -// checkDeadline checks whether deadline is set reasonably. -func (conn *stubUnixConn) checkDeadline() error { - if conn.deadline == nil || conn.deadline.Before(time.Now()) { - return fmt.Errorf("invalid deadline %v", conn.deadline) - } - conn.deadline = nil - return nil -} - -// nextSample returns the current sample and increments the counter. -func (conn *stubUnixConn) nextSample(nr uintptr) (sample *stubUnixConnSample, wantOOB []byte, err error) { - sample = &conn.samples[conn.current] - conn.current++ - if sample.nr != nr { - err = fmt.Errorf("unexpected syscall %d", SYS_SENDMSG) - return - } - if len(sample.files) > 0 { - wantOOB = UnixRights(sample.files...) - } - - return -} - -func (conn *stubUnixConn) ReadMsgUnix(b, oob []byte) (n, oobn, flags int, addr *net.UnixAddr, err error) { - if conn.samples[conn.current-1].nr == SYS_SENDMSG { - if err = conn.checkDeadline(); err != nil { - return - } - } - - var ( - sample *stubUnixConnSample - wantOOB []byte - ) - sample, wantOOB, err = conn.nextSample(SYS_RECVMSG) - if err != nil { - return - } - - if copy(b, sample.iovec) != len(sample.iovec) { - err = fmt.Errorf("insufficient iovec size %d, want at least %d", len(b), len(sample.iovec)) - } - if copy(oob, wantOOB) != len(wantOOB) { - err = fmt.Errorf("insufficient oob size %d, want at least %d", len(oob), len(wantOOB)) - } - - if sample.errno != 0 && sample.errno != EAGAIN { - err = sample.errno - } - return len(sample.iovec), len(wantOOB), MSG_CMSG_CLOEXEC, nil, nil -} - -func (conn *stubUnixConn) WriteMsgUnix(b, oob []byte, addr *net.UnixAddr) (n, oobn int, err error) { - if addr != nil { - err = fmt.Errorf("WriteMsgUnix called with non-nil addr: %#v", addr) - return - } - if err = conn.checkDeadline(); err != nil { - return - } - - var ( - sample *stubUnixConnSample - wantOOB []byte - ) - sample, wantOOB, err = conn.nextSample(SYS_SENDMSG) - if err != nil { - return - } - - if string(b) != sample.iovec { - err = fmt.Errorf("iovec: %#v, want %#v", b, []byte(sample.iovec)) - return - } - if string(oob[:len(wantOOB)]) != string(wantOOB) { - err = fmt.Errorf("oob: %#v, want %#v", oob[:len(wantOOB)], wantOOB) - return - } - return len(sample.iovec), len(wantOOB), nil -} - -func (conn *stubUnixConn) SetDeadline(t time.Time) error { conn.deadline = &t; return nil } - -func (conn *stubUnixConn) Close() error { - if conn.current != len(conn.samples) { - return fmt.Errorf("consumed %d samples, want %d", conn.current, len(conn.samples)) - } - return nil -} - var ( //go:embed testdata/pw-container-00-sendmsg samplePWContainer00 string @@ -863,67 +57,3 @@ func splitMessages(iovec string) (messages [][3][]byte) { } return } - -func TestContextErrors(t *testing.T) { - t.Parallel() - - testCases := []struct { - name string - err error - want string - }{ - {"ProxyConsumeError invalid", pipewire.ProxyConsumeError{}, "invalid proxy consume error"}, - {"ProxyConsumeError single", pipewire.ProxyConsumeError{ - stub.UniqueError(0), - }, "unique error 0 injected by the test suite"}, - {"ProxyConsumeError multiple", pipewire.ProxyConsumeError{ - stub.UniqueError(1), - stub.UniqueError(2), - stub.UniqueError(3), - stub.UniqueError(4), - stub.UniqueError(5), - stub.UniqueError(6), - stub.UniqueError(7), - }, "unique error 1 injected by the test suite; 7 additional proxy errors occurred after this point"}, - - {"ProxyFatalError", &pipewire.ProxyFatalError{ - Err: stub.UniqueError(8), - }, "unique error 8 injected by the test suite"}, - {"ProxyFatalError proxy errors", &pipewire.ProxyFatalError{ - Err: stub.UniqueError(9), - ProxyErrs: make([]error, 1<<4), - }, "unique error 9 injected by the test suite; 16 additional proxy errors occurred before this point"}, - - {"UnexpectedFileCountError", &pipewire.UnexpectedFileCountError{0, -1}, "received -1 files instead of the expected 0"}, - {"UnacknowledgedProxyError", make(pipewire.UnacknowledgedProxyError, 1<<4), "server did not acknowledge 16 proxies"}, - {"DanglingFilesError", make(pipewire.DanglingFilesError, 1<<4), "received 16 dangling files"}, - {"UnexpectedFilesError", pipewire.UnexpectedFilesError(1 << 4), "server message headers claim to have sent more than 16 files"}, - {"UnexpectedSequenceError", pipewire.UnexpectedSequenceError(1 << 4), "unexpected seq 16"}, - {"UnsupportedFooterOpcodeError", pipewire.UnsupportedFooterOpcodeError(1 << 4), "unsupported footer opcode 16"}, - - {"RoundtripUnexpectedEOFError ErrRoundtripEOFHeader", pipewire.ErrRoundtripEOFHeader, "unexpected EOF decoding message header"}, - {"RoundtripUnexpectedEOFError ErrRoundtripEOFBody", pipewire.ErrRoundtripEOFBody, "unexpected EOF establishing message body bounds"}, - {"RoundtripUnexpectedEOFError ErrRoundtripEOFFooter", pipewire.ErrRoundtripEOFFooter, "unexpected EOF establishing message footer bounds"}, - {"RoundtripUnexpectedEOFError ErrRoundtripEOFFooterOpcode", pipewire.ErrRoundtripEOFFooterOpcode, "unexpected EOF decoding message footer opcode"}, - {"RoundtripUnexpectedEOFError invalid", pipewire.RoundtripUnexpectedEOFError(0xbad), "unexpected EOF"}, - - {"UnsupportedOpcodeError", &pipewire.UnsupportedOpcodeError{ - Opcode: 0xff, - Interface: pipewire.PW_TYPE_INFO_INTERFACE_BASE + "Invalid", - }, "unsupported PipeWire:Interface:Invalid opcode 255"}, - - {"UnknownIdError", &pipewire.UnknownIdError{ - Id: -1, - Data: "\x00", - }, "unknown proxy id -1"}, - } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - if got := tc.err.Error(); got != tc.want { - t.Errorf("Error: %q, want %q", got, tc.want) - } - }) - } -}