From b9b9705b520cd3effaba3dc5e94c83e36f655bc4 Mon Sep 17 00:00:00 2001 From: Ophestra Date: Sun, 7 Dec 2025 13:54:11 +0900 Subject: [PATCH] internal/pipewire: specify opcode and file count with message This adds checking of FileCount while writing a message. Message encoding is relocated to an exported method to be used externally, probably for test stubbing. Signed-off-by: Ophestra --- internal/pipewire/client.go | 13 ++++- internal/pipewire/core.go | 79 ++++++++++++++++++++++++++-- internal/pipewire/pipewire.go | 40 ++++++-------- internal/pipewire/pod.go | 46 ++++++++++++++++ internal/pipewire/securitycontext.go | 7 ++- 5 files changed, 156 insertions(+), 29 deletions(-) diff --git a/internal/pipewire/client.go b/internal/pipewire/client.go index 8e901d8..faa114b 100644 --- a/internal/pipewire/client.go +++ b/internal/pipewire/client.go @@ -46,6 +46,12 @@ type ClientInfo struct { Properties *SPADict `json:"props"` } +// Opcode satisfies [Message] with a constant value. +func (c *ClientInfo) Opcode() byte { return PW_CLIENT_EVENT_INFO } + +// FileCount satisfies [Message] with a constant value. +func (c *ClientInfo) FileCount() Int { return 0 } + // Size satisfies [KnownSize] with a value computed at runtime. func (c *ClientInfo) Size() Word { return SizePrefix + @@ -66,6 +72,12 @@ type ClientUpdateProperties struct { Properties *SPADict `json:"props"` } +// Opcode satisfies [Message] with a constant value. +func (c *ClientUpdateProperties) Opcode() byte { return PW_CLIENT_METHOD_UPDATE_PROPERTIES } + +// FileCount satisfies [Message] with a constant value. +func (c *ClientUpdateProperties) FileCount() Int { return 0 } + // Size satisfies [KnownSize] with a value computed at runtime. func (c *ClientUpdateProperties) Size() Word { return SizePrefix + c.Properties.Size() } @@ -80,7 +92,6 @@ func (c *ClientUpdateProperties) UnmarshalBinary(data []byte) error { return Unm func (ctx *Context) clientUpdateProperties(props SPADict) error { return ctx.writeMessage( PW_ID_CLIENT, - PW_CLIENT_METHOD_UPDATE_PROPERTIES, &ClientUpdateProperties{&props}, ) } diff --git a/internal/pipewire/core.go b/internal/pipewire/core.go index 6a8af0b..35340c8 100644 --- a/internal/pipewire/core.go +++ b/internal/pipewire/core.go @@ -133,6 +133,12 @@ type CoreInfo struct { Properties *SPADict `json:"props"` } +// Opcode satisfies [Message] with a constant value. +func (c *CoreInfo) Opcode() byte { return PW_CORE_EVENT_INFO } + +// FileCount satisfies [Message] with a constant value. +func (c *CoreInfo) FileCount() Int { return 0 } + // Size satisfies [KnownSize] with a value computed at runtime. func (c *CoreInfo) Size() Word { return SizePrefix + @@ -160,6 +166,12 @@ type CoreDone struct { Sequence Int `json:"seq"` } +// Opcode satisfies [Message] with a constant value. +func (c *CoreDone) Opcode() byte { return PW_CORE_EVENT_DONE } + +// FileCount satisfies [Message] with a constant value. +func (c *CoreDone) FileCount() Int { return 0 } + // Size satisfies [KnownSize] with a constant value. func (c *CoreDone) Size() Word { return SizePrefix + Size(SizeInt) + Size(SizeInt) } @@ -179,6 +191,12 @@ type CorePing struct { Sequence Int `json:"seq"` } +// Opcode satisfies [Message] with a constant value. +func (c *CorePing) Opcode() byte { return PW_CORE_EVENT_PING } + +// FileCount satisfies [Message] with a constant value. +func (c *CorePing) FileCount() Int { return 0 } + // Size satisfies [KnownSize] with a constant value. func (c *CorePing) Size() Word { return SizePrefix + Size(SizeInt) + Size(SizeInt) } @@ -209,6 +227,9 @@ type CoreError struct { Message String `json:"message"` } +// FileCount satisfies [Message] with a constant value. +func (c *CoreError) FileCount() Int { return 0 } + func (c *CoreError) Error() string { return "received Core::Error on" + " id " + strconv.Itoa(int(c.ID)) + @@ -232,6 +253,18 @@ func (c *CoreError) MarshalBinary() ([]byte, error) { return Marshal(c) } // UnmarshalBinary satisfies [encoding.BinaryUnmarshaler] via [Unmarshal]. func (c *CoreError) UnmarshalBinary(data []byte) error { return Unmarshal(data, c) } +// CoreErrorMethod is [CoreError] as a method [Message]. +type CoreErrorMethod struct{ CoreError } + +// Opcode satisfies [Message] with a constant value. +func (c *CoreErrorMethod) Opcode() byte { return PW_CORE_METHOD_ERROR } + +// CoreErrorEvent is [CoreError] as an event [Message]. +type CoreErrorEvent struct{ CoreError } + +// Opcode satisfies [Message] with a constant value. +func (c *CoreErrorEvent) Opcode() byte { return PW_CORE_EVENT_ERROR } + // The CoreBoundProps event is emitted when a local object ID is bound to a global ID. // It is emitted before the global becomes visible in the registry. type CoreBoundProps struct { @@ -243,6 +276,12 @@ type CoreBoundProps struct { Properties *SPADict `json:"props"` } +// Opcode satisfies [Message] with a constant value. +func (c *CoreBoundProps) Opcode() byte { return PW_CORE_EVENT_BOUND_PROPS } + +// FileCount satisfies [Message] with a constant value. +func (c *CoreBoundProps) FileCount() Int { return 0 } + // Size satisfies [KnownSize] with a value computed at runtime. func (c *CoreBoundProps) Size() Word { return SizePrefix + @@ -294,6 +333,12 @@ type CoreHello struct { Version Int `json:"version"` } +// Opcode satisfies [Message] with a constant value. +func (c *CoreHello) Opcode() byte { return PW_CORE_METHOD_HELLO } + +// FileCount satisfies [Message] with a constant value. +func (c *CoreHello) FileCount() Int { return 0 } + // Size satisfies [KnownSize] with a constant value. func (c *CoreHello) Size() Word { return SizePrefix + Size(SizeInt) } @@ -308,7 +353,6 @@ func (c *CoreHello) UnmarshalBinary(data []byte) error { return Unmarshal(data, func (ctx *Context) coreHello() error { return ctx.writeMessage( PW_ID_CORE, - PW_CORE_METHOD_HELLO, &CoreHello{PW_VERSION_CORE}, ) } @@ -328,6 +372,12 @@ type CoreSync struct { Sequence Int `json:"seq"` } +// Opcode satisfies [Message] with a constant value. +func (c *CoreSync) Opcode() byte { return PW_CORE_METHOD_SYNC } + +// FileCount satisfies [Message] with a constant value. +func (c *CoreSync) FileCount() Int { return 0 } + // Size satisfies [KnownSize] with a constant value. func (c *CoreSync) Size() Word { return SizePrefix + Size(SizeInt) + Size(SizeInt) } @@ -342,7 +392,6 @@ func (c *CoreSync) UnmarshalBinary(data []byte) error { return Unmarshal(data, c func (ctx *Context) coreSync(id Int) error { return ctx.writeMessage( PW_ID_CORE, - PW_CORE_METHOD_SYNC, &CoreSync{id, CoreSyncSequenceOffset + Int(ctx.sequence)}, ) } @@ -390,6 +439,12 @@ type CorePong struct { Sequence Int `json:"seq"` } +// Opcode satisfies [Message] with a constant value. +func (c *CorePong) Opcode() byte { return PW_CORE_METHOD_PONG } + +// FileCount satisfies [Message] with a constant value. +func (c *CorePong) FileCount() Int { return 0 } + // Size satisfies [KnownSize] with a constant value. func (c *CorePong) Size() Word { return SizePrefix + Size(SizeInt) + Size(SizeInt) } @@ -414,6 +469,12 @@ type CoreGetRegistry struct { NewID Int `json:"new_id"` } +// Opcode satisfies [Message] with a constant value. +func (c *CoreGetRegistry) Opcode() byte { return PW_CORE_METHOD_GET_REGISTRY } + +// FileCount satisfies [Message] with a constant value. +func (c *CoreGetRegistry) FileCount() Int { return 0 } + // Size satisfies [KnownSize] with a constant value. func (c *CoreGetRegistry) Size() Word { return SizePrefix + Size(SizeInt) + Size(SizeInt) } @@ -431,7 +492,6 @@ func (ctx *Context) GetRegistry() (*Registry, error) { registry.ID = newId return ®istry, ctx.writeMessage( PW_ID_CORE, - PW_CORE_METHOD_GET_REGISTRY, &CoreGetRegistry{PW_VERSION_REGISTRY, newId}, ) } @@ -450,6 +510,12 @@ type RegistryGlobal struct { Properties *SPADict `json:"props"` } +// Opcode satisfies [Message] with a constant value. +func (c *RegistryGlobal) Opcode() byte { return PW_REGISTRY_EVENT_GLOBAL } + +// FileCount satisfies [Message] with a constant value. +func (c *RegistryGlobal) FileCount() Int { return 0 } + // Size satisfies [KnownSize] with a value computed at runtime. func (c *RegistryGlobal) Size() Word { return SizePrefix + @@ -481,6 +547,12 @@ type RegistryBind struct { NewID Int `json:"new_id"` } +// Opcode satisfies [Message] with a constant value. +func (c *RegistryBind) Opcode() byte { return PW_REGISTRY_METHOD_BIND } + +// FileCount satisfies [Message] with a constant value. +func (c *RegistryBind) FileCount() Int { return 0 } + // Size satisfies [KnownSize] with a value computed at runtime. func (c *RegistryBind) Size() Word { return SizePrefix + @@ -507,7 +579,6 @@ func (registry *Registry) bind(proxy eventProxy, id, version Int) (Int, error) { } return bind.NewID, registry.ctx.writeMessage( registry.ID, - PW_REGISTRY_METHOD_BIND, &bind, ) } diff --git a/internal/pipewire/pipewire.go b/internal/pipewire/pipewire.go index 65178bf..d17edf9 100644 --- a/internal/pipewire/pipewire.go +++ b/internal/pipewire/pipewire.go @@ -191,36 +191,30 @@ func (ctx *Context) queueFiles(fds ...int) (offset Fd) { return } +// InconsistentFilesError describes an implementation error where an incorrect amount +// of files is queued between two messages. +type InconsistentFilesError [2]Int + +func (e *InconsistentFilesError) Error() string { + return "queued " + strconv.Itoa(int(e[0])) + " files instead of the expected " + strconv.Itoa(int(e[1])) +} + // writeMessage appends the POD representation of v and an optional footer to buf. -func (ctx *Context) writeMessage( - Id Int, opcode byte, - v KnownSize, -) (err error) { +func (ctx *Context) writeMessage(Id Int, v Message) (err error) { + if fileCount := Int(len(ctx.pendingFiles) - ctx.headerFiles); fileCount != v.FileCount() { + return &InconsistentFilesError{fileCount, v.FileCount()} + } + if ctx.pendingFooter == nil && ctx.deferredPendingFooter != nil { ctx.pendingFooter, ctx.deferredPendingFooter = ctx.deferredPendingFooter, nil } - size := v.Size() - if ctx.pendingFooter != nil { - size += ctx.pendingFooter.Size() - } - if size&^SizeMax != 0 { - return ErrSizeRange - } - - ctx.buf = slices.Grow(ctx.buf, int(SizeHeader+size)) - ctx.buf = (&Header{ - ID: Id, Opcode: opcode, Size: size, - Sequence: ctx.sequence, - FileCount: Int(len(ctx.pendingFiles) - ctx.headerFiles), - }).append(ctx.buf) - ctx.headerFiles = len(ctx.pendingFiles) - ctx.buf, err = MarshalAppend(ctx.buf, v) - if err == nil && ctx.pendingFooter != nil { - ctx.buf, err = MarshalAppend(ctx.buf, ctx.pendingFooter) + ctx.buf, err = MessageEncoder{v}.AppendMessage(ctx.buf, Id, ctx.sequence, ctx.pendingFooter) + if err == nil { + ctx.headerFiles = len(ctx.pendingFiles) ctx.pendingFooter = nil + ctx.sequence++ } - ctx.sequence++ return } diff --git a/internal/pipewire/pod.go b/internal/pipewire/pod.go index 523322f..15ff25c 100644 --- a/internal/pipewire/pod.go +++ b/internal/pipewire/pod.go @@ -5,6 +5,7 @@ import ( "io" "math" "reflect" + "slices" "strconv" ) @@ -573,3 +574,48 @@ func (d *SPADict) UnmarshalPOD(data []byte) (Word, error) { } return wireSize, nil } + +// A Message is a value that can be transmitted as a message over PipeWire protocol native. +type Message interface { + // Opcode returns the opcode of this message. + Opcode() byte + // FileCount returns the number of files associated with this message. + FileCount() Int + + KnownSize +} + +// A MessageEncoder provides methods for encoding a [Message]. +type MessageEncoder struct{ Message } + +// SizeMessage returns the size of Message transmitted over protocol native. +func (m MessageEncoder) SizeMessage(footer KnownSize) (size Word) { + size = SizeHeader + m.Message.Size() + if footer != nil { + size += footer.Size() + } + return +} + +// AppendMessage appends the protocol native encoding of Message to dst and returns the appended slice. +func (m MessageEncoder) AppendMessage(dst []byte, Id, sequence Int, footer KnownSize) (data []byte, err error) { + size := m.SizeMessage(footer) + if size&^SizeMax != 0 { + return dst, ErrSizeRange + } + + data = slices.Grow(dst, int(size)) + data = (&Header{ + ID: Id, + Opcode: m.Message.Opcode(), + Size: size - SizeHeader, + Sequence: sequence, + FileCount: m.Message.FileCount(), + }).append(data) + data, err = MarshalAppend(data, m.Message) + if err == nil && footer != nil { + data, err = MarshalAppend(data, footer) + } + + return +} diff --git a/internal/pipewire/securitycontext.go b/internal/pipewire/securitycontext.go index f07d5dc..107fe99 100644 --- a/internal/pipewire/securitycontext.go +++ b/internal/pipewire/securitycontext.go @@ -61,6 +61,12 @@ type SecurityContextCreate struct { Properties *SPADict `json:"props"` } +// Opcode satisfies [Message] with a constant value. +func (c *SecurityContextCreate) Opcode() byte { return PW_SECURITY_CONTEXT_METHOD_CREATE } + +// FileCount satisfies [Message] with a constant value. +func (c *SecurityContextCreate) FileCount() Int { return 2 } + // Size satisfies [KnownSize] with a value computed at runtime. func (c *SecurityContextCreate) Size() Word { return SizePrefix + @@ -106,7 +112,6 @@ func (securityContext *SecurityContext) Create(listenFd, closeFd int, props SPAD offset := securityContext.ctx.queueFiles(closeFd, listenFd) return securityContext.ctx.writeMessage( securityContext.ID, - PW_SECURITY_CONTEXT_METHOD_CREATE, &SecurityContextCreate{ListenFd: offset + 1, CloseFd: offset + 0, Properties: &props}, ) }