internal/pipewire: store proxy errors in context
All checks were successful
Test / Create distribution (push) Successful in 35s
Test / Sandbox (push) Successful in 2m37s
Test / Sandbox (race detector) (push) Successful in 4m37s
Test / Hakurei (push) Successful in 4m50s
Test / Hpkg (push) Successful in 5m6s
Test / Hakurei (race detector) (push) Successful in 4m39s
Test / Flake checks (push) Successful in 1m41s
All checks were successful
Test / Create distribution (push) Successful in 35s
Test / Sandbox (push) Successful in 2m37s
Test / Sandbox (race detector) (push) Successful in 4m37s
Test / Hakurei (push) Successful in 4m50s
Test / Hpkg (push) Successful in 5m6s
Test / Hakurei (race detector) (push) Successful in 4m39s
Test / Flake checks (push) Successful in 1m41s
This change fixes handling of non-fatal errors during a roundtrip as there can be multiple receive calls per roundtrip. Signed-off-by: Ophestra <cat@gensokyo.uk>
This commit is contained in:
parent
490093a659
commit
7cb3308a53
@ -361,18 +361,18 @@ func (core *Core) Sync() error {
|
||||
if err := core.ctx.coreSync(roundtripSyncID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
deadline := time.Now().Add(syncTimeout)
|
||||
|
||||
for !core.done {
|
||||
if time.Now().After(deadline) {
|
||||
return ErrNotDone
|
||||
}
|
||||
|
||||
if err := core.ctx.Roundtrip(); err != nil {
|
||||
if err := core.ctx.roundtrip(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return core.ctx.cloneProxyErrors()
|
||||
}
|
||||
|
||||
// The CorePong message is sent from the client to the server when the server emits the Ping event.
|
||||
|
||||
@ -69,6 +69,9 @@ type Context struct {
|
||||
// These are not automatically set up as [os.File] because it is impossible
|
||||
// to undo the effects of os.NewFile, which can be inconvenient for some uses.
|
||||
receivedFiles []int
|
||||
// Non-protocol errors encountered during event handling of the current Roundtrip;
|
||||
// errors that prevent event processing from continuing must be panicked.
|
||||
proxyErrors ProxyConsumeError
|
||||
// Pending footer value for the next outgoing message.
|
||||
// Newer footers appear to simply replace the existing one.
|
||||
pendingFooter KnownSize
|
||||
@ -80,11 +83,12 @@ type Context struct {
|
||||
// Proxy for built-in client events.
|
||||
client Client
|
||||
|
||||
// Passed to [Conn.ReadMsgUnix]. Not copied if sufficient for all received messages.
|
||||
// Passed to [Conn.Recvmsg]. Not copied if sufficient for all received messages.
|
||||
iovecBuf [1 << 15]byte
|
||||
// Passed to [Conn.ReadMsgUnix] for ancillary messages and is never copied.
|
||||
// Passed to [Conn.Recvmsg] for ancillary messages and is never copied.
|
||||
oobBuf [(_SCM_MAX_FD/2+_SCM_MAX_FD%2+2)<<3 + 1]byte
|
||||
// Underlying connection, usually implemented by [net.UnixConn].
|
||||
// Underlying connection, usually implemented by [net.UnixConn]
|
||||
// via the [SyscallConn] adapter.
|
||||
conn Conn
|
||||
}
|
||||
|
||||
@ -524,6 +528,27 @@ func (e ProxyConsumeError) Error() string {
|
||||
return s
|
||||
}
|
||||
|
||||
// cloneAsProxyErrors clones and truncates proxyErrors if it contains errors,
|
||||
// returning the cloned slice.
|
||||
func (ctx *Context) cloneAsProxyErrors() (proxyErrors ProxyConsumeError) {
|
||||
if len(ctx.proxyErrors) == 0 {
|
||||
return
|
||||
}
|
||||
proxyErrors = slices.Clone(ctx.proxyErrors)
|
||||
ctx.proxyErrors = ctx.proxyErrors[:0]
|
||||
return
|
||||
}
|
||||
|
||||
// cloneProxyErrors is like cloneAsProxyErrors, but returns nil if proxyErrors
|
||||
// does not contain errors.
|
||||
func (ctx *Context) cloneProxyErrors() (err error) {
|
||||
proxyErrors := ctx.cloneAsProxyErrors()
|
||||
if len(proxyErrors) > 0 {
|
||||
err = proxyErrors
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// roundtripSyncID is the id passed to Context.coreSync during a [Context.Roundtrip].
|
||||
const roundtripSyncID = 0
|
||||
|
||||
@ -533,6 +558,15 @@ const roundtripSyncID = 0
|
||||
// For a non-nil error, if the error happens over the network, it has concrete type
|
||||
// [os.SyscallError].
|
||||
func (ctx *Context) Roundtrip() (err error) {
|
||||
err = ctx.roundtrip()
|
||||
if err == nil {
|
||||
err = ctx.cloneProxyErrors()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// roundtrip implements the Roundtrip method without checking proxyErrors.
|
||||
func (ctx *Context) roundtrip() (err error) {
|
||||
if err = ctx.sendmsg(ctx.buf, ctx.pendingFiles...); err != nil {
|
||||
return
|
||||
}
|
||||
@ -544,13 +578,14 @@ func (ctx *Context) Roundtrip() (err error) {
|
||||
continue
|
||||
}
|
||||
|
||||
// only returned by recvmsg
|
||||
if err == syscall.EAGAIN || err == syscall.EWOULDBLOCK {
|
||||
if len(remaining) == 0 {
|
||||
err = nil
|
||||
} else if len(remaining) < SizeHeader {
|
||||
err = ErrRoundtripEOFHeader
|
||||
err = &ProxyFatalError{Err: ErrRoundtripEOFHeader, ProxyErrs: ctx.cloneAsProxyErrors()}
|
||||
} else {
|
||||
err = ErrRoundtripEOFBody
|
||||
err = &ProxyFatalError{Err: ErrRoundtripEOFBody, ProxyErrs: ctx.cloneAsProxyErrors()}
|
||||
}
|
||||
}
|
||||
return
|
||||
@ -559,11 +594,6 @@ func (ctx *Context) Roundtrip() (err error) {
|
||||
|
||||
// consume receives messages from the server and processes events.
|
||||
func (ctx *Context) consume(receiveRemaining []byte) (remaining []byte, err error) {
|
||||
var (
|
||||
// this holds onto non-protocol errors encountered during event handling;
|
||||
// errors that prevent event processing from continuing must be panicked
|
||||
proxyErrors ProxyConsumeError
|
||||
)
|
||||
defer func() {
|
||||
// anything before this has already been processed and must not be closed
|
||||
// here, as anything holding onto them will end up with a dangling fd that
|
||||
@ -594,7 +624,7 @@ func (ctx *Context) consume(receiveRemaining []byte) (remaining []byte, err erro
|
||||
panic(&runtime.PanicNilError{})
|
||||
}
|
||||
|
||||
err = &ProxyFatalError{Err: recoveredErr, ProxyErrs: proxyErrors}
|
||||
err = &ProxyFatalError{Err: recoveredErr, ProxyErrs: ctx.cloneAsProxyErrors()}
|
||||
return
|
||||
}()
|
||||
|
||||
@ -644,7 +674,7 @@ func (ctx *Context) consume(receiveRemaining []byte) (remaining []byte, err erro
|
||||
})
|
||||
remaining = remaining[header.Size:]
|
||||
if proxyErr != nil {
|
||||
proxyErrors = append(proxyErrors, proxyErr)
|
||||
ctx.proxyErrors = append(ctx.proxyErrors, proxyErr)
|
||||
}
|
||||
}
|
||||
|
||||
@ -666,13 +696,6 @@ func (ctx *Context) consume(receiveRemaining []byte) (remaining []byte, err erro
|
||||
ctx.receivedFiles = ctx.receivedFiles[:0]
|
||||
}
|
||||
|
||||
// these are checked and made available first since they describe the cause
|
||||
// of so-called symptoms checked after this point; the symptoms should only
|
||||
// be made available as a catch-all if these are unavailable
|
||||
if len(proxyErrors) > 0 {
|
||||
return remaining, proxyErrors
|
||||
}
|
||||
|
||||
// populated early for finalizers
|
||||
if len(danglingFiles) > 0 {
|
||||
return remaining, danglingFiles
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user