From b0f2ab6fffd48580d758de80165f8d97438b4066 Mon Sep 17 00:00:00 2001 From: Ophestra Date: Sun, 14 Dec 2025 09:20:58 +0900 Subject: [PATCH] internal/pipewire: implement Core::Destroy This change also implements pending destructible check on Sync. Destruction method should always be implemented as a wrapper of destructible.destroy. Signed-off-by: Ophestra --- internal/pipewire/core.go | 72 ++++++++++++++++++++++++++++++ internal/pipewire/core_test.go | 18 ++++++++ internal/pipewire/pipewire.go | 13 +++++- internal/pipewire/pipewire_test.go | 1 + 4 files changed, 103 insertions(+), 1 deletion(-) diff --git a/internal/pipewire/core.go b/internal/pipewire/core.go index 6adcac1..517ef2c 100644 --- a/internal/pipewire/core.go +++ b/internal/pipewire/core.go @@ -382,6 +382,41 @@ func (s *removable) mustCheckDestroy() { } } +// destructible is embedded by proxies that can be targeted by the [CoreRemoveId] event and the +// [CoreDestroy] method and requires no cleanup. destructible purposefully does not override +// removable.mustCheckDestroy because it is used by unexported methods called during event handling +// and are exempt from the destruction check. +type destructible struct { + destroyed bool + + removable +} + +// checkDestroy overrides removable.checkDestroy to also check the destroyed field. +func (s *destructible) checkDestroy() error { + if s.destroyed { + return ErrProxyDestroyed + } + if err := s.removable.checkDestroy(); err != nil { + return err + } + return nil +} + +// destroy calls removable.checkDestroy then queues a [CoreDestroy] event if it succeeds. +func (s *destructible) destroy(ctx *Context, id Int) error { + if err := s.checkDestroy(); err != nil { + return err + } + l := len(ctx.pendingDestruction) + ctx.pendingDestruction[id] = struct{}{} + if len(ctx.pendingDestruction) != l+1 { + return ErrProxyDestroyed + } + s.destroyed = true + return ctx.GetCore().destroy(id) +} + // An InconsistentIdError describes an inconsistent state where the server claims an impossible // proxy or global id. This is only generated by the [CoreBoundProps] event. type InconsistentIdError struct { @@ -504,6 +539,10 @@ func (core *Core) Sync() error { core.ctx.closeReceivedFiles() return &ProxyFatalError{Err: UnacknowledgedProxyError(slices.Collect(maps.Keys(core.ctx.pendingIds))), ProxyErrs: core.ctx.cloneAsProxyErrors()} } + if len(core.ctx.pendingDestruction) != 0 { + core.ctx.closeReceivedFiles() + return &ProxyFatalError{Err: UnacknowledgedProxyDestructionError(slices.Collect(maps.Keys(core.ctx.pendingDestruction))), ProxyErrs: core.ctx.cloneAsProxyErrors()} + } return core.ctx.doSyncComplete() } @@ -624,6 +663,37 @@ func (core *Core) createObject(factoryName, typeName String, version Int, props ) } +// CoreDestroy is sent when the client requests to destroy an object. +type CoreDestroy struct { + // The proxy id of the object to destroy. + ID Int `json:"id"` +} + +// Opcode satisfies [Message] with a constant value. +func (c *CoreDestroy) Opcode() byte { return PW_CORE_METHOD_DESTROY } + +// FileCount satisfies [Message] with a constant value. +func (c *CoreDestroy) FileCount() Int { return 0 } + +// Size satisfies [KnownSize] with a constant value. +func (c *CoreDestroy) Size() Word { return SizePrefix + Size(SizeInt) } + +// MarshalBinary satisfies [encoding.BinaryMarshaler] via [Marshal]. +func (c *CoreDestroy) MarshalBinary() ([]byte, error) { return Marshal(c) } + +// UnmarshalBinary satisfies [encoding.BinaryUnmarshaler] via [Unmarshal]. +func (c *CoreDestroy) UnmarshalBinary(data []byte) error { return Unmarshal(data, c) } + +// destroy queues a [CoreDestroy] message for the PipeWire server. +// This is not safe to use directly, callers should use the exported method +// on the proxy implementation instead. +func (core *Core) destroy(id Int) error { + return core.ctx.writeMessage( + PW_ID_CORE, + &CoreDestroy{id}, + ) +} + // A RegistryGlobal event is emitted to notify a client about a new global object. type RegistryGlobal struct { // The global id. @@ -889,6 +959,8 @@ func (core *Core) consume(opcode byte, files []int, unmarshal func(v any)) error panic(&UnknownProxyIdError[*CoreRemoveId]{Id: coreRemoveId.ID, Event: &coreRemoveId}) } else { delete(core.ctx.proxy, coreRemoveId.ID) + // not always populated so this is not checked + delete(core.ctx.pendingDestruction, coreRemoveId.ID) return proxy.remove() } diff --git a/internal/pipewire/core_test.go b/internal/pipewire/core_test.go index 39fbcd6..4840aa6 100644 --- a/internal/pipewire/core_test.go +++ b/internal/pipewire/core_test.go @@ -365,6 +365,24 @@ func TestCoreCreateObject(t *testing.T) { }.run(t) } +func TestCoreDestroy(t *testing.T) { + t.Parallel() + + encodingTestCases[pipewire.CoreDestroy, *pipewire.CoreDestroy]{ + {"sample", []byte{ + /* size: rest of data */ 0x10, 0, 0, 0, + /* type: Struct */ 0xe, 0, 0, 0, + + /* size: 4 bytes */ 4, 0, 0, 0, + /* type: Int */ 4, 0, 0, 0, + /* value: 3 */ 3, 0, 0, 0, + /* padding */ 0, 0, 0, 0, + }, pipewire.CoreDestroy{ + ID: 3, + }, nil}, + }.run(t) +} + func TestRegistryGlobal(t *testing.T) { t.Parallel() diff --git a/internal/pipewire/pipewire.go b/internal/pipewire/pipewire.go index 35d380f..6d290d0 100644 --- a/internal/pipewire/pipewire.go +++ b/internal/pipewire/pipewire.go @@ -60,6 +60,8 @@ type Context struct { pendingIds map[Int]struct{} // Smallest available Id for the next proxy. nextId Int + // Proxies targeted by the [CoreDestroy] event pending until next [CoreSync]. + pendingDestruction map[Int]struct{} // Server side registry generation number. generation Long // Pending file descriptors to be sent with the next message. @@ -121,6 +123,7 @@ func New(conn Conn, props SPADict) (*Context, error) { PW_ID_CLIENT: {}, } ctx.nextId = Int(len(ctx.proxy)) + ctx.pendingDestruction = make(map[Int]struct{}) if err := ctx.core.hello(); err != nil { return nil, err @@ -503,13 +506,21 @@ func (e DanglingFilesError) Error() string { } // An UnacknowledgedProxyError holds newly allocated proxy ids that the server failed -// to acknowledge after an otherwise successful [Context.Roundtrip]. +// to acknowledge after an otherwise successful [Core.Sync]. type UnacknowledgedProxyError []Int func (e UnacknowledgedProxyError) Error() string { return "server did not acknowledge " + strconv.Itoa(len(e)) + " proxies" } +// An UnacknowledgedProxyDestructionError holds destroyed proxy ids that the server failed +// to acknowledge after an otherwise successful [Core.Sync]. +type UnacknowledgedProxyDestructionError []Int + +func (e UnacknowledgedProxyDestructionError) Error() string { + return "server did not acknowledge " + strconv.Itoa(len(e)) + " proxy destructions" +} + // A ProxyFatalError describes an error that terminates event handling during a // [Context.Roundtrip] and makes further event processing no longer possible. type ProxyFatalError struct { diff --git a/internal/pipewire/pipewire_test.go b/internal/pipewire/pipewire_test.go index afa2d7e..1ba7285 100644 --- a/internal/pipewire/pipewire_test.go +++ b/internal/pipewire/pipewire_test.go @@ -836,6 +836,7 @@ func TestContextErrors(t *testing.T) { {"UnexpectedFileCountError", &pipewire.UnexpectedFileCountError{0, -1}, "received -1 files instead of the expected 0"}, {"UnacknowledgedProxyError", make(pipewire.UnacknowledgedProxyError, 1<<4), "server did not acknowledge 16 proxies"}, + {"UnacknowledgedProxyDestructionError", make(pipewire.UnacknowledgedProxyDestructionError, 1<<4), "server did not acknowledge 16 proxy destructions"}, {"DanglingFilesError", make(pipewire.DanglingFilesError, 1<<4), "received 16 dangling files"}, {"UnexpectedFilesError", pipewire.UnexpectedFilesError(1 << 4), "server message headers claim to have sent more files than actually received"}, {"UnexpectedSequenceError", pipewire.UnexpectedSequenceError(1 << 4), "unexpected seq 16"},