From f44923da296720360e359016ac83c6a56deeb0f1 Mon Sep 17 00:00:00 2001 From: Ophestra Date: Sat, 6 Dec 2025 20:59:41 +0900 Subject: [PATCH] internal/pipewire: post-sync cleanup functions This makes it easier to handle resources who only needs to stay alive before the next sync. Signed-off-by: Ophestra --- internal/pipewire/core.go | 3 +- internal/pipewire/pipewire.go | 64 ++++++++++++++++++++++++++++++++++- 2 files changed, 65 insertions(+), 2 deletions(-) diff --git a/internal/pipewire/core.go b/internal/pipewire/core.go index 3b67a50..4642bdb 100644 --- a/internal/pipewire/core.go +++ b/internal/pipewire/core.go @@ -372,7 +372,8 @@ func (core *Core) Sync() error { return err } } - return core.ctx.cloneProxyErrors() + + return core.ctx.doSyncComplete() } // The CorePong message is sent from the client to the server when the server emits the Ping event. diff --git a/internal/pipewire/pipewire.go b/internal/pipewire/pipewire.go index c1cb397..06f7fc6 100644 --- a/internal/pipewire/pipewire.go +++ b/internal/pipewire/pipewire.go @@ -78,6 +78,9 @@ type Context struct { // Pending footer value deferred to the next round trip, // sent if pendingFooter is nil. This is for emulating upstream behaviour deferredPendingFooter KnownSize + // Deferred operations ran after a [Core.Sync] completes or Close is called. Errors + //are reported as part of [ProxyConsumeError] and is not considered fatal unless panicked. + syncComplete []func() error // Proxy for built-in core events. core Core // Proxy for built-in client events. @@ -92,6 +95,10 @@ type Context struct { conn Conn } +// cleanup arranges for f to be called after the next [CoreDone] event +// or when [Context] is closed. +func (ctx *Context) cleanup(f func() error) { ctx.syncComplete = append(ctx.syncComplete, f) } + // GetCore returns the address of [Core] held by this [Context]. func (ctx *Context) GetCore() *Core { return &ctx.core } @@ -590,12 +597,14 @@ func (ctx *Context) roundtrip() (err error) { // populated early for finalizers, but does not overwrite existing errors if len(danglingFiles) > 0 && err == nil { + ctx.closeReceivedFiles() err = &ProxyFatalError{Err: danglingFiles, ProxyErrs: ctx.cloneAsProxyErrors()} return } // this check must happen after everything else passes if len(ctx.pendingIds) != 0 { + ctx.closeReceivedFiles() err = &ProxyFatalError{Err: UnacknowledgedProxyError(slices.Collect(maps.Keys(ctx.pendingIds))), ProxyErrs: ctx.cloneAsProxyErrors()} return } @@ -629,6 +638,7 @@ func (ctx *Context) consume(receiveRemaining []byte) (remaining []byte, err erro if r == nil { return } + ctx.closeReceivedFiles() recoveredErr, ok := r.(error) if !ok { @@ -713,5 +723,57 @@ func closeReceivedFiles(fds ...int) { } } +// doSyncComplete calls syncComplete functions and collects their errors alongside errors +// cloned from proxyErrors. A panic is translated into ProxyFatalError. +func (ctx *Context) doSyncComplete() (err error) { + proxyErrors := ctx.cloneAsProxyErrors() + defer func() { + r := recover() + if r == nil { + return + } + ctx.closeReceivedFiles() + + recoveredErr, ok := r.(error) + if !ok { + panic(r) + } + if recoveredErr == nil { + panic(&runtime.PanicNilError{}) + } + + err = &ProxyFatalError{Err: recoveredErr, ProxyErrs: proxyErrors} + return + }() + + for _, f := range ctx.syncComplete { + if scErr := f(); scErr != nil { + proxyErrors = append(proxyErrors, scErr) + } + } + ctx.syncComplete = ctx.syncComplete[:0] + + if len(proxyErrors) > 0 { + err = proxyErrors + } + return +} + // Close frees the underlying buffer and closes the connection. -func (ctx *Context) Close() error { ctx.free(); return ctx.conn.Close() } +func (ctx *Context) Close() (err error) { + ctx.free() + err = ctx.doSyncComplete() + closeErr := ctx.conn.Close() + + if closeErr != nil { + if err == nil { + return closeErr + } else if proxyErrors, ok := err.(ProxyConsumeError); ok { + return &ProxyFatalError{Err: err, ProxyErrs: proxyErrors} + } else { + return + } + } else { + return err + } +}