internal/pipewire: implement Core::Destroy
All checks were successful
Test / Create distribution (push) Successful in 35s
Test / Sandbox (push) Successful in 2m28s
Test / Hakurei (push) Successful in 3m25s
Test / Hpkg (push) Successful in 4m19s
Test / Sandbox (race detector) (push) Successful in 4m26s
Test / Hakurei (race detector) (push) Successful in 5m21s
Test / Flake checks (push) Successful in 1m43s

This change also implements pending destructible check on Sync. Destruction method should always be implemented as a wrapper of destructible.destroy.

Signed-off-by: Ophestra <cat@gensokyo.uk>
This commit is contained in:
2025-12-14 09:20:58 +09:00
parent 00a5bdf006
commit b0f2ab6fff
4 changed files with 103 additions and 1 deletions

View File

@@ -382,6 +382,41 @@ func (s *removable) mustCheckDestroy() {
}
}
// destructible is embedded by proxies that can be targeted by the [CoreRemoveId] event and the
// [CoreDestroy] method and requires no cleanup. destructible purposefully does not override
// removable.mustCheckDestroy because it is used by unexported methods called during event handling
// and are exempt from the destruction check.
type destructible struct {
destroyed bool
removable
}
// checkDestroy overrides removable.checkDestroy to also check the destroyed field.
func (s *destructible) checkDestroy() error {
if s.destroyed {
return ErrProxyDestroyed
}
if err := s.removable.checkDestroy(); err != nil {
return err
}
return nil
}
// destroy calls removable.checkDestroy then queues a [CoreDestroy] event if it succeeds.
func (s *destructible) destroy(ctx *Context, id Int) error {
if err := s.checkDestroy(); err != nil {
return err
}
l := len(ctx.pendingDestruction)
ctx.pendingDestruction[id] = struct{}{}
if len(ctx.pendingDestruction) != l+1 {
return ErrProxyDestroyed
}
s.destroyed = true
return ctx.GetCore().destroy(id)
}
// An InconsistentIdError describes an inconsistent state where the server claims an impossible
// proxy or global id. This is only generated by the [CoreBoundProps] event.
type InconsistentIdError struct {
@@ -504,6 +539,10 @@ func (core *Core) Sync() error {
core.ctx.closeReceivedFiles()
return &ProxyFatalError{Err: UnacknowledgedProxyError(slices.Collect(maps.Keys(core.ctx.pendingIds))), ProxyErrs: core.ctx.cloneAsProxyErrors()}
}
if len(core.ctx.pendingDestruction) != 0 {
core.ctx.closeReceivedFiles()
return &ProxyFatalError{Err: UnacknowledgedProxyDestructionError(slices.Collect(maps.Keys(core.ctx.pendingDestruction))), ProxyErrs: core.ctx.cloneAsProxyErrors()}
}
return core.ctx.doSyncComplete()
}
@@ -624,6 +663,37 @@ func (core *Core) createObject(factoryName, typeName String, version Int, props
)
}
// CoreDestroy is sent when the client requests to destroy an object.
type CoreDestroy struct {
// The proxy id of the object to destroy.
ID Int `json:"id"`
}
// Opcode satisfies [Message] with a constant value.
func (c *CoreDestroy) Opcode() byte { return PW_CORE_METHOD_DESTROY }
// FileCount satisfies [Message] with a constant value.
func (c *CoreDestroy) FileCount() Int { return 0 }
// Size satisfies [KnownSize] with a constant value.
func (c *CoreDestroy) Size() Word { return SizePrefix + Size(SizeInt) }
// MarshalBinary satisfies [encoding.BinaryMarshaler] via [Marshal].
func (c *CoreDestroy) MarshalBinary() ([]byte, error) { return Marshal(c) }
// UnmarshalBinary satisfies [encoding.BinaryUnmarshaler] via [Unmarshal].
func (c *CoreDestroy) UnmarshalBinary(data []byte) error { return Unmarshal(data, c) }
// destroy queues a [CoreDestroy] message for the PipeWire server.
// This is not safe to use directly, callers should use the exported method
// on the proxy implementation instead.
func (core *Core) destroy(id Int) error {
return core.ctx.writeMessage(
PW_ID_CORE,
&CoreDestroy{id},
)
}
// A RegistryGlobal event is emitted to notify a client about a new global object.
type RegistryGlobal struct {
// The global id.
@@ -889,6 +959,8 @@ func (core *Core) consume(opcode byte, files []int, unmarshal func(v any)) error
panic(&UnknownProxyIdError[*CoreRemoveId]{Id: coreRemoveId.ID, Event: &coreRemoveId})
} else {
delete(core.ctx.proxy, coreRemoveId.ID)
// not always populated so this is not checked
delete(core.ctx.pendingDestruction, coreRemoveId.ID)
return proxy.remove()
}

View File

@@ -365,6 +365,24 @@ func TestCoreCreateObject(t *testing.T) {
}.run(t)
}
func TestCoreDestroy(t *testing.T) {
t.Parallel()
encodingTestCases[pipewire.CoreDestroy, *pipewire.CoreDestroy]{
{"sample", []byte{
/* size: rest of data */ 0x10, 0, 0, 0,
/* type: Struct */ 0xe, 0, 0, 0,
/* size: 4 bytes */ 4, 0, 0, 0,
/* type: Int */ 4, 0, 0, 0,
/* value: 3 */ 3, 0, 0, 0,
/* padding */ 0, 0, 0, 0,
}, pipewire.CoreDestroy{
ID: 3,
}, nil},
}.run(t)
}
func TestRegistryGlobal(t *testing.T) {
t.Parallel()

View File

@@ -60,6 +60,8 @@ type Context struct {
pendingIds map[Int]struct{}
// Smallest available Id for the next proxy.
nextId Int
// Proxies targeted by the [CoreDestroy] event pending until next [CoreSync].
pendingDestruction map[Int]struct{}
// Server side registry generation number.
generation Long
// Pending file descriptors to be sent with the next message.
@@ -121,6 +123,7 @@ func New(conn Conn, props SPADict) (*Context, error) {
PW_ID_CLIENT: {},
}
ctx.nextId = Int(len(ctx.proxy))
ctx.pendingDestruction = make(map[Int]struct{})
if err := ctx.core.hello(); err != nil {
return nil, err
@@ -503,13 +506,21 @@ func (e DanglingFilesError) Error() string {
}
// An UnacknowledgedProxyError holds newly allocated proxy ids that the server failed
// to acknowledge after an otherwise successful [Context.Roundtrip].
// to acknowledge after an otherwise successful [Core.Sync].
type UnacknowledgedProxyError []Int
func (e UnacknowledgedProxyError) Error() string {
return "server did not acknowledge " + strconv.Itoa(len(e)) + " proxies"
}
// An UnacknowledgedProxyDestructionError holds destroyed proxy ids that the server failed
// to acknowledge after an otherwise successful [Core.Sync].
type UnacknowledgedProxyDestructionError []Int
func (e UnacknowledgedProxyDestructionError) Error() string {
return "server did not acknowledge " + strconv.Itoa(len(e)) + " proxy destructions"
}
// A ProxyFatalError describes an error that terminates event handling during a
// [Context.Roundtrip] and makes further event processing no longer possible.
type ProxyFatalError struct {

View File

@@ -836,6 +836,7 @@ func TestContextErrors(t *testing.T) {
{"UnexpectedFileCountError", &pipewire.UnexpectedFileCountError{0, -1}, "received -1 files instead of the expected 0"},
{"UnacknowledgedProxyError", make(pipewire.UnacknowledgedProxyError, 1<<4), "server did not acknowledge 16 proxies"},
{"UnacknowledgedProxyDestructionError", make(pipewire.UnacknowledgedProxyDestructionError, 1<<4), "server did not acknowledge 16 proxy destructions"},
{"DanglingFilesError", make(pipewire.DanglingFilesError, 1<<4), "received 16 dangling files"},
{"UnexpectedFilesError", pipewire.UnexpectedFilesError(1 << 4), "server message headers claim to have sent more files than actually received"},
{"UnexpectedSequenceError", pipewire.UnexpectedSequenceError(1 << 4), "unexpected seq 16"},