From 7cb3308a53f55bd41c6524c9e5e37667fa3b343e Mon Sep 17 00:00:00 2001 From: Ophestra Date: Sat, 6 Dec 2025 19:13:46 +0900 Subject: [PATCH] internal/pipewire: store proxy errors in context This change fixes handling of non-fatal errors during a roundtrip as there can be multiple receive calls per roundtrip. Signed-off-by: Ophestra --- internal/pipewire/core.go | 6 ++-- internal/pipewire/pipewire.go | 61 ++++++++++++++++++++++++----------- 2 files changed, 45 insertions(+), 22 deletions(-) diff --git a/internal/pipewire/core.go b/internal/pipewire/core.go index c2913b2..3b67a50 100644 --- a/internal/pipewire/core.go +++ b/internal/pipewire/core.go @@ -361,18 +361,18 @@ func (core *Core) Sync() error { if err := core.ctx.coreSync(roundtripSyncID); err != nil { return err } - deadline := time.Now().Add(syncTimeout) + for !core.done { if time.Now().After(deadline) { return ErrNotDone } - if err := core.ctx.Roundtrip(); err != nil { + if err := core.ctx.roundtrip(); err != nil { return err } } - return nil + return core.ctx.cloneProxyErrors() } // The CorePong message is sent from the client to the server when the server emits the Ping event. diff --git a/internal/pipewire/pipewire.go b/internal/pipewire/pipewire.go index 331fcf1..457f2b0 100644 --- a/internal/pipewire/pipewire.go +++ b/internal/pipewire/pipewire.go @@ -69,6 +69,9 @@ type Context struct { // These are not automatically set up as [os.File] because it is impossible // to undo the effects of os.NewFile, which can be inconvenient for some uses. receivedFiles []int + // Non-protocol errors encountered during event handling of the current Roundtrip; + // errors that prevent event processing from continuing must be panicked. + proxyErrors ProxyConsumeError // Pending footer value for the next outgoing message. // Newer footers appear to simply replace the existing one. pendingFooter KnownSize @@ -80,11 +83,12 @@ type Context struct { // Proxy for built-in client events. client Client - // Passed to [Conn.ReadMsgUnix]. Not copied if sufficient for all received messages. + // Passed to [Conn.Recvmsg]. Not copied if sufficient for all received messages. iovecBuf [1 << 15]byte - // Passed to [Conn.ReadMsgUnix] for ancillary messages and is never copied. + // Passed to [Conn.Recvmsg] for ancillary messages and is never copied. oobBuf [(_SCM_MAX_FD/2+_SCM_MAX_FD%2+2)<<3 + 1]byte - // Underlying connection, usually implemented by [net.UnixConn]. + // Underlying connection, usually implemented by [net.UnixConn] + // via the [SyscallConn] adapter. conn Conn } @@ -524,6 +528,27 @@ func (e ProxyConsumeError) Error() string { return s } +// cloneAsProxyErrors clones and truncates proxyErrors if it contains errors, +// returning the cloned slice. +func (ctx *Context) cloneAsProxyErrors() (proxyErrors ProxyConsumeError) { + if len(ctx.proxyErrors) == 0 { + return + } + proxyErrors = slices.Clone(ctx.proxyErrors) + ctx.proxyErrors = ctx.proxyErrors[:0] + return +} + +// cloneProxyErrors is like cloneAsProxyErrors, but returns nil if proxyErrors +// does not contain errors. +func (ctx *Context) cloneProxyErrors() (err error) { + proxyErrors := ctx.cloneAsProxyErrors() + if len(proxyErrors) > 0 { + err = proxyErrors + } + return +} + // roundtripSyncID is the id passed to Context.coreSync during a [Context.Roundtrip]. const roundtripSyncID = 0 @@ -533,6 +558,15 @@ const roundtripSyncID = 0 // For a non-nil error, if the error happens over the network, it has concrete type // [os.SyscallError]. func (ctx *Context) Roundtrip() (err error) { + err = ctx.roundtrip() + if err == nil { + err = ctx.cloneProxyErrors() + } + return +} + +// roundtrip implements the Roundtrip method without checking proxyErrors. +func (ctx *Context) roundtrip() (err error) { if err = ctx.sendmsg(ctx.buf, ctx.pendingFiles...); err != nil { return } @@ -544,13 +578,14 @@ func (ctx *Context) Roundtrip() (err error) { continue } + // only returned by recvmsg if err == syscall.EAGAIN || err == syscall.EWOULDBLOCK { if len(remaining) == 0 { err = nil } else if len(remaining) < SizeHeader { - err = ErrRoundtripEOFHeader + err = &ProxyFatalError{Err: ErrRoundtripEOFHeader, ProxyErrs: ctx.cloneAsProxyErrors()} } else { - err = ErrRoundtripEOFBody + err = &ProxyFatalError{Err: ErrRoundtripEOFBody, ProxyErrs: ctx.cloneAsProxyErrors()} } } return @@ -559,11 +594,6 @@ func (ctx *Context) Roundtrip() (err error) { // consume receives messages from the server and processes events. func (ctx *Context) consume(receiveRemaining []byte) (remaining []byte, err error) { - var ( - // this holds onto non-protocol errors encountered during event handling; - // errors that prevent event processing from continuing must be panicked - proxyErrors ProxyConsumeError - ) defer func() { // anything before this has already been processed and must not be closed // here, as anything holding onto them will end up with a dangling fd that @@ -594,7 +624,7 @@ func (ctx *Context) consume(receiveRemaining []byte) (remaining []byte, err erro panic(&runtime.PanicNilError{}) } - err = &ProxyFatalError{Err: recoveredErr, ProxyErrs: proxyErrors} + err = &ProxyFatalError{Err: recoveredErr, ProxyErrs: ctx.cloneAsProxyErrors()} return }() @@ -644,7 +674,7 @@ func (ctx *Context) consume(receiveRemaining []byte) (remaining []byte, err erro }) remaining = remaining[header.Size:] if proxyErr != nil { - proxyErrors = append(proxyErrors, proxyErr) + ctx.proxyErrors = append(ctx.proxyErrors, proxyErr) } } @@ -666,13 +696,6 @@ func (ctx *Context) consume(receiveRemaining []byte) (remaining []byte, err erro ctx.receivedFiles = ctx.receivedFiles[:0] } - // these are checked and made available first since they describe the cause - // of so-called symptoms checked after this point; the symptoms should only - // be made available as a catch-all if these are unavailable - if len(proxyErrors) > 0 { - return remaining, proxyErrors - } - // populated early for finalizers if len(danglingFiles) > 0 { return remaining, danglingFiles