internal/pipewire: implement Core::RemoveId
All checks were successful
Test / Create distribution (push) Successful in 1m15s
Test / Sandbox (push) Successful in 3m15s
Test / Hakurei (push) Successful in 4m19s
Test / Hakurei (race detector) (push) Successful in 3m24s
Test / Sandbox (race detector) (push) Successful in 2m34s
Test / Hpkg (push) Successful in 3m34s
Test / Flake checks (push) Successful in 1m50s

This is emitted by the server when a proxy id is removed for any reason. Currently, the only path for this to be emitted is when a global object is destroyed while some proxy is still bound to it.

Signed-off-by: Ophestra <cat@gensokyo.uk>
This commit is contained in:
2025-12-13 21:49:28 +09:00
parent ebc67bb8ad
commit b5999b8814
5 changed files with 140 additions and 13 deletions

View File

@@ -103,6 +103,8 @@ type Client struct {
// Populated by [CoreBoundProps] events targeting [Client].
Properties SPADict `json:"props"`
noRemove
}
func (client *Client) consume(opcode byte, files []int, unmarshal func(v any)) error {

View File

@@ -40,13 +40,14 @@ const (
PW_CORE_EVENT_ADD_MEM
PW_CORE_EVENT_REMOVE_MEM
PW_CORE_EVENT_BOUND_PROPS
PW_CORE_EVENT_NUM
PW_CORE_EVENT_NUM
PW_VERSION_CORE_EVENTS = 1
)
const (
PW_CORE_METHOD_ADD_LISTENER = iota
PW_CORE_METHOD_HELLO
PW_CORE_METHOD_SYNC
PW_CORE_METHOD_PONG
@@ -54,25 +55,26 @@ const (
PW_CORE_METHOD_GET_REGISTRY
PW_CORE_METHOD_CREATE_OBJECT
PW_CORE_METHOD_DESTROY
PW_CORE_METHOD_NUM
PW_CORE_METHOD_NUM
PW_VERSION_CORE_METHODS = 0
)
const (
PW_REGISTRY_EVENT_GLOBAL = iota
PW_REGISTRY_EVENT_GLOBAL_REMOVE
PW_REGISTRY_EVENT_NUM
PW_REGISTRY_EVENT_NUM
PW_VERSION_REGISTRY_EVENTS = 0
)
const (
PW_REGISTRY_METHOD_ADD_LISTENER = iota
PW_REGISTRY_METHOD_BIND
PW_REGISTRY_METHOD_DESTROY
PW_REGISTRY_METHOD_NUM
PW_REGISTRY_METHOD_NUM
PW_VERSION_REGISTRY_METHODS = 0
)
@@ -266,6 +268,31 @@ type CoreErrorEvent struct{ CoreError }
// Opcode satisfies [Message] with a constant value.
func (c *CoreErrorEvent) Opcode() byte { return PW_CORE_EVENT_ERROR }
// The CoreRemoveId event is used internally by the object ID management logic.
//
// When a client deletes an object, the server will send this event to acknowledge
// that it has seen the delete request. When the client receives this event, it
// will know that it can safely reuse the object ID.
type CoreRemoveId struct {
// A proxy id that was removed.
ID Int `json:"id"`
}
// Opcode satisfies [Message] with a constant value.
func (c *CoreRemoveId) Opcode() byte { return PW_CORE_EVENT_REMOVE_ID }
// FileCount satisfies [Message] with a constant value.
func (c *CoreRemoveId) FileCount() Int { return 0 }
// Size satisfies [KnownSize] with a constant value.
func (c *CoreRemoveId) Size() Word { return SizePrefix + Size(SizeInt) }
// MarshalBinary satisfies [encoding.BinaryMarshaler] via [Marshal].
func (c *CoreRemoveId) MarshalBinary() ([]byte, error) { return Marshal(c) }
// UnmarshalBinary satisfies [encoding.BinaryUnmarshaler] via [Unmarshal].
func (c *CoreRemoveId) UnmarshalBinary(data []byte) error { return Unmarshal(data, c) }
// The CoreBoundProps event is emitted when a local object ID is bound to a global ID.
// It is emitted before the global becomes visible in the registry.
type CoreBoundProps struct {
@@ -299,7 +326,7 @@ func (c *CoreBoundProps) UnmarshalBinary(data []byte) error { return Unmarshal(d
// ErrBadBoundProps is returned when a [CoreBoundProps] event targeting a proxy
// that should never be targeted is received and processed.
var ErrBadBoundProps = errors.New("attempted to store bound props on proxy that should never be targeted")
var ErrBadBoundProps = errors.New("attempting to store bound props on a proxy that should never be targeted")
// noAck is embedded by proxies that are never targeted by [CoreBoundProps].
type noAck struct{}
@@ -307,6 +334,54 @@ type noAck struct{}
// setBoundProps should never be called as this proxy should never be targeted by [CoreBoundProps].
func (noAck) setBoundProps(*CoreBoundProps) error { return ErrBadBoundProps }
// ErrBadRemove is returned when a [CoreRemoveId] event targeting a proxy
// that should never be targeted is received and processed.
var ErrBadRemove = errors.New("attempting to remove a proxy that should never be targeted")
// noRemove is embedded by proxies that are never targeted by [CoreRemoveId].
type noRemove struct{}
// remove should never be called as this proxy should never be targeted by [CoreRemoveId].
func (noRemove) remove() error { panic(ErrBadRemove) }
// ErrInvalidRemove is returned when a proxy is somehow removed twice. This is only reached for
// an implementation error as the proxy struct should no longer be reachable after the first call.
var ErrInvalidRemove = errors.New("attempting to remove an already freed proxy")
// removable is embedded by proxies that can be targeted by [CoreRemoveId] and requires no cleanup.
type removable bool
// remove checks against removal of a freed proxy and marks the proxy as removed.
func (s *removable) remove() error {
if *s {
panic(ErrInvalidRemove)
}
*s = true
return nil
}
// ErrProxyDestroyed is returned when attempting to use a proxy method when the underlying
// proxy has already been targeted by a [CoreRemoveId] event.
var ErrProxyDestroyed = errors.New("underlying proxy has been removed")
// checkDestroy returns [ErrProxyDestroyed] if the current proxy has been destroyed.
// Must be called at the beginning of any exported method of a proxy embedding removable.
func (s *removable) checkDestroy() error {
if *s {
// not fatal: the caller is allowed to recover from this and allocate a new proxy
return ErrProxyDestroyed
}
return nil
}
// mustCheckDestroy calls checkDestroy and panics if a non-nil error is returned.
// This is useful for non-exported methods as they should become unreachable.
func (s *removable) mustCheckDestroy() {
if err := s.checkDestroy(); err != nil {
panic(err)
}
}
// An InconsistentIdError describes an inconsistent state where the server claims an impossible
// proxy or global id. This is only generated by the [CoreBoundProps] event.
type InconsistentIdError struct {
@@ -682,22 +757,24 @@ type Core struct {
done bool
ctx *Context
noAck
noRemove
}
// ErrUnexpectedDone is a [CoreDone] event with unexpected values.
var ErrUnexpectedDone = errors.New("multiple Core::Done events targeting Core::Sync")
// An UnknownBoundIdError describes the server claiming to have bound a proxy id that was never allocated.
type UnknownBoundIdError[E any] struct {
// An UnknownProxyIdError describes an event targeting a proxy id that was never allocated.
type UnknownProxyIdError[E any] struct {
// Offending id decoded from Data.
Id Int
// Event received from the server.
Event E
}
func (e *UnknownBoundIdError[E]) Error() string {
return "unknown bound proxy id " + strconv.Itoa(int(e.Id))
func (e *UnknownProxyIdError[E]) Error() string {
return "unknown proxy id " + strconv.Itoa(int(e.Id))
}
// An InvalidPingError is a [CorePing] event targeting a proxy id that was never allocated.
@@ -752,6 +829,17 @@ func (core *Core) consume(opcode byte, files []int, unmarshal func(v any)) error
unmarshal(&coreError)
return &coreError
case PW_CORE_EVENT_REMOVE_ID:
var coreRemoveId CoreRemoveId
unmarshal(&coreRemoveId)
if proxy, ok := core.ctx.proxy[coreRemoveId.ID]; !ok {
// this should never happen so is non-recoverable if it does
panic(&UnknownProxyIdError[*CoreRemoveId]{Id: coreRemoveId.ID, Event: &coreRemoveId})
} else {
delete(core.ctx.proxy, coreRemoveId.ID)
return proxy.remove()
}
case PW_CORE_EVENT_BOUND_PROPS:
var boundProps CoreBoundProps
unmarshal(&boundProps)
@@ -759,7 +847,7 @@ func (core *Core) consume(opcode byte, files []int, unmarshal func(v any)) error
delete(core.ctx.pendingIds, boundProps.ID)
proxy, ok := core.ctx.proxy[boundProps.ID]
if !ok {
return &UnknownBoundIdError[*CoreBoundProps]{Id: boundProps.ID, Event: &boundProps}
return &UnknownProxyIdError[*CoreBoundProps]{Id: boundProps.ID, Event: &boundProps}
}
return proxy.setBoundProps(&boundProps)
@@ -782,7 +870,9 @@ type Registry struct {
Objects map[Int]RegistryGlobal `json:"objects"`
ctx *Context
noAck
noRemove
}
// A GlobalIDCollisionError describes a [RegistryGlobal] event stepping on a previous instance of itself.
@@ -822,6 +912,8 @@ func (registry *Registry) consume(opcode byte, files []int, unmarshal func(v any
case PW_REGISTRY_EVENT_GLOBAL_REMOVE:
var globalRemove RegistryGlobalRemove
unmarshal(&globalRemove)
// server emits PW_CORE_EVENT_REMOVE_ID events targeting
// affected proxies so they do not need to be handled here
l := len(registry.Objects)
delete(registry.Objects, globalRemove.ID)
if len(registry.Objects) != l-1 {

View File

@@ -171,7 +171,7 @@ func TestCoreError(t *testing.T) {
/* padding */ 0, 0, 0, 0,
/* size: 0x1b bytes */ 0x1b, 0, 0, 0,
/*type: String*/ 8, 0, 0, 0,
/*type: String */ 8, 0, 0, 0,
// value: "no permission to destroy 0\x00"
0x6e, 0x6f, 0x20, 0x70,
@@ -192,6 +192,24 @@ func TestCoreError(t *testing.T) {
}.run(t)
}
func TestCoreRemoveId(t *testing.T) {
t.Parallel()
encodingTestCases[pipewire.CoreRemoveId, *pipewire.CoreRemoveId]{
{"sample", []byte{
/* size: rest of data */ 0x10, 0, 0, 0,
/* type: Struct */ 0xe, 0, 0, 0,
/* size: 4 bytes */ 4, 0, 0, 0,
/* type: Int */ 4, 0, 0, 0,
/* value: 3 */ 3, 0, 0, 0,
/* padding */ 0, 0, 0, 0,
}, pipewire.CoreRemoveId{
ID: 3,
}, nil},
}.run(t)
}
func TestCoreBoundProps(t *testing.T) {
t.Parallel()

View File

@@ -420,6 +420,9 @@ type eventProxy interface {
consume(opcode byte, files []int, unmarshal func(v any)) error
// setBoundProps stores a [CoreBoundProps] event received from the server.
setBoundProps(event *CoreBoundProps) error
// remove is called when the proxy is removed for any reason, usually from
// being targeted by a [PW_CORE_EVENT_REMOVE_ID] event.
remove() error
// Stringer returns the PipeWire interface name.
fmt.Stringer

View File

@@ -26,9 +26,10 @@ const (
const (
PW_SECURITY_CONTEXT_METHOD_ADD_LISTENER = iota
PW_SECURITY_CONTEXT_METHOD_CREATE
PW_SECURITY_CONTEXT_METHOD_NUM
PW_SECURITY_CONTEXT_METHOD_CREATE
PW_SECURITY_CONTEXT_METHOD_NUM
PW_VERSION_SECURITY_CONTEXT_METHODS = 0
)
@@ -90,6 +91,8 @@ type SecurityContext struct {
GlobalID Int `json:"id"`
ctx *Context
removable
}
// GetSecurityContext queues a [RegistryBind] message for the PipeWire server
@@ -109,6 +112,10 @@ func (registry *Registry) GetSecurityContext() (securityContext *SecurityContext
// Create queues a [SecurityContextCreate] message for the PipeWire server.
func (securityContext *SecurityContext) Create(listenFd, closeFd int, props SPADict) error {
if err := securityContext.checkDestroy(); err != nil {
return err
}
// queued in reverse based on upstream behaviour, unsure why
offset := securityContext.ctx.queueFiles(closeFd, listenFd)
return securityContext.ctx.writeMessage(
@@ -144,6 +151,9 @@ func (scc *securityContextCloser) Close() (err error) {
// BindAndCreate binds a new socket to the specified pathname and pass it to Create.
// It returns an [io.Closer] corresponding to [SecurityContextCreate.CloseFd].
func (securityContext *SecurityContext) BindAndCreate(pathname string, props SPADict) (io.Closer, error) {
if err := securityContext.checkDestroy(); err != nil {
return nil, err
}
var scc securityContextCloser
// ensure pathname is available
@@ -185,6 +195,7 @@ func (securityContext *SecurityContext) BindAndCreate(pathname string, props SPA
}
func (securityContext *SecurityContext) consume(opcode byte, files []int, _ func(v any)) error {
securityContext.mustCheckDestroy()
closeReceivedFiles(files...)
switch opcode {
// SecurityContext does not receive any events
@@ -196,6 +207,7 @@ func (securityContext *SecurityContext) consume(opcode byte, files []int, _ func
}
func (securityContext *SecurityContext) setBoundProps(event *CoreBoundProps) error {
securityContext.mustCheckDestroy()
if securityContext.ID != event.ID {
return &InconsistentIdError{Proxy: securityContext, ID: securityContext.ID, ServerID: event.ID}
}