internal/pipewire: post-sync cleanup functions

This makes it easier to handle resources that only need to stay alive until the next sync.

Signed-off-by: Ophestra <cat@gensokyo.uk>
Ophestra 2025-12-06 20:59:41 +09:00
parent 5e7861bb00
commit f44923da29
2 changed files with 65 additions and 2 deletions


@@ -372,7 +372,8 @@ func (core *Core) Sync() error {
return err
}
}
return core.ctx.cloneProxyErrors()
return core.ctx.doSyncComplete()
}
// The CorePong message is sent from the client to the server when the server emits the Ping event.


@@ -78,6 +78,9 @@ type Context struct {
// Pending footer value deferred to the next round trip,
// sent if pendingFooter is nil. This is for emulating upstream behaviour
deferredPendingFooter KnownSize
// Deferred operations run after [Core.Sync] completes or Close is called. Errors
// are reported as part of [ProxyConsumeError] and are not considered fatal unless they panic.
syncComplete []func() error
// Proxy for built-in core events.
core Core
// Proxy for built-in client events.
@@ -92,6 +95,10 @@ type Context struct {
conn Conn
}
// cleanup arranges for f to be called after the next [CoreDone] event
// or when [Context] is closed.
func (ctx *Context) cleanup(f func() error) { ctx.syncComplete = append(ctx.syncComplete, f) }
// GetCore returns the address of [Core] held by this [Context].
func (ctx *Context) GetCore() *Core { return &ctx.core }
@@ -590,12 +597,14 @@ func (ctx *Context) roundtrip() (err error) {
// populated early for finalizers, but does not overwrite existing errors
if len(danglingFiles) > 0 && err == nil {
ctx.closeReceivedFiles()
err = &ProxyFatalError{Err: danglingFiles, ProxyErrs: ctx.cloneAsProxyErrors()}
return
}
// this check must happen after everything else passes
if len(ctx.pendingIds) != 0 {
ctx.closeReceivedFiles()
err = &ProxyFatalError{Err: UnacknowledgedProxyError(slices.Collect(maps.Keys(ctx.pendingIds))), ProxyErrs: ctx.cloneAsProxyErrors()}
return
}
@@ -629,6 +638,7 @@ func (ctx *Context) consume(receiveRemaining []byte) (remaining []byte, err error) {
if r == nil {
return
}
ctx.closeReceivedFiles()
recoveredErr, ok := r.(error)
if !ok {
@@ -713,5 +723,57 @@ func closeReceivedFiles(fds ...int) {
}
}
// doSyncComplete calls the syncComplete functions and collects their errors alongside errors
// cloned from proxyErrors. A panic is translated into a [ProxyFatalError].
func (ctx *Context) doSyncComplete() (err error) {
proxyErrors := ctx.cloneAsProxyErrors()
defer func() {
r := recover()
if r == nil {
return
}
ctx.closeReceivedFiles()
recoveredErr, ok := r.(error)
if !ok {
panic(r)
}
if recoveredErr == nil {
panic(&runtime.PanicNilError{})
}
err = &ProxyFatalError{Err: recoveredErr, ProxyErrs: proxyErrors}
return
}()
for _, f := range ctx.syncComplete {
if scErr := f(); scErr != nil {
proxyErrors = append(proxyErrors, scErr)
}
}
ctx.syncComplete = ctx.syncComplete[:0]
if len(proxyErrors) > 0 {
err = proxyErrors
}
return
}
// Close frees the underlying buffer and closes the connection.
func (ctx *Context) Close() error { ctx.free(); return ctx.conn.Close() }
func (ctx *Context) Close() (err error) {
ctx.free()
err = ctx.doSyncComplete()
closeErr := ctx.conn.Close()
if closeErr != nil {
if err == nil {
return closeErr
} else if proxyErrors, ok := err.(ProxyConsumeError); ok {
return &ProxyFatalError{Err: closeErr, ProxyErrs: proxyErrors}
} else {
return
}
} else {
return err
}
}