All checks were successful
Test / Create distribution (push) Successful in 52s
Test / Sandbox (push) Successful in 3m30s
Test / Hakurei (push) Successful in 5m40s
Test / Sandbox (race detector) (push) Successful in 6m30s
Test / Hpkg (push) Successful in 7m21s
Test / Hakurei (race detector) (push) Successful in 3m22s
Test / Flake checks (push) Successful in 2m2s
None of these are valid with their zero value, and the implementations assume they are created by the builder methods. They are by all means an implementation detail and exporting them makes no sense. Signed-off-by: Ophestra <cat@gensokyo.uk>
201 lines
5.4 KiB
Go
201 lines
5.4 KiB
Go
package system
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"reflect"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
|
|
"hakurei.app/container"
|
|
"hakurei.app/system/dbus"
|
|
)
|
|
|
|
var (
|
|
ErrDBusConfig = errors.New("dbus config not supplied")
|
|
)
|
|
|
|
// MustProxyDBus calls ProxyDBus and panics if an error is returned.
|
|
func (sys *I) MustProxyDBus(sessionPath string, session *dbus.Config, systemPath string, system *dbus.Config) *I {
|
|
if _, err := sys.ProxyDBus(session, system, sessionPath, systemPath); err != nil {
|
|
panic(err.Error())
|
|
} else {
|
|
return sys
|
|
}
|
|
}
|
|
|
|
// ProxyDBus finalises configuration ahead of time and starts xdg-dbus-proxy via [dbus] and terminates it on revert.
|
|
// This [Op] is always [Process] scoped.
|
|
func (sys *I) ProxyDBus(session, system *dbus.Config, sessionPath, systemPath string) (func(), error) {
|
|
d := new(dbusProxyOp)
|
|
|
|
// session bus is required as otherwise this is effectively a very expensive noop
|
|
if session == nil {
|
|
return nil, newOpErrorMessage("dbus", ErrDBusConfig,
|
|
"attempted to create message bus proxy args without session bus config", false)
|
|
}
|
|
|
|
// system bus is optional
|
|
d.system = system != nil
|
|
|
|
var sessionBus, systemBus dbus.ProxyPair
|
|
sessionBus[0], systemBus[0] = sys.dbusAddress()
|
|
sessionBus[1], systemBus[1] = sessionPath, systemPath
|
|
d.out = &linePrefixWriter{println: log.Println, prefix: "(dbus) ", buf: new(strings.Builder)}
|
|
if final, err := sys.dbusFinalise(sessionBus, systemBus, session, system); err != nil {
|
|
if errors.Is(err, syscall.EINVAL) {
|
|
return nil, newOpErrorMessage("dbus", err,
|
|
"message bus proxy configuration contains NUL byte", false)
|
|
}
|
|
return nil, newOpErrorMessage("dbus", err,
|
|
fmt.Sprintf("cannot finalise message bus proxy: %v", err), false)
|
|
} else {
|
|
if sys.isVerbose() {
|
|
sys.verbose("session bus proxy:", session.Args(sessionBus))
|
|
if system != nil {
|
|
sys.verbose("system bus proxy:", system.Args(systemBus))
|
|
}
|
|
|
|
// this calls the argsWt String method
|
|
sys.verbose("message bus proxy final args:", final.WriterTo)
|
|
}
|
|
|
|
d.final = final
|
|
}
|
|
|
|
sys.ops = append(sys.ops, d)
|
|
return d.out.Dump, nil
|
|
}
|
|
|
|
// dbusProxyOp implements [I.ProxyDBus].
|
|
type dbusProxyOp struct {
|
|
proxy *dbus.Proxy // populated during apply
|
|
|
|
final *dbus.Final
|
|
out *linePrefixWriter
|
|
// whether system bus proxy is enabled
|
|
system bool
|
|
}
|
|
|
|
func (d *dbusProxyOp) Type() Enablement { return Process }
|
|
|
|
func (d *dbusProxyOp) apply(sys *I) error {
|
|
sys.verbosef("session bus proxy on %q for upstream %q", d.final.Session[1], d.final.Session[0])
|
|
if d.system {
|
|
sys.verbosef("system bus proxy on %q for upstream %q", d.final.System[1], d.final.System[0])
|
|
}
|
|
|
|
d.proxy = dbus.New(sys.ctx, d.final, d.out)
|
|
if err := sys.dbusProxyStart(d.proxy); err != nil {
|
|
d.out.Dump()
|
|
return newOpErrorMessage("dbus", err,
|
|
fmt.Sprintf("cannot start message bus proxy: %v", err), false)
|
|
}
|
|
sys.verbose("starting message bus proxy", d.proxy)
|
|
return nil
|
|
}
|
|
|
|
func (d *dbusProxyOp) revert(sys *I, _ *Criteria) error {
|
|
// criteria ignored here since dbus is always process-scoped
|
|
sys.verbose("terminating message bus proxy")
|
|
sys.dbusProxyClose(d.proxy)
|
|
|
|
exitMessage := "message bus proxy exit"
|
|
defer func() { sys.verbose(exitMessage) }()
|
|
|
|
err := sys.dbusProxyWait(d.proxy)
|
|
if errors.Is(err, context.Canceled) {
|
|
exitMessage = "message bus proxy canceled upstream"
|
|
err = nil
|
|
}
|
|
return newOpErrorMessage("dbus", err,
|
|
fmt.Sprintf("message bus proxy error: %v", err), true)
|
|
}
|
|
|
|
func (d *dbusProxyOp) Is(o Op) bool {
|
|
target, ok := o.(*dbusProxyOp)
|
|
return ok && d != nil && target != nil &&
|
|
d.system == target.system &&
|
|
d.final != nil && target.final != nil &&
|
|
d.final.Session == target.final.Session &&
|
|
d.final.System == target.final.System &&
|
|
dbus.EqualAddrEntries(d.final.SessionUpstream, target.final.SessionUpstream) &&
|
|
dbus.EqualAddrEntries(d.final.SystemUpstream, target.final.SystemUpstream) &&
|
|
reflect.DeepEqual(d.final.WriterTo, target.final.WriterTo)
|
|
}
|
|
|
|
func (d *dbusProxyOp) Path() string { return container.Nonexistent }
|
|
func (d *dbusProxyOp) String() string { return d.proxy.String() }
|
|
|
|
const (
|
|
// lpwSizeThreshold is the threshold of bytes written to linePrefixWriter which,
|
|
// if reached or exceeded, causes linePrefixWriter to drop all future writes.
|
|
lpwSizeThreshold = 1 << 24
|
|
)
|
|
|
|
// linePrefixWriter calls println with a prefix for every line written.
|
|
type linePrefixWriter struct {
|
|
prefix string
|
|
println func(v ...any)
|
|
|
|
n int
|
|
msg []string
|
|
buf *strings.Builder
|
|
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
func (s *linePrefixWriter) Write(p []byte) (n int, err error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return s.write(p, 0)
|
|
}
|
|
|
|
func (s *linePrefixWriter) write(p []byte, a int) (int, error) {
|
|
if s.n >= lpwSizeThreshold {
|
|
if len(p) == 0 {
|
|
return a, nil
|
|
}
|
|
return a, syscall.ENOMEM
|
|
}
|
|
|
|
if i := bytes.IndexByte(p, '\n'); i == -1 {
|
|
n, _ := s.buf.Write(p)
|
|
s.n += n
|
|
return a + n, nil
|
|
} else {
|
|
n, _ := s.buf.Write(p[:i])
|
|
s.n += n + 1
|
|
|
|
v := s.buf.String()
|
|
if strings.HasPrefix(v, "init: ") {
|
|
s.n -= len(v) + 1
|
|
// pass through container init messages
|
|
s.println(s.prefix + v)
|
|
} else {
|
|
s.msg = append(s.msg, v)
|
|
}
|
|
|
|
s.buf.Reset()
|
|
return s.write(p[i+1:], a+n+1)
|
|
}
|
|
}
|
|
|
|
func (s *linePrefixWriter) Dump() {
|
|
s.mu.RLock()
|
|
for _, m := range s.msg {
|
|
s.println(s.prefix + m)
|
|
}
|
|
if s.buf != nil && s.buf.Len() != 0 {
|
|
s.println("*" + s.prefix + s.buf.String())
|
|
}
|
|
if s.n >= lpwSizeThreshold {
|
|
s.println("+" + s.prefix + "write threshold reached, output may be incomplete")
|
|
}
|
|
s.mu.RUnlock()
|
|
}
|