diff --git a/internal/pipewire/core.go b/internal/pipewire/core.go index 35340c8..18d664e 100644 --- a/internal/pipewire/core.go +++ b/internal/pipewire/core.go @@ -6,6 +6,7 @@ import ( "maps" "slices" "strconv" + "syscall" "time" ) @@ -615,6 +616,14 @@ func (e *UnknownBoundIdError[E]) Error() string { return "unknown bound proxy id " + strconv.Itoa(int(e.Id)) } +// An InvalidPingError is a [CorePing] event targeting a proxy id that was never allocated. +type InvalidPingError CorePing + +func (e *InvalidPingError) Error() string { + return "received Core::Ping seq " + strconv.Itoa(int(e.Sequence)) + + " targeting unknown proxy id " + strconv.Itoa(int(e.ID)) +} + func (core *Core) consume(opcode byte, files []int, unmarshal func(v any)) error { closeReceivedFiles(files...) switch opcode { @@ -637,6 +646,23 @@ func (core *Core) consume(opcode byte, files []int, unmarshal func(v any)) error // anything, and this behaviour is never mentioned in documentation return nil + case PW_CORE_EVENT_PING: + var ping CorePing + unmarshal(&ping) + if _, ok := core.ctx.proxy[ping.ID]; ok { + core.ctx.mustWriteMessage(PW_ID_CORE, (*CorePong)(&ping)) + return nil + } else { + invalidPingError := InvalidPingError(ping) + core.ctx.mustWriteMessage(PW_ID_CORE, &CoreErrorMethod{CoreError{ + ID: PW_ID_CORE, + Sequence: core.ctx.currentRemoteSeq(), + Result: -Int(syscall.EINVAL), + Message: invalidPingError.Error(), + }}) + return &invalidPingError + } + case PW_CORE_EVENT_ERROR: var coreError CoreError unmarshal(&coreError) diff --git a/internal/pipewire/pipewire.go b/internal/pipewire/pipewire.go index d17edf9..59ce7c5 100644 --- a/internal/pipewire/pipewire.go +++ b/internal/pipewire/pipewire.go @@ -218,6 +218,14 @@ func (ctx *Context) writeMessage(Id Int, v Message) (err error) { return } +// mustWriteMessage calls writeMessage and panics if a non-nil error is returned. +// This must only be called from eventProxy.consume. +func (ctx *Context) mustWriteMessage(Id Int, v Message) { + if err := ctx.writeMessage(Id, v); err != nil { + panic(err) + } +} + // newProxyId returns a newly allocated proxy Id for the specified type. func (ctx *Context) newProxyId(proxy eventProxy, ack bool) Int { newId := ctx.nextId @@ -620,6 +628,10 @@ func (ctx *Context) roundtrip() (err error) { } } +// currentRemoteSeq returns the current remote sequence number. +// This must only be called from eventProxy.consume. +func (ctx *Context) currentRemoteSeq() Int { return ctx.remoteSequence - 1 } + // consume receives messages from the server and processes events. func (ctx *Context) consume(receiveRemaining []byte) (remaining []byte, err error) { defer func() { diff --git a/internal/pipewire/pipewire_test.go b/internal/pipewire/pipewire_test.go index b6de283..fbe1571 100644 --- a/internal/pipewire/pipewire_test.go +++ b/internal/pipewire/pipewire_test.go @@ -845,6 +845,7 @@ func TestContextErrors(t *testing.T) { {"RoundtripUnexpectedEOFError ErrRoundtripEOFFooter", pipewire.ErrRoundtripEOFFooter, "unexpected EOF establishing message footer bounds"}, {"RoundtripUnexpectedEOFError ErrRoundtripEOFFooterOpcode", pipewire.ErrRoundtripEOFFooterOpcode, "unexpected EOF decoding message footer opcode"}, {"RoundtripUnexpectedEOFError invalid", pipewire.RoundtripUnexpectedEOFError(0xbad), "unexpected EOF"}, + {"InconsistentFilesError", &pipewire.InconsistentFilesError{0, 2}, "queued 0 files instead of the expected 2"}, {"UnsupportedOpcodeError", &pipewire.UnsupportedOpcodeError{ Opcode: 0xff, @@ -855,6 +856,11 @@ func TestContextErrors(t *testing.T) { Id: -1, Data: "\x00", }, "unknown proxy id -1"}, + + {"InvalidPingError", &pipewire.InvalidPingError{ + ID: 0xbad, + Sequence: 0xcafe, + }, "received Core::Ping seq 51966 targeting unknown proxy id 2989"}, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) {