From b5999b88140eb437e87580848513dc1e74bc190b Mon Sep 17 00:00:00 2001 From: Ophestra Date: Sat, 13 Dec 2025 21:49:28 +0900 Subject: [PATCH] internal/pipewire: implement Core::RemoveId This is emitted by the server when a proxy id is removed for any reason. Currently, the only path for this to be emitted is when a global object is destroyed while some proxy is still bound to it. Signed-off-by: Ophestra --- internal/pipewire/client.go | 2 + internal/pipewire/core.go | 112 ++++++++++++++++++++++++--- internal/pipewire/core_test.go | 20 ++++- internal/pipewire/pipewire.go | 3 + internal/pipewire/securitycontext.go | 16 +++- 5 files changed, 140 insertions(+), 13 deletions(-) diff --git a/internal/pipewire/client.go b/internal/pipewire/client.go index 662a24a..96beeba 100644 --- a/internal/pipewire/client.go +++ b/internal/pipewire/client.go @@ -103,6 +103,8 @@ type Client struct { // Populated by [CoreBoundProps] events targeting [Client]. Properties SPADict `json:"props"` + + noRemove } func (client *Client) consume(opcode byte, files []int, unmarshal func(v any)) error { diff --git a/internal/pipewire/core.go b/internal/pipewire/core.go index f6199c4..3d103d6 100644 --- a/internal/pipewire/core.go +++ b/internal/pipewire/core.go @@ -40,13 +40,14 @@ const ( PW_CORE_EVENT_ADD_MEM PW_CORE_EVENT_REMOVE_MEM PW_CORE_EVENT_BOUND_PROPS - PW_CORE_EVENT_NUM + PW_CORE_EVENT_NUM PW_VERSION_CORE_EVENTS = 1 ) const ( PW_CORE_METHOD_ADD_LISTENER = iota + PW_CORE_METHOD_HELLO PW_CORE_METHOD_SYNC PW_CORE_METHOD_PONG @@ -54,25 +55,26 @@ const ( PW_CORE_METHOD_GET_REGISTRY PW_CORE_METHOD_CREATE_OBJECT PW_CORE_METHOD_DESTROY - PW_CORE_METHOD_NUM + PW_CORE_METHOD_NUM PW_VERSION_CORE_METHODS = 0 ) const ( PW_REGISTRY_EVENT_GLOBAL = iota PW_REGISTRY_EVENT_GLOBAL_REMOVE - PW_REGISTRY_EVENT_NUM + PW_REGISTRY_EVENT_NUM PW_VERSION_REGISTRY_EVENTS = 0 ) const ( PW_REGISTRY_METHOD_ADD_LISTENER = iota + PW_REGISTRY_METHOD_BIND PW_REGISTRY_METHOD_DESTROY - PW_REGISTRY_METHOD_NUM + PW_REGISTRY_METHOD_NUM PW_VERSION_REGISTRY_METHODS = 0 ) @@ -266,6 +268,31 @@ type CoreErrorEvent struct{ CoreError } // Opcode satisfies [Message] with a constant value. func (c *CoreErrorEvent) Opcode() byte { return PW_CORE_EVENT_ERROR } +// The CoreRemoveId event is used internally by the object ID management logic. +// +// When a client deletes an object, the server will send this event to acknowledge +// that it has seen the delete request. When the client receives this event, it +// will know that it can safely reuse the object ID. +type CoreRemoveId struct { + // A proxy id that was removed. + ID Int `json:"id"` +} + +// Opcode satisfies [Message] with a constant value. +func (c *CoreRemoveId) Opcode() byte { return PW_CORE_EVENT_REMOVE_ID } + +// FileCount satisfies [Message] with a constant value. +func (c *CoreRemoveId) FileCount() Int { return 0 } + +// Size satisfies [KnownSize] with a constant value. +func (c *CoreRemoveId) Size() Word { return SizePrefix + Size(SizeInt) } + +// MarshalBinary satisfies [encoding.BinaryMarshaler] via [Marshal]. +func (c *CoreRemoveId) MarshalBinary() ([]byte, error) { return Marshal(c) } + +// UnmarshalBinary satisfies [encoding.BinaryUnmarshaler] via [Unmarshal]. +func (c *CoreRemoveId) UnmarshalBinary(data []byte) error { return Unmarshal(data, c) } + // The CoreBoundProps event is emitted when a local object ID is bound to a global ID. // It is emitted before the global becomes visible in the registry. type CoreBoundProps struct { @@ -299,7 +326,7 @@ func (c *CoreBoundProps) UnmarshalBinary(data []byte) error { return Unmarshal(d // ErrBadBoundProps is returned when a [CoreBoundProps] event targeting a proxy // that should never be targeted is received and processed. -var ErrBadBoundProps = errors.New("attempted to store bound props on proxy that should never be targeted") +var ErrBadBoundProps = errors.New("attempting to store bound props on a proxy that should never be targeted") // noAck is embedded by proxies that are never targeted by [CoreBoundProps]. type noAck struct{} @@ -307,6 +334,54 @@ type noAck struct{} // setBoundProps should never be called as this proxy should never be targeted by [CoreBoundProps]. func (noAck) setBoundProps(*CoreBoundProps) error { return ErrBadBoundProps } +// ErrBadRemove is returned when a [CoreRemoveId] event targeting a proxy +// that should never be targeted is received and processed. +var ErrBadRemove = errors.New("attempting to remove a proxy that should never be targeted") + +// noRemove is embedded by proxies that are never targeted by [CoreRemoveId]. +type noRemove struct{} + +// remove should never be called as this proxy should never be targeted by [CoreRemoveId]. +func (noRemove) remove() error { panic(ErrBadRemove) } + +// ErrInvalidRemove is returned when a proxy is somehow removed twice. This is only reached for +// an implementation error as the proxy struct should no longer be reachable after the first call. +var ErrInvalidRemove = errors.New("attempting to remove an already freed proxy") + +// removable is embedded by proxies that can be targeted by [CoreRemoveId] and requires no cleanup. +type removable bool + +// remove checks against removal of a freed proxy and marks the proxy as removed. +func (s *removable) remove() error { + if *s { + panic(ErrInvalidRemove) + } + *s = true + return nil +} + +// ErrProxyDestroyed is returned when attempting to use a proxy method when the underlying +// proxy has already been targeted by a [CoreRemoveId] event. +var ErrProxyDestroyed = errors.New("underlying proxy has been removed") + +// checkDestroy returns [ErrProxyDestroyed] if the current proxy has been destroyed. +// Must be called at the beginning of any exported method of a proxy embedding removable. +func (s *removable) checkDestroy() error { + if *s { + // not fatal: the caller is allowed to recover from this and allocate a new proxy + return ErrProxyDestroyed + } + return nil +} + +// mustCheckDestroy calls checkDestroy and panics if a non-nil error is returned. +// This is useful for non-exported methods as they should become unreachable. +func (s *removable) mustCheckDestroy() { + if err := s.checkDestroy(); err != nil { + panic(err) + } +} + // An InconsistentIdError describes an inconsistent state where the server claims an impossible // proxy or global id. This is only generated by the [CoreBoundProps] event. type InconsistentIdError struct { @@ -682,22 +757,24 @@ type Core struct { done bool ctx *Context + noAck + noRemove } // ErrUnexpectedDone is a [CoreDone] event with unexpected values. var ErrUnexpectedDone = errors.New("multiple Core::Done events targeting Core::Sync") -// An UnknownBoundIdError describes the server claiming to have bound a proxy id that was never allocated. -type UnknownBoundIdError[E any] struct { +// An UnknownProxyIdError describes an event targeting a proxy id that was never allocated. +type UnknownProxyIdError[E any] struct { // Offending id decoded from Data. Id Int // Event received from the server. Event E } -func (e *UnknownBoundIdError[E]) Error() string { - return "unknown bound proxy id " + strconv.Itoa(int(e.Id)) +func (e *UnknownProxyIdError[E]) Error() string { + return "unknown proxy id " + strconv.Itoa(int(e.Id)) } // An InvalidPingError is a [CorePing] event targeting a proxy id that was never allocated. @@ -752,6 +829,17 @@ func (core *Core) consume(opcode byte, files []int, unmarshal func(v any)) error unmarshal(&coreError) return &coreError + case PW_CORE_EVENT_REMOVE_ID: + var coreRemoveId CoreRemoveId + unmarshal(&coreRemoveId) + if proxy, ok := core.ctx.proxy[coreRemoveId.ID]; !ok { + // this should never happen so is non-recoverable if it does + panic(&UnknownProxyIdError[*CoreRemoveId]{Id: coreRemoveId.ID, Event: &coreRemoveId}) + } else { + delete(core.ctx.proxy, coreRemoveId.ID) + return proxy.remove() + } + case PW_CORE_EVENT_BOUND_PROPS: var boundProps CoreBoundProps unmarshal(&boundProps) @@ -759,7 +847,7 @@ func (core *Core) consume(opcode byte, files []int, unmarshal func(v any)) error delete(core.ctx.pendingIds, boundProps.ID) proxy, ok := core.ctx.proxy[boundProps.ID] if !ok { - return &UnknownBoundIdError[*CoreBoundProps]{Id: boundProps.ID, Event: &boundProps} + return &UnknownProxyIdError[*CoreBoundProps]{Id: boundProps.ID, Event: &boundProps} } return proxy.setBoundProps(&boundProps) @@ -782,7 +870,9 @@ type Registry struct { Objects map[Int]RegistryGlobal `json:"objects"` ctx *Context + noAck + noRemove } // A GlobalIDCollisionError describes a [RegistryGlobal] event stepping on a previous instance of itself. @@ -822,6 +912,8 @@ func (registry *Registry) consume(opcode byte, files []int, unmarshal func(v any case PW_REGISTRY_EVENT_GLOBAL_REMOVE: var globalRemove RegistryGlobalRemove unmarshal(&globalRemove) + // server emits PW_CORE_EVENT_REMOVE_ID events targeting + // affected proxies so they do not need to be handled here l := len(registry.Objects) delete(registry.Objects, globalRemove.ID) if len(registry.Objects) != l-1 { diff --git a/internal/pipewire/core_test.go b/internal/pipewire/core_test.go index f9e5f09..6d73b5c 100644 --- a/internal/pipewire/core_test.go +++ b/internal/pipewire/core_test.go @@ -171,7 +171,7 @@ func TestCoreError(t *testing.T) { /* padding */ 0, 0, 0, 0, /* size: 0x1b bytes */ 0x1b, 0, 0, 0, - /*type: String*/ 8, 0, 0, 0, + /*type: String */ 8, 0, 0, 0, // value: "no permission to destroy 0\x00" 0x6e, 0x6f, 0x20, 0x70, @@ -192,6 +192,24 @@ func TestCoreError(t *testing.T) { }.run(t) } +func TestCoreRemoveId(t *testing.T) { + t.Parallel() + + encodingTestCases[pipewire.CoreRemoveId, *pipewire.CoreRemoveId]{ + {"sample", []byte{ + /* size: rest of data */ 0x10, 0, 0, 0, + /* type: Struct */ 0xe, 0, 0, 0, + + /* size: 4 bytes */ 4, 0, 0, 0, + /* type: Int */ 4, 0, 0, 0, + /* value: 3 */ 3, 0, 0, 0, + /* padding */ 0, 0, 0, 0, + }, pipewire.CoreRemoveId{ + ID: 3, + }, nil}, + }.run(t) +} + func TestCoreBoundProps(t *testing.T) { t.Parallel() diff --git a/internal/pipewire/pipewire.go b/internal/pipewire/pipewire.go index 636c759..92a5dbc 100644 --- a/internal/pipewire/pipewire.go +++ b/internal/pipewire/pipewire.go @@ -420,6 +420,9 @@ type eventProxy interface { consume(opcode byte, files []int, unmarshal func(v any)) error // setBoundProps stores a [CoreBoundProps] event received from the server. setBoundProps(event *CoreBoundProps) error + // remove is called when the proxy is removed for any reason, usually from + // being targeted by a [PW_CORE_EVENT_REMOVE_ID] event. + remove() error // Stringer returns the PipeWire interface name. fmt.Stringer diff --git a/internal/pipewire/securitycontext.go b/internal/pipewire/securitycontext.go index dbc3c52..ec28d75 100644 --- a/internal/pipewire/securitycontext.go +++ b/internal/pipewire/securitycontext.go @@ -26,9 +26,10 @@ const ( const ( PW_SECURITY_CONTEXT_METHOD_ADD_LISTENER = iota - PW_SECURITY_CONTEXT_METHOD_CREATE - PW_SECURITY_CONTEXT_METHOD_NUM + PW_SECURITY_CONTEXT_METHOD_CREATE + + PW_SECURITY_CONTEXT_METHOD_NUM PW_VERSION_SECURITY_CONTEXT_METHODS = 0 ) @@ -90,6 +91,8 @@ type SecurityContext struct { GlobalID Int `json:"id"` ctx *Context + + removable } // GetSecurityContext queues a [RegistryBind] message for the PipeWire server @@ -109,6 +112,10 @@ func (registry *Registry) GetSecurityContext() (securityContext *SecurityContext // Create queues a [SecurityContextCreate] message for the PipeWire server. func (securityContext *SecurityContext) Create(listenFd, closeFd int, props SPADict) error { + if err := securityContext.checkDestroy(); err != nil { + return err + } + // queued in reverse based on upstream behaviour, unsure why offset := securityContext.ctx.queueFiles(closeFd, listenFd) return securityContext.ctx.writeMessage( @@ -144,6 +151,9 @@ func (scc *securityContextCloser) Close() (err error) { // BindAndCreate binds a new socket to the specified pathname and pass it to Create. // It returns an [io.Closer] corresponding to [SecurityContextCreate.CloseFd]. func (securityContext *SecurityContext) BindAndCreate(pathname string, props SPADict) (io.Closer, error) { + if err := securityContext.checkDestroy(); err != nil { + return nil, err + } var scc securityContextCloser // ensure pathname is available @@ -185,6 +195,7 @@ func (securityContext *SecurityContext) BindAndCreate(pathname string, props SPA } func (securityContext *SecurityContext) consume(opcode byte, files []int, _ func(v any)) error { + securityContext.mustCheckDestroy() closeReceivedFiles(files...) switch opcode { // SecurityContext does not receive any events @@ -196,6 +207,7 @@ func (securityContext *SecurityContext) consume(opcode byte, files []int, _ func } func (securityContext *SecurityContext) setBoundProps(event *CoreBoundProps) error { + securityContext.mustCheckDestroy() if securityContext.ID != event.ID { return &InconsistentIdError{Proxy: securityContext, ID: securityContext.ID, ServerID: event.ID} }